狠狠撸
Submit Search
Spark in small or middle scale data processing with Elasticsearch
?
7 likes
?
7,586 views
C
chibochibo
Follow
1 of 33
Download now
Downloaded 10 times
More Related Content
Spark in small or middle scale data processing with Elasticsearch
1.
Spark in small
or middle scale data processing with Elasticsearch 2015-06-01 第10回elasticsearch勉強会
2.
自己紹介 島本 多可子(@chibochibo03) 株式会社ビズリーチ 元々はJavaを書いてましたが、ここ数年はScalaを 書いてます 一応、GitBucketのコミッタです https://github.com/takezoe/gitbucket
3.
执笔もやってます
4.
検索エンジン作ってます
5.
リリースまでの開発期間 ● おおよそ1年 ○ この間に何度もアーキテクチャを変更 ○
全部捨てて1から作り直すことも ● Elasticsearchではない時期もあった
6.
Elasticsearch vs CloudSearch ●
サービスのほとんどはAWS上に構築 ● 自然とCloudSearchという選択肢に ● 実際CloudSearchで動いていたこともあった ○ aws-cloudsearch-scalaというライブラリも作った
7.
Elasticsearchに決めた理由 ● 当時は機能不足が否めなかった ○ 当時はユーザ辞書が使えなかった、n-gramが使えな かったなど欲しい機能が不足していた ●
検索の細かいチューニングが肝になる ○ 将来的にもプラグイン含め自分達で手を入れられるもの を選択する必要があった
8.
検索部分のアーキテクチャ Front API JSON over
HTTP Search API Elasticsearch elastic-scala-httpclient フロントから検索部分を完 全分離 APIにすることで影響範囲 の限定および集約 クエリはテンプレート化
9.
ねらい ● フロントサイドと検索サイドでライフサイクルが 異なるでの別々にデプロイできるように ● クエリリライトや検索クオリティへの要件からフ ロントサイドを切り離したい ●
クエリの修正はテンプレートだけで完结したい
10.
elastic-scala-httpclient ● HTTPのRESTで通信 するScalaクライアント ● マッピングするScalaの クラスを自動生成する ジェネレータを提供
11.
(参考)こんなのも ● 接続方式はTransport Client
or Node Client ● クエリをDSLで書ける
12.
やってみて思ったこと ● クエリがJSONで辛い ○ クエリがバグっていると結果が真っ白に ○
検索結果(クオリティ)を改善していく管理機能を用意 ● 要件が安定するまでスキーマを何度も作り直 す。この作業が結構辛い ○ Alter文とか無いので最初からやり直し ○ 今後も変更はあるのでdynamicフィールドを採用へ
13.
データの投入
14.
元ネタ ● クロールで取得 ● ノンブロッキングIOを使うことでサーバのリソー スを効率良く活用 ※詳細はScalaの話なので割愛
15.
初期の課題 ● HTMLを解析する部分は何度も改善して徐々に 品質を上げていく必要があった ○ その度に再クロールなんてやってられない ●
データ量は増加の一途で想定量を考慮すると、 モノリシックでは太刀打ちできない ○ バッチ処理フレームワーク、分散処理がキーワード そこでSparkですよ!
16.
Apache Sparkとは ● Scala製の高速バッチ処理フレームワーク ●
Hadoopが苦手としているスループットとレイテ ンシの両立が必要な問題領域にアプローチする ために開発された ● 簡単なコードで分散処理が可能 ● Spark SQL、Spark Streaming、MLlibなど、 様々なサブプロジェクトが存在
17.
Elasticsearch for Hadoop ●
elasticsearch-hadoopに含まれるApache Spark Supportを使うことで、Sparkのストレー ジとしてElasticsearchを使える HadoopからESへ投入 Hadoop上でESから抽出
18.
投入部分のアーキテクチャ(初期) クロール Elasticsearch HTML インデキシング JSON Elasticsearch JSON クロールした情報を一 旦ESへ投入 データを加工して検索 用ESへ投入
19.
ねらい ● HTML解析やインデキシング部分だけを部分的 に実行できる ○ 初期の課題を解決する形 ●
複数のインデックスを結合できる ○ 時間のかかるデータを前もって作っておき、インデキシ ング時に_idで結合する
20.
インデックスを結合 val conf =
new SparkConf().setAll(Seq( ES_RESOURCE -> "job/sample", ES_QUERY -> "?q=*:*" )) val spark = new SparkContext(conf) import SparkContext._ val rdd = spark.esRDD.leftOuterJoin(spark.esRDD(Map( ES_RESOURCE -> "geo/sample", ES_QUERY -> "?q=*:*" ))).flatMap { case (_id, (_source, geo)) => geo.map { x => _source ++ Map("workLocation" -> x("workLocation")) } } jobインデックスに geoインデックスを外部結合
21.
最適なストレージ ● 「Sparkは高速」「Sparkの耐障害性は高い」とい うのは、ストレージとしてファイルを想定した話 ○ 今後ファイル以外も改善していきたいそう ●
ファイル以外のストレージを扱う場合は注意が 必要 ○ es-sparkには耐障害性のために必要なチェックポイント の実装が無い
22.
なぜ採用したか ● 辞書更新が最大の懸念 ○ 特に初期は辞書更新が頻繁に発生 ○
再インデックスを考慮したアーキテクチャ ● rawデータの更新?削除の必要性 ○ ファイルだと厳しい ● アーキテクチャの統一による運用のしやすさ
23.
● 辞書更新が最大の懸念 ○ 特に初期は辞書更新が頻繁に発生 ○
再インデックスを考慮したアーキテクチャ ● rawデータの更新?削除の必要性 ○ ファイルだと厳しい ● アーキテクチャの統一による運用のしやすさ 事件発生!! なぜ採用したか
24.
事件その1 タスクが分割されない mapのタスク数 = シャード数
25.
タスクが分割されない ● ESからプライマリシャードの情報を取得 ○ レプリカは使わない ●
各シャードが持つドキュメントを分割した入力値 として扱う ○ 当初は1シャードのため、シャード数を増やした
26.
事件その2 Sparkしすぎてシャードが壊れる リカバリに失敗して欠損
27.
シャードが欠損 ● Sparkによってものすごい勢いで書き込みを 行っているときにリカバリが走った場合、耐えき れず一部シャードが壊れる ● リカバリの負荷を下げることでコピーしながら書 き込みを受け付けられるように、以下の値を小 さくした ○
indices.recovery.max_bytes_per_sec ○ indices.recovery.concurrent_streams
28.
課題 ● データの鮮度が落ちる ○ 1日1回のバッチ反映では遅い ○
想定以上にデータの生存期間が短い ■ 自然淘汰にまかせてもよいのでは ■ 全件インデックスはいずれ量的限界 ● リソースに無駄がある ○ 瞬間的に高スペックが求められるが、遊んでいる時間が 結構ある
29.
Spark Streaming ● Sparkで擬似的にストリーム処理を行うライブラ リ ○
Sparkはあくまでバッチ処理で、処理を超分割してショー トバッチにして実行 ○ バッチとストリームで基盤を統一できるメリットがある ● クロールを一定間隔で行っているので、Spark Streamingのしくみとマッチ
30.
投入部分のアーキテクチャ(現在) クロール Elasticsearch JSON es-sparkはそのまま 利用可能 HTML SparkはS3の特定の バケットを監視 数分間隔にウィンド ウ幅を設定
31.
コード例 val spark =
new StreamingContext(new SparkConf, Minutes(1)) val dstream = spark.textFileStream("s3n://hoge/test") .flatMap(_ split "n") .map(...) .transform(... RDDに対する処理 ...) .foreachRDD { rdd => // saving data only if DStream is not empty if(!rdd.isEmpty) rdd.saveToEs(Map( ES_NODES -> "localhost", ES_RESOURCE -> "job/sample" )) } バッチと同様、RDDに対して saveToEsを実行
32.
効果 ● Elasticsearchは検索部分に特化 ○ 本来の役割へ ●
inputをファイルにしたことでMapタスク数が シャード数のような外的要因に依存しない形に ○ 収集して数分で表へ反映
33.
まとめ ● 最初は数台のクラスタで始めて、徐々にスケー ルアウトが比較的容易に可能 ○ ちょっとした検証にDocker使ったり ●
elasticsearch-sparkはbeta版だが利用には問 題ない ○ ただし、設定しても効かない項目があったりintegration 間で整合性が取れてない部分も
Download