狠狠撸

狠狠撸Share a Scribd company logo
Spark Outlook
jiessiecao@sohu-inc.com
12/18/2014 @9-冯诺依曼
Outlines
? Requirements
? Features
? How Spark Works?
? Spark Deployment Issues
? Spark Use Case
? Spark Programming
? Q&A
? Requirements
1. Iterative computing
(Sohu Shipin
Recommendation)
2. Before Spark
3. Other Requirements
Requirements: Iterative computing
Optimization Methods in Machine Learning
Iterative methods
SGD for for LR(Stochastic Gradient Descent)
Newton’s XXXX methods
big-data :
? sophisticated algorithm, but long computing time
? simple algorithm,but error-prone,need iterative
and convergence
E.g. Recommendation System(Sohu Shipin)
Possible Requirements:
1.User CF(realtime session user log analysis)
2. Item CF(tag,label based,clustering?)
3.LFM (SGD based latent fact model)
4.Graph-based (PersonalRank in User-Item bipartgraph)
Before Spark
Issues in iterative computing
1. Many iterative process
2. Cached dataset or mediate result
Hadoop的问题
Hadoop 仅支持1次iteration,且每次迭代都要reload
data
iterative mapreduce
twist,haloop等
优化不成熟,也受限于计算模型
Mahout 本来只支持Hadoop,现在支持Spark,并加入
DSL语言与scala交互
interactive computing
Hive + Distributed cache
厂辫补谤办调研串讲
Other Requirements
1. Offline Batch Process
Hadoop Ecosystem(Map-Reduce)
Dryad(Microsoft DAG workflow+scope)
2. Stream Computing
Twitter Storm(Spout/Bolt Model)
Yahoo S4(Actors Model)
Google Percolator,IBM SPC
Kafka,Flume(Pub-Sub Pipe)
3. Iterative Computing
Spark Ecosystem (RDD,DAG)
Iterative MapReduce(Haloop,Twist)
Petuum(SSP model)
4. Graph Computing(Iterative+Messaging,e.g. PageRank)
Google Pregel(BSP model)
Apache Giraph(BSP model)
Apache HAMA(BSP+Matrix)
http://spark-summit.org/wp-content/uploads/2013/10/Feng-Andy-SparkSummit-Keynote.pdf
Features
1. What Spark Provide?
2. WordCount In Spark
3. RDD model
4. Spark DBAS
Ecosystem
What Spark Provides
? http://spark.apache.org/ 官方特性介绍如下
? Speed (v.s. Hadoop, in memory 快100倍,on disk
快10倍,支持DAG)
? High level API(Scala,Java,Python,RDD算子)
? Generality (Combine SQL, streaming, and
complex analytics,MLlib,GraphX)
? Runs Everywhere(Yarn,Mesos,Standalone)
First Glance: WordCount(Hadoop v.s Spark)
public class WordCount {
16
17 public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
18 private final static IntWritable one = new IntWritable(1);
19 private Text word = new Text();
20
21 public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
22 String line = value.toString();
23 StringTokenizer tokenizer = new StringTokenizer(line);
24 while (tokenizer.hasMoreTokens()) {
25 word.set(tokenizer.nextToken());
26 context.write(word, one);
27 }
28 }
29 }
30
31 public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
32
33 public void reduce(Text key, Iterable<IntWritable> values, Context context)
34 throws IOException, InterruptedException {
35 int sum = 0;
36 for (IntWritable val : values) {
37 sum += val.get();
38 }
39 context.write(key, new IntWritable(sum));
40 }
41 }
42
43 public static void main(String[] args) throws Exception {
44 Configuration conf = new Configuration();
45
46 Job job = new Job(conf, "wordcount");
47
48 job.setOutputKeyClass(Text.class);
49 job.setOutputValueClass(IntWritable.class);
50
51 job.setMapperClass(Map.class);
52 job.setReducerClass(Reduce.class);
53
54 job.setInputFormatClass(TextInputFormat.class);
55 job.setOutputFormatClass(TextOutputFormat.class);
56
57 FileInputFormat.addInputPath(job, new Path(args[0]));
58 FileOutputFormat.setOutputPath(job, new Path(args[1]));
59
60 job.waitForCompletion(true);
61 }
val file = spark.textFile("hdfs://...")
val counts = file.flatMap(line => line.split(" "))
.map(word => (word, 1))
.reduceByKey(_ + _)
counts.saveAsTextFile("hdfs://...")
file = spark.textFile("hdfs://...")
counts = file.flatMap(lambda line: line.split(" "))

.map(lambda word: (word, 1)) 
.reduceByKey(lambda a, b: a + b)
counts.saveAsTextFile("hdfs://...")
JavaRDD<String> file =
spark.textFile("hdfs://...");
JavaRDD<String> words = file.flatMap(new
FlatMapFunction<String, String>() {
public Iterable<String> call(String s) { return
Arrays.asList(s.split(" ")); }
});
JavaPairRDD<String, Integer> pairs =
words.mapToPair(new PairFunction<String,
String, Integer>() {
public Tuple2<String, Integer> call(String s) {
return new Tuple2<String, Integer>(s, 1); }
});
JavaPairRDD<String, Integer> counts =
pairs.reduceByKey(new Function2<Integer,
Integer>() {
public Integer call(Integer a, Integer b) { return
a + b; }
});
counts.saveAsTextFile("hdfs://...");
Map-Reduce Model
<K1, V1> <K2, V2>
用户定义的Map函数
<K3, V3>
shuffle,然后经过用户定义的Reduce函数
<K2, V2>
shuffle,然后经过用户定义的Reduce函数
<K2, V2>
某些时候可以有combine过程,再执行一次用户定义的Reduce函数
MR 的缺点:
1. Shuffle 的性能。
Map 到 reduce 之间数据多
次需要 IO 操作。
2. 当有多个 MR 时,每
轮的 MR 之间需要将结果
写到 hdfs 上,才能迭代
3. 只有 map,reduce
二种计算模型,无法建立一
组 DAG 操作,来减少中间的
一些操作开销
RDD Model
? RDD:Resilient Distributed Datasets,Spark核心数据结构
? 逻辑上
? 一个大的分布式只读Array,含有多个Partition
? 两种创建方式:从文件系统输入或从父RDD转换得到
? 用两类操作算子进行变换:Transformation,Action
? 物理上
? 一个元数据结构,存储着逻辑Patition和物理Block(磁
盘或内存),Node等映射关系,以及RDD之前的依赖转换
关系
? 依赖关系:宽依赖,窄依赖
RDD Model
分布式存储HDFS,HBase,Cassandra
输入函数textFile,parallelize等
RDD_0
RDD_cache
RDD_2
filter,map等Transformation算子
RDD_1
cache算子
Scala集合和数据类型
saveAsTextFile等Action算子
val file = spark.textFile("hdfs://...")
val counts = file.flatMap(line => line.split(" "))
.map(word => (word, 1))
.reduceByKey(_ + _)
counts.saveAsTextFile("hdfs://...")
共享变量
1. broadcast(hadoop
distributed cache)
2. accumulator(hadoop
counter)
BlockManager
? DiskStore
? MemoryStore
RDD Model
1个SparkApplication
根据Action的数目N变为
N个RDD DAG Graph
(即N个Job)
Stage以shuffle为边界进行划分(
如左图),
形成M个Stage DAG,
(每个stage一个task,
即M个task)
RDD Graph(Job)的划分依据
对于需要output或者转换为外部对象(scala)等为Action操作,没遇到Action操作,RDD DAG
Graph则为止
Stage(Task)的划分依据
根据RDD之间依赖关系分为二种依赖类型: Narrow 和 Wide(Shuffle Dependency)。
Narrow Dependency 指的是 child RDD 只依赖于 parent RDD(s)固定数量的 partition。
Wide Dependency 指的是 child RDD 的每一个 partition 都依赖于 parent RDD(s)所有 partition。
具体的理解为 Wide 依赖是需要结点之间进行 shuffle 操作的,其他的就是 Narrow。
所以 spark 将 shuffle 操作定义为 stage 的边界,有 shuffle 操作的 stage 叫做 ShuffleMapStage, 对
应的 task 叫做 ShuffleMapTask,其他的就是 ResultTask。
RDD Cache(TODO)
RDD persisting
1. persist() StorageLevel
2. cache() 其实是StorageLevel为内容的persist()
ark.apache.org/docs/latest/programming-guide.html#rdd-persiste
MEMORY_ONLY: default
OFF_HEAP (experimental):Tachyon
MEMORY_ONLY_2, MEMORY_AND_DISK_2: 2备份?(TODO)
Fault Tolerance(Lineage机制,Narrow Dependencies)
1. Lineage 机制
1. 粗粒度的重算方式(不像mysql 备份,binlog,hadoop直接依赖HDFS多备份等机制)
2. 两种依赖:
1. Narrow Dependencies
parent:child = N:1 (N>=1)
只需重算对应parent的子分区数据
partition0
partition1
partition2
partition3
RDD_0
partition0
partition1
partition2
partition3
RDD_1
partition0
partition1
partition2
partition3
RDD_0
partition0
partition1
RDD_1
Narrow Dependencies
? Shuffle Dependencies(Wide Dependencies)
parent:child = 1:N(N>2)
其中一个child失败要整个parent的所有分区
来一遍,有些是未丢失的child RDD数据,
有冗余的计算
Fault Tolerance(Lineage机制,Wide Dependencies)
Wide Dependencies
partition0
partition1
partition2
partition3
RDD_0
partition0
partition1
RDD_1
partition2
Fault Tolerance (Check Point)作为辅助
Lineage的问题:
1. 如果过长,重算开销很大(如pagerank)
2. Shuffle Dependency直接做check point收益更大
CheckPoint
1. 一致性问题(对spark没有这问题,RDD为只读的)
2. 本质就是写入HDFS(? //TODO,那standalone?)
How Spark
works?
1. Key Concepts
2. Key Components
3. Key Procedure
4. Deployment issues
5. Submit
Key Concepts
Driver Program
运行Spark Application的main函数,并创建SparkContext.
根据Driver 是否在集群中运行,分为Cluster模式和Client模式。负责应用的解析,切分Stage,并调度task到
executor执行,包含DAGSchduler等重要的组件对象,并且在最后负责汇总各子task结果,因此需要和work
节点的双向通信。
RDD Grpah
每次到一个Action时,将之前的所有动作形成RDD Graph,整个RDD Graph成为一个Job提交,一个Spark App
会有多个Action,即多个RDD Graph,因此一个spark app会有多个Job,而且对于多线程提交的job,是可能有同
时有多个Job的
Job
运行sparkContext的driver program 通过runJob方法向Spark提交Job.
Stage
每个Job根据RDD宽依赖关系(shuffle dependency)切分为多个Stage,一个RDD Graph会有多个Stage,每
个Stage包含相同的一组Task, 一组Task也叫TaskSet,task之间是并行的.
Task
一个分区对应一个Task,Task执行RDD对应的Stage中包含的算子,Task被封装后放入Executor创建的线程池
中执行,并发执行对当前节点中的RDD分区数据的计算。Task是spark中的最小的调度单位。其中一个RDD
包含在不同节点上的数据块,调度器会将包含操作的task分到指定的节点上,data locality. 其中一个
Application 只有一个TaskScheduler,它用于对该Application中所有action触发的Job中的taskSetManager进行
调度。
Key Procedure
Procedure in Standalone
Master or ClusterScheduler
RDD Graph1-N
d0.根据action为限,划分job和RDDGraph
Stage DAG1-N
Worker1
Executor1-N
BlockManager
BlockManager
ConnectionManager
BroadcastManager
CacheManager
Worker2
BlockManager
ConnectionManager
BroadcastManager
CacheManager
-1.启动时worker向master注册资源
SparkContext in Driver
ExecutorBackend
DAGScheduler
d1将RDD Graph分为StageDAG
TaskScheduler
SchedulerBackend
d2调度TasksetManager,并分发task
Executor1-N
ExecutorBackend
2. executorbackend直接注册到driver的schedulerBackend注册,同时有后续的心跳等
3.由TasksetManager分发task给executor执行
4. 执行完,有些需要返回结果或状态给driver, driver命令executor销毁
0.Driver向master注册application
1.master命令worker启动executor
ExecutorRunner ExecutorRunner
Akka轻量级异步事件网络框架(Actor)
Procedure in Spark-on-Yarn
Yarn Spark Client Yarn Map Reduce Client Yarn MPI Client
Resource Manager
Node Manager
Spark Application Master
ClusterScheduler
==Master in standalone
Node Manager
Container:
Worker
StandaloneExecutorBackend
Node Manager
spark submission
将app master部署好
申请资源
注册executor
Container:
Worker
StandaloneExecutorBackend
Spark
Production
(产物化)
Issues
? Scheduling
? Security
? Dashboard,Platform
Deployment Model:Standalone,Local,Yarn,Mesos
? Standalone model时, master和worker以deamon运行,不支持cluster模式提交
? 配合slaves文件,然后./sbin/start-all.sh 即可在master节点上启动master进程,在worker节点启动worker进程
? master启动
1. Spark.Secutiry
2. 支持zookeeper方式,elect leader
3. WebUI默认8080
4. 服务端口7077,注册Worker
? worker 启动
? Spark.Security
? webui 8081
? 主动连接master,完成注册
? local model 时,并没有启动master,只是根据local参数启动
临时worker thread运行
? ./bin/spark-submit --class org.apache.spark.examples.SparkPi lib/spark-examples-1.0.2-
hadoop2.2.0.jar --master local 2
? 适合调试使用,和deploymode无关,除控制台,无日志和历史
Deployment Model:Standalone,Local,Yarn,Mesos
? 和Standalone的区别是,使用Yarn和Mesos的资源
管理功能进行调度,如下:
? 1. 没有固定的master和worker节点,worker节点
资源是临时调度和注册的
? 2. Yarn模式支持将Driver跑在cluster中,而
standalone和Mesos目前不支持
? 3.在Yarn模式下,因为master webui和work
webui生命周期和job一致,因此无法利用spark
master和slave webui查看job history. spark job
history server提供一个方案,需要进一步深入
Deployment Issues: Scheduling
? Scheduling Requirements:(Spark v.s Hadoop)
? 相同点:
? 主要还是批处理作业的多用户多队列调度
? 都有across applications和within applications两层调度
? Across Application的调度由yarn,mesos等实现
? Within applications的调度(包括data locality),主要由hadoop或
spark本身实现.
? 不同点(TODO)
? Spark的有更多交互式分析任务(spark-shell),要求及时返回结果,
可能需要有单独高QoS的队列或额外的调度方式
? 特殊的ML等作业,多轮迭代和shuffle,IO以及Memory密集
Scheduling Across Applications
? Yarn模式调度的优点
? Yarn对细粒度的支持情况,需要进一步调研(TODO)
? spark本身已实现了yarn-client和对应的AM
? 包括HDFS集群上已有数据共享,计算动态扩充,以及运维等
? 也可复用kerberos安全等机制
? Mesos的调度比Yarn粒度更细。
? 支持coarse的静态资源配置,动态的调度,后者能复用空闲资源,提升利用率,对调度有较高要的可以
考虑
? 多用户支持,以及webui的资源等管理(TODO)
? Standalone模式,
? 自身调度比较简单,默认FIFO,Application使用节点所有当前可用资源,也可用参数限制最大值
? 也需要看具体调度需求,如multiuser等支持情况。(TODO)
Scheduling Across Jobs
? 由Pool类实现
? 默认:FIFO
? 0.84之后支持Fair模式
? 用户可以配置调度池属性,conf/fairscheduler.xml配置,
SparkContext来配置属性
? 1. schedulingMode FIFO 或者FAIR
? 2.权重,该调度池对其他调度池的权重
? 3.minShare,该调度池至少需要多少资源
Scheduling Across Stages&TaskSetManagers
1. Stage 生成和调度: 由DAGScheduler完成
对最后执行的根节点往上,宽度优先分析搜索,找到最开始执行的stage,对于stage的parent没有执行完的情况,则
不会提交该stage到pool中进行调度运行,
waitingStages
runningStages
failedStages
2. TaskSetManager调度(每一个被DAGScheduler提交的stage对应一个TaskSetManager,它们在pool中的调度方法如
下)
调度策略:
1. jobID小的先调度
2.同一个job的,taskID小的先调度
a)DAGSchduler submit tasks的时候分配任务执行节点
b)任务执行节点的分配策略Locality策略
1)cache()过的RDD,已经在内存中
2)如果能直接获取到执行地点,则直接读取执行。通常DAG中最原始的RDD有执行地点,比如HadoopRDD的
HDFS信息
3)获取第一个窄依赖的父亲RDD对应的分区执行
3. locality 的确定方式
1)RDD DAG的源头有HDFS等类型的分布式存储,他们内置的数据本地性决定(RDD中配置preferred location)数据
存储位置和分区的选取
2)每个其他非源头Stage,因为是中间过程,需要shuffle,所以地址由resourceoffer来确定,另外就是由Narrow Deps
的祖先stage确定最佳执行位置。
Deployment issues: Submit,deploymode,master
Your Program
sc = new SparkContext
f = sc.textFile(“hdfs://…”)
f.filter(…)
.count()
...
cluster model on yarn
Driver Program
on Worker
./bin/spark-submit 
--class <main-class>
--master <master-url> (local;local[K];local[*],spark://..;mesos://..;yarn-client;yarn-master)
--deploy-mode <deploy-mode> (cluster;yarn)
--conf <key>=<value> 
... # other options
<application-jar> 
[application-arguments]
用yarn.Client wrapper你程序的main
client model on mesos
Driver Program
on Client
client model on alone
Driver Program
on client
client model on yarn
Driver Program
on Client
由Yarn的resourceManager为spark
ApplicationMaster申请资源,
运行到一个节点上,再启动Driver Programe
Your Program
sc = new SparkContext
f = sc.textFile(“hdfs://…”)
f.filter(…)
.count()
...
用spark.deploy.Client wrapper
cluster model
Driver Program
on Standalone
cluster model on mess
Driver Program
on Mesos worker
直接由spark-submit在client
本地调用,启动driver,后面也是依赖
yarn-client和applicationMaster
Your Program
sc = new SparkContext
f = sc.textFile(“hdfs://…”)
f.filter(…)
.count()
...
Your Program
sc = new SparkContext
f = sc.textFile(“hdfs://…”)
f.filter(…)
.count()
...
直接由spark-submit在client
本地调用
直接由spark-submit在client
本地调用
p.s. driver program需要和worker的网络互通
Client
Worker Client Client Clien
Deployment Issues: Security(TODO)
Spark Security
Spark currently supports authentication via a shared secret. Authentication can be configured
? For Spark on YARN deployments, configuring spark.authenticate to true will automa
? For other types of Spark deployments, the Spark parameter spark.authenticate.secr
? IMPORTANT NOTE: The experimental Netty shuffle path (spark.shuffle.use.netty)
Deployment issues: Dashboard,Platform
1. sparkcontext创建时,4040端口,但生命周期是一个applic
1. job server(Ooyala)
2. spark-shell,复用spark context
3. Spark Streaming
4. Shark
2. master-webui 8080
3. work-webui 8081
4. jobhistory server 18080
其他:
Monitor:
http://spark-summit.org/wp-content/uploads/2013/10/Spark-Ops-Final.pdf
Web UI visual data:
Databricks workspace : notebooks,dashboards,job luncher
Future Work
? Spark Deployment Solutions
? Scheduling(with Yarn, mesos)
? Security (kerboros for standalone and mesos)
? Dashboard&Platform (job history and resource monitor)
? Spark Ecosystem Sharing
? Spark Programming Sharing
Spark Use
Case
? Spark DBAS
? Spark Usecase
? (summit 13-14)
? Tips&Tricks
? Future work
厂辫补谤办调研串讲
Case 1:Iterative computing
http://spark-summit.org/2013/
http://spark-summit.org/2014
Case 2: Real-Time Log Aggregation & Analysis
http://spark-summit.org/wp-content/uploads/2013/10/Spark_Summit_2013_Jason_Dai.pdf
1. 西班牙电信 实时数据处理
Kafka 做数据集成
数据预处理:使用storm
批处理:使用Cassandra + Spark
2. Spark in Taobao
a)Spark on Yarn
b)Spark Streaming
c)GraphX
Case 3: Other Cases
Tips & Tricks By twitter (2014.4)
遇到过的问题:(TODO 0.81版本后的更新)
1. Spilling Data to disk is a work in progress
2. OOM : task 太大
a) 使用split size 比HDFS Block要小得多
b) 如果不用caching, 减少spark.storage.MemoryFraction
c) 增加reducer的并行度
3. YARN仅仅支持静态资源划分,如何回收
4. Multi-tenancy 多租户情况的clean 和标准的方式
5. Long running SparkContext as a service: Spark job Server
6. Failure Mode, 需要重算,对小作业ok,对大作业不好
7. 主要是task failure不是executor faill
8. 使用spork
9. 主要是Spark on Yarn
Tips & Tricks By Sony (2014.4)
遇到的问题:
1. JVM issue: OOM,对executeMemory和spark.storage/shuffle.memoryFraction
2. Full GC
3. Assembly Jar Size
Tips& Tricks by ShareThrough
Tips& Tricks by Yahoo
http://spark-summit.org/wp-content/uploads/2013/10/Feng-Andy-SparkSummit-Keynote.pdf
Tricks& Tips on hadoop Streaming
Spark
Programming
? API(Scala,Python,Java)
? Future Work
k.apache.org/docs/1.1.0/api/scala/index.html#org.apache.spark.p
https://spark.apache.org/docs/1.1.0/programming-guide.h
Summary Todo-List
集群部署和选型等(Priority 1)
1. Scheduling(standalone,yarn,mesos的详细优缺点,包括多租户,细粒度
等等)
2. Security(Yarn Kerberos, standalone和mesos下的shared-secrets),结合
调度等确定集群方式
3. 可行的metrics和monitor, web ui等
4. 关注tips&tricks,Bechmark测试,确定稳定和选用的spark版本
5. 部署使用中继续补充
性能特征和使用方法(Priority 2)
1. Spark programming
2. RDD Cache(MEMORY_ONLY_2, MEMORY_AND_DISK_2: 2备份?)
3. CheckPoints: 本质就是写入HDFS(? //那standalone呢,必配HDFS?)
4. Driver Program性能,SparkContext初始化开销
5. BlockManager的IO实现
6. 周边生态组件和场景
7. 其他组建和特性(通信、压缩等等)

More Related Content

厂辫补谤办调研串讲

  • 2. Outlines ? Requirements ? Features ? How Spark Works? ? Spark Deployment Issues ? Spark Use Case ? Spark Programming ? Q&A
  • 3. ? Requirements 1. Iterative computing (Sohu Shipin Recommendation) 2. Before Spark 3. Other Requirements
  • 4. Requirements: Iterative computing Optimization Methods in Machine Learning Iterative methods SGD for for LR(Stochastic Gradient Descent) Newton’s XXXX methods big-data : ? sophisticated algorithm, but long computing time ? simple algorithm,but error-prone,need iterative and convergence E.g. Recommendation System(Sohu Shipin) Possible Requirements: 1.User CF(realtime session user log analysis) 2. Item CF(tag,label based,clustering?) 3.LFM (SGD based latent fact model) 4.Graph-based (PersonalRank in User-Item bipartgraph)
  • 5. Before Spark Issues in iterative computing 1. Many iterative process 2. Cached dataset or mediate result Hadoop的问题 Hadoop 仅支持1次iteration,且每次迭代都要reload data iterative mapreduce twist,haloop等 优化不成熟,也受限于计算模型 Mahout 本来只支持Hadoop,现在支持Spark,并加入 DSL语言与scala交互 interactive computing Hive + Distributed cache
  • 7. Other Requirements 1. Offline Batch Process Hadoop Ecosystem(Map-Reduce) Dryad(Microsoft DAG workflow+scope) 2. Stream Computing Twitter Storm(Spout/Bolt Model) Yahoo S4(Actors Model) Google Percolator,IBM SPC Kafka,Flume(Pub-Sub Pipe) 3. Iterative Computing Spark Ecosystem (RDD,DAG) Iterative MapReduce(Haloop,Twist) Petuum(SSP model) 4. Graph Computing(Iterative+Messaging,e.g. PageRank) Google Pregel(BSP model) Apache Giraph(BSP model) Apache HAMA(BSP+Matrix)
  • 9. Features 1. What Spark Provide? 2. WordCount In Spark 3. RDD model 4. Spark DBAS Ecosystem
  • 10. What Spark Provides ? http://spark.apache.org/ 官方特性介绍如下 ? Speed (v.s. Hadoop, in memory 快100倍,on disk 快10倍,支持DAG) ? High level API(Scala,Java,Python,RDD算子) ? Generality (Combine SQL, streaming, and complex analytics,MLlib,GraphX) ? Runs Everywhere(Yarn,Mesos,Standalone)
  • 11. First Glance: WordCount(Hadoop v.s Spark) public class WordCount { 16 17 public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> { 18 private final static IntWritable one = new IntWritable(1); 19 private Text word = new Text(); 20 21 public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { 22 String line = value.toString(); 23 StringTokenizer tokenizer = new StringTokenizer(line); 24 while (tokenizer.hasMoreTokens()) { 25 word.set(tokenizer.nextToken()); 26 context.write(word, one); 27 } 28 } 29 } 30 31 public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> { 32 33 public void reduce(Text key, Iterable<IntWritable> values, Context context) 34 throws IOException, InterruptedException { 35 int sum = 0; 36 for (IntWritable val : values) { 37 sum += val.get(); 38 } 39 context.write(key, new IntWritable(sum)); 40 } 41 } 42 43 public static void main(String[] args) throws Exception { 44 Configuration conf = new Configuration(); 45 46 Job job = new Job(conf, "wordcount"); 47 48 job.setOutputKeyClass(Text.class); 49 job.setOutputValueClass(IntWritable.class); 50 51 job.setMapperClass(Map.class); 52 job.setReducerClass(Reduce.class); 53 54 job.setInputFormatClass(TextInputFormat.class); 55 job.setOutputFormatClass(TextOutputFormat.class); 56 57 FileInputFormat.addInputPath(job, new Path(args[0])); 58 FileOutputFormat.setOutputPath(job, new Path(args[1])); 59 60 job.waitForCompletion(true); 61 } val file = spark.textFile("hdfs://...") val counts = file.flatMap(line => line.split(" ")) .map(word => (word, 1)) .reduceByKey(_ + _) counts.saveAsTextFile("hdfs://...") file = spark.textFile("hdfs://...") counts = file.flatMap(lambda line: line.split(" ")) .map(lambda word: (word, 1)) .reduceByKey(lambda a, b: a + b) counts.saveAsTextFile("hdfs://...") JavaRDD<String> file = spark.textFile("hdfs://..."); JavaRDD<String> words = file.flatMap(new FlatMapFunction<String, String>() { public Iterable<String> call(String s) { return Arrays.asList(s.split(" ")); } }); JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() { public Tuple2<String, Integer> call(String s) { return new Tuple2<String, Integer>(s, 1); } }); JavaPairRDD<String, Integer> counts = pairs.reduceByKey(new Function2<Integer, Integer>() { public Integer call(Integer a, Integer b) { return a + b; } }); counts.saveAsTextFile("hdfs://...");
  • 12. Map-Reduce Model <K1, V1> <K2, V2> 用户定义的Map函数 <K3, V3> shuffle,然后经过用户定义的Reduce函数 <K2, V2> shuffle,然后经过用户定义的Reduce函数 <K2, V2> 某些时候可以有combine过程,再执行一次用户定义的Reduce函数 MR 的缺点: 1. Shuffle 的性能。 Map 到 reduce 之间数据多 次需要 IO 操作。 2. 当有多个 MR 时,每 轮的 MR 之间需要将结果 写到 hdfs 上,才能迭代 3. 只有 map,reduce 二种计算模型,无法建立一 组 DAG 操作,来减少中间的 一些操作开销
  • 13. RDD Model ? RDD:Resilient Distributed Datasets,Spark核心数据结构 ? 逻辑上 ? 一个大的分布式只读Array,含有多个Partition ? 两种创建方式:从文件系统输入或从父RDD转换得到 ? 用两类操作算子进行变换:Transformation,Action ? 物理上 ? 一个元数据结构,存储着逻辑Patition和物理Block(磁 盘或内存),Node等映射关系,以及RDD之前的依赖转换 关系 ? 依赖关系:宽依赖,窄依赖
  • 14. RDD Model 分布式存储HDFS,HBase,Cassandra 输入函数textFile,parallelize等 RDD_0 RDD_cache RDD_2 filter,map等Transformation算子 RDD_1 cache算子 Scala集合和数据类型 saveAsTextFile等Action算子 val file = spark.textFile("hdfs://...") val counts = file.flatMap(line => line.split(" ")) .map(word => (word, 1)) .reduceByKey(_ + _) counts.saveAsTextFile("hdfs://...") 共享变量 1. broadcast(hadoop distributed cache) 2. accumulator(hadoop counter) BlockManager ? DiskStore ? MemoryStore
  • 15. RDD Model 1个SparkApplication 根据Action的数目N变为 N个RDD DAG Graph (即N个Job) Stage以shuffle为边界进行划分( 如左图), 形成M个Stage DAG, (每个stage一个task, 即M个task) RDD Graph(Job)的划分依据 对于需要output或者转换为外部对象(scala)等为Action操作,没遇到Action操作,RDD DAG Graph则为止 Stage(Task)的划分依据 根据RDD之间依赖关系分为二种依赖类型: Narrow 和 Wide(Shuffle Dependency)。 Narrow Dependency 指的是 child RDD 只依赖于 parent RDD(s)固定数量的 partition。 Wide Dependency 指的是 child RDD 的每一个 partition 都依赖于 parent RDD(s)所有 partition。 具体的理解为 Wide 依赖是需要结点之间进行 shuffle 操作的,其他的就是 Narrow。 所以 spark 将 shuffle 操作定义为 stage 的边界,有 shuffle 操作的 stage 叫做 ShuffleMapStage, 对 应的 task 叫做 ShuffleMapTask,其他的就是 ResultTask。
  • 16. RDD Cache(TODO) RDD persisting 1. persist() StorageLevel 2. cache() 其实是StorageLevel为内容的persist() ark.apache.org/docs/latest/programming-guide.html#rdd-persiste MEMORY_ONLY: default OFF_HEAP (experimental):Tachyon MEMORY_ONLY_2, MEMORY_AND_DISK_2: 2备份?(TODO)
  • 17. Fault Tolerance(Lineage机制,Narrow Dependencies) 1. Lineage 机制 1. 粗粒度的重算方式(不像mysql 备份,binlog,hadoop直接依赖HDFS多备份等机制) 2. 两种依赖: 1. Narrow Dependencies parent:child = N:1 (N>=1) 只需重算对应parent的子分区数据 partition0 partition1 partition2 partition3 RDD_0 partition0 partition1 partition2 partition3 RDD_1 partition0 partition1 partition2 partition3 RDD_0 partition0 partition1 RDD_1 Narrow Dependencies
  • 18. ? Shuffle Dependencies(Wide Dependencies) parent:child = 1:N(N>2) 其中一个child失败要整个parent的所有分区 来一遍,有些是未丢失的child RDD数据, 有冗余的计算 Fault Tolerance(Lineage机制,Wide Dependencies) Wide Dependencies partition0 partition1 partition2 partition3 RDD_0 partition0 partition1 RDD_1 partition2
  • 19. Fault Tolerance (Check Point)作为辅助 Lineage的问题: 1. 如果过长,重算开销很大(如pagerank) 2. Shuffle Dependency直接做check point收益更大 CheckPoint 1. 一致性问题(对spark没有这问题,RDD为只读的) 2. 本质就是写入HDFS(? //TODO,那standalone?)
  • 20. How Spark works? 1. Key Concepts 2. Key Components 3. Key Procedure 4. Deployment issues 5. Submit
  • 21. Key Concepts Driver Program 运行Spark Application的main函数,并创建SparkContext. 根据Driver 是否在集群中运行,分为Cluster模式和Client模式。负责应用的解析,切分Stage,并调度task到 executor执行,包含DAGSchduler等重要的组件对象,并且在最后负责汇总各子task结果,因此需要和work 节点的双向通信。 RDD Grpah 每次到一个Action时,将之前的所有动作形成RDD Graph,整个RDD Graph成为一个Job提交,一个Spark App 会有多个Action,即多个RDD Graph,因此一个spark app会有多个Job,而且对于多线程提交的job,是可能有同 时有多个Job的 Job 运行sparkContext的driver program 通过runJob方法向Spark提交Job. Stage 每个Job根据RDD宽依赖关系(shuffle dependency)切分为多个Stage,一个RDD Graph会有多个Stage,每 个Stage包含相同的一组Task, 一组Task也叫TaskSet,task之间是并行的. Task 一个分区对应一个Task,Task执行RDD对应的Stage中包含的算子,Task被封装后放入Executor创建的线程池 中执行,并发执行对当前节点中的RDD分区数据的计算。Task是spark中的最小的调度单位。其中一个RDD 包含在不同节点上的数据块,调度器会将包含操作的task分到指定的节点上,data locality. 其中一个 Application 只有一个TaskScheduler,它用于对该Application中所有action触发的Job中的taskSetManager进行 调度。
  • 23. Procedure in Standalone Master or ClusterScheduler RDD Graph1-N d0.根据action为限,划分job和RDDGraph Stage DAG1-N Worker1 Executor1-N BlockManager BlockManager ConnectionManager BroadcastManager CacheManager Worker2 BlockManager ConnectionManager BroadcastManager CacheManager -1.启动时worker向master注册资源 SparkContext in Driver ExecutorBackend DAGScheduler d1将RDD Graph分为StageDAG TaskScheduler SchedulerBackend d2调度TasksetManager,并分发task Executor1-N ExecutorBackend 2. executorbackend直接注册到driver的schedulerBackend注册,同时有后续的心跳等 3.由TasksetManager分发task给executor执行 4. 执行完,有些需要返回结果或状态给driver, driver命令executor销毁 0.Driver向master注册application 1.master命令worker启动executor ExecutorRunner ExecutorRunner Akka轻量级异步事件网络框架(Actor)
  • 24. Procedure in Spark-on-Yarn Yarn Spark Client Yarn Map Reduce Client Yarn MPI Client Resource Manager Node Manager Spark Application Master ClusterScheduler ==Master in standalone Node Manager Container: Worker StandaloneExecutorBackend Node Manager spark submission 将app master部署好 申请资源 注册executor Container: Worker StandaloneExecutorBackend
  • 26. Deployment Model:Standalone,Local,Yarn,Mesos ? Standalone model时, master和worker以deamon运行,不支持cluster模式提交 ? 配合slaves文件,然后./sbin/start-all.sh 即可在master节点上启动master进程,在worker节点启动worker进程 ? master启动 1. Spark.Secutiry 2. 支持zookeeper方式,elect leader 3. WebUI默认8080 4. 服务端口7077,注册Worker ? worker 启动 ? Spark.Security ? webui 8081 ? 主动连接master,完成注册 ? local model 时,并没有启动master,只是根据local参数启动 临时worker thread运行 ? ./bin/spark-submit --class org.apache.spark.examples.SparkPi lib/spark-examples-1.0.2- hadoop2.2.0.jar --master local 2 ? 适合调试使用,和deploymode无关,除控制台,无日志和历史
  • 27. Deployment Model:Standalone,Local,Yarn,Mesos ? 和Standalone的区别是,使用Yarn和Mesos的资源 管理功能进行调度,如下: ? 1. 没有固定的master和worker节点,worker节点 资源是临时调度和注册的 ? 2. Yarn模式支持将Driver跑在cluster中,而 standalone和Mesos目前不支持 ? 3.在Yarn模式下,因为master webui和work webui生命周期和job一致,因此无法利用spark master和slave webui查看job history. spark job history server提供一个方案,需要进一步深入
  • 28. Deployment Issues: Scheduling ? Scheduling Requirements:(Spark v.s Hadoop) ? 相同点: ? 主要还是批处理作业的多用户多队列调度 ? 都有across applications和within applications两层调度 ? Across Application的调度由yarn,mesos等实现 ? Within applications的调度(包括data locality),主要由hadoop或 spark本身实现. ? 不同点(TODO) ? Spark的有更多交互式分析任务(spark-shell),要求及时返回结果, 可能需要有单独高QoS的队列或额外的调度方式 ? 特殊的ML等作业,多轮迭代和shuffle,IO以及Memory密集
  • 29. Scheduling Across Applications ? Yarn模式调度的优点 ? Yarn对细粒度的支持情况,需要进一步调研(TODO) ? spark本身已实现了yarn-client和对应的AM ? 包括HDFS集群上已有数据共享,计算动态扩充,以及运维等 ? 也可复用kerberos安全等机制 ? Mesos的调度比Yarn粒度更细。 ? 支持coarse的静态资源配置,动态的调度,后者能复用空闲资源,提升利用率,对调度有较高要的可以 考虑 ? 多用户支持,以及webui的资源等管理(TODO) ? Standalone模式, ? 自身调度比较简单,默认FIFO,Application使用节点所有当前可用资源,也可用参数限制最大值 ? 也需要看具体调度需求,如multiuser等支持情况。(TODO)
  • 30. Scheduling Across Jobs ? 由Pool类实现 ? 默认:FIFO ? 0.84之后支持Fair模式 ? 用户可以配置调度池属性,conf/fairscheduler.xml配置, SparkContext来配置属性 ? 1. schedulingMode FIFO 或者FAIR ? 2.权重,该调度池对其他调度池的权重 ? 3.minShare,该调度池至少需要多少资源
  • 31. Scheduling Across Stages&TaskSetManagers 1. Stage 生成和调度: 由DAGScheduler完成 对最后执行的根节点往上,宽度优先分析搜索,找到最开始执行的stage,对于stage的parent没有执行完的情况,则 不会提交该stage到pool中进行调度运行, waitingStages runningStages failedStages 2. TaskSetManager调度(每一个被DAGScheduler提交的stage对应一个TaskSetManager,它们在pool中的调度方法如 下) 调度策略: 1. jobID小的先调度 2.同一个job的,taskID小的先调度 a)DAGSchduler submit tasks的时候分配任务执行节点 b)任务执行节点的分配策略Locality策略 1)cache()过的RDD,已经在内存中 2)如果能直接获取到执行地点,则直接读取执行。通常DAG中最原始的RDD有执行地点,比如HadoopRDD的 HDFS信息 3)获取第一个窄依赖的父亲RDD对应的分区执行 3. locality 的确定方式 1)RDD DAG的源头有HDFS等类型的分布式存储,他们内置的数据本地性决定(RDD中配置preferred location)数据 存储位置和分区的选取 2)每个其他非源头Stage,因为是中间过程,需要shuffle,所以地址由resourceoffer来确定,另外就是由Narrow Deps 的祖先stage确定最佳执行位置。
  • 32. Deployment issues: Submit,deploymode,master Your Program sc = new SparkContext f = sc.textFile(“hdfs://…”) f.filter(…) .count() ... cluster model on yarn Driver Program on Worker ./bin/spark-submit --class <main-class> --master <master-url> (local;local[K];local[*],spark://..;mesos://..;yarn-client;yarn-master) --deploy-mode <deploy-mode> (cluster;yarn) --conf <key>=<value> ... # other options <application-jar> [application-arguments] 用yarn.Client wrapper你程序的main client model on mesos Driver Program on Client client model on alone Driver Program on client client model on yarn Driver Program on Client 由Yarn的resourceManager为spark ApplicationMaster申请资源, 运行到一个节点上,再启动Driver Programe Your Program sc = new SparkContext f = sc.textFile(“hdfs://…”) f.filter(…) .count() ... 用spark.deploy.Client wrapper cluster model Driver Program on Standalone cluster model on mess Driver Program on Mesos worker 直接由spark-submit在client 本地调用,启动driver,后面也是依赖 yarn-client和applicationMaster Your Program sc = new SparkContext f = sc.textFile(“hdfs://…”) f.filter(…) .count() ... Your Program sc = new SparkContext f = sc.textFile(“hdfs://…”) f.filter(…) .count() ... 直接由spark-submit在client 本地调用 直接由spark-submit在client 本地调用 p.s. driver program需要和worker的网络互通 Client Worker Client Client Clien
  • 33. Deployment Issues: Security(TODO) Spark Security Spark currently supports authentication via a shared secret. Authentication can be configured ? For Spark on YARN deployments, configuring spark.authenticate to true will automa ? For other types of Spark deployments, the Spark parameter spark.authenticate.secr ? IMPORTANT NOTE: The experimental Netty shuffle path (spark.shuffle.use.netty)
  • 34. Deployment issues: Dashboard,Platform 1. sparkcontext创建时,4040端口,但生命周期是一个applic 1. job server(Ooyala) 2. spark-shell,复用spark context 3. Spark Streaming 4. Shark 2. master-webui 8080 3. work-webui 8081 4. jobhistory server 18080 其他: Monitor: http://spark-summit.org/wp-content/uploads/2013/10/Spark-Ops-Final.pdf Web UI visual data: Databricks workspace : notebooks,dashboards,job luncher
  • 35. Future Work ? Spark Deployment Solutions ? Scheduling(with Yarn, mesos) ? Security (kerboros for standalone and mesos) ? Dashboard&Platform (job history and resource monitor) ? Spark Ecosystem Sharing ? Spark Programming Sharing
  • 36. Spark Use Case ? Spark DBAS ? Spark Usecase ? (summit 13-14) ? Tips&Tricks ? Future work
  • 39. Case 2: Real-Time Log Aggregation & Analysis http://spark-summit.org/wp-content/uploads/2013/10/Spark_Summit_2013_Jason_Dai.pdf
  • 40. 1. 西班牙电信 实时数据处理 Kafka 做数据集成 数据预处理:使用storm 批处理:使用Cassandra + Spark 2. Spark in Taobao a)Spark on Yarn b)Spark Streaming c)GraphX Case 3: Other Cases
  • 41. Tips & Tricks By twitter (2014.4) 遇到过的问题:(TODO 0.81版本后的更新) 1. Spilling Data to disk is a work in progress 2. OOM : task 太大 a) 使用split size 比HDFS Block要小得多 b) 如果不用caching, 减少spark.storage.MemoryFraction c) 增加reducer的并行度 3. YARN仅仅支持静态资源划分,如何回收 4. Multi-tenancy 多租户情况的clean 和标准的方式 5. Long running SparkContext as a service: Spark job Server 6. Failure Mode, 需要重算,对小作业ok,对大作业不好 7. 主要是task failure不是executor faill 8. 使用spork 9. 主要是Spark on Yarn
  • 42. Tips & Tricks By Sony (2014.4) 遇到的问题: 1. JVM issue: OOM,对executeMemory和spark.storage/shuffle.memoryFraction 2. Full GC 3. Assembly Jar Size
  • 43. Tips& Tricks by ShareThrough
  • 44. Tips& Tricks by Yahoo http://spark-summit.org/wp-content/uploads/2013/10/Feng-Andy-SparkSummit-Keynote.pdf
  • 45. Tricks& Tips on hadoop Streaming
  • 48. Summary Todo-List 集群部署和选型等(Priority 1) 1. Scheduling(standalone,yarn,mesos的详细优缺点,包括多租户,细粒度 等等) 2. Security(Yarn Kerberos, standalone和mesos下的shared-secrets),结合 调度等确定集群方式 3. 可行的metrics和monitor, web ui等 4. 关注tips&tricks,Bechmark测试,确定稳定和选用的spark版本 5. 部署使用中继续补充 性能特征和使用方法(Priority 2) 1. Spark programming 2. RDD Cache(MEMORY_ONLY_2, MEMORY_AND_DISK_2: 2备份?) 3. CheckPoints: 本质就是写入HDFS(? //那standalone呢,必配HDFS?) 4. Driver Program性能,SparkContext初始化开销 5. BlockManager的IO实现 6. 周边生态组件和场景 7. 其他组建和特性(通信、压缩等等)