狠狠撸

狠狠撸Share a Scribd company logo
Spark调优经验分享
2017.4.28
DM Team
徐闻春
一图概览
目的:
? 提高硬件资源利用率
? 减少网络传输开销
? 提高资源复用率
Spark集群优化——数据本地性
sql locality情况 执行时间(s)
Node local Rack local Any
sql1 214 232 0 140.2
326 122 0 26.4
sql6 52067 1492 635 139
49456 1392 142 41
49998 1380 53 36
数据本地性越好,数据网络传输越少,计算也就越快,所以要尽量避免数据跨节点和跨机架传输
数据locality相关参数:
? spark.locality.wait.process 默认3s
? spark.locality.wait.node 默认3s
? spark.locality.wait.rack 默认3s
Spark集群优化——存储格式选择
格式 读取数据量(KB) 存储大小(byte) 查询时间(s)
sql-10 text 1167.5 1164841 10.5
orc 1028.3 572035 10.9
parquet 1083 1093378 10.7
sql-11
text 41.7 42687 7.8
orc 33.1 25841 4.3
parquet 21.7 45112 4.9
从测试结果来看,orc格式相对text和parquet格式在存储大小和查询时间方面都有较好的性能。
相关参数:
spark.sql.hive.convertCTAS 默认false
spark.sql.sources.default 默认parquet
Spark参数优化——计算资源
1core
4G Mem
2core
6G Mem
2core
7G Mem
2core
8GMem
3core
6G Mem
3core
9GMem
sql1 37.7 30 44 29.5 78.6 30
sql6 61 122 107 164 377 144
sql7 1240 failed failed 1176 failed failed
core表示executor同时计算的task数,memory表示执行的内存,从测试结果看到比例过多过小都不合适,
内存调大会出现内存瓶颈,内存太小会出现作业失败;core太小导致并行度小计算慢,太大会引起disk
IO瓶颈。
备注:测试在spark.sql.shuffle.partitions=600情况下进行
相关参数:
--executor-memory 默认1G
--executor-cores 默认1core
Spark参数优化——并行度
sql NO. 200 partitions 400 partitions 600 partitions
sql1 71 50 37.7
sql2 96.6 95 61
sql3 138.5 126 47.3
sql4 154. 179 76.8
sql5 393 303.8 265
sql6 147.7 186 173
sql7 2467 1626 1240
sql8 232.7 37 40
sql9 127 136 113
相关参数:
spark.sql.shuffle.partitions 默认200
并行度增大数据能分配到更多分区,减少数据倾斜,默认值为200,不适合公司目前的数据量级,
从测试可看出调大分区数,能显著增加执行时间,而且能跑通原来跑不通的SQL。
Spark参数优化——offheap内存
spark.yarn.executor.overhead 200 partitions 400 partitions
512M failed 522
1024M 393 303.8
SQL 200partitions 400partitions
SQL4 288.7 199.1
SQL5 failed 522
overhead memory不同值调优记录
overhead memory=512M情况测试记录
offheap内存主要由系统内存和spark内部对象占用,当offheap内存很小的时候作业失败率很高,增大该
内存可以提高作业执行效率和通过率。
相关参数:
spark.executor.overhead.memory 默认executor内存的0.1
Spark参数优化——大小表join
对于两表join,且一张表是另一张表的2个数量级倍数大,可以考虑将小表broadcast到每一个
executor,来达到降低网络传输开销优化目标。
相关参数:spark.sql.autoBroadcastJoinThreshold 默认10M
spark.sql.autoBroadcastJoinThreshold 10M 64M
sql-5 384s 276s
Spark参数优化——其他
? 谓词下推
相关参数spark.sql.orc.filterPushdown 默认false
该参数对orc格式的表自动谓词下推,对有些语句能减少数据读取量,加快执行效率
Spark参数优化——shuffle过程
shuffle(sort based)
shuffle过程文件数和shuflle write的缓存:
shuffle file: core * R
shuffle write buffer: core * R * 32
相关参数:
spark.shuffle.manager 默认sort
spark.shuffle.file.buffer 默认32k
spark.reducer.maxSizeInFlight 默认48m
spark.shuffle.io.maxRetries 默认3
spark.shuffle.io.retryWait 默认5s
spark.shuffle.memoryFraction 默认0.2
spark.shuffle.sort.bypassMergeThreshold默认200
对于shuffle过程,测试验证,如果executor内存比较大如10G以上,提高spark.shuffle.file.buffer、spark.reducer.maxSizeInFlight、 spark.shuffle.
memoryFraction会提高shuffle效率,同时对于大作业,提高spark.shuffle.io.maxRetries和spark.shuffle.io.retryWait能提高大作业稳定运行概率。
Spark代码优化——RDD复用
错误用法
1)重复创建RDD
val rdd1 = sc.textFile(“hdfs://*:9000/hello.txt”)
rdd1.map(...)
val rdd2 = sc.textFile(“hdfs://*:9000/hello.txt”)
rdd2.reduce(...)
2)不复用RDD
JavaPairRDD<Long, String> rdd1= ...
JavaRDD<String> rdd2 = rdd1.map(...)
rdd1.reduceByKey(...)
rdd2.map(...)
正确用法
1)避免重复创建RDD
val rdd1 = sc.textFile(“hdfs://*:9000/hello.txt”)
rdd1.map(...)
rdd1.reduce(...)
2)尽量复用RDD,且多次使用的需要cache
JavaPairRDD<Long, String> rdd1= ....cache()
rdd1.reduceByKey(...)
rdd1.map(tuple._2...)
参考: http://tech.meituan.com/spark-tuning-basic.html
Spark代码优化——选择合适算子
1)减少使用shuffle算子
? 能避免则尽可能避免使用reduceByKey、join、distinct、repartition等会进行shuffle的算子
? Broadcast小数据与map数据join,避免shuffle
2)使用高性能算子
? 使用reduceByKey代替groupByKey( reduceByKey 在map端聚合数据)
? 使用mapPartitions代替map(减少重复函数调用的计算开销)
? 使用treeReduce代替reduce ( treeReduce 的计算更多在executor而不是driver)
? 使用foreachPartitions代替foreach (原理同mapPartitions)
? 使用filter之后使用coalesce操作(目的减少分区数,减少task启动开销)
? 使用repartitionAndSortWithinPartitions代替先repartition再sort操作
? 使用broadcast广播大变量,如100M以上(广播后变量是executor存一份,相对每个task内存一份减少
内存开销)
参考:http://stackoverflow.com/questions/32281417/understadning-treereduce-in-spark
Spark代码优化——shuffle算子并行度调优
? shuffle算子如distinct、reduceByKey、sortByKey、groupByKey、
cogroup都有一个并行度作为参数的重载方法。
? 这是Stage层面调优非常重要指标,通过观测UI执行数据情况,
调整合理每个stage的并行度对提高job效率有显著影响。
? 推荐使用shell模式逐行调参,推荐两个实用用法:
1) SizeEstimator.estimator(obj)方法估算对象大小
2) RDD.partitions.size得到RDD分区数
Spark代码优化——数据倾斜
数据倾斜有两种可能:
1)数据分区太少,导致个别分区数据过大,此时
增大分区即可解决。
2)极端情况,某些key非常大,增大分区也无效。
打印key分布情况的方法:
val sampledPairs = pairs.sample(false, 0.1)
val sampledWordCounts = sampledPairs.countByKey()
sampledWordCounts.foreach(println(_))
参考:http://tech.meituan.com/spark-tuning-pro.html
如何解决数据倾斜?
因为是key过度集中导致数据倾斜,处理方法主要有四种:
? 如果key对计算结果不重要,直接过滤
? broadcast小数据表,转reduce join为map join
? 对key加前缀打散key的分布,shuffle计算之后还原key再全局聚
合(仅适合聚合操作)
? 从RDD剥离倾斜的key加前缀进行shuffle计算,再与剩下RDD的计
算结果合并即可(适合join操作)
注:还有其他详细解决过程见参考资料。
Spark代码优化——优化数据结构
? Java中对象、字符串和集合类型比较消耗内存。
? 在不影响代码逻辑情况下,尽量使用基本数据结构(如int,long),减少对象使用,因为
对象封装开销较大。
Spark代码优化——使用DateSet API
? 为什么推荐使用DataSet API?
一句概括:Spark的发展过程就是通过让对数据操作更简单,执行优化更智能,类型检查更安全。
Spark代码优化——使用DateSet API
? RDD特点
1)JVM对象组成的分布式数据集
2)可处理结构化和非结构化数据
3)函数式转换
? RDD局限
1)用户自己优化程序
2)从不同数据源读数据困难
3)合并多个数据源困难
4)没有schema类型信息
高级API对数据操作更安全,更简便,效率更高。
? DataFrame特点
1)Row对象组成分布式集合
2)可处理结构化数据
3)通过Catalyst自动优化程序
4)Data source API
? DataFrame局限
1)运行时才进行类型检查
2)函数式编程风格
3)不能直接操作domain对象
? DataSet特点
1)扩展DataFrame
2)编译时类型检查
3)代码生成解码器,序列化更高效
Spark调优案例
sql7参数 stage1并发数 执行时间
2core,8Gmem,600partitions
1G executor.memoryOverhead
2160 failed
2core,12Gmem,1500partitions
1G executor.memoryOverhead
1280 1308
2core,12Gmem,3500partitions
1G executor.memoryOverhead
1280 1132
3core,12Gmem,3500partitions,
2G executor.memoryOverhead
1748 986
3core,8Gmem,3500partitions,
2G executor.memoryOverhead
2160 954
3core,8Gmem,5000partitions,
2G executor.memoryOverhead
2160 838.1
目前Spark的瓶颈——内存
https://0x0fff.com/spark-memory-management/
/databricks/memory-management-in-apache-spark
? Storage memory: 用于RDD缓存数据和临时序列化数据
? Execution memory: 用于shuffle,join,sort和aggregation操
作使用内存
? User memory: 用于保存用户代码中的数据结构和对象
? Reserved memory: 系统保留的内存,存储spark内部对象
Spark计算模型对内存敏感,对内存数据复用率比MR模型高(减少数据
落盘),内存不足会导致计算失败。
目前Spark的瓶颈——内存
Spark提高内存能做的事:
? 缓存更多数据,能提高查询cache命中率,也就是提高查询速度
? 提高offheap内存能应对大job,提高任务执行可靠性
? 对于RDD操作,适合map端join的实现(大表join小表,把小表broadcast)
? 提高shuffle的read和write buffer大小,提高shuffle效率
? 提高shuffle read过程数据聚合所需的memoryfraction比例,提升shuffle过程效率,减少频
繁读写磁盘
? 提高内存能存储更多RDD,对于排序和聚合操作能减少数据spill到磁盘
谢谢!

More Related Content

厂辫补谤办性能调优分享

Editor's Notes

  • #12: 有一个搁顿顿的数据格式是办别测-惫补濒耻别类型的,另一个是单惫补濒耻别类型的,这两个搁顿顿的惫补濒耻别数据是完全一样的。那么此时我们可以只使用办别测-惫补濒耻别类型的那个搁顿顿,因为其中已经包含了另一个的数据。对于类似这种多个搁顿顿的数据有重迭或者包含的情况,我们应该尽量复用一个搁顿顿,这样可以尽可能地减少搁顿顿的数量,从而尽可能减少算子执行的次数。
  • #13: treeReduce的工作更多放在executor,reduce算子在driver计算数据,对driver压力很大 算子函数中使用外部变量的场景(尤其是大变量,比如100M以上的大集合),那么此时就应该使用Spark的广播(Broadcast)功能来提升性能。
  • #14: treeReduce的工作更多放在executor,reduce算子在driver计算数据,对driver压力很大 算子函数中使用外部变量的场景(尤其是大变量,比如100M以上的大集合),那么此时就应该使用Spark的广播(Broadcast)功能来提升性能。