狠狠撸

狠狠撸Share a Scribd company logo
Is Spark Streaming based on
Reactive Streams?
2016-12-14 もう1つのHadoop Summit
Agenda
back pressureの重要性
Reactive Streamsとは
Spark Streamingのback pressure実装
自己紹介
島本 多可子(@chibochibo03)
株式会社ビズリーチ CTO室
Scala界隈に生息してます
GitBucketもよろしくお願いします
なぜSparkの話を?
2014年半ばからSparkへの取り組みを開始
事例としては小規模
多少のオーバーヘッドは分かった上で使う
小規模でも待望の機能があった
back pressure
back pressureとは
ストリーム処理にてデータのフロー制御を行う
過負荷であることをフィードバックする仕組み
自身の処理能力で処理できるデータ量を伝える
やばい!!
あと2つ!
もー
2つね。
back pressureの重要性
送信側で常に一定のデータ量を保つのは難しい
一時的な増加などの波はある
システム全体として動き続けることは重要
基本的には常にデータが流れている
瞬間的過負荷時の即時性は失われても、止まらないほうがよい
Sparkは1.5からback pressureに対応
spark.streaming.backpressure.enabled
デフォルトはfalse
有効にするにはtrueを設定
spark.streaming.receiver.maxRate
1秒あたりのレコード数の上限
back pressureで調整する際の上限になる
since 1.5
どのように実现しているのか
ストリーム
処理
back pressure
Reactive Streams
Reactive Streamsとは
非同期ストリーム処理の標準化を目指す
ScalaだとAkka Streamsが既にサポート
JDK 9でFlow APIとして導入
Spring 5はReactive対応に
back pressure付き
原理原則
Subscriber側でサイズを制限する
過負荷に直面するとSubscriberはback pressureのシグナルを送る
back pressureのシグナルは非同期であること
Dynamic Push-Pull
Subscriberが高速の場合はpush型
Publisherが高速の場合はpull型
仕様 - Flow API
public final class Flow {
@FunctionalInterface
public static interface Publisher<T> {
public void subscribe(Subscriber<? super T> subscriber);
}
public static interface Subscription {
public void request(long n);
public void cancel();
}
仕様 - Flow API
public static interface Subscriber<T> {
public void onSubscribe(Subscription subscription);
public void onNext(T item);
public void onError(Throwable throwable);
public void onComplete();
}
public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> {}
}
流れ
SubscriberPublisher
Subscription
onSubscribe(Subscription)
流れ
SubscriberPublisher
request(1) 1個
Subscription
onSubscribe(Subscription)
流れ
SubscriberPublisher
onNext(data)
1個request(1)
Subscription
onSubscribe(Subscription)
流れ
SubscriberPublisher
1個
3個
onNext(data)
request(1)
request(3)
Subscription
onSubscribe(Subscription)
流れ
SubscriberPublisher
1個
3個
onNext(data)
request(1)
request(3)
onNext(data)
Subscription
onSubscribe(Subscription)
流れ
SubscriberPublisher
onComplete()
1個
3個
これで
全部!
onNext(data)
request(1)
request(3)
onNext(data)
Subscription
onSubscribe(Subscription)
SparkはReactive Streamsに遵守してる?
答えは、No!!
Though we will just take inspiration from some of the design
principles of the Reactive Streams specification, we do not
intend for Spark's internals to comply with this specification.
Reactive Streamsの設計方針からインスピレーションを受けていますが、私たち
はSparkの内部がこの仕様を遵守するつもりはありません。
要約すると???
"4.1 Back-pressure signaling". Spark Streaming back-pressure signaling.
https://docs.google.com/document/d/1ZhiP_yBHcbjifz8nJEyPJpHqxB1FT6s8-Zk
7sAfayQw/edit?usp=sharing, (参照 2016-12-11)
Sparkのback pressureはどうやってる?
きっかけは、StreamingListenerのonBatchCompleted
StreamingListenerトレイト
進行中のストリーム処理に関する情報を受け取るためのリスナー
onBatchCompletedメソッド
1つのminiバッチが完了したときに呼び出される
onBatchCompletedでどんな情報が取れる?
batchTime??miniバッチ時間
submissionTime??JobSchedulerのキューに送信された時間
processingStartTime, processingEndTime??処理開始?終了時間
schedulingDelay??スケジュール済?処理開始までの時間(待ち時間)
processingDelay??処理時間
totalDelay??スケジュール済?処理完了までの時間(総所要時間)
遅延がな
ければ0
正常ならバッチ間隔に
収まっている
onBatchCompletedでどんな情報が取れる?
batchTime??miniバッチ時間
submissionTime??JobSchedulerのキューに送信された時間
processingStartTime, processingEndTime??処理開始?終了時間
schedulingDelay??スケジュール済?処理開始までの時間(待ち時間)
processingDelay??処理時間
totalDelay??スケジュール済?処理完了までの時間(総所要時間)
この値をRateEstimatorに渡して新しいRateを計算する
仕組み - データ受け取り時
JobScheduler ReceiverSupervisor
BlockGenerator
Driver Executor
ReceiverReceiverTracker
ReceiverInputDStream
JobGenerator
仕組み - データ受け取り時
JobScheduler ReceiverSupervisor
BlockGenerator
Driver Executor
Receiver
push data
ReceiverTracker
ReceiverInputDStream
JobGenerator
ブロック間隔で
区切る
仕組み - データ受け取り時
JobScheduler ReceiverSupervisor
BlockGenerator
add new blocks
Driver Executor
Receiver
push data
ReceiverTracker
ReceiverInputDStream
JobGenerator
仕組み - データ受け取り時
JobScheduler ReceiverSupervisor
BlockGenerator
add new blocks
Driver Executor
Receiver
push data
ReceiverTracker
ReceiverInputDStream
JobGenerator
generateJob
バッチ間隔
ごと
仕組み - データ受け取り時
JobScheduler ReceiverSupervisor
BlockGenerator
add new blocks
Driver Executor
Receiver
push data
ReceiverTracker
ReceiverInputDStream
get blocks
JobGenerator
generateJob
ブロックを問
い合わせ
仕組み - データ受け取り時
JobScheduler ReceiverSupervisor
BlockGenerator
add new blocks
Driver Executor
Receiver
push data
ReceiverTracker
ReceiverInputDStream
get blocks
JobGenerator
submitJobSet
generateJob
ジョブとしてスケ
ジュール
仕組み - バッチ完了時
JobScheduler ReceiverSupervisor
BlockGenerator
Driver Executor
ReceiverTracker
ReceiverInputDStream
ReceiverRateController
仕組み - バッチ完了時
JobScheduler ReceiverSupervisor
BlockGenerator
Driver Executor
ReceiverTracker
ReceiverInputDStream
ReceiverRateController
RateLimiter
StreamingListener
RateController
リスナー
Receiverが受け取る
データ量を制御
仕組み - バッチ完了時
JobScheduler ReceiverSupervisor
BlockGenerator
Driver Executor
ReceiverTracker
ReceiverInputDStream
ReceiverRateController
sendRateUpdat
eonBatchCompleteで
新しいRateを計算
仕組み - バッチ完了時
JobScheduler ReceiverSupervisor
BlockGenerator
Driver Executor
ReceiverTracker
ReceiverInputDStream
ReceiverRateController
sendRateUpdat
e
RPC send
updateRate
Guavaの
RateLimiterに
セット
onBatchCompleteで
新しいRateを計算
ポイント
バッチ完了時をフックして新しいRateが決まる
待ち時間や処理時間などを考慮
新RateはReceiverSupervisorを介してBlockGeneratorに伝播
GuavaのRateLimiterを使ってpushするデータ量を制御
1件ごとにacquireを呼ぶ
レシーバ毎に毎秒許可する件数を超えるとwait
今后どうなる?
Reactive Streamsに遵守!?
https://issues.apache.org/jira/browse/SPARK-10420
Problem
back pressureに関連する情報がReceiverから見えない
Receiverからrequest(1)のようなシグナルを送れない
どこが問題?
ReceiverSupervisor
BlockGenerator
Executor
Receiver
RPC send
New Rate
updateRate
どこが問題?
ReceiverSupervisor
BlockGenerator
Executor
Receiver
RPC send
New Rate
updateRate
新しいRateを知らない!!
改善案
ReceiverSupervisor
BlockGenerator
Executor
Receiver
RPC send
New Rate
updateRateLimit
改善案
ReceiverSupervisor
BlockGenerator
Executor
Receiver
RPC send
New Rate
updateRateLimit
Receiverを
介す
Supervisorのメ
ソッド経由で伝播
Reactive Streamsベースの
Receiverを作成できる
まとめ
従来からあるSparkの機能をうまく使って実現している
JIRAは上がっているけど、優先度は低
Sparkのback pressure Reactive Streams

More Related Content

Is spark streaming based on reactive streams?