A Look at Elasticsearch's TimedRunnable
Preface
This article takes a look at Elasticsearch's TimedRunnable.
TimedRunnable
elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/TimedRunnable.java

    class TimedRunnable extends AbstractRunnable implements WrappedRunnable {
        private final Runnable original;
        private final long creationTimeNanos;
        private long startTimeNanos;
        private long finishTimeNanos = -1;

        TimedRunnable(final Runnable original) {
            this.original = original;
            this.creationTimeNanos = System.nanoTime();
        }

        @Override
        public void doRun() {
            try {
                startTimeNanos = System.nanoTime();
                original.run();
            } finally {
                finishTimeNanos = System.nanoTime();
            }
        }

        @Override
        public void onRejection(final Exception e) {
            if (original instanceof AbstractRunnable) {
                ((AbstractRunnable) original).onRejection(e);
            }
        }

        @Override
        public void onAfter() {
            if (original instanceof AbstractRunnable) {
                ((AbstractRunnable) original).onAfter();
            }
        }

        @Override
        public void onFailure(final Exception e) {
            if (original instanceof AbstractRunnable) {
                ((AbstractRunnable) original).onFailure(e);
            }
        }

        @Override
        public boolean isForceExecution() {
            return original instanceof AbstractRunnable && ((AbstractRunnable) original).isForceExecution();
        }

        /**
         * Return the time since this task was created until it finished running.
         * If the task is still running or has not yet been run, returns -1.
         */
        long getTotalNanos() {
            if (finishTimeNanos == -1) {
                // There must have been an exception thrown, the total time is unknown (-1)
                return -1;
            }
            return Math.max(finishTimeNanos - creationTimeNanos, 1);
        }

        /**
         * Return the time this task spent being run.
         * If the task is still running or has not yet been run, returns -1.
         */
        long getTotalExecutionNanos() {
            if (startTimeNanos == -1 || finishTimeNanos == -1) {
                // There must have been an exception thrown, the total time is unknown (-1)
                return -1;
            }
            return Math.max(finishTimeNanos - startTimeNanos, 1);
        }

        @Override
        public Runnable unwrap() {
            return original;
        }
    }

- TimedRunnable extends AbstractRunnable and implements the WrappedRunnable interface. Its doRun method records startTimeNanos just before running the original Runnable and sets finishTimeNanos in a finally block, so the finish time is recorded even if the task throws. getTotalExecutionNanos returns the task's execution time (finish minus start), while getTotalNanos measures from creation to finish and therefore also includes the time the task spent waiting in the queue.
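To make the timing logic easier to try outside of Elasticsearch, here is a minimal sketch of the same wrapper idea built on plain java.lang.Runnable. The class name SimpleTimedRunnable is hypothetical and not part of Elasticsearch, and the AbstractRunnable callbacks (onRejection, onAfter, onFailure) are left out. One deliberate difference, which is an assumption of this sketch rather than the quoted source: startTimeNanos is initialized to -1 here so that the "not yet started" check can actually fire.

```java
/**
 * Hypothetical stand-in for TimedRunnable with no Elasticsearch dependencies.
 * It only illustrates the three timestamps: creation, start, and finish.
 */
final class SimpleTimedRunnable implements Runnable {
    private final Runnable original;
    private final long creationTimeNanos;
    // Assumption of this sketch: -1 means "not started yet".
    private long startTimeNanos = -1;
    private long finishTimeNanos = -1;

    SimpleTimedRunnable(Runnable original) {
        this.original = original;
        this.creationTimeNanos = System.nanoTime();
    }

    @Override
    public void run() {
        try {
            startTimeNanos = System.nanoTime();
            original.run();
        } finally {
            // Recorded even when original.run() throws.
            finishTimeNanos = System.nanoTime();
        }
    }

    /** Creation to finish (queue wait plus execution), or -1 if not finished. */
    long getTotalNanos() {
        if (finishTimeNanos == -1) {
            return -1;
        }
        return Math.max(finishTimeNanos - creationTimeNanos, 1);
    }

    /** Start to finish (execution only), or -1 if not started or not finished. */
    long getTotalExecutionNanos() {
        if (startTimeNanos == -1 || finishTimeNanos == -1) {
            return -1;
        }
        return Math.max(finishTimeNanos - startTimeNanos, 1);
    }
}
```

Wrapping a task as new SimpleTimedRunnable(task) before submitting it to an executor means the gap between creation and start approximates the queue wait, which is why the total time is never smaller than the execution time.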
Example
elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/QueueResizingEsThreadPoolExecutor.java

    public final class QueueResizingEsThreadPoolExecutor extends EsThreadPoolExecutor {

        // ......

        protected void afterExecute(Runnable r, Throwable t) {
            super.afterExecute(r, t);
            // A task has been completed, it has left the building. We should now be able to get the
            // total time as a combination of the time in the queue and time spent running the task. We
            // only want runnables that did not throw errors though, because they could be fast-failures
            // that throw off our timings, so only check when t is null.
            assert super.unwrap(r) instanceof TimedRunnable : "expected only TimedRunnables in queue";
            final TimedRunnable timedRunnable = (TimedRunnable) super.unwrap(r);
            final long taskNanos = timedRunnable.getTotalNanos();
            final long totalNanos = totalTaskNanos.addAndGet(taskNanos);

            final long taskExecutionNanos = timedRunnable.getTotalExecutionNanos();
            assert taskExecutionNanos >= 0 : "expected task to always take longer than 0 nanoseconds, got: " + taskExecutionNanos;
            executionEWMA.addValue(taskExecutionNanos);

            if (taskCount.incrementAndGet() == this.tasksPerFrame) {
                final long endTimeNs = System.nanoTime();
                final long totalRuntime = endTimeNs - this.startNs;
                // Reset the start time for all tasks. At first glance this appears to need to be
                // volatile, since we are reading from a different thread when it is set, but it
                // is protected by the taskCount memory barrier.
                // See: https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/atomic/package-summary.html
                startNs = endTimeNs;

                // Calculate the new desired queue size
                try {
                    final double lambda = calculateLambda(tasksPerFrame, Math.max(totalNanos, 1L));
                    final int desiredQueueSize = calculateL(lambda, targetedResponseTimeNanos);
                    final int oldCapacity = workQueue.capacity();

                    if (logger.isDebugEnabled()) {
                        final long avgTaskTime = totalNanos / tasksPerFrame;
                        logger.debug("[{}]: there were [{}] tasks in [{}], avg task time [{}], EWMA task execution [{}], " +
                                "[{} tasks/s], optimal queue is [{}], current capacity [{}]",
                            getName(),
                            tasksPerFrame,
                            TimeValue.timeValueNanos(totalRuntime),
                            TimeValue.timeValueNanos(avgTaskTime),
                            TimeValue.timeValueNanos((long)executionEWMA.getAverage()),
                            String.format(Locale.ROOT, "%.2f", lambda * TimeValue.timeValueSeconds(1).nanos()),
                            desiredQueueSize,
                            oldCapacity);
                    }

                    // Adjust the queue size towards the desired capacity using an adjust of
                    // QUEUE_ADJUSTMENT_AMOUNT (either up or down), keeping in mind the min and max
                    // values the queue size can have.
                    final int newCapacity =
                        workQueue.adjustCapacity(desiredQueueSize, QUEUE_ADJUSTMENT_AMOUNT, minQueueSize, maxQueueSize);
                    if (oldCapacity != newCapacity && logger.isDebugEnabled()) {
                        logger.debug("adjusted [{}] queue size by [{}], old capacity: [{}], new capacity: [{}]", getName(),
                            newCapacity > oldCapacity ? QUEUE_ADJUSTMENT_AMOUNT : -QUEUE_ADJUSTMENT_AMOUNT,
                            oldCapacity, newCapacity);
                    }
                } catch (ArithmeticException e) {
                    // There was an integer overflow, so just log about it, rather than adjust the queue size
                    logger.warn(() -> new ParameterizedMessage(
                            "failed to calculate optimal queue size for [{}] thread pool, " +
                                "total frame time [{}ns], tasks [{}], task execution time [{}ns]",
                            getName(), totalRuntime, tasksPerFrame, totalNanos),
                        e);
                } finally {
                    // Finally, decrement the task count and time back to their starting values. We
                    // do this at the end so there is no concurrent adjustments happening. We also
                    // decrement them instead of resetting them back to zero, as resetting them back
                    // to zero causes operations that came in during the adjustment to be uncounted
                    int tasks = taskCount.addAndGet(-this.tasksPerFrame);
                    assert tasks >= 0 : "tasks should never be negative, got: " + tasks;

                    if (tasks >= this.tasksPerFrame) {
                        // Start over, because we can potentially reach a "never adjusting" state,
                        //
                        // consider the following:
                        // - If the frame window is 10, and there are 10 tasks, then an adjustment will begin. (taskCount == 10)
                        // - Prior to the adjustment being done, 15 more tasks come in, the taskCount is now 25
                        // - Adjustment happens and we decrement the tasks by 10, taskCount is now 15
                        // - Since taskCount will now be incremented forever, it will never be 10 again,
                        //   so there will be no further adjustments
                        logger.debug("[{}]: too many incoming tasks while queue size adjustment occurs, resetting measurements to 0", getName());
                        totalTaskNanos.getAndSet(1);
                        taskCount.getAndSet(0);
                        startNs = System.nanoTime();
                    } else {
                        // Do a regular adjustment
                        totalTaskNanos.addAndGet(-totalNanos);
                    }
                }
            }
        }

        // ......
    }

- The afterExecute method of QueueResizingEsThreadPoolExecutor feeds timedRunnable.getTotalExecutionNanos() into executionEWMA to maintain an exponentially weighted moving average of task execution time. It also adds timedRunnable.getTotalNanos() to totalTaskNanos, which is used to calculate lambda and the desired queue size once every tasksPerFrame tasks.
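The two calculations that afterExecute relies on can be sketched in isolation. The classes below are hypothetical illustrations, not the Elasticsearch implementations: SimpleEwma assumes the textbook update rule average = alpha * value + (1 - alpha) * average, and QueueSizing assumes a Little's Law style calculation in which lambda is the task rate (tasks per nanosecond) and the desired queue size is lambda multiplied by the targeted response time. The method names calculateLambda and calculateL are borrowed from the code above, but their bodies here are assumptions.

```java
/** Hypothetical EWMA sketch; the smoothing factor alpha is chosen by the caller. */
final class SimpleEwma {
    private final double alpha;
    private double average;

    SimpleEwma(double alpha, double initialAverage) {
        if (alpha < 0 || alpha > 1) {
            throw new IllegalArgumentException("alpha must be in [0, 1]");
        }
        this.alpha = alpha;
        this.average = initialAverage;
    }

    /** Blend the new sample into the running average. */
    synchronized void addValue(double value) {
        average = alpha * value + (1 - alpha) * average;
    }

    synchronized double getAverage() {
        return average;
    }
}

/** Hypothetical queue-sizing helpers, assuming Little's Law (L = lambda * W). */
final class QueueSizing {
    private QueueSizing() {
    }

    /** Task rate: number of tasks divided by the total task time in nanoseconds. */
    static double calculateLambda(int totalNumberOfTasks, long totalFrameTaskNanos) {
        return (double) totalNumberOfTasks / totalFrameTaskNanos;
    }

    /**
     * Desired queue length for a targeted response time.
     * Math.toIntExact throws ArithmeticException on overflow, which matches
     * the exception type that afterExecute catches in the code above.
     */
    static int calculateL(double lambda, long targetedResponseTimeNanos) {
        return Math.toIntExact((long) (lambda * targetedResponseTimeNanos));
    }
}
```

A larger alpha makes the average react faster to recent samples, while a smaller alpha smooths out spikes. With an EWMA, a single slow task shifts the reported execution time only partially, which suits a statistic that is logged and used for tuning rather than for exact accounting.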
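Summary
TimedRunnable extends AbstractRunnable and implements the WrappedRunnable interface. In its doRun method it records startTimeNanos and finishTimeNanos around the original Runnable, and it provides getTotalExecutionNanos to return the task's execution time.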
doc
- TimedRunnable
Reposted from: https://juejin.im/post/5cffb26c5188252c023faa3b