Pig's Map-Reduce Execution
xiafei.qiu@PCA
Agenda
  • Data type
  • Data structure
  • Pig Latin to Map-Reduce job compilation
  • Physical plan execution
  • UDF invocation
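Data Type
  • Tuple: an ordered list of data. DefaultTuple stores its fields in a List<Object> mFields.
  • DataBag: a collection of tuples. Under memory pressure, the Memory Manager calls spill() to write the bag to disk.
  • Map: represented by a Java type (java.util.Map).
  • Integer, Double, etc.: represented by the corresponding Java types.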
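A minimal Java sketch of the two structures above, under stated assumptions: `SimpleTuple` and `SpillableBag` are hypothetical stand-ins, not Pig's `DefaultTuple` or `DataBag`. The tuple keeps its fields in an ordered `List<Object>`, and the bag's `spill()` (which a memory manager would call) serializes the in-memory tuples to a temp file and frees them, while `readAll()` reads them back in insertion order.

```java
import java.io.*;
import java.util.*;

// Hypothetical stand-in for DefaultTuple: fields kept in an ordered List<Object>.
class SimpleTuple implements Serializable {
    private static final long serialVersionUID = 1L;
    private final List<Object> mFields = new ArrayList<>();

    void append(Object field) { mFields.add(field); }
    Object get(int i) { return mFields.get(i); }
    int size() { return mFields.size(); }

    @Override public String toString() { return mFields.toString(); }
}

// Hypothetical spillable bag: tuples stay in memory until spill() moves them to a temp file.
class SpillableBag {
    private final List<SimpleTuple> inMemory = new ArrayList<>();
    private final List<File> spillFiles = new ArrayList<>();
    private long size = 0;

    void add(SimpleTuple t) { inMemory.add(t); size++; }
    long size() { return size; }
    int spilledFileCount() { return spillFiles.size(); }

    // What a memory manager would call under pressure; returns the number of tuples written.
    long spill() {
        if (inMemory.isEmpty()) return 0;
        try {
            File f = File.createTempFile("bag", ".spill");
            f.deleteOnExit();
            try (ObjectOutputStream out = new ObjectOutputStream(new FileOutputStream(f))) {
                out.writeInt(inMemory.size());
                for (SimpleTuple t : inMemory) out.writeObject(t);
            }
            spillFiles.add(f);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        long written = inMemory.size();
        inMemory.clear();
        return written;
    }

    // Reads spilled tuples back, then the ones still in memory, preserving insertion order.
    List<SimpleTuple> readAll() {
        List<SimpleTuple> all = new ArrayList<>();
        for (File f : spillFiles) {
            try (ObjectInputStream in = new ObjectInputStream(new FileInputStream(f))) {
                int n = in.readInt();
                for (int i = 0; i < n; i++) all.add((SimpleTuple) in.readObject());
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            } catch (ClassNotFoundException e) {
                throw new IllegalStateException(e);
            }
        }
        all.addAll(inMemory);
        return all;
    }
}
```

Because each spill empties the in-memory list, reading the spill files in order and then the remaining in-memory tuples keeps the original insertion order.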
Data Structure
Map-Reduce Compilation
  • Pig Latin to logical plan: the parser invokes logicalPlanBuilder.
  • Logical plan to physical plan: LogToPhyTranslationVisitor.
    - GROUP, DISTINCT: LR-GR-Package (LocalRearrange, GlobalRearrange, Package)
    - JOIN: LR-GR-JoinPackage (with an inner foreach)
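The expansion rules above can be pictured as a lookup table. The following is a toy Java sketch, assuming logical and physical operators are plain strings; `LogToPhySketch` is hypothetical and is not how `LogToPhyTranslationVisitor` is written. It only shows that one logical GROUP or JOIN turns into three physical operators.

```java
import java.util.*;

// Hypothetical translation table modeled on the slide: GROUP and DISTINCT expand to
// LocalRearrange -> GlobalRearrange -> Package; JOIN expands to
// LocalRearrange -> GlobalRearrange -> JoinPackage (package plus inner foreach).
class LogToPhySketch {
    static List<String> translate(String logicalOp) {
        switch (logicalOp) {
            case "GROUP":
            case "DISTINCT":
                return List.of("POLocalRearrange", "POGlobalRearrange", "POPackage");
            case "JOIN":
                return List.of("POLocalRearrange", "POGlobalRearrange", "POJoinPackage");
            case "LOAD":    return List.of("POLoad");
            case "FILTER":  return List.of("POFilter");
            case "FOREACH": return List.of("POForEach");
            case "STORE":   return List.of("POStore");
            default: throw new IllegalArgumentException("unknown op: " + logicalOp);
        }
    }

    // Translates a linear logical plan operator by operator.
    static List<String> translatePlan(List<String> logicalPlan) {
        List<String> physical = new ArrayList<>();
        for (String op : logicalPlan) physical.addAll(translate(op));
        return physical;
    }
}
```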
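Map-Reduce Compilation
  • Physical plan to Map-Reduce plan
  • An MROperator stands for one MR job.
  • The physical plan is traversed in topological order.
  • A POLoad starts a new MR operator/job. A GlobalRearrange marks the map/reduce boundary, and starts a new MR operator/job when the current one already has a reduce phase.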
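A toy Java sketch of that job-cutting rule, assuming a linear physical plan already in topological order and operators named by strings. `MRJob` and `MRCompilerSketch` are hypothetical. The temp file names and the choice to move the LocalRearrange that feeds a second shuffle into the next job's map phase are simplifications made for this sketch.

```java
import java.util.*;

// Hypothetical stand-in for one MROperator (one MR job).
class MRJob {
    final List<String> mapPlan = new ArrayList<>();
    final List<String> reducePlan = new ArrayList<>();
    boolean mapDone = false;  // true once a GlobalRearrange has been seen

    void add(String op) { (mapDone ? reducePlan : mapPlan).add(op); }

    @Override public String toString() { return "map=" + mapPlan + " reduce=" + reducePlan; }
}

class MRCompilerSketch {
    static List<MRJob> compile(List<String> opsInTopologicalOrder) {
        List<MRJob> jobs = new ArrayList<>();
        MRJob cur = null;
        int tmp = 0;
        for (String op : opsInTopologicalOrder) {
            if (op.equals("POLoad")) {
                cur = new MRJob();  // a load starts a new job
                jobs.add(cur);
                cur.add(op);
                continue;
            }
            if (cur == null) throw new IllegalStateException("plan must start with POLoad");
            if (op.equals("POGlobalRearrange")) {
                if (cur.mapDone) {
                    // Second shuffle: end this job with a temp store and start a new
                    // job that loads it back, carrying the feeding LocalRearrange along.
                    List<String> carried = new ArrayList<>();
                    int last = cur.reducePlan.size() - 1;
                    if (last >= 0 && cur.reducePlan.get(last).equals("POLocalRearrange")) {
                        carried.add(cur.reducePlan.remove(last));
                    }
                    String tmpFile = "tmp" + (tmp++);
                    cur.reducePlan.add("POStore(" + tmpFile + ")");
                    cur = new MRJob();
                    jobs.add(cur);
                    cur.mapPlan.add("POLoad(" + tmpFile + ")");
                    cur.mapPlan.addAll(carried);
                }
                cur.mapDone = true;  // the shuffle is the map/reduce boundary
            } else {
                cur.add(op);
            }
        }
        return jobs;
    }
}
```

For LOAD, GROUP, GROUP, STORE this yields two jobs: the first ends its reduce phase with a temp store, and the second begins its map phase by loading that temp output.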
Map-Reduce Compilation
Map-Reduce Compilation
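Map Execution
```java
// Called once per input record.
protected void map(Text key, Tuple inpTuple, Context context)
        throws IOException, InterruptedException {
    //...........
    // Hand the record to every root of the map plan.
    for (PhysicalOperator root : roots) {
        if (inIllustrator) {
            if (root != null) {
                root.attachInput(inpTuple);
            }
        } else {
            // Wrap the fields in a new tuple without copying them.
            root.attachInput(tf.newTupleNoCopy(inpTuple.getAll()));
        }
    }
    // Pull the results out through the plan's leaf.
    runPipeline(leaf);
}
```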
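Map Execution
```java
protected void runPipeline(PhysicalOperator leaf) throws IOException, InterruptedException {
    // Pull from the leaf until it reports end of processing (EOP).
    while (true) {
        Result res = leaf.getNext(DUMMYTUPLE);
        if (res.returnStatus == POStatus.STATUS_OK) {
            collect(outputCollector, (Tuple) res.result);
            continue;
        }
        if (res.returnStatus == POStatus.STATUS_EOP) {
            return;
        }
        if (res.returnStatus == POStatus.STATUS_NULL) {
            continue;
        }
        //........... (STATUS_ERR handling)
    }
}
```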
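The push-then-pull pattern of `map()` and `runPipeline()` can be modeled in a few lines of Java. This is a minimal sketch, assuming hypothetical types (`Status`, `Res`, `Op`, `RootOp`, `FilterOp`, `MapSketch`) rather than Pig's classes: a root returns its attached tuple once and then reports EOP, a filter pulls from its predecessor, and the driver drains the leaf until EOP.

```java
import java.util.*;
import java.util.function.*;

// Hypothetical status codes mirroring OK / NULL / EOP / ERR.
enum Status { OK, NULL, EOP, ERR }

final class Res {
    final Status status;
    final Object result;
    Res(Status status, Object result) { this.status = status; this.result = result; }
}

abstract class Op {
    Op input;  // predecessor; null for a root
    private Object attached;
    private boolean isAttached;

    // Like attachInput(): hand one tuple to a root operator.
    void attachInput(Object t) { attached = t; isAttached = true; }

    // Like processInput(): return the attached tuple once, otherwise pull from the predecessor.
    Res processInput() {
        if (isAttached) {
            isAttached = false;
            Object t = attached;
            attached = null;
            return new Res(t == null ? Status.NULL : Status.OK, t);
        }
        if (input == null) return new Res(Status.EOP, null);
        return input.getNext();
    }

    abstract Res getNext();
}

class RootOp extends Op {
    Res getNext() { return processInput(); }
}

class FilterOp extends Op {
    private final Predicate<Object> pred;
    FilterOp(Op input, Predicate<Object> pred) { this.input = input; this.pred = pred; }

    // Keep pulling until a tuple passes or a non-OK status arrives.
    Res getNext() {
        while (true) {
            Res in = processInput();
            if (in.status != Status.OK || pred.test(in.result)) return in;
        }
    }
}

class MapSketch {
    final RootOp root = new RootOp();
    final Op leaf;
    final List<Object> collected = new ArrayList<>();

    MapSketch(Predicate<Object> keep) { leaf = new FilterOp(root, keep); }

    // Like map(): attach the record to the root, then drain the pipeline.
    void map(Object record) {
        root.attachInput(record);
        runPipeline();
    }

    // Like runPipeline(): collect OK results, skip NULL, stop at EOP.
    void runPipeline() {
        while (true) {
            Res res = leaf.getNext();
            switch (res.status) {
                case OK: collected.add(res.result); continue;
                case NULL: continue;
                case EOP: return;
                default: throw new IllegalStateException("error in map plan");
            }
        }
    }
}
```

For example, a `MapSketch` that keeps even numbers and is fed 1 to 5 collects `[2, 4]`; each call to `map()` ends as soon as the root has nothing attached and reports EOP.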
Reduce Execution
```java
protected void reduce(PigNullableWritable key, Iterable<NullableTuple> tupIter, Context context)
        throws IOException, InterruptedException {
    //...........
    if (pack instanceof POJoinPackage) {
        // A join package can emit many tuples per key: keep going until EOP.
        pack.attachInput(key, tupIter.iterator());
        while (true) {
            if (processOnePackageOutput(context))
                break;
        }
    } else {
        // A plain package emits one tuple per key.
        pack.attachInput(key, tupIter.iterator());
        processOnePackageOutput(context);
    }
}
```
Reduce Execution
```java
// Returns true once the package has no more output (EOP).
public boolean processOnePackageOutput(Context oc) throws IOException, InterruptedException {
    Result res = pack.getNext(DUMMYTUPLE);
    if (res.returnStatus == POStatus.STATUS_OK) {
        Tuple packRes = (Tuple) res.result;
        //...........
        // Feed the packaged tuple to the reduce plan and drain it.
        for (int i = 0; i < roots.length; i++) {
            roots[i].attachInput(packRes);
        }
        runPipeline(leaf);
    }
    if (res.returnStatus == POStatus.STATUS_NULL) {
        return false;
    }
    //...........
    if (res.returnStatus == POStatus.STATUS_EOP) {
        return true;
    }
    return false;
}
```
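To see why `reduce()` loops only for a join package, here is a minimal Java sketch. It assumes values arrive tagged with their input index as `{index, value}` pairs, and all names (`PackageSketch`, `Pack`, `JoinPack`, `Out`, `St`) are hypothetical. A plain package emits one `(key, bag_0, bag_1, ...)` tuple per key; the join package emits the two-way cross product one row at a time until EOP.

```java
import java.util.*;

class PackageSketch {
    enum St { OK, EOP }

    static final class Out {
        final St status;
        final List<Object> tuple;
        Out(St status, List<Object> tuple) { this.status = status; this.tuple = tuple; }
    }

    // Like a plain package: one (key, bag_0, bag_1, ...) tuple per key, then EOP.
    static class Pack {
        final int numInputs;
        Object key;
        Iterator<Object[]> values;
        boolean done;

        Pack(int numInputs) { this.numInputs = numInputs; }

        void attachInput(Object key, Iterator<Object[]> values) {
            this.key = key; this.values = values; this.done = false;
        }

        // Splits the tagged values into one bag per input.
        List<List<Object>> bags() {
            List<List<Object>> bags = new ArrayList<>();
            for (int i = 0; i < numInputs; i++) bags.add(new ArrayList<>());
            while (values.hasNext()) { Object[] v = values.next(); bags.get((Integer) v[0]).add(v[1]); }
            return bags;
        }

        Out getNext() {
            if (done) return new Out(St.EOP, null);
            done = true;
            List<Object> t = new ArrayList<>();
            t.add(key);
            t.addAll(bags());
            return new Out(St.OK, t);
        }
    }

    // Like a join package (package plus inner foreach/flatten), two-way join only.
    static class JoinPack extends Pack {
        private Iterator<List<Object>> rows;

        JoinPack() { super(2); }

        @Override void attachInput(Object key, Iterator<Object[]> values) {
            super.attachInput(key, values);
            rows = null;
        }

        @Override Out getNext() {
            if (rows == null) {
                List<List<Object>> b = bags();
                List<List<Object>> out = new ArrayList<>();
                for (Object l : b.get(0)) for (Object r : b.get(1)) out.add(List.of(key, l, r));
                rows = out.iterator();
            }
            return rows.hasNext() ? new Out(St.OK, rows.next()) : new Out(St.EOP, null);
        }
    }

    final Pack pack;
    final List<List<Object>> emitted = new ArrayList<>();

    PackageSketch(Pack pack) { this.pack = pack; }

    // Like processOnePackageOutput(): true once the package is exhausted.
    boolean processOnePackageOutput() {
        Out res = pack.getNext();
        if (res.status == St.OK) emitted.add(res.tuple);  // real code runs the reduce plan here
        return res.status == St.EOP;
    }

    void reduce(Object key, Iterable<Object[]> values) {
        pack.attachInput(key, values.iterator());
        if (pack instanceof JoinPack) {
            while (!processOnePackageOutput()) { }
        } else {
            processOnePackageOutput();
        }
    }
}
```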
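Physical Plan Execution
  • PhysicalPlan extends OperatorPlan<PhysicalOperator>: operations on a graph, with each PhysicalOperator as a vertex.
  • Each vertex has a group of getNext() methods, one per result type.
  • getNext() calls processInput() when necessary to pull input from its predecessor.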
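The "group of getNext() methods" relies on Java overloading: the argument only selects which overload runs, which is why the code in these slides passes a dummy such as `DUMMYTUPLE`. Below is a hypothetical Java sketch of that dispatch style (`TypedOp`, `ConstOp` and `AddOp` are illustrative names, not Pig's API), with operators wired as vertices of a small expression tree.

```java
// Hypothetical operator base: one getNext() overload per result type; the argument is
// only a dummy that picks the overload.
abstract class TypedOp {
    static final Integer DUMMY_INT = 0;
    static final String DUMMY_STRING = "";

    // By default an operator cannot produce the requested type.
    Object getNext(Integer dummy) { throw new UnsupportedOperationException("no Integer output"); }
    Object getNext(String dummy) { throw new UnsupportedOperationException("no String output"); }
}

// A constant that can be read as either type.
class ConstOp extends TypedOp {
    private final int value;
    ConstOp(int value) { this.value = value; }

    @Override Object getNext(Integer dummy) { return value; }
    @Override Object getNext(String dummy) { return Integer.toString(value); }
}

// An addition vertex: produces integers only, pulling from its two predecessor vertices.
class AddOp extends TypedOp {
    private final TypedOp left, right;
    AddOp(TypedOp left, TypedOp right) { this.left = left; this.right = right; }

    @Override Object getNext(Integer dummy) {
        return (Integer) left.getNext(DUMMY_INT) + (Integer) right.getNext(DUMMY_INT);
    }
}
```

The design trades type safety for speed of dispatch: the caller's static argument type decides the overload, and an operator that does not support a type simply rejects the call.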
Physical Plan Execution: POLoad.getNext()
```java
public Result getNext(Tuple t) throws ExecException {
    //...........
    Result res = new Result();
    try {
        // Ask the load function for the next record; null means the input is exhausted.
        res.result = loader.getNext();
        if (res.result == null) {
            res.returnStatus = POStatus.STATUS_EOP;
            tearDown();
        } else {
            res.returnStatus = POStatus.STATUS_OK;
        }
        if (res.returnStatus == POStatus.STATUS_OK) {
            res.result = illustratorMarkup(res, res.result, 0);
        }
    } catch (IOException e) {
        // res keeps the status a new Result starts with (STATUS_ERR).
        log.error("Received error from loader function: " + e);
        return res;
    }
    return res;
}
```
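The load operator's contract (a record means OK, null means EOP plus teardown, an IOException means an error status) can be sketched independently of Pig. This Java sketch assumes a hypothetical `RecordReaderSketch` interface in place of Pig's load function and a hypothetical `LoadSketch` operator; the `tornDown` flag stands in for `tearDown()`.

```java
import java.io.IOException;

// Hypothetical loader: returns the next record, or null at end of input.
interface RecordReaderSketch {
    Object next() throws IOException;
}

class LoadSketch {
    enum Status { OK, EOP, ERR }

    static final class Result {
        Status status = Status.ERR;  // a fresh result starts out as an error
        Object result;
    }

    private final RecordReaderSketch loader;
    private boolean tornDown = false;

    LoadSketch(RecordReaderSketch loader) { this.loader = loader; }

    boolean isTornDown() { return tornDown; }

    Result getNext() {
        Result res = new Result();
        try {
            res.result = loader.next();
            if (res.result == null) {
                res.status = Status.EOP;  // loader exhausted
                tornDown = true;          // stands in for tearDown()
            } else {
                res.status = Status.OK;
            }
        } catch (IOException e) {
            // Leave the default ERR status so the caller can react.
            System.err.println("Received error from loader function: " + e);
        }
        return res;
    }
}
```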
Physical Plan Execution: POLimit.getNext()
```java
public Result getNext(Tuple t) throws ExecException {
    Result inp = null;
    while (true) {
        inp = processInput();
        if (inp.returnStatus == POStatus.STATUS_EOP
                || inp.returnStatus == POStatus.STATUS_ERR)
            break;
        illustratorMarkup(inp.result, null, 0);
        // The illustrator ignores LIMIT before post-processing.
        if ((illustrator == null || illustrator.getOriginalLimit() != -1) && soFar >= mLimit)
            inp.returnStatus = POStatus.STATUS_EOP;  // limit reached: stop the pipeline
        soFar++;
        break;
    }
    return inp;
}
```
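Stripped of the illustrator logic, the limit operator is a counter in front of its input. A minimal Java sketch, assuming a hypothetical `LimitSketch` whose predecessor is a plain `Iterator` standing in for `processInput()`: tuples pass through until `mLimit` have been returned, after which EOP is reported even though the predecessor still has data.

```java
import java.util.*;

class LimitSketch {
    enum Status { OK, EOP }

    static final class Result {
        final Status status;
        final Object value;
        Result(Status status, Object value) { this.status = status; this.value = value; }
    }

    private final Iterator<?> predecessor;  // stands in for processInput()
    private final long mLimit;
    private long soFar = 0;

    LimitSketch(Iterator<?> predecessor, long limit) { this.predecessor = predecessor; this.mLimit = limit; }

    Result getNext() {
        if (!predecessor.hasNext()) return new Result(Status.EOP, null);
        Object t = predecessor.next();
        if (soFar >= mLimit) return new Result(Status.EOP, null);  // limit reached
        soFar++;
        return new Result(Status.OK, t);
    }

    // Pulls until EOP, like runPipeline() would.
    List<Object> drain() {
        List<Object> out = new ArrayList<>();
        for (Result r = getNext(); r.status == Status.OK; r = getNext()) out.add(r.value);
        return out;
    }
}
```

Returning EOP early is what lets the pull loop in `runPipeline()` stop without consuming the rest of the input.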
UDF/Built-In Invocation
  • POUserFunc
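As a rough picture of what a UDF-invoking operator does, here is a hedged Java sketch: the function is instantiated from its class name (as a Pig script refers to UDFs by class name), then its `exec` method is called once per input tuple. `EvalFuncSketch`, `Upper` and `UserFuncSketch` are hypothetical; Pig's real base class is `EvalFunc<T>` with `exec(Tuple)`, and here a `List<Object>` plays the role of the tuple.

```java
import java.io.*;
import java.util.*;

// Hypothetical UDF base type; a List<Object> plays the role of the input tuple.
interface EvalFuncSketch<T> {
    T exec(List<Object> input) throws IOException;
}

// Example UDF: upper-cases its first argument, returning null for missing input.
class Upper implements EvalFuncSketch<String> {
    public String exec(List<Object> input) {
        if (input == null || input.isEmpty() || input.get(0) == null) return null;
        return input.get(0).toString().toUpperCase();
    }
}

// Sketch of a POUserFunc-like operator: create the UDF by class name, invoke it per tuple.
class UserFuncSketch {
    private final EvalFuncSketch<?> func;

    UserFuncSketch(String className) {
        try {
            func = (EvalFuncSketch<?>) Class.forName(className).getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("cannot instantiate UDF " + className, e);
        }
    }

    Object getNext(List<Object> inputTuple) {
        try {
            return func.exec(inputTuple);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

For example, `new UserFuncSketch(Upper.class.getName()).getNext(List.<Object>of("pig"))` returns `"PIG"`.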
