Skip to content

Commit

Permalink
feat(parser): add schema for DynamoDB - Kinesis Stream event (#3328)
Browse files Browse the repository at this point in the history
Co-authored-by: Andrea Amorosi <[email protected]>
  • Loading branch information
am29d and dreamorosi authored Nov 22, 2024
1 parent 6156587 commit a8dfa74
Show file tree
Hide file tree
Showing 16 changed files with 261 additions and 88 deletions.
1 change: 1 addition & 0 deletions docs/utilities/parser.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ Parser comes with the following built-in schemas:
| **KafkaSelfManagedEventSchema** | Lambda Event Source payload for self managed Kafka payload |
| **KinesisDataStreamSchema** | Lambda Event Source payload for Amazon Kinesis Data Streams |
| **KinesisFirehoseSchema** | Lambda Event Source payload for Amazon Kinesis Firehose |
| **KinesisDynamoDBStreamSchema** | Lambda Event Source payload for a DynamoDB Stream record wrapped in a Kinesis Data Stream record |
| **KinesisFirehoseSqsSchema** | Lambda Event Source payload for SQS messages wrapped in Kinesis Firehose records |
| **LambdaFunctionUrlSchema** | Lambda Event Source payload for Lambda Function URL payload |
| **S3EventNotificationEventBridgeSchema** | Lambda Event Source payload for Amazon S3 Event Notification to EventBridge. |
Expand Down
14 changes: 14 additions & 0 deletions packages/parser/src/schemas/dynamodb.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,19 @@ const DynamoDBStreamRecord = z.object({
userIdentity: UserIdentity.optional(),
});

/**
 * Zod schema for a DynamoDB Stream record as delivered through a
 * Kinesis Data Stream destination.
 *
 * Relative to a plain `DynamoDBStreamRecord` it drops `eventVersion`
 * and `eventSourceARN`, adds `recordFormat` and `tableName`, and omits
 * `SequenceNumber`/`StreamViewType` from the change record.
 */
const DynamoDBStreamToKinesisRecord = DynamoDBStreamRecord.omit({
  eventVersion: true,
  eventSourceARN: true,
}).extend({
  recordFormat: z.literal('application/json'),
  tableName: z.string(),
  // NOTE(review): `.nullish()` accepts both null and undefined — the JSON
  // fixture serializes a missing identity as null.
  userIdentity: UserIdentity.nullish(),
  dynamodb: DynamoDBStreamChangeRecord.omit({
    SequenceNumber: true,
    StreamViewType: true,
  }),
});

/**
* Zod schema for Amazon DynamoDB Stream event.
*
Expand Down Expand Up @@ -111,6 +124,7 @@ const DynamoDBStreamSchema = z.object({
});

export {
DynamoDBStreamToKinesisRecord,
DynamoDBStreamSchema,
DynamoDBStreamRecord,
DynamoDBStreamChangeRecord,
Expand Down
11 changes: 9 additions & 2 deletions packages/parser/src/schemas/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,21 @@ export {
CloudWatchLogsDecodeSchema,
CloudWatchLogsSchema,
} from './cloudwatch.js';
export { DynamoDBStreamSchema } from './dynamodb.js';
export {
DynamoDBStreamSchema,
DynamoDBStreamToKinesisRecord,
} from './dynamodb.js';
export { EventBridgeSchema } from './eventbridge.js';
export {
KafkaMskEventSchema,
KafkaSelfManagedEventSchema,
KafkaRecordSchema,
} from './kafka.js';
export { KinesisDataStreamSchema, KinesisDataStreamRecord } from './kinesis.js';
export {
KinesisDataStreamSchema,
KinesisDynamoDBStreamSchema,
KinesisDataStreamRecord,
} from './kinesis.js';
export {
KinesisFirehoseSchema,
KinesisFirehoseSqsSchema,
Expand Down
23 changes: 20 additions & 3 deletions packages/parser/src/schemas/kinesis.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
import { gunzipSync } from 'node:zlib';
import { z } from 'zod';
import { DynamoDBStreamToKinesisRecord } from './dynamodb.js';

const KinesisDataStreamRecordPayload = z.object({
kinesisSchemaVersion: z.string(),
partitionKey: z.string(),
sequenceNumber: z.string(),
approximateArrivalTimestamp: z.number(),
data: z.string().transform((data) => {
const decompresed = decompress(data);
const decompressed = decompress(data);
const decoded = Buffer.from(data, 'base64').toString('utf-8');
try {
// If data was not compressed, try to parse it as JSON otherwise it must be string
return decompresed === data ? JSON.parse(decoded) : decompresed;
return decompressed === data ? JSON.parse(decoded) : decompressed;
} catch (e) {
return decoded;
}
Expand All @@ -37,6 +38,21 @@ const KinesisDataStreamRecord = z.object({
kinesis: KinesisDataStreamRecordPayload,
});

// Pipeline for the `kinesis.data` field of a DynamoDB-over-Kinesis record:
// base64-decode, JSON-parse, then validate the payload shape.
const dynamoDBStreamRecordData = z
  .string()
  .transform((data) =>
    JSON.parse(Buffer.from(data, 'base64').toString('utf8'))
  )
  .pipe(DynamoDBStreamToKinesisRecord);

/**
 * Zod schema for a Kinesis Data Stream event whose records carry
 * DynamoDB Stream change records in their base64-encoded `data` field.
 */
const KinesisDynamoDBStreamSchema = z.object({
  Records: z.array(
    KinesisDataStreamRecord.extend({
      kinesis: KinesisDataStreamRecordPayload.extend({
        data: dynamoDBStreamRecordData,
      }),
    })
  ),
});

/**
* Zod schema for Kinesis Data Stream event
*
Expand Down Expand Up @@ -88,7 +104,8 @@ const KinesisDataStreamSchema = z.object({
});

// Public API of this module: record/payload building blocks plus the two
// event-level schemas (plain Kinesis and DynamoDB-over-Kinesis).
export {
  KinesisDataStreamRecord,
  KinesisDataStreamRecordPayload,
  KinesisDataStreamSchema,
  KinesisDynamoDBStreamSchema,
};
2 changes: 2 additions & 0 deletions packages/parser/src/types/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ export type {
SnsEvent,
SqsEvent,
DynamoDBStreamEvent,
DynamoDBStreamToKinesisRecordEvent,
CloudWatchLogsEvent,
CloudFormationCustomResourceCreateEvent,
CloudFormationCustomResourceDeleteEvent,
Expand All @@ -27,6 +28,7 @@ export type {
KafkaSelfManagedEvent,
KafkaMskEvent,
KinesisDataStreamEvent,
KinesisDynamoDBStreamEvent,
KinesisFireHoseEvent,
KinesisFireHoseSqsEvent,
LambdaFunctionUrlEvent,
Expand Down
10 changes: 10 additions & 0 deletions packages/parser/src/types/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@ import type {
CloudFormationCustomResourceUpdateSchema,
CloudWatchLogsSchema,
DynamoDBStreamSchema,
DynamoDBStreamToKinesisRecord,
EventBridgeSchema,
KafkaMskEventSchema,
KafkaRecordSchema,
KafkaSelfManagedEventSchema,
KinesisDataStreamSchema,
KinesisDynamoDBStreamSchema,
KinesisFirehoseRecordSchema,
KinesisFirehoseSchema,
KinesisFirehoseSqsRecordSchema,
Expand Down Expand Up @@ -69,6 +71,10 @@ type CloudWatchLogsEvent = z.infer<typeof CloudWatchLogsSchema>;

type DynamoDBStreamEvent = z.infer<typeof DynamoDBStreamSchema>;

/**
 * Type for a DynamoDB Stream record delivered via a Kinesis Data Stream,
 * inferred from {@link DynamoDBStreamToKinesisRecord}.
 */
type DynamoDBStreamToKinesisRecordEvent = z.infer<
  typeof DynamoDBStreamToKinesisRecord
>;

type EventBridgeEvent = z.infer<typeof EventBridgeSchema>;

type KafkaSelfManagedEvent = z.infer<typeof KafkaSelfManagedEventSchema>;
Expand All @@ -79,6 +85,8 @@ type KafkaMskEvent = z.infer<typeof KafkaMskEventSchema>;

type KinesisDataStreamEvent = z.infer<typeof KinesisDataStreamSchema>;

/**
 * Type for a Kinesis Data Stream event carrying DynamoDB Stream records,
 * inferred from {@link KinesisDynamoDBStreamSchema}.
 */
type KinesisDynamoDBStreamEvent = z.infer<typeof KinesisDynamoDBStreamSchema>;

type KinesisFireHoseEvent = z.infer<typeof KinesisFirehoseSchema>;

type KinesisFirehoseRecord = z.infer<typeof KinesisFirehoseRecordSchema>;
Expand Down Expand Up @@ -131,11 +139,13 @@ export type {
CloudFormationCustomResourceUpdateEvent,
CloudWatchLogsEvent,
DynamoDBStreamEvent,
DynamoDBStreamToKinesisRecordEvent,
EventBridgeEvent,
KafkaSelfManagedEvent,
KafkaMskEvent,
KafkaRecord,
KinesisDataStreamEvent,
KinesisDynamoDBStreamEvent,
KinesisFireHoseEvent,
KinesisFirehoseRecord,
KinesisFireHoseSqsEvent,
Expand Down
36 changes: 36 additions & 0 deletions packages/parser/tests/events/kinesis/dynamodb-stream.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
{
"Records": [
{
"kinesis": {
"kinesisSchemaVersion": "1.0",
"partitionKey": "859F7C064A4818874FA67ABEC9BF2AF1",
"sequenceNumber": "49657828409187701520019995242508390119953358325192589314",
"data": "eyJhd3NSZWdpb24iOiJldS13ZXN0LTEiLCJldmVudElEIjoiZDk0MjgwMjktMGY2My00MDU2LTg2ZGEtY2UxMGQ1NDViMWI5IiwiZXZlbnROYW1lIjoiSU5TRVJUIiwidXNlcklkZW50aXR5IjpudWxsLCJyZWNvcmRGb3JtYXQiOiJhcHBsaWNhdGlvbi9qc29uIiwidGFibGVOYW1lIjoiUG93ZXJ0b29sc0V2ZW50c1N0YWNrLUR5bmFtb0RCVGFibGU1OTc4NEZDMC04TktBTVRFUlRBWFkiLCJkeW5hbW9kYiI6eyJBcHByb3hpbWF0ZUNyZWF0aW9uRGF0ZVRpbWUiOjE3MzE5MjQ1NTUzNzAsIktleXMiOnsiaWQiOnsiUyI6InJlY29yZC0xcWl0Mnk4MTlnaSJ9fSwiTmV3SW1hZ2UiOnsiaWQiOnsiUyI6InJlY29yZC0xcWl0Mnk4MTlnaSJ9LCJkYXRhIjp7IlMiOiJkYXRhLXg2YXE3Y2tkcGdrIn19LCJTaXplQnl0ZXMiOjYwfSwiZXZlbnRTb3VyY2UiOiJhd3M6ZHluYW1vZGIifQ==",
"approximateArrivalTimestamp": 1731924555.932
},
"eventSource": "aws:kinesis",
"eventVersion": "1.0",
"eventID": "shardId-000000000000:49657828409187701520019995242508390119953358325192589314",
"eventName": "aws:kinesis:record",
"invokeIdentityArn": "arn:aws:iam::1234567789012:role/PowertoolsEventsStack-KinesisConsumerFunctionServic-JG17OEKZaDq6",
"awsRegion": "eu-west-1",
"eventSourceARN": "arn:aws:kinesis:eu-west-1:1234567789012:stream/PowertoolsEventsStack-KinesisStream46752A3E-u0C9B3ZKjgG0"
},
{
"kinesis": {
"kinesisSchemaVersion": "1.0",
"partitionKey": "6037E47B707479B67E577C989D96E9F8",
"sequenceNumber": "49657828409187701520019995242509599045772972954367295490",
"data": "eyJhd3NSZWdpb24iOiJldS13ZXN0LTEiLCJldmVudElEIjoiYWE1NmNhZDQtMzExYS00NmM4LWFiNWYtYzdhMTNhN2E2Mjk4IiwiZXZlbnROYW1lIjoiSU5TRVJUIiwidXNlcklkZW50aXR5IjpudWxsLCJyZWNvcmRGb3JtYXQiOiJhcHBsaWNhdGlvbi9qc29uIiwidGFibGVOYW1lIjoiUG93ZXJ0b29sc0V2ZW50c1N0YWNrLUR5bmFtb0RCVGFibGU1OTc4NEZDMC04TktBTVRFUlRBWFkiLCJkeW5hbW9kYiI6eyJBcHByb3hpbWF0ZUNyZWF0aW9uRGF0ZVRpbWUiOjE3MzE5MjQ1NTUzNzAsIktleXMiOnsiaWQiOnsiUyI6InJlY29yZC1mdnhuM3E0cTVqdyJ9fSwiTmV3SW1hZ2UiOnsiaWQiOnsiUyI6InJlY29yZC1mdnhuM3E0cTVqdyJ9LCJkYXRhIjp7IlMiOiJkYXRhLTRlb21wanM4OW41In19LCJTaXplQnl0ZXMiOjYwfSwiZXZlbnRTb3VyY2UiOiJhd3M6ZHluYW1vZGIifQ==",
"approximateArrivalTimestamp": 1731924555.935
},
"eventSource": "aws:kinesis",
"eventVersion": "1.0",
"eventID": "shardId-000000000000:49657828409187701520019995242509599045772972954367295490",
"eventName": "aws:kinesis:record",
"invokeIdentityArn": "arn:aws:iam::1234567789012:role/PowertoolsEventsStack-KinesisConsumerFunctionServic-JG17OEKZaDq6",
"awsRegion": "eu-west-1",
"eventSourceARN": "arn:aws:kinesis:eu-west-1:1234567789012:stream/PowertoolsEventsStack-KinesisStream46752A3E-u0C9B3ZKjgG0"
}
]
}
61 changes: 31 additions & 30 deletions packages/parser/tests/unit/envelopes/kinesis-firehose.test.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,35 @@
import { generateMock } from '@anatine/zod-mock';
import { describe, expect, it } from 'vitest';
import { ZodError, type z } from 'zod';
import { ZodError } from 'zod';
import { ParseError } from '../../../src';
import { KinesisFirehoseEnvelope } from '../../../src/envelopes/index.js';
import type { KinesisFirehoseSchema } from '../../../src/schemas/';
import { TestEvents, TestSchema } from '../schema/utils.js';
import type {
KinesisFireHoseEvent,
KinesisFireHoseSqsEvent,
} from '../../../src/types';
import { TestSchema, getTestEvent } from '../schema/utils.js';

describe('Kinesis Firehose Envelope', () => {
const eventsPath = 'kinesis';
const kinesisFirehosePutEvent = getTestEvent<KinesisFireHoseEvent>({
eventsPath,
filename: 'firehose-put',
});

const kinesisFirehoseSQSEvent = getTestEvent<KinesisFireHoseSqsEvent>({
eventsPath,
filename: 'firehose-sqs',
});

const kinesisFirehoseEvent = getTestEvent<KinesisFireHoseEvent>({
eventsPath,
filename: 'firehose',
});

describe('parse', () => {
it('should parse records for PutEvent', () => {
const mock = generateMock(TestSchema);
const testEvent = TestEvents.kinesisFirehosePutEvent as z.infer<
typeof KinesisFirehoseSchema
>;
const testEvent = structuredClone(kinesisFirehosePutEvent);

testEvent.records.map((record) => {
record.data = Buffer.from(JSON.stringify(mock)).toString('base64');
Expand All @@ -24,9 +41,7 @@ describe('Kinesis Firehose Envelope', () => {

it('should parse a single record for SQS event', () => {
const mock = generateMock(TestSchema);
const testEvent = TestEvents.kinesisFirehoseSQSEvent as z.infer<
typeof KinesisFirehoseSchema
>;
const testEvent = structuredClone(kinesisFirehoseSQSEvent);

testEvent.records.map((record) => {
record.data = Buffer.from(JSON.stringify(mock)).toString('base64');
Expand All @@ -38,9 +53,7 @@ describe('Kinesis Firehose Envelope', () => {

it('should parse records for kinesis event', () => {
const mock = generateMock(TestSchema);
const testEvent = TestEvents.kinesisFirehoseKinesisEvent as z.infer<
typeof KinesisFirehoseSchema
>;
const testEvent = structuredClone(kinesisFirehoseEvent);

testEvent.records.map((record) => {
record.data = Buffer.from(JSON.stringify(mock)).toString('base64');
Expand All @@ -50,9 +63,7 @@ describe('Kinesis Firehose Envelope', () => {
expect(resp).toEqual([mock, mock]);
});
it('should throw if record is not base64 encoded', () => {
const testEvent = TestEvents.kinesisFirehosePutEvent as z.infer<
typeof KinesisFirehoseSchema
>;
const testEvent = structuredClone(kinesisFirehosePutEvent);

testEvent.records.map((record) => {
record.data = 'not base64 encoded';
Expand All @@ -68,9 +79,7 @@ describe('Kinesis Firehose Envelope', () => {
}).toThrow();
});
it('should throw when schema does not match record', () => {
const testEvent = TestEvents.kinesisFirehosePutEvent as z.infer<
typeof KinesisFirehoseSchema
>;
const testEvent = structuredClone(kinesisFirehosePutEvent);

testEvent.records.map((record) => {
record.data = Buffer.from('not a valid json').toString('base64');
Expand All @@ -84,9 +93,7 @@ describe('Kinesis Firehose Envelope', () => {
describe('safeParse', () => {
it('should parse records for PutEvent', () => {
const mock = generateMock(TestSchema);
const testEvent = TestEvents.kinesisFirehosePutEvent as z.infer<
typeof KinesisFirehoseSchema
>;
const testEvent = structuredClone(kinesisFirehosePutEvent);

testEvent.records.map((record) => {
record.data = Buffer.from(JSON.stringify(mock)).toString('base64');
Expand All @@ -98,9 +105,7 @@ describe('Kinesis Firehose Envelope', () => {

it('should parse a single record for SQS event', () => {
const mock = generateMock(TestSchema);
const testEvent = TestEvents.kinesisFirehoseSQSEvent as z.infer<
typeof KinesisFirehoseSchema
>;
const testEvent = structuredClone(kinesisFirehoseSQSEvent);

testEvent.records.map((record) => {
record.data = Buffer.from(JSON.stringify(mock)).toString('base64');
Expand All @@ -112,9 +117,7 @@ describe('Kinesis Firehose Envelope', () => {

it('should parse records for kinesis event', () => {
const mock = generateMock(TestSchema);
const testEvent = TestEvents.kinesisFirehoseKinesisEvent as z.infer<
typeof KinesisFirehoseSchema
>;
const testEvent = structuredClone(kinesisFirehoseEvent);

testEvent.records.map((record) => {
record.data = Buffer.from(JSON.stringify(mock)).toString('base64');
Expand All @@ -139,9 +142,7 @@ describe('Kinesis Firehose Envelope', () => {
}
});
it('should return original event if record is not base64 encoded', () => {
const testEvent = TestEvents.kinesisFirehosePutEvent as z.infer<
typeof KinesisFirehoseSchema
>;
const testEvent = structuredClone(kinesisFirehosePutEvent);

testEvent.records.map((record) => {
record.data = 'not base64 encoded';
Expand Down
20 changes: 14 additions & 6 deletions packages/parser/tests/unit/envelopes/kinesis.test.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,22 @@
import { generateMock } from '@anatine/zod-mock';
import type { KinesisStreamEvent } from 'aws-lambda';
import { describe, expect, it } from 'vitest';
import { KinesisEnvelope } from '../../../src/envelopes/index.js';
import { ParseError } from '../../../src/errors.js';
import { TestEvents, TestSchema } from '../schema/utils.js';
import type { KinesisDataStreamEvent } from '../../../src/types/schema.js';
import { TestSchema, getTestEvent } from '../schema/utils.js';

describe('KinesisEnvelope', () => {
const eventsPath = 'kinesis';

const kinesisStreamEvent = getTestEvent<KinesisDataStreamEvent>({
eventsPath,
filename: 'stream',
});

describe('parse', () => {
it('should parse Kinesis Stream event', () => {
const mock = generateMock(TestSchema);
const testEvent = TestEvents.kinesisStreamEvent as KinesisStreamEvent;
const testEvent = structuredClone(kinesisStreamEvent);

testEvent.Records.map((record) => {
record.kinesis.data = Buffer.from(JSON.stringify(mock)).toString(
Expand All @@ -24,16 +31,17 @@ describe('KinesisEnvelope', () => {
expect(() => KinesisEnvelope.parse({ foo: 'bar' }, TestSchema)).toThrow();
});
it('should throw if record is invalid', () => {
const testEvent = TestEvents.kinesisStreamEvent as KinesisStreamEvent;
const testEvent = structuredClone(kinesisStreamEvent);
testEvent.Records[0].kinesis.data = 'invalid';

expect(() => KinesisEnvelope.parse(testEvent, TestSchema)).toThrow();
});
});

describe('safeParse', () => {
it('should parse Kinesis Stream event', () => {
const mock = generateMock(TestSchema);
const testEvent = TestEvents.kinesisStreamEvent as KinesisStreamEvent;
const testEvent = structuredClone(kinesisStreamEvent);

testEvent.Records.map((record) => {
record.kinesis.data = Buffer.from(JSON.stringify(mock)).toString(
Expand All @@ -54,7 +62,7 @@ describe('KinesisEnvelope', () => {
});
});
it('should return original event if record is invalid', () => {
const testEvent = TestEvents.kinesisStreamEvent as KinesisStreamEvent;
const testEvent = structuredClone(kinesisStreamEvent);
testEvent.Records[0].kinesis.data = 'invalid';
const parseResult = KinesisEnvelope.safeParse(testEvent, TestSchema);
expect(parseResult).toEqual({
Expand Down
Loading

0 comments on commit a8dfa74

Please sign in to comment.