logback AbstractLogstashTcpSocketAppender 源码解析
大家好,我是烤鴨:
今天分享下 logback 源碼 ,版本是 6.5-SNAPSHOT。
寫這篇的目的
由于最近項目中一直出現(xiàn)這個日志,而且基本每20秒就會打印一次,也沒法關(guān)掉,百度上資料也很少,只能自己來了。
10:04:01,393 |-WARN in net.logstash.logback.appender.LogstashTcpSocketAppender[SLEUTH-INFO] - Log destination xxxx.com:1111: Waiting 19999ms before attempting reconnection.
正常來說,這個提示就是簡單提示下,socket連接斷開,可能是網(wǎng)絡(luò)或者是服務(wù)端的原因,然后重連。比如下邊這個日志。
11:17:06,337 |-WARN in net.logstash.logback.appender.LogstashTcpSocketAppender[LOGSTASH] - Log destination xxx.com:1234: Waiting 27476ms before attempting reconnection. 11:17:13,302 |-WARN in net.logstash.logback.appender.LogstashAccessTcpSocketAppender[logstash] - Log destination xxx.com:1234: connection failed. java.net.ConnectException: Connection refused: connectat java.net.ConnectException: Connection refused: connectat at java.net.DualStackPlainSocketImpl.waitForConnect(Native Method)at at java.net.DualStackPlainSocketImpl.socketConnect(DualStackPlainSocketImpl.java:81)at at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:476)at at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:218)at at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:200)at at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:162)at at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:394)at at java.net.Socket.connect(Socket.java:606)at at net.logstash.logback.appender.AbstractLogstashTcpSocketAppender$TcpSendingEventHandler.openSocket(AbstractLogstashTcpSocketAppender.java:721)at at net.logstash.logback.appender.AbstractLogstashTcpSocketAppender$TcpSendingEventHandler.onStart(AbstractLogstashTcpSocketAppender.java:640)at at net.logstash.logback.appender.AsyncDisruptorAppender$EventClearingEventHandler.onStart(AsyncDisruptorAppender.java:351)at at com.xxx.arch.encoder.com.lmax.disruptor.BatchEventProcessor.notifyStart(BatchEventProcessor.java:224)at at com.xxx.arch.encoder.com.lmax.disruptor.BatchEventProcessor.run(BatchEventProcessor.java:120)at at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)at at java.util.concurrent.FutureTask.run(FutureTask.java:266)at at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)at at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)at at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)at at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)at at java.lang.Thread.run(Thread.java:748) 11:17:13,303 |-WARN in net.logstash.logback.appender.LogstashAccessTcpSocketAppender[logstash] - Log destination xxx.com:1234: Waiting 27662ms before attempting reconnection.異常的日志,連接成功后,每10s斷開連接,然后過20s重試后連接成功,一直反復(fù),樂此不疲...
11:48:54,484 |-WARN in net.logstash.logback.appender.LogstashTcpSocketAppender[SLEUTH-INFO] - Log destination xxx.com:4562: connection established. 11:49:04,524 |-WARN in net.logstash.logback.appender.LogstashTcpSocketAppender[SLEUTH-INFO] - Log destination xxx.com:4562: Waiting 19949ms before attempting reconnection. 11:49:24,477 |-WARN in net.logstash.logback.appender.LogstashTcpSocketAppender[SLEUTH-INFO] - Log destination xxx.com:4562: connection established. 11:49:34,478 |-WARN in net.logstash.logback.appender.LogstashTcpSocketAppender[SLEUTH-INFO] - Log destination xxx.com:4562: Waiting 19995ms before attempting reconnection.源碼:AbstractLogstashTcpSocketAppender(嫌麻煩的直接看3)
由于 這個用到 com.lmax.disruptor 這個包,推薦看一下美團的這篇 https://tech.meituan.com/2016/11/18/disruptor.html
AbstractLogstashTcpSocketAppender ,一般是用于發(fā)送日志內(nèi)容,比如將日志內(nèi)容發(fā)送到 logstash/flume 等。
具體的配置可以參考下 https://www.cnblogs.com/zhyg/p/6994314.html
內(nèi)部類:
TcpSendingEventHandler implements EventHandler<LogEvent<Event>>, LifecycleAware
負(fù)責(zé)執(zhí)行TCP傳輸?shù)氖录幚砥?#xff0c;這個類內(nèi)部還有3個線程內(nèi)部類 分別是
KeepAliveRunnable(用于和socet 保持連接)、ReaderCallable(接收socket的流信息)、WriteTimeoutRunnable(檢測寫入超時,如果超時了就關(guān)閉連接)
UnconnectedConfigurableSSLSocketFactory extends ConfigurableSSLSocketFactory (創(chuàng)建鏈接,使用自定義配置參數(shù))
TcpSendingEventHandler 重點看下這個類,處理tcp事務(wù)都處理些啥,方法如下:
onEvent ,對 EventHandler.onEvent 的實現(xiàn),有事件就去處理。代碼不長,而且注釋特別清晰。
接受到事件后循環(huán)5次,判斷socket的讀取流的線程是否結(jié)束或者socket是否為空,調(diào)用 reopenSocket 方法,否則調(diào)用下面的writeEvent。如果 readerFuture.isDone() 是服務(wù)器關(guān)閉了連接,如果是 socket為空,是寫入超時。
if (readerFuture.isDone() || socket == null) {/** If readerFuture.isDone(), then the destination has shut down its output (our input),* and the destination is probably no longer listening to its input (our output).* This will be the case for Amazon's Elastic Load Balancers (ELB)* when an instance behind the ELB becomes unhealthy while we're connected to it.** If socket == null here, it means that a write timed out,* and the socket was closed by the WriteTimeoutRunnable.* * Therefore, attempt reconnection.*/addInfo(peerId + "destination terminated the connection. Reconnecting.");reopenSocket();try {readerFuture.get();sendFailureException = NOT_CONNECTED_EXCEPTION;} catch (Exception e) {sendFailureException = e;}continue; }writeEvent,tcp 往服務(wù)器寫數(shù)據(jù)。由于keepalive 也會觸發(fā)事件,但是event 為null。所以這時候判斷是 keepalive還是其他事件。
其他事件的寫入還要 兼容 logbakck1.x版本,keepalive 寫入的話,寫入 換行符。還有個屬性 endOfBatch,如果是的話,會執(zhí)行 outputStream.flush()
onStart , 啟動方法。
初始化 destinationAttemptStartTimes 數(shù)組,目的是為了存放每個連接目標(biāo)最后嘗試連接的時間。openSocket(建立 socket連接),scheduleKeepAlive (定時線程 觸發(fā)keepAlive 事件),scheduleWriteTimeout(定時檢查寫超時的話,就關(guān)閉連接(這個在5.x是沒有的方法))
onShutdown 就不說了
reopenSocket 調(diào)了兩個方法,關(guān)閉連接,打開連接。
openSocket ,是被 synchronized 修飾的。方法注釋說的是反復(fù)打開socket,直到線程被打斷了。
/*** Repeatedly tries to open a socket until it is successful,* or the hander is stopped, or the handler thread is interrupted.** If the socket is non-null when this method returns,* then it should be able to be used to send.*/方法比較長,一點點看
while (isStarted() && !Thread.currentThread().isInterrupted()) {// 獲取下一個連接的index,多個鏈接地址的時候多個<destination>標(biāo)簽,默認(rèn)主從,還有輪詢獲取和隨機destinationIndex = connectionStrategy.selectNextDestinationIndex(destinationIndex, destinations.size());long startWallTime = System.currentTimeMillis();Socket tempSocket = null;OutputStream tempOutputStream = null;/** Choose next server*/InetSocketAddress currentDestination = destinations.get(destinationIndex);try {/** Update peerId (for status message)*/peerId = "Log destination " + currentDestination + ": ";/** Delay the connection attempt if the last attempt to the selected destination* was less than the reconnectionDelay.* 判斷最后一次嘗試連接的時間和延遲重連比較,如果上一次重試的時間小于30s,會提示并在30減去重試時間后,發(fā)起重連*/final long millisSinceLastAttempt = startWallTime - destinationAttemptStartTimes[destinationIndex];if (millisSinceLastAttempt < reconnectionDelay.getMilliseconds()) {final long sleepTime = reconnectionDelay.getMilliseconds() - millisSinceLastAttempt;if (errorCount < MAX_REPEAT_CONNECTION_ERROR_LOG * destinations.size()) {addWarn(peerId + "Waiting " + sleepTime + "ms before attempting reconnection.");}try {shutdownLatch.await(sleepTime, TimeUnit.MILLISECONDS);if (!isStarted()) {return;}} catch (InterruptedException ie) {Thread.currentThread().interrupt();addWarn(peerId + "connection interrupted. Will no longer attempt reconnection.");return;}// reset the start time to be after the wait period.startWallTime = System.currentTimeMillis();}// 更新當(dāng)前index的最后重試時間destinationAttemptStartTimes[destinationIndex] = startWallTime;/** Set the SO_TIMEOUT so that SSL handshakes will timeout if they take too long.** Note that SO_TIMEOUT only applies to reads (which occur during the handshake process).*/tempSocket = socketFactory.createSocket();tempSocket.setSoTimeout(acceptConnectionTimeout);/** currentDestination is unresolved, so a new InetSocketAddress* must be created to resolve the hostname.*/tempSocket.connect(new InetSocketAddress(getHostString(currentDestination), currentDestination.getPort()), acceptConnectionTimeout);/** Trigger SSL handshake immediately and declare the socket unconnected if it fails*/if (tempSocket instanceof SSLSocket) {((SSLSocket)tempSocket).startHandshake();}/** Issue #218, make buffering the output stream optional.*/tempOutputStream = writeBufferSize > 0? new BufferedOutputStream(tempSocket.getOutputStream(), writeBufferSize): tempSocket.getOutputStream();if (getLogback11Support().isLogback11OrBefore()) {getLogback11Support().init(encoder, tempOutputStream);}addInfo(peerId + "connection established.");this.socket = tempSocket;this.outputStream = tempOutputStream;boolean shouldUpdateThreadName = (destinationIndex != connectedDestinationIndex);connectedDestinationIndex = destinationIndex;connectedDestination = currentDestination;connectionStrategy.connectSuccess(startWallTime, destinationIndex, destinations.size());if (shouldUpdateThreadName) {/** destination has changed, so update the thread name*/updateCurrentThreadName();}// 默認(rèn)的schedule線程池,每10s觸發(fā)一次,讀取server的返回this.readerFuture = scheduleReaderCallable(new ReaderCallable(tempSocket.getInputStream()));fireConnectionOpened(this.socket);return;} catch (Exception e) {CloseUtil.closeQuietly(tempOutputStream);CloseUtil.closeQuietly(tempSocket);connectionStrategy.connectFailed(startWallTime, destinationIndex, destinations.size());fireConnectionFailed(currentDestination, e);/** Avoid spamming status messages by checking the MAX_REPEAT_CONNECTION_ERROR_LOG.*/if (errorCount++ < MAX_REPEAT_CONNECTION_ERROR_LOG * destinations.size()) {addWarn(peerId + "connection failed.", e);}} }scheduleKeepAlive 維持連接的,需要在xml中配置 <keepAliveDuration>,默認(rèn)不觸發(fā)這個方法
scheduleWriteTimeout 監(jiān)測寫入超時的
ReaderCallable.call 讀取服務(wù)器的流,沒有返回空。但是! 觸發(fā)定時線程池往 Disruptor 中觸發(fā)一個空事件。
其實作者也說了觸發(fā)空事件就是為了 keepalive,觸發(fā)的時候會判斷 future是否結(jié)束,結(jié)束的話重新打開socket。
如果沒有這個方法,會在下次觸發(fā) onEvent時重新連接,所以為了盡快打開socket,作者加了這個折中的方案。
@Override public Void call() throws Exception {updateCurrentThreadName();try {while (true) {try {if (inputStream.read() == -1) {/** End of stream reached, so we're done.*/return null;}} catch (SocketTimeoutException e) {/** ignore, and try again*/} catch (Exception e) {/** Something else bad happened, so we're done.*/throw e;}}} finally {if (!Thread.currentThread().isInterrupted()) {getExecutorService().submit(() -> {/** https://github.com/logstash/logstash-logback-encoder/issues/341** Pro-actively trigger the event handler's onEvent method in the handler thread* by publishing a null event (which usually means a keepAlive event).** When onEvent handles the event in the handler thread,* it will detect that readerFuture.isDone() and reopen the socket.** Without this, onEvent would not be called until the next event,* which might not occur for a while.* So, this is really just an optimization to reopen the socket as soon as possible.** We can't reopen the socket from this thread,* since all socket open/close must be done from the event handler thread.** There is a potential race condition here as well, since* onEvent could be triggered before the readerFuture completes.* We reduce (but not eliminate) the chance of that happening by* scheduling this task on the executorService.*/getDisruptor().getRingBuffer().tryPublishEvent(getEventTranslator(), null);});}} }其實看完這塊代碼,我的問題就破案了。
每隔10s發(fā)起重連是來源于這個地方觸發(fā)的空事件,也是正常的。期間很有可能是服務(wù)器斷開了連接,之后發(fā)起了重連。
對上面的流程梳理下
啟動的時候:創(chuàng)建連接、定時心跳維護連接(默認(rèn)關(guān)閉)、定時監(jiān)測寫入超時(默認(rèn)100ms)
創(chuàng)建連接:看上次嘗試連接時間是否超過30s,沒超過的話,等待20s后重連,超過的話立即重連。定時10s的單個線程讀取socket的輸入流,讀取完畢后觸發(fā) Disruptor 一個空事件。
觸發(fā)事件的時候:循環(huán)5次,判斷當(dāng)前的連接狀態(tài)(線程狀態(tài)和socket狀態(tài)),關(guān)閉:調(diào)用關(guān)閉連接和創(chuàng)建連接。開啟:調(diào)用寫入方法。
寫入方法:判斷是心跳維護還是正常事件,心跳維護寫換行符,正常事件寫入事件值。如果是批量終結(jié),調(diào)用 flush ,刷新流。
解決方案
雖然問題找到了,由于猜測是服務(wù)端釋放連接導(dǎo)致的這個問題,所以沒什么好的解決方案,粗暴一點,直接改了 logback-encoder 的日志級別,改為ERROR,看不到WARN日志了,有點騙自己的意思...
當(dāng)我寫完整個的時候發(fā)現(xiàn)了真正的問題所在...?logstash-logback-encoder 版本問題,改成5.3 可以了。
?
?
?
?
?
總結(jié)
以上是生活随笔為你收集整理的logback AbstractLogstashTcpSocketAppender 源码解析的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Finereport集群配置
- 下一篇: php 数据库类