Hi, I am Saša Jurić, a software developer with 10+ years of professional experience programming web and desktop applications using Elixir, Erlang, Ruby, JavaScript, C# and C++. I'm also the author of the upcoming Elixir in Action book. In this blog you can read about Erlang and other programming related topics. You can subscribe to the feed, follow me on Twitter or fork me on GitHub.

Parallelizing independent tasks

In the previous three articles, I introduced actors: "smart" Erlang processes which encapsulate state, interact with each other, and can be used to execute interdependent computations in parallel, thus utilizing available CPU resources. Today, I will show a couple of simple techniques to break down a monolithic request into a number of smaller independent tasks and execute them in parallel.

Simple parallelization

As usual, I'll use a simplistic simulation of a long running operation:

def square(number) do
  :timer.sleep(100)
  number * number
end

If we use this function to compute the squares of ten numbers and print them to the screen, it will take about a second. However, since the computations are independent (calculating the square of one number does not depend on calculating the square of another), we can execute them in parallel. This is very easy, especially if we don't need the return values, as is the case in this example:

def parallel_squares(numbers) do
  Enum.each(numbers, fn(x) ->
    spawn(fn() ->
      IO.puts "#{x} * #{x} =  #{square(x)}"
    end)
  end)
end

This function iterates over numbers, and for each one spawns a separate process which calculates the square and prints it to the screen. The spawn function takes a lambda as an argument and executes it in a new process. Notice that the lambda captures its bindings, so each process will reference its own value of x from the corresponding iteration step.

We can now call this function passing it anything enumerable, such as an Elixir range:

parallel_squares(1..10)

This computes and prints the squares of ten numbers. Owing to parallelization and the nature of the Erlang scheduler, it takes about 100 ms to print all results, even when running on a single core machine.
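To check that timing claim, we can spawn the processes, have each send its result back, and measure the whole run with Erlang's :timer.tc. The module and message tags below are my own, and the code uses current Elixir syntax (send/2 instead of the old <- operator):

```elixir
defmodule TimingDemo do
  def square(number) do
    :timer.sleep(100)
    number * number
  end

  # Spawns one process per number, then waits for all results,
  # so the measured time covers the complete computation.
  def timed_parallel_squares(numbers) do
    caller = self()

    Enum.each(numbers, fn x ->
      spawn(fn -> send(caller, {:square, square(x)}) end)
    end)

    Enum.map(numbers, fn _ ->
      receive do
        {:square, result} -> result
      end
    end)
  end
end

{micros, results} = :timer.tc(fn -> TimingDemo.timed_parallel_squares(1..10) end)
# All ten 100 ms computations overlap, so micros is on the order of
# 100_000 (one sleep period), not 1_000_000 (ten sleep periods).
```

Note that the results arrive in whatever order the processes finish, which is fine here since squaring is order-independent.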

Limiting the number of processes

The approach above works fine, but it uses an unlimited number of parallel processes to perform the square computation. If the input list has a million entries, a million processes will be spawned. Erlang processes are very cheap compared to traditional heavyweight threads, so we usually don't have to concern ourselves with the memory or CPU cost of creating them. However, the code we run inside a process might be another story. It could access a resource with more limited parallelization capabilities, such as a database, the network, or a disk: anything which might have problems when accessed from an unlimited number of parallel requests. In such cases we might want to use a fixed number of parallel processes, essentially a pool, to limit the load on the critical resource.

I'll call the processes which use the limited resource the worker processes. The basic idea is to use a pool manager: an actor which manages access to the pool of worker processes. A client can request a worker process from the pool manager and, upon receiving it, execute some code inside it. Finally, when the work is done, the client returns the worker process to the pool manager, which passes it (or discards it and creates a new one in its place) to another client.

It is fairly straightforward to implement this, but in this article I'll use an existing Erlang library, called poolboy, which behaves as described above.
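To illustrate how straightforward the idea is, here is my own toy version of such a pool manager, written in current Elixir syntax. This is not poolboy and leaves out everything a real pool needs (supervision, overflow workers, crash recovery):

```elixir
defmodule SimplePool do
  # Starts `size` idle workers and a manager process that tracks
  # which workers are free and which clients are waiting.
  def start(worker_fun, size) do
    workers = Enum.map(1..size, fn _ -> spawn(worker_fun) end)
    spawn(fn -> loop(workers, []) end)
  end

  # Blocks until a worker is available, runs `fun` with its pid,
  # and always checks the worker back in afterwards.
  def transaction(manager, fun) do
    send(manager, {:checkout, self()})

    receive do
      {:worker, pid} ->
        try do
          fun.(pid)
        after
          send(manager, {:checkin, pid})
        end
    end
  end

  defp loop(free, waiting) do
    receive do
      {:checkout, client} when free != [] ->
        [worker | rest] = free
        send(client, {:worker, worker})
        loop(rest, waiting)

      {:checkout, client} ->
        # No free worker: queue the client until one is checked in.
        loop(free, waiting ++ [client])

      {:checkin, worker} ->
        case waiting do
          [client | rest] ->
            # Hand the returned worker straight to a waiting client.
            send(client, {:worker, worker})
            loop(free, rest)

          [] ->
            loop([worker | free], waiting)
        end
    end
  end
end
```

A client calls SimplePool.transaction(pool, fn worker -> ... end), which is the same checkout/checkin discipline poolboy provides in a production-grade form.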

Using a 3rd party Erlang library inside an Elixir project is very easy with the help of Elixir's build tool, mix, which can handle both Elixir and Erlang dependencies. Once a dependency is referenced in our mix-based project, we call mix deps.get from the command line, which fetches the library from GitHub and compiles it. Then we can freely call the Erlang code using the syntax :module.function(args).
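For example, a mix.exs from that Elixir era referenced poolboy roughly like this (the project name and version here are illustrative, not the article's actual project file):

```elixir
defmodule PoolDemo.Mixfile do
  use Mix.Project

  def project do
    [app: :pool_demo,
     version: "0.0.1",
     deps: deps]
  end

  # An Erlang dependency referenced straight from GitHub;
  # `mix deps.get` fetches and compiles it.
  defp deps do
    [{:poolboy, github: "devinus/poolboy"}]
  end
end
```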

We'll start off by defining the worker actor which computes squares. This is the code we want to run inside worker processes:

defmodule SquareActor do
  use ExActor

  defcall square(x) do
    :timer.sleep(100)
    IO.puts "#{x} * #{x} = #{x * x}"
  end
end

Notice that square is defined as a synchronous call, which is important due to the way poolboy works (I'll explain this in a moment).

This is how we create the pool:

def start_pool do
  {:ok, pool} = :poolboy.start(
    worker_module: SquareActor, size: 0, max_overflow: 5
  )
  
  pool
end

The arguments tell poolboy to create worker processes using the SquareActor module. The pool will contain size fixed (recyclable) processes, and up to max_overflow dynamic (per request) processes. Since the square operation is stateless, only dynamic processes are used here. In other words, the pool will hand out at most five processes simultaneously. All other clients will have to wait for a worker to become available.

To calculate a square, we need to request a worker from the pool, and perform the computation in it:

def pooled_square(pool, x) do
  :poolboy.transaction(pool, fn(pid) ->
    SquareActor.square(pid, x)
  end)
end

The transaction function requests a worker, then calls the lambda with the worker pid as an argument. After the lambda finishes, the worker is returned to the pool.

Inside the transaction we simply call the square operation of the SquareActor. Since transaction returns the worker to the pool after the call is done, we needed to implement square as a synchronous call. If it were an async cast, the client would send a message to the worker process and return immediately, handing the still-busy worker back to the pool before the message was processed. Poolboy would then terminate the worker process (since it is dynamic) before it finished its work.

Consequently, the pooled_square function is synchronous and blocking. To make it work in parallel, we need to call it from spawned processes:

def parallel_squares(pool, numbers) do
  Enum.each(numbers, fn(x) ->
    spawn(fn() -> pooled_square(pool, x) end)
  end)
end

This is similar to our first implementation of parallel squares. The function takes additional pool argument, using it inside the spawned processes to acquire workers. We can now use this function in the following way:

parallel_squares(start_pool, 1..10)

When we execute this code, the function parallel_squares will start ten processes just like the first version. However, each of those processes will use the pool to acquire the worker process which does the actual job.

In total, we will now have sixteen Erlang processes: the pool manager, five workers, and ten clients. However, only five of them will run square calculations simultaneously (or, in real life, do something which requires limits, such as network calls or database queries). The remaining clients will be idle until a worker becomes available.

Collecting responses

The examples so far simply spawn processes and forget about them. Occasionally, we may need to get the responses from the spawned processes back in the caller process. To do this, we must send an Erlang message from each worker to the caller, and make the caller wait for the messages.

To accomplish this, I'm going to create a basic (not production ready) helper which executes functions in parallel and collects their return values. The module will be called ParallelExec and it will expose one function, run, which receives a list of functions and calls them in parallel, making sure that the spawned processes send their results back to the caller process. The function then waits for the corresponding Erlang messages in the caller process and aggregates all responses into a single list.

The top-level run function is pretty much self-explanatory:

def run(funs) do
  start_jobs(funs)
  collect_responses(length(funs))
end

The start_jobs function is responsible for creating processes which execute functions and send their results back to the caller:

defp start_jobs(funs) do
  caller = self
  Enum.each(funs, fn(fun) ->
    spawn(fn() ->
      execute_job(caller, fun)
    end)
  end)
end

defp execute_job(caller, fun) do
  response = fun.()
  caller <- {:response, response}
end

For each input function, a new process is spawned (line 4). The process will execute the function (line 11) and send the response back to the caller (line 12), which is the process that spawned the worker (line 2).

To collect responses, we need to receive n messages containing {:response, response} tuple, where n is the length of the input functions list:

defp collect_responses(expected) do
  Enum.map(1..expected, fn(_) ->
    get_response
  end)
end

defp get_response do
  receive do
    {:response, response} -> response
  end
end

This code essentially makes n calls to the receive statement, which waits for a message from a worker process and extracts the response from it (line 9). The results are combined into a list via the Enum.map function.

With ParallelExec helper in place, we can compute squares of numbers in parallel and get the results in the caller process:

def parallel_squares(numbers) do
  ParallelExec.run(make_jobs(numbers))
end

defp make_jobs(numbers) do
  Enum.map(numbers, fn(x) ->
    fn() -> square(x) end
  end)
end

defp square(x) do
  :timer.sleep(100)
  {x, x * x}
end

The function parallel_squares uses make_jobs to create the list of functions and passes it on to ParallelExec.run, which invokes them in parallel and returns the results.

The function make_jobs takes a list of numbers and for each one creates a lambda (line 7) which computes its square and returns the result in the form of {x, x*x} (line 13).

To prove the responses are received, we can print them to the screen from the caller process:

IO.inspect parallel_squares(1..10)

I use this approach in a production system to build a JSON representation of a complex hierarchy. The top level of the hierarchy is a list of about 30 deeply nested elements. For each of those top-level elements, I spawn a process which calculates the element's JSON, and then join the responses into the final result. This is essentially a very basic version of the map/reduce technique, where the input is divided (mapped) into multiple parts, each part is executed in parallel, and the responses are combined (reduced) into a single response.

Additionally, I also use this technique combined with pooling to issue database queries. The pooling allows me not to overload the database by creating too many concurrent queries.

Exception handling

The code above doesn't deal with errors. If an unhandled exception occurs in the spawned process, the caller will not know anything about it. In addition, if the caller awaits responses, it will hang forever, waiting for a message which will never be sent (since the worker crashed). There are multiple ways to tackle this problem. 

One approach is to put try/catch in workers and report the error back to the caller. 
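A sketch of that first approach, as a variant of the run helper above (current Elixir syntax; the module name and tuple tags are my own convention):

```elixir
defmodule SafeExec do
  def run(funs) do
    caller = self()

    Enum.each(funs, fn fun ->
      spawn(fn ->
        # Catch any exception in the worker and report it as data,
        # so the caller never waits for a message that will not come.
        result =
          try do
            {:response, fun.()}
          rescue
            error -> {:error, error}
          end

        send(caller, result)
      end)
    end)

    Enum.map(funs, fn _ ->
      receive do
        {:response, response} -> {:ok, response}
        {:error, error} -> {:error, error}
      end
    end)
  end
end
```

The caller now always receives exactly one message per job, tagged either as a success or a failure.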

Alternatively, we can link processes by starting them with spawn_link instead of spawn. When two processes are linked and one terminates abnormally, the other is taken down with it, so if an exception occurs in a worker, the caller and all other workers will crash as well.

As a variation, when processes are linked, the caller can be configured (by trapping exits) to receive a message when a linked process crashes, giving us the possibility to do some custom error handling.
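A sketch of this variation (current Elixir syntax; wrapping the result in the exit reason is a common trick, not the article's code): the caller traps exits, so a crashed linked worker turns into an {:EXIT, pid, reason} message instead of taking the caller down.

```elixir
defmodule LinkedExec do
  def run(funs) do
    # Convert terminations of linked processes into {:EXIT, pid, reason}
    # messages instead of crashing this process.
    Process.flag(:trap_exit, true)

    pids =
      Enum.map(funs, fn fun ->
        # On success the worker exits with {:ok, result} as its exit reason;
        # on an unhandled exception the reason describes the crash.
        spawn_link(fn -> exit({:ok, fun.()}) end)
      end)

    Enum.map(pids, fn pid ->
      receive do
        {:EXIT, ^pid, {:ok, result}} -> {:ok, result}
        {:EXIT, ^pid, reason} -> {:error, reason}
      end
    end)
  end
end
```

Selectively receiving on ^pid keeps the results in the same order as the input functions.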

Finally, when waiting for the return message from a worker, a timeout can be set. If the message doesn't arrive within the given time, the timeout code is executed, and we can again invoke some custom error handling logic.
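The timeout is expressed with the after clause of receive; a variant of the get_response helper might look like this (current Elixir syntax; the 500 ms default is arbitrary):

```elixir
defmodule TimeoutDemo do
  # Waits for a worker response, but gives up after `timeout` milliseconds.
  def get_response(timeout \\ 500) do
    receive do
      {:response, response} -> {:ok, response}
    after
      timeout -> {:error, :timeout}
    end
  end
end
```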

Final thoughts

It is fairly simple to break down a complex task into independent parts, run them in parallel and optionally collect the responses. By doing this we can better utilize given resources and obtain better scalability. The technique is applicable both for I/O and CPU bound operations.

Be careful not to overuse this approach. Process communication and map/reducing introduce some overhead, and the code becomes somewhat more complex. Parallelization is not a remedy for a poor sequential algorithm, so first try to speed up your sequential code, and only when that fails, consider a parallel approach.

The full code of the examples can be found here.

2 comments:

  1. Is this any different with Elixir v1?

     Reply: The principles are definitely the same, given that everything is based on the Erlang concurrency model. Skimming through the code in this article, it seems it should work, but I can't make any guarantees. The supporting GH project relies on a very old 0.10.0 version which is pretty obsolete.