big data and hadoop ecosystem essentials for managers

57
Big Data & Hadoop ecosystem essentials for managers (Manjeet Singh Nagi - https://in.linkedin.com/in/manjeetnagi ) Big Data & Hadoop ecosystem essentials for managers (Manjeet Singh Nagi - https://in.linkedin.com/in/manjeetnagi ) Big Data & Hadoop ecosystem essentials for managers Manjeet Singh Nagi (Manjeet Singh Nagi - https://in.linkedin.com/in/manjeetnagi )

Upload: manjeet-singh-nagi

Post on 11-Jan-2017

31 views

Category:

Technology


1 download

TRANSCRIPT

Page 1: Big data and hadoop ecosystem essentials for managers

Big Data & Hadoop ecosystem essentials for managers(Manjeet Singh Nagi - https://in.linkedin.com/in/manjeetnagi)

Big Data & Hadoop ecosystem essentials for managers(Manjeet Singh Nagi - https://in.linkedin.com/in/manjeetnagi)

Big Data & Hadoop ecosystem essentials for managers

Manjeet Singh Nagi(Manjeet Singh Nagi - https://in.linkedin.com/in/manjeetnagi)

Page 2: Big data and hadoop ecosystem essentials for managers

Big Data & Hadoop ecosystem essentials for managers(Manjeet Singh Nagi - https://in.linkedin.com/in/manjeetnagi)

IndexChapter 1 – A Brief History of…?

Chapter 2 - NoSQL databases

Chapter 3 - The backbone I (Hadoop, HDFS, MapReduce)

Chapter 4 - The backbone II (MapReduce continued)

Chapter 5 – A quick view of the ecosystem around Hadoop

Chapter 6 - Hive

Chapter 7 - Pig

Chapter 8 - Hbase

Chapter 9 - Sqoop

Chapter 10 - Flume

Chapter 11 - Kafka

Chapter 12 - Oozie

Chapter 13 - Zookeeper

Chapter 14 - Solr

Chapter 15 - Giraph

Chapter 16 - Putting it all together

Chapter 17 – Hadoop ecosystem on Amazon

Chapter 18 – Machine Learning with Mahout

Big Data & Hadoop ecosystem essentials for managers(Manjeet Singh Nagi - https://in.linkedin.com/in/manjeetnagi)

Page 3: Big data and hadoop ecosystem essentials for managers

Big Data & Hadoop ecosystem essentials for managers(Manjeet Singh Nagi - https://in.linkedin.com/in/manjeetnagi)

Preface

Started writing this ebook explaining Big Data to managers. Did not get time to complete it. Still uploading it for everyone to have a look.

Big Data & Hadoop ecosystem essentials for managers(Manjeet Singh Nagi - https://in.linkedin.com/in/manjeetnagi)

Page 4: Big data and hadoop ecosystem essentials for managers

Big Data & Hadoop ecosystem essentials for managers(Manjeet Singh Nagi - https://in.linkedin.com/in/manjeetnagi)

Chapter 1 – A brief history of …

Any data set that cannot be processed on a single machine within reasonable amount of time is big data. The underlined phrase in the sentence is very critical to determine if the problem at hand classifies as a big data problem.

Theoretically any amount of data can be processed on a single machine, with high amount of storage and multi processors (e.g. a mainframe). But if the machine takes a couple of days, or even a day, it may not be of much use to the business. If the business or customer or consumer of the data is OK getting the data processed in as much time as it takes on a single machine(and there are valid scenarios for such requirements) you do not need to solve the problem like a big data problem. But in the world today, a large amount of data is coming to companies. A quicker analysis or processing of this data can help them get quicker insights and quicker decisions.

Theoretically big data has three attributes. Volume, velocity and variety, Lets understand each first.

In a big data problem the amount of that data that needs to be processed would typically be huge (volume). It might run into terra-bytes, peta-bytes etc.

It would typically be coming in at a high speed (real time in some case) (that’s velocity).

And it would come in a lot of variety. Variety could mean it could be coming from different sources each of which could have different formats of sending the data. Even within the data from the same source the format could vary over a period of time. Even within the data from the same source at a given time the data may not have structure.

Having said that, why is it happening that the companies are getting such huge amount of data at huge velocity and in so much variety?

The following reasons over the past couple of decades lead us to big data

Digitization of organizations – Over the past three decades or so organizations have become more and more digitized. Every activity done by organizations has become digitized. Every interface of the organization, be it with consumers, partnering vendors, government agencies have become digitized. All this is creating a lot of data. But all this would not have generated data (in volume, velocity and variety) needed to qualify as a big data unless the developments mentioned in the following paragraphs would have taken place.

Web 2.0 – Web 2.0 introduced technologies which made billions of people not the consumers of websites but the content generators. Blogging & social websites are the examples of the web 2.0. Even on sites not typically classified as social or blogging sites there are features which enable billions of people to generate contents e.g. sharing the new articles from news websites, commenting on specific content on a website etc.

Big Data & Hadoop ecosystem essentials for managers(Manjeet Singh Nagi - https://in.linkedin.com/in/manjeetnagi)

Page 5: Big data and hadoop ecosystem essentials for managers

Big Data & Hadoop ecosystem essentials for managers(Manjeet Singh Nagi - https://in.linkedin.com/in/manjeetnagi)

Web 2.0 is often a hotly debated term. It is not as if a new version of web or any related technology were released. But the web in the last decade of the last century was about flow of information from website owners to billions of web users. Slowly the web evolved to enable billions of user to generate the content. The content in the web today is much more democratic. It is by the people and for the people.

Mobile devices – With the advent of mobile devices users are performing many more activities and spending more hours in the web than earlier. Add to that the fact that mobile devices capture much more user contextual information (location for example) than desktops did earlier. This contextual information if processed and analyzed can enable organizations understand their consumers much better and provide much more meaningful services and\or products. Also the very fact the user are spending lot of time on their mobile devices means the amount of information that is getting generated is much more than what was getting generated when users were using desktop.

More digitization of the organizations - With more and more retail, social interactions and information consumption moving to web organizations need to, literally, every click of a user to understand her better. As opposed to a brick a mortar, where a user a can be observed physically and assisted in-store assistants, in a e-tail website the only way to observe a user is observe and analyze every click made by the user on the web. E-tail offers an advantage over brick and mortar shops in the sense that the user activity can be saved for analysis later which is not possible in a brick and mortar shop. This analysis needs a lot of data, in form of logs, to be analyzed.

Typical pattern of big data problem solutionAs we all know from our experience, typically in an application the time taken to process the data is order of magnitudes smaller than the time taken for IO of the data from data repository (disk or database). Also the time taken to the read (IO) the data over the network (say from network storage or database on another server) is many times larger than the time taken to read the data locally from the disk.

So typically when a big data problem is solved,

1. The data is distributed across multiple machines (called nodes). Transferring a petabyte of data to a single machine would take much more time than the time taken to divide this data into smaller chunks and transferring to 100 smaller machines in parallel. The IO is now done to 100 nodes in parallel which reduces the IO time significantly.

2. Now that the data is distributed across multiple nodes, the code/application/binary/jar etc. is copied to all the nodes. This is unconventional as compared to a typical application where data is brought from multiple sources to a single machine where the application resides and is processed on this single machine. In big data solutions it is the application that moves closer to the data.

3. Finally, the output from all the nodes is brought to a smaller number of nodes (many times only 1 node) for final processing or summarization.

Big Data & Hadoop ecosystem essentials for managers(Manjeet Singh Nagi - https://in.linkedin.com/in/manjeetnagi)

Page 6: Big data and hadoop ecosystem essentials for managers

Big Data & Hadoop ecosystem essentials for managers(Manjeet Singh Nagi - https://in.linkedin.com/in/manjeetnagi)

So, as you can see, the solution for a big data problems is about distributed storage (#1 above) and distributed processing (#2 and 3). The evolution of the solutions for big data problem also happened approximately in the same manner. Firstly, many solutions around distributed storage arrived and then around distributed processing.

How did it all start?Commercial relational databases ruled the roost when it came to consistent storage since 70s/80s. These relational database had their own advantage which made them so popular. But they had certain limitations which did not come to the fore till late 90s.

In the late 90s and early part of this century companies had more and more data to store. The option available with the relational databases was to buy bigger and bigger machines which are really costly.

Also in order to keep their websites available 100% of the time ( you do not expect google to be down, do you?) the companies needed to scale out(add more active hot backups). This made the relational database costly only from hardware perspective (relational database in companies always ran on high grade servers) but also from licensing cost. The licensing cost for the relational database was directly proportional to the number of machines it was going to run on. For e-tailers and search giants had data that needed 100s and 1000s of machines to store data with fault tolerance.

Also, the relational databases were designed to store data in a fixed format. They were designed to lock-in to the schema at the time of database design. But companies were getting data which was un-structured to a large extent (imagine logs). They could have formatted this data and stored data in a structured format in the relational database but that eliminated any possibility of using any data, discarded during the formatting stage, whose significance was realized only later companies were looking for persistent data storage where schema was not locked-in at the time of database design but at the time of database read.

To summarize, organizations were running into the following limitations of relational database storages:

Licensing cost prohibited the scaling out that was needed to store large data sets.

Licensing cost as the higher-grade servers needed were prohibitive for the creating fault tolerance in the storage.

Relational database were designed for locking-in the schema at the time of database design.

As companies started coming against these limitations many of them started designing databases on their own and bringing them out in public in form of open source database. These databases were together called NoSQL databases. All these database had the following attributes (in addition to the fact that they were open source),

Big Data & Hadoop ecosystem essentials for managers(Manjeet Singh Nagi - https://in.linkedin.com/in/manjeetnagi)

Page 7: Big data and hadoop ecosystem essentials for managers

Big Data & Hadoop ecosystem essentials for managers(Manjeet Singh Nagi - https://in.linkedin.com/in/manjeetnagi)

They were designed to run on clusters made of commodity hardware. Unlike relational databases, they did not need high end servers.

They were inherently designed to run on clusters. So as the size of data increases an organization could just add more commodity hardware and scale out rather than buying costly servers.

Fault tolerance was inherent in their design. Any data on one node of the cluster was backed up on another node (number of backups was configurable not only at database level but at much more granular level). This low cost fault tolerance made them much more resilient on commodity hardware than relational databases on enterprise servers.

They were design for unstructured data. So you could just load the data in whatever format you get it. You need not even know what information comes in the data. It was up to the application to know what to expect in the data.

Also NoSQL databases challenged the very foundation of the relational databases. The foundation was that relational database updates were ACID (Atomic, Consistent, Isolated and Durable). NoSQL databases challenged this very foundation. They questioned if every business scenario really needed the databases to be ACID compliant. We will get into much more details on this in the chapter 2.

While many authors do not talk about NoSQL databases when they talk about dig data technologies, NoSQL databases brought to the fore distributed storage, used for big datasets, as we know it today.

Sometime in the early part of this century, Google published two papers.

One of the papers was about their distributed file storage system. This was not the first distributed file storage system in the world. But it had many architectural and design aspect to solve a very specific problem they had at hand. It was designed for

Fault tolerance using commodity hardware. So the data was distributed across a cluster of commodity machines (instead of high end servers). Since the machines are high-grade there are high risks of failure The distributed file system will take care of backing up data on each node on other nodes and recovering it in case a machine fails.

Scenarios where files written once to the distributed file system are read multiple times.

Random reads (reading a specific record from the file) are not required or are an exception.

Files are required to be ready sequentially in big chunks rather than one record each time. These big chunks are also read in sequential manner rather than from random places in the file

Random write (updating some particular read) is not needed. So you do not have a scenario to update a record in the file

Updates to the file are about adding/appending more data that too in huge chunks rather than one record at a time.

Big Data & Hadoop ecosystem essentials for managers(Manjeet Singh Nagi - https://in.linkedin.com/in/manjeetnagi)

Page 8: Big data and hadoop ecosystem essentials for managers

Big Data & Hadoop ecosystem essentials for managers(Manjeet Singh Nagi - https://in.linkedin.com/in/manjeetnagi)

Scenarios where a modest number of huge files need to be stored rather than huge number of modest/small need to be stored.

Clients(of distributed file system) which want to process bulk of data faster(throughput) rather than small amount of data quickly(latency)

The other paper from Google was about a framework they developed for processing their data. It is called MapReduce. In this framework user specifies a map function that will transform their data and a Reduce function that will summarize the output of Map function. The MapReduce framework takes the onus of

1. distributing the data to be processed across many nodes2. distributing the map and reduce functions to all the nodes so that code is

closer to the data and hence IO is reduced(refer to Typical pattern of big data problem solution we discussed earlier in this section)

3. Schedule to run Map and Reduce functions on all the nodes4. Manage to recover from a failed machine – So the framework will take

care of restoring the data from a backup on another node and restart the map or reduce function there if some machine fails.

The MapReduce framework was designed to run simple functions on a huge amount of data. It lets programmers write Map and Reduce functions while it takes care of distributing the data and code, schedule the run and recovering from a failure.

I do not want you to get bogged down by the term MapReduce. It is similar to a typical processing of data in other applications. Here is more detail on what a map reduce function are to make you more comfortable before we move forward.

A map function accepts a record in the form of a key-value pair and does processing or formatting on the record and generates an output in the form of another key-value pair. I do not want you to think of “value” in the key-value pair as a single field. A “value” in the key-value pair could a complete record with a lot of fields or information in it. E.g. key could be employee ID and value could be all the details of that employee, key could be a transaction id and value could be all the details of the transaction. It is up to the map function to decide what processing or formatting it wants to do on which field in the value.

Similarly, Reduce function reads the key-value output from all the map functions running of different nodes and summarizes to generate the final output.

A very simple example is – Say you have a huge data which has all the details of the employees of many organizations from the world in a file. What you want to achieve is calculate avg salary for each designation (assume there are standard designation). Your map function will read the part of input file provided to it. The input Key-value would be designation and the value would be the rest of the information about that employee. Your map function will read each record and for each input record it will generate an output with key as designation and value as salary from that record. It sounds simple. Isn’t it? What is important is that the map function is parallelizable. You can divide your input records into as many processing nodes as you have and run Map function in parallel on all

Big Data & Hadoop ecosystem essentials for managers(Manjeet Singh Nagi - https://in.linkedin.com/in/manjeetnagi)

Page 9: Big data and hadoop ecosystem essentials for managers

Big Data & Hadoop ecosystem essentials for managers(Manjeet Singh Nagi - https://in.linkedin.com/in/manjeetnagi)

those nodes. The map function is not dependent on getting information from another record on another node while processing a specific record.

The reduce function in our example will read records from all the nodes where map function. Its input would be the output key-value from map function. It will do an avg for each designation present in the file.

This lead to the development of Hadoop, an open source product which delivers capability similar to the ones shared by google in their two documents. Hadoop has two components:

HDFS (Hadoop Distributed File System) – This is similar to google’s distributed file system (as described above). As the name suggests HDFS is a distributed fault tolerant file system. It enables storing large files across a cluster of commodity machines.

MapReduce – MapReduce is a framework to process data in the form of key-value pair by distributing the key-value pairs across a cluster of machines. It is run in two steps. First step is called a Map where input in the form of Key-value is processed to generate intermediate key-value pairs. The intermediate key-value pair go through a reduce step which summarizes the key-value pair to generate the final output.

Hadoop was quickly adopted across organizations. This eventually led to the development of a lot of other products which extended the functionality of Hadoop further. E.g. Flume, Sqoop, Pig, Hive etc. We will understand each of these open source product developed around Hadoop in subsequent chapters in enough details for us to be able to design, at a high level, a solution to solve a big data business problem.

Big Data & Hadoop ecosystem essentials for managers(Manjeet Singh Nagi - https://in.linkedin.com/in/manjeetnagi)

Page 10: Big data and hadoop ecosystem essentials for managers

Big Data & Hadoop ecosystem essentials for managers(Manjeet Singh Nagi - https://in.linkedin.com/in/manjeetnagi)

Chapter 2 - NoSQL Database

How NoSQL databases are so scalable

When we say an application or a database is scalable we generally mean horizontal scalability. The capacity (to process more data or take more load) of a scalable solution can easily be increased by adding more machines to the cluster. On the other hand, the capacity of a not-so-scalable solution can either be not increased at all or it can be increased only by replacing the existing machine/server with a bigger (costlier) (vertical scalability) one.

The way relational and NoSQL databases store the data is very different. This difference makes NoSQL database very scalable and cluster oriented. Let’s understand this with an example.

Let’s take example of a professional networking website. Users maintain the information about their education institute (schools and colleges they passed from) in this application. Let’s also assume that the typical access pattern from the application is such that if every time application accesses user information it will access her school/college information as well.

A typical relational database design to store this information would be to save user and educational institutes two different tables and maintaining the relationships between the 2 using foreign keys(or using a third table to maintain the start and end date of relationship).

Typical design in a relational database

User table Education Institute Table

Relationship Table

Nowhere in this design have we told the database the typical access pattern of the data from the application. I.e. we have not told the database if every time

Big Data & Hadoop ecosystem essentials for managers(Manjeet Singh Nagi - https://in.linkedin.com/in/manjeetnagi)

User ID

User Name

User DoB

…..

Education Institute ID

Institute Name

Institute City

….

User ID(Foreign Key)

Education Institute ID(Foreign Key)

Start Date

End Date

Page 11: Big data and hadoop ecosystem essentials for managers

Big Data & Hadoop ecosystem essentials for managers(Manjeet Singh Nagi - https://in.linkedin.com/in/manjeetnagi)

application accesses user information it will access her school/college information as well.

Now let’s look at the database design of a NoSQL database. NoSQL database would typically be designed in such a way that a user’s school\college information will be embedded within the user information itself and stored at the same physical node in the cluster. So the user information would be like,

User {Name, D0B, Current designation, Educational Institute [(Institute1, Start Date, End Date), (Institute2, Start Date, End Date)…]}

Note that the number of educational institute for a user can vary from zero to any number your applications wants to allow. Relational database are generally not conducive to store such limitless list like information. If you try to embed the educational institute within the user information in a relational database it would also make it highly denormalized and inefficient.

The NoSQL database would store the user and her educational institute information at the same node in a cluster. If the educational institute information was not embedded in the user information but instead maintained separately with a relation between two (like in relational database) the two could be saved in different node. In that case every time a user information was accessed the application would have to connect to another node as well to get the information on the education institute (due to the typical access pattern of our application described above). This would increase the IO and slow down the application.

This way of storing the data makes NoSQL database very cluster oriented and scalable. As you have more users, you can add mode nodes to the cluster and spread data across the cluster. When application needs the data for a user, the database will get the data from the node it is on.

You cannot scale the relational database in the same manner. The user and educational institute are maintained as separate tables. If you spread user information across the nodes on the cluster what should you do about the educational institutes? Typically many people would have gone to the same institute. Relational database would maintain this many-to-one relationship by using foreign key. But you cannot spread educational institutes across nodes because user on node 1 would have gone to institute on node 2(do that you increase the IO).

Please note NoSQL makes a very strong assumption in terms of how data will be typically accessed. If there are multiple ways in which data will be typically accessed then probably NoSQL databases would not be a good option. In our example, what if the application will also need to typically generate reports by counting the user by their educational institute. In that case the application will have to scan through all the users across all the nodes in the database to get the output which would be very in-efficient. In such a scenario a relational database would be a good option or you can used NoSQL for general queries and create a materialized view for storing counts of users by educational institutes.

I hope now you can imagine how NoSQL databases store the data. They spread the data across the nodes in clusters but they ensure that the data that is

Big Data & Hadoop ecosystem essentials for managers(Manjeet Singh Nagi - https://in.linkedin.com/in/manjeetnagi)

Page 12: Big data and hadoop ecosystem essentials for managers

Big Data & Hadoop ecosystem essentials for managers(Manjeet Singh Nagi - https://in.linkedin.com/in/manjeetnagi)

accessed typically together stays on the same node (latter part needs to be ensured by a good design). As the data increases in the application one can add more nodes to the database to scale the solution horizontally.

Please note that, conceptually, it is not as if NoSQL database stores one table on one node and another table on another node. The scalability comes from the fact that it can distribute each row of the table to a different node. Imagine (pardon for using a very simplistic example here) you have a pack of pringles and a pack of chocolates and 10 dishes (plates to serve them). Unless you open the pack of pringles and chocolates you cannot use the 10 dishes. You will be able to use only 2 dishes. So users (guests who need the pringles and chocolates) would put load on those two containers. But if you open the packs you can spread the pringles and chocolates across 10 dishes. Some containers could have only pringles, some only chocolate, some a combination of both. Some would have more quantity which you can keep near areas which have more guest. Others can have less quantity and can be kept near guest who would not be consuming as much. That’s scalability.

AtomicityLeaving the technical terms aside, what atomicity means is that related database updates should be done in a manner that either all are done or none is done. In case of relational database, the atomicity is ensured by the concept of transaction. So if updates to multiple tables need to be done in a manner that either all are done or none is done then relational database wrap these updates in a transaction and make the updates. If there are issues after updating a few tables and the rest of the tables could not be updated, the database will roll back the transaction, i.e. the updates already made to the table as a part of the transaction are rolled back.

Let’s take an example. Say there is a simple application that records the transactions done on a bank account and maintains the final balance. There are two tables in the database. One stores the transactions. Other stores the final balance. If a user executes a transaction on an account, the application needs to make two updates. First it needs to insert the transaction in the transaction table. Second, it needs to update the final balance in the final balance table. The application will indicate to the relational database that these two updates constitute one transaction. The database will update the tables in any order but ensure that either both updates are done or none. If after updating the transaction table first it runs into issues and not able to update the final balance table it would roll back the updates made to the first table and inform the application which must have a code to handle such exceptions.

NoSQL databases manage the atomicity a little bit differently. Though they are atomic to an extent but not thoroughly.

Let’s continue with the same example of account update we used for understanding atomicity in relational databases. If it were a NoSQL database there were two ways in which it could be designed. The final balance could be embedded in the table which lists the transactions in an account. Or the final balance could be a separate table with a relationship between the transaction and final balance table.

Big Data & Hadoop ecosystem essentials for managers(Manjeet Singh Nagi - https://in.linkedin.com/in/manjeetnagi)

Page 13: Big data and hadoop ecosystem essentials for managers

Big Data & Hadoop ecosystem essentials for managers(Manjeet Singh Nagi - https://in.linkedin.com/in/manjeetnagi)

Design 1

{Bank Account ID, Final Balance, Transactions [(Transaction1 ID, Date, Credit/Debit Ind, Amount, Other transaction details), (Transaction2 ID, Date, Credit/Debit Ind, Amount, Other transaction details)… ]

Design 2

Table 1(Bank Account ID, Final Balance)

Table 2(Bank Account ID, Transactions [(Transaction1 ID, Date, Credit/Debit Ind, Amount, Other transaction details), (Transaction2 ID, Date, Credit/Debit Ind, Amount, Other transaction details)…]

In Design 1 the final balance is embedded within the same table which has the list of the transactions. So updates made to transaction and final balance would be both be either done or none. It would not happen that one of them is done and the other one not. So the atomicity is ensured as much as a relational database.

In case of design 2, the final balance of an account could be stored on node different from the list of transactions for that account. The NoSQL database would not be able to ensure atomicity across nodes. The application will have to ensure that either both the updates are made or none. NoSQL databases would ensure that all the updates made to one table are either all done or none. But it cannot ensure this across tables.

So atomicity in relational database is ensured by design. While designing the database if we keep the data, on which we want atomic updates, together the atomicity is ensured as the data update is a part of the single update. Anything more than this needs to be ensured by application.

ConsistencyThe NoSQL databases were designed to run on cluster of commodity grade machines. The machines of commodity grade have a higher chances of failure. So any data saved on a node must be backed up other nodes. NoSQL databases generally store a copy of each data on 3 different nodes (this is called replication factor and is configurable). This adds complexity to the game now. If your data is replicated across 3 nodes you need to ensure they are in synch. If you don’t they will go out of synch and different users access data from different nodes will read different version of the data. But if your application needs to wait till all the nodes are updated before confirming to the user that updates have been made it will make the application slower. In case of relational database updates only on one node would be needed (or two in case of hot-backup). But in case of NoSQL there would be 3 IOs needed (or mode in case your database is configured that way) which would make the response to the application less responsive or less alive.

So NoSQL database use the concept of Quorum. So when updates to the database are made the NoSQL database would not wait for update to all the nodes. It would wait only for the majority to get updated. So if the replication factor is 3 the NoSQL database would wait for updates confirmed only by 2

Big Data & Hadoop ecosystem essentials for managers(Manjeet Singh Nagi - https://in.linkedin.com/in/manjeetnagi)

Page 14: Big data and hadoop ecosystem essentials for managers

Big Data & Hadoop ecosystem essentials for managers(Manjeet Singh Nagi - https://in.linkedin.com/in/manjeetnagi)

nodes (quorum for updates). The third node will be consistent later. This concept is called eventual consistency as the different nodes eventually become consistent. What if one of these nodes fail before the updates are made to the 3rd node? The NoSQL database would take the latest update from the rest of the two nodes on which the data is saved and replicate it on another node.

What about the quorum while reading the data? The NoSQL database would not read the data from all the nodes and give the result to the user. That would make the application slower. The number of nodes from which it will read the data (or quorum for read) should be 1 more than the number of nodes which were not a part of quorum for update. So if your data was replicated across 3 nodes, 2 nodes were part of update quorum and 1 was not a part of the quorum, the NoSQL database would read the data from 1+(number of nodes which were not a part of the update quorum) i.e. 1+1=2 nodes.

General rule is that

Qu+Qr > Rf

Also Qu > (Rf/2)

Qu Quorum for updates

Qr Quorum for reads

Rf Replication factor

Please note that any operation that needs more nodes to participate in a quorum will become slower than the complementary operation (read is complementary for write and vice versa) in the equation. But the complementary operation would become faster due to the above equation. So if your replication factor is 7(Rf) and you configure that quorum for updates as 6(Qu) then you need to read data from only 2(Qr) nodes (refer to the above equation). The reads will be much faster than the updates. So based on the requirement of your application you can configure all the 3 parameter of the above equation in the database. If you need updates faster then go for smaller quorum (but still a majority) would be good. If you need reads to be faster instead you need to have higher quorum for updates and lower quorum for reads. Some of the database allow you to configure the values of the 3 parameters in the above equation not only at the database level but also at the transaction level. So while executing the transaction you can indicate to the database if you want to the transaction to be confirmed by a majority or lesser number of nodes.

Types of NoSQL databases

NoSQL databases are generally classified into four categories based on the information is stored and accessed –

Key-value storeThis is one of the easiest NoSQL database category to comprehend. These are typical key-hash store. You can store and fetch any value for a given key. The database does not care what is inside the value. You can store xml, JSON or

Big Data & Hadoop ecosystem essentials for managers(Manjeet Singh Nagi - https://in.linkedin.com/in/manjeetnagi)

Page 15: Big data and hadoop ecosystem essentials for managers

Big Data & Hadoop ecosystem essentials for managers(Manjeet Singh Nagi - https://in.linkedin.com/in/manjeetnagi)

anything you want as a value. The database does not care. You can even store different format for different keys in the same table (called bucket here). The database does not care. The onus of making sense of the value read from the database lies with your application. This also means the database cannot do any validations on the value, it cannot create index on the value and you cannot fetch data based on any information within the value. All the access is done only based on the key which makes is very fast.

Typically used to store session information, shopping cart information, user profiles all of which require fast access.

A table in key-value data store is generally called a bucket. So a database can have multiple buckets. The buckets are used to categorize keys and store them separately. e.g if you have three different values for a key you can merge the values into one value and store it(design 1). In such a case the onus of reading the value and splitting into 3 different values will lie with your application. Or you can have 3 buckets and in the table and store the 3 values separately (design 2). First design involves less IO and hence faster. The second design has more IO and hence slower but the design is less complex

Design 1

Database

Bucket 1

Design 2

Database

Bucket 1 Bucket 2 Bucket 3

Atomic updates are ensured for a given key-value. But if the application needs atomic updates across buckets in a database then application will have to ensure that.

Examples of these databases are Redis, Memcached DB, Berkeley DB, Amazon’s Dynamo DB

Big Data & Hadoop ecosystem essentials for managers(Manjeet Singh Nagi - https://in.linkedin.com/in/manjeetnagi)

Key1

Value1, Value2, Value3

Key1

Value1

Key1

Value2

Key1

Value3

Page 16: Big data and hadoop ecosystem essentials for managers

Big Data & Hadoop ecosystem essentials for managers(Manjeet Singh Nagi - https://in.linkedin.com/in/manjeetnagi)

Document storeThese are similar to key-value store but these databases do not have a key and the value part (called a document in this case) stored is not opaque. So you can fetch data based on different fields with the document. You can very well save your key within the document and fetch based on that field. Indexes can be created based on the fields within the document. The schema of information within the document can vary across documents saved in different rows. Tables are called collections.

Database

Row 1/Document 1

Row 2/Document 2

Please note the schema of documents in different row is different.

MongoDB and CouchDB and famous examples of this category.

These databases are generally used for Event logging by enterprise application and as a datastore for Document management systems.

Atomicity is maintained at a single document level just like a key-value store maintained atomicity at single key-value level.

Column familyThese databases are more difficult to understand than key-value and document. But these are the most interesting ones. These are also the most relevant from Hadoop perspective because the Hbase, a data store on top of HDFS, belongs to this category.

These database are accessed by a key to read multiple values\columns. Different columns are grouped together into column families. Columns that may be accessed together are generally grouped into column family and are stored together.

Big Data & Hadoop ecosystem essentials for managers(Manjeet Singh Nagi - https://in.linkedin.com/in/manjeetnagi)

{Name:ABC

LasName: XYZ

DoB:DD/MM/YYYY}

{Name:DEF

LastName:HKJ

Place:Mumbai}

Page 17: Big data and hadoop ecosystem essentials for managers

Big Data & Hadoop ecosystem essentials for managers(Manjeet Singh Nagi - https://in.linkedin.com/in/manjeetnagi)

Try not to imagine the columns here like the columns in the relational database. The columns in relational database have same name across multiple rows. If a row does not have value for a column it is saved as null in relational database. But in a column-family database if a row does not have a column it just does not have the that column.

The figure below shows a good example of column-family data store\

Row 1

Row 2

So in the example above the access to the database is by using Key (UserID). The columns (FirstName, MiddleName and LastName) have been grouped together into a column family (UserDtls) as they will be accessed together and the columns (Institute, StartDate, EndDate) have been grouped as another column family (Education). Please note that the columns in the first row in column family ‘UserDtls’ are different from that in the second row.

Indexes can be created on different columns. While adding a new columns family for a database requires database restart, an application can very easily add new columns with in the column family. Atomicity of updates is maintained at a column family for a given key. Since different columns families for a given key can be stored as different nodes atomicity of updates cannot be maintained across updates for different column families for a given row.

Cassandra and Hbase are examples of these databases.

Big Data & Hadoop ecosystem essentials for managers(Manjeet Singh Nagi - https://in.linkedin.com/in/manjeetnagi)

ColumnFamily-UserDtls{Name:Manjeet,MiddleName:Singh,LastName:Nagi}

ColumnFamily-Education{Institute:AMC University,StartDate:30/06/2012,EndDate:30/06/2016}

ColumnFamily-UserDtls{Name:XYZ,LastName:ABC}

ColumnFamily-Education{Institute:AMC University,StartDate:30/06/2012,EndDate:30/06/2016}

Key – User1

Key-User2

Page 18: Big data and hadoop ecosystem essentials for managers

Big Data & Hadoop ecosystem essentials for managers(Manjeet Singh Nagi - https://in.linkedin.com/in/manjeetnagi)

Before we move on to the next category of the NoSQL database type, I want to reiterate that the term column should not be visualized similar to column in relational database. In relational databases the columns are similar to the columns in an excel in the sense that

- all the rows will have all the columns defined on a table- if a row does not have value for a column its value will be saved a null- the name of the column is not saved at each row

In a column family table the columns should be imagined like the attributes in an xml. Also

- all the rows need not have all the columns- if a row does not have value for a column it will not save its value as null.

It will not have that column at all- the name of the column is saved at each row

GraphThe category is the most difficult to comprehend. These databases store entities and relationships between them. Conceptually, entities are nodes in a graph and relationship are depicted as directional edges in the graphs. Edges can have additional attributes which depict further properties of the relationship. Neo4J is a good example of this category.

The picture below depicts the kinds of relationships that are generally stored in such data bases.

Legends

node depicts an entity

shows attributes of relationship between entities

Depicts a one way relationship

Depicts two way relationships

Big Data & Hadoop ecosystem essentials for managers(Manjeet Singh Nagi - https://in.linkedin.com/in/manjeetnagi)

Person1

Person2

Book1

Person3

Movie1BigData

Reports to

Reports toisFriendsWith

Knows Likes

Org1WorksIn, StartDate,EndDate

Page 19: Big data and hadoop ecosystem essentials for managers

Big Data & Hadoop ecosystem essentials for managers(Manjeet Singh Nagi - https://in.linkedin.com/in/manjeetnagi)

Something important to understand is how this category is different from the other NoSQL databases in terms of scalability. A key-value data store ensures scalability by spreading the key-values across all the nodes of a cluster. It can do so because it understands (by virtue of the database design) that it is a key-value combination that will be accessed mostly together. A document store achieves this by spreading documents across the nodes on the cluster. Similarly, a column-family data store achieves this by spreading key-column family combination across cluster.

But a graph data store cannot achieve this as nodes in a relationship are linked to each other. So graph databases cannot spread the data across nodes in a peer-to-peer manner. They achieve scalability by using master-slave configuration of the cluster. This can be achieved in many ways:

1. Reads operations can be directed to slave nodes. Writes operations are directed to master node. Once master is updated a confirmation is provided to user about the database updates. Slave nodes are updated after this. Add more and more slave nodes makes reads more scalable. If writes need to be made more scalable then the data needs to be sharded across multiple masters and the logic to do so is very specific to the domain.

2. Writes are directed to salves as well but they provide confirmation to user only after master has been updated. This makes writes as well scalable without really sharding the data.

As must be clear by now, graph databases are used more for networking problem (social or professional networking being one such problem).

As must be clear from the name “NoSQL”, none of these databases use SQL for database access. All of them have their syntax for database operations. We have not gone into those languages as the objective of the book is not to get to the level of code. Having said that, the languages for each of the database are not very difficult to grasp.

So why did we understand so much about the NoSQL databases when the book is primarily about Hadoop ecosystem. One of the open source in the Hadoop ecosystem, Hbase, is a column-family store built on top of HDFS. We will understand Hbase in a detailed manner in chapter 8.

Big Data & Hadoop ecosystem essentials for managers(Manjeet Singh Nagi - https://in.linkedin.com/in/manjeetnagi)

Page 20: Big data and hadoop ecosystem essentials for managers

Big Data & Hadoop ecosystem essentials for managers(Manjeet Singh Nagi - https://in.linkedin.com/in/manjeetnagi)

Chapter 3 - The backbone (Hadoop, HDFS, MapReduce)

Hadoop is an open source software/framework/product to implement a distributed file system (HDFS) and for processing your map-reduce (MapReduce) solutions.

I do not want you to get bogged down by the term MapReduce. It is similar to a typical processing of data in other applications. If you can recall the set theory and functions from our old school days, you will realize that we learnt about Map in the school days itself.

A Map is a function that processes an input data to produce output data. E.g.

f(x)=x2

The above is a map function. It processes any number x and produces its square. Another way to look at the above function is using the set theory (again something we learnt in school)

In the above diagram, there are two sets, A and B. The function or the map f(x) maps each number in the set A to its square in Set B.

In our enterprise applications the functions or maps are complicated but they are still functions/maps. e.g. Let’s say we have a mutual fund transaction processing batch system which receives transaction from the sales agents and processes them. The first program or script in the transaction processing system would typically do some formatting on the transaction, validate it and persist the transaction in the data store. So our first program is a function as depicted below.

f(Input Transaction)=Formatted, Validated, Persisted Transaction

Or we can imagine our program as a map as shown below

Big Data & Hadoop ecosystem essentials for managers(Manjeet Singh Nagi - https://in.linkedin.com/in/manjeetnagi)

1

2

3

4

1

4

9

16

f(x)

Set A Set B

Page 21: Big data and hadoop ecosystem essentials for managers

Big Data & Hadoop ecosystem essentials for managers(Manjeet Singh Nagi - https://in.linkedin.com/in/manjeetnagi)

Set A is our set of input transactions. So Set A is our input file. f(x) is our program which maps the input transaction Tn to Tnfvp which is a formatted, validated transaction persisted in the data store.

A Reduce function is just a program which reads a group of records and produces a summary of those records. So extending the same example above there could be a program at the end of the transaction processing system which sums all the transactions and produce a sum total of the amount of transactions processed on that day (Fig 1 below). Or it could produce sum total of transaction separately for each mutual fund product (Fig 2 below).

Big Data & Hadoop ecosystem essentials for managers(Manjeet Singh Nagi - https://in.linkedin.com/in/manjeetnagi)

T1

T2

T3

T4

T5

T1fvp

T2fvp

T3fvp

T4fvp

Set A Set B

f(x)

T1fvp

T2fvp

T3fvp

T

4fvp

Sum of the amount of transactions processed

Sum of the amount of transactions processed for

Product A

T1fvp

T2fvp

T3fvp

T4fvpSum of the amount of

transactions processed for Product B

Fig 1

Fig 2

Page 22: Big data and hadoop ecosystem essentials for managers

Big Data & Hadoop ecosystem essentials for managers(Manjeet Singh Nagi - https://in.linkedin.com/in/manjeetnagi)

Hadoop MapReduce is more suitable where processing of a data record is not dependent, especially during Map function, on other data records in the input. So if your map program (In the example) needs information of some of the other transactions when processing a transaction Tn then Hadoop MapReduce is not the best solution for it. The reason is that Hadoop MapReduce distributes the transactions across nodes on the cluster and sends the Map function (your program in this case) to all these nodes to process these transactions. If your program needs information from other transactions while processing a transaction Tn then there will be network overhead to get details from transactions on the other nodes. This network IO could slow down the Map.

In case of Reduce the program may need inputs from other transactions (say if it is summing all the transactions) but Reduce is generally processed on a very few nodes. Even when Reduce is processed on more than 1 node (but still very few nodes as compared to that of Map) the data is divided amongst the nodes in such a manner that Reduce on a node will not need information from data on the other node executing Reduce. In the example above if you want to sum the transactions by Product you could send the transaction for Product A to node 1 and that of Product B to node 2 and run reduce on both the nodes. Reduce will calculate the sum for each product separately on each node.

Hadoop consists of two components – HDFS and MapReduce. We will understand each of these in more detail in the sections below

HDFS

HDFS (Hadoop Distributed File System) is, as its name suggest, an open source distributed file system to store huge amount of data. It splits the files that need to be stored into small blocks and stores those blocks of file on different nodes on a cluster while letting the users (applications, software, frameworks which use HDFS for its storage) still view the file as a single ,unified and un-split file. So the distribution of the file to different nodes on the cluster is not visible to the user.

At this stage it is important to re-iterate that HDFS is suitable only for certain scenarios. These are -

Scenarios where files written once to the distributed file system are read multiple times.

Random reads (reading a specific record from the file) are not required or are an exception.

Files are required to be read sequentially in big chunks rather than one record each time. These big chunks are also read in sequential manner rather than from random places in the file

Random write (updating some particular read) is not needed. So you do not have a scenario to update a record in the file

Big Data & Hadoop ecosystem essentials for managers(Manjeet Singh Nagi - https://in.linkedin.com/in/manjeetnagi)

Page 23: Big data and hadoop ecosystem essentials for managers

Big Data & Hadoop ecosystem essentials for managers(Manjeet Singh Nagi - https://in.linkedin.com/in/manjeetnagi)

Updates to the file are about adding/appending more data that too in huge chunks rather than one record at a time.

Scenarios where a modest number of huge files need to be stored rather than huge number of modest/small need to be stored.

Clients(of distributed file system) which want to process bulk of data faster(throughput) rather than small amount of data quickly(latency)

HDFS works on master-slave architecture. Master node (generally 1) has a Namenode and SecondaryNode daemons (or processes) running on it. Rest all the nodes in the HDFS cluster are slave nodes and have DataNode process/daemon running on them. Actually blocks of any data file is saved on slave machines where DataNodes are running. MasterNode only has metadata of each of the block of these files.

Master Node

Slave Node

Namenode, SecondaryNode and DataNodeNamenode and SecondaryNode are the processes/daemons that run on the master node of the cluster. Namenode stores metadata about the files stored on HDFS. It stores information about each block of each file. It does not read or write blocks of files on DataNodes. It only tells, during a write operation, the HDFS client about the nodes where blocks of files can be stored. Similarly during the read operations it only tells the HDFS client about the DataNodes where the blocks of each file are stored. It is the HDFS client that stores or reads the blocks of the files by connecting with each DataNode.

The metadata is stored in a file named fsimage on the disk. When the Namenode is started the metadata is loaded into the memory. After this all the metadata updates (about new files added, old files updated or deleted) are stored in the memory. This is risky for the obvious reason that if the Namenode goes down all the updates since the last restart would be lost. So Namenode stores the updates as well in a local file names edits. This eliminates the risk only to some extent. If the Namenode goes down and needs to be restarted, it will have to merge edits file into fsimage file. This will slow down the restart of

Big Data & Hadoop ecosystem essentials for managers(Manjeet Singh Nagi - https://in.linkedin.com/in/manjeetnagi)

MasterNode

SecondaryNode

DataNode

DataNode

DataNode

DataNode

DataNode

DataNode

DataNode

HDFS Client

Page 24: Big data and hadoop ecosystem essentials for managers

Big Data & Hadoop ecosystem essentials for managers(Manjeet Singh Nagi - https://in.linkedin.com/in/manjeetnagi)

a Namenode. This risk is further brought down by adding a SecondaryNode. The SecondaryNode daemon/process merges the edits file on the primary node with the fsimage on the primary node and replaces the existing fsimage file with this new merged file.

Challenges or Limitations of the HDFS architectureSince Namenode stores all the Metadata and if it goes bad all your cluster will be useless, the Namenode is a single point of failure. Hence the physical machine on which Namenode and the SecondaryNode daemons are run should be of robust standard and not of the same specifications as the machines on which DataNodes are run which could be commodity machines. For the same reason, the NameNode should also be backed up frequently to ensure the metadata can be restored in case the Namenode cannot be restarted after a failure.

Also as we know, the Namenode loads all the Metadata from the fsimage file into memory at the time of start and operates on this data during operations. Metadata of each block of file takes about 200 bytes. This adds a limitation on the usage of HDFS. Storing a huge file broken into small blocks works fine on HDFS. But storing too many small files (smaller than the block size on HDFS) creates Metadata overload which clogs the memory on the Namenode. This is primarily the reason HDFS is not a suitable distributed storage for smaller files.

As you would have observed by now, Namenode could create a bottleneck as all the read and write operations operation on cluster would need to access Namenode for the metadata it needs. The bottleneck problem was solved in the later versions of Hadoop (Hadoop 2.0/YARN)

Each block of file is saved on the slave nodes running the daemons/processes called DataNode. DataNodes also send regulars messages (called heartbeat) to the Namenode. This heartbeat informs Namenode if a specific DataNode is up and running.

Replication of data Since the DataNodes are run on commodity machines and the chances of these machines going down is high, each block of file is replicated on 3(default and can be changed) different DataNodes. The first replica is stored on a node at random. The second replica is stored on a DataNode which is on a different rack. This ensures against a rack failure. The third replica is saved on a different machine on the same rack. The chances of multiple racks going down is less. Hence the third replica is saved on a different node on the same rack as the second replica without increasing the risk of failure. Saving the third replica on a machine on the third rack would increase the network IO and make the read and write operations slower as different copies of the replicas are accessed during the read and write operations. Please note the number of replica can be configured at HDFS level as well as at each file level. Increasing the number of replicas makes HDFS operations slower as the IO increases.

Typical read-write operations in HDFSWhen a file needs to be written to HDFS, users/applications interface with HDFS client. The client starts download the file. Once the download reaches

Big Data & Hadoop ecosystem essentials for managers(Manjeet Singh Nagi - https://in.linkedin.com/in/manjeetnagi)

Page 25: Big data and hadoop ecosystem essentials for managers

Big Data & Hadoop ecosystem essentials for managers(Manjeet Singh Nagi - https://in.linkedin.com/in/manjeetnagi)

the size of a block, the client works with Namenode to find out on which DataNode can each block of the file be saved. Once it get this information it sends the block of the file to first DataNode which starts writing file on its disk and at the same time starts sending it to the second DataNode where its replica needs to be saved. The DataNode 2 starts writing it on its disk and start sending it to disk 3. On completion of write disk 3 confirms to disk 2 which confirms to disk 1 which eventually confirms to HDFS client and which in turn confirms to Namenode. Once Namenode gets confirmation it persists the metadata information and makes the file visible on HDFS. This process is repeated for each block of the file and complete file is saved in this manner. Checksum on each block is calculated and saved in HDFS to validate the integrity of each block when the file needs to be read.

Similar process is followed at the time of read. When a file needs to be read, the HDFS client gets the DataNode information from Namenode for each block and reads it from the DataNode. Checksum is calculated again and matched with the checksum saved at the time of write to validate integrity. If the read fails from a DataNode (node is down, or checksum fails) the block is read from the replicated node

In the above read write operation we assumed a replication factor of 3. This factor can be configured ad HDFS level or a file level. Even after file has been written to HDFS its replication factor can be reduced. HDFS deletes some of the block replicas of the file to bring down the replication factor of the file if we reduce the replication factor of a file

When a file is deleted by HDFS client, only the metadata information is updated to mark it as a deleted file. This makes deletion faster. Actual deletion of file happens later.

All the DataNodes send messages, called heartbeat, to Namenode every 3 seconds. If Namenode does not receive message from a DataNode it assumes it has failed. Since Namenode maintains the metadata information of file blocks saved on each DataNode and also on which other nodes they are replicated, it recreates those blocks on other nodes which are running and updates its metadata information.

MapReduceMapReduce is an open source framework to execute your Map (and Reduce) programs on a cluster of machines. MapReduce copies your Map program (provided by you) to each node on which a block of input file is stored and runs it on that node to process the block of input data. Once all the nodes in the cluster have run their Map programs the MapReduce copies the output from all the nodes to a smaller set of nodes where it copies the Reduce program(again provided by you) and runs the Reduce program on each of these smaller set of nodes to process and summarize the output from Map step. Though this is a simplified view of MapReduce, this is what it does. As we progress in this chapter and next we will see more complex and detailed view of MapReduce.

Just like HDFS, MapReduce also works on a master slave configuration. Master machine has a daemon, named JobTracker, running on it. All the other machines

Big Data & Hadoop ecosystem essentials for managers(Manjeet Singh Nagi - https://in.linkedin.com/in/manjeetnagi)

Page 26: Big data and hadoop ecosystem essentials for managers

Big Data & Hadoop ecosystem essentials for managers(Manjeet Singh Nagi - https://in.linkedin.com/in/manjeetnagi)

on the cluster are salve machines and have a daemon, named TaskTracker, running on them.

JobTracker and TaskTrackerJobTracker is responsible for coordinating with all TaskTrackers on the slave nodes where the Map and Reduce programs are run. It checks with the Namenode (of HDFS) where the blocks of input files are kept. It sends the Map and Reduce programs to those node. It asks TaskTracker on each of the slave nodes to run the Map and reduce programs. It keeps receiving heartbeats from the TaskTracker to check if they are fine. If a TaskTracker does not send the heartbeat the JobTracker assumes it has failed and reschedules the Map/Reduce program running on that node on another node which has a replica of that data.

Just like Namenode in HDFS, if the JobTracker in MapReduce goes down all the cluster running the MapReduce becomes useless. So JobTracker must be run on a machine with specifications better than that of a machine running TaskTracker.

Big Data & Hadoop ecosystem essentials for managers(Manjeet Singh Nagi - https://in.linkedin.com/in/manjeetnagi)

Page 27: Big data and hadoop ecosystem essentials for managers

Big Data & Hadoop ecosystem essentials for managers(Manjeet Singh Nagi - https://in.linkedin.com/in/manjeetnagi)

Chapter 4 - The backbone II (MapReduce continued)Sample MapReduce solution

Let’s look at a very simple MapReduce solution. Let’s say you have billions of successful sale transactions of all the mutual fund products of all the mutual funds companies in USA since 1975. You need to sum the transactions by year of sale. Your input file has a record for each transaction. Each record has transaction date, the transaction amount and other transaction details. While this problem can be solved by processing the transactions on a single machine, the chances of it overwhelming even a high-end machine is very high. Even if it completes successfully it would take a lot of time. You can solve this problem much more easily by distributing the transactions over a cluster and processing them in parallel.

You need to write a Map program which will read a transaction and emit Year of sale and transaction amount to the output file. You need to write a Reduce program which will take multiple records(for a given year) with Year of sale and transaction amount as input and generate an output where transaction amount is summed and it emits Year of sale and summed transaction amount as output. So,

Map Program:

Reduce Program

Big Data & Hadoop ecosystem essentials for managers(Manjeet Singh Nagi - https://in.linkedin.com/in/manjeetnagi)

Transaction ID,

Transaction Date,

Transaction Amount,

Mutual Fund Product,

……..

Year of Transaction,

Transaction Amount

Map Program

Year of Transaction,

Transaction Amount1

Year of Transaction,

Transaction Amount2

Year of Transaction,

Transaction Amount3

Year of Transaction,

Sum of Transaction Amounts

Reduce Program

Page 28: Big data and hadoop ecosystem essentials for managers

Big Data & Hadoop ecosystem essentials for managers(Manjeet Singh Nagi - https://in.linkedin.com/in/manjeetnagi)

For the sake of simplicity, our Reduce program assumes that all the transactions it receives belong to the same year. It just needs to sum all the transactions and emit the sum along with the year from any of the transactions.

Once your programs are ready you will provide the following inputs to Hadoop

1. Name of the Map Program2. Name of the Reduce Program3. Name of the Partition Program. If we skip this then Hadoop uses default

Partition class available to it. We will learn about it later in the chapter.4. Name of the Combiner Program. If we skip this then Hadoop skips the

Combiner step. We will learn about it later in the chapter.5. Jar file(in case your programs are in Java) and the path from where to

pick it from6. Path to your input file with billions of transactions7. Number of reducers you want to run. We will specify 1 reducer for each

year since 1975, so a total of 42 reducers. This will ensure each reducer receives transactions of only 1 year.

Hadoop will take the input file and split it into multiple blocks and store these on multiple nodes on the cluster (as described in the Typical read-write operations in HDFS)

JobTracker will then copy your Jar (which has the Map and Reduce programs) to each of the nodes which has a block of input file (it will get this information from Namenode of HDFS).

Hadoop will then run the following steps to execute your Map and Reduce programs. Please note in the diagram below which phase runs on Map node and which on Reduce node.

Big Data & Hadoop ecosystem essentials for managers(Manjeet Singh Nagi - https://in.linkedin.com/in/manjeetnagi)

Map

Partition

Combine

Sort

Map

Partition

Combine

Map

Partition

Combine

Map

Partition

Combine

Shuffle

Sort

Page 29: Big data and hadoop ecosystem essentials for managers

Big Data & Hadoop ecosystem essentials for managers(Manjeet Singh Nagi - https://in.linkedin.com/in/manjeetnagi)

MapThis phase will run your Map program on each node which has a block of your data file (not on the replicas). The output of this phase will be a file on each node with Year of sale as key and transaction amount as value. The output file on each node may have records for multiple years from 1975 to 2016.

PartitionIn this phase MapReduce will take the output of Map on each node and partition it into as many files as there are Reducers (42 in our case). It will do this by partitioning the output file of each map by key. So the output file of each Map step will be partitioned into 42 files (max) each of which will have transaction of one year on that node. Partitioning the output file of a Map by the key is the default Partition behavior. It can be customized to partition by some other criteria and we will see it in the next chapter. If we do not mention any class of Partition to Hadoop, it will use the default class available to partition the Map output by the key in the Map output.

Output file from Map 1

Output file from Map 2

Output file from Map 3

Big Data & Hadoop ecosystem essentials for managers(Manjeet Singh Nagi - https://in.linkedin.com/in/manjeetnagi)

Reduce Reduce

(Key1, value1), (Key1, value2), (Key2, value3)

(Key2, value4), (Key3, value5), (Key3, value6)

(Key1, value7), (Key2, value8), (Key2, value9)

Partition

Partition

Partition

(Key1, value1),(Key1,value2)

(Key2,value3)

Output file for Redcuer1

Output file for Redcuer2

(Key2, value4)

(Key3,value5),(Key3,value6),

Output file for Redcuer2

Output file for Redcuer3

(Key1, value7)

(Key2,value8),(Key2,value9),

Output file for Redcuer1

Output file for Redcuer2

Nod

e1

Nod

e2

Nod

e3

Page 30: Big data and hadoop ecosystem essentials for managers

Big Data & Hadoop ecosystem essentials for managers(Manjeet Singh Nagi - https://in.linkedin.com/in/manjeetnagi)

Partition will come into action only if the number of Reducers are going to be > 1. If only 1 Reducer is going to be used, there is no need for portioning as all the records from all the Maps need to go to only one reducer.

Partition ensures that records for a specific key go to the same reducer from the all the nodes. Reducer nodes will connect to the nodes on which Maps are running and collect the files generated for only for them (based on name).But it does not ensure all the reducers get equal load. Roughly, it divides the keys equally between reducers. If some key has more records in the Map output than other keys then the reducer that is assigned that key will take more time to complete. There are ways to ensure the load is equally divided between the reducers. We will see how this is done later in this chapter.

The default Partition program does the partitioning by calculating an index for the key of each record to be written to the output

Index = Hash of Key % Number of reducers to run

Hash is nothing but any function that generates a unique value for a given input. So two keys will not generate the same output when run through a hash function. Also a given key will always generate the same output when run through a hash function.

% is nothing but the simple Modulo function from our mathematics. A % B provides the remainder left when A is divided by B.

Different index values are assigned to different reducers. Based on the Index value calculated for a key, all the records with that Index are written to the output file for a reducer which has that index value assigned to it. Different keys may go to a single reducers but a given key will not go to multiple reducers.

We can very well overwrite all this default behavior of the Partition program by extending the default class and customizing the Partitioning method. e.g. in our case we can overwrite the default behavior by partitioning simply by the key(which is the year of transaction) instead of the default behavior of calculating Index etc.

CombinerThis is an optional step. Please note that there are millions of transactions on each Map node. Our Map program does not remove any transaction from further processing. So the output of each map will also have millions of transactions (though each with two fields, year of sale and transaction amount). So there are billions of records spread across the partitioned outputs of Mappers across multiple node. Sending these records to 42 reduce nodes will cause a lot of network IO and slow down the overall processing. This is where a Combiner can help.

Since the Reducer is going to sum all the transactions it receives with an assumption that all the transactions it receives belong to the same year, we can

Big Data & Hadoop ecosystem essentials for managers(Manjeet Singh Nagi - https://in.linkedin.com/in/manjeetnagi)

Page 31: Big data and hadoop ecosystem essentials for managers

Big Data & Hadoop ecosystem essentials for managers(Manjeet Singh Nagi - https://in.linkedin.com/in/manjeetnagi)

run the same summation on the each partitioned output of each mapper. So thousands of records in each partitioned output of a mapper will be summed into one record. A Combiner will sum the transactions in each partitioned output of Partition step. It will take all the records in one partition, sum the transaction values and emit Year of sale as key and sum of transaction amount as value. So for each partitioned output (which has thousands of records), the Combiner will generate only one records. This reduces the amount of data that needs to be transmitted over network. If you delve over the behavior of Combiner it is like running reducer on the Map node before transmitting the data to Reduce node.

The above diagram shows how combiner works. It does not show the scale by which it reduces the number of transactions which need to be transmitted to

Big Data & Hadoop ecosystem essentials for managers(Manjeet Singh Nagi - https://in.linkedin.com/in/manjeetnagi)

(Key1, value1),(Key1,value2)

(Key2,value3)

Output file for Redcuer1

Output file for Redcuer2, Key2

(Key2, value4)

(Key3,value5),(Key3,value6),

Output file for Redcuer2

Output file for Redcuer3

(Key1, value7)

(Key2,value8),(Key2,value9),

Output file for Redcuer1

Output file for Redcuer2

Nod

e1

Nod

e2

Nod

e3

Combiner

Combiner

Combiner

(Key1, Sum(value1,value2)

(Key2,value3)

(Key3, Sum(value5,value6)

(Key2,value4)

(Key1,value7)

(Key2, Sum(value8,value9)

Combiner

Combiner

Combiner

Combined Output file for Redcuer1

Combined Output file for Redcuer2

Combined Output file for Redcuer2

Combined Output file for Redcuer3

Combined Output file for Redcuer1

Combined Output file for Redcuer2

Page 32: Big data and hadoop ecosystem essentials for managers

Big Data & Hadoop ecosystem essentials for managers(Manjeet Singh Nagi - https://in.linkedin.com/in/manjeetnagi)

Reducer nodes. Imagine when you have thousands of transactions for a key in a file and combiner generates only one transaction which summarizes all those transactions and needs to be transmitted to the reducer node then the amount of data to be transmitted reduces significantly.

As I said earlier, Combiner step is optional and we will have to tell Hadoop to run combiner. Whether combiner can be run in your solution or not is very specific to the problem you are trying to solve in the MapReduce. If some processing can be done in the Map output locally to reduce the amount of data before transmitting it to the reducer nodes then you should think about running Combiners. We also need to write a combiner program, add it to the jar which we provide to Hadoop. We also inform Hadoop that a combiner needs to be run. This can be done by providing the combiner class to Hadoop just like how we provide Map and/or Reduce class to Hadoop.

ShuffleMeanwhile, MapReduce would have identified 42 nodes that need to run the Reduce program and assigned a Key (Year of sale) to each of them. The TaskTracker on each of these nodes will keep on scanning the nodes on which Maps are running and as soon as it finds the output file generated for its processing (based on name of the file) it will copy the file to its node. Once a reducer node gets all the files for it processing, MapReduce will go to the next step.

Big Data & Hadoop ecosystem essentials for managers(Manjeet Singh Nagi - https://in.linkedin.com/in/manjeetnagi)

Nod

e1

Nod

e2

Nod

e3

(Key1, Sum(value1,value2)

(Key2,value3)

(Key3, Sum(value5,value6)

(Key2,value4)

(Key1,value7)

Combined Output file for Redcuer1

Combined Output file for Redcuer2

Combined Output file for Redcuer2

Combined Output file for Redcuer3

Combined Output file for Redcuer1

Nod

e4

Nod

e5

(Key1, Sum(value1,value2)

(Key1,value7)

(Key2,value3)

(Key2,value4)

(Key2,value4)

Page 33: Big data and hadoop ecosystem essentials for managers

Big Data & Hadoop ecosystem essentials for managers(Manjeet Singh Nagi - https://in.linkedin.com/in/manjeetnagi)

In the diagram above we assumed that there are only 2 nodes for Reduce phase and MapReduce assigned Key1 to Reduce1 on Node4 and Key2 to Reduce2 on Node5. We could have assumed 3 nodes for Reduce phase as well and assigned one key to each node executing Reduce phase. But keeping only two nodes for Reduce phase and assigning two keys(Key2 and Key3) to the Reducer on Node5 will help you understand the Sort phase better.

SortEach reduce node would have received files from multiple Map nodes. So in this step MapReduce will merge all the files into one and sort by key (Year of transaction in this case) all the input records to a Reducer.

Big Data & Hadoop ecosystem essentials for managers(Manjeet Singh Nagi - https://in.linkedin.com/in/manjeetnagi)

(Key2, Sum(value8,value9)

Combined Output file for Redcuer2(Key3,

Sum(value5,value6)

Nod

e4

Nod

e5

(Key1, Sum(value1,value2)

(Key1,value7)

(Key2,value3)

(Key2,value4)

(Key2,value4)

(Key3, Sum(value5,value6)

Sort

Sort

(Key1, Sum(value1,value2),

(Key1,value7)

(Key2,value3),

(Key2,value4),

(Key2,value4),

(Key3, Sum(value5,value6)

Page 34: Big data and hadoop ecosystem essentials for managers

Big Data & Hadoop ecosystem essentials for managers(Manjeet Singh Nagi - https://in.linkedin.com/in/manjeetnagi)

Please note Sort phase is run by default. Reduce phase must get its data sorted by keys. We can overwrite the default behavior of sorting by key by extending the default class. We can overwrite the default class to sort the input to Reduce by Keys as well values( or a part of the value) if our scenarios expects that.

ReduceReducer will sum all the transactions in a file to generate {Year of Sale, Sum of Transaction} as output.

Big Data & Hadoop ecosystem essentials for managers(Manjeet Singh Nagi - https://in.linkedin.com/in/manjeetnagi)

Nod

e4

Nod

e5

Reduce

Reduce

(Key1, Sum(Sum(value1,valu

e2),value7)

(Key2,Sum (value3,value4,value5)

)

(Key3, Sum (value5,value6)

(Key1, Sum(value1,value2),

(Key1,value7)

(Key2,value3),

(Key2,value4),

(Key2,value5),

(Key3, Sum(value5,value6)

Page 35: Big data and hadoop ecosystem essentials for managers

Big Data & Hadoop ecosystem essentials for managers(Manjeet Singh Nagi - https://in.linkedin.com/in/manjeetnagi)

Please note that if you are sending multiple keys to your Reduce phase then your Reduce program should be able to handle that. In the diagram above we have assumed this. But in the example we have been going through in this chapter, we assumed each instance of Reducer running will get only 1 key.

Big Data & Hadoop ecosystem essentials for managers(Manjeet Singh Nagi - https://in.linkedin.com/in/manjeetnagi)

Page 36: Big data and hadoop ecosystem essentials for managers

Big Data & Hadoop ecosystem essentials for managers(Manjeet Singh Nagi - https://in.linkedin.com/in/manjeetnagi)

Chapter 5 – A quick view of the ecosystem around Hadoop

By now we have understood the capabilities of Hadoop quite well. HDFS (Hadoop Distributed File System) offers a distributed storage for huge amount of data using a cluster of commodity hardware. The distribution of the data on the cluster is transparent to the end users or the applications interacting with HDFS. The data is also replicated within the cluster to provide failover in case a machine in the cluster goes kaput.

MapReduce sits on top of HDFS and provides capability to process MapReduce programs on data stored on the HDFS cluster.

Over a period of time a lot open source products have cropped up which either enhance the capability of Hadoop further or overcome the limitations of Hadoop framework.

These new products can be categorized in the following four categories:

1. Ingestion – While we have huge storage available in HDFS, transferring huge amount of data from the sources available with enterprises could be daunting. Products like Sqoop, Flume and Kafka offer capability to move the data from our enterprise sources into HDFS and vice versa. While Sqoop is used for importing the data from SQL data sources within the enterprise, Kafka and Flume are used to import data from Non SQL data sources (log etc.). Kafka and Flume have some finer differences between them and we will see those as we move forward.

2. Processing – While MapReduce offers capability to process data stored on the HDFS cluster, in order to use MapReduce one must know coding. The coding required to develop MapReduce programs is quite complicated. Many times you need your business users to be able to process the data stored on HDFS. Even for technology teams, developing MapReduce programs in Java or any other language could be inefficient. So frameworks or products were required which could ease the task of processing data stored on HDFS. Pig and Hive are products which offer ease to process data stored on HDFS. Hive offers a language HQL, much similar to SQL, using which we can query the data in HDFS. Pig offers, an

Big Data & Hadoop ecosystem essentials for managers(Manjeet Singh Nagi - https://in.linkedin.com/in/manjeetnagi)

MapReduce

HDFS

Page 37: Big data and hadoop ecosystem essentials for managers

Big Data & Hadoop ecosystem essentials for managers(Manjeet Singh Nagi - https://in.linkedin.com/in/manjeetnagi)

easy to learn and use language, called Pig Latin using which we can ETL(extract, transform, load) kind of procedural programs can be developed to process the data on HDFS. Both, HQL queries and Pig Latin programs, eventually get converted into MapReduce programs at the back end and get executed. Thus Pig and Hive offer a higher level of abstraction as compared to the Java program that one has to write if we need to develop a MapReduce program.

3. Real-time systems – MapReduce is designed for high throughput processing rather than low-latency processing. It could process huge amount of data but it has some kick start time. It is not designed to process data quickly and turn around. The initial start time needed for Hadoop to identify the nodes for Map and Reduce, transfer the code to these nodes and kick start the processing makes HDFS unsuitable for real-time process where you need the response to your query/program quickly. Hbase offers such capability. Hbase basically uses the distributed storage offered by HDFS to offer key-value datastore services (refer to the Chapter 2 – NoSQL Database to recall what a key-value store is). So it is a key-value type of NoSQL database using HDFS for storing the keys and the values.

4. Coordination – There are two products in this category that are used for designing solutions to big data process using Hadoop. Oozie is a workflow schedule to manage Hadoop jobs. Zookeeper is used for coordination amongst different products in the Hadoop eco system.

So keeping these products in mind the ecosystem developed around Hadoop looks like this

Big Data & Hadoop ecosystem essentials for managers(Manjeet Singh Nagi - https://in.linkedin.com/in/manjeetnagi)

MapReduce

HDFS

Hive(SQL like processing capability)

Pig(ETL like procedural capability)

Hbase(key-value store using HDFS)

Sqoop(Ingest data from SQL data sources in the

enterprise)

Flume (Ingest data from non

SQL data sources in the enterprise)

Kafka (Ingest data from non SQL

data sources in the enterprise)

Oozie(Hadoop job scheduling)Zookeeper(coordination amongst

products)

Page 38: Big data and hadoop ecosystem essentials for managers

Big Data & Hadoop ecosystem essentials for managers(Manjeet Singh Nagi - https://in.linkedin.com/in/manjeetnagi)

The subsequent chapters will each pick up one product each from the eco system and explain it in detail. Since we have already understood the MapReduce which is for processing of data, we will take up the processing category (Hive, Pig, Hbase) first. Amongst this category, we will take up Hive first. Understanding Hive is easy as the programing is done using HQL which is very similar to SQL which most of us understand well. Next we will take up Pig which again is easy to understand as the programming language Pig Latin is very easy. Hbase is more difficult to understand as compared to Hive and Pig so we will take it up last in this category.

Next we will take up the Ingestion category of product. We will take up Sqoop first. The reason again being that this product is related to SQL world to which we all can relate to. Next we will move to Flume as it originated before Kafka. Once we understand Flume we can identify limitations and see how Kafka overcomes those.

At last, we will move to Oozie and Zookeeper as understanding other products in detail will help us appreciate these two product better.

Big Data & Hadoop ecosystem essentials for managers(Manjeet Singh Nagi - https://in.linkedin.com/in/manjeetnagi)

Page 39: Big data and hadoop ecosystem essentials for managers

Big Data & Hadoop ecosystem essentials for managers(Manjeet Singh Nagi - https://in.linkedin.com/in/manjeetnagi)

Chapter 6 – HiveWhy Hive?If we look back the example of transaction processing we took in chapter four, we are essentially doing the following to the transactions,

1. Select certain fields from each transaction.2. Group the transactions by year(by sending them to different Reducers)3. Sum the transaction amount for each group

If you are even remotely aware of SQL world the equivalent in the SQL is something like

Select Transaction.Year, SUM (Transaction.Amount)

From Transaction

Group By Transaction.Year

In case we wanted to filter out some transactions from processing we could have added the filter in the Map (just an If condition). Let’s assume we want only those transactions to be processed which have ’Purchase’ in a field named ‘type’. In the Map program that you develop for the processing you would add an IF condition to process only those transactions which have the value ‘Purchase’ in the field named ‘type.’ The SQL equivalent SQL would be

Select Transaction.Year, SUM (Transaction.Amount)

From Transaction

Where Transaction.type=’Purchase’

Group By Transaction.Year

Let’s also consider a scenario where the transaction has another field named “ProductCode” which has a numeric code for the financial product on which transaction was done. We also have a file which has a mapping between the “ProductCode” and “ProductName”. If we need the field “ProductName” in the final output from the Reducer and also want to sum the transactions on Year and ProductName instead of only Year of transaction, the Map Reduce processing would be modified as below

Map:

1. Select transaction with ‘Purchase’ code in the ‘type’ field of transaction for further processing in Map

2. Output year, product code and amount for each transaction with ‘Purchase’ in the transaction type field.

Partition:

1. Partition transactions by year so that transactions for each year go to a different Reducer.

Combiner:

Big Data & Hadoop ecosystem essentials for managers(Manjeet Singh Nagi - https://in.linkedin.com/in/manjeetnagi)

Page 40: Big data and hadoop ecosystem essentials for managers

Big Data & Hadoop ecosystem essentials for managers(Manjeet Singh Nagi - https://in.linkedin.com/in/manjeetnagi)

1. Sum the transactions on each Partition by Year and ProductCode.

Shuffle:

Each Reducer picks its files from the Map nodes.

Sort:

1. Sort the transactions by Year and Product Code

Reducer:

1. Load the file which has ProductCode-ProductName mapping into the memory.

2. Sum the input transactions by Year and Product Code. This time this step will sum the transactions coming from different Map nodes( in the Combiner the same processing did the sum only for the transactions on each node).

3. Just before writing a Sum read the ProductCode-ProductName mapping from memory (loaded earlier in the Reducer) to resolve ProductCode in the output record to ProductName.

4. Write the Sum of transactions by year and product name to the output.

The SQL equivalent of the above processing would be

Select Transaction.Year, Product.ProductName, SUM (Transaction.Amount)

From Transaction,Product

Where Transaction.type=’Purchase

Transaction.ProductCode=Product.ProductCode

Group By Transaction.Year,Product.Name

By now you would have noticed that it takes only a few lines of SQL code to do the processing that we are trying to in MapReduce. When it comes to writing Java programs for MapReduce

1. The number of lines of codes is large2. There are many libraries that need to be imported.3. You need to be aware of which out of the box class file to extend for our

specific requirement.4. There are variables to be defined, set and reset. And all the other

complications involved in any programing.5. There are steps for building the jar.

When you have so much of raw of data residing on the HDFS is there no easier way to process the data? Is there no way a business person or a person with

Big Data & Hadoop ecosystem essentials for managers(Manjeet Singh Nagi - https://in.linkedin.com/in/manjeetnagi)

Page 41: Big data and hadoop ecosystem essentials for managers

Big Data & Hadoop ecosystem essentials for managers(Manjeet Singh Nagi - https://in.linkedin.com/in/manjeetnagi)

limited technology skill set can process and analyze the data? Is there a tool/framework which can

1. take the queries in the form similar to SQL written above,2. do the laborious work of developing Map, Reduce, Partition and combiner

classes,3. schedule as many Maps and Reducers as needed and 4. produce the end result for the user.

That is what Hive does. It does all the 4 points written above and much more. Welcome to the world of Hive! Hive is a tool operating at a higher level than Hadoop. It takes away the difficult task of writing MapReduce programs. It develops those programs based on the instructions given to it in the form of HQL (Hive Query Language) which is similar to SQL. Thus it brings the power of Hadoop within the reach of people who are not programmer but know what logic needs to be applied to the data to analyze and process it. HQL, like SQL, is a much easier to pick up than Java or any other programming language. If one already know SQL then the learning curve is much steeper.

Please note since Hive is only a layer above Hadoop it inherits the limitations of Hadoop

1. Hive does not support row level updates, inserts and deletes

Hive architectureThe following diagram shows the architecture of Hive

Big Data & Hadoop ecosystem essentials for managers(Manjeet Singh Nagi - https://in.linkedin.com/in/manjeetnagi)

MapReduce HDFS

Hadoop

Driver Metastore

Command line Interface Web Interface Thriftserver

JDBC ODBC

Hive

Page 42: Big data and hadoop ecosystem essentials for managers

Big Data & Hadoop ecosystem essentials for managers(Manjeet Singh Nagi - https://in.linkedin.com/in/manjeetnagi)

Hive sits on top of Hadoop, thus taking away all the complications of writing Map and Reduce programs to process data. There are three ways to access Hive:

CLI: This is a typical command line interface where a user can write a few queries to load, read and process data.

HWI: Hive Web Interface is an interface on the web serving the same objective as CLI

Thriftserver: It exposes Hive functionality to other applications that access Hive via JDBC or ODBC drivers.

Metastore: While all the data accessed by Hive is saved on HDFS, the data about databases and tables is stored in Metastore. We will learn about what kind of data is stored in the Metastore in the subsequent sections.

Creating tables and types of tablesHive is a layer sitting above Hadoop. It only extends the functionality of Hadoop by letting user provide inputs in the form of Hive Query language (HQL) rather than low level programs. In this and the subsequent sections we will take a few example of HQL to understand what it does for the user and what it does in Hadoop at the back-end. While we understand this, we will avoid trying to understand each and every option or variation possible with an HQL command. The essence of this is to explain the core functionality of a product without really getting into a lot of code.

Our logical flow would be to first understand the HQLs for defining databases and table. Then we will move on to understanding HQLs for loading data into these tables. Finally, we will understand the HQLs for processing the data. All along, we will also understand what these HQLs eventually do on Hadoop.

Let’s assume that in addition to the transaction file (with all the details of mutual fund transactions) we also have another file which provides the mapping between the mutual fund product ID ( the financial product on which the transaction was executed) and mutual fund name(the name of the product).

In order to load and process the data available in these two files we will firstly create the database and tables to store the data in these files.

Create database if not exists transproc

The above command will create a directory in the HDFS with name transproc. A database is just an umbrella folder to contain and organize all the tables. A registry will also be made in the Metastore table about this new database.

Big Data & Hadoop ecosystem essentials for managers(Manjeet Singh Nagi - https://in.linkedin.com/in/manjeetnagi)

Hive will not create the database if it already exisits

Name of the database

Page 43: Big data and hadoop ecosystem essentials for managers

Big Data & Hadoop ecosystem essentials for managers(Manjeet Singh Nagi - https://in.linkedin.com/in/manjeetnagi)

Once your database is created you can very well create tables within the database with command very similar to the one we used for creating the database

Create table if not exists transproc.transactions(transid STRING, ,transamount FLOAT,…..)

Create table if not exists transproc.prodinfo (prodid string, prodname string)

The above command would create two subdirectories within the transproc directory and also make a registry with the Metastore for the two new tables created.

Internal and External tablesThere are two types of tables in Hive.

If Hive keeps the ownership of data stored in a table then the table is called internal or managed table. In case of an internal/managed table, when a table is dropped the data from HDFS as well as the reference to the table in the Metastore is deleted.

If Hive does not keep the ownership of the data stored in a table then the table is called an external table. In case of an external table, when a table is dropped only the reference to the table from the Metastore is deleted but the data from the HDFS is not deleted. So the table stops existing for Hive but the data is still retained in Hadoop.

External tables are used to exchange data between multiple applications. E.g. in our case of mutual fund transaction processing it may be the case that the product data (product id to product name mapping) is not owned by the department which has the responsibility of processing the transactions (typical scenario). In such a case, the product department would make the product information available in a flat file in some HDFS location. The transaction processing application would define an external table on top of this data. When the transactions processing is done it could delete the external table. But that would not delete the product data in the flat file. That data might be referenced by other applications as well.

If we do not mention in our HQL command if the table is internal or external, Hive would assume it to be internal.

The command to create an external table is

Create external table if not exists transproc.prodinfo (prodid string, prodname string) row format delimited fields terminated by ‘,’ location ‘location of the external file’

Big Data & Hadoop ecosystem essentials for managers(Manjeet Singh Nagi - https://in.linkedin.com/in/manjeetnagi)

Provides the location of the external file Hive

Informs hive to expect the fields in the external file separated by ‘,’

Name of the database within which table needs to be created Name of the table

Layout of the table

Page 44: Big data and hadoop ecosystem essentials for managers

Big Data & Hadoop ecosystem essentials for managers(Manjeet Singh Nagi - https://in.linkedin.com/in/manjeetnagi)

Internal partitioned tablesLet’s look back at the query that creates the transaction table,

Create table if not exists transproc.transactions(transid STRING, ,transamount FLOAT,ProductID STRING, SubproductID STRING,… )

Assume that SubproductID indicates a variety of a Product. So a Product can have different varieties and each can be indicated by the sub product ID.

Now let’s assume that we know the access pattern for this table. By access pattern I mean we know that when the data is accessed it will be mostly accessed for a specific Product ID and/or Sub Product ID. Let’s say we also know that the data would generally not be accessed for many or all the products ID at the same time.

Now the above HQL command for creating a table would create one single directory for the table. All the data for the table would be in one directory. Every time the table is accessed Hive (and HDFS in the back-end) would have to find the data for that particular product and/or subproduct id to fetch it. The directory structure created in HDFS by the above command would be

../transproc/transactions

Instead, if we know that typically data would be accessed using product ID and/or subproduct ID we can segregate the data within the directory into separate subdirectories for product ID and subproduct ID. This is called partitioning.

The command to partition the table is:

Create table if not exists transproc.transactions(transid STRING, ,transamount FLOAT,…) Partitioned by (ProductID STRING, SubproductID STRING)

The above command will create a subdirectory like

../transproc/transactions

As and when data is added to this table separate sub-directories will be created within the transaction directory in HDFS for each ProductID and SuproductID combination.

Load data local inpath ‘path from where data needs to be picked up’

Into table transactions

Partition (ProductID=’A’, SubproductID=’1’)

The above command will create a subdirectory like

Big Data & Hadoop ecosystem essentials for managers(Manjeet Singh Nagi - https://in.linkedin.com/in/manjeetnagi)

Page 45: Big data and hadoop ecosystem essentials for managers

Big Data & Hadoop ecosystem essentials for managers(Manjeet Singh Nagi - https://in.linkedin.com/in/manjeetnagi)

../transproc/transactions/ProductID=’A’/SubproductID=’1’

Anytime data is loaded into this table the command to load the data would have to specify partition information and the data will be loaded into the directory structure for that partition.

Please also note that the table schema does not have the columns which are a part of partition now. There is no need save ProductID and SubproductID in the table itself as this information can be derived from the path of the partition.

If data has to be read for a specific Product ID and Subproduct ID combination the HQL command would be

Select transproc.transactions where ProductID=’A’ and SubproductID=’1’

This command will make Hive read only the specific subdirectory we created earlier.

Partitioning improves the performance of Hive as it has to read a specific subdirectory to fetch the data

If the command above is modified like the one given below Hive will read all the subdirectories with the subdirectory ../transproc/transactions/ProductID=’A’

Select transproc.transactions where ProductID=’A’

If the typical access pattern is not to access the data for specific Product ID and Subproduct ID combination then it is not a good idea to create partitions. If you create partitions by Product ID and Subproduct ID but end up writing queries that read data across multiple Product ID and Subproduct ID Hive will have to scan multiple subdirectories and it will impact the performance of Hive.

External partitioned tablesJust like the internal tables the external tables can be partitioned. Since the data is not managed by Hive, it assumes that the data at the external location is segregated as per the partition keys

Create external table if not exists transproc.prodinfo (subprodid string, subprodname string) partitioned by (prodid string) row format delimited fields terminated by ‘,’

Please note we do not declare the location of the data for a partitioned external table as we would in case of non-partitioned external table. That needs to be done separately using alter command for each partition separately.

Loading Data

You can load data into tables from a file. If we need to load data into our transaction table the HQL command would be

Load data local inpath “path of the file here”

Big Data & Hadoop ecosystem essentials for managers(Manjeet Singh Nagi - https://in.linkedin.com/in/manjeetnagi)

Page 46: Big data and hadoop ecosystem essentials for managers

Big Data & Hadoop ecosystem essentials for managers(Manjeet Singh Nagi - https://in.linkedin.com/in/manjeetnagi)

Overwrite into table transproc.transactions

Partition (ProductID=’A’ and SubproductID=’1’)

Please note the table should have been defined with partitions on Product ID and Subproduct ID. Overwrite clause will overwrite (as is obvious from the name) the existing data present in the table. Hive will create a directory in HDFS for this ProductID and SubproductID combination if it is not already existing. If the table is not partitioned you can skip the partition clause.

You can even read data from one table and insert it into another table. E.g. if we assume the transaction records were present in another table where they were loaded for initial clean up by business, we can write query like the one below to load the data into our transaction table

From PreProdTransactions

Insert Overwrite table transactions

Partition (ProductID=’A’ and SubproductID=’1’)

Select * from PreProdTransactions. ProductID=’A’ and PreProdTransactions .SubproductID=’1’

Insert Overwrite table transactions

Partition (ProductID=’A’ and SubproductID=’1’)

Select * from PreProdTransactions. ProductID=’A’ and PreProdTransactions .SubproductID=’2’

Insert Overwrite table transactions

Partition (ProductID=’B’ and SubproductID=’1’)

Select * from PreProdTransactions. ProductID=’B’ and PreProdTransactions .SubproductID=’1’

Insert Overwrite table transactions

Partition (ProductID=’B’ and SubproductID=’2’)

Select * from PreProdTransactions. ProductID=’B’ and PreProdTransactions .SubproductID=’2’

The above query will scan the PreProdTransactions table once and then create the Partitions for the Transactions table based on the Partitions clause.

A more concise way of writing the above query is

From PreProdTransactions

Insert Overwrite table transactions

Partition (ProductID, SubproductID)

Select …, PreProdTransactions. ProductID, PreProdTransactions. SubproductID

Big Data & Hadoop ecosystem essentials for managers(Manjeet Singh Nagi - https://in.linkedin.com/in/manjeetnagi)

Page 47: Big data and hadoop ecosystem essentials for managers

Big Data & Hadoop ecosystem essentials for managers(Manjeet Singh Nagi - https://in.linkedin.com/in/manjeetnagi)

from PreProdTransactions

In this case Hive itself will analyze the data present in the PreProdTransactions table and create as many partitions in the transactions table as many unique combinations of ProductID and SubproductID it finds in the PreProdTransactions table.

Reading Data from Hive

Big Data & Hadoop ecosystem essentials for managers(Manjeet Singh Nagi - https://in.linkedin.com/in/manjeetnagi)