codingserbia2014-javavspig

Ozren Gulan && Dusan Zamurovic @ CodingSerbia ~$ echo $title
MapReduce: Java vs Apache Pig

Uploaded by dusan-zamurovic on 27-Nov-2014


DESCRIPTION

Slides from my joint session with Ozren at codingserbia.com 2014: "MapReduce: Java vs Pig".

TRANSCRIPT

Page 1: CodingSerbia2014-JavaVSPig

Ozren Gulan && Dusan Zamurovic @ CodingSerbia ~$ echo $title
MapReduce: Java vs Apache Pig

Page 2: CodingSerbia2014-JavaVSPig

$ echo $dusan
dusan zamurovic, m.sc. software engineering
IT consultant @ codecentric
Twitter: @ezamur

$ echo $ozren
ozren gulan, m.sc. software engineering
IT consultant @ codecentric

Page 3: CodingSerbia2014-JavaVSPig

$ about
- BigData
- mr, M/R, MapReduce
- hadoop
- pig
- showcase, primer, example

Page 4: CodingSerbia2014-JavaVSPig

$ idea behind analysis

- BigData/Hadoop popularity++
- Batch processing: Apache Pig or Java MapReduce?

$ goal of the analysis

- Comparison of Apache Pig and Java MapReduce
- A clear picture of when to use which one

Page 5: CodingSerbia2014-JavaVSPig

$ rpm -qpR pig-vs-java-0.0.1-snapshot.x86.rpm

$ hadoop version
Hadoop 2.3.0-cdh5.1.0

$ pig --version
Apache Pig version 0.12.0-cdh5.1.0

Page 6: CodingSerbia2014-JavaVSPig

$ showcase
For each customer group:
- top 5 products bought
- average number of product views per visit
- average number of purchases per visit
- average purchase amount
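To pin down what the four metrics mean, here is a minimal plain-Java sketch that computes them for the sessions of one customer group. The `Item`, `Session` and `GroupMetrics` classes are hypothetical stand-ins for the JSON input record, not the talk's code. The definitions are assumed to follow the Java reducer shown later: every product occurrence in a session counts as one view, views and purchases are averaged over all sessions of the group, and the average purchase amount is the total amount spent divided by the number of purchased items.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical in-memory model of one product entry of an input record.
final class Item {
    final long id;
    final boolean bought;
    final double price;

    Item(long id, boolean bought, double price) {
        this.id = id;
        this.bought = bought;
        this.price = price;
    }
}

// Hypothetical in-memory model of one session (one JSON record).
final class Session {
    final List<Item> products;

    Session(Item... products) {
        this.products = Arrays.asList(products);
    }
}

// Computes the showcase metrics for the sessions of a single customer group.
final class GroupMetrics {
    final List<Long> topProducts;          // ids of the most bought products, most bought first
    final double averageViewsPerVisit;
    final double averagePurchasesPerVisit;
    final double averagePurchase;          // total amount spent / number of purchased items

    GroupMetrics(List<Session> sessions, int topN) {
        Map<Long, Long> purchasesByProduct = new HashMap<>();
        long views = 0;
        long purchases = 0;
        double amount = 0;
        for (Session s : sessions) {
            for (Item p : s.products) {
                views++;                   // every occurrence counts as a view
                if (p.bought) {
                    purchases++;
                    amount += p.price;
                    purchasesByProduct.merge(p.id, 1L, Long::sum);
                }
            }
        }
        int n = sessions.size();
        averageViewsPerVisit = n == 0 ? 0 : (double) views / n;
        averagePurchasesPerVisit = n == 0 ? 0 : (double) purchases / n;
        averagePurchase = purchases == 0 ? 0 : amount / purchases;
        topProducts = purchasesByProduct.entrySet().stream()
                .sorted(Map.Entry.<Long, Long>comparingByValue(Comparator.reverseOrder()))
                .limit(topN)
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }
}
```

Both the Pig script and the Java job below compute these values at scale; the sketch only fixes the arithmetic on a handful of sessions.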

Page 7: CodingSerbia2014-JavaVSPig

$ cat input_record.json
{
    "sessionId": 1,
    "customerCategoryId": 5,
    "customerCategoryDescription": "desc",
    "products": [
        {
            "id": 1222,
            "name": "product",
            "category": "product category",
            "bought": true,
            "price": 57990.0
        },
        ...
    ]
}

Page 8: CodingSerbia2014-JavaVSPig

$ analysis criteria

- analysis:readability_maintainability
- analysis:performance
- analysis:dev_tools (IDE, testing, debugging, etc.)

VS

Page 9: CodingSerbia2014-JavaVSPig

$ analysis:readability_maintainability -pig

- Pig Latin is SQL-like
- Simple to understand
- Limited number of predefined functions

Page 10: CodingSerbia2014-JavaVSPig

products = LOAD '/example/products/customer_records_map_reduce_input.json' USING JsonLoader('...');

categories = LOAD '/example/dimension/customer_categories.db' AS (categoryId:int, age:chararray, gender:chararray);

joinedRecords = JOIN categories BY categoryId, products BY customerCategoryId;

-- for each group of users, show top five selling products
flattenedProducts = FOREACH joinedRecords GENERATE
    sessionId AS sessionId,
    categories::categoryId AS categoryId,
    categories::age AS age,
    categories::gender AS gender,
    FLATTEN(products.(id, name, category, bought, price)) AS (id, name, category, bought, price);

boughtProducts = FILTER flattenedProducts BY bought == true;

groupedProducts = GROUP boughtProducts BY (categoryId, age, gender, id, name);

countedProducts = FOREACH groupedProducts GENERATE
    FLATTEN(group), COUNT(boughtProducts) AS counter;

groupTopFiveProducts = GROUP countedProducts BY (categoryId, age, gender);

resultTopFiveProducts = FOREACH groupTopFiveProducts {
    sorted = ORDER countedProducts BY counter DESC;
    topProducts = LIMIT sorted 5;
    GENERATE FLATTEN(topProducts);
};

STORE resultTopFiveProducts INTO '/example/results/topTenProducts'
    USING JsonStorage();

-- average number of seen products
averageSeenProducts = FOREACH joinedRecords GENERATE
    categories::categoryId AS categoryId,
    categories::age AS age,
    categories::gender AS gender,
    COUNT(products) AS counter;

grpAverageSeenProducts = GROUP averageSeenProducts BY (categoryId, age, gender);

averageCountedProducts = FOREACH grpAverageSeenProducts GENERATE
    FLATTEN(group),
    AVG(averageSeenProducts.counter) AS averageSeen;

-- average number of bought products per visit
groupedBySession = GROUP boughtProducts BY (sessionId, categoryId, age, gender);

averageBoughtProducts = FOREACH groupedBySession GENERATE
    FLATTEN(group),
    COUNT(boughtProducts.name) AS counter;

groupedAverageBoughtProducts = GROUP averageBoughtProducts BY (categoryId, age, gender);

resultAverageBoughtProducts = FOREACH groupedAverageBoughtProducts GENERATE
    FLATTEN(group),
    AVG(averageBoughtProducts.counter) AS averageBought;

-- average purchase amount
groupedAveragePrice = GROUP boughtProducts BY (categoryId, age, gender);

averagePrice = FOREACH groupedAveragePrice GENERATE
    FLATTEN(group),
    AVG(boughtProducts.price) AS averagePaid;

joinedFinal = JOIN averageCountedProducts BY (categoryId, age, gender),
    resultAverageBoughtProducts BY (categoryId, age, gender),
    averagePrice BY (categoryId, age, gender);

finalResult = FOREACH joinedFinal GENERATE
    averageCountedProducts::categoryId AS categoryId,
    averageCountedProducts::age AS age,
    averageCountedProducts::gender AS gender,
    averageCountedProducts::averageSeen AS averageSeen,
    resultAverageBoughtProducts::averageBought AS averageBought,
    averagePrice::averagePaid AS averagePaid;

STORE finalResult INTO '/example/results/productsStatistic' USING JsonStorage();

Total lines of code: 77
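The `GROUP ... COUNT` step and the nested `ORDER ... DESC; LIMIT 5` block of the script can be mirrored in plain Java to see exactly what they compute. This is a minimal sketch, assuming a hypothetical `Purchase` class that holds only the group key (a category id) and the product name of one `boughtProducts` row; it counts purchases per (group, product) and keeps the n most frequent products per group.

```java
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical row of boughtProducts: customer group plus the bought product.
final class Purchase {
    final int categoryId;
    final String product;

    Purchase(int categoryId, String product) {
        this.categoryId = categoryId;
        this.product = product;
    }
}

final class TopProductsPerGroup {
    static Map<Integer, List<String>> topN(List<Purchase> boughtProducts, int n) {
        // GROUP boughtProducts BY (categoryId, product); COUNT(...) AS counter
        Map<Integer, Map<String, Long>> counted = boughtProducts.stream()
                .collect(Collectors.groupingBy(p -> p.categoryId, TreeMap::new,
                        Collectors.groupingBy(p -> p.product, Collectors.counting())));

        // GROUP countedProducts BY categoryId; nested ORDER BY counter DESC; LIMIT n
        Map<Integer, List<String>> result = new LinkedHashMap<>();
        counted.forEach((categoryId, counts) -> result.put(categoryId,
                counts.entrySet().stream()
                        .sorted(Map.Entry.<String, Long>comparingByValue(Comparator.reverseOrder())
                                .thenComparing(Map.Entry.<String, Long>comparingByKey()))
                        .limit(n)
                        .map(Map.Entry::getKey)
                        .collect(Collectors.toList())));
        return result;
    }
}
```

Ties are broken by product name here only to make the sketch deterministic.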

Page 13: CodingSerbia2014-JavaVSPig

$ analysis:readability_maintainability -java

package com.codingserbia.dto;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

import com.codingserbia.writable.ProductWritable;

// Accumulates views and purchases of all sessions that belong to one customer category.
public class CustomerCategoryProductBag {

    public LongWritable customerCategoryId;

    public Text customerCategoryDescription;

    private Map<LongWritable, ProductWritable> products;

    private Map<LongWritable, Long> purchasesByProduct;

    private Map<LongWritable, Long> viewsByProduct;

    private int numberOfViews = 0;

    private int numberOfSessions = 0;

    private int numberOfPurchases = 0;

    public CustomerCategoryProductBag() {
        customerCategoryId = new LongWritable(0L);
        customerCategoryDescription = new Text();
        products = new HashMap<LongWritable, ProductWritable>();
        purchasesByProduct = new HashMap<LongWritable, Long>();
        viewsByProduct = new HashMap<LongWritable, Long>();
    }

    public ProductWritable getProductWritable(LongWritable id) {
        return products.get(id);
    }

    public boolean contains(LongWritable productId) {
        return getProductWritable(productId) != null;
    }

    // First occurrence of a product in this category: one view, possibly one purchase.
    public void add(ProductWritable product) {
        products.put(product.id, product);
        viewsByProduct.put(product.id, 1L);
        numberOfViews++;

        if (product.bought.get()) {
            purchasesByProduct.put(product.id, 1L);
            numberOfPurchases++;
        }
    }

    // Every further occurrence of a product that is already in the bag.
    public void processOccurance(ProductWritable product) {
        if (product.bought.get()) {
            Long productNumberOfPurchases = purchasesByProduct.get(product.id);
            if (productNumberOfPurchases == null) {
                productNumberOfPurchases = 1L;
            } else {
                productNumberOfPurchases++;
            }
            purchasesByProduct.put(product.id, productNumberOfPurchases);
            numberOfPurchases++;
        }

        Long productNumberOfViews = viewsByProduct.get(product.id);
        productNumberOfViews++;
        viewsByProduct.put(product.id, productNumberOfViews);
        numberOfViews++;
    }

    public List<ProductWritable> getTopProductsBought(int numberOfProducts) {
        List<ProductWritable> topProducts = new ArrayList<ProductWritable>();

        List<Entry<LongWritable, Long>> entries =
                new ArrayList<Entry<LongWritable, Long>>(purchasesByProduct.entrySet());
        // Sort by number of purchases, descending. Long.compare avoids the
        // overflow that subtracting the two intValue() results could cause.
        Collections.sort(entries, new Comparator<Entry<LongWritable, Long>>() {
            @Override
            public int compare(Entry<LongWritable, Long> entry1, Entry<LongWritable, Long> entry2) {
                return Long.compare(entry2.getValue(), entry1.getValue());
            }
        });

        int resultSize = Math.min(numberOfProducts, entries.size());
        for (Entry<LongWritable, Long> e : entries.subList(0, resultSize)) {
            topProducts.add(products.get(e.getKey()));
        }

        return topProducts;
    }

    public void increaseNumberOfSessions() {
        numberOfSessions++;
    }

    public float calculateAverageNumberOfViews() {
        if (numberOfSessions == 0) {
            return 0f;
        }
        return (float) numberOfViews / (float) numberOfSessions;
    }

    public float calculateAverageNumberOfPurchases() {
        if (numberOfSessions == 0) {
            return 0f;
        }
        return (float) numberOfPurchases / (float) numberOfSessions;
    }

    // Total amount spent divided by the number of purchased items.
    public float calculateAveragePurchase() {
        float amountInTotal = 0f;
        for (Entry<LongWritable, Long> e : purchasesByProduct.entrySet()) {
            amountInTotal += products.get(e.getKey()).price.get() * e.getValue();
        }
        return numberOfPurchases != 0 ? amountInTotal / numberOfPurchases : 0f;
    }

}
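`getTopProductsBought` sorts every purchased product of a category only to keep five of them. A common alternative, sketched here under the assumption that only keys and purchase counts matter, is a bounded min-heap (`java.util.PriorityQueue`) of size n, which needs O(m log n) work for m products instead of O(m log m). The `TopN` class and `topByCount` method are hypothetical names; this is not the code used in the talk.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

final class TopN {
    // Returns the keys with the n largest counts, largest first.
    static <K> List<K> topByCount(Map<K, Long> counts, int n) {
        if (n <= 0) {
            return new ArrayList<>();
        }
        Comparator<Map.Entry<K, Long>> byCount = Map.Entry.comparingByValue();
        // Min-heap: the root is always the weakest entry among the current top n.
        PriorityQueue<Map.Entry<K, Long>> heap = new PriorityQueue<>(n, byCount);
        for (Map.Entry<K, Long> e : counts.entrySet()) {
            if (heap.size() < n) {
                heap.add(e);
            } else if (e.getValue() > heap.peek().getValue()) {
                heap.poll();                       // drop the current weakest entry
                heap.add(e);
            }
        }
        List<K> result = new ArrayList<>();
        while (!heap.isEmpty()) {
            result.add(heap.poll().getKey());      // ascending by count
        }
        Collections.reverse(result);               // largest count first
        return result;
    }
}
```

For five out of a few thousand products per category the difference is small; the heap mainly avoids copying and sorting the whole entry list.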

package com.codingserbia.dto;

import java.util.ArrayList;
import java.util.List;

import com.fasterxml.jackson.annotation.JsonProperty;

public class CustomerSession {

    @JsonProperty
    public long sessionId;

    @JsonProperty
    public long customerCategoryId;

    @JsonProperty(required = false)
    public String customerCategoryDescription;

    @JsonProperty
    public List<Product> products;

    public CustomerSession() {
        products = new ArrayList<Product>();
    }
}

package com.codingserbia.dto;

import java.util.ArrayList;
import java.util.List;

import com.fasterxml.jackson.annotation.JsonProperty;

public class CustomerSessionOutput {

    @JsonProperty
    public long customerCategoryId;

    @JsonProperty
    public String customerCategoryDescription;

    @JsonProperty
    public List<ProductOutput> products;

    @JsonProperty
    public float averageNumberOfViews;

    @JsonProperty
    public float averageNumberOfPurchases;

    @JsonProperty
    public float averagePurchase;

    public CustomerSessionOutput() {
        products = new ArrayList<ProductOutput>();
    }
}

package com.codingserbia.dto;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;

public class Product {

    @JsonProperty
    public long id;

    @JsonProperty
    public String name;

    @JsonProperty
    public String category;

    @JsonProperty
    public boolean bought;

    @JsonProperty
    public double price;

    @JsonIgnore
    public int numberOfPurschases;

    public Product() {
    }

    public Product(long id, String name, String category, boolean bought, double price) {
        super();
        this.id = id;
        this.name = name;
        this.category = category;
        this.bought = bought;
        this.price = price;
    }

    public Product(Product aProduct) {
        super();
        this.id = aProduct.id;
        this.name = aProduct.name;
        this.category = aProduct.category;
        this.bought = aProduct.bought;
        this.price = aProduct.price;
    }

}

package com.codingserbia.dto;

import com.fasterxml.jackson.annotation.JsonProperty;

public class ProductOutput {

    @JsonProperty
    public long id;

    @JsonProperty
    public String name;

    public ProductOutput() {
        name = "";
    }

    public ProductOutput(long id, String name) {
        super();
        this.id = id;
        this.name = name;
    }
}

Page 20: CodingSerbia2014-JavaVSPig

package com.codingserbia.writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

public class CustomerCategoryWritable implements Writable {

    public LongWritable categoryId;
    public Text description;
    public Text gender;

    public CustomerCategoryWritable() {
        super();
        categoryId = new LongWritable();
        description = new Text();
        gender = new Text();
    }

    public CustomerCategoryWritable(long id, String description, String gender) {
        super();
        categoryId = new LongWritable(id);
        this.description = new Text(description);
        this.gender = new Text(gender);
    }

    @Override
    public void readFields(DataInput input) throws IOException {
        categoryId.readFields(input);
        description.readFields(input);
        gender.readFields(input);
    }

    @Override
    public void write(DataOutput output) throws IOException {
        categoryId.write(output);
        description.write(output);
        gender.write(output);
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((categoryId == null) ? 0 : categoryId.hashCode());
        result = prime * result + ((description == null) ? 0 : description.hashCode());
        result = prime * result + ((gender == null) ? 0 : gender.hashCode());
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null) {
            return false;
        }
        if (getClass() != obj.getClass()) {
            return false;
        }
        CustomerCategoryWritable other = (CustomerCategoryWritable) obj;
        if (categoryId == null) {
            if (other.categoryId != null) {
                return false;
            }
        } else if (!categoryId.equals(other.categoryId)) {
            return false;
        }
        if (description == null) {
            if (other.description != null) {
                return false;
            }
        } else if (!description.equals(other.description)) {
            return false;
        }
        if (gender == null) {
            if (other.gender != null) {
                return false;
            }
        } else if (!gender.equals(other.gender)) {
            return false;
        }
        return true;
    }

    @Override
    public String toString() {
        return "CustomerCategoryWritable [categoryId=" + categoryId + ", description=" + description
                + ", gender=" + gender + "]";
    }

}
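The `write`/`readFields` pair is the whole serialization contract of a `Writable`: fields must be read back in exactly the order they were written, into an instance created with the no-arg constructor. The round trip can be tried without Hadoop, using `java.io.DataOutputStream` and `DataInputStream`, which implement the same `DataOutput`/`DataInput` interfaces Hadoop passes in. `CategoryRecord` is a hypothetical stand-in for `CustomerCategoryWritable`; it uses `writeLong` and `writeUTF` instead of `LongWritable` and `Text`, so its string encoding differs from Hadoop's `Text`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical Writable-like class: same write/readFields shape, no Hadoop dependency.
final class CategoryRecord {
    long categoryId;
    String description = "";
    String gender = "";

    void write(DataOutput out) throws IOException {
        out.writeLong(categoryId);
        out.writeUTF(description);
        out.writeUTF(gender);
    }

    // Must read the fields in exactly the order write() emitted them.
    void readFields(DataInput in) throws IOException {
        categoryId = in.readLong();
        description = in.readUTF();
        gender = in.readUTF();
    }

    static byte[] serialize(CategoryRecord r) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            r.write(out);
        }
        return bytes.toByteArray();
    }

    static CategoryRecord deserialize(byte[] data) throws IOException {
        CategoryRecord r = new CategoryRecord();   // start from an empty instance, as Hadoop does
        r.readFields(new DataInputStream(new ByteArrayInputStream(data)));
        return r;
    }
}
```

Swapping two lines in `readFields` still compiles but silently corrupts every record, which is why the hand-written `Writable` classes deserve round-trip unit tests.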

package com.codingserbia.writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

import com.codingserbia.dto.CustomerSession;

public class CustomerSessionWritable implements Writable {

    public LongWritable categoryId;

    public Text categoryDescription;

    public ProductArrayWritable products;

    public CustomerSessionWritable() {
        super();
        categoryId = new LongWritable();
        categoryDescription = new Text();
        products = new ProductArrayWritable(ProductWritable.class);
    }

    public CustomerSessionWritable(String categoryDesc, CustomerSession json) {
        super();
        categoryId = new LongWritable(json.customerCategoryId);
        categoryDescription = new Text(categoryDesc);

        products = new ProductArrayWritable(ProductWritable.class);
        ProductWritable[] pwArray = new ProductWritable[json.products.size()];
        for (int i = 0; i < json.products.size(); i++) {
            ProductWritable pw = new ProductWritable(json.products.get(i));
            pwArray[i] = pw;
        }
        products.set(pwArray);
    }

    @Override
    public void readFields(DataInput input) throws IOException {
        categoryId.readFields(input);
        categoryDescription.readFields(input);
        products.readFields(input);
    }

    @Override
    public void write(DataOutput output) throws IOException {
        categoryId.write(output);
        categoryDescription.write(output);
        products.write(output);
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((categoryDescription == null) ? 0 : categoryDescription.hashCode());
        result = prime * result + ((categoryId == null) ? 0 : categoryId.hashCode());
        result = prime * result + ((products == null) ? 0 : products.hashCode());
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null) {
            return false;
        }
        if (getClass() != obj.getClass()) {
            return false;
        }
        CustomerSessionWritable other = (CustomerSessionWritable) obj;
        if (categoryDescription == null) {
            if (other.categoryDescription != null) {
                return false;
            }
        } else if (!categoryDescription.equals(other.categoryDescription)) {
            return false;
        }
        if (categoryId == null) {
            if (other.categoryId != null) {
                return false;
            }
        } else if (!categoryId.equals(other.categoryId)) {
            return false;
        }
        if (products == null) {
            if (other.products != null) {
                return false;
            }
        } else if (!products.equals(other.products)) {
            return false;
        }
        return true;
    }

    @Override
    public String toString() {
        return "CustomerSessionWritable [categoryId=" + categoryId + ", categoryDescription="
                + categoryDescription.toString() + ", products=[" + products.toString() + "]]";
    }
}

Page 25: CodingSerbia2014-JavaVSPig

package com.codingserbia.writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Writable;

public class CustomerSessionWritablesGroupedByCustomerCategoryId implements Writable {

    public LongWritable customerCategoryId;

    public MapWritable sessions;

    public CustomerSessionWritablesGroupedByCustomerCategoryId() {
        super();
        customerCategoryId = new LongWritable();
        sessions = new MapWritable();
    }

    public CustomerSessionWritablesGroupedByCustomerCategoryId(Long categoryId) {
        super();
        customerCategoryId = new LongWritable(categoryId);
        sessions = new MapWritable();
    }

    @Override
    public void readFields(DataInput input) throws IOException {
        customerCategoryId.readFields(input);
        sessions.readFields(input);
    }

    @Override
    public void write(DataOutput output) throws IOException {
        customerCategoryId.write(output);
        sessions.write(output);
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((customerCategoryId == null) ? 0 : customerCategoryId.hashCode());
        result = prime * result + ((sessions == null) ? 0 : sessions.hashCode());
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null) {
            return false;
        }
        if (getClass() != obj.getClass()) {
            return false;
        }
        CustomerSessionWritablesGroupedByCustomerCategoryId other =
                (CustomerSessionWritablesGroupedByCustomerCategoryId) obj;
        if (customerCategoryId == null) {
            if (other.customerCategoryId != null) {
                return false;
            }
        } else if (!customerCategoryId.equals(other.customerCategoryId)) {
            return false;
        }
        if (sessions == null) {
            if (other.sessions != null) {
                return false;
            }
        } else if (!sessions.equals(other.sessions)) {
            return false;
        }
        return true;
    }

}

Page 27: CodingSerbia2014-JavaVSPig

package com.codingserbia.writable;

import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.Writable;

public class ProductArrayWritable extends ArrayWritable {

    public ProductArrayWritable(Class<? extends Writable> valueClass) {
        super(valueClass);
    }

    @Override
    public String toString() {
        StringBuilder value = new StringBuilder("ProductArrayWritable [");
        for (Writable pw : get()) {
            value.append(pw.toString());
        }
        value.append("]");
        return value.toString();
    }

}

package com.codingserbia.writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

import com.codingserbia.dto.Product;

public class ProductWritable implements Writable {

    public LongWritable id;

    public Text name;

    public Text category;

    public BooleanWritable bought;

    public DoubleWritable price;

    public ProductWritable() {
        super();
        id = new LongWritable();
        name = new Text();
        category = new Text();
        bought = new BooleanWritable();
        price = new DoubleWritable();
    }

    public ProductWritable(Product json) {
        super();
        id = new LongWritable(json.id);
        name = new Text(json.name);
        category = new Text(json.category);
        bought = new BooleanWritable(json.bought);
        price = new DoubleWritable(json.price);
    }

    @Override
    public void readFields(DataInput input) throws IOException {
        id.readFields(input);
        name.readFields(input);
        category.readFields(input);
        bought.readFields(input);
        price.readFields(input);
    }

    @Override
    public void write(DataOutput output) throws IOException {
        id.write(output);
        name.write(output);
        category.write(output);
        bought.write(output);
        price.write(output);
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((bought == null) ? 0 : bought.hashCode());
        result = prime * result + ((category == null) ? 0 : category.hashCode());
        result = prime * result + ((id == null) ? 0 : id.hashCode());
        result = prime * result + ((name == null) ? 0 : name.hashCode());
        result = prime * result + ((price == null) ? 0 : price.hashCode());
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null) {
            return false;
        }
        if (getClass() != obj.getClass()) {
            return false;
        }
        ProductWritable other = (ProductWritable) obj;
        if (bought == null) {
            if (other.bought != null) {
                return false;
            }
        } else if (!bought.equals(other.bought)) {
            return false;
        }
        if (category == null) {
            if (other.category != null) {
                return false;
            }
        } else if (!category.equals(other.category)) {
            return false;
        }
        if (id == null) {
            if (other.id != null) {
                return false;
            }
        } else if (!id.equals(other.id)) {
            return false;
        }
        if (name == null) {
            if (other.name != null) {
                return false;
            }
        } else if (!name.equals(other.name)) {
            return false;
        }
        if (price == null) {
            if (other.price != null) {
                return false;
            }
        } else if (!price.equals(other.price)) {
            return false;
        }
        return true;
    }

    @Override
    public String toString() {
        return "ProductWritable [id=" + id + ", name=" + name + ", category=" + category
                + ", bought=" + bought + ", price=" + price + "]";
    }

}

Page 31: CodingSerbia2014-JavaVSPig

package com.codingserbia;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.codingserbia.writable.CustomerSessionWritable;

public class CodingSerbiaMapReduce extends Configured implements Tool {

    private static final Logger LOGGER = LoggerFactory.getLogger(CodingSerbiaMapReduce.class);

    protected String customerCategoriesFilePath = "";
    protected String inputPath = "";
    protected String outputPath = "";

    public CodingSerbiaMapReduce(Configuration config) {
        super();
        setConf(config);
    }

    public static void main(String[] args) throws Exception {
        // Local Windows development only: where Hadoop looks for its native helper binaries.
        System.setProperty("hadoop.home.dir", "C:/work/tools/hadoop-common-2.2.0-bin-master");

        Configuration config = new Configuration();
        CodingSerbiaMapReduce mr = new CodingSerbiaMapReduce(config);
        // Propagate the job status (0 = success) as the process exit code.
        System.exit(ToolRunner.run(config, mr, args));
    }

    protected boolean validateAndParseInput(String[] args) {
        if (args == null || args.length < 3) {
            LOGGER.error("Three arguments are required: path to customer categories file, path to input data and path to desired output directory.");
            return false;
        }

        if (args.length > 3) {
            LOGGER.error("Too many arguments. Only three arguments are required: path to customer categories file, path to input data and path to desired output directory.");
            return false;
        }

        customerCategoriesFilePath = args[0];
        LOGGER.info("Customer categories file path: " + customerCategoriesFilePath);
        // Read back by the mapper in setup().
        getConf().set("customer.categories.file.path", customerCategoriesFilePath);

        inputPath = args[1];
        LOGGER.info("Input path: " + inputPath);

        outputPath = args[2];
        LOGGER.info("Output path: " + outputPath);

        LOGGER.info("Input validation succeeded");
        return true;
    }

    @Override
    public int run(String[] args) throws Exception {
        if (!validateAndParseInput(args)) {
            throw new RuntimeException("Input validation failed.");
        }

        Job job = Job.getInstance(getConf());
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(CustomerSessionWritable.class);

        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        job.setMapperClass(CustomerRecordsMapper.class);
        job.setReducerClass(CustomerRecordsReducer.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));

        job.setJarByClass(CodingSerbiaMapReduce.class);

        return job.waitForCompletion(true) ? 0 : 1;
    }

}

package com.codingserbia;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.codingserbia.dto.CustomerSession;
import com.codingserbia.writable.CustomerCategoryWritable;
import com.codingserbia.writable.CustomerSessionWritable;
import com.fasterxml.jackson.databind.ObjectMapper;

public class CustomerRecordsMapper extends Mapper<LongWritable, Text, LongWritable, CustomerSessionWritable> {

    private static final Logger LOGGER = LoggerFactory.getLogger(CustomerRecordsMapper.class);

    private Map<LongWritable, CustomerCategoryWritable> groupedCategories;

    private ObjectMapper jsonMapper;

    public CustomerRecordsMapper() {
        super();
        groupedCategories = new HashMap<LongWritable, CustomerCategoryWritable>();
        jsonMapper = new ObjectMapper();
    }

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        super.setup(context);

        String customerCategoriesPath = context.getConfiguration().get("customer.categories.file.path");
        loadCustomerCategories(customerCategoriesPath, context);
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        try {
            CustomerSession jsonObj = jsonMapper.readValue(value.toString(), CustomerSession.class);
            LongWritable categoryId = new LongWritable(jsonObj.customerCategoryId);

            // Map-side join with the category table loaded in setup():
            // sessions of unknown categories are dropped.
            CustomerCategoryWritable category = groupedCategories.get(categoryId);
            if (category != null) {
                CustomerSessionWritable session = new CustomerSessionWritable(category.description.toString(), jsonObj);
                context.write(categoryId, session);
            }
        } catch (Exception e) {
            // Malformed records are logged and skipped.
            LOGGER.error(e.getMessage(), e);
        }
    }

    // Each line of the categories file: categoryId <TAB> age <TAB> gender
    private void loadCustomerCategories(String filePath, Context context) throws IOException {
        Path path = new Path(filePath);

        FileSystem fs = path.getFileSystem(context.getConfiguration());
        try (BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(path)))) {
            String line;
            while ((line = br.readLine()) != null) {
                String[] columns = line.split("\t");
                long categoryId = Long.parseLong(columns[0]);
                String description = columns[1] + " " + columns[2];
                String gender = columns[2];
                CustomerCategoryWritable writable = new CustomerCategoryWritable(categoryId, description, gender);
                groupedCategories.put(writable.categoryId, writable);
            }
        }
    }

}
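Because `loadCustomerCategories` reads from HDFS, its parsing is awkward to unit test. One option, sketched here as an assumption rather than the talk's code, is to move the line parsing into a plain method that accepts any `java.io.Reader`. The hypothetical `CategoryFileParser` keeps the same column layout (category id, age range, gender, tab-separated) and the same description rule (`age + " " + gender`); unlike the mapper, it also skips blank lines.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: parses "categoryId<TAB>age<TAB>gender" lines like the mapper's setup().
final class CategoryFileParser {
    static Map<Long, String> parseDescriptions(Reader source) throws IOException {
        Map<Long, String> descriptions = new HashMap<>();
        try (BufferedReader br = new BufferedReader(source)) {
            String line;
            while ((line = br.readLine()) != null) {
                if (line.isEmpty()) {
                    continue;                          // tolerate blank lines
                }
                String[] columns = line.split("\t");
                long categoryId = Long.parseLong(columns[0].trim());
                descriptions.put(categoryId, columns[1] + " " + columns[2]);
            }
        }
        return descriptions;
    }
}
```

The mapper would then only open the HDFS stream and hand it to the parser, for example wrapped in an `InputStreamReader` as it does today.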

package com.codingserbia;

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Reducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.codingserbia.dto.CustomerCategoryProductBag;
import com.codingserbia.dto.CustomerSessionOutput;
import com.codingserbia.dto.ProductOutput;
import com.codingserbia.writable.CustomerSessionWritable;
import com.codingserbia.writable.ProductWritable;
import com.fasterxml.jackson.databind.ObjectMapper;

public class CustomerRecordsReducer extends Reducer<LongWritable, CustomerSessionWritable, NullWritable, Text> {

    private static final Logger LOGGER = LoggerFactory.getLogger(CustomerRecordsReducer.class);

    private ObjectMapper jsonMapper;

    public CustomerRecordsReducer() {
        super();
        jsonMapper = new ObjectMapper();
    }

    @Override
    protected void reduce(LongWritable key, Iterable<CustomerSessionWritable> values, Context context)
            throws IOException, InterruptedException {
        // reduce() is called once per customer category, so a fresh bag per call is enough.
        // Hadoop reuses the key and value instances between calls and iterations,
        // so anything kept beyond a single iteration is copied.
        CustomerCategoryProductBag aBag = new CustomerCategoryProductBag();
        aBag.customerCategoryId = new LongWritable(key.get());

        for (CustomerSessionWritable value : values) {
            aBag.increaseNumberOfSessions();
            if (aBag.customerCategoryDescription.getLength() == 0) {
                aBag.customerCategoryDescription = new Text(value.categoryDescription);
            }

            Writable[] productWritables = value.products.get();

            for (Writable writable : productWritables) {
                ProductWritable product = (ProductWritable) writable;

                if (!aBag.contains(product.id)) {
                    aBag.add(product);
                } else {
                    aBag.processOccurance(product);
                }
            }
        }

        int numberOfTopBoughtProducts = 5;
        List<ProductWritable> topProducts = aBag.getTopProductsBought(numberOfTopBoughtProducts);

        CustomerSessionOutput outputJsonObj = new CustomerSessionOutput();
        outputJsonObj.customerCategoryId = key.get();
        outputJsonObj.customerCategoryDescription = aBag.customerCategoryDescription.toString();
        outputJsonObj.averageNumberOfViews = aBag.calculateAverageNumberOfViews();
        outputJsonObj.averageNumberOfPurchases = aBag.calculateAverageNumberOfPurchases();
        outputJsonObj.averagePurchase = aBag.calculateAveragePurchase();
        for (ProductWritable pw : topProducts) {
            outputJsonObj.products.add(new ProductOutput(pw.id.get(), pw.name.toString()));
        }

        String json = jsonMapper.writeValueAsString(outputJsonObj);
        context.write(NullWritable.get(), new Text(json));
        LOGGER.info(json);
    }
}
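The framework hands `reduce()` the same key and value instances over and over, refilling them through `readFields()`, so a reference kept across iterations ends up pointing at whatever record was read last. That is why values such as the category description are worth copying (for example with `new Text(...)`) before they are stored. The effect can be reproduced without Hadoop; in this sketch, `ReusingIterable` is a hypothetical imitation of that behavior, with a mutable `StringBuilder` playing the role of a `Text` value.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical imitation of Hadoop's value iterable: one mutable object, refilled per record.
final class ReusingIterable implements Iterable<StringBuilder> {
    private final List<String> records;
    private final StringBuilder reused = new StringBuilder();

    ReusingIterable(List<String> records) {
        this.records = records;
    }

    @Override
    public Iterator<StringBuilder> iterator() {
        final Iterator<String> source = records.iterator();
        return new Iterator<StringBuilder>() {
            @Override
            public boolean hasNext() {
                return source.hasNext();
            }

            @Override
            public StringBuilder next() {
                reused.setLength(0);                 // like readFields(): overwrite in place
                reused.append(source.next());
                return reused;
            }
        };
    }

    // Keeps references: every kept element is the same object, holding the last record.
    static List<String> keepReferences(Iterable<StringBuilder> values) {
        List<StringBuilder> kept = new ArrayList<>();
        for (StringBuilder v : values) {
            kept.add(v);
        }
        return toStrings(kept);
    }

    // Copies each value before keeping it, as new Text(value) does for a Text.
    static List<String> keepCopies(Iterable<StringBuilder> values) {
        List<StringBuilder> kept = new ArrayList<>();
        for (StringBuilder v : values) {
            kept.add(new StringBuilder(v));
        }
        return toStrings(kept);
    }

    private static List<String> toStrings(List<StringBuilder> builders) {
        List<String> result = new ArrayList<>();
        for (StringBuilder b : builders) {
            result.add(b.toString());
        }
        return result;
    }
}
```

With the records `a`, `b`, `c`, keeping references yields `c, c, c`, while keeping copies yields `a, b, c`.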

Page 38: CodingSerbia2014-JavaVSPig

Total lines of code: ~ 1K

Page 39: CodingSerbia2014-JavaVSPig

public class CodingSerbiaMapReduce extends Configured implements Tool {
    ...
    Configuration config = new Configuration();
    CodingSerbiaMapReduce mr = new CodingSerbiaMapReduce(config);
    ToolRunner.run(config, mr, args);
    ...
    Job job = Job.getInstance(getConf());
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);

    job.setMapOutputKeyClass(LongWritable.class);
    job.setMapOutputValueClass(CustomerSessionWritable.class);

    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(Text.class);

    job.setMapperClass(CustomerRecordsMapper.class);
    job.setReducerClass(CustomerRecordsReducer.class);

    FileInputFormat.setInputPaths(job, new Path(inputPath));
    FileOutputFormat.setOutputPath(job, new Path(outputPath));

    return job.waitForCompletion(true) ? 0 : 1;
}

Page 40: CodingSerbia2014-JavaVSPig

public class CustomerRecordsMapper extends Mapper<LongWritable, Text, LongWritable, CustomerSessionWritable> {

    protected void map(LongWritable key, Text value, Context context) ... {
        ...
        CustomerSession jsonObj = jsonMapper.readValue(value.toString(), CustomerSession.class);
        LongWritable categoryId = new LongWritable(jsonObj.customerCategoryId);

        CustomerCategoryWritable category = groupedCategories.get(categoryId);
        if (category != null) {
            CustomerSessionWritable session = new CustomerSessionWritable(..., jsonObj);
            context.write(categoryId, session);
        }
    }
}

Page 41: CodingSerbia2014-JavaVSPig

public class CustomerRecordsReducer extends Reducer<LongWritable, CustomerSessionWritable, NullWritable, Text> {

    protected void reduce(LongWritable key, Iterable<CustomerSessionWritable> values, Context context) ... {

        for (CustomerSessionWritable value : values) {
            // increase the number of customer visits
            for (Writable writable : value.products.get()) {
                // process an occurrence of a product
                // track whether it was bought or only viewed, etc.
            }
        }

        // calculate the average values we need
        // order bought/viewed products by number of purchases/views

        context.write(NullWritable.get(), new Text(jsonMapper.writeValueAsString(outputJsonObj)));
    }
}
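The reducer's comments leave the aggregation itself elided. A minimal plain-Java sketch of one reduce call for a single customer category might look like the following. It rests on assumptions the slides do not confirm: every product entry in a session counts as a view, entries with `bought == true` count as purchases, `averagePurchase` is the mean price of bought products, and the top 5 products are ranked by purchase count. The `Product`, `Session` and `Summary` records are hypothetical simplifications, not the talk's Writable classes (Java 16+ for records).

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class ReduceSketch {

    // Hypothetical, simplified product occurrence inside one session.
    record Product(long id, boolean bought, double price) {}

    // Hypothetical, simplified session (one customer visit).
    record Session(List<Product> products) {}

    // The values that end up in the output JSON for one category.
    record Summary(List<Long> topProductIds, double averageNumberOfViews,
                   double averageNumberOfPurchases, double averagePurchase) {}

    /** Aggregates all sessions of one customer category, i.e. the values of one reduce() call. */
    static Summary reduce(List<Session> sessions) {
        int visits = 0;
        int views = 0;
        int purchases = 0;
        double spent = 0.0;
        Map<Long, Integer> purchasesPerProduct = new HashMap<>();

        for (Session session : sessions) {
            visits++; // one session == one visit
            for (Product p : session.products()) {
                views++; // assumption: every listed product counts as a view
                if (p.bought()) {
                    purchases++;
                    spent += p.price();
                    purchasesPerProduct.merge(p.id(), 1, Integer::sum);
                }
            }
        }

        // Top 5 products by number of purchases; ties broken by id so the order is deterministic.
        List<Long> top5 = purchasesPerProduct.entrySet().stream()
                .sorted(Map.Entry.<Long, Integer>comparingByValue().reversed()
                        .thenComparing(Map.Entry.<Long, Integer>comparingByKey()))
                .limit(5)
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());

        return new Summary(
                top5,
                visits == 0 ? 0.0 : (double) views / visits,
                visits == 0 ? 0.0 : (double) purchases / visits,
                purchases == 0 ? 0.0 : spent / purchases); // assumption: mean price per bought product
    }

    public static void main(String[] args) {
        // Made-up sample data: two visits from the same customer category.
        List<Session> sessions = List.of(
                new Session(List.of(new Product(1, true, 100.0), new Product(2, false, 50.0))),
                new Session(List.of(new Product(1, true, 100.0), new Product(3, true, 20.0))));
        System.out.println(reduce(sessions));
    }
}
```

Because a single pass over the values collects every counter, the reducer never has to buffer the sessions of a category in memory, which matters when `Iterable<CustomerSessionWritable>` is large.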

Page 42: CodingSerbia2014-JavaVSPig

$ cat output_record.json
{
  "customerCategoryId": 4,
  "customerCategoryDescription": "30-40 male",
  "products": [
    { "id": 1229, "name": "Candy ugradna rerna FS 635 AQUA" },
    ...
  ],
  "averageNumberOfViews": 2.3333333,
  "averageNumberOfPurchases": 1.3333334,
  "averagePurchase": 44750.0
}

Page 43: CodingSerbia2014-JavaVSPig

...

Page 44: CodingSerbia2014-JavaVSPig
Page 45: CodingSerbia2014-JavaVSPig

$ analysis:performance
cloudera-quickstart-vm-5.1.0 64-bit
Intel i5 CPU @ 2.60GHz
16 GB RAM (12 GB RAM for VM)

Small: 150000 JSON records ~ 103MB
Medium: 750000 JSON records ~ 517MB
Large: 1500000 JSON records ~ 1GB
X-large: 2250000 JSON records ~ 1.5GB

Page 46: CodingSerbia2014-JavaVSPig

$ analysis:performance -mode single-node

Chart: execution time per input size (y-axis 0-900 s)

input size                 series 1   series 2
small (150000/103MB)       4m 05s     40s
medium (750000/517MB)      7m 40s     1m 20s
large (1500000/1GB)        10m 30s    2m 30s
x-large (2250000/1.5GB)    13m 30s    3m 30s

Page 47: CodingSerbia2014-JavaVSPig

$ analysis:performance -mode single-node

Chart: values per input size (y-axis 0-30)

input size                 series 1   series 2
small (150000/103MB)       8          5
medium (750000/517MB)      13         5
large (1500000/1GB)        19         6
x-large (2250000/1.5GB)    26         6

Page 48: CodingSerbia2014-JavaVSPig

$ analysis:performance -mode cluster

Chart: execution time per input size (y-axis 0-300 s; legend: pig_2, java)

input size                 series 1   series 2
small (150000/103MB)       2m 40s     35s
medium (750000/517MB)      3m 5s      38s
large (1500000/1GB)        3m 34s     46s
x-large (2250000/1.5GB)    4m 6s      51s

Page 49: CodingSerbia2014-JavaVSPig

$ analysis:performance -mode compare

Chart: single-node and cluster execution times per input size (y-axis 0-900 s)

                           single-node             cluster
input size                 series 1   series 2     series 1   series 2
small (150000/103MB)       4m 5s      40s          2m 40s     35s
medium (750000/517MB)      7m 40s     1m 20s       3m 5s      38s
large (1500000/1GB)        10m 30s    2m 30s       3m 34s     46s
x-large (2250000/1.5GB)    13m 30s    3m 30s       4m 6s      51s
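The charted times are easier to compare as ratios. The small helper below is a sketch that assumes the labels only use the `m` and `s` units; it converts each label to seconds and divides the slower bar by the faster one for every input size, once for the single-node runs and once for the cluster runs. The numbers are copied from the chart labels, paired per input size in the order they appear on the slides.

```java
import java.util.Locale;

public class ChartRatios {

    /** Converts a chart label such as "4m 05s", "3m 5s" or "40s" into seconds. */
    static int toSeconds(String label) {
        int total = 0;
        for (String part : label.trim().split("\\s+")) {
            int value = Integer.parseInt(part.substring(0, part.length() - 1));
            char unit = part.charAt(part.length() - 1);
            if (unit == 'm') {
                total += value * 60;
            } else if (unit == 's') {
                total += value;
            } else {
                throw new IllegalArgumentException("unexpected unit in: " + label);
            }
        }
        return total;
    }

    /** How many times longer the slower run took than the faster one. */
    static double ratio(String slower, String faster) {
        return (double) toSeconds(slower) / toSeconds(faster);
    }

    public static void main(String[] args) {
        String[] sizes = {"small", "medium", "large", "x-large"};
        // Labels as they appear on the single-node and cluster charts.
        String[] singleSlow = {"4m 05s", "7m 40s", "10m 30s", "13m 30s"};
        String[] singleFast = {"40s", "1m 20s", "2m 30s", "3m 30s"};
        String[] clusterSlow = {"2m 40s", "3m 5s", "3m 34s", "4m 6s"};
        String[] clusterFast = {"35s", "38s", "46s", "51s"};

        for (int i = 0; i < sizes.length; i++) {
            System.out.printf(Locale.ROOT, "%-8s single-node: %.2fx   cluster: %.2fx%n",
                    sizes[i], ratio(singleSlow[i], singleFast[i]), ratio(clusterSlow[i], clusterFast[i]));
        }
    }
}
```

For example, the x-large single-node pair is 810 s against 210 s, so the slower run takes roughly 3.9 times as long.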

Page 50: CodingSerbia2014-JavaVSPig
Page 51: CodingSerbia2014-JavaVSPig

$ analysis:performance

- PigCompiler: translates Pig Latin into a chain of MapReduce jobs
- PigOptimizer
- Graphviz: visualizing the execution plan (EXPLAIN -dot)

Page 52: CodingSerbia2014-JavaVSPig

$ analysis:language_support Pig

- UDF (Java, Python, Jython, Groovy, Ruby, JavaScript)

REGISTER myUDFs.jar;

DEFINE ShinyUDF some.shiny.udf.DoSomething();

Page 53: CodingSerbia2014-JavaVSPig

$ analysis:dev_tools -MRUnit

@Override
@Before
public void setUp() {
    super.setUp();

    config = new Configuration();
    config.set("customer.categories.file.path", CUSTOMER_CATEGORIES_FILE_PATH);

    CustomerRecordsMapper mapper = new CustomerRecordsMapper();
    mapDriver = MapDriver.newMapDriver(mapper);
}

Page 54: CodingSerbia2014-JavaVSPig

$ analysis:dev_tools

// testing style: tell the input, assert the output
@Test
public void testMapper() throws Exception {
    Context ctx = mapDriver.getContext();
    Mockito.when(ctx.getConfiguration()).thenReturn(config);

    for (int i = 0; i < INPUT_RECORDS.size(); i++) {
        mapDriver.withInput(new LongWritable(i), INPUT_RECORDS.get(i));
    }

    List<Pair<LongWritable, CustomerSessionWritable>> result = mapDriver.run();

    Assertions.assertThat(result).isNotNull().hasSize(10);
}

Page 55: CodingSerbia2014-JavaVSPig

$ analysis:dev_tools

@Test
public void testReducer() throws Exception {
    reduceDriver.withInput(new LongWritable(4L), CUSTOMER_CATEGORY_4);
    ...
    Pair<NullWritable, Text> expectedTuple1 = new Pair<NullWritable, Text>(NullWritable.get(), MAPREDUCE_OUTPUT);
    ...
    List<Pair<NullWritable, Text>> result = reduceDriver.run();

    Assertions.assertThat(result).contains(expectedTuple1, ...);
}

Page 56: CodingSerbia2014-JavaVSPig

$ analysis:dev_tools

- Currently, no IDE support
- Plugins for text editors
- Diagnostic operators: Describe, Dump, Explain and Illustrate

Page 57: CodingSerbia2014-JavaVSPig

$ analysis:dev_tools

- PigUnit, local and mapreduce mode

private static PigTest test;

@BeforeClass
public static void setUp() throws IOException, ParseException {
    test = new PigTest("src/main/resources/example.pig");
    test.override("products", "products = LOAD '...' ");
    test.override("categories", "categories = LOAD '...' ");
}

@Test
public void testTopFiveProducts() throws IOException, ParseException {
    test.assertOutput("resultTopFiveProducts", new File(TEST_PATH + "results/resultTopFiveProducts.txt"));
}

Page 58: CodingSerbia2014-JavaVSPig

$ conclusion:pig
+ high abstraction level
+ quick development
+ maintenance
+ extensions (UDF, PiggyBank)

- performance
- restrictions of Pig Latin

$ conclusion:java
+ speeeeed, control
+ tools

- complexity, maintenance, control

Page 59: CodingSerbia2014-JavaVSPig

$ Q&A?