狠狠撸

狠狠撸Share a Scribd company logo
Amazon Kinesisの紹介 
と使いドコロ 
アマゾンデータサービスジャパン株式会社 
パートナーソリューションアーキテクト 
榎並 利利晃
?自?己紹介 
? 名前 
– 榎並 利利晃(えなみ としあき) 
– toshiake@amazon.co.jp 
– @ToshiakiEnami 
? 役割 
– パートナーソリューションアーキテクト 
– 主にエマージングパートナー様を担当 
? 好きなAWSのサービス 
– Amazon Kinesis 
– Amazon DynamoDB
AWSでの例例:迅速かつ正確なメータリング情報の集計 
S3 
Process 
Submissions 
Store 
Batches 
Process 
Hourly w/ 
Hadoop 
Clients 
Submitting 
Data 
Data 
Warehouse 
毎秒数千万レ 
コードの利利?用 
データ 
何?十万ものデー 
タソース 
毎時数テラバイ 
トのデータ 
毎?日 100以上の 
ETL Job起動 
毎時 100以上の 
クエリー 
新しい要求 
? リアルタイム, 早い意思決定 
? “keep everything” 
? 新しいデータソースの追加に追随するための拡張性 
? 複数の?目的に応じて同じデータを並?行行処理理
典型的なデータフローとIngestレイヤの重要性 
Client/ 
SensorIngestProcessingStorageAnalytics + 
Visualization 
+ Reporting
Ingest Layerの重要性 
 構造の異異なるデータソースに対する?高速処理理 
? 耐障害性とスケールに対する考慮 
? ?高い信頼性の維持 
? 順序性 
 ランダムにくるデータをまとめて、シーケンスストリームの形に変換 
? シーケンスデータによる容易易な処理理の実現 
? 容易易なスケール 
? 永続化データ 
Processing 
Or 
Kinesis 
Kafka 
Processing 
Kinesis
导?入事例例
碍颈苍别蝉颈蝉概要
Amazon Kinesisとは? 
? ハイボリュームな連続したデータをリアルタイム 
で処理理可能なフルマネージドサービス 
? Kinesisは、数?十万のデータソースからの1時間辺 
り数テラバイトのデータを処理理することができ、 
かつ、格納されたデータは、複数のAZに格納する 
信頼性と耐久性をもつサービス
ユースケース 
サービスやシステム 
のリアルタイム状況 
把握 
? キャンペーンの状 
況把握 
? ゲーム内イベント 
の状況把握 
? POSデータからの 
売上状況把握 
異異常検知 
? センサーの異異常検 
知 
? 不不正アクセス検知 
サービス向上 
? ソーシャルデータ 
を?用いたリアルタ 
イムサービス 
? 直近の?行行動に基に 
したリコメンデー 
ション
碍颈苍别蝉颈蝉概要 
Kinesis Client Library 
+ 
Connector Library 
HTTPS Post 
AWS SDK 
Fluentd 
Flume 
LOG4J 
Get* APIs 
Apache 
Storm 
Amazon Elastic 
MapReduce 
データ?入?力力側データ取得と処理理 
MobileSDK 
 Cognito
Kinesis構成内容 
Data 
Sources 
App.1 
 
[Aggregate  
De-‐??Duplicate] 
App.4 
 
[Machine 
Learning] 
Data 
Sources 
Data 
Sources 
Data 
Sources 
App.2 
 
[Metric 
Extraction] 
S3 
DynamoDB 
Redshift 
App.3 
 
[Real-‐??time 
Dashboard] 
Data 
Sources 
Stream 
Availability 
Zone 
Availability 
Zone 
Shard 1 
Shard 2 
Shard N 
Availability 
Zone 
Kinesis 
 
AWS Endpoint 
 
? ?用途単位でStreamを作成し、Streamは、1つ以上のShardで構成される 
? Shardは、データ?入?力力側 1MB/sec, 1000 TPS、データ処理理側 2 MB/sec, 5TPSのキャパシティを持つ  
? ?入?力力するデータをData Recordと呼び、?入?力力されたData Recordは、24 時間かつ複数のAZに保管される 
? Shardの増加減によってスケールの制御が可能
Kinesisコスト 
従量量課?金金  初期費?用不不要 
課?金金項?目単価 
シャード利利?用料料$0.0195/shard/時間 
Putトランザクション$0.043/100万Put 
? シャード1つで、?一ヶ?月約$14 
? Getトランザクションは無料料 
? インバウンドのデータ転送料料は無料料 
? アプリケーションが?走るEC2は通常の料料?金金がかかります
データ?入?力力
データ?入?力力?方法 
? PutRecord API でデータ?入?力力が可能 
– http://docs.aws.amazon.com/kinesis/latest/APIReference/API_?PutRecord.html 
? AWS SDK for Java, Javascript, Python, Ruby, PHP, .Net が利利?用可能 
例例)botoを利利?用してput_?recordした例例  
http://docs.pythonboto.org/en/latest/ref/kinesis.html#module-boto.kinesis.layer1
データ?入?力力及び分配イメージ 
? DataRecordに設定されたパーティションキーを基に 
Shardに分配 
? Shardは担当するレンジを持ち、パーティションキーを 
MD5でハッシュ化した値によって該当のShardに分配さ 
れる0 
2128 
Shard-‐??1 
MD5(パーティションキー) 
Shard-‐??0 
データパーティション 
キー 
値によりどちら 
かに分配 
0 
2127 
シャーディングは 
パーティションキー 
の設計が肝!
シーケンス番号 
? KinesisがStream内でユニークなシーケンス番号を付与 
? データもシーケンス番号も不不変 
? シーケンス番号でデータが何回でも取得できる(24時間以内) 
? 何度度取得してもシーケンス番号の順番はかわらない 
shard 
SeqNo 
(14) 
SeqNo 
(17) 
SeqNo 
(25) 
SeqNo 
(26) 
SeqNo 
(32)
データ?入?力力パターン分類 
パターンユースケース 
既存ログ収集パターン既存あるWebサーバやアプリサーバのログを?入?力力するパターン 
 
センサーログ収集パターンセンサーが収集したデータを?入?力力するパターン 
 
モバイルアプリデータ収集パ 
ターン 
モバイルアプリが?生み出すデータ(ログ、メッセージなど)を?入 
?力力するパターン
既存ログ収集パターン 
? Fluentd Plugin利利?用パターン 
? Webサーバ、アプリケーションサー 
バなどにあるログデータの?入?力力に最 
適 
? GithubからPluginを取得することが 
可能 
https://github.com/awslabs/ 
aws-‐??fluent-‐??plugin-‐??kinesis 
? Log4J利利?用パターン 
? JavaアプリケーションでLog4Jを利利?用 
している場合導?入が容易易 
? 開発者ガイド 
http://docs.aws.amazon.com/ 
ElasticMapReduce/latest/DeveloperGuide/ 
kinesis-‐??pig-‐??publisher.html 
Web 
log4j.properties サンプル 
# KINESIS appender 
log4j.logger.KinesisLogger=INFO, KINESIS 
log4j.additivity.KinesisLogger=false 
log4j.appender.KINESIS=com.amazonaws.services.kin 
esis.log4j.KinesisAppender 
log4j.appender.KINESIS.layout=org.apache.log4j.Patte 
rnLayout 
log4j.appender.KINESIS.layout.ConversionPattern= 
%m
センサーログ収集パターン 
? センサーデバイスなどライトウェイトなプロトコル(MQTT)を利利?用するパターン 
? MQTT BrokerとMQTT-‐??Kinesis Bridgeを?用いてメッセージをKinesisに?入?力力するこ 
MQTT 
Broker 
Kinesis-?‐MQTT 
Bridge 
とが可能 
? GithubからMQTT-‐??Kinesis Bridgeサンプルソースが取得可能 
https://github.com/awslabs/mqtt-‐??kinesis-‐??bridge 
MQTT 
Broker 
Kinesis-?‐MQTT 
Bridge 
センサー 
センサー 
センサー 
Auto scaling Group
モバイルアプリデータ収集パターン 
? モバイルアプリから直接?入?力力パターン 
? CognitoとMobileSDKを?用いて容易易にKinesisにデータ?入?力力が可能 
? 認証または、?非認証でKinesisへのアクセストークンをテンポラリに取得しデータ?入 
?力力が可能 
Login OAUTH/OpenID 
Access Token 
End Users 
App w/SDK 
認証が必要な 
Access Token 
Pool ID 
Role ARNs 
Cognito ID, 
Temp 
Credentials 
Put Recode 
場合 
Amazon Cognito -‐?? IDブローカー 
AWS identities 
Account 
Identitypool 
Identity 
Providers 
Access 
authenticated 
identitypool Policy 
Unauthenticated 
Identities
データの取得と処理理
データ取得?方法 
? GetShardIterator APIでShard内のポジションを取得し、GetRecords 
APIでデータ?入?力力が可能 
– http://docs.aws.amazon.com/kinesis/latest/APIReference/API_?GetShardIterator.html 
– http://docs.aws.amazon.com/kinesis/latest/APIReference/API_?GetRecords.html 
? AWS SDK for Java, Javascript, Python, Ruby, PHP, .Net が利利?用可 
能 
例例)botoを利利?用してget_?shard_?iterator, get_?recordsした例例  
http://docs.pythonboto.org/en/latest/ref/kinesis.html#module-boto.kinesis.layer1
GetShardIteratorでのデータ取得指定?方法 
? GetShardIterator APIでは、ShardIteratorTypeを指定してポジションを取得 
する。 
? ShardIteratorTypeは以下の通り 
– AT_?SEQUENCE_?NUMBER ( 指定のシーケンス番号からデータ取得 ) 
– AFTER_?SEQUENCE_?NUMBER ( 指定のシーケンス番号以降降からデータ取得 ) 
– TRIM_?HORIZON ( Shardにある最も古いデータからデータ取得 )  
– LATEST ( 最新のデータからデータ取得 ) 
Seq: xxx 
LATEST 
AT_?SEQUENCE_?NUMBER 
AFTER_?SEQUENCE_?NUMBER 
TRIM_?HORIZON 
GetShardIteratorの動作イメージ
Kinesis Client Library (KCL) 
Client library for fault-‐??tolerant, at least-‐??once, Continuous 
Processing  
? Shardと同じ数のWorker 
? Workerを均等にロードバランシング 
Shard 1 
? 障害感知と新しいWorkerの?立立ち上げ 
Shard 2 
? シャードの数に応じてworkerが動作する 
Shard 3 
? AutoScalingでエラスティック 
Shard 4 
? チェックポインティングとAt least once処理理 
Shard n 
EC2 Instance 
KCL Worker 1 
KCL Worker 2 
EC2 Instance 
KCL Worker 3 
KCL Worker 4 
EC2 Instance 
KCL Worker n 
Kinesis 
これらの煩雑な処理理を意識識することなく 
ビジネスロジックに集中することができる。
Kinesis Client Libraryの動き 
Stream 
Shard-‐??0 
Shard-‐??1 
Kinesis 
アプリケーション 
(KCL) 
ワーカーシーケンス番号 
Instance A12345 
Instance A98765 
Data 
Record 
(12345) 
Data 
Record 
(24680) 
Data 
Record 
(98765) 
DynamoDB 
Instance A 
(*)実際のKey, Attribute名は異異なります。 
1. Kinesis Client LibraryがShardからData Recordを取得 
2. 設定された間隔でシーケンス番号をそのワーカーのIDをキーにした 
DynamoDBのテーブルに格納 
3. 1つのアプリが複数Shardからデータを取得し処理理を実?行行
Kinesis Client Libraryの動き 
Stream 
Shard-‐??0 
Shard-‐??1 
Kinesis 
アプリケーション 
(KCL)ワーカーシーケンス番号 
Instance A12345 
Instance B98765 
Data 
Record 
(12345) 
Data 
Record 
(24680) 
Data 
Record 
(98765) 
DynamoDB 
Instance A 
Instance B 
Kinesis 
アプリケーション 
(KCL) 
1. 複数アプリを実?行行した場合は、負荷分散される 
(*)実際のKey, Attribute名は異異なります。
Kinesis Client Libraryの動き 
Stream 
Shard-‐??0 
Shard-‐??1 
Kinesis 
アプリケーション 
(KCL)ワーカーシーケンス番号 
Instance A 
↓ 
Instance B 
12345 
Instance B98765 
Data 
Record 
(12345) 
Data 
Record 
(24680) 
Data 
Record 
(98765) 
DynamoDB 
Instance A 
Instance B 
Kinesis 
アプリケーション 
(KCL) 
(*)実際のKey, Attribute名は異異なります。 
Instance Aがデータ取得されない状況を検知し、Instance Bが、DynamoDBに 
格納されているシーケンス番号からデータ取得を?行行う
Kinesis Client Libraryの動き–拡張性 
Stream 
Shard-‐??0 
Kinesis 
アプリケーション 
(KCL) 
Shardワーカーシーケンス 
番号 
Shard-‐??0Instance A12345 
Shard-‐??1Instance A98765 
Data 
Record 
(12345) 
Data 
Record 
(24680) 
DynamoDB 
Instance A 
Shard-‐??1 
Data 
Record 
(98765) 
New 
(*)実際のKey, Attribute名は異異なります。 
Shard-‐??1を増やしたことを検知し、データ取得を開始し、Shard-‐??1のチェックポ 
イント情報をDynamoDBに追加
?目的に応じてKinesisアプリケーションを追加可能 
ストリーム 
データ 
レコード 
(12345) 
データ 
レコード 
(24680) 
シャード 
データ 
レコード 
(98765) 
シャード 
アーカイブアプリ 
(KCL) 
DynamoDB 
Instance A 
Archive Table 
Shardワーカーシーケンス 
番号 
Shard-‐??0Instance A12345 
Shard-‐??1Instance A98765 
各アプリ毎に別テーブルで管理理される 
Instance A 
集計アプリ 
(KCL) 
Calc Table 
Shardワーカーシーケンス 
番号 
Shard-‐??0Instance A24680 
Shard-‐??1Instance A98765
Kinesis Client Library (KCL) for Pythonについて 
? KCL for Pythonは、KCL for Javaの“MultiLangDaemon”を常駐プロセ 
スとして利利?用し、データ処理理のメインロジックをPythonで記述できるラ 
イブラリ 
? データ処理理は、サブプロセスとして起動される 
? “MultiLangDaemon”とサブプロセス間のデータ通信は、定義されたプロ 
トコルでSTDIN/STDOUTを使って?行行われる
Kinesis Client Library (KCL) for Pythonについて 
? KCL for Pythonは、KCL for Javaの“MultiLangDaemon”を常駐プロセ 
スとして利利?用し、データ処理理のメインロジックをPythonで記述できるラ 
イブラリ 
? データ処理理は、サブプロセスとして起動される 
? “MultiLangDaemon”とサブプロセス間のデータ通信は、定義されたプロ 
トコルでSTDIN/STDOUTを使って?行行われる 
KCL(Java) 
Worker Thread Python Logic 
Shard-‐??0 
Shard-‐??1Worker Thread 
Process 
Python Logic 
Process
KCL for Python実装 
#!env python 
from amazon_kclpy import kcl 
import json, base64 
class RecordProcessor(kcl.RecordProcessorBase): 
def initialize(self, shard_id): 
pass 
def process_records(self, records, checkpointer): 
pass 
def shutdown(self, checkpointer, reason): 
pass 
if __name__ == __main__: 
kclprocess = kcl.KCLProcess(RecordProcessor()) 
kclprocess.run()
KCL for Python実装 
KCL for Python 
https://github.com/awslabs/amazon-kinesis-client-python/blob/ 
master/amazon_kclpy/kcl.py 
KCL for Java 
https://github.com/awslabs/amazon-kinesis-client/tree/master/src/ 
main/java/com/amazonaws/services/kinesis/multilang
Multi Language Protocol 
Action Parameter 
Initialize shardId : string 
processRecords [{ data : ”base64encoded_string, 
partitionKey : ”partition key, 
sequenceNumber : ”sequence number; 
}] // a list of records 
checkpoint checkpoint : ”sequence number, 
error : NameOfException 
shutdown reason : “TERMINATE|ZOMBIE
KCL for Pythonのプロパティで設定出来る項?目 
項?目内容 
failoverTimeMillisWorkerが処理理を継続できなくなり別なWorkerにフェイルオーバする時間(ミリ秒) 
短い値を設定するとDynamoDBのPIOPS?高くなるため注意が必要 
maxRecords1回のデータ取得で取得するレコード件数 
idleTimeBetweenReadsInMillisレコード取得間隔(ミリ秒) 
callProcessRecordsEvenForEmptyRecord 
List 
レコードデータが空でもレコード取得処理理を継続するかの判断(True or Fault) 
parentShardPollIntervalMillis親のShardをチェックするインターバル 
短い値を設定するとDynamoDBのPIOPS?高くなるため注意が必要 
cleanupLeasesUponShardCompletionshradが終了了した後に継続して処理理を続けるかクリーンアップするかを指定 
taskBackoffTimeMillisKCLのバックオフタイムの設定 
metricsBufferTimeMillisCloudWatchのAPIコールする前の時間 
metricsMaxQueueSizeCloudWatchのAPIコールする最?大のキューサイズ 
validateSequenceNumberBeforeCheckpo 
inting 
Checkpointingする前にシーケンス番号をチェックするかを指定 
maxActiveThreadsMultiLangDaemonの最?大スレッド数
KCL for Python実?行行?方法 
[ec2-user@ip-172-31-17-43 samples]$ amazon_kclpy_helper.py --print_command -j /usr/bin/java -p /home/ec2- 
user/amazon-kinesis-client-python/samples/sample.properties 
/usr/bin/java -cp /usr/lib/python2.6/site-packages/amazon_kclpy-1.0.0-py2.6.egg/amazon_kclpy/jars/amazon-kinesis- 
client-1.2.0.jar:/usr/lib/python2.6/site-packages/amazon_kclpy-1.0.0-py2.6.egg/amazon_kclpy/jars/ 
jackson-annotations-2.1.1.jar:/usr/lib/python2.6/site-packages/amazon_kclpy-1.0.0-py2.6.egg/amazon_kclpy/ 
jars/commons-codec-1.3.jar:/usr/lib/python2.6/site-packages/amazon_kclpy-1.0.0-py2.6.egg/amazon_kclpy/ 
jars/commons-logging-1.1.1.jar:/usr/lib/python2.6/site-packages/amazon_kclpy-1.0.0-py2.6.egg/amazon_kclpy/ 
jars/joda-time-2.4.jar:/usr/lib/python2.6/site-packages/amazon_kclpy-1.0.0-py2.6.egg/amazon_kclpy/jars/ 
jackson-databind-2.1.1.jar:/usr/lib/python2.6/site-packages/amazon_kclpy-1.0.0-py2.6.egg/amazon_kclpy/jars/ 
jackson-core-2.1.1.jar:/usr/lib/python2.6/site-packages/amazon_kclpy-1.0.0-py2.6.egg/amazon_kclpy/jars/aws-java- 
sdk-1.7.13.jar:/usr/lib/python2.6/site-packages/amazon_kclpy-1.0.0-py2.6.egg/amazon_kclpy/jars/ 
httpclient-4.2.jar:/usr/lib/python2.6/site-packages/amazon_kclpy-1.0.0-py2.6.egg/amazon_kclpy/jars/ 
httpcore-4.2.jar:/home/ec2-user/amazon-kinesis-client-python/samples 
com.amazonaws.services.kinesis.multilang.MultiLangDaemon sample.properties 
出?力力結果をコピーして、シェルの?入?力力 
としてペーストし、実?行行すると、KCL 
が実?行行されます
【前提】データ処理理デザインパターン 
? データ処理理を?行行うアプリケーション側でリカバリーやロードバランシン 
グを考慮した設計が必要 
? Kinesisの特徴であるシリアル番号を利利?用しチェックポイントを打つこと 
が重要 
? 1つのデータを複数のアプリケーションで利利?用できるためアプリケー 
ション毎に追加?削除できる設計 
? 本番データを?用いて開発中のロジックの評価や複数のロジックを同じ 
データを?用いて評価することが可能 
データ 
ソース 
ロジック 
A 
ロジック 
SeqNoB 
(14) 
SeqNo 
(17) 
SeqNo 
(25) 
SeqNo 
(26) 
SeqNo 
(32)
データ処理理パターン分類 
分類ユースケース 
基本構成パターンKinesisアプリケーションを構成する基本的なパターン 
Simple ETL処理理パターンKinesisでIngestされたデータをS3、DynamoDB、Redshiftにアーカイブするパ 
ターン 
-‐?? フィルタリング+データアーカイブ 
ETL/MapReduce処理理パターンKinesisでIngestされたデータをHadoop、Spark、Storm 
を?用いてデータ処理理するパターン 
-‐?? データクレンジング 
-‐?? ETL+簡易易集計 
Filterパターン複数のKinesisをパイプラインのようにつなぎあわせてFiltering/MapReduceを?行行 
うパターン 
-‐?? フィルタリング 
-‐?? ロジックルーティング 
機械学習インテグレーションパターンオンラインとオフラインを組み合わせた機械学習インテグレーションパターン 
??異異 
常検知、リコメンデーション 
AWS Lambda インテンションパターンAWS Lambdaをデータ処理理に利利?用するパターン 
マネージドサービスを使うことで簡単にデータ処理理システムを構築できる
基本構成パターン 
? ?目的毎にアプリケーションを構成するパターン 
? KCLを?用いることで可?用性の?高いアプリの導?入が可能 
? それぞれのアプリの可?用性?信頼性に合わせた設計 
? 本番システムに影響されず開発検証のためにデータを利利?用することも可能 
センサー 
センサー 
センサー 
アプリ1Dashboard 
アプリ2 
DynamoDB 
Redshift 
例例:リアルタイムダッシュボード
Simple ETL処理理パターン 
? DynamoDB、Redshift、S3などとのインテグレーションを容易易にするKinesis 
Connector Libraryを利利?用可能 
https://github.com/awslabs/amazon-kinesis-connectors 
センサー 
センサー 
センサー 
S3 
アーカイブ 
Redshift 
データロード 
S3 
Redshift 
Kinesis Connectorのデータフロー 
TransformerFilterBufferEmitter
ETL/MapReduce処理理パターン(1) 
? HadoopやSparkを?用いたパターン 
? Kinesisに集積されたデータをHive、PigなどのHadoopツールを?用いてETL処理理(Map Reduce 
処理理)が可能 
? 別のKinesis Stream, S3, DynamoDB, HDFSのHive Tableなどの他のデータソースのテーブ 
ルとJOINすることなども可能 
? Data pipeline / Crontabで定期実?行行することにより、定期的にKinesisからデータを取り込み、 
処理理することが可能 
Data Pipeline 
EMR ClusterS3 
構成例例  
DataPipelineで定期的にHiveを実 
?行行しKinesisにあるデータを処理理。 
結果をS3に格納 
Kinesis 
EMR AMI 3.0.4以上を?用いることでKinesisインテグレーションが可能
ETL/MapReduce処理理パターン(2) 
? Apache Stormを利利?用するパターン 
? Boltをつなげることで?高度度なデータ処理理をリアルタイムで分散処理理が可能 
? KinesisからApache Stormへのインテグレーションを容易易にするためのSpoutを提 
供 
https://github.com/awslabs/kinesis-‐??storm-‐??spout 
Data 
Sources 
Data 
Sources 
Data 
Sources 
Storm 
Spout 
Storm 
Bolt 
Storm 
Bolt 
Storm 
Bolt
Filterパターン 
? Kinesisをパイプラインとして連結するパターン 
? FilterやMapReduceを多段Kinesisを?用いて実現 
? 最初のKinesisは、ピークトラフィックに対応しやすくするためにランダムな値を 
パーティションキーとしてセットし、平準化し、次のストリームを?生成し、伝送す 
る 
Data 
Sources 
Data 
Sources 
Data 
Sources 
Filter Layer (例例)Process Layer (例例) 
Kinesis 
App 
Kinesis 
App 
Kinesis 
App 
Kinesis 
App
機械学習インテグレーションパターン 
? ストリーミングデータの分類、異異常検知などを機械学習を?用いて実?行行 
? 機械学習への教師データの反映を定期的に実?行行 
? 機械学習器は、Apache Spark、Apache Stormなどの上で機械学習アルゴ 
リズムを動作させインテグレーションすることも可能 
Data 
Sources 
Data 
Sources 
Data 
Sources 
機械学習 
Jubatus 
アーカイブ 
Dashboar 
d 
教師データ分析 
ダッシュボード 
例例:オンライン機械学習 
(Jubatus)の例例
AWS Lambdaインテグレーションパターン 
? Lambda Functionをデプロイするだけでインフラの構成などを気にせず 
データ処理理実装が可能 
例例:AWS Lambda 
インテンション例例 
Data 
Sources 
Data 
Sources 
Data 
Sources 
S3 
Redshift
デモ 
? 異異常検知とアーカイブを?目的としたKinesisアプリをEC2上で実?行行 
? ダッシュボードで検知状態を確認 
リアルタイム 
分析を実現 
ゲートウェイ 
サーバ 
Jubatus 
異異常検知 
サーバ 
ダッシュボード 
サーバ 
センサデバイス 
(iPhone) 
監視?用 
ダッシュボード 
?生データ?用 
ストリーム 
異異常値スコア?用 
ストリーム 
HTTP/WS 
Put Record 
HTTP/WS 
Get Records 
加速度度センサ 
情報を送信
まとめ
クラウドのメリットをフルに活かす 
? IoT、オープンデータなど、より多くのデータを収 
集?活?用することで、ビジネス価値がうまれる 
? AWSを活?用することで、データを効率率率よく収集し、 
分析することが実現できる 
? 特に、1つのデータ?ソースをアジリティ?高く試すこ 
とができるアーキテクチャが重要

More Related Content

Pydata Amazon Kinesisのご紹介

  • 1. Amazon Kinesisの紹介 と使いドコロ アマゾンデータサービスジャパン株式会社 パートナーソリューションアーキテクト 榎並 利利晃
  • 2. ?自?己紹介 ? 名前 – 榎並 利利晃(えなみ としあき) – toshiake@amazon.co.jp – @ToshiakiEnami ? 役割 – パートナーソリューションアーキテクト – 主にエマージングパートナー様を担当 ? 好きなAWSのサービス – Amazon Kinesis – Amazon DynamoDB
  • 3. AWSでの例例:迅速かつ正確なメータリング情報の集計 S3 Process Submissions Store Batches Process Hourly w/ Hadoop Clients Submitting Data Data Warehouse 毎秒数千万レ コードの利利?用 データ 何?十万ものデー タソース 毎時数テラバイ トのデータ 毎?日 100以上の ETL Job起動 毎時 100以上の クエリー 新しい要求 ? リアルタイム, 早い意思決定 ? “keep everything” ? 新しいデータソースの追加に追随するための拡張性 ? 複数の?目的に応じて同じデータを並?行行処理理
  • 5. Ingest Layerの重要性 構造の異異なるデータソースに対する?高速処理理 ? 耐障害性とスケールに対する考慮 ? ?高い信頼性の維持 ? 順序性 ランダムにくるデータをまとめて、シーケンスストリームの形に変換 ? シーケンスデータによる容易易な処理理の実現 ? 容易易なスケール ? 永続化データ Processing Or Kinesis Kafka Processing Kinesis
  • 8. Amazon Kinesisとは? ? ハイボリュームな連続したデータをリアルタイム で処理理可能なフルマネージドサービス ? Kinesisは、数?十万のデータソースからの1時間辺 り数テラバイトのデータを処理理することができ、 かつ、格納されたデータは、複数のAZに格納する 信頼性と耐久性をもつサービス
  • 9. ユースケース サービスやシステム のリアルタイム状況 把握 ? キャンペーンの状 況把握 ? ゲーム内イベント の状況把握 ? POSデータからの 売上状況把握 異異常検知 ? センサーの異異常検 知 ? 不不正アクセス検知 サービス向上 ? ソーシャルデータ を?用いたリアルタ イムサービス ? 直近の?行行動に基に したリコメンデー ション
  • 10. 碍颈苍别蝉颈蝉概要 Kinesis Client Library + Connector Library HTTPS Post AWS SDK Fluentd Flume LOG4J Get* APIs Apache Storm Amazon Elastic MapReduce データ?入?力力側データ取得と処理理 MobileSDK Cognito
  • 11. Kinesis構成内容 Data Sources App.1 [Aggregate De-‐??Duplicate] App.4 [Machine Learning] Data Sources Data Sources Data Sources App.2 [Metric Extraction] S3 DynamoDB Redshift App.3 [Real-‐??time Dashboard] Data Sources Stream Availability Zone Availability Zone Shard 1 Shard 2 Shard N Availability Zone Kinesis AWS Endpoint ? ?用途単位でStreamを作成し、Streamは、1つ以上のShardで構成される ? Shardは、データ?入?力力側 1MB/sec, 1000 TPS、データ処理理側 2 MB/sec, 5TPSのキャパシティを持つ ? ?入?力力するデータをData Recordと呼び、?入?力力されたData Recordは、24 時間かつ複数のAZに保管される ? Shardの増加減によってスケールの制御が可能
  • 12. Kinesisコスト 従量量課?金金 初期費?用不不要 課?金金項?目単価 シャード利利?用料料$0.0195/shard/時間 Putトランザクション$0.043/100万Put ? シャード1つで、?一ヶ?月約$14 ? Getトランザクションは無料料 ? インバウンドのデータ転送料料は無料料 ? アプリケーションが?走るEC2は通常の料料?金金がかかります
  • 14. データ?入?力力?方法 ? PutRecord API でデータ?入?力力が可能 – http://docs.aws.amazon.com/kinesis/latest/APIReference/API_?PutRecord.html ? AWS SDK for Java, Javascript, Python, Ruby, PHP, .Net が利利?用可能 例例)botoを利利?用してput_?recordした例例 http://docs.pythonboto.org/en/latest/ref/kinesis.html#module-boto.kinesis.layer1
  • 15. データ?入?力力及び分配イメージ ? DataRecordに設定されたパーティションキーを基に Shardに分配 ? Shardは担当するレンジを持ち、パーティションキーを MD5でハッシュ化した値によって該当のShardに分配さ れる0 2128 Shard-‐??1 MD5(パーティションキー) Shard-‐??0 データパーティション キー 値によりどちら かに分配 0 2127 シャーディングは パーティションキー の設計が肝!
  • 16. シーケンス番号 ? KinesisがStream内でユニークなシーケンス番号を付与 ? データもシーケンス番号も不不変 ? シーケンス番号でデータが何回でも取得できる(24時間以内) ? 何度度取得してもシーケンス番号の順番はかわらない shard SeqNo (14) SeqNo (17) SeqNo (25) SeqNo (26) SeqNo (32)
  • 17. データ?入?力力パターン分類 パターンユースケース 既存ログ収集パターン既存あるWebサーバやアプリサーバのログを?入?力力するパターン センサーログ収集パターンセンサーが収集したデータを?入?力力するパターン モバイルアプリデータ収集パ ターン モバイルアプリが?生み出すデータ(ログ、メッセージなど)を?入 ?力力するパターン
  • 18. 既存ログ収集パターン ? Fluentd Plugin利利?用パターン ? Webサーバ、アプリケーションサー バなどにあるログデータの?入?力力に最 適 ? GithubからPluginを取得することが 可能 https://github.com/awslabs/ aws-‐??fluent-‐??plugin-‐??kinesis ? Log4J利利?用パターン ? JavaアプリケーションでLog4Jを利利?用 している場合導?入が容易易 ? 開発者ガイド http://docs.aws.amazon.com/ ElasticMapReduce/latest/DeveloperGuide/ kinesis-‐??pig-‐??publisher.html Web log4j.properties サンプル # KINESIS appender log4j.logger.KinesisLogger=INFO, KINESIS log4j.additivity.KinesisLogger=false log4j.appender.KINESIS=com.amazonaws.services.kin esis.log4j.KinesisAppender log4j.appender.KINESIS.layout=org.apache.log4j.Patte rnLayout log4j.appender.KINESIS.layout.ConversionPattern= %m
  • 19. センサーログ収集パターン ? センサーデバイスなどライトウェイトなプロトコル(MQTT)を利利?用するパターン ? MQTT BrokerとMQTT-‐??Kinesis Bridgeを?用いてメッセージをKinesisに?入?力力するこ MQTT Broker Kinesis-?‐MQTT Bridge とが可能 ? GithubからMQTT-‐??Kinesis Bridgeサンプルソースが取得可能 https://github.com/awslabs/mqtt-‐??kinesis-‐??bridge MQTT Broker Kinesis-?‐MQTT Bridge センサー センサー センサー Auto scaling Group
  • 20. モバイルアプリデータ収集パターン ? モバイルアプリから直接?入?力力パターン ? CognitoとMobileSDKを?用いて容易易にKinesisにデータ?入?力力が可能 ? 認証または、?非認証でKinesisへのアクセストークンをテンポラリに取得しデータ?入 ?力力が可能 Login OAUTH/OpenID Access Token End Users App w/SDK 認証が必要な Access Token Pool ID Role ARNs Cognito ID, Temp Credentials Put Recode 場合 Amazon Cognito -‐?? IDブローカー AWS identities Account Identitypool Identity Providers Access authenticated identitypool Policy Unauthenticated Identities
  • 22. データ取得?方法 ? GetShardIterator APIでShard内のポジションを取得し、GetRecords APIでデータ?入?力力が可能 – http://docs.aws.amazon.com/kinesis/latest/APIReference/API_?GetShardIterator.html – http://docs.aws.amazon.com/kinesis/latest/APIReference/API_?GetRecords.html ? AWS SDK for Java, Javascript, Python, Ruby, PHP, .Net が利利?用可 能 例例)botoを利利?用してget_?shard_?iterator, get_?recordsした例例 http://docs.pythonboto.org/en/latest/ref/kinesis.html#module-boto.kinesis.layer1
  • 23. GetShardIteratorでのデータ取得指定?方法 ? GetShardIterator APIでは、ShardIteratorTypeを指定してポジションを取得 する。 ? ShardIteratorTypeは以下の通り – AT_?SEQUENCE_?NUMBER ( 指定のシーケンス番号からデータ取得 ) – AFTER_?SEQUENCE_?NUMBER ( 指定のシーケンス番号以降降からデータ取得 ) – TRIM_?HORIZON ( Shardにある最も古いデータからデータ取得 ) – LATEST ( 最新のデータからデータ取得 ) Seq: xxx LATEST AT_?SEQUENCE_?NUMBER AFTER_?SEQUENCE_?NUMBER TRIM_?HORIZON GetShardIteratorの動作イメージ
  • 24. Kinesis Client Library (KCL) Client library for fault-‐??tolerant, at least-‐??once, Continuous Processing ? Shardと同じ数のWorker ? Workerを均等にロードバランシング Shard 1 ? 障害感知と新しいWorkerの?立立ち上げ Shard 2 ? シャードの数に応じてworkerが動作する Shard 3 ? AutoScalingでエラスティック Shard 4 ? チェックポインティングとAt least once処理理 Shard n EC2 Instance KCL Worker 1 KCL Worker 2 EC2 Instance KCL Worker 3 KCL Worker 4 EC2 Instance KCL Worker n Kinesis これらの煩雑な処理理を意識識することなく ビジネスロジックに集中することができる。
  • 25. Kinesis Client Libraryの動き Stream Shard-‐??0 Shard-‐??1 Kinesis アプリケーション (KCL) ワーカーシーケンス番号 Instance A12345 Instance A98765 Data Record (12345) Data Record (24680) Data Record (98765) DynamoDB Instance A (*)実際のKey, Attribute名は異異なります。 1. Kinesis Client LibraryがShardからData Recordを取得 2. 設定された間隔でシーケンス番号をそのワーカーのIDをキーにした DynamoDBのテーブルに格納 3. 1つのアプリが複数Shardからデータを取得し処理理を実?行行
  • 26. Kinesis Client Libraryの動き Stream Shard-‐??0 Shard-‐??1 Kinesis アプリケーション (KCL)ワーカーシーケンス番号 Instance A12345 Instance B98765 Data Record (12345) Data Record (24680) Data Record (98765) DynamoDB Instance A Instance B Kinesis アプリケーション (KCL) 1. 複数アプリを実?行行した場合は、負荷分散される (*)実際のKey, Attribute名は異異なります。
  • 27. Kinesis Client Libraryの動き Stream Shard-‐??0 Shard-‐??1 Kinesis アプリケーション (KCL)ワーカーシーケンス番号 Instance A ↓ Instance B 12345 Instance B98765 Data Record (12345) Data Record (24680) Data Record (98765) DynamoDB Instance A Instance B Kinesis アプリケーション (KCL) (*)実際のKey, Attribute名は異異なります。 Instance Aがデータ取得されない状況を検知し、Instance Bが、DynamoDBに 格納されているシーケンス番号からデータ取得を?行行う
  • 28. Kinesis Client Libraryの動き–拡張性 Stream Shard-‐??0 Kinesis アプリケーション (KCL) Shardワーカーシーケンス 番号 Shard-‐??0Instance A12345 Shard-‐??1Instance A98765 Data Record (12345) Data Record (24680) DynamoDB Instance A Shard-‐??1 Data Record (98765) New (*)実際のKey, Attribute名は異異なります。 Shard-‐??1を増やしたことを検知し、データ取得を開始し、Shard-‐??1のチェックポ イント情報をDynamoDBに追加
  • 29. ?目的に応じてKinesisアプリケーションを追加可能 ストリーム データ レコード (12345) データ レコード (24680) シャード データ レコード (98765) シャード アーカイブアプリ (KCL) DynamoDB Instance A Archive Table Shardワーカーシーケンス 番号 Shard-‐??0Instance A12345 Shard-‐??1Instance A98765 各アプリ毎に別テーブルで管理理される Instance A 集計アプリ (KCL) Calc Table Shardワーカーシーケンス 番号 Shard-‐??0Instance A24680 Shard-‐??1Instance A98765
  • 30. Kinesis Client Library (KCL) for Pythonについて ? KCL for Pythonは、KCL for Javaの“MultiLangDaemon”を常駐プロセ スとして利利?用し、データ処理理のメインロジックをPythonで記述できるラ イブラリ ? データ処理理は、サブプロセスとして起動される ? “MultiLangDaemon”とサブプロセス間のデータ通信は、定義されたプロ トコルでSTDIN/STDOUTを使って?行行われる
  • 31. Kinesis Client Library (KCL) for Pythonについて ? KCL for Pythonは、KCL for Javaの“MultiLangDaemon”を常駐プロセ スとして利利?用し、データ処理理のメインロジックをPythonで記述できるラ イブラリ ? データ処理理は、サブプロセスとして起動される ? “MultiLangDaemon”とサブプロセス間のデータ通信は、定義されたプロ トコルでSTDIN/STDOUTを使って?行行われる KCL(Java) Worker Thread Python Logic Shard-‐??0 Shard-‐??1Worker Thread Process Python Logic Process
  • 32. KCL for Python実装 #!env python from amazon_kclpy import kcl import json, base64 class RecordProcessor(kcl.RecordProcessorBase): def initialize(self, shard_id): pass def process_records(self, records, checkpointer): pass def shutdown(self, checkpointer, reason): pass if __name__ == __main__: kclprocess = kcl.KCLProcess(RecordProcessor()) kclprocess.run()
  • 33. KCL for Python実装 KCL for Python https://github.com/awslabs/amazon-kinesis-client-python/blob/ master/amazon_kclpy/kcl.py KCL for Java https://github.com/awslabs/amazon-kinesis-client/tree/master/src/ main/java/com/amazonaws/services/kinesis/multilang
  • 34. Multi Language Protocol Action Parameter Initialize shardId : string processRecords [{ data : ”base64encoded_string, partitionKey : ”partition key, sequenceNumber : ”sequence number; }] // a list of records checkpoint checkpoint : ”sequence number, error : NameOfException shutdown reason : “TERMINATE|ZOMBIE
  • 35. KCL for Pythonのプロパティで設定出来る項?目 項?目内容 failoverTimeMillisWorkerが処理理を継続できなくなり別なWorkerにフェイルオーバする時間(ミリ秒) 短い値を設定するとDynamoDBのPIOPS?高くなるため注意が必要 maxRecords1回のデータ取得で取得するレコード件数 idleTimeBetweenReadsInMillisレコード取得間隔(ミリ秒) callProcessRecordsEvenForEmptyRecord List レコードデータが空でもレコード取得処理理を継続するかの判断(True or Fault) parentShardPollIntervalMillis親のShardをチェックするインターバル 短い値を設定するとDynamoDBのPIOPS?高くなるため注意が必要 cleanupLeasesUponShardCompletionshradが終了了した後に継続して処理理を続けるかクリーンアップするかを指定 taskBackoffTimeMillisKCLのバックオフタイムの設定 metricsBufferTimeMillisCloudWatchのAPIコールする前の時間 metricsMaxQueueSizeCloudWatchのAPIコールする最?大のキューサイズ validateSequenceNumberBeforeCheckpo inting Checkpointingする前にシーケンス番号をチェックするかを指定 maxActiveThreadsMultiLangDaemonの最?大スレッド数
  • 36. KCL for Python実?行行?方法 [ec2-user@ip-172-31-17-43 samples]$ amazon_kclpy_helper.py --print_command -j /usr/bin/java -p /home/ec2- user/amazon-kinesis-client-python/samples/sample.properties /usr/bin/java -cp /usr/lib/python2.6/site-packages/amazon_kclpy-1.0.0-py2.6.egg/amazon_kclpy/jars/amazon-kinesis- client-1.2.0.jar:/usr/lib/python2.6/site-packages/amazon_kclpy-1.0.0-py2.6.egg/amazon_kclpy/jars/ jackson-annotations-2.1.1.jar:/usr/lib/python2.6/site-packages/amazon_kclpy-1.0.0-py2.6.egg/amazon_kclpy/ jars/commons-codec-1.3.jar:/usr/lib/python2.6/site-packages/amazon_kclpy-1.0.0-py2.6.egg/amazon_kclpy/ jars/commons-logging-1.1.1.jar:/usr/lib/python2.6/site-packages/amazon_kclpy-1.0.0-py2.6.egg/amazon_kclpy/ jars/joda-time-2.4.jar:/usr/lib/python2.6/site-packages/amazon_kclpy-1.0.0-py2.6.egg/amazon_kclpy/jars/ jackson-databind-2.1.1.jar:/usr/lib/python2.6/site-packages/amazon_kclpy-1.0.0-py2.6.egg/amazon_kclpy/jars/ jackson-core-2.1.1.jar:/usr/lib/python2.6/site-packages/amazon_kclpy-1.0.0-py2.6.egg/amazon_kclpy/jars/aws-java- sdk-1.7.13.jar:/usr/lib/python2.6/site-packages/amazon_kclpy-1.0.0-py2.6.egg/amazon_kclpy/jars/ httpclient-4.2.jar:/usr/lib/python2.6/site-packages/amazon_kclpy-1.0.0-py2.6.egg/amazon_kclpy/jars/ httpcore-4.2.jar:/home/ec2-user/amazon-kinesis-client-python/samples com.amazonaws.services.kinesis.multilang.MultiLangDaemon sample.properties 出?力力結果をコピーして、シェルの?入?力力 としてペーストし、実?行行すると、KCL が実?行行されます
  • 37. 【前提】データ処理理デザインパターン ? データ処理理を?行行うアプリケーション側でリカバリーやロードバランシン グを考慮した設計が必要 ? Kinesisの特徴であるシリアル番号を利利?用しチェックポイントを打つこと が重要 ? 1つのデータを複数のアプリケーションで利利?用できるためアプリケー ション毎に追加?削除できる設計 ? 本番データを?用いて開発中のロジックの評価や複数のロジックを同じ データを?用いて評価することが可能 データ ソース ロジック A ロジック SeqNoB (14) SeqNo (17) SeqNo (25) SeqNo (26) SeqNo (32)
  • 38. データ処理理パターン分類 分類ユースケース 基本構成パターンKinesisアプリケーションを構成する基本的なパターン Simple ETL処理理パターンKinesisでIngestされたデータをS3、DynamoDB、Redshiftにアーカイブするパ ターン -‐?? フィルタリング+データアーカイブ ETL/MapReduce処理理パターンKinesisでIngestされたデータをHadoop、Spark、Storm を?用いてデータ処理理するパターン -‐?? データクレンジング -‐?? ETL+簡易易集計 Filterパターン複数のKinesisをパイプラインのようにつなぎあわせてFiltering/MapReduceを?行行 うパターン -‐?? フィルタリング -‐?? ロジックルーティング 機械学習インテグレーションパターンオンラインとオフラインを組み合わせた機械学習インテグレーションパターン ??異異 常検知、リコメンデーション AWS Lambda インテンションパターンAWS Lambdaをデータ処理理に利利?用するパターン マネージドサービスを使うことで簡単にデータ処理理システムを構築できる
  • 39. 基本構成パターン ? ?目的毎にアプリケーションを構成するパターン ? KCLを?用いることで可?用性の?高いアプリの導?入が可能 ? それぞれのアプリの可?用性?信頼性に合わせた設計 ? 本番システムに影響されず開発検証のためにデータを利利?用することも可能 センサー センサー センサー アプリ1Dashboard アプリ2 DynamoDB Redshift 例例:リアルタイムダッシュボード
  • 40. Simple ETL処理理パターン ? DynamoDB、Redshift、S3などとのインテグレーションを容易易にするKinesis Connector Libraryを利利?用可能 https://github.com/awslabs/amazon-kinesis-connectors センサー センサー センサー S3 アーカイブ Redshift データロード S3 Redshift Kinesis Connectorのデータフロー TransformerFilterBufferEmitter
  • 41. ETL/MapReduce処理理パターン(1) ? HadoopやSparkを?用いたパターン ? Kinesisに集積されたデータをHive、PigなどのHadoopツールを?用いてETL処理理(Map Reduce 処理理)が可能 ? 別のKinesis Stream, S3, DynamoDB, HDFSのHive Tableなどの他のデータソースのテーブ ルとJOINすることなども可能 ? Data pipeline / Crontabで定期実?行行することにより、定期的にKinesisからデータを取り込み、 処理理することが可能 Data Pipeline EMR ClusterS3 構成例例 DataPipelineで定期的にHiveを実 ?行行しKinesisにあるデータを処理理。 結果をS3に格納 Kinesis EMR AMI 3.0.4以上を?用いることでKinesisインテグレーションが可能
  • 42. ETL/MapReduce処理理パターン(2) ? Apache Stormを利利?用するパターン ? Boltをつなげることで?高度度なデータ処理理をリアルタイムで分散処理理が可能 ? KinesisからApache Stormへのインテグレーションを容易易にするためのSpoutを提 供 https://github.com/awslabs/kinesis-‐??storm-‐??spout Data Sources Data Sources Data Sources Storm Spout Storm Bolt Storm Bolt Storm Bolt
  • 43. Filterパターン ? Kinesisをパイプラインとして連結するパターン ? FilterやMapReduceを多段Kinesisを?用いて実現 ? 最初のKinesisは、ピークトラフィックに対応しやすくするためにランダムな値を パーティションキーとしてセットし、平準化し、次のストリームを?生成し、伝送す る Data Sources Data Sources Data Sources Filter Layer (例例)Process Layer (例例) Kinesis App Kinesis App Kinesis App Kinesis App
  • 44. 機械学習インテグレーションパターン ? ストリーミングデータの分類、異異常検知などを機械学習を?用いて実?行行 ? 機械学習への教師データの反映を定期的に実?行行 ? 機械学習器は、Apache Spark、Apache Stormなどの上で機械学習アルゴ リズムを動作させインテグレーションすることも可能 Data Sources Data Sources Data Sources 機械学習 Jubatus アーカイブ Dashboar d 教師データ分析 ダッシュボード 例例:オンライン機械学習 (Jubatus)の例例
  • 45. AWS Lambdaインテグレーションパターン ? Lambda Functionをデプロイするだけでインフラの構成などを気にせず データ処理理実装が可能 例例:AWS Lambda インテンション例例 Data Sources Data Sources Data Sources S3 Redshift
  • 46. デモ ? 異異常検知とアーカイブを?目的としたKinesisアプリをEC2上で実?行行 ? ダッシュボードで検知状態を確認 リアルタイム 分析を実現 ゲートウェイ サーバ Jubatus 異異常検知 サーバ ダッシュボード サーバ センサデバイス (iPhone) 監視?用 ダッシュボード ?生データ?用 ストリーム 異異常値スコア?用 ストリーム HTTP/WS Put Record HTTP/WS Get Records 加速度度センサ 情報を送信
  • 48. クラウドのメリットをフルに活かす ? IoT、オープンデータなど、より多くのデータを収 集?活?用することで、ビジネス価値がうまれる ? AWSを活?用することで、データを効率率率よく収集し、 分析することが実現できる ? 特に、1つのデータ?ソースをアジリティ?高く試すこ とができるアーキテクチャが重要