My first kafka outage
A few weeks ago myself and my team experienced our first kafka outage and we got to learn a lot from it. This post describes how we approached the problem, the steps we took to debug it and in the end solve it.
Context
A few months ago the team started this new project, the system connects to a kafka cluster from which it consumes messages from multiple topics, gets the payload and after performing some operations and decoration, it generates a new message, which is then sent to different groups of clients that subscribe to messages that present specific characteristics that they find relevant.
The system wasn’t yet in production. It was being used by a restricted group of users for beta testing. Although the system was only up for a few weeks, the kafka cluster we connect to had been receiving messages for a few months.
The problem was raised by one of the users participating in the beta group which sent us a message warning that he was not seeing any data on the web application that is fed from our service.
Investigation
The first step we took was looking at our metrics dashboards and indeed, the graphic for consumed messages was flat at zero, so we went to search the logs for more information.
[...]
o.a.kafka.clients.FetchSessionHandler : [Consumer clientId=consumer-5, groupId=consumer.group1] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 1: org.apache.kafka.common.errors.DisconnectException.
o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=consumer.group1] Discovered group coordinator broker02:9092 (id: 2147483646 rack: null)
o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-3, groupId=consumer.group1] Group coordinator broker02:9092 (id: 2147483646 rack: null) is unavailable or invalid, will attempt rediscovery
org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-7, groupId=consumer.group1] Connection to node 2147483646 could not be established. Broker may not be available.
org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-7, groupId=consumer.group1] Connection to node 1 could not be established. Broker may not be available.
[...]
The service was trying to connect to the kafka group coordinator but it was failing over and over again. We needed more information so we tried to connect a local consumer in debug mode to get a more verbose log.
Group coordinator lookup failed: The coordinator is not available.
Coordinator discovery failed, refreshing metadata
Updated cluster metadata version 22 to Cluster(
id = Wcz5M3HmQ6ii3KLw2yQEJQ,
nodes = [broker01:9092 (id: 0 rack: null), broker03:9092 (id: 2 rack: null)],
partitions = [
Partition(topic = prd-topic, partition = 1, leader = 2, replicas = [2], isr = [2], offlineReplicas = []),
Partition(topic = prd-topic, partition = 9, leader = none, replicas = [1], isr = [], offlineReplicas = []),
Partition(topic = prd-topic, partition = 0, leader = none, replicas = [1], isr = [], offlineReplicas = []),
Partition(topic = prd-topic, partition = 8, leader = 0, replicas = [0], isr = [0], offlineReplicas = []),
Partition(topic = prd-topic, partition = 7, leader = 2, replicas = [2], isr = [2], offlineReplicas = []),
Partition(topic = prd-topic, partition = 6, leader = none, replicas = [1], isr = [], offlineReplicas = []),
Partition(topic = prd-topic, partition = 5, leader = 0, replicas = [0], isr = [0], offlineReplicas = []),
Partition(topic = prd-topic, partition = 4, leader = 2, replicas = [2], isr = [2], offlineReplicas = [])
Partition(topic = prd-topic, partition = 3, leader = none, replicas = [1], isr = [], offlineReplicas = [])
Partition(topic = prd-topic, partition = 2, leader = 0, replicas = [0], isr = [0], offlineReplicas = [])
Partition(topic = prd-other-topic, partition = 5, leader = 2, replicas = [2], isr = [2], offlineReplicas = []),
Partition(topic = prd-other-topic, partition = 4, leader = none, replicas = [1], isr = [], offlineReplicas = []),
Partition(topic = prd-other-topic, partition = 7, leader = none, replicas = [1], isr = [], offlineReplicas = []),
Partition(topic = prd-other-topic, partition = 6, leader = 0, replicas = [0], isr = [0], offlineReplicas = []),
Partition(topic = prd-other-topic, partition = 9, leader = 0, replicas = [0], isr = [0], offlineReplicas = []),
Partition(topic = prd-other-topic, partition = 1, leader = none, replicas = [1], isr = [], offlineReplicas = [])
Partition(topic = prd-other-topic, partition = 8, leader = 2, replicas = [2], isr = [2], offlineReplicas = [])
Partition(topic = prd-other-topic, partition = 0, leader = 0, replicas = [0], isr = [0], offlineReplicas = [])
Partition(topic = prd-other-topic, partition = 3, leader = 0, replicas = [0], isr = [0], offlineReplicas = [])
Partition(topic = prd-other-topic, partition = 2, leader = 2, replicas = [2], isr = [2], offlineReplicas = [])
],
controller = broker01:9092 (id: 0 rack: null))
Analyzing the cluster metadata we could see that we only had two working nodes, broker02:9092 (broker 1) was not showing in the node list. Also, topics with replicas = [1]
had no leader (leader = none
). Definitely there was some problem with broker 1.
We ssh into the machine to check what was happening with it and when we got there we instantly found the problem - the machine was out of disk space.
$ du -h --max-depth=1 var/kafkadata/ | sort -hr
118G var/kafkadata/
63G var/kafkadata/data03
56G var/kafkadata/data02
53M var/kafkadata/data01
We had found the problem but it raised more questions than answers.
Drill down
What caused the coordinator connection to fail and consequently lose all messages?
Broker 1 went down with lack of disk space and all partitions that were managed by it at the time ended up with no leader as there were no replicas to take its place (ReplicationFactor:1). Although this is a problem and we should be aware of this possible source of failure in the future, it should not make the consumers connection fail. We would lose messages from leaderless partitions but clients would continue to consume from the ones managed by healthy brokers. However, Broker 1 was the group coordinator. The coordinator is in charge of managing the state of the group and in this case the sole responsible for __consumer_offsets topic, as the name suggests, used to manage consumer offsets.
$ kafka-topics --zookeeper zookeeper:2181/kafka --describe --topic __consumer_offsets
Topic:__consumer_offsets PartitionCount:50 ReplicationFactor:1 Configs:segment.bytes=104857600,cleanup.policy=compact,compression.type=producer
Topic: __consumer_offsets Partition: 0 Leader: 1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 1 Leader: 1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 2 Leader: 1 Replicas: 1 Isr: 1
[...]
Topic: __consumer_offsets Partition: 48 Leader: 1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 49 Leader: 1 Replicas: 1 Isr: 1
We can see that all partitions of __consumer_offsets topic were being managed by Broker 1. With no failover strategy we just lost the all cluster. We had understood why we were not receiving any messages but we hadn’t yet found the root cause of the problem.
Why did Broker 1 run out of disk space?
We found the answer on the kafka server.properties
file where we spotted some weird configuration.
### /opt/kafka/config/server.properties ###
auto.leader.rebalance.enable=false
log.retention.hours=48
log.cleaner.enable=false
First the auto.leader.rebalance.enable
setting was set to false which wouldn’t enable the possibility of partitions being auto rebalanced by healthy nodes when a broker goes down. This is a problem, however, this was not causing the machine to get out of disk space.
log.cleaner.enable
set to false. That was the real issue.
[log.cleaner.enable] Enable the log cleaner process to run on the server. Should be enabled if using any topics with a cleanup.policy=compact including the internal offsets topic. If disabled those topics will not be compacted and continually grow in size. - Kafka documentation
By default the log cleaner is disabled and the log retention policy set to delete which evicts segments after their retention time expires. From the topic details (above) we could see that the cleanup.policy
for __consumer_offsets was set to compact, which mean the topic was not being compacted and the kafka logs were growing forever.
Actions
As result of the investigation we devised some actions to fix the problem and improve the overall system.
- Set
log.cleaner.enable
totrue
so we start compacting __consumer_offsets topic. - Increase the replication factor of __consumer_offsets and other topics so we improve service resiliency.
- Set
auto.leader.rebalance.enable
totrue
to enable leader failover when a node goes down.
Besides, we also felt that having to wait for a user to warn us that something was wrong was not good enough, we needed to have a better understanding on the state of the kafka cluster to ensure the reliability and stability of our service - we should make an effort to improve visibility and alerting over the kafka cluster.