Skip to content

Commit

Permalink
fix: auto convert nested fields (#438)
Browse files Browse the repository at this point in the history
  • Loading branch information
alvarowolfx authored Apr 8, 2024
1 parent f0505e7 commit 0ba5b7d
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 58 deletions.
113 changes: 70 additions & 43 deletions src/managedwriter/encoder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import * as protobuf from 'protobufjs';
import * as protos from '../../protos/protos';
import {normalizeDescriptor} from '../adapt/proto';
import * as extend from 'extend';
import {JSONObject, JSONValue} from './json_writer';

type IDescriptorProto = protos.google.protobuf.IDescriptorProto;
type DescriptorProto = protos.google.protobuf.DescriptorProto;
Expand Down Expand Up @@ -67,10 +68,10 @@ export class JSONEncoder {
* @param {JSONList} rows - The list of JSON rows.
* @returns {Uint8Array[]} The encoded rows.
*/
encodeRows(rows: any[]): Uint8Array[] {
encodeRows(rows: JSONObject[]): Uint8Array[] {
const serializedRows = rows
.map(r => {
return this.convertRow(r);
return this.convertRow(r, this._type);
})
.map(r => {
return this.encodeRow(r);
Expand All @@ -82,61 +83,87 @@ export class JSONEncoder {
return value && [undefined, Object].includes(value.constructor);
}

private encodeRow(row: any): Uint8Array {
private encodeRow(row: JSONObject): Uint8Array {
const msg = this._type.create(row);
return this._type.encode(msg).finish();
}

private convertRow(source: any): Object {
private convertRow(source: JSONObject, ptype: protobuf.Type): JSONObject {
const row = extend(true, {}, source);
for (const key in row) {
const value = row[key];
if (value === null) {
continue;
}
const pfield = this._type.fields[key];
if (!pfield) {
const encodedValue = this.encodeRowValue(value, key, ptype);
if (encodedValue === undefined) {
continue;
}
if (value instanceof Date) {
switch (pfield.type) {
case 'int32': // DATE
// The value is the number of days since the Unix epoch (1970-01-01)
row[key] = value.getTime() / (1000 * 60 * 60 * 24);
break;
case 'int64': // TIMESTAMP
// The value is given in microseconds since the Unix epoch (1970-01-01)
row[key] = value.getTime() * 1000;
break;
case 'string': // DATETIME
row[key] = value.toJSON().replace(/^(.*)T(.*)Z$/, '$1 $2');
break;
}
continue;
}
// NUMERIC and BIGNUMERIC integer
if (typeof value === 'number' || typeof value === 'bigint') {
switch (pfield.type) {
case 'string':
row[key] = value.toString(10);
break;
}
continue;
}
if (Array.isArray(value)) {
row[key] = value.map(v => {
if (!this.isPlainObject(v)) {
return v;
}
return this.convertRow(v);
});
continue;
row[key] = encodedValue;
}
return row;
}

private encodeRowValue(
value: JSONValue,
key: string,
ptype: protobuf.Type
): JSONValue | undefined {
const pfield = ptype.fields[key];
if (!pfield) {
return undefined;
}
if (value instanceof Date) {
switch (pfield.type) {
case 'int32': // DATE
// The value is the number of days since the Unix epoch (1970-01-01)
return value.getTime() / (1000 * 60 * 60 * 24);
case 'int64': // TIMESTAMP
// The value is given in microseconds since the Unix epoch (1970-01-01)
return value.getTime() * 1000;
case 'string': // DATETIME
return value.toJSON().replace(/^(.*)T(.*)Z$/, '$1 $2');
}
if (this.isPlainObject(value)) {
row[key] = this.convertRow(value);
continue;
return undefined;
}
// NUMERIC and BIGNUMERIC integer
if (typeof value === 'number' || typeof value === 'bigint') {
switch (pfield.type) {
case 'string':
return value.toString(10);
}
return undefined;
}
if (Array.isArray(value)) {
const subType = this.getSubType(key, ptype);
return value.map(v => {
if (this.isPlainObject(v)) {
return this.convertRow(v as JSONObject, subType);
}
const encodedValue = this.encodeRowValue(v, key, subType);
if (encodedValue === undefined) {
return v;
}
return encodedValue;
});
}
if (this.isPlainObject(value)) {
const subType = this.getSubType(key, ptype);
return this.convertRow(value as JSONObject, subType);
}
return undefined;
}

private getSubType(key: string, ptype: protobuf.Type): protobuf.Type {
const pfield = ptype.fields[key];
if (!pfield) {
return ptype;
}
try {
const subType = ptype.lookupTypeOrEnum(pfield.type);
return subType;
} catch (err) {
return ptype;
}
return row;
}
}
2 changes: 1 addition & 1 deletion src/managedwriter/json_writer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type MissingValueInterpretationMap = {
};
type IInt64Value = protos.google.protobuf.IInt64Value;
type IDescriptorProto = protos.google.protobuf.IDescriptorProto;
export type JSONPrimitive = string | number | boolean | Date | null;
export type JSONPrimitive = string | number | bigint | boolean | Date | null;
export type JSONValue = JSONPrimitive | JSONObject | JSONArray;
export type JSONObject = {[member: string]: JSONValue};
export type JSONArray = Array<JSONValue>;
Expand Down
60 changes: 46 additions & 14 deletions system-test/managed_writer_client_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -371,20 +371,34 @@ describe('managedwriter.WriterClient', () => {

describe('JSONEncoder', () => {
it('should automatically convert date/datetime/timestamps to expect BigQuery format', () => {
const updatedSchema = {
const updatedSchema: TableSchema = {
fields: [
...(schema.fields || []),
{
name: 'customer_birthday',
type: 'DATE',
mode: 'REQUIRED',
},
{
name: 'customer_created_at',
type: 'DATETIME',
name: 'customer_metadata',
type: 'RECORD',
mode: 'REQUIRED',
fields: [
{
name: 'customer_created_at',
type: 'DATETIME',
mode: 'REQUIRED',
},
{
name: 'customer_updated_at',
type: 'TIMESTAMP',
},
],
},
{
name: 'customer_updated_at',
name: 'customer_last_purchase_dates',
type: 'TIMESTAMP',
mode: 'REPEATED',
},
],
};
Expand All @@ -401,17 +415,29 @@ describe('managedwriter.WriterClient', () => {
customer_name: 'Ada Lovelace',
row_num: 1,
customer_birthday: new Date('1815-12-10'),
customer_created_at: new Date('2022-01-09T03:49:46.564Z'),
customer_updated_at: new Date('2023-01-09T03:49:46.564Z'),
customer_metadata: {
customer_created_at: new Date('2022-01-09T03:49:46.564Z'),
customer_updated_at: new Date('2023-01-09T03:49:46.564Z'),
},
customer_last_purchase_dates: [
new Date('2022-01-09T03:49:46.564Z'),
new Date('2023-01-09T03:49:46.564Z'),
],
};

// Row 2
const row2 = {
customer_name: 'Alan Turing',
row_num: 2,
customer_birthday: new Date('1912-07-23'),
customer_created_at: new Date('2022-01-09T03:49:46.564Z'),
customer_updated_at: new Date('2023-01-09T03:49:46.564Z'),
customer_metadata: {
customer_created_at: new Date('2022-01-09T03:49:46.564Z'),
customer_updated_at: new Date('2023-01-09T03:49:46.564Z'),
},
customer_last_purchase_dates: [
new Date('2022-01-09T03:49:46.564Z'),
new Date('2023-01-09T03:49:46.564Z'),
],
};

const Proto = Type.fromDescriptor(protoDescriptor);
Expand All @@ -421,20 +447,26 @@ describe('managedwriter.WriterClient', () => {
const decodedRow1 = Proto.decode(encodedRow1).toJSON();
assert.deepEqual(decodedRow1, {
customer_name: 'Ada Lovelace',
row_num: 1,
row_num: '1',
customer_birthday: -56270,
customer_created_at: '2022-01-09 03:49:46.564',
customer_updated_at: 1673236186564000,
customer_metadata: {
customer_created_at: '2022-01-09 03:49:46.564',
customer_updated_at: '1673236186564000',
},
customer_last_purchase_dates: ['1641700186564000', '1673236186564000'],
});

const encodedRow2 = encoded[1];
const decodedRow2 = Proto.decode(encodedRow2).toJSON();
assert.deepEqual(decodedRow2, {
customer_name: 'Alan Turing',
row_num: 2,
row_num: '2',
customer_birthday: -20981,
customer_created_at: '2022-01-09 03:49:46.564',
customer_updated_at: 1673236186564000,
customer_metadata: {
customer_created_at: '2022-01-09 03:49:46.564',
customer_updated_at: '1673236186564000',
},
customer_last_purchase_dates: ['1641700186564000', '1673236186564000'],
});
});

Expand Down

0 comments on commit 0ba5b7d

Please sign in to comment.