Cascading at the Lyon Hadoop User Group

Cascading Hadoop User Group Lyon 2015-02-06 Arnaud Cogoluègnes - Zenika

Upload: acogoluegnes

Post on 15-Jul-2015


Page 1: Cascading at the Lyon Hadoop User Group

Cascading

Hadoop User Group Lyon

2015-02-06

Arnaud Cogoluègnes - Zenika

Page 2

Content

Cascading: what, why, how?

Hadoop basics along the way

No prerequisites

Page 3

Cascading, what is it?

Java framework

Apache License, Version 2.0

To build data-oriented applications

e.g. ETL-like applications

Page 4

Cascading key features

Java API

Mature (has run on MapReduce for years)

Testability

Reusability

Built-in features (filter, join, aggregator, etc.)

Page 5

Cascading simple flow

Fields usersFields = new Fields("name", "country", "gender");
Pipe users = new Pipe("users");
users = new Unique(users, new Fields("name"));

Input file:

jason US M
arnaud FR M
cynthia US F
mike US M
paul GB M
anna RU F
clare GB F
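Conceptually, `Unique` on the `name` field keeps one tuple per distinct name; on the sample input every name is already distinct, so the output equals the input. The plain-Java sketch below only illustrates that semantics on an in-memory list. `UniqueByKey` is a hypothetical helper, not part of Cascading (which runs the operation as part of a distributed flow), and keeping the *first* row seen for each key is an assumption of this sketch, not a documented Cascading guarantee.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory illustration of "unique on a key field" (not Cascading code).
final class UniqueByKey {

    // Keeps the first row seen for each distinct value of column keyIndex,
    // preserving the order in which keys first appear.
    static List<String[]> unique(List<String[]> rows, int keyIndex) {
        Map<String, String[]> firstByKey = new LinkedHashMap<>();
        for (String[] row : rows) {
            firstByKey.putIfAbsent(row[keyIndex], row);
        }
        return new ArrayList<>(firstByKey.values());
    }

    public static void main(String[] args) {
        List<String[]> users = Arrays.asList(
                new String[] {"jason", "US", "M"},
                new String[] {"arnaud", "FR", "M"},
                new String[] {"jason", "US", "M"}); // duplicate added for this demo only
        for (String[] row : unique(users, 0)) {
            System.out.println(String.join("\t", row));
        }
    }
}
```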

Page 6

Connecting the flow to a source and a sink

Fields usersFields = new Fields("name", "country", "gender");
Pipe users = new Pipe("users");
users = new Unique(users, new Fields("name"));
Tap usersIn = ... // file's location and structure abstraction
Tap usersOut = ...
FlowDef flowDef = FlowDef.flowDef()
    .addSource(users, usersIn)
    .addTailSink(users, usersOut);

Page 7

Taps and schemes

Fields usersFields = new Fields("name", "country", "gender");
Pipe users = new Pipe("users");
users = new Unique(users, new Fields("name"));
Tap usersIn = new Hfs(
    new TextDelimited(usersFields, false, "\t"), // structure
    "/in" // location
);
Tap usersOut = new Hfs(
    new TextDelimited(usersFields, false, "\t"), "/out"
);
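In `new TextDelimited(usersFields, false, "\t")`, the fields give the structure, `false` says the file has no header line and `"\t"` is the delimiter; the `Hfs` tap adds the location. To make the "structure" half concrete, here is a hypothetical plain-Java analog of such a scheme: the `DelimitedScheme` class is invented for illustration and is much simpler than Cascading's (no quoting, no type coercion). It turns a tab-delimited line into named fields and back.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical analog of a delimited-text scheme: declared field names plus a delimiter.
final class DelimitedScheme {
    private final List<String> fields;
    private final String delimiter;

    DelimitedScheme(List<String> fields, String delimiter) {
        this.fields = fields;
        this.delimiter = delimiter;
    }

    // Reading: one line of text becomes a record keyed by field name.
    Map<String, String> parse(String line) {
        // Pattern.quote treats the delimiter literally; limit -1 keeps trailing empty values.
        String[] values = line.split(Pattern.quote(delimiter), -1);
        if (values.length != fields.size()) {
            throw new IllegalArgumentException("expected " + fields.size() + " values in: " + line);
        }
        Map<String, String> record = new LinkedHashMap<>();
        for (int i = 0; i < values.length; i++) {
            record.put(fields.get(i), values[i]);
        }
        return record;
    }

    // Writing: the record becomes one line, values in declared field order.
    String format(Map<String, String> record) {
        StringBuilder line = new StringBuilder();
        for (int i = 0; i < fields.size(); i++) {
            if (i > 0) {
                line.append(delimiter);
            }
            line.append(record.get(fields.get(i)));
        }
        return line.toString();
    }

    public static void main(String[] args) {
        DelimitedScheme users = new DelimitedScheme(Arrays.asList("name", "country", "gender"), "\t");
        Map<String, String> record = users.parse("jason\tUS\tM");
        System.out.println(record); // {name=jason, country=US, gender=M}
        System.out.println(users.format(record).equals("jason\tUS\tM")); // true
    }
}
```

Keeping the format in a separate object is what lets the same pipe be wired to different inputs and outputs.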

Page 8

Executing a MapReduce flow

FlowDef flowDef = FlowDef.flowDef()
    .addSource(users, usersIn)
    .addTailSink(users, usersOut);
new Hadoop2MR1FlowConnector().connect(flowDef).complete();

Page 9

My first MapReduce flow

Fields usersFields = new Fields("name", "country", "gender");
Pipe users = new Pipe("users");
users = new Unique(users, new Fields("name"));
Tap usersIn = new Hfs(...);
Tap usersOut = new Hfs(...);
FlowDef flowDef = FlowDef.flowDef()
    .addSource(users, usersIn)
    .addTailSink(users, usersOut);
new Hadoop2MR1FlowConnector().connect(flowDef).complete();

Page 10

Changing the output

Fields usersFields = new Fields("name", "country", "gender");
Pipe users = new Pipe("users");
users = new Unique(users, new Fields("name"));
Tap usersIn = new Hfs(...);
Tap usersOut = new Hfs(new SequenceFile(usersFields), "/out");
FlowDef flowDef = FlowDef.flowDef()
    .addSource(users, usersIn)
    .addTailSink(users, usersOut);
new Hadoop2MR1FlowConnector().connect(flowDef).complete();

Page 11

Hadoop 2

(diagram) HDFS at the bottom, YARN on top of it, MapReduce and your app running on YARN

Page 12

Blocks, datanodes, namenode

file.csv is made of 3 blocks: B1, B2, B3 (default block size is 128 MB)

Datanodes store file blocks (here block B3 is under-replicated: 2 replicas, whereas the default replication factor is 3):

DN 1: B1, B2
DN 2: B1, B3
DN 3: B1, B2
DN 4: B2, B3

Namenode (block locations):

B1: DN 1, 2, 3
B2: DN 1, 3, 4
B3: DN 2, 4

The namenode handles file metadata and enforces replication
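The arithmetic behind this slide is simple: a file occupies ceil(size / block size) blocks, and a block is under-replicated when fewer datanodes hold it than the replication factor. The plain-Java sketch below checks both against the namenode's block map drawn on the slide. The 300 MB file size is a made-up example, and `BlockReport` is a hypothetical helper, not an HDFS API (the namenode does this tracking itself).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: block arithmetic and replication check (not an HDFS API).
final class BlockReport {
    static final long MB = 1024L * 1024L;

    // Blocks needed for a file; the last block may be only partially filled.
    static long blockCount(long fileSize, long blockSize) {
        return (fileSize + blockSize - 1) / blockSize;
    }

    // Blocks held by fewer datanodes than the target replication factor.
    static List<String> underReplicated(Map<String, List<Integer>> blockMap, int replication) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, List<Integer>> entry : blockMap.entrySet()) {
            if (entry.getValue().size() < replication) {
                result.add(entry.getKey());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Block -> datanodes holding a replica, as drawn on the slide.
        Map<String, List<Integer>> blockMap = new LinkedHashMap<>();
        blockMap.put("B1", Arrays.asList(1, 2, 3));
        blockMap.put("B2", Arrays.asList(1, 3, 4));
        blockMap.put("B3", Arrays.asList(2, 4));
        System.out.println(underReplicated(blockMap, 3));   // [B3]
        System.out.println(blockCount(300 * MB, 128 * MB)); // 3
    }
}
```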

Page 13

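MapReduce

(diagram) file.csv has 3 blocks (B1, B2, B3), each read by its own mapper.

Mappers emit key/value pairs, e.g. k1,v1 and k1,v2.

Pairs with the same key go to the same reducer (2 reducers here), which receives k1 [v1,v2].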
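The model on this slide (map emits key/value pairs, the framework groups them by key, reduce sees each key once with all its values) can be simulated in a single process. The sketch below is a toy: `MiniMapReduce.run` is a hypothetical helper, everything stays in memory, and it has none of Hadoop's distribution, partitioning or fault tolerance. The demo counts words.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;

// Toy, single-process simulation of the MapReduce model (hypothetical helper, not Hadoop's API).
final class MiniMapReduce {

    static <I, K extends Comparable<K>, V, R> Map<K, R> run(
            List<I> inputs,
            BiConsumer<I, BiConsumer<K, V>> mapper,  // emits zero or more (key, value) pairs per input
            BiFunction<K, List<V>, R> reducer) {     // called once per key with all of its values
        // Shuffle: collect every emitted value under its key; TreeMap keeps keys sorted.
        Map<K, List<V>> grouped = new TreeMap<>();
        for (I input : inputs) {
            mapper.accept(input, (key, value) -> grouped.computeIfAbsent(key, k -> new ArrayList<>()).add(value));
        }
        // Reduce: one call per distinct key.
        Map<K, R> output = new TreeMap<>();
        for (Map.Entry<K, List<V>> entry : grouped.entrySet()) {
            output.put(entry.getKey(), reducer.apply(entry.getKey(), entry.getValue()));
        }
        return output;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("a b", "b c", "b");
        Map<String, Integer> counts = run(
                lines,
                (String line, BiConsumer<String, Integer> emit) -> {
                    for (String word : line.split(" ")) {
                        emit.accept(word, 1); // map: (word, 1)
                    }
                },
                (String word, List<Integer> ones) -> ones.size()); // reduce: word [1, 1, ...] -> count
        System.out.println(counts); // {a=1, b=3, c=1}
    }
}
```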

Page 14

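Code goes to data

(diagram) Same flow as before, but each mapper runs on a datanode that stores its block: the B1 mapper on DN 1, the B2 mapper on DN 3, the B3 mapper on DN 4.

Datanode contents: DN 1: B1, B2; DN 2: B1, B3; DN 3: B1, B2; DN 4: B2, B3.

Reducers still receive k1 [v1,v2].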
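The idea is that the scheduler learns from the namenode where each block's replicas live and tries to start the mapper on one of those datanodes, so the block is read locally instead of over the network. Below is a toy version of that placement decision, assuming a simple "least-loaded replica holder" rule. `LocalityPlanner` is hypothetical, and real YARN scheduling also considers available resources and rack locality, so this rule need not reproduce the slide's exact assignment.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical data-local placement: one mapper per block, on a datanode holding that block.
final class LocalityPlanner {

    // For each block, picks the replica holder currently running the fewest mappers
    // (ties go to the first datanode listed).
    static Map<String, Integer> place(Map<String, List<Integer>> blockMap) {
        Map<Integer, Integer> load = new HashMap<>();
        Map<String, Integer> placement = new LinkedHashMap<>();
        for (Map.Entry<String, List<Integer>> entry : blockMap.entrySet()) {
            int best = -1;
            for (int node : entry.getValue()) {
                if (best == -1 || load.getOrDefault(node, 0) < load.getOrDefault(best, 0)) {
                    best = node;
                }
            }
            placement.put(entry.getKey(), best);
            load.merge(best, 1, Integer::sum);
        }
        return placement;
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> blockMap = new LinkedHashMap<>();
        blockMap.put("B1", Arrays.asList(1, 2, 3));
        blockMap.put("B2", Arrays.asList(1, 3, 4));
        blockMap.put("B3", Arrays.asList(2, 4));
        System.out.println(place(blockMap)); // {B1=1, B2=3, B3=2}
    }
}
```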

Page 15

Local MapReduce in a test

Not bad

Page 16

Local connector

Better

Page 17

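Local connector for testing

import cascading.flow.FlowDef;
import cascading.flow.local.LocalFlowConnector;
import cascading.pipe.Pipe;
import cascading.pipe.assembly.Unique;
import cascading.scheme.local.TextDelimited; // local-mode scheme, not the Hadoop one
import cascading.tap.Tap;
import cascading.tap.local.FileTap;
import cascading.tuple.Fields;

Fields usersFields = new Fields("name", "country", "gender");
Pipe users = new Pipe("users");
users = new Unique(users, new Fields("name"));
Tap usersIn = new FileTap(new TextDelimited(usersFields, false, "\t"), "in.txt");
Tap usersOut = new FileTap(
    new TextDelimited(usersFields, false, "\t"), "out.txt"
);
FlowDef flowDef = FlowDef.flowDef()
    .addSource(users, usersIn)
    .addTailSink(users, usersOut);
new LocalFlowConnector().connect(flowDef).complete();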

Page 18

Users by countries

Fields usersFields = new Fields("name", "country", "gender");
Pipe users = new Pipe("users");
users = new GroupBy(users, new Fields("country"));
users = new Every(users, new Count(new Fields("count")));
Tap usersOut = new FileTap(
    new TextDelimited(new Fields("country", "count"), false, "\t"), "/out.txt"
);

Input:

jason US M
arnaud FR M
cynthia US F
mike US M
paul GB M
anna RU F
clare GB F

Output:

FR 1
RU 1
GB 2
US 3
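`GroupBy` on `country` followed by `Every` with `Count` yields one (country, count) tuple per distinct country. For comparison, here is the same result computed in memory in plain Java. This is an illustration only: `UsersByCountry` is hypothetical, and it assumes tab-separated lines with the country in the second column, as in the flows above. The `TreeMap` keeps countries sorted.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory equivalent of GroupBy("country") + Every(Count).
final class UsersByCountry {

    // Lines are "name<TAB>country<TAB>gender"; returns country -> number of users.
    static Map<String, Long> countByCountry(List<String> lines) {
        Map<String, Long> counts = new TreeMap<>(); // sorted by country
        for (String line : lines) {
            String country = line.split("\t")[1];
            counts.merge(country, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> users = Arrays.asList(
                "jason\tUS\tM", "arnaud\tFR\tM", "cynthia\tUS\tF", "mike\tUS\tM",
                "paul\tGB\tM", "anna\tRU\tF", "clare\tGB\tF");
        for (Map.Entry<String, Long> entry : countByCountry(users).entrySet()) {
            System.out.println(entry.getKey() + "\t" + entry.getValue());
        }
    }
}
```

Unlike the Cascading flow, this only works while the whole dataset fits in one JVM; the point of `GroupBy` is to do the same grouping across a cluster.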

Page 19

Usage by countries?

users:

jason US M
arnaud FR M
cynthia US F
mike US M
paul GB M
anna RU F
clare GB F

logs:

jason login
mike newcontract
cynthia login
anna logout
jason newcontract
jason logout
...

Page 20

Join logs and users

Fields usersFields = new Fields("name", "country", "gender");
Fields logsFields = new Fields("username", "action");
Pipe users = new Pipe("users");
Pipe logs = new Pipe("logs");
Pipe logsUsers = new CoGroup(
    logs, new Fields("username"),
    users, new Fields("name")
);

Page 21

Join logs and users

Pipe logsUsers = new CoGroup(
    logs, new Fields("username"),
    users, new Fields("name")
);

logs:

jason login
mike newcontract
cynthia login
anna logout
jason newcontract
jason logout
...

users:

jason US M
arnaud FR M
cynthia US F
mike US M
paul GB M
anna RU F
clare GB F

Result:

anna RU F logout
cynthia US F login
jason US M login
jason US M newcontract
jason US M logout
mike US M newcontract

Page 22

Usage by country

logsUsers = new GroupBy(logsUsers, new Fields("country"));
logsUsers = new Every(logsUsers, new Count(new Fields("count")));

Page 23

Usage by countries

Tap usersIn = new FileTap(new TextDelimited(usersFields, false, "\t"), "users.txt");
Tap logsIn = new FileTap(new TextDelimited(logsFields, false, "\t"), "logs.txt");
Tap usageOut = new FileTap(
    new TextDelimited(new Fields("country", "count"), false, "\t"),
    "usage.txt"
);
FlowDef flowDef = FlowDef.flowDef()
    .addSource(users, usersIn)
    .addSource(logs, logsIn)
    .addTailSink(logsUsers, usageOut);

Output:

RU 1
US 5

Page 24

Repartition join

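(diagram: 3 mappers (M), 2 reducers (R))

Inputs: users (jdoe,US / pmartin,FR) and logs (jdoe,/products / pmartin,/checkout / jdoe,/account).

Mappers key every record by user name, so all jdoe records go to the same reducer: jdoe,US / jdoe,/products / jdoe,/account.

The reducer receives them in any order (e.g. jdoe,/products / jdoe,US / jdoe,/account), so it must hold all of them in memory to compute the Cartesian product:

jdoe,/products,US
jdoe,/account,US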
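A naive reduce-side join can be simulated in memory to show why the reducer buffers everything: each mapper output is tagged with its dataset, the "shuffle" routes it by key, and since values arrive in no guaranteed order, the reducer must collect both sides before it can emit the Cartesian product. `RepartitionJoin` and its `Tagged` record are hypothetical names for this sketch; it is not how Cascading or Hadoop implement the join internally.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy reduce-side (repartition) join, simulated in memory (hypothetical).
final class RepartitionJoin {

    // What a mapper emits: the join key plus a tag telling which dataset the value came from.
    static final class Tagged {
        final String key;      // user name
        final boolean isUser;  // true: value is a country; false: value is a URL from the logs
        final String value;

        Tagged(String key, boolean isUser, String value) {
            this.key = key;
            this.isUser = isUser;
            this.value = value;
        }
    }

    // Reducer for one key: values arrive in no guaranteed order, so BOTH sides are buffered
    // before the Cartesian product can be emitted.
    static List<String> reduce(String key, List<Tagged> values) {
        List<String> countries = new ArrayList<>();
        List<String> urls = new ArrayList<>();
        for (Tagged t : values) {
            (t.isUser ? countries : urls).add(t.value);
        }
        List<String> joined = new ArrayList<>();
        for (String url : urls) {
            for (String country : countries) {
                joined.add(key + "," + url + "," + country);
            }
        }
        return joined;
    }

    static List<String> join(List<Tagged> mapOutput) {
        // Shuffle: route every tagged value to its key (TreeMap stands in for partition + sort).
        Map<String, List<Tagged>> byKey = new TreeMap<>();
        for (Tagged t : mapOutput) {
            byKey.computeIfAbsent(t.key, k -> new ArrayList<>()).add(t);
        }
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, List<Tagged>> entry : byKey.entrySet()) {
            result.addAll(reduce(entry.getKey(), entry.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Tagged> mapOutput = Arrays.asList(
                new Tagged("jdoe", false, "/products"),
                new Tagged("jdoe", true, "US"),
                new Tagged("pmartin", true, "FR"),
                new Tagged("pmartin", false, "/checkout"),
                new Tagged("jdoe", false, "/account"));
        join(mapOutput).forEach(System.out::println);
    }
}
```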

Page 25

Repartition join optimization

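(diagram: same mappers and reducers as the repartition join)

Inputs: users (jdoe,US / pmartin,FR) and logs (jdoe,/products / pmartin,/checkout / jdoe,/account).

The reducer for jdoe now always receives the users record first: jdoe,US / jdoe,/products / jdoe,/account.

Only “users” records are held in memory (thanks to sorting on a dataset indicator, i.e. “secondary sort”); logs records are joined as they stream by:

jdoe,/products,US
jdoe,/account,US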
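With a dataset indicator added to the sort order, every users record for a key reaches the reducer before the logs records, so only the users side has to be buffered. The toy below simulates that ordering in memory (sort on key, then on a dataset tag, then group on the key alone) and streams log records through the reducer. `SortedRepartitionJoin` is a hypothetical name; in Hadoop itself, secondary sort is usually built from a composite key, a partitioner on the natural key and a grouping comparator.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;

// Toy repartition join with a secondary sort on a dataset tag (hypothetical, in memory).
final class SortedRepartitionJoin {

    static final int USERS = 0; // lower tag: sorts before logs for the same key
    static final int LOGS = 1;

    static final class Tagged {
        final String key;     // join key: the user name
        final int dataset;    // USERS or LOGS
        final String value;   // country for users, URL for logs

        Tagged(String key, int dataset, String value) {
            this.key = key;
            this.dataset = dataset;
            this.value = value;
        }
    }

    // One reduce call. Users records come first, so only they are buffered;
    // each log record is joined and emitted as soon as it is read.
    static void reduce(String key, Iterator<Tagged> values, List<String> out) {
        List<String> countries = new ArrayList<>();
        while (values.hasNext()) {
            Tagged t = values.next();
            if (t.dataset == USERS) {
                countries.add(t.value);
            } else {
                for (String country : countries) {
                    out.add(key + "," + t.value + "," + country);
                }
            }
        }
    }

    static List<String> join(List<Tagged> mapOutput) {
        // Simulated shuffle and secondary sort: order by (key, dataset tag). List.sort is stable.
        List<Tagged> sorted = new ArrayList<>(mapOutput);
        sorted.sort(Comparator.comparing((Tagged t) -> t.key).thenComparingInt(t -> t.dataset));
        List<String> out = new ArrayList<>();
        int start = 0;
        while (start < sorted.size()) {
            String key = sorted.get(start).key;
            int end = start;
            while (end < sorted.size() && sorted.get(end).key.equals(key)) {
                end++;
            }
            reduce(key, sorted.subList(start, end).iterator(), out); // grouping on the key alone
            start = end;
        }
        return out;
    }

    public static void main(String[] args) {
        List<Tagged> mapOutput = Arrays.asList(
                new Tagged("jdoe", LOGS, "/products"),
                new Tagged("jdoe", USERS, "US"),
                new Tagged("pmartin", USERS, "FR"),
                new Tagged("pmartin", LOGS, "/checkout"),
                new Tagged("jdoe", LOGS, "/account"));
        join(mapOutput).forEach(System.out::println);
    }
}
```

The side that is buffered should be the one with fewer values per key, here the users.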

Page 26

Optimization in Cascading CoGroup

“During co-grouping, for any given unique grouping key, all of the rightmost pipes will accumulate the current grouping values into memory so they may be iterated across for every value in the left hand side pipe. (...) There is no accumulation for the left hand side pipe, only for those to the "right".

Thus, for the pipe that has the largest number of values per unique key grouping, on average, it should be made the "left hand side" pipe (lhs).”

Page 27

Replicated/asymmetrical join

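(diagram: 3 mappers (M), no reducers)

The small dataset (jdoe,US / pmartin,FR) is loaded in the distributed cache (hence “replicated”), so every mapper has a full copy of it.

Each mapper joins its split of the logs (jdoe,/products / pmartin,/checkout / jdoe,/account) against that copy:

jdoe,/products,US
jdoe,/account,US
pmartin,/checkout,FR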
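A replicated join is a hash join performed inside each mapper: the small side becomes an in-memory lookup table, the large side is streamed, and no shuffle or reduce phase is needed. The plain-Java sketch below shows that shape; `ReplicatedJoin` is hypothetical and the distributed cache is only simulated by building the same table for every split. In Cascading, this style of join is expressed with the `HashJoin` pipe, which keeps the right-hand side in memory.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy map-side (replicated) join (hypothetical, in memory).
final class ReplicatedJoin {

    // Every mapper builds the same lookup table from the small dataset (users: name, country),
    // standing in for the copy shipped through the distributed cache.
    static Map<String, String> loadUsers(List<String[]> users) {
        Map<String, String> countryByName = new HashMap<>();
        for (String[] user : users) {
            countryByName.put(user[0], user[1]);
        }
        return countryByName;
    }

    // One mapper: stream its split of the large dataset (logs: name, URL) and probe the table.
    // Inner join: log lines without a matching user are dropped. No shuffle, no reducer.
    static List<String> mapSplit(List<String[]> logSplit, Map<String, String> countryByName) {
        List<String> out = new ArrayList<>();
        for (String[] log : logSplit) {
            String country = countryByName.get(log[0]);
            if (country != null) {
                out.add(log[0] + "," + log[1] + "," + country);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> users = loadUsers(Arrays.asList(
                new String[] {"jdoe", "US"}, new String[] {"pmartin", "FR"}));
        // One single-record split per mapper, as drawn on the slide.
        List<List<String[]>> splits = Arrays.asList(
                Collections.singletonList(new String[] {"jdoe", "/products"}),
                Collections.singletonList(new String[] {"pmartin", "/checkout"}),
                Collections.singletonList(new String[] {"jdoe", "/account"}));
        for (List<String[]> split : splits) {
            mapSplit(split, users).forEach(System.out::println);
        }
    }
}
```

The trade-off: no data moves across the network for the join, but the small side must fit in each mapper's memory.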

Page 28

Function

users = new Each(
    users,
    new Fields("country"), // argument
    new CountryFullnameFunction(new Fields("countryFullname")), // function output
    new Fields("name", "countryFullname", "gender") // what we keep
);

jason United States M
arnaud France M
cynthia United States F
mike United States M
paul United Kingdom M
anna Russia F
clare United Kingdom F

Page 29

Function (naive) implementation

public static class CountryFullnameFunction extends BaseOperation implements Function {

    public CountryFullnameFunction(Fields fields) { super(fields); }

    @Override
    public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
        String country = functionCall.getArguments().getString(0);
        Locale locale = new Locale("", country);
        Tuple tuple = new Tuple();
        tuple.add(locale.getDisplayCountry(Locale.ENGLISH));
        functionCall.getOutputCollector().add(tuple);
    }
}

Page 30

Functions

public static class CountryFullnameFunction extends BaseOperation implements Function {

    public CountryFullnameFunction(Fields fields) { super(fields); }

    @Override
    public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
        // this is executed remotely
        // tips: initialize (small) caches, re-use objects, etc.
        // functions have callbacks for this (prepare() and cleanup())
    }
}

Page 31

Re-using objects in a function

public static class CountryFullnameFunction extends BaseOperation implements Function {

    Tuple tuple = new Tuple();

    public CountryFullnameFunction(Fields fields) { super(fields); }

    @Override
    public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
        String country = functionCall.getArguments().getString(0);
        Locale locale = new Locale("", country);
        tuple.clear();
        tuple.add(locale.getDisplayCountry(Locale.ENGLISH));
        functionCall.getOutputCollector().add(tuple);
    }
}
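The tips from these slides (build small caches once, re-use output objects) can be sketched without Cascading. In the plain-Java stand-in below, `prepare()`, `operate()` and `cleanup()` only mimic the roles of Cascading's operation callbacks; the class and its methods are hypothetical. The cache avoids repeating the `Locale` lookup for countries already seen, and the output array is re-used the way the slide re-uses its `Tuple`, which means a caller must consume each result before the next call.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical plain-Java stand-in for the function above, with a prepare/operate/cleanup lifecycle.
final class CountryFullnameOperation {

    private Map<String, String> cache;              // small per-task cache, built in prepare()
    private final String[] output = new String[3];  // re-used for every row, like the re-used Tuple

    void prepare() {
        cache = new HashMap<>();
    }

    // Input: name, country code, gender. Output: name, country full name, gender.
    // The returned array is overwritten by the next call, so callers must consume or copy it first.
    String[] operate(String[] input) {
        output[0] = input[0];
        output[1] = cache.computeIfAbsent(input[1],
                code -> new Locale("", code).getDisplayCountry(Locale.ENGLISH));
        output[2] = input[2];
        return output;
    }

    int cachedCountries() {
        return cache.size();
    }

    void cleanup() {
        cache.clear();
    }

    public static void main(String[] args) {
        CountryFullnameOperation op = new CountryFullnameOperation();
        op.prepare();
        String[][] users = {{"jason", "US", "M"}, {"paul", "GB", "M"}, {"anna", "RU", "F"}, {"mike", "US", "M"}};
        for (String[] user : users) {
            System.out.println(String.join("\t", op.operate(user)));
        }
        System.out.println(op.cachedCountries()); // 3
        op.cleanup();
    }
}
```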

Page 32

Using Avro with Cascading

// Avro is splittable, supports compression,
// and has schemas
Schema schema = new Schema.Parser().parse(schemaAsJson);
AvroScheme avroScheme = new AvroScheme(schema);
Tap tap = new Hfs(avroScheme, "/out");

Page 33

Using Parquet files

// Parquet is column-oriented
// it supports splits and compression
MessageType type = ... // ~ the schema
Scheme parquetScheme = new ParquetTupleScheme(
    fields, // fields to read
    fields, // fields to write
    type.toString()
);
Tap tap = new Hfs(parquetScheme, "/out");
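Column orientation means the values of one field are stored together, so a job that needs only `country` can skip the other columns. The transposition below shows just that idea on an in-memory array (hypothetical `ColumnLayout` helper); Parquet's real format adds row groups, column chunks, pages and per-column encoding and compression on top of it.

```java
import java.util.Arrays;

// Hypothetical illustration: the same records laid out row-wise and column-wise.
final class ColumnLayout {

    // rows[r][f] -> columns[f][r]: one array per field instead of one per record.
    static String[][] toColumns(String[][] rows) {
        int fieldCount = rows[0].length;
        String[][] columns = new String[fieldCount][rows.length];
        for (int r = 0; r < rows.length; r++) {
            for (int f = 0; f < fieldCount; f++) {
                columns[f][r] = rows[r][f];
            }
        }
        return columns;
    }

    public static void main(String[] args) {
        String[][] rows = {{"jason", "US", "M"}, {"arnaud", "FR", "M"}, {"cynthia", "US", "F"}};
        String[][] columns = toColumns(rows);
        // Reading only the "country" field touches a single contiguous array.
        System.out.println(Arrays.toString(columns[1])); // [US, FR, US]
    }
}
```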

Page 34

Other dialects

Cascalog (Clojure)

Scalding (Scala)

...

Page 35

Testing with plunger

Fields usersFields = new Fields("name", "country", "gender");
Data corpus = new DataBuilder(usersFields)
    .addTuple("jason", "US", "M")
    (...)
    .addTuple("cynthia", "US", "F")
    .build();
Plunger plunger = new Plunger(); // must be created before it is used below
Pipe users = plunger.newNamedPipe("users", corpus);
users = new GroupBy(users, new Fields("country"));
users = new Every(users, new Count(new Fields("count")));
Bucket bucket = plunger.newBucket(new Fields("country", "count"), users);
Assert.assertEquals(4, bucket.result().asTupleList().size()); // JUnit: expected value first

Page 36

Flow visualization

Flow flow = new LocalFlowConnector().connect(flowDef);
flow.writeDOT("cascading-flow.dot");

digraph G {
  1 [label = "Every('users')[Count[decl:[{1}:'count']]]"];
  2 [label = "FileTap['TextDelimited[['country', 'count']]']['/tmp/junit1462026100615315705/junit2286442878134169792.tmp']"];
  3 [label = "GroupBy('users')[by:['country']]"];
  4 [label = "FileTap['TextDelimited[['name', 'country', 'gender']]']['/home/acogoluegnes/prog/hadoop-dev/./src/test/resources/cascading/users.txt']"];
  5 [label = "[head]\n2.6.2\nlocal:2.6.2:Concurrent, Inc."];
  6 [label = "[tail]"];
  1 -> 2 [label = "[{2}:'country', 'count']\n[{3}:'name', 'country', 'gender']"];
  3 -> 1 [label = "users[{1}:'country']\n[{3}:'name', 'country', 'gender']"];
  5 -> 4 [label = ""];
  2 -> 6 [label = "[{2}:'country', 'count']\n[{2}:'country', 'count']"];
  4 -> 3 [label = "[{3}:'name', 'country', 'gender']\n[{3}:'name', 'country', 'gender']"];
}

Page 37
Page 38

Typical processing

Receiving data (bulk or streams)

Processing in batch mode

Feeding real-time systems (RDBMSs, NoSQL)

Page 39

Use cases

Parsing, processing, aggregating data

“Diff-ing” 2 datasets

Joining data

Page 40

Join generated and reference data

(diagram) Generated data and reference data go into Hadoop for processing (join, transformation); the results feed reporting.

Page 41

Data handling

Raw data: archives, stored as Avro + GZIP, kept forever (HDFS)

Parsed data: view on data, stored as Parquet + Snappy, 2 years of data kept (HDFS)

Processing and insertion: transformations, processed with Cascading, results inserted into a real-time DB

Page 42

Flow handling with Spring Batch

Steps: Archiving, then three Processing steps, then Cleaning

Technologies: Java (HDFS API), Cascading, MapReduce
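The cleaning step has to enforce retention rules such as "keep 2 years of data" for the parsed data. Below is a minimal sketch of that decision, assuming (hypothetically) one directory per day named `yyyy-MM-dd`; `Retention` is an invented helper, and actually listing and deleting the directories through the HDFS API is left out.

```java
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical retention check for day partitions named like "2013-02-05".
final class Retention {

    // Returns the partitions strictly older than (today - yearsToKeep).
    static List<String> expired(List<String> partitions, LocalDate today, int yearsToKeep) {
        LocalDate oldestKept = today.minusYears(yearsToKeep);
        List<String> result = new ArrayList<>();
        for (String partition : partitions) {
            if (LocalDate.parse(partition).isBefore(oldestKept)) {
                result.add(partition);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> partitions = Arrays.asList("2013-02-05", "2013-02-06", "2015-02-05");
        System.out.println(expired(partitions, LocalDate.of(2015, 2, 6), 2)); // [2013-02-05]
    }
}
```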

Page 43

Lambda architecture

Page 44

Lambda architecture wish list

● Fault-tolerant
● Low latency
● Scalable
● General
● Extensible
● Ad hoc queries
● Minimal maintenance
● Debuggable

Page 45

Layers

Speed layer

Serving layer

Batch layer

Page 46

Batch layer

Dataset storage.

Views computation.

Page 47

Serving layer

Random access to batch views.

Page 48

Speed layer

Low latency access.

Page 49

Batch layer

Hadoop (MapReduce, HDFS).

Thrift, Cascalog (built on Cascading).

Page 50

Serving layer

ElephantDB, BerkeleyDB.

Page 51

Speed layer

Cassandra, Storm, Kafka.
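In this architecture a query combines the batch view from the serving layer (complete up to the last batch run) with the real-time view maintained by the speed layer (data received since then). Below is a minimal sketch of that merge for a per-country usage counter; the `LambdaQuery` class, its method and the numbers are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical query-time merge of a batch view and a real-time view.
final class LambdaQuery {

    static long usage(String country, Map<String, Long> batchView, Map<String, Long> realtimeView) {
        // Counts are additive, so the two partial results can simply be summed.
        return batchView.getOrDefault(country, 0L) + realtimeView.getOrDefault(country, 0L);
    }

    public static void main(String[] args) {
        Map<String, Long> batchView = new HashMap<>();    // from the last batch computation
        batchView.put("US", 5L);
        batchView.put("RU", 1L);
        Map<String, Long> realtimeView = new HashMap<>(); // events since that computation
        realtimeView.put("US", 2L);
        realtimeView.put("FR", 1L);
        System.out.println(usage("US", batchView, realtimeView)); // 7
        System.out.println(usage("FR", batchView, realtimeView)); // 1
    }
}
```

Summing works here because counts are additive; views that are not (e.g. distinct counts) need a different merge strategy.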

Page 52

Hive, Pig, Cascading

UDF: User Defined Function

Hive

+ SQL (non-standard)
+ Low learning curve
+ Extensible with UDFs
- So-so testability
- So-so reusability
- No flow control
- Logic spread across script, Java, shell
- Programming with UDFs

Pig

+ Pig Latin
+ Low learning curve
+ Extensible with UDFs
- So-so testability
- So-so reusability
- Logic spread across script, Java, shell
- Programming with UDFs

Cascading

+ Java API
+ Unit testable
+ Flow control (if, try/catch, etc.)
+ Good reusability
- Programming needed

Page 53

SQL on Cascading: Lingual

Pure Cascading underneath

ANSI/ISO standard SQL-99

JDBC driver

Query any system…

… with an available Cascading Tap

Page 54

Management & monitoring: Driven

Commercial

Analyze Cascading flows

SaaS and on-site deployment

Image: http://cascading.io/driven/

Page 55

Image: http://cascading.io/driven/

Page 56

Future: Cascading 3.0

Major rewrite

Better extensibility

MapReduce planner optimization

Tez and Storm support

Page 57

Thank you!