Kafka Streams Settings for Real-Time Alerting


The Enterprise Insights team at Twilio leverages Apache Kafka to collect data into our warehouse, which other teams at Twilio use for data exploration and to create meaningful insights.

Our team is building a service to do near real-time alerting on Twilio’s bursty traffic – some hours of the day experience high traffic while other hours experience a lull. These traffic patterns create challenges for streaming systems. During our design and experimentation phase we learned a lot about tuning Kafka Streams, and this blog post outlines what we learned.

This blog post will focus solely on what we learned while tuning Kafka Streams. We will explain our rationale for choosing Kafka Streams and the design of the alerting system in a future blog post.

App design

The near real-time alerting app consists of two stages that roll individual records up into metrics and check them against customer-defined thresholds. If a metric value exceeds its threshold, we alert the customer. The app’s stages are (a rough code sketch follows the list):

[Diagram: near real-time alerting app with Kafka Streams]


1. Metric calculation - This stage does metric calculations for individual records and produces them to a Kafka topic with the same time window.
2. Metric aggregation and evaluation against alert thresholds - This stage takes the individual metrics and aggregates them into time windows. These aggregates are then rolled up and checked against alert thresholds.
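
For illustration, here is a minimal sketch of what these two stages could look like in the Kafka Streams DSL. The topic names and the computeMetric() and thresholdFor() helpers are hypothetical placeholders, not our production code.

StreamsBuilder builder = new StreamsBuilder();

// Stage 1: per-record metric calculation, written to an intermediate topic.
builder.<String, String>stream("raw-records")
       .mapValues(value -> computeMetric(value))   // hypothetical helper returning a Double metric
       .to("per-record-metrics", Produced.with(Serdes.String(), Serdes.Double()));

// Stage 2: aggregate the metrics into time windows, roll them up, and check thresholds.
builder.stream("per-record-metrics", Consumed.with(Serdes.String(), Serdes.Double()))
       .groupByKey(Grouped.with(Serdes.String(), Serdes.Double()))
       .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1)))
       .reduce(Double::sum, Materialized.with(Serdes.String(), Serdes.Double()))
       .toStream()
       .map((windowedKey, total) -> KeyValue.pair(windowedKey.key(), total))
       .filter((key, total) -> total > thresholdFor(key))   // hypothetical per-customer threshold lookup
       .to("alerts", Produced.with(Serdes.String(), Serdes.Double()));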

Experiments

Kafka Streams exposes a lot of fine-grained settings across the Streams, Producer, and Consumer layers. Before we begin talking about what worked for us, let's talk about our testbeds for these experiments.

Testbed

While we won't go into how many instances make up the Kafka Streams cluster or how many containers run in each Kubernetes pod, you'll need to match those settings to the CPU cores and disk space your use case ends up needing on your own infrastructure.

These experiments are run on EC2 instances with direct attached SSD storage.

 
The two input traffic profiles we optimized for were:

  1. A steady stream of around 100 records per second.
  2. A bursty stream peaking around 40,000 records per second, with a minimum near 5,000 records per second.

Kafka settings

For our integration of Kafka, we needed to tinker with quite a few settings. The notable Kafka Streams settings we tuned are:

  • num.stream.threads
  • commit.interval.ms
  • enable.auto.commit
  • poll.ms
  • fetch.max.wait.ms
  • fetch.min.bytes
  • max.poll.records
  • max.partition.fetch.bytes
  • fetch.max.bytes
  • linger.ms
  • Custom RocksDB config - using the block cache decreased processor time.

Let’s go through each of these settings in detail.

1. StreamsConfig.NUM_STREAM_THREADS_CONFIG (num.stream.threads)

As the name suggests, num.stream.threads sets the number of stream threads (Java threads) that drive the processing of records for stream tasks.

Stream threads handle the creation of stream tasks (one per input topic partition), their partition assignment (standby and active), and everything else the tasks need in order to process records concurrently.

The Kafka Streams architecture documentation is a good reference on stream threads.

As long as there are cores (including hyperthreaded ones) available to the Kafka Streams application, you can bump up the number of stream threads for processing. However, the threads will sit idle if:

  1. the number of partitions isn’t high enough OR
  2. some partitions don’t get data at all
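
For reference, the thread count is just a Streams config entry on the properties used to build the application; the value below is illustrative, not a recommendation:

Properties props = new Properties();
// Start with roughly one stream thread per available core and adjust from there.
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);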

2. StreamsConfig.COMMIT_INTERVAL_MS_CONFIG (commit.interval.ms)

The commit.interval.ms setting controls how often the processor's position (offset) is saved to Kafka's offset management system.

Note that this is not the same as producing records to the app's other Kafka topics.

If processing.guarantee is set to exactly_once, the default value is 100 ms; otherwise the default is 30,000 ms. This setting only applies when enable.auto.commit is set to false, which Kafka Streams enforces for its internal consumers.

We had a few topics to store metrics and their rolled up aggregates while expecting exactly_once processing, so we experimented with values between 1 and 20 seconds to avoid burdening the brokers with these offset status updates.

How far you push this value depends on how risk-averse you are: a longer commit interval means more records may be reprocessed after a failure, so increase it only if your system can tolerate (or deduplicate) double processing, and keep it short if you want to minimize reprocessing.
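
As an example of how the two settings discussed above fit together on the same Properties object shown earlier, the 10-second value below is illustrative and sits inside the 1 to 20 second range we experimented with:

// Exactly-once processing with a commit interval raised well above the 100 ms exactly-once default.
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10_000);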

3. StreamsConfig.POLL_MS_CONFIG (poll.ms)

The poll.ms setting is the amount of time we’ll block while waiting on data from the brokers. We have to balance two goals with this setting – we don’t want the blocking time to be too long, but we also need to fetch sufficient data on each poll.

The latter is governed by the consumer fetch settings covered below – how many bytes and records each fetch returns.

Essentially, poll.ms prevents a stream thread from busy-waiting when the broker has no data for its partitions.

We settled on a value of 50ms for this setting.
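
Since poll.ms is a Streams-level setting (not a consumer setting), it goes directly on the Streams properties; the line below just reflects the value we settled on:

// Block for at most 50 ms when waiting for new records.
props.put(StreamsConfig.POLL_MS_CONFIG, 50L);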

4. ConsumerConfig.FETCH_MIN_BYTES_CONFIG (fetch.min.bytes)

Raising fetch.min.bytes from its default (1) is a good idea if we can afford to wait a few milliseconds between fetches – for example, while our tasks are still processing the data they just fetched.

We don’t want the fetches to be too frequent if there is more data that can be fetched. In that case, we can wait for our records to be processed.

Otherwise, leave this setting as default if you are hoping to work in near real-time and want to fetch even a single new byte of data right away.

For our testbed A (100 records per second), this setting worked well between values of 1000 and 10000 bytes, but in testbed B (40k records per second), the app was more performant with the default value.

5. ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG (fetch.max.wait.ms)

We had a similar experience with FETCH_MAX_WAIT_MS_CONFIG as we did with FETCH_MIN_BYTES –  somewhere between 1 second and 5 seconds gave us satisfactory results on testbed A, but we stuck to using the default of 500ms on testbed B.
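
Because these are consumer-level settings, they are passed to the embedded consumers through the consumer prefix. The values below fall inside the ranges that worked on testbed A and are only illustrative:

// The "consumer." prefix routes these to the consumers created by Kafka Streams.
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.FETCH_MIN_BYTES_CONFIG), 10_000);
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG), 2_000);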

6. ConsumerConfig.MAX_POLL_RECORDS_CONFIG (max.poll.records)

The max.poll.records setting is useful in cases where we are trying to decrease offset lag quickly. It dictates how many records a single poll returns; if the broker fetch brings back more than the specified number of records, the extras are kept in an internal buffer so no additional RPCs (requests) are needed until the already-fetched records are processed.

MAX_POLL_RECORDS essentially lets Kafka Streams know that an application will process the specified number of records before calling poll() again. We didn’t see any marked improvements in our consumer throughput when tinkering with this setting, so we didn’t end up changing it.

7. ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG (max.partition.fetch.bytes) and ConsumerConfig.FETCH_MAX_BYTES_CONFIG (fetch.max.bytes)

We did not see any marked improvement in our consumer throughput when tweaking either max.partition.fetch.bytes or fetch.max.bytes.

8. ProducerConfig.LINGER_MS_CONFIG (linger.ms)

The linger.ms setting allows us to buffer records in the application before sending them to the Kafka brokers. While the default value is 0 (send records to the broker as soon as you hit send), we cannot discount the load that this places on the brokers when producing to many topics simultaneously.

Hence, increasing linger.ms to batch records before sending is a better way to safeguard the brokers from an accidental flood.

Though the advised linger value is between 5 and 10ms, for our testbed B (the 40k records per second testbed), we were able to increase this to 1000ms without causing any harm to our bottom line (measured as latency of results on the Kafka topic). Based on how aggressive you plan to be with your services end-to-end, you can optimize this value to batch things on the producer side.
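
Being a producer-level setting, linger.ms is passed through the producer prefix; 1000 ms is the value that worked for us on testbed B:

// The "producer." prefix routes this to the producers created by Kafka Streams.
props.put(StreamsConfig.producerPrefix(ProducerConfig.LINGER_MS_CONFIG), 1_000);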

9. Custom RocksDB configuration

Our application uses six different state stores. Facebook has plenty of documentation around the settings that can be changed for RocksDB, so we’ll concentrate on the areas where we made the most optimizations.

We were more focused on the READ side of things and hence optimized for reads happening at each stage of RocksDB.

RocksDB memory side:

  1. MemTable (the in-memory write buffer)
  2. BloomFilter

RocksDB disk side:

  1. Index
  2. SSTable
  3. BlockCache (we ended up adding this configuration)

You can set a custom RocksDB config by pointing the rocksdb.config.setter setting at your config class:

config.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, AlerterRocksDBConfig.class);

We use the BlockCache instead of the default FileCache – in cases where the storage attached to an instance is remote, it’s better to have a secondary cache that does not go outside the box and instead uses local DRAM and non-volatile disk, since the data being cached is significantly larger than the DRAM on the instance.

BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig();
tableConfig.setBlockCache(new org.rocksdb.LRUCache(16 * 1024L * 1024L));
tableConfig.setBlockSize(16 * 1024L);

The lines above tune the cache size and the block size; set them based on the instance type you have and how much memory you can spare, so that RocksDB never exerts more memory pressure than the instance can handle.

tableConfig.setPinL0FilterAndIndexBlocksInCache(true);
tableConfig.setCacheIndexAndFilterBlocks(false);

The two settings above may look contradictory, but we do not want the BlockCache to store every index and filter block (bloom filters in our case); instead, we only pin the level-0 (L0) filter and index blocks in the cache, which is what we ended up doing.

tableConfig.setFilter(new BloomFilter(10));

If you're doing key-value lookups, it’s a good idea to set a bloom filter; 10 is the typical number of bits used to represent each key.

tableConfig.setIndexType(IndexType.kHashSearch);

For key-value lookups, a good optimization is to use hash search instead of the default binary search for the index. The hash lookup gets the first stage of the search closer to the result; if several keys share the same prefix, RocksDB then uses binary search within that prefix to narrow down to the result.

We set the above table config in the RocksDB options.

options.setTableFormatConfig(tableConfig);

Other RocksDB options to set are:

options.useCappedPrefixExtractor(4);

In the above setting, the prefix is capped at 4 bytes (4 x 8 = 32 bits), which means we can support up to 2^32 distinct prefix combinations.

options.setMemtablePrefixBloomSizeRatio(0.25);

This line sets the prefix bloom filter size ratio for the MemTable. By default it is 0 (disabled), and we can enable it by setting any value between 0 and 0.25 (the maximum).

The above settings are not an exhaustive list. Based on the RocksDB documentation, you can tweak settings to suit your use case.
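
Pulled together, a config setter along these lines wires up the options discussed above. This is only a sketch of the shape of such a class – our actual AlerterRocksDBConfig contains more than this, and the sizes are illustrative:

public static class AlerterRocksDBConfig implements RocksDBConfigSetter {

    @Override
    public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
        BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig();

        // Block cache and block size, sized for the instance.
        tableConfig.setBlockCache(new org.rocksdb.LRUCache(16 * 1024L * 1024L));
        tableConfig.setBlockSize(16 * 1024L);

        // Keep index and filter blocks out of the block cache, but pin the L0 ones.
        tableConfig.setPinL0FilterAndIndexBlocksInCache(true);
        tableConfig.setCacheIndexAndFilterBlocks(false);

        // Bloom filter (10 bits per key) and a hash-based index for point lookups.
        tableConfig.setFilter(new BloomFilter(10));
        tableConfig.setIndexType(IndexType.kHashSearch);

        options.setTableFormatConfig(tableConfig);
        options.useCappedPrefixExtractor(4);
        options.setMemtablePrefixBloomSizeRatio(0.25);
    }

    @Override
    public void close(final String storeName, final Options options) {
        // Close any RocksDB objects created above (for example the cache) to avoid native memory leaks.
    }
}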

On the write side, how frequently you want compactions to happen and how big you want the write buffers to be are critical settings. You’ll need to adjust the size of SSTables and MemTables in your application to optimize for these.

Other Kafka Streams features

  • Punctuators
  • Number of partitions of input topic

1. Kafka Punctuators

Punctuators are a special kind of stream processor callback that can be scheduled at a user-defined interval based on either stream time or wall-clock time. When a punctuator runs, regular processing on that stream thread pauses so the punctuator can do its job on the current state of the application; processing resumes only after that punctuation run finishes.

This can make the punctuator a bottleneck, or a slowly ticking time bomb that goes off once a run has too many records to finish quickly.

Punctuators are useful as long as they finish quickly (similar to a quick snapshot) which is possible by making the number of records processed – read or written – in a single run deterministic or constant.

We made our Punctuator more performant by:

  1. Using Java multithreading for reading from statestores.
  2. Using the same multithreading paradigm to do quick evaluations in parallel.

You also need to make sure that long punctuator runs are flagged as violations, and that your application logic never lets a run grow long enough to make the application unresponsive.
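
For reference, this is roughly what scheduling a punctuator looks like with the Processor API. The store name, interval, and evaluation logic here are hypothetical placeholders:

public class AlertEvaluationProcessor implements Processor<String, Double, String, String> {

    private KeyValueStore<String, Double> metricStore;

    @Override
    public void init(final ProcessorContext<String, String> context) {
        this.metricStore = context.getStateStore("metric-store");   // hypothetical store name

        // Evaluate every 30 seconds of wall-clock time. Processing on this task pauses while
        // the punctuator body runs, so keep the amount of work per run small and bounded.
        context.schedule(Duration.ofSeconds(30), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
            try (KeyValueIterator<String, Double> it = metricStore.all()) {
                while (it.hasNext()) {
                    KeyValue<String, Double> entry = it.next();
                    // compare entry.value against the customer-defined threshold here
                }
            }
        });
    }

    @Override
    public void process(final Record<String, Double> record) {
        metricStore.put(record.key(), record.value());
    }
}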

2. Number of partitions of input topics

The number of partitions of a topic defines the level of parallelism we get when processing records of that topic. In Kafka Streams, it defines the number of tasks created from the application’s topology to process records.

To achieve optimal parallelism, we want to avoid data skew which can make some partitions (and hence, some stream tasks) hot while others remain idle. If you face hot spotting, think about why your data is biased towards a subset of partitions (due to your record key) and if you can use salting to distribute data evenly across partitions.
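
One simple form of salting, sketched below, appends a small random suffix to the record key so a hot key spreads across several partitions; note that downstream aggregation then has to merge the salted keys back together. The metrics stream and bucket count are hypothetical:

int saltBuckets = 8;
KStream<String, Double> salted = metrics.selectKey(
        (key, value) -> key + "#" + ThreadLocalRandom.current().nextInt(saltBuckets));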

Summary

Making Kafka Streams work at scale while alerting in near real-time gave us a few challenges around managing fetches and handling our statestores – and we learned quite a bit. Here are the high-level lessons we took from the process:

  1. RocksDB tuning - Range queries are good on window and session stores, but avoid point lookups on them as much as possible.

    Tune RocksDB with a custom config to allow bloom filters wherever possible along with BlockCache.
  2. Poll more records and fetch more bytes whenever possible. The critical balance to strike is to batch polls without polling too often when there isn’t enough data (this depends on your topic’s volume profile). If a good amount of data is guaranteed to be present on each poll, keep your poll interval short so broker fetches stay closer to real time.
  3. Batch operations as much as possible – by batching, we can linger on the producer side to send many records together.

    Punctuation also represents a form of batching for checking whether alert criteria are triggered.
  4. max.poll.records and the commit interval don’t go well together if the latter is too high. If you set a fairly high commit interval, it’s advisable to remove your max.poll.records override and let the default value (500) be used.
  5. The processing throughput of your application is directly proportional to the number of partitions of the input topic if your data is not heavily skewed towards some partitions (hot spotting).

Analytics and Insights at Twilio

Hopefully, this post about our experiments with Kafka Streams at Twilio helped you understand some of the tradeoffs therein – and when to tinker with your own Kafka settings. If you’re interested in solving these kinds of problems and wish to join us, here is the careers page at Twilio.

Your authors today:

The Enterprise Insights Team is a diverse group of engineers building solutions that provide insights on Twilio customer data, which teams can leverage for their business needs. We use technologies like Apache Kudu, Apache Calcite, Apache Kafka, Apache Spark, React.js, and Twilio Paste in our stack to provide insights in real time to our customers.