狠狠撸

狠狠撸Share a Scribd company logo
Hadoop基盤上のETL構築実践例
~多様なデータをどう扱う?~
2016/05/26 D&S Data Night vol.02
株式会社ドワンゴ
共通基盤開発部 数値基盤セクション
木村宗太郎(@kimutansk)
https://www.flickr.com/photos/nanoprobe67/5761031999/
自己紹介
? 木村 宗太郎(Sotaro Kimura)
? 株式会社ドワンゴ
? 共通基盤開発部 数値基盤セクション
? Hadoopを核としたビッグデータ基盤の開発運用
? Twitter : @kimutansk
1
アジェンダ
1. niconicoの概要
2. サービスの特徴
3. ETLに求められる要件
4. niconicoにおけるデータ活用方針
5. niconicoのETLにおける課題と対処
6. 対処:内製バッチジョブフレームワーク
7. 対処実施結果
8. 今後の改善方針
2
1. niconicoの概要
3
MAU 約900万
ID発行数 約5541万
有料会員 約256万
ユーザアクション
- コンテンツ視聴
- コンテンツ投稿
- コメント
- マイリスト
- お気に入り
- タグ編集
ユーザ
アクション
ファミリー
サービス
アクセス
デバイス
動画
静画
マンガ
電子書籍
生放送
チャンネル
アプリ
ブロマガ
立体
大百科
市場
ニコニ広告
実況
コモンズ
コミュニティ
ニュース
ニコナレ
PC
iOS
Android
SPモード
Xbox One
3DS
PS4
Vii U
PS Vita
光Box+
PS Vita TV
ビエラ
ブラビア
TV Box
Fire TV
LG TV
Gear VR
1. niconicoの概要
4
MAU 約900万
ID発行数 約5541万
有料会員 約256万
ユーザアクション
- コンテンツ視聴
- コンテンツ投稿
- コメント
- マイリスト
- お気に入り
- タグ編集
ユーザ
アクション
ファミリー
サービス
アクセス
デバイス
動画
静画
マンガ
電子書籍
生放送
チャンネル
アプリ
ブロマガ
立体
大百科
市場
ニコニ広告
実況
コモンズ
コミュニティ
ニュース
ニコナレ
PC
iOS
Android
SPモード
Xbox One
3DS
PS4
Vii U
PS Vita
光Box+
PS Vita TV
ビエラ
ブラビア
TV Box
Fire TV
LG TV
Gear VR
- 多様なユーザアクション
- 多様なファミリーサービス
- 多様なアクセスデバイス
これらが同一ユーザID体系の上で行われる。
2. サービスの特徴
? 概要より、下記の特徴を持つことがわかる。
1. 事業の成長が早い
2. 事業の変化展開が速い
3. 施策の実施サイクルが早い
5
3. ETLに求められる要件
6
スモールスタートで、
将来は巨大データへの対応
1. 事業の成長が早い
データ活用で必要な要素 ETLに求められる要件
高速な分析
各事業内で自前でデータ活用
常にスケールアウト可能
高速なジョブ実行
使いやすい統一された
データ形式
3. ETLに求められる要件
7
サービスの仕様変更に
対する素早い追従
2. 事業の変化展開が早い
データ活用で必要な要素 ETLに求められる要件
多様なデータ間の整合性確保
多様なデータを統一的に活用
仕様変更に容易に追従可能
バージョン/サービス間の
差分を吸収
データの一元管理
3. ETLに求められる要件
8
定期的/リアルタイムの
様々なタイミングで分析
3. 施策の実施サイクルが早い
データ活用で必要な要素 ETLに求められる要件
多様なツールの活用
リードタイムの短縮
適切なSLA設定
汎用的なインタフェースでの
データ提供
4. niconicoにおけるデータ活用方針
9
1. データを一か所に集約
? 多様なファミリーサービスとデバイス展開
? 多様なユーザアクション
? ユーザの行動を捉えるには横断的な分析が必要
2. 誰もがデータ分析
? 分析部署だけで全部署のニーズを満たすことは不可能
? 業務で必要な全社員が自前でデータ集計?分析
? 分析できる環境やAPIの提供と教育体制の拡充
? データの「民主化」(※)
(※)データを一箇所に集めることでデータ活用の民主化が進んだ話
http://chezou.hatenablog.com/entry/2016/05/05/222046
5. niconicoのETLにおける課題と対処
10
? 求められる要素は多い
常にスケールアウト可能
高速なジョブ実行
使いやすい統一された
データ形式
仕様変更に容易に追従可
能
バージョン/サービス間の
差分を吸収
データの一元管理
リードタイムの短縮
適切なSLA設定
汎用的なインタフェースでの
データ提供
5. niconicoのETLにおける課題と対処
11
? 求められる要素は多い
常にスケールアウト可能
高速なジョブ実行
使いやすい統一された
データ形式
仕様変更に容易に追従可
能
バージョン/サービス間の
差分を吸収
データの一元管理
リードタイムの短縮
適切なSLA設定
汎用的なインタフェースでの
データ提供
- 「何か一つを行えばOK」にはならない
- 対処を各々実施し、
積み上げていくことで徐々に改善が可能
5. niconicoのETLにおける課題と対処
12
? 対処(一部現在進行中)その1
常にスケールアウト可能
高速なジョブ実行
使いやすい統一された
データ形式
常時Hadoop/Sparkジョブで
実施
Spark/Tezに移行
フォーマットを
Tsv Lzo > Parquet Snappy
非正規化して活用時の
Joinを低減
5. niconicoのETLにおける課題と対処
13
? 対処(一部現在進行中)その2
仕様変更に容易に追従可能
データの一元管理
バージョン/サービス間の
差分を吸収
内製
バッチジョブフレームワーク
データをカテゴリ分けして
集約し、ユーザ毎に権限管理
ジョブをテンプレートから生成
する内製ジョブ実行アプリ
共通ログ形式を複数提示し、
それ以外の管理レベル低減
5. niconicoのETLにおける課題と対処
14
? 対処(一部現在進行中)その3
リードタイムの短縮
適切なSLA設定
汎用的なインタフェースでの
データ提供
リアルタイムクラスタ
(fluentd & Kafka)の構築
サービスレベルの
明確化&文書化
ジョブ実行用UI/RestAPIの
提供
JDBCインタフェースの
限定開放
5. niconicoのETLにおける課題と対処
15
? 今回のお話しする対象
仕様変更に容易に追従可能
データの一元管理
バージョン/サービス間の
差分を吸収
内製
バッチジョブフレームワーク
データをカテゴリ分けして
集約し、ユーザ毎に権限管理
ジョブをテンプレートから生成
する内製ジョブ実行アプリ
共通ログ形式を複数提示し、
それ以外の管理レベル低減
5. niconicoのETLにおける課題と対処
16
? 今回のお話しする対象
Norikra
fluentd
& Kafka
scp
Pig / Hive
MapReduce
Hive /
Impala
内製バッチジョブフレームワーク
MapReduce → Spark
6. 対処:内製バッチジョブフレームワーク
17
? 実施したいこと
? 仕様変更に容易に追従可能
? バージョン/サービス間の差分を吸収
? 現実
? ファミリーサービス毎にログ内容が異なる
? アクセスデバイス毎にログ内容が異なる
? ユーザアクション毎にログ内容が大きく異なる
条件分岐で区切って作成!
結果???
https://www.flickr.com/photos/kenyee/2817511001/
6. 対処:内製バッチジョブフレームワーク
19
? ログ変換は下記の流れに概ね抽象化可能
Input(Read)
Decode
Parse
Convert/Filter
Format
Encode
Output(Write)
ファイル入力
圧縮ファイルの解凍
入力ファイルパース
変換/フィルタ
出力ファイルフォーマット
ファイルの圧縮
ファイル出力
6. 対処:内製バッチジョブフレームワーク
20
? 各パーツをコンポーネント化し、設定で
差し替え/組み合わせ可能にすれば?
? 設定で処理が決まるので仕様変更に追従可能
? データの名前やフォーマットの差分に対応可能
? データの内容の差分も設定で吸収可能
? MapReduce/Spark両バージョンは必要
? MapReduceは遅いが設定が少なく安定
? Sparkは早いが設定が多くデータ増加で不安定
? 共通のドメインロジック部/ユーティリティは共用
ただし???
6. 対処:内製バッチジョブフレームワーク
21
? 下記のように実現
? 設定は起動時の引数で指定(例)
? 実行メインクラス(App)は処理モデル毎
? ファイルを読み込んで変換するのみのジョブ
? 重複除去を行うジョブ
? 他データとのJoinを行うジョブ
? MapSideJoin、ReduceSideJoinは別個作成
$ /opt/spark/bin/spark-submit --master yarn-client --class
LogApp --conf spark.input_compression_codec=lzo --conf
spark.converter_classes=TimeRangeFilterConverter,
ReqTimeUnifyConverter ......
6. 対処:内製バッチジョブフレームワーク
22
? 起動時引数は下記のように組み合わせる
No フェーズ 設定項目例
1 Input HDFS上のパス(数は任意)
2 Decode 入力時の圧縮形式(uncompress、Lzo、BZip2 etc)
3 Parse 入力フォーマット(Ltsv、Tsv)
入力スキーマ定義パス(独自Json形式)
4 Convert
&Filter
適用するConverter(数は任意)
Converter例:
時刻範囲のフィルタ、必須カラム補足、時刻形式統一 …
5 Format 出力フォーマット(Parquet、Tsv)
出力スキーマ定義パス(AvroSchema)
6 Encode 出力時の圧縮形式(Snappy、Lzo etc)
7 Output HDFS上のパス
出力ファイル数
6. 対処:内製バッチジョブフレームワーク
23
? MapReduce/Spark共用のクラス例
? 入力スキーマを読み込み&オブジェクト化
? スキーマを用いて入力データをパース
? AvroSchemaを用いて出力データ生成
? ローカル/HDFSの透過的なファイル読み込み
? 起動時引数のパーサ
? テスト用ユーティリティ
? 他ドメイン固有ロジッククラス
6. 対処:内製バッチジョブフレームワーク
24
? MapReduceの設定適用例
? Driverで
Input/Decode/Format/Encode/Outputを適用
// 起動時引数から各種設定値を取得(各種変数に設定)
~~~~~~~~
~~~~~~~~
~~~~~~~~
// 実行用ジョブを生成
String jobName = conf.get(ArgKey.JOB_NAME, "mapreduce");
Job job = new Job(conf, jobName);
// 出力ファイル数を設定
job.setNumReduceTasks(numReduceTasks);
// Mapperクラスと関連クラスを設定
job.setMapperClass(mapperClass);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
// Reducerクラスと関連クラスを設定
job.setReducerClass(reducerClass);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
6. 対処:内製バッチジョブフレームワーク
25
? MapReduceの設定適用例
? Driverで
Input/Decode/Format/Encode/Outputを適用
// 入力時圧縮形式、入力パスを設定
job.setInputFormatClass(inputFormatClass);
FileInputFormat.addInputPaths(job, inputPaths);
// 出力時圧縮形式、出力パスを設定
job.setOutputFormatClass(outputFormatClass);
FileOutputFormat.setOutputCompressorClass(job, compressOutputCodec);
FileOutputFormat.setCompressOutput(job, true);
FileOutputFormat.setOutputPath(job, new Path(outputPath));
6. 対処:内製バッチジョブフレームワーク
26
? MapReduceの設定適用例
? MapperでParser/Convert&Filter/Formatを適用
// 初期化メソッドでスキーマ、Parser、Converter群を生成
public void setup(Context context) throws InterruptedException, IOException {
Configuration conf = context.getConfiguration();
// Converterを続けて適用する親Converterを生成
converter = new MapReduceMessageConverter(conf);
// Parserを生成
this.lineParser = ParserFactory.createParser(conf);
// 入力スキーマを生成
String inputSchemaStr = context.getConfiguration().get(ArgKey.INPUT_SCHEMA, "");
this.inputSchema = MappingSchemaCreator.createSchema(inputSchemaStr);
// 出力スキーマを生成
String outputSchemaStr = context.getConfiguration().get(ArgKey.OUTPUT_SCHEMA, "");
this.outputShema = new Schema.Parser().parse(outputSchemaStr);
}
6. 対処:内製バッチジョブフレームワーク
27
? MapReduceの設定適用例
? MapperでParser/Convert&Filter/Formatを適用
// MapメソッドでConverter、Formatを適用
public void map(LongWritable key, Text value, Context context) throws
IOException, InterruptedException {
// 入力データをParserでパース
Map<String, String> recordMap = this.lineParser.parseWithSchema(value.toString(), this.inputSchema);
// データに順次Converterを適用
recordMap = converter.convert(recordMap, conf);
if (recordMap == null) {
return;
}
// 出力スキーマを適用して出力
String output = AvroSchemaApplier.format(recordMap, this.outputFormat, this.outputShema);
context.write(new Text(output), NullWritable.get());
}
6. 対処:内製バッチジョブフレームワーク
28
? Sparkの設定適用例
? 実行アプリケーション(App)中で
各種起動時設定を読み込み、適用
// 起動時引数から各種設定値を取得(設定Mapオブジェクトを生成)
val sparkConf = new SparkConf
var customConf = SparkConfigAdapter.createBaseAppConf(sparkConf)
// SparkJobContextを生成
val sparkContext = new SparkContext(sparkConf)
val sqlContext = new SQLContext(sparkContext)
val appConf = sparkContext.broadcast(customConf )
// パス、コーデックを指定して変換対象ファイルを読み込む
val readLineRDD = SparkInputAdapter.readCompressedText(inputPaths, sparkContext, appConf.value)
// 重複除去、出力ファイル数を設定
val distinctRDD = readLineRDD.distinct(appConf.value("output_file_num").toInt)
// 読み込んだファイルを変換する
val convertedRDD = parseConvertRDD(distinctRDD, appConf.value)
6. 対処:内製バッチジョブフレームワーク
29
? Sparkの設定適用例
? 実行アプリケーション(App)中で
各種起動時設定を読み込み、適用
// Parquet出力の場合はDataFrameを介して出力、それ以外の場合はRDDのまま出力
if (appConf.value.getOrElse("output_file_format", "csv").toLowerCase().equals("parquet")) {
val dataFrame =
SparkMessageFormatter.convertMapRddToDataFrame(sqlContext, convertedRDD, appConf.value)
SparkOutputAdapter.writeParquetDataFrame(outputPath, dataFrame)
} else {
val outputRDD = SparkMessageFormatter.convertMessageToText(convertedRDD, appConf.value)
SparkOutputAdapter.writeCompressedText(outputPath, outputRDD, appConf.value)
}
7. 対処実施結果
30
? 内製バッチジョブフレームワークに
組み込んだ結果
? 上手くいった点
? 複数パターンのデータ変換が引数のみで構成出来た
? 内製ジョブ実行アプリと組み合わせて一部の
設定値更新のみで複数パターンのジョブを発行できた
? 残っている課題
? 引数としてタブ文字などの一部文字が指定できない
? 引数に複雑さが移行しているため、
引数と実行機能に対するドキュメントが別途必要
8. 今後の改善方針
31
? 今後は下記の改善対応を行う方針
? チューニング自動化
? ジョブ実行前にリソース量/出力ファイル数を算出
? カテゴリ毎に作成しているジョブの共通化
? 起動時引数の指定方法の簡略化
? ドキュメント整備自動化
? 入力/出力スキーマファイルから
ドキュメント自動生成
まとめ
32
? ETLに求められる要件は多い
? 何か一つを行えばいいということにはならない
? 対処を一つ一つ実施し、積み上げていく必要有
? 「銀の弾丸」は欲しいが、多分ない。
? 対処:内製バッチジョブフレームワーク
? ログ変換の各フェーズを起動時引数で構成
? 柔軟性は増したが、
特殊文字やドキュメントなど課題は残る
? チューニングやドキュメントの自動化など
今後も改善対応を継続予定
ETL改善の積み重ねは高く険しい
We have yet to finish improving ETL.
https://www.flickr.com/photos/nanoprobe67/5746249953/

More Related Content

Hadoop基盤上のETL構築実践例 ~多様なデータをどう扱う?~