Map Reduce ~Continuous Map Reduce Design~
13th Data Mining + WEB @Tokyo study group (#TokyoWebmining)
doryokujin
[Name] doryokujin (Takahiro Inoue)
[Age] 26
[Major] Mathematics (statistics, probabilistic algorithms)
[Company] Geisha Tokyo Entertainment (GTE)
[Job] Data mining engineer
[Hobby] Marathon (42.195 km: 2h 33m)
[Communities]
・MongoDB JP: more MongoDB for Japan!
・TokyoWebMining: a study group on statistical analysis and data mining methodology, and on making use of data on the web
・Oshare Statistics: a study group learning statistics by reading through the classic "Statistics", with @isseing333 @dichika
Self-Introduction
Announcement (just one)
Celebrating the Japanese edition: Data-Intensive Text Processing with MapReduce
・The long-awaited Japanese translation goes on sale from O'Reilly around August-September!
・Covers "algorithm design", "inverted indexes", "graph algorithms", and "the EM algorithm"
・Jimmy Lin (author), Chris Dyer (author)
・Ryuji Tamagawa (translator); his translation of Hadoop, 2nd Edition is also due around July!
1. What is Map Reduce?
2. Continuous Map Reduce / 2-1. Compare to Traditional MR / 2-2. Design Concept / 2-3. Example
3. Advanced Topics
Agenda
1. Map Reduce (Map)
Figure 2.4: Complete view of MapReduce, illustrating combiners and partitioners in addition to mappers and reducers. Combiners can be viewed as "mini-reducers" in the map phase. Partitioners determine which reducer is responsible for a particular key.
a combiner can significantly reduce the amount of data that needs to be copied over the network, resulting in much faster algorithms.
The complete MapReduce model is shown in Figure 2.4. Output of the mappers are processed by the combiners, which perform local aggregation to cut down on the number of intermediate key-value pairs. The partitioner determines which reducer will be responsible for processing a particular key, and the execution framework uses this information to copy the data to the right location during the shuffle and sort phase.[13] Therefore, a complete MapReduce job consists of code for the mapper, reducer, combiner, and partitioner, along with job configuration parameters. The execution framework handles everything else.
[13] In Hadoop, partitioners are actually executed before combiners, so while Figure 2.4 is conceptually accurate, it doesn't precisely describe the Hadoop implementation.
1. Split: divide the input data into fixed-size chunks and hand them to the mappers
2. Map: create one or more (key, value) pairs per input line
3. Combiner: locally aggregate the mapped pairs, (key, list(values)) → (key, value2)
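A minimal Python sketch of steps 1-3, using a hypothetical word-count job (`map_fn` and `combine` are illustrative names, not part of any framework):

```python
from collections import defaultdict

def map_fn(line):
    # Step 2 (Map): emit one or more (key, value) pairs per input line.
    for word in line.split():
        yield (word, 1)

def combine(pairs):
    # Step 3 (Combiner): locally aggregate (key, list(values)) -> (key, value2)
    # before anything crosses the network.
    grouped = defaultdict(list)
    for k, v in pairs:
        grouped[k].append(v)
    return {k: sum(vs) for k, vs in grouped.items()}

# Step 1 (Split): each mapper receives its own chunk of input lines.
split = ["a b a", "b c"]
pairs = [kv for line in split for kv in map_fn(line)]
print(combine(pairs))  # {'a': 2, 'b': 2, 'c': 1}
```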
1. Map Reduce (Reduce)
4. Shuffle: the Partitioner decides which Reducer each key goes to, and the shuffle takes place
5. Sort: each Reducer sorts its input by key
6. Reduce: reduce the (key, list(values)) handed over by the mappers → (key, value3)
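Steps 4-6 can be sketched the same way (a toy in-memory shuffle; the hash partitioner is only a stand-in for a real framework's, and which reducer owns which key varies between Python runs):

```python
from collections import defaultdict

def partition(key, num_reducers):
    # Step 4 (Shuffle): the partitioner picks a reducer for each key.
    return hash(key) % num_reducers

def shuffle_sort_reduce(mapper_outputs, num_reducers, reduce_fn):
    # Route every (key, value) pair from every mapper to its reducer bucket.
    buckets = [defaultdict(list) for _ in range(num_reducers)]
    for pairs in mapper_outputs:
        for k, v in pairs:
            buckets[partition(k, num_reducers)][k].append(v)
    # Step 5 (Sort): each reducer walks its keys in sorted order.
    # Step 6 (Reduce): (key, list(values)) -> (key, value3).
    return [{k: reduce_fn(vs) for k, vs in sorted(b.items())} for b in buckets]

out = shuffle_sort_reduce([[("a", 2), ("b", 2)], [("b", 1), ("c", 1)]], 2, sum)
```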
[Figure 2.2: Input Splits → Map <K, V> → Shuffle <K, list(V)> → Reduce <list(V)> → Output Files]
Figure 2.2: Hadoop implements a MapReduce job as a three-phase execution flow made up of the map, shuffle and reduce phases.
simply restarts that task. The exact same method is used for reduce tasks as well. This can be done because a reduce task cannot start until all map tasks have finished. In other words, the reduce tasks must wait until all intermediate pairs are grouped, sorted and spooled to disk before applying reduce functions.
Hadoop uses a precise, or fully-consistent, fault tolerance model. The system will either return results corresponding to the entire input data set or will not return results at all. A consequence is that one straggling map task can block the progress of an entire MapReduce job [41]. As scale increases, a fully-consistent model might not be suitable for batch-oriented jobs that still need to complete in a reasonable amount of time. When dealing with hundreds of thousands of nodes, there is always a set of nodes that is not available due to failure. This may hamstring MapReduce jobs if the framework has to wait for all map and reduce tasks to complete successfully before returning results. Thus, knowing when (and where) to restart tasks is a complicated question that can have a large impact on job completion time [41].
※ Wait Process: Reduce does not start until every Map task has completed
※ Disk Buffer: the intermediate data produced by the Map phase is first written to disk
1. Wait Process / Disk Buffer
Continuous MapReduce: An Architecture for Large-scale In-situ Data Processing [pdf]
1. Wait Process / Disk Buffer [∵ Fault Tolerance]
・There is no guarantee that every node is healthy
・When a node fails during Map processing, another node takes over the same work in its place
・To avoid producing incorrect output, we want to guarantee that every Map task has completed before Reduce runs
・Also, temporarily storing all of the (large) Map output requires flushing it to disk
2. Continuous
Map Reduce (CMR)
2-1. Compare to
Traditional Map Reduce
2-1. Compare To Traditional MR
[Traditional Map Reduce]
・"Store-First-Query-Later Model": the target data is always stored beforehand in a distributed FS such as HDFS; analysis starts only after all the data has been stored
・"Batch-Oriented": the accumulated data is processed periodically (weekly, daily, hourly)
[Continuous Map Reduce]
・"Query-Then-Store in-situ Model": the analysis (MR) runs on the log servers themselves without waiting for the data to be stored; only the output goes to HDFS
・"Streaming Processing (Continuous Queries)": continuous analysis over unbounded streaming data; the Map and Reduce phases never "complete"
2-1. Compare To Traditional MR
[Figure 1.1 components: Log Servers → Hadoop MapReduce (Data Processing Framework) / HDFS (Distributed File System) ↔ Query / Result ↔ Framework Users]
Figure 1.1: Log processing with the store-first-query-later model. Apache Hadoop [3] is used as an example.
frameworks in a traditional store-first-query-later model [17]. Companies migrate log data from the source nodes to an append-only distributed file system such as GFS [18] or HDFS [3]. The distributed file system replicates the log data for availability and fault-tolerance. Once the data is placed in the file system, users can execute queries using bulk-processing frameworks and retrieve results from the distributed file system. Figure 1.1 illustrates this model.
The current approach exhibits a number of key limitations in this environment. Migrating log data into the distributed file system will take a prohibitive amount of time for even batch-oriented analysis with loose time constraints. A simple back-of-the-envelope analysis reveals this issue. Consider 10,000 servers (Facebook currently harvests 30,000) producing log data at a rate of 10 MB/sec. This cluster generates 8 petabytes of log data per day. In our measurements, server-class machines can sink data at a rate of 30 MB/sec. It would take 3,314 dedicated HDFS [3] nodes to sink that amount of log data in one day. These machines are completely I/O bound, and are unable to perform a significant amount of data processing at the same time. Thus, all of their CPUs are left virtually idle. Servers are one of the largest fixed, depreciating costs in a data center, putting a substantial price tag on data migration alone.
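As a sanity check, the back-of-the-envelope numbers above work out (decimal units assumed; the paper's figure of 3,314 nodes presumably reflects slightly different unit or rounding conventions):

```python
servers = 10_000
rate_mb_s = 10          # per-server log rate (MB/sec)
sink_mb_s = 30          # measured per-node HDFS sink rate (MB/sec)

# Daily volume: MB/sec across the fleet, times seconds per day, in petabytes.
per_day_pb = servers * rate_mb_s * 86_400 / 1e9
# Dedicated sink nodes needed to keep up with the aggregate log rate.
nodes = servers * rate_mb_s / sink_mb_s

print(round(per_day_pb, 2), round(nodes))  # 8.64 3333
```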
This fundamental limitation has broad ramifications on the fidelity and availability ...
[Figure 1.2 components: Cloud Servers with Logs → Data Processing Framework (Continuous MapReduce) ↔ Query / Results ↔ Framework Users; Distributed File System (HDFS) sits off to the side]
Figure 1.2: Log processing with the query-then-store in-situ model.
MapReduce (CMR), a data processing framework to address large-scale batch-oriented workloads where timeliness is still important.
CMR's architecture has several properties that specifically address the limitations of current approaches. It is scalable across large networks, enabling companies to handle the next generation of data management challenges. It is responsive, allowing more agile analysis of valuable log data. It is highly available, providing users with results in the presence of failure. Finally, it is easily adoptable, creating a solution that can be integrated into existing infrastructures.
These architectural properties are supported with several unique design choices. CMR's architecture uses a query-then-store model (Figure 1.2), taking distributed storage systems off the critical path for delivering results to a user. It processes data in-situ, or on location, eliminating wasteful migration of unused data. It uses in-network aggregation to save valuable bandwidth resources by reducing the amount of data sent across the network. It uses a push-based model and continuous queries with incremental processing windows to handle data as it arrives, accommodating the latency requirements of update-driven recurring queries. The architecture also explores a relaxed consistency model that prevents an entire query from being blocked by a small percentage of unavailable data. Lastly, CMR extends the original MapReduce programming model ...
"Store-First-Query-Later Model" vs. "Query-Then-Store in-situ Model"
Continuous MapReduce: An Architecture for Large-scale In-situ Data Processing [pdf]
2-2. Design Concept
2-2. Design Concept
[Continuous Map Reduce Design]
・"In-situ (on location) Processing": MR runs on the servers that emit the logs (or on relay servers), streaming results downstream toward the ROOT (where results are stored)
・"Pipelined Processing": when each Map task finishes, it hands its intermediate data to the Reducer without waiting for the other Map tasks to complete
[Continuous Map Reduce Design]
・"Time Window Processing": unbounded streaming data is divided into time-based ranges (Windows)
・"Incremental Processing": MR output is "Merged" into the results so far, and stale output is "Unmerged" out of the result
2-2. Design Concept
In-situ (on location) Processing
Figure 1: The in-situ MapReduce architecture avoids the cost and latency of the store-first-query-later design by moving processing onto the data sources.
speed of social network updates or accuracy of ad targeting. The in-situ MapReduce (iMR) architecture builds on previous work in stream processing [5, 7, 9] to support low-latency continuous log processing. Like stream processors, iMR MapReduce jobs can process over sliding windows, updating and delivering results as new data arrives.
Available: iMR's lossy data model allows the system to return results that may be incomplete. This allows the system to improve result availability in the event of failures or processing and network delays. Additionally, iMR may pro-actively reduce processing fidelity through load shedding, reducing the impact on existing server tasks. iMR attaches a metric of result quality to each output, allowing users to judge the relative accuracy of processing. Users may also explicitly trade fidelity for improved result latency by specifying latency and fidelity bounds on their queries.
Efficient: A log processing architecture should make parsimonious use of computational and network resources. iMR explores the use of sub-windows or panes for efficient continuous processing. Instead of recomputing each window from scratch, iMR allows incremental processing, merging recent data with previously computed panes to create the next result. And adaptive load-shedding policies ensure that nodes use compute cycles for results that meet latency requirements.
Compatible: iMR supports the traditional MapReduce API, making it trivial to "port" existing MapReduce jobs to run in-situ. It provides a single extension, uncombine, to allow users to further optimize incremental processing in some contexts (Section 2.3.2).
2.1 In-situ MapReduce jobs
A MapReduce job in iMR is nearly identical to that in traditional MapReduce architectures [12]. Programmers specify two data processing functions: map and reduce. The map function outputs key-value pairs, {k, v}, for each input record, and the reduce processes each group of values, v[], that share the same key k. iMR is designed for queries that are either highly selective or employ reduce functions that are distributive or algebraic aggregates [14]. Thus we expect that users will also specify the MapReduce combiner, allowing the underlying system to merge values of a single key to reduce data movement and distribute processing overhead. The use of a combiner allows iMR to process windows incrementally and further reduce data volumes through in-network aggregation. The only non-standard (but optional) function iMR MapReduce jobs may implement is uncombine, which we describe in Section 2.3.2.
However, the primary way in which iMR jobs differ is that they emit a stream of results computed over continuous input, e.g., server log files. Like data stream processors [7], iMR bounds computation over these (perhaps infinite) data streams by processing over a window of data. The window's range R defines the amount of data processed in each result, while the window's slide S defines its update frequency. For example, a user could count error events over the last 24 hours of log records (R = 24 hours), and update the count every hour (S = 1 hour). This sliding window, one whose slide S is less than its range R, may be in terms of wall-clock time or logical index, such as record count, bytes, or any user-defined sequence number. Users specify R and S with simple annotations to the reduce function.
While sufficient for real-time log processing, a MapReduce job in iMR may reference historical log data as well. Doing so requires a job-level annotation that specifies the point in the local log to begin B and the total data to consume, the extent E. If unspecified, the job continues to process, possibly catching up to real-time processing.
2.2 Job execution
In general, MapReduce architectures have three primary tasks: the parallel execution of the map phase, grouping input records by key, and the parallel execution of the reduce phase. In cluster-based MapReduce systems, like Hadoop, each map task produces key-value pairs, {k, v}, from raw input records at individual nodes in the cluster. The map tasks then group values by their key k, and split the set of keys into r partitions. After the map tasks finish, the system starts a reduce task for each partition r. These tasks first download their partition's key-value pairs from each mapper (the shuffle), finish grouping values, and then call reduce once for every {k, v[]} pair.
iMR distributes the work of a MapReduce job across multiple trees, one for each reducer partition. Figure 2 illustrates one such tree; iMR co-locates map processing on the server nodes themselves, sourcing input records
Figure 2: This illustrates the physical instantiation of one iMR MapReduce partition as a multi-level aggregation tree.
(tuples) from the local node's log file. The dedicated processing cluster hosts the root, which executes the user's reduce function. This tree uses the combine API to aggregate intermediate data at every mapper in a manner similar to traditional MapReduce architectures. However, like Dryad [32], iMR can use multi-level aggregation trees to further reduce the data crossing the network.
In general, this requires aggregate or decomposable functions that can be computed incrementally [15, 23, 32]. Here we are interested in two broad categories of aggregate functions [21]. Holistic aggregates require partial values whose size is in proportion to their input data, e.g., union, median or groupby. In contrast, bounded aggregates have constant-sized partial values, e.g., sum or max, and present the greatest opportunities for data reduction.
2.3 Window processing with panes
iMR supports sliding processing windows not just because they bound computation on infinite streams, but because they also enable incremental computations. However, they do not immediately lend themselves to efficient in-network processing. Consider a simple aggregation strategy where each log server accumulates all key-value pairs for each logical window and nodes in the aggregation tree combine these entire windows.
We can see that this strategy isn't efficient for our example sliding window query. In this case, every event record would be included in 24 successive results. Thus every input key-value pair in a sliding window would be grouped, combined, and transmitted for each update (slide) of the window, or R/S times. To reduce these overheads, iMR adapts the use of sub-windows or panes to efficiently compute aggregates over sliding windows. While the concept of panes was introduced in prior work for single-node stream processors [21], here we adapt them to distributed in-situ MapReduce processing.
Figure 3: iMR nodes process local log files to produce sub-windows or panes. The system assumes log records have a logical timestamp and arrive in order.
Figure 4: iMR aggregates individual panes Pi in the network. To produce a result, the root may either combine the constituent panes or update the prior window by removing an expired pane and adding the most recent.
2.3.1 Pane management
Panes break a window into multiple equal-sized sub-windows, allowing the system to group and combine key-value records once per sub-window. Nodes in the system generate panes and send them to their parents in the aggregation tree. Thus in iMR, interior nodes in a tree aggregate panes and the root node combines them into each window result. This supports the fundamental grouping operation underlying reduce, a holistic aggregate. By sending panes, rather than sending the entire window up the tree, the system sends a single copy of a key's value, reducing network traffic. Additionally, issuing values at the granularity of panes gives the system fine-grain control on fidelity and load shedding (Section 3.4). It is also the granularity at which failed nodes restart processing, minimizing the gap of dropped data (Section 4.4.2).
Figure 3 illustrates how a single node creates panes from a stream of local log records. Typically, we set the pane size equal to the slide S, though it may be any common divisor of R and S, and each node maintains a sequence of pane partial values Pi. This example uses a processing window with a slide of 60 minutes. When log records first enter the system, iMR tags each one with a non-decreasing user-defined timestamp. The system then feeds these records to the user's map function. After mapping, the system assigns key-value pairs
※ Incremental Processing: MR output is aggregated into the existing result
※ Aggregation Tree: the Root runs Reduce; every other node runs Combine
※ In-situ Processing: MR runs inside the log servers; only the output is stored in HDFS
In-situ MapReduce for Log Processing [pdf]
Pipelined Processing
・In Continuous Map Reduce, Map and Reduce processing continue semi-permanently
・Rather than servers switching roles between a Map phase and a Reduce phase, the servers that run Map and the servers that run Reduce are fixed in advance (a server may do both)
・Each Map task forwards its intermediate data to the Reducer (Combiner) as soon as it finishes. The Reducer runs Reduce once all the intermediate data it needs has arrived (holding it in memory until then)
・Thanks to this Pipelined Processing, work proceeds efficiently without waiting for every Map task to finish and without flushing intermediate data to disk
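The push-then-buffer behavior described above can be pictured as a toy in-memory reducer (class and method names are illustrative, not from CMR):

```python
from collections import defaultdict

class PipelinedReducer:
    """Buffers pushed intermediate data in memory and runs reduce as soon
    as every expected mapper has reported -- no disk flush, no global wait."""
    def __init__(self, expected_mappers, reduce_fn):
        self.expected = set(expected_mappers)
        self.reduce_fn = reduce_fn
        self.buffer = defaultdict(list)   # key -> buffered partial values
        self.seen = set()

    def push(self, mapper_id, pairs):
        # Each mapper pushes the moment it finishes its own work.
        self.seen.add(mapper_id)
        for k, v in pairs:
            self.buffer[k].append(v)
        if self.seen == self.expected:    # all inputs in: reduce now
            return {k: self.reduce_fn(vs) for k, vs in self.buffer.items()}
        return None                       # keep buffering in memory

r = PipelinedReducer({"m1", "m2"}, sum)
print(r.push("m1", [("err", 2)]))              # None (still waiting for m2)
print(r.push("m2", [("err", 3), ("ok", 1)]))   # {'err': 5, 'ok': 1}
```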
・Unbounded streaming data is split into time-based ranges (Time Windows) and MR is run over them.
・"Range": the span covered by one final result, e.g. 24 hours or 60 min. Note that MR is not run once per Range
・"Slide (Slice)": the span of one processing step, e.g. (R, S) = (24 hour, 1 hour) or (60 min, 5 min). In the former case, to aggregate 24 hours of data the system aggregates once per hour and sums up the results of the 24 generated Sub Windows (= Panes)
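Under the convention that the pane size equals the slide S, the two examples above give:

```python
def num_panes(range_, slide):
    # With pane size == slide, a window of range R is the sum of R/S panes.
    assert range_ % slide == 0, "slide must evenly divide the range"
    return range_ // slide

print(num_panes(24 * 60, 60))  # (R, S) = (24 hour, 1 hour) -> 24 panes
print(num_panes(60, 5))        # (R, S) = (60 min, 5 min)   -> 12 panes
```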
Time Window Processing
[Types of Time Windows]
※ This talk deals with the Sliding Window
Time Window Processing
has a fixed starting point / resets when a day or month boundary is crossed / stays anchored to the present (e.g. the most recent 24 hours)
Map Reduce and Stream Processing
Incremental Processing
・The Reducer executes once it has received the same Slide from every Mapper, and "Merges" the result into the existing result data
・When the Window advances, the old Pane that is no longer needed is "UnMerged"
[Diagram: mappers push a New Pane up to the reducer; as time advances, the New Pane is Merged into the New Window and the Old Pane is UnMerged from the Old Window]
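A toy sketch of this Merge/UnMerge cycle over a stream of panes, using `Counter` as a stand-in aggregate (counting a hypothetical `error` key over a window of R/S = 2 panes):

```python
from collections import Counter

def merge(window, pane):
    # Merge: fold the newest pane's counts into the running window result.
    window.update(pane)

def unmerge(window, pane):
    # UnMerge: subtract an expired pane (the role of iMR's "uncombine").
    window.subtract(pane)

R_over_S = 2                  # the window spans 2 panes
window, live = Counter(), []
for pane in [Counter(error=3), Counter(error=5), Counter(error=2)]:
    merge(window, pane)
    live.append(pane)
    if len(live) > R_over_S:  # window slid: the oldest pane expired
        unmerge(window, live.pop(0))
    print(window["error"])    # 3, then 8, then 7
```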
・Map generates (key, value, timestamp) and "pushes" it to the Combiner
・[1] The Combine step is not executed immediately; pairs are stored in a timeline buffer, routed (GroupBy) to the Pane that matches their timestamp
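Step [1] amounts to bucketing by timestamp (a sketch; the pane size of 60 and the timestamps are made up):

```python
def pane_id(timestamp, pane_size):
    # [1] GroupBy into panes: each (key, value, timestamp) triple is routed
    # to the pane that covers its timestamp before any combining happens.
    return timestamp // pane_size

buffer = {}
for key, value, ts in [("err", 1, 12), ("err", 1, 95), ("ok", 1, 70)]:
    buffer.setdefault(pane_id(ts, 60), []).append((key, value))
print(buffer)  # {0: [('err', 1)], 1: [('err', 1), ('ok', 1)]}
```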
More: Time Window Processing
In-situ MapReduce for Log Processing [pdf]
・[2] Each Pane undergoes Combine (i.e. the list of values belonging to the Pane is aggregated) once the node judges that no more data for it will arrive; a (key, value, timestamp) triple, with the paneId as key and the aggregated value as value, is then "pushed" downstream
・[3] At the next hop, the node receives the sets (paneId, list(values, timestamps)) sharing the same paneId from all servers and runs Combine on them
・[4] At the Root of the Aggregation Tree, Reduce runs instead of Combine, and the aggregated result is Merged with the previous data (Incremental Processing)
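Steps [2]-[4] can be sketched with the same `Counter` stand-in (server names and counts are made up for illustration):

```python
from collections import Counter

def combine_panes(partials):
    # [3] Interior node: merge the same-pane partials pushed by all children.
    out = Counter()
    for pane in partials:
        out.update(pane)
    return out

# [2] Two log servers each push their finished partial for pane P1.
server_a = Counter(err=2)
server_b = Counter(err=4)
p1 = combine_panes([server_a, server_b])

# [4] Root: reduce and Merge the pane into the running window result.
window = Counter(err=10)
window.update(p1)
print(window["err"])  # 16
```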
More: Time Window Processing
[2] After aggregation runs for a Pane, it is "Pushed" downstream
[1] Each mapper's intermediate output is routed into its Pane
[3] Aggregation runs once the same Pane has been collected from across the servers
[4] At the Root, Reduce runs and the output is Merged with the previous results
In-situ MapReduce for Log Processing [pdf]
More: Incremental Processing
[4-1] Per Pane, aggregate the intermediate outputs of Child A and Child B
e.g. Child A's P1 is the aggregate over Range [0,10], and P2 over Range [10,20]
[4-2] Remove Pane P0, which has fallen out of the old Window W0, and add the new P2
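A worked version of [4-1]/[4-2], with made-up counts for a single key `x`:

```python
from collections import Counter

# Hypothetical per-pane partials from the two children (pane -> counts).
child_a = {"P0": Counter(x=1), "P1": Counter(x=2), "P2": Counter(x=4)}
child_b = {"P0": Counter(x=3), "P1": Counter(x=5), "P2": Counter(x=6)}

def pane(pid):
    # [4-1] Combine the same pane from Child A and Child B.
    return child_a[pid] + child_b[pid]

w0 = pane("P0") + pane("P1")        # old window W0 = P0 + P1
w1 = w0 - pane("P0") + pane("P2")   # [4-2] unmerge P0, merge the new P2
print(w0["x"], w1["x"])  # 11 17
```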
!"#$"
!"#$%"
&'(
%#&'()*
)*+,-."
%#&'()*
&'(
%#&'()*
)*+,-."
&'(
%#&'()*
)*+,-."
+&,-+.#",&#/0
&'(
%#&'()*
)*+,-."
&'(
%#&'()*
)*+,-."
&'(
%#&'()*
)*+,-."
!"#$$%&!'()*+,-./01
)01201*%$$,
+&,-+.#",&#/0 +&,-+.#",&#/0 +&,-+.#",&#/0
+&,-+.#",&#/0+&,-+.#",&#/0
Figure 2: This illustrates the physical instantiation of oneiMR MapReduce partition as a multi-level aggregationtree.
(tuples) from the local node’s log file. The dedicated pro-cessing cluster hosts the root, which executes the user’sreduce function. This tree uses the combine API to ag-gregate intermediate data at every mapper in a mannersimilar to traditional MapReduce architectures. How-ever, like Dryad [32], iMR can use multi-level aggrega-tion trees to further reduce the data crossing the network.In general, this requires aggregate or decomposable
functions that can be computed incrementally [15, 23,32]. Here we are interested in two broad categories ofaggregate functions [21]. Holistic aggregates requirepartial values whose size is in proportion to their inputdata, e.g., union, median or groupby. In contrast,bounded aggregates have constant-sized partial values,e.g., sum or max, and present the greatest opportunitiesfor data reduction.
2.3 Window processing with panesiMR supports sliding processing windows not just be-cause they bound computation on infinite streams, butbecause they also enable incremental computations.However, they do not immediately lend themselves toefficient in-network processing. Consider a simple ag-gregation strategy where each log server accumulates allkey-value pairs for each logical window and nodes in theaggregation tree combine these entire windows.We can see that this strategy isn’t efficient for our ex-
ample sliding window query. In this case, every eventrecord would be included in 24 successive results. Thusevery input key-value pair in a sliding window wouldbe grouped, combined, and transmitted for each update(slide) of the window or R/S times. To reduce theseoverheads, iMR adapts the use of sub-windows or panesto efficiently compute aggregates over sliding windows.While the concept of panes was introduced in prior workfor single-node stream processors [21]; here we adaptthem to distributed in-situ MapReduce processing.
Figure 3: iMR nodes process local log files to produce sub-windows or panes. The system assumes log records have a logical timestamp and arrive in order.
Figure 4: iMR aggregates individual panes Pi in the network. To produce a result, the root may either combine the constituent panes or update the prior window by removing an expired pane and adding the most recent.
2.3.1 Pane management
Panes break a window into multiple equal-sized sub-windows, allowing the system to group and combine key-value records once per sub-window. Nodes in the system generate panes and send them to their parents in the aggregation tree. Thus in iMR, interior nodes in a tree aggregate panes and the root node combines them into each window result. This supports the fundamental grouping operation underlying reduce, a holistic aggregate. By sending panes, rather than sending the entire window up the tree, the system sends a single copy of a key's value, reducing network traffic. Additionally, issuing values at the granularity of panes gives the system fine-grain control on fidelity and load shedding (Section 3.4). It is also the granularity at which failed nodes restart processing, minimizing the gap of dropped data (Section 4.4.2).

Figure 3 illustrates how a single node creates panes from a stream of local log records. Typically, we set the pane size equal to the slide S, though it may be any common divisor of R and S, and each node maintains a sequence of pane partial values Pi. This example uses a processing window with a slide of 60 minutes. When log records first enter the system, iMR tags each one with a non-decreasing user-defined timestamp. The system then feeds these records to the user's map function. After mapping, the system assigns key-value pairs
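The per-node pane creation of Figure 3 can be sketched as follows, assuming pane size equals the slide (60 minutes) and an illustrative record shape:

```python
from collections import defaultdict

# Minimal sketch of per-node pane creation. Pane size equals the
# slide S (60 minutes); the (timestamp, count) record shape is an
# illustrative assumption, not iMR's actual data structure.
PANE_SIZE = 60  # minutes

def pane_id(timestamp):
    """Assign a record to the pane covering its timestamp."""
    return timestamp // PANE_SIZE

def build_panes(records):
    """records: iterable of (timestamp, count) pairs in timestamp order.
    Returns pane partial values P_i: pane id -> summed count."""
    panes = defaultdict(int)
    for timestamp, count in records:
        panes[pane_id(timestamp)] += count  # combine within the pane
    return dict(panes)

log = [(5, 1), (59, 1), (61, 1), (130, 1)]
print(build_panes(log))   # {0: 2, 1: 1, 2: 1}
```

Each pane's partial value is then what gets sent to the node's parent in the aggregation tree, rather than the raw records.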
In-situ MapReduce for Log Processing [pdf]
Incremental Processing (revisited)
・The reducer executes once it has received the same slide from every mapper, and "Merge"s the result into the existing result data
・In other words, when the window moves, the old pane that is no longer needed is "UnMerge"d
[Diagram: three mappers feed a reducer over time; when the window slides, the New Pane is Merged into the New Window and the Old Pane is UnMerged from the Old Window]
2-3. Example
[Example]
・24 Hour Sliding Window Model
・Range = 24, Slide = 1
・Compute the "hit count" over the most recent 24 hours; MapReduce runs every hour and the results are merged incrementally
# Called for each hit record
map(k1, hitRecord) {
  timestamp = hitRecord.time
  # Look up the paneId from the timestamp
  paneId = lookupPane(timestamp)
  if (paneId.endFlag == True) {
    # Notify that all of this pane's data has been sent
    notify(paneId)
  }
  emitIntermediate(paneId, 1, timestamp)
}
Ex: Incremental Processing
Map Reduce and Stream Processing
combine(paneId, countList) {
  hitCount = 0
  for count in countList {
    hitCount += count
  }
  # Send the message to the downstream node
  emitIntermediate(paneId, hitCount)
}
# If node == root of the aggregation tree
reduce(paneId, countList) {
  hitCount = 0
  for count in countList {
    hitCount += count
  }
  sv = SlideValue.new(paneId)
  sv.hitCount = hitCount
  return sv
}
# When the window slides
init(slide) {
  rangeValue = RangeValue.new
  rangeValue.hitCount = 0
  return rangeValue
}

# When a reduce has completed
merge(rangeValue, slideValue) {
  rangeValue.hitCount += slideValue.hitCount
}

# When a slide falls out of the window
unmerge(rangeValue, slideValue) {
  rangeValue.hitCount -= slideValue.hitCount
}
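Putting the map/combine/reduce and init/merge/unmerge pseudocode together, here is a minimal runnable Python sketch. The pane bookkeeping is simplified, and Range is shrunk from 24 to 2 hours so the unmerge path actually fires in a short example:

```python
from collections import defaultdict

RANGE, SLIDE = 2, 1   # Range shrunk from 24 so unmerge fires below

def map_hits(hit_records):
    """map: emit (paneId, 1) per hit; paneId is the hour of the hit."""
    for record in hit_records:
        yield record["time"] // 60, 1

def combine(pane_counts):
    """combine/reduce: sum the counts emitted for each pane."""
    panes = defaultdict(int)
    for pane_id, count in pane_counts:
        panes[pane_id] += count
    return panes

class RangeValue:
    """Sliding-window result, maintained via merge/unmerge."""
    def __init__(self):
        self.hit_count = 0            # init(): empty window
    def merge(self, slide_count):     # a reduce completed
        self.hit_count += slide_count
    def unmerge(self, slide_count):   # a slide fell out of the window
        self.hit_count -= slide_count

# Drive the window over per-hour pane counts (hours 0, 1, 1, 2, 2, 2).
pane_counts = combine(map_hits([{"time": t} for t in (10, 70, 75, 130, 135, 140)]))
window, history, results = RangeValue(), [], []
for hour in sorted(pane_counts):
    window.merge(pane_counts[hour])
    history.append((hour, pane_counts[hour]))
    while history and history[0][0] <= hour - RANGE:   # expired panes
        window.unmerge(history.pop(0)[1])
    results.append(window.hit_count)
print(results)   # [1, 3, 5]
```

At hour 2 the pane for hour 0 expires: its count is unmerged (6 - 1 = 5) instead of re-aggregating the whole window, which is exactly the incremental behavior the Merge/UnMerge design buys.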
・Map Reduce Online [pdf]
・C-MR: A Continuous MapReduce Processing Model for Low-Latency Stream Processing on Multi-Core Architectures
・Continuous MapReduce: An Architecture for Large-scale In-situ Data Processing [pdf]
・In-situ MapReduce for Log Processing [pdf]
・Wide-Scale Data Stream Management [pdf]
References
3. Advanced Topics
Advanced Topics
[Next time]: a selection from the following…
・Map Reduce for machine learning, graphs, and matrix computations
・Walk through implementation examples
・Show cases where Map Reduce is inefficient, and introduce and compare other computation models such as BSP
・Introduce Dryad
Map Reduce for k-means
K-Means Clustering in Map Reduce
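As a preview of the k-means slide, one MapReduce-style iteration can be sketched as follows (1-D points for brevity; map emits (nearest centroid, point), reduce averages each cluster — an illustrative rendering, not the slide's actual implementation):

```python
# One k-means iteration in MapReduce style over 1-D points.
def kmeans_map(point, centroids):
    """map: emit (index of nearest centroid, point)."""
    nearest = min(range(len(centroids)), key=lambda i: abs(point - centroids[i]))
    return nearest, point

def kmeans_reduce(cluster_points):
    """reduce: the new centroid is the mean of the cluster's points."""
    return sum(cluster_points) / len(cluster_points)

def kmeans_iteration(points, centroids):
    clusters = {}
    for p in points:                       # map phase
        idx, _ = kmeans_map(p, centroids)
        clusters.setdefault(idx, []).append(p)
    return [kmeans_reduce(clusters[i]) if i in clusters else c
            for i, c in enumerate(centroids)]   # reduce phase

points = [1.0, 2.0, 9.0, 10.0]
print(kmeans_iteration(points, [0.0, 5.0]))   # [1.5, 9.5]
```

The driver re-runs this map/reduce pair until the centroids stop moving, which is why k-means on MapReduce needs one job per iteration.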
Map Reduce for Classifier
A Comparison of Approaches for Large-Scale Data Mining
Figure 2: MapReduce Classifier Training and Evaluation Procedure
Table 1: Numbers of HTML Pages from Eight Categories.
Category:  Art      Business  Computer  Game    Health  Home    Science  Society
Number:    176,340  188,100   88,830    39,560  43,680  22,281  85,197   81,620
Table 2: Data mining accuracy of Hadoop approach.
Category  Round1  Round2  Round3  Round4  Round5  Round6  Round7  Round8  Round9  Round10
Art       80.09%  80.12%  81.18%  80.31%  80.20%  80.17%  80.86%  79.70%  79.79%  80.06%
Business  55.82%  58.43%  53.77%  57.30%  57.13%  59.62%  54.98%  58.36%  59.15%  57.98%
Computer  82.37%  81.93%  82.88%  82.68%  82.48%  82.18%  82.81%  82.22%  81.69%  82.14%
Game      78.83%  78.84%  76.86%  77.70%  78.45%  78.94%  78.17%  79.78%  78.49%  79.02%
Health    78.77%  80.49%  79.68%  80.42%  79.95%  80.33%  79.76%  79.71%  80.14%  81.27%
Home      68.01%  66.78%  67.97%  66.41%  67.16%  67.12%  67.49%  67.13%  66.16%  67.44%
Science   48.47%  50.64%  49.98%  50.19%  49.26%  47.89%  48.34%  49.32%  49.62%  48.53%
Society   63.07%  61.00%  61.10%  61.62%  61.77%  61.50%  61.92%  62.16%  62.16%  61.39%
BSP For Graph Processing
Google Pregel Graph Processing
Dryad
Thank you!