Cascading at the Lyon Hadoop User Group

Post on 15-Jul-2015


Cascading
Hadoop User Group Lyon

2015-02-06
Arnaud Cogoluègnes - Zenika

Content

Cascading: what, why, how?
Hadoop basics along the way

No pre-requisites to follow

Cascading, what is it?

Java framework
Apache License, Version 2.0

To build data-oriented applications,
e.g. ETL-like applications

Cascading key features

Java API
Mature (runs on MapReduce for years)

Testability
Re-usability

Built-in features (filter, join, aggregator, etc.)

Cascading simple flow

Fields usersFields = new Fields(
    "name", "country", "gender"
);

Pipe users = new Pipe("users");

users = new Unique(users, new Fields("name"));

jason US M

arnaud FR M

cynthia US F

mike US M

paul GB M

anna RU F

clare GB F

input file
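What `Unique` does to this input file can be sketched in plain Java, with no Cascading dependency; `UniqueSketch` and `uniqueByFirstField` are hypothetical names for this illustration:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class UniqueSketch {
    // Keep the first tuple seen for each value of the first field,
    // mimicking Cascading's Unique pipe on the "name" field.
    static List<String> uniqueByFirstField(List<String> lines) {
        Set<String> seen = new HashSet<>();
        List<String> out = new ArrayList<>();
        for (String line : lines) {
            String name = line.split("\t")[0];
            if (seen.add(name)) out.add(line);
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> users = Arrays.asList(
            "jason\tUS\tM", "arnaud\tFR\tM", "jason\tUS\tM");
        System.out.println(uniqueByFirstField(users).size()); // 2
    }
}
```

Since the sample file already has unique names, running Unique on it leaves all seven rows intact.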

Connecting flow to source and sink

Fields usersFields = new Fields("name","country","gender");

Pipe users = new Pipe("users");

users = new Unique(users,new Fields("name"));

Tap usersIn = ... // file’s location and structure abstraction

Tap usersOut = ...

FlowDef flowDef = FlowDef.flowDef()

.addSource(users, usersIn)

.addTailSink(users, usersOut);

Taps and schemes

Fields usersFields = new Fields("name","country","gender");

Pipe users = new Pipe("users");

users = new Unique(users,new Fields("name"));

Tap usersIn = new Hfs(

new TextDelimited(usersFields,false,"\t"), // structure

"/in" // location

);

Tap usersOut = new Hfs(

new TextDelimited(usersFields, false, "\t"),"/out"

);

Executing a MapReduce flow

FlowDef flowDef = FlowDef.flowDef()

.addSource(users, usersIn)

.addTailSink(users, usersOut);

new Hadoop2MR1FlowConnector().connect(flowDef).complete();

My first MapReduce flow

Fields usersFields = new Fields("name","country","gender");

Pipe users = new Pipe("users");

users = new Unique(users,new Fields("name"));

Tap usersIn = new Hfs(...);

Tap usersOut = new Hfs(...);

FlowDef flowDef = FlowDef.flowDef()

.addSource(users, usersIn)

.addTailSink(users, usersOut);

new Hadoop2MR1FlowConnector().connect(flowDef).complete();

Changing the output

Fields usersFields = new Fields("name","country","gender");

Pipe users = new Pipe("users");

users = new Unique(users,new Fields("name"));

Tap usersIn = new Hfs(...);

Tap usersOut = new Hfs( new SequenceFile(usersFields),"/out");

FlowDef flowDef = FlowDef.flowDef()

.addSource(users, usersIn)

.addTailSink(users, usersOut);

new Hadoop2MR1FlowConnector().connect(flowDef).complete();

Hadoop 2

HDFS

YARN

MapReduce, your app

Blocks, datanodes, namenode

file.csv is made of 3 blocks: B1, B2, B3 (default block size is 128 MB).

Datanodes store file blocks:
DN 1: B1, B2    DN 2: B1, B3
DN 3: B1, B2    DN 4: B2, B3
(here block B3 is under-replicated)

The namenode handles file metadata and enforces replication:
B1: 1, 2, 3    B2: 1, 3, 4    B3: 2, 4

MapReduce

file.csv (blocks B1, B2, B3) is read by 3 mappers, one per block.
Mapper outputs such as (k1,v1) and (k1,v2) are grouped by key and
sent to reducers; a reducer receives k1 with [v1, v2].
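The shuffle step between mappers and reducers, grouping values by key so a reducer sees k1 with [v1, v2], can be sketched in plain Java; `ShuffleSketch` and `shuffle` are hypothetical names, not Hadoop API:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ShuffleSketch {
    // Group (key, value) pairs by key, as the MapReduce shuffle does
    // between the map and reduce phases.
    static Map<String, List<String>> shuffle(List<String[]> mapOutputs) {
        Map<String, List<String>> grouped = new TreeMap<>();
        for (String[] kv : mapOutputs) {
            grouped.computeIfAbsent(kv[0], k -> new ArrayList<>()).add(kv[1]);
        }
        return grouped;
    }

    public static void main(String[] args) {
        List<String[]> mapOutputs = Arrays.asList(
            new String[]{"k1", "v1"},
            new String[]{"k1", "v2"});
        // The reducer for k1 receives [v1, v2]
        System.out.println(shuffle(mapOutputs)); // prints {k1=[v1, v2]}
    }
}
```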

Code goes to data

The same flow, seen from the cluster: each mapper runs on a datanode
that stores its block (here DN 1, DN 3, DN 4), so computation is
shipped to the data instead of data to the computation.

Local MapReduce in a test: not bad.
Local connector: better.

Local connector for testing

Fields usersFields = new Fields("name","country","gender");

Pipe users = new Pipe("users");

users = new Unique(users,new Fields("name"));

Tap usersIn = new FileTap(new TextDelimited(usersFields,false,"\t"),"in.txt");

Tap usersOut = new FileTap(

new TextDelimited(usersFields, false, "\t"), "out.txt"

);

FlowDef flowDef = FlowDef.flowDef()

.addSource(users, usersIn)

.addTailSink(users, usersOut);

new LocalFlowConnector().connect(flowDef).complete();

Users by country

Fields usersFields = new Fields("name","country","gender");

Pipe users = new Pipe("users");

users = new GroupBy(users, new Fields("country"));

users = new Every(users, new Count(new Fields("count")));

Tap usersOut = new FileTap(
    new TextDelimited(new Fields("country","count"), false, "\t"), "/out.txt"
);

Input:
jason    US    M
arnaud   FR    M
cynthia  US    F
mike     US    M
paul     GB    M
anna     RU    F
clare    GB    F

Output:
FR    1
RU    1
GB    2
US    3
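The GroupBy + Every(Count) pair above can be sketched in plain Java, with no Cascading dependency; `CountByCountry` and `countsByCountry` are hypothetical names for this illustration:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class CountByCountry {
    // GroupBy("country") followed by Count: tally tuples per value
    // of the "country" field (index 1 of each tuple).
    static Map<String, Long> countsByCountry(List<String[]> users) {
        Map<String, Long> counts = new TreeMap<>();
        for (String[] u : users) {
            counts.merge(u[1], 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String[]> users = Arrays.asList(
            new String[]{"jason","US","M"}, new String[]{"arnaud","FR","M"},
            new String[]{"cynthia","US","F"}, new String[]{"mike","US","M"},
            new String[]{"paul","GB","M"}, new String[]{"anna","RU","F"},
            new String[]{"clare","GB","F"});
        System.out.println(countsByCountry(users));
        // prints {FR=1, GB=2, RU=1, US=3}
    }
}
```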

Usage by country?

users:
jason    US    M
arnaud   FR    M
cynthia  US    F
mike     US    M
paul     GB    M
anna     RU    F
clare    GB    F

logs:
jason    login
mike     newcontract
cynthia  login
anna     logout
jason    newcontract
jason    logout
...

Join logs and users

Fields usersFields = new Fields("name","country","gender");

Fields logsFields = new Fields("username","action");

Pipe users = new Pipe("users");

Pipe logs = new Pipe("logs");

Pipe logsUsers = new CoGroup(

logs,new Fields("username"),

users,new Fields("name")

);

Join logs and users

Pipe logsUsers = new CoGroup(
    logs, new Fields("username"),
    users, new Fields("name")
);

logs:
jason    login
mike     newcontract
cynthia  login
anna     logout
jason    newcontract
jason    logout
...

users:
jason    US    M
arnaud   FR    M
cynthia  US    F
mike     US    M
paul     GB    M
anna     RU    F
clare    GB    F

Joined output:
anna     RU    F    logout
cynthia  US    F    login
jason    US    M    login
jason    US    M    newcontract
jason    US    M    logout
mike     US    M    newcontract

Usage by country

logsUsers = new GroupBy(logsUsers, new Fields("country"));

logsUsers = new Every(logsUsers,new Count(new Fields("count")));

Usage by country

Tap usersIn = new FileTap(new TextDelimited(usersFields,false,"\t"),"users.txt");

Tap logsIn = new FileTap(new TextDelimited(logsFields,false,"\t"),"logs.txt");

Tap usageOut = new FileTap(

new TextDelimited(new Fields("country","count"), false, "\t"),

"usage.txt"

);

FlowDef flowDef = FlowDef.flowDef()

.addSource(users, usersIn)

.addSource(logs,logsIn)

.addTailSink(logsUsers, usageOut);

RU    1
US    5
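The join-then-count pipeline above can be sketched in plain Java: look up each log's user, then tally by country. `UsageByCountry` and `usageByCountry` are hypothetical names, not Cascading API:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class UsageByCountry {
    // CoGroup(logs, "username", users, "name") followed by
    // GroupBy("country") + Count, sketched as a hash join plus a tally.
    static Map<String, Long> usageByCountry(Map<String, String> userCountry,
                                            List<String[]> logs) {
        Map<String, Long> counts = new TreeMap<>();
        for (String[] log : logs) {
            String country = userCountry.get(log[0]); // join on username
            if (country != null) counts.merge(country, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, String> users = new HashMap<>();
        users.put("jason", "US"); users.put("arnaud", "FR");
        users.put("cynthia", "US"); users.put("mike", "US");
        users.put("paul", "GB"); users.put("anna", "RU");
        users.put("clare", "GB");
        List<String[]> logs = Arrays.asList(
            new String[]{"jason","login"}, new String[]{"mike","newcontract"},
            new String[]{"cynthia","login"}, new String[]{"anna","logout"},
            new String[]{"jason","newcontract"}, new String[]{"jason","logout"});
        System.out.println(usageByCountry(users, logs)); // prints {RU=1, US=5}
    }
}
```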

Repartition join

Map side: mappers read both datasets
(users: jdoe,US / pmartin,FR;
logs: jdoe,/products / pmartin,/checkout / jdoe,/account)
and emit records keyed by username.

Reduce side: all records for a key land in the same reducer
(e.g. jdoe,/products; jdoe,US; jdoe,/account), which performs an
in-memory cartesian product to emit the joined records:
jdoe,/products,US
jdoe,/account,US
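What the reducer does for one key, the in-memory cartesian product, can be sketched in plain Java; `RepartitionJoinSketch` and `joinKey` are hypothetical names for this illustration:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RepartitionJoinSketch {
    // At the reducer, all records sharing a key are crossed
    // (in-memory cartesian product) to form the joined output.
    static List<String> joinKey(String key, List<String> pages,
                                List<String> countries) {
        List<String> out = new ArrayList<>();
        for (String page : pages)
            for (String country : countries)
                out.add(key + "," + page + "," + country);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(joinKey("jdoe",
            Arrays.asList("/products", "/account"),
            Arrays.asList("US")));
        // prints [jdoe,/products,US, jdoe,/account,US]
    }
}
```

Both sides of the key must fit in memory here, which is exactly the cost the next slide's optimization reduces.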

Repartition join optimization

Same flow, but records for a key arrive at the reducer with the
"users" record first (jdoe,US; jdoe,/products; jdoe,/account),
thanks to sorting on a dataset indicator, i.e. "secondary sort".
Only "users" records need to be kept in memory:
jdoe,/products,US
jdoe,/account,US
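The benefit of the secondary sort can be sketched in plain Java: since the users record arrives first for each key, the reducer only keeps one small value in memory while logs stream past. `SecondarySortJoin` and `streamJoin` are hypothetical names for this illustration:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SecondarySortJoin {
    // Records for one key arrive sorted by dataset indicator
    // ("0" = users, "1" = logs), so only the users value needs
    // to be remembered while log records stream past.
    static List<String> streamJoin(List<String[]> sortedRecords) {
        List<String> out = new ArrayList<>();
        String country = null;
        for (String[] r : sortedRecords) { // r = {indicator, key, value}
            if (r[0].equals("0")) country = r[2];
            else out.add(r[1] + "," + r[2] + "," + country);
        }
        return out;
    }

    public static void main(String[] args) {
        List<String[]> recs = Arrays.asList(
            new String[]{"0","jdoe","US"},
            new String[]{"1","jdoe","/products"},
            new String[]{"1","jdoe","/account"});
        System.out.println(streamJoin(recs));
        // prints [jdoe,/products,US, jdoe,/account,US]
    }
}
```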

Optimization in Cascading CoGroup

“During co-grouping, for any given unique grouping key, all of the rightmost pipes will accumulate the current grouping values into memory so they may be iterated across for every value in the left hand side pipe. (...) There is no accumulation for the left hand side pipe, only for those to the "right".

Thus, for the pipe that has the largest number of values per unique key grouping, on average, it should be made the "left hand side" pipe (lhs).”

Replicated/asymmetrical join

The small dataset (users: jdoe,US / pmartin,FR) is loaded into the
distributed cache (hence “replicated”) and made available to every
mapper. Each mapper joins its block of logs
(jdoe,/products / pmartin,/checkout / jdoe,/account) against it in
memory, with no reduce phase:
jdoe,/products,US
jdoe,/account,US
pmartin,/checkout,FR

Function

users = new Each(
    users,
    new Fields("country"), // argument
    new CountryFullnameFunction(new Fields("countryFullname")), // function output
    new Fields("name","countryFullname","gender") // what we keep
);

Output:
jason    United States     M
arnaud   France            M
cynthia  United States     F
mike     United States     M
paul     United Kingdom    M
anna     Russia            F
clare    United Kingdom    F

Function (naive) implementation

public static class CountryFullnameFunction extends BaseOperation implements Function {

    public CountryFullnameFunction(Fields fields) { super(fields); }

    @Override
    public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
        String country = functionCall.getArguments().getString(0);
        Locale locale = new Locale("", country);
        Tuple tuple = new Tuple();
        tuple.add(locale.getDisplayCountry(Locale.ENGLISH));
        functionCall.getOutputCollector().add(tuple);
    }
}
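The Locale lookup the function relies on is plain JDK and can be tried on its own; `CountryName` and `fullName` are hypothetical wrapper names for this illustration:

```java
import java.util.Locale;

public class CountryName {
    // Resolve an ISO country code to its English display name,
    // as the Cascading function above does per tuple.
    static String fullName(String isoCode) {
        return new Locale("", isoCode).getDisplayCountry(Locale.ENGLISH);
    }

    public static void main(String[] args) {
        System.out.println(fullName("US")); // prints United States
        System.out.println(fullName("FR")); // prints France
    }
}
```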

Functions

public static class CountryFullnameFunction extends BaseOperation implements Function {

    public CountryFullnameFunction(Fields fields) { super(fields); }

    @Override
    public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
        // this is executed remotely
        // tips: initialize (small) caches, re-use objects, etc.
        // functions have callbacks for this
    }
}

Re-using objects in a function

public static class CountryFullnameFunction extends BaseOperation implements Function {

    Tuple tuple = new Tuple();

    public CountryFullnameFunction(Fields fields) { super(fields); }

    @Override
    public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
        String country = functionCall.getArguments().getString(0);
        Locale locale = new Locale("", country);
        tuple.clear();
        tuple.add(locale.getDisplayCountry(Locale.ENGLISH));
        functionCall.getOutputCollector().add(tuple);
    }
}

Using Avro with Cascading

// Avro is splittable, supports compression,
// and has schemas
Schema schema = new Schema.Parser().parse(schemaAsJson);
AvroScheme avroScheme = new AvroScheme(schema);
Tap tap = new Hfs(avroScheme, "/out");

Using Parquet files

// Parquet is column-oriented;
// it supports splits and compression
MessageType type = ... // ~ the schema
Scheme parquetScheme = new ParquetTupleScheme(
    fields, // fields to read
    fields, // fields to write
    type.toString());

Tap tap = new Hfs(parquetScheme, "/out");

Other dialects

Cascalog (Clojure)
Scalding (Scala)

...

Testing with Plunger

Fields usersFields = new Fields("name","country","gender");

Data corpus = new DataBuilder(usersFields)
    .addTuple("jason","US","M")
    (...)
    .addTuple("cynthia", "US", "F")
    .build();

Plunger plunger = new Plunger();

Pipe users = plunger.newNamedPipe("users", corpus);

users = new GroupBy(users, new Fields("country"));

users = new Every(users, new Count(new Fields("count")));

Bucket bucket = plunger.newBucket(new Fields("country", "count"), users);

Assert.assertEquals(4, bucket.result().asTupleList().size());

Flow visualization

Flow flow = new LocalFlowConnector().connect(flowDef);
flow.writeDOT("cascading-flow.dot");

digraph G {
  1 [label = "Every('users')[Count[decl:[{1}:'count']]]"];
  2 [label = "FileTap['TextDelimited[['country', 'count']]']['/tmp/junit1462026100615315705/junit2286442878134169792.tmp']"];
  3 [label = "GroupBy('users')[by:['country']]"];
  4 [label = "FileTap['TextDelimited[['name', 'country', 'gender']]']['/home/acogoluegnes/prog/hadoop-dev/./src/test/resources/cascading/users.txt']"];
  5 [label = "[head]\n2.6.2\nlocal:2.6.2:Concurrent, Inc."];
  6 [label = "[tail]"];
  1 -> 2 [label = "[{2}:'country', 'count']\n[{3}:'name', 'country', 'gender']"];
  3 -> 1 [label = "users[{1}:'country']\n[{3}:'name', 'country', 'gender']"];
  5 -> 4 [label = ""];
  2 -> 6 [label = "[{2}:'country', 'count']\n[{2}:'country', 'count']"];
  4 -> 3 [label = "[{3}:'name', 'country', 'gender']\n[{3}:'name', 'country', 'gender']"];
}

Typical processing

Receiving data (bulk or streams)
Processing in batch mode
Feeding real-time systems (RDBMS, NoSQL)

Use cases

Parsing, processing, aggregating data
“Diff-ing” 2 datasets
Joining data

Join generated and reference data

Generated data and reference data are joined and transformed by
processing in Hadoop, and the result feeds reporting.

Data handling

Raw data → parsed data → processing and insertion.

Archives: Avro + GZIP, kept forever.
View on the data: Parquet + Snappy, keeping 2 years of data.
Transformations: processing with Cascading.
Everything lives in HDFS; results are pushed to a real-time DB.

Flow handling with Spring Batch

Archiving, several processing steps, then cleaning, orchestrated in
Java on top of the HDFS API, Cascading, and MapReduce.

Lambda architecture

Lambda architecture wish list

● Fault-tolerant
● Low latency
● Scalable
● General
● Extensible
● Ad hoc queries
● Minimal maintenance
● Debuggable

Layers

Batch layer: dataset storage, views computation.
Hadoop (MapReduce, HDFS), Thrift, Cascalog (i.e. Cascading).

Serving layer: random access to batch views.
ElephantDB, BerkeleyDB.

Speed layer: low-latency access.
Cassandra, Storm, Kafka.

Hive, Pig, Cascading

UDF: User Defined Function

Hive
+ SQL (non-standard); low learning curve; extensible with UDFs
- So-so testability; so-so reusability; no flow control; logic spread across script, Java, shell; programming with UDFs

Pig
+ Pig Latin; low learning curve; extensible with UDFs
- So-so testability; so-so reusability; logic spread across script, Java, shell; programming with UDFs

Cascading
+ Java API; unit testable; flow control (if, try/catch, etc.); good re-usability
- Programming needed

SQL on Cascading: Lingual

Pure Cascading underneath
ANSI/ISO standard SQL-99

JDBC driver
Query any system…
… with an available Cascading Tap

Management & monitoring: Driven

Commercial
Analyzes Cascading flows

SaaS and on-site deployment

Image: http://cascading.io/driven/


Future: Cascading 3.0

Major rewrite
Better extensibility

MapReduce planner optimization
Tez and Storm support

Thank you!
