Skip to content

Commit

Permalink
fix(parser): allow Kinesis envelopes to handle non-JSON strings (#3531)
Browse files Browse the repository at this point in the history
  • Loading branch information
dreamorosi authored Jan 27, 2025
1 parent 4721dda commit d18e03d
Show file tree
Hide file tree
Showing 10 changed files with 630 additions and 317 deletions.
93 changes: 71 additions & 22 deletions packages/parser/src/envelopes/kinesis-firehose.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import type { ZodSchema, z } from 'zod';
import { ZodError, type ZodIssue, type ZodSchema, type z } from 'zod';
import { ParseError } from '../errors.js';
import { KinesisFirehoseSchema } from '../schemas/index.js';
import {
type KinesisFirehoseRecordSchema,
KinesisFirehoseSchema,
} from '../schemas/index.js';
import type { ParsedResult } from '../types/index.js';
import { Envelope, envelopeDiscriminator } from './envelope.js';
import { envelopeDiscriminator } from './envelope.js';

/**
* Kinesis Firehose Envelope to extract array of Records
Expand All @@ -23,10 +26,33 @@ export const KinesisFirehoseEnvelope = {
*/
[envelopeDiscriminator]: 'array' as const,
parse<T extends ZodSchema>(data: unknown, schema: T): z.infer<T>[] {
const parsedEnvelope = KinesisFirehoseSchema.parse(data);
let parsedEnvelope: z.infer<typeof KinesisFirehoseSchema>;
try {
parsedEnvelope = KinesisFirehoseSchema.parse(data);
} catch (error) {
throw new ParseError('Failed to parse Kinesis Firehose envelope', {
cause: error as Error,
});
}

return parsedEnvelope.records.map((record) => {
return Envelope.parse(record.data, schema);
return parsedEnvelope.records.map((record, recordIndex) => {
let parsedRecord: z.infer<typeof KinesisFirehoseRecordSchema>;
try {
parsedRecord = schema.parse(record.data);
} catch (error) {
throw new ParseError(
`Failed to parse Kinesis Firehose record at index ${recordIndex}`,
{
cause: new ZodError(
(error as ZodError).issues.map((issue) => ({
...issue,
path: ['records', recordIndex, 'data', ...issue.path],
}))
),
}
);
}
return parsedRecord;
});
},

Expand All @@ -35,7 +61,6 @@ export const KinesisFirehoseEnvelope = {
schema: T
): ParsedResult<unknown, z.infer<T>[]> {
const parsedEnvelope = KinesisFirehoseSchema.safeParse(data);

if (!parsedEnvelope.success) {
return {
success: false,
Expand All @@ -45,25 +70,49 @@ export const KinesisFirehoseEnvelope = {
originalEvent: data,
};
}
const parsedRecords: z.infer<T>[] = [];

for (const record of parsedEnvelope.data.records) {
const parsedData = Envelope.safeParse(record.data, schema);
if (!parsedData.success) {
return {
success: false,
error: new ParseError('Failed to parse Kinesis Firehose record', {
cause: parsedData.error,
}),
originalEvent: data,
};
}
parsedRecords.push(parsedData.data);
const result = parsedEnvelope.data.records.reduce<{
success: boolean;
records: z.infer<T>[];
errors: {
[key: number | string]: { issues: ZodIssue[] };
};
}>(
(acc, record, index) => {
const parsedRecord = schema.safeParse(record.data);

if (!parsedRecord.success) {
const issues = parsedRecord.error.issues.map((issue) => ({
...issue,
path: ['records', index, 'data', ...issue.path],
}));
acc.success = false;
acc.errors[index] = { issues };
return acc;
}

acc.records.push(parsedRecord.data);
return acc;
},
{ success: true, records: [], errors: {} }
);

if (result.success) {
return { success: true, data: result.records };
}

const errorMessage =
Object.keys(result.errors).length > 1
? `Failed to parse Kinesis Firehose records at indexes ${Object.keys(result.errors).join(', ')}`
: `Failed to parse Kinesis Firehose record at index ${Object.keys(result.errors)[0]}`;
return {
success: true,
data: parsedRecords,
success: false,
error: new ParseError(errorMessage, {
cause: new ZodError(
Object.values(result.errors).flatMap((error) => error.issues)
),
}),
originalEvent: data,
};
},
};
97 changes: 76 additions & 21 deletions packages/parser/src/envelopes/kinesis.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import type { ZodSchema, z } from 'zod';
import { ZodError, type ZodIssue, type ZodSchema, type z } from 'zod';
import { ParseError } from '../errors.js';
import { KinesisDataStreamSchema } from '../schemas/kinesis.js';
import {
type KinesisDataStreamRecord,
KinesisDataStreamSchema,
} from '../schemas/kinesis.js';
import type { ParsedResult } from '../types/index.js';
import { Envelope, envelopeDiscriminator } from './envelope.js';
import { envelopeDiscriminator } from './envelope.js';

/**
* Kinesis Data Stream Envelope to extract array of Records
Expand All @@ -21,10 +24,39 @@ export const KinesisEnvelope = {
*/
[envelopeDiscriminator]: 'array' as const,
parse<T extends ZodSchema>(data: unknown, schema: T): z.infer<T>[] {
const parsedEnvelope = KinesisDataStreamSchema.parse(data);
let parsedEnvelope: z.infer<typeof KinesisDataStreamSchema>;
try {
parsedEnvelope = KinesisDataStreamSchema.parse(data);
} catch (error) {
throw new ParseError('Failed to parse Kinesis Data Stream envelope', {
cause: error as Error,
});
}

return parsedEnvelope.Records.map((record) => {
return Envelope.parse(record.kinesis.data, schema);
return parsedEnvelope.Records.map((record, recordIndex) => {
let parsedRecord: z.infer<typeof KinesisDataStreamRecord>;
try {
parsedRecord = schema.parse(record.kinesis.data);
} catch (error) {
throw new ParseError(
`Failed to parse Kinesis Data Stream record at index ${recordIndex}`,
{
cause: new ZodError(
(error as ZodError).issues.map((issue) => ({
...issue,
path: [
'Records',
recordIndex,
'kinesis',
'data',
...issue.path,
],
}))
),
}
);
}
return parsedRecord;
});
},

Expand All @@ -43,25 +75,48 @@ export const KinesisEnvelope = {
};
}

const parsedRecords: z.infer<T>[] = [];
const result = parsedEnvelope.data.Records.reduce<{
success: boolean;
records: z.infer<T>[];
errors: { index?: number; issues?: ZodIssue[] };
}>(
(acc, record, index) => {
const parsedRecord = schema.safeParse(record.kinesis.data);

for (const record of parsedEnvelope.data.Records) {
const parsedRecord = Envelope.safeParse(record.kinesis.data, schema);
if (!parsedRecord.success) {
return {
success: false,
error: new ParseError('Failed to parse Kinesis Data Stream record', {
cause: parsedRecord.error,
}),
originalEvent: data,
};
}
parsedRecords.push(parsedRecord.data);
if (!parsedRecord.success) {
const issues = parsedRecord.error.issues.map((issue) => ({
...issue,
path: ['Records', index, 'kinesis', 'data', ...issue.path],
}));
acc.success = false;
// @ts-expect-error - index is assigned
acc.errors[index] = { issues };
return acc;
}

acc.records.push(parsedRecord.data);
return acc;
},
{ success: true, records: [], errors: {} }
);

if (result.success) {
return { success: true, data: result.records };
}

const errorMessage =
Object.keys(result.errors).length > 1
? `Failed to parse Kinesis Data Stream records at indexes ${Object.keys(result.errors).join(', ')}`
: `Failed to parse Kinesis Data Stream record at index ${Object.keys(result.errors)[0]}`;
return {
success: true,
data: parsedRecords,
success: false,
error: new ParseError(errorMessage, {
cause: new ZodError(
// @ts-expect-error - issues are assigned because success is false
Object.values(result.errors).flatMap((error) => error.issues)
),
}),
originalEvent: data,
};
},
};
Loading

0 comments on commit d18e03d

Please sign in to comment.