2021年大数据Flink(三十八):Table与SQL 案例五 FlinkSQL整合Hive
目錄
案例五 FlinkSQL整合Hive
介紹
集成Hive的基本方式
準備工作
1.添加hadoop_classpath
2.下載jar并上傳至flink/lib目錄
3.修改hive配置
4.啟動hive元數據服務
SQL CLI
1.修改flinksql配置
2.啟動flink集群
3.啟動flink-sql客戶端
4.執行sql:
代碼演示
案例五 FlinkSQL整合Hive
介紹
Apache Flink 1.12 Documentation: Hive
Flink集成Hive之快速入門--以Flink1.12為例 - 知乎
使用Hive構建數據倉庫已經成為了比較普遍的一種解決方案。目前,一些比較常見的大數據處理引擎,都無一例外兼容Hive。Flink從1.9開始支持集成Hive,不過1.9版本為beta版,不推薦在生產環境中使用。在Flink1.10版本中,標志著對 Blink的整合宣告完成,對 Hive 的集成也達到了生產級別的要求。值得注意的是,不同版本的Flink對于Hive的集成有所差異,接下來將以最新的Flink1.12版本為例,實現Flink集成Hive
集成Hive的基本方式
Flink 與 Hive 的集成主要體現在以下兩個方面:
- 持久化元數據
Flink利用 Hive 的 MetaStore 作為持久化的 Catalog,我們可通過HiveCatalog將不同會話中的 Flink 元數據存儲到 Hive Metastore 中。例如,我們可以使用HiveCatalog將其 Kafka的數據源表存儲在 Hive Metastore 中,這樣該表的元數據信息會被持久化到Hive的MetaStore對應的元數據庫中,在后續的 SQL 查詢中,我們可以重復使用它們。
- 利用 Flink 來讀寫 Hive 的表
Flink打通了與Hive的集成,如同使用SparkSQL或者Impala操作Hive中的數據一樣,我們可以使用Flink直接讀寫Hive中的表。
HiveCatalog的設計提供了與 Hive 良好的兼容性,用戶可以”開箱即用”的訪問其已有的 Hive表。不需要修改現有的 Hive Metastore,也不需要更改表的數據位置或分區。
???????準備工作
1.添加hadoop_classpath
vim /etc/profile
增加如下配置
export HADOOP_CLASSPATH=`hadoop classpath`
刷新配置
source /etc/profile
2.下載jar并上傳至flink/lib目錄
Apache Flink 1.12 Documentation: Hive
3.修改hive配置
vim /export/server/hive/conf/hive-site.xml
<property><name>hive.metastore.uris</name><value>thrift://node3:9083</value></property>
4.啟動hive元數據服務
nohup /export/server/hive/bin/hive?--service metastore &
???????SQL CLI
1.修改flinksql配置
vim /export/server/flink/conf/sql-client-defaults.yaml
增加如下配置
catalogs:- name: myhivetype: hivehive-conf-dir: /export/server/hive/confdefault-database: default
2.啟動flink集群
/export/server/flink/bin/start-cluster.sh
3.啟動flink-sql客戶端
/export/server/flink/bin/sql-client.sh embedded
4.執行sql:
show catalogs;use catalog myhive;show tables;select * from person;
???????代碼演示
package cn.it.extend;import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.catalog.hive.HiveCatalog;/*** Author lanson* Desc*/
public class HiveDemo {public static void main(String[] args){EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().build();TableEnvironment tableEnv = TableEnvironment.create(settings);String name ???????????= "myhive";String defaultDatabase = "default";String hiveConfDir = "./conf";HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);//注冊catalogtableEnv.registerCatalog("myhive", hive);//使用注冊的catalogtableEnv.useCatalog("myhive");//向Hive表中寫入數據String insertSQL = "insert into person select * from person";TableResult result = tableEnv.executeSql(insertSQL);System.out.println(result.getJobClient().get().getJobStatus());}
}
總結
以上是生活随笔為你收集整理的2021年大数据Flink(三十八):Table与SQL 案例五 FlinkSQL整合Hive的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 2021年大数据Flink(三十七):
- 下一篇: 2021年大数据Flink(三十九):