java 超时集合_确定性监视器脉冲/等待并在 生产环境 者 - 消费者集合中实现超时...
我正在嘗試實(shí)現(xiàn)支持消費(fèi)者超時(shí)的并發(fā) 生產(chǎn)環(huán)境 者 - 消費(fèi)者集合(多個(gè) 生產(chǎn)環(huán)境 者和消費(fèi)者) .
現(xiàn)在實(shí)際的集合非常復(fù)雜(不幸的是,在System.Collections.Concurrent中沒(méi)有任何工作),但我在這里有一個(gè)最小的示例來(lái)演示我的問(wèn)題(看起來(lái)有點(diǎn)像 BlockingCollection ) .
public sealed class ProducerConsumerQueueDraft
{
private readonly Queue queue = new Queue();
private readonly object locker = new object();
public void Enqueue(T item)
{
lock (locker)
{
queue.Enqueue(item);
/* This "optimization" is broken, as Nicholas Butler points out.
if(queue.Count == 1) // Optimization
*/
Monitor.Pulse(locker); // Notify any waiting consumer threads.
}
}
public T Dequeue(T item)
{
lock (locker)
{
// Surprisingly, this needs to be a *while* and not an *if*
// which is the core of my problem.
while (queue.Count == 0)
Monitor.Wait(locker);
return queue.Dequeue();
}
}
// This isn't thread-safe, but is how I want TryDequeue to look.
public bool TryDequeueDesired(out T item, TimeSpan timeout)
{
lock (locker)
{
if (queue.Count == 0 && !Monitor.Wait(locker, timeout))
{
item = default(T);
return false;
}
// This is wrong! The queue may be empty even though we were pulsed!
item = queue.Dequeue();
return true;
}
}
// Has nasty timing-gymnastics I want to avoid.
public bool TryDequeueThatWorks(out T item, TimeSpan timeout)
{
lock (locker)
{
var watch = Stopwatch.StartNew();
while (queue.Count == 0)
{
var remaining = timeout - watch.Elapsed;
if (!Monitor.Wait(locker, remaining < TimeSpan.Zero ? TimeSpan.Zero : remaining))
{
item = default(T);
return false;
}
}
item = queue.Dequeue();
return true;
}
}
}
這個(gè)想法很簡(jiǎn)單:找到空隊(duì)列的消費(fèi)者等待發(fā)信號(hào)通知,并且生成者(注意:不是 PulseAll ,這將是低效的)他們通知他們等待的項(xiàng)目 .
當(dāng)調(diào)用Pulse的線(xiàn)程釋放鎖定時(shí),就緒隊(duì)列中的下一個(gè)線(xiàn)程(不一定是脈沖線(xiàn)程)獲取鎖定 .
這意味著消費(fèi)者線(xiàn)程C1可以被 生產(chǎn)環(huán)境 者線(xiàn)程喚醒以消耗一個(gè)項(xiàng)目,但是另一個(gè)消費(fèi)者線(xiàn)程C2可以在C1有機(jī)會(huì)重新獲取它之前獲取鎖定并消耗該項(xiàng)目,留下C1與給予控制時(shí)的空隊(duì)列 .
這意味著如果隊(duì)列確實(shí)非空,我必須在每個(gè)脈沖上防御性地檢查消費(fèi)者代碼,如果不是這種情況,則返回并空手等待 .
我的主要問(wèn)題是它效率低下 - 線(xiàn)程可能會(huì)被喚醒以進(jìn)行工作,然后立即返回等待 . 這樣做的一個(gè)相關(guān)結(jié)果是,當(dāng)它應(yīng)該是優(yōu)雅的時(shí)候?qū)崿F(xiàn)具有超時(shí)的 TryDequeue 是不必要的困難和低效的(參見(jiàn) TryDequeueThatWorks )(參見(jiàn) TryDequeueDesired ) .
How can I twist Monitor.Pulse to do what I want? Alternatively, is there another synchronization primitive that does? Is there a more efficient and/or elegant way to implement a TryDequeue timeout than what I have done?
僅供參考,這是一個(gè)測(cè)試,用于演示我所需解決方案的問(wèn)題:
var queue = new ProducerConsumerQueueDraft();
for (int consumer = 0; consumer < 3; consumer++)
new Thread(() =>
{
while (true)
{
int item;
// This call should occasionally throw an exception.
// Switching to queue.TryDequeueThatWorks should make
// the problem go away.
if (queue.TryDequeueDesired(out item, TimeSpan.FromSeconds(1)))
{
// Do nothing.
}
}
}).Start();
Thread.Sleep(1000); // Let consumers get up and running
for (int itemIndex = 0; itemIndex < 50000000; itemIndex++)
{
queue.Enqueue(0);
}
總結(jié)
以上是生活随笔為你收集整理的java 超时集合_确定性监视器脉冲/等待并在 生产环境 者 - 消费者集合中实现超时...的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: java算术运算符_JAVA-基础-算术
- 下一篇: kruskal算法java_克鲁斯卡尔算