7/22/2019 Book Recommender System using Hadoop
Project Report
On
BOOK RECOMMENDER SYSTEM USING
HADOOP
By
Ankit Kumar Das (04710402710)
Pranay Khatri (03010402710)
Shreya Tripathi (05110407210)
Submitted
in partial fulfillment for the award of the degree of
Bachelor of Technology
in
Computer Science & Engineering
Under the Guidance of
Mrs Shaveta Tatwani
Department of Computer Science and Engineering
Amity School of Engineering and Technology
(Affiliated to G.G.S.I.P.University )
New Delhi - 110061
CERTIFICATE
This is to certify that the Project Report titled
Book Recommender System using Hadoop
Done by
Ankit Kumar Das (04710402710)
Pranay Khatri (03010402710)
Shreya Tripathi (05110402710)
is an authentic work carried out by them under my guidance at:
AMITY SCHOOL OF ENGINEERING AND TECHNOLOGY, Bijwasan
The matter embodied in this project report has not been submitted earlier for the award of any
degree or diploma to the best of my knowledge and belief.
Mrs. Shaveta Tatwani Prof. M.N.Gupta
Asst. Professor Head of Department
Dept. of CSE/IT Dept. of CSE/IT
ACKNOWLEDGEMENT
First and foremost, our acknowledgements are due to Prof. B.P. Singh, Senior Director, and
Prof. Rekha Aggarwal, Director, for the education under their guidance, which has provided
strong fundamentals, positive competition and an unmatched learning experience.
We express our gratitude to our esteemed Professor M.N. Gupta (Head, Department of CSE &
IT) and to our guide, Asst. Professor Mrs. Shaveta Tatwani. Their inspiration, motivation,
suggestions and invaluable guidance enabled us to develop this minor project. Their careful
observations and precise ideas were an immense help in refining our content.
Ankit Kumar Das Pranay Khatri
(047/B.TECH(CSE)/ASET/14) (030/B.TECH(CSE)/ASET/14)
Shreya Tripathi
(051/B.TECH(CSE)/ASET/14)
ABSTRACT
Recommender systems are a new generation of internet tools that help users navigate
information on the internet and receive information related to their preferences. Although
recommender systems are most often applied in online shopping and in entertainment
domains such as movies and music, their applicability is being researched in other areas as
well. This report presents an overview of the recommender systems currently working in the
domain of online book shopping. It also proposes a new book recommender system that
combines a user's choices not only with those of similar users but with those of other users
as well, to give diverse recommendations that change over time. The overall architecture of
the proposed system is presented and its implementation with a prototype design is
described. Lastly, the report presents an empirical evaluation of the system based on a
survey reflecting the impact of such diverse recommendations on user choices.
TABLE OF CONTENTS
Certificate ... i
Acknowledgement ... ii
Abstract ... iii
Table of Contents ... iv
List of Figures ... vi
List of Tables ... vii
1. Introduction ... 1
1.1 Recommender Systems
1.2 Apache Hadoop
2. Recommender Systems ... 2
2.1 Introduction
2.2 Content based Filtering
2.3 Collaborative Filtering
2.3.1 User based Collaborative Filtering
2.3.2 Item based Collaborative Filtering
2.4 Similarity Measures
2.4.1 Cosine based Similarity
2.4.2 Pearson Correlation based Similarity
2.4.3 Co-occurrence based Similarity
2.4.4 Euclidean Distance based Similarity
2.4.5 Tanimoto Coefficient based Similarity
2.4.6 Log Likelihood based Similarity
2.5 Challenges and Issues
3. Hadoop Distributed File System ... 11
3.1 Overview
3.2 Assumptions and Goals
3.3 Architecture
3.3.1 Namenode
3.3.1.1 Checkpoint Node
3.3.1.2 Backup Node
3.3.2 DataNode
3.3.3 HDFS Client
3.4 Data Replication
3.4.1 Replica Placement
3.4.2 Replica Selection
3.4.3 Replica Selection
3.5 Data Organisation
3.5.1 Data Blocks
3.5.2 Staging
3.5.3 Replication and Pipelining
3.6 HDFS Features
3.6.1 Communication Protocols
3.6.2 Data Disk Failure, Heartbeats and Re-replication
3.6.3 Cluster Rebalancing
3.6.4 Data Integrity
3.6.5 Metadata Disk Failure
3.7 Advantages and Disadvantages
4. Hadoop MapReduce ... 24
4.1 Introduction
4.2 Inputs and Outputs
4.3 User Interfaces
4.3.1 Mapper
4.3.2 Reducer
4.3.3 Partitioner
4.3.4 Reporter
4.3.5 Output Collector
4.4 Job Configuration
4.5 MapReduce Features
5. Item Based Recommendation Algorithm ... 30
5.1 Algorithm
5.2 Logical Parts of Code
5.2.1 Preparation of Preference Matrix
5.2.2 Generation of Similarity Matrix
5.2.3 Preparation for Matrix Multiplication
5.2.4 Matrix Multiplication
6. System Preparation and Implementation ... 33
6.1 Runtime Environment
6.2 Software and Language Versions
6.3 Hardware Specification of each Hadoop Node
6.4 Hadoop Configuration
6.5 Starting the Cluster
6.5.1 Formatting the Namenode
6.5.2 Starting HDFS daemons
6.5.3 Starting mapred daemons
6.5.4 Starting the Job
6.6 Monitoring the Cluster
7. Results and Conclusion ... 40
7.1 Results
7.1.1 Run Time
7.1.2 Recommendations
7.2 Conclusions
8. Future Scope ... 46
9. References ... 47
LIST OF FIGURES
Figure no. Name Page no.
2.1 User based Collaborative Filtering 5
2.2 Item based Collaborative Filtering 5
2.3 Item-Item Similarity 6
2.4 Tanimoto Coefficient 8
3.1 HDFS Architecture 13
3.2 Reading a File 16
3.3 Writing to a file 17
3.4 Block Replication 18
4.1 MapReduce Framework 25
4.2 Task Environment 28
6.1 Formatting Namenode 36
6.2 HDFS daemons 36
6.3 Mapred daemons 37
6.4 Starting Job 37
6.5 Namenode Interface 37
6.6 JobTracker Interface 38
6.7 Scheduling Interface 1 38
6.8 Scheduling Interface 2 39
7.1 Average Run Time 41
7.2 Similarity Co-occurrence Recommendations 42
7.3 Tanimoto Recommendations 42
7.4 Log-likelihood Recommendations 43
7.5 Euclidean Distance Recommendations 43
7.6 Pearson Coefficient Recommendations 44
LIST OF TABLES
Table no. Name Page no.
2.1 User Ratings 4
7.1 Time Taken 40
CHAPTER 1
INTRODUCTION
1.1 Recommender Systems
Recommender systems are widespread tools that are employed by a wide range of
organisations and companies for recommending items such as movies, books and even
employees for projects. But with the advent of big data it has become difficult to process the
large amount of data for recommendations. Due to this reason, Apache Hadoop is employed
for scalability, reliability and faster processing.
Recommender systems (sometimes replacing "system" with a synonym such as platform or
engine) are a subclass of information filtering systems that seek to predict the 'rating' or
'preference' that a user would give to an item. Recommender systems have become extremely
common in recent years and are applied in a variety of applications. The most popular are
probably movies, music, news, books, research articles, search queries, social tags, and
products in general.
1.2 Apache Hadoop
The Apache Hadoop software library is a framework that allows for the distributed
processing of large data sets across clusters of computers using simple programming models.
It is designed to scale up from single servers to thousands of machines, each offering local
computation and storage. Rather than rely on hardware to deliver high-availability, the library
itself is designed to detect and handle failures at the application layer, so delivering a highly-
available service on top of a cluster of computers, each of which may be prone to failures [1].
The project includes the following modules:
Hadoop Common: The common utilities that support the other Hadoop modules.
Hadoop Distributed File System (HDFS): A distributed file system that provides
high-throughput access to application data.
Hadoop MapReduce: A YARN-based system for parallel processing of large datasets.
CHAPTER 2
RECOMMENDER SYSTEMS
2.1 Introduction
Recommender Systems (RSs) are software tools and techniques providing suggestions for
items to be of use to a user. The suggestions relate to various decision-making processes,
such as what items to buy, what music to listen to, what books to read or what online news to
read.
'Item' is the general term used to denote what the system recommends to users. A RS
normally focuses on a specific type of item (e.g., CDs, or news) and accordingly its design,
its graphical user interface, and the core recommendation technique used to generate the
recommendations are all customized to provide useful and effective suggestions for that
specific type of item. RSs are primarily directed towards individuals who lack sufficient
personal experience or competence to evaluate the potentially overwhelming number of
alternative items that a Web site, for example, may offer. In most cases, people are faced with
many choices and very large data volumes, and searching through all of them is beyond a
user's capability. This problem is called information overload [1].
A case in point is a book recommender system that assists users in selecting a book to read.
The popular Web site Amazon.com employs a RS to personalize the online store for each
customer. Since recommendations are usually personalized, different users or user groups
receive diverse suggestions.
In their simplest form, personalized recommendations are offered as ranked lists of items. In
performing this ranking, RSs try to predict what the most suitable products or services are,
based on the user's preferences and constraints. In order to complete such a computational
task, RSs collect from users their preferences, which are either explicitly expressed, e.g., as
ratings for products, or are inferred by interpreting user actions. For instance, a RS may
consider the navigation to a particular product page as an implicit sign of preference for the
items shown on that page. The development of RSs started from a rather simple observation:
individuals often rely on recommendations provided by others in making routine, daily
decisions. For example, it is common to rely on what one's peers recommend when selecting
a book to read; employers count on recommendation letters in their recruiting decisions; and
when selecting a movie to watch, individuals tend to read and rely on the movie reviews that
a film critic has written and which appear in the newspaper they read. In seeking to mimic
this behaviour, the first RSs applied algorithms to leverage recommendations produced by a
community of users to deliver recommendations to an active user, i.e., a user looking for
suggestions. The recommendations were for items that similar users (those with similar
tastes) had liked. This approach is termed collaborative filtering, and its rationale is that if the
active user agreed in the past with some users, then other recommendations coming from
these similar users should be relevant as well and of interest to the active user. As e-commerce
Web sites began to develop, a pressing need emerged for providing recommendations derived
from filtering the whole range of available alternatives.
Users were finding it very difficult to arrive at the most appropriate choices from the
immense variety of items (products and services) that these Web sites were offering. The
explosive growth and variety of information available on the Web and the rapid introduction
of new e-business services (buying products, product comparison, auction, etc.) frequently
overwhelmed users, leading them to make poor decisions.
The availability of choices, instead of producing a benefit, started to decrease users' well-being.
It was understood that while choice is good, more choice is not always better. Indeed,
choice, with its implications of freedom, autonomy, and self-determination, can become
excessive, creating a sense that freedom may come to be regarded as a kind of misery-inducing
tyranny. RSs have proved in recent years to be a valuable means for coping with the
information overload problem. Ultimately, a RS addresses this phenomenon by pointing the
user towards the items in which he/she may be interested.
2.2 Content based Filtering
Content-based recommender systems work with profiles of users that are created at the
beginning. A profile holds information about a user and his taste. Generally, when creating a
profile, recommender systems conduct a survey to get initial information about a user in order
to avoid the new-user problem. In the recommendation process, the engine compares the
items that were already positively rated by the user with the items he didn't rate and looks for
similarities [2]. The items that are most similar to the positively rated ones will be
recommended to the user.
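The matching step described above can be sketched in a few lines of Python. The item feature vectors, titles and the nearest-liked-item scoring rule below are purely hypothetical illustrations, not part of the proposed system.

```python
# Content-based filtering sketch: recommend the unrated item whose feature
# vector is most similar to an item the user already rated positively.
# Item features (fiction, programming, mathematics) and titles are invented.

def dot(a, b):
    return sum(x * y for x, y in zip(a, b))

def cosine(a, b):
    na, nb = dot(a, a) ** 0.5, dot(b, b) ** 0.5
    return dot(a, b) / (na * nb) if na and nb else 0.0

items = {
    "Clean Code":      (0, 1, 0),
    "SICP":            (0, 1, 1),
    "Calculus":        (0, 0, 1),
    "A Fantasy Novel": (1, 0, 0),
}

def recommend(liked, items):
    # Score every unrated item by its best similarity to any liked item.
    scores = {
        name: max(cosine(vec, items[l]) for l in liked)
        for name, vec in items.items() if name not in liked
    }
    return max(scores, key=scores.get)

print(recommend({"Clean Code"}, items))  # -> "SICP", the closest in features
```

Running the sketch recommends the other programming-related title, since its feature vector is closest to the positively rated one.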
2.3 Collaborative Filtering
Recommendations can be based on the demographics of users, overall top-selling items, or
past buying habits of users as a predictor of future items. Collaborative Filtering (CF) [2][3] is
the most successful recommendation technique to date. The idea of collaborative filtering is
to find users in a community that share appreciations. If two users have rated the same or
almost the same items in common, then they have similar tastes. Such users build a group, or
a so-called neighbourhood. A user gets recommendations for those items that he/she hasn't
rated before, but that were already positively rated by users in his/her neighbourhood. Table
2.1 shows that all three users rated the books positively and with similar marks. That means
that they have similar tastes and build a neighbourhood. User A hasn't rated the book
"ASP.Net", which probably means that he hasn't read it yet. As the book was positively rated
by the other users, he will get this item recommended. As opposed to simpler recommender
systems, where recommendations are based on the most-rated-item and most-popular-item
methods, collaborative recommender systems care about the taste of the user. The taste is
considered to be constant, or at least to change slowly.
Table 2.1: User Ratings
Collaborative filtering is widely used in e-commerce. Customers can rate books, songs or
movies and then get recommendations regarding such items in the future. Moreover,
collaborative filtering is utilized when browsing certain kinds of documents (e.g. scientific
works, articles, and magazines).
2.3.1 User Based Collaborative Filtering
In the user-based approach, the users perform the main role. If a certain majority of the
customers has the same taste, then they join into one group.
Recommendations are given to a user based on the evaluation of items by other users from the
same group, with whom he/she shares common preferences. If an item was positively rated
by the community, it will be recommended to the user. Thus, in the user-based approach, the
items that were already rated by the user play an important role in searching for a group
that shares appreciations with him.
Fig. 2.1: User Based Collaborative Filtering
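The neighbourhood idea above, echoing the Table 2.1 scenario, can be sketched as follows. The ratings and the simple inverse-difference similarity used here are hypothetical illustrations, not the metric the system actually employs.

```python
# User-based CF sketch: find the target user's most similar neighbour over
# co-rated items, then recommend an item the neighbour rated highly that
# the target user has not rated. All ratings below are invented.

ratings = {
    "A": {"Hadoop Guide": 5, "Java Basics": 4},
    "B": {"Hadoop Guide": 5, "Java Basics": 4, "ASP.Net": 5},
    "C": {"A Fantasy Novel": 5},
}

def similarity(u, v):
    common = set(ratings[u]) & set(ratings[v])
    if not common:
        return 0.0
    # Inverse of the mean absolute rating difference over co-rated items.
    diff = sum(abs(ratings[u][i] - ratings[v][i]) for i in common) / len(common)
    return 1.0 / (1.0 + diff)

def recommend(user):
    neighbour = max((v for v in ratings if v != user),
                    key=lambda v: similarity(user, v))
    unseen = set(ratings[neighbour]) - set(ratings[user])
    return max(unseen, key=ratings[neighbour].get) if unseen else None

print(recommend("A"))  # -> "ASP.Net": B is A's closest neighbour
```

User B agrees with A on every co-rated book, so B's unseen, highly rated item is recommended to A, exactly as in the Table 2.1 discussion.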
2.3.2 Item Based Collaborative Filtering
The item-based approach looks into the set of items the target user has rated, computes how
similar they are to the target item i, and then selects the k most similar items {i1, i2, . . . , ik}.
At the same time, their corresponding similarities {si1, si2, . . . , sik} are also computed.
Once the most similar items are found, the prediction is computed by taking a weighted
average of the target user's ratings on these similar items [4].
Fig. 2.2: Item based Collaborative Filtering
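The weighted-average prediction described above can be sketched as follows; the item names, ratings and similarity values are hypothetical.

```python
# Item-based prediction sketch: predict the target user's rating for item i
# as the weighted average of his ratings on the k items most similar to i.

def predict(user_ratings, sims, k):
    # sims maps each rated item j to its similarity s_ij with target item i.
    top = sorted(sims, key=sims.get, reverse=True)[:k]   # k most similar
    num = sum(sims[j] * user_ratings[j] for j in top)    # weighted sum
    den = sum(abs(sims[j]) for j in top)                 # normalizer
    return num / den if den else 0.0

user_ratings = {"j1": 5, "j2": 3, "j3": 1}   # hypothetical ratings
sims = {"j1": 0.9, "j2": 0.5, "j3": 0.1}     # similarity to target item i
print(round(predict(user_ratings, sims, k=2), 3))  # -> 4.286
```

With k = 2 the prediction leans towards the rating of the most similar item, weighted by its similarity.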
2.4 Similarity Measures
One critical step in the item-based collaborative filtering algorithm is to compute the
similarity between items and then to select the most similar items. The basic idea in the
similarity computation between two items i and j is to first isolate the users who have rated
both of these items and then to apply a similarity computation technique to determine the
similarity si,j [5].
Fig. 2.3: Item-Item Similarity
There are different ways to compute the similarity between items. These are cosine-based
similarity, correlation-based similarity and adjusted-cosine similarity.
2.4.1 Cosine based Similarity
In this case, two items are thought of as two vectors in the m-dimensional user space. The
similarity between them is measured by computing the cosine of the angle between these two
vectors. Formally, in the m × n ratings matrix, the similarity between items i and j, denoted
by sim(i, j), is given by

sim(i, j) = cos(i, j) = (i · j) / (‖i‖ × ‖j‖)

where "·" denotes the dot product of the two vectors.
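A minimal sketch of this computation, treating each item as a vector of the ratings it received from the users (0 where a user did not rate it); the rating values are hypothetical.

```python
from math import sqrt

# Cosine similarity between two item rating vectors in user space.

def cosine_sim(i, j):
    dot = sum(a * b for a, b in zip(i, j))
    norm_i = sqrt(sum(a * a for a in i))
    norm_j = sqrt(sum(b * b for b in j))
    return dot / (norm_i * norm_j) if norm_i and norm_j else 0.0

item_i = [5, 3, 0, 4]   # ratings by users u1..u4 (0 = not rated)
item_j = [5, 3, 0, 4]
print(cosine_sim(item_i, item_j))  # identical vectors -> 1.0
```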
2.4.2 Pearson Correlation-based Similarity
The Pearson correlation of two series is the ratio of their covariance to the product of their
standard deviations. Covariance is a measure of how much two series move together in
absolute terms; it's big when the series move far from their means in the same direction in the
same places. Dividing by the standard deviations merely normalizes for the absolute size of
their changes.
The Pearson correlation is a number between -1 and 1 that measures the tendency of two
series of numbers, paired up one-to-one, to move together. That is to say, it measures how
likely a number in one series is to be relatively large when the corresponding number in the
other series is high, and vice versa. It measures the tendency of the numbers to move together
proportionally, such that there's a roughly linear relationship between the values in one series
and the other. When this tendency is high, the correlation is close to 1. When there appears to
be little relationship at all, the value is near 0. When there appears to be an opposing
relationship (one series' numbers are high exactly when the other series' numbers are low),
the value is near -1.
In this case, the similarity between two items i and j is measured by computing the Pearson
correlation corr(i, j). To make the correlation computation accurate, we must first isolate the
co-rated cases (i.e., cases where the users rated both i and j). Let the set of users who rated
both i and j be denoted by U; then the correlation similarity is given by

sim(i, j) = Σu∈U (Ru,i - R̄i)(Ru,j - R̄j) / ( √Σu∈U (Ru,i - R̄i)² × √Σu∈U (Ru,j - R̄j)² )

where Ru,i is the rating of user u on item i and R̄i is the average rating of item i over the
users in U.
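A direct sketch of this computation over the co-rated cases; the rating series are hypothetical.

```python
from math import sqrt

# Pearson correlation over co-rated cases: centre each series on its mean,
# then take the ratio of the cross-product sum to the product of norms.

def pearson(ri, rj):
    # ri, rj: equal-length rating lists from the users who rated both items.
    mi, mj = sum(ri) / len(ri), sum(rj) / len(rj)
    di = [a - mi for a in ri]
    dj = [b - mj for b in rj]
    num = sum(a * b for a, b in zip(di, dj))
    den = sqrt(sum(a * a for a in di)) * sqrt(sum(b * b for b in dj))
    return num / den if den else 0.0

print(round(pearson([1, 2, 3], [2, 4, 6]), 3))  # perfectly linear -> 1.0
print(round(pearson([1, 2, 3], [6, 4, 2]), 3))  # opposing -> -1.0
```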
2.4.3 Co-occurrence-based Similarity
Instead of computing the similarity between every pair of items, in this metric the algorithm
computes the number of times each pair of items occurs together in some user's list of
preferences, in order to fill out the matrix. Co-occurrence is like similarity: the more often
two items turn up together, the more related or similar they probably are.
Producing the matrix is a simple matter of counting. The entries in the matrix aren't affected
by preference values; those values enter the computation later.
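The counting described above can be sketched as follows; the user preference lists are hypothetical.

```python
from itertools import combinations
from collections import Counter

# Co-occurrence counting sketch: for each user's preference list, count
# every pair of items that appears together; preference values are ignored.

def cooccurrence(preferences):
    counts = Counter()
    for items in preferences.values():
        for a, b in combinations(sorted(items), 2):
            counts[(a, b)] += 1
    return counts

prefs = {
    "u1": {"book1", "book2", "book3"},
    "u2": {"book1", "book2"},
    "u3": {"book2", "book3"},
}
matrix = cooccurrence(prefs)
print(matrix[("book1", "book2")])  # -> 2 (they co-occur for u1 and u2)
```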
2.4.4 Euclidean Distance based Similarity
This implementation is based on the distance between users. The users are seen as points in a
space of many dimensions (as many dimensions as there are items), whose coordinates are
preference values. This similarity metric computes the Euclidean distance d between two
such user points. This value alone doesn't constitute a valid similarity metric, because larger
values would mean more-distant, and therefore less similar, users. A similarity value should
instead be larger when users are more similar. Therefore, the implementation actually returns
1 / (1 + d). This similarity metric never returns a negative value, and larger values mean more
similarity.
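A minimal sketch of the 1 / (1 + d) transformation; the preference vectors are hypothetical.

```python
from math import sqrt

# Euclidean-distance similarity sketch: users are points whose coordinates
# are preference values; 1 / (1 + d) is always positive, and larger values
# mean more similar users.

def euclidean_similarity(u, v):
    d = sqrt(sum((a - b) ** 2 for a, b in zip(u, v)))
    return 1.0 / (1.0 + d)

print(euclidean_similarity([5, 3, 4], [5, 3, 4]))           # identical -> 1.0
print(round(euclidean_similarity([5, 3, 4], [1, 3, 4]), 3))  # d = 4 -> 0.2
```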
2.4.5 Tanimoto Coefficient based Similarity
TanimotoCoefficientSimilarity is an implementation which does not take into account the
preference values specified by the users. It is based on (surprise) the Tanimoto coefficient.
This value is also known as the Jaccard coefficient. It is the number of items that two users
express some preference for, divided by the number of items that either user expresses some
preference for. In other words, it's the ratio of the size of the intersection to the size of the
union of their preferred items. It has the required properties: when two users' items
completely overlap, the result is 1.0. When they have nothing in common, it's 0.0. The value
is never negative, but that's OK. It's possible to expand the results into the range -1 to 1 with
some simple math: similarity = 2 × similarity - 1. It won't matter to the framework.
Fig. 2.4: Tanimoto Coefficient
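The intersection-over-union computation, together with the optional expansion into the range -1 to 1, can be sketched as follows; the item sets are hypothetical.

```python
# Tanimoto (Jaccard) similarity sketch: ratio of the intersection to the
# union of the two users' preferred-item sets; preference values ignored.

def tanimoto(items_u, items_v):
    union = items_u | items_v
    if not union:
        return 0.0
    return len(items_u & items_v) / len(union)

u = {"book1", "book2", "book3"}
v = {"book2", "book3", "book4"}
s = tanimoto(u, v)
print(round(s, 3))   # 2 common / 4 in the union -> 0.5
print(2 * s - 1)     # optional expansion into [-1, 1] -> 0.0
```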
2.4.6 Log likelihood based Similarity
Log-likelihood-based similarity is similar to the Tanimoto-coefficient-based similarity,
though it's more difficult to understand intuitively. It's another metric that doesn't take
individual preference values into account. Like the Tanimoto coefficient, it's based on the
number of items in common between two users, but its value is more an expression of how
unlikely it is for two users to have so much overlap, given the total number of items out there
and the number of items each user has a preference for.
It is a probability-based distance. The distance between two clusters is related to the decrease
in log-likelihood as they are combined into one cluster. In calculating log-likelihood, normal
distributions are assumed for continuous variables and multinomial distributions for
categorical variables. It is also assumed that the variables are independent of each other, and
so are the cases. The distance between two clusters j and s is defined as the decrease in
log-likelihood caused by merging them into a single cluster.
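As a rough sketch of the user-overlap version of this idea, the log-likelihood ratio (Dunning's G²) can be computed from the 2×2 contingency table of two users' preferences using the identity G² = 2N × (mutual information of the table). The counts below are hypothetical; implementations such as Mahout additionally map this ratio into a bounded similarity score, which is omitted here.

```python
from math import log

# Log-likelihood ratio sketch via G^2 = 2N * mutual information of the
# 2x2 contingency table:
#   k11 = items both users prefer       k12 = items only user A prefers
#   k21 = items only user B prefers     k22 = items neither prefers

def entropy(counts):
    total = sum(counts)
    return -sum((c / total) * log(c / total) for c in counts if c > 0)

def llr(k11, k12, k21, k22):
    n = k11 + k12 + k21 + k22
    row = entropy([k11 + k12, k21 + k22])
    col = entropy([k11 + k21, k12 + k22])
    mat = entropy([k11, k12, k21, k22])
    return 2.0 * n * (row + col - mat)

print(llr(10, 10, 10, 10) < 1e-9)  # independent overlap -> True (LLR ~ 0)
print(llr(10, 0, 0, 10) > 0)       # surprisingly strong overlap -> True
```

A large ratio means the observed overlap would be very unlikely if the two users' preferences were independent.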
2.5 Challenges and Issues
Cold Start
It's difficult to give recommendations to a new user, as his profile is almost empty and
he hasn't rated any items yet, so his taste is unknown to the system. This is called the
cold-start problem [6]. In some recommender systems this problem is solved with a
survey when creating a profile. Items can also have a cold start when they are new in
the system and haven't been rated before. Both of these problems can also be solved
with hybrid approaches.
Trust
The voices of people with a short history may not be as relevant as the voices of
those who have a rich history in their profiles. The issue of trust arises towards the
evaluations of a certain customer. The problem can be addressed by distributing
priorities to the users.
Scalability
With the growth of the number of users and items, the system needs more resources
for processing information and forming recommendations. The majority of resources
is consumed in determining users with similar tastes and goods with similar
descriptions. This problem is also solved by the combination of various types of
filters and physical improvement of systems. Parts of the numerous computations may
also be performed offline in order to accelerate the issuance of recommendations
online.
Sparsity
In online shops that have a huge number of users and items, there are almost always
users that have rated just a few items. Using collaborative and other approaches,
recommender systems generally create neighbourhoods of users from their profiles. If
a user has evaluated just a few items, then it's pretty difficult to determine his taste,
and he/she could be related to the wrong neighbourhood. Sparsity is the problem of
lack of information.
Privacy
Privacy has been the most important problem. In order to produce the most accurate
and correct recommendations, the system must acquire as much information as
possible about the user, including demographic data and data about the location of a
particular user. Naturally, the question of reliability, security and confidentiality of the
given information arises. Many online shops offer effective protection of the privacy
of their users by utilizing specialized algorithms and programs.
CHAPTER 3
HADOOP DISTRIBUTED FILE SYSTEM
3.1 Overview
Hadoop comes with a distributed filesystem called HDFS (Hadoop Distributed File System).
HDFS is highly fault-tolerant and is designed to be deployed on low-cost hardware. HDFS
provides high-throughput access to application data and is suitable for applications that have
large data sets [7].
The Hadoop filesystem is designed for storing very large files with streaming data access,
built on the idea that the most efficient data processing pattern is a write-once, read-many-times
pattern. HDFS stores metadata on a dedicated server called the NameNode. Application data
are stored on other servers called DataNodes. All the servers are fully connected and
communicate with each other using TCP-based protocols.
3.2 Assumptions and Goals
Hardware Failure
Hardware failure is the norm rather than the exception. An HDFS instance may consist of
hundreds or thousands of server machines, each storing part of the file system's data. The
fact that there are a large number of components and that each component has a non-trivial
probability of failure means that some component of HDFS is always non-functional.
Therefore, detection of faults and quick, automatic recovery from them is a core
architectural goal of HDFS.
Streaming Data Access
Applications that run on HDFS need streaming access to their data sets. They are not
general-purpose applications that typically run on general-purpose file systems. HDFS is
designed more for batch processing than for interactive use by users. The emphasis is
on high throughput of data access rather than low latency of data access. POSIX imposes
many hard requirements that are not needed for applications that are targeted for HDFS.
POSIX semantics in a few key areas have been traded to increase data throughput rates.
Large Data Sets
Applications that run on HDFS have large data sets. A typical file in HDFS is gigabytes to
terabytes in size. Thus, HDFS is tuned to support large files. It should provide high
aggregate data bandwidth and scale to hundreds of nodes in a single cluster. It should
support tens of millions of files in a single instance.
Simple Coherency Model
HDFS applications need a write-once-read-many access model for files. A file once
created, written, and closed need not be changed. This assumption simplifies data
coherency issues and enables high-throughput data access. A MapReduce application or a
web crawler application fits perfectly with this model. There is a plan to support
appending writes to files in the future.
Moving Computation is Cheaper than Moving Data
A computation requested by an application is much more efficient if it is executed near the
data it operates on. This is especially true when the size of the data set is huge. Executing
computation near the data minimizes network congestion and increases the overall
throughput of the system. The assumption is that it is often better to migrate the computation
closer to where the data is located rather than moving the data to where the application is
running. HDFS provides interfaces for applications to move themselves closer to where the
data is located.
Portability Across Heterogeneous Hardware and Software Platforms
HDFS has been designed to be easily portable from one platform to another. This
facilitates widespread adoption of HDFS as the platform of choice for a large set of
applications.
3.3 Architecture
HDFS is based on a master/slave architecture. An HDFS cluster consists of a single
NameNode (as master) and a number of DataNodes (as slaves). The NameNode and
DataNodes are pieces of software designed to run on commodity machines. These machines
typically run a GNU/Linux operating system (OS). The usage of the highly portable Java
language means that HDFS can be deployed on a wide range of machines. A typical
deployment has a dedicated machine that runs only the NameNode software. Each of the
other machines in the cluster runs one instance of the DataNode software. The architecture
does not preclude running multiple DataNodes on the same machine, but in a real deployment
one machine usually runs one DataNode.
The existence of a single NameNode in a cluster greatly simplifies the architecture of the
system. The NameNode is the arbitrator and repository for all HDFS metadata. The system is
designed in such a way that user data never flows through the NameNode.
3.3.1 Namenode
The NameNode manages the filesystem namespace: the metadata for all the files and
directories in the tree.
Fig. 3.1: HDFS Architecture
Each file is divided into large blocks (typically 128 megabytes, but user-selectable file-by-file)
and each block is independently replicated at multiple DataNodes (typically three, but
user-selectable file-by-file) to provide reliability. The NameNode maintains and stores the
namespace tree and the mapping of file blocks to DataNodes persistently on the local disk in
the form of two files: the namespace image and the edit log. The NameNode also knows the
DataNodes on which all the blocks for a given file are located. However, it does not store
block locations persistently, since this information is reconstructed from the DataNodes when
the system starts.
On NameNode failure, the filesystem becomes inaccessible, because only the NameNode
knows how to reconstruct the files from the blocks on the DataNodes. For this reason, it is
important to make the NameNode resilient to failure, and Hadoop provides two mechanisms
for this: Checkpoint Node and Backup Node.
3.3.1.1 Checkpoint Node
A checkpoint is an image record written persistently to disk. The NameNode uses two types
of files to persist its namespace:
- Fsimage: the latest checkpoint of the namespace
- Edits: a log containing changes to the namespace since the checkpoint; these logs are also
called journals.
On restart, the NameNode creates updated file system metadata by merging the two files, i.e.
fsimage and edits. The NameNode then overwrites fsimage with the new HDFS state and
begins a new edits journal.
The Checkpoint node periodically downloads the latest fsimage and edits from the active
NameNode, creates checkpoints by merging them locally, and then uploads the new
checkpoints back to the active NameNode. This requires the same memory space as the
NameNode, so the Checkpoint node needs to run on a separate machine. Namespace
information is lost if either the checkpoint or the journal is missing, so it is highly
recommended to configure HDFS to store the checkpoint and journal in multiple storage
directories.
The Checkpoint node uses the parameter fs.checkpoint.period to set the interval between two
consecutive checkpoints. The interval is given in seconds (the default is 3600 seconds). The
maximum edit log size is specified by the parameter fs.checkpoint.size (default 64 MB), and
a checkpoint is triggered if the log grows beyond this size. Multiple Checkpoint nodes may be
specified in the cluster configuration file.
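The merge of fsimage and edits can be illustrated with a toy model. This is not HDFS's real on-disk format or API, just a sketch of the replay idea, with hypothetical paths and a minimal set of operations.

```python
# Toy model of the fsimage + edits merge: the checkpoint replays each
# logged namespace change onto the last saved image, producing a new,
# up-to-date image that becomes the next fsimage.

def merge_checkpoint(fsimage, edits):
    namespace = dict(fsimage)          # start from the latest checkpoint
    for op, path, *args in edits:      # replay the edit journal in order
        if op == "create":
            namespace[path] = args[0]
        elif op == "rename":
            namespace[args[0]] = namespace.pop(path)
        elif op == "delete":
            namespace.pop(path, None)
    return namespace                   # this becomes the new fsimage

fsimage = {"/data/a.txt": "blocks-1"}
edits = [
    ("create", "/data/b.txt", "blocks-2"),
    ("rename", "/data/a.txt", "/data/a_old.txt"),
]
print(merge_checkpoint(fsimage, edits))
```

After the replay, the new image reflects both the created file and the rename, and a fresh (empty) edits journal would be started.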
3.3.1.2 Backup Node
The Backup node has the same functionality as the Checkpoint node. In addition, it maintains
an in-memory, up-to-date copy of the file system namespace that is always synchronized with
the active NameNode state. Along with accepting a journal stream of filesystem edits
from the NameNode and persisting this to disk, the Backup node also applies those edits to
its own copy of the namespace in memory, thus creating a backup of the namespace.
Unlike the Checkpoint node, the Backup node has an up-to-date state of the namespace in
memory. The Backup node requires the same amount of RAM as the NameNode. The
NameNode supports one Backup node at a time. No Checkpoint nodes may be registered if a Backup
node is in use. The Backup node takes care of namespace data persistence, so the NameNode
does not need to maintain a persistent store of its own.
3.3.2 DataNodes
There are a number of DataNodes, usually one per node in the cluster, which manage the
storage attached to the nodes they run on. HDFS exposes a file system namespace and allows
user data to be stored in files. Internally, a file is split into one or more blocks, and these
blocks are stored in a set of DataNodes [7]. The NameNode executes file system namespace
operations such as opening, closing, and renaming files and directories. It also determines the
mapping of blocks to DataNodes.
DataNodes store and retrieve blocks when requested (by clients or the NameNode), and they
report back to the NameNode periodically with lists of blocks they are storing. The
DataNodes are responsible for serving read and write requests from the file system's clients.
The DataNodes also perform block creation, deletion, and replication upon instruction from
the NameNode. A DataNode's connection to the NameNode is established by a handshake,
during which the namespace ID and the software version of the DataNode are verified. The
namespace ID is assigned to the file system instance when it is formatted and is stored
persistently on all nodes of the cluster; a node with a different namespace ID cannot join the
cluster. A newly initialized DataNode without a namespace ID may join the cluster, receiving
the cluster's namespace ID, and then registers with the NameNode using its storage ID. A DataNode
identifies block replicas in its possession to the NameNode by sending a block report. A
block report contains the block id, the generation stamp and the length for each block replica
the server hosts. The first block report is sent immediately after the DataNode's registration.
Subsequent block reports are sent every hour and provide the NameNode with an up-to-date
view of where block replicas are located on the cluster.
3.3.3 HDFS Client
Reading a file
To read a file, the HDFS client first contacts the NameNode, which returns the list of
addresses of the DataNodes that hold a copy of each block of the file. The client then
connects directly to the closest DataNode for each block and requests the transfer of the
desired block. Figure 3.2 shows the main sequence of events involved in reading data from HDFS.
Fig. 3.2: Reading a file
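The read path above can be sketched as a small in-memory simulation. All names and data here are illustrative; this is not the actual HDFS client API, only a model of "ask the NameNode for block locations, then fetch each block from the nearest DataNode":

```python
# Toy simulation of the HDFS read path: the client asks the NameNode for
# block locations, then fetches each block from the closest DataNode.

def read_file(filename, namenode, datanodes, reader_rack):
    """Assemble a file by reading each of its blocks from the nearest replica."""
    blocks = namenode[filename]          # list of (block_id, [datanode names])
    data = b""
    for block_id, replicas in blocks:
        # Prefer a replica on the same rack as the reader (the "closest" node).
        closest = min(replicas, key=lambda dn: datanodes[dn]["rack"] != reader_rack)
        data += datanodes[closest]["blocks"][block_id]
    return data

# Illustrative cluster state: a NameNode mapping and two DataNodes.
namenode = {"/books.csv": [("blk_1", ["dn1", "dn2"]), ("blk_2", ["dn2"])]}
datanodes = {
    "dn1": {"rack": "r1", "blocks": {"blk_1": b"user,item,"}},
    "dn2": {"rack": "r2", "blocks": {"blk_1": b"user,item,", "blk_2": b"rating\n"}},
}

print(read_file("/books.csv", namenode, datanodes, reader_rack="r1"))
```

Note how the client never streams data through the NameNode: the NameNode only hands out metadata, and the bytes flow directly between client and DataNodes.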
Writing to a File
For writing to a file, the HDFS client first creates an empty file without any blocks.
File creation is only possible when the client has write permission and a file of the
same name does not already exist in the system. The NameNode records the new file
creation and allocates the first data block to a list of suitable DataNodes that will
host its replicas. These DataNodes form a replication pipeline. When the first block is
filled, new DataNodes are requested to host replicas of the next block; a new pipeline
is organized, and the client sends the further bytes of the file. The choice of
DataNodes is likely to be different for each block.
If a DataNode in the pipeline fails while the data is being written, the pipeline is
first closed, the partial block on the failed DataNode is deleted, and the failed
DataNode is removed from the pipeline. New DataNodes are then chosen for the pipeline to
write the remaining data of the block.
Fig. 3.3: Writing to a file
3.4 Data Replication
HDFS is designed to reliably store very large files across machines in a large cluster. It stores
each file as a sequence of blocks; all blocks in a file except the last block are the same size.
The blocks of a file are replicated for fault tolerance. The block size and replication factor are
configurable per file. An application can specify the number of replicas of a file. The
replication factor can be specified at file creation time and can be changed later. Files in
HDFS are write-once and have strictly one writer at any time.
The NameNode makes all decisions regarding replication of blocks. It periodically receives a
Heartbeat and a Blockreport from each of the DataNodes in the cluster. Receipt of a Heartbeat implies that the DataNode is functioning properly. A Blockreport contains a list of
all blocks on a DataNode.
Fig 3.4: Block Replication
3.4.1 Replica Placement
The placement of replicas is critical to HDFS reliability and performance. Optimizing replica
placement distinguishes HDFS from most other distributed file systems. This is a feature that
needs lots of tuning and experience. The purpose of a rack-aware replica placement policy is
to improve data reliability, availability, and network bandwidth utilization. The current
implementation for the replica placement policy is a first effort in this direction. The
short-term goals of implementing this policy are to validate it on production systems, learn
more about its behavior, and build a foundation to test and research more sophisticated policies.
Large HDFS instances run on a cluster of computers that commonly spread across many
racks. Communication between two nodes in different racks has to go through switches. In
most cases, network bandwidth between machines in the same rack is greater than network
bandwidth between machines in different racks.
The NameNode determines the rack id each DataNode belongs to via the process outlined
in Hadoop Rack Awareness. A simple but non-optimal policy is to place replicas on unique
racks. This prevents losing data when an entire rack fails and allows use of bandwidth from
multiple racks when reading data. This policy evenly distributes replicas in the cluster which
makes it easy to balance load on component failure. However, this policy increases the cost
of writes because a write needs to transfer blocks to multiple racks.
For the common case, when the replication factor is three, HDFS's placement policy is to put
one replica on one node in the local rack, another on a node in a different (remote) rack, and
the last on a different node in the same remote rack. This policy cuts the inter-rack write
traffic which generally improves write performance. The chance of rack failure is far less
than that of node failure; this policy does not impact data reliability and availability
guarantees. However, it does reduce the aggregate network bandwidth used when reading
data since a block is placed in only two unique racks rather than three. With this policy, the
replicas of a file do not distribute evenly across the racks. One third of replicas are on one
node, two thirds of replicas are on one rack, and the other third are evenly distributed across
the remaining racks. This policy improves write performance without compromising data
reliability or read performance.
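The default policy for a replication factor of three can be sketched as a small placement function. This is an illustrative simplification (the real NameNode also weighs load and free space), with hypothetical node and rack names:

```python
# Sketch of the default rack-aware placement policy for replication factor 3:
# first replica on the writer's node, second on a node in a different rack,
# third on a *different* node in that same remote rack.

def place_replicas(writer, nodes):
    """nodes: dict mapping node name -> rack id. Returns the 3 chosen nodes."""
    local_rack = nodes[writer]
    # Candidate nodes in any rack other than the writer's.
    remote = [n for n in nodes if nodes[n] != local_rack]
    second = remote[0]
    # A different node in the same remote rack as the second replica.
    third = next(n for n in remote[1:] if nodes[n] == nodes[second])
    return [writer, second, third]

nodes = {"n1": "r1", "n2": "r1", "n3": "r2", "n4": "r2"}
print(place_replicas("n1", nodes))  # ['n1', 'n3', 'n4']
```

Only two racks are touched per block, which is exactly why inter-rack write traffic is cut while a whole-rack failure still cannot destroy all replicas.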
3.4.2 Replica Selection
To minimize global bandwidth consumption and read latency, HDFS tries to satisfy a read
request from a replica that is closest to the reader. If there exists a replica on the same rack as
the reader node, then that replica is preferred to satisfy the read request. If an HDFS
cluster spans multiple data centers, then a replica that is resident in the local data center is
preferred over any remote replica.
3.4.3 Safemode
On startup, the NameNode enters a special state called Safemode. Replication of data blocks
does not occur when the NameNode is in the Safemode state. The NameNode receives
Heartbeat and Blockreport messages from the DataNodes. A Blockreport contains the list of
data blocks that a DataNode is hosting. Each block has a specified minimum number of
replicas. A block is considered safely replicated when the minimum number of replicas of
that data block has checked in with the NameNode. After a configurable percentage of safely
replicated data blocks has checked in with the NameNode (plus an additional 30 seconds), the
NameNode exits the Safemode state. It then determines the list of data blocks (if any) that
still have fewer than the specified number of replicas. The NameNode then replicates these
blocks to other DataNodes.
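The Safemode exit condition amounts to a simple threshold test. A minimal sketch, assuming the Hadoop 1.x configuration property dfs.safemode.threshold.pct (default 0.999) as the configurable percentage; the function and data are illustrative:

```python
# Sketch of the Safemode exit condition: the NameNode leaves Safemode once the
# fraction of safely replicated blocks reaches the configured threshold.

def can_exit_safemode(block_replica_counts, min_replicas=1, threshold=0.999):
    """block_replica_counts: replicas that have checked in, one entry per block."""
    safe = sum(1 for count in block_replica_counts if count >= min_replicas)
    return safe / len(block_replica_counts) >= threshold

# 3 of 4 blocks have at least one reported replica: 75% < 99.9%, stay in Safemode.
print(can_exit_safemode([3, 1, 2, 0]))  # False
print(can_exit_safemode([3, 1, 2, 1]))  # True
```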
3.5 Data Organisation
3.5.1 Data Blocks
HDFS is designed to support very large files. Applications that are compatible with HDFS
are those that deal with large data sets. These applications write their data only once but they
read it one or more times and require these reads to be satisfied at streaming speeds. HDFS
supports write-once-read-many semantics on files. A typical block size used by HDFS is 64
MB. Thus, an HDFS file is chopped up into 64 MB chunks, and if possible, each chunk will
reside on a different DataNode.
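The chunking described above is easy to make concrete. The following helper is an illustrative sketch, not the HDFS implementation; it just computes the block sizes a file of a given length occupies:

```python
# Sketch of how a file is chopped into fixed-size blocks (64 MB by default in
# Hadoop 1.x); only the last block may be smaller.

BLOCK_SIZE = 64 * 1024 * 1024  # 64 MB

def split_into_blocks(file_size, block_size=BLOCK_SIZE):
    """Return the sizes of the blocks a file of file_size bytes occupies."""
    full, remainder = divmod(file_size, block_size)
    return [block_size] * full + ([remainder] if remainder else [])

# A 200 MB file becomes three 64 MB blocks plus one 8 MB block.
print([b // (1024 * 1024) for b in split_into_blocks(200 * 1024 * 1024)])  # [64, 64, 64, 8]
```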
3.5.2 Staging
A client request to create a file does not reach the NameNode immediately. In fact, initially
the HDFS client caches the file data into a temporary local file. Application writes are
transparently redirected to this temporary local file. When the local file accumulates data
worth over one HDFS block size, the client contacts the NameNode. The NameNode inserts
the file name into the file system hierarchy and allocates a data block for it. The NameNode
responds to the client request with the identity of the DataNode and the destination data
block. Then the client flushes the block of data from the local temporary file to the specified
DataNode. When a file is closed, the remaining un-flushed data in the temporary local file is
transferred to the DataNode. The client then tells the NameNode that the file is closed. At this
point, the NameNode commits the file creation operation into a persistent store. If the
NameNode dies before the file is closed, the file is lost.
The above approach has been adopted after careful consideration of target applications that
run on HDFS. These applications need streaming writes to files. If a client writes to a remote
file directly without any client side buffering, the network speed and the congestion in the
network impacts throughput considerably. This approach is not without precedent. Earlier
distributed file systems, e.g. AFS, have used client side caching to improve performance. A
POSIX requirement has been relaxed to achieve higher performance of data uploads.
3.5.3 Replication Pipelining
When a client is writing data to an HDFS file, its data is first written to a local file as
explained in the previous section. Suppose the HDFS file has a replication factor of three.
When the local file accumulates a full block of user data, the client retrieves a list of
DataNodes from the NameNode. This list contains the DataNodes that will host a replica of
that block. The client then flushes the data block to the first DataNode. The first DataNode
starts receiving the data in small portions (4 KB), writes each portion to its local repository
and transfers that portion to the second DataNode in the list. The second DataNode, in turn
starts receiving each portion of the data block, writes that portion to its repository and then
flushes that portion to the third DataNode. Finally, the third DataNode writes the data to its
local repository. Thus, a DataNode can be receiving data from the previous one in the
pipeline and at the same time forwarding data to the next one in the pipeline. Thus, the data is
pipelined from one DataNode to the next.
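The pipelining above can be modelled in a few lines. This is a toy sequential simulation (real DataNodes forward portions concurrently over the network); the data structures are illustrative:

```python
# Toy model of replication pipelining: the block is streamed in small portions
# (4 KB in the text) and each DataNode writes a portion locally while
# forwarding it to the next DataNode in the pipeline.

PORTION = 4 * 1024  # 4 KB portions

def pipeline_write(block, pipeline):
    """pipeline: list of dicts acting as DataNode stores; returns them filled."""
    for start in range(0, len(block), PORTION):
        portion = block[start:start + PORTION]
        # Each DataNode in order writes the portion, then forwards it onward.
        for datanode in pipeline:
            datanode["data"] += portion
    return pipeline

datanodes = [{"data": b""} for _ in range(3)]  # replication factor 3
block = b"x" * (10 * 1024)                     # a 10 KB block
pipeline_write(block, datanodes)
print(all(dn["data"] == block for dn in datanodes))  # True
```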
3.6 HDFS Features
3.6.1 Communication Protocols
All HDFS communication protocols are layered on top of the TCP/IP protocol. A client
establishes a connection to a configurable TCP port on the NameNode machine. It talks the
ClientProtocol with the NameNode. The DataNodes talk to the NameNode using the
DataNode Protocol. A Remote Procedure Call (RPC) abstraction wraps both the Client
Protocol and the DataNode Protocol. By design, the NameNode never initiates any RPCs.
Instead, it only responds to RPC requests issued by DataNodes or clients.
3.6.2 Data Disk Failure, Heartbeats and Re-Replication
The primary objective of the HDFS is to store data reliably even in the presence of failures.
The three common types of failures are NameNode failures, DataNode failures and network
partitions.
The NameNode considers a DataNode alive as long as it receives Heartbeat messages from it
(the default Heartbeat interval is three seconds). If the NameNode does not receive a
heartbeat from a DataNode for ten minutes, it considers the DataNode dead and stops
forwarding I/O requests to it. The NameNode then schedules the creation of new replicas of
that DataNode's blocks on other DataNodes.
Heartbeats carry information about total storage capacity, fraction of storage in use, and the
number of data transfers currently in progress. These statistics are used for the NameNode's
space allocation and load balancing decisions. The NameNode can process thousands of
heartbeats per second without affecting other NameNode operations.
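The liveness rule is simply a timeout check, sketched below with illustrative node names and timestamps (the real NameNode tracks much more per-node state):

```python
# Sketch of the heartbeat liveness rule: a DataNode is considered dead if no
# heartbeat has arrived within the timeout (ten minutes in the text).

HEARTBEAT_TIMEOUT = 10 * 60  # seconds

def dead_datanodes(last_heartbeat, now, timeout=HEARTBEAT_TIMEOUT):
    """last_heartbeat: node name -> timestamp of the latest heartbeat."""
    return sorted(n for n, t in last_heartbeat.items() if now - t > timeout)

heartbeats = {"dn1": 1000.0, "dn2": 390.0}     # dn2 last seen 611 s ago
print(dead_datanodes(heartbeats, now=1001.0))  # ['dn2']
```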
3.6.3 Cluster Rebalancing
The HDFS architecture has data rebalancing schemes in which data is automatically moved
from one DataNode to another when a DataNode's free space falls below a threshold. In the event of a sudden
high demand for a particular file, a scheme might dynamically create additional replicas and
rebalance other data in the cluster. These types of data rebalancing schemes are not yet
implemented.
3.6.4 Data Integrity
A block of data can be corrupted for many reasons, such as network faults, buggy software
or faults in a storage device. Therefore, at the time of file creation, a checksum is
computed and stored for each block. When a file is retrieved, each block is first verified
against its checksum; if verification fails, another replica of the data is used.
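The verify-then-fall-back logic can be sketched as follows. This is an illustrative model, not HDFS code (HDFS uses CRC32 checksums; MD5 is used here only because it is in the Python standard library):

```python
# Sketch of block-level data integrity: store a checksum at write time and
# verify it on every read, falling back to another replica on mismatch.
import hashlib

def checksum(block):
    return hashlib.md5(block).hexdigest()  # any checksum algorithm works here

def read_with_verify(replicas, stored_checksum):
    """Return the first replica whose checksum matches the stored one."""
    for block in replicas:
        if checksum(block) == stored_checksum:
            return block
    raise IOError("all replicas corrupt")

good = b"book ratings"
stored = checksum(good)
# The first replica suffered a bit flip; the second replica is used instead.
print(read_with_verify([b"book ratingz", good], stored) == good)  # True
```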
3.6.5 Metadata Disk Failure
Corrupted fsimage and EditLog files may stop HDFS from functioning. For redundancy, the
NameNode can be configured to maintain multiple copies of these files, which are updated
synchronously.
3.7 Advantages and Disadvantages
Advantages of the HDFS are:
Reliable storage - HDFS is a fault-tolerant storage system. It can reliably store huge
amounts of data, scale up incrementally, and survive the failure of significant parts of
the storage infrastructure without losing data.
Commodity hardware - HDFS is designed to run on highly unreliable hardware, and so is
less expensive compared to other fault-tolerant storage systems.
Distributed - HDFS data are distributed over many nodes in a cluster, so parallel
analyses are possible; this eliminates the bottlenecks imposed by monolithic storage
systems.
Availability - Block replication is one of the main features of HDFS. By default each
block is replicated by the client to three DataNodes, but the replication factor can be
configured to a different value at file creation time. Because of replication, HDFS
provides high availability of data in high demand.
Limitations in HDFS are:
Architectural bottlenecks - There are scheduling delays in the Hadoop architecture that
result in cluster nodes waiting for new tasks. Moreover, the disk is not used in a
streaming manner; the access pattern is periodic. The HDFS client serializes computation
and I/O instead of decoupling and pipelining those operations.
Portability limitations - HDFS, being implemented in Java, is not able to exploit some
performance-enhancing features of the native filesystem.
Small files - HDFS is not efficient for large numbers of small files.
Single master node - There is a risk of data loss because of the single NameNode.
Latency of data access - HDFS delivers a high throughput of data at the expense of
latency. If an application needs low-latency access to data, then HDFS is not a good
choice.
CHAPTER 4
HADOOP MAPREDUCE
4.1 Introduction
Hadoop MapReduce is a software framework for easily writing applications which process
vast amounts of data (multi-terabyte data-sets) in-parallel on large clusters (thousands of
nodes) of commodity hardware in a reliable, fault-tolerant manner[8].
A MapReduce job usually splits the input data-set into independent chunks which are
processed by the map tasks in a completely parallel manner. The framework sorts the outputs
of the maps, which are then input to the reduce tasks. Typically both the input and the output
of the job are stored in a file-system. The framework takes care of scheduling tasks,
monitoring them and re-executing the failed tasks.
Typically the compute nodes and the storage nodes are the same, that is, the MapReduce
framework and the Hadoop Distributed File System are running on the same set of nodes.
This configuration allows the framework to effectively schedule tasks on the nodes where
data is already present, resulting in very high aggregate bandwidth across the cluster.
The MapReduce framework consists of a single master JobTracker and one
slave TaskTracker per cluster-node. The master is responsible for scheduling the jobs'
component tasks on the slaves, monitoring them and re-executing the failed tasks. The slaves
execute the tasks as directed by the master.
Minimally, applications specify the input/output locations and
supply map and reduce functions via implementations of appropriate interfaces and/or
abstract-classes. These, and other job parameters, comprise the job configuration. The
Hadoop job client then submits the job (jar/executable etc.) and configuration to
the JobTracker, which then assumes the responsibility of distributing the
software/configuration to the slaves, scheduling tasks and monitoring them, providing status
and diagnostic information to the job-client.
4.2 Inputs and Outputs
The MapReduce framework operates exclusively on <key, value> pairs, that is, the
framework views the input to the job as a set of <key, value> pairs and produces a set
of <key, value> pairs as the output of the job, conceivably of different types.
The key and value classes have to be serializable by the framework and hence need to
implement the Writable interface. Additionally, the key classes have to implement
the WritableComparable interface to facilitate sorting by the framework.
Input and Output types of a MapReduce job:
(input) <k1, v1> -> map -> <k2, v2> -> combine -> <k2, v2> -> reduce -> <k3, v3> (output)
Fig. 4.1: Map Reduce Framework
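The map/sort/reduce flow described above can be demonstrated with the classic word count, here as a minimal single-process simulation (pure Python, no Hadoop; function names and data are illustrative):

```python
# A minimal in-memory word count, mimicking the (input) <k1,v1> -> map ->
# <k2,v2> -> sort -> reduce -> <k3,v3> (output) flow of a MapReduce job.
from itertools import groupby

def map_fn(_, line):                 # <k1, v1> = (offset, line of text)
    return [(word, 1) for word in line.split()]

def reduce_fn(word, counts):         # <k2, list(v2)> -> <k3, v3>
    return (word, sum(counts))

def run_job(lines):
    intermediate = []
    for offset, line in enumerate(lines):
        intermediate.extend(map_fn(offset, line))
    intermediate.sort()              # the framework sorts map outputs by key
    return [reduce_fn(k, [v for _, v in group])
            for k, group in groupby(intermediate, key=lambda kv: kv[0])]

print(run_job(["to be or not", "to be"]))
# [('be', 2), ('not', 1), ('or', 1), ('to', 2)]
```

In real Hadoop the map and reduce calls run on different machines and the sort/shuffle happens over the network, but the data flow is exactly this.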
4.3 User Interfaces
4.3.1 Mapper
Mapper maps input key/value pairs to a set of intermediate key/value pairs.
Maps are the individual tasks that transform input records into intermediate records. The
transformed intermediate records do not need to be of the same type as the input records. A
given input pair may map to zero or many output pairs.
4.3.2 Reducer
Reducer reduces a set of intermediate values which share a key to a smaller set of values.
The number of reduces for the job is set by the user via JobConf.setNumReduceTasks(int).
Reducer implementations are passed the JobConf for the job via
the JobConfigurable.configure(JobConf) method and can override it to initialize themselves.
The framework then calls the reduce(WritableComparable, Iterator, OutputCollector,
Reporter) method for each <key, (list of values)> pair in the grouped inputs. Applications can
then override the Closeable.close() method to perform any required cleanup.
Reducer has 3 primary phases: shuffle, sort and reduce.
Shuffle
Input to the Reducer is the sorted output of the mappers. In this phase the framework fetches
the relevant partition of the output of all the mappers, via HTTP.
Sort
The framework groups Reducer inputs by keys (since different mappers may have output the
same key) in this stage.
The shuffle and sort phases occur simultaneously; while map-outputs are being fetched they
are merged.
Secondary Sort
If equivalence rules for grouping the intermediate keys are required to be different from those
for grouping keys before reduction, then one may specify a Comparator via
JobConf.setOutputValueGroupingComparator(Class).
Since JobConf.setOutputKeyComparatorClass(Class) can be used to control how
intermediate keys are sorted, these can be used in conjunction to simulate secondary sort
on values.
Reduce
In this phase the reduce(WritableComparable, Iterator, OutputCollector, Reporter) method is
called for each <key, (list of values)> pair in the grouped inputs.
The output of the reduce task is typically written to
the FileSystem via OutputCollector.collect(WritableComparable, Writable).
Applications can use the Reporter to report progress, set application-level status messages
and update Counters, or just indicate that they are alive.
The output of the Reducer is not sorted.
4.3.3 Partitioner
Partitioner partitions the key space.
Partitioner controls the partitioning of the keys of the intermediate map-outputs. The key (or
a subset of the key) is used to derive the partition, typically by a hash function. The total
number of partitions is the same as the number of reduce tasks for the job. Hence this controls
which of the m reduce tasks the intermediate key (and hence the record) is sent to for
reduction.
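The default hash-based partitioning can be sketched in one function. This mirrors the idea behind Hadoop's HashPartitioner, i.e. (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks; the Python version below is an illustrative analogue, not the Java class:

```python
# Sketch of the default hash partitioner: the intermediate key's hash, modulo
# the number of reduce tasks, decides which reducer receives the record.

def partition(key, num_reduce_tasks):
    # Mask to a non-negative value before taking the modulus, analogous to
    # (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks in Java.
    return (hash(key) & 0x7FFFFFFF) % num_reduce_tasks

# The crucial property: every record with the same key lands in the same
# partition, so one reducer sees all values for that key.
p1 = partition("book", 2)
p2 = partition("book", 2)
print(p1 == p2, 0 <= p1 < 2)  # True True
```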
4.3.4 Reporter
Reporter is a facility for MapReduce applications to report progress, set application-level
status messages and update Counters.
Mapper and Reducer implementations can use the Reporter to report progress or just indicate
that they are alive. In scenarios where the application takes a significant amount of time to
process individual key/value pairs, this is crucial since the framework might assume that the
task has timed-out and kill that task. Another way to avoid this is to set the configuration
parameter mapred.task.timeout to a high-enough value (or even set it to zero for no time-outs).
4.3.5 Output Collector
OutputCollector is a generalization of the facility provided by the MapReduce framework to
collect data output by the Mapper or the Reducer (either the intermediate outputs or the
output of the job).
4.4 Job Configuration
JobConf represents a MapReduce job configuration.
JobConf is the primary interface for a user to describe a MapReduce job to the Hadoop
framework for execution. The framework tries to faithfully execute the job as described
by JobConf, however, while some job parameters are straight-forward to set
(e.g. setNumReduceTasks(int)), other parameters interact subtly with the rest of the
framework and/or job configuration and are more complex to set
(e.g. setNumMapTasks(int)).
JobConf is typically used to specify the Mapper, combiner (if any), Partitioner, Reducer,
InputFormat, OutputFormat and OutputCommitter implementations. JobConf also indicates the
set of input files (setInputPaths(JobConf, Path...) / addInputPath(JobConf, Path) and
setInputPaths(JobConf, String) / addInputPaths(JobConf, String)) and where the output
files should be written (setOutputPath(Path)).
Fig 4.2: Task Environment
4.5 MapReduce Features
Counters
Counters represent global counters, defined either by the Map/Reduce framework or
applications. Each Counter can be of any Enum type. Counters of a particular Enum are
bunched into groups of type Counters.Group.
DistributedCache
DistributedCache distributes application-specific, large, read-only files efficiently.
DistributedCache is a facility provided by the Map/Reduce framework to cache files (text,
archives, jars and so on) needed by applications.
Applications specify the files to be cached via urls (hdfs://) in the JobConf.
The DistributedCache assumes that the files specified via hdfs:// urls are already present
on the FileSystem.
The framework will copy the necessary files to the slave node before any tasks for the job
are executed on that node. Its efficiency stems from the fact that the files are only copied
once per job and the ability to cache archives which are un-archived on the slaves.
DistributedCache tracks the modification timestamps of the cached files. Clearly the cache
files should not be modified by the application or externally while the job is executing.
DistributedCache can be used to distribute simple, read-only data/text files and more
complex types such as archives and jars. Archives (zip, tar, tgz and tar.gz files) are
un-archived at the slave nodes. Files have execution permissions set.
Tool
The Tool interface supports the handling of generic Hadoop command-line options.
Tool is the standard for any Map/Reduce tool or application. The application should
delegate the handling of standard command-line options
to GenericOptionsParser via ToolRunner.run(Tool, String[]) and only handle its custom
arguments.
IsolationRunner
IsolationRunner is a utility to help debug Map/Reduce programs.
To use the IsolationRunner, first set keep.failed.task.files to true.
Profiling
Profiling is a utility to get a representative (2 or 3) sample of the built-in Java
profiler for a sample of maps and reduces.
The user can specify whether the system should collect profiler information for some of
the tasks in the job by setting the configuration property mapred.task.profile. The value
can also be set using the api JobConf.setProfileEnabled(boolean). If the value is set true,
task profiling is enabled. The profiler information is stored in the user log directory. By
default, profiling is not enabled for the job.
Debugging
The Map/Reduce framework provides a facility to run user-provided scripts for debugging.
When a map/reduce task fails, the user can run a script for doing post-processing on the
task logs, i.e. the task's stdout, stderr, syslog and jobconf. The stdout and stderr of
the user-provided debug script are printed on the diagnostics. These outputs are also
displayed on the job UI on demand.
To submit a debug script along with the job, it first has to be distributed; then the
script has to be supplied in the Configuration.
JobControl
JobControl is a utility which encapsulates a set of Map/Reduce jobs and their
dependencies.
Data Compression
Hadoop Map/Reduce provides facilities for the application-writer to specify compression
for both intermediate map-outputs and the job-outputs, i.e. the output of the reduces. It
also comes bundled with CompressionCodec implementations for the zlib and lzo compression
algorithms.
Intermediate Outputs
Applications can control compression of intermediate map-outputs via
the JobConf.setCompressMapOutput(boolean) api and the CompressionCodec to be used
via the JobConf.setMapOutputCompressorClass(Class) api.
Job Outputs
Applications can control compression of job-outputs via
the FileOutputFormat.setCompressOutput(JobConf, boolean) api and
the CompressionCodec to be used can be specified via
the FileOutputFormat.setOutputCompressorClass(JobConf, Class) api.
CHAPTER 5
ITEM BASED RECOMMENDER ALGORITHM
5.1 Algorithm
Given a set of items that someone is known to like, recommend a new set of items that are
similar to them. Two items can be regarded as similar to one another if different people like
both of them. In terms of pseudo code [9]:
for every item i that u has no preference for yet
  for every item j that u has a preference for
    compute a similarity s between i and j
    add u's preference for j, weighted by s, to a running average
return the top items, ranked by weighted average
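This pseudocode translates directly into a short self-contained sketch. The data and function names below are illustrative (this is not Mahout's distributed implementation, just the same scoring rule in plain Python):

```python
# Item-based scoring as in the pseudocode: rank unseen items by a
# similarity-weighted average of the user's existing preferences.

def recommend(user_prefs, similarity, top_n=2):
    """user_prefs: item -> rating; similarity: (i, j) -> s (symmetric pairs)."""
    scores = {}
    candidates = {i for pair in similarity for i in pair} - set(user_prefs)
    for i in candidates:                       # items u has no preference for
        num = den = 0.0
        for j, pref in user_prefs.items():     # items u has a preference for
            s = similarity.get((i, j), similarity.get((j, i), 0.0))
            num += pref * s                    # u's preference for j, weighted by s
            den += abs(s)
        if den > 0:
            scores[i] = num / den              # the running weighted average
    return sorted(scores, key=scores.get, reverse=True)[:top_n]

prefs = {"Dune": 5.0, "Foundation": 3.0}
sim = {("Hyperion", "Dune"): 0.9, ("Hyperion", "Foundation"): 0.1,
       ("Twilight", "Dune"): 0.1, ("Twilight", "Foundation"): 0.2}
print(recommend(prefs, sim))  # ['Hyperion', 'Twilight']
```

Hyperion scores higher because it is strongly similar to the highly rated Dune; the MapReduce jobs described next distribute exactly this similarity computation and weighted multiplication over a cluster.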
5.2 Logical Parts of Code
The code is structured so that it breaks the recommender into four logical parts:
1. Prepare the preference matrix
2. Generate the similarity matrix
3. Prepare for matrix multiplication
4. Multiply matrices
5.2.1 Preparation of preference matrix
In this step, three MapReduce jobs are executed:
1. ItemIDIndexMapper and ItemIDIndexReducer map item ids in the input data from Long
types to Integer types.
2. ToItemPrefsMapper and ToUserVectorsReducer generate the user-item vectors.
3. ToItemVectorsMapper and ToItemVectorsReducer generate the item-user vectors.
5.2.2 Generation of similarity matrix
In this step, three MapReduce jobs are executed:
1. VectorNormMapper and MergeVectorsReducer read in the item-user vectors and output
them as user-item vectors, calculating statistical matrix properties in the process.
These statistics are later employed in matrix calculations.
2. CooccurrencesMapper and SimilarityReducer read in the user-item vectors from the
previous step and create half of an item-item similarity matrix. Only half of the matrix
is generated at this stage: since it is symmetric along its diagonal, the other half can
be inferred. The matrix also lacks values along its diagonal.
3. UnsymmetrifyMapper and MergeToTopKSimilaritiesReducer read half of the item-item
similarity matrix and generate an almost complete one, missing only the values along its
diagonal.
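The co-occurrence stage in step 2 can be illustrated with a small in-memory sketch: for each user vector, every pair of items that user interacted with co-occurs once, and only the upper half of the symmetric matrix (with no diagonal) is kept. The data is illustrative and this is not Mahout's code:

```python
# Sketch of the co-occurrence half-matrix: count, for each item pair (i, j)
# with i < j, how many users interacted with both items. The i < j ordering
# keeps only the upper half of the symmetric matrix, and the diagonal (i, i)
# is never emitted, matching the description in the text.
from itertools import combinations

def cooccurrence_upper_half(user_item_vectors):
    counts = {}
    for items in user_item_vectors:
        for i, j in combinations(sorted(items), 2):   # i < j: upper half only
            counts[(i, j)] = counts.get((i, j), 0) + 1
    return counts

vectors = [{"A", "B", "C"}, {"A", "B"}, {"B", "C"}]   # three users' item sets
print(cooccurrence_upper_half(vectors))
# {('A', 'B'): 2, ('A', 'C'): 1, ('B', 'C'): 2}
```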
5.2.3 Preparation for matrix multiplication
With both the item-item similarity matrix and the user-item vectors, it is now possible to
multiply them together and generate recommendations for users. In this part of the code,
the matrix multiplication begins, again, by implementing three MapReduce jobs.
1. SimilarityMatrixRowWrapperMapper and the default Reducer complete the item-item
similarity matrix by adding in Double.NaN values along its diagonal. In preparation
for the item-item matrix being multiplied with the item-user vectors, it converts the
matrix from VectorWritable to VectorOrPrefWritable.
2. UserVectorSplitterMapper and the default Reducer read in user-item vectors and output
them as item-user vectors, formatted as VectorOrPrefWritable, to make them compatible
for multiplication with the item-item matrix.
3. The default Mapper and ToVectorAndPrefReducer combine the output from the previous
two steps, thus starting the multiplication of the item-item similarity matrix with the
item-user vectors.
5.2.4 Matrix multiplication
Now that the item-item similarity matrix and the item-user vectors have been combined
into a single source of data, the matrix multiplication can be completed and user
recommendations generated. This process is implemented in a single MapReduce job.
1. PartialMultiplyMapper and AggregateAndRecommendReducer complete the matrix
multiplication, producing item recommendations for users.
CHAPTER 6
SYSTEM PREPARATION AND IMPLEMENTATION
6.1 Runtime Environment
The Hadoop engine was deployed over a 2-node cluster, and the Java runtime was set up to
use the common Hadoop configuration, as specified by the NameNode (master node) in the
cluster.
6.2 Software and Language Versions
Hadoop 1.2.1
Java 1.6
Maven 3
Mahout 0.20
6.3 Hardware Specification of each Hadoop Node
The Hadoop cluster has identical hardware specifications for all of its nodes. Table 6.1
lists the specification of the nodes.
Operating System Ubuntu 12.04 LTS (64 bit)
Processor Intel Core i3 (Quad Core)
Memory 3GB
Disk Space 160GB
Table 6.1 : Hardware Specification
6.4 Hadoop Configuration
/etc/hosts
The /etc/hosts file in each machine contains the IP address of the nodes and their labels. In
this case the labels assigned were master and slave.
hadoop-env.sh
The only required environment variable that has to be configured for Hadoop is
JAVA_HOME.
core-site.xml
The following properties are added:

  <property>
    <name>hadoop.tmp.dir</name>
    <value>/app/hadoop/tmp</value>
    <description>A base for other temporary directories.</description>
  </property>

  <property>
    <name>fs.default.name</name>
    <value>hdfs://master:54410</value>
    <description>The name of the default file system. A URI whose
    scheme and authority determine the FileSystem implementation. The
    uri's scheme determines the config property (fs.SCHEME.impl) naming
    the FileSystem implementation class. The uri's authority is used to
    determine the host, port, etc. for a filesystem.</description>
  </property>
mapred-site.xml
The following property is added:

  <property>
    <name>mapred.job.tracker</name>
    <value>localhost:54411</value>
    <description>The host and port that the MapReduce job tracker runs
    at. If "local", then jobs are run in-process as a single map
    and reduce task.</description>
  </property>
hdfs-site.xml
The following property is added:

  <property>
    <name>dfs.replication</name>
    <value>1</value>
    <description>Default block replication. The actual number of
    replications can be specified when the file is created. The default
    is used if replication is not specified at create time.</description>
  </property>
masters
This file contains the label of the master. In this case- master is added to the file.
slaves
This file contains the labels of the slaves. In this case- slave is added to the file.
6.5 Starting the cluster
Starting the cluster is performed in the following steps.
6.5.1 Formatting the Namenode
This initialises the HDFS namespace and the NameNode itself.
Fig. 6.1: Formatting namenode
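In Hadoop 1.x the format step is a single command run on the master, sketched below; it is run only once, before the first start, because it erases any data already in HDFS:

```shell
# Run from the Hadoop installation directory on the master.
# WARNING: this wipes the existing HDFS namespace and data.
bin/hadoop namenode -format
```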
6.5.2 Starting HDFS daemons
The NameNode daemon is started on the master, and DataNode daemons are started on the slave.
Fig. 6.2: HDFS daemons
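A minimal sketch of this step in Hadoop 1.x: the start-dfs.sh script on the master starts the local NameNode and, over SSH, a DataNode on every machine listed in conf/slaves; jps can then confirm which daemons are running.

```shell
# Run on the master from the Hadoop installation directory
bin/start-dfs.sh

# List the running Java daemons on this machine
jps
```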
6.5.3 Starting mapred daemons
The JobTracker is started on the master, and TaskTracker daemons are started on the slave.
Fig. 6.3: Mapred daemons
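As with HDFS, Hadoop 1.x provides a single script for this step; run on the master, it starts the JobTracker locally and a TaskTracker on each machine listed in conf/slaves:

```shell
# Run on the master from the Hadoop installation directory
bin/start-mapred.sh
```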
6.5.4 Starting the job
The jar file containing the MapReduce jobs is specified as an argument to hadoop.
Fig. 6.4: Starting the job
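A typical invocation of Mahout's distributed recommender might look like the following sketch; the jar name and HDFS paths are placeholders, while RecommenderJob and its options belong to Mahout's item-based collaborative filtering job:

```shell
# The input file in HDFS holds userID,itemID,rating triples;
# --similarityClassname selects the similarity metric under test
hadoop jar mahout-core-job.jar \
  org.apache.mahout.cf.taste.hadoop.item.RecommenderJob \
  --input /user/hduser/input/ratings.csv \
  --output /user/hduser/output \
  --similarityClassname SIMILARITY_COOCCURRENCE \
  --numRecommendations 10
```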
6.6 Monitoring the cluster
The web interfaces provided by Hadoop can be used to check the status of the jobs.
The status of the NameNode is shown by the following web interface.
Fig. 6.5: Namenode interface
Shown below is the interface for the JobTracker on the master machine. It displays the number of running tasks along with their details.
Fig. 6.6: JobTracker interface
Fig. 6.7: Scheduling interface 1
Fig. 6.8: Scheduling interface 2
CHAPTER 7
RESULTS AND CONCLUSION
7.1 Results
7.1.1 Run Time
The recommendation job was run five times with each of the item-similarity metrics: similarity co-occurrence, loglikelihood similarity, Tanimoto coefficient, Euclidean distance, and Pearson correlation coefficient.
The run time of each algorithm was measured using the Linux time command, which reports the real, user, and sys times taken by a command. The results (time in seconds) are summarised below.
Metric / Run no.            1        2        3        4        5        Average
Similarity Co-occurrence    566.740  564.924  566.108  565.098  564.982  565.570
Tanimoto Coefficient        589.650  590.472  588.847  589.241  589.594  589.548
Similarity Loglikelihood    704.074  705.012  704.725  704.104  705.021  704.586
Euclidean Distance          588.212  589.027  588.624  587.984  588.149  588.499
Pearson Coefficient         441.286  440.956  442.012  441.459  441.427  441.428
Table 7.1: Time Taken (seconds)
Fig 7.1: Average Run Time
7.1.2 Recommendations
The Book-Crossing dataset was used for the recommender job. It contains user details, book ratings, and book details such as name, genre, etc.
A Python script was written that takes the ratings file, users file, and book-details file as parameters, and displays in the console the rated books and the recommended books for the user whose ID is specified in the script.
Each rated book is displayed as the book name followed by the rating that user gave it.
The recommendations were made using the similarity metric specified for each run.
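Assuming the script is named show_recommendations.py (a hypothetical name) and the standard Book-Crossing file names are used, a run from the console might look like:

```shell
# Script name is illustrative; the three arguments are the
# Book-Crossing ratings, users, and book-details files
python show_recommendations.py BX-Book-Ratings.csv BX-Users.csv BX-Books.csv
```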
(a) Similarity Co-occurrence
Fig. 7.2: Similarity Co-occurrence Recommendations
(b) Tanimoto Coefficient
Fig. 7.3: Tanimoto Recommendations
(c) Similarity Loglikelihood
Fig. 7.4: Loglikelihood Recommendations
(d) Euclidean Distance
Fig. 7.5: Euclidean Recommendations
(e) Pearson Coefficient
Fig. 7.6: Pearson Recommendations
7.2 Conclusion
The recommender job was successfully run on the Hadoop distributed environment. The following conclusions can be drawn from the results above.
The time taken by loglikelihood similarity is the greatest. This is because it eliminates intersections that occur by chance, by counting not only the number of times two items occur together but also the number of times they do not. This increases the relevance of the recommendations but also increases the overhead.
Though the Pearson correlation takes the least time, it is used only theoretically, because it has a few drawbacks in principle that make its recommendations less accurate. First, it doesn't take into account the number of users in whose preferences two items overlap, which is probably a weakness in the context of recommender engines. Two items that have been read by 100 of the same users, for instance, are probably more similar than two items that have only been read by two users in common, even if those 100 users don't often agree on ratings. This metric does not consider that, so the similarity of the two common users' ratings outweighs the 100 shared readers. Second, if two
items overlap on only one user, no correlation can be computed, because of how the computation is defined. This could be an issue for small or sparse data sets, in which users' item sets rarely overlap. Finally, the correlation is also undefined if either series of preference values is all identical; it does not require that both series be identical.
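These undefined cases can be read directly off the standard formula. The Pearson correlation between the rating series x and y of two items, over the n users who rated both, is:

```latex
\mathrm{sim}(x,y) \;=\;
\frac{\sum_{i=1}^{n} (x_i - \bar{x})(y_i - \bar{y})}
     {\sqrt{\sum_{i=1}^{n} (x_i - \bar{x})^{2}}\,
      \sqrt{\sum_{i=1}^{n} (y_i - \bar{y})^{2}}}
```

With n = 1, or with either series constant, one of the square-root factors is zero, so the denominator vanishes and the correlation is undefined.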
The run times of Euclidean distance, similarity co-occurrence, and Tanimoto coefficient were nearly equal, lying between those of loglikelihood and the Pearson coefficient.
CHAPTER 8
FUTURE SCOPE
The system implemented in this project uses static data to recommend books to users. To incorporate dynamic data, a distributed database such as HBase or Cassandra can be used, updated regularly with new users and ratings. To build a web application around the recommender, the data also needs to be accessible in real time; a distributed database is a solution to this as well.
The recommender system can be improved by combining user-based collaborative filtering and content-based filtering with the current system. This combination, known as hybrid filtering, can significantly improve recommendation performance.
Finally, the comparison made between the different similarity metrics was based on run time and not on the precision of the recommendations; evaluating recommendation quality is a natural next step.
CHAPTER 9
REFERENCES
[1] A. Felfernig, G. Friedrich, and L. Schmidt-Thieme, "Recommender Systems," IEEE Intelligent Systems, pp. 18-21, 2007.
[2] P. Resnick, N. Iacovou, M. Suchak, and J. Riedl, "GroupLens: An Open Architecture for Collaborative Filtering of Netnews," in Proceedings of CSCW '94, Chapel Hill, NC, 1994.
[3] U. Shardanand and P. Maes, "Social Information Filtering: Algorithms for Automating 'Word of Mouth'," in Proceedings of CHI '95, Denver, 1995.
[4] D. Asanov, "Algorithms and Methods in Recommender Systems," Berlin Institute of Technology, Berlin, 2011.
[5] B. Sarwar, G. Karypis, and J. Riedl, "Item-Based Collaborative Filtering Recommendation Algorithms," in Proceedings of WWW10, Hong Kong, China, 2001. [Online]. Available: http://wwwconference.org/www10/cdrom/papers/519/index.html
[6] F. Ricci, L. Rokach, and B. Shapira, "Introduction to Recommender Systems Handbook," New York: Springer Science+Business Media, 2011, ch. 1, sec. 1.4, pp. 10-14.
[7] D. Borthakur, "HDFS Architecture Guide," Hadoop Apache Project, 2008. [Online]. Available: http://hadoop.apache.org/common/docs/current/hdfsdesign.pdf
[8] J. Dean and S. Ghemawat, "MapReduce: Simplified Data Processing on Large Clusters," Communications of the ACM, vol. 51, no. 1, 2008, pp. 107-113.
[9] G. Linden, B. Smith, and J. York, "Amazon.com Recommendations: Item-to-Item Collaborative Filtering," IEEE Internet Computing, vol. 7, no. 1, 2003, pp. 76-80.