Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add range processor to the mirror #67

Merged
merged 37 commits into from
Oct 4, 2022
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
02bcfc4
Add range processor to the mirror
raminqaf Sep 1, 2022
f751e3c
Remove unused Avro files
raminqaf Sep 1, 2022
97d9af1
Update files
raminqaf Sep 1, 2022
ae07e3a
Parametized tests
raminqaf Sep 1, 2022
236d4ee
Add javadocs
raminqaf Sep 1, 2022
7f57803
Add more javadocs
raminqaf Sep 1, 2022
6f990dc
Update files
raminqaf Sep 1, 2022
7e4e7b3
Update files
raminqaf Sep 2, 2022
8b89784
Add more tests
raminqaf Sep 2, 2022
f935999
Update files
raminqaf Sep 2, 2022
4bd8f6c
Fix tests
raminqaf Sep 2, 2022
b71304b
Update files
raminqaf Sep 2, 2022
12c42ca
add padder to the constructor
raminqaf Sep 5, 2022
d9a0797
fix tests
raminqaf Sep 6, 2022
22c688e
Add log and java docs
raminqaf Sep 6, 2022
8ea0849
remove comments
raminqaf Sep 6, 2022
909e6d3
Merge branch 'master' into feature/mirror/add-range-processor
raminqaf Sep 6, 2022
38e0411
Refactor processor
raminqaf Sep 7, 2022
a328a42
Merge branch 'master' into feature/mirror/add-range-processor
raminqaf Sep 8, 2022
279477a
Merge branch 'master' into feature/mirror/add-range-processor
raminqaf Sep 8, 2022
104bf75
Add reviews
raminqaf Sep 8, 2022
2b44588
Merge branch 'feature/mirror/add-range-processor' of github.com:bakda…
raminqaf Sep 8, 2022
661c360
Add javadocs
raminqaf Sep 9, 2022
3236b7d
Add SuppressWarnings
raminqaf Sep 9, 2022
fd847da
Merge branch 'master' into feature/mirror/add-range-processor
raminqaf Sep 15, 2022
85d4859
Add reviews
raminqaf Sep 15, 2022
e521658
add more tests
raminqaf Sep 15, 2022
046d2f5
Add no operation range indexer
raminqaf Sep 16, 2022
ba9a7c9
fix condition
raminqaf Sep 16, 2022
aaec1a3
Merge branch 'master' into feature/mirror/add-range-processor
raminqaf Sep 21, 2022
af72dfa
remove point
raminqaf Sep 21, 2022
8266afb
Merge branch 'feature/mirror/add-range-processor' of github.com:bakda…
raminqaf Sep 21, 2022
57e1247
add reviews
raminqaf Sep 22, 2022
c96844f
Merge branch 'master' into feature/mirror/add-range-processor
raminqaf Sep 28, 2022
b006b1c
add reviews
raminqaf Sep 28, 2022
40cc70b
add reviews
raminqaf Sep 28, 2022
95f56c9
Add reviews
raminqaf Oct 4, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,4 @@
"DOUBLE",
"SCHEMA"
]
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,4 @@
"default": null
}
]
}
}
15 changes: 0 additions & 15 deletions common/src/main/avro/testutils/CompositeKey.avsc

This file was deleted.

30 changes: 0 additions & 30 deletions common/src/main/avro/testutils/ListeningEvent.avsc

This file was deleted.

19 changes: 0 additions & 19 deletions common/src/main/avro/testutils/Song.avsc

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright 2022 bakdata GmbH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.bakdata.quick.common.exception;

import edu.umd.cs.findbugs.annotations.Nullable;
import lombok.Getter;

@Getter
public class MirrorTopologyException extends RuntimeException {

public MirrorTopologyException(@Nullable final String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
import com.google.protobuf.util.JsonFormat;
import lombok.Getter;

/**
* A resolver for Protobuf messages.
*/
public class ProtobufResolver implements TypeResolver<Message> {
private final JsonFormat.Parser parser;
@Getter
private final Descriptors.Descriptor descriptor;

public ProtobufResolver(final Descriptors.Descriptor descriptor) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@

import com.bakdata.quick.common.api.model.TopicWriteType;
import com.bakdata.quick.common.resolver.TypeResolver;
import io.confluent.kafka.schemaregistry.ParsedSchema;
import lombok.Value;
import org.apache.kafka.common.serialization.Serde;
import org.jetbrains.annotations.Nullable;

/**
* POJO for topic data.
Expand All @@ -44,5 +46,7 @@ public static class QuickData<T> {
QuickTopicType type;
Serde<T> serde;
TypeResolver<T> resolver;
@Nullable
ParsedSchema parsedSchema;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,6 @@ public boolean isSchema() {
*/
abstract <K> Serde<K> getSerde(final Map<String, ?> configs, final boolean isKey);


@SuppressWarnings("unchecked")
static <K> TypeResolver<K> configuredTypeResolver(final TypeResolver<?> typeResolver) {
return (TypeResolver<K>) typeResolver;
Expand All @@ -179,5 +178,4 @@ static <K> Serde<K> configuredSerde(final Serde<?> serde, final Map<String, ?> c
serde.configure(config, isKey);
return (Serde<K>) serde;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.bakdata.quick.common.api.client.TopicRegistryClient;
import com.bakdata.quick.common.api.model.TopicData;
import com.bakdata.quick.common.config.KafkaConfig;
import com.bakdata.quick.common.resolver.TypeResolver;
import com.bakdata.quick.common.schema.SchemaFetcher;
import com.bakdata.quick.common.type.ConversionProvider;
import com.bakdata.quick.common.type.QuickTopicData;
Expand Down Expand Up @@ -57,14 +56,14 @@ public class QuickTopicTypeService implements TopicTypeService {
/**
* Default constructor.
*
* @param registryFetcher http client for schema registry
* @param registryFetcher http client for schema registry
* @param topicRegistryClient http client for topic registry
* @param kafkaConfig configuration for kafka
* @param conversionProvider provider for conversion operations
* @param kafkaConfig configuration for kafka
* @param conversionProvider provider for conversion operations
*/
public QuickTopicTypeService(final SchemaFetcher registryFetcher,
final TopicRegistryClient topicRegistryClient, final KafkaConfig kafkaConfig,
final ConversionProvider conversionProvider) {
final TopicRegistryClient topicRegistryClient, final KafkaConfig kafkaConfig,
final ConversionProvider conversionProvider) {
this.registryFetcher = registryFetcher;
this.topicRegistryClient = topicRegistryClient;
this.schemaRegistryUrl = kafkaConfig.getSchemaRegistryUrl();
Expand Down Expand Up @@ -107,15 +106,17 @@ public Single<QuickTopicType> getValueType(final String topic) {
.as(single -> singleToFuture(executor, single));
}

private <K> Single<TypeResolver<K>> createResolver(final QuickTopicType type, final String subject) {
private <K> Single<TypeResolverSchema<K>> createResolver(final QuickTopicType type, final String subject) {
// no need for configuration if handle non-schema types
if (!type.isSchema()) {
return Single.just(this.conversionProvider.getTypeResolver(type, null));
return Single.just(new TypeResolverSchema<>(this.conversionProvider.getTypeResolver(type, null),
null));
}
// get schema and configure the resolver with it
return this.registryFetcher.getSchema(subject)
.doOnError(e -> log.error("No schema found for subject {}", subject, e))
.map(schema -> this.conversionProvider.getTypeResolver(type, schema));
.map(schema -> new TypeResolverSchema<>(this.conversionProvider.getTypeResolver(type, schema),
schema));
}

private <K, V> Single<QuickTopicData<K, V>> fromTopicData(final TopicData topicData) {
Expand All @@ -128,13 +129,17 @@ private <K, V> Single<QuickTopicData<K, V>> fromTopicData(final TopicData topicD

final String topic = topicData.getName();
// create key data
final Single<TypeResolver<K>> keyResolver = this.createResolver(keyType, KEY.asSubject(topic));
final Single<QuickData<K>> keyData = keyResolver.map(resolver -> new QuickData<>(keyType, keySerde, resolver));
final Single<TypeResolverSchema<K>> keyResolver = this.createResolver(keyType, KEY.asSubject(topic));
final Single<QuickData<K>> keyData =
keyResolver.map(resolver -> new QuickData<>(keyType, keySerde, resolver.getTypeResolver(),
resolver.getParsedSchema()));

// create value data
final Single<TypeResolver<V>> valueResolver = this.createResolver(valueType, VALUE.asSubject(topic));
final Single<TypeResolverSchema<V>> valueResolver =
this.createResolver(valueType, VALUE.asSubject(topic));
final Single<QuickData<V>> valueData =
valueResolver.map(resolver -> new QuickData<>(valueType, valueSerde, resolver));
valueResolver.map(resolver -> new QuickData<>(valueType, valueSerde, resolver.getTypeResolver(),
resolver.getParsedSchema()));

// combine key and value data when both are ready
return keyData.zipWith(valueData,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright 2022 bakdata GmbH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.bakdata.quick.common.type.registry;

import com.bakdata.quick.common.resolver.TypeResolver;
import edu.umd.cs.findbugs.annotations.Nullable;
import io.confluent.kafka.schemaregistry.ParsedSchema;
import lombok.Value;

@Value
public class TypeResolverSchema<K> {
TypeResolver<K> typeResolver;
@Nullable
ParsedSchema parsedSchema;
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

import static org.assertj.core.api.Assertions.assertThat;

import com.bakdata.quick.avro.ChartRecord;
import com.bakdata.quick.testutil.ChartRecord;
import org.apache.avro.generic.GenericRecord;
import org.junit.jupiter.api.Test;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@

import static org.assertj.core.api.Assertions.assertThat;

import com.bakdata.quick.avro.Person;
import com.bakdata.quick.common.api.client.HttpClient;
import com.bakdata.quick.common.config.KafkaConfig;
import com.bakdata.quick.common.exception.HttpClientException;
import com.bakdata.quick.testutil.Person;
import com.bakdata.quick.testutil.ProtoTestRecord;
import com.bakdata.schemaregistrymock.junit5.SchemaRegistryMockExtension;
import io.confluent.kafka.schemaregistry.ParsedSchema;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import static org.assertj.core.api.Assertions.assertThat;

import com.bakdata.quick.avro.ChartRecord;
import com.bakdata.quick.common.TestTopicRegistryClient;
import com.bakdata.quick.common.api.client.HttpClient;
import com.bakdata.quick.common.api.client.TopicRegistryClient;
Expand All @@ -33,6 +32,7 @@
import com.bakdata.quick.common.type.QuickTopicData;
import com.bakdata.quick.common.type.QuickTopicType;
import com.bakdata.quick.common.type.TopicTypeService;
import com.bakdata.quick.testutil.ChartRecord;
import com.bakdata.quick.testutil.ProtoTestRecord;
import com.bakdata.schemaregistrymock.SchemaRegistryMock;
import io.confluent.kafka.schemaregistry.ParsedSchema;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"namespace": "com.bakdata.quick.avro",
"namespace": "com.bakdata.quick.testutil",
"name": "ChartRecord",
"type": "record",
"fields": [
Expand All @@ -12,4 +12,4 @@
"type": "long"
}
]
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"namespace": "com.bakdata.quick.avro",
"namespace": "com.bakdata.quick.testutil",
"name": "Person",
"type": "record",
"fields": [
Expand Down
15 changes: 15 additions & 0 deletions common/src/testFixtures/avro/RangeQuery.avsc
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"namespace": "com.bakdata.quick.testutil",
"name": "AvroRangeQueryTest",
"type": "record",
"fields": [
{
"name": "userId",
"type": "int"
},
{
"name": "timestamp",
"type": "long"
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ public <K, V> Single<QuickTopicData<K, V>> getTopicData(final String topic) {
final Serde<V> valueSerde = this.conversionProvider.getSerde(this.valueType, configs, false);
final TypeResolver<K> keyResolver = this.conversionProvider.getTypeResolver(this.keyType, this.keySchema);
final TypeResolver<V> valueResolver = this.conversionProvider.getTypeResolver(this.valueType, this.valueSchema);
final QuickData<K> quickData = new QuickData<>(this.keyType, keySerde, keyResolver);
final QuickData<V> valueInfo = new QuickData<>(this.valueType, valueSerde, valueResolver);
final QuickData<K> quickData = new QuickData<>(this.keyType, keySerde, keyResolver, this.keySchema);
final QuickData<V> valueInfo = new QuickData<>(this.valueType, valueSerde, valueResolver, this.valueSchema);
final QuickTopicData<K, V> topicInfo =
new QuickTopicData<>(topic, TopicWriteType.MUTABLE, quickData, valueInfo);
return Single.just(topicInfo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import com.bakdata.quick.common.type.QuickTopicType;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde;
import io.confluent.kafka.streams.serdes.protobuf.KafkaProtobufSerde;
import org.apache.avro.Schema;
Expand All @@ -40,27 +42,29 @@ private TestTypeUtils() {
}

public static QuickData<GenericRecord> newAvroData(final Schema schema) {
return new QuickData<>(QuickTopicType.AVRO, new GenericAvroSerde(), new GenericAvroResolver(schema));
return new QuickData<>(QuickTopicType.AVRO, new GenericAvroSerde(), new GenericAvroResolver(schema),
new AvroSchema(schema));
}

public static QuickData<String> newStringData() {
return new QuickData<>(QuickTopicType.STRING, Serdes.String(), new StringResolver());
return new QuickData<>(QuickTopicType.STRING, Serdes.String(), new StringResolver(), null);
}

public static QuickData<Long> newLongData() {
return new QuickData<>(QuickTopicType.LONG, Serdes.Long(), new LongResolver());
return new QuickData<>(QuickTopicType.LONG, Serdes.Long(), new LongResolver(), null);
}

public static QuickData<Double> newDoubleData() {
return new QuickData<>(QuickTopicType.DOUBLE, Serdes.Double(), new DoubleResolver());
return new QuickData<>(QuickTopicType.DOUBLE, Serdes.Double(), new DoubleResolver(), null);
}

public static QuickData<Integer> newIntegerData() {
return new QuickData<>(QuickTopicType.INTEGER, Serdes.Integer(), new IntegerResolver());
return new QuickData<>(QuickTopicType.INTEGER, Serdes.Integer(), new IntegerResolver(), null);
}

public static QuickData<Message> newProtobufData(final Descriptors.Descriptor descriptor) {
return new QuickData<>(QuickTopicType.PROTOBUF, new KafkaProtobufSerde<>(), new ProtobufResolver(descriptor));
return new QuickData<>(QuickTopicType.PROTOBUF, new KafkaProtobufSerde<>(), new ProtobufResolver(descriptor),
new ProtobufSchema(descriptor));
}

}
Loading