diff --git a/client/src/main/java/io/confluent/kafka/schemaregistry/AbstractSchemaProvider.java b/client/src/main/java/io/confluent/kafka/schemaregistry/AbstractSchemaProvider.java index aafe21f73d1..a4aef07bec4 100644 --- a/client/src/main/java/io/confluent/kafka/schemaregistry/AbstractSchemaProvider.java +++ b/client/src/main/java/io/confluent/kafka/schemaregistry/AbstractSchemaProvider.java @@ -102,7 +102,7 @@ protected static boolean canLookupIgnoringVersion( ParsedSchema newPrev = prev.copy( Metadata.removeConfluentVersion(prev.metadata()), prev.ruleSet()); // This handles the case where current schema is without confluent:version - return newSchema.deepEquals(newPrev); + return newSchema.equivalent(newPrev); } else if (schemaVer != null && prevVer == null) { if (!schemaVer.equals(prev.version())) { // The incoming confluent:version must match the actual version of the prev schema @@ -114,9 +114,9 @@ protected static boolean canLookupIgnoringVersion( ParsedSchema newSchema = current.copy( Metadata.removeConfluentVersion(current.metadata()), current.ruleSet()); // This handles the case where prev schema is without confluent:version - return newSchema.deepEquals(newPrev); + return newSchema.equivalent(newPrev); } else { - return current.deepEquals(prev); + return current.equivalent(prev); } } diff --git a/client/src/main/java/io/confluent/kafka/schemaregistry/ParsedSchema.java b/client/src/main/java/io/confluent/kafka/schemaregistry/ParsedSchema.java index 88aaae202f7..53ad0ebfe1c 100644 --- a/client/src/main/java/io/confluent/kafka/schemaregistry/ParsedSchema.java +++ b/client/src/main/java/io/confluent/kafka/schemaregistry/ParsedSchema.java @@ -251,12 +251,30 @@ default boolean hasTopLevelField(String field) { } /** - * Returns whether the underlying raw representations are equal, ignoring references. + * Returns whether the underlying raw representations are equivalent, + * ignoring version and references. * - * @return whether the underlying raw representations are equal + * @return whether the underlying raw representations are equivalent + * @deprecated use {@link #equivalent(ParsedSchema)} instead */ default boolean deepEquals(ParsedSchema schema) { - return Objects.equals(rawSchema(), schema.rawSchema()) + return equivalent(schema); + } + + /** + * Returns whether the underlying raw representations are equivalent, + * ignoring version and references. + * + * @return whether the underlying raw representations are equivalent + */ + default boolean equivalent(ParsedSchema schema) { + if (this == schema) { + return true; + } + if (schema == null || getClass() != schema.getClass()) { + return false; + } + return Objects.equals(canonicalString(), schema.canonicalString()) && Objects.equals(metadata(), schema.metadata()) && Objects.equals(ruleSet(), schema.ruleSet()); } diff --git a/client/src/main/java/io/confluent/kafka/schemaregistry/avro/AvroSchema.java b/client/src/main/java/io/confluent/kafka/schemaregistry/avro/AvroSchema.java index 338ccd381cd..001320bc166 100644 --- a/client/src/main/java/io/confluent/kafka/schemaregistry/avro/AvroSchema.java +++ b/client/src/main/java/io/confluent/kafka/schemaregistry/avro/AvroSchema.java @@ -351,6 +351,27 @@ public List isBackwardCompatible(ParsedSchema previousSchema) { } } + /** + * Returns whether the underlying raw representations are equivalent, + * ignoring version and references. + * + * @return whether the underlying raw representations are equivalent + */ + @Override + public boolean equivalent(ParsedSchema schema) { + if (this == schema) { + return true; + } + if (schema == null || getClass() != schema.getClass()) { + return false; + } + AvroSchema that = (AvroSchema) schema; + return Objects.equals(schemaObj, that.schemaObj) + && Objects.equals(metadata, that.metadata) + && Objects.equals(ruleSet, that.ruleSet) + && metaEqual(schemaObj, that.schemaObj, new HashMap<>()); + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/core/src/test/java/io/confluent/kafka/schemaregistry/rest/RestApiTest.java b/core/src/test/java/io/confluent/kafka/schemaregistry/rest/RestApiTest.java index 6c6a3bce525..4f6ba04128c 100644 --- a/core/src/test/java/io/confluent/kafka/schemaregistry/rest/RestApiTest.java +++ b/core/src/test/java/io/confluent/kafka/schemaregistry/rest/RestApiTest.java @@ -67,6 +67,7 @@ import java.util.Properties; import org.apache.avro.Schema.Parser; import org.apache.avro.SchemaParseException; +import org.junit.Assert; import org.junit.Test; public class RestApiTest extends ClusterTestHarness { @@ -2180,11 +2181,250 @@ public void testRegisterSchemaWithReservedFields() throws RestClientException, I () -> restApp.restClient.registerSchema(request2, subject0, false)); } + @Test public void testInvalidSchema() { assertThrows(InvalidSchemaException.class, () -> ((KafkaSchemaRegistry) restApp.schemaRegistry()).parseSchema(null)); } + @Test + public void testConfluentVersion() throws Exception { + String subject = "test"; + String schemaString = "{\"type\":\"record\"," + + "\"name\":\"myrecord\"," + + "\"fields\":" + + "[{\"name\":\"f1\",\"type\":\"string\"}]}"; + + RegisterSchemaRequest request = new RegisterSchemaRequest(); + request.setSchemaType(AvroSchema.TYPE); + request.setSchema(schemaString); + // Register with null version + registerAndVerifySchema(restApp.restClient, request, 1, subject); + + Schema result = restApp.restClient.getLatestVersion(subject); + assertEquals(schemaString, result.getSchema()); + assertEquals((Integer) 1, result.getVersion()); + assertNull(result.getMetadata()); + + // Register schema with version -1 + request.setVersion(-1); + request.setMetadata(null); + registerAndVerifySchema(restApp.restClient, request, 2, subject); + + // Register schema with version -1 + request.setVersion(-1); + request.setMetadata(null); + registerAndVerifySchema(restApp.restClient, request, 2, subject); + + result = restApp.restClient.getLatestVersion(subject); + assertEquals(schemaString, result.getSchema()); + assertEquals((Integer) 2, result.getVersion()); + assertEquals("2", result.getMetadata().getProperties().get("confluent:version")); + + // Lookup schema with null version + request.setVersion(null); + request.setMetadata(null); + result = restApp.restClient.lookUpSubjectVersion(request, subject, false, false); + assertEquals(schemaString, result.getSchema()); + assertEquals((Integer) 1, result.getVersion()); + assertNull(result.getMetadata()); + + // Lookup schema with confluent:version 1 (should return one without metadata) + request.setVersion(null); + request.setMetadata(new Metadata(null, Collections.singletonMap("confluent:version", "1"), null)); + result = restApp.restClient.lookUpSubjectVersion(request, subject, false, false); + assertEquals(schemaString, result.getSchema()); + assertEquals((Integer) 1, result.getVersion()); + assertNull(result.getMetadata()); + + // Lookup schema with confluent:version 2 + request.setVersion(null); + request.setMetadata(new Metadata(null, Collections.singletonMap("confluent:version", "2"), null)); + result = restApp.restClient.lookUpSubjectVersion(request, subject, false, false); + assertEquals(schemaString, result.getSchema()); + assertEquals((Integer) 2, result.getVersion()); + assertEquals("2", result.getMetadata().getProperties().get("confluent:version")); + + // Delete version 1 + restApp.restClient.deleteSchemaVersion(RestService.DEFAULT_REQUEST_PROPERTIES, subject, "1"); + + // Lookup schema with null version + request.setVersion(null); + request.setMetadata(null); + result = restApp.restClient.lookUpSubjectVersion(request, subject, false, false); + assertEquals(schemaString, result.getSchema()); + assertEquals((Integer) 2, result.getVersion()); + assertEquals("2", result.getMetadata().getProperties().get("confluent:version")); + + // Register schema with null version + registerAndVerifySchema(restApp.restClient, request, 2, subject); + + result = restApp.restClient.getLatestVersion(subject); + assertEquals(schemaString, result.getSchema()); + assertEquals((Integer) 2, result.getVersion()); + assertEquals("2", result.getMetadata().getProperties().get("confluent:version")); + + // Register schema with version 3 + request.setVersion(3); + request.setMetadata(null); + registerAndVerifySchema(restApp.restClient, request, 3, subject); + + // Register schema with version -1 + request.setVersion(-1); + request.setMetadata(null); + registerAndVerifySchema(restApp.restClient, request, 3, subject); + + // Register schema with version -1 + request.setVersion(-1); + request.setMetadata(null); + registerAndVerifySchema(restApp.restClient, request, 3, subject); + + result = restApp.restClient.getLatestVersion(subject); + assertEquals(schemaString, result.getSchema()); + assertEquals((Integer) 3, result.getVersion()); + assertEquals("3", result.getMetadata().getProperties().get("confluent:version")); + + // Register schema with version 3 + request.setVersion(3); + request.setMetadata(null); + try { + registerAndVerifySchema(restApp.restClient, request, 3, subject); + fail("Registering version that is not next version should fail with " + Errors.INVALID_SCHEMA_ERROR_CODE); + } catch (RestClientException rce) { + assertEquals("Invalid schema", + Errors.INVALID_SCHEMA_ERROR_CODE, + rce.getErrorCode()); + } + + // Register schema with version 4 + request.setVersion(4); + request.setMetadata(null); + registerAndVerifySchema(restApp.restClient, request, 4, subject); + + // Lookup schema with null version + request.setVersion(null); + request.setMetadata(null); + result = restApp.restClient.lookUpSubjectVersion(request, subject, false, false); + assertEquals(schemaString, result.getSchema()); + assertEquals((Integer) 4, result.getVersion()); + assertEquals("4", result.getMetadata().getProperties().get("confluent:version")); + + // Register schema with confluent:version -1 + request.setVersion(null); + request.setMetadata(new Metadata(null, Collections.singletonMap("confluent:version", "-1"), null)); + registerAndVerifySchema(restApp.restClient, request, 5, subject); + + result = restApp.restClient.getLatestVersion(subject); + assertEquals(schemaString, result.getSchema()); + assertEquals((Integer) 5, result.getVersion()); + assertEquals("5", result.getMetadata().getProperties().get("confluent:version")); + + // Register schema with confluent:version 2 + request.setVersion(null); + request.setMetadata(new Metadata(null, Collections.singletonMap("confluent:version", "2"), null)); + registerAndVerifySchema(restApp.restClient, request, 2, subject); + + // Register schema with confluent:version 3 + request.setVersion(null); + request.setMetadata(new Metadata(null, Collections.singletonMap("confluent:version", "3"), null)); + registerAndVerifySchema(restApp.restClient, request, 3, subject); + + // Register schema with confluent:version 0 + request.setVersion(null); + request.setMetadata(new Metadata(null, Collections.singletonMap("confluent:version", "0"), null)); + registerAndVerifySchema(restApp.restClient, request, 6, subject); + + result = restApp.restClient.getLatestVersion(subject); + assertEquals(schemaString, result.getSchema()); + assertEquals((Integer) 6, result.getVersion()); + assertEquals("6", result.getMetadata().getProperties().get("confluent:version")); + + // Register schema with empty metadata + request.setVersion(null); + request.setMetadata(new Metadata(null, Collections.emptyMap(), null)); + registerAndVerifySchema(restApp.restClient, request, 6, subject); + + // Register schema with new metadata + request.setVersion(null); + request.setMetadata(new Metadata(null, Collections.singletonMap("mykey", "myvalue"), null)); + registerAndVerifySchema(restApp.restClient, request, 7, subject); + + result = restApp.restClient.getLatestVersion(subject); + assertEquals(schemaString, result.getSchema()); + assertEquals((Integer) 7, result.getVersion()); + assertNull(result.getMetadata().getProperties().get("confluent:version")); + + // Register schema with confluent:version -1 + request.setVersion(-1); + request.setMetadata(null); + registerAndVerifySchema(restApp.restClient, request, 8, subject); + + result = restApp.restClient.getLatestVersion(subject); + assertEquals(schemaString, result.getSchema()); + assertEquals((Integer) 8, result.getVersion()); + assertEquals("8", result.getMetadata().getProperties().get("confluent:version")); + + // Lookup schema with new metadata + request.setVersion(null); + request.setMetadata(new Metadata(null, Collections.singletonMap("mykey", "myvalue"), null)); + result = restApp.restClient.lookUpSubjectVersion(request, subject, false, false); + assertEquals(schemaString, result.getSchema()); + assertEquals((Integer) 7, result.getVersion()); + assertNull(result.getMetadata().getProperties().get("confluent:version")); + + // Delete version 7 + restApp.restClient.deleteSchemaVersion(RestService.DEFAULT_REQUEST_PROPERTIES, subject, "7"); + + // Lookup schema with new metadata + request.setVersion(null); + request.setMetadata(new Metadata(null, Collections.singletonMap("mykey", "myvalue"), null)); + result = restApp.restClient.lookUpSubjectVersion(request, subject, false, false); + assertEquals(schemaString, result.getSchema()); + assertEquals((Integer) 8, result.getVersion()); + assertEquals("8", result.getMetadata().getProperties().get("confluent:version")); + + // Use same schema with doc + schemaString = "{\"type\":\"record\"," + + "\"name\":\"myrecord\"," + + "\"doc\":\"mydoc\"," + + "\"fields\":" + + "[{\"name\":\"f1\",\"type\":\"string\"}]}"; + request = new RegisterSchemaRequest(); + request.setSchemaType(AvroSchema.TYPE); + request.setSchema(schemaString); + + // Look up schema with version -1 + request.setVersion(-1); + request.setMetadata(null); + try { + restApp.restClient.lookUpSubjectVersion(request, subject, false, false); + fail("Looking up version that is not next version should fail with " + Errors.SCHEMA_NOT_FOUND_ERROR_CODE); + } catch (RestClientException rce) { + assertEquals("Schema not found", + Errors.SCHEMA_NOT_FOUND_ERROR_CODE, + rce.getErrorCode()); + } + } + + public static void registerAndVerifySchema( + RestService restService, + RegisterSchemaRequest request, + int expectedId, + String subject + ) throws IOException, RestClientException { + int registeredId = restService.registerSchema(request, subject, false).getId(); + Assert.assertEquals( + "Registering a new schema should succeed", + (long) expectedId, + (long) registeredId + ); + Assert.assertEquals( + "Registered schema should be found", + request.getSchema().trim(), + restService.getId(expectedId).getSchemaString().trim() + ); + } + @Override protected Properties getSchemaRegistryProperties() { Properties schemaRegistryProps = new Properties(); diff --git a/core/src/test/java/io/confluent/kafka/schemaregistry/rest/json/RestApiTest.java b/core/src/test/java/io/confluent/kafka/schemaregistry/rest/json/RestApiTest.java index 13a72e72bca..5766480aff2 100644 --- a/core/src/test/java/io/confluent/kafka/schemaregistry/rest/json/RestApiTest.java +++ b/core/src/test/java/io/confluent/kafka/schemaregistry/rest/json/RestApiTest.java @@ -17,6 +17,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import io.confluent.kafka.schemaregistry.CompatibilityLevel; +import io.confluent.kafka.schemaregistry.client.rest.entities.Metadata; import org.junit.Assert; import org.junit.Test; @@ -44,6 +45,7 @@ import io.confluent.kafka.schemaregistry.rest.exceptions.Errors; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -368,6 +370,199 @@ public void testIncompatibleSchema() throws Exception { assertTrue(response.get(4).contains("compatibility:")); } + @Test + public void testConfluentVersion() throws Exception { + String subject = "test"; + String schemaString = "{\"id\":\"urn:jsonschema:com:MySchema\",\"properties\":{\"myEnum\":{\"enum\":[\"YES_VALUE\",\"NO_VALUE\"],\"type\":\"string\"}},\"type\":\"object\"}"; + + RegisterSchemaRequest request = new RegisterSchemaRequest(); + request.setSchemaType(JsonSchema.TYPE); + request.setSchema(schemaString); + // Register with null version + registerAndVerifySchema(restApp.restClient, request, 1, subject); + + Schema result = restApp.restClient.getLatestVersion(subject); + assertEquals(schemaString, result.getSchema()); + assertEquals((Integer) 1, result.getVersion()); + assertNull(result.getMetadata()); + + // Register schema with version -1 + request.setVersion(-1); + request.setMetadata(null); + registerAndVerifySchema(restApp.restClient, request, 2, subject); + + // Register schema with version -1 + request.setVersion(-1); + request.setMetadata(null); + registerAndVerifySchema(restApp.restClient, request, 2, subject); + + result = restApp.restClient.getLatestVersion(subject); + assertEquals(schemaString, result.getSchema()); + assertEquals((Integer) 2, result.getVersion()); + assertEquals("2", result.getMetadata().getProperties().get("confluent:version")); + + // Lookup schema with null version + request.setVersion(null); + request.setMetadata(null); + result = restApp.restClient.lookUpSubjectVersion(request, subject, false, false); + assertEquals(schemaString, result.getSchema()); + assertEquals((Integer) 1, result.getVersion()); + assertNull(result.getMetadata()); + + // Lookup schema with confluent:version 1 (should return one without metadata) + request.setVersion(null); + request.setMetadata(new Metadata(null, Collections.singletonMap("confluent:version", "1"), null)); + result = restApp.restClient.lookUpSubjectVersion(request, subject, false, false); + assertEquals(schemaString, result.getSchema()); + assertEquals((Integer) 1, result.getVersion()); + assertNull(result.getMetadata()); + + // Lookup schema with confluent:version 2 + request.setVersion(null); + request.setMetadata(new Metadata(null, Collections.singletonMap("confluent:version", "2"), null)); + result = restApp.restClient.lookUpSubjectVersion(request, subject, false, false); + assertEquals(schemaString, result.getSchema()); + assertEquals((Integer) 2, result.getVersion()); + assertEquals("2", result.getMetadata().getProperties().get("confluent:version")); + + // Delete version 1 + restApp.restClient.deleteSchemaVersion(RestService.DEFAULT_REQUEST_PROPERTIES, subject, "1"); + + // Lookup schema with null version + request.setVersion(null); + request.setMetadata(null); + result = restApp.restClient.lookUpSubjectVersion(request, subject, false, false); + assertEquals(schemaString, result.getSchema()); + assertEquals((Integer) 2, result.getVersion()); + assertEquals("2", result.getMetadata().getProperties().get("confluent:version")); + + // Register schema with null version + registerAndVerifySchema(restApp.restClient, request, 2, subject); + + result = restApp.restClient.getLatestVersion(subject); + assertEquals(schemaString, result.getSchema()); + assertEquals((Integer) 2, result.getVersion()); + assertEquals("2", result.getMetadata().getProperties().get("confluent:version")); + + // Register schema with version 3 + request.setVersion(3); + request.setMetadata(null); + registerAndVerifySchema(restApp.restClient, request, 3, subject); + + // Register schema with version -1 + request.setVersion(-1); + request.setMetadata(null); + registerAndVerifySchema(restApp.restClient, request, 3, subject); + + // Register schema with version -1 + request.setVersion(-1); + request.setMetadata(null); + registerAndVerifySchema(restApp.restClient, request, 3, subject); + + result = restApp.restClient.getLatestVersion(subject); + assertEquals(schemaString, result.getSchema()); + assertEquals((Integer) 3, result.getVersion()); + assertEquals("3", result.getMetadata().getProperties().get("confluent:version")); + + // Register schema with version 3 + request.setVersion(3); + request.setMetadata(null); + try { + registerAndVerifySchema(restApp.restClient, request, 3, subject); + fail("Registering version that is not next version should fail with " + Errors.INVALID_SCHEMA_ERROR_CODE); + } catch (RestClientException rce) { + assertEquals("Invalid schema", + Errors.INVALID_SCHEMA_ERROR_CODE, + rce.getErrorCode()); + } + + // Register schema with version 4 + request.setVersion(4); + request.setMetadata(null); + registerAndVerifySchema(restApp.restClient, request, 4, subject); + + // Lookup schema with null version + request.setVersion(null); + request.setMetadata(null); + result = restApp.restClient.lookUpSubjectVersion(request, subject, false, false); + assertEquals(schemaString, result.getSchema()); + assertEquals((Integer) 4, result.getVersion()); + assertEquals("4", result.getMetadata().getProperties().get("confluent:version")); + + // Register schema with confluent:version -1 + request.setVersion(null); + request.setMetadata(new Metadata(null, Collections.singletonMap("confluent:version", "-1"), null)); + registerAndVerifySchema(restApp.restClient, request, 5, subject); + + result = restApp.restClient.getLatestVersion(subject); + assertEquals(schemaString, result.getSchema()); + assertEquals((Integer) 5, result.getVersion()); + assertEquals("5", result.getMetadata().getProperties().get("confluent:version")); + + // Register schema with confluent:version 2 + request.setVersion(null); + request.setMetadata(new Metadata(null, Collections.singletonMap("confluent:version", "2"), null)); + registerAndVerifySchema(restApp.restClient, request, 2, subject); + + // Register schema with confluent:version 3 + request.setVersion(null); + request.setMetadata(new Metadata(null, Collections.singletonMap("confluent:version", "3"), null)); + registerAndVerifySchema(restApp.restClient, request, 3, subject); + + // Register schema with confluent:version 0 + request.setVersion(null); + request.setMetadata(new Metadata(null, Collections.singletonMap("confluent:version", "0"), null)); + registerAndVerifySchema(restApp.restClient, request, 6, subject); + + result = restApp.restClient.getLatestVersion(subject); + assertEquals(schemaString, result.getSchema()); + assertEquals((Integer) 6, result.getVersion()); + assertEquals("6", result.getMetadata().getProperties().get("confluent:version")); + + // Register schema with empty metadata + request.setVersion(null); + request.setMetadata(new Metadata(null, Collections.emptyMap(), null)); + registerAndVerifySchema(restApp.restClient, request, 6, subject); + + // Register schema with new metadata + request.setVersion(null); + request.setMetadata(new Metadata(null, Collections.singletonMap("mykey", "myvalue"), null)); + registerAndVerifySchema(restApp.restClient, request, 7, subject); + + result = restApp.restClient.getLatestVersion(subject); + assertEquals(schemaString, result.getSchema()); + assertEquals((Integer) 7, result.getVersion()); + assertNull(result.getMetadata().getProperties().get("confluent:version")); + + // Register schema with confluent:version -1 + request.setVersion(-1); + request.setMetadata(null); + registerAndVerifySchema(restApp.restClient, request, 8, subject); + + result = restApp.restClient.getLatestVersion(subject); + assertEquals(schemaString, result.getSchema()); + assertEquals((Integer) 8, result.getVersion()); + assertEquals("8", result.getMetadata().getProperties().get("confluent:version")); + + // Lookup schema with new metadata + request.setVersion(null); + request.setMetadata(new Metadata(null, Collections.singletonMap("mykey", "myvalue"), null)); + result = restApp.restClient.lookUpSubjectVersion(request, subject, false, false); + assertEquals(schemaString, result.getSchema()); + assertEquals((Integer) 7, result.getVersion()); + assertNull(result.getMetadata().getProperties().get("confluent:version")); + + // Delete version 7 + restApp.restClient.deleteSchemaVersion(RestService.DEFAULT_REQUEST_PROPERTIES, subject, "7"); + + // Lookup schema with new metadata + request.setVersion(null); + request.setMetadata(new Metadata(null, Collections.singletonMap("mykey", "myvalue"), null)); + result = restApp.restClient.lookUpSubjectVersion(request, subject, false, false); + assertEquals(schemaString, result.getSchema()); + assertEquals((Integer) 8, result.getVersion()); + assertEquals("8", result.getMetadata().getProperties().get("confluent:version")); + } public static void registerAndVerifySchema( RestService restService, @@ -403,6 +598,25 @@ public static void registerAndVerifySchema( ); } + public static void registerAndVerifySchema( + RestService restService, + RegisterSchemaRequest request, + int expectedId, + String subject + ) throws IOException, RestClientException { + int registeredId = restService.registerSchema(request, subject, false).getId(); + Assert.assertEquals( + "Registering a new schema should succeed", + (long) expectedId, + (long) registeredId + ); + Assert.assertEquals( + "Registered schema should be found", + request.getSchema().trim(), + restService.getId(expectedId).getSchemaString().trim() + ); + } + public static List getRandomJsonSchemas(int num) { List schemas = new ArrayList<>(); for (int i = 0; i < num; i++) {