多线程与高并发(九):单机压测工具JMH,单机最快MQ - Disruptor原理解析
單機壓測工具JMH
JMH Java準測試工具套件
什么是JMH
官網
http://openjdk.java.net/projects/code-tools/jmh/
創建JMH測試
1.創建Maven項目,添加依賴
<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><encoding>UTF-8</encoding><java.version>1.8</java.version><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target></properties><groupId>mashibing.com</groupId><artifactId>HelloJMH2</artifactId><version>1.0-SNAPSHOT</version><dependencies><!-- https://mvnrepository.com/artifact/org.openjdk.jmh/jmh-core --><dependency><groupId>org.openjdk.jmh</groupId><artifactId>jmh-core</artifactId><version>1.21</version></dependency><!-- https://mvnrepository.com/artifact/org.openjdk.jmh/jmh-generator-annprocess --><dependency><groupId>org.openjdk.jmh</groupId><artifactId>jmh-generator-annprocess</artifactId><version>1.21</version><scope>test</scope></dependency></dependencies></project>2.idea安裝JMH插件 JMH plugin v1.0.3
3.由于用到了注解,打開運行程序注解配置
compiler -> Annotation Processors -> Enable Annotation Processing
4.定義需要測試類PS (ParallelStream)
package com.mashibing.jmh;import java.util.ArrayList; import java.util.List; import java.util.Random;public class PS {static List<Integer> nums = new ArrayList<>();static {Random r = new Random();for (int i = 0; i < 10000; i++) nums.add(1000000 + r.nextInt(1000000));}static void foreach() {nums.forEach(v->isPrime(v));}static void parallel() {nums.parallelStream().forEach(PS::isPrime);}static boolean isPrime(int num) {for(int i=2; i<=num/2; i++) {if(num % i == 0) return false;}return true;} }5.寫單元測試
這個測試類一定要在 test package下面
package com.mashibing.jmh;import org.openjdk.jmh.annotations.*;import static org.junit.jupiter.api.Assertions.*;public class PSTest {@Benchmark@Warmup(iterations = 1, time = 3)@Fork(5)@BenchmarkMode(Mode.Throughput)@Measurement(iterations = 1, time = 3)public void testForEach() {PS.foreach();} }6.運行測試類,如果遇到下面的錯誤:
ERROR: org.openjdk.jmh.runner.RunnerException: ERROR: Exception while trying to acquire the JMH lock (C:\WINDOWS\/jmh.lock): C:\WINDOWS\jmh.lock (拒絕訪問。), exiting. Use -Djmh.ignoreLock=true to forcefully continue.at org.openjdk.jmh.runner.Runner.run(Runner.java:216)at org.openjdk.jmh.Main.main(Main.java:71)這個錯誤是因為JMH運行需要訪問系統的TMP目錄,解決辦法是:
打開RunConfiguration -> Environment Variables -> include system environment viables
7.閱讀測試報告
JMH中的基本概念
Warmup
預熱,由于JVM中對于特定代碼會存在優化(本地化),預熱對于測試結果很重要
Mesurement
總共執行多少次測試
Timeout
Threads
線程數,由fork指定
Benchmark mode
基準測試的模式
Benchmark
測試哪一段代碼
Next
官方樣例:
http://hg.openjdk.java.net/code-tools/jmh/file/tip/jmh-samples/src/main/java/org/openjdk/jmh/samples/
Disruptor單機最快MQ
內存里的高效隊列
介紹
主頁:http://lmax-exchange.github.io/disruptor/
源碼:https://github.com/LMAX-Exchange/disruptor
GettingStarted: https://github.com/LMAX-Exchange/disruptor/wiki/Getting-Started
api: http://lmax-exchange.github.io/disruptor/docs/index.html
maven: https://mvnrepository.com/artifact/com.lmax/disruptor
Disruptor的特點
對比ConcurrentLinkedQueue : 鏈表實現
JDK中沒有ConcurrentArrayQueue
Disruptor是數組實現的
無鎖,高并發,使用環形Buffer,直接覆蓋(不用清除)舊的數據,降低GC頻率
實現了基于事件的生產者消費者模式(觀察者模式)
RingBuffer
環形隊列
RingBuffer的序號,指向下一個可用的元素
采用數組實現,沒有首尾指針
對比ConcurrentLinkedQueue,用數組實現的速度更快
假如長度為8,當添加到第12個元素的時候在哪個序號上呢?用12%8決定
當Buffer被填滿的時候到底是覆蓋還是等待,由Producer決定
長度設為2的n次冪,利于二進制計算,例如:12%8 = 12 & (8 - 1) pos = num & (size -1)
Disruptor開發步驟
定義Event - 隊列中需要處理的元素
定義Event工廠,用于填充隊列
這里牽扯到效率問題:disruptor初始化的時候,會調用Event工廠,對ringBuffer進行內存的提前分配
GC產頻率會降低
定義EventHandler(消費者),處理容器中的元素
事件發布模板
long sequence = ringBuffer.next(); // Grab the next sequence try {LongEvent event = ringBuffer.get(sequence); // Get the entry in the Disruptor// for the sequenceevent.set(8888L); // Fill with data } finally {ringBuffer.publish(sequence); }使用EventTranslator發布事件
//===============================================================EventTranslator<LongEvent> translator1 = new EventTranslator<LongEvent>() {@Overridepublic void translateTo(LongEvent event, long sequence) {event.set(8888L);}};ringBuffer.publishEvent(translator1);//===============================================================EventTranslatorOneArg<LongEvent, Long> translator2 = new EventTranslatorOneArg<LongEvent, Long>() {@Overridepublic void translateTo(LongEvent event, long sequence, Long l) {event.set(l);}};ringBuffer.publishEvent(translator2, 7777L);//===============================================================EventTranslatorTwoArg<LongEvent, Long, Long> translator3 = new EventTranslatorTwoArg<LongEvent, Long, Long>() {@Overridepublic void translateTo(LongEvent event, long sequence, Long l1, Long l2) {event.set(l1 + l2);}};ringBuffer.publishEvent(translator3, 10000L, 10000L);//===============================================================EventTranslatorThreeArg<LongEvent, Long, Long, Long> translator4 = new EventTranslatorThreeArg<LongEvent, Long, Long, Long>() {@Overridepublic void translateTo(LongEvent event, long sequence, Long l1, Long l2, Long l3) {event.set(l1 + l2 + l3);}};ringBuffer.publishEvent(translator4, 10000L, 10000L, 1000L);//===============================================================EventTranslatorVararg<LongEvent> translator5 = new EventTranslatorVararg<LongEvent>() {@Overridepublic void translateTo(LongEvent event, long sequence, Object... objects) {long result = 0;for(Object o : objects) {long l = (Long)o;result += l;}event.set(result);}};ringBuffer.publishEvent(translator5, 10000L, 10000L, 10000L, 10000L);使用Lamda表達式
package com.mashibing.disruptor;import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.util.DaemonThreadFactory;public class Main03 {public static void main(String[] args) throws Exception{// Specify the size of the ring buffer, must be power of 2.int bufferSize = 1024;// Construct the DisruptorDisruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);// Connect the handlerdisruptor.handleEventsWith((event, sequence, endOfBatch) -> System.out.println("Event: " + event));// Start the Disruptor, starts all threads runningdisruptor.start();// Get the ring buffer from the Disruptor to be used for publishing.RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();ringBuffer.publishEvent((event, sequence) -> event.set(10000L));System.in.read();} }ProducerType生產者線程模式
ProducerType有兩種模式 Producer.MULTI和Producer.SINGLE
默認是MULTI,表示在多線程模式下產生sequence
如果確認是單線程生產者,那么可以指定SINGLE,效率會提升
如果是多個生產者(多線程),但模式指定為SINGLE,會出什么問題呢?
等待策略
1,(常用)BlockingWaitStrategy:通過線程阻塞的方式,等待生產者喚醒,被喚醒后,再循環檢查依賴的sequence是否已經消費。
2,BusySpinWaitStrategy:線程一直自旋等待,可能比較耗cpu
3,LiteBlockingWaitStrategy:線程阻塞等待生產者喚醒,與BlockingWaitStrategy相比,區別在signalNeeded.getAndSet,如果兩個線程同時訪問一個訪問waitfor,一個訪問signalAll時,可以減少lock加鎖次數.
4,LiteTimeoutBlockingWaitStrategy:與LiteBlockingWaitStrategy相比,設置了阻塞時間,超過時間后拋異常。
5,PhasedBackoffWaitStrategy:根據時間參數和傳入的等待策略來決定使用哪種等待策略
6,TimeoutBlockingWaitStrategy:相對于BlockingWaitStrategy來說,設置了等待時間,超過后拋異常
7,(常用)YieldingWaitStrategy:嘗試100次,然后Thread.yield()讓出cpu
8,(常用)SleepingWaitStrategy : sleep
消費者異常處理
默認:disruptor.setDefaultExceptionHandler()
覆蓋:disruptor.handleExceptionFor().with()
總結
以上是生活随笔為你收集整理的多线程与高并发(九):单机压测工具JMH,单机最快MQ - Disruptor原理解析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: MySQL 行转列的方法
- 下一篇: Redis实战(七):redis的集群: