Skip to content

Commit

Permalink
Merge branch '7.6.x' into 7.7.x by rayokota
Browse files Browse the repository at this point in the history
  • Loading branch information
semaphore-agent-production[bot] committed Jan 13, 2025
2 parents d56c258 + 7be77ea commit a3d786d
Show file tree
Hide file tree
Showing 5 changed files with 499 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ protected static boolean canLookupIgnoringVersion(
ParsedSchema newPrev = prev.copy(
Metadata.removeConfluentVersion(prev.metadata()), prev.ruleSet());
// This handles the case where current schema is without confluent:version
return newSchema.deepEquals(newPrev);
return newSchema.equivalent(newPrev);
} else if (schemaVer != null && prevVer == null) {
if (!schemaVer.equals(prev.version())) {
// The incoming confluent:version must match the actual version of the prev schema
Expand All @@ -114,9 +114,9 @@ protected static boolean canLookupIgnoringVersion(
ParsedSchema newSchema = current.copy(
Metadata.removeConfluentVersion(current.metadata()), current.ruleSet());
// This handles the case where prev schema is without confluent:version
return newSchema.deepEquals(newPrev);
return newSchema.equivalent(newPrev);
} else {
return current.deepEquals(prev);
return current.equivalent(prev);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,12 +251,30 @@ default boolean hasTopLevelField(String field) {
}

/**
* Returns whether the underlying raw representations are equal, ignoring references.
* Returns whether the underlying raw representations are equivalent,
* ignoring version and references.
*
* @return whether the underlying raw representations are equal
* @return whether the underlying raw representations are equivalent
* @deprecated use {@link #equivalent(ParsedSchema)} instead
*/
default boolean deepEquals(ParsedSchema schema) {
return Objects.equals(rawSchema(), schema.rawSchema())
return equivalent(schema);
}

/**
* Returns whether the underlying raw representations are equivalent,
* ignoring version and references.
*
* @return whether the underlying raw representations are equivalent
*/
default boolean equivalent(ParsedSchema schema) {
if (this == schema) {
return true;
}
if (schema == null || getClass() != schema.getClass()) {
return false;
}
return Objects.equals(canonicalString(), schema.canonicalString())
&& Objects.equals(metadata(), schema.metadata())
&& Objects.equals(ruleSet(), schema.ruleSet());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,27 @@ public List<String> isBackwardCompatible(ParsedSchema previousSchema) {
}
}

/**
* Returns whether the underlying raw representations are equivalent,
* ignoring version and references.
*
* @return whether the underlying raw representations are equivalent
*/
@Override
public boolean equivalent(ParsedSchema schema) {
if (this == schema) {
return true;
}
if (schema == null || getClass() != schema.getClass()) {
return false;
}
AvroSchema that = (AvroSchema) schema;
return Objects.equals(schemaObj, that.schemaObj)
&& Objects.equals(metadata, that.metadata)
&& Objects.equals(ruleSet, that.ruleSet)
&& metaEqual(schemaObj, that.schemaObj, new HashMap<>());
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import java.util.Properties;
import org.apache.avro.Schema.Parser;
import org.apache.avro.SchemaParseException;
import org.junit.Assert;
import org.junit.Test;

public class RestApiTest extends ClusterTestHarness {
Expand Down Expand Up @@ -2180,11 +2181,250 @@ public void testRegisterSchemaWithReservedFields() throws RestClientException, I
() -> restApp.restClient.registerSchema(request2, subject0, false));
}

@Test
public void testInvalidSchema() {
assertThrows(InvalidSchemaException.class, () ->
((KafkaSchemaRegistry) restApp.schemaRegistry()).parseSchema(null));
}

@Test
public void testConfluentVersion() throws Exception {
String subject = "test";
String schemaString = "{\"type\":\"record\","
+ "\"name\":\"myrecord\","
+ "\"fields\":"
+ "[{\"name\":\"f1\",\"type\":\"string\"}]}";

RegisterSchemaRequest request = new RegisterSchemaRequest();
request.setSchemaType(AvroSchema.TYPE);
request.setSchema(schemaString);
// Register with null version
registerAndVerifySchema(restApp.restClient, request, 1, subject);

Schema result = restApp.restClient.getLatestVersion(subject);
assertEquals(schemaString, result.getSchema());
assertEquals((Integer) 1, result.getVersion());
assertNull(result.getMetadata());

// Register schema with version -1
request.setVersion(-1);
request.setMetadata(null);
registerAndVerifySchema(restApp.restClient, request, 2, subject);

// Register schema with version -1
request.setVersion(-1);
request.setMetadata(null);
registerAndVerifySchema(restApp.restClient, request, 2, subject);

result = restApp.restClient.getLatestVersion(subject);
assertEquals(schemaString, result.getSchema());
assertEquals((Integer) 2, result.getVersion());
assertEquals("2", result.getMetadata().getProperties().get("confluent:version"));

// Lookup schema with null version
request.setVersion(null);
request.setMetadata(null);
result = restApp.restClient.lookUpSubjectVersion(request, subject, false, false);
assertEquals(schemaString, result.getSchema());
assertEquals((Integer) 1, result.getVersion());
assertNull(result.getMetadata());

// Lookup schema with confluent:version 1 (should return one without metadata)
request.setVersion(null);
request.setMetadata(new Metadata(null, Collections.singletonMap("confluent:version", "1"), null));
result = restApp.restClient.lookUpSubjectVersion(request, subject, false, false);
assertEquals(schemaString, result.getSchema());
assertEquals((Integer) 1, result.getVersion());
assertNull(result.getMetadata());

// Lookup schema with confluent:version 2
request.setVersion(null);
request.setMetadata(new Metadata(null, Collections.singletonMap("confluent:version", "2"), null));
result = restApp.restClient.lookUpSubjectVersion(request, subject, false, false);
assertEquals(schemaString, result.getSchema());
assertEquals((Integer) 2, result.getVersion());
assertEquals("2", result.getMetadata().getProperties().get("confluent:version"));

// Delete version 1
restApp.restClient.deleteSchemaVersion(RestService.DEFAULT_REQUEST_PROPERTIES, subject, "1");

// Lookup schema with null version
request.setVersion(null);
request.setMetadata(null);
result = restApp.restClient.lookUpSubjectVersion(request, subject, false, false);
assertEquals(schemaString, result.getSchema());
assertEquals((Integer) 2, result.getVersion());
assertEquals("2", result.getMetadata().getProperties().get("confluent:version"));

// Register schema with null version
registerAndVerifySchema(restApp.restClient, request, 2, subject);

result = restApp.restClient.getLatestVersion(subject);
assertEquals(schemaString, result.getSchema());
assertEquals((Integer) 2, result.getVersion());
assertEquals("2", result.getMetadata().getProperties().get("confluent:version"));

// Register schema with version 3
request.setVersion(3);
request.setMetadata(null);
registerAndVerifySchema(restApp.restClient, request, 3, subject);

// Register schema with version -1
request.setVersion(-1);
request.setMetadata(null);
registerAndVerifySchema(restApp.restClient, request, 3, subject);

// Register schema with version -1
request.setVersion(-1);
request.setMetadata(null);
registerAndVerifySchema(restApp.restClient, request, 3, subject);

result = restApp.restClient.getLatestVersion(subject);
assertEquals(schemaString, result.getSchema());
assertEquals((Integer) 3, result.getVersion());
assertEquals("3", result.getMetadata().getProperties().get("confluent:version"));

// Register schema with version 3
request.setVersion(3);
request.setMetadata(null);
try {
registerAndVerifySchema(restApp.restClient, request, 3, subject);
fail("Registering version that is not next version should fail with " + Errors.INVALID_SCHEMA_ERROR_CODE);
} catch (RestClientException rce) {
assertEquals("Invalid schema",
Errors.INVALID_SCHEMA_ERROR_CODE,
rce.getErrorCode());
}

// Register schema with version 4
request.setVersion(4);
request.setMetadata(null);
registerAndVerifySchema(restApp.restClient, request, 4, subject);

// Lookup schema with null version
request.setVersion(null);
request.setMetadata(null);
result = restApp.restClient.lookUpSubjectVersion(request, subject, false, false);
assertEquals(schemaString, result.getSchema());
assertEquals((Integer) 4, result.getVersion());
assertEquals("4", result.getMetadata().getProperties().get("confluent:version"));

// Register schema with confluent:version -1
request.setVersion(null);
request.setMetadata(new Metadata(null, Collections.singletonMap("confluent:version", "-1"), null));
registerAndVerifySchema(restApp.restClient, request, 5, subject);

result = restApp.restClient.getLatestVersion(subject);
assertEquals(schemaString, result.getSchema());
assertEquals((Integer) 5, result.getVersion());
assertEquals("5", result.getMetadata().getProperties().get("confluent:version"));

// Register schema with confluent:version 2
request.setVersion(null);
request.setMetadata(new Metadata(null, Collections.singletonMap("confluent:version", "2"), null));
registerAndVerifySchema(restApp.restClient, request, 2, subject);

// Register schema with confluent:version 3
request.setVersion(null);
request.setMetadata(new Metadata(null, Collections.singletonMap("confluent:version", "3"), null));
registerAndVerifySchema(restApp.restClient, request, 3, subject);

// Register schema with confluent:version 0
request.setVersion(null);
request.setMetadata(new Metadata(null, Collections.singletonMap("confluent:version", "0"), null));
registerAndVerifySchema(restApp.restClient, request, 6, subject);

result = restApp.restClient.getLatestVersion(subject);
assertEquals(schemaString, result.getSchema());
assertEquals((Integer) 6, result.getVersion());
assertEquals("6", result.getMetadata().getProperties().get("confluent:version"));

// Register schema with empty metadata
request.setVersion(null);
request.setMetadata(new Metadata(null, Collections.emptyMap(), null));
registerAndVerifySchema(restApp.restClient, request, 6, subject);

// Register schema with new metadata
request.setVersion(null);
request.setMetadata(new Metadata(null, Collections.singletonMap("mykey", "myvalue"), null));
registerAndVerifySchema(restApp.restClient, request, 7, subject);

result = restApp.restClient.getLatestVersion(subject);
assertEquals(schemaString, result.getSchema());
assertEquals((Integer) 7, result.getVersion());
assertNull(result.getMetadata().getProperties().get("confluent:version"));

// Register schema with confluent:version -1
request.setVersion(-1);
request.setMetadata(null);
registerAndVerifySchema(restApp.restClient, request, 8, subject);

result = restApp.restClient.getLatestVersion(subject);
assertEquals(schemaString, result.getSchema());
assertEquals((Integer) 8, result.getVersion());
assertEquals("8", result.getMetadata().getProperties().get("confluent:version"));

// Lookup schema with new metadata
request.setVersion(null);
request.setMetadata(new Metadata(null, Collections.singletonMap("mykey", "myvalue"), null));
result = restApp.restClient.lookUpSubjectVersion(request, subject, false, false);
assertEquals(schemaString, result.getSchema());
assertEquals((Integer) 7, result.getVersion());
assertNull(result.getMetadata().getProperties().get("confluent:version"));

// Delete version 7
restApp.restClient.deleteSchemaVersion(RestService.DEFAULT_REQUEST_PROPERTIES, subject, "7");

// Lookup schema with new metadata
request.setVersion(null);
request.setMetadata(new Metadata(null, Collections.singletonMap("mykey", "myvalue"), null));
result = restApp.restClient.lookUpSubjectVersion(request, subject, false, false);
assertEquals(schemaString, result.getSchema());
assertEquals((Integer) 8, result.getVersion());
assertEquals("8", result.getMetadata().getProperties().get("confluent:version"));

// Use same schema with doc
schemaString = "{\"type\":\"record\","
+ "\"name\":\"myrecord\","
+ "\"doc\":\"mydoc\","
+ "\"fields\":"
+ "[{\"name\":\"f1\",\"type\":\"string\"}]}";
request = new RegisterSchemaRequest();
request.setSchemaType(AvroSchema.TYPE);
request.setSchema(schemaString);

// Look up schema with version -1
request.setVersion(-1);
request.setMetadata(null);
try {
restApp.restClient.lookUpSubjectVersion(request, subject, false, false);
fail("Looking up version that is not next version should fail with " + Errors.SCHEMA_NOT_FOUND_ERROR_CODE);
} catch (RestClientException rce) {
assertEquals("Schema not found",
Errors.SCHEMA_NOT_FOUND_ERROR_CODE,
rce.getErrorCode());
}
}

public static void registerAndVerifySchema(
RestService restService,
RegisterSchemaRequest request,
int expectedId,
String subject
) throws IOException, RestClientException {
int registeredId = restService.registerSchema(request, subject, false).getId();
Assert.assertEquals(
"Registering a new schema should succeed",
(long) expectedId,
(long) registeredId
);
Assert.assertEquals(
"Registered schema should be found",
request.getSchema().trim(),
restService.getId(expectedId).getSchemaString().trim()
);
}

@Override
protected Properties getSchemaRegistryProperties() {
Properties schemaRegistryProps = new Properties();
Expand Down
Loading

0 comments on commit a3d786d

Please sign in to comment.