sparkstreaming监听hdfs目录_大数据系列之Spark Streaming接入Kafka数据
Spark Streaming官方提供Receiver-based和Direct Approach兩種方法接入Kafka數據,本文簡單介紹兩種方式的pyspark實現。
1、Spark Streaming接入Kafka方式介紹
Spark Streaming 官方提供了兩種方式讀取Kafka數據:
一是Receiver-based Approach。該種讀取模式官方最先支持,并在Spark 1.2提供了數據零丟失(zero-data loss)的支持;
一是Direct Approach (No Receivers)。該種讀取方式在Spark 1.3引入。
1.1 Receiver-based Approach
Receiver-based的Kafka讀取方式是基于Kafka高階(high-level) api來實現對Kafka數據的消費。在提交Spark Streaming任務后,Spark集群會劃出指定的Receivers來專門、持續不斷、異步讀取Kafka的數據,讀取時間間隔以及每次讀取offsets范圍可以由參數來配置。讀取的數據保存在Receiver中,具體StorageLevel方式由用戶指定,諸如MEMORY_ONLY等。當driver 觸發batch任務的時候,Receivers中的數據會轉移到剩余的Executors中去執行。在執行完之后,Receivers會相應更新ZooKeeper的offsets。如要確保at least once的讀取方式,可以設置spark.streaming.receiver.writeAheadLog.enable為true。具體Receiver執行流程見下圖:
需要借助Write Ahead Logs 來保證數據的不丟失,如果啟用了Write Ahead Logs復制到文件系統如HDFS,那么storage level需要設置成StorageLevel.MEMORY_AND_DISK_SER,也就是KafkaUtils.createStream(…, StorageLevel.MEMORY_AND_DISK_SER)
在Receiver的方式中,Spark中的partition和kafka中的partition并不是相關的,所以如果我們加大每個topic的partition數量,僅僅是增加線程來處理由單一Receiver消費的主題。但是這并沒有增加Spark在處理數據上的并行度
對于不同的Group和topic我們可以使用多個Receiver創建不同的Dstream來并行接收數據,之后可以利用union來統一成一個Dstream
1.2 Direct Approach (No Receivers)
Direct方式采用Kafka簡單的consumer api方式來讀取數據,無需經由ZooKeeper,此種方式不再需要專門Receiver來持續不斷讀取數據。當batch任務觸發時,由Executor讀取數據,并參與到其他Executor的數據計算過程中去。由drive來決定讀取多少offsets,并將offsets交由checkpoints來維護。將觸發下次batch任務,再由Executor讀取Kafka數據并計算。從此過程可以發現Direct方式無需Receiver讀取數據,而是需要計算時再讀取數據,所以Direct方式的數據消費對內存的要求不高,只需要考慮批量計算所需要的內存即可;另外batch任務堆積時,也不會影響數據堆積。其具體讀取方式如下圖:
簡化的并行:在Receiver的方式中提到創建多個Receiver之后利用union來合并成一個Dstream的方式提高數據傳輸并行度。而在Direct方式中,Kafka中的partition與RDD中的partition是一一對應的并行讀取Kafka數據,這種映射關系也更利于理解和優化。
高效:在Receiver的方式中,為了達到0數據丟失需要將數據存入Write Ahead Log中,這樣在Kafka和日志中就保存了兩份數據,浪費!而第二種方式不存在這個問題,只要我們Kafka的數據保留時間足夠長,我們都能夠從Kafka進行數據恢復。
精確一次:在Receiver的方式中,使用的是Kafka的高階API接口從Zookeeper中獲取offset值,這也是傳統的從Kafka中讀取數據的方式,但由于Spark Streaming消費的數據和Zookeeper中記錄的offset不同步,這種方式偶爾會造成數據重復消費。而第二種方式,直接使用了簡單的低階Kafka API,Offsets則利用Spark Streaming的checkpoints進行記錄,消除了這種不一致性。
2、Spark Streaming接入Kafka數據實現
以wordcount統計為例,kafka生產端輸入詞組,Spark端讀取kafka流數據,并統計詞頻
2.1 Receiver方式收取數據
1)Import KafkaUtils并創建DStream
from pyspark.streaming.kafka import KafkaUtilskafkaStream = KafkaUtils.createStream(streamingContext, \
[ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])
ZK Quorum:Zookeeper quorum (hostname:port,hostname:port,..)
Groupid:消費者的groupid
Topics:{topic_name : numPartitions}
2)具體實現代碼如下:
from pyspark import SparkContextfrom pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
if __name__ == "__main__":
#if len(sys.argv) != 3:
# print("Usage: kafka_wordcount.py ", file=sys.stderr)
# exit(-1)
sc = SparkContext(appName="PythonStreamingKafkaWordCount")
ssc = StreamingContext(sc, 10)
zkQuorum = "192.168.112.101:2181,192.168.112.102:2181,192.168.112.103:2181"
groupid = "spark-streaming-consumer"
topic = {"kafka_spark_test1":0,"kafka_spark_test1":1,"kafka_spark_test1":2}
#zkQuorum, topic = sys.argv[1:]
kvs = KafkaUtils.createStream(ssc, zkQuorum, groupid, topic)
lines = kvs.map(lambda x: x[1])
counts = lines.flatMap(lambda line: line.split(" ")) \
.map(lambda word: (word, 1)) \
.reduceByKey(lambda a, b: a+b)
counts.pprint()
ssc.start()
ssc.awaitTermination()
在Spark目錄執行命令:
spark-submit --master yarn --deploy-mode client --jars jars/spark-streaming-kafka-0-8-assembly_2.11-2.3.1.jar /usr/local/spark/ipynotebook/03-kafka2streaming-01.py2.2 Direct方式收取數據
1)Import KafkaUtils并創建DStream
from pyspark.streaming.kafka import KafkaUtilsdirectKafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
ssc:StreamingContext
topics:消費的topics清單
{"metadata.broker.list": brokers}:kafka參數,可以指定為 metadata.broker.list或bootstrap.servers
默認情況下,從每個kafka分區的最新的offset進行消費,如果在kafka參數中設置了auto.offset.reset 為smallest,則會從最小的offset進行消費
如果希望保存每個批量消費的kafka offset,可以進行如下操作:
def storeOffsetRanges(rdd):
global offsetRanges
offsetRanges = rdd.offsetRanges()
return rdd
def printOffsetRanges(rdd):
for o in offsetRanges:
print "%s %s %s %s" % (o.topic, o.partition, o.fromOffset, o.untilOffset)
directKafkaStream \
.transform(storeOffsetRanges) \
.foreachRDD(printOffsetRanges)
如果希望使用基于Zookeeper的Kafka監控,也可以通過這種方法展現Streaming的進程。
2)具體實現代碼如下:
from pyspark import SparkContextfrom pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
offsetRanges = []
def storeOffsetRanges(rdd):
global offsetRanges
offsetRanges = rdd.offsetRanges()
return rdd
def printOffsetRanges(rdd):
for o in offsetRanges:
print("%s %s %s %s" % (o.topic, o.partition, o.fromOffset, o.untilOffset))
if __name__ == "__main__":
#if len(sys.argv) != 3:
# print("Usage: direct_kafka_wordcount.py ", file=sys.stderr)
# exit(-1)
sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount")
ssc = StreamingContext(sc, 10)
#brokers, topic = sys.argv[1:]
topic="kafka_spark_test1"
brokers = "192.168.112.101:9092,192.168.112.102:9092,192.168.112.103:9092"
kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
lines = kvs.map(lambda x: x[1])
counts = lines.flatMap(lambda line: line.split(" ")) \
.map(lambda word: (word, 1)) \
.reduceByKey(lambda a, b: a+b)
kvs.transform(storeOffsetRanges).foreachRDD(printOffsetRanges)
counts.pprint()
ssc.start() # Start the computation
ssc.awaitTermination() # Wait for the computation to terminate
在Spark根目錄執行命令:
spark-submit --master yarn --deploy-mode client --jars jars/spark-streaming-kafka-0-8-assembly_2.11-2.3.1.jar /usr/local/spark/ipynotebook/03-kafka2streaming-02.py2.3 Kafka生產者配置
Kafka集群環境的安裝配置,參考之前的文檔"大數據系列之Kafka集群環境部署"中相關內容
1)啟動zookeeper
[root@tango-centos01 kafka_2.11-1.1.0]# nohup ./bin/zookeeper-server-start.sh ./config/zookeeper.properties &[root@tango-centos02 kafka_2.11-1.1.0]# nohup ./bin/zookeeper-server-start.sh ./config/zookeeper.properties &
[root@tango-centos03 kafka_2.11-1.1.0]# nohup ./bin/zookeeper-server-start.sh ./config/zookeeper.properties &
2)啟動Kafka集群
[root@tango-centos01 kafka_2.11-1.1.0]# nohup ./bin/kafka-server-start.sh ./config/server.properties &[root@tango-centos02 kafka_2.11-1.1.0]# nohup ./bin/kafka-server-start.sh ./config/server.properties &
[root@tango-centos03 kafka_2.11-1.1.0]# nohup ./bin/kafka-server-start.sh ./config/server.properties &
3)創建Kafka topic
[root@tango-centos01 kafka_2.11-1.1.0]# ./bin/kafka-topics.sh --create --zookeeper 192.168.112.101:2181,192.168.112.102:2181,192.168.112.103:2181 --replication-factor 2 --partitions 3 --topic kafka_spark_test1Created topic "kafka_spark_test1".
創建名為kafka_spark_test1 的Topic,復制因子設為2,同時分區數為3,注意,分區數是read parallelisms的最大值
4)查看Topic詳情
[root@tango-centos01 kafka_2.11-1.1.0]# ./bin/kafka-topics.sh --describe --zookeeper 192.168.112.101:2181,192.168.112.102:2181,192.168.112.103:2181 --topic kafka_spark_test1Topic:kafka_spark_test1 PartitionCount:3 ReplicationFactor:2 Configs:
Topic: kafka_spark_test1 Partition: 0 Leader: 2 Replicas: 2,3 Isr: 2,3
Topic: kafka_spark_test1 Partition: 1 Leader: 3 Replicas: 3,1 Isr: 3,1
Topic: kafka_spark_test1 Partition: 2 Leader: 1 Replicas: 1,2 Isr: 1,2
指定--zookeeper選項的值為192.168.112.101:2181,192.168.112.102:2181,192.168.112.103:2181,對應的Topic,即剛創建的kafka_spark_test1
2.4 Kafka-Spark Streaming流測試
1)下載依賴的jars包
2)啟動kafka生產者
[root@tango-centos01 kafka_2.11-1.1.0]# ./bin/kafka-console-producer.sh --broker-list 192.168.112.101:9092 --topic kafka_spark_test13)運行Spark Streaming流數據處理程序
[root@tango-spark01 spark-2.3.0]# spark-submit --master yarn --deploy-mode client --jars jars/spark-streaming-kafka-0-8-assembly_2.11-2.3.1.jar /usr/local/spark/ipynotebook/03-kafka2streaming-01.py[root@tango-spark01 spark-2.3.0]# spark-submit --master yarn --deploy-mode client --jars jars/spark-streaming-kafka-0-8-assembly_2.11-2.3.1.jar /usr/local/spark/ipynotebook/03-kafka2streaming-02.py
4)在Kafka生產端輸入流數據
[root@tango-centos01 kafka_2.11-1.1.0]# ./bin/kafka-console-producer.sh --broker-list 192.168.112.101:9092 --topic kafka_spark_test1>hello world
>hello tango hello
>hello tango tango
5)終端打印結果
-------------------------------------------Time: 2018-08-08 11:03:15
-------------------------------------------
(u'tango', 2)
(u'hello', 1)
6)登錄SparkWeb UI,查看Spark Streaming的的運行情況
a) spark-submit時候指定spark-submit --master spark://192.168.112.121:7077才能在8080端口看到數據
b) 如果通過yarn模式調度,可通過8088端口查看
2.5 Spark寫入Kafka
1)安裝Kafka插件
Pyspark訪問Kafka需要使用到kafka安裝包,使用以下命令安裝:
pip install --no-index --find-links=../kafka-1.3.5-py2.py3-none.any.whl kafka2)調用KafkaProducer模塊,spark作為生產者將數據傳輸到kafka端
from kafka import KafkaProducerto_kafka = KafkaProducer(bootstrap_servers=broker_list)
to_kafka.send(topic_name,send_msg,encode(‘utf8’))
to_kafka.flush()
參考資料
http://spark.apache.org/docs/latest/streaming-kafka-integration.html
大數據系列之Kafka集群環境部署
總結
以上是生活随笔為你收集整理的sparkstreaming监听hdfs目录_大数据系列之Spark Streaming接入Kafka数据的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 养老保险一个月多少钱啊?
- 下一篇: 《寻人大师》剧中各主角的相应扮演者都是谁