Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support bi-directional transcribe streaming over Websocket #1216

Merged
merged 6 commits into from
Jun 1, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 13 additions & 3 deletions clients/client-transcribe-streaming/TranscribeStreamingClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ import {
getRetryPlugin,
resolveRetryConfig
} from "@aws-sdk/middleware-retry";
import {
WebSocketInputConfig,
WebSocketResolvedConfig,
getWebSocketPlugin,
resolveWebSocketConfig
} from "@aws-sdk/middleware-sdk-transcribe-streaming";
import {
AwsAuthInputConfig,
AwsAuthResolvedConfig,
Expand Down Expand Up @@ -176,6 +182,7 @@ export type TranscribeStreamingClientConfig = Partial<
UserAgentInputConfig &
HostHeaderInputConfig &
EventStreamInputConfig &
WebSocketInputConfig &
EventStreamSerdeInputConfig;

export type TranscribeStreamingClientResolvedConfig = __SmithyResolvedConfiguration<
Expand All @@ -189,6 +196,7 @@ export type TranscribeStreamingClientResolvedConfig = __SmithyResolvedConfigurat
UserAgentResolvedConfig &
HostHeaderResolvedConfig &
EventStreamResolvedConfig &
WebSocketResolvedConfig &
EventStreamSerdeResolvedConfig;

/**
Expand All @@ -214,14 +222,16 @@ export class TranscribeStreamingClient extends __Client<
let _config_5 = resolveUserAgentConfig(_config_4);
let _config_6 = resolveHostHeaderConfig(_config_5);
let _config_7 = resolveEventStreamConfig(_config_6);
let _config_8 = resolveEventStreamSerdeConfig(_config_7);
super(_config_8);
this.config = _config_8;
let _config_8 = resolveWebSocketConfig(_config_7);
let _config_9 = resolveEventStreamSerdeConfig(_config_8);
super(_config_9);
this.config = _config_9;
this.middlewareStack.use(getAwsAuthPlugin(this.config));
this.middlewareStack.use(getRetryPlugin(this.config));
this.middlewareStack.use(getUserAgentPlugin(this.config));
this.middlewareStack.use(getContentLengthPlugin(this.config));
this.middlewareStack.use(getHostHeaderPlugin(this.config));
this.middlewareStack.use(getWebSocketPlugin(this.config));
}

destroy(): void {
Expand Down
3 changes: 2 additions & 1 deletion clients/client-transcribe-streaming/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
"@aws-sdk/middleware-eventstream": "1.0.0-gamma.0",
"@aws-sdk/middleware-host-header": "1.0.0-gamma.1",
"@aws-sdk/middleware-retry": "1.0.0-gamma.1",
"@aws-sdk/middleware-sdk-transcribe-streaming": "1.0.0-gamma.0",
"@aws-sdk/middleware-serde": "1.0.0-gamma.1",
"@aws-sdk/middleware-signing": "1.0.0-gamma.1",
"@aws-sdk/middleware-stack": "1.0.0-gamma.1",
Expand Down Expand Up @@ -82,4 +83,4 @@
"url": "https://aws.amazon.com/javascript/"
},
"license": "Apache-2.0"
}
}
12 changes: 7 additions & 5 deletions clients/client-transcribe-streaming/runtimeConfig.browser.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
import { name, version } from "./package.json";
import { Sha256 } from "@aws-crypto/sha256-browser";
import { eventStreamSerdeProvider } from "@aws-sdk/eventstream-serde-browser";
import { FetchHttpHandler, streamCollector } from "@aws-sdk/fetch-http-handler";
import { streamCollector } from "@aws-sdk/fetch-http-handler";
import { invalidFunction } from "@aws-sdk/invalid-dependency";
import {
WebSocketHandler,
eventStreamPayloadHandler
} from "@aws-sdk/middleware-sdk-transcribe-streaming";
import { parseUrl } from "@aws-sdk/url-parser-browser";
import { fromBase64, toBase64 } from "@aws-sdk/util-base64-browser";
import { calculateBodyLength } from "@aws-sdk/util-body-length-browser";
Expand All @@ -19,12 +23,10 @@ export const ClientDefaultValues: Required<ClientDefaults> = {
bodyLengthChecker: calculateBodyLength,
credentialDefaultProvider: invalidFunction("Credential is missing") as any,
defaultUserAgent: defaultUserAgent(name, version),
eventStreamPayloadHandlerProvider: () => ({
handle: invalidFunction("event stream request is not supported in browser.")
}),
eventStreamPayloadHandlerProvider: () => eventStreamPayloadHandler,
eventStreamSerdeProvider,
regionDefaultProvider: invalidFunction("Region is missing") as any,
requestHandler: new FetchHttpHandler(),
requestHandler: new WebSocketHandler(),
sha256: Sha256,
streamCollector,
urlParser: parseUrl,
Expand Down
18 changes: 6 additions & 12 deletions clients/client-transcribe-streaming/runtimeConfig.native.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import { name, version } from "./package.json";
import { Sha256 } from "@aws-crypto/sha256-js";
import { invalidFunction } from "@aws-sdk/invalid-dependency";
import {
WebSocketHandler,
eventStreamPayloadHandler
} from "@aws-sdk/middleware-sdk-transcribe-streaming";
import { parseUrl } from "@aws-sdk/url-parser-node";
import { ClientDefaults } from "./TranscribeStreamingClient";
import { ClientDefaultValues as BrowserDefaults } from "./runtimeConfig.browser";
Expand All @@ -9,17 +12,8 @@ export const ClientDefaultValues: Required<ClientDefaults> = {
...BrowserDefaults,
runtime: "react-native",
defaultUserAgent: `aws-sdk-js-v3-react-native-${name}/${version}`,
eventStreamPayloadHandlerProvider: () => ({
handle: invalidFunction(
"event stream request is not supported in ReactNative."
)
}),
eventStreamSerdeProvider: () => ({
serialize: invalidFunction("event stream is not supported in ReactNative."),
deserialize: invalidFunction(
"event stream is not supported in ReactNative."
)
}),
eventStreamPayloadHandlerProvider: () => eventStreamPayloadHandler,
requestHandler: new WebSocketHandler(),
sha256: Sha256,
urlParser: parseUrl
};
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package software.amazon.smithy.aws.typescript.codegen;

import static software.amazon.smithy.typescript.codegen.integration.RuntimeClientPlugin.Convention.HAS_CONFIG;
import static software.amazon.smithy.typescript.codegen.integration.RuntimeClientPlugin.Convention.HAS_MIDDLEWARE;


import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import software.amazon.smithy.aws.traits.ServiceTrait;
import software.amazon.smithy.codegen.core.SymbolProvider;
import software.amazon.smithy.model.Model;
import software.amazon.smithy.model.shapes.ServiceShape;
import software.amazon.smithy.typescript.codegen.LanguageTarget;
import software.amazon.smithy.typescript.codegen.TypeScriptDependency;
import software.amazon.smithy.typescript.codegen.TypeScriptSettings;
import software.amazon.smithy.typescript.codegen.TypeScriptWriter;
import software.amazon.smithy.typescript.codegen.integration.RuntimeClientPlugin;
import software.amazon.smithy.typescript.codegen.integration.TypeScriptIntegration;
import software.amazon.smithy.utils.ListUtils;
import software.amazon.smithy.utils.MapUtils;

/**
* Add client plugins and configs to support WebSocket streaming for Transcribe
* Streaming service
* */
public class AddTranscribeStreamingDependency implements TypeScriptIntegration {
@Override
public List<RuntimeClientPlugin> getClientPlugins() {
return ListUtils.of(
RuntimeClientPlugin.builder()
.withConventions(AwsDependency.TRANSCRIBE_STREAMING_MIDDLEWARE.dependency,
"WebSocket")
.servicePredicate((m, s) -> AddTranscribeStreamingDependency.isTranscribeStreaming(s))
.build()
);
}

@Override
public Map<String, Consumer<TypeScriptWriter>> getRuntimeConfigWriters(
TypeScriptSettings settings,
Model model,
SymbolProvider symbolProvider,
LanguageTarget target
) {
ServiceShape service = settings.getService(model);
if (!isTranscribeStreaming(service)) {
return Collections.emptyMap();
}

Map<String, Consumer<TypeScriptWriter>> transcribeStreamingHandlerConfig = MapUtils.of(
"eventStreamPayloadHandlerProvider", writer -> {
writer.addDependency(AwsDependency.TRANSCRIBE_STREAMING_MIDDLEWARE);
writer.addImport("eventStreamPayloadHandler", "eventStreamPayloadHandler",
AwsDependency.TRANSCRIBE_STREAMING_MIDDLEWARE.packageName);
writer.write("eventStreamPayloadHandlerProvider: () => eventStreamPayloadHandler,");
},
"requestHandler", writer -> {
writer.addDependency(AwsDependency.TRANSCRIBE_STREAMING_MIDDLEWARE);
writer.addImport("WebSocketHandler", "WebSocketHandler",
AwsDependency.TRANSCRIBE_STREAMING_MIDDLEWARE.packageName);
writer.write("requestHandler: new WebSocketHandler(),");
});

switch (target) {
case REACT_NATIVE:
case BROWSER:
return transcribeStreamingHandlerConfig;
default:
return Collections.emptyMap();
}
}

private static boolean isTranscribeStreaming(ServiceShape service) {
String serviceId = service.getTrait(ServiceTrait.class).map(ServiceTrait::getSdkId).orElse("");
return serviceId.equals("Transcribe Streaming");
}
}


Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ public enum AwsDependency implements SymbolDependencyContainer {
UUID_GENERATOR(NORMAL_DEPENDENCY, "uuid", "^7.0.0"),
UUID_GENERATOR_TYPES(DEV_DEPENDENCY, "@types/uuid", "^7.0.0"),
MIDDLEWARE_EVENTSTREAM(NORMAL_DEPENDENCY, "@aws-sdk/middleware-eventstream", "^1.0.0-beta.0"),
AWS_SDK_EVENTSTREAM_HANDLER_NODE(NORMAL_DEPENDENCY, "@aws-sdk/eventstream-handler-node", "^1.0.0-beta.0");
AWS_SDK_EVENTSTREAM_HANDLER_NODE(NORMAL_DEPENDENCY, "@aws-sdk/eventstream-handler-node", "^1.0.0-beta.0"),
TRANSCRIBE_STREAMING_MIDDLEWARE(NORMAL_DEPENDENCY, "@aws-sdk/middleware-sdk-transcribe-streaming", "^1.0.0-gamma.0");

public final String packageName;
public final String version;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ software.amazon.smithy.aws.typescript.codegen.AddBodyChecksumGeneratorDependency
software.amazon.smithy.aws.typescript.codegen.AddS3Config
software.amazon.smithy.aws.typescript.codegen.AddEventStreamHandlingDependency
software.amazon.smithy.aws.typescript.codegen.AddHttp2Dependency
software.amazon.smithy.aws.typescript.codegen.AddTranscribeStreamingDependency
1 change: 1 addition & 0 deletions packages/eventstream-serde-browser/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
"license": "Apache-2.0",
"dependencies": {
"@aws-sdk/eventstream-marshaller": "1.0.0-gamma.1",
"@aws-sdk/eventstream-serde-universal": "1.0.0-gamma.0",
"@aws-sdk/types": "1.0.0-gamma.1",
"tslib": "^1.8.0"
},
Expand Down
65 changes: 42 additions & 23 deletions packages/eventstream-serde-browser/src/EventStreamMarshaller.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
import { EventStreamMarshaller as UniversalEventStreamMarshaller } from "@aws-sdk/eventstream-serde-universal";
import { EventStreamMarshaller as EventMarshaller } from "@aws-sdk/eventstream-marshaller";
import {
Encoder,
Decoder,
Message,
EventStreamMarshaller as IEventStreamMarshaller
} from "@aws-sdk/types";
import { ReadableStreamtoIterable } from "./utils";
import { getChunkedStream } from "./getChunkedStream";
import { getEventMessageStream } from "./getEventMessageStream";
import { getDeserializingStream } from "./getDeserializingStream";
import { readableStreamtoIterable, iterableToReadableStream } from "./utils";

export interface EventStreamMarshaller extends IEventStreamMarshaller {}

Expand All @@ -17,35 +15,48 @@ export interface EventStreamMarshallerOptions {
utf8Decoder: Decoder;
}

/**
* Utility class used to serialize and deserialize event streams in
* browsers and ReactNative.
*
* In browsers where ReadableStream API is available:
* * deserialize from ReadableStream to an async iterable of output structure
* * serialize from async iterable of input structure to ReadableStream
* In ReactNative where only async iterable API is available:
* * deserialize from async iterable of binaries to async iterable of output structure
* * serialize from async iterable of input structure to async iterable of binaries
*
* We use ReadableStream API in browsers because of the consistency with other
* streaming operations, where ReadableStream API is used to denote streaming data.
* Whereas in ReactNative, ReadableStream API is not available, we use async iterable
* for streaming data although it has lower throughput.
*/
export class EventStreamMarshaller {
private readonly eventMarshaller: EventMarshaller;
private readonly universalMarshaller: UniversalEventStreamMarshaller;
constructor({ utf8Encoder, utf8Decoder }: EventStreamMarshallerOptions) {
this.eventMarshaller = new EventMarshaller(utf8Encoder, utf8Decoder);
this.universalMarshaller = new UniversalEventStreamMarshaller({
utf8Decoder,
utf8Encoder
});
}

deserialize<T>(
body: ReadableStream<Uint8Array>,
body: ReadableStream<Uint8Array> | AsyncIterable<Uint8Array>,
deserializer: (input: { [event: string]: Message }) => Promise<T>
): AsyncIterable<T> {
const chunkedStream = getChunkedStream(body);
const messageStream = getEventMessageStream(
chunkedStream,
this.eventMarshaller
);
const deserialingStream = getDeserializingStream(
messageStream,
deserializer
);
return ReadableStreamtoIterable(deserialingStream);
const bodyIterable = isReadableStream(body)
? readableStreamtoIterable(body)
: body;
return this.universalMarshaller.deserialize(bodyIterable, deserializer);
}

/**
* Generate a ReadableStream that serialize events
* to event stream binary chunks; Use a pull stream
* here to support low connection speed.
* Generate a stream that serialize events into stream of binary chunks;
*
* This doesn't work on browser currently because
* browser doesn't support upload streaming.
* Caveat is that streaming request payload doesn't work on browser with native
* xhr or fetch handler currently because they don't support upload streaming.
* reference:
* * https://bugs.chromium.org/p/chromium/issues/detail?id=688906
* * https://bugzilla.mozilla.org/show_bug.cgi?id=1387483
Expand All @@ -54,8 +65,16 @@ export class EventStreamMarshaller {
serialize<T>(
input: AsyncIterable<T>,
serializer: (event: T) => Message
): ReadableStream {
throw new Error(`event stream request in browser is not supported
Reference: https://bugs.chromium.org/p/chromium/issues/detail?id=688906`);
): ReadableStream | AsyncIterable<Uint8Array> {
const serialziedIterable = this.universalMarshaller.serialize(
input,
serializer
);
return typeof ReadableStream === "function"
? iterableToReadableStream(serialziedIterable)
: serialziedIterable;
}
}

const isReadableStream = (body: any): body is ReadableStream =>
typeof ReadableStream === "function" && body instanceof ReadableStream;
Loading