confluent connect写出到ES及ClickHouse
1 連接Elasticsearch測試
1.1 啟動confluent
/home/kafka/.local/confluent/bin/confluent start
This CLI is intended for development only, not for production https://docs.confluent.io/current/cli/index.htmlUsing CONFLUENT_CURRENT: /tmp/confluent.swpIapNw Starting zookeeper zookeeper is [UP] Starting kafka kafka is [UP] Starting schema-registry schema-registry is [UP] Starting kafka-rest kafka-rest is [UP] Starting connect connect is [UP] Starting ksql-server ksql-server is [UP] Starting control-center control-center is [UP]1.2 增加配置
vim /home/kafka/.local/confluent/etc/kafka-connect-elasticsearch/iot-elasticsearch-sink.properties
name=iot-elasticsearch-sink connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector tasks.max=1 topics=road_traffic key.ignore=true connection.url=http://10.0.165.8:9200 type.name=iot-kafka-connect batch.size=1 flush.timeout.ms=200000 topic.schema.ignore=road_traffic schema.ignore=true retry.backoff.ms=30001.3 增加connect
bin/confluent load iot-elasticsearch-sink -d etc/kafka-connect-elasticsearch/iot-elasticsearch-sink.properties
$ bin/confluent load iot-elasticsearch-sink -d etc/kafka-connect-elasticsearch/iot-elasticsearch-sink.properties This CLI is intended for development only, not for production https://docs.confluent.io/current/cli/index.htmlWarning: Install 'jq' to add support for parsing JSON {"name":"iot-elasticsearch-sink","config":{"connector.class":"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector","tasks.max":"1","topics":"road_traffic","key.ignore":"true","connection.url":"http://10.0.165.8:9200","type.name":"iot-kafka-connect","batch.size":"1","flush.timeout.ms":"200000","topic.schema.ignore":"road_traffic","schema.ignore":"true","retry.backoff.ms":"3000","name":"iot-elasticsearch-sink"},"tasks":[],"type":"sink"}查看狀態
$ bin/confluent status connectors This CLI is intended for development only, not for production https://docs.confluent.io/current/cli/index.html ["iot-elasticsearch-sink"]$ bin/confluent status iot-elasticsearch-sink This CLI is intended for development only, not for production https://docs.confluent.io/current/cli/index.html {"name":"iot-elasticsearch-sink","connector":{"state":"RUNNING","worker_id":"10.0.165.9:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"10.0.165.8:8083"}],"type":"sink"}1.4 創建kafkatopic
/home/kafka/.local/confluent/bin/kafka-topics --zookeeper fbi-local-08:2181,fbi-local-09:2181 --create --replication-factor 2 --partitions 2 --topic road_traffic
查看是否創建成功
/home/kafka/.local/confluent/bin/kafka-topics --zookeeper fbi-local-08:2181,fbi-local-09:2181 --list
1.5 生產數據
(1)添加如下的依賴
<groupId>org.example</groupId><artifactId>Manufacturing_data</artifactId><version>1.0-SNAPSHOT</version><properties><scala.binary.version>2.11</scala.binary.version><kafka.version>1.0.0</kafka.version><avro.version>1.8.0</avro.version></properties><dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_${scala.binary.version}</artifactId><version>${kafka.version}</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.66</version></dependency><dependency><groupId>org.apache.avro</groupId><artifactId>avro</artifactId><version>${avro.version}</version></dependency><dependency><groupId>org.apache.avro</groupId><artifactId>avro-tools</artifactId><version>${avro.version}</version></dependency></dependencies><build><plugins><plugin><artifactId>maven-compiler-plugin</artifactId><version>2.3.2</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin><plugin><artifactId>maven-assembly-plugin</artifactId><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins></build>(2)confluent的相關包在maven上是找不到的。需要自己手動添加,否則會報錯找不到io.confluent.kafka.serializers.KafkaAvroSerializer。
confluent-4.0.0 解壓后,其 share/java/目錄下有 confluent 各個組件的 jar 包:我們需要 confluent-common 目錄下的common-config-4.1.1.jar、common-utils-4.1.1.jar和全部以jackson開頭的 jar 包以及 kafka-serde-tools 目錄下的kafka-schema-registry-client-4.1.1.jar和kafka-avro-serializer-4.1.1.jar
復制出來在模塊下新建一個lib包放入,然后右鍵Add as Libary…
生產者代碼如下:
import java.io.File import java.util.Propertiesimport com.alibaba.fastjson.JSON import org.apache.avro.Schema import org.apache.avro.generic.{GenericData, GenericRecord} import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}case class RoadTraffic(status:String,avgMeasuredTime:Int,avgSpeed: Int, extID:String,medianMeasuredTime: Int, timestamp: Long,vehicleCount:Int,id:Long,perort_id:String,process_time:Long)object KafkaToTraffic {def main(args: Array[String]): Unit = {// kafka配置參數val props = new Properties()props.put("bootstrap.servers","10.0.165.8:9092,10.0.165.9:9092")props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")props.put("schema.registry.url", "http://10.0.165.8:8081");// Avro Schema解析val schema:Schema = new Schema.Parser().parse(new File("E:\\working\\ideaWorking\\iot_road_traffic\\src\\main\\resources\\RoadTraffic.avsc"));//val recordInjection: Injection[GenericRecord, Array[Byte]] = GenericAvroCodecs.toBinary(schema)val avroRecord:GenericData.Record = new GenericData.Record(schema)//創建一個kafka生產者val producer: KafkaProducer[String,GenericRecord] = new KafkaProducer(props)val str= "{\"status\":\"OK\",\"avgMeasuredTime\":\"53\",\"avgSpeed\":\"58\",\"extID\":\"724\",\"medianMeasuredTime\":\"53\",\"TIMESTAMP\":\"2014-04-25T19:35:00\",\"vehicleCount\":\"1\",\"id\":\"8961146\",\"perort_id\":\"179444\",\"process_time\":\"1593386110\"}"val roadTraffic = JSON.parseObject(str, classOf[RoadTraffic])System.out.println(roadTraffic)avroRecord.put("status", roadTraffic.status);avroRecord.put("avgMeasuredTime", roadTraffic.avgMeasuredTime);avroRecord.put("avgSpeed", roadTraffic.avgSpeed);avroRecord.put("extID", roadTraffic.extID);avroRecord.put("medianMeasuredTime", roadTraffic.medianMeasuredTime);avroRecord.put("timestamp", roadTraffic.timestamp);avroRecord.put("vehicleCount", roadTraffic.vehicleCount);avroRecord.put("id", roadTraffic.id);avroRecord.put("perort_id", roadTraffic.perort_id);avroRecord.put("process_time", roadTraffic.process_time);try {val record = new ProducerRecord[String, GenericRecord]("road_traffic", avroRecord)System.out.println(record.toString)producer.send(record).get()} catch {case e: Exception => e.printStackTrace()}producer.close();} }RoadTraffic.avsc
{"type": "record","name": "traffic","fields": [{"name": "status", "type": "string"},{"name": "avgMeasuredTime", "type": "int"},{"name": "avgSpeed", "type": "int"},{"name": "extID", "type": "string"},{"name": "medianMeasuredTime", "type": "int"},{"name": "timestamp", "type": "long"},{"name": "vehicleCount", "type": "int"},{"name": "id", "type": "long"},{"name": "perort_id", "type": "string"},{"name": "process_time", "type": "long"}] }1.6 查看結果
加載iot-elasticsearch-sink后啟動生產者,會自動在ES上建立與topic一樣的索引,查看es
curl -GET http://10.0.165.8:9200/road_traffic/_search
$ curl -GET http://10.0.165.8:9200/road_traffic/_search {"took":1,"timed_out":false,"_shards":{"total":5,"successful":5,"skipped":0,"failed":0},"hits":{"total":1,"max_score":1.0,"hits":[{"_index":"road_traffic","_type":"iot-kafka-connect","_id":"road_traffic+0+0","_score":1.0,"_source":{"status":"OK","avgMeasuredTime":53,"avgSpeed":58,"extID":"724","medianMeasuredTime":53,"timestamp":1398425700000,"vehicleCount":1,"id":8961146,"perort_id":"179444","process_time":1593386110}}]}}2 連接ClickHouse測試
連接ClickHouse是通過jdbc的方式
2.1 增加配置
vim /home/kafka/.local/confluent/etc/kafka-connect-jdbc/iot-clickhouse-sink.properties
name=iot-clickhouse-sink connector.class=io.confluent.connect.jdbc.JdbcSinkConnector tasks.max=1 topics=road_traffic connection.url=jdbc:clickhouse://10.0.50.1:8123/iot connection.user=default auto.create=false insert.mode=INSERT table.name.format=traffic_all errors.log.enable=true db.timezone=Asia/Shanghai2.2 增加jar包
通過jdbc連接ClickHouse是需要在/home/kafka/.local/confluent/share/java/kafka-connect-jdbc目錄下增加ClickHouse的jdbc連接的jar包:clickhouse-jdbc-0.2.4.jar
$ ll total 10952 -rw-r--r-- 1 kafka kafka 20437 Mar 27 08:37 audience-annotations-0.5.0.jar -rw-r--r-- 1 root root 211574 Jun 29 12:30 clickhouse-jdbc-0.2.4.jar -rw-r--r-- 1 kafka kafka 20903 Mar 27 08:37 common-utils-5.2.4.jar -rw-r--r-- 1 kafka kafka 87325 Mar 27 08:37 jline-0.9.94.jar -rw-r--r-- 1 kafka kafka 317816 Mar 27 08:37 jtds-1.3.1.jar -rw-r--r-- 1 kafka kafka 223878 Mar 27 08:37 kafka-connect-jdbc-5.2.4.jar -rw-r--r-- 1 kafka kafka 1292696 Mar 27 08:37 netty-3.10.6.Final.jar -rw-r--r-- 1 kafka kafka 927447 Mar 27 08:37 postgresql-42.2.10.jar -rw-r--r-- 1 kafka kafka 41203 Mar 27 08:37 slf4j-api-1.7.25.jar -rw-r--r-- 1 kafka kafka 7064881 Mar 27 08:37 sqlite-jdbc-3.25.2.jar -rw-r--r-- 1 kafka kafka 74798 Mar 27 08:37 zkclient-0.10.jar -rw-r--r-- 1 kafka kafka 906708 Mar 27 08:37 zookeeper-3.4.13.jar注意:需要重啟confluent,否則會報錯: java.sql.SQLException: No suitable driver found for jdbc:clickhouse://10.0.50.1:8123/iot
2.3 在clickhouse建庫建表
2.4 增加connect
因為前面已經在進行es測試的時候往road_traffic的主題上寫入了一條數據直接進行測試
bin/confluent load iot-clickhouse-sink -d etc/kafka-connect-jdbc/iot-clickhouse-sink.properties
[kafka@fbi-local-08 confluent]$ bin/confluent load iot-clickhouse-sink -d etc/kafka-connect-jdbc/iot-clickhouse-sink.properties This CLI is intended for development only, not for production https://docs.confluent.io/current/cli/index.htmlWarning: Install 'jq' to add support for parsing JSON {"name":"iot-clickhouse-sink","config":{"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector","tasks.max":"1","topics":"road_traffic","connection.url":"jdbc:clickhouse://10.0.50.1:8123/iot","connection.user":"default","auto.create":"false","insert.mode":"INSERT","table.name.format":"traffic_all","errors.log.enable":"true","db.timezone":"Asia/Shanghai","name":"iot-clickhouse-sink"},"tasks":[],"type":"sink"} $ bin/confluent status iot-clickhouse-sink This CLI is intended for development only, not for production https://docs.confluent.io/current/cli/index.html{"name":"iot-clickhouse-sink","connector":{"state":"RUNNING","worker_id":"10.0.165.8:8083"},"tasks":[{"id":0,"state":"FAILED","worker_id":"10.0.165.8:8083","trace":"org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:561)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: org.apache.kafka.connect.errors.ConnectException: Table \"traffic_all\" is missing and auto-creation is disabled\n\tat io.confluent.connect.jdbc.sink.DbStructure.create(DbStructure.java:88)\n\tat io.confluent.connect.jdbc.sink.DbStructure.createOrAmendIfNecessary(DbStructure.java:61)\n\tat io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:85)\n\tat io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:66)\n\tat io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:74)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:539)\n\t... 10 more\n"}],"type":"sink"}當修改了iot-clickhouse-sink.properties中的表為本地表traffic時不報錯。
解決如下:
修改kafka-connect-jdbc-5.2.4源碼,增加clickhouse的連接然后將修改編譯后的jar包上傳到/home/kafka/.local/confluent/share/java/kafka-connect-jdbc,并刪除原來的kafka-connect-jdbc-5.2.4.jar
總結
以上是生活随笔為你收集整理的confluent connect写出到ES及ClickHouse的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: php 正则 尖括号,php使用正则表达
- 下一篇: 蓝色三角_叶子长得像韭菜,花朵开得像个糖