springboot1.5.10兼容高版本6.1.1elasticsearch
生活随笔
收集整理的這篇文章主要介紹了
springboot1.5.10兼容高版本6.1.1elasticsearch
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
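1.引入依賴(需先在 pom.xml 的 <properties> 中加入 <elasticsearch.version>6.1.1</elasticsearch.version>,以覆蓋 Spring Boot 1.5.10 預設管理的 Elasticsearch 版本)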
<dependency><groupId>org.elasticsearch</groupId><artifactId>elasticsearch</artifactId><version>${elasticsearch.version}</version></dependency><dependency><groupId>org.elasticsearch.client</groupId><artifactId>transport</artifactId><version>${elasticsearch.version}</version></dependency><dependency><groupId>org.elasticsearch.plugin</groupId><artifactId>transport-netty4-client</artifactId><version>${elasticsearch.version}</version></dependency><dependency><groupId>org.elasticsearch.client</groupId><artifactId>elasticsearch-rest-high-level-client</artifactId><version>${elasticsearch.version}</version></dependency><dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>2.11.0</version></dependency>2.配置信息:
/**
 * Holds the Elasticsearch client settings, each injected from an application property
 * through {@code @Value}.
 *
 * <p>Accessors are expected to be generated by the {@code @Getter}/{@code @Setter}
 * annotations (presumably Lombok; the imports are not shown in this snippet).
 */
@Configuration
@Getter
@Setter
public class ClientConfig {
    /**
     * Elasticsearch host, read from {@code elasticsearch.ip}.
     */
    @Value("${elasticsearch.ip}")
    private String esHostName;

    /**
     * Elasticsearch port, read from {@code elasticsearch.port}.
     */
    @Value("${elasticsearch.port}")
    private Integer esPort;

    /**
     * Cluster name, read from {@code elasticsearch.cluster.name}.
     */
    @Value("${elasticsearch.cluster.name}")
    private String esClusterName;

    /**
     * Pool size, read from {@code elasticsearch.pool}.
     */
    @Value("${elasticsearch.pool}")
    private Integer esPoolSize;

    /**
     * Whether the index should be recreated at service startup, read from
     * {@code elasticsearch.regenerateIndexEnabled} (the consumer of this flag is not shown here).
     */
    @Value("${elasticsearch.regenerateIndexEnabled}")
    private Boolean esRegenerateIndexFlag;

    /**
     * Whether index data should be synchronized at service startup, read from
     * {@code elasticsearch.syncDataEnabled} (the consumer of this flag is not shown here).
     */
    @Value("${elasticsearch.syncDataEnabled}")
    private Boolean esSyncDataEnabled;
}
3.es配置啟動類:
import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.transport.client.PreBuiltTransportClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;import java.net.InetAddress;/*** es配置啟動類* @author**/ @Configuration public class ElasticsearchConfig {private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchConfig.class);@AutowiredClientConfig clientConfig;@Beanpublic TransportClient init() {LOGGER.info("初始化開始。。。。。"); TransportClient transportClient = null;try { /*** 配置信息 * client.transport.sniff 增加嗅探機制,找到ES集群 * thread_pool.search.size 增加線程池個數,暫時設為5 */Settings esSetting = Settings.builder().put("client.transport.sniff", true) .put("thread_pool.search.size", clientConfig.getEsPoolSize()).build(); //配置信息Settings自定義transportClient = new PreBuiltTransportClient(esSetting);TransportAddress transportAddress = new TransportAddress(InetAddress.getByName(clientConfig.getEsHostName()), clientConfig.getEsPort());transportClient.addTransportAddresses(transportAddress); } catch (Exception e) { LOGGER.error("elasticsearch TransportClient create error!!!", e); } return transportClient; } }4.操作工具類:
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import org.apache.commons.lang3.StringUtils; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse; import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest; import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse; import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse; import org.elasticsearch.action.bulk.BackoffPolicy; import org.elasticsearch.action.bulk.BulkProcessor; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.get.GetRequestBuilder; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.text.Text; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder; import org.elasticsearch.search.fetch.subphase.highlight.HighlightField; import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.transport.client.PreBuiltTransportClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import 
org.springframework.core.io.ClassPathResource; import org.springframework.stereotype.Component;import javax.annotation.PostConstruct; import java.io.InputStream; import java.lang.reflect.Method; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.UUID;public class ElasticsearchUtils {private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchUtils.class);@Autowiredprivate TransportClient transportClient;private static TransportClient client;@PostConstructpublic void init() {client = this.transportClient;}/*** 創建索引以及設置其內容* @param index* @param indexType* @param filePath:json文件路徑*/public static void createIndex(String index,String indexType,String filePath) throws RuntimeException {try {StringBuffer strBuf = new StringBuffer();//解析json配置ClassPathResource resource = new ClassPathResource(filePath);InputStream inputStream = resource.getInputStream();int len = 0;byte[] buf = new byte[1024];while((len=inputStream.read(buf)) != -1) {strBuf.append(new String(buf, 0, len, "utf-8"));}inputStream.close();//創建索引 createIndex(index);//設置索引元素 putMapping(index, indexType, strBuf.toString());}catch(Exception e){throw new RuntimeException(e.getMessage());}}/*** 創建索引** @param index 索引名稱* @return*/public static boolean createIndex(String index){try {if (isIndexExist(index)) {//索引庫存在則刪除索引 deleteIndex(index);}CreateIndexResponse indexresponse = client.admin().indices().prepareCreate(index).setSettings(Settings.builder().put("index.number_of_shards", 5).put("index.number_of_replicas", 1)).get();LOGGER.info("創建索引 {} 執行狀態 {}", index , indexresponse.isAcknowledged());return indexresponse.isAcknowledged();}catch (Exception e) {throw new RuntimeException(e.getMessage());}}/*** 創建索引** @param index 索引名稱* @param indexType 索引類型* @param mapping 創建的mapping結構* @return*/public static boolean putMapping(String index,String indexType,String mapping) throws RuntimeException {if (!isIndexExist(index)) {throw new 
RuntimeException("創建索引庫"+index+"mapping"+mapping+"結構失敗,索引庫不存在!");}try {PutMappingResponse indexresponse = client.admin().indices().preparePutMapping(index).setType(indexType).setSource(mapping, XContentType.JSON).get();LOGGER.info("索引 {} 設置 mapping {} 執行狀態 {}", index ,indexType, indexresponse.isAcknowledged());return indexresponse.isAcknowledged();}catch (Exception e) {throw new RuntimeException(e.getMessage());}}/*** 判斷索引是否存在** @param index* @return*/public static boolean isIndexExist(String index) {IndicesExistsResponse inExistsResponse = client.admin().indices().exists(new IndicesExistsRequest(index)).actionGet();return inExistsResponse.isExists();}/*** 刪除索引** @param index* @return*/public static boolean deleteIndex(String index) throws RuntimeException{if (!isIndexExist(index)) {return true;}try {DeleteIndexResponse dResponse = client.admin().indices().prepareDelete(index).execute().actionGet();if (dResponse.isAcknowledged()) {LOGGER.info("delete index " + index + " successfully!");} else {LOGGER.info("Fail to delete index " + index);}return dResponse.isAcknowledged();} catch (Exception e) {throw new RuntimeException(e.getMessage());}}/*** 數據添加** @param jsonObject* 要增加的數據* @param index* 索引,類似數據庫* @param type* 類型,類似表* @return*/public static String addData(JSONObject jsonObject, String index, String type) {return addData(jsonObject, index, type, UUID.randomUUID().toString().replaceAll("-", "").toUpperCase());}/*** 數據添加,正定ID** @param jsonObject* 要增加的數據* @param index* 索引,類似數據庫* @param type* 類型,類似表* @param id* 數據ID* @return*/public static String addData(JSONObject jsonObject, String index, String type, String id)throws RuntimeException {try {IndexResponse response = client.prepareIndex(index, type, id).setSource(jsonObject).get();LOGGER.info("addData response status:{},id:{}", response.status().getStatus(), response.getId());return response.getId();} catch (Exception e) {throw new RuntimeException(e.getMessage());}}/*** 批量數據添加,** @param list* 要增加的數據* @param pkName* 
主鍵id* @param index* 索引,類似數據庫* @param type* 類型,類似表* @return*/public static <T> void addBatchData(List<T> list, String pkName, String index, String type) {if(list == null || list.isEmpty()) {return;}// 創建BulkPorcessor對象BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() {@Overridepublic void beforeBulk(long paramLong, BulkRequest paramBulkRequest) {// TODO Auto-generated method stub }// 執行出錯時執行 @Overridepublic void afterBulk(long paramLong, BulkRequest paramBulkRequest, Throwable paramThrowable) {// TODO Auto-generated method stub }@Overridepublic void afterBulk(long paramLong, BulkRequest paramBulkRequest, BulkResponse paramBulkResponse) {// TODO Auto-generated method stub }})// 1w次請求執行一次bulk.setBulkActions(1000)// 1gb的數據刷新一次bulk// .setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB))// 固定5s必須刷新一次.setFlushInterval(TimeValue.timeValueSeconds(5))// 并發請求數量, 0不并發, 1并發允許執行.setConcurrentRequests(1)// 設置退避, 100ms后執行, 最大請求3次.setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)).build();for (T vo : list) {if(getPkValueByName(vo, pkName)!= null) {String id = getPkValueByName(vo, pkName).toString();bulkProcessor.add(new IndexRequest(index, type, id).source(JSON.toJSONString(vo), XContentType.JSON));}}bulkProcessor.close();}/*** 根據主鍵名稱獲取實體類主鍵屬性值** @param clazz* @param pkName* @return*/private static Object getPkValueByName(Object clazz, String pkName) {try {String firstLetter = pkName.substring(0, 1).toUpperCase();String getter = "get" + firstLetter + pkName.substring(1);Method method = clazz.getClass().getMethod(getter, new Class[] {});Object value = method.invoke(clazz, new Object[] {});return value;} catch (Exception e) {return null;}}/*** 通過ID 更新數據** @param jsonObject* 要增加的數據* @param index* 索引,類似數據庫* @param type* 類型,類似表* @param id* 數據ID* @return*/public static void updateDataById(JSONObject jsonObject, String index, String type, String id) throws RuntimeException {try{UpdateRequest updateRequest = new 
UpdateRequest();updateRequest.index(index).type(type).id(id).doc(jsonObject);client.update(updateRequest);} catch (Exception e) {throw new RuntimeException(e.getMessage());}}/*** 批量數據更新,** @param list* 要增加的數據* @param pkName* 主鍵id* @param index* 索引,類似數據庫* @param type* 類型,類似表* @return*/public static <T> void updateBatchData(List<T> list, String pkName, String index, String type) {// 創建BulkPorcessor對象BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() {@Overridepublic void beforeBulk(long paramLong, BulkRequest paramBulkRequest) {// TODO Auto-generated method stub }// 執行出錯時執行 @Overridepublic void afterBulk(long paramLong, BulkRequest paramBulkRequest, Throwable paramThrowable) {// TODO Auto-generated method stub }@Overridepublic void afterBulk(long paramLong, BulkRequest paramBulkRequest, BulkResponse paramBulkResponse) {// TODO Auto-generated method stub }})// 1w次請求執行一次bulk.setBulkActions(1000)// 1gb的數據刷新一次bulk// .setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB))// 固定5s必須刷新一次.setFlushInterval(TimeValue.timeValueSeconds(5))// 并發請求數量, 0不并發, 1并發允許執行.setConcurrentRequests(1)// 設置退避, 100ms后執行, 最大請求3次.setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)).build();for (T vo : list) {String id = getPkValueByName(vo, pkName).toString();bulkProcessor.add(new UpdateRequest(index, type, id).doc(JSON.toJSONString(vo), XContentType.JSON));}bulkProcessor.close();}/*** 通過ID獲取數據** @param index* 索引,類似數據庫* @param type* 類型,類似表* @param id* 數據ID* @param fields* 需要顯示的字段,逗號分隔(缺省為全部字段)* @return*/public static Map<String, Object> searchDataById(String index, String type, String id, String fields) {GetRequestBuilder getRequestBuilder = client.prepareGet(index, type, id);if (StringUtils.isNotEmpty(fields)) {getRequestBuilder.setFetchSource(fields.split(","), null);}GetResponse getResponse = getRequestBuilder.execute().actionGet();return getResponse.getSource();}/*** 使用分詞查詢** @param index* 索引名稱* @param type* 類型名稱,可傳入多個type逗號分隔* 
@param clz* 數據對應實體類* @param fields* 需要顯示的字段,逗號分隔(缺省為全部字段)* @param boolQuery* 查詢條件* @return*/public static <T> List<T> searchListData(String index, String type, Class<T> clz, String fields,BoolQueryBuilder boolQuery) {return searchListData(index, type, clz, 0, fields, null, null,boolQuery);}/*** 使用分詞查詢** @param index* 索引名稱* @param type* 類型名稱,可傳入多個type逗號分隔* @param clz* 數據對應實體類* @param size* 文檔大小限制* @param fields* 需要顯示的字段,逗號分隔(缺省為全部字段)* @param sortField* 排序字段* @param highlightField* 高亮字段* @param boolQuery* 查詢條件* @return*/public static <T> List<T> searchListData(String index, String type, Class<T> clz,Integer size, String fields, String sortField, String highlightField,BoolQueryBuilder boolQuery) throws RuntimeException{SearchRequestBuilder searchRequestBuilder = client.prepareSearch(index);if (StringUtils.isNotEmpty(type)) {searchRequestBuilder.setTypes(type.split(","));}// 高亮(xxx=111,aaa=222)if (StringUtils.isNotEmpty(highlightField)) {HighlightBuilder highlightBuilder = new HighlightBuilder();// 設置高亮字段 highlightBuilder.field(highlightField);searchRequestBuilder.highlighter(highlightBuilder);}searchRequestBuilder.setQuery(boolQuery);if (StringUtils.isNotEmpty(fields)) {searchRequestBuilder.setFetchSource(fields.split(","), null);}searchRequestBuilder.setFetchSource(true);if (StringUtils.isNotEmpty(sortField)) {searchRequestBuilder.addSort(sortField, SortOrder.DESC);}if (size != null && size > 0) {searchRequestBuilder.setSize(size);}searchRequestBuilder.setScroll(new TimeValue(1000));searchRequestBuilder.setSize(10000);// 打印的內容 可以在 Elasticsearch head 和 Kibana 上執行查詢LOGGER.info("\n{}", searchRequestBuilder);SearchResponse searchResponse = searchRequestBuilder.execute().actionGet();long totalHits = searchResponse.getHits().totalHits;if(LOGGER.isDebugEnabled()) {long length = searchResponse.getHits().getHits().length;LOGGER.info("共查詢到[{}]條數據,處理數據條數[{}]", totalHits, length);}if (searchResponse.status().getStatus() ==200) {// 解析對象return setSearchResponse(clz, 
searchResponse, highlightField);}return null;}/*** 高亮結果集 特殊處理** @param clz* 數據對應實體類* @param searchResponse** @param highlightField* 高亮字段*/private static <T> List<T> setSearchResponse(Class<T> clz, SearchResponse searchResponse, String highlightField) {List<T> sourceList = new ArrayList<T>();for (SearchHit searchHit : searchResponse.getHits().getHits()) {searchHit.getSourceAsMap().put("id", searchHit.getId());StringBuffer stringBuffer = new StringBuffer();if (StringUtils.isNotEmpty(highlightField)) {// System.out.println("遍歷 高亮結果集,覆蓋 正常結果集" + searchHit.getSourceAsMap());HighlightField highlight = searchHit.getHighlightFields().get(highlightField);if(highlight == null) {continue;}Text[] text = highlight.getFragments();if (text != null) {for (Text str : text) {stringBuffer.append(str.string());}// 遍歷 高亮結果集,覆蓋 正常結果集 searchHit.getSourceAsMap().put(highlightField, stringBuffer.toString());}}T t = JSON.parseObject(JSON.toJSONString(searchHit.getSourceAsMap()), clz);sourceList.add(t);}return sourceList;}}?
轉載于:https://www.cnblogs.com/tinyj/p/10014117.html
總結
以上是生活随笔為你收集整理的springboot1.5.10兼容高版本6.1.1elasticsearch的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 关于在pycharm下提示ModuleN
- 下一篇: matlab中文本文件与图像转化