狠狠撸

狠狠撸Share a Scribd company logo
Apache NiFiと
他プロダクトのつなぎ方
2016/07/27
Apache NiFi 勉強会
?データフローの自動化?
木村宗太郎(@kimutansk)
https://www.flickr.com/photos/neokratz/4913885458
自己紹介
? 木村 宗太郎(Sotaro Kimura)
? ビッグデータ界隈に生息する何でも屋
? バックエンドからフロントエンド、技術検証から運用、
ドキュメント書きまで色々
? ストリーム処理基盤を調べているうちに
NiFiにたどり着き、色々試しています。
? Twitter他 : @kimutansk
1
アジェンダ
1. NiFiと他プロダクトの連携手段
2. 外部データストアを使用する方法
3. Input?Output Portを使用する方法
4. Flinkとの接続サンプル
2
アジェンダ
1. NiFiと他プロダクトの連携手段
2. 外部データストアを使用する方法
3. Input?Output Portを使用する方法
4. Flinkとの接続サンプル
3
Apache NiFi自体の説明は
前発表にあるため、省きます。
4
1. NiFiと他プロダクトの連携手段
? NiFiを他プロダクトと連携させるには、
大きく2つの方法がある。
1. 外部データストアを使用する方法
2. Input?Output Portを使用する方法
5
2. 外部データストアを使用する方法
? データストアを介して他プロダクトと連携
? NiFiはデータストアにデータを保存
? 連携先プロダクトはデータストアから取得
センサー
データ
ログ
アプリ
履歴
データ発生元 NiFi データストア 連携先プロダクト
データストアに一度保存してそこから取得
6
2. 外部データストアを使用する方法
? NiFi、連携先共にコンポーネントが必要
? NiFi側の保持Processorは下記のように多彩
? AMQP
? JMS
? Kafka
? MQTT
? Cassandra
? Couchbase
? Elasticsearch
? etc...
7
2. 外部データストアを使用する方法
? 利点
? 並列化で容易にスケールが可能
? データストアの耐障害性を利用可能
? 欠点
? 管理するプロセスが増大し、複雑化
? NiFi、連携先双方で対応コンポーネントが必要
8
3. Input?Output Portを使用する方法
? NiFiの持つInput?Output Portを介して
他プロダクトと連携
? NiFiから連携先プロダクトが直接取得
センサー
データ
ログ
アプリ
履歴
データ発生元 NiFi 連携先プロダクト
9
3. Input?Output Portを使用する方法
? Input?Output Portとは?
? NiFiプロセス同士が通信するための機構
? Input PortにPushしてNiFiにデータ投入
? Output PortにPullしてNiFiからデータを取得
? 通信路の暗号化も可能(オプション)
10
3. Input?Output Portを使用する方法
? NiFiの画面上ではヘッダ部に存在
? NiFiプロセスで複数のPortを管理利用可能
ここからドラッグして使用
11
3. Input?Output Portを使用する方法
? NiFiプロセスで複数のPortを管理利用可能
接続先のNiFi情報
Input Port一覧 Output Port一覧
12
3. Input?Output Portを使用する方法
? 他プロダクトから用いるには?
? Site-To-Site Clientという
再利用可能なクライアントとしてNiFiから提供
? ※Java製
? https://github.com/apache/nifi/tree/master/nifi-commons/nifi-site-to-site-client
? これを用いることで任意のJavaプロセスが
NiFiと直接通信する処理を容易に記述可能
? 使用したExampleも色々ある
? Apache Flink
? Apache Apex
? etc...
13
3. Input?Output Portを使用する方法
? 下記のような構成で使用可能
? 複数のNiFiプロセスから取得?投入可能(?)
? 取得側がクラスタの場合も対応可能だが、
ロードバランスの方式は考える必要あり?
? ※現状GitHub上のコードでは1Client:1Host接続の実装しかない???
Java Program
Site-To-Site Client
NiFi Process 1
Output Port
NiFi Process 2
Output Port
14
3. Input?Output Portを使用する方法
? 利点
? NiFiと直接やり取りが可能で構成がシンプル
? Site-to-Siteクライアントを用いることで
幅広いプロダクトで使用可能
? 欠点
? 並列化への対応が不完全(?)
? 耐障害性はNiFiの個々プロセスに依存
? あくまでデータフローを構築するための機構で、
データを保持するための機構ではない。
15
4. Flinkとの接続サンプル
? 実際に接続した例で何ができるかを見る。
? 具体的にどういう構成になるのか?
? 下記のサンプルを基に説明
? https://github.com/bbende/nifi-streaming-examples
NiFi Process 1
NiFi Process 1
16
4. Flinkとの接続サンプル
? サンプルを構築した際の構成
Core NiFi
Input Port
Flink
StSClient
StSClient
Output Port
Input Port
Http Endpoint
Edge NiFi
StS Client
Http Client
ログ解析を行い、
結果を返信
ログを
Edgeから集約
解析結果を取得 集約?転送を実施
17
4. Flinkとの接続サンプル
? EdgeでのFlow定義
ログ読み込み
Coreに送信
解析結果取得
18
4. Flinkとの接続サンプル
? CoreでのFlow定義
Edgeの結果集約
Flinkの結果待受
Edgeからの待受
19
4. Flinkとの接続サンプル
? Flinkアプリケーションの構成
NiFi
Source
NiFi
Sink
LogLevel
FlatMap
LogLevel
Window
Counter
Dictionary
Builder
NiFiOutputPort
からデータ取得
NiFiInputPort
にデータ送信
ログメッセージから
ログレベル抽出
ログレベルを
Windowカウント
統計結果集計
20
4. Flinkとの接続サンプル
? Flinkアプリケーションの構築コード
// NiFiDataPacket(NiFi提供)を実行単位とするNiFi用Source生成し、実行環境に設定
SourceFunction<NiFiDataPacket> nifiSource = new NiFiSource(sourceConfig);
DataStream<NiFiDataPacket> streamSource = env.addSource(nifiSource);
// ログレベル抽出Mapper生成
LogLevelFlatMap logLevelFlatMap = new LogLevelFlatMap(props.getLogLevelAttribute());
// ログレベルWindowCounter生成
LogLevelWindowCounter windowCounter = new LogLevelWindowCounter();
// 統計結果集計Builder生成
NiFiDataPacketBuilder<LogLevels> builder = new DictionaryBuilder(windowSize, rateThreshold);
// アプリケーション構築
streamSource.flatMap(logLevelFlatMap)
.timeWindowAll(Time.of(windowSize, TimeUnit.MILLISECONDS))
.apply(new LogLevelWindowCounter()).addSink(new NiFiSink<>(sinkConfig, builder));
// ストリーム処理アプリケーション起動
env.execute("WindowLogLevelCount");
21
4. Flinkとの接続サンプル
? Flinkアプリケーションの構築コード
// NiFiDataPacket(NiFi提供)を実行単位とするNiFi用Source生成し、実行環境に設定
SourceFunction<NiFiDataPacket> nifiSource = new NiFiSource(sourceConfig);
DataStream<NiFiDataPacket> streamSource = env.addSource(nifiSource);
// ログレベル抽出Mapper生成
LogLevelFlatMap logLevelFlatMap = new LogLevelFlatMap(props.getLogLevelAttribute());
// ログレベルWindowCounter生成
LogLevelWindowCounter windowCounter = new LogLevelWindowCounter();
// 統計結果集計Builder生成
NiFiDataPacketBuilder<LogLevels> builder = new DictionaryBuilder(windowSize, rateThreshold);
// アプリケーション構築
streamSource.flatMap(logLevelFlatMap)
.timeWindowAll(Time.of(windowSize, TimeUnit.MILLISECONDS))
.apply(new LogLevelWindowCounter()).addSink(new NiFiSink<>(sinkConfig, builder));
// ストリーム処理アプリケーション起動
env.execute("WindowLogLevelCount");
Apexアプリケーションでも
ほぼ同じコード量で同等の機能が実現可能。
まとめ
? NiFiと他プロダクトと連携する手段は2つ
① 外部データストアを使用する方法
② Input?Output Portを使用する方法
? 利点欠点は両方ある
? Input?Output Portを利用するための
SiteToSiteClientがNiFiから提供
? FlinkやApexのサンプルが存在
? Flink、Apex共に、 SiteToSiteClientを使えば
数十行のコードでNiFiと接続し、
アプリケーション構築可能
22
Enjoy Apache NiFi !
https://www.flickr.com/photos/99408200@N05/11646500835

More Related Content

Apache NiFiと 他プロダクトのつなぎ方