Chris Hillman – Beyond MapReduce: Scientific Data Processing in Real-time

Chris Hillman, October 13th 2015, [email protected]

Upload: flink-forward


TRANSCRIPT

Page 1

Beyond MapReduce: Scientific Data Processing in Real-time

Chris Hillman October 13th 2015

[email protected]

Page 2

Proteomics

Genome: 21,000

Proteome: 1,000,000+

Page 3

Mass Spectrometry

Each experiment produces a 7 GB XML file containing 40,000 scans and 600,000,000 data points, in approximately 100 minutes.

Data processing can take over 24 hours:
• Pick 2D peaks
• De-isotope
• Pick 3D peaks
• Match weights to known peptides

Page 4

Mass Spectrometry

New Lab has 12 Machines

That’s a lot of data and a lot of data processing

Page 5

Parallel Computing

Page 6

Parallel Processing

Amdahl's Law – the serial portion is fixed
Gustafson's Law – the size of the problem is not fixed
Gunther's Law – linear scalability issues
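The three laws above can be put side by side numerically. A minimal sketch, with illustrative parameter values that are not taken from the talk (the class and method names are hypothetical; Gunther's Universal Scalability Law adds a contention term a and a coherency term b):

```java
// Illustrative comparison of the three scalability laws named above.
public class ScalingLaws {

    // Amdahl: speedup on n processors with a fixed serial fraction s.
    static double amdahl(double s, int n) {
        return 1.0 / (s + (1.0 - s) / n);
    }

    // Gustafson: scaled speedup when the problem size grows with n.
    static double gustafson(double s, int n) {
        return n - s * (n - 1);
    }

    // Gunther's Universal Scalability Law: contention a, coherency b.
    static double gunther(double a, double b, int n) {
        return n / (1.0 + a * (n - 1) + b * n * (n - 1));
    }

    public static void main(String[] args) {
        // With a 10% serial fraction, 16 workers give far less than 16x.
        System.out.printf("Amdahl(0.1, 16)    = %.2f%n", amdahl(0.1, 16));
        System.out.printf("Gustafson(0.1, 16) = %.2f%n", gustafson(0.1, 16));
        System.out.printf("Gunther            = %.2f%n", gunther(0.03, 0.0001, 16));
    }
}
```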

Page 7

Working Environment

Page 8

Parallel Algorithm

2D peak picking fits well into a Map task:
– Read into memory
– Decode base64 float array
– Peak pick, isotopic envelope detection
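The "decode base64 float array" step can be sketched as follows. This is a minimal illustration assuming little-endian 32-bit floats; real mzML-style files declare the precision and byte order explicitly, and the class name is hypothetical:

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Base64;

// Sketch of decoding a base64-encoded binary float array, as found in
// XML mass-spectrometry scan data. Assumes little-endian 32-bit floats.
public class Base64Floats {

    static float[] decode(String b64) {
        byte[] raw = Base64.getDecoder().decode(b64);
        ByteBuffer buf = ByteBuffer.wrap(raw).order(ByteOrder.LITTLE_ENDIAN);
        float[] out = new float[raw.length / 4];
        for (int i = 0; i < out.length; i++) {
            out[i] = buf.getFloat(); // sequential read of each 4-byte float
        }
        return out;
    }

    public static void main(String[] args) {
        // Round-trip three example m/z values through base64.
        ByteBuffer buf = ByteBuffer.allocate(12).order(ByteOrder.LITTLE_ENDIAN);
        buf.putFloat(372.2f).putFloat(373.1f).putFloat(374.0f);
        String b64 = Base64.getEncoder().encodeToString(buf.array());
        for (float f : decode(b64)) System.out.println(f);
    }
}
```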

Page 9

Parallel Algorithm

3D peak picking fits well into a Reduce task:
– Receive partitions of 2D peaks
– Detect 3D peaks
– Isotopic envelopes
– Output peak mass and intensity

Page 10

Issues

XML is not a good format for parallel processing

Page 11

Issues

[Chart: data points plotted over the m/z range 372–374, values 38–41]

Page 12

Issues

Data shuffle and skew on the cluster

[Chart: number of data points per m/z partition (51–1594), heavily skewed, ranging from 0 to over 100,000]

Page 13

Results

Page 14

MapReduce

Map → Shuffle → Reduce

Page 15

MapReduce

Transforming the XML and writing the modified data to:
• HDFS
• HBase
• Cassandra
Then executing the MapReduce code reading from the above.

The batch process shows the potential to speed up the current process by scaling the size of the cluster running it.

Page 16

Flink

Experiences so far:
• Very easy to install
• Very easy to understand
• Good documentation
• Very easy to adapt current code
• I like it!

Page 17

public class PeakPick extends Configured implements Tool {

    public int run(String[] args) throws Exception {
        Job job = new Job(getConf(), "peakpick");
        job.setJarByClass(PeakPick.class);
        job.setNumReduceTasks(104);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);
        job.setMapperClass(MapHDFS.class);
        job.setReducerClass(ReduceHDFS.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setPartitionerClass(MZPartitioner.class);
        FileInputFormat.setInputPaths(job, new Path(args[1]));
        FileOutputFormat.setOutputPath(job, new Path(args[2]));
        job.waitForCompletion(true);
        return job.isSuccessful() ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new PeakPick(), args);
        System.exit(res);
    }
}

MR Job

Page 18

MR Read

public class MapHDFS extends Mapper<LongWritable, Text, IntWritable, Text> {
    public void map(LongWritable key, Text value, Context context) {
        String inputLine = value.toString();
        tempStr = inputLine.split("\t");
        scNumber = tempStr[1];
        ……
        intensityString = tempStr[8];
    }
}

public class MapCassandra extends Mapper<ByteBuffer, SortedMap<ByteBuffer, Cell>, Text, IntWritable> {
    public void map(ByteBuffer key, SortedMap<ByteBuffer, Cell> columns, Context context) {
        scNumber = String.valueOf(key.getInt());
        for (Cell cell : columns.values()) {
            String name = ByteBufferUtil.string(cell.name().toByteBuffer());
            if (name.contains("scan")) scNumber = String.valueOf(ByteBufferUtil.toInt(cell.value()));
            if (name.contains("mslvl")) scLevel = String.valueOf(ByteBufferUtil.toInt(cell.value()));
            if (name.contains("rettime")) RT = String.valueOf(ByteBufferUtil.toDouble(cell.value()));
        }
    }
}

Page 19

MR Write

public class MapHDFS extends Mapper<LongWritable, Text, IntWritable, Text> {
    public void map(LongWritable key, Text value, Context context) {
        …………
        for (int i = 0; i < outputPoints.size(); i++) {
            mzStringOut = scNumber + "\t" + scLevel + "\t" + RT + "\t"
                    + Integer.toString(outputPoints.get(i).getCurveID()) + "\t"
                    + Double.toString(outputPoints.get(i).getWpm());
            context.write(new IntWritable(outputPoints.get(i).getKey()), new Text(mzStringOut));
        }
    }
}

public class ReduceHDFS extends Reducer<IntWritable, Text, IntWritable, Text> {
    public void reduce(IntWritable key, Iterable<Text> values, Context context) {
        …………
        for (int k = 0; k < MonoISO.size(); k++) {
            outText = MonoISO.get(k).getCharge() + "\t"
                    + MonoISO.get(k).getWpm() + "\t"
                    + MonoISO.get(k).getSumI() + "\t"
                    + MonoISO.get(k).getWpRT();
            context.write(new IntWritable(0), new Text(outText));
        }
    }
}

Page 20

Flink Job

public class PeakPickFlink_MR {
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        Job job = Job.getInstance();

        // Set up the Hadoop input format
        HadoopInputFormat<LongWritable, Text> hadoopIF =
                new HadoopInputFormat<LongWritable, Text>(new TextInputFormat(), LongWritable.class, Text.class, job);
        TextInputFormat.addInputPath(job, new Path(args[0]));

        // Read HDFS data
        DataSet<Tuple2<LongWritable, Text>> text = env.createInput(hadoopIF);

Page 21

Flink Job

// Use the Hadoop Mapper as a Flink MapFunction
DataSet<Tuple2<IntWritable, Text>> result = text
        .flatMap(new HadoopMapFunction<LongWritable, Text, IntWritable, Text>(new MapHDFS()))
        .groupBy(0)
        // Use the Hadoop Reducer
        .reduceGroup(new HadoopReduceFunction<IntWritable, Text, IntWritable, Text>(new ReduceHDFS()));

Page 22

Flink Job

// Set up the Hadoop TextOutputFormat
HadoopOutputFormat<IntWritable, Text> hadoopOF =
        new HadoopOutputFormat<IntWritable, Text>(new TextOutputFormat<IntWritable, Text>(), job);

// Write results back to HDFS
hadoopOF.getConfiguration().set("mapreduce.output.textoutputformat.separator", " ");
TextOutputFormat.setOutputPath(job, new Path(args[1]));

// Emit data using the Hadoop TextOutputFormat
result.output(hadoopOF).setParallelism(1);

        // Execute the job
        env.execute("Hadoop PeakPick");
    }
}

Page 23

Interim Results

Mapper only: Hadoop 12m25s, Flink 4m50s

Mapper and Reducer: Hadoop 28m32s, Flink 10m20s

Existing code: 35m22s

Page 24

Near real-time?

Still not fast enough to be called near real-time.

Processing x scans per second: if the cluster were big enough, then maybe…

But… the mass spectrometer takes 100 minutes to complete one experiment, so in fact we have more than enough time to process the data, if we stream the results and process the data as it is produced…
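A back-of-envelope check of this argument, using the numbers quoted earlier in the talk (40,000 scans and 600,000,000 points in roughly 100 minutes; the class name is hypothetical):

```java
// Sustained rates the streaming pipeline would actually have to keep up
// with, given the instrument's stated output over a 100-minute run.
public class Throughput {

    static double scansPerSecond() {
        return 40_000.0 / (100 * 60); // roughly 6.7 scans/sec
    }

    static double pointsPerSecond() {
        return 600_000_000.0 / (100 * 60); // 100,000 points/sec
    }

    public static void main(String[] args) {
        System.out.printf("%.1f scans/sec, %.0f points/sec%n",
                scansPerSecond(), pointsPerSecond());
    }
}
```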

Page 25

Streaming the data

Simulate streaming data using an existing data file and Kafka

Ingest the data using Flink Streaming API and process the scans using the existing mapper code

Existing Data File → Kafka → Flink Streaming API
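The file-replay part of this simulation can be sketched as follows. This is a hypothetical stand-in: the real setup publishes each scan line to a Kafka topic and consumes it with the Flink Streaming API, whereas this sketch just paces lines out to any consumer:

```java
import java.util.List;
import java.util.function.Consumer;

// Replays pre-recorded scan lines to a sink at a fixed interval,
// mimicking the instrument's output rate for testing a streaming job.
public class ScanReplayer {

    static void replay(List<String> scanLines, long intervalMillis,
                       Consumer<String> sink) throws InterruptedException {
        for (String line : scanLines) {
            sink.accept(line);            // in the real pipeline: send to Kafka
            Thread.sleep(intervalMillis); // pace output like the instrument
        }
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> scans = List.of("scan1\t372.2\t100", "scan2\t373.1\t80");
        replay(scans, 10, System.out::println);
    }
}
```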

Page 26

Streaming the results

A peptide elutes over a period of time, which means the data from many scans needs to be compared at the same time.

A safe window to measure the quantity of a peptide is 10 seconds
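The 10-second window assignment can be sketched as a simple bucketing of scans by retention time. This is a hypothetical illustration of the idea, not the Flink windowing code used in the talk:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Assigns scans to tumbling windows by retention time, so all scans of
// an eluting peptide land in the same group for 3D peak detection.
public class ScanWindows {

    static Map<Long, List<Double>> window(double[] retentionTimes, double sizeSec) {
        Map<Long, List<Double>> groups = new TreeMap<>();
        for (double rt : retentionTimes) {
            long w = (long) (rt / sizeSec); // window index for this scan
            groups.computeIfAbsent(w, k -> new ArrayList<>()).add(rt);
        }
        return groups;
    }

    public static void main(String[] args) {
        double[] rts = {1.2, 4.8, 9.9, 10.1, 15.0, 21.3};
        // Scans at 1.2, 4.8, 9.9 share window 0; 10.1 and 15.0 share window 1.
        System.out.println(window(rts, 10.0));
    }
}
```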


Page 27

Interim Results

Overlapping 10-second windows capture 3D peaks from the 2D scans.

Page 28

Interim Results

Processing the entire scan in the 10-second window means that we don't need the overlapping window and the de-duplication step.

Page 29

All this means that the data will be fully pre-processed just over 10 seconds after the mass spectrometer completes the experiment.

Near real-time?

Stream Processing

Page 30

Complete a stable working system
Contrast with Spark and Storm

Hook up previous research on database lookup to create a complete system

Pay for some EC2 system time to complete testing

Write a thesis…

To Do list

Page 31

Questions?

[email protected]@chillax7

http://www.thequeensarmskensington.co.uk