Spark Streaming 实战案例(五) Spark Streaming与Kafka
生活随笔
收集整理的這篇文章主要介紹了
Spark Streaming 实战案例(五) Spark Streaming与Kafka
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
主要內容
1. Spark Streaming與Kafka版本的WordCount示例 (一)
向kafka集群發送消息
root@sparkslave01:/hadoopLearning/kafka_2.10-0.8.2.1# bin/kafka-console-producer.sh --broker-list sparkslave01:9092 --sync --topic kafkatopictest配置運行參數:
具體如下:
sparkmaster:2181,zookeeper監聽地址
test-consumer-group, consumer-group的名稱,必須和$KAFKA_HOME/config/consumer.properties中的group.id的配置內容一致
kafkatopictest,topic名稱
1,線程數
運行KafkaWordCount 后,在producer中輸入下列內容
root@sparkslave01:/hadoopLearning/kafka_2.10-0.8.2.1# bin/kafka-console-producer.sh --broker-list sparkslave01:9092 --sync --topic kafkatopictest [2015-11-04 03:25:39,666] WARN Property topic is not valid (kafka.utils.VerifiableProperties) Spark Spark TEST TEST Spark Streaming得到結果如下:
2. Spark Streaming與Kafka版本的WordCount示例(二)
前面的例子中,producer是通過kafka的腳本生成的,本例中將給出通過編寫程序生成的producer
// 隨機生成1-100間的數字 object KafkaWordCountProducer {def main(args: Array[String]) {if (args.length < 4) {System.err.println("Usage: KafkaWordCountProducer <metadataBrokerList> <topic> " +"<messagesPerSec> <wordsPerMessage>")System.exit(1)}val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args// Zookeeper連接屬性配置val props = new HashMap[String, Object]()props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer")props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer")//創建KafkaProducerval producer = new KafkaProducer[String, String](props)// 向kafka集群發送消息while(true) {(1 to messagesPerSec.toInt).foreach { messageNum =>val str = (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).toString).mkString(" ")val message = new ProducerRecord[String, String](topic, null, str)producer.send(message)}Thread.sleep(1000)}}}KafkaWordCountProducer 運行參數設置如下:
sparkmaster:9092 kafkatopictest 5 8sparkmaster:9092,broker-list
kafkatopictest,top名稱
5表示每秒發多少條消息
8表示每條消息中有幾個單詞
先KafkaWordCountProducer,然后再運行KafkaWordCount ,得到的計算結果如下:
總結
以上是生活随笔為你收集整理的Spark Streaming 实战案例(五) Spark Streaming与Kafka的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Spark Streaming 实战案例
- 下一篇: Scala入门到精通——第二十六节 Sc