Sharding Key-Value Data in Isis2

Ken Birman
Cornell University

Sharding

When we create very large groups we often “shard” data: rather than fully replicating the data, we break it into subgroups with some small replication factor like 2 or 3.

Isis2 allows you to do this very easily, using the Isis2 DHT (a distributed key-value store).

You can mix and match: fully replicate some data, shard other data, etc.

Definition

A shard is a container for some set of data items

Normally the data items are (key,value) tuples

In our work the container “lives” on a small set of machines within what might be a bigger group

We store large data sets by using some rule that maps from keys to shards.

Sharding: Key Concepts

Where do the keys and values come from?

Key: Any data type you like.

Values: Again, any type you wish. But it is best to keep very large objects in mapped-memory regions and put the names of the regions in the DHT, not the values.

Subset multicast: A form of the Send and OrderedSend multicasts that delivers just to a subset of the members of a group. Used to update shard(s).

Aggregation: A way to query a very big system and combine the responses. Like a query, but the responses are combined “in the network”, not just at the caller

Setting up sharding

You’ll call one of:

g.DHTEnable(int ReplicationFactor, int ExpectedGroupSize, int MinGroupSize);

g.DHTEnable(int ReplicationFactor, int ExpectedGroupSize, int MinGroupSize, int TTL);

The parameters describe the anticipated steady-state configuration of the group.

nShards will be ExpectedGroupSize/ReplicationFactor

Key-value Stores (DHT)

Data items are tuples with a key and value

Basic operations are Put and Get

DHT maps key to a shard and puts the data on the shard

Get fetches the data from the shard

g.DHTPut("Ken", 58)

g.DHTPut("Sarah", 26)

g.DHTGet("Ken") returns 58

Isis2 DHT: Called Ida as a shorthand (Infrastructure for Data Analysis)

Builds on Isis2, but applications might limit themselves to mostly Ida mechanisms

Goal: Support a long-lived in-memory DHT at cloud scale, with rapid Put/Get and Query operations

First challenge: Strengthen the DHT model

DHTs normally do tuple-at-a-time operations

In Ida, we decided to offer atomic operations on sets of tuples: MultiPut, MultiGet, MultiQuery.

DHT-ness

What makes something a DHT?

In a datacenter setting, a DHT…

Reads, updates cost roughly a single RPC

Behavior is never disrupted by churn.

In Isis2 we want the DHT to have stronger guarantees. Does adding consistency to a DHT break DHT-ness?

Sharding: Usual approach

Most of today’s big data systems have some set of nodes. They hash to group the nodes into “shards”

In effect, “hash first”:

Node-ID → hashed node-id → shard
Key → hashed key → shard

In Ida, the group view determines the shards:

Node index → shard
Key → hashed key → shard

… but we let the user redefine GetHashCode, and hence control this mapping

First issue: Shard mapping

The picture we showed you earlier was a


This illustration suggests that the shards are simply laid out left to right within the group. But in fact we do it this way:

[Figure: shard numbers assigned to the members of a group. Laid out left to right: 0 0 0 1 1 1 2 2 2 3 3 3 4 4 4. Counting off: 0 1 2 3 4 0 1 2 3 4 0 1 2 3 4.]

Basic mapping: Shards in a group

Group view: managed by Isis2, fully replicated

Shards “count off” left to right in group view

Fast remap minimizes churn overheads

If needed, state transfer loads key-value tuples to the remapped member(s)

[Figure: the members of the group view, labeled with shard numbers 0 to 4 in count-off order.]

Danger: Churn!

Extra members

Notice that not every shard will necessarily have exactly the desired ReplicationFactor members!

We recommend keeping DHT groups a little too big, to minimize the impact of churn

Extra members do participate… but if a fault occurs your shards won’t get “too small”
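As a rough illustration of the count-off layout and of why a few extra members help, here is a small standalone Python model. It is not Isis2 code. It assumes that the member at rank r in the group view belongs to shard r % nShards, which is an inference from the count-off picture, and that nShards is computed by integer division. The function names are hypothetical.

```python
# Standalone model of the count-off layout; not Isis2 code.
# Assumption: the member at rank r in the group view joins shard r % n_shards.

def count_off(group_size, expected_group_size, replication_factor):
    """Return the shard number of each member, indexed by rank in the view."""
    n_shards = expected_group_size // replication_factor
    return [rank % n_shards for rank in range(group_size)]


def shard_sizes(assignment):
    """Count how many members landed in each shard."""
    sizes = {}
    for shard in assignment:
        sizes[shard] = sizes.get(shard, 0) + 1
    return sizes


exact = count_off(15, expected_group_size=15, replication_factor=3)
padded = count_off(17, expected_group_size=15, replication_factor=3)

print(exact)                # shard numbers repeat 0..4 across the view
print(shard_sizes(padded))  # two shards end up with one extra member
```

In this model a group of 17 that loses one member still has at least 3 members in every shard, whereas a group of exactly 15 that loses one member is left with a shard of size 2.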

You control the mapping!

The mapping of group members to shards is as shown, but to decide which shard a key maps to:

1. Isis2 calls the key’s GetHashCode() method
2. Takes the value that comes back % number-of-shards


Normally this spreads data fairly randomly

But if you “override” the default hash code method you can assign data to shards under program control. This is useful!

Example of user-defined hashing

Suppose that you are representing a set of web pages as keys and values

Key: the web page name or some form of id

Values: information about the page

You might want pages that are somehow related to one-another to be in the same shard to make it easy to access them simultaneously

By mapping their keys to the same shard-id you can do this. If your hash code is itself a valid shard-id (from 0 up to one less than the number of shards), Isis2 will use that shard, because the modulus operation has no effect
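The idea of steering related pages to one shard can be sketched with a standalone Python model. It is not Isis2 code: PageKey, site_shard and shard_for are hypothetical names, and the `__hash__` method plays the role that an overridden GetHashCode plays in the C# version.

```python
# Standalone model of key-to-shard mapping; not Isis2 code.
# PageKey, site_shard, and shard_for are hypothetical names.

class PageKey:
    """A web page key whose hash code is the shard chosen for its site."""

    def __init__(self, name, site_shard):
        self.name = name
        self.site_shard = site_shard

    def __hash__(self):
        # Plays the role of an overridden GetHashCode().
        return self.site_shard

    def __eq__(self, other):
        return (isinstance(other, PageKey)
                and self.name == other.name
                and self.site_shard == other.site_shard)


def shard_for(key, n_shards):
    """Hash the key, then reduce modulo the number of shards."""
    return hash(key) % n_shards


N_SHARDS = 5
home = PageKey("example.org/index.html", site_shard=2)
about = PageKey("example.org/about.html", site_shard=2)
print(shard_for(home, N_SHARDS), shard_for(about, N_SHARDS))
```

Both pages land on shard 2. A hash code of 7 would also land on shard 2 with five shards, which is why the modulus only "has no effect" when the hash code is already a valid shard-id.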

Coding Example: A numeric key

[AutoMarshalled]
public class myKeyType
{
    public int key;

    public myKeyType(int n) { key = n; }

    public myKeyType() { }

    // Uses the key as the shard number
    public override int GetHashCode() { return key; }

    // Safe against null and against objects of other types
    public override bool Equals(object obj)
    {
        myKeyType other = obj as myKeyType;
        return other != null && key == other.key;
    }

    public override string ToString() { return key.ToString(); }
}

What can the hash function do?

Any calculation based on the given key

Any use of the group view is acceptable too: number of members in the group, number of shards…

If the view changes, and this impacts the shard mapping, Isis2 automatically moves tuples to the correct shard

Only rule is that every group member must give the same mapping for the same key and view

Collisions

With sharded data, we worry about two kinds of collisions that can occur

Items with distinct keys often map to the same shard. This is normal. We say that the shard holds a “slice” of the overall database. This slice is a collection of key-value tuples that all map to the same shard

A second form of collision occurs when a new key-value tuple is inserted and there is a prior key-value tuple with the same key already in the system

How are collisions handled?

Suppose that Put(key, value2) matches the same key as in some prior Put(key, value1).

By default, value2 replaces value1: newer replaces older

You can also define a “put collision resolver”

Call DHTSetPutCollisionResolver() to register the resolver: Resolve(key, value1, value2) → result

The system calls the resolver if a collision occurs, and it can override the default behavior. For example you can keep a list of values, or average them, or keep the “best” value, etc.
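To make the resolver idea concrete, here is a minimal standalone Python model of one shard's slice. It is not Isis2 code, and ShardSlice is a hypothetical name. The resolver takes (key, old value, new value), following the Resolve(key, value1, value2) shape above. With no resolver, the newer value replaces the older one.

```python
# Standalone model of put-collision handling on one shard; not Isis2 code.
# ShardSlice and its methods are hypothetical names.

class ShardSlice:
    def __init__(self, resolver=None):
        self.items = {}
        self.resolver = resolver  # called as resolver(key, old_value, new_value)

    def put(self, key, value):
        if key in self.items and self.resolver is not None:
            value = self.resolver(key, self.items[key], value)
        # With no resolver, the newer value simply replaces the older one.
        self.items[key] = value

    def get(self, key):
        return self.items.get(key)


default = ShardSlice()
default.put("Ken", 58)
default.put("Ken", 59)

keep_best = ShardSlice(resolver=lambda key, old, new: max(old, new))
keep_best.put("Ken", 58)
keep_best.put("Ken", 26)

keep_all = ShardSlice(resolver=lambda key, old, new: old + new)
keep_all.put("Ken", [58])
keep_all.put("Ken", [26])

print(default.get("Ken"), keep_best.get("Ken"), keep_all.get("Ken"))
```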

Supporting multi-key operations

A put that might need to update multiple key-value tuples, and hence update multiple shards, requires a form of “subset” multicast

We added a way to use Send and OrderedSend to talk to a list of group members, or even “a list of keys”

These versions of Send and OrderedSend allow you to specify a list of group members: a subset of the full group.

You can actually use this directly when sending a multicast. We use it to implement DHTPut, DHTGet, DHTOrderedPut and DHTOrderedGet

This lets us offer all kinds of multi-tuple operations

Call DHTPut or DHTOrderedPut with a List<KeyValuePair<KT,VT>>. Each element is a KeyValuePair object containing a key and a value.

Coding Example: MultiPut/Get

Shows a few Put and OrderedPut operations

for (int k = 0; k < 10; k++)
{
    List<KeyValuePair<int, string>> myList = new List<KeyValuePair<int, string>>();
    for (int n = start + 1000 + 10 * k; n < start + 1000 + ntuples / 4 + 10 * k; n++)
        myList.Add(new KeyValuePair<int, string>(n, (n * 5 % 1000).ToString()));
    g.DHTPut(myList);
}
for (int k = 0; k < 10; k++)
{
    List<KeyValuePair<int, string>> myList = new List<KeyValuePair<int, string>>();
    // The upper bound of this loop was lost in the source; the one below is
    // an assumption, chosen by analogy with the first loop.
    for (int n = start + 2000 + 100 * k; n < start + 2000 + ntuples / 4 + 100 * k; n++)
        myList.Add(new KeyValuePair<int, string>(n, (n * 5 % 1000).ToString()));
    g.DHTOrderedPut(myList);
}

Coding Example: MultiPut/Get

Fetches sets of 3 key-value pairs

Understanding self-test: what could happen if we used DHTGet instead of DHTOrderedGet here?

for (int count = 0; count < 500; count++)
{
    result = g.DHTOrderedGet<int, string>(new List<int>() {
        start + count + myrank * 10,
        start + 1000 + count + myrank * 15,
        start + 2000 + count + myrank * 25 });
    IsisSystem.WriteLine("DHTOrderedGet[" + count + "]: N=" + result.Count());
}

Visualizing multi-key operations

[Figure: group members labeled with shard numbers 0 to 4, crossed by wavy lines. Actions occur on ordered consistent cuts.]

What do the wavy lines mean?

Each wavy line represents a single DHTOrderedPut or DHTOrderedGet

Notice that they never “cross”

The effect is like the database serializability guarantee: this kind of Get only sees the DHT in a consistent state!

Multicast can trigger new puts…

[Figure: group members labeled with shard numbers 0 to 4, with the annotations below.]

OrderedSend starts computation at targeted subset of group members

Application initiates request

Each key maps to a shard. One representative participates from each selected shard and contributes a subresult.

“Shuffle” occurs as new Puts are


… but if you do this, think about robustness to failures!

If you do create new key-value tuples, it may be wise to have multiple shard members do each put

This way if one fails, someone else will compute the same new tuple and do the same put

Then just have the DHT ignore duplicates (which is the default behavior)

Query? Multicast, then reply…

[Figure: group members labeled with shard numbers 0 to 4. Annotations: “Application initiates request”; “OrderedSend starts computation at targeted subset of group members”; “Aggregated result”. A marker in the figure denotes aggregation via developer-defined code.]

How to access the DHT?

A DHT group member can access a copy of the DHT itself, but not the “true” DHT data structure

You call DHT<KT,VT>()

Isis2 computes a clone of the DHT (this is because of concerns about locking). It includes only items matching the given types, but you can specify <Object,Object>

Then you can use the result in a computation. Example (using LINQ, but you don’t have to use LINQ):

g.DHT<int,int>().GroupBy(kvp => kvp.Value/1000);

But who calls DHT<KT,VT>()?

You need to send the Query to the appropriate group members

You could use a DHTQueryKey object and specify a list of keys. The system will send to each shard matching those keys

You have two options: send to all members of each shard, or send to just one selected member of each shard. The insight here is that for a query, any one shard member can compute the result

So: you issue a multicast (OrderedSend) to the shards. Recipients call DHT<KT,VT>(), compute sub-results, and then we combine those subresults

Would this be hard to code?

Example of an Isis2 request handler. Notice that this example uses LINQ:

g.DHT<int, string>()                      // The DHT “slice”
    .Where(kvp => kvp.Key % 77 == n)      // A subset…
    .Select(kvp => kvp.Value);            // Make a list of the values

g.Handlers[0] += (Action<int>)delegate(int n)
{
    IsisSystem.WriteLine("Entry to new parallel Select logic; looking for key%77==" + n +
        ", my portion of the DHT contains " + g.DHT<int, string>().Count() + " Key,Value pairs");
    IEnumerable<string> newList = g.DHT<int, string>()
        .Where(kvp => kvp.Key % 77 == n)
        .Select(kvp => kvp.Value);
    IsisSystem.WriteLine("After parallel Linq operation, my contribution contains " +
        newList.Count() + " matching tuples");
};

How would the caller combine results?

The caller receives a list of replies, one from each participant.

So you can use the C# LINQ query language to combine them, or can just write code that aggregates (combines) the contents of the list

For example, you could form one big merged list, take the maximum, take the average, etc
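A minimal standalone Python sketch of combining replies at the caller follows. It is not Isis2 code, and the reply values are made up for illustration. Each inner list stands for the values one participant sent back.

```python
# Standalone model of combining replies at the caller; not Isis2 code.
# Each inner list stands for the values one shard representative sent back.
replies = [[120, 480], [990], [], [300, 60, 30]]

# One big merged list, in the order the replies are listed.
merged = [value for reply in replies for value in reply]
largest = max(merged)
average = sum(merged) / len(merged)

print(merged, largest, average)
```

Note that some participants may contribute nothing, so code that takes a maximum or an average should be prepared for an empty merged list.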

Big Query? Use tree aggregation…

[Figure: group members labeled with shard numbers 0 to 4. Annotations: “Application initiates scalable aggregation”; “OrderedSend starts computation at targeted subset of group members”; “Aggregated result”. A further annotation marks where Isis2 calls an aggregation method that you can define.]

How tree aggregation works

You need to define a method Aggregate(key, value1, value2) → result

In practice you also need to specify all the types: this method is actually a “generic”. But that just makes it messy, not hard to do.

Then the system will call this function within the tree.

In practice that means the calls happen in DHT members (your program, your code…) which get asked to help out in this way.
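The shape of tree aggregation can be sketched with a standalone Python model. It is not Isis2 code: tree_aggregate is a hypothetical helper that combines values pairwise, level by level, and the actual tree that Isis2 builds among group members may be shaped differently. The aggregate(key, v1, v2) callback mirrors the Aggregate(key, value1, value2) method described above.

```python
# Standalone model of tree aggregation; not Isis2 code.
# tree_aggregate is a hypothetical helper; aggregate(key, v1, v2) mirrors the
# Aggregate(key, value1, value2) shape described above.

def tree_aggregate(key, values, aggregate):
    """Combine values pairwise, level by level, until one result remains."""
    level = list(values)
    if not level:
        raise ValueError("nothing to aggregate")
    while len(level) > 1:
        next_level = []
        for i in range(0, len(level) - 1, 2):
            next_level.append(aggregate(key, level[i], level[i + 1]))
        if len(level) % 2 == 1:
            next_level.append(level[-1])  # odd one out moves up unchanged
        level = next_level
    return level[0]


calls = []


def keep_max(key, v1, v2):
    calls.append((v1, v2))
    return max(v1, v2)


result = tree_aggregate("round-1", [3, 9, 4, 7, 1], keep_max)
print(result, len(calls))
```

With n contributions the aggregator runs n - 1 times in total, but the calls within one level are independent, which is what lets the work be spread across members instead of landing on the caller.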

Example of a simple aggregator

You register the code that combines the values


In Isis2, an Aggregator<KT,VT> is a function that receives three arguments: a key and two values, of the given types, and returns a new value

This one uses the max of the two arguments

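g.RegisterAggregator<int, myData>((Aggregator<int, myData>)delegate(int qk, myData lv, myData dv)
{
    // The arguments to Math.Max were lost in the source. "value" is a
    // hypothetical field of myData, used here only for illustration.
    return new myData(Math.Max(lv.value, dv.value));
});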

The caller does a query, then waits:

Sends a request to handler 0

Then collects the aggregated result

The wait can fail if the group membership changes while the computation is underway.

g.Send(0, counter);
try
{
    g.GetAggregatorResult<int, myData>(counter);
}
catch (AggregationFailed)
{
    IsisSystem.WriteLine("Aggregation failed for round " + counter);
}

A consistency guarantee

Either your aggregated result reflects exactly one contribution from each shard that should contribute

… or your result might be wrong and Isis2 throws an AggregationException. Normally you would just reissue the request

This is an example of planning for failure and treating it as a routine, common problem

You should expect that these exceptions do happen!
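The "just reissue the request" pattern can be sketched as a small retry loop. This is a standalone Python model, not Isis2 code: the AggregationFailed class defined here and query_with_retry are hypothetical stand-ins, and the flaky query simply simulates two failed rounds.

```python
# Standalone model of the retry pattern; not Isis2 code.
# This AggregationFailed class and query_with_retry are hypothetical stand-ins.

class AggregationFailed(Exception):
    """Raised when membership changed while an aggregation was running."""


def query_with_retry(run_query, max_attempts=3):
    """Reissue the request until it succeeds or the attempts run out."""
    for attempt in range(1, max_attempts + 1):
        try:
            return run_query(attempt)
        except AggregationFailed:
            if attempt == max_attempts:
                raise


attempts_seen = []


def flaky_query(attempt):
    attempts_seen.append(attempt)
    if attempt < 3:
        raise AggregationFailed("view changed")
    return 42


answer = query_with_retry(flaky_query)
print(answer, attempts_seen)
```

Bounding the number of attempts is a design choice of this sketch, so that a caller facing continuous churn eventually sees the failure instead of looping forever.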

ID for aggregation…

Isis2 allows you to have multiple such aggregations running at the same time. Each needs an identifier so that it can be distinguished from the others. The example simply used an integer for this

In the case where we use aggregation in conjunction with the DHT we normally use a DHTKey object as the identifier. It has a list of the keys in it.

Isis2 can then match up the results even if different aggregations hit the same nodes

Side remark on Aggregation

In fact you can use aggregation without using the DHT. Any multicast can be used to initiate an aggregation.

… and you can aggregate again and again, without even using a multicast. The group members just keep reporting on their “value”

… You can even reuse the same aggregation key. This is a nice feature: it lets you continuously track properties of the group using aggregation.

Ida can even support full DB transactions

Pre-execute the transaction on a read-only copy. Remember the object versions accessed and the desired updates.

Now execute the transaction as a 1-shot transaction:

Phase 1: Distribute provisional updates and check that data versions haven’t changed. Hold a lock.

Phase 2: Apply the updates atomically, release the lock.
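The version-check idea behind the two phases can be illustrated with a single-process Python model. This is not Ida or Isis2 code: VersionedStore is a hypothetical name, and a local lock stands in for whatever distributed locking the real system uses.

```python
# Standalone single-process model of the two phases; not Isis2 or Ida code.
# VersionedStore and its methods are hypothetical names.
import threading


class VersionedStore:
    def __init__(self):
        self.data = {}  # key -> (version, value)
        self.lock = threading.Lock()

    def read(self, key):
        return self.data.get(key, (0, None))

    def commit(self, versions_read, updates):
        """Apply updates only if every version read is still current."""
        with self.lock:
            # Phase 1: check that nothing we read has changed.
            for key, version in versions_read.items():
                if self.read(key)[0] != version:
                    return False
            # Phase 2: apply all updates while still holding the lock.
            for key, value in updates.items():
                self.data[key] = (self.read(key)[0] + 1, value)
            return True


store = VersionedStore()
store.commit({}, {"Ken": 58, "Sarah": 26})

# Pre-execute: read, remember versions, compute the desired updates.
ken_version, ken_value = store.read("Ken")
first = store.commit({"Ken": ken_version}, {"Ken": ken_value + 1})
# A second attempt based on the same stale version must be rejected.
second = store.commit({"Ken": ken_version}, {"Ken": ken_value + 100})
print(first, second, store.read("Ken"))
```

A rejected commit leaves the store untouched, so the caller can pre-execute again against fresh versions and retry.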


… but transactions violate DHT-ness

Even if Ida can support full transactions, users might feel that this is one step too far

Requires multiple phases

Holds a lock (albeit briefly)

Potential users seem to like multi-tuple transactions but are uncomfortable with “algorithms” that could issue more than one of them at a time…

Ida: Preliminary performance data

With a single key, shard size 2, OrderedPut and OrderedGet reach performance of Put and Get

With multiple keys per operation (picked to cause many conflicts) performance saturates. Why? It turns out that Ida is I/O bound… Infiniband UDP, multicast is slow!

Currently porting Isis2 to use the OFED package

Ida: Preliminary performance data

This compares two query approaches

Agg/O-Agg: async multicast to participants via Send/OSend. Tree-structured aggregation collects responses

Query/OQuery: 1:N responses

Aggregated ordered query underperforms 1:N query in this example, but would win if the “responses” were large

Ida: Preliminary performance data

Locality of costs for OrderedGet touching 3 shards. One member in each shard incurs load (this is good).

A single failure in a group with 128 members at time 45secs, then a failure of 1/3 of the group members at time 90 secs.

Rapid restoration of service

When 1/3 of the members fail, we lose about 1/3 of the capacity to handle parallel queries

Summary

Isis2: Powerful new platform for data replication in the cloud. Ida is the Isis2 DHT

Our question: “what should DHT consistency mean?”

We looked at a range of options: single tuple updates, multiple tuples, queries, full transactions…

… and Ida does support all of these options

… but we suspect that the desire for DHT-ness will drive users to use only simple