From d18e03d06e4fa4970aa24c4c041793d58a7cde79 Mon Sep 17 00:00:00 2001 From: Andrea Amorosi Date: Mon, 27 Jan 2025 09:33:09 +0100 Subject: [PATCH] fix(parser): allow Kinesis envelopes to handle non-JSON strings (#3531) --- .../parser/src/envelopes/kinesis-firehose.ts | 93 ++++-- packages/parser/src/envelopes/kinesis.ts | 97 ++++-- packages/parser/src/schemas/dynamodb.ts | 142 ++++++--- .../parser/src/schemas/kinesis-firehose.ts | 4 +- packages/parser/src/schemas/kinesis.ts | 12 +- .../events/kinesis/stream-one-record.json | 20 -- .../unit/envelopes/kinesis-firehose.test.ts | 296 +++++++++++------- .../tests/unit/envelopes/kinesis.test.ts | 219 ++++++++++--- .../parser/tests/unit/schema/kinesis.test.ts | 61 ++-- packages/parser/typedoc.json | 3 +- 10 files changed, 630 insertions(+), 317 deletions(-) delete mode 100644 packages/parser/tests/events/kinesis/stream-one-record.json diff --git a/packages/parser/src/envelopes/kinesis-firehose.ts b/packages/parser/src/envelopes/kinesis-firehose.ts index 6f03bcd331..859f5b15f3 100644 --- a/packages/parser/src/envelopes/kinesis-firehose.ts +++ b/packages/parser/src/envelopes/kinesis-firehose.ts @@ -1,8 +1,11 @@ -import type { ZodSchema, z } from 'zod'; +import { ZodError, type ZodIssue, type ZodSchema, type z } from 'zod'; import { ParseError } from '../errors.js'; -import { KinesisFirehoseSchema } from '../schemas/index.js'; +import { + type KinesisFirehoseRecordSchema, + KinesisFirehoseSchema, +} from '../schemas/index.js'; import type { ParsedResult } from '../types/index.js'; -import { Envelope, envelopeDiscriminator } from './envelope.js'; +import { envelopeDiscriminator } from './envelope.js'; /** * Kinesis Firehose Envelope to extract array of Records @@ -23,10 +26,33 @@ export const KinesisFirehoseEnvelope = { */ [envelopeDiscriminator]: 'array' as const, parse(data: unknown, schema: T): z.infer[] { - const parsedEnvelope = KinesisFirehoseSchema.parse(data); + let parsedEnvelope: z.infer; + try { + parsedEnvelope = KinesisFirehoseSchema.parse(data); + } catch (error) { + throw new ParseError('Failed to parse Kinesis Firehose envelope', { + cause: error as Error, + }); + } - return parsedEnvelope.records.map((record) => { - return Envelope.parse(record.data, schema); + return parsedEnvelope.records.map((record, recordIndex) => { + let parsedRecord: z.infer; + try { + parsedRecord = schema.parse(record.data); + } catch (error) { + throw new ParseError( + `Failed to parse Kinesis Firehose record at index ${recordIndex}`, + { + cause: new ZodError( + (error as ZodError).issues.map((issue) => ({ + ...issue, + path: ['records', recordIndex, 'data', ...issue.path], + })) + ), + } + ); + } + return parsedRecord; }); }, @@ -35,7 +61,6 @@ export const KinesisFirehoseEnvelope = { schema: T ): ParsedResult[]> { const parsedEnvelope = KinesisFirehoseSchema.safeParse(data); - if (!parsedEnvelope.success) { return { success: false, @@ -45,25 +70,49 @@ export const KinesisFirehoseEnvelope = { originalEvent: data, }; } - const parsedRecords: z.infer[] = []; - for (const record of parsedEnvelope.data.records) { - const parsedData = Envelope.safeParse(record.data, schema); - if (!parsedData.success) { - return { - success: false, - error: new ParseError('Failed to parse Kinesis Firehose record', { - cause: parsedData.error, - }), - originalEvent: data, - }; - } - parsedRecords.push(parsedData.data); + const result = parsedEnvelope.data.records.reduce<{ + success: boolean; + records: z.infer[]; + errors: { + [key: number | string]: { issues: ZodIssue[] }; + }; + }>( + (acc, record, index) => { + const parsedRecord = schema.safeParse(record.data); + + if (!parsedRecord.success) { + const issues = parsedRecord.error.issues.map((issue) => ({ + ...issue, + path: ['records', index, 'data', ...issue.path], + })); + acc.success = false; + acc.errors[index] = { issues }; + return acc; + } + + acc.records.push(parsedRecord.data); + return acc; + }, + { success: true, records: [], errors: {} } + ); + + if (result.success) { + return { success: true, data: result.records }; } + const errorMessage = + Object.keys(result.errors).length > 1 + ? `Failed to parse Kinesis Firehose records at indexes ${Object.keys(result.errors).join(', ')}` + : `Failed to parse Kinesis Firehose record at index ${Object.keys(result.errors)[0]}`; return { - success: true, - data: parsedRecords, + success: false, + error: new ParseError(errorMessage, { + cause: new ZodError( + Object.values(result.errors).flatMap((error) => error.issues) + ), + }), + originalEvent: data, }; }, }; diff --git a/packages/parser/src/envelopes/kinesis.ts b/packages/parser/src/envelopes/kinesis.ts index 248b753acf..9259a56f0b 100644 --- a/packages/parser/src/envelopes/kinesis.ts +++ b/packages/parser/src/envelopes/kinesis.ts @@ -1,8 +1,11 @@ -import type { ZodSchema, z } from 'zod'; +import { ZodError, type ZodIssue, type ZodSchema, type z } from 'zod'; import { ParseError } from '../errors.js'; -import { KinesisDataStreamSchema } from '../schemas/kinesis.js'; +import { + type KinesisDataStreamRecord, + KinesisDataStreamSchema, +} from '../schemas/kinesis.js'; import type { ParsedResult } from '../types/index.js'; -import { Envelope, envelopeDiscriminator } from './envelope.js'; +import { envelopeDiscriminator } from './envelope.js'; /** * Kinesis Data Stream Envelope to extract array of Records @@ -21,10 +24,39 @@ export const KinesisEnvelope = { */ [envelopeDiscriminator]: 'array' as const, parse(data: unknown, schema: T): z.infer[] { - const parsedEnvelope = KinesisDataStreamSchema.parse(data); + let parsedEnvelope: z.infer; + try { + parsedEnvelope = KinesisDataStreamSchema.parse(data); + } catch (error) { + throw new ParseError('Failed to parse Kinesis Data Stream envelope', { + cause: error as Error, + }); + } - return parsedEnvelope.Records.map((record) => { - return Envelope.parse(record.kinesis.data, schema); + return parsedEnvelope.Records.map((record, recordIndex) => { + let parsedRecord: z.infer; + try { + parsedRecord = schema.parse(record.kinesis.data); + } catch (error) { + throw new ParseError( + `Failed to parse Kinesis Data Stream record at index ${recordIndex}`, + { + cause: new ZodError( + (error as ZodError).issues.map((issue) => ({ + ...issue, + path: [ + 'Records', + recordIndex, + 'kinesis', + 'data', + ...issue.path, + ], + })) + ), + } + ); + } + return parsedRecord; }); }, @@ -43,25 +75,48 @@ export const KinesisEnvelope = { }; } - const parsedRecords: z.infer[] = []; + const result = parsedEnvelope.data.Records.reduce<{ + success: boolean; + records: z.infer[]; + errors: { index?: number; issues?: ZodIssue[] }; + }>( + (acc, record, index) => { + const parsedRecord = schema.safeParse(record.kinesis.data); - for (const record of parsedEnvelope.data.Records) { - const parsedRecord = Envelope.safeParse(record.kinesis.data, schema); - if (!parsedRecord.success) { - return { - success: false, - error: new ParseError('Failed to parse Kinesis Data Stream record', { - cause: parsedRecord.error, - }), - originalEvent: data, - }; - } - parsedRecords.push(parsedRecord.data); + if (!parsedRecord.success) { + const issues = parsedRecord.error.issues.map((issue) => ({ + ...issue, + path: ['Records', index, 'kinesis', 'data', ...issue.path], + })); + acc.success = false; + // @ts-expect-error - index is assigned + acc.errors[index] = { issues }; + return acc; + } + + acc.records.push(parsedRecord.data); + return acc; + }, + { success: true, records: [], errors: {} } + ); + + if (result.success) { + return { success: true, data: result.records }; } + const errorMessage = + Object.keys(result.errors).length > 1 + ? `Failed to parse Kinesis Data Stream records at indexes ${Object.keys(result.errors).join(', ')}` + : `Failed to parse Kinesis Data Stream record at index ${Object.keys(result.errors)[0]}`; return { - success: true, - data: parsedRecords, + success: false, + error: new ParseError(errorMessage, { + cause: new ZodError( + // @ts-expect-error - issues are assigned because success is false + Object.values(result.errors).flatMap((error) => error.issues) + ), + }), + originalEvent: data, }; }, }; diff --git a/packages/parser/src/schemas/dynamodb.ts b/packages/parser/src/schemas/dynamodb.ts index 6f4f51ca80..ee361b783b 100644 --- a/packages/parser/src/schemas/dynamodb.ts +++ b/packages/parser/src/schemas/dynamodb.ts @@ -1,5 +1,7 @@ import { unmarshallDynamoDB } from '@aws-lambda-powertools/commons/utils/unmarshallDynamoDB'; import { z } from 'zod'; +import type { KinesisEnvelope } from '../envelopes/kinesis.js'; +import type { DynamoDBMarshalled } from '../helpers/dynamodb.js'; const DynamoDBStreamChangeRecordBase = z.object({ ApproximateCreationDateTime: z.number().optional(), @@ -16,53 +18,61 @@ const DynamoDBStreamChangeRecordBase = z.object({ ]), }); -const DynamoDBStreamChangeRecord = DynamoDBStreamChangeRecordBase.transform( - (object, ctx) => { - const result = { ...object }; - - const unmarshallAttributeValue = ( - imageName: 'NewImage' | 'OldImage' | 'Keys', - image: Record - ) => { - try { - // @ts-expect-error - return unmarshallDynamoDB(image) as Record; - } catch (err) { - ctx.addIssue({ - code: 'custom', - message: `Could not unmarshall ${imageName} in DynamoDB stream record`, - fatal: true, - path: [imageName], - }); - return z.NEVER; - } - }; +const DynamoDBStreamToKinesisChangeRecord = DynamoDBStreamChangeRecordBase.omit( + { + SequenceNumber: true, + StreamViewType: true, + } +); - const unmarshalledKeys = unmarshallAttributeValue('Keys', object.Keys); - if (unmarshalledKeys === z.NEVER) return z.NEVER; - // @ts-expect-error - We are intentionally mutating the object - result.Keys = unmarshalledKeys; +const unmarshallDynamoDBTransform = ( + object: + | z.infer + | z.infer, + ctx: z.RefinementCtx +) => { + const result = { ...object }; - if (object.NewImage) { - const unmarshalled = unmarshallAttributeValue( - 'NewImage', - object.NewImage - ); - if (unmarshalled === z.NEVER) return z.NEVER; - result.NewImage = unmarshalled; + const unmarshallAttributeValue = ( + imageName: 'NewImage' | 'OldImage' | 'Keys', + image: Record + ) => { + try { + // @ts-expect-error + return unmarshallDynamoDB(image) as Record; + } catch (err) { + ctx.addIssue({ + code: 'custom', + message: `Could not unmarshall ${imageName} in DynamoDB stream record`, + fatal: true, + path: [imageName], + }); + return z.NEVER; } + }; - if (object.OldImage) { - const unmarshalled = unmarshallAttributeValue( - 'OldImage', - object.OldImage - ); - if (unmarshalled === z.NEVER) return z.NEVER; - result.OldImage = unmarshalled; - } + const unmarshalledKeys = unmarshallAttributeValue('Keys', object.Keys); + if (unmarshalledKeys === z.NEVER) return z.NEVER; + // @ts-expect-error - We are intentionally mutating the object + result.Keys = unmarshalledKeys; - return result; + if (object.NewImage) { + const unmarshalled = unmarshallAttributeValue('NewImage', object.NewImage); + if (unmarshalled === z.NEVER) return z.NEVER; + result.NewImage = unmarshalled; } + + if (object.OldImage) { + const unmarshalled = unmarshallAttributeValue('OldImage', object.OldImage); + if (unmarshalled === z.NEVER) return z.NEVER; + result.OldImage = unmarshalled; + } + + return result; +}; + +const DynamoDBStreamChangeRecord = DynamoDBStreamChangeRecordBase.transform( + unmarshallDynamoDBTransform ); const UserIdentity = z.object({ @@ -81,14 +91,55 @@ const DynamoDBStreamRecord = z.object({ userIdentity: UserIdentity.optional(), }); +/** + * Zod schema for Amazon DynamoDB Stream event sent to an Amazon Kinesis Stream. + * + * This schema is best used in conjunction with the {@link KinesisEnvelope | `KinesisEnvelope`} when + * you want to work with the DynamoDB stream event coming from an Amazon Kinesis Stream. + * + * By default, we unmarshall the `dynamodb.Keys`, `dynamodb.NewImage`, and `dynamodb.OldImage` fields + * for you. + * + * If you want to extend the schema and provide your own Zod schema for any of these fields, + * you can use the {@link DynamoDBMarshalled | `DynamoDBMarshalled`} helper. In that case, we won't unmarshall the other fields. + * + * To extend the schema, you can use the {@link DynamoDBStreamToKinesisRecord | `DynamoDBStreamToKinesisRecord`} child schema and the {@link DynamoDBMarshalled | `DynamoDBMarshalled`} + * helper together. + * + * @example + * ```ts + * import { + * DynamoDBStreamToKinesisRecord, + * DynamoDBStreamToKinesisChangeRecord, + * } from '@aws-lambda-powertools/parser/schemas/dynamodb'; + * import { KinesisEnvelope } from '@aws-lambda-powertools/parser/envelopes/dynamodb'; + * import { DynamoDBMarshalled } from '@aws-lambda-powertools/parser/helpers/dynamodb'; + * + * const CustomSchema = DynamoDBStreamToKinesisRecord.extend({ + * dynamodb: DynamoDBStreamToKinesisChangeRecord.extend({ + * NewImage: DynamoDBMarshalled( + * z.object({ + * id: z.string(), + * attribute: z.number(), + * stuff: z.array(z.string()), + * }) + * ), + * // Add the lines below only if you want these keys to be unmarshalled + * Keys: DynamoDBMarshalled(z.unknown()), + * OldImage: DynamoDBMarshalled(z.unknown()), + * }), + * }); + * + * type CustomEvent = z.infer; + * ``` + */ const DynamoDBStreamToKinesisRecord = DynamoDBStreamRecord.extend({ recordFormat: z.literal('application/json'), tableName: z.string(), userIdentity: UserIdentity.nullish(), - dynamodb: DynamoDBStreamChangeRecordBase.omit({ - SequenceNumber: true, - StreamViewType: true, - }), + dynamodb: DynamoDBStreamToKinesisChangeRecord.transform( + unmarshallDynamoDBTransform + ), }).omit({ eventVersion: true, eventSourceARN: true, @@ -175,6 +226,7 @@ const DynamoDBStreamSchema = z.object({ export { DynamoDBStreamToKinesisRecord, + DynamoDBStreamToKinesisChangeRecord, DynamoDBStreamSchema, DynamoDBStreamRecord, DynamoDBStreamChangeRecord, diff --git a/packages/parser/src/schemas/kinesis-firehose.ts b/packages/parser/src/schemas/kinesis-firehose.ts index 529eab2cf5..cb0bbab57b 100644 --- a/packages/parser/src/schemas/kinesis-firehose.ts +++ b/packages/parser/src/schemas/kinesis-firehose.ts @@ -95,7 +95,7 @@ const KinesisFirehoseSqsRecordSchema = KinesisFireHoseRecordBase.extend({ * @see {@link https://docs.aws.amazon.com/lambda/latest/dg/services-kinesisfirehose.html} */ const KinesisFirehoseSchema = KinesisFireHoseBaseSchema.extend({ - records: z.array(KinesisFirehoseRecordSchema), + records: z.array(KinesisFirehoseRecordSchema).min(1), }); /** @@ -120,7 +120,7 @@ const KinesisFirehoseSchema = KinesisFireHoseBaseSchema.extend({ * @see {@link types.KinesisFireHoseSqsEvent | KinesisFireHoseSqsEvent} */ const KinesisFirehoseSqsSchema = KinesisFireHoseBaseSchema.extend({ - records: z.array(KinesisFirehoseSqsRecordSchema), + records: z.array(KinesisFirehoseSqsRecordSchema).min(1), }); export { diff --git a/packages/parser/src/schemas/kinesis.ts b/packages/parser/src/schemas/kinesis.ts index 083f39ddce..8e21724305 100644 --- a/packages/parser/src/schemas/kinesis.ts +++ b/packages/parser/src/schemas/kinesis.ts @@ -1,7 +1,10 @@ import { gunzipSync } from 'node:zlib'; +import { fromBase64 } from '@aws-lambda-powertools/commons/utils/base64'; import { z } from 'zod'; import { DynamoDBStreamToKinesisRecord } from './dynamodb.js'; +const decoder = new TextDecoder(); + const KinesisDataStreamRecordPayload = z.object({ kinesisSchemaVersion: z.string(), partitionKey: z.string(), @@ -9,7 +12,7 @@ const KinesisDataStreamRecordPayload = z.object({ approximateArrivalTimestamp: z.number(), data: z.string().transform((data) => { const decompressed = decompress(data); - const decoded = Buffer.from(data, 'base64').toString('utf-8'); + const decoded = decoder.decode(fromBase64(data, 'base64')); try { // If data was not compressed, try to parse it as JSON otherwise it must be string return decompressed === data ? JSON.parse(decoded) : decompressed; @@ -21,7 +24,7 @@ const KinesisDataStreamRecordPayload = z.object({ const decompress = (data: string): string => { try { - return JSON.parse(gunzipSync(Buffer.from(data, 'base64')).toString('utf8')); + return JSON.parse(gunzipSync(fromBase64(data, 'base64')).toString('utf8')); } catch (e) { return data; } @@ -45,7 +48,8 @@ const KinesisDynamoDBStreamSchema = z.object({ data: z .string() .transform((data) => { - return JSON.parse(Buffer.from(data, 'base64').toString('utf8')); + const decoded = decoder.decode(fromBase64(data, 'base64')); + return JSON.parse(decoded); }) .pipe(DynamoDBStreamToKinesisRecord), }), @@ -100,7 +104,7 @@ const KinesisDynamoDBStreamSchema = z.object({ * */ const KinesisDataStreamSchema = z.object({ - Records: z.array(KinesisDataStreamRecord), + Records: z.array(KinesisDataStreamRecord).min(1), }); export { diff --git a/packages/parser/tests/events/kinesis/stream-one-record.json b/packages/parser/tests/events/kinesis/stream-one-record.json deleted file mode 100644 index 1c65b949dd..0000000000 --- a/packages/parser/tests/events/kinesis/stream-one-record.json +++ /dev/null @@ -1,20 +0,0 @@ -{ - "Records": [ - { - "kinesis": { - "kinesisSchemaVersion": "1.0", - "partitionKey": "1", - "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", - "data": "eyJtZXNzYWdlIjogInRlc3QgbWVzc2FnZSIsICJ1c2VybmFtZSI6ICJ0ZXN0In0=", - "approximateArrivalTimestamp": 1545084650.987 - }, - "eventSource": "aws:kinesis", - "eventVersion": "1.0", - "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", - "eventName": "aws:kinesis:record", - "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", - "awsRegion": "us-east-2", - "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" - } - ] -} diff --git a/packages/parser/tests/unit/envelopes/kinesis-firehose.test.ts b/packages/parser/tests/unit/envelopes/kinesis-firehose.test.ts index 32d7bb610f..b271721381 100644 --- a/packages/parser/tests/unit/envelopes/kinesis-firehose.test.ts +++ b/packages/parser/tests/unit/envelopes/kinesis-firehose.test.ts @@ -1,157 +1,231 @@ -import { generateMock } from '@anatine/zod-mock'; import { describe, expect, it } from 'vitest'; -import { ZodError } from 'zod'; -import { ParseError } from '../../../src'; +import { ZodError, z } from 'zod'; import { KinesisFirehoseEnvelope } from '../../../src/envelopes/index.js'; +import { ParseError } from '../../../src/errors.js'; +import { JSONStringified } from '../../../src/helpers.js'; import type { KinesisFireHoseEvent, KinesisFireHoseSqsEvent, -} from '../../../src/types'; -import { TestSchema, getTestEvent } from '../schema/utils.js'; +} from '../../../src/types/schema.js'; +import { getTestEvent, omit } from '../schema/utils.js'; -describe('Kinesis Firehose Envelope', () => { +const encode = (data: unknown) => Buffer.from(String(data)).toString('base64'); + +describe('Envelope: Kinesis Firehose', () => { const eventsPath = 'kinesis'; const kinesisFirehosePutEvent = getTestEvent({ eventsPath, filename: 'firehose-put', }); - const kinesisFirehoseSQSEvent = getTestEvent({ eventsPath, filename: 'firehose-sqs', }); - const kinesisFirehoseEvent = getTestEvent({ - eventsPath, - filename: 'firehose', - }); - - describe('parse', () => { - it('should parse records for PutEvent', () => { - const mock = generateMock(TestSchema); - const testEvent = structuredClone(kinesisFirehosePutEvent); - - testEvent.records.map((record) => { - record.data = Buffer.from(JSON.stringify(mock)).toString('base64'); - }); - - const resp = KinesisFirehoseEnvelope.parse(testEvent, TestSchema); - expect(resp).toEqual([mock, mock]); + describe('Method: parse', () => { + it('throws if one of the payloads does not match the schema', () => { + // Prepare + const event = structuredClone(kinesisFirehosePutEvent); + + // Act & Assess + expect(() => KinesisFirehoseEnvelope.parse(event, z.number())).toThrow( + expect.objectContaining({ + message: expect.stringContaining( + 'Failed to parse Kinesis Firehose record at index 0' + ), + cause: expect.objectContaining({ + issues: [ + { + code: 'invalid_type', + expected: 'number', + received: 'string', + path: ['records', 0, 'data'], + message: 'Expected number, received string', + }, + ], + }), + }) + ); }); - it('should parse a single record for SQS event', () => { - const mock = generateMock(TestSchema); - const testEvent = structuredClone(kinesisFirehoseSQSEvent); + it('parses a Kinesis Firehose event', () => { + // Prepare + const event = structuredClone(kinesisFirehosePutEvent); + event.records[1].data = encode('foo'); - testEvent.records.map((record) => { - record.data = Buffer.from(JSON.stringify(mock)).toString('base64'); - }); + // Act + const result = KinesisFirehoseEnvelope.parse(event, z.string()); - const resp = KinesisFirehoseEnvelope.parse(testEvent, TestSchema); - expect(resp).toEqual([mock]); + // Assess + expect(result).toEqual(['Hello World', 'foo']); }); - it('should parse records for kinesis event', () => { - const mock = generateMock(TestSchema); - const testEvent = structuredClone(kinesisFirehoseEvent); - - testEvent.records.map((record) => { - record.data = Buffer.from(JSON.stringify(mock)).toString('base64'); - }); - - const resp = KinesisFirehoseEnvelope.parse(testEvent, TestSchema); - expect(resp).toEqual([mock, mock]); - }); - it('should throw if record is not base64 encoded', () => { - const testEvent = structuredClone(kinesisFirehosePutEvent); + it('parses a Kinesis Firehose event and applies the schema transformation', () => { + // Prepare + const event = structuredClone(kinesisFirehosePutEvent); + event.records[0].data = encode(JSON.stringify({ Hello: 'foo' })); - testEvent.records.map((record) => { - record.data = 'not base64 encoded'; - }); + // Act + const result = KinesisFirehoseEnvelope.parse( + event, + JSONStringified(z.object({ Hello: z.string() })) + ); - expect(() => { - KinesisFirehoseEnvelope.parse(testEvent, TestSchema); - }).toThrow(); + // Assess + expect(result).toStrictEqual([{ Hello: 'foo' }, { Hello: 'World' }]); }); - it('should throw if envelope is invalid', () => { - expect(() => { - KinesisFirehoseEnvelope.parse({ foo: 'bar' }, TestSchema); - }).toThrow(); - }); - it('should throw when schema does not match record', () => { - const testEvent = structuredClone(kinesisFirehosePutEvent); - - testEvent.records.map((record) => { - record.data = Buffer.from('not a valid json').toString('base64'); - }); - expect(() => { - KinesisFirehoseEnvelope.parse(testEvent, TestSchema); - }).toThrow(); + it('throws if the event is not a Kinesis Data Stream event', () => { + // Prepare + const event = structuredClone(kinesisFirehosePutEvent); + event.records = []; + + // Act & Assess + expect(() => KinesisFirehoseEnvelope.parse(event, z.string())).toThrow( + new ParseError('Failed to parse Kinesis Firehose envelope', { + cause: new ZodError([ + { + code: 'too_small', + minimum: 1, + type: 'array', + inclusive: true, + exact: false, + message: 'Array must contain at least 1 element(s)', + path: ['records'], + }, + ]), + }) + ); }); }); - describe('safeParse', () => { - it('should parse records for PutEvent', () => { - const mock = generateMock(TestSchema); - const testEvent = structuredClone(kinesisFirehosePutEvent); - testEvent.records.map((record) => { - record.data = Buffer.from(JSON.stringify(mock)).toString('base64'); - }); - - const resp = KinesisFirehoseEnvelope.safeParse(testEvent, TestSchema); - expect(resp).toEqual({ success: true, data: [mock, mock] }); - }); - - it('should parse a single record for SQS event', () => { - const mock = generateMock(TestSchema); - const testEvent = structuredClone(kinesisFirehoseSQSEvent); + describe('Method: safeParse', () => { + it('parses a Kinesis Firehose event with SQS data', () => { + // Prepare + const event = structuredClone(kinesisFirehoseSQSEvent); + + // Act + const result = KinesisFirehoseEnvelope.safeParse( + event, + JSONStringified( + z.object({ + body: z.string(), + }) + ) + ); - testEvent.records.map((record) => { - record.data = Buffer.from(JSON.stringify(mock)).toString('base64'); + // Assess + expect(result).toEqual({ + success: true, + data: [ + { + body: 'Test message.', + }, + ], }); - - const resp = KinesisFirehoseEnvelope.safeParse(testEvent, TestSchema); - expect(resp).toEqual({ success: true, data: [mock] }); }); - it('should parse records for kinesis event', () => { - const mock = generateMock(TestSchema); - const testEvent = structuredClone(kinesisFirehoseEvent); + it('returns an error if the event is not a Kinesis Data Stream event', () => { + // Prepare + const event = omit( + ['invocationId'], + structuredClone(kinesisFirehosePutEvent) + ); + + // Act + const result = KinesisFirehoseEnvelope.safeParse(event, z.string()); - testEvent.records.map((record) => { - record.data = Buffer.from(JSON.stringify(mock)).toString('base64'); + // Assess + expect(result).toEqual({ + success: false, + error: new ParseError('Failed to parse Kinesis Firehose envelope', { + cause: new ZodError([ + { + code: 'invalid_type', + expected: 'string', + received: 'undefined', + path: ['invocationId'], + message: 'Required', + }, + ]), + }), + originalEvent: event, }); - - const resp = KinesisFirehoseEnvelope.safeParse(testEvent, TestSchema); - expect(resp).toEqual({ success: true, data: [mock, mock] }); }); - it('should return original event if envelope is invalid', () => { - const parseResult = KinesisFirehoseEnvelope.safeParse( - { foo: 'bar' }, - TestSchema + + it('returns an error if one of the payloads does not match the schema', () => { + // Prepare + const event = structuredClone(kinesisFirehosePutEvent); + event.records[0].data = encode(JSON.stringify({ foo: 'bar' })); + + // Act + const result = KinesisFirehoseEnvelope.safeParse( + event, + JSONStringified( + z.object({ + foo: z.string(), + }) + ) ); - expect(parseResult).toEqual({ + + // Assess + expect(result).toEqual({ success: false, - error: expect.any(ParseError), - originalEvent: { foo: 'bar' }, + error: new ParseError( + 'Failed to parse Kinesis Firehose record at index 1', + { + cause: new ZodError([ + { + code: 'invalid_type', + expected: 'string', + received: 'undefined', + path: ['records', 1, 'data', 'foo'], + message: 'Required', + }, + ]), + } + ), + originalEvent: event, }); - - if (!parseResult.success && parseResult.error) { - expect(parseResult.error.cause).toBeInstanceOf(ZodError); - } }); - it('should return original event if record is not base64 encoded', () => { - const testEvent = structuredClone(kinesisFirehosePutEvent); - testEvent.records.map((record) => { - record.data = 'not base64 encoded'; - }); + it('returns a combined error if multiple records fail to parse', () => { + // Prepare + const event = structuredClone(kinesisFirehosePutEvent); + + // Act + const result = KinesisFirehoseEnvelope.safeParse( + event, + z.object({ + foo: z.string(), + }) + ); - expect(KinesisFirehoseEnvelope.safeParse(testEvent, TestSchema)).toEqual({ + // Assess + expect(result).toEqual({ success: false, - error: expect.any(ParseError), - originalEvent: testEvent, + error: new ParseError( + 'Failed to parse Kinesis Firehose records at indexes 0, 1', + { + cause: new ZodError([ + { + code: 'invalid_type', + expected: 'object', + received: 'string', + path: ['records', 0, 'data'], + message: 'Expected object, received string', + }, + { + code: 'invalid_type', + expected: 'object', + received: 'string', + path: ['records', 1, 'data'], + message: 'Expected object, received string', + }, + ]), + } + ), + originalEvent: event, }); }); }); diff --git a/packages/parser/tests/unit/envelopes/kinesis.test.ts b/packages/parser/tests/unit/envelopes/kinesis.test.ts index fc04c4e9d4..df9d51dd0c 100644 --- a/packages/parser/tests/unit/envelopes/kinesis.test.ts +++ b/packages/parser/tests/unit/envelopes/kinesis.test.ts @@ -1,79 +1,200 @@ -import { generateMock } from '@anatine/zod-mock'; import { describe, expect, it } from 'vitest'; +import { ZodError, z } from 'zod'; import { KinesisEnvelope } from '../../../src/envelopes/index.js'; import { ParseError } from '../../../src/errors.js'; import type { KinesisDataStreamEvent } from '../../../src/types/schema.js'; -import { TestSchema, getTestEvent } from '../schema/utils.js'; +import { getTestEvent } from '../schema/utils.js'; -describe('KinesisEnvelope', () => { - const eventsPath = 'kinesis'; +const encode = (data: unknown) => Buffer.from(String(data)).toString('base64'); +describe('Envelope: Kinesis', () => { + const eventsPath = 'kinesis'; const kinesisStreamEvent = getTestEvent({ eventsPath, filename: 'stream', }); - describe('parse', () => { - it('should parse Kinesis Stream event', () => { - const mock = generateMock(TestSchema); - const testEvent = structuredClone(kinesisStreamEvent); - - testEvent.Records.map((record) => { - record.kinesis.data = Buffer.from(JSON.stringify(mock)).toString( - 'base64' - ); - }); + describe('Method: parse', () => { + it('throws if one of the payloads does not match the schema', () => { + // Prepare + const event = structuredClone(kinesisStreamEvent); - const resp = KinesisEnvelope.parse(testEvent, TestSchema); - expect(resp).toEqual([mock, mock]); + // Act & Assess + expect(() => KinesisEnvelope.parse(event, z.number())).toThrow( + expect.objectContaining({ + message: expect.stringContaining( + 'Failed to parse Kinesis Data Stream record at index 0' + ), + cause: expect.objectContaining({ + issues: [ + { + code: 'invalid_type', + expected: 'number', + received: 'string', + path: ['Records', 0, 'kinesis', 'data'], + message: 'Expected number, received string', + }, + ], + }), + }) + ); }); - it('should throw if envelope is invalid', () => { - expect(() => KinesisEnvelope.parse({ foo: 'bar' }, TestSchema)).toThrow(); + + it('parses a Kinesis Data Stream event', () => { + // Prepare + const event = structuredClone(kinesisStreamEvent); + + // Act + const result = KinesisEnvelope.parse(event, z.string()); + + // Assess + expect(result).toEqual([ + 'Hello, this is a test.', + 'This is only a test.', + ]); }); - it('should throw if record is invalid', () => { - const testEvent = structuredClone(kinesisStreamEvent); - testEvent.Records[0].kinesis.data = 'invalid'; - expect(() => KinesisEnvelope.parse(testEvent, TestSchema)).toThrow(); + it('throws if the event is not a Kinesis Data Stream event', () => { + // Prepare + const event = { + Records: [], + }; + + // Act & Assess + expect(() => KinesisEnvelope.parse(event, z.string())).toThrow( + new ParseError('Failed to parse Kinesis Data Stream envelope', { + cause: new ZodError([ + { + code: 'too_small', + minimum: 1, + type: 'array', + inclusive: true, + exact: false, + message: 'Array must contain at least 1 element(s)', + path: ['Records'], + }, + ]), + }) + ); }); }); - describe('safeParse', () => { - it('should parse Kinesis Stream event', () => { - const mock = generateMock(TestSchema); - const testEvent = structuredClone(kinesisStreamEvent); + describe('Method: safeParse', () => { + it('parses a Kinesis Data Stream event', () => { + // Prepare + const event = structuredClone(kinesisStreamEvent); - testEvent.Records.map((record) => { - record.kinesis.data = Buffer.from(JSON.stringify(mock)).toString( - 'base64' - ); - }); + // Act + const result = KinesisEnvelope.safeParse(event, z.string()); - const resp = KinesisEnvelope.safeParse(testEvent, TestSchema); - expect(resp).toEqual({ success: true, data: [mock, mock] }); + // Assess + expect(result).toEqual({ + success: true, + data: ['Hello, this is a test.', 'This is only a test.'], + }); }); - it('should return original event if envelope is invalid', () => { - const testEvent = { foo: 'bar' }; - const resp = KinesisEnvelope.safeParse(testEvent, TestSchema); - expect(resp).toEqual({ + + it('returns an error if the event is not a Kinesis Data Stream event', () => { + // Prepare + const event = { + Records: [], + }; + + // Act + const result = KinesisEnvelope.safeParse(event, z.string()); + + // Assess + expect(result).toEqual({ success: false, - error: expect.any(ParseError), - originalEvent: testEvent, + error: new ParseError('Failed to parse Kinesis Data Stream envelope', { + cause: new ZodError([ + { + code: 'too_small', + minimum: 1, + type: 'array', + inclusive: true, + exact: false, + message: 'Array must contain at least 1 element(s)', + path: ['Records'], + }, + ]), + }), + originalEvent: event, }); }); - it('should return original event if record is invalid', () => { - const testEvent = structuredClone(kinesisStreamEvent); - testEvent.Records[0].kinesis.data = 'invalid'; - const parseResult = KinesisEnvelope.safeParse(testEvent, TestSchema); - expect(parseResult).toEqual({ + + it('returns an error if one of the payloads does not match the schema', () => { + // Prepare + const event = structuredClone(kinesisStreamEvent); + event.Records[0].kinesis.data = encode(JSON.stringify({ foo: 'bar' })); + + // Act + const result = KinesisEnvelope.safeParse( + event, + z.object({ + foo: z.string(), + }) + ); + + // Assess + expect(result).toEqual({ success: false, - error: expect.any(ParseError), - originalEvent: testEvent, + error: new ParseError( + 'Failed to parse Kinesis Data Stream record at index 1', + { + cause: new ZodError([ + { + code: 'invalid_type', + expected: 'object', + received: 'string', + path: ['Records', 1, 'kinesis', 'data'], + message: 'Expected object, received string', + }, + ]), + } + ), + originalEvent: event, }); + }); + + it('returns a combined error if multiple records fail to parse', () => { + // Prepare + const event = structuredClone(kinesisStreamEvent); + + // Act + const result = KinesisEnvelope.safeParse( + event, + z.object({ + foo: z.string(), + }) + ); - if (!parseResult.success && parseResult.error) { - expect(parseResult.error.cause).toBeInstanceOf(SyntaxError); - } + // Assess + expect(result).toEqual({ + success: false, + error: new ParseError( + 'Failed to parse Kinesis Data Stream records at indexes 0, 1', + { + cause: new ZodError([ + { + code: 'invalid_type', + expected: 'object', + received: 'string', + path: ['Records', 0, 'kinesis', 'data'], + message: 'Expected object, received string', + }, + { + code: 'invalid_type', + expected: 'object', + received: 'string', + path: ['Records', 1, 'kinesis', 'data'], + message: 'Expected object, received string', + }, + ]), + } + ), + originalEvent: event, + }); }); }); }); diff --git a/packages/parser/tests/unit/schema/kinesis.test.ts b/packages/parser/tests/unit/schema/kinesis.test.ts index 5948e1fc45..4751348a7f 100644 --- a/packages/parser/tests/unit/schema/kinesis.test.ts +++ b/packages/parser/tests/unit/schema/kinesis.test.ts @@ -1,24 +1,24 @@ import { gunzipSync } from 'node:zlib'; import { describe, expect, it } from 'vitest'; import { - KinesisDataStreamRecord, - KinesisDataStreamSchema, KinesisFirehoseRecordSchema, KinesisFirehoseSchema, KinesisFirehoseSqsRecordSchema, KinesisFirehoseSqsSchema, -} from '../../../src/schemas/'; -import { KinesisDynamoDBStreamSchema } from '../../../src/schemas/kinesis'; +} from '../../../src/schemas/kinesis-firehose.js'; +import { + KinesisDataStreamRecord, + KinesisDataStreamSchema, + KinesisDynamoDBStreamSchema, +} from '../../../src/schemas/kinesis.js'; import type { KinesisDataStreamEvent, + KinesisDynamoDBStreamEvent, KinesisFireHoseEvent, KinesisFireHoseSqsEvent, -} from '../../../src/types'; -import type { - KinesisDynamoDBStreamEvent, KinesisFirehoseRecord, KinesisFirehoseSqsRecord, -} from '../../../src/types/schema'; +} from '../../../src/types/schema.js'; import { getTestEvent } from './utils.js'; describe('Schema: Kinesis', () => { @@ -29,11 +29,6 @@ describe('Schema: Kinesis', () => { filename: 'stream', }); - const kinesisStreamEventOneRecord = getTestEvent({ - eventsPath, - filename: 'stream-one-record', - }); - const kinesisFirehoseEvent = getTestEvent({ eventsPath, filename: 'firehose', @@ -78,30 +73,7 @@ describe('Schema: Kinesis', () => { // Assess expect(parsed).toStrictEqual(transformedInput); }); - it('parses single kinesis record', () => { - // Prepare - const testEvent = structuredClone(kinesisStreamEventOneRecord); - - // Act - const parsed = KinesisDataStreamSchema.parse(testEvent); - - const transformedInput = { - Records: testEvent.Records.map((record, index) => { - return { - ...record, - kinesis: { - ...record.kinesis, - data: JSON.parse( - Buffer.from(record.kinesis.data, 'base64').toString() - ), - }, - }; - }), - }; - // Assess - expect(parsed).toStrictEqual(transformedInput); - }); it('parses Firehose event', () => { // Prepare const testEvent = structuredClone(kinesisFirehoseEvent); @@ -123,6 +95,7 @@ describe('Schema: Kinesis', () => { // Assess expect(parsed).toStrictEqual(transformedInput); }); + it('parses Kinesis Firehose PutEvents event', () => { // Prepare const testEvent = structuredClone(kinesisFirehosePutEvent); @@ -143,6 +116,7 @@ describe('Schema: Kinesis', () => { // Assess expect(parsed).toStrictEqual(transformedInput); }); + it('parses Firehose event with SQS event', () => { // Prepare const testEvent = structuredClone(kinesisFirehoseSQSEvent); @@ -165,6 +139,7 @@ describe('Schema: Kinesis', () => { // Assess expect(parsed).toStrictEqual(transformedInput); }); + it('parses Kinesis event with CloudWatch event', () => { // Prepare const testEvent = structuredClone(kinesisStreamCloudWatchLogsEvent); @@ -191,6 +166,7 @@ describe('Schema: Kinesis', () => { // Assess expect(parsed).toStrictEqual(transformedInput); }); + it('throws if cannot parse SQS record of KinesisFirehoseSqsRecord', () => { // Prepare const testEvent = getTestEvent({ @@ -201,6 +177,7 @@ describe('Schema: Kinesis', () => { // Act & Assess expect(() => KinesisFirehoseSqsSchema.parse(testEvent)).toThrow(); }); + it('parses a kinesis record from a kinesis event', () => { // Prepare const testEvent: KinesisDataStreamEvent = @@ -231,10 +208,10 @@ describe('Schema: Kinesis', () => { tableName: 'PowertoolsEventsStack-DynamoDBTable59784FC0-8NKAMTERTAXY', dynamodb: { ApproximateCreationDateTime: 1731924555370, - Keys: { id: { S: 'record-1qit2y819gi' } }, + Keys: { id: 'record-1qit2y819gi' }, NewImage: { - id: { S: 'record-1qit2y819gi' }, - data: { S: 'data-x6aq7ckdpgk' }, + id: 'record-1qit2y819gi', + data: 'data-x6aq7ckdpgk', }, SizeBytes: 60, }, @@ -249,10 +226,10 @@ describe('Schema: Kinesis', () => { tableName: 'PowertoolsEventsStack-DynamoDBTable59784FC0-8NKAMTERTAXY', dynamodb: { ApproximateCreationDateTime: 1731924555370, - Keys: { id: { S: 'record-fvxn3q4q5jw' } }, + Keys: { id: 'record-fvxn3q4q5jw' }, NewImage: { - id: { S: 'record-fvxn3q4q5jw' }, - data: { S: 'data-4eompjs89n5' }, + id: 'record-fvxn3q4q5jw', + data: 'data-4eompjs89n5', }, SizeBytes: 60, }, diff --git a/packages/parser/typedoc.json b/packages/parser/typedoc.json index bda7b8332e..0650b79fad 100644 --- a/packages/parser/typedoc.json +++ b/packages/parser/typedoc.json @@ -6,7 +6,8 @@ "./src/types/index.ts", "./src/envelopes/index.ts", "./src/schemas/index.ts", - "./src/helpers.ts" + "./src/helpers.ts", + "./src/helpers/dynamodb.ts" ], "readme": "README.md" }