Kafka Streams is a client library for building applications and microservices where the input and output data are stored in Kafka clusters. Earlier we set up a multi-broker Kafka cluster configuration and performed basic Kafka producer/consumer operations. In this post we use that multi-broker Kafka cluster and demonstrate Kafka Streams with a word count example.
Create input and output Kafka topics: Create the input and output topics, each with two partitions, which will be used to publish and consume messages.
[centos@host01 kafka]$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic word-count-input-topic
Created topic "word-count-input-topic".
[centos@host01 kafka]$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic word-count-output-topic
Created topic "word-count-output-topic".
Start Kafka: Start the two Kafka brokers on localhost, on ports 9091 and 9092.
[centos@host01 kafka]$ bin/kafka-server-start.sh config/server-1.properties
[centos@host01 kafka]$ bin/kafka-server-start.sh config/server-2.properties
Word count Kafka Streams application:
1. Create a Maven project and modify pom.xml with the following dependencies.

<dependencies>
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>0.11.0.0</version>
  </dependency>
  <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.7.5</version>
  </dependency>
  <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-simple</artifactId>
    <version>1.6.4</version>
  </dependency>
  <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.25</version>
  </dependency>
</dependencies>
<build>
  <plugins>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-compiler-plugin</artifactId>
      <version>3.5.1</version>
      <configuration>
        <source>1.8</source>
        <target>1.8</target>
      </configuration>
    </plugin>
  </plugins>
</build>

2. Create a file log4j.properties in the resources directory and add the following properties.
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%p %m (%c:%L) %n
3. Create a Java class with the following code. The class below listens to the input topic on the registered brokers (bootstrap servers) and publishes messages to the output topic. This Kafka Streams app processes each incoming text line, splits it on spaces, and keeps a running count of each word.
package com.devinline.kafkastream;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;

public class StreamStartupApp {
  public static void main(String[] args) {
    Properties props = new Properties();
    /* ApplicationId is specific to this application; it is used as the
     * consumer groupId and the default clientId prefix */
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-stream-wrod-count");
    /* List of IP address:port pairs to connect to */
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9091,127.0.0.1:9092");
    /* Start topic consumption from the beginning */
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    /* For serialization and deserialization of keys and values */
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

    KStreamBuilder builder = new KStreamBuilder();
    /* Stream from Kafka: name of the input topic */
    KStream<String, String> wordCountInputStream = builder.stream("word-count-input-topic");
    KTable<String, Long> wordCounts = wordCountInputStream
        /* Convert each text line to lowercase */
        .mapValues(textLineInput -> textLineInput.toLowerCase())
        /* Split each line on spaces into words */
        .flatMapValues(lowerCaseTextLine -> Arrays.asList(lowerCaseTextLine.split(" ")))
        /* Use the word itself as the key */
        .selectKey((ignoredKey, value) -> value)
        /* Default grouping using Serdes.String().getClass() */
        .groupByKey()
        /* Compute the word count, materialized in the "Count" store */
        .count("Count");
    /* Publish word counts to the output Kafka topic */
    wordCounts.to(Serdes.String(), Serdes.Long(), "word-count-output-topic");

    KafkaStreams kafkaStreams = new KafkaStreams(builder, props);
    kafkaStreams.start();
    /* Print topology details */
    System.out.println(kafkaStreams);
    /* Graceful shutdown of the app - add a shutdown hook */
    Runtime.getRuntime().addShutdownHook(new Thread(kafkaStreams::close));
  }
}
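To see what each record goes through, the transformation chain above can be sketched in plain Java with no Kafka dependency. This is only an illustrative stand-in for the mapValues/flatMapValues/groupByKey/count steps; the class and method names here are hypothetical:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class WordCountSketch {
    // Mirrors the Streams pipeline: lowercase -> split on spaces -> count per word.
    static Map<String, Long> process(List<String> textLines) {
        Map<String, Long> counts = new HashMap<>();
        for (String line : textLines) {
            for (String word : line.toLowerCase().split(" ")) {
                counts.merge(word, 1L, Long::sum); // increment the running count
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Long> counts =
            process(Arrays.asList("Kafka stream works", "Hello Kafka Stream"));
        System.out.println(counts.get("kafka"));  // 2
        System.out.println(counts.get("stream")); // 2
        System.out.println(counts.get("hello"));  // 1
    }
}
```

The real application does the same aggregation, but keyed and partitioned by word across the cluster, with the state held in the "Count" store.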
Start a Kafka consumer: This consumer reads messages from the output topic, where messages processed by the Kafka Streams Java client are published.
[centos@host01 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
> --topic word-count-output-topic \
> --from-beginning \
> --formatter kafka.tools.DefaultMessageFormatter \
> --property print.key=true \
> --property print.value=true \
> --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
> --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
[Image: WordCount App Eclipse Project Structure]
Start a producer and publish messages to the Kafka input topic:
[centos@host01 kafka]$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic word-count-input-topic
>Kafka stream works
>Hello Kafka Stream
>WOrks
[centos@host01 kafka]$ bin/kafka-console-producer.sh --broker-list localhost:9091 --topic word-count-input-topic
ERROR StatusLogger No log4j2 configuration file found. Using default configuration: logging only errors to the console.
>From producer-1 hello kafka
>Kafka stream
>
Kafka consumer message consumption: The consumer displays the word counts processed by the Java Kafka Streams application and published to the output topic. This is live stream processing.
[centos@host01 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
> --topic word-count-output-topic \
> --from-beginning \
> --formatter kafka.tools.DefaultMessageFormatter \
> --property print.key=true \
> --property print.value=true \
> --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
> --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
kafka 1
stream 1
works 1
kafka 2
hello 1
stream 2
works 2
from 1
kafka 3
producer-1 1
hello 2
kafka 4
stream 3
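Note that the same key appears repeatedly in the output ("kafka 1", then "kafka 2", and so on): the KTable emits an updated count every time a word recurs, and the consumer sees that stream of updates. A minimal plain-Java sketch of this changelog-style behavior (class and method names are hypothetical):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ChangelogSketch {
    // Emits one "(word, updatedCount)" record per incoming word, like the
    // update stream the console consumer prints above.
    static List<String> changelog(List<String> words) {
        Map<String, Long> counts = new HashMap<>();
        List<String> out = new ArrayList<>();
        for (String w : words) {
            long c = counts.merge(w, 1L, Long::sum); // new running count for w
            out.add(w + " " + c);                    // emit the updated value
        }
        return out;
    }

    public static void main(String[] args) {
        changelog(Arrays.asList("kafka", "stream", "works", "kafka"))
            .forEach(System.out::println);
        // kafka 1 / stream 1 / works 1 / kafka 2 - same shape as the consumer output
    }
}
```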
Kafka Streams application internal topics: Running a Kafka Streams application may create internal intermediary topics. Two types of internal topic are created, both managed and used by Kafka Streams. We should never consume from, delete, or publish messages to them; they are internal and do not exist for the outside world.
- Repartitioning topic: when the key of the stream is transformed, repartitioning happens at some processor.
- Changelog topic: on aggregation, Kafka Streams saves compacted data in this topic.
[centos@host01 kafka]$ kafka-topics.sh --list --zookeeper localhost:2181
__consumer_offsets
kafka-stream-wrod-count-Count-changelog
kafka-stream-wrod-count-Count-repartition
word-count-input
word-count-input-topic
word-count-output
word-count-output-topic
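The internal topic names in the listing above are derived from the application.id and the state store name, in the form application.id-storeName-repartition and application.id-storeName-changelog. A small sketch (hypothetical helper class) showing how the names from our example come about:

```java
public class InternalTopicNames {
    // Internal topic names are derived from application.id and the store name:
    //   <application.id>-<storeName>-repartition
    //   <application.id>-<storeName>-changelog
    static String repartition(String appId, String store) {
        return appId + "-" + store + "-repartition";
    }

    static String changelog(String appId, String store) {
        return appId + "-" + store + "-changelog";
    }

    public static void main(String[] args) {
        // application.id from the example app, store "Count" from count("Count")
        System.out.println(repartition("kafka-stream-wrod-count", "Count"));
        // kafka-stream-wrod-count-Count-repartition
        System.out.println(changelog("kafka-stream-wrod-count", "Count"));
        // kafka-stream-wrod-count-Count-changelog
    }
}
```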
Scaling the Kafka Streams application: The input topic we created earlier has two partitions, so we can spin up to two instances of the Kafka Streams application. Importantly, a Kafka Streams application relies on the Kafka consumer, and we can add multiple consumers to a consumer group. We will demonstrate this by launching two instances of the WordCount app from the jar.
- Export a runnable jar with dependencies from Eclipse, say KSWordCountApp.jar.
- Spin up two instances of the word count app. Run the command below in two different terminals.
[centos@host01 Desktop]$ java -jar KSWordCountApp-1.jar
- Observe the log after starting instance-1, and how it changes after starting instance-2.
WordCountApp Instance-1 Log:
[centos@host01 Desktop]$ java -jar KSWordCountApp-1.jar
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:rsrc:slf4j-simple-1.6.4.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:rsrc:slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.SimpleLoggerFactory]
189 [main] INFO org.apache.kafka.streams.StreamsConfig - StreamsConfig values:
	application.id = kafka-stream-wrod-count
	application.server =
	bootstrap.servers = [127.0.0.1:9091, 127.0.0.1:9092]
	buffered.records.per.partition = 1000
	cache.max.bytes.buffering = 10485760
	.....
	......
Count-repartition-0, word-count-input-topic-0] assigned at the end of consumer rebalance.
current active tasks: [0_0, 1_0, 0_1, 1_1]
current suspended standby tasks: []
- Start instance-2 and again observe the logs in terminal-1 and terminal-2: the active tasks are now split between the two WordCount app instances.
WordCountApp Instance-1 Log:
39121 [kafka-stream-wrod-count-e4f45acc-9b87-4eed-a0a1-b6a1c0bedf26-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [kafka-stream-wrod-count-e4f45acc-9b87-4eed-a0a1-b6a1c0bedf26-StreamThread-1] partition assignment took 37 ms.
current active tasks: [0_0, 1_0]
current standby tasks: []
WordCountApp Instance-2 Log:
3114 [kafka-stream-wrod-count-94b574f5-d603-42d4-92da-cc9b74f4a651-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [kafka-stream-wrod-count-94b574f5-d603-42d4-92da-cc9b74f4a651-StreamThread-1] partition assignment took 605 ms.
current active tasks: [0_1, 1_1]
current standby tasks: []
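The task ids in these logs have the form subtopology_partition (0_0, 0_1, 1_0, 1_1: two subtopologies times two partitions). A rough sketch of how such tasks spread across instances - purely illustrative, since the real StreamsPartitionAssignor uses a stickier strategy than this round-robin:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class TaskAssignmentSketch {
    // Illustrative only: distributes task ids evenly across instances.
    static Map<Integer, List<String>> assign(List<String> tasks, int instances) {
        Map<Integer, List<String>> byInstance = new TreeMap<>();
        for (int i = 0; i < tasks.size(); i++) {
            byInstance.computeIfAbsent(i % instances, k -> new ArrayList<>())
                      .add(tasks.get(i));
        }
        return byInstance;
    }

    public static void main(String[] args) {
        List<String> tasks = Arrays.asList("0_0", "0_1", "1_0", "1_1");
        System.out.println(assign(tasks, 1)); // one instance owns all four tasks
        System.out.println(assign(tasks, 2)); // two tasks each, as in the logs above
    }
}
```

With one instance all four tasks land on it; with two instances each gets two, which matches the [0_0, 1_0] / [0_1, 1_1] split in the logs.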
- For the end user the change is not visible. Publish messages and observe the consumer output; it is the same as earlier. At any time we can shut down instance-1 or instance-2 without affecting the consumer.
- Shut down instance-2 and again observe the logs. Instance-2 shuts down gracefully, and the active task list of instance-1 returns to what it was at startup, with all four tasks.
WordCountApp Instance-2 Log:
1280475 [kafka-stream-wrod-count-94b574f5-d603-42d4-92da-cc9b74f4a651-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [kafka-stream-wrod-count-94b574f5-d603-42d4-92da-cc9b74f4a651-StreamThread-1] Removing all standby tasks []
1280475 [kafka-stream-wrod-count-94b574f5-d603-42d4-92da-cc9b74f4a651-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [kafka-stream-wrod-count-94b574f5-d603-42d4-92da-cc9b74f4a651-StreamThread-1] Stream thread shutdown complete
1280475 [kafka-stream-wrod-count-94b574f5-d603-42d4-92da-cc9b74f4a651-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [kafka-stream-wrod-count-94b574f5-d603-42d4-92da-cc9b74f4a651-StreamThread-1] State transition from PENDING_SHUTDOWN to DEAD.
1280475 [kafka-streams-close-thread] INFO org.apache.kafka.streams.KafkaStreams - stream-client [kafka-stream-wrod-count-94b574f5-d603-42d4-92da-cc9b74f4a651] Stopped Kafka Streams process.
1280475 [Thread-1] INFO org.apache.kafka.streams.KafkaStreams - stream-client [kafka-stream-wrod-count-94b574f5-d603-42d4-92da-cc9b74f4a651] State transition from PENDING_SHUTDOWN to NOT_RUNNING.
WordCountApp Instance-1 Log:
1328049 [kafka-stream-wrod-count-e4f45acc-9b87-4eed-a0a1-b6a1c0bedf26-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [kafka-stream-wrod-count-e4f45acc-9b87-4eed-a0a1-b6a1c0bedf26-StreamThread-1] partition assignment took 205 ms.
current active tasks: [0_0, 1_0, 0_1, 1_1]
current standby tasks: []
Tags:
Kafka