KinesisはLambdaのイベントリソースとして設定することができます。
Lambda関数にKinesisイベントリソースを設定する
Kinesisのストリームにデータが積まれた時にLambda関数を実行する
$ aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kinesis:ap-northeast-1:****:stream/streamName \ --function-name lambdaFunctionName \ --enabled --starting-position LATEST
Lambda関数の定義
引数のeventからストリームのレコードが取得できる。
import boto3 import json import base64 def lambda_handler(event, context): try: # eventにKinesisのstream recordが渡される(record数はイベントソースの設定上限まで) for record in event['Records']: #recordはbase64されて運ばれる payload = base64.b64decode(record['kinesis']['data']) data = json.loads(payload)
Lambda実行ロールのポリシーについて
なお、Lambdaの実行ロールにKinesisへのアクセスポリシーが必要。多分、Lambdaコンテナ起動時にストリームアクセスしているからだと思う。
ロールに付与するポリシー例。
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "logs:*", "kinesis:ListStreams", "kinesis:GetRecords" "kinesis:GetShardIterator", "kinesis:DescribeStream" ], "Resource": "*" } ] }
リンク