Skip to content

Commit

Permalink
fix(middleware-flexible-checksums): buffer stream chunks to minimum required size (#6882)
Browse files Browse the repository at this point in the history
  • Loading branch information
kuhe authored Feb 17, 2025
1 parent a1d8ad3 commit 1f1905e
Show file tree
Hide file tree
Showing 9 changed files with 258 additions and 15 deletions.
5 changes: 5 additions & 0 deletions packages/middleware-flexible-checksums/src/configuration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,9 @@ export interface PreviouslyResolved {
* Collects streams into buffers.
*/
streamCollector: StreamCollector;

/**
* Minimum bytes from a stream to buffer into a chunk before passing to chunked encoding.
*/
requestStreamBufferSize: number;
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {
HandlerExecutionContext,
MetadataBearer,
} from "@smithy/types";
import { createBufferedReadable } from "@smithy/util-stream";

import { PreviouslyResolved } from "./configuration";
import { ChecksumAlgorithm, DEFAULT_CHECKSUM_ALGORITHM, RequestChecksumCalculation } from "./constants";
Expand Down Expand Up @@ -119,13 +120,18 @@ export const flexibleChecksumsMiddleware =
const checksumAlgorithmFn = selectChecksumAlgorithmFunction(checksumAlgorithm, config);
if (isStreaming(requestBody)) {
const { getAwsChunkedEncodingStream, bodyLengthChecker } = config;
updatedBody = getAwsChunkedEncodingStream(requestBody, {
base64Encoder,
bodyLengthChecker,
checksumLocationName,
checksumAlgorithmFn,
streamHasher,
});
updatedBody = getAwsChunkedEncodingStream(
typeof config.requestStreamBufferSize === "number" && config.requestStreamBufferSize >= 8 * 1024
? createBufferedReadable(requestBody, config.requestStreamBufferSize, context.logger)
: requestBody,
{
base64Encoder,
bodyLengthChecker,
checksumLocationName,
checksumAlgorithmFn,
streamHasher,
}
);
updatedHeaders = {
...headers,
"content-encoding": headers["content-encoding"]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,41 @@
import { S3 } from "@aws-sdk/client-s3";
import { S3, UploadPartCommandOutput } from "@aws-sdk/client-s3";
import { Upload } from "@aws-sdk/lib-storage";
import { FetchHttpHandler } from "@smithy/fetch-http-handler";
import type { HttpRequest, HttpResponse } from "@smithy/types";
import { headStream } from "@smithy/util-stream";
import { ChecksumStream, headStream } from "@smithy/util-stream";
import { Readable } from "node:stream";
import { beforeAll, describe, expect, test as it } from "vitest";
import { beforeAll, describe, expect, test as it, vi } from "vitest";

import { getIntegTestResources } from "../../../tests/e2e/get-integ-test-resources";

describe("S3 checksums", () => {
let s3: S3;
let s3_noChecksum: S3;
let s3_noRequestBuffer: S3;
let Bucket: string;
let Key: string;
let region: string;
const expected = new Uint8Array([97, 98, 99, 100]);
const logger = {
debug: vi.fn(),
info: vi.fn(),
warn: vi.fn(),
error: vi.fn(),
};

/**
 * Builds a Node.js Readable that emits `size` bytes of the letter "a",
 * produced lazily in chunks of at most `chunkSize` bytes each.
 */
function stream(size: number, chunkSize: number) {
  let remaining = size;
  async function* chunks() {
    while (remaining > 0) {
      const take = remaining < chunkSize ? remaining : chunkSize;
      remaining -= take;
      yield "a".repeat(take);
    }
  }
  return Readable.from(chunks());
}
/**
 * Same generated payload as {@link stream}, but exposed as a WHATWG
 * ReadableStream (cast because Node and DOM web-stream typings differ).
 */
function webStream(size: number, chunkSize: number) {
  const nodeStream = stream(size, chunkSize);
  return Readable.toWeb(nodeStream) as unknown as ReadableStream;
}

beforeAll(async () => {
const integTestResourcesEnv = await getIntegTestResources();
Expand All @@ -21,7 +44,8 @@ describe("S3 checksums", () => {
region = process?.env?.AWS_SMOKE_TEST_REGION as string;
Bucket = process?.env?.AWS_SMOKE_TEST_BUCKET as string;

s3 = new S3({ region });
s3 = new S3({ logger, region, requestStreamBufferSize: 8 * 1024 });
s3_noRequestBuffer = new S3({ logger, region });
s3_noChecksum = new S3({
region,
requestChecksumCalculation: "WHEN_REQUIRED",
Expand All @@ -38,7 +62,7 @@ describe("S3 checksums", () => {
expect(reqHeader).toEqual("CRC32");
}
if (resHeader) {
expect(resHeader).toEqual("7YLNEQ==");
expect(resHeader.length).toBeGreaterThanOrEqual(8);
}
return r;
},
Expand All @@ -52,10 +76,152 @@ describe("S3 checksums", () => {
await s3.putObject({ Bucket, Key, Body: "abcd" });
});

it("checksums work with empty objects", async () => {
await s3.putObject({
Bucket,
Key: Key + "empty",
Body: stream(0, 0),
ContentLength: 0,
});
const get = await s3.getObject({ Bucket, Key: Key + "empty" });
expect(get.Body).toBeInstanceOf(ChecksumStream);
});

it("an object should have checksum by default", async () => {
await s3.getObject({ Bucket, Key });
const get = await s3.getObject({ Bucket, Key });
expect(get.Body).toBeInstanceOf(ChecksumStream);
});

describe("PUT operations", () => {
it("S3 throws an error if chunks are too small, because request buffering is off by default", async () => {
await s3_noRequestBuffer
.putObject({
Bucket,
Key: Key + "small-chunks",
Body: stream(24 * 1024, 8),
ContentLength: 24 * 1024,
})
.catch((e) => {
expect(String(e)).toContain(
"InvalidChunkSizeError: Only the last chunk is allowed to have a size less than 8192 bytes"
);
});
expect.hasAssertions();
});
it("should assist user input streams by buffering to the minimum 8kb required by S3", async () => {
await s3.putObject({
Bucket,
Key: Key + "small-chunks",
Body: stream(24 * 1024, 8),
ContentLength: 24 * 1024,
});
expect(logger.warn).toHaveBeenCalledWith(
`@smithy/util-stream - stream chunk size 8 is below threshold of 8192, automatically buffering.`
);
const get = await s3.getObject({
Bucket,
Key: Key + "small-chunks",
});
expect((await get.Body?.transformToByteArray())?.byteLength).toEqual(24 * 1024);
});
it("should be able to write an object with a webstream body (using fetch handler without checksum)", async () => {
const handler = s3_noChecksum.config.requestHandler;
s3_noChecksum.config.requestHandler = new FetchHttpHandler();
await s3_noChecksum.putObject({
Bucket,
Key: Key + "small-chunks-webstream",
Body: webStream(24 * 1024, 512),
ContentLength: 24 * 1024,
});
s3_noChecksum.config.requestHandler = handler;
const get = await s3.getObject({
Bucket,
Key: Key + "small-chunks-webstream",
});
expect((await get.Body?.transformToByteArray())?.byteLength).toEqual(24 * 1024);
});
it("@aws-sdk/lib-storage Upload should allow webstreams to be used", async () => {
await new Upload({
client: s3,
params: {
Bucket,
Key: Key + "small-chunks-webstream-mpu",
Body: webStream(6 * 1024 * 1024, 512),
},
}).done();
const get = await s3.getObject({
Bucket,
Key: Key + "small-chunks-webstream-mpu",
});
expect((await get.Body?.transformToByteArray())?.byteLength).toEqual(6 * 1024 * 1024);
});
it("should allow streams to be used in a manually orchestrated MPU", async () => {
const cmpu = await s3.createMultipartUpload({
Bucket,
Key: Key + "-mpu",
});

const MB = 1024 * 1024;
const up = [] as UploadPartCommandOutput[];

try {
up.push(
await s3.uploadPart({
Bucket,
Key: Key + "-mpu",
UploadId: cmpu.UploadId,
Body: stream(5 * MB, 1024),
PartNumber: 1,
ContentLength: 5 * MB,
}),
await s3.uploadPart({
Bucket,
Key: Key + "-mpu",
UploadId: cmpu.UploadId,
Body: stream(MB, 64),
PartNumber: 2,
ContentLength: MB,
})
);
expect(logger.warn).toHaveBeenCalledWith(
`@smithy/util-stream - stream chunk size 1024 is below threshold of 8192, automatically buffering.`
);
expect(logger.warn).toHaveBeenCalledWith(
`@smithy/util-stream - stream chunk size 64 is below threshold of 8192, automatically buffering.`
);

await s3.completeMultipartUpload({
Bucket,
Key: Key + "-mpu",
UploadId: cmpu.UploadId,
MultipartUpload: {
Parts: up.map((part, i) => {
return {
PartNumber: i + 1,
ETag: part.ETag,
};
}),
},
});

const go = await s3.getObject({
Bucket,
Key: Key + "-mpu",
});
expect((await go.Body?.transformToByteArray())?.byteLength).toEqual(6 * MB);

expect(go.$metadata.httpStatusCode).toEqual(200);
} catch (e) {
await s3.abortMultipartUpload({
UploadId: cmpu.UploadId,
Bucket,
Key: Key + "-mpu",
});
throw e;
}
});
}, 45_000);

describe("the stream returned by S3::getObject should function interchangeably between ChecksumStream and default streams", () => {
it("when collecting the stream", async () => {
const defaultStream = (await s3_noChecksum.getObject({ Bucket, Key })).Body as Readable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ describe(resolveFlexibleChecksumsConfig.name, () => {
expect(resolvedConfig).toEqual({
requestChecksumCalculation: DEFAULT_REQUEST_CHECKSUM_CALCULATION,
responseChecksumValidation: DEFAULT_RESPONSE_CHECKSUM_VALIDATION,
requestStreamBufferSize: 0,
});
expect(normalizeProvider).toHaveBeenCalledTimes(2);
});
Expand All @@ -33,6 +34,7 @@ describe(resolveFlexibleChecksumsConfig.name, () => {
const mockInput = {
requestChecksumCalculation: RequestChecksumCalculation.WHEN_REQUIRED,
responseChecksumValidation: ResponseChecksumValidation.WHEN_REQUIRED,
requestStreamBufferSize: 0,
};
const resolvedConfig = resolveFlexibleChecksumsConfig(mockInput);
expect(resolvedConfig).toEqual(mockInput);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,29 @@ export interface FlexibleChecksumsInputConfig {
* Determines when checksum validation will be performed on response payloads.
*/
responseChecksumValidation?: ResponseChecksumValidation | Provider<ResponseChecksumValidation>;

/**
* Default 0 (off).
*
* When set to a value greater than or equal to 8192, sets the minimum number
* of bytes to buffer into a chunk when processing input streams
* with chunked encoding (that is, when request checksums are enabled).
* A minimum of 8kb = 8 * 1024 is required, and 64kb or higher is recommended.
*
* See https://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-streaming.html.
*
* This has a slight performance penalty because it must wrap and buffer
* your input stream.
* You do not need to set this value if your stream already flows chunks
* of 8kb or greater.
*/
requestStreamBufferSize?: number | false;
}

/**
 * Resolved (normalized) form of {@link FlexibleChecksumsInputConfig}:
 * scalar-or-provider inputs become providers, and the optional
 * `number | false` buffer size becomes a plain number (0 meaning off).
 */
export interface FlexibleChecksumsResolvedConfig {
  /** Provider for when request payload checksums are calculated. */
  requestChecksumCalculation: Provider<RequestChecksumCalculation>;
  /** Provider for when response payload checksums are validated. */
  responseChecksumValidation: Provider<ResponseChecksumValidation>;
  /** Minimum chunk size in bytes for buffering request streams; 0 disables buffering. */
  requestStreamBufferSize: number;
}

export const resolveFlexibleChecksumsConfig = <T>(
Expand All @@ -35,4 +53,5 @@ export const resolveFlexibleChecksumsConfig = <T>(
responseChecksumValidation: normalizeProvider(
input.responseChecksumValidation ?? DEFAULT_RESPONSE_CHECKSUM_VALIDATION
),
requestStreamBufferSize: Number(input.requestStreamBufferSize ?? 0),
});
20 changes: 20 additions & 0 deletions packages/middleware-sdk-s3/src/check-content-length-header.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -130,4 +130,24 @@ describe("checkContentLengthHeaderMiddleware", () => {

expect(spy).not.toHaveBeenCalled();
});

it("does not warn if uploading a payload of known length via alternate header x-amz-decoded-content-length", async () => {
const handler = checkContentLengthHeader()(mockNextHandler, {});

await handler({
request: {
method: null,
protocol: null,
hostname: null,
path: null,
query: {},
headers: {
"x-amz-decoded-content-length": "5",
},
},
input: {},
});

expect(spy).not.toHaveBeenCalled();
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {
} from "@smithy/types";

const CONTENT_LENGTH_HEADER = "content-length";
const DECODED_CONTENT_LENGTH_HEADER = "x-amz-decoded-content-length";

/**
* @internal
Expand All @@ -28,7 +29,7 @@ export function checkContentLengthHeader(): FinalizeRequestMiddleware<any, any>
const { request } = args;

if (HttpRequest.isInstance(request)) {
if (!(CONTENT_LENGTH_HEADER in request.headers)) {
if (!(CONTENT_LENGTH_HEADER in request.headers) && !(DECODED_CONTENT_LENGTH_HEADER in request.headers)) {
const message = `Are you using a Stream of unknown length as the Body of a PutObject request? Consider using Upload instead from @aws-sdk/lib-storage.`;
if (typeof context?.logger?.warn === "function" && !(context.logger instanceof NoOpLogger)) {
context.logger.warn(message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ export const initializeWithMaximalConfiguration = () => {
requestChecksumCalculation: DEFAULT_REQUEST_CHECKSUM_CALCULATION,
responseChecksumValidation: DEFAULT_RESPONSE_CHECKSUM_VALIDATION,
userAgentAppId: "testApp",
requestStreamBufferSize: 8 * 1024,
};

const s3 = new S3Client(config);
Expand Down
Loading

0 comments on commit 1f1905e

Please sign in to comment.