Favourite (Popular) Mobile application: In the previous post we set up the environment and wrote a word count program. In this post we will write a Kafka Streams application that reports favourite mobile brands: a count of how many users favour each brand.
Prerequisite: Zookeeper and a single-node Kafka setup should be up and running. For setup, refer to Setup Single Node Single/Multi Broker Configuration (Kafka and Zookeeper).
Each user and their favourite mobile brand is captured as input (a producer submits it to the input topic). The Kafka Streams listener (the Java application to be developed) processes these records and outputs each mobile brand with its associated count.
Input: <user_name>,<favourite_mobile_brand> -- comma is the delimiter
Output: <Mobile_brand> : <Count>
Development strategy: Topology and Operations involved
1. Read the input topic from Kafka as a KStream
2. Filter invalid input - consider only mobile brands {"iphone", "vivo", "xiaomi", "nokia"}
3. Use SelectKey to set the userId as the key
4. Use MapValues to extract the mobile brand in lowercase
5. Filter out unwanted input (records whose mobile brand is not in the list above)
6. Write the output to an intermediary topic - for converting the KStream to a KTable
7. Read from Kafka as a KTable
8. Group by mobile brand
9. Count the users per mobile brand in the KTable
10. Write the output to the output topic
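Steps 2 to 5 boil down to turning one raw input line into a (userId, brand) pair or dropping it. The following is a minimal plain-Java sketch of that logic without any Kafka dependency, so it can be tried in isolation. The class name UserMobileParser and the method parse are hypothetical helpers introduced only for illustration, and the trim() calls and the fixed Locale.ROOT are assumptions that the application code in this post does not make.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

/** Hypothetical helper (not part of the original app): plain-Java version of steps 2-5. */
final class UserMobileParser {

    /* Brands accepted by the filter, taken from the list in step 2 */
    static final Set<String> VALID_BRANDS =
            new HashSet<>(Arrays.asList("iphone", "vivo", "xiaomi", "nokia"));

    /**
     * Returns (userId, brand), both lowercased, or an empty Optional when the
     * line is malformed or the brand is not in VALID_BRANDS.
     */
    static Optional<Map.Entry<String, String>> parse(String line) {
        if (line == null) {
            return Optional.empty();
        }
        /* "nik," splits into a single element, so it is rejected here */
        String[] parts = line.split(",");
        if (parts.length != 2) {
            return Optional.empty();
        }
        /* trim() is an assumption of this sketch; the app itself does not trim */
        String user = parts[0].trim().toLowerCase(Locale.ROOT);
        String brand = parts[1].trim().toLowerCase(Locale.ROOT);
        if (user.isEmpty() || !VALID_BRANDS.contains(brand)) {
            return Optional.empty();
        }
        return Optional.of(new SimpleEntry<>(user, brand));
    }

    public static void main(String[] args) {
        for (String line : Arrays.asList("Jack,Nokia", "Susi,lg", "nik,", "no delimiter")) {
            System.out.println(line + " -> " + parse(line));
        }
    }
}
```

In the streams application the same checks are spread over filter, selectKey and mapValues; keeping them in one small method like this is just one possible way to unit test the rules before wiring them into a topology.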
Create Kafka topics: Create the input, output and intermediary topics.
[centos@host01 kafka]$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic favourite-mobile-input
Created topic "favourite-mobile-input".
[centos@host01 kafka]$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic user-detail-and-mobile --config cleanup.policy=compact
Created topic "user-detail-and-mobile".
[centos@host01 kafka]$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic favourite-mobile-output --config cleanup.policy=compact
Created topic "favourite-mobile-output".
Java Kafka Stream client:
Create a Java class similar to the WordCount example and paste in the sample code below. This is the Java Kafka Streams client for processing the input <User, MobileBrand>:

    package com.devinline.kafkastream;

    import java.util.Arrays;
    import java.util.List;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KStreamBuilder;
    import org.apache.kafka.streams.kstream.KTable;

    /**
     * @author www.devinline.com (nikhil)
     */
    public class FavouriteMobileApp {

        public static void main(String[] args) {
            Properties props = new Properties();
            /* ApplicationId is specific to the application; it is used as the
             * consumer groupId and as the default clientId prefix */
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "favourite-mobile-app");
            /* List of host:port pairs of the brokers to connect to */
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9091");
            /* Start topic consumption from the beginning */
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            /* For serialization and deserialization of data */
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            /* Disable record caching so that every count update is emitted */
            props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0");

            /* KStreamBuilder is the pre-1.0 builder API of Kafka Streams */
            KStreamBuilder builder = new KStreamBuilder();
            /* Stream from Kafka: name of the input topic */
            KStream<String, String> textInputLines = builder.stream("favourite-mobile-input");
            List<String> phones = Arrays.asList("iphone", "vivo", "xiaomi", "nokia");

            KStream<String, String> userAndMobilesStream = textInputLines
                    /* Keep only well-formed "user,brand" lines; a line such as
                     * "nik," would otherwise fail at split(",")[1] */
                    .filter((key, value) -> value != null && value.split(",").length == 2)
                    /* The user id becomes the record key */
                    .selectKey((key, value) -> value.split(",")[0].toLowerCase())
                    /* The mobile brand, in lowercase, becomes the record value */
                    .mapValues(value -> value.split(",")[1].toLowerCase())
                    /* Drop brands that are not in the accepted list */
                    .filter((user, mobile) -> phones.contains(mobile));

            /* Publish to the intermediary topic */
            userAndMobilesStream.to("user-detail-and-mobile");
            /* Read the intermediary topic as a KTable */
            KTable<String, String> userMobileTable = builder.table("user-detail-and-mobile");
            /* Group by mobile brand, then count the users per brand */
            KTable<String, Long> favouriteMobiles = userMobileTable
                    .groupBy((user, mobile) -> new KeyValue<>(mobile, mobile))
                    .count("CountByMobileBrand");
            /* Publish to the output topic */
            favouriteMobiles.to(Serdes.String(), Serdes.Long(), "favourite-mobile-output");

            KafkaStreams streams = new KafkaStreams(builder, props);
            streams.cleanUp();
            streams.start();
            /* Print topology details */
            System.out.println(streams.toString());
            /* Add a shutdown hook for graceful shutdown */
            Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        }
    }
Open a terminal and start Zookeeper and the Kafka broker on localhost:9091. Use the jps command to verify that Zookeeper and Kafka are up and running.
Start Zookeeper
[centos@host01 zookeeper]$ zkServer.sh start
Start Kafka broker
[centos@host01 kafka]$ bin/kafka-server-start.sh config/server-1.properties
[centos@host01 zookeeper]$ jps
3169 Kafka
2649 QuorumPeerMain
10169 Jps
Start the Kafka producer (input to the Kafka Streams client) and consumer (output topic): Input (userid,mobile) is passed continuously into the input Kafka topic; the Kafka Streams client processes it and produces output (mobile,count) in the output topic.
Start Kafka Producer: The producer writes input to the topic "favourite-mobile-input"
[centos@host01 kafka]$ bin/kafka-console-producer.sh --broker-list localhost:9091 --topic favourite-mobile-input
Start Kafka Consumer: The consumer reads messages from the topic "favourite-mobile-output"
[centos@host01 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9091 \
    --topic favourite-mobile-output \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
Sample Output: Below are sample input and output from the producer and consumer.
Producer :
[centos@host01 kafka]$ bin/kafka-console-producer.sh --broker-list localhost:9091 --topic favourite-mobile-input
>nik,iphone
>ranjan,iphone
>Jack,Nokia
>Susi,lg
>Kavi,xiaomi
>Raj,vivo
>Rajni,Vivo
>Roy,Nokia
>Nik,iphone
>Ranjan,Nokia
>Kavi,Iphone
>
Consumer:
[centos@host01 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9091 \
> --topic favourite-mobile-output \
> --from-beginning \
> --formatter kafka.tools.DefaultMessageFormatter \
> --property print.key=true \
> --property print.value=true \
> --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
> --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
iphone 1
iphone 2
nokia 1
xiaomi 1
vivo 1
vivo 2
nokia 2
iphone 1
iphone 2
iphone 1
nokia 3
xiaomi 0
iphone 2
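The consumer output contains counts that go down as well as up (for example iphone 1 again after iphone 2, or xiaomi 0). This is expected: when a user who is already in the KTable sends a new record, the grouped count first subtracts the old brand and then adds the new one, and with record caching set to 0 both updates are emitted. The sketch below imitates that behaviour in plain Java with two maps and no Kafka involved. The class name FavouriteMobileCountSimulation and its run method are hypothetical, and the input is assumed to be already validated and lowercased (so Susi,lg is left out).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical simulation (no Kafka involved) of the table, groupBy and count update flow. */
final class FavouriteMobileCountSimulation {

    /**
     * Takes validated {user, brand} pairs in arrival order and returns the
     * "brand count" lines that would be emitted, one per count update.
     */
    static List<String> run(List<String[]> updates) {
        Map<String, String> brandByUser = new HashMap<>(); /* stands in for the KTable */
        Map<String, Long> countByBrand = new HashMap<>();  /* stands in for the count store */
        List<String> emitted = new ArrayList<>();

        for (String[] update : updates) {
            String user = update[0];
            String brand = update[1];
            String previous = brandByUser.put(user, brand);
            if (previous != null) {
                /* Known user: the old brand is subtracted first */
                long decremented = countByBrand.merge(previous, -1L, Long::sum);
                emitted.add(previous + " " + decremented);
            }
            /* Then the new brand is added */
            long incremented = countByBrand.merge(brand, 1L, Long::sum);
            emitted.add(brand + " " + incremented);
        }
        return emitted;
    }

    public static void main(String[] args) {
        /* Valid records from the producer session, lowercased */
        List<String[]> updates = Arrays.asList(
                new String[] {"nik", "iphone"}, new String[] {"ranjan", "iphone"},
                new String[] {"jack", "nokia"}, new String[] {"kavi", "xiaomi"},
                new String[] {"raj", "vivo"}, new String[] {"rajni", "vivo"},
                new String[] {"roy", "nokia"}, new String[] {"nik", "iphone"},
                new String[] {"ranjan", "nokia"}, new String[] {"kavi", "iphone"});
        run(updates).forEach(System.out::println);
    }
}
```

Fed the valid records from the producer session, this sketch yields the same 13 lines, in the same order, as the consumer output. Note that Nik repeating iphone still produces iphone 1 followed by iphone 2, because the update is handled as a removal plus an addition even when the brand does not change.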
Tags:
Kafka