How to Access Phoenix Data from Spark on MaxCompute
Summary: There is currently no published example of using Spark on MaxCompute to connect to Phoenix and write HBase data into a MaxCompute table. To meet this need, this article walks through a practical solution in which Spark reads HBase data through Phoenix and writes it into MaxCompute. The solution was verified with HBase 1.1 and the matching Phoenix 4.12.0. It covers choosing the Alibaba Cloud HBase version; confirming the VPC and VSwitch ID and setting the whitelist and access method; installing the Phoenix 4.12.0 client; creating a Phoenix table and writing data from the client; writing the Spark code locally in IntelliJ IDEA, including the pom file and the vpcList configuration; and packaging and uploading the jar and running a smoke test.
1. Purchase HBase 1.1 and set up the corresponding resources
1.1 Purchase HBase
The main HBase versions are 2.0 and 1.1; here version 1.1 is chosen.
Differences between HBase 1.1 and HBase 2.0
HBase 1.1
Version 1.1 is developed on top of community HBase 1.1.2.
HBase 2.0
Version 2.0 is a brand-new release built on community HBase 2.0.0, published in 2018. On that basis it adds many improvements and optimizations and incorporates extensive internal experience from Alibaba, so it offers better stability and performance than the community HBase releases.
1.2 Confirm the VPC and VSwitch ID
To keep connectivity testing simple and feasible, the VPC ID and VSwitch ID of the HBase instance should match those of the purchased exclusive resource group for Data Integration as closely as possible. For the exclusive resource group, see https://help.aliyun.com/document_detail/137838.html
1.3 Set the HBase whitelist; add the DataWorks whitelist below, and optionally your own ECS address
Add the whitelist entries for the DataWorks region you use according to this document: https://help.aliyun.com/document_detail/137792.html
1.4 Check the HBase version and access address
Click the database connection button to see the HBase major version, the VPC access address, and whether a public endpoint has been enabled for the connection.
2. Install the Phoenix client, create a table, and insert data
2.1 Install the client
Because the HBase version is 1.1, choose Phoenix 4.12.0 and download the client package ali-phoenix-4.12.0-AliHBase-1.1-0.9.tar.gz as described in https://help.aliyun.com/document_detail/53600.html
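The start-up commands are not shown in the original; assuming the package follows the standard Apache Phoenix layout (with bin/sqlline.py), extracting it and connecting to the cluster would look roughly like the sketch below, where the ZooKeeper address is a placeholder (use the VPC access address from section 1.4):

tar -zxvf ali-phoenix-4.12.0-AliHBase-1.1-0.9.tar.gz
cd ali-phoenix-4.12.0-AliHBase-1.1-0.9    # extracted directory name may differ
# connect sqlline to the HBase cluster's ZooKeeper quorum (placeholder address)
./bin/sqlline.py xxx-001.hbase.rds.aliyuncs.com,xxx-002.hbase.rds.aliyuncs.com,xxx-003.hbase.rds.aliyuncs.com:2181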
After logging on to the client, run the following commands.
Create a table:
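The CREATE TABLE statement itself is not reproduced in this copy; a minimal Phoenix DDL matching the columns used in the UPSERT below might look like this (Phoenix requires a primary key, so id is declared as one here):

CREATE TABLE IF NOT EXISTS users (
    id       INTEGER NOT NULL PRIMARY KEY,
    username VARCHAR,
    password VARCHAR
);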
Insert data:
UPSERT INTO users (id, username, password) VALUES (1, 'admin', 'Letmein');

2.2 Check that the table was created and the data was inserted
Run the following statement in the client to check whether the table and its data were written successfully:
select * from users;

3. Write the corresponding code logic
3.1 Write the code logic
Set up the local development environment in IntelliJ IDEA according to the pom file below, fill in the configuration values used in the code, and run a test. You can start with the HBase public endpoint for testing; once the code logic is verified, switch the configuration parameters to the VPC settings. The code is as follows:
package com.git.phonix

import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.SparkSession
import org.apache.phoenix.spark._

/**
 * This example applies to Phoenix 4.x.
 */
object SparkOnPhoenix4xSparkSession {
  def main(args: Array[String]): Unit = {
    // ZooKeeper connection address of the HBase cluster.
    // Format: xxx-002.hbase.rds.aliyuncs.com,xxx-001.hbase.rds.aliyuncs.com,xxx-003.hbase.rds.aliyuncs.com:2181
    val zkAddress = args(0)
    // Table name on the Phoenix side; it must be created in Phoenix beforehand.
    // See https://help.aliyun.com/document_detail/53716.html?spm=a2c4g.11186623.4.2.4e961ff0lRqHUW for how to create a Phoenix table.
    val phoenixTableName = args(1)
    // Table name on the Spark (MaxCompute) side.
    val ODPSTableName = args(2)

    val sparkSession = SparkSession
      .builder()
      .appName("SparkSQL-on-MaxCompute")
      .config("spark.sql.broadcastTimeout", 20 * 60)
      .config("spark.sql.crossJoin.enabled", true)
      .config("odps.exec.dynamic.partition.mode", "nonstrict")
      //.config("spark.master", "local[4]") // set spark.master to local[N] to run directly; N is the parallelism
      .config("spark.hadoop.odps.project.name", "***")
      .config("spark.hadoop.odps.access.id", "***")
      .config("spark.hadoop.odps.access.key", "***")
      //.config("spark.hadoop.odps.end.point", "http://service.cn.maxcompute.aliyun.com/api")
      .config("spark.hadoop.odps.end.point", "http://service.cn-beijing.maxcompute.aliyun-inc.com/api")
      .config("spark.sql.catalogImplementation", "odps")
      .getOrCreate()

    // First way to write the data
    var df = sparkSession.read.format("org.apache.phoenix.spark")
      .option("table", phoenixTableName)
      .option("zkUrl", zkAddress)
      .load()
    df.show()
    df.write.mode("overwrite").insertInto(ODPSTableName)
  }
}
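The comment "first way to write the data" suggests other write paths exist; as an illustrative sketch that is not part of the original article, the same DataFrame could also be written through a temporary view with Spark SQL, reusing the df, sparkSession, and ODPSTableName values defined above (the view name phoenix_users is arbitrary):

// Alternative write path (sketch): register the Phoenix DataFrame as a
// temporary view and insert it into the MaxCompute table with Spark SQL.
df.createOrReplaceTempView("phoenix_users")
sparkSession.sql(s"INSERT OVERWRITE TABLE $ODPSTableName SELECT * FROM phoenix_users")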
3.2 The corresponding pom file
The pom file contains the Spark dependencies and the ali-phoenix-spark related dependencies. Because the ODPS jars are already present in the cluster and would otherwise cause jar conflicts, the ODPS packages must be excluded.
<?xml version="1.0" encoding="UTF-8"?>
<!--
    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License. See accompanying LICENSE file.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <properties>
        <spark.version>2.3.0</spark.version>
        <cupid.sdk.version>3.3.8-public</cupid.sdk.version>
        <scala.version>2.11.8</scala.version>
        <scala.binary.version>2.11</scala.binary.version>
        <phoenix.version>4.12.0-HBase-1.1</phoenix.version>
    </properties>
    <groupId>com.aliyun.odps</groupId>
    <artifactId>Spark-Phonix</artifactId>
    <version>1.0.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <dependencies>
        <dependency>
            <groupId>org.jpmml</groupId>
            <artifactId>pmml-model</artifactId>
            <version>1.3.8</version>
        </dependency>
        <dependency>
            <groupId>org.jpmml</groupId>
            <artifactId>pmml-evaluator</artifactId>
            <version>1.3.10</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.scala-lang</groupId>
                    <artifactId>scala-library</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.scala-lang</groupId>
                    <artifactId>scalap</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.aliyun.odps</groupId>
            <artifactId>cupid-sdk</artifactId>
            <version>${cupid.sdk.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.aliyun.phoenix</groupId>
            <artifactId>ali-phoenix-core</artifactId>
            <version>4.12.0-AliHBase-1.1-0.8</version>
            <exclusions>
                <exclusion>
                    <groupId>com.aliyun.odps</groupId>
                    <artifactId>odps-sdk-mapred</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>com.aliyun.odps</groupId>
                    <artifactId>odps-sdk-commons</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>com.aliyun.phoenix</groupId>
            <artifactId>ali-phoenix-spark</artifactId>
            <version>4.12.0-AliHBase-1.1-0.8</version>
            <exclusions>
                <exclusion>
                    <groupId>com.aliyun.phoenix</groupId>
                    <artifactId>ali-phoenix-core</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <minimizeJar>false</minimizeJar>
                            <shadedArtifactAttached>true</shadedArtifactAttached>
                            <artifactSet>
                                <includes>
                                    <!-- Include here the dependencies you want to be packed in your fat jar -->
                                    <include>*:*</include>
                                </includes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                        <exclude>**/log4j.properties</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>reference.conf</resource>
                                </transformer>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>META-INF/services/org.apache.spark.sql.sources.DataSourceRegister</resource>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.3.2</version>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>scala-test-compile-first</id>
                        <phase>process-test-resources</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

4. Package, upload to DataWorks, and run a smoke test
4.1 Create the target MaxCompute table
CREATE TABLE IF NOT EXISTS users_phonix
(
    id       INT,
    username STRING,
    password STRING
);

4.2 Package and upload to MaxCompute
Build the project in IDEA as a shaded jar so that all dependencies are packed into the jar. Because uploading a jar through the DataWorks UI is limited to 50 MB, upload the jar with the MaxCompute client instead, as sketched below.
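A minimal sketch of the build and upload steps; the jar path assumes the artifactId and version from the pom above, so adjust it to the actual file name:

# build the shaded jar with Maven
mvn clean package
# then, in the MaxCompute client (odpscmd), add it as a jar resource
add jar target/Spark-Phonix-1.0.0-SNAPSHOT-shaded.jar -f;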
4.3 Select the corresponding project environment, check the uploaded resource, and add it to data development
In the DataWorks console, click the resource icon on the left, select the development environment as the target project, search for the uploaded file by its name, confirm in the list that the resource has been uploaded successfully, and then submit it to data development (DataStudio).
Click the Submit button.
4.4 Configure the vpcList parameter and submit the job for testing
The vpcList configuration is shown below; adjust it according to your own HBase connection details.

{
    "regionId": "cn-beijing",
    "vpcs": [
        {
            "vpcId": "vpc-2ze7cqx2bqodp9ri1vvvk",
            "zones": [
                {
                    "urls": [
                        {"domain": "172.16.0.12", "port": 2181},
                        {"domain": "172.16.0.13", "port": 2181},
                        {"domain": "172.16.0.15", "port": 2181},
                        {"domain": "172.16.0.14", "port": 2181},
                        {"domain": "172.16.0.12", "port": 16000},
                        {"domain": "172.16.0.13", "port": 16000},
                        {"domain": "172.16.0.15", "port": 16000},
                        {"domain": "172.16.0.14", "port": 16000},
                        {"domain": "172.16.0.12", "port": 16020},
                        {"domain": "172.16.0.13", "port": 16020},
                        {"domain": "172.16.0.15", "port": 16020},
                        {"domain": "172.16.0.14", "port": 16020}
                    ]
                }
            ]
        }
    ]
}

Then configure the Spark job submission: the configuration parameters, the main class, and the corresponding arguments.
There are three arguments: the first is the Phoenix (ZooKeeper) connection address, the second is the Phoenix table name, and the third is the target MaxCompute table. An example is sketched below.
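For illustration only: the main class comes from the code above, the ZooKeeper address is a placeholder, and the Phoenix table name assumes that the unquoted identifier users was stored by Phoenix as USERS.

Main class: com.git.phonix.SparkOnPhoenix4xSparkSession
Arguments:  xxx-001.hbase.rds.aliyuncs.com,xxx-002.hbase.rds.aliyuncs.com,xxx-003.hbase.rds.aliyuncs.com:2181 USERS users_phonix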
Click the smoke test button; the job runs successfully.
Run a query in an ad hoc query node to confirm that the data has been written into the MaxCompute table.
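A query like the following (against the table created in section 4.1) should return the row inserted through Phoenix:

select * from users_phonix;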
Summary:
Practice shows that using Spark on MaxCompute to access Phoenix data and write it into a MaxCompute table is feasible, but a few points deserve attention:
1. Choose the HBase and Phoenix versions according to your actual needs and keep the two consistent; the client to install and the code dependencies change with the chosen versions.
2. When testing locally in IDEA over the public network, pay attention to the HBase whitelist: besides the DataWorks whitelist, your own local IP address must also be added.
3. When packaging, sort out the dependencies in the pom so that packages already provided by ODPS are excluded, which avoids jar conflicts; and build a shaded jar so that no required dependency is left out.
This article is original content from Alibaba Cloud and may not be reproduced without permission.