Recently I got the chance to help a customer move from a Helm-based deployment of Apache Kafka on Kubernetes to Strimzi, which manages the deployment of Kafka in Kubernetes with an operator. In the final phase, I had to transfer the data to the newly deployed Apache Kafka cluster.

Setting the scope

The customer uses compacted topics (cleanup.policy=compact and log.retention.ms=-1) as the default configuration for all topics in Apache Kafka, so they treat Kafka somewhat like a database: messages are produced with a unique ID and are nulled afterwards whenever a message needs to be deleted.
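As a small illustration of that pattern (a sketch only - I'm assuming the kafka-python client here, the topic and key are placeholders): deleting a record in a compacted topic means producing a tombstone, i.e. the same key with a null value.

# Sketch with the kafka-python client (pip install kafka-python); topic and key are placeholders.
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers="localhost:9092")

# produce a message, using the unique ID as the record key
producer.send("topicxx", key=b"KEY1", value=b"VALUE1")

# "delete" it later by producing a tombstone: same key, null value.
# Log compaction will eventually remove the key from the topic.
producer.send("topicxx", key=b"KEY1", value=None)
producer.flush()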

I used Kafka MirrorMaker 2 and the corresponding resource provided by Strimzi, KafkaMirrorMaker2, to transfer the data. The setup and configuration of the KafkaMirrorMaker2 resource was quite intuitive and self-explanatory. But the total message counts of the original and the mirrored topic did not match, so we seemed to be missing messages. This was quite annoying, and there was no obvious pattern or ratio between the topics. Looking through millions of messages and comparing them by hand wouldn't be smart either 😀.

I was a bit surprised by the result and behavior - how would one build a resilient event-sourcing architecture if not every message in a compacted topic were mirrored? Some of that confusion is still not fully resolved in my head. My assumption was that only the first and the last value of the same message key in a consumer batch are mirrored, while the state changes of that key in between are ignored, or better said, compacted. To verify this assumption, to make sure that the customer is not losing any data, and to avoid sleepless nights, I wrote a Python script to compare the contents of the topics.
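To make that assumption a bit more concrete, here is a tiny, purely illustrative Python sketch of what "first and last value per key, everything in between compacted away" would mean for a batch of messages. This is only a model of my assumption, not how MirrorMaker 2 is actually implemented, and the batch content is made up:

# Illustrative sketch of the assumption only.
# For each key, keep the first and the last value of a (made-up) consumer batch
# and drop the state changes in between, similar to what log compaction does.
def first_and_last_per_key(batch):
    first, last = {}, {}
    for key, value in batch:
        first.setdefault(key, value)
        last[key] = value
    return {key: (first[key], last[key]) for key in first}

batch = [("KEY1", "VALUE1"), ("KEY1", None), ("KEY1", "VALUE2"), ("KEY2", "VALUE1")]
print(first_and_last_per_key(batch))
# {'KEY1': ('VALUE1', 'VALUE2'), 'KEY2': ('VALUE1', 'VALUE1')}
# -> the intermediate tombstone of KEY1 would never reach the mirrored topic,
#    but the last value per key (which the script below compares) is preserved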

Fetching the data

Before dumping the messages of a topic, I had MirrorMaker running and made sure that its consumer lag was already down to zero. One thing to note: the throughput of the topics was luckily not very high, which made it easier to stop the data-fetching processes. To fetch the messages, including the key, timestamp, partition, and value, into a file, I used the following CLI command:

/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --property print.key=true \
    --property print.timestamp=true \
    --property print.partition=true \
    --property key.separator='::' \
    --topic topicxx \
    --from-beginning > /tmp/topicxx.txt

Fetching the data from the old and the mirrored topic is best done in the same terminal and in the background, so that both processes can be stopped simultaneously (a scripted alternative is sketched after the sample output below). The output files should look like this:

# output of /tmp/topicxx.txt
CreateTime:1637768143839::Partition:2::KEY1::VALUE1
CreateTime:1637768153839::Partition:0::KEY2::VALUE1
CreateTime:1637768163339::Partition:2::KEY1::null
CreateTime:1637768163839::Partition:2::KEY1::VALUE2
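If you'd rather script the two dumps than juggle the consumers manually, something like the following could start both console consumers and stop them (almost) at the same time. This is only a sketch: the mirrored topic name, the output paths, and the 60-second runtime are placeholders.

# Sketch: dump the original and the mirrored topic in parallel and stop both consumers together.
# Topic names, output paths and the sleep duration are placeholders.
import subprocess
import time

def start_dump(topic, outfile):
    cmd = [
        "/opt/kafka/bin/kafka-console-consumer.sh",
        "--bootstrap-server", "localhost:9092",
        "--property", "print.key=true",
        "--property", "print.timestamp=true",
        "--property", "print.partition=true",
        "--property", "key.separator=::",
        "--topic", topic,
        "--from-beginning",
    ]
    return subprocess.Popen(cmd, stdout=open(outfile, "w"))

old_dump = start_dump("topicxx", "/tmp/old-topic.txt")
new_dump = start_dump("mirror.topicxx", "/tmp/mirrored-topic.txt")

time.sleep(60)              # give both consumers enough time to reach the end of the topics
for proc in (old_dump, new_dump):
    proc.terminate()        # stop both dumps (almost) simultaneously
    proc.wait()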

The comparing script

In the final step, I compared the two dumps with the attached Python script: python diff-kafka-messages.py old-topic.txt mirrored-topic.txt. Before going through the script, I'd like to show you the output of the comparison.

------------- STARTING CHECK -------------
parsing old-topic.txt
parsed messages: 394 and 178 unique messages
parsing mirrored-topic.txt
parsed messages: 345 and 178 unique messages
************ STATISTICS ************
msg keys not found in old-topic.txt set()
msg keys not found in mirrored-topic.txt set()
modified {}
same value 178

The starting block gives an overview of the total message count and the number of unique messages per topic, based on the key. The more interesting part is the statistics block, where in an ideal scenario the first three values are empty, as in the example. The first two lines are quite self-explanatory. The modified line lists the difference whenever a key exists in both topics but the last value for that key is not the same. The last line should ideally match the number of unique messages in each topic.
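To give an idea of what a mismatch would look like: if a key existed in both topics but with different last values, the modified output would contain an entry roughly like this (hypothetical key and values, timestamps and partitions abbreviated; the structure comes from the compare function in the script):

modified {'KEY7': ({'new': KafkaMessageWithOccurrance(kafkaMessage=KafkaMessage(key='KEY7', value='VALUE3', ...), keyOccurrance=2)},
                   {'old': KafkaMessageWithOccurrance(kafkaMessage=KafkaMessage(key='KEY7', value='VALUE2', ...), keyOccurrance=3)})}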

import sys
from dataclasses import dataclass

# tested with python 3.9
@dataclass
class KafkaMessage:
    key: str
    value: str
    timestamp: int
    partition: int

@dataclass
class KafkaMessageWithOccurrance:
    kafkaMessage: KafkaMessage
    keyOccurrance: int = 1

    def incrementOccurrance(self):
        self.keyOccurrance = self.keyOccurrance + 1
    
    def set_msg(self, msg: KafkaMessage):
        self.kafkaMessage = msg

class MessageParser:
    def __init__(self, filename):
        self.messages: list[KafkaMessage] = []
        self.messages_by_keys: dict[str, KafkaMessageWithOccurrance] = {}
        # 1) ENTRYPOINT: parse the messages from the dump into Python dataclasses
        self.__parse_input_file(filename)
        # 2) Reorder the parsed messages into a dict keyed by message key and
        #    accumulate statistics like the number of occurrences per key
        self.messages_by_key()

    def count_messages(self) -> int:
        return len(self.messages)

    def messages_by_key(self) -> dict:
        for msg in self.messages:
            msg_key = msg.key
            if msg_key in self.messages_by_keys:
                # keep only the latest message seen for this key and count its occurrences
                self.messages_by_keys[msg_key].set_msg(msg)
                self.messages_by_keys[msg_key].incrementOccurrance()
            else:
                self.messages_by_keys[msg_key] = KafkaMessageWithOccurrance(msg)

        return self.messages_by_keys

    def get_message_by_key(self, key):
        return self.messages_by_keys.get(key)

    def count_unique_messages(self) -> int:
        return len(self.messages_by_keys.keys())

    # 3) Comparing magic ¯\_(ツ)_/¯
    def compare(self, other_messages: dict[str, KafkaMessageWithOccurrance]):
        d1 = self.messages_by_keys
        d1_keys = set(d1.keys())
        d2 = other_messages
        d2_keys = set(d2.keys())
        shared_keys = d1_keys.intersection(d2_keys)
        added = d1_keys - d2_keys      # keys only present in this parser's topic
        removed = d2_keys - d1_keys    # keys only present in the other topic
        # keys present in both topics, but whose last values differ
        modified = {o: ({"new": d1[o]}, {"old": d2[o]}) for o in shared_keys if d1[o].kafkaMessage.value != d2[o].kafkaMessage.value}
        same = set(o for o in shared_keys if d1[o].kafkaMessage.value == d2[o].kafkaMessage.value)
        return added, removed, modified, same

    def __parse_input_file(self, filename):
        print(f"parsing {filename}")
        with open(filename, 'r') as reader:
            for line in reader.readlines():
                # a line looks like: CreateTime:1637768143839::Partition:2::KEY1::VALUE1
                split = line.split("::")
                # dataclass field order: key, value, timestamp, partition
                self.messages.append(KafkaMessage(split[2], split[3].strip(), int(split[0].split(":")[1]), int(split[1].split(":")[1])))


args = sys.argv[1:]
if len(args) != 2:
    print("usage diff.py old.txt new.txt")
    exit(1)

print("\n------------- STARTING CHECK ------------- ")
old = MessageParser(args[0])
print(f"parsed messages: {old.count_messages()} and {old.count_unique_messages()} unique messages")


new = MessageParser(args[1])
print(f"parsed messages: {new.count_messages()} and {new.count_unique_messages()} unique messages")

print("************ STATISTICS ************")
added, removed, modified, same = new.compare(old.messages_by_keys)
print(f"msg keys not found in {args[0]}", added)
print(f"msg keys not found in {args[1]}", removed)
print("modified", modified)
print("same value", len(same))

This script consists of three main steps: 1) parsing the files, 2) reordering the messages by key, and 3) comparing the two inputs. One limiting factor could be the in-memory processing, so it might struggle with very large topics (GBs of data).
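If memory ever becomes the bottleneck, the parsing step could be turned into a streaming one that only keeps the latest value per key instead of materializing every line first. A rough sketch of that idea (not part of the script above):

# Sketch: stream through the dump and keep only the latest value per key,
# so memory grows with the number of unique keys instead of the number of lines.
def latest_value_by_key(filename):
    latest = {}
    with open(filename) as reader:
        for line in reader:                                    # iterate lazily, line by line
            _timestamp, _partition, key, value = line.split("::", 3)
            latest[key] = value.strip()                        # later lines overwrite earlier ones
    return latest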

Conclusion

Luckily, the script allowed me to verify the assumptions mentioned in this post and to double-check the successful migration to a Strimzi-based deployment for a production environment in the insurance sector. The customer was quite happy and reassured by the result, and was able to move forward with migrating all Apache Kafka clients to the newly deployed Strimzi cluster without the feeling of losing any data, even though the overall message counts in the topics were not the same.