SHARDING KEY-VALUE DATA IN ISIS2
Ken Birman
1
Cornell University
2
Sharding
When we create very large groups, we often “shard” data: rather than fully replicating the data, we break it into subgroups with some small replication factor like 2 or 3.
Isis2 allows you to do this very easily, using the Isis2 DHT (a distributed key-value store).
You can mix and match: fully replicate some data, shard other data, etc.
3
Definition
A shard is a container for some set of data items.
Normally the data items are (key,value) tuples.
In our work, the container “lives” on a small set of machines within what might be a bigger group.
We store large data sets by using some rule that maps from keys to shards.
4
Sharding: Key Concepts
Where do the keys and values come from?
Key: any data type you like. Value: again, any type you wish. But it is best to keep very large objects in mapped-memory regions and put the names of the regions in the DHT, not the values themselves.
Subset multicast: a form of the Send and OrderedSend multicasts that delivers to just a subset of the members of a group. Used to update shard(s).
Aggregation: a way to query a very big system and combine the responses. Like a query, but the responses are combined “in the network”, not just at the caller.
5
Setting up sharding
You’ll call:
g.DHTEnable(int ReplicationFactor, int ExpectedGroupSize, int MinGroupSize);
or
g.DHTEnable(int ReplicationFactor, int ExpectedGroupSize, int MinGroupSize, int TTL);
The parameters describe the anticipated steady-state configuration of the group.
nShards will be ExpectedGroupSize/ReplicationFactor.
6
Key-value Stores (DHT)
Data items are tuples with a key and a value.
Basic operations are Put and Get
DHT maps key to a shard and puts the data on the shard
Get fetches the data from the shard
g.DHTPut("Ken", 58);
g.DHTPut("Sarah", 26);
g.DHTGet("Ken") returns 58
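To make Put and Get concrete, here is a single-process toy that is not the Isis2 API: each shard is a Dictionary, and a key is placed in shard hash % nShards (the rule described on a later slide). The class name ToyShardedStore and its methods are hypothetical.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, single-process model of a sharded key-value store (not the Isis2 API).
public class ToyShardedStore<TKey, TValue>
{
    private readonly Dictionary<TKey, TValue>[] shards;

    public ToyShardedStore(int nShards)
    {
        shards = new Dictionary<TKey, TValue>[nShards];
        for (int i = 0; i < nShards; i++)
            shards[i] = new Dictionary<TKey, TValue>();
    }

    // Map a key to a shard: hash code modulo the number of shards, kept non-negative.
    public int ShardOf(TKey key)
    {
        int n = shards.Length;
        return ((key.GetHashCode() % n) + n) % n;
    }

    // Put: the tuple goes only to the shard its key maps to; newer replaces older.
    public void Put(TKey key, TValue value)
    {
        shards[ShardOf(key)][key] = value;
    }

    // Get: look only in the shard the key maps to.
    public bool TryGet(TKey key, out TValue value)
    {
        return shards[ShardOf(key)].TryGetValue(key, out value);
    }
}
```

Storing ("Ken", 58) and ("Sarah", 26) and then asking for "Ken" yields 58, whichever of the shards the key happens to land in.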
7
Isis2 DHT: called Ida as a shorthand (Infrastructure for Data Analysis)
Builds on Isis2, but applications might limit themselves mostly to Ida mechanisms.
Goal: support a long-lived in-memory DHT, at cloud scale, with rapid Put/Get and Query operations.
First challenge: strengthen the DHT model.
DHTs normally do tuple-at-a-time operations. In Ida, we decided to offer atomic operations on sets of tuples: MultiPut, MultiGet, MultiQuery.
8
DHT-ness
What makes something a DHT?
In a datacenter setting, a DHT…
Reads and updates cost roughly a single RPC.
Behavior is never disrupted by churn.
In Isis2 we want the DHT to have stronger guarantees. Does adding consistency to a DHT break DHT-ness?
9
Sharding: Usual approach
Most of today’s big data systems have some set of nodes. They hash to group the nodes into “shards”.
In effect, “hash first”:
Node-ID → hashed node-id → shard
Key → hashed key → shard
In Ida, the group view determines the shards:
Node index → shard
Key → hashed key → shard… but we let the user redefine GetHashCode, and hence control this mapping.
10
First issue: Shard mapping
The picture we showed you earlier was a lie:
This illustration suggests that the shards are simply laid out left to right within the group. But in fact we do it this way:
[Figure: a contiguous layout (0 0 0 1 1 1 2 2 2 3 3 3) contrasted with the layout Isis2 actually uses]
11
Basic mapping: Shards in a group
Group view: managed by Isis2, fully replicated.
Shards “count off” left to right in the group view.
Fast remap minimizes churn overheads.
If needed, state transfer loads key-value tuples to the remapped member(s).
[Figure: group members counting off into shards]
Danger: Churn!
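A sketch of the count-off layout, under our own reading of the slides: nShards is ExpectedGroupSize/ReplicationFactor, and the member at position rank in the group view is assumed to join shard rank % nShards. The helper names are hypothetical, and the real Isis2 mapping may differ in detail.

```csharp
using System;

// Hypothetical sketch of how members of a group view might "count off" into shards.
public static class ShardLayout
{
    // nShards = ExpectedGroupSize / ReplicationFactor (integer division).
    // The floor of 1 is our own guard, not something the slides specify.
    public static int NumShards(int expectedGroupSize, int replicationFactor)
    {
        return Math.Max(1, expectedGroupSize / replicationFactor);
    }

    // Assumed count-off rule: the member at view position `rank` joins shard rank % nShards.
    public static int ShardOfMember(int rank, int nShards)
    {
        return rank % nShards;
    }

    // Shard number for every position in a view of the given size.
    public static int[] Layout(int viewSize, int nShards)
    {
        int[] shards = new int[viewSize];
        for (int rank = 0; rank < viewSize; rank++)
            shards[rank] = ShardOfMember(rank, nShards);
        return shards;
    }
}
```

With 12 members and ReplicationFactor 3 there are 4 shards, laid out 0 1 2 3 0 1 2 3 0 1 2 3. With 14 members, shards 0 and 1 each pick up an extra member, which is the situation the next slide describes.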
12
Extra members
Notice that not every shard will necessarily have exactly the desired ReplicationFactor members!
We recommend keeping DHT groups a little too big, to minimize the impact of churn
Extra members do participate… but if a fault occurs your shards won’t get “too small”
13
You control the mapping!
The mapping of group members to shards is as shown, but to decide which shard a key maps to:
1. Isis2 calls the key’s GetHashCode() method.
2. It takes the value that comes back % number-of-shards.
Normally this spreads data fairly randomly
But if you “override” the default hash code method you can assign data to shards under program control. This is useful!
14
Example of user-defined hashing
Suppose that you are representing a set of web pages as keys and values:
Key: the web page name or some form of ID
Value: information about the page
You might want pages that are somehow related to one-another to be in the same shard to make it easy to access them simultaneously
By mapping their keys to the same shard-id, you can do this. If you map to a specific shard-id that is smaller than the number of shards, Isis2 will use that shard, because the modulus operation has no effect.
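A sketch of the web-page case: the hypothetical PageKey type hashes on a TopicId field chosen by the application, so related pages share a shard, while Equals still distinguishes individual pages. ShardOf applies the hash % number-of-shards rule, made non-negative as a precaution since hash codes can be negative.

```csharp
using System;

// Hypothetical key type for web pages: pages that share a TopicId land in the same shard.
public sealed class PageKey
{
    public readonly string Url;     // identifies the individual page
    public readonly int TopicId;    // hypothetical grouping chosen by the application

    public PageKey(string url, int topicId) { Url = url; TopicId = topicId; }

    // The hash depends on TopicId only, so related pages collide on purpose.
    public override int GetHashCode() { return TopicId; }

    // Equality still uses the full identity of the page.
    public override bool Equals(object obj)
    {
        PageKey other = obj as PageKey;
        return other != null && Url == other.Url && TopicId == other.TopicId;
    }

    // hash code % number-of-shards, kept non-negative.
    public static int ShardOf(object key, int nShards)
    {
        int h = key.GetHashCode();
        return ((h % nShards) + nShards) % nShards;
    }
}
```

Two pages with TopicId 7 both land in shard 3 when there are 4 shards; a page with TopicId 2 lands in shard 2, since the modulus has no effect on an ID below the shard count.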
15
Coding Example: A numeric key
[AutoMarshalled]
public class myKeyType {
    public int key;
    public myKeyType(int n) { key = n; }
    public myKeyType() { }
    // Uses the key itself as the shard number (Isis2 then takes it % number-of-shards)
    public override int GetHashCode() { return key; }
    public override bool Equals(object obj) { myKeyType other = obj as myKeyType; return other != null && key == other.key; }
    public override string ToString() { return key.ToString(); }
}
16
What can the hash function do?
Any calculation based on the given key
Any use of the group view is acceptable too: number of members in the group, number of shards…
If the view changes, and this impacts the shard mapping, Isis2 automatically moves tuples to the correct shard.
The only rule is that every group member must give the same mapping for the same key and view.
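The “same mapping” rule has a practical trap for string keys: on .NET Core and later .NET versions, string.GetHashCode() is randomized per process, so two group members could send the same string key to different shards. One workaround, which is our suggestion rather than anything the slides prescribe, is a key wrapper with a process-independent hash such as 32-bit FNV-1a over the UTF-8 bytes. StableStringKey is a hypothetical name.

```csharp
using System;
using System.Text;

// Hypothetical key wrapper whose hash is identical in every process (32-bit FNV-1a over UTF-8).
public sealed class StableStringKey
{
    public readonly string Value;
    public StableStringKey(string value) { Value = value; }

    public static uint Fnv1a(string s)
    {
        uint hash = 2166136261u;                 // FNV-1a 32-bit offset basis
        foreach (byte b in Encoding.UTF8.GetBytes(s))
        {
            hash ^= b;                           // xor in the next byte...
            hash = unchecked(hash * 16777619u);  // ...then multiply by the FNV prime
        }
        return hash;
    }

    // Same string => same hash on every member, whatever process it runs in.
    public override int GetHashCode() { return unchecked((int)Fnv1a(Value)); }

    public override bool Equals(object obj)
    {
        StableStringKey other = obj as StableStringKey;
        return other != null && Value == other.Value;
    }
}
```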
17
Collisions
With sharded data, we worry about two kinds of collisions that can occur.
Items with distinct keys often map to the same shard. This is normal. We say that the shard holds a “slice” of the overall database. This slice is a collection of key-value tuples that all map to the same shard.
A second form of collision occurs when a new key-value tuple is inserted and there is a prior key-value tuple with the same key already in the system
18
How are collisions handled?
Suppose that Put(key, value2) matches the same key as in some prior Put(key, value1). By default, value2 replaces value1: newer replaces older.
You can also define a “put collision resolver”: Resolve(key, value1, value2) → result
Call DHTSetPutCollisionResolver() to register the resolver.
The system calls the resolver if a collision occurs, and it can override the default behavior. For example, you can keep a list of values, average them, keep the “best” value, etc.
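A single-process sketch of the resolver idea. We do not reproduce the signature of DHTSetPutCollisionResolver here; instead a hypothetical Func<TKey, TValue, TValue, TValue> plays the role of Resolve(key, value1, value2), with “newer replaces older” as the default.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical toy store illustrating a put-collision resolver (not the Isis2 API).
public class ResolvingStore<TKey, TValue>
{
    private readonly Dictionary<TKey, TValue> slice = new Dictionary<TKey, TValue>();

    // Resolve(key, oldValue, newValue) -> value to store; default: newer replaces older.
    public Func<TKey, TValue, TValue, TValue> Resolver = (key, oldValue, newValue) => newValue;

    public void Put(TKey key, TValue value)
    {
        TValue existing;
        if (slice.TryGetValue(key, out existing))
            slice[key] = Resolver(key, existing, value);   // collision: let the resolver decide
        else
            slice[key] = value;                            // no prior tuple with this key
    }

    public TValue Get(TKey key) { return slice[key]; }
}
```

Registering (k, oldV, newV) => Math.Max(oldV, newV) turns the store into one that keeps the “best” (largest) value instead of the newest.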
19
Supporting multi-key operations
A put that might need to update multiple key-value tuples, and hence multiple shards, requires a form of “subset” multicast.
We added a way to use Send and OrderedSend to talk to a list of group members, or even to “a list of keys”.
These versions of Send and OrderedSend allow you to specify a list of group members: a subset of the full group. You can use this directly when sending a multicast. We use it to implement DHTPut, DHTGet, DHTOrderedPut and DHTOrderedGet.
This lets us offer all kinds of multi-tuple operations: call DHTPut or DHTOrderedPut with a List<KeyValuePair<KT,VT>>. Each element is a KeyValuePair object containing a key and a value.
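Before a multi-tuple Put can be sent, the sender has to know which shards it touches. A minimal sketch, assuming the hash % nShards rule: group the List<KeyValuePair<KT,VT>> by shard; the keys of the result are the subset a subset multicast would address. MultiPutPlanner.ByShard is a hypothetical helper, not an Isis2 call.

```csharp
using System;
using System.Collections.Generic;

public static class MultiPutPlanner
{
    // Groups the tuples of one multi-put by destination shard (hash % nShards, non-negative).
    public static SortedDictionary<int, List<KeyValuePair<TKey, TValue>>> ByShard<TKey, TValue>(
        List<KeyValuePair<TKey, TValue>> tuples, int nShards)
    {
        var plan = new SortedDictionary<int, List<KeyValuePair<TKey, TValue>>>();
        foreach (KeyValuePair<TKey, TValue> kvp in tuples)
        {
            int shard = ((kvp.Key.GetHashCode() % nShards) + nShards) % nShards;
            List<KeyValuePair<TKey, TValue>> bucket;
            if (!plan.TryGetValue(shard, out bucket))
            {
                bucket = new List<KeyValuePair<TKey, TValue>>();
                plan[shard] = bucket;
            }
            bucket.Add(kvp);
        }
        return plan;   // plan.Keys = the shards the subset multicast must reach
    }
}
```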
20
Coding Example: MultiPut/Get
Shows a few Put and OrderedPut operations
for (int k = 0; k < 10; k++)
{
    List<KeyValuePair<int, string>> myList = new List<KeyValuePair<int, string>>();
    for (int n = start + 1000 + 10 * k; n < start + 1000 + ntuples / 4 + 10 * k; n++)
        myList.Add(new KeyValuePair<int, string>(n, (n * 5 % 1000).ToString()));
    g.DHTPut(myList);          // one multi-tuple Put
}
for (int k = 0; k < 10; k++)
{
    List<KeyValuePair<int, string>> myList = new List<KeyValuePair<int, string>>();
    for (int n = start + 2000 + 100 * k; n < start + …; n++)   // upper bound lost in the original slide
        myList.Add(new KeyValuePair<int, string>(n, (n * 5 % 1000).ToString()));
    g.DHTOrderedPut(myList);   // one multi-tuple OrderedPut
}
21
Coding Example: MultiPut/Get
Fetches sets of 3 key-value pairs
Understanding self-test: what could happen if we used DHTGet instead of DHTOrderedGet here?
for (int count = 0; count < 500; count++) {
    result = g.DHTOrderedGet<int, string>(new List<int>() { start + count + myrank * 10, start + 1000 + count + myrank * 15, start + 2000 + count + myrank * 25 });
    IsisSystem.WriteLine("DHTOrderedGet[" + count + "]: N=" + result.Count());
}
22
Visualizing multi-key operations
[Figure: multi-key operations spanning several shards of the group]
Actions occur on ordered, consistent cuts.
23
What do the wavy lines mean?
Each wavy line represents a single DHTOrderedPut or DHTOrderedGet.
Notice that they never “cross”.
The effect is like the database serializability guarantee: this kind of Get only sees the DHT in a consistent state!
24
Multicast can trigger new puts…
[Figure: a request flowing through the group's shards]
OrderedSend starts computation at targeted subset of group members
Application initiates request
Each key maps to a shard. One representative participates from each selected shard and contributes a subresult.
“Shuffle” occurs as new Puts are triggered
25
… but if you do this, think about robustness to failures!
If you do create new key-value tuples, it may be wise to have multiple shard members do each put
This way if one fails, someone else will compute the same new tuple and do the same put
Then just have the DHT ignore duplicates (which is the default behavior)
26
Query? Multicast, then reply…
[Figure: a query flowing through the group's shards, with replies returning to the caller]
OrderedSend starts computation at targeted subset of group members
Application initiates request
Legend: aggregation via developer-defined code.
Aggregated result
27
How to access the DHT?
A DHT group member can access a copy of its own portion (“slice”) of the DHT, but not the “true” DHT data structure.
You call DHT<KT,VT>(), and Isis2 computes a clone of the DHT (this is because of concerns about locking). It includes only items matching the given types, but you can specify <Object,Object>.
Then you can use the result in a computation. Example (using LINQ, but you don’t have to use LINQ):
g.DHT<int,int>().GroupBy(kvp => kvp.Value/1000);
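A local stand-in for clone-then-query: a plain Dictionary plays the member's slice, Snapshot copies it under a brief lock, and the LINQ query then runs on the copy without holding any lock. This only illustrates the locking concern the slide mentions; the helper names are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class SliceQuery
{
    // Hypothetical stand-in for DHT<KT,VT>(): a private copy, so the query never locks live data.
    public static List<KeyValuePair<int, int>> Snapshot(Dictionary<int, int> liveSlice)
    {
        lock (liveSlice)
        {
            return liveSlice.ToList();   // copy taken under a brief lock
        }
    }

    // Same shape as the slide's query: bucket by Value / 1000, then count each bucket.
    public static Dictionary<int, int> CountByThousands(Dictionary<int, int> liveSlice)
    {
        return Snapshot(liveSlice)
            .GroupBy(kvp => kvp.Value / 1000)
            .ToDictionary(grp => grp.Key, grp => grp.Count());
    }
}
```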
28
But who calls DHT<KT,VT>()?
You need to send the query to the appropriate group members.
You could use a DHTQueryKey object and specify a list of keys. The system will send to each shard matching those keys.
You have two options: send to all members of each shard, or send to just one selected member of each shard. The insight here is that for a query, any one shard member can compute the result
So: you issue a multicast (OrderedSend) to the shards. Recipients call DHT<KT,VT>(), compute sub-results, and then we combine those subresults
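A sketch of the “one selected member per shard” option. It assumes the count-off layout (the member at position r is in shard r % nShards), integer keys that map to shard key % nShards, and a hypothetical policy of picking the lowest-ranked member of each shard; Isis2 may choose representatives differently.

```csharp
using System;
using System.Collections.Generic;

public static class QueryRouting
{
    // For each shard touched by `keys`, pick one member to run the query (hypothetical policy).
    public static SortedDictionary<int, string> Representatives(
        IList<string> view, IEnumerable<int> keys, int nShards)
    {
        var reps = new SortedDictionary<int, string>();
        foreach (int key in keys)
        {
            int shard = ((key % nShards) + nShards) % nShards;
            if (reps.ContainsKey(shard)) continue;   // this shard already has a representative
            for (int rank = 0; rank < view.Count; rank++)
            {
                // Assumed count-off: member at position rank belongs to shard rank % nShards.
                if (rank % nShards == shard) { reps[shard] = view[rank]; break; }
            }
        }
        return reps;
    }
}
```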
29
Would this be hard to code?
Example of an Isis2 request handler
Notice that this example uses LINQ:
g.DHT<int, string>()                    // The DHT “slice”
    .Where(kvp => kvp.Key % 77 == n)    // A subset…
    .Select(kvp => kvp.Value);          // Make a list of the values
g.Handlers[0] += (Action<int>)delegate(int n)
{
    IsisSystem.WriteLine("Entry to new parallel Select logic; looking for key%77==" + n + ", my portion of the DHT contains " + g.DHT<int, string>().Count() + " Key,Value pairs");
    IEnumerable<string> newList = g.DHT<int, string>().Where(kvp => kvp.Key % 77 == n).Select(kvp => kvp.Value);
    IsisSystem.WriteLine("After parallel Linq operation, my contribution contains " + newList.Count() + " matching tuples");
};
30
How would the caller combine results?
The caller receives a list of replies, one from each participant.
So you can use the C# LINQ query language to combine them, or can just write code that aggregates (combines) the contents of the list
For example, you could form one big merged list, take the maximum, take the average, etc
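A caller-side sketch, assuming the replies have already arrived as one list per participant (for a merge) or one number per participant (for max and average). The names are hypothetical; the point is that ordinary LINQ operators do the combining.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class ReplyCombiner
{
    // One big merged list, keeping the order in which participants replied.
    public static List<string> Merge(List<List<string>> replies)
    {
        return replies.SelectMany(r => r).ToList();
    }

    // Maximum and average over one numeric reply per participant.
    public static int Max(List<int> replies) { return replies.Max(); }
    public static double Average(List<int> replies) { return replies.Average(); }
}
```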
31
Big Query? Use tree aggregation…
[Figure: tree-structured aggregation across the group's shards]
OrderedSend starts computation at targeted subset of group members
Aggregated result
Application initiates scalable aggregation
This is where Isis2 calls an aggregation method that you can define.
32
How tree aggregation works
You need to define a method Aggregate(key, value1, value2) → result.
In practice you also need to specify all the types: this method is actually a “generic”. But that just makes it messy, not hard to do.
Then the system will call this function within the tree.
In practice that means the calls happen in DHT members (your program, your code…) which get asked to help out in this way.
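A single-process sketch of what the tree does with your Aggregate(key, value1, value2) method: contributions are combined pairwise, level by level, until one value remains. In Isis2 the inner nodes would run at different DHT members; here they run in a loop. TreeAggregation.Reduce is a hypothetical name.

```csharp
using System;
using System.Collections.Generic;

public static class TreeAggregation
{
    // Combines leaf contributions pairwise, level by level, as a binary tree would.
    public static TValue Reduce<TKey, TValue>(
        TKey key, IList<TValue> leaves, Func<TKey, TValue, TValue, TValue> aggregate)
    {
        if (leaves.Count == 0) throw new ArgumentException("need at least one contribution");
        List<TValue> level = new List<TValue>(leaves);
        while (level.Count > 1)
        {
            List<TValue> next = new List<TValue>();
            for (int i = 0; i + 1 < level.Count; i += 2)
                next.Add(aggregate(key, level[i], level[i + 1]));   // one inner tree node
            if (level.Count % 2 == 1)
                next.Add(level[level.Count - 1]);                  // odd one out moves up unchanged
            level = next;
        }
        return level[0];
    }
}
```

Because the pairing depends on the tree's shape, the aggregator should be associative and commutative, as max and sum are; an order-sensitive function could give different answers for different trees.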
33
Example of a simple aggregator
You register the code that combines the values.
In Isis2, an Aggregator<KT,VT> is a function that receives three arguments: a key and two values, of the given types, and returns a new value.
This one uses the max of the two arguments:
g.RegisterAggregator<int, myData>((Aggregator<int, myData>)delegate(int qk, myData lv, myData dv)
{
    return new myData(Math.Max(lv.data, dv.data));   // keep the larger of the two values
});
34
The caller does a query, then waits:
Sends a request to handler 0.
Then collects the aggregated result. The wait can fail if the group membership changes while the computation is underway.
g.Send(0, counter);
try
{
    g.GetAggregatorResult<int, myData>(counter);
}
catch (AggregationFailed)
{
    IsisSystem.WriteLine("Aggregation failed for round " + counter);
}
35
A consistency guarantee
Either your aggregated result reflects exactly one contribution from each shard that should contribute…
… or your result might be wrong, and Isis2 throws an AggregationException. Normally you would just reissue the request.
This is an example of planning for failure and treating it as a routine, common problem.
You should expect that these exceptions do happen!
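“Just reissue the request” fits in a small retry helper. In this hypothetical sketch, attemptRound stands in for the Send plus GetAggregatorResult pair from the caller example, and AggregationFailedException is a placeholder for the exception Isis2 actually throws.

```csharp
using System;

// Hypothetical placeholder for the exception raised when an aggregation cannot complete.
public class AggregationFailedException : Exception
{
    public AggregationFailedException(string message) : base(message) { }
}

public static class AggregationRetry
{
    // Reissues the whole request (a fresh round) until it succeeds or attempts run out.
    public static T WithRetry<T>(Func<int, T> attemptRound, int maxAttempts)
    {
        for (int round = 0; ; round++)
        {
            try
            {
                return attemptRound(round);           // e.g. Send(0, round), then wait for the result
            }
            catch (AggregationFailedException)
            {
                if (round + 1 >= maxAttempts) throw;  // give up: surface the failure to the caller
                // Membership changed mid-computation: treat it as routine and try again.
            }
        }
    }
}
```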
36
ID for aggregation…
Isis2 allows you to have multiple such aggregations running at the same time. Each needs an identifier so that it can be distinguished from the others. The example simply used an integer for this.
In the case where we use aggregation in conjunction with the DHT we normally use a DHTKey object as the identifier. It has a list of the keys in it.
Isis2 can then match up the results even if different aggregations hit the same nodes
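A sketch of why the identifier matters: a hypothetical AggregationTable keeps a separate running value per ID, so contributions for two aggregations that reach the same node are never mixed. The ID type is generic, so it could be an integer counter as in the example or an object holding a list of keys.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical: one running partial result per aggregation ID.
public class AggregationTable<TId, TValue>
{
    private readonly Dictionary<TId, TValue> partial = new Dictionary<TId, TValue>();
    private readonly Func<TId, TValue, TValue, TValue> aggregate;

    public AggregationTable(Func<TId, TValue, TValue, TValue> aggregate)
    {
        this.aggregate = aggregate;
    }

    // Fold one contribution into the aggregation named by `id`.
    public void Contribute(TId id, TValue value)
    {
        TValue soFar;
        partial[id] = partial.TryGetValue(id, out soFar) ? aggregate(id, soFar, value) : value;
    }

    public TValue Result(TId id) { return partial[id]; }
}
```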
37
Side remark on Aggregation
In fact, you can use aggregation without using the DHT. Any multicast can be used to initiate an aggregation.
… and you can aggregate again and again, without even using a multicast. The group members just keep reporting on their “value”
… You can even reuse the same aggregation key. This is a nice feature: it lets you continuously track properties of the group using aggregation.
38
Ida can even support full DB transactions
Pre-execute the transaction on a read-only copy. Remember the object versions accessed and the desired updates.
Now execute the transaction as a one-shot transaction:
Phase 1: Distribute provisional updates and check that data versions haven’t changed. Hold a lock.
Phase 2: Apply the updates atomically and release the lock.
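A single-process sketch of the optimistic scheme above, with one version number per key: the pre-execution records the versions it read, phase 1 re-checks them under a lock, and phase 2 applies the writes and bumps the versions before the lock is released. VersionedStore and its methods are hypothetical; in Ida the two phases would span several shards.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical single-process model of version-checked, one-shot transactions.
public class VersionedStore
{
    private readonly Dictionary<string, int> values = new Dictionary<string, int>();
    private readonly Dictionary<string, long> versions = new Dictionary<string, long>();
    private readonly object txLock = new object();

    // Pre-execution read: returns the value and records the version that was seen.
    public int Read(string key, IDictionary<string, long> readSet)
    {
        lock (txLock)
        {
            long version;
            versions.TryGetValue(key, out version);   // missing key => version 0
            readSet[key] = version;
            int value;
            values.TryGetValue(key, out value);
            return value;
        }
    }

    // Phase 1: validate versions while holding the lock. Phase 2: apply writes, then release.
    public bool TryCommit(IDictionary<string, long> readSet, IDictionary<string, int> writes)
    {
        lock (txLock)
        {
            foreach (KeyValuePair<string, long> seen in readSet)
            {
                long current;
                versions.TryGetValue(seen.Key, out current);
                if (current != seen.Value) return false;   // data changed: abort, caller re-executes
            }
            foreach (KeyValuePair<string, int> w in writes)
            {
                values[w.Key] = w.Value;
                long v;
                versions.TryGetValue(w.Key, out v);
                versions[w.Key] = v + 1;                    // new version for every written key
            }
            return true;
        }
    }
}
```

If two transactions read x at the same version, the first commit succeeds and the second is rejected, so its caller re-runs the pre-execution.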
39
… but transactions violate DHT-ness
Even if Ida can support full transactions, users might feel that this is one step too far
Requires multiple phases.
Holds a lock (albeit briefly).
Potential users seem to like multi-tuple transactions, but are uncomfortable with “algorithms” that could issue more than one of them at a time…
40
Ida: Preliminary performance data
With a single key and shard size 2, OrderedPut and OrderedGet match the performance of Put and Get.
With multiple keys per operation (picked to cause many conflicts), performance saturates. Why? It turns out that Ida is I/O bound… UDP and multicast over Infiniband are slow!
Currently porting Isis2 to use the OFED package
41
Ida: Preliminary performance data
This compares two query approaches:
Agg/O-Agg: async multicast to participants via Send/OSend; tree-structured aggregation collects the responses.
Query/OQuery: 1:N responses.
Aggregated ordered query underperforms 1:N query in this example, but would win if the “responses” were large
42
Ida: Preliminary performance data
Locality of costs for OrderedGet touching 3 shards. One member in each shard incurs load (this is good).
A single failure in a group with 128 members at time 45 secs, then a failure of 1/3 of the group members at time 90 secs.
Rapid restoration of service.
When 1/3 of the members fail, we lose about 1/3 of our capacity to handle parallel queries.
43
Summary
Isis2: a powerful new platform for data replication in the cloud. Ida is the Isis2 DHT.
Our question: “what should DHT consistency mean?”
We looked at a range of options: single-tuple updates, multiple tuples, queries, full transactions…
… and Ida does support all of these options…
… but we suspect that the desire for DHT-ness will drive users to use only the simple options.
Isis2.codeplex.com