Stuck in the loop

Write . Fight resistance . Ship!

My first Kafka outage

A few weeks ago my team and I experienced our first Kafka outage, and we learned a lot from it. This post describes how we approached the problem, the steps we took to debug it and how we solved it in the end.

Context

A few months ago the team started a new project. The system connects to a Kafka cluster and consumes messages from multiple topics. For each message it extracts the payload, performs some operations and decoration, and generates a new message. That message is then sent to different groups of clients, each subscribed to messages with the specific characteristics they find relevant.

The system wasn’t yet in production. It was being used by a restricted group of users for beta testing. Although the system had only been up for a few weeks, the Kafka cluster we connect to had been receiving messages for a few months.

The problem was raised by one of the users in the beta group, who sent us a message warning that he was not seeing any data on the web application that is fed by our service.

Investigation

The first step we took was to look at our metrics dashboards and, indeed, the graph for consumed messages was flat at zero, so we searched the logs for more information.

[...]
o.a.kafka.clients.FetchSessionHandler    : [Consumer clientId=consumer-5, groupId=consumer.group1] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 1: org.apache.kafka.common.errors.DisconnectException.
o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=consumer.group1] Discovered group coordinator broker02:9092 (id: 2147483646 rack: null)
o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-3, groupId=consumer.group1] Group coordinator broker02:9092 (id: 2147483646 rack: null) is unavailable or invalid, will attempt rediscovery
org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-7, groupId=consumer.group1] Connection to node 2147483646 could not be established. Broker may not be available.
org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-7, groupId=consumer.group1] Connection to node 1 could not be established. Broker may not be available.
[...]

The service was trying to connect to the Kafka group coordinator, but it was failing over and over again. We needed more information, so we connected a local consumer in debug mode to get a more verbose log.

Group coordinator lookup failed: The coordinator is not available.
Coordinator discovery failed, refreshing metadata
Updated cluster metadata version 22 to Cluster(
    id = Wcz5M3HmQ6ii3KLw2yQEJQ,
    nodes = [broker01:9092 (id: 0 rack: null), broker03:9092 (id: 2 rack: null)],
    partitions = [
        Partition(topic = prd-topic, partition = 1, leader = 2, replicas = [2], isr = [2], offlineReplicas = []),
        Partition(topic = prd-topic, partition = 9, leader = none, replicas = [1], isr = [], offlineReplicas = []),
        Partition(topic = prd-topic, partition = 0, leader = none, replicas = [1], isr = [], offlineReplicas = []),
        Partition(topic = prd-topic, partition = 8, leader = 0, replicas = [0], isr = [0], offlineReplicas = []),
        Partition(topic = prd-topic, partition = 7, leader = 2, replicas = [2], isr = [2], offlineReplicas = []),
        Partition(topic = prd-topic, partition = 6, leader = none, replicas = [1], isr = [], offlineReplicas = []),
        Partition(topic = prd-topic, partition = 5, leader = 0, replicas = [0], isr = [0], offlineReplicas = []),
        Partition(topic = prd-topic, partition = 4, leader = 2, replicas = [2], isr = [2], offlineReplicas = [])
        Partition(topic = prd-topic, partition = 3, leader = none, replicas = [1], isr = [], offlineReplicas = [])
        Partition(topic = prd-topic, partition = 2, leader = 0, replicas = [0], isr = [0], offlineReplicas = [])
        Partition(topic = prd-other-topic, partition = 5, leader = 2, replicas = [2], isr = [2], offlineReplicas = []),
        Partition(topic = prd-other-topic, partition = 4, leader = none, replicas = [1], isr = [], offlineReplicas = []),
        Partition(topic = prd-other-topic, partition = 7, leader = none, replicas = [1], isr = [], offlineReplicas = []),
        Partition(topic = prd-other-topic, partition = 6, leader = 0, replicas = [0], isr = [0], offlineReplicas = []),
        Partition(topic = prd-other-topic, partition = 9, leader = 0, replicas = [0], isr = [0], offlineReplicas = []),
        Partition(topic = prd-other-topic, partition = 1, leader = none, replicas = [1], isr = [], offlineReplicas = [])
        Partition(topic = prd-other-topic, partition = 8, leader = 2, replicas = [2], isr = [2], offlineReplicas = [])
        Partition(topic = prd-other-topic, partition = 0, leader = 0, replicas = [0], isr = [0], offlineReplicas = [])
        Partition(topic = prd-other-topic, partition = 3, leader = 0, replicas = [0], isr = [0], offlineReplicas = [])
        Partition(topic = prd-other-topic, partition = 2, leader = 2, replicas = [2], isr = [2], offlineReplicas = [])
    ],
    controller = broker01:9092 (id: 0 rack: null))

Analyzing the cluster metadata we could see that we only had two working nodes: broker02:9092 (broker 1) was not showing in the node list. Also, every partition with replicas = [1] had no leader (leader = none). There was definitely some problem with broker 1.

We SSHed into the machine to check what was happening and instantly found the problem: the machine was out of disk space.
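Reading a metadata dump like this by eye gets tedious once there are more topics. The following is a minimal sketch, not part of our service, of how the leaderless partitions could be grouped by the broker that holds their replicas. The function name leaderless_partitions and the regular expression are our own illustration and assume the dump format shown above.

```python
import re

# Matches one "Partition(...)" entry from the consumer's debug metadata dump.
PARTITION_RE = re.compile(
    r"Partition\(topic = (?P<topic>[\w.-]+), partition = (?P<partition>\d+), "
    r"leader = (?P<leader>\w+), replicas = \[(?P<replicas>[\d, ]*)\]"
)


def leaderless_partitions(metadata_dump: str) -> dict:
    """Group partitions that have no leader by the broker ids holding their replicas."""
    by_broker = {}
    for match in PARTITION_RE.finditer(metadata_dump):
        if match["leader"] != "none":
            continue  # partition is healthy, nothing to report
        for broker in match["replicas"].split(","):
            if broker.strip():
                by_broker.setdefault(int(broker), []).append(
                    (match["topic"], int(match["partition"]))
                )
    return by_broker


# A few lines copied from the dump above.
sample = """
Partition(topic = prd-topic, partition = 1, leader = 2, replicas = [2], isr = [2], offlineReplicas = []),
Partition(topic = prd-topic, partition = 9, leader = none, replicas = [1], isr = [], offlineReplicas = []),
Partition(topic = prd-other-topic, partition = 4, leader = none, replicas = [1], isr = [], offlineReplicas = []),
"""

print(leaderless_partitions(sample))
# {1: [('prd-topic', 9), ('prd-other-topic', 4)]}
```

Every leaderless partition points at the same broker id, which is what sent us to broker 1.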

$ du -h --max-depth=1 var/kafkadata/ | sort -hr
118G	var/kafkadata/
63G	var/kafkadata/data03
56G	var/kafkadata/data02
53M	var/kafkadata/data01

We had found the problem but it raised more questions than answers.

Drill down

What caused the coordinator connection to fail and consequently lose all messages?

Broker 1 went down for lack of disk space, and every partition it led at the time was left without a leader, as there were no replicas to take its place (ReplicationFactor:1). Although this is a problem, and a source of failure we should watch for in the future, it should not by itself make the consumers' connections fail. We would lose messages from the leaderless partitions, but clients would continue to consume from the ones led by healthy brokers. However, Broker 1 was also the group coordinator. The coordinator is in charge of managing the state of the group, and in this case it was solely responsible for the __consumer_offsets topic, which, as the name suggests, is used to store consumer offsets.

$ kafka-topics --zookeeper zookeeper:2181/kafka --describe --topic __consumer_offsets
Topic:__consumer_offsets	PartitionCount:50	ReplicationFactor:1	Configs:segment.bytes=104857600,cleanup.policy=compact,compression.type=producer
	Topic: __consumer_offsets	Partition: 0	Leader: 1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 1	Leader: 1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 2	Leader: 1	Replicas: 1	Isr: 1
    [...]
	Topic: __consumer_offsets	Partition: 48	Leader: 1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 49	Leader: 1	Replicas: 1	Isr: 1

We can see that all partitions of the __consumer_offsets topic were led by Broker 1. With no failover strategy, we had effectively lost the whole cluster. We now understood why we were not receiving any messages, but we hadn’t yet found the root cause of the problem.
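To make the link between the offsets topic and the coordinator concrete, here is a small sketch of how a group id maps to a coordinator. It rests on an assumption about broker internals that we did not verify against our Kafka version: the broker hashes the group id with Java's String.hashCode(), takes the absolute value modulo the number of __consumer_offsets partitions, and the leader of that partition acts as the group's coordinator. The function names are ours.

```python
def java_string_hash(text: str) -> int:
    """Replicate Java's String.hashCode() as a signed 32-bit integer.

    Assumes characters in the Basic Multilingual Plane, which is the usual
    case for group ids.
    """
    h = 0
    for ch in text:
        h = (31 * h + ord(ch)) & 0xFFFFFFFF
    return h - 0x100000000 if h >= 0x80000000 else h


def offsets_partition_for(group_id: str, partition_count: int = 50) -> int:
    """Pick the __consumer_offsets partition for a group (assumed formula)."""
    h = java_string_hash(group_id)
    positive = 0 if h == -0x80000000 else abs(h)
    return positive % partition_count


def coordinator_for(group_id: str, leaders: dict):
    """Return the broker id leading the group's offsets partition, or None."""
    return leaders.get(offsets_partition_for(group_id, len(leaders)))


# During the outage every one of the 50 partitions had broker 1 as its only replica.
leaders_before_outage = {partition: 1 for partition in range(50)}
leaders_during_outage = {partition: None for partition in range(50)}

print(coordinator_for("consumer.group1", leaders_before_outage))  # 1
print(coordinator_for("consumer.group1", leaders_during_outage))  # None
```

Whatever partition a group id hashes to, the answer was always broker 1, so no consumer group could find a coordinator once that broker was gone.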

Why did Broker 1 run out of disk space?

We found the answer in the Kafka server.properties file, where we spotted some odd configuration.

### /opt/kafka/config/server.properties ###

auto.leader.rebalance.enable=false
log.retention.hours=48
log.cleaner.enable=false

First, auto.leader.rebalance.enable was set to false. With this setting disabled, the controller does not automatically move partition leadership back to the preferred replicas, so leadership can stay unevenly distributed after a broker fails and recovers. This is a problem, but it was not what caused the machine to run out of disk space.

The real issue was log.cleaner.enable being set to false.

[log.cleaner.enable] Enable the log cleaner process to run on the server. Should be enabled if using any topics with a cleanup.policy=compact including the internal offsets topic. If disabled those topics will not be compacted and continually grow in size. - Kafka documentation

In our configuration the log cleaner was disabled. The default log cleanup policy is delete, which evicts segments after their retention time expires. From the topic details (above), however, we could see that the cleanup.policy for __consumer_offsets was set to compact, which meant that, with the cleaner off, the topic was never compacted and its log kept growing forever.
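The effect is easy to see with a toy model. The sketch below is a deliberately simplified illustration of compaction (it ignores tombstones and segment boundaries, which the real cleaner has to handle): it keeps only the latest record for each key. The commit keys and counts are made up for the example.

```python
def compact(log):
    """Keep only the most recent record for each key, preserving log order."""
    latest_index = {key: i for i, (key, _value) in enumerate(log)}
    return [record for i, record in enumerate(log) if latest_index[record[0]] == i]


# Hypothetical offset commits: key is (group, topic, partition), value is the offset.
log = []
for offset in range(1, 1001):
    log.append((("consumer.group1", "prd-topic", 0), offset))
    log.append((("consumer.group1", "prd-topic", 1), offset))

print(len(log))           # 2000 records kept while the cleaner is disabled
print(len(compact(log)))  # 2 records left once the log is compacted
```

Every offset commit appends a new record, so a consumer that commits regularly keeps adding to the topic even though only the last commit per partition matters.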

Actions

As a result of the investigation we devised some actions to fix the problem and improve the overall system.

  1. Set log.cleaner.enable to true so we start compacting the __consumer_offsets topic.
  2. Increase the replication factor of __consumer_offsets and other topics to improve service resiliency.
  3. Set auto.leader.rebalance.enable to true so that partition leadership is automatically rebalanced across the brokers, for example after a failed node comes back.
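For the second action, the replication factor of an existing topic is changed by reassigning its partitions to a larger replica set. As a rough sketch, the script below generates a reassignment plan in the JSON layout that the kafka-reassign-partitions tool reads from a reassignment file. The helper name reassignment_plan, the round-robin placement and the target factor of 3 are our own assumptions, not a record of what we actually ran.

```python
import json


def reassignment_plan(topic, partition_count, brokers, replication_factor):
    """Build a partition reassignment plan that spreads replicas round-robin."""
    if replication_factor > len(brokers):
        raise ValueError("replication factor cannot exceed the number of brokers")
    partitions = []
    for partition in range(partition_count):
        # Rotate the starting broker so preferred leaders are spread evenly.
        replicas = [
            brokers[(partition + i) % len(brokers)]
            for i in range(replication_factor)
        ]
        partitions.append(
            {"topic": topic, "partition": partition, "replicas": replicas}
        )
    return {"version": 1, "partitions": partitions}


# Broker ids 0, 1 and 2 as seen in the cluster metadata.
plan = reassignment_plan("__consumer_offsets", 50, [0, 1, 2], 3)
print(json.dumps(plan["partitions"][:2]))
```

The first broker in each replica list is the preferred leader, which is why the sketch rotates the starting broker instead of listing them in the same order for every partition.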

We also felt that having to wait for a user to warn us that something was wrong was not good enough. We needed a better understanding of the state of the Kafka cluster to ensure the reliability and stability of our service, so we should make an effort to improve visibility and alerting over the cluster.
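As a starting point for that alerting, something as small as a periodic disk check would have caught this outage early. This is a minimal sketch under our own assumptions: the function name disk_alerts, the 80% threshold and the example paths are illustrative, and a real setup would feed the result into whatever alerting system is in place.

```python
import shutil
from collections import namedtuple


def disk_alerts(paths, max_used_ratio=0.8, usage_fn=shutil.disk_usage):
    """Return a message for every path whose filesystem is fuller than the threshold."""
    alerts = []
    for path in paths:
        usage = usage_fn(path)  # named tuple with total, used and free bytes
        used_ratio = usage.used / usage.total
        if used_ratio >= max_used_ratio:
            alerts.append(f"{path}: {used_ratio:.0%} used")
    return alerts


# Fake usage numbers so the example runs anywhere; drop usage_fn to check real disks.
FakeUsage = namedtuple("FakeUsage", "total used free")
fake = {
    "/var/kafkadata/data01": FakeUsage(total=100, used=10, free=90),
    "/var/kafkadata/data02": FakeUsage(total=100, used=95, free=5),
}

print(disk_alerts(fake, usage_fn=fake.get))
# ['/var/kafkadata/data02: 95% used']
```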