From 2a36c7e18c16280e54a3e09cd3b2039fde07cbb5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rados=C5=82aw=20Stankiewicz?= Date: Wed, 29 Jan 2025 16:21:01 +0100 Subject: [PATCH] JdbcIO - report schema as part of lineage (#33795) * fix missing schemas * spotless * fix tests --- .../org/apache/beam/sdk/io/jdbc/JdbcIO.java | 28 +++---- .../org/apache/beam/sdk/io/jdbc/JdbcUtil.java | 76 ++++++++++++++----- .../apache/beam/sdk/io/jdbc/JdbcIOTest.java | 12 ++- .../apache/beam/sdk/io/jdbc/JdbcUtilTest.java | 25 +++--- 4 files changed, 97 insertions(+), 44 deletions(-) diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java index 39769495beb6..a31745754d0d 100644 --- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java @@ -1611,7 +1611,7 @@ private static class ReadFn extends DoFn reportedLineage; private ReadFn( SerializableFunction dataSourceProviderFn, @@ -1641,16 +1641,17 @@ private Connection getConnection() throws SQLException { this.connection = connection; // report Lineage if not haven't done so - String table = JdbcUtil.extractTableFromReadQuery(query.get()); - if (!table.equals(reportedLineage)) { + KV<@Nullable String, String> schemaWithTable = + JdbcUtil.extractTableFromReadQuery(query.get()); + if (schemaWithTable != null && !schemaWithTable.equals(reportedLineage)) { JdbcUtil.FQNComponents fqn = JdbcUtil.FQNComponents.of(validSource); if (fqn == null) { fqn = JdbcUtil.FQNComponents.of(connection); } if (fqn != null) { - fqn.reportLineage(Lineage.getSources(), table); + fqn.reportLineage(Lineage.getSources(), schemaWithTable); } - reportedLineage = table; + reportedLineage = schemaWithTable; } } return connection; @@ -2665,7 +2666,7 @@ abstract Builder setMaxBatchBufferingDuration( private @Nullable DataSource dataSource; private @Nullable Connection connection; private @Nullable PreparedStatement preparedStatement; - private @Nullable String reportedLineage; + private @Nullable KV<@Nullable String, String> reportedLineage; private static @Nullable FluentBackoff retryBackOff; public WriteFn(WriteFnSpec spec) { @@ -2705,20 +2706,21 @@ private Connection getConnection() throws SQLException { connection.prepareStatement(checkStateNotNull(spec.getStatement()).get()); this.connection = connection; - // report Lineage if haven't done so - String table = spec.getTable(); - if (Strings.isNullOrEmpty(table) && spec.getStatement() != null) { - table = JdbcUtil.extractTableFromWriteQuery(spec.getStatement().get()); + KV<@Nullable String, String> tableWithSchema; + if (Strings.isNullOrEmpty(spec.getTable()) && spec.getStatement() != null) { + tableWithSchema = JdbcUtil.extractTableFromWriteQuery(spec.getStatement().get()); + } else { + tableWithSchema = JdbcUtil.extractTableFromTable(spec.getTable()); } - if (!Objects.equals(table, reportedLineage)) { + if (!Objects.equals(tableWithSchema, reportedLineage)) { JdbcUtil.FQNComponents fqn = JdbcUtil.FQNComponents.of(validSource); if (fqn == null) { fqn = JdbcUtil.FQNComponents.of(connection); } if (fqn != null) { - fqn.reportLineage(Lineage.getSinks(), table); + fqn.reportLineage(Lineage.getSinks(), tableWithSchema); } - reportedLineage = table; + reportedLineage = tableWithSchema; } } return connection; diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcUtil.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcUtil.java index 503b64e4a446..128f21a81097 100644 --- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcUtil.java +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcUtil.java @@ -678,14 +678,29 @@ abstract static class JdbcUrl { /** Jdbc fully qualified name components. */ @AutoValue abstract static class FQNComponents { + + static final String DEFAULT_SCHEMA = "default"; + abstract String getScheme(); abstract Iterable getSegments(); - void reportLineage(Lineage lineage, @Nullable String table) { + void reportLineage(Lineage lineage, @Nullable KV<@Nullable String, String> tableWithSchema) { ImmutableList.Builder builder = ImmutableList.builder().addAll(getSegments()); - if (table != null && !table.isEmpty()) { - builder.add(table); + if (tableWithSchema != null) { + if (tableWithSchema.getKey() != null && !tableWithSchema.getKey().isEmpty()) { + builder.add(tableWithSchema.getKey()); + } else { + // Every database engine has the default schema or search path if user hasn't provided + // one. The name + // is specific to db engine. For PostgreSQL it is public, for MSSQL it is dbo. + // Users can have custom default scheme for the benefit of the user but dataflow is unable + // to determine that. + builder.add(DEFAULT_SCHEMA); + } + if (!tableWithSchema.getValue().isEmpty()) { + builder.add(tableWithSchema.getValue()); + } } lineage.add(getScheme(), builder.build()); } @@ -792,41 +807,66 @@ void reportLineage(Lineage lineage, @Nullable String table) { } } + private static final Pattern TABLE_PATTERN = + Pattern.compile( + "(\\[?`?(?[^\\s\\[\\]`]+)\\]?`?\\.)?\\[?`?(?[^\\s\\[\\]`]+)\\]?`?", + Pattern.CASE_INSENSITIVE); + private static final Pattern READ_STATEMENT_PATTERN = Pattern.compile( - "SELECT\\s+.+?\\s+FROM\\s+\\[?(?[^\\s\\[\\]]+)\\]?", Pattern.CASE_INSENSITIVE); + "SELECT\\s+.+?\\s+FROM\\s+(\\[?`?(?[^\\s\\[\\]`]+)\\]?`?\\.)?\\[?`?(?[^\\s\\[\\]`]+)\\]?`?", + Pattern.CASE_INSENSITIVE); private static final Pattern WRITE_STATEMENT_PATTERN = Pattern.compile( - "INSERT\\s+INTO\\s+\\[?(?[^\\s\\[\\]]+)\\]?", Pattern.CASE_INSENSITIVE); + "INSERT\\s+INTO\\s+(\\[?`?(?[^\\s\\[\\]`]+)\\]?`?\\.)?\\[?(?[^\\s\\[\\]]+)\\]?", + Pattern.CASE_INSENSITIVE); - /** Extract table name a SELECT statement. Return empty string if fail to extract. */ - static String extractTableFromReadQuery(@Nullable String query) { + /** Extract schema and table name a SELECT statement. Return null if fail to extract. */ + static @Nullable KV<@Nullable String, String> extractTableFromReadQuery(@Nullable String query) { if (query == null) { - return ""; + return null; } Matcher matchRead = READ_STATEMENT_PATTERN.matcher(query); if (matchRead.find()) { - String matched = matchRead.group("tableName"); - if (matched != null) { - return matched; + String matchedTable = matchRead.group("tableName"); + String matchedSchema = matchRead.group("schemaName"); + System.out.println(matchedSchema); + if (matchedTable != null) { + return KV.of(matchedSchema, matchedTable); + } + } + return null; + } + + static @Nullable KV<@Nullable String, String> extractTableFromTable(@Nullable String table) { + if (table == null) { + return null; + } + Matcher matchRead = TABLE_PATTERN.matcher(table); + if (matchRead.find()) { + String matchedTable = matchRead.group("tableName"); + String matchedSchema = matchRead.group("schemaName"); + if (matchedTable != null) { + return KV.of(matchedSchema, matchedTable); } } - return ""; + return null; } /** Extract table name from an INSERT statement. Return empty string if fail to extract. */ - static String extractTableFromWriteQuery(@Nullable String query) { + static @Nullable KV<@Nullable String, String> extractTableFromWriteQuery(@Nullable String query) { if (query == null) { - return ""; + return null; } Matcher matchRead = WRITE_STATEMENT_PATTERN.matcher(query); if (matchRead.find()) { - String matched = matchRead.group("tableName"); - if (matched != null) { - return matched; + String matchedTable = matchRead.group("tableName"); + String matchedSchema = matchRead.group("schemaName"); + if (matchedTable != null) { + return KV.of(matchedSchema, matchedTable); } } - return ""; + return null; } } diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java index 8725ef4b3f78..39e1a45d7a98 100644 --- a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java +++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java @@ -248,7 +248,9 @@ public void testRead() { PipelineResult result = pipeline.run(); assertThat( Lineage.query(result.metrics(), Lineage.Type.SOURCE), - hasItem(Lineage.getFqName("derby", ImmutableList.of("memory", "testDB", READ_TABLE_NAME)))); + hasItem( + Lineage.getFqName( + "derby", ImmutableList.of("memory", "testDB", "default", READ_TABLE_NAME)))); } @Test @@ -271,7 +273,9 @@ public void testReadWithSingleStringParameter() { PipelineResult result = pipeline.run(); assertThat( Lineage.query(result.metrics(), Lineage.Type.SOURCE), - hasItem(Lineage.getFqName("derby", ImmutableList.of("memory", "testDB", READ_TABLE_NAME)))); + hasItem( + Lineage.getFqName( + "derby", ImmutableList.of("memory", "testDB", "default", READ_TABLE_NAME)))); } @Test @@ -543,7 +547,9 @@ public void testWrite() throws Exception { assertRowCount(DATA_SOURCE, tableName, EXPECTED_ROW_COUNT); assertThat( Lineage.query(result.metrics(), Lineage.Type.SINK), - hasItem(Lineage.getFqName("derby", ImmutableList.of("memory", "testDB", tableName)))); + hasItem( + Lineage.getFqName( + "derby", ImmutableList.of("memory", "testDB", "default", tableName)))); } finally { DatabaseTestHelper.deleteTable(DATA_SOURCE, tableName); } diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcUtilTest.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcUtilTest.java index 356d6c7f8de7..118eaa4df7ef 100644 --- a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcUtilTest.java +++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcUtilTest.java @@ -348,20 +348,25 @@ public void testFqnFromHikariDataSourceMySql() { @Test public void testExtractTableFromQuery() { - ImmutableList> readCases = + ImmutableList>> readCases = ImmutableList.of( - KV.of("select * from table_1", "table_1"), - KV.of("SELECT a, b FROM [table-2]", "table-2"), - KV.of("drop table not-select", "")); - for (KV testCase : readCases) { + KV.of("select * from table_1", KV.of(null, "table_1")), + KV.of("select * from public.table_1", KV.of("public", "table_1")), + KV.of("select * from `select`", KV.of(null, "select")), + KV.of("select * from `public`.`select`", KV.of("public", "select")), + KV.of("SELECT a, b FROM [table-2]", KV.of(null, "table-2")), + KV.of("SELECT a, b FROM [public].[table-2]", KV.of("public", "table-2")), + KV.of("drop table not-select", null)); + for (KV> testCase : readCases) { assertEquals(testCase.getValue(), JdbcUtil.extractTableFromReadQuery(testCase.getKey())); } - ImmutableList> writeCases = + ImmutableList>> writeCases = ImmutableList.of( - KV.of("insert into table_1 values ...", "table_1"), - KV.of("INSERT INTO [table-2] values ...", "table-2"), - KV.of("drop table not-select", "")); - for (KV testCase : writeCases) { + KV.of("insert into table_1 values ...", KV.of(null, "table_1")), + KV.of("INSERT INTO [table-2] values ...", KV.of(null, "table-2")), + KV.of("INSERT INTO [foo].[table-2] values ...", KV.of("foo", "table-2")), + KV.of("drop table not-select", null)); + for (KV> testCase : writeCases) { assertEquals(testCase.getValue(), JdbcUtil.extractTableFromWriteQuery(testCase.getKey())); } }