-
Notifications
You must be signed in to change notification settings - Fork 1
ConcurrentLinkedBlockingQueue
laforge49 edited this page Nov 9, 2011
·
2 revisions
The ConcurrentLinkedBlockingQueue class supports only a single reader, but has the advantage of not using locks unless the queue is empty. It is built using ConcurrentLinkedQueue, an AtomicBoolean and a Semaphore.
/**
 * A ConcurrentLinkedQueue with a blocking `take` method that does not block
 * (and does not touch the semaphore) when the queue isn't empty.
 *
 * Any number of threads may call `put`/`offer`, but this code only supports a
 * single reader thread calling `take`.
 *
 * @tparam E the element type; elements must not be null
 */
class ConcurrentLinkedBlockingQueue[E]
  extends ConcurrentLinkedQueue[E] {
  // When true, the reader has announced that it is about to wait for a permit.
  private val waiting = new atomic.AtomicBoolean
  // Used to wake up a pending take; starts with no permits.
  private val wakeup = new Semaphore(0)

  /**
   * Inserts the element at the tail of the queue.
   *
   * @param e the element to add
   */
  def put(e: E): Unit = {
    offer(e)
    () // the Boolean result of offer is always true, so it is deliberately dropped
  }

  /**
   * Inserts the element at the tail of the queue and wakes up a pending take.
   * As the queue is unbounded, this method will never return `false`.
   *
   * @param e the element to add
   * @return always `true`
   */
  override def offer(e: E): Boolean = {
    // Enqueue first, so the element is visible before the reader is woken.
    super.offer(e)
    // Only the producer that flips the flag releases a permit, so at most one
    // permit is issued each time the reader announces that it is waiting.
    if (waiting.compareAndSet(true, false))
      wakeup.release()
    true
  }

  /**
   * Returns the element at the head of the queue, waiting until one is available.
   * This method is similar to poll, except that it does not return null.
   *
   * Blocks in `Semaphore.acquire` while the queue is empty, so an interrupt of
   * the reader thread surfaces as an `InterruptedException`.
   *
   * @return the element removed from the head of the queue
   */
  @tailrec final def take(): E = {
    // Fast path: no flag or semaphore traffic when an element is already there.
    val first = poll()
    if (first != null) first
    else {
      // The queue may now be empty, so announce that we are about to wait...
      waiting.set(true)
      // ...then look again: an element offered between the first poll and the
      // flag being set would otherwise go unnoticed.
      val second = poll()
      if (second != null) {
        // The queue was not empty after all. If the flag is no longer set, a
        // producer has claimed the wake-up, so discard any permit it released.
        // A permit that arrives after this point only costs one extra retry.
        if (!waiting.compareAndSet(true, false))
          wakeup.drainPermits()
        second
      } else {
        wakeup.acquire() // wait for a producer to signal a new element
        take() // re-check the queue
      }
    }
  }
}