聊聊ExecutorService的监控
為什么80%的碼農(nóng)都做不了架構(gòu)師?>>> ??
序
本文主要研究一下ExecutorService的監(jiān)控
InstrumentedExecutorService
metrics-core-4.0.2-sources.jar!/com/codahale/metrics/InstrumentedExecutorService.java
/*** An {@link ExecutorService} that monitors the number of tasks submitted, running,* completed and also keeps a {@link Timer} for the task duration.* <p/>* It will register the metrics using the given (or auto-generated) name as classifier, e.g:* "your-executor-service.submitted", "your-executor-service.running", etc.*/ public class InstrumentedExecutorService implements ExecutorService {private static final AtomicLong NAME_COUNTER = new AtomicLong();private final ExecutorService delegate;private final Meter submitted;private final Counter running;private final Meter completed;private final Timer idle;private final Timer duration;/*** Wraps an {@link ExecutorService} uses an auto-generated default name.** @param delegate {@link ExecutorService} to wrap.* @param registry {@link MetricRegistry} that will contain the metrics.*/public InstrumentedExecutorService(ExecutorService delegate, MetricRegistry registry) {this(delegate, registry, "instrumented-delegate-" + NAME_COUNTER.incrementAndGet());}/*** Wraps an {@link ExecutorService} with an explicit name.** @param delegate {@link ExecutorService} to wrap.* @param registry {@link MetricRegistry} that will contain the metrics.* @param name name for this executor service.*/public InstrumentedExecutorService(ExecutorService delegate, MetricRegistry registry, String name) {this.delegate = delegate;this.submitted = registry.meter(MetricRegistry.name(name, "submitted"));this.running = registry.counter(MetricRegistry.name(name, "running"));this.completed = registry.meter(MetricRegistry.name(name, "completed"));this.idle = registry.timer(MetricRegistry.name(name, "idle"));this.duration = registry.timer(MetricRegistry.name(name, "duration"));}@Overridepublic void execute(Runnable runnable) {submitted.mark();delegate.execute(new InstrumentedRunnable(runnable));}@Overridepublic Future<?> submit(Runnable runnable) {submitted.mark();return delegate.submit(new InstrumentedRunnable(runnable));}@Overridepublic <T> Future<T> submit(Runnable runnable, T result) {submitted.mark();return delegate.submit(new InstrumentedRunnable(runnable), result);}@Overridepublic <T> Future<T> submit(Callable<T> task) {submitted.mark();return delegate.submit(new InstrumentedCallable<>(task));}@Overridepublic <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {submitted.mark(tasks.size());Collection<? extends Callable<T>> instrumented = instrument(tasks);return delegate.invokeAll(instrumented);}@Overridepublic <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {submitted.mark(tasks.size());Collection<? extends Callable<T>> instrumented = instrument(tasks);return delegate.invokeAll(instrumented, timeout, unit);}@Overridepublic <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws ExecutionException, InterruptedException {submitted.mark(tasks.size());Collection<? extends Callable<T>> instrumented = instrument(tasks);return delegate.invokeAny(instrumented);}@Overridepublic <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws ExecutionException, InterruptedException, TimeoutException {submitted.mark(tasks.size());Collection<? extends Callable<T>> instrumented = instrument(tasks);return delegate.invokeAny(instrumented, timeout, unit);}private <T> Collection<? extends Callable<T>> instrument(Collection<? extends Callable<T>> tasks) {final List<InstrumentedCallable<T>> instrumented = new ArrayList<>(tasks.size());for (Callable<T> task : tasks) {instrumented.add(new InstrumentedCallable<>(task));}return instrumented;}@Overridepublic void shutdown() {delegate.shutdown();}@Overridepublic List<Runnable> shutdownNow() {return delegate.shutdownNow();}@Overridepublic boolean isShutdown() {return delegate.isShutdown();}@Overridepublic boolean isTerminated() {return delegate.isTerminated();}@Overridepublic boolean awaitTermination(long l, TimeUnit timeUnit) throws InterruptedException {return delegate.awaitTermination(l, timeUnit);}//...... }- InstrumentedExecutorService實(shí)現(xiàn)了ExecutorService,對jdk原始的ExecutorService進(jìn)行了包裝,對相應(yīng)的方法織入指標(biāo)統(tǒng)計(jì)
- 主要統(tǒng)計(jì)了已提交的任務(wù)submitted(Meter),運(yùn)行中的任務(wù)running(Counter),完成的任務(wù)completed(Meter),空閑時(shí)長idle(Timer),運(yùn)行時(shí)長duration(Timer)
- 為了統(tǒng)計(jì)后面幾個(gè)指標(biāo),需要對Runnable以及Callable進(jìn)行織入,因而引入了InstrumentedRunnable、InstrumentedCallable
InstrumentedRunnable
private class InstrumentedRunnable implements Runnable {private final Runnable task;private final Timer.Context idleContext;InstrumentedRunnable(Runnable task) {this.task = task;this.idleContext = idle.time();}@Overridepublic void run() {idleContext.stop();running.inc();final Timer.Context durationContext = duration.time();try {task.run();} finally {durationContext.stop();running.dec();completed.mark();}}}- 織入了對idle、duration、running、completed的統(tǒng)計(jì)
InstrumentedCallable
private class InstrumentedCallable<T> implements Callable<T> {private final Callable<T> callable;InstrumentedCallable(Callable<T> callable) {this.callable = callable;}@Overridepublic T call() throws Exception {running.inc();final Timer.Context context = duration.time();try {return callable.call();} finally {context.stop();running.dec();completed.mark();}}}- 織入了對duration、running、completed的統(tǒng)計(jì)
ExecutorServiceMetrics
micrometer-core-1.0.3-sources.jar!/io/micrometer/core/instrument/binder/jvm/ExecutorServiceMetrics.java
/*** Monitors the status of executor service pools. Does not record timings on operations executed in the {@link ExecutorService},* as this requires the instance to be wrapped. Timings are provided separately by wrapping the executor service* with {@link TimedExecutorService}.** @author Jon Schneider* @author Clint Checketts*/ @NonNullApi @NonNullFields public class ExecutorServiceMetrics implements MeterBinder {@Nullableprivate final ExecutorService executorService;private final Iterable<Tag> tags;public ExecutorServiceMetrics(@Nullable ExecutorService executorService, String executorServiceName, Iterable<Tag> tags) {this.executorService = executorService;this.tags = Tags.concat(tags, "name", executorServiceName);}//....../*** Record metrics on the use of an {@link Executor}.** @param registry The registry to bind metrics to.* @param executor The executor to instrument.* @param executorName Will be used to tag metrics with "name".* @param tags Tags to apply to all recorded metrics.* @return The instrumented executor, proxied.*/public static Executor monitor(MeterRegistry registry, Executor executor, String executorName, Iterable<Tag> tags) {if (executor instanceof ExecutorService) {return monitor(registry, (ExecutorService) executor, executorName, tags);}return new TimedExecutor(registry, executor, executorName, tags);}/*** Record metrics on the use of an {@link ExecutorService}.** @param registry The registry to bind metrics to.* @param executor The executor to instrument.* @param executorServiceName Will be used to tag metrics with "name".* @param tags Tags to apply to all recorded metrics.* @return The instrumented executor, proxied.*/public static ExecutorService monitor(MeterRegistry registry, ExecutorService executor, String executorServiceName, Iterable<Tag> tags) {new ExecutorServiceMetrics(executor, executorServiceName, tags).bindTo(registry);return new TimedExecutorService(registry, executor, executorServiceName, tags);}@Overridepublic void bindTo(MeterRegistry registry) {if (executorService == null) {return;}String className = executorService.getClass().getName();if (executorService instanceof ThreadPoolExecutor) {monitor(registry, (ThreadPoolExecutor) executorService);} else if (className.equals("java.util.concurrent.Executors$DelegatedScheduledExecutorService")) {monitor(registry, unwrapThreadPoolExecutor(executorService, executorService.getClass()));} else if (className.equals("java.util.concurrent.Executors$FinalizableDelegatedExecutorService")) {monitor(registry, unwrapThreadPoolExecutor(executorService, executorService.getClass().getSuperclass()));} else if (executorService instanceof ForkJoinPool) {monitor(registry, (ForkJoinPool) executorService);}}private void monitor(MeterRegistry registry, @Nullable ThreadPoolExecutor tp) {if (tp == null) {return;}FunctionCounter.builder("executor.completed", tp, ThreadPoolExecutor::getCompletedTaskCount).tags(tags).description("The approximate total number of tasks that have completed execution").baseUnit("tasks").register(registry);Gauge.builder("executor.active", tp, ThreadPoolExecutor::getActiveCount).tags(tags).description("The approximate number of threads that are actively executing tasks").baseUnit("threads").register(registry);Gauge.builder("executor.queued", tp, tpRef -> tpRef.getQueue().size()).tags(tags).description("The approximate number of threads that are queued for execution").baseUnit("threads").register(registry);Gauge.builder("executor.pool.size", tp, ThreadPoolExecutor::getPoolSize).tags(tags).description("The current number of threads in the pool").baseUnit("threads").register(registry);}//...... }- ExecutorServiceMetrics實(shí)現(xiàn)了MeterBinder接口,另外提供了靜態(tài)方法來創(chuàng)建帶有監(jiān)控指標(biāo)的ExecutorService,該靜態(tài)方法命名為monitor,非常形象
- monitor方法首先創(chuàng)建ExecutorServiceMetrics,并bindTo了MeterRegistry,然后返回TimedExecutorService
- bindTo方法上報(bào)了executor.completed(FunctionCounter),executor.active(Gauge),executor.queued(Gauge),executor.pool.size(Gauge)這幾個(gè)指標(biāo)
TimedExecutorService
micrometer-core-1.0.3-sources.jar!/io/micrometer/core/instrument/internal/TimedExecutorService.java
/*** An {@link java.util.concurrent.ExecutorService} that is timed** @author Jon Schneider*/ public class TimedExecutorService implements ExecutorService {private final ExecutorService delegate;private final Timer timer;public TimedExecutorService(MeterRegistry registry, ExecutorService delegate, String executorServiceName, Iterable<Tag> tags) {this.delegate = delegate;this.timer = registry.timer("executor", Tags.concat(tags ,"name", executorServiceName));}@Overridepublic void shutdown() {delegate.shutdown();}@Overridepublic List<Runnable> shutdownNow() {return delegate.shutdownNow();}@Overridepublic boolean isShutdown() {return delegate.isShutdown();}@Overridepublic boolean isTerminated() {return delegate.isTerminated();}@Overridepublic boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {return delegate.awaitTermination(timeout, unit);}@Overridepublic <T> Future<T> submit(Callable<T> task) {return delegate.submit(timer.wrap(task));}@Overridepublic <T> Future<T> submit(Runnable task, T result) {return delegate.submit(() -> timer.record(task), result);}@Overridepublic Future<?> submit(Runnable task) {return delegate.submit(() -> timer.record(task));}@Overridepublic <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {return delegate.invokeAll(wrapAll(tasks));}@Overridepublic <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {return delegate.invokeAll(wrapAll(tasks), timeout, unit);}@Overridepublic <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {return delegate.invokeAny(wrapAll(tasks));}@Overridepublic <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {return delegate.invokeAny(wrapAll(tasks), timeout, unit);}@Overridepublic void execute(Runnable command) {delegate.execute(timer.wrap(command));}private <T> Collection<? extends Callable<T>> wrapAll(Collection<? extends Callable<T>> tasks) {return tasks.stream().map(timer::wrap).collect(toList());} }- 對ExecutorService進(jìn)行包裝,增加了
Timer.record
micrometer-core-1.0.3-sources.jar!/io/micrometer/core/instrument/Timer.java
/*** Executes the runnable `f` and records the time taken.** @param f Function to execute and measure the execution time.*/void record(Runnable f);/*** Wrap a {@link Runnable} so that it is timed when invoked.** @param f The Runnable to time when it is invoked.* @return The wrapped Runnable.*/default Runnable wrap(Runnable f) {return () -> record(f);}/*** Wrap a {@link Callable} so that it is timed when invoked.** @param f The Callable to time when it is invoked.* @param <T> The return type of the callable.* @return The wrapped callable.*/default <T> Callable<T> wrap(Callable<T> f) {return () -> recordCallable(f);}- warp方法主要是包裝調(diào)用record方法,而record由實(shí)現(xiàn)類去實(shí)現(xiàn)
AbstractTimer
micrometer-core-1.0.3-sources.jar!/io/micrometer/core/instrument/AbstractTimer.java
@Overridepublic void record(Runnable f) {final long s = clock.monotonicTime();try {f.run();} finally {final long e = clock.monotonicTime();record(e - s, TimeUnit.NANOSECONDS);}}@Overridepublic final void record(long amount, TimeUnit unit) {if (amount >= 0) {histogram.recordLong(TimeUnit.NANOSECONDS.convert(amount, unit));recordNonNegative(amount, unit);if (intervalEstimator != null) {intervalEstimator.recordInterval(clock.monotonicTime());}}}- record采用histogram進(jìn)行統(tǒng)計(jì)
小結(jié)
dropwizard及micrometer均提供了對ExecutorService的指標(biāo)統(tǒng)計(jì)的包裝,micrometer則更近一步提供了靜態(tài)方法來直接創(chuàng)建,非常方便。
doc
- InstrumentedExecutorService
- ExecutorServiceMetrics
轉(zhuǎn)載于:https://my.oschina.net/go4it/blog/2223506
總結(jié)
以上是生活随笔為你收集整理的聊聊ExecutorService的监控的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: spring-boot-admin 2.
- 下一篇: 【数学】【CF27E】 Number W