In previous posts we set up a single-node, multi-broker Kafka cluster and wrote a Kafka producer and consumer in Java that produced and consumed messages with a single broker. In this post we will publish messages to multiple brokers and create a consumer that always reads from the beginning.
Prerequisites:
1. Zookeeper is up and running.
2. The Kafka server setup from the previous post (single node, multiple brokers) is up and running.
How to check whether Zookeeper and Kafka are running?
In this case, 3 Kafka instances with ids 101, 102 and 103 are running.
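One way to check this from Java is to probe the TCP ports the services listen on. The sketch below is only an illustration: the class name PortProbe and the method isListening are made up for this example, port 2181 is assumed to be Zookeeper's client port (its usual default), and 9091 to 9093 are the broker ports used in this post. An open port only shows that something is listening there, not that the broker is healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of Kafka: reports whether a TCP port accepts connections.
final class PortProbe {

    /** Returns true if something accepts a TCP connection on host:port within timeoutMs. */
    static boolean isListening(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Connection refused or timed out: nothing reachable on that port.
            return false;
        }
    }

    public static void main(String[] args) {
        // 2181 is an assumed Zookeeper port; 9091-9093 are the broker ports from this post.
        int[] ports = {2181, 9091, 9092, 9093};
        for (int port : ports) {
            String state = isListening("localhost", port, 1000) ? "up" : "down";
            System.out.println("localhost:" + port + " -> " + state);
        }
    }
}
```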
Create Kafka Topic: Create a topic "Multibroker-App-Devinline" with replication factor 3, as we have three brokers up and running.
Kafka Producer: The sample below publishes messages to the topic "Multibroker-App-Devinline", which has replication factor 3, through the brokers localhost:9091, localhost:9092 and localhost:9093.
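A minimal sketch of such a producer could look like the following. It is not the original sample: the class name MultiBrokerProducerSketch, the acks=all setting and the loop count are assumptions, while the topic name, broker addresses and the key/value pattern are taken from this post. It needs the kafka-clients library on the classpath and the three brokers running.

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Hypothetical class name; a sketch, not the original sample from this post.
final class MultiBrokerProducerSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        // All three brokers are listed so the client can bootstrap even if one is down.
        props.put("bootstrap.servers", "localhost:9091,localhost:9092,localhost:9093");
        // Assumption: wait for all in-sync replicas to acknowledge each record.
        props.put("acks", "all");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // try-with-resources closes the producer, which also flushes pending records.
        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 5; i++) {
                producer.send(new ProducerRecord<>(
                        "Multibroker-App-Devinline",
                        "Key" + i,
                        "Message from Kafka App Devinline " + i));
            }
        }
    }
}
```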
Kafka Seeking Consumer: Generally a Kafka consumer simply consumes records from beginning to end, periodically committing its position (either automatically or manually). However, Kafka allows the consumer to manually control its position, moving forward or backward in a partition at will.
The sample code below creates a seeking consumer that starts reading from the nth message available in the topic. We have to provide an implementation of the method onPartitionsAssigned() of the interface ConsumerRebalanceListener.
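The following is a hedged sketch of a seeking consumer, not the original sample. The class name SeekingConsumerSketch, the value chosen for POSITION and the enable.auto.commit=false setting are assumptions. The group id and broker addresses are the ones visible in the sample output of this post. It assumes a kafka-clients version that offers poll(Duration) (older clients take a timeout in milliseconds as a long instead) and it needs running brokers.

```java
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

// Hypothetical class name; a sketch, not the original sample from this post.
final class SeekingConsumerSketch {

    // Assumed value: offset to start from (offset 5 is the 6th message).
    private static final long POSITION = 5L;

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092,localhost:9093");
        props.put("group.id", "mygroup");
        // Assumption: no automatic offset commits, since we set the position ourselves.
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("Multibroker-App-Devinline"),
                    new ConsumerRebalanceListener() {
                        @Override
                        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                            // Nothing to clean up in this sketch.
                        }

                        @Override
                        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                            // Move every newly assigned partition to the chosen offset.
                            for (TopicPartition partition : partitions) {
                                consumer.seek(partition, POSITION);
                            }
                        }
                    });

            // Poll forever; stop the program with Ctrl+C.
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Message received -> offset = %d, key = %s, value = %s%n",
                            record.offset(), record.key(), record.value());
                }
            }
        }
    }
}
```

The seek is done inside onPartitionsAssigned() because the consumer only knows its partitions after the group assignment has happened, which takes place during poll().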
Note: If we change the POSITION value in the above sample, this consumer will start reading messages from that position.
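To see why a given POSITION corresponds to a particular message, here is a toy in-memory model of a single partition. It does not use the Kafka API at all. SeekModel and readFrom are made-up names, and clamping the position to the log size is a simplification of this model only. Offsets are zero-based, so position 5 starts at the 6th record.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of one partition; illustrates offset arithmetic only, not Kafka itself.
final class SeekModel {

    /** Returns the records a reader would see after seeking to the given offset. */
    static List<String> readFrom(List<String> partitionLog, long position) {
        // Offsets are zero-based indexes into the log, so position p skips the
        // first p records. Clamping to [0, size] is a simplification of this model.
        int start = (int) Math.max(0, Math.min(position, partitionLog.size()));
        return new ArrayList<>(partitionLog.subList(start, partitionLog.size()));
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        for (int i = 1; i <= 7; i++) {
            log.add(String.valueOf(i)); // values "1".."7" stored at offsets 0..6
        }
        long position = 5; // hypothetical POSITION value
        List<String> records = readFrom(log, position);
        for (int i = 0; i < records.size(); i++) {
            System.out.println("offset = " + (position + i) + ", value = " + records.get(i));
        }
        // prints:
        // offset = 5, value = 6
        // offset = 6, value = 7
    }
}
```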
Sample Output (SeekingConsumer): Consumption starts from the 6th message.
29 [main] INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values:
metric.reporters = [] = 300000
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer = mygroup
partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor] = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
max.partition.fetch.bytes = 1048576
bootstrap.servers = [localhost:9092, localhost:9093]
fetch.min.bytes = 1024
send.buffer.bytes = 131072
auto.offset.reset = latest
460 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version :
460 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : fc7243c2af4b2b4a
Message received -> offset = 5, key = null, value = 6
Message received -> offset = 6, key = null, value = 7
Message received -> offset = 12, key = Key0, value = Message from Kafka App Devinline 0
Message received -> offset = 13, key = Key1, value = Message from Kafka App Devinline 1
Message received -> offset = 14, key = Key2, value = Message from Kafka App Devinline 2
Message received -> offset = 15, key = Key3, value = Message from Kafka App Devinline 3
Message received -> offset = 16, key = Key4, value = Message from Kafka App Devinline 4
Let's start a consumer from the terminal and validate the messages in the topic "Multibroker-App-Devinline".
