Demo:基于 Flink SQL 构建流式应用
摘要:上周四在 Flink 中文社區(qū)釘釘群中直播分享了《Demo:基于 Flink SQL 構(gòu)建流式應(yīng)用》,直播內(nèi)容偏向?qū)崙?zhàn)演示。這篇文章是對(duì)直播內(nèi)容的一個(gè)總結(jié),并且改善了部分內(nèi)容,比如除 Flink 外其他組件全部采用 Docker Compose 安裝,簡(jiǎn)化準(zhǔn)備流程。讀者也可以結(jié)合視頻和本文一起學(xué)習(xí)。
Flink 1.10.0 于近期剛發(fā)布,釋放了許多令人激動(dòng)的新特性。尤其是 Flink SQL 模塊,發(fā)展速度非常快,因此本文特意從實(shí)踐的角度出發(fā),帶領(lǐng)大家一起探索使用 Flink SQL 如何快速構(gòu)建流式應(yīng)用。
本文將基于 Kafka, MySQL, Elasticsearch, Kibana,使用 Flink SQL 構(gòu)建一個(gè)電商用戶行為的實(shí)時(shí)分析應(yīng)用。本文所有的實(shí)戰(zhàn)演練都將在 Flink SQL CLI 上執(zhí)行,全程只涉及 SQL 純文本,無需一行 Java/Scala 代碼,無需安裝 IDE。本實(shí)戰(zhàn)演練的最終效果圖:
??準(zhǔn)備
?一臺(tái)裝有 Docker 和 Java8 的 Linux 或 MacOS 計(jì)算機(jī)。
使用 Docker Compose 啟動(dòng)容器
本實(shí)戰(zhàn)演示所依賴的組件全都編排到了容器中,因此可以通過 docker-compose 一鍵啟動(dòng)。你可以通過 wget 命令自動(dòng)下載該 docker-compose.yml 文件,也可以手動(dòng)下載。
mkdir flink-demo; cd flink-demo; wget https://raw.githubusercontent.com/wuchong/flink-sql-demo/master/docker-compose.yml該 Docker Compose 中包含的容器有:
-
DataGen:數(shù)據(jù)生成器。容器啟動(dòng)后會(huì)自動(dòng)開始生成用戶行為數(shù)據(jù),并發(fā)送到 Kafka 集群中。默認(rèn)每秒生成 1000 條數(shù)據(jù),持續(xù)生成約 3 小時(shí)。也可以更改 docker-compose.yml 中 datagen 的 speedup 參數(shù)來調(diào)整生成速率(重啟 docker compose 才能生效)。
-
MySQL:集成了 MySQL 5.7 ,以及預(yù)先創(chuàng)建好了類目表(category),預(yù)先填入了子類目與頂級(jí)類目的映射關(guān)系,后續(xù)作為維表使用。
-
Kafka:主要用作數(shù)據(jù)源。DataGen 組件會(huì)自動(dòng)將數(shù)據(jù)灌入這個(gè)容器中。
-
Zookeeper:Kafka 容器依賴。
-
Elasticsearch:主要存儲(chǔ) Flink SQL 產(chǎn)出的數(shù)據(jù)。
-
Kibana:可視化 Elasticsearch 中的數(shù)據(jù)。
在啟動(dòng)容器前,建議修改 Docker 的配置,將資源調(diào)整到 4GB 以及 4核。啟動(dòng)所有的容器,只需要在 docker-compose.yml 所在目錄下運(yùn)行如下命令。
docker-compose up -d該命令會(huì)以 detached 模式自動(dòng)啟動(dòng) Docker Compose 配置中定義的所有容器。你可以通過 docker ps 來觀察上述的五個(gè)容器是否正常啟動(dòng)了。也可以訪問 http://localhost:5601/ 來查看 Kibana 是否運(yùn)行正常。
另外可以通過如下命令停止所有的容器:
docker-compose down下載安裝 Flink 本地集群
我們推薦用戶手動(dòng)下載安裝 Flink,而不是通過 Docker 自動(dòng)啟動(dòng) Flink。因?yàn)檫@樣可以更直觀地理解 Flink 的各個(gè)組件、依賴、和腳本。
1.下載 Flink 1.10.0 安裝包并解壓(解壓目錄 flink-1.10.0):https://www.apache.org/dist/flink/flink-1.10.0/flink-1.10.0-bin-scala_2.11.tgz
2.進(jìn)入 flink-1.10.0 目錄:cd flink-1.10.0。
3.通過如下命令下載依賴 jar 包,并拷貝到 lib/ 目錄下,也可手動(dòng)下載和拷貝。因?yàn)槲覀冞\(yùn)行時(shí)需要依賴各個(gè) connector 實(shí)現(xiàn)。
wget -P ./lib/ https://repo1.maven.org/maven2/org/apache/flink/flink-json/1.10.0/flink-json-1.10.0.jar | \wget -P ./lib/ https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/1.10.0/flink-sql-connector-kafka_2.11-1.10.0.jar | \wget -P ./lib/ https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch6_2.11/1.10.0/flink-sql-connector-elasticsearch6_2.11-1.10.0.jar | \wget -P ./lib/ https://repo1.maven.org/maven2/org/apache/flink/flink-jdbc_2.11/1.10.0/flink-jdbc_2.11-1.10.0.jar | \wget -P ./lib/ https://repo1.maven.org/maven2/mysql/mysql-connector-java/5.1.48/mysql-connector-java-5.1.48.jar4.將 conf/flink-conf.yaml 中的 taskmanager.numberOfTaskSlots?修改成 10,因?yàn)槲覀儠?huì)同時(shí)運(yùn)行多個(gè)任務(wù)。??
5.?執(zhí)行 ./bin/start-cluster.sh,啟動(dòng)集群。
運(yùn)行成功的話,可以在 http://localhost:8081 訪問到 Flink Web UI。并且可以看到可用 Slots 數(shù)為 10 個(gè)。
6.執(zhí)行 bin/sql-client.sh embedded 啟動(dòng) SQL CLI。便會(huì)看到如下的松鼠歡迎界面。
使用 DDL 創(chuàng)建 Kafka 表
Datagen 容器在啟動(dòng)后會(huì)往 Kafka 的 user_behavior topic 中持續(xù)不斷地寫入數(shù)據(jù)。數(shù)據(jù)包含了2017年11月27日一天的用戶行為(行為包括點(diǎn)擊、購(gòu)買、加購(gòu)、喜歡),每一行表示一條用戶行為,以 JSON 的格式由用戶ID、商品ID、商品類目ID、行為類型和時(shí)間組成。該原始數(shù)據(jù)集來自阿里云天池公開數(shù)據(jù)集,特此鳴謝。
我們可以在 docker-compose.yml 所在目錄下運(yùn)行如下命令,查看 Kafka 集群中生成的前10條數(shù)據(jù)。
docker-compose exec kafka bash -c 'kafka-console-consumer.sh --topic user_behavior --bootstrap-server kafka:9094 --from-beginning --max-messages 10' {"user_id": "952483", "item_id":"310884", "category_id": "4580532", "behavior": "pv", "ts": "2017-11-27T00:00:00Z"} {"user_id": "794777", "item_id":"5119439", "category_id": "982926", "behavior": "pv", "ts": "2017-11-27T00:00:00Z"} ...有了數(shù)據(jù)源后,我們就可以用 DDL 去創(chuàng)建并連接這個(gè) Kafka 中的 topic 了。在 Flink SQL CLI 中執(zhí)行該 DDL。
CREATE TABLE user_behavior (user_id BIGINT,item_id BIGINT,category_id BIGINT,behavior STRING,ts TIMESTAMP(3),proctime as PROCTIME(), -- 通過計(jì)算列產(chǎn)生一個(gè)處理時(shí)間列WATERMARK FOR ts as ts - INTERVAL '5' SECOND -- 在ts上定義watermark,ts成為事件時(shí)間列 ) WITH ('connector.type' = 'kafka', -- 使用 kafka connector'connector.version' = 'universal', -- kafka 版本,universal 支持 0.11 以上的版本'connector.topic' = 'user_behavior', -- kafka topic'connector.startup-mode' = 'earliest-offset', -- 從起始 offset 開始讀取'connector.properties.zookeeper.connect' = 'localhost:2181', -- zookeeper 地址'connector.properties.bootstrap.servers' = 'localhost:9092', -- kafka broker 地址'format.type' = 'json' -- 數(shù)據(jù)源格式為 json );如上我們按照數(shù)據(jù)的格式聲明了 5 個(gè)字段,除此之外,我們還通過計(jì)算列語法和 PROCTIME() 內(nèi)置函數(shù)聲明了一個(gè)產(chǎn)生處理時(shí)間的虛擬列。我們還通過 WATERMARK 語法,在 ts 字段上聲明了 watermark 策略(容忍5秒亂序), ts 字段因此也成了事件時(shí)間列。關(guān)于時(shí)間屬性以及 DDL 語法可以閱讀官方文檔了解更多:
-
時(shí)間屬性:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/time_attributes.html
-
DDL:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/create.html#create-table
在 SQL CLI 中成功創(chuàng)建 Kafka 表后,可以通過 show tables; 和 describe user_behavior; 來查看目前已注冊(cè)的表,以及表的詳細(xì)信息。我們也可以直接在 SQL CLI 中運(yùn)行 SELECT * FROM user_behavior; 預(yù)覽下數(shù)據(jù)(按q退出)。
接下來,我們會(huì)通過三個(gè)實(shí)戰(zhàn)場(chǎng)景來更深入地了解 Flink SQL 。
統(tǒng)計(jì)每小時(shí)的成交量
使用 DDL 創(chuàng)建 Elasticsearch 表
我們先在 SQL CLI 中創(chuàng)建一個(gè) ES 結(jié)果表,根據(jù)場(chǎng)景需求主要需要保存兩個(gè)數(shù)據(jù):小時(shí)、成交量。
CREATE TABLE buy_cnt_per_hour ( hour_of_day BIGINT,buy_cnt BIGINT ) WITH ('connector.type' = 'elasticsearch', -- 使用 elasticsearch connector'connector.version' = '6', -- elasticsearch 版本,6 能支持 es 6+ 以及 7+ 的版本'connector.hosts' = 'http://localhost:9200', -- elasticsearch 地址'connector.index' = 'buy_cnt_per_hour', -- elasticsearch 索引名,相當(dāng)于數(shù)據(jù)庫(kù)的表名'connector.document-type' = 'user_behavior', -- elasticsearch 的 type,相當(dāng)于數(shù)據(jù)庫(kù)的庫(kù)名'connector.bulk-flush.max-actions' = '1', -- 每條數(shù)據(jù)都刷新'format.type' = 'json', -- 輸出數(shù)據(jù)格式 json'update-mode' = 'append' );我們不需要在 Elasticsearch 中事先創(chuàng)建 buy_cnt_per_hour 索引,Flink Job 會(huì)自動(dòng)創(chuàng)建該索引。
提交 Query
統(tǒng)計(jì)每小時(shí)的成交量就是每小時(shí)共有多少 "buy" 的用戶行為。因此會(huì)需要用到 TUMBLE 窗口函數(shù),按照一小時(shí)切窗。然后每個(gè)窗口分別統(tǒng)計(jì) "buy" 的個(gè)數(shù),這可以通過先過濾出 "buy" 的數(shù)據(jù),然后 COUNT(*) 實(shí)現(xiàn)。
INSERT INTO buy_cnt_per_hour SELECT HOUR(TUMBLE_START(ts, INTERVAL '1' HOUR)), COUNT(*) FROM user_behavior WHERE behavior = 'buy' GROUP BY TUMBLE(ts, INTERVAL '1' HOUR);這里我們使用 HOUR 內(nèi)置函數(shù),從一個(gè) TIMESTAMP 列中提取出一天中第幾個(gè)小時(shí)的值。使用了 INSERT INTO將 query 的結(jié)果持續(xù)不斷地插入到上文定義的 es 結(jié)果表中(可以將 es 結(jié)果表理解成 query 的物化視圖)。另外可以閱讀該文檔了解更多關(guān)于窗口聚合的內(nèi)容:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html#group-windows
在 Flink SQL CLI 中運(yùn)行上述查詢后,在 Flink Web UI 中就能看到提交的任務(wù),該任務(wù)是一個(gè)流式任務(wù),因此會(huì)一直運(yùn)行。
可以看到凌晨是一天中成交量的低谷。
使用 Kibana 可視化結(jié)果
我們已經(jīng)通過 Docker Compose 啟動(dòng)了 Kibana 容器,可以通過 http://localhost:5601 訪問 Kibana。首先我們需要先配置一個(gè) index pattern。點(diǎn)擊左側(cè)工具欄的 "Management",就能找到 "Index Patterns"。點(diǎn)擊 "Create Index Pattern",然后通過輸入完整的索引名 "buy_cnt_per_hour" 創(chuàng)建 index pattern。創(chuàng)建完成后, Kibana 就知道了我們的索引,我們就可以開始探索數(shù)據(jù)了。
先點(diǎn)擊左側(cè)工具欄的"Discovery"按鈕,Kibana 就會(huì)列出剛剛創(chuàng)建的索引中的內(nèi)容。
接下來,我們先創(chuàng)建一個(gè) Dashboard 用來展示各個(gè)可視化的視圖。點(diǎn)擊頁面左側(cè)的"Dashboard",創(chuàng)建一個(gè)名為 ”用戶行為日志分析“ 的Dashboard。然后點(diǎn)擊 "Create New" 創(chuàng)建一個(gè)新的視圖,選擇 "Area" 面積圖,選擇 "buy_cnt_per_hour" 索引,按照如下截圖中的配置(左側(cè))畫出成交量面積圖,并保存為”每小時(shí)成交量“。
統(tǒng)計(jì)一天每10分鐘累計(jì)獨(dú)立用戶數(shù)
另一個(gè)有意思的可視化是統(tǒng)計(jì)一天中每一刻的累計(jì)獨(dú)立用戶數(shù)(uv),也就是每一刻的 uv 數(shù)都代表從0點(diǎn)到當(dāng)前時(shí)刻為止的總計(jì) uv 數(shù),因此該曲線肯定是單調(diào)遞增的。
我們?nèi)匀幌仍?SQL CLI 中創(chuàng)建一個(gè) Elasticsearch 表,用于存儲(chǔ)結(jié)果匯總數(shù)據(jù)。主要有兩個(gè)字段:時(shí)間和累積 uv 數(shù)。
CREATE TABLE cumulative_uv (time_str STRING,uv BIGINT ) WITH ('connector.type' = 'elasticsearch','connector.version' = '6','connector.hosts' = 'http://localhost:9200','connector.index' = 'cumulative_uv','connector.document-type' = 'user_behavior','format.type' = 'json','update-mode' = 'upsert' );為了實(shí)現(xiàn)該曲線,我們可以先通過 OVER WINDOW 計(jì)算出每條數(shù)據(jù)的當(dāng)前分鐘,以及當(dāng)前累計(jì) uv(從0點(diǎn)開始到當(dāng)前行為止的獨(dú)立用戶數(shù))。uv 的統(tǒng)計(jì)我們通過內(nèi)置的 COUNT(DISTINCT user_id)來完成,Flink SQL 內(nèi)部對(duì) COUNT DISTINCT 做了非常多的優(yōu)化,因此可以放心使用。
CREATE VIEW uv_per_10min AS SELECT MAX(SUBSTR(DATE_FORMAT(ts, 'HH:mm'),1,4) || '0') OVER w AS time_str, COUNT(DISTINCT user_id) OVER w AS uv FROM user_behavior WINDOW w AS (ORDER BY proctime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW);這里我們使用 SUBSTR 和 ?DATE_FORMAT 還有 || 內(nèi)置函數(shù),將一個(gè) TIMESTAMP 字段轉(zhuǎn)換成了 10分鐘單位的時(shí)間字符串,如: 12:10, 12:20。關(guān)于 OVER WINDOW 的更多內(nèi)容可以參考文檔:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html#aggregations
我們還使用了 CREATE VIEW 語法將 query 注冊(cè)成了一個(gè)邏輯視圖,可以方便地在后續(xù)查詢中對(duì)該 query 進(jìn)行引用,這有利于拆解復(fù)雜 query。注意,創(chuàng)建邏輯視圖不會(huì)觸發(fā)作業(yè)的執(zhí)行,視圖的結(jié)果也不會(huì)落地,因此使用起來非常輕量,沒有額外開銷。由于 uv_per_10min 每條輸入數(shù)據(jù)都產(chǎn)生一條輸出數(shù)據(jù),因此對(duì)于存儲(chǔ)壓力較大。我們可以基于 uv_per_10min 再根據(jù)分鐘時(shí)間進(jìn)行一次聚合,這樣每10分鐘只有一個(gè)點(diǎn)會(huì)存儲(chǔ)在 Elasticsearch 中,對(duì)于 Elasticsearch 和 Kibana 可視化渲染的壓力會(huì)小很多。
INSERT INTO cumulative_uv SELECT time_str, MAX(uv) FROM uv_per_10min GROUP BY time_str;提交上述查詢后,在 Kibana 中創(chuàng)建 cumulative_uv 的 index pattern,然后在 Dashboard 中創(chuàng)建一個(gè)"Line"折線圖,選擇 cumulative_uv 索引,按照如下截圖中的配置(左側(cè))畫出累計(jì)獨(dú)立用戶數(shù)曲線,并保存。
頂級(jí)類目排行榜
最后一個(gè)有意思的可視化是類目排行榜,從而了解哪些類目是支柱類目。不過由于源數(shù)據(jù)中的類目分類太細(xì)(約5000個(gè)類目),對(duì)于排行榜意義不大,因此我們希望能將其歸約到頂級(jí)類目。所以筆者在 mysql 容器中預(yù)先準(zhǔn)備了子類目與頂級(jí)類目的映射數(shù)據(jù),用作維表。
在 SQL CLI 中創(chuàng)建 MySQL 表,后續(xù)用作維表查詢。
CREATE TABLE category_dim (sub_category_id BIGINT, -- 子類目parent_category_id BIGINT -- 頂級(jí)類目 ) WITH ('connector.type' = 'jdbc','connector.url' = 'jdbc:mysql://localhost:3306/flink','connector.table' = 'category','connector.driver' = 'com.mysql.jdbc.Driver','connector.username' = 'root','connector.password' = '123456','connector.lookup.cache.max-rows' = '5000','connector.lookup.cache.ttl' = '10min' );同時(shí)我們?cè)賱?chuàng)建一個(gè) Elasticsearch 表,用于存儲(chǔ)類目統(tǒng)計(jì)結(jié)果。
CREATE TABLE top_category (category_name STRING, -- 類目名稱buy_cnt BIGINT -- 銷量 ) WITH ('connector.type' = 'elasticsearch','connector.version' = '6','connector.hosts' = 'http://localhost:9200','connector.index' = 'top_category','connector.document-type' = 'user_behavior','format.type' = 'json','update-mode' = 'upsert' );第一步我們通過維表關(guān)聯(lián),補(bǔ)全類目名稱。我們?nèi)匀皇褂?CREATE VIEW 將該查詢注冊(cè)成一個(gè)視圖,簡(jiǎn)化邏輯。維表關(guān)聯(lián)使用 temporal join 語法,可以查看文檔了解更多:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/joins.html#join-with-a-temporal-table
CREATE VIEW rich_user_behavior AS SELECT U.user_id, U.item_id, U.behavior, CASE C.parent_category_idWHEN 1 THEN '服飾鞋包'WHEN 2 THEN '家裝家飾'WHEN 3 THEN '家電'WHEN 4 THEN '美妝'WHEN 5 THEN '母嬰'WHEN 6 THEN '3C數(shù)碼'WHEN 7 THEN '運(yùn)動(dòng)戶外'WHEN 8 THEN '食品'ELSE '其他'END AS category_name FROM user_behavior AS U LEFT JOIN category_dim FOR SYSTEM_TIME AS OF U.proctime AS C ON U.category_id = C.sub_category_id;最后根據(jù) 類目名稱分組,統(tǒng)計(jì)出 buy 的事件數(shù),并寫入 Elasticsearch 中。
INSERT INTO top_category SELECT category_name, COUNT(*) buy_cnt FROM rich_user_behavior WHERE behavior = 'buy' GROUP BY category_name;提交上述查詢后,在 Kibana 中創(chuàng)建 top_category 的 index pattern,然后在 Dashboard 中創(chuàng)建一個(gè)"Horizontal Bar"條形圖,選擇 top_category 索引,按照如下截圖中的配置(左側(cè))畫出類目排行榜,并保存。
可以看到服飾鞋包的成交量遠(yuǎn)遠(yuǎn)領(lǐng)先其他類目。
Kibana 還提供了非常豐富的圖形和可視化選項(xiàng),感興趣的用戶可以用 Flink SQL 對(duì)數(shù)據(jù)進(jìn)行更多維度的分析,并使用 Kibana 展示出可視化圖,并觀測(cè)圖形數(shù)據(jù)的實(shí)時(shí)變化。
結(jié)尾
在本文中,我們展示了如何使用 Flink SQL 集成 Kafka, MySQL, Elasticsearch 以及 Kibana 來快速搭建一個(gè)實(shí)時(shí)分析應(yīng)用。整個(gè)過程無需一行 Java/Scala 代碼,使用 SQL 純文本即可完成。期望通過本文,可以讓讀者了解到 Flink SQL 的易用和強(qiáng)大,包括輕松連接各種外部系統(tǒng)、對(duì)事件時(shí)間和亂序數(shù)據(jù)處理的原生支持、維表關(guān)聯(lián)、豐富的內(nèi)置函數(shù)等等。希望你能喜歡我們的實(shí)戰(zhàn)演練,并從中獲得樂趣和知識(shí)!
作者介紹:
伍翀(云邪),Apache Flink PMC member & Committer,阿里巴巴技術(shù)專家,北京理工大學(xué)碩士畢業(yè)。2015年加入阿里巴巴,從事 JStorm 的開發(fā)與設(shè)計(jì)。自2016年開始長(zhǎng)期活躍于 Flink 社區(qū),Flink/Blink SQL 模塊的核心開發(fā)之一。
《新程序員》:云原生和全面數(shù)字化實(shí)踐50位技術(shù)專家共同創(chuàng)作,文字、視頻、音頻交互閱讀總結(jié)
以上是生活随笔為你收集整理的Demo:基于 Flink SQL 构建流式应用的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 白话 Session 与 Cookie:
- 下一篇: 性能提升约 7 倍!Apache Fli