Taewook Eom 
Data Infrastructure Group 
SK planet 
taewook@sk.com 
2014-09-25 
Programming Cascading
Big Data Processing 
 螻一 
螻 一
Cascading 
http://www.cascading.org/ 
Since 2007, by Chris Wensel (CTO, founder of Concurrent, Inc.)
Cascading 
Shortens the trial-and-error cycle of building data-driven applications, so they can be delivered faster 
An API for defining and managing data workflows and business logic on top of Hadoop
http://docs.cascading.org/cascading/2.5/userguide/pdf/userguide.pdf 
http://docs.cascading.org/impatient/ https://github.com/Cascading/Impatient 
Cascading for the Impatient
Cascading 
The Flow Planner builds the execution plan at compile time p.23 
Assembly errors are reported before anything runs p.31 
Field-level dependencies are checked 
Sources, pipes, and sinks must form a connected data path 
The dependency graph is resolved into a DAG p.37
DAG(Directed Acyclic Graph) 
A common way to model data workflows 
Also used by other large-scale data processing engines: Microsoft Dryad, Apache Tez, Apache Spark
Independence from the Hadoop execution environment 
Logical plans are translated into physical (MapReduce) plans p.24 
Logic can be tested separately from the physical execution p.36 
The same JAR runs at any scale (Same JAR, any scale) p.33 
Business logic, test integration, system tests, exception handling, and failure handling all in one place 
Reduces incidental complexity p.172 
For fast ad-hoc queries Hive is a better fit; Cascading targets ETL-style batch processing p.142 
A structure and approach familiar to Java developers p.172
Cascading Terminology 
http://docs.cascading.org/cascading/2.5/userguide/html/ch03.html#N2013B 3.1 Terminology 
Pipe: Data stream 
Filter: Data operation 
Tuple: Data record 
Branch: a simple chain of connected pipes without forks or merges 
Pipe Assembly: a set of connected pipe branches 
Tuple Stream: the series of Tuples passing through a pipe branch or assembly 
Tap: Data source/sink 
Flow: a pipe assembly connected to source and sink Taps, which makes it executable 
Cascade 
A set of Flows executed together as one logical process 
A Flow does not run until its data dependencies on other Flows are satisfied
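A minimal sketch of wiring two Flows into a Cascade (the flow variables firstFlow and secondFlow are hypothetical); the CascadeConnector orders and runs them according to their data dependencies: 

// secondFlow will not start until the flows producing its sources have completed 
CascadeConnector cascadeConnector = new CascadeConnector(); 
Cascade cascade = cascadeConnector.connect( firstFlow, secondFlow ); 
cascade.complete();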
Pipe Types 
http://docs.cascading.org/cascading/2.5/userguide/html/ch03s03.html#N20276 Types of Pipes 
Each 
Applies a Filter or Function 
A Filter can only remove (or keep) Tuples 
A Function can add or change fields and emit any number of result Tuples 
The default Output Selector for a Function is Fields.RESULTS
Every 
Applies only to the results of GroupBy or CoGroup 
Applies an Aggregator or Buffer 
The Output Selector rules differ slightly between Function, Aggregator, and Buffer
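A minimal sketch (assuming the usual cascading.operation imports; the field names "status", "url", and "hits" are hypothetical) contrasting the two pipe types: 

Pipe pipe = new Pipe( "events" ); 
// Each applies a Filter or Function to every tuple in the stream 
pipe = new Each( pipe, new Fields( "status" ), new RegexFilter( "200" ) ); 
// Every applies an Aggregator (or Buffer) to each grouping created upstream 
pipe = new GroupBy( pipe, new Fields( "url" ) ); 
pipe = new Every( pipe, new Count( new Fields( "hits" ) ) );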
http://docs.cascading.org/cascading/2.5/userguide/html/ch03s03.html#N20438 The Each and Every Pipes
Buffer vs. Aggregator 
Similarities 
Both operate only on the results of a GroupBy or CoGroup 
The default Output Selector for Aggregator and Buffer is Fields.ALL 
Differences 
Aggregators can be chained; Buffers cannot 
A Buffer can emit any number of result Tuples per group 
A Buffer could implement the same logic as an Aggregator, but Aggregators can be optimized further and parallelize better
pipe = new GroupBy(pipe, new Fields("mdn"), new Fields("log_time")); 
pipe = new Every(pipe, new Count(new Fields("count"))); 
pipe = new Every(pipe, new Fields("mdn"), new DistinctCount(new Fields("unique_mdn_cnt"))); 
pipe = new Every(pipe, new Fields("pay_amt"), new Sum(new Fields("sum"), long.class)); 
pipe = new Every(pipe, new Fields("log_time"), new Last(new Fields("last_time")));
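For contrast, a minimal custom Buffer sketch (FirstNBuffer and its field names are hypothetical, not part of the deck) showing how a Buffer iterates a whole group and may emit several result tuples, which an Aggregator cannot do: 

import java.util.Iterator; 
import cascading.flow.FlowProcess; 
import cascading.operation.BaseOperation; 
import cascading.operation.Buffer; 
import cascading.operation.BufferCall; 
import cascading.tuple.Fields; 
import cascading.tuple.TupleEntry; 

public class FirstNBuffer extends BaseOperation implements Buffer { 
  private final int n; 

  public FirstNBuffer( Fields fieldDeclaration, int n ) { 
    super( fieldDeclaration ); 
    this.n = n; 
  } 

  @Override 
  public void operate( FlowProcess flowProcess, BufferCall bufferCall ) { 
    // unlike an Aggregator, a Buffer sees the whole group as an iterator 
    // and may emit zero, one, or many result tuples for that group 
    Iterator<TupleEntry> arguments = bufferCall.getArgumentsIterator(); 
    int emitted = 0; 
    while( arguments.hasNext() && emitted < n ) { 
      bufferCall.getOutputCollector().add( arguments.next().getTupleCopy() ); 
      emitted++; 
    } 
  } 
} 

In a flow it would be attached with something like new Every( groupedPipe, new Fields( "log_time" ), new FirstNBuffer( new Fields( "first_time" ), 3 ), Fields.ALL ).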
Pipe Types 
http://docs.cascading.org/cascading/2.5/userguide/html/ch03s03.html#N20276 Types of Pipes 
Merge 
Unsorted merge 
Merges two or more pipes that declare identical fields into a single stream 
Faster than GroupBy when no grouping is needed (an Aggregator/Buffer cannot follow; see the Merge sketch after the GroupBy example below) 
GroupBy 
Sorted merge on the grouping key 
Can also merge multiple pipes, as long as they declare the same field names 
Groups on the grouping fields (with an optional secondary sort on value fields) 
Grouping is usually the preparation step for an Every pipe 
Slower than Merge, since the grouping fields must be sorted and compared 
Grouping fields are sorted in their natural order by default 
Secondary sort 
Sorts the values inside each group without an extra job, which is faster than sorting afterwards
Fields sortFields = new Fields("value1", "value2"); 
sortFields.setComparator("value1", Collections.reverseOrder()); 
Pipe groupBy = new GroupBy(assembly, groupFields, sortFields);
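As referenced above, a minimal Merge sketch (pipe names are hypothetical); both sides must declare identical fields: 

// combines the tuple streams without sorting or grouping, 
// so it cannot be followed directly by an Every pipe 
Pipe merged = new Merge( "merged", lhsPipe, rhsPipe );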
Pipe Types 
http://docs.cascading.org/cascading/2.5/userguide/html/ch03s03.html#N20276 Types of Pipes 
Joins streams that carry different fields, matching on the values of common key fields 
CoGroup 
SQL-style joins (InnerJoin, OuterJoin, LeftJoin, RightJoin, MixedJoin) 
For outer joins, fields with no match are filled with null 
The result carries every field of every input stream, so field names must not collide across streams 
If names collide, they can be renamed with the declaredFields argument (see the sketch below) 
Only the names change; the developer still has to keep track of the field order 
For speed, every stream except the left-most accumulates the tuples (the bag) for each unique grouping key in memory 
Beyond a configurable threshold the tuples spill from memory to disk (hurting performance) 
Set the threshold too high and memory can run out 
Best performance: put the streams with the fewest tuples per key on the right and tune the spill threshold 
HashJoin 
Optimized for joining one or more very small streams against a large stream (map-side join) 
The right-hand streams are loaded into memory in full for fast lookups (the whole stream, not one group at a time) 
Faster than CoGroup for simple joins that need no grouping 
Because no grouping is produced, it cannot be followed by an Aggregator or Buffer 
Placing a Checkpoint just before a HashJoin keeps the accumulated stream from being read and recomputed repeatedly
http://docs.cascading.org/cascading/2.5/userguide/html/ch03s03.html#N20630 CoGroup
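As mentioned above, a sketch of renaming colliding fields with declaredFields (the pipes and fields are hypothetical: lhsPipe carries (id, name), rhsPipe carries (id, amount)): 

// declaredFields must list one new name for every field of both inputs, in order 
Fields declared = new Fields( "lhs_id", "name", "rhs_id", "amount" ); 
Pipe joined = new CoGroup( lhsPipe, new Fields( "id" ), 
                           rhsPipe, new Fields( "id" ), 
                           declared, new InnerJoin() );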
String inPath = args[ 0 ]; 
String outPath = args[ 1 ]; 
Properties properties = new Properties(); 
AppProps.setApplicationJarClass( properties, Main.class ); 
HadoopFlowConnector flowConnector = new HadoopFlowConnector( properties ); 
Tap inTap = new Hfs( new TextDelimited( true, "\t" ), inPath ); 
Tap outTap = new Hfs( new TextDelimited( true, "\t" ), outPath ); 
Pipe copyPipe = new Pipe( "copy" ); 
FlowDef flowDef = FlowDef.flowDef() 
  .addSource( copyPipe, inTap ) 
  .addTailSink( copyPipe, outTap ); 
flowConnector.connect( flowDef ).complete();
https://github.com/Cascading/Impatient/blob/master/part1/src/main/java/impatient/Main.java 
p.29 1.2 The first Cascading application
... 
Tap docTap = new Hfs( new TextDelimited( true, "\t" ), docPath ); 
Tap wcTap = new Hfs( new TextDelimited( true, "\t" ), wcPath ); 
Fields token = new Fields( "token" ); 
Fields text = new Fields( "text" ); 
RegexSplitGenerator splitter = new RegexSplitGenerator( token, "[ \\[\\]\\(\\),.]" ); 
Pipe docPipe = new Each( "token", text, splitter, Fields.RESULTS ); 
Pipe wcPipe = new Pipe( "wc", docPipe ); 
wcPipe = new GroupBy( wcPipe, token ); 
wcPipe = new Every( wcPipe, Fields.ALL, new Count(), Fields.ALL ); 
FlowDef flowDef = FlowDef.flowDef().setName( "wc" ) 
  .addSource( docPipe, docTap ).addTailSink( wcPipe, wcTap ); 
...
https://github.com/Cascading/Impatient/blob/master/part2/src/main/java/impatient/Main.java 
p.37 1.5 Drawing the flow diagram
... 
Fields token = new Fields( "token" ); 
Fields text = new Fields( "text" ); 
RegexSplitGenerator splitter = new RegexSplitGenerator( token, "[ \\[\\]\\(\\),.]" ); 
Fields fieldSelector = new Fields( "doc_id", "token" ); 
Pipe docPipe = new Each( "token", text, splitter, fieldSelector ); 
Fields scrubArguments = new Fields( "doc_id", "token" ); 
docPipe = new Each( docPipe, scrubArguments, new ScrubFunction( scrubArguments ), Fields.RESULTS ); 
Pipe wcPipe = new Pipe( "wc", docPipe ); 
wcPipe = new Retain( wcPipe, token ); 
wcPipe = new GroupBy( wcPipe, token ); 
wcPipe = new Every( wcPipe, Fields.ALL, new Count(), Fields.ALL ); 
FlowDef flowDef = FlowDef.flowDef().setName( "wc" ) 
  .addSource( docPipe, docTap ).addTailSink( wcPipe, wcTap ); 
Flow wcFlow = flowConnector.connect( flowDef ); 
wcFlow.writeDOT( "dot/wc.dot" ); 
wcFlow.complete(); 
...
https://github.com/Cascading/Impatient/blob/master/part3/src/main/java/impatient/Main.java 
doc_id  text 
doc01   A rain shadow is a dry area on the lee back ... 
doc02   This sinking, dry air produces a rain shadow, ... 
doc03   A rain shadow is an area of dry land that lies ...
p.55 2.2 Scrubbing tokens
public class ScrubFunction extends BaseOperation implements Function { 
  public ScrubFunction( Fields fieldDeclaration ) { 
    super( 2, fieldDeclaration ); 
  } 

  public void operate( FlowProcess flowProcess, FunctionCall functionCall ) { 
    TupleEntry argument = functionCall.getArguments(); 
    String doc_id = argument.getString( 0 ); 
    String token = scrubText( argument.getString( 1 ) ); 

    if( token.length() > 0 ) { 
      Tuple result = new Tuple(); 
      result.add( doc_id ); 
      result.add( token ); 
      functionCall.getOutputCollector().add( result ); 
    } 
  } 

  public String scrubText( String text ) { 
    return text.trim().toLowerCase(); 
  } 
}
https://github.com/Cascading/Impatient/blob/master/part3/src/main/java/impatient/ScrubFunction.java 
p.49 2.1 Writing a custom operation
p.55 2.2 Scrubbing tokens
... 
String stopPath = args[ 2 ]; 
... 
Fields stop = new Fields( "stop" ); 
Tap stopTap = new Hfs( new TextDelimited( stop, true, "\t" ), stopPath ); 
... 
Fields scrubArguments = new Fields( "doc_id", "token" ); 
docPipe = new Each( docPipe, scrubArguments, new ScrubFunction( scrubArguments ), Fields.RESULTS ); 
Pipe stopPipe = new Pipe( "stop" ); 
Pipe tokenPipe = new HashJoin( docPipe, token, stopPipe, stop, new LeftJoin() ); 
tokenPipe = new Each( tokenPipe, stop, new RegexFilter( "^$" ) ); 
Pipe wcPipe = new Pipe( "wc", tokenPipe ); 
wcPipe = new Retain( wcPipe, token ); 
wcPipe = new GroupBy( wcPipe, token ); 
wcPipe = new Every( wcPipe, Fields.ALL, new Count(), Fields.ALL ); 
FlowDef flowDef = FlowDef.flowDef() 
  .setName( "wc" ) 
  .addSource( docPipe, docTap ) 
  .addSource( stopPipe, stopTap ) 
  .addTailSink( wcPipe, wcTap ); 
...
https://github.com/Cascading/Impatient/blob/master/part4/src/main/java/impatient/Main.java 
stop a about after all along an and Any ...
p.57 2.3 Replicated join 
p.57 2.3 Replicated join
... 
String tfidfPath = args[ 3 ]; 
... 
Fields fieldSelector = new Fields( "doc_id", "token" ); 
tokenPipe = new Retain( tokenPipe, fieldSelector ); 
... 
Pipe tfPipe = new Pipe( "TF", tokenPipe ); 
Fields tf_count = new Fields( "tf_count" ); 
tfPipe = new CountBy( tfPipe, new Fields( "doc_id", "token" ), tf_count ); 
Fields tf_token = new Fields( "tf_token" ); 
tfPipe = new Rename( tfPipe, token, tf_token ); 
Fields doc_id = new Fields( "doc_id" ); 
Fields tally = new Fields( "tally" ); 
Fields rhs_join = new Fields( "rhs_join" ); 
Fields n_docs = new Fields( "n_docs" ); 
Pipe dPipe = new Unique( "D", tokenPipe, doc_id ); 
dPipe = new Each( dPipe, new Insert( tally, 1 ), Fields.ALL ); 
dPipe = new Each( dPipe, new Insert( rhs_join, 1 ), Fields.ALL ); 
dPipe = new SumBy( dPipe, rhs_join, tally, n_docs, long.class ); 
Pipe dfPipe = new Unique( "DF", tokenPipe, Fields.ALL ); 
Fields df_count = new Fields( "df_count" ); 
dfPipe = new CountBy( dfPipe, token, df_count ); 
Fields df_token = new Fields( "df_token" ); 
Fields lhs_join = new Fields( "lhs_join" ); 
dfPipe = new Rename( dfPipe, token, df_token ); 
dfPipe = new Each( dfPipe, new Insert( lhs_join, 1 ), Fields.ALL ); 
Pipe idfPipe = new HashJoin( dfPipe, lhs_join, dPipe, rhs_join );
https://github.com/Cascading/Impatient/blob/master/part5/src/main/java/impatient/Main.java 
p.71 3.1 Implementing TF-IDF
tfPipe: (doc_id, tf_token, tf_count) 
dPipe: (doc_id) 
dfPipe: (doc_id, token) 
tfPipe: (doc_id, token, tf_count) 
dPipe: (doc_id, tally) 
dPipe: (doc_id, tally, rhs_join) 
dPipe: (rhs_join, n_docs) 
dfPipe: (token, df_count) 
dfPipe: (df_token, df_count) 
dfPipe: (df_token, df_count, lhs_join) 
idfPipe: (df_token, df_count, lhs_join, rhs_join, n_docs)
Pipe tfidfPipe = new CoGroup( tfPipe, tf_token, idfPipe, df_token ); 
Fields tfidf = new Fields( "tfidf" ); 
String expression = "(double) tf_count * Math.log( (double) n_docs / ( 1.0 + df_count ) )"; 
ExpressionFunction tfidfExpression = new ExpressionFunction( tfidf, expression, Double.class ); 
Fields tfidfArguments = new Fields( "tf_count", "df_count", "n_docs" ); 
tfidfPipe = new Each( tfidfPipe, tfidfArguments, tfidfExpression, Fields.ALL ); 
fieldSelector = new Fields( "tf_token", "doc_id", "tfidf" ); 
tfidfPipe = new Retain( tfidfPipe, fieldSelector ); 
tfidfPipe = new Rename( tfidfPipe, tf_token, token ); 
Pipe wcPipe = new Pipe( "wc", tfPipe ); 
Fields count = new Fields( "count" ); 
wcPipe = new SumBy( wcPipe, tf_token, tf_count, count, long.class ); 
wcPipe = new Rename( wcPipe, tf_token, token ); 
wcPipe = new GroupBy( wcPipe, count, count ); 
FlowDef flowDef = FlowDef.flowDef() 
  .setName( "tfidf" ) 
  .addSource( docPipe, docTap ) 
  .addSource( stopPipe, stopTap ) 
  .addTailSink( tfidfPipe, tfidfTap ) 
  .addTailSink( wcPipe, wcTap ); 
...
https://github.com/Cascading/Impatient/blob/master/part5/src/main/java/impatient/Main.java 
p.71 3.1 Implementing TF-IDF
tfidfPipe: (doc_id, tf_token, tf_count, df_token, df_count, lhs_join, rhs_join, n_docs)
tfidfPipe: (doc_id, tf_token, tf_count, df_token, df_count, lhs_join, rhs_join, n_docs, tfidf) 
tfidfPipe: (tf_token, doc_id, tfidf) 
tfidfPipe: (token, doc_id, tfidf)
p.71 3.1 Implementing TF-IDF
Programming Tips 
Local Mode 
Develop, test, and demo against local files, without a Hadoop cluster 
Does not use the Hadoop APIs; runs entirely in memory (watch memory usage) 
Local testing works, but there are subtle API differences between local mode and Hadoop mode 
Use cascading-local-2.0.x.jar instead of cascading-hadoop-2.0.x.jar 
Use FileTap and LocalFlowConnector
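A minimal local-mode sketch (paths are hypothetical), assuming cascading.tap.local.FileTap, cascading.scheme.local.TextDelimited, and cascading.flow.local.LocalFlowConnector: 

// the pipe assembly is unchanged; only the taps and the connector are swapped 
Tap inTap = new FileTap( new TextDelimited( true, "\t" ), "data/in.tsv" ); 
Tap outTap = new FileTap( new TextDelimited( true, "\t" ), "output/out.tsv" ); 
Pipe copyPipe = new Pipe( "copy" ); 
FlowDef flowDef = FlowDef.flowDef() 
  .addSource( copyPipe, inTap ) 
  .addTailSink( copyPipe, outTap ); 
new LocalFlowConnector().connect( flowDef ).complete();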
Test p.80 
CascadingTestCase 
Debug http://docs.cascading.org/cascading/2.5/userguide/html/ch09s02.html 
Assert 
http://docs.cascading.org/cascading/2.5/userguide/html/ch08s02.html 
http://docs.cascading.org/cascading/2.5/userguide/html/ch09s09.html 
Trap http://docs.cascading.org/cascading/2.5/userguide/html/ch08s03.html 
Sample http://docs.cascading.org/cascading/2.5/userguide/html/ch09s03.html 
Checkpoint 
http://docs.cascading.org/cascading/2.5/userguide/html/ch08s04.html 
http://docs.cascading.org/cascading/2.5/userguide/html/ch08s05.html
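A hedged sketch of the Debug filter (cascading.operation.Debug) and a failure trap; the trap path, taps, and pipe are hypothetical: 

// Debug prints each tuple (and optionally the field names) as it passes this point 
pipe = new Each( pipe, new Debug( "after-scrub", true ) ); 

// tuples whose operations throw are diverted to the trap tap instead of failing the flow 
Tap trapTap = new Hfs( new TextDelimited( true, "\t" ), trapPath ); 
FlowDef flowDef = FlowDef.flowDef() 
  .addSource( pipe, sourceTap ) 
  .addTailSink( pipe, sinkTap ) 
  .addTrap( pipe, trapTap );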
Programming Tips 
Split the work into meaningful SubAssemblies and Flows, and connect the Flows with a Cascade 
When connecting Flows 
Heads, Tails, and Assemblies are wired together by name, so naming them carefully matters 
The flows form a DAG, but each separate sink branch must be connected under its own name 
Pipe names and field names are plain strings, so mistakes in them only surface as runtime errors 
Be careful with underscores, punctuation, and hyphens in field names 
They cause errors when the field is used in an Expression, which is compiled with the Janino compiler (see the sketch after this list) 
"first-name" is a legal field name, but inside an Expression first-name.trim() is parsed as an expression, causing a Janino runtime error 
Implementing a Function directly instead of using an Expression function avoids the Janino issue 
When sorting in a GroupBy, make the class types of the sort fields match first 
Numbers read from HDFS arrive as text/String values by default, so convert them to numeric types 
Keep the state an Operation needs to a minimum and pass it in through the operation's constructor arguments
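A minimal sketch of a Janino-backed expression operation (the field name "first_name" is hypothetical); the field must be a legal Java identifier: 

// ExpressionFilter removes tuples for which the expression evaluates to true; 
// "first_name" works as an identifier where "first-name" would break the expression 
Fields firstName = new Fields( "first_name" ); 
pipe = new Each( pipe, firstName, 
                 new ExpressionFilter( "first_name.trim().length() == 0", String.class ) );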
Programming Tips 
Setting the number of reducers 
Number of reducers for the intermediate (GroupBy/CoGroup) stages: mapred.reduce.tasks 
Number of output parts written at the final sink: setNumSinkParts
Properties properties = new Properties(); 
properties.put("mapred.reduce.tasks", "10"); 
properties.put("mapred.map.tasks.speculative.execution", "true"); 
properties.put("mapred.reduce.tasks.speculative.execution", "false"); 
properties.put("mapred.job.priority", "HIGH"); 
AppProps.setApplicationJarClass(properties, Main.class); 
FlowConnector flowConnector = new HadoopFlowConnector(properties);
TextDelimited scheme = new TextDelimited(new Fields(key, value), true, "\t"); 
scheme.setNumSinkParts(1); 
Tap sinkTap = new Hfs(scheme, outputPath, SinkMode.REPLACE);
http://docs.cascading.org/cascading/2.5/userguide/html/ch09.html 9. Built-In Operations 
Identity Function 
Text Functions 
Regular Expression Operations 
Java Expression Operations 
Buffers 
http://docs.cascading.org/cascading/2.5/userguide/html/ch10.html 10. Built-in Assemblies 
AggregateBy (AverageBy, CountBy, SumBy, FirstBy) 
Rename 
Retain 
Unique 
http://docs.cascading.org/cascading/2.5/userguide/html/ch13.html 13. Cookbook 
Programming Tips
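A hedged sketch of composing the AggregateBy sub-assemblies listed above (the field names "user" and "amount" are hypothetical): 

// AggregateBy composes several *By assemblies into one grouping pass, 
// with map-side partial aggregation before the reduce 
Fields user = new Fields( "user" ); 
Fields amount = new Fields( "amount" ); 
pipe = new AggregateBy( pipe, user, 
  new CountBy( new Fields( "purchases" ) ), 
  new SumBy( amount, new Fields( "total" ), long.class ) );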
Questions? 
Questions.foreach( answer(_) )
public class DistinctCount extends BaseOperation<HashSet<String>> implements Aggregator<HashSet<String>> { 
  public DistinctCount(Fields fieldDeclaration) { 
    super(fieldDeclaration); 
  } 

  @Override 
  public void start(FlowProcess flowProcess, AggregatorCall<HashSet<String>> aggregatorCall) { 
    if (aggregatorCall.getContext() == null) { 
      aggregatorCall.setContext(new HashSet<String>()); 
    } else { 
      aggregatorCall.getContext().clear(); 
    } 
  } 

  @Override 
  public void aggregate(FlowProcess flowProcess, AggregatorCall<HashSet<String>> aggregatorCall) { 
    TupleEntry argument = aggregatorCall.getArguments(); 
    HashSet<String> context = aggregatorCall.getContext(); 
    context.add(argument.getTuple().toString()); 
  } 

  @Override 
  public void complete(FlowProcess flowProcess, AggregatorCall<HashSet<String>> aggregatorCall) { 
    aggregatorCall.getOutputCollector().add(new Tuple(aggregatorCall.getContext().size())); 
  } 
}
