In previous posts we setup single node multi broker Kafka and did write Kafka producer & consumer in java for producing message and consuming with single broker. In this post we will post message to multi broker and create a consumer which reads always from beginning.
Prerequisite:
1. Zookeeper is up and running.
2. Kafka server setup in previous post - Single node and multiple broker is up and running.
How to check Zookeeper and Kafka is running or not ?
In this case 3 instance of Kafka with id 101,102 and 103 is running.
Create Kafka Topic: Create a topic "Multibroker-App-Devinline" with replication factor 3 as we three broker up and running.
Kafka Producer: Below sample publish message to topic "Multibroker-App-Devinline" with replication 3 (localhost:9091, localhost:9092, localhost:9093).
Kafka Seeking Consumer : Generally Kafka Consumer will simply consume records from beginning to end, periodically committing its position (either automatically or manually). However Kafka allows the consumer to manually control its position, moving forward or backwards in a partition at will.
Below sample code creates a seeking consumer which starts reading from nth message available in topic. We have to provide implementation for method onPartitionsAssigned () of class ConsumerRebalanceListener.
Note: If we change POSITION value in above sample consumer.seek(partition,<POSITION>), this consumer will read message from that position.
Sample Output (SeekingConsumer): Start consuming from 6th Message.
29 [main] INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values:
metric.reporters = []
metadata.max.age.ms = 300000
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
group.id = mygroup
partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
max.partition.fetch.bytes = 1048576
bootstrap.servers = [localhost:9092, localhost:9093]
....
....
fetch.min.bytes = 1024
send.buffer.bytes = 131072
auto.offset.reset = latest
460 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.9.0.0
460 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : fc7243c2af4b2b4a
Message received -> offset = 5, key = null, value = 6
Message received -> offset = 6, key = null, value = 7
Message received -> offset = 12, key = Key0, value = Message from Kafka App Devinline 0
Message received -> offset = 13, key = Key1, value = Message from Kafka App Devinline 1
Message received -> offset = 14, key = Key2, value = Message from Kafka App Devinline 2
Message received -> offset = 15, key = Key3, value = Message from Kafka App Devinline 3
Message received -> offset = 16, key = Key4, value = Message from Kafka App Devinline 4
Lets start consumer from terminal and validate the messages from topic "Multibroker-App-Devinline".
Reference Kafka Consumer : https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html
Prerequisite:
1. Zookeeper is up and running.
2. Kafka server setup in previous post - Single node and multiple broker is up and running.
[centos@host01 kafka]$ bin/kafka-server-start.sh config/server-1.properties [centos@host01 ~]$ bin/kafka-server-start.sh config/server-2.properties [centos@host01 kafka]$ bin/kafka-server-start.sh config/server-3.properties
How to check Zookeeper and Kafka is running or not ?
In this case 3 instance of Kafka with id 101,102 and 103 is running.
[centos@host01 kafka]$ jps 11859 Kafka 15204 org.eclipse.equinox.launcher_1.3.200.v20160318-1642.jar 10502 QuorumPeerMain 11543 Kafka 12200 Jps 11211 Kafka [centos@host01 kafka]$ echo dump | nc localhost 2181 | grep brokers /brokers/ids/103 /brokers/ids/102 /brokers/ids/101
Create Kafka Topic: Create a topic "Multibroker-App-Devinline" with replication factor 3 as we three broker up and running.
[centos@host01 ~]$ kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 -partitions 1 --topic Multibroker-App-Devinline Created topic "Multibroker-App-Devinline".
Kafka Producer: Below sample publish message to topic "Multibroker-App-Devinline" with replication 3 (localhost:9091, localhost:9092, localhost:9093).
package com.devinline.kafkasamples; import java.util.Properties; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; public class KafkaMultiBrokerProducer { public static void main(String[] args) { String topicName = "Multibroker-App-Devinline"; Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092,localhost:9093"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("acks", "all"); Producer<String, String> producer = new KafkaProducer<String, String>(props); for (int i = 0; i < 5; i++) { String key = "Key" + i; String message = "Message from Kafka App Devinline " + i; producer.send(new ProducerRecord<String, String>(topicName, key, message)); } System.out.println("Message sent successfully"); producer.close(); } }
Kafka Seeking Consumer : Generally Kafka Consumer will simply consume records from beginning to end, periodically committing its position (either automatically or manually). However Kafka allows the consumer to manually control its position, moving forward or backwards in a partition at will.
Below sample code creates a seeking consumer which starts reading from nth message available in topic. We have to provide implementation for method onPartitionsAssigned () of class ConsumerRebalanceListener.
package com.devinline.kafkasamples; /** * @author www.devinline.com (nikhil) * */ import java.util.Arrays; import java.util.Collection; import java.util.Properties; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.TopicPartition; public class KafkaSeekingConsumer { public static void main(String[] args) throws Exception { String topicName = "Multibroker-App-Devinline"; String groupName = "mygroup"; Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092,localhost:9093"); props.put("group.id", groupName); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); final KafkaConsumer<String, String> consumer; try { consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList(topicName), new ConsumerRebalanceListener() { @Override public void onPartitionsRevoked(Collection<TopicPartition> partitions) { } @Override public void onPartitionsAssigned(Collection<TopicPartition> partitions) { for (TopicPartition partition : partitions) { consumer.seek(partition, 5); } } }); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { System.out.printf("Message received -> offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value()); } } } catch (Exception ex) { ex.printStackTrace(); } } }
Note: If we change POSITION value in above sample consumer.seek(partition,<POSITION>), this consumer will read message from that position.
Sample Output (SeekingConsumer): Start consuming from 6th Message.
29 [main] INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values:
metric.reporters = []
metadata.max.age.ms = 300000
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
group.id = mygroup
partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
max.partition.fetch.bytes = 1048576
bootstrap.servers = [localhost:9092, localhost:9093]
....
....
fetch.min.bytes = 1024
send.buffer.bytes = 131072
auto.offset.reset = latest
460 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.9.0.0
460 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : fc7243c2af4b2b4a
Message received -> offset = 5, key = null, value = 6
Message received -> offset = 6, key = null, value = 7
Message received -> offset = 12, key = Key0, value = Message from Kafka App Devinline 0
Message received -> offset = 13, key = Key1, value = Message from Kafka App Devinline 1
Message received -> offset = 14, key = Key2, value = Message from Kafka App Devinline 2
Message received -> offset = 15, key = Key3, value = Message from Kafka App Devinline 3
Message received -> offset = 16, key = Key4, value = Message from Kafka App Devinline 4
[centos@host01 ~]$ kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic Multibroker-App-Devinline --from-beginning 1 2 3 4 5 6 7 Message from Kafka App Devinline 0 Message from Kafka App Devinline 1 Message from Kafka App Devinline 2 Message from Kafka App Devinline 3 Message from Kafka App Devinline 4
Reference Kafka Consumer : https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html
Aivivu chuyên vé máy bay, tham khảo
ReplyDeletevé máy bay đi Mỹ giá rẻ
có chuyến bay từ mỹ về việt nam chưa
vé máy bay về việt nam từ nhật
giá vé máy bay từ đức về việt nam
vé máy bay từ canada về việt nam giá rẻ
giá vé máy bay hàn quốc về việt nam
khách sạn cách ly ở việt nam