亿级别记录的mongodb批量导入Es的java代码完整实现
生活随笔
收集整理的這篇文章主要介紹了
亿级别记录的mongodb批量导入Es的java代码完整实现
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
針對 MongoDB 中億級別甚至十億級別數據的模糊查詢,效率不高;解決方式是改用 ES 查詢,這樣就需要先把數據導入到 ES 中。
完整的代碼實現如下所示:(僅供參考)
import java.io.IOException; import java.net.UnknownHostException; import java.util.HashMap; import java.util.List; import java.util.Map;import org.apache.commons.codec.binary.Base64; import org.apache.http.HttpHost; import org.bson.types.ObjectId; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.common.xcontent.XContentType;import com.mongodb.BasicDBObject; import com.mongodb.DB; import com.mongodb.DBCollection; import com.mongodb.DBCursor; import com.mongodb.DBObject; import com.mongodb.MongoClient; import com.mongodb.MongoException;public class Test {public static void main(String[] args) throws IOException {int pageSize=10000;try {MongoClient mongo = new MongoClient("localhost", 27017);/**** Get database ****/// if database doesn't exists, MongoDB will create it for youDB db = mongo.getDB("www");/**** Get collection / table from 'testdb' ****/// if collection doesn't exists, MongoDB will create it for youDBCollection table = db.getCollection("person");RestHighLevelClient client = new RestHighLevelClient(RestClient.builder(new HttpHost("localhost", 9200, "http")));DBCursor dbObjects; Long cnt=table.count();System.out.println(table.getStats().toString());Long page=getPageSize(cnt,pageSize);ObjectId lastIdObject=null; Long start=System.currentTimeMillis();long ss=start;for(Long i=0L;i<page;i++) {start=System.currentTimeMillis();dbObjects=getCursorForCollection(table, lastIdObject, pageSize);System.out.println("第"+(i+1)+"次查詢,耗時:"+(System.currentTimeMillis()-start)+" 毫秒");List<DBObject> objs=dbObjects.toArray();start=System.currentTimeMillis();batchInsertToEsSync(client,objs,"person","doc");lastIdObject=(ObjectId) 
objs.get(objs.size()-1).get("_id");System.out.println("第"+(i+1)+"次插入,耗時:"+(System.currentTimeMillis()-start)+" 毫秒"); } System.out.println("耗時:"+(System.currentTimeMillis()-ss)/1000+"秒"); } catch (UnknownHostException e) {e.printStackTrace();} catch (MongoException e) {e.printStackTrace();}}public static void batchInsertToEsSync(RestHighLevelClient client,List<DBObject> objs,String tableName,String type) throws IOException {BulkRequest bulkRequest=new BulkRequest();for(DBObject obj:objs) {IndexRequest req = new IndexRequest(tableName, type); Map<String,Object> map=new HashMap<>();for(String key:obj.keySet()) {if("_id".equalsIgnoreCase(key)) {map.put("id", obj.get(key));}else {String valStr="";Object val=obj.get(key);if(val!=null) {valStr=Base64.encodeBase64String(val.toString().getBytes());}map.put(key, valStr);}}req.id(map.get("id").toString());req.source(map, XContentType.JSON);bulkRequest.add(req);} BulkResponse bulkResponse=client.bulk(bulkRequest);for (BulkItemResponse bulkItemResponse : bulkResponse) {if (bulkItemResponse.isFailed()) { System.out.println(bulkItemResponse.getId()+","+bulkItemResponse.getFailureMessage());}}}public static DBCursor getCursorForCollection(DBCollection collection,ObjectId lastIdObject,int pageSize) {DBCursor dbObjects=null;if(lastIdObject==null) {lastIdObject=(ObjectId) collection.findOne().get("_id");}BasicDBObject query=new BasicDBObject();query.append("_id",new BasicDBObject("$gt",lastIdObject));BasicDBObject sort=new BasicDBObject();sort.append("_id",1);dbObjects=collection.find(query).limit(pageSize).sort(sort);return dbObjects;}public static Long getPageSize(Long cnt,int pageSize) {return cnt%pageSize==0?cnt/pageSize:cnt/pageSize+1;}?
轉載于:https://www.cnblogs.com/davidwang456/p/9909422.html
總結
以上是生活随笔為你收集整理的亿级别记录的mongodb批量导入Es的java代码完整实现的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Elasticsearch使用BulkP
- 下一篇: es索引的RestHighLevelCl