diff --git a/e2e_test/source/cdc/cdc.check_new_rows.slt b/e2e_test/source/cdc/cdc.check_new_rows.slt
index 25189eff09e6e..cac4896f3fa58 100644
--- a/e2e_test/source/cdc/cdc.check_new_rows.slt
+++ b/e2e_test/source/cdc/cdc.check_new_rows.slt
@@ -128,6 +128,8 @@ select id, my_int from list_with_null_shared order by id;
 2 {NULL,3,4}
 3 {NULL,-3,-4}
 4 {-4,-5,-6}
+5 NULL
+6 NULL

 # my_num: varchar[]
 query II
@@ -137,6 +139,8 @@ select id, my_num from list_with_null_shared order by id;
 2 {2.2,0,NULL}
 3 NULL
 4 {NULL,-99999999999999999.9999}
+5 NULL
+6 NULL

 # my_num1: numeric[]
 query II
@@ -146,6 +150,8 @@ select id, my_num_1 from list_with_null_shared order by id;
 2 {2.2,0,NULL}
 3 NULL
 4 {NULL,-99999999999999999.9999}
+5 NULL
+6 NULL

 # my_num2: rw_int256[]
 query II
@@ -155,6 +161,8 @@ select id, my_num_2 from list_with_null_shared order by id;
 2 NULL
 3 NULL
 4 NULL
+5 NULL
+6 NULL

 # Due to a bug in Debezium, if an enum list contains `NULL`, the whole list will be converted to `NULL`
 query II
@@ -164,6 +172,8 @@ select id, my_mood from list_with_null_shared order by id;
 2 {happy,ok,sad}
 3 NULL
 4 NULL
+5 NULL
+6 NULL

 query II
 select id, my_uuid from list_with_null_shared order by id;
@@ -172,6 +182,8 @@ select id, my_uuid from list_with_null_shared order by id;
 2 {2de296df-eda7-4202-a81f-1036100ef4f6,2977afbc-0b12-459c-a36f-f623fc9e9840}
 3 {NULL,471acecf-a4b4-4ed3-a211-7fb2291f159f,9bc35adf-fb11-4130-944c-e7eadb96b829}
 4 {b2e4636d-fa03-4ad4-bf16-029a79dca3e2}
+5 NULL
+6 NULL

 query II
 select id, my_bytea from list_with_null_shared order by id;
@@ -180,3 +192,132 @@ select id, my_bytea from list_with_null_shared order by id;
 2 {"\\x00","\\x01","\\x02"}
 3 {NULL,"\\x99","\\xaa"}
 4 {"\\x88","\\x99","\\xaa"}
+5 NULL
+6 NULL
+
+query TTTTTTT
+SELECT c_boolean, c_smallint, c_integer, c_bigint, c_decimal, c_real, c_double_precision
+FROM postgres_all_types
+ORDER BY c_boolean, c_bigint, c_date;
+----
+f -32767 -2147483647 -9223372036854775807 -10.0 -10000 -10000
+f 0 0 0 0 0 0
+f NULL NULL 1 NULL NULL NULL
+f 1 123 1234567890 123.45 123.45 123.456
+t -32767 -2147483647 -9223372036854775807 -10.0 -10000 -10000
+t 0 0 0 0 0 0
+t NULL NULL 1 NULL NULL NULL
+t 1 123 1234567890 123.45 123.45 123.456
+
+query TTTTTTT
+SELECT c_varchar, c_bytea
+FROM postgres_all_types
+ORDER BY c_boolean, c_bigint, c_date;
+----
+d \x00
+(empty) \x00
+NULL NULL
+example \xdeadbeef
+d \x00
+(empty) \x00
+NULL NULL
+example \xdeadbeef
+
+query TTTTTTT
+SELECT c_date, c_time, c_timestamp, c_timestamptz, c_interval
+FROM postgres_all_types
+ORDER BY c_boolean, c_bigint, c_date;
+----
+0001-01-01 00:00:00 2001-01-01 00:00:00 2001-01-01 08:00:00+00:00 00:00:00
+0001-01-01 00:00:00 2001-01-01 00:00:00 2001-01-01 08:00:00+00:00 00:00:00
+0024-05-19 NULL NULL NULL NULL
+0024-01-01 12:34:56 2024-05-19 12:34:56 2024-05-19 12:34:56+00:00 1 day
+0001-01-01 00:00:00 2001-01-01 00:00:00 2001-01-01 08:00:00+00:00 00:00:00
+0001-01-01 00:00:00 2001-01-01 00:00:00 2001-01-01 08:00:00+00:00 00:00:00
+0024-05-19 NULL NULL NULL NULL
+0024-01-01 12:34:56 2024-05-19 12:34:56 2024-05-19 12:34:56+00:00 1 day
+
+query TTTTTTT
+SELECT c_jsonb, c_uuid, c_enum
+FROM postgres_all_types
+ORDER BY c_boolean, c_bigint, c_date;
+----
+{} bb488f9b-330d-4012-b849-12adeb49e57e happy
+{} NULL sad
+NULL NULL NULL
+{"key": "value"} 123e4567-e89b-12d3-a456-426614174000 happy
+{} bb488f9b-330d-4012-b849-12adeb49e57e happy
+{} NULL sad
+NULL NULL NULL
+{"key": "value"} 123e4567-e89b-12d3-a456-426614174000 happy
+
+query TTTTTTT
+SELECT c_boolean_array, c_smallint_array, c_integer_array, c_bigint_array
+FROM postgres_all_types
+ORDER BY c_boolean, c_bigint, c_date;
+----
+{f} {-32767} {-2147483647} {-9223372036854775807}
+{} {} {} {}
+NULL NULL NULL NULL
+{NULL,t} {NULL,1} {NULL,123} {NULL,1234567890}
+{f} {-32767} {-2147483647} {-9223372036854775807}
+{} {} {} {}
+NULL NULL NULL NULL
+{NULL,t} {NULL,1} {NULL,123} {NULL,1234567890}
+
+
+query TTTTTTT
+SELECT c_decimal_array, c_real_array, c_double_precision_array
+FROM postgres_all_types
+ORDER BY c_boolean, c_bigint, c_date;
+----
+{-10.0} {-10000} {-10000}
+{} {} {}
+NULL NULL NULL
+{NULL,123.45} {NULL,123.45} {NULL,123.456}
+{-10.0} {-10000} {-10000}
+{} {} {}
+NULL NULL NULL
+{NULL,123.45} {NULL,123.45} {NULL,123.456}
+
+query TTTTTTT
+SELECT c_varchar_array, c_bytea_array
+FROM postgres_all_types
+ORDER BY c_boolean, c_bigint, c_date;
+----
+{""} {"\\x00"}
+{} {}
+NULL NULL
+{NULL,example} {NULL,"\\xdeadbeef"}
+{""} {"\\x00"}
+{} {}
+NULL NULL
+{NULL,example} {NULL,"\\xdeadbeef"}
+
+query TTTTTTT
+SELECT c_date_array, c_time_array, c_timestamp_array, c_timestamptz_array, c_interval_array
+FROM postgres_all_types
+ORDER BY c_boolean, c_bigint, c_date;
+----
+{0001-01-01} {00:00:00} {"2001-01-01 00:00:00"} {"2001-01-01 08:00:00+00:00"} NULL
+{} {} {} {} NULL
+NULL NULL NULL NULL NULL
+{NULL,2024-05-19} {NULL,12:34:56} {NULL,"2024-05-19 12:34:56"} {NULL,"2024-05-19 12:34:56+00:00"} NULL
+{0001-01-01} {00:00:00} {"2001-01-01 00:00:00"} {"2001-01-01 08:00:00+00:00"} NULL
+{} {} {} {} NULL
+NULL NULL NULL NULL NULL
+{NULL,2024-05-19} {NULL,12:34:56} {NULL,"2024-05-19 12:34:56"} {NULL,"2024-05-19 12:34:56+00:00"} NULL
+
+query TTTTTTT
+SELECT c_jsonb_array, c_uuid_array, c_enum_array
+FROM postgres_all_types
+ORDER BY c_boolean, c_bigint, c_date;
+----
+{"{}"} {bb488f9b-330d-4012-b849-12adeb49e57e} NULL
+{} {} {}
+NULL NULL NULL
+{NULL,"{\"key\": \"value\"}"} {NULL,123e4567-e89b-12d3-a456-426614174000} NULL
+{"{}"} {bb488f9b-330d-4012-b849-12adeb49e57e} NULL
+{} {} {}
+NULL NULL NULL
+{NULL,"{\"key\": \"value\"}"} {NULL,123e4567-e89b-12d3-a456-426614174000} NULL
diff --git a/e2e_test/source/cdc/cdc.share_stream.slt b/e2e_test/source/cdc/cdc.share_stream.slt
index 3563ee6ed2a4b..e07a0c1d773ef 100644
--- a/e2e_test/source/cdc/cdc.share_stream.slt
+++ b/e2e_test/source/cdc/cdc.share_stream.slt
@@ -43,6 +43,9 @@ create table rw.products_test ( id INT,
 PRIMARY KEY (id)
 ) include timestamp as commit_ts from mysql_mytest table 'mytest.products';

+# sleep to ensure (default,'Milk','Milk is a white liquid food') is consumed from the Debezium message stream instead of from the backfill
+sleep 10s
+
 system ok
 mysql --protocol=tcp -u root mytest -e "INSERT INTO products VALUES (default,'Milk','Milk is a white liquid food');
 INSERT INTO orders VALUES (default, '2023-11-28 15:08:22', 'Bob', 10.52, 100, false);"

@@ -206,6 +209,8 @@ CREATE TABLE IF NOT EXISTS postgres_all_types(
   c_timestamptz timestamptz,
   c_interval interval,
   c_jsonb jsonb,
+  c_uuid varchar,
+  c_enum varchar,
   c_boolean_array boolean[],
   c_smallint_array smallint[],
   c_integer_array integer[],
@@ -221,7 +226,8 @@ CREATE TABLE IF NOT EXISTS postgres_all_types(
   c_timestamptz_array timestamptz[],
   c_interval_array interval[],
   c_jsonb_array jsonb[],
-  c_uuid varchar,
+  c_uuid_array varchar[],
+  c_enum_array varchar[],
   PRIMARY KEY (c_boolean,c_bigint,c_date)
 ) from pg_source table 'public.postgres_all_types';

@@ -253,7 +259,7 @@ sleep 3s
 query TTTTTTT
 SELECT c_boolean,c_date,c_time,c_timestamp,c_jsonb,c_smallint_array,c_timestamp_array,c_uuid FROM postgres_all_types where c_bigint=-9223372036854775807
 ----
-f 0001-01-01 00:00:00 0001-01-01 00:00:00 {} {-32767} {"0001-01-01 00:00:00"} bb488f9b-330d-4012-b849-12adeb49e57e
+f 0001-01-01 00:00:00 2001-01-01 00:00:00 {} {-32767} {"2001-01-01 00:00:00"} bb488f9b-330d-4012-b849-12adeb49e57e

 # postgres streaming test
diff --git a/e2e_test/source/cdc/postgres_cdc.sql b/e2e_test/source/cdc/postgres_cdc.sql
index b936fb7876ade..6579bc2683037 100644
--- a/e2e_test/source/cdc/postgres_cdc.sql
+++ b/e2e_test/source/cdc/postgres_cdc.sql
@@ -35,6 +35,7 @@ create table abs.t1 ("V1" int primary key, v2 double precision, v3 varchar, v4 numeric);
 create publication my_publicaton for table abs.t1 ("V1", v3);
 insert into abs.t1 values (1, 1.1, 'aaa', '5431.1234');

+CREATE TYPE mood AS ENUM ('sad', 'ok', 'happy');
 CREATE TABLE IF NOT EXISTS postgres_all_types(
   c_boolean boolean,
@@ -52,6 +53,8 @@ CREATE TABLE IF NOT EXISTS postgres_all_types(
   c_timestamptz timestamptz,
   c_interval interval,
   c_jsonb jsonb,
+  c_uuid uuid,
+  c_enum mood,
   c_boolean_array boolean[],
   c_smallint_array smallint[],
   c_integer_array integer[],
@@ -67,11 +70,14 @@ CREATE TABLE IF NOT EXISTS postgres_all_types(
   c_timestamptz_array timestamptz[],
   c_interval_array interval[],
   c_jsonb_array jsonb[],
-  c_uuid uuid,
+  c_uuid_array uuid[],
+  c_enum_array mood[],
   PRIMARY KEY (c_boolean,c_bigint,c_date)
 );

-INSERT INTO postgres_all_types VALUES ( False, 0, 0, 0, 0, 0, 0, '', '\x00', '0001-01-01', '00:00:00', '0001-01-01 00:00:00'::timestamp, '0001-01-01 00:00:00'::timestamptz, interval '0 second', '{}', array[]::boolean[], array[]::smallint[], array[]::integer[], array[]::bigint[], array[]::decimal[], array[]::real[], array[]::double precision[], array[]::varchar[], array[]::bytea[], array[]::date[], array[]::time[], array[]::timestamp[], array[]::timestamptz[], array[]::interval[], array[]::jsonb[], null);
-INSERT INTO postgres_all_types VALUES ( False, -32767, -2147483647, -9223372036854775807, -10.0, -9999.999999, -10000.0, '', '\x00', '0001-01-01', '00:00:00', '0001-01-01 00:00:00'::timestamp, '0001-01-01 00:00:00'::timestamptz, interval '0 second', '{}', array[False::boolean]::boolean[], array[-32767::smallint]::smallint[], array[-2147483647::integer]::integer[], array[-9223372036854775807::bigint]::bigint[], array[-10.0::decimal]::decimal[], array[-9999.999999::real]::real[], array[-10000.0::double precision]::double precision[], array[''::varchar]::varchar[], array['\x00'::bytea]::bytea[], array['0001-01-01'::date]::date[], array['00:00:00'::time]::time[], array['0001-01-01 00:00:00'::timestamp::timestamp]::timestamp[], array['0001-01-01 00:00:00'::timestamptz::timestamptz]::timestamptz[], array[interval '0 second'::interval]::interval[], array['{}'::jsonb]::jsonb[], 'bb488f9b-330d-4012-b849-12adeb49e57e');
+INSERT INTO postgres_all_types VALUES ( False, 0, 0, 0, 0, 0, 0, '', '\x00', '0001-01-01', '00:00:00', '2001-01-01 00:00:00'::timestamp, '2001-01-01 00:00:00-8'::timestamptz, interval '0 second', '{}', null, 'sad', array[]::boolean[], array[]::smallint[], array[]::integer[], array[]::bigint[], array[]::decimal[], array[]::real[], array[]::double precision[], array[]::varchar[], array[]::bytea[], array[]::date[], array[]::time[], array[]::timestamp[], array[]::timestamptz[], array[]::interval[], array[]::jsonb[], array[]::uuid[], array[]::mood[]);
+INSERT INTO postgres_all_types VALUES ( False, -32767, -2147483647, -9223372036854775807, -10.0, -9999.999999, -10000.0, 'd', '\x00', '0001-01-01', '00:00:00', '2001-01-01 00:00:00'::timestamp, '2001-01-01 00:00:00-8'::timestamptz, interval '0 second', '{}', 'bb488f9b-330d-4012-b849-12adeb49e57e', 'happy', array[False::boolean]::boolean[], array[-32767::smallint]::smallint[], array[-2147483647::integer]::integer[], array[-9223372036854775807::bigint]::bigint[], array[-10.0::decimal]::decimal[], array[-9999.999999::real]::real[], array[-10000.0::double precision]::double precision[], array[''::varchar]::varchar[], array['\x00'::bytea]::bytea[], array['0001-01-01'::date]::date[], array['00:00:00'::time]::time[], array['2001-01-01 00:00:00'::timestamp::timestamp]::timestamp[], array['2001-01-01 00:00:00-8'::timestamptz::timestamptz]::timestamptz[], array[interval '0 second'::interval]::interval[], array['{}'::jsonb]::jsonb[], '{bb488f9b-330d-4012-b849-12adeb49e57e}', '{happy,ok,NULL}');
+INSERT INTO postgres_all_types VALUES ( False, 1, 123, 1234567890, 123.45, 123.45, 123.456, 'example', '\xDEADBEEF', '0024-01-01', '12:34:56', '2024-05-19 12:34:56', '2024-05-19 12:34:56+00', INTERVAL '1 day', '{"key": "value"}', '123e4567-e89b-12d3-a456-426614174000', 'happy', ARRAY[NULL, TRUE]::boolean[], ARRAY[NULL, 1::smallint], ARRAY[NULL, 123], ARRAY[NULL, 1234567890], ARRAY[NULL, 123.45::numeric], ARRAY[NULL, 123.45::real], ARRAY[NULL, 123.456], ARRAY[NULL, 'example'], ARRAY[NULL, '\xDEADBEEF'::bytea], ARRAY[NULL, '2024-05-19'::date], ARRAY[NULL, '12:34:56'::time], ARRAY[NULL, '2024-05-19 12:34:56'::timestamp], ARRAY[NULL, '2024-05-19 12:34:56+00'::timestamptz], ARRAY[NULL, INTERVAL '1 day'], ARRAY[NULL, '{"key": "value"}'::jsonb], ARRAY[NULL, '123e4567-e89b-12d3-a456-426614174000'::uuid], ARRAY[NULL, 'happy'::mood]);
+INSERT INTO postgres_all_types VALUES ( False, NULL, NULL, 1, NULL, NULL, NULL, NULL, NULL, '0024-05-19', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL);
 create table numeric_table(id int PRIMARY KEY, num numeric);
 insert into numeric_table values(1, 3.14);
@@ -89,8 +95,6 @@ create table numeric_list(id int primary key, num numeric[]);
 insert into numeric_list values(1, '{3.14, 6, 57896044618658097711785492504343953926634992332820282019728792003956564819967, 57896044618658097711785492504343953926634992332820282019728792003956564819968, 115792089237316195423570985008687907853269984665640564039457584007913129639936.555555}');
 insert into numeric_list values(2, '{nan, infinity, -infinity}');

---- for https://github.com/risingwavelabs/risingwave/issues/16392
-CREATE TYPE mood AS ENUM ('sad', 'ok', 'happy');
 CREATE TABLE enum_table (
     id int PRIMARY KEY,
     current_mood mood
@@ -100,3 +104,4 @@ INSERT INTO enum_table VALUES (1, 'happy');
 CREATE TABLE list_with_null(id int primary key, my_int int[], my_num numeric[], my_num_1 numeric[], my_num_2 numeric[], my_mood mood[], my_uuid uuid[], my_bytea bytea[]);
 INSERT INTO list_with_null VALUES (1, '{1,2,NULL}', '{1.1,inf,NULL}', '{1.1,inf,NULL}', '{1.1,inf,NULL}', '{happy,ok,NULL}', '{bb488f9b-330d-4012-b849-12adeb49e57e,bb488f9b-330d-4012-b849-12adeb49e57f, NULL}', '{\\x00,\\x01,NULL}');
 INSERT INTO list_with_null VALUES (2, '{NULL,3,4}', '{2.2,0,NULL}' , '{2.2,0,NULL}', '{2.2,0,NULL}', '{happy,ok,sad}', '{2de296df-eda7-4202-a81f-1036100ef4f6,2977afbc-0b12-459c-a36f-f623fc9e9840}', '{\\x00,\\x01,\\x02}');
+INSERT INTO list_with_null VALUES (5, NULL, NULL, NULL, NULL, NULL, NULL, NULL);
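
The snapshot fixture above and the streaming fixture below insert the same four value patterns, distinguished only by the `c_boolean` key (False for rows present before the connector starts, True for rows inserted afterwards), so backfill and Debezium streaming can be compared row for row. A minimal, hypothetical smoke test for the upstream side (not part of this PR; the connection string and `cdc_test` database name are assumptions):

```rust
use tokio_postgres::NoTls;

#[tokio::main]
async fn main() -> Result<(), tokio_postgres::Error> {
    let (client, connection) =
        tokio_postgres::connect("host=localhost user=postgres dbname=cdc_test", NoTls).await?;
    tokio::spawn(async move {
        if let Err(e) = connection.await {
            eprintln!("connection error: {e}");
        }
    });

    // Casting to text on the server side lets every column print uniformly,
    // including uuid, mood (enum), uuid[] and mood[].
    let rows = client
        .query(
            "SELECT c_uuid::text, c_enum::text, c_uuid_array::text, c_enum_array::text \
             FROM postgres_all_types ORDER BY c_boolean, c_bigint, c_date",
            &[],
        )
        .await?;
    for row in rows {
        let cols: Vec<Option<String>> = (0..4).map(|i| row.get(i)).collect();
        println!("{cols:?}");
    }
    Ok(())
}
```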
diff --git a/e2e_test/source/cdc/postgres_cdc_insert.sql b/e2e_test/source/cdc/postgres_cdc_insert.sql
index a02a35a020965..4c0d0dee48b42 100644
--- a/e2e_test/source/cdc/postgres_cdc_insert.sql
+++ b/e2e_test/source/cdc/postgres_cdc_insert.sql
@@ -12,6 +12,11 @@ SELECT pg_current_wal_lsn();
 select * from pg_publication_tables where pubname='rw_publication';
 select * from public.person order by id;

+INSERT INTO postgres_all_types VALUES ( True, 0, 0, 0, 0, 0, 0, '', '\x00', '0001-01-01', '00:00:00', '2001-01-01 00:00:00'::timestamp, '2001-01-01 00:00:00-8'::timestamptz, interval '0 second', '{}', null, 'sad', array[]::boolean[], array[]::smallint[], array[]::integer[], array[]::bigint[], array[]::decimal[], array[]::real[], array[]::double precision[], array[]::varchar[], array[]::bytea[], array[]::date[], array[]::time[], array[]::timestamp[], array[]::timestamptz[], array[]::interval[], array[]::jsonb[], array[]::uuid[], array[]::mood[]);
+INSERT INTO postgres_all_types VALUES ( True, -32767, -2147483647, -9223372036854775807, -10.0, -9999.999999, -10000.0, 'd', '\x00', '0001-01-01', '00:00:00', '2001-01-01 00:00:00'::timestamp, '2001-01-01 00:00:00-8'::timestamptz, interval '0 second', '{}', 'bb488f9b-330d-4012-b849-12adeb49e57e', 'happy', array[False::boolean]::boolean[], array[-32767::smallint]::smallint[], array[-2147483647::integer]::integer[], array[-9223372036854775807::bigint]::bigint[], array[-10.0::decimal]::decimal[], array[-9999.999999::real]::real[], array[-10000.0::double precision]::double precision[], array[''::varchar]::varchar[], array['\x00'::bytea]::bytea[], array['0001-01-01'::date]::date[], array['00:00:00'::time]::time[], array['2001-01-01 00:00:00'::timestamp::timestamp]::timestamp[], array['2001-01-01 00:00:00-8'::timestamptz::timestamptz]::timestamptz[], array[interval '0 second'::interval]::interval[], array['{}'::jsonb]::jsonb[], '{bb488f9b-330d-4012-b849-12adeb49e57e}', '{happy,ok,NULL}');
+INSERT INTO postgres_all_types VALUES ( True, 1, 123, 1234567890, 123.45, 123.45, 123.456, 'example', '\xDEADBEEF', '0024-01-01', '12:34:56', '2024-05-19 12:34:56', '2024-05-19 12:34:56+00', INTERVAL '1 day', '{"key": "value"}', '123e4567-e89b-12d3-a456-426614174000', 'happy', ARRAY[NULL, TRUE]::boolean[], ARRAY[NULL, 1::smallint], ARRAY[NULL, 123], ARRAY[NULL, 1234567890], ARRAY[NULL, 123.45::numeric], ARRAY[NULL, 123.45::real], ARRAY[NULL, 123.456], ARRAY[NULL, 'example'], ARRAY[NULL, '\xDEADBEEF'::bytea], ARRAY[NULL, '2024-05-19'::date], ARRAY[NULL, '12:34:56'::time], ARRAY[NULL, '2024-05-19 12:34:56'::timestamp], ARRAY[NULL, '2024-05-19 12:34:56+00'::timestamptz], ARRAY[NULL, INTERVAL '1 day'], ARRAY[NULL, '{"key": "value"}'::jsonb], ARRAY[NULL, '123e4567-e89b-12d3-a456-426614174000'::uuid], ARRAY[NULL, 'happy'::mood]);
+INSERT INTO postgres_all_types VALUES ( True, NULL, NULL, 1, NULL, NULL, NULL, NULL, NULL, '0024-05-19', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL);
+
 insert into numeric_table values(102, 57896044618658097711785492504343953926634992332820282019728792003956564819967);
 --- 2^255
 insert into numeric_table values(103, 57896044618658097711785492504343953926634992332820282019728792003956564819968);
@@ -25,3 +30,5 @@ INSERT INTO enum_table VALUES (3, 'sad');
 --- to avoid escaping issues of psql -c "", we insert this row here and check the result in check_new_rows.slt
 INSERT INTO list_with_null VALUES (3, '{NULL,-3,-4}', '{NULL,nan,-inf}', '{NULL,nan,-inf}', '{NULL,nan,-inf}', '{NULL,sad,ok}', '{NULL,471acecf-a4b4-4ed3-a211-7fb2291f159f,9bc35adf-fb11-4130-944c-e7eadb96b829}', '{NULL,\\x99,\\xAA}');
 INSERT INTO list_with_null VALUES (4, '{-4,-5,-6}', '{NULL,-99999999999999999.9999}', '{NULL,-99999999999999999.9999}', '{NULL,-99999999999999999.9999}', '{NULL,sad,ok}', '{b2e4636d-fa03-4ad4-bf16-029a79dca3e2}', '{\\x88,\\x99,\\xAA}');
+INSERT INTO list_with_null VALUES (6, NULL, NULL, NULL, NULL, NULL, NULL, NULL);
+
diff --git a/src/common/src/types/datetime.rs b/src/common/src/types/datetime.rs
index 9199dd41b6e5b..7058d36ec6fd5 100644
--- a/src/common/src/types/datetime.rs
+++ b/src/common/src/types/datetime.rs
@@ -22,7 +22,7 @@ use std::str::FromStr;

 use bytes::{Bytes, BytesMut};
 use chrono::{Datelike, Days, Duration, NaiveDate, NaiveDateTime, NaiveTime, Timelike, Weekday};
-use postgres_types::{accepts, to_sql_checked, IsNull, ToSql, Type};
+use postgres_types::{accepts, to_sql_checked, FromSql, IsNull, ToSql, Type};
 use risingwave_common_estimate_size::ZeroHeapSize;
 use thiserror::Error;

@@ -88,6 +88,20 @@ impl ToSql for Date {
     }
 }

+impl<'a> FromSql<'a> for Date {
+    fn from_sql(
+        ty: &Type,
+        raw: &'a [u8],
+    ) -> std::result::Result<Self, Box<dyn std::error::Error + Sync + Send>> {
+        let instant = NaiveDate::from_sql(ty, raw)?;
+        Ok(Self::from(instant))
+    }
+
+    fn accepts(ty: &Type) -> bool {
+        matches!(*ty, Type::DATE)
+    }
+}
+
 impl ToSql for Time {
     accepts!(TIME);

@@ -105,6 +119,20 @@ impl ToSql for Time {
     }
 }

+impl<'a> FromSql<'a> for Time {
+    fn from_sql(
+        ty: &Type,
+        raw: &'a [u8],
+    ) -> std::result::Result<Self, Box<dyn std::error::Error + Sync + Send>> {
+        let instant = NaiveTime::from_sql(ty, raw)?;
+        Ok(Self::from(instant))
+    }
+
+    fn accepts(ty: &Type) -> bool {
+        matches!(*ty, Type::TIME)
+    }
+}
+
 impl ToSql for Timestamp {
     accepts!(TIMESTAMP);

@@ -122,6 +150,20 @@ impl ToSql for Timestamp {
     }
 }

+impl<'a> FromSql<'a> for Timestamp {
+    fn from_sql(
+        ty: &Type,
+        raw: &'a [u8],
+    ) -> std::result::Result<Self, Box<dyn std::error::Error + Sync + Send>> {
+        let instant = NaiveDateTime::from_sql(ty, raw)?;
+        Ok(Self::from(instant))
+    }
+
+    fn accepts(ty: &Type) -> bool {
+        matches!(*ty, Type::TIMESTAMP)
+    }
+}
+
 /// Parse a date from varchar.
 ///
 /// # Example
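
The three impls above share one delegation pattern: decode the binary Postgres value with chrono's `FromSql` (available via the `with-chrono-0_4` feature of `postgres-types`), then wrap the result in the RisingWave newtype. A standalone sketch of the same pattern for a hypothetical wrapper type `MyDate` (illustrative, not from the PR):

```rust
use chrono::NaiveDate;
use postgres_types::{FromSql, Type};

struct MyDate(NaiveDate);

impl<'a> FromSql<'a> for MyDate {
    fn from_sql(
        ty: &Type,
        raw: &'a [u8],
    ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
        // chrono's FromSql already understands the postgres binary DATE
        // encoding (days since 2000-01-01); we only add the newtype wrapper.
        Ok(MyDate(NaiveDate::from_sql(ty, raw)?))
    }

    fn accepts(ty: &Type) -> bool {
        matches!(*ty, Type::DATE)
    }
}
```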
diff --git a/src/common/src/types/from_sql.rs b/src/common/src/types/from_sql.rs
new file mode 100644
index 0000000000000..ba1d49892c602
--- /dev/null
+++ b/src/common/src/types/from_sql.rs
@@ -0,0 +1,66 @@
+// Copyright 2024 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use postgres_types::{FromSql, Type};
+use risingwave_common::types::{
+    Date, Interval, JsonbVal, ScalarImpl, Time, Timestamp, Timestamptz,
+};
+
+impl<'a> FromSql<'a> for ScalarImpl {
+    fn from_sql(
+        ty: &Type,
+        raw: &'a [u8],
+    ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
+        Ok(match *ty {
+            Type::BOOL => ScalarImpl::from(bool::from_sql(ty, raw)?),
+            Type::INT2 => ScalarImpl::from(i16::from_sql(ty, raw)?),
+            Type::INT4 => ScalarImpl::from(i32::from_sql(ty, raw)?),
+            Type::INT8 => ScalarImpl::from(i64::from_sql(ty, raw)?),
+            Type::FLOAT4 => ScalarImpl::from(f32::from_sql(ty, raw)?),
+            Type::FLOAT8 => ScalarImpl::from(f64::from_sql(ty, raw)?),
+            Type::DATE => ScalarImpl::from(Date::from_sql(ty, raw)?),
+            Type::TIME => ScalarImpl::from(Time::from_sql(ty, raw)?),
+            Type::TIMESTAMP => ScalarImpl::from(Timestamp::from_sql(ty, raw)?),
+            Type::TIMESTAMPTZ => ScalarImpl::from(Timestamptz::from_sql(ty, raw)?),
+            Type::JSONB => ScalarImpl::from(JsonbVal::from_sql(ty, raw)?),
+            Type::INTERVAL => ScalarImpl::from(Interval::from_sql(ty, raw)?),
+            Type::BYTEA => ScalarImpl::from(Vec::<u8>::from_sql(ty, raw)?.into_boxed_slice()),
+            Type::VARCHAR | Type::TEXT => ScalarImpl::from(String::from_sql(ty, raw)?),
+            // Serial, Int256, Struct, List and Decimal are not supported here.
+            // Note: the Decimal type is specially handled in the `ScalarAdapter`.
+            _ => bail_not_implemented!("the postgres decoding for {ty} is unsupported"),
+        })
+    }
+
+    fn accepts(ty: &Type) -> bool {
+        matches!(
+            *ty,
+            Type::BOOL
+                | Type::INT2
+                | Type::INT4
+                | Type::INT8
+                | Type::FLOAT4
+                | Type::FLOAT8
+                | Type::DATE
+                | Type::TIME
+                | Type::TIMESTAMP
+                | Type::TIMESTAMPTZ
+                | Type::JSONB
+                | Type::INTERVAL
+                | Type::BYTEA
+                | Type::VARCHAR
+                | Type::TEXT
+        )
+    }
+}
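
With this blanket impl, a caller can pull any simple builtin cell out of a `tokio_postgres::Row` in a single `try_get` call, which is what the rewritten connector parser further below relies on. A small usage sketch (the helper function is illustrative, not part of the PR):

```rust
use risingwave_common::types::ScalarImpl;

fn decode_cell(row: &tokio_postgres::Row, i: usize) -> Option<ScalarImpl> {
    // try_get drives ScalarImpl::from_sql; the declared Type of column i
    // selects the match arm, and a SQL NULL surfaces as Ok(None).
    match row.try_get::<_, Option<ScalarImpl>>(i) {
        Ok(datum) => datum,
        Err(e) => {
            eprintln!("parse column {i} failed: {e}");
            None
        }
    }
}
```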
diff --git a/src/common/src/types/jsonb.rs b/src/common/src/types/jsonb.rs
index 522ec788d8646..824020fac3123 100644
--- a/src/common/src/types/jsonb.rs
+++ b/src/common/src/types/jsonb.rs
@@ -12,11 +12,12 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

-use std::fmt;
+use std::fmt::{self, Write};
 use std::hash::Hash;

-use bytes::Buf;
+use bytes::{Buf, BufMut, BytesMut};
 use jsonbb::{Value, ValueRef};
+use postgres_types::{accepts, to_sql_checked, FromSql, IsNull, ToSql, Type};
 use risingwave_common_estimate_size::EstimateSize;

 use super::{Datum, IntoOrdered, ListValue, ScalarImpl, StructRef, ToOwnedDatum, F64};
@@ -539,3 +540,38 @@ impl std::io::Write for FmtToIoUnchecked {
         Ok(())
     }
 }
+
+impl ToSql for JsonbVal {
+    accepts!(JSONB);
+
+    to_sql_checked!();
+
+    fn to_sql(
+        &self,
+        _ty: &Type,
+        out: &mut BytesMut,
+    ) -> Result<IsNull, Box<dyn std::error::Error + Sync + Send>>
+    where
+        Self: Sized,
+    {
+        out.put_u8(1);
+        write!(out, "{}", self.0).unwrap();
+        Ok(IsNull::No)
+    }
+}
+
+impl<'a> FromSql<'a> for JsonbVal {
+    fn from_sql(
+        _ty: &Type,
+        mut raw: &'a [u8],
+    ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
+        if raw.is_empty() || raw.get_u8() != 1 {
+            return Err("invalid jsonb encoding".into());
+        }
+        Ok(JsonbVal::from(Value::from_text(raw)?))
+    }
+
+    fn accepts(ty: &Type) -> bool {
+        matches!(*ty, Type::JSONB)
+    }
+}
diff --git a/src/common/src/types/mod.rs b/src/common/src/types/mod.rs
index bb41e75c8a4f8..01364fb15da63 100644
--- a/src/common/src/types/mod.rs
+++ b/src/common/src/types/mod.rs
@@ -48,6 +48,7 @@ use crate::{
 mod datetime;
 mod decimal;
 mod fields;
+mod from_sql;
 mod interval;
 mod jsonb;
 mod macros;
diff --git a/src/common/src/types/timestamptz.rs b/src/common/src/types/timestamptz.rs
index 1da5f0abd718f..feafee6e212bf 100644
--- a/src/common/src/types/timestamptz.rs
+++ b/src/common/src/types/timestamptz.rs
@@ -19,7 +19,7 @@ use std::str::FromStr;
 use bytes::{Bytes, BytesMut};
 use chrono::{DateTime, Datelike, TimeZone, Utc};
 use chrono_tz::Tz;
-use postgres_types::{accepts, to_sql_checked, IsNull, ToSql, Type};
+use postgres_types::{accepts, to_sql_checked, FromSql, IsNull, ToSql, Type};
 use risingwave_common_estimate_size::ZeroHeapSize;
 use serde::{Deserialize, Serialize};

@@ -51,6 +51,20 @@ impl ToSql for Timestamptz {
     }
 }

+impl<'a> FromSql<'a> for Timestamptz {
+    fn from_sql(
+        ty: &Type,
+        raw: &'a [u8],
+    ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
+        let instant = DateTime::<Utc>::from_sql(ty, raw)?;
+        Ok(Self::from(instant))
+    }
+
+    fn accepts(ty: &Type) -> bool {
+        matches!(*ty, Type::TIMESTAMPTZ)
+    }
+}
+
 impl ToBinary for Timestamptz {
     fn to_binary_with_type(&self, _ty: &DataType) -> super::to_binary::Result<Option<Bytes>> {
         let instant = self.to_datetime_utc();
diff --git a/src/common/src/types/to_sql.rs b/src/common/src/types/to_sql.rs
index 71957b3bf35c8..3ece8a574c450 100644
--- a/src/common/src/types/to_sql.rs
+++ b/src/common/src/types/to_sql.rs
@@ -15,11 +15,11 @@
 use std::error::Error;

 use bytes::BytesMut;
-use postgres_types::{accepts, to_sql_checked, IsNull, ToSql, Type};
+use postgres_types::{to_sql_checked, IsNull, ToSql, Type};

-use crate::types::{JsonbRef, ScalarRefImpl};
+use crate::types::ScalarImpl;

-impl ToSql for ScalarRefImpl<'_> {
+impl ToSql for ScalarImpl {
     to_sql_checked!();

     fn to_sql(&self, ty: &Type, out: &mut BytesMut) -> Result<IsNull, Box<dyn Error + Sync + Send>>
@@ -27,25 +27,23 @@ impl ToSql for ScalarRefImpl<'_> {
         Self: Sized,
     {
         match self {
-            ScalarRefImpl::Int16(v) => v.to_sql(ty, out),
-            ScalarRefImpl::Int32(v) => v.to_sql(ty, out),
-            ScalarRefImpl::Int64(v) => v.to_sql(ty, out),
-            ScalarRefImpl::Serial(v) => v.to_sql(ty, out),
-            ScalarRefImpl::Float32(v) => v.to_sql(ty, out),
-            ScalarRefImpl::Float64(v) => v.to_sql(ty, out),
-            ScalarRefImpl::Utf8(v) => v.to_sql(ty, out),
-            ScalarRefImpl::Bool(v) => v.to_sql(ty, out),
-            ScalarRefImpl::Decimal(v) => v.to_sql(ty, out),
-            ScalarRefImpl::Interval(v) => v.to_sql(ty, out),
-            ScalarRefImpl::Date(v) => v.to_sql(ty, out),
-            ScalarRefImpl::Timestamp(v) => v.to_sql(ty, out),
-            ScalarRefImpl::Timestamptz(v) => v.to_sql(ty, out),
-            ScalarRefImpl::Time(v) => v.to_sql(ty, out),
-            ScalarRefImpl::Bytea(v) => v.to_sql(ty, out),
-            ScalarRefImpl::Jsonb(_) // jsonbb::Value doesn't implement ToSql yet
-            | ScalarRefImpl::Int256(_)
-            | ScalarRefImpl::Struct(_)
-            | ScalarRefImpl::List(_) => {
+            ScalarImpl::Int16(v) => v.to_sql(ty, out),
+            ScalarImpl::Int32(v) => v.to_sql(ty, out),
+            ScalarImpl::Int64(v) => v.to_sql(ty, out),
+            ScalarImpl::Serial(v) => v.to_sql(ty, out),
+            ScalarImpl::Float32(v) => v.to_sql(ty, out),
+            ScalarImpl::Float64(v) => v.to_sql(ty, out),
+            ScalarImpl::Utf8(v) => v.to_sql(ty, out),
+            ScalarImpl::Bool(v) => v.to_sql(ty, out),
+            ScalarImpl::Decimal(v) => v.to_sql(ty, out),
+            ScalarImpl::Interval(v) => v.to_sql(ty, out),
+            ScalarImpl::Date(v) => v.to_sql(ty, out),
+            ScalarImpl::Timestamp(v) => v.to_sql(ty, out),
+            ScalarImpl::Timestamptz(v) => v.to_sql(ty, out),
+            ScalarImpl::Time(v) => v.to_sql(ty, out),
+            ScalarImpl::Bytea(v) => (&**v).to_sql(ty, out),
+            ScalarImpl::Jsonb(v) => v.to_sql(ty, out),
+            ScalarImpl::Int256(_) | ScalarImpl::Struct(_) | ScalarImpl::List(_) => {
                 bail_not_implemented!("the postgres encoding for {ty} is unsupported")
             }
         }
@@ -59,18 +57,3 @@ impl ToSql for ScalarRefImpl<'_> {
     fn accepts(_ty: &Type) -> bool {
         true
     }
 }
-
-impl ToSql for JsonbRef<'_> {
-    accepts!(JSONB);
-
-    to_sql_checked!();
-
-    fn to_sql(&self, _: &Type, out: &mut BytesMut) -> Result<IsNull, Box<dyn Error + Sync + Send>>
-    where
-        Self: Sized,
-    {
-        let buf = self.value_serialize();
-        out.extend(buf);
-        Ok(IsNull::No)
-    }
-}
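
The `JsonbVal` codec above follows Postgres' jsonb binary wire format: a single version byte `1` followed by the JSON text. A self-contained sketch of that framing with toy encode/decode helpers rather than the PR's types:

```rust
use bytes::{BufMut, BytesMut};

fn encode_jsonb(json_text: &str) -> BytesMut {
    let mut out = BytesMut::new();
    out.put_u8(1); // jsonb wire-format version byte
    out.extend_from_slice(json_text.as_bytes());
    out
}

fn decode_jsonb(raw: &[u8]) -> Result<&str, &'static str> {
    match raw.split_first() {
        // Accept only version 1, then treat the rest as UTF-8 JSON text.
        Some((&1, rest)) => std::str::from_utf8(rest).map_err(|_| "invalid utf-8 in jsonb"),
        _ => Err("invalid jsonb encoding"),
    }
}

fn main() {
    let buf = encode_jsonb(r#"{"key": "value"}"#);
    assert_eq!(decode_jsonb(&buf).unwrap(), r#"{"key": "value"}"#);
}
```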
diff --git a/src/connector/src/parser/postgres.rs b/src/connector/src/parser/postgres.rs
index f01484ad18fb2..da17ea256ba3c 100644
--- a/src/connector/src/parser/postgres.rs
+++ b/src/connector/src/parser/postgres.rs
@@ -14,54 +14,17 @@
 use std::sync::LazyLock;

-use chrono::{NaiveDate, Utc};
 use risingwave_common::catalog::Schema;
 use risingwave_common::log::LogSuppresser;
 use risingwave_common::row::OwnedRow;
-use risingwave_common::types::{
-    DataType, Date, Decimal, Interval, JsonbVal, ListValue, ScalarImpl, Time, Timestamp,
-    Timestamptz,
-};
+use risingwave_common::types::{DataType, Decimal, ScalarImpl};
 use thiserror_ext::AsReport;
-use tokio_postgres::types::{Kind, Type};

 use crate::parser::scalar_adapter::ScalarAdapter;
 use crate::parser::util::log_error;

 static LOG_SUPPERSSER: LazyLock<LogSuppresser> = LazyLock::new(LogSuppresser::default);

-macro_rules! handle_list_data_type {
-    ($row:expr, $i:expr, $name:expr, $type:ty, $builder:expr) => {
-        let res = $row.try_get::<_, Option<Vec<Option<$type>>>>($i);
-        match res {
-            Ok(val) => {
-                if let Some(v) = val {
-                    v.into_iter()
-                        .for_each(|val| $builder.append(val.map(ScalarImpl::from)))
-                }
-            }
-            Err(err) => {
-                log_error!($name, err, "parse column failed");
-            }
-        }
-    };
-    ($row:expr, $i:expr, $name:expr, $type:ty, $builder:expr, $rw_type:ty) => {
-        let res = $row.try_get::<_, Option<Vec<Option<$type>>>>($i);
-        match res {
-            Ok(val) => {
-                if let Some(v) = val {
-                    v.into_iter().for_each(|val| {
-                        $builder.append(val.map(|v| ScalarImpl::from(<$rw_type>::from(v))))
-                    })
-                }
-            }
-            Err(err) => {
-                log_error!($name, err, "parse column failed");
-            }
-        }
-    };
-}
-
 macro_rules! handle_data_type {
@@ -73,16 +36,6 @@ macro_rules! handle_data_type {
             }
         }
     }};
-    ($row:expr, $i:expr, $name:expr, $type:ty, $rw_type:ty) => {{
-        let res = $row.try_get::<_, Option<$type>>($i);
-        match res {
-            Ok(val) => val.map(|v| ScalarImpl::from(<$rw_type>::from(v))),
-            Err(err) => {
-                log_error!($name, err, "parse column failed");
-                None
-            }
-        }
-    }};
 }

 pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> OwnedRow {
@@ -106,304 +59,65 @@ fn postgres_cell_to_scalar_impl(
     // Issue #1. NULL in an enum list is not supported by Debezium; an enum list that contains `NULL` will fall back to `NULL`.
     // Issue #2. In our parser, when a list contains inf, -inf, nan or an invalid item, the whole list falls back to NULL.
     match data_type {
-        DataType::Boolean => {
-            handle_data_type!(row, i, name, bool)
-        }
-        DataType::Int16 => {
-            handle_data_type!(row, i, name, i16)
-        }
-        DataType::Int32 => {
-            handle_data_type!(row, i, name, i32)
-        }
-        DataType::Int64 => {
-            handle_data_type!(row, i, name, i64)
-        }
-        DataType::Float32 => {
-            handle_data_type!(row, i, name, f32)
-        }
-        DataType::Float64 => {
-            handle_data_type!(row, i, name, f64)
-        }
-        DataType::Decimal => {
-            handle_data_type!(row, i, name, Decimal)
-        }
-        DataType::Int256 => {
-            // Currently in order to handle the decimal beyond RustDecimal,
-            // we use the PgNumeric type to convert the decimal to a string.
-            // Then we convert the string to Int256.
-            // Note: It's only used to map the numeric type in upstream Postgres to RisingWave's rw_int256.
-            let res = row.try_get::<_, Option<ScalarAdapter<'_>>>(i);
+        DataType::Boolean
+        | DataType::Int16
+        | DataType::Int32
+        | DataType::Int64
+        | DataType::Float32
+        | DataType::Float64
+        | DataType::Date
+        | DataType::Time
+        | DataType::Timestamp
+        | DataType::Timestamptz
+        | DataType::Jsonb
+        | DataType::Interval
+        | DataType::Bytea => {
+            // ScalarAdapter would also work here, but ScalarImpl is more efficient.
+            let res = row.try_get::<_, Option<ScalarImpl>>(i);
             match res {
-                Ok(val) => val.and_then(|v| v.into_scalar(DataType::Int256)),
+                Ok(val) => val,
                 Err(err) => {
-                    log_error!(name, err, "parse numeric column as pg_numeric failed");
+                    log_error!(name, err, "parse column failed");
                     None
                 }
             }
         }
-        DataType::Varchar => {
-            if let Kind::Enum(_) = row.columns()[i].type_().kind() {
-                // enum type needs to be handled separately
-                let res = row.try_get::<_, Option<ScalarAdapter<'_>>>(i);
-                match res {
-                    Ok(val) => val.and_then(|v| v.into_scalar(DataType::Varchar)),
-                    Err(err) => {
-                        log_error!(name, err, "parse enum column failed");
-                        None
-                    }
-                }
-            } else {
-                match *row.columns()[i].type_() {
-                    // Since we don't support UUID natively, adapt it to a VARCHAR column
-                    Type::UUID => {
-                        let res = row.try_get::<_, Option<ScalarAdapter<'_>>>(i);
-                        match res {
-                            Ok(val) => val.and_then(|v| v.into_scalar(DataType::Varchar)),
-                            Err(err) => {
-                                log_error!(name, err, "parse uuid column failed");
-                                None
-                            }
-                        }
-                    }
-                    // we support converting NUMERIC to VARCHAR implicitly
-                    Type::NUMERIC => {
-                        // Currently in order to handle the decimal beyond RustDecimal,
-                        // we use the PgNumeric type to convert the decimal to a string.
-                        // Note: It's only used to map the numeric type in upstream Postgres to RisingWave's varchar.
-                        let res = row.try_get::<_, Option<ScalarAdapter<'_>>>(i);
-                        match res {
-                            Ok(val) => val.and_then(|v| v.into_scalar(DataType::Varchar)),
-                            Err(err) => {
-                                log_error!(name, err, "parse numeric column as pg_numeric failed");
-                                None
-                            }
-                        }
-                    }
-                    _ => {
-                        handle_data_type!(row, i, name, String)
-                    }
-                }
-            }
-        }
-        DataType::Date => {
-            handle_data_type!(row, i, name, NaiveDate, Date)
-        }
-        DataType::Time => {
-            handle_data_type!(row, i, name, chrono::NaiveTime, Time)
-        }
-        DataType::Timestamp => {
-            handle_data_type!(row, i, name, chrono::NaiveDateTime, Timestamp)
-        }
-        DataType::Timestamptz => {
-            handle_data_type!(row, i, name, chrono::DateTime<Utc>, Timestamptz)
+        DataType::Decimal => {
+            // Decimal is more efficient than PgNumeric in ScalarAdapter
+            handle_data_type!(row, i, name, Decimal)
         }
-        DataType::Bytea => {
-            let res = row.try_get::<_, Option<Vec<u8>>>(i);
+        DataType::Varchar | DataType::Int256 => {
+            let res = row.try_get::<_, Option<ScalarAdapter>>(i);
             match res {
-                Ok(val) => val.map(|v| ScalarImpl::from(v.into_boxed_slice())),
+                Ok(val) => val.and_then(|v| v.into_scalar(data_type)),
                 Err(err) => {
                     log_error!(name, err, "parse column failed");
                     None
                 }
             }
         }
-        DataType::Jsonb => {
-            handle_data_type!(row, i, name, serde_json::Value, JsonbVal)
-        }
-        DataType::Interval => {
-            handle_data_type!(row, i, name, Interval)
-        }
-        DataType::List(dtype) => {
-            let mut builder = dtype.create_array_builder(0);
-            // enum list needs to be handled separately
-            if let Kind::Array(item_type) = row.columns()[i].type_().kind()
-                && let Kind::Enum(_) = item_type.kind()
-            {
-                // Issue #1, we use ScalarAdapter instead of Option<ScalarAdapter>
-                let res = row.try_get::<_, Option<Vec<ScalarAdapter<'_>>>>(i);
+        DataType::List(dtype) => match **dtype {
+            // TODO(Kexiang): allow DataType::List(_)
+            DataType::Struct(_) | DataType::List(_) | DataType::Serial => {
+                tracing::warn!(
+                    "unsupported List data type {:?}, set the List to empty",
+                    **dtype
+                );
+                None
+            }
+            _ => {
+                let res = row.try_get::<_, Option<ScalarAdapter>>(i);
                 match res {
-                    Ok(val) => {
-                        if let Some(vec) = val {
-                            for val in vec {
-                                builder.append(val.into_scalar(DataType::Varchar))
-                            }
-                        }
-                        Some(ScalarImpl::from(ListValue::new(builder.finish())))
-                    }
+                    Ok(val) => val.and_then(|v| v.into_scalar(data_type)),
                     Err(err) => {
-                        log_error!(name, err, "parse enum column failed");
+                        log_error!(name, err, "parse list column failed");
                         None
                     }
                 }
-            } else {
-                match **dtype {
-                    DataType::Boolean => {
-                        handle_list_data_type!(row, i, name, bool, builder);
-                    }
-                    DataType::Int16 => {
-                        handle_list_data_type!(row, i, name, i16, builder);
-                    }
-                    DataType::Int32 => {
-                        handle_list_data_type!(row, i, name, i32, builder);
-                    }
-                    DataType::Int64 => {
-                        handle_list_data_type!(row, i, name, i64, builder);
-                    }
-                    DataType::Float32 => {
-                        handle_list_data_type!(row, i, name, f32, builder);
-                    }
-                    DataType::Float64 => {
-                        handle_list_data_type!(row, i, name, f64, builder);
-                    }
-                    DataType::Decimal => {
-                        let res = row.try_get::<_, Option<Vec<Option<ScalarAdapter<'_>>>>>(i);
-                        match res {
-                            Ok(val) => {
-                                if let Some(vec) = val {
-                                    builder = ScalarAdapter::build_scalar_in_list(
-                                        vec,
-                                        DataType::Decimal,
-                                        builder,
-                                    )?;
-                                }
-                            }
-                            Err(err) => {
-                                log_error!(name, err, "parse uuid column failed");
-                            }
-                        };
-                    }
-                    DataType::Date => {
-                        handle_list_data_type!(row, i, name, NaiveDate, builder, Date);
-                    }
-                    DataType::Varchar => {
-                        match *row.columns()[i].type_() {
-                            // Since we don't support UUID natively, adapt it to a VARCHAR column
-                            Type::UUID_ARRAY => {
-                                let res =
-                                    row.try_get::<_, Option<Vec<Option<ScalarAdapter<'_>>>>>(i);
-                                match res {
-                                    Ok(val) => {
-                                        if let Some(vec) = val {
-                                            for val in vec {
-                                                builder.append(
-                                                    val.and_then(|v| {
-                                                        v.into_scalar(DataType::Varchar)
-                                                    }),
-                                                )
-                                            }
-                                        }
-                                    }
-                                    Err(err) => {
-                                        log_error!(name, err, "parse uuid column failed");
-                                    }
-                                };
-                            }
-                            Type::NUMERIC_ARRAY => {
-                                let res =
-                                    row.try_get::<_, Option<Vec<Option<ScalarAdapter<'_>>>>>(i);
-                                match res {
-                                    Ok(val) => {
-                                        if let Some(vec) = val {
-                                            builder = ScalarAdapter::build_scalar_in_list(
-                                                vec,
-                                                DataType::Varchar,
-                                                builder,
-                                            )?;
-                                        }
-                                    }
-                                    Err(err) => {
-                                        log_error!(
-                                            name,
-                                            err,
-                                            "parse numeric list column as pg_numeric list failed"
-                                        );
-                                    }
-                                };
-                            }
-                            _ => {
-                                handle_list_data_type!(row, i, name, String, builder);
-                            }
-                        }
-                    }
-                    DataType::Time => {
-                        handle_list_data_type!(row, i, name, chrono::NaiveTime, builder, Time);
-                    }
-                    DataType::Timestamp => {
-                        handle_list_data_type!(
-                            row,
-                            i,
-                            name,
-                            chrono::NaiveDateTime,
-                            builder,
-                            Timestamp
-                        );
-                    }
-                    DataType::Timestamptz => {
-                        handle_list_data_type!(
-                            row,
-                            i,
-                            name,
-                            chrono::DateTime<Utc>,
-                            builder,
-                            Timestamptz
-                        );
-                    }
-                    DataType::Interval => {
-                        handle_list_data_type!(row, i, name, Interval, builder);
-                    }
-                    DataType::Jsonb => {
-                        handle_list_data_type!(row, i, name, serde_json::Value, builder, JsonbVal);
-                    }
-                    DataType::Bytea => {
-                        let res = row.try_get::<_, Option<Vec<Option<Vec<u8>>>>>(i);
-                        match res {
-                            Ok(val) => {
-                                if let Some(v) = val {
-                                    v.into_iter().for_each(|val| {
-                                        builder.append(
-                                            val.map(|v| ScalarImpl::from(v.into_boxed_slice())),
-                                        )
-                                    })
-                                }
-                            }
-                            Err(err) => {
-                                log_error!(name, err, "parse column failed");
-                            }
-                        }
-                    }
-                    DataType::Int256 => {
-                        let res = row.try_get::<_, Option<Vec<Option<ScalarAdapter<'_>>>>>(i);
-                        match res {
-                            Ok(val) => {
-                                if let Some(vec) = val {
-                                    builder = ScalarAdapter::build_scalar_in_list(
-                                        vec,
-                                        DataType::Int256,
-                                        builder,
-                                    )?;
-                                }
-                            }
-                            Err(err) => {
-                                log_error!(
-                                    name,
-                                    err,
-                                    "parse numeric list column as pg_numeric list failed"
-                                );
-                            }
-                        };
-                    }
-                    DataType::Struct(_) | DataType::List(_) | DataType::Serial => {
-                        tracing::warn!(
-                            "unsupported List data type {:?}, set the List to empty",
-                            **dtype
-                        );
-                    }
-                };
-                Some(ScalarImpl::from(ListValue::new(builder.finish())))
             }
-        }
+        },
         DataType::Struct(_) | DataType::Serial => {
-            // Interval and Struct are not supported
+            // Struct and Serial are not supported
             tracing::warn!(name, ?data_type, "unsupported data type, set to null");
             None
         }
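
The rewritten parser now defers every uuid, enum, numeric and list cell to `ScalarAdapter`, whose `FromSql` dispatch in the next file is keyed on the column's postgres `Kind` rather than on RisingWave's `DataType`. A toy classifier mirroring that dispatch order, runnable against `postgres-types` constants (the function and its return strings are illustrative only):

```rust
use tokio_postgres::types::{Kind, Type};

/// Toy classifier following the same precedence as ScalarAdapter::from_sql:
/// uuid and numeric first, then other simple types, then enums and arrays.
fn classify(ty: &Type) -> &'static str {
    match ty.kind() {
        Kind::Simple if *ty == Type::UUID => "Uuid",
        Kind::Simple if *ty == Type::NUMERIC => "Numeric",
        Kind::Simple => "Builtin",
        Kind::Enum(_) => "Enum",
        Kind::Array(inner) if *inner == Type::NUMERIC => "NumericList",
        Kind::Array(inner) if matches!(inner.kind(), Kind::Enum(_)) => "EnumList",
        Kind::Array(_) => "List",
        _ => "Unsupported",
    }
}

fn main() {
    assert_eq!(classify(&Type::UUID), "Uuid");
    assert_eq!(classify(&Type::NUMERIC_ARRAY), "NumericList");
    assert_eq!(classify(&Type::INT4_ARRAY), "List");
}
```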
diff --git a/src/connector/src/parser/scalar_adapter.rs b/src/connector/src/parser/scalar_adapter.rs
index 6a6546a8d7600..0f5d2d6d6d935 100644
--- a/src/connector/src/parser/scalar_adapter.rs
+++ b/src/connector/src/parser/scalar_adapter.rs
@@ -17,7 +17,7 @@ use std::str::FromStr;
 use anyhow::anyhow;
 use bytes::BytesMut;
 use pg_bigdecimal::PgNumeric;
-use risingwave_common::types::{DataType, Decimal, Int256, ScalarImpl, ScalarRefImpl};
+use risingwave_common::types::{DataType, Decimal, Int256, ListValue, ScalarImpl, ScalarRefImpl};
 use thiserror_ext::AsReport;
 use tokio_postgres::types::{to_sql_checked, FromSql, IsNull, Kind, ToSql, Type};

@@ -75,14 +75,21 @@ impl ToSql for EnumString {
 /// Adapter for `ScalarImpl` to Postgres data type,
 /// which can be used to encode/decode to/from Postgres value.
 #[derive(Debug)]
-pub(crate) enum ScalarAdapter<'a> {
-    Builtin(ScalarRefImpl<'a>),
+pub(crate) enum ScalarAdapter {
+    Builtin(ScalarImpl),
     Uuid(uuid::Uuid),
+    // In order to handle decimals beyond the range of RustDecimal,
+    // we use the PgNumeric type to convert them to a string/decimal/rw_int256.
    Numeric(PgNumeric),
     Enum(EnumString),
+    NumericList(Vec<Option<PgNumeric>>),
+    EnumList(Vec<Option<EnumString>>),
+    // UuidList is covered by List, while NumericList and EnumList are special cases.
+    // Note: IntervalList is not supported.
+    List(Vec<Option<ScalarAdapter>>),
 }

-impl ToSql for ScalarAdapter<'_> {
+impl ToSql for ScalarAdapter {
     to_sql_checked!();

     fn to_sql(
         &self,
@@ -95,6 +102,9 @@ impl ToSql for ScalarAdapter<'_> {
             ScalarAdapter::Uuid(v) => v.to_sql(ty, out),
             ScalarAdapter::Numeric(v) => v.to_sql(ty, out),
             ScalarAdapter::Enum(v) => v.to_sql(ty, out),
+            ScalarAdapter::NumericList(v) => v.to_sql(ty, out),
+            ScalarAdapter::EnumList(v) => v.to_sql(ty, out),
+            ScalarAdapter::List(v) => v.to_sql(ty, out),
         }
     }

@@ -104,133 +114,204 @@ impl ToSql for ScalarAdapter<'_> {
 }

 /// convert from Postgres uuid, numeric and enum to `ScalarAdapter`
-impl<'a> FromSql<'a> for ScalarAdapter<'_> {
+impl<'a> FromSql<'a> for ScalarAdapter {
     fn from_sql(
         ty: &Type,
         raw: &'a [u8],
     ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
         match ty.kind() {
             Kind::Simple => match *ty {
-                Type::UUID => {
-                    let uuid = uuid::Uuid::from_sql(ty, raw)?;
-                    Ok(ScalarAdapter::Uuid(uuid))
-                }
-                Type::NUMERIC => {
-                    let numeric = PgNumeric::from_sql(ty, raw)?;
-                    Ok(ScalarAdapter::Numeric(numeric))
-                }
-                _ => Err(anyhow!("failed to convert type {:?} to ScalarAdapter", ty).into()),
+                Type::UUID => Ok(ScalarAdapter::Uuid(uuid::Uuid::from_sql(ty, raw)?)),
+                // In order to cover decimals beyond RustDecimal (only 28 digits are supported),
+                // we use PgNumeric to handle decimals coming from postgres.
+                Type::NUMERIC => Ok(ScalarAdapter::Numeric(PgNumeric::from_sql(ty, raw)?)),
+                _ => Ok(ScalarAdapter::Builtin(ScalarImpl::from_sql(ty, raw)?)),
             },
             Kind::Enum(_) => Ok(ScalarAdapter::Enum(EnumString::from_sql(ty, raw)?)),
+            Kind::Array(Type::NUMERIC) => {
+                Ok(ScalarAdapter::NumericList(FromSql::from_sql(ty, raw)?))
+            }
+            Kind::Array(inner_type) if let Kind::Enum(_) = inner_type.kind() => {
+                Ok(ScalarAdapter::EnumList(FromSql::from_sql(ty, raw)?))
+            }
+            Kind::Array(_) => Ok(ScalarAdapter::List(FromSql::from_sql(ty, raw)?)),
             _ => Err(anyhow!("failed to convert type {:?} to ScalarAdapter", ty).into()),
         }
     }

     fn accepts(ty: &Type) -> bool {
-        matches!(ty, &Type::UUID | &Type::NUMERIC) || <EnumString as FromSql>::accepts(ty)
+        match ty.kind() {
+            Kind::Simple => {
+                matches!(ty, &Type::UUID | &Type::NUMERIC) || <ScalarImpl as FromSql>::accepts(ty)
+            }
+            Kind::Enum(_) => true,
+            Kind::Array(inner_type) => <ScalarAdapter as FromSql>::accepts(inner_type),
+            _ => false,
+        }
     }
 }

-impl ScalarAdapter<'_> {
+impl ScalarAdapter {
     pub fn name(&self) -> &'static str {
         match self {
             ScalarAdapter::Builtin(_) => "Builtin",
             ScalarAdapter::Uuid(_) => "Uuid",
             ScalarAdapter::Numeric(_) => "Numeric",
             ScalarAdapter::Enum(_) => "Enum",
+            ScalarAdapter::EnumList(_) => "EnumList",
+            ScalarAdapter::NumericList(_) => "NumericList",
+            ScalarAdapter::List(_) => "List",
         }
     }

     /// convert `ScalarRefImpl` to `ScalarAdapter` so that we can correctly encode to postgres value
-    pub(crate) fn from_scalar<'a>(
-        scalar: ScalarRefImpl<'a>,
+    pub(crate) fn from_scalar(
+        scalar: ScalarRefImpl<'_>,
         ty: &Type,
-    ) -> ConnectorResult<ScalarAdapter<'a>> {
+    ) -> ConnectorResult<Self> {
         Ok(match (scalar, ty, ty.kind()) {
             (ScalarRefImpl::Utf8(s), &Type::UUID, _) => ScalarAdapter::Uuid(s.parse()?),
             (ScalarRefImpl::Utf8(s), &Type::NUMERIC, _) => {
-                ScalarAdapter::Numeric(string_to_pg_numeric(s)?)
+                ScalarAdapter::Numeric(string_to_pg_numeric(s))
             }
             (ScalarRefImpl::Int256(s), &Type::NUMERIC, _) => {
-                ScalarAdapter::Numeric(string_to_pg_numeric(&s.to_string())?)
+                ScalarAdapter::Numeric(string_to_pg_numeric(&s.to_string()))
             }
             (ScalarRefImpl::Utf8(s), _, Kind::Enum(_)) => {
                 ScalarAdapter::Enum(EnumString(s.to_owned()))
             }
-            _ => ScalarAdapter::Builtin(scalar),
+            (ScalarRefImpl::List(list), &Type::NUMERIC_ARRAY, _) => {
+                let mut vec = vec![];
+                for datum in list.iter() {
+                    vec.push(match datum {
+                        Some(ScalarRefImpl::Int256(s)) => Some(string_to_pg_numeric(&s.to_string())),
+                        Some(ScalarRefImpl::Decimal(s)) => Some(rw_numeric_to_pg_numeric(s)),
+                        Some(ScalarRefImpl::Utf8(s)) => Some(string_to_pg_numeric(s)),
+                        None => None,
+                        _ => {
+                            unreachable!("Only rw-numeric[], rw_int256[] and varchar[] are supported to convert to pg-numeric[]");
+                        }
+                    })
+                }
+                ScalarAdapter::NumericList(vec)
+            }
+            (ScalarRefImpl::List(list), _, Kind::Array(inner_type)) => match inner_type.kind() {
+                Kind::Enum(_) => {
+                    let mut vec = vec![];
+                    for datum in list.iter() {
+                        vec.push(match datum {
+                            Some(ScalarRefImpl::Utf8(s)) => Some(EnumString(s.to_owned())),
+                            _ => unreachable!(
+                                "Only non-null varchar[] is supported to convert to enum[]"
+                            ),
+                        })
+                    }
+                    ScalarAdapter::EnumList(vec)
+                }
+                _ => {
+                    let mut vec = vec![];
+                    for datum in list.iter() {
+                        vec.push(
+                            datum
+                                .map(|s| ScalarAdapter::from_scalar(s, inner_type))
+                                .transpose()?,
+                        );
+                    }
+                    ScalarAdapter::List(vec)
+                }
+            },
+            _ => ScalarAdapter::Builtin(scalar.into_scalar_impl()),
         })
     }

-    pub fn into_scalar(self, ty: DataType) -> Option<ScalarImpl> {
-        match (&self, &ty) {
-            (ScalarAdapter::Builtin(scalar), _) => Some(scalar.into_scalar_impl()),
+    pub fn into_scalar(self, ty: &DataType) -> Option<ScalarImpl> {
+        match (self, &ty) {
+            (ScalarAdapter::Builtin(scalar), _) => Some(scalar),
             (ScalarAdapter::Uuid(uuid), &DataType::Varchar) => {
                 Some(ScalarImpl::from(uuid.to_string()))
             }
             (ScalarAdapter::Numeric(numeric), &DataType::Varchar) => {
-                Some(ScalarImpl::from(pg_numeric_to_string(numeric)))
+                Some(ScalarImpl::from(pg_numeric_to_string(&numeric)))
             }
             (ScalarAdapter::Numeric(numeric), &DataType::Int256) => {
-                pg_numeric_to_rw_int256(numeric)
+                pg_numeric_to_rw_int256(&numeric)
             }
-            (ScalarAdapter::Enum(EnumString(s)), &DataType::Varchar) => Some(ScalarImpl::from(s)),
-            _ => {
-                tracing::error!(
-                    adapter = self.name(),
-                    rw_type = ty.pg_name(),
-                    "failed to convert from ScalarAdapter: invalid conversion"
-                );
-                None
+            (ScalarAdapter::Numeric(numeric), &DataType::Decimal) => {
+                pg_numeric_to_rw_numeric(&numeric)
             }
-        }
-    }
-
-    pub fn build_scalar_in_list(
-        vec: Vec<Option<ScalarAdapter<'_>>>,
-        ty: DataType,
-        mut builder: risingwave_common::array::ArrayBuilderImpl,
-    ) -> Option<risingwave_common::array::ArrayBuilderImpl> {
-        for val in vec {
-            let scalar = match (val, &ty) {
-                (Some(ScalarAdapter::Numeric(numeric)), &DataType::Varchar) => {
-                    if pg_numeric_is_special(&numeric) {
-                        return None;
-                    } else {
-                        Some(ScalarImpl::from(pg_numeric_to_string(&numeric)))
-                    }
-                }
-                (Some(ScalarAdapter::Numeric(numeric)), &DataType::Int256) => match numeric {
-                    PgNumeric::Normalized(big_decimal) => {
-                        match Int256::from_str(big_decimal.to_string().as_str()) {
-                            Ok(num) => Some(ScalarImpl::from(num)),
-                            Err(err) => {
-                                tracing::error!(error = %err.as_report(), "parse pg-numeric as rw_int256 failed");
-                                return None;
-                            }
-                        }
-                    }
-                    _ => return None,
-                },
-                (Some(ScalarAdapter::Numeric(numeric)), &DataType::Decimal) => match numeric {
-                    PgNumeric::Normalized(big_decimal) => {
-                        match Decimal::from_str(big_decimal.to_string().as_str()) {
-                            Ok(num) => Some(ScalarImpl::from(num)),
-                            Err(err) => {
-                                tracing::error!(error = %err.as_report(), "parse pg-numeric as rw-numeric failed (likely out-of-range");
-                                return None;
-                            }
-                        }
-                    }
-                    _ => return None,
-                },
-                (Some(_), _) => unreachable!(
-                    "into_scalar_in_list should only be called with ScalarAdapter::Numeric types"
-                ),
-                (None, _) => None,
-            };
-            builder.append(scalar);
+            (ScalarAdapter::Enum(EnumString(s)), &DataType::Varchar) => Some(ScalarImpl::from(s)),
+            (ScalarAdapter::NumericList(vec), &DataType::List(dtype)) => {
+                let mut builder = dtype.create_array_builder(0);
+                for val in vec {
+                    let scalar = match (val, &dtype) {
+                        // A numeric array may contain special values like NaN, Inf and -Inf, which are not supported in Debezium.
+                        // When we encounter one of these special values, we fall back the whole array to NULL and return None directly.
+                        (Some(numeric), box DataType::Varchar) => {
+                            if pg_numeric_is_special(&numeric) {
+                                return None;
+                            } else {
+                                ScalarAdapter::Numeric(numeric).into_scalar(dtype)
+                            }
+                        }
+                        (Some(numeric), box DataType::Int256 | box DataType::Decimal) => {
+                            if pg_numeric_is_special(&numeric) {
+                                return None;
+                            } else {
+                                // A PgNumeric can sometimes exceed the range of Int256 and RwNumeric.
+                                // Our JSON parsing falls back the whole array to NULL in that case,
+                                // so we keep the behavior consistent here and return None directly.
+                                match ScalarAdapter::Numeric(numeric).into_scalar(dtype) {
+                                    Some(scalar) => Some(scalar),
+                                    None => {
+                                        return None;
+                                    }
+                                }
+                            }
+                        }
+                        (Some(_), _) => unreachable!(
+                            "Only rw-numeric[], rw_int256[] and varchar[] are supported to convert to pg-numeric[]"
+                        ),
+                        // This item is NULL; continue with the next item.
+                        (None, _) => None,
+                    };
+                    builder.append(scalar);
+                }
+                Some(ScalarImpl::from(ListValue::new(builder.finish())))
+            }
+            (ScalarAdapter::EnumList(vec), &DataType::List(dtype)) => {
+                let mut builder = dtype.create_array_builder(0);
+                for val in vec {
+                    match val {
+                        Some(EnumString(s)) => {
+                            builder.append(Some(ScalarImpl::from(s)));
+                        }
+                        None => {
+                            return None;
+                        }
+                    }
+                }
+                Some(ScalarImpl::from(ListValue::new(builder.finish())))
+            }
+            (ScalarAdapter::List(vec), &DataType::List(dtype)) => {
+                // Due to https://github.com/risingwavelabs/risingwave/issues/16882, INTERVAL_ARRAY is not supported in Debezium, so we keep backfilling and CDC consistent.
+                if matches!(**dtype, DataType::Interval) {
+                    return None;
+                }
+                let mut builder = dtype.create_array_builder(0);
+                for val in vec {
+                    builder.append(val.and_then(|v| v.into_scalar(dtype)));
+                }
+                Some(ScalarImpl::from(ListValue::new(builder.finish())))
+            }
+            (scalar, ty) => {
+                tracing::error!(
+                    adapter = scalar.name(),
+                    rw_type = ty.pg_name(),
+                    "failed to convert from ScalarAdapter: invalid conversion"
+                );
+                None
+            }
         }
-        Some(builder)
     }
 }

@@ -251,6 +332,23 @@ fn pg_numeric_to_rw_int256(val: &PgNumeric) -> Option<ScalarImpl> {
     }
 }

+fn pg_numeric_to_rw_numeric(val: &PgNumeric) -> Option<ScalarImpl> {
+    match val {
+        PgNumeric::NegativeInf => Some(ScalarImpl::from(Decimal::NegativeInf)),
+        PgNumeric::Normalized(big_decimal) => {
+            match Decimal::from_str(big_decimal.to_string().as_str()) {
+                Ok(num) => Some(ScalarImpl::from(num)),
+                Err(err) => {
+                    tracing::error!(error = %err.as_report(), "parse pg-numeric as rw-numeric failed (likely out of range)");
+                    None
+                }
+            }
+        }
+        PgNumeric::PositiveInf => Some(ScalarImpl::from(Decimal::PositiveInf)),
+        PgNumeric::NaN => Some(ScalarImpl::from(Decimal::NaN)),
+    }
+}
+
 fn pg_numeric_to_string(val: &PgNumeric) -> String {
     // TODO(kexiang): NEGATIVE_INFINITY -> -Infinity, POSITIVE_INFINITY -> Infinity, NAN -> NaN
     // The current implementation is to ensure consistency with the behavior of the CDC event parser.
@@ -262,11 +360,20 @@ fn pg_numeric_to_string(val: &PgNumeric) -> String {
     }
 }

-fn string_to_pg_numeric(s: &str) -> super::ConnectorResult<PgNumeric> {
-    Ok(match s {
+fn string_to_pg_numeric(s: &str) -> PgNumeric {
+    match s {
         "NEGATIVE_INFINITY" => PgNumeric::NegativeInf,
         "POSITIVE_INFINITY" => PgNumeric::PositiveInf,
         "NAN" => PgNumeric::NaN,
         _ => PgNumeric::Normalized(s.parse().unwrap()),
-    })
+    }
+}
+
+fn rw_numeric_to_pg_numeric(val: Decimal) -> PgNumeric {
+    match val {
+        Decimal::NegativeInf => PgNumeric::NegativeInf,
+        Decimal::Normalized(inner) => PgNumeric::Normalized(inner.to_string().parse().unwrap()),
+        Decimal::PositiveInf => PgNumeric::PositiveInf,
+        Decimal::NaN => PgNumeric::NaN,
+    }
 }
diff --git a/src/connector/src/source/cdc/external/postgres.rs b/src/connector/src/source/cdc/external/postgres.rs
index a4e3117b49519..4d4a89a3248cd 100644
--- a/src/connector/src/source/cdc/external/postgres.rs
+++ b/src/connector/src/source/cdc/external/postgres.rs
@@ -241,7 +241,7 @@ impl PostgresExternalTableReader {
         let stream = match start_pk_row {
             Some(ref pk_row) => {
-                let params: Vec<Option<ScalarAdapter<'_>>> = pk_row
+                let params: Vec<Option<ScalarAdapter>> = pk_row
                     .iter()
                     .zip_eq_fast(self.prepared_scan_stmt.params())
                     .map(|(datum, ty)| {
@@ -260,7 +260,7 @@ impl PostgresExternalTableReader {
                     Self::get_normalized_table_name(&table_name),
                     order_key,
                 );
-                let params: Vec<Option<ScalarAdapter<'_>>> = vec![];
+                let params: Vec<Option<ScalarAdapter>> = vec![];
                 client.query_raw(&sql, &params).await?
             }
         };
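
The helpers at the end of `scalar_adapter.rs` map NaN and the two infinities in both directions between RisingWave's `Decimal` and `PgNumeric`; only the `Normalized` variant carries a payload that can fail to parse (handled with a logged fallback in the PR). A toy round trip of that mapping, using stand-in enums rather than the real types:

```rust
#[derive(Clone, Debug, PartialEq)]
enum RwDecimal {
    NegativeInf,
    PositiveInf,
    NaN,
    Normalized(String),
}

#[derive(Clone, Debug, PartialEq)]
enum PgNum {
    NegativeInf,
    PositiveInf,
    NaN,
    Normalized(String),
}

fn rw_to_pg(v: RwDecimal) -> PgNum {
    match v {
        RwDecimal::NegativeInf => PgNum::NegativeInf,
        RwDecimal::PositiveInf => PgNum::PositiveInf,
        RwDecimal::NaN => PgNum::NaN,
        RwDecimal::Normalized(s) => PgNum::Normalized(s),
    }
}

fn pg_to_rw(v: PgNum) -> RwDecimal {
    match v {
        PgNum::NegativeInf => RwDecimal::NegativeInf,
        PgNum::PositiveInf => RwDecimal::PositiveInf,
        PgNum::NaN => RwDecimal::NaN,
        PgNum::Normalized(s) => RwDecimal::Normalized(s),
    }
}

fn main() {
    // Special values always survive the round trip.
    for v in [
        RwDecimal::NaN,
        RwDecimal::PositiveInf,
        RwDecimal::Normalized("-10.5".into()),
    ] {
        assert_eq!(pg_to_rw(rw_to_pg(v.clone())), v);
    }
    println!("round trip ok");
}
```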