
    Write your first MapReduce program in 20 minutes

    Posted on January 2, 2009 by Michael Nielsen

    The slow revolution

    Some revolutions are marked by a single, spectacular event: the
    storming of the Bastille during the French Revolution, or the
    destruction of the towers of the World Trade Center on September 11,
    2001, which so changed the US's relationship with the rest of the
    world. But often the most important revolutions aren't announced
    with the blare of trumpets. They occur softly, too slow for a single
    news cycle, but fast enough that if you aren't alert, the revolution
    is over before you're aware it's happening.

    Such a revolution is happening right now in computing.

    Microprocessor clock speeds have stagnated since about 2000. Major
    chipmakers such as Intel and AMD continue to wring out improvements
    in speed by improving on-chip caching and other clever techniques,
    but they are gradually hitting the point of diminishing returns.
    Instead, as transistors continue to shrink in size, the chipmakers
    are packing multiple processing units onto a single chip. Most
    computers shipped today use multi-core microprocessors, i.e., chips
    with 2 (or 4, or 8, or more) separate processing units on the main
    microprocessor.

    The result is a revolution in software development. We're gradually
    moving from the old world in which multiple processor computing was
    a special case, used only for boutique applications, to a world in
    which it is widespread. As this movement happens, software
    development, so long tailored to single-processor models, is seeing
    a major shift in some of its basic paradigms, to make the use of
    multiple processors natural and simple for programmers.

    This movement to multiple processors began decades ago. Projects
    such as the Connection Machine demonstrated the potential of
    massively parallel computing in the 1980s. In the 1990s, scientists
    became large-scale users of parallel computing, using parallel
    computing to simulate things like nuclear explosions and the
    dynamics of the Universe. Those scientific applications were a bit
    like the early scientific computing of the late 1940s and 1950s:
    specialized, boutique applications, built with heroic effort using
    relatively primitive tools. As computing with multiple processors
    becomes widespread, though, we're seeing a flowering of
    general-purpose software development tools tailored to multiple
    processor environments.

    One of the organizations driving this shift is Google. Google is one
    of the largest users of multiple processor computing in the world,
    with its entire computing cluster containing hundreds of thousands
    of commodity machines, located in data centers around the world, and
    linked using commodity networking components. This approach to
    multiple processor computing is known as distributed computing; the
    characteristic feature of distributed computing is that the
    processors in the cluster don't necessarily share any memory or disk
    space, and so information sharing must be mediated by the relatively
    slow network.

    In this post, I'll describe a framework for distributed computing
    called MapReduce. MapReduce was introduced in a paper written in
    2004 by Jeffrey Dean and Sanjay Ghemawat from Google. What's
    beautiful about MapReduce is that it makes parallelization almost
    entirely invisible to the programmer who is using MapReduce to
    develop applications. If, for example, you allocate a large number
    of machines in the cluster to a given MapReduce job, the job runs in
    a highly parallelized way. If, on the other hand, you allocate only
    a small number of machines, it will run in a much more serial way,
    and it's even possible to run jobs on just a single machine.

    What exactly is MapReduce? From the programmer's point of view, it's
    just a library that's imported at the start of your program, like
    any other library. It provides a single library call that you can
    make, passing in a description of some input data and two ordinary
    serial functions (the "mapper" and "reducer") that you, the
    programmer, specify in your favorite programming language. MapReduce
    then takes over, ensuring that the input data is distributed through
    the cluster, and computing those two functions across the entire
    cluster of machines, in a way we'll make precise shortly. All the
    details of parallelization, distribution of data, and tolerance of
    machine failures are hidden away from the programmer, inside the
    library.

    What we're going to do in this post is learn how to use the
    MapReduce library. To do this, we don't need a big sophisticated
    version of the MapReduce library. Instead, we can get away with a
    toy implementation (just a few lines of Python!) that runs on a
    single machine. By using this single-machine toy library we can
    learn how to develop for MapReduce. The programs we develop will run
    essentially unchanged when, in later posts, we improve the MapReduce
    library so that it can run on a cluster of machines.

    Our first MapReduce program

    Okay, so how do we use MapReduce? I'll describe it with a simple
    example, which is a program to count the number of occurrences of
    different words in a set of files. The example is simple, but you'll
    find it rather strange if you're not already familiar with
    MapReduce: the program we'll describe is certainly not the way most
    programmers would solve the word-counting problem! What it is,
    however, is an excellent illustration of the basic ideas of
    MapReduce. Furthermore, what we'll eventually see is that by using
    this approach we can easily scale our wordcount program up to run on
    millions or even billions of documents, spread out over a large
    cluster of computers, and that's not something a conventional
    approach could do easily.

    The input to a MapReduce job is just a set of
    (input_key,input_value) pairs, which we'll implement as a Python
    dictionary. In the wordcount example, the input keys will be the
    filenames of the files we're interested in counting words in, and
    the corresponding input values will be the contents of those files:

    filenames = ["text\\a.txt", "text\\b.txt", "text\\c.txt"]
    i = {}
    for filename in filenames:
        f = open(filename)
        i[filename] = f.read()
        f.close()

    After this code is run the Python dictionary i will contain the
    input to our MapReduce job, namely, i has three keys containing the
    filenames, and three corresponding values containing the contents of
    those files. Note that I've used Windows file naming conventions
    above; if you're running a Mac or Linux you may need to tinker with
    the filenames. Also, to run the code you will of course need the
    text files text\a.txt, text\b.txt, and text\c.txt. You can create
    some simple example texts by cutting and pasting from the following:

    text\a.txt:

    The quick brown fox jumped over the lazy grey dogs.

    text\b.txt:

    That's one small step for a man, one giant leap for mankind.


    text\c.txt:

    Mary had a little lamb,

    Its fleece was white as snow;

    And everywhere that Mary went,

    The lamb was sure to go.

    The MapReduce job will process this input dictionary in two phases:
    the map phase, which produces output which (after a little
    intermediate processing) is then processed by the reduce phase. In
    the map phase what happens is that for each (input_key,input_value)
    pair in the input dictionary i, a function
    mapper(input_key,input_value) is computed, whose output is a list of
    intermediate keys and values. This function mapper is supplied by
    the programmer; we'll show how it works for wordcount below. The
    output of the map phase is just the list formed by concatenating the
    list of intermediate keys and values for all of the different input
    keys and values.
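
    In code, the map phase amounts to a loop like the following sketch
    (not a listing from the post itself, though the toy library shown
    later does essentially this internally; it assumes i and mapper are
    defined as elsewhere in this post):

    intermediate = []
    for input_key, input_value in i.items():
        # Each call to mapper returns a list of (intermediate_key,
        # intermediate_value) pairs; the lists for all inputs are
        # concatenated into one big list.
        intermediate.extend(mapper(input_key, input_value))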

    I said above that the function mapper is supplied by the programmer.
    In the wordcount example, what mapper does is take the input key and
    input value (a filename, and a string containing the contents of the
    file) and then move through the words in the file. For each word it
    encounters, it returns the intermediate key and value (word,1),
    indicating that it found one occurrence of word. So, for example,
    for the input key text\a.txt, a call to
    mapper("text\\a.txt",i["text\\a.txt"]) returns:

    [('the', 1), ('quick', 1), ('brown', 1), ('fox', 1), ('jumped', 1),
     ('over', 1), ('the', 1), ('lazy', 1), ('grey', 1), ('dogs', 1)]

    Notice that everything has been lowercased, so we don't count words
    with different cases as distinct. Furthermore, the same key gets
    repeated multiple times, because words like "the" appear more than
    once in the text. This, incidentally, is the reason we use a Python
    list for the output, and not a Python dictionary, for in a
    dictionary the same key can only be used once.
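
    You can see the difference at a Python prompt (a quick illustration
    of my own, not from the post):

    >>> dict([('the', 1), ('the', 1)])    # the repeated key collapses
    {'the': 1}
    >>> [('the', 1), ('the', 1)]          # the list keeps both occurrences
    [('the', 1), ('the', 1)]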

    Here's the Python code for the mapper function, together with a
    helper function used to remove punctuation:

    def mapper(input_key, input_value):
        return [(word, 1) for word in
                remove_punctuation(input_value.lower()).split()]

    def remove_punctuation(s):
        # Python 2 str.translate: identity table, with all punctuation
        # characters deleted.
        return s.translate(string.maketrans("", ""), string.punctuation)


    mapper works by lowercasing the input file, removing the
    punctuation, splitting the resulting string around whitespace, and
    finally emitting the pair (word,1) for each resulting word. Note,
    incidentally, that I'm ignoring apostrophes, to keep the code
    simple, but you can easily extend the code to deal with apostrophes
    and other special cases.
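
    As one possible extension (a sketch of my own, not part of the
    original post), you could keep internal apostrophes while still
    stripping other punctuation, so that "that's" stays a single word
    instead of becoming "thats":

    import re

    def mapper(input_key, input_value):
        # Keep runs of letters and digits, allowing internal apostrophes;
        # everything else acts as a word separator.
        words = re.findall(r"[a-z0-9]+(?:'[a-z0-9]+)*", input_value.lower())
        return [(word, 1) for word in words]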

    With this specification of mapper, the output of the map phase for
    wordcount is simply the result of combining the lists produced by
    applying mapper to text\a.txt, text\b.txt, and text\c.txt:

    [('the', 1), ('quick', 1), ('brown', 1), ('fox', 1),
     ('jumped', 1), ('over', 1), ('the', 1), ('lazy', 1), ('grey', 1),
     ('dogs', 1), ('mary', 1), ('had', 1), ('a', 1), ('little', 1),
     ('lamb', 1), ('its', 1), ('fleece', 1), ('was', 1), ('white', 1),
     ('as', 1), ('snow', 1), ('and', 1), ('everywhere', 1),
     ('that', 1), ('mary', 1), ('went', 1), ('the', 1), ('lamb', 1),
     ('was', 1), ('sure', 1), ('to', 1), ('go', 1), ('thats', 1),
     ('one', 1), ('small', 1), ('step', 1), ('for', 1), ('a', 1),
     ('man', 1), ('one', 1), ('giant', 1), ('leap', 1), ('for', 1),
     ('mankind', 1)]


    The map phase of MapReduce is logically trivial, but when the input
    dictionary has, say, 10 billion keys, and those keys point to files
    held on thousands of different machines, implementing the map phase
    is actually quite non-trivial. What the MapReduce library handles is
    details like knowing which files are stored on what machines, making
    sure that machine failures don't affect the computation, making
    efficient use of the network, and storing the output in a usable
    form. We won't worry about these issues for now, but we will come
    back to them in future posts.

    What the MapReduce library now does in preparation for the reduce

    phase is to group together all the intermediate values which have the

    same key. In our example the result of doing this is the following

    intermediate dictionary:

    {'and': [1], 'fox': [1], 'over': [1], 'one': [1, 1], 'as': [1],
     'go': [1], 'its': [1], 'lamb': [1, 1], 'giant': [1],
     'for': [1, 1], 'jumped': [1], 'had': [1], 'snow': [1],
     'to': [1], 'leap': [1], 'white': [1], 'was': [1, 1],
     'mary': [1, 1], 'brown': [1], 'lazy': [1], 'sure': [1],
     'that': [1], 'little': [1], 'small': [1], 'step': [1],
     'everywhere': [1], 'mankind': [1], 'went': [1], 'man': [1],
     'a': [1, 1], 'fleece': [1], 'grey': [1], 'dogs': [1],
     'quick': [1], 'the': [1, 1, 1], 'thats': [1]}

    We see, for example, that the word "and", which appears only once in
    the three files, has as its associated value a list containing just
    a single 1, [1]. By contrast, the word "one", which appears twice,
    has [1,1] as its value.
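
    The grouping itself is easy to express directly. Here's a small
    sketch (my own, not the implementation used later in this post,
    which uses itertools.groupby instead), starting from the list
    intermediate produced by the map phase:

    groups = {}
    for intermediate_key, intermediate_value in intermediate:
        # Collect every value that shares an intermediate key into one list.
        groups.setdefault(intermediate_key, []).append(intermediate_value)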

    The reduce phase now commences. A programmer-defined function
    reducer(intermediate_key,intermediate_value_list) is applied to each
    entry in the intermediate dictionary. For wordcount, reducer simply
    sums up the list of intermediate values, and returns both the
    intermediate_key and the sum as the output. This is done by the
    following code:

    def reducer(intermediate_key, intermediate_value_list):
        return (intermediate_key, sum(intermediate_value_list))
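
    For instance (a quick check of my own, not from the post), for the
    key 'the', which appears three times across the files:

    >>> reducer('the', [1, 1, 1])
    ('the', 3)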

    The output from the reduce phase, and from the total MapReduce

    computation, is thus:

    [('and', 1), ('fox', 1), ('over', 1), ('one', 2), ('as', 1),
     ('go', 1), ('its', 1), ('lamb', 2), ('giant', 1), ('for', 2),
     ('jumped', 1), ('had', 1), ('snow', 1), ('to', 1), ('leap', 1),
     ('white', 1), ('was', 2), ('mary', 2), ('brown', 1),
     ('lazy', 1), ('sure', 1), ('that', 1), ('little', 1),
     ('small', 1), ('step', 1), ('everywhere', 1), ('mankind', 1),
     ('went', 1), ('man', 1), ('a', 2), ('fleece', 1), ('grey', 1),
     ('dogs', 1), ('quick', 1), ('the', 3), ('thats', 1)]

    You can easily check that this is just a list of the words in the three files

    we started with, and the associated wordcounts, as desired.

    We've looked at code defining the input dictionary i, and the mapper
    and reducer functions. Collecting it all up, and adding a call to
    the MapReduce library, here's the complete word_count.py program:

    # word_count.py
    import string
    import map_reduce

    def mapper(input_key, input_value):
        return [(word, 1) for word in
                remove_punctuation(input_value.lower()).split()]

    def remove_punctuation(s):
        return s.translate(string.maketrans("", ""), string.punctuation)

    def reducer(intermediate_key, intermediate_value_list):
        return (intermediate_key, sum(intermediate_value_list))

    filenames = ["text\\a.txt", "text\\b.txt", "text\\c.txt"]
    i = {}
    for filename in filenames:
        f = open(filename)
        i[filename] = f.read()
        f.close()

    print map_reduce.map_reduce(i, mapper, reducer)

    The map_reduce module imported by this program implements MapReduce
    in pretty much the simplest possible way, using some useful
    functions from the itertools library:

    # map_reduce.py
    '''Defines a single function, map_reduce, which takes an input
    dictionary i and applies the user-defined function mapper to each
    (input_key,input_value) pair, producing a list of intermediate
    keys and intermediate values. Repeated intermediate keys then
    have their values grouped into a list, and the user-defined
    function reducer is applied to the intermediate key and list of
    intermediate values. The results are returned as a list.'''

    import itertools

    def map_reduce(i, mapper, reducer):
        intermediate = []
        for (key, value) in i.items():
            intermediate.extend(mapper(key, value))
        groups = {}
        for key, group in itertools.groupby(sorted(intermediate),
                                            lambda x: x[0]):
            groups[key] = list([y for x, y in group])
        return [reducer(intermediate_key, groups[intermediate_key])
                for intermediate_key in groups]

    (Credit to a nice blog post from Dave Spencer for the use of

    itertools.groupby to simplify the reduce phase.)

    Obviously, on a single machine an implementation of the MapReduce
    library is pretty trivial! In later posts we'll extend this library
    so that it can distribute the execution of the mapper and reducer
    functions across multiple machines on a network. The payoff is that
    with enough improvement to the library we can, with essentially no
    change, use our word_count.py program to count the words not just in
    3 files, but rather the words in billions of files, spread over
    thousands of computers in a cluster. What the MapReduce library
    does, then, is provide an approach to developing in a distributed
    environment where many simple tasks (like wordcount) remain simple
    for the programmer. Important (but boring) tasks like
    parallelization, getting the right data into the right places,
    dealing with the failure of computers and networking components, and
    even coping with racks of computers being taken offline for
    maintenance, are all taken care of under the hood of the library.

    In the posts that follow, we're thus going to do two things. First,
    we're going to learn how to develop MapReduce applications. That
    means taking familiar tasks (things like computing PageRank) and
    figuring out how they can be done within the MapReduce framework.
    We'll do that in the next post in this series. In later posts, we'll
    also take a look at Hadoop, an open source platform that can be used
    to develop MapReduce applications.

    Second, we'll go under the hood of MapReduce, and look at how it
    works. We'll scale up our toy implementation so that it can be used
    over small clusters of computers. This is not only fun in its own
    right, it will also make us better MapReduce programmers, in the
    same way as understanding the innards of an operating system (for
    example) can make you a better application programmer.

    To finish off this post, though, we'll do just two things. First,
    we'll sum up what MapReduce does, stripping out the
    wordcount-specific material. It's not any kind of a formal
    specification, just a brief informal summary, together with a few
    remarks. We'll refine this summary a little in some future posts,
    but this is the basic MapReduce model. Second, we'll give an
    overview of how MapReduce takes advantage of a distributed
    environment to parallelize jobs.

    MapReduce in general

    Summing up our earlier description of MapReduce, and with the
    details about wordcount removed, the input to a MapReduce job is a
    set of (input_key,input_value) pairs. Each pair is used as input to
    a function mapper(input_key,input_value), which produces as output a
    list of intermediate keys and intermediate values:

    [(intermediate_key,intermediate_value),
     (intermediate_key',intermediate_value'),
     ...]

    The output from all the different input pairs is then sorted, so
    that values associated with the same intermediate_key are grouped
    together in a list of intermediate values. The
    reducer(intermediate_key,intermediate_value_list) function is then
    applied to each intermediate key and list of intermediate values, to
    produce the output from the MapReduce job.
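
    Schematically (my own summary, not a listing from the post), a whole
    job is a single library call, and only the two plug-in functions
    change from job to job:

    # mapper(input_key, input_value)
    #     -> [(intermediate_key, intermediate_value), ...]
    # reducer(intermediate_key, intermediate_value_list)
    #     -> one output item per intermediate key
    output = map_reduce.map_reduce(i, mapper, reducer)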

    A natural question is whether the order of values in
    intermediate_value_list matters. I must admit I'm not sure of the
    answer to this question; if it's discussed in the original paper,
    then I missed it. In most of the examples I'm familiar with, the
    order doesn't matter, because the reducer works by applying a
    commutative, associative operation across all intermediate values in
    the list. As we'll see in a minute, because the mapper computations
    are potentially done in parallel, on machines which may be of
    varying speed, it'd be hard to guarantee the ordering, and this
    suggests that the ordering doesn't matter. It'd be nice to know for
    sure; if anyone reading this does know the answer, I'd appreciate
    hearing it, and will update the post!

    One of the most striking things about MapReduce is how restrictive
    it is. A priori it's by no means clear that MapReduce should be all
    that useful in practical applications. It turns out, though, that
    many interesting computations can be expressed either directly in
    MapReduce, or as a sequence of a few MapReduce computations. We've
    seen wordcount implemented in this post, and we'll see how to
    compute PageRank using MapReduce in the next post. Many other
    important computations can also be implemented using MapReduce,
    including doing things like finding shortest paths in a graph,
    grepping a large document collection, or many data mining
    algorithms. For many such problems, though, the standard approach
    doesn't obviously translate into MapReduce. Instead, you need to
    think through the problem again from scratch, and find a way of
    doing it using MapReduce.

    Exercises

    How would you implement grep in MapReduce?

    Problems


    Take a well-known algorithms book, e.g.,
    Cormen-Leiserson-Rivest-Stein, or a list of well-known algorithms.
    Which algorithms lend themselves to being expressed in MapReduce?
    Which do not? This isn't so much a problem as it is a suggestion for
    a research program.

    The early years of serial computing saw the introduction of powerful
    general-purpose ideas like those that went into the Lisp and
    Smalltalk programming languages. Arguably, those ideas are still the
    most powerful in today's modern programming languages. MapReduce
    isn't a programming language as we conventionally think of them, of
    course, but it's similar in that it introduces a way of thinking
    about and approaching programming problems. I don't think MapReduce
    has quite the same power as the ideas that went into Lisp and
    Smalltalk; it's more like the Fortran of distributed computing. Can
    we find similarly powerful ideas to Lisp or Smalltalk in distributed
    computing? What's the "hundred-year framework" going to look like
    for distributed computing?

    How MapReduce takes advantage of the distributed setting

    You can probably already see how MapReduce takes advantage of a
    large cluster of computers, but let's spell out some of the details.
    There are two key points. First, the mapper functions can be run in
    parallel, on different processors, because they don't share any
    data. Provided the right data is in the local memory of the right
    processor (a task MapReduce manages for you), the different
    computations can be done in parallel. The more machines are in the
    cluster, the more mapper computations can be simultaneously
    executed. Second, the reducer functions can also be run in parallel,
    for the same reason, and, again, the more machines are available,
    the more computations can be done in parallel.

    The difficulty in this story arises in the grouping step that takes
    place between the map phase and the reduce phase. For the reducer
    functions to work in parallel, we need to ensure that all the
    intermediate values corresponding to the same key get sent to the
    same machine. Obviously, this requires communication between
    machines, over the relatively slow network connections.

    This looks tough to arrange, and you might think (as I did at first) that

    a lot of communication would be required to get the right data to the

    right machine. Fortunately, a simple and beautiful idea is used to

    make sure that the right data ends up in the right location, without

    there being too much communication overhead.

    Imagine you've got 1000 machines that you're going to use to run
    reducers on. As the mappers compute the intermediate keys and value
    lists, they compute hash(intermediate_key) mod 1000 for some hash
    function. This number is used to identify the machine in the cluster
    that the corresponding reducer will be run on, and the resulting
    intermediate key and value list is then sent to that machine.
    Because every machine running mappers uses the same hash function,
    this ensures that value lists corresponding to the same intermediate
    key all end up at the same machine. Furthermore, by using a hash we
    ensure that the intermediate keys end up pretty evenly spread over
    machines in the cluster. Now, I should warn you that in practice
    this hashing method isn't literally quite what's done (we'll
    describe that in later lectures), but that's the main idea.
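
    In code, the routing rule is tiny. Here's a minimal sketch (my own
    illustration, not from the post; the machine count of 1000 and the
    use of Python's built-in hash are just for concreteness):

    NUM_REDUCER_MACHINES = 1000

    def reducer_machine(intermediate_key):
        # Every mapper applies the same hash function, so a given key is
        # always routed to the same reducer machine.
        return hash(intermediate_key) % NUM_REDUCER_MACHINES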

    Needless to say, there's a lot more to the story of how MapReduce
    works in practice, especially the way it handles data distribution
    and fault-tolerance. In future posts we'll explore many more of the
    details. Nonetheless, hopefully this post gives you a basic
    understanding of how MapReduce works. In the next post we'll apply
    MapReduce to the computation of PageRank.

