Map Reduce ~Continuous Map Reduce Design~

13th Data Mining + WEB @ Tokyo (#TokyoWebmining) | doryokujin


TRANSCRIPT

Page 1: Map Reduce ~Continuous Map Reduce Design~

Map Reduce ~Continuous Map Reduce Design~

13th Data Mining + WEB @ Tokyo (#TokyoWebmining)

doryokujin

Page 2: Map Reduce ~Continuous Map Reduce Design~

[Name] doryokujin (Takahiro Inoue)

[Age] 26

[Major] Mathematics (statistics and probabilistic algorithms)

[Company] Geisha Tokyo Entertainment (GTE)

[Job] Data mining engineer

[Hobby] Marathon (42.195 km in 2:33)

[Communities]

・MongoDB JP: bringing more MongoDB to Japan!

・TokyoWebMining: a study group on statistical analysis and data mining methodologies and on making use of data from the web

・Oshare Statistics: a study group that learns statistics by reading through the classic textbook "Statistics", with @isseing333 @dichika

About me

Page 3: Map Reduce ~Continuous Map Reduce Design~

One announcement

Page 5: Map Reduce ~Continuous Map Reduce Design~

1. What is Map Reduce?

2. Continuous Map Reduce
  2-1. Compare to Traditional MR
  2-2. Design Concept
  2-3. Example

3. Advanced topics (preview)

Agenda

Page 6: Map Reduce ~Continuous Map Reduce Design~

1. Map Reduce (Map)

[Figure 2.4 diagram: mappers → combiners → partitioners → (shuffle and sort: aggregate values by key) → reducers]

Figure 2.4: Complete view of MapReduce, illustrating combiners and partitioners in addition to mappers and reducers. Combiners can be viewed as "mini-reducers" in the map phase. Partitioners determine which reducer is responsible for a particular key.

… a combiner can significantly reduce the amount of data that needs to be copied over the network, resulting in much faster algorithms. The complete MapReduce model is shown in Figure 2.4. Output of the mappers are processed by the combiners, which perform local aggregation to cut down on the number of intermediate key-value pairs. The partitioner determines which reducer will be responsible for processing a particular key, and the execution framework uses this information to copy the data to the right location during the shuffle and sort phase. Therefore, a complete MapReduce job consists of code for the mapper, reducer, combiner, and partitioner, along with job configuration parameters. The execution framework handles everything else.

[Footnote 13] In Hadoop, partitioners are actually executed before combiners, so while Figure 2.4 is conceptually accurate, it doesn't precisely describe the Hadoop implementation.

1. Split: the input data is divided into fixed-size chunks, and each chunk is handed to a mapper.

2. Map: for each line, one or more (key, value) pairs are produced.

3. Combine: the pairs produced by map are locally aggregated, (key, list(values)) → (key, value2). (A minimal sketch of these map-side steps follows below.)
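To make steps 1-3 concrete, here is a minimal, self-contained Python sketch of the map side of a word-count job. This example is not from the slides; the function names split_input, map_fn, and combine_fn are illustrative only.

# Minimal sketch of the map side of a word-count job (illustrative only).
def split_input(text, chunk_size=2):
    # 1. Split: divide the input lines into fixed-size chunks, one chunk per mapper.
    lines = text.splitlines()
    return [lines[i:i + chunk_size] for i in range(0, len(lines), chunk_size)]

def map_fn(line):
    # 2. Map: emit one or more (key, value) pairs per input line.
    return [(word, 1) for word in line.split()]

def combine_fn(pairs):
    # 3. Combine: locally aggregate (key, list(values)) into (key, value2).
    combined = {}
    for key, value in pairs:
        combined[key] = combined.get(key, 0) + value
    return list(combined.items())

splits = split_input("a b a\nc a b\nb c c\na a b")
for split in splits:                       # each split goes to one mapper
    pairs = [kv for line in split for kv in map_fn(line)]
    print(combine_fn(pairs))               # mapper-local partial counts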

Page 7: Map Reduce ~Continuous Map Reduce Design~

1. Map Reduce (Reduce)

(Figure 2.4 and the accompanying excerpt are repeated here; see Page 6.)

4. Shuffle: the partitioner decides which reducer each key goes to, and the shuffle moves the data accordingly.

5. Sort: within each reducer, the data is sorted by key.

6. Reduce: the (key, list(values)) handed over from the mappers is reduced to (key, value3). (A sketch of the reduce side follows below.)
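Continuing the same word-count example, a minimal sketch of steps 4-6 might look as follows. The rule hash(key) % num_reducers is shown only as a typical default partitioning choice; all names are again illustrative.

# Minimal sketch of the reduce side (continues the word-count example above).
def partition(key, num_reducers):
    # 4. Shuffle: the partitioner decides which reducer owns a key.
    return hash(key) % num_reducers

def shuffle(mapper_outputs, num_reducers):
    # Route every (key, value) pair to its reducer's bucket.
    buckets = [[] for _ in range(num_reducers)]
    for pairs in mapper_outputs:
        for key, value in pairs:
            buckets[partition(key, num_reducers)].append((key, value))
    return buckets

def reduce_fn(key, values):
    # 6. Reduce: (key, list(values)) -> (key, value3).
    return key, sum(values)

mapper_outputs = [[("a", 2), ("b", 1)], [("a", 1), ("c", 2)]]
for bucket in shuffle(mapper_outputs, num_reducers=2):
    bucket.sort(key=lambda kv: kv[0])      # 5. Sort: order each reducer's input by key
    grouped = {}
    for key, value in bucket:
        grouped.setdefault(key, []).append(value)
    print([reduce_fn(k, vs) for k, vs in grouped.items()])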

Page 8: Map Reduce ~Continuous Map Reduce Design~

[Figure 2.2 diagram: Input Splits → Map, emitting <K, V> → Shuffle, grouping into <K, list(V)> → Reduce, emitting <list(V)> → Output Files]

Figure 2.2: Hadoop implements a MapReduce job as a three-phase execution flow made up of the map, shuffle and reduce phases.

… simply restarts that task. The exact same method is used for reduce tasks as well. This can be done because a reduce task can not start until all map tasks have finished. In other words, the reduce tasks must wait until all intermediate pairs are grouped, sorted and spooled to disk before applying reduce functions. Hadoop uses a precise, or fully-consistent, fault tolerance model. The system will either return results corresponding to the entire input data set or will not return results at all. A consequence is that one straggling map task can block the progress of an entire MapReduce job [41]. As scale increases, a fully-consistent model might not be suitable for batch-oriented jobs that still need to complete in a reasonable amount of time. When dealing with hundreds of thousands of nodes, there is always a set of nodes that is not available due to failure. This may hamstring MapReduce jobs if the framework has to wait for all map and reduce tasks to complete successfully before returning results. Thus, knowing when (and where) to restart tasks is a complicated question that can have a large impact of job completion time [41].

※ Wait Process: reduce does not start until every map task has completed.

※ Disk Buffer: the intermediate data produced by map is first written to disk.

1. Wait Process / Disk Buffer

Continuous MapReduce: An Architecture for Large-scale In-situ Data Processing [pdf]

Page 9: Map Reduce ~Continuous Map Reduce Design~

1. Wait Process / Disk Buffer [∵ fault tolerance]

・There is no guarantee that every node is healthy.

・When a node fails its map task, another node takes over and re-runs the same work.

・To avoid producing incorrect output, the framework wants to guarantee that every map task has completed before any reduce runs.

・Also, temporarily holding all of the (large) map output requires flushing it to disk.

Page 10: Map Reduce ~Continuous Map Reduce Design~

2. Continuous

Map Reduce (CMR)

Page 11: Map Reduce ~Continuous Map Reduce Design~

2-1. Compare to

Traditional Map Reduce

Page 12: Map Reduce ~Continuous Map Reduce Design~

2-1. Compare To Traditional MR

[Traditional Map Reduce]

・"Store-First-Query-Later Model": the target data is always stored in a distributed FS such as HDFS beforehand; analysis starts only after all of the data has been stored.

・"Batch-Oriented": the accumulated data is processed periodically (weekly, daily, hourly).

Page 13: Map Reduce ~Continuous Map Reduce Design~

[Continuous Map Reduce]

・"Query-Then-Store in-situ Model": the analysis (MR) runs on the log servers without waiting for the data to be stored; only the output is written to HDFS.

・"Streaming Processing (Continuous Queries)": unbounded streaming data is analyzed continuously; the map and reduce phases never "complete".

2-1. Compare To Traditional MR

Page 14: Map Reduce ~Continuous Map Reduce Design~

[Figure 1.1 diagram: log servers feed the distributed file system (HDFS); Hadoop MapReduce runs as the data processing framework on top of it; framework users submit a query and receive a result]

Figure 1.1: Log processing with the store-first-query-later model. Apache Hadoop [3] is used as an example.

… frameworks in a traditional store-first-query-later model [17]. Companies migrate log data from the source nodes to an append-only distributed file system such as GFS [18] or HDFS [3]. The distributed file system replicates the log data for availability and fault-tolerance. Once the data is placed in the file system, users can execute queries using bulk-processing frameworks and retrieve results from the distributed file system. Figure 1.1 illustrates this model.

The current approach exhibits a number of key limitations in this environment. Migrating log data into the distributed file system will take a prohibitive amount of time for even batch-oriented analysis with loose time constraints. A simple back-of-the-envelope analysis reveals this issue. Consider 10,000 servers (Facebook currently harvests 30,000) producing log data at a rate of 10 MB/sec. This cluster generates 8 petabytes of log data per day. In our measurements, server-class machines can sink data at a rate of 30 MB/sec. It would take 3,314 dedicated HDFS [3] nodes to sink that amount of log data in one day. These machines are completely I/O bound, and are unable to perform a significant amount of data processing at the same time. Thus, all of their CPUs are left virtually idle. Servers are one of the largest fixed, depreciating costs in a data center, putting a substantial price tag on data migration alone.

This fundamental limitation has broad ramifications on the fidelity and availability …
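A quick back-of-the-envelope check of the numbers quoted above (the figures are approximate and depend on the exact unit conventions used in the thesis):

servers = 10_000                   # servers producing logs
log_rate_mb_s = 10                 # MB/s of log data per server
sink_rate_mb_s = 30                # MB/s a server-class machine can sink

total_mb_s = servers * log_rate_mb_s                 # 100,000 MB/s of logs cluster-wide
per_day_pb = total_mb_s * 86_400 / 1_000_000_000     # ~8.6 PB of log data per day
sink_nodes = total_mb_s / sink_rate_mb_s             # ~3,333 dedicated sink nodes
print(round(per_day_pb, 1), round(sink_nodes))       # 8.6 3333

Both figures are in the same ballpark as the roughly 8 PB/day and 3,314 nodes cited in the excerpt.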

[Figure 1.2 diagram: cloud servers with logs run the data processing framework (Continuous MapReduce) in place; framework users submit a query and receive results; only the output reaches the distributed file system (HDFS)]

Figure 1.2: Log processing with the query-then-store in-situ model.

… MapReduce (CMR), a data processing framework to address large-scale batch-oriented workloads where timeliness is still important.

CMR's architecture has several properties that specifically address the limitations of current approaches. It is scalable across large networks, enabling companies to handle the next generation of data management challenges. It is responsive, allowing more agile analysis of valuable log data. It is highly available, providing users with results in the presence of failure. Finally, it is easily adoptable, creating a solution that can be integrated into existing infrastructures.

These architectural properties are supported with several unique design choices. CMR's architecture uses a query-then-store model (Figure 1.2), taking distributed storage systems off the critical path for delivering results to a user. It processes data in-situ, or on location, eliminating wasteful migration of unused data. It uses in-network aggregation to save valuable bandwidth resources by reducing the amount of data sent across the network. It uses a push-based model and continuous queries with incremental processing windows to handle data as it arrives, accommodating the latency requirements of update-driven recurring queries. The architecture also explores a relaxed consistency model that prevents an entire query from being blocked by a small percentage of unavailable data. Lastly, CMR extends the original MapReduce programming …

"Store-First-Query-Later Model"

"Query-Then-Store in-situ Model"

Continuous MapReduce: An Architecture for Large-scale In-situ Data Processing [pdf]

Page 15: Map Reduce ~Continuous Map Reduce Design~

2-2. Design Concept

Page 16: Map Reduce ~Continuous Map Reduce Design~

2-2. Design Concept

[Continuous Map Reduce Design]

・"In-situ (on location) Processing": MR runs on the servers where the logs are produced (or on relay servers they pass through), and the results flow downstream toward the ROOT (where results are stored).

・"Pipelined Processing": as soon as each map task finishes, its intermediate data is handed to the reducer without waiting for the other map tasks to complete.

Page 17: Map Reduce ~Continuous Map Reduce Design~

[Continuous Map Reduce Design]

・"Time Window Processing": unbounded streaming data is divided into time-based ranges (windows).

・"Incremental Processing": MR output is "merged" into the results so far, and output that has become stale is "unmerged" from the results.

2-2. Design Concept

Page 18: Map Reduce ~Continuous Map Reduce Design~

In-situ(on location) Processing

Figure 1: The in-situ MapReduce architecture avoids the cost and latency of the store-first-query-later design by moving processing onto the data sources.

… speed of social network updates or accuracy of ad targeting. The in-situ MapReduce (iMR) architecture builds on previous work in stream processing [5, 7, 9] to support low-latency continuous log processing. Like stream processors, iMR MapReduce jobs can process over sliding windows, updating and delivering results as new data arrives.

Available: iMR's lossy data model allows the system to return results that may be incomplete. This allows the system to improve result availability in the event of failures or processing and network delays. Additionally, iMR may pro-actively reduce processing fidelity through load shedding, reducing the impact on existing server tasks. iMR attaches a metric of result quality to each output, allowing users to judge the relative accuracy of processing. Users may also explicitly trade fidelity for improved result latency by specifying latency and fidelity bounds on their queries.

Efficient: A log processing architecture should make parsimonious use of computational and network resources. iMR explores the use of sub-windows or panes for efficient continuous processing. Instead of recomputing each window from scratch, iMR allows incremental processing, merging recent data with previously computed panes to create the next result. And adaptive load-shedding policies ensure that nodes use compute cycles for results that meet latency requirements.

Compatible: iMR supports the traditional MapReduce API, making it trivial to "port" existing MapReduce jobs to run in-situ. It provides a single extension, uncombine, to allow users to further optimize incremental processing in some contexts (Section 2.3.2).

2.1 In-situ MapReduce jobs

A MapReduce job in iMR is nearly identical to that in traditional MapReduce architectures [12]. Programmers specify two data processing functions: map and reduce. The map function outputs key-value pairs, {k, v}, for each input record, and the reduce processes each group of values, v[], that share the same key k. iMR is designed for queries that are either highly selective or employ reduce functions that are distributive or algebraic aggregates [14]. Thus we expect that users will also specify the MapReduce combiner, allowing the underlying system to merge values of a single key to reduce data movement and distribute processing overhead. The use of a combiner allows iMR to process windows incrementally and further reduce data volumes through in-network aggregation. The only non-standard (but optional) function iMR MapReduce jobs may implement is uncombine, which we describe in Section 2.3.2. However, the primary way in which iMR jobs differ is that they emit a stream of results computed over continuous input, e.g., server log files. Like data stream processors [7], iMR bounds computation over these (perhaps infinite) data streams by processing over a window of data. The window's range R defines the amount of data processed in each result, while the window's slide S defines its update frequency. For example, a user could count error events over the last 24 hours of log records (R = 24 hours), and update the count every hour (S = 1 hour). This sliding window, one whose slide S is less than its range R, may be in terms of wall-clock time or logical index, such as record count, bytes, or any user-defined sequence number. Users specify R and S with simple annotations to the reduce function. While sufficient for real-time log processing, a MapReduce job in iMR may reference historical log data as well. Doing so requires a job-level annotation that specifies the point in the local log to begin B and the total data to consume, the extent E. If unspecified, the job continues to process, possibly catching up to real-time processing.

2.2 Job execution

In general, MapReduce architectures have three primary tasks: the parallel execution of the map phase, grouping input records by key, and the parallel execution of the reduce phase. In cluster-based MapReduce systems, like Hadoop, each map task produces key-value pairs, {k,v}, from raw input records at individual nodes in the cluster. The map tasks then group values by their key k, and split the set of keys into r partitions. After the map tasks finish, the system starts a reduce task for each partition r. These tasks first download their partition's key-value pairs from each mapper (the shuffle), finish grouping values, and then call reduce once for every {k,v[]} pair. iMR distributes the work of a MapReduce job across multiple trees, one for each reducer partition. Figure 2 illustrates one such tree; iMR co-locates map processing on the server nodes themselves, sourcing input records

Figure 2: This illustrates the physical instantiation of one iMR MapReduce partition as a multi-level aggregation tree. [diagram]

(tuples) from the local node's log file. The dedicated processing cluster hosts the root, which executes the user's reduce function. This tree uses the combine API to aggregate intermediate data at every mapper in a manner similar to traditional MapReduce architectures. However, like Dryad [32], iMR can use multi-level aggregation trees to further reduce the data crossing the network. In general, this requires aggregate or decomposable functions that can be computed incrementally [15, 23, 32]. Here we are interested in two broad categories of aggregate functions [21]. Holistic aggregates require partial values whose size is in proportion to their input data, e.g., union, median or groupby. In contrast, bounded aggregates have constant-sized partial values, e.g., sum or max, and present the greatest opportunities for data reduction.

2.3 Window processing with panes

iMR supports sliding processing windows not just because they bound computation on infinite streams, but because they also enable incremental computations. However, they do not immediately lend themselves to efficient in-network processing. Consider a simple aggregation strategy where each log server accumulates all key-value pairs for each logical window and nodes in the aggregation tree combine these entire windows. We can see that this strategy isn't efficient for our example sliding window query. In this case, every event record would be included in 24 successive results. Thus every input key-value pair in a sliding window would be grouped, combined, and transmitted for each update (slide) of the window or R/S times. To reduce these overheads, iMR adapts the use of sub-windows or panes to efficiently compute aggregates over sliding windows. While the concept of panes was introduced in prior work for single-node stream processors [21], here we adapt them to distributed in-situ MapReduce processing.

Figure 3: iMR nodes process local log files to produce sub-windows or panes. The system assumes log records have a logical timestamp and arrive in order. [diagram]

Figure 4: iMR aggregates individual panes Pi in the network. To produce a result, the root may either combine the constituent panes or update the prior window by removing an expired pane and adding the most recent. [diagram]

2.3.1 Pane management

Panes break a window into multiple equal-sized sub-windows, allowing the system to group and combine key-value records once per sub-window. Nodes in the system generate panes and send them to their parents in the aggregation tree. Thus in iMR, interior nodes in a tree aggregate panes and the root node combines them into each window result. This supports the fundamental grouping operation underlying reduce, a holistic aggregate. By sending panes, rather than sending the entire window up the tree, the system sends a single copy of a key's value, reducing network traffic. Additionally, issuing values at the granularity of panes gives the system fine-grain control on fidelity and load shedding (Section 3.4). It is also the granularity at which failed nodes restart processing, minimizing the gap of dropped data (Section 4.4.2). Figure 3 illustrates how a single node creates panes from a stream of local log records. Typically, we set the pane size equal to the slide S, though it may be any common divisor of R and S, and each node maintains a sequence of pane partial values Pi. This example uses a processing window with a slide of 60 minutes. When log records first enter the system, iMR tags each one with a non-decreasing user-defined timestamp. The system then feeds these records to the user's map function. After mapping, the system assigns key-value pairs …

※ Incremental Processing: the MR output is folded into the existing results.

※ Aggregation Tree: the root runs reduce; every other node runs combine.

※ In-situ Processing: MR runs inside the log servers; only the output is stored in HDFS.

In-situ MapReduce for Log Processing [pdf]

Page 19: Map Reduce ~Continuous Map Reduce Design~

Pipelined Processing

・Continuous Map Reduceでは半永久的にMap処理とReduce処理が続けられる

・Map/Reduceフェーズごとにサーバーの役割が切り替わるのではなく、Map処理、Reduce処理を行うサーバーが事前に決まっている(兼任可)

・各Map処理は完了ごとにReducer(Combiner)へ中間データを転送。Reducerは集計に必要な中間データが全て集まった時点でReduce処理を実行(それまではメモリ上に格納)

・このPipelined Processingによって、全Map処理の完了を待たず、また中間データがDiskにフラッシュされることもなく、効率的に処理が継続できる
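As a rough illustration of this push-based hand-off (not the actual CMR implementation; all class and method names here are made up), a reducer could buffer pushed intermediate data in memory and run only once every expected mapper has reported:

# Rough sketch of a push-based (pipelined) hand-off between mappers and a reducer.
from collections import defaultdict

class PipelinedReducer:
    def __init__(self, expected_mappers):
        self.expected_mappers = set(expected_mappers)
        self.buffer = defaultdict(list)      # intermediate data held in memory, not on disk
        self.reported = set()

    def push(self, mapper_id, pairs):
        # Called by a mapper as soon as its own map work is finished.
        for key, value in pairs:
            self.buffer[key].append(value)
        self.reported.add(mapper_id)
        if self.reported == self.expected_mappers:
            return self.reduce()             # all required intermediate data has arrived
        return None                          # otherwise keep buffering

    def reduce(self):
        return {key: sum(values) for key, values in self.buffer.items()}

reducer = PipelinedReducer(expected_mappers={"m1", "m2"})
print(reducer.push("m1", [("error", 1), ("ok", 3)]))   # None: still waiting for m2
print(reducer.push("m2", [("error", 2)]))              # {'error': 3, 'ok': 3}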

Page 20: Map Reduce ~Continuous Map Reduce Design~

・MR is applied to unbounded streaming data by splitting it into time-based ranges (time windows).

・"Range": the span covered by a final result, e.g., 24 hours or 60 minutes. Note that MR is not executed once per range.

・"Slide (Slice)": the interval handled by a single processing step, e.g., (R, S) = (24 hours, 1 hour) or (60 min, 5 min). In the former case, an aggregation runs every hour to cover 24 hours of data, and the aggregates of the 24 generated sub-windows (= panes) are summed together (see the sketch below).

Time Window Processing
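A small sketch of the window arithmetic for (R, S) = (24 hours, 1 hour), assuming Unix-style timestamps in seconds and panes of width S (names are illustrative, not part of any framework):

RANGE_S = 24 * 3600      # R = 24 hours, in seconds
SLIDE_S = 3600           # S = 1 hour, in seconds (also used as the pane width)

def pane_id(timestamp):
    # Assign a record to the pane that covers its timestamp.
    return timestamp // SLIDE_S

def window_panes(window_end_ts):
    # Pane ids whose aggregates are summed to produce one window result.
    last = pane_id(window_end_ts - 1)
    count = RANGE_S // SLIDE_S           # 24 panes per window here
    return list(range(last - count + 1, last + 1))

print(pane_id(90_000))        # 25: the record falls into pane 25
print(window_panes(90_000))   # the 24 panes [1, 2, ..., 24] summed into one result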

Page 21: Map Reduce ~Continuous Map Reduce Design~

[Types of time windows]

※ This talk deals with sliding windows.

Time Window Processing

Has an explicit fixed origin / resets when a day or month boundary is crossed / keeps the present as its reference point (e.g., the last 24 hours)

Map Reduce and Stream Processing

Page 22: Map Reduce ~Continuous Map Reduce Design~

Incremental Processing

・Once the reducer has received the same slide from every mapper, it executes the reduce and "merges" the result into the existing result data.

・When the window advances, the pane that is now old and no longer needed is "unmerged".

[Diagram: three mappers feed a reducer; as time advances, the new pane is merged into the new window and the old pane is unmerged from the old window.]

Page 23: Map Reduce ~Continuous Map Reduce Design~

・The map phase produces (key, value, timestamp) tuples and "pushes" them to the combiner.

・[1] The combine is not executed right away; tuples are stored in a timeline buffer and routed, by timestamp, to the pane they belong to (a group-by).

More: Time Window Processing

(Figures 2-4 and the accompanying iMR excerpt are repeated here; see Page 18.)

In-situ MapReduce for Log Processing [pdf]

Page 24: Map Reduce ~Continuous Map Reduce Design~

・[2] Once a pane is judged to receive no further data, the combine runs (i.e., the list of values belonging to the pane is aggregated), and a (key, value, timestamp) tuple with the paneId as the key and the aggregate as the value is "pushed" downstream.

・[3] The next node downstream receives, from all servers, the set (paneId, list(values, timestamps)) sharing the same paneId and runs the combiner on it.

・[4] At the root of the aggregation tree, reduce runs instead of combine, and the aggregated result is merged with the existing data (incremental processing). (A sketch of this pane lifecycle follows below.)

More: Time Window Processing
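The following Python sketch illustrates steps [1]-[3] (with the root standing in for [4]) for the hit-count case. It is an illustration only, not iMR's API; the class names and the use of sum() as the aggregate are assumptions.

# Illustrative sketch: buffer map output per pane, combine a closed pane, push downstream.
from collections import defaultdict

class PaneCombiner:
    def __init__(self, downstream):
        self.downstream = downstream          # next node in the aggregation tree
        self.timeline = defaultdict(list)     # paneId -> buffered values ([1])

    def accept(self, pane_id, value, timestamp):
        # [1] Buffer the (key, value, timestamp) output of map, grouped by pane.
        self.timeline[pane_id].append(value)

    def close_pane(self, pane_id):
        # [2] No more data will arrive for this pane: combine it and push downstream.
        aggregate = sum(self.timeline.pop(pane_id, []))
        self.downstream.accept(pane_id, aggregate, timestamp=None)

class RootReducer:
    # [3]/[4] Collects same-pane aggregates from all children, then reduces them.
    def __init__(self):
        self.per_pane = defaultdict(int)

    def accept(self, pane_id, value, timestamp):
        self.per_pane[pane_id] += value

root = RootReducer()
node = PaneCombiner(downstream=root)
node.accept(1, 1, timestamp=10)
node.accept(1, 1, timestamp=15)
node.close_pane(1)
print(dict(root.per_pane))                    # {1: 2}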

Page 25: Map Reduce ~Continuous Map Reduce Design~

More: Time Window Processing

(Figures 2-4 and the accompanying iMR excerpt are repeated here; see Page 18.)

[2] After each pane has been aggregated, it is "pushed" downstream.

[1] The mapper's intermediate output is routed to panes.

[3] Once the same pane has been collected from across the servers, it is aggregated.

[4] At the root, reduce runs and the output is merged with the previous results.

In-situ MapReduce for Log Processing [pdf]

Page 26: Map Reduce ~Continuous Map Reduce Design~

More: Incremental Processing

[4-1] For each pane, the intermediate outputs of Child A and Child B are aggregated.

Example: Child A's P1 is the aggregate over range [0, 10], and its P2 is the aggregate over range [10, 20].

[4-2] The pane P0 that has dropped out of the old window W0 is removed, and the new pane P2 is added.

(Figures 2-4 and the accompanying iMR excerpt are repeated here; see Page 18.)

In-situ MapReduce for Log Processing [pdf]

Page 27: Map Reduce ~Continuous Map Reduce Design~

Incremental Processing (recap)

・Once the reducer has received the same slide from every mapper, it executes the reduce and "merges" the result into the existing result data.

・In other words, when the window advances, the pane that is now old and no longer needed is "unmerged".

[Diagram: three mappers feed a reducer; as time advances, the new pane is merged into the new window and the old pane is unmerged from the old window.]

Page 28: Map Reduce ~Continuous Map Reduce Design~

2-3. Example

[Example]

・24-hour sliding window model

・Range = 24 hours, Slide = 1 hour

・Compute the "hit count" over the last 24 hours; MR runs every hour and each hourly result is merged in.

Page 29: Map Reduce ~Continuous Map Reduce Design~

# Called for each hit record
map(k1, hitRecord) {
  timestamp = hitRecord.time
  # look up paneId from timestamp
  paneId = lookupPane(timestamp)
  if (paneId.endFlag == True) {
    # Notify that all data for this pane has been sent
    notify(paneId)
  }
  emitIntermediate(paneId, 1, timestamp)
}

Ex: Incremental Processing

Map Reduce and Stream Processing
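The map() above calls lookupPane(timestamp), whose body is not shown in the slides. With S = 1 hour and timestamps in seconds, one plausible (purely hypothetical) implementation is just an hour-bucket lookup; the endFlag bookkeeping that decides when a pane is complete is omitted here.

SLIDE_S = 3600                 # S = 1 hour, in seconds

def lookup_pane(timestamp):
    # Hypothetical: a pane id is simply the hour bucket of the hit's timestamp.
    return timestamp // SLIDE_S

print(lookup_pane(7200))   # 2
print(lookup_pane(7199))   # 1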

Page 30: Map Reduce ~Continuous Map Reduce Design~

combine(paneId, countList) {
  hitCount = 0
  for count in countList {
    hitCount += count
  }
  # Send the message to the downstream node
  emitIntermediate(paneId, hitCount)
}

Ex: Incremental Processing

Map Reduce and Stream Processing

Page 31: Map Reduce ~Continuous Map Reduce Design~

# if node == root of aggregation tree
reduce(paneId, countList) {
  hitCount = 0
  for count in countList {
    hitCount += count
  }
  sv = SlideValue.new(paneId)
  sv.hitCount = hitCount
  return sv
}

Ex: Incremental Processing

Map Reduce and Stream Processing

Page 32: Map Reduce ~Continuous Map Reduce Design~

# Called when the window slides forward
init(slide) {
  rangeValue = RangeValue.new
  rangeValue.hitCount = 0
  return rangeValue
}

# Called when the reduce for a new slide has completed
merge(rangeValue, slideValue) {
  rangeValue.hitCount += slideValue.hitCount
}

# Called when a slide falls out of the window
unmerge(rangeValue, slideValue) {
  rangeValue.hitCount -= slideValue.hitCount
}

Map Reduce and Stream Processing
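Putting init/merge/unmerge together, a minimal Python re-statement of the same sliding-window bookkeeping could look like this (illustrative only; it assumes one reduce result per hourly pane and a 24-pane window):

# Minimal re-statement of the merge/unmerge bookkeeping above; names are illustrative.
from collections import deque

RANGE_PANES = 24                     # R / S = 24 one-hour panes per window

class SlidingHitCount:
    def __init__(self):
        self.range_hit_count = 0     # plays the role of rangeValue.hitCount
        self.panes = deque()         # slide values currently inside the window

    def on_reduce_done(self, slide_hit_count):
        # merge(): the reduce for a new pane finished, fold it into the window.
        self.range_hit_count += slide_hit_count
        self.panes.append(slide_hit_count)
        if len(self.panes) > RANGE_PANES:
            self.unmerge(self.panes.popleft())
        return self.range_hit_count  # current sliding-window hit count

    def unmerge(self, slide_hit_count):
        # unmerge(): a pane slid out of the window, subtract its contribution.
        self.range_hit_count -= slide_hit_count

window = SlidingHitCount()
for hourly_hits in [10, 7, 12]:
    print(window.on_reduce_done(hourly_hits))   # 10, 17, 29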

Page 34: Map Reduce ~Continuous Map Reduce Design~

3. Advanced topics (preview)

Page 35: Map Reduce ~Continuous Map Reduce Design~

Advanced topics (preview)

[Next time]: a selection from the following…

・MapReduce for machine learning, graphs, and matrix computation

・Implementation examples

・Cases where MapReduce is inefficient, with an introduction to and comparison against other computation models such as BSP

・An introduction to Dryad

Page 37: Map Reduce ~Continuous Map Reduce Design~

Map Reduce for Classifier

A Comparison of Approaches for Large-Scale Data Mining

Figure 2: MapReduce Classifier Training and Evaluation Procedure

Table 1: Numbers of HTML Pages from Eight Categories.

Category:  Art      Business  Computer  Game    Health  Home    Science  Society
Number:    176,340  188,100   88,830    39,560  43,680  22,281  85,197   81,620

Table 2: Data mining accuracy of Hadoop approach.

Category   Round1  Round2  Round3  Round4  Round5  Round6  Round7  Round8  Round9  Round10
Art        80.09%  80.12%  81.18%  80.31%  80.20%  80.17%  80.86%  79.70%  79.79%  80.06%
Business   55.82%  58.43%  53.77%  57.30%  57.13%  59.62%  54.98%  58.36%  59.15%  57.98%
Computer   82.37%  81.93%  82.88%  82.68%  82.48%  82.18%  82.81%  82.22%  81.69%  82.14%
Game       78.83%  78.84%  76.86%  77.70%  78.45%  78.94%  78.17%  79.78%  78.49%  79.02%
Health     78.77%  80.49%  79.68%  80.42%  79.95%  80.33%  79.76%  79.71%  80.14%  81.27%
Home       68.01%  66.78%  67.97%  66.41%  67.16%  67.12%  67.49%  67.13%  66.16%  67.44%
Science    48.47%  50.64%  49.98%  50.19%  49.26%  47.89%  48.34%  49.32%  49.62%  48.53%
Society    63.07%  61.00%  61.10%  61.62%  61.77%  61.50%  61.92%  62.16%  62.16%  61.39%

Page 40: Map Reduce ~Continuous Map Reduce Design~

Thank you.