Kafka

A look at the Camel Kafka Consumer

In this post let’s look at the Camel Kafka component. Apache Camel is an implementation of the Enterprise Integration Patterns. The class we want to examine is KafkaConsumer:

 @Override
    protected void doStart() throws Exception {
        //...
        executor = endpoint.createExecutor();
        //...
        for (int i = 0; i < endpoint.getConfiguration().getConsumersCount(); i++) {
            KafkaFetchRecords task = new KafkaFetchRecords(topic, pattern, i + "", getProps());
            // pre-initialize task during startup so if there is any error we
            // have it thrown asap
            task.preInit();
            executor.submit(task);
            tasks.add(task);
        }
    }

The first thing to look at is the executor created by endpoint.createExecutor(). We see this method:

   public ExecutorService createExecutor() {
        return getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, "KafkaConsumer[" + configuration.getTopic() + "]", configuration.getConsumerStreams());
    }

This is a little bit confusing, since the consumerStreams property is actually the thread pool size. If we look at the docs we see this:

consumerStreams (consumer): Number of concurrent consumers on the consumer. Default: 10
consumerStreams property

Anyway, by default we start with a thread pool size of 10. Then we look at consumersCount:

consumersCount (consumer): The number of consumers that connect to the Kafka server. Default: 1
consumersCount property

and we create the corresponding consumer tasks:

    for (int i = 0; i < endpoint.getConfiguration().getConsumersCount(); i++) {
      KafkaFetchRecords task = new KafkaFetchRecords(topic, pattern, i + "", getProps());
      // pre-initialize task during startup so if there is any error we
      // have it thrown asap
      task.preInit();
      executor.submit(task);
      tasks.add(task);
    }
It’s important to remember the rule we must never break: one thread per consumer instance (KafkaConsumer is NOT thread safe). We see that this rule is respected here, in the task.preInit() method:

   protected void doInit() {
      // create consumer
      ClassLoader threadClassLoader = Thread.currentThread().getContextClassLoader();
      try {
        // Kafka uses reflection for loading authentication settings,
        // use its classloader
        Thread.currentThread()
                .setContextClassLoader(org.apache.kafka.clients.consumer.KafkaConsumer.class.getClassLoader());
        // this may throw an exception if something is wrong with kafka
        // consumer
        this.consumer = new org.apache.kafka.clients.consumer.KafkaConsumer(kafkaProps);
      } finally {
        Thread.currentThread().setContextClassLoader(threadClassLoader);
      }
    }
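To see why this rule matters outside of Camel, here is a rough sketch using the plain Kafka client (topic, group id and broker are made up): each worker creates and owns its own KafkaConsumer, just like each KafkaFetchRecords task does.

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class OneConsumerPerThread {
        public static void main(String[] args) {
            ExecutorService pool = Executors.newFixedThreadPool(3);
            for (int i = 0; i < 3; i++) {
                final String threadId = String.valueOf(i);
                pool.submit(() -> {
                    Properties props = new Properties();
                    props.put("bootstrap.servers", "localhost:9092");
                    props.put("group.id", "my-group");
                    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
                    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
                    // the consumer is created inside the task, so each thread owns its own instance
                    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                        consumer.subscribe(Collections.singletonList("my-topic"));
                        while (!Thread.currentThread().isInterrupted()) {
                            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                            for (ConsumerRecord<String, String> record : records) {
                                System.out.println(threadId + " got offset " + record.offset());
                            }
                        }
                    }
                });
            }
        }
    }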

Now, notice the class declaration

 class KafkaFetchRecords implements Runnable, ConsumerRebalanceListener {

which means each task is submitted to the executor in a fire-and-forget fashion: we don’t wait for a response, and this greatly improves throughput. We also see that the task reacts when a rebalance is triggered, since it implements ConsumerRebalanceListener.

Lines 254-304 of the source take care of subscribing to the topic and initializing the offset. Then the polling starts:

LOG.trace("Polling {} from topic: {} with timeout: {}", threadId, topicName, pollTimeoutMs);
ConsumerRecords<Object, Object> allRecords = consumer.poll(pollTimeoutMs);

We go through all partitions and start pulling records

 Iterator<ConsumerRecord<Object, Object>> recordIterator = allRecords.records(partition).iterator();

and create Exchange objects as we go

    Exchange exchange = endpoint.createKafkaExchange(record);

    propagateHeaders(record, exchange, endpoint.getConfiguration());

    // if not auto commit then we have additional
    // information on the exchange
    if (!isAutoCommitEnabled()) {
      exchange.getIn().setHeader(KafkaConstants.LAST_RECORD_BEFORE_COMMIT, !recordIterator.hasNext());
    }
    if (endpoint.getConfiguration().isAllowManualCommit()) {
      // allow Camel users to access the Kafka
      // consumer API to be able to do for example
      // manual commits
      KafkaManualCommit manual = endpoint.getComponent().getKafkaManualCommitFactory()
              .newInstance(exchange, consumer, topicName, threadId,
                      offsetRepository, partition, record.offset());
      exchange.getIn().setHeader(KafkaConstants.MANUAL_COMMIT, manual);
    }
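On the routing side, a route that actually uses that MANUAL_COMMIT header could look roughly like this (endpoint details are made up, and depending on the Camel version the commit method is commitSync() or commit()):

    import org.apache.camel.builder.RouteBuilder;
    import org.apache.camel.component.kafka.KafkaConstants;
    import org.apache.camel.component.kafka.KafkaManualCommit;

    public class ManualCommitRoute extends RouteBuilder {
        @Override
        public void configure() {
            // autoCommitEnable=false plus allowManualCommit=true puts the
            // KafkaManualCommit object on the exchange header
            from("kafka:my-topic?brokers=localhost:9092&groupId=my-group"
                    + "&autoCommitEnable=false&allowManualCommit=true")
                .process(exchange -> {
                    // ... process the record, then commit the offset ourselves
                    KafkaManualCommit manual = exchange.getIn()
                            .getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
                    manual.commitSync(); // commit() in recent Camel versions
                });
        }
    }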

Another interesting bit is what happens when we encounter an error:

 if (endpoint.getConfiguration().isBreakOnFirstError()) {
      // we are failing and we should break
      // out
      LOG.warn(
              "Error during processing {} from topic: {}. Will seek consumer to offset: {} and re-connect and start polling again.",
              exchange,
              topicName, partitionLastOffset);
      // force commit so we resume on next
      // poll where we failed
      commitOffset(offsetRepository, partition, partitionLastOffset, true);
      // continue to next partition
      breakOnErrorHit = true;
    }

The interesting property here is

breakOnFirstError (consumer): This option controls what happens when a consumer is processing an exchange and it fails. If the option is false, the consumer continues to the next message and processes it. If the option is true, the consumer breaks out and will seek back to the offset of the message that caused the failure, then re-attempt to process it. However, this can lead to endless processing of the same message if it is bound to fail every time, e.g. a poison message. Therefore it is recommended to deal with that, for example by using Camel’s error handler. Default: false
breakOnFirstError property
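One way to follow that recommendation is to combine breakOnFirstError=true with a Camel error handler; a rough sketch, with made-up endpoint names, so a poison message ends up parked instead of being re-polled forever:

    import org.apache.camel.builder.RouteBuilder;

    public class BreakOnFirstErrorRoute extends RouteBuilder {
        @Override
        public void configure() {
            // exchanges that keep failing are handed to a dead letter endpoint,
            // so breakOnFirstError does not keep seeking back to the same offset
            errorHandler(deadLetterChannel("kafka:my-topic-dlq?brokers=localhost:9092")
                    .maximumRedeliveries(3));

            from("kafka:my-topic?brokers=localhost:9092&groupId=my-group&breakOnFirstError=true")
                .to("bean:myService");
        }
    }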

Before the consumer unsubscribes, we make sure we commit the offsets:

   if (!reConnect) {
      if (isAutoCommitEnabled()) {
        if ("async".equals(endpoint.getConfiguration().getAutoCommitOnStop())) {
          LOG.info("Auto commitAsync on stop {} from topic {}", threadId, topicName);
          consumer.commitAsync();
        } else if ("sync".equals(endpoint.getConfiguration().getAutoCommitOnStop())) {
          LOG.info("Auto commitSync on stop {} from topic {}", threadId, topicName);
          consumer.commitSync();
        }
      }
    }
autoCommitOnStop (consumer): Whether to perform an explicit auto commit when the consumer stops, to ensure the broker has a commit from the last consumed message. This requires the option autoCommitEnable to be turned on. The possible values are sync, async, or none. Default: sync
autoCommitOnStop property
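For example, a route that switches the commit-on-stop behavior to async could look roughly like this (endpoint details are made up; autoCommitEnable is on by default):

    import org.apache.camel.builder.RouteBuilder;

    public class CommitOnStopRoute extends RouteBuilder {
        @Override
        public void configure() {
            // ask the consumer for an asynchronous commit of the last consumed
            // offsets when it stops, instead of the default synchronous one
            from("kafka:my-topic?brokers=localhost:9092&groupId=my-group&autoCommitOnStop=async")
                .to("log:received");
        }
    }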

In the rebalance listener, the only special thing happening comes from the fact that Camel offers the possibility to store offsets in different types of repositories, overriding the default (the __consumer_offsets topic). This only kicks in if we have defined a custom one.

 @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
      LOG.debug("onPartitionsRevoked: {} from topic {}", threadId, topicName);

      StateRepository<String, String> offsetRepository = endpoint.getConfiguration().getOffsetRepository();
      for (TopicPartition partition : partitions) {
        String offsetKey = serializeOffsetKey(partition);
        Long offset = lastProcessedOffset.get(offsetKey);
        if (offset == null) {
          offset = -1L;
        }
        LOG.debug("Saving offset repository state {} from offsetKey {} with offset: {}", threadId, offsetKey, offset);
        commitOffset(offsetRepository, partition, offset, true);
        lastProcessedOffset.remove(offsetKey);
      }
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
      LOG.debug("onPartitionsAssigned: {} from topic {}", threadId, topicName);

      StateRepository<String, String> offsetRepository = endpoint.getConfiguration().getOffsetRepository();
      if (offsetRepository != null) {
        for (TopicPartition partition : partitions) {
          String offsetState = offsetRepository.getState(serializeOffsetKey(partition));
          if (offsetState != null && !offsetState.isEmpty()) {
            // The state contains the last read offset so you need
            // to seek from the next one
            long offset = deserializeOffsetValue(offsetState) + 1;
            LOG.debug("Resuming partition {} from offset {} from state", partition.partition(), offset);
            consumer.seek(partition, offset);
          }
        }
      }
    }
offsetRepository (consumer): The offset repository to use in order to locally store the offset of each partition of the topic. Defining one will disable autocommit.
offsetRepository property
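To wrap up, a rough sketch of wiring a custom offset repository (this assumes Camel 3, where the registry supports bind(); the in-memory implementation below is only for illustration, a real one should be persistent):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    import org.apache.camel.builder.RouteBuilder;
    import org.apache.camel.impl.DefaultCamelContext;
    import org.apache.camel.spi.StateRepository;

    public class OffsetRepositoryExample {

        // naive in-memory StateRepository, only for illustration;
        // offsets stored here are lost on restart
        static class InMemoryStateRepository implements StateRepository<String, String> {
            private final Map<String, String> states = new ConcurrentHashMap<>();
            @Override public void setState(String key, String value) { states.put(key, value); }
            @Override public String getState(String key) { return states.get(key); }
            @Override public void start() { }
            @Override public void stop() { }
        }

        public static void main(String[] args) throws Exception {
            DefaultCamelContext context = new DefaultCamelContext();
            // bind the repository under the name referenced as #offsetRepo in the URI
            context.getRegistry().bind("offsetRepo", new InMemoryStateRepository());
            context.addRoutes(new RouteBuilder() {
                @Override
                public void configure() {
                    from("kafka:my-topic?brokers=localhost:9092&groupId=my-group&offsetRepository=#offsetRepo")
                        .to("log:received");
                }
            });
            context.start();
        }
    }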
