30.kafka数据同步Elasticsearch深入详解(ES与Kafka同步)
1、kafka同步到Elasticsearch方式?
目前已知常用的方式有四種:?
1)logstash_input_kafka插件;?
缺點:不穩定(ES中文社區討論)?
2)spark stream同步;?
缺點:太龐大?
3)kafka connector同步;?
4)自寫程序讀取、解析、寫入?
?
本文主要基于kafka connector實現kafka到Elasticsearch全量、增量同步。
2、從confluenct說起
LinkedIn有個三人小組出來創業了—正是當時開發出Apache Kafka實時信息列隊技術的團隊成員,基于這項技術Jay Kreps帶頭創立了新公司Confluent。Confluent的產品圍繞著Kafka做的。?
Confluent Platform簡化了連接數據源到Kafka,用Kafka構建應用程序,以及安全,監控和管理您的Kafka的基礎設施。?
confluent組成如下所示:?
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
默認端口對應表:
組件 | 端口
Apache Kafka brokers (plain text):9092
Confluent Control Center:9021
Kafka Connect REST API:8083
REST Proxy:8082
Schema Registry REST API:8081
ZooKeeper:2181
3、kafka connector介紹。
Kafka 0.9+增加了一個新的特性 Kafka Connect,可以更方便的創建和管理數據流管道。它為Kafka和其它系統創建規模可擴展的、可信賴的流數據提供了一個簡單的模型。
通過 connectors可以將大數據從其它系統導入到Kafka中,也可以從Kafka中導出到其它系統。
Kafka Connect可以將完整的數據庫注入到Kafka的Topic中,或者將服務器的系統監控指標注入到Kafka,然后像正常的Kafka流處理機制一樣進行數據流處理。
而導出工作則是將數據從Kafka Topic中導出到其它數據存儲系統、查詢系統或者離線分析系統等,比如數據庫、 Elastic Search、 Apache Ignite等。
KafkaConnect有兩個核心概念:Source和Sink。 Source負責導入數據到Kafka,Sink負責從Kafka導出數據,它們都被稱為Connector。
kafkaConnect通過Jest實現Kafka對接Elasticsearch。
4、kafka connector安裝
實操非研究性的目的,不建議源碼安裝。?
直接從官網down confluent安裝即可。地址:https://www.confluent.io/download/
如下,解壓后既可以使用。
[root@kafka_no1 confluent-3.3.0]# pwd /home/confluent/confluent-3.3.0[root@kafka_no1 confluent-3.3.0]# ls -al total 32 drwxrwxr-x. 7 root root 4096 Dec 16 10:08 . drwxr-xr-x. 3 root root 4096 Dec 20 15:34 .. drwxr-xr-x. 3 root root 4096 Jul 28 08:30 bin drwxr-xr-x. 18 root root 4096 Jul 28 08:30 etc drwxr-xr-x. 2 root root 4096 Dec 21 15:34 logs -rw-rw-r--. 1 root root 871 Jul 28 08:45 README drwxr-xr-x. 10 root root 4096 Jul 28 08:30 share drwxrwxr-x. 2 root root 4096 Jul 28 08:45 src- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
5、kafka connector模式
Kafka connect 有兩種工作模式?
1)standalone:在standalone模式中,所有的worker都在一個獨立的進程中完成。
2)distributed:distributed模式具有高擴展性,以及提供自動容錯機制。你可以使用一個group.ip來啟動很多worker進程,在有效的worker進程中它們會自動的去協調執行connector和task,如果你新加了一個worker或者掛了一個worker,其他的worker會檢測到然后在重新分配connector和task。
6、kafka connector同步步驟
前提:
$ confluent start- 1
如下的服務都需要啟動:
Starting zookeeper zookeeper is [UP] ——對應端口:2181 Starting kafka kafka is [UP]——對應端口:9092 Starting schema-registry schema-registry is [UP]——對應端口:8081 Starting kafka-rest kafka-rest is [UP] Starting connect connect is [UP]- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
可以,netstat -natpl 查看端口是否監聽ok。
步驟1:創建topic
./kafka-topics.sh --create --zookeeper 110.118.7.11 :2181 --replication-factor 3 --partitions 1 --topic test-elasticsearch-sink- 1
步驟2:生產者發布消息
假定avrotest topic已經創建。
./kafka-avro-console-producer --broker-list 110.118.7.11:9092 --topic test-elasticsearch-sink \--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'{"f1": "value1"} {"f1": "value2"} {"f1": "value3"}- 1
- 2
- 3
- 4
- 5
- 6
步驟3:消費者訂閱消息測試(驗證生產者消息可以接收到)
./kafka-avro-console-consumer --bootstrap-server 110.118.7.11:9092 :9092 --topic test-elasticsearch-sink --from-beginning- 1
步驟4:connector傳輸數據操作到ES
./connect-standalone ../etc/schema-registry/connect-avro-standalone.properties \ ../etc/kafka-connect-elasticsearch/quickstart-elasticsearch.properties- 1
- 2
注意此處: connect-standalone模式,對應 connect-avro-standalone.properties要修改;?
如果使用connect-distribute模式,對應的connect-avro-distribute.properties要修改。?
這里 quickstart-elasticsearch.properties :啟動到目的Elasticsearch配置。
quickstart-elasticsearch.properties**設置**:
name=elasticsearch-sink connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector tasks.max=1 #kafka主題名稱,也是對應Elasticsearch索引名稱 topics= test-elasticsearch-sinkkey.ignore=true #ES url信息 connection.url=http://110.18.6.20:9200 #ES type.name固定 type.name=kafka-connect- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
7、同步效果。
curl -XGET 'http:// 110.18.6.20 :9200/test-elasticsearch-sink/_search?pretty'
8、連接信息查詢REST API
- -
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
9、小結。
他山之石,可以攻玉。?
kafka上的小學生,繼續加油!
參考:
[1]kafka-connect部署及簡介:http://t.cn/RiUCaWx?
[2]connector介紹:http://orchome.com/344?
[3]英文-同步介紹http://t.cn/RYeZm7P?
[4]部署&開發http://t.cn/RTeyOEl?
[5]confluent生態鏈http://t.cn/RTebVyL?
[6]快速啟動參考:https://docs.confluent.io/3.3.0/quickstart.html?
[7]ES-connector:http://t.cn/RTecXmc
總結
以上是生活随笔為你收集整理的30.kafka数据同步Elasticsearch深入详解(ES与Kafka同步)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 基于Go语言Gin+Xorm+Layui
- 下一篇: Meta拟裁撤Instagram伦敦员工