Elasticsearch Java API (5): Bulk Indexing
1. Bulk API
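With the bulk command, the REST endpoint ends in _bulk, and the batched operations are written to a JSON file. The syntax given in the official documentation is: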
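    action_and_meta_data\n
    optional_source\n
    action_and_meta_data\n
    optional_source\n
    ....
    action_and_meta_data\n
    optional_source\n

Each operation therefore takes up to two lines, and every line must end with a newline, including the last one. The first line states the action and its metadata. The second line holds the document source; it is optional and is omitted for delete. For example, to insert two documents and delete one in a single request, create a file named bulkAdd.json with the following content: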
    { "create" : { "_index" : "blog", "_type" : "article", "_id" : "3" }}
    { "title":"title1","posttime":"2016-07-02","content":"Content 1" }
    { "create" : { "_index" : "blog", "_type" : "article", "_id" : "4" }}
    { "title":"title2","posttime":"2016-07-03","content":"Content 2" }
    { "delete":{"_index" : "blog", "_type" : "article", "_id" : "1" }}

Then run:
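    $ curl -XPOST "http://localhost:9200/_bulk?pretty" --data-binary @bulkAdd.json

The response looks like this:

    {
      "took" : 11,
      "errors" : false,
      "items" : [ {
        "create" : {
          "_index" : "blog",
          "_type" : "article",
          "_id" : "13",
          "_version" : 1,
          "_shards" : {
            "total" : 1,
            "successful" : 1,
            "failed" : 0
          },
          "status" : 201
        }
      } ]
    }

Note: every line must end with a newline, including the last one. Otherwise Elasticsearch cannot recognize the commands and the request fails. Use --data-binary rather than -d, because curl's -d option strips newlines from the file it reads.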
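The newline rule is easy to get wrong when a bulk body is built by hand. Below is a minimal sketch that assembles a _bulk body as a string in plain Java.

BulkBodyBuilder is a hypothetical helper, not part of the Elasticsearch client. It assumes that index, type and id values need no JSON escaping, and that each source document is already single-line JSON. A create adds an action line plus a source line, while a delete adds only an action line. Every line, including the last, is terminated with \n.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the Elasticsearch client: builds a _bulk request body.
// Assumes index/type/id need no JSON escaping and every source is single-line JSON.
public class BulkBodyBuilder {
    private final List<String> lines = new ArrayList<>();

    // create: one action/metadata line followed by one source line
    public BulkBodyBuilder create(String index, String type, String id, String sourceJson) {
        if (sourceJson.indexOf('\n') >= 0) {
            throw new IllegalArgumentException("source must be a single line of JSON");
        }
        lines.add(actionLine("create", index, type, id));
        lines.add(sourceJson);
        return this;
    }

    // delete: an action/metadata line only, no source line
    public BulkBodyBuilder delete(String index, String type, String id) {
        lines.add(actionLine("delete", index, type, id));
        return this;
    }

    private static String actionLine(String action, String index, String type, String id) {
        return String.format(
                "{ \"%s\" : { \"_index\" : \"%s\", \"_type\" : \"%s\", \"_id\" : \"%s\" } }",
                action, index, type, id);
    }

    // Joins all lines, terminating every one of them (including the last) with '\n'
    public String build() {
        StringBuilder sb = new StringBuilder();
        for (String line : lines) {
            sb.append(line).append('\n');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        String body = new BulkBodyBuilder()
                .create("blog", "article", "3",
                        "{ \"title\":\"title1\",\"posttime\":\"2016-07-02\",\"content\":\"Content 1\" }")
                .create("blog", "article", "4",
                        "{ \"title\":\"title2\",\"posttime\":\"2016-07-03\",\"content\":\"Content 2\" }")
                .delete("blog", "article", "1")
                .build();
        System.out.print(body);
    }
}
```

The printed body has the same five-line shape as bulkAdd.json. Saved to a file, it can be sent with the same curl --data-binary command.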
If the newlines are missing, the same command fails with an error like this:

    $ curl -XPOST "http://localhost:9200/_bulk?pretty" --data-binary @bulkAdd.json
    {
      "error" : {
        "root_cause" : [ {
          "type" : "action_request_validation_exception",
          "reason" : "Validation Failed: 1: no requests added;"
        } ],
        "type" : "action_request_validation_exception",
        "reason" : "Validation Failed: 1: no requests added;"
      },
      "status" : 400
    }

2. Bulk export
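The following example exports the documents in an index to a file as JSON, one document per line. The cluster name is "bropen", the index is "blog" and the type is "article". Create files/bulk.txt under the project root; the documents are written to this file: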
    import java.io.BufferedWriter;
    import java.io.FileWriter;
    import java.io.IOException;
    import java.net.InetAddress;

    import org.elasticsearch.action.search.SearchResponse;
    import org.elasticsearch.client.Client;
    import org.elasticsearch.client.transport.TransportClient;
    import org.elasticsearch.common.settings.Settings;
    import org.elasticsearch.common.transport.InetSocketTransportAddress;
    import org.elasticsearch.common.unit.TimeValue;
    import org.elasticsearch.index.query.QueryBuilder;
    import org.elasticsearch.index.query.QueryBuilders;
    import org.elasticsearch.search.SearchHit;

    public class ElasticSearchBulkOut {

        public static void main(String[] args) {
            // cluster.name is configured in elasticsearch.yml
            Settings settings = Settings.settingsBuilder().put("cluster.name", "bropen").build();
            Client client = null;
            try (BufferedWriter bfw = new BufferedWriter(new FileWriter("files/bulk.txt"))) {
                client = TransportClient.builder().settings(settings).build()
                        .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300));
                QueryBuilder qb = QueryBuilders.matchAllQuery();
                // A plain search returns only the first 10 hits by default,
                // so use a scroll to page through every document in blog/article
                SearchResponse response = client.prepareSearch("blog")
                        .setTypes("article")
                        .setQuery(qb)
                        .setScroll(new TimeValue(60000))
                        .setSize(100)
                        .execute().actionGet();
                long total = 0;
                while (response.getHits().getHits().length > 0) {
                    for (SearchHit hit : response.getHits().getHits()) {
                        // Write one document's _source per line
                        String jsonStr = hit.getSourceAsString();
                        System.out.println(jsonStr);
                        bfw.write(jsonStr);
                        bfw.write("\n");
                        total++;
                    }
                    response = client.prepareSearchScroll(response.getScrollId())
                            .setScroll(new TimeValue(60000))
                            .execute().actionGet();
                }
                if (total == 0) {
                    System.out.println("Found 0 documents!");
                }
            } catch (IOException e) { // also covers UnknownHostException
                e.printStackTrace();
            } finally {
                if (client != null) {
                    client.close();
                }
            }
        }
    }

3. Bulk import
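Next, read the exported bulk.txt file line by line and import the documents with bulk requests. First call client.prepareBulk() to create a BulkRequestBuilder. Then call its add method to add each index request. Implementation: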
    import java.io.BufferedReader;
    import java.io.FileReader;
    import java.io.IOException;
    import java.net.InetAddress;

    import org.elasticsearch.action.bulk.BulkRequestBuilder;
    import org.elasticsearch.action.bulk.BulkResponse;
    import org.elasticsearch.client.Client;
    import org.elasticsearch.client.transport.TransportClient;
    import org.elasticsearch.common.settings.Settings;
    import org.elasticsearch.common.transport.InetSocketTransportAddress;

    public class ElasticSearchBulkIn {

        public static void main(String[] args) {
            // cluster.name is configured in elasticsearch.yml
            Settings settings = Settings.settingsBuilder().put("cluster.name", "bropen").build();
            Client client = null;
            try (BufferedReader bfr = new BufferedReader(new FileReader("files/bulk.txt"))) {
                client = TransportClient.builder().settings(settings).build()
                        .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300));
                BulkRequestBuilder bulkRequest = client.prepareBulk();
                String line;
                while ((line = bfr.readLine()) != null) {
                    if (line.trim().isEmpty()) {
                        continue; // skip blank lines
                    }
                    // Each line is one document's _source; index it into test/article
                    bulkRequest.add(client.prepareIndex("test", "article").setSource(line));
                    // Submit every 10 documents, then start a new builder;
                    // re-executing the old builder would send the earlier documents again
                    if (bulkRequest.numberOfActions() >= 10) {
                        execute(bulkRequest);
                        bulkRequest = client.prepareBulk();
                    }
                }
                // Submit the remainder; an empty bulk request fails with "no requests added"
                if (bulkRequest.numberOfActions() > 0) {
                    execute(bulkRequest);
                }
            } catch (IOException e) { // also covers FileNotFoundException and UnknownHostException
                e.printStackTrace();
            } finally {
                if (client != null) {
                    client.close();
                }
            }
        }

        // Executes one bulk request and reports any per-document failures
        private static void execute(BulkRequestBuilder bulkRequest) {
            BulkResponse response = bulkRequest.execute().actionGet();
            if (response.hasFailures()) {
                System.err.println(response.buildFailureMessage());
            }
        }
    }

Another bulk import example:
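    import java.net.InetAddress;
    import java.util.HashMap;
    import java.util.Map;

    import org.elasticsearch.action.bulk.BulkRequestBuilder;
    import org.elasticsearch.client.transport.TransportClient;
    import org.elasticsearch.common.settings.Settings;
    import org.elasticsearch.common.transport.InetSocketTransportAddress;

    // Class name is illustrative; the original snippet shows only the method body
    public class ElasticSearchBulkGenerate {
        public static void main(String[] args) throws Exception {
            Settings settings = Settings.settingsBuilder().put("cluster.name", "es1").build();
            TransportClient client = TransportClient.builder().settings(settings).build();
            client.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("bigdata2"), 9300));

            long count = 100000L;
            String index = "bigdata";
            String type = "student1";
            BulkRequestBuilder bulkRequest = client.prepareBulk();
            for (int i = 0; i < count; i++) {
                Map<String, Object> ret = new HashMap<String, Object>();
                ret.put("recordtime", 11);
                ret.put("area", 22);
                ret.put("usertype", 33);
                ret.put("count", 44);
                bulkRequest.add(client.prepareIndex(index, type).setSource(ret));
                // Submit every 10000 documents, then start a new builder
                if (bulkRequest.numberOfActions() >= 10000) {
                    bulkRequest.execute().actionGet();
                    bulkRequest = client.prepareBulk();
                }
            }
            // Submit whatever is left over (skipped when the last batch was already sent)
            if (bulkRequest.numberOfActions() > 0) {
                bulkRequest.execute().actionGet();
            }
            client.close();
        }
    }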
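Both import examples submit a batch every N documents. The batching logic on its own can be sketched in plain Java, independent of Elasticsearch.

BatchFlusher is a hypothetical helper, and its callback merely stands in for bulkRequest.execute().actionGet(). It shows two details that are easy to get wrong:

- it starts a fresh batch after each flush, so documents that were already sent are not sent again;
- it flushes the final partial batch only when that batch is non-empty.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper: groups items into batches of batchSize and hands each
// batch to a callback (a stand-in for bulkRequest.execute().actionGet()).
public class BatchFlusher<T> implements AutoCloseable {
    private final int batchSize;
    private final Consumer<List<T>> sink;
    private List<T> pending = new ArrayList<>();

    public BatchFlusher(int batchSize, Consumer<List<T>> sink) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
        this.sink = sink;
    }

    public void add(T item) {
        pending.add(item);
        // Flush once a full batch has accumulated, not on the very first item
        if (pending.size() == batchSize) {
            flushPending();
        }
    }

    // Sends the final partial batch; skipped when empty, since an empty bulk request is rejected
    @Override
    public void close() {
        if (!pending.isEmpty()) {
            flushPending();
        }
    }

    private void flushPending() {
        sink.accept(pending);
        // Start a new batch so items that were already sent are never sent again
        pending = new ArrayList<>();
    }

    public static void main(String[] args) {
        List<Integer> batchSizes = new ArrayList<>();
        try (BatchFlusher<String> flusher =
                     new BatchFlusher<>(10, batch -> batchSizes.add(batch.size()))) {
            for (int i = 0; i < 25; i++) {
                flusher.add("{\"n\":" + i + "}");
            }
        }
        System.out.println(batchSizes); // prints [10, 10, 5]
    }
}
```

With a batch size of 10 and 25 items, the callback receives batches of 10, 10 and 5. With exactly 20 items it receives two batches of 10 and no empty third batch.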