Start/Stop Kafka Consumers or subscribe to new topic Programmatically using Spring Kafka

Bikas Katwal
2 min readApr 1, 2019

Overview

Apache Kafka is a distributed, streaming platform.

In this post, I will share, how to start and stop a Kafka consumer using spring-kafka. You might want to have this feature to manage Kafka consumers life cycle or might want to start and stop the consumers using an API.
There could be a case where you want to stop all your streaming processes(Kafka consumers), before triggering your full batch job.

Setup

You just need the below dependency to start working with spring kafka:

<dependency> 
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId> <version>2.1.6.RELEASE</version>
</dependency>

You can achieve the same thing using native Kafka java client too. But, it is much easier to implement using spring-kafka, as you do not have to manage multiple threads to create consumers.

Implementation

We need to care of below things:

  • We need a HashMap which stores “topic name” as key and “ConcurrentMessageListenerContainer” as value. This HashMap is later required to refer consumers/listeners based on a topic name, to start/stop.

ConcurrentMessageListenerContainer Creates one or more kafka message listeners/consumers based on the configured number of consumers(concurrency)

  • We need to create custom message listener class that processes(may process and save to a database) topic messages. Message Listeners are of two types:
    (i) AcknowledgingMessageListener — need to acknowledge/commit read offsets manually.
    (ii) MessageListener — Auto commits offsets.

Use the object of AcknowledgingMessageListener when you want to commit offsets manually else go with normal MessageListener.

Below are the implementation of the two interfaces:
AcknowledgingMessageListener :

MessageListener:

With that let’s Look at the implementation of KafkaConsumerUtil.java, where we have two methods that will start and stop the consumer for a given topic name.

Follow comments in the code

Note: if enable.auto.commit property is true, pass an instance of CustomMessageListener else instance of AcknowledgingMessageListener to the method param “Object messageListener”

Stop method is pretty straight forward. Fetch the respective container from based on given topic name and then stop the container:

Testing and Usage

We will create a sample implementation of IMessageProcessor, which will just print the data in the console, ideally, you might want to process the data and save to multiple databases:

For testing, we will use Embedded Kafka provided by spring-kafka-test.
Snippet from Test Class:

In the above example, we have created 2 consumers for “test-topic” and started consumer with enable.auto.commit=false, which means, we need to pass an instance of CustomAckMessageListener to startOrCreateConsumers() method.

Conclusion

In this post, we talked about starting and stopping consumers programmatically and creating custom message listeners.
You can find the source code and the usage/setup details in the GitHub repo.

--

--

Bikas Katwal

Coder | Distributed Systems | Search | Software Engineer @Walmartlabs