cassandra summit 2014: reading cassandra sstables directly for offline data analysis
DESCRIPTION
Presenter: Ben Vanberg, Senior Software Engineer at FullContact Here at FullContact we have lots and lots of contact data. In particular we have more than a billion profiles over which we would like to perform ad hoc data analysis. Much of this data resides in Cassandra, and we have many analytics MapReduce jobs that require us to iterate across terabytes of Cassandra data. To solve this problem we've implemented our own splittable input format which allows us to quickly process large SSTables for downstream analytics.TRANSCRIPT
![Page 1: Cassandra Summit 2014: Reading Cassandra SSTables Directly for Offline Data Analysis](https://reader034.vdocuments.us/reader034/viewer/2022042714/54c3858d4a7959fd7a8b457c/html5/thumbnails/1.jpg)
#CassandraSummit 2014
A Journey
● Solving a problem for a specific use case
● Implementation
● Example Code
![Page 2: Cassandra Summit 2014: Reading Cassandra SSTables Directly for Offline Data Analysis](https://reader034.vdocuments.us/reader034/viewer/2022042714/54c3858d4a7959fd7a8b457c/html5/thumbnails/2.jpg)
#CassandraSummit 2014
Person API
![Page 3: Cassandra Summit 2014: Reading Cassandra SSTables Directly for Offline Data Analysis](https://reader034.vdocuments.us/reader034/viewer/2022042714/54c3858d4a7959fd7a8b457c/html5/thumbnails/3.jpg)
#CassandraSummit 2014
Goal: Analytics on Cassandra Data
● How many profile types?
● How many profiles have social data and what type? (facebook, twitter, etc)
● How many total social profiles of each type?
● Whatever!
![Page 4: Cassandra Summit 2014: Reading Cassandra SSTables Directly for Offline Data Analysis](https://reader034.vdocuments.us/reader034/viewer/2022042714/54c3858d4a7959fd7a8b457c/html5/thumbnails/4.jpg)
#CassandraSummit 2014
Key Factors
● Netflix Priam for Backups (Snapshots, Compressed)
● Size-Tiered Compaction (SSTables 200 GB+)
● Compression enabled (SnappyCompressor)
● AWS
![Page 5: Cassandra Summit 2014: Reading Cassandra SSTables Directly for Offline Data Analysis](https://reader034.vdocuments.us/reader034/viewer/2022042714/54c3858d4a7959fd7a8b457c/html5/thumbnails/5.jpg)
#CassandraSummit 2014
Where we started
![Page 6: Cassandra Summit 2014: Reading Cassandra SSTables Directly for Offline Data Analysis](https://reader034.vdocuments.us/reader034/viewer/2022042714/54c3858d4a7959fd7a8b457c/html5/thumbnails/6.jpg)
#CassandraSummit 2014
Limiting Factors
● 3-10 days total processing time
● $2700+ in AWS resources
● Ad-Hoc analytics (not really!)
● Engineering time!
![Page 7: Cassandra Summit 2014: Reading Cassandra SSTables Directly for Offline Data Analysis](https://reader034.vdocuments.us/reader034/viewer/2022042714/54c3858d4a7959fd7a8b457c/html5/thumbnails/7.jpg)
#CassandraSummit 2014
Moving Forward
● Querying Cassandra directly didn’t scale for MapReduce.
● Cassandra SSTables. Could we consume them directly?
● SSTables would need to be directly available (HDFS).
● SSTables would need to be available as MapReduce input.
● Did something already exist to do this?
![Page 8: Cassandra Summit 2014: Reading Cassandra SSTables Directly for Offline Data Analysis](https://reader034.vdocuments.us/reader034/viewer/2022042714/54c3858d4a7959fd7a8b457c/html5/thumbnails/8.jpg)
#CassandraSummit 2014
Netflix Aegisthus
● We already use Netflix Priam for Cassandra backups
● Aegisthus works great for the Netflix use case: (C* 1.0, No compression)
● At the time there was an experimental C* 1.2 branch.
● Aegisthus splits only when compression is not enabled.
● Single thread processing 200 GB+ SSTables.
![Page 9: Cassandra Summit 2014: Reading Cassandra SSTables Directly for Offline Data Analysis](https://reader034.vdocuments.us/reader034/viewer/2022042714/54c3858d4a7959fd7a8b457c/html5/thumbnails/9.jpg)
#CassandraSummit 2014
KassandraMRHelper
● Support for C* 1.2!
● We got the job done with KassandraMRHelper
● Copies SSTables to local file system in order to leverage existing C* I/O
libraries.
● InputFormat not splittable.
● Single thread processing 200 GB+ SSTables.
● 60+ hours to process
![Page 10: Cassandra Summit 2014: Reading Cassandra SSTables Directly for Offline Data Analysis](https://reader034.vdocuments.us/reader034/viewer/2022042714/54c3858d4a7959fd7a8b457c/html5/thumbnails/10.jpg)
#CassandraSummit 2014
Existing Solutions
![Page 11: Cassandra Summit 2014: Reading Cassandra SSTables Directly for Offline Data Analysis](https://reader034.vdocuments.us/reader034/viewer/2022042714/54c3858d4a7959fd7a8b457c/html5/thumbnails/11.jpg)
#CassandraSummit 2014
Implementing a Splittable InputFormat
● We needed splittable SSTables to make this work.
● With compression enabled this is more difficult.
● Cassandra I/O code makes the compression seamless but doesn’t support
HDFS.
● Need a way to define the splits.
![Page 12: Cassandra Summit 2014: Reading Cassandra SSTables Directly for Offline Data Analysis](https://reader034.vdocuments.us/reader034/viewer/2022042714/54c3858d4a7959fd7a8b457c/html5/thumbnails/12.jpg)
#CassandraSummit 2014
Our Approach
● Leverage the SSTable metadata.
● Adapt Cassandra I/O libraries to HDFS.
● Leverage the SSTable Index to define splits. IndexIndex!
● Implement an InputFormat which leverages the IndexIndex to define splits.
● Similar to Hadoop LZO implementation.
![Page 13: Cassandra Summit 2014: Reading Cassandra SSTables Directly for Offline Data Analysis](https://reader034.vdocuments.us/reader034/viewer/2022042714/54c3858d4a7959fd7a8b457c/html5/thumbnails/13.jpg)
#CassandraSummit 2014
Cassandra SSTables
Data file: This file contains the actual SSTable data. A binary format of key/
value row data.
Index file: This file contains an index into the data file for each row key.
CompressionInfo file: This file contains an index into the data file for each
compressed block. This file is available when compression has been enabled
for a Cassandra column family.
![Page 14: Cassandra Summit 2014: Reading Cassandra SSTables Directly for Offline Data Analysis](https://reader034.vdocuments.us/reader034/viewer/2022042714/54c3858d4a7959fd7a8b457c/html5/thumbnails/14.jpg)
#CassandraSummit 2014
Cassandra I/O for HDFS
● Cassandra’s I/O allows for random access of the SSTable.
● Porting this code to HDFS allowed us to read the SSTable in the same
fashion directly within MapReduce.
![Page 15: Cassandra Summit 2014: Reading Cassandra SSTables Directly for Offline Data Analysis](https://reader034.vdocuments.us/reader034/viewer/2022042714/54c3858d4a7959fd7a8b457c/html5/thumbnails/15.jpg)
#CassandraSummit 2014
The IndexIndex
![Page 16: Cassandra Summit 2014: Reading Cassandra SSTables Directly for Offline Data Analysis](https://reader034.vdocuments.us/reader034/viewer/2022042714/54c3858d4a7959fd7a8b457c/html5/thumbnails/16.jpg)
#CassandraSummit 2014
Original Solution
![Page 17: Cassandra Summit 2014: Reading Cassandra SSTables Directly for Offline Data Analysis](https://reader034.vdocuments.us/reader034/viewer/2022042714/54c3858d4a7959fd7a8b457c/html5/thumbnails/17.jpg)
#CassandraSummit 2014
Final Solution
![Page 18: Cassandra Summit 2014: Reading Cassandra SSTables Directly for Offline Data Analysis](https://reader034.vdocuments.us/reader034/viewer/2022042714/54c3858d4a7959fd7a8b457c/html5/thumbnails/18.jpg)
#CassandraSummit 2014
Results
Reading via live queries to Cassandra 3-10 days $2700+
Unsplittable SSTable input format 60 hours $350+
Splittable SSTable input format 10 hours $165+
![Page 19: Cassandra Summit 2014: Reading Cassandra SSTables Directly for Offline Data Analysis](https://reader034.vdocuments.us/reader034/viewer/2022042714/54c3858d4a7959fd7a8b457c/html5/thumbnails/19.jpg)
#CassandraSummit 2014
Example
![Page 20: Cassandra Summit 2014: Reading Cassandra SSTables Directly for Offline Data Analysis](https://reader034.vdocuments.us/reader034/viewer/2022042714/54c3858d4a7959fd7a8b457c/html5/thumbnails/20.jpg)
#CassandraSummit 2014
Mapper
AbstractType keyType =
CompositeType.getInstance(Lists.<AbstractType<?>>newArrayList(UTF8Type.instance,
UTF8Type.instance));
protected void map(ByteBuffer key, SSTableIdentityIterator value, Context context)
throws IOException, InterruptedException {
final ByteBuffer newBuffer = key.slice();
final Text mapKey = new Text(keyType.getString(newBuffer));
Text mapValue = jsonColumnParser.getJson(value, context);
if (mapValue == null) {
return;
}
context.write(mapKey, mapValue);
}
![Page 21: Cassandra Summit 2014: Reading Cassandra SSTables Directly for Offline Data Analysis](https://reader034.vdocuments.us/reader034/viewer/2022042714/54c3858d4a7959fd7a8b457c/html5/thumbnails/21.jpg)
#CassandraSummit 2014
Reducer
protected void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
// Make things super simple and output the first value only.
// In reality we'd want to figure out which was the
// most correct value of the ones we have based on our C* cluster
configuration.
context.write(key, new Text(values.iterator().next().toString()));
}
![Page 22: Cassandra Summit 2014: Reading Cassandra SSTables Directly for Offline Data Analysis](https://reader034.vdocuments.us/reader034/viewer/2022042714/54c3858d4a7959fd7a8b457c/html5/thumbnails/22.jpg)
#CassandraSummit 2014
Job Configuration
job.setMapperClass(SimpleExampleMapper.class);
job.setReducerClass(SimpleExampleReducer.class);
...
job.setInputFormatClass(SSTableRowInputFormat.class);
...
SSTableInputFormat.addInputPaths(job, inputPaths);
...
FileOutputFormat.setOutputPath(job, new Path(outputPath));
![Page 23: Cassandra Summit 2014: Reading Cassandra SSTables Directly for Offline Data Analysis](https://reader034.vdocuments.us/reader034/viewer/2022042714/54c3858d4a7959fd7a8b457c/html5/thumbnails/23.jpg)
#CassandraSummit 2014
Running the indexer
hadoop jar hadoop-‐sstable-‐0.1.2.jar
com.fullcontact.sstable.index.SSTableIndexIndexer [SSTABLE_ROOT]
![Page 24: Cassandra Summit 2014: Reading Cassandra SSTables Directly for Offline Data Analysis](https://reader034.vdocuments.us/reader034/viewer/2022042714/54c3858d4a7959fd7a8b457c/html5/thumbnails/24.jpg)
#CassandraSummit 2014
Running the job
hadoop jar hadoop-‐sstable-‐0.1.2.jar com.fullcontact.sstable.example.SimpleExample
\
-‐D hadoop.sstable.cql="CREATE TABLE ..." \
-‐D mapred.map.tasks.speculative.execution=false \
-‐D mapred.job.reuse.jvm.num.tasks=1 \
-‐D io.sort.mb=1000 \
-‐D io.sort.factor=100 \
-‐D mapred.reduce.tasks=512 \
-‐D hadoop.sstable.split.mb=1024 \
-‐D mapred.child.java.opts="-‐Xmx2G -‐XX:MaxPermSize=256m" [SSTABLE_ROOT]
[OUTPUT_PATH]
![Page 25: Cassandra Summit 2014: Reading Cassandra SSTables Directly for Offline Data Analysis](https://reader034.vdocuments.us/reader034/viewer/2022042714/54c3858d4a7959fd7a8b457c/html5/thumbnails/25.jpg)
#CassandraSummit 2014
Example Summary
1. Write SSTable reader MapReduce jobs
2. Run the SSTable Indexer
3. Run SSTable reader MapReduce jobs
![Page 26: Cassandra Summit 2014: Reading Cassandra SSTables Directly for Offline Data Analysis](https://reader034.vdocuments.us/reader034/viewer/2022042714/54c3858d4a7959fd7a8b457c/html5/thumbnails/26.jpg)
#CassandraSummit 2014
Goal Accomplished
● 96% decrease in processing times!
● 94% decrease in resource costs!
● Reduced Engineering time!
![Page 27: Cassandra Summit 2014: Reading Cassandra SSTables Directly for Offline Data Analysis](https://reader034.vdocuments.us/reader034/viewer/2022042714/54c3858d4a7959fd7a8b457c/html5/thumbnails/27.jpg)
#CassandraSummit 2014
Open Source Project
Open Source @ https://github.com/fullcontact/hadoop-sstable
Roadmap:
● Cassandra 2.1 support
● Scalding support