From b7cdb4017ad1ca2252304902d8a450aeeb073851 Mon Sep 17 00:00:00 2001 From: William Ayd Date: Tue, 2 Jul 2024 20:17:06 -0400 Subject: [PATCH] feat(c/driver/postgresql): Read/write support for TIME64[us] (#1960) One step towards https://github.com/apache/arrow-adbc/issues/1950 Just doing microsecond support for now. Can add nanosecond support and time32 as a follow up Reading was already implemented but untested. Added some shared test data to go with the writer --- .../copy/postgres_copy_reader_test.cc | 35 +++++++++++++++++++ .../copy/postgres_copy_test_common.h | 9 +++++ .../copy/postgres_copy_writer_test.cc | 31 ++++++++++++++++ c/driver/postgresql/copy/reader.h | 9 +++-- c/driver/postgresql/copy/writer.h | 9 +++++ 5 files changed, 91 insertions(+), 2 deletions(-) diff --git a/c/driver/postgresql/copy/postgres_copy_reader_test.cc b/c/driver/postgresql/copy/postgres_copy_reader_test.cc index 0d85c256ec..958df4a932 100644 --- a/c/driver/postgresql/copy/postgres_copy_reader_test.cc +++ b/c/driver/postgresql/copy/postgres_copy_reader_test.cc @@ -314,6 +314,41 @@ TEST(PostgresCopyUtilsTest, PostgresCopyReadDate) { ASSERT_EQ(data_buffer[1], 47482); } +TEST(PostgresCopyUtilsTest, PostgresCopyReadTime) { + ArrowBufferView data; + data.data.as_uint8 = kTestPgCopyTime; + data.size_bytes = sizeof(kTestPgCopyTime); + + auto col_type = PostgresType(PostgresTypeId::kTime); + PostgresType input_type(PostgresTypeId::kRecord); + input_type.AppendChild("col", col_type); + + PostgresCopyStreamTester tester; + ASSERT_EQ(tester.Init(input_type), NANOARROW_OK); + ASSERT_EQ(tester.ReadAll(&data), ENODATA); + ASSERT_EQ(data.data.as_uint8 - kTestPgCopyTime, sizeof(kTestPgCopyTime)); + ASSERT_EQ(data.size_bytes, 0); + + nanoarrow::UniqueArray array; + ASSERT_EQ(tester.GetArray(array.get()), NANOARROW_OK); + ASSERT_EQ(array->length, 4); + ASSERT_EQ(array->n_children, 1); + + auto validity = reinterpret_cast(array->children[0]->buffers[0]); + auto data_buffer = reinterpret_cast(array->children[0]->buffers[1]); + ASSERT_NE(validity, nullptr); + ASSERT_NE(data_buffer, nullptr); + + ASSERT_TRUE(ArrowBitGet(validity, 0)); + ASSERT_TRUE(ArrowBitGet(validity, 1)); + ASSERT_TRUE(ArrowBitGet(validity, 2)); + ASSERT_FALSE(ArrowBitGet(validity, 3)); + + ASSERT_EQ(data_buffer[0], 0); + ASSERT_EQ(data_buffer[1], 86399000000); + ASSERT_EQ(data_buffer[2], 49376123456); +} + TEST(PostgresCopyUtilsTest, PostgresCopyReadNumeric) { ArrowBufferView data; data.data.as_uint8 = kTestPgCopyNumeric; diff --git a/c/driver/postgresql/copy/postgres_copy_test_common.h b/c/driver/postgresql/copy/postgres_copy_test_common.h index d685486626..3c98d089c0 100644 --- a/c/driver/postgresql/copy/postgres_copy_test_common.h +++ b/c/driver/postgresql/copy/postgres_copy_test_common.h @@ -81,6 +81,15 @@ static const uint8_t kTestPgCopyDate[] = { 0x04, 0xff, 0xff, 0x71, 0x54, 0x00, 0x01, 0x00, 0x00, 0x00, 0x04, 0x00, 0x00, 0x8e, 0xad, 0x00, 0x01, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff}; +// COPY (SELECT CAST("col" AS TIME) AS "col" FROM ( VALUES ('00:00:00'), ('23:59:59'), +// ('13:42:57.123456'), (NULL)) AS drvd("col")) TO STDOUT WITH (FORMAT binary); +static const uint8_t kTestPgCopyTime[] = { + 0x50, 0x47, 0x43, 0x4f, 0x50, 0x59, 0x0a, 0xff, 0x0d, 0x0a, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x08, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x08, 0x00, 0x00, 0x00, + 0x14, 0x1d, 0xc8, 0x1d, 0xc0, 0x00, 0x01, 0x00, 0x00, 0x00, 0x08, 0x00, 0x00, 0x00, + 0x0b, 0x7f, 0x0b, 0xda, 0x40, 0x00, 0x01, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff}; + // COPY (SELECT CAST(col AS TIMESTAMP) FROM ( VALUES ('1900-01-01 12:34:56'), // ('2100-01-01 12:34:56'), (NULL)) AS drvd("col")) TO STDOUT WITH (FORMAT BINARY); static const uint8_t kTestPgCopyTimestamp[] = { diff --git a/c/driver/postgresql/copy/postgres_copy_writer_test.cc b/c/driver/postgresql/copy/postgres_copy_writer_test.cc index 1a31126958..9b0331d6b7 100644 --- a/c/driver/postgresql/copy/postgres_copy_writer_test.cc +++ b/c/driver/postgresql/copy/postgres_copy_writer_test.cc @@ -360,6 +360,37 @@ TEST(PostgresCopyUtilsTest, PostgresCopyWriteDate) { } } +TEST(PostgresCopyUtilsTest, PostgresCopyWriteTime) { + adbc_validation::Handle schema; + adbc_validation::Handle array; + struct ArrowError na_error; + + const enum ArrowTimeUnit unit = NANOARROW_TIME_UNIT_MICRO; + const auto values = + std::vector>{0, 86399000000, 49376123456, std::nullopt}; + + ArrowSchemaInit(&schema.value); + ArrowSchemaSetTypeStruct(&schema.value, 1); + ArrowSchemaSetTypeDateTime(schema->children[0], NANOARROW_TYPE_TIME64, unit, nullptr); + ArrowSchemaSetName(schema->children[0], "col"); + ASSERT_EQ( + adbc_validation::MakeBatch(&schema.value, &array.value, &na_error, values), + ADBC_STATUS_OK); + + PostgresCopyStreamWriteTester tester; + ASSERT_EQ(tester.Init(&schema.value, &array.value), NANOARROW_OK); + ASSERT_EQ(tester.WriteAll(nullptr), ENODATA); + + const struct ArrowBuffer buf = tester.WriteBuffer(); + // The last 2 bytes of a message can be transmitted via PQputCopyData + // so no need to test those bytes from the Writer + constexpr size_t buf_size = sizeof(kTestPgCopyTime) - 2; + ASSERT_EQ(buf.size_bytes, buf_size); + for (size_t i = 0; i < buf_size; i++) { + ASSERT_EQ(buf.data[i], kTestPgCopyTime[i]); + } +} + // This buffer is similar to the read variant above but removes special values // nan, ±inf as they are not supported via the Arrow Decimal types // COPY (SELECT CAST(col AS NUMERIC) AS col FROM ( VALUES (NULL), (-123.456), diff --git a/c/driver/postgresql/copy/reader.h b/c/driver/postgresql/copy/reader.h index 9486df93cb..371445a8ef 100644 --- a/c/driver/postgresql/copy/reader.h +++ b/c/driver/postgresql/copy/reader.h @@ -852,8 +852,13 @@ static inline ArrowErrorCode MakeCopyFieldReader( } case NANOARROW_TYPE_TIME64: { - *out = std::make_unique>(); - return NANOARROW_OK; + switch (pg_type.type_id()) { + case PostgresTypeId::kTime: + *out = std::make_unique>(); + return NANOARROW_OK; + default: + return ErrorCantConvert(error, pg_type, schema_view); + } } case NANOARROW_TYPE_TIMESTAMP: diff --git a/c/driver/postgresql/copy/writer.h b/c/driver/postgresql/copy/writer.h index d7acf3cc6f..57456cb49d 100644 --- a/c/driver/postgresql/copy/writer.h +++ b/c/driver/postgresql/copy/writer.h @@ -523,6 +523,15 @@ static inline ArrowErrorCode MakeCopyFieldWriter( PostgresCopyNetworkEndianFieldWriter>(); return NANOARROW_OK; } + case NANOARROW_TYPE_TIME64: { + switch (schema_view.time_unit) { + case NANOARROW_TIME_UNIT_MICRO: + *out = std::make_unique>(); + return NANOARROW_OK; + default: + return ADBC_STATUS_NOT_IMPLEMENTED; + } + } case NANOARROW_TYPE_FLOAT: *out = std::make_unique(); return NANOARROW_OK;