Java队列集合的性能测试
同時開10個線程存入和取出100萬的數據,結論如下:
DoubleBufferedQueue < ConcurrentLinkedQueue < ArrayBlockingQueue < LinkedBlockingQueue
執行結果如下:
100萬 DoubleBufferedQueue入隊時間:9510 出隊時間:10771
100萬 DoubleBufferedQueue入隊時間:8169 出隊時間:9789
1000萬 DoubleBufferedQueue入隊時間:98285 出隊時間:101088
1000萬 DoubleBufferedQueue入隊時間:101859 出隊時間:105964
100萬 ConcurrentLinkedQueue入隊時間:10557 出隊時間:13716
100萬 ConcurrentLinkedQueue入隊時間:25298 出隊時間:25332
1000萬 ConcurrentLinkedQueue隊列時間:121868 出隊時間:136116
1000萬 ConcurrentLinkedQueue隊列時間:134306 出隊時間:147893
100萬 ArrayBlockingQueue入隊時間:21080 出隊時間:22025
100萬 ArrayBlockingQueue入隊時間:17689 出隊時間:19654
1000萬 ArrayBlockingQueue入隊時間:194400 出隊時間:205968
1000萬 ArrayBlockingQueue入隊時間:192268 出隊時間:197982
100萬 LinkedBlockingQueue入隊時間:38236 出隊時間:52555
100萬 LinkedBlockingQueue入隊時間:30646 出隊時間:38573
1000萬 LinkedBlockingQueue入隊時間:375669 出隊時間:391976
1000萬 LinkedBlockingQueue入隊時間:701363 出隊時間:711217
?
doubleBufferedQueue:
package test.MoreThread.d;import java.util.ArrayList; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future;import org.slf4j.Logger; import org.slf4j.LoggerFactory;import test.MoreThread.l.linkedBlockingQueue; import comrt.util.DoubleBufferedQueue;//DoubleBufferedQueue入隊時間:9510 出隊時間:10771 //DoubleBufferedQueue入隊時間:8169 出隊時間:9789 public class doubleBufferedQueue {private static final Logger log = LoggerFactory.getLogger(doubleBufferedQueue.class);public final static int size1 = 1000000;public static DoubleBufferedQueue<Object> queue = new DoubleBufferedQueue<Object>(size1);public final static int threadNumber = 10;public static boolean isOver = false;public static void main(String[] args) throws InterruptedException,ExecutionException {// long timestart = System.currentTimeMillis();Thread thread1 = new Thread(new Runnable() {public void run() {ExecutorService executorService = Executors.newFixedThreadPool(threadNumber);ArrayList<Future<Long>> results = new ArrayList<Future<Long>>();for (int i = 0; i < threadNumber; i++) {Future<Long> future = executorService.submit(new ExecDoubleBufferedQueue());results.add(future);}long allTime = 0;for (Future<Long> fs : results) {try {allTime += fs.get();// log.info("" + fs.get());} catch (InterruptedException e) {log.info("" + e);return;} catch (ExecutionException e) {log.info("" + e);} finally {executorService.shutdown();}}doubleBufferedQueue.isOver = true;log.info("入隊列總共執行時間:" + allTime);}});thread1.start();// log.info("主線程執行時間:" + (System.currentTimeMillis() - timestart));// ------------------------------Thread thread2 = new Thread(new Runnable() {public void run() {ExecutorService executorService2 = Executors.newFixedThreadPool(threadNumber);ArrayList<Future<Long>> results_out = new ArrayList<Future<Long>>();for (int i = 0; i < threadNumber; i++) {Future<Long> future = executorService2.submit(new ExecDoubleBufferedQueue_Out());results_out.add(future);}long allTime_out = 0;for (Future<Long> fs : results_out) {try {allTime_out += fs.get();// log.info("" + fs.get());} catch (InterruptedException e) {log.info("" + e);return;} catch (ExecutionException e) {log.info("" + e);} finally {executorService2.shutdown();}}log.info("出隊列總共執行時間:" + allTime_out);}});thread2.start();} }class ExecDoubleBufferedQueue implements Callable<Long> {private static final Logger log = LoggerFactory.getLogger(doubleBufferedQueue.class);@Overridepublic Long call() throws Exception {long time = System.currentTimeMillis();for (int i = 0; i < doubleBufferedQueue.size1; i++) {doubleBufferedQueue.queue.offer(i);}long time2 = System.currentTimeMillis() - time;// log.info("執行時間:" + time2);return time2;} }class ExecDoubleBufferedQueue_Out implements Callable<Long> {private static final Logger log = LoggerFactory.getLogger(doubleBufferedQueue.class);@Overridepublic Long call() throws Exception {long time = System.currentTimeMillis();while (!doubleBufferedQueue.isOver) {doubleBufferedQueue.queue.poll();}long time2 = System.currentTimeMillis() - time;// log.info("執行時間:" + time2);return time2;} }?
concurrentLinkedQueue:
package test.MoreThread.c;import java.util.ArrayList; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import org.slf4j.Logger; import org.slf4j.LoggerFactory;//ConcurrentLinkedQueue入隊時間:10557 出隊時間:13716 //ConcurrentLinkedQueue入隊時間:25298 出隊時間:25332 public class concurrentLinkedQueue {private static final Logger log = LoggerFactory.getLogger(concurrentLinkedQueue.class);public static ConcurrentLinkedQueue<Object> queue = new ConcurrentLinkedQueue<Object>();public final static int size1 = 1000000;public final static int threadNumber = 10;public static boolean isOver = false;public static void main(String[] args) throws InterruptedException,ExecutionException {// long timestart = System.currentTimeMillis();Thread thread1 = new Thread(new Runnable() {public void run() {ExecutorService executorService = Executors.newFixedThreadPool(threadNumber);ArrayList<Future<Long>> results = new ArrayList<Future<Long>>();for (int i = 0; i < threadNumber; i++) {Future<Long> future = executorService.submit(new Exec());results.add(future);}long allTime = 0;for (Future<Long> fs : results) {try {allTime += fs.get(); // log.info("" + fs.get());} catch (InterruptedException e) {log.info("" + e);return;} catch (ExecutionException e) {log.info("" + e);} finally {executorService.shutdown();}}concurrentLinkedQueue.isOver = true;log.info("隊列總共執行時間:" + allTime);}});thread1.start();// ------------------------------Thread thread2 = new Thread(new Runnable() {public void run() {ExecutorService executorService2 = Executors.newFixedThreadPool(threadNumber);ArrayList<Future<Long>> results_out = new ArrayList<Future<Long>>();for (int i = 0; i < threadNumber; i++) {Future<Long> future = executorService2.submit(new Exec_Out());results_out.add(future);}long allTime_out = 0;for (Future<Long> fs : results_out) {try {allTime_out += fs.get();// log.info("" + fs.get());} catch (InterruptedException e) {log.info("" + e);return;} catch (ExecutionException e) {log.info("" + e);} finally {executorService2.shutdown();}}log.info("出隊列總共執行時間:" + allTime_out);}});thread2.start();// log.info("主線程執行時間:" + (System.currentTimeMillis() - timestart)); } }class Exec implements Callable<Long> {private static final Logger log = LoggerFactory.getLogger(concurrentLinkedQueue.class);@Overridepublic Long call() throws Exception {long time = System.currentTimeMillis();for (int i = 0; i < concurrentLinkedQueue.size1; i++) {concurrentLinkedQueue.queue.offer(i);}long time2 = System.currentTimeMillis() - time; // log.info("執行時間:" + time2);return time2;} }class Exec_Out implements Callable<Long> {private static final Logger log = LoggerFactory.getLogger(concurrentLinkedQueue.class);@Overridepublic Long call() throws Exception {long time = System.currentTimeMillis();while (!concurrentLinkedQueue.isOver) {concurrentLinkedQueue.queue.poll();}long time2 = System.currentTimeMillis() - time;// log.info("執行時間:" + time2);return time2;} }?
arrayBlockingQueue:
package test.MoreThread.a;import java.util.ArrayList; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import org.slf4j.Logger; import org.slf4j.LoggerFactory;//ArrayBlockingQueue入隊時間:21080 出隊時間:22025 //ArrayBlockingQueue入隊時間:17689 出隊時間:19654 public class arrayBlockingQueue {private static final Logger log = LoggerFactory.getLogger(arrayBlockingQueue.class);public final static int size1 = 1000000;public static ArrayBlockingQueue<Object> queue = new ArrayBlockingQueue<Object>(size1);public final static int threadNumber = 10;public static boolean isOver = false;public static void main(String[] args) throws InterruptedException,ExecutionException {// long timestart = System.currentTimeMillis();Thread thread1 = new Thread(new Runnable() {public void run() {ExecutorService executorService = Executors.newFixedThreadPool(threadNumber);ArrayList<Future<Long>> results = new ArrayList<Future<Long>>();for (int i = 0; i < threadNumber; i++) {Future<Long> future = executorService.submit(new ExecArrayBlockingQueue());results.add(future);}long allTime = 0;for (Future<Long> fs : results) {try {allTime += fs.get();// log.info("" + fs.get());} catch (InterruptedException e) {log.info("" + e);return;} catch (ExecutionException e) {log.info("" + e);} finally {executorService.shutdown();}}arrayBlockingQueue.isOver = true;log.info("隊列總共執行時間:" + allTime);}});thread1.start();// log.info("主線程執行時間:" + (System.currentTimeMillis() - timestart));// ------------------------------Thread thread2 = new Thread(new Runnable() {public void run() {ExecutorService executorService2 = Executors.newFixedThreadPool(threadNumber);ArrayList<Future<Long>> results_out = new ArrayList<Future<Long>>();for (int i = 0; i < threadNumber; i++) {Future<Long> future = executorService2.submit(new ExecArrayBlockingQueue_Out());results_out.add(future);}long allTime_out = 0;for (Future<Long> fs : results_out) {try {allTime_out += fs.get();// log.info("" + fs.get());} catch (InterruptedException e) {log.info("" + e);return;} catch (ExecutionException e) {log.info("" + e);} finally {executorService2.shutdown();}}log.info("出隊列總共執行時間:" + allTime_out);}});thread2.start();} }class ExecArrayBlockingQueue implements Callable<Long> {private static final Logger log = LoggerFactory.getLogger(arrayBlockingQueue.class);@Overridepublic Long call() throws Exception {long time = System.currentTimeMillis();for (int i = 0; i < arrayBlockingQueue.size1; i++) {arrayBlockingQueue.queue.offer(i);}long time2 = System.currentTimeMillis() - time;// log.info("執行時間:" + time2);return time2;} }class ExecArrayBlockingQueue_Out implements Callable<Long> {private static final Logger log = LoggerFactory.getLogger(arrayBlockingQueue.class);@Overridepublic Long call() throws Exception {long time = System.currentTimeMillis();while (!arrayBlockingQueue.isOver) {arrayBlockingQueue.queue.poll();}long time2 = System.currentTimeMillis() - time;// log.info("執行時間:" + time2);return time2;} }?
linkedBlockingQueue:
package test.MoreThread.l;import java.util.ArrayList; import java.util.Vector; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory;//LinkedBlockingQueue入隊時間:38236 出隊時間:52555 //LinkedBlockingQueue入隊時間:30646 出隊時間:38573 public class linkedBlockingQueue {private static final Logger log = LoggerFactory.getLogger(linkedBlockingQueue.class);public final static int size1 = 1000000;public static LinkedBlockingQueue<Object> queue = new LinkedBlockingQueue<Object>(size1);public final static int threadNumber = 10;public static boolean isOver = false;public static void main(String[] args) throws InterruptedException,ExecutionException {long timestart = System.currentTimeMillis();Thread thread1 = new Thread(new Runnable() {public void run() {ExecutorService executorService = Executors.newFixedThreadPool(threadNumber);ArrayList<Future<Long>> results = new ArrayList<Future<Long>>();for (int i = 0; i < threadNumber; i++) {Future<Long> future = executorService.submit(new ExecLinkedBlockingQueue());results.add(future);}long allTime = 0;for (Future<Long> fs : results) {try {allTime += fs.get();// log.info("" + fs.get());} catch (InterruptedException e) {log.info("" + e);return;} catch (ExecutionException e) {log.info("" + e);} finally {executorService.shutdown();}}linkedBlockingQueue.isOver = true;log.info("入隊列總共執行時間:" + allTime);}});thread1.start();// log.info("主線程執行時間:" + (System.currentTimeMillis() - timestart)); // System.out.println(linkedBlockingQueue.queue.size());// ------------------------------ Thread thread2 = new Thread(new Runnable() {public void run() {ExecutorService executorService2 = Executors.newFixedThreadPool(threadNumber);ArrayList<Future<Long>> results_out = new ArrayList<Future<Long>>();for (int i = 0; i < threadNumber; i++) {Future<Long> future = executorService2.submit(new ExecLinkedBlockingQueue_Out());results_out.add(future);}long allTime_out = 0;for (Future<Long> fs : results_out) {try {allTime_out += fs.get();// log.info("" + fs.get());} catch (InterruptedException e) {log.info("" + e);return;} catch (ExecutionException e) {log.info("" + e);} finally {executorService2.shutdown();}}log.info("出隊列總共執行時間:" + allTime_out);}});thread2.start();} }class ExecLinkedBlockingQueue implements Callable<Long> {private static final Logger log = LoggerFactory.getLogger(linkedBlockingQueue.class);@Overridepublic Long call() throws Exception {long time = System.currentTimeMillis();for (int i = 0; i < linkedBlockingQueue.size1; i++) {linkedBlockingQueue.queue.offer(i);}long time2 = System.currentTimeMillis() - time;// log.info("執行時間:" + time2);return time2;} }class ExecLinkedBlockingQueue_Out implements Callable<Long> {private static final Logger log = LoggerFactory.getLogger(linkedBlockingQueue.class);@Overridepublic Long call() throws Exception {long time = System.currentTimeMillis();while (!linkedBlockingQueue.isOver) {linkedBlockingQueue.queue.poll();}long time2 = System.currentTimeMillis() - time;// log.info("執行時間:" + time2);return time2;} }?
DoubleBufferedQueue雙緩沖隊列
package comrt.util;import java.util.AbstractQueue; import java.util.Collection; import java.util.Iterator; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock;import org.slf4j.Logger; import org.slf4j.LoggerFactory;//雙緩沖隊列,線程安全 public class DoubleBufferedQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {private static final long serialVersionUID = 1011398447523020L;public static final int DEFAULT_QUEUE_CAPACITY = 5000000;public static final long DEFAULT_MAX_TIMEOUT = 0;public static final long DEFAULT_MAX_COUNT = 10;private Logger logger = LoggerFactory.getLogger(DoubleBufferedQueue.class.getName());/** The queued items */private ReentrantLock readLock;// 寫鎖private ReentrantLock writeLock;// 是否滿private Condition notFull;private Condition awake;// 讀寫數組private transient E[] writeArray;private transient E[] readArray;// 讀寫計數private volatile int writeCount;private volatile int readCount;// 寫數組下標指針private int writeArrayTP;private int writeArrayHP;// 讀數組下標指針private int readArrayTP;private int readArrayHP;private int capacity;public DoubleBufferedQueue(int capacity) {// 默認this.capacity = DEFAULT_QUEUE_CAPACITY;if (capacity > 0) {this.capacity = capacity;}readArray = (E[]) new Object[capacity];writeArray = (E[]) new Object[capacity];readLock = new ReentrantLock();writeLock = new ReentrantLock();notFull = writeLock.newCondition();awake = writeLock.newCondition();}private void insert(E e) {writeArray[writeArrayTP] = e;++writeArrayTP;++writeCount;}private E extract() {E e = readArray[readArrayHP];readArray[readArrayHP] = null;++readArrayHP;--readCount;return e;}/*** switch condition: read queue is empty && write queue is not empty* * Notice:This function can only be invoked after readLock is grabbed,or may* cause dead lock* * @param timeout* @param isInfinite* : whether need to wait forever until some other thread awake* it* @return* @throws InterruptedException*/private long queueSwap(long timeout, boolean isInfinite) throws InterruptedException {writeLock.lock();try {if (writeCount <= 0) {// logger.debug("Write Count:" + writeCount// + ", Write Queue is empty, do not switch!");try {// logger.debug("Queue is empty, need wait....");if (isInfinite && timeout <= 0) {awake.await();return -1;} else if (timeout > 0) {return awake.awaitNanos(timeout);} else {return 0;}} catch (InterruptedException ie) {awake.signal();throw ie;}} else {E[] tmpArray = readArray;readArray = writeArray;writeArray = tmpArray;readCount = writeCount;readArrayHP = 0;readArrayTP = writeArrayTP;writeCount = 0;writeArrayHP = readArrayHP;writeArrayTP = 0;notFull.signal();// logger.debug("Queue switch successfully!");return 0;}} finally {writeLock.unlock();}}@Overridepublic boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {if (e == null) {throw new NullPointerException();}long nanoTime = 0;if (timeout > 0) {nanoTime = unit.toNanos(timeout);}writeLock.lockInterruptibly();try {for (int i = 0; i < DEFAULT_MAX_COUNT; i++) {if (writeCount < writeArray.length) {insert(e);if (writeCount == 1) {awake.signal();}return true;}// Time outif (nanoTime <= 0) {// logger.debug("offer wait time out!");return false;}// keep waitingtry {// logger.debug("Queue is full, need wait....");nanoTime = notFull.awaitNanos(nanoTime);} catch (InterruptedException ie) {notFull.signal();throw ie;}}} finally {writeLock.unlock();}return false;}// 取 @Overridepublic E poll(long timeout, TimeUnit unit) throws InterruptedException {long nanoTime = 0;if (timeout > 0) {nanoTime = unit.toNanos(timeout);}readLock.lockInterruptibly();try {if (nanoTime > 0) {for (int i = 0; i < DEFAULT_MAX_COUNT; i++) {if (readCount > 0) {return extract();}if (nanoTime <= 0) {// logger.debug("poll time out!");return null;}nanoTime = queueSwap(nanoTime, false);}} else {if (readCount > 0) {return extract();}queueSwap(nanoTime, false);if (readCount > 0) {return extract();} }} finally {readLock.unlock();}return null;}// 等待500毫秒 @Overridepublic E poll() {E ret = null;try {ret = poll(DEFAULT_MAX_TIMEOUT, TimeUnit.MILLISECONDS);} catch (Exception e) {ret = null;}return ret;}// 查看 @Overridepublic E peek() {E e = null;readLock.lock();try {if (readCount > 0) {e = readArray[readArrayHP];}} finally {readLock.unlock();}return e;}// 默認500毫秒 @Overridepublic boolean offer(E e) {boolean ret = false;try {ret = offer(e, DEFAULT_MAX_TIMEOUT, TimeUnit.MILLISECONDS);} catch (Exception e2) {ret = false;}return ret;}@Overridepublic void put(E e) throws InterruptedException {// never need to // block offer(e, DEFAULT_MAX_TIMEOUT, TimeUnit.MILLISECONDS); }@Overridepublic E take() throws InterruptedException {return poll(DEFAULT_MAX_TIMEOUT, TimeUnit.MILLISECONDS);}@Overridepublic int remainingCapacity() {return this.capacity;}@Overridepublic int drainTo(Collection<? super E> c) {return 0;}@Overridepublic int drainTo(Collection<? super E> c, int maxElements) {return 0;}@Overridepublic Iterator<E> iterator() {return null;}// 當前讀隊列中還有多少個 @Overridepublic int size() {int size = 0;readLock.lock();try {size = readCount;} finally {readLock.unlock();}return size;}/*** 當前已寫入的隊列大小* */public int WriteSize() {int size = 0;writeLock.lock();try {size = writeCount;} finally {writeLock.unlock();}return size;}public int unsafeReadSize() {return readCount;}public int unsafeWriteSize() {return writeCount;}public int capacity() {return capacity;}public String toMemString() {return "--read: " + readCount + "/" + capacity + "--write: " + writeCount + "/" + capacity;}// 清理/** public void clear() { readLock.lock(); writeLock.lock(); try { readCount* = 0; readArrayHP = 0; writeCount = 0; writeArrayTP = 0;* //logger.debug("Queue clear successfully!"); } finally {* writeLock.unlock(); readLock.unlock(); } }*/ }?
轉載于:https://www.cnblogs.com/zhuawang/p/4158486.html
創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎總結
以上是生活随笔為你收集整理的Java队列集合的性能测试的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: JS的类型转换
- 下一篇: 在 App 扩展和主 App 间共享数据