- Receives XMLs via TCP from the external data provider.
- Parses the XMLs and modifies the internal data model accordingly.
- Stores the necessary data in the SQL database, loads some other data from the database, and places it in the data hierarchy.
- Sends the data over the network to another service, which serves it to the users.
Instead of performing these steps sequentially for each message:

receive(n) process(n) db_op(n) send(n)

where n stands for the nth message, I could do the following in parallel:

receive(n) process(n-1) db_op(n-2) send(n-3)
This is called the pipeline pattern: the output of one process is fed as the input to the next one. Since processes run concurrently, we don't have to wait for all operations to finish in order to start processing the next message. This will not speed up the total processing of one message, but it will increase the overall throughput, as I will show in the rest of the article.
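To make the throughput claim concrete, here is a quick sketch of the arithmetic, under the illustrative assumption that each of the four stages takes 100 ms (the stage count and timing here are assumptions for the example, not measurements from the real system):

```elixir
stage_time = 100   # ms per stage (assumed for illustration)
stages = 4         # receive, process, db_op, send

# Latency of a single message is unchanged: it still passes through all stages.
latency = stages * stage_time                         # 400 ms

# Throughput improves: once the pipeline is full, one result comes out every
# stage_time ms, instead of every stages * stage_time ms.
sequential_throughput = 1000 / (stages * stage_time)  # 2.5 messages/sec
pipelined_throughput  = 1000 / stage_time             # 10.0 messages/sec
```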
The problem (simplified version)
defmodule Workers do
  def f1(i) do
    :timer.sleep(100)
    i + 1
  end

  def f2(i) do
    :timer.sleep(100)
    i * 2
  end

  def f3(i) do
    :timer.sleep(100)
    i - 3
  end
end
The module is simply a collection of functions, and here, three functions are defined, each one simulating a long operation by sleeping for a while. To compute the final result, we need to call those functions sequentially:
a = f1(input)
b = f2(a)
result = f3(b)
Obviously, with this approach it takes 300 ms to compute the result, so this code can handle an incoming rate of up to 3.3 messages per second (1 / 300 ms). If messages arrive more often, the output delay will grow constantly, and incoming messages will accumulate in the input buffer. For example, when the input rate is 10 messages/sec (one message every 100 ms), we will have the following delays:
message #    delay (ms)
        1           300
        2           500
        3           700
        4           900
        5          1100
      ...
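The delays above grow in a fixed pattern: every message waits 200 ms longer than the previous one (300 ms of processing minus the 100 ms gap between arrivals). A small sketch of that formula:

```elixir
processing_time = 300   # ms to handle one message sequentially
arrival_gap = 100       # ms between incoming messages

# Each message inherits the backlog accumulated by all previous messages.
delay = fn n -> processing_time + (n - 1) * (processing_time - arrival_gap) end

Enum.map(1..5, delay)   # [300, 500, 700, 900, 1100]
```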
The hardcoded solution
defmodule Pipeline1 do
  use ExActor
  import Workers

  def init(_) do
    initial_state(Pipeline2.actor_start)
  end

  defcast consume(data), state: actor2 do
    actor2.consume(f1(data))
  end
end
The hardcoded solution consists of three actors, each performing one operation; the code above defines the first of them.
The use ExActor line makes the module "actor aware", which makes it easier to implement an actor, as explained in my previous post.
The import Workers directive allows us to use the functions f1, f2, and f3 directly. Without it, we would have to write Workers.f1 and so on.
The init function, which will be invoked when the actor is started, creates the second actor, and stores the "reference" to it in its own state.
Finally, the actor exposes one operation: consume, which, when invoked, will perform the operation f1 and send the result to the second actor.
The other two actors are more or less copy-paste versions of the first one, the third actor being a bit simpler, since it does not pass the result further but prints it to the console instead:
defmodule Pipeline2 do
  use ExActor
  import Workers

  def init(_) do
    initial_state(Pipeline3.actor_start)
  end

  defcast consume(data), state: actor3 do
    actor3.consume(f2(data))
  end
end

defmodule Pipeline3 do
  use ExActor
  import Workers

  defcast consume(data) do
    IO.puts(f3(data))
  end
end
Now, we can use this pipeline in the following way:
pipeline = Pipeline1.actor_start
Enum.each(1..10, fn(i) -> pipeline.consume(i) end)
We simply start the first actor (which will in turn start the second, which will in turn start the third), and send it messages which then pass through the pipeline.
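As a rough illustration of the gain (assuming each of the three stages takes 100 ms, as in the Workers module, and ignoring messaging overhead): pushing ten messages through the pipeline takes about 1.2 seconds instead of 3, since only the first message pays the full 300 ms latency, after which one result arrives every 100 ms:

```elixir
n = 10       # number of messages
stages = 3   # f1, f2, f3
t = 100      # ms per stage

sequential_total = n * stages * t        # 3000 ms
pipelined_total  = (stages + n - 1) * t  # 1200 ms
```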
In addition, due to the nature of the Erlang virtual machine (called BEAM), we don't need multiple cores to obtain higher throughput. The BEAM can use separate threads for I/O operations, and it can also use kqueue/epoll to perform asynchronous I/O (though not by default; both options must be explicitly configured). This essentially means that, while waiting for an I/O operation to finish, the BEAM can execute other pending Erlang instructions.
Of course, if the processes are CPU bound (i.e. they execute pure Erlang code without doing I/O), then you will need multiple cores to make them run in parallel. The code, though, doesn't need to be changed: it is ready to utilize more CPU resources if they are available.
The generic solution
pipeline = Pipeline.create([
  function(:f1, 1),
  function(:f2, 1),
  function(:f3, 1),
  fn(x) -> IO.puts x end
])
This construct creates a pipeline of four processes and connects them as presented earlier. The pipeline can be used in exactly the same way as the hardcoded version.
Quick syntax explanation: the function(:f1, 1) construct is simply a reference to an existing function f1 of arity 1 (the one from the Workers module), while fn(x) -> ... end creates an anonymous function (lambda). Essentially, we pass four function values to Pipeline.create, which turns them into pipelined processes.
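To make the idea more concrete, here is a minimal sketch of how such a generic pipeline could be built with nothing but plain spawned processes. The SimplePipeline module and its function names are invented for this illustration (the article's actual Pipeline implementation may differ), and it uses raw send/receive in current Elixir syntax instead of ExActor:

```elixir
defmodule SimplePipeline do
  # Builds the chain back to front, so each stage knows the pid of its
  # successor; returns the pid of the first stage.
  def create(funs) do
    funs
    |> Enum.reverse()
    |> Enum.reduce(nil, fn fun, next -> spawn(fn -> loop(fun, next) end) end)
  end

  # Asynchronously pushes a message into the pipeline.
  def consume(pipeline, data), do: send(pipeline, {:consume, data})

  defp loop(fun, next) do
    receive do
      {:consume, data} ->
        result = fun.(data)
        if next, do: send(next, {:consume, result})
    end

    loop(fun, next)
  end
end
```

An equivalent of the pipeline above could then be created with SimplePipeline.create([fn x -> x + 1 end, fn x -> x * 2 end, fn x -> x - 3 end, fn x -> IO.puts(x) end]) and fed via SimplePipeline.consume/2.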
The complete code is bundled together with the examples of the previous article, and can be found here.