Skip to content

Commit

Permalink
Schema-registry integration
Browse files Browse the repository at this point in the history
  • Loading branch information
Daniel Krawczyk committed Oct 7, 2016
1 parent 01b8148 commit 9f1e3f9
Show file tree
Hide file tree
Showing 148 changed files with 1,554 additions and 1,839 deletions.
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ allprojects {
guava : '19.0',
jackson : '2.5.1',
jersey : '2.23.2',
jetty : '9.3.6.v20151106',
curator : '2.11.0',
wiremock : '1.58',
fongo : '1.6.1',
Expand Down
34 changes: 6 additions & 28 deletions hermes-api/src/main/java/pl/allegro/tech/hermes/api/Topic.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,6 @@ public class Topic {
@NotNull
private String description;

private boolean validationEnabled = false;

private boolean validationDryRunEnabled = false;

private boolean jsonToAvroDryRunEnabled = false;

@NotNull
Expand All @@ -44,14 +40,11 @@ public enum Ack {
private boolean schemaVersionAwareSerializationEnabled = false;

public Topic(TopicName name, String description, RetentionTime retentionTime,
boolean validationEnabled, boolean validationDryRunEnabled, boolean migratedFromJsonType,
Ack ack, boolean trackingEnabled, ContentType contentType, boolean jsonToAvroDryRunEnabled,
boolean schemaVersionAwareSerializationEnabled) {
boolean migratedFromJsonType, Ack ack, boolean trackingEnabled, ContentType contentType,
boolean jsonToAvroDryRunEnabled, boolean schemaVersionAwareSerializationEnabled) {
this.name = name;
this.description = description;
this.retentionTime = retentionTime;
this.validationEnabled = validationEnabled;
this.validationDryRunEnabled = validationDryRunEnabled;
this.ack = (ack == null ? Ack.LEADER : ack);
this.trackingEnabled = trackingEnabled;
this.migratedFromJsonType = migratedFromJsonType;
Expand All @@ -65,17 +58,14 @@ public Topic(
@JsonProperty("name") String qualifiedName,
@JsonProperty("description") String description,
@JsonProperty("retentionTime") RetentionTime retentionTime,
@JsonProperty("validation") boolean validationEnabled,
@JsonProperty("validationDryRun") boolean validationDryRunEnabled,
@JsonProperty("jsonToAvroDryRun") boolean jsonToAvroDryRunEnabled,
@JsonProperty("ack") Ack ack,
@JsonProperty("trackingEnabled") boolean trackingEnabled,
@JsonProperty("migratedFromJsonType") boolean migratedFromJsonType,
@JsonProperty("schemaVersionAwareSerializationEnabled") boolean schemaVersionAwareSerializationEnabled,
@JsonProperty("contentType") ContentType contentType) {
this(TopicName.fromQualifiedName(qualifiedName), description, retentionTime, validationEnabled,
validationDryRunEnabled, migratedFromJsonType, ack, trackingEnabled, contentType, jsonToAvroDryRunEnabled,
schemaVersionAwareSerializationEnabled);
this(TopicName.fromQualifiedName(qualifiedName), description, retentionTime, migratedFromJsonType, ack,
trackingEnabled, contentType, jsonToAvroDryRunEnabled, schemaVersionAwareSerializationEnabled);
}

public RetentionTime getRetentionTime() {
Expand All @@ -84,8 +74,8 @@ public RetentionTime getRetentionTime() {

@Override
public int hashCode() {
return Objects.hash(name, description, retentionTime, validationEnabled, validationDryRunEnabled,
migratedFromJsonType, trackingEnabled, ack, contentType, jsonToAvroDryRunEnabled, schemaVersionAwareSerializationEnabled);
return Objects.hash(name, description, retentionTime, migratedFromJsonType, trackingEnabled, ack, contentType,
jsonToAvroDryRunEnabled, schemaVersionAwareSerializationEnabled);
}

@Override
Expand All @@ -101,8 +91,6 @@ public boolean equals(Object obj) {
return Objects.equals(this.name, other.name)
&& Objects.equals(this.description, other.description)
&& Objects.equals(this.retentionTime, other.retentionTime)
&& Objects.equals(this.isValidationEnabled(), other.isValidationEnabled())
&& Objects.equals(this.validationDryRunEnabled, other.validationDryRunEnabled)
&& Objects.equals(this.jsonToAvroDryRunEnabled, other.jsonToAvroDryRunEnabled)
&& Objects.equals(this.trackingEnabled, other.trackingEnabled)
&& Objects.equals(this.migratedFromJsonType, other.migratedFromJsonType)
Expand Down Expand Up @@ -133,16 +121,6 @@ public void setRetentionTime(RetentionTime retentionTime) {
this.retentionTime = retentionTime;
}

@JsonProperty("validation")
public boolean isValidationEnabled() {
return validationEnabled || ContentType.AVRO == contentType;
}

@JsonProperty("validationDryRun")
public boolean isValidationDryRunEnabled() {
return validationDryRunEnabled;
}

@JsonProperty("jsonToAvroDryRun")
public boolean isJsonToAvroDryRunEnabled() {
return jsonToAvroDryRunEnabled;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ public void shouldDeserializeTopic() throws Exception {
assertThat(topic.getName().getName()).isEqualTo("bar");
assertThat(topic.getName().getGroupName()).isEqualTo("foo");
assertThat(topic.getDescription()).isEqualTo("description");
assertThat(topic.isValidationEnabled()).isFalse();
assertThat(topic.isValidationDryRunEnabled()).isFalse();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,13 @@ public void shouldPatchNestedObjects() {
public void shouldNotResetPrimitiveFields() {
// given
Topic topic = topic("group.topic").withTrackingEnabled(true).build();
PatchData patch = patchData().set("validation", true).build();
PatchData patch = patchData().set("schemaVersionAwareSerializationEnabled", true).build();

// when
Topic patched = Patch.apply(topic, patch);

// then
assertThat(patched.isTrackingEnabled()).isTrue();
assertThat(patched.isValidationEnabled()).isTrue();
assertThat(patched.isSchemaVersionAwareSerializationEnabled()).isTrue();
}
}
5 changes: 1 addition & 4 deletions hermes-common/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ apply plugin: 'groovy'
dependencies {
compile project(':hermes-api')
compile project(':hermes-metrics')
compile project(':hermes-schema')

compile group: 'com.netflix.archaius', name: 'archaius-core', version: '0.6.0'

Expand All @@ -14,9 +15,6 @@ dependencies {
exclude module: 'slf4j-log4j12'
exclude module: 'log4j'
}
compile (group: 'com.github.fge', name: 'json-schema-validator', version: '2.2.6') {
exclude group: 'net.sf.jopt-simple'
}

compile group: 'org.glassfish.hk2', name: 'hk2-locator', version: '2.3.0-b01'
compile group: 'org.glassfish.jersey.core', name: 'jersey-client', version: versions.jersey
Expand All @@ -30,7 +28,6 @@ dependencies {
compile group: 'com.google.guava', name: 'guava', version: versions.guava

compile group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: versions.jackson
compile group: 'io.fastjson', name: 'boon', version: '0.28'
compile group: 'org.apache.avro', name: 'avro', version: '1.7.7'

compile group: 'io.dropwizard.metrics', name: 'metrics-graphite', version: '3.1.1'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,8 @@ public enum Configs {
SCHEMA_CACHE_RELOAD_THREAD_POOL_SIZE("schema.cache.reload.thread.pool.size", 2),
SCHEMA_CACHE_ENABLED("schema.cache.enabled", true),
SCHEMA_CACHE_COMPILED_MAXIMUM_SIZE("schema.cache.compiled.maximum.size", 2000),
SCHEMA_REPOSITORY_TYPE("schema.repository.type", "zookeeper"),
SCHEMA_REPOSITORY_SERVER_URL("schema.repository.serverUrl", "http://localhost:8888/schema-repo/"),
SCHEMA_REPOSITORY_TYPE("schema.repository.type", "schema_registry"),
SCHEMA_REPOSITORY_SERVER_URL("schema.repository.serverUrl", "http://localhost:8888/"),
SCHEMA_REPOSITORY_HTTP_READ_TIMEOUT_MS("schema.repository.http.read.timeout.ms", 2000),
SCHEMA_REPOSITORY_HTTP_CONNECT_TIMEOUT_MS("schema.repository.http.connect.timeout.ms", 2000),

Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
package pl.allegro.tech.hermes.common.di;

import com.github.fge.jsonschema.main.JsonSchema;
import com.yammer.metrics.core.HealthCheckRegistry;
import org.apache.avro.Schema;
import org.glassfish.hk2.api.TypeLiteral;
import pl.allegro.tech.hermes.common.broker.BrokerStorage;
import pl.allegro.tech.hermes.common.broker.ZookeeperBrokerStorage;
import pl.allegro.tech.hermes.common.clock.ClockFactory;
import pl.allegro.tech.hermes.common.config.ConfigFactory;
import pl.allegro.tech.hermes.common.di.factories.BoonObjectMapperFactory;
import pl.allegro.tech.hermes.common.di.factories.CuratorClientFactory;
import pl.allegro.tech.hermes.common.di.factories.DistributedEphemeralCounterFactory;
import pl.allegro.tech.hermes.common.di.factories.GraphiteWebTargetFactory;
Expand All @@ -34,17 +32,15 @@
import pl.allegro.tech.hermes.common.metric.HermesMetrics;
import pl.allegro.tech.hermes.common.metric.counter.CounterStorage;
import pl.allegro.tech.hermes.common.metric.counter.zookeeper.ZookeeperCounterStorage;
import pl.allegro.tech.hermes.common.schema.AvroCompiledSchemaRepositoryFactory;
import pl.allegro.tech.hermes.common.schema.SchemaRepositoryFactory;
import pl.allegro.tech.hermes.common.schema.SchemaSourceClientFactory;
import pl.allegro.tech.hermes.common.schema.SchemaVersionsRepositoryFactory;
import pl.allegro.tech.hermes.common.util.HostnameResolver;
import pl.allegro.tech.hermes.common.util.InetAddressHostnameResolver;
import pl.allegro.tech.hermes.domain.notifications.InternalNotificationsBus;
import pl.allegro.tech.hermes.domain.topic.schema.CompiledSchemaRepository;
import pl.allegro.tech.hermes.domain.topic.schema.SchemaRepository;
import pl.allegro.tech.hermes.infrastructure.schema.AvroCompiledSchemaRepositoryFactory;
import pl.allegro.tech.hermes.infrastructure.schema.JsonCompiledSchemaRepositoryFactory;
import pl.allegro.tech.hermes.infrastructure.schema.SchemaSourceProviderFactory;
import pl.allegro.tech.hermes.infrastructure.schema.SchemaVersionsRepositoryFactory;
import pl.allegro.tech.hermes.infrastructure.schema.repo.SchemaRepoClientFactory;
import pl.allegro.tech.hermes.infrastructure.zookeeper.notifications.ZookeeperInternalNotificationBus;
import pl.allegro.tech.hermes.schema.CompiledSchemaRepository;

import javax.inject.Singleton;
import java.time.Clock;
Expand All @@ -58,12 +54,10 @@ protected void configure() {
bindFactory(ClockFactory.class).in(Singleton.class).to(Clock.class);
bind(ZookeeperBrokerStorage.class).to(BrokerStorage.class).in(Singleton.class);
bind(InetAddressHostnameResolver.class).in(Singleton.class).to(HostnameResolver.class);
bindSingletonFactory(SchemaSourceProviderFactory.class);
bindSingletonFactory(SchemaRepoClientFactory.class);
bindSingletonFactory(SchemaSourceClientFactory.class);
bindSingletonFactory(SchemaVersionsRepositoryFactory.class);
bindSingleton(SchemaRepository.class);
bindFactory(JsonCompiledSchemaRepositoryFactory.class).in(Singleton.class).to(new TypeLiteral<CompiledSchemaRepository<JsonSchema>>() {});
bindFactory(AvroCompiledSchemaRepositoryFactory.class).in(Singleton.class).to(new TypeLiteral<CompiledSchemaRepository<Schema>>() {});
bindSingletonFactory(SchemaRepositoryFactory.class);

bindSingleton(CuratorClientFactory.class);
bindSingleton(HermesMetrics.class);
Expand All @@ -77,7 +71,6 @@ protected void configure() {
bindSingletonFactory(KafkaCuratorClientFactory.class).named(CuratorType.KAFKA);
bindSingletonFactory(GraphiteWebTargetFactory.class);
bindSingletonFactory(ObjectMapperFactory.class);
bindSingletonFactory(BoonObjectMapperFactory.class);
bindSingletonFactory(SharedCounterFactory.class);
bindSingletonFactory(DistributedEphemeralCounterFactory.class);
bindSingletonFactory(MetricRegistryFactory.class);
Expand Down

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.util.Utf8;
import pl.allegro.tech.hermes.common.util.MessageId;
import pl.allegro.tech.hermes.domain.topic.schema.CompiledSchema;
import pl.allegro.tech.hermes.schema.CompiledSchema;

import javax.inject.Inject;
import java.time.Clock;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package pl.allegro.tech.hermes.common.message.wrapper;

public class DeserializationException extends RuntimeException {

DeserializationException(String message) {
super(message);
}
}
Loading

0 comments on commit 9f1e3f9

Please sign in to comment.