There have been some nice additions to the Scala collections with the *parallel* versions of the standard data structures. These work fine for shared-memory architectures; however, support for distributed-memory parallelization is still scarce and fragmented.

During a course at my university, I worked on the development of a general framework for distributed parallelization using Scala and MPJ-Express. The idea was to use functional concepts on sequences to mimic/generalize features already found in MPI. Since *MPJ-Express* is more or less a wrapper around MPI, there were plenty of interesting ideas to try out.

Here is an example of a parallel matrix-multiplication algorithm using the framework:

```scala
def testMatrix(args: Array[String]) {
  /*
   * Initialize matrices
   */
  val M = 2
  val N = 2
  val BLOCK_SIZE = 2550
  val A = Array.fill(M, M)(MJBLProxy(1337, BLOCK_SIZE))
  val B = Array.fill(M, M)(MJBLProxy(1337, BLOCK_SIZE))
  val Bt = B.transpose

  /*
   * DNS using underlying MPI
   */
  parallelize(args) {
    for (i <- 0 until M; j <- 0 until N)
      A(i) zip Bt(j) mapMPJ { case (a, b) => a * b } reduceMPJ (_ + _)
  }
}
```

The algorithm simply creates two matrices, A and B, and fills them with random blocks. Because proxy matrices are used, no data is initialized until the actual multiplication; this ensures that only the needed blocks are expanded on their respective nodes. For the block multiplications themselves, jblas is used to get close to peak floating-point performance.
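The lazy-initialization idea behind the proxy blocks can be illustrated with a plain Scala sketch. Note that `LazyBlock` is a hypothetical stand-in for illustration, not the framework's actual `MJBLProxy` implementation:

```scala
// Hypothetical sketch of a lazy proxy block: the underlying data is
// only materialized on first access, so a node that never touches a
// block never pays for its allocation or initialization.
class LazyBlock(seed: Int, size: Int) {
  // `lazy val` defers allocation until `data` is first read
  lazy val data: Array[Double] = {
    val rng = new scala.util.Random(seed)
    Array.fill(size * size)(rng.nextDouble())
  }
}

val block = new LazyBlock(1337, 4) // nothing allocated yet
val firstValue = block.data(0)     // allocation happens here
```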

Clearly, the biggest achievements in the framework are the ability to send objects transparently through messages and the ability to use anonymous functions for network operations like *reduceMPJ*.

```scala
parallelize(args) {
  for (i <- 0 until M; j <- 0 until N)
    A(i) zip Bt(j) mapMPJ { case (a, b) => a * b } reduceMPJ (_ + _)
}
```

The *parallelize* call initializes MPI through MPJ and enables the distributed operations *mapMPJ* and *reduceMPJ*. Behind the scenes, an implicit conversion turns an *Array[T]* into a *SeqMPJ[T]*, the framework's wrapper class.
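A minimal single-process sketch shows the shape of such a wrapper and its implicit conversion. This is an assumption about the API, not the framework's actual code: the real *mapMPJ* and *reduceMPJ* would scatter work and reduce across MPI ranks, while here they simply run locally:

```scala
import scala.language.implicitConversions

// Hypothetical single-process sketch of the SeqMPJ wrapper idea.
class SeqMPJ[T](underlying: Seq[T]) {
  // In the framework this would be a distributed map over MPI ranks
  def mapMPJ[U](f: T => U): SeqMPJ[U] = new SeqMPJ(underlying.map(f))
  // In the framework this would be a collective reduction
  def reduceMPJ(f: (T, T) => T): T = underlying.reduce(f)
}

// Implicit conversion so plain arrays pick up the MPJ-style operations
implicit def arrayToSeqMPJ[T](a: Array[T]): SeqMPJ[T] = new SeqMPJ(a.toSeq)

// With the conversion in scope, arrays can be used directly:
val sum = Array(1, 2, 3) mapMPJ (_ * 2) reduceMPJ (_ + _) // 12
```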

Another *huge* advantage of the framework is that everything is type-inferred. Since the computation follows a *single program, multiple data* scheme, the receiving side of any send or collective communication operation already knows the type it will receive. The programmer is thus free to think about solving the problem rather than about the mechanics of concurrency.

Another interesting example is finding the maximum number in a segmented list:

```scala
def testMax(args: Array[String]) {
  val seq = Array(
    Array(1, 2, 2, 2, 3, 3, 1, 12, 3, 100, 3, 2),
    Array(1, 2, 3, 2, 1, 2, 3),
    Array(1, 2, 3, 4, 2, 2, 1, 2, 3, 3, 450, 540, 3),
    Array(1, 2, 3, 1337, 1))

  def max(a: Int, b: Int) = if (a >= b) a else b

  parallelize(args) {
    seq mapMPJ (_.max) reduceMPJ max
  }
}
```

Notice that the *mapMPJ* operation works in parallel on each element of the sequence, and that the max function passed to *reduceMPJ* is user-defined; it could just as well have been an anonymous function.
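On a single process, the same computation collapses to an ordinary map/reduce over the segments, which makes the expected result easy to check:

```scala
val seq = Array(
  Array(1, 2, 2, 2, 3, 3, 1, 12, 3, 100, 3, 2),
  Array(1, 2, 3, 2, 1, 2, 3),
  Array(1, 2, 3, 4, 2, 2, 1, 2, 3, 3, 450, 540, 3),
  Array(1, 2, 3, 1337, 1))

def max(a: Int, b: Int) = if (a >= b) a else b

// Sequential equivalent of `seq mapMPJ (_.max) reduceMPJ max`:
// first each segment's local maximum, then one global reduction.
val localMaxima = seq.map(_.max)        // Array(100, 3, 540, 1337)
val globalMax = localMaxima.reduce(max) // 1337
```

In the distributed version, each rank computes the local maximum of its segments, and the final `reduceMPJ` combines them into one global result.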

There are lots of interesting goals to pursue in the future, especially handling non-uniform workloads.