

    Project Report

    On

    BOOK RECOMMENDER SYSTEM USING

    HADOOP

    By

    Ankit Kumar Das (04710402710)

    Pranay Khatri (03010402710)

    Shreya Tripathi (05110407210)

    Submitted

    in partial fulfillment for the award of the degree of

    Bachelor of Technology

    in

    Computer Science & Engineering

    Under the Guidance of

    Mrs Shaveta Tatwani

    Department of Computer Science and Engineering

    Amity School of Engineering and Technology

    (Affiliated to G.G.S.I.P. University)

    New Delhi - 110061


    CERTIFICATE

    This is to certify that the Project Report titled

    Book Recommender System using Hadoop

    Done by

    Ankit Kumar Das (04710402710)

    Pranay Khatri (03010402710)

    Shreya Tripathi (05110402710)

    is an authentic work carried out by them under my guidance at:

    AMITY SCHOOL OF ENGINEERING AND TECHNOLOGY, Bijwasan

    The matter embodied in this project report has not been submitted earlier for the award of any

    degree or diploma to the best of my knowledge and belief.

    Mrs. Shaveta Tatwani Prof. M.N.Gupta

    Asst. Professor Head of Department

    Dept. of CSE/IT Dept. of CSE/IT


    ACKNOWLEDGEMENT

    First and foremost, our acknowledgements are due to Prof. B.P. Singh, Senior Director, and Prof. Rekha Aggarwal, Director, for the education under their guidance, which has provided strong fundamentals, positive competition and an unmatched learning experience.

    We express our gratitude to our esteemed Professor M.N. Gupta (Head, Department of CSE & IT) and our guide, Asst. Professor Mrs Shaveta Tatwani. Their inspiration, motivation, suggestions and invaluable guidance enabled us to develop the minor project. Their careful observations and precise ideas were an immense help in refining our content.

    Ankit Kumar Das Pranay Khatri

    (047/B.TECH(CSE)/ASET/14) (030/B.TECH(CSE)/ASET/14)

    Shreya Tripathi

    (051/B.TECH(CSE)/ASET/14)


    ABSTRACT

    Recommender Systems are new-generation internet tools that help users navigate the information on the internet and receive information related to their preferences. Although recommender systems are most often applied in online shopping and entertainment domains such as movies and music, their applicability is being researched in other areas as well. This report presents an overview of the Recommender Systems which are currently working in the domain of online book shopping. This report also proposes a new book recommender system that combines user choices with not only similar users but other users as well, to give diverse recommendations that change over time. The overall architecture of the proposed system is presented and its implementation with a prototype design is described. Lastly, the report presents an empirical evaluation of the system based on a survey reflecting the impact of such diverse recommendations on user choices.


    TABLE OF CONTENTS

    Certificate
    Acknowledgement
    Abstract
    Table of Contents
    List of Figures

    1. Introduction
       1.1 Recommender Systems
       1.2 Apache Hadoop
    2. Recommender Systems
       2.1 Introduction
       2.2 Content based Filtering
       2.3 Collaborative Filtering
           2.3.1 User based Collaborative Filtering
           2.3.2 Item based Collaborative Filtering
       2.4 Similarity Measures
           2.4.1 Cosine based Similarity
           2.4.2 Pearson Correlation based Similarity
           2.4.3 Cooccurrence based Similarity
           2.4.4 Euclidian Distance based Similarity
           2.4.5 Tanimoto Coefficient based Similarity
           2.4.6 Log Likelihood based Similarity
       2.5 Challenges and Issues
    3. Hadoop Distributed File System
       3.1 Overview
       3.2 Assumptions and Goals
       3.3 Architecture
           3.3.1 Namenode
                 3.3.1.1 Checkpoint Node
                 3.3.1.2 Backup Node
           3.3.2 DataNode
           3.3.3 HDFS Client
       3.4 Data Replication
           3.4.1 Replica Placement
           3.4.2 Replica Selection
           3.4.3 Safemode
       3.5 Data Organisation
           3.5.1 Data Blocks
           3.5.2 Staging
           3.5.3 Replication and Pipelining
       3.6 HDFS Features
           3.6.1 Communication Protocols
           3.6.2 Data Disk Failure, Heartbeats and Re-replication
           3.6.3 Cluster Rebalancing
           3.6.4 Data Integrity
           3.6.5 Metadata Disk Failure
       3.7 Advantages and Disadvantages
    4. Hadoop MapReduce
       4.1 Introduction
       4.2 Inputs and Outputs
       4.3 User Interfaces
           4.3.1 Mapper
           4.3.2 Reducer
           4.3.3 Partitioner
           4.3.4 Reporter
           4.3.5 Output Collector
       4.4 Job Configuration
       4.5 MapReduce Features
    5. Item Based Recommendation Algorithm
       5.1 Algorithm
       5.2 Logical Parts of Code
           5.2.1 Preparation of Preference Matrix
           5.2.2 Generation of Similarity Matrix
           5.2.3 Preparation for Matrix Multiplication
           5.2.4 Matrix Multiplication
    6. System Preparation and Implementation
       6.1 Runtime Environment
       6.2 Software and Language Versions
       6.3 Hardware Specification of each Hadoop Node
       6.4 Hadoop Configuration
       6.5 Starting the Cluster
           6.5.1 Formatting the Namenode
           6.5.2 Starting HDFS daemons
           6.5.3 Starting mapred daemons
           6.5.4 Starting the Job
       6.6 Monitoring the Cluster
    7. Results and Conclusion
       7.1 Results
           7.1.1 Run Time
           7.1.2 Recommendations
       7.2 Conclusions
    8. Future Scope
    9. References


    LIST OF FIGURES

    Fig. 2.1: User based Collaborative Filtering
    Fig. 2.2: Item based Collaborative Filtering
    Fig. 2.3: Item-Item Similarity
    Fig. 2.4: Tanimoto Coefficient
    Fig. 3.1: HDFS Architecture
    Fig. 3.2: Reading a File
    Fig. 3.3: Writing to a File
    Fig. 3.4: Block Replication
    Fig. 4.1: MapReduce Framework
    Fig. 4.2: Task Environment
    Fig. 6.1: Formatting Namenode
    Fig. 6.2: HDFS daemons
    Fig. 6.3: Mapred daemons
    Fig. 6.4: Starting Job
    Fig. 6.5: Namenode Interface
    Fig. 6.6: JobTracker Interface
    Fig. 6.7: Scheduling Interface 1
    Fig. 6.8: Scheduling Interface 2
    Fig. 7.1: Average Run Time
    Fig. 7.2: Similarity Cooccurrence Recommendations
    Fig. 7.3: Tanimoto Recommendations
    Fig. 7.4: Loglikelihood Recommendations
    Fig. 7.5: Euclidian Distance Recommendations
    Fig. 7.6: Pearson Coefficient Recommendations


    LIST OF TABLES

    Table 2.1: User Ratings
    Table 7.1: Time Taken


    CHAPTER 1

    INTRODUCTION

    1.1 Recommender Systems

    Recommender systems are widespread tools that are employed by a wide range of

    organisations and companies for recommending items such as movies, books and even

    employees for projects. But with the advent of big data, it has become difficult to process such large amounts of data for recommendations. For this reason, Apache Hadoop is employed for scalability, reliability and faster processing.

    Recommender systems (sometimes replacing "system" with a synonym such as platform or engine) are a subclass of information filtering systems that seek to predict the 'rating' or 'preference' that a user would give to an item. Recommender systems have become extremely common in recent years, and are applied in a variety of applications. The most popular ones are probably movies, music, news, books, research articles, search queries, social tags, and products in general.

    1.2 Apache Hadoop

    The Apache Hadoop software library is a framework that allows for the distributed

    processing of large data sets across clusters of computers using simple programming models.

    It is designed to scale up from single servers to thousands of machines, each offering local

    computation and storage. Rather than rely on hardware to deliver high-availability, the library

    itself is designed to detect and handle failures at the application layer, so delivering a highly-

    available service on top of a cluster of computers, each of which may be prone to failures [1].

    The project includes the following modules:

    - Hadoop Common: The common utilities that support the other Hadoop modules.

    - Hadoop Distributed File System (HDFS): A distributed file system that provides high-throughput access to application data.

    - Hadoop MapReduce: A YARN-based system for parallel processing of large data sets.


    CHAPTER 2

    RECOMMENDER SYSTEMS

    2.1 Introduction

    Recommender Systems (RSs) are software tools and techniques providing suggestions for

    items to be of use to a user. The suggestions relate to various decision-making processes,

    such as what items to buy, what music to listen to, what books to read or what online news to

    read.

    Item is the general term used to denote what the system recommends to users. A RS

    normally focuses on a specific type of item (e.g., CDs, or news) and accordingly its design,

    its graphical user interface, and the core recommendation technique used to generate the

    recommendations are all customized to provide useful and effective suggestions for that

    specific type of item. RSs are primarily directed towards individuals who lack sufficient

    personal experience or competence to evaluate the potentially overwhelming number of

    alternative items that a Web site, for example, may offer. In most cases, people are faced with

    choices and very large data volumes, and searching through all of them is beyond the user's capability. This problem is called information overload [1].

    A case in point is a book recommender system that assists users to select a book to read. In

    the popular Web site Amazon.com, a RS is employed to personalize the online store for

    each customer. Since recommendations are usually personalized, different users or user

    groups receive diverse suggestions.

    In their simplest form, personalized recommendations are offered as ranked lists of items. In

    performing this ranking, RSs try to predict what the most suitable products or services are,

    based on the user's preferences and constraints. In order to complete such a computational

    task, RSs collect from users their preferences, which are either explicitly expressed, e.g., as

    ratings for products, or are inferred by interpreting user actions. For instance, a RS may

    consider the navigation to a particular product page as an implicit sign of preference for the

    items shown on that page. RS development originated from a rather simple observation:

    individuals often rely on recommendations provided by others in making routine, daily

    decisions. For example, it is common to rely on what one's peers recommend when selecting

    a book to read; employers count on recommendation letters in their recruiting decisions; and

    when selecting a movie to watch, individuals tend to read and rely on the movie reviews that


    a film critic has written and which appear in the newspaper they read. In seeking to mimic

    this behaviour, the first RSs applied algorithms to leverage recommendations produced by a

    community of users to deliver recommendations to an active user, i.e., a user looking for

    suggestions. The recommendations were for items that similar users (those with similar

    tastes) had liked. This approach is termed collaborative-filtering and its rationale is that if the

    active user agreed in the past with some users, then the other recommendations coming from

    these similar users should be relevant as well and of interest to the active user. As e-

    commerce Web sites began to develop, a pressing need emerged for providing

    recommendations derived from filtering the whole range of available alternatives.

    Users were finding it very difficult to arrive at the most appropriate choices from the

    immense variety of items (products and services) that these Web sites were offering. The explosive growth and variety of information available on the Web and the rapid introduction

    of new e-business services (buying products, product comparison, auction, etc.) frequently

    overwhelmed users, leading them to make poor decisions.

    The availability of choices, instead of producing a benefit, started to decrease users' well-

    being. It was understood that while choice is good, more choice is not always better. Indeed,

    choice, with its implications of freedom, autonomy, and self-determination can become

    excessive, creating a sense that freedom may come to be regarded as a kind of misery-inducing tyranny. RSs have proved in recent years to be a valuable means for coping with the

    information overload problem. Ultimately a RS addresses this phenomenon by pointing the

    user towards the items in which he/she may be interested.

    2.2 Content based Filtering

    Content-based recommender systems work with profiles of users that are created at the

    beginning. A profile has information about a user and his taste. Generally, when creating a

    profile, recommender systems make a survey, to get initial information about a user in order

    to avoid the new-user problem. In the recommendation process, the engine compares the

    items that were already positively rated by the user with the items he didn't rate and looks for similarities [2]. Those items that are most similar to the positively rated ones will be

    recommended to the user.

    2.3 Collaborative Filtering


    Recommendations can be based on demographics of the users, overall top selling items, or

    past buying habits of users as a predictor of future items. Collaborative Filtering (CF) [2][3] is the most successful recommendation technique to date. The idea of collaborative filtering is to find users in a community that share appreciations. If two users have rated the same or almost the same items similarly, then they have similar tastes. Such users build a group, or a so-called neighbourhood. A user gets recommendations for those items that he/she hasn't rated before, but that were already positively rated by users in his/her neighbourhood. Table 2.1 shows that all three users rated the books positively and with similar marks. That means that they have similar taste and build a neighbourhood. User A hasn't rated the book "ASP.Net", which probably means that he hasn't read it yet. As the book was positively rated by the other users, he will get this item recommended. As opposed to simpler recommender systems, where recommendations are based on the most-rated and most-popular items, collaborative recommender systems care about the taste of the user. The taste is considered to be constant, or at least to change slowly.

    Table 2.1: User Ratings

    Collaborative filtering is widely used in e-commerce. Customers can rate books, songs,

    movies and then get recommendations regarding those items in the future. Moreover, collaborative filtering is utilized in browsing certain kinds of documents (e.g. scientific works, articles, and magazines).

    2.3.1 User Based Collaborative Filtering


    In the user-based approach, the users perform the main role. If a certain majority of the customers has the same taste, then they are joined into one group.

    Recommendations are given to a user based on the evaluation of items by other users from the

    same group, with whom he/she shares common preferences. If the item was positively rated

    by the community, it will be recommended to the user. Thus in the user-based approach the

    items that were already rated by the user before play an important role in searching a group

    that shares appreciations with him.

    Fig. 2.1: User Based Collaborative Filtering

    2.3.2 Item Based Collaborative Filtering

    The item-based approach looks into the set of items the target user has rated, computes how similar they are to the target item i, and then selects the k most similar items {i1, i2, . . . , ik}. At the same time, their corresponding similarities {si1, si2, . . . , sik} are also computed. Once the most similar items are found, the prediction is computed by taking a weighted average of the target user's ratings on these similar items [4].

    Fig. 2.2: Item based Collaborative Filtering
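    For concreteness, one standard way to write this weighted average (a reconstruction based on the item-based approach of [4], not an equation reproduced from the report) is, in LaTeX notation:

        P_{u,i} = \frac{\sum_{n=1}^{k} s_{i,i_n} \, R_{u,i_n}}{\sum_{n=1}^{k} \lvert s_{i,i_n} \rvert}

    where s_{i,i_n} is the similarity between the target item i and neighbour item i_n, and R_{u,i_n} is the target user u's rating of i_n.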


    2.4 Similarity Measures

    One critical step in the item-based collaborative filtering algorithm is to compute the

    similarity between items and then to select the most similar items. The basic idea in similarity

    computation between two items i and j is to first isolate the users who have rated both of

    these items and then to apply a similarity computation technique to determine the similarity

    s(i, j) [5].

    Fig. 2.3: Item-Item Similarity

    There are different ways to compute the similarity between items. These include cosine-based similarity, Pearson correlation-based similarity, co-occurrence, Euclidean distance-based similarity, Tanimoto coefficient-based similarity and log-likelihood-based similarity.

    2.4.1 Cosine based Similarity

    In this case, two items are thought of as two vectors in the m-dimensional user space. The similarity between them is measured by computing the cosine of the angle between these two vectors. Formally, in the m x n ratings matrix, the similarity between items i and j, denoted by sim(i, j), is given by

    sim(i, j) = (i · j) / (||i|| ||j||)

    where · denotes the dot product of the two rating vectors and ||i|| denotes the norm of vector i.
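    As an illustration, a minimal Java sketch of this measure (not code from the project; vector positions correspond to users, with 0 standing for an unrated item) is:

        public final class CosineSimilarity {
            // itemI[u] and itemJ[u] hold user u's ratings of items i and j (0 = unrated).
            public static double similarity(double[] itemI, double[] itemJ) {
                double dot = 0.0, normI = 0.0, normJ = 0.0;
                for (int u = 0; u < itemI.length; u++) {
                    dot += itemI[u] * itemJ[u];
                    normI += itemI[u] * itemI[u];
                    normJ += itemJ[u] * itemJ[u];
                }
                if (normI == 0.0 || normJ == 0.0) {
                    return 0.0; // no ratings for one of the items: treat as no similarity
                }
                return dot / (Math.sqrt(normI) * Math.sqrt(normJ));
            }
        }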

    2.4.2 Pearson Correlation-based Similarity


    The Pearson correlation of two series is the ratio of their covariance to the product of their standard deviations. Covariance is a measure of how much two series move together in absolute terms; it is big when the series move far in the same direction from their means in the same places. Dividing by the standard deviations merely normalizes for the absolute size of their changes.

    The Pearson correlation is a number between -1 and 1 that measures the tendency of two series of numbers, paired up one-to-one, to move together. That is to say, it measures how likely a number in one series is to be relatively large when the corresponding number in the other series is high, and vice versa. It measures the tendency of the numbers to move together proportionally, such that there is a roughly linear relationship between the values in one series and the other. When this tendency is high, the correlation is close to 1. When there appears to be little relationship at all, the value is near 0. When there appears to be an opposing relationship, in which one series' numbers are high exactly when the other series' numbers are low, the value is near -1.

    In this case, the similarity between two items i and j is measured by computing the Pearson-r correlation corr(i, j). To make the correlation computation accurate, we must first isolate the co-rated cases (i.e., cases where the users rated both i and j). Let the set of users who rated both i and j be denoted by U; the correlation similarity is then the covariance of the ratings of i and j over U divided by the product of their standard deviations over U.
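    A minimal Java sketch of this computation over the co-rated set U (illustrative only, not the project's code; the two arrays are assumed to be aligned so that index u refers to the same co-rating user in both) is:

        public final class PearsonSimilarity {
            // ratingsI[u] and ratingsJ[u] are the ratings of items i and j by the u-th co-rating user.
            public static double similarity(double[] ratingsI, double[] ratingsJ) {
                int n = ratingsI.length;
                if (n == 0) {
                    return 0.0; // no co-rated users
                }
                double meanI = 0.0, meanJ = 0.0;
                for (int u = 0; u < n; u++) {
                    meanI += ratingsI[u];
                    meanJ += ratingsJ[u];
                }
                meanI /= n;
                meanJ /= n;
                double cov = 0.0, varI = 0.0, varJ = 0.0;
                for (int u = 0; u < n; u++) {
                    double di = ratingsI[u] - meanI;
                    double dj = ratingsJ[u] - meanJ;
                    cov += di * dj;
                    varI += di * di;
                    varJ += dj * dj;
                }
                if (varI == 0.0 || varJ == 0.0) {
                    return 0.0; // correlation undefined when one item has constant ratings
                }
                return cov / (Math.sqrt(varI) * Math.sqrt(varJ));
            }
        }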

    2.4.3 Co-occurrence-based Similarity

    Instead of computing the similarity between every pair of items, in this metric the algorithm computes the number of times each pair of items occurs together in some user's list of preferences, in order to fill out the matrix. Co-occurrence is like similarity; the more two items turn up together, the more related or similar they probably are.

    Producing the matrix is a simple matter of counting. The entries in the matrix aren't affected by preference values; these values will enter the computation later.
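    The counting itself can be sketched in plain Java as follows (an illustration, not the MapReduce implementation used in the project; each inner list is assumed to hold the item IDs one user has expressed a preference for):

        import java.util.Collection;
        import java.util.HashMap;
        import java.util.List;
        import java.util.Map;

        public final class CooccurrenceCounter {
            // Returns a nested map: itemA -> (itemB -> number of users preferring both).
            public static Map<Long, Map<Long, Integer>> count(Collection<List<Long>> userItemLists) {
                Map<Long, Map<Long, Integer>> matrix = new HashMap<>();
                for (List<Long> items : userItemLists) {
                    for (Long itemA : items) {
                        for (Long itemB : items) {
                            if (itemA.equals(itemB)) {
                                continue; // skip the diagonal
                            }
                            matrix.computeIfAbsent(itemA, k -> new HashMap<>())
                                  .merge(itemB, 1, Integer::sum);
                        }
                    }
                }
                return matrix;
            }
        }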

    2.4.4 Euclidian Distance based Similarity

    This implementation is based on the distance between users. The users are seen as points in a

    space of many dimensions (as many dimensions as there are items), whose coordinates are


    preference values. This similarity metric computes the Euclidean distance d between two

    such user points. This value alone doesn't constitute a valid similarity metric, because larger values would mean more-distant, and therefore less similar, users; the distance is smaller when users are more similar. Therefore, the implementation actually returns 1 / (1 + d).

    This similarity metric never returns a negative value, but larger values still mean more

    similarity.
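    A minimal Java sketch of this metric (illustrative only; userA[i] and userB[i] are assumed to be the two users' preference values for the i-th item) is:

        public final class EuclideanSimilarity {
            public static double similarity(double[] userA, double[] userB) {
                double sumSq = 0.0;
                for (int i = 0; i < userA.length; i++) {
                    double diff = userA[i] - userB[i];
                    sumSq += diff * diff;
                }
                double d = Math.sqrt(sumSq); // Euclidean distance between the two user points
                return 1.0 / (1.0 + d);      // closer users get a similarity nearer to 1
            }
        }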

    2.4.5 Tanimoto Coefficient based Similarity

    TanimotoCoefficientSimilarity is an implementation which does not take into account the preference values specified for the users. It is based on (surprise) the Tanimoto coefficient. This value is also known as the Jaccard coefficient. It's the number of items that two users express some preference for, divided by the number of items that either user expresses some preference for. In other words, it's the ratio of the size of the intersection to the size of the union of their preferred items. It has the required properties: when two users' items completely overlap, the result is 1.0. When they have nothing in common, it's 0.0. The value is never negative, but that's OK. It's possible to expand the results into the range -1 to 1 with some simple math: similarity = 2 * similarity - 1. It won't matter to the framework.

    Fig. 2.4: Tanimoto Coefficient
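    A small Java sketch of the coefficient (an illustration, not the project's code; each set is assumed to hold the IDs of the items one user has expressed a preference for) is:

        import java.util.HashSet;
        import java.util.Set;

        public final class TanimotoSimilarity {
            public static double similarity(Set<Long> itemsA, Set<Long> itemsB) {
                Set<Long> intersection = new HashSet<>(itemsA);
                intersection.retainAll(itemsB); // items both users prefer
                int union = itemsA.size() + itemsB.size() - intersection.size();
                return union == 0 ? 0.0 : (double) intersection.size() / union;
            }
        }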

    2.4.6 Log likelihood based Similarity

    Log-likelihood-based similarity is similar to the Tanimoto coefficient-based similarity, though it's more difficult to understand intuitively. It's another metric that doesn't take account of individual preference values. Like the Tanimoto coefficient, it's based on the


    number of items in common between two users, but its value is more an expression of how

    unlikely it is for two users to have so much overlap, given the total number of items out there

    and the number of items each user has a preference for.
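    One common way to quantify this "surprise" (a standard formulation used, for example, by Mahout's log-likelihood similarity, given here as background rather than taken from the report) is the likelihood-ratio statistic over the 2x2 contingency table of the two users' preferences, in LaTeX notation:

        G^2 = 2 \sum_{a,b} k_{ab} \, \ln \frac{k_{ab} \, N}{R_a \, C_b}

    where k_{11} counts items both users prefer, k_{12} and k_{21} count items only one of them prefers, k_{22} counts items neither prefers, R_a and C_b are the row and column sums, and N is the total number of items (with the convention 0 ln 0 = 0). A larger value means the observed overlap is harder to explain by chance, i.e. the users are more similar.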

    It is a probability based distance. The distance between two clusters is related to the decrease

    in log-likelihood as they are combined into one cluster. In calculating log-likelihood, normal

    distributions for continuous variables and multinomial distributions for categorical variables

    are assumed. It is also assumed that the variables are independent of each other, and so are

    the cases. The distance between clusters j and s is defined as the decrease in log-likelihood that results when they are combined into one cluster.

    2.5 Challenges and Issues

    Cold Start

    It's difficult to give recommendations to a new user, as his profile is almost empty and he hasn't rated any items yet, so his taste is unknown to the system. This is called the cold-start problem [6]. In some recommender systems this problem is solved with a survey when creating a profile. Items can also have a cold start when they are new in the system and haven't been rated before. Both of these problems can also be solved with hybrid approaches.

    Trust

    The voices of people with a short history may not be as relevant as the voices of those who have a rich history in their profiles. The issue of trust arises towards the evaluations of a certain customer. The problem could be solved by distributing priorities to the users.


    Scalability

    With the growth in the numbers of users and items, the system needs more resources for processing information and forming recommendations. The majority of resources is consumed in determining users with similar tastes and goods with similar descriptions. This problem is also solved by the combination of various types of filters and physical improvement of the systems. Parts of the numerous computations may also be performed offline in order to accelerate the issuance of recommendations online.

    Sparsity

    In online shops that have a huge number of users and items, there are almost always users that have rated just a few items. Using collaborative and other approaches, recommender systems generally create neighbourhoods of users using their profiles. If a user has evaluated just a few items, then it is pretty difficult to determine his taste, and he/she could be related to the wrong neighbourhood. Sparsity is the problem of lack of information.

    Privacy

    Privacy has been the most important problem. In order to receive the most accurate and correct recommendations, the system must acquire as much information as possible about the user, including demographic data and data about the location of a particular user. Naturally, the question of reliability, security and confidentiality of the given information arises. Many online shops offer effective protection of the privacy of their users by utilizing specialized algorithms and programs.


    CHAPTER 3

    HADOOP DISTRIBUTED FILE SYSTEM

    3.1 Overview

    Hadoop comes with a distributed filesystem called HDFS (Hadoop Distributed Filesystem).

    HDFS is highly fault-tolerant and is designed to be deployed on low-cost hardware. HDFS

    provides high throughput access to application data and is suitable for applications that have

    large data sets [7].

    The Hadoop filesystem is designed for storing very large files (up to petabytes) with streaming data access, using the idea that the most efficient data processing pattern is a write-once, read-many-times

    pattern. HDFS stores metadata on a dedicated server, called NameNode. Application data are

    stored on other servers called DataNodes. All the servers are fully connected and

    communicate with each other using TCP-based protocols.

    3.2 Assumptions and Goals

    Hardware Failure

    Hardware failure is the norm rather than the exception. An HDFS instance may consist of hundreds or thousands of server machines, each storing part of the file system's data. The

    fact that there are a large number of components and that each component has a non-trivial

    probability of failure means that some component of HDFS is always non-functional.

    Therefore, detection of faults and quick, automatic recovery from them is a core

    architectural goal of HDFS.

    Streaming Data Access

    Applications that run on HDFS need streaming access to their data sets. They are not general purpose applications that typically run on general purpose file systems. HDFS is

    designed more for batch processing rather than interactive use by users. The emphasis is

    on high throughput of data access rather than low latency of data access. POSIX imposes

    many hard requirements that are not needed for applications that are targeted for HDFS.

    POSIX semantics in a few key areas has been traded to increase data throughput rates.

    Large Data Sets


    Applications that run on HDFS have large data sets. A typical file in HDFS is gigabytes to

    terabytes in size. Thus, HDFS is tuned to support large files. It should provide high

    aggregate data bandwidth and scale to hundreds of nodes in a single cluster. It should

    support tens of millions of files in a single instance.

    Simple Coherency Model

    HDFS applications need a write-once-read-many access model for files. A file once

    created, written, and closed need not be changed. This assumption simplifies data

    coherency issues and enables high throughput data access. A MapReduce application or a

    web crawler application fits perfectly with this model. There is a plan to support

    appending-writes to files in the future.

    Moving Computation is Cheaper than Moving Data

    A computation requested by an application is much more efficient if it is executed near the

    data it operates on. This is especially true when the size of the data set is huge. This

    minimizes network congestion and increases the overall throughput of the system. The

    assumption is that it is often better to migrate the computation closer to where the data is

    located rather than moving the data to where the application is running. HDFS provides

    interfaces for applications to move themselves closer to where the data is located.

    Portability Across Heterogeneous Hardware and Software Platforms

    HDFS has been designed to be easily portable from one platform to another. This

    facilitates widespread adoption of HDFS as a platform of choice for a large set of

    applications.

    3.3 Architecture

    HDFS is based on a master/slave architecture. An HDFS cluster consists of a single NameNode (as master) and a number of DataNodes (as slaves). The NameNode and DataNodes are

    pieces of software designed to run on commodity machines. These machines typically run a

    GNU/Linux operating system (OS). The usage of the highly portable Java language means

    that HDFS can be deployed on a wide range of machines. A typical deployment has a

    dedicated machine that runs only the NameNode software. Each of the other machines in the

    cluster runs one instance of the DataNode software. The architecture does not preclude

    running multiple DataNodes on the same machine but in a real deployment one machine

    usually runs one DataNode.


    The existence of a single NameNode in a cluster greatly simplifies the architecture of the

    system. The NameNode is the arbitrator and repository for all HDFS metadata. The system is

    designed in such a way that user data never flows through the NameNode.

    3.3.1 Namenode

    The NameNode manages the filesystem namespace, i.e. the metadata for all the files and directories in

    the tree.

    Fig. 3.1: HDFS Architecture

    The file is divided into large blocks (typically 128 megabytes, but user selectable file-by-

    file) and each block is independently replicated at multiple DataNodes (typically three, but

    user selectable file-by-file) to provide reliability. The NameNode maintains and stores the

    namespace tree and the mapping of file blocks to DataNodes persistently on the local disk in

    the form of two files: the namespace image and the edit log. The NameNode also knows the

    DataNodes on which all the blocks for a given file are located. However, it does not store

    block locations persistently, since this information is reconstructed from DataNodes when the

    system starts.

    On a NameNode failure, the filesystem becomes inaccessible, because only the NameNode knows how to reconstruct the files from the blocks on the DataNodes. For this reason, it is


    important to make the NameNode resilient to failure, and Hadoop provides two mechanisms

    for this: Checkpoint Node and Backup Node.

    3.3.1.1 Checkpoint Node

    Checkpoint is an image record written persistently to disk. NameNode uses two types of files

    to persist its namespace:

    - Fsimage: the latest checkpoint of the namespace

    - Edits: logs containing changes to the namespace; these logs are also called journals.

    NameNode creates an updated file system metadata by merging both files i.e. fsimage and

    edits on restart. The NameNode then overwrites fsimage with the new HDFS state and begins

    a new edits journal.

    The Checkpoint node periodically downloads the latest fsimage and edits from the active

    NameNode to create checkpoints by merging them locally and then to upload new

    checkpoints back to the active NameNode. This requires the same memory space as that of the NameNode, so the Checkpoint node needs to run on a separate machine. Namespace information is lost if either the checkpoint or the journal is missing, so it is highly recommended to

    configure HDFS to store the checkpoint and journal in multiple storage directories.

    The Checkpoint node uses the parameter fs.checkpoint.period to set the interval between two consecutive checkpoints. The interval is given in seconds (the default is 3600 seconds, i.e. one hour). The edit log size at which a checkpoint is triggered is specified by the parameter fs.checkpoint.size (default 64 MB). Multiple Checkpoint nodes may be specified in the cluster configuration file.

    3.3.1.2 Backup Node

    The Backup node has the same functionality as the Checkpoint node. In addition, it maintains an in-memory, up-to-date copy of the file system namespace that is always synchronized with

    the active NameNode state. Along with accepting a journal stream of the filesystem edits

    from the NameNode and persisting this to disk, the Backup node also applies those edits into

    its own copy of the namespace in memory, thus creating a backup of the namespace.

    Unlike the Checkpoint node, the Backup node has an up-to-date state of the namespace in memory. The Backup node requires the same amount of RAM as the NameNode. The NameNode

    supports one Backup node at a time. No Checkpoint nodes may be registered if a Backup


    node is in use. The Backup node takes care of namespace data persistence, so the NameNode does not need to have a persistent store of its own.

    3.3.2 DataNodes

    There are a number of DataNodes, usually one per node in the cluster, which manage storage

    attached to the nodes. HDFS exposes a file system namespace and allows user data to be

    stored in files. Internally, a file is split into one or more blocks and these blocks are stored in

    a set of DataNodes[7]. The NameNode executes the file system namespace operations such as

    opening, closing, and renaming files and directories. It also determines the mapping of blocks

    to DataNodes.

    DataNodes store and retrieve blocks when requested (by clients or the NameNode), and they

    report back to the NameNode periodically with lists of blocks they are storing. The

    DataNodes are responsible for serving read and write requests from the file system's clients.

    The DataNodes also perform block creation, deletion, and replication upon instruction from

    the NameNode. DataNode and NameNode connections are established by a handshake, during which the namespace ID and the software version of the DataNode are verified. The namespace ID is assigned to the file system instance when it is formatted and is stored persistently on all nodes of the cluster. A node with a different namespace ID cannot join the cluster. A new DataNode without any namespace ID can join the cluster and receive the cluster's namespace ID; the DataNode then registers with the NameNode with its storage ID. A DataNode

    identifies block replicas in its possession to the NameNode by sending a block report. A

    block report contains the block id, the generation stamp and the length for each block replica

    the server hosts. The first block report is sent immediately after the DataNode's registration.

    Subsequent block reports are sent every hour and provide the NameNode with an up-to date

    view of where block replicas are located on the cluster.

    3.3.3 HDFS Client

    Reading a file

    To read a file, the HDFS client first contacts the NameNode, which returns a list of addresses of the DataNodes that have a copy of the blocks of the file. The client then connects directly to the closest DataNode for each block and requests the transfer of the desired block. Figure 3.2 shows the main sequence of events involved in reading data from HDFS.


    Fig. 3.2: Reading a file
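    A minimal client-side read using the Hadoop FileSystem API might look like the following (a sketch, not code from the report; the input path is hypothetical):

        import java.io.InputStream;
        import org.apache.hadoop.conf.Configuration;
        import org.apache.hadoop.fs.FileSystem;
        import org.apache.hadoop.fs.Path;
        import org.apache.hadoop.io.IOUtils;

        public class HdfsReadExample {
            public static void main(String[] args) throws Exception {
                Configuration conf = new Configuration();                 // picks up the cluster configuration
                FileSystem fs = FileSystem.get(conf);                     // client handle to HDFS
                InputStream in = fs.open(new Path("/data/ratings.csv"));  // hypothetical input path
                try {
                    IOUtils.copyBytes(in, System.out, 4096, false);       // stream the file to stdout
                } finally {
                    IOUtils.closeStream(in);
                }
            }
        }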

    Writing to a File

    To write to a file, the HDFS client first creates an empty file without any blocks. File creation is only possible when the client has write permission and the file does not already exist in the system. The NameNode records the new file creation and allocates data blocks to a list of suitable DataNodes that will host replicas of the first block of the file. Replication of the data arranges these DataNodes into a pipeline. When the first block is filled, new DataNodes are requested to host replicas of the next block; a new pipeline is organized, and the client sends the further bytes of the file. Each choice of DataNodes is likely to be different.

    If a DataNode in the pipeline fails while the data is being written, the pipeline is first closed, the partial block on the failed DataNode is deleted, and the failed DataNode is removed from the pipeline. New DataNodes are then chosen for the pipeline to write the remaining blocks of data.


    Fig. 3.3: Writing to a file
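    A corresponding client-side write (again a sketch under the same assumptions; the output path is hypothetical) is:

        import org.apache.hadoop.conf.Configuration;
        import org.apache.hadoop.fs.FSDataOutputStream;
        import org.apache.hadoop.fs.FileSystem;
        import org.apache.hadoop.fs.Path;

        public class HdfsWriteExample {
            public static void main(String[] args) throws Exception {
                Configuration conf = new Configuration();
                FileSystem fs = FileSystem.get(conf);
                // With overwrite=false, create() fails if the file already exists,
                // matching the behaviour described above.
                FSDataOutputStream out = fs.create(new Path("/data/output.txt"), false);
                try {
                    out.writeUTF("hello HDFS");  // data is buffered and pipelined to the DataNodes
                } finally {
                    out.close();                 // close() flushes remaining data and completes the file
                }
            }
        }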

    3.4 Data Replication

    HDFS is designed to reliably store very large files across machines in a large cluster. It stores

    each file as a sequence of blocks; all blocks in a file except the last block are the same size.

    The blocks of a file are replicated for fault tolerance. The block size and replication factor are

    configurable per file. An application can specify the number of replicas of a file. The

    replication factor can be specified at file creation time and can be changed later. Files in

    HDFS are write-once and have strictly one writer at any time.

    The NameNode makes all decisions regarding replication of blocks. It periodically receives a

    Heartbeat and a Blockreport from each of the DataNodes in the cluster. Receipt of a Heartbeat implies that the DataNode is functioning properly. A Blockreport contains a list of

    all blocks on a DataNode.


    Fig 3.4: Block Replication

    3.4.1 Replica Placement

    The placement of replicas is critical to HDFS reliability and performance. Optimizing replica

    placement distinguishes HDFS from most other distributed file systems. This is a feature that

    needs lots of tuning and experience. The purpose of a rack-aware replica placement policy is

    to improve data reliability, availability, and network bandwidth utilization. The current

    implementation for the replica placement policy is a first effort in this direction. The short-

    term goals of implementing this policy are to validate it on production systems, learn more about its behavior, and build a foundation to test and research more sophisticated policies.

    Large HDFS instances run on a cluster of computers that commonly spread across many

    racks. Communication between two nodes in different racks has to go through switches. In

    most cases, network bandwidth between machines in the same rack is greater than network

    bandwidth between machines in different racks.

    The NameNode determines the rack id each DataNode belongs to via the process outlined

    in Hadoop Rack Awareness. A simple but non-optimal policy is to place replicas on unique


    racks. This prevents losing data when an entire rack fails and allows use of bandwidth from

    multiple racks when reading data. This policy evenly distributes replicas in the cluster which

    makes it easy to balance load on component failure. However, this policy increases the cost

    of writes because a write needs to transfer blocks to multiple racks.

    For the common case, when the replication factor is three, HDFS's placement policy is to put

    one replica on one node in the local rack, another on a node in a different (remote) rack, and

    the last on a different node in the same remote rack. This policy cuts the inter-rack write

    traffic which generally improves write performance. The chance of rack failure is far less

    than that of node failure; this policy does not impact data reliability and availability

    guarantees. However, it does reduce the aggregate network bandwidth used when reading

    data, since a block is placed in only two unique racks rather than three. With this policy, the replicas of a file do not distribute evenly across the racks. One third of the replicas are on one

    node, two thirds of replicas are on one rack, and the other third are evenly distributed across

    the remaining racks. This policy improves write performance without compromising data

    reliability or read performance.

    3.4.2 Replica Selection

    To minimize global bandwidth consumption and read latency, HDFS tries to satisfy a read

    request from a replica that is closest to the reader. If there exists a replica on the same rack as

    the reader node, then that replica is preferred to satisfy the read request. If an HDFS

    cluster spans multiple data centers, then a replica that is resident in the local data center is

    preferred over any remote replica.

    3.4.3 Safemode

    On startup, the NameNode enters a special state called Safemode. Replication of data blocks

    does not occur when the NameNode is in the Safemode state. The NameNode receives

    Heartbeat and Blockreport messages from the DataNodes. A Blockreport contains the list of

    data blocks that a DataNode is hosting. Each block has a specified minimum number of

    replicas. A block is considered safely replicated when the minimum number of replicas of

    that data block has checked in with the NameNode. After a configurable percentage of safely

    replicated data blocks checks in with the NameNode (plus an additional 30 seconds), the

    NameNode exits the Safemode state. It then determines the list of data blocks (if any) that

    still have fewer than the specified number of replicas. The NameNode then replicates these

    blocks to other DataNodes.


    3.5 Data Organisation

    3.5.1 Data Blocks

    HDFS is designed to support very large files. Applications that are compatible with HDFS

    are those that deal with large data sets. These applications write their data only once but they

    read it one or more times and require these reads to be satisfied at streaming speeds. HDFS

    supports write-once-read-many semantics on files. A typical block size used by HDFS is 64

    MB. Thus, an HDFS file is chopped up into 64 MB chunks, and if possible, each chunk will

    reside on a different DataNode.

    3.5.2 Staging

    A client request to create a file does not reach the NameNode immediately. In fact, initially the HDFS client caches the file data into a temporary local file. Application writes are

    transparently redirected to this temporary local file. When the local file accumulates data

    worth over one HDFS block size, the client contacts the NameNode. The NameNode inserts

    the file name into the file system hierarchy and allocates a data block for it. The NameNode

    responds to the client request with the identity of the DataNode and the destination data

    block. Then the client flushes the block of data from the local temporary file to the specified

    DataNode. When a file is closed, the remaining un-flushed data in the temporary local file is

    transferred to the DataNode. The client then tells the NameNode that the file is closed. At this

    point, the NameNode commits the file creation operation into a persistent store. If the

    NameNode dies before the file is closed, the file is lost.

    The above approach has been adopted after careful consideration of target applications that

    run on HDFS. These applications need streaming writes to files. If a client writes to a remote

    file directly without any client side buffering, the network speed and the congestion in the

    network impacts throughput considerably. This approach is not without precedent. Earlier distributed file systems, e.g. AFS, have used client-side caching to improve performance. A

    POSIX requirement has been relaxed to achieve higher performance of data uploads.

    3.5.3 Replication Pipelining

    When a client is writing data to an HDFS file, its data is first written to a local file as

    explained in the previous section. Suppose the HDFS file has a replication factor of three.

    When the local file accumulates a full block of user data, the client retrieves a list of

    DataNodes from the NameNode. This list contains the DataNodes that will host a replica of


    that block. The client then flushes the data block to the first DataNode. The first DataNode

    starts receiving the data in small portions (4 KB), writes each portion to its local repository

    and transfers that portion to the second DataNode in the list. The second DataNode, in turn

    starts receiving each portion of the data block, writes that portion to its repository and then

    flushes that portion to the third DataNode. Finally, the third DataNode writes the data to its

    local repository. Thus, a DataNode can be receiving data from the previous one in the

    pipeline and at the same time forwarding data to the next one in the pipeline. Thus, the data is

    pipelined from one DataNode to the next.

    3.6 HDFS Features

    3.6.1 Communication Protocols

    All HDFS communication protocols are layered on top of the TCP/IP protocol. A client

    establishes a connection to a configurable TCP port on the NameNode machine. It talks the

    ClientProtocol with the NameNode. The DataNodes talk to the NameNode using the

    DataNode Protocol. A Remote Procedure Call (RPC) abstraction wraps both the Client Protocol and the DataNode Protocol. By design, the NameNode never initiates any RPCs.

    Instead, it only responds to RPC requests issued by DataNodes or clients.

    3.6.2 Data Disk Failure, Heartbeats and Re-Replication

    The primary objective of the HDFS is to store data reliably even in the presence of failures.

    The three common types of failures are NameNode failures, DataNodes failures and network

    partitions.

    The NameNode considers a DataNode alive as long as it receives a Heartbeat message (the default Heartbeat interval is three seconds) from the DataNode. If the NameNode does not receive a heartbeat from a DataNode within ten minutes, it considers the DataNode dead and stops forwarding IO requests to it. The NameNode then schedules the creation of new

    replicas of those blocks on other DataNodes.

    Heartbeats carry information about total storage capacity, fraction of storage in use, and the

    number of data transfers currently in progress. These statistics are used for the NameNode's

    space allocation and load balancing decisions. The NameNode can process thousands of

    heartbeats per second without affecting other NameNode operations.

    3.6.3 Cluster Rebalancing


    The HDFS architecture has data rebalancing schemes in which data is automatically moved

    from one DataNode to another if the free space threshold is reached. In the event of a sudden

    high demand for a particular file, a scheme might dynamically create additional replicas and

    rebalance other data in the cluster. These types of data rebalancing schemes are not yet

    implemented.

    3.6.4 Data Integrity

    A block of data can be corrupted for many reasons, such as network faults, buggy software or faults in a storage device. Therefore, at the time of file creation a checksum is computed and stored for each block. While retrieving a file, the data is first verified against those checksums, and if the verification fails, another replica of the data is used.

    3.6.5 Metadata Disk Failure

    A corrupted FsImage or EditLog may stop HDFS from functioning. For redundancy, the NameNode can be configured to keep multiple copies of these files, which are updated synchronously.

    3.7 Advantages and Disadvantages

    Advantages of the HDFS are:

    - Reliable storage: HDFS is a fault-tolerant storage system. It can store huge amounts of data, scale up incrementally and effectively handle the failure of significant parts of the storage infrastructure without losing data.

    - Commodity hardware: HDFS is designed to run on highly unreliable hardware and so is less expensive compared to other fault-tolerant storage systems.

    - Distributed: HDFS data are distributed over many nodes in a cluster, so parallel analyses are possible and the bottlenecks imposed by monolithic storage systems are eliminated.

    - Availability: Block replication is one of the main features of HDFS. By default each block is replicated by the client to three DataNodes, but the replication factor can be configured per file at creation time. Because of replication, HDFS provides high availability of data in high demand.

    Limitations of HDFS are:

    - Architectural bottlenecks: There are scheduling delays in the Hadoop architecture that result in cluster nodes waiting for new tasks. Moreover, the disk is not used in a streaming manner; the access pattern is periodic. The HDFS client serializes computation and I/O instead of decoupling and pipelining those operations.

    - Portability limitations: Being implemented in Java, HDFS cannot exploit some performance-enhancing features of the native filesystem.

    - Small files: HDFS is not efficient for large numbers of small files.

    - Single master node: There might be a risk of data loss because of the single NameNode.

    - Latency of data access: HDFS delivers a high throughput of data at the expense of latency. If an application needs low-latency access to data, then HDFS is not a good choice.


    CHAPTER 4

    HADOOP MAPREDUCE

    4.1 Introduction

    Hadoop MapReduce is a software framework for easily writing applications which process

    vast amounts of data (multi-terabyte data-sets) in-parallel on large clusters (thousands of

    nodes) of commodity hardware in a reliable, fault-tolerant manner [8].

    A MapReduce job usually splits the input data-set into independent chunks which are

    processed by the map tasks in a completely parallel manner. The framework sorts the outputs

    of the maps, which are then input to the reduce tasks. Typically both the input and the output

    of the job are stored in a file-system. The framework takes care of scheduling tasks,

    monitoring them and re-executes the failed tasks.

    Typically the compute nodes and the storage nodes are the same, that is, the MapReduce

    framework and the Hadoop Distributed File System are running on the same set of nodes.

    This configuration allows the framework to effectively schedule tasks on the nodes where

    data is already present, resulting in very high aggregate bandwidth across the cluster.

    The MapReduce framework consists of a single master JobTracker and one slave TaskTracker per cluster-node. The master is responsible for scheduling the jobs'

    component tasks on the slaves, monitoring them and re-executing the failed tasks. The slaves

    execute the tasks as directed by the master.

    Minimally, applications specify the input/output locations and

    supply map and reduce functions via implementations of appropriate interfaces and/or

    abstract-classes. These, and other job parameters, comprise the job configuration. The

    Hadoop job client then submits the job (jar/executable etc.) and configuration to

    the JobTracker, which then assumes the responsibility of distributing the

    software/configuration to the slaves, scheduling tasks and monitoring them, providing status

    and diagnostic information to the job-client.

    4.2 Inputs and Outputs

    The MapReduce framework operates exclusively on <key, value> pairs, that is, the framework views the input to the job as a set of <key, value> pairs and produces a set of <key, value> pairs as the output of the job, conceivably of different types.


    The key and value classes have to be serializable by the framework and hence need to

    implement the Writable interface. Additionally, the key classes have to implement

    the WritableComparable interface to facilitate sorting by the framework.

    Input and Output types of a MapReduce job:

    (input) <k1, v1> -> map -> <k2, v2> -> combine -> <k2, v2> -> reduce -> <k3, v3> (output)

    Fig. 4.1: Map Reduce Framework

    4.3 User Interfaces

    4.3.1 Mapper

    Mapper maps input key/value pairs to a set of intermediate key/value pairs.

    Maps are the individual tasks that transform input records into intermediate records. The

    transformed intermediate records do not need to be of the same type as the input records. A

    given input pair may map to zero or many output pairs.
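    As an illustration of the interface (a sketch, not the project's code; the input format of "userID,bookID,rating" lines is an assumption), a mapper using the classic org.apache.hadoop.mapred API could look like this:

        import java.io.IOException;
        import org.apache.hadoop.io.IntWritable;
        import org.apache.hadoop.io.LongWritable;
        import org.apache.hadoop.io.Text;
        import org.apache.hadoop.mapred.MapReduceBase;
        import org.apache.hadoop.mapred.Mapper;
        import org.apache.hadoop.mapred.OutputCollector;
        import org.apache.hadoop.mapred.Reporter;

        public class RatingCountMapper extends MapReduceBase
                implements Mapper<LongWritable, Text, Text, IntWritable> {

            private static final IntWritable ONE = new IntWritable(1);

            public void map(LongWritable key, Text value,
                            OutputCollector<Text, IntWritable> output, Reporter reporter)
                    throws IOException {
                // Assumed input line: "userID,bookID,rating"; emit (bookID, 1) for each rating seen.
                String[] fields = value.toString().split(",");
                if (fields.length == 3) {
                    output.collect(new Text(fields[1]), ONE);
                }
            }
        }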

    4.3.2 Reducer

    Reducer reduces a set of intermediate values which share a key to a smaller set of values.


    The number of reduces for the job is set by the user via JobConf.setNumReduceTasks(int).

    Overall, Reducer implementations are passed the JobConf for the job via

    the JobConfigurable.configure(JobConf) method and can override it to initialize themselves.

    The framework then calls the reduce(WritableComparable, Iterator, OutputCollector, Reporter) method for each <key, (list of values)> pair in the grouped inputs. Applications can

    then override the Closeable.close() method to perform any required cleanup.

    Reducer has 3 primary phases: shuffle, sort and reduce.

    Shuffle

    Input to the Reducer is the sorted output of the mappers. In this phase the framework fetches

    the relevant partition of the output of all the mappers, via HTTP.

    Sort

    The framework groups Reducer inputs by keys (since different mappers may have output the

    same key) in this stage.

    The shuffle and sort phases occur simultaneously; while map-outputs are being fetched they

    are merged.

    Secondary Sort

    If equivalence rules for grouping the intermediate keys are required to be different from those

    for grouping keys before reduction, then one may specify a Comparator via

    JobConf.setOutputValueGroupingComparator(Class).

    Since JobConf.setOutputKeyComparatorClass(Class) can be used to control how

    intermediate keys are grouped, these can be used in conjunction to simulate secondary sort

    on values.

    Reduce

    In this phase the reduce(WritableComparable, Iterator, OutputCollector, Reporter) method is

    called for each <key, (list of values)> pair in the grouped inputs.

    The output of the reduce task is typically written to

    the FileSystem via OutputCollector.collect(WritableComparable, Writable).

    Applications can use the Reporter to report progress, set application-level status messages

    and update Counters, or just indicate that they are alive.


    The output of the Reducer is not sorted.
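    Continuing the illustrative example sketched in 4.3.1, a matching reducer that sums the per-book counts could be written as follows (a sketch, not the project's code):

        import java.io.IOException;
        import java.util.Iterator;
        import org.apache.hadoop.io.IntWritable;
        import org.apache.hadoop.io.Text;
        import org.apache.hadoop.mapred.MapReduceBase;
        import org.apache.hadoop.mapred.OutputCollector;
        import org.apache.hadoop.mapred.Reducer;
        import org.apache.hadoop.mapred.Reporter;

        public class RatingCountReducer extends MapReduceBase
                implements Reducer<Text, IntWritable, Text, IntWritable> {

            public void reduce(Text key, Iterator<IntWritable> values,
                               OutputCollector<Text, IntWritable> output, Reporter reporter)
                    throws IOException {
                int sum = 0;
                while (values.hasNext()) {
                    sum += values.next().get();            // add up the 1s emitted by the mapper
                }
                output.collect(key, new IntWritable(sum)); // (bookID, total number of ratings)
            }
        }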

    4.3.3 Partitioner

    Partitioner partitions the key space.

    Partitioner controls the partitioning of the keys of the intermediate map-outputs. The key (or

    a subset of the key) is used to derive the partition, typically by a hash function. The total

    number of partitions is the same as the number of reduce tasks for the job. Hence this controls

    which of the m reduce tasks the intermediate key (and hence the record) is sent to for

    reduction.
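A custom Partitioner is a small class; the hash-based sketch below mirrors the default behaviour and is shown only to make the interface concrete (the class name is hypothetical).

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.Partitioner;

    // Hypothetical partitioner: routes each intermediate (userID, value) pair to a
    // reduce task chosen by hashing the key, much like the default hash partitioning.
    public class UserIdPartitioner implements Partitioner<Text, IntWritable> {

      public void configure(JobConf job) {
        // no configuration is needed for this sketch
      }

      public int getPartition(Text key, IntWritable value, int numReduceTasks) {
        // mask the sign bit so the partition index is always non-negative
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
      }
    }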

    4.3.4 Reporter

    Reporter is a facility for MapReduce applications to report progress, set application-level

    status messages and update Counters.

    Mapper and Reducer implementations can use the Reporter to report progress or just indicate

    that they are alive. In scenarios where the application takes a significant amount of time to

    process individual key/value pairs, this is crucial since the framework might assume that the

    task has timed-out and kill that task. Another way to avoid this is to set the configuration

parameter mapred.task.timeout to a high-enough value (or even set it to zero for no time-outs).
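The sketch below, which is illustrative only (the counter enum and status message are invented for the example), shows how a long-running map task might use the Reporter:

    import java.io.IOException;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.Mapper;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reporter;

    // Hypothetical mapper that keeps the framework informed while doing slow per-record work.
    public class SlowParsingMapper extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, Text> {

      enum ParseCounters { GOOD_RECORDS, BAD_RECORDS }

      public void map(LongWritable key, Text value,
                      OutputCollector<Text, Text> output, Reporter reporter)
          throws IOException {
        reporter.setStatus("processing offset " + key.get());  // application-level status message
        String line = value.toString();
        if (line.isEmpty()) {
          reporter.incrCounter(ParseCounters.BAD_RECORDS, 1);  // update a Counter
          return;
        }
        // ... expensive per-record processing would go here ...
        reporter.progress();                                   // tell the framework the task is alive
        reporter.incrCounter(ParseCounters.GOOD_RECORDS, 1);
        output.collect(new Text(line), new Text("ok"));
      }
    }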

    4.3.5 Output Collector

    OutputCollector is a generalization of the facility provided by the MapReduce framework to

    collect data output by the Mapper or the Reducer (either the intermediate outputs or the

    output of the job).

    4.4 Job Configuration

    JobConf represents a MapReduce job configuration.

    JobConf is the primary interface for a user to describe a MapReduce job to the Hadoop

    framework for execution. The framework tries to faithfully execute the job as described

    by JobConf, however, while some job parameters are straight-forward to set

    (e.g. setNumReduceTasks(int)), other parameters interact subtly with the rest of the

    framework and/or job configuration and are more complex to set

    (e.g. setNumMapTasks(int)).


JobConf is typically used to specify the Mapper, combiner (if any), Partitioner, Reducer, InputFormat, OutputFormat and OutputCommitter implementations. JobConf also indicates the set of input files (setInputPaths(JobConf, Path...) / addInputPath(JobConf, Path) and setInputPaths(JobConf, String) / addInputPaths(JobConf, String)) and where the output files should be written (setOutputPath(Path)).

    Fig 4.2: Task Environment
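To make the JobConf description concrete, a minimal job driver using these calls might look like the sketch below. It reuses the hypothetical RatingCountMapper and RatingCountReducer sketched earlier in this chapter, and the paths are simply taken from the command-line arguments; it is not the project's actual driver.

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.FileOutputFormat;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;

    // Hypothetical driver: configures and submits the rating-count job described above.
    public class RatingCountDriver {
      public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(RatingCountDriver.class);
        conf.setJobName("rating-count");

        conf.setMapperClass(RatingCountMapper.class);
        conf.setCombinerClass(RatingCountReducer.class);   // summing is safe to run as a combiner
        conf.setReducerClass(RatingCountReducer.class);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);
        conf.setNumReduceTasks(2);

        FileInputFormat.setInputPaths(conf, new Path(args[0]));   // input files
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));  // output directory

        JobClient.runJob(conf);   // submit the job and wait for completion
      }
    }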

    4.5 MapReduce Features

Counters

Counters represent global counters, defined either by the Map/Reduce framework or applications. Each Counter can be of any Enum type. Counters of a particular Enum are bunched into groups of type Counters.Group.

DistributedCache

DistributedCache distributes application-specific, large, read-only files efficiently.


    DistributedCache is a facility provided by the Map/Reduce framework to cache files (text,

    archives, jars and so on) needed by applications.

    Applications specify the files to be cached via urls (hdfs://) in the JobConf.

    The DistributedCache assumes that the files specified via hdfs:// urls are already present

    on the FileSystem.

    The framework will copy the necessary files to the slave node before any tasks for the job

    are executed on that node. Its efficiency stems from the fact that the files are only copied

    once per job and the ability to cache archives which are un-archived on the slaves.

    DistributedCache tracks the modification timestamps of the cached files. Clearly the cache

    files should not be modified by the application or externally while the job is executing.

DistributedCache can be used to distribute simple, read-only data/text files and more complex types such as archives and jars. Archives (zip, tar, tgz and tar.gz files) are un-archived at the slave nodes. Files have execution permissions set.
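A hedged sketch of how a job might use the DistributedCache is shown below; the HDFS path and class name are assumed examples, not files used in the project.

    import java.net.URI;

    import org.apache.hadoop.filecache.DistributedCache;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.JobConf;

    // Sketch: registering a read-only lookup file at job-submission time and locating
    // its local copy from inside a task.
    public class CacheExample {

      public static void addLookupFile(JobConf conf) throws Exception {
        // the file is assumed to already exist on HDFS, as required above
        DistributedCache.addCacheFile(new URI("hdfs://master:54410/cache/book-genres.txt"), conf);
      }

      public static Path firstCachedFile(JobConf conf) throws Exception {
        // typically called from a Mapper's or Reducer's configure(JobConf) method
        Path[] localFiles = DistributedCache.getLocalCacheFiles(conf);
        return (localFiles != null && localFiles.length > 0) ? localFiles[0] : null;
      }
    }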

Tool

The Tool interface supports the handling of generic Hadoop command-line options. Tool is the standard for any Map/Reduce tool or application. The application should delegate the handling of standard command-line options to GenericOptionsParser via ToolRunner.run(Tool, String[]) and only handle its custom arguments.

IsolationRunner

IsolationRunner is a utility to help debug Map/Reduce programs. To use the IsolationRunner, first set keep.failed.task.files to true.

Profiling

Profiling is a utility to get a representative sample (2 or 3) of built-in Java profiler output for a sample of maps and reduces. The user can specify whether the system should collect profiler information for some of the tasks in the job by setting the configuration property mapred.task.profile. The value can be set using the API JobConf.setProfileEnabled(boolean). If the value is set to true, task profiling is enabled. The profiler information is stored in the user log directory. By default, profiling is not enabled for the job.


Debugging

The Map/Reduce framework provides a facility to run user-provided scripts for debugging. When a map/reduce task fails, the user can run a script for doing post-processing on the task logs, i.e. the task's stdout, stderr, syslog and jobconf. The stdout and stderr of the user-provided debug script are printed on the diagnostics. These outputs are also displayed on the job UI on demand.

To submit a debug script along with the job, the script first has to be distributed and then supplied in the job configuration.

JobControl

JobControl is a utility which encapsulates a set of Map/Reduce jobs and their dependencies.

Data Compression

Hadoop Map/Reduce provides facilities for the application writer to specify compression for both intermediate map-outputs and the job-outputs, i.e. the output of the reduces. It also comes bundled with CompressionCodec implementations for the zlib and lzo compression algorithms.

Intermediate Outputs

Applications can control compression of intermediate map-outputs via the JobConf.setCompressMapOutput(boolean) API, and the CompressionCodec to be used via the JobConf.setMapOutputCompressorClass(Class) API.

Job Outputs

Applications can control compression of job-outputs via the FileOutputFormat.setCompressOutput(JobConf, boolean) API, and the CompressionCodec to be used can be specified via the FileOutputFormat.setOutputCompressorClass(JobConf, Class) API.
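For illustration, both kinds of compression could be enabled in a job driver roughly as follows; the choice of GzipCodec is an assumption, since the report does not state which codec, if any, was used.

    import org.apache.hadoop.io.compress.GzipCodec;
    import org.apache.hadoop.mapred.FileOutputFormat;
    import org.apache.hadoop.mapred.JobConf;

    // Sketch: enabling compression of intermediate map-outputs and of the final job output.
    public class CompressionConfig {
      public static void enableCompression(JobConf conf) {
        // intermediate map-outputs
        conf.setCompressMapOutput(true);
        conf.setMapOutputCompressorClass(GzipCodec.class);

        // job outputs written by the reduces
        FileOutputFormat.setCompressOutput(conf, true);
        FileOutputFormat.setOutputCompressorClass(conf, GzipCodec.class);
      }
    }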


    CHAPTER 5

    ITEM BASED RECOMMENDER ALGORITHM

    5.1 Algorithm

    Given a set of items that someone is known to like, recommend a new set of items that are

    similar to them. Two items can be regarded as similar to one another if different people like

    both of them. In terms of pseudo code [9]:

for every item i that u has no preference for yet

    for every item j that u has a preference for

        compute a similarity s between i and j

        add u's preference for j, weighted by s, to a running average

return the top items, ranked by weighted average
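To clarify the weighted-average step, the sketch below shows an in-memory, single-machine version of the scoring loop; it is only an illustration of the idea, not the distributed Mahout implementation used in the project, and the class and method names are invented for the example.

    import java.util.HashMap;
    import java.util.Map;

    // In-memory sketch of the pseudo code above. 'userPrefs' maps item j to user u's
    // rating for j, and 'similarity' supplies s(i, j) for any pair of items.
    public final class ItemBasedScorer {

      public interface SimilarityFunction {
        double similarity(long itemA, long itemB);
      }

      public static Map<Long, Double> score(Map<Long, Double> userPrefs,
                                            Iterable<Long> candidateItems,
                                            SimilarityFunction similarity) {
        Map<Long, Double> scores = new HashMap<Long, Double>();
        for (Long i : candidateItems) {
          if (userPrefs.containsKey(i)) {
            continue;                               // skip items u already has a preference for
          }
          double weightedSum = 0.0;
          double similaritySum = 0.0;
          for (Map.Entry<Long, Double> pref : userPrefs.entrySet()) {
            double s = similarity.similarity(i, pref.getKey());
            weightedSum += s * pref.getValue();     // u's preference for j, weighted by s
            similaritySum += Math.abs(s);
          }
          if (similaritySum > 0.0) {
            scores.put(i, weightedSum / similaritySum);  // the running weighted average
          }
        }
        return scores;                              // the caller ranks these and keeps the top items
      }
    }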

    5.2 Logical Parts of Code

    The code is structured so that it breaks the recommender into four logical parts:

1. Prepare the preference matrix
2. Generate the similarity matrix
3. Prepare for matrix multiplication
4. Multiply matrices

    5.2.1 Preparation of preference matrix

In this step, three MapReduce jobs are executed:

1. ItemIDIndexMapper and ItemIDIndexReducer map item IDs in the input data from Long types to Integer types.

2. ToItemPrefsMapper and ToUserVectorsReducer generate the user-item vectors.

3. ToItemVectorsMapper and ToItemVectorsReducer generate the item-user vectors.


    5.2.2 Generation of similarity matrix

In this step, three MapReduce jobs are executed:

1. VectorNormMapper and MergeVectorsReducer read in the item-user vectors and output them as user-item vectors, calculating statistical matrix properties in the process. These statistics are later employed in the matrix calculations.

2. CooccurrencesMapper and SimilarityReducer read in the user-item vectors from the previous step and create half of an item-item similarity matrix. Only half of the matrix is generated at this stage as it is symmetric along its diagonal, so the other half can be inferred. The matrix also lacks values along its diagonal.

3. UnsymmetrifyMapper and MergeToTopKSimilaritiesReducer read half of the item-item similarity matrix and generate an almost complete one, missing only the values along its diagonal.

    5.2.3 Preparation for matrix multiplication

With both the item-item similarity matrix and the user-item vectors, it is now possible to multiply them together and generate recommendations for users. In this part of the code, the matrix multiplication begins, again, by implementing three MapReduce jobs.

1. SimilarityMatrixRowWrapperMapper and the default Reducer complete the item-item similarity matrix by adding in Double.NaN values along its diagonal. In preparation for the item-item matrix being multiplied with the item-user vectors, it converts the matrix from VectorWritable to VectorOrPrefWritable.


2. UserVectorSplitterMapper and Reducer read in user-item vectors and output them as item-user vectors, formatted as VectorOrPrefWritable, to make them compatible for multiplication with the item-item matrix.

3. The default Mapper and ToVectorAndPrefReducer combine the output from the previous two steps, thus starting the multiplication of the item-item similarity matrix with the item-user vectors.

    5.2.4 Matrix multiplication

Now that the item-item similarity matrix and the item-user vectors have been combined together into a single source of data, the matrix multiplication can be completed and user recommendations generated. This process is implemented in a single MapReduce job.

1. PartialMultiplyMapper and AggregateAndRecommendReducer complete the matrix multiplication, producing item recommendations for users.


    CHAPTER 6

SYSTEM PREPARATION AND IMPLEMENTATION

    6.1 Runtime Environment

The Hadoop engine was deployed over a 2-node cluster, and the Java runtime was set up to use the common Hadoop configuration, as specified by the NameNode (master node) of the cluster.

    6.2 Software and Language Versions

Hadoop 1.2.1
Java 1.6
Maven 3
Mahout 0.20

    6.3 Hardware Specification of each Hadoop Node

All nodes in the Hadoop cluster have identical hardware specifications. Table 6.1 lists the specification of each node.

    Operating System Ubuntu 12.04 LTS (64 bit)

    Processor Intel Core i3 (Quad Core)

    Memory 3GB

    Disk Space 160GB

    Table 6.1 : Hardware Specification

    6.4 Hadoop Configuration

    /etc/hosts

    The /etc/hosts file in each machine contains the IP address of the nodes and their labels. In

    this case the labels assigned were master and slave.

    hadoop-env.sh

The only environment variable that must be configured for Hadoop is JAVA_HOME.

    core-site.xml


The following configurations are added:

<property>
  <name>hadoop.tmp.dir</name>
  <value>/app/hadoop/tmp</value>
  <description>A base for other temporary directories.</description>
</property>

<property>
  <name>fs.default.name</name>
  <value>hdfs://master:54410</value>
  <description>The name of the default file system. A URI whose scheme and authority determine the FileSystem implementation. The uri's scheme determines the config property (fs.SCHEME.impl) naming the FileSystem implementation class. The uri's authority is used to determine the host, port, etc. for a filesystem.</description>
</property>

    mapred-site.xml

The following configurations are added:

<property>
  <name>mapred.job.tracker</name>
  <value>localhost:54411</value>
  <description>The host and port that the MapReduce job tracker runs at. If "local", then jobs are run in-process as a single map and reduce task.</description>
</property>

    hdfs-site.xml

The following configurations are added:

<property>
  <name>dfs.replication</name>
  <value>1</value>
  <description>Default block replication. The actual number of replications can be specified when the file is created. The default is used if replication is not specified in create time.</description>
</property>

    masters

This file contains the label of the master. In this case, master is added to the file.

    slaves


This file contains the labels of the slaves. In this case, slave is added to the file.

    6.5 Starting the cluster

Starting the cluster is performed in three steps.

    6.5.1 Formatting the Namenode

    This initialises the namespace and the namenode itself.

    Fig. 6.1: Formatting namenode

    6.5.2 Starting HDFS daemons

    The NameNode daemon is started on master, and DataNode daemons are started on the slave.

    Fig. 6.2: HDFS daemons

6.5.3 Starting mapred daemons

The JobTracker is started on master, and TaskTracker daemons are started on the slave.


    Fig. 6.3: Mapred daemons

6.5.4 Starting the Job

The jar file containing the MapReduce jobs is specified as an argument to the hadoop command.

    Fig. 6.4: Starting the job
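As an alternative to the command-line invocation shown above, the Mahout item-based recommender job can also be launched programmatically through the Tool interface. The sketch below is illustrative only: the input and output paths, the number of recommendations and the choice of SIMILARITY_COOCCURRENCE are assumptions, not the project's exact invocation.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.ToolRunner;
    import org.apache.mahout.cf.taste.hadoop.item.RecommenderJob;

    // Sketch: launching the Mahout RecommenderJob from Java instead of the hadoop command.
    public class LaunchRecommender {
      public static void main(String[] args) throws Exception {
        String[] jobArgs = {
            "--input", "/user/hduser/input/ratings.csv",        // assumed HDFS input path
            "--output", "/user/hduser/output/recommendations",  // assumed HDFS output path
            "--similarityClassname", "SIMILARITY_COOCCURRENCE", // one of the metrics compared in Chapter 7
            "--numRecommendations", "10"
        };
        int exitCode = ToolRunner.run(new Configuration(), new RecommenderJob(), jobArgs);
        System.exit(exitCode);
      }
    }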

    6.6 Monitoring the cluster

The web interfaces provided by Hadoop can be used to check the status of the jobs. The status of the NameNode is shown by the following web interface.

    Fig. 6.5: Namenode interface

    Shown below is the interface for JobTracker in the master machine. It displays the number of

    tasks that are running and their other details.


    Fig. 6.6: JobTracker interface

    Fig. 6.7: Scheduling interface 1


    Fig. 6.8: Scheduling interface 2


    CHAPTER 7

    RESULTS AND CONCLUSION

    7.1 Results

    7.1.1 Run Time

The recommendation job was run 5 times with each of the item similarity metrics, namely similarity co-occurrence, similarity Loglikelihood, Tanimoto coefficient, Euclidian distance and Pearson coefficient.

The run time of each run was measured using the Linux time command, which gives the real, user and sys times taken for the command to execute. The results (time in seconds) are summarised below.

Metric / Run no.             1         2         3         4         5         Average

Similarity Co-occurrence     566.740   564.924   566.108   565.098   564.982   565.570

Tanimoto Coefficient         589.650   590.472   588.847   589.241   589.594   589.548

Similarity Loglikelihood     704.074   705.012   704.725   704.104   705.021   704.586

Euclidian Distance           588.212   589.027   588.624   587.984   588.149   588.499

Pearson Coefficient          441.286   440.956   442.012   441.459   441.427   441.428

    Table 7.1: Time Taken


    Fig 7.1: Average Run Time

    7.1.2 Recommendations

    The Book Crossing dataset was used for the recommender job. It contained user

    details, book ratings and book details such as name, genre, etc.

A Python script was written which takes the ratings file, users file and book details file as parameters and displays, in the console, the rated books and the recommended books for the user specified by the user ID in the script.

The rated books are displayed with the book name followed by the rating given by the user.

The recommendations were made based on the similarity metric specified for the job.



(a) Similarity Co-occurrence

    Fig. 7.2: Similarity Co-occurrence Recommendations

    (b) Tanimoto Coefficient

    Fig. 7.3: Tanimoto Recommendations


    (c) Similarity Loglikelihood

    Fig. 7.4: Loglikelihood Recommendations

    (d) Euclidian Distance

    Fig. 7.5: Euclidian Recommendations


    (e) Pearson Coefficient

    Fig. 7.6: Pearson Recommendations

    7.2 Conclusion

    The recommender job was successfully run on the Hadoop distributed environment.

    The following conclusions can be drawn from the results above.

The time taken by Loglikelihood is the greatest. This is because it eliminates intersections that occur by chance, by counting both the number of times the items occur together and the number of times they do not. This increases the relevance of the recommendations but also increases the overhead.

Though the Pearson correlation takes the least time, it is used only theoretically. It has a few drawbacks in principle which make its recommendations less accurate. First, it doesn't take into account the number of items in which two users' preferences overlap, which is probably a weakness in the context of recommender engines. Two items that have been read by 100 of the same users, for instance, even if the users don't often agree on ratings, are probably more similar than two items which have only been read by two users in common. This is not considered by this metric, and the similarity of the two user ratings outweighs the 100 common users. Second, if two items overlap on only one user, no correlation can be computed because of how the computation is defined. This could be an issue for small or sparse data sets, in which users' item sets rarely overlap. Finally, the correlation is also undefined if either series of preference values is entirely identical; it does not require that both series have all the same preference values.

The run times of Euclidian distance, similarity co-occurrence and Tanimoto coefficient were nearly equal, lying between those of Loglikelihood and Pearson coefficient.


    CHAPTER 8

    FUTURE SCOPE

The system implemented in the project uses static data to recommend books to the users. To incorporate dynamic data, distributed databases such as HBase or Cassandra can be used, which can be regularly updated to add new users and ratings. To build a web application, the data needs to be accessible in real time; the solution to this, too, can be the use of a distributed database.

The recommender system can be improved by combining user-based collaborative filtering and content-based filtering with the current system. This combination is also called hybrid filtering, and it can significantly improve performance.

    The comparison made between the different similarity metrics was based on the run time and

    not on the precision of the recommendations.


    CHAPTER 9

    REFERENCES

[1] A. Felfernig, G. Friedrich and L. Schmidt-Thieme, Recommender Systems, IEEE Intelligent Systems, pp 18-21, 2007.

[2] P. Resnick, N. Iacovou, M. Suchak and J. Riedl, GroupLens: An Open Architecture for Collaborative Filtering of Netnews, in Proceedings of CSCW '94, Chapel Hill, NC, 1994.

[3] U. Shardanand and P. Maes, Social Information Filtering: Algorithms for Automating "Word of Mouth", in Proceedings of CHI '95, Denver, 1995.

[4] Daniar Asanov, Algorithms and Methods in Recommender Systems, Berlin Institute of Technology, Berlin, 2011.

[5] Badrul Sarwar, George Karypis and John Riedl, Item-Based Collaborative Filtering Recommendation Algorithms, IW3C2: Hong Kong, China, 2001 [Online]. http://wwwconference.org/www10/cdrom/papers/519/index.html

[6] Francesco Ricci, Lior Rokach and Bracha Shapira, Introduction to Recommender Systems Handbook, New York: Springer Science+Business Media Ltd, 2011, ch. 1, sec. 1.4, pp 10-14.

[7] D. Borthakur, HDFS Architecture Guide, Hadoop Apache Project, 2008 [Online]. http://hadoop.apache.org/common/docs/current/hdfsdesign.pdf

[8] J. Dean and S. Ghemawat, MapReduce: Simplified Data Processing on Large Clusters, Communications of the ACM 51.1, 2008, pp 107-113.

[9] G. Linden, B. Smith and J. York, Amazon.com Recommendations: Item-to-Item Collaborative Filtering, IEEE Internet Computing 7.1, 2003, pp 76-80.
