KinesisとはSQSのようなデータ配送サービス。SQSと違って、大量データの書き込み、転送の用途に向いています。つまりログデータのように連続的に大量に流れてくるデータの転送などに有効。
一定期間、stream内に保管され、順列に読み出しても削除されるようなものではない。よって、再度読み出したり、別々のコンシューマーから同時にアクセスすることもできる。
streamに投じたデータはstream内に存在するシャードに振り分け配置され、読み取り側はiteratorを利用してシャード内のデータにアクセスできる。データの連続性はシャード毎となる。(なので複数のシャードが存在するstream全体で先勝ち後負けになるようなトランザクション処理には適さない。在庫消費や空室予約など。する場合はシャード内のデータを時系列に並べられるDB等に書き出して処理する)
複数のiteratorで一つのシャードにアクセスすることもできる。データはストリーム内に一定期間保存されているので、例えばSeq番号を記録しておき、Seq番号からの処理を再開といったこともできる。
基本的にシャードの状態がwriteスループット > readスループットの場合(往々にしてこの状態になりやすい)、シャードを増やすことによって、readするiteratorの数を追加することで対応する。
writer -> kinesis shard1 -> shard1 iterator writer -> kinesis shard1 -> shard1 iterator shard2 -> shard2 iterator
また、firehose
ストリームを選択すると、S3などに自動で出力を行える。加工して出力する場合は、ProcessionConfiguration
でLamda等で前処理を行える。
Kinesis Streamの作成
$ aws kinesis create-stream --stream-name sample --shard-count 1
Kinesis Streamへの書き込み
import boto3 kinesis = boto3.client('kinesis') stream_name = 'sample' kinesis.put_record( StreamName=stream_name, Data="record data", PartitionKey="1"
CloudWatchアラートの作成
CloudWatch メトリクスアラームの設定
$ aws cloudwatch put-metric-alarm \ --alarm-name alertName --metric-name IncomingRecords \ --namespace AWS/Kinesis --statistic Sum --period 60 \ --threshold 10 --comparison-operator GreaterThanThreshold \ --dimensions Name=StreamName,Value=sample \ --evaluation-periods 1 \ --alarm-actions arn:aws:sns:ap-northeast-1:****:sample
--metric-name IncomingRecords
:モニタリング対象とするメトリクス
--statistic
:取得する統計
--period 60
:期間
--threshold 10
:しきい値
--comparison-operator GreaterThanThreshold
:比較条件
--dimensions
:監視する属性
--evaluation-periods
:アラートを上げるまでの連続発生回数
--alarm-actions arn:aws:sns:ap-northeast-1:****:sample
:閾値超過時の通知先
参考: https://docs.aws.amazon.com/ja_jp/streams/latest/dev/monitoring-with-cloudwatch.html