Think "Parallel Skiis"
Skiis are streaming / iterator-like collections that support both "stream fusion" and parallel operations.
Compared to the standard Scala {serial, parallel} collections, Skiis address different needs (and, depending on your point of view, some of their deficiencies):
- All operations from Skiis[T] => Skiis[U] are lazy -- processing happens only on an as-needed basis. This applies to both serial (e.g., map) and parallel operations (e.g., parMap).
- Serial/parallel processing is explicit by name (e.g., map vs. parMap) -- no guessing how each operation processes the data.
- The streaming-oriented framework is memory-safe -- no bad surprises, since none of the operations will suddenly materialize a huge data collection in memory or lead to memory retention, as is sometimes the case when handling Streams (i.e., keeping a reference to the head). Skiis have fewer operations than Scala collections, but those offered are amenable to stream and/or parallel processing. You can easily and explicitly convert Skiis[T] to an Iterator[T] if you want to use Scala collections methods -- caveat emptor!
- You can mix serial and parallel processing operations to tailor the level of parallelism at each stage of your processing pipeline, for a SEDA-style, pull-based streaming architecture.
- Pluggable execution context -- you can easily plug in your own Executor (thread pool) and/or use a different Executor for different parallel operations. (Note that this deficiency of Scala's parallel collections only applies to Scala 2.9.x, since 2.10.0 introduced pluggable contexts.)
- Beyond plugging in your own Executor, you can also control 1) parallelism -- the number of workers simultaneously submitted to the Executor, 2) queuing -- the number of work-in-progress elements between parallel operations, to balance memory usage against parallelism, and 3) batch size -- to balance efficiency/contention against sharing your executor(s)/thread pool(s) with other tasks (whether they use Skiis or not).
- Skiis[T] exposes a Control trait that supports cancellation.
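The laziness and cancellation points above can be illustrated with plain Scala Iterators, which are also pull-based and fuse map/filter into a single pass. This is a minimal sketch under stated assumptions, not Skiis code: the Control trait shown here is a hypothetical stand-in whose members may differ from the real one, and CancellableIterator and LazyPipelineDemo are illustrative names.

```scala
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}

// Hypothetical stand-in for the Control trait mentioned above; the real
// Skiis trait may expose different members.
trait Control {
  def cancel(): Unit
  def isCancelled: Boolean
}

// An Iterator that stops yielding elements once cancel() has been called.
final class CancellableIterator[T](underlying: Iterator[T])
    extends Iterator[T] with Control {
  private val cancelled = new AtomicBoolean(false)
  def cancel(): Unit = cancelled.set(true)
  def isCancelled: Boolean = cancelled.get
  def hasNext: Boolean = !cancelled.get && underlying.hasNext
  def next(): T =
    if (hasNext) underlying.next()
    else throw new NoSuchElementException("cancelled or exhausted")
}

object LazyPipelineDemo {
  /** Pulls the first `n` results from an infinite, lazily evaluated pipeline.
    * Returns (results, number of source elements evaluated,
    * whether the pipeline still has elements after cancellation).
    */
  def firstN(n: Int): (List[Int], Int, Boolean) = {
    val evaluated = new AtomicInteger(0)
    val source = new CancellableIterator(Iterator.from(1))
    // Iterator.map/filter are lazy and fused: nothing runs until an element
    // is pulled, and each element goes through both stages in one pass.
    val pipeline = source
      .map { x => evaluated.incrementAndGet(); x * 2 }
      .filter(_ % 3 == 0)
    val results = List.fill(n)(pipeline.next())
    source.cancel() // later pulls on the pipeline see hasNext == false
    (results, evaluated.get, pipeline.hasNext)
  }
}
```

For firstN(3), only the source elements 1 through 9 are ever evaluated, even though the source is infinite; after cancel(), the whole pipeline reports that it is exhausted.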
See CHANGELOG for evolution details.
Skiis' performance is generally comparable to Scala Parallel Collections -- sometimes better, sometimes worse. It depends on your workload (types of operations), the thread pool you use, the allowable queue depth, worker batch size, and so on.
I have not yet tested Skiis on machines with > 16 CPU cores.
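The thread pool, queue depth and batch size mentioned above interact roughly as follows in a pull-based parallel stage. The sketch below is a minimal illustration over a plain Iterator, not the Skiis implementation; ParMapSketch, parMap and the parameter names parallelism, queueSize and batch are hypothetical, chosen to echo the settings described in this README.

```scala
import java.util.concurrent.{ExecutorService, LinkedBlockingQueue}

object ParMapSketch {
  // Messages passed from workers to the consuming iterator.
  private sealed trait Msg[+U]
  private final case class Item[+U](value: U) extends Msg[U]
  private case object WorkerDone extends Msg[Nothing]

  /** Hypothetical pull-based parallel map (not the Skiis API).
    *  - parallelism: number of workers submitted to `executor`
    *  - queueSize:   bound on finished-but-unconsumed results (back-pressure)
    *  - batch:       elements a worker pulls from `source` per lock acquisition
    * Output order is not preserved.
    */
  def parMap[T, U](source: Iterator[T], executor: ExecutorService,
                   parallelism: Int, queueSize: Int, batch: Int)(f: T => U): Iterator[U] = {
    require(parallelism > 0 && queueSize > 0 && batch > 0)
    val results = new LinkedBlockingQueue[Msg[U]](queueSize)

    // Iterators are not thread-safe, so workers pull batches under a lock.
    def nextBatch(): Vector[T] = source.synchronized {
      val b = Vector.newBuilder[T]
      var n = 0
      while (n < batch && source.hasNext) { b += source.next(); n += 1 }
      b.result()
    }

    val worker: Runnable = () =>
      try {
        var chunk = nextBatch()
        while (chunk.nonEmpty) {
          chunk.foreach(x => results.put(Item(f(x)))) // blocks while the queue is full
          chunk = nextBatch()
        }
      } finally results.put(WorkerDone) // always signal, even if f throws

    (1 to parallelism).foreach(_ => executor.execute(worker))

    new Iterator[U] {
      private var running = parallelism
      private var buffered: Option[U] = None
      def hasNext: Boolean = {
        while (buffered.isEmpty && running > 0) {
          results.take() match {
            case Item(v)    => buffered = Some(v)
            case WorkerDone => running -= 1
          }
        }
        buffered.isDefined
      }
      def next(): U = {
        if (!hasNext) throw new NoSuchElementException("parMap exhausted")
        val v = buffered.get
        buffered = None
        v
      }
    }
  }
}
```

Passing an already mapped/filtered Iterator as source, and consuming the result serially, loosely resembles mixing serial and parallel stages. Unlike a production implementation, this sketch neither propagates exceptions thrown by f nor stops its workers if the consumer abandons the iterator (they stay blocked on the full queue).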
Launch your Scala REPL,
# launch Scala REPL
buildr shell
and you can then interactively try the Skiis[T] collections,
Welcome to Scala version 2.9.1.final (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_04).
Type in expressions to have them evaluated.
Type :help for more information.
scala> import skiis.Skiis
// Stream fusion
scala> Skiis(1 to 10) map (_ * 2) filter (_ % 2 == 0)
res1: skiis.Skiis[Int] = skiis.Skiis$$anon$11@456e5841
scala> res1.toIterator.toList
res4: List[Int] = List(2, 4, 6, 8, 10, 12, 14, 16, 18, 20)
// Parallel operations
// (note unordered nature of the output)
scala> implicit val context = Skiis.DefaultContext
context: skiis.Skiis.DefaultContext.type = skiis.Skiis$DefaultContext$@49b7bb1f
scala> Skiis(1 to 10) parMap (_ * 2) parFilter (_ % 2 == 0)
res1: skiis.Skiis[Int] = skiis.Skiis$$anon$3@6a39f22c
scala> res1.toIterator.toList
res2: List[Int] = List(6, 12, 16, 18, 20, 4, 14, 2, 8, 10)
// You can mix & match non-parallel and parallel operations
// Here the last parallel operation (reduce) "pulls" elements from
// previous operations in parallel.
scala> Skiis(1 to 100000) map (_ * 2) filter (_ % 2 == 0) parReduce (_ + _)
res1: Int = 1410165408
(Elapsed 62ms)
// compared to Scala parallel collection (we're in the same ballpark)
scala> (1 to 100000).par map (_ * 2) filter (_ % 2 == 0) reduce (_ + _)
res1: Int = 1410165408
(Elapsed 54ms)
// compared to Scala "serial" collections
// (all parallel collections have some overhead)
scala> (1 to 100000) map (_ * 2) filter (_ % 2 == 0) reduce (_ + _)
res1: Int = 1410165408
(Elapsed 27ms)
// configure a different context
scala> import java.util.concurrent.Executors
scala> implicit val context = new Skiis.Context {
val executor = Executors.newFixedThreadPool(5)
val parallelism = 10
val queue = 10000
val batch = 100
}
// timing function (handy for interactive testing)
scala> def time[T](f: => T) = {
val start = System.currentTimeMillis
val result = f
val stop = System.currentTimeMillis
println("Elapsed: " + (stop-start) + "ms")
result
}
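System.currentTimeMillis follows the wall clock, which can be adjusted while a measurement is running, whereas System.nanoTime is intended for measuring elapsed time. A variant of the time helper above based on nanoTime, which also returns the elapsed time as a value, could look like this (Timing, timed and time are illustrative names, not part of Skiis):

```scala
import java.util.concurrent.TimeUnit

object Timing {
  /** Evaluates `f` once; returns its result and the elapsed time in milliseconds. */
  def timed[T](f: => T): (T, Long) = {
    val start = System.nanoTime()
    val result = f
    val elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)
    (result, elapsedMs)
  }

  /** Same contract as the REPL helper: prints the elapsed time, returns the result. */
  def time[T](f: => T): T = {
    val (result, ms) = timed(f)
    println("Elapsed: " + ms + "ms")
    result
  }
}
```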
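- Skiis do not implement equals() or hashCode() (this may be supported in the future). To compare Skiis, first convert them to Iterators (e.g., with toIterator) and compare those with sameElements, or materialize them with toList. Keep in mind that the output of parallel operations is unordered.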
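As a sketch of comparing converted Skiis, using plain Scala Iterators as stand-ins (CompareSketch and its methods are hypothetical helpers, not part of Skiis): sameElements checks order-sensitive equality in a single pass, while counting occurrences tolerates the unordered output of parallel operations at the cost of holding the counts in memory.

```scala
object CompareSketch {
  /** Order-sensitive, element-wise comparison in a single pass. */
  def sameContents[T](a: Iterator[T], b: Iterator[T]): Boolean =
    a.sameElements(b)

  /** Order-insensitive comparison (multiset equality), e.g. for parMap output. */
  def sameMultiset[T](a: Iterator[T], b: Iterator[T]): Boolean = {
    // Counts occurrences of each element; this materializes the counts.
    def counts(it: Iterator[T]): Map[T, Int] =
      it.foldLeft(Map.empty[T, Int])((m, x) => m.updated(x, m.getOrElse(x, 0) + 1))
    counts(a) == counts(b)
  }
}
```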
You need Apache Buildr 1.4.x or higher.
# compile, test and package .jars
buildr package
- Scala 2.8.0+
- JVM 1.5+
Skiis is licensed under the terms of the Apache Software License v2.0. http://www.apache.org/licenses/LICENSE-2.0.html