Kafka Connect简介
一. Kafka Connect簡介
Kafka是一個使用越來越廣的消息系統,尤其是在大數據開發中(實時數據處理和分析)。為何集成其他系統和解耦應用,經常使用Producer來發送消息到Broker,并使用Consumer來消費Broker中的消息。Kafka Connect是到0.9版本才提供的并極大的簡化了其他系統與Kafka的集成。Kafka Connect運用用戶快速定義并實現各種Connector(File,Jdbc,Hdfs等),這些功能讓大批量數據導入/導出Kafka很方便。
? ? ? ? ? ? ?
如圖中所示,左側的Sources負責從其他異構系統中讀取數據并導入到Kafka中;右側的Sinks是把Kafka中的數據寫入到其他的系統中。
二. 各種Kafka Connector
Kafka Connector很多,包括開源和商業版本的。如下列表中是常用的開源Connector
| Connectors | References |
| Jdbc | Source,?Sink |
| Elastic Search | Sink1,?Sink2,?Sink3 |
| Cassandra | Source1,?Source 2,?Sink1,?Sink2? |
| MongoDB | Source |
| HBase | Sink |
| Syslog | Source |
| MQTT (Source) | Source |
| Twitter (Source) | Source,?Sink |
| S3 | Sink1,?Sink2 ? |
商業版的可以通過Confluent.io獲得
三. 示例
3.1 FileConnector Demo
本例演示如何使用Kafka Connect把Source(test.txt)轉為流數據再寫入到Destination(test.sink.txt)中。如下圖所示:
? ? ? ? ??
? ? ? 本例使用到了兩個Connector:
- FileStreamSource:從test.txt中讀取并發布到Broker中
- FileStreamSink:從Broker中讀取數據并寫入到test.sink.txt文件中
其中的Source使用到的配置文件是${KAFKA_HOME}/config/connect-file-source.properties
name=local-file-source connector.class=FileStreamSource tasks.max=1 file=test.txt topic=connect-test其中的Sink使用到的配置文件是${KAFKA_HOME}/config/connect-file-sink.properties
name=local-file-sink connector.class=FileStreamSink tasks.max=1 file=test.sink.txt topics=connect-testBroker使用到的配置文件是${KAFKA_HOME}/config/connect-standalone.properties
bootstrap.servers=localhost:9092key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter key.converter.schemas.enable=true value.converter.schemas.enable=trueinternal.key.converter=org.apache.kafka.connect.json.JsonConverter internal.value.converter=org.apache.kafka.connect.json.JsonConverter internal.key.converter.schemas.enable=false internal.value.converter.schemas.enable=false offset.storage.file.filename=/tmp/connect.offsets offset.flush.interval.ms=10000?
3.2 運行Demo
需要熟悉Kafka的一些命令行,參考本系列之前的文章Apache Kafka系列(二) 命令行工具(CLI)
?3.2.1 啟動Kafka Broker
[root@localhost bin]# cd /opt/kafka_2.11-0.11.0.0/ [root@localhost kafka_2.11-0.11.0.0]# ls bin config libs LICENSE logs NOTICE site-docs [root@localhost kafka_2.11-0.11.0.0]# ./bin/zookeeper-server-start.sh ./config/zookeeper.properties & [root@localhost kafka_2.11-0.11.0.0]# ./bin/kafka-server-start.sh ./config/server.properties &3.2.2 啟動Source Connector和Sink Connector
[root@localhost kafka_2.11-0.11.0.0]# ./bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties3.3.3 打開console-consumer
./kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic connect-test3.3.4 寫入到test.txt文件中,并觀察3.3.3中的變化
[root@Server4 kafka_2.12-0.11.0.0]# echo 'firest line' >> test.txt [root@Server4 kafka_2.12-0.11.0.0]# echo 'second line' >> test.txt 3.3.3中打開的窗口輸出如下 {"schema":{"type":"string","optional":false},"payload":"firest line"} {"schema":{"type":"string","optional":false},"payload":"second line"}3.3.5 查看test.sink.txt
[root@Server4 kafka_2.12-0.11.0.0]# cat test.sink.txt firest line second line?本例僅僅演示了Kafka自帶的File Connector,后續文章會完成JndiConnector,HdfsConnector,并且會使用CDC(Changed Data Capture)集成Kafka來完成一個ETL的例子
?
四. kafka 0.9 connect JDBC測試
kafka 0.9的connect功能,測試過程如下:
1.創建容器(本次采用docker容器構建kafka環境)
docker run -p 10924:9092 -p 21814:2181 --name?confluent -i -t -d java /bin/bash
2.將confluent安裝程序拷貝進容器;
docker cp ?confluent.zip confluent:/root
3.進入到confluent容器
docker exec -it confluent /bin/bash
4.解壓confluent壓縮包
unzip confluent.zip
5.啟動kafka
/root/confluent/bin/zookeeper-server-start? /root/confluent/etc/kafka/zookeeper.properties ?& > zookeeper.log
/root/confluent/bin/kafka-server-start? /root/confluent/etc/kafka/server.properties?& > server.log
/root/confluent/bin/schema-registry-start ?/root/confluent/etc/schema-registry/schema-registry.properties?& > schema.log
6.測試kafka 是否正常
開兩個docker窗口,一個跑producer,一個跑consumer,
/root/confluent/bin/kafka-avro-console-producer --broker-list localhost:9092 --topic test --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
?
/root/confluent/bin/kafka-avro-console-consumer --topic test ?--zookeeper localhost:2181 --from-beginning
在producer端依次輸入以下記錄,確認consumer能正確顯示;
{"f1":?"value1"}
{"f1":?"value2"}
{"f1":?"value3"}
以上為安裝kafka過程,接下來開始測試jdbc接口;
測試之前,需要獲取mysql JDBC的驅動并將獲放在kafka環境對應的jre/lib文件夾里
測試jdbc connect
1.創建配置文件quickstart-mysql.properties,內容如下:?
name=test-mysql-jdbc-autoincrement
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://192.168.99.100:33061/test1?user=root&password=welcome1
mode=incrementing
incrementing.column.name=id
topic.prefix=test-mysql-jdbc-
注:mysql是我在另一個容器里運行的,jdbc:mysql://192.168.99.100:33061/test1?user=root&password=welcome1是連接容器里的mysql的連接串
2.執行./bin/connect-standalone?etc/schema-registry/connect-avro-standalone.properties?etc/kafka-connect-jdbc/quickstart-mysql.properties
3.執行./bin/kafka-avro-console-consumer?--new-consumer?--bootstrap-server?192.168.99.100:10924 --topic test-mysql-jdbc-accounts?--from-beginning
然后在數據庫里增加一條記錄
然后就會在consumer端顯示新增記錄
?
五. 配置連接器
?
Connector的配置是簡單的key-value映射。對于獨立模式,這些都是在屬性文件中定義,并通過在命令行上的Connect處理。在分布式模式,JSON負載connector的創建(或修改)請求。大多數配置都是依賴的connector,有幾個常見的選項:
- name?- 連接器唯一的名稱,不能重復。
- connector.calss?- 連接器的Java類。
- tasks.max?- 連接器創建任務的最大數。
- connector.class配置支持多種格式:全名或連接器類的別名。比如連接器是org.apache.kafka.connect.file.FileStreamSinkConnector,你可以指定全名,也可以使用FileStreamSink或FileStreamSinkConnector。Sink connector也有一個額外的選項來控制它們的輸入:
- topics - 作為連接器的輸入的topic列表。
對于其他的選項,你可以查看連接器的文檔。
六、rest api
kafka connect的目的是作為一個服務運行,默認情況下,此服務運行于端口8083。它支持rest管理,用來獲取 Kafka Connect 狀態,管理 Kafka Connect 配置,Kafka Connect 集群內部通信,常用命令如下:
GET /connectors?返回一個活動的connect列表
POST /connectors?創建一個新的connect;請求體是一個JSON對象包含一個名稱字段和連接器配置參數
GET /connectors/{name}?獲取有關特定連接器的信息
GET /connectors/{name}/config?獲得特定連接器的配置參數
PUT /connectors/{name}/config?更新特定連接器的配置參數
GET /connectors/{name}/tasks 獲得正在運行的一個連接器的任務的列表
DELETE /connectors/{name}?刪除一個連接器,停止所有任務,并刪除它的配置
GET /connectors?返回一個活動的connect列表
POST /connectors?創建一個新的connect;請求體是一個JSON對象包含一個名稱字段和連接器配置參數
GET /connectors/{name}?獲取有關特定連接器的信息
GET /connectors/{name}/config?獲得特定連接器的配置參數
PUT /connectors/{name}/config?更新特定連接器的配置參數
GET /connectors/{name}/tasks 獲得正在運行的一個連接器的任務的列表
DELETE /connectors/{name}?刪除一個連接器,停止所有任務,并刪除它的配置
curl -s <Kafka Connect Worker URL>:8083/ | jq???獲取 Connect Worker 信息
curl -s <Kafka Connect Worker URL>:8083/connector-plugins | jq?列出 Connect Worker 上所有 Connector
curl -s <Kafka Connect Worker URL>:8083/connectors/<Connector名字>/tasks | jq?獲取 Connector 上 Task 以及相關配置的信息
curl -s <Kafka Connect Worker URL>:8083/connectors/<Connector名字>/status | jq?獲取 Connector 狀態信息
curl -s <Kafka Connect Worker URL>:8083/connectors/<Connector名字>/config | jq?獲取 Connector 配置信息
curl -s -X PUT <Kafka Connect Worker URL>:8083/connectors/<Connector名字>/pause?暫停 Connector
curl -s -X PUT <Kafka Connect Worker URL>:8083/connectors/<Connector名字>/resume?重啟 Connector
curl -s -X DELETE <Kafka Connect Worker URL>:8083/connectors/<Connector名字>?刪除 Connector
創建新 Connector (以FileStreamSourceConnector舉例)
curl -s -X POST -H "Content-Type: application/json" --data
'{"name": "<Connector名字>",
"config":
{"connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector",
"key.converter.schemas.enable":"true",
"file":"demo-file.txt",
"tasks.max":"1",
"value.converter.schemas.enable":"true",
"name":"file-stream-demo-distributed",
"topic":"demo-distributed",
"value.converter":"org.apache.kafka.connect.json.JsonConverter",
"key.converter":"org.apache.kafka.connect.json.JsonConverter"}
}'
http://<Kafka Connect Worker URL>:8083/connectors | jq
?
更新 Connector配置 (以FileStreamSourceConnector舉例)
curl -s -X PUT -H "Content-Type: application/json" --data
'{"connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector",
"key.converter.schemas.enable":"true",
"file":"demo-file.txt",
"tasks.max":"2",
"value.converter.schemas.enable":"true",
"name":"file-stream-demo-distributed",
"topic":"demo-2-distributed",
"value.converter":"org.apache.kafka.connect.json.JsonConverter",
"key.converter":"org.apache.kafka.connect.json.JsonConverter"}'
<Kafka Connect Worker URL>:8083/connectors/file-stream-demo-distributed/config | jq
七、kafka connect + debezium,解析binlog至kafka
在已知kafka connect和debezium作用,會使用kafka的基礎上,學會使用debezium來讀取binlog,并通過kafka connect將讀取的內容放入kafka topic中。?
基于kafka0.10.0和Debezium0.6,mysql5.6
kafka connect
- Kafka Connect是一種用于Kafka和其他數據系統之間進行數據傳輸的工具。
- 僅關注數據的復制,并且不處理其他任務
- Kafka connect有兩個概念,一個source,另一個是sink。source是把數據從一個系統拷貝到kafka里,sink是從kafka拷貝到另一個系統里。
- 可使用插件,獲取不同系統的數據。例如通過Debezium插件解析mysql的日志,獲取數據。
- 支持集群,可以通過REST API管理Kafka Connect。
- 對數據的傳輸進行管理和監控。
Debezium
- Debezium是一個分布式平臺,可將現有數據庫轉換為事件流,因此應用程序可以立即查看并立即響應數據庫中每一行的更改。
- Debezium建立在Apache Kafka之上,并提供用于監視特定數據庫管理系統的Kafka Connect兼容連接器。
- 本教程使用Debezium監控binlog。
準備操作
- mysql需開啟binlog
[mysqld]
log-bin=mysql-bin #添加這一行就ok
binlog-format=ROW #選擇row模式
server_id=1 #配置mysql replaction需要定義,不能和canal的slaveId重復
- mysql需創建一個有mysql slave相關權限的賬號,若mysql不在本機,則需要遠程權限,防火墻放行。
//mysql slave相關權限
CREATE USER canal IDENTIFIED BY 'debezium';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'debezium'@'%' ;
FLUSH PRIVILEGES;
- 操作概述
- 安裝并啟動kafka
- 安裝并啟動mysql
- 下載Debezium的mysql連接器http://debezium.io/docs/install/并解壓
- 安裝debezium,即將解壓目錄寫入classpath變量,例如:export classpath=/root/debezium-connector-mysql/*?
只在當前shell有效 - 參考http://debezium.io/docs/connectors/mysql/的配置文件示例,寫好配置文件。
- 以獨立模式啟動kafka connect,此時debezium會對數據庫中的每一個表創建一個topic,消費相應的topic,即可獲取binlog解析信息。
//啟動kafka connect
bin/connect-standalone.sh config/connect-standalone.properties mysql.properties
//查看topic列表
bin/kafka-topics.sh --list --zookeeper localhost:2181
//消費該主題
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
- 配置文件
//mysql.properties
name=inventory-connector
connector.class=io.debezium.connector.mysql.MySqlConnector
database.hostname=192.168.99.100
database.port=3306
database.user=debezium
database.password=dbz
database.server.id=184054
database.server.name=fullfillment
database.whitelist=inventory
database.history.kafka.bootstrap.servers=192.168.30.30:9092
database.history.kafka.topic=dbhistory.fullfillment
include.schema.changes=true
- 索引
debezium官網?http://debezium.io/?
kafka文檔?http://kafka.apache.org/0100/documentation.html
?
八、Kafka Connect的優點
1.對開發者提供了統一的實現接口
2.開發,部署和管理都非常方便,統一?
3.使用分布式模式進行水平擴展,毫無壓力
4.在分布式模式下可以通過Rest Api提交和管理Connectors
5.對offset自動管理,只需要很簡單的配置,而不像Consumer中需要開發者處理
6.流式/批式處理的支持
九、第三方資源
這是已經得到支持的組件,不需要做額外的開發:?https://www.confluent.io/product/connectors/
括號中的Source表示將數據從其他系統導入Kafka,Sink表示將數據從Kafka導出到其他系統。
其他的我沒看,但是JDBC的實現比較的坑爹,是通過primary key(如id)和時間戳(如updateTime)字段,
來判斷數據是否更新,這樣的話應用范圍非常受局限。
?
十、Connector Development Guide
?
在kafka與其他系統間復制數據需要創建kafka connect,他們將數復制到kafka或者從kafka復制到其他系統
連接器有兩種形式:sourceconnectors將另一個系統數據導入kafka,sinkconnectors將數據導出到另一個系統
連接器不執行任何數據復制:它們的描述復制的數據,并且負責將工作分配給多個task
task分為sourcetask與sinktask
每個task從kafka復制數據,connect會保證record與schema的一致性完成任務分配,通常record與schema的映射是明顯的,每一個文件對應一個流,流中的每一條記錄利用schema解析并且保存對應的offset,另外一種情況是我們需要自己完成這種映射,比如數據庫,表的offset不是很明確(沒有自增id),一種可能的選擇是利用時間(timestamp)來完成增量查詢。
Streams and Records
每一個stream是包含key value對的記錄的序列,key value可以是原始類型,可以支持復雜結構,除了array,object,嵌套等。數據轉換是框架來完成的,record中包含stream id與offset,用于定時offset提交,幫助當處理失敗時恢復避免重復處理。
Dynamic Connectors
所有的job不是靜態的,它需要監聽外部系統的變化,比如數據庫表的增加刪除,當一個新table創建時,它必須發現并且更新配置由框架來分配給該表一個task去處理,當通知發布后框架會更新對應的task.
Developing a Simple Connector
例子很簡單
在standalone模式下實現 SourceConnector/SourceTask 讀取文件并且發布record給SinkConnector/SinkTask 由sink寫入文件
Connector Example
我們將實現SourceConnector,SinkConnector實現與它非常類似,它包括兩個私有字段存放配置信息(讀取的文件名與topic名稱)
public class FileStreamSourceConnector extends SourceConnector {
? ? private String filename;
? ? private String topic;
getTaskClass()方法定義實現執行處理的task
@Override
public Class getTaskClass() {
? ? return FileStreamSourceTask.class;
}
下面定義FileStreamSourceTask,它包括兩個生命周期方法start,stop
@Override
public void start(Map<String, String> props) {
? ? // The complete version includes error handling as well.
? ? filename = props.get(FILE_CONFIG);
? ? topic = props.get(TOPIC_CONFIG);
}
@Override
public void stop() {
? ? // Nothing to do since no background monitoring is required.
}
最后是真正核心的方法getTaskConfigs()在這里我們僅處理一個文件,所以我們雖然定義了max task(在配置文件里)但是只會返回一個包含一條entry的list
@Override
public List<Map<String, String>> getTaskConfigs(int maxTasks) {
? ? ArrayList>Map<String, String>> configs = new ArrayList<>();
? ? // Only one input stream makes sense.
? ? Map<String, String> config = new Map<>();
? ? if (filename != null)
? ? ? ? config.put(FILE_CONFIG, filename);
? ? config.put(TOPIC_CONFIG, topic);
? ? configs.add(config);
? ? return configs;
}
即使有多個任務,這種方法的執行通常很簡單。它只是要確定輸入任務的數量,這可能需要拉取數據從遠程服務,然后分攤。請注意,這個簡單的例子不包括動態輸入。在下一節中看到討論如何觸發任務的配置更新。
Task Example - Source Task
實現task,我們使用偽代碼描述核心代碼
public class FileStreamSourceTask extends SourceTask<Object, Object> {
? ? String filename;
? ? InputStream stream;
? ? String topic;
? ? public void start(Map<String, String> props) {
? ? ? ? filename = props.get(FileStreamSourceConnector.FILE_CONFIG);
? ? ? ? stream = openOrThrowError(filename);
? ? ? ? topic = props.get(FileStreamSourceConnector.TOPIC_CONFIG);
? ? }
? ? @Override
? ? public synchronized void stop() {
? ? ? ? stream.close()
? ? }
start方法讀取之前的offset,并且處理新的數據,stop方法停止stream,下面實現poll方法
@Override
public List<SourceRecord> poll() throws InterruptedException {
? ? try {
? ? ? ? ArrayList<SourceRecord> records = new ArrayList<>();
? ? ? ? while (streamValid(stream) && records.isEmpty()) {
? ? ? ? ? ? LineAndOffset line = readToNextLine(stream);
? ? ? ? ? ? if (line != null) {
? ? ? ? ? ? ? ? Map sourcePartition = Collections.singletonMap("filename", filename);
? ? ? ? ? ? ? ? Map sourceOffset = Collections.singletonMap("position", streamOffset);
? ? ? ? ? ? ? ? records.add(new SourceRecord(sourcePartition, sourceOffset, topic, Schema.STRING_SCHEMA, line));
? ? ? ? ? ? } else {
? ? ? ? ? ? ? ? Thread.sleep(1);
? ? ? ? ? ? }
? ? ? ? }
? ? ? ? return records;
? ? } catch (IOException e) {
? ? ? ? // Underlying stream was killed, probably as a result of calling stop. Allow to return
? ? ? ? // null, and driving thread will handle any shutdown if necessary.
? ? }
? ? return null;
}
該方法重復執行讀取操作,跟蹤file offset,并且利用上述信息創建SourceRecord,它需要四個字段:source partition,source offset,topic name,output value(包括value及value的schema)
Sink Tasks
之前描述了sourcetask實現,sinktask與它完全不同,因為前者是拉取數據,后者是推送數據
public abstract class SinkTask implements Task {
public void initialize(SinkTaskContext context) { ... }
public abstract void put(Collection<SinkRecord> records);
public abstract void flush(Map<TopicPartition, Long> offsets);
put方法是最重要的方法,接收sinkrecords,執行任何需要的轉換,并將其存儲在目標系統。此方法不需要確保數據已被完全寫入目標系統,然后返回。事實上首先放入緩沖,因此,批量數據可以被一次發送,減少對下游存儲的壓力。sourcerecords中保存的信息與sourcesink中的相同。flush提交offset,它接受任務從故障中恢復,沒有數據丟失。該方法將數據推送至目標系統,并且block直到寫入已被確認。的offsets參數通常可以忽略不計,但在某些情況保存偏移信息到目標系統確保一次交貨。例如,一個HDFS連接器可以確保flush()操作自動提交數據和偏移到HDFS中的位置。
Resuming from Previous Offsets
kafka connect是為了bulk 數據拷貝工作,它拷貝整個db而不是拷貝某個表,這樣會使用connnect的input或者output隨時改變,source connector需要監聽source系統的改變,當改變時通知框架(通過ConnectorContext對象)
舉例
if (inputsChanged())
? ? this.context.requestTaskReconfiguration();
當接收到通知框架會即時的更新配置,并且在更新前確保優雅完成當前任務
如果一個額外的線程來執行此監控,該線程必須存在于連接器中。該線程不會影響connector。然而,其他變化也會影響task,最常見的是輸入流失敗在輸入系統中,例如如果一個表被從數據庫中刪除。這時連接器需要進行更改,任務將需要處理這種異常。sinkconnectors只能處理流的加入,可以分配新的數據到task(例如,一個新的數據庫表)。框架會處理任何kafka輸入的改變,例如當組輸入topic的變化因為一個正則表達式的訂閱。sinktasks應該期待新的輸入流,可能需要在下游系統創造新的資源,如數據庫中的一個新的表。在這些情況下,可能會出現輸入流之間的沖突(同時創建新資源),其他時候,一般不需要特殊的代碼處理一系列動態流??
Dynamic Input/Output Streams
FileStream連接器是很好的例子,因為他們很簡單的,每一行是一個字符串。實際連接器都需要具有更復雜的數據格式。要創建更復雜的數據,你需要使用kafka connector數據接口:Schema,Struct
Schema schema = SchemaBuilder.struct().name(NAME)
? ? ? ? ? ? ? ? ? ? .field("name", Schema.STRING_SCHEMA)
? ? ? ? ? ? ? ? ? ? .field("age", Schema.INT_SCHEMA)
? ? ? ? ? ? ? ? ? ? .field("admin", new SchemaBuilder.boolean().defaultValue(false).build())
? ? ? ? ? ? ? ? ? ? .build();
Struct struct = new Struct(schema)
? ? ? ? ? ? ? ? ? ? ? ? ? ?.put("name", "Barbara Liskov")
? ? ? ? ? ? ? ? ? ? ? ? ? ?.put("age", 75)
? ? ? ? ? ? ? ? ? ? ? ? ? ?.build();
如果上游數據與schema數據格式不一致應該在sinktask中拋出異常
總結
以上是生活随笔為你收集整理的Kafka Connect简介的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 数据湖技术 Iceberg 的探索与实践
- 下一篇: 详解联邦学习Federated Lear