From ddb43d54dc119b4d3d9d797000a2ba0f59692109 Mon Sep 17 00:00:00 2001 From: Carles Arnal Date: Thu, 8 Jun 2023 20:19:19 +0200 Subject: [PATCH 1/4] Add content hash and canonical hash upgrader --- .../impl/sql/AbstractSqlRegistryStorage.java | 2 +- .../storage/impl/sql/CommonSqlStatements.java | 8 ++ .../storage/impl/sql/SqlStatements.java | 6 +- .../ReferencesCanonicalHashUpgrader.java | 119 +++++++++++++++++ .../registry/storage/impl/sql/db-version | 2 +- .../impl/sql/upgrades/14/h2.upgrade.ddl | 8 ++ .../impl/sql/upgrades/14/mssql.upgrade.ddl | 8 ++ .../sql/upgrades/14/postgresql.upgrade.ddl | 8 ++ ...KafkaSqlProtobufCanonicalizerUpgrader.java | 122 ++++++++++++++++++ .../impl/kafkasql/KafkaSqlUpgrader.java | 117 ++--------------- 10 files changed, 293 insertions(+), 107 deletions(-) create mode 100644 app/src/main/java/io/apicurio/registry/storage/impl/sql/upgrader/ReferencesCanonicalHashUpgrader.java create mode 100644 app/src/main/resources/io/apicurio/registry/storage/impl/sql/upgrades/14/h2.upgrade.ddl create mode 100644 app/src/main/resources/io/apicurio/registry/storage/impl/sql/upgrades/14/mssql.upgrade.ddl create mode 100644 app/src/main/resources/io/apicurio/registry/storage/impl/sql/upgrades/14/postgresql.upgrade.ddl create mode 100644 storage/kafkasql/src/main/java/io/apicurio/registry/storage/impl/kafkasql/KafkaSqlProtobufCanonicalizerUpgrader.java diff --git a/app/src/main/java/io/apicurio/registry/storage/impl/sql/AbstractSqlRegistryStorage.java b/app/src/main/java/io/apicurio/registry/storage/impl/sql/AbstractSqlRegistryStorage.java index fc0cf77d07..3e19097d6b 100644 --- a/app/src/main/java/io/apicurio/registry/storage/impl/sql/AbstractSqlRegistryStorage.java +++ b/app/src/main/java/io/apicurio/registry/storage/impl/sql/AbstractSqlRegistryStorage.java @@ -740,7 +740,7 @@ protected Long createOrUpdateContent(Handle handle, ContentHandle content, Strin byte[] contentBytes = content.bytes(); // Upsert a row in the "content" table. 
This will insert a row for the content - // if a row doesn't already exist. We use the canonical hash to determine whether + // if a row doesn't already exist. We use the content hash to determine whether // a row for this content already exists. If we find a row we return its globalId. // If we don't find a row, we insert one and then return its globalId. String sql; diff --git a/app/src/main/java/io/apicurio/registry/storage/impl/sql/CommonSqlStatements.java b/app/src/main/java/io/apicurio/registry/storage/impl/sql/CommonSqlStatements.java index cbe13e4052..ac5b4cfb1e 100644 --- a/app/src/main/java/io/apicurio/registry/storage/impl/sql/CommonSqlStatements.java +++ b/app/src/main/java/io/apicurio/registry/storage/impl/sql/CommonSqlStatements.java @@ -695,6 +695,14 @@ public String updateContentCanonicalHash() { return "UPDATE content SET canonicalHash = ? WHERE tenantId = ? AND contentId = ? AND contentHash = ?"; } + /** + * @see io.apicurio.registry.storage.impl.sql.SqlStatements#updateContentCanonicalHash() + */ + @Override + public String upgradeContent() { + return "UPDATE content SET canonicalHash = ?, contentHash = ? WHERE tenantId = ? AND contentId = ? 
AND contentHash = ?"; + } + /** * @see io.apicurio.registry.storage.impl.sql.SqlStatements#selectLogConfigurationByLogger() */ diff --git a/app/src/main/java/io/apicurio/registry/storage/impl/sql/SqlStatements.java b/app/src/main/java/io/apicurio/registry/storage/impl/sql/SqlStatements.java index ed996926a3..9c92368e7a 100644 --- a/app/src/main/java/io/apicurio/registry/storage/impl/sql/SqlStatements.java +++ b/app/src/main/java/io/apicurio/registry/storage/impl/sql/SqlStatements.java @@ -196,10 +196,14 @@ public interface SqlStatements { /** * A statement to update canonicalHash value in a row in the "content" table - * The only statement that allows to modify an existing row in the "content" table */ public String updateContentCanonicalHash(); + /** + * A statement to update contentHash value in a row in the "content" table + */ + public String upgradeContent(); + /** * A statement to get a single artifact (latest version) content by artifactId. */ diff --git a/app/src/main/java/io/apicurio/registry/storage/impl/sql/upgrader/ReferencesCanonicalHashUpgrader.java b/app/src/main/java/io/apicurio/registry/storage/impl/sql/upgrader/ReferencesCanonicalHashUpgrader.java new file mode 100644 index 0000000000..416c231f45 --- /dev/null +++ b/app/src/main/java/io/apicurio/registry/storage/impl/sql/upgrader/ReferencesCanonicalHashUpgrader.java @@ -0,0 +1,119 @@ +/* + * Copyright 2023 Red Hat + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.apicurio.registry.storage.impl.sql.upgrader; + +import io.apicurio.registry.content.ContentHandle; +import io.apicurio.registry.content.canon.ContentCanonicalizer; +import io.apicurio.registry.storage.impl.sql.IDbUpgrader; +import io.apicurio.registry.storage.impl.sql.jdb.Handle; +import io.apicurio.registry.storage.impl.sql.jdb.RowMapper; +import io.apicurio.registry.storage.impl.sql.mappers.ContentEntityMapper; +import io.apicurio.registry.types.provider.ArtifactTypeUtilProviderFactory; +import io.apicurio.registry.types.provider.DefaultArtifactTypeUtilProviderImpl; +import io.apicurio.registry.utils.impexp.ContentEntity; +import io.quarkus.runtime.annotations.RegisterForReflection; +import org.apache.commons.codec.digest.DigestUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.Collections; +import java.util.stream.Stream; + +@RegisterForReflection +public class ReferencesCanonicalHashUpgrader implements IDbUpgrader { + + private static final Logger logger = LoggerFactory.getLogger(ReferencesCanonicalHashUpgrader.class); + + private static final ArtifactTypeUtilProviderFactory factory = new DefaultArtifactTypeUtilProviderImpl(); + + /** + * @see io.apicurio.registry.storage.impl.sql.IDbUpgrader#upgrade(io.apicurio.registry.storage.impl.sql.jdb.Handle) + */ + @Override + public void upgrade(Handle dbHandle) throws Exception { + + String sql = "SELECT c.contentId, c.content, c.canonicalHash, c.contentHash, c.artifactreferences, a.type " + + "FROM versions v " + + "JOIN content c on c.contentId = v.contentId " + + "JOIN artifacts a ON v.tenantId = a.tenantId AND v.groupId = a.groupId AND v.artifactId = a.artifactId "; + + Stream stream = dbHandle.createQuery(sql) + .setFetchSize(50) + .map(new TenantContentEntityRowMapper()) + .stream(); + try 
(stream) { + stream.forEach(entity -> updateHash(entity, dbHandle)); + } + + } + + private void updateHash(TypeContentEntity typeContentEntity, Handle dbHandle) { + try { + + String canonicalContentHash; + if (typeContentEntity.contentEntity.serializedReferences != null) { + byte[] referencesBytes = typeContentEntity.contentEntity.serializedReferences.getBytes(StandardCharsets.UTF_8); + canonicalContentHash = DigestUtils.sha256Hex(concatContentAndReferences(this.canonicalizeContent(typeContentEntity.contentEntity, typeContentEntity.type).bytes(), referencesBytes)); + String update = "UPDATE content SET canonicalHash = ? WHERE contentId = ? AND contentHash = ?"; + int rowCount = dbHandle.createUpdate(update) + .bind(0, canonicalContentHash) + .bind(1, typeContentEntity.contentEntity.contentId) + .bind(2, typeContentEntity.contentEntity.contentHash) + .execute(); + if (rowCount == 0) { + logger.warn("content row not matched for hash upgrade contentId {} contentHash {}", typeContentEntity.contentEntity.contentId, typeContentEntity.contentEntity.contentHash); + } + } + + } catch (Exception e) { + logger.warn("Error found processing content with id {} and hash {}", typeContentEntity.contentEntity.contentId, typeContentEntity.contentEntity.contentHash, e); + } + } + + private byte[] concatContentAndReferences(byte[] contentBytes, byte[] referencesBytes) throws IOException { + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(contentBytes.length + referencesBytes.length); + outputStream.write(contentBytes); + outputStream.write(referencesBytes); + return outputStream.toByteArray(); + } + + private ContentHandle canonicalizeContent(ContentEntity contentEntity, String type) { + ContentHandle contentHandle = ContentHandle.create(contentEntity.contentBytes); + ContentCanonicalizer canonicalizer = factory.getArtifactTypeProvider(type).getContentCanonicalizer(); + return canonicalizer.canonicalize(contentHandle, Collections.emptyMap()); + } + + public static class 
TypeContentEntity { + String type; + ContentEntity contentEntity; + } + + public static class TenantContentEntityRowMapper implements RowMapper { + @Override + public TypeContentEntity map(ResultSet rs) throws SQLException { + TypeContentEntity e = new TypeContentEntity(); + e.type = rs.getString("type"); + e.contentEntity = ContentEntityMapper.instance.map(rs); + return e; + } + } +} diff --git a/app/src/main/resources/io/apicurio/registry/storage/impl/sql/db-version b/app/src/main/resources/io/apicurio/registry/storage/impl/sql/db-version index ca7bf83ac5..da2d3988d7 100644 --- a/app/src/main/resources/io/apicurio/registry/storage/impl/sql/db-version +++ b/app/src/main/resources/io/apicurio/registry/storage/impl/sql/db-version @@ -1 +1 @@ -13 \ No newline at end of file +14 \ No newline at end of file diff --git a/app/src/main/resources/io/apicurio/registry/storage/impl/sql/upgrades/14/h2.upgrade.ddl b/app/src/main/resources/io/apicurio/registry/storage/impl/sql/upgrades/14/h2.upgrade.ddl new file mode 100644 index 0000000000..6a675a0eca --- /dev/null +++ b/app/src/main/resources/io/apicurio/registry/storage/impl/sql/upgrades/14/h2.upgrade.ddl @@ -0,0 +1,8 @@ +-- ********************************************************************* +-- DDL for the Apicurio Registry - Database: H2 +-- Upgrades the DB schema from version 13 to version 14. 
+-- ********************************************************************* + +UPDATE apicurio SET prop_value = 14 WHERE prop_name = 'db_version'; + +UPGRADER:io.apicurio.registry.storage.impl.sql.upgrader.ReferencesCanonicalHashUpgrader; \ No newline at end of file diff --git a/app/src/main/resources/io/apicurio/registry/storage/impl/sql/upgrades/14/mssql.upgrade.ddl b/app/src/main/resources/io/apicurio/registry/storage/impl/sql/upgrades/14/mssql.upgrade.ddl new file mode 100644 index 0000000000..a9d5e8cd7d --- /dev/null +++ b/app/src/main/resources/io/apicurio/registry/storage/impl/sql/upgrades/14/mssql.upgrade.ddl @@ -0,0 +1,8 @@ +-- ********************************************************************* +-- DDL for the Apicurio Registry - Database: mssql +-- Upgrades the DB schema from version 13 to version 14. +-- ********************************************************************* + +UPDATE apicurio SET prop_value = 14 WHERE prop_name = 'db_version'; + +UPGRADER:io.apicurio.registry.storage.impl.sql.upgrader.ReferencesCanonicalHashUpgrader; diff --git a/app/src/main/resources/io/apicurio/registry/storage/impl/sql/upgrades/14/postgresql.upgrade.ddl b/app/src/main/resources/io/apicurio/registry/storage/impl/sql/upgrades/14/postgresql.upgrade.ddl new file mode 100644 index 0000000000..2429276408 --- /dev/null +++ b/app/src/main/resources/io/apicurio/registry/storage/impl/sql/upgrades/14/postgresql.upgrade.ddl @@ -0,0 +1,8 @@ +-- ********************************************************************* +-- DDL for the Apicurio Registry - Database: PostgreSQL +-- Upgrades the DB schema from version 13 to version 14. 
+-- ********************************************************************* + +UPDATE apicurio SET prop_value = 14 WHERE prop_name = 'db_version'; + +UPGRADER:io.apicurio.registry.storage.impl.sql.upgrader.ReferencesCanonicalHashUpgrader; \ No newline at end of file diff --git a/storage/kafkasql/src/main/java/io/apicurio/registry/storage/impl/kafkasql/KafkaSqlProtobufCanonicalizerUpgrader.java b/storage/kafkasql/src/main/java/io/apicurio/registry/storage/impl/kafkasql/KafkaSqlProtobufCanonicalizerUpgrader.java new file mode 100644 index 0000000000..b448ec2ddb --- /dev/null +++ b/storage/kafkasql/src/main/java/io/apicurio/registry/storage/impl/kafkasql/KafkaSqlProtobufCanonicalizerUpgrader.java @@ -0,0 +1,122 @@ +/* + * Copyright 2023 Red Hat + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.apicurio.registry.storage.impl.kafkasql; + +import io.apicurio.registry.content.ContentHandle; +import io.apicurio.registry.content.canon.ContentCanonicalizer; +import io.apicurio.registry.content.canon.ProtobufContentCanonicalizer; +import io.apicurio.registry.storage.impl.kafkasql.values.ActionType; +import io.apicurio.registry.storage.impl.sql.IDbUpgrader; +import io.apicurio.registry.storage.impl.sql.jdb.Handle; +import io.apicurio.registry.storage.impl.sql.jdb.RowMapper; +import io.apicurio.registry.storage.impl.sql.mappers.ContentEntityMapper; +import io.apicurio.registry.types.ArtifactType; +import io.apicurio.registry.utils.ConcurrentUtil; +import io.apicurio.registry.utils.impexp.ContentEntity; +import org.apache.commons.codec.digest.DigestUtils; +import org.slf4j.Logger; + +import javax.enterprise.context.ApplicationScoped; +import javax.inject.Inject; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.Collections; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Stream; + +@ApplicationScoped +public class KafkaSqlProtobufCanonicalizerUpgrader implements IDbUpgrader { + + @Inject + Logger logger; + + @Inject + KafkaSqlSubmitter submitter; + + @Inject + KafkaSqlCoordinator coordinator; + + @Override + public void upgrade(Handle dbHandle) throws Exception { + + String sql = "SELECT c.contentId, c.content, c.canonicalHash, c.contentHash, c.artifactreferences, v.tenantId " + + "FROM versions v " + + "JOIN content c on c.contentId = v.contentId " + + "JOIN artifacts a ON v.tenantId = a.tenantId AND v.groupId = a.groupId AND v.artifactId = a.artifactId " + + "WHERE a.type = ?"; + + Stream stream = dbHandle.createQuery(sql) + .setFetchSize(50) + .bind(0, ArtifactType.PROTOBUF) + .map(new TenantContentEntityRowMapper()) + .stream(); + try (stream) { + stream.forEach(this::updateCanonicalHash); + } + + } + + protected void updateCanonicalHash(TenantContentEntity 
tenantContentEntity) { + + ContentEntity contentEntity = tenantContentEntity.contentEntity; + + ContentHandle content = ContentHandle.create(contentEntity.contentBytes); + ContentHandle canonicalContent = canonicalizeContent(content); + byte[] canonicalContentBytes = canonicalContent.bytes(); + String canonicalContentHash = DigestUtils.sha256Hex(canonicalContentBytes); + + if (canonicalContentHash.equals(tenantContentEntity.contentEntity.canonicalHash)) { + //canonical hash is correct, skipping + return; + } + + logger.debug("Protobuf content canonicalHash outdated value detected, updating contentId {}", contentEntity.contentId); + + CompletableFuture future = submitter + .submitContent(tenantContentEntity.tenantId, contentEntity.contentId, contentEntity.contentHash, ActionType.UPDATE, canonicalContentHash, null, contentEntity.serializedReferences != null ? contentEntity.serializedReferences : null); + UUID uuid = ConcurrentUtil.get(future); + coordinator.waitForResponse(uuid); + + } + + protected ContentHandle canonicalizeContent(ContentHandle content) { + try { + ContentCanonicalizer canonicalizer = new ProtobufContentCanonicalizer(); + return canonicalizer.canonicalize(content, Collections.emptyMap()); + } catch (Exception e) { + logger.debug("Failed to canonicalize content of type: {}", ArtifactType.PROTOBUF); + return content; + } + } + + + public static class TenantContentEntity { + String tenantId; + ContentEntity contentEntity; + } + + public static class TenantContentEntityRowMapper implements RowMapper { + @Override + public TenantContentEntity map(ResultSet rs) throws SQLException { + TenantContentEntity e = new TenantContentEntity(); + e.tenantId = rs.getString("tenantId"); + e.contentEntity = ContentEntityMapper.instance.map(rs); + return e; + } + } +} diff --git a/storage/kafkasql/src/main/java/io/apicurio/registry/storage/impl/kafkasql/KafkaSqlUpgrader.java 
b/storage/kafkasql/src/main/java/io/apicurio/registry/storage/impl/kafkasql/KafkaSqlUpgrader.java index 8a46ceeb6a..56425e18f4 100644 --- a/storage/kafkasql/src/main/java/io/apicurio/registry/storage/impl/kafkasql/KafkaSqlUpgrader.java +++ b/storage/kafkasql/src/main/java/io/apicurio/registry/storage/impl/kafkasql/KafkaSqlUpgrader.java @@ -16,133 +16,42 @@ package io.apicurio.registry.storage.impl.kafkasql; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.util.Collections; -import java.util.UUID; -import java.util.concurrent.CompletableFuture; -import java.util.stream.Stream; +import io.apicurio.registry.storage.impl.sql.HandleFactory; +import io.apicurio.registry.storage.impl.sql.upgrader.ReferencesCanonicalHashUpgrader; +import io.apicurio.registry.storage.impl.sql.upgrader.ReferencesContentHashUpgrader; import javax.enterprise.context.ApplicationScoped; import javax.inject.Inject; -import org.apache.commons.codec.digest.DigestUtils; -import org.slf4j.Logger; - -import io.apicurio.registry.content.ContentHandle; -import io.apicurio.registry.content.canon.ContentCanonicalizer; -import io.apicurio.registry.content.canon.ProtobufContentCanonicalizer; -import io.apicurio.registry.storage.impl.kafkasql.values.ActionType; -import io.apicurio.registry.storage.impl.sql.HandleFactory; -import io.apicurio.registry.storage.impl.sql.IDbUpgrader; -import io.apicurio.registry.storage.impl.sql.jdb.Handle; -import io.apicurio.registry.storage.impl.sql.jdb.RowMapper; -import io.apicurio.registry.storage.impl.sql.mappers.ContentEntityMapper; -import io.apicurio.registry.types.ArtifactType; -import io.apicurio.registry.utils.ConcurrentUtil; -import io.apicurio.registry.utils.impexp.ContentEntity; - /** * @author Fabian Martinez */ @ApplicationScoped public class KafkaSqlUpgrader { - @Inject - Logger logger; @Inject - KafkaSqlSubmitter submitter; + HandleFactory handles; @Inject - KafkaSqlCoordinator coordinator; + KafkaSqlProtobufCanonicalizerUpgrader 
protobufCanonicalizerUpgrader; - @Inject - HandleFactory handles; + ReferencesContentHashUpgrader contentHashUpgrader; + ReferencesCanonicalHashUpgrader canonicalHashUpgrader; public void upgrade() { + contentHashUpgrader = new ReferencesContentHashUpgrader(); + canonicalHashUpgrader = new ReferencesCanonicalHashUpgrader(); + handles.withHandleNoException(handle -> { - new KafkaSqlProtobufCanonicalizerUpgrader().upgrade(handle); + protobufCanonicalizerUpgrader.upgrade(handle); + contentHashUpgrader.upgrade(handle); + canonicalHashUpgrader.upgrade(handle); return null; }); } - - private class KafkaSqlProtobufCanonicalizerUpgrader implements IDbUpgrader { - - @Override - public void upgrade(Handle dbHandle) throws Exception { - - String sql = "SELECT c.contentId, c.content, c.canonicalHash, c.contentHash, c.artifactreferences, v.tenantId " - + "FROM versions v " - + "JOIN content c on c.contentId = v.contentId " - + "JOIN artifacts a ON v.tenantId = a.tenantId AND v.groupId = a.groupId AND v.artifactId = a.artifactId " - + "WHERE a.type = ?"; - - Stream stream = dbHandle.createQuery(sql) - .setFetchSize(50) - .bind(0, ArtifactType.PROTOBUF) - .map(new TenantContentEntityRowMapper()) - .stream(); - try (stream) { - stream.forEach(entity -> { - updateCanonicalHash(entity); - }); - } - - } - - protected void updateCanonicalHash(TenantContentEntity tenantContentEntity) { - - ContentEntity contentEntity = tenantContentEntity.contentEntity; - - ContentHandle content = ContentHandle.create(contentEntity.contentBytes); - ContentHandle canonicalContent = canonicalizeContent(content); - byte[] canonicalContentBytes = canonicalContent.bytes(); - String canonicalContentHash = DigestUtils.sha256Hex(canonicalContentBytes); - - if (canonicalContentHash.equals(tenantContentEntity.contentEntity.canonicalHash)) { - //canonical hash is correct, skipping - return; - } - - logger.debug("Protobuf content canonicalHash outdated value detected, updating contentId {}", 
contentEntity.contentId); - - CompletableFuture future = submitter - .submitContent(tenantContentEntity.tenantId, contentEntity.contentId, contentEntity.contentHash, ActionType.UPDATE, canonicalContentHash, null, contentEntity.serializedReferences != null ? contentEntity.serializedReferences: null); - UUID uuid = ConcurrentUtil.get(future); - coordinator.waitForResponse(uuid); - - } - - protected ContentHandle canonicalizeContent(ContentHandle content) { - try { - ContentCanonicalizer canonicalizer = new ProtobufContentCanonicalizer(); - return canonicalizer.canonicalize(content, Collections.emptyMap()); - } catch (Exception e) { - logger.debug("Failed to canonicalize content of type: {}", ArtifactType.PROTOBUF); - return content; - } - } - - } - - public static class TenantContentEntity { - String tenantId; - ContentEntity contentEntity; - } - - public static class TenantContentEntityRowMapper implements RowMapper { - @Override - public TenantContentEntity map(ResultSet rs) throws SQLException { - TenantContentEntity e = new TenantContentEntity(); - e.tenantId = rs.getString("tenantId"); - e.contentEntity = ContentEntityMapper.instance.map(rs); - return e; - } - } - } From 2a3486f15e0cb17e4b263c1f835bb7b30520b420 Mon Sep 17 00:00:00 2001 From: Carles Arnal Date: Fri, 9 Jun 2023 11:09:17 +0200 Subject: [PATCH 2/4] Update database version --- .../main/resources/io/apicurio/registry/storage/impl/sql/h2.ddl | 2 +- .../resources/io/apicurio/registry/storage/impl/sql/mssql.ddl | 2 +- .../io/apicurio/registry/storage/impl/sql/postgresql.ddl | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/app/src/main/resources/io/apicurio/registry/storage/impl/sql/h2.ddl b/app/src/main/resources/io/apicurio/registry/storage/impl/sql/h2.ddl index f7f23e1e2d..f3a4e7c488 100644 --- a/app/src/main/resources/io/apicurio/registry/storage/impl/sql/h2.ddl +++ b/app/src/main/resources/io/apicurio/registry/storage/impl/sql/h2.ddl @@ -4,7 +4,7 @@ CREATE TABLE apicurio 
(prop_name VARCHAR(255) NOT NULL, prop_value VARCHAR(255)); ALTER TABLE apicurio ADD PRIMARY KEY (prop_name); -INSERT INTO apicurio (prop_name, prop_value) VALUES ('db_version', 13); +INSERT INTO apicurio (prop_name, prop_value) VALUES ('db_version', 14); CREATE TABLE sequences (tenantId VARCHAR(128) NOT NULL, name VARCHAR(32) NOT NULL, seq_value BIGINT NOT NULL); ALTER TABLE sequences ADD PRIMARY KEY (tenantId, name); diff --git a/app/src/main/resources/io/apicurio/registry/storage/impl/sql/mssql.ddl b/app/src/main/resources/io/apicurio/registry/storage/impl/sql/mssql.ddl index 2d7e1307d2..0f55408855 100644 --- a/app/src/main/resources/io/apicurio/registry/storage/impl/sql/mssql.ddl +++ b/app/src/main/resources/io/apicurio/registry/storage/impl/sql/mssql.ddl @@ -4,7 +4,7 @@ CREATE TABLE apicurio (prop_name VARCHAR(255) NOT NULL, prop_value VARCHAR(255)); ALTER TABLE apicurio ADD PRIMARY KEY (prop_name); -INSERT INTO apicurio (prop_name, prop_value) VALUES ('db_version', 13); +INSERT INTO apicurio (prop_name, prop_value) VALUES ('db_version', 14); CREATE TABLE sequences (tenantId VARCHAR(128) NOT NULL, name VARCHAR(32) NOT NULL, value BIGINT NOT NULL); ALTER TABLE sequences ADD PRIMARY KEY (tenantId, name); diff --git a/app/src/main/resources/io/apicurio/registry/storage/impl/sql/postgresql.ddl b/app/src/main/resources/io/apicurio/registry/storage/impl/sql/postgresql.ddl index a84693a54d..9687f2c736 100644 --- a/app/src/main/resources/io/apicurio/registry/storage/impl/sql/postgresql.ddl +++ b/app/src/main/resources/io/apicurio/registry/storage/impl/sql/postgresql.ddl @@ -4,7 +4,7 @@ CREATE TABLE apicurio (prop_name VARCHAR(255) NOT NULL, prop_value VARCHAR(255)); ALTER TABLE apicurio ADD PRIMARY KEY (prop_name); -INSERT INTO apicurio (prop_name, prop_value) VALUES ('db_version', 13); +INSERT INTO apicurio (prop_name, prop_value) VALUES ('db_version', 14); CREATE TABLE sequences (tenantId VARCHAR(128) NOT NULL, name VARCHAR(32) NOT NULL, value BIGINT NOT NULL); 
ALTER TABLE sequences ADD PRIMARY KEY (tenantId, name); From 94788d64f2e9073ba2b634961150d0f01301bbc9 Mon Sep 17 00:00:00 2001 From: Carles Arnal Date: Fri, 9 Jun 2023 12:13:43 +0200 Subject: [PATCH 3/4] Add integration test for content hash upgrade --- .../tests/utils/CustomTestsUtils.java | 13 ++ .../dbupgrade/KafkaSqlStorageUpgradeIT.java | 85 +++++++++++ .../tests/dbupgrade/SqlStorageUpgradeIT.java | 135 ++++++++++++++---- 3 files changed, 207 insertions(+), 26 deletions(-) diff --git a/integration-tests/testsuite/src/main/java/io/apicurio/tests/utils/CustomTestsUtils.java b/integration-tests/testsuite/src/main/java/io/apicurio/tests/utils/CustomTestsUtils.java index 7e358daa8f..3c9d778e5c 100644 --- a/integration-tests/testsuite/src/main/java/io/apicurio/tests/utils/CustomTestsUtils.java +++ b/integration-tests/testsuite/src/main/java/io/apicurio/tests/utils/CustomTestsUtils.java @@ -18,6 +18,9 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; +import io.apicurio.registry.rest.v2.beans.ArtifactReference; +import io.apicurio.registry.rest.v2.beans.IfExists; +import io.apicurio.registry.types.ContentTypes; import org.apache.commons.codec.digest.DigestUtils; import io.apicurio.registry.rest.client.RegistryClient; @@ -25,6 +28,8 @@ import io.apicurio.registry.utils.IoUtil; import io.apicurio.registry.utils.tests.TestUtils; +import java.util.List; + /** * @author Fabian Martinez */ @@ -39,6 +44,14 @@ public static ArtifactData createArtifact(RegistryClient client, String type, St return new ArtifactData(meta, contentHash); } + public static ArtifactData createArtifactWithReferences(String artifactId, RegistryClient client, String type, String content, List references) throws Exception { + ArtifactMetaData meta = client.createArtifact(null, artifactId, null, type, IfExists.RETURN, false, null, null, ContentTypes.APPLICATION_CREATE_EXTENDED,null, null, IoUtil.toStream(content), references); + TestUtils.retry(() -> 
client.getContentByGlobalId(meta.getGlobalId())); + assertNotNull(client.getLatestArtifact(meta.getGroupId(), meta.getId())); + String contentHash = DigestUtils.sha256Hex(IoUtil.toBytes(content)); + return new ArtifactData(meta, contentHash); + } + public static class ArtifactData { public ArtifactMetaData meta; public String contentHash; diff --git a/integration-tests/testsuite/src/test/java/io/apicurio/tests/dbupgrade/KafkaSqlStorageUpgradeIT.java b/integration-tests/testsuite/src/test/java/io/apicurio/tests/dbupgrade/KafkaSqlStorageUpgradeIT.java index 43134830f0..ec51d9c846 100644 --- a/integration-tests/testsuite/src/test/java/io/apicurio/tests/dbupgrade/KafkaSqlStorageUpgradeIT.java +++ b/integration-tests/testsuite/src/test/java/io/apicurio/tests/dbupgrade/KafkaSqlStorageUpgradeIT.java @@ -17,12 +17,17 @@ package io.apicurio.tests.dbupgrade; import static io.apicurio.tests.utils.CustomTestsUtils.createArtifact; +import static io.apicurio.tests.utils.CustomTestsUtils.createArtifactWithReferences; import static org.junit.jupiter.api.Assertions.assertEquals; import java.nio.file.Path; +import java.util.List; import java.util.Map; +import java.util.UUID; import java.util.stream.Collectors; +import io.apicurio.registry.rest.v2.beans.ArtifactReference; +import org.apache.commons.codec.digest.DigestUtils; import org.junit.jupiter.api.DisplayNameGeneration; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; @@ -54,6 +59,9 @@ @Tag(Constants.KAFKA_SQL) public class KafkaSqlStorageUpgradeIT implements TestSeparator, Constants { + private static final String ARTIFACT_CONTENT = "{\"name\":\"redhat\"}"; + private static final String REFERENCE_CONTENT = "{\"name\":\"ibm\"}"; + @Test public void testStorageUpgradeProtobufUpgraderKafkaSql() throws Exception { testStorageUpgradeProtobufUpgrader("protobufCanonicalHashKafkaSql", RegistryStorageType.kafkasql); @@ -153,4 +161,81 @@ public void testStorageUpgradeProtobufUpgrader(String testName, RegistryStorageT 
} + @Test + public void testStorageUpgradeReferencesContentHash() throws Exception { + testStorageUpgradeReferencesContentHashUpgrader("referencesContentHash", RegistryStorageType.sql); + } + + public void testStorageUpgradeReferencesContentHashUpgrader(String testName, RegistryStorageType storage) throws Exception { + + RegistryStorageType previousStorageValue = RegistryUtils.REGISTRY_STORAGE; + RegistryUtils.REGISTRY_STORAGE = storage; + + Path logsPath = RegistryUtils.getLogsPath(getClass(), testName); + RegistryFacade facade = RegistryFacade.getInstance(); + + try { + + Map appEnv = facade.initRegistryAppEnv(); + + //runs all required infra except for the registry + facade.deployStorage(appEnv, storage); + + appEnv.put("QUARKUS_HTTP_PORT", "8081"); + + String oldRegistryName = "registry-dbv4"; + String image = "quay.io/apicurio/apicurio-registry-kafkasql:2.4.1.Final"; + + var container = new GenericContainer<>(new RemoteDockerImage(DockerImageName.parse(image))); + container.setNetworkMode("host"); + facade.runContainer(appEnv, oldRegistryName, container); + facade.waitForRegistryReady(); + + // + var registryClient = RegistryClientFactory.create("http://localhost:8081/"); + + + final ArtifactData artifact = createArtifact(registryClient, ArtifactType.JSON, REFERENCE_CONTENT); + + //Create a second artifact referencing the first one, the hash will be the same using version 2.4.1.Final. 
+ var artifactReference = new ArtifactReference(); + + artifactReference.setName("testReference"); + artifactReference.setArtifactId(artifact.meta.getId()); + artifactReference.setGroupId(artifact.meta.getGroupId()); + artifactReference.setVersion(artifact.meta.getVersion()); + + var artifactReferences = List.of(artifactReference); + + String artifactId = UUID.randomUUID().toString(); + + final ArtifactData artifactWithReferences = createArtifactWithReferences(artifactId, registryClient, ArtifactType.AVRO, ARTIFACT_CONTENT, artifactReferences); + + String calculatedHash = DigestUtils.sha256Hex(ARTIFACT_CONTENT); + + //Assertions + //The artifact hash is calculated without using references + assertEquals(calculatedHash, artifactWithReferences.contentHash); + + facade.stopProcess(logsPath, oldRegistryName); + + facade.runRegistry(appEnv, "registry-dblatest", "8081"); + facade.waitForRegistryReady(); + + //Finally, if we try to create the same artifact with the same references, no new version will be created and the same ids are used. 
+ ArtifactData upgradedArtifact = createArtifactWithReferences(artifactId, registryClient, ArtifactType.AVRO, ARTIFACT_CONTENT, artifactReferences); + assertEquals(artifactWithReferences.meta.getGlobalId(), upgradedArtifact.meta.getGlobalId()); + assertEquals(artifactWithReferences.meta.getContentId(), upgradedArtifact.meta.getContentId()); + + + } finally { + try { + facade.stopAndCollectLogs(logsPath); + } finally { + RegistryUtils.REGISTRY_STORAGE = previousStorageValue; + } + } + + } + } diff --git a/integration-tests/testsuite/src/test/java/io/apicurio/tests/dbupgrade/SqlStorageUpgradeIT.java b/integration-tests/testsuite/src/test/java/io/apicurio/tests/dbupgrade/SqlStorageUpgradeIT.java index 82f6a06de5..9cf8ca2428 100644 --- a/integration-tests/testsuite/src/test/java/io/apicurio/tests/dbupgrade/SqlStorageUpgradeIT.java +++ b/integration-tests/testsuite/src/test/java/io/apicurio/tests/dbupgrade/SqlStorageUpgradeIT.java @@ -16,28 +16,11 @@ package io.apicurio.tests.dbupgrade; -import static io.apicurio.tests.utils.CustomTestsUtils.createArtifact; -import static org.junit.jupiter.api.Assertions.assertEquals; - -import java.nio.file.Path; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; - -import org.apache.commons.codec.digest.DigestUtils; -import org.junit.jupiter.api.DisplayNameGeneration; -import org.junit.jupiter.api.Tag; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.TestInstance; -import org.junit.jupiter.api.TestInstance.Lifecycle; -import org.testcontainers.containers.GenericContainer; -import org.testcontainers.images.RemoteDockerImage; -import org.testcontainers.utility.DockerImageName; - +import com.fasterxml.jackson.databind.ObjectMapper; import io.apicurio.registry.rest.client.RegistryClient; import io.apicurio.registry.rest.client.RegistryClientFactory; import io.apicurio.registry.rest.v2.beans.ArtifactMetaData; +import 
io.apicurio.registry.rest.v2.beans.ArtifactReference; import io.apicurio.registry.rest.v2.beans.Rule; import io.apicurio.registry.rest.v2.beans.VersionMetaData; import io.apicurio.registry.types.ArtifactType; @@ -53,6 +36,26 @@ import io.apicurio.tests.multitenancy.MultitenancySupport; import io.apicurio.tests.multitenancy.TenantUserClient; import io.apicurio.tests.utils.CustomTestsUtils.ArtifactData; +import org.apache.commons.codec.digest.DigestUtils; +import org.junit.jupiter.api.DisplayNameGeneration; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.TestInstance.Lifecycle; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.images.RemoteDockerImage; +import org.testcontainers.utility.DockerImageName; + +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.stream.Collectors; + +import static io.apicurio.tests.utils.CustomTestsUtils.createArtifact; +import static io.apicurio.tests.utils.CustomTestsUtils.createArtifactWithReferences; +import static org.junit.jupiter.api.Assertions.assertEquals; /** * Note this test does not extend any base class @@ -65,6 +68,11 @@ @Tag(Constants.SQL) public class SqlStorageUpgradeIT implements TestSeparator, Constants { + private static final String ARTIFACT_CONTENT = "{\"name\":\"redhat\"}"; + private static final String REFERENCE_CONTENT = "{\"name\":\"ibm\"}"; + + private static final ObjectMapper mapper = new ObjectMapper(); + @Test public void testStorageUpgrade() throws Exception { @@ -170,8 +178,8 @@ public void testStorageUpgradeProtobufUpgrader(String testName, RegistryStorageT assertEquals(3, searchResults.getCount()); var protobufs = searchResults.getArtifacts().stream() - .filter(ar -> ar.getType().equals(ArtifactType.PROTOBUF)) - .collect(Collectors.toList()); + .filter(ar -> 
ar.getType().equals(ArtifactType.PROTOBUF)) + .collect(Collectors.toList()); assertEquals(1, protobufs.size()); var protoMetadata = registryClient.getArtifactMetaData(protobufs.get(0).getGroupId(), protobufs.get(0).getId()); @@ -209,6 +217,85 @@ public void testStorageUpgradeProtobufUpgrader(String testName, RegistryStorageT } + @Test + public void testStorageUpgradeReferencesContentHash() throws Exception { + testStorageUpgradeReferencesContentHashUpgrader("referencesContentHash", RegistryStorageType.sql); + } + + public void testStorageUpgradeReferencesContentHashUpgrader(String testName, RegistryStorageType storage) throws Exception { + + RegistryStorageType previousStorageValue = RegistryUtils.REGISTRY_STORAGE; + RegistryUtils.REGISTRY_STORAGE = storage; + + Path logsPath = RegistryUtils.getLogsPath(getClass(), testName); + RegistryFacade facade = RegistryFacade.getInstance(); + + try { + + Map appEnv = facade.initRegistryAppEnv(); + + //runs all required infra except for the registry + facade.deployStorage(appEnv, storage); + + appEnv.put("QUARKUS_HTTP_PORT", "8081"); + + String oldRegistryName = "registry-dbv4"; + String image = "quay.io/apicurio/apicurio-registry-sql:2.4.1.Final"; + if (storage == RegistryStorageType.kafkasql) { + image = "quay.io/apicurio/apicurio-registry-kafkasql:2.4.1.Final"; + } + var container = new GenericContainer<>(new RemoteDockerImage(DockerImageName.parse(image))); + container.setNetworkMode("host"); + facade.runContainer(appEnv, oldRegistryName, container); + facade.waitForRegistryReady(); + + // + var registryClient = RegistryClientFactory.create("http://localhost:8081/"); + + + final ArtifactData artifact = createArtifact(registryClient, ArtifactType.JSON, REFERENCE_CONTENT); + + //Create a second artifact referencing the first one, the hash will be the same using version 2.4.1.Final. 
+ var artifactReference = new ArtifactReference(); + + artifactReference.setName("testReference"); + artifactReference.setArtifactId(artifact.meta.getId()); + artifactReference.setGroupId(artifact.meta.getGroupId()); + artifactReference.setVersion(artifact.meta.getVersion()); + + var artifactReferences = List.of(artifactReference); + + String artifactId = UUID.randomUUID().toString(); + + final ArtifactData artifactWithReferences = createArtifactWithReferences(artifactId, registryClient, ArtifactType.AVRO, ARTIFACT_CONTENT, artifactReferences); + + String calculatedHash = DigestUtils.sha256Hex(ARTIFACT_CONTENT); + + //Assertions + //The artifact hash is calculated without using references + assertEquals(calculatedHash, artifactWithReferences.contentHash); + + facade.stopProcess(logsPath, oldRegistryName); + + facade.runRegistry(appEnv, "registry-dblatest", "8081"); + facade.waitForRegistryReady(); + + //Finally, if we try to create the same artifact with the same references, no new version will be created and the same ids are used. 
+ ArtifactData upgradedArtifact = createArtifactWithReferences(artifactId, registryClient, ArtifactType.AVRO, ARTIFACT_CONTENT, artifactReferences); + assertEquals(artifactWithReferences.meta.getGlobalId(), upgradedArtifact.meta.getGlobalId()); + assertEquals(artifactWithReferences.meta.getContentId(), upgradedArtifact.meta.getContentId()); + + } finally { + try { + facade.stopAndCollectLogs(logsPath); + } finally { + RegistryUtils.REGISTRY_STORAGE = previousStorageValue; + } + } + + } + + private List loadData(MultitenancySupport mt) throws Exception { List tenants = new ArrayList<>(); @@ -236,9 +323,7 @@ private List loadData(MultitenancySupport mt) throws Exception { tenant.artifacts.add(createArtifact(client, ArtifactType.ASYNCAPI, ApicurioV2BaseIT.resourceToString("artifactTypes/" + "asyncapi/2.0-streetlights_v1.json"))); } - return tenants; - } private void createMoreArtifacts(List tenants) throws Exception { @@ -280,11 +365,10 @@ private void verifyData(List tenants) { assertEquals(meta.getContentId(), vmeta.getContentId()); } - } } - private class TenantData { + private static class TenantData { TenantUserClient tenant; List artifacts; @@ -292,5 +376,4 @@ public TenantData() { artifacts = new ArrayList<>(); } } - } From 0948c64db3d78e28cd28ca44bc6f114f721b0867 Mon Sep 17 00:00:00 2001 From: Carles Arnal Date: Fri, 9 Jun 2023 12:34:56 +0200 Subject: [PATCH 4/4] Fix kafkasql upgrade test --- .../io/apicurio/registry/storage/impl/sql/SqlStatements.java | 5 ----- .../apicurio/tests/dbupgrade/KafkaSqlStorageUpgradeIT.java | 2 +- 2 files changed, 1 insertion(+), 6 deletions(-) diff --git a/app/src/main/java/io/apicurio/registry/storage/impl/sql/SqlStatements.java b/app/src/main/java/io/apicurio/registry/storage/impl/sql/SqlStatements.java index 9c92368e7a..b0e54796a8 100644 --- a/app/src/main/java/io/apicurio/registry/storage/impl/sql/SqlStatements.java +++ b/app/src/main/java/io/apicurio/registry/storage/impl/sql/SqlStatements.java @@ -199,11 +199,6 @@ public 
interface SqlStatements { */ public String updateContentCanonicalHash(); - /** - * A statement to update contentHash value in a row in the "content" table - */ - public String upgradeContent(); - /** * A statement to get a single artifact (latest version) content by artifactId. */ diff --git a/integration-tests/testsuite/src/test/java/io/apicurio/tests/dbupgrade/KafkaSqlStorageUpgradeIT.java b/integration-tests/testsuite/src/test/java/io/apicurio/tests/dbupgrade/KafkaSqlStorageUpgradeIT.java index ec51d9c846..61c972a445 100644 --- a/integration-tests/testsuite/src/test/java/io/apicurio/tests/dbupgrade/KafkaSqlStorageUpgradeIT.java +++ b/integration-tests/testsuite/src/test/java/io/apicurio/tests/dbupgrade/KafkaSqlStorageUpgradeIT.java @@ -163,7 +163,7 @@ public void testStorageUpgradeProtobufUpgrader(String testName, RegistryStorageT @Test public void testStorageUpgradeReferencesContentHash() throws Exception { - testStorageUpgradeReferencesContentHashUpgrader("referencesContentHash", RegistryStorageType.sql); + testStorageUpgradeReferencesContentHashUpgrader("referencesContentHash", RegistryStorageType.kafkasql); } public void testStorageUpgradeReferencesContentHashUpgrader(String testName, RegistryStorageType storage) throws Exception {