如何用java线程池做分批次查询处理 java线程池ThreadPoolExecutor的使用
需求是在一個大數(shù)據(jù)量的表中按條件查詢出數(shù)據(jù)后做相應(yīng)的業(yè)務(wù)。我是使用的java線程池ThreadPoolExecutor,實現(xiàn)分批次去查詢,查詢到數(shù)據(jù)后,又分多個線程去做業(yè)務(wù)。
線程池類為 java.util.concurrent.ThreadPoolExecutor,常用構(gòu)造方法為:ThreadPoolExecutor(int corePoolSize,? int maximumPoolSize, ? long keepAliveTime,?? TimeUnit unit,?? BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler)
corePoolSize: 線程池維護線程的最少數(shù)量
maximumPoolSize:線程池維護線程的最大數(shù)量
keepAliveTime: 線程池維護線程所允許的空閑時間
unit: 線程池維護線程所允許的空閑時間的單位
workQueue: 線程池所使用的緩沖隊列
handler: 線程池對拒絕任務(wù)的處理策略
一個任務(wù)通過 execute(Runnable)方法被添加到線程池,任務(wù)就是一個 Runnable類型的對象,任務(wù)的執(zhí)行方法就是Runnable類型對象的run()方法。
線程池剛創(chuàng)建時,里面沒有一個線程。任務(wù)隊列是作為參數(shù)傳進來的。不過,就算隊列里面有任務(wù),線程池也不會馬上執(zhí)行它們。當調(diào)用 execute() 方法添加一個任務(wù)時,線程池會做如下判斷:
???????? a. 如果正在運行的線程數(shù)量小于 corePoolSize,那么馬上創(chuàng)建線程運行這個任務(wù);
b. 如果正在運行的線程數(shù)量大于或等于 corePoolSize,那么將這個任務(wù)放入隊列。
c. 如果這時候隊列滿了,而且正在運行的線程數(shù)量小于 maximumPoolSize,那么還是要創(chuàng)建線程運行這個任務(wù);
d. 如果隊列滿了,而且正在運行的線程數(shù)量大于或等于 maximumPoolSize,那么線程池會拋出異常,告訴調(diào)用者“我不能再接受任務(wù)了”。
當一個線程完成任務(wù)時,它會從隊列中取下一個任務(wù)來執(zhí)行。
當一個線程無事可做,超過一定的時間(keepAliveTime)時,線程池會判斷,如果當前運行的線程數(shù)大于 corePoolSize,那么這個線程就被停掉。所以線程池的所有任務(wù)完成后,它最終會收縮到 corePoolSize 的大小。
這樣的過程說明,并不是先加入任務(wù)就一定會先執(zhí)行。假設(shè)隊列大小為 10,corePoolSize 為 3,maximumPoolSize 為 6,那么當加入 20 個任務(wù)時,執(zhí)行的順序就是這樣的:首先執(zhí)行任務(wù) 1、2、3,然后任務(wù) 4~13 被放入隊列。這時候隊列滿了,任務(wù) 14、15、16 會被馬上執(zhí)行,而任務(wù) 17~20 則會拋出異常。最終順序是:1、2、3、14、15、16、4、5、6、7、8、9、10、11、12、13。
下面來看具體的代碼(代碼中會有部分代碼以+++表示不方便各位查看的):
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(4, 10, 3, TimeUnit.SECONDS,
??????????????? new LinkedBlockingQueue<Runnable>(100), new ThreadPoolExecutor.CallerRunsPolicy());
??????? int pageSize = 1000; // 每次查詢交易條數(shù)
??????? int handleSize = 200; // 線程一次性處理的交易條數(shù)
??????? int handleCount = 0;
??????? int transCount = +++mapper.getTransCount(batchDate, +);//根據(jù)條件去查詢需要做業(yè)務(wù)的數(shù)據(jù)條數(shù),查詢條數(shù)的sql語句快
??????? logger.info(MessageFormat.format("[+++日期:[{0}], 待+++記錄條數(shù)為:[{1}]", batchDate, transCount));//MessageFormat.format是日志的一個方法,推薦大家這么使用
??????? List<+++> tranList = null;
??????? while (handleCount < transCount)
??????? {
??????????? tranList = +++mapper.getTransList(batchDate, null, handleCount, pageSize);//int offset, int limit);
??????????? if (tranList == null || tranList.size() == 0)
??????????? {
??????????????? logger.info(MessageFormat.format(++++
??????????????? handleCount += pageSize;
??????????????? continue;
??????????? }
??????????? int splitCount = (tranList.size() / handleSize) + (tranList.size() % handleSize == 0 ? 0 : 1);
??????????? CountDownLatch latch = new CountDownLatch(splitCount);
??????????? for (int i = 0; i < splitCount; i++)
??????????? {
??????????????? int toIndex = (i + 1) * handleSize;
??????????????? if (i == splitCount - 1)
??????????????? {
??????????????????? toIndex = tranList.size();
??????????????? }
??????????????? List<NpcTransaction> subList = tranList.subList(i * handleSize, toIndex);
??????????????? threadPool.execute(new +++Thread(+++Manager, subList, batchDate, latch));//塞入到線程池,執(zhí)行的方法是+++Thread類中的run方法
??????????? }
??????????? handleCount += pageSize;
??????????? try
??????????? {
??????????????? latch.await(5, TimeUnit.MINUTES);
??????????? }
??????????? catch (InterruptedException e)
??????????? {
??????????????? logger.error(getClass().getName() + " doTask fail.", e);
??????????? }
??????? }
下面是threadPool.execute(new +++Thread...中的thread類
public class +++Thread implements Runnable
{
??? private Logger logger =
??? private List<NpcTransaction> trans;
??? private CountDownLatch latch;
??? private String batchDate;
??? private +++Manager
??? public +++Thread(+++Manager +++Manager, List<+++> trans, String batchDate,
??????????? CountDownLatch latch)
??? {
??????? this.+++Manager = +++Manager;
??????? this.trans = trans;
??????? this.batchDate = batchDate;
??????? this.latch = latch;
??? }
??? /**
???? * 重載方法
???? */
??? @Override
??? public void run()
??? {
??????? int saveCount = 0;
??????? try
??????? {
??????????? saveCount = +++Manager.save+++Record(trans);//
??????? }
??????? catch (Exception e)
??????? {
??????????? logger.error(MessageFormat.format("[+++job] 跑批日期:[{0}], +++異常:[{1}]", batchDate, e.getMessage()));
??????????? e.printStackTrace();
??????? }
??????? if (saveCount != trans.size())
??????? {
??????????? logger.error(MessageFormat.format("[+++job] 跑批日期:[{0}], +++異常,++成功條數(shù):[{1}],預(yù)期條數(shù):[{2}]", batchDate,
??????????????????? saveCount, trans.size()));
??????? }
??????? latch.countDown();
??? }
??? public List<++Transaction> getTrans()
??? {
??????? return trans;
??? }
??? public void setTrans(List<++Transaction> trans)
??? {
??????? this.trans = trans;
??? }
??? /**
???? * 獲取 latch
???? *
???? * @return 返回 latch
???? */
??? public CountDownLatch getLatch()
??? {
??????? return latch;
??? }
??? /**
???? * 設(shè)置 latch
???? *
???? * @param 對latch進行賦值
???? */
??? public void setLatch(CountDownLatch latch)
??? {
??????? this.latch = latch;
??? }
??? /**
???? * 獲取 batchDate
???? *
???? * @return 返回 batchDate
???? */
??? public String getBatchDate()
??? {
??????? return batchDate;
??? }
??? /**
???? * 設(shè)置 batchDate
???? *
???? * @param 對batchDate進行賦值
???? */
??? public void setBatchDate(String batchDate)
??? {
??????? this.batchDate = batchDate;
??? }
}
都要寫get,set方法,latch.countDown();這個最好寫在finally中
關(guān)于CountDownLatch這個,我下面簡單的說一下:
CountDownLatch,一個同步輔助類,在完成一組正在其他線程中執(zhí)行的操作之前,它允許一個或多個線程一直等待。
主要方法
?public CountDownLatch(int count);
?public void countDown();
?public void await() throws InterruptedException
?構(gòu)造方法參數(shù)指定了計數(shù)的次數(shù)
?countDown方法,當前線程調(diào)用此方法,則計數(shù)減一
?awaint方法,調(diào)用此方法會一直阻塞當前線程,直到計時器的值為0
此處用計數(shù)器,因為一組1000條數(shù)據(jù)通過5個線程去執(zhí)行,這組執(zhí)行完再進行第二組以及其后組的1000條數(shù)據(jù)操作。
總結(jié)
以上是生活随笔為你收集整理的如何用java线程池做分批次查询处理 java线程池ThreadPoolExecutor的使用的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 濮阳第二届创客机器人比赛_咸阳市举行第二
- 下一篇: 镜像上传到linux失败,Docker