Cascading at the Lyon Hadoop User Group
TRANSCRIPT
Cascading
Hadoop User Group Lyon
2015-02-06
Arnaud Cogoluègnes - Zenika
Content
Cascading: what, why, how?
Hadoop basics along the way
No prerequisites needed to follow along
Cascading, what is it?
Java framework
Apache License, Version 2.0
To build data-oriented applications
e.g. ETL-like applications
Cascading key features
Java API
Mature (has run on MapReduce for years)
Testability
Re-usability
Built-in features (filter, join, aggregator, etc.)
Cascading simple flow
Fields usersFields = new Fields(
    "name", "country", "gender"
);
Pipe users = new Pipe("users");
users = new Unique(users, new Fields("name"));
jason US M
arnaud FR M
cynthia US F
mike US M
paul GB M
anna RU F
clare GB F
input file
Connecting flow to source and sink
Fields usersFields = new Fields("name", "country", "gender");
Pipe users = new Pipe("users");
users = new Unique(users, new Fields("name"));
Tap usersIn = ... // abstraction of the file's location and structure
Tap usersOut = ...
FlowDef flowDef = FlowDef.flowDef()
    .addSource(users, usersIn)
    .addTailSink(users, usersOut);
Taps and schemes
Fields usersFields = new Fields("name", "country", "gender");
Pipe users = new Pipe("users");
users = new Unique(users, new Fields("name"));
Tap usersIn = new Hfs(
    new TextDelimited(usersFields, false, "\t"), // structure
    "/in" // location
);
Tap usersOut = new Hfs(
    new TextDelimited(usersFields, false, "\t"), "/out"
);
Executing a MapReduce flow
FlowDef flowDef = FlowDef.flowDef()
    .addSource(users, usersIn)
    .addTailSink(users, usersOut);
new Hadoop2MR1FlowConnector().connect(flowDef).complete();
My first MapReduce flow
Fields usersFields = new Fields("name", "country", "gender");
Pipe users = new Pipe("users");
users = new Unique(users, new Fields("name"));
Tap usersIn = new Hfs(...);
Tap usersOut = new Hfs(...);
FlowDef flowDef = FlowDef.flowDef()
    .addSource(users, usersIn)
    .addTailSink(users, usersOut);
new Hadoop2MR1FlowConnector().connect(flowDef).complete();
Changing the output
Fields usersFields = new Fields("name", "country", "gender");
Pipe users = new Pipe("users");
users = new Unique(users, new Fields("name"));
Tap usersIn = new Hfs(...);
Tap usersOut = new Hfs(new SequenceFile(usersFields), "/out");
FlowDef flowDef = FlowDef.flowDef()
    .addSource(users, usersIn)
    .addTailSink(users, usersOut);
new Hadoop2MR1FlowConnector().connect(flowDef).complete();
Hadoop 2
HDFS
YARN
MapReduce
Your app
Blocks, datanodes, namenode
file.csv is made of 3 blocks, B1, B2, B3 (default block size is 128 MB)
Datanodes store the file's blocks:
DN 1: B1, B2    DN 2: B1, B3
DN 3: B1, B2    DN 4: B2, B3
Block locations: B1: DN 1, 2, 3; B2: DN 1, 3, 4; B3: DN 2, 4
(here block B3 is under-replicated)
The namenode handles file metadata and enforces replication
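The namenode's replication bookkeeping can be sketched in plain Java, with no Hadoop dependency (class and method names here are illustrative, not Hadoop API): given the block-to-datanode locations above, find the blocks stored on fewer datanodes than the replication factor.

```java
import java.util.*;

public class ReplicationCheck {
    // Returns the blocks whose replica count is below the target factor.
    static List<String> underReplicated(Map<String, List<Integer>> blockLocations,
                                        int replicationFactor) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, List<Integer>> e : blockLocations.entrySet()) {
            if (e.getValue().size() < replicationFactor) {
                result.add(e.getKey());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Block locations from the slide: B1 on DN 1, 2, 3; B2 on DN 1, 3, 4; B3 on DN 2, 4
        Map<String, List<Integer>> locations = new LinkedHashMap<>();
        locations.put("B1", Arrays.asList(1, 2, 3));
        locations.put("B2", Arrays.asList(1, 3, 4));
        locations.put("B3", Arrays.asList(2, 4));
        System.out.println(underReplicated(locations, 3)); // B3 has only 2 replicas
    }
}
```

A real namenode would react by scheduling a new replica of B3 on another datanode; the check above is just the detection step.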
MapReduce
file.csv is made of blocks B1, B2, B3; one mapper processes each block.
Map output is shuffled so that all values for a key reach the same reducer:
k1,v1 and k1,v2 are delivered to one reducer as k1 [v1,v2].
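The grouping the shuffle performs between map and reduce can be sketched in plain Java (no Hadoop API involved; the names are illustrative):

```java
import java.util.*;

public class ShuffleSketch {
    // Groups (key, value) pairs by key, as the shuffle does between map and reduce.
    static Map<String, List<String>> shuffle(List<String[]> mapOutput) {
        Map<String, List<String>> grouped = new TreeMap<>();
        for (String[] kv : mapOutput) { // kv = {key, value}
            grouped.computeIfAbsent(kv[0], k -> new ArrayList<>()).add(kv[1]);
        }
        return grouped;
    }

    public static void main(String[] args) {
        // The two map output pairs from the slide
        List<String[]> mapOutput = Arrays.asList(
                new String[]{"k1", "v1"},
                new String[]{"k1", "v2"});
        System.out.println(shuffle(mapOutput)); // {k1=[v1, v2]}
    }
}
```

In real MapReduce this grouping is distributed: each reducer receives only the keys hashed to its partition, already sorted.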
Code goes to data
Same flow as above, but note where the mappers run: the mapper for B1 runs on
DN 1, the mapper for B2 on DN 3, and the mapper for B3 on DN 4, i.e. on
datanodes that already hold those blocks. The code goes to the data, not the
other way around.
Local MapReduce in a test
Not bad
Local connector
Better
Local connector for testing
Fields usersFields = new Fields("name", "country", "gender");
Pipe users = new Pipe("users");
users = new Unique(users, new Fields("name"));
Tap usersIn = new FileTap(new TextDelimited(usersFields, false, "\t"), "in.txt");
Tap usersOut = new FileTap(
    new TextDelimited(usersFields, false, "\t"), "out.txt"
);
FlowDef flowDef = FlowDef.flowDef()
    .addSource(users, usersIn)
    .addTailSink(users, usersOut);
new LocalFlowConnector().connect(flowDef).complete();
Users by country
Fields usersFields = new Fields("name", "country", "gender");
Pipe users = new Pipe("users");
users = new GroupBy(users, new Fields("country"));
users = new Every(users, new Count(new Fields("count")));
Tap usersOut = new FileTap(
    new TextDelimited(new Fields("country", "count"), false, "\t"), "/out.txt"
);
jason US M
arnaud FR M
cynthia US F
mike US M
paul GB M
anna RU F
clare GB F
FR 1
RU 1
GB 2
US 3
Usage by country?
users:
jason US M
arnaud FR M
cynthia US F
mike US M
paul GB M
anna RU F
clare GB F
logs:
jason login
mike newcontract
cynthia login
anna logout
jason newcontract
jason logout
...
Join logs and users
Fields usersFields = new Fields("name", "country", "gender");
Fields logsFields = new Fields("username", "action");
Pipe users = new Pipe("users");
Pipe logs = new Pipe("logs");
Pipe logsUsers = new CoGroup(
    logs, new Fields("username"),
    users, new Fields("name")
);
Join logs and users
Pipe logsUsers = new CoGroup(
    logs, new Fields("username"),
    users, new Fields("name")
);
logs:
jason login
mike newcontract
cynthia login
anna logout
jason newcontract
jason logout
...
users:
jason US M
arnaud FR M
cynthia US F
mike US M
paul GB M
anna RU F
clare GB F
joined:
anna RU F logout
cynthia US F login
jason US M login
jason US M newcontract
jason US M logout
mike US M newcontract
Usage by country
logsUsers = new GroupBy(logsUsers, new Fields("country"));
logsUsers = new Every(logsUsers, new Count(new Fields("count")));
Usage by country
Tap usersIn = new FileTap(new TextDelimited(usersFields, false, "\t"), "users.txt");
Tap logsIn = new FileTap(new TextDelimited(logsFields, false, "\t"), "logs.txt");
Tap usageOut = new FileTap(
    new TextDelimited(new Fields("country", "count"), false, "\t"),
    "usage.txt"
);
FlowDef flowDef = FlowDef.flowDef()
    .addSource(users, usersIn)
    .addSource(logs, logsIn)
    .addTailSink(logsUsers, usageOut);
RU 1
US 5
Repartition join
Mappers read both datasets (users: jdoe,US and pmartin,FR; logs: jdoe,/products,
pmartin,/checkout and jdoe,/account) and emit each record keyed by the join key.
The reducer for jdoe receives jdoe,/products, jdoe,US and jdoe,/account in no
particular order, so it must buffer them all and build the output records
jdoe,/products,US and jdoe,/account,US with an in-memory cartesian product.
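The reducer side of a repartition join can be sketched in plain Java (illustrative names, no Hadoop dependency): all records for one key have been buffered, then the two sides are combined.

```java
import java.util.*;

public class RepartitionJoin {
    // At the reducer, all records for one key (from both datasets) are buffered
    // and joined with an in-memory cartesian product.
    static List<String> reduceJoin(String key, List<String> userValues,
                                   List<String> logValues) {
        List<String> joined = new ArrayList<>();
        for (String page : logValues) {
            for (String country : userValues) {
                joined.add(key + "," + page + "," + country);
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        // What the "jdoe" reducer has buffered, as in the slide
        List<String> users = Arrays.asList("US");
        List<String> logs = Arrays.asList("/products", "/account");
        System.out.println(reduceJoin("jdoe", users, logs));
        // [jdoe,/products,US, jdoe,/account,US]
    }
}
```

The weakness is visible: with many records per key, both buffers must fit in the reducer's memory.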
Repartition join optimization
Same datasets, but map output is sorted so that the reducer for jdoe receives
jdoe,US first, then jdoe,/products and jdoe,/account, and emits
jdoe,/products,US and jdoe,/account,US.
Only the "users" records are kept in memory (thanks to dataset-indicator
sorting, i.e. "secondary sort").
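The secondary-sort variant can be sketched the same way: each value is tagged with a dataset indicator (0 = users, 1 = logs) and sorted so the small user record arrives first (illustrative plain Java, not the Hadoop or Cascading API):

```java
import java.util.*;

public class SecondarySortJoin {
    // With a secondary sort on a dataset indicator (0 = users, 1 = logs),
    // the reducer sees the user record first and only keeps that one record
    // in memory while streaming the log records.
    static List<String> reduceJoin(String key, List<String[]> sortedValues) {
        List<String> joined = new ArrayList<>();
        String country = null;
        for (String[] tagged : sortedValues) { // tagged = {indicator, value}
            if (tagged[0].equals("0")) {
                country = tagged[1]; // the single small record kept in memory
            } else {
                joined.add(key + "," + tagged[1] + "," + country);
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        List<String[]> values = new ArrayList<>(Arrays.asList(
                new String[]{"1", "/products"},
                new String[]{"0", "US"},
                new String[]{"1", "/account"}));
        // The framework sorts by (key, indicator) before calling the reducer
        values.sort(Comparator.comparing(v -> v[0]));
        System.out.println(reduceJoin("jdoe", values));
        // [jdoe,/products,US, jdoe,/account,US]
    }
}
```

The sort replaces the buffering: memory use no longer depends on the number of log records per key.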
Optimization in Cascading CoGroup
"During co-grouping, for any given unique grouping key, all of the rightmost pipes will accumulate the current grouping values into memory so they may be iterated across for every value in the left hand side pipe. (...) There is no accumulation for the left hand side pipe, only for those to the 'right'.
Thus, for the pipe that has the largest number of values per unique key grouping, on average, it should be made the 'left hand side' pipe (lhs)."
Replicated/asymmetrical join
The small "users" dataset (jdoe,US and pmartin,FR) is loaded in the distributed
cache (hence "replicated") and given to every mapper. Each mapper streams its
block of logs (jdoe,/products, pmartin,/checkout, jdoe,/account) and joins it
locally against the in-memory users, producing jdoe,/products,US,
jdoe,/account,US and pmartin,/checkout,FR, with no reduce phase.
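The map-side (replicated) join can be sketched in plain Java, with illustrative names and no Hadoop dependency: the replicated small dataset becomes a hash map, and each mapper probes it while streaming its block.

```java
import java.util.*;

public class ReplicatedJoin {
    // The small "users" dataset is replicated to every mapper (distributed
    // cache) and held in a hash map; the mapper streams its log block and
    // joins locally, so no shuffle or reduce phase is needed.
    static List<String> mapJoin(Map<String, String> users, List<String[]> logBlock) {
        List<String> joined = new ArrayList<>();
        for (String[] log : logBlock) { // log = {name, page}
            String country = users.get(log[0]);
            if (country != null) { // inner join: drop unmatched log records
                joined.add(log[0] + "," + log[1] + "," + country);
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        Map<String, String> users = new HashMap<>();
        users.put("jdoe", "US");
        users.put("pmartin", "FR");
        List<String[]> block = Arrays.asList(
                new String[]{"jdoe", "/products"},
                new String[]{"pmartin", "/checkout"});
        System.out.println(mapJoin(users, block));
        // [jdoe,/products,US, pmartin,/checkout,FR]
    }
}
```

This only works when one side is small enough to fit in every mapper's memory, which is exactly the asymmetry the slide's name refers to.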
Function
users = new Each(
    users,
    new Fields("country"), // argument
    new CountryFullnameFunction(new Fields("countryFullname")), // function output
    new Fields("name", "countryFullname", "gender") // what we keep
);
jason United States M
arnaud France M
cynthia United States F
mike United States M
paul United Kingdom M
anna Russia F
clare United Kingdom F
Function (naive) implementation
public static class CountryFullnameFunction extends BaseOperation implements Function {

    public CountryFullnameFunction(Fields fields) { super(fields); }

    @Override
    public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
        String country = functionCall.getArguments().getString(0);
        Locale locale = new Locale("", country);
        Tuple tuple = new Tuple();
        tuple.add(locale.getDisplayCountry(Locale.ENGLISH));
        functionCall.getOutputCollector().add(tuple);
    }
}
Functions
public static class CountryFullnameFunction extends BaseOperation implements Function {

    public CountryFullnameFunction(Fields fields) { super(fields); }

    @Override
    public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
        // this is executed remotely
        // tips: initialize (small) caches, re-use objects, etc.
        // functions have callbacks for this
    }
}
Re-using objects in a function
public static class CountryFullnameFunction extends BaseOperation implements Function {

    Tuple tuple = new Tuple();

    public CountryFullnameFunction(Fields fields) { super(fields); }

    @Override
    public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
        String country = functionCall.getArguments().getString(0);
        Locale locale = new Locale("", country);
        tuple.clear();
        tuple.add(locale.getDisplayCountry(Locale.ENGLISH));
        functionCall.getOutputCollector().add(tuple);
    }
}
Using Avro with Cascading
// Avro is splittable, supports compression,
// and has schemas
Schema schema = new Schema.Parser().parse(schemaAsJson);
AvroScheme avroScheme = new AvroScheme(schema);
Tap tap = new Hfs(avroScheme, "/out");
Using Parquet files
// Parquet is column-oriented;
// it supports splits and compression
MessageType type = ... // ~ the schema
Scheme parquetScheme = new ParquetTupleScheme(
    fields, // fields to read
    fields, // fields to write
    type.toString());
Tap tap = new Hfs(parquetScheme, "/out");
Other dialects
Cascalog (Clojure)
Scalding (Scala)
...
Testing with Plunger
Fields usersFields = new Fields("name", "country", "gender");
Data corpus = new DataBuilder(usersFields)
    .addTuple("jason", "US", "M")
    (...)
    .addTuple("cynthia", "US", "F")
    .build();
Plunger plunger = new Plunger();
Pipe users = plunger.newNamedPipe("users", corpus);
users = new GroupBy(users, new Fields("country"));
users = new Every(users, new Count(new Fields("count")));
Bucket bucket = plunger.newBucket(new Fields("country", "count"), users);
Assert.assertEquals(bucket.result().asTupleList().size(), 4);
Flow visualization
Flow flow = new LocalFlowConnector().connect(flowDef);
flow.writeDOT("cascading-flow.dot");
digraph G {
  1 [label = "Every('users')[Count[decl:[{1}:'count']]]"];
  2 [label = "FileTap['TextDelimited[['country', 'count']]']['/tmp/junit1462026100615315705/junit2286442878134169792.tmp']"];
  3 [label = "GroupBy('users')[by:['country']]"];
  4 [label = "FileTap['TextDelimited[['name', 'country', 'gender']]']['/home/acogoluegnes/prog/hadoop-dev/./src/test/resources/cascading/users.txt']"];
  5 [label = "[head]\n2.6.2\nlocal:2.6.2:Concurrent, Inc."];
  6 [label = "[tail]"];
  1 -> 2 [label = "[{2}:'country', 'count']\n[{3}:'name', 'country', 'gender']"];
  3 -> 1 [label = "users[{1}:'country']\n[{3}:'name', 'country', 'gender']"];
  5 -> 4 [label = ""];
  2 -> 6 [label = "[{2}:'country', 'count']\n[{2}:'country', 'count']"];
  4 -> 3 [label = "[{3}:'name', 'country', 'gender']\n[{3}:'name', 'country', 'gender']"];
}
Typical processing
Receiving data (bulk or streams)
Processing in batch mode
Feeding real-time systems (RDBMSs, NoSQL)
Use cases
Parsing, processing, aggregating data
"Diff-ing" 2 datasets
Joining data
Join generated and reference data
Generated data and reference data are both fed to Hadoop, which processes them
(join, transformation) and produces data for reporting.
Data handling
Raw data, archives: Avro, GZIP; kept forever, on HDFS
Parsed data, view on the data: Parquet, Snappy; 2 years of data kept, on HDFS
Processing and insertion, transformations: processing with Cascading, insertion
into a real-time DB
Flow handling with Spring Batch
Archiving -> processing (several processing steps) -> cleaning
Built on the Java HDFS API, Cascading, and MapReduce
Lambda architecture
Lambda architecture wish list
● Fault-tolerant
● Low latency
● Scalable
● General
● Extensible
● Ad hoc queries
● Minimal maintenance
● Debuggable
Layers
Batch layer: dataset storage, views computation.
Hadoop (MapReduce, HDFS); Thrift, Cascalog (i.e. Cascading).
Serving layer: random access to batch views.
ElephantDB, BerkeleyDB.
Speed layer: low-latency access.
Cassandra, Storm, Kafka.
Hive, Pig, Cascading
UDF: User-Defined Function
Hive
+ SQL (non-standard)
+ Low learning curve
+ Extensible with UDFs
- So-so testability
- So-so reusability
- No flow control
- Spread-out logic (script, Java, shell)
- Programming needed for UDFs
Pig
+ Pig Latin
+ Low learning curve
+ Extensible with UDFs
- So-so testability
- So-so reusability
- Spread-out logic (script, Java, shell)
- Programming needed for UDFs
Cascading
+ Java API
+ Unit testable
+ Flow control (if, try/catch, etc.)
+ Good re-usability
- Programming needed
SQL on Cascading: Lingual
Pure Cascading underneath
ANSI/ISO standard SQL-99
JDBC driver
Query any system…
… with an available Cascading Tap
Management & monitoring: Driven
Commercial
Analyzes Cascading flows
SaaS and on-site deployment
Image: http://cascading.io/driven/
Future: Cascading 3.0
Major rewrite
Better extensibility
MapReduce planner optimizations
Tez and Storm support
Thank you!