使用Akka简化交易系统
我的同事正在開發一種交易系統,該系統可以處理大量的傳入交易。 每筆交易都涵蓋一種Instrument (例如債券或股票),并且具有某些(現在)不重要的屬性。 他們堅持使用Java(<8),所以我們堅持下去:
Instrument稍后將用作HashMap的鍵,因此將來我們會主動實現Comparable<Instrument> 。 這是我們的領域,現在的要求是:
最初的實現很簡單–將所有傳入的事務放入一個使用方的隊列(例如ArrayBlockingQueue )中。 這滿足了最后的要求,因為隊列在所有事務中都保留了嚴格的FIFO順序。 但是,這種架構阻止了針對不同工具的不相關交易的并發處理,從而浪費了令人信服的吞吐量提高。 毫無疑問,這種實現盡管很簡單,卻成為了瓶頸。
第一個想法是以某種方式分別按工具和流程工具拆分傳入的交易。 我們提出了以下數據結構:
priavate final ConcurrentMap<Instrument, Queue<Transaction>> queues = new ConcurrentHashMap<Instrument, Queue<Transaction>>();public void accept(Transaction tx) {final Instrument instrument = tx.getInstrument();if (queues.get(instrument) == null) {queues.putIfAbsent(instrument, new LinkedBlockingQueue<Transaction>());}final Queue<Transaction> queue = queues.get(instrument);queue.add(tx); }! 但是最壞的時刻還沒有到來。 您如何確保最多一個線程一次處理每個隊列? 畢竟,否則,兩個線程可以從一個隊列(一種儀器)中提取項目并以相反的順序處理它們,這是不允許的。 最簡單的情況是每個隊列都有一個Thread -這無法擴展,因為我們期望成千上萬種不同的工具。 因此,我們可以說N線程,讓每個線程處理隊列的一個子集,例如instrument.hashCode() % N告訴我們哪個線程負責處理給定的隊列。 但是由于以下三個原因,它仍然不夠完美:
實現這種怪異是可能的,但是困難且容易出錯。 此外,還有另一個非功能性的要求:儀器來來往往,隨著時間的流逝,成千上萬的儀器。 一段時間后,我們應刪除代表最近未見過的儀器的地圖條目。 否則我們會發生內存泄漏。
如果您能提出一些更簡單的解決方案,請告訴我。 同時,讓我告訴你我對同事的建議。 如您所料,它是Akka –結果非常簡單。 我們需要兩種角色: Dispatcher和Processor 。 Dispatcher有一個實例,并接收所有傳入的事務。 它的責任是為每個Instrument找到或生成工作Processor角色,并向其推送事務:
public class Dispatcher extends UntypedActor {private final Map<Instrument, ActorRef> instrumentProcessors = new HashMap<Instrument, ActorRef>();@Overridepublic void onReceive(Object message) throws Exception {if (message instanceof Transaction) {dispatch(((Transaction) message));} else {unhandled(message);}}private void dispatch(Transaction tx) {final ActorRef processor = findOrCreateProcessorFor(tx.getInstrument());processor.tell(tx, self());}private ActorRef findOrCreateProcessorFor(Instrument instrument) {final ActorRef maybeActor = instrumentProcessors.get(instrument);if (maybeActor != null) {return maybeActor;} else {final ActorRef actorRef = context().actorOf(Props.create(Processor.class), instrument.getName());instrumentProcessors.put(instrument, actorRef);return actorRef;}} }這很簡單。 由于我們的Dispatcher actor實際上是單線程的,因此不需要同步。 我們幾乎沒有收到Transaction ,查找或創建Processor并進一步傳遞Transaction 。 這是Processor實現的樣子:
public class Processor extends UntypedActor {private final LoggingAdapter log = Logging.getLogger(getContext().system(), this);@Overridepublic void onReceive(Object message) throws Exception {if (message instanceof Transaction) {process(((Transaction) message));} else {unhandled(message);}}private void process(Transaction tx) {log.info("Processing {}", tx);} }而已! 有趣的是,我們的Akka實現幾乎與我們第一個使用隊列映射的想法相同。 畢竟,參與者只是一個隊列,還有一個(邏輯)線程在該隊列中處理項目。 區別在于:Akka管理有限的線程池,并可能在成千上萬的參與者之間共享它。 而且,由于每個工具都有其專用(和“單線程”)執行器,因此可以保證每個工具的事務順序處理。
還有一件事。 如前所述,有大量的樂器,我們不想讓演員出現一段時間了。 假設如果Processor在一個小時內未收到任何交易,則應停止并收集垃圾。 如果以后我們收到此類工具的新交易,則可以隨時重新創建它。 這是一個非常棘手的問題–我們必須確保,如果處理器決定刪除自身時,如果事務到達,我們將無法松開該事務。 Processor沒有停止自身,而是向其父Processor發出空閑時間過長的信號。 然后, Dispatcher將發送PoisonPill到它。 因為ProcessorIdle和Transaction消息都是順序處理的,所以沒有交易發送到不再存在的參與者的風險。
每個setReceiveTimeout通過使用setReceiveTimeout安排超時來獨立地管理其生命周期:
public class Processor extends UntypedActor {@Overridepublic void preStart() throws Exception {context().setReceiveTimeout(Duration.create(1, TimeUnit.HOURS));}@Overridepublic void onReceive(Object message) throws Exception {//...if (message instanceof ReceiveTimeout) {log.debug("Idle for two long, shutting down");context().parent().tell(ProcessorIdle.INSTANCE, self());} else {unhandled(message);}}}enum ProcessorIdle {INSTANCE }顯然,當Processor在一個小時內未收到任何消息時,它會向其父級( Dispatcher )輕輕發出信號。 但是演員仍然活著,并且如果交易恰好在一小時后發生,便可以處理。 Dispatcher作用是殺死給定的Processor并將其從地圖中刪除:
public class Dispatcher extends UntypedActor {private final BiMap<Instrument, ActorRef> instrumentProcessors = HashBiMap.create();public void onReceive(Object message) throws Exception {//...if (message == ProcessorIdle.INSTANCE) {removeIdleProcessor(sender());sender().tell(PoisonPill.getInstance(), self());} else {unhandled(message);}}private void removeIdleProcessor(ActorRef idleProcessor) {instrumentProcessors.inverse().remove(idleProcessor);}private void dispatch(Transaction tx) {final ActorRef processor = findOrCreateProcessorFor(tx.getInstrument());processor.tell(tx, self());}//...}不便之處。 instrumentProcessors過去是Map<Instrument, ActorRef> 。 事實證明這是不夠的,因為我們突然不得不按值刪除此映射中的條目。 換句話說,我們需要找到一個映射到給定ActorRef ( Processor )的鍵( Instrument )。 有多種處理方法(例如,空閑的Processor可以發送它處理的Instrumnt ),但是我改用了BiMap<K, V> 。 之所以起作用,是因為指定的Instrument和ActorRef都是唯一的(每個樂器的actor)。 使用BiMap我可以簡單地對地圖進行inverse() (從BiMap<Instrument, ActorRef>到BiMap<ActorRef, Instrument>并將ActorRef視為鍵。
這個Akka例子只不過是“ hello,world ”。 但是與卷積解決方案相比,我們必須使用并發隊列,鎖和線程池進行編寫,這是完美的。 我的隊友非常興奮,以至于最終他們決定將整個應用程序重寫為Akka。
翻譯自: https://www.javacodegeeks.com/2014/06/simplifying-trading-system-with-akka.html
總結
以上是生活随笔為你收集整理的使用Akka简化交易系统的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 定级备案材料定级备案报告(定级备案材料)
- 下一篇: 安卓英雄无敌3(安卓英雄无敌)