pig map reduce execution
Pig’s Map Reduce Execution
xiafei.qiu@PCA
Agenda
• Data type
• Data structure
• Pig-Latin to Map-Reduce job compilation
• Physical Plan Execution
• UDF Invocation
Data Type
• Tuple
– An ordered list of Data.
– DefaultTuple has List<Object> mFields
• DataBag
– A collection of Tuples.
– Memory Manager calls spill() to spill to disk
• Map – Java Type
• Integer, Double, etc. – Java Type
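A minimal sketch of the two container types above, using simplified stand-ins. `SketchTuple` and `SketchBag` are hypothetical names, not Pig classes, and the spill here only moves tuples to a second list and counts them instead of writing to disk.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a tuple: an ordered list of fields,
// mirroring the List<Object> mFields idea in DefaultTuple.
class SketchTuple {
    private final List<Object> fields;

    SketchTuple(Object... values) {
        this.fields = new ArrayList<>(Arrays.asList(values));
    }

    Object get(int i) { return fields.get(i); }
    int size() { return fields.size(); }
}

// Hypothetical stand-in for a bag: a collection of tuples that a
// memory manager can ask to spill. A real bag would write to disk;
// this sketch only moves tuples to a second list.
class SketchBag {
    private final List<SketchTuple> inMemory = new ArrayList<>();
    private final List<SketchTuple> spilled = new ArrayList<>();

    void add(SketchTuple t) { inMemory.add(t); }

    // Moves every in-memory tuple to the "spilled" list and
    // returns how many tuples were moved.
    long spill() {
        long moved = inMemory.size();
        spilled.addAll(inMemory);
        inMemory.clear();
        return moved;
    }

    long size() { return inMemory.size() + spilled.size(); }
    int inMemorySize() { return inMemory.size(); }
}

class DataTypeDemo {
    public static void main(String[] args) {
        SketchBag bag = new SketchBag();
        bag.add(new SketchTuple("alice", 30));
        bag.add(new SketchTuple("bob", 25));
        long moved = bag.spill();
        System.out.println(moved + " spilled, " + bag.size() + " total");
    }
}
```

The point of the split is that the bag keeps reporting the same logical size whether its tuples are held in memory or have been spilled.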
Data Structure
• Operator
– Logical: LogicalRelationalOperator (with schema), LogicalExpressionOperator
– Physical: PhysicalOperator, ExpressionOperator
– Map-Reduce: MapReduceOper
• Plan (Graph)
– Logical: LogicalPlan, LogicalExpressionPlan
– Physical: PhysicalPlan
– Map-Reduce: MROperPlan
Map-Reduce Compilation
• Pig-Latin to Logical Plan
– The parser invokes LogicalPlanBuilder
• Logical Plan to Physical Plan
– LogToPhyTranslationVisitor
– group, distinct: LR-GR-Pack (LocalRearrange, GlobalRearrange, Package)
– join: LR-GR-JoinPack (with inner foreach)
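The LR-GR-Pack chain for a group can be pictured with plain collections. This is only a sketch under assumptions: the static methods below are hypothetical stand-ins for the local rearrange, global rearrange, and package steps, rows are plain `List<Object>` values, keys are reduced to strings, and the "shuffle" is an in-memory sorted map, not Hadoop's.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class GroupSketch {
    // Local rearrange: tag each row with its key (the field at keyIndex).
    static List<Map.Entry<String, List<Object>>> localRearrange(
            List<List<Object>> rows, int keyIndex) {
        List<Map.Entry<String, List<Object>>> out = new ArrayList<>();
        for (List<Object> row : rows) {
            out.add(new SimpleEntry<>(String.valueOf(row.get(keyIndex)), row));
        }
        return out;
    }

    // Global rearrange: bring all rows with the same key together,
    // sorted by key, as the shuffle would between map and reduce.
    static Map<String, List<List<Object>>> globalRearrange(
            List<Map.Entry<String, List<Object>>> keyed) {
        Map<String, List<List<Object>>> grouped = new TreeMap<>();
        for (Map.Entry<String, List<Object>> e : keyed) {
            grouped.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).add(e.getValue());
        }
        return grouped;
    }

    // Package: emit one (key, bag-of-rows) pair per key.
    static List<List<Object>> pack(Map<String, List<List<Object>>> grouped) {
        List<List<Object>> out = new ArrayList<>();
        for (Map.Entry<String, List<List<Object>>> e : grouped.entrySet()) {
            out.add(Arrays.<Object>asList(e.getKey(), e.getValue()));
        }
        return out;
    }

    // The whole chain: LR, then GR, then Pack.
    static List<List<Object>> group(List<List<Object>> rows, int keyIndex) {
        return pack(globalRearrange(localRearrange(rows, keyIndex)));
    }

    public static void main(String[] args) {
        List<List<Object>> rows = Arrays.asList(
                Arrays.<Object>asList("a", 1),
                Arrays.<Object>asList("b", 2),
                Arrays.<Object>asList("a", 3));
        System.out.println(group(rows, 0));
    }
}
```

In this picture the first step would run on the map side, the second corresponds to the shuffle, and the third runs on the reduce side.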
Map-Reduce Compilation
• Physical Plan to Map-Reduce Plan
– An MROperator stands for one MR job
– Traverse in topological order
• If POLoad or GlobalRearrange, new MR operator/job
Map Execution
protected void map(Text key, Tuple inpTuple, Context context)
        throws IOException, InterruptedException {
    //...........
    // Attach the incoming tuple to every root of the map-side plan.
    for (PhysicalOperator root : roots) {
        if (inIllustrator) {
            if (root != null) {
                root.attachInput(inpTuple);
            }
        } else {
            root.attachInput(tf.newTupleNoCopy(inpTuple.getAll()));
        }
    }
    // Pull results through the plan from its leaf.
    runPipeline(leaf);
}
Map Execution
protected void runPipeline(PhysicalOperator leaf)
        throws IOException, InterruptedException {
    while (true) {
        Result res = leaf.getNext(DUMMYTUPLE);
        if (res.returnStatus == POStatus.STATUS_OK) {
            collect(outputCollector, (Tuple) res.result);
            continue;
        }
        //........... (handling of the other statuses elided; the loop ends on STATUS_EOP)
    }
}
Reduce Execution
protected void reduce(PigNullableWritable key, Iterable<NullableTuple> tupIter, Context context)
        throws IOException, InterruptedException {
    //...........
    if (pack instanceof POJoinPackage) {
        pack.attachInput(key, tupIter.iterator());
        while (true) {
            if (processOnePackageOutput(context))
                break;
        }
    } else {
        pack.attachInput(key, tupIter.iterator());
        processOnePackageOutput(context);
    }
}
Reduce Execution
public boolean processOnePackageOutput(Context oc)
        throws IOException, InterruptedException {
    Result res = pack.getNext(DUMMYTUPLE);
    if (res.returnStatus == POStatus.STATUS_OK) {
        Tuple packRes = (Tuple) res.result;
        //...........
        for (int i = 0; i < roots.length; i++) {
            roots[i].attachInput(packRes);
        }
        runPipeline(leaf);
    }
    if (res.returnStatus == POStatus.STATUS_NULL) {
        return false;
    }
    //...........
    if (res.returnStatus == POStatus.STATUS_EOP) {
        return true;
    }
    return false;
}
Physical Plan Execution
• PhysicalPlan extends OperatorPlan<PhysicalOperator>
– Operation on Graph
– PhysicalOperator as vertex
– Each vertex has a group of getNext() methods
– processInput() if necessary
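The pull model behind these bullets can be sketched in a few lines. Everything here is a simplified assumption: `Status`, `Res`, `Op`, `SourceOp`, and `FilterOp` are hypothetical stand-ins, not Pig's `POStatus`, `Result`, or `PhysicalOperator`, and a single `getNext()` replaces the group of `getNext()` methods.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical status codes: a value, "nothing for this input", end of input.
enum Status { OK, NULL, EOP }

class Res {
    final Status status;
    final Object value;
    Res(Status status, Object value) { this.status = status; this.value = value; }
}

// A vertex of the plan: each operator pulls from its upstream input.
abstract class Op {
    protected Op input;
    abstract Res getNext();
    // Pull one result from the upstream operator.
    protected Res processInput() { return input.getNext(); }
}

// Source vertex: hands out values until the data runs out.
class SourceOp extends Op {
    private final Iterator<Object> it;
    SourceOp(List<Object> data) { this.it = data.iterator(); }
    Res getNext() {
        return it.hasNext() ? new Res(Status.OK, it.next()) : new Res(Status.EOP, null);
    }
}

// Filter vertex: passes matching values, reports NULL for the rest.
class FilterOp extends Op {
    private final Predicate<Object> keep;
    FilterOp(Op input, Predicate<Object> keep) { this.input = input; this.keep = keep; }
    Res getNext() {
        Res in = processInput();
        if (in.status != Status.OK) return in;   // propagate EOP and NULL unchanged
        return keep.test(in.value) ? in : new Res(Status.NULL, null);
    }
}

class PipelineSketch {
    // Driver loop: keep pulling from the leaf until it reports EOP.
    static List<Object> run(Op leaf) {
        List<Object> out = new ArrayList<>();
        while (true) {
            Res r = leaf.getNext();
            if (r.status == Status.OK) { out.add(r.value); continue; }
            if (r.status == Status.EOP) return out;
            // Status.NULL: nothing to emit for this input, keep pulling.
        }
    }

    public static void main(String[] args) {
        Op leaf = new FilterOp(new SourceOp(Arrays.<Object>asList(1, 2, 3, 4)),
                v -> (Integer) v % 2 == 0);
        System.out.println(run(leaf)); // prints [2, 4]
    }
}
```

The driver only ever talks to the leaf; each operator decides for itself when to call `processInput()`, which is why the plan can be executed as a chain of nested pulls.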
Physical Plan Execution
public Result getNext(Tuple t) throws ExecException {
    //...........
    Result res = new Result();
    try {
        res.result = loader.getNext();
        if (res.result == null) {
            res.returnStatus = POStatus.STATUS_EOP;
            tearDown();
        } else
            res.returnStatus = POStatus.STATUS_OK;
        if (res.returnStatus == POStatus.STATUS_OK)
            res.result = illustratorMarkup(res, res.result, 0);
    } catch (IOException e) {
        log.error("Received error from loader function: " + e);
        return res;
    }
    return res;
}
Physical Plan Execution
public Result getNext(Tuple t) throws ExecException {
    Result res = null;
    Result inp = null;
    while (true) {
        inp = processInput();
        if (inp.returnStatus == POStatus.STATUS_EOP
                || inp.returnStatus == POStatus.STATUS_ERR)
            break;
        illustratorMarkup(inp.result, null, 0);
        // illustrator ignore LIMIT before the post processing
        if ((illustrator == null || illustrator.getOriginalLimit() != -1) &&
                soFar >= mLimit)
            inp.returnStatus = POStatus.STATUS_EOP;
        soFar++;
        break;
    }
    return inp;
}
UDF/Built-In Invocation
• POUserFunc