Hi, I am Saša Jurić, a software developer with 10+ years of professional experience in programming of web and desktop applications using Elixir, Erlang, Ruby, JavaScript, C# and C++. I'm also the author of the upcoming Elixir in Action book. In this blog you can read about Erlang and other programming related topics. You can subscribe to the feed, follow me on Twitter or fork me on GitHub.

Pipelining in Elixir/Erlang

| 3 comments
In my previous post, I gave an introduction to the actor model in Elixir/Erlang. Drawing from the theory presented there, this post and the following ones will describe a couple of simple concurrency patterns which I have used in a real life project. The techniques are fairly simple, both to reason about and to implement, and yet they make it possible to parallelize your system, utilize available hardware resources and achieve bigger throughput (process more data per time unit). I will begin this miniseries by presenting a very simple, but effective technique: the pipeline.

The problem

One of my first Erlang projects was to implement a job which runs forever and repeatedly performs following steps:
  1. Receives xmls via tcp from the external data provider.
  2. Parses xmls and modifies internal data model accordingly.
  3. Stores necessary data to the SQL database, loads some other data from the database and puts them in the data hierarchy.
  4. Sends the data via network to another service which serves the data to the users.
Each step depends on the result of the previous one, so consequently, the first implementation I made was a naive approach where everything was executed sequentially, passing the output of one operation to the next one.

Needless to say, this didn't perform well on the real life data. The messages were coming faster then the job was able to process them. The obvious approach to address this issue was to measure and identify bottlenecks, and then optimize. This helped to some extent, but it still wasn't enough. Some of the steps involved, such as database processing or network communication were getting hard to optimize and the gain would be small. Each operation was executing in a reasonable amount of time. However, the sum of durations of all operations was too large.

At this time I stepped back, and looked at a problem from another angle. Namely, I asked myself what could be performed in parallel. For example, while waiting for DB operations to finish, I could receive new XML message and then parse, and process it. In the same fashion, after DB operations are done, I could asynchronously send the data via network, and at the same time execute DB operations of the next message (if one has arrived).

So instead of sequentially doing:
receive(n)
process(n)
db_op(n)
send(n)

Where n stands for the nth message, I could do following in parallel:
receive(n)     process(n-1)    db_op(n-2)    send(n-3)

This is called the pipeline pattern: the output of one process is fed as the input to the next one. Since processes run concurrently, we don't have to wait for all operations to finish in order to start processing the next message. This will not speed up the total processing of one message, but it will increase the overall throughput, as I will show in the rest of the article.

The problem (simplified version)

Here is the simplified Elixir simulation of the three "long" operations which must be executed sequentially:

defmodule Workers do
  def f1(i) do 
    :timer.sleep(100)
    i + 1
  end
  
  def f2(i) do 
    :timer.sleep(100)
    i * 2
  end
  
  def f3(i) do
    :timer.sleep(100)
    i - 3
  end
end

The module is simply a collection of functions, and here, three functions are defined, each one simulating a long operation by sleeping for a while. To compute the final result, we need to call those functions sequentially:

a = f1(input) 
b = f2(a)
result = f3(b)

Obviously, in this approach, it takes 300 ms to compute the result, so this code can handle the incoming rate up to 3.3 messages per second (1 / 300 ms). If messages arrive more often, the output delay will constantly increase and the incoming messages will accumulate in the input buffer. For example, when the input rate is 10 messages/sec (one message every 100 ms), we will have following delays:

#  delay(ms)
1    300
2    500
3    700
4    900
5   1100
     ...

The hardcoded solution

To tackle this, we can use a separate process to perform each operation. This is how the first operation would be implemented inside the actor:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
defmodule Pipeline1 do
  use ExActor
  import Workers
  
  def init(_) do
    initial_state(Pipeline2.actor_start)
  end
  
  defcast consume(data), state: actor2 do
    actor2.consume(f1(data))
  end
end

This code defines three actors, each performing one operation

The line 2 makes the module "actor aware", which makes it easier to implement an actor, as explained in my previous post.

In the line 3, the Workers module is imported, which allows us to use functions f1, f2 and f3. Without this directive, we would have to write Workers.f1 and so on.

The init function, which will be invoked when the actor is started, creates the second actor, and stores the "reference" to it in its own state.

Finally, the actor exposes one operation: consume, which, when invoked, will perform the operation f1 and send the result to the second actor.

The other two actors are more or less a copy paste version of the first one, the actor 3 being a bit simpler, since it does not pass the result further, but prints it to the console instead:

defmodule Pipeline2 do
  use ExActor
  import Workers

  def init(_) do
    initial_state(Pipeline3.actor_start)
  end
  
  defcast consume(data), state: actor3 do
    actor3.consume(f2(data))
  end
end

defmodule Pipeline3 do
  use ExActor
  import Workers

  defcast consume(data) do
    IO.puts(f3(data))
  end
end

Now, we can use this pipeline in the following way:

pipeline = Pipeline1.actor_start
Enum.each(1..10, fn(i) -> pipeline.consume(i) end)

We simply start the first actor (which will in turn start the second, which will in turn start the third), and send it messages which then pass through the pipeline.

As I mentioned, the individual processing of each message is the same, if not slower, since we do some additional work (namely, pass the data between actors). However, the throughput of this pipeline is 10 messages per second, three times bigger than the pure sequential solution. More precisely, the throughput of the pipeline is the throughput of its slowest part, while the throughput of the sequential approach equals to the sum of throughputs of all operations.

In addition, due to the nature of the Erlang virtual machine (called BEAM), we don't need multiple cores to obtain higher throughput. The  BEAM can use separate threads for I/O operations, and it can also utilize the kqueue/epoll to perform asynchronous I/O (though not by default, both options must be explicitly configured). This essentially means that, while waiting for an I/O operation to finish, the BEAM can execute other pending Erlang instructions.
Of course, if the processes are CPU bound (i.e. they execute pure Erlang code without doing I/O), then you will need multiple cores to make them run in parallel. The code though doesn't need to be changed: it is ready to utilize more CPU resources if given.

The generic solution

The code above involves a lot of copy/pasting and hardcoding. Both Erlang, and especially Elixir offer various constructs which allow us to write reusable code, and for the purpose of this article, I have hacked a quick sketch of such solution. The code is fairly simple, but requires some knowledge about Elixir/Erlang concepts, so I will not describe it here. However, it is interesting to show how easy it is to create a pipeline:

pipeline = Pipeline.create([
  function(:f1, 1),
  function(:f2, 1),
  function(:f3, 1),
  fn(x) -> IO.puts x end
])

This construct creates a pipeline of four processes and connects them as presented earlier. The pipeline can be used in exactly the same way as the hardcoded version.
Quick syntax explanation: the function(:f1, 1) construct is simply a reference to an existing function f1 (that's the one from the Workers module) while fn(x) -> ... end creates an anonymous function (lambda). Essentially, we send four function values to the Pipeline.create, and it turns them into pipelined processes.

The complete code is bundled together with the examples of the previous article, and can be found here.

Conclusion

Hopefully, this article has demonstrated that it is fairly easy to break a long sequential operation into a chain of parallel tasks. While this didn't speed up the handling of a single message, it has significantly improved the throughput of the system.

Despite the ease of implementation, I wouldn't advise the overuse of this technique. The code is more complicated/complex than the naive approach and some performance penalty is introduced when passing messages from one process to another.

Personally, I would start with the sequential approach, do some benchmarking and then, if/when required and appropriate, move to the pipeline version. The exceptions to this rule would include obviously long running operations, such as the ones presented here: database operations and network I/O which I would immediately implement in separate processes.