Nacos深入浅出(三)
生活随笔
收集整理的這篇文章主要介紹了
Nacos深入浅出(三)
小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.
?
EventDispatcher.fireEvent(new ConfigDataChangeEvent(true, dataId, group, tenant, time.getTime()));跟著代碼一步步往下看:
public class EventDispatcher {/*** add event listener*/static public void addEventListener(AbstractEventListener listener) {for (Class<? extends Event> type : listener.interest()) {getEntry(type).listeners.addIfAbsent(listener);}}/*** fire event, notify listeners.*/static public void fireEvent(Event event) {if (null == event) {throw new IllegalArgumentException();}for (AbstractEventListener listener : getEntry(event.getClass()).listeners) {try {listener.onEvent(event);} catch (Exception e) {log.error(e.toString(), e);}}}這個getEntry(event.getClass()).listeners需要重點看下
?
/*** get event listener for eventType. Add Entry if not exist.*/static Entry getEntry(Class<? extends Event> eventType) {for (; ; ) {for (Entry entry : LISTENER_HUB) {if (entry.eventType == eventType) {return entry;}}Entry tmp = new Entry(eventType);/*** false means already exists*/if (LISTENER_HUB.addIfAbsent(tmp)) {return tmp;}}}?
static final CopyOnWriteArrayList<Entry> LISTENER_HUB = new CopyOnWriteArrayList<Entry>();這個LISTENER_HUB 會預先把幾個相應的Entry 加載進去(這個我們后面分析),然后就開始onEvent,通知Listeners了;
?
@Service public class AsyncNotifyService extends AbstractEventListener {@Overridepublic List<Class<? extends Event>> interest() {List<Class<? extends Event>> types = new ArrayList<Class<? extends Event>>();// 觸發(fā)配置變更同步通知types.add(ConfigDataChangeEvent.class);return types;}@Overridepublic void onEvent(Event event) {// 并發(fā)產(chǎn)生 ConfigDataChangeEventif (event instanceof ConfigDataChangeEvent) {ConfigDataChangeEvent evt = (ConfigDataChangeEvent)event;long dumpTs = evt.lastModifiedTs;String dataId = evt.dataId;String group = evt.group;String tenant = evt.tenant;String tag = evt.tag;// listen的服務地址List<?> ipList = serverListService.getServerList();// 其實這里任何類型隊列都可以 Queue<NotifySingleTask> queue = new LinkedList<NotifySingleTask>();for (int i = 0; i < ipList.size(); i++) { queue.add(new NotifySingleTask(dataId, group, tenant, tag, dumpTs, (String)ipList.get(i), evt.isBeta));} EXCUTOR.execute(new AsyncTask(httpclient, queue));}}
把這個配置關聯(lián)的服務一個個通知下,建議大家可以結合觀察者模式來理解這個;
private static final Executor EXCUTOR = Executors.newScheduledThreadPool(100, new NotifyThreadFactory());然后把這個通知服務扔到縣線程池,放到另外一個線程中去執(zhí)行通知任務;
?
class AsyncTask implements Runnable {public AsyncTask(CloseableHttpAsyncClient httpclient, Queue<NotifySingleTask> queue) {this.httpclient = httpclient;this.queue = queue;}@Overridepublic void run() { executeAsyncInvoke(); }private void executeAsyncInvoke() {while (!queue.isEmpty()) {NotifySingleTask task = queue.poll();String targetIp = task.getTargetIP();if (serverListService.getServerList().contains(targetIp)) {// 啟動健康檢查且有不監(jiān)控的ip則直接把放到通知隊列,否則通知if (serverListService.isHealthCheck()&& ServerListService.getServerListUnhealth().contains(targetIp)) {// target ip 不健康,則放入通知列表中ConfigTraceService.logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null,task.getLastModified(),LOCAL_IP, ConfigTraceService.NOTIFY_EVENT_UNHEALTH, 0, task.target);// get delay time and set fail count to the taskint delay = getDelayTime(task);Queue<NotifySingleTask> queue = new LinkedList<NotifySingleTask>();queue.add(task);AsyncTask asyncTask = new AsyncTask(httpclient, queue);((ScheduledThreadPoolExecutor)EXCUTOR).schedule(asyncTask, delay, TimeUnit.MILLISECONDS);} else {HttpGet request = new HttpGet(task.url);request.setHeader(NotifyService.NOTIFY_HEADER_LAST_MODIFIED,String.valueOf(task.getLastModified()));request.setHeader(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP, LOCAL_IP);if (task.isBeta) {request.setHeader("isBeta", "true");}httpclient.execute(request, new AyscNotifyCallBack(httpclient, task));}}}}private Queue<NotifySingleTask> queue;private CloseableHttpAsyncClient httpclient;}
?
上面這段代碼明天再慢慢解析吧,加油!
轉載于:https://www.cnblogs.com/longxok/p/11011722.html
總結
以上是生活随笔為你收集整理的Nacos深入浅出(三)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Linux 安装json神器 jq
- 下一篇: JAVA 23种开发模式详解(代码举例)