Distributed Databases

A Big Overview (not going into the tiny details) of CS4224 — A module taken in NUS.

Nov 23, 2020

Note: some figures are taken from my school’s lecture notes.

The topics covered include:

Data Fragmentation: How to partition the data and decide which database shard should contain which part of the data?

Data Storage: How do databases perform reads and writes? What is the underlying data structure used to store data?

Data Concurrency: Each server might receive more than one transaction concurrently. How do we know what order to perform them? How do we ensure this ordering of transactions is maintained across different replica sites and executed in isolation of one another?

Data Reliability: This section deals with ensuring the atomicity of a Xact. How do we know whether to abort or commit a Xact? A Xact may involve interacting with ≥ 1 site and we need to ask across all servers and ensure that all related servers arrive at the same decision regardless of server failures. 2PC -> 3PC.

Data Replication Protocols: After a Xact has been confirmed to have committed, we will then need to replicate the data to replica servers. Different types of protocols can be adopted to ensure that each server contains the latest data version. What form of consistency levels do we want to achieve?

Data Consistency: Going Further into the different types of consistency levels

Query Processing and Optimisation: How should we rewrite the logical plan for queries to make them as efficient as possible?

Important to distinguish between these two concepts:

Data Fragmentation: The splitting up of parts of the overall database across different sites.

Data Replication: The process of maintaining updates to replica data across different sites.

(1) Data Fragmentation

One key problem that comes with distributed databases is deciding how to fragment the data to be stored in the various sites.

Why do we want to do data fragmentation:

  • Locality of access
  • Scaling purposes
  • Allow execution of parallelised query

Disadvantages of data fragmentation:

  • An over-fragmented database may make querying less efficient
  • Integrity constraint checking may be more complex, since we need to check across different sites

Fragmentation Strategies:

  • Horizontal Fragmentation: Each fragment stores a certain subset of the rows
  1. Range Partitioning: Divide the rows according to the range that an attribute value falls into
  2. Hash Partitioning: Modulo hashing of a certain attribute of a row OR Consistent Hashing (with virtual nodes). Note: Consistent Hashing is quite an interesting subject on its own.
  3. Primary Horizontal Fragmentation: Similar to range partitioning but exploits the predicates in the query workload
  4. Derived Horizontal Fragmentation: Partition a relation R based on the partitioning defined for a related relation S. R.A must be a subset of S.A to ensure a complete join, and S.A must be a key of S to prevent duplicates. For the partitioning of R to be complete and disjoint, R.A must be a foreign key referencing S, with non-null values of R.A.
  • Vertical Fragmentation: Each fragment stores certain columns (always include the primary key column in each fragment)
  • Hybrid Fragmentation: Mix of the above 2
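
To make range and modulo-hash partitioning concrete, here is a minimal Python sketch. The function names, the boundary values and the use of CRC32 as the hash are my own assumptions for illustration, not something prescribed by the module.

```python
import bisect
import zlib

def range_partition(value, boundaries):
    """Return the fragment index for `value` given sorted boundary values.

    boundaries=[100, 200] defines three fragments:
    (-inf, 100), [100, 200) and [200, +inf).
    """
    return bisect.bisect_right(boundaries, value)

def hash_partition(key, num_sites):
    """Modulo hashing: a stable hash of the key, modulo the number of sites."""
    # Assumption: CRC32 is used only because Python's built-in hash() for
    # strings is randomised per process and so is not stable across servers.
    return zlib.crc32(str(key).encode("utf-8")) % num_sites

# Hypothetical rows, fragmented horizontally on the "age" attribute.
rows = [{"id": 7, "age": 15}, {"id": 8, "age": 150}, {"id": 9, "age": 200}]
by_range = {}
for row in rows:
    by_range.setdefault(range_partition(row["age"], [100, 200]), []).append(row["id"])
```

With modulo hashing, changing `num_sites` remaps most keys, which is the problem that consistent hashing is meant to reduce.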

Desirable Properties of Fragmentation:

  • Completeness: We want to ensure that we do not lose any rows
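  • Reconstruction: We must be able to derive the original set of data rows from the fragments (that’s why vertical fragmentation needs to have the primary key in every fragment, so that the columns can be joined back together)
  • Disjointness: We cannot have duplicated data across fragments. For horizontal fragmentation, no row appears in more than one fragment; for vertical fragmentation, no non-key column appears in more than one fragment.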

Min Term Predicates and Complete Partitioning:

  • Let P = {p1, p2, …, pn} be a set of selection predicates on the relation R. A minterm predicate m for P is a conjunction p1* ∧ p2* ∧ … ∧ pn*, where each pi* is either pi or ¬pi. In other words, for every predicate we either keep it or negate it.
  • Let F = {R1, R2, …, Rm} be a possible partition of the relation R.
    F is a complete partitioning of R with respect to a query Q if for every fragment Ri, either every tuple in Ri matches Q or every tuple in Ri does not match Q.
  • If F = {R1, R2, …, Rm} is a minterm partitioning of R (each fragment is the set of tuples satisfying one minterm predicate for P), then F is a complete partitioning with respect to every query whose selection predicate is one of the predicates in P.
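
As a rough illustration of minterm partitioning, the sketch below groups rows by the True/False outcome of each predicate. The relation, the two predicates and the function name are hypothetical.

```python
def minterm_fragments(rows, predicates):
    """Group rows by which predicates they satisfy.

    Each distinct tuple of True/False outcomes corresponds to one minterm
    predicate (True keeps p_i, False negates it). Minterms that no row
    satisfies simply do not appear in the result.
    """
    fragments = {}
    for row in rows:
        signature = tuple(bool(p(row)) for p in predicates)
        fragments.setdefault(signature, []).append(row)
    return fragments

# Hypothetical relation and predicate set P = {age < 30, city = 'SG'}.
rows = [
    {"name": "a", "age": 20, "city": "SG"},
    {"name": "b", "age": 45, "city": "SG"},
    {"name": "c", "age": 45, "city": "KL"},
]
predicates = [lambda r: r["age"] < 30, lambda r: r["city"] == "SG"]
frags = minterm_fragments(rows, predicates)
```

Every row lands in exactly one fragment, and within a fragment all rows agree on every predicate in P, which is the "either all match or none match" property from the definition of complete partitioning.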

(2) Data Storage & Indexing

Secondary Indexes

We might have heard of B+ tree indexing in databases. In a B+ tree, updates are performed in place, and the posting list (associated values) for each key is stored together. MongoDB is an example of a database that uses B-tree indexes.

An alternative is Log-Structured Merge (LSM) Storage, which is the focus of this section. LSM Storage performs append-only updates, and its on-disk files are immutable. The posting list for each key could be fragmented across different SSTables (to be introduced later). An example of its usage is in Cassandra.

What does LSM Storage look like:

An LSM Storage for a relation R consists of 3 main items: (1) MemTable (2) SSTables (3) Commit Log File

The MemTable is a table kept in main memory (represented by the C0 tree in the figure). It contains the latest updates made by the application and is updated in place. However, note that deleted records are not removed but marked with tombstones. When the size of a MemTable exceeds a certain threshold, it is flushed to disk as a new SSTable.

SSTables (Sorted String Tables) are immutable structures: once written they are never modified, and new data only ever arrives as new SSTables. Each SSTable has an associated range of key values and a single timestamp. For example, SSTable 3 is created later than SSTable 1, so it has a later timestamp and contains more recent versions of object values. Note that the records within a single SSTable are sorted by their key values.

Big picture: Instead of writing SSTables to disk on every column update, LSM Storage keeps the updates in memory (MemTable) and flushes SSTables to disk periodically to keep the I/O at a reasonable level.

A Commit Log file is included as a backup so that the MemTable can be restored if the server crashes. This is needed because main memory is volatile and not durable. Therefore, each time we perform an update to the MemTable, we also append the update to the Commit Log and flush that to disk.

At this point, one question that comes to mind: since we still have to write the commit log to disk on top of updating the MemTable in memory, won’t the cost be higher than if we were to edit SSTables each time?

An explanation I saw in a Stack Overflow answer:

The CommitLog is optimized for writing. Unlike SSTables which store rows in sorted order, the CommitLog stores updates in the order which they were processed by Cassandra. The CommitLog also stores changes for all the column families in a single file so the disk doesn’t need to do a bunch of seeks when it is receiving updates for multiple column families at the same time.

Basically writing the CommitLog to the disk is better because it has to write less data than writing SSTables does and it writes all that data to a single place on disk.

The database keeps track of what data has been flushed to SSTables and is able to truncate the Commit log once all data older than a certain point has been written.

When the database fails and needs to be started up again, it has to read the commit log back from that last known good point in time (the point at which we know all previous writes were written to an SSTable). It re-applies the changes in the commit log to its MemTables so it can get into the same state when it stopped.

A pictorial visualisation of MemTables and SSTables. Note that the symbols with horizontal and vertical lines are tombstones used to mark deleted records.

A few more things to note:

  1. Deletion with tombstones: Why do we mark records with tombstones instead of deleting them immediately? If we simply removed the record from the current MemTable, the record could still appear in older SSTables. We might mistake that older record for the latest version and miss the fact that the record has been deleted. Thus, we need a way to record the act of deletion, and that is what tombstones do.
  2. Each SSTable contains sorted records, so we can use binary search to find the value we want. However, if the key ranges of the SSTables overlap (this depends on the compaction strategy, explained later), we might need to search through every SSTable to get the record we want.
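
The write path and read path described above can be sketched in a few lines of Python. This is a toy in-memory model under my own assumptions (the class name, a flush threshold counted in number of keys, and a Python list standing in for the on-disk commit log); it is not how Cassandra is actually implemented.

```python
import bisect

TOMBSTONE = object()  # marker stored in place of a deleted value

class LSMStore:
    def __init__(self, memtable_limit=2):
        self.memtable = {}       # latest updates, updated in place
        self.commit_log = []     # stands in for the on-disk commit log
        self.sstables = []       # oldest first; each is (sorted keys, values)
        self.memtable_limit = memtable_limit

    def put(self, key, value):
        self.commit_log.append((key, value))  # log first, for durability
        self.memtable[key] = value
        if len(self.memtable) >= self.memtable_limit:
            self.flush()

    def delete(self, key):
        self.put(key, TOMBSTONE)  # record the deletion instead of removing

    def flush(self):
        # Write the MemTable out as a new immutable SSTable sorted by key.
        items = sorted(self.memtable.items(), key=lambda kv: kv[0])
        self.sstables.append(([k for k, _ in items], [v for _, v in items]))
        self.memtable = {}
        self.commit_log = []      # everything logged is now in an SSTable

    def get(self, key):
        if key in self.memtable:
            value = self.memtable[key]
            return None if value is TOMBSTONE else value
        for keys, values in reversed(self.sstables):  # newest SSTable first
            i = bisect.bisect_left(keys, key)         # binary search
            if i < len(keys) and keys[i] == key:
                return None if values[i] is TOMBSTONE else values[i]
        return None

store = LSMStore(memtable_limit=2)
store.put("a", 1)
store.put("b", 2)      # MemTable is full: flushed as SSTable 1
store.delete("a")
store.put("c", 3)      # flushed as SSTable 2, containing the tombstone for "a"
```

Here `store.get("a")` finds the tombstone in the newer SSTable and stops, so the stale value in SSTable 1 is never returned.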

Compaction of SSTables | Maintenance Task to merge SSTable records

There is a need to do some housekeeping on our SSTables and make them as compact as we can. This improves read performance by defragmenting table records, and improves space utilisation by eliminating tombstones and stale values.

Compaction Strategies:

  • Size-tiered Compaction (STCS)
  • Leveled Compaction Strategy (LCS)

Size-Tiered Compaction Strategy

Compaction is triggered at a tier L when the number of SSTables reaches a threshold (eg, 4).

All SSTables in tier L are merged into a single SSTable stored in the next tier L+1. Tier L hence becomes empty after compaction.

Hence we observe that SSTables at higher tiers are bigger.

Pros: Good for write-intensive workloads, since compaction is triggered by the number of SSTables in a tier and not by their size. Having bigger SSTables also reduces the number of I/Os needed.

Cons: Can hold on to stale data for too long. Required memory increases over time. Reads are slower because the merge-by-size process does not group data by rows, which makes it more likely that versions of a particular row are spread over many SSTables. Also, STCS does not evict deleted data predictably: compaction only happens once enough SSTables of a similar size have accumulated, and that might not happen quickly enough to merge and evict old data.
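
A minimal sketch of the size-tiered trigger, assuming each SSTable is modelled as a Python dict and the threshold is 4 as in the example above. The function names are mine, and tombstone removal is left out to keep it short.

```python
def merge_sstables(tables):
    """Merge SSTables (dicts, oldest first) so newer values overwrite older ones."""
    merged = {}
    for table in tables:
        merged.update(table)
    return dict(sorted(merged.items()))

def add_sstable(tiers, table, threshold=4, tier=0):
    """Add `table` to tiers[tier]; compact into the next tier at the threshold."""
    while len(tiers) <= tier:
        tiers.append([])
    tiers[tier].append(table)
    if len(tiers[tier]) >= threshold:
        merged = merge_sstables(tiers[tier])
        tiers[tier] = []                                  # tier L becomes empty
        add_sstable(tiers, merged, threshold, tier + 1)   # may cascade upwards

tiers = []
for i in range(16):
    add_sstable(tiers, {f"k{i}": i})   # 16 flushes of one key each
```

After 16 flushes, tiers 0 and 1 are both empty and tier 2 holds a single SSTable with all 16 keys, which shows why SSTables in higher tiers are bigger.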

Leveled Compaction Strategy

SSTables are organised into levels: level 0, level 1…..

SSTables at level 0 may overlap (key range overlap), but SSTables at levels ≥ 1 do not overlap with other SSTables in their own level. An SSTable at level L ≥ 1 can overlap with at most F SSTables at the next level L+1.

Given this, except for level 0, we can view each level as a single sorted run. When searching a particular level, we just need to access one SSTable.

If a key appears in two SSTables at different levels i and j with i < j, the version at level i is more recent. Within a level i, SSTable S(i,j) is more recently created than S(i,k) if j > k.

Compaction is triggered when the size of all SSTables at a certain level exceeds a certain threshold size. (see picture below)

For levels ≥ 1, we select one SSTable to compact and merge it with the overlapping SSTables at the next level. We choose either the SSTable that comes after the one that was previously compacted, or the SSTable with the smallest start key value.

For Level = 0, we simply merge all SSTables at level 0 with all overlapping SSTables at level 1.

How does the compaction factor F impact search performance?

Increasing F reduces the number of levels in the LSM storage, since we allow more overlap between SSTables at adjacent levels and fewer compactions happen. This improves the worst-case I/O cost for searching.

However, a larger F increases the I/O cost of each compaction as more SSTables will be merged during compaction.

Pros: Disk requirements are easier to predict. Read operation latency is more predictable. Stale data is evicted more frequently.

Good for reads. The LCS compaction process guarantees that the SSTables within each level starting with L1 have non-overlapping data. For many reads, this enables the database to retrieve all the required data from only one or two SSTables. Since LCS does not compact L0 tables, however, resource-intensive reads involving many L0 SSTables may still occur.

Cons: Much higher I/O utilisation. LCS compaction operations take place more often and place more I/O burden on the node. (The flip side is that obsolete data is evicted more often, so deleted data uses a smaller portion of the SSTables on disk.)

Optimising SSTable Search

Each SSTable is stored as a file comprising a sequence of data blocks. We can optimise searching within a SSTable by using indexes.

Optimisation 1: Sparse index where we represent a block by the smallest key value.
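
A small sketch of a sparse-index lookup, assuming an SSTable is a list of data blocks and each block is a sorted list of (key, value) pairs. The names and sample data are made up for illustration.

```python
import bisect

def build_sparse_index(blocks):
    """One index entry per data block: the smallest key in that block."""
    return [block[0][0] for block in blocks]

def lookup(blocks, sparse_index, key):
    # Find the last block whose smallest key is <= the search key.
    b = bisect.bisect_right(sparse_index, key) - 1
    if b < 0:
        return None              # key is smaller than every key in the SSTable
    for k, v in blocks[b]:       # only this one block needs to be read
        if k == key:
            return v
    return None

blocks = [
    [(1, "a"), (3, "b")],
    [(5, "c"), (8, "d")],
    [(9, "e"), (12, "f")],
]
index = build_sparse_index(blocks)   # [1, 5, 9]
```

The index holds one key per block rather than one per record, so it stays small enough to keep in memory while still narrowing a search down to a single block.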

Optimisation 2: Bloom Filter ** Not elaborated here**

(3) Data Concurrency

In a DBMS, a concurrency control manager ensures Isolation of Xacts.

In a distributed database system, different servers receive different query requests. We need a way to ensure that these requests are processed correctly, with the queries executed in the same serial order and in isolation across all replicas (no interleaving of Xact effects).

Idea: We want serialised transactions.

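A global schedule S for a set of Xacts T, with local schedules {S1, …, Sm}, is serialisable if every local schedule (the execution at each site) is serialisable and there is a single serial order of the Xacts that is consistent with every one of the local schedules.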

Terms to understand before moving on (I won’t be going through them here):

  • View Serialisable Schedules
  • Conflicting Actions
  • Conflict Serialisable Schedules
  • Serialisable Schedules
  • Recoverable Schedules

Theorem 1: A schedule that is conflict serialisable is also view serialisable.

Strategy A: Lock Based Concurrency Control

Depending on the database, we can have centralised lock managers (stored in master node and in charge of assigning locks) or distributed lock managers (Any site containing the primary version of the object can be a lock manager).

Each Xact needs to request for an appropriate level of lock on an object before it is permitted to perform operations on the object.

There are two types of levels:

  • Shared Locks for reading objects (S)
  • Exclusive Locks for writing objects (X)

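To read an object, a Xact needs to request an S or X lock. To update, a Xact needs to request an X lock. A lock request can be granted if it is compatible with the lock modes of the locks that other Xacts already hold on the object: S is compatible with S, while X is not compatible with any other lock.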

If a lock cannot be granted, the Xact will be blocked and the request will be added to the Object’s request queue to be processed later on.
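
Here is a minimal lock manager sketch that follows the rules above: S/X compatibility, and a per-object request queue for blocked Xacts. The class and method names are my own, requests are granted in FIFO order from the queue, and fairness issues (such as a new S request overtaking a queued X request) are ignored.

```python
from collections import deque

def compatible(held, xact, mode):
    """Grantable only if every lock held by *other* Xacts is S and the request is S."""
    others = [m for x, m in held.items() if x != xact]
    return all(m == "S" and mode == "S" for m in others)

class LockManager:
    def __init__(self):
        self.holders = {}   # object -> {xact: mode}
        self.queues = {}    # object -> deque of blocked (xact, mode) requests

    def _grant(self, held, xact, mode):
        if held.get(xact) != "X":       # never downgrade an X lock
            held[xact] = mode

    def request(self, xact, obj, mode):
        """Return True if granted, False if the Xact is blocked and queued."""
        held = self.holders.setdefault(obj, {})
        if compatible(held, xact, mode):
            self._grant(held, xact, mode)
            return True
        self.queues.setdefault(obj, deque()).append((xact, mode))
        return False

    def release_all(self, xact):
        """Release every lock of `xact`, then grant queued requests in order."""
        for obj, held in self.holders.items():
            held.pop(xact, None)
            queue = self.queues.get(obj, deque())
            while queue and compatible(held, queue[0][0], queue[0][1]):
                waiter, mode = queue.popleft()
                self._grant(held, waiter, mode)

lm = LockManager()
r1 = lm.request("T1", "O", "S")   # granted
r2 = lm.request("T2", "O", "S")   # granted: S is compatible with S
r3 = lm.request("T3", "O", "X")   # blocked: X conflicts with the S locks
lm.release_all("T1")              # T2 still holds S, so T3 keeps waiting
still_waiting = "T3" not in lm.holders["O"]
lm.release_all("T2")              # now T3 gets its X lock
```

Releasing all locks at once in `release_all` corresponds to what Strict 2PL (described below) does at commit or abort.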

Two Phase Locking Protocol

  • To read an object O, a Xact must have S or X lock on O
  • To write, a Xact must have X lock on O
  • Once a Xact releases a lock, the Xact cannot request for a new lock

Theorem 2: 2PL schedules are conflict serialisable

Strict Two Phase Locking Protocol

  • To read an object O, a Xact must have S or X lock on O
  • To write, a Xact must have X lock on O
  • A Xact must hold on to a lock until Xact commits or aborts.

Theorem 3: Strict 2PL Schedules are both conflict serialisable and recoverable

Deadlocks: The locking protocol helps to ensure that we can run multiple transactions across multiple sites while still having a global serial order of transaction executions. However, it can lead to deadlocks, where a cycle of Xacts wait for locks to be released by one another.

We use Wait-For-Graphs to detect deadlocks across sites.
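
Deadlock detection on a wait-for graph is cycle detection. The sketch below assumes the graph is given as a dict mapping each Xact to the set of Xacts it is waiting for. The two per-site graphs are hypothetical and are merged into one global graph before checking.

```python
def has_deadlock(wait_for):
    """Return True if the wait-for graph {xact: set of xacts} contains a cycle."""
    WHITE, GREY, BLACK = 0, 1, 2   # unvisited, on the current path, finished
    colour = {}

    def visit(node):
        colour[node] = GREY
        for nxt in wait_for.get(node, ()):
            state = colour.get(nxt, WHITE)
            if state == GREY:                 # back edge: a cycle
                return True
            if state == WHITE and visit(nxt):
                return True
        colour[node] = BLACK
        return False

    return any(colour.get(n, WHITE) == WHITE and visit(n) for n in list(wait_for))

# Hypothetical local wait-for graphs at two sites.
site1 = {"T1": {"T2"}}   # at site 1, T1 waits for T2
site2 = {"T2": {"T1"}}   # at site 2, T2 waits for T1

global_graph = {}
for local in (site1, site2):
    for xact, waits in local.items():
        global_graph.setdefault(xact, set()).update(waits)
```

Neither local graph has a cycle on its own, but the merged graph does. That is why the wait-for graph has to be considered across sites.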

To prevent deadlocks, we can have:

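  • Wait-Die policy: Suppose Ti requests a lock held by Tj. Ti waits if Ti is older (earlier timestamp) than Tj. Otherwise, Ti aborts ("dies").
  • Wound-Wait policy: Suppose Ti requests a lock held by Tj. Ti waits if Ti is younger than Tj. Otherwise (Ti is older), Ti "wounds" Tj, i.e. Tj aborts and Ti gets the lock.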
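
The two policies differ only in who gets aborted. A small sketch, assuming Ti (the requester) asks for a lock held by Tj (the holder) and that a smaller timestamp means an older Xact. The function names and return strings are mine.

```python
def wait_die(ts_requester, ts_holder):
    """Older requester waits; younger requester dies (aborts)."""
    return "requester waits" if ts_requester < ts_holder else "requester aborts"

def wound_wait(ts_requester, ts_holder):
    """Older requester wounds (aborts) the holder; younger requester waits."""
    return "holder aborts" if ts_requester < ts_holder else "requester waits"

# Timestamp 1 is older than timestamp 2.
old_asks_young = (wait_die(1, 2), wound_wait(1, 2))
young_asks_old = (wait_die(2, 1), wound_wait(2, 1))
```

In both policies it is always the younger Xact that gets aborted, so an old Xact is never aborted because of a younger one, and waits only ever go in one direction of the age order, which rules out cycles.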

Strategy B: Multiversion Concurrency Control (MVCC) using Snapshot Isolation

The key idea is to maintain multiple versions of each object. Every write creates a new version of the object.

  • Multiversion schedules: If there are multiple versions of an object X, a read action on X could return any version
  • Two schedules over the same set of transactions are defined to be multiversion view equivalent if they have the same set of read from relationships.
  • A multiversion schedule S is called a monoversion schedule if each read action in S returns the most recently created object version.
  • Serial monoversion schedule: A monoversion schedule that is also a serial schedule.
  • Multiversion View Serialisability (MVSS): A multiversion schedule S is MVSS if there exists a serial monoversion schedule over the same set of Xacts that is multiversion view equivalent to S.

Theorem 4: A view serialisable schedule is also a multiversion view serialisable schedule, but the converse does not hold.

In the snapshot isolation protocol, each Xact T sees a snapshot of the DB that consists of the updates by Xacts that committed before T started. Each Xact is associated with a start timestamp and a commit timestamp. Two Xacts are defined to be concurrent if their [start, commit] intervals overlap.

Wi(O) creates a new version of O. Ri(O) reads either Ti’s own update to O or the latest version of O created by another Xact that committed before Ti started.

If multiple concurrent Xacts update the same object (both Xacts perform writes), then only one of them is allowed to commit. Otherwise, the schedule may not be serialisable. We can follow the First Committer Wins rule or the First Updater Wins rule.

In a distributed system, the timestamps of each Xact are issued by one site designated as the centralised coordinator (CC). Write locks are managed by each site’s own lock manager. To start a Xact, a site needs to make a request to the CC to get the timestamps (the Xact’s own start timestamp and the timestamp of the most recently committed Xact).

Anomalies: Snapshot Isolation does not guarantee serialisability.

Problem Scenario (Write Skew): Suppose X and Y are data items in different rows representing checking account balances of a married couple at a bank, with a constraint that X+Y > 0 (the bank permits either account to be overdrawn, as long as the sum of the account balances remains positive). Assume that initially X0 = 70 and Y0 = 80. Under SI, transaction T1 reads X0 and Y0, then subtracts 100 from X, assuming it is safe because the two data items added up to 150. Transaction T2 concurrently reads X0 and Y0, then subtracts 100 from Y, assuming it is safe for the same reason. Each update is safe by itself, but SI will result in the following history:

H2: R1(X0,70) R2(X0,70) R1(Y0,80) R2(Y0,80) W1(X1,-30) C1 W2(Y2,-20) C2

Here the final committed state (X1 and Y2) violates the constraint X+Y > 0. This problem was not detected by First Committer Wins because two different data items were updated, each under the assumption that the other remained stable. Hence the name “Write Skew”.
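
The write skew history can be reproduced with a toy single-site model of snapshot isolation with First Committer Wins. Everything here (the class, the dict-based Xact, the integer clock) is a simplification I made up for illustration, not a real database API.

```python
class SnapshotDB:
    def __init__(self, initial):
        # key -> list of (commit_ts, value), oldest first
        self.versions = {k: [(0, v)] for k, v in initial.items()}
        self.clock = 0

    def begin(self):
        return {"start": self.clock, "writes": {}}

    def read(self, xact, key):
        if key in xact["writes"]:          # a Xact sees its own updates
            return xact["writes"][key]
        # Otherwise: latest version committed no later than the Xact's start.
        return [v for ts, v in self.versions[key] if ts <= xact["start"]][-1]

    def write(self, xact, key, value):
        xact["writes"][key] = value        # buffered until commit

    def commit(self, xact):
        # First Committer Wins: abort if a concurrent Xact already committed
        # a newer version of any key this Xact wrote.
        for key in xact["writes"]:
            if self.versions[key][-1][0] > xact["start"]:
                return False
        self.clock += 1
        for key, value in xact["writes"].items():
            self.versions[key].append((self.clock, value))
        return True

    def latest(self, key):
        return self.versions[key][-1][1]

db = SnapshotDB({"X": 70, "Y": 80})
t1, t2 = db.begin(), db.begin()
if db.read(t1, "X") + db.read(t1, "Y") > 100:   # T1 sees 150
    db.write(t1, "X", db.read(t1, "X") - 100)
if db.read(t2, "X") + db.read(t2, "Y") > 100:   # T2 also sees 150
    db.write(t2, "Y", db.read(t2, "Y") - 100)
ok1, ok2 = db.commit(t1), db.commit(t2)

# Contrast: two concurrent Xacts writing the SAME item do conflict.
t3, t4 = db.begin(), db.begin()
db.write(t3, "X", 0)
db.write(t4, "X", 1)
ok3, ok4 = db.commit(t3), db.commit(t4)
```

Both T1 and T2 commit because their write sets are disjoint, leaving X = -30 and Y = -20, whereas T4 is aborted because T3 committed a write to the same item first.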

Solution: One way is to require in the transactional program that each Read of X and Y to update Y give the impression of a Write of X (this is possible in Oracle using the Select For Update statement). Now it seems that both X and Y are updated in H2 and collision will occur. Another approach requires that each constraint on the sum of two accounts X and Y be materialized in another row Z and insist that all updates of X and Y must keep Z up to date. Then the anomaly of history H will not arise, since collision on updates of Z will occur whenever X and Y are updated by two different transactions.

Problem Scenario (Read-Only Transaction Anomaly): Suppose X and Y are data items in different rows representing a checking account balance and a savings account balance, and that initially X0 = 0 and Y0 = 0. In history H3 below, transaction T1 deposits 20 to the savings account Y, T2 subtracts 10 from the checking account X, considering the withdrawal covered as long as X+Y > 0, but accepting an overdraft with a penalty charge of 1 if X+Y goes negative; finally, T3 is a read-only transaction that retrieves the values of X and Y and prints them out for the customer. For one sequence of operations, this can result in the following history under SI:

H3: R2(X0,0) R2(Y0,0) R1(Y0,0) W1(Y1,20) C1 R3(X0,0) R3(Y1,20) C3 W2(X2,-11) C2

The anomaly that arises in this transaction is that read-only transaction T3 prints out X = 0 and Y = 20, while final values are Y = 20 and X = -11. This can’t happen in any serializable execution since if 20 was added to Y before 10 was subtracted from X, no charge of 1 would ever occur, and the final balance should be 10, not 9. A customer, knowing a deposit of 20 was due and worried that his check for 10 might have caused a penalty, would conclude he was safe based on the data read by T3. Indeed, such a print-out by T3 would be embarrassing for a bank should the SEC ask how the charge occurred. We also note that any execution of T1 and T2 (with arbitrary parameter values) without T3 present will always act serializably.

Recommended reading. Explanation taken from: https://www.cs.umb.edu/~poneil/ROAnom.pdf

Note, however, that there is a stronger snapshot isolation protocol that ensures serialisability. It is not explained here.

What we have gone through so far is how a DBMS decides to fragment its data, store data with LSM storage, and manage concurrency to preserve the isolation of Xact executions and ensure serialisability.

How do we decide to commit or abort a Xact? After deciding to commit a Xact, how exactly do the different servers communicate with each other and send the Xacts across different replica sites? This will be covered in the next sections.

(4) Data Reliability Protocols

We want to ensure database consistency even if components of the distributed system fail. A transaction must be committed at all sites or at none of the sites, no matter what failures occur and when they occur. The standard protocol for this is the Two-Phase Commit (2PC) protocol:

First Phase: Voting/Preparation phase where the coordinator collects votes from participants

Second phase: Decision phase where the coordinator sends global decision to participants.

The participants can communicate only with coordinator.


  • Every participant must reach the same global decision (commit/abort)
  • Once a participant has voted, it cannot change its vote.
  • As long as one participant votes to abort, then the global decision will be to abort.
  • It is important for a coordinator/participant to write a log record to reflect its state before sending a message. Log writes are forced.
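
The failure-free path of 2PC from the coordinator’s point of view can be sketched as below. The participants are modelled as hypothetical vote functions, the log is a Python list, and the log record names are my own choice. Timeouts, crashes and recovery are deliberately left out.

```python
def two_phase_commit(participants, log):
    """Coordinator side of 2PC. `participants` maps a name to a vote function
    that returns "commit" or "abort"."""
    log.append("begin_commit")            # log before sending "prepare"
    # Phase 1 (voting): collect one vote from every participant.
    votes = {name: vote() for name, vote in participants.items()}
    # A single "abort" vote forces a global abort.
    decision = "commit" if all(v == "commit" for v in votes.values()) else "abort"
    log.append(decision)                  # log the decision before sending it
    # Phase 2 (decision): every participant receives the same global decision.
    outcome = {name: decision for name in participants}
    log.append("end_of_transaction")
    return decision, outcome

log_ok, log_bad = [], []
d1, out1 = two_phase_commit(
    {"A": lambda: "commit", "B": lambda: "commit"}, log_ok)
d2, out2 = two_phase_commit(
    {"A": lambda: "commit", "B": lambda: "abort"}, log_bad)
```

The decision is written to the log before it is sent out, so a coordinator that crashes and recovers can still tell what it decided.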

Recovery Protocols for 2PC

Servers may crash and hence there has to be a way for them to recover.

Termination Protocols for 2PC

There will be instances where other servers fail. Hence, the remaining servers will need to be able to deal with the failures and resume operation.

  • Basic Termination Protocol
  • Cooperative Termination Protocol

The problem with the basic 2PC termination protocol arises when a participant times out in the READY state and becomes blocked.

A commit protocol is blocking if its state transition diagram contains either of the following:

  • A state that is adjacent to both a commit and an abort state
  • A non-committable state that is adjacent to a commit state

We see that these two conditions are present in the WAIT state in the coordinator and READY state in the participant.

For example, the coordinator might send a global commit message to participants B and C but fail before sending it to A. When participant A times out, it does not know what to do and becomes blocked because it is unable to ask the coordinator. Even after the coordinator recovers, it might have erased its records and no longer know the decision.

On top of this, once a participant is in the READY state, it can no longer abort unilaterally. If all participants are in the READY state and the coordinator crashes before sending its decision, then all participants block until the coordinator recovers.

Thus, to mitigate this problem, and to reduce probability of blocking by failed coordinator, we use the Cooperative Termination Protocol.

In the Cooperative Termination Protocol, the participant that timed out will send “Decision-Request” message to all other participants.

However, if all the participants are uncertain, the problem of blocking remains. This is what motivates the Three-Phase Commit Protocol.
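
The decision rule for a participant running the Cooperative Termination Protocol can be sketched as a small function. The state names (INITIAL, READY, COMMIT, ABORT) and the rule for a peer that has not voted yet follow the usual textbook description of the protocol and are assumptions on my part, since the notes above only mention the Decision-Request message.

```python
def cooperative_termination(peer_states):
    """Decide what a participant that timed out in READY should do.

    peer_states: the states the other participants report in reply to a
    Decision-Request message, e.g. ["READY", "COMMIT"].
    """
    if "COMMIT" in peer_states:
        return "COMMIT"    # some peer already knows the global decision
    if "ABORT" in peer_states:
        return "ABORT"
    if "INITIAL" in peer_states:
        # A peer has not voted yet, so the coordinator cannot have decided
        # to commit. It is safe for everyone to abort.
        return "ABORT"
    return "BLOCKED"       # every peer is also uncertain (READY)

case_known = cooperative_termination(["READY", "COMMIT"])
case_unvoted = cooperative_termination(["READY", "INITIAL"])
case_uncertain = cooperative_termination(["READY", "READY"])
```

The last case is the one that 2PC cannot resolve without the coordinator.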

Three-Phase Commit Protocol

The additional Precommit state prevents the blocking from happening.

If the coordinator has failed when a participant times out in the READY state, a termination protocol that elects a new coordinator gets executed.

Any participant/coordinator that fails and then recovers while the termination protocol is in progress will not be allowed to participate.

(5) Data Replication Protocols

The protocols introduced in Part (4) are for ensuring the atomicity of a Xact across servers. Usually it is the primary servers that take part in 2PC. Data replication (this section) is typically done after a transaction has committed. For example, in Pileus, replication is done lazily after a transaction has committed among the primary servers.

I found a good reading online. Recommended for further details on the protocols as they won’t be covered here in detail: https://www.comp.nus.edu.sg/~tankl/cs5225/2008/replicate.pdf

We need to apply updates to all replicas (or to a majority) as part of a Single Xact (Need 2PC). Our main goal is to maintain one-copy serialisability where the execution of Xact has the same effect as an execution on a non replicated DB.

The following are the four types of data replication strategies.

Just a generic classification of the protocols used for replication of data amongst servers and communicating to ensure data consistency

WHEN: Whether the server that receives the client’s request propagates the Xact to the other replica servers immediately or at a later time depends on the consistency level desired. Eager protocols give strong consistency: the master node keeps trying to send the Xact to its replicas until it succeeds. Lazy protocols give eventual consistency: only one replica is updated at first, and refresh Xacts are sent to the others later on.

WHERE: Which server executes the Xact and changes its DB state depends on whether a Centralised or a Distributed protocol is used. In a Centralised protocol, the Xact has to be executed at the master server, which is also the sole lock manager. In a Distributed protocol, any server (the one that receives the client’s request, even if it is not the master node) can process it.

A locking protocol is used for concurrency control of the Xacts. We also assume that logs are sent across servers for communication and to propagate updates.

Eager replication: Favours consistency over availability & High run time overhead. 2PC.

Lazy replication: Main goals are availability and performance.

Lazy Centralised: Ensures single copy serialisability at master node.

Lazy Distributed: Cannot guarantee single copy serialisability.

Problem with Lazy Protocols (aka concurrency problem):

The problem is that there could be inconsistent updates, and we need to ensure that each server executes the Xacts in the same order (as mentioned before in concurrency). One possible reconciliation procedure is the Last Writer Wins heuristic: updates are applied in timestamp order, so that the write with the latest timestamp is the one that survives at every replica. But this only works for updates that are blind writes. Relate this to the previous section on concurrency.
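
A minimal sketch of Last Writer Wins for a single object, assuming every update is a blind write tagged with a unique timestamp. The function names are illustrative only.

```python
def last_writer_wins(current, update):
    """Both arguments are (timestamp, value); keep the later write."""
    return update if update[0] > current[0] else current

def apply_updates(updates):
    state = (0, None)                 # (timestamp, value) before any write
    for update in updates:
        state = last_writer_wins(state, update)
    return state

# Two replicas receive the same refresh Xacts in different orders.
replica_a = apply_updates([(1, "a"), (2, "b"), (3, "c")])
replica_b = apply_updates([(3, "c"), (1, "a"), (2, "b")])
```

Both replicas converge to the same value regardless of arrival order. This works because each write simply overwrites the value. An update such as "add 10 to X" depends on the value it read, so discarding the "losing" update would silently lose its effect, which is why the heuristic is limited to blind writes.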

Failures with replication protocols

So far, we have considered replication protocols in the absence of failures. For example, 2PC from the previous section is a blocking protocol and prioritises consistency over availability.

We can consider a Quorum-Based Protocol to handle failures such as network partitions. This type of protocol prioritises availability over consistency.

Each copy Oi of an object O is assigned a non negative weight.
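
The notes stop at the weights, so the sketch below fills in the rest using the standard weighted-voting rules, which is an assumption on my part: a read needs copies with total weight at least a read threshold, a write needs at least a write threshold, the two thresholds together must exceed the total weight, and twice the write threshold must exceed the total weight. All names and numbers are hypothetical.

```python
def valid_thresholds(weights, read_t, write_t):
    """Check the two quorum intersection conditions."""
    total = sum(weights.values())
    # read_t + write_t > total : every read quorum overlaps every write quorum
    # 2 * write_t > total      : any two write quorums overlap
    return read_t + write_t > total and 2 * write_t > total

def has_quorum(weights, reachable, threshold):
    """Can the copies in `reachable` together form a quorum?"""
    return sum(weights[copy] for copy in reachable) >= threshold

weights = {"O1": 1, "O2": 1, "O3": 1}
read_t, write_t = 2, 2

# A network partition splits the copies into {O1, O2} and {O3}.
majority_can_write = has_quorum(weights, {"O1", "O2"}, write_t)
minority_can_write = has_quorum(weights, {"O3"}, write_t)
```

The side of the partition that still holds enough weight keeps accepting reads and writes, while the other side cannot form a quorum, so two partitions can never both accept writes to the same object.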

Case Study: Raft is another data replication protocol, designed to solve the consensus problem (check out an earlier article where I did a summary post).

(6) Consistency Levels

In the sections above, we briefly mentioned strong and eventual consistency. In practice, there are many other consistency levels as well.

Depending on the consistency level we desire, we have different data replication protocols for them.

  1. Strong consistency: A read operation returns the value that was last written for the object.
  2. Eventual consistency: A read operation returns any value that was written in the past.
  3. Consistent Prefix: A read operation returns the result of some prefix of the sequence of writes, i.e. the state after the first k writes for some k.
  4. Bounded Staleness: A read operation returns a value that is no more than T minutes out of date.
  5. Monotonic Reads: If a client issues 2 read operations, either both return the same value or the second read returns a more recent value than the first.
  6. Read My Writes: The effects of all writes performed by a client on an object are visible to that client’s subsequent reads.
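
To compare some of these levels side by side, the sketch below takes the write history of a single object and returns which writes a read is allowed to return under each level. It is a simplified model of my own: timestamps are plain numbers, and consistent prefix is left out because for a single object it allows the same values as eventual consistency (the difference only shows up across multiple objects).

```python
def allowed_reads(writes, level, now=None, bound=None,
                  last_seen=None, my_last_write=None):
    """Return the indices of the writes whose value a read may return.

    writes: list of (timestamp, value) in write order.
    last_seen: index of the write returned by this client's previous read.
    my_last_write: index of this client's own most recent write.
    """
    n = len(writes)
    if level == "strong":
        return [n - 1]                       # only the last written value
    if level == "eventual":
        return list(range(n))                # any value written in the past
    if level == "bounded_staleness":
        # Write i is acceptable if it was still the latest value at some
        # point within the last `bound` time units.
        return [i for i in range(n)
                if i == n - 1 or writes[i + 1][0] > now - bound]
    if level == "monotonic_reads":
        return list(range(last_seen, n))     # never older than the last read
    if level == "read_my_writes":
        return list(range(my_last_write, n)) # at least my own latest write
    raise ValueError(level)

writes = [(0, "v0"), (10, "v1"), (20, "v2"), (30, "v3")]
strong = allowed_reads(writes, "strong")
eventual = allowed_reads(writes, "eventual")
bounded = allowed_reads(writes, "bounded_staleness", now=35, bound=10)
monotonic = allowed_reads(writes, "monotonic_reads", last_seen=1)
mine = allowed_reads(writes, "read_my_writes", my_last_write=2)
```

The weaker the level, the larger the set of acceptable answers, which is what gives a replicated system more freedom in choosing which replica serves a read.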

Case Study: Pileus is a db that allows tunable consistency levels

(7) Query Processing and Optimisation

The steps involved:

(a) Query rewriting: Query decomposition (Translate query into relational algebra) + Data Localisation (Rewrite distributed query into fragmented query)

(b) Optimise Query Plan: Deciding how to perform JOINS, Using Semi Joins before JOINS.

There are different Join strategies:

Collocated: The two tables involved in the JOIN have already been partitioned on the join attribute, so matching rows are already at the same site.

Directed: One of the tables has already been partitioned on the join attribute, so we just need to repartition the other one.

Broadcast: We send one of the tables to all partitions of the other table to perform the join.

Repartitioned: We repartition both tables involved in the JOIN.

We choose whichever join strategy is the most cost-efficient, as seen in the table above.

Semi Joins

Sometimes, we can optimise JOINs by first using semijoins to get rid of dangling tuples (tuples that have no join partner in the other table).

A semijoin is beneficial if the savings from not sending the dangling tuples of R are bigger than the cost of sending the distinct join-attribute values of S over to R’s site.
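
A sketch of a semijoin-based plan for joining R (at site 1) with S (at site 2). The rows, the attribute name and the per-item shipping costs are hypothetical, and the cost model is deliberately crude.

```python
def semijoin_plan(r_rows, s_rows, attr):
    """Join R with S on `attr` while shipping as little of R as possible."""
    # Step 1 (site 2 -> site 1): ship only the distinct join values of S.
    s_keys = {s[attr] for s in s_rows}
    # Step 2 (site 1): keep R tuples that have a join partner.
    # Dangling tuples are never shipped.
    r_reduced = [r for r in r_rows if r[attr] in s_keys]
    # Step 3 (site 1 -> site 2): ship the reduced R and join it with S.
    result = [{**r, **s} for r in r_reduced for s in s_rows if r[attr] == s[attr]]
    return s_keys, r_reduced, result

def semijoin_is_beneficial(r_rows, r_reduced, s_keys, tuple_cost=4, key_cost=1):
    """Savings from the unsent dangling R tuples vs. cost of sending S's keys."""
    saved = (len(r_rows) - len(r_reduced)) * tuple_cost
    return saved > len(s_keys) * key_cost

r_rows = [{"cid": i, "name": f"cust{i}"} for i in (1, 2, 3, 4)]
s_rows = [{"cid": 1, "item": "pen"}, {"cid": 1, "item": "ink"}]
s_keys, r_reduced, result = semijoin_plan(r_rows, s_rows, "cid")
worth_it = semijoin_is_beneficial(r_rows, r_reduced, s_keys)
```

Here three of the four R tuples are dangling, so sending one join value to R’s site saves shipping three full tuples. If almost every R tuple had a partner, the extra round trip would not pay off.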

(c) Execute Query Plan: The last part of query processing would be to execute it.

The End :)



