Skip to content

MessengerThreadManager

laforge49 edited this page Nov 9, 2011 · 2 revisions

The MessengerThreadManager class is a simple thread pool built from a Semaphore and a ConcurrentLinkedQueue.

/**
 * The MessengerThreadManager starts a number of threads (12 by default)
 * for processing Runnable tasks.
 * By default, the MessengerThreadFactory is used to create threads,
 * though it is easily replaced by any class which implements java.util.ThreadFactory.
 */
class MessengerThreadManager(threadCount: Int = 12,
                             threadFactory: ThreadFactory = new MessengerThreadFactory)
  extends ThreadManager with Runnable {

  val taskRequest = new Semaphore(0)
  val tasks = new ConcurrentLinkedQueue[Runnable]
  var closing = false

  init

  /**
   * The init method is called in the constructor and is used to start threadCount threads.
   */
  private def init {
    var c = 0
    while (c < threadCount) {
      c += 1
      threadFactory.newThread(this).start
    }
  }

  /**
   * The run method is used by all the threads.
   * This method wakes up a thread when there is a task to be processed
   * and stops idle threads after the close method has been called.
   */
  @tailrec final override def run {
    taskRequest.acquire
    if (closing) return
    val task = tasks.poll
    try {
      task.run
    } catch {
      case t: Throwable => t.printStackTrace()
    }
    run
  }

  /**
   * The process method is used to request the processing of a Runnable task.
   * This method adds the task to a concurrent queue of tasks to be processed
   * and then wakes up a task.
   */
  override def process(task: Runnable) {
    tasks.add(task)
    taskRequest.release
      }

  /**
   * The close method is used to stop all the threads as they become idle.
   * This method sets a flag to indicate that the threads should stop
   * and then wakes up all the threads.
   */
  def close {
    closing = true
    var c = 0
    while (c < threadCount) {
      c += 1
      taskRequest.release
    }
  }
}

MessengerThreadManager

Asynchronous Messaging

Clone this wiki locally