Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handling conversion of JSON with standard Avro encoding #749

Merged
merged 2 commits into from
Mar 22, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,6 @@
public class AvroMediaType {

public final static String AVRO_BINARY = "avro/binary";

public final static String AVRO_JSON = "avro/json";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package pl.allegro.tech.hermes.frontend.publishing.message;

import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.io.*;
import tech.allegro.schema.json2avro.converter.AvroConversionException;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

class AvroEncodedJsonAvroConverter {

byte[] convertToAvro(byte[] bytes, Schema schema) {
try {
return convertToAvro(readJson(bytes, schema), schema);
} catch (IOException | AvroRuntimeException e) {
throw new AvroConversionException("Failed to convert to AVRO.", e);
}
}

private GenericData.Record readJson(byte[] bytes, Schema schema) throws IOException {
InputStream input = new ByteArrayInputStream(bytes);
Decoder decoder = DecoderFactory.get().jsonDecoder(schema, input);
return new GenericDatumReader<GenericData.Record>(schema).read(null, decoder);
}

private byte[] convertToAvro(GenericData.Record jsonData, Schema schema) throws IOException {
GenericDatumWriter<GenericData.Record> writer = new GenericDatumWriter<>(schema);
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
Encoder encoder = EncoderFactory.get().binaryEncoder(outputStream, null);
writer.write(jsonData, encoder);
encoder.flush();
return outputStream.toByteArray();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,24 @@

import static javax.ws.rs.core.MediaType.APPLICATION_JSON;
import static pl.allegro.tech.hermes.api.AvroMediaType.AVRO_BINARY;
import static pl.allegro.tech.hermes.api.AvroMediaType.AVRO_JSON;

public class MessageContentTypeEnforcer {

private final JsonAvroConverter converter = new JsonAvroConverter();
private final JsonAvroConverter defaultJsonAvroconverter = new JsonAvroConverter();
private final AvroEncodedJsonAvroConverter avroEncodedJsonAvroConverter = new AvroEncodedJsonAvroConverter();

private static final String APPLICATION_JSON_WITH_DELIM = APPLICATION_JSON + ";";
private static final String AVRO_JSON_WITH_DELIM = AVRO_JSON + ";";
private static final String AVRO_BINARY_WITH_DELIM = AVRO_BINARY + ";";

public byte[] enforceAvro(String payloadContentType, byte[] data, Schema schema, Topic topic) {
String contentTypeLowerCase = StringUtils.lowerCase(payloadContentType);
if (isJSON(contentTypeLowerCase)) {
return converter.convertToAvro(data, schema);
} else if (isAvro(contentTypeLowerCase)) {
return defaultJsonAvroconverter.convertToAvro(data, schema);
} else if (isAvroJSON(contentTypeLowerCase)) {
return avroEncodedJsonAvroConverter.convertToAvro(data, schema);
} else if (isAvroBinary(contentTypeLowerCase)) {
return data;
} else {
throw new UnsupportedContentTypeException(payloadContentType, topic);
Expand All @@ -31,7 +36,11 @@ private boolean isJSON(String contentType) {
return isOfType(contentType, APPLICATION_JSON, APPLICATION_JSON_WITH_DELIM);
}

private boolean isAvro(String contentType) {
private boolean isAvroJSON(String contentType) {
return isOfType(contentType, AVRO_JSON, AVRO_JSON_WITH_DELIM);
}

private boolean isAvroBinary(String contentType) {
return isOfType(contentType, AVRO_BINARY, AVRO_BINARY_WITH_DELIM);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,15 @@ public void shouldConvertToAvroWhenReceivedJSONOnAvroTopic() throws IOException
assertThat(enforcedMessage).isEqualTo(avroMessage.asBytes());
}

@Test
public void shouldConvertToAvroWhenReceivedAvroJSONOnAvroTopic() throws IOException {
// when
byte[] enforcedMessage = enforcer.enforceAvro("avro/json", avroMessage.asAvroEncodedJson().getBytes(), schema.getSchema(), topic);

// then
assertThat(enforcedMessage).isEqualTo(avroMessage.asBytes());
}

@Test
public void shouldStringContentTypeOfAdditionalOptionsWhenInterpretingIt() throws IOException {
// when
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package pl.allegro.tech.hermes.test.helper.avro;

import com.google.common.collect.ImmutableMap;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
Expand All @@ -10,10 +11,12 @@
import java.io.IOException;
import java.io.UncheckedIOException;

import static java.util.Optional.ofNullable;
import static pl.allegro.tech.hermes.test.helper.avro.RecordToBytesConverter.recordToBytes;

public class AvroUser {

private static final String METADATA_FIELD = "__metadata";
private static final String NAME_FIELD = "name";
private static final String AGE_FIELD = "age";
private static final String FAVORITE_COLOR_FIELD = "favoriteColor";
Expand Down Expand Up @@ -68,10 +71,19 @@ public String asJson() {
return asTestMessage().toString();
}

public String asAvroEncodedJson() {
return asAvroEncodedTestMessage().toString();
}

public TestMessage asTestMessage() {
return TestMessage.of(NAME_FIELD, getName()).append(AGE_FIELD, getAge()).append(FAVORITE_COLOR_FIELD, getFavoriteColor());
}

public TestMessage asAvroEncodedTestMessage() {
Object favoriteColorType = ofNullable(getFavoriteColor()).map(color -> (Object)ImmutableMap.of("string", color)).orElse("null");
return TestMessage.of(METADATA_FIELD, null).append(NAME_FIELD, getName()).append(AGE_FIELD, getAge()).append(FAVORITE_COLOR_FIELD, favoriteColorType);
}

public static AvroUser create(CompiledSchema<Schema> schema, byte[] bytes) {
try {
return new AvroUser(schema, bytes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,11 @@

import javax.ws.rs.client.Entity;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.MultivaluedHashMap;
import javax.ws.rs.core.Response;
import java.util.Map;

import static javax.ws.rs.client.Entity.text;


public class HermesPublisher {

private final TopicEndpoint topicEndpoint;
Expand All @@ -28,7 +26,8 @@ public Response publish(String qualifiedTopicName, String message) {
}

public Response publish(String qualifiedTopicName, String message, Map<String, String> headers) {
return webTarget.path(qualifiedTopicName).request().headers(new MultivaluedHashMap<>(headers)).post(text(message));
String contentType = headers.getOrDefault("Content-Type", MediaType.TEXT_PLAIN);
return webTarget.path(qualifiedTopicName).request().headers(new MultivaluedHashMap<>(headers)).post(Entity.entity(message, contentType));
}

public Response publish(String qualifiedTopicName, byte[] message) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import static javax.ws.rs.core.Response.Status.INTERNAL_SERVER_ERROR;
import static javax.ws.rs.core.Response.Status.OK;
import static net.javacrumbs.jsonunit.fluent.JsonFluentAssert.assertThatJson;
import static pl.allegro.tech.hermes.api.AvroMediaType.AVRO_JSON;
import static pl.allegro.tech.hermes.api.ContentType.AVRO;
import static pl.allegro.tech.hermes.api.ContentType.JSON;
import static pl.allegro.tech.hermes.api.PatchData.patchData;
Expand Down Expand Up @@ -185,6 +186,22 @@ public void shouldPublishJsonMessageConvertedToAvroForAvroTopics() {
assertThat(response.getStatus()).isEqualTo(CREATED.getStatusCode());
}

@Test
public void shouldPublishAvroEncodedJsonMessageConvertedToAvroForAvroTopics() {
// given
Topic topic = operations.buildTopic(topic("avro.topic2")
.withContentType(AVRO)
.build()
);
operations.saveSchema(topic, user.getSchemaAsString());

// when
Response response = publisher.publish("avro.topic2", user.asAvroEncodedJson(), Collections.singletonMap("Content-Type", AVRO_JSON));

// then
assertThat(response.getStatus()).isEqualTo(CREATED.getStatusCode());
}

@Test
public void shouldGetBadRequestForJsonInvalidWithAvroSchema() {
Topic topic = topic("avro.invalidJson")
Expand Down