狠狠撸

狠狠撸Share a Scribd company logo
Spark in small or middle
scale data processing with
Elasticsearch
2015-06-01 第10回elasticsearch勉強会
自己紹介
島本 多可子(@chibochibo03)
株式会社ビズリーチ
元々はJavaを書いてましたが、ここ数年はScalaを
書いてます
一応、GitBucketのコミッタです
https://github.com/takezoe/gitbucket
执笔もやってます
検索エンジン作ってます
リリースまでの開発期間
● おおよそ1年
○ この間に何度もアーキテクチャを変更
○ 全部捨てて1から作り直すことも
● Elasticsearchではない時期もあった
Elasticsearch vs CloudSearch
● サービスのほとんどはAWS上に構築
● 自然とCloudSearchという選択肢に
● 実際CloudSearchで動いていたこともあった
○ aws-cloudsearch-scalaというライブラリも作った
Elasticsearchに決めた理由
● 当時は機能不足が否めなかった
○ 当時はユーザ辞書が使えなかった、n-gramが使えな
かったなど欲しい機能が不足していた
● 検索の細かいチューニングが肝になる
○ 将来的にもプラグイン含め自分達で手を入れられるもの
を選択する必要があった
検索部分のアーキテクチャ
Front API
JSON over HTTP
Search API
Elasticsearch
elastic-scala-httpclient
フロントから検索部分を完
全分離
APIにすることで影響範囲
の限定および集約
クエリはテンプレート化
ねらい
● フロントサイドと検索サイドでライフサイクルが
異なるでの別々にデプロイできるように
● クエリリライトや検索クオリティへの要件からフ
ロントサイドを切り離したい
● クエリの修正はテンプレートだけで完结したい
elastic-scala-httpclient
● HTTPのRESTで通信
するScalaクライアント
● マッピングするScalaの
クラスを自動生成する
ジェネレータを提供
(参考)こんなのも
● 接続方式はTransport Client or Node Client
● クエリをDSLで書ける
やってみて思ったこと
● クエリがJSONで辛い
○ クエリがバグっていると結果が真っ白に
○ 検索結果(クオリティ)を改善していく管理機能を用意
● 要件が安定するまでスキーマを何度も作り直
す。この作業が結構辛い
○ Alter文とか無いので最初からやり直し
○ 今後も変更はあるのでdynamicフィールドを採用へ
データの投入
元ネタ
● クロールで取得
● ノンブロッキングIOを使うことでサーバのリソー
スを効率良く活用
※詳細はScalaの話なので割愛
初期の課題
● HTMLを解析する部分は何度も改善して徐々に
品質を上げていく必要があった
○ その度に再クロールなんてやってられない
● データ量は増加の一途で想定量を考慮すると、
モノリシックでは太刀打ちできない
○ バッチ処理フレームワーク、分散処理がキーワード
そこでSparkですよ!
Apache Sparkとは
● Scala製の高速バッチ処理フレームワーク
● Hadoopが苦手としているスループットとレイテ
ンシの両立が必要な問題領域にアプローチする
ために開発された
● 簡単なコードで分散処理が可能
● Spark SQL、Spark Streaming、MLlibなど、
様々なサブプロジェクトが存在
Elasticsearch for Hadoop
● elasticsearch-hadoopに含まれるApache
Spark Supportを使うことで、Sparkのストレー
ジとしてElasticsearchを使える
HadoopからESへ投入
Hadoop上でESから抽出
投入部分のアーキテクチャ(初期)
クロール
Elasticsearch
HTML
インデキシング
JSON
Elasticsearch
JSON
クロールした情報を一
旦ESへ投入
データを加工して検索
用ESへ投入
ねらい
● HTML解析やインデキシング部分だけを部分的
に実行できる
○ 初期の課題を解決する形
● 複数のインデックスを結合できる
○ 時間のかかるデータを前もって作っておき、インデキシ
ング時に_idで結合する
インデックスを結合
val conf = new SparkConf().setAll(Seq(
ES_RESOURCE -> "job/sample",
ES_QUERY -> "?q=*:*"
))
val spark = new SparkContext(conf)
import SparkContext._
val rdd = spark.esRDD.leftOuterJoin(spark.esRDD(Map(
ES_RESOURCE -> "geo/sample",
ES_QUERY -> "?q=*:*"
))).flatMap { case (_id, (_source, geo)) =>
geo.map { x =>
_source ++ Map("workLocation" -> x("workLocation"))
}
}
jobインデックスに
geoインデックスを外部結合
最適なストレージ
● 「Sparkは高速」「Sparkの耐障害性は高い」とい
うのは、ストレージとしてファイルを想定した話
○ 今後ファイル以外も改善していきたいそう
● ファイル以外のストレージを扱う場合は注意が
必要
○ es-sparkには耐障害性のために必要なチェックポイント
の実装が無い
なぜ採用したか
● 辞書更新が最大の懸念
○ 特に初期は辞書更新が頻繁に発生
○ 再インデックスを考慮したアーキテクチャ
● rawデータの更新?削除の必要性
○ ファイルだと厳しい
● アーキテクチャの統一による運用のしやすさ
● 辞書更新が最大の懸念
○ 特に初期は辞書更新が頻繁に発生
○ 再インデックスを考慮したアーキテクチャ
● rawデータの更新?削除の必要性
○ ファイルだと厳しい
● アーキテクチャの統一による運用のしやすさ
事件発生!!
なぜ採用したか
事件その1
タスクが分割されない
mapのタスク数 = シャード数
タスクが分割されない
● ESからプライマリシャードの情報を取得
○ レプリカは使わない
● 各シャードが持つドキュメントを分割した入力値
として扱う
○ 当初は1シャードのため、シャード数を増やした
事件その2
Sparkしすぎてシャードが壊れる
リカバリに失敗して欠損
シャードが欠損
● Sparkによってものすごい勢いで書き込みを
行っているときにリカバリが走った場合、耐えき
れず一部シャードが壊れる
● リカバリの負荷を下げることでコピーしながら書
き込みを受け付けられるように、以下の値を小
さくした
○ indices.recovery.max_bytes_per_sec
○ indices.recovery.concurrent_streams
課題
● データの鮮度が落ちる
○ 1日1回のバッチ反映では遅い
○ 想定以上にデータの生存期間が短い
■ 自然淘汰にまかせてもよいのでは
■ 全件インデックスはいずれ量的限界
● リソースに無駄がある
○ 瞬間的に高スペックが求められるが、遊んでいる時間が
結構ある
Spark Streaming
● Sparkで擬似的にストリーム処理を行うライブラ
リ
○ Sparkはあくまでバッチ処理で、処理を超分割してショー
トバッチにして実行
○ バッチとストリームで基盤を統一できるメリットがある
● クロールを一定間隔で行っているので、Spark
Streamingのしくみとマッチ
投入部分のアーキテクチャ(現在)
クロール
Elasticsearch
JSON
es-sparkはそのまま
利用可能
HTML
SparkはS3の特定の
バケットを監視
数分間隔にウィンド
ウ幅を設定
コード例
val spark = new StreamingContext(new SparkConf, Minutes(1))
val dstream = spark.textFileStream("s3n://hoge/test")
.flatMap(_ split "n")
.map(...)
.transform(... RDDに対する処理 ...)
.foreachRDD { rdd =>
// saving data only if DStream is not empty
if(!rdd.isEmpty) rdd.saveToEs(Map(
ES_NODES -> "localhost",
ES_RESOURCE -> "job/sample"
))
}
バッチと同様、RDDに対して
saveToEsを実行
効果
● Elasticsearchは検索部分に特化
○ 本来の役割へ
● inputをファイルにしたことでMapタスク数が
シャード数のような外的要因に依存しない形に
○ 収集して数分で表へ反映
まとめ
● 最初は数台のクラスタで始めて、徐々にスケー
ルアウトが比較的容易に可能
○ ちょっとした検証にDocker使ったり
● elasticsearch-sparkはbeta版だが利用には問
題ない
○ ただし、設定しても効かない項目があったりintegration
間で整合性が取れてない部分も

More Related Content

Spark in small or middle scale data processing with Elasticsearch