chris hillman – beyond mapreduce scientific data processing in real-time

Post on 16-Apr-2017

5.752 Views

Category:

Technology

0 Downloads

Preview:

Click to see full reader

TRANSCRIPT

beyond mapreduce scientific data processing in real-time

Chris Hillman October 13th 2015

chillman@dundee.ac.uk

Proteomics

GENOME21,000

PROTEOME1,000,000+

Mass SpectrometryEach Experiment produces 7Gb XML file containing 40,000 scans 600,000,000 data points In approx 100 minutes

Data processing cantake over 24 hours• Pick 2D Peaks• De-isotope• Pick 3D Peaks• Match weights to known

peptides

Mass Spectrometry

New Lab has 12 Machines

That’s a lot of data and a lot of data processing

Parallel Computing

Parallel ProcessingAmdahl’s Law

Serial portion is fixedGustafson’s Law

Size of problem is not fixedGunther’s Law

linear scalability issues

Working Environment

Parallel Algorithm

2D peak picking fits well into a Map task– Read into

memory– Decode base64

float array– Peak pick,

isotopic envelope detection

Parallel Algorithm3D peak picking fits well into a Reduce task– Receive partitions of 2D peaks– Detect 3D peaks– Isotopic envelopes– Output peakMass Intensity

Issues

XML is not a good format for parallel processing

Issues

372 372.2 372.4 372.6 372.8 373 373.2 373.4 373.6 373.8 37438

38.5

39

39.5

40

40.5

41

IssuesData Shuffle and skew on the cluster

51 115 180 244 308 372 436 500 564 628 692 756 820 884 948 1012 1076 1141 1206 1271 1337 1402 1466 1530 15940

20000

40000

60000

80000

100000

120000

Series1

Results

MapReduce

Map

Reduce

Shuffle

MapReduce

Transforming the XML and writing the modified data to• HDFS• Hbase• CassandraExecuting the MapReduce code reading from the above

Batch process shows potential of speeding up the current process by scaling the size of the cluster running it.

Flink

Experiences so farVery easy to installVery easy to understandGood documentationVery easy to adapt current codeI like it!

public class PeakPick extends Configured implements Tool { Job job=new Job(getConf(), "peakpick");job.setJarByClass(PeakPick.class);job.setNumReduceTasks(104);job.setOutputKeyClass(IntWritable.class);job.setOutputValueClass(Text.class);job.setMapperClass(MapHDFS.class);job.setReducerClass(ReduceHDFS.class);job.setInputFormatClass(TextInputFormat.class);job.setOutputFormatClass(TextOutputFormat.class);job.setPartitionerClass(MZPartitioner.class);FileInputFormat.setInputPaths(job, new Path(args[1]));FileOutputFormat.setOutputPath(job, new Path(args[2]));job.waitForCompletion(true);return job.isSuccessful() ? 0 : 1;

} public static void main(String[] args) throws Exception{int res = ToolRunner.run(new Configuration(), new PeakPick(), args);System.exit(res); }

MR Job

MR Readpublic class MapHDFS extends Mapper<LongWritable, Text, IntWritable, Text> { public void map(LongWritable key, Text value, Context context {

String inputLine = value.toString();tempStr = inputLine.split("\t"); scNumber = tempStr[1];……intensityString = tempStr[8]

}

public class MapCassandra extends Mapper<ByteBuffer, SortedMap<ByteBuffer, Cell>, Text, IntWritable> { public void map(ByteBuffer key, SortedMap<ByteBuffer, Cell> columns, Context context) {

scNumber = String.valueOf(key.getInt());for (Cell cell : columns.values()){String name = ByteBufferUtil.string(cell.name().toByteBuffer());if (name.contains("scan")) scNumber String.valueOf(ByteBufferUtil.toInt(cell.value()));if (name.contains("mslvl")) scLevel = String.valueOf(ByteBufferUtil.toInt(cell.value()));if (name.contains("rettime")) RT = String.valueOf(ByteBufferUtil.toDouble(cell.value()));}

MR Writepublic class MapHDFS extends Mapper<LongWritable, Text, IntWritable, Text> { public void map(LongWritable key, Text value, Context context {

…………for (int i=0; i<outputPoints.size(); i++){mzStringOut = scNumber + "\t" + scLevel + "\t" + RT + "\t" + Integer.toString(outputPoints.get(i).getCurveID()) + "\t" +Double.toString(outputPoints.get(i).getWpm()) context.write(new IntWritable(outputPoints.get(i).getKey()), peakOut);

}

public class ReduceHDFS extends Reducer<IntWritable, Text, IntWritable, Text> { public void reduce(IntWritable key, Iterable<Text> values, Context context){

…………for (int k = 0; k<MonoISO.size();k++){outText = MonoISO.get(k).getCharge() + "\t" +MonoISO.get(k).getWpm() + "\t" +MonoISO.get(k).getSumI() + "\t" +MonoISO.get(k).getWpRT();context.write(new IntWritable(0), new Text(outText));

}

Flink Job

public class PeakPickFlink_MR {final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); Job job = Job.getInstance();

//Setup input format HadoopInputFormat<LongWritable, Text> hadoopIF = new HadoopInputFormat<LongWritable, Text>( new TextInputFormat(), LongWritable.class, Text.class, job ); TextInputFormat.addInputPath(job, new Path(args[0]));

//Read HDFS Data DataSet<Tuple2<LongWritable, Text>> text = env.createInput(hadoopIF);

Flink Job

// use Hadoop Mapper as MapFunction DataSet<Tuple2<IntWritable, Text>> result = text .flatMap(new HadoopMapFunction<LongWritable, Text, IntWritable, Text>( new MapHDFS() )) .groupBy(0)

// use Hadoop Reducer .reduceGroup(new HadoopReduceFunction<IntWritable, Text, IntWritable, Text>(new ReduceHDFS() ));

Flink Job// Set up the Hadoop TextOutputFormat.

HadoopOutputFormat<IntWritable, Text> hadoopOF = new HadoopOutputFormat<IntWritable, Text>( new TextOutputFormat<IntWritable, Text>(), job );

//Write results back to HDFShadoopOF.getConfiguration().set("mapreduce.output.textoutputformat.separator", " "); TextOutputFormat.setOutputPath(job, new Path(args[1]));

// Emit data using the Hadoop TextOutputFormat. result.output(hadoopOF).setParallelism(1);

// Execute Code env.execute("Hadoop PeakPick"); }}

Interim Results

Mapper onlyHadoop 12m25sFlink 4m50s

Mapper and ReducerHadoop 28m32sFlink 10m20s

Existing code 35m22s

Near real-time?Still not fast enough to be called near real-time

Processing x scans per second, if the cluster was big enough then maybe…

But… the mass spectrometer takes 100 minutes to complete its processing for one experiment so in fact we have more than enough time to process the data if we are streaming the results and processing the data as it is produced…

Streaming the data

Simulate streaming data using an existing data file and kafka

Ingest the data using Flink Streaming API and process the scans using the existing mapper code

Existing Data File Kafka Flink Streaming API

Streaming the results

A peptide elutes over a period of time, this means the data from many scans needs to be compared at the same time.

A safe window to measure the quantity of a peptide is 10 seconds

10 seconds

Interim Results

Overlapping 10 second windows captures 3D peaks from the 2D scans

Interim ResultsProcessing the entire scan in the 10 second window means that we don’t need the overlapping window and the de-duplication step

All this means that the data will be fully pre-processed just over 10 seconds after the mass spectrometer completes the experiment.

Near real-time?

Stream Processing

Complete stable working system Contrast with Spark and Storm

Hookup previous research on database lookup to create a complete system

Pay for some EC2 system time to complete testing

Write a thesis…

To Do list

Questions?

chillman@dundee.ac.uk@chillax7

http://www.thequeensarmskensington.co.uk

top related