Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add content hash and canonical hash upgrader #3423

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -740,7 +740,7 @@ protected Long createOrUpdateContent(Handle handle, ContentHandle content, Strin
byte[] contentBytes = content.bytes();

// Upsert a row in the "content" table. This will insert a row for the content
// if a row doesn't already exist. We use the canonical hash to determine whether
// if a row doesn't already exist. We use the content hash to determine whether
// a row for this content already exists. If we find a row we return its globalId.
// If we don't find a row, we insert one and then return its globalId.
String sql;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -695,6 +695,14 @@ public String updateContentCanonicalHash() {
return "UPDATE content SET canonicalHash = ? WHERE tenantId = ? AND contentId = ? AND contentHash = ?";
}

/**
* @see io.apicurio.registry.storage.impl.sql.SqlStatements#updateContentCanonicalHash()
*/
@Override
public String upgradeContent() {
return "UPDATE content SET canonicalHash = ?, contentHash = ? WHERE tenantId = ? AND contentId = ? AND contentHash = ?";
}

/**
* @see io.apicurio.registry.storage.impl.sql.SqlStatements#selectLogConfigurationByLogger()
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,6 @@ public interface SqlStatements {

/**
* A statement to update canonicalHash value in a row in the "content" table
* The only statement that allows to modify an existing row in the "content" table
*/
public String updateContentCanonicalHash();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
* Copyright 2023 Red Hat
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.apicurio.registry.storage.impl.sql.upgrader;

import io.apicurio.registry.content.ContentHandle;
import io.apicurio.registry.content.canon.ContentCanonicalizer;
import io.apicurio.registry.storage.impl.sql.IDbUpgrader;
import io.apicurio.registry.storage.impl.sql.jdb.Handle;
import io.apicurio.registry.storage.impl.sql.jdb.RowMapper;
import io.apicurio.registry.storage.impl.sql.mappers.ContentEntityMapper;
import io.apicurio.registry.types.provider.ArtifactTypeUtilProviderFactory;
import io.apicurio.registry.types.provider.DefaultArtifactTypeUtilProviderImpl;
import io.apicurio.registry.utils.impexp.ContentEntity;
import io.quarkus.runtime.annotations.RegisterForReflection;
import org.apache.commons.codec.digest.DigestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Collections;
import java.util.stream.Stream;

@RegisterForReflection
public class ReferencesCanonicalHashUpgrader implements IDbUpgrader {

private static final Logger logger = LoggerFactory.getLogger(ReferencesContentHashUpgrader.class);

private static final ArtifactTypeUtilProviderFactory factory = new DefaultArtifactTypeUtilProviderImpl();

/**
* @see io.apicurio.registry.storage.impl.sql.IDbUpgrader#upgrade(io.apicurio.registry.storage.impl.sql.jdb.Handle)
*/
@Override
public void upgrade(Handle dbHandle) throws Exception {

String sql = "SELECT c.contentId, c.content, c.canonicalHash, c.contentHash, c.artifactreferences, a.type "
+ "FROM versions v "
+ "JOIN content c on c.contentId = v.contentId "
+ "JOIN artifacts a ON v.tenantId = a.tenantId AND v.groupId = a.groupId AND v.artifactId = a.artifactId ";

Stream<TypeContentEntity> stream = dbHandle.createQuery(sql)
.setFetchSize(50)
.map(new TenantContentEntityRowMapper())
.stream();
try (stream) {
stream.forEach(entity -> updateHash(entity, dbHandle));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should consider moving the select statement (and others here) into the CommonSqlStatements for consistency and futureproofing, i.e. in case some new supported database type needs a customized query. (Same in the other upgrader)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a bit harder than that, actually. The SqlStatements are produced depending on the SQL variant and injected as a bean into the storage. In the upgrader we cannot do that, since it's not a bean. The valid approach for an upgrade requiring different queries would be to create a separate upgrader and point the upgrade ddl to it.

}

}

private void updateHash(TypeContentEntity typeContentEntity, Handle dbHandle) {
try {

String canonicalContentHash;
if (typeContentEntity.contentEntity.serializedReferences != null) {
byte[] referencesBytes = typeContentEntity.contentEntity.serializedReferences.getBytes(StandardCharsets.UTF_8);
canonicalContentHash = DigestUtils.sha256Hex(concatContentAndReferences(this.canonicalizeContent(typeContentEntity.contentEntity, typeContentEntity.type).bytes(), referencesBytes));
String update = "UPDATE content SET canonicalHash = ? WHERE contentId = ? AND contentHash = ?";
int rowCount = dbHandle.createUpdate(update)
.bind(0, canonicalContentHash)
.bind(1, typeContentEntity.contentEntity.contentId)
.bind(2, typeContentEntity.contentEntity.contentHash)
.execute();
if (rowCount == 0) {
logger.warn("content row not matched for hash upgrade contentId {} contentHash {}", typeContentEntity.contentEntity.contentId, typeContentEntity.contentEntity.contentHash);
}
}

} catch (Exception e) {
logger.warn("Error found processing content with id {} and hash {}", typeContentEntity.contentEntity.contentId, typeContentEntity.contentEntity.contentHash, e);
}
}

private byte[] concatContentAndReferences(byte[] contentBytes, byte[] referencesBytes) throws IOException {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream(contentBytes.length + referencesBytes.length);
outputStream.write(contentBytes);
outputStream.write(referencesBytes);
return outputStream.toByteArray();
}

private ContentHandle canonicalizeContent(ContentEntity contentEntity, String type) {
ContentHandle contentHandle = ContentHandle.create(contentEntity.contentBytes);
ContentCanonicalizer canonicalizer = factory.getArtifactTypeProvider(type).getContentCanonicalizer();
return canonicalizer.canonicalize(contentHandle, Collections.emptyMap());
}

public static class TypeContentEntity {
String type;
ContentEntity contentEntity;
}

public static class TenantContentEntityRowMapper implements RowMapper<TypeContentEntity> {
@Override
public TypeContentEntity map(ResultSet rs) throws SQLException {
TypeContentEntity e = new TypeContentEntity();
e.type = rs.getString("type");
e.contentEntity = ContentEntityMapper.instance.map(rs);
return e;
}
}
}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
13
14
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

CREATE TABLE apicurio (prop_name VARCHAR(255) NOT NULL, prop_value VARCHAR(255));
ALTER TABLE apicurio ADD PRIMARY KEY (prop_name);
INSERT INTO apicurio (prop_name, prop_value) VALUES ('db_version', 13);
INSERT INTO apicurio (prop_name, prop_value) VALUES ('db_version', 14);

CREATE TABLE sequences (tenantId VARCHAR(128) NOT NULL, name VARCHAR(32) NOT NULL, seq_value BIGINT NOT NULL);
ALTER TABLE sequences ADD PRIMARY KEY (tenantId, name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

CREATE TABLE apicurio (prop_name VARCHAR(255) NOT NULL, prop_value VARCHAR(255));
ALTER TABLE apicurio ADD PRIMARY KEY (prop_name);
INSERT INTO apicurio (prop_name, prop_value) VALUES ('db_version', 13);
INSERT INTO apicurio (prop_name, prop_value) VALUES ('db_version', 14);

CREATE TABLE sequences (tenantId VARCHAR(128) NOT NULL, name VARCHAR(32) NOT NULL, value BIGINT NOT NULL);
ALTER TABLE sequences ADD PRIMARY KEY (tenantId, name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

CREATE TABLE apicurio (prop_name VARCHAR(255) NOT NULL, prop_value VARCHAR(255));
ALTER TABLE apicurio ADD PRIMARY KEY (prop_name);
INSERT INTO apicurio (prop_name, prop_value) VALUES ('db_version', 13);
INSERT INTO apicurio (prop_name, prop_value) VALUES ('db_version', 14);

CREATE TABLE sequences (tenantId VARCHAR(128) NOT NULL, name VARCHAR(32) NOT NULL, value BIGINT NOT NULL);
ALTER TABLE sequences ADD PRIMARY KEY (tenantId, name);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
-- *********************************************************************
-- DDL for the Apicurio Registry - Database: H2
-- Upgrades the DB schema from version 13 to version 14.
-- *********************************************************************

UPDATE apicurio SET prop_value = 14 WHERE prop_name = 'db_version';

UPGRADER:io.apicurio.registry.storage.impl.sql.upgrader.ReferencesCanonicalHashUpgrader;
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
-- *********************************************************************
-- DDL for the Apicurio Registry - Database: mssql
-- Upgrades the DB schema from version 13 to version 14.
-- *********************************************************************

UPDATE apicurio SET prop_value = 14 WHERE prop_name = 'db_version';

UPGRADER:io.apicurio.registry.storage.impl.sql.upgrader.ReferencesCanonicalHashUpgrader;
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
-- *********************************************************************
-- DDL for the Apicurio Registry - Database: PostgreSQL
-- Upgrades the DB schema from version 13 to version 14.
-- *********************************************************************

UPDATE apicurio SET prop_value = 14 WHERE prop_name = 'db_version';

UPGRADER:io.apicurio.registry.storage.impl.sql.upgrader.ReferencesCanonicalHashUpgrader;
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,18 @@

import static org.junit.jupiter.api.Assertions.assertNotNull;

import io.apicurio.registry.rest.v2.beans.ArtifactReference;
import io.apicurio.registry.rest.v2.beans.IfExists;
import io.apicurio.registry.types.ContentTypes;
import org.apache.commons.codec.digest.DigestUtils;

import io.apicurio.registry.rest.client.RegistryClient;
import io.apicurio.registry.rest.v2.beans.ArtifactMetaData;
import io.apicurio.registry.utils.IoUtil;
import io.apicurio.registry.utils.tests.TestUtils;

import java.util.List;

/**
* @author Fabian Martinez
*/
Expand All @@ -39,6 +44,14 @@ public static ArtifactData createArtifact(RegistryClient client, String type, St
return new ArtifactData(meta, contentHash);
}

public static ArtifactData createArtifactWithReferences(String artifactId, RegistryClient client, String type, String content, List<ArtifactReference> references) throws Exception {
ArtifactMetaData meta = client.createArtifact(null, artifactId, null, type, IfExists.RETURN, false, null, null, ContentTypes.APPLICATION_CREATE_EXTENDED,null, null, IoUtil.toStream(content), references);
TestUtils.retry(() -> client.getContentByGlobalId(meta.getGlobalId()));
assertNotNull(client.getLatestArtifact(meta.getGroupId(), meta.getId()));
String contentHash = DigestUtils.sha256Hex(IoUtil.toBytes(content));
return new ArtifactData(meta, contentHash);
}

public static class ArtifactData {
public ArtifactMetaData meta;
public String contentHash;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,17 @@
package io.apicurio.tests.dbupgrade;

import static io.apicurio.tests.utils.CustomTestsUtils.createArtifact;
import static io.apicurio.tests.utils.CustomTestsUtils.createArtifactWithReferences;
import static org.junit.jupiter.api.Assertions.assertEquals;

import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;

import io.apicurio.registry.rest.v2.beans.ArtifactReference;
import org.apache.commons.codec.digest.DigestUtils;
import org.junit.jupiter.api.DisplayNameGeneration;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -54,6 +59,9 @@
@Tag(Constants.KAFKA_SQL)
public class KafkaSqlStorageUpgradeIT implements TestSeparator, Constants {

private static final String ARTIFACT_CONTENT = "{\"name\":\"redhat\"}";
private static final String REFERENCE_CONTENT = "{\"name\":\"ibm\"}";

@Test
public void testStorageUpgradeProtobufUpgraderKafkaSql() throws Exception {
testStorageUpgradeProtobufUpgrader("protobufCanonicalHashKafkaSql", RegistryStorageType.kafkasql);
Expand Down Expand Up @@ -153,4 +161,81 @@ public void testStorageUpgradeProtobufUpgrader(String testName, RegistryStorageT

}

@Test
public void testStorageUpgradeReferencesContentHash() throws Exception {
testStorageUpgradeReferencesContentHashUpgrader("referencesContentHash", RegistryStorageType.kafkasql);
}

public void testStorageUpgradeReferencesContentHashUpgrader(String testName, RegistryStorageType storage) throws Exception {

RegistryStorageType previousStorageValue = RegistryUtils.REGISTRY_STORAGE;
RegistryUtils.REGISTRY_STORAGE = storage;

Path logsPath = RegistryUtils.getLogsPath(getClass(), testName);
RegistryFacade facade = RegistryFacade.getInstance();

try {

Map<String, String> appEnv = facade.initRegistryAppEnv();

//runs all required infra except for the registry
facade.deployStorage(appEnv, storage);

appEnv.put("QUARKUS_HTTP_PORT", "8081");

String oldRegistryName = "registry-dbv4";
String image = "quay.io/apicurio/apicurio-registry-kafkasql:2.4.1.Final";

var container = new GenericContainer<>(new RemoteDockerImage(DockerImageName.parse(image)));
container.setNetworkMode("host");
facade.runContainer(appEnv, oldRegistryName, container);
facade.waitForRegistryReady();

//
var registryClient = RegistryClientFactory.create("http://localhost:8081/");


final ArtifactData artifact = createArtifact(registryClient, ArtifactType.JSON, REFERENCE_CONTENT);

//Create a second artifact referencing the first one, the hash will be the same using version 2.4.1.Final.
var artifactReference = new ArtifactReference();

artifactReference.setName("testReference");
artifactReference.setArtifactId(artifact.meta.getId());
artifactReference.setGroupId(artifact.meta.getGroupId());
artifactReference.setVersion(artifact.meta.getVersion());

var artifactReferences = List.of(artifactReference);

String artifactId = UUID.randomUUID().toString();

final ArtifactData artifactWithReferences = createArtifactWithReferences(artifactId, registryClient, ArtifactType.AVRO, ARTIFACT_CONTENT, artifactReferences);

String calculatedHash = DigestUtils.sha256Hex(ARTIFACT_CONTENT);

//Assertions
//The artifact hash is calculated without using references
assertEquals(calculatedHash, artifactWithReferences.contentHash);

facade.stopProcess(logsPath, oldRegistryName);

facade.runRegistry(appEnv, "registry-dblatest", "8081");
facade.waitForRegistryReady();

//Finally, if we try to create the same artifact with the same references, no new version will be created and the same ids are used.
ArtifactData upgradedArtifact = createArtifactWithReferences(artifactId, registryClient, ArtifactType.AVRO, ARTIFACT_CONTENT, artifactReferences);
assertEquals(artifactWithReferences.meta.getGlobalId(), upgradedArtifact.meta.getGlobalId());
assertEquals(artifactWithReferences.meta.getContentId(), upgradedArtifact.meta.getContentId());


} finally {
try {
facade.stopAndCollectLogs(logsPath);
} finally {
RegistryUtils.REGISTRY_STORAGE = previousStorageValue;
}
}

}

}
Loading