Elasticsearch 7.15.2: Modifying the IK Analyzer Source Code for MySQL 8-Based Dictionary Hot Updates
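Table of contents
- I. Source code analysis
- 1. Default hot update
- 2. Hot update analysis
- 3. Method analysis
- II. Dictionary hot update
- 2.1. Adding dependencies
- 2.2. Database
- 2.3. JDBC configuration
- 2.4. Packaging configuration
- 2.5. Permission policy
- 2.6. Modifying Dictionary
- 2.7. Hot-update class
- 2.8. Building and packaging
- 2.9. Upload
- 2.10. Change log
- III. Server operations
- 3.1. Plugin directory
- 3.2. Unzipping the plugin
- 3.3. Moving files
- 3.4. Directory structure
- 3.5. Copying the configuration
- 3.6. Restarting Elasticsearch
- 3.7. Testing analysis
- 3.8. Adding a word
- 3.9. Monitoring the Elasticsearch console
- 3.10. Checking the analysis again
- 3.11. Analysis result
- 3.12. Modified source code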
I. Source code analysis
1. Default hot update
The hot-update mechanism provided by the official project:
https://github.com/medcl/elasticsearch-analysis-ik
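2. Hot update analysis
The official approach hot-updates the dictionary from a remote file, which is not very practical. We can follow the same pattern to implement a MySQL-based version ourselves. The official implementation is in the org.wltea.analyzer.dic.Monitor class, which works as follows:
- 1. Send a HEAD request to the dictionary server.
- 2. Read the Last-Modified and ETag headers from the response and check whether either has changed.
- 3. If nothing has changed, sleep for 1 min and go back to step 1.
- 4. If something has changed, call Dictionary#reLoadMainDict() to reload the dictionary.
- 5. Sleep for 1 min and go back to step 1.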
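The polling steps above can be sketched in plain Java. This is a minimal illustration, not the code of IK's Monitor class: the class name RemoteDictPoller and its methods are hypothetical, and it uses java.net.HttpURLConnection as a stand-in for whatever HTTP client the plugin uses.

```java
import java.net.HttpURLConnection;
import java.net.URI;
import java.util.Objects;

// Hypothetical sketch of the HEAD-polling idea; not IK's actual Monitor class.
public class RemoteDictPoller {
    private String lastModified; // Last-Modified value seen on the previous poll
    private String eTag;         // ETag value seen on the previous poll

    /** Returns true when either header differs from the value seen last time. */
    public static boolean hasChanged(String prevLastModified, String prevETag,
                                     String lastModified, String eTag) {
        return !Objects.equals(prevLastModified, lastModified)
                || !Objects.equals(prevETag, eTag);
    }

    /** Sends one HEAD request and reports whether the dictionary should be reloaded. */
    public boolean poll(String location) throws Exception {
        HttpURLConnection conn =
                (HttpURLConnection) URI.create(location).toURL().openConnection();
        conn.setRequestMethod("HEAD");
        conn.setConnectTimeout(10_000);
        conn.setReadTimeout(15_000);
        try {
            if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
                return false; // treat any non-200 answer as "no change"
            }
            String newLastModified = conn.getHeaderField("Last-Modified");
            String newETag = conn.getHeaderField("ETag");
            boolean changed = hasChanged(lastModified, eTag, newLastModified, newETag);
            // Remember the current values for the next poll
            lastModified = newLastModified;
            eTag = newETag;
            return changed;
        } finally {
            conn.disconnect();
        }
    }
}
```

A scheduler would call poll(location) once per interval and trigger the reload whenever it returns true. Note that the very first poll reports a change as soon as the server sends either header, because nothing has been remembered yet.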
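3. Method analysis
reLoadMainDict() calls loadMainDict(), which in turn calls loadRemoteExtDict() to load the remote custom dictionary. In the same way, loadStopWordDict() also loads the remote stop-word dictionary. reLoadMainDict() creates a new Dictionary instance, loads the dictionaries into it, and then swaps them in for the current ones, so it is a full replacement.

    void reLoadMainDict() {
        logger.info("Reloading dictionary...");
        // Load into a new instance to reduce the impact of loading on the dictionary currently in use
        Dictionary tmpDict = new Dictionary(configuration);
        tmpDict.configuration = getSingleton().configuration;
        tmpDict.loadMainDict();
        tmpDict.loadStopWordDict();
        _MainDict = tmpDict._MainDict;
        _StopWords = tmpDict._StopWords;
        logger.info("Dictionary reload finished.");
    }

    /**
     * Load the main dictionary and the extension dictionaries
     */
    private void loadMainDict() {
        // Create the main dictionary instance
        _MainDict = new DictSegment((char) 0);
        // Read the main dictionary file
        Path file = PathUtils.get(getDictRoot(), Dictionary.PATH_DIC_MAIN);
        loadDictFile(_MainDict, file, false, "Main Dict");
        // Load the extension dictionaries
        this.loadExtDict();
        // Load the remote custom dictionaries
        this.loadRemoteExtDict();
    }

The logic of loadRemoteExtDict() is also straightforward: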
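- 1. Get the URLs of the remote dictionaries; there may be more than one.
- 2. Request each URL in turn and fetch the remote dictionary.
- 3. Add each remote word to the main dictionary: _MainDict.fillSegment(theWord.trim().toLowerCase().toCharArray());
The method to focus on here is fillSegment(), which adds a word to the dictionary. Its counterpart is disableSegment(), which masks a word that is already in the dictionary.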
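To make the add/mask semantics concrete, here is a deliberately simplified character trie. It is only a sketch: MiniDictSegment is a hypothetical class that borrows the method names fillSegment() and disableSegment(), and it does not reproduce the internals of IK's DictSegment.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified trie; not IK's actual DictSegment implementation.
public class MiniDictSegment {
    private final Map<Character, MiniDictSegment> children = new HashMap<>();
    private boolean wordEnd; // true when the path from the root to this node is an enabled word

    /** Adds a word: walk or create one node per character, then mark the last node. */
    public void fillSegment(char[] word) {
        setWord(word, true);
    }

    /** Masks a word: the nodes stay in place, only the end-of-word flag is cleared. */
    public void disableSegment(char[] word) {
        setWord(word, false);
    }

    private void setWord(char[] word, boolean enabled) {
        if (word.length == 0) {
            return; // ignore empty input
        }
        MiniDictSegment node = this;
        for (char c : word) {
            MiniDictSegment next = node.children.get(c);
            if (next == null) {
                if (!enabled) {
                    return; // the word was never added, nothing to mask
                }
                next = new MiniDictSegment();
                node.children.put(c, next);
            }
            node = next;
        }
        node.wordEnd = enabled;
    }

    /** Returns true only for words that were added and not masked afterwards. */
    public boolean contains(String word) {
        MiniDictSegment node = this;
        for (char c : word.toCharArray()) {
            node = node.children.get(c);
            if (node == null) {
                return false;
            }
        }
        return node.wordEnd;
    }
}
```

Because masking only clears a flag, longer words that share the same prefix stay intact. This is what makes single-word additions and removals cheap compared with rebuilding the whole dictionary.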
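The Monitor class is only a monitoring task. It is started in the initial() method of the org.wltea.analyzer.dic.Dictionary class, inside the if (cfg.isEnableRemoteDict()) block of the following code.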
    ... ...
    // Thread pool
    private static ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
    ... ...

    /**
     * Dictionary initialization. IK Analyzer initializes its dictionaries through static
     * methods of the Dictionary class, so loading only starts when the Dictionary class is
     * actually used, which lengthens the first analysis call. This method provides a way to
     * initialize the dictionaries while the application is loading.
     *
     * @return Dictionary
     */
    public static synchronized void initial(Configuration cfg) {
        if (singleton == null) {
            synchronized (Dictionary.class) {
                if (singleton == null) {
                    singleton = new Dictionary(cfg);
                    singleton.loadMainDict();
                    singleton.loadSurnameDict();
                    singleton.loadQuantifierDict();
                    singleton.loadSuffixDict();
                    singleton.loadPrepDict();
                    singleton.loadStopWordDict();

                    if (cfg.isEnableRemoteDict()) {
                        // Start the monitoring threads
                        for (String location : singleton.getRemoteExtDictionarys()) {
                            // 10 is the initial delay (adjustable), 60 is the interval; both in seconds
                            pool.scheduleAtFixedRate(new Monitor(location), 10, 60, TimeUnit.SECONDS);
                        }
                        for (String location : singleton.getRemoteExtStopWordDictionarys()) {
                            pool.scheduleAtFixedRate(new Monitor(location), 10, 60, TimeUnit.SECONDS);
                        }
                    }
                }
            }
        }
    }

II. Dictionary hot update
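This part implements dictionary hot updates backed by MySQL.
2.1. Adding dependencies
In the pom.xml in the project root, change the Elasticsearch version and add the MySQL 8.0 driver dependency.

    <properties>
        <elasticsearch.version>7.15.2</elasticsearch.version>
    </properties>

    <!-- MySQL driver -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.27</version>
    </dependency>

The default version is 7.14.0-SNAPSHOT; change it to 7.15.2.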
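2.2. Database
Create the database dianpingdb and initialize the table structure.
es_extra_main and es_extra_stopword are the main dictionary and the stop-word dictionary, respectively.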
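The table definitions themselves are not reproduced in this article. The sketch below shows one possible layout, created over JDBC. The column names word, is_deleted and update_time are the ones DatabaseMonitor reads later; the id column, the column types and the DictSchema class itself are assumptions made only for illustration.

```java
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;

// Hypothetical schema helper; column types and the id column are assumptions.
public class DictSchema {

    /** Builds the DDL for one dictionary table (same layout for both tables). */
    public static String ddlFor(String table) {
        return "CREATE TABLE IF NOT EXISTS " + table + " ("
                + "id INT NOT NULL AUTO_INCREMENT PRIMARY KEY, "
                + "word VARCHAR(255) NOT NULL, "
                + "is_deleted TINYINT(1) NOT NULL DEFAULT 0, "
                + "update_time TIMESTAMP(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6) "
                + "ON UPDATE CURRENT_TIMESTAMP(6)"
                + ") DEFAULT CHARSET = utf8mb4";
    }

    /** Creates both tables on an already opened connection to dianpingdb. */
    public static void createTables(Connection conn) throws SQLException {
        try (Statement st = conn.createStatement()) {
            st.execute(ddlFor("es_extra_main"));
            st.execute(ddlFor("es_extra_stopword"));
        }
    }
}
```

Letting MySQL maintain update_time automatically means that flipping is_deleted on an existing row is enough for the next poll to pick the change up.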
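2.3. JDBC configuration
In the project's config folder, create a jdbc.properties file. It records the MySQL url, driver, username and password, the SQL statements that query the main dictionary and the stop-word dictionary, and the hot-update interval in seconds. As the two SQL statements show, my design is an incremental update rather than the official full replacement.
Contents of jdbc.properties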
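The original file contents are not reproduced here, so the following is only a sketch of what such a file could contain, loaded with java.util.Properties. The key names are the ones DatabaseMonitor reads (jdbc.url, jdbc.username, jdbc.password, jdbc.driver, jdbc.update.main.dic.sql, jdbc.update.stopword.sql, jdbc.update.interval). Every value, including the host, the credentials and the exact SQL text, is a placeholder assumption.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Hypothetical example; only the key names come from DatabaseMonitor, all values are placeholders.
public class JdbcConfigExample {
    public static final String SAMPLE = String.join("\n",
            "jdbc.url=jdbc:mysql://localhost:3306/dianpingdb?serverTimezone=UTC",
            "jdbc.username=es_user",
            "jdbc.password=change_me",
            "jdbc.driver=com.mysql.cj.jdbc.Driver",
            // Incremental queries: only rows changed after the last processed update_time
            "jdbc.update.main.dic.sql=SELECT word, is_deleted, update_time FROM es_extra_main"
                    + " WHERE update_time > ? ORDER BY update_time",
            "jdbc.update.stopword.sql=SELECT word, is_deleted, update_time FROM es_extra_stopword"
                    + " WHERE update_time > ? ORDER BY update_time",
            // Polling interval in seconds
            "jdbc.update.interval=10");

    /** Parses properties-format text the same way a jdbc.properties file would be read. */
    public static Properties load(String text) throws IOException {
        Properties props = new Properties();
        props.load(new StringReader(text));
        return props;
    }

    public static void main(String[] args) throws IOException {
        Properties props = load(SAMPLE);
        System.out.println(props.getProperty("jdbc.update.main.dic.sql"));
    }
}
```

The single ? placeholder in each query matches the one timestamp parameter that DatabaseMonitor binds, and ORDER BY update_time makes the last row read the newest one.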
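2.4. Packaging configuration
src/main/assemblies/plugin.xml
Add the MySQL driver dependency to this file; otherwise the packaged zip will not contain the MySQL driver jar.
2.5. Permission policy
src/main/resources/plugin-security.policy
Add permission java.lang.RuntimePermission "setContextClassLoader"; to this file.
Without this entry, the following exception is thrown because of missing permissions: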
    java.lang.ExceptionInInitializerError: null
        at java.lang.Class.forName0(Native Method) ~[?:1.8.0_261]
        at java.lang.Class.forName(Unknown Source) ~[?:1.8.0_261]
        at com.mysql.cj.jdbc.NonRegisteringDriver.<clinit>(NonRegisteringDriver.java:97) ~[?:?]
        at java.lang.Class.forName0(Native Method) ~[?:1.8.0_261]
        at java.lang.Class.forName(Unknown Source) ~[?:1.8.0_261]
        at org.wltea.analyzer.dic.DatabaseMonitor.lambda$new$0(DatabaseMonitor.java:72) ~[?:?]
        at java.security.AccessController.doPrivileged(Native Method) ~[?:1.8.0_261]
        at org.wltea.analyzer.dic.DatabaseMonitor.<init>(DatabaseMonitor.java:70) ~[?:?]
        at org.wltea.analyzer.dic.Dictionary.initial(Dictionary.java:172) ~[?:?]
        at org.wltea.analyzer.cfg.Configuration.<init>(Configuration.java:40) ~[?:?]
        at org.elasticsearch.index.analysis.IkTokenizerFactory.<init>(IkTokenizerFactory.java:15) ~[?:?]
        at org.elasticsearch.index.analysis.IkTokenizerFactory.getIkSmartTokenizerFactory(IkTokenizerFactory.java:23) ~[?:?]
        at org.elasticsearch.index.analysis.AnalysisRegistry.buildMapping(AnalysisRegistry.java:379) ~[elasticsearch-6.7.2.jar:6.7.2]
        at org.elasticsearch.index.analysis.AnalysisRegistry.buildTokenizerFactories(AnalysisRegistry.java:189) ~[elasticsearch-6.7.2.jar:6.7.2]
        at org.elasticsearch.index.analysis.AnalysisRegistry.build(AnalysisRegistry.java:163) ~[elasticsearch-6.7.2.jar:6.7.2]
        at org.elasticsearch.index.IndexService.<init>(IndexService.java:164) ~[elasticsearch-6.7.2.jar:6.7.2]
        at org.elasticsearch.index.IndexModule.newIndexService(IndexModule.java:402) ~[elasticsearch-6.7.2.jar:6.7.2]
        at org.elasticsearch.indices.IndicesService.createIndexService(IndicesService.java:526) ~[elasticsearch-6.7.2.jar:6.7.2]
        at org.elasticsearch.indices.IndicesService.verifyIndexMetadata(IndicesService.java:599) ~[elasticsearch-6.7.2.jar:6.7.2]
        at org.elasticsearch.gateway.Gateway.performStateRecovery(Gateway.java:129) ~[elasticsearch-6.7.2.jar:6.7.2]
        at org.elasticsearch.gateway.GatewayService$1.doRun(GatewayService.java:227) ~[elasticsearch-6.7.2.jar:6.7.2]
        at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingAbstractRunnable.doRun(ThreadContext.java:751) ~[elasticsearch-6.7.2.jar:6.7.2]
        at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37) ~[elasticsearch-6.7.2.jar:6.7.2]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:1.8.0_261]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:1.8.0_261]
        at java.lang.Thread.run(Unknown Source) [?:1.8.0_261]
    Caused by: java.security.AccessControlException: access denied ("java.lang.RuntimePermission" "setContextClassLoader")
        at java.security.AccessControlContext.checkPermission(Unknown Source) ~[?:1.8.0_261]
        at java.security.AccessController.checkPermission(Unknown Source) ~[?:1.8.0_261]
        at java.lang.SecurityManager.checkPermission(Unknown Source) ~[?:1.8.0_261]
        at java.lang.Thread.setContextClassLoader(Unknown Source) ~[?:1.8.0_261]
        at com.mysql.cj.jdbc.AbandonedConnectionCleanupThread.lambda$static$0(AbandonedConnectionCleanupThread.java:72) ~[?:?]
        at java.util.concurrent.ThreadPoolExecutor$Worker.<init>(Unknown Source) ~[?:1.8.0_261]
        at java.util.concurrent.ThreadPoolExecutor.addWorker(Unknown Source) ~[?:1.8.0_261]
        at java.util.concurrent.ThreadPoolExecutor.execute(Unknown Source) ~[?:1.8.0_261]
        at java.util.concurrent.Executors$DelegatedExecutorService.execute(Unknown Source) ~[?:1.8.0_261]
        at com.mysql.cj.jdbc.AbandonedConnectionCleanupThread.<clinit>(AbandonedConnectionCleanupThread.java:75) ~[?:?]
        ... 26 more

2.6. Modifying Dictionary
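- 1. Load the jdbc.properties file in the constructor.
- 2. Change getProperty() to public.
- 3. Add several methods for adding and removing dictionary entries.
These methods are added at the end of the class.
- 4. In initial(), start the custom database monitoring thread.
To do this, search for the initial(Configuration cfg) method.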
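The modified initial() is not shown in this article. The sketch below illustrates how a monitor task could be scheduled with an interval taken from jdbc.update.interval, mirroring the scheduleAtFixedRate call that the official code uses for Monitor. MonitorScheduling, intervalSeconds() and start() are hypothetical names, and the 60-second fallback is an assumption.

```java
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical helper; shows the scheduling idea only, not the author's actual change.
public class MonitorScheduling {
    static final String JDBC_UPDATE_INTERVAL = "jdbc.update.interval";
    static final long DEFAULT_INTERVAL_SECONDS = 60L; // assumed fallback

    /** Reads the polling interval, falling back to the default on missing or invalid input. */
    public static long intervalSeconds(Properties props) {
        String raw = props.getProperty(JDBC_UPDATE_INTERVAL);
        if (raw == null) {
            return DEFAULT_INTERVAL_SECONDS;
        }
        try {
            long value = Long.parseLong(raw.trim());
            return value > 0 ? value : DEFAULT_INTERVAL_SECONDS;
        } catch (NumberFormatException e) {
            return DEFAULT_INTERVAL_SECONDS;
        }
    }

    /** Schedules the monitor: 10 s initial delay, then once per configured interval. */
    public static ScheduledFuture<?> start(ScheduledExecutorService pool,
                                           Runnable monitor, Properties props) {
        return pool.scheduleAtFixedRate(monitor, 10, intervalSeconds(props), TimeUnit.SECONDS);
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(JDBC_UPDATE_INTERVAL, "30");
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
        start(pool, () -> System.out.println("poll database"), props);
        // shutdown() cancels the periodic task, so this demo exits without polling
        pool.shutdown();
    }
}
```

Inside the plugin, the Runnable would be a DatabaseMonitor instance and the properties would be the ones loaded from jdbc.properties. Keep in mind that scheduleAtFixedRate stops rescheduling a task once a run throws, so the monitor should catch its own exceptions.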
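2.7. Hot-update class
DatabaseMonitor is the class that implements the MySQL hot update.
- 1. lastUpdateTimeOfMainDic and lastUpdateTimeOfStopword record the update_time of the last row processed in the previous run.
- 2. Query the rows that were added or deleted since the previous run.
- 3. Loop over the rows and check the is_deleted field: if it is true, disable (remove) the entry; if it is false, add the entry.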
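The three steps above amount to a watermark-based incremental sync. The following in-memory sketch shows the idea without a database; IncrementalSync and Row are hypothetical names, and a plain Set stands in for the IK dictionary.

```java
import java.sql.Timestamp;
import java.util.List;
import java.util.Set;

// Hypothetical in-memory model of the incremental update; a Set stands in for the dictionary.
public class IncrementalSync {

    /** One row of es_extra_main or es_extra_stopword. */
    public static final class Row {
        final String word;
        final boolean deleted;
        final Timestamp updateTime;

        public Row(String word, boolean deleted, Timestamp updateTime) {
            this.word = word;
            this.deleted = deleted;
            this.updateTime = updateTime;
        }
    }

    /**
     * Applies every row changed after 'watermark' to 'dict' and returns the new watermark,
     * i.e. the largest update_time that was processed.
     */
    public static Timestamp apply(List<Row> rows, Timestamp watermark, Set<String> dict) {
        Timestamp latest = watermark;
        for (Row row : rows) {
            if (!row.updateTime.after(watermark)) {
                continue; // same filter as "WHERE update_time > ?"
            }
            if (row.deleted) {
                dict.remove(row.word); // is_deleted = true: disable the entry
            } else {
                dict.add(row.word);    // is_deleted = false: add the entry
            }
            if (row.updateTime.after(latest)) {
                latest = row.updateTime;
            }
        }
        return latest;
    }
}
```

One caveat of a strict greater-than comparison is that a row written with exactly the same update_time as the current watermark, but after the poll ran, is skipped. Whether that matters depends on the timestamp precision of the table.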
Create the DatabaseMonitor class in the org.wltea.analyzer.dic package.
    package org.wltea.analyzer.dic;

    import org.apache.logging.log4j.Logger;
    import org.elasticsearch.SpecialPermission;
    import org.wltea.analyzer.help.ESPluginLoggerFactory;

    import java.security.AccessController;
    import java.security.PrivilegedAction;
    import java.sql.*;
    import java.time.LocalDate;
    import java.time.LocalDateTime;
    import java.time.LocalTime;

    /**
     * Updates the dictionaries from MySQL
     *
     * @author gblfy
     * @date 2021-11-21
     * @WebSite gblfy.com
     */
    public class DatabaseMonitor implements Runnable {

        private static final Logger logger = ESPluginLoggerFactory.getLogger(DatabaseMonitor.class.getName());
        public static final String PATH_JDBC_PROPERTIES = "jdbc.properties";

        private static final String JDBC_URL = "jdbc.url";
        private static final String JDBC_USERNAME = "jdbc.username";
        private static final String JDBC_PASSWORD = "jdbc.password";
        private static final String JDBC_DRIVER = "jdbc.driver";
        private static final String SQL_UPDATE_MAIN_DIC = "jdbc.update.main.dic.sql";
        private static final String SQL_UPDATE_STOPWORD = "jdbc.update.stopword.sql";
        /**
         * Update interval
         */
        public final static String JDBC_UPDATE_INTERVAL = "jdbc.update.interval";

        private static final Timestamp DEFAULT_LAST_UPDATE = Timestamp.valueOf(LocalDateTime.of(LocalDate.of(2020, 1, 1), LocalTime.MIN));

        private static Timestamp lastUpdateTimeOfMainDic = null;
        private static Timestamp lastUpdateTimeOfStopword = null;

        public String getUrl() {
            return Dictionary.getSingleton().getProperty(JDBC_URL);
        }

        public String getUsername() {
            return Dictionary.getSingleton().getProperty(JDBC_USERNAME);
        }

        public String getPassword() {
            return Dictionary.getSingleton().getProperty(JDBC_PASSWORD);
        }

        public String getDriver() {
            return Dictionary.getSingleton().getProperty(JDBC_DRIVER);
        }

        public String getUpdateMainDicSql() {
            return Dictionary.getSingleton().getProperty(SQL_UPDATE_MAIN_DIC);
        }

        public String getUpdateStopwordSql() {
            return Dictionary.getSingleton().getProperty(SQL_UPDATE_STOPWORD);
        }

        /**
         * Loads the MySQL driver
         */
        public DatabaseMonitor() {
            SpecialPermission.check();
            AccessController.doPrivileged((PrivilegedAction<Void>) () -> {
                try {
                    Class.forName(getDriver());
                } catch (ClassNotFoundException e) {
                    logger.error("mysql jdbc driver not found", e);
                }
                return null;
            });
        }

        @Override
        public void run() {
            SpecialPermission.check();
            AccessController.doPrivileged((PrivilegedAction<Void>) () -> {
                Connection conn = getConnection();
                if (conn == null) {
                    // getConnection() already logged the failure; skip this round
                    return null;
                }
                // Update the main dictionary
                updateMainDic(conn);
                // Update the stop words
                updateStopword(conn);
                closeConnection(conn);
                return null;
            });
        }

        public Connection getConnection() {
            Connection connection = null;
            try {
                connection = DriverManager.getConnection(getUrl(), getUsername(), getPassword());
            } catch (SQLException e) {
                logger.error("failed to get connection", e);
            }
            return connection;
        }

        public void closeConnection(Connection conn) {
            if (conn != null) {
                try {
                    conn.close();
                } catch (SQLException e) {
                    logger.error("failed to close Connection", e);
                }
            }
        }

        public void closeRsAndPs(ResultSet rs, PreparedStatement ps) {
            if (rs != null) {
                try {
                    rs.close();
                } catch (SQLException e) {
                    logger.error("failed to close ResultSet", e);
                }
            }
            if (ps != null) {
                try {
                    ps.close();
                } catch (SQLException e) {
                    logger.error("failed to close PreparedStatement", e);
                }
            }
        }

        /**
         * Main dictionary
         */
        public synchronized void updateMainDic(Connection conn) {
            logger.info("start update main dic");
            int numberOfAddWords = 0;
            int numberOfDisableWords = 0;
            PreparedStatement ps = null;
            ResultSet rs = null;

            try {
                String sql = getUpdateMainDicSql();

                Timestamp param = lastUpdateTimeOfMainDic == null ? DEFAULT_LAST_UPDATE : lastUpdateTimeOfMainDic;

                logger.info("param: " + param);

                ps = conn.prepareStatement(sql);
                ps.setTimestamp(1, param);

                rs = ps.executeQuery();

                while (rs.next()) {
                    String word = rs.getString("word");
                    word = word.trim();

                    if (word.isEmpty()) {
                        continue;
                    }

                    lastUpdateTimeOfMainDic = rs.getTimestamp("update_time");

                    if (rs.getBoolean("is_deleted")) {
                        logger.info("[main dic] disable word: {}", word);
                        // Remove
                        Dictionary.disableWord(word);
                        numberOfDisableWords++;
                    } else {
                        logger.info("[main dic] add word: {}", word);
                        // Add
                        Dictionary.addWord(word);
                        numberOfAddWords++;
                    }
                }

                logger.info("end update main dic -> addWord: {}, disableWord: {}", numberOfAddWords, numberOfDisableWords);
            } catch (SQLException e) {
                logger.error("failed to update main_dic", e);
            } finally {
                // Close the ResultSet and PreparedStatement on success as well as on failure
                closeRsAndPs(rs, ps);
            }
        }

        /**
         * Stop words
         */
        public synchronized void updateStopword(Connection conn) {
            logger.info("start update stopword");

            int numberOfAddWords = 0;
            int numberOfDisableWords = 0;
            PreparedStatement ps = null;
            ResultSet rs = null;
            try {
                String sql = getUpdateStopwordSql();

                Timestamp param = lastUpdateTimeOfStopword == null ? DEFAULT_LAST_UPDATE : lastUpdateTimeOfStopword;

                logger.info("param: " + param);

                ps = conn.prepareStatement(sql);
                ps.setTimestamp(1, param);

                rs = ps.executeQuery();

                while (rs.next()) {
                    String word = rs.getString("word");
                    word = word.trim();

                    if (word.isEmpty()) {
                        continue;
                    }

                    lastUpdateTimeOfStopword = rs.getTimestamp("update_time");

                    if (rs.getBoolean("is_deleted")) {
                        logger.info("[stopword] disable word: {}", word);
                        // Remove
                        Dictionary.disableStopword(word);
                        numberOfDisableWords++;
                    } else {
                        logger.info("[stopword] add word: {}", word);
                        // Add
                        Dictionary.addStopword(word);
                        numberOfAddWords++;
                    }
                }

                logger.info("end update stopword -> addWord: {}, disableWord: {}", numberOfAddWords, numberOfDisableWords);
            } catch (SQLException e) {
                logger.error("failed to update stopword", e);
            } finally {
                // Close the ResultSet and PreparedStatement
                closeRsAndPs(rs, ps);
            }
        }
    }

2.8. Building and packaging
Run mvn clean package, then find the elasticsearch-analysis-ik-7.15.2.zip archive in the elasticsearch-analysis-ik/target/releases directory and upload it to the plugins directory (mine is /app/elasticsearch-7.15.2/plugins).
2.9. Upload
2.10. Change log
III. Server operations
3.1. Plugin directory
Create the analysis-ik folder:

    cd /app/elasticsearch-7.15.2/plugins/
    mkdir analysis-ik

3.2. Unzipping the plugin

    unzip elasticsearch-analysis-ik-7.15.2.zip

3.3. Moving files
Move all the extracted files into the analysis-ik folder:

    mv *.jar plugin-* config/ analysis-ik

3.4. Directory structure
3.5. Copying the configuration
Copy jdbc.properties to the directory it is loaded from.
At startup, the plugin loads /app/elasticsearch-7.15.2/config/analysis-ik/jdbc.properties.

    cd /app/elasticsearch-7.15.2/plugins/
    cp analysis-ik/config/jdbc.properties /app/elasticsearch-7.15.2/config/analysis-ik/

3.6. Restarting Elasticsearch

    cd /app/elasticsearch-7.15.2/
    bin/elasticsearch -d && tail -f logs/dianping.log

3.7. Testing analysis
Before adding any custom words, run a test to see the baseline result:

    # Analyze "我叫凱悅"
    GET /shop/_analyze
    {
      "analyzer": "ik_smart",
      "text": "我叫凱悅"
    }

    GET /shop/_analyze
    {
      "analyzer": "ik_max_word",
      "text": "我叫凱悅"
    }

Result: "我叫凱悅" is split into single characters.

    {
      "tokens" : [
        {
          "token" : "我",
          "start_offset" : 0,
          "end_offset" : 1,
          "type" : "CN_CHAR",
          "position" : 0
        },
        {
          "token" : "叫",
          "start_offset" : 1,
          "end_offset" : 2,
          "type" : "CN_CHAR",
          "position" : 1
        },
        {
          "token" : "凱",
          "start_offset" : 2,
          "end_offset" : 3,
          "type" : "CN_CHAR",
          "position" : 2
        },
        {
          "token" : "悅",
          "start_offset" : 3,
          "end_offset" : 4,
          "type" : "CN_CHAR",
          "position" : 3
        }
      ]
    }

3.8. Adding a word
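In the es_extra_main table of the database, insert the custom word "我叫凱瑞" and commit the transaction.
3.9. Monitoring the Elasticsearch console
The Elasticsearch console log shows that the custom word "我叫凱瑞" we just added has been loaded.
3.10. Checking the analysis again

    # Analyze "我叫凱悅"
    GET /shop/_analyze
    {
      "analyzer": "ik_smart",
      "text": "我叫凱悅"
    }

    GET /shop/_analyze
    {
      "analyzer": "ik_max_word",
      "text": "我叫凱悅"
    }

3.11. Analysis result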
The result shows that "我叫凱瑞" is now treated as a single token.
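The same check can also be scripted instead of being run by hand in the console. The sketch below posts a request to the _analyze endpoint with the JDK's built-in HTTP client (Java 11 or later). AnalyzeCheck is a hypothetical class, and the base URL http://localhost:9200 is an assumption; only the index name shop and the analyzer names come from the requests above.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical helper for calling the _analyze API; host and port are assumptions.
public class AnalyzeCheck {

    /** Builds the JSON request body, escaping backslashes and double quotes. */
    public static String body(String analyzer, String text) {
        return "{\"analyzer\":\"" + escape(analyzer) + "\",\"text\":\"" + escape(text) + "\"}";
    }

    private static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    /** Sends the request and returns the raw JSON response. */
    public static String analyze(String baseUrl, String index, String analyzer, String text)
            throws Exception {
        HttpRequest request = HttpRequest.newBuilder(URI.create(baseUrl + "/" + index + "/_analyze"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body(analyzer, text)))
                .build();
        HttpResponse<String> response =
                HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
        return response.body();
    }

    public static void main(String[] args) throws Exception {
        // Assumes a local node; adjust the URL for your cluster
        System.out.println(analyze("http://localhost:9200", "shop", "ik_smart", "我叫凱瑞"));
    }
}
```

Running it before and after inserting the word makes the change easy to compare: the token list should shrink from single characters to one entry once the word has been picked up.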
3.12. Modified source code
https://gitee.com/gb_90/elasticsearch-analysis-ik