Java 并发框架Disruptor(七)
生活随笔
收集整理的這篇文章主要介紹了
Java 并发框架Disruptor(七)
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
Disruptor VS BlockingQueue的壓測對比:
import java.util.concurrent.ArrayBlockingQueue;public class ArrayBlockingQueue4Test {public static void main(String[] args) {final ArrayBlockingQueue<Data> queue = new ArrayBlockingQueue<Data>(100000000);final long startTime = System.currentTimeMillis();//向容器中添加元素new Thread(new Runnable() {public void run() {long i = 0;while (i < Constants.EVENT_NUM_OHM) {Data data = new Data(i, "c" + i);try {queue.put(data);} catch (InterruptedException e) {e.printStackTrace();}i++;}}}).start();new Thread(new Runnable() {public void run() {int k = 0;while (k < Constants.EVENT_NUM_OHM) {try {queue.take();} catch (InterruptedException e) {e.printStackTrace();}k++;}long endTime = System.currentTimeMillis();System.out.println("ArrayBlockingQueue costTime = " + (endTime - startTime) + "ms");}}).start();} }public interface Constants {int EVENT_NUM_OHM = 1000000;int EVENT_NUM_FM = 50000000;int EVENT_NUM_OM = 10000000;}
import java.util.concurrent.Executors;import com.lmax.disruptor.BlockingWaitStrategy; import com.lmax.disruptor.BusySpinWaitStrategy; import com.lmax.disruptor.EventFactory; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.YieldingWaitStrategy; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType;public class DisruptorSingle4Test {public static void main(String[] args) {int ringBufferSize = 65536;final Disruptor<Data> disruptor = new Disruptor<Data>(new EventFactory<Data>() {public Data newInstance() {return new Data();}},ringBufferSize,Executors.newSingleThreadExecutor(),ProducerType.SINGLE, //new BlockingWaitStrategy()new YieldingWaitStrategy());DataConsumer consumer = new DataConsumer();//消費數據disruptor.handleEventsWith(consumer);disruptor.start();new Thread(new Runnable() {public void run() {RingBuffer<Data> ringBuffer = disruptor.getRingBuffer();for (long i = 0; i < Constants.EVENT_NUM_OHM; i++) {long seq = ringBuffer.next();Data data = ringBuffer.get(seq);data.setId(i);data.setName("c" + i);ringBuffer.publish(seq);}}}).start();} }
import com.lmax.disruptor.EventHandler;public class DataConsumer implements EventHandler<Data> {private long startTime;private int i;public DataConsumer() {this.startTime = System.currentTimeMillis();}public void onEvent(Data data, long seq, boolean bool)throws Exception {i++;if (i == Constants.EVENT_NUM_OHM) {long endTime = System.currentTimeMillis();System.out.println("Disruptor costTime = " + (endTime - startTime) + "ms");}}}
import java.io.Serializable;public class Data implements Serializable {private static final long serialVersionUID = 2035546038986494352L;private Long id ;private String name;public Data() {}public Data(Long id, String name) {super();this.id = id;this.name = name;}public Long getId() {return id;}public void setId(Long id) {this.id = id;}public String getName() {return name;}public void setName(String name) {this.name = name;} }
?
BlockingQueue測試:
1.建立一個工廠Event類,用于創建Event類實例對象
2.需要有一個jian監聽事件類,用于處理數據(Event類)
3.實例化Disruptor實例,配置一系列參數,編寫DisDisruptor核心組件
4.編寫生產者組件,向Disruptor容器中投遞數據
pom.xml添加:
<dependency><groupId>com.lmax</groupId><artifactId>disruptor</artifactId><scope>3.3.2</scope> </dependency>public class OrderEvent {private long value; //訂單的價格public long getValue() {return value;}public void setValue(long value) {this.value = value;} }
import com.lmax.disruptor.EventFactory;public class OrderEventFactory implements EventFactory<OrderEvent>{public OrderEvent newInstance() {return new OrderEvent(); //這個方法就是為了返回空的數據對象(Event)} }
public class OrderEventHandler implements EventHandler<OrderEvent>{public void onEvent(OrderEvent event, long sequence, boolean endOfBatch) throws Exception {Thread.sleep(Integer.MAX_VALUE);System.err.println("消費者: " + event.getValue());} }
import java.nio.ByteBuffer;import com.lmax.disruptor.RingBuffer;public class OrderEventProducer {private RingBuffer<OrderEvent> ringBuffer;public OrderEventProducer(RingBuffer<OrderEvent> ringBuffer) {this.ringBuffer = ringBuffer;}public void sendData(ByteBuffer data) {//1 在生產者發送消息的時候, 首先 需要從我們的ringBuffer里面 獲取一個可用的序號long sequence = ringBuffer.next(); //0 try {//2 根據這個序號, 找到具體的 "OrderEvent" 元素 注意:此時獲取的OrderEvent對象是一個沒有被賦值的"空對象"OrderEvent event = ringBuffer.get(sequence);//3 進行實際的賦值處理event.setValue(data.getLong(0)); } finally {//4 提交發布操作ringBuffer.publish(sequence); }} }
import java.nio.ByteBuffer; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors;import com.lmax.disruptor.BlockingWaitStrategy; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType;public class Main {public static void main(String[] args) {// 參數準備工作OrderEventFactory orderEventFactory = new OrderEventFactory();int ringBufferSize = 4;ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());/*** 1 eventFactory: 消息(event)工廠對象* 2 ringBufferSize: 容器的長度* 3 executor: 線程池(建議使用自定義線程池) RejectedExecutionHandler* 4 ProducerType: 單生產者 還是 多生產者* 5 waitStrategy: 等待策略*///1. 實例化disruptor對象Disruptor<OrderEvent> disruptor = new Disruptor<OrderEvent>(orderEventFactory,ringBufferSize,executor,ProducerType.SINGLE,new BlockingWaitStrategy());//2. 添加消費者的監聽 (構建disruptor 與 消費者的一個關聯關系)disruptor.handleEventsWith(new OrderEventHandler());//3. 啟動disruptordisruptor.start();//4. 獲取實際存儲數據的容器: RingBufferRingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();OrderEventProducer producer = new OrderEventProducer(ringBuffer);ByteBuffer bb = ByteBuffer.allocate(8);for(long i = 0 ; i < 100; i ++){bb.putLong(0, i);producer.sendData(bb);}disruptor.shutdown();executor.shutdown();} }
?
?
?
?
public final class BlockingWaitStrategy implements WaitStrategy {private final Lock lock = new ReentrantLock();private final Condition processorNotifyCondition = lock.newCondition();@Overridepublic long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier)throws AlertException, InterruptedException{long availableSequence;if ((availableSequence = cursorSequence.get()) < sequence){lock.lock();try{while ((availableSequence = cursorSequence.get()) < sequence){barrier.checkAlert();processorNotifyCondition.await();}}finally{lock.unlock();}}while ((availableSequence = dependentSequence.get()) < sequence){barrier.checkAlert();}return availableSequence;}@Overridepublic void signalAllWhenBlocking(){lock.lock();try{processorNotifyCondition.signalAll();}finally{lock.unlock();}} }public final class SleepingWaitStrategy implements WaitStrategy {private static final int DEFAULT_RETRIES = 200;private final int retries;public SleepingWaitStrategy(){this(DEFAULT_RETRIES);}public SleepingWaitStrategy(int retries){this.retries = retries;}@Overridepublic long waitFor(final long sequence, Sequence cursor, final Sequence dependentSequence, final SequenceBarrier barrier)throws AlertException, InterruptedException{long availableSequence;int counter = retries;while ((availableSequence = dependentSequence.get()) < sequence){counter = applyWaitMethod(barrier, counter);}return availableSequence;}@Overridepublic void signalAllWhenBlocking(){}private int applyWaitMethod(final SequenceBarrier barrier, int counter)throws AlertException{barrier.checkAlert();if (counter > 100){--counter;}else if (counter > 0){--counter;Thread.yield();}else{LockSupport.parkNanos(1L);}return counter;} }
public final class YieldingWaitStrategy implements WaitStrategy {private static final int SPIN_TRIES = 100;@Overridepublic long waitFor(final long sequence, Sequence cursor, final Sequence dependentSequence, final SequenceBarrier barrier)throws AlertException, InterruptedException{long availableSequence;int counter = SPIN_TRIES;while ((availableSequence = dependentSequence.get()) < sequence){counter = applyWaitMethod(barrier, counter);}return availableSequence;}@Overridepublic void signalAllWhenBlocking(){}private int applyWaitMethod(final SequenceBarrier barrier, int counter)throws AlertException{barrier.checkAlert();if (0 == counter){Thread.yield();}else{--counter;}return counter;} }
?
?
?
轉載于:https://www.cnblogs.com/sunliyuan/p/10872380.html
總結
以上是生活随笔為你收集整理的Java 并发框架Disruptor(七)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: [hihoCoder 1384]Geni
- 下一篇: 树:哈夫曼树和哈夫曼编码的详细介绍以及代