From 1f1905efeb5b5f9e923ea6f5ff466fe60343f908 Mon Sep 17 00:00:00 2001 From: George Fu Date: Mon, 17 Feb 2025 12:23:34 -0500 Subject: [PATCH] fix(middleware-flexible-checksums): buffer stream chunks to minimum required size (#6882) --- .../src/configuration.ts | 5 + .../src/flexibleChecksumsMiddleware.ts | 20 +- .../middleware-flexible-checksums.e2e.spec.ts | 178 +++++++++++++++++- .../resolveFlexibleChecksumsConfig.spec.ts | 2 + .../src/resolveFlexibleChecksumsConfig.ts | 19 ++ .../src/check-content-length-header.spec.ts | 20 ++ .../src/check-content-length-header.ts | 3 +- .../initializeWithMaximalConfiguration.ts | 1 + supplemental-docs/CLIENTS.md | 25 ++- 9 files changed, 258 insertions(+), 15 deletions(-) diff --git a/packages/middleware-flexible-checksums/src/configuration.ts b/packages/middleware-flexible-checksums/src/configuration.ts index a6a066c74924..e9deff02e06f 100644 --- a/packages/middleware-flexible-checksums/src/configuration.ts +++ b/packages/middleware-flexible-checksums/src/configuration.ts @@ -66,4 +66,9 @@ export interface PreviouslyResolved { * Collects streams into buffers. */ streamCollector: StreamCollector; + + /** + * Minimum bytes from a stream to buffer into a chunk before passing to chunked encoding. + */ + requestStreamBufferSize: number; } diff --git a/packages/middleware-flexible-checksums/src/flexibleChecksumsMiddleware.ts b/packages/middleware-flexible-checksums/src/flexibleChecksumsMiddleware.ts index 5759f42ec60b..2e819631c6c4 100644 --- a/packages/middleware-flexible-checksums/src/flexibleChecksumsMiddleware.ts +++ b/packages/middleware-flexible-checksums/src/flexibleChecksumsMiddleware.ts @@ -9,6 +9,7 @@ import { HandlerExecutionContext, MetadataBearer, } from "@smithy/types"; +import { createBufferedReadable } from "@smithy/util-stream"; import { PreviouslyResolved } from "./configuration"; import { ChecksumAlgorithm, DEFAULT_CHECKSUM_ALGORITHM, RequestChecksumCalculation } from "./constants"; @@ -119,13 +120,18 @@ export const flexibleChecksumsMiddleware = const checksumAlgorithmFn = selectChecksumAlgorithmFunction(checksumAlgorithm, config); if (isStreaming(requestBody)) { const { getAwsChunkedEncodingStream, bodyLengthChecker } = config; - updatedBody = getAwsChunkedEncodingStream(requestBody, { - base64Encoder, - bodyLengthChecker, - checksumLocationName, - checksumAlgorithmFn, - streamHasher, - }); + updatedBody = getAwsChunkedEncodingStream( + typeof config.requestStreamBufferSize === "number" && config.requestStreamBufferSize >= 8 * 1024 + ? createBufferedReadable(requestBody, config.requestStreamBufferSize, context.logger) + : requestBody, + { + base64Encoder, + bodyLengthChecker, + checksumLocationName, + checksumAlgorithmFn, + streamHasher, + } + ); updatedHeaders = { ...headers, "content-encoding": headers["content-encoding"] diff --git a/packages/middleware-flexible-checksums/src/middleware-flexible-checksums.e2e.spec.ts b/packages/middleware-flexible-checksums/src/middleware-flexible-checksums.e2e.spec.ts index 6dc3b719f30e..75c10b60a7dc 100644 --- a/packages/middleware-flexible-checksums/src/middleware-flexible-checksums.e2e.spec.ts +++ b/packages/middleware-flexible-checksums/src/middleware-flexible-checksums.e2e.spec.ts @@ -1,18 +1,41 @@ -import { S3 } from "@aws-sdk/client-s3"; +import { S3, UploadPartCommandOutput } from "@aws-sdk/client-s3"; +import { Upload } from "@aws-sdk/lib-storage"; +import { FetchHttpHandler } from "@smithy/fetch-http-handler"; import type { HttpRequest, HttpResponse } from "@smithy/types"; -import { headStream } from "@smithy/util-stream"; +import { ChecksumStream, headStream } from "@smithy/util-stream"; import { Readable } from "node:stream"; -import { beforeAll, describe, expect, test as it } from "vitest"; +import { beforeAll, describe, expect, test as it, vi } from "vitest"; import { getIntegTestResources } from "../../../tests/e2e/get-integ-test-resources"; describe("S3 checksums", () => { let s3: S3; let s3_noChecksum: S3; + let s3_noRequestBuffer: S3; let Bucket: string; let Key: string; let region: string; const expected = new Uint8Array([97, 98, 99, 100]); + const logger = { + debug: vi.fn(), + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + }; + + function stream(size: number, chunkSize: number) { + async function* generate() { + while (size > 0) { + const z = Math.min(size, chunkSize); + yield "a".repeat(z); + size -= z; + } + } + return Readable.from(generate()); + } + function webStream(size: number, chunkSize: number) { + return Readable.toWeb(stream(size, chunkSize)) as unknown as ReadableStream; + } beforeAll(async () => { const integTestResourcesEnv = await getIntegTestResources(); @@ -21,7 +44,8 @@ describe("S3 checksums", () => { region = process?.env?.AWS_SMOKE_TEST_REGION as string; Bucket = process?.env?.AWS_SMOKE_TEST_BUCKET as string; - s3 = new S3({ region }); + s3 = new S3({ logger, region, requestStreamBufferSize: 8 * 1024 }); + s3_noRequestBuffer = new S3({ logger, region }); s3_noChecksum = new S3({ region, requestChecksumCalculation: "WHEN_REQUIRED", @@ -38,7 +62,7 @@ describe("S3 checksums", () => { expect(reqHeader).toEqual("CRC32"); } if (resHeader) { - expect(resHeader).toEqual("7YLNEQ=="); + expect(resHeader.length).toBeGreaterThanOrEqual(8); } return r; }, @@ -52,10 +76,152 @@ describe("S3 checksums", () => { await s3.putObject({ Bucket, Key, Body: "abcd" }); }); + it("checksums work with empty objects", async () => { + await s3.putObject({ + Bucket, + Key: Key + "empty", + Body: stream(0, 0), + ContentLength: 0, + }); + const get = await s3.getObject({ Bucket, Key: Key + "empty" }); + expect(get.Body).toBeInstanceOf(ChecksumStream); + }); + it("an object should have checksum by default", async () => { - await s3.getObject({ Bucket, Key }); + const get = await s3.getObject({ Bucket, Key }); + expect(get.Body).toBeInstanceOf(ChecksumStream); }); + describe("PUT operations", () => { + it("S3 throws an error if chunks are too small, because request buffering is off by default", async () => { + await s3_noRequestBuffer + .putObject({ + Bucket, + Key: Key + "small-chunks", + Body: stream(24 * 1024, 8), + ContentLength: 24 * 1024, + }) + .catch((e) => { + expect(String(e)).toContain( + "InvalidChunkSizeError: Only the last chunk is allowed to have a size less than 8192 bytes" + ); + }); + expect.hasAssertions(); + }); + it("should assist user input streams by buffering to the minimum 8kb required by S3", async () => { + await s3.putObject({ + Bucket, + Key: Key + "small-chunks", + Body: stream(24 * 1024, 8), + ContentLength: 24 * 1024, + }); + expect(logger.warn).toHaveBeenCalledWith( + `@smithy/util-stream - stream chunk size 8 is below threshold of 8192, automatically buffering.` + ); + const get = await s3.getObject({ + Bucket, + Key: Key + "small-chunks", + }); + expect((await get.Body?.transformToByteArray())?.byteLength).toEqual(24 * 1024); + }); + it("should be able to write an object with a webstream body (using fetch handler without checksum)", async () => { + const handler = s3_noChecksum.config.requestHandler; + s3_noChecksum.config.requestHandler = new FetchHttpHandler(); + await s3_noChecksum.putObject({ + Bucket, + Key: Key + "small-chunks-webstream", + Body: webStream(24 * 1024, 512), + ContentLength: 24 * 1024, + }); + s3_noChecksum.config.requestHandler = handler; + const get = await s3.getObject({ + Bucket, + Key: Key + "small-chunks-webstream", + }); + expect((await get.Body?.transformToByteArray())?.byteLength).toEqual(24 * 1024); + }); + it("@aws-sdk/lib-storage Upload should allow webstreams to be used", async () => { + await new Upload({ + client: s3, + params: { + Bucket, + Key: Key + "small-chunks-webstream-mpu", + Body: webStream(6 * 1024 * 1024, 512), + }, + }).done(); + const get = await s3.getObject({ + Bucket, + Key: Key + "small-chunks-webstream-mpu", + }); + expect((await get.Body?.transformToByteArray())?.byteLength).toEqual(6 * 1024 * 1024); + }); + it("should allow streams to be used in a manually orchestrated MPU", async () => { + const cmpu = await s3.createMultipartUpload({ + Bucket, + Key: Key + "-mpu", + }); + + const MB = 1024 * 1024; + const up = [] as UploadPartCommandOutput[]; + + try { + up.push( + await s3.uploadPart({ + Bucket, + Key: Key + "-mpu", + UploadId: cmpu.UploadId, + Body: stream(5 * MB, 1024), + PartNumber: 1, + ContentLength: 5 * MB, + }), + await s3.uploadPart({ + Bucket, + Key: Key + "-mpu", + UploadId: cmpu.UploadId, + Body: stream(MB, 64), + PartNumber: 2, + ContentLength: MB, + }) + ); + expect(logger.warn).toHaveBeenCalledWith( + `@smithy/util-stream - stream chunk size 1024 is below threshold of 8192, automatically buffering.` + ); + expect(logger.warn).toHaveBeenCalledWith( + `@smithy/util-stream - stream chunk size 64 is below threshold of 8192, automatically buffering.` + ); + + await s3.completeMultipartUpload({ + Bucket, + Key: Key + "-mpu", + UploadId: cmpu.UploadId, + MultipartUpload: { + Parts: up.map((part, i) => { + return { + PartNumber: i + 1, + ETag: part.ETag, + }; + }), + }, + }); + + const go = await s3.getObject({ + Bucket, + Key: Key + "-mpu", + }); + expect((await go.Body?.transformToByteArray())?.byteLength).toEqual(6 * MB); + + expect(go.$metadata.httpStatusCode).toEqual(200); + } catch (e) { + await s3.abortMultipartUpload({ + UploadId: cmpu.UploadId, + Bucket, + Key: Key + "-mpu", + }); + throw e; + } + }); + }, 45_000); + describe("the stream returned by S3::getObject should function interchangeably between ChecksumStream and default streams", () => { it("when collecting the stream", async () => { const defaultStream = (await s3_noChecksum.getObject({ Bucket, Key })).Body as Readable; diff --git a/packages/middleware-flexible-checksums/src/resolveFlexibleChecksumsConfig.spec.ts b/packages/middleware-flexible-checksums/src/resolveFlexibleChecksumsConfig.spec.ts index 06b293d2cbcc..9f340834a480 100644 --- a/packages/middleware-flexible-checksums/src/resolveFlexibleChecksumsConfig.spec.ts +++ b/packages/middleware-flexible-checksums/src/resolveFlexibleChecksumsConfig.spec.ts @@ -25,6 +25,7 @@ describe(resolveFlexibleChecksumsConfig.name, () => { expect(resolvedConfig).toEqual({ requestChecksumCalculation: DEFAULT_REQUEST_CHECKSUM_CALCULATION, responseChecksumValidation: DEFAULT_RESPONSE_CHECKSUM_VALIDATION, + requestStreamBufferSize: 0, }); expect(normalizeProvider).toHaveBeenCalledTimes(2); }); @@ -33,6 +34,7 @@ describe(resolveFlexibleChecksumsConfig.name, () => { const mockInput = { requestChecksumCalculation: RequestChecksumCalculation.WHEN_REQUIRED, responseChecksumValidation: ResponseChecksumValidation.WHEN_REQUIRED, + requestStreamBufferSize: 0, }; const resolvedConfig = resolveFlexibleChecksumsConfig(mockInput); expect(resolvedConfig).toEqual(mockInput); diff --git a/packages/middleware-flexible-checksums/src/resolveFlexibleChecksumsConfig.ts b/packages/middleware-flexible-checksums/src/resolveFlexibleChecksumsConfig.ts index 57cfd22e7ed9..8741f9b69947 100644 --- a/packages/middleware-flexible-checksums/src/resolveFlexibleChecksumsConfig.ts +++ b/packages/middleware-flexible-checksums/src/resolveFlexibleChecksumsConfig.ts @@ -18,11 +18,29 @@ export interface FlexibleChecksumsInputConfig { * Determines when checksum validation will be performed on response payloads. */ responseChecksumValidation?: ResponseChecksumValidation | Provider; + + /** + * Default 0 (off). + * + * When set to a value greater than or equal to 8192, sets the minimum number + * of bytes to buffer into a chunk when processing input streams + * with chunked encoding (that is, when request checksums are enabled). + * A minimum of 8kb = 8 * 1024 is required, and 64kb or higher is recommended. + * + * See https://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-streaming.html. + * + * This has a slight performance penalty because it must wrap and buffer + * your input stream. + * You do not need to set this value if your stream already flows chunks + * of 8kb or greater. + */ + requestStreamBufferSize?: number | false; } export interface FlexibleChecksumsResolvedConfig { requestChecksumCalculation: Provider; responseChecksumValidation: Provider; + requestStreamBufferSize: number; } export const resolveFlexibleChecksumsConfig = ( @@ -35,4 +53,5 @@ export const resolveFlexibleChecksumsConfig = ( responseChecksumValidation: normalizeProvider( input.responseChecksumValidation ?? DEFAULT_RESPONSE_CHECKSUM_VALIDATION ), + requestStreamBufferSize: Number(input.requestStreamBufferSize ?? 0), }); diff --git a/packages/middleware-sdk-s3/src/check-content-length-header.spec.ts b/packages/middleware-sdk-s3/src/check-content-length-header.spec.ts index 6efc3e96dc9a..17945fa08582 100644 --- a/packages/middleware-sdk-s3/src/check-content-length-header.spec.ts +++ b/packages/middleware-sdk-s3/src/check-content-length-header.spec.ts @@ -130,4 +130,24 @@ describe("checkContentLengthHeaderMiddleware", () => { expect(spy).not.toHaveBeenCalled(); }); + + it("does not warn if uploading a payload of known length via alternate header x-amz-decoded-content-length", async () => { + const handler = checkContentLengthHeader()(mockNextHandler, {}); + + await handler({ + request: { + method: null, + protocol: null, + hostname: null, + path: null, + query: {}, + headers: { + "x-amz-decoded-content-length": "5", + }, + }, + input: {}, + }); + + expect(spy).not.toHaveBeenCalled(); + }); }); diff --git a/packages/middleware-sdk-s3/src/check-content-length-header.ts b/packages/middleware-sdk-s3/src/check-content-length-header.ts index ca45e78484bd..4425b4e479c7 100644 --- a/packages/middleware-sdk-s3/src/check-content-length-header.ts +++ b/packages/middleware-sdk-s3/src/check-content-length-header.ts @@ -12,6 +12,7 @@ import { } from "@smithy/types"; const CONTENT_LENGTH_HEADER = "content-length"; +const DECODED_CONTENT_LENGTH_HEADER = "x-amz-decoded-content-length"; /** * @internal @@ -28,7 +29,7 @@ export function checkContentLengthHeader(): FinalizeRequestMiddleware const { request } = args; if (HttpRequest.isInstance(request)) { - if (!(CONTENT_LENGTH_HEADER in request.headers)) { + if (!(CONTENT_LENGTH_HEADER in request.headers) && !(DECODED_CONTENT_LENGTH_HEADER in request.headers)) { const message = `Are you using a Stream of unknown length as the Body of a PutObject request? Consider using Upload instead from @aws-sdk/lib-storage.`; if (typeof context?.logger?.warn === "function" && !(context.logger instanceof NoOpLogger)) { context.logger.warn(message); diff --git a/private/aws-client-api-test/src/client-interface-tests/client-s3/impl/initializeWithMaximalConfiguration.ts b/private/aws-client-api-test/src/client-interface-tests/client-s3/impl/initializeWithMaximalConfiguration.ts index adbf6c675449..9989684a276b 100644 --- a/private/aws-client-api-test/src/client-interface-tests/client-s3/impl/initializeWithMaximalConfiguration.ts +++ b/private/aws-client-api-test/src/client-interface-tests/client-s3/impl/initializeWithMaximalConfiguration.ts @@ -130,6 +130,7 @@ export const initializeWithMaximalConfiguration = () => { requestChecksumCalculation: DEFAULT_REQUEST_CHECKSUM_CALCULATION, responseChecksumValidation: DEFAULT_RESPONSE_CHECKSUM_VALIDATION, userAgentAppId: "testApp", + requestStreamBufferSize: 8 * 1024, }; const s3 = new S3Client(config); diff --git a/supplemental-docs/CLIENTS.md b/supplemental-docs/CLIENTS.md index 66ab91f51a1b..4ebeb739c7ec 100644 --- a/supplemental-docs/CLIENTS.md +++ b/supplemental-docs/CLIENTS.md @@ -753,7 +753,8 @@ See also https://aws.amazon.com/blogs/developer/middleware-stack-modular-aws-sdk ### S3 -`followRegionRedirects`: +#### `followRegionRedirects`: + This feature was previously called the S3 Global Client. Setting this option to true enables failed requests to be retried with a corrected region when receiving a permanent redirect error with status 301. Note that this can result in additional latency owing to the retried request. This feature should only be used as a last resort if you do not know the region of your bucket(s) ahead of time. ```ts @@ -763,6 +764,28 @@ new S3Client({ }); ``` +#### `requestChecksumCalculation` and `responseChecksumValidation`: + +These may be set to `WHEN_REQUIRED` or `WHEN_SUPPORTED`. See https://github.com/aws/aws-sdk-js-v3/issues/6810. + +#### `requestStreamBufferSize`: + +This only comes into play when request checksums are enabled. +When you set a value greater than or equal to 8kb, or 8192 bytes, +your input streams that emit chunks having size less than the value +configured will be buffered until the buffer exceeds the desired size +before continuing to flow. + +See https://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-streaming.html. + +```ts +// example: configuring the S3 client to buffer your +// input streams into chunks of 64kb or higher. +new S3Client({ + requestStreamBufferSize: 64 * 1024, +}); +``` + ### SQS #### Using Queue Names with SQS Client