Desperately Seeking Kafka
The Feldera system we're building at www.feldera.com relies on Apache Kafka to store input and output data reliably for fault tolerance. Kafka is a distributed system that stores and buffers messages, called “events”, which are stored on disk in numbered “partitions” within named “topics”. Kafka “producers” append events to partitions, and Kafka “consumers” read events from partitions.
The following diagram shows a topic with three partitions, each of which contains several events, each labeled with its offset. Kafka partitions can be modified only by appending to the end or deleting from the beginning. Therefore, since partition 0 starts with offset 0, it contains all of the events that were ever written to it, whereas partitions 1 and 2 have had their earliest events deleted:
Even though Kafka is widely used across the software industry, I found out that some aspects of how it works are not well documented or easily learned from web resources, even in simple cases. This blog post describes some of what I learned while integrating Kafka and Feldera. My experience is using the rdkafka bindings for Rust, which is a wrapper for the librdkafka implementation of the Kafka client protocol, on top of Redpanda, but I believe that these experiences
generalize beyond those details.
When a program starts to consume a partition with a Kafka consumer, it specifies what position to start from, such as the earliest event in the partition, the most recent, or some specific position in between. The Kafka client library contacts the Kafka "broker" (server) and retrieves an initial batch of events starting from that partition, and then further batches as the program finishes processing the previous ones. Processing a stream of events in order like this is the common case for programs that use Kafka.
The problem with seeks
Now consider a program that needs to access events randomly instead of linearly. It might need to read both the oldest and the newest events from a partition, or it might need to do a binary search through a partition based on event payloads. The Feldera system we’re building does the latter on startup for crash recovery, for example, to search an “index” partition that tracks what data has already been processed.
To support random reads like this, Kafka client libraries support a “seek” operation to start consuming a partition from a new position. You can use this to write code for reading from the beginning or end of a partition, or for binary search. If you do so naively, you will get a surprise: each seek takes about half a second, so a binary search of n
events takes (log₂n)/2
seconds. While Kafka is not optimized for random access, this is much slower than any storage device.
It took me a while to figure out what’s going on. The key is a note in the seek
documentation for librdkafka
that says, “This call triggers a fetch queue barrier flush.” The documentation doesn’t explain further, but by digging into librdkafka
source code, one finds out that it means that the library waits for the broker to return a response to its most recent event fetch request (because
Kafka is all about reading everything in a partition, there is always some most recent fetch request). If there are no more records to fetch, which is common when one is testing with a small test environment, this request times out after 500 milliseconds.
As an aside, the seek
documentation for the Kafka Java API has the same issue, but it documents this quirk explicitly: "Note that, the seek offset won't change to the in-flight fetch request, it will take effect in next fetch request. So, the consumer might wait for fetch.max.wait.ms
before starting to fetch the records from desired offset."
First tries at solutions
librdkafka
offers various promising alternatives to avoiding the problem. We can, for example, use the pause
and resume
functions to stop and then restart fetching events from a partition before or after or bracketing calls to seek
. Or, instead of using seek
, we can use the assign
function that changes the set of partitions that our consumer is reading, because this function can also assign a new, specific position to the partition. These approaches, and combinations
and variations of them, do not work: they either yield errors or 500-ms delays.
Another approach is to consider the 500-ms delay. Looking through the librdkafka configuration properties, this comes from fetch.wait.max.ms
[^1], which defaults to 500 but can be adjusted as low as 0. Indeed, if we set it to 0, this eliminates the delay in Kafka seeks. At this point in my experimentation, I concluded that I had solved the problem and continued working on other features.
[^1]: librdkafka
uses this name. The Java client for Kafka names the equivalent configuration property fetch.max.wait.ms
. Neither client accepts the other’s name for the property.
After I’d done some more work, I noticed that my machine’s fan was running faster than expected during testing. I looked in a thread monitor, which showed that librdkafka
was using a whole CPU core to poll the Kafka broker in a tight loop. After reading more code in librdkafka
, I concluded that this behavior is unavoidable when fetch.wait.max.ms
is 0. And librdkafka
does not provide a way to change this configuration property on a consumer, only to set it when
the consumer is created.
Refined solution
The solution I finally adopted was to create two consumers instead of one. The first consumer, which is configured with fetch.wait.max.ms
of 0, is used only for operations that require fast seeks and short reads. When it is not needed for these purposes (which is most of the time), the program calls pause
to suspend the librdkafka
thread that polls the broker in a tight loop. The second consumer, which is configured with the default fetch.wait.max.ms
of 500, is used for reading data linearly in the most common case for Kafka. In the current code base, the second consumer never needs to seek after it is created; if it did need to seek, it would be faster to close it and create a new consumer whose initial position was the desired one.
Final words
Ideally, Kafka would implement seeks that execute quickly regardless of the configured poll interval. Based on my own limited insight, my guess is that the Kafka protocol could support this. Until some future generation of Kafka client libraries makes fast seeks possible without using excessive CPU, the strategy described here of using multiple consumer instances could be effective for others, too.
I'd love to hear your feedback at @Ben_Pfaff or via email—perhaps you have a better solution than mine? As a company, we'd love to see you download and try Feldera at www.feldera.com. You can also reach out to us via learnmore@feldera.com or chat with us and other community members in our community Slack instance.
Other articles you may like
Incremental Update 6 at Feldera
We’re excited to announce the release of v0.26, which represents a significant step forward for Feldera. This release includes over 200 commits, adding 25,000 lines of new code and documentation. Let's dive into the highlights!
Database computations on Z-sets
How can Z-sets be used to implement database computations
Incremental Update 5 at Feldera
A quick overview of what's new in v0.25.