Kafka Study Notes (7): Kafka and Spark Integration (Principles, Development Environment, Hands-On Practice)
一、環(huán)境
1.1、Hadoop環(huán)境
1.2、Spark環(huán)境
1.3、Spark Streaming
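Spark Streaming is a micro-batch engine: incoming records are grouped by a fixed batch interval, each batch becomes an RDD, and the resulting sequence of RDDs is exposed as a DStream whose transformations run once per batch. A minimal sketch of the model (the socket source, port, and interval here are illustrative assumptions, not from the original post):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingSketch {
  def main(args: Array[String]): Unit = {
    // local[2]: one thread for the receiver, one for processing
    val conf = new SparkConf().setMaster("local[2]").setAppName("StreamingSketch")
    // Every 5 seconds of input becomes one RDD in the DStream
    val ssc = new StreamingContext(conf, Seconds(5))
    val lines = ssc.socketTextStream("localhost", 9999)
    lines.flatMap(_.split(" ")).map(w => (w, 1)).reduceByKey(_ + _).print()
    ssc.start()
    ssc.awaitTermination()
  }
}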
1.4 Add Maven Dependencies & Development Workflow
First add Scala framework support to the project (in IntelliJ IDEA: right-click the project and choose Add Framework Support... > Scala).
Then add the following dependencies to pom.xml:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.4.3</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.4.3</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
    <version>2.4.3</version>
</dependency>
(The _2.11 suffix is the Scala version of each artifact and must match the Scala SDK added above.)
Development workflow
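Both examples in the next section follow the same steps: create a SparkConf and a StreamingContext with a batch interval; build an input DStream from Kafka with KafkaUtils.createDirectStream; apply transformations such as map and reduceByKey; trigger output with an action such as print or saveAsTextFiles; finally call ssc.start() and ssc.awaitTermination() to run until stopped.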
2. Hands-On Practice
Example 1: writing Kafka messages to the console (local test / cluster test)
package demo01

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SecondKafkaWordCount {
  def main(args: Array[String]): Unit = {
    // Local mode with two threads; for the cluster test the master is supplied by spark-submit
    val conf = new SparkConf().setMaster("local[2]").setAppName("SecondKafkaWordCount")
    val ssc = new StreamingContext(conf, Seconds(3)) // 3-second batch interval
    ssc.sparkContext.setLogLevel("WARN")

    val kafkaProperties = Map("bootstrap.servers" -> "node100:9092,node101:9092,node102:9092")
    // Direct stream: each Kafka record arrives as a (key, value) tuple
    val data = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaProperties, Set("test_02_02"))

    // Transformation: count occurrences of each message value per batch
    val result = data.map(_._2).map(x => (x, 1)).reduceByKey((x, y) => x + y)

    // Action
    result.print()

    // Start the computation and block until terminated
    ssc.start()
    ssc.awaitTermination()
  }
}
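Note that the transformation counts whole message values, so each distinct message becomes one key. If a message holds a line of text and a per-word count is wanted, the usual variant splits each value first (a sketch, assuming whitespace-separated words):

// one element per word instead of one per message
val words = data.map(_._2).flatMap(_.split(" "))
val wordCounts = words.map(w => (w, 1)).reduceByKey(_ + _)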
Compile and package the project, then upload the jar file to the cluster. (With setMaster("local[2]") in the code, the job can also be run straight from the IDE for the local test.)
Run TimerProducer.java to feed messages into the test_02_02 topic.
結(jié)果:
實戰(zhàn)二:Kafka消息輸出到HDFS|本地測試|集群測試
package demo01

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object KafkaWordCountHDFS {
  def main(args: Array[String]): Unit = {
    // Standalone-cluster master set in code this time, with 512 MB per executor
    val conf = new SparkConf()
      .setMaster("spark://master:7077")
      .set("spark.executor.memory", "512m")
      .setAppName("KafkaWordCountHDFS")
    val ssc = new StreamingContext(conf, Seconds(3))
    ssc.sparkContext.setLogLevel("WARN")

    val kafkaProperties = Map("bootstrap.servers" -> "node110:9092,node111:9092,node112:9092")
    val data = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaProperties, Set("test_02_02"))

    // Transformation: count occurrences of each message value per batch
    val result = data.map(_._2).map(x => (x, 1)).reduceByKey((x, y) => x + y)

    // Action: write each batch to HDFS under /tmp/wordcount-<batch time ms>.txt
    result.saveAsTextFiles("/tmp/wordcount", "txt")

    ssc.start()
    ssc.awaitTermination()
  }
}
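saveAsTextFiles(prefix, suffix) produces one directory per batch, named prefix-<time in ms>.suffix, each containing the usual part files. A quick way to inspect the accumulated output from the Spark shell (a sketch; the glob assumes the paths used above):

// read back every batch directory produced by saveAsTextFiles("/tmp/wordcount", "txt")
val counts = sc.textFile("/tmp/wordcount-*.txt")
counts.take(10).foreach(println)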
Compile and package the project, then upload the jar file.
Submit the job with spark-submit, for example: spark-submit --class demo01.KafkaWordCountHDFS <path-to-jar> (the jar name depends on your build; the master URL and executor memory are already set in the code).
Run TimerProducer.java to produce messages.
Result: a word-count output directory appears under /tmp on HDFS for each batch (shown as a screenshot in the original post).
Summary
This note set up a Spark Streaming project against Kafka with Maven, then walked through two direct-stream word-count jobs: one printing per-batch counts to the console, and one saving them to HDFS with saveAsTextFiles.