狠狠撸

狠狠撸Share a Scribd company logo
Spark Streaming学习分享
DM Team
徐闻春
Spark如何并行化? 答疑解惑spark streaming介绍
三个问题
Spark的是怎么实现并行化计算的?
不同层次划分依据
? Job划分:action算子
? Stage划分:shuffle操作
? Task划分:分区数
层次划分关系
Spark的是怎么实现并行化计算的?
-- RDD
? 只读的分区的集合
? 对数据计算的函数
? 计算数据的位置
? 分区器
? 依赖的RDD信息
val textFile = sc.parallelize(Seq(1, 2, 3, 4, 5, 6),3)
Spark的是怎么实现并行化计算的?
val textFile = sc.textFile(args(1)) // 构造RDD
val result = textFile .flatMap(line => line.split(“s+”)) .map(word => (word, 1)) .reduceByKey(_ + _) // 计算逻辑
result.saveAsTextFile(args(2)) // 数据存储
Spark的是怎么实现并行化计算的?
RDD Objects DAGScheduler TaskScheduler Executor
Spark Streaming原理
? 以时间为单位将数据流切分成离散的数据单位
? 将每批数据看做RDD,使用RDD操作符处理
? 最终结果以RDD为单位返回
连续问题离散化处理
Spark Streaming运行架构
Driver
JobScheduler
Job Generator
BlockManagerMaster
Executor
BlockManager
Task
Receiver
ExecutorExecutor
Receiver
Task
BlockManager
Message
Queue
Sub Job
Put Block
Put Block
Receiver Tracker
getBlocksOfBatch
Block ID
JobSet
Clock
Tick
Spark Streaming源码走读
Spark Streaming的DStream操作
项目 transformation action output
operation
window updateStateByKey
RDD √ √ × × ×
DStream √ √ √ √ √
Spark Streaming常见操作——Window Operation
window(windowLength, slideInterval)
? windowLength: 窗口长度
? slideInterval: 窗口操作间隔
Spark Streaming常见操作——UpdateStateByKey
? updateStateByKey
? mapWithState
根据新进来的信息来更新过去维持的状态信息。
val addFunc = (currValues: Seq[Int], prevValueState: Option[Int]) => {
val currentCount = currValues.sum
val previousCount = prevValueState.getOrElse(0)
Some(currentCount + previousCount)
}
val totalWordCounts = pairs.updateStateByKey[Int](addFunc)
Spark Streaming常见操作——Output Operaion
就像RDD计算中的action操作会引起真正的执行,Dstream中Output Operatiion引起transformation真正的执行,
注意对于foreachRDD算子,里面必须有rdd的action操作,否则数据被接收后直接忽略不处理。
? print() 用于开发和debug
? saveAsHadoopFiles(prefix, [suffix]) 输出数据持久化
? foreachRDD(func) 用于将RDD执行任意操作,如写redis(func函数在driver执行)
dstream.foreachRDD {
rdd => rdd.foreachPartition {
partitionOfRecords => val connection = createNewConnection()
partitionOfRecords.foreach(record => connection.send(record)) connection.close()
}
}
Spark Streaming数据访问容错模式
? Receiver Based Approach
? Direct Approach
以kafka+spark streaming为例
Spark Streaming——基于Receiver
? 优点:使用WAL方式,能保证数据持久
化不丢失
? 缺点:重复写存储、依赖HDFS、仅能实现
at least once语义
Spark Streaming——基于Direct API
? 优点:没有重复写,不依赖hdfs,
使用offset,可以使用事务性
? 缺点:仅适用kafka,需要kafka有
足够存储
下一代Spark Streaming——Structured Streaming
新进来的数据就像新的行数据添加到表中,对流数据的操作就像操作一张表,数据抽象成没有边界的表。
下一代Spark Streaming——Structured Streaming
解疑
? 为什么有stage被skipped?
如果要计算的RDD已经被cache到内存,则对应的stage会被skip,所以skip是加速计算的现象,对结果没有影响
解疑
? 为什么coalesce没有shuffle,不是分区变化就有shuffle?
宽依赖:shuffle 窄依赖:没有shuffle,这就是coalesce算子作用
1
2
3
4
1
2
3
4
5
解疑
? join操作怎么避免shuffle?
——两个条件: 相同分区器,分区数相同
rdd1 = someRdd.reduceByKey(3)
rdd2 = someOtherRdd.reduceByKey(3)
rdd3 = rdd1.join(rdd2)
解疑
? shuffle次数越少越好吗?
不对,当单个分区数据量非常大时,造成资源利用不足和GC等问
题,使用repartition增大分区以提高并行度反而能更好利用CPU,
能加快效率。
解疑
? 了解常用HQL操作?
? 见zeppelin
解疑
? 在场同学的问题

More Related Content

Spark streaming经验介绍

Editor's Notes

  • #4: action : saveAsTextFile() collect() take() shuffle: by**
  • #7: 为什么有skip task?
  • #16: spark 1.2引进,所有接收的数据通过receivers写入HDFS或者S3中checkpoint目录。这样当driver失败后,executor中数据丢失后,可以通过checkpoint恢复。在更新zookeeper的offset前,receivers失败了,会导致不一致,则从zookeeper中保存的offsets开始重复消费
  • #17: Spark driver计算下个batch的offsets,指导executor消费对应的topics和partitions。消费Kafka消息,就像消费文件系统文件一样。如果中间出现数据处理失败,则使用事物回滚
  • #18: 目前厂辫补谤办厂迟谤别补尘颈苍驳是直接依赖搁顿顿,优化需要自己完成,使用顿补迟补厂别迟和顿补迟补蹿谤补尘别就可以利用罢耻苍驳蝉迟别苍引擎来进行优化。
  • #19: 而基于顿补迟补厂别迟和顿补迟补蹿谤补尘别处理,简化础笔滨,使用更高级的厂蚕尝处理,让我们忘记流的概念,使用将会越来越简单。从2.虫的设计来看,从更根本上,是为了满足更快贵补蝉迟别谤、完全容错蹿补耻濒迟-迟辞濒别谤补苍迟、完全的语义一致性别虫补肠迟濒测的要求。
  • #21: 有没shuffle主要看是父依赖还是窄依赖: 父依赖:父RDD的一个分区的数据全部到子RDD的一个分区(独身子女) 窄依赖:父RDD分区的数据分布到子RDD的多个分区(多胎) 剧烈分区变化(多到少),这时如果将shuffle设置为false,会出现并发不足,存储不够溢出可以讲shuffle设置为true。