Skip to content

Commit

Permalink
refactor(postgres-cdc): refactor postgres_row_to_owned_row (#16714)
Browse files Browse the repository at this point in the history
  • Loading branch information
KeXiangWang authored May 28, 2024
1 parent a493616 commit 12f5c0d
Show file tree
Hide file tree
Showing 13 changed files with 576 additions and 454 deletions.
141 changes: 141 additions & 0 deletions e2e_test/source/cdc/cdc.check_new_rows.slt
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ select id, my_int from list_with_null_shared order by id;
2 {NULL,3,4}
3 {NULL,-3,-4}
4 {-4,-5,-6}
5 NULL
6 NULL

# my_num: varchar[]
query II
Expand All @@ -137,6 +139,8 @@ select id, my_num from list_with_null_shared order by id;
2 {2.2,0,NULL}
3 NULL
4 {NULL,-99999999999999999.9999}
5 NULL
6 NULL

# my_num_1: numeric[]
query II
Expand All @@ -146,6 +150,8 @@ select id, my_num_1 from list_with_null_shared order by id;
2 {2.2,0,NULL}
3 NULL
4 {NULL,-99999999999999999.9999}
5 NULL
6 NULL

# my_num_2: rw_int256[]
query II
Expand All @@ -155,6 +161,8 @@ select id, my_num_2 from list_with_null_shared order by id;
2 NULL
3 NULL
4 NULL
5 NULL
6 NULL

# Due to a bug in Debezium, if an enum list contains `NULL`, the whole list is converted to `NULL`.
query II
Expand All @@ -164,6 +172,8 @@ select id, my_mood from list_with_null_shared order by id;
2 {happy,ok,sad}
3 NULL
4 NULL
5 NULL
6 NULL

query II
select id, my_uuid from list_with_null_shared order by id;
Expand All @@ -172,6 +182,8 @@ select id, my_uuid from list_with_null_shared order by id;
2 {2de296df-eda7-4202-a81f-1036100ef4f6,2977afbc-0b12-459c-a36f-f623fc9e9840}
3 {NULL,471acecf-a4b4-4ed3-a211-7fb2291f159f,9bc35adf-fb11-4130-944c-e7eadb96b829}
4 {b2e4636d-fa03-4ad4-bf16-029a79dca3e2}
5 NULL
6 NULL

query II
select id, my_bytea from list_with_null_shared order by id;
Expand All @@ -180,3 +192,132 @@ select id, my_bytea from list_with_null_shared order by id;
2 {"\\x00","\\x01","\\x02"}
3 {NULL,"\\x99","\\xaa"}
4 {"\\x88","\\x99","\\xaa"}
5 NULL
6 NULL

# Scalar boolean / integer / decimal / floating-point columns of postgres_all_types.
# Rows are ordered by (c_boolean, c_bigint, c_date) so the output is deterministic.
# Four rows are expected with c_boolean = f and four with c_boolean = t.
query TTTTTTT
SELECT c_boolean, c_smallint, c_integer, c_bigint, c_decimal, c_real, c_double_precision
FROM postgres_all_types
ORDER BY c_boolean, c_bigint, c_date;
----
f -32767 -2147483647 -9223372036854775807 -10.0 -10000 -10000
f 0 0 0 0 0 0
f NULL NULL 1 NULL NULL NULL
f 1 123 1234567890 123.45 123.45 123.456
t -32767 -2147483647 -9223372036854775807 -10.0 -10000 -10000
t 0 0 0 0 0 0
t NULL NULL 1 NULL NULL NULL
t 1 123 1234567890 123.45 123.45 123.456

# Scalar c_varchar / c_bytea columns of postgres_all_types.
# The type string has one entry per selected column (2).
# An empty string is rendered as `(empty)` in the expected output.
query TT
SELECT c_varchar, c_bytea
FROM postgres_all_types
ORDER BY c_boolean, c_bigint, c_date;
----
d \x00
(empty) \x00
NULL NULL
example \xdeadbeef
d \x00
(empty) \x00
NULL NULL
example \xdeadbeef

# Scalar date / time / timestamp / timestamptz / interval columns of postgres_all_types.
# The type string has one entry per selected column (5).
# Expected timestamptz values are shown with a +00:00 offset.
query TTTTT
SELECT c_date, c_time, c_timestamp, c_timestamptz, c_interval
FROM postgres_all_types
ORDER BY c_boolean, c_bigint, c_date;
----
0001-01-01 00:00:00 2001-01-01 00:00:00 2001-01-01 08:00:00+00:00 00:00:00
0001-01-01 00:00:00 2001-01-01 00:00:00 2001-01-01 08:00:00+00:00 00:00:00
0024-05-19 NULL NULL NULL NULL
0024-01-01 12:34:56 2024-05-19 12:34:56 2024-05-19 12:34:56+00:00 1 day
0001-01-01 00:00:00 2001-01-01 00:00:00 2001-01-01 08:00:00+00:00 00:00:00
0001-01-01 00:00:00 2001-01-01 00:00:00 2001-01-01 08:00:00+00:00 00:00:00
0024-05-19 NULL NULL NULL NULL
0024-01-01 12:34:56 2024-05-19 12:34:56 2024-05-19 12:34:56+00:00 1 day

# Scalar c_jsonb / c_uuid / c_enum columns of postgres_all_types.
# The type string has one entry per selected column (3).
query TTT
SELECT c_jsonb, c_uuid, c_enum
FROM postgres_all_types
ORDER BY c_boolean, c_bigint, c_date;
----
{} bb488f9b-330d-4012-b849-12adeb49e57e happy
{} NULL sad
NULL NULL NULL
{"key": "value"} 123e4567-e89b-12d3-a456-426614174000 happy
{} bb488f9b-330d-4012-b849-12adeb49e57e happy
{} NULL sad
NULL NULL NULL
{"key": "value"} 123e4567-e89b-12d3-a456-426614174000 happy

# Boolean and integer array columns of postgres_all_types.
# The type string has one entry per selected column (4).
# Covers single-element, empty, NULL, and NULL-containing arrays.
query TTTT
SELECT c_boolean_array, c_smallint_array, c_integer_array, c_bigint_array
FROM postgres_all_types
ORDER BY c_boolean, c_bigint, c_date;
----
{f} {-32767} {-2147483647} {-9223372036854775807}
{} {} {} {}
NULL NULL NULL NULL
{NULL,t} {NULL,1} {NULL,123} {NULL,1234567890}
{f} {-32767} {-2147483647} {-9223372036854775807}
{} {} {} {}
NULL NULL NULL NULL
{NULL,t} {NULL,1} {NULL,123} {NULL,1234567890}


# Decimal / real / double precision array columns of postgres_all_types.
# The type string has one entry per selected column (3).
query TTT
SELECT c_decimal_array, c_real_array, c_double_precision_array
FROM postgres_all_types
ORDER BY c_boolean, c_bigint, c_date;
----
{-10.0} {-10000} {-10000}
{} {} {}
NULL NULL NULL
{NULL,123.45} {NULL,123.45} {NULL,123.456}
{-10.0} {-10000} {-10000}
{} {} {}
NULL NULL NULL
{NULL,123.45} {NULL,123.45} {NULL,123.456}

# Varchar and bytea array columns of postgres_all_types.
# The type string has one entry per selected column (2).
query TT
SELECT c_varchar_array, c_bytea_array
FROM postgres_all_types
ORDER BY c_boolean, c_bigint, c_date;
----
{""} {"\\x00"}
{} {}
NULL NULL
{NULL,example} {NULL,"\\xdeadbeef"}
{""} {"\\x00"}
{} {}
NULL NULL
{NULL,example} {NULL,"\\xdeadbeef"}

# Date / time / timestamp / timestamptz / interval array columns of postgres_all_types.
# The type string has one entry per selected column (5).
# NOTE(review): c_interval_array is expected to be NULL for every row, including rows
# whose upstream value is a non-NULL interval array — confirm this is the intended behavior.
query TTTTT
SELECT c_date_array, c_time_array, c_timestamp_array, c_timestamptz_array, c_interval_array
FROM postgres_all_types
ORDER BY c_boolean, c_bigint, c_date;
----
{0001-01-01} {00:00:00} {"2001-01-01 00:00:00"} {"2001-01-01 08:00:00+00:00"} NULL
{} {} {} {} NULL
NULL NULL NULL NULL NULL
{NULL,2024-05-19} {NULL,12:34:56} {NULL,"2024-05-19 12:34:56"} {NULL,"2024-05-19 12:34:56+00:00"} NULL
{0001-01-01} {00:00:00} {"2001-01-01 00:00:00"} {"2001-01-01 08:00:00+00:00"} NULL
{} {} {} {} NULL
NULL NULL NULL NULL NULL
{NULL,2024-05-19} {NULL,12:34:56} {NULL,"2024-05-19 12:34:56"} {NULL,"2024-05-19 12:34:56+00:00"} NULL

# Jsonb / uuid / enum array columns of postgres_all_types.
# The type string has one entry per selected column (3).
# c_enum_array is expected to be NULL for rows whose upstream enum array contains NULL,
# while an empty enum array is expected as {}.
query TTT
SELECT c_jsonb_array, c_uuid_array, c_enum_array
FROM postgres_all_types
ORDER BY c_boolean, c_bigint, c_date;
----
{"{}"} {bb488f9b-330d-4012-b849-12adeb49e57e} NULL
{} {} {}
NULL NULL NULL
{NULL,"{\"key\": \"value\"}"} {NULL,123e4567-e89b-12d3-a456-426614174000} NULL
{"{}"} {bb488f9b-330d-4012-b849-12adeb49e57e} NULL
{} {} {}
NULL NULL NULL
{NULL,"{\"key\": \"value\"}"} {NULL,123e4567-e89b-12d3-a456-426614174000} NULL
10 changes: 8 additions & 2 deletions e2e_test/source/cdc/cdc.share_stream.slt
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ create table rw.products_test ( id INT,
PRIMARY KEY (id)
) include timestamp as commit_ts from mysql_mytest table 'mytest.products';

# Sleep so that the row (default,'Milk','Milk is a white liquid food') inserted below is consumed from a Debezium message instead of from the backfill.
sleep 10s

system ok
mysql --protocol=tcp -u root mytest -e "INSERT INTO products VALUES (default,'Milk','Milk is a white liquid food');
INSERT INTO orders VALUES (default, '2023-11-28 15:08:22', 'Bob', 10.52, 100, false);"
Expand Down Expand Up @@ -206,6 +209,8 @@ CREATE TABLE IF NOT EXISTS postgres_all_types(
c_timestamptz timestamptz,
c_interval interval,
c_jsonb jsonb,
c_uuid varchar,
c_enum varchar,
c_boolean_array boolean[],
c_smallint_array smallint[],
c_integer_array integer[],
Expand All @@ -221,7 +226,8 @@ CREATE TABLE IF NOT EXISTS postgres_all_types(
c_timestamptz_array timestamptz[],
c_interval_array interval[],
c_jsonb_array jsonb[],
c_uuid varchar,
c_uuid_array varchar[],
c_enum_array varchar[],
PRIMARY KEY (c_boolean,c_bigint,c_date)
) from pg_source table 'public.postgres_all_types';

Expand Down Expand Up @@ -253,7 +259,7 @@ sleep 3s
query TTTTTTT
SELECT c_boolean,c_date,c_time,c_timestamp,c_jsonb,c_smallint_array,c_timestamp_array,c_uuid FROM postgres_all_types where c_bigint=-9223372036854775807
----
f 0001-01-01 00:00:00 0001-01-01 00:00:00 {} {-32767} {"0001-01-01 00:00:00"} bb488f9b-330d-4012-b849-12adeb49e57e
f 0001-01-01 00:00:00 2001-01-01 00:00:00 {} {-32767} {"2001-01-01 00:00:00"} bb488f9b-330d-4012-b849-12adeb49e57e


# postgres streaming test
Expand Down
15 changes: 10 additions & 5 deletions e2e_test/source/cdc/postgres_cdc.sql
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ create table abs.t1 ("V1" int primary key, v2 double precision, v3 varchar, v4 n
create publication my_publicaton for table abs.t1 ("V1", v3);
insert into abs.t1 values (1, 1.1, 'aaa', '5431.1234');

CREATE TYPE mood AS ENUM ('sad', 'ok', 'happy');

CREATE TABLE IF NOT EXISTS postgres_all_types(
c_boolean boolean,
Expand All @@ -52,6 +53,8 @@ CREATE TABLE IF NOT EXISTS postgres_all_types(
c_timestamptz timestamptz,
c_interval interval,
c_jsonb jsonb,
c_uuid uuid,
c_enum mood,
c_boolean_array boolean[],
c_smallint_array smallint[],
c_integer_array integer[],
Expand All @@ -67,11 +70,14 @@ CREATE TABLE IF NOT EXISTS postgres_all_types(
c_timestamptz_array timestamptz[],
c_interval_array interval[],
c_jsonb_array jsonb[],
c_uuid uuid,
c_uuid_array uuid[],
c_enum_array mood[],
PRIMARY KEY (c_boolean,c_bigint,c_date)
);
INSERT INTO postgres_all_types VALUES ( False, 0, 0, 0, 0, 0, 0, '', '\x00', '0001-01-01', '00:00:00', '0001-01-01 00:00:00'::timestamp, '0001-01-01 00:00:00'::timestamptz, interval '0 second', '{}', array[]::boolean[], array[]::smallint[], array[]::integer[], array[]::bigint[], array[]::decimal[], array[]::real[], array[]::double precision[], array[]::varchar[], array[]::bytea[], array[]::date[], array[]::time[], array[]::timestamp[], array[]::timestamptz[], array[]::interval[], array[]::jsonb[], null);
INSERT INTO postgres_all_types VALUES ( False, -32767, -2147483647, -9223372036854775807, -10.0, -9999.999999, -10000.0, '', '\x00', '0001-01-01', '00:00:00', '0001-01-01 00:00:00'::timestamp, '0001-01-01 00:00:00'::timestamptz, interval '0 second', '{}', array[False::boolean]::boolean[], array[-32767::smallint]::smallint[], array[-2147483647::integer]::integer[], array[-9223372036854775807::bigint]::bigint[], array[-10.0::decimal]::decimal[], array[-9999.999999::real]::real[], array[-10000.0::double precision]::double precision[], array[''::varchar]::varchar[], array['\x00'::bytea]::bytea[], array['0001-01-01'::date]::date[], array['00:00:00'::time]::time[], array['0001-01-01 00:00:00'::timestamp::timestamp]::timestamp[], array['0001-01-01 00:00:00'::timestamptz::timestamptz]::timestamptz[], array[interval '0 second'::interval]::interval[], array['{}'::jsonb]::jsonb[], 'bb488f9b-330d-4012-b849-12adeb49e57e');
INSERT INTO postgres_all_types VALUES ( False, 0, 0, 0, 0, 0, 0, '', '\x00', '0001-01-01', '00:00:00', '2001-01-01 00:00:00'::timestamp, '2001-01-01 00:00:00-8'::timestamptz, interval '0 second', '{}', null, 'sad', array[]::boolean[], array[]::smallint[], array[]::integer[], array[]::bigint[], array[]::decimal[], array[]::real[], array[]::double precision[], array[]::varchar[], array[]::bytea[], array[]::date[], array[]::time[], array[]::timestamp[], array[]::timestamptz[], array[]::interval[], array[]::jsonb[], array[]::uuid[], array[]::mood[]);
INSERT INTO postgres_all_types VALUES ( False, -32767, -2147483647, -9223372036854775807, -10.0, -9999.999999, -10000.0, 'd', '\x00', '0001-01-01', '00:00:00', '2001-01-01 00:00:00'::timestamp, '2001-01-01 00:00:00-8'::timestamptz, interval '0 second', '{}', 'bb488f9b-330d-4012-b849-12adeb49e57e', 'happy', array[False::boolean]::boolean[], array[-32767::smallint]::smallint[], array[-2147483647::integer]::integer[], array[-9223372036854775807::bigint]::bigint[], array[-10.0::decimal]::decimal[], array[-9999.999999::real]::real[], array[-10000.0::double precision]::double precision[], array[''::varchar]::varchar[], array['\x00'::bytea]::bytea[], array['0001-01-01'::date]::date[], array['00:00:00'::time]::time[], array['2001-01-01 00:00:00'::timestamp::timestamp]::timestamp[], array['2001-01-01 00:00:00-8'::timestamptz::timestamptz]::timestamptz[], array[interval '0 second'::interval]::interval[], array['{}'::jsonb]::jsonb[], '{bb488f9b-330d-4012-b849-12adeb49e57e}', '{happy,ok,NULL}');
INSERT INTO postgres_all_types VALUES ( False, 1, 123, 1234567890, 123.45, 123.45, 123.456, 'example', '\xDEADBEEF', '0024-01-01', '12:34:56', '2024-05-19 12:34:56', '2024-05-19 12:34:56+00', INTERVAL '1 day', '{"key": "value"}', '123e4567-e89b-12d3-a456-426614174000', 'happy', ARRAY[NULL, TRUE]::boolean[], ARRAY[NULL, 1::smallint], ARRAY[NULL, 123], ARRAY[NULL, 1234567890], ARRAY[NULL, 123.45::numeric], ARRAY[NULL, 123.45::real], ARRAY[NULL, 123.456], ARRAY[NULL, 'example'], ARRAY[NULL, '\xDEADBEEF'::bytea], ARRAY[NULL, '2024-05-19'::date], ARRAY[NULL, '12:34:56'::time], ARRAY[NULL, '2024-05-19 12:34:56'::timestamp], ARRAY[NULL, '2024-05-19 12:34:56+00'::timestamptz], ARRAY[NULL, INTERVAL '1 day'], ARRAY[NULL, '{"key": "value"}'::jsonb], ARRAY[NULL, '123e4567-e89b-12d3-a456-426614174000'::uuid], ARRAY[NULL, 'happy'::mood]);
INSERT INTO postgres_all_types VALUES ( False, NULL, NULL, 1, NULL, NULL, NULL, NULL, NULL, '0024-05-19', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL);

create table numeric_table(id int PRIMARY KEY, num numeric);
insert into numeric_table values(1, 3.14);
Expand All @@ -89,8 +95,6 @@ create table numeric_list(id int primary key, num numeric[]);
insert into numeric_list values(1, '{3.14, 6, 57896044618658097711785492504343953926634992332820282019728792003956564819967, 57896044618658097711785492504343953926634992332820282019728792003956564819968, 115792089237316195423570985008687907853269984665640564039457584007913129639936.555555}');
insert into numeric_list values(2, '{nan, infinity, -infinity}');

--- for https://github.com/risingwavelabs/risingwave/issues/16392
CREATE TYPE mood AS ENUM ('sad', 'ok', 'happy');
CREATE TABLE enum_table (
id int PRIMARY KEY,
current_mood mood
Expand All @@ -100,3 +104,4 @@ INSERT INTO enum_table VALUES (1, 'happy');
CREATE TABLE list_with_null(id int primary key, my_int int[], my_num numeric[], my_num_1 numeric[], my_num_2 numeric[], my_mood mood[], my_uuid uuid[], my_bytea bytea[]);
INSERT INTO list_with_null VALUES (1, '{1,2,NULL}', '{1.1,inf,NULL}', '{1.1,inf,NULL}', '{1.1,inf,NULL}', '{happy,ok,NULL}', '{bb488f9b-330d-4012-b849-12adeb49e57e,bb488f9b-330d-4012-b849-12adeb49e57f, NULL}', '{\\x00,\\x01,NULL}');
INSERT INTO list_with_null VALUES (2, '{NULL,3,4}', '{2.2,0,NULL}' , '{2.2,0,NULL}', '{2.2,0,NULL}', '{happy,ok,sad}', '{2de296df-eda7-4202-a81f-1036100ef4f6,2977afbc-0b12-459c-a36f-f623fc9e9840}', '{\\x00,\\x01,\\x02}');
INSERT INTO list_with_null VALUES (5, NULL, NULL, NULL, NULL, NULL, NULL, NULL);
7 changes: 7 additions & 0 deletions e2e_test/source/cdc/postgres_cdc_insert.sql
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@ SELECT pg_current_wal_lsn();
select * from pg_publication_tables where pubname='rw_publication';
select * from public.person order by id;

INSERT INTO postgres_all_types VALUES ( True, 0, 0, 0, 0, 0, 0, '', '\x00', '0001-01-01', '00:00:00', '2001-01-01 00:00:00'::timestamp, '2001-01-01 00:00:00-8'::timestamptz, interval '0 second', '{}', null, 'sad', array[]::boolean[], array[]::smallint[], array[]::integer[], array[]::bigint[], array[]::decimal[], array[]::real[], array[]::double precision[], array[]::varchar[], array[]::bytea[], array[]::date[], array[]::time[], array[]::timestamp[], array[]::timestamptz[], array[]::interval[], array[]::jsonb[], array[]::uuid[], array[]::mood[]);
INSERT INTO postgres_all_types VALUES ( True, -32767, -2147483647, -9223372036854775807, -10.0, -9999.999999, -10000.0, 'd', '\x00', '0001-01-01', '00:00:00', '2001-01-01 00:00:00'::timestamp, '2001-01-01 00:00:00-8'::timestamptz, interval '0 second', '{}', 'bb488f9b-330d-4012-b849-12adeb49e57e', 'happy', array[False::boolean]::boolean[], array[-32767::smallint]::smallint[], array[-2147483647::integer]::integer[], array[-9223372036854775807::bigint]::bigint[], array[-10.0::decimal]::decimal[], array[-9999.999999::real]::real[], array[-10000.0::double precision]::double precision[], array[''::varchar]::varchar[], array['\x00'::bytea]::bytea[], array['0001-01-01'::date]::date[], array['00:00:00'::time]::time[], array['2001-01-01 00:00:00'::timestamp::timestamp]::timestamp[], array['2001-01-01 00:00:00-8'::timestamptz::timestamptz]::timestamptz[], array[interval '0 second'::interval]::interval[], array['{}'::jsonb]::jsonb[], '{bb488f9b-330d-4012-b849-12adeb49e57e}', '{happy,ok,NULL}');
INSERT INTO postgres_all_types VALUES ( True, 1, 123, 1234567890, 123.45, 123.45, 123.456, 'example', '\xDEADBEEF', '0024-01-01', '12:34:56', '2024-05-19 12:34:56', '2024-05-19 12:34:56+00', INTERVAL '1 day', '{"key": "value"}', '123e4567-e89b-12d3-a456-426614174000', 'happy', ARRAY[NULL, TRUE]::boolean[], ARRAY[NULL, 1::smallint], ARRAY[NULL, 123], ARRAY[NULL, 1234567890], ARRAY[NULL, 123.45::numeric], ARRAY[NULL, 123.45::real], ARRAY[NULL, 123.456], ARRAY[NULL, 'example'], ARRAY[NULL, '\xDEADBEEF'::bytea], ARRAY[NULL, '2024-05-19'::date], ARRAY[NULL, '12:34:56'::time], ARRAY[NULL, '2024-05-19 12:34:56'::timestamp], ARRAY[NULL, '2024-05-19 12:34:56+00'::timestamptz], ARRAY[NULL, INTERVAL '1 day'], ARRAY[NULL, '{"key": "value"}'::jsonb], ARRAY[NULL, '123e4567-e89b-12d3-a456-426614174000'::uuid], ARRAY[NULL, 'happy'::mood]);
INSERT INTO postgres_all_types VALUES ( True, NULL, NULL, 1, NULL, NULL, NULL, NULL, NULL, '0024-05-19', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL);

insert into numeric_table values(102, 57896044618658097711785492504343953926634992332820282019728792003956564819967);
--- 2^255
insert into numeric_table values(103, 57896044618658097711785492504343953926634992332820282019728792003956564819968);
Expand All @@ -25,3 +30,5 @@ INSERT INTO enum_table VALUES (3, 'sad');
--- to avoid escaping issues of psql -c "", we insert this row here and check the result in check_new_rows.slt
INSERT INTO list_with_null VALUES (3, '{NULL,-3,-4}', '{NULL,nan,-inf}', '{NULL,nan,-inf}', '{NULL,nan,-inf}', '{NULL,sad,ok}', '{NULL,471acecf-a4b4-4ed3-a211-7fb2291f159f,9bc35adf-fb11-4130-944c-e7eadb96b829}', '{NULL,\\x99,\\xAA}');
INSERT INTO list_with_null VALUES (4, '{-4,-5,-6}', '{NULL,-99999999999999999.9999}', '{NULL,-99999999999999999.9999}', '{NULL,-99999999999999999.9999}', '{NULL,sad,ok}', '{b2e4636d-fa03-4ad4-bf16-029a79dca3e2}', '{\\x88,\\x99,\\xAA}');
INSERT INTO list_with_null VALUES (6, NULL, NULL, NULL, NULL, NULL, NULL, NULL);

Loading

0 comments on commit 12f5c0d

Please sign in to comment.