Kafka2.5->Flink1.12->Mysql8(Jark实验改为DDL形式)
##############################################實驗目的和環境###############################################
本文是為了在最新的版本上復現[1],環境如下:
| 組件 | 版本 |
| Zookeeper | 3.6.0 |
| Flink | 1.12 |
| Mysql | 8.0.22-0ubuntu0.20.04.2 |
| Kafka | 2.5.0 |
[1]的作者Jark 采用了Java代碼的形式,這篇博客對其流程進行了等效簡化,
采用Flink SQL Client上純DDL+SQL 的形式,全篇無一行Java代碼.
?
注意關閉防火墻
service firewalld stop
啟動zookeeper,hadoop(這個應該沒用,但是平時習慣了,也啟動吧),flink,mysql,kafka
138210 StandaloneSessionClusterEntrypoint
24593 ResourceManager
115362 QuorumPeerMain
24006 DataNode
124727 Kafka
138491 TaskManagerRunner
138603 Jps
23817 NameNode
24315 SecondaryNameNode
79162 SqlClient
24798 NodeManager
############################################################################################################
文件作用解析:
| Jark給的文件 | 文件的作用 | 需要修改的地方 |
| kafka-common.sh | kafka生產端輸入數據 ? | brokers/ids/1 localhost改成自己的節點域名 |
| run.sh | 用來在 Web UI 中看到拓撲 | 無 |
| source-generator.sh | 創建topic 往kafka里面填充數據 | --broker-list后面改掉 topic改成自己需要的 |
| pom.xml | 依賴文件 | 版本號根據自己需要修改即可 |
| kafka-consumer.sh | kafka消費端 | ? |
| env.sh | 環境變量設置 | FLINK_DIR KAFKA_DIR |
| src/main/resources/user_behavior.log | 數據來源 | 無 |
| src/main/resources/q1.sql | source table定義 sink table定義 | connector.properties.0.value connector.properties.1.value connector.url connector.username connector.password ? |
| *.java | ? | 無 |
操作步驟:
| 操作命令 | 作用 |
| ? 一些準備工作 mysql> create database `flink-test`;(因為這里有橫杠,所以需要使用``包起來) datagrip建表語句 create table pvuv_sink 注意,dt不要使用varchar,否則會導致無法設定為primary key | 在mysql中建立sink表格 |
| intellij運行SqlSubmit | 生成flink-sql-submit.jar 被下面的source-generator.sh調用 |
| ./source-generator.sh | 創建kafka的topic,然后往里面填充數據 |
| ./run.sh q1 | ?提交成功后,可以在 Web UI 中看到拓撲 |
該實驗的kafka主題是user_behavior
############################################################################################################
kafka常用操作如下:
| 操作 | 命令 | 備注 |
| 查看topic | $KAFKA/bin/kafka-topics.sh --list --zookeeper Desktop:2181 | 如果想刪除topic,可以是: ?
|
| 往user_behavior這個 topic發送 json 消息 | $KAFKA/bin/kafka-console-producer.sh --broker-list Desktop:9091 --topic?user_behavior | 這里可能碰到[2]中的報錯,注意檢查命令中端口與配置文件server.properties中的listeners的端口嚴格保持一致 [2]中的報錯還可能是某個節點的kafka掛掉導致的. ? 可能碰到[3] 注意關閉防火墻 ? ? |
| 使用kafka自帶消費端測試下消費 | $KAFKA/bin/kafka-console-consumer.sh --bootstrap-server Desktop:9091 --from-beginning --topic?user_behavior | 如果kafka自帶消費者測試有問題,那么就不用繼續往下面做了, 此時如果使用Flink SQL Client來消費也必然會出現問題 |
| 清除topic中所有數據[6](因為,萬一你輸錯了呢?對吧) | $KAFKA/bin/kafka-topics.sh --zookeeper Desktop:2181 --delete --topic?user_behavior | 需要$KAFKA/config/server.properties設置 delete.topic.enable=true |
傳入kafka的user_behavior的數據舉例如下(完整數據集):
{"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "662867", "item_id":"2244074", "category_id": "1575622", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "561558", "item_id":"3611281", "category_id": "965809", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "894923", "item_id":"3076029", "category_id": "1879194", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "834377", "item_id":"4541270", "category_id": "3738615", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "315321", "item_id":"942195", "category_id": "4339722", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "625915", "item_id":"1162383", "category_id": "570735", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "578814", "item_id":"176722", "category_id": "982926", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "873335", "item_id":"1256540", "category_id": "1451783", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "429984", "item_id":"4625350", "category_id": "2355072", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "866796", "item_id":"534083", "category_id": "4203730", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "937166", "item_id":"321683", "category_id": "2355072", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
############################################################################################################
下面時對q1.sql中的DDL/SQL的簡要介紹
| DDL/SQL語句 | 作用 |
| CREATE TABLE user_log ( | 接收數據源頭 |
| CREATE TABLE pvuv_sink ( | 數據存儲目標 |
| INSERT INTO pvuv_sink SELECT ? DATE_FORMAT(ts, 'yyyy-MM-dd HH:00') dt, ? COUNT(*) AS pv, ? COUNT(DISTINCT user_id) AS uv FROM user_log GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd HH:00'); | 對source的數據的處理, 指定要存入的sink |
網頁可能會污染上述DDL導致各種報錯,以下面為準:
https://gitee.com/appleyuchi/Flink_Code/blob/master/FLINK讀寫各種數據源/flink-sql-submit/src/main/resources/q1.sql
?
整個實驗操作步驟流程其實就是一句話:
kafka往user_behavior這個topic填入數據以后
啟動Flink SQL Client
$FLINK_HOME/bin/sql-client.sh embedded
然后上面2句DDL+1句SQL全部復制到Flink SQL Client按下回車,
就會自動生成任務提交到Flink集群,實驗結束.
?
?
最終實驗效果如下:
?
一點題外話:
整個實驗運作起來后,硬盤磁頭一直在響,所以一旦看到mysql中有數據sink以后,
立刻關掉flink集群,不然實在太傷硬盤了.
Reference:
[1]Flink 1.9 實戰:使用 SQL 讀取 Kafka 并寫入 MySQ
[2]Flink 1.10 SQL 讀寫Kafka
創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎總結
以上是生活随笔為你收集整理的Kafka2.5->Flink1.12->Mysql8(Jark实验改为DDL形式)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Flink SQL Client读Kaf
- 下一篇: IPv6地址配置方式 IPV6地址设置)