mongodb hadoop connector whitepaper-2

20

Upload: riteshaladdin

Post on 12-Jul-2016

227 views

Category:

Documents


0 download

DESCRIPTION

Mongodb Hadoop Connector Whitepaper-2

TRANSCRIPT

Page 1: Mongodb Hadoop Connector Whitepaper-2
Page 2: Mongodb Hadoop Connector Whitepaper-2

2ww.ikanow.com

OverviewHere at IKANOW, we are big fans of MongoDB, for reasons we’ve written about before: it has decent performance, scales sensibly, and above all its schema-less JSON approach makes it a joy to develop with, and ensures easy integration with a host of other modern technology platforms.

One area where MongoDB is less well developed than many NoSQL alternatives is in its support for Hadoop. This is not surprising since technologies like HBase and Accumolo are in fact built on top of the hdfs/Hadoop stack.

It should be noted that the combination of MongoDB’s query language and JSON integration already enables many of the functions Hadoop is used for, like data transformation. The latest MongoDB release also includes an aggregation framework that is comparable to Hive, and will be sufficient for many simpler analytics (here is a nice example of a typical use case).

However, especially given the complexity of documents MongoDB is capable of storing, there is much scope for analysis more complex than can be expressed in either the aggregation framework or MongoDB’s built-in map/reduce framework (at least maintainably). Examples of such analytics that we have developed for the semi-structured sources we typically store include:

• Clustering documents based on the entities (eg proper nouns) they contain.• Graph analysis for recommending new sales leads based on their links with existing leads• (or equivalently for detecting potential fraud networks in business!)• Generating risk scores for public profiles in social networks.

1

Page 3: Mongodb Hadoop Connector Whitepaper-2

For these more sophisticated algorithms, Hadoop is our (and many others’) preferred platform:

• It has a huge community support, including integration with powerful analytic libraries and languages like R and Mahout.

• It scales well.• It’s primary language is Java, which sits in a nice intersection of decent developer base,

performance, maintainability, and ease-of-use.

It is beyond the scope of this post to describe Hadoop’s goals and architecture in detail (there is no shortage of material!); in essence it has three themes:

• distributed (and redundant) storage• distributing processing providing both scalability as the amount of required processing

increases (eg because of increasing data volumes) and also...• data locality, where the data is processed on one of the nodes on which the data resides,

minimizing I/O. (data locality becomes less important the more processor intensive theapplication)

Outline Of this pOstThe remainder of this post (aka the technical aka the fun bits!) takes a brief theoretical and empirical look at how MongoDB’s replica/sharding/indexing architecture fits with Hadoop, how the current connector works, and possible improvements to what is currently publicly available with MongoDB. Specifically we will look at:

• Analyzing the Default Mongo-Hadoop Connector• Data Locality• Run Times Based on Data Quantity

2

Page 4: Mongodb Hadoop Connector Whitepaper-2

• Run Time Improvement for Smaller Queries• A More General Solution• HDFS• Query-Specific Splits• Converting Queries to Updates• Conclusion

AnAlyzing the defAult MOngO-hAdOOp cOnnectOr10gen publish a MongoDB-Hadoop connector on github. It is described here, but basically it spawns one mapper/split per user-specified number of chunks (already created from its sharding management). This is illustrated below.

The connector allows the submitter to specify a query that is applied by each mapper to the chunks it is assigned (also shown in the diagram below). This is very useful for a number of applications:

• Providing a security layer.• Basic selection of the analysis dataset, eg by date/source.• More sophisticated slicing of the data, eg “all documents within X km of this location”,

“referencing this person”, combinations thereof, etc.

The figure on the next page shows clearly how the processing is distributed.

3

Page 5: Mongodb Hadoop Connector Whitepaper-2

dAtA lOcAlityWhat about data locality? Well, the existing code makes no attempt to identify or take advantage of any overlap between Hadoop nodes and MongoDB nodes, but we’re more interested by what can be easily added, than by what is currently present.

It should be first noted that the MongoDB and Hadoop clusters are logically complely separate, each server can belong to either or both clusters as illustrated below.

4

Page 6: Mongodb Hadoop Connector Whitepaper-2

(Co-locating MongoDB and Hadoop nodes can often make sense since Hadoop jobs typically don’t use much memory and MongoDB is normally I/O bound not CPU bound, particularly on multi-core servers).

It is important to observe that whenever the data is not evenly distributed across the mapper nodes (and this will often be the case when a user query is specified), then there is a case to process data non-locally rather than wait for local resources to become available (unless the cluster is loaded with many jobs, making it likely that the non-local nodes would be used by other jobs anyway). However for clusters where there are more Hadoop than MongoDB nodes (eg due to API/application servers), then once the data-local nodes are full, splits can be allocated immediately to the “no data nodes”.

So, where desired, adding data locality is straightforward: the “mongos” in the first figure knows about all the shards, and the splitter knows about all the mapper nodes, so ensuring a user-configurable number of splits are assigned to co-located mapper nodes first will be straightforward. Replica sets can be taken into account in a similar way.

5

Page 7: Mongodb Hadoop Connector Whitepaper-2

run tiMes BAsed On dAtA QuAntity

Now let’s look at how the existing code performs when the query returns only a subset of the collection.The table below shows how the run times are affected by reducing the amount of data processed, using a simple indexed query (5 cores across 3 lightly used servers with plenty of available I/O):

Although the CPU time spent in mappers scales well, the overall time does not change much, because the same number of splits are being generated each time. This is illustrated by the figure below.

6

Page 8: Mongodb Hadoop Connector Whitepaper-2

The red bars in the chunks indicate blocks of documents that match the query (in practice it will be even more distributed), and the white mappers indicate ones that have been passed no data (in practice even many of the mappers that are passed data are very lightly utilized, indicated in the diagram by a lighter shade of blue). The overhead from all those empty/mostly empty splits prevents the overall time from scaling.

Our testing also uncovered another important issue: although our query was indexed, it did not include the collection’s shard key (which happens to be “_id” for us). As a result, although an initial query from the “InputFormat” to the “mongos” would be fast, the mapper components combine the original query with the shard key (“_id”) range query. As a result each of the mappers will either perform the original query on the entire “_id” range, or the range query on the entire dataset matching the original query. This is illustrated in the diagram below, and discussed more later on in the post.

7

Page 9: Mongodb Hadoop Connector Whitepaper-2

A siMple iMprOveMent fOr sMAller QueriesFor our the application we were building, each individual map/reduce job acts on a small number of documents, approximately 50K on average. We investigated the performance of “skip()” (which presumably just steps through the “_id” index on the server, ie is very fast but scales “badly” -linearly- with the skip offset). Running MongoDB under a moderate load on an Amazon EC2 m1.xlarge instance with a 4x RAID-0 array, we got the following results:

Obviously these numbers will vary from instance to instance (eg under less load, during my original experiments 6 months ago, the 500K skip was only 2s), but the “shape” will stay the same.

Note that although “skip()” is linear, the overall performance will in fact be O(N squared), because the number of mappers will increase linearly with the number of documents, and each mapper will have linear performance because of the “skip()” function.

Based on this we developed a very simple extension to handle tasks within our application’s typical performance envelope:

8

Page 10: Mongodb Hadoop Connector Whitepaper-2

Our extensions to MongoInputFormat are released as open source under the Apache License and are available here. The configuration requirements are described in a post here.

As can be seen in both the diagram and the table, fewer mappers are generated and they are all fully utilized. Note one other difference compared to default connector, the mappers have to go through mongos components because chunk/shard boundaries are no longer respected. This is assumed to degrade performance but not by much. (2 “mongos” are shown because we use a fairly standard Hadoop cluster configuration of 2 mappers per dual-core CPU). One easy improvement (that has not yet been necessary for us) would be to treat each shard individually, with each mapper getting assigned a shard as well as a skip/limit pair.

9

Page 11: Mongodb Hadoop Connector Whitepaper-2

Currently this approach is also slightly more sensitive to “bad” queries: with the default approach, because of the “_id” range, a bad query traverses the database at worst once (leaving aside swap issues from the chunks not being traversed in order necessarily). With this approach a bad query is made once per split. To productionize this code vs arbitrary queries, some query analysis logic (eg with “explain()”) should be used (again, backing out to the default where advisable).

A MOre generAl sOlutiOnAlthough this extension solved our short term needs, there are clearly some issues with this simple approach: eg because of its use of the linear “skip()” function, resulting in “O(N squared)” performance, it does not scale well. Even with the multi-shard improvement mentioned above, there will be a significant envelope where the query filters enough documents for the default connector to be ineffective, but where enough documents are returned by the filter for the extension to perform badly.

Eg if each shard contains 20M documents, then any query that returns <5M documents per shard but >250K is likely to fall in between the 2 existing methods. The actual envelope will of course vary from case to case.

What we would ideally like is an approach that fits in between the 2 existing connectors (and can support data locality). The remainder of this section describes what this might look like.

We briefly consider 3 alternatives:

1. Dump matching documents to HDFS and then use Hadoop/HDFS as normal, ie not using any MongoDB specific connector at all.

2. Use a compound of query key with the shard key, and create custom “query-specific” splits.

10

Page 12: Mongodb Hadoop Connector Whitepaper-2

1

3. Turn the query into an “update” on a sparsely indexed field, and then optimize the connector to use this field.

hdfsSince the focus of this post is on manipulating indexes in MongoDB, we’re not going to spend much time on this, even though (disappointingly!) it might be the most promising type of approach.

It does have a few downsides - eg although it scales nicely, the amount of up-front I/O required for larger queries will be expensive. I did a quick test using “mongodump” with a large query (~40% of the DB), and it was at 0% after 6 minutes with 1.1GB written into one big file. This suggests at least that a custom export client would be needed. (“mongoexport” took 2.5 minutes to get to 1.1GB. In another experiment, restricting “mongoexport” to output only the “_id” field, it took 10 minutes to output 200K records.)

One potential upside to not using HDFS is that the Java Security Manager is incompatible with Hadoop, and there are no plans to fix it; but it looks from the discussion like HDFS is the main impediment. Being able to enforce security restrictions in Hadoop JARs better than Hadoop does would be very powerful for multi-user platforms like Infinit.e that support plugins.

“Query-specific” splits We saw that one big issue with the default connector was that it generated the splits independently of the query. The obvious way to solve this is to generate query-specific splits! The other issue was that the user query needed to be combined with the shard key index - fortunately MongoDB supports exactly this sort of compound index. Even better, the compound index will solve most of both issues. Note that our large

11

Page 13: Mongodb Hadoop Connector Whitepaper-2

collections happen to sharded by “_id” so we’ll proceed on that assumption; the following logic will hold for any shard key that is “reasonably unique”. The case where the shard key has a small number of distinct values is outside the scope of this post.

One initial worry I had was that a compound index would take up significantly more space, but that actually doesn’t seem to be an issue. For example, considering a single shard with 10M documents each of ~20KB in size, with a query on an indexed field with 140K distinct values:

Compare the last 2, which are identical apart from the secondary “_id” term - they are not very different in size. In fact both on the replicas of this shard, and also on a completely different cluster, with a shard of ~3M documents and only ~500 unique “sourceKey” values, the compound index is in fact slightly smaller. (Presumably the overhead caused by “munging” the two keys together is somewhat offset by the fact there can be only 1 document per combined index). Note just looking at the sizes doesn’t necessarily give the whole picture when it comes to query efficiency. That sort of thing is difficult to quantify; however empirically I haven’t noticed any difference between the two.

12

Page 14: Mongodb Hadoop Connector Whitepaper-2

ww.ikanow.co

Once the compound index is in place, the next step is for the InputFormat component to generate the mappers/splits. Assume the user specifies a maximum number of documents D per mapper instance, and that the query returns N documents. Then we want to generate “ceiling(N/D)” mappers, with each one specified by an “_id” range. This is illustrated by the following diagram.

The process of building query-specific “_id” ranges is actually surprisingly difficult because of a MongoDB issue described here and in this JIRA issue (but basically: you can’t sort on the secondary index, eg “_id”, if the query on the primary index, eg “field”, involves a range; this will normally be the case for these sorts of queries).

You can sort on the compound index though, so an approximate version can be calculated with a single pass through the data by encoding logic in a map-reduce (or in a an eval or forEach() block, it’s not clear what the fastest would be):

13

Page 15: Mongodb Hadoop Connector Whitepaper-2

• While the “field” index key remains the same, the “_id”s are ordered, so you can create “_id” ranges that contain (eg) 1/10th of the desired size.

• Each time the “field” index changes, the next “_id” can potentially be out-of-order so you need to locate its position in the array of ranges. Then increment the number of documents in the corresponding “_id” range.

• In the client, step once through the resulting array, merging _”id” ranges until they contain close to the desired number of documents.

I didn’t have time to implement this approach, so I knocked up a dummy algorithm that doesn’t do anything functionally, but should take a similar amount of time.

The above “dummy-but-representative” code took 1 minute to run on a query returning 4.5M documents (out of 10M), and should scale approximately linearly (actually O(N*log(N)); of course there will be some hard cap on the number of documents per shard anyway, probably before the log(N) term becomes significant).

Once the above unpleasantness has been overcome, it is straightforward to spawn the required number of mappers and pass each one an “_id” range to go with the original query. This is illustrated below:

14

Page 16: Mongodb Hadoop Connector Whitepaper-2

Here only the required number of mappers are generated, and each mapper makes an indexed query to its shard. Because each mapper is assigned an “_id” range that is within a single shard, data locality can be easily enforced if desired.

15

Page 17: Mongodb Hadoop Connector Whitepaper-2

cOnverting the “Queries” tO “updAtes”This approach is actually very similar to the “query-specific” splits described above. We saw two unpleasant issues with that approach:

• The additional logic that was required to handle queries involving ranges on the primary indexed field.

• The fact that every query would need to contain the type of compound index described above.

A candidate solution to this is to transform the user query into a single platform-level key (that is then the only thing that needs to have the compound index with the shard key). For example:

(Unfortunately the sparseness of the index does nothing because the index is compound, see this issue. At least it can plausibly be hoped that the “mrSelect==null” portion of the index should mostly stay swapped out and not be too expensive).

Then all the subsequent processing occurs just as in the previous example, except that the “user query” is replaced with “{ mrSelect: JOBID }” everywhere. Once the Hadoop job is finished, the array element can be removed.

16

Page 18: Mongodb Hadoop Connector Whitepaper-2

One advantage of this approach might no longer be an issue with the new MongoDB full text query (which I haven’t had a chance to play with yet): it makes it very easy to integrate the processing with text scanning Lucene-based solutions: from within eg elasticsearch (our favored platform) scroll through the matching “_id”s in blocks, for each one call “update” with “{ ‘$in’: ID_ARRAY }” as the query, and then proceed as before.

Unfortunately on a fairly heavily loaded database shard, the above update operation (on the query from the previous section, selecting 4.4M out of 10M documents) had still not completed after 20 minutes - this suggests at they very least that it is not a robust solution. In fact I then turned everything using that database off, and it still had not completed after 10 minutes. The IO (via “iostat”) showed as almost exclusively reads, “mongostat” showed no activity, and “top” showed lots of IOWAIT (100% of 1 core) but a small amount of CPU activity (<10%) shared between “mongod” and “kswapd” - so it’s not very clear what was happening.

(My main worry had been that adding ObjectIds to the document object would require that they all get relocated on the file, my hope was that there was enough padding to prevent that from being necessary for most documents).

One interesting hybrid idea would be to create files (eg on HDFS) compactly containing the “_id”s generated from the query, and then have the mappers atomically read/delete them in batches of (say) 1000 and perform “{ ‘_id’: { ‘$in’: [ ID_ARRAY ] } }” queries until there are no more left. This would remove the overhead of using the database for a relatively simple data exchange. And this could also decouple the “query scrolling” from the mappers (by having a separate server do the scrolling), reducing latency for large query responses.

cOnclusiOnsThis post performed some preliminary analysis on the MongoDB-Hadoop connector, described how we solved one performance issue, and speculated on related performance issues and approaches to solve them.A very quick summary looks something like:

17

Page 19: Mongodb Hadoop Connector Whitepaper-2

There is clear evidence that the default approach can be improved upon when the data to be processed is sub-selected by a query. There is some investigation to be done into some of the unexpected results encountered during the experiments performed for this blog post, but regardless the “query-specific indexes” approach seems like a promising avenue for further exploration.

It will likely be the case that different approaches are optimal for different algorithms, and a recommendation for 10gen, if they wanted to optimize the MongoDB-Hadoop experience, would be to put in place a better framework in place to help the community discover others’ extensions and/or publish their own.

There was also a description of how any of the above approaches could better support replicas and shards. This is not very complex to add to any of approaches discussed.

18

Page 20: Mongodb Hadoop Connector Whitepaper-2

19