狠狠撸

狠狠撸Share a Scribd company logo
ビッグじゃなくても使える
Spark☆Streaming
2016-05-21 JJUG CCC 2016 Spring
Agenda
● What’s Apache Spark
○ 概要、特徴
○ 落ち着いて考えよう
● Why Spark (Streaming)
○ 利用事例
○ 効果
● How To Spark (Streaming)
○ 使いドコロ
?今必要としていないあなたにも?
About me
島本 多可子(@chibochibo03)
株式会社ビズリーチ CTO室
Scala界隈に生息してます
GitBucketもよろしくお願いします
What’s Apache Spark
What's Apache Spark ?
● オープンソースの並列分散処理基盤
● MapReduceが苦手とする領域にアプローチするために開発さ
れた
○ 処理が複雑になるとすべての処理をMap?Reduceの形にして逐次実行では
オーバーヘッドが大きい
○ MapReduce処理の都度JVMの起動?終了がある
Sparkはこの問題を取り除いている
What's Apache Spark ?
● Unified Engine
○ 様々な用途のライブラリが豊富
"Generality". Apache Spark. http://spark.apache.org, (参照 2016-05-06)
What's Apache Spark ?
● SparkはScalaで開発
○ Javaユーザに優しいコード、Scala詳しくなくても読める !!
○ SparkのビルドはMaven !!
○ JAX-RSとかある !!
● 言語用のAPIもいろいろ
○ Scala、Java、Python、R
● 比較的シンプルなコードで分散処理が可能と言われる
(例)Word Count
val conf = new SparkConf().setAppName("sample batch")
val spark = new SparkContext(conf)
// 処理を定義
val rdd = spark.textFile("s3n://foo/bar/a.log")
.flatMap(_ split " ")
.map(_ -> 1)
.reduceByKey(_ + _)
// 実行
rdd foreach println
スペースで分割
文字とカウントのタプルに変換
文字ごとに集計
まるでコレクション操作のよう!!
Sparkの変革
2009年 2013年 2014年 2016年
ASFへ寄贈
UC BerkeleyのAMPLabで
開発スタート
1.0リリース
2.0
予定
▲DataFrame Since 1.3
▼Dataset Since 1.6
DataFrame Since 1.3
● built on SQL
● Dynamically typed
val sqlContext = new SQLContext(...)
val df: DataFrame = sqlContext.read.json("people.json")
df.filter(df("age") < 20)
Dataset Since 1.6
● Statically typed
val sqlContext = new SQLContext(...)
val ds: Dataset[Person] = sqlContext.read.json("people.json")
.as[Person]
ds.filter(_.age < 20)
case class Person(
name: String, age: Int)
Hadoopエコシステムのひとつ
● MapReduce
● HDFS
● YARN
● HBase
● Hive
● Mahout
● Sqoop
● ZooKeeper
● Flume
● Spark
● Storm
● Flink
● Tez
● Cassandra
● Kudo
● Impala
● Presto
● Drill
よく言われる特徴
● オンメモリ処理によるパーフォーマンス優位
● 耐障害性に優れている
落ち着いて考えよう
その耐障害性、本当に必須ですか?
● 分散システムとは
○ 故障耐性をどう保証するか
○ いくつかは故障している前提
● 大規模クラスタは故障が顕在化する
○ 通常運用で必要になる
● 数、数十なら「常にどれか壊れている」はない
○ 障害通知、復旧手段があれば十分では?
高速、本当ですか?
● 何をもって高速?
○ 既にHadoopを利用している場合は比較しやすい?
● チューニングが必要な高速化は難しい
○ JVMの設定、Sparkの設定
● コードの書き方次第で遅くなることも
○ 意識的????cacheの多用
○ 無意識的???shuffleによる強制書き出し
クラスタ、何台必要ですか?
● スタンバイではStandalone Clusterで構築
○ マスタ1台、ワーカ2台
● ワーカは強いインスタンスを使う
○ 物理数が少ないほうが運用が楽だから
● SPARK_WORKER_INSTANCESでプロセスを複数に
※ Standalone Cluster
リソース管理にSpark独自のものを利用することで、 Sparkだけでクラスタを組
む。
Sparkの対象領域は数百ノード以上
● 数台でも使うことは可能
● ただ、本質的なターゲットは数百台から
○ 相応の作り込みがされている
なぜ选んだの?
Why Spark (Streaming)
スタンバイ 日本最大級の求人検索エンジン
スタンバイの全体像
ユーザ
Web Server
API Server
Indexer
企業
Web Server
API Server
求人公開
Crawler
広告
媒体
Web Server
API Server
アーキテクチャ 全体概要
● サーバサイドはScala
○ Play2 + Slick、Akka
● フロントエンドはAngularJS、TypeScriptなど
● モバイルはSwift、Java(Android)
● マイクロサービスでAWS上に構築
○ 基本的にはAWSのサービスを使う
● 全文検索エンジンはElasticsearch
リリースまでの開発期間
2014年半ば???本格的に開始
● アーキテクチャの検討、ベースの構築
● その後はひたすら作る、作る、作る
2015年1月???テストリリース
● ダメなところは捨てて作り直し、改善?機能追加
2015年5月???本リリース
Sparkへの取り組みを開始
2009年 2013年 2014年 2016年
ASFへ寄贈
UC BerkeleyのAMPLabで
開発スタート
1.0リリース
2.0
予定
▲Sparkへの取り組みを開始
Sparkはどこ?
ユーザ
Web Server
API Server
Indexer
企業
Web Server
API Server
求人公開
Crawler
広告
媒体
Web Server
API Server
ここ
その昔 バッチ時代、そして失敗
その昔
Crawler
Indexer
貯めて貯めて…
別で作って…
一気に反映!
求人公開
企業
Why Spark ?
● 最初からスケールを前提としたアーキテクチャ
○ この手のものを限られた期間で作るには骨が折れる
● 最初はSparkだけでクラスタが組める手軽さ
● 他にも
○ Elasticsearch for Hadoopがあった
○ Scala製なので何かあったときに自分達で解析ができる
○ ScalaのAPIもあるのでコードも統一できる
使ってすごいと感じたこと
1. 手軽に、並列処理
コレクション操作をしているような感覚で書ける
val spark = new SparkContext(...)
spark.esRDD.flatMap { case (_id, _source) =>
... 処理 ...
}.saveToEs(Map(
ES_NODES -> "localhost"
))
使ってすごいと感じたこと
2. 異なるストレージ同士を簡単に結合
val spark = new SparkContext(...)
val rdd = spark.esRDD.leftOuterJoin(spark.esRDD(Map(
ES_RESOURCE -> s"other/sample",
ES_QUERY -> "?q=*:*"
))).flatMap { case (_id, (_source, other)) => // otherはOption型
... 処理 ...
}
問題多発!!
● 処理が並列にならない
○ シャード数 = 並列数だった
● SparkしすぎてElasticsearchが壊れた
○ 当時の構成にも問題はあった
● 終了求人、多発
○ 求人のライフサイクルが短い
バッチ、終焉を迎える
● 一定量貯めて反映、では遅い
● 取った求人はリアルタイムに検索結果へ
随時発生するデータを蓄積せずに
リアルタイムに処理する基盤が必要
Spark Streamingへ
2009年 2013年 2014年 2016年
ASFへ寄贈
UC BerkeleyのAMPLabで
開発スタート
1.0リリース
2.0
予定
▲Sparkへの取り組みを開始
▼Spark Streamingへ
現在 ストリーム到来
現在
Crawler
求人公開
企業API
Indexer
AD
Why Spark Streaming ?
● Sparkのメリットをそのまま踏襲
○ Spark自体のインフラ構成は変わらない
● 移行コストの低さ
○ バッチのときのコードを一部変更するだけ
● 1.4でKafkaとKinesisのサポートが強化
○ リトライ含めKinesisからの読み込み、チェックポイントなど
Spark Streamingは擬似的ストリーム
● Sparkはあくまでバッチ処理
● 処理を細分化して実行
○ miniバッチの連続でストリーム処理のように見せている
時間の流れ
data data data data data data
miniバッチ miniバッチ miniバッチ
どのくらい移行が簡単か
val spark = new SparkContext(conf)
val rdd = spark.textFile("s3n://foo/bar/a.
log")
.flatMap(_ split " ")
.map(_ -> 1)
.reduceByKey(_ + _)
// 実行
rdd foreach println
val spark = new StreamingContext(
conf, Seconds(5))
val dstream = spark.textFileStream("s3n:
//foo/bar")
.flatMap(_ split " ")
.map(_ -> 1)
.reduceByKey(_ + _)
// 実行
dstream.print()
// 開始
spark.start
spark.awaitTermination
バッチ ストリーム
書き込み先が複数でも手軽 at first
val dstream = spark.textFileStream("s3n://foo/bar")
.flatMap(_ split " ")
// elasticsearchへwrite
dstream.map(...).foreachRDD { rdd => rdd.saveToEs(...) }
書き込み先が複数でも手軽 and then
val dstream = spark.textFileStream("s3n://foo/bar")
.flatMap(_ split " ").cache
// elasticsearchへwrite
dstream.map(...).foreachRDD { rdd => rdd.saveToEs(...) }
// ファイルへwrite
dstream.filter(...).saveAsTextFiles(...)
How To Spark (Streaming)
Spark Streamingの使いドコロ
● よりリアルタイムを求めるもの
○ 集計処理
■ 「今何人みてます」「今予約がありました」
■ ランキング、レコメンド etc
● バッチとのハイブリッド
○ 定期メンテナンスによるストリーム停止時
■ 貯まったデータをバッチで投入し、再開
リアルタイム、べつに
業務アプリの場合、それほどリアルタイムを求めない
それ、夜間バッチで
いいじゃん
バッチでデータが増えると???
夜間バッチが夜間に
終わらない あるある
並列化、高速化が必要になる
作るにしても何かしら
工夫が必要
Sparkをバッチ処理で使おう!!
要件は時間とともに変化
● Sparkなら後でストリームへの移行コストが低い
○ 丸ごと作り直す必要がない
機能要件???よりリアルタイムを求められるようになった
非機能要件??リソースを有効活用したい
Sparkはお得感がある
● 様々な選択ができるメリット
● 変化に対応しやすい、投資のしがいがある
● Unified Engine
Conclusion
わかった上で、どんどん使おう!
● Sparkの特性は理解しよう
● わかった上で、でも使いたいと思うメリットがある
○ 比較的シンプルなコードで分散処理が書ける
○ 様々な泥臭い処理をSparkが吸収してくれている
○ minimumで始められて、かつスケールが可能
● ちゃんとお世話してあげましょう
○ 特に、軌道に乗るまで、バージョンアップ

More Related Content

ビッグじゃなくても使えるSpark Streaming