Big Data Bootcamp 2016, Day 1: Hadoop


Page 1: Big Data Bootcamp 2016 (source: s3.amazonaws.com › maylong › Chicago+Bootcamp)

Big Data Bootcamp

2016

Day 1: Hadoop

Page 2: Introduction

•  Name
•  Role
•  Expectation

Page 3: Agenda: Day 1 / Day 2

1:
•  Hadoop Architecture
•  HDFS
•  Demo
•  Lab
•  YARN
•  MapReduce
•  Lab
•  Job Scheduler
•  Preemption
•  Capacity Scheduler Lab

2 / 3:
•  SQL DataFrames
•  Catalyst Optimizer
•  Spark Streaming
•  Lab
•  Hive
•  Demo

4:
•  Security
•  Lab
•  Lucene / Solr / ElasticSearch

Page 4: Agenda: Day 2 and Day 3

•  Field Engagement
•  Positioning
•  Architecture
•  Setup
•  Troubleshooting
•  Administration
•  Best Practices
•  Archiving
•  ERPs
•  MongoDB
•  Assessment

Page 5: Hadoop Architecture

Page 6: Hadoop 2.x

HADOOP 1.x (single-use system: batch apps):
•  HDFS (redundant, reliable storage)
•  MapReduce (cluster resource management & data processing)

HADOOP 2.x (multi-purpose platform: batch, interactive, online, streaming, …):
•  HDFS2 (redundant, reliable storage)
•  YARN (cluster resource management)
•  MapReduce (data processing)
•  Others (data processing)

Page 7: Overview of a Hadoop Cluster

A Hadoop cluster is made up of master and slave (worker) nodes:
–  Master nodes manage the infrastructure
–  Worker nodes contain the distributed data and perform processing

Master Nodes (NameNode, ResourceManager, Standby NameNode, HBase Master):
•  Master Node 1: NameNode, Oozie Server, ZooKeeper
•  Master Node 2: ResourceManager, Standby NameNode, HBase Master, HiveServer2, ZooKeeper
•  Management Node: Ambari Server, WebHCat Server, JobHistoryServer, ZooKeeper

Worker Nodes (NodeManager, DataNode, HBase RegionServer):
•  Worker Nodes 1–4: DataNode, NodeManager, HBase RegionServer

Page 8: Standalone Deployment Mode

•  Single JVM (Java Virtual Machine)
•  Local file system
•  Suitable for testing during development

Diagram: Hadoop, Pig, and Hive all run inside one JVM on a single machine (CPU, memory, disk).

Page 9: Pseudo-Distributed Deployment Mode

•  Each daemon runs in its own JVM
•  The Hortonworks Sandbox runs in pseudo-distributed mode
•  All daemons run on a single machine using HDFS
•  Appropriate for QA and development environments

Diagram: NameNode, ResourceManager, DataNode, and NodeManager JVMs share one machine's CPU, memory, and HDFS.

Page 10: Distributed Deployment Mode

•  Each daemon runs on its own machine
•  Data is maintained with local HDFS
•  Best for production environments

Diagram: a master node (CPU, memory, local disk) plus several worker nodes, each with CPU, memory, and HDFS data.

Page 11: Small Cluster with HA

Diagram components:
•  Worker Nodes 1–6: NodeManager, DataNode, HBase RegionServer
•  Master Nodes 1–4: NameNode 1 and 2, ResourceManager 1 and 2, HBase Master 1 and 2, three ZooKeeper servers, three JournalNodes, Oozie, History Server, Timeline Server, Falcon
•  Utility Nodes 1–2: Ambari (monitoring & metrics), Knox gateways, Client Gateways, HiveServer2, WebHCat, Hive Metastore

Page 12: HDFS

Page 13: Hadoop 2.x

HADOOP 1.x (single-use system: batch apps):
•  HDFS (redundant, reliable storage)
•  MapReduce (cluster resource management & data processing)

HADOOP 2.x (multi-purpose platform: batch, interactive, online, streaming, …):
•  HDFS2 (redundant, reliable storage)
•  YARN (cluster resource management)
•  MapReduce (data processing)
•  Others (data processing)

Page 14: HDFS Architecture

NOTE: HDFS is the most commonly used filesystem implementation. Other available implementations: Tachyon, GlusterFS, S3, and more.

The NameNode and DataNodes are daemon JVMs. The NameNode holds the namespace and block map (metadata plus journaling); each DataNode manages its local storage.

Page 15: Writing a File to HDFS

(Diagram: Client, NameNode, DataNodes 1–3, steps 1–5.)

1. The client sends a request to the NameNode to add a file to HDFS
2. The NameNode gives the client a lease to the file path
3. For every block, the client asks the NameNode to provide a new block ID and a list of destination DataNodes
4. The client writes the block directly to the first DataNode in the list
5. The first DataNode pipelines the replication to the next DataNode in the list
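The five steps above can be sketched as a toy simulation. All class and method names here are hypothetical illustrations, not the real HDFS client API (which lives in the Hadoop Java client):

```python
# Toy simulation of the HDFS write path: lease, block allocation,
# then pipelined replication DataNode-to-DataNode.

class NameNode:
    def __init__(self, datanodes, replication=3):
        self.datanodes = datanodes
        self.replication = replication
        self.leases = {}
        self.next_block_id = 0

    def grant_lease(self, path, client):           # steps 1-2
        self.leases[path] = client

    def allocate_block(self, path):                # step 3
        self.next_block_id += 1
        return self.next_block_id, self.datanodes[:self.replication]

class DataNode:
    def __init__(self, name):
        self.name = name
        self.blocks = {}

    def write_block(self, block_id, data, pipeline):   # steps 4-5
        self.blocks[block_id] = data
        if pipeline:                                   # forward to next DataNode
            pipeline[0].write_block(block_id, data, pipeline[1:])

dns = [DataNode(f"dn{i}") for i in range(1, 4)]
nn = NameNode(dns)
nn.grant_lease("/data/file.txt", client="client-1")
block_id, targets = nn.allocate_block("/data/file.txt")
targets[0].write_block(block_id, b"block bytes", targets[1:])

assert all(block_id in dn.blocks for dn in dns)  # replicated to all 3
```

The point of the pipeline is that the client pushes each block once; the DataNodes fan the replicas out among themselves.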

Page 16: The NameNode

1. When the NameNode starts, it reads the fsimage_N and edits_N files
2. The transactions in edits_N are merged with fsimage_N
3. A newly created fsimage_N+1 is written to disk, and a new, empty edits_N+1 is created
4. Now a client application can create a new file in HDFS
5. The NameNode journals that create transaction in the edits_N+1 file

While starting, the NameNode is in safemode, a read-only mode.

Note: the block map is constructed during safemode. The block map is not persisted in fsimage or edits, and must be dynamically reconstructed when the NameNode starts.

(Diagram: NameNode holding the namespace and block map; metadata and journaling backed by the fsimage and edits files.)
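Steps 1–3 of the startup checkpoint can be sketched with a simplified, hypothetical data model (the real fsimage and edits are binary files, not dicts):

```python
# Sketch of the NameNode checkpoint: replay edits_N over fsimage_N,
# persist fsimage_N+1, and start an empty edits_N+1.

def start_namenode(fsimage, edits):
    namespace = dict(fsimage)          # 1. read fsimage_N
    for op, path, meta in edits:       # 2. merge edits_N transactions
        if op == "create":
            namespace[path] = meta
        elif op == "delete":
            namespace.pop(path, None)
    new_fsimage = dict(namespace)      # 3. write fsimage_N+1 ...
    new_edits = []                     #    ... and an empty edits_N+1
    return new_fsimage, new_edits

fsimage, edits = start_namenode(
    {"/a": {"blocks": [1]}},
    [("create", "/b", {"blocks": [2]}), ("delete", "/a", None)],
)
assert fsimage == {"/b": {"blocks": [2]}} and edits == []
```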

Page 17: The DataNodes

DataNodes 1–4 send periodic heartbeats and block reports to the NameNode. When a DataNode stops reporting, the NameNode declares it dead: "Sorry, DataNode 3, but I'm going to assume you are dead."
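The liveness bookkeeping behind that quote is just a timeout check. A minimal sketch, assuming the stock HDFS dead-node timeout of roughly 10.5 minutes (timestamps here are illustrative):

```python
# The NameNode marks a DataNode dead once it has gone longer than a
# timeout without a heartbeat.

DEAD_TIMEOUT = 10.5 * 60  # seconds; approximates the HDFS default

def dead_datanodes(last_heartbeat, now):
    return [dn for dn, t in last_heartbeat.items() if now - t > DEAD_TIMEOUT]

last = {"dn1": 1000.0, "dn2": 995.0, "dn3": 100.0}
assert dead_datanodes(last, now=1003.0) == ["dn3"]  # "Sorry, DataNode 3..."
```

Once a node is declared dead, its blocks fall below their replication target and the NameNode schedules re-replication elsewhere.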

Page 18: Heterogeneous Storage

•  Before:
   –  A DataNode is a single storage
   –  Storage is uniform: only one storage type (disk)
   –  Storage types are hidden from the file system
•  New architecture:
   –  A DataNode is a collection of storages
   –  Supports different types of storage: disk, SSDs, memory

From "all disks as a single storage" to a collection of tiered storages (S3, Swift, SAN, filers).

Page 19: HDFS Storage Architecture

DataNodes report to the NameNode per device, and clients read/write to a specific storage.

Page 20: Understanding NameNode HA

•  Active NameNode: namespace and block management, with a ZKFC (ZooKeeper Failover Controller); writes edits to the shared archive
•  Standby NameNode: namespace and block management, with its own ZKFC; reads edits from the archive
•  A three-node ZooKeeper ensemble drives automatic failover
•  DataNodes 1–n (HDFS distributed storage, holding blocks BLK1–BLK6) send heartbeats and block reports to both NameNodes

Page 21: Configuration Hierarchy

Default configuration (shipped inside the jars):
•  hadoop-common.jar, hadoop-hdfs.jar, hadoop-mapreduce-client-core.jar, hadoop-yarn-common.jar
•  core-default.xml, hdfs-default.xml, mapred-default.xml, yarn-default.xml

Inherits / extends:

Site/node configuration (/etc/hadoop/conf):
•  core-site.xml, hdfs-site.xml, mapred-site.xml, yarn-site.xml

Inherits / extends:

Application/job configuration:
•  yarn jar -D prop=value
•  Configuration.set(…)
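The hierarchy is "last writer wins": job settings override site settings, which override the *-default.xml values shipped in the jars. A simplified sketch with plain dicts (real Hadoop uses Configuration objects, and site properties marked `final` cannot be overridden, which this sketch ignores):

```python
# Layered configuration resolution: defaults < site < per-job overrides.

def effective_config(defaults, site, job):
    conf = dict(defaults)
    conf.update(site)   # site/node config inherits/extends the defaults
    conf.update(job)    # per-job -D / Configuration.set(...) wins last
    return conf

defaults = {"dfs.replication": "3", "io.sort.mb": "100"}   # *-default.xml
site = {"dfs.replication": "2"}                            # hdfs-site.xml
job = {"io.sort.mb": "200"}              # yarn jar -D io.sort.mb=200
assert effective_config(defaults, site, job) == {
    "dfs.replication": "2", "io.sort.mb": "200"}
```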

Page 22: HDFS Demo

Page 23: HDFS Lab

Page 24: YARN

Page 25: Hadoop 2.x

HADOOP 1.x (single-use system: batch apps):
•  HDFS (redundant, reliable storage)
•  MapReduce (cluster resource management & data processing)

HADOOP 2.x (multi-purpose platform: batch, interactive, online, streaming, …):
•  HDFS2 (redundant, reliable storage)
•  YARN (cluster resource management)
•  MapReduce (data processing)
•  Others (data processing)

Page 26: What is YARN?

•  Yet Another Resource Negotiator
•  Part of core Hadoop
•  A multi-node cluster has an aggregate pool of compute resources: CPU and memory
•  These are the kinds of resources an operating system would manage; hence the nickname "the Hadoop Operating System"
•  Disk resources are managed by HDFS

Hadoop now provides infrastructure resource management.

Page 27: 5 Key Benefits of YARN

1. Scale
2. New programming models & services
3. Improved cluster utilization
4. Agility
5. Beyond Java

Page 28: YARN Architecture

The ResourceManager (with its Scheduler) coordinates a grid of NodeManagers. Each application gets an ApplicationMaster running in a container, and its task containers (1.1–1.3 for one application, 2.1–2.4 for another) are spread across the NodeManagers.

Page 29: Lifecycle of a YARN Application

1. The client submits an application request to the ApplicationsManager
2. Response with an Application ID
3. Application Submission Context (application ID, queue, application name, application type, AM resource, priority) + Container Launch Context (launch commands, LocalResources, delegation tokens)
4. Start ApplicationMaster
5. Get Capabilities
6. Request/Receive Containers
7. Container Launch Requests

(Diagram: ResourceManager with ApplicationsManager, ApplicationMasterService, and Scheduler; NodeManagers hosting the ApplicationMaster and the MR job's containers: Job Maps 1–8, Job Reducers 1–2, plus free space.)

Page 30: ResourceManager HA (step 1)

A YARN client on the client node talks to the active ResourceManager. A standby ResourceManager and a three-node ZooKeeper ensemble stand ready for failover; NodeManagers run on the worker nodes.

Page 31: ResourceManager HA (step 2)

A batch application is running: its ApplicationMaster and map tasks (Map 1.1, Map 1.2) occupy containers on the NodeManagers.

Page 32: ResourceManager HA (step 3)

HDFS appears alongside the NodeManagers; the application continues under the active ResourceManager.

Page 33: ResourceManager HA (step 4)

The active ResourceManager goes DOWN; Map 1.1 is lost (marked X) and only Map 1.2 survives on its NodeManager.

Page 34: ResourceManager HA (step 5)

The standby ResourceManager becomes active, elected through the ZooKeeper ensemble.

Page 35: ResourceManager HA (step 6)

Failover continues: the original ResourceManager remains down while the second ResourceManager transitions.

Page 36: ResourceManager HA (step 7)

The second ResourceManager is now active and resumes scheduling for the surviving containers.

Page 37: ResourceManager HA (step 8)

The first ResourceManager comes back as standby; the application (Map 1.2) keeps running under the new active ResourceManager.

Page 38: ResourceManager HA (step 9)

The lost map task (Map 1.1) is rescheduled on a NodeManager and the application completes.

Page 39: MapReduce

Page 40: Hadoop 2.x

HADOOP 1.x (single-use system: batch apps):
•  HDFS (redundant, reliable storage)
•  MapReduce (cluster resource management & data processing)

HADOOP 2.x (multi-purpose platform: batch, interactive, online, streaming, …):
•  HDFS2 (redundant, reliable storage)
•  YARN (cluster resource management)
•  MapReduce (data processing)
•  Others (data processing)

Page 41: MapReduce Phases

Mappers and reducers run on NodeManager + DataNode (NM + DN) machines.

Map Phase, then Shuffle/Sort, then Reduce Phase: between map and reduce, data is shuffled across the network and sorted.

Page 42: Inverted Index: Map

Map 1 (worker node holding the Amazon.com pages) emits, sorted by key:
•  Key books: P1 [books, amazon.com]
•  Key movies: P2 [movies, amazon.com]

Map 2 (worker node holding the Walmart.com pages) emits:
•  Key automotive: P2 [automotive, walmart.com]
•  Key books: P1 [books, walmart.com]

Input comes from HDFS; map output is written to each worker node's local file system, SORTED BY KEY.

Page 43: Inverted Index: Shuffle

•  Reduce 1 receives P1 [books, amazon.com] and P2 [movies, amazon.com] from Map 1, and P1 [books, walmart.com] from Map 2
•  Reduce 2 receives P2 [automotive, walmart.com] from Map 2

Page 44: Inverted Index: Reduce

•  Reduce 1 input: [books, amazon.com], [books, walmart.com]; output to HDFS: books: amazon.com, walmart.com
•  Reduce 2 input: [automotive, walmart.com], [movies, amazon.com]; output to HDFS: automotive: walmart.com; movies: amazon.com
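The inverted-index example from the last three slides can be run end to end in plain Python, with map emitting (term, site) pairs, shuffle grouping by term, and reduce producing the postings lists (a didactic sketch, not Hadoop's actual execution model):

```python
# Mini inverted-index MapReduce: map -> shuffle -> reduce.
from collections import defaultdict

pages = {"amazon.com": "books movies", "walmart.com": "automotive books"}

def map_phase(site, text):
    # Emit one (term, site) pair per term on the page.
    return [(term, site) for term in text.split()]

def shuffle(mapped):
    # Group values by key, as the shuffle/sort phase does.
    groups = defaultdict(list)
    for term, site in mapped:
        groups[term].append(site)
    return groups

def reduce_phase(groups):
    # Each reduce call sees one term and all sites that mention it.
    return {term: sorted(sites) for term, sites in groups.items()}

mapped = [kv for site, text in pages.items() for kv in map_phase(site, text)]
index = reduce_phase(shuffle(mapped))
assert index["books"] == ["amazon.com", "walmart.com"]
assert index["movies"] == ["amazon.com"]
```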

Page 45: Within a Single Mapper

•  The Input Split, Record Reader, and the user's map method feed the sort buffer (sized by mapreduce.task.io.sort.mb)
•  Deserialized key/value pairs are serialized into the buffer along with their partition #; record info (key/value offsets and partition) is maintained
•  When the buffer fills past mapreduce.map.sort.spill.percent, its contents are sorted (and combined, if a combiner is set) and spilled to local disk as a spill file holding partitions P0–P3
•  An in-memory spill index records each partition's offset and its uncompressed and compressed lengths
•  The partitioner, comparator, and combiner are supplied by the user; shuffle and sort are pluggable

Page 46: Map-side Merging of Spill Files

•  Spill Files 1–3, each holding Partitions 0–3, are merged (and combined) into a final spill: file.out plus file.out.index, kept in a separate directory for each map task
•  The in-memory spill index (offset, length for each spill file) becomes the final spill index (offset, length)
•  The NodeManager serves reducers' fetch requests from the final spill

Page 47: Shuffle Dataflow

Mappers 0–3 read the input and write intermediate files; Reducers 0–1 pull their partitions via intermediate file transfers and write the final output.

Page 48: Reducer – Shuffle Received Data

•  Fetches happen in parallel via copier threads: mapreduce.reduce.shuffle.parallelcopies
•  Small map outputs that can fit in memory are merged/combined in memory
•  Large map outputs that cannot fit in memory are merged on disk, spilling the merged data
•  mapreduce.reduce.shuffle.input.buffer.percent = % of the JVM heap to use for buffering

Page 49: Compression

Page 50: Compression Needs and Tradeoffs

•  Compressing data can speed up data-intensive I/O operations
•  MapReduce jobs are almost always I/O bound
•  Compressed data can save storage space and speed up data transfers across the network
•  Capital allocation for hardware can go further
•  Reduced I/O and network load can result in significant performance improvements
•  MapReduce jobs can finish faster overall
•  But CPU utilization and processing time increase during compression and decompression
•  Understanding the tradeoffs is important for the MapReduce pipeline's overall performance
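The space-vs-CPU tradeoff is easy to see in miniature with Python's gzip module (illustrative only; Hadoop typically uses codecs like Snappy or LZO for intermediate data):

```python
# gzip shrinks repetitive data dramatically, but every byte must be
# compressed and decompressed, which costs CPU time.
import gzip

data = b"key,value\n" * 100_000          # ~1 MB of repetitive "records"
packed = gzip.compress(data)

assert len(packed) < len(data) // 10     # big space win on repetitive data
assert gzip.decompress(packed) == data   # lossless round trip
```

Whether that space win pays off depends on whether the job is bottlenecked on disk/network (compression usually helps) or on CPU (it can hurt).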

Page 51: Data Compression in Hadoop's MR Pipeline

Compression and decompression happen at three points:
1. Map input: compressed input splits are decompressed by the mapper
2. Intermediate (map) output: the mapper's output is compressed after partition-and-sort, fetched and merged on disk by the reducers (alongside other maps' output feeding other reducers), and decompressed as reducer input during merge-and-sort
3. Reduce output: the reducer's final output is compressed before it is written

Page 52: Using Data Compression in Hadoop

1) Input data to Map
•  The file extension is recognized automatically for decompression; values: the file extensions of the supported formats
•  Note: for SequenceFile, the header carries the information (compression flag, block-compression flag, and compression codec)
•  Codec: one of the supported codecs defined in io.compression.codecs

2) Intermediate (Map) output
•  mapreduce.map.output.compress: false (default) or true
•  mapreduce.map.output.compress.codec: one defined in io.compression.codecs

3) Final (Reduce) output
•  mapreduce.output.fileoutputformat.compress: false (default) or true
•  mapreduce.output.fileoutputformat.compress.codec: one defined in io.compression.codecs
•  mapreduce.output.fileoutputformat.compress.type: compression type for SequenceFile outputs: NONE, RECORD (default), or BLOCK

Page 53: Common File Formats

•  Block compressed: SequenceFile, RCFile, ORC File, Avro File, Parquet
•  Other formats: Text, JSON, XML

Page 54: Compression Options in Hadoop

•  zlib: uses DEFLATE (LZ77 and Huffman coding); dictionary-based, API
•  gzip: wrapper around zlib; dictionary-based, standard compression utility
•  bzip2: Burrows-Wheeler transform; transform-based, block-oriented
•  LZO: variant of LZ77; dictionary-based, block-oriented, API
•  LZ4: simplified variant of LZ77; fast scan, API
•  Snappy: LZ77; block-oriented, API
•  LZOP: LZO; splittable with preprocessing to generate an index file

Page 55: MapReduce Demo

Page 56: MapReduce Lab

Page 57: Job Scheduler

Page 58: YARN Capacity Scheduler

Three functions: Administer, Isolate, Schedule.

Page 59: Capacity Scheduler

•  Distributes jobs to queues based on capacity
•  Capacity is determined by a percentage of available memory
•  The percentages assigned across all queues must add up to 100%
•  Individual user limits can be set per queue
•  Extra available capacity is given to the other queues evenly

Example: Queue1 / Queue2 / Queue3 configured for 50% / 30% / 20%; actual usage 40% / 35% / 25%.
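The numbers on this slide can be reproduced with a small sketch of the "spread spare capacity evenly" rule (a simplification of the real Capacity Scheduler, which also honors maximum-capacity and user limits):

```python
# If a queue uses less than its configured share, the spare capacity is
# split evenly among the queues that are at or over their share.

def actual_shares(configured, used):
    spare = sum(configured[q] - used[q]
                for q in configured if used[q] < configured[q])
    needy = [q for q in configured if used[q] >= configured[q]]
    extra = spare / len(needy) if needy else 0
    return {q: used[q] + (extra if q in needy else 0) for q in configured}

configured = {"Queue1": 50, "Queue2": 30, "Queue3": 20}
used = {"Queue1": 40, "Queue2": 30, "Queue3": 20}   # Queue1 leaves 10% idle
shares = actual_shares(configured, used)
assert shares == {"Queue1": 40, "Queue2": 35, "Queue3": 25}  # as on the slide
```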

Page 60: Defining Queues

yarn.scheduler.capacity.root.queues=Marketing,Sales,Engineering
yarn.scheduler.capacity.root.Marketing.capacity=50
yarn.scheduler.capacity.root.Sales.capacity=30
yarn.scheduler.capacity.root.Engineering.capacity=20

Page 61: Configuring Capacity Limits

yarn.scheduler.capacity.root.queues=Marketing,Marketing-longrunning
yarn.scheduler.capacity.root.Marketing.capacity=70
yarn.scheduler.capacity.root.Marketing.maximum-capacity=80
yarn.scheduler.capacity.root.Marketing-longrunning.capacity=30
yarn.scheduler.capacity.root.Marketing-longrunning.maximum-capacity=30

Page 62: Configuring Permissions

yarn.scheduler.capacity.root.Engineering.acl_submit_applications=George,Tom developer,admin
yarn.scheduler.capacity.root.Engineering.acl_administer_queue=Tom admin

Page 63:

Preemption

Page 64:

YARN Capacity Scheduler

•  Queues with guaranteed capacity
•  Queue hierarchy
•  Job submission Access Control Lists
•  Max capacity per queue
•  User limits within a queue
•  Preemption across queues
•  Management/Admin Access Control Lists
•  Configured in capacity-scheduler.xml

Functions: Schedule, Isolate, Administer

Page 65:

Preemption: Flow

1. Gather Queue State
   •  Current capacity
   •  Guaranteed capacity
   •  Pending requests

2. Identify Set of Preemptions
   •  Figure out what is needed to achieve capacity balance
   •  Select applications to preempt: over-capacity queues, in FIFO order
   •  Respect bounds on the amount of preemption allowed in each round

3. Preempt Application(s)
   •  Remove reservations from the most recently assigned app
   •  Issue preemptions for containers of the same app (reverse chronological order, last assigned container first)
   •  App Master preemption is a last resort

4. Kill Containers
   •  Track those containers that have been issued, but have not yet executed, preemption
   •  After a set of execution periods, forcibly kill these containers
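The container-selection rule in step 3 can be sketched in a few lines. This is an illustrative model, not YARN's actual implementation; `select_preemptions` is a hypothetical helper:

```python
# Sketch (not YARN code) of the selection rule: preempt the most recently
# scheduled application's containers first, last-assigned container first,
# until enough capacity has been reclaimed.

def select_preemptions(apps, needed):
    """apps: list of (app_id, [container_sizes]) in FIFO scheduling order.
    Returns the containers to preempt as (app_id, size) pairs."""
    victims = []
    reclaimed = 0
    for app_id, containers in reversed(apps):      # newest application first
        for size in reversed(containers):          # last-assigned container first
            if reclaimed >= needed:
                return victims
            victims.append((app_id, size))
            reclaimed += size
    return victims

apps = [("job1", [4, 4, 4]), ("job2", [4, 4])]
print(select_preemptions(apps, 6))  # both of job2's containers go before job1's
```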

Page 66:

Preemption in Action (2)

Product queue:   Min 20%, Max 30%
Marketing queue: Min 45%, Max 75% (Job1 running)
Finance queue:   Min 35%, Max 35% (Job2 submitted)

Page 67:

Preemption in Action (3)

Product queue:   Min 20%, Max 30%
Marketing queue: Min 45%, Max 75% (Job1 using 65%)
Finance queue:   Min 35%, Max 35% (Job2; Job3 submitted)

Page 68:

Preemption in Action (4)

Product queue:   Min 20%, Max 30%
Marketing queue: Min 45%, Max 75% (Job1)
Finance queue:   Min 35%, Max 35% (Job3 starts; Job2 preempted)

Page 69:

Preemption in Action (5)

Product queue:   Min 20%, Max 30%
Marketing queue: Min 45%, Max 75%
Finance queue:   Min 35%, Max 35% (Job2, Job3)

Job1 finishes.

Page 70:

Capacity Scheduler Lab

Page 71:

Kafka

Page 72:

What is Kafka?

According to the Kafka website: "Kafka is a distributed, partitioned, replicated commit log service. It provides the functionality of a messaging system, but with a unique design."

•  Distributed: horizontally scalable (just like Hadoop)
•  Partitioned: the data is split up and distributed across the brokers
•  Replicated: allows for automatic failover
•  Unique: Kafka does not track the consumption of messages (the consumers do)
•  Fast: designed from the ground up with a focus on performance and throughput

Page 73:

Kafka Terminology

Kafka is a publish/subscribe messaging system composed of the following components:

•  Topic: a message feed
•  Producer: a process that publishes messages to a topic
•  Consumer: a process that subscribes to a topic and processes its messages
•  Broker: a server in a Kafka cluster

Page 74:

Kafka Components

Kafka Cluster: producers publish messages to the brokers; consumers read messages from the brokers.

Kafka uses ZooKeeper to coordinate brokers with consumers.

Page 75:

Publishing Messages

1. A producer publishes messages to a topic
2. The producer decides which partition to send each message to
3. New messages are written to the end of the partition
4. A consumer fetches messages from a partition by specifying an offset

Offset ->     0          1          2
Partition 0:  message_b  message_f
Partition 1:  message_a  message_c  message_e
Partition 2:  message_d

(Old messages at low offsets, new messages appended at the end.)

Page 76:

Leader and Followers

Broker 1: my_topic, Partition-1 (follower)
Broker 2: my_topic, Partition-1 (leader)
Broker 3: my_topic, Partition-1 (follower)

Page 77:

Spark

Page 78:

Hadoop Ecosystem

Page 79:

Introducing Spark

Spark SQL / DataFrames | Spark Streaming | MLlib | GraphX
Spark Core / Tungsten
HDFS | Hive | HBase | S3 | Cassandra | Others

Page 80:

Spark Features

Supported:
•  Spark Core
•  MLlib
•  Spark on YARN / Kerberos
•  Spark SQL
•  SparkR
•  Spark Streaming
•  DataFrame API
•  Spark ML Pipeline API in PySpark

Unsupported / Tech Preview:
•  GraphX
•  Spark Standalone / Mesos

Supported:
•  Spark Core
•  MLlib
•  Spark on YARN / Kerberos
•  Spark SQL / DataFrame
•  Spark Streaming
•  Spark Standalone

Unsupported:
•  GraphX
•  SparkR
•  Mesos
•  Spark ML Pipeline API

Page 81:

2015 Spark Survey

Page 82:

What"is"Spark?"

Spark is an!open>source&sozware!solu8on!that!performs!rapid!calcula8ons!on!in>memory&datasets&

-  Open!Source![Apache!hosted!&!licensed]!•  Free!to!download!and!use!in!produc8on!

•  Developed!by!a!community!of!developers!

!

-  In1memory!datasets!

•  RDD"(Resilient"Distributed"Data)"is!the!basis!for!what!Spark!enables!

•  Resilient!–!the!models!can!be!recreated!on!the!fly!from!known!state!

•  Distributed!–!the!dataset!is!ozen!par88oned!across!mul8ple!nodes!for!

increased!scalability!and!parallelism!!

Page 83:

Write Less Code: WordCount

Java MapReduce:

protected void map(LongWritable key, Text value, Context context)
    throws IOException, InterruptedException {
  String currentLine = value.toString();
  String[] words = StringUtils.split(currentLine, '\\', ' ');
  for (String word : words) {
    outputKey.set(word);
    context.write(outputKey, ONE);
  }
}

protected void reduce(Text key, Iterable<IntWritable> values, Context context)
    throws IOException, InterruptedException {
  int sum = 0;
  for (IntWritable count : values) {
    sum += count.get();
  }
  outputValue.set(sum);
  context.write(key, outputValue);
}

Scala Spark Core:

val words = input.flatMap(line => line.split(" "))
val counts = words.map(word => (word, 1))
                  .reduceByKey { case (x, y) => x + y }
counts.saveAsTextFile(outputFile)

Page 84:

Write Less Code: Calculate an Average
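The original slide's code did not survive this transcript. The usual Spark pattern for a per-key average is to aggregate (sum, count) pairs and divide at the end, never averaging partial averages. A pure-Python sketch of that pattern (`average_by_key` is an invented name, not Spark API):

```python
# Pure-Python sketch of the Spark averaging pattern: accumulate a
# (running_sum, running_count) pair per key, then divide once at the end.

def average_by_key(pairs):
    acc = {}  # key -> (running_sum, running_count)
    for key, value in pairs:
        s, c = acc.get(key, (0, 0))
        acc[key] = (s + value, c + 1)
    return {key: s / c for key, (s, c) in acc.items()}

salaries = [("F", 61000), ("M", 26000), ("F", 96000), ("M", 69000)]
print(average_by_key(salaries))   # {'F': 78500.0, 'M': 47500.0}
```

In Spark the same shape is typically expressed with aggregateByKey or combineByKey over (sum, count) pairs.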

Page 85:

Spark Core

Spark Core contains the basic functionality of Spark:
-  Task scheduling
-  Memory management
-  Fault recovery
-  Interacting with storage systems

Home to Resilient Distributed Datasets (RDDs): a collection of immutable partitions of data. Provides many APIs for building and manipulating RDDs.

Tungsten (version 1.5)
Spark Datasets (SPARK-9999, target version 1.6/2.0)

Page 86:

Format"Name" Spllitable" Structured" Comments"

Text! Yes! No! Plain!old!text!files.!Splifable!provided!

records!are!one!per!line.!

JSON! Yes! Semi! Common!text!based!format,!are!semi1

structured,!splifable!if!one!record!per!line.!

CSV! Yes! Yes! Very!common!text!based!format,!ozen!used!

with!spreadsheet!applica8ons.!

SequenceFile! Yes! Yes! A!common!Hadoop!file!format!used!for!key1

value!data.!

Protocol!Buffers! Yes! Yes! A!fast!space1efficient!mul81language!format.!

!

Object!Files!

!

Yes!

!

Yes!

Useful!for!saving!data!from!a!Spark!job!to!be!

consumed!by!shared!code.!Breaks!if!you!

change!your!classes,!as!it!relies!on!Java!

Serializa8on.!

File!Formats!

Page 87:

Compression

Property Name               Default  Description
spark.rdd.compress          false    Whether to compress serialized RDD partitions
spark.broadcast.compress    true     Whether to compress broadcast variables before sending them
spark.io.compression.codec  snappy   The codec used to compress internal data such as RDD partitions, broadcast variables and shuffle outputs. By default, Spark provides three codecs: lz4, lzf, and snappy

Page 88:

Create RDD

Spark provides two ways to create RDDs:

1. Loading an external dataset:

   sc.textFile("data/salarydata.txt")

2. Parallelizing a collection in your driver program:

   sc.parallelize(List("M", 14, 0, 95102))

Page 89:

RDD Operations

Two types of operations on RDDs: transformations and actions

•  Transformations are lazy (not computed immediately); the transformed RDD gets recomputed when an action is run on it (default)
•  An RDD can be persisted into storage, in memory or on disk
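Laziness can be demonstrated with a minimal sketch (not Spark internals; the `LazyDataset` class is invented for illustration): transformations only record work, and the pipeline runs when an action is called.

```python
# Minimal sketch of lazy evaluation: transformations record operations;
# the recorded pipeline only executes when an action (collect) is invoked.

class LazyDataset:
    def __init__(self, data, ops=()):
        self.data, self.ops = data, ops

    def map(self, fn):      # transformation: no work done yet
        return LazyDataset(self.data, self.ops + (("map", fn),))

    def filter(self, fn):   # transformation: no work done yet
        return LazyDataset(self.data, self.ops + (("filter", fn),))

    def collect(self):      # action: the whole pipeline runs now
        out = list(self.data)
        for kind, fn in self.ops:
            out = [fn(x) for x in out] if kind == "map" else [x for x in out if fn(x)]
        return out

ds = LazyDataset([1, 2, 3, 4]).map(lambda x: x * 10).filter(lambda x: x > 15)
print(ds.collect())   # [20, 30, 40]
```

As in Spark, calling collect() twice would recompute the pipeline unless the intermediate result were persisted.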

Page 90:

Transformations

•  map()  •  flatMap()  •  filter()  •  distinct()  •  sample()
•  union()  •  intersection()  •  subtract()  •  cartesian()

A transformation produces a new RDD from an existing one:

// Transformed RDD
val genderSalaryData = salaryData
  .map(line => line.split(','))
  .map(line => (line(0), line(2).toInt))

Page 91:

Actions

An action runs the pipeline of transformations and returns a value:

// takeOrdered() action
val distinctFemaleSalaries = genderSalaryData
  .filter { case (gender, salary) => gender == "F" }
  .map { case (gender, salary) => salary }
  .distinct()
distinctFemaleSalaries.takeOrdered(10)(Ordering[Int].reverse)

Page 92:

Pair RDD

•  Key-value RDDs are commonly used to perform aggregations
•  Expose new operations
•  Act on each key in parallel, or regroup data across the network

Create a pair RDD using the first field as the key:

salaryData.map(line => line.split(',')).map(line => (line(0), line(2).toInt))

Transformations on one pair RDD (example: {(1, 2), (3, 4), (3, 6)}):
•  combineByKey()  •  groupByKey()  •  reduceByKey(func)  •  mapValues(func)
•  flatMapValues(func)  •  keys()  •  values()  •  sortByKey()

Transformations on two pair RDDs:
•  join  •  cogroup  •  aggregateByKey  •  fullOuterJoin  •  leftOuterJoin  •  rightOuterJoin

Actions on pair RDDs (example: {(1, 2), (3, 4), (3, 6)}):
•  countByKey()  •  collectAsMap()  •  lookup(key)
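The semantics of reduceByKey on the slide's example pair RDD can be sketched in pure Python (an illustration, not Spark code; `reduce_by_key` is an invented name):

```python
# Pure-Python analogue of reduceByKey: values sharing a key are combined
# with the supplied function, yielding one (key, reduced_value) per key.

def reduce_by_key(pairs, fn):
    out = {}
    for key, value in pairs:
        out[key] = fn(out[key], value) if key in out else value
    return sorted(out.items())

# The slide's example RDD {(1, 2), (3, 4), (3, 6)} with addition:
print(reduce_by_key([(1, 2), (3, 4), (3, 6)], lambda a, b: a + b))  # [(1, 2), (3, 10)]
```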

Page 93:

Storage Levels: cache(), persist()

•  Storage levels are set by passing a StorageLevel object (Scala, Java, Python, R) to persist()
•  The cache() method is shorthand for the default storage level, StorageLevel.MEMORY_ONLY (store deserialized objects in memory)

Full set of storage levels:

Storage Level          Description
MEMORY_ONLY (default)  Deserialized. Partitions that don't fit will not be cached; recompute them on demand.
MEMORY_AND_DISK        Deserialized. Store the partitions that don't fit on disk; read them on demand.
MEMORY_ONLY_SER        Serialized. More space-efficient than deserialized objects, but CPU-intensive to read.
MEMORY_AND_DISK_SER    Spill partitions that don't fit in memory to disk instead of recomputing them on demand.
DISK_ONLY              Store the RDD partitions only on disk.
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc.  Same as the levels above, but replicate each partition on two cluster nodes.
OFF_HEAP               Reduces GC overhead; allows executors to be smaller and share a pool of memory.

Page 94:

Partitions

•  Each RDD is split into multiple partitions, computed on different nodes of the cluster
•  More partitions in an RDD can provide more parallelism
•  Tasks are executed on a partition
•  Each HDFS block will require one partition
•  Repartitioning can improve performance
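How records map onto partitions can be sketched with the common hash-partitioning scheme (an illustration of the idea, not Spark's HashPartitioner; `partition_records` is an invented helper):

```python
# Sketch of hash partitioning: partition = hash(key) mod num_partitions,
# so every record lands in exactly one partition, and one task per partition
# can then run in parallel.

def partition_records(records, num_partitions, key_fn=lambda r: r):
    parts = [[] for _ in range(num_partitions)]
    for rec in records:
        parts[hash(key_fn(rec)) % num_partitions].append(rec)
    return parts

parts = partition_records(range(10), 3)
print([len(p) for p in parts])                 # records spread across 3 partitions
assert sum(len(p) for p in parts) == 10        # nothing lost, nothing duplicated
```

Repartitioning is just re-running this assignment with a different partition count, trading shuffle cost for better parallelism or fewer tiny tasks.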

Page 95:

Fault Tolerance and Recovery

•  Spark automatically recovers from failed or slow nodes by re-executing failed or slow tasks
•  Partitions are rebuilt on a different node if a node failure occurs
•  Speculative execution can be utilized for slower nodes
•  If RDD data is lost, Spark re-computes the missing partitions, transparently to the user

Page 96:

Step 1: Create RDDs

sc.textFile("data/salarydata.txt")
  .map(line => line.split(','))
  .map(line => (line(0), line(2).toInt))
  .reduceByKey { _ + _ }
  .collect()

Page 97:

Step 1: Create RDDs

HadoopRDD -> map() -> map() -> map() -> reduceByKey() -> collect()

•  Pipeline as much as possible
•  Split into "stages" based on the need to reorganize data

Page 98:

Step 2: Create Execution Plan

Stage 1: HadoopRDD -> map() -> map() -> map()

salarydata.txt:

Gender  Age  Salary  Zip Code
M       14   0       95102
M       3    0       95101
M       25   26000   94040
M       71   69000   95102
F       7    0       95102
M       95   12000   95105
F       36   61000   95101
M       86   65000   95051
F       17   0       95103
F       44   96000   94041
...     ...  ...     ...

After Stage 1:

Gender  Salary
M       0
M       0
M       26000
M       69000
F       0
M       12000
F       61000
M       65000
F       0
F       96000
...     ...

•  Pipeline as much as possible
•  Split into "stages" based on the need to reorganize data

Page 99:

Step 2: Create Execution Plan

Stage 2: reduceByKey() -> collect()

Gender  Salary
M       0
M       0
M       26000
M       69000
F       0
M       12000
F       61000
M       65000
F       0
F       96000
...     ...

F total salaries = 196171000
M total salaries = 191825000

Page 100:

Step 3: Schedule Tasks

•  Split each stage into tasks
•  A task is data + computation
•  Execute all tasks within a stage before moving on

Stage 1: Task0 runs on host1:/partition0; Task1 runs on host2:/partition1

Page 101:

Step 3: Schedule Tasks

The default number of partitions is determined by the number of CPU cores.

Task0 (host1:/partition0): HadoopRDD -> map() -> map() -> map()
Task1 (host2:/partition1): HadoopRDD -> map() -> map() -> map()

Page 102:

The Shuffle

Stage 1: HadoopRDD -> map() -> map() -> map()
Stage 2: reduceByKey() -> collect()

The shuffle moves data between the two stages.

Page 103:

Shuffling

•  Spark creates intermediate buffers for storing shuffle output data
•  The shuffle is sort-based: every map task writes out data to local disk, and reduce tasks make remote requests to fetch that data
•  Buffers are used to store intermediate results of aggregations
•  Spark attempts to limit the total amount of memory used in the shuffle

In memory (map side):     Partition-sorted file:
Name     EmpID            Name     EmpID
Susan    3230             Barbera  5643
Jeff     4432             Charles  6346
Barbera  5643             David    2346
David    2346             Debra    2482
Charles  6346             Jeff     4432
Terry    8435             Joseph   2747
Joseph   2747             Maria    4824
Debra    2482             Nathan   3485
Nathan   3485             Susan    3230
Maria    4824             Terry    8435

map() output is sorted by key, then fetched by the reduces.

Page 104:

Execute reduceByKey() and collect()

•  Similar to reduce; use it to combine values
•  Runs several parallel reduce operations
•  Each operation combines the values that share the same key
•  Returns a new RDD consisting of each key and its reduced value
•  Aggregates data separately for each key
•  The collect() function retrieves the entire RDD

F total salaries = 196171000
M total salaries = 191825000

Page 105:

spark-shell and pyspark shell

Page 106:

Apache Zeppelin

Page 107:

Spark SQL / DataFrames

Page 108:

Interfaces to Spark SQL

JDBC | Console | User Programs (Java, Scala, Python, R)
        |
        v
Spark SQL: DataFrame API + Catalyst Optimizer
        |
        v
Spark: Resilient Distributed Datasets

Page 109:

SQLContext

•  The most powerful way to use Spark SQL is inside a Spark application
•  Load data and query it with SQL while simultaneously combining it with "regular" program code, utilizing a SQLContext or HiveContext

// Import Spark SQL (if you can't have the Hive dependencies)
import org.apache.spark.sql.SQLContext
// Construct SQL Context
val sqlContext = new SQLContext(…)

// Import Spark SQL
import org.apache.spark.sql.hive.HiveContext
// Construct Hive Context
val hiveContext = new HiveContext(…)

Page 110:

HiveContext (Recommended)

•  Provides a superset of the functionality of the basic SQLContext
•  Write queries using the more complete HiveQL parser
•  Access to Hive UDFs, and the ability to read data from Hive tables
•  Build DataFrames (representing structured data), and operate on them with SQL or with normal RDD operations like map
•  Does not require an existing Hive setup
•  Future releases will focus on bringing SQLContext up to feature parity with HiveContext

Page 111:

DataFrames

Offer rich relational/procedural integration within Spark programs.

DataFrames:
-  Collections of structured records that can be manipulated using Spark's procedural API or the new relational API
-  Perform relational operations on DataFrames using a domain-specific language (DSL) similar to R data frames and Python pandas
-  Pass Scala, Java or Python functions through DataFrames to build a logical plan
-  Create directly from Spark's distributed objects
-  Enable relational processing in existing Spark programs

•  Automatically store data in a columnar format
•  Go through a relational optimizer, Catalyst
•  Standard data representation in a new "ML pipeline" API for machine learning

Page 112:

Create DataFrames

•  With a HiveContext or SQLContext, applications can create DataFrames from:
   •  An existing RDD
   •  A Hive table
   •  Data sources

•  Create a DataFrame from the content of a CSV file:

// A case class defines a schema/structure for the underlying data
case class SalaryData(gender: String, age: Int, salary: Int, zip: Int)

val rawSalaryData = sc.textFile("data/salarydata.txt")

val salaryData = rawSalaryData.map(line => {
  val fields = line.split(',')
  SalaryData(fields(0), fields(1).toInt, fields(2).toInt, fields(3).toInt)
}).toDF()

Page 113:

Operations on DataFrames

Common DataFrame functions:

df.show()
df.printSchema()
df.select()
df.filter()
df.groupBy().count()

Example:

salaryData.printSchema()
salaryData.show()

val genderSalaryData = salaryData.select(salaryData("gender"), salaryData("salary")).cache()

genderSalaryData.groupBy("gender").sum("salary").show()
genderSalaryData.groupBy("gender").avg("salary").show()

Page 114:

Tungsten

•  The largest change to Spark's execution engine since the project's inception
•  Focuses on substantially improving the efficiency of memory and CPU for Spark applications
•  Includes three initiatives:
   1. Memory Management and Binary Processing: leveraging application semantics to manage memory explicitly and eliminate the overhead of the JVM object model and garbage collection
   2. Cache-aware computation: algorithms and data structures to exploit the memory hierarchy
   3. Code generation: using code generation to exploit modern compilers and CPUs

Page 115:

Considera=ons:"

– Object!overhead!– GC’s!inefficiency!

1.!Memory!Management!and!Binary!Processing!

Reference"

Size" Descrip=on"

4" (object"header)"

4" (object"header)"

4" (object"header)"

4" char[]"String.value"

4" int"String.hash"

4" int"String.hash2"

12!byte!object!header!

20!bytes!of!overhead!+!8!bytes!of!chars!

8!byte!hashcode!

Total:"48"bytes"

Consider"a"simple"string"“Mike”"

Solutions: – Explicit!memory!manager:!

sun.misc.Unsafe!

– Tungsten’s!UnsafeRow!format!

– Spark’s!new!binary!map!

Page 116:

3. Code Generation

•  Uses the Janino compiler to reduce code-generation time
•  Speeds up the conversion of data from in-memory binary format to wire protocol for the shuffle
•  Increases throughput of serialization
•  Generates specialized bytecode for each schema
•  2x faster shuffle than Kryo

Page 117:

Catalyst Optimizer

Page 118:

Catalyst Optimizer

•  Based on functional programming constructs in Scala
•  Supports both rule-based and cost-based optimization
•  Extensible for two reasons:
   1. Easy to add new optimization techniques and features to Spark SQL
   2. Enables external developers to extend the optimizer
•  Contains a general library for representing trees and applying rules to manipulate them
•  Libraries specific to relational query processing, and sets of rules that handle query execution
•  Offers several public extension points:
   -  External data sources
   -  User-defined types
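The "apply rules to trees" idea can be shown with a minimal sketch. This is an illustration of rule-based rewriting in Python, not Catalyst's Scala code; the tuple-based tree and `fold_constants` rule are invented for the example:

```python
# Illustrative sketch of a rule-based optimizer: a rule is a function over an
# expression tree, applied recursively. Here, constant folding rewrites
# Add(1, 2) into the literal 3 while leaving unresolved columns alone.

def fold_constants(node):
    if isinstance(node, tuple) and node[0] == "Add":
        left, right = fold_constants(node[1]), fold_constants(node[2])
        if isinstance(left, int) and isinstance(right, int):
            return left + right            # the rule fires: fold to a literal
        return ("Add", left, right)        # otherwise keep the node, children rewritten
    return node

plan = ("Add", ("Add", 1, 2), ("Add", "col_x", 4))   # "col_x" stands for column data
print(fold_constants(plan))   # ('Add', 3, ('Add', 'col_x', 4))
```

Catalyst composes many such rules into batches and re-applies them until the plan reaches a fixed point.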

Page 119:

Query Planning

SQL Query / DataFrame
  -> Unresolved Logical Plan
  -> [Analysis, using the Catalog]
  -> Logical Plan
  -> [Logical Optimization]
  -> Optimized Logical Plan
  -> [Physical Planning]
  -> Physical Plans
  -> [Cost Model]
  -> Selected Physical Plan
  -> [Code Generation]
  -> RDDs

Page 120:

Analysis

The same query-planning pipeline, with the Analysis stage highlighted: the unresolved logical plan from a SQL query or DataFrame is resolved against the Catalog to produce a logical plan.

Page 121:

Logical Optimization

SQL!Query!

DataFrame!

Unresolved!

Logical!Plan!

Cost!Model!

!

RDDs!

!!

Logical!Plan!Op8mized!

Logical!Plan!

Physical!Plans!Physical!Plans!Physical!Plans!

Selected!Physical!Plan!

Code!!

Genera8on!

Physical!Planning!

Logical"Op=miza=on"Analysis!

Catalog!

Page 122: Physical Planning

[Same pipeline diagram, with the Physical Planning stage highlighted: candidate physical plans are generated from the optimized logical plan, and the Cost Model selects one.]

Page 123: Code Generation

[Same pipeline diagram, with the Code Generation stage highlighted: the selected physical plan is compiled into RDD operations.]

Page 124: Query Federation to External Databases

•  Data pipelines often combine data from heterogeneous sources
•  Spark SQL data sources leverage Catalyst to push predicates down into the data sources whenever possible

Example: use the JDBC data source and the JSON data source to join two tables:

CREATE TEMPORARY TABLE users
USING jdbc
OPTIONS (driver "mysql", url "jdbc:mysql://userDB/users");

CREATE TEMPORARY TABLE logs
USING json
OPTIONS (path "logs.json");

SELECT users.id, users.name, logs.message
FROM users JOIN logs
WHERE users.id = logs.userId
  AND users.registrationDate > "2015-01-01";
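Predicate pushdown itself can be sketched in a few lines of plain Python (illustrative only; `ToySource` and its method are hypothetical, not Spark's data source API): when a source accepts a filter, non-matching rows are dropped inside the source and never cross the boundary into the engine.

```python
# Toy data source that accepts a pushed-down predicate.
class ToySource:
    def __init__(self, rows):
        self.rows = rows

    def scan(self, predicate=None):
        # With pushdown, the predicate runs inside the source,
        # so only matching rows are returned to the engine.
        return [row for row in self.rows
                if predicate is None or predicate(row)]

users = ToySource([{"id": 1, "registrationDate": "2015-03-01"},
                   {"id": 2, "registrationDate": "2014-06-01"}])

# The WHERE clause on registrationDate is evaluated at the source.
recent = users.scan(lambda r: r["registrationDate"] > "2015-01-01")
print(recent)  # [{'id': 1, 'registrationDate': '2015-03-01'}]
```

In the federation example above, Catalyst would similarly push the `registrationDate` filter into the JDBC source so MySQL evaluates it, rather than shipping every user row to Spark.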

Page 125: User-Defined Functions

•  Register custom functions in:
  –  Python
  –  Java
  –  Scala
•  Supports native UDFs and existing Apache Hive UDFs
•  The standard Hive UDFs are automatically included

val udf = (text: String, topic: String) => {
  text match {
    case s if s matches s".*(?i)$topic.*" => true
    case _ => false
  }
}
hiveContext.udf.register("contains_topic", udf)
hiveContext.sql("select count(*) from newsgroups where contains_topic(subject, 'alaska')").show()
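The same predicate can be sketched as a plain Python function (an illustration of the UDF's logic, mirroring the case-insensitive match in the Scala pattern above; this is not PySpark's registration API):

```python
import re

def contains_topic(text: str, topic: str) -> bool:
    # Case-insensitive substring match, equivalent to the
    # Scala pattern ".*(?i)$topic.*" shown above.
    return re.search(re.escape(topic), text, re.IGNORECASE) is not None

print(contains_topic("Fishing in Alaska", "alaska"))  # True
print(contains_topic("Fishing in Oregon", "alaska"))  # False
```

In PySpark the equivalent function would be registered with the SQL context so it can be called from SQL text, just as `hiveContext.udf.register` does for the Scala closure.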

Page 126: Spark SQL DataFrames Demo

Page 127: Spark Streaming

Page 128: Spark Streaming

•  A Spark component that enables processing live streams of data
•  Streams can be created from various input sources: Flume, Kafka, HDFS/S3
•  Integrates with MLlib, SQL, DataFrames, and GraphX
•  Provides an abstraction called DStreams: a sequence of data arriving over time
•  Provides an API for manipulating data streams that closely matches Spark Core's RDD API
•  Designed to provide the same degree of fault tolerance, throughput, and scalability that Spark Core provides

[Diagram: input sources (Kafka, Flume, Kinesis, HDFS/S3, Twitter) flow into Spark Streaming, which writes results out to file systems, databases, and dashboards.]

Page 129: Discretized Streams (DStreams)

•  Treat a streaming computation as a series of deterministic batch computations on small time intervals
•  The basic abstraction provided by Spark Streaming:
  1. Input data stream received from the source
  2. Processed data stream generated by transforming the input stream
•  Represented by a continuous series of RDDs

[Diagram: incoming data streams are chopped into batches of input data every x seconds, each of which becomes a batch of processed data.]
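The micro-batch model can be sketched in plain Python (illustrative only, not the Spark Streaming API): timestamped events are grouped into fixed-width intervals, and each interval is then processed as an ordinary batch.

```python
from collections import defaultdict

def discretize(events, batch_seconds):
    """Group (timestamp, value) events into fixed-width micro-batches."""
    batches = defaultdict(list)
    for ts, value in events:
        batches[ts // batch_seconds].append(value)
    # Return batches in time order, one list per interval.
    return [batches[k] for k in sorted(batches)]

events = [(0, "a"), (1, "b"), (2, "c"), (3, "d"), (5, "e")]
# 2-second batches: [0,2) -> a,b ; [2,4) -> c,d ; [4,6) -> e
print(discretize(events, 2))  # [['a', 'b'], ['c', 'd'], ['e']]
```

Because each batch is an ordinary, deterministic computation, a lost batch can simply be recomputed, which is where the fault-tolerance guarantees come from.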

Page 130: Example: View Count Lineage Graph

Create a DStream called rsvps and derive counts from it:

rsvps = meetupReceiver("http://...")
ones = rsvps.map(event => (event.event_url, 1))
counts = ones.updateStateByKey((a, b) => a + b)

[Diagram: the lineage graph repeated for each batch interval (t = 1, t = 2, ...): rsvps -> map -> ones -> reduce -> counts.]
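updateStateByKey can be mimicked with a running dictionary (plain Python sketch, not the Spark API): each micro-batch's per-key values are merged into state carried across intervals, exactly as counts accumulates per event URL above.

```python
def update_state_by_key(state, batch, update):
    """Merge one micro-batch of (key, value) pairs into running state."""
    new_state = dict(state)
    for key, value in batch:
        # update(new_value, old_state) mirrors the (a, b) => a + b closure.
        new_state[key] = update(value, new_state.get(key, 0))
    return new_state

counts = {}
for batch in [[("url/a", 1), ("url/b", 1)], [("url/a", 1)]]:
    counts = update_state_by_key(counts, batch, lambda a, b: a + b)
print(counts)  # {'url/a': 2, 'url/b': 1}
```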

Page 131: Input DStreams and Receivers

•  Input DStreams
  –  DStreams representing the stream of input data received from streaming sources
  –  In the previous example, rsvps was an input DStream, as it represented the stream of data received over HTTP
•  Receiver object
  –  Receives the data from a source and stores it in Spark's memory for processing
  –  Associated with every input DStream (except file streams)

Page 132: Transformation Operators on DStreams

•  Produce a new DStream from one or more parent streams. Can be either:
  –  Stateless: act independently on each interval
  –  Stateful: share data across intervals

Common operations:

•  Stateless: map(), flatMap(), filter(), repartition(), reduceByKey(), groupByKey()
•  Stateful: updateStateByKey(), window(), reduceByWindow(), reduceByKeyAndWindow()
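The difference between the two kinds is easy to see with a stateful windowed reduction sketched over a list of micro-batches (plain Python, illustrative only): unlike map() or filter(), each output depends on the last N batches, not just the current one.

```python
def reduce_by_window(batches, window_len, reduce_fn):
    """For each interval, fold the last `window_len` batches together."""
    results = []
    for i in range(len(batches)):
        window = batches[max(0, i - window_len + 1): i + 1]
        values = [v for batch in window for v in batch]
        results.append(reduce_fn(values))
    return results

batches = [[1, 2], [3], [4, 5]]
# Sliding window of 2 batches: sum([1,2]), sum([1,2,3]), sum([3,4,5])
print(reduce_by_window(batches, 2, sum))  # [3, 6, 12]
```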

Page 133: Output Operators on DStreams

•  Let the program write data to external systems, e.g., saving each RDD to HDFS

Current output operators:

•  print(): prints the first ten elements of every batch of data in a DStream on the driver node running the streaming application. Useful for development and debugging.
•  saveAsTextFiles(): saves this DStream's contents as text files. The file name at each batch interval is generated from a prefix and suffix: "prefix-TIME_IN_MS[.suffix]".
•  saveAsObjectFiles(): saves this DStream's contents as SequenceFiles of serialized Java objects. The file name at each batch interval is generated from a prefix and suffix: "prefix-TIME_IN_MS[.suffix]".
•  saveAsHadoopFiles(): saves this DStream's contents as Hadoop files. The file name at each batch interval is generated from a prefix and suffix: "prefix-TIME_IN_MS[.suffix]".
•  foreachRDD(): the most generic output operator; applies a function, func, to each RDD generated from the stream. The function should push the data in each RDD to an external system, such as saving the RDD to files or writing it over the network to a database.

Page 134: DataFrame and SQL Operations

•  Use DataFrames and SQL operations on streaming data
•  Create a SQLContext using the SparkContext that the StreamingContext is using
•  Each RDD is converted to a DataFrame, registered as a temporary table, and then queried using SQL
•  SQL queries can also be run, from a different thread, on tables defined over streaming data

Page 135: MLlib Operations

•  Use machine learning algorithms provided by MLlib
  –  Streaming machine learning algorithms can simultaneously learn from the streaming data and apply the model to the streaming data:
    •  Streaming linear regression
    •  Streaming k-means
•  For a much larger class of machine learning algorithms:
  –  Learn a model offline (i.e., using historical data)
  –  Apply the model online to streaming data

Page 136: Caching and Persistence

•  Use the persist() method on a DStream to automatically persist every RDD of that DStream in memory
•  Useful if the data in the DStream will be computed multiple times:
  –  Window-based operations: reduceByWindow, reduceByKeyAndWindow
  –  State-based operations: updateStateByKey

Page 137: Checkpointing

•  Metadata checkpointing:
  –  Saves the information defining the streaming computation to fault-tolerant storage such as HDFS
  –  Used to recover from failure of the node running the driver of the streaming application
  –  Metadata includes:
    •  Configuration: the configuration that was used to create the streaming application
    •  DStream operations: the set of DStream operations that define the streaming application
    •  Incomplete batches: batches whose jobs are queued but have not completed yet
•  Data checkpointing:
  –  Saves the generated RDDs to reliable storage
  –  Necessary in some stateful operations that combine data across multiple batches

Page 138: Sources

Advanced sources (linked via external artifacts):

•  Kafka: spark-streaming-kafka_2.10
•  Flume: spark-streaming-flume_2.10
•  Kinesis: spark-streaming-kinesis-asl_2.10 [Amazon Software License]
•  Twitter: spark-streaming-twitter_2.10
•  ZeroMQ: spark-streaming-zeromq_2.10
•  MQTT: spark-streaming-mqtt_2.10

Basic sources:

•  File streams: streamingContext.fileStream()
•  Streams based on custom actors: streamingContext.actorStream()
•  Queue of RDDs as a stream: streamingContext.queueStream()
•  Socket stream: socketTextStream()
•  Simple text files: textFileStream()

Page 139: Kafka Direct Stream API

•  At the beginning of every batch interval, identify the range of offsets to consume
•  When each batch's jobs are executed, the data corresponding to the offset ranges is read from Kafka for processing
•  Offsets are saved reliably (with checkpoints) and used to recompute the data to recover from failures

[Diagram: the Spark Streaming driver queries Kafka for the latest offsets and decides the offset ranges for the batch, then launches jobs using those ranges; executors read data from Kafka using the offset ranges in their jobs.]

Page 140: Spark Streaming Lab

Page 141: Hive

Page 142: Hive Architecture

[Diagram: clients (CLI, JDBC/ODBC, Web UI) connect to HiveServer2, which contains Hive's compiler, optimizer, and executor and is backed by the Metastore DB; Hive submits the resulting MapReduce work as YARN jobs to the Hadoop NodeManagers.]

1. User issues a SQL query
2. Hive parses and plans the query
3. The query is converted to MapReduce and executed on Hadoop

Page 143: Comparing Hive on MapReduce vs. Hive on Tez

SELECT a.state, COUNT(*), AVG(c.price)
FROM a
JOIN b ON (a.id = b.id)
JOIN c ON (a.itemId = c.itemId)
GROUP BY a.state;

[Diagram: on MapReduce, the query runs as a chain of separate MR jobs (JOIN(a, b), then JOIN(a, c), then GROUP BY a.state with COUNT(*) and AVG(c.price)), each writing its intermediate results to HDFS before the next job reads them back; on Tez, the same operators run as a single DAG of map and reduce tasks. Tez avoids the unneeded intermediate writes to HDFS.]

Page 144: Defining a Hive-Managed Table

CREATE TABLE customer (
  customerID INT,
  firstName STRING,
  lastName STRING,
  birthday TIMESTAMP
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ',';

Page 145: Defining an External Table

CREATE EXTERNAL TABLE salaries (
  gender STRING,
  age INT,
  salary DOUBLE,
  zip INT
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ',';

Page 146: Defining a Table LOCATION

CREATE EXTERNAL TABLE salaries (
  gender STRING,
  age INT,
  salary DOUBLE,
  zip INT
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LOCATION '/user/train/salaries/';

Page 147: Loading Data into Hive

LOAD DATA LOCAL INPATH '/tmp/customers.csv'
OVERWRITE INTO TABLE customers;

LOAD DATA INPATH '/user/train/customers.csv'
OVERWRITE INTO TABLE customers;

INSERT INTO TABLE birthdays
  SELECT firstName, lastName, birthday
  FROM customers
  WHERE birthday IS NOT NULL;

Page 148: Performing Queries

SELECT * FROM customers;

FROM customers
  SELECT firstName, lastName, address, zip
  WHERE orderID > 0
  ORDER BY zip;

SELECT customers.*, orders.*
  FROM customers
  JOIN orders ON
  (customers.customerID = orders.customerID);

Page 149: Hive Partitions

•  Use the PARTITIONED BY clause to define a partition when creating a table:

CREATE TABLE employees (id INT, name STRING, salary DOUBLE)
PARTITIONED BY (dept STRING);

•  Subfolders are created based on the partition values:

/apps/hive/warehouse/employees
    /dept=hr/
    /dept=support/
    /dept=engineering/
    /dept=training/
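The partition layout can be sketched as path construction plus pruning (plain Python, illustrative only; the warehouse path follows the listing above): a query that filters on the partition column only needs to list and scan the matching subfolder.

```python
def partition_path(warehouse, table, **partition_values):
    # Hive-style layout: one subfolder per partition column value.
    parts = "/".join(f"{k}={v}" for k, v in partition_values.items())
    return f"{warehouse}/{table}/{parts}/"

path = partition_path("/apps/hive/warehouse", "employees", dept="hr")
print(path)  # /apps/hive/warehouse/employees/dept=hr/

# Partition pruning: a predicate like WHERE dept = 'hr' selects a
# single subfolder to scan instead of the whole table.
all_depts = ["hr", "support", "engineering", "training"]
to_scan = [d for d in all_depts if d == "hr"]
print(to_scan)  # ['hr']
```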

Page 150: Hive Join Strategies

Shuffle Join
  Approach: join keys are shuffled using MapReduce, and joins are performed on the reduce side.
  Pros: works regardless of data size or layout.
  Cons: the most resource-intensive and slowest join type.

Map (Broadcast) Join
  Approach: small tables are loaded into memory on all nodes; the mapper scans through the large table and joins.
  Pros: very fast; a single scan through the largest table.
  Cons: all but one table must be small enough to fit in RAM.

Sort-Merge-Bucket Join
  Approach: mappers take advantage of co-location of keys to do efficient joins.
  Pros: very fast for tables of any size.
  Cons: data must be sorted and bucketed ahead of time.
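The map (broadcast) join idea can be sketched in plain Python (illustrative only, not Hive's implementation): the small table is built into an in-memory hash map, and the large table is joined against it in a single pass with no shuffle.

```python
def broadcast_join(large, small, key):
    # Build phase: hash the small table in memory (the "broadcast").
    lookup = {row[key]: row for row in small}
    # Probe phase: one scan of the large table, joining via the hash map.
    return [{**row, **lookup[row[key]]}
            for row in large if row[key] in lookup]

orders = [{"customerID": 1, "total": 30},
          {"customerID": 2, "total": 15},
          {"customerID": 9, "total": 99}]   # no matching customer
customers = [{"customerID": 1, "name": "Ada"},
             {"customerID": 2, "name": "Bob"}]

joined = broadcast_join(orders, customers, "customerID")
print([(r["name"], r["total"]) for r in joined])  # [('Ada', 30), ('Bob', 15)]
```

This is why the strategy requires all but one table to fit in RAM: the build side must be held in memory on every node.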

Page 151: Columnar Storage for Hive

•  Large block size is ideal for MapReduce.
•  Columnar format enables high compression and high performance.

[Diagram: an ORC file divided into 250 MB stripes; each stripe holds index data, row data laid out column by column (columns 1 through 8), and a stripe footer, with a file footer and postscript at the end of the file.]

Page 152: Query Planner

•  The Cost-Based Optimizer (CBO) uses statistics from Hive tables to produce optimal query plans

Why cost-based optimization?
  –  Ease of use: join reordering
  –  Reduces the need for specialists to tune queries
  –  More efficient query plans lead to better cluster utilization

set hive.cbo.enable=true;
set hive.stats.fetch.column.stats=true;

Page 153: Transactions

[Diagram: each task reads a read-optimized ORC base file merged with a delta file, so transactional updates are applied at read time.]

Page 154: Hive Demo

Page 155: Hive Demo

Page 156: Security

Page 157: Authentication

•  Authentication: Who am I? Can I prove it? Controls access to the cluster. (Kerberos, Knox Gateway)
•  Authorization: What am I allowed to do? Restricts access to explicit data. (Core Hadoop: MR ACLs, HDFS permissions, Hive and HBase authorization; Apache Ranger)
•  Data Protection: Protects sensitive data at rest and in motion. (Core Hadoop and OS-level encryption)
•  Audit: Who did what, when, and with what results?

Page 158: Kerberos Component Architecture

[Diagram: a client authenticates against a KDC, which is backed by AD/LDAP so existing directory tools can manage users, and then accesses Hadoop services (NN, Hive, JT, DN). The KDC contains host, service, and user principals and the ACLs for those principals. The client keeps tickets in its Kerberos ticket cache.]

Step 1: Log in and get a TGT ticket.
Step 2: Store the TGT in the ticket cache.
Step 3: Request access to the NameNode service using the TGT; the KDC returns a NameNode service ticket (NN-ST).
Step 4: Store the NN-ST in the cache.
Step 5: A read/write action on a file is sent to the NameNode with the NN-ST. If access is authorized, the NameNode returns block locations, block IDs, and block access tokens.
Step 6: Read/write the block from the DataNode, given the block access token and block ID.

Page 159: Authorization & Audit

[The security-pillar overview from page 157 is repeated, with Authorization highlighted: what am I allowed to do, and how is access to explicit data restricted. Enforced by core Hadoop (MR ACLs, HDFS permissions, Hive and HBase authorization), Apache Ranger, and partner products.]

Page 160: Controlling Who Can Do What

•  Access control services exist for each of the Hadoop components
  –  HDFS has file permissions
  –  Higher-level services provide access control rules: MapReduce, HBase, and Hive all have Access Control Lists (ACLs)
•  Improvements to these services are being led by the Hortonworks team:
  –  HDFS improvements: more flexible permissions for HDFS via multiple policies on the same file or directory
  –  Hive improvements: a Hortonworks initiative called Hive ATZ-NG allows familiar SQL/database syntax (GRANT/REVOKE) and allows more clients to be secure

Page 161: HDFS File Permissions Example

•  Authorization requirements:
  •  In a sales department, a single user, Maya (the department manager), should control all modifications to sales data.
  •  Other members of the sales department need to view the data but must not be able to modify it.
  •  Everyone else in the company must not be allowed to view the data.
•  This can be implemented via the following:

Page 162: HDFS Extended ACLs

•  The problem:
  •  It is no longer feasible for Maya to control all modifications to the file
    -  New requirement: Maya, Diane, and Clark are allowed to make modifications
    -  New requirement: a new group called executives should be able to read the sales data
  •  The current permissions model only allows permissions for one group and one user
•  HDFS extended ACLs solve this issue
•  Different permissions can now be assigned to different users and groups

[Diagram: an HDFS directory with its owner, group, and others entries, plus additional ACL entries for Group D, Group F, and User Y.]

Page 163: HDFS Extended ACLs (1 of 2)

New tools for ACL management (setfacl, getfacl):

hdfs dfs -setfacl -m group:execs:r-- /sales-data
hdfs dfs -getfacl /sales-data
# file: /sales-data
# owner: bruce
# group: sales
user::rw-
group::r--
group:execs:r--
mask::r--
other::---

How do you know if a directory has ACLs set? A "+" appears at the end of its permissions string:

hdfs dfs -ls /sales-data
Found 1 items
-rw-r-----+  3 bruce sales  0 2014-03-04 16:31 /sales-data
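The role of the mask entry above can be sketched with a toy evaluator (plain Python, illustrating POSIX-style ACL semantics rather than the HDFS implementation): a named user or group entry's effective rights are its own rights intersected with the mask.

```python
def effective(entry: str, mask: str) -> str:
    # Effective rights of a named ACL entry = entry AND mask,
    # position by position over the "rwx" string.
    return "".join(e if e == m else "-" for e, m in zip(entry, mask))

# group:execs:r-- under mask::r-- keeps its read access:
print(effective("r--", "r--"))  # r--
# If the mask were tightened to ---, the group's access is masked out
# without touching the group's own ACL entry:
print(effective("r--", "---"))  # ---
```

This is why tools report both the entry and the mask: changing the mask is a quick way to restrict all named entries at once.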

Page 164: HDFS Extended ACLs (2 of 2)

Default ACLs are inherited by new files and directories created inside the directory:

hdfs dfs -setfacl -m default:group:execs:r-x /monthly-sales-data
hdfs dfs -mkdir /monthly-sales-data/JAN
hdfs dfs -getfacl /monthly-sales-data/JAN
# file: /monthly-sales-data/JAN
# owner: bruce
# group: sales
user::rwx
group::r-x
group:execs:r-x
mask::r-x
other::---
default:user::rwx
default:group::r-x
default:group:execs:r-x
default:mask::r-x
default:other::---

Page 165: Authorization & Audit

[The security-pillar overview is repeated; the slides that follow cover Data Protection: protecting sensitive data at rest and in motion via core Hadoop and OS-level encryption.]

Page 166: Encryption

•  Data at rest
  –  Option 1: transparent encryption (built in)
  –  Option 2: OS-level encryption (out of the box)
  –  Option 3: custom development (experience doing it)
  –  Option 4: certified partners (e.g., Protegrity, Voltage Security)
•  Data on the wire
  –  All wire protocols can be encrypted by the HDP platform (2.0+); wire-level encryption enhancements are led by the Hortonworks team
•  Column level
  –  No current out-of-the-box support in Hadoop
  –  Certified partners like Voltage Security provide these capabilities

Page 167: Encrypting Data at Rest in HDFS

•  Transparent Encryption (HDFS): provides data encryption at a higher level than the OS while remaining native and transparent to Hadoop; no third-party solution required.
•  LUKS (file system/volume): Linux Unified Key Setup on-disk format allows you to encrypt partitions on Linux; a volume encryption technology. Advantage: reduced complexity, since encryption is done seamlessly by the OS.
•  Voltage Security (HDFS): encryption vendor/partner providing seamless encryption of data at the MapReduce level via Pig and Hive UDFs. Advantage: format-preserving encryption.
•  Protegrity (HDFS): provides encryption for the full stack. Advantage: key management and centralized authorization policies.

Page 168: Data In-Motion: Points of Communication

[Diagram: four points of communication with a Hadoop cluster: (1) WebHDFS (REST) from clients; (2) data transfer (TCP/IP) between clients and nodes and between nodes; (3) Hadoop client and node-to-node calls (RPC); (4) the Web UI and the MapReduce shuffle (HTTP).]

Page 169: Encrypting Data In-Motion

•  REST (WebHDFS, client to cluster; client to Knox): REST over SSL; Knox Gateway SSL; SPNEGO provides a mechanism for extending Kerberos to web applications through the standard HTTP protocol.
•  HTTP (NameNode/ResourceManager UI; MapReduce shuffle): HTTPS; encrypted MapReduce shuffle (MAPREDUCE-4117).
•  RPC (Hadoop client; client to cluster and intra-cluster): SASL; the Hadoop RPC system implements SASL, which provides different QoP levels, including encryption.
•  TCP/IP (data transfer; client to cluster and intra-cluster): encrypted data transfer protocol, available in HDP 2 (HDFS-3637); SASL support is being added to the DataTransferProtocol.

Page 170: Security Lab