From c1799a3431803db1d3f65a2ed0f0171e8b3582a6 Mon Sep 17 00:00:00 2001 From: yamatatsu Date: Sun, 9 Jan 2022 00:38:31 +0900 Subject: [PATCH 01/10] feat(iot): add Action to put record to Kinesis Data stream --- packages/@aws-cdk/aws-iot-actions/README.md | 23 ++ .../@aws-cdk/aws-iot-actions/lib/index.ts | 2 +- .../lib/kinesis-put-record-action.ts | 54 ++++ .../@aws-cdk/aws-iot-actions/package.json | 4 + ...integ.firehose-stream-action.expected.json | 268 ++++++++++++++++++ .../integ.firehose-stream-action.ts | 54 ++++ .../kinesis-put-record-action.test.ts | 120 ++++++++ 7 files changed, 524 insertions(+), 1 deletion(-) create mode 100644 packages/@aws-cdk/aws-iot-actions/lib/kinesis-put-record-action.ts create mode 100644 packages/@aws-cdk/aws-iot-actions/test/kinesis-stream/integ.firehose-stream-action.expected.json create mode 100644 packages/@aws-cdk/aws-iot-actions/test/kinesis-stream/integ.firehose-stream-action.ts create mode 100644 packages/@aws-cdk/aws-iot-actions/test/kinesis-stream/kinesis-put-record-action.test.ts diff --git a/packages/@aws-cdk/aws-iot-actions/README.md b/packages/@aws-cdk/aws-iot-actions/README.md index ffb45017ed721..5c87c35b0d91a 100644 --- a/packages/@aws-cdk/aws-iot-actions/README.md +++ b/packages/@aws-cdk/aws-iot-actions/README.md @@ -26,6 +26,7 @@ Currently supported are: - Put logs to CloudWatch Logs - Capture CloudWatch metrics - Change state for a CloudWatch alarm +- Put records to Kinesis Data stream - Put records to Kinesis Data Firehose stream - Send messages to SQS queues @@ -183,6 +184,28 @@ const topicRule = new iot.TopicRule(this, 'TopicRule', { }); ``` +## Put records to Kinesis Data stream + +The code snippet below creates an AWS IoT Rule that put records to Kinesis Data +stream when it is triggered. + +```ts +import * as iot from '@aws-cdk/aws-iot'; +import * as actions from '@aws-cdk/aws-iot-actions'; +import * as kinesis from '@aws-cdk/aws-kinesis'; + +const stream = new kinesis.Stream(this, 'MyStream'); + +const topicRule = new iot.TopicRule(this, 'TopicRule', { + sql: iot.IotSql.fromStringAsVer20160323("SELECT * FROM 'device/+/data'"), + actions: [ + new actions.KinesisPutRecordAction(stream, { + partitionKey: '${timestamp()}', // optional property + }), + ], +}); +``` + ## Put records to Kinesis Data Firehose stream The code snippet below creates an AWS IoT Rule that put records to Put records diff --git a/packages/@aws-cdk/aws-iot-actions/lib/index.ts b/packages/@aws-cdk/aws-iot-actions/lib/index.ts index a817ccb0ca35a..109d130d7f67a 100644 --- a/packages/@aws-cdk/aws-iot-actions/lib/index.ts +++ b/packages/@aws-cdk/aws-iot-actions/lib/index.ts @@ -3,7 +3,7 @@ export * from './cloudwatch-put-metric-action'; export * from './cloudwatch-set-alarm-state-action'; export * from './common-action-props'; export * from './firehose-stream-action'; +export * from './kinesis-put-record-action'; export * from './lambda-function-action'; export * from './s3-put-object-action'; export * from './sqs-queue-action'; - diff --git a/packages/@aws-cdk/aws-iot-actions/lib/kinesis-put-record-action.ts b/packages/@aws-cdk/aws-iot-actions/lib/kinesis-put-record-action.ts new file mode 100644 index 0000000000000..bcc58acd144e7 --- /dev/null +++ b/packages/@aws-cdk/aws-iot-actions/lib/kinesis-put-record-action.ts @@ -0,0 +1,54 @@ +import * as iam from '@aws-cdk/aws-iam'; +import * as iot from '@aws-cdk/aws-iot'; +import * as kinesis from '@aws-cdk/aws-kinesis'; +import { CommonActionProps } from './common-action-props'; +import { singletonActionRole } from './private/role'; + +/** + * Configuration properties of an action for the Kinesis Data stream. + */ +export interface KinesisPutRecordActionProps extends CommonActionProps { + /** + * The partition key used to determine to which shard the data is written. + * The partition key is usually composed of an expression (for example, ${topic()} or ${timestamp()}). + * For more information @see https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html#API_PutRecord_RequestParameters + * + * @default - None + */ + readonly partitionKey?: string; +} + +/** + * The action to put the record from an MQTT message to the Kinesis Data stream. + */ +export class KinesisPutRecordAction implements iot.IAction { + private readonly partitionKey?: string; + private readonly role?: iam.IRole; + + /** + * @param stream The Kinesis Data stream to which to put records. + * @param props Optional properties to not use default + */ + constructor(private readonly stream: kinesis.IStream, props: KinesisPutRecordActionProps = {}) { + this.partitionKey = props.partitionKey; + this.role = props.role; + } + + bind(rule: iot.ITopicRule): iot.ActionConfig { + const role = this.role ?? singletonActionRole(rule); + role.addToPrincipalPolicy(new iam.PolicyStatement({ + actions: ['kinesis:PutRecord'], + resources: [this.stream.streamArn], + })); + + return { + configuration: { + kinesis: { + streamName: this.stream.streamName, + partitionKey: this.partitionKey, + roleArn: role.roleArn, + }, + }, + }; + } +} diff --git a/packages/@aws-cdk/aws-iot-actions/package.json b/packages/@aws-cdk/aws-iot-actions/package.json index d39aa93533a94..eba3f293379dd 100644 --- a/packages/@aws-cdk/aws-iot-actions/package.json +++ b/packages/@aws-cdk/aws-iot-actions/package.json @@ -83,8 +83,10 @@ "@aws-cdk/aws-cloudwatch": "0.0.0", "@aws-cdk/aws-iam": "0.0.0", "@aws-cdk/aws-iot": "0.0.0", + "@aws-cdk/aws-kinesis": "0.0.0", "@aws-cdk/aws-kinesisfirehose": "0.0.0", "@aws-cdk/aws-lambda": "0.0.0", + "@aws-cdk/aws-lambda-event-sources": "0.0.0", "@aws-cdk/aws-logs": "0.0.0", "@aws-cdk/aws-s3": "0.0.0", "@aws-cdk/aws-sqs": "0.0.0", @@ -97,8 +99,10 @@ "@aws-cdk/aws-cloudwatch": "0.0.0", "@aws-cdk/aws-iam": "0.0.0", "@aws-cdk/aws-iot": "0.0.0", + "@aws-cdk/aws-kinesis": "0.0.0", "@aws-cdk/aws-kinesisfirehose": "0.0.0", "@aws-cdk/aws-lambda": "0.0.0", + "@aws-cdk/aws-lambda-event-sources": "0.0.0", "@aws-cdk/aws-logs": "0.0.0", "@aws-cdk/aws-s3": "0.0.0", "@aws-cdk/aws-sqs": "0.0.0", diff --git a/packages/@aws-cdk/aws-iot-actions/test/kinesis-stream/integ.firehose-stream-action.expected.json b/packages/@aws-cdk/aws-iot-actions/test/kinesis-stream/integ.firehose-stream-action.expected.json new file mode 100644 index 0000000000000..9620e82634d77 --- /dev/null +++ b/packages/@aws-cdk/aws-iot-actions/test/kinesis-stream/integ.firehose-stream-action.expected.json @@ -0,0 +1,268 @@ +{ + "Resources": { + "Logs6819BB44": { + "Type": "AWS::Logs::LogGroup", + "Properties": { + "RetentionInDays": 731 + }, + "UpdateReplacePolicy": "Delete", + "DeletionPolicy": "Delete" + }, + "TopicRuleTopicRuleActionRole246C4F77": { + "Type": "AWS::IAM::Role", + "Properties": { + "AssumeRolePolicyDocument": { + "Statement": [ + { + "Action": "sts:AssumeRole", + "Effect": "Allow", + "Principal": { + "Service": "iot.amazonaws.com" + } + } + ], + "Version": "2012-10-17" + } + } + }, + "TopicRuleTopicRuleActionRoleDefaultPolicy99ADD687": { + "Type": "AWS::IAM::Policy", + "Properties": { + "PolicyDocument": { + "Statement": [ + { + "Action": [ + "logs:CreateLogStream", + "logs:PutLogEvents" + ], + "Effect": "Allow", + "Resource": { + "Fn::GetAtt": [ + "Logs6819BB44", + "Arn" + ] + } + }, + { + "Action": "logs:DescribeLogStreams", + "Effect": "Allow", + "Resource": { + "Fn::GetAtt": [ + "Logs6819BB44", + "Arn" + ] + } + }, + { + "Action": "kinesis:PutRecord", + "Effect": "Allow", + "Resource": { + "Fn::GetAtt": [ + "MyStream5C050E93", + "Arn" + ] + } + } + ], + "Version": "2012-10-17" + }, + "PolicyName": "TopicRuleTopicRuleActionRoleDefaultPolicy99ADD687", + "Roles": [ + { + "Ref": "TopicRuleTopicRuleActionRole246C4F77" + } + ] + } + }, + "TopicRule40A4EA44": { + "Type": "AWS::IoT::TopicRule", + "Properties": { + "TopicRulePayload": { + "Actions": [ + { + "Kinesis": { + "PartitionKey": "${timestamp()}", + "RoleArn": { + "Fn::GetAtt": [ + "TopicRuleTopicRuleActionRole246C4F77", + "Arn" + ] + }, + "StreamName": { + "Ref": "MyStream5C050E93" + } + } + } + ], + "AwsIotSqlVersion": "2016-03-23", + "ErrorAction": { + "CloudwatchLogs": { + "LogGroupName": { + "Ref": "Logs6819BB44" + }, + "RoleArn": { + "Fn::GetAtt": [ + "TopicRuleTopicRuleActionRole246C4F77", + "Arn" + ] + } + } + }, + "Sql": "SELECT * FROM 'device/+/data'" + } + } + }, + "MyStream5C050E93": { + "Type": "AWS::Kinesis::Stream", + "Properties": { + "ShardCount": 3, + "RetentionPeriodHours": 24, + "StreamEncryption": { + "Fn::If": [ + "AwsCdkKinesisEncryptedStreamsUnsupportedRegions", + { + "Ref": "AWS::NoValue" + }, + { + "EncryptionType": "KMS", + "KeyId": "alias/aws/kinesis" + } + ] + } + } + }, + "MyLambdaServiceRole4539ECB6": { + "Type": "AWS::IAM::Role", + "Properties": { + "AssumeRolePolicyDocument": { + "Statement": [ + { + "Action": "sts:AssumeRole", + "Effect": "Allow", + "Principal": { + "Service": "lambda.amazonaws.com" + } + } + ], + "Version": "2012-10-17" + }, + "ManagedPolicyArns": [ + { + "Fn::Join": [ + "", + [ + "arn:", + { + "Ref": "AWS::Partition" + }, + ":iam::aws:policy/service-role/AWSLambdaBasicExecutionRole" + ] + ] + } + ] + } + }, + "MyLambdaServiceRoleDefaultPolicy5BBC6F68": { + "Type": "AWS::IAM::Policy", + "Properties": { + "PolicyDocument": { + "Statement": [ + { + "Action": [ + "kinesis:DescribeStreamSummary", + "kinesis:GetRecords", + "kinesis:GetShardIterator", + "kinesis:ListShards", + "kinesis:SubscribeToShard", + "kinesis:DescribeStream", + "kinesis:ListStreams" + ], + "Effect": "Allow", + "Resource": { + "Fn::GetAtt": [ + "MyStream5C050E93", + "Arn" + ] + } + }, + { + "Action": "kinesis:DescribeStream", + "Effect": "Allow", + "Resource": { + "Fn::GetAtt": [ + "MyStream5C050E93", + "Arn" + ] + } + } + ], + "Version": "2012-10-17" + }, + "PolicyName": "MyLambdaServiceRoleDefaultPolicy5BBC6F68", + "Roles": [ + { + "Ref": "MyLambdaServiceRole4539ECB6" + } + ] + } + }, + "MyLambdaCCE802FB": { + "Type": "AWS::Lambda::Function", + "Properties": { + "Code": { + "ZipFile": "\n exports.handler = (event) => {\n event.Records.forEach(rec => {\n console.log('eventID:', rec.eventID)\n })\n }\n " + }, + "Role": { + "Fn::GetAtt": [ + "MyLambdaServiceRole4539ECB6", + "Arn" + ] + }, + "Handler": "index.handler", + "Runtime": "nodejs14.x" + }, + "DependsOn": [ + "MyLambdaServiceRoleDefaultPolicy5BBC6F68", + "MyLambdaServiceRole4539ECB6" + ] + }, + "MyLambdaKinesisEventSourceteststackMyStreamED37A4A443E4E7EF": { + "Type": "AWS::Lambda::EventSourceMapping", + "Properties": { + "FunctionName": { + "Ref": "MyLambdaCCE802FB" + }, + "BatchSize": 100, + "EventSourceArn": { + "Fn::GetAtt": [ + "MyStream5C050E93", + "Arn" + ] + }, + "StartingPosition": "TRIM_HORIZON" + } + } + }, + "Conditions": { + "AwsCdkKinesisEncryptedStreamsUnsupportedRegions": { + "Fn::Or": [ + { + "Fn::Equals": [ + { + "Ref": "AWS::Region" + }, + "cn-north-1" + ] + }, + { + "Fn::Equals": [ + { + "Ref": "AWS::Region" + }, + "cn-northwest-1" + ] + } + ] + } + } +} \ No newline at end of file diff --git a/packages/@aws-cdk/aws-iot-actions/test/kinesis-stream/integ.firehose-stream-action.ts b/packages/@aws-cdk/aws-iot-actions/test/kinesis-stream/integ.firehose-stream-action.ts new file mode 100644 index 0000000000000..b0d8ae70d8e04 --- /dev/null +++ b/packages/@aws-cdk/aws-iot-actions/test/kinesis-stream/integ.firehose-stream-action.ts @@ -0,0 +1,54 @@ +import * as iot from '@aws-cdk/aws-iot'; +import * as kinesis from '@aws-cdk/aws-kinesis'; +import * as lambda from '@aws-cdk/aws-lambda'; +import * as lambdaEventSources from '@aws-cdk/aws-lambda-event-sources'; +import * as logs from '@aws-cdk/aws-logs'; +import * as cdk from '@aws-cdk/core'; +import * as actions from '../../lib'; + + +const app = new cdk.App(); + +class TestStack extends cdk.Stack { + constructor(scope: cdk.App, id: string, props?: cdk.StackProps) { + super(scope, id, props); + + const topicRule = new iot.TopicRule(this, 'TopicRule', { + sql: iot.IotSql.fromStringAsVer20160323( + "SELECT * FROM 'device/+/data'", + ), + errorAction: new actions.CloudWatchLogsAction( + new logs.LogGroup(this, 'Logs', { removalPolicy: cdk.RemovalPolicy.DESTROY }), + ), + }); + + const stream = new kinesis.Stream(this, 'MyStream', { + shardCount: 3, + }); + topicRule.addAction( + new actions.KinesisPutRecordAction(stream, { + partitionKey: '${timestamp()}', + }), + ); + + const func = new lambda.Function(this, 'MyLambda', { + code: lambda.Code.fromInline(` + exports.handler = (event) => { + event.Records.forEach(rec => { + console.log('eventID:', rec.eventID) + }) + } + `), + handler: 'index.handler', + runtime: lambda.Runtime.NODEJS_14_X, + }); + func.addEventSource( + new lambdaEventSources.KinesisEventSource(stream, { + startingPosition: lambda.StartingPosition.TRIM_HORIZON, + }), + ); + } +} + +new TestStack(app, 'test-stack'); +app.synth(); diff --git a/packages/@aws-cdk/aws-iot-actions/test/kinesis-stream/kinesis-put-record-action.test.ts b/packages/@aws-cdk/aws-iot-actions/test/kinesis-stream/kinesis-put-record-action.test.ts new file mode 100644 index 0000000000000..5982cbc4c42eb --- /dev/null +++ b/packages/@aws-cdk/aws-iot-actions/test/kinesis-stream/kinesis-put-record-action.test.ts @@ -0,0 +1,120 @@ +import { Template, Match } from '@aws-cdk/assertions'; +import * as iam from '@aws-cdk/aws-iam'; +import * as iot from '@aws-cdk/aws-iot'; +import * as kinesis from '@aws-cdk/aws-kinesis'; +import * as cdk from '@aws-cdk/core'; +import * as actions from '../../lib'; + +test('Default kinesis stream action', () => { + // GIVEN + const stack = new cdk.Stack(); + const topicRule = new iot.TopicRule(stack, 'MyTopicRule', { + sql: iot.IotSql.fromStringAsVer20160323("SELECT topic(2) as device_id FROM 'device/+/data'"), + }); + const stream = kinesis.Stream.fromStreamArn(stack, 'MyStream', 'arn:aws:kinesis:xx-west-1:111122223333:stream/my-stream'); + + // WHEN + topicRule.addAction( + new actions.KinesisPutRecordAction(stream), + ); + + // THEN + Template.fromStack(stack).hasResourceProperties('AWS::IoT::TopicRule', { + TopicRulePayload: { + Actions: [ + { + Kinesis: { + StreamName: 'my-stream', + RoleArn: { + 'Fn::GetAtt': ['MyTopicRuleTopicRuleActionRoleCE2D05DA', 'Arn'], + }, + }, + }, + ], + }, + }); + + Template.fromStack(stack).hasResourceProperties('AWS::IAM::Role', { + AssumeRolePolicyDocument: { + Statement: [ + { + Action: 'sts:AssumeRole', + Effect: 'Allow', + Principal: { + Service: 'iot.amazonaws.com', + }, + }, + ], + Version: '2012-10-17', + }, + }); + + Template.fromStack(stack).hasResourceProperties('AWS::IAM::Policy', { + PolicyDocument: { + Statement: [ + { + Action: 'kinesis:PutRecord', + Effect: 'Allow', + Resource: 'arn:aws:kinesis:xx-west-1:111122223333:stream/my-stream', + }, + ], + Version: '2012-10-17', + }, + PolicyName: 'MyTopicRuleTopicRuleActionRoleDefaultPolicy54A701F7', + Roles: [ + { Ref: 'MyTopicRuleTopicRuleActionRoleCE2D05DA' }, + ], + }); +}); + +test('can set partitionKey', () => { + // GIVEN + const stack = new cdk.Stack(); + const topicRule = new iot.TopicRule(stack, 'MyTopicRule', { + sql: iot.IotSql.fromStringAsVer20160323("SELECT topic(2) as device_id FROM 'device/+/data'"), + }); + const stream = kinesis.Stream.fromStreamArn(stack, 'MyStream', 'arn:aws:kinesis:xx-west-1:111122223333:stream/my-stream'); + + // WHEN + topicRule.addAction( + new actions.KinesisPutRecordAction(stream, { partitionKey: '${timestamp()}' }), + ); + + // THEN + Template.fromStack(stack).hasResourceProperties('AWS::IoT::TopicRule', { + TopicRulePayload: { + Actions: [ + Match.objectLike({ Kinesis: { PartitionKey: '${timestamp()}' } }), + ], + }, + }); +}); + +test('can set role', () => { + // GIVEN + const stack = new cdk.Stack(); + const topicRule = new iot.TopicRule(stack, 'MyTopicRule', { + sql: iot.IotSql.fromStringAsVer20160323("SELECT topic(2) as device_id FROM 'device/+/data'"), + }); + const stream = kinesis.Stream.fromStreamArn(stack, 'MyStream', 'arn:aws:kinesis:xx-west-1:111122223333:stream/my-stream'); + const role = iam.Role.fromRoleArn(stack, 'MyRole', 'arn:aws:iam::123456789012:role/ForTest'); + + // WHEN + topicRule.addAction( + new actions.KinesisPutRecordAction(stream, { role }), + ); + + // THEN + Template.fromStack(stack).hasResourceProperties('AWS::IoT::TopicRule', { + TopicRulePayload: { + Actions: [ + Match.objectLike({ Kinesis: { RoleArn: 'arn:aws:iam::123456789012:role/ForTest' } }), + ], + }, + }); + + Template.fromStack(stack).hasResourceProperties('AWS::IAM::Policy', { + PolicyName: 'MyRolePolicy64AB00A5', + Roles: ['ForTest'], + }); +}); From 9249bd6a5cc8f358803d3c58b715ef8fae72b0cb Mon Sep 17 00:00:00 2001 From: yamatatsu Date: Sun, 9 Jan 2022 01:02:01 +0900 Subject: [PATCH 02/10] remove unused package --- packages/@aws-cdk/aws-iot-actions/package.json | 2 -- 1 file changed, 2 deletions(-) diff --git a/packages/@aws-cdk/aws-iot-actions/package.json b/packages/@aws-cdk/aws-iot-actions/package.json index eba3f293379dd..63957a3b9c0a4 100644 --- a/packages/@aws-cdk/aws-iot-actions/package.json +++ b/packages/@aws-cdk/aws-iot-actions/package.json @@ -86,7 +86,6 @@ "@aws-cdk/aws-kinesis": "0.0.0", "@aws-cdk/aws-kinesisfirehose": "0.0.0", "@aws-cdk/aws-lambda": "0.0.0", - "@aws-cdk/aws-lambda-event-sources": "0.0.0", "@aws-cdk/aws-logs": "0.0.0", "@aws-cdk/aws-s3": "0.0.0", "@aws-cdk/aws-sqs": "0.0.0", @@ -102,7 +101,6 @@ "@aws-cdk/aws-kinesis": "0.0.0", "@aws-cdk/aws-kinesisfirehose": "0.0.0", "@aws-cdk/aws-lambda": "0.0.0", - "@aws-cdk/aws-lambda-event-sources": "0.0.0", "@aws-cdk/aws-logs": "0.0.0", "@aws-cdk/aws-s3": "0.0.0", "@aws-cdk/aws-sqs": "0.0.0", From c1c4bd448014a77f2024dfb28d148aecdb572f32 Mon Sep 17 00:00:00 2001 From: yamatatsu Date: Sun, 9 Jan 2022 01:02:21 +0900 Subject: [PATCH 03/10] rename integ test file --- ...xpected.json => integ.kinesis-put-record-action.expected.json} | 0 ...rehose-stream-action.ts => integ.kinesis-put-record-action.ts} | 0 2 files changed, 0 insertions(+), 0 deletions(-) rename packages/@aws-cdk/aws-iot-actions/test/kinesis-stream/{integ.firehose-stream-action.expected.json => integ.kinesis-put-record-action.expected.json} (100%) rename packages/@aws-cdk/aws-iot-actions/test/kinesis-stream/{integ.firehose-stream-action.ts => integ.kinesis-put-record-action.ts} (100%) diff --git a/packages/@aws-cdk/aws-iot-actions/test/kinesis-stream/integ.firehose-stream-action.expected.json b/packages/@aws-cdk/aws-iot-actions/test/kinesis-stream/integ.kinesis-put-record-action.expected.json similarity index 100% rename from packages/@aws-cdk/aws-iot-actions/test/kinesis-stream/integ.firehose-stream-action.expected.json rename to packages/@aws-cdk/aws-iot-actions/test/kinesis-stream/integ.kinesis-put-record-action.expected.json diff --git a/packages/@aws-cdk/aws-iot-actions/test/kinesis-stream/integ.firehose-stream-action.ts b/packages/@aws-cdk/aws-iot-actions/test/kinesis-stream/integ.kinesis-put-record-action.ts similarity index 100% rename from packages/@aws-cdk/aws-iot-actions/test/kinesis-stream/integ.firehose-stream-action.ts rename to packages/@aws-cdk/aws-iot-actions/test/kinesis-stream/integ.kinesis-put-record-action.ts From d64b3ff9af1a3cf3f20c3110c90b76ebd90966dc Mon Sep 17 00:00:00 2001 From: yamatatsu Date: Sun, 9 Jan 2022 01:04:21 +0900 Subject: [PATCH 04/10] remove verbose code in integ test --- ...eg.kinesis-put-record-action.expected.json | 111 ------------------ .../integ.kinesis-put-record-action.ts | 19 --- 2 files changed, 130 deletions(-) diff --git a/packages/@aws-cdk/aws-iot-actions/test/kinesis-stream/integ.kinesis-put-record-action.expected.json b/packages/@aws-cdk/aws-iot-actions/test/kinesis-stream/integ.kinesis-put-record-action.expected.json index 9620e82634d77..142757695eb2c 100644 --- a/packages/@aws-cdk/aws-iot-actions/test/kinesis-stream/integ.kinesis-put-record-action.expected.json +++ b/packages/@aws-cdk/aws-iot-actions/test/kinesis-stream/integ.kinesis-put-record-action.expected.json @@ -130,117 +130,6 @@ ] } } - }, - "MyLambdaServiceRole4539ECB6": { - "Type": "AWS::IAM::Role", - "Properties": { - "AssumeRolePolicyDocument": { - "Statement": [ - { - "Action": "sts:AssumeRole", - "Effect": "Allow", - "Principal": { - "Service": "lambda.amazonaws.com" - } - } - ], - "Version": "2012-10-17" - }, - "ManagedPolicyArns": [ - { - "Fn::Join": [ - "", - [ - "arn:", - { - "Ref": "AWS::Partition" - }, - ":iam::aws:policy/service-role/AWSLambdaBasicExecutionRole" - ] - ] - } - ] - } - }, - "MyLambdaServiceRoleDefaultPolicy5BBC6F68": { - "Type": "AWS::IAM::Policy", - "Properties": { - "PolicyDocument": { - "Statement": [ - { - "Action": [ - "kinesis:DescribeStreamSummary", - "kinesis:GetRecords", - "kinesis:GetShardIterator", - "kinesis:ListShards", - "kinesis:SubscribeToShard", - "kinesis:DescribeStream", - "kinesis:ListStreams" - ], - "Effect": "Allow", - "Resource": { - "Fn::GetAtt": [ - "MyStream5C050E93", - "Arn" - ] - } - }, - { - "Action": "kinesis:DescribeStream", - "Effect": "Allow", - "Resource": { - "Fn::GetAtt": [ - "MyStream5C050E93", - "Arn" - ] - } - } - ], - "Version": "2012-10-17" - }, - "PolicyName": "MyLambdaServiceRoleDefaultPolicy5BBC6F68", - "Roles": [ - { - "Ref": "MyLambdaServiceRole4539ECB6" - } - ] - } - }, - "MyLambdaCCE802FB": { - "Type": "AWS::Lambda::Function", - "Properties": { - "Code": { - "ZipFile": "\n exports.handler = (event) => {\n event.Records.forEach(rec => {\n console.log('eventID:', rec.eventID)\n })\n }\n " - }, - "Role": { - "Fn::GetAtt": [ - "MyLambdaServiceRole4539ECB6", - "Arn" - ] - }, - "Handler": "index.handler", - "Runtime": "nodejs14.x" - }, - "DependsOn": [ - "MyLambdaServiceRoleDefaultPolicy5BBC6F68", - "MyLambdaServiceRole4539ECB6" - ] - }, - "MyLambdaKinesisEventSourceteststackMyStreamED37A4A443E4E7EF": { - "Type": "AWS::Lambda::EventSourceMapping", - "Properties": { - "FunctionName": { - "Ref": "MyLambdaCCE802FB" - }, - "BatchSize": 100, - "EventSourceArn": { - "Fn::GetAtt": [ - "MyStream5C050E93", - "Arn" - ] - }, - "StartingPosition": "TRIM_HORIZON" - } } }, "Conditions": { diff --git a/packages/@aws-cdk/aws-iot-actions/test/kinesis-stream/integ.kinesis-put-record-action.ts b/packages/@aws-cdk/aws-iot-actions/test/kinesis-stream/integ.kinesis-put-record-action.ts index b0d8ae70d8e04..1ad0f33826b20 100644 --- a/packages/@aws-cdk/aws-iot-actions/test/kinesis-stream/integ.kinesis-put-record-action.ts +++ b/packages/@aws-cdk/aws-iot-actions/test/kinesis-stream/integ.kinesis-put-record-action.ts @@ -1,7 +1,5 @@ import * as iot from '@aws-cdk/aws-iot'; import * as kinesis from '@aws-cdk/aws-kinesis'; -import * as lambda from '@aws-cdk/aws-lambda'; -import * as lambdaEventSources from '@aws-cdk/aws-lambda-event-sources'; import * as logs from '@aws-cdk/aws-logs'; import * as cdk from '@aws-cdk/core'; import * as actions from '../../lib'; @@ -30,23 +28,6 @@ class TestStack extends cdk.Stack { partitionKey: '${timestamp()}', }), ); - - const func = new lambda.Function(this, 'MyLambda', { - code: lambda.Code.fromInline(` - exports.handler = (event) => { - event.Records.forEach(rec => { - console.log('eventID:', rec.eventID) - }) - } - `), - handler: 'index.handler', - runtime: lambda.Runtime.NODEJS_14_X, - }); - func.addEventSource( - new lambdaEventSources.KinesisEventSource(stream, { - startingPosition: lambda.StartingPosition.TRIM_HORIZON, - }), - ); } } From 4b362f14ea8b7d8881017b713058d38bd9c57eb9 Mon Sep 17 00:00:00 2001 From: Tatsuya Yamamoto Date: Tue, 11 Jan 2022 12:35:09 +0900 Subject: [PATCH 05/10] Update packages/@aws-cdk/aws-iot-actions/test/kinesis-stream/integ.kinesis-put-record-action.ts Co-authored-by: Adam Ruka --- .../kinesis-stream/integ.kinesis-put-record-action.ts | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/packages/@aws-cdk/aws-iot-actions/test/kinesis-stream/integ.kinesis-put-record-action.ts b/packages/@aws-cdk/aws-iot-actions/test/kinesis-stream/integ.kinesis-put-record-action.ts index 1ad0f33826b20..441b4cdb8bc07 100644 --- a/packages/@aws-cdk/aws-iot-actions/test/kinesis-stream/integ.kinesis-put-record-action.ts +++ b/packages/@aws-cdk/aws-iot-actions/test/kinesis-stream/integ.kinesis-put-record-action.ts @@ -23,11 +23,9 @@ class TestStack extends cdk.Stack { const stream = new kinesis.Stream(this, 'MyStream', { shardCount: 3, }); - topicRule.addAction( - new actions.KinesisPutRecordAction(stream, { - partitionKey: '${timestamp()}', - }), - ); + topicRule.addAction(new actions.KinesisPutRecordAction(stream, { + partitionKey: '${timestamp()}', + })); } } From e32e2d7d5939ea6988e1f308e7254618f3033145 Mon Sep 17 00:00:00 2001 From: yamatatsu Date: Tue, 11 Jan 2022 12:41:10 +0900 Subject: [PATCH 06/10] address comments --- .../lib/kinesis-put-record-action.ts | 2 +- ...eg.kinesis-put-record-action.expected.json | 90 +++++-------------- .../integ.kinesis-put-record-action.ts | 10 +-- .../kinesis-put-record-action.test.ts | 14 ++- 4 files changed, 31 insertions(+), 85 deletions(-) diff --git a/packages/@aws-cdk/aws-iot-actions/lib/kinesis-put-record-action.ts b/packages/@aws-cdk/aws-iot-actions/lib/kinesis-put-record-action.ts index bcc58acd144e7..85ef778f59e35 100644 --- a/packages/@aws-cdk/aws-iot-actions/lib/kinesis-put-record-action.ts +++ b/packages/@aws-cdk/aws-iot-actions/lib/kinesis-put-record-action.ts @@ -11,8 +11,8 @@ export interface KinesisPutRecordActionProps extends CommonActionProps { /** * The partition key used to determine to which shard the data is written. * The partition key is usually composed of an expression (for example, ${topic()} or ${timestamp()}). - * For more information @see https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html#API_PutRecord_RequestParameters * + * @see https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html#API_PutRecord_RequestParameters * @default - None */ readonly partitionKey?: string; diff --git a/packages/@aws-cdk/aws-iot-actions/test/kinesis-stream/integ.kinesis-put-record-action.expected.json b/packages/@aws-cdk/aws-iot-actions/test/kinesis-stream/integ.kinesis-put-record-action.expected.json index 142757695eb2c..ded46a0088df1 100644 --- a/packages/@aws-cdk/aws-iot-actions/test/kinesis-stream/integ.kinesis-put-record-action.expected.json +++ b/packages/@aws-cdk/aws-iot-actions/test/kinesis-stream/integ.kinesis-put-record-action.expected.json @@ -1,12 +1,29 @@ { "Resources": { - "Logs6819BB44": { - "Type": "AWS::Logs::LogGroup", + "TopicRule40A4EA44": { + "Type": "AWS::IoT::TopicRule", "Properties": { - "RetentionInDays": 731 - }, - "UpdateReplacePolicy": "Delete", - "DeletionPolicy": "Delete" + "TopicRulePayload": { + "Actions": [ + { + "Kinesis": { + "PartitionKey": "${timestamp()}", + "RoleArn": { + "Fn::GetAtt": [ + "TopicRuleTopicRuleActionRole246C4F77", + "Arn" + ] + }, + "StreamName": { + "Ref": "MyStream5C050E93" + } + } + } + ], + "AwsIotSqlVersion": "2016-03-23", + "Sql": "SELECT * FROM 'device/+/data'" + } + } }, "TopicRuleTopicRuleActionRole246C4F77": { "Type": "AWS::IAM::Role", @@ -30,29 +47,6 @@ "Properties": { "PolicyDocument": { "Statement": [ - { - "Action": [ - "logs:CreateLogStream", - "logs:PutLogEvents" - ], - "Effect": "Allow", - "Resource": { - "Fn::GetAtt": [ - "Logs6819BB44", - "Arn" - ] - } - }, - { - "Action": "logs:DescribeLogStreams", - "Effect": "Allow", - "Resource": { - "Fn::GetAtt": [ - "Logs6819BB44", - "Arn" - ] - } - }, { "Action": "kinesis:PutRecord", "Effect": "Allow", @@ -74,44 +68,6 @@ ] } }, - "TopicRule40A4EA44": { - "Type": "AWS::IoT::TopicRule", - "Properties": { - "TopicRulePayload": { - "Actions": [ - { - "Kinesis": { - "PartitionKey": "${timestamp()}", - "RoleArn": { - "Fn::GetAtt": [ - "TopicRuleTopicRuleActionRole246C4F77", - "Arn" - ] - }, - "StreamName": { - "Ref": "MyStream5C050E93" - } - } - } - ], - "AwsIotSqlVersion": "2016-03-23", - "ErrorAction": { - "CloudwatchLogs": { - "LogGroupName": { - "Ref": "Logs6819BB44" - }, - "RoleArn": { - "Fn::GetAtt": [ - "TopicRuleTopicRuleActionRole246C4F77", - "Arn" - ] - } - } - }, - "Sql": "SELECT * FROM 'device/+/data'" - } - } - }, "MyStream5C050E93": { "Type": "AWS::Kinesis::Stream", "Properties": { diff --git a/packages/@aws-cdk/aws-iot-actions/test/kinesis-stream/integ.kinesis-put-record-action.ts b/packages/@aws-cdk/aws-iot-actions/test/kinesis-stream/integ.kinesis-put-record-action.ts index 1ad0f33826b20..46d1d91972afe 100644 --- a/packages/@aws-cdk/aws-iot-actions/test/kinesis-stream/integ.kinesis-put-record-action.ts +++ b/packages/@aws-cdk/aws-iot-actions/test/kinesis-stream/integ.kinesis-put-record-action.ts @@ -1,12 +1,8 @@ import * as iot from '@aws-cdk/aws-iot'; import * as kinesis from '@aws-cdk/aws-kinesis'; -import * as logs from '@aws-cdk/aws-logs'; import * as cdk from '@aws-cdk/core'; import * as actions from '../../lib'; - -const app = new cdk.App(); - class TestStack extends cdk.Stack { constructor(scope: cdk.App, id: string, props?: cdk.StackProps) { super(scope, id, props); @@ -15,9 +11,6 @@ class TestStack extends cdk.Stack { sql: iot.IotSql.fromStringAsVer20160323( "SELECT * FROM 'device/+/data'", ), - errorAction: new actions.CloudWatchLogsAction( - new logs.LogGroup(this, 'Logs', { removalPolicy: cdk.RemovalPolicy.DESTROY }), - ), }); const stream = new kinesis.Stream(this, 'MyStream', { @@ -31,5 +24,6 @@ class TestStack extends cdk.Stack { } } -new TestStack(app, 'test-stack'); +const app = new cdk.App(); +new TestStack(app, 'test-kinesis-stream-action-stack'); app.synth(); diff --git a/packages/@aws-cdk/aws-iot-actions/test/kinesis-stream/kinesis-put-record-action.test.ts b/packages/@aws-cdk/aws-iot-actions/test/kinesis-stream/kinesis-put-record-action.test.ts index 5982cbc4c42eb..6ad4cb3d1c3b2 100644 --- a/packages/@aws-cdk/aws-iot-actions/test/kinesis-stream/kinesis-put-record-action.test.ts +++ b/packages/@aws-cdk/aws-iot-actions/test/kinesis-stream/kinesis-put-record-action.test.ts @@ -14,9 +14,7 @@ test('Default kinesis stream action', () => { const stream = kinesis.Stream.fromStreamArn(stack, 'MyStream', 'arn:aws:kinesis:xx-west-1:111122223333:stream/my-stream'); // WHEN - topicRule.addAction( - new actions.KinesisPutRecordAction(stream), - ); + topicRule.addAction(new actions.KinesisPutRecordAction(stream)); // THEN Template.fromStack(stack).hasResourceProperties('AWS::IoT::TopicRule', { @@ -76,9 +74,9 @@ test('can set partitionKey', () => { const stream = kinesis.Stream.fromStreamArn(stack, 'MyStream', 'arn:aws:kinesis:xx-west-1:111122223333:stream/my-stream'); // WHEN - topicRule.addAction( - new actions.KinesisPutRecordAction(stream, { partitionKey: '${timestamp()}' }), - ); + topicRule.addAction(new actions.KinesisPutRecordAction(stream, { + partitionKey: '${timestamp()}', + })); // THEN Template.fromStack(stack).hasResourceProperties('AWS::IoT::TopicRule', { @@ -100,9 +98,7 @@ test('can set role', () => { const role = iam.Role.fromRoleArn(stack, 'MyRole', 'arn:aws:iam::123456789012:role/ForTest'); // WHEN - topicRule.addAction( - new actions.KinesisPutRecordAction(stream, { role }), - ); + topicRule.addAction(new actions.KinesisPutRecordAction(stream, { role })); // THEN Template.fromStack(stack).hasResourceProperties('AWS::IoT::TopicRule', { From 1e6c4a8988bd0170c2cbc39a60723baa7ff27bff Mon Sep 17 00:00:00 2001 From: yamatatsu Date: Thu, 13 Jan 2022 23:45:14 +0900 Subject: [PATCH 07/10] fix integ test snapshot --- .../integ.kinesis-put-record-action.expected.json | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/packages/@aws-cdk/aws-iot-actions/test/kinesis-stream/integ.kinesis-put-record-action.expected.json b/packages/@aws-cdk/aws-iot-actions/test/kinesis-stream/integ.kinesis-put-record-action.expected.json index ded46a0088df1..89229bf4731e3 100644 --- a/packages/@aws-cdk/aws-iot-actions/test/kinesis-stream/integ.kinesis-put-record-action.expected.json +++ b/packages/@aws-cdk/aws-iot-actions/test/kinesis-stream/integ.kinesis-put-record-action.expected.json @@ -71,8 +71,8 @@ "MyStream5C050E93": { "Type": "AWS::Kinesis::Stream", "Properties": { - "ShardCount": 3, "RetentionPeriodHours": 24, + "ShardCount": 3, "StreamEncryption": { "Fn::If": [ "AwsCdkKinesisEncryptedStreamsUnsupportedRegions", @@ -84,6 +84,9 @@ "KeyId": "alias/aws/kinesis" } ] + }, + "StreamModeDetails": { + "StreamMode": "PROVISIONED" } } } From b2cd603150a7af6f84387d0827d24d23a6ca6279 Mon Sep 17 00:00:00 2001 From: yamatatsu Date: Thu, 13 Jan 2022 23:45:57 +0900 Subject: [PATCH 08/10] add default value for partitionKey --- packages/@aws-cdk/aws-iot-actions/README.md | 2 +- .../@aws-cdk/aws-iot-actions/lib/kinesis-put-record-action.ts | 4 ++-- .../test/kinesis-stream/kinesis-put-record-action.test.ts | 1 + 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/packages/@aws-cdk/aws-iot-actions/README.md b/packages/@aws-cdk/aws-iot-actions/README.md index b3e502bc3ce3d..242999b0b0897 100644 --- a/packages/@aws-cdk/aws-iot-actions/README.md +++ b/packages/@aws-cdk/aws-iot-actions/README.md @@ -189,7 +189,7 @@ const topicRule = new iot.TopicRule(this, 'TopicRule', { sql: iot.IotSql.fromStringAsVer20160323("SELECT * FROM 'device/+/data'"), actions: [ new actions.KinesisPutRecordAction(stream, { - partitionKey: '${timestamp()}', // optional property + partitionKey: '${timestamp()}', // optional property, default is '${newuuid()}' }), ], }); diff --git a/packages/@aws-cdk/aws-iot-actions/lib/kinesis-put-record-action.ts b/packages/@aws-cdk/aws-iot-actions/lib/kinesis-put-record-action.ts index 85ef778f59e35..5bcccfdd4d28b 100644 --- a/packages/@aws-cdk/aws-iot-actions/lib/kinesis-put-record-action.ts +++ b/packages/@aws-cdk/aws-iot-actions/lib/kinesis-put-record-action.ts @@ -13,7 +13,7 @@ export interface KinesisPutRecordActionProps extends CommonActionProps { * The partition key is usually composed of an expression (for example, ${topic()} or ${timestamp()}). * * @see https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html#API_PutRecord_RequestParameters - * @default - None + * @default '${newuuid()}' */ readonly partitionKey?: string; } @@ -45,7 +45,7 @@ export class KinesisPutRecordAction implements iot.IAction { configuration: { kinesis: { streamName: this.stream.streamName, - partitionKey: this.partitionKey, + partitionKey: this.partitionKey || '${newuuid()}', roleArn: role.roleArn, }, }, diff --git a/packages/@aws-cdk/aws-iot-actions/test/kinesis-stream/kinesis-put-record-action.test.ts b/packages/@aws-cdk/aws-iot-actions/test/kinesis-stream/kinesis-put-record-action.test.ts index 6ad4cb3d1c3b2..765b13a63988e 100644 --- a/packages/@aws-cdk/aws-iot-actions/test/kinesis-stream/kinesis-put-record-action.test.ts +++ b/packages/@aws-cdk/aws-iot-actions/test/kinesis-stream/kinesis-put-record-action.test.ts @@ -23,6 +23,7 @@ test('Default kinesis stream action', () => { { Kinesis: { StreamName: 'my-stream', + PartitionKey: '${newuuid()}', RoleArn: { 'Fn::GetAtt': ['MyTopicRuleTopicRuleActionRoleCE2D05DA', 'Arn'], }, From b74dc6ce6524c91d21a180de34401d1de75dbba2 Mon Sep 17 00:00:00 2001 From: yamatatsu Date: Fri, 14 Jan 2022 17:46:53 +0900 Subject: [PATCH 09/10] fix readme --- packages/@aws-cdk/aws-iot-actions/README.md | 2 -- 1 file changed, 2 deletions(-) diff --git a/packages/@aws-cdk/aws-iot-actions/README.md b/packages/@aws-cdk/aws-iot-actions/README.md index 242999b0b0897..b6578adf23496 100644 --- a/packages/@aws-cdk/aws-iot-actions/README.md +++ b/packages/@aws-cdk/aws-iot-actions/README.md @@ -179,8 +179,6 @@ The code snippet below creates an AWS IoT Rule that put records to Kinesis Data stream when it is triggered. ```ts -import * as iot from '@aws-cdk/aws-iot'; -import * as actions from '@aws-cdk/aws-iot-actions'; import * as kinesis from '@aws-cdk/aws-kinesis'; const stream = new kinesis.Stream(this, 'MyStream'); From 9fe5bb15bb1127f2b5f3dbb698e2b6972b0ab4ab Mon Sep 17 00:00:00 2001 From: yamatatsu Date: Mon, 17 Jan 2022 20:29:48 +0900 Subject: [PATCH 10/10] address comments --- packages/@aws-cdk/aws-iot-actions/README.md | 2 +- .../lib/kinesis-put-record-action.ts | 12 ++++++++---- .../kinesis-put-record-action.test.ts | 15 ++++++++++----- 3 files changed, 19 insertions(+), 10 deletions(-) diff --git a/packages/@aws-cdk/aws-iot-actions/README.md b/packages/@aws-cdk/aws-iot-actions/README.md index b6578adf23496..be475fe028210 100644 --- a/packages/@aws-cdk/aws-iot-actions/README.md +++ b/packages/@aws-cdk/aws-iot-actions/README.md @@ -187,7 +187,7 @@ const topicRule = new iot.TopicRule(this, 'TopicRule', { sql: iot.IotSql.fromStringAsVer20160323("SELECT * FROM 'device/+/data'"), actions: [ new actions.KinesisPutRecordAction(stream, { - partitionKey: '${timestamp()}', // optional property, default is '${newuuid()}' + partitionKey: '${newuuid()}', }), ], }); diff --git a/packages/@aws-cdk/aws-iot-actions/lib/kinesis-put-record-action.ts b/packages/@aws-cdk/aws-iot-actions/lib/kinesis-put-record-action.ts index 5bcccfdd4d28b..6baa5976bccf4 100644 --- a/packages/@aws-cdk/aws-iot-actions/lib/kinesis-put-record-action.ts +++ b/packages/@aws-cdk/aws-iot-actions/lib/kinesis-put-record-action.ts @@ -12,10 +12,14 @@ export interface KinesisPutRecordActionProps extends CommonActionProps { * The partition key used to determine to which shard the data is written. * The partition key is usually composed of an expression (for example, ${topic()} or ${timestamp()}). * + * @see https://docs.aws.amazon.com/iot/latest/developerguide/iot-substitution-templates.html + * + * You can use the expression '${newuuid()}' if your payload does not have a high cardinarity property. + * If you use empty string, this action use no partition key and all records will put same one shard. + * * @see https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html#API_PutRecord_RequestParameters - * @default '${newuuid()}' */ - readonly partitionKey?: string; + readonly partitionKey: string; } /** @@ -29,7 +33,7 @@ export class KinesisPutRecordAction implements iot.IAction { * @param stream The Kinesis Data stream to which to put records. * @param props Optional properties to not use default */ - constructor(private readonly stream: kinesis.IStream, props: KinesisPutRecordActionProps = {}) { + constructor(private readonly stream: kinesis.IStream, props: KinesisPutRecordActionProps) { this.partitionKey = props.partitionKey; this.role = props.role; } @@ -45,7 +49,7 @@ export class KinesisPutRecordAction implements iot.IAction { configuration: { kinesis: { streamName: this.stream.streamName, - partitionKey: this.partitionKey || '${newuuid()}', + partitionKey: this.partitionKey || undefined, roleArn: role.roleArn, }, }, diff --git a/packages/@aws-cdk/aws-iot-actions/test/kinesis-stream/kinesis-put-record-action.test.ts b/packages/@aws-cdk/aws-iot-actions/test/kinesis-stream/kinesis-put-record-action.test.ts index 765b13a63988e..05a0d603b4046 100644 --- a/packages/@aws-cdk/aws-iot-actions/test/kinesis-stream/kinesis-put-record-action.test.ts +++ b/packages/@aws-cdk/aws-iot-actions/test/kinesis-stream/kinesis-put-record-action.test.ts @@ -14,7 +14,9 @@ test('Default kinesis stream action', () => { const stream = kinesis.Stream.fromStreamArn(stack, 'MyStream', 'arn:aws:kinesis:xx-west-1:111122223333:stream/my-stream'); // WHEN - topicRule.addAction(new actions.KinesisPutRecordAction(stream)); + topicRule.addAction(new actions.KinesisPutRecordAction(stream, { + partitionKey: '${newuuid()}', + })); // THEN Template.fromStack(stack).hasResourceProperties('AWS::IoT::TopicRule', { @@ -66,7 +68,7 @@ test('Default kinesis stream action', () => { }); }); -test('can set partitionKey', () => { +test('passes undefined to partitionKey if empty string is given', () => { // GIVEN const stack = new cdk.Stack(); const topicRule = new iot.TopicRule(stack, 'MyTopicRule', { @@ -76,14 +78,14 @@ test('can set partitionKey', () => { // WHEN topicRule.addAction(new actions.KinesisPutRecordAction(stream, { - partitionKey: '${timestamp()}', + partitionKey: '', })); // THEN Template.fromStack(stack).hasResourceProperties('AWS::IoT::TopicRule', { TopicRulePayload: { Actions: [ - Match.objectLike({ Kinesis: { PartitionKey: '${timestamp()}' } }), + Match.objectLike({ Kinesis: { PartitionKey: Match.absent() } }), ], }, }); @@ -99,7 +101,10 @@ test('can set role', () => { const role = iam.Role.fromRoleArn(stack, 'MyRole', 'arn:aws:iam::123456789012:role/ForTest'); // WHEN - topicRule.addAction(new actions.KinesisPutRecordAction(stream, { role })); + topicRule.addAction(new actions.KinesisPutRecordAction(stream, { + partitionKey: '${newuuid()}', + role, + })); // THEN Template.fromStack(stack).hasResourceProperties('AWS::IoT::TopicRule', {