02 February 2007

Erlang: parallel-map and parallel-foreach

In a post to erlang-questions last summer, "Erlang on the niagara", Erlang super-hero Joe Armstrong shared a simple "parallel-map" implementation that he uses in place of some of his simple "iterative-foreach" calls to quickly achieve dramatic speed improvements by taking advantage of multiple processors. The post got me thinking, particularly in light of recent discussions on side effects and purity, about the real differences between, and guarantees of, such simple things as map and foreach.

Principally, map (1) is supposed to be called with a function that has no side effects, (2) guarantees that the order of its results matches the input list, and (3) makes no guarantee about the order of the function calls. foreach, by contrast, (1) is used explicitly (and solely) for its side effects, since (2) it has no return value (other than the atom ok for success), and (3) guarantees that the order of function calls matches the order of the input list. Now, Joe's code was a parallel-map implementation (returning a list of results from applying the given function, with no guaranteed order of execution), yet it was actually being used to replace a foreach statement. It happened to work out nicely because his code did not rely on any order-of-execution promises.

But what if it did? And, anyway, it also made me wonder what cost was entailed in gathering the result list of a (potentially) large number of function calls, when in the end the only result needed was the lack of an error and the eventual return of control to signify that the operation was complete.

But let's go back. First, let's look at how the standard Erlang lists module implements these two functions:

1> lists:map(fun(I) -> io:format("~w~n",[I]), I+1 end, lists:seq(1,10)).
1
2
3
4
5
6
7
8
9
10
[2,3,4,5,6,7,8,9,10,11]
2> lists:foreach(fun(I) -> io:format("~w~n",[I]) end, lists:seq(1,10)).
1
2
3
4
5
6
7
8
9
10
ok
Exactly as we expected. It just so happened that map made the function calls in the same order as the input list, but this wasn't guaranteed. What do I mean? A glance at the relevant documentation has the following to say about foreach:
This function is used for its side effects and the evaluation order is defined to be the same as the order of the elements in the list.
Contrast this with what it has to say about map:
This function is used to obtain the return values. The evaluation order is implementation dependent.
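To make "implementation dependent" concrete, here is a minimal sketch of a map that would still satisfy that documentation while evaluating right to left. The module name rmap_sketch and the function rmap/2 are made up for illustration; this is not how the lists module is actually written.

```erlang
-module(rmap_sketch).
-export([rmap/2]).

%% Hypothetical map: calls F on the LAST element first, yet still
%% returns the results in the same order as the input list.
rmap(F, L) ->
    rmap(F, lists:reverse(L), []).

%% Walk the reversed list; consing each result onto the accumulator
%% puts the results back into the original input order.
rmap(F, [H | T], Acc) ->
    rmap(F, T, [F(H) | Acc]);
rmap(_F, [], Acc) ->
    Acc.
```

Calling rmap_sketch:rmap(fun(I) -> io:format("~w~n",[I]), I+1 end, [1,2,3]) prints 3, 2, 1 and returns [2,3,4]: same result list as lists:map, different order of side effects.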
Fair enough. I put together a little first pass at a plists module (for, obviously, "parallel lists"):
-module(plists).
-export([pmap/2,pforeach/2,npforeach/2]).

pmap(F, L) ->
    S = self(),
    Pids = lists:map(fun(I) -> spawn(fun() -> pmap_f(S, F, I) end) end, L),
    pmap_gather(Pids).

pmap_gather([H|T]) ->
    receive
        {H, Ret} -> [Ret|pmap_gather(T)]
    end;
pmap_gather([]) ->
    [].

pmap_f(Parent, F, I) ->
    Parent ! {self(), (catch F(I))}.

pforeach(F, L) ->
  S = self(),
  Pids = pmap(fun(I) -> spawn(fun() -> pforeach_f(S,F,I) end) end, L),
  pforeach_wait(Pids).

pforeach_wait([H|T]) ->
  receive
    H -> pforeach_wait(T)
  end;
pforeach_wait([]) -> ok.

pforeach_f(Parent, F, I) ->
  _ = (catch F(I)),
  Parent ! self().

npforeach(F, L) ->
  S = self(),
  Pid = spawn(fun() -> npforeach_0(S,F,L) end),
  receive Pid -> ok end.

npforeach_0(Parent,F,L) ->
  S = self(),
  Pids = pmap(fun(I) -> spawn(fun() -> npforeach_f(S,F,I) end) end, L),
  npforeach_wait(S,length(Pids)),
  Parent ! S.

npforeach_wait(_S,0) -> ok;
npforeach_wait(S,N) ->
  receive
    S -> npforeach_wait(S,N-1)
  end.

npforeach_f(Parent, F, I) ->
  _ = (catch F(I)),
  Parent ! Parent.
Some things to notice here: (1) my pmap is effectively the same as Joe's, but (2) I've provided a pforeach that works slightly differently, in that it disregards return values instead of gathering them into a list. (Note, however, that like pmap it also disregards the order-of-execution promise and so could not be a drop-in replacement for the iterative foreach.) For now, ignore npforeach; I'll talk about it later.
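One caveat: because every worker wraps its call in catch, pforeach above returns ok even when F blows up. If what you want back really is "the lack of an error", one possible variation is to let the workers crash and watch them with monitors. This is only a sketch under my own assumptions: the module name pforeach_sketch and the {error, Reasons} return shape are invented for illustration, not part of plists.

```erlang
-module(pforeach_sketch).
-export([pforeach/2]).

%% Run F on every element of L, each in its own monitored process,
%% and wait for all of them to exit.
%% Returns ok if every worker exited normally, {error, Reasons} otherwise.
pforeach(F, L) ->
    %% spawn_monitor/1 returns {Pid, MonitorRef}; only the refs are needed.
    Refs = [Ref || {_Pid, Ref} <- [spawn_monitor(fun() -> F(I) end) || I <- L]],
    wait(Refs, []).

%% Collect exactly one 'DOWN' message per monitor, in list order.
wait([Ref | Rest], Errors) ->
    receive
        {'DOWN', Ref, process, _Pid, normal} -> wait(Rest, Errors);
        {'DOWN', Ref, process, _Pid, Reason} -> wait(Rest, [Reason | Errors])
    end;
wait([], []) ->
    ok;
wait([], Errors) ->
    {error, lists:reverse(Errors)}.
```

With this version, pforeach_sketch:pforeach(fun(2) -> exit(boom); (_) -> ok end, [1,2,3]) returns {error,[boom]} instead of silently returning ok.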

Fine, anyway, let's play:
$ erl -smp +S 16 +A 2
Here I start Erlang, enabling the use of multiple processors, setting the internal scheduler to use 16 scheduler threads, and using 2 asynchronous threads.
Erlang (BEAM) emulator version 5.5.2 [source] [smp:16] [async-threads:2] [hipe] [kernel-poll:false]

Eshell V5.5.2  (abort with ^G)
1> c(plists).
{ok,plists}
2> plists:pforeach(fun(I) -> io:format("~w~n", [I]) end, lists:seq(1,10)).
2
3
4
5
6
7
1
8
9
10
ok
3> plists:pforeach(fun(I) -> io:format("~w~n", [I]) end, lists:seq(1,10)).
1
2
3
4
5
7
9
6
10
8
ok
OK! Here we go, we see that, as expected, the order of execution is certainly interleaved -- and not predictably. Let's move on to pmap:
4> plists:pmap(fun(I) -> io:format("~w~n",[I]), I + 1 end, lists:seq(1,20)).
1
3
4
5
6
7
8
10
9
11
12
13
14
15
16
17
18
19
20
2
[2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21]
Here we see that pmap also doesn't have any guaranteed order of execution (which it shouldn't) and that the return values are in proper order (which it should).

Now. Places to improve? Well, for one we could get all complicated-like and have options for a fixed number of worker processes instead of "blindly" spawning a process for each element of a (potentially large!) input list. Another idea is spawning a "spawner" process to actually spawn the worker processes, so that our gathering can begin immediately without waiting for all the worker spawning to end. Yet another: since we do not need any ordered return value from pforeach, we could find a marginally better way of acknowledging process completions as they occur instead of always waiting for each process in order. (Now you can go back and look at npforeach if you like.) But mostly at that point you're just shuffling deck chairs. Actually, even the simple difference between pmap and pforeach is mostly shuffling deck chairs. Certainly it "seems" like pforeach should be faster than pmap, but lacking a Niagara of my own, mostly I end up testing the limits of the limited CPU and memory slice of my shared VPS server.
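For the curious, the first of those ideas might look something like the following. This is a rough sketch, not a tuned implementation: the module name bounded_pmap and the MaxWorkers argument are my own inventions, and it still spawns one short-lived process per element, it just never has more than MaxWorkers of them alive at once.

```erlang
-module(bounded_pmap).
-export([pmap/3]).

%% Like plists:pmap/2, but with at most MaxWorkers workers running
%% at any one time. Results come back in input order.
pmap(F, L, MaxWorkers) when is_integer(MaxWorkers), MaxWorkers > 0 ->
    Ref = make_ref(),                                  % tags our messages
    Indexed = lists:zip(lists:seq(1, length(L)), L),   % [{Index, Elem}]
    Results = run(F, Ref, Indexed, 0, MaxWorkers, []),
    %% Workers finish in any order; sort by index to restore input order.
    [R || {_Idx, R} <- lists:keysort(1, Results)].

%% Nothing left to start and nothing still running: done.
run(_F, _Ref, [], 0, _Max, Acc) ->
    Acc;
%% Work pending and a free slot: start one more worker.
run(F, Ref, [{Idx, X} | Rest], Running, Max, Acc) when Running < Max ->
    Parent = self(),
    spawn(fun() -> Parent ! {Ref, Idx, (catch F(X))} end),
    run(F, Ref, Rest, Running + 1, Max, Acc);
%% All slots busy, or input exhausted: wait for any worker to finish.
run(F, Ref, Pending, Running, Max, Acc) ->
    receive
        {Ref, Idx, R} ->
            run(F, Ref, Pending, Running - 1, Max, [{Idx, R} | Acc])
    end.
```

For example, bounded_pmap:pmap(fun(I) -> I + 1 end, lists:seq(1,5), 2) returns [2,3,4,5,6] while never running more than two workers at a time. Collecting results as they arrive and sorting at the end trades the ordered gather of pmap for a sort; whether that is a win would need measuring.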

But, if anyone does happen to have an interesting amount of hardware lying around:
-module(plists_test).
-export([start/0]).

start() ->
  timer:start(),
  F = fun(I) -> math:pow(I,I) end,
  {T0,L} = timer:tc(lists,seq,[1,1000]),
  io:format("seq took ~w microseconds~n",[T0]),
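  %% Each result gets its own variable: a reused _V1 would be matched, not
  %% rebound, and only works by accident when both calls return ok.
  {T1,_V1} = timer:tc(plists,pmap,[F,L]),
  io:format("pmap took ~w microseconds~n",[T1]),
  {T2,_V2} = timer:tc(plists,pforeach,[F,L]),
  io:format("pforeach took ~w microseconds~n",[T2]),
  {T3,_V3} = timer:tc(plists,npforeach,[F,L]),
  io:format("npforeach took ~w microseconds~n",[T3]),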
  ok.
And give it a twirl:
$ erl -smp +S 16 +A 2 -s plists_test -run init stop -noshell
seq took 109 microseconds
pmap took 90465 microseconds
pforeach took 104702 microseconds
npforeach took 112472 microseconds
As the input list gets larger, that "fixed number of worker processes instead of blindly spawning a process for each element of the list" idea starts to look a whole lot better: what good is spawning 10000 CPU-bound processes when you only have a fraction of that in available CPU cores?

2 comments:

Unknown said...

Maybe I'm a sucker for obfuscation but I like my parallel map better than Joe's:

%% Map function F over list L in parallel.
parmap(F, L) ->
Parent = self(),
[receive {Pid, Result} -> Result end
|| Pid <- [spawn(fun() -> Parent ! {self(), F(X)} end) || X <- L]].

would that the text box understood PRE tags..

Matt Williamson said...

Luke, that is more beautiful code; is there any performance difference?