Distributed Hash Tables and Decentralized Data Storage

This post is mostly theoretical computer science (data structures, distributed systems), leading up to future posts about the design of decentralized communities, with an emphasis on social theory (self-governance, trust, responsibility for content).

I’m at the (virtual) Computer-Supported Cooperative Work (CSCW) conference this week, and it’s stirring many ideas related to shared governance of decentralized communities. Before digging into those ideas, though, one more interlude about technical underpinnings of decentralized systems…

The Problem Space

We have information on a big central server, and we would like to spread it across many servers. This can be for a variety of technical reasons, including:

  • Redundancy, if the central server goes offline

  • Performance, if users can connect to a variety of servers and so lower the average workload per server

  • Resource limitations, if a central server with enough storage, processing, or bandwidth to support all users at once is infeasible

There may also be social reasons for desiring distribution, such as removing trust in a single central entity that could delete or modify data at will, preferring instead a solution where multiple parties have copies of data and can disagree on governance policy.

There are two broad ways of solving “distribution” that at first seem quite different, but are forced to tackle similar problems:

  1. Everyone has a copy of all of the data

  2. Everyone has a piece of the data

Mirrored Data Stores

Taking the “simple” case first, let’s assume we want to mirror data across multiple servers, such that each has an identical copy of all information. This is often appropriate for load-balancers and content-distribution-networks, where we really want “50 copies of the same website, hosted by servers across the entire planet.”

This is very easy if the content never changes! Just have a single “content provider” upload data to each server, and have users connect to the content distribution servers.

The problem is slightly more complicated, but still not too bad, if the single content provider can send out an update. We may have a chaotic transition period where some CDN servers have updated and some have not, but in practice all servers will have the new content “pretty soon.” Content can be pushed (the main server connects to each CDN server to upload content) or pulled (the CDN servers periodically connect to the main server and check for a new version of the data). If content is pulled, we’ll need some marker to determine whether content is “new”. Some of the more obvious options are:

  1. Always download content from the server, assume the server has the “ground truth”. Works, but wasteful.

  2. Download content if it has a newer timestamp than the timestamp of the previous data. This works, but timestamps are generally messy because computer clocks can drift and need to be periodically re-synchronized via NTP.

  3. Download content if it has a newer version number than the previous data. Same idea as the timestamp, but without the messiness of dealing with real-world “time”.

This “versioning” lets us implement some helpful optimizations, like having CDN servers download updates from one another. CDN server 1 can download an update from the “main server”, while CDN servers 2 and 3 download from server 1, allowing the system to run smoothly even if the main server goes offline before servers 2 and 3 can be updated. All the CDN servers are always in agreement about what data is the “newest”, because a single source of ground truth increments a version number to disambiguate.
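
Here’s a rough sketch of the version-number approach, with CDN servers pulling from one another. The Server class and its publish and pull_from methods are made-up names for illustration, not any real CDN’s API, and the “network” is just objects in one process.

```python
from dataclasses import dataclass


@dataclass
class Server:
    """A hypothetical mirror holding one blob of content and its version."""
    name: str
    version: int = 0
    content: str = ""

    def publish(self, content: str) -> None:
        # Only the single content provider should call this: it is the one
        # source of ground truth that increments the version number.
        self.version += 1
        self.content = content

    def pull_from(self, other: "Server") -> bool:
        # Copy content only if the other server has a strictly newer version.
        if other.version > self.version:
            self.version = other.version
            self.content = other.content
            return True
        return False


main = Server("main")
cdn1, cdn2, cdn3 = Server("cdn1"), Server("cdn2"), Server("cdn3")

main.publish("site v1")
cdn1.pull_from(main)  # cdn1 syncs from the main server
# Even if main goes offline now, cdn2 and cdn3 can catch up from cdn1.
cdn2.pull_from(cdn1)
cdn3.pull_from(cdn1)
```

Because only one server ever increments the version, comparing two integers is enough to decide who is behind, no matter which mirror the update came from.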

Let’s move to a messier problem: Server content is no longer static. Imagine collaborative editing software like Google Docs or Overleaf. Multiple users are making changes to a shared document, but that document doesn’t exist on a single server, but is rather spread across a range of servers for performance and redundancy. We must combine users’ edits to synchronize the servers and create a consistent view of the document.

We’ve lost the idea of single linear incrementing versions: Two users can add changes “simultaneously” (a loose definition, where “simultaneously” can just mean that the users made changes on two different servers before those servers had a chance to sync), and we need to come up with a deterministic ordering. Notice that timestamps don’t matter nearly as much as relative ordering and awareness: If Alice added a sentence to a paragraph, and Bob deleted that paragraph, then to combine the changes we need to know which edit came first, and whether Bob was aware of Alice’s edit at the time.

Vector Clocks

We can address the above challenges using a vector clock. Each server keeps a list of counters, one per server, which works as a version number indicating both what iteration of content that server is on and what updates it’s aware of from other servers.

When server 1 writes to server 2 it includes a list of all messages it doesn’t think server 2 knows about yet, based on the vector clock it received from server 2 the last time server 2 sent a message. That is, if server 1 has received (2,1,2) from server 2, it knows server 2 has “seen two messages from server 1, sent one message of its own, and seen two messages from server 3”. If server 1 has also received (0,0,3) from server 3, then server 1 knows about a message from server 3 that server 2 doesn’t know about. Therefore, when server 1 is ready to send a new message to server 2 it will first include the (0,0,3) message from server 3, followed by the new (3,1,3) message. In this way, it is not possible to receive a message without first receiving all the messages it depends on, guaranteeing an intact history.
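
The bookkeeping in that example can be sketched in a few lines. This is a minimal illustration rather than a full protocol: clocks are plain tuples, indices 0, 1, 2 stand for servers 1, 2, 3, and the helper names (merge, tick, happened_before, concurrent) are my own.

```python
from typing import Tuple

Clock = Tuple[int, ...]


def merge(a: Clock, b: Clock) -> Clock:
    """Combine knowledge from two clocks: element-wise maximum."""
    return tuple(max(x, y) for x, y in zip(a, b))


def tick(clock: Clock, server: int) -> Clock:
    """Advance one server's own counter before it sends a new message."""
    return tuple(c + 1 if i == server else c for i, c in enumerate(clock))


def happened_before(a: Clock, b: Clock) -> bool:
    """True if b has seen everything a has seen, and something more."""
    return all(x <= y for x, y in zip(a, b)) and a != b


def concurrent(a: Clock, b: Clock) -> bool:
    """Neither clock has seen the other, so the edits may conflict."""
    return a != b and not happened_before(a, b) and not happened_before(b, a)


last_from_server2 = (2, 1, 2)  # what server 1 last heard from server 2
msg_from_server3 = (0, 0, 3)   # a message server 1 received from server 3

# Server 2 has only seen two messages from server 3, so it is missing this one.
server2_is_missing_it = not happened_before(msg_from_server3, last_from_server2)

# Server 1 folds in everything it knows, then ticks its own counter.
known = merge(last_from_server2, msg_from_server3)  # (2, 1, 3)
new_message = tick(known, 0)                        # (3, 1, 3)
```

The concurrent check is the useful part for collaborative editing: if neither clock dominates the other, neither author was aware of the other’s edit.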

Vector clocks assume all participants are truthful. If a server can lie about message timestamps or send multiple messages with the same timestamp then the “consistent world view” model can be trivially broken.

Notice that while we can use vector clocks to put messages in a consistent order that respects which edits were aware of which, we cannot eliminate all conflicts. Sometimes two users will make incompatible changes to the same sentence before their servers have synchronized. By frequently synchronizing servers we can make this scenario infrequent, but we still need a resolution protocol like one of the following:

  1. Manual intervention (as with git merge conflicts)

  2. Automatic consensus for deciding which change to keep (as with blockchain stabilization when two competing blocks are mined)

  3. A ranking system for selecting a change (for example, if a user replies to a tweet while the original poster deletes their tweet, either always delete the reply, or create an empty “deleted post” for the new tweet to reply to)

We now have a protocol for ordering changes from different participants and resolving conflicts. This is far from the only solution: We can also build consensus protocols like Paxos that only accept and proliferate one change from one participant at a time. This guarantees zero conflicts even in the face of equipment failure, at the cost of significant delays and overhead, and without the ability to work “offline” (like with git) and merge in changes later when you’re back online. There are many design trade-offs in this space.

Distributed Hash Tables

So far we have described decentralized systems for ensuring that all participants end up with the same data at the end. What about distributing data across participants so users can look up information they’re interested in, without having to store the complete dataset? This is where we introduce distributed hash tables, or DHTs.

The premise is simple: Take a hash table (an efficient way of implementing the more abstract “associative array”, also called a “dictionary” or “key-value table”), and sprinkle the key-value pairs across multiple participant servers, in an even and deterministic way. With a traditional hash table you hash the key to determine the position the value should be stored at; in a distributed hash table we hash the key to determine which participant the key-value pair should be stored at.

In the trivial case, a client would maintain a network connection to every participant in a distributed hash table. When they want to GET or PUT a value for a key, they hash the key, determine which participant is responsible, and send the GET or PUT directly to the node.
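
As a toy version of that trivial case, assume each participant is just a dictionary in the same process and the client can reach all of them directly. TrivialDHT and stable_hash are hypothetical names, and SHA-256 is my choice of hash, not something a DHT requires.

```python
import hashlib


def stable_hash(key: str) -> int:
    # Python's built-in hash() is randomized per process for strings, so this
    # sketch uses SHA-256 to get a number every participant agrees on.
    digest = hashlib.sha256(key.encode("utf-8")).digest()
    return int.from_bytes(digest, "big")


class TrivialDHT:
    """Hypothetical in-memory stand-in: one dict per participant server."""

    def __init__(self, participants: int) -> None:
        self.stores = [{} for _ in range(participants)]

    def responsible(self, key: str) -> int:
        # Hash the key to pick the participant, not a slot inside one table.
        return stable_hash(key) % len(self.stores)

    def put(self, key: str, value: str) -> None:
        self.stores[self.responsible(key)][key] = value

    def get(self, key: str):
        return self.stores[self.responsible(key)].get(key)


dht = TrivialDHT(participants=16)
dht.put("alice.txt", "hello")
```

Every client that hashes "alice.txt" lands on the same participant, which is all the “deterministic” part of the design needs.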

Unfortunately, this scales poorly. If a DHT contains hundreds of thousands or millions of participants, expecting a client (or even a participant) to maintain millions of concurrent network connections would be unwieldy. Instead, we’ll employ a finger table. Each participant will maintain links to the nodes 2^0, 2^1, and so on up to 2^j positions ahead, where 2^j is less than the total number of participants. In other words, each participant keeps a logarithmic number of links, and a lookup needs at most a logarithmic number of hops.

To dive all in on computer science terminology, this guarantees that all lookups are O(log n). In a DHT with millions of nodes, lookups will take a maximum of 20 or so hops. Much worse than the O(1) lookup of a traditional hash table, but… pretty good. This trade-off means clients can connect to any participant in the DHT to submit a request, and the request will quickly bounce around to the correct destination. One network connection for a client, a handful for participants of the DHT.
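
To make the hop count concrete, here is a small simulation. It assumes nodes are numbered 0 to n-1 around a ring and that each hop takes the longest link that doesn’t overshoot the target; finger_table and route are illustrative names.

```python
def finger_table(node: int, n: int) -> list:
    """Links to the nodes 2^0, 2^1, ... positions ahead on a ring of n nodes."""
    links = []
    step = 1
    while step < n:
        links.append((node + step) % n)
        step *= 2
    return links


def route(start: int, target: int, n: int) -> list:
    """Greedy lookup: always take the longest link that does not overshoot."""
    path = [start]
    current = start
    while current != target:
        remaining = (target - current) % n
        # Largest power of two <= remaining. Since remaining < n, that link
        # is always present in the current node's finger table.
        step = 1 << (remaining.bit_length() - 1)
        current = (current + step) % n
        path.append(current)
    return path


hops = route(0, 11, 16)  # 0 -> 8 -> 10 -> 11
```

Each hop clears the highest remaining bit of the distance, so the number of hops is bounded by the number of bits needed to write n, which is about 20 for a million nodes.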

Alright, so that’s how we store data in a DHT with a static structure. What about redundancy? How do we handle adding and removing nodes? How do we deploy a DHT in a chaotic peer-to-peer network rather than a data center?

Data Redundancy

For data redundancy, we can just store the key-value pairs in two locations! Instead of storing only in hash(key) % participants, we can store in the original location and in (hash(key) + 1) % participants. For additional redundancy, store in + 2, etc. If the “location” of a participant in the DHT ring is random, then there’s no harm in storing in + 1. This is also convenient from a lookup perspective: We’re looking for data stored in participant 6, but participant 6 is offline? Send the query to participant 7 instead!
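
A quick sketch of that placement rule and the fallback lookup. The function names and the set of online participants are assumptions for the example.

```python
def replica_locations(key_hash: int, participants: int, copies: int = 2) -> list:
    """The primary participant followed by its successors on the ring."""
    # Parenthesize the addition: % binds tighter than + in Python.
    return [(key_hash + offset) % participants for offset in range(copies)]


def first_online(locations: list, online: set):
    """Pick the first replica that is still reachable, or None."""
    for participant in locations:
        if participant in online:
            return participant
    return None


locations = replica_locations(key_hash=22, participants=16)  # primary 6, backup 7
target = first_online(locations, online={0, 1, 7})           # participant 6 is down
```

The modulo also handles wrap-around, so the backup for the last participant on the ring is participant 0.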

What if a participant and its backup get out of sync? How do we decide which value is “correct” for a key? Well, that’s what we have vector clocks for!

Adding and Removing Nodes: Dynamic Routing Tables

Replacing a node in a DHT is simple: Contact every participant that links to the dead node, and give them contact information for its replacement so they can update their reference. This is relatively painless: there are log(n) nodes with a link to the dead one, and routing a RELINK message to each of them takes O(log n) hops, for O(log(n)^2) steps in total.

Growing and shrinking the DHT is more challenging. The trivial solution, adding the new edges, informing all nodes of the new DHT size, and re-hashing and re-introducing all keys, is obviously too inefficient to be practical.

Let’s revise the structure of a DHT. Instead of numbering all of the nodes sequentially, 0 to n, what if each node has a large random number associated with it? To start with, just add a few zeros, and assume the nodes are numbered “0”, “100”, “200”, …, “1500”.

Now our key lookup mechanism is broken! If we run hash(key) % 1600 the vast majority of keys will be assigned to non-existent nodes! Alright, so let’s re-define the assignment: Keys are now assigned to the closest node number at or before the “ideal” position. This means keys with ideal positions “1400” through “1499” will be assigned to node “1400”, keys for “1500” through “1599” will be assigned to node “1500”, and keys for “0” through “99” will be assigned to node “0”.
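
If we picture someone holding the full sorted list of node numbers (a simplification, since no real participant sees the whole network), the new assignment rule is a binary search. NODE_IDS and owner are names I’ve made up for this sketch.

```python
import bisect

NODE_IDS = list(range(0, 1600, 100))  # "0", "100", ..., "1500", kept sorted


def owner(position: int, node_ids: list = NODE_IDS) -> int:
    """Closest node number at or before the ideal position."""
    # bisect_right counts how many ids are <= position; the last of them owns it.
    index = bisect.bisect_right(node_ids, position) - 1
    # If no id is <= position, index is -1 and Python wraps around to the
    # last node, which is the predecessor on a ring.
    return node_ids[index]


assigned = owner(1450)  # ideal position 1450 belongs to node 1400
```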

Each node is still responsible for propagating a message forward through the network, until either the correct position is found, or it’s determined that the key does not exist in the DHT.

We’ll also need to change the linking in the network. Instead of linking to “+1”, “+2”, “+4”, “+8”, we’ll instead allocate each participant some “buckets”. These buckets will let a participant track links to “many nodes 1 or 2 distant”, “a moderate number 8 or 10 distant”, “a few 50 or 100 distant”, and so on. The same concept as a finger-table, just non-deterministic. If a participant doesn’t know any participants “about 100 away” they can instead send a lookup request to the known neighbors “about 50 away”, who are more likely to know neighbors that are closer to them.
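
One way to picture the buckets, as a loose sketch: group known peers by roughly how far ahead they are on the ring, and forward lookups to the known peer that gets closest to the target without passing it. Grouping by powers of two, the RING size of 1600, and the RoutingTable class are all assumptions of this example, and bucket size limits are left out.

```python
from collections import defaultdict

RING = 1600  # assumed size of the id space, matching the running example


def ring_distance(a: int, b: int) -> int:
    """How far ahead of a the id b sits, going forward around the ring."""
    return (b - a) % RING


class RoutingTable:
    def __init__(self, me: int) -> None:
        self.me = me
        self.buckets = defaultdict(list)  # bucket index -> known peers

    def add(self, peer: int) -> None:
        if peer == self.me:
            return
        # Bucket 0 covers distance 1, bucket 1 covers 2-3, bucket 2 covers 4-7, ...
        index = ring_distance(self.me, peer).bit_length() - 1
        if peer not in self.buckets[index]:
            self.buckets[index].append(peer)

    def next_hop(self, target: int):
        """Known peer that gets closest to the target without passing it."""
        reach = ring_distance(self.me, target)
        candidates = [
            peer
            for bucket in self.buckets.values()
            for peer in bucket
            if ring_distance(self.me, peer) <= reach
        ]
        if not candidates:
            return None  # nobody we know sits between us and the target
        return max(candidates, key=lambda peer: ring_distance(self.me, peer))


table = RoutingTable(me=0)
for peer in (100, 200, 400, 800):
    table.add(peer)
```

Here a lookup for position 1000 would be forwarded to node 800, which presumably knows more peers in that neighborhood than node 0 does.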

This bucketing system makes it easier to introduce new participants: We don’t have to calculate all the participants that “should” have links to the current node number, we just have to send out an introduction, and nearby nodes are likely to add the new participant to their buckets, while distant nodes are unlikely to add the participant to their buckets. The same bucketing system is ideal for redundancy, because if a nearby neighbor goes offline (which we can check using a periodic ping/heartbeat system), a participant will have many other nearby participants in their bucket, and can continue operating without loss of connectivity. If one of the few distant links is lost, then the participant needs to send out a new lookup to find other distant peers to add to their finger-table buckets.

Therefore, when we add a new participant, say node “1355”, we need to send out an announcement. Many nearby participants will add “1355” to their finger-tables, and a handful of more distant nodes will, too. Key-value pairs destined for “1355” through “1399” will be re-allocated from node “1300” to our new participant, but will also be kept in “1300” and “1200” for redundancy, depending on the fault tolerance of the network.
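
To see how little data has to move when “1355” joins, here is a small check using the same “closest node at or before” rule. The key positions are made up, and the sketch again pretends we can see the whole sorted node list.

```python
import bisect


def owner(position: int, node_ids: list) -> int:
    """Closest node number at or before the position (node_ids must be sorted)."""
    return node_ids[bisect.bisect_right(node_ids, position) - 1]


nodes = list(range(0, 1600, 100))           # "0", "100", ..., "1500"
positions = [1301, 1354, 1355, 1399, 1400]  # hashed positions of some keys

before = {p: owner(p, nodes) for p in positions}
bisect.insort(nodes, 1355)                  # the new participant joins
after = {p: owner(p, nodes) for p in positions}

# Only keys in the new node's range change hands; everything else stays put.
moved = [p for p in positions if before[p] != after[p]]
```

Compare that with the hash(key) % participants scheme, where changing the number of participants re-assigns nearly every key.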

This structure is still recognizably a DHT if we squint at it, but it’s a lot fuzzier now, with non-deterministic positioning and linking. Lookups are still deterministic, in that key-value pairs that exist in the network can reliably be found. We can also stabilize the structure of the DHT by adding an age-based probability function: Nodes that have been active for longer in the DHT (and are therefore likely to be online in the future) are more likely to be added to buckets, and more likely to be recommended in response to “find me more neighbor” requests. This means a new node will be added to many of its nearby peers, who keep large lists of nearby neighbors, but only long-lived nodes will be added to distant buckets. This means long hops across the DHT are much more likely to be reliable and efficient, and only once a lookup gets close to its destination, where participants have large redundant buckets, do connections become more chaotic.

DHTs in the Real World

With the additions in the “dynamic routing tables” section, we’ve got a very approximate description of Kademlia, a widely used Distributed Hash Table model. BitTorrent, described in a recent blog post, uses a modified Kademlia DHT in place of a tracker, using trackers primarily for bootstrapping by introducing clients to participants in the DHT. The Invisible Internet Project (I2P) uses a modified Kademlia to track routers and routes connected to the network. Many cryptocurrencies use a bucket structure similar to Kademlia to introduce participants to other peers in the network, but since the blockchain isn’t a key-value storage system they don’t use a DHT for data storage.

Now that we have an understanding of how to build a decentralized content-sharing system with peer introduction and routing, we can move on to more interesting topics: How to build useful systems and communities on top of this communication protocol, and how to build valuable social frameworks on top of those communities. But that’s for another post…

Posted 10/21/20