diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java b/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java index cbaeeae66fb27..7b78c64af0ca7 100644 --- a/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java +++ b/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java @@ -424,7 +424,7 @@ protected static Object convertTo(Schema toSchema, Schema fromSchema, Object val return BigDecimal.valueOf(converted); } if (value instanceof String) { - return new BigDecimal(value.toString()).doubleValue(); + return new BigDecimal(value.toString()); } } if (value instanceof ByteBuffer) { @@ -802,11 +802,12 @@ protected static SchemaAndValue parse(Parser parser, boolean embedded) throws No try { if (parser.canConsume(ARRAY_BEGIN_DELIMITER)) { List result = new ArrayList<>(); + boolean compatible = true; Schema elementSchema = null; while (parser.hasNext()) { if (parser.canConsume(ARRAY_END_DELIMITER)) { Schema listSchema; - if (elementSchema != null) { + if (elementSchema != null && compatible) { listSchema = SchemaBuilder.array(elementSchema).schema(); result = alignListEntriesWithSchema(listSchema, result); } else { @@ -821,6 +822,9 @@ protected static SchemaAndValue parse(Parser parser, boolean embedded) throws No } SchemaAndValue element = parse(parser, true); elementSchema = commonSchemaFor(elementSchema, element); + if (elementSchema == null && element != null && element.schema() != null) { + compatible = false; + } result.add(element != null ? 
element.value() : null); int currentPosition = parser.mark(); @@ -840,15 +844,17 @@ protected static SchemaAndValue parse(Parser parser, boolean embedded) throws No if (parser.canConsume(MAP_BEGIN_DELIMITER)) { Map result = new LinkedHashMap<>(); + boolean keyCompatible = true; Schema keySchema = null; + boolean valueCompatible = true; Schema valueSchema = null; while (parser.hasNext()) { if (parser.canConsume(MAP_END_DELIMITER)) { Schema mapSchema; - if (keySchema != null && valueSchema != null) { + if (keySchema != null && valueSchema != null && keyCompatible && valueCompatible) { mapSchema = SchemaBuilder.map(keySchema, valueSchema).build(); result = alignMapKeysAndValuesWithSchema(mapSchema, result); - } else if (keySchema != null) { + } else if (keySchema != null && keyCompatible) { mapSchema = SchemaBuilder.mapWithNullValues(keySchema); result = alignMapKeysWithSchema(mapSchema, result); } else { @@ -876,7 +882,13 @@ protected static SchemaAndValue parse(Parser parser, boolean embedded) throws No parser.canConsume(COMMA_DELIMITER); keySchema = commonSchemaFor(keySchema, key); + if (keySchema == null && key.schema() != null) { + keyCompatible = false; + } valueSchema = commonSchemaFor(valueSchema, value); + if (valueSchema == null && value != null && value.schema() != null) { + valueCompatible = false; + } } // Missing either a comma or an end delimiter if (COMMA_DELIMITER.equals(parser.previous())) { diff --git a/connect/api/src/test/java/org/apache/kafka/connect/data/ValuesTest.java b/connect/api/src/test/java/org/apache/kafka/connect/data/ValuesTest.java index 3700a6ee4e6cc..abb6ea4221460 100644 --- a/connect/api/src/test/java/org/apache/kafka/connect/data/ValuesTest.java +++ b/connect/api/src/test/java/org/apache/kafka/connect/data/ValuesTest.java @@ -25,6 +25,7 @@ import java.math.BigDecimal; import java.math.BigInteger; +import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.text.SimpleDateFormat; import java.util.ArrayList; @@ 
-363,33 +364,73 @@ public void shouldConvertStringOfListWithOnlyNumericElementTypesIntoListOfLarges } /** - * The parsed array has byte values and one int value, so we should return list with single unified type of integers. + * We parse into different element types, but cannot infer a common element schema. + * This behavior should be independent of the order that the elements appear in the string */ @Test - public void shouldConvertStringOfListWithMixedElementTypesIntoListWithDifferentElementTypes() { - String str = "[1, 2, \"three\"]"; - List list = Values.convertToList(Schema.STRING_SCHEMA, str); - assertEquals(3, list.size()); - assertEquals(1, ((Number) list.get(0)).intValue()); - assertEquals(2, ((Number) list.get(1)).intValue()); - assertEquals("three", list.get(2)); + public void shouldParseStringListWithMultipleElementTypes() { + assertParseStringArrayWithNoSchema( + Arrays.asList((byte) 1, (byte) 2, (short) 300, "four"), + "[1, 2, 300, \"four\"]"); + assertParseStringArrayWithNoSchema( + Arrays.asList((byte) 2, (short) 300, "four", (byte) 1), + "[2, 300, \"four\", 1]"); + assertParseStringArrayWithNoSchema( + Arrays.asList((short) 300, "four", (byte) 1, (byte) 2), + "[300, \"four\", 1, 2]"); + assertParseStringArrayWithNoSchema( + Arrays.asList("four", (byte) 1, (byte) 2, (short) 300), + "[\"four\", 1, 2, 300]"); + } + + private void assertParseStringArrayWithNoSchema(List expected, String str) { + SchemaAndValue result = Values.parseString(str); + assertEquals(Type.ARRAY, result.schema().type()); + assertNull(result.schema().valueSchema()); + List list = (List) result.value(); + assertEquals(expected, list); } /** - * We parse into different element types, but cannot infer a common element schema. 
+ * When a map's keys have inconsistent types, no common schema is inferred for either the keys or the values. + * This behavior should be independent of the order in which the pairs appear in the string. + */ + @Test + public void shouldParseStringMapWithMultipleKeyTypes() { + Map expected = new HashMap<>(); + expected.put((byte) 1, (byte) 1); + expected.put((byte) 2, (byte) 1); + expected.put((short) 300, (short) 300); + expected.put("four", (byte) 1); + assertParseStringMapWithNoSchema(expected, "{1:1, 2:1, 300:300, \"four\":1}"); + assertParseStringMapWithNoSchema(expected, "{2:1, 300:300, \"four\":1, 1:1}"); + assertParseStringMapWithNoSchema(expected, "{300:300, \"four\":1, 1:1, 2:1}"); + assertParseStringMapWithNoSchema(expected, "{\"four\":1, 1:1, 2:1, 300:300}"); + } + + /** + * Maps with a consistent key type may still not have a common type for the values. + * This behavior should be independent of the order in which the pairs appear in the string. */ @Test - public void shouldParseStringListWithMultipleElementTypesAndReturnListWithNoSchema() { - String str = "[1, 2, 3, \"four\"]"; + public void shouldParseStringMapWithMultipleValueTypes() { + Map expected = new HashMap<>(); + expected.put((short) 1, (byte) 1); + expected.put((short) 2, (byte) 1); + expected.put((short) 300, (short) 300); + expected.put((short) 4, "four"); + assertParseStringMapWithNoSchema(expected, "{1:1, 2:1, 300:300, 4:\"four\"}"); + assertParseStringMapWithNoSchema(expected, "{2:1, 300:300, 4:\"four\", 1:1}"); + assertParseStringMapWithNoSchema(expected, "{300:300, 4:\"four\", 1:1, 2:1}"); + assertParseStringMapWithNoSchema(expected, "{4:\"four\", 1:1, 2:1, 300:300}"); + } + + private void assertParseStringMapWithNoSchema(Map expected, String str) { SchemaAndValue result = Values.parseString(str); - assertEquals(Type.ARRAY, result.schema().type()); + assertEquals(Type.MAP, result.schema().type()); assertNull(result.schema().valueSchema()); - List list = (List) result.value(); - assertEquals(4, list.size()); - 
assertEquals(1, ((Number) list.get(0)).intValue()); - assertEquals(2, ((Number) list.get(1)).intValue()); - assertEquals(3, ((Number) list.get(2)).intValue()); - assertEquals("four", list.get(3)); + Map list = (Map) result.value(); + assertEquals(expected, list); } /** @@ -744,6 +785,39 @@ public void shouldConvertTimestampValues() { assertEquals(current, ts4); } + @Test + public void shouldConvertDecimalValues() { + // Various forms of the same number should all be parsed to the same BigDecimal + Number number = 1.0f; + String string = number.toString(); + BigDecimal value = new BigDecimal(string); + byte[] bytes = Decimal.fromLogical(Decimal.schema(1), value); + ByteBuffer buffer = ByteBuffer.wrap(bytes); + + assertEquals(value, Values.convertToDecimal(null, number, 1)); + assertEquals(value, Values.convertToDecimal(null, string, 1)); + assertEquals(value, Values.convertToDecimal(null, value, 1)); + assertEquals(value, Values.convertToDecimal(null, bytes, 1)); + assertEquals(value, Values.convertToDecimal(null, buffer, 1)); + } + + /** + * Test parsing distinct number-like types (strings containing numbers, and logical Decimals) in the same list + * The parser does not convert Numbers to Decimals, or Strings containing numbers to Numbers automatically. 
+ */ + @Test + public void shouldNotConvertArrayValuesToDecimal() { + List decimals = Arrays.asList("\"1.0\"", BigDecimal.valueOf(Long.MAX_VALUE).add(BigDecimal.ONE), + BigDecimal.valueOf(Long.MIN_VALUE).subtract(BigDecimal.ONE), (byte) 1, (byte) 1); + List expected = new ArrayList<>(decimals); // most values are directly reproduced with the same type + expected.set(0, "1.0"); // The quotes are parsed away, but the value remains a string + SchemaAndValue schemaAndValue = Values.parseString(decimals.toString()); + Schema schema = schemaAndValue.schema(); + assertEquals(Type.ARRAY, schema.type()); + assertNull(schema.valueSchema()); + assertEquals(expected, schemaAndValue.value()); + } + @Test public void canConsume() { }