Hi, I am Saša Jurić, a software developer with 10+ years of professional experience in programming web and desktop applications using Elixir, Erlang, Ruby, JavaScript, C# and C++. I'm also the author of the upcoming Elixir in Action book. In this blog you can read about Erlang and other programming related topics. You can subscribe to the feed, follow me on Twitter or fork me on GitHub.

Working with immutable data

In the next few articles I will talk about programming with immutable data, a very important property of functional programming languages. Using immutable data might seem scary to a typical OO programmer used to dealing with data that can be modified in place. After many years of C++/C#/Ruby based development, I certainly had difficulties getting used to this concept. However, once accustomed, I found it brings many benefits over the classical mutable data approach: it forces cleaner, better structured code, and makes concurrent programming much easier.

This series especially targets classical OO developers, accustomed to standard mutable variables. You might find the topic interesting even if you don't plan on coding in a functional language such as Elixir or Erlang. Immutable structures can be implemented in other languages (like this one in Ruby) and can bring some benefits which are difficult to gain with mutable data.

The first article of this miniseries will be more theoretical, presenting the basic ideas, benefits and building blocks of immutable programming, while the future ones should contain more practical, code-driven examples of how to manipulate complex data. As usual, I will talk from the Elixir/Erlang point of view, but most of the ideas presented can be implemented in most modern languages.


Basics

The general idea of immutable programming is that it is impossible to do an in-place modification of a variable. Instead, we call functions which transform the current value and return a modified version:

new_data = transform(original_data)

The transform does not in any way change the original data, so after it is executed we have access to both the old and the new data. The transform is a side effect free function: the same input will always produce the same output.

A more specific example of how this works is illustrated by this Elixir snippet:

defmodule Company do
  # creates a new company with the given name and no employees
  def new(name), do: {name, []}

  # returns a new company with the employee pushed to the front of the employee list
  def add_employee({company_name, employees}, new_employee) do
    {company_name, [new_employee | employees]}
  end
end

company_1 = Company.new("Initech")
company_2 = Company.add_employee(company_1, "Peter Gibbons")
company_3 = Company.add_employee(company_2, "Michael Bolton")

The Company module defines transformation functions, and the last three lines illustrate how we can use them to create and modify the state without in-place mutations. With proper language support we don't need the intermediate variables. Instead we can chain the calls together, feeding the output of the previous function as the argument of the next one. I plan to discuss this in future articles, but here is a taste of how this is done in Elixir:

company = 
  Company.new("Initech") |>
  Company.add_employee("Peter Gibbons") |>
  Company.add_employee("Michael Bolton")

If appropriate data structures are chosen (with respect to the actual transformations), the two variables (old and new) will share as much memory as possible, with the rest being shallow copied. For example, if we modify the third element of an Elixir list, the new list will hold a shallow copy of the first two elements, the third one will have the new value, and the rest of the list is completely shared. In the example above, when we call add_employee, the new company variable will completely share the data with the previous one, adding only the new information (the new employee) to its own structure.

With appropriately selected data structures, the transformations should work reasonably fast (though not as fast as in-place mutations; this is what we are trading off for the other benefits). In the example above, add_employee is an O(1) operation due to the characteristics of Erlang lists and the fact that we are pushing the new employee to the front of the list.
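To make this concrete, here is what the three variables from the earlier snippet hold after all the calls have executed; note that company_1 and company_2 are left untouched by the subsequent add_employee calls:

company_1   # => {"Initech", []}
company_2   # => {"Initech", ["Peter Gibbons"]}
company_3   # => {"Initech", ["Michael Bolton", "Peter Gibbons"]}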

How is this used in a long running program which must respond to external input and represent the changing state using immutable data? The following pseudocode illustrates the idea:

forever do
  message = wait_for_external_input()
  new_state = transform(current_state, message)
  store_state(new_state)
end

A program waits for an external input such as user interaction, network message, message from another thread or process, change on a file system, etc. Based on that input and the current state, it computes the new state (without modifying the current one) and somehow stores it (so it can be retrieved when the next external input arrives).

What does it mean to store the state? The simplest approach is to assign the value to a global variable. Since in most mutable languages assigning a value doesn't modify the old value, this won't break the immutability principle (closures might complicate the matter, but I won't go into such details).
Erlang and Elixir offer more sophisticated solutions such as infinite tail recursion, where the loop is a function which calls itself, passing the new state as the argument (if you haven't read it, refer to my article on actors which explains this in more detail). Alternatively, an in-memory mutable key-value store called ETS can be used.
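To make the tail recursion approach more tangible, here is a minimal sketch in Elixir, reusing the Company module from the earlier example (the CompanyServer name is made up for illustration). The state is never stored in a mutable variable; it simply travels as the argument of the recursive loop call:

defmodule CompanyServer do
  def start do
    spawn(fn -> loop(Company.new("Initech")) end)
  end

  # the current state lives only as the argument of this function
  defp loop(company) do
    receive do
      {:add_employee, name} ->
        # compute the new state and pass it to the next iteration
        loop(Company.add_employee(company, name))
    end
  end
end

Since the recursive call is in the tail position, it compiles to a plain jump, so the loop can run indefinitely without growing the stack.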


Benefits

The key benefits of this approach are data consistency, improved code quality and easier concurrent programming.

Data consistency

Since a transformation does not change the data (it only returns a modified version), it can be considered an atomic operation. If it fails somewhere in the middle, it will not leave a mess inside the current state. We can capitalize on this by slightly altering the pseudocode presented earlier:

forever do
  message = wait_for_external_input()
  new_state = transform(current_state, message)
  store_state(new_state)
catch
  store_state(current_state)
end

If an invalid message somehow causes an unhandled exception, we can simply continue the execution using the current state (the catch clause above), which is in no way corrupted by the transformation. Think of this as in-memory atomicity: either the complete transformation happens, or nothing changes.
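In Elixir, this idea can be sketched by wrapping the transformation in try/rescue inside the receive loop (a simplified sketch; transform/2 stands for whatever pure transformation the server performs):

defp loop(state) do
  new_state =
    receive do
      message ->
        try do
          transform(state, message)
        rescue
          _error -> state  # the transformation failed; keep the uncorrupted old state
        end
    end

  loop(new_state)
end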

The transformation may still introduce side effects if it communicates with external entities, e.g. by sending messages to other threads or system processes, storing to a database or the file system, or issuing network requests. Functions with side effects are also called impure functions. To retain consistency, it is best to completely separate impure operations from the state transformations, first calling the pure state transformation and afterwards executing the side effect tasks:

forever do
  message = wait_for_external_input()
  new_state = transform(current_state, message)
  execute_impure_tasks(new_state)
  store_state(new_state)
catch
  store_state(current_state)
end

If the state transformation breaks, no side effects will be introduced and consistency is fully preserved. For the impure tasks, we have to either ensure they execute atomically (e.g. by using database transactions) or live with the fact that they will not always succeed and develop our system accordingly.


Improved code quality

When a transformation causes no side effects, it is easy to reason about the code, and to understand what it does and what it simply cannot do. Consider the following two function calls:

# retrieves employee, doesn't modify anything
employee = company.employee(name: "Peter Gibbons")

# retrieves employee, returns modified company
{employee, new_company} = company.employee(name: "Michael Bolton")

The first call retrieves the employee and produces no side effects. We can be 100% positive nothing else is modified.

The second call, however, additionally returns the company, which is a clear indication that it does some additional work (e.g. internally caches the employee so the next retrieval works faster).

Another important benefit is the promotion of well-factored code. Working with immutables forces you to divide your code into many small functions. Although this is not specific to immutability, here it is more or less the only way to develop maintainable code. Since the code will be factored into many small functions with clean inputs and outputs, which produce no other side effects and do not rely on any global data, reusability improves and the code becomes easier to test.

Easier concurrent programming

With the help of immutables we can write concurrent, multi-threaded programs almost without any need for classical synchronization techniques such as locks, semaphores and mutexes:

# main thread
forever do
  message = wait_for_external_input()
  new_state = transform(current_state, message)
  notify_parallel_thread(new_state) # requires synchronization
  store_state(new_state)
end


# parallel thread
forever do
  message = wait_for_message_from_main_thread() # requires synchronization
  do_something(message)
end

The code above presents two threads of execution. The main one again runs the infinite loop which processes external inputs and modifies the state. After computing the new state, the main thread somehow notifies the parallel thread (the notify_parallel_thread call) that it has some new work for it. Upon receiving that notification (wait_for_message_from_main_thread), the parallel thread will do something with the data it received.

Consequently, we need some kind of messaging system to communicate between threads (Erlang has it built in, and I have seen libraries available for other languages, or you can always hand code it yourself) and this messaging system will indeed require some synchronization, since both parties, the sender and the receiver, modify the shared communication channel.

However, the data transformations run in parallel, without any need for synchronization. The transform call of the main thread can run simultaneously with the do_something computation of the parallel thread, even if both functions work on exactly the same data residing in the same memory location. This works because neither transform in the main thread nor do_something in the parallel thread can modify the data.

So the data transformations, which we expect to be complex operations (otherwise why parallelize them?), can run completely in parallel, without any need to acquire a lock, wait for another thread to finish, or block another thread. Not only does this significantly simplify the code and reduce the possibility of deadlock (it can still happen, but far less often), it also promotes true parallelism, since the need for synchronization is minimal. I should note that in Erlang this particular benefit is not relevant, since data is never shared between two processes anyway (it is deep copied instead).


Building blocks

The basic idea behind immutable data is not very complicated and can be implemented in many mutable-based languages. In addition to primitive types (ints, floats, booleans, ...), we need two complex structures: one to hold together a small, fixed number of elements (something akin to records), and another to store arbitrarily large, dynamically changing collections. Of course, both of these structures must be implemented as immutables: a modifier function must not modify any existing variable, but must instead create a new instance which represents the modified version of the structure.

To implement fixed-size records, Erlang and Elixir use tuples. Tuples have O(1) read time and O(N) modify time (N being the size of the tuple). When we modify a tuple, a new one is always constructed, with modified values in place of the original ones, and unchanged values shallow copied. Since tuples usually hold a small number of fixed elements, the updates should be fast enough. In an OO language, tuples can be implemented using a class which exposes only public read-only properties (which may return only immutables) and modifier methods which return a new instance.
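For illustration, this is how reading and "modifying" a tuple looks in Elixir; put_elem returns a new tuple, while the original remains unchanged:

person = {"Peter Gibbons", 32}

elem(person, 0)                  # => "Peter Gibbons"
older = put_elem(person, 1, 33)
older                            # => {"Peter Gibbons", 33}
person                           # => {"Peter Gibbons", 32} (unchanged)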

Arbitrarily sized collections are usually implemented with cons cells: a singly linked list where each element holds a value and a pointer to the rest of the list. When using such lists, pushing a new element to the front of the list and reading the head of the list are O(1) operations. All other operations, such as random access, updates or deletes, are O(N). If we update the Mth element of an N-sized list, the first M-1 elements will be shallow copied, then comes the modified element, and the rest of the list is completely shared.
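A quick Elixir illustration of this behaviour: replacing the third element produces a new list, while the original stays intact (and the tail after the replaced element is shared between the two lists):

original = [1, 2, 3, 4, 5]
updated = List.replace_at(original, 2, :changed)

updated     # => [1, 2, :changed, 4, 5]
original    # => [1, 2, 3, 4, 5] (unchanged)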

All other complex structures, such as binary trees, hashes, stacks, queues or sets, can be implemented on top of tuples and cons lists. For example, in Erlang, a balanced binary tree is implemented as a recursive tuple and gives O(log N) performance for both reads and writes. The Erlang dictionary, on the other hand, uses both tuples and lists, nesting them to divide the data into buckets.

In a language which deals strictly with immutables, there are no classical loops (since a variable cannot be modified). Instead, such languages have strong support for recursion, transforming tail-recursive calls into plain jump instructions. Often there is also support for pattern matching, allowing us to write functions consisting of multiple clauses, of which only one will execute depending on the values of the input arguments. Here's an example of recursive list iteration in Elixir:

def each([]), do: :ok # empty list
def each([head | tail]) do
  IO.inspect head
  each(tail)
end

Notice how two clauses of the function exist, one for the empty and another for the non-empty list.

If the recursive code is too verbose, we can use helper functions to make the code seem more imperative. For example, in Elixir, the function Enum.filter can be called to get all even numbers from the list:

Enum.filter(
  [1, 2, 3, 4, 5], 
  fn(x) -> rem(x, 2) == 0 end
)

This is an example of a so-called higher-order function, which is a fancy name for a function that takes another function as an argument, or returns a function (or both). In this case Enum.filter takes the list and a filter function which is invoked for each element of the list. The result is another list holding all elements for which the filter function returned true.
Although this fact is hidden, Enum.filter is internally still based on recursion, as there is no alternative in an immutable-based language.
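To illustrate that point, here is how a filter could be hand-written with the same recursion and pattern matching techniques shown above (a simplified sketch of the idea, not the actual Enum.filter implementation):

defmodule MyEnum do
  def filter([], _fun), do: []

  def filter([head | tail], fun) do
    if fun.(head) do
      [head | filter(tail, fun)]
    else
      filter(tail, fun)
    end
  end
end

MyEnum.filter([1, 2, 3, 4, 5], fn(x) -> rem(x, 2) == 0 end)  # => [2, 4]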

If you try to emulate immutables in a mutable-based language, you will have the convenience of typical loops (like for and while) which use local mutable variables. I advise avoiding this approach whenever possible. Many modern mutable-based languages, such as Ruby, provide higher-order enumerable functions which can reduce the amount of mutables in a program.


Downsides

As I mentioned earlier, immutable updates will generally be slower than mutable ones. After all, we are constructing a new variable instead of modifying the existing memory location, and in addition shallow copying some elements. However, this should usually not present a problem, since the real performance cost most often lies in the computational part (e.g. transforming the data) or in I/O operations.

Since every modification returns a new value, the garbage collector will have to deal with a large number of short-lived variables. If the language is immutable-based, the GC is probably specifically fine-tuned for such situations. For other environments it is best to research and test. I would expect generational collectors to work fine, since they expect many short-lived variables, but I don't have any real experience or data to back this up.

The final downside (actually a benefit) is that you can't resort to quick hacks when dealing with immutables. Forget about quick fixes where a global variable is set in one place and read in another. Data can only be propagated via function arguments or return values, so you are forced to incorporate your special cases into your conceptual model, which is a good thing because it again promotes cleaner, easier to maintain code.

Wrapping up

If you have endured this far, hopefully you are a bit intrigued by the world of immutable programming. I tried to reduce the boring theory to a bare minimum, which we can now use to do something more interesting. Next time I will discuss how to deal with more complex immutable data.

On a completely unrelated note for all Erlang programmers:  I'll be attending this year's Erlang User Conference, so if you happen to be there and are interested in a live discussion about Erlang, Elixir or anything else, let me know. I am looking forward to finally meeting Erlang programmers in person :-)

Concurrency in a long polling comet server

The last couple of articles were a part of a loose miniseries in which I presented a few concurrency patterns I have used in my Erlang systems. Today, I will wrap up this series with some thoughts on how to deal with many concurrent clients and minimize CPU load in an Erlang based comet (HTTP push) server.

The article is based on my own experience in building a long polling based server which pushes fast-changing data to connected clients. The data is dynamically categorized into channels to which clients can subscribe. At any moment there are no more than 50 channels and 4000 connected clients. All clients receive data using the so-called long polling technique: a client issues an AJAX request asking the server for new data, and receives a response only when something new is available, after which a new request must be issued. This results in a fairly high number of HTTP requests, in peak times about 2000 per second, all of which are handled in Erlang, without any front-level caches. Each request comes from a separate client which is subscribed to its own subset of the available channels, and therefore must be treated individually.

Compared to big success stories, such as WhatsApp, Facebook, Heroku and others, this server might seem almost like a hello world app. Nevertheless, it took some attention and tuning to be able to handle that kind of load, and here I will present some approaches which helped me accomplish the task.


Basic principles

A typical Erlang based web server handles each request in a separate Erlang process, so at any point in time there are at least as many processes as there are connected users. These processes are usually short lived: they get the response, send it back via the network and terminate. I will refer to such processes as request processes.

Additionally, the server will usually run a smaller number of long running actors, which will be used by the request processes to obtain data. I will refer to these processes as data providers.

The basic strategy for minimizing CPU load is to make data providers do everything that is common to multiple requests, and at the same time move all request-specific operations to request processes. So, whatever is common, we want to execute exactly once, whereas whatever is specific, we want to execute in parallel, thus promoting vertical scalability of the system. This strategy is pure common sense, not specific to Erlang, but in Erlang it is especially easy to implement due to its lightweight concurrency, message passing and some other available mechanisms.

It is not always obvious what is common to multiple clients. For example, in each request, the server returns the current time in RFC1123 format. The creation of such a string might take about 1-2 milliseconds, which is not insignificant if we have 2000 requests/sec. Since this information is common to all processes, it is better to have a "time provider" process which computes the string once per second (instead of 2000 times).

Although this might seem like a case of premature optimization, it is important to notice that each CPU bound operation in a request process might be very expensive if there are many simultaneous requests. If 4000 clients perform a 1 millisecond operation, they effectively create 4 seconds of single-core work. The good news is that you can usually get away with doing it wrong, depending on the load and available CPU resources, thanks to the way the Erlang scheduler works (there is a great article about it here). I most certainly did many things wrong (and I probably still do), but the Erlang VM was still able to cope with it, and the server was working fine. So I don't recommend long deliberation over whether every single statement belongs in the request or in the provider process. Start with some reasonable assumptions, then measure and optimize where needed.


Removing single process reader bottleneck

Data providers are typically implemented as Erlang processes. Since a process handles messages sequentially, requesting data directly from one might cause a bottleneck. Imagine 4000 clients at the same time asking a single provider process for some information. These clients effectively become "deparallelized", meaning they will get the answer one by one, instead of obtaining it simultaneously. Moreover, the provider process will be overloaded handling fetch requests, instead of doing some other meaningful work (e.g. producing new data).

We can get around this problem by using Erlang Term Storage (ETS), an in-memory key-value storage, with keys and values being arbitrary Erlang terms. ETS supports fast lookups and truly concurrent read/write operations. You can think of it as a lower level Erlang version of Redis. While not as rich in features, ETS has the advantage of living inside the Erlang VM, which makes it easy to use and requires no serialization/deserialization of the data being stored.

Recall the time provider process mentioned earlier. It is implemented as a separate Erlang process which calculates the time string once per second. Instead of exposing a read interface, the process can store the computed string into an ETS table which the request processes can then read directly, thus making reads truly parallel and at the same time taking the load off the time provider process. This technique is, for example, used in the popular Cowboy web server.
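A minimal sketch of this idea (module, table and function names are made up for illustration): the provider process owns a public ETS table, refreshes the cached string once per second, and request processes read it with a cheap concurrent lookup:

defmodule TimeProvider do
  def start do
    spawn(fn ->
      # the table is owned by the provider process and readable by everyone
      :ets.new(:time_cache, [:named_table, :public, read_concurrency: true])
      loop()
    end)
  end

  defp loop do
    :ets.insert(:time_cache, {:rfc1123, compute_time_string()})
    :timer.sleep(1000)
    loop()
  end

  # called from request processes
  def current_time do
    [{:rfc1123, value}] = :ets.lookup(:time_cache, :rfc1123)
    value
  end

  defp compute_time_string do
    # placeholder for the actual RFC1123 formatting
    :httpd_util.rfc1123_date()
  end
end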

ETS implements only a very basic locking mechanism, so you usually want to have one type of data modified by a single process. However, it's fine to have multiple processes modifying different types of data. For example, in the comet server, when a client subscribes to channels, a client-specific process stores the subscription information into the subscription ETS table. Thus, there are many processes simultaneously writing to the same ETS table, and there are also many other processes reading from that table. This works fine, because each client process handles only its own subscription information, so no record-level contention occurs.
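As a sketch of this setup (table and variable names are made up): the table is public, each client process writes only records keyed by its own client id, and any process can read them concurrently:

# created once, e.g. at application startup
:ets.new(:subscriptions, [:named_table, :public, :set,
                          read_concurrency: true, write_concurrency: true])

# inside a client process: (over)write only this client's own record
:ets.insert(:subscriptions, {client_id, subscribed_channels})

# from any other process: a concurrent read of one client's subscriptions
case :ets.lookup(:subscriptions, client_id) do
  [{^client_id, channels}] -> channels
  [] -> []
end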

ETS tables have their own drawbacks: all data is copied during reads and writes, and there is no garbage collection, so you must manually delete the data when it's no longer needed. They are not appropriate for all types of problems, but when used properly, they can really boost performance. I often use them to hold server-wide information, meaning anything that will be read simultaneously from multiple processes.


Permanent client processes

As stated earlier, each HTTP request is handled in a separate Erlang process. Consequently, for every client request, we have to repeatedly fetch the state of the client, which might introduce some processing overhead. In the long polling comet server, many clients return to the server constantly, so some processing time is spent repeatedly gathering client-specific data. To overcome this, we can create permanent client processes which maintain each client's state. All client related operations will be executed in these processes. When an HTTP request arrives from a client, the request process simply has to find the corresponding client process (for example via ETS) and ask it to perform the requested operation. The client process has all the data readily available.

Let's see how this works in practice. In the comet server, when a message for a client is generated, it is sent to the client process. Since the server is long polling based, the real client may not have returned to the server yet, for example if it is still processing the HTTP response of the previous message, or its next HTTP request has not yet reached the server. If the client has not yet returned, the corresponding client process simply stores the message in its internal message queue, doing nothing else (since we don't know whether the client will return at all). When/if the client returns, the client process immediately knows whether there are new messages, and has them immediately available for serving.
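A heavily simplified sketch of such a client process (expiry, error handling and the message/function shapes are made up for illustration): it keeps a queue of pending messages, accumulating them while the client is away and handing them over as soon as the client's request process asks for them:

defmodule ClientProcess do
  def start do
    spawn(fn -> loop([]) end)
  end

  defp loop(pending_messages) do
    receive do
      # a new message for this client arrived from a channel/dispatcher
      {:push, message} ->
        loop([message | pending_messages])

      # the client's long polling request has arrived; serve everything we have
      {:fetch, request_process} ->
        send(request_process, {:messages, Enum.reverse(pending_messages)})
        loop([])
    end
  end
end

A real long polling implementation would also have to remember the waiting request process when no messages are pending, instead of immediately replying with an empty list, but that detail is omitted here.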

Although this effectively doubles the number of Erlang processes being used, due to their low cost this should not present a problem. However, some additional processing must be done to perform a client expiry check and terminate processes of clients which didn't return. Additionally, since we will always have some number of phantom client processes for clients which didn't return, some extra memory and CPU will be used. As a rule, I would say the technique is appropriate when you expect mostly returning clients which issue regular, frequent requests over some larger time period.

An alternative approach is to hold client data in an ETS table. The request process can retrieve it directly from the table, which eliminates the need for an extra process per client. I initially used this approach and later transformed it into explicit client processes. The problem in my case was that there are multiple parallel modifiers of the client state: long polling requests, channel (un)subscribe requests, and the message dispatcher process which modifies the client's inbox. Multiple parallel modifiers of the same ETS data require some manual synchronization. An additional downside of ETS is that data is copied while reading/writing, which can cause a performance penalty depending on the data size. These issues disappear when using plain processes, and taking this approach increased performance in my case. In a different situation, e.g. with small state data and a single data writer, the ETS based approach might work better.


Throttling dispatches

In the comet server, during peak times, the data changes fast, a couple of times per second. Since long polling is used, a typical client usually returns after some new messages have been generated, and when this happens, the request must be processed separately: for each request the new messages must be collected and joined, and the combined final message must be compressed. This again results in extra computations, and higher CPU load.

Knowing that many clients listen to the same channels, what I wanted to achieve is to have most clients connected and in the waiting state before the message dispatch takes place. This opens up the possibility to identify clients which receive the same set of messages, and perform the join/compress computation only once per distinct subscription group (all clients listening to the same channels).

To achieve this, I resorted to a throttling approach. All messages are generated in channel-specific processes, and initially these processes dispatched messages to clients immediately. In the throttling version, I introduced a single dispatcher process, which receives messages from channels, aggregates them and dispatches them to clients every two seconds. The interval was chosen as big enough to accommodate reasonably fast clients, and small enough not to introduce a significant delay, because data must be served as fast as possible. As a result, most of the clients return to the server before new messages are dispatched, so I could now identify which clients listen to the same messages, and minimize join/compress computations (see the next section). As an added bonus, since this approach reduces the incoming request rate, both CPU load and generated traffic have dropped. Prior to introducing throttling, I noticed a peak load of about 3000 reqs/sec. After it was introduced, the request rate has never gone above 2000 reqs/sec, even though the number of parallel clients has since increased by 33% (from 3000 to 4000).
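A rough sketch of such a dispatcher (message shapes and helper functions are made up for illustration): incoming messages from channel processes are accumulated in the dispatcher state, and a timer flushes them to the client processes every two seconds:

defmodule ThrottledDispatcher do
  use GenServer

  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  # channel processes call this for every generated message
  def dispatch(message) do
    GenServer.cast(__MODULE__, {:dispatch, message})
  end

  def init(_opts) do
    schedule_flush()
    {:ok, []}
  end

  def handle_cast({:dispatch, message}, pending) do
    {:noreply, [message | pending]}
  end

  def handle_info(:flush, pending) do
    send_to_client_processes(Enum.reverse(pending))
    schedule_flush()
    {:noreply, []}
  end

  defp schedule_flush do
    Process.send_after(self(), :flush, 2000)
  end

  defp send_to_client_processes(_messages) do
    # placeholder: look up the subscribed client processes and push the messages
    :ok
  end
end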

Admittedly, fixed time throttling seems like a "lame" approach, but it was an easy solution to the problem at hand, which helped me take control over the CPU load and paved the way for further optimizations. It might be better to monitor returning clients, and dispatch new messages once some desired percentage of them has returned, which would eliminate the arbitrary dispatch delay. Additionally, switching to a true permanent connection, such as WebSockets or server-sent events, might completely eliminate the problem.


Concurrent caching

Although we usually want to have data providers do all the common processing, sometimes it is easier to identify that commonality from inside the client processes. For example, when profiling the comet server I noticed that many requests were spending a considerable amount of time joining new messages and compressing the response. Now, as I have mentioned, each client is its own case, with its own set of subscriptions, and some of them might not support compression. So at first glance, it seemed as if those message joins and compressions are not common between different clients. However, knowing the nature of the data and the usage flow of the application, I was sure that most of the clients were actually listening to the same subset of channels. On top of this, owing to the throttling approach described above, I knew that most of the clients were receiving the same messages most of the time.

To take advantage of this, I introduced logic in the client processes which can reuse the join/compress operation of another process. The algorithm is a bit involved, but the general idea goes like this. When a client process receives the list of messages it is interested in, it verifies whether a combined message has already been computed by another client process. It does so via a lookup in an ETS table, using the messages' unique ids as the key. If a cached version exists, it is served. Otherwise, the client process joins the messages, compresses the result and stores it to ETS.

The key thing here is that, while one process is joining messages, all others interested in the same set of messages are waiting for it to finish, so they can reuse its result. However, two processes which try to join different sets of messages will not block each other. Consequently, the join/compress computation is performed only once per distinct set of messages, but it executes in parallel with the join/compress for a different set of messages. There are a couple of ways of accomplishing this in Erlang, and after some trial and error, I settled on a custom locking implementation. The generic Elixir library which I used for this task can be found here.
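Leaving the locking aside (the library mentioned above handles the part where other interested processes wait for the first one to finish), the cache lookup itself can be sketched roughly like this; :response_cache, join_messages/1 and compress/1 are made-up names, and this simplified version would still allow two processes to compute the same result concurrently:

def combined_response(message_ids, messages) do
  case :ets.lookup(:response_cache, message_ids) do
    [{^message_ids, cached}] ->
      # another client process already joined and compressed this exact set
      cached

    [] ->
      response = messages |> join_messages() |> compress()
      :ets.insert(:response_cache, {message_ids, response})
      response
  end
end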

As a result, the message join/compress is performed much less often. A glance at the production log tells me that in a single minute, out of about 80,000 join/compress operations needed, only 1,300 are actually computed! For all others, the pre-generated result is fetched from ETS.

This is generally a more complicated approach. Not only do we have to worry about custom inter-process synchronization, but there is also an expiry issue: the cached data is stored in the ETS table, so we must manually delete it at some point. Therefore, I generally find it better to prepare data in the provider process, if possible, because this should result in simpler and better performing code.

A careful reader might also wonder whether the message dispatcher process could determine which clients receive the same messages, since all messages are sent from that process. This is essentially possible, but it has its drawbacks. First, the dispatcher doesn't know which clients have returned to the server. A client might return after the next dispatch, so whatever the dispatcher had joined and compressed would then be useless for such clients (since additional new messages have arrived), making the computation completely redundant. Additionally, shifting the join/compress responsibility to the dispatcher effectively removes the parallelization of independent computations, turning the dispatcher into a sequential bottleneck.


Some final thoughts

The most important point of this post is that, in Erlang, it is fairly easy to minimize the number of redundant computations without sacrificing scalability. I described a couple of techniques which I find interesting, and which (hopefully) illustrate a higher level view of concurrency in Erlang systems. The motivation to use these approaches came to me after observing the system from a higher level perspective, thinking about the nature of the data, how it is transported through the system, and how the clients are using it. Of course, profiling, benchmarking and performance monitoring are always useful tools which can help identify the more problematic parts of the system.