Lamport Timestamp
How do you order events that happen between and within distributed processes? Let’s assume that each process keeps a list of:
- All the events that happened within itself
- All the events related to sending and receiving messages between processes
To sort the list of events, we need to determine whether a particular event happened before another. One option would be to tag each event with the DateTime at which it occurred. Alternatively, one could use a Unix timestamp instead of a DateTime and avoid the hassle of time zones and daylight saving time. But there is no guarantee that time runs the same in all the distributed processes. Time synchronisation with NTP has an accuracy within tens of milliseconds, assuming the network is good and polling happens within 36 hours[1]. Another risk is the leap second and the documented problems arising from it[2].
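To make that risk concrete, here is a minimal sketch with made-up numbers (the 30 ms lag on process B's clock and the 10 ms network delay are assumptions, not measurements). When B's clock lags A's by more than the message's travel time, B stamps the receipt with an earlier Unix time than A stamped the send, so sorting by wall-clock time puts the effect before its cause. SkewDemo is a hypothetical module name.

```elixir
defmodule SkewDemo do
  # Assumed values for illustration only.
  @b_clock_lag_ms 30
  @network_delay_ms 10

  # Each process stamps its event with its own wall clock (Unix time in ms).
  # Returns the event names sorted by those stamps, oldest first.
  def sorted_by_wall_clock(send_time_at_a) do
    # B reads its lagging clock when the message arrives.
    receive_time_at_b = send_time_at_a + @network_delay_ms - @b_clock_lag_ms

    [{"A sends msg", send_time_at_a}, {"B receives msg", receive_time_at_b}]
    |> Enum.sort_by(fn {_name, unix_ms} -> unix_ms end)
    |> Enum.map(fn {name, _unix_ms} -> name end)
  end
end

SkewDemo.sorted_by_wall_clock(1_700_000_000_000)
# => ["B receives msg", "A sends msg"]
```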
DateTime and Unix timestamps are considered Physical Clocks. The alternative to them is Logical Clocks, and we are going to implement one such clock, called the Lamport Timestamp (or Lamport’s Logical Clock). The implementation is heavily inspired by Leslie Lamport’s famous paper, “Time, Clocks, and the Ordering of Events in a Distributed System”[3]. From here on, “clock” refers to a Logical Clock. Below are a few definitions extracted from the paper.
Definition (DEF)
- DEF 1: If a and b are events in the same process and a happens before b, then a -> b. The -> means “happens before”.
- DEF 2: If a is the sending of a message by one process and b is the receipt of the same message by another process, then a -> b.
- DEF 3: If a -> b and b -> c, then a -> c. This means the ordering of events is transitive.
- DEF 4: Two distinct events a and b are said to be concurrent if a -/> b and b -/> a. This happens when a and b occur in separate processes and no message, or chain of messages through other processes, links one event to the other.
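These definitions can be checked mechanically. Below is a minimal sketch, not taken from the paper or the article’s repository: the HappensBefore module and its {process, seq} event representation are hypothetical. a -> b is decided by following same-process edges (DEF 1) and message edges (DEF 2) until b is reached, which is what transitivity (DEF 3) allows; concurrency (DEF 4) is the absence of a path in either direction. The sample data mirrors the k, j and i scenario built later in this post.

```elixir
defmodule HappensBefore do
  # Hypothetical model: an event is {process, seq}, where seq is the event's
  # 1-based position within its process. A message is a
  # {sending_event, receiving_event} pair.

  # a -> b if b can be reached from a by following DEF 1 edges (the next event
  # in the same process) and DEF 2 edges (sending to receipt); chaining those
  # edges is what DEF 3 (transitivity) permits.
  def happens_before?(a, b, events, messages) do
    reachable?(successors(a, events, messages), b, events, messages, MapSet.new())
  end

  # DEF 4: distinct events with no path between them in either direction.
  def concurrent?(a, b, events, messages) do
    a != b and
      not happens_before?(a, b, events, messages) and
      not happens_before?(b, a, events, messages)
  end

  # Direct successors of an event: the next event in the same process, plus
  # the receipt of any message this event sends.
  defp successors({process, seq} = event, events, messages) do
    next = {process, seq + 1}
    same_process = if next in events, do: [next], else: []
    via_message = for {sending, receiving} <- messages, sending == event, do: receiving
    same_process ++ via_message
  end

  # Depth-first search that skips events it has already expanded.
  defp reachable?([], _target, _events, _messages, _visited), do: false
  defp reachable?([target | _rest], target, _events, _messages, _visited), do: true

  defp reachable?([event | rest], target, events, messages, visited) do
    if MapSet.member?(visited, event) do
      reachable?(rest, target, events, messages, visited)
    else
      frontier = successors(event, events, messages) ++ rest
      reachable?(frontier, target, events, messages, MapSet.put(visited, event))
    end
  end
end

events = [{:k, 1}, {:k, 2}, {:k, 3}, {:j, 1}, {:j, 2}, {:j, 3}, {:j, 4}, {:i, 1}, {:i, 2}]
# k's 2nd event sends to j's 1st; j's 3rd event sends to i's 1st.
messages = [{{:k, 2}, {:j, 1}}, {{:j, 3}, {:i, 1}}]

HappensBefore.happens_before?({:k, 1}, {:i, 2}, events, messages)
# => true
HappensBefore.concurrent?({:k, 3}, {:j, 1}, events, messages)
# => true
```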
Distributed events between processes
The logical clock can be implemented simply as a counter. To see which clock belongs to which process, we’ll create a Clock struct with a timestamp field to act as the counter and a process_name field to identify the process it belongs to. For convenience, the code below also includes two functions for interfacing with the Clock:
- increment, which returns a new Clock with an incremented timestamp
- timestamp, which returns the timestamp value of a Clock
The full implementation can be accessed here, and each module has a comment in its first line naming the corresponding file in the repository.
# clock.ex
defmodule Clock do
  defstruct [:timestamp, :process_name]

  def increment(clock = %Clock{timestamp: timestamp}) do
    %Clock{ clock | timestamp: timestamp + 1 }
  end

  def timestamp(clock) do
    Map.get(clock, :timestamp)
  end
end
For events, we want to know the name of the event as well as when it happened, so each event carries a clock too. Here is the Event struct that we’ll be using later on.
# event.ex
defmodule Event do
  defstruct [:name, :clock]

  def timestamp(event) do
    Map.get(event, :clock)
    |> Map.get(:timestamp)
  end

  def process_name(event) do
    Map.get(event, :clock)
    |> Map.get(:process_name)
  end
end
Ordering events
Let’s build something to demonstrate the clock and the ordering of events between distributed processes. We’ll emulate distributed processes using GenServer, and the application we’ll build is the Worst Random String Generator (WRSG).
We won’t focus on actually building the logic for the generator; the goal is to demonstrate a distributed system that satisfies DEF 1 to DEF 4.
Implementation Rules (IR)
The implementation rules for the events and clock are quite straightforward:
- IR1: Each process has its own internal logical clock
- IR2: When an event happens within the process, increment the clock and assign it to the event
- IR3: When a process wants to send a message to another process, it creates a sent event
- IR4: When a process receives a message from another process, it creates a received event
We’ll start simple, with intra-process events. The WRSG will generate strings one char at a time. Every time the process receives a command to generate a char, it will create an event, following IR1 and IR2 above.
Here is the starting code:
# generator1.ex
require Event
require Clock

defmodule Generator do
  use GenServer

  def generate_char(name) do
    GenServer.cast(name, :generate_char)
  end

  def get_events(name) do
    GenServer.call name, :events
  end

  def start_process(name) do
    GenServer.start_link __MODULE__, %{name: name}, name: name
  end

  ### GENSERVER IMPLEMENTATIONS ###

  @impl true
  def init(%{name: name}) do
    {:ok, %{name: name, events: [], clock: %Clock{process_name: name, timestamp: 0}}}
  end

  @doc """
  Returns all the events stored in the state
  """
  @impl true
  def handle_call(:events, _from, state = %{ events: events }) do
    {:reply, events, state}
  end

  @doc """
  The process uses this to generate a char internally
  """
  @impl true
  def handle_cast(:generate_char, state = %{clock: clock, events: events}) do
    updated_clock = Clock.increment(clock)
    new_event = %Event{name: :generate_char, clock: updated_clock}
    state = %{ state | clock: updated_clock }
    {:noreply, %{state | events: [ new_event | events] } }
  end
end
The code uses the Clock and Event structs defined earlier. When the process starts, in init, you’ll notice that the process initializes a Clock and stores it in its state. This is its internal logical clock, which fulfils IR1.
Next, in handle_cast(:generate_char, ...), the process:
- Gets its internal clock and increments it
- Creates a new Event with the updated clock and prepends it to its list of events
- Updates its internal clock
Together, these steps fulfil IR2.
Let’s try running this in iex (assuming Clock, Event and Generator are already compiled):
iex> Generator.start_process(:k)
iex> Generator.generate_char(:k)
iex> Generator.generate_char(:k)
iex> Generator.get_events(:k)
[
%Event{clock: %Clock{process_name: :k, timestamp: 2}, name: :generate_char},
%Event{clock: %Clock{process_name: :k, timestamp: 1}, name: :generate_char}
]
Events within a process are simple enough. We’ll move on to the interesting bit: combining them with events between processes. The goal is to have three processes, each with its own tasks to complete, sending messages from one process to another. Below is a space-time diagram illustrating what we want to achieve.
Each vertical line is a process, each arrow is a message sent from one process to another, and each dot is an event. The events are colour coded. The vertical direction shows the passage of “time”, from oldest at the bottom to latest at the top. The horizontal direction represents space, indicating that the processes are isolated from one another.
If we want our processes to behave as in Fig. 1, calling generate_char from iex will not be adequate. We need a way to tell each process which tasks it should perform. To achieve that, we’ll add a new field to the process state: an anonymous function containing the steps for the process to execute. Since the tasks live inside the anonymous function, we also need a way to kick off the process, so let’s create a new function for that as well. Here are the changes and the new functions:
# generator2.ex
defmodule Generator do
  # Only showing the changes from the previous Generator code

  @doc """
  func will be a function that contains the tasks to be executed
  by the process when run() is called
  """
  def start_process(name, func) do
    GenServer.start_link __MODULE__, %{name: name, func: func}, name: name
  end

  def run(name) do
    GenServer.cast name, :run
  end

  ### GENSERVER IMPLEMENTATIONS ###

  @impl true
  def init(%{name: name, func: func}) do
    {:ok, %{name: name, func: func, events: [], clock: %Clock{process_name: name, timestamp: 0}}}
  end

  @doc """
  This is for kickstarting the execution
  """
  @impl true
  def handle_cast(:run, state = %{ func: func }) do
    func.()
    {:noreply, state}
  end
end
Fire up iex again and let’s try to generate the same list of events as we did previously.
iex> fun = fn() ->
> Generator.generate_char(:k)
> Generator.generate_char(:k)
> end
iex> Generator.start_process(:k, fun)
iex> Generator.run(:k)
iex> Generator.get_events(:k)
[
%Event{clock: %Clock{process_name: :k, timestamp: 2}, name: :generate_char},
%Event{clock: %Clock{process_name: :k, timestamp: 1}, name: :generate_char}
]
Great! Now we can start different Generator processes that run different steps if needed. Next, we need a few functions to send and receive messages between processes. A process will use these functions to ask another process to do work. For convenience, a few functions are also added to start the three different processes, each with the tasks it should execute. There is also a new function, get_all_events, for gathering events from all the processes.
# generator3.ex
defmodule Generator do
  # Only showing the changes from the previous Generator code

  def start_process_k do
    fun = fn() ->
      Generator.generate_char(:k)
      Generator.send_generate_message(:k, :j)
      Generator.generate_char(:k)
    end
    Generator.start_process(:k, fun)
  end

  def start_process_j do
    fun = fn() ->
      Generator.generate_char(:j)
      Generator.send_generate_message(:j, :i)
      Generator.generate_char(:j)
    end
    Generator.start_process(:j, fun)
  end

  def start_process_i do
    fun = fn() ->
      Generator.generate_char(:i)
    end
    Generator.start_process(:i, fun)
  end

  def get_all_events do
    get_events(:k)
    |> Enum.concat(get_events(:j))
    |> Enum.concat(get_events(:i))
  end

  def send_generate_message(from, to) do
    GenServer.cast from, {:send_generate_message, to}
  end

  ### GENSERVER IMPLEMENTATIONS ###

  @doc """
  This simulates an API for a process to receive messages from other processes to generate chars
  """
  @impl true
  def handle_cast({ :generate, from }, state = %{ name: name }) do
    GenServer.cast name, {:received, from}
    run(name)
    {:noreply, state}
  end

  @doc """
  A process uses this function to send a message to another process
  """
  @impl true
  def handle_cast({ :send_generate_message, to}, state = %{ name: name, events: events, clock: clock }) do
    updated_clock = Clock.increment(clock)
    new_event = %Event{name: "sent to #{to}", clock: updated_clock }
    state = %{ state | events: [ new_event | events ] }
    GenServer.cast to, { :generate, name }
    {:noreply, %{ state | clock: updated_clock }}
  end

  @doc """
  Here is the logic for creating an event to indicate the process has received a message.
  """
  @impl true
  def handle_cast({ :received, from }, state = %{ events: events, clock: clock }) do
    updated_clock = Clock.increment(clock)
    new_event = %Event{name: "received from #{from}", clock: updated_clock}
    state = %{ state | events: [ new_event | events] }
    {:noreply, %{ state | clock: updated_clock }}
  end
end
The main changes are the three new handle_cast clauses. The first, handle_cast({ :generate, ...}), is how a process receives a message from another process asking it to begin executing its tasks. Notice that before it calls run, the first thing the process does is cast { :received, ...} to itself; that message is handled by handle_cast({ :received, ...}), which creates a new event to mark that a message has been received. The remaining clause, handle_cast({ :send_generate_message, ...}), exists mainly to create a new event before actually sending the message.
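The { :generate, ...} clause relies on a guarantee of the BEAM: messages sent from one process to another (including from a process to itself) arrive in the order they were sent, so the { :received, ...} cast is always handled before the :run cast. A minimal, self-contained sketch of that guarantee, using a hypothetical MailboxOrder module that is not part of the article’s code:

```elixir
defmodule MailboxOrder do
  use GenServer

  # Keeps every cast it receives in a log (newest first internally).
  @impl true
  def init(log), do: {:ok, log}

  @impl true
  def handle_cast(message, log), do: {:noreply, [message | log]}

  # Replies with the log oldest first.
  @impl true
  def handle_call(:log, _from, log), do: {:reply, Enum.reverse(log), log}

  # Sends {:received, :k} then :run from the same caller, then reads the log.
  def demo do
    {:ok, pid} = GenServer.start_link(__MODULE__, [])
    GenServer.cast(pid, {:received, :k})
    GenServer.cast(pid, :run)
    log = GenServer.call(pid, :log)
    GenServer.stop(pid)
    log
  end
end

MailboxOrder.demo()
# => [{:received, :k}, :run]
```

The call is sent after both casts by the same caller, so it is also processed after them, which is why get_events in the iex sessions sees every event generated by earlier casts.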
Let’s give it a run in iex:
iex> Generator.start_process_k
iex> Generator.start_process_j
iex> Generator.start_process_i
iex> Generator.run :k
Now that all the processes are running, if we call Generator.get_all_events() and sort the resulting list of events, we should get something like the following:
iex> events = Generator.get_all_events()
iex> Enum.sort(events, &(Event.timestamp(&1) > Event.timestamp(&2)))
[
%Event{clock: %Clock{process_name: :j, timestamp: 4}, name: :generate_char},
%Event{clock: %Clock{process_name: :j, timestamp: 3}, name: "sent to i"},
%Event{clock: %Clock{process_name: :k, timestamp: 3}, name: :generate_char},
%Event{clock: %Clock{process_name: :i, timestamp: 2}, name: :generate_char},
%Event{clock: %Clock{process_name: :j, timestamp: 2}, name: :generate_char},
%Event{clock: %Clock{process_name: :k, timestamp: 2}, name: "sent to j"},
%Event{clock: %Clock{process_name: :i, timestamp: 1}, name: "received from j"},
%Event{clock: %Clock{process_name: :j, timestamp: 1}, name: "received from k"},
%Event{clock: %Clock{process_name: :k, timestamp: 1}, name: :generate_char}
]
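DEF 2 gives a mechanical check for this output: every received event should carry a larger timestamp than the sent event it pairs with. A minimal sketch of such a check, using hypothetical {process, name, timestamp} tuples transcribed from the output above instead of Event structs, and assuming each process sends at most one message to a given receiver so that the names pair up unambiguously (ClockConditionCheck is a hypothetical name):

```elixir
defmodule ClockConditionCheck do
  # Returns the {sent, received} pairs that break DEF 2, i.e. where the
  # received event's timestamp is not larger than the sent event's.
  # Events whose name is not a "sent to ..." / "received from ..." string
  # (such as :generate_char) fail to match the generator patterns and are skipped.
  def violations(events) do
    for {sender, "sent to " <> receiver_name, sent_ts} = sent <- events,
        {receiver, "received from " <> sender_name, received_ts} = received <- events,
        Atom.to_string(receiver) == receiver_name,
        Atom.to_string(sender) == sender_name,
        received_ts <= sent_ts,
        do: {sent, received}
  end
end

before_fix = [
  {:j, :generate_char, 4}, {:j, "sent to i", 3}, {:k, :generate_char, 3},
  {:i, :generate_char, 2}, {:j, :generate_char, 2}, {:k, "sent to j", 2},
  {:i, "received from j", 1}, {:j, "received from k", 1}, {:k, :generate_char, 1}
]

ClockConditionCheck.violations(before_fix)
# => [{{:j, "sent to i", 3}, {:i, "received from j", 1}},
#     {{:k, "sent to j", 2}, {:j, "received from k", 1}}]
```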
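Compared to Fig. 1, the events are not sorted properly. There are a couple of reasons for this, but one of them is that our implementation violates DEF 2: j’s “received from k” event has timestamp 1, lower than the timestamp 2 of k’s “sent to j” event, even though the sending happens before the receipt. To fix that, we have to alter our implementation rules a bit. The changes are in the last rule: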
Updated Implementation Rules
- IR1: Each process has its own internal logical clock
- IR2: When an event happens within the process, increment the clock and assign it to the event
- IR3: When a process wants to send a message to another process, it creates a sent event
- IR4: When a process receives a message from another process, it:
  - Updates its internal logical clock to max(message_timestamp, process_timestamp)
  - Creates a received event
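The updated rules can be sketched as two pure functions. This is a simplified model, not the article’s GenServer code: the LamportRules module is hypothetical and uses a bare integer as the clock instead of the Clock struct. trace/0 replays the k, j and i scenario from generator3.ex by hand, and the timestamps it produces match the sorted output shown after generator4.ex.

```elixir
defmodule LamportRules do
  # Simplified model: the clock is a bare integer instead of a Clock struct.

  # IR2 and IR3: an internal or sent event increments the clock.
  def tick(clock), do: clock + 1

  # IR4: jump to the larger of the two timestamps, then increment it for the
  # received event itself (the same "+ 1" that generator4.ex applies).
  def receive_message(clock, message_timestamp), do: max(clock, message_timestamp) + 1

  # Replays the k, j and i scenario by hand; every clock starts at 0.
  def trace do
    k1 = tick(0)                   # k: generate_char
    k2 = tick(k1)                  # k: sent to j (message carries k2)
    k3 = tick(k2)                  # k: generate_char
    j1 = receive_message(0, k2)    # j: received from k
    j2 = tick(j1)                  # j: generate_char
    j3 = tick(j2)                  # j: sent to i (message carries j3)
    j4 = tick(j3)                  # j: generate_char
    i1 = receive_message(0, j3)    # i: received from j
    i2 = tick(i1)                  # i: generate_char
    %{k: [k1, k2, k3], j: [j1, j2, j3, j4], i: [i1, i2]}
  end
end

LamportRules.trace()
# => %{i: [6, 7], j: [3, 4, 5, 6], k: [1, 2, 3]}
```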
Let’s update a few functions. The changes are marked with comments in the code.
# generator4.ex
defmodule Generator do
  # Only showing the changes from the previous Generator code

  ### GENSERVER IMPLEMENTATIONS ###

  @doc """
  This simulates an API for a process to receive messages from other processes to generate chars
  """
  @impl true
  def handle_cast({ :generate, message_timestamp, from }, state = %{ name: name }) do
    # CHANGE: pass along the sender's message_timestamp
    GenServer.cast name, {:received, message_timestamp, from}
    run(name)
    {:noreply, state}
  end

  @doc """
  A process uses this function to send a message to another process
  """
  @impl true
  def handle_cast({ :send_generate_message, to}, state = %{ name: name, events: events, clock: clock }) do
    updated_clock = Clock.increment(clock)
    new_event = %Event{name: "sent to #{to}", clock: updated_clock }
    state = %{ state | events: [ new_event | events ] }
    # CHANGE: send the sender's timestamp to the receiver
    GenServer.cast to, { :generate, Clock.timestamp(updated_clock), name }
    {:noreply, %{ state | clock: updated_clock }}
  end

  @doc """
  Here is the logic for creating an event to indicate the process has received a message.
  """
  @impl true
  def handle_cast({ :received, message_timestamp, from }, state = %{ events: events, clock: clock }) do
    # CHANGE: take the later of the two timestamps, increment it (IR2)
    # and create the received event with it
    latest_timestamp = max(Clock.timestamp(clock), message_timestamp)
    updated_clock = %Clock{ clock | timestamp: latest_timestamp + 1 }
    new_event = %Event{name: "received from #{from}", clock: updated_clock}
    state = %{ state | events: [ new_event | events] }
    {:noreply, %{ state | clock: updated_clock }}
  end
end
If we repeat all the steps and sort the events again, we’ll get:
[
%Event{clock: %Clock{process_name: :i, timestamp: 7}, name: :generate_char},
%Event{clock: %Clock{process_name: :i, timestamp: 6}, name: "received from j"},
%Event{clock: %Clock{process_name: :j, timestamp: 6}, name: :generate_char},
%Event{clock: %Clock{process_name: :j, timestamp: 5}, name: "sent to i"},
%Event{clock: %Clock{process_name: :j, timestamp: 4}, name: :generate_char},
%Event{clock: %Clock{process_name: :j, timestamp: 3}, name: "received from k"},
%Event{clock: %Clock{process_name: :k, timestamp: 3}, name: :generate_char},
%Event{clock: %Clock{process_name: :k, timestamp: 2}, name: "sent to j"},
%Event{clock: %Clock{process_name: :k, timestamp: 1}, name: :generate_char}
]
Now it looks better. But a couple of pairs of events still share the same timestamp: j’s “received from k” and k’s generate_char at 3, and i’s “received from j” and j’s generate_char at 6. Because their values are equal, nothing in the timestamps decides which of them comes first, and they can swap places between orderings. These events are examples of concurrent events, as described in DEF 4, and this kind of ordering is called partial ordering.
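Because the updated rules guarantee that a -> b implies a smaller timestamp for a (Lamport’s clock condition), two distinct events with the same timestamp cannot be causally related, so ties point directly at concurrent events. The converse does not hold: concurrent events can also have different timestamps, such as k’s generate_char at 3 and j’s generate_char at 4. A minimal sketch, with a hypothetical TieFinder module and the output above transcribed as {process, name, timestamp} tuples:

```elixir
defmodule TieFinder do
  # Groups events by timestamp and keeps only the groups with more than one
  # event, sorted by timestamp. Enum.group_by keeps the input order inside
  # each group.
  def ties(events) do
    events
    |> Enum.group_by(fn {_process, _name, ts} -> ts end)
    |> Enum.filter(fn {_ts, group} -> length(group) > 1 end)
    |> Enum.sort()
  end
end

after_fix = [
  {:i, :generate_char, 7}, {:i, "received from j", 6}, {:j, :generate_char, 6},
  {:j, "sent to i", 5}, {:j, :generate_char, 4}, {:j, "received from k", 3},
  {:k, :generate_char, 3}, {:k, "sent to j", 2}, {:k, :generate_char, 1}
]

TieFinder.ties(after_fix)
# => [{3, [{:j, "received from k", 3}, {:k, :generate_char, 3}]},
#     {6, [{:i, "received from j", 6}, {:j, :generate_char, 6}]}]
```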
The paper suggests that we can achieve total ordering, where there is no ambiguity and no events swapping places, if we find a way to break ties. One way is to give each process a weight and use it as a tiebreaker. For example, we could decide that process k > j > i, and if any events share the same timestamp, fall back to that process hierarchy to determine which event sits higher in the ordering. Since Elixir compares atoms by their string value, this can be achieved by changing the sorting function to:
&(Event.timestamp(&1) > Event.timestamp(&2) || (Event.timestamp(&1) == Event.timestamp(&2) && Event.process_name(&1) > Event.process_name(&2)))
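Tuples in Elixir compare element by element, so the same tiebreak can also be expressed as a sort key. A minimal sketch assuming Elixir 1.10 or later (for the :desc option of Enum.sort_by/3), again with events as {process, name, timestamp} tuples rather than Event structs; TotalOrder is a hypothetical name:

```elixir
defmodule TotalOrder do
  # Newest first: compare timestamps, and on a tie compare process names
  # (atoms compare by their text, so :k > :j > :i).
  def sort(events) do
    Enum.sort_by(events, fn {process, _name, ts} -> {ts, process} end, :desc)
  end
end

after_fix = [
  {:i, :generate_char, 7}, {:i, "received from j", 6}, {:j, :generate_char, 6},
  {:j, "sent to i", 5}, {:j, :generate_char, 4}, {:j, "received from k", 3},
  {:k, :generate_char, 3}, {:k, "sent to j", 2}, {:k, :generate_char, 1}
]

TotalOrder.sort(after_fix)
# => [{:i, :generate_char, 7}, {:j, :generate_char, 6}, {:i, "received from j", 6},
#     {:j, "sent to i", 5}, {:j, :generate_char, 4}, {:k, :generate_char, 3},
#     {:j, "received from k", 3}, {:k, "sent to j", 2}, {:k, :generate_char, 1}]
```

Because no process has two events with the same timestamp, every {timestamp, process} key is unique, so the result no longer depends on the order in which the events were collected.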
So far, we have learned how to use Logical Clocks instead of Physical Clocks for generating and ordering events in a distributed system. In part 2, we’ll explore how to use Lamport Timestamp for making decisions within a distributed system and how to handle out-of-order events.