Skip to content

Commit 8a77a37

Browse files
Updates mysql and postgres processors to handle null strings in json columns (#3305)
1 parent 7ddb6aa commit 8a77a37

File tree

6 files changed

+27
-6
lines changed

6 files changed

+27
-6
lines changed

internal/database-record-mapper/mysql/mapper.go

+6
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@ func parseMysqlRowValues(values []any, columnNames, columnDbTypes []string) (map
6161
col := columnNames[i]
6262
colDataType := columnDbTypes[i]
6363
switch t := v.(type) {
64+
case nil:
65+
jObj[col] = t
6466
case time.Time:
6567
dt, err := neosynctypes.NewDateTimeFromMysql(t)
6668
if err != nil {
@@ -69,6 +71,10 @@ func parseMysqlRowValues(values []any, columnNames, columnDbTypes []string) (map
6971
jObj[col] = dt
7072
case []byte:
7173
if strings.EqualFold(colDataType, "json") {
74+
if string(t) == "null" {
75+
jObj[col] = string(t)
76+
continue
77+
}
7278
var js any
7379
if err := json.Unmarshal(t, &js); err != nil {
7480
return nil, err

internal/database-record-mapper/mysql/mapper_test.go

+5-4
Original file line numberDiff line numberDiff line change
@@ -53,20 +53,21 @@ func Test_parseMysqlRowValues(t *testing.T) {
5353
})
5454

5555
t.Run("JSON Columns", func(t *testing.T) {
56-
values := []any{[]byte(`"Hello"`), []byte(`true`), []byte(`null`), []byte(`42`), []byte(`{"items": ["book", "pen"], "count": 2, "in_stock": true}`), []byte(`[1,2,3]`)}
57-
columnNames := []string{"text_col", "bool_col", "null_col", "int_col", "json_col", "array_col"}
58-
cTypes := []string{"json", "json", "json", "json", "json", "json"}
56+
values := []any{[]byte(`"Hello"`), []byte(`true`), []byte(`null`), []byte(`42`), []byte(`{"items": ["book", "pen"], "count": 2, "in_stock": true}`), []byte(`[1,2,3]`), nil}
57+
columnNames := []string{"text_col", "bool_col", "null_col", "int_col", "json_col", "array_col", "nil_col"}
58+
cTypes := []string{"json", "json", "json", "json", "json", "json", "json"}
5959

6060
result, err := parseMysqlRowValues(values, columnNames, cTypes)
6161
require.NoError(t, err)
6262

6363
expected := map[string]any{
6464
"text_col": "Hello",
6565
"bool_col": true,
66-
"null_col": nil,
66+
"null_col": "null",
6767
"int_col": float64(42),
6868
"json_col": map[string]any{"items": []any{"book", "pen"}, "count": float64(2), "in_stock": true},
6969
"array_col": []any{float64(1), float64(2), float64(3)},
70+
"nil_col": nil,
7071
}
7172
require.Equal(t, expected, result)
7273
})

internal/database-record-mapper/postgres/mapper.go

+4
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,10 @@ func parsePgRowValues(values []any, columnNames, columnTypes []string) (map[stri
109109
}
110110
jObj[col] = val
111111
case *NullableJSON:
112+
if t.Valid && string(t.RawMessage) == "null" {
113+
jObj[col] = string(t.RawMessage)
114+
continue
115+
}
112116
js, err := t.Unmarshal()
113117
if err != nil {
114118
return nil, fmt.Errorf("failed to unmarshal JSON: %w", err)

internal/database-record-mapper/postgres/mapper_test.go

+6-2
Original file line numberDiff line numberDiff line change
@@ -220,9 +220,11 @@ func Test_parsePgRowValues(t *testing.T) {
220220
&NullableJSON{RawMessage: json.RawMessage(`42`), Valid: true},
221221
&NullableJSON{RawMessage: json.RawMessage(`{"items": ["book", "pen"], "count": 2, "in_stock": true}`), Valid: true},
222222
&NullableJSON{RawMessage: json.RawMessage(`[1,2,3]`), Valid: true},
223+
nil,
224+
&NullableJSON{RawMessage: json.RawMessage(`null`), Valid: true},
223225
}
224-
columnNames := []string{"text_col", "bool_col", "null_col", "int_col", "json_col", "array_col"}
225-
columnTypes := []string{"json", "json", "json", "json", "json", "_json"}
226+
columnNames := []string{"text_col", "bool_col", "null_col", "int_col", "json_col", "array_col", "nil_col", "null_json"}
227+
columnTypes := []string{"json", "json", "json", "json", "json", "_json", "json", "json"}
226228

227229
result, err := parsePgRowValues(values, columnNames, columnTypes)
228230
require.NoError(t, err)
@@ -234,6 +236,8 @@ func Test_parsePgRowValues(t *testing.T) {
234236
"int_col": float64(42),
235237
"json_col": map[string]any{"items": []any{"book", "pen"}, "count": float64(2), "in_stock": true},
236238
"array_col": []any{float64(1), float64(2), float64(3)},
239+
"nil_col": nil,
240+
"null_json": "null",
237241
}
238242
require.Equal(t, expected, result)
239243
})

worker/pkg/benthos/sql/processor_neosync_mysql.go

+3
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,9 @@ func getMysqlValue(value any, colDefaults *neosync_benthos.ColumnDefaultProperti
148148
}
149149
return validJson, nil
150150
}
151+
if value == "null" {
152+
return value, nil
153+
}
151154
bits, err := json.Marshal(value)
152155
if err != nil {
153156
return nil, fmt.Errorf("unable to marshal mysql json to bits: %w", err)

worker/pkg/benthos/sql/processor_neosync_pgx.go

+3
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,9 @@ func getPgxValue(value any, colDefaults *neosync_benthos.ColumnDefaultProperties
145145

146146
switch {
147147
case strings.EqualFold(datatype, "json") || strings.EqualFold(datatype, "jsonb"):
148+
if value == "null" {
149+
return value, nil
150+
}
148151
bits, err := json.Marshal(value)
149152
if err != nil {
150153
return nil, fmt.Errorf("unable to marshal postgres json to bits: %w", err)

0 commit comments

Comments
 (0)