diff --git a/hermes-api/src/main/java/pl/allegro/tech/hermes/api/AvroMediaType.java b/hermes-api/src/main/java/pl/allegro/tech/hermes/api/AvroMediaType.java index 17f2458b50..e7e79ef8c0 100644 --- a/hermes-api/src/main/java/pl/allegro/tech/hermes/api/AvroMediaType.java +++ b/hermes-api/src/main/java/pl/allegro/tech/hermes/api/AvroMediaType.java @@ -3,4 +3,6 @@ public class AvroMediaType { public final static String AVRO_BINARY = "avro/binary"; + + public final static String AVRO_JSON = "avro/json"; } diff --git a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/publishing/message/AvroEncodedJsonAvroConverter.java b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/publishing/message/AvroEncodedJsonAvroConverter.java new file mode 100644 index 0000000000..9940301955 --- /dev/null +++ b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/publishing/message/AvroEncodedJsonAvroConverter.java @@ -0,0 +1,40 @@ +package pl.allegro.tech.hermes.frontend.publishing.message; + +import org.apache.avro.AvroRuntimeException; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.io.*; +import tech.allegro.schema.json2avro.converter.AvroConversionException; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; + +class AvroEncodedJsonAvroConverter { + + byte[] convertToAvro(byte[] bytes, Schema schema) { + try { + return convertToAvro(readJson(bytes, schema), schema); + } catch (IOException | AvroRuntimeException e) { + throw new AvroConversionException("Failed to convert to AVRO.", e); + } + } + + private GenericData.Record readJson(byte[] bytes, Schema schema) throws IOException { + InputStream input = new ByteArrayInputStream(bytes); + Decoder decoder = DecoderFactory.get().jsonDecoder(schema, input); + return new GenericDatumReader(schema).read(null, decoder); + } + + private byte[] convertToAvro(GenericData.Record jsonData, Schema schema) throws IOException { + GenericDatumWriter writer = new GenericDatumWriter<>(schema); + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + Encoder encoder = EncoderFactory.get().binaryEncoder(outputStream, null); + writer.write(jsonData, encoder); + encoder.flush(); + return outputStream.toByteArray(); + } +} \ No newline at end of file diff --git a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/publishing/message/MessageContentTypeEnforcer.java b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/publishing/message/MessageContentTypeEnforcer.java index abca33c582..5c9c67b41f 100644 --- a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/publishing/message/MessageContentTypeEnforcer.java +++ b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/publishing/message/MessageContentTypeEnforcer.java @@ -8,19 +8,24 @@ import static javax.ws.rs.core.MediaType.APPLICATION_JSON; import static pl.allegro.tech.hermes.api.AvroMediaType.AVRO_BINARY; +import static pl.allegro.tech.hermes.api.AvroMediaType.AVRO_JSON; public class MessageContentTypeEnforcer { - private final JsonAvroConverter converter = new JsonAvroConverter(); + private final JsonAvroConverter defaultJsonAvroconverter = new JsonAvroConverter(); + private final AvroEncodedJsonAvroConverter avroEncodedJsonAvroConverter = new AvroEncodedJsonAvroConverter(); private static final String APPLICATION_JSON_WITH_DELIM = APPLICATION_JSON + ";"; + private static final String AVRO_JSON_WITH_DELIM = AVRO_JSON + ";"; private static final String AVRO_BINARY_WITH_DELIM = AVRO_BINARY + ";"; public byte[] enforceAvro(String payloadContentType, byte[] data, Schema schema, Topic topic) { String contentTypeLowerCase = StringUtils.lowerCase(payloadContentType); if (isJSON(contentTypeLowerCase)) { - return converter.convertToAvro(data, schema); - } else if (isAvro(contentTypeLowerCase)) { + return defaultJsonAvroconverter.convertToAvro(data, schema); + } else if (isAvroJSON(contentTypeLowerCase)) { + return avroEncodedJsonAvroConverter.convertToAvro(data, schema); + } else if (isAvroBinary(contentTypeLowerCase)) { return data; } else { throw new UnsupportedContentTypeException(payloadContentType, topic); @@ -31,7 +36,11 @@ private boolean isJSON(String contentType) { return isOfType(contentType, APPLICATION_JSON, APPLICATION_JSON_WITH_DELIM); } - private boolean isAvro(String contentType) { + private boolean isAvroJSON(String contentType) { + return isOfType(contentType, AVRO_JSON, AVRO_JSON_WITH_DELIM); + } + + private boolean isAvroBinary(String contentType) { return isOfType(contentType, AVRO_BINARY, AVRO_BINARY_WITH_DELIM); } diff --git a/hermes-frontend/src/test/java/pl/allegro/tech/hermes/frontend/publishing/message/MessageContentTypeEnforcerTest.java b/hermes-frontend/src/test/java/pl/allegro/tech/hermes/frontend/publishing/message/MessageContentTypeEnforcerTest.java index dba4809260..59bd959b0f 100644 --- a/hermes-frontend/src/test/java/pl/allegro/tech/hermes/frontend/publishing/message/MessageContentTypeEnforcerTest.java +++ b/hermes-frontend/src/test/java/pl/allegro/tech/hermes/frontend/publishing/message/MessageContentTypeEnforcerTest.java @@ -32,6 +32,15 @@ public void shouldConvertToAvroWhenReceivedJSONOnAvroTopic() throws IOException assertThat(enforcedMessage).isEqualTo(avroMessage.asBytes()); } + @Test + public void shouldConvertToAvroWhenReceivedAvroJSONOnAvroTopic() throws IOException { + // when + byte[] enforcedMessage = enforcer.enforceAvro("avro/json", avroMessage.asAvroEncodedJson().getBytes(), schema.getSchema(), topic); + + // then + assertThat(enforcedMessage).isEqualTo(avroMessage.asBytes()); + } + @Test public void shouldStringContentTypeOfAdditionalOptionsWhenInterpretingIt() throws IOException { // when diff --git a/hermes-test-helper/src/main/java/pl/allegro/tech/hermes/test/helper/avro/AvroUser.java b/hermes-test-helper/src/main/java/pl/allegro/tech/hermes/test/helper/avro/AvroUser.java index c030ce33e7..9c0940dade 100644 --- a/hermes-test-helper/src/main/java/pl/allegro/tech/hermes/test/helper/avro/AvroUser.java +++ b/hermes-test-helper/src/main/java/pl/allegro/tech/hermes/test/helper/avro/AvroUser.java @@ -1,5 +1,6 @@ package pl.allegro.tech.hermes.test.helper.avro; +import com.google.common.collect.ImmutableMap; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; @@ -10,10 +11,12 @@ import java.io.IOException; import java.io.UncheckedIOException; +import static java.util.Optional.ofNullable; import static pl.allegro.tech.hermes.test.helper.avro.RecordToBytesConverter.recordToBytes; public class AvroUser { + private static final String METADATA_FIELD = "__metadata"; private static final String NAME_FIELD = "name"; private static final String AGE_FIELD = "age"; private static final String FAVORITE_COLOR_FIELD = "favoriteColor"; @@ -68,10 +71,19 @@ public String asJson() { return asTestMessage().toString(); } + public String asAvroEncodedJson() { + return asAvroEncodedTestMessage().toString(); + } + public TestMessage asTestMessage() { return TestMessage.of(NAME_FIELD, getName()).append(AGE_FIELD, getAge()).append(FAVORITE_COLOR_FIELD, getFavoriteColor()); } + public TestMessage asAvroEncodedTestMessage() { + Object favoriteColorType = ofNullable(getFavoriteColor()).map(color -> (Object)ImmutableMap.of("string", color)).orElse("null"); + return TestMessage.of(METADATA_FIELD, null).append(NAME_FIELD, getName()).append(AGE_FIELD, getAge()).append(FAVORITE_COLOR_FIELD, favoriteColorType); + } + public static AvroUser create(CompiledSchema schema, byte[] bytes) { try { return new AvroUser(schema, bytes); diff --git a/hermes-test-helper/src/main/java/pl/allegro/tech/hermes/test/helper/endpoint/HermesPublisher.java b/hermes-test-helper/src/main/java/pl/allegro/tech/hermes/test/helper/endpoint/HermesPublisher.java index ff6f1abd0c..28e2dfa388 100644 --- a/hermes-test-helper/src/main/java/pl/allegro/tech/hermes/test/helper/endpoint/HermesPublisher.java +++ b/hermes-test-helper/src/main/java/pl/allegro/tech/hermes/test/helper/endpoint/HermesPublisher.java @@ -5,13 +5,11 @@ import javax.ws.rs.client.Entity; import javax.ws.rs.client.WebTarget; +import javax.ws.rs.core.MediaType; import javax.ws.rs.core.MultivaluedHashMap; import javax.ws.rs.core.Response; import java.util.Map; -import static javax.ws.rs.client.Entity.text; - - public class HermesPublisher { private final TopicEndpoint topicEndpoint; @@ -28,7 +26,8 @@ public Response publish(String qualifiedTopicName, String message) { } public Response publish(String qualifiedTopicName, String message, Map headers) { - return webTarget.path(qualifiedTopicName).request().headers(new MultivaluedHashMap<>(headers)).post(text(message)); + String contentType = headers.getOrDefault("Content-Type", MediaType.TEXT_PLAIN); + return webTarget.path(qualifiedTopicName).request().headers(new MultivaluedHashMap<>(headers)).post(Entity.entity(message, contentType)); } public Response publish(String qualifiedTopicName, byte[] message) { diff --git a/integration/src/integration/java/pl/allegro/tech/hermes/integration/PublishingAvroTest.java b/integration/src/integration/java/pl/allegro/tech/hermes/integration/PublishingAvroTest.java index 28c126faef..317c24ed24 100644 --- a/integration/src/integration/java/pl/allegro/tech/hermes/integration/PublishingAvroTest.java +++ b/integration/src/integration/java/pl/allegro/tech/hermes/integration/PublishingAvroTest.java @@ -34,6 +34,7 @@ import static javax.ws.rs.core.Response.Status.INTERNAL_SERVER_ERROR; import static javax.ws.rs.core.Response.Status.OK; import static net.javacrumbs.jsonunit.fluent.JsonFluentAssert.assertThatJson; +import static pl.allegro.tech.hermes.api.AvroMediaType.AVRO_JSON; import static pl.allegro.tech.hermes.api.ContentType.AVRO; import static pl.allegro.tech.hermes.api.ContentType.JSON; import static pl.allegro.tech.hermes.api.PatchData.patchData; @@ -185,6 +186,22 @@ public void shouldPublishJsonMessageConvertedToAvroForAvroTopics() { assertThat(response.getStatus()).isEqualTo(CREATED.getStatusCode()); } + @Test + public void shouldPublishAvroEncodedJsonMessageConvertedToAvroForAvroTopics() { + // given + Topic topic = operations.buildTopic(topic("avro.topic2") + .withContentType(AVRO) + .build() + ); + operations.saveSchema(topic, user.getSchemaAsString()); + + // when + Response response = publisher.publish("avro.topic2", user.asAvroEncodedJson(), Collections.singletonMap("Content-Type", AVRO_JSON)); + + // then + assertThat(response.getStatus()).isEqualTo(CREATED.getStatusCode()); + } + @Test public void shouldGetBadRequestForJsonInvalidWithAvroSchema() { Topic topic = topic("avro.invalidJson")