狠狠撸
Search
Submit Search
Apache NiFiと他プロダクトのつなぎ方
?
Download as PPTX, PDF
?
8 likes
?
5,683 views
Sotaro Kimura
2016/07/27 Apache NiFi 勉強会~データフローの自動化~ での発表資料です。
Read less
Read more
1 of 24
Download now
Downloaded 41 times
More Related Content
Apache NiFiと他プロダクトのつなぎ方
1.
Apache NiFiと 他プロダクトのつなぎ方 2016/07/27 Apache NiFi
勉強会 ?データフローの自動化? 木村宗太郎(@kimutansk) https://www.flickr.com/photos/neokratz/4913885458
2.
自己紹介 ? 木村 宗太郎(Sotaro
Kimura) ? ビッグデータ界隈に生息する何でも屋 ? バックエンドからフロントエンド、技術検証から運用、 ドキュメント書きまで色々 ? ストリーム処理基盤を調べているうちに NiFiにたどり着き、色々試しています。 ? Twitter他 : @kimutansk 1
3.
アジェンダ 1. NiFiと他プロダクトの連携手段 2. 外部データストアを使用する方法 3.
Input?Output Portを使用する方法 4. Flinkとの接続サンプル 2
4.
アジェンダ 1. NiFiと他プロダクトの連携手段 2. 外部データストアを使用する方法 3.
Input?Output Portを使用する方法 4. Flinkとの接続サンプル 3 Apache NiFi自体の説明は 前発表にあるため、省きます。
5.
4 1. NiFiと他プロダクトの連携手段 ? NiFiを他プロダクトと連携させるには、 大きく2つの方法がある。 1.
外部データストアを使用する方法 2. Input?Output Portを使用する方法
6.
5 2. 外部データストアを使用する方法 ? データストアを介して他プロダクトと連携 ?
NiFiはデータストアにデータを保存 ? 連携先プロダクトはデータストアから取得 センサー データ ログ アプリ 履歴 データ発生元 NiFi データストア 連携先プロダクト データストアに一度保存してそこから取得
7.
6 2. 外部データストアを使用する方法 ? NiFi、連携先共にコンポーネントが必要 ?
NiFi側の保持Processorは下記のように多彩 ? AMQP ? JMS ? Kafka ? MQTT ? Cassandra ? Couchbase ? Elasticsearch ? etc...
8.
7 2. 外部データストアを使用する方法 ? 利点 ?
並列化で容易にスケールが可能 ? データストアの耐障害性を利用可能 ? 欠点 ? 管理するプロセスが増大し、複雑化 ? NiFi、連携先双方で対応コンポーネントが必要
9.
8 3. Input?Output Portを使用する方法 ?
NiFiの持つInput?Output Portを介して 他プロダクトと連携 ? NiFiから連携先プロダクトが直接取得 センサー データ ログ アプリ 履歴 データ発生元 NiFi 連携先プロダクト
10.
9 3. Input?Output Portを使用する方法 ?
Input?Output Portとは? ? NiFiプロセス同士が通信するための機構 ? Input PortにPushしてNiFiにデータ投入 ? Output PortにPullしてNiFiからデータを取得 ? 通信路の暗号化も可能(オプション)
11.
10 3. Input?Output Portを使用する方法 ?
NiFiの画面上ではヘッダ部に存在 ? NiFiプロセスで複数のPortを管理利用可能 ここからドラッグして使用
12.
11 3. Input?Output Portを使用する方法 ?
NiFiプロセスで複数のPortを管理利用可能 接続先のNiFi情報 Input Port一覧 Output Port一覧
13.
12 3. Input?Output Portを使用する方法 ?
他プロダクトから用いるには? ? Site-To-Site Clientという 再利用可能なクライアントとしてNiFiから提供 ? ※Java製 ? https://github.com/apache/nifi/tree/master/nifi-commons/nifi-site-to-site-client ? これを用いることで任意のJavaプロセスが NiFiと直接通信する処理を容易に記述可能 ? 使用したExampleも色々ある ? Apache Flink ? Apache Apex ? etc...
14.
13 3. Input?Output Portを使用する方法 ?
下記のような構成で使用可能 ? 複数のNiFiプロセスから取得?投入可能(?) ? 取得側がクラスタの場合も対応可能だが、 ロードバランスの方式は考える必要あり? ? ※現状GitHub上のコードでは1Client:1Host接続の実装しかない??? Java Program Site-To-Site Client NiFi Process 1 Output Port NiFi Process 2 Output Port
15.
14 3. Input?Output Portを使用する方法 ?
利点 ? NiFiと直接やり取りが可能で構成がシンプル ? Site-to-Siteクライアントを用いることで 幅広いプロダクトで使用可能 ? 欠点 ? 並列化への対応が不完全(?) ? 耐障害性はNiFiの個々プロセスに依存 ? あくまでデータフローを構築するための機構で、 データを保持するための機構ではない。
16.
15 4. Flinkとの接続サンプル ? 実際に接続した例で何ができるかを見る。 ?
具体的にどういう構成になるのか? ? 下記のサンプルを基に説明 ? https://github.com/bbende/nifi-streaming-examples
17.
NiFi Process 1 NiFi
Process 1 16 4. Flinkとの接続サンプル ? サンプルを構築した際の構成 Core NiFi Input Port Flink StSClient StSClient Output Port Input Port Http Endpoint Edge NiFi StS Client Http Client ログ解析を行い、 結果を返信 ログを Edgeから集約 解析結果を取得 集約?転送を実施
18.
17 4. Flinkとの接続サンプル ? EdgeでのFlow定義 ログ読み込み Coreに送信 解析結果取得
19.
18 4. Flinkとの接続サンプル ? CoreでのFlow定義 Edgeの結果集約 Flinkの結果待受 Edgeからの待受
20.
19 4. Flinkとの接続サンプル ? Flinkアプリケーションの構成 NiFi Source NiFi Sink LogLevel FlatMap LogLevel Window Counter Dictionary Builder NiFiOutputPort からデータ取得 NiFiInputPort にデータ送信 ログメッセージから ログレベル抽出 ログレベルを Windowカウント 統計結果集計
21.
20 4. Flinkとの接続サンプル ? Flinkアプリケーションの構築コード //
NiFiDataPacket(NiFi提供)を実行単位とするNiFi用Source生成し、実行環境に設定 SourceFunction<NiFiDataPacket> nifiSource = new NiFiSource(sourceConfig); DataStream<NiFiDataPacket> streamSource = env.addSource(nifiSource); // ログレベル抽出Mapper生成 LogLevelFlatMap logLevelFlatMap = new LogLevelFlatMap(props.getLogLevelAttribute()); // ログレベルWindowCounter生成 LogLevelWindowCounter windowCounter = new LogLevelWindowCounter(); // 統計結果集計Builder生成 NiFiDataPacketBuilder<LogLevels> builder = new DictionaryBuilder(windowSize, rateThreshold); // アプリケーション構築 streamSource.flatMap(logLevelFlatMap) .timeWindowAll(Time.of(windowSize, TimeUnit.MILLISECONDS)) .apply(new LogLevelWindowCounter()).addSink(new NiFiSink<>(sinkConfig, builder)); // ストリーム処理アプリケーション起動 env.execute("WindowLogLevelCount");
22.
21 4. Flinkとの接続サンプル ? Flinkアプリケーションの構築コード //
NiFiDataPacket(NiFi提供)を実行単位とするNiFi用Source生成し、実行環境に設定 SourceFunction<NiFiDataPacket> nifiSource = new NiFiSource(sourceConfig); DataStream<NiFiDataPacket> streamSource = env.addSource(nifiSource); // ログレベル抽出Mapper生成 LogLevelFlatMap logLevelFlatMap = new LogLevelFlatMap(props.getLogLevelAttribute()); // ログレベルWindowCounter生成 LogLevelWindowCounter windowCounter = new LogLevelWindowCounter(); // 統計結果集計Builder生成 NiFiDataPacketBuilder<LogLevels> builder = new DictionaryBuilder(windowSize, rateThreshold); // アプリケーション構築 streamSource.flatMap(logLevelFlatMap) .timeWindowAll(Time.of(windowSize, TimeUnit.MILLISECONDS)) .apply(new LogLevelWindowCounter()).addSink(new NiFiSink<>(sinkConfig, builder)); // ストリーム処理アプリケーション起動 env.execute("WindowLogLevelCount"); Apexアプリケーションでも ほぼ同じコード量で同等の機能が実現可能。
23.
まとめ ? NiFiと他プロダクトと連携する手段は2つ ① 外部データストアを使用する方法 ②
Input?Output Portを使用する方法 ? 利点欠点は両方ある ? Input?Output Portを利用するための SiteToSiteClientがNiFiから提供 ? FlinkやApexのサンプルが存在 ? Flink、Apex共に、 SiteToSiteClientを使えば 数十行のコードでNiFiと接続し、 アプリケーション構築可能 22
24.
Enjoy Apache NiFi
! https://www.flickr.com/photos/99408200@N05/11646500835
Download