diff --git a/clients/client-transcribe-streaming/TranscribeStreamingClient.ts b/clients/client-transcribe-streaming/TranscribeStreamingClient.ts index 7b4ea48714e59..8d7955e0d4cb8 100644 --- a/clients/client-transcribe-streaming/TranscribeStreamingClient.ts +++ b/clients/client-transcribe-streaming/TranscribeStreamingClient.ts @@ -34,6 +34,12 @@ import { getRetryPlugin, resolveRetryConfig } from "@aws-sdk/middleware-retry"; +import { + WebSocketInputConfig, + WebSocketResolvedConfig, + getWebSocketPlugin, + resolveWebSocketConfig +} from "@aws-sdk/middleware-sdk-transcribe-streaming"; import { AwsAuthInputConfig, AwsAuthResolvedConfig, @@ -176,6 +182,7 @@ export type TranscribeStreamingClientConfig = Partial< UserAgentInputConfig & HostHeaderInputConfig & EventStreamInputConfig & + WebSocketInputConfig & EventStreamSerdeInputConfig; export type TranscribeStreamingClientResolvedConfig = __SmithyResolvedConfiguration< @@ -189,6 +196,7 @@ export type TranscribeStreamingClientResolvedConfig = __SmithyResolvedConfigurat UserAgentResolvedConfig & HostHeaderResolvedConfig & EventStreamResolvedConfig & + WebSocketResolvedConfig & EventStreamSerdeResolvedConfig; /** @@ -214,14 +222,16 @@ export class TranscribeStreamingClient extends __Client< let _config_5 = resolveUserAgentConfig(_config_4); let _config_6 = resolveHostHeaderConfig(_config_5); let _config_7 = resolveEventStreamConfig(_config_6); - let _config_8 = resolveEventStreamSerdeConfig(_config_7); - super(_config_8); - this.config = _config_8; + let _config_8 = resolveWebSocketConfig(_config_7); + let _config_9 = resolveEventStreamSerdeConfig(_config_8); + super(_config_9); + this.config = _config_9; this.middlewareStack.use(getAwsAuthPlugin(this.config)); this.middlewareStack.use(getRetryPlugin(this.config)); this.middlewareStack.use(getUserAgentPlugin(this.config)); this.middlewareStack.use(getContentLengthPlugin(this.config)); this.middlewareStack.use(getHostHeaderPlugin(this.config)); + this.middlewareStack.use(getWebSocketPlugin(this.config)); } destroy(): void { diff --git a/clients/client-transcribe-streaming/package.json b/clients/client-transcribe-streaming/package.json index 74d26d35cdfea..7349b27afebc0 100644 --- a/clients/client-transcribe-streaming/package.json +++ b/clients/client-transcribe-streaming/package.json @@ -45,6 +45,7 @@ "@aws-sdk/middleware-eventstream": "1.0.0-gamma.0", "@aws-sdk/middleware-host-header": "1.0.0-gamma.1", "@aws-sdk/middleware-retry": "1.0.0-gamma.1", + "@aws-sdk/middleware-sdk-transcribe-streaming": "1.0.0-gamma.0", "@aws-sdk/middleware-serde": "1.0.0-gamma.1", "@aws-sdk/middleware-signing": "1.0.0-gamma.1", "@aws-sdk/middleware-stack": "1.0.0-gamma.1", @@ -82,4 +83,4 @@ "url": "https://aws.amazon.com/javascript/" }, "license": "Apache-2.0" -} \ No newline at end of file +} diff --git a/clients/client-transcribe-streaming/runtimeConfig.browser.ts b/clients/client-transcribe-streaming/runtimeConfig.browser.ts index b73f3636b86a8..e3789eff50709 100644 --- a/clients/client-transcribe-streaming/runtimeConfig.browser.ts +++ b/clients/client-transcribe-streaming/runtimeConfig.browser.ts @@ -1,8 +1,12 @@ import { name, version } from "./package.json"; import { Sha256 } from "@aws-crypto/sha256-browser"; import { eventStreamSerdeProvider } from "@aws-sdk/eventstream-serde-browser"; -import { FetchHttpHandler, streamCollector } from "@aws-sdk/fetch-http-handler"; +import { streamCollector } from "@aws-sdk/fetch-http-handler"; import { invalidFunction } from "@aws-sdk/invalid-dependency"; +import { + WebSocketHandler, + eventStreamPayloadHandler +} from "@aws-sdk/middleware-sdk-transcribe-streaming"; import { parseUrl } from "@aws-sdk/url-parser-browser"; import { fromBase64, toBase64 } from "@aws-sdk/util-base64-browser"; import { calculateBodyLength } from "@aws-sdk/util-body-length-browser"; @@ -19,12 +23,10 @@ export const ClientDefaultValues: Required = { bodyLengthChecker: calculateBodyLength, credentialDefaultProvider: invalidFunction("Credential is missing") as any, defaultUserAgent: defaultUserAgent(name, version), - eventStreamPayloadHandlerProvider: () => ({ - handle: invalidFunction("event stream request is not supported in browser.") - }), + eventStreamPayloadHandlerProvider: () => eventStreamPayloadHandler, eventStreamSerdeProvider, regionDefaultProvider: invalidFunction("Region is missing") as any, - requestHandler: new FetchHttpHandler(), + requestHandler: new WebSocketHandler(), sha256: Sha256, streamCollector, urlParser: parseUrl, diff --git a/clients/client-transcribe-streaming/runtimeConfig.native.ts b/clients/client-transcribe-streaming/runtimeConfig.native.ts index 488a2ec132d67..39e6e9ab9d7c1 100644 --- a/clients/client-transcribe-streaming/runtimeConfig.native.ts +++ b/clients/client-transcribe-streaming/runtimeConfig.native.ts @@ -1,6 +1,9 @@ import { name, version } from "./package.json"; import { Sha256 } from "@aws-crypto/sha256-js"; -import { invalidFunction } from "@aws-sdk/invalid-dependency"; +import { + WebSocketHandler, + eventStreamPayloadHandler +} from "@aws-sdk/middleware-sdk-transcribe-streaming"; import { parseUrl } from "@aws-sdk/url-parser-node"; import { ClientDefaults } from "./TranscribeStreamingClient"; import { ClientDefaultValues as BrowserDefaults } from "./runtimeConfig.browser"; @@ -9,17 +12,8 @@ export const ClientDefaultValues: Required = { ...BrowserDefaults, runtime: "react-native", defaultUserAgent: `aws-sdk-js-v3-react-native-${name}/${version}`, - eventStreamPayloadHandlerProvider: () => ({ - handle: invalidFunction( - "event stream request is not supported in ReactNative." - ) - }), - eventStreamSerdeProvider: () => ({ - serialize: invalidFunction("event stream is not supported in ReactNative."), - deserialize: invalidFunction( - "event stream is not supported in ReactNative." - ) - }), + eventStreamPayloadHandlerProvider: () => eventStreamPayloadHandler, + requestHandler: new WebSocketHandler(), sha256: Sha256, urlParser: parseUrl }; diff --git a/codegen/smithy-aws-typescript-codegen/src/main/java/software/amazon/smithy/aws/typescript/codegen/AddTranscribeStreamingDependency.java b/codegen/smithy-aws-typescript-codegen/src/main/java/software/amazon/smithy/aws/typescript/codegen/AddTranscribeStreamingDependency.java new file mode 100644 index 0000000000000..61139ecf68460 --- /dev/null +++ b/codegen/smithy-aws-typescript-codegen/src/main/java/software/amazon/smithy/aws/typescript/codegen/AddTranscribeStreamingDependency.java @@ -0,0 +1,81 @@ +package software.amazon.smithy.aws.typescript.codegen; + +import static software.amazon.smithy.typescript.codegen.integration.RuntimeClientPlugin.Convention.HAS_CONFIG; +import static software.amazon.smithy.typescript.codegen.integration.RuntimeClientPlugin.Convention.HAS_MIDDLEWARE; + + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.function.Consumer; +import software.amazon.smithy.aws.traits.ServiceTrait; +import software.amazon.smithy.codegen.core.SymbolProvider; +import software.amazon.smithy.model.Model; +import software.amazon.smithy.model.shapes.ServiceShape; +import software.amazon.smithy.typescript.codegen.LanguageTarget; +import software.amazon.smithy.typescript.codegen.TypeScriptDependency; +import software.amazon.smithy.typescript.codegen.TypeScriptSettings; +import software.amazon.smithy.typescript.codegen.TypeScriptWriter; +import software.amazon.smithy.typescript.codegen.integration.RuntimeClientPlugin; +import software.amazon.smithy.typescript.codegen.integration.TypeScriptIntegration; +import software.amazon.smithy.utils.ListUtils; +import software.amazon.smithy.utils.MapUtils; + +/** + * Add client plugins and configs to support WebSocket streaming for Transcribe + * Streaming service + * */ +public class AddTranscribeStreamingDependency implements TypeScriptIntegration { + @Override + public List getClientPlugins() { + return ListUtils.of( + RuntimeClientPlugin.builder() + .withConventions(AwsDependency.TRANSCRIBE_STREAMING_MIDDLEWARE.dependency, + "WebSocket") + .servicePredicate((m, s) -> AddTranscribeStreamingDependency.isTranscribeStreaming(s)) + .build() + ); + } + + @Override + public Map> getRuntimeConfigWriters( + TypeScriptSettings settings, + Model model, + SymbolProvider symbolProvider, + LanguageTarget target + ) { + ServiceShape service = settings.getService(model); + if (!isTranscribeStreaming(service)) { + return Collections.emptyMap(); + } + + Map> transcribeStreamingHandlerConfig = MapUtils.of( + "eventStreamPayloadHandlerProvider", writer -> { + writer.addDependency(AwsDependency.TRANSCRIBE_STREAMING_MIDDLEWARE); + writer.addImport("eventStreamPayloadHandler", "eventStreamPayloadHandler", + AwsDependency.TRANSCRIBE_STREAMING_MIDDLEWARE.packageName); + writer.write("eventStreamPayloadHandlerProvider: () => eventStreamPayloadHandler,"); + }, + "requestHandler", writer -> { + writer.addDependency(AwsDependency.TRANSCRIBE_STREAMING_MIDDLEWARE); + writer.addImport("WebSocketHandler", "WebSocketHandler", + AwsDependency.TRANSCRIBE_STREAMING_MIDDLEWARE.packageName); + writer.write("requestHandler: new WebSocketHandler(),"); + }); + + switch (target) { + case REACT_NATIVE: + case BROWSER: + return transcribeStreamingHandlerConfig; + default: + return Collections.emptyMap(); + } + } + + private static boolean isTranscribeStreaming(ServiceShape service) { + String serviceId = service.getTrait(ServiceTrait.class).map(ServiceTrait::getSdkId).orElse(""); + return serviceId.equals("Transcribe Streaming"); + } +} + + diff --git a/codegen/smithy-aws-typescript-codegen/src/main/java/software/amazon/smithy/aws/typescript/codegen/AwsDependency.java b/codegen/smithy-aws-typescript-codegen/src/main/java/software/amazon/smithy/aws/typescript/codegen/AwsDependency.java index e769f6ef80bd7..40ef346bc5c9d 100644 --- a/codegen/smithy-aws-typescript-codegen/src/main/java/software/amazon/smithy/aws/typescript/codegen/AwsDependency.java +++ b/codegen/smithy-aws-typescript-codegen/src/main/java/software/amazon/smithy/aws/typescript/codegen/AwsDependency.java @@ -57,7 +57,8 @@ public enum AwsDependency implements SymbolDependencyContainer { UUID_GENERATOR(NORMAL_DEPENDENCY, "uuid", "^7.0.0"), UUID_GENERATOR_TYPES(DEV_DEPENDENCY, "@types/uuid", "^7.0.0"), MIDDLEWARE_EVENTSTREAM(NORMAL_DEPENDENCY, "@aws-sdk/middleware-eventstream", "^1.0.0-beta.0"), - AWS_SDK_EVENTSTREAM_HANDLER_NODE(NORMAL_DEPENDENCY, "@aws-sdk/eventstream-handler-node", "^1.0.0-beta.0"); + AWS_SDK_EVENTSTREAM_HANDLER_NODE(NORMAL_DEPENDENCY, "@aws-sdk/eventstream-handler-node", "^1.0.0-beta.0"), + TRANSCRIBE_STREAMING_MIDDLEWARE(NORMAL_DEPENDENCY, "@aws-sdk/middleware-sdk-transcribe-streaming", "^1.0.0-gamma.0"); public final String packageName; public final String version; diff --git a/codegen/smithy-aws-typescript-codegen/src/main/resources/META-INF/services/software.amazon.smithy.typescript.codegen.integration.TypeScriptIntegration b/codegen/smithy-aws-typescript-codegen/src/main/resources/META-INF/services/software.amazon.smithy.typescript.codegen.integration.TypeScriptIntegration index 8f0548e82e881..63298334be212 100644 --- a/codegen/smithy-aws-typescript-codegen/src/main/resources/META-INF/services/software.amazon.smithy.typescript.codegen.integration.TypeScriptIntegration +++ b/codegen/smithy-aws-typescript-codegen/src/main/resources/META-INF/services/software.amazon.smithy.typescript.codegen.integration.TypeScriptIntegration @@ -10,3 +10,4 @@ software.amazon.smithy.aws.typescript.codegen.AddBodyChecksumGeneratorDependency software.amazon.smithy.aws.typescript.codegen.AddS3Config software.amazon.smithy.aws.typescript.codegen.AddEventStreamHandlingDependency software.amazon.smithy.aws.typescript.codegen.AddHttp2Dependency +software.amazon.smithy.aws.typescript.codegen.AddTranscribeStreamingDependency diff --git a/packages/eventstream-serde-browser/package.json b/packages/eventstream-serde-browser/package.json index 1aa96ada56c4b..4cde63709bab7 100644 --- a/packages/eventstream-serde-browser/package.json +++ b/packages/eventstream-serde-browser/package.json @@ -16,6 +16,7 @@ "license": "Apache-2.0", "dependencies": { "@aws-sdk/eventstream-marshaller": "1.0.0-gamma.1", + "@aws-sdk/eventstream-serde-universal": "1.0.0-gamma.0", "@aws-sdk/types": "1.0.0-gamma.1", "tslib": "^1.8.0" }, diff --git a/packages/eventstream-serde-browser/src/EventStreamMarshaller.ts b/packages/eventstream-serde-browser/src/EventStreamMarshaller.ts index 861af806f527c..a9214f77b4ba2 100644 --- a/packages/eventstream-serde-browser/src/EventStreamMarshaller.ts +++ b/packages/eventstream-serde-browser/src/EventStreamMarshaller.ts @@ -1,3 +1,4 @@ +import { EventStreamMarshaller as UniversalEventStreamMarshaller } from "@aws-sdk/eventstream-serde-universal"; import { EventStreamMarshaller as EventMarshaller } from "@aws-sdk/eventstream-marshaller"; import { Encoder, @@ -5,10 +6,7 @@ import { Message, EventStreamMarshaller as IEventStreamMarshaller } from "@aws-sdk/types"; -import { ReadableStreamtoIterable } from "./utils"; -import { getChunkedStream } from "./getChunkedStream"; -import { getEventMessageStream } from "./getEventMessageStream"; -import { getDeserializingStream } from "./getDeserializingStream"; +import { readableStreamtoIterable, iterableToReadableStream } from "./utils"; export interface EventStreamMarshaller extends IEventStreamMarshaller {} @@ -17,35 +15,48 @@ export interface EventStreamMarshallerOptions { utf8Decoder: Decoder; } +/** + * Utility class used to serialize and deserialize event streams in + * browsers and ReactNative. + * + * In browsers where ReadableStream API is available: + * * deserialize from ReadableStream to an async iterable of output structure + * * serialize from async iterable of input structure to ReadableStream + * In ReactNative where only async iterable API is available: + * * deserialize from async iterable of binaries to async iterable of output structure + * * serialize from async iterable of input structure to async iterable of binaries + * + * We use ReadableStream API in browsers because of the consistency with other + * streaming operations, where ReadableStream API is used to denote streaming data. + * Whereas in ReactNative, ReadableStream API is not available, we use async iterable + * for streaming data although it has lower throughput. + */ export class EventStreamMarshaller { private readonly eventMarshaller: EventMarshaller; + private readonly universalMarshaller: UniversalEventStreamMarshaller; constructor({ utf8Encoder, utf8Decoder }: EventStreamMarshallerOptions) { this.eventMarshaller = new EventMarshaller(utf8Encoder, utf8Decoder); + this.universalMarshaller = new UniversalEventStreamMarshaller({ + utf8Decoder, + utf8Encoder + }); } deserialize( - body: ReadableStream, + body: ReadableStream | AsyncIterable, deserializer: (input: { [event: string]: Message }) => Promise ): AsyncIterable { - const chunkedStream = getChunkedStream(body); - const messageStream = getEventMessageStream( - chunkedStream, - this.eventMarshaller - ); - const deserialingStream = getDeserializingStream( - messageStream, - deserializer - ); - return ReadableStreamtoIterable(deserialingStream); + const bodyIterable = isReadableStream(body) + ? readableStreamtoIterable(body) + : body; + return this.universalMarshaller.deserialize(bodyIterable, deserializer); } /** - * Generate a ReadableStream that serialize events - * to event stream binary chunks; Use a pull stream - * here to support low connection speed. + * Generate a stream that serialize events into stream of binary chunks; * - * This doesn't work on browser currently because - * browser doesn't support upload streaming. + * Caveat is that streaming request payload doesn't work on browser with native + * xhr or fetch handler currently because they don't support upload streaming. * reference: * * https://bugs.chromium.org/p/chromium/issues/detail?id=688906 * * https://bugzilla.mozilla.org/show_bug.cgi?id=1387483 @@ -54,8 +65,16 @@ export class EventStreamMarshaller { serialize( input: AsyncIterable, serializer: (event: T) => Message - ): ReadableStream { - throw new Error(`event stream request in browser is not supported -Reference: https://bugs.chromium.org/p/chromium/issues/detail?id=688906`); + ): ReadableStream | AsyncIterable { + const serialziedIterable = this.universalMarshaller.serialize( + input, + serializer + ); + return typeof ReadableStream === "function" + ? iterableToReadableStream(serialziedIterable) + : serialziedIterable; } } + +const isReadableStream = (body: any): body is ReadableStream => + typeof ReadableStream === "function" && body instanceof ReadableStream; diff --git a/packages/eventstream-serde-browser/src/getChunkedStream.ts b/packages/eventstream-serde-browser/src/getChunkedStream.ts deleted file mode 100644 index 45f1a0c08feb9..0000000000000 --- a/packages/eventstream-serde-browser/src/getChunkedStream.ts +++ /dev/null @@ -1,111 +0,0 @@ -export function getChunkedStream( - source: ReadableStream -): ReadableStream { - const sourceReader = source.getReader(); - let currentMessageTotalLength = 0; - let currentMessagePendingLength = 0; - let currentMessage: Uint8Array | null = null; - let messageLengthBuffer: Uint8Array | null = null; - const allocateMessage = function (size: number) { - if (typeof size !== "number") { - throw new Error( - "Attempted to allocate an event message where size was not a number: " + - size - ); - } - currentMessageTotalLength = size; - currentMessagePendingLength = 4; - currentMessage = new Uint8Array(size); - const currentMessageView = new DataView(currentMessage.buffer); - currentMessageView.setUint32(0, size, false); //set big-endian Uint32 to 0~3 bytes - }; - - const chunkedStream = new ReadableStream({ - start(controller) { - function push() { - return sourceReader.read().then(({ done, value }) => { - if (done) { - if (currentMessageTotalLength) { - if (currentMessageTotalLength === currentMessagePendingLength) { - controller.enqueue(currentMessage); - } else { - throw new Error("Truncated event message received."); - } - } - controller.close(); - return; - } - - // @ts-ignore error TS2532: Object is possibly 'undefined' for value - const chunkLength = value.length; - let currentOffset = 0; - - while (currentOffset < chunkLength) { - // create new message if necessary - if (!currentMessage) { - // working on a new message, determine total length - const bytesRemaining = chunkLength - currentOffset; - // prevent edge case where total length spans 2 chunks - if (!messageLengthBuffer) { - messageLengthBuffer = new Uint8Array(4); - } - const numBytesForTotal = Math.min( - 4 - currentMessagePendingLength, // remaining bytes to fill the messageLengthBuffer - bytesRemaining // bytes left in chunk - ); - - messageLengthBuffer.set( - // @ts-ignore error TS2532: Object is possibly 'undefined' for value - value.slice(currentOffset, currentOffset + numBytesForTotal), - currentMessagePendingLength - ); - - currentMessagePendingLength += numBytesForTotal; - currentOffset += numBytesForTotal; - - if (currentMessagePendingLength < 4) { - // not enough information to create the current message - break; - } - allocateMessage( - new DataView(messageLengthBuffer.buffer).getUint32(0, false) - ); - messageLengthBuffer = null; - } - - // write data into current message - const numBytesToWrite = Math.min( - currentMessageTotalLength - currentMessagePendingLength, // number of bytes left to complete message - chunkLength - currentOffset // number of bytes left in the original chunk - ); - currentMessage!.set( - // @ts-ignore error TS2532: Object is possibly 'undefined' for value - value.slice(currentOffset, currentOffset + numBytesToWrite), - currentMessagePendingLength - ); - currentMessagePendingLength += numBytesToWrite; - currentOffset += numBytesToWrite; - - // check if a message is ready to be pushed - if ( - currentMessageTotalLength && - currentMessageTotalLength === currentMessagePendingLength - ) { - // push out the message - controller.enqueue(currentMessage); - // cleanup - currentMessage = null; - currentMessageTotalLength = 0; - currentMessagePendingLength = 0; - } - } - push(); - }); - } - - push(); - } - }); - - return chunkedStream; -} diff --git a/packages/eventstream-serde-browser/src/getDeserializingStream.ts b/packages/eventstream-serde-browser/src/getDeserializingStream.ts deleted file mode 100644 index 78c050f25eaf4..0000000000000 --- a/packages/eventstream-serde-browser/src/getDeserializingStream.ts +++ /dev/null @@ -1,30 +0,0 @@ -import { Message } from "@aws-sdk/types"; - -export function getDeserializingStream( - messageStream: ReadableStream<{ [name: string]: Message }>, - deserializer: (input: any) => any -): ReadableStream<{ [name: string]: any }> { - const messageReader = messageStream.getReader(); - const deserializedStream = new ReadableStream<{ [name: string]: any }>({ - start(controller) { - function push() { - messageReader.read().then(async ({ done, value }) => { - if (done) { - controller.close(); - return; - } - - try { - controller.enqueue(await deserializer(value)); - push(); - } catch (e) { - controller.error(e); - } - }); - } - - push(); - } - }); - return deserializedStream; -} diff --git a/packages/eventstream-serde-browser/src/getEventMessageStream.ts b/packages/eventstream-serde-browser/src/getEventMessageStream.ts deleted file mode 100644 index 13a0bbd236ba4..0000000000000 --- a/packages/eventstream-serde-browser/src/getEventMessageStream.ts +++ /dev/null @@ -1,56 +0,0 @@ -import { EventStreamMarshaller as EventMarshaller } from "@aws-sdk/eventstream-marshaller"; -import { Message } from "@aws-sdk/types"; - -export function getEventMessageStream( - chunkedStream: ReadableStream, - eventMarshaller: EventMarshaller -): ReadableStream<{ [name: string]: Message }> { - const chunkReader = chunkedStream.getReader(); - const messageStream = new ReadableStream<{ [name: string]: Message }>({ - start(controller) { - function push() { - chunkReader.read().then(({ done, value }) => { - if (done) { - controller.close(); - return; - } - - // @ts-ignore: error TS2345: Argument of type 'Uint8Array | undefined' - // is not assignable to parameter of type 'ArrayBufferView' - const message = eventMarshaller.unmarshall(value); - const { value: messageType } = message.headers[":message-type"]; - if (messageType === "error") { - // Unmodeled exception in event - const unmodeledError = new Error( - (message.headers[":error-message"].value as string) || - "UnknownError" - ); - unmodeledError.name = message.headers[":error-code"] - .value as string; - controller.error(unmodeledError); - } else if (messageType === "exception") { - // throw this.exceptionsDeserializer(message); - controller.enqueue({ - [message.headers[":exception-type"].value as string]: message - }); - } else if (messageType === "event") { - controller.enqueue({ - [message.headers[":event-type"].value as string]: message - }); - } else { - controller.error( - new Error( - `Unrecognizable event type: ${message.headers[":event-type"].value}` - ) - ); - } - push(); - }); - } - - push(); - } - }); - - return messageStream; -} diff --git a/packages/eventstream-serde-browser/src/index.ts b/packages/eventstream-serde-browser/src/index.ts index 1a8f850f5f8ac..06e0068336085 100644 --- a/packages/eventstream-serde-browser/src/index.ts +++ b/packages/eventstream-serde-browser/src/index.ts @@ -1 +1,3 @@ export * from "./provider"; +export * from "./EventStreamMarshaller"; +export * from "./utils"; diff --git a/packages/eventstream-serde-browser/src/utils.ts b/packages/eventstream-serde-browser/src/utils.ts index d261672d7c728..53b1d18b71635 100644 --- a/packages/eventstream-serde-browser/src/utils.ts +++ b/packages/eventstream-serde-browser/src/utils.ts @@ -1,19 +1,38 @@ /** - * Convert ReadableStream into an async iterable. + * A util function converting ReadableStream into an async iterable. + * Reference: https://jakearchibald.com/2017/async-iterators-and-generators/#making-streams-iterate */ -export async function* ReadableStreamtoIterable( - readableStream: ReadableStream -): AsyncIterable { - const reader = readableStream.getReader(); - let done = false; - while (!done) { - const { done: end, value } = await reader.read(); - if (end) { - done = true; - break; - } - if (value) { - yield value; +export const readableStreamtoIterable = ( + readableStream: ReadableStream +): AsyncIterable => ({ + [Symbol.asyncIterator]: async function* () { + const reader = readableStream.getReader(); + try { + while (true) { + const { done, value } = await reader.read(); + if (done) return; + yield value as T; + } + } finally { + reader.releaseLock(); } } -} +}); + +/** + * A util function converting async iterable to a ReadableStream. + */ +export const iterableToReadableStream = ( + asyncIterable: AsyncIterable +): ReadableStream => { + const iterator = asyncIterable[Symbol.asyncIterator](); + return new ReadableStream({ + async pull(controller) { + const { done, value } = await iterator.next(); + if (done) { + return controller.close(); + } + controller.enqueue(value); + } + }); +}; diff --git a/packages/middleware-sdk-transcribe-streaming/.gitignore b/packages/middleware-sdk-transcribe-streaming/.gitignore new file mode 100644 index 0000000000000..f30a2ca2cef55 --- /dev/null +++ b/packages/middleware-sdk-transcribe-streaming/.gitignore @@ -0,0 +1,9 @@ +/node_modules/ +/build/ +/dist/ +/coverage/ +/docs/ +*.tsbuildinfo +*.tgz +*.log +package-lock.json diff --git a/packages/middleware-sdk-transcribe-streaming/.npmignore b/packages/middleware-sdk-transcribe-streaming/.npmignore new file mode 100644 index 0000000000000..4b9fe3abf33a6 --- /dev/null +++ b/packages/middleware-sdk-transcribe-streaming/.npmignore @@ -0,0 +1,13 @@ +/src/ +/coverage/ +/docs/ +tsconfig.test.json +*.tsbuildinfo + +*.spec.js +*.spec.d.ts +*.spec.js.map + +*.fixture.js +*.fixture.d.ts +*.fixture.js.map diff --git a/packages/middleware-sdk-transcribe-streaming/LICENSE b/packages/middleware-sdk-transcribe-streaming/LICENSE new file mode 100644 index 0000000000000..e907b58668da3 --- /dev/null +++ b/packages/middleware-sdk-transcribe-streaming/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "{}" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/packages/middleware-sdk-transcribe-streaming/README.md b/packages/middleware-sdk-transcribe-streaming/README.md new file mode 100644 index 0000000000000..6aa9abcacdc27 --- /dev/null +++ b/packages/middleware-sdk-transcribe-streaming/README.md @@ -0,0 +1,4 @@ +# @aws-sdk/@aws-sdk/middleware-sdk-transcribe-streaming + +[![NPM version](https://img.shields.io/npm/v/@aws-sdk/@aws-sdk/middleware-sdk-transcribe-streaming/beta.svg)](https://www.npmjs.com/package/@aws-sdk/@aws-sdk/middleware-sdk-transcribe-streaming) +[![NPM downloads](https://img.shields.io/npm/dm/@aws-sdk/@aws-sdk/middleware-sdk-transcribe-streaming.svg)](https://www.npmjs.com/package/@aws-sdk/@aws-sdk/middleware-sdk-transcribe-streaming) diff --git a/packages/middleware-sdk-transcribe-streaming/jest.config.js b/packages/middleware-sdk-transcribe-streaming/jest.config.js new file mode 100644 index 0000000000000..e9c9f9642d5f5 --- /dev/null +++ b/packages/middleware-sdk-transcribe-streaming/jest.config.js @@ -0,0 +1,7 @@ +const base = require("../../jest.config.base.js"); + +module.exports = { + ...base, + //only test cjs dist, avoid testing the package twice + testPathIgnorePatterns: ["/node_modules/", "/es/"] +}; diff --git a/packages/middleware-sdk-transcribe-streaming/package.json b/packages/middleware-sdk-transcribe-streaming/package.json new file mode 100644 index 0000000000000..e31d35991f9cb --- /dev/null +++ b/packages/middleware-sdk-transcribe-streaming/package.json @@ -0,0 +1,38 @@ +{ + "name": "@aws-sdk/middleware-sdk-transcribe-streaming", + "version": "1.0.0-gamma.0", + "main": "./dist/cjs/index.js", + "module": "./dist/es/index.js", + "scripts": { + "build:cjs": "tsc -p tsconfig.json", + "build:es": "tsc -p tsconfig.es.json", + "build": "yarn build:es && yarn build:cjs", + "prepublishOnly": "yarn build", + "pretest": "yarn build", + "test": "jest --passWithNoTests" + }, + "types": "./dist/cjs/index.d.ts", + "author": { + "name": "AWS SDK for JavaScript Team", + "url": "https://aws.amazon.com/javascript/" + }, + "license": "Apache-2.0", + "dependencies": { + "@aws-sdk/eventstream-serde-browser": "1.0.0-gamma.1", + "@aws-sdk/middleware-signing": "1.0.0-gamma.1", + "@aws-sdk/protocol-http": "1.0.0-gamma.1", + "@aws-sdk/signature-v4": "1.0.0-gamma.1", + "@aws-sdk/types": "1.0.0-gamma.1", + "@aws-sdk/util-format-url": "1.0.0-gamma.1", + "react-native-get-random-values": "^1.4.0", + "tslib": "^1.8.0", + "uuid": "^8.0.0" + }, + "devDependencies": { + "@types/jest": "^25.1.4", + "jest": "^25.1.0", + "jest-websocket-mock": "^2.0.2", + "mock-socket": "^9.0.3", + "typescript": "~3.8.3" + } +} diff --git a/packages/middleware-sdk-transcribe-streaming/src/configuration.ts b/packages/middleware-sdk-transcribe-streaming/src/configuration.ts new file mode 100644 index 0000000000000..ac876c6f35889 --- /dev/null +++ b/packages/middleware-sdk-transcribe-streaming/src/configuration.ts @@ -0,0 +1,38 @@ +import { Provider, RequestSigner, RequestHandler } from "@aws-sdk/types"; +import { SignatureV4 as BaseSignatureV4 } from "@aws-sdk/signature-v4"; +import { SignatureV4 } from "./signer"; + +export interface WebSocketInputConfig {} + +interface PreviouslyResolved { + signer: Provider; + requestHandler: RequestHandler; +} + +export interface WebSocketResolvedConfig { + signer: Provider; + requestHandler: RequestHandler; +} + +export const resolveWebSocketConfig = ( + input: T & WebSocketInputConfig & PreviouslyResolved +): T & WebSocketResolvedConfig => + input.requestHandler.metadata?.handlerProtocol !== "websocket" + ? input + : { + ...input, + signer: async () => { + const signerObj = await input.signer(); + if (validateSigner(signerObj)) { + return new SignatureV4({ signer: signerObj }); + } + throw new Error( + "Expected SignatureV4 signer, please check the client constructor." + ); + } + }; + +const validateSigner = (signer: any): signer is BaseSignatureV4 => + // We cannot use instanceof here. Because we might import the wrong SignatureV4 + // constructor here as multiple version of packages maybe installed here. + (signer.constructor.toString() as string).indexOf("SignatureV4") >= 0; diff --git a/packages/middleware-sdk-transcribe-streaming/src/eventstream-handler.ts b/packages/middleware-sdk-transcribe-streaming/src/eventstream-handler.ts new file mode 100644 index 0000000000000..fdf60adc5aae2 --- /dev/null +++ b/packages/middleware-sdk-transcribe-streaming/src/eventstream-handler.ts @@ -0,0 +1,10 @@ +import { + EventStreamPayloadHandler as IEventStreamPayloadHandler, + BuildHandler, + BuildHandlerArguments +} from "@aws-sdk/types"; + +export const eventStreamPayloadHandler: IEventStreamPayloadHandler = { + handle: (next: BuildHandler, args: BuildHandlerArguments) => + next(args) +}; diff --git a/packages/middleware-sdk-transcribe-streaming/src/index.native.ts b/packages/middleware-sdk-transcribe-streaming/src/index.native.ts new file mode 100644 index 0000000000000..ca4a5e1bd1ef3 --- /dev/null +++ b/packages/middleware-sdk-transcribe-streaming/src/index.native.ts @@ -0,0 +1,9 @@ +//reference: https://github.com/uuidjs/uuid#getrandomvalues-not-supported +import "react-native-get-random-values"; +export * from "./websocket-handler"; +export * from "./eventstream-handler"; +export * from "./signer"; +export * from "./configuration"; +export * from "./middleware-endpoint"; +export * from "./middleware-session-id"; +export * from "./plugin"; diff --git a/packages/middleware-sdk-transcribe-streaming/src/index.ts b/packages/middleware-sdk-transcribe-streaming/src/index.ts new file mode 100644 index 0000000000000..e2b9dcf968a4b --- /dev/null +++ b/packages/middleware-sdk-transcribe-streaming/src/index.ts @@ -0,0 +1,7 @@ +export * from "./websocket-handler"; +export * from "./eventstream-handler"; +export * from "./signer"; +export * from "./configuration"; +export * from "./middleware-endpoint"; +export * from "./middleware-session-id"; +export * from "./plugin"; diff --git a/packages/middleware-sdk-transcribe-streaming/src/middleware-endpoint.spec.ts b/packages/middleware-sdk-transcribe-streaming/src/middleware-endpoint.spec.ts new file mode 100644 index 0000000000000..9fecfdfaee619 --- /dev/null +++ b/packages/middleware-sdk-transcribe-streaming/src/middleware-endpoint.spec.ts @@ -0,0 +1,117 @@ +import { RequestHandler, BuildHandlerArguments } from "@aws-sdk/types"; +import { websocketURLMiddleware } from "./middleware-endpoint"; +import { HttpRequest } from "@aws-sdk/protocol-http"; + +describe("websocketURLMiddleware", () => { + const mockHandler: RequestHandler = { + metadata: { handlerProtocol: "websocket" }, + handle: () => ({} as any) + }; + it("should skip non-http request", done => { + const nonHttpRequest = { + foo: "bar" + }; + const next = (args: BuildHandlerArguments) => { + expect(args.request).toEqual(nonHttpRequest); + done(); + }; + const mw = websocketURLMiddleware({ requestHandler: mockHandler }); + mw(next as any, {} as any)({ request: nonHttpRequest, input: {} }); + }); + + it("should skip non WebSocket requests", done => { + const mockHandler: RequestHandler = { + metadata: { handlerProtocol: "some_protocol" }, + handle: () => ({} as any) + }; + const request = new HttpRequest({}); + const next = (args: BuildHandlerArguments) => { + expect(args.request).toEqual(request); + done(); + }; + const mw = websocketURLMiddleware({ requestHandler: mockHandler }); + mw(next as any, {} as any)({ request, input: {} }); + }); + + it("should update endpoint to websocket url", done => { + const request = new HttpRequest({ + protocol: "https:", + hostname: "transcribestreaming.us-east-1.amazonaws.com", + path: "/stream-transcription", + method: "POST" + }); + const next = (args: BuildHandlerArguments) => { + expect(HttpRequest.isInstance(args.request)).toBeTruthy(); + const processed = args.request as HttpRequest; + expect(processed.protocol).toEqual("wss:"); + expect(processed.hostname).toEqual( + "transcribestreaming.us-east-1.amazonaws.com:8443" + ); + expect(processed.path).toEqual("/stream-transcription-websocket"); + expect(processed.method).toEqual("GET"); + done(); + }; + const mw = websocketURLMiddleware({ requestHandler: mockHandler }); + mw(next as any, {} as any)({ request, input: {} }); + }); + + it("should remove content-type and sha256 hash header", done => { + const request = new HttpRequest({ + headers: { + "content-type": "application/vnd.amazon.eventstream", + "Content-Type": "application/vnd.amazon.eventstream", + "x-amz-content-sha256": "STREAMING-AWS4-HMAC-SHA256-EVENTS", + "X-Amz-Content-Sha256": "STREAMING-AWS4-HMAC-SHA256-EVENTS" + } + }); + const next = (args: BuildHandlerArguments) => { + expect(HttpRequest.isInstance(args.request)).toBeTruthy(); + const processed = args.request as HttpRequest; + expect(processed.headers["content-type"]).toBeUndefined(); + expect(processed.headers["Content-Type"]).toBeUndefined(); + expect(processed.headers["x-amz-content-sha256"]).toBeUndefined(); + expect(processed.headers["X-Amz-Content-Sha256"]).toBeUndefined(); + done(); + }; + const mw = websocketURLMiddleware({ requestHandler: mockHandler }); + mw(next as any, {} as any)({ request, input: {} }); + }); + + it("should contains host header after adjustment", done => { + const request = new HttpRequest({}); + const next = (args: BuildHandlerArguments) => { + expect(HttpRequest.isInstance(args.request)).toBeTruthy(); + const processed = args.request as HttpRequest; + expect(processed.headers["host"]).toBeDefined(); + done(); + }; + const mw = websocketURLMiddleware({ requestHandler: mockHandler }); + mw(next as any, {} as any)({ request, input: {} }); + }); + + it("should move API parameters from headers to query", done => { + const request = new HttpRequest({ + headers: { + "x-amzn-transcribe-language-code": "en-US", + "x-amzn-transcribe-media-encoding": "pmc", + "x-amzn-transcribe-session-id": "123", + "x-amzn-transcribe-vocabulary-name": "abc", + "x-amzn-transcribe-sample-rate": "44100" + } + }); + const next = (args: BuildHandlerArguments) => { + expect(HttpRequest.isInstance(args.request)).toBeTruthy(); + const processed = args.request as HttpRequest; + expect(processed.query).toEqual({ + "language-code": "en-US", + "media-encoding": "pmc", + "session-id": "123", + "vocabulary-name": "abc", + "sample-rate": "44100" + }); + done(); + }; + const mw = websocketURLMiddleware({ requestHandler: mockHandler }); + mw(next as any, {} as any)({ request, input: {} }); + }); +}); diff --git a/packages/middleware-sdk-transcribe-streaming/src/middleware-endpoint.ts b/packages/middleware-sdk-transcribe-streaming/src/middleware-endpoint.ts new file mode 100644 index 0000000000000..21ecd40c2336c --- /dev/null +++ b/packages/middleware-sdk-transcribe-streaming/src/middleware-endpoint.ts @@ -0,0 +1,69 @@ +import { + BuildMiddleware, + BuildHandler, + BuildHandlerArguments, + RequestHandler, + BuildHandlerOptions, + RelativeLocation +} from "@aws-sdk/types"; +import { HttpRequest } from "@aws-sdk/protocol-http"; + +/** + * Middleware that generates WebSocket URL to TranscribeStreaming service + * Reference: https://docs.aws.amazon.com/transcribe/latest/dg/websocket.html + */ +export const websocketURLMiddleware = (options: { + requestHandler: RequestHandler; +}): BuildMiddleware => (next: BuildHandler) => ( + args: BuildHandlerArguments +) => { + const { request } = args; + if ( + HttpRequest.isInstance(request) && + options.requestHandler.metadata?.handlerProtocol === "websocket" + ) { + // Update http/2 endpoint to WebSocket-specific endpoint. + request.protocol = "wss:"; + // Append port to hostname because it needs to be signed together + request.hostname = `${request.hostname}:8443`; + request.path = `${request.path}-websocket`; + request.method = "GET"; + + // Move headers to query string. Because the signature is generated with + // headers moved to query, the endpoint url needs to tally with the signature. + const { headers } = request; + + // 'Content-Type' and 'x-amz-content-sha256' headers are normally set for + // event stream, but WebSocket doesn't require it. + // See: 'eventStreamHeaderMiddleware' in @aws-sdk/middleware-eventstream + delete headers["Content-Type"]; + delete headers["x-amz-content-sha256"]; + + // Serialized header like 'x-amzn-transcribe-sample-rate' should be 'sample-rate' + // in WebSocket URL. + for (const name of Object.keys(headers)) { + if (name.indexOf("x-amzn-transcribe-") === 0) { + const chunkedName = name.replace("x-amzn-transcribe-", ""); + request.query[chunkedName] = headers[name]; + } + } + + // The service perfers Node.js style 'user-agent' over browser-style + // 'x-amz-user-agent' + if (headers["x-amz-user-agent"]) { + request.query["user-agent"] = headers["x-amz-user-agent"]; + } + // Host header is required for signing + request.headers = { host: request.hostname }; + } + return next(args); +}; + +export const websocketURLMiddlewareOptions: BuildHandlerOptions & + RelativeLocation = { + step: "build", + name: "websocketURLMiddleware", + tags: ["WEBSOCKET", "EVENT_STREAM"], + relation: "after", + toMiddleware: "eventStreamHeaderMiddleware" +}; diff --git a/packages/middleware-sdk-transcribe-streaming/src/middleware-session-id.ts b/packages/middleware-sdk-transcribe-streaming/src/middleware-session-id.ts new file mode 100644 index 0000000000000..2016836170c45 --- /dev/null +++ b/packages/middleware-sdk-transcribe-streaming/src/middleware-session-id.ts @@ -0,0 +1,50 @@ +import { + InitializeMiddleware, + InitializeHandler, + InitializeHandlerArguments, + InitializeHandlerOptions, + RequestHandler +} from "@aws-sdk/types"; +import { v4 } from "uuid"; + +type WithSession = { + SessionId?: string; + [key: string]: any; +}; + +/** + * Middleware that inject default sessionId for operations, and inject + * the parameters from request to the response metadata. This is + * necessary because the SDK cannot access any parameters other than + * the result stream. So it copies the parameters from input to the same + * parameters in the output. + */ +export const injectSessionIdMiddleware = (config: { + requestHandler: RequestHandler; +}): InitializeMiddleware => ( + next: InitializeHandler +) => async (args: InitializeHandlerArguments) => { + if (args.input.SessionId === undefined && isWebSocket(config)) { + args.input.SessionId = v4(); + } + const requestParams = { + ...args.input + }; + const response = await next(args); + const output = response.output; + for (const key of Object.keys(output)) { + if (output[key] === undefined && requestParams[key]) { + output[key] = requestParams[key]; + } + } + return response; +}; + +const isWebSocket = (config: { requestHandler: RequestHandler }) => + config.requestHandler.metadata?.handlerProtocol === "websocket"; + +export const injectSessionIdMiddlewareOptions: InitializeHandlerOptions = { + step: "initialize", + name: "injectSessionIdMiddleware", + tags: ["WEBSOCKET", "EVENT_STREAM"] +}; diff --git a/packages/middleware-sdk-transcribe-streaming/src/plugin.ts b/packages/middleware-sdk-transcribe-streaming/src/plugin.ts new file mode 100644 index 0000000000000..d6585a095ad7c --- /dev/null +++ b/packages/middleware-sdk-transcribe-streaming/src/plugin.ts @@ -0,0 +1,25 @@ +import { Pluggable } from "@aws-sdk/types"; +import { + websocketURLMiddleware, + websocketURLMiddlewareOptions +} from "./middleware-endpoint"; +import { + injectSessionIdMiddleware, + injectSessionIdMiddlewareOptions +} from "./middleware-session-id"; +import { WebSocketResolvedConfig } from "./configuration"; + +export const getWebSocketPlugin = ( + config: WebSocketResolvedConfig +): Pluggable => ({ + applyToStack: clientStack => { + clientStack.addRelativeTo( + websocketURLMiddleware(config), + websocketURLMiddlewareOptions + ); + clientStack.add( + injectSessionIdMiddleware(config), + injectSessionIdMiddlewareOptions + ); + } +}); diff --git a/packages/middleware-sdk-transcribe-streaming/src/signer.ts b/packages/middleware-sdk-transcribe-streaming/src/signer.ts new file mode 100644 index 0000000000000..ea9849d15a4bd --- /dev/null +++ b/packages/middleware-sdk-transcribe-streaming/src/signer.ts @@ -0,0 +1,42 @@ +import { + RequestSigner, + HttpRequest as IHttpRequest, + RequestSigningArguments, + RequestPresigner, + RequestPresigningArguments +} from "@aws-sdk/types"; +import { SignatureV4 as BaseSignatureV4 } from "@aws-sdk/signature-v4"; +import { HttpRequest } from "@aws-sdk/protocol-http"; + +export class SignatureV4 implements RequestSigner, RequestPresigner { + private readonly signer: BaseSignatureV4; + constructor(options: { signer: BaseSignatureV4 }) { + this.signer = options.signer; + } + + public presign( + originalRequest: IHttpRequest, + options: RequestPresigningArguments = {} + ): Promise { + return this.signer.presign(originalRequest, options); + } + + public async sign( + toSign: IHttpRequest, + options?: RequestSigningArguments + ): Promise { + if (HttpRequest.isInstance(toSign)) { + // Presign the endpoint url with empty body, otherwise + // the payload hash would be UNSINGED_PAYLOAD + const signedRequest = await this.signer.presign({ ...toSign, body: "" }, { + expiresIn: 5 * 60 // presigned url must be expired within 5 mins + } as any); + return { + ...signedRequest, + body: toSign.body + }; + } else { + return this.signer.sign(toSign, options); + } + } +} diff --git a/packages/middleware-sdk-transcribe-streaming/src/websocket-handler.spec.ts b/packages/middleware-sdk-transcribe-streaming/src/websocket-handler.spec.ts new file mode 100644 index 0000000000000..f0216531e7898 --- /dev/null +++ b/packages/middleware-sdk-transcribe-streaming/src/websocket-handler.spec.ts @@ -0,0 +1,76 @@ +import { WebSocketHandler } from "./websocket-handler"; +import { HttpRequest } from "@aws-sdk/protocol-http"; +import { PassThrough } from "stream"; +import WS from "jest-websocket-mock"; +import { WebSocket } from "mock-socket"; + +describe("WebSocketHandler", () => { + beforeEach(() => { + (global as any).WebSocket = WebSocket; + }); + afterEach(() => { + WS.clean(); + jest.clearAllMocks(); + }); + it("should contain protocol metadata", () => { + const handler = new WebSocketHandler(); + expect(handler.metadata.handlerProtocol).toEqual("websocket"); + }); + + it("should throw in output stream if input stream throws", async () => { + expect.assertions(2); + const handler = new WebSocketHandler(); + //Using Node stream is fine because they are also async iterables. + const payload = new PassThrough(); + const server = new WS("ws://localhost:6789"); + const { + response: { body: responsePayload } + } = await handler.handle( + new HttpRequest({ + body: payload, + hostname: "localhost:6789", + protocol: "ws:" + }) + ); + await server.connected; + payload.emit("error", new Error("FakeError")); + try { + for await (const chunk of responsePayload) { + /** pass */ + } + } catch (err) { + expect(err).toBeDefined(); + expect(err.message).toEqual("FakeError"); + } + }); + + it("should return retryable error if cannot setup ws connection", async () => { + expect.assertions(4); + const originalFn = setTimeout; + (global as any).setTimeout = jest.fn().mockImplementation(setTimeout); + const connectionTimeout = 1000; + const handler = new WebSocketHandler({ connectionTimeout }); + //Using Node stream is fine because they are also async iterables. + const payload = new PassThrough(); + try { + await handler.handle( + new HttpRequest({ + body: payload, + hostname: "localhost:9876", //invalid websocket endpoint + protocol: "ws:" + }) + ); + } catch (err) { + expect(err).toBeDefined(); + expect(err.$metadata).toBeDefined(); + expect(err.$metadata.httpStatusCode >= 500).toBe(true); + expect( + (global.setTimeout as jest.Mock).mock.calls.filter(args => { + //find the 'setTimeout' call from the websocket handler + return args[0].toString().indexOf("$metadata") >= 0; + })[0][1] + ).toBe(connectionTimeout); + } + (global as any).setTimeout = originalFn; + }); +}); diff --git a/packages/middleware-sdk-transcribe-streaming/src/websocket-handler.ts b/packages/middleware-sdk-transcribe-streaming/src/websocket-handler.ts new file mode 100644 index 0000000000000..6fe86c387fd25 --- /dev/null +++ b/packages/middleware-sdk-transcribe-streaming/src/websocket-handler.ts @@ -0,0 +1,159 @@ +import { HttpHandlerOptions, RequestHandlerMetadata } from "@aws-sdk/types"; +import { HttpHandler, HttpRequest, HttpResponse } from "@aws-sdk/protocol-http"; +import { formatUrl } from "@aws-sdk/util-format-url"; +import { + iterableToReadableStream, + readableStreamtoIterable +} from "@aws-sdk/eventstream-serde-browser"; + +export interface WebSocketHandlerOptions { + /** + * The maximum time in milliseconds that the connection phase of a request + * may take before the connection attempt is abandoned. + */ + connectionTimeout?: number; +} + +/** + * Base handler for websocket requests. By default, the request input and output + * body will be in a ReadableStream, because of interface consistency among middleware. + * If ReadableStream is not available, like in React-Native, the response body + * will be an async iterable. + */ +export class WebSocketHandler implements HttpHandler { + public readonly metadata: RequestHandlerMetadata = { + handlerProtocol: "websocket" + }; + private readonly connectionTimeout: number; + constructor({ connectionTimeout }: WebSocketHandlerOptions = {}) { + this.connectionTimeout = connectionTimeout || 2000; + } + + destroy(): void {} + + async handle( + request: HttpRequest, + options: HttpHandlerOptions = {} + ): Promise<{ response: HttpResponse }> { + const url = formatUrl(request); + const socket: WebSocket = new WebSocket(url); + socket.binaryType = "arraybuffer"; + await waitForReady(socket, this.connectionTimeout); + const { body } = request; + const bodyStream = getIterator(body); + const asyncIterable = connect(socket, bodyStream); + const outputPayload = toReadableStream(asyncIterable); + return { + response: new HttpResponse({ + statusCode: 200, // indicates connection success + body: outputPayload + }) + }; + } +} + +const waitForReady = (socket: WebSocket, connectionTimeout: number) => + new Promise((resolve, reject) => { + const timeout = setTimeout(() => { + reject({ + $metadata: { + httpStatusCode: 500 + } + }); + }, connectionTimeout); + socket.onopen = () => { + clearTimeout(timeout); + resolve(); + }; + }); + +const connect = ( + socket: WebSocket, + data: AsyncIterable +): AsyncIterable => { + // To notify output stream any error thrown after response + // is returned while data keeps streaming. + let streamError: Error | undefined = undefined; + const outputStream: AsyncIterable = { + [Symbol.asyncIterator]: () => ({ + next: () => { + return new Promise((resolve, reject) => { + socket.onerror = error => { + socket.onclose = null; + socket.close(); + reject(error); + }; + socket.onclose = () => { + if (streamError) { + reject(streamError); + } else { + resolve({ + done: true, + value: undefined + }); + } + }; + socket.onmessage = event => { + resolve({ + done: false, + value: new Uint8Array(event.data) + }); + }; + }); + } + }) + }; + + const send = async (): Promise => { + try { + for await (const inputChunk of data) { + socket.send(inputChunk); + } + } catch (err) { + // We don't throw the error here because the send()'s returned + // would already be settled by the time sending chunk throws error. + // Instead, the notify the output stream to throw if there's + // exceptions + streamError = err; + } finally { + // WS status code: https://tools.ietf.org/html/rfc6455#section-7.4 + socket.close(1000); + } + }; + send(); + return outputStream; +}; + +/** + * Transfer payload data to an AsyncIterable. + * When the ReadableStream API is available in the runtime(e.g. browser), and + * the request body is ReadableStream, so we need to transfer it to AsyncIterable + * to make the stream consumable by WebSocket. + */ +const getIterator = (stream: any): AsyncIterable => { + // Noop if stream is already an async iterable + if (stream[Symbol.asyncIterator]) return stream; + else if (isReadableStream(stream)) { + //If stream is a ReadableStream, transfer the ReadableStream to async iterable. + return readableStreamtoIterable(stream); + } else { + //For other types, just wrap them with an async iterable. + return { + [Symbol.asyncIterator]: async function* () { + yield stream; + } + }; + } +}; + +/** + * Convert async iterable to a ReadableStream when ReadableStream API + * is available(browsers). Otherwise, leave as it is(ReactNative). + */ +const toReadableStream = (asyncIterable: AsyncIterable) => + typeof ReadableStream === "function" + ? iterableToReadableStream(asyncIterable) + : asyncIterable; + +const isReadableStream = (payload: any): payload is ReadableStream => + typeof ReadableStream === "function" && payload instanceof ReadableStream; diff --git a/packages/middleware-sdk-transcribe-streaming/tsconfig.es.json b/packages/middleware-sdk-transcribe-streaming/tsconfig.es.json new file mode 100644 index 0000000000000..8bacf65d673d4 --- /dev/null +++ b/packages/middleware-sdk-transcribe-streaming/tsconfig.es.json @@ -0,0 +1,18 @@ +{ + "extends": "./tsconfig.json", + "compilerOptions": { + "target": "ES2015", + "module": "esNext", + "moduleResolution": "node", + "lib": ["es5", "es2015.promise", "es2015.collection", "DOM"], + "declaration": true, + "sourceMap": true, + "strict": true, + "stripInternal": true, + "rootDir": "./src", + "outDir": "./dist/es", + "importHelpers": true, + "noEmitHelpers": true, + "incremental": true + } +} diff --git a/packages/middleware-sdk-transcribe-streaming/tsconfig.json b/packages/middleware-sdk-transcribe-streaming/tsconfig.json new file mode 100644 index 0000000000000..551f36accf08b --- /dev/null +++ b/packages/middleware-sdk-transcribe-streaming/tsconfig.json @@ -0,0 +1,16 @@ +{ + "compilerOptions": { + "target": "es2017", + "module": "commonjs", + "declaration": true, + "lib": ["DOM"], + "sourceMap": true, + "strict": true, + "stripInternal": true, + "rootDir": "./src", + "outDir": "./dist/cjs", + "importHelpers": true, + "noEmitHelpers": true, + "incremental": true + } +} diff --git a/scripts/generate-clients/code-gen.js b/scripts/generate-clients/code-gen.js index 649e47b1d84bf..e3b9092b9434e 100644 --- a/scripts/generate-clients/code-gen.js +++ b/scripts/generate-clients/code-gen.js @@ -41,9 +41,7 @@ const generateClients = async models => { if (!lstatSync(file).isFile()) return; const name = path.basename(file); console.log(`copying model ${name}...`); - copyFileSync(file, path.join(TEMP_CODE_GEN_INPUT_DIR, name), { - overwrite: true - }); + copyFileSync(file, path.join(TEMP_CODE_GEN_INPUT_DIR, name)); }); }); } else {