In the previous post we set up a single-node Kafka cluster (single and multi-broker) and performed basic Kafka operations. In this post we will write a Java producer and consumer and use them to produce and consume messages. We will also look at two ways of using the producer (synchronous and asynchronous), which determine how messages are published to a Kafka topic and whether the send call blocks.
Prerequisites:
1. ZooKeeper is up and running.
2. The Kafka server set up in the previous post (single node, single broker) is up and running.
How to check whether ZooKeeper is running:
    [centos@host01 ~]$ zkServer.sh status
    ZooKeeper JMX enabled by default
    Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
    Mode: standalone
How to check whether Kafka is running:
    [centos@host01 ~]$ echo dump | nc localhost 2181 | grep brokers
    /brokers/ids/101
Note: The jps command can also be used to verify that both Kafka and ZooKeeper are running.
    [centos@host01 ~]$ jps
    15204 org.eclipse.equinox.launcher_1.3.200.v20160318-1642.jar
    29749 Jps
    24219 QuorumPeerMain
    29003 Kafka
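The jps check can also be scripted. The following is a minimal sketch, assuming the jps output has already been captured as a string; the class and method names (JpsCheck, processNames, kafkaAndZookeeperUp) are hypothetical and not part of Kafka. It only looks for the two process names shown in the output above, "Kafka" and "QuorumPeerMain".

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical helper: inspects captured `jps` output for the two expected processes. */
final class JpsCheck {

    /** Returns the process names (second column) found in the given jps output. */
    static Set<String> processNames(String jpsOutput) {
        Set<String> names = new HashSet<>();
        for (String line : jpsOutput.split("\\R")) {
            // Each jps line looks like "<pid> <name>"; skip anything else.
            String[] parts = line.trim().split("\\s+", 2);
            if (parts.length == 2) {
                names.add(parts[1]);
            }
        }
        return names;
    }

    /** True when both the broker ("Kafka") and ZooKeeper ("QuorumPeerMain") are listed. */
    static boolean kafkaAndZookeeperUp(String jpsOutput) {
        Set<String> names = processNames(jpsOutput);
        return names.contains("Kafka") && names.contains("QuorumPeerMain");
    }

    public static void main(String[] args) {
        String sample = "15204 org.eclipse.equinox.launcher_1.3.200.v20160318-1642.jar\n"
                + "29749 Jps\n"
                + "24219 QuorumPeerMain\n"
                + "29003 Kafka\n";
        System.out.println("Kafka and ZooKeeper running: " + kafkaAndZookeeperUp(sample));
    }
}
```

A listed process only means the JVM is alive; it does not prove the broker is accepting connections, so the ZooKeeper dump check above remains the stronger test.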
For simplicity we will use the single-broker setup from the previous post: topic "topic-devinline-1" with one partition and a replication factor of one. Run the command below to describe the topic.
    [centos@host01 kafka]$ kafka-topics.sh --describe --zookeeper localhost:2181 --topic topic-devinline-1
    Topic:topic-devinline-1 PartitionCount:1 ReplicationFactor:1 Configs:
    Topic: topic-devinline-1 Partition: 0 Leader: 101 Replicas: 101 Isr: 101
Kafka Producer: The Java producer below creates messages and publishes them to the Kafka topic "topic-devinline-1".
- We use StringSerializer, the most commonly used serializer, for both key.serializer and value.serializer.
- The "acks" config controls the criteria under which requests are considered complete. The "all" setting we have specified blocks until the record is fully committed; it is the slowest but most durable setting.
- By default the send() method is asynchronous. When called, it adds the record to a buffer of pending record sends and returns immediately.
    package com.devinline.kafkasamples;

    /**
     * https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html
     */
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    /**
     * @author www.devinline.com (nikhil)
     */
    public class SimpleProducer {
        public static void main(String[] args) throws InterruptedException, ExecutionException {
            String topicName = "topic-devinline-1";
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9091");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("acks", "all");
            Producer<String, String> producer = new KafkaProducer<String, String>(props);
            for (int i = 0; i < 10; i++) {
                String key = "Key" + i;
                String message = "Message from Kafka-topic-devinline-" + i;
                /* send() is asynchronous and returns a Future for the record's metadata */
                Future<RecordMetadata> out = producer.send(new ProducerRecord<String, String>(topicName, key, message));
                /* get() blocks until the request completes; call it once and reuse the result */
                RecordMetadata metadata = out.get();
                String messageOut = " Topic: " + metadata.topic() + " " + " Partition: " + metadata.partition()
                        + " " + " Offset: " + metadata.offset() + " Message: " + message;
                System.out.println(messageOut);
            }
            producer.close();
            System.out.println("Message sent successfully");
        }
    }
Sample Output (Producer): When the producer client runs, it creates messages and publishes them to the topic, where a consumer can read them.
20 [main] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values:
compression.type = none
metric.reporters = []
metadata.max.age.ms = 300000
metadata.fetch.timeout.ms = 60000
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
bootstrap.servers = [localhost:9091]
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
buffer.memory = 33554432
timeout.ms = 30000
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.keystore.type = JKS
ssl.trustmanager.algorithm = PKIX
block.on.buffer.full = false
ssl.key.password = null
max.block.ms = 60000
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
ssl.truststore.password = null
max.in.flight.requests.per.connection = 5
metrics.num.samples = 2
client.id =
ssl.endpoint.identification.algorithm = null
ssl.protocol = TLS
request.timeout.ms = 30000
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
acks = all
batch.size = 16384
ssl.keystore.location = null
receive.buffer.bytes = 32768
ssl.cipher.suites = null
ssl.truststore.type = JKS
security.protocol = PLAINTEXT
retries = 0
max.request.size = 1048576
value.serializer = class org.apache.kafka.common.serialization.StringSerializer
ssl.truststore.location = null
ssl.keystore.password = null
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
send.buffer.bytes = 131072
linger.ms = 0
211 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.9.0.0
211 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : fc7243c2af4b2b4a
Topic: topic-devinline-1 Partition: 0 Offset: 43 Message: Message from Kafka-topic-devinline-0
Topic: topic-devinline-1 Partition: 0 Offset: 44 Message: Message from Kafka-topic-devinline-1
Topic: topic-devinline-1 Partition: 0 Offset: 45 Message: Message from Kafka-topic-devinline-2
Topic: topic-devinline-1 Partition: 0 Offset: 46 Message: Message from Kafka-topic-devinline-3
Topic: topic-devinline-1 Partition: 0 Offset: 47 Message: Message from Kafka-topic-devinline-4
Topic: topic-devinline-1 Partition: 0 Offset: 48 Message: Message from Kafka-topic-devinline-5
Topic: topic-devinline-1 Partition: 0 Offset: 49 Message: Message from Kafka-topic-devinline-6
Topic: topic-devinline-1 Partition: 0 Offset: 50 Message: Message from Kafka-topic-devinline-7
Topic: topic-devinline-1 Partition: 0 Offset: 51 Message: Message from Kafka-topic-devinline-8
Topic: topic-devinline-1 Partition: 0 Offset: 52 Message: Message from Kafka-topic-devinline-9
631 [main] INFO org.apache.kafka.clients.producer.KafkaProducer - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
Message sent successfully
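To make the role of key.serializer and value.serializer more concrete, here is a minimal sketch of what a string serializer and its matching deserializer do conceptually: turn a String into bytes for the wire and back again. It uses only the JDK, assumes UTF-8 (the default encoding of Kafka's StringSerializer), and the class name StringCodecSketch is hypothetical; it is not Kafka's own implementation.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical stand-in illustrating a String <-> byte[] round trip. */
final class StringCodecSketch {

    /** Producer side: encode the value as UTF-8 bytes (null stays null). */
    static byte[] serialize(String value) {
        return value == null ? null : value.getBytes(StandardCharsets.UTF_8);
    }

    /** Consumer side: decode UTF-8 bytes back into a String (null stays null). */
    static String deserialize(byte[] data) {
        return data == null ? null : new String(data, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] wire = serialize("Key0");
        System.out.println(wire.length + " bytes -> " + deserialize(wire));
    }
}
```

The producer and consumer must agree on the format: a key written with one serializer has to be read with the matching deserializer, otherwise the consumer will see garbage or fail to decode the record.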
Kafka Consumer:
    package com.devinline.kafkasamples;

    import java.util.Arrays;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    /**
     * @author www.devinline.com (nikhil)
     */
    public class SimpleConsumer {
        public static void main(String[] args) throws Exception {
            String topicName = "topic-devinline-1";
            String groupName = "mygroup";
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9091");
            props.put("group.id", groupName);
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            KafkaConsumer<String, String> consumer = null;
            try {
                consumer = new KafkaConsumer<String, String>(props);
                consumer.subscribe(Arrays.asList(topicName));
                /* Poll forever; each poll waits up to 100 ms for new records */
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(100);
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.printf("Message received -> partition = %d, offset = %d, key = %s, value = %s\n",
                                record.partition(), record.offset(), record.key(), record.value());
                    }
                }
            } catch (Exception ex) {
                ex.printStackTrace();
            } finally {
                /* consumer is still null if the constructor threw */
                if (consumer != null) {
                    consumer.close();
                }
            }
        }
    }
Sample Output (Consumer): When the consumer client runs, it consumes messages from the topic.
Message received -> partition = 0, offset = 43, key = Key0, value = Message from Kafka-topic-devinline-0
Message received -> partition = 0, offset = 44, key = Key1, value = Message from Kafka-topic-devinline-1
Message received -> partition = 0, offset = 45, key = Key2, value = Message from Kafka-topic-devinline-2
Message received -> partition = 0, offset = 46, key = Key3, value = Message from Kafka-topic-devinline-3
Message received -> partition = 0, offset = 47, key = Key4, value = Message from Kafka-topic-devinline-4
Message received -> partition = 0, offset = 48, key = Key5, value = Message from Kafka-topic-devinline-5
Message received -> partition = 0, offset = 49, key = Key6, value = Message from Kafka-topic-devinline-6
Message received -> partition = 0, offset = 50, key = Key7, value = Message from Kafka-topic-devinline-7
Message received -> partition = 0, offset = 51, key = Key8, value = Message from Kafka-topic-devinline-8
Message received -> partition = 0, offset = 52, key = Key9, value = Message from Kafka-topic-devinline-9
Send records synchronously with Kafka Producer (blocking call)
Kafka lets you send messages synchronously by calling get() on the result of send(). The send() call itself is asynchronous and returns a Future for the RecordMetadata that will be assigned to the record. Invoking get() on this Future blocks until the associated request completes, then returns the metadata for the record or throws any exception that occurred while sending it. The sample code below sends messages synchronously.
    package com.devinline.kafkasamples;

    /**
     * https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html
     */
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    /**
     * @author www.devinline.com (nikhil)
     */
    public class SyncKafkaProducer {
        public static void main(String... args) throws Exception {
            /* Message count comes from the first argument; defaults to 5 */
            if (args.length == 0) {
                doSyncProduce(5);
            } else {
                doSyncProduce(Integer.parseInt(args[0]));
            }
        }

        public static void doSyncProduce(int msgCount) {
            String topicName = "topic-devinline-1";
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9091");
            props.put("key.serializer", "org.apache.kafka.common.serialization.LongSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            long time = System.currentTimeMillis();
            Producer<Long, String> producer = new KafkaProducer<Long, String>(props);
            for (long i = time; i < time + msgCount; i++) {
                String message = "Message from Kafka-topic-devinline-" + i;
                final ProducerRecord<Long, String> record = new ProducerRecord<Long, String>(topicName, i, message);
                try {
                    /* send() returns a Future; get() blocks until the send completes */
                    RecordMetadata outMetadata = producer.send(record).get();
                    long elapsedTime = System.currentTimeMillis() - time;
                    System.out.printf("sent record(key=%s value=%s) " + "meta(partition=%d, offset=%d) time=%d\n",
                            record.key(), record.value(), outMetadata.partition(), outMetadata.offset(), elapsedTime);
                } catch (InterruptedException e) {
                    /* Restore the interrupt flag so callers can see the interruption */
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
            }
            producer.close();
            System.out.println("Message sent successfully");
        }
    }
Open a terminal and start a console consumer to read the messages published to topic "topic-devinline-1".
    [centos@host01 ~]$ kafka-console-consumer.sh --bootstrap-server localhost:9091 --topic topic-devinline-1 --from-beginning
    ......
    ......
    Message from Kafka-topic-devinline-7
    Message from Kafka-topic-devinline-8
    Message from Kafka-topic-devinline-9
    Message from Kafka-topic-devinline-1536140160571
    Message from Kafka-topic-devinline-1536140160572
    Message from Kafka-topic-devinline-1536140160573
    Message from Kafka-topic-devinline-1536140160574
    Message from Kafka-topic-devinline-1536140160575
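The blocking behaviour of get() comes from java.util.concurrent.Future, not from Kafka itself, so it can be explored without a broker. The sketch below is an illustration under that assumption: it uses a CompletableFuture<Long> as a stand-in for the Future<RecordMetadata> returned by send(), and the class and method names (BlockingGetSketch, await) are hypothetical. It also uses the timed variant get(timeout, unit), which the sample above does not, to show how a caller could avoid waiting indefinitely.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical demo of the three outcomes of a blocking get() on a Future. */
final class BlockingGetSketch {

    /** Waits up to timeoutMs for the (simulated) offset and reports the outcome as text. */
    static String await(Future<Long> pending, long timeoutMs) {
        try {
            // Blocks until the future completes or the timeout elapses.
            return "offset=" + pending.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            // The original failure is wrapped; unwrap it with getCause().
            return "failed: " + e.getCause().getMessage();
        } catch (TimeoutException e) {
            return "timed out";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Long> ok = CompletableFuture.completedFuture(43L);
        CompletableFuture<Long> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("broker unavailable"));
        CompletableFuture<Long> neverCompletes = new CompletableFuture<>();

        System.out.println(await(ok, 100));
        System.out.println(await(failed, 100));
        System.out.println(await(neverCompletes, 100));
    }
}
```

The same three branches apply when get() is called on the Future returned by the producer: a normal return, an ExecutionException wrapping the send failure, or (with the timed variant) a TimeoutException.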
Send records asynchronously with Kafka Producer (non-blocking call)
Kafka can also send messages asynchronously using a callback mechanism. Kafka defines a Callback interface for asynchronous operations, which lets your code run when the request completes. Its onCompletion(RecordMetadata metadata, Exception exception) method is called when the asynchronous operation finishes. The metadata is set (not null) if the operation succeeded, and the exception is set (not null) if the operation failed.
    package com.devinline.kafkasamples;

    /**
     * https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html
     */
    import java.util.Properties;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    /**
     * @author www.devinline.com (nikhil)
     */
    public class AsyncKafkaProducer {
        public static void main(String... args) throws Exception {
            doRunProducer(5);
        }

        static void doRunProducer(final int sendMessageCount) throws InterruptedException {
            String topicName = "topic-devinline-1";
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9091");
            props.put("key.serializer", "org.apache.kafka.common.serialization.LongSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            long time = System.currentTimeMillis();
            Producer<Long, String> producer = new KafkaProducer<Long, String>(props);
            /* One count per message; each callback counts down once */
            final CountDownLatch countDownLatch = new CountDownLatch(sendMessageCount);
            try {
                for (long index = time; index < time + sendMessageCount; index++) {
                    final ProducerRecord<Long, String> record = new ProducerRecord<Long, String>(topicName, index,
                            "Hello message " + index);
                    /* send() returns immediately; the callback runs when the request completes */
                    producer.send(record, (metadata, exception) -> {
                        long elapsedTime = System.currentTimeMillis() - time;
                        if (metadata != null) {
                            System.out.printf("sent record(key=%s value=%s) " + "meta(partition=%d, offset=%d) time=%d\n",
                                    record.key(), record.value(), metadata.partition(), metadata.offset(), elapsedTime);
                        } else {
                            exception.printStackTrace();
                        }
                        countDownLatch.countDown();
                    });
                }
                /* Wait up to 25 seconds for all callbacks to fire */
                countDownLatch.await(25, TimeUnit.SECONDS);
            } finally {
                producer.flush();
                producer.close();
            }
        }
    }
Sample output (Async producer): The messages are published to the topic and the callbacks are invoked asynchronously, which the elapsed-time values in the output reflect. The sample program above uses a CountDownLatch to send N messages and wait for all of them to complete.
38 [main] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values:
compression.type = none
metric.reporters = []
metadata.max.age.ms = 300000
metadata.fetch.timeout.ms = 60000
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
bootstrap.servers = [localhost:9091]
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
buffer.memory = 33554432
timeout.ms = 30000
key.serializer = class org.apache.kafka.common.serialization.LongSerializer
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.keystore.type = JKS
ssl.trustmanager.algorithm = PKIX
block.on.buffer.full = false
ssl.key.password = null
max.block.ms = 60000
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
ssl.truststore.password = null
max.in.flight.requests.per.connection = 5
metrics.num.samples = 2
client.id =
ssl.endpoint.identification.algorithm = null
ssl.protocol = TLS
request.timeout.ms = 30000
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
acks = 1
batch.size = 16384
ssl.keystore.location = null
receive.buffer.bytes = 32768
ssl.cipher.suites = null
ssl.truststore.type = JKS
security.protocol = PLAINTEXT
retries = 0
max.request.size = 1048576
value.serializer = class org.apache.kafka.common.serialization.StringSerializer
ssl.truststore.location = null
ssl.keystore.password = null
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
send.buffer.bytes = 131072
linger.ms = 0
413 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.9.0.0
413 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : fc7243c2af4b2b4a
sent record(key=1536142295179 value=Hello message 1536142295179) meta(partition=0, offset=213) time=788
sent record(key=1536142295180 value=Hello message 1536142295180) meta(partition=0, offset=214) time=790
sent record(key=1536142295181 value=Hello message 1536142295181) meta(partition=0, offset=215) time=790
sent record(key=1536142295182 value=Hello message 1536142295182) meta(partition=0, offset=216) time=790
sent record(key=1536142295183 value=Hello message 1536142295183) meta(partition=0, offset=217) time=790
695 [main] INFO org.apache.kafka.clients.producer.KafkaProducer - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
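The callback-plus-latch pattern can be tried without a broker. The following is a sketch under stated assumptions: SendCallback is a hypothetical stand-in that mirrors the shape of Kafka's Callback.onCompletion (one argument is null, the other is set), a single-thread executor plays the role of the producer's background I/O thread, and the failEvery parameter is invented to simulate failed sends. It also checks the boolean returned by await(), which is false when the timeout elapses before every callback has fired.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical demo: wait for N asynchronous completions with a CountDownLatch. */
final class CallbackLatchSketch {

    /** Stand-in for Kafka's Callback: offset is set on success, exception on failure. */
    interface SendCallback {
        void onCompletion(Long offset, Exception exception);
    }

    /** Simulates `count` async sends; returns {successes, failures}. */
    static int[] sendAll(int count, int failEvery) {
        ExecutorService ioThread = Executors.newSingleThreadExecutor();
        CountDownLatch latch = new CountDownLatch(count);
        AtomicInteger ok = new AtomicInteger();
        AtomicInteger failed = new AtomicInteger();

        SendCallback callback = (offset, exception) -> {
            if (exception == null) {
                ok.incrementAndGet();
            } else {
                failed.incrementAndGet();
            }
            latch.countDown(); // always count down, on success and on failure
        };

        try {
            for (int i = 1; i <= count; i++) {
                final int n = i;
                // The "send" returns immediately; completion happens on the I/O thread.
                ioThread.execute(() -> {
                    if (failEvery > 0 && n % failEvery == 0) {
                        callback.onCompletion(null, new RuntimeException("send " + n + " failed"));
                    } else {
                        callback.onCompletion((long) n, null);
                    }
                });
            }
            // await() returns false if the timeout elapsed before the count reached zero.
            if (!latch.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for callbacks");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            ioThread.shutdown();
        }
        return new int[] { ok.get(), failed.get() };
    }

    public static void main(String[] args) {
        int[] result = sendAll(6, 3);
        System.out.println("succeeded=" + result[0] + " failed=" + result[1]);
    }
}
```

Counting down in both branches matters: if the latch were only decremented on success, a single failed send would leave the main thread waiting for the full timeout.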