狠狠撸
Submit Search
Hadoop基盤上のETL構築実践例 ~多様なデータをどう扱う?~
?
Download as PPTX, PDF
?
5 likes
?
1,639 views
Sotaro Kimura
Follow
2016/05/26 D&S Data Night vol.02
Read less
Read more
1 of 34
Download now
More Related Content
Hadoop基盤上のETL構築実践例 ~多様なデータをどう扱う?~
1.
Hadoop基盤上のETL構築実践例 ~多様なデータをどう扱う?~ 2016/05/26 D&S Data
Night vol.02 株式会社ドワンゴ 共通基盤開発部 数値基盤セクション 木村宗太郎(@kimutansk) https://www.flickr.com/photos/nanoprobe67/5761031999/
2.
自己紹介 ? 木村 宗太郎(Sotaro
Kimura) ? 株式会社ドワンゴ ? 共通基盤開発部 数値基盤セクション ? Hadoopを核としたビッグデータ基盤の開発運用 ? Twitter : @kimutansk 1
3.
アジェンダ 1. niconicoの概要 2. サービスの特徴 3.
ETLに求められる要件 4. niconicoにおけるデータ活用方針 5. niconicoのETLにおける課題と対処 6. 対処:内製バッチジョブフレームワーク 7. 対処実施結果 8. 今後の改善方針 2
4.
1. niconicoの概要 3 MAU 約900万 ID発行数
約5541万 有料会員 約256万 ユーザアクション - コンテンツ視聴 - コンテンツ投稿 - コメント - マイリスト - お気に入り - タグ編集 ユーザ アクション ファミリー サービス アクセス デバイス 動画 静画 マンガ 電子書籍 生放送 チャンネル アプリ ブロマガ 立体 大百科 市場 ニコニ広告 実況 コモンズ コミュニティ ニュース ニコナレ PC iOS Android SPモード Xbox One 3DS PS4 Vii U PS Vita 光Box+ PS Vita TV ビエラ ブラビア TV Box Fire TV LG TV Gear VR
5.
1. niconicoの概要 4 MAU 約900万 ID発行数
約5541万 有料会員 約256万 ユーザアクション - コンテンツ視聴 - コンテンツ投稿 - コメント - マイリスト - お気に入り - タグ編集 ユーザ アクション ファミリー サービス アクセス デバイス 動画 静画 マンガ 電子書籍 生放送 チャンネル アプリ ブロマガ 立体 大百科 市場 ニコニ広告 実況 コモンズ コミュニティ ニュース ニコナレ PC iOS Android SPモード Xbox One 3DS PS4 Vii U PS Vita 光Box+ PS Vita TV ビエラ ブラビア TV Box Fire TV LG TV Gear VR - 多様なユーザアクション - 多様なファミリーサービス - 多様なアクセスデバイス これらが同一ユーザID体系の上で行われる。
6.
2. サービスの特徴 ? 概要より、下記の特徴を持つことがわかる。 1.
事業の成長が早い 2. 事業の変化展開が速い 3. 施策の実施サイクルが早い 5
7.
3. ETLに求められる要件 6 スモールスタートで、 将来は巨大データへの対応 1. 事業の成長が早い データ活用で必要な要素
ETLに求められる要件 高速な分析 各事業内で自前でデータ活用 常にスケールアウト可能 高速なジョブ実行 使いやすい統一された データ形式
8.
3. ETLに求められる要件 7 サービスの仕様変更に 対する素早い追従 2. 事業の変化展開が早い データ活用で必要な要素
ETLに求められる要件 多様なデータ間の整合性確保 多様なデータを統一的に活用 仕様変更に容易に追従可能 バージョン/サービス間の 差分を吸収 データの一元管理
9.
3. ETLに求められる要件 8 定期的/リアルタイムの 様々なタイミングで分析 3. 施策の実施サイクルが早い データ活用で必要な要素
ETLに求められる要件 多様なツールの活用 リードタイムの短縮 適切なSLA設定 汎用的なインタフェースでの データ提供
10.
4. niconicoにおけるデータ活用方針 9 1. データを一か所に集約 ?
多様なファミリーサービスとデバイス展開 ? 多様なユーザアクション ? ユーザの行動を捉えるには横断的な分析が必要 2. 誰もがデータ分析 ? 分析部署だけで全部署のニーズを満たすことは不可能 ? 業務で必要な全社員が自前でデータ集計?分析 ? 分析できる環境やAPIの提供と教育体制の拡充 ? データの「民主化」(※) (※)データを一箇所に集めることでデータ活用の民主化が進んだ話 http://chezou.hatenablog.com/entry/2016/05/05/222046
11.
5. niconicoのETLにおける課題と対処 10 ? 求められる要素は多い 常にスケールアウト可能 高速なジョブ実行 使いやすい統一された データ形式 仕様変更に容易に追従可 能 バージョン/サービス間の 差分を吸収 データの一元管理 リードタイムの短縮 適切なSLA設定 汎用的なインタフェースでの データ提供
12.
5. niconicoのETLにおける課題と対処 11 ? 求められる要素は多い 常にスケールアウト可能 高速なジョブ実行 使いやすい統一された データ形式 仕様変更に容易に追従可 能 バージョン/サービス間の 差分を吸収 データの一元管理 リードタイムの短縮 適切なSLA設定 汎用的なインタフェースでの データ提供 -
「何か一つを行えばOK」にはならない - 対処を各々実施し、 積み上げていくことで徐々に改善が可能
13.
5. niconicoのETLにおける課題と対処 12 ? 対処(一部現在進行中)その1 常にスケールアウト可能 高速なジョブ実行 使いやすい統一された データ形式 常時Hadoop/Sparkジョブで 実施 Spark/Tezに移行 フォーマットを Tsv
Lzo > Parquet Snappy 非正規化して活用時の Joinを低減
14.
5. niconicoのETLにおける課題と対処 13 ? 対処(一部現在進行中)その2 仕様変更に容易に追従可能 データの一元管理 バージョン/サービス間の 差分を吸収 内製 バッチジョブフレームワーク データをカテゴリ分けして 集約し、ユーザ毎に権限管理 ジョブをテンプレートから生成 する内製ジョブ実行アプリ 共通ログ形式を複数提示し、 それ以外の管理レベル低減
15.
5. niconicoのETLにおける課題と対処 14 ? 対処(一部現在進行中)その3 リードタイムの短縮 適切なSLA設定 汎用的なインタフェースでの データ提供 リアルタイムクラスタ (fluentd
& Kafka)の構築 サービスレベルの 明確化&文書化 ジョブ実行用UI/RestAPIの 提供 JDBCインタフェースの 限定開放
16.
5. niconicoのETLにおける課題と対処 15 ? 今回のお話しする対象 仕様変更に容易に追従可能 データの一元管理 バージョン/サービス間の 差分を吸収 内製 バッチジョブフレームワーク データをカテゴリ分けして 集約し、ユーザ毎に権限管理 ジョブをテンプレートから生成 する内製ジョブ実行アプリ 共通ログ形式を複数提示し、 それ以外の管理レベル低減
17.
5. niconicoのETLにおける課題と対処 16 ? 今回のお話しする対象 Norikra fluentd &
Kafka scp Pig / Hive MapReduce Hive / Impala 内製バッチジョブフレームワーク MapReduce → Spark
18.
6. 対処:内製バッチジョブフレームワーク 17 ? 実施したいこと ?
仕様変更に容易に追従可能 ? バージョン/サービス間の差分を吸収 ? 現実 ? ファミリーサービス毎にログ内容が異なる ? アクセスデバイス毎にログ内容が異なる ? ユーザアクション毎にログ内容が大きく異なる 条件分岐で区切って作成! 結果???
19.
https://www.flickr.com/photos/kenyee/2817511001/
20.
6. 対処:内製バッチジョブフレームワーク 19 ? ログ変換は下記の流れに概ね抽象化可能 Input(Read) Decode Parse Convert/Filter Format Encode Output(Write) ファイル入力 圧縮ファイルの解凍 入力ファイルパース 変換/フィルタ 出力ファイルフォーマット ファイルの圧縮 ファイル出力
21.
6. 対処:内製バッチジョブフレームワーク 20 ? 各パーツをコンポーネント化し、設定で 差し替え/組み合わせ可能にすれば? ?
設定で処理が決まるので仕様変更に追従可能 ? データの名前やフォーマットの差分に対応可能 ? データの内容の差分も設定で吸収可能 ? MapReduce/Spark両バージョンは必要 ? MapReduceは遅いが設定が少なく安定 ? Sparkは早いが設定が多くデータ増加で不安定 ? 共通のドメインロジック部/ユーティリティは共用 ただし???
22.
6. 対処:内製バッチジョブフレームワーク 21 ? 下記のように実現 ?
設定は起動時の引数で指定(例) ? 実行メインクラス(App)は処理モデル毎 ? ファイルを読み込んで変換するのみのジョブ ? 重複除去を行うジョブ ? 他データとのJoinを行うジョブ ? MapSideJoin、ReduceSideJoinは別個作成 $ /opt/spark/bin/spark-submit --master yarn-client --class LogApp --conf spark.input_compression_codec=lzo --conf spark.converter_classes=TimeRangeFilterConverter, ReqTimeUnifyConverter ......
23.
6. 対処:内製バッチジョブフレームワーク 22 ? 起動時引数は下記のように組み合わせる No
フェーズ 設定項目例 1 Input HDFS上のパス(数は任意) 2 Decode 入力時の圧縮形式(uncompress、Lzo、BZip2 etc) 3 Parse 入力フォーマット(Ltsv、Tsv) 入力スキーマ定義パス(独自Json形式) 4 Convert &Filter 適用するConverter(数は任意) Converter例: 時刻範囲のフィルタ、必須カラム補足、時刻形式統一 … 5 Format 出力フォーマット(Parquet、Tsv) 出力スキーマ定義パス(AvroSchema) 6 Encode 出力時の圧縮形式(Snappy、Lzo etc) 7 Output HDFS上のパス 出力ファイル数
24.
6. 対処:内製バッチジョブフレームワーク 23 ? MapReduce/Spark共用のクラス例 ?
入力スキーマを読み込み&オブジェクト化 ? スキーマを用いて入力データをパース ? AvroSchemaを用いて出力データ生成 ? ローカル/HDFSの透過的なファイル読み込み ? 起動時引数のパーサ ? テスト用ユーティリティ ? 他ドメイン固有ロジッククラス
25.
6. 対処:内製バッチジョブフレームワーク 24 ? MapReduceの設定適用例 ?
Driverで Input/Decode/Format/Encode/Outputを適用 // 起動時引数から各種設定値を取得(各種変数に設定) ~~~~~~~~ ~~~~~~~~ ~~~~~~~~ // 実行用ジョブを生成 String jobName = conf.get(ArgKey.JOB_NAME, "mapreduce"); Job job = new Job(conf, jobName); // 出力ファイル数を設定 job.setNumReduceTasks(numReduceTasks); // Mapperクラスと関連クラスを設定 job.setMapperClass(mapperClass); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(NullWritable.class); // Reducerクラスと関連クラスを設定 job.setReducerClass(reducerClass); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class);
26.
6. 対処:内製バッチジョブフレームワーク 25 ? MapReduceの設定適用例 ?
Driverで Input/Decode/Format/Encode/Outputを適用 // 入力時圧縮形式、入力パスを設定 job.setInputFormatClass(inputFormatClass); FileInputFormat.addInputPaths(job, inputPaths); // 出力時圧縮形式、出力パスを設定 job.setOutputFormatClass(outputFormatClass); FileOutputFormat.setOutputCompressorClass(job, compressOutputCodec); FileOutputFormat.setCompressOutput(job, true); FileOutputFormat.setOutputPath(job, new Path(outputPath));
27.
6. 対処:内製バッチジョブフレームワーク 26 ? MapReduceの設定適用例 ?
MapperでParser/Convert&Filter/Formatを適用 // 初期化メソッドでスキーマ、Parser、Converter群を生成 public void setup(Context context) throws InterruptedException, IOException { Configuration conf = context.getConfiguration(); // Converterを続けて適用する親Converterを生成 converter = new MapReduceMessageConverter(conf); // Parserを生成 this.lineParser = ParserFactory.createParser(conf); // 入力スキーマを生成 String inputSchemaStr = context.getConfiguration().get(ArgKey.INPUT_SCHEMA, ""); this.inputSchema = MappingSchemaCreator.createSchema(inputSchemaStr); // 出力スキーマを生成 String outputSchemaStr = context.getConfiguration().get(ArgKey.OUTPUT_SCHEMA, ""); this.outputShema = new Schema.Parser().parse(outputSchemaStr); }
28.
6. 対処:内製バッチジョブフレームワーク 27 ? MapReduceの設定適用例 ?
MapperでParser/Convert&Filter/Formatを適用 // MapメソッドでConverter、Formatを適用 public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 入力データをParserでパース Map<String, String> recordMap = this.lineParser.parseWithSchema(value.toString(), this.inputSchema); // データに順次Converterを適用 recordMap = converter.convert(recordMap, conf); if (recordMap == null) { return; } // 出力スキーマを適用して出力 String output = AvroSchemaApplier.format(recordMap, this.outputFormat, this.outputShema); context.write(new Text(output), NullWritable.get()); }
29.
6. 対処:内製バッチジョブフレームワーク 28 ? Sparkの設定適用例 ?
実行アプリケーション(App)中で 各種起動時設定を読み込み、適用 // 起動時引数から各種設定値を取得(設定Mapオブジェクトを生成) val sparkConf = new SparkConf var customConf = SparkConfigAdapter.createBaseAppConf(sparkConf) // SparkJobContextを生成 val sparkContext = new SparkContext(sparkConf) val sqlContext = new SQLContext(sparkContext) val appConf = sparkContext.broadcast(customConf ) // パス、コーデックを指定して変換対象ファイルを読み込む val readLineRDD = SparkInputAdapter.readCompressedText(inputPaths, sparkContext, appConf.value) // 重複除去、出力ファイル数を設定 val distinctRDD = readLineRDD.distinct(appConf.value("output_file_num").toInt) // 読み込んだファイルを変換する val convertedRDD = parseConvertRDD(distinctRDD, appConf.value)
30.
6. 対処:内製バッチジョブフレームワーク 29 ? Sparkの設定適用例 ?
実行アプリケーション(App)中で 各種起動時設定を読み込み、適用 // Parquet出力の場合はDataFrameを介して出力、それ以外の場合はRDDのまま出力 if (appConf.value.getOrElse("output_file_format", "csv").toLowerCase().equals("parquet")) { val dataFrame = SparkMessageFormatter.convertMapRddToDataFrame(sqlContext, convertedRDD, appConf.value) SparkOutputAdapter.writeParquetDataFrame(outputPath, dataFrame) } else { val outputRDD = SparkMessageFormatter.convertMessageToText(convertedRDD, appConf.value) SparkOutputAdapter.writeCompressedText(outputPath, outputRDD, appConf.value) }
31.
7. 対処実施結果 30 ? 内製バッチジョブフレームワークに 組み込んだ結果 ?
上手くいった点 ? 複数パターンのデータ変換が引数のみで構成出来た ? 内製ジョブ実行アプリと組み合わせて一部の 設定値更新のみで複数パターンのジョブを発行できた ? 残っている課題 ? 引数としてタブ文字などの一部文字が指定できない ? 引数に複雑さが移行しているため、 引数と実行機能に対するドキュメントが別途必要
32.
8. 今後の改善方針 31 ? 今後は下記の改善対応を行う方針 ?
チューニング自動化 ? ジョブ実行前にリソース量/出力ファイル数を算出 ? カテゴリ毎に作成しているジョブの共通化 ? 起動時引数の指定方法の簡略化 ? ドキュメント整備自動化 ? 入力/出力スキーマファイルから ドキュメント自動生成
33.
まとめ 32 ? ETLに求められる要件は多い ? 何か一つを行えばいいということにはならない ?
対処を一つ一つ実施し、積み上げていく必要有 ? 「銀の弾丸」は欲しいが、多分ない。 ? 対処:内製バッチジョブフレームワーク ? ログ変換の各フェーズを起動時引数で構成 ? 柔軟性は増したが、 特殊文字やドキュメントなど課題は残る ? チューニングやドキュメントの自動化など 今後も改善対応を継続予定
34.
ETL改善の積み重ねは高く険しい We have yet
to finish improving ETL. https://www.flickr.com/photos/nanoprobe67/5746249953/
Download