From 9f1e3f9863e7f05982923ea99d120ef57ffa0f98 Mon Sep 17 00:00:00 2001 From: Daniel Krawczyk Date: Thu, 29 Sep 2016 09:26:57 +0200 Subject: [PATCH] Schema-registry integration --- build.gradle | 1 + .../pl/allegro/tech/hermes/api/Topic.java | 34 +-- .../pl/allegro/tech/hermes/api/TopicTest.java | 2 - .../tech/hermes/api/helper/PatchTest.java | 4 +- hermes-common/build.gradle | 5 +- .../tech/hermes/common/config/Configs.java | 4 +- .../tech/hermes/common/di/CommonBinder.java | 21 +- .../di/factories/BoonObjectMapperFactory.java | 18 -- .../exception/InvalidSchemaException.java | 26 -- .../common/message/AvroSchemaSource.java | 47 ---- .../DeserializationException.java | 13 - .../wrapper/AvroMessageContentWrapper.java | 2 +- .../wrapper/DeserializationException.java | 8 + .../wrapper/MessageContentWrapper.java | 58 ++++- .../SchemaAwarePayload.java | 4 +- .../SchemaAwareSerDe.java | 7 +- .../wrapper}/SchemaMissingException.java | 4 +- .../wrapper/UnwrappedMessageContent.java | 2 +- .../AvroCompiledSchemaRepositoryFactory.java | 16 +- .../schema/SchemaRepositoryFactory.java | 33 +++ .../common/schema/SchemaRepositoryType.java | 5 + .../schema/SchemaSourceClientFactory.java | 53 ++++ .../SchemaVersionsRepositoryFactory.java | 48 ++++ .../CouldNotCompileSchemaException.java | 17 -- .../schema/CouldNotLoadSchemaException.java | 20 -- .../topic/schema/SchemaCompilersFactory.java | 28 -- .../SchemaRepoSchemaSourceProvider.java | 34 --- .../domain/topic/schema/SchemaRepository.java | 59 ----- .../topic/schema/SchemaRepositoryType.java | 5 - .../topic/schema/SchemaSourceProvider.java | 23 -- .../domain/topic/schema/TopicWithSchema.java | 22 -- .../schema/UnknownSchemaVersionException.java | 17 -- .../schema/ZookeeperSchemaSourceProvider.java | 37 --- .../JsonCompiledSchemaRepositoryFactory.java | 37 --- .../schema/SchemaSourceProviderFactory.java | 53 ---- .../SchemaVersionsRepositoryFactory.java | 52 ---- .../schema/repo/SchemaRepoClient.java | 24 -- .../schema/repo/SchemaRepoClientFactory.java | 35 --- .../ZookeeperTopicRepositoryTest.groovy | 3 +- .../wrapper/MessageContentWrapperTest.java | 69 +++-- .../SchemaAwareSerDeTest.java | 4 +- .../SchemaRepoSchemaSourceProviderTest.java | 68 ----- .../ZookeeperSchemaSourceProviderTest.java | 65 ----- .../repo/JerseySchemaRepoClientTest.java | 226 ----------------- hermes-console/config.json.example | 3 +- .../js/console/topic/TopicController.js | 2 +- .../js/console/topic/TopicRepository.js | 21 +- .../static/partials/modal/editTopic.html | 25 +- hermes-console/static/partials/topic.html | 6 +- hermes-consumers/build.gradle | 3 +- .../hermes/consumers/consumer/Message.java | 2 +- .../filtering/avro/AvroPathPredicate.java | 3 +- .../kafka/KafkaMessageReceiverFactory.java | 2 +- .../KafkaSingleThreadedMessageReceiver.java | 4 +- .../AvroToJsonMessageConverterTest.java | 4 +- .../hermes/consumers/test/MessageBuilder.java | 4 +- hermes-frontend/build.gradle | 1 + .../hermes/frontend/di/FrontendBinder.java | 2 - .../MessageContentTypeEnforcer.java | 1 - .../publishing/PublishingServlet.java | 6 +- .../frontend/publishing/avro/AvroMessage.java | 2 +- .../publishing/message/JsonMessage.java | 16 -- .../frontend/publishing/message/Message.java | 4 +- .../publishing/message/MessageFactory.java | 30 +-- .../validator/JsonTopicMessageValidator.java | 69 ----- .../TopicMessageValidatorListFactory.java | 6 +- .../MessageContentTypeEnforcerTest.java | 6 +- hermes-management/build.gradle | 1 + .../hermes/management/api/SchemaEndpoint.java | 6 +- .../api/mappers/RuntimeExceptionMapper.java | 7 +- .../api/mappers/SchemaExceptionMapper.java | 22 ++ .../config/MessageConfiguration.java | 6 +- .../config/SchemaRepositoryConfiguration.java | 68 ++--- .../management/config/TopicProperties.java | 10 + .../config/kafka/KafkaConfiguration.java | 2 +- .../AvroSchemaRemovalDisabledException.java | 16 -- .../schema/CouldNotDeleteSchemaException.java | 16 -- ...va => SchemaRemovalDisabledException.java} | 8 +- .../topic/schema/SchemaSourceRepository.java | 18 -- .../topic/schema/SchemaSourceService.java | 39 +-- .../topic/validator/TopicValidator.java | 6 +- .../service/KafkaSingleMessageReader.java | 8 +- .../KafkaRetransmissionService.java | 2 +- .../retransmit/KafkaTimestampExtractor.java | 4 +- .../SchemaRepoSchemaSourceRepository.java | 33 --- .../ZookeeperSchemaSourceRepository.java | 41 --- .../schema/validator/AvroSchemaValidator.java | 1 - .../validator/InvalidSchemaException.java | 16 ++ .../schema/validator/JsonSchemaValidator.java | 44 ---- .../schema/validator/SchemaValidator.java | 2 - .../validator/SchemaValidatorProvider.java | 3 +- .../src/main/resources/application.yaml | 2 +- .../topic/validator/TopicValidatorTest.groovy | 12 +- ...ZookeeperSchemaSourceRepositorySpec.groovy | 63 ----- .../validator/AvroSchemaValidatorTest.groovy | 1 - .../validator/JsonSchemaValidatorTest.groovy | 45 ---- hermes-schema/build.gradle | 11 + .../CachedCompiledSchemaRepository.java | 2 +- .../CachedSchemaVersionsRepository.java | 27 +- .../tech/hermes}/schema/CompiledSchema.java | 2 +- .../schema/CompiledSchemaRepository.java | 3 +- .../schema/CouldNotLoadSchemaException.java | 19 ++ .../schema/CouldNotRemoveSchemaException.java | 15 ++ .../DirectCompiledSchemaRepository.java | 14 +- .../DirectSchemaVersionsRepository.java | 16 +- .../hermes/schema/InvalidSchemaException.java | 15 ++ .../tech/hermes}/schema/SchemaCompiler.java | 2 +- .../hermes/schema/SchemaCompilersFactory.java | 10 + .../tech/hermes/schema/SchemaException.java | 20 ++ .../tech/hermes/schema/SchemaRepository.java | 48 ++++ .../SchemaRepositoryServerException.java | 7 +- .../hermes/schema/SchemaSourceClient.java | 20 ++ .../schema/SchemaSourceNotFoundException.java | 13 +- .../tech/hermes}/schema/SchemaVersion.java | 2 +- .../schema/SchemaVersionsRepository.java | 5 +- .../confluent/SchemaRegistryRequest.java | 42 +++ .../confluent/SchemaRegistryResponse.java | 69 +++++ .../SchemaRegistrySchemaSourceClient.java | 146 +++++++++++ .../SchemaRepoSchemaSourceClient.java | 94 ++++--- .../CachedCompiledSchemaRepositoryTest.groovy | 2 +- .../CachedSchemaVersionsRepositoryTest.groovy | 28 +- .../DirectCompiledSchemaRepositoryTest.groovy | 16 +- .../SchemaRegistrySchemaSourceClientTest.java | 219 ++++++++++++++++ .../SchemaRepoSchemaSourceClientTest.java | 240 ++++++++++++++++++ .../__files/all-schemas-response.json | 0 .../hermes/test/helper/avro/AvroUser.java | 4 +- .../test/helper/builder/TopicBuilder.java | 18 +- .../helper/endpoint/HermesAPIOperations.java | 16 ++ .../src/main/resources/schema/example.json | 11 - .../src/main/resources/schema/user_v2.avsc | 2 +- integration/build.gradle | 7 +- .../hermes/integration/BatchDeliveryTest.java | 1 - .../hermes/integration/FilteringAvroTest.java | 2 - .../KafkaSingleMessageReaderTest.java | 2 - .../integration/MessageBufferLoadingTest.java | 2 +- .../integration/PublishingAvroTest.java | 14 +- .../hermes/integration/PublishingTest.java | 64 ----- .../env/ConfluentSchemaRegistryStarter.java | 49 ++++ .../integration/env/ConsumersStarter.java | 4 +- .../integration/env/EnvironmentAware.java | 4 +- .../integration/env/FrontendStarter.java | 4 +- .../env/HermesIntegrationEnvironment.java | 4 +- .../integration/helper/GraphiteEndpoint.java | 3 +- .../management/SchemaManagementTest.java | 30 +-- .../integration/resources/application.yaml | 1 - .../resources/application-integration.yaml | 2 +- .../src/test/resources/config.properties | 2 +- settings.gradle | 3 +- 148 files changed, 1554 insertions(+), 1839 deletions(-) delete mode 100644 hermes-common/src/main/java/pl/allegro/tech/hermes/common/di/factories/BoonObjectMapperFactory.java delete mode 100644 hermes-common/src/main/java/pl/allegro/tech/hermes/common/exception/InvalidSchemaException.java delete mode 100644 hermes-common/src/main/java/pl/allegro/tech/hermes/common/message/AvroSchemaSource.java delete mode 100644 hermes-common/src/main/java/pl/allegro/tech/hermes/common/message/serialization/DeserializationException.java create mode 100644 hermes-common/src/main/java/pl/allegro/tech/hermes/common/message/wrapper/DeserializationException.java rename hermes-common/src/main/java/pl/allegro/tech/hermes/common/message/{serialization => wrapper}/SchemaAwarePayload.java (77%) rename hermes-common/src/main/java/pl/allegro/tech/hermes/common/message/{serialization => wrapper}/SchemaAwareSerDe.java (80%) rename hermes-common/src/main/java/pl/allegro/tech/hermes/{domain/topic/schema => common/message/wrapper}/SchemaMissingException.java (80%) rename hermes-common/src/main/java/pl/allegro/tech/hermes/{infrastructure => common}/schema/AvroCompiledSchemaRepositoryFactory.java (58%) create mode 100644 hermes-common/src/main/java/pl/allegro/tech/hermes/common/schema/SchemaRepositoryFactory.java create mode 100644 hermes-common/src/main/java/pl/allegro/tech/hermes/common/schema/SchemaRepositoryType.java create mode 100644 hermes-common/src/main/java/pl/allegro/tech/hermes/common/schema/SchemaSourceClientFactory.java create mode 100644 hermes-common/src/main/java/pl/allegro/tech/hermes/common/schema/SchemaVersionsRepositoryFactory.java delete mode 100644 hermes-common/src/main/java/pl/allegro/tech/hermes/domain/topic/schema/CouldNotCompileSchemaException.java delete mode 100644 hermes-common/src/main/java/pl/allegro/tech/hermes/domain/topic/schema/CouldNotLoadSchemaException.java delete mode 100644 hermes-common/src/main/java/pl/allegro/tech/hermes/domain/topic/schema/SchemaCompilersFactory.java delete mode 100644 hermes-common/src/main/java/pl/allegro/tech/hermes/domain/topic/schema/SchemaRepoSchemaSourceProvider.java delete mode 100644 hermes-common/src/main/java/pl/allegro/tech/hermes/domain/topic/schema/SchemaRepository.java delete mode 100644 hermes-common/src/main/java/pl/allegro/tech/hermes/domain/topic/schema/SchemaRepositoryType.java delete mode 100644 hermes-common/src/main/java/pl/allegro/tech/hermes/domain/topic/schema/SchemaSourceProvider.java delete mode 100644 hermes-common/src/main/java/pl/allegro/tech/hermes/domain/topic/schema/TopicWithSchema.java delete mode 100644 hermes-common/src/main/java/pl/allegro/tech/hermes/domain/topic/schema/UnknownSchemaVersionException.java delete mode 100644 hermes-common/src/main/java/pl/allegro/tech/hermes/domain/topic/schema/ZookeeperSchemaSourceProvider.java delete mode 100644 hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/schema/JsonCompiledSchemaRepositoryFactory.java delete mode 100644 hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/schema/SchemaSourceProviderFactory.java delete mode 100644 hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/schema/SchemaVersionsRepositoryFactory.java delete mode 100644 hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/schema/repo/SchemaRepoClient.java delete mode 100644 hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/schema/repo/SchemaRepoClientFactory.java rename hermes-common/src/test/java/pl/allegro/tech/hermes/common/message/{serialization => wrapper}/SchemaAwareSerDeTest.java (91%) delete mode 100644 hermes-common/src/test/java/pl/allegro/tech/hermes/domain/topic/schema/SchemaRepoSchemaSourceProviderTest.java delete mode 100644 hermes-common/src/test/java/pl/allegro/tech/hermes/domain/topic/schema/ZookeeperSchemaSourceProviderTest.java delete mode 100644 hermes-common/src/test/java/pl/allegro/tech/hermes/infrastructure/schema/repo/JerseySchemaRepoClientTest.java delete mode 100644 hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/validator/JsonTopicMessageValidator.java create mode 100644 hermes-management/src/main/java/pl/allegro/tech/hermes/management/api/mappers/SchemaExceptionMapper.java delete mode 100644 hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/topic/schema/AvroSchemaRemovalDisabledException.java delete mode 100644 hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/topic/schema/CouldNotDeleteSchemaException.java rename hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/topic/schema/{CouldNotSaveSchemaException.java => SchemaRemovalDisabledException.java} (52%) delete mode 100644 hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/topic/schema/SchemaSourceRepository.java delete mode 100644 hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/schema/SchemaRepoSchemaSourceRepository.java delete mode 100644 hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/schema/ZookeeperSchemaSourceRepository.java create mode 100644 hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/schema/validator/InvalidSchemaException.java delete mode 100644 hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/schema/validator/JsonSchemaValidator.java delete mode 100644 hermes-management/src/test/groovy/pl/allegro/tech/hermes/management/infrastructure/schema/ZookeeperSchemaSourceRepositorySpec.groovy delete mode 100644 hermes-management/src/test/groovy/pl/allegro/tech/hermes/management/infrastructure/schema/validator/JsonSchemaValidatorTest.groovy create mode 100644 hermes-schema/build.gradle rename {hermes-common/src/main/java/pl/allegro/tech/hermes/domain/topic => hermes-schema/src/main/java/pl/allegro/tech/hermes}/schema/CachedCompiledSchemaRepository.java (97%) rename {hermes-common/src/main/java/pl/allegro/tech/hermes/domain/topic => hermes-schema/src/main/java/pl/allegro/tech/hermes}/schema/CachedSchemaVersionsRepository.java (71%) rename {hermes-common/src/main/java/pl/allegro/tech/hermes/domain/topic => hermes-schema/src/main/java/pl/allegro/tech/hermes}/schema/CompiledSchema.java (95%) rename {hermes-common/src/main/java/pl/allegro/tech/hermes/domain/topic => hermes-schema/src/main/java/pl/allegro/tech/hermes}/schema/CompiledSchemaRepository.java (75%) create mode 100644 hermes-schema/src/main/java/pl/allegro/tech/hermes/schema/CouldNotLoadSchemaException.java create mode 100644 hermes-schema/src/main/java/pl/allegro/tech/hermes/schema/CouldNotRemoveSchemaException.java rename {hermes-common/src/main/java/pl/allegro/tech/hermes/domain/topic => hermes-schema/src/main/java/pl/allegro/tech/hermes}/schema/DirectCompiledSchemaRepository.java (64%) rename hermes-common/src/main/java/pl/allegro/tech/hermes/domain/topic/schema/SimpleSchemaVersionsRepository.java => hermes-schema/src/main/java/pl/allegro/tech/hermes/schema/DirectSchemaVersionsRepository.java (54%) create mode 100644 hermes-schema/src/main/java/pl/allegro/tech/hermes/schema/InvalidSchemaException.java rename {hermes-common/src/main/java/pl/allegro/tech/hermes/domain/topic => hermes-schema/src/main/java/pl/allegro/tech/hermes}/schema/SchemaCompiler.java (74%) create mode 100644 hermes-schema/src/main/java/pl/allegro/tech/hermes/schema/SchemaCompilersFactory.java create mode 100644 hermes-schema/src/main/java/pl/allegro/tech/hermes/schema/SchemaException.java create mode 100644 hermes-schema/src/main/java/pl/allegro/tech/hermes/schema/SchemaRepository.java rename hermes-common/src/main/java/pl/allegro/tech/hermes/common/exception/SchemaRepoException.java => hermes-schema/src/main/java/pl/allegro/tech/hermes/schema/SchemaRepositoryServerException.java (50%) create mode 100644 hermes-schema/src/main/java/pl/allegro/tech/hermes/schema/SchemaSourceClient.java rename {hermes-common/src/main/java/pl/allegro/tech/hermes/domain/topic => hermes-schema/src/main/java/pl/allegro/tech/hermes}/schema/SchemaSourceNotFoundException.java (53%) rename {hermes-common/src/main/java/pl/allegro/tech/hermes/domain/topic => hermes-schema/src/main/java/pl/allegro/tech/hermes}/schema/SchemaVersion.java (95%) rename {hermes-common/src/main/java/pl/allegro/tech/hermes/domain/topic => hermes-schema/src/main/java/pl/allegro/tech/hermes}/schema/SchemaVersionsRepository.java (76%) create mode 100644 hermes-schema/src/main/java/pl/allegro/tech/hermes/schema/confluent/SchemaRegistryRequest.java create mode 100644 hermes-schema/src/main/java/pl/allegro/tech/hermes/schema/confluent/SchemaRegistryResponse.java create mode 100644 hermes-schema/src/main/java/pl/allegro/tech/hermes/schema/confluent/SchemaRegistrySchemaSourceClient.java rename hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/schema/repo/JerseySchemaRepoClient.java => hermes-schema/src/main/java/pl/allegro/tech/hermes/schema/schemarepo/SchemaRepoSchemaSourceClient.java (61%) rename {hermes-common/src/test/groovy/pl/allegro/tech/hermes/domain/topic => hermes-schema/src/test/groovy/pl/allegro/tech/hermes}/schema/CachedCompiledSchemaRepositoryTest.groovy (96%) rename {hermes-common/src/test/groovy/pl/allegro/tech/hermes/domain/topic => hermes-schema/src/test/groovy/pl/allegro/tech/hermes}/schema/CachedSchemaVersionsRepositoryTest.groovy (75%) rename {hermes-common/src/test/groovy/pl/allegro/tech/hermes/domain/topic => hermes-schema/src/test/groovy/pl/allegro/tech/hermes}/schema/DirectCompiledSchemaRepositoryTest.groovy (71%) create mode 100644 hermes-schema/src/test/java/pl/allegro/tech/hermes/schema/confluent/SchemaRegistrySchemaSourceClientTest.java create mode 100644 hermes-schema/src/test/java/pl/allegro/tech/hermes/schema/schemarepo/SchemaRepoSchemaSourceClientTest.java rename {hermes-common => hermes-schema}/src/test/resources/schema-repo-stub/__files/all-schemas-response.json (100%) delete mode 100644 hermes-test-helper/src/main/resources/schema/example.json create mode 100644 integration/src/integration/java/pl/allegro/tech/hermes/integration/env/ConfluentSchemaRegistryStarter.java diff --git a/build.gradle b/build.gradle index 0b3ebdcc3e..d6af522127 100644 --- a/build.gradle +++ b/build.gradle @@ -43,6 +43,7 @@ allprojects { guava : '19.0', jackson : '2.5.1', jersey : '2.23.2', + jetty : '9.3.6.v20151106', curator : '2.11.0', wiremock : '1.58', fongo : '1.6.1', diff --git a/hermes-api/src/main/java/pl/allegro/tech/hermes/api/Topic.java b/hermes-api/src/main/java/pl/allegro/tech/hermes/api/Topic.java index 27939118bf..ccdb7b9279 100644 --- a/hermes-api/src/main/java/pl/allegro/tech/hermes/api/Topic.java +++ b/hermes-api/src/main/java/pl/allegro/tech/hermes/api/Topic.java @@ -17,10 +17,6 @@ public class Topic { @NotNull private String description; - private boolean validationEnabled = false; - - private boolean validationDryRunEnabled = false; - private boolean jsonToAvroDryRunEnabled = false; @NotNull @@ -44,14 +40,11 @@ public enum Ack { private boolean schemaVersionAwareSerializationEnabled = false; public Topic(TopicName name, String description, RetentionTime retentionTime, - boolean validationEnabled, boolean validationDryRunEnabled, boolean migratedFromJsonType, - Ack ack, boolean trackingEnabled, ContentType contentType, boolean jsonToAvroDryRunEnabled, - boolean schemaVersionAwareSerializationEnabled) { + boolean migratedFromJsonType, Ack ack, boolean trackingEnabled, ContentType contentType, + boolean jsonToAvroDryRunEnabled, boolean schemaVersionAwareSerializationEnabled) { this.name = name; this.description = description; this.retentionTime = retentionTime; - this.validationEnabled = validationEnabled; - this.validationDryRunEnabled = validationDryRunEnabled; this.ack = (ack == null ? Ack.LEADER : ack); this.trackingEnabled = trackingEnabled; this.migratedFromJsonType = migratedFromJsonType; @@ -65,17 +58,14 @@ public Topic( @JsonProperty("name") String qualifiedName, @JsonProperty("description") String description, @JsonProperty("retentionTime") RetentionTime retentionTime, - @JsonProperty("validation") boolean validationEnabled, - @JsonProperty("validationDryRun") boolean validationDryRunEnabled, @JsonProperty("jsonToAvroDryRun") boolean jsonToAvroDryRunEnabled, @JsonProperty("ack") Ack ack, @JsonProperty("trackingEnabled") boolean trackingEnabled, @JsonProperty("migratedFromJsonType") boolean migratedFromJsonType, @JsonProperty("schemaVersionAwareSerializationEnabled") boolean schemaVersionAwareSerializationEnabled, @JsonProperty("contentType") ContentType contentType) { - this(TopicName.fromQualifiedName(qualifiedName), description, retentionTime, validationEnabled, - validationDryRunEnabled, migratedFromJsonType, ack, trackingEnabled, contentType, jsonToAvroDryRunEnabled, - schemaVersionAwareSerializationEnabled); + this(TopicName.fromQualifiedName(qualifiedName), description, retentionTime, migratedFromJsonType, ack, + trackingEnabled, contentType, jsonToAvroDryRunEnabled, schemaVersionAwareSerializationEnabled); } public RetentionTime getRetentionTime() { @@ -84,8 +74,8 @@ public RetentionTime getRetentionTime() { @Override public int hashCode() { - return Objects.hash(name, description, retentionTime, validationEnabled, validationDryRunEnabled, - migratedFromJsonType, trackingEnabled, ack, contentType, jsonToAvroDryRunEnabled, schemaVersionAwareSerializationEnabled); + return Objects.hash(name, description, retentionTime, migratedFromJsonType, trackingEnabled, ack, contentType, + jsonToAvroDryRunEnabled, schemaVersionAwareSerializationEnabled); } @Override @@ -101,8 +91,6 @@ public boolean equals(Object obj) { return Objects.equals(this.name, other.name) && Objects.equals(this.description, other.description) && Objects.equals(this.retentionTime, other.retentionTime) - && Objects.equals(this.isValidationEnabled(), other.isValidationEnabled()) - && Objects.equals(this.validationDryRunEnabled, other.validationDryRunEnabled) && Objects.equals(this.jsonToAvroDryRunEnabled, other.jsonToAvroDryRunEnabled) && Objects.equals(this.trackingEnabled, other.trackingEnabled) && Objects.equals(this.migratedFromJsonType, other.migratedFromJsonType) @@ -133,16 +121,6 @@ public void setRetentionTime(RetentionTime retentionTime) { this.retentionTime = retentionTime; } - @JsonProperty("validation") - public boolean isValidationEnabled() { - return validationEnabled || ContentType.AVRO == contentType; - } - - @JsonProperty("validationDryRun") - public boolean isValidationDryRunEnabled() { - return validationDryRunEnabled; - } - @JsonProperty("jsonToAvroDryRun") public boolean isJsonToAvroDryRunEnabled() { return jsonToAvroDryRunEnabled; diff --git a/hermes-api/src/test/java/pl/allegro/tech/hermes/api/TopicTest.java b/hermes-api/src/test/java/pl/allegro/tech/hermes/api/TopicTest.java index 9821eebdef..de703621f6 100644 --- a/hermes-api/src/test/java/pl/allegro/tech/hermes/api/TopicTest.java +++ b/hermes-api/src/test/java/pl/allegro/tech/hermes/api/TopicTest.java @@ -21,8 +21,6 @@ public void shouldDeserializeTopic() throws Exception { assertThat(topic.getName().getName()).isEqualTo("bar"); assertThat(topic.getName().getGroupName()).isEqualTo("foo"); assertThat(topic.getDescription()).isEqualTo("description"); - assertThat(topic.isValidationEnabled()).isFalse(); - assertThat(topic.isValidationDryRunEnabled()).isFalse(); } @Test diff --git a/hermes-api/src/test/java/pl/allegro/tech/hermes/api/helper/PatchTest.java b/hermes-api/src/test/java/pl/allegro/tech/hermes/api/helper/PatchTest.java index 9682479519..b3ea8c72c7 100644 --- a/hermes-api/src/test/java/pl/allegro/tech/hermes/api/helper/PatchTest.java +++ b/hermes-api/src/test/java/pl/allegro/tech/hermes/api/helper/PatchTest.java @@ -73,13 +73,13 @@ public void shouldPatchNestedObjects() { public void shouldNotResetPrimitiveFields() { // given Topic topic = topic("group.topic").withTrackingEnabled(true).build(); - PatchData patch = patchData().set("validation", true).build(); + PatchData patch = patchData().set("schemaVersionAwareSerializationEnabled", true).build(); // when Topic patched = Patch.apply(topic, patch); // then assertThat(patched.isTrackingEnabled()).isTrue(); - assertThat(patched.isValidationEnabled()).isTrue(); + assertThat(patched.isSchemaVersionAwareSerializationEnabled()).isTrue(); } } diff --git a/hermes-common/build.gradle b/hermes-common/build.gradle index 45d14aa3c5..744171dcd5 100644 --- a/hermes-common/build.gradle +++ b/hermes-common/build.gradle @@ -3,6 +3,7 @@ apply plugin: 'groovy' dependencies { compile project(':hermes-api') compile project(':hermes-metrics') + compile project(':hermes-schema') compile group: 'com.netflix.archaius', name: 'archaius-core', version: '0.6.0' @@ -14,9 +15,6 @@ dependencies { exclude module: 'slf4j-log4j12' exclude module: 'log4j' } - compile (group: 'com.github.fge', name: 'json-schema-validator', version: '2.2.6') { - exclude group: 'net.sf.jopt-simple' - } compile group: 'org.glassfish.hk2', name: 'hk2-locator', version: '2.3.0-b01' compile group: 'org.glassfish.jersey.core', name: 'jersey-client', version: versions.jersey @@ -30,7 +28,6 @@ dependencies { compile group: 'com.google.guava', name: 'guava', version: versions.guava compile group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: versions.jackson - compile group: 'io.fastjson', name: 'boon', version: '0.28' compile group: 'org.apache.avro', name: 'avro', version: '1.7.7' compile group: 'io.dropwizard.metrics', name: 'metrics-graphite', version: '3.1.1' diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/config/Configs.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/config/Configs.java index 7ffe5b6030..384eb1bff3 100644 --- a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/config/Configs.java +++ b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/config/Configs.java @@ -188,8 +188,8 @@ public enum Configs { SCHEMA_CACHE_RELOAD_THREAD_POOL_SIZE("schema.cache.reload.thread.pool.size", 2), SCHEMA_CACHE_ENABLED("schema.cache.enabled", true), SCHEMA_CACHE_COMPILED_MAXIMUM_SIZE("schema.cache.compiled.maximum.size", 2000), - SCHEMA_REPOSITORY_TYPE("schema.repository.type", "zookeeper"), - SCHEMA_REPOSITORY_SERVER_URL("schema.repository.serverUrl", "http://localhost:8888/schema-repo/"), + SCHEMA_REPOSITORY_TYPE("schema.repository.type", "schema_registry"), + SCHEMA_REPOSITORY_SERVER_URL("schema.repository.serverUrl", "http://localhost:8888/"), SCHEMA_REPOSITORY_HTTP_READ_TIMEOUT_MS("schema.repository.http.read.timeout.ms", 2000), SCHEMA_REPOSITORY_HTTP_CONNECT_TIMEOUT_MS("schema.repository.http.connect.timeout.ms", 2000), diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/di/CommonBinder.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/di/CommonBinder.java index 7ce8bf4dbb..79f52bf4dd 100644 --- a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/di/CommonBinder.java +++ b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/di/CommonBinder.java @@ -1,6 +1,5 @@ package pl.allegro.tech.hermes.common.di; -import com.github.fge.jsonschema.main.JsonSchema; import com.yammer.metrics.core.HealthCheckRegistry; import org.apache.avro.Schema; import org.glassfish.hk2.api.TypeLiteral; @@ -8,7 +7,6 @@ import pl.allegro.tech.hermes.common.broker.ZookeeperBrokerStorage; import pl.allegro.tech.hermes.common.clock.ClockFactory; import pl.allegro.tech.hermes.common.config.ConfigFactory; -import pl.allegro.tech.hermes.common.di.factories.BoonObjectMapperFactory; import pl.allegro.tech.hermes.common.di.factories.CuratorClientFactory; import pl.allegro.tech.hermes.common.di.factories.DistributedEphemeralCounterFactory; import pl.allegro.tech.hermes.common.di.factories.GraphiteWebTargetFactory; @@ -34,17 +32,15 @@ import pl.allegro.tech.hermes.common.metric.HermesMetrics; import pl.allegro.tech.hermes.common.metric.counter.CounterStorage; import pl.allegro.tech.hermes.common.metric.counter.zookeeper.ZookeeperCounterStorage; +import pl.allegro.tech.hermes.common.schema.AvroCompiledSchemaRepositoryFactory; +import pl.allegro.tech.hermes.common.schema.SchemaRepositoryFactory; +import pl.allegro.tech.hermes.common.schema.SchemaSourceClientFactory; +import pl.allegro.tech.hermes.common.schema.SchemaVersionsRepositoryFactory; import pl.allegro.tech.hermes.common.util.HostnameResolver; import pl.allegro.tech.hermes.common.util.InetAddressHostnameResolver; import pl.allegro.tech.hermes.domain.notifications.InternalNotificationsBus; -import pl.allegro.tech.hermes.domain.topic.schema.CompiledSchemaRepository; -import pl.allegro.tech.hermes.domain.topic.schema.SchemaRepository; -import pl.allegro.tech.hermes.infrastructure.schema.AvroCompiledSchemaRepositoryFactory; -import pl.allegro.tech.hermes.infrastructure.schema.JsonCompiledSchemaRepositoryFactory; -import pl.allegro.tech.hermes.infrastructure.schema.SchemaSourceProviderFactory; -import pl.allegro.tech.hermes.infrastructure.schema.SchemaVersionsRepositoryFactory; -import pl.allegro.tech.hermes.infrastructure.schema.repo.SchemaRepoClientFactory; import pl.allegro.tech.hermes.infrastructure.zookeeper.notifications.ZookeeperInternalNotificationBus; +import pl.allegro.tech.hermes.schema.CompiledSchemaRepository; import javax.inject.Singleton; import java.time.Clock; @@ -58,12 +54,10 @@ protected void configure() { bindFactory(ClockFactory.class).in(Singleton.class).to(Clock.class); bind(ZookeeperBrokerStorage.class).to(BrokerStorage.class).in(Singleton.class); bind(InetAddressHostnameResolver.class).in(Singleton.class).to(HostnameResolver.class); - bindSingletonFactory(SchemaSourceProviderFactory.class); - bindSingletonFactory(SchemaRepoClientFactory.class); + bindSingletonFactory(SchemaSourceClientFactory.class); bindSingletonFactory(SchemaVersionsRepositoryFactory.class); - bindSingleton(SchemaRepository.class); - bindFactory(JsonCompiledSchemaRepositoryFactory.class).in(Singleton.class).to(new TypeLiteral>() {}); bindFactory(AvroCompiledSchemaRepositoryFactory.class).in(Singleton.class).to(new TypeLiteral>() {}); + bindSingletonFactory(SchemaRepositoryFactory.class); bindSingleton(CuratorClientFactory.class); bindSingleton(HermesMetrics.class); @@ -77,7 +71,6 @@ protected void configure() { bindSingletonFactory(KafkaCuratorClientFactory.class).named(CuratorType.KAFKA); bindSingletonFactory(GraphiteWebTargetFactory.class); bindSingletonFactory(ObjectMapperFactory.class); - bindSingletonFactory(BoonObjectMapperFactory.class); bindSingletonFactory(SharedCounterFactory.class); bindSingletonFactory(DistributedEphemeralCounterFactory.class); bindSingletonFactory(MetricRegistryFactory.class); diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/di/factories/BoonObjectMapperFactory.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/di/factories/BoonObjectMapperFactory.java deleted file mode 100644 index 97caccf1e6..0000000000 --- a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/di/factories/BoonObjectMapperFactory.java +++ /dev/null @@ -1,18 +0,0 @@ -package pl.allegro.tech.hermes.common.di.factories; - -import org.boon.json.JsonFactory; -import org.boon.json.ObjectMapper; -import org.glassfish.hk2.api.Factory; - -public class BoonObjectMapperFactory implements Factory { - - @Override - public ObjectMapper provide() { - return JsonFactory.create(); - } - - @Override - public void dispose(ObjectMapper instance) { - - } -} diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/exception/InvalidSchemaException.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/exception/InvalidSchemaException.java deleted file mode 100644 index e8057a1858..0000000000 --- a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/exception/InvalidSchemaException.java +++ /dev/null @@ -1,26 +0,0 @@ -package pl.allegro.tech.hermes.common.exception; - -import com.google.common.base.Joiner; -import pl.allegro.tech.hermes.api.ErrorCode; - -import java.util.List; - -public class InvalidSchemaException extends HermesException { - - public InvalidSchemaException(Throwable cause) { - super("Error while trying to validate schema", cause); - } - - public InvalidSchemaException(List reasons) { - super(String.format("Tried to set invalid topic schema: %s", Joiner.on(";").join(reasons))); - } - - public InvalidSchemaException(String message) { - super(message); - } - - @Override - public ErrorCode getCode() { - return ErrorCode.FORMAT_ERROR; - } -} diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/message/AvroSchemaSource.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/message/AvroSchemaSource.java deleted file mode 100644 index 8985a0ac9b..0000000000 --- a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/message/AvroSchemaSource.java +++ /dev/null @@ -1,47 +0,0 @@ -package pl.allegro.tech.hermes.common.message; - -import org.apache.avro.Schema; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import pl.allegro.tech.hermes.api.Topic; -import pl.allegro.tech.hermes.domain.topic.schema.CompiledSchema; -import pl.allegro.tech.hermes.domain.topic.schema.SchemaMissingException; -import pl.allegro.tech.hermes.domain.topic.schema.SchemaVersion; - -import java.util.List; -import java.util.function.Function; - -public interface AvroSchemaSource { - CompiledSchema getAvroSchema(Topic topic, SchemaVersion version); - List versions(Topic topic, boolean online); - - - Logger logger = LoggerFactory.getLogger(AvroSchemaSource.class); - - default List versions(Topic topic) { - return versions(topic, false); - } - - default T invokeWithFallback(Topic topic, Function, T> processor, List availableVersions) { - for (SchemaVersion version : availableVersions) { - try { - return processor.apply(getAvroSchema(topic, version)); - } catch (Exception ex) { - logger.debug("Failed to match schema for message for topic {}, schema version {}, fallback to previous.", topic.getQualifiedName(), version.value()); - } - } - logger.error("Could not match schema from cache for message for topic {} {}", topic.getQualifiedName(), SchemaVersion.toString(availableVersions)); - throw new SchemaMissingException(topic); - } - - // try-harding to find proper schema - default T tryHard(Topic topic, Function, T> processor) { - try { - return invokeWithFallback(topic, processor, versions(topic)); - } catch (Exception ex) { - logger.info("Trying to find schema online for message for topic {}", topic.getQualifiedName()); - return invokeWithFallback(topic, processor, versions(topic, true)); - } - } - -} diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/message/serialization/DeserializationException.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/message/serialization/DeserializationException.java deleted file mode 100644 index b5aedb68cf..0000000000 --- a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/message/serialization/DeserializationException.java +++ /dev/null @@ -1,13 +0,0 @@ -package pl.allegro.tech.hermes.common.message.serialization; - -import pl.allegro.tech.hermes.common.exception.InternalProcessingException; - -public class DeserializationException extends InternalProcessingException { - public DeserializationException(String message, Exception cause) { - super(message, cause); - } - - public DeserializationException(String message) { - super(message); - } -} diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/message/wrapper/AvroMessageContentWrapper.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/message/wrapper/AvroMessageContentWrapper.java index ac37c48a05..4bb2f41f99 100644 --- a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/message/wrapper/AvroMessageContentWrapper.java +++ b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/message/wrapper/AvroMessageContentWrapper.java @@ -5,7 +5,7 @@ import org.apache.avro.generic.GenericRecord; import org.apache.avro.util.Utf8; import pl.allegro.tech.hermes.common.util.MessageId; -import pl.allegro.tech.hermes.domain.topic.schema.CompiledSchema; +import pl.allegro.tech.hermes.schema.CompiledSchema; import javax.inject.Inject; import java.time.Clock; diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/message/wrapper/DeserializationException.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/message/wrapper/DeserializationException.java new file mode 100644 index 0000000000..9cc62105f8 --- /dev/null +++ b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/message/wrapper/DeserializationException.java @@ -0,0 +1,8 @@ +package pl.allegro.tech.hermes.common.message.wrapper; + +public class DeserializationException extends RuntimeException { + + DeserializationException(String message) { + super(message); + } +} diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/message/wrapper/MessageContentWrapper.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/message/wrapper/MessageContentWrapper.java index e608cc3440..e227854f50 100644 --- a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/message/wrapper/MessageContentWrapper.java +++ b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/message/wrapper/MessageContentWrapper.java @@ -1,26 +1,31 @@ package pl.allegro.tech.hermes.common.message.wrapper; import org.apache.avro.Schema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import pl.allegro.tech.hermes.api.Topic; -import pl.allegro.tech.hermes.common.message.AvroSchemaSource; -import pl.allegro.tech.hermes.common.message.serialization.SchemaAwarePayload; -import pl.allegro.tech.hermes.common.message.serialization.SchemaAwareSerDe; -import pl.allegro.tech.hermes.domain.topic.schema.CompiledSchema; +import pl.allegro.tech.hermes.schema.CompiledSchema; +import pl.allegro.tech.hermes.schema.SchemaRepository; +import pl.allegro.tech.hermes.schema.SchemaVersion; import javax.inject.Inject; +import java.util.List; import java.util.Map; -import static java.util.Optional.of; - public class MessageContentWrapper { + + private static final Logger logger = LoggerFactory.getLogger(MessageContentWrapper.class); + private final JsonMessageContentWrapper jsonMessageContentWrapper; private final AvroMessageContentWrapper avroMessageContentWrapper; + private final SchemaRepository schemaRepository; @Inject public MessageContentWrapper(JsonMessageContentWrapper jsonMessageContentWrapper, - AvroMessageContentWrapper avroMessageContentWrapper) { + AvroMessageContentWrapper avroMessageContentWrapper, SchemaRepository schemaRepository) { this.jsonMessageContentWrapper = jsonMessageContentWrapper; this.avroMessageContentWrapper = avroMessageContentWrapper; + this.schemaRepository = schemaRepository; } public UnwrappedMessageContent unwrapJson(byte[] data) { @@ -28,15 +33,42 @@ public UnwrappedMessageContent unwrapJson(byte[] data) { } public UnwrappedMessageContent unwrapAvro(byte[] data, - Topic topic, - AvroSchemaSource schemaSource) { - return topic.isSchemaVersionAwareSerializationEnabled() ? - deserialize(data, topic, schemaSource) : schemaSource.tryHard(topic, schema -> avroMessageContentWrapper.unwrapContent(data, schema)); + Topic topic) { + return topic.isSchemaVersionAwareSerializationEnabled() ? deserializeSchemaVersionAwarePayload(data, topic) : + tryDeserializingUsingAnySchemaVersion(data, topic); } - private UnwrappedMessageContent deserialize(byte[] data, Topic topic, AvroSchemaSource schemaSource) { + private UnwrappedMessageContent deserializeSchemaVersionAwarePayload(byte[] data, Topic topic) { SchemaAwarePayload payload = SchemaAwareSerDe.deserialize(data); - return avroMessageContentWrapper.unwrapContent(payload.getPayload(), schemaSource.getAvroSchema(topic, payload.getSchemaVersion())); + return avroMessageContentWrapper.unwrapContent(payload.getPayload(), + schemaRepository.getAvroSchema(topic, payload.getSchemaVersion())); + } + + // try-harding to find proper schema + private UnwrappedMessageContent tryDeserializingUsingAnySchemaVersion(byte[] data, Topic topic) { + try { + return tryDeserializingUsingAnySchemaVersion(data, topic, false); + } catch (Exception ex) { + logger.info("Trying to find schema online for message for topic {}", topic.getQualifiedName()); + return tryDeserializingUsingAnySchemaVersion(data, topic, true); + } + } + + private UnwrappedMessageContent tryDeserializingUsingAnySchemaVersion(byte[] data, Topic topic, boolean online) { + List versions = schemaRepository.getVersions(topic, online); + for (SchemaVersion version : versions) { + try { + CompiledSchema schema = online ? schemaRepository.getKnownAvroSchemaVersion(topic, version) : + schemaRepository.getAvroSchema(topic, version); + return avroMessageContentWrapper.unwrapContent(data, schema); + } catch (Exception ex) { + logger.debug("Failed to match schema for message for topic {}, schema version {}, fallback to previous.", + topic.getQualifiedName(), version.value()); + } + } + logger.error("Could not match schema from cache for message for topic {} {}", + topic.getQualifiedName(), SchemaVersion.toString(versions)); + throw new SchemaMissingException(topic); } public byte[] wrapAvro(byte[] data, String id, long timestamp, Topic topic, CompiledSchema schema, Map externalMetadata) { diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/message/serialization/SchemaAwarePayload.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/message/wrapper/SchemaAwarePayload.java similarity index 77% rename from hermes-common/src/main/java/pl/allegro/tech/hermes/common/message/serialization/SchemaAwarePayload.java rename to hermes-common/src/main/java/pl/allegro/tech/hermes/common/message/wrapper/SchemaAwarePayload.java index 47c8eb7cea..a7f9d2b74a 100644 --- a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/message/serialization/SchemaAwarePayload.java +++ b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/message/wrapper/SchemaAwarePayload.java @@ -1,6 +1,6 @@ -package pl.allegro.tech.hermes.common.message.serialization; +package pl.allegro.tech.hermes.common.message.wrapper; -import pl.allegro.tech.hermes.domain.topic.schema.SchemaVersion; +import pl.allegro.tech.hermes.schema.SchemaVersion; public class SchemaAwarePayload { private final byte[] payload; diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/message/serialization/SchemaAwareSerDe.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/message/wrapper/SchemaAwareSerDe.java similarity index 80% rename from hermes-common/src/main/java/pl/allegro/tech/hermes/common/message/serialization/SchemaAwareSerDe.java rename to hermes-common/src/main/java/pl/allegro/tech/hermes/common/message/wrapper/SchemaAwareSerDe.java index 0ee542cee1..3d9d0e3bcf 100644 --- a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/message/serialization/SchemaAwareSerDe.java +++ b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/message/wrapper/SchemaAwareSerDe.java @@ -1,11 +1,10 @@ -package pl.allegro.tech.hermes.common.message.serialization; +package pl.allegro.tech.hermes.common.message.wrapper; -import pl.allegro.tech.hermes.domain.topic.schema.SchemaVersion; +import pl.allegro.tech.hermes.schema.SchemaVersion; import java.nio.ByteBuffer; import static java.lang.String.format; -import static pl.allegro.tech.hermes.domain.topic.schema.SchemaVersion.valueOf; public class SchemaAwareSerDe { private static final byte MAGIC_BYTE = 0; @@ -28,7 +27,7 @@ public static SchemaAwarePayload deserialize(byte[] payloadWithHeader) { int schemaVersion = buffer.getInt(); byte[] payload = new byte[payloadWithHeader.length - HEADER_SIZE]; buffer.get(payload); - return new SchemaAwarePayload(payload, valueOf(schemaVersion)); + return new SchemaAwarePayload(payload, SchemaVersion.valueOf(schemaVersion)); } private static void assertMagicByte(byte magicByte) { diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/domain/topic/schema/SchemaMissingException.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/message/wrapper/SchemaMissingException.java similarity index 80% rename from hermes-common/src/main/java/pl/allegro/tech/hermes/domain/topic/schema/SchemaMissingException.java rename to hermes-common/src/main/java/pl/allegro/tech/hermes/common/message/wrapper/SchemaMissingException.java index 86fe14554e..6df5563fe1 100644 --- a/hermes-common/src/main/java/pl/allegro/tech/hermes/domain/topic/schema/SchemaMissingException.java +++ b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/message/wrapper/SchemaMissingException.java @@ -1,4 +1,4 @@ -package pl.allegro.tech.hermes.domain.topic.schema; +package pl.allegro.tech.hermes.common.message.wrapper; import pl.allegro.tech.hermes.api.ErrorCode; import pl.allegro.tech.hermes.api.Topic; @@ -6,7 +6,7 @@ public class SchemaMissingException extends HermesException { - public SchemaMissingException(Topic topic) { + SchemaMissingException(Topic topic) { super("Schema for topic " + topic.getQualifiedName() + " was not available"); } diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/message/wrapper/UnwrappedMessageContent.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/message/wrapper/UnwrappedMessageContent.java index a6f2926164..f531186a68 100644 --- a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/message/wrapper/UnwrappedMessageContent.java +++ b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/message/wrapper/UnwrappedMessageContent.java @@ -1,7 +1,7 @@ package pl.allegro.tech.hermes.common.message.wrapper; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; -import pl.allegro.tech.hermes.domain.topic.schema.CompiledSchema; +import pl.allegro.tech.hermes.schema.CompiledSchema; import java.util.Optional; diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/schema/AvroCompiledSchemaRepositoryFactory.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/schema/AvroCompiledSchemaRepositoryFactory.java similarity index 58% rename from hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/schema/AvroCompiledSchemaRepositoryFactory.java rename to hermes-common/src/main/java/pl/allegro/tech/hermes/common/schema/AvroCompiledSchemaRepositoryFactory.java index 09ecb65284..d876121a9b 100644 --- a/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/schema/AvroCompiledSchemaRepositoryFactory.java +++ b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/schema/AvroCompiledSchemaRepositoryFactory.java @@ -1,28 +1,32 @@ -package pl.allegro.tech.hermes.infrastructure.schema; +package pl.allegro.tech.hermes.common.schema; import org.apache.avro.Schema; import org.glassfish.hk2.api.Factory; import pl.allegro.tech.hermes.common.config.ConfigFactory; import pl.allegro.tech.hermes.common.config.Configs; -import pl.allegro.tech.hermes.domain.topic.schema.*; +import pl.allegro.tech.hermes.schema.CachedCompiledSchemaRepository; +import pl.allegro.tech.hermes.schema.CompiledSchemaRepository; +import pl.allegro.tech.hermes.schema.DirectCompiledSchemaRepository; +import pl.allegro.tech.hermes.schema.SchemaCompilersFactory; +import pl.allegro.tech.hermes.schema.SchemaSourceClient; import javax.inject.Inject; public class AvroCompiledSchemaRepositoryFactory implements Factory> { - private final SchemaSourceProvider schemaSourceProvider; + private final SchemaSourceClient schemaSourceClient; private final ConfigFactory configFactory; @Inject - public AvroCompiledSchemaRepositoryFactory(SchemaSourceProvider schemaSourceProvider, ConfigFactory configFactory) { - this.schemaSourceProvider = schemaSourceProvider; + public AvroCompiledSchemaRepositoryFactory(SchemaSourceClient schemaSourceClient, ConfigFactory configFactory) { + this.schemaSourceClient = schemaSourceClient; this.configFactory = configFactory; } @Override public CompiledSchemaRepository provide() { return new CachedCompiledSchemaRepository<>( - new DirectCompiledSchemaRepository<>(schemaSourceProvider, SchemaCompilersFactory.avroSchemaCompiler()), + new DirectCompiledSchemaRepository<>(schemaSourceClient, SchemaCompilersFactory.avroSchemaCompiler()), configFactory.getIntProperty(Configs.SCHEMA_CACHE_COMPILED_MAXIMUM_SIZE), configFactory.getIntProperty(Configs.SCHEMA_CACHE_COMPILED_EXPIRE_AFTER_ACCESS_MINUTES)); } diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/schema/SchemaRepositoryFactory.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/schema/SchemaRepositoryFactory.java new file mode 100644 index 0000000000..f7139ebb35 --- /dev/null +++ b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/schema/SchemaRepositoryFactory.java @@ -0,0 +1,33 @@ +package pl.allegro.tech.hermes.common.schema; + +import org.apache.avro.Schema; +import org.glassfish.hk2.api.Factory; +import pl.allegro.tech.hermes.schema.CompiledSchemaRepository; +import pl.allegro.tech.hermes.schema.SchemaRepository; +import pl.allegro.tech.hermes.schema.SchemaVersionsRepository; + +import javax.inject.Inject; + +public class SchemaRepositoryFactory implements Factory { + + private final SchemaVersionsRepository schemaVersionsRepository; + + private final CompiledSchemaRepository compiledAvroSchemaRepository; + + @Inject + public SchemaRepositoryFactory(SchemaVersionsRepository schemaVersionsRepository, + CompiledSchemaRepository compiledAvroSchemaRepository) { + this.schemaVersionsRepository = schemaVersionsRepository; + this.compiledAvroSchemaRepository = compiledAvroSchemaRepository; + } + + @Override + public SchemaRepository provide() { + return new SchemaRepository(schemaVersionsRepository, compiledAvroSchemaRepository); + } + + @Override + public void dispose(SchemaRepository instance) { + + } +} diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/schema/SchemaRepositoryType.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/schema/SchemaRepositoryType.java new file mode 100644 index 0000000000..3cb7a80d5d --- /dev/null +++ b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/schema/SchemaRepositoryType.java @@ -0,0 +1,5 @@ +package pl.allegro.tech.hermes.common.schema; + +public enum SchemaRepositoryType { + SCHEMA_REPO, SCHEMA_REGISTRY +} diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/schema/SchemaSourceClientFactory.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/schema/SchemaSourceClientFactory.java new file mode 100644 index 0000000000..04f1e40c0c --- /dev/null +++ b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/schema/SchemaSourceClientFactory.java @@ -0,0 +1,53 @@ +package pl.allegro.tech.hermes.common.schema; + +import org.glassfish.hk2.api.Factory; +import org.glassfish.jersey.client.ClientConfig; +import org.glassfish.jersey.client.ClientProperties; +import pl.allegro.tech.hermes.common.config.ConfigFactory; +import pl.allegro.tech.hermes.common.config.Configs; +import pl.allegro.tech.hermes.schema.confluent.SchemaRegistrySchemaSourceClient; +import pl.allegro.tech.hermes.schema.schemarepo.SchemaRepoSchemaSourceClient; +import pl.allegro.tech.hermes.schema.SchemaSourceClient; + +import javax.inject.Inject; +import javax.ws.rs.client.Client; +import javax.ws.rs.client.ClientBuilder; +import java.net.URI; + +public class SchemaSourceClientFactory implements Factory { + + private final ConfigFactory configFactory; + + @Inject + public SchemaSourceClientFactory(ConfigFactory configFactory) { + this.configFactory = configFactory; + } + + @Override + public SchemaSourceClient provide() { + + int httpReadTimeoutMs = configFactory.getIntProperty(Configs.SCHEMA_REPOSITORY_HTTP_READ_TIMEOUT_MS); + int httpConnectTimeoutMs = configFactory.getIntProperty(Configs.SCHEMA_REPOSITORY_HTTP_CONNECT_TIMEOUT_MS); + + ClientConfig config = new ClientConfig() + .property(ClientProperties.READ_TIMEOUT, httpReadTimeoutMs) + .property(ClientProperties.CONNECT_TIMEOUT, httpConnectTimeoutMs); + + String schemaRepositoryType = configFactory.getStringProperty(Configs.SCHEMA_REPOSITORY_TYPE).toUpperCase(); + Client client = ClientBuilder.newClient(config); + URI schemaRepositoryServerUri = URI.create(configFactory.getStringProperty(Configs.SCHEMA_REPOSITORY_SERVER_URL)); + switch (SchemaRepositoryType.valueOf(schemaRepositoryType)) { + case SCHEMA_REPO: + return new SchemaRepoSchemaSourceClient(client, schemaRepositoryServerUri); + case SCHEMA_REGISTRY: + return new SchemaRegistrySchemaSourceClient(client, schemaRepositoryServerUri); + default: + throw new IllegalStateException("Unknown schema repository type " + schemaRepositoryType); + } + } + + @Override + public void dispose(SchemaSourceClient instance) { + + } +} diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/schema/SchemaVersionsRepositoryFactory.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/schema/SchemaVersionsRepositoryFactory.java new file mode 100644 index 0000000000..0c2a6b5875 --- /dev/null +++ b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/schema/SchemaVersionsRepositoryFactory.java @@ -0,0 +1,48 @@ +package pl.allegro.tech.hermes.common.schema; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.glassfish.hk2.api.Factory; +import pl.allegro.tech.hermes.common.config.ConfigFactory; +import pl.allegro.tech.hermes.common.config.Configs; +import pl.allegro.tech.hermes.schema.CachedSchemaVersionsRepository; +import pl.allegro.tech.hermes.schema.SchemaSourceClient; +import pl.allegro.tech.hermes.schema.SchemaVersionsRepository; +import pl.allegro.tech.hermes.schema.DirectSchemaVersionsRepository; + +import javax.inject.Inject; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +public class SchemaVersionsRepositoryFactory implements Factory { + + private final SchemaSourceClient schemaSourceClient; + private final ConfigFactory configFactory; + + @Inject + public SchemaVersionsRepositoryFactory(SchemaSourceClient schemaSourceClient, ConfigFactory configFactory) { + this.schemaSourceClient = schemaSourceClient; + this.configFactory = configFactory; + } + + @Override + public SchemaVersionsRepository provide() { + if (configFactory.getBooleanProperty(Configs.SCHEMA_CACHE_ENABLED)) { + return new CachedSchemaVersionsRepository(schemaSourceClient, + getVersionsReloader(), + configFactory.getIntProperty(Configs.SCHEMA_CACHE_REFRESH_AFTER_WRITE_MINUTES), + configFactory.getIntProperty(Configs.SCHEMA_CACHE_EXPIRE_AFTER_WRITE_MINUTES)); + } + return new DirectSchemaVersionsRepository(schemaSourceClient); + } + + private ExecutorService getVersionsReloader() { + return Executors.newFixedThreadPool( + configFactory.getIntProperty(Configs.SCHEMA_CACHE_RELOAD_THREAD_POOL_SIZE), + new ThreadFactoryBuilder().setNameFormat("schema-source-reloader-%d").build()); + } + + @Override + public void dispose(SchemaVersionsRepository instance) { + + } +} diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/domain/topic/schema/CouldNotCompileSchemaException.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/domain/topic/schema/CouldNotCompileSchemaException.java deleted file mode 100644 index 8de6814328..0000000000 --- a/hermes-common/src/main/java/pl/allegro/tech/hermes/domain/topic/schema/CouldNotCompileSchemaException.java +++ /dev/null @@ -1,17 +0,0 @@ -package pl.allegro.tech.hermes.domain.topic.schema; - -import pl.allegro.tech.hermes.api.ErrorCode; -import pl.allegro.tech.hermes.common.exception.HermesException; - -public class CouldNotCompileSchemaException extends HermesException { - - public CouldNotCompileSchemaException(Throwable cause) { - super(cause); - } - - @Override - public ErrorCode getCode() { - return ErrorCode.OTHER; - } - -} diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/domain/topic/schema/CouldNotLoadSchemaException.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/domain/topic/schema/CouldNotLoadSchemaException.java deleted file mode 100644 index 5739e5a493..0000000000 --- a/hermes-common/src/main/java/pl/allegro/tech/hermes/domain/topic/schema/CouldNotLoadSchemaException.java +++ /dev/null @@ -1,20 +0,0 @@ -package pl.allegro.tech.hermes.domain.topic.schema; - -import pl.allegro.tech.hermes.api.ErrorCode; -import pl.allegro.tech.hermes.common.exception.HermesException; - -public class CouldNotLoadSchemaException extends HermesException { - - public CouldNotLoadSchemaException(Throwable cause) { - super(cause); - } - - public CouldNotLoadSchemaException(String message, Throwable cause) { - super(message, cause); - } - - @Override - public ErrorCode getCode() { - return ErrorCode.SCHEMA_COULD_NOT_BE_LOADED; - } -} diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/domain/topic/schema/SchemaCompilersFactory.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/domain/topic/schema/SchemaCompilersFactory.java deleted file mode 100644 index 1be5ccec0e..0000000000 --- a/hermes-common/src/main/java/pl/allegro/tech/hermes/domain/topic/schema/SchemaCompilersFactory.java +++ /dev/null @@ -1,28 +0,0 @@ -package pl.allegro.tech.hermes.domain.topic.schema; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.github.fge.jsonschema.core.exceptions.ProcessingException; -import com.github.fge.jsonschema.main.JsonSchema; -import com.github.fge.jsonschema.main.JsonSchemaFactory; -import org.apache.avro.Schema; - -import java.io.IOException; - -public interface SchemaCompilersFactory { - - static SchemaCompiler avroSchemaCompiler() { - return source -> new Schema.Parser().parse(source.value()); - } - - static SchemaCompiler jsonSchemaCompiler(ObjectMapper objectMapper) { - JsonSchemaFactory jsonSchemaFactory = JsonSchemaFactory.byDefault(); - return source -> { - try { - return jsonSchemaFactory.getJsonSchema(objectMapper.readTree(source.value())); - } catch (IOException | ProcessingException e) { - throw new CouldNotCompileSchemaException(e); - } - }; - } - -} diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/domain/topic/schema/SchemaRepoSchemaSourceProvider.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/domain/topic/schema/SchemaRepoSchemaSourceProvider.java deleted file mode 100644 index a884c8d5c3..0000000000 --- a/hermes-common/src/main/java/pl/allegro/tech/hermes/domain/topic/schema/SchemaRepoSchemaSourceProvider.java +++ /dev/null @@ -1,34 +0,0 @@ -package pl.allegro.tech.hermes.domain.topic.schema; - -import pl.allegro.tech.hermes.api.SchemaSource; -import pl.allegro.tech.hermes.api.Topic; -import pl.allegro.tech.hermes.infrastructure.schema.repo.SchemaRepoClient; - -import javax.inject.Inject; -import java.util.List; -import java.util.Optional; - -public class SchemaRepoSchemaSourceProvider implements SchemaSourceProvider { - - protected final SchemaRepoClient schemaRepoClient; - - @Inject - public SchemaRepoSchemaSourceProvider(SchemaRepoClient schemaRepoClient) { - this.schemaRepoClient = schemaRepoClient; - } - - @Override - public Optional get(Topic topic) { - return schemaRepoClient.getLatestSchema(topic.getQualifiedName()).map(SchemaSource::valueOf); - } - - @Override - public Optional get(Topic topic, SchemaVersion version) { - return schemaRepoClient.getSchema(topic.getQualifiedName(), version).map(SchemaSource::valueOf); - } - - @Override - public List versions(Topic topic) { - return schemaRepoClient.getSchemaVersions(topic.getQualifiedName()); - } -} diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/domain/topic/schema/SchemaRepository.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/domain/topic/schema/SchemaRepository.java deleted file mode 100644 index 31a9cfaa7b..0000000000 --- a/hermes-common/src/main/java/pl/allegro/tech/hermes/domain/topic/schema/SchemaRepository.java +++ /dev/null @@ -1,59 +0,0 @@ -package pl.allegro.tech.hermes.domain.topic.schema; - -import com.github.fge.jsonschema.main.JsonSchema; -import org.apache.avro.Schema; -import pl.allegro.tech.hermes.api.Topic; -import pl.allegro.tech.hermes.common.message.AvroSchemaSource; - -import javax.inject.Inject; -import java.util.List; -import java.util.Optional; - -public class SchemaRepository implements AvroSchemaSource { - - private final SchemaVersionsRepository schemaVersionsRepository; - private final CompiledSchemaRepository avroSchemaRepository; - private final CompiledSchemaRepository jsonSchemaRepository; - - @Inject - public SchemaRepository(SchemaVersionsRepository schemaVersionsRepository, - CompiledSchemaRepository avroSchemaRepository, - CompiledSchemaRepository jsonSchemaRepository) { - this.schemaVersionsRepository = schemaVersionsRepository; - this.avroSchemaRepository = avroSchemaRepository; - this.jsonSchemaRepository = jsonSchemaRepository; - } - - public CompiledSchema getAvroSchema(Topic topic) { - return getSchema(topic, avroSchemaRepository); - } - - public CompiledSchema getAvroSchema(Topic topic, SchemaVersion version) { - return getSchema(topic, version, avroSchemaRepository); - } - - public CompiledSchema getJsonSchema(Topic topic) { - return getSchema(topic, jsonSchemaRepository); - } - - public CompiledSchema getJsonSchema(Topic topic, SchemaVersion version) { - return getSchema(topic, version, jsonSchemaRepository); - } - - private CompiledSchema getSchema(Topic topic, CompiledSchemaRepository compiledSchemaRepository) { - SchemaVersion latestVersion = schemaVersionsRepository.latestSchemaVersion(topic).orElseThrow(() -> new SchemaMissingException(topic)); - return getSchema(topic, latestVersion, compiledSchemaRepository); - } - - private CompiledSchema getSchema(Topic topic, SchemaVersion version, CompiledSchemaRepository compiledSchemaRepository) { - if (!schemaVersionsRepository.schemaVersionExists(topic, version)) { - throw new UnknownSchemaVersionException(topic, version); - } - return compiledSchemaRepository.getSchema(topic, version); - } - - @Override - public List versions(Topic topic, boolean online) { - return schemaVersionsRepository.versions(topic, online); - } -} diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/domain/topic/schema/SchemaRepositoryType.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/domain/topic/schema/SchemaRepositoryType.java deleted file mode 100644 index 5c3c56bed5..0000000000 --- a/hermes-common/src/main/java/pl/allegro/tech/hermes/domain/topic/schema/SchemaRepositoryType.java +++ /dev/null @@ -1,5 +0,0 @@ -package pl.allegro.tech.hermes.domain.topic.schema; - -public enum SchemaRepositoryType { - SCHEMA_REPO, ZOOKEEPER -} diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/domain/topic/schema/SchemaSourceProvider.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/domain/topic/schema/SchemaSourceProvider.java deleted file mode 100644 index 863faef22d..0000000000 --- a/hermes-common/src/main/java/pl/allegro/tech/hermes/domain/topic/schema/SchemaSourceProvider.java +++ /dev/null @@ -1,23 +0,0 @@ -package pl.allegro.tech.hermes.domain.topic.schema; - -import pl.allegro.tech.hermes.api.SchemaSource; -import pl.allegro.tech.hermes.api.Topic; - -import java.util.List; -import java.util.Optional; - -public interface SchemaSourceProvider { - - Optional get(Topic topic); - - default Optional get(Topic topic, SchemaVersion version) { - throw new UnsupportedOperationException("schema versioning not supported"); - } - - /** - * @return a sorted list of versions in descending order. - */ - default List versions(Topic topic) { - throw new UnsupportedOperationException("schema versioning not supported"); - } -} diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/domain/topic/schema/TopicWithSchema.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/domain/topic/schema/TopicWithSchema.java deleted file mode 100644 index 30f9bb0a5c..0000000000 --- a/hermes-common/src/main/java/pl/allegro/tech/hermes/domain/topic/schema/TopicWithSchema.java +++ /dev/null @@ -1,22 +0,0 @@ -package pl.allegro.tech.hermes.domain.topic.schema; - -import pl.allegro.tech.hermes.api.Topic; - -public class TopicWithSchema { - - private final Topic topic; - private final T schema; - - TopicWithSchema(Topic topic, T schema) { - this.topic = topic; - this.schema = schema; - } - - public Topic getTopic() { - return topic; - } - - public T getSchema() { - return schema; - } -} diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/domain/topic/schema/UnknownSchemaVersionException.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/domain/topic/schema/UnknownSchemaVersionException.java deleted file mode 100644 index a708d00d56..0000000000 --- a/hermes-common/src/main/java/pl/allegro/tech/hermes/domain/topic/schema/UnknownSchemaVersionException.java +++ /dev/null @@ -1,17 +0,0 @@ -package pl.allegro.tech.hermes.domain.topic.schema; - -import pl.allegro.tech.hermes.api.ErrorCode; -import pl.allegro.tech.hermes.api.Topic; -import pl.allegro.tech.hermes.common.exception.HermesException; - -public class UnknownSchemaVersionException extends HermesException { - - public UnknownSchemaVersionException(Topic topic, SchemaVersion version) { - super("Unknown schema version " + version.value() + " for topic " + topic.getQualifiedName()); - } - - @Override - public ErrorCode getCode() { - return ErrorCode.SCHEMA_COULD_NOT_BE_LOADED; - } -} diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/domain/topic/schema/ZookeeperSchemaSourceProvider.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/domain/topic/schema/ZookeeperSchemaSourceProvider.java deleted file mode 100644 index 338f92cdfe..0000000000 --- a/hermes-common/src/main/java/pl/allegro/tech/hermes/domain/topic/schema/ZookeeperSchemaSourceProvider.java +++ /dev/null @@ -1,37 +0,0 @@ -package pl.allegro.tech.hermes.domain.topic.schema; - -import org.apache.curator.framework.CuratorFramework; -import org.apache.zookeeper.KeeperException; -import pl.allegro.tech.hermes.api.SchemaSource; -import pl.allegro.tech.hermes.api.Topic; -import pl.allegro.tech.hermes.common.exception.InternalProcessingException; -import pl.allegro.tech.hermes.infrastructure.zookeeper.ZookeeperPaths; - -import java.nio.charset.Charset; -import java.util.Optional; - -public class ZookeeperSchemaSourceProvider implements SchemaSourceProvider { - - protected static final String SCHEMA_SUFFIX = "schema"; - - protected final CuratorFramework curatorFramework; - protected final ZookeeperPaths zkPaths; - - public ZookeeperSchemaSourceProvider(CuratorFramework curatorFramework, ZookeeperPaths zkPaths) { - this.curatorFramework = curatorFramework; - this.zkPaths = zkPaths; - } - - @Override - public Optional get(Topic topic) { - try { - byte[] schemaBytes = curatorFramework.getData().forPath(zkPaths.topicPath(topic.getName(), SCHEMA_SUFFIX)); - return Optional.of(SchemaSource.valueOf(new String(schemaBytes, Charset.defaultCharset()))); - } catch (KeeperException.NoNodeException ex) { - return Optional.empty(); - } catch (Exception ex) { - throw new InternalProcessingException("Could not load from zookeeper schema for topic " + topic.getQualifiedName(), ex); - } - } - -} diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/schema/JsonCompiledSchemaRepositoryFactory.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/schema/JsonCompiledSchemaRepositoryFactory.java deleted file mode 100644 index 13f1b1446b..0000000000 --- a/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/schema/JsonCompiledSchemaRepositoryFactory.java +++ /dev/null @@ -1,37 +0,0 @@ -package pl.allegro.tech.hermes.infrastructure.schema; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.github.fge.jsonschema.main.JsonSchema; -import org.glassfish.hk2.api.Factory; -import pl.allegro.tech.hermes.common.config.ConfigFactory; -import pl.allegro.tech.hermes.common.config.Configs; -import pl.allegro.tech.hermes.domain.topic.schema.*; - -import javax.inject.Inject; - -public class JsonCompiledSchemaRepositoryFactory implements Factory> { - - private final SchemaSourceProvider schemaSourceProvider; - private final ConfigFactory configFactory; - private final ObjectMapper objectMapper; - - @Inject - public JsonCompiledSchemaRepositoryFactory(ObjectMapper objectMapper, SchemaSourceProvider schemaSourceProvider, ConfigFactory configFactory) { - this.objectMapper = objectMapper; - this.schemaSourceProvider = schemaSourceProvider; - this.configFactory = configFactory; - } - - @Override - public CompiledSchemaRepository provide() { - return new CachedCompiledSchemaRepository<>( - new DirectCompiledSchemaRepository<>(schemaSourceProvider, SchemaCompilersFactory.jsonSchemaCompiler(objectMapper)), - configFactory.getIntProperty(Configs.SCHEMA_CACHE_COMPILED_MAXIMUM_SIZE), - configFactory.getIntProperty(Configs.SCHEMA_CACHE_COMPILED_EXPIRE_AFTER_ACCESS_MINUTES)); - } - - @Override - public void dispose(CompiledSchemaRepository instance) { - - } -} diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/schema/SchemaSourceProviderFactory.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/schema/SchemaSourceProviderFactory.java deleted file mode 100644 index 2dd414d765..0000000000 --- a/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/schema/SchemaSourceProviderFactory.java +++ /dev/null @@ -1,53 +0,0 @@ -package pl.allegro.tech.hermes.infrastructure.schema; - -import org.apache.curator.framework.CuratorFramework; -import org.glassfish.hk2.api.Factory; -import pl.allegro.tech.hermes.common.config.ConfigFactory; -import pl.allegro.tech.hermes.common.config.Configs; -import pl.allegro.tech.hermes.common.di.CuratorType; -import pl.allegro.tech.hermes.domain.topic.schema.SchemaRepoSchemaSourceProvider; -import pl.allegro.tech.hermes.domain.topic.schema.SchemaRepositoryType; -import pl.allegro.tech.hermes.domain.topic.schema.SchemaSourceProvider; -import pl.allegro.tech.hermes.domain.topic.schema.ZookeeperSchemaSourceProvider; -import pl.allegro.tech.hermes.infrastructure.schema.repo.SchemaRepoClient; -import pl.allegro.tech.hermes.infrastructure.zookeeper.ZookeeperPaths; - -import javax.inject.Inject; -import javax.inject.Named; - -public class SchemaSourceProviderFactory implements Factory { - - private final ConfigFactory configFactory; - private final SchemaRepoClient schemaRepoClient; - private final CuratorFramework curatorFramework; - private final ZookeeperPaths zookeeperPaths; - - @Inject - public SchemaSourceProviderFactory(ConfigFactory configFactory, - SchemaRepoClient schemaRepoClient, - @Named(CuratorType.HERMES) CuratorFramework curatorFramework, - ZookeeperPaths zookeeperPaths) { - this.configFactory = configFactory; - this.schemaRepoClient = schemaRepoClient; - this.curatorFramework = curatorFramework; - this.zookeeperPaths = zookeeperPaths; - } - - @Override - public SchemaSourceProvider provide() { - String schemaRepositoryType = configFactory.getStringProperty(Configs.SCHEMA_REPOSITORY_TYPE).toUpperCase(); - switch (SchemaRepositoryType.valueOf(schemaRepositoryType)) { - case SCHEMA_REPO: - return new SchemaRepoSchemaSourceProvider(schemaRepoClient); - case ZOOKEEPER: - return new ZookeeperSchemaSourceProvider(curatorFramework, zookeeperPaths); - default: - throw new IllegalStateException("Unknown schema repository type " + schemaRepositoryType); - } - } - - @Override - public void dispose(SchemaSourceProvider instance) { - } - -} diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/schema/SchemaVersionsRepositoryFactory.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/schema/SchemaVersionsRepositoryFactory.java deleted file mode 100644 index a590a4df39..0000000000 --- a/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/schema/SchemaVersionsRepositoryFactory.java +++ /dev/null @@ -1,52 +0,0 @@ -package pl.allegro.tech.hermes.infrastructure.schema; - -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import org.glassfish.hk2.api.Factory; -import pl.allegro.tech.hermes.common.config.ConfigFactory; -import pl.allegro.tech.hermes.domain.topic.schema.CachedSchemaVersionsRepository; -import pl.allegro.tech.hermes.domain.topic.schema.SchemaSourceProvider; -import pl.allegro.tech.hermes.domain.topic.schema.SchemaVersionsRepository; -import pl.allegro.tech.hermes.domain.topic.schema.SimpleSchemaVersionsRepository; - -import javax.inject.Inject; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - -import static pl.allegro.tech.hermes.common.config.Configs.SCHEMA_CACHE_ENABLED; -import static pl.allegro.tech.hermes.common.config.Configs.SCHEMA_CACHE_EXPIRE_AFTER_WRITE_MINUTES; -import static pl.allegro.tech.hermes.common.config.Configs.SCHEMA_CACHE_REFRESH_AFTER_WRITE_MINUTES; -import static pl.allegro.tech.hermes.common.config.Configs.SCHEMA_CACHE_RELOAD_THREAD_POOL_SIZE; - -public class SchemaVersionsRepositoryFactory implements Factory { - - private final SchemaSourceProvider sourceProvider; - private final ConfigFactory configFactory; - - @Inject - public SchemaVersionsRepositoryFactory(SchemaSourceProvider sourceProvider, ConfigFactory configFactory) { - this.sourceProvider = sourceProvider; - this.configFactory = configFactory; - } - - @Override - public SchemaVersionsRepository provide() { - if (configFactory.getBooleanProperty(SCHEMA_CACHE_ENABLED)) { - return new CachedSchemaVersionsRepository(sourceProvider, - getVersionsReloader(), - configFactory.getIntProperty(SCHEMA_CACHE_REFRESH_AFTER_WRITE_MINUTES), - configFactory.getIntProperty(SCHEMA_CACHE_EXPIRE_AFTER_WRITE_MINUTES)); - } - return new SimpleSchemaVersionsRepository(sourceProvider); - } - - private ExecutorService getVersionsReloader() { - return Executors.newFixedThreadPool( - configFactory.getIntProperty(SCHEMA_CACHE_RELOAD_THREAD_POOL_SIZE), - new ThreadFactoryBuilder().setNameFormat("schema-source-reloader-%d").build()); - } - - @Override - public void dispose(SchemaVersionsRepository instance) { - - } -} diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/schema/repo/SchemaRepoClient.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/schema/repo/SchemaRepoClient.java deleted file mode 100644 index 2945e4c04a..0000000000 --- a/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/schema/repo/SchemaRepoClient.java +++ /dev/null @@ -1,24 +0,0 @@ -package pl.allegro.tech.hermes.infrastructure.schema.repo; - -import pl.allegro.tech.hermes.domain.topic.schema.SchemaVersion; - -import java.util.List; -import java.util.Optional; - -public interface SchemaRepoClient { - - void registerSubject(String subject); - - boolean isSubjectRegistered(String subject); - - void registerSchema(String subject, String schema); - - Optional getLatestSchema(String subject); - - Optional getSchema(String subject, SchemaVersion version); - - /** - * @return a sorted list of versions in descending order. - */ - List getSchemaVersions(String subject); -} diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/schema/repo/SchemaRepoClientFactory.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/schema/repo/SchemaRepoClientFactory.java deleted file mode 100644 index 2c790eeae1..0000000000 --- a/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/schema/repo/SchemaRepoClientFactory.java +++ /dev/null @@ -1,35 +0,0 @@ -package pl.allegro.tech.hermes.infrastructure.schema.repo; - -import org.glassfish.hk2.api.Factory; -import org.glassfish.jersey.client.ClientConfig; -import org.glassfish.jersey.client.ClientProperties; -import pl.allegro.tech.hermes.common.config.ConfigFactory; -import pl.allegro.tech.hermes.common.config.Configs; - -import javax.inject.Inject; -import javax.ws.rs.client.ClientBuilder; -import java.net.URI; - -public class SchemaRepoClientFactory implements Factory { - - private final ConfigFactory configFactory; - - @Inject - public SchemaRepoClientFactory(ConfigFactory configFactory) { - this.configFactory = configFactory; - } - - @Override - public SchemaRepoClient provide() { - ClientConfig config = new ClientConfig() - .property(ClientProperties.READ_TIMEOUT, configFactory.getIntProperty(Configs.SCHEMA_REPOSITORY_HTTP_READ_TIMEOUT_MS)) - .property(ClientProperties.CONNECT_TIMEOUT, configFactory.getIntProperty(Configs.SCHEMA_REPOSITORY_HTTP_CONNECT_TIMEOUT_MS)); - - return new JerseySchemaRepoClient(ClientBuilder.newClient(config), URI.create(configFactory.getStringProperty(Configs.SCHEMA_REPOSITORY_SERVER_URL))); - } - - @Override - public void dispose(SchemaRepoClient instance) { - - } -} diff --git a/hermes-common/src/test/groovy/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperTopicRepositoryTest.groovy b/hermes-common/src/test/groovy/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperTopicRepositoryTest.groovy index 84ebe47feb..13050d0cee 100644 --- a/hermes-common/src/test/groovy/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperTopicRepositoryTest.groovy +++ b/hermes-common/src/test/groovy/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperTopicRepositoryTest.groovy @@ -88,7 +88,7 @@ class ZookeeperTopicRepositoryTest extends IntegrationTest { def "should load topic details"() { given: - repository.createTopic(topic(GROUP, 'details').withDescription('description').withValidation(true).build()) + repository.createTopic(topic(GROUP, 'details').withDescription('description').build()) wait.untilTopicCreated(GROUP, 'details') when: @@ -96,7 +96,6 @@ class ZookeeperTopicRepositoryTest extends IntegrationTest { then: retrievedTopic.description == 'description' - retrievedTopic.validationEnabled } def "should update topic"() { diff --git a/hermes-common/src/test/java/pl/allegro/tech/hermes/common/message/wrapper/MessageContentWrapperTest.java b/hermes-common/src/test/java/pl/allegro/tech/hermes/common/message/wrapper/MessageContentWrapperTest.java index 5a5ec8180e..525cb94b90 100644 --- a/hermes-common/src/test/java/pl/allegro/tech/hermes/common/message/wrapper/MessageContentWrapperTest.java +++ b/hermes-common/src/test/java/pl/allegro/tech/hermes/common/message/wrapper/MessageContentWrapperTest.java @@ -5,18 +5,17 @@ import org.apache.commons.collections4.map.HashedMap; import org.junit.Test; import pl.allegro.tech.hermes.api.Topic; -import pl.allegro.tech.hermes.common.message.AvroSchemaSource; -import pl.allegro.tech.hermes.domain.topic.schema.CompiledSchema; -import pl.allegro.tech.hermes.domain.topic.schema.SchemaMissingException; -import pl.allegro.tech.hermes.domain.topic.schema.SchemaVersion; +import pl.allegro.tech.hermes.schema.CompiledSchema; +import pl.allegro.tech.hermes.schema.CompiledSchemaRepository; +import pl.allegro.tech.hermes.schema.SchemaRepository; +import pl.allegro.tech.hermes.schema.SchemaVersion; +import pl.allegro.tech.hermes.schema.SchemaVersionsRepository; import pl.allegro.tech.hermes.test.helper.avro.AvroUser; import pl.allegro.tech.hermes.test.helper.builder.TopicBuilder; import java.io.IOException; import java.time.Clock; -import java.util.List; -import static com.googlecode.catchexception.CatchException.catchException; import static java.util.Arrays.asList; import static org.assertj.core.api.Assertions.assertThat; import static pl.allegro.tech.hermes.test.helper.avro.AvroUserSchemaLoader.load; @@ -25,55 +24,47 @@ public class MessageContentWrapperTest { private final MessageContentWrapper messageContentWrapper = new MessageContentWrapper( new JsonMessageContentWrapper("message", "metadata", new ObjectMapper()), - new AvroMessageContentWrapper(Clock.systemDefaultZone()) - ); + new AvroMessageContentWrapper(Clock.systemDefaultZone()), schemaRepository); static Topic topic = TopicBuilder.topic("group", "topic").build(); - static AvroSchemaSource schemaSource = new AvroUserSchemaSource(); - - static class AvroUserSchemaSource implements AvroSchemaSource { - CompiledSchema schema1 = new CompiledSchema<>(load("/schema/user.avsc"), SchemaVersion.valueOf(1)); - CompiledSchema schema2 = new CompiledSchema<>(load("/schema/user_v2.avsc"), SchemaVersion.valueOf(2)); - CompiledSchema schema3 = new CompiledSchema<>(load("/schema/user_v3.avsc"), SchemaVersion.valueOf(3)); - - @Override - public CompiledSchema getAvroSchema(Topic topic, SchemaVersion version) { - switch (version.value()) { - case 1: return schema1; - case 2: return schema2; - case 3: return schema3; - default: throw new RuntimeException("sry"); - } - } - - @Override - public List versions(Topic topic, boolean online) { - return online? - asList(schema3.getVersion(), schema2.getVersion(), schema1.getVersion()) - : asList(schema2.getVersion(), schema1.getVersion()); - } - } + + static CompiledSchema schema1 = new CompiledSchema<>(load("/schema/user.avsc"), SchemaVersion.valueOf(1)); + static CompiledSchema schema2 = new CompiledSchema<>(load("/schema/user_v2.avsc"), SchemaVersion.valueOf(2)); + static CompiledSchema schema3 = new CompiledSchema<>(load("/schema/user_v3.avsc"), SchemaVersion.valueOf(3)); + + static SchemaVersionsRepository schemaVersionsRepository = (topic, online) -> + online? asList(schema3.getVersion(), schema2.getVersion(), schema1.getVersion()) + : asList(schema2.getVersion(), schema1.getVersion()); + + static CompiledSchemaRepository compiledSchemaRepository = (topic, version) -> { + switch (version.value()) { + case 1: return schema1; + case 2: return schema2; + case 3: return schema3; + default: throw new RuntimeException("sry");} + }; + static SchemaRepository schemaRepository = new SchemaRepository(schemaVersionsRepository, compiledSchemaRepository); @Test public void shouldUnwrapMessageUsingEverySchemaAvailable() throws IOException { // forcing offline latest - shouldUnwrapMessage(SchemaVersion.valueOf(2)); + shouldUnwrapMessageWrappedWithSchemaAtVersion(SchemaVersion.valueOf(2)); // forcing fallback - shouldUnwrapMessage(SchemaVersion.valueOf(1)); + shouldUnwrapMessageWrappedWithSchemaAtVersion(SchemaVersion.valueOf(1)); // forcing online check - shouldUnwrapMessage(SchemaVersion.valueOf(3)); + shouldUnwrapMessageWrappedWithSchemaAtVersion(SchemaVersion.valueOf(3)); } - public void shouldUnwrapMessage(SchemaVersion version) throws IOException { + public void shouldUnwrapMessageWrappedWithSchemaAtVersion(SchemaVersion version) throws IOException { // given - CompiledSchema schema = schemaSource.getAvroSchema(topic, version); + CompiledSchema schema = schemaRepository.getKnownAvroSchemaVersion(topic, version); AvroUser user = new AvroUser(schema, "Bob", 15, "blue"); byte[] wrapped = messageContentWrapper.wrapAvro(user.asBytes(), "uniqueId", 1234, topic, schema, new HashedMap<>()); // when - UnwrappedMessageContent unwrappedMessageContent = messageContentWrapper.unwrapAvro(wrapped, topic, schemaSource); + UnwrappedMessageContent unwrappedMessageContent = messageContentWrapper.unwrapAvro(wrapped, topic); // then assertThat(unwrappedMessageContent.getContent()).contains(user.asBytes()); @@ -88,6 +79,6 @@ public void shouldThrowExceptionWhenMessageCouldNotBeUnwrappedByAnySchema() thro byte[] doesNotMatchAnySchema = "{}".getBytes(); // when - messageContentWrapper.unwrapAvro(doesNotMatchAnySchema, topic, schemaSource); + messageContentWrapper.unwrapAvro(doesNotMatchAnySchema, topic); } } \ No newline at end of file diff --git a/hermes-common/src/test/java/pl/allegro/tech/hermes/common/message/serialization/SchemaAwareSerDeTest.java b/hermes-common/src/test/java/pl/allegro/tech/hermes/common/message/wrapper/SchemaAwareSerDeTest.java similarity index 91% rename from hermes-common/src/test/java/pl/allegro/tech/hermes/common/message/serialization/SchemaAwareSerDeTest.java rename to hermes-common/src/test/java/pl/allegro/tech/hermes/common/message/wrapper/SchemaAwareSerDeTest.java index 73e0ea97a5..3dbc261872 100644 --- a/hermes-common/src/test/java/pl/allegro/tech/hermes/common/message/serialization/SchemaAwareSerDeTest.java +++ b/hermes-common/src/test/java/pl/allegro/tech/hermes/common/message/wrapper/SchemaAwareSerDeTest.java @@ -1,7 +1,7 @@ -package pl.allegro.tech.hermes.common.message.serialization; +package pl.allegro.tech.hermes.common.message.wrapper; import org.testng.annotations.Test; -import pl.allegro.tech.hermes.domain.topic.schema.SchemaVersion; +import pl.allegro.tech.hermes.schema.SchemaVersion; import pl.allegro.tech.hermes.test.helper.avro.AvroUser; import java.io.IOException; diff --git a/hermes-common/src/test/java/pl/allegro/tech/hermes/domain/topic/schema/SchemaRepoSchemaSourceProviderTest.java b/hermes-common/src/test/java/pl/allegro/tech/hermes/domain/topic/schema/SchemaRepoSchemaSourceProviderTest.java deleted file mode 100644 index 4ca4d639fa..0000000000 --- a/hermes-common/src/test/java/pl/allegro/tech/hermes/domain/topic/schema/SchemaRepoSchemaSourceProviderTest.java +++ /dev/null @@ -1,68 +0,0 @@ -package pl.allegro.tech.hermes.domain.topic.schema; - -import com.github.tomakehurst.wiremock.client.ResponseDefinitionBuilder; -import com.github.tomakehurst.wiremock.client.UrlMatchingStrategy; -import com.github.tomakehurst.wiremock.junit.WireMockRule; -import org.junit.Rule; -import org.junit.Test; -import pl.allegro.tech.hermes.api.SchemaSource; -import pl.allegro.tech.hermes.api.Topic; -import pl.allegro.tech.hermes.common.config.ConfigFactory; -import pl.allegro.tech.hermes.infrastructure.schema.repo.SchemaRepoClientFactory; - -import java.util.Optional; - -import static com.github.tomakehurst.wiremock.client.WireMock.*; -import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig; -import static org.assertj.core.api.Assertions.assertThat; -import static pl.allegro.tech.hermes.test.helper.builder.TopicBuilder.topic; - -public class SchemaRepoSchemaSourceProviderTest { - - private static final String ROOT_DIR = "/schema-repo/"; - - private static final SchemaRepoSchemaSourceProvider sourceProvider = new SchemaRepoSchemaSourceProvider(new SchemaRepoClientFactory(new ConfigFactory()).provide()); - - private static final Topic topic = topic("someGroup.someTopic").build(); - - @Rule - public final WireMockRule wireMockRule = new WireMockRule(wireMockConfig().port(8888)); - - @Test - public void shouldReturnEmptyOptionalOnMissingSchema() { - // given - wireMockRule.stubFor(get(topicLatestSchema()).willReturn(notFoundResponse())); - - // when - Optional source = sourceProvider.get(topic); - - // then - wireMockRule.verify(1, getRequestedFor(topicLatestSchema())); - assertThat(source).isEmpty(); - } - - @Test - public void shouldReturnSchemaSourceWhenAvailable() { - // given - wireMockRule.stubFor(get(topicLatestSchema()).willReturn(okResponse().withBody("0\tsomeSchema"))); - - // when - Optional source = sourceProvider.get(topic); - - // then - wireMockRule.verify(1, getRequestedFor(topicLatestSchema())); - assertThat(source).contains(SchemaSource.valueOf("someSchema")); - } - - private UrlMatchingStrategy topicLatestSchema() { - return urlEqualTo(ROOT_DIR + topic.getQualifiedName() + "/latest"); - } - - private ResponseDefinitionBuilder okResponse() { - return aResponse().withStatus(200); - } - - private ResponseDefinitionBuilder notFoundResponse() { - return aResponse().withStatus(404); - } -} \ No newline at end of file diff --git a/hermes-common/src/test/java/pl/allegro/tech/hermes/domain/topic/schema/ZookeeperSchemaSourceProviderTest.java b/hermes-common/src/test/java/pl/allegro/tech/hermes/domain/topic/schema/ZookeeperSchemaSourceProviderTest.java deleted file mode 100644 index b116d6bd76..0000000000 --- a/hermes-common/src/test/java/pl/allegro/tech/hermes/domain/topic/schema/ZookeeperSchemaSourceProviderTest.java +++ /dev/null @@ -1,65 +0,0 @@ -package pl.allegro.tech.hermes.domain.topic.schema; - -import com.googlecode.catchexception.CatchException; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.CuratorFrameworkFactory; -import org.apache.curator.retry.ExponentialBackoffRetry; -import org.junit.Test; -import pl.allegro.tech.hermes.api.SchemaSource; -import pl.allegro.tech.hermes.api.Topic; -import pl.allegro.tech.hermes.common.exception.InternalProcessingException; -import pl.allegro.tech.hermes.infrastructure.zookeeper.ZookeeperPaths; -import pl.allegro.tech.hermes.test.helper.zookeeper.ZookeeperBaseTest; - -import java.util.Optional; - -import static com.googlecode.catchexception.CatchException.catchException; -import static org.assertj.core.api.Assertions.assertThat; -import static pl.allegro.tech.hermes.test.helper.builder.TopicBuilder.topic; - -public class ZookeeperSchemaSourceProviderTest extends ZookeeperBaseTest { - - private static final String AVRO_SCHEMA = "{type:\"record\",name:\"schema\",namespace:\"com.avro\",fields:[{name:\"username\",type:\"string\",doc:\"Name of the user account\"}],\"doc:\":\"A basic schema\"}"; - - private ZookeeperSchemaSourceProvider provider = new ZookeeperSchemaSourceProvider(zookeeperClient, new ZookeeperPaths("/test")); - - @Test - public void shouldGetTopicSchemaFromZookeeper() throws Exception { - // given - Topic topic = topic("org.hermes.schema", "existing").build(); - zookeeperClient.create().creatingParentsIfNeeded().forPath("/test/groups/org.hermes.schema/topics/existing/schema", AVRO_SCHEMA.getBytes()); - - // when - Optional schemaSource = provider.get(topic); - - // then - assertThat(schemaSource).isPresent().contains(SchemaSource.valueOf(AVRO_SCHEMA)); - } - - @Test - public void shouldReturnEmptyWhenSchemaNotFound() { - // given - Topic topic = topic("org.hermes.schema", "notExisting").build(); - - // when - Optional schemaSource = provider.get(topic); - - // then - assertThat(schemaSource).isEmpty(); - } - - @Test - public void shouldThrowExceptionWhenCouldNotConnectToZK() { - // given - Topic topic = topic("org.hermes.schema", "broken").build(); - CuratorFramework notStartedClient = CuratorFrameworkFactory.newClient(zookeeperServer.getConnectString(), new ExponentialBackoffRetry(100, 1)); - ZookeeperSchemaSourceProvider brokenProvider = new ZookeeperSchemaSourceProvider(notStartedClient, new ZookeeperPaths("/test")); - - // when - catchException(brokenProvider).get(topic); - - // then - assertThat(CatchException.caughtException()).isInstanceOf(InternalProcessingException.class); - } - -} diff --git a/hermes-common/src/test/java/pl/allegro/tech/hermes/infrastructure/schema/repo/JerseySchemaRepoClientTest.java b/hermes-common/src/test/java/pl/allegro/tech/hermes/infrastructure/schema/repo/JerseySchemaRepoClientTest.java deleted file mode 100644 index 2f65044730..0000000000 --- a/hermes-common/src/test/java/pl/allegro/tech/hermes/infrastructure/schema/repo/JerseySchemaRepoClientTest.java +++ /dev/null @@ -1,226 +0,0 @@ -package pl.allegro.tech.hermes.infrastructure.schema.repo; - -import com.github.tomakehurst.wiremock.client.ResponseDefinitionBuilder; -import com.github.tomakehurst.wiremock.client.UrlMatchingStrategy; -import com.github.tomakehurst.wiremock.junit.WireMockRule; -import org.junit.ClassRule; -import org.junit.Test; -import org.testng.annotations.BeforeTest; -import pl.allegro.tech.hermes.common.exception.InvalidSchemaException; -import pl.allegro.tech.hermes.common.exception.SchemaRepoException; -import pl.allegro.tech.hermes.domain.topic.schema.SchemaVersion; -import pl.allegro.tech.hermes.test.helper.util.Ports; - -import javax.ws.rs.client.ClientBuilder; -import javax.ws.rs.core.MediaType; -import java.net.URI; -import java.util.List; -import java.util.Optional; - -import static com.github.tomakehurst.wiremock.client.WireMock.*; -import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig; -import static com.googlecode.catchexception.CatchException.catchException; -import static com.googlecode.catchexception.CatchException.caughtException; -import static org.assertj.core.api.Assertions.assertThat; - -public class JerseySchemaRepoClientTest { - - private static final String ROOT_DIR = "/schema-repo/"; - private static final int PORT = Ports.nextAvailable(); - - private static final SchemaRepoClient client = new JerseySchemaRepoClient( - ClientBuilder.newClient(), - URI.create("http://localhost:" + PORT + ROOT_DIR) - ); - - @ClassRule - public static final WireMockRule wireMock = new WireMockRule( - wireMockConfig().port(PORT).usingFilesUnderClasspath("schema-repo-stub") - ); - - @BeforeTest - public void initialize() { - wireMock.resetRequests(); - wireMock.resetMappings(); - wireMock.resetScenarios(); - } - - @Test - public void shouldCheckIfSubjectIsRegistered() { - // given - wireMock.stubFor(get(subjectUrl("notRegistered.subject")).willReturn(notFoundResponse())); - - // when - boolean exists = client.isSubjectRegistered("notRegistered.subject"); - - // then - wireMock.verify(1, getRequestedFor(subjectUrl("notRegistered.subject"))); - assertThat(exists).isFalse(); - } - - @Test - public void shouldRegisterSubject() { - // given - wireMock.stubFor(put(subjectUrl("register.subject")).willReturn(okResponse())); - - // when - client.registerSubject("register.subject"); - - // then - verify(putRequestedFor(subjectUrl("register.subject")) - .withHeader("Content-type", equalTo(MediaType.APPLICATION_FORM_URLENCODED))); - } - - @Test - public void shouldThrowExceptionForUnsuccessfulSubjectRegistration() { - // given - wireMock.stubFor(put(subjectUrl("failed.subject")).willReturn(serverErrorResponse())); - - // when - catchException(client).registerSubject("failed.subject"); - - // then - assertThat((Throwable) caughtException()).isInstanceOf(SchemaRepoException.class); - } - - @Test - public void shouldReturnEmptyOptionalIfLatestSchemaDoesNotExist() { - // given - wireMock.stubFor(get(latestSchemaUrl("nolatest.subject")).willReturn(notFoundResponse())); - - // when - Optional schema = client.getLatestSchema("nolatest.subject"); - - // then - assertThat(schema.isPresent()).isFalse(); - } - - @Test - public void shouldReturnLatestSchemaIfExists() { - // given - wireMock.stubFor(get(latestSchemaUrl("latest.subject")).willReturn(okResponse().withBody("0\t{}"))); - - // when - Optional schema = client.getLatestSchema("latest.subject"); - - // then - assertThat(schema.get()).isEqualTo("{}"); - } - - @Test - public void shouldRegisterSchema() { - // given - wireMock.stubFor(put(registerSchemaUrl("registerSchema.subject")).willReturn(okResponse())); - - // when - client.registerSchema("registerSchema.subject", "{}"); - - // then - verify(1, putRequestedFor(registerSchemaUrl("registerSchema.subject")) - .withHeader("Content-type", equalTo(MediaType.TEXT_PLAIN)) - .withRequestBody(equalTo("{}")) - ); - } - - @Test - public void shouldThrowExceptionForInvalidSchemaRegistration() { - // given - wireMock.stubFor(put(registerSchemaUrl("invalidSchema.subject")).willReturn(forbiddenResponse("error"))); - - // when - catchException(client).registerSchema("invalidSchema.subject", "{}"); - - // then - assertThat((Throwable) caughtException()) - .isInstanceOf(InvalidSchemaException.class) - .hasMessageContaining("error"); - } - - @Test - public void shouldThrowSchemaRepoExceptionForSchemaRegistration() { - // given - wireMock.stubFor(put(registerSchemaUrl("repoException.subject")).willReturn(serverErrorResponse())); - - // when - catchException(client).registerSchema("repoException.subject", "{}"); - - // then - assertThat((Throwable) caughtException()).isInstanceOf(SchemaRepoException.class); - } - - @Test - public void shouldReturnSchemaVersions() { - // given - wireMock.stubFor(get(allSchemasUrl("allSchemas.subject")).willReturn( - okResponse().withBodyFile("all-schemas-response.json").withHeader("Content-Type", "application/json")) - ); - - // when - List versions = client.getSchemaVersions("allSchemas.subject"); - - // then - assertThat(versions).containsExactly( - SchemaVersion.valueOf(2), - SchemaVersion.valueOf(1), - SchemaVersion.valueOf(0) - ); - } - - @Test - public void shouldReturnEmptySchemaVersionsIfNoSchemasAreRegistered() { - // given - wireMock.stubFor(get(allSchemasUrl("noSchema.subject")).willReturn( - okResponse().withBody("").withHeader("Content-Type", "application/json")) - ); - - // when - List versions = client.getSchemaVersions("noSchema.subject"); - - // then - assertThat(versions).isEmpty(); - } - - @Test - public void shouldReturnEmptySchemaVersionsIfSubjectDoesntExist() { - // given - wireMock.stubFor(get(allSchemasUrl("noSubject.subject")).willReturn(notFoundResponse())); - - // when - List versions = client.getSchemaVersions("noSubject.subject"); - - // then - assertThat(versions).isEmpty(); - } - - private UrlMatchingStrategy subjectUrl(String name) { - return urlEqualTo(ROOT_DIR + name); - } - - private UrlMatchingStrategy latestSchemaUrl(String name) { - return urlEqualTo(ROOT_DIR + name + "/latest"); - } - - private UrlMatchingStrategy registerSchemaUrl(String name) { - return urlEqualTo(ROOT_DIR + name + "/register"); - } - - private UrlMatchingStrategy allSchemasUrl(String name) { - return urlEqualTo(ROOT_DIR + name + "/all"); - } - - private ResponseDefinitionBuilder okResponse() { - return aResponse().withStatus(200); - } - - private ResponseDefinitionBuilder serverErrorResponse() { - return aResponse().withStatus(500); - } - - private ResponseDefinitionBuilder notFoundResponse() { - return aResponse().withStatus(404); - } - - private ResponseDefinitionBuilder forbiddenResponse(String body) { - return aResponse().withStatus(403).withBody(body); - } -} \ No newline at end of file diff --git a/hermes-console/config.json.example b/hermes-console/config.json.example index a621344020..c72dd07277 100644 --- a/hermes-console/config.json.example +++ b/hermes-console/config.json.example @@ -46,7 +46,8 @@ "retentionTime": { "duration": 1 } - } + }, + "removeSchema": false }, "subscription": { "endpointAddressResolverMetadata": { diff --git a/hermes-console/static/js/console/topic/TopicController.js b/hermes-console/static/js/console/topic/TopicController.js index ecc3b429e1..4da21dc065 100644 --- a/hermes-console/static/js/console/topic/TopicController.js +++ b/hermes-console/static/js/console/topic/TopicController.js @@ -73,7 +73,7 @@ topics.controller('TopicController', ['TOPIC_CONFIG', 'TopicRepository', 'TopicM passwordHint: 'Password for group ' + groupName }).result.then(function (result) { passwordService.set(result.password); - topicRepository.remove($scope.topic.name).$promise + topicRepository.remove($scope.topic) .then(function () { toaster.pop('success', 'Success', 'Topic has been removed'); $location.path('/groups/' + groupName).replace(); diff --git a/hermes-console/static/js/console/topic/TopicRepository.js b/hermes-console/static/js/console/topic/TopicRepository.js index 57fcfeff7c..502055d1e3 100644 --- a/hermes-console/static/js/console/topic/TopicRepository.js +++ b/hermes-console/static/js/console/topic/TopicRepository.js @@ -1,7 +1,8 @@ var repository = angular.module('hermes.topic.repository', ['hermes.subscription.repository']); -repository.factory('TopicRepository', ['DiscoveryService', '$resource', '$location', 'SubscriptionRepository', 'SchemaRepository', - function (discovery, $resource, $location, subscriptionRepository, schemaRepository) { +repository.factory('TopicRepository', ['DiscoveryService', '$resource', '$location', 'SubscriptionRepository', + 'SchemaRepository', 'TOPIC_CONFIG', + function (discovery, $resource, $location, subscriptionRepository, schemaRepository, topicConfig) { var repository = $resource(discovery.resolve('/topics/:name'), null, {update: {method: 'PUT'}}); var previewRepository = $resource(discovery.resolve('/topics/:name/preview'), null); @@ -16,6 +17,15 @@ repository.factory('TopicRepository', ['DiscoveryService', '$resource', '$locati return promise; } + function wrapSchemaRemove(topic, promise) { + if (topicConfig.removeSchema && topic.contentType == 'AVRO') { + return promise.then(function () { + return schemaRepository.remove(topic.name).$promise; + }); + } + return promise; + } + function ngPromiseCleaner(key, value) { if (_.contains(["$promise", "$resolved"], key)) { return undefined; @@ -39,8 +49,8 @@ repository.factory('TopicRepository', ['DiscoveryService', '$resource', '$locati add: function (topic, schema) { return wrapSchemaSave(topic, schema, listing.save({}, topic).$promise); }, - remove: function (name) { - return repository.remove({name: name}); + remove: function (topic) { + return wrapSchemaRemove(topic, repository.delete({name: topic.name}).$promise); }, save: function (topic, schema) { return wrapSchemaSave(topic, schema, repository.update({name: topic.name}, topic).$promise); @@ -74,6 +84,9 @@ repository.factory('SchemaRepository', ['DiscoveryService', '$resource', }, save: function (name, schema) { return repository.save({name: name}, schema); + }, + remove: function(name) { + return repository.remove({name: name}); } }; }]); diff --git a/hermes-console/static/partials/modal/editTopic.html b/hermes-console/static/partials/modal/editTopic.html index e0d50468bb..61c4b24666 100644 --- a/hermes-console/static/partials/modal/editTopic.html +++ b/hermes-console/static/partials/modal/editTopic.html @@ -58,18 +58,12 @@