Use of Spark for Proteomic Scoring (Seattle presentation)
TRANSCRIPT
Use of Spark for Proteomic Scoring
Steven M. Lewis, PhD
Institute for Systems Biology
EMBL, Uninett
http://tinyurl.com/qgtzhkw
Abstract
Tandem mass spectrometry has proven to be a powerful tool for proteomic analysis. A critical step is scoring a measured spectrum against an existing database of peptides and potential modifications. The details of proteomic search are discussed. Such analyses strain the resources of existing machines and are limited in the number of modifications that can be considered. Apache Spark is a powerful tool for parallelizing applications. We have developed a version of Comet, a high precision scoring algorithm, and implemented it on a Spark cluster. The cluster outperforms single machines by a factor of greater than ten, allowing searches which take 8 hours to be performed in under 30 minutes. Equally important, search speed scales with the number of cores, allowing further speedups or increases in the number of modifications by adding more computing power. The considerations required to run large jobs in parallel will be discussed.
This is a war story
It describes a large problem
The approaches to parallelize it
The problems encountered
The tools developed to solve them
How did I get into this?
A few years ago I developed a Hadoop application to do protein search
It was a good, reasonably big problem. We published a paper
I got a note from Gurvinder Singh at Uninett, a Norwegian cloud provider, asking if I was interested in implementing what I did in Spark
Consider a Protein
MTRRSRVGAGLAAIVLALAAVSAAAPIAGAQSAGSGAVSVTIGDVDVSPANPTTGTQVLITPSINNSGSASGSARVNEVTLRGDGLLATEDSLGRLGAGDSIEVPLSSTFTEPGDHQLSVHVRGLNPDGSVFYVQRSVYVTVDDRTSDVGVSARTTATNGSTDIQATITQYGTIPIKSGELQVVSDGRIVERAPVANVSESDSANVTFDGASIPSGELVIRGEYTLDDEHSTHTTNTTLTYQPQRSADVALTGVEASGGGTTYTISGDAANLGSADAASVRVNAVGDGLSANGGYFVGKIETSEFATFDMTVQADSAVDEIPITVNYSADGQRYSDVVTVDVSGASSGSATSPERAPGQQQKRAPSPSNGASGGGLPLFKIGGAVAVIAIVVVVVRRWRNP
It is a string of amino acids (of 20 kinds), each designated by one letter
Digestion
● Trypsin breaks proteins after arginine (R) or lysine (K), except when followed by proline (P)
MTRSVGAGLAAIVLALAAVSAARPIARGAQSAGSGAVSVKTIGDVDVSPANPTTGTQVL
Cleaves to:
MTR
SVGAGLAAIVLALAAVSAARPIAR
GAQSAGSGAVSVK
TIGDVDVSPANPTTGTQVL
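The cleavage rule above can be sketched in plain Java. This is a minimal illustration, not the project's actual digestion code; the class and method names are invented for this example.

```java
import java.util.ArrayList;
import java.util.List;

public class TrypsinDigest {
    // Cleave after R or K, except when the next residue is P.
    public static List<String> digest(String protein) {
        List<String> peptides = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        for (int i = 0; i < protein.length(); i++) {
            char aa = protein.charAt(i);
            current.append(aa);
            boolean cleavageSite = (aa == 'R' || aa == 'K')
                    && (i + 1 >= protein.length() || protein.charAt(i + 1) != 'P');
            if (cleavageSite) {
                peptides.add(current.toString());
                current.setLength(0);
            }
        }
        if (current.length() > 0) {
            peptides.add(current.toString());   // trailing peptide with no cleavage site
        }
        return peptides;
    }

    public static void main(String[] args) {
        // The example sequence from the slide
        for (String p : digest("MTRSVGAGLAAIVLALAAVSAARPIARGAQSAGSGAVSVKTIGDVDVSPANPTTGTQVL")) {
            System.out.println(p);
        }
    }
}
```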
Tandem Mass Spec Proteomics
Proteins are digested into peptides (fragments)
Run through a column to separate them and analyzed in a mass spectrometer to yield a spectrum
A database of known proteins is searched for the best match
Basics of Tandem Mass Spectrometry
http://en.wikipedia.org/wiki/Tandem_mass_spectrometry
Measured Spectrum
From Kinter and Sherman
Proteomic Search
So you went into the lab
Prepared a sample
Ran it through a tandem mass spec
Collected thousands of spectra
Now we need to search a database of proteins to find matches
Protein Database
● Search starts with a list of proteins
○ Read from Uniprot
○ Parsed from a known genome
○ Supplied by a researcher
● Protein databases for humans are around 20 million amino acids
● For search you add the same number of decoy (false) proteins
● Multi-organism databases may run to 500 MB
Moral: databases are fairly big
Protein Database Fasta File
>sp|Q58D72|ATLA1_BOVIN Atlastin-1 OS=Bos taurus GN=ATL1 PE=2 SV=2
MAKNRRDRNSWGGFSEKTYEWSSEEEEPVKKAGPVQVLVVKDDHSFELDETALNRILLSEAVRDKEVVAVSVAGAFRKGKSFLMDFMLRYMYNQESVDWVGDHNEPLTGFSWRGGSERETTGIQIWSEIFLINKPDGKKVAVLLMDTQGTFDSQSTLRDSATVFALSTMISSIQVYNLSQNVQEDDLQHLQLFTEYGRLAMEETFLKPFQSLIFLVRDWSFPYEFSYGSDGGS
>sp|Q58D72_REVERSED|ATLA1_BOVIN Atlastin-1 OS=Bos taurus GN=ATL1 PE=2 SV=2-REVERSED
MKKKESQETSESKPAPFAQHYLHRHTAAASYLKYLAENTSGQDWLAAAVQDIVAGLERYEGSYRIYAWTCLTILTLGMIMNCLSAIIDLGIFGTVGAIVYTIFIVVFLTAPTRAAHFINKSDNHKIYQIYLEDIETELQQLYRRSFEEGGMKKVGRFLKVSEEKLELHKTQLDNPALFPKDGGCIEEMKKNYTDKATAVAALNNAEATAQLMSKPHPLEEGQYIKIYAKFYEVLGRCTIKN
>tr|Q58D73|Q58D73_BOVIN Chromosome 20 open reading frame 29 OS=Bos taurus GN=C20orf29 PE=2 SV=1
MVHAFLIHTLRAAKAEEGLCRVLYSCFFGAENSPNDSQPHSAERDRLLRKEQILAVARQVESMYQLQQQACGRHAVDLQPQSSDDPVALHEAPCGAFRLAPGDPFQEPRTVVWLGVLSIGFALVLDTHENLLLVESTLRLLARLLLDHLRLLVPGGANLLLRADCIEGILTRFLPHGQLLFLNDQFVQGLEKEFSAAWSH
>tr|Q58D73_REVERSED|Q58D73_BOVIN Chromosome 20 open reading frame 29 OS=Bos taurus GN=C20orf29 PE=2 SV=1-REVERSED
… And so on for the next 20-500 MB
Protein Database
● Starting with a database
● These are digested in silico to produce peptides
● Modifications may be added to produce a list of peptides to search
● Every potential modification roughly doubles the search space
IAM[15.995]S[79.966]GS[79.966]S[79.966]SAIYVR RGNTVLKDLK IEFLNEAS[79.966]VMK1360.63272 TVRAKQPSEK ...
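The claim that every optional modification roughly doubles the search space can be illustrated with a sketch that enumerates modified peptide variants. The class and method are invented for this example; it applies one optional mass shift per matching residue, written in the bracket notation used in the samples above (e.g. S[79.966]).

```java
import java.util.ArrayList;
import java.util.List;

public class ModificationExpansion {
    // Enumerate all variants of a peptide where each residue matching
    // 'target' may optionally carry a fixed mass shift. Every modifiable
    // site doubles the number of candidate peptides to score.
    public static List<String> expand(String peptide, char target, double massShift) {
        List<String> variants = new ArrayList<>();
        variants.add("");
        for (char aa : peptide.toCharArray()) {
            List<String> next = new ArrayList<>();
            for (String prefix : variants) {
                next.add(prefix + aa);                              // unmodified residue
                if (aa == target) {
                    next.add(prefix + aa + "[" + massShift + "]");  // modified residue
                }
            }
            variants = next;
        }
        return variants;
    }

    public static void main(String[] args) {
        // Two serines -> 2^2 = 4 candidate peptides to score
        System.out.println(expand("ASGS", 'S', 79.966));
    }
}
```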
In Silico Digestion
MTRRSRVGAGLAAIVLALAAVSAAAPIAGAQSAGSGAVSVTIGDVDVSPANPTTGTQVLITPSINNSGSASGSARVNEVTLRGDGLLATEDSLGRLGAGDSIEVPLSSTFTEPGDHQLSVHVRGLNPDGSVFYVQRSVYVTVDDRTSDVGVSARTTATNGSTDIQATITQYGTIPIKSGELQVVSDGRIVERAPVANVSESDSANVTFDGASIPSGELVIRGEYTLDDEHSTHTTNTTLTYQPQRSADVALTGVEASGGGTTYTISGDAANLGSADAASVRVNAVGDGLSANGGYFVGKIETSEFATFDMTVQADSAVDEIPITVNYSADGQRYSDVVTVDVSGASSGSATSPERAPGQQQKRAPSPSNGASGGGLPLFKIGGAVAVIAIVVVVVRRWRNP
Consider a Protein
Digestion
● Trypsin breaks proteins after arginine (R) or lysine (K), except when followed by proline (P)
MTRSVGAGLAAIVLALAAVSAARPIARGAQSAGSGAVSVKTIGDVDVSPANPTTGTQVL
Cleaves to:
MTR
SVGAGLAAIVLALAAVSAARPIAR
GAQSAGSGAVSVK
TIGDVDVSPANPTTGTQVL
Well … Almost
● Sometimes cleavages are missed
● Sometimes breaks occur in other places
● Some amino acids are modified chemically
● Samples may be labeled with isotopes to distinguish before and after proteins
All these changes can push the number of scored peptides from hundreds of thousands to tens of millions or more
Finding Fragments
● http://db.systemsbiology.net:8080/proteomicsToolkit/FragIonServlet.html
[Table: b-ion and y-ion fragment ladders for the peptide LGAGDSIEVP]
Theoretical and Measured Spectra
[Figure: theoretical vs measured spectrum, B ions and Y ions]
Cross Correlationmeasured=215.36
….
measured=310.17
measured=312.76
measured=312.76 theory=312.18
measured=319.31
measured=344.22
…
measured=354.19 theory=356.17
measured=355.16 theory=356.17
measured=356.08 theory=356.17
measured=355.16
measured=356.08
…
measured=431.21
measured=442.03
measured=442.03 theory=440.24
measured=443.43
…
measured=942.79 theory=944.5
measured=943.55 theory=944.5
measured=943.55
measured=948.6
Score is a weighted sum of matching peaks in the correlation
● Scoring is done against all peptides with a similar MZ to the measured spectrum
● The output is the best scoring peptide and a few of the runners-up
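The matching step behind this score, summing contributions from measured peaks that fall within a tolerance of a theoretical peak, as in the cross-correlation listing above, can be sketched as follows. This is a simplified stand-in with invented names, not the actual Comet XCorr computation, which works on binned, normalized spectra.

```java
public class PeakMatchScore {
    // Simplified match score: for each theoretical m/z, add the intensity of
    // any measured peak within the tolerance. Real Comet scoring uses a
    // cross-correlation over preprocessed spectra; this only illustrates
    // the peak-matching idea.
    public static double score(double[] measuredMz, double[] measuredIntensity,
                               double[] theoreticalMz, double tolerance) {
        double total = 0.0;
        for (double theory : theoreticalMz) {
            for (int i = 0; i < measuredMz.length; i++) {
                if (Math.abs(measuredMz[i] - theory) <= tolerance) {
                    total += measuredIntensity[i];   // matched peak contributes
                }
            }
        }
        return total;
    }

    public static void main(String[] args) {
        double[] mz = {312.76, 356.08, 442.03};          // measured peaks
        double[] intensity = {1.0, 2.0, 3.0};
        double[] theory = {312.18, 356.17, 440.24};      // theoretical peaks
        System.out.println(score(mz, intensity, theory, 0.6));   // prints 3.0
    }
}
```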
NOTE: In a typical experiment only 15-25% of spectra will be identified with a peptide in the database. These are used to identify proteins.
Why is this a Big Data Problem
The human body has about 20K proteins
Usually for quality control there is a 'Decoy' for every protein
There are optional modifications which increase peptides by a factor of 2
A smaller sample will have about 50 M peptides; 900 M with a larger database and more modifications
A large run is about 100 K spectra
The search space is proportional to peptides * spectra
Demonstration
spark-submit --class com.lordjoe.distributed.hydra.comet_spark.SparkCometScanScorer ~/SteveSpark.jar ~/SparkClusterEupaG.properties input_searchGUI.xml
Viewer: http://hwlogin.labs.uninett.no:4040/
Political Concerns
To sell the answer to biologists we must copy a well-known algorithm. This means translating the code from C++ to Java and accepting the algorithm's data structures and memory requirements.
Binning
50,000 spectra * 2,000,000,000 peptides is a VERY large number
Fortunately, not all pairs have to be scored:
Spectra are measured with a precursor mass and peptides have a computed mass; only peptides and spectra in a specific mass range (bin) need be compared
On modern high precision instruments the bin is about 0.03 Dalton
This reduces the number of pairs to score to about 2000 million; on a small sample we score 128 million pairs at about 500 microseconds per scoring
Binning
Bins put all peptides and spectra within a specific MZ range into groups
Spectra are put in several bins
Bins can be subdivided for scoring
Bins hold N spectra and K peptides
Currently there are tens of thousands of bins
Scoring fails in larger bins due to excess GC time
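A minimal sketch of how a mass might map to a bin key, with invented names; the project's actual BinChargeKey also carries charge and is more involved. A peptide's computed mass lands in exactly one bin, while a measured spectrum, whose precursor mass carries measurement error, is also placed in the neighboring bins so candidates near a bin boundary are not missed.

```java
import java.util.List;

public class MassBinning {
    static final double BIN_WIDTH = 0.03;   // Daltons, per the high-precision case

    // A peptide's computed mass maps to exactly one bin.
    public static long peptideBin(double mass) {
        return (long) Math.floor(mass / BIN_WIDTH);
    }

    // A measured spectrum goes in its own bin plus the adjacent ones,
    // so peptides near a bin edge are still compared against it.
    public static List<Long> spectrumBins(double precursorMass) {
        long center = peptideBin(precursorMass);
        return List.of(center - 1, center, center + 1);
    }

    public static void main(String[] args) {
        System.out.println(peptideBin(1360.63272));     // 45354
        System.out.println(spectrumBins(1360.63272));   // [45353, 45354, 45355]
    }
}
```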
Spark Operations
[Diagram: Spark operations in the pipeline - Hadoop Input, FlatMap, PairFlatMap, CoGroup, Sort]
Debugging and Performance
This involves taking an unfamiliar problem running on an unfamiliar platform
Questions:
Which operations are taking most time?
How many times is each function called?
Are functions balanced across machines on the cluster?
When a small number of cases fail, how can you instrument them?
Did it work the first time?
Hell No
After it stopped crashing and did well on a trivial problem, a base sample took 30 hours to run on the cluster - way longer than on a single machine!!!
Issues:
- data not like familiar test data
- Hadoop input format bug
Spark Accumulators
Accumulators are like counters but much more powerful.
Accumulators can track any object supporting add and zero methods
The code is here
Sample Code to Accumulate a Set of Strings
public class SetAccumulableParam implements AccumulatorParam<Set<String>>, Serializable {
    public Set<String> addAccumulator(final Set<String> r, final Set<String> t) {
        HashSet<String> ret = new HashSet<String>(r);
        ret.addAll(t);
        return ret;
    }
    public Set<String> addInPlace(final Set<String> r1, final Set<String> r2) {
        return addAccumulator(r1, r2);
    }
    public Set<String> zero(final Set<String> initialValue) {
        return initialValue;
    }
}
Sample Accumulator Use
JavaSparkContext ctx = new JavaSparkContext(sparkConf);
// make an accumulator
final Accumulator<Set<String>> wordsUsed =
        ctx.accumulator(new HashSet<String>(), new SetAccumulableParam());
JavaRDD<String> lines = ctx.textFile(args[0]);   // read lines
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
    public Iterable<String> call(String s) {
        List<String> stringList = Arrays.asList(s.split(" "));
        wordsUsed.add(new HashSet<String>(stringList));   // accumulate words
        return stringList;
    }
});
… Finish word count
Function Accumulators
Functions extend AbstractFunctionBase
All reporting code is in the base class
Functions implement doCall, not call
Calls are wrapped for timing and statistics
Data Gathered
total calls
total time
times executed on each MAC address
Sample Instrumented Function
public static class ChooseBestScanScore
        extends AbstractLoggingFunction2<IScoredScan, IScoredScan, IScoredScan> {
    @Override
    public IScoredScan doCall(final IScoredScan v1, final IScoredScan v2) throws Exception {
        ISpectralMatch match1 = v1.getBestMatch();
        ISpectralMatch match2 = v2.getBestMatch();
        return (match1.getHyperScore() > match2.getHyperScore()) ? v1 : v2;
    }
}
CombineCometScoringResults totalCalls:69M totalTime:29.05 sec machines:15 variance 0.058
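The wrapping scheme, where functions implement doCall and the base class's call gathers timing and statistics, can be sketched as a plain-Java base class. This illustrates the pattern only; the names are invented, and the real AbstractFunctionBase also records the machine's MAC address and publishes the totals through accumulators.

```java
import java.util.concurrent.atomic.AtomicLong;

public abstract class TimedFunction<T, R> {
    private final AtomicLong totalCalls = new AtomicLong();
    private final AtomicLong totalNanos = new AtomicLong();

    // Subclasses implement doCall, not call.
    protected abstract R doCall(T input);

    // call wraps doCall, counting invocations and accumulating wall time.
    public final R call(T input) {
        long start = System.nanoTime();
        try {
            return doCall(input);
        } finally {
            totalCalls.incrementAndGet();
            totalNanos.addAndGet(System.nanoTime() - start);
        }
    }

    public long getTotalCalls() { return totalCalls.get(); }
    public double getTotalSeconds() { return totalNanos.get() / 1e9; }

    public static void main(String[] args) {
        TimedFunction<Integer, Integer> inc = new TimedFunction<Integer, Integer>() {
            protected Integer doCall(Integer x) { return x + 1; }
        };
        inc.call(5);
        System.out.println(inc.getTotalCalls() + " call(s), " + inc.getTotalSeconds() + " s");
    }
}
```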
Running Job
Improving Performance
Fix bugs in the Hadoop format for large files
Find that most time is spent in scoring
Use a Parquet database to store the digestion
Discover that repartitioning is cheaper than expensive operations
Smart partitioning to balance work in partitions
Use more partitions for larger jobs
Smart Partitioning
A bin is a set of spectra and peptides that score together
Bin sizes vary by orders of magnitude
Scoring puts pressure on memory
Bin sizes can be counted before the scoring step
Partitioning puts larger bins in separate partitions and puts multiple smaller bins in the same partition
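The partitioning strategy described above amounts to greedy packing of bins by their counted sizes. A sketch in plain Java, with invented names, not the project's actual partitioner: large bins each get their own partition, and small bins are packed together until a partition fills up.

```java
import java.util.HashMap;
import java.util.Map;

public class SmartPartitioning {
    // Given bin sizes counted before the scoring step, give every bin at
    // least as large as 'threshold' its own partition, and pack smaller
    // bins together greedily until a shared partition reaches the threshold.
    public static Map<String, Integer> assignPartitions(Map<String, Long> binSizes,
                                                        long threshold) {
        Map<String, Integer> assignment = new HashMap<>();
        int nextPartition = 0;
        long currentLoad = 0;
        int sharedPartition = -1;
        for (Map.Entry<String, Long> e : binSizes.entrySet()) {
            if (e.getValue() >= threshold) {
                assignment.put(e.getKey(), nextPartition++);   // large bin: own partition
            } else {
                if (sharedPartition < 0 || currentLoad + e.getValue() > threshold) {
                    sharedPartition = nextPartition++;         // start a new shared partition
                    currentLoad = 0;
                }
                assignment.put(e.getKey(), sharedPartition);
                currentLoad += e.getValue();
            }
        }
        return assignment;
    }

    public static void main(String[] args) {
        Map<String, Long> sizes = new java.util.LinkedHashMap<>();
        sizes.put("bigBin", 1_000_000L);
        sizes.put("smallBin1", 500L);
        sizes.put("smallBin2", 800L);
        System.out.println(assignPartitions(sizes, 10_000L));
    }
}
```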
Performance
● A larger test took 4 hours on a single machine
● On a small 15 node cluster it took
○ 69 minutes real time
○ used 41 hours of CPU time
○ scored 2100 million peptides
○ generated 605 million peptides
○ with 4 potential modifications
○ 95% of the time we find the same top peptides as Comet
Summary
Proteomic Search is a large data problem involving scoring a large number of spectra against an even larger number of candidate peptides.
In the future the complexity will increase, with more spectra and more modifications adding more peptides
Spark is a parallel execution environment allowing search to be performed on a cluster
Performance is superior to existing tools and can be improved by increasing the size of the cluster
Code Part 1
// Read spectra
RDD<IMeasuredSpectrum> spectraToScore =
        SparkScanScorer.getMeasuredSpectra(scoringApplication);
// Condition spectra
RDD<CometScoredScan> cometSpectraToScore =
        spectraToScore.map(new MapToCometSpectrum(comet));
// Assign bins to spectra
PairRDD<BinChargeKey, CometScoredScan> keyedSpectra =
        handler.mapMeasuredSpectrumToKeys(cometSpectraToScore);
// Read proteins
RDD<IProtein> proteins = readProteins(jctx);
// Digest to peptides
RDD<IPolypeptide> digested = proteins.flatMap(new DigestProteinFunction(app));
// Map to bins
PairRDD<BinChargeKey, IPolypeptide> keyedPeptides =
        digested.flatMapToPair(new mapPolypeptidesToBin(application, usedBins));
Code Part 2
// Now collect the contents of spectra and peptide bins
PairRDD<BinChargeKey, Tuple2<Iterable<CometScoredScan>, Iterable<HashMap<String, IPolypeptide>>>> binContents =
        keyedSpectra.cogroup(keyedPeptides);
// Do scoring
RDD<IScoredScan> scores =
        binContents.flatMap(new ScoreSpectrumAndPeptideWithCogroup(application));
// Combine spectrum scoring
RDD<IScoredScan> cometBestScores = handler.combineScanScores(scores);
// Write results as a single file
consolidator.writeScores(cometBestScores);
Proteomic Search PseudoCode
RDD<Spectrum> spectra = readSpectra();     // mydata.mzXML
RDD<Proteins> proteins = readDatabase();   // uniprot_swiss.fasta
RDD<Peptides> peptides = digest(proteins);
THESE ARE UNUSED SLIDES
DON’T GO HERE
Consider a Protein - a collection of Amino Acids
MTRRSRVGAGLAAIVLALAAVSAAAPIAGAQSAGSGAVSVTIGDVDVSPANPTTGTQVLITPSINNSGSASGSARVNEVTLRGDGLLATEDSLGRLGAGDSIEVPLSSTFTEPGDHQLSVHVRGLNPDGSVFYVQRSVYVTVDDRTSDVGVSARTTATNGSTDIQATITQYGTIPIKSGELQVVSDGRIVERAPVANVSESDSANVTFDGASIPSGELVIRGEYTLDDEHSTHTTNTTLTYQPQRSADVALTGVEASGGGTTYTISGDAANLGSADAASVRVNAVGDGLSANGGYFVGKIETSEFATFDMTVQADSAVDEIPITVNYSADGQRYSDVVTVDVSGASSGSATSPERAPGQQQKRAPSPSNGASGGGLPLFKIGGAVAVIAIVVVVVRRWRNP
[Diagram: pipeline stages - Protein Database → Digest; Measured Spectra → Normalize; Add Modifications; MZ Bin (fragments in one bin); MZ Bin (spectra put in multiple bins); Cross Product → Score all pairs. Corresponding Spark operations: Hadoop Input, Map, FlatMap, PairFlatMap, Filter (and write), Sort]
What is Spark
Spark is a framework for parallel execution
Spark works well on Hadoop clusters (also has a local mode for testing)
Spark is less formal than Map-Reduce and multiple operations can run locally
[Diagram: Protein Database → Digest; Measured Spectra → Normalize, Add Modifications; MZ Bin (fragments in one bin); MZ Bin (spectra put in multiple bins); Cross Product → Score all pairs → Sort by Spectra → Report Best Fits]
All operations are on a 15 node Spark cluster and are performed in parallel with lazy execution
Most time is spent in the Score All Pairs step
Multi Stage Mass Spec
From Kinter and Sherman
A Protein is a Collection of Amino Acids
● Each of the 20 amino acids is indicated by a letter
● Assume we have a sample with a number of proteins
● Assume that we can list the possible proteins in the sample
● Tandem Mass Spectrometry is similar to shotgun genomics