Prototyping Vivaldi, a simple distributed algorithm in Elixir
While learning Elixir from the Programming Elixir book, I came across an excellent Papers We Love talk by Armon Dadgar of HashiCorp on the Vivaldi algorithm, and I thought Elixir would be a great fit for implementing it.
Neat way of visualizing it! This is definitely useful for visual learners, makes it easier to build an intuitive understanding
— Armon Dadgar (@armon) July 31, 2017
What is Vivaldi?
Vivaldi was developed by Frank Dabek and Russ Cox at MIT. It is a decentralized algorithm that predicts round-trip times between nodes in a cluster by assigning synthetic coordinates to each node.
Why is it needed?
From the paper:
Large-scale Internet applications can benefit from an ability to predict round-trip times to other hosts without having to contact them first. Explicit measurements are often unattractive because the cost of measurement can outweigh the benefits of exploiting proximity information. Vivaldi is a simple, light-weight algorithm that assigns synthetic coordinates to hosts such that the distance between the coordinates of two hosts accurately predicts the communication latency between the hosts.
A modified version of Vivaldi is used by HashiCorp's Serf, which in turn is used in Consul, a system for service discovery and configuration. Vivaldi is also used in the Vuze BitTorrent client.
How does it work?
The paper and the talk are very accessible, and I can't do a better job! But if you don't have the time, here's a quick, simplified explanation:
Vivaldi's goal is to assign Euclidean coordinates to each node in a cluster so that these coordinates accurately (within about 10% error) predict the round-trip time (RTT) between the nodes.
Each node starts at the origin. When a node A makes a request to another node B, B responds with the payload and also the latest version of its coordinates. A uses the difference between the real RTT and the RTT predicted by A's and B's coordinates to calculate the next version of its coordinates. When another node C communicates with A, A now responds with its updated coordinates.
This process repeats throughout the cluster, and all the nodes converge to coordinates which predict not only the RTTs to nodes they've communicated with, but also the RTTs to the remaining nodes! In my simulations, I found that in clusters without triangle-inequality violations, each node converged after communicating once with just 10-15% of its peers.
The authors found that Euclidean coordinates alone aren't good enough to model the Internet, so they added a height factor to accommodate nodes that have high latencies to reach the Internet backbone.
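To make the prediction concrete, here's a minimal sketch in Elixir (illustrative only, not the code from this project) of how an RTT can be predicted from two coordinates: the Euclidean part is the straight-line distance, and the two heights are simply added on top.

defmodule Demo.Distance do
  # Hypothetical coordinate shape: a 2-D point plus a non-negative height,
  # all expressed in the same unit as the measured RTTs (e.g. milliseconds).
  def predicted_rtt({{x1, y1}, h1}, {{x2, y2}, h2}) do
    euclidean = :math.sqrt(:math.pow(x1 - x2, 2) + :math.pow(y1 - y2, 2))
    # The heights model the "last hop" from each host to the Internet core,
    # so they are added to the distance rather than combined geometrically.
    euclidean + h1 + h2
  end
end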
Intuition
We'll build our intuition about how the coordinates are updated in three steps.
1. Forces in Action
Consider the above cluster with four nodes. The leftmost node, A, is new, whereas the other nodes, B, C and D, are stable, i.e. their coordinates have already been assigned and the distances between them match the latencies between them.
Assume that the latency from A to all the other nodes is equal, and that its desired position is at the centre of the triangle formed by the other nodes.
Now each of B, C and D exerts a force on A towards itself. The magnitude of each force is proportional to the difference between the real latency and the latency predicted by the coordinates. A calculates the resultant force, the vector sum of all these forces, and takes a small step in its direction. This process repeats, and A moves towards the centre of the triangle.
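Here's a rough sketch of that batch-style step in Elixir, assuming 2-D coordinates and a list of {coordinate, measured_rtt} pairs for the neighbours (heights and error estimates are left out; the module and function names are mine, not the repo's).

defmodule Demo.Forces do
  @step 0.05

  # One batch step for node A: sum the spring-like forces exerted by all
  # neighbours and move a small, fixed step along the resultant.
  def step({ax, ay}, neighbours) do
    {fx, fy} =
      Enum.reduce(neighbours, {0.0, 0.0}, fn {{bx, by}, rtt}, {sx, sy} ->
        dx = ax - bx
        dy = ay - by
        dist = :math.sqrt(dx * dx + dy * dy)
        # Positive error: the coordinates are too close together, so push A
        # away from B. Negative error: too far apart, so pull A towards B.
        error = rtt - dist
        {ux, uy} = unit_vector(dx, dy, dist)
        {sx + error * ux, sy + error * uy}
      end)

    {ax + @step * fx, ay + @step * fy}
  end

  # Direction from B to A. When the two nodes coincide (e.g. both at the
  # origin), pick a random direction, which is what lets new nodes spread out.
  defp unit_vector(_dx, _dy, dist) when dist < 1.0e-9 do
    angle = :rand.uniform() * 2 * :math.pi()
    {:math.cos(angle), :math.sin(angle)}
  end

  defp unit_vector(dx, dy, dist), do: {dx / dist, dy / dist}
end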
2. Centralized Algorithm
In the previous case, we only had to worry about one new node. Here, all the nodes are new and start at the origin. In the first iteration, each node takes a step in a random direction; after that, each node follows the same principle as in the previous case, and the nodes converge after a few iterations.
3. Distributed Algorithm
In the previous two cases, each node communicated with all the other nodes before taking a step in the right direction (you can think of it as batch gradient descent). In this case, each node takes a step after communicating with a single node (as in stochastic gradient descent), and the algorithm still works!
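The per-sample update from the paper also weights each step by how confident a node is in its own coordinate relative to its peer, so new nodes move aggressively while stable ones barely budge. Here's a simplified sketch of that update (2-D, no heights; ce and cc are the paper's tuning constants, 0.25 by default; the names are illustrative, not taken from the repo).

defmodule Demo.Vivaldi do
  @ce 0.25
  @cc 0.25

  # One stochastic update: node A adjusts its coordinate and its error
  # estimate after a single RTT sample to peer B.
  # a, b   - {x, y} coordinates of A and B
  # ea, eb - local error estimates (start positive, e.g. 1.0)
  # rtt    - measured round trip time for this sample
  def update({ax, ay} = a, ea, {bx, by} = b, eb, rtt) do
    dist = distance(a, b)

    # Weight the sample by our confidence relative to the peer's.
    w = ea / (ea + eb)

    # Relative error of this sample, folded into a moving error estimate.
    es = abs(dist - rtt) / rtt
    new_ea = es * @ce * w + ea * (1 - @ce * w)

    # Move along the unit vector from B to A, with an adaptive timestep.
    delta = @cc * w

    {ux, uy} =
      if dist < 1.0e-9 do
        angle = :rand.uniform() * 2 * :math.pi()
        {:math.cos(angle), :math.sin(angle)}
      else
        {(ax - bx) / dist, (ay - by) / dist}
      end

    new_a = {ax + delta * (rtt - dist) * ux, ay + delta * (rtt - dist) * uy}
    {new_a, new_ea}
  end

  defp distance({ax, ay}, {bx, by}) do
    :math.sqrt(:math.pow(ax - bx, 2) + :math.pow(ay - by, 2))
  end
end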
In real-world scenarios, all the nodes often don't join the cluster at the same point in time. Hence, in the above visualization, nodes are added to the cluster in 6 stages.
You can see that nodes move more aggressively when they're new and their error rates are high. Once a node is stable enough, it isn't affected much by the presence of newer nodes.
(The height factor is not visualized since it is very small compared to the Euclidean coordinates)
Implementing Vivaldi in Elixir
You can find the code on GitHub. I'll highlight how certain features of Elixir and Erlang/OTP helped prototype the algorithm quickly.
Process Discovery
There was no impedance mismatch between local mode and distributed mode. In local mode, all the peers ran on my development machine; in distributed mode, the peers ran on different machines. I didn't have to write any low-level networking code or any serialization/deserialization code.
Once I got the algorithm working on my machine, I spun up a bunch of nodes on AWS, Digital Ocean and Google Cloud Platform to test the algorithm and spent very little time debugging networking issues.
Each peer runs the following processes.
1. Ping Client :node-<id>-ping-client
2. Ping Server :node-<id>-ping-server
3. Coordinate :node-<id>-coordinate
4. Coordinate Stash :node-<id>-coordinate-stash
5. Connections :node-<id>-connections
6. Coordinator :node-<id>-coordinator
7. Supervisor :node-<id>-supervisor
PingClient - Periodically pings a random peer. When it receives a response, it forwards the peer's coordinates and the latency information to the Coordinate process.
Coordinate - Updates the peer's coordinates using Vivaldi, and stores the values in the CoordinateStash process.
PingServer - Responds to a ping with the coordinates stored in CoordinateStash.
CoordinateStash - Stores the latest version of the peer's coordinates.
Connections - Helps connect to another peer's PingServer (more on this later).
Coordinator - Helps configure the peer during simulations.
Supervisor - Supervises all the above processes. So, for example, if the PingClient process goes down because another peer's PingServer does not respond within a timeout, the supervisor just restarts the PingClient, and I don't have to handle this error manually in the PingClient (see the sketch below).
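Below is a rough sketch of how such a supervision tree could be wired up (module names are taken from the list above; the actual code in the repo may differ). It uses the Elixir 1.5+ child-spec style, where {Module, arg} works because each worker is a GenServer.

defmodule PeerSupervisor do
  use Supervisor

  def start_link(config) do
    Supervisor.start_link(__MODULE__, config)
  end

  def init(config) do
    # Each child is one of the processes listed above; they all receive the
    # same peer config (e.g. node_id, session_id, peer list).
    children = [
      {CoordinateStash, config},
      {Coordinate, config},
      {PingServer, config},
      {PingClient, config},
      {Connections, config},
      {Coordinator, config}
    ]

    # With :one_for_one, a crashing PingClient is restarted on its own,
    # without disturbing the state held by the other processes.
    Supervisor.init(children, strategy: :one_for_one)
  end
end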
I assigned unique names to each process, where each process's name is prefixed with the node_id. The processes communicate with each other using these names instead of explicit pids. In development mode, all the peers run on a single node, whereas in distributed mode, each peer runs on a different node.
Only PingServer needs to be visible across the network. So I made each PingServer's pid globally visible using :global.register_name:
defmodule PingServer do
  use GenServer

  def init([{node_id, session_id}]) do
    node_id
    |> get_name()
    |> :global.register_name(self())

    {:ok, {node_id, session_id}}
  end

  def get_server_pid(node_id) do
    node_id
    |> get_name()
    |> :global.whereis_name()
  end

  defp get_name(node_id) do
    :"#{node_id}-ping-server"
  end
end
The Connections module helps discover PingServer pids in distributed mode. It first connects to the peer, and then uses :global.whereis_name, which relies on the underlying Erlang/OTP system, to return the PingServer pid.
defmodule Connections do
  use GenServer
  require Logger

  def handle_call({:get_peer_ping_server_pid, peer_id}, _from, config) do
    pid =
      case config[:local_mode?] do
        true ->
          get_local_ping_server_pid(peer_id, config)

        false ->
          get_remote_ping_server_pid(peer_id, config)
      end

    {:reply, pid, config}
  end

  def get_local_ping_server_pid(peer_id, _config) do
    PingServer.get_server_pid(peer_id)
  end

  def get_remote_ping_server_pid(peer_id, config) do
    peer_name = config[:peer_names][peer_id]

    if peer_name in Node.list() do
      PingServer.get_server_pid(peer_id)
    else
      case Node.connect(peer_name) do
        true ->
          :timer.sleep(500)
          PingServer.get_server_pid(peer_id)

        _ ->
          Logger.error("Cannot connect to #{peer_name}")
          :undefined
      end
    end
  end
end
If you're a careful reader, you might have noticed how I slipped in a :timer.sleep after Node.connect. I added it as a quick hack because :global.whereis_name returned :undefined without it. Sarat explained to me that this happens because OTP takes a small amount of time to propagate global names across the cluster.
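One way to avoid the fixed sleep (a sketch of an alternative, not what the repo does) is to poll :global.whereis_name until the name has propagated.

defmodule RetryLookup do
  require Logger

  # Poll :global.whereis_name/1 a few times instead of sleeping a fixed
  # 500 ms after Node.connect. Purely illustrative.
  def wait_for_global_name(name, retries \\ 10, interval_ms \\ 100) do
    case :global.whereis_name(name) do
      :undefined when retries > 0 ->
        :timer.sleep(interval_ms)
        wait_for_global_name(name, retries - 1, interval_ms)

      :undefined ->
        Logger.error("Global name #{inspect(name)} did not propagate in time")
        :undefined

      pid ->
        pid
    end
  end
end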
Running Simulations
The default values recommended by Vivaldi and Serf are excellent, but you can tune them based on your cluster. So I used an additional Controller node which sends commands to configure the peers.
Elixir's pipes came in really handy in modelling the setup operations.
defmodule Controller do
  def run(peers, base_config) do
    peers
    |> connect()
    |> check_status(:not_started)
    |> generate_peer_configs(base_config)
    |> send_command(:configure)
    |> check_status(:just_started)
    |> send_command(:get_ready)
    |> check_status(:ready)
    |> send_command(:begin_pings)
  end
end
Here the controller first connects to each peer, checks that each peer is up, generates configuration information containing the node_ids and IP addresses, sends the :configure command, checks that the configuration succeeded, and so on.
(The Controller code on GitHub is not yet as simple, since this was the last part of the project, and I just wanted to get the system working at this point!)
Pattern Matching and State Machines
Each peer runs a Coordinator process which receives commands from the Controller. I modelled the Coordinator as a simple state machine. If you've ever done any UI programming, you'll have written code to prevent users from clicking a button repeatedly and triggering an action more than once. Elixir's (and Erlang's) pattern matching provides this kind of guard for free.
defmodule Coordinator do
  def handle_call({:configure, config}, _from, {:not_started, node_id}) do
    # application logic
    {:reply, :ok, {:just_started, node_id}}
  end

  def handle_call({:begin_pings}, _from, {:just_started, node_id}) do
    # application logic
    {:reply, :ok, {:pinging, node_id}}
  end

  def handle_call(:get_status, _from, {status, node_id}) do
    {:reply, {:ok, status}, {status, node_id}}
  end
end
The Coordinator accepts the :configure command only when it is in the :not_started state, and then moves to the :just_started state. Similarly, it accepts the :begin_pings command only when it is in the :just_started state, and then moves to the :pinging state.
Tooling
Elixir has excellent tooling, considering it's a young language and ecosystem.
- The build tool, mix, comes with support for unit testing out of the box, which encouraged me to write unit tests from the beginning. Similarly, the Logger package is also available by default.
- Hex, the package manager, is integrated with mix and is very easy to use as well.
- I also loved interactive development using iex's recompile command. I wish Python's importlib.reload() function was just as intelligent.
I faced issues with assigning names and cookies to nodes dynamically, so I asked a StackOverflow question and quickly received an answer. I still had to resort to a couple of hacks to get the system working, but that was expected, considering this was my first Elixir project and I'm not familiar with Erlang's ecosystem.
Overall, I came to understand the appeal of Erlang/OTP for building distributed, soft real-time systems. This was also the first time I'd used a functional programming language, and Elixir eased the learning curve. Praveen explained how we don't have to use complex concepts to be productive with Elixir, and that resonated with me as well. I'm now looking forward to using Elixir in my upcoming projects!