Integrating Flume, Kafka, and Spark Streaming
This article walks through integrating Flume, Kafka, and Spark Streaming. The flow: a shell script replays test GPS trajectory data into a target file; Flume monitors that trajectory file and forwards each record to Kafka in real time; finally, Spark Streaming performs a simple count of the trajectory records.
A brief introduction to Flume
Flume's core abstraction is the agent. Each agent acts as a data courier and contains three main components:
Source: ingests data from the origin. Flume ships with many built-in sources and also supports custom sources.
Channel: the transit buffer between source and sink; the memory channel and file channel are the most commonly used.
Sink: delivers data to the destination, such as HDFS, Kafka, a database, or a custom sink.
Kafka and Spark are not introduced here; later articles will analyze them in depth.
Integration walkthrough:
1. Download the Flume package to a data node, unpack it, enter Flume's conf directory, and edit gps.conf:
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
# Flume watches the trajectory file for newly appended content
a1.sources.r1.command = tail -F /data/gps/gps
a1.sources.r1.fileHeader = true
#a1.sources.r1.ignorePattern=(^gps_)

# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
# Records picked up by Flume are published to this Kafka topic
a1.sinks.k1.topic = gps_topic
a1.sinks.k1.brokerList = cdh1.macro.com:9092,cdh2.macro.com:9092,cdh3.macro.com:9092
a1.sinks.k1.batchSize = 20
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.producer.linger.ms = 1

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

2. Create the replay script play_gps.sh:
#!/bin/bash
srcfile=/data/gps/2020-9-13/testGps.log
outputpath=/data/gps/gps
echo "srcfile:"$srcfile
echo "outputpath:"$outputpath
while read -r line
do
    # sleep 0.01
    sleep 0.5
    echo "$line"
    echo "$line" >> "$outputpath"
done < "$srcfile"
echo "completed!"

3. Create the trajectory file, i.e. the file the Flume agent will monitor.
Create the data directory, then place the prepared trajectory data under it.
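As a concrete sketch of this step (the /tmp/gps_demo path below is a stand-in so the commands can run unprivileged; the article's setup uses /data/gps), the directory and the file Flume will tail can be created like this:

```shell
# /tmp/gps_demo stands in for the article's /data/gps; in the real setup,
# run the equivalent commands against /data/gps as a user with write access.
GPS_DIR=/tmp/gps_demo
# Directory holding the source trajectory log (testGps.log in the article)
mkdir -p "$GPS_DIR/2020-9-13"
# The file Flume tails (tail -F in gps.conf) should exist before the agent
# starts, so that the exec source has a target from the beginning.
touch "$GPS_DIR/gps"
ls "$GPS_DIR"
```

With the article's real paths this is simply `mkdir -p /data/gps/2020-9-13` followed by `touch /data/gps/gps`.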
4. Start Flume:
[root@cdh3 bin]# nohup flume-ng agent --name a1 --conf-file ../conf/gps.conf &
[1] 976
[root@cdh3 bin]# nohup: ignoring input and appending output to 'nohup.out'
[root@cdh3 bin]# tail -f nohup.out
        transactional.id = null
        value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
20/09/13 01:36:54 INFO utils.AppInfoParser: Kafka version : 2.0.1
20/09/13 01:36:54 INFO utils.AppInfoParser: Kafka commitId : fa14705e51bd2ce5
20/09/13 01:36:54 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: k1: Successfully registered new MBean.
20/09/13 01:36:54 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: k1 started
20/09/13 01:36:57 WARN clients.NetworkClient: [Producer clientId=producer-1] Connection to node -2 could not be established. Broker may not be available.
20/09/13 01:36:57 WARN clients.NetworkClient: [Producer clientId=producer-1] Error while fetching metadata with correlation id 2 : {gps_topic=LEADER_NOT_AVAILABLE}
20/09/13 01:36:57 INFO clients.Metadata: Cluster ID: 7Sq6gNsRQtW4c9eosNA6Nw

5. Replay the trajectory data:
[root@cdh3 ~]# sh play_gps.sh
srcfile:/data/gps/2020-9-13/testGps.log
outputpath:/data/gps/gps
8f20c9188561b796ef8e26196de30be4,39a096b71376b82f35732eff6d95779b,1477969147,104.07513,30.72724
8f20c9188561b796ef8e26196de30be4,39a096b71376b82f35732eff6d95779b,1477969150,104.07513,30.72702
8f20c9188561b796ef8e26196de30be4,39a096b71376b82f35732eff6d95779b,1477969154,104.07504,30.72672
8f20c9188561b796ef8e26196de30be4,39a096b71376b82f35732eff6d95779b,1477969156,104.07497,30.72630
8f20c9188561b796ef8e26196de30be4,39a096b71376b82f35732eff6d95779b,1477969159,104.07497,30.72582
8f20c9188561b796ef8e26196de30be4,39a096b71376b82f35732eff6d95779b,1477969162,104.07496,30.72544
8f20c9188561b796ef8e26196de30be4,39a096b71376b82f35732eff6d95779b,1477969168,104.07489,30.72487

6. With the data replaying, check whether Kafka has created the corresponding topic:
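Each replayed record looks like a CSV line of driver ID, order ID, Unix timestamp, longitude, and latitude (that field layout is my reading of the sample data, not something stated in the article). A small sketch of slicing these fields with awk, counting trajectory points per driver:

```shell
# Build a two-record sample in the assumed layout:
# driver_id,order_id,unix_timestamp,longitude,latitude
cat > /tmp/gps_sample.csv <<'EOF'
8f20c9188561b796ef8e26196de30be4,39a096b71376b82f35732eff6d95779b,1477969147,104.07513,30.72724
8f20c9188561b796ef8e26196de30be4,39a096b71376b82f35732eff6d95779b,1477969150,104.07513,30.72702
EOF
# Count points per driver ID (field 1); the comma is the field separator
awk -F, '{count[$1]++} END {for (d in count) print d, count[d]}' /tmp/gps_sample.csv
# -> 8f20c9188561b796ef8e26196de30be4 2
```

This is essentially what the Spark Streaming count at the end of the article does per batch, only done offline on a static file.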
[root@cdh3 ~]# kafka-topics --zookeeper cdh1.macro.com:2181,cdh2.macro.com:2181,cdh3.macro.com:2181/kafka --list
20/09/13 01:39:34 INFO utils.Log4jControllerRegistration$: Registered kafka:type=kafka.Log4jController MBean
20/09/13 01:39:34 INFO zookeeper.ZooKeeperClient: [ZooKeeperClient] Initializing a new session to cdh1.macro.com:2181,cdh2.macro.com:2181,cdh3.macro.com:2181/kafka.
20/09/13 01:39:34 INFO zookeeper.ZooKeeper: Client environment:zookeeper.version=3.5.5-100-51e233a47ddbcf45f5aa690243bc31b25eded2a2, built on 08/05/2020 09:41 GMT
20/09/13 01:39:34 INFO zookeeper.ZooKeeper: Client environment:host.name=cdh3.macro.com
20/09/13 01:39:34 INFO zookeeper.ZooKeeper: Client environment:java.version=1.8.0_171
20/09/13 01:39:34 INFO zookeeper.ZooKeeper: Client environment:java.vendor=Oracle Corporation
20/09/13 01:39:34 INFO zookeeper.ZooKeeper: Client environment:java.io.tmpdir=/tmp
20/09/13 01:39:34 INFO zookeeper.ZooKeeper: Client environment:java.compiler=<NA>
20/09/13 01:39:34 INFO zookeeper.ZooKeeper: Client environment:os.name=Linux
20/09/13 01:39:34 INFO zookeeper.ZooKeeper: Client environment:os.arch=amd64
20/09/13 01:39:34 INFO zookeeper.ZooKeeper: Client environment:os.version=3.10.0-327.el7.x86_64
20/09/13 01:39:34 INFO zookeeper.ZooKeeper: Client environment:user.name=root
20/09/13 01:39:34 INFO zookeeper.ZooKeeper: Client environment:user.home=/root
20/09/13 01:39:34 INFO zookeeper.ZooKeeper: Client environment:user.dir=/root
20/09/13 01:39:34 INFO zookeeper.ZooKeeper: Client environment:os.memory.free=231MB
20/09/13 01:39:34 INFO zookeeper.ZooKeeper: Client environment:os.memory.max=256MB
20/09/13 01:39:34 INFO zookeeper.ZooKeeper: Client environment:os.memory.total=250MB
20/09/13 01:39:34 INFO zookeeper.ZooKeeper: Initiating client connection, connectString=cdh1.macro.com:2181,cdh2.macro.com:2181,cdh3.macro.com:2181/kafka sessionTimeout=30000 watcher=kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$@4efbca5a
20/09/13 01:39:34 INFO common.X509Util: Setting -Djdk.tls.rejectClientInitiatedRenegotiation=true to disable client-initiated TLS renegotiation
20/09/13 01:39:34 INFO zookeeper.ClientCnxnSocket: jute.maxbuffer value is 4194304 Bytes
20/09/13 01:39:34 INFO zookeeper.ClientCnxn: zookeeper.request.timeout value is 0. feature enabled=
20/09/13 01:39:34 INFO zookeeper.ZooKeeperClient: [ZooKeeperClient] Waiting until connected.
20/09/13 01:39:34 INFO zookeeper.ClientCnxn: Opening socket connection to server cdh2.macro.com/192.168.0.207:2181. Will not attempt to authenticate using SASL (unknown error)
20/09/13 01:39:34 INFO zookeeper.ClientCnxn: Socket error occurred: cdh2.macro.com/192.168.0.207:2181: Connection refused
20/09/13 01:39:34 INFO zookeeper.ClientCnxn: Opening socket connection to server cdh3.macro.com/192.168.0.208:2181. Will not attempt to authenticate using SASL (unknown error)
20/09/13 01:39:34 INFO zookeeper.ClientCnxn: Socket connection established, initiating session, client: /192.168.0.208:50761, server: cdh3.macro.com/192.168.0.208:2181
20/09/13 01:39:34 INFO zookeeper.ClientCnxn: Session establishment complete on server cdh3.macro.com/192.168.0.208:2181, sessionid = 0x10014dbded001e2, negotiated timeout = 30000
20/09/13 01:39:34 INFO zookeeper.ZooKeeperClient: [ZooKeeperClient] Connected.
ATLAS_HOOK
ATLAS_SPARK_HOOK
__consumer_offsets
gps_topic
pos
20/09/13 01:39:34 INFO zookeeper.ZooKeeperClient: [ZooKeeperClient] Closing.
20/09/13 01:39:34 INFO zookeeper.ZooKeeper: Session: 0x10014dbded001e2 closed
20/09/13 01:39:34 INFO zookeeper.ClientCnxn: EventThread shut down for session: 0x10014dbded001e2
20/09/13 01:39:34 INFO zookeeper.ZooKeeperClient: [ZooKeeperClient] Closed.

7. Kafka has created the corresponding topic; let's consume some of the data:
[root@cdh3 ~]# kafka-console-consumer --bootstrap-server cdh1.macro.com:9092,cdh2.macro.com:9092,cdh3.macro.com:9092 --topic gps_topic
20/09/13 01:40:00 INFO utils.Log4jControllerRegistration$: Registered kafka:type=kafka.Log4jController MBean
20/09/13 01:40:00 INFO consumer.ConsumerConfig: ConsumerConfig values:
        allow.auto.create.topics = true
        auto.commit.interval.ms = 5000
        auto.offset.reset = latest
        bootstrap.servers = [cdh1.macro.com:9092, cdh2.macro.com:9092, cdh3.macro.com:9092]
        check.crcs = true
        client.dns.lookup = default
        client.id =
        client.rack =
        connections.max.idle.ms = 540000
        default.api.timeout.ms = 60000
        enable.auto.commit = false
        exclude.internal.topics = true
        fetch.max.bytes = 52428800
        fetch.max.wait.ms = 500
        fetch.min.bytes = 1
        group.id = console-consumer-31005
        group.instance.id = null
        heartbeat.interval.ms = 3000
        interceptor.classes = []
        internal.leave.group.on.close = true
        isolation.level = read_uncommitted
        key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
        max.partition.fetch.bytes = 1048576
        max.poll.interval.ms = 300000
        max.poll.records = 500
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
        receive.buffer.bytes = 65536
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 30000
        retry.backoff.ms = 100
        sasl.client.callback.handler.class = null
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        security.providers = null
        send.buffer.bytes = 131072
        session.timeout.ms = 10000
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = https
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
20/09/13 01:40:01 INFO utils.AppInfoParser: Kafka version: 2.4.1.7.1.3.0-100
20/09/13 01:40:01 INFO utils.AppInfoParser: Kafka commitId: 0cfbf9b7ef3ca50d
20/09/13 01:40:01 INFO utils.AppInfoParser: Kafka startTimeMs: 1599932401067
20/09/13 01:40:01 INFO consumer.KafkaConsumer: [Consumer clientId=consumer-console-consumer-31005-1, groupId=console-consumer-31005] Subscribed to topic(s): gps_topic
20/09/13 01:40:01 INFO clients.Metadata: [Consumer clientId=consumer-console-consumer-31005-1, groupId=console-consumer-31005] Cluster ID: 7Sq6gNsRQtW4c9eosNA6Nw
20/09/13 01:40:01 INFO internals.AbstractCoordinator: [Consumer clientId=consumer-console-consumer-31005-1, groupId=console-consumer-31005] Discovered group coordinator cdh3.macro.com:9092 (id: 2147483539 rack: null)
20/09/13 01:40:01 INFO internals.AbstractCoordinator: [Consumer clientId=consumer-console-consumer-31005-1, groupId=console-consumer-31005] (Re-)joining group
20/09/13 01:40:01 INFO internals.AbstractCoordinator: [Consumer clientId=consumer-console-consumer-31005-1, groupId=console-consumer-31005] (Re-)joining group
20/09/13 01:40:04 INFO internals.ConsumerCoordinator: [Consumer clientId=consumer-console-consumer-31005-1, groupId=console-consumer-31005] Finished assignment for group at generation 1: {consumer-console-consumer-31005-1-d8761451-a2c6-4b40-a18e-ba4b27ee4315=Assignment(partitions=[gps_topic-0])}
20/09/13 01:40:04 INFO internals.AbstractCoordinator: [Consumer clientId=consumer-console-consumer-31005-1, groupId=console-consumer-31005] Successfully joined group with generation 1
20/09/13 01:40:04 INFO internals.ConsumerCoordinator: [Consumer clientId=consumer-console-consumer-31005-1, groupId=console-consumer-31005] Adding newly assigned partitions: gps_topic-0
20/09/13 01:40:04 INFO internals.ConsumerCoordinator: [Consumer clientId=consumer-console-consumer-31005-1, groupId=console-consumer-31005] Found no committed offset for partition gps_topic-0
20/09/13 01:40:04 INFO internals.SubscriptionState: [Consumer clientId=consumer-console-consumer-31005-1, groupId=console-consumer-31005] Resetting offset for partition gps_topic-0 to offset 185.
8f20c9188561b796ef8e26196de30be4,39a096b71376b82f35732eff6d95779b,1477969700,104.10055,30.70650
8f20c9188561b796ef8e26196de30be4,39a096b71376b82f35732eff6d95779b,1477969702,104.10064,30.70634
8f20c9188561b796ef8e26196de30be4,39a096b71376b82f35732eff6d95779b,1477969706,104.10083,30.70603
8f20c9188561b796ef8e26196de30be4,39a096b71376b82f35732eff6d95779b,1477969711,104.10114,30.70549
8f20c9188561b796ef8e26196de30be4,39a096b71376b82f35732eff6d95779b,1477969714,104.10130,30.70521
8f20c9188561b796ef8e26196de30be4,39a096b71376b82f35732eff6d95779b,1477969717,104.10148,30.70491
8f20c9188561b796ef8e26196de30be4,39a096b71376b82f35732eff6d95779b,1477969724,104.10179,30.70438
8f20c9188561b796ef8e26196de30be4,39a096b71376b82f35732eff6d95779b,1477969727,104.10195,30.70411
8f20c9188561b796ef8e26196de30be4,39a096b71376b82f35732eff6d95779b,1477969730,104.10205,30.70393
^C20/09/13 01:40:21 INFO internals.ConsumerCoordinator: [Consumer clientId=consumer-console-consumer-31005-1, groupId=console-consumer-31005] Revoke previously assigned partitions gps_topic-0
20/09/13 01:40:21 INFO internals.AbstractCoordinator: [Consumer clientId=consumer-console-consumer-31005-1, groupId=console-consumer-31005] Member consumer-console-consumer-31005-1-d8761451-a2c6-4b40-a18e-ba4b27ee4315 sending LeaveGroup request to coordinator cdh3.macro.com:9092 (id: 2147483539 rack: null) due to the consumer is being closed
Processed a total of 36 messages

Consume the Kafka data with Spark Streaming code:
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

object SparkStreamingAndKafka {
  def main(args: Array[String]): Unit = {
    import org.apache.spark._
    import org.apache.spark.streaming._

    Logger.getLogger("org").setLevel(Level.WARN)

    // Use two local threads to simulate a Spark cluster
    val conf = new SparkConf().setAppName("SparkStreamingAndKafka").setMaster("local[2]")
    // Initialize the Spark Streaming context with a 1-second batch interval
    val streamingContext = new StreamingContext(conf, Seconds(1))
    // Set the checkpoint directory
    streamingContext.checkpoint("/sparkapp/tmp")

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "192.168.0.171:9092,192.168.0.207:9092,192.168.0.208:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "test0002",
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    // The topic the Flume Kafka sink writes to
    val topics = Array("gps_topic")
    topics.foreach(println(_))
    println("topics:" + topics) // prints the array's toString, not its elements

    val stream = KafkaUtils.createDirectStream[String, String](
      streamingContext,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )
    // Count the records received in each batch and print the result
    stream.count().print()

    // Start the streaming job and block until it terminates
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}

8. Output:
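Note that the spark-streaming-kafka-0-10 integration module used above is not bundled with a stock Spark distribution, so the job needs that artifact on its classpath at submit time. One possible way is spark-submit's --packages flag; this is only a sketch, and the Scala binary version (2.11), Spark version (2.4.0), and jar name are illustrative assumptions to adapt to your cluster:

```shell
# Submit sketch; artifact coordinates, versions, and the jar name are
# assumptions -- align them with your cluster's Spark and Scala versions.
spark-submit \
  --class SparkStreamingAndKafka \
  --master "local[2]" \
  --packages org.apache.spark:spark-streaming-kafka-0-10_2.11:2.4.0 \
  spark-streaming-kafka-demo.jar
```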
gps_topic
topics:[Ljava.lang.String;@54a3ab8f
20/09/13 03:25:39 WARN kafka010.KafkaUtils: overriding enable.auto.commit to false for executor
20/09/13 03:25:39 WARN kafka010.KafkaUtils: overriding auto.offset.reset to none for executor
20/09/13 03:25:39 WARN kafka010.KafkaUtils: overriding executor group.id to spark-executor-test0002
20/09/13 03:25:39 WARN kafka010.KafkaUtils: overriding receive.buffer.bytes to 65536 see KAFKA-3135
-------------------------------------------
Time: 1599938740000 ms
-------------------------------------------
466

-------------------------------------------
Time: 1599938741000 ms
-------------------------------------------
0

This completes the integration of Flume, Kafka, and Spark Streaming.