Pig’s Map Reduce Execution

xiafei.qiu@PCA

Uploaded by qiu-xiafei

Posted on 02-Jul-2015


Agenda

• Data type

• Data structure

• Pig Latin to Map-Reduce job compilation

• Physical Plan Execution

• UDF Invocation


Data Type

• Tuple

– An ordered list of data fields

– DefaultTuple stores its fields in a List<Object> mFields

• DataBag

– A collection of Tuples

– When memory runs low, the memory manager calls spill() to write the bag's contents to disk

• Map – a plain Java type (java.util.Map)

• Integer, Double, etc. – plain Java types
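The two container types above can be sketched in plain Java. SimpleTuple and SpillableBag are hypothetical names, not Pig's DefaultTuple and DataBag, and in this sketch the caller invokes spill() directly instead of a memory manager. It only shows the shape: a tuple is an ordered List<Object>, and a bag can move its in-memory tuples to a temporary file while still iterating over everything it holds.

```java
import java.io.*;
import java.util.*;

// Hypothetical sketch; these are NOT Pig's DefaultTuple/DataBag classes.
public class DataTypeSketch {

    // A tuple as an ordered list of fields, like DefaultTuple's List<Object> mFields.
    public static final class SimpleTuple implements Serializable {
        private final ArrayList<Object> fields;

        public SimpleTuple(Object... values) {
            fields = new ArrayList<>(Arrays.asList(values));
        }

        public Object get(int i) { return fields.get(i); }
        public int size() { return fields.size(); }

        @Override public boolean equals(Object o) {
            return o instanceof SimpleTuple && fields.equals(((SimpleTuple) o).fields);
        }
        @Override public int hashCode() { return fields.hashCode(); }
        @Override public String toString() { return fields.toString(); }
    }

    // A bag that can move its in-memory tuples to a temp file.
    public static final class SpillableBag implements Iterable<SimpleTuple> {
        private final List<SimpleTuple> inMemory = new ArrayList<>();
        private final List<File> spillFiles = new ArrayList<>();

        public void add(SimpleTuple t) { inMemory.add(t); }
        public int inMemoryCount() { return inMemory.size(); }

        // In Pig a memory manager decides when to spill; here the caller does.
        // Returns the number of tuples written to disk.
        public long spill() throws IOException {
            if (inMemory.isEmpty()) return 0;
            File f = File.createTempFile("bag-spill", ".bin");
            f.deleteOnExit();
            try (ObjectOutputStream out = new ObjectOutputStream(
                    new BufferedOutputStream(new FileOutputStream(f)))) {
                out.writeInt(inMemory.size());
                for (SimpleTuple t : inMemory) out.writeObject(t);
            }
            long written = inMemory.size();
            spillFiles.add(f);
            inMemory.clear();
            return written;
        }

        // Yields spilled tuples first, then in-memory ones. For brevity the spill
        // files are read back eagerly; a real bag would stream them.
        @Override public Iterator<SimpleTuple> iterator() {
            List<SimpleTuple> all = new ArrayList<>();
            for (File f : spillFiles) {
                try (ObjectInputStream in = new ObjectInputStream(
                        new BufferedInputStream(new FileInputStream(f)))) {
                    int n = in.readInt();
                    for (int i = 0; i < n; i++) all.add((SimpleTuple) in.readObject());
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                } catch (ClassNotFoundException e) {
                    throw new IllegalStateException(e);
                }
            }
            all.addAll(inMemory);
            return all.iterator();
        }
    }
}
```

After spill() the bag holds nothing in memory, yet iteration still returns every tuple, which is the property the memory manager relies on.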


Data Structure

• Logical

– Operators: LogicalRelationalOperator (with schema), LogicalExpressionOperator

– Plans (graphs): LogicalPlan, LogicalExpressionPlan

• Physical

– Operators: PhysicalOperator, ExpressionOperator

– Plan (graph): PhysicalPlan

• Map-Reduce

– Operator: MapReduceOper

– Plan (graph): MROperPlan
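Each layer pairs an operator type with a plan type, and every plan is a directed graph of operators. Below is a minimal sketch of such a graph, generic in the operator type so the same class could hold logical, physical or map-reduce operators. PlanSketch and its methods are hypothetical and only approximate the role of Pig's plan classes.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: a minimal directed operator graph, generic in the operator type.
public class PlanSketch {

    public static class Plan<O> {
        // Insertion-ordered adjacency lists so roots and leaves come out in a stable order.
        private final Map<O, List<O>> successors = new LinkedHashMap<>();
        private final Map<O, List<O>> predecessors = new LinkedHashMap<>();

        public void add(O op) {
            successors.putIfAbsent(op, new ArrayList<>());
            predecessors.putIfAbsent(op, new ArrayList<>());
        }

        // Adds an edge meaning "output of 'from' flows into 'to'".
        public void connect(O from, O to) {
            add(from);
            add(to);
            successors.get(from).add(to);
            predecessors.get(to).add(from);
        }

        // Roots have no predecessors (e.g. loads).
        public List<O> getRoots() {
            List<O> roots = new ArrayList<>();
            for (Map.Entry<O, List<O>> e : predecessors.entrySet())
                if (e.getValue().isEmpty()) roots.add(e.getKey());
            return roots;
        }

        // Leaves have no successors (e.g. stores).
        public List<O> getLeaves() {
            List<O> leaves = new ArrayList<>();
            for (Map.Entry<O, List<O>> e : successors.entrySet())
                if (e.getValue().isEmpty()) leaves.add(e.getKey());
            return leaves;
        }

        public List<O> getSuccessors(O op) {
            return successors.getOrDefault(op, Collections.emptyList());
        }
    }
}
```

For example, a Plan<String> with edges load -> filter -> store has the single root load and the single leaf store.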


Map-Reduce Compilation

• Pig Latin to Logical Plan

– The parser invokes logicalPlanBuilder

• Logical Plan to Physical Plan

– Performed by LogToPhyTranslationVisitor

– group, distinct: LR-GR-Pack (POLocalRearrange, POGlobalRearrange, POPackage)

– join: LR-GR-JoinPack (POJoinPackage, with an inner foreach)
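The LR-GR-Pack pattern for group can be simulated in memory to show what each operator contributes. This is a hypothetical sketch: tuples are List<Object>, a TreeMap stands in for the sort and shuffle that Hadoop performs between map and reduce, and the three methods are named after the operators rather than taken from Pig.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical in-memory sketch of LR-GR-Pack for GROUP; not Pig's operators.
public class GroupSketch {

    // LR (map side): tag every tuple with its grouping key.
    public static List<Map.Entry<Object, List<Object>>> localRearrange(List<List<Object>> input, int keyField) {
        List<Map.Entry<Object, List<Object>>> out = new ArrayList<>();
        for (List<Object> t : input)
            out.add(new AbstractMap.SimpleEntry<>(t.get(keyField), t));
        return out;
    }

    // GR (shuffle): gather the tuples for each key. A TreeMap stands in for Hadoop's
    // sort by key, so keys must be mutually Comparable (e.g. all Strings).
    public static SortedMap<Object, List<List<Object>>> globalRearrange(List<Map.Entry<Object, List<Object>>> pairs) {
        SortedMap<Object, List<List<Object>>> grouped = new TreeMap<>();
        for (Map.Entry<Object, List<Object>> e : pairs)
            grouped.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).add(e.getValue());
        return grouped;
    }

    // Pack (reduce side): one output tuple (key, bag of tuples) per key.
    public static List<List<Object>> pack(SortedMap<Object, List<List<Object>>> grouped) {
        List<List<Object>> out = new ArrayList<>();
        for (Map.Entry<Object, List<List<Object>>> e : grouped.entrySet()) {
            List<Object> row = new ArrayList<>();
            row.add(e.getKey());
            row.add(e.getValue());
            out.add(row);
        }
        return out;
    }

    public static List<List<Object>> group(List<List<Object>> input, int keyField) {
        return pack(globalRearrange(localRearrange(input, keyField)));
    }
}
```

Grouping (a,1), (b,2), (a,3) on field 0 yields (a, {(a,1), (a,3)}) and (b, {(b,2)}): LR adds the key, GR brings equal keys together, and Pack builds the bag.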


• Physical Plan to Map-Reduce Plan

– An MROperator stands for one MR job

– The physical plan is traversed in topological order

– On a POLoad or GlobalRearrange, start a new MR operator/job
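A simplified sketch of how a linear physical pipeline might be cut into MR jobs. It assumes a single chain of operators (so list order is already a topological order), leaves local rearranges out, and follows one reading of the rule above: a load opens a new job; a global rearrange ends the map phase of the current job, or, if that job already has a reduce phase, closes it with a temporary store and opens a new job that reads the temporary output. MRSplitSketch and the STORE_TMP and LOAD_TMP names are hypothetical placeholders.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified sketch of splitting a pipeline into MR jobs; not Pig's MRCompiler.
public class MRSplitSketch {

    public static final class MRJob {
        public final List<String> mapPlan = new ArrayList<>();
        public final List<String> reducePlan = new ArrayList<>();
        private boolean inReduce = false;

        // New operators go to the reduce plan once the shuffle boundary has been crossed.
        void add(String op) {
            (inReduce ? reducePlan : mapPlan).add(op);
        }

        @Override
        public String toString() {
            return "map=" + mapPlan + " reduce=" + reducePlan;
        }
    }

    public static List<MRJob> compile(List<String> pipeline) {
        List<MRJob> jobs = new ArrayList<>();
        MRJob cur = null;
        // A single chain of operators is already in topological order.
        for (String op : pipeline) {
            if (op.equals("LOAD")) {
                // A load always opens a new job.
                cur = new MRJob();
                jobs.add(cur);
                cur.add(op);
            } else if (cur == null) {
                throw new IllegalStateException("pipeline must start with LOAD, got " + op);
            } else if (op.equals("GLOBAL_REARRANGE")) {
                if (cur.inReduce) {
                    // The job already has a reduce phase: store its output and chain a new job.
                    cur.add("STORE_TMP");
                    cur = new MRJob();
                    jobs.add(cur);
                    cur.add("LOAD_TMP");
                }
                // The shuffle realizes the global rearrange, so it is not added to a plan here.
                cur.inReduce = true;
            } else {
                cur.add(op);
            }
        }
        return jobs;
    }
}
```

For LOAD, FILTER, GLOBAL_REARRANGE, PACKAGE, FOREACH, GLOBAL_REARRANGE, PACKAGE, STORE this produces two jobs: map=[LOAD, FILTER] reduce=[PACKAGE, FOREACH, STORE_TMP], then map=[LOAD_TMP] reduce=[PACKAGE, STORE].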



Map Execution

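protected void map(Text key, Tuple inpTuple, Context context) throws IOException, InterruptedException {
    //...........
    // Push the input record into every root operator of the map plan.
    for (PhysicalOperator root : roots) {
        if (inIllustrator) {
            if (root != null) {
                root.attachInput(inpTuple);
            }
        } else {
            // Wrap the record's fields in a new tuple without copying the field list.
            root.attachInput(tf.newTupleNoCopy(inpTuple.getAll()));
        }
    }
    // Pull results through the plan from its leaf operator.
    runPipeline(leaf);
}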


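protected void runPipeline(PhysicalOperator leaf) throws IOException, InterruptedException {
    while (true) {
        // Each getNext() call on the leaf pulls data through the whole plan.
        Result res = leaf.getNext(DUMMYTUPLE);
        if (res.returnStatus == POStatus.STATUS_OK) {
            // A complete output tuple: emit it as map output.
            collect(outputCollector, (Tuple) res.result);
            continue;
        }
        //........... (other statuses; STATUS_EOP returns from this loop)
    }
}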
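The push-then-pull pattern of map() and runPipeline() can be reduced to a few lines. In this hypothetical sketch, Status, Result, RootOp and FilterOp are simplified stand-ins for Pig's classes: map() attaches one record to the root, and runPipeline() keeps calling getNext() on the leaf, collecting OK results and stopping at EOP.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical sketch of the map-side driver; not Pig's PigGenericMapBase.
public class MapDriverSketch {
    public enum Status { OK, NULL, EOP }

    public static final class Result {
        public final Status status;
        public final Object value;
        public Result(Status status, Object value) { this.status = status; this.value = value; }
    }

    public interface Op { Result getNext(); }

    // Root: hands out the record attached by map() once, then reports EOP.
    // (Simplification: a null record is treated as "nothing attached".)
    public static final class RootOp implements Op {
        private Object attached;
        public void attachInput(Object t) { attached = t; }
        public Result getNext() {
            if (attached == null) return new Result(Status.EOP, null);
            Object t = attached;
            attached = null;
            return new Result(Status.OK, t);
        }
    }

    // Filter: pulls from its predecessor and passes on tuples that satisfy the predicate.
    public static final class FilterOp implements Op {
        private final Op input;
        private final Predicate<Object> keep;
        public FilterOp(Op input, Predicate<Object> keep) { this.input = input; this.keep = keep; }
        public Result getNext() {
            while (true) {
                Result r = input.getNext();
                if (r.status != Status.OK || keep.test(r.value)) return r;
            }
        }
    }

    // Like map(): attach the record to the root, then drain the leaf.
    public static List<Object> map(Object record, RootOp root, Op leaf) {
        root.attachInput(record);
        return runPipeline(leaf);
    }

    // Like runPipeline(): OK results are collected, EOP ends the loop, NULL is skipped.
    static List<Object> runPipeline(Op leaf) {
        List<Object> collected = new ArrayList<>();
        while (true) {
            Result res = leaf.getNext();
            if (res.status == Status.OK) {
                collected.add(res.value);
            } else if (res.status == Status.EOP) {
                return collected;
            }
        }
    }
}
```

With a filter that keeps values greater than 10, map(42, ...) yields [42] and map(5, ...) yields an empty list; the driver never sees the filtered record, it only observes EOP.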


Reduce Execution

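protected void reduce(PigNullableWritable key, Iterable<NullableTuple> tupIter, Context context) throws IOException, InterruptedException {
    //...........
    if (pack instanceof POJoinPackage) {
        // POJoinPackage combines the package with the join's inner foreach and can
        // emit many tuples per key, so keep pulling until it reports EOP.
        pack.attachInput(key, tupIter.iterator());
        while (true) {
            if (processOnePackageOutput(context))
                break;
        }
    } else {
        // Any other package emits a single tuple per key.
        pack.attachInput(key, tupIter.iterator());
        processOnePackageOutput(context);
    }
}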


public boolean processOnePackageOutput(Context oc) throws IOException, InterruptedException {
    Result res = pack.getNext(DUMMYTUPLE);
    if (res.returnStatus == POStatus.STATUS_OK) {
        Tuple packRes = (Tuple) res.result;
        //...........
        // Feed the packaged tuple to every root of the reduce plan, then drain the plan.
        for (int i = 0; i < roots.length; i++) {
            roots[i].attachInput(packRes);
        }
        runPipeline(leaf);
    }
    if (res.returnStatus == POStatus.STATUS_NULL) {
        return false;
    }
    //...........
    // Returning true tells the caller that the package has no more output for this key.
    if (res.returnStatus == POStatus.STATUS_EOP) {
        return true;
    }
    return false;
}
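The difference between the two branches of reduce() can be shown with two toy packages. In this sketch all names are hypothetical: GroupPackage emits one (key, bag) tuple per key, while FlattenPackage stands in for POJoinPackage by emitting one tuple per value, so the driver keeps calling processOnePackageOutput() until it returns true at EOP. Collecting into a list replaces running the reduce plan.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch of the reduce-side loop; not Pig's POPackage/POJoinPackage.
public class ReduceDriverSketch {
    public enum Status { OK, EOP }

    public static final class Result {
        public final Status status;
        public final Object value;
        public Result(Status status, Object value) { this.status = status; this.value = value; }
    }

    // A package receives one key together with an iterator over its values.
    public interface KeyPackage {
        void attachInput(Object key, Iterator<Object> values);
        Result getNext();
    }

    // Emits a single (key, bag) tuple per key, then EOP.
    public static final class GroupPackage implements KeyPackage {
        private Object key;
        private Iterator<Object> values;

        public void attachInput(Object key, Iterator<Object> values) { this.key = key; this.values = values; }

        public Result getNext() {
            if (values == null) return new Result(Status.EOP, null);
            List<Object> bag = new ArrayList<>();
            values.forEachRemaining(bag::add);
            values = null;  // consumed: the next call reports EOP
            return new Result(Status.OK, Arrays.asList(key, bag));
        }
    }

    // Stand-in for POJoinPackage: emits one (key, value) tuple per call until values run out.
    public static final class FlattenPackage implements KeyPackage {
        private Object key;
        private Iterator<Object> values;

        public void attachInput(Object key, Iterator<Object> values) { this.key = key; this.values = values; }

        public Result getNext() {
            if (values == null || !values.hasNext()) return new Result(Status.EOP, null);
            return new Result(Status.OK, Arrays.asList(key, values.next()));
        }
    }

    private final KeyPackage pack;
    public final List<Object> output = new ArrayList<>();

    public ReduceDriverSketch(KeyPackage pack) { this.pack = pack; }

    // Mirrors reduce(): a many-output package is drained until EOP, others are called once.
    public void reduce(Object key, Iterable<Object> values) {
        pack.attachInput(key, values.iterator());
        if (pack instanceof FlattenPackage) {
            while (!processOnePackageOutput()) {
                // keep pulling until EOP
            }
        } else {
            processOnePackageOutput();
        }
    }

    // Returns true once the package reports EOP; adding to 'output' stands in
    // for attaching the result to the reduce plan and running it.
    boolean processOnePackageOutput() {
        Result res = pack.getNext();
        if (res.status == Status.OK) output.add(res.value);
        return res.status == Status.EOP;
    }
}
```

For the key k with values 1, 2, 3, the group package produces one tuple (k, [1, 2, 3]) and the flatten package produces three tuples (k, 1), (k, 2), (k, 3).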


Physical Plan Execution

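• PhysicalPlan extends OperatorPlan<PhysicalOperator>

– Provides the operations on the operator graph

– Each PhysicalOperator is a vertex

– Each vertex has a group of getNext() methods, overloaded by result type (e.g. getNext(Tuple))

– An operator calls processInput() when it needs input from its predecessor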
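processInput() is what lets an operator either consume a tuple that was attached to it or pull one from its predecessor. The sketch below assumes three cases: an attached tuple is returned once, an operator with nothing attached and no predecessors reports EOP, and otherwise the call is forwarded to getNext() on the first predecessor. Operator and Scale are hypothetical names, and a single untyped getNext() replaces Pig's per-type overloads.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of processInput(); not Pig's PhysicalOperator.
public class ProcessInputSketch {
    public enum Status { OK, NULL, EOP }

    public static final class Result {
        public final Status status;
        public final Object value;
        public Result(Status status, Object value) { this.status = status; this.value = value; }
    }

    public abstract static class Operator {
        private final List<Operator> inputs = new ArrayList<>();
        private Object input;
        private boolean inputAttached;

        public void addInput(Operator predecessor) { inputs.add(predecessor); }

        public void attachInput(Object tuple) {
            input = tuple;
            inputAttached = true;
        }

        protected Result processInput() {
            if (inputAttached) {
                // An attached tuple is handed out exactly once (a null one as NULL).
                Object t = input;
                input = null;
                inputAttached = false;
                return new Result(t == null ? Status.NULL : Status.OK, t);
            }
            if (inputs.isEmpty()) {
                // Nothing attached and nothing to pull from.
                return new Result(Status.EOP, null);
            }
            // Otherwise pull from the first predecessor.
            return inputs.get(0).getNext();
        }

        public abstract Result getNext();
    }

    // Example vertex: multiplies integer input by a constant.
    public static final class Scale extends Operator {
        private final int factor;
        public Scale(int factor) { this.factor = factor; }

        @Override
        public Result getNext() {
            Result in = processInput();
            if (in.status != Status.OK) return in;  // pass NULL and EOP through
            return new Result(Status.OK, (Integer) in.value * factor);
        }
    }
}
```

Chaining Scale(2) into Scale(10) and attaching 3 to the first operator, the first getNext() on the second operator returns 60 and the next one returns EOP.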


public Result getNext(Tuple t) throws ExecException {
    //...........
    // POLoad: fetch the next tuple from the load function.
    Result res = new Result();
    try {
        res.result = loader.getNext();
        if (res.result == null) {
            // The loader is exhausted: signal end of data and release resources.
            res.returnStatus = POStatus.STATUS_EOP;
            tearDown();
        } else {
            res.returnStatus = POStatus.STATUS_OK;
        }
        if (res.returnStatus == POStatus.STATUS_OK)
            res.result = illustratorMarkup(res, res.result, 0);
    } catch (IOException e) {
        // The error is only logged; the caller sees whatever status res already holds.
        log.error("Received error from loader function: " + e);
        return res;
    }
    return res;
}
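The load operator's contract fits in a small class: return OK with the next record, EOP once the loader returns null, and an error otherwise. This hypothetical sketch uses a Loader interface in place of a Pig load function and, unlike the excerpt above, sets an explicit ERR status when the loader throws instead of only logging.

```java
import java.io.IOException;
import java.util.Iterator;

// Hypothetical sketch of a load operator's getNext(); not Pig's POLoad.
public class LoadSketch {
    public enum Status { OK, EOP, ERR }

    public static final class Result {
        public final Status status;
        public final Object value;
        public Result(Status status, Object value) { this.status = status; this.value = value; }
    }

    // Stand-in for a load function: next record, or null when the input is exhausted.
    public interface Loader {
        Object getNext() throws IOException;
    }

    public static final class LoadOp {
        private final Loader loader;
        private boolean tornDown = false;

        public LoadOp(Loader loader) { this.loader = loader; }

        public Result getNext() {
            try {
                Object record = loader.getNext();
                if (record == null) {
                    tornDown = true;  // where the excerpt calls tearDown()
                    return new Result(Status.EOP, null);
                }
                return new Result(Status.OK, record);
            } catch (IOException e) {
                // Make the failure visible to the caller instead of only logging it.
                return new Result(Status.ERR, e.getMessage());
            }
        }

        public boolean isTornDown() { return tornDown; }
    }

    // Adapts an Iterator into a Loader.
    public static Loader fromIterator(Iterator<?> it) {
        return () -> it.hasNext() ? it.next() : null;
    }
}
```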


public Result getNext(Tuple t) throws ExecException {
    // POLimit: pass on at most mLimit tuples.
    Result res = null;
    Result inp = null;
    while (true) {
        // Pull one tuple from the predecessor.
        inp = processInput();
        if (inp.returnStatus == POStatus.STATUS_EOP || inp.returnStatus == POStatus.STATUS_ERR)
            break;
        illustratorMarkup(inp.result, null, 0);
        // The illustrator ignores LIMIT before the post-processing.
        if ((illustrator == null || illustrator.getOriginalLimit() != -1) && soFar >= mLimit) {
            // mLimit tuples have already been passed on: report end of data.
            inp.returnStatus = POStatus.STATUS_EOP;
        }
        soFar++;
        break;
    }
    return inp;
}
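The limit logic reduces to a counter. In this hypothetical sketch, LimitOp passes EOP and ERR through unchanged and, once limit tuples have gone by, answers EOP so that downstream operators stop pulling. The illustrator branch is left out.

```java
import java.util.Iterator;

// Hypothetical sketch of a LIMIT operator's getNext(); not Pig's POLimit.
public class LimitSketch {
    public enum Status { OK, EOP, ERR }

    public static final class Result {
        public final Status status;
        public final Object value;
        public Result(Status status, Object value) { this.status = status; this.value = value; }
    }

    public interface Source { Result getNext(); }

    public static final class LimitOp {
        private final Source input;
        private final long limit;
        private long soFar = 0;

        public LimitOp(Source input, long limit) { this.input = input; this.limit = limit; }

        public Result getNext() {
            Result inp = input.getNext();
            if (inp.status == Status.EOP || inp.status == Status.ERR) return inp;
            // Once 'limit' tuples have passed, report EOP so downstream stops pulling.
            if (soFar >= limit) return new Result(Status.EOP, null);
            soFar++;
            return inp;
        }
    }

    // Adapts an Iterator into a Source that reports EOP when exhausted.
    public static Source fromIterator(Iterator<?> it) {
        return () -> it.hasNext() ? new Result(Status.OK, it.next()) : new Result(Status.EOP, null);
    }
}
```

With limit 2 over the inputs 1, 2, 3, 4, a consumer that pulls until a non-OK status receives 1 and 2 and then EOP.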


UDF/Built-In Invocation

• POUserFunc