Elasticsearch和Hive整合,将hive数据同步到ES中
1 Elasticsearch整合Hive
1.1 軟件環境
Hadoop軟件環境
Hive軟件環境
ES軟件環境
1.2 ES-Hadoop介紹
1.2.1 官網
https://www.elastic.co/cn/products/hadoop
1.2.2 對 Hadoop 數據進行交互分析
Hadoop 是出色的批量處理系統,但是要想提供實時結果則頗具挑戰。為了實現真正的交互式數據探索,您可以使用 ES-Hadoop 將 Hadoop 數據索引到 Elastic Stack,以充分利用快速的 Elasticsearch 引擎和Kibana精美的可視化效果。
有了 ES-Hadoop,您可以輕松構建動態的嵌入式搜索應用來處理您的 Hadoop 數據,或者使用全文本、空間地理查詢和聚合,執行深度的低延時分析。從產品推薦到基因組測序,ES-Hadoop 開啟了廣泛而全新的應用領域。
1.2.3 讓數據在 Elasticsearch 和 Hadoop 之間無縫移動
只有實現了數據的快速移動,才能讓實時決策成為可能。憑借現有Hadoop API的動態擴展,ES-Hadoop讓您能夠在Elasticsearch和Hadoop之間輕松地雙向移動數據,同時借助HDFS作為存儲庫,進行長期存檔。分區感知、故障處理、類型轉換和數據共享均可透明地完成。
1.2.4 本地對接Spark及其衍生技術
ES-Hadoop 完全支持 Spark、Spark Streaming 和 SparkSQL。此外,無論您使用 Hive、Pig、Storm、Cascading,還是標準 MapReduce,ES-Hadoop 都將提供本地對接,供您向 Elasticsearch 索引數據并從 Elasticsearch 查詢數據。無論您用哪種技術,Elasticsearch 的所有功能任您支配。
1.2.5 隨時隨地確保數據安全
ES-Hadoop 擁有您需要的所有安全功能,包括 HTTP 身份驗證和對 SSL/TLS 的支持。此外,它還能與支持 Kerberos 的 Hadoop 部署一起使用。
1.3 安裝
1.3.1 常規安裝
獲取Elasticsearch-hadoop二進制文件可以通過從http://elastic.co/下載一個zip包(這個zip包中包含jars,sources,和documention),或者通過添加依賴文件:
<dependency><groupId>org.elasticsearch</groupId><artifactId>elasticsearch-hadoop</artifactId><version>7.1.1</version> </dependency>上面的這個jar包包含所有的Elasticsearch-Hadoop的特性,在運行時期間不需要任何其它的依賴。換句話說,它可以原樣使用。
Elasticsearch-hadoop二進制適用于Hadoop 2.x(又叫做yarn)環境,在5.5版本之后支持hadoop 1.x版本環境的將會過時,在6.0之后將不會再進行測試。
1.3.2 最小版二進制包
Elasticsearch-hadoop提供最小版本的用于每個集成的jar包,1.3.2.1 Map/Reduce集成
<dependency><groupId>org.elasticsearch</groupId><artifactId>elasticsearch-hadoop-mr</artifactId> <version>7.1.1</version> </dependency>1.3.2.2 Hive集成
<dependency><groupId>org.elasticsearch</groupId><artifactId>elasticsearch-hadoop-hive</artifactId> <version>7.1.1</version> </dependency>1.3.2.3 Pig集成
<dependency><groupId>org.elasticsearch</groupId><artifactId>elasticsearch-hadoop-pig</artifactId> <version>7.1.1</version> </dependency>1.3.2.4 Spark集成
<dependency><groupId>org.elasticsearch</groupId><artifactId>elasticsearch-spark-20_2.10</artifactId> <version>7.1.1</version> </dependency>注意:spark artifact。注意后綴中的這個-20表示spark的兼容版本。Spark 2.0+使用20,Spark 1.3 ~1.6使用13.
要注意的是,_2.10后綴表示scala的兼容版本。
以下是Spark version和ES-Hadoop Artifact ID的對應版本。
1.3.2.5 Strom集成
<dependency><groupId>org.elasticsearch</groupId><artifactId>elasticsearch-storm</artifactId> <version>7.1.1</version> </dependency>1.3.3 配置
Elasticsearch-Hadoop的行為可以通過下面的屬性來定制。
1.3.3.1 Required settings
es.resource
Elasticsearch資源的位置,數據讀取和寫入的位置。需要的格式是: /。
es.resource.read(默認為es.resource)
Elasticsearch讀取的數據資源(不是寫)。在使用相同的job將數據讀或寫到不同的Elasticsearch的indices的時候將會很有用。通常設置成自動(除了Map/Reduce模塊需要手動配置)。格式也是/,如artists/_doc。
支持多個index,如artists,bank/_doc,表示從artists和bank索引的_doc/讀取數據。artists,bank/,表示從artists和bank索引中讀取數據,type任意。_all/_doc表示從所有的_doc讀取數據。
add jar elasticsearch-hadoop-6.1.2.jar; add jar json-udf-1.3.8-jar-with-dependencies.jar; add jar json-serde-1.3.8-jar-with-dependencies.jar;CREATE TABLE x (`es_metadata` string,`nested1` struct<item1:string, item2:int>,`nested2` struct<iterm3:double, iterm4:string>) ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler' TBLPROPERTIES( 'es.output.json' = 'true', 'es.resource.read' = 'netsed/test', 'es.nodes'='${nodes}', 'es.read.metadata' = 'true', 'es.read.metadata.field' = 'es_metadata', 'es.field.read.empty.as.null'='false', 'es.mapping.names' = 'nested2:Nested2,nested1:nested1' );es.resource.write(默認為es.resource)
Elasticsearch用于寫入的資源(而不是讀),通常用于動態資源寫入,或在同一作業中將數據寫入和讀取到不同的Elasticsearch索引時使用.通常設置成自動(除了Map/Reduce模塊需要手動配置)。
1.3.3.2 Dynamic/multi resource writes
對于編寫,Elasticsearch -hadoop允許在運行時使用模式(通過使用{}格式)解析目標資源,并根據流到Elasticsearch的數據在運行時解析。也就是說,可以基于從要保存的文檔解析的一個或多個字段將文檔保存到某個索引或類型。
例如,假設有下面的文檔集合 {"media_type":"game","title":"Final Fantasy VI","year":"1994" }, {"media_type":"book","title":"Harry Potter","year":"2010" }, {"media_type":"music","title":"Surfing With The Alien","year":"1987" }要根據它們的media_type為每個類建立索引,可以使用一下模式:
# 根據文檔的類型來索引它們 es.resource.write = my-collection/{media_type} 通過上面的配置,將導致”Final Fantasy VI”在my-collection/game中,Harry Potter在my-collection/book,”Surfing With The Alien”在my-collection/music。想了解更多的信息,可以參考專門的dedicated集成章節。1.3.3.3 Formatting dynamic/multi resource writes
當使用dynamic/multi寫時,還可以指定字段返回值的格式。hadoop提供了日期/時間戳字段的開箱即用格式,這對于在相同索引下的特定時間范圍內自動分組基于時間的數據(例如日志)非常有用。通過使用Java SimpleDataFormat語法,可以以一種對語言環境敏感的方式格式化和解析日期。 例如,假設數據包含@timestamp字段,可以使用以下配置將文檔分組到每日索引中:
@timestamp field formatting - in this case yyyy.MM.dd
@timestamp字段格式,在本例中是yyyy.MM.dd格式。
同樣是使用這個相同的配置(es.resource.write),然而,通過特殊的|字符指定格式化模式。請參考SimpleDateFormat的javadocs獲取更多的關于這個的語法支持。在這種情況下,yyyy.MM.dd將日期轉換為年份(由四位數字指定)、月份(由兩位數字指定)和天(如2015.01.28)。
1.3.3.4 Essential Settings
網絡相關
es.nodes(默認localhost)
列出要連接的Elasticsearch節點。當遠程使用Elasticsearch的時候,請設置這個參數。要注意的是這個列表中不一定非要包含Elasticsearch集群中的每個節點;默認情況下,這些是由elasticsearch-hadoop自動發現的(參見下面)。每個節點還可以單獨指定其HTTP/REST端口(例如:mynode:9600)。
es.port(默認9200)
用于連接到Elasticsearch的默認的HTTP/REST端口。這個設置用于在es.nodes中沒有指定端口的情況下使用。
es.nodes.path.prefix(默認空)
前綴,以添加到向Elasticsearch發出的所有請求中。適用于集群在特定路徑下代理/路由的環境。例如,如果es集群位于someaddress:someport/custom/path/prefix 下,可以設置es.nodes.path.prefix 為 /custom/path/prefix。
1.3.3.5 Querying
es.query(默認為none)
保存從指定的es.resource中讀取的數據,默認情況下他不為設置,也不為空。意味著在指定的index/type下的整個數據都被返回。es.query可以有三個來源:
uri query
使用?uri_query的這種格式,可以設置一個query string。要注意的是這個前導’?’。
query dsl
使用query_dsl格式,注意這個query dsl前綴需要以{開始,以}結束。
external resource
如果上面兩個都不匹配,elasticsearch-hadoop將嘗試將該參數解釋為HDFS文件系統中的路徑。如果不是這樣,它將嘗試從類路徑加載資源,如果失敗,則嘗試從Hadoop DistributedCache加載資源。資源應該包含uri查詢或查詢dsl。
下面是示例:
# uri (or parameter) query es.query = ?q=costinl# query dsl es.query = { "query" : { "term" : { "user" : "costinl" } } }# external resource es.query = org/mypackage/myquery.json其它參數:https://www.elastic.co/guide/en/elasticsearch/hadoop/current/configuration.html
1.3.4 Apache Hive integration
1.3.4.1 Installation
要確保elasticsearch-hadoop的jar能夠在Hive classpath能夠訪問到。取決于你的選擇,有很多中方式可以實現這一點。使用add命令添加jar文件,或者歸檔類路徑。
ADD JAR /path/elasticsearch-hadoop.jar;作為一個替代方案,也可以使用過下面的命令行:
CLI配置
或者在命令行中使用hive.aux.jars.path屬性、或者在hive-site.xml文件中,注冊額外的jar(它也接收URI):
$ bin/hive -hiveconf hive.aux.jars.path=/path/elasticsearch-hadoop.jar在hive-site.xml中也可以配置:
<property><name>hive.aux.jars.path</name><value>/path/elasticsearch-hadoop.jar</value><description>A comma separated list (with no spaces) of the jar files</description> </property>1.3.4.2 Configuration
當使用Hive,當聲明支持Elasticsearch的外部表的時候,可以使用TBLPROPERTIES指定這個配置屬性(作為Hadoop配置對象的可選配置),例如:
CREATE EXTERNAL TABLE artists (...) STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler' TBLPROPERTIES('es.resource' = 'radio/artists','es.index.auto.create' = 'false');1.3.4.3 Mapping
默認情況下,elasticsearch-hadoop使用Hive的schema去映射在Elasticsearch中的數據。在這個過程中使用字段名稱和類型。但是,在某些情況下,Hive中的名稱不能與Elasticsearch一起使用(字段名可以包含Elasticsearch接受但Hive不接受的字符)。對于這種情況,可以使用es.mapping.names設置,接收以下的按照冒號分割的格式: Hive field name : Elasticsearch field name.
即:
CREATE EXTERNAL TABLE artists (...) STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler' TBLPROPERTIES('es.resource' = 'radio/artists','es.mapping.names' = 'date:@timestamp, url:url_123'); Hive 列 date映射到 Elasticsearch 中的 @timestamp; Hive的列url映射到Elasticsearch為url_123注意:
1、Hive是大小寫不敏感的然而Elasticsearch不是。數據丟失了可能產生無效的查詢(因為在Hive中的列可能不能匹配Elasticsearch中的列)。為了避免這種問題,elasticsearch-hadoop將總是將Hive的列名稱都轉成小寫。這就是說,建議使用默認的Hive樣式,只對Hive命令使用大寫名稱,并避免混合大小寫名稱。
2、Hive通過一個特殊的NULL來對待丟失的值。這就意味著當運行一個不正確的查詢(不正確或者名稱不存在)時,Hive表將使用NULL填充,而不是拋出一個異常。確保驗證你的數據,密切關注你的schema, 否則由于這種寬松的行為,更新將不會被注意到。
1.3.4.4 Writing data to Elasticsearch
有了elasticsearch-hadoop,Elasticsearch可以僅僅通過一個外部表load和讀取數據:
CREATE EXTERNAL TABLE artists (id BIGINT,name STRING,links STRUCT<url:STRING, picture:STRING>) STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler' (1) TBLPROPERTIES('es.resource' = 'radio/artists'); (2)-- insert data to Elasticsearch from another table called 'source' INSERT OVERWRITE TABLE artistsSELECT NULL, s.name, named_struct('url', s.url, 'picture', s.picture)FROM source s; (1)Elasticsearch Hive StorageHandler (2)Elasticsearch resource (index and type) associated with the given storage當文檔中需要指定id(或者其他的metadata字段如ttl和timestamp)的時候,可以設置適當的mapping,也就是 es.mapping.id.緊跟上面的例子,指示Elasticsearch使用id作為文檔的id,更新表的屬性:
CREATE EXTERNAL TABLE artists (id BIGINT,...) STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler' TBLPROPERTIES('es.mapping.id' = 'id'...);1.3.4.5 Writing existing JSON to Elasticsearch
對于job作業中輸入數據已經在JSON中的場景,elasticsearch-hadoop允許直接索引,而不需要應用任何轉換。數據直接按照原樣直接發送到Elasticsearch.在這種情況下,需要創建為這個json創建索引通過設置es.input.json參數。同樣地,elasticsearch-hadoop期望輸出表只包含一個字段,這個內容用于作為json文檔。就是說,這個library將識別指定的textual類型(例如:string 或 binary),或簡單地調用(toString)。
注意:
確保數據以UTF-8正確的編碼。字段內容被認為是發送到Elasticsearch的文檔的最終形式。
1.3.4.6 Writing to dynamic/multi-resources
可以使用模式將數據索引到不同的資源,具體取決于讀取的行, 回到前面提到的media例子,我們可以這樣配置它:
CREATE EXTERNAL TABLE media (name STRING,type STRING, (1)year STRING, STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler' TBLPROPERTIES('es.resource' = 'my-collection-{type}/doc'); (1)表的字段被用于resource pattern. 可以使用任何聲明的字段。 (2)資源的pattern 使用字段type對于將要編寫的每一行,elasticsearch-hadoop將提取type字段并使用其值確定目標資源。
在處理json數據的時候,同樣適用,既然這樣,這個值將從JSON文檔中提取。假設有以下的JSON資源包含的文檔結構如下:
{"media_type":"music", (1)"title":"Surfing With The Alien","year":"1987" } (1)將被用于pattern的json中的字段。表的聲明可以按照下面的方式聲明:
CREATE EXTERNAL TABLE json (data STRING) (1) STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler' TBLPROPERTIES('es.resource' = 'my-collection-{media_type}/doc', 'es.input.json` = 'yes'); (2)1.3.4.7 Reading data from Elasticsearch
從ElasticSearch中讀取數據,類似如下:
CREATE EXTERNAL TABLE artists (id BIGINT,name STRING,links STRUCT<url:STRING, picture:STRING>) STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler' (1) TBLPROPERTIES('es.resource' = 'radio/artists', (2)'es.query' = '?q=me*'); (3)-- stream data from Elasticsearch SELECT * FROM artists;Type conversion
Hive為定義數據提供了各種類型,并根據目標環境(從JDK本機類型到二進制優化的類型)在內部使用不同的實現。Elasticsearch集成了所有這些,包括和Serde2 lazy和lazy binary:
注意:
盡管Elasticsearch在2.0版之前可以理解Hive類型,但它向后兼容Hive 1.0
1.4 案例
將hive中的數據同步到ES中。
drop table if exists testhadoop; CREATE EXTERNAL TABLE testhadoop (ID bigint,SUBJECT STRING,xxxxx) STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler' TBLPROPERTIES('es.nodes'='ip:10200', 'es.resource'='xxx/xxxx', 'es.index.auto.create'='false', 'es.mapping.id'='ID', 'es.mapping.names'= 'ID:ID,SUBJECT:SUBJECT,xxxxx', 'es.nodes.wan.only'='true', 'es.batch.write.retry.count'='10', 'es.batch.write.refresh'='true', 'es.batch.write.retry.wait'='60s', 'es.http.timeout'='100m', 'es.batch.size.entries'='100');INSERT INTO TABLE testhadoop select 子句;總結
以上是生活随笔為你收集整理的Elasticsearch和Hive整合,将hive数据同步到ES中的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 中行信用卡临时额度调整的方法
- 下一篇: 退休人员能贷款吗