使用idea编写SparkStreaming消费kafka中的数据,并对数据进行某一指标的处理【小案例】(五)
生活随笔
收集整理的這篇文章主要介紹了
使用idea编写SparkStreaming消费kafka中的数据,并对数据进行某一指标的处理【小案例】(五)
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
接? ? 使用idea編寫SparkStreaming消費kafka中的數據【小案例】(四)
https://georgedage.blog.csdn.net/article/details/103508619
先對上篇做一個回顧,在上一篇我們編寫消費者,并且使用sparkStreaming對kafka中的數據進行批處理。
這篇對我們接收來的數據進行一個指標的統計。
這是原始的數據
1576139418467 4 1 henan kaifeng 1576139419467 3 0 hebei handan 1576139420467 3 2 henan kaifeng 1576139421467 0 0 beijing daxing 1576139422467 3 0 beijing daxing 1576139423467 1 2 henan zhengzhou 1576139424467 4 2 henan kaifeng 1576139425477 0 0 henan kaifeng 1576139426483 0 0 beijing haidian 1576139427483 3 0 beijing haidian 1576139428483 4 2 henan kaifeng 1576139429483 1 2 henan kaifeng 1576139430483 1 0 guangzhou zhongshan 1576139431483 3 0 henan zhengzhou 1576139432483 4 0 guangzhou zhuhai 1576139433483 1 0 guangzhou zhongshan對時間戳進行轉化,然后統計這一天中訪問的同一ip多次點擊同一廣告id的前幾位,可以經過一個閾值的過濾,將其列為黑名單。也就是我們所生產的日志,例如你進行搜索時百度第一頁的某廣告,點擊是需要向百度進行付款的,所以我們對于惡意點擊者進行拉黑處理。算是一個風控措施。
友情提示:如果按照之前我們對于生產者的隨機數的話,不容易看到效果,所以講生產者代碼中隨機數范圍進行縮小。
?
消費者代碼如下:
package com.kafkaimport java.text.SimpleDateFormat import java.util.Dateimport org.apache.spark.streaming.dstream.DStream import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies} import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.{SparkConf, SparkContext}object KafkaConsumerStreamingDemo {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[3]").setAppName("kks")val sc = new SparkContext(conf)val ssc = new StreamingContext(sc,Seconds(5))sc.setLogLevel("error")val topic = List("george")val map = Map("bootstrap.servers" -> "henu2:9092","group.id" -> "george","key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer","value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer")ssc.checkpoint("D:\\ckpoint")val ds = KafkaUtils.createDirectStream[String, String](ssc, LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String, String](topic, map))val mapDs = ds.map(_.value())val sdf = new SimpleDateFormat("yyyy-MM-dd")val userDs: DStream[(String, Int)] = mapDs.transform(x => x.map(line => {val arr = line.split(" ")val date = sdf.format(new Date(arr(0).toLong))val userId = arr(1)val adId = arr(2)(date + "," + userId + "," + adId, 1)}))val reduceDs = userDs.reduceByKey(_+_)val resDs: DStream[(String, Int)] = reduceDs.updateStateByKey((currentValues: Seq[Int], preValue: Option[Int]) => {val now = currentValues.sumval pre = preValue.getOrElse(0)Option(now + pre)})resDs.print()ssc.start()ssc.awaitTermination()} }部分結果展示:?
(2019-12-12,2,2,1) (2019-12-12,4,0,3) (2019-12-12,1,0,4) (2019-12-12,3,1,1) (2019-12-12,4,1,3) (2019-12-12,3,2,2) (2019-12-12,2,0,2) (2019-12-12,4,2,2) (2019-12-12,2,1,1) (2019-12-12,0,0,5)后續就是我們假如將一天內單個ip同一adid的訪問數值大于100的進行過濾處理,然后存儲即可。下回分解!!!
總結
以上是生活随笔為你收集整理的使用idea编写SparkStreaming消费kafka中的数据,并对数据进行某一指标的处理【小案例】(五)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: SparkStreaming Exce
- 下一篇: 每日两SQL(5),欢迎交流~