25. Kinesis Client Libraryの動き
Stream
Shard-‐??0
Shard-‐??1
Kinesis
アプリケーション
(KCL)
ワーカーシーケンス番号
Instance A12345
Instance A98765
Data
Record
(12345)
Data
Record
(24680)
Data
Record
(98765)
DynamoDB
Instance A
(*)実際のKey, Attribute名は異異なります。
1. Kinesis Client LibraryがShardからData Recordを取得
2. 設定された間隔でシーケンス番号をそのワーカーのIDをキーにした
DynamoDBのテーブルに格納
3. 1つのアプリが複数Shardからデータを取得し処理理を実?行行
26. Kinesis Client Libraryの動き
Stream
Shard-‐??0
Shard-‐??1
Kinesis
アプリケーション
(KCL)ワーカーシーケンス番号
Instance A12345
Instance B98765
Data
Record
(12345)
Data
Record
(24680)
Data
Record
(98765)
DynamoDB
Instance A
Instance B
Kinesis
アプリケーション
(KCL)
1. 複数アプリを実?行行した場合は、負荷分散される
(*)実際のKey, Attribute名は異異なります。
27. Kinesis Client Libraryの動き
Stream
Shard-‐??0
Shard-‐??1
Kinesis
アプリケーション
(KCL)ワーカーシーケンス番号
Instance A
↓
Instance B
12345
Instance B98765
Data
Record
(12345)
Data
Record
(24680)
Data
Record
(98765)
DynamoDB
Instance A
Instance B
Kinesis
アプリケーション
(KCL)
(*)実際のKey, Attribute名は異異なります。
Instance Aがデータ取得されない状況を検知し、Instance Bが、DynamoDBに
格納されているシーケンス番号からデータ取得を?行行う
28. Kinesis Client Libraryの動き–拡張性
Stream
Shard-‐??0
Kinesis
アプリケーション
(KCL)
Shardワーカーシーケンス
番号
Shard-‐??0Instance A12345
Shard-‐??1Instance A98765
Data
Record
(12345)
Data
Record
(24680)
DynamoDB
Instance A
Shard-‐??1
Data
Record
(98765)
New
(*)実際のKey, Attribute名は異異なります。
Shard-‐??1を増やしたことを検知し、データ取得を開始し、Shard-‐??1のチェックポ
イント情報をDynamoDBに追加
30. Kinesis Client Library (KCL) for Pythonについて
? KCL for Pythonは、KCL for Javaの“MultiLangDaemon”を常駐プロセ
スとして利利?用し、データ処理理のメインロジックをPythonで記述できるラ
イブラリ
? データ処理理は、サブプロセスとして起動される
? “MultiLangDaemon”とサブプロセス間のデータ通信は、定義されたプロ
トコルでSTDIN/STDOUTを使って?行行われる
31. Kinesis Client Library (KCL) for Pythonについて
? KCL for Pythonは、KCL for Javaの“MultiLangDaemon”を常駐プロセ
スとして利利?用し、データ処理理のメインロジックをPythonで記述できるラ
イブラリ
? データ処理理は、サブプロセスとして起動される
? “MultiLangDaemon”とサブプロセス間のデータ通信は、定義されたプロ
トコルでSTDIN/STDOUTを使って?行行われる
KCL(Java)
Worker Thread Python Logic
Shard-‐??0
Shard-‐??1Worker Thread
Process
Python Logic
Process
32. KCL for Python実装
#!env python
from amazon_kclpy import kcl
import json, base64
class RecordProcessor(kcl.RecordProcessorBase):
def initialize(self, shard_id):
pass
def process_records(self, records, checkpointer):
pass
def shutdown(self, checkpointer, reason):
pass
if __name__ == __main__:
kclprocess = kcl.KCLProcess(RecordProcessor())
kclprocess.run()
33. KCL for Python実装
KCL for Python
https://github.com/awslabs/amazon-kinesis-client-python/blob/
master/amazon_kclpy/kcl.py
KCL for Java
https://github.com/awslabs/amazon-kinesis-client/tree/master/src/
main/java/com/amazonaws/services/kinesis/multilang
34. Multi Language Protocol
Action Parameter
Initialize shardId : string
processRecords [{ data : ”base64encoded_string,
partitionKey : ”partition key,
sequenceNumber : ”sequence number;
}] // a list of records
checkpoint checkpoint : ”sequence number,
error : NameOfException
shutdown reason : “TERMINATE|ZOMBIE
35. KCL for Pythonのプロパティで設定出来る項?目
項?目内容
failoverTimeMillisWorkerが処理理を継続できなくなり別なWorkerにフェイルオーバする時間(ミリ秒)
短い値を設定するとDynamoDBのPIOPS?高くなるため注意が必要
maxRecords1回のデータ取得で取得するレコード件数
idleTimeBetweenReadsInMillisレコード取得間隔(ミリ秒)
callProcessRecordsEvenForEmptyRecord
List
レコードデータが空でもレコード取得処理理を継続するかの判断(True or Fault)
parentShardPollIntervalMillis親のShardをチェックするインターバル
短い値を設定するとDynamoDBのPIOPS?高くなるため注意が必要
cleanupLeasesUponShardCompletionshradが終了了した後に継続して処理理を続けるかクリーンアップするかを指定
taskBackoffTimeMillisKCLのバックオフタイムの設定
metricsBufferTimeMillisCloudWatchのAPIコールする前の時間
metricsMaxQueueSizeCloudWatchのAPIコールする最?大のキューサイズ
validateSequenceNumberBeforeCheckpo
inting
Checkpointingする前にシーケンス番号をチェックするかを指定
maxActiveThreadsMultiLangDaemonの最?大スレッド数