From 97ac765b6c0eb39d006c79172378210183b651db Mon Sep 17 00:00:00 2001 From: Sascha Huk Date: Tue, 8 Apr 2025 11:05:50 +0200 Subject: [PATCH] NIFI-14448 nifi-cdc: Fix integer serialization to respect SQL UNSIGNED type semantics --- .../nifi/cdc/event/ColumnDefinition.java | 16 +- .../io/AbstractBinlogTableEventWriter.java | 40 ++-- .../cdc/mysql/event/io/DeleteRowsWriter.java | 10 +- .../cdc/mysql/event/io/InsertRowsWriter.java | 10 +- .../cdc/mysql/event/io/UpdateRowsWriter.java | 17 +- .../mysql/processors/CaptureChangeMySQL.java | 4 +- .../mysql/event/io/TestInsertRowsWriter.java | 69 +++++- .../processors/CaptureChangeMySQLTest.java | 210 +++++++++++++++++- 8 files changed, 318 insertions(+), 58 deletions(-) diff --git a/nifi-extension-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/ColumnDefinition.java b/nifi-extension-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/ColumnDefinition.java index c3f14ec1e31f..0904fbeb3971 100644 --- a/nifi-extension-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/ColumnDefinition.java +++ b/nifi-extension-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/ColumnDefinition.java @@ -24,14 +24,16 @@ public class ColumnDefinition { private int type; + private Boolean isSigned; private String name = ""; - public ColumnDefinition(int type) { + public ColumnDefinition(Boolean isSigned, int type) { this.type = type; + this.isSigned = isSigned; } - public ColumnDefinition(int type, String name) { - this(type); + public ColumnDefinition(Boolean isSigned, int type, String name) { + this(isSigned, type); this.name = name; } @@ -43,6 +45,14 @@ public void setType(int type) { this.type = type; } + public Boolean getIsSigned() { + return isSigned; + } + + public void setIsSigned(boolean isSigned) { + this.isSigned = isSigned; + } + public String getName() { return name; } diff --git a/nifi-extension-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/AbstractBinlogTableEventWriter.java b/nifi-extension-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/AbstractBinlogTableEventWriter.java index 013b63edf963..9972fd29e60e 100644 --- a/nifi-extension-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/AbstractBinlogTableEventWriter.java +++ b/nifi-extension-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/AbstractBinlogTableEventWriter.java @@ -16,36 +16,44 @@ */ package org.apache.nifi.cdc.mysql.event.io; +import com.fasterxml.jackson.core.JsonGenerator; +import org.apache.nifi.cdc.event.ColumnDefinition; import org.apache.nifi.cdc.mysql.event.BinlogTableEventInfo; import java.io.IOException; import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; +import java.util.function.Function; /** * An abstract base class for writing MYSQL table-related binlog events into flow file(s), e.g. */ public abstract class AbstractBinlogTableEventWriter extends AbstractBinlogEventWriter { - protected Object getWritableObject(Integer type, Serializable value) { + private final static Map> UNSIGNED_SQLTYPE_MAP = new HashMap<>(Map.of( + java.sql.Types.BIGINT, (z) -> Long.toUnsignedString(z.longValue()), + java.sql.Types.INTEGER, (z) -> String.valueOf(Integer.toUnsignedLong(z.intValue())), + java.sql.Types.SMALLINT, (z) -> String.valueOf(Short.toUnsignedInt(z.shortValue())), + java.sql.Types.TINYINT, (z) -> String.valueOf(Byte.toUnsignedInt(z.byteValue())) + )); + + protected void writeObjectAsValueField(JsonGenerator jg, String fieldName, ColumnDefinition columnDefinition, + Serializable value) throws IOException { if (value == null) { - return null; - } - if (type == null) { - if (value instanceof byte[]) { - return new String((byte[]) value); - } else if (value instanceof Number) { - return value; + jg.writeNullField(fieldName); + } else if (value instanceof Number) { + if (columnDefinition != null && Boolean.FALSE.equals(columnDefinition.getIsSigned()) + && UNSIGNED_SQLTYPE_MAP.containsKey(columnDefinition.getType())) { + jg.writeFieldName(fieldName); + jg.writeRawValue(UNSIGNED_SQLTYPE_MAP.get(columnDefinition.getType()).apply((Number) value)); } else { - return null; + jg.writeObjectField(fieldName, value); } + } else if (value instanceof byte[]) { + jg.writeObjectField(fieldName, new String((byte[]) value)); } else { - if (value instanceof byte[]) { - return new String((byte[]) value); - } else if (value instanceof Number) { - return value; - } else { - return value.toString(); - } + jg.writeObjectField(fieldName, value.toString()); } } diff --git a/nifi-extension-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/DeleteRowsWriter.java b/nifi-extension-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/DeleteRowsWriter.java index 8bf4b0220b50..38f8fc3720a4 100644 --- a/nifi-extension-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/DeleteRowsWriter.java +++ b/nifi-extension-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/DeleteRowsWriter.java @@ -79,17 +79,11 @@ protected void writeRow(DeleteRowsEventInfo event, Serializable[] row, BitSet in jsonGenerator.writeStartObject(); jsonGenerator.writeNumberField("id", i + 1); ColumnDefinition columnDefinition = event.getColumnByIndex(i); - Integer columnType = null; if (columnDefinition != null) { jsonGenerator.writeStringField("name", columnDefinition.getName()); - columnType = columnDefinition.getType(); - jsonGenerator.writeNumberField("column_type", columnType); - } - if (row[i] == null) { - jsonGenerator.writeNullField("value"); - } else { - jsonGenerator.writeObjectField("value", getWritableObject(columnType, row[i])); + jsonGenerator.writeNumberField("column_type", columnDefinition.getType()); } + writeObjectAsValueField(jsonGenerator, "value", columnDefinition, row[i]); jsonGenerator.writeEndObject(); i = includedColumns.nextSetBit(i + 1); } diff --git a/nifi-extension-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/InsertRowsWriter.java b/nifi-extension-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/InsertRowsWriter.java index b4fa31f78733..cf76241fb86b 100644 --- a/nifi-extension-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/InsertRowsWriter.java +++ b/nifi-extension-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/InsertRowsWriter.java @@ -80,17 +80,11 @@ protected void writeRow(InsertRowsEventInfo event, Serializable[] row, BitSet in jsonGenerator.writeStartObject(); jsonGenerator.writeNumberField("id", i + 1); ColumnDefinition columnDefinition = event.getColumnByIndex(i); - Integer columnType = null; if (columnDefinition != null) { jsonGenerator.writeStringField("name", columnDefinition.getName()); - columnType = columnDefinition.getType(); - jsonGenerator.writeNumberField("column_type", columnType); - } - if (row[i] == null) { - jsonGenerator.writeNullField("value"); - } else { - jsonGenerator.writeObjectField("value", getWritableObject(columnType, row[i])); + jsonGenerator.writeNumberField("column_type", columnDefinition.getType()); } + writeObjectAsValueField(jsonGenerator, "value", columnDefinition, row[i]); jsonGenerator.writeEndObject(); i = includedColumns.nextSetBit(i + 1); } diff --git a/nifi-extension-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/UpdateRowsWriter.java b/nifi-extension-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/UpdateRowsWriter.java index c860ffdaabc9..e45bc13382a6 100644 --- a/nifi-extension-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/UpdateRowsWriter.java +++ b/nifi-extension-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/UpdateRowsWriter.java @@ -82,26 +82,15 @@ protected void writeRow(UpdateRowsEventInfo event, Map.Entry assertEquals(ProvenanceEventType.RECEIVE, event.getEventType())); } + @Test + public void testInsertUpdateDeleteSignedAndUnsigned(@Mock Connection connection) throws JsonProcessingException { + CaptureChangeMySQL processorUnsigned = new MockCaptureChangeMySQLUnsigned(connection); + TestRunner unsignedRunner = TestRunners.newTestRunner(processorUnsigned); + Serializable[] sampleRow = new Serializable[]{2, SMITH, (byte) -77, (byte) -77, (short) -77, (short) -77, (int) -77, (int) -77, (long) -77, (long) -77}; + + unsignedRunner.setProperty(CaptureChangeMySQL.DRIVER_LOCATION, DRIVER_LOCATION); + unsignedRunner.setProperty(CaptureChangeMySQL.HOSTS, LOCAL_HOST_DEFAULT_PORT); + unsignedRunner.setProperty(CaptureChangeMySQL.USERNAME, ROOT_USER); + unsignedRunner.setProperty(CaptureChangeMySQL.PASSWORD, PASSWORD); + unsignedRunner.setProperty(CaptureChangeMySQL.SERVER_ID, ONE); + unsignedRunner.setProperty(CaptureChangeMySQL.CONNECT_TIMEOUT, CONNECT_TIMEOUT); + unsignedRunner.setProperty(CaptureChangeMySQL.INIT_BINLOG_FILENAME, INIT_BIN_LOG_FILENAME); + unsignedRunner.setProperty(CaptureChangeMySQL.INIT_BINLOG_POSITION, FOUR); + unsignedRunner.setProperty(CaptureChangeMySQL.INCLUDE_BEGIN_COMMIT, Boolean.TRUE.toString()); + unsignedRunner.setProperty(CaptureChangeMySQL.INCLUDE_DDL_EVENTS, Boolean.TRUE.toString()); + + unsignedRunner.run(1, false, true); + + // ROTATE scenario + EventHeaderV4 eventHeaderV4 = EventUtils.buildEventHeaderV4(EventType.ROTATE, 2L); + RotateEventData rotateEventData = EventUtils.buildRotateEventData(INIT_BIN_LOG_FILENAME, 4L); + client.sendEvent(EventUtils.buildEvent(eventHeaderV4, rotateEventData)); + + + // INSERT scenario + eventHeaderV4 = EventUtils.buildEventHeaderV4(EventType.QUERY, 4L); + QueryEventData queryEventData = EventUtils.buildQueryEventData(MY_DB, BEGIN_SQL_KEYWORD_UPPERCASE); + client.sendEvent(EventUtils.buildEvent(eventHeaderV4, queryEventData)); + + eventHeaderV4 = EventUtils.buildEventHeaderV4(EventType.TABLE_MAP, 6L); + TableMapEventData tableMapEventData = EventUtils.buildTableMapEventData(1L, MY_DB, MY_TABLE, MockCaptureChangeMySQLUnsigned.COLUMN_TYPES); + client.sendEvent(EventUtils.buildEvent(eventHeaderV4, tableMapEventData)); + + eventHeaderV4 = EventUtils.buildEventHeaderV4(EventType.EXT_WRITE_ROWS, 8L); + BitSet cols = new BitSet(); + cols.set(0, 10); + WriteRowsEventData writeRowsEventData = EventUtils.buildWriteRowsEventData(1L, cols, Collections.singletonList(sampleRow)); + + client.sendEvent(EventUtils.buildEvent(eventHeaderV4, writeRowsEventData)); + + eventHeaderV4 = EventUtils.buildEventHeaderV4(EventType.XID, 12L); + client.sendEvent(EventUtils.buildEvent(eventHeaderV4)); + + + // UPDATE scenario + eventHeaderV4 = EventUtils.buildEventHeaderV4(EventType.QUERY, 16L); + queryEventData = EventUtils.buildQueryEventData(MY_DB, BEGIN_SQL_KEYWORD_UPPERCASE); + client.sendEvent(EventUtils.buildEvent(eventHeaderV4, queryEventData)); + + eventHeaderV4 = EventUtils.buildEventHeaderV4(EventType.TABLE_MAP, 18L); + tableMapEventData = EventUtils.buildTableMapEventData(1L, MY_DB, MY_TABLE, MockCaptureChangeMySQLUnsigned.COLUMN_TYPES); + client.sendEvent(EventUtils.buildEvent(eventHeaderV4, tableMapEventData)); + + eventHeaderV4 = EventUtils.buildEventHeaderV4(EventType.UPDATE_ROWS, 20L); + BitSet colsBefore = new BitSet(); + cols.set(0, 10); + BitSet colsAfter = new BitSet(); + colsAfter.set(0, 10); + Map updateMap = Collections.singletonMap(sampleRow, sampleRow); + UpdateRowsEventData updateRowsEventData = EventUtils.buildUpdateRowsEventData(1L, colsBefore, colsAfter, + new ArrayList<>(updateMap.entrySet())); + client.sendEvent(EventUtils.buildEvent(eventHeaderV4, updateRowsEventData)); + + eventHeaderV4 = EventUtils.buildEventHeaderV4(EventType.XID, 24L); + client.sendEvent(EventUtils.buildEvent(eventHeaderV4)); + + // ROTATE scenario + eventHeaderV4 = EventUtils.buildEventHeaderV4(EventType.ROTATE, 26L); + rotateEventData = EventUtils.buildRotateEventData(SUBSEQUENT_BIN_LOG_FILENAME, 4L); + client.sendEvent(EventUtils.buildEvent(eventHeaderV4, rotateEventData)); + + eventHeaderV4 = EventUtils.buildEventHeaderV4(EventType.QUERY, 28L); + queryEventData = EventUtils.buildQueryEventData(MY_DB, BEGIN_SQL_KEYWORD_UPPERCASE); + client.sendEvent(EventUtils.buildEvent(eventHeaderV4, queryEventData)); + + eventHeaderV4 = EventUtils.buildEventHeaderV4(EventType.TABLE_MAP, 30L); + tableMapEventData = EventUtils.buildTableMapEventData(1L, MY_DB, MY_TABLE, MockCaptureChangeMySQLUnsigned.COLUMN_TYPES); + client.sendEvent(EventUtils.buildEvent(eventHeaderV4, tableMapEventData)); + + + // DELETE scenario + eventHeaderV4 = EventUtils.buildEventHeaderV4(EventType.DELETE_ROWS, 36L); + cols = new BitSet(); + cols.set(0, 10); + + DeleteRowsEventData deleteRowsEventData = EventUtils.buildDeleteRowsEventData(1L, cols, Collections.singletonList(sampleRow)); + client.sendEvent(EventUtils.buildEvent(eventHeaderV4, deleteRowsEventData)); + + eventHeaderV4 = EventUtils.buildEventHeaderV4(EventType.XID, 40L); + client.sendEvent(EventUtils.buildEvent(eventHeaderV4)); + + unsignedRunner.run(1, true, false); + + List resultFiles = unsignedRunner.getFlowFilesForRelationship(CaptureChangeMySQL.REL_SUCCESS); + List expectedEventTypes = new ArrayList<>(); + expectedEventTypes.add(BEGIN_SQL_KEYWORD_LOWERCASE); + expectedEventTypes.addAll(Collections.nCopies(1, INSERT_SQL_KEYWORD_LOWERCASE)); + expectedEventTypes.addAll(Arrays.asList(COMMIT_SQL_KEYWORD_LOWERCASE, BEGIN_SQL_KEYWORD_LOWERCASE, "update", COMMIT_SQL_KEYWORD_LOWERCASE, BEGIN_SQL_KEYWORD_LOWERCASE)); + expectedEventTypes.addAll(Collections.nCopies(1, "delete")); + expectedEventTypes.add(COMMIT_SQL_KEYWORD_LOWERCASE); + + IntStream.range(0, resultFiles.size()).forEach(index -> { + MockFlowFile resultFile = resultFiles.get(index); + assertEquals(EventWriter.APPLICATION_JSON, resultFile.getAttribute(CoreAttributes.MIME_TYPE.key())); + assertEquals(index, Long.valueOf(resultFile.getAttribute(EventWriter.SEQUENCE_ID_KEY))); + assertEquals(index < 6 ? INIT_BIN_LOG_FILENAME : SUBSEQUENT_BIN_LOG_FILENAME, resultFile.getAttribute(BinlogEventInfo.BINLOG_FILENAME_KEY)); + assertEquals(0L, Long.parseLong(resultFile.getAttribute(BinlogEventInfo.BINLOG_POSITION_KEY)) % 4); + assertEquals(expectedEventTypes.get(index), resultFile.getAttribute(EventWriter.CDC_EVENT_TYPE_ATTRIBUTE)); + }); + + // check value -77 unsigned vs. signed + int comparedValues = 0; + for (MockFlowFile flowFile : resultFiles) { + String content = new String(flowFile.toByteArray(), java.nio.charset.StandardCharsets.UTF_8); + JsonNode root = new ObjectMapper().readTree(content); + JsonNode columns = root.get("columns"); + if (columns != null && columns.isArray()) { + for (int i = 2; i < columns.size(); i++) { // the first two columns are id and name (not -77) + JsonNode column = columns.get(i); + JsonNode valueNode = column.get("value"); + JsonNode nameNode = column.get("name"); + JsonNode typeNode = column.get("column_type"); + if (nameNode != null && nameNode.isTextual() && valueNode != null && valueNode.isNumber() + && typeNode != null && typeNode.isNumber() && nameNode.isTextual() ) { + comparedValues++; + if (typeNode.asInt() == -6) { //tinyint, byte + assertEquals( + nameNode.asText().toLowerCase().startsWith("unsigned") + ? String.valueOf(Byte.toUnsignedInt((byte) -77)) : "-77", + valueNode.asText()); + } else if (typeNode.asInt() == 5) { //smallint, short + assertEquals( + nameNode.asText().toLowerCase().startsWith("unsigned") + ? String.valueOf(Short.toUnsignedInt((short) -77)) : "-77", + valueNode.asText()); + } else if (typeNode.asInt() == 4) { //integer, int + assertEquals( + nameNode.asText().toLowerCase().startsWith("unsigned") + ? Integer.toUnsignedString(-77) : "-77", + valueNode.asText()); + } else if (typeNode.asInt() == -5) { //bigint, long + assertEquals( + nameNode.asText().toLowerCase().startsWith("unsigned") + ? Long.toUnsignedString( -77 ) : "-77", + valueNode.asText()); + } else { + comparedValues--; + } + } + } + } + } + assertEquals(24, comparedValues); + assertEquals(9, resultFiles.size()); + assertEquals(9, unsignedRunner.getProvenanceEvents().size()); + unsignedRunner.getProvenanceEvents().forEach(event -> assertEquals(ProvenanceEventType.RECEIVE, event.getEventType())); + } + @Test public void testExcludeSchemaChanges() { testRunner.setProperty(CaptureChangeMySQL.DRIVER_LOCATION, DRIVER_LOCATION); @@ -1317,11 +1477,59 @@ protected TableInfo loadTableInfo(TableInfoCacheKey key) { TableInfo tableInfo = cache.get(key); if (tableInfo == null) { tableInfo = new TableInfo(key.getDatabaseName(), key.getTableName(), key.getTableId(), - Collections.singletonList(new ColumnDefinition((byte) -4, "string1"))); + Collections.singletonList(new ColumnDefinition(null, (byte) -4, "string1"))); + cache.put(key, tableInfo); + } + return tableInfo; + } + + @Override + protected void registerDriver(String locationString, String drvName) { + } + + @Override + protected Connection getJdbcConnection() { + return mockConnection; + } + } + + @RequiresInstanceClassLoading + class MockCaptureChangeMySQLUnsigned extends CaptureChangeMySQL { + private final Connection mockConnection; + + public MockCaptureChangeMySQLUnsigned(Connection mockConnection) { + this.mockConnection = mockConnection; + } + + Map cache = new HashMap<>(); + + @Override + protected BinaryLogClient createBinlogClient(String hostname, int port, String username, String password) { + return client; + } + + @Override + protected TableInfo loadTableInfo(TableInfoCacheKey key) { + TableInfo tableInfo = cache.get(key); + if (tableInfo == null) { + tableInfo = new TableInfo(key.getDatabaseName(), key.getTableName(), key.getTableId(), + List.of( + new ColumnDefinition(true, (byte) 4, "int1"), + new ColumnDefinition(true, (byte) -4, "string1"), + new ColumnDefinition(false, (byte) -6, "unsigned tinyint"), + new ColumnDefinition(true, (byte) -6, "signed tinyint"), + new ColumnDefinition(false, (byte) 5, "unsigned smallint"), + new ColumnDefinition(true, (byte) 5, "signed smallint"), + new ColumnDefinition(false, (byte) 4, "unsigned int"), + new ColumnDefinition(true, (byte) 4, "signed int"), + new ColumnDefinition(false, (byte) -5, "unsigned bigint"), + new ColumnDefinition(true, (byte) -5, "signed bigint") + )); cache.put(key, tableInfo); } return tableInfo; } + private static final byte[] COLUMN_TYPES = new byte[]{4, -4, -6, -6, 5, 5, 4, 4, -5, -5}; @Override protected void registerDriver(String locationString, String drvName) {