14_clickhouse: the Kafka engine, and getting Kafka messages into a ClickHouse MergeTree table
19. The Kafka Engine
19.1. The Kafka Engine
Used together with Apache Kafka, the Kafka engine lets you subscribe to or publish data streams.
Specifying the table engine:
ENGINE = Kafka()
SETTINGS kafka_broker_list = 'host:port',
         kafka_topic_list = 'topic1,topic2,...',
         kafka_group_name = 'group_name',
         kafka_format = 'data_format'
         [, kafka_row_delimiter = 'delimiter_symbol']
         [, kafka_schema = '']
         [, kafka_num_consumers = N]
         [, kafka_skip_broken_messages = N]

Required parameters:
kafka_broker_list: a comma-separated list of brokers.
kafka_topic_list: a comma-separated list of Kafka topics.
kafka_group_name: the Kafka consumer group.
kafka_format: the message format, e.g. JSONEachRow.
Optional parameters:
kafka_row_delimiter: the delimiter between rows.
kafka_schema: a schema definition, required by formats such as Cap'n Proto.
kafka_num_consumers: the number of consumers; defaults to 1 and should not exceed the number of partitions in the topic.
kafka_skip_broken_messages: per block, the number of schema-incompatible messages the Kafka message parser will tolerate. Default: 0.
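As a quick illustration (a sketch only; the broker, topic, group, and table names below are placeholders, not values from the examples that follow), the optional settings go into the same SETTINGS list as the required ones:

CREATE TABLE queue_sketch (
    timestamp UInt64,
    level String,
    message String
) ENGINE = Kafka
SETTINGS kafka_broker_list = 'host:9092',
         kafka_topic_list = 'topic1',
         kafka_group_name = 'group1',
         kafka_format = 'JSONEachRow',
         kafka_num_consumers = 2,            -- no more than the topic's partition count
         kafka_skip_broken_messages = 10;    -- tolerate up to 10 unparsable messages per block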
Examples of creating a Kafka engine table
Example 1 (the legacy constructor syntax, the same form used for topic_ch_kafka in section 19.2):

CREATE TABLE queue (
    timestamp UInt64,
    level String,
    message String
) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow');
Example 2 (constructor arguments mixed with SETTINGS):

CREATE TABLE queue2 (
    timestamp UInt64,
    level String,
    message String
) ENGINE = Kafka('localhost:9092', 'topic', 'group1')
SETTINGS kafka_format = 'JSONEachRow',
         kafka_num_consumers = 4;

Example 3 (pure SETTINGS syntax):
CREATE TABLE queue2 (
    timestamp UInt64,
    level String,
    message String
) ENGINE = Kafka
SETTINGS kafka_broker_list = 'localhost:9092',
         kafka_topic_list = 'topic',
         kafka_group_name = 'group1',
         kafka_format = 'JSONEachRow',
         kafka_num_consumers = 4;

SELECT queries are not very useful for reading messages (except for debugging), because each message can be read only once.
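If you do want a one-off debugging peek at the stream, something like the following works on the 20.9 server used later in this chapter (newer releases may additionally require the stream_like_engine_allow_direct_select setting); note that any rows returned are consumed and will never reach the materialized views:

SELECT * FROM queue2 LIMIT 5;   -- debugging only: these messages are now consumed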
In practice, the engine is used together with a materialized view, as follows (a minimal sketch follows this list; section 19.2 walks through a complete example):
(1) Use the Kafka engine to create a Kafka consumer and treat it as a data stream.
(2) Create a table with the desired structure.
(3) Create a materialized view that transforms the data coming from the engine and puts it into the table created in the previous step.
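A minimal sketch of the three steps, with illustrative names (queue, dest, and queue_mv are placeholders):

-- (1) the Kafka consumer, treated as a data stream
CREATE TABLE queue (ts UInt64, msg String) ENGINE = Kafka
SETTINGS kafka_broker_list = 'host:9092', kafka_topic_list = 'topic1',
         kafka_group_name = 'group1', kafka_format = 'JSONEachRow';

-- (2) a table with the desired structure
CREATE TABLE dest (ts UInt64, msg String) ENGINE = MergeTree() ORDER BY ts;

-- (3) a materialized view that moves data from (1) into (2)
CREATE MATERIALIZED VIEW queue_mv TO dest
AS SELECT ts, msg FROM queue;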
When a materialized view is attached to the engine, it starts collecting data in the background. This lets you continuously receive messages from Kafka and convert them to the required format with SELECT. The views never read from Kafka directly; they receive new records block by block, which makes it possible to write into several tables at different levels of detail (with grouped aggregation or without any aggregation).
For performance, received messages are grouped into blocks of max_insert_block_size rows. If a block is not formed within stream_flush_interval_ms, the data is flushed to the table regardless of how complete the block is.
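Both thresholds are user-level settings. A sketch of making them explicit in a users.xml profile (the values shown are illustrative; 7500 ms is, as far as I know, the usual default for stream_flush_interval_ms):

<yandex>
    <profiles>
        <default>
            <!-- flush a partial block to the target tables after at most this long -->
            <stream_flush_interval_ms>7500</stream_flush_interval_ms>
            <!-- upper bound on the number of rows grouped into one block (illustrative value) -->
            <max_insert_block_size>65536</max_insert_block_size>
        </default>
    </profiles>
</yandex>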
To stop receiving topic data, or to change the transformation logic, detach the materialized view:
DETACH TABLE consumer;
ATTACH MATERIALIZED VIEW consumer;
If you want to change the target table with ALTER, disable the materialized view first to avoid discrepancies between the target table and the data produced by the view.
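A sketch of that workflow, reusing the consumer view from the detach example above and the hypothetical target table dest from the earlier sketch (the added column is purely illustrative):

DETACH TABLE consumer;                                  -- stop consuming
ALTER TABLE dest ADD COLUMN source String DEFAULT '';   -- hypothetical schema change
ATTACH MATERIALIZED VIEW consumer;                      -- resume consuming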
Extended Kafka configuration
The Kafka engine supports extended configuration through the ClickHouse configuration file.
Two configuration keys are available: the global kafka key and topic-level kafka_* keys. The global configuration is applied first, then the topic-level configuration.
For the list of possible configuration options, see the librdkafka configuration reference:
https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
In the ClickHouse configuration, underscores (_) replace dots; for example, check.crcs=true becomes <check_crcs>true</check_crcs>. The final configuration on my machine looks like this:
[root@middleware config.d]# vim kafka.xml
[root@middleware config.d]# pwd
/etc/clickhouse-server/config.d
[root@middleware config.d]# ls
kafka.xml
[root@middleware config.d]# cat kafka.xml
<yandex>
    <kafka>
        <debug>cgrp</debug>
        <auto_offset_reset>smallest</auto_offset_reset>
    </kafka>
    <!-- Configuration specific for topic "topic_ch" -->
    <kafka_topic_ch>
        <auto_offset_reset>latest</auto_offset_reset>
        <retry_backoff_ms>250</retry_backoff_ms>
        <fetch_min_bytes>100000</fetch_min_bytes>
    </kafka_topic_ch>
    <kafka_my_topic>
        <auto_offset_reset>latest</auto_offset_reset>
        <retry_backoff_ms>250</retry_backoff_ms>
        <fetch_min_bytes>100000</fetch_min_bytes>
    </kafka_my_topic>
</yandex>
[root@middleware config.d]#

19.2. Example 1
Example 1: use two tables, one holding the raw Kafka records and one holding the grouped aggregates.
Create the Kafka topic:
Reference: https://kafka.apachecn.org/quickstart.html
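For a single-node test box like the one below, a topic can be created with the ZooKeeper-based CLI that ships with Kafka 2.x; a sketch following the quickstart, with one partition and one replica:

[root@middleware kafka_2.12-2.6.0]# bin/kafka-topics.sh --create --zookeeper localhost:2181 \
    --replication-factor 1 --partitions 1 --topic topic_ch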
You can run the list command to check that the topic exists:
[root@middleware kafka_2.12-2.6.0]# bin/kafka-topics.sh --list --zookeeper localhost:2181

(1) Create the data stream over the topic:
drop table if exists topic_ch_kafka;

CREATE TABLE topic_ch_kafka (
    timestamp UInt64,
    level String,
    message String
) ENGINE = Kafka('localhost:9092', 'topic_ch', 'group_ch', 'JSONEachRow');

Console output:
middleware :) CREATE TABLE topic_ch_kafka (
:-]     timestamp UInt64,
:-]     level String,
:-]     message String
:-] ) ENGINE = Kafka('localhost:9092', 'topic_ch', 'group_ch', 'JSONEachRow');

CREATE TABLE topic_ch_kafka
(
    `timestamp` UInt64,
    `level` String,
    `message` String
)
ENGINE = Kafka('localhost:9092', 'topic_ch', 'group_ch', 'JSONEachRow')

Ok.

0 rows in set. Elapsed: 0.007 sec.

middleware :)

(2) Create the table that stores the raw records, and the corresponding materialized view:
DROP TABLE topic_ch_list;

CREATE TABLE topic_ch_list (
    timestamp UInt64,
    level String,
    message String
) ENGINE = MergeTree()
ORDER BY (timestamp);

DROP TABLE topic_ch_list_view;

CREATE MATERIALIZED VIEW topic_ch_list_view TO topic_ch_list
AS SELECT timestamp, level, message
FROM topic_ch_kafka;

Console output:
middleware :) DROP TABLE topic_ch_list;

DROP TABLE topic_ch_list

Received exception from server (version 20.9.3):
Code: 60. DB::Exception: Received from localhost:9000. DB::Exception: Table default.topic_ch_list doesn't exist..

0 rows in set. Elapsed: 0.015 sec.

middleware :) CREATE TABLE topic_ch_list (
:-]     timestamp UInt64,
:-]     level String,
:-]     message String
:-] ) ENGINE = MergeTree()
:-] order by (timestamp);

CREATE TABLE topic_ch_list
(
    `timestamp` UInt64,
    `level` String,
    `message` String
)
ENGINE = MergeTree()
ORDER BY timestamp

Ok.

0 rows in set. Elapsed: 0.006 sec.

middleware :) DROP TABLE topic_ch_list_view;

DROP TABLE topic_ch_list_view

Received exception from server (version 20.9.3):
Code: 60. DB::Exception: Received from localhost:9000. DB::Exception: Table default.topic_ch_list_view doesn't exist..

0 rows in set. Elapsed: 0.002 sec.

middleware :) CREATE MATERIALIZED VIEW topic_ch_list_view TO topic_ch_list
:-] AS SELECT timestamp, level, message
:-] FROM topic_ch_kafka;

CREATE MATERIALIZED VIEW topic_ch_list_view TO topic_ch_list
AS SELECT timestamp, level, message
FROM topic_ch_kafka

Ok.

0 rows in set. Elapsed: 0.005 sec.

middleware :)

(3) Create the aggregate table and the corresponding materialized view:
DROP TABLE topic_ch_daily;

CREATE TABLE topic_ch_daily (
    day Date,
    level String,
    total UInt64
) ENGINE = SummingMergeTree()
ORDER BY (day, level);

DROP TABLE topic_ch_daily_view;

CREATE MATERIALIZED VIEW topic_ch_daily_view TO topic_ch_daily
AS SELECT toDate(toDateTime(timestamp)) AS day, level, count() AS total
FROM topic_ch_kafka
GROUP BY day, level;

2. Produce data
[root@middleware ~]# source /etc/profile
[root@middleware ~]# $ZOOKEEPER_HOME/bin/zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /root/apache-zookeeper-3.6.2-bin/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[root@middleware ~]# # start kafka
[root@middleware ~]# cd $KAFKA_HOME
[root@middleware kafka_2.12-2.6.0]# bin/kafka-server-start.sh -daemon config/server.properties
[root@middleware kafka_2.12-2.6.0]# bin/kafka-topics.sh --list --zookeeper localhost:2181
__consumer_offsets
my_topic
test
topic
topic_ch
topic_ch2
[root@middleware kafka_2.12-2.6.0]# bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic_ch
>{"timestamp":1542426134, "level":"high", "message":"hehe"}
>{"timestamp":1542427132, "level":"high", "message":"hehe"}
>{"timestamp":1542428133, "level":"mid", "message":"hehe"}
>{"timestamp":1542429134, "level":"low", "message":"hehe"}
>{"timestamp":1542430134, "level":"high", "message":"hehe"}
>{"timestamp":1542423134, "level":"low", "message":"hehe"}
>{"timestamp":1542434434, "level":"low", "message":"hehe"}
>{"timestamp":1542444134, "level":"low", "message":"hehe"}
>{"timestamp":1542454136, "level":"high", "message":"hehe"}
>{"timestamp":1542464134, "level":"high", "message":"hehe"}
>{"timestamp":1542474134, "level":"high", "message":"hehe"}
>{"timestamp":1542484134, "level":"low", "message":"hehe"}
>{"timestamp":1542494134, "level":"high", "message":"hehe"}
>{"timestamp":1542424194, "level":"mid", "message":"hehe"}
>

View the results (the table also contains earlier, historical messages):
middleware :) select * from topic_ch_list;

SELECT * FROM topic_ch_list

┌──timestamp─┬─level─┬─message─┐
│ 1542474134 │ high  │ hehe    │
└────────────┴───────┴─────────┘
┌──timestamp─┬─level─┬─message─┐
│ 1542423134 │ low   │ hehe    │
│ 1542424132 │ high  │ hehe    │
│ 1542424132 │ high  │ hehe    │
│ 1542424133 │ mid   │ hehe    │
│ 1542424133 │ mid   │ hehe    │
│ 1542424134 │ high  │ hehe    │
│ 1542424134 │ low   │ hehe    │
│ 1542424134 │ low   │ hehe    │
│ 1542424134 │ low   │ hehe    │
│ 1542424134 │ high  │ hehe    │
│ 1542424134 │ high  │ hehe    │
│ 1542424134 │ low   │ hehe    │
│ 1542424134 │ high  │ hehe    │
│ 1542424134 │ high  │ hehe    │
│ 1542424134 │ high  │ hehe    │
│ 1542424134 │ high  │ hehe    │
│ 1542424134 │ low   │ hehe    │
│ 1542424136 │ high  │ hehe    │
│ 1542424434 │ low   │ hehe    │
│ 1542426134 │ high  │ hehe    │
│ 1542427132 │ high  │ hehe    │
│ 1542428133 │ mid   │ hehe    │
│ 1542429134 │ low   │ hehe    │
│ 1542430134 │ high  │ hehe    │
│ 1542434134 │ high  │ hehe    │
│ 1542434434 │ low   │ hehe    │
│ 1542444134 │ low   │ hehe    │
│ 1542454136 │ high  │ hehe    │
│ 1542464134 │ high  │ hehe    │
└────────────┴───────┴─────────┘
┌──timestamp─┬─level─┬─message─┐
│ 1542424194 │ mid   │ hehe    │
└────────────┴───────┴─────────┘
┌──timestamp─┬─level─┬─message─┐
│ 1542484134 │ low   │ hehe    │
└────────────┴───────┴─────────┘
┌──timestamp─┬─level─┬─message─┐
│ 1542494134 │ high  │ hehe    │
└────────────┴───────┴─────────┘

33 rows in set. Elapsed: 0.058 sec.

middleware :)

The aggregate table:
SELECT level, sum(total) FROM topic_ch_daily GROUP BY level;

The result looks like this:
┌─level─┬─sum(total)─┐
│ mid   │          3 │
│ low   │          5 │
│ high  │          8 │
└───────┴────────────┘

To stop receiving the topic, or to change the transformation logic, detach and re-attach the materialized view (run this in clickhouse-client -m):
DETACH TABLE consumer;
ATTACH TABLE consumer;

19.3. Example 2: Kafka configuration
The Kafka engine supports extended configuration through the ClickHouse configuration file. There are two configuration keys you can use: global (kafka) and topic-level (kafka_*). The global configuration is applied first, then the topic-level configuration.
Create a new configuration file in the directory /etc/clickhouse-server/config.d/. The file name is arbitrary; here it is kafka.xml.
The contents of kafka.xml:
<yandex>
    <!-- Generic settings, applied to messages from all Kafka topics -->
    <kafka>
        <debug>cgrp</debug>
        <auto_offset_reset>smallest</auto_offset_reset>
    </kafka>
    <!-- Configuration specific for topic "topic_ch" -->
    <kafka_topic_ch>
        <auto_offset_reset>latest</auto_offset_reset>
        <retry_backoff_ms>250</retry_backoff_ms>
        <fetch_min_bytes>100000</fetch_min_bytes>
    </kafka_topic_ch>
    <!-- Configuration specific for topic "my_topic" -->
    <kafka_my_topic>
        <auto_offset_reset>latest</auto_offset_reset>
        <retry_backoff_ms>250</retry_backoff_ms>
        <fetch_min_bytes>100000</fetch_min_bytes>
    </kafka_my_topic>
</yandex>

For more on these optional settings, see the librdkafka configuration reference (https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md). Use underscores (_) instead of dots in the ClickHouse configuration file; for example, check.crcs=true is written as <check_crcs>true</check_crcs>.
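Files under config.d/ are read at server startup, so restart ClickHouse after editing kafka.xml; with debug set to cgrp as above, consumer-group activity should then appear in the server log, which is a quick sanity check that the settings took effect (paths assume a default package install):

[root@middleware config.d]# systemctl restart clickhouse-server
[root@middleware config.d]# grep -i kafka /var/log/clickhouse-server/clickhouse-server.log | tail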
Summary
The Kafka engine turns a ClickHouse table into a Kafka consumer. Combined with materialized views, it continuously loads Kafka messages into MergeTree-family tables at different levels of detail, and its librdkafka behavior can be tuned globally or per topic through files in config.d.