Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(c/driver/postgresql): Support JSON and JSONB types #2072

Merged
merged 2 commits into from
Aug 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 80 additions & 0 deletions c/driver/postgresql/copy/postgres_copy_reader_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -620,6 +620,86 @@ TEST(PostgresCopyUtilsTest, PostgresCopyReadEnum) {
ASSERT_EQ(std::string(data_buffer + 2, 3), "sad");
}

TEST(PostgresCopyUtilsTest, PostgresCopyReadJson) {
ArrowBufferView data;
data.data.as_uint8 = kTestPgCopyJson;
data.size_bytes = sizeof(kTestPgCopyJson);

auto col_type = PostgresType(PostgresTypeId::kJson);
PostgresType input_type(PostgresTypeId::kRecord);
input_type.AppendChild("col", col_type);

PostgresCopyStreamTester tester;
ASSERT_EQ(tester.Init(input_type), NANOARROW_OK);
ASSERT_EQ(tester.ReadAll(&data), ENODATA);
ASSERT_EQ(data.data.as_uint8 - kTestPgCopyJson, sizeof(kTestPgCopyJson));
ASSERT_EQ(data.size_bytes, 0);

nanoarrow::UniqueArray array;
ASSERT_EQ(tester.GetArray(array.get()), NANOARROW_OK);
ASSERT_EQ(array->length, 3);
ASSERT_EQ(array->n_children, 1);

auto validity = reinterpret_cast<const uint8_t*>(array->children[0]->buffers[0]);
auto offsets = reinterpret_cast<const int32_t*>(array->children[0]->buffers[1]);
auto data_buffer = reinterpret_cast<const char*>(array->children[0]->buffers[2]);
ASSERT_NE(validity, nullptr);
ASSERT_NE(data_buffer, nullptr);

ASSERT_TRUE(ArrowBitGet(validity, 0));
ASSERT_TRUE(ArrowBitGet(validity, 1));
ASSERT_FALSE(ArrowBitGet(validity, 2));

ASSERT_EQ(offsets[0], 0);
ASSERT_EQ(offsets[1], 9);
ASSERT_EQ(offsets[2], 18);
ASSERT_EQ(offsets[3], 18);

ASSERT_EQ(std::string(data_buffer, 9), "[1, 2, 3]");
ASSERT_EQ(std::string(data_buffer + 9, 9), "[4, 5, 6]");
}

TEST(PostgresCopyUtilsTest, PostgresCopyReadJsonb) {
ArrowBufferView data;
data.data.as_uint8 = kTestPgCopyJsonb;
data.size_bytes = sizeof(kTestPgCopyJsonb);

auto col_type = PostgresType(PostgresTypeId::kJsonb);
PostgresType input_type(PostgresTypeId::kRecord);
input_type.AppendChild("col", col_type);

struct ArrowError error;
PostgresCopyStreamTester tester;
ASSERT_EQ(tester.Init(input_type), NANOARROW_OK);
ASSERT_EQ(tester.ReadAll(&data, &error), ENODATA) << error.message;
ASSERT_EQ(data.data.as_uint8 - kTestPgCopyJsonb, sizeof(kTestPgCopyJsonb));
ASSERT_EQ(data.size_bytes, 0);

nanoarrow::UniqueArray array;

ASSERT_EQ(tester.GetArray(array.get(), &error), NANOARROW_OK) << error.message;
ASSERT_EQ(array->length, 3);
ASSERT_EQ(array->n_children, 1);

auto validity = reinterpret_cast<const uint8_t*>(array->children[0]->buffers[0]);
auto offsets = reinterpret_cast<const int32_t*>(array->children[0]->buffers[1]);
auto data_buffer = reinterpret_cast<const char*>(array->children[0]->buffers[2]);
ASSERT_NE(validity, nullptr);
ASSERT_NE(data_buffer, nullptr);

ASSERT_TRUE(ArrowBitGet(validity, 0));
ASSERT_TRUE(ArrowBitGet(validity, 1));
ASSERT_FALSE(ArrowBitGet(validity, 2));

ASSERT_EQ(offsets[0], 0);
ASSERT_EQ(offsets[1], 9);
ASSERT_EQ(offsets[2], 18);
ASSERT_EQ(offsets[3], 18);

ASSERT_EQ(std::string(data_buffer, 9), "[1, 2, 3]");
ASSERT_EQ(std::string(data_buffer + 9, 9), "[4, 5, 6]");
}

TEST(PostgresCopyUtilsTest, PostgresCopyReadBinary) {
ArrowBufferView data;
data.data.as_uint8 = kTestPgCopyBinary;
Expand Down
22 changes: 22 additions & 0 deletions c/driver/postgresql/copy/postgres_copy_test_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@

namespace adbcpq {

// New cases can be genereated using:
// psql --host 127.0.0.1 --port 5432 --username postgres -c "COPY (SELECT ...) TO STDOUT
// WITH (FORMAT binary);" > test.copy Rscript -e "dput(brio::read_file_raw('test.copy'))"

// COPY (SELECT CAST("col" AS BOOLEAN) AS "col" FROM ( VALUES (TRUE), (FALSE), (NULL)) AS
// drvd("col")) TO STDOUT;
static const uint8_t kTestPgCopyBoolean[] = {
Expand Down Expand Up @@ -116,6 +120,24 @@ static const uint8_t kTestPgCopyText[] = {
0x03, 0x61, 0x62, 0x63, 0x00, 0x01, 0x00, 0x00, 0x00, 0x04, 0x31, 0x32,
0x33, 0x34, 0x00, 0x01, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff};

// COPY (SELECT CAST(col AS json) AS col FROM (VALUES ('[1, 2, 3]'), ('[4, 5, 6]'),
// (NULL::json)) AS drvd(col)) TO STDOUT WITH (FORMAT binary);
static const uint8_t kTestPgCopyJson[] = {
0x50, 0x47, 0x43, 0x4f, 0x50, 0x59, 0x0a, 0xff, 0x0d, 0x0a, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00,
0x09, 0x5b, 0x31, 0x2c, 0x20, 0x32, 0x2c, 0x20, 0x33, 0x5d, 0x00, 0x01,
0x00, 0x00, 0x00, 0x09, 0x5b, 0x34, 0x2c, 0x20, 0x35, 0x2c, 0x20, 0x36,
0x5d, 0x00, 0x01, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff};

// COPY (SELECT CAST(col AS jsonb) AS col FROM (VALUES ('[1, 2, 3]'), ('[4, 5, 6]'),
// (NULL::jsonb)) AS drvd(col)) TO STDOUT WITH (FORMAT binary);
static const uint8_t kTestPgCopyJsonb[] = {
0x50, 0x47, 0x43, 0x4f, 0x50, 0x59, 0x0a, 0xff, 0x0d, 0x0a, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00,
0x0a, 0x01, 0x5b, 0x31, 0x2c, 0x20, 0x32, 0x2c, 0x20, 0x33, 0x5d, 0x00,
0x01, 0x00, 0x00, 0x00, 0x0a, 0x01, 0x5b, 0x34, 0x2c, 0x20, 0x35, 0x2c,
0x20, 0x36, 0x5d, 0x00, 0x01, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff};

// COPY (SELECT CAST("col" AS BYTEA) AS "col" FROM ( VALUES (''), ('\x0001'),
// ('\x01020304'), ('\xFEFF'), (NULL)) AS drvd("col")) TO STDOUT
// WITH (FORMAT binary);
Expand Down
45 changes: 45 additions & 0 deletions c/driver/postgresql/copy/reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,47 @@ class PostgresCopyBinaryFieldReader : public PostgresCopyFieldReader {
}
};

/// Postgres JSONB emits as the JSON string prefixed with a version number
/// (https://github.com/postgres/postgres/blob/3f44959f47460fb350d25d760cf2384f9aa14e9a/src/backend/utils/adt/jsonb.c#L80-L87
/// ) Currently there is only one version, so functionally this is a just string prefixed
/// with 0x01.
class PostgresCopyJsonbFieldReader : public PostgresCopyFieldReader {
public:
ArrowErrorCode Read(ArrowBufferView* data, int32_t field_size_bytes, ArrowArray* array,
ArrowError* error) override {
// -1 for NULL (0 would be empty string)
if (field_size_bytes < 0) {
return ArrowArrayAppendNull(array, 1);
}

if (field_size_bytes > data->size_bytes) {
ArrowErrorSet(error, "Expected %d bytes of field data but got %d bytes of input",
static_cast<int>(field_size_bytes),
static_cast<int>(data->size_bytes)); // NOLINT(runtime/int)
return EINVAL;
}

int8_t version;
NANOARROW_RETURN_NOT_OK(ReadChecked<int8_t>(data, &version, error));
if (version != 1) {
ArrowErrorSet(error, "Expected JSONB binary version 0x01 but got %d",
static_cast<int>(version));
return NANOARROW_OK;
}

field_size_bytes -= 1;
NANOARROW_RETURN_NOT_OK(ArrowBufferAppend(data_, data->data.data, field_size_bytes));
data->data.as_uint8 += field_size_bytes;
data->size_bytes -= field_size_bytes;

int32_t* offsets = reinterpret_cast<int32_t*>(offsets_->data);
NANOARROW_RETURN_NOT_OK(
ArrowBufferAppendInt32(offsets_, offsets[array->length] + field_size_bytes));

return AppendValid(array);
}
};

class PostgresCopyArrayFieldReader : public PostgresCopyFieldReader {
public:
void InitChild(std::unique_ptr<PostgresCopyFieldReader> child) {
Expand Down Expand Up @@ -774,11 +815,15 @@ static inline ArrowErrorCode MakeCopyFieldReader(
case PostgresTypeId::kBpchar:
case PostgresTypeId::kName:
case PostgresTypeId::kEnum:
case PostgresTypeId::kJson:
*out = std::make_unique<PostgresCopyBinaryFieldReader>();
return NANOARROW_OK;
case PostgresTypeId::kNumeric:
*out = std::make_unique<PostgresCopyNumericFieldReader>();
return NANOARROW_OK;
case PostgresTypeId::kJsonb:
*out = std::make_unique<PostgresCopyJsonbFieldReader>();
return NANOARROW_OK;
default:
return ErrorCantConvert(error, pg_type, schema_view);
}
Expand Down
2 changes: 2 additions & 0 deletions c/driver/postgresql/postgres_type.h
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,8 @@ class PostgresType {
case PostgresTypeId::kText:
case PostgresTypeId::kName:
case PostgresTypeId::kEnum:
case PostgresTypeId::kJson:
case PostgresTypeId::kJsonb:
NANOARROW_RETURN_NOT_OK(ArrowSchemaSetType(schema, NANOARROW_TYPE_STRING));
break;
case PostgresTypeId::kBytea:
Expand Down
Loading