In this post we set up a multi-broker Kafka environment with a replication factor of 3 and a single partition. We can alter the partition count, and the change can be validated with the describe command.
Replication & broker failover: In Kafka, replication makes broker failover possible: if the broker acting as leader for a partition goes down, another in-sync replica is elected leader and clients keep working. We demonstrate failover later in this post; first, the commands below increase the partition count of our topic, and the describe output shows the leader for each partition.
[centos@host01 config]$ kafka-topics.sh --zookeeper localhost:2181 --alter --topic Multibroker-App-Devinline --partitions 2
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!
[centos@host01 config]$ kafka-topics.sh --describe --zookeeper localhost:2181 --topic Multibroker-App-Devinline
Topic:Multibroker-App-Devinline  PartitionCount:2  ReplicationFactor:3  Configs:
    Topic: Multibroker-App-Devinline  Partition: 0  Leader: 103  Replicas: 101,103,102  Isr: 103,102,101
    Topic: Multibroker-App-Devinline  Partition: 1  Leader: 102  Replicas: 102,103,101  Isr: 102,103,101
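The WARNING above matters because the producer's default partitioner maps each keyed message to a partition as a function of the key's hash and the current partition count, so increasing the partition count can move a key to a different partition. Below is a minimal sketch of that idea; note that Kafka's real default partitioner hashes the serialized key bytes with murmur2, while this sketch uses a simpler polynomial hash purely for illustration.

```java
import java.nio.charset.StandardCharsets;

public class PartitionerSketch {
    // Illustrative key -> partition mapping: hash the key bytes and take the
    // result modulo the partition count. (Kafka's default partitioner does the
    // same thing but with a murmur2 hash of the serialized key.)
    public static int partitionFor(String key, int numPartitions) {
        byte[] bytes = key.getBytes(StandardCharsets.UTF_8);
        int hash = 0;
        for (byte b : bytes) {
            hash = 31 * hash + b; // simple polynomial hash, NOT murmur2
        }
        return (hash & 0x7fffffff) % numPartitions; // force non-negative
    }

    public static void main(String[] args) {
        // With a single partition every key necessarily maps to partition 0.
        System.out.println(partitionFor("Key0", 1)); // prints 0
        // After the partition count grows, the same key can land elsewhere,
        // which is exactly what the CLI warning is about.
        System.out.println(partitionFor("Key0", 2)); // prints 1
    }
}
```

The takeaway: key-based ordering guarantees only hold while the partition count stays fixed.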
Kafka Producer (multi-broker Kafka single-node environment):
package com.devinline.kafkasamples;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaMultiBrokerProducer {
    public static void main(String[] args) {
        String topicName = "Multibroker-App-Devinline";
        Properties props = new Properties();
        // List both brokers so the client can bootstrap even if one is down
        props.put("bootstrap.servers", "localhost:9092,localhost:9093");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // acks=all: wait until all in-sync replicas acknowledge each write
        props.put("acks", "all");
        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        for (int i = 0; i < 5; i++) {
            String key = "Key" + i;
            String message = "Message from Kafka App Devinline " + i;
            producer.send(new ProducerRecord<String, String>(topicName, key, message));
        }
        System.out.println("Messages sent successfully");
        producer.close();
    }
}
Kafka Consumer (multi-broker Kafka single-node environment): The consumer below uses group.id = "mygroup". Create another copy of this program, say KafkaMultiBrokerConsumer2, so that two consumers can join the same group.
package com.devinline.kafkasamples;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class KafkaMultiBrokerConsumer1 {
    public static void main(String[] args) throws Exception {
        String topicName = "Multibroker-App-Devinline";
        String groupName = "mygroup";
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092,localhost:9093");
        // Consumers sharing the same group.id split the topic's partitions
        props.put("group.id", groupName);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = null;
        try {
            consumer = new KafkaConsumer<String, String>(props);
            consumer.subscribe(Arrays.asList(topicName));
            while (true) {
                // Poll with a 100 ms timeout; returns whatever records were fetched
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Message received -> partition = %d, offset = %d, key = %s, value = %s\n",
                            record.partition(), record.offset(), record.key(), record.value());
                }
            }
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            if (consumer != null) {
                consumer.close();
            }
        }
    }
}
Run two instances of the above consumer, then run the producer program. The messages produced are split between the two consumers, one reading partition 0 and the other partition 1: because both consumer instances share the same group.id, Kafka assigns each partition to exactly one consumer in the group.
The sample output below shows that messages 0 and 4 are consumed by one consumer from partition 1, while messages 1, 2 and 3 are consumed by the other consumer from partition 0.
Output of consumer-1:
481 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.9.0.0
481 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : fc7243c2af4b2b4a
Message received -> partition = 0, offset = 20, key = Key1, value = Message from Kafka App Devinline 1
Message received -> partition = 0, offset = 21, key = Key2, value = Message from Kafka App Devinline 2
Message received -> partition = 0, offset = 22, key = Key3, value = Message from Kafka App Devinline 3
Output of consumer-2:
288 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.9.0.0
288 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : fc7243c2af4b2b4a
27607 [main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Attempt to heart beat failed since the group is rebalancing, try to re-join group.
Message received -> partition = 1, offset = 2, key = Key0, value = Message from Kafka App Devinline 0
Message received -> partition = 1, offset = 3, key = Key4, value = Message from Kafka App Devinline 4
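The split seen in the outputs above follows from the one-partition-one-consumer rule inside a consumer group. A hedged sketch of a round-robin style assignment is shown below; the real assignment strategy is performed by the broker-side coordinator and is configurable in the client, so this only illustrates the rule, not Kafka's exact algorithm.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class GroupAssignmentSketch {
    // Assign each partition to exactly one consumer, cycling through the
    // consumers in order (round-robin style).
    public static Map<String, List<Integer>> assign(List<String> consumers, int numPartitions) {
        Map<String, List<Integer>> assignment = new HashMap<>();
        for (String c : consumers) {
            assignment.put(c, new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            String owner = consumers.get(p % consumers.size());
            assignment.get(owner).add(p);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Two consumers in group "mygroup", two partitions: one partition each.
        System.out.println(assign(List.of("consumer-1", "consumer-2"), 2));
    }
}
```

With two consumers and two partitions each consumer owns one partition; with a third consumer in the same group, one consumer would sit idle, since a partition is never shared within a group.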
Broker failover demo: the steps below show a leader broker going down and a new leader being elected.
- Create a topic Failsafe-Devinline-Topic with 1 partition and a replication factor of 3
[centos@host01 kafka]$ kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic Failsafe-Devinline-Topic
Created topic "Failsafe-Devinline-Topic".
- List all the topics and verify that the new topic is created
[centos@host01 kafka]$ kafka-topics.sh --list --zookeeper localhost:2181
Failsafe-Devinline-Topic
Multibroker-App
Multibroker-App-Devinline
__consumer_offsets
topic-devinline-1
- Open a new terminal and start a console producer for Failsafe-Devinline-Topic
[centos@host01 kafka]$ kafka-console-producer.sh --broker-list localhost:9091,localhost:9092 --topic Failsafe-Devinline-Topic
>Message-1
>Message-2
>Message-3
- Open a new terminal and start a console consumer for Failsafe-Devinline-Topic
[centos@host01 kafka]$ kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic Failsafe-Devinline-Topic --from-beginning
Message-1
Message-2
Message-3
- Describe the topic and verify that the output is similar to the one displayed below
[centos@host01 kafka]$ kafka-topics.sh --describe --zookeeper localhost:2181 --topic Failsafe-Devinline-Topic
Topic:Failsafe-Devinline-Topic  PartitionCount:1  ReplicationFactor:3  Configs:
    Topic: Failsafe-Devinline-Topic  Partition: 0  Leader: 101  Replicas: 101,102,103  Isr: 101,102,103
- Switch to the terminal where the leader broker (101) is running and shut it down with Ctrl + C
- Describe the topic again and verify the new leader election; the Isr (in-sync replica) list now has only two entries
[centos@host01 kafka]$ kafka-topics.sh --describe --zookeeper localhost:2181 --topic Failsafe-Devinline-Topic
Topic:Failsafe-Devinline-Topic  PartitionCount:1  ReplicationFactor:3  Configs:
    Topic: Failsafe-Devinline-Topic  Partition: 0  Leader: 102  Replicas: 101,102,103  Isr: 102,103
- Restart the stopped broker and describe the topic again. The restarted broker rejoins the Isr (in-sync replica) list, although broker 102 remains the leader
[centos@host01 kafka]$ kafka-topics.sh --describe --zookeeper localhost:2181 --topic Failsafe-Devinline-Topic
Topic:Failsafe-Devinline-Topic  PartitionCount:1  ReplicationFactor:3  Configs:
    Topic: Failsafe-Devinline-Topic  Partition: 0  Leader: 102  Replicas: 101,102,103  Isr: 102,103,101
Replication protects against Kafka broker failure: when the leader goes down, a new leader is elected from the available Isr (in-sync replica) list.
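The failover behaviour demonstrated above can be sketched as a simple election rule: when the leader dies, pick the first replica in the topic's replica list that is still in the Isr. The real controller logic inside the broker is more involved; this is only a hedged illustration, using the broker ids from the describe outputs above.

```java
import java.util.List;
import java.util.Set;

public class LeaderElectionSketch {
    // Return the first replica (in preferred order) that is still in sync,
    // or -1 if no in-sync replica is available.
    public static int electLeader(List<Integer> replicas, Set<Integer> isr) {
        for (int broker : replicas) {
            if (isr.contains(broker)) {
                return broker;
            }
        }
        return -1; // no eligible leader
    }

    public static void main(String[] args) {
        List<Integer> replicas = List.of(101, 102, 103);
        // Broker 101 (the old leader) has gone down, so it left the Isr.
        System.out.println(electLeader(replicas, Set.of(102, 103))); // prints 102
    }
}
```

This matches the describe output after the shutdown: with broker 101 out of the Isr, broker 102 (the next replica in the list) becomes leader, and it stays leader even after 101 rejoins the Isr.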