large scale data-parsing with hadoop in bioinformatics

Large-scale data parsing and algorithm development with Hadoop / MapReduce Ntino Krampis Cloud Computing Workshop 28 October 2010 J. Craig Venter Institute

Upload: ntino-krampis

Post on 11-May-2015


Page 1: Large scale data-parsing with Hadoop in Bioinformatics

Large-scale data parsing and algorithm development with Hadoop / MapReduce

Ntino Krampis

Cloud Computing Workshop

28 October 2010

J. Craig Venter Institute

Page 2: Large scale data-parsing with Hadoop in Bioinformatics

Uniref_ID   Uniref_Cluster
B1JTL4      A0K3H0
A0Q8P9      A0Q8P9
A2VU91      A0K3H0
A7ZA84      A7ZA84
A0RAB9      A0RAB9
A7JF80      A0Q8P9
A7GLP0      A7GLP0
B4ARM5      A0Q8P9
A0K3H0      A0K3H0
A9VGI8      A9VGI8
A0KAJ8      Q1BTJ3
A1BI83      A1BI83
Q1BRP4      A0K3H0

Canonical example : finding the members of Uniref100 clusters

● 30GB of data, ~ 12 million rows

● remember, this is a “small” example dataset

● your typical server has 32GB of memory + 16 cores

● what approach would you take to find the cluster members ?

Page 3: Large scale data-parsing with Hadoop in Bioinformatics


Key         Value
A0K3H0  →   ( B1JTL4, A2VU91, A0K3H0, ... )
A0Q8P9  →   ( A0Q8P9, A7JF80, B4ARM5 )
A7ZA84  →   ( A7ZA84 )

Traditional approach one : hashing

● Key : Uniref_Cluster ID

● Value : array of cluster member Uniref IDs

● add a new Key, or append the member Uniref ID to the Value if the Key already exists

● how big a hash can you fit in 32GB of memory ?
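The hashing approach can be sketched in a few lines of Ruby. This is a minimal illustration, assuming tab-separated `Uniref_ID<TAB>Uniref_Cluster` lines; the method name `clusters_by_hash` and the sample rows are made up for the example. The whole hash lives in memory, which is exactly the limit the last bullet asks about.

```ruby
# Group member IDs under their cluster ID using an in-memory hash.
# Assumption: each line is "uniref_id<TAB>uniref_cluster_id".
def clusters_by_hash(lines)
  clusters = Hash.new { |hash, key| hash[key] = [] }
  lines.each do |line|
    uniref_id, cluster_id = line.chomp.split("\t")
    next if cluster_id.nil?            # skip blank or malformed lines
    clusters[cluster_id] << uniref_id  # new Key or append to existing Value
  end
  clusters
end

# Illustrative sample rows taken from the table on the slide.
sample = [
  "B1JTL4\tA0K3H0",
  "A0Q8P9\tA0Q8P9",
  "A2VU91\tA0K3H0",
  "A7ZA84\tA7ZA84",
]

clusters_by_hash(sample).each do |cluster_id, members|
  puts "#{cluster_id}\t#{members.join(', ')}"
end
```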

Page 4: Large scale data-parsing with Hadoop in Bioinformatics

Uniref_ID   Uniref_Cluster
A0K3H0      A0K3H0
A2VU91      A0K3H0
B1JTL4      A0K3H0
Q1BRP4      A0K3H0
A0Q8P9      A0Q8P9
A7JF80      A0Q8P9
B4ARM5      A0Q8P9
A7GLP0      A7GLP0
A7ZA84      A7ZA84
A0RAB9      A0RAB9
A9VGI8      A9VGI8
A0KAJ8      Q1BTJ3
A1BI83      A1BI83

Traditional approach two : sorting

● sort to bring all Uniref Cluster IDs together

● stream all the lines and get the cluster members

● sorting algorithms : memory or disk based ?

● can probably do 100GB with disk paging (slow....)
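The sort-then-stream idea, as a small Ruby sketch. It sorts in memory purely for illustration (a 30GB file would need a disk-based sort instead), and the method name `clusters_by_sort` is hypothetical. Once the rows are sorted, one pass is enough: a new cluster starts whenever the cluster ID changes.

```ruby
# Sort rows by cluster ID, then stream once and cut a group at every Key change.
# Assumption: each line is "uniref_id<TAB>uniref_cluster_id".
def clusters_by_sort(lines)
  rows = lines.map { |line| line.chomp.split("\t") }   # [uniref_id, cluster_id]
  sorted = rows.sort_by { |_uniref_id, cluster_id| cluster_id }
  sorted.chunk_while { |a, b| a[1] == b[1] }.map do |group|
    [group.first[1], group.map(&:first)]               # [cluster_id, members]
  end
end

sample = ["B1JTL4\tA0K3H0", "A0Q8P9\tA0Q8P9", "A2VU91\tA0K3H0", "A7ZA84\tA7ZA84"]

clusters_by_sort(sample).each do |cluster_id, members|
  puts "#{cluster_id}\t#{members.join(', ')}"
end
```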

Page 5: Large scale data-parsing with Hadoop in Bioinformatics

B1JTL4 A0K3H0   A0Q8P9 A0Q8P9   A2VU91 A0K3H0   A7ZA84 A7ZA84

A0RAB9 A0RAB9   A7JF80 A0Q8P9   A7GLP0 A7GLP0   B4ARM5 A0Q8P9

A0K3H0 A0K3H0   A9VGI8 A9VGI8   A0KAJ8 Q1BTJ3   A1BI83 A1BI83   Q1BRP4 A0K3H0

Split the data and sort in parallel ?

● implement data distribution across compute nodes (easy)

● implement parallel processing of the data fragments at nodes (easy)

● implement exchange of partial sorts / intermediate results between nodes (difficult)

● implement tracking of data fragment failures (difficult)

● let's see in detail how you'd implement all this...

● …which is the same as explaining what MapReduce/Hadoop does automatically for you.

Page 6: Large scale data-parsing with Hadoop in Bioinformatics

A bird's eye view of the Hadoop Map/Reduce framework

● data distribution across the compute nodes :

HDFS , Hadoop Distributed FileSystem

● parallel processing of the data fragments at nodes, part 1 :

Map script written by you (e.g. parse Uniref100 cluster IDs from >FASTA)

● exchange of intermediate results between nodes :

Shuffle : aggregates the results sharing a Key (Uniref cluster ID) on the same node

If you are not looking for Uniref clusters, use a random key and simply parse in parallel at Map

● parallel processing of the data fragments at nodes, part 2 :

Reduce script written by you : processing of the aggregated results

Not required if you don't want to aggregate using a specific Key

● re-scheduling of a job failure with a data fragment :

Automatically
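To make the phases concrete, here is a single-process imitation in Ruby. It is only a sketch of the data flow, with no Hadoop involved; the method names `map_phase`, `shuffle_phase` and `reduce_phase` are invented for the example, and the input is assumed to be `Uniref_ID<TAB>Uniref_Cluster` lines.

```ruby
# Local imitation of Map -> Shuffle -> Reduce (no distribution, no fault tolerance).

# Map: turn every input line into a ( Key , Value ) pair.
def map_phase(lines)
  lines.map do |line|
    uniref_id, cluster_id = line.chomp.split("\t")
    [cluster_id, uniref_id]
  end
end

# Shuffle: bring all pairs sharing a Key together, Keys in sorted order.
def shuffle_phase(pairs)
  pairs.group_by(&:first).sort.map { |key, kvs| [key, kvs.map(&:last)] }
end

# Reduce: process the aggregated Values of each Key.
def reduce_phase(grouped)
  grouped.map { |key, values| "#{key}\t#{values.join(', ')}" }
end

sample = ["B1JTL4\tA0K3H0", "A0Q8P9\tA0Q8P9", "A2VU91\tA0K3H0", "A7ZA84\tA7ZA84"]
puts reduce_phase(shuffle_phase(map_phase(sample)))
```

In the real framework each phase runs on many nodes at once and failed tasks are re-scheduled; none of that is modelled here.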

Page 7: Large scale data-parsing with Hadoop in Bioinformatics

Data distribution across compute nodes

Hadoop Distributed Filesystem (HDFS)

● data split in 64MB blocks distributed across nodes of cluster

● to you they look like regular files and directories

$fog-0-0-1> hadoop fs -ls , -rm , -rmr , -mkdir , -chmod etc.

$fog-0-0-1> hadoop fs -put uniref100_clusters /user/kkrampis/

● one compute task per block : granularity

number of tasks per cluster node is based on the number of blocks at the node

small per-block tasks keep the wall-clock time from being dominated by the longest-running task
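A quick back-of-the-envelope calculation for the example dataset, written as Ruby. The 30GB file size and 64MB block size come from the slides; the figure of 3 nodes with one task per core is purely a hypothetical cluster for the arithmetic, since the real number of concurrent tasks depends on how the cluster is configured.

```ruby
# Number of HDFS blocks (and therefore Map tasks) for a file of a given size.
def block_count(file_size_gb, block_size_mb = 64)
  (file_size_gb * 1024.0 / block_size_mb).ceil
end

# "Waves" of tasks needed when only `slots` tasks can run at the same time.
def task_waves(blocks, slots)
  (blocks.to_f / slots).ceil
end

blocks = block_count(30)          # the 30GB example dataset
puts blocks
puts task_waves(blocks, 3 * 16)   # hypothetical: 3 nodes x 16 cores, one task per core
```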

[Figure: the data fragments are distributed across the cluster nodes]

fog-0-1-2 : 32GB + 16 cores
B1JTL4 A0K3H0   A0Q8P9 A0Q8P9   A2VU91 A0K3H0   A7ZA84 A7ZA84

fog-0-1-3 : 32GB + 16 cores
A0RAB9 A0RAB9   A7JF80 A0Q8P9   A7GLP0 A7GLP0   B4ARM5 A0Q8P9

fog-0-1-4 : 32GB + 16 cores
A0K3H0 A0K3H0   A9VGI8 A9VGI8   A0KAJ8 Q1BTJ3   A1BI83 A1BI83   Q1BRP4 A0K3H0

. . . . . . .

Page 8: Large scale data-parsing with Hadoop in Bioinformatics


● Map script specifying how to parse your data

● Hadoop handles all the parallel execution details

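# Map script : read tab-separated lines from STDIN, emit "Key <TAB> Value"
STDIN.each_line do |line|
  lineArray = line.chomp.split( /\t/ )     # chomp drops the trailing newline
  uniref_id = lineArray.at( 0 )
  uniref_cluster_id = lineArray.at( 1 )
  puts "#{uniref_cluster_id}\t#{uniref_id}"
end
# ( beloved Perl : "while ( <STDIN> ) { }" )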

Parallel processing of the data fragments, part 1 : Map phase ( data pre-processing in parallel )

[Figure: the Map code runs on every node ( fog-0-1-2, fog-0-1-3, fog-0-1-4, ... ; 32GB + 16 cores each ) and turns each input line into a ( Key , Value ) pair]

B1JTL4 A0K3H0   A0Q8P9 A0Q8P9   A2VU91 A0K3H0   A7ZA84 A7ZA84

→ Map code →

( A0K3H0 , B1JTL4 ) ( A0Q8P9 , A0Q8P9 ) ( A0K3H0 , A2VU91 ) ( A7ZA84 , A7ZA84 )

Page 9: Large scale data-parsing with Hadoop in Bioinformatics

Processing the data fragments across nodes, part 1 :

● Hadoop performs parallel sorting by Key

● this is intermediate sorting on the data fragments at the nodes

Map output at the nodes ( unsorted ) :

fog-0-1-2 : ( A0K3H0, B1JTL4 ) ( A0Q8P9, A0Q8P9 ) ( A0K3H0, A2VU91 ) ( A7ZA84, A7ZA84 )
fog-0-1-3 : ( A0RAB9, A0RAB9 ) ( A0Q8P9, A7JF80 ) ( A7GLP0, A7GLP0 ) ( A0Q8P9, B4ARM5 )
fog-0-1-4 : ( A0K3H0, A0K3H0 ) ( A9VGI8, A9VGI8 ) ( Q1BTJ3, A0KAJ8 ) ( A1BI83, A1BI83 ) ( A0K3H0, Q1BRP4 )
. . . . . .

After sorting by Key at each node :

fog-0-1-2 : ( A0K3H0, B1JTL4 ) ( A0K3H0, A2VU91 ) ( A0Q8P9, A0Q8P9 ) ( A7ZA84, A7ZA84 )
fog-0-1-3 : ( A0Q8P9, A7JF80 ) ( A0Q8P9, B4ARM5 ) ( A0RAB9, A0RAB9 ) ( A7GLP0, A7GLP0 )
fog-0-1-4 : ( A0K3H0, A0K3H0 ) ( A0K3H0, Q1BRP4 ) ( A1BI83, A1BI83 ) ( A9VGI8, A9VGI8 ) ( Q1BTJ3, A0KAJ8 )
. . . . . .

Page 10: Large scale data-parsing with Hadoop in Bioinformatics

Exchange of intermediate results between nodes : Shuffle phase

fog-0-1-1 : master

node : "I have A0K3H0, A0Q8P9"   →   master : "send A0Q8P9 to fog-0-0-2"
node : "I have A0Q8P9"           →   master : "keep it"
node : "I have A0K3H0"           →   master : "send A0K3H0 to fog-0-0-1"

Before the Shuffle ( sorted by Key at each node ) :

fog-0-1-2 : ( A0K3H0, B1JTL4 ) ( A0K3H0, A2VU91 ) ( A0Q8P9, A0Q8P9 ) ( A7ZA84, A7ZA84 )
fog-0-1-3 : ( A0Q8P9, A7JF80 ) ( A0Q8P9, B4ARM5 ) ( A0RAB9, A0RAB9 ) ( A7GLP0, A7GLP0 )
fog-0-1-4 : ( A0K3H0, A0K3H0 ) ( A0K3H0, Q1BRP4 ) ( A1BI83, A1BI83 ) ( A9VGI8, A9VGI8 ) ( Q1BTJ3, A0KAJ8 )
. . . . . .

After the Shuffle ( pairs sharing a Key are on the same node ) :

fog-0-1-2 : ( A0K3H0, B1JTL4 ) ( A0K3H0, A2VU91 ) ( A0K3H0, A0K3H0 ) ( A0K3H0, Q1BRP4 ) ( A7ZA84, A7ZA84 )
fog-0-1-3 : ( A0Q8P9, A7JF80 ) ( A0Q8P9, B4ARM5 ) ( A0Q8P9, A0Q8P9 ) ( A0RAB9, A0RAB9 ) ( A7GLP0, A7GLP0 )
fog-0-1-4 : ( A1BI83, A1BI83 ) ( A9VGI8, A9VGI8 ) ( Q1BTJ3, A0KAJ8 )
. . . . . .
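The property that matters in the Shuffle is that every pair with the same Key ends up in the same place. One common way to get that guarantee is to derive the destination from a hash of the Key; the Ruby sketch below imitates this locally. `Zlib.crc32` is only a stand-in hash picked for the example, not what Hadoop uses internally, and the method names `partition_for` and `shuffle` are hypothetical.

```ruby
require 'zlib'

# Pick the destination for a Key: same Key -> same partition,
# whichever node emitted the pair. Zlib.crc32 is a stand-in hash.
def partition_for(key, num_reducers)
  Zlib.crc32(key) % num_reducers
end

# Route every ( Key , Value ) pair to its partition, then order by Key inside it.
def shuffle(pairs, num_reducers)
  partitions = Array.new(num_reducers) { [] }
  pairs.each do |key, value|
    partitions[partition_for(key, num_reducers)] << [key, value]
  end
  partitions.map { |partition| partition.sort_by(&:first) }
end

pairs = [
  ["A0K3H0", "B1JTL4"], ["A0Q8P9", "A0Q8P9"], ["A0K3H0", "A2VU91"],
  ["A7ZA84", "A7ZA84"], ["A0Q8P9", "A7JF80"], ["A0K3H0", "Q1BRP4"],
]

shuffle(pairs, 3).each_with_index do |partition, i|
  puts "partition #{i} : #{partition.map { |k, v| "( #{k}, #{v} )" }.join(' ')}"
end
```

Which partition a given Key lands in depends on the hash, so the placement will not necessarily match the node assignment drawn on the slide.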

Page 11: Large scale data-parsing with Hadoop in Bioinformatics

● guaranteed : Keys are ordered

● Values are not ordered ; can use secondary keys if desired

● parallel processing as well

Output :

A0K3H0    B1JTL4, A2VU91, A0K3H0, Q1BRP4
A7ZA84    A7ZA84
A0Q8P9    A0Q8P9, A7JF80, B4ARM5

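# Reduce script : input lines arrive sorted by Key as "Key <TAB> Value"
last_key, cluster = nil, []

STDIN.each_line do |line|
  uniref_cluster_id, uniref_id = line.chomp.split( "\t" )

  # Key changed : print the finished cluster and start a new one
  if last_key && last_key != uniref_cluster_id
    puts "#{last_key}\t#{cluster.join( ', ' )}"
    cluster = []
  end

  last_key = uniref_cluster_id
  cluster << uniref_id
end

# print the last cluster, which has no following Key change
puts "#{last_key}\t#{cluster.join( ', ' )}" if last_key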

Processing the data fragments across nodes, part 2 : Reduce phase

[Figure: the Reduce code runs on every node over the pairs it received from the Shuffle]

fog-0-0-1 : ( A0K3H0, B1JTL4 ) ( A0K3H0, A2VU91 ) ( A0K3H0, A0K3H0 ) ( A0K3H0, Q1BRP4 ) ( A7ZA84, A7ZA84 )
fog-0-0-2 : ( A0Q8P9, A7JF80 ) ( A0Q8P9, B4ARM5 ) ( A0Q8P9, A0Q8P9 ) ( A0RAB9, A0RAB9 ) ( A7GLP0, A7GLP0 )
fog-0-0-3 : ( A1BI83, A1BI83 ) ( A9VGI8, A9VGI8 ) ( Q1BTJ3, A0KAJ8 )
. . . . . .
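The remark about secondary keys can be illustrated locally: if the sort uses ( Key , Value ) instead of the Key alone, the Values of each Key reach the Reduce code in order as well. A minimal Ruby sketch, with the hypothetical method name `sort_with_secondary_key`; how this is configured on a real cluster is not shown here.

```ruby
# Sort on ( Key , Value ) instead of Key alone,
# so the Values of each Key arrive in order too.
def sort_with_secondary_key(pairs)
  pairs.sort_by { |key, value| [key, value] }
end

pairs = [
  ["A0Q8P9", "B4ARM5"], ["A0K3H0", "Q1BRP4"],
  ["A0Q8P9", "A7JF80"], ["A0K3H0", "A2VU91"],
]

sort_with_secondary_key(pairs).each { |key, value| puts "#{key}\t#{value}" }
```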

Page 12: Large scale data-parsing with Hadoop in Bioinformatics

Distributed Grep, CloudBLAST – CloudBurst and K-mer frequency counts

[Figure: sequence reads distributed across the nodes]

fog-0-1-2 : 32GB + 16 cores
CAAGGACGTGACAA TATTAATGCAATGAG TAGATCACGTTTTTA CCGGACGAACCACA

fog-0-1-3 : 32GB + 16 cores
CTATTTTAGTGGTCAG TGAGTTGCACTTAAG ATTAGGACCATGTAG AGTGGTGCACATGAT

fog-0-1-4 : 32GB + 16 cores
ACGTCAACGTCATCG TTTATCTCTCGAAACT ATTCCATAGTGAGTGTTATCGTTATTGCTAG CCATAGACGTACGTC

. . . . . . .

Map output ( Key , Value ) :

( ACGT, CAAGGACGTGACAA ) ( TGCA, TATTAATGCAATGAG ) ( ACGT, TAGATCACGTTTTTA )
( TGCA, TGAGTTGCACTTAAG ) ( TGCA, AGTGGTGCACATGAT )

After the Shuffle ( Key , Value ) :

( ACGT, CAAGGACGTGACAA ) ( ACGT, TAGATCACGTTTTTA ) ( ACGT, CCATAGACGTACGTC )
( TGCA, TATTAATGCAATGAG ) ( TGCA, TGAGTTGCACTTAAG ) ( TGCA, AGTGGTGCACATGAT )

Map code :

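# emit "Key <TAB> Value" for every read containing one of the patterns
while ( <STDIN> ) {
    chomp ;
    my $value = $_ ;
    if ( my ( $key ) = $value =~ / ( ACGT ) /x ) { print "$key\t$value\n" ; }
    if ( my ( $key ) = $value =~ / ( TGCA ) /x ) { print "$key\t$value\n" ; }
}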

OK, This is some Perl !
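The slide title also mentions K-mer frequency counts, which fit the same pattern: the Map step emits every k-mer of a read as a Key, and the Reduce step adds up how often each Key occurs. A local Ruby sketch under those assumptions; the method names `kmers` and `kmer_counts` are invented for the example, and the three reads are taken from the figure.

```ruby
# Map step: emit every k-mer of a read (overlapping windows of length k).
def kmers(read, k)
  read.chars.each_cons(k).map(&:join)
end

# Reduce step: sum the occurrences of each k-mer over all reads.
def kmer_counts(reads, k)
  counts = Hash.new(0)
  reads.each do |read|
    kmers(read, k).each { |kmer| counts[kmer] += 1 }
  end
  counts
end

reads = %w[CAAGGACGTGACAA TAGATCACGTTTTTA CCATAGACGTACGTC]
counts = kmer_counts(reads, 4)
puts "ACGT\t#{counts['ACGT']}"
```

On a cluster the counting would be split between a Map script and a Reduce script, with the Shuffle bringing identical k-mers together in between.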


Page 14: Large scale data-parsing with Hadoop in Bioinformatics

Further References

● showed the core framework and Hadoop streaming (using scripting languages)

● much more for experienced Java developers:

- complex data structures on the Value field

- combiners, custom serialization, compression

- coding patterns for algorithms in MapReduce

● http://hadoop.apache.org :

- HBase / Hive : scalable, distributed database / data warehouse for large tables

- Mahout : a scalable machine learning and data mining library

- Pig : data workflow language and execution framework for parallel computation

Page 15: Large scale data-parsing with Hadoop in Bioinformatics

/home/cloud/training/hadoop_cmds.sh :

hadoop fs -mkdir /user/$USER/workshop

hadoop fs -put /home/cloud/training/uniref100_proteins /user/$USER/workshop/uniref100_proteins

hadoop fs -ls /user/$USER/workshop

hadoop jar /opt/hadoop/contrib/streaming/hadoop-0.20.2-streaming.jar \
    -input /user/$USER/workshop/uniref100_proteins \
    -output /user/$USER/workshop/uniref100_clusters \
    -file /home/cloud/training/uniref100_clusters_map.rb \
    -mapper /home/cloud/training/uniref100_clusters_map.rb \
    -file /home/cloud/training/uniref100_clusters_reduce.rb \
    -reducer /home/cloud/training/uniref100_clusters_reduce.rb

hadoop fs -get /user/$USER/workshop/uniref100_clusters /home/cloud/users/$USER

gunzip /home/cloud/users/$USER/uniref100_clusters/part-00000.gz

more /home/cloud/users/$USER/uniref100_clusters/part-00000

hadoop fs -rmr /user/$USER/workshop