ClickHouse(21)ClickHouse集成Kafka表引擎详细解析
-
Kafka表集成引擎
-
配置
- Kerberos 支持
- 虛擬列
-
配置
- 資料分享
- 參考文章
Kafka表集成引擎
此引擎與Apache Kafka結合使用。
Kafka 特性:
- 發布或者訂閱數據流。
- 容錯存儲機制。
- 處理流數據。
老版Kafka集成表引擎參數格式:
Kafka(kafka_broker_list, kafka_topic_list, kafka_group_name, kafka_format
[, kafka_row_delimiter, kafka_schema, kafka_num_consumers])
新版Kafka集成表引擎參數格式:
Kafka SETTINGS
kafka_broker_list = 'localhost:9092',
kafka_topic_list = 'topic1,topic2',
kafka_group_name = 'group1',
kafka_format = 'JSONEachRow',
kafka_row_delimiter = '\n',
kafka_schema = '',
kafka_num_consumers = 2
必要參數:
-
kafka_broker_list– 以逗號分隔的 brokers 列表 (localhost:9092)。 -
kafka_topic_list– topic 列表 (my_topic)。 -
kafka_group_name– Kafka 消費組名稱 (group1)。如果不希望消息在集群中重復,請在每個分片中使用相同的組名。 -
kafka_format– 消息體格式。使用與 SQL 部分的FORMAT函數相同表示方法,例如JSONEachRow。
可選參數:
-
kafka_row_delimiter- 每個消息體(記錄)之間的分隔符。 -
kafka_schema– 如果解析格式需要一個 schema 時,此參數必填。 -
kafka_num_consumers– 單個表的消費者數量。默認值是:1,如果一個消費者的吞吐量不足,則指定更多的消費者。消費者的總數不應該超過 topic 中分區的數量,因為每個分區只能分配一個消費者。
ClickHouse可以接受和返回各種格式的數據。受支持的輸入格式可用于提交給INSERT語句、從文件表(File,URL,HDFS或者外部目錄)執行SELECT語句,受支持的輸出格式可用于格式化SELECT語句的返回結果,或者通過INSERT寫入到文件表。
以下kafka_format是支持的格式,ClickHouse可以接受和返回各種格式的數據。受支持的輸入格式可用于提交給INSERT語句、從文件表(File,URL,HDFS或者外部目錄)執行SELECT語句,受支持的輸出格式可用于格式化SELECT語句的返回結果,或者通過INSERT寫入到文件表。
| 格式 | 輸入 | 輸出 |
|---|---|---|
| [TabSeparated] | ? | ? |
| [TabSeparatedRaw] | ? | ? |
| [TabSeparatedWithNames] | ? | ? |
| [TabSeparatedWithNamesAndTypes] | ? | ? |
| [Template] | ? | ? |
| [TemplateIgnoreSpaces] | ? | ? |
| [CSV] | ? | ? |
| [CSVWithNames] | ? | ? |
| [CustomSeparated] | ? | ? |
| [Values] | ? | ? |
| [Vertical] | ? | ? |
| [JSON] | ? | ? |
| [JSONAsString] | ? | ? |
| [JSONStrings] | ? | ? |
| [JSONCompact] | ? | ? |
| [JSONCompactStrings] | ? | ? |
| [JSONEachRow] | ? | ? |
| [JSONEachRowWithProgress] | ? | ? |
| [JSONStringsEachRow] | ? | ? |
| [JSONStringsEachRowWithProgress] | ? | ? |
| [JSONCompactEachRow] | ? | ? |
| [JSONCompactEachRowWithNamesAndTypes] | ? | ? |
| [JSONCompactStringsEachRow] | ? | ? |
| [JSONCompactStringsEachRowWithNamesAndTypes] | ? | ? |
| [TSKV] | ? | ? |
| [Pretty] | ? | ? |
| [PrettyCompact] | ? | ? |
| [PrettyCompactMonoBlock] | ? | ? |
| [PrettyNoEscapes] | ? | ? |
| [PrettySpace] | ? | ? |
| [Protobuf] | ? | ? |
| [ProtobufSingle] | ? | ? |
| [Avro] | ? | ? |
| [AvroConfluent] | ? | ? |
| [Parquet] | ? | ? |
| [Arrow] | ? | ? |
| [ArrowStream] | ? | ? |
| [ORC] | ? | ? |
| [RowBinary] | ? | ? |
| [RowBinaryWithNamesAndTypes] | ? | ? |
| [Native] | ? | ? |
| [Null] | ? | ? |
| [XML] | ? | ? |
| [CapnProto] | ? | ? |
| [LineAsString] | ? | ? |
| [Regexp] | ? | ? |
| [RawBLOB] | ? | ? |
示例:
CREATE TABLE queue (
timestamp UInt64,
level String,
message String
) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow');
SELECT * FROM queue LIMIT 5;
CREATE TABLE queue2 (
timestamp UInt64,
level String,
message String
) ENGINE = Kafka SETTINGS kafka_broker_list = 'localhost:9092',
kafka_topic_list = 'topic',
kafka_group_name = 'group1',
kafka_format = 'JSONEachRow',
kafka_num_consumers = 4;
CREATE TABLE queue2 (
timestamp UInt64,
level String,
message String
) ENGINE = Kafka('localhost:9092', 'topic', 'group1')
SETTINGS kafka_format = 'JSONEachRow',
kafka_num_consumers = 4;
消費的消息會被自動追蹤,因此每個消息在不同的消費組里只會記錄一次。如果希望獲得兩次數據,則使用另一個組名創建副本。
消費組可以靈活配置并且在集群之間同步。例如,如果群集中有10個主題和5個表副本,則每個副本將獲得2個主題。 如果副本數量發生變化,主題將自動在副本中重新分配。
SELECT 查詢對于讀取消息并不是很有用(調試除外),因為每條消息只能被讀取一次。使用物化視圖創建實時線程更實用。您可以這樣做:
- 使用引擎創建一個 Kafka 消費者并作為一條數據流。
- 創建一個結構表。
- 創建物化視圖,改視圖會在后臺轉換引擎中的數據并將其放入之前創建的表中。
當 MATERIALIZED VIEW 添加至引擎,它將會在后臺收集數據。可以持續不斷地從 Kafka 收集數據并通過 SELECT 將數據轉換為所需要的格式。
示例:
CREATE TABLE queue (
timestamp UInt64,
level String,
message String
) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow');
CREATE TABLE daily (
day Date,
level String,
total UInt64
) ENGINE = SummingMergeTree(day, (day, level), 8192);
CREATE MATERIALIZED VIEW consumer TO daily
AS SELECT toDate(toDateTime(timestamp)) AS day, level, count() as total
FROM queue GROUP BY day, level;
SELECT level, sum(total) FROM daily GROUP BY level;
為了提高性能,接受的消息被分組為max_insert_block_size大小的塊。如果未在stream_flush_interval_ms毫秒內形成塊,則不關心塊的完整性,都會將數據刷新到表中。
停止接收主題數據或更改轉換邏輯,請 detach 物化視圖:
DETACH TABLE consumer;
ATTACH TABLE consumer;
如果使用 ALTER 更改目標表,為了避免目標表與視圖中的數據之間存在差異,推薦停止物化視圖。
配置
與 GraphiteMergeTree 類似,Kafka 引擎支持使用ClickHouse配置文件進行擴展配置。可以使用兩個配置鍵:全局 (kafka) 和 主題級別 (kafka_*)。首先應用全局配置,然后應用主題級配置(如果存在)。
<!-- Global configuration options for all tables of Kafka engine type -->
<kafka>
<debug>cgrp</debug>
<auto_offset_reset>smallest</auto_offset_reset>
</kafka>
<!-- Configuration specific for topic "logs" -->
<kafka_logs>
<retry_backoff_ms>250</retry_backoff_ms>
<fetch_min_bytes>100000</fetch_min_bytes>
</kafka_logs>
在ClickHouse配置中使用下劃線 (_) ,并不是使用點 (.)。例如,check.crcs=true 將是 <check_crcs>true</check_crcs>。
Kerberos 支持
對于使用了kerberos的kafka, 將security_protocol 設置為sasl_plaintext就夠了,如果kerberos的ticket是由操作系統獲取和緩存的。
clickhouse也支持自己使用keyfile的方式來維護kerbros的憑證。配置sasl_kerberos_service_name、sasl_kerberos_keytab、sasl_kerberos_principal三個子元素就可以。
示例:
<!-- Kerberos-aware Kafka -->
<kafka>
<security_protocol>SASL_PLAINTEXT</security_protocol>
<sasl_kerberos_keytab>/home/kafkauser/kafkauser.keytab</sasl_kerberos_keytab>
<sasl_kerberos_principal>kafkauser/kafkahost@EXAMPLE.COM</sasl_kerberos_principal>
</kafka>
虛擬列
-
_topic– Kafka 主題。 -
_key– 信息的鍵。 -
_offset– 消息的偏移量。 -
_timestamp– 消息的時間戳。 -
_timestamp_ms– 消息的時間戳(毫秒)。 -
_partition– Kafka 主題的分區。
資料分享
ClickHouse經典中文文檔分享
參考文章
- ClickHouse(01)什么是ClickHouse,ClickHouse適用于什么場景
- ClickHouse(02)ClickHouse架構設計介紹概述與ClickHouse數據分片設計
- ClickHouse(03)ClickHouse怎么安裝和部署
- ClickHouse(04)如何搭建ClickHouse集群
- ClickHouse(05)ClickHouse數據類型詳解
- ClickHouse(06)ClickHouse建表語句DDL詳細解析
- ClickHouse(07)ClickHouse數據庫引擎解析
- ClickHouse(08)ClickHouse表引擎概況
- ClickHouse(09)ClickHouse合并樹MergeTree家族表引擎之MergeTree詳細解析
- ClickHouse(10)ClickHouse合并樹MergeTree家族表引擎之ReplacingMergeTree詳細解析
- ClickHouse(11)ClickHouse合并樹MergeTree家族表引擎之SummingMergeTree詳細解析
- ClickHouse(12)ClickHouse合并樹MergeTree家族表引擎之AggregatingMergeTree詳細解析
- ClickHouse(13)ClickHouse合并樹MergeTree家族表引擎之CollapsingMergeTree詳細解析
- ClickHouse(14)ClickHouse合并樹MergeTree家族表引擎之VersionedCollapsingMergeTree詳細解析
- ClickHouse(15)ClickHouse合并樹MergeTree家族表引擎之GraphiteMergeTree詳細解析
- ClickHouse(16)ClickHouse日志引擎Log詳細解析
- ClickHouse(17)ClickHouse集成JDBC表引擎詳細解析
- ClickHouse(18)ClickHouse集成ODBC表引擎詳細解析
- ClickHouse(19)ClickHouse集成Hive表引擎詳細解析
- ClickHouse(20)ClickHouse集成PostgreSQL表引擎詳細解析
總結
以上是生活随笔為你收集整理的ClickHouse(21)ClickHouse集成Kafka表引擎详细解析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 聊一聊如何整合Microsoft.Ext
- 下一篇: 文心一言 VS 讯飞星火 VS chat