Spark Streaming: connecting to Kafka and integrating HBase and Redis
Spark Streaming consumes data from Kafka and writes it to both Redis and HBase, giving a real-time ingestion pipeline. The complete job is shown below.
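The job relies on the Kafka 0.10 integration for Spark Streaming, the HBase client and Jedis. Here is a minimal build.sbt sketch; the version numbers are assumptions and should be matched to the Spark, HBase and Redis versions actually running on the cluster.

// build.sbt -- minimal sketch; versions are assumptions, not taken from the original post
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql"                  % "2.4.8",
  "org.apache.spark" %% "spark-streaming"            % "2.4.8",
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.4.8",
  "org.apache.hbase"  % "hbase-client"               % "1.2.12",
  "redis.clients"     % "jedis"                      % "3.3.0"
)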
import org.apache.hadoop.hbase.client.{Admin, Connection}
import org.apache.hadoop.hbase.{HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{HasOffsetRanges, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import redis.clients.jedis.Jedis

// Logging, ConfigUtil, Constants, HbaseTools, HBaseUtil and JedisUtil are project-specific
// helper classes; their imports are omitted here, as in the original post.
object StreamingKafka extends Logging {

  def main(args: Array[String]): Unit = {
    val brokers = ConfigUtil.getConfig(Constants.KAFKA_BOOTSTRAP_SERVERS)
    val topics = Array(
      ConfigUtil.getConfig(Constants.CHENG_DU_GPS_TOPIC),
      ConfigUtil.getConfig(Constants.HAI_KOU_GPS_TOPIC))
    val conf = new SparkConf().setMaster("local[1]").setAppName("sparkKafka")
    val group: String = "gps_consum_group"
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> brokers,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> group,
      "auto.offset.reset" -> "latest", // earliest, latest or none
      "enable.auto.commit" -> (false: java.lang.Boolean))

    val sparkSession: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    val context: SparkContext = sparkSession.sparkContext
    context.setLogLevel("WARN")
    // val streamingContext = new StreamingContext(conf, Seconds(5))
    // create the StreamingContext
    val streamingContext: StreamingContext = new StreamingContext(context, Seconds(1))
    val result: InputDStream[ConsumerRecord[String, String]] =
      HbaseTools.getStreamingContextFromHBase(streamingContext, kafkaParams, topics, group, "(.*)gps_topic")

    result.foreachRDD(eachRdd => {
      if (!eachRdd.isEmpty()) {
        eachRdd.foreachPartition(eachPartition => {
          val connection: Connection = HBaseUtil.getConnection
          val jedis: Jedis = JedisUtil.getJedis
          // check whether the HBase tables exist and create them if they do not
          val admin: Admin = connection.getAdmin
          if (!admin.tableExists(TableName.valueOf(Constants.HTAB_GPS))) {
            val htabgps = new HTableDescriptor(TableName.valueOf(Constants.HTAB_GPS))
            htabgps.addFamily(new HColumnDescriptor(Constants.DEFAULT_FAMILY))
            admin.createTable(htabgps)
          }
          if (!admin.tableExists(TableName.valueOf(Constants.HTAB_HAIKOU_ORDER))) {
            val htabgps = new HTableDescriptor(TableName.valueOf(Constants.HTAB_HAIKOU_ORDER))
            htabgps.addFamily(new HColumnDescriptor(Constants.DEFAULT_FAMILY))
            admin.createTable(htabgps)
          }
          eachPartition.foreach(record => {
            // save each record to HBase and Redis
            val consumerRecords: ConsumerRecord[String, String] =
              HbaseTools.saveToHBaseAndRedis(connection, jedis, record)
          })
          JedisUtil.returnJedis(jedis)
          connection.close()
        })
        // update the offsets once the whole batch has been written
        val offsetRanges: Array[OffsetRange] = eachRdd.asInstanceOf[HasOffsetRanges].offsetRanges
        // result.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges) // alternative: commit offsets back to Kafka's internal offsets topic
        for (eachrange <- offsetRanges) {
          val startOffset: Long = eachrange.fromOffset // start offset
          val endOffset: Long = eachrange.untilOffset  // end offset
          val topic: String = eachrange.topic
          val partition: Int = eachrange.partition
          HbaseTools.saveBatchOffset(group, topic, partition + "", endOffset)
        }
      }
    })

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
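HbaseTools, HBaseUtil, JedisUtil, ConfigUtil and Constants are utility classes from the surrounding project and are not shown in the post. To make the example easier to follow, here is a minimal sketch of what the connection helpers and the offset-saving call might look like; the object bodies, host names, ZooKeeper quorum and the offset table/column-family names are assumptions, not the original implementation.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes
import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}

// Hypothetical JedisUtil: hands out Redis connections from a shared pool.
object JedisUtil {
  private val poolConfig = new JedisPoolConfig()
  poolConfig.setMaxTotal(20)
  // Host and port are assumptions; the original reads them from a config file.
  private lazy val pool = new JedisPool(poolConfig, "localhost", 6379)

  def getJedis: Jedis = pool.getResource
  def returnJedis(jedis: Jedis): Unit = jedis.close() // close() hands the connection back to the pool
}

// Hypothetical HBaseUtil: builds HBase connections from the ZooKeeper quorum.
object HBaseUtil {
  def getConnection: Connection = {
    val conf: Configuration = HBaseConfiguration.create()
    // ZooKeeper quorum is an assumption; point it at the real cluster.
    conf.set("hbase.zookeeper.quorum", "node01:2181,node02:2181,node03:2181")
    ConnectionFactory.createConnection(conf)
  }
}

// Hypothetical sketch of HbaseTools.saveBatchOffset: persists the end offset of one
// topic partition for a consumer group into an HBase offset table, so the next run
// can resume from where this batch finished.
object OffsetStoreSketch {
  def saveBatchOffset(group: String, topic: String, partition: String, offset: Long): Unit = {
    val connection = HBaseUtil.getConnection
    // Table and column family names are assumptions.
    val table = connection.getTable(TableName.valueOf("hbase_offset_store"))
    val put = new Put(Bytes.toBytes(s"$group:$topic"))
    put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes(s"$topic:$partition"), Bytes.toBytes(offset.toString))
    table.put(put)
    table.close()
    connection.close()
  }
}

Because enable.auto.commit is set to false, persisting the end offset of each partition in HBase is presumably what lets getStreamingContextFromHBase resume the stream from the last successfully written batch instead of relying on Kafka's automatic commits.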
Summary

The streaming job reads GPS records from the Chengdu and Haikou Kafka topics, writes every record to HBase and Redis, and then persists the consumed offset of each topic partition to HBase, so that the next run can continue from the last successfully processed batch.