使用SynchronousQueue实现生产者/消费者
Java提供了許多用于并發支持的有用類中,有一個我想談一談: SynchronousQueue 。 特別是,我想通過使用方便的SynchronousQueue作為交換機制來完成Producer / Consumer實現。
除非我們了解SynchronousQueue實現的內幕,否則可能不清楚為什么要使用這種類型的隊列進行生產者/消費者通信。 事實證明,這并不是我們過去通常考慮的隊列。 這個類比只是一個最多包含一個元素的集合。
為什么有用? 好吧,有幾個原因。 從生產者的角度來看,只能將一個元素(或消息)存儲到隊列中。 為了繼續進行下一個元素(或消息),生產者應等到消費者使用隊列中的當前元素。 從使用者的角度來看,它只是輪詢隊列以查找下一個可用的元素(或消息)。 很簡單,但是最大的好處是:生產者發送消息的速度不能超過消費者處理消息的速度。
這是我最近遇到的用例之一:比較兩個數據庫表(可能只是巨大的),并檢測其中包含不同數據或數據是否相同(副本)。 SynchronousQueue是解決此問題的便捷工具:它允許在自己的線程中處理每個表,并在從兩個不同的數據庫讀取數據時補償可能的超時/延遲。
讓我們從定義比較功能開始,該功能接受源數據源和目標數據源以及表名(進行比較)。 我正在使用Spring框架中非常有用的JdbcTemplate類,因為它非常好地抽象了處理連接和準備好的語句的所有無聊的細節。
public boolean compare( final DataSource source, final DataSource destination, final String table ) {final JdbcTemplate from = new JdbcTemplate( source );final JdbcTemplate to = new JdbcTemplate( destination ); }在進行任何實際數據比較之前,最好比較一下源數據庫和目標數據庫的表行數:
if( from.queryForLong('SELECT count(1) FROM ' + table ) != to.queryForLong('SELECT count(1) FROM ' + table ) ) {return false; }現在,至少知道表在兩個數據庫中包含相同數量的行,我們可以開始進行數據比較。 該算法非常簡單:
- 為源(生產者)和目標(消費者)數據庫創建一個單獨的線程
- 生產者線程從表中讀取單行并將其放入SynchronousQueue
- 使用者線程還從表中讀取單行,然后向隊列詢問要比較的可用行(必要時等待),最后比較兩個結果集
使用另一大部分Java并發實用程序進行線程池,讓我們定義一個具有固定線程數量的線程池(2)。
final ExecutorService executor = Executors.newFixedThreadPool( 2 ); final SynchronousQueue< List< ? > > resultSets = new SynchronousQueue< List< ? > >();按照描述的算法,生產者功能可以表示為單個可調用項:
Callable< Void > producer = new Callable< Void >() {@Overridepublic Void call() throws Exception {from.query( 'SELECT * FROM ' + table,new RowCallbackHandler() {@Overridepublic void processRow(ResultSet rs) throws SQLException {try { List< ? > row = ...; // convert ResultSet to Listif( !resultSets.offer( row, 2, TimeUnit.MINUTES ) ) {throw new SQLException( 'Having more data but consumer has already completed' );}} catch( InterruptedException ex ) {throw new SQLException( 'Having more data but producer has been interrupted' );}}});return null;} };由于Java語法,該代碼有點冗長,但實際上并沒有做很多事情。 從表生成器讀取的每個結果集都將轉換為一個列表(由于是樣板,因此省略了實現),并將其放入隊列( offer )。 如果隊列不為空,則生產者將被阻止等待消費者完成工作。 使用者可以分別表示為以下可調用對象:
Callable< Void > consumer = new Callable< Void >() {@Overridepublic Void call() throws Exception {to.query( 'SELECT * FROM ' + table,new RowCallbackHandler() {@Overridepublic void processRow(ResultSet rs) throws SQLException {try {List< ? > source = resultSets.poll( 2, TimeUnit.MINUTES );if( source == null ) {throw new SQLException( 'Having more data but producer has already completed' );} List< ? > destination = ...; // convert ResultSet to Listif( !source.equals( destination ) ) {throw new SQLException( 'Row data is not the same' );}} catch ( InterruptedException ex ) {throw new SQLException( 'Having more data but consumer has been interrupted' );}}});return null;} };使用者對隊列執行反向操作:與其放入數據,不如將數據從隊列中拉出( poll )。 如果隊列為空,則阻止消費者,等待生產者發布下一行。 剩下的部分只是提交那些可調用對象以執行。 Future的get方法返回的任何異常都表明表不包含相同的數據(或者從數據庫獲取數據存在問題):
List< Future< Void > > futures = executor.invokeAll( Arrays.asList( producer, consumer ) );for( final Future< Void > future: futures ) {future.get( 5, TimeUnit.MINUTES );}參考: Andriy Redko {devmind}博客中的JCG合作伙伴 Andrey Redko 使用SynchronousQueue實現了生產者/消費者 。
翻譯自: https://www.javacodegeeks.com/2013/01/implementing-producerconsumer-using-synchronousqueue.html
總結
以上是生活随笔為你收集整理的使用SynchronousQueue实现生产者/消费者的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 投影仪该怎么连接电脑摄像机如何连接电脑
- 下一篇: wr886n路由器怎么设置wr886n路