Flink进行Kafka事实表与Mysql维度表Join(纯DDL/DML方式)
概述:
對參考鏈接[1]進行DDL上的復現。
一些基本的業務常識
| ? | 來源載體 | 數據特點 |
| 維表 | Mysql/Csv/Hbase | 很少變化 |
| 事實表 | Kafka | 不停變化 |
?
開發環境與準備工作
| 組件 | 版本 |
| Flink(HA) | 1.12 |
| Zookeeper | 3.6.0 |
| Hadoop | 3.1.2 |
| Kafka(HA) | 2.5.0 |
| Mysql | 8.0.22-0ubuntu0.20.04.2 (Ubuntu) |
關閉防火墻:
service firewalld stop
然后啟動上述所有集群
實驗框架
數據集+完整實驗步驟+DDL/SQL
①CSV數據集如下(供t1讀取)
?
把數據集放到HDFS上面去:
hdfs dfs -mkdir /test
hdfs dfs -put UserBehavior.csv /test
創建mysql表格
create database dijie_test;
use dijie_test;
-- Mysql 建表語句,注意這是在Mysql執行的!不要在Zeppelin執行CREATE TABLE `dim_behavior` (`id` int(10) NOT NULL AUTO_INCREMENT COMMENT '自增主鍵',`en_behavior` varchar(255) COLLATE utf8_bin DEFAULT NULL COMMENT '英文 行為',`zh_behavior` varchar(255) COLLATE utf8_bin DEFAULT NULL COMMENT '中文 行為',PRIMARY KEY (`id`)) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8 COLLATE=utf8_bin;-- 搞兩條數據INSERT INTO `dijie_test`.`dim_behavior`(`id`, `en_behavior`, `zh_behavior`) VALUES (1, 'buy', '購買');INSERT INTO `dijie_test`.`dim_behavior`(`id`, `en_behavior`, `zh_behavior`) VALUES (2, 'pv', '瀏覽');這樣外部準備工作(CSV和mysql)就算結束了。
②操作Flink SQL Client步驟
| 詳細操作步驟 | 備注 | 中間實驗效果 |
| 創建t1(見下方gitee鏈接) | 定義source | ? |
| select * from t1; | 檢查flink sql client是否被順利讀取到 | ? |
| 創建t2(見下方gitee鏈接) | 定義sink | ? |
| insert into t2 select user_id,item_id,category_id,behavior,UNIX_TIMESTAMP() as ts from t1; | csv寫入kafka | |
| $KAFKA/bin/kafka-console-consumer.sh --bootstrap-server Desktop:9091 --from-beginning --topic zeppelin_01_test | kafka的消費端檢查下是否真的存入了這些數據 | ? |
| 創建t3(見下方gitee鏈接) | 建立事實表source | ? |
| select * from t3 | 確保事實表格的每條數據有出現在kafka中 | ? |
| 創建dim_behavior(見下方gitee鏈接) | 建立維度表source | ? |
| 進行left join查詢(見下方gitee鏈接) | 最終結果 |
https://gitee.com/appleyuchi/Flink_Code/blob/master/FLINK讀寫各種數據源/KafkaMysqlJoinDDL.sql
上述表格看著挺凌亂,時刻記住最上方的框圖,出現問題時,要在本文開頭的框圖中定位是哪個環節出了問題,才好調試。
###################################################################################################################################################
附錄
可能用到的kafka操作
| 操作 | 命令 | 備注 |
| 查看topic | $KAFKA/bin/kafka-topics.sh --list --zookeeper Desktop:2181 | 無 ?
|
| 往zeppelin_01_test這個 topic發送 json 消息 | ? $KAFKA/bin/kafka-console-producer.sh --broker-list Desktop:9091 --topic zeppelin_01_test | 這里可能碰到[2]中的報錯,注意檢查命令中端口與配置文件server.properties中的listeners的端口嚴格保持一致 [2]中的報錯還可能是某個節點的kafka掛掉導致的. ? 可能碰到[3] 注意關閉防火墻 ? ? |
| 使用kafka自帶消費端測試下消費 | $KAFKA/bin/kafka-console-consumer.sh --bootstrap-server Desktop:9091 --from-beginning --topic zeppelin_01_test | 如果kafka自帶消費者測試有問題,那么就不用繼續往下面做了, 此時如果使用Flink SQL Client來消費也必然會出現問題 |
| 清除topic中所有數據[6](因為,萬一你輸錯了呢?對吧) | $KAFKA/bin/kafka-topics.sh --zookeeper Desktop:2181 --delete --topic zeppelin_01_test | 需要$KAFKA/config/server.properties設置 delete.topic.enable=true |
?
#######################################################################################################################
?
Reference:
[1]https://blog.csdn.net/weixin_47482194/article/details/106672613
[2]Kafka->Flink->Hbase(純DDL/SQL形式)
總結
以上是生活随笔為你收集整理的Flink进行Kafka事实表与Mysql维度表Join(纯DDL/DML方式)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 关于宁波一些眼科流传的营养针
- 下一篇: 彻底理解IP地址分类与CIDR IP地址