Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add getWriteStream and createWriteStreamFullResponse methods #453

Merged
merged 6 commits into from
May 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 7 additions & 14 deletions samples/append_rows_buffered.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ function main(
// [START bigquerystorage_append_rows_buffered]
const {adapt, managedwriter} = require('@google-cloud/bigquery-storage');
const {WriterClient, JSONWriter} = managedwriter;
const {BigQuery} = require('@google-cloud/bigquery');

async function appendRowsBuffered() {
/**
Expand All @@ -35,26 +34,20 @@ function main(
const destinationTable = `projects/${projectId}/datasets/${datasetId}/tables/${tableId}`;
const streamType = managedwriter.BufferedStream;
const writeClient = new WriterClient({projectId: projectId});
const bigquery = new BigQuery({projectId: projectId});

try {
const dataset = bigquery.dataset(datasetId);
const table = await dataset.table(tableId);
const [metadata] = await table.getMetadata();
const {schema} = metadata;
const storageSchema =
adapt.convertBigQuerySchemaToStorageTableSchema(schema);
const protoDescriptor = adapt.convertStorageSchemaToProto2Descriptor(
storageSchema,
'root'
);

const streamId = await writeClient.createWriteStream({
const writeStream = await writeClient.createWriteStreamFullResponse({
streamType,
destinationTable,
});
const streamId = writeStream.name;
console.log(`Stream created: ${streamId}`);

const protoDescriptor = adapt.convertStorageSchemaToProto2Descriptor(
writeStream.tableSchema,
'root'
);

const connection = await writeClient.createStreamConnection({
streamId,
});
Expand Down
21 changes: 7 additions & 14 deletions samples/append_rows_json_writer_commited.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ function main(
// [START bigquerystorage_jsonstreamwriter_commited]
const {adapt, managedwriter} = require('@google-cloud/bigquery-storage');
const {WriterClient, JSONWriter} = managedwriter;
const {BigQuery} = require('@google-cloud/bigquery');

async function appendJSONRowsCommitedStream() {
/**
Expand All @@ -35,26 +34,20 @@ function main(
const destinationTable = `projects/${projectId}/datasets/${datasetId}/tables/${tableId}`;
const streamType = managedwriter.CommittedStream;
const writeClient = new WriterClient({projectId});
const bigquery = new BigQuery({projectId: projectId});

try {
const dataset = bigquery.dataset(datasetId);
const table = await dataset.table(tableId);
const [metadata] = await table.getMetadata();
const {schema} = metadata;
const storageSchema =
adapt.convertBigQuerySchemaToStorageTableSchema(schema);
const protoDescriptor = adapt.convertStorageSchemaToProto2Descriptor(
storageSchema,
'root'
);

const streamId = await writeClient.createWriteStream({
const writeStream = await writeClient.createWriteStreamFullResponse({
streamType,
destinationTable,
});
const streamId = writeStream.name;
console.log(`Stream created: ${streamId}`);

const protoDescriptor = adapt.convertStorageSchemaToProto2Descriptor(
writeStream.tableSchema,
'root'
);

const connection = await writeClient.createStreamConnection({
streamId,
});
Expand Down
3 changes: 2 additions & 1 deletion samples/append_rows_pending.js
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,11 @@ function main(
const streamType = managedwriter.PendingStream;
const writeClient = new WriterClient({projectId});
try {
const streamId = await writeClient.createWriteStream({
const writeStream = await writeClient.createWriteStreamFullResponse({
streamType,
destinationTable,
});
const streamId = writeStream.name;
console.log(`Stream created: ${streamId}`);

const connection = await writeClient.createStreamConnection({
Expand Down
21 changes: 7 additions & 14 deletions samples/append_rows_table_to_proto2.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ function main(
// [START bigquerystorage_jsonstreamwriter_pending]
const {adapt, managedwriter} = require('@google-cloud/bigquery-storage');
const {WriterClient, JSONWriter} = managedwriter;
const {BigQuery} = require('@google-cloud/bigquery');

async function appendRowsPendingStream() {
/**
Expand All @@ -35,26 +34,20 @@ function main(
const destinationTable = `projects/${projectId}/datasets/${datasetId}/tables/${tableId}`;
const streamType = managedwriter.PendingStream;
const writeClient = new WriterClient({projectId: projectId});
const bigquery = new BigQuery({projectId: projectId});

try {
const dataset = bigquery.dataset(datasetId);
const table = await dataset.table(tableId);
const [metadata] = await table.getMetadata();
const {schema} = metadata;
const storageSchema =
adapt.convertBigQuerySchemaToStorageTableSchema(schema);
const protoDescriptor = adapt.convertStorageSchemaToProto2Descriptor(
storageSchema,
'root'
);

const streamId = await writeClient.createWriteStream({
const writeStream = await writeClient.createWriteStreamFullResponse({
streamType,
destinationTable,
});
const streamId = writeStream.name;
console.log(`Stream created: ${streamId}`);

const protoDescriptor = adapt.convertStorageSchemaToProto2Descriptor(
writeStream.tableSchema,
'root'
);

const connection = await writeClient.createStreamConnection({
streamId,
});
Expand Down
10 changes: 6 additions & 4 deletions src/adapt/proto.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import * as protos from '../../protos/protos';
import {bqTypeToFieldTypeMap, convertModeToLabel} from './proto_mappings';
import {normalizeFieldType} from './schema_mappings';

type TableSchema = protos.google.cloud.bigquery.storage.v1.ITableSchema;
type TableFieldSchema =
Expand Down Expand Up @@ -92,12 +93,13 @@ function convertStorageSchemaToFileDescriptorInternal(
for (const field of schema.fields ?? []) {
fNumber += 1;
const currentScope = `${scope}_${field.name}`;
const normalizedType = normalizeFieldType(field);
if (
field.type === TableFieldSchema.Type.STRUCT ||
field.type === TableFieldSchema.Type.RANGE
normalizedType === TableFieldSchema.Type.STRUCT ||
normalizedType === TableFieldSchema.Type.RANGE
) {
let subSchema: TableSchema = {};
switch (field.type) {
switch (normalizedType) {
case TableFieldSchema.Type.STRUCT:
subSchema = {
fields: field.fields,
Expand Down Expand Up @@ -245,7 +247,7 @@ function convertTableFieldSchemaToFieldDescriptorProto(
useProto3: boolean
): FieldDescriptorProto {
const name = field.name;
const type = field.type;
const type = normalizeFieldType(field);
if (!type) {
throw Error(`table field ${name} missing type`);
}
Expand Down
13 changes: 13 additions & 0 deletions src/adapt/schema_mappings.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,19 @@ export const fieldTypeMap: Record<string, StorageTableFieldType> = {
protos.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.GEOGRAPHY,
};

/**
 * Resolves a field's declared type to its canonical storage enum value.
 *
 * The `type` may be expressed as a name that `fieldTypeMap` knows about
 * (presumably a BigQuery type name — confirm against callers), in which
 * case the mapped storage enum value is returned. Anything unmapped —
 * including an absent type — is passed through unchanged.
 */
export function normalizeFieldType(
  field: StorageTableField
): StorageTableField['type'] {
  if (!field.type) {
    return field.type;
  }
  const mapped = fieldTypeMap[field.type];
  return mapped || field.type;
}

export const modeMap: Record<string, StorageTableField['mode']> = {
NULLABLE:
protos.google.cloud.bigquery.storage.v1.TableFieldSchema.Mode.NULLABLE,
Expand Down
103 changes: 88 additions & 15 deletions src/managedwriter/writer_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,12 @@ import type {CallOptions, ClientOptions} from 'google-gax';
import * as protos from '../../protos/protos';

import {BigQueryWriteClient} from '../v1';
import {WriteStreamType, DefaultStream, streamTypeToEnum} from './stream_types';
import {
WriteStreamType,
DefaultStream,
streamTypeToEnum,
WriteStream,
} from './stream_types';
import {StreamConnection} from './stream_connection';

type StreamConnections = {
Expand All @@ -29,6 +34,9 @@ type RetrySettings = {
};
type CreateWriteStreamRequest =
protos.google.cloud.bigquery.storage.v1.ICreateWriteStreamRequest;
type GetWriteStreamRequest =
protos.google.cloud.bigquery.storage.v1.IGetWriteStreamRequest;
type WriteStreamView = protos.google.cloud.bigquery.storage.v1.WriteStreamView;
type BatchCommitWriteStreamsRequest =
protos.google.cloud.bigquery.storage.v1.IBatchCommitWriteStreamsRequest;
type BatchCommitWriteStreamsResponse =
Expand Down Expand Up @@ -140,7 +148,9 @@ export class WriterClient {
}

/**
* Creates a write stream to the given table.
* Creates a write stream to the given table and return just the
* streamId.
*
* Additionally, every table has a special stream named DefaultStream
* to which data can be written. This stream doesn't need to be created using
* createWriteStream. It is a stream that can be used simultaneously by any
Expand All @@ -156,10 +166,46 @@ export class WriterClient {
* of `projects/{project}/datasets/{dataset}/tables/{table}`.
* @returns {Promise<string>}} - The promise which resolves to the streamId.
*/
async createWriteStream(request: {
streamType: WriteStreamType;
destinationTable: string;
}): Promise<string> {
async createWriteStream(
request: {
streamType: WriteStreamType;
destinationTable: string;
},
options?: CallOptions
): Promise<string> {
const stream = await this.createWriteStreamFullResponse(request, options);
if (stream.name) {
return stream.name;
}
return '';
}

/**
* Creates a write stream to the given table and return all
* information about it.
*
* Additionally, every table has a special stream named DefaultStream
* to which data can be written. This stream doesn't need to be created using
* createWriteStream. It is a stream that can be used simultaneously by any
* number of clients. Data written to this stream is considered committed as
* soon as an acknowledgement is received.
*
* @param {Object} request
* The request object that will be sent.
* @param {string} request.streamType
* Required. The type of stream to create.
* @param {string} request.destinationTable
* Required. Reference to the table to which the stream belongs, in the format
* of `projects/{project}/datasets/{dataset}/tables/{table}`.
* @returns {Promise<WriteStream>}} - The promise which resolves to the WriteStream.
*/
async createWriteStreamFullResponse(
request: {
streamType: WriteStreamType;
destinationTable: string;
},
options?: CallOptions
): Promise<WriteStream> {
await this.initialize();
const {streamType, destinationTable} = request;
const createReq: CreateWriteStreamRequest = {
Expand All @@ -168,19 +214,46 @@ export class WriterClient {
type: streamTypeToEnum(streamType),
},
};
const [response] = await this._client.createWriteStream(createReq);
const [response] = await this._client.createWriteStream(createReq, options);
if (typeof [response] === undefined) {
throw new gax.GoogleError(`${response}`);
}
try {
if (response.name) {
const streamId = response.name;
return streamId;
}
return '';
} catch {
throw new Error('Stream connection failed');
return response;
}

/**
 * Gets information about a write stream.
 *
 * @param {Object} request
 *   The request object that will be sent.
 * @param {string} request.streamId
 *   Required. Name of the stream to get, in the form of
 *   `projects/{project}/datasets/{dataset}/tables/{table}/streams/{stream}`
 * @param {WriteStreamView} request.view
 *   Indicates whether to get full or partial view of the WriteStream. If
 *   not set, view returned will be basic.
 * @param {object} [options]
 *   Call options. See {@link https://googleapis.dev/nodejs/google-gax/latest/interfaces/CallOptions.html|CallOptions} for more details.
 * @returns {Promise<WriteStream>} - The promise which resolves to the WriteStream.
 */
async getWriteStream(
  request: {
    streamId: string;
    view?: WriteStreamView;
  },
  options?: CallOptions
): Promise<WriteStream> {
  await this.initialize();
  const {streamId, view} = request;
  const getReq: GetWriteStreamRequest = {
    name: streamId,
    view,
  };
  const [response] = await this._client.getWriteStream(getReq, options);
  // Bug fix: the previous guard was `typeof [response] === undefined`, which
  // compares the string 'object' (the typeof of an array literal) against the
  // value `undefined` and therefore never fires. Check the destructured
  // response itself so a missing response is actually surfaced as an error.
  if (response == null) {
    throw new gax.GoogleError('getWriteStream returned no response');
  }
  return response;
}

/**
Expand Down
Loading
Loading