线程池优化之充分利用线程池资源
一、前言
最近做了電子發(fā)票的需求,分省開票接口和發(fā)票下載接口都有一定的延遲。為了完成開票后自動將發(fā)票插入用戶微信卡包,目前的解決方案是利用線程池,將開票后插入卡包的任務(wù)(輪詢分省發(fā)票接口,直到獲取到發(fā)票相關(guān)信息或者輪詢次數(shù)用完,如果獲取到發(fā)票信息,執(zhí)行發(fā)票插入微信卡包,結(jié)束任務(wù))放入線程池異步執(zhí)行。仔細(xì)想一想,這種實(shí)現(xiàn)方案存在一個問題,線程池沒有充分的利用。為什么沒有充分的利用?下面詳細(xì)的分析。
二、異步線程池和異步任務(wù)包裝
AsyncConfigurerSupport可以幫我們指定異步任務(wù)(注有@Async注解)對應(yīng)的線程池。
@Configuration public class MyAsyncConfigurer extends AsyncConfigurerSupport {private static Logger LOGGER = LoggerFactory.getLogger(MyAsyncConfigurer.class);@Overridepublic Executor getAsyncExecutor() {ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();taskExecutor.setCorePoolSize(2);taskExecutor.setMaxPoolSize(4);taskExecutor.setQueueCapacity(10);taskExecutor.setRejectedExecutionHandler((runnable, executor) -> LOGGER.error("異步線程池拒絕任務(wù)..." + runnable));taskExecutor.setThreadFactory(new MyAsyncThreadFactory());taskExecutor.initialize();return taskExecutor;}static class MyAsyncThreadFactory implements ThreadFactory {private static final AtomicInteger poolNumber = new AtomicInteger(1);private final ThreadGroup group;private final AtomicInteger threadNumber = new AtomicInteger(1);private final String namePrefix;MyAsyncThreadFactory() {SecurityManager s = System.getSecurityManager();group = (s != null) ? s.getThreadGroup() :Thread.currentThread().getThreadGroup();namePrefix = "myasync-pool-" +poolNumber.getAndIncrement() +"-thread-";}@Overridepublic Thread newThread(Runnable r) {Thread t = new Thread(group, r,namePrefix + threadNumber.getAndIncrement(),0);if (t.isDaemon())t.setDaemon(false);if (t.getPriority() != Thread.NORM_PRIORITY)t.setPriority(Thread.NORM_PRIORITY);return t;}} }異步任務(wù)包裝,除了異步,還加入了retry功能,實(shí)現(xiàn)指定次數(shù)的接口輪詢。
@Component public class AsyncWrapped {protected static Logger LOGGER = LoggerFactory.getLogger(AsyncWrapped.class);@Asyncpublic void asyncProcess(Runnable runnable, Callback callback, Retry retry) {try {if (retry == null) {retry = new Retry(1);}retry.execute(ctx -> {runnable.run();return null;}, ctx -> {if (callback != null) {callback.call();}return null;});} catch (Exception e) {LOGGER.error("異步調(diào)用異常...", e);}} }業(yè)務(wù)代碼大致邏輯如下。
asyncWrapped.asyncProcess(() -> {//調(diào)用分省接口獲取發(fā)票信息//如果發(fā)票信息異常,拋出異常(進(jìn)入下次重試)//否則,插入用戶微信卡包}, () -> {//輪詢次數(shù)用盡,用戶插入卡包失敗 }, new Retry(2, 1000) );這里說一下為什么線程池沒有充分的利用。異步任務(wù)中包含輪詢操作,輪詢有一定的時間間隔,導(dǎo)致在這段時間間隔內(nèi),線程一直處于被閑置的狀態(tài)。所以為了能更好的利用線程池資源,我們得想辦法解決時間間隔的問題。假如有個延遲隊(duì)列,隊(duì)列里放著我們的異步任務(wù)(不包含重試機(jī)制),然后延遲(輪詢的時間間隔)一定時間之后,將任務(wù)放入線程池中執(zhí)行,任務(wù)執(zhí)行完畢之后根據(jù)是否需要再次執(zhí)行決定是否再次放入到延遲隊(duì)列去,這樣每個線程池中的線程都不會閑著,達(dá)到了充分利用的目的。
三、定時任務(wù)線程池和實(shí)現(xiàn)輪詢機(jī)制
@EnableScheduling 幫助開啟@Scheduled注解解析。注冊一個名字是ScheduledAnnotationBeanPostProcessor.DEFAULT_TASK_SCHEDULER_BEAN_NAME的定時任務(wù)線程池。
@Configuration @EnableScheduling @Role(BeanDefinition.ROLE_INFRASTRUCTURE) public class TaskConfiguration {@Bean(name = ScheduledAnnotationBeanPostProcessor.DEFAULT_TASK_SCHEDULER_BEAN_NAME)@Role(BeanDefinition.ROLE_INFRASTRUCTURE)public ScheduledExecutorService scheduledAnnotationProcessor() {return Executors.newScheduledThreadPool(5, new DefaultThreadFactory());}private static class DefaultThreadFactory implements ThreadFactory {private static final AtomicInteger poolNumber = new AtomicInteger(1);private final ThreadGroup group;private final AtomicInteger threadNumber = new AtomicInteger(1);private final String namePrefix;DefaultThreadFactory() {SecurityManager s = System.getSecurityManager();group = (s != null) ? s.getThreadGroup() :Thread.currentThread().getThreadGroup();namePrefix = "pool-" +poolNumber.getAndIncrement() +"-schedule-";}@Overridepublic Thread newThread(Runnable r) {Thread t = new Thread(group, r,namePrefix + threadNumber.getAndIncrement(),0);if (t.isDaemon()) {t.setDaemon(false);}if (t.getPriority() != Thread.NORM_PRIORITY) {t.setPriority(Thread.NORM_PRIORITY);}return t;}} }? 實(shí)現(xiàn)輪詢?nèi)蝿?wù),實(shí)現(xiàn)接口SchedulingConfigurer,獲取ScheduledTaskRegistrar 并指定定時任務(wù)線程池。
@Override public void configureTasks(ScheduledTaskRegistrar registrar) {this.registrar = registrar;this.registrar.setScheduler(this.applicationContext.getBean(DEFAULT_TASK_SCHEDULER_BEAN_NAME, ScheduledExecutorService.class));scheduledTaskRegistrarHelper = new ScheduledTaskRegistrarHelper(); }scheduledFutures提交定時任務(wù)時返回結(jié)果集,periodTasks 定時任務(wù)結(jié)果集。
private static final ConcurrentHashMap<String, ScheduledFuture<?>> scheduledFutures = new ConcurrentHashMap<>(); private static final ConcurrentHashMap<String, TimingTask> periodTasks = new ConcurrentHashMap<>();定時任務(wù)包裝類,包含任務(wù)的執(zhí)行次數(shù)(重試次數(shù))、重試間隔、具體任務(wù)、重試次數(shù)用盡之后的回調(diào)等,以及自動結(jié)束定時任務(wù)、重試計數(shù)重置功能。
private static class TimingTask {//重試次數(shù)private Integer retry;//任務(wù)標(biāo)識private String taskId;//重試間隔private Long period;//具體任務(wù)private ScheduledRunnable task;//結(jié)束回調(diào)private ScheduledCallback callback;//重試計數(shù)private AtomicInteger count = new AtomicInteger(0);//父線程MDCprivate Map<String, String> curContext;public TimingTask(Integer retry, String taskId, Long period, ScheduledRunnable task, ScheduledCallback callback) {this.retry = retry;this.taskId = taskId;this.period = period;this.task = task;this.callback = callback;this.curContext = MDC.getCopyOfContextMap();}public Long getPeriod() {return period;}public void setPeriod(Long period) {this.period = period;}public String getTaskId() {return taskId;}public void setTaskId(String taskId) {this.taskId = taskId;}public Integer getRetry() {return retry;}public void setRetry(Integer retry) {this.retry = retry;}public AtomicInteger getCount() {return count;}public boolean reset() {for (int cnt = this.count.intValue(); cnt < this.retry; cnt = this.count.intValue()) {if (this.count.compareAndSet(cnt, 0)) {return true;}}return false;}public void process() {Map<String, String> preContext = MDC.getCopyOfContextMap();try {if (this.curContext == null) {MDC.clear();} else {// 將父線程的MDC內(nèi)容傳給子線程MDC.setContextMap(this.curContext);}this.task.run();exitTask(false);} catch (Exception e) {LOGGER.error("定時任務(wù)異常..." + this, e);if (count.incrementAndGet() >= this.retry) {exitTask(true);}} finally {if (preContext == null) {MDC.clear();} else {MDC.setContextMap(preContext);}}}//定時任務(wù)退出private void exitTask(boolean execCallback) {scheduledFutures.get(this.taskId).cancel(false);scheduledFutures.remove(this.getTaskId());periodTasks.remove(this.getTaskId());LOGGER.info("結(jié)束定時任務(wù): " + this);if (execCallback && callback != null) {callback.call();}}@Overridepublic String toString() {return ReflectionToStringBuilder.toString(this, ToStringStyle.JSON_STYLE, false, false, TimingTask.class);} }注意上面定時任務(wù)是如何退出的,是在某一次任務(wù)執(zhí)行成功之后(沒有異常拋出)或者定時任務(wù)執(zhí)行次數(shù)用盡才退出的。直接調(diào)用ScheduledFuture的cancel方法可以退出定時任務(wù)。還有就是定時任務(wù)中的日志需要父線程中的日志變量,所以需要對MDC進(jìn)行一下處理。
@Scope("prototype") @Bean public AspectTimingTask aspectTimingTask() {return new AspectTimingTask(); }@Aspect @Component public static class ScheduledAspect {@Around("target(AspectTimingTask)")public Object executeScheduledWrapped(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {if (proceedingJoinPoint instanceof MethodInvocationProceedingJoinPoint) {MethodInvocationProceedingJoinPoint methodJoinPoint = (MethodInvocationProceedingJoinPoint) proceedingJoinPoint;Method method = ((MethodSignature) methodJoinPoint.getSignature()).getMethod();if (AnnotatedElementUtils.isAnnotated(method, ScheduledTask.class)) {LOGGER.info("電子發(fā)票定時任務(wù)日志同步...");//其他處理 }}return proceedingJoinPoint.proceed();} }public static class AspectTimingTask implements Runnable {private TimingTask timingTask;@Override@ScheduledTaskpublic void run() {timingTask.process();}public void setTimingTask(TimingTask timingTask) {this.timingTask = timingTask;} }AspectTimingTask 是對TimingTask 的包裝類,實(shí)現(xiàn)了Runnable接口。主要是為了對run接口做一層切面,獲取ProceedingJoinPoint 實(shí)例(公司中的日志調(diào)用鏈系統(tǒng)需要這個參數(shù))。AspectTimingTask 的bean實(shí)例的scope是prototype,這個注意下。
public static void register(Integer retry, Long period, String taskId, ScheduledRunnable task, ScheduledCallback callback) {scheduledTaskRegistrarHelper.register(retry, taskId, period, task, callback); }private class ScheduledTaskRegistrarHelper {public void register(Integer retry, String taskId, Long period, ScheduledRunnable task, ScheduledCallback callback) {//是否可以重置定時任務(wù)TimingTask preTask = periodTasks.get(taskId);if (null != preTask&& preTask.reset()&& existTask(taskId)) {return;}TimingTask curTask = new TimingTask(retry, taskId, period, task, callback);AspectTimingTask aspectTimingTask = applicationContext.getBean(AspectTimingTask.class);aspectTimingTask.setTimingTask(curTask);ScheduledFuture<?> scheduledFuture = registrar.getScheduler().scheduleAtFixedRate(aspectTimingTask, period);scheduledFutures.put(taskId, scheduledFuture);periodTasks.put(taskId, curTask);LOGGER.info("注冊定時任務(wù): " + curTask);}private boolean existTask(String taskId) {return scheduledFutures.containsKey(taskId) && periodTasks.containsKey(taskId);} }如果taskId的定時任務(wù)已經(jīng)存在則重置定時任務(wù),否則注冊新的定時任務(wù)。AspectTimingTask 實(shí)例通過ApplicationContext獲取,每次獲取都是一個新的實(shí)例。
由 異步輪詢?nèi)蝿?wù) 優(yōu)化成 定時任務(wù),充分利用了線程池。修改之后的業(yè)務(wù)代碼如下。
ScheduledTaskRegistrarHelper.register(10, 5*1000L, "taskId", () -> {//調(diào)用分省接口獲取發(fā)票信息//如果發(fā)票信息異常,拋出異常(進(jìn)入下次重試)//否則,插入用戶微信卡包 }() -> {//輪詢次數(shù)用盡,用戶插入卡包失敗 } );針對電子發(fā)票插入微信卡包定時任務(wù),重試執(zhí)行次數(shù)10次,每隔5秒執(zhí)行一次。任務(wù)完成之后結(jié)束定時任務(wù),執(zhí)行次數(shù)用盡之后觸發(fā)插入卡包失敗動作。
四、參考
? ? ?Spring異步調(diào)用原理及SpringAop攔截器鏈原理
? ? ?Springboot定時任務(wù)原理及如何動態(tài)創(chuàng)建定時任務(wù)
轉(zhuǎn)載于:https://www.cnblogs.com/hujunzheng/p/10660479.html
總結(jié)
以上是生活随笔為你收集整理的线程池优化之充分利用线程池资源的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Springboot源码——应用程序上下
- 下一篇: 借呗关闭了还能开通吗