Fix: MAP value converter for data via debezium (#2156)
Fix the HSTORE datatype value converter (MAP) in the debezium exporter so that values received from debezium are converted properly.
Report the hstore datatype under "Unsupported datatypes for live migration with fall-forward/fall-back", since the gRPC connector doesn't support it yet.
https://yugabyte.atlassian.net/browse/DB-14753
Added end-to-end tests for the HSTORE datatype in pg/datatypes with data validation.
priyanshi-yb authored Jan 20, 2025
1 parent d06378e commit ae2a72f
Showing 26 changed files with 279 additions and 40 deletions.
DebeziumRecordTransformer.java
@@ -16,12 +16,16 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.Map;
import java.util.HashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This class performs any transformation of the record received from debezium
* before that record is actually written.
*/
public class DebeziumRecordTransformer implements RecordTransformer {
private static final Logger LOGGER = LoggerFactory.getLogger(DebeziumRecordTransformer.class);

private JsonConverter jsonConverter;
public DebeziumRecordTransformer(){
@@ -70,6 +74,42 @@ private String makeFieldValueSerializable(Object fieldValue, Field field){
case BYTES:
case STRUCT:
return toKafkaConnectJsonConverted(fieldValue, field);
case MAP:
StringBuilder mapString = new StringBuilder();
for (Map.Entry<String, String> entry : ((HashMap<String, String>) fieldValue).entrySet()) {
String key = entry.getKey();
String val = entry.getValue();
LOGGER.debug("[MAP] before transforming key - {}", key);
LOGGER.debug("[MAP] before transforming value - {}", val);
/*
Escape double quotes (") and backslashes (\) in the key and value with a backslash, as described in
https://www.postgresql.org/docs/9/hstore.html#:~:text=To%20include%20a%20double%20quote%20or%20a%20backslash%20in%20a%20key%20or%20value%2C%20escape%20it%20with%20a%20backslash.
Backslashes are escaped first and double quotes second, so that the backslashes added while escaping
the quotes are not themselves escaped again,
e.g. key - "a\"b" -> (first escaping) -> "a\\"b" -> (second escaping) -> "a\\\"b"
*/
key = key.replace("\\", "\\\\"); // escaping backslash: \ -> \\ ( "a\b" -> "a\\b" )
val = val.replace("\\", "\\\\");
key = key.replace("\"", "\\\""); // escaping double quote: " -> \" ( "a"b" -> "a\"b" )
val = val.replace("\"", "\\\"");

LOGGER.debug("[MAP] after transforming key - {}", key);
LOGGER.debug("[MAP] after transforming value - {}", val);

mapString.append("\"");
mapString.append(key);
mapString.append("\"");
mapString.append(" => ");
mapString.append("\"");
mapString.append(val);
mapString.append("\"");
mapString.append(",");
}
if(mapString.length() == 0) {
return "";
}
return mapString.toString().substring(0, mapString.length() - 1);

}
return fieldValue.toString();
}
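To illustrate the conversion end to end, here is a minimal, self-contained sketch of the MAP branch above. The class name and map contents are hypothetical (not part of the commit); it only mirrors the escaping and `=>` formatting to show the hstore literal the converter is expected to emit:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical standalone sketch of the MAP-branch serialization above.
public class HstoreLiteralSketch {

    static String toHstoreLiteral(Map<String, String> fieldValue) {
        StringBuilder mapString = new StringBuilder();
        for (Map.Entry<String, String> entry : fieldValue.entrySet()) {
            // Escape backslashes first, then double quotes, mirroring the converter.
            String key = entry.getKey().replace("\\", "\\\\").replace("\"", "\\\"");
            String val = entry.getValue().replace("\\", "\\\\").replace("\"", "\\\"");
            mapString.append("\"").append(key).append("\" => \"").append(val).append("\",");
        }
        if (mapString.length() == 0) {
            return "";
        }
        return mapString.substring(0, mapString.length() - 1); // drop trailing comma
    }

    public static void main(String[] args) {
        Map<String, String> data = new LinkedHashMap<>();
        data.put("a\"b", "d\\\"a"); // key a"b, value d\"a (the same pair used in the tests below)
        // Prints: "a\"b" => "d\\\"a"
        System.out.println(toHstoreLiteral(data));
    }
}
```

The printed literal matches the escaped form asserted later in the `validate` scripts for the `hstore('a"b', 'd\"a')` test row.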
3 changes: 1 addition & 2 deletions migtests/scripts/functions.sh
@@ -150,8 +150,6 @@ EOF
run_sqlplus_as_sys ${pdb_name} "create-pdb-tablespace.sql"
cp ${SCRIPTS}/oracle/live-grants.sql oracle-inputs.sql
run_sqlplus_as_sys ${cdb_name} "oracle-inputs.sql"
rm create-pdb-tablespace.sql
rm oracle-inputs.sql
}

grant_permissions_for_live_migration_pg() {
@@ -394,6 +392,7 @@ import_data() {
--send-diagnostics=false
--truncate-splits true
--max-retries 1
--yes
"

if [ "${SOURCE_DB_TYPE}" != "postgresql" ]
@@ -390,12 +390,13 @@ CREATE TABLE public.locations (

CREATE TABLE image (title text, raster lo);

CREATE TABLE employees (id INT PRIMARY KEY, salary INT, data hstore);

-- IS JSON Predicate
CREATE TABLE public.json_data (
id SERIAL PRIMARY KEY,
data_column TEXT NOT NULL CHECK (data_column IS JSON)
);
CREATE TABLE employees (id INT PRIMARY KEY, salary INT);
-- create table with multirange data types

-- Create tables with primary keys directly
11 changes: 11 additions & 0 deletions migtests/tests/analyze-schema/expected_issues.json
@@ -2143,5 +2143,16 @@
"GH": "https://github.com/yugabyte/yugabyte-db/issues/25575",
"DocsLink": "https://docs.yugabyte.com/preview/yugabyte-voyager/known-issues/postgresql/#postgresql-12-and-later-features",
"MinimumVersionsFixedIn": null
},
{
"IssueType": "migration_caveats",
"ObjectType": "TABLE",
"ObjectName": "employees",
"Reason": "Unsupported datatype for Live migration with fall-forward/fallback - hstore on column - data",
"SqlStatement": "CREATE TABLE employees (id INT PRIMARY KEY, salary INT, data hstore);",
"Suggestion": "",
"GH": "https://github.com/yugabyte/yb-voyager/issues/1731",
"DocsLink": "https://docs.yugabyte.com/preview/yugabyte-voyager/known-issues/postgresql/#unsupported-datatypes-by-voyager-during-live-migration",
"MinimumVersionsFixedIn": null
}
]
2 changes: 1 addition & 1 deletion migtests/tests/analyze-schema/summary.json
@@ -33,7 +33,7 @@
{
"ObjectType": "TABLE",
"TotalCount": 64,
"InvalidCount": 53,
"InvalidCount": 54,
"ObjectNames": "test_table_in_type_file, sales_data, salaries2, sales, test_1, test_2, test_non_pk_multi_column_list, test_3, test_4, test_5, test_6, test_7, test_8, test_9, order_details, public.employees4, enum_example.bugs, table_xyz, table_abc, table_1, table_test, test_interval, public.range_columns_partition_test, public.range_columns_partition_test_copy, anydata_test, anydataset_test, anytype_test, uritype_test, \"Test\", public.meeting, public.pr, public.foreign_def_test, public.users, foreign_def_test1, foreign_def_test2, unique_def_test, unique_def_test1, test_xml_type, test_xid_type, public.test_jsonb, public.inet_type, public.citext_type, public.documents, public.ts_query_table, combined_tbl, combined_tbl1, test_udt, test_arr_enum, public.locations, public.xml_data_example, image, public.json_data, employees, bigint_multirange_table, date_multirange_table, int_multirange_table, numeric_multirange_table, timestamp_multirange_table, timestamptz_multirange_table, users_unique_nulls_distinct, users_unique_nulls_not_distinct, sales_unique_nulls_not_distinct, sales_unique_nulls_not_distinct_alter, users_unique_nulls_not_distinct_index" },
{
"ObjectType": "INDEX",
@@ -20,9 +20,9 @@
},
{
"ObjectType": "EXTENSION",
"TotalCount": 4,
"TotalCount": 5,
"InvalidCount": 0,
"ObjectNames": "citext, pgcrypto, pg_stat_statements, lo"
"ObjectNames": "citext, pgcrypto, pg_stat_statements, lo, hstore"
},
{
"ObjectType": "TYPE",
@@ -1107,7 +1107,7 @@
"SchemaName": "public",
"ObjectName": "combined_tbl",
"RowCount": 0,
"ColumnCount": 12,
"ColumnCount": 13,
"Reads": 0,
"Writes": 0,
"ReadsPerSecond": 0,
@@ -2779,6 +2779,10 @@
{
"ObjectName": "schema2.products.item (schema2.item_details)",
"SqlStatement": ""
},
{
"ObjectName": "public.combined_tbl.data (public.hstore)",
"SqlStatement": ""
}
],
"DocsLink": "https://docs.yugabyte.com/preview/yugabyte-voyager/known-issues/postgresql/#unsupported-datatypes-by-voyager-during-live-migration",
@@ -206,6 +206,8 @@ CREATE TYPE public.address_type AS (
);

CREATE EXTENSION lo;

CREATE EXTENSION hstore;
--other misc types
create table public.combined_tbl (
id int,
@@ -220,6 +222,7 @@ create table public.combined_tbl (
address address_type,
raster lo,
arr_enum enum_kind[],
data hstore,
PRIMARY KEY (id, arr_enum)
);

@@ -271,5 +271,44 @@
"exported_updates": 0,
"exported_deletes": 0,
"final_row_count": 2
},
{
"table_name": "public.\"hstore_example\"",
"db_type": "source",
"exported_snapshot_rows": 13,
"imported_snapshot_rows": 0,
"imported_inserts": 0,
"imported_updates": 0,
"imported_deletes": 0,
"exported_inserts": 3,
"exported_updates": 5,
"exported_deletes": 0,
"final_row_count": 16
},
{
"table_name": "public.\"hstore_example\"",
"db_type": "target",
"exported_snapshot_rows": 0,
"imported_snapshot_rows": 13,
"imported_inserts": 3,
"imported_updates": 5,
"imported_deletes": 0,
"exported_inserts": 0,
"exported_updates": 0,
"exported_deletes": 0,
"final_row_count": 16
},
{
"table_name": "public.\"hstore_example\"",
"db_type": "source-replica",
"exported_snapshot_rows": 0,
"imported_snapshot_rows": 13,
"imported_inserts": 3,
"imported_updates": 5,
"imported_deletes": 0,
"exported_inserts": 0,
"exported_updates": 0,
"exported_deletes": 0,
"final_row_count": 16
}
]
5 changes: 5 additions & 0 deletions migtests/tests/pg/datatypes/export_data_status-report.json
@@ -33,5 +33,10 @@
"table_name": "null_and_default",
"status": "DONE",
"exported_count": 2
},
{
"exported_count": 13,
"status": "DONE",
"table_name": "hstore_example"
}
]
7 changes: 7 additions & 0 deletions migtests/tests/pg/datatypes/import_data_status-report.json
@@ -34,6 +34,13 @@
"imported_count": 3,
"percentage_complete": 100
},
{
"table_name": "public.\"hstore_example\"",
"status": "DONE",
"total_count": 13,
"imported_count": 13,
"percentage_complete": 100
},
{
"table_name": "public.\"null_and_default\"",
"status": "DONE",
19 changes: 19 additions & 0 deletions migtests/tests/pg/datatypes/pg_datatypes_data.sql
@@ -34,3 +34,22 @@ select * from datatypes2;
insert into null_and_default (id) VALUES (1);
insert into null_and_default VALUES(2, NULL, NULL, NULL);

INSERT INTO hstore_example (data)
VALUES
('"key1"=>"value1", "key2"=>"value2"'),
(hstore('a"b', 'd\"a')),
(NULL),
(''),
('key1 => value1, key2 => value2'),
(hstore(ARRAY['key1', 'key2'], ARRAY['value1', 'value2'])),
('key7 => value7, key8 => 123, key9 => true'),
('"paperback" => "243",
"publisher" => "postgresqltutorial.com",
"language" => "English",
"ISBN-13" => "978-1449370000",
"weight" => "11.2 ounces"'),
(hstore(ROW(1,'{"key1=value1, key2=value2"}'))),
(hstore('json_field', '{"key1=value1, key2={"key1=value1, key2=value2"}"}')), -- hstore() keys and values need no extra processing
('"{\"key1=value1, key2=value2\"}"=>"{\"key1=value1, key2={\"key1=value1, key2=value2\"}\"}"'), -- strings in single quotes need to be escaped properly
(hstore('"{""key1"":""value1"",""key2"":""value2""}"', '{"key1=value1, key2={"key1=value1, key2=value2"}"}')),
(hstore('"{key1:value1,key2:value2}"', '{"key1=value1, key2={"key1=value1, key2=value2"}"}'));
6 changes: 5 additions & 1 deletion migtests/tests/pg/datatypes/pg_datatypes_schema.sql
@@ -41,5 +41,9 @@ create table datatypes2(id serial primary key, v1 json, v2 BIT(10), v3 int ARRAY
drop table if exists null_and_default;
create table null_and_default(id int PRIMARY KEY, b boolean default false, i int default 10, val varchar default 'testdefault');

create EXTENSION hstore;


CREATE TABLE hstore_example (
id SERIAL PRIMARY KEY,
data hstore
);
29 changes: 29 additions & 0 deletions migtests/tests/pg/datatypes/source_delta.sql
@@ -51,5 +51,34 @@ UPDATE datatypes2
SET v1 = '{"updated": true}', v2 = B'0101010101', v5 = B'101010101010101010101010101010', v3 = ARRAY[5, 6, 7, 8], v4 = '{{"e", "f"}, {"g", "h"}}'
WHERE v1 IS NULL;

UPDATE hstore_example
SET data = data || 'key3 => value3'
WHERE id = 1;

UPDATE hstore_example
SET data = hstore('{"key1=value1, key2=value2"}', '{"key1=value1, key2={"key1=value1, key2=value2"}"}')
WHERE id = 3;

UPDATE hstore_example
SET data = '"{\"key1=value1, key2=value2\"}"=>"{\"key1=value1, key2={\"key1=value1, key2=value2\"}\"}"'
WHERE id = 7;

INSERT INTO hstore_example (data)
VALUES
('key5 => value5, key6 => value6');

INSERT INTO hstore_example (data)
VALUES
(hstore('{"key1=value1, key2=value2"}', '{"key1=value1, key2={"key1=value1, key2=value2"}"}'));

INSERT INTO hstore_example (data)
VALUES
('');

UPDATE hstore_example
SET data = NULL
WHERE id = 5;

UPDATE hstore_example
SET data = ''
WHERE id = 6;
23 changes: 23 additions & 0 deletions migtests/tests/pg/datatypes/target_delta.sql
@@ -61,3 +61,26 @@ SET v1 = '{"new": "data"}', v2 = B'1111000011', v5=B'001010100101010101010101010

DELETE FROM datatypes2
WHERE 5 = ANY(v3);

-- NOT WORKING WITH YB CDC gRPC connector as of now
-- INSERT INTO hstore_example (data)
-- VALUES
-- ('key7 => value7, key8 => value8');

-- UPDATE hstore_example
-- SET data = delete(data, 'key2')
-- WHERE id = 8;

-- DELETE FROM hstore_example WHERE data ? 'key5';

-- INSERT INTO hstore_example (data)
-- VALUES
-- (hstore('"{""key1"":""value1"",""key2"":""value2""}"', '{"key1=value1, key2={"key1=value1, key2=value2"}"}'));

-- UPDATE hstore_example
-- SET data = hstore('{"key1=value1, key2=value2"}', '{"key1=value1, key2={"key1=value1, key2=value2"}"}')
-- WHERE id = 15;

-- UPDATE hstore_example
-- SET data = '"{\"key1=value1, key2=value2\"}"=>"{\"key1=value1, key2={\"key1=value1, key2=value2\"}\"}"'
-- WHERE id = 14;
11 changes: 10 additions & 1 deletion migtests/tests/pg/datatypes/validate
@@ -26,6 +26,7 @@ EXPECTED_ROW_COUNT = {
'datetime_type2': 2,
'null_and_default' :2,
'decimal_types': 3,
'hstore_example': 13,
}

EXPECTED_SUM_OF_COLUMN = {
@@ -73,7 +74,7 @@ def migration_completed_checks_fb():
def migration_completed_checks(tgt):
table_list = tgt.get_table_names("public")
print("table_list:", table_list)
assert len(table_list) == 7
assert len(table_list) == 8

got_row_count = tgt.row_count_of_all_tables("public")
for table_name, row_count in EXPECTED_ROW_COUNT.items():
@@ -146,6 +147,14 @@ def migration_completed_checks(tgt):
tgt.assert_all_values_of_col("null_and_default", "i", "public", expected_values=[10, None])
tgt.assert_all_values_of_col("null_and_default", "b", "public", expected_values=[False, None])

print("hstore_example:")
expected_hstore_values=['"f1"=>"1", "f2"=>"{\\"key1=value1, key2=value2\\"}"', None, '"json_field"=>"{\\"key1=value1, key2={\\"key1=value1, key2=value2\\"}\\"}"',
'"weight"=>"11.2 ounces", "ISBN-13"=>"978-1449370000", "language"=>"English", "paperback"=>"243", "publisher"=>"postgresqltutorial.com"', '"key1"=>"value1", "key2"=>"value2"',
'"\\"{key1:value1,key2:value2}\\""=>"{\\"key1=value1, key2={\\"key1=value1, key2=value2\\"}\\"}"', '"a\\"b"=>"d\\\\\\"a"', '"{\\"key1=value1, key2=value2\\"}"=>"{\\"key1=value1, key2={\\"key1=value1, key2=value2\\"}\\"}"',
'"key7"=>"value7", "key8"=>"123", "key9"=>"true"', '"\\"{\\"\\"key1\\"\\":\\"\\"value1\\"\\",\\"\\"key2\\"\\":\\"\\"value2\\"\\"}\\""=>"{\\"key1=value1, key2={\\"key1=value1, key2=value2\\"}\\"}"', '']
tgt.assert_distinct_values_of_col("hstore_example", "data", "public", expected_distinct_values=expected_hstore_values)


def YB_specific_checks(tgt):
yb.verify_colocation(tgt, "postgresql")

10 changes: 10 additions & 0 deletions migtests/tests/pg/datatypes/validateAfterChanges
@@ -29,6 +29,7 @@ EXPECTED_ROW_COUNT = {
'datetime_type2': 2,
'null_and_default' :2,
'decimal_types': 4,
'hstore_example': 16,
}

EXPECTED_SUM_OF_COLUMN = {
@@ -65,6 +66,7 @@ EXPECTED_ROW_COUNT_FF = {
'datetime_type2': 3,
'null_and_default' :2,
'decimal_types': 4,
'hstore_example': 16,
}

EXPECTED_SUM_OF_COLUMN_FF = {
@@ -128,6 +130,14 @@ def migration_completed_checks(tgt):
expected_distinct_values = EXPECTED_DISNTICT_VALUES['v5']
tgt.assert_distinct_values_of_col("datatypes2", "v5", "public",
None, expected_distinct_values = expected_distinct_values)

print("hstore_example:")
expected_hstore_values=['"f1"=>"1", "f2"=>"{\\"key1=value1, key2=value2\\"}"', None, '"json_field"=>"{\\"key1=value1, key2={\\"key1=value1, key2=value2\\"}\\"}"',
'"weight"=>"11.2 ounces", "ISBN-13"=>"978-1449370000", "language"=>"English", "paperback"=>"243", "publisher"=>"postgresqltutorial.com"',
'"key1"=>"value1", "key2"=>"value2", "key3"=>"value3"', '"\\"{key1:value1,key2:value2}\\""=>"{\\"key1=value1, key2={\\"key1=value1, key2=value2\\"}\\"}"',
'"a\\"b"=>"d\\\\\\"a"', '"{\\"key1=value1, key2=value2\\"}"=>"{\\"key1=value1, key2={\\"key1=value1, key2=value2\\"}\\"}"', '"key5"=>"value5", "key6"=>"value6"',
'"\\"{\\"\\"key1\\"\\":\\"\\"value1\\"\\",\\"\\"key2\\"\\":\\"\\"value2\\"\\"}\\""=>"{\\"key1=value1, key2={\\"key1=value1, key2=value2\\"}\\"}"', '']
tgt.assert_distinct_values_of_col("hstore_example", "data", "public", expected_distinct_values=expected_hstore_values)

if __name__ == "__main__":
main()