Skip to content

Commit

Permalink
speech: add a sink to requestStream so events are not dropped (#2461)
Browse files Browse the repository at this point in the history
* speech: add a sink to `requestStream` so events are not dropped
  • Loading branch information
stephenplusplus authored Jul 17, 2017
1 parent 1aa325e commit f86d288
Showing 1 changed file with 19 additions and 10 deletions.
29 changes: 19 additions & 10 deletions packages/google-cloud-node/src/helpers.js
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,7 @@ module.exports = () => {
* for the details.
* @returns {Stream}
* An object stream which is both readable and writable. It accepts
* [StreamingRecognizeRequest]{@link StreamingRecognizeRequest}-like
* objects for the write() method, and will emit objects representing
* raw audio for the write() method, and will emit objects representing
* [StreamingRecognizeResponse]{@link StreamingRecognizeResponse} on the
* 'data' event asynchronously.
*
Expand Down Expand Up @@ -80,29 +79,39 @@ module.exports = () => {
// Format the audio content as input request for pipeline
var recognizeStream = streamEvents(pumpify.obj());

recognizeStream.once('writing', function() {
requestStream.on('error', function(err) {
// Attach the events to the request stream, but only do so
// when the first write (of data) comes in.
//
// This also means that the sending of the initial request (with the
// config) is delayed until we get the first burst of data.
recognizeStream.once('writing', () => {
requestStream.on('error', err => {
recognizeStream.destroy(err);
});

requestStream.on('response', function(response) {
// Responses must be explicitly forwarded.
requestStream.on('response', response => {
recognizeStream.emit('response', response);
});

// Write the initial configuration to the stream,
// Write the initial configuration to the stream.
requestStream.write({
streamingConfig: config
});

this.setPipeline([
// Set up appropriate piping between the stream returned by
// the underlying API method and the one that we return.
recognizeStream.setPipeline([
// Format the user's input.
through.obj(function(obj, _, next) {
// This entails that the user sends raw audio; it is wrapped in
// the appropriate request structure.
through.obj((obj, _, next) => {
next(null, {
audioContent: obj
});
}),

requestStream
requestStream,
through.obj()
]);
});

Expand Down

0 comments on commit f86d288

Please sign in to comment.