Pig's MapReduce execution involves compiling Pig Latin scripts to logical and physical plans, then executing the physical plan as a series of MapReduce jobs. The physical plan is represented as a directed acyclic graph of physical operators. Each operator implements methods like getNext() to process tuples and pass them between operators during execution. Map and reduce functions process input tuples by attaching them to the appropriate physical operators and running the operator pipelines.
3. Data Type
- Tuple: an ordered list of data. DefaultTuple has List<Object> mFields.
- DataBag: a collection of Tuples. The memory manager calls spill() to spill to disk.
- Map: Java type
- Integer, Double, etc.: Java type
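The Tuple and DataBag descriptions above can be illustrated with a small standalone sketch. It does not use Pig's classes: SketchTuple, SketchBag, and the temp-file spill format are hypothetical names and choices made for illustration. Only the ideas come from the slide: a tuple backed by a List<Object>, and a bag of tuples that moves its contents to disk when spill() is called.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a tuple: an ordered list of fields.
class SketchTuple {
    private final List<Object> fields = new ArrayList<>();

    void append(Object value) { fields.add(value); }
    Object get(int index) { return fields.get(index); }
    int size() { return fields.size(); }

    @Override
    public String toString() { return fields.toString(); }
}

// Hypothetical stand-in for a bag: a collection of tuples that can spill.
class SketchBag {
    private final List<SketchTuple> inMemory = new ArrayList<>();
    private long spilledCount = 0;

    void add(SketchTuple t) { inMemory.add(t); }

    // Meant to be called by a memory manager when memory runs low.
    // Assumption: writing each tuple's text form to a temp file is
    // enough to show the idea; a real bag would use a binary format.
    long spill() {
        if (inMemory.isEmpty()) {
            return 0;
        }
        try {
            Path file = Files.createTempFile("sketch-bag", ".spill");
            file.toFile().deleteOnExit();
            try (BufferedWriter out = Files.newBufferedWriter(file)) {
                for (SketchTuple t : inMemory) {
                    out.write(t.toString());
                    out.newLine();
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        long spilled = inMemory.size();
        spilledCount += spilled;
        inMemory.clear(); // the tuples now live on disk only
        return spilled;
    }

    long size() { return inMemory.size() + spilledCount; }
    int inMemorySize() { return inMemory.size(); }
}

class DataTypeSketch {
    public static void main(String[] args) {
        SketchTuple t = new SketchTuple();
        t.append("alice");
        t.append(42);
        SketchBag bag = new SketchBag();
        bag.add(t);
        System.out.println("spilled " + bag.spill() + " tuple(s), bag size " + bag.size());
    }
}
```

The bag keeps counting spilled tuples in size(), so callers see one logical collection regardless of where the tuples are stored.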
5. Map-Reduce Compilation
- Pig Latin to Logical Plan
  - The parser invokes logicalPlanBuilder
- Logical Plan to Physical Plan
  - LogToPhyTranslationVisitor
  - group, distinct: LR-GR-Pack (LocalRearrange, GlobalRearrange, Package)
  - Join: LR-GR-JoinPack (with inner foreach)
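The LR-GR-Pack chain for a group can be read as three steps: tag each tuple with its key, bring tuples with the same key together, then collect them into one bag per key. The sketch below mimics those steps with plain Java collections. It is an illustration under assumptions, not Pig's operators: GroupSketch and its method names are hypothetical, tuples are modelled as List<Object>, and keys are reduced to strings.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class GroupSketch {
    // Local rearrange: pair each tuple with its group key (field keyIndex).
    static List<Map.Entry<String, List<Object>>> localRearrange(
            List<List<Object>> tuples, int keyIndex) {
        List<Map.Entry<String, List<Object>>> keyed = new ArrayList<>();
        for (List<Object> t : tuples) {
            keyed.add(new SimpleEntry<>(String.valueOf(t.get(keyIndex)), t));
        }
        return keyed;
    }

    // Global rearrange: bring tuples with the same key together.
    // In a MapReduce job this role is played by the shuffle.
    static Map<String, List<List<Object>>> globalRearrange(
            List<Map.Entry<String, List<Object>>> keyed) {
        Map<String, List<List<Object>>> byKey = new TreeMap<>();
        for (Map.Entry<String, List<Object>> e : keyed) {
            byKey.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).add(e.getValue());
        }
        return byKey;
    }

    // Package: emit one output tuple per key, shaped as (key, bag of tuples).
    static List<List<Object>> pack(Map<String, List<List<Object>>> byKey) {
        List<List<Object>> out = new ArrayList<>();
        for (Map.Entry<String, List<List<Object>>> e : byKey.entrySet()) {
            List<Object> tuple = new ArrayList<>();
            tuple.add(e.getKey());
            tuple.add(e.getValue());
            out.add(tuple);
        }
        return out;
    }

    static List<List<Object>> group(List<List<Object>> tuples, int keyIndex) {
        return pack(globalRearrange(localRearrange(tuples, keyIndex)));
    }

    // Small made-up input used by main and by tests.
    static List<List<Object>> sampleInput() {
        List<List<Object>> input = new ArrayList<>();
        input.add(Arrays.<Object>asList("a", 1));
        input.add(Arrays.<Object>asList("b", 2));
        input.add(Arrays.<Object>asList("a", 3));
        return input;
    }

    public static void main(String[] args) {
        for (List<Object> grouped : group(sampleInput(), 0)) {
            System.out.println(grouped);
        }
    }
}
```

A TreeMap is used only so that the output order is deterministic; it is not a claim about how Pig orders groups.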
6. Map-Reduce Compilation
- Physical Plan to Map-Reduce Plan
  - An MROperator stands for one MR job
  - Traverse the physical plan in topological order
  - On a POLoad or a GlobalRearrange, start a new MR operator/job
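The rule on this slide is terse, so the following sketch fixes one reading of it and should be taken as an assumption rather than as Pig's compiler: a Load opens a new job, a GlobalRearrange marks the boundary between the map and reduce phases, and a GlobalRearrange forces a new job only when the current job has already entered its reduce phase. MRPlanSketch, Job, and the operator names passed as strings are hypothetical, and the plan is limited to a linear chain, which is trivially in topological order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class MRPlanSketch {
    // Hypothetical holder for one MR job: operators per phase.
    static class Job {
        final List<String> map = new ArrayList<>();
        final List<String> reduce = new ArrayList<>();
        boolean inReduce = false;

        void add(String op) {
            (inReduce ? reduce : map).add(op);
        }

        @Override
        public String toString() {
            return "map=" + map + " reduce=" + reduce;
        }
    }

    // ops must already be in topological order (a linear chain here).
    static List<Job> compile(List<String> ops) {
        List<Job> jobs = new ArrayList<>();
        Job current = null;
        for (String op : ops) {
            if (op.equals("Load")) {
                // A load always opens a new job.
                current = new Job();
                jobs.add(current);
                current.add(op);
                continue;
            }
            if (current == null) {
                throw new IllegalArgumentException("plan must start with Load");
            }
            if (op.equals("GlobalRearrange")) {
                if (current.inReduce) {
                    // Reduce phase already in use: a second shuffle needs a
                    // new job. A real compiler would also connect the two
                    // jobs through temporary storage; that is omitted here.
                    current = new Job();
                    jobs.add(current);
                }
                current.inReduce = true; // operators after this run in reduce
            } else {
                current.add(op);
            }
        }
        return jobs;
    }

    public static void main(String[] args) {
        List<String> plan = Arrays.asList(
                "Load", "Filter", "LocalRearrange", "GlobalRearrange",
                "Package", "LocalRearrange", "GlobalRearrange", "Package", "Store");
        for (Job job : compile(plan)) {
            System.out.println(job);
        }
    }
}
```

The GlobalRearrange itself is not stored in either phase, since under this reading it corresponds to the shuffle between map and reduce rather than to code that runs inside them.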
12. Reduce Execution
    public boolean processOnePackageOutput(Context oc) throws IOException, InterruptedException {
        // Pull the next packaged tuple from the package operator.
        Result res = pack.getNext(DUMMYTUPLE);
        if (res.returnStatus == POStatus.STATUS_OK) {
            Tuple packRes = (Tuple) res.result;
            // ...
            // Attach the packaged tuple to every root of the plan, then run the pipeline.
            for (int i = 0; i < roots.length; i++) {
                roots[i].attachInput(packRes);
            }
            runPipeline(leaf);
        }
        if (res.returnStatus == POStatus.STATUS_NULL) {
            return false;
        }
        // ...
        if (res.returnStatus == POStatus.STATUS_EOP) {
            return true;
        }
        return false;
    }
13. Physical Plan Execution
- PhysicalPlan extends OperatorPlan<PhysicalOperator>
  - Operations on a graph
  - PhysicalOperator as vertex
- Each vertex has a group of getNext() methods
  - processInput() is called if necessary
14. Physical Plan Execution
    public Result getNext(Tuple t) throws ExecException {
        // ...
        Result res = new Result();
        try {
            res.result = loader.getNext();
            if (res.result == null) {
                // A null tuple from the loader is reported as end of processing.
                res.returnStatus = POStatus.STATUS_EOP;
                tearDown();
            } else {
                res.returnStatus = POStatus.STATUS_OK;
            }
            if (res.returnStatus == POStatus.STATUS_OK) {
                res.result = illustratorMarkup(res, res.result, 0);
            }
        } catch (IOException e) {
            log.error("Received error from loader function: " + e);
            return res;
        }
        return res;
    }
15. Physical Plan Execution
    public Result getNext(Tuple t) throws ExecException {
        Result res = null;
        Result inp = null;
        // The loop body runs at most once: both paths end in break.
        while (true) {
            inp = processInput();
            if (inp.returnStatus == POStatus.STATUS_EOP || inp.returnStatus == POStatus.STATUS_ERR) {
                break;
            }
            illustratorMarkup(inp.result, null, 0);
            // illustrator ignores LIMIT before the post processing
            if ((illustrator == null || illustrator.getOriginalLimit() != -1) && soFar >= mLimit) {
                inp.returnStatus = POStatus.STATUS_EOP;
            }
            soFar++;
            break;
        }
        return inp;
    }
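The two getNext() excerpts above follow the same pull-based pattern: a downstream operator asks its input for the next result and reacts to the returned status. The sketch below reproduces that pattern in a self-contained form. It is not Pig code: PipelineSketch, LoadOp, LimitOp, the Status enum, and this runPipeline are hypothetical simplifications that only borrow the OK, NULL, EOP, and ERR status idea from the slides.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

class PipelineSketch {
    enum Status { OK, NULL, EOP, ERR }

    static final class Result {
        final Status status;
        final Object value;

        Result(Status status, Object value) {
            this.status = status;
            this.value = value;
        }
    }

    interface Operator {
        Result getNext();
    }

    // Source operator: emits items until the input is exhausted, then EOP.
    static final class LoadOp implements Operator {
        private final Iterator<?> source;

        LoadOp(Iterable<?> data) {
            this.source = data.iterator();
        }

        public Result getNext() {
            return source.hasNext()
                    ? new Result(Status.OK, source.next())
                    : new Result(Status.EOP, null);
        }
    }

    // Limit operator: passes results through until the limit is reached.
    static final class LimitOp implements Operator {
        private final Operator input;
        private final long limit;
        private long soFar = 0;

        LimitOp(Operator input, long limit) {
            this.input = input;
            this.limit = limit;
        }

        public Result getNext() {
            Result in = input.getNext();
            if (in.status != Status.OK) {
                return in; // propagate NULL, EOP, and ERR unchanged
            }
            if (soFar >= limit) {
                return new Result(Status.EOP, null);
            }
            soFar++;
            return in;
        }
    }

    // Driver: keep pulling from the leaf until it reports EOP or ERR.
    static List<Object> runPipeline(Operator leaf) {
        List<Object> out = new ArrayList<>();
        while (true) {
            Result r = leaf.getNext();
            if (r.status == Status.OK) {
                out.add(r.value);
            } else if (r.status != Status.NULL) {
                break;
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Operator plan = new LimitOp(new LoadOp(Arrays.asList("a", "b", "c", "d")), 2);
        System.out.println(runPipeline(plan)); // prints [a, b]
    }
}
```

Because every operator exposes the same getNext() contract, operators can be chained in any order without the driver knowing what each one does.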