[Technical Tutorial] Integrating SequoiaDB with Kafka
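1. Background
In industries such as the internet, finance, and government, activity stream data is now almost everywhere. The usual way to handle it is to write each activity to files as logs and then run statistical analysis on those files periodically. This approach no longer suits scenarios with increasingly strict real-time requirements, and it also adds complexity to the overall system. To solve this problem, many companies of different types have adopted Kafka, a distributed open-source messaging system, as a data pipeline and messaging system for many kinds of data.
Kafka is a distributed messaging system based on publish/subscribe. It provides message persistence, supports message partitioning and distributed consumption, and guarantees ordered delivery of messages within each partition. It also supports online horizontal scaling and high throughput, and it serves both offline and real-time data processing.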
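To make the per-partition ordering guarantee concrete, here is a minimal sketch of how a keyed message can be mapped to a partition. It is an illustration only: the class and method names are hypothetical, and it uses `String.hashCode()` as a stand-in, whereas Kafka's default partitioner hashes the serialized key with murmur2. The point is that messages with the same key always land in the same partition, so their relative order is preserved.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: a simplified key-to-partition mapping (not Kafka's real partitioner).
public class PartitionSketch {

    // Maps a key to a partition index in [0, numPartitions).
    // Masking with 0x7fffffff clears the sign bit so the result is never negative.
    public static int partitionFor(String key, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 3;
        // One ordered list per partition, standing in for a partition log.
        List<List<String>> partitions = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            partitions.add(new ArrayList<String>());
        }
        String[][] messages = {
            {"user-1", "login"}, {"user-2", "login"},
            {"user-1", "click"}, {"user-1", "logout"}
        };
        // Append each message to the partition chosen by its key.
        for (String[] m : messages) {
            int p = partitionFor(m[0], numPartitions);
            partitions.get(p).add(m[0] + ":" + m[1]);
        }
        for (int i = 0; i < numPartitions; i++) {
            System.out.println("partition " + i + " -> " + partitions.get(i));
        }
    }
}
```

Messages with different keys may end up in different partitions, and Kafka makes no ordering promise across partitions.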
SequoiaDB supports massive distributed data storage, with both vertical and horizontal partitioning. These features make it possible to store Kafka messages in SequoiaDB so that business systems can use them later for data analysis and data applications. This article explains how to consume messages from Kafka and store them in SequoiaDB.
2. Product introduction
SequoiaDB is a distributed, non-relational document database for storing and retrieving massive amounts of non-relational data. Its underlying design is built around distribution, high availability, high performance, and dynamic data types. It retains many strong designs from relational databases, such as indexes, dynamic queries, and updates, while using document records as the foundation to handle dynamic, flexible data types better. PostgreSQL supports standard SQL, and the SequoiaDB SSQL suite extends PostgreSQL so that standard SQL statements can access a SequoiaDB database and perform all kinds of operations on it. Once Kafka messages are stored in SequoiaDB, SequoiaDB SSQL can be used for online, real-time analysis and application of the message data.
3. Environment setup
3.1. Software configuration
Operating system: Windows 7
JDK: 1.7.0_80, 64-bit; download from: http://www.oracle.com/technetwork/java/javase/downloads/java-archive-downloads-javase7-521261.html#jdk-7u80-oth-JPR
Eclipse: 4.5.2
SequoiaDB: 1.12.5 or later
Kafka: 0.10.0.0; download from: http://211.162.127.20/files/5115000001D9C0FE/www-us.apache.org/dist/kafka/0.10.0.0/kafka_2.10-0.10.0.0.tgz
This project consumes data from Kafka and writes it to SequoiaDB in order to demonstrate the whole process of integrating Kafka with SequoiaDB.
Create the project as shown in the figure below:
Figure 3-1-1
3.2. Starting Kafka and creating the topic
Start ZooKeeper before starting Kafka. Then start Kafka with the following script:
./kafka-server-start.sh ../config/server.properties &
Create the Kafka topic with the following script:
./kafka-topics.sh --zookeeper localhost:2181 --create --topic kafkaSdb --partitions 1 --replication-factor 1
The result is shown in the figure below:
Figure 3-2-1
Verify the Kafka topic with the following script:
./kafka-topics.sh --zookeeper localhost:2181 --list
The result is shown in the figure below:
Figure 3-2-2
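4. Code walkthrough
4.1. Framework setup code
A Kafka-based system is divided into producers and consumers. Producers generate message data, and consumers consume the message data stored in Kafka. This project mainly demonstrates writing Kafka messages into SequoiaDB, so only demonstration code is provided for message production. The producer and consumer parameters are kept in their own configuration files.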
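Both configuration files are plain Java properties files, read with `java.util.Properties` by the code later in this article. The following self-contained sketch shows that loading step on an in-memory string; the class name `PropertiesSketch` and the sample text are for illustration only. One detail worth knowing: if a key appears more than once in a properties file, the last occurrence wins.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Illustrative only: shows how java.util.Properties parses key=value text.
public class PropertiesSketch {

    // Parses properties-file text into a Properties object.
    public static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            // Not expected for an in-memory reader; rethrow unchecked.
            throw new IllegalStateException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        String text = "bootstrap.servers=192.168.1.35:9092\n"
                + "enable.auto.commit=true\n"
                + "enable.auto.commit=false\n";
        Properties props = parse(text);
        // The value keeps its colon because the key ends at the first '='.
        System.out.println(props.getProperty("bootstrap.servers"));
        // The key appears twice above; the later value replaces the earlier one.
        System.out.println(props.getProperty("enable.auto.commit"));
        // A default can be supplied for keys that are absent.
        System.out.println(props.getProperty("missing.key", "fallback"));
    }
}
```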
The producer-side configuration file is as follows:
kafka-producer.properties
bootstrap.servers=192.168.1.35:9092
retries=0
linger.ms=1
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
partitioner.class=com.sequoiadb.kafka.DefaultPartitioner
The consumer-side configuration file is as follows:
kafka-consumer.properties
bootstrap.servers=192.168.1.35:9092
# Offsets are committed manually with commitSync() in the consumer thread, so auto-commit is off.
enable.auto.commit=false
# Only takes effect when enable.auto.commit=true.
auto.commit.interval.ms=60000
auto.offset.reset=earliest
session.timeout.ms=30000
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
The configuration file for the Kafka topic, the SequoiaDB collection, and the message partitions is as follows:
config.json
[
    {
        topicName: 'kafkaSdb',
        sdbCLName: 'kafkaSdb',
        partitionNum: 1,
        topicGroupName: 'kafkaSdb-consumer-group',
        pollTimeout: 5000
    }
]
4.2. Business implementation code
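4.2.1. Configuration code
This project keeps the Kafka settings, such as the topic, the number of partitions of the topic, and the SequoiaDB collection, in a configuration file. The settings are wrapped in a Java object and retrieved through a utility class.
The Java entity class for the configuration is as follows: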
package com.sequoiadb.kafka.bean;

// One entry of config.json: maps a Kafka topic to a SequoiaDB collection.
public class KafkaConsumerConfig {
    private String topicName;
    private String sdbCLName;
    private int partitionNum = 1;
    private String topicGroupName;
    private long pollTimeout = Long.MAX_VALUE;

    public String getTopicName() {
        return topicName;
    }

    public void setTopicName(String topicName) {
        this.topicName = topicName;
    }

    public String getSdbCLName() {
        return sdbCLName;
    }

    public void setSdbCLName(String sdbCLName) {
        this.sdbCLName = sdbCLName;
    }

    public int getPartitionNum() {
        return partitionNum;
    }

    public void setPartitionNum(int partitionNum) {
        this.partitionNum = partitionNum;
    }

    public String getTopicGroupName() {
        return topicGroupName;
    }

    public void setTopicGroupName(String topicGroupName) {
        this.topicGroupName = topicGroupName;
    }

    public long getPollTimeout() {
        return pollTimeout;
    }

    public void setPollTimeout(long pollTimeout) {
        this.pollTimeout = pollTimeout;
    }

    @Override
    public String toString() {
        return "[topicName=" + this.topicName
                + ",sdbCLName=" + this.sdbCLName
                + ",partitionNum=" + this.partitionNum
                + ",topicGroupName=" + this.topicGroupName
                + ",pollTimeout=" + this.pollTimeout + "]";
    }
}
The utility class for retrieving configuration values is as follows:
package com.sequoiadb.utils;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

public class PropertiesUtils {
    private static final Properties prop = new Properties();

    static {
        // Load config.properties from the classpath once, when the class is first used.
        // The stream is closed automatically; a missing file leaves the properties empty.
        try (InputStream in = PropertiesUtils.class.getClassLoader().getResourceAsStream("config.properties")) {
            if (in != null) {
                prop.load(in);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    // Returns the value for the given key, or null if the key is not defined.
    public static String getProperties(String key) {
        return prop.getProperty(key);
    }

    public static void main(String[] argc) {
        System.out.println(PropertiesUtils.getProperties("scm.url"));
    }
}
4.2.2. Business logic code
The producer business logic is as follows:
package com.sequoiadb.kafka;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

import org.apache.commons.io.IOUtils;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.sequoiadb.utils.Configuration;

public class PartitionTest {
    private static Logger log = LoggerFactory.getLogger(PartitionTest.class);
    private static String location = "kafka-producer.properties"; // configuration file location

    public static void main(String[] args) {
        Properties props = new Properties();
        String json = null;
        try {
            // Load the producer settings and the sample message body from the classpath.
            props.load(Thread.currentThread().getContextClassLoader().getResourceAsStream(location));
            InputStream in = Configuration.class.getClassLoader().getResourceAsStream("oracle.json");
            json = IOUtils.toString(in);
        } catch (IOException e) {
            e.printStackTrace();
        }
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        // Send the same message 1000 times. The topic here is "oracle", while the topic
        // created in section 3.2 and listed in config.json is kafkaSdb; use the same
        // name on both sides when running the producer and the consumer together.
        for (int i = 0; i < 1000; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<String, String>("oracle", json);
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception e) {
                    // Sending is asynchronous, so failures are reported here.
                    if (e != null) {
                        log.error("the producer has a error:" + e.getMessage());
                    }
                }
            });
        }
        try {
            Thread.sleep(1000);
            producer.close();
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }
    }
}
The consumer business logic uses one thread per topic to consume messages. The main program entry point is as follows:
package com.sequoiadb.kafka;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.sequoiadb.kafka.bean.KafkaConsumerConfig;
import com.sequoiadb.utils.Configuration;

public class KafkaSdb {
    private static Logger log = LoggerFactory.getLogger(KafkaSdb.class);
    private static ExecutorService executor;

    public static void main(String[] args) {
        // Read the Kafka topic configuration.
        List<KafkaConsumerConfig> topicSdbList = Configuration.getConfiguration();
        if (topicSdbList != null && topicSdbList.size() > 0) {
            // One pool thread per configured topic.
            executor = Executors.newFixedThreadPool(topicSdbList.size());
            final List<ConsumerThread> consumerList = new ArrayList<ConsumerThread>();
            for (int i = 0; i < topicSdbList.size(); i++) {
                KafkaConsumerConfig consumerConfig = topicSdbList.get(i);
                ConsumerThread consumer = new ConsumerThread(consumerConfig);
                consumerList.add(consumer);
                executor.submit(consumer);
            }
            // On JVM shutdown, wake up every consumer so its poll loop exits, then stop the pool.
            Runtime.getRuntime().addShutdownHook(new Thread() {
                @Override
                public void run() {
                    for (ConsumerThread consumer : consumerList) {
                        consumer.shutdown();
                    }
                    executor.shutdown();
                    try {
                        executor.awaitTermination(5000, TimeUnit.MILLISECONDS);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        } else {
            log.error("The topic list is empty; please check that the topic configuration is correct!");
        }
    }
}
The thread class does the actual message consumption and writes the message data to SequoiaDB. The code is as follows:
package com.sequoiadb.kafka;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.bson.BSONObject;
import org.bson.BasicBSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.sequoiadb.base.CollectionSpace;
import com.sequoiadb.base.DBCollection;
import com.sequoiadb.base.Sequoiadb;
import com.sequoiadb.exception.BaseException;
import com.sequoiadb.kafka.bean.KafkaConsumerConfig;
import com.sequoiadb.utils.ConnectionPool;
import com.sequoiadb.utils.Constants;

import net.sf.json.JSONArray;
import net.sf.json.JSONObject;

public class ConsumerThread implements Runnable {
    private static Logger log = LoggerFactory.getLogger(ConsumerThread.class);
    private String location = "kafka-consumer.properties"; // configuration file location
    private Sequoiadb sdb = null;
    private CollectionSpace cs = null;
    private DBCollection cl = null;
    private KafkaConsumer<String, String> consumer = null;
    private KafkaConsumerConfig consumerConfig;

    public ConsumerThread(KafkaConsumerConfig consumerConfig) {
        if (null == sdb) {
            sdb = ConnectionPool.getInstance().getConnection();
        }
        if (sdb.isCollectionSpaceExist(Constants.CS_NAME)) {
            cs = sdb.getCollectionSpace(Constants.CS_NAME);
        } else {
            throw new BaseException("Collection space " + Constants.CS_NAME + " does not exist!");
        }
        if (null == cs) {
            throw new BaseException("The collection space must not be null!");
        } else {
            this.consumerConfig = consumerConfig;
            this.cl = cs.getCollection(this.consumerConfig.getSdbCLName());
        }
        Properties props = new Properties();
        try {
            props.load(Thread.currentThread().getContextClassLoader().getResourceAsStream(location));
        } catch (IOException e) {
            e.printStackTrace();
        }
        // The consumer group comes from config.json rather than the properties file.
        props.put("group.id", this.consumerConfig.getTopicGroupName());
        consumer = new KafkaConsumer<>(props);
    }

    @Override
    public void run() {
        log.info("Consumer thread for topic " + this.consumerConfig.getTopicName() + " started!");
        try {
            // Subscribe to the topic.
            consumer.subscribe(Arrays.asList(this.consumerConfig.getTopicName()));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(this.consumerConfig.getPollTimeout());
                // To re-read from the start of a partition, seek before polling, for example:
                // consumer.seek(new TopicPartition(topicName, 0), 0);
                List<BSONObject> list = new ArrayList<BSONObject>();
                for (ConsumerRecord<String, String> record : records) {
                    String value = record.value();
                    JSONObject valueJson = JSONObject.fromObject(value);
                    if (valueJson.containsKey("data")) {
                        // Each element of the "data" array becomes one SequoiaDB record,
                        // with every field stored as a string.
                        JSONArray dataJsonArray = valueJson.getJSONArray("data");
                        for (int i = 0; i < dataJsonArray.size(); i++) {
                            BSONObject httpBson = new BasicBSONObject();
                            JSONObject dataJson = dataJsonArray.getJSONObject(i);
                            Iterator<?> iter = dataJson.keys();
                            while (iter.hasNext()) {
                                String key = (String) iter.next();
                                String bsonValue = dataJson.getString(key);
                                httpBson.put(key, bsonValue);
                            }
                            list.add(httpBson);
                        }
                    } else {
                        log.error("The message has no \"data\" node!");
                    }
                }
                if (!list.isEmpty()) {
                    try {
                        this.cl.bulkInsert(list, DBCollection.FLG_INSERT_CONTONDUP);
                        log.info("Messages for topic " + this.consumerConfig.getTopicName()
                                + " inserted into SDB successfully; records inserted: " + list.size());
                    } catch (BaseException e) {
                        // NOTE: the offsets are still committed below, so a batch that
                        // fails to insert is not delivered again.
                        e.printStackTrace();
                    }
                }
                // Commit the offsets of the records returned by the last poll.
                consumer.commitSync();
            }
        } catch (WakeupException e) {
            // Expected: shutdown() calls wakeup() to break out of poll().
        } finally {
            consumer.close();
        }
    }

    public void shutdown() {
        consumer.wakeup();
    }
}
5. Summary
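As the integration process above shows, the difficult parts of writing Kafka messages into SequoiaDB are configuring the topic partitions in Kafka, deciding how multiple threads consume the messages in each topic partition, and handling message consumption failures.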
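For the failure-handling point, one common approach is to commit offsets only after the batch has been written successfully, retrying the write a bounded number of times first. The sketch below shows that control flow in plain Java with no Kafka or SequoiaDB dependency. `BatchWriter` and `OffsetCommitter` are hypothetical interfaces standing in for the bulk insert and for `consumer.commitSync()`, and the retry count is an arbitrary assumption. It is not the code used in this project.

```java
import java.util.Arrays;
import java.util.List;

// Illustrative only: "write first, commit only on success" with bounded retries.
public class CommitAfterWriteSketch {

    // Hypothetical stand-in for writing a batch to the database.
    public interface BatchWriter {
        void write(List<String> batch) throws Exception;
    }

    // Hypothetical stand-in for committing the consumed offsets.
    public interface OffsetCommitter {
        void commit();
    }

    // Tries to write the batch up to maxAttempts times and commits only after a
    // successful write. Returns true if the batch was written and committed.
    public static boolean writeThenCommit(List<String> batch, BatchWriter writer,
                                          OffsetCommitter committer, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                writer.write(batch);
            } catch (Exception e) {
                System.err.println("write attempt " + attempt + " failed: " + e.getMessage());
                continue; // retry the write; nothing is committed yet
            }
            committer.commit();
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        final int[] calls = {0};
        // A writer that fails twice before succeeding, to exercise the retry path.
        BatchWriter flakyWriter = new BatchWriter() {
            @Override
            public void write(List<String> batch) throws Exception {
                if (++calls[0] < 3) {
                    throw new Exception("simulated insert failure");
                }
                System.out.println("wrote " + batch.size() + " records");
            }
        };
        OffsetCommitter committer = new OffsetCommitter() {
            @Override
            public void commit() {
                System.out.println("offsets committed");
            }
        };
        boolean ok = writeThenCommit(Arrays.asList("a", "b"), flakyWriter, committer, 5);
        System.out.println("success: " + ok);
    }
}
```

When the write ultimately fails, skipping the commit leaves the committed offsets where they were, so the batch is read again after the consumer restarts or the partition is reassigned. A consumer that keeps running would also have to seek back to the last committed offset to re-read the batch immediately.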
Reposted from: https://my.oschina.net/wangzhonnew/blog/1559772