Apache Hadoop In Theory And Practice

Adam Kawa, Data Engineer @ Spotify


DESCRIPTION

A presentation about Hadoop internals and a couple of issues that we have seen in practice.

TRANSCRIPT

Page 1: Apache Hadoop In Theory And Practice

Adam Kawa, Data Engineer @ Spotify

Apache Hadoop In Theory And Practice

Page 2: Apache Hadoop In Theory And Practice

Get insights to offer a better product. “More data usually beats better algorithms”

Get insights to make better decisions. Avoid “guesstimates”

Gain a competitive advantage

Why Data?

Page 3: Apache Hadoop In Theory And Practice

Store data reliably
Analyze data quickly
Do it in a cost-effective way
Use an expressive, high-level language

What Is Challenging?

Page 4: Apache Hadoop In Theory And Practice

A big system of machines, not a big machine
Failures will happen
Move computation to data, not data to computation
Write complex code only once, but right

Fundamental Ideas

A system of multiple animals

Page 5: Apache Hadoop In Theory And Practice

An open-source Java software
Storing and processing of very large data sets
A cluster of commodity machines
A simple programming model

Apache Hadoop

Page 6: Apache Hadoop In Theory And Practice

Two main components:
HDFS - a distributed file system
MapReduce - a distributed processing layer

Apache Hadoop

Many other tools belong to the “Apache Hadoop Ecosystem”

Page 7: Apache Hadoop In Theory And Practice

Component: HDFS

Page 8: Apache Hadoop In Theory And Practice

Store large datasets in a distributed, scalable and fault-tolerant way
High throughput
Very large files
Streaming reads and writes (no edits)

Write once, read many times

The Purpose Of HDFS

It is like a big truck to move heavy stuff

(not Ferrari)

Page 9: Apache Hadoop In Theory And Practice

Do NOT use it if you have
Low-latency requests
Random reads and writes
Lots of small files

Then it is better to consider RDBMSs, file servers, HBase or Cassandra...

HDFS Misuse

Page 10: Apache Hadoop In Theory And Practice

Split a very large file into smaller (but still large) blocks
Store them redundantly on a set of machines

Splitting Files And Replicating Blocks

Page 11: Apache Hadoop In Theory And Practice

The default block size is 64 MB
 Minimize the overhead of a disk seek operation (less than 1%)
 A file is just “sliced” into chunks after each 64 MB (or so)
 It does NOT matter whether it is text/binary, compressed or not
 It does matter later (when reading the data)

Splitting Files Into Blocks
Today, 128 MB or 256 MB is recommended

Page 12: Apache Hadoop In Theory And Practice

The default replication factor is 3
 It can be changed per file or directory
 It can be increased for “hot” datasets (temporarily or permanently), as in the sketch after this slide

Trade-off between
 Reliability, availability, performance
 Disk space

Replicating Blocks
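A minimal sketch of bumping the replication factor for a “hot” dataset from Java, assuming the standard FileSystem API (the path is only an example; the same can be done from the command line with hadoop fs -setrep):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class IncreaseReplication {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    // Temporarily keep 5 replicas of a frequently read dataset
    // (the path is hypothetical, used only for illustration)
    fs.setReplication(new Path("/toplist/2013-05-15/poland.avro"), (short) 5);
    fs.close();
  }
}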

Page 13: Apache Hadoop In Theory And Practice

The Master node keeps and manages all metadata information
The Slave nodes store blocks of data and serve them to the client

Master And Slaves

Master node (called NameNode)

Slave nodes (called DataNodes)

Page 14: Apache Hadoop In Theory And Practice

*no NameNode HA, no HDFS Federation

Classical* HDFS Cluster

Manages metadata
Does some “house-keeping” operations for the NameNode

Stores and retrieves blocks of data

Page 15: Apache Hadoop In Theory And Practice

Performs all the metadata-related operations
Keeps information in RAM (for fast lookup)
 The filesystem tree
 Metadata for all files/directories (e.g. ownership, permissions)
 Names and locations of blocks

Metadata (not all) is additionally stored on disk(s) (for reliability)
 The filesystem snapshot (fsimage) + editlog (edits) files

HDFS NameNode

Page 16: Apache Hadoop In Theory And Practice

Stores and retrieves blocks of data
 Data is stored as regular files on a local filesystem (e.g. ext4)
  e.g. blk_-992391354910561645 (+ checksums in a separate file)
 A block itself does not know which file it belongs to!

Sends a heartbeat message to the NN to say that it is still alive
Sends a block report to the NN periodically

HDFS DataNode

Page 17: Apache Hadoop In Theory And Practice

NOT a failover NameNode
Periodically merges a prior snapshot (fsimage) and editlog(s) (edits)
 Fetches the current fsimage and edits files from the NameNode
 Applies edits to fsimage to create the up-to-date fsimage
 Then sends the up-to-date fsimage back to the NameNode

We can configure the frequency of this operation
 Reduces the NameNode startup time
 Prevents edits from becoming too large

HDFS Secondary NameNode

Page 18: Apache Hadoop In Theory And Practice

hadoop fs -ls -R /user/kawaa
hadoop fs -cat /toplist/2013-05-15/poland.txt | less
hadoop fs -put logs.txt /incoming/logs/user

hadoop fs -count /toplist
hadoop fs -chown kawaa:kawaa /toplist/2013-05-15/poland.avro

Example HDFS Commands

It is distributed, but it gives you a beautiful abstraction!
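The same abstraction is available programmatically. A minimal sketch using the standard FileSystem Java API (paths reused from the commands above, for illustration only):

import java.io.BufferedReader;
import java.io.InputStreamReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ReadFromHdfs {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    // Equivalent of 'hadoop fs -ls /toplist'
    for (FileStatus status : fs.listStatus(new Path("/toplist"))) {
      System.out.println(status.getPath());
    }
    // Equivalent of 'hadoop fs -cat /toplist/2013-05-15/poland.txt'
    try (BufferedReader reader = new BufferedReader(
        new InputStreamReader(fs.open(new Path("/toplist/2013-05-15/poland.txt"))))) {
      String line;
      while ((line = reader.readLine()) != null) {
        System.out.println(line);
      }
    }
    fs.close();
  }
}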

Page 19: Apache Hadoop In Theory And Practice

Block data is never sent through the NameNode
The NameNode redirects a client to an appropriate DataNode
The NameNode chooses a DataNode that is as “close” as possible

Reading A File From HDFS

$ hadoop fs -cat /toplist/2013-05-15/poland.txt

Blocks locations

Lots of data comes from DataNodes to a client

Page 20: Apache Hadoop In Theory And Practice

Network topology defined by an administrator in a supplied script
 Converts an IP address into a path to a rack (e.g. /dc1/rack1)

A path is used to calculate distance between nodes

HDFS Network Topology

Image source: “Hadoop: The Definitive Guide” by Tom White

Page 21: Apache Hadoop In Theory And Practice

Pluggable (default in BlockPlacementPolicyDefault.java)
 1st replica on the same node where the writer is located

  Otherwise a “random” (but not too “busy” or almost full) node is used
 2nd and 3rd replicas on two different nodes in a different rack
 The rest are placed on random nodes
  No DataNode with more than one replica of any block
  No rack with more than two replicas of the same block (if possible)

HDFS Block Placement

Page 22: Apache Hadoop In Theory And Practice

Moves blocks from over-utilized DNs to under-utilized DNs
Stops when HDFS is balanced
Maintains the block placement policy

HDFS Balancer

“Balanced” means that the utilization of every DN differs from the utilization of the cluster by no more than a given threshold

Page 23: Apache Hadoop In Theory And Practice

Questions: HDFS

Page 24: Apache Hadoop In Theory And Practice

Question
Why does a block itself NOT know which file it belongs to?

HDFS Block

Page 25: Apache Hadoop In Theory And Practice

Question
Why does a block itself NOT know which file it belongs to?

Answer
Design decision → simplicity, performance
Filename, permissions, ownership etc. might change
 It would require updating all block replicas that belong to a file

HDFS Block

Page 26: Apache Hadoop In Theory And Practice

Question
Why does the NN NOT store information about block locations on disk(s)?

HDFS Metadata

Page 27: Apache Hadoop In Theory And Practice

Question
Why does the NN NOT store information about block locations on disk(s)?

Answer
Design decision → simplicity
They are sent by DataNodes as block reports periodically
Locations of block replicas may change over time

 A change in the IP address or hostname of a DataNode
 Balancing the cluster (moving blocks between nodes)
 Moving disks between servers (e.g. failure of a motherboard)

HDFS Metadata

Page 28: Apache Hadoop In Theory And Practice

Question
How many files represent a block replica in HDFS?

HDFS Replication

Page 29: Apache Hadoop In Theory And Practice

Question
How many files represent a block replica in HDFS?

Answer
Actually, two files:
 The first file for the data itself
 The second file for the block’s metadata
  Checksums for the block data
  The block’s generation stamp

HDFS Replication

by default less than 1% of the actual data

Page 30: Apache Hadoop In Theory And Practice

Question
Why does the default block placement strategy NOT take disk space utilization (%) into account?

HDFS Block Placement

It only checks whether a node (a) has enough disk space to write a block, and (b) does not serve too many clients...

Page 31: Apache Hadoop In Theory And Practice

Question
Why does the default block placement strategy NOT take disk space utilization (%) into account?

Answer
Some DataNodes might become overloaded by incoming data
 e.g. a node newly added to the cluster

HDFS Block Placement

Page 32: Apache Hadoop In Theory And Practice

Facts: HDFS

Page 33: Apache Hadoop In Theory And Practice

Runs on top of a native file system (e.g. ext3, ext4, xfs)
 HDFS is simply a Java application that uses a native file system

HDFS And Local File System

Page 34: Apache Hadoop In Theory And Practice

HDFS detects corrupted blocks
When writing
 Client computes the checksums for each block
 Client sends checksums to a DN together with data
When reading
 Client verifies the checksums when reading a block
 If verification fails, NN is notified about the corrupt replica
 Then a DN fetches a different replica from another DN

HDFS Data Integrity

Page 35: Apache Hadoop In Theory And Practice

Stats based on Yahoo! clusters
 An average file ≈ 1.5 blocks (block size = 128 MB)
 An average file ≈ 600 bytes in RAM (1 file and 2 block objects)
 100M files ≈ 60 GB of metadata
 1 GB of metadata ≈ 1 PB of physical storage (but usually less*)

*Sadly, based on practical observations, the block-to-file ratio tends to decrease during the lifetime of the cluster (Dekel Tankel, Yahoo!)

HDFS NameNode Scalability

Page 36: Apache Hadoop In Theory And Practice

Read/write operations throughput limited by one machine
 ~120K read ops/sec
 ~6K write ops/sec

MapReduce tasks are also HDFS clients

Internal load increases as the cluster grows
 More block reports and heartbeats from DataNodes

 More MapReduce tasks sending requests
 Bigger snapshots transferred from/to the Secondary NameNode

HDFS NameNode Performance

Page 37: Apache Hadoop In Theory And Practice

Single NameNode
 Keeps all metadata in RAM
 Performs all metadata operations
 Becomes a single point of failure (SPOF)

HDFS Main Limitations

Page 38: Apache Hadoop In Theory And Practice

Introduce multiple NameNodes in the form of:
 HDFS Federation
 HDFS High Availability (HA)

HDFS Main Improvements

Find more: http://slidesha.re/15zZlet

Page 39: Apache Hadoop In Theory And Practice

In practice: HDFS

Page 40: Apache Hadoop In Theory And Practice

Problem
A DataNode cannot start on a server for some reason

Page 41: Apache Hadoop In Theory And Practice

Usually it means some kind of disk failure

$ ls /disk/hd12/
ls: reading directory /disk/hd12/: Input/output error

org.apache.hadoop.util.DiskChecker$DiskErrorException: Too many failed volumes - current valid volumes: 11, volumes configured: 12, volumes failed: 1, Volume failures tolerated: 0
org.apache.hadoop.hdfs.server.datanode.DataNode: SHUTDOWN_MSG:

Increase dfs.datanode.failed.volumes.tolerated to avoid expensive block replication when a disk fails (and just monitor failed disks)

Page 42: Apache Hadoop In Theory And Practice

It was exciting to see stuff breaking!

Page 43: Apache Hadoop In Theory And Practice

In practice: HDFS

Page 44: Apache Hadoop In Theory And Practice

Problem
A user cannot run resource-intensive Hive queries
It happened immediately after expanding the cluster

Page 45: Apache Hadoop In Theory And Practice

Description
The queries are valid
The queries are resource-intensive
The queries run successfully on a small dataset
 But they fail on a large dataset
Surprisingly, they run successfully through other user accounts
The user has the right permissions to HDFS directories and Hive tables

Page 46: Apache Hadoop In Theory And Practice

The NameNode is throwing thousands of warnings and exceptions: 14592 times in only 8 minutes (4768/min at the peak)

Page 47: Apache Hadoop In Theory And Practice

Normally
 Hadoop is a very trusting elephant
 The username comes from the client machine (and is not verified)
 The groupname is resolved on the NameNode server

  Using the shell command ''id -Gn <username>''
 If a user does not have an account on the NameNode server
  The ExitCodeException exception is thrown

Page 48: Apache Hadoop In Theory And Practice

Possible Fixes
 Create a user account on the NameNode server (dirty, insecure)
 Use AD/LDAP for user-group resolution
  hadoop.security.group.mapping.ldap.* settings

If you also need full authentication, deploy Kerberos

Page 49: Apache Hadoop In Theory And Practice

Our Fix
We decided to use LDAP for user-group resolution

However, LDAP settings in Hadoop did not work for us
 Because posixGroup is not a supported filter group class in hadoop.security.group.mapping.ldap.search.filter.group

We found a workaround using nsswitch.conf

Page 50: Apache Hadoop In Theory And Practice

Lessons Learned
Know who is going to use your cluster
Know who is abusing the cluster (HDFS access and MR jobs)
Parse the NameNode logs regularly

 Look for FATAL, ERROR, Exception messages
 Especially before and after expanding the cluster

Page 51: Apache Hadoop In Theory And Practice

Component: MapReduce

Page 52: Apache Hadoop In Theory And Practice

A programming model inspired by functional programming
 map() and reduce() functions processing <key, value> pairs
Useful for processing very large datasets in a distributed way
Simple, but very expressive

MapReduce Model

Page 53: Apache Hadoop In Theory And Practice

Map And Reduce Functions

Page 54: Apache Hadoop In Theory And Practice

Map And Reduce Functions - Counting Words

Page 55: Apache Hadoop In Theory And Practice

MapReduce Job
Input data is divided into splits and converted into <key, value> pairs
Invokes the map() function multiple times
Keys are sorted, values are not (but could be)
Invokes the reduce() function multiple times

Page 56: Apache Hadoop In Theory And Practice

MapReduce Example: ArtistCount
Artist, Song, Timestamp, User

The key is the offset of the line from the beginning of the file

We could specify which artist goes to which reducer (HashPartitioner is the default one)

Page 57: Apache Hadoop In Theory And Practice

map(Integer key, EndSong value, Context context):
    context.write(value.artist, 1)

reduce(String key, Iterator<Integer> values, Context context):
    int count = 0
    for each v in values:
        count += v
    context.write(key, count)

MapReduce Example: ArtistCount

Pseudo-code in a non-existent language ;)
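For comparison, a hedged sketch of the same logic with the real Hadoop Java API. It assumes the input is read with the default TextInputFormat as tab-separated lines (Artist, Song, Timestamp, User) instead of the EndSong record from the pseudo-code:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class ArtistCount {

  public static class ArtistMapper
      extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private final Text artist = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      // Input line: Artist \t Song \t Timestamp \t User
      String[] fields = value.toString().split("\t", -1);
      artist.set(fields[0]);
      context.write(artist, ONE);
    }
  }

  public static class ArtistReducer
      extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
      int count = 0;
      for (IntWritable v : values) {
        count += v.get();
      }
      context.write(key, new IntWritable(count));
    }
  }
}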

Page 58: Apache Hadoop In Theory And Practice

MapReduce Combiner
Make sure that the Combiner combines fast enough and aggressively enough (otherwise it only adds overhead)
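For a sum like ArtistCount the reduce function is associative and commutative, so the reducer sketched above can double as the combiner. A hedged driver sketch (paths are hypothetical; Job.getInstance is the newer factory, very old releases used new Job(conf) instead):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ArtistCountDriver {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "ArtistCount");
    job.setJarByClass(ArtistCountDriver.class);
    job.setMapperClass(ArtistCount.ArtistMapper.class);
    // The reducer is reused as a combiner: partial sums are computed
    // on the map side, so much less intermediate data is shuffled.
    job.setCombinerClass(ArtistCount.ArtistReducer.class);
    job.setReducerClass(ArtistCount.ArtistReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path("/endsong/2013-05-15"));     // hypothetical input
    FileOutputFormat.setOutputPath(job, new Path("/toplist/artist-count")); // hypothetical output
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}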

Page 59: Apache Hadoop In Theory And Practice

Data Locality in HDFS and MapReduce

Ideally, Mapper code is sent to a node that has the replica of this block

By default, three replicas should be available somewhere on the cluster

Page 60: Apache Hadoop In Theory And Practice

Batch processing system
Automatic parallelization and distribution of computation
Fault-tolerance
Deals with all messy details related to distributed processing
Relatively easy to use for programmers
 Java API, Streaming (Python, Perl, Ruby …)
 Apache Pig, Apache Hive, (S)Crunch
Status and monitoring tools

MapReduce Implementation

Page 61: Apache Hadoop In Theory And Practice

“Classical” MapReduce Daemons

Keeps track of TTs, schedules job and task executions

Runs map and reduce tasks, reports to the JobTracker

Page 62: Apache Hadoop In Theory And Practice

Manages the computational resources
 Available TaskTrackers, map and reduce slots

Schedules all user jobs
 Schedules all tasks that belong to a job
 Monitors task executions
 Restarts failed tasks and speculatively runs slow ones
 Calculates job counter totals

JobTracker Responsibilities

Page 63: Apache Hadoop In Theory And Practice

Runs map and reduce tasks
Reports to the JobTracker
 Heartbeats saying that it is still alive
 Number of free map and reduce slots
 Task progress, status, counters etc.

TaskTracker Responsibilities

Page 64: Apache Hadoop In Theory And Practice

It can consist of 1, 5, 100 or 4000 nodes

Apache Hadoop Cluster

Page 65: Apache Hadoop In Theory And Practice

MapReduce Job Submission

Image source: “Hadoop: The Definitive Guide” by Tom White

Tasks are started in a separate JVM to isolate user code from Hadoop code

They are copied with a higher replication factor (by default, 10)

Page 66: Apache Hadoop In Theory And Practice

MapReduce: Sort And Shuffle Phase

Other map tasks
Other reduce tasks

Reduce phase
Map phase

Image source: “Hadoop: The Definitive Guide” by Tom White

Page 67: Apache Hadoop In Theory And Practice

Specifies which Reducer should get a given <key, value> pair (a custom Partitioner sketch follows below)
Aim for an even distribution of the intermediate data
 Skewed data may overload a single reducer
 And make a job run much longer

MapReduce: Partitioner
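A hedged sketch of a custom Partitioner. The default HashPartitioner hashes the key; here a hypothetical rule isolates one known heavy-hitter artist on its own reducer, purely to illustrate how skew can be attacked (the artist name and the policy are made up):

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class ArtistPartitioner extends Partitioner<Text, IntWritable> {
  @Override
  public int getPartition(Text key, IntWritable value, int numPartitions) {
    // Reserve the last reducer for one disproportionately popular key
    if (numPartitions > 1 && "Some Very Popular Artist".equals(key.toString())) {
      return numPartitions - 1;
    }
    // Hash the remaining keys over the other reducers
    int buckets = Math.max(numPartitions - 1, 1);
    return (key.hashCode() & Integer.MAX_VALUE) % buckets;
  }
}

It would be enabled in the driver with job.setPartitionerClass(ArtistPartitioner.class).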

Page 68: Apache Hadoop In Theory And Practice

Scheduling a redundant copy of the remaining, long-running task
 The output from the one that finishes first is used
 The other one is killed, since it is no longer needed
An optimization, not a feature to make jobs run more reliably

Speculative Execution

Page 69: Apache Hadoop In Theory And Practice

Enable it if tasks often experience “external” problems, e.g. hardware degradation (disk, network card), system problems, memory unavailability...

Otherwise, speculative execution can reduce overall throughput (it can be disabled per job, as sketched below)
 Redundant tasks run at a similar speed to non-redundant ones
 It might help one job, but all the others have to wait longer for slots
 Redundantly running reduce tasks will
  transfer all intermediate data over the network
  write their output redundantly (for a moment) to a directory in HDFS

Speculative Execution
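A minimal sketch of switching speculative execution off for a single job. Property names vary by Hadoop version: the MRv1-era names are used below, while newer releases use mapreduce.map.speculative and mapreduce.reduce.speculative.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class NoSpeculationJob {
  public static Job create() throws Exception {
    Configuration conf = new Configuration();
    // Disable redundant (speculative) map and reduce attempts for this job
    conf.setBoolean("mapred.map.tasks.speculative.execution", false);
    conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
    return Job.getInstance(conf, "job-without-speculation");
  }
}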

Page 70: Apache Hadoop In Theory And Practice

Facts: MapReduce

Page 71: Apache Hadoop In Theory And Practice

Very customizable
 Input/Output Format, Record Reader/Writer, Partitioner, Writable, Comparator …

Unit testing with MRUnit (a sketch follows below)
The HPROF profiler can give a lot of insights
Reuse objects (especially keys and values) when possible
Split Strings efficiently, e.g. StringUtils instead of String.split

More about the Hadoop Java API: http://slidesha.re/1c50IPk

Java API
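A hedged MRUnit sketch for the ArtistCount mapper sketched earlier (it assumes the MRUnit 1.x mapreduce MapDriver API and JUnit; the input line is made up):

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.junit.Test;

public class ArtistMapperTest {
  @Test
  public void emitsArtistWithCountOne() throws Exception {
    MapDriver<LongWritable, Text, Text, IntWritable> driver =
        MapDriver.newMapDriver(new ArtistCount.ArtistMapper());
    // One tab-separated input line should produce <artist, 1>
    driver.withInput(new LongWritable(0),
                     new Text("Some Artist\tSome Song\t1368576000\tkawaa"))
          .withOutput(new Text("Some Artist"), new IntWritable(1))
          .runTest();
  }
}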

Page 72: Apache Hadoop In Theory And Practice

Tons of configuration parameters (a driver sketch with a few of them follows below)
 Input split size (~implies the number of map tasks)
 Number of reduce tasks

 Available memory for tasks
 Compression settings
 Combiner
 Partitioner

and more...

MapReduce Job Configuration
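A hedged driver sketch touching a few of these knobs (values are illustrative only; the map output compression property shown is the newer name, MRv1 used mapred.compress.map.output):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class TunedJob {
  public static Job create() throws Exception {
    Configuration conf = new Configuration();
    // Compress intermediate map output to reduce shuffle traffic
    conf.setBoolean("mapreduce.map.output.compress", true);
    Job job = Job.getInstance(conf, "tuned-job");
    // Raising the minimum split size above the block size coalesces blocks
    // into fewer, larger splits, i.e. fewer but longer-running map tasks
    // (at the cost of some data locality).
    FileInputFormat.setMinInputSplitSize(job, 512L * 1024 * 1024);
    // An explicit number of reduce tasks
    job.setNumReduceTasks(200);
    return job;
  }
}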

Page 73: Apache Hadoop In Theory And Practice

Questions: MapReduce

Page 74: Apache Hadoop In Theory And Practice

Question
Why is each line in a text file, by default, converted to <offset, line> instead of <line_number, line>?

MapReduce Input <Key, Value> Pairs

Page 75: Apache Hadoop In Theory And Practice

Question
Why is each line in a text file, by default, converted to <offset, line> instead of <line_number, line>?

Answer
If your lines are not fixed-width, you need to read the file from the beginning to the end to find the line_number of each line (thus it cannot be parallelized).

MapReduce Input <Key, Value> Pairs

Page 76: Apache Hadoop In Theory And Practice

In practice: MapReduce

Page 77: Apache Hadoop In Theory And Practice

“I noticed that a bottleneck seems to be coming from the map tasks. Is there any reason that we can't open any of the allocated reduce slots to map tasks?”

Regards, Chris

How do you hard-code the number of map and reduce slots efficiently?

Page 78: Apache Hadoop In Theory And Practice

We initially started with 60/40, but today we are closer to something like 70/30

Occupied Map And Reduce Slots
Time Spent In Occupied Slots

This may change again soon ...

Page 79: Apache Hadoop In Theory And Practice

We are currently introducing a new feature to Luigi
Automatic settings of
 Maximum input split size (~implies the number of map tasks)
 Number of reduce tasks
 More settings soon (e.g. the size of the map output buffer)
The goal is each task running 5-15 minutes on average

Because even a perfect manual setting may become outdated

because input size grows

Page 80: Apache Hadoop In Theory And Practice

The current PoC ;)

It should help in extreme cases
 short-lived maps
 short-lived and long-lived reduces

type  | # map | # reduce | avg map time | avg reduce time     | job execution time
old_1 | 4826  | 25       | 46sec        | 1hrs, 52mins, 14sec | 2hrs, 52mins, 16sec
new_1 | 391   | 294      | 4mins, 46sec | 8mins, 24sec        | 23mins, 12sec

type  | # map | # reduce | avg map time | avg reduce time     | job execution time
old_2 | 4936  | 800      | 7mins, 30sec | 22mins, 18sec       | 5hrs, 20mins, 1sec
new_2 | 4936  | 1893     | 8mins, 52sec | 7mins, 35sec        | 1hrs, 18mins, 29sec

Page 81: Apache Hadoop In Theory And Practice

In practice: MapReduce

Page 82: Apache Hadoop In Theory And Practice

Problem
Surprisingly, Hive queries are running extremely long
Thousands of tasks are constantly being killed

Page 83: Apache Hadoop In Theory And Practice

Only 1 task failed, 2x more tasks were killed than completed

Page 84: Apache Hadoop In Theory And Practice
Page 85: Apache Hadoop In Theory And Practice

Logs show that the JobTracker gets a request to kill the tasks
Who actually can send a kill request?
 The user (using e.g. mapred job -kill-task)

 The JobTracker (a speculative duplicate, or when a whole job fails)
 The Fair Scheduler

Diplomatically, it's called “preemption”

Page 86: Apache Hadoop In Theory And Practice

Key Observations
Killed tasks came from ad-hoc and resource-intensive Hive queries
Tasks are usually killed quickly after they start
Surviving tasks are running fine for a long time
Hive queries are running in their own Fair Scheduler pool

Page 87: Apache Hadoop In Theory And Practice

Eureka! FairScheduler has a license to kill!

Preempt the newest tasks in an over-share pool to forcibly make some room for starving pools

Page 88: Apache Hadoop In Theory And Practice

The Hive pool was running over its minimum and fair shares
Other pools were running under their minimum and fair shares

So the Fair Scheduler was (legally) killing Hive tasks from time to time

Fair Scheduler can kill to be KIND...

Page 89: Apache Hadoop In Theory And Practice

Possible Fixes
Disable the preemption
Tune minimum shares based on your workload
Tune preemption timeouts based on your workload
Limit the number of map/reduce tasks in a pool
Limit the number of jobs in a pool

Switch to Capacity Scheduler

Page 90: Apache Hadoop In Theory And Practice

Lessons Learned
 A scheduler should NOT be considered a ''black box''
 It is so easy to implement a long-running Hive query

Page 91: Apache Hadoop In Theory And Practice

More About Hadoop Adventures: http://slidesha.re/1ctbTHT

Page 92: Apache Hadoop In Theory And Practice

In reality, Hadoop is fun!

Page 93: Apache Hadoop In Theory And Practice

Questions?

Page 94: Apache Hadoop In Theory And Practice

Stockholm and Sweden

Page 95: Apache Hadoop In Theory And Practice

Check out spotify.com/jobs or @Spotifyjobs for more information

[email protected]

Want to join the band?

Page 96: Apache Hadoop In Theory And Practice

Thank you!