Skip to content

Commit

Permalink
feat: add getWriteStream and createWriteStreamFullResponse methods (#453
Browse files Browse the repository at this point in the history
)
  • Loading branch information
alvarowolfx authored May 8, 2024
1 parent 841de04 commit 27dce6a
Show file tree
Hide file tree
Showing 9 changed files with 190 additions and 63 deletions.
21 changes: 7 additions & 14 deletions samples/append_rows_buffered.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ function main(
// [START bigquerystorage_append_rows_buffered]
const {adapt, managedwriter} = require('@google-cloud/bigquery-storage');
const {WriterClient, JSONWriter} = managedwriter;
const {BigQuery} = require('@google-cloud/bigquery');

async function appendRowsBuffered() {
/**
Expand All @@ -35,26 +34,20 @@ function main(
const destinationTable = `projects/${projectId}/datasets/${datasetId}/tables/${tableId}`;
const streamType = managedwriter.BufferedStream;
const writeClient = new WriterClient({projectId: projectId});
const bigquery = new BigQuery({projectId: projectId});

try {
const dataset = bigquery.dataset(datasetId);
const table = await dataset.table(tableId);
const [metadata] = await table.getMetadata();
const {schema} = metadata;
const storageSchema =
adapt.convertBigQuerySchemaToStorageTableSchema(schema);
const protoDescriptor = adapt.convertStorageSchemaToProto2Descriptor(
storageSchema,
'root'
);

const streamId = await writeClient.createWriteStream({
const writeStream = await writeClient.createWriteStreamFullResponse({
streamType,
destinationTable,
});
const streamId = writeStream.name;
console.log(`Stream created: ${streamId}`);

const protoDescriptor = adapt.convertStorageSchemaToProto2Descriptor(
writeStream.tableSchema,
'root'
);

const connection = await writeClient.createStreamConnection({
streamId,
});
Expand Down
21 changes: 7 additions & 14 deletions samples/append_rows_json_writer_commited.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ function main(
// [START bigquerystorage_jsonstreamwriter_commited]
const {adapt, managedwriter} = require('@google-cloud/bigquery-storage');
const {WriterClient, JSONWriter} = managedwriter;
const {BigQuery} = require('@google-cloud/bigquery');

async function appendJSONRowsCommitedStream() {
/**
Expand All @@ -35,26 +34,20 @@ function main(
const destinationTable = `projects/${projectId}/datasets/${datasetId}/tables/${tableId}`;
const streamType = managedwriter.CommittedStream;
const writeClient = new WriterClient({projectId});
const bigquery = new BigQuery({projectId: projectId});

try {
const dataset = bigquery.dataset(datasetId);
const table = await dataset.table(tableId);
const [metadata] = await table.getMetadata();
const {schema} = metadata;
const storageSchema =
adapt.convertBigQuerySchemaToStorageTableSchema(schema);
const protoDescriptor = adapt.convertStorageSchemaToProto2Descriptor(
storageSchema,
'root'
);

const streamId = await writeClient.createWriteStream({
const writeStream = await writeClient.createWriteStreamFullResponse({
streamType,
destinationTable,
});
const streamId = writeStream.name;
console.log(`Stream created: ${streamId}`);

const protoDescriptor = adapt.convertStorageSchemaToProto2Descriptor(
writeStream.tableSchema,
'root'
);

const connection = await writeClient.createStreamConnection({
streamId,
});
Expand Down
3 changes: 2 additions & 1 deletion samples/append_rows_pending.js
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,11 @@ function main(
const streamType = managedwriter.PendingStream;
const writeClient = new WriterClient({projectId});
try {
const streamId = await writeClient.createWriteStream({
const writeStream = await writeClient.createWriteStreamFullResponse({
streamType,
destinationTable,
});
const streamId = writeStream.name;
console.log(`Stream created: ${streamId}`);

const connection = await writeClient.createStreamConnection({
Expand Down
21 changes: 7 additions & 14 deletions samples/append_rows_table_to_proto2.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ function main(
// [START bigquerystorage_jsonstreamwriter_pending]
const {adapt, managedwriter} = require('@google-cloud/bigquery-storage');
const {WriterClient, JSONWriter} = managedwriter;
const {BigQuery} = require('@google-cloud/bigquery');

async function appendRowsPendingStream() {
/**
Expand All @@ -35,26 +34,20 @@ function main(
const destinationTable = `projects/${projectId}/datasets/${datasetId}/tables/${tableId}`;
const streamType = managedwriter.PendingStream;
const writeClient = new WriterClient({projectId: projectId});
const bigquery = new BigQuery({projectId: projectId});

try {
const dataset = bigquery.dataset(datasetId);
const table = await dataset.table(tableId);
const [metadata] = await table.getMetadata();
const {schema} = metadata;
const storageSchema =
adapt.convertBigQuerySchemaToStorageTableSchema(schema);
const protoDescriptor = adapt.convertStorageSchemaToProto2Descriptor(
storageSchema,
'root'
);

const streamId = await writeClient.createWriteStream({
const writeStream = await writeClient.createWriteStreamFullResponse({
streamType,
destinationTable,
});
const streamId = writeStream.name;
console.log(`Stream created: ${streamId}`);

const protoDescriptor = adapt.convertStorageSchemaToProto2Descriptor(
writeStream.tableSchema,
'root'
);

const connection = await writeClient.createStreamConnection({
streamId,
});
Expand Down
10 changes: 6 additions & 4 deletions src/adapt/proto.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import * as protos from '../../protos/protos';
import {bqTypeToFieldTypeMap, convertModeToLabel} from './proto_mappings';
import {normalizeFieldType} from './schema_mappings';

type TableSchema = protos.google.cloud.bigquery.storage.v1.ITableSchema;
type TableFieldSchema =
Expand Down Expand Up @@ -92,12 +93,13 @@ function convertStorageSchemaToFileDescriptorInternal(
for (const field of schema.fields ?? []) {
fNumber += 1;
const currentScope = `${scope}_${field.name}`;
const normalizedType = normalizeFieldType(field);
if (
field.type === TableFieldSchema.Type.STRUCT ||
field.type === TableFieldSchema.Type.RANGE
normalizedType === TableFieldSchema.Type.STRUCT ||
normalizedType === TableFieldSchema.Type.RANGE
) {
let subSchema: TableSchema = {};
switch (field.type) {
switch (normalizedType) {
case TableFieldSchema.Type.STRUCT:
subSchema = {
fields: field.fields,
Expand Down Expand Up @@ -245,7 +247,7 @@ function convertTableFieldSchemaToFieldDescriptorProto(
useProto3: boolean
): FieldDescriptorProto {
const name = field.name;
const type = field.type;
const type = normalizeFieldType(field);
if (!type) {
throw Error(`table field ${name} missing type`);
}
Expand Down
13 changes: 13 additions & 0 deletions src/adapt/schema_mappings.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,19 @@ export const fieldTypeMap: Record<string, StorageTableFieldType> = {
protos.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.GEOGRAPHY,
};

/**
 * Resolves a table field's declared type to its canonical storage enum value.
 * Type names present in `fieldTypeMap` are translated to their mapped enum;
 * unmapped values — or fields with no type at all — are returned unchanged.
 */
export function normalizeFieldType(
  field: StorageTableField
): StorageTableField['type'] {
  if (!field.type) {
    // No declared type: nothing to normalize.
    return field.type;
  }
  const mapped = fieldTypeMap[field.type];
  return mapped ? mapped : field.type;
}

export const modeMap: Record<string, StorageTableField['mode']> = {
NULLABLE:
protos.google.cloud.bigquery.storage.v1.TableFieldSchema.Mode.NULLABLE,
Expand Down
103 changes: 88 additions & 15 deletions src/managedwriter/writer_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,12 @@ import type {CallOptions, ClientOptions} from 'google-gax';
import * as protos from '../../protos/protos';

import {BigQueryWriteClient} from '../v1';
import {WriteStreamType, DefaultStream, streamTypeToEnum} from './stream_types';
import {
WriteStreamType,
DefaultStream,
streamTypeToEnum,
WriteStream,
} from './stream_types';
import {StreamConnection} from './stream_connection';

type StreamConnections = {
Expand All @@ -29,6 +34,9 @@ type RetrySettings = {
};
type CreateWriteStreamRequest =
protos.google.cloud.bigquery.storage.v1.ICreateWriteStreamRequest;
type GetWriteStreamRequest =
protos.google.cloud.bigquery.storage.v1.IGetWriteStreamRequest;
type WriteStreamView = protos.google.cloud.bigquery.storage.v1.WriteStreamView;
type BatchCommitWriteStreamsRequest =
protos.google.cloud.bigquery.storage.v1.IBatchCommitWriteStreamsRequest;
type BatchCommitWriteStreamsResponse =
Expand Down Expand Up @@ -140,7 +148,9 @@ export class WriterClient {
}

/**
* Creates a write stream to the given table.
* Creates a write stream to the given table and return just the
* streamId.
*
* Additionally, every table has a special stream named DefaultStream
* to which data can be written. This stream doesn't need to be created using
* createWriteStream. It is a stream that can be used simultaneously by any
Expand All @@ -156,10 +166,46 @@ export class WriterClient {
* of `projects/{project}/datasets/{dataset}/tables/{table}`.
* @returns {Promise<string>} - The promise which resolves to the streamId.
*/
async createWriteStream(request: {
streamType: WriteStreamType;
destinationTable: string;
}): Promise<string> {
async createWriteStream(
  request: {
    streamType: WriteStreamType;
    destinationTable: string;
  },
  options?: CallOptions
): Promise<string> {
  // Delegate to the full-response variant and surface only the stream name,
  // falling back to an empty string when the service returned none.
  const stream = await this.createWriteStreamFullResponse(request, options);
  return stream.name ? stream.name : '';
}

/**
* Creates a write stream to the given table and returns all
* information about it.
*
* Additionally, every table has a special stream named DefaultStream
* to which data can be written. This stream doesn't need to be created using
* createWriteStream. It is a stream that can be used simultaneously by any
* number of clients. Data written to this stream is considered committed as
* soon as an acknowledgement is received.
*
* @param {Object} request
* The request object that will be sent.
* @param {string} request.streamType
* Required. The type of stream to create.
* @param {string} request.destinationTable
* Required. Reference to the table to which the stream belongs, in the format
* of `projects/{project}/datasets/{dataset}/tables/{table}`.
* @returns {Promise<WriteStream>} - The promise which resolves to the WriteStream.
*/
async createWriteStreamFullResponse(
request: {
streamType: WriteStreamType;
destinationTable: string;
},
options?: CallOptions
): Promise<WriteStream> {
await this.initialize();
const {streamType, destinationTable} = request;
const createReq: CreateWriteStreamRequest = {
Expand All @@ -168,19 +214,46 @@ export class WriterClient {
type: streamTypeToEnum(streamType),
},
};
const [response] = await this._client.createWriteStream(createReq);
const [response] = await this._client.createWriteStream(createReq, options);
if (typeof [response] === undefined) {
throw new gax.GoogleError(`${response}`);
}
try {
if (response.name) {
const streamId = response.name;
return streamId;
}
return '';
} catch {
throw new Error('Stream connection failed');
return response;
}

/**
 * Gets information about a write stream.
 *
 * @param {Object} request
 *   The request object that will be sent.
 * @param {string} request.streamId
 *   Required. Name of the stream to get, in the form of
 *   `projects/{project}/datasets/{dataset}/tables/{table}/streams/{stream}`
 * @param {WriteStreamView} request.view
 *   Indicates whether to get full or partial view of the WriteStream. If
 *   not set, view returned will be basic.
 * @param {object} [options]
 *   Call options. See {@link https://googleapis.dev/nodejs/google-gax/latest/interfaces/CallOptions.html|CallOptions} for more details.
 * @returns {Promise<WriteStream>} - The promise which resolves to the WriteStream.
 */
async getWriteStream(
  request: {
    streamId: string;
    view?: WriteStreamView;
  },
  options?: CallOptions
): Promise<WriteStream> {
  await this.initialize();
  const {streamId, view} = request;
  const getReq: GetWriteStreamRequest = {
    name: streamId,
    view,
  };
  const [response] = await this._client.getWriteStream(getReq, options);
  // Bug fix: the previous guard was `typeof [response] === undefined`,
  // which is always false (`typeof` yields a string, and the operand was
  // wrapped in an array), so a missing response was never caught.
  // Check the response value itself instead.
  if (response == null) {
    throw new gax.GoogleError('getWriteStream returned no response');
  }
  return response;
}

/**
Expand Down
Loading

0 comments on commit 27dce6a

Please sign in to comment.