Skip to content

Commit

Permalink
payload content-type check & error handling
Browse files Browse the repository at this point in the history
message payload content-type check & error message for wrong content-type (#728)
  • Loading branch information
rprzystasz authored and adamdubiel committed Mar 14, 2017
1 parent 5142f4b commit e304e84
Show file tree
Hide file tree
Showing 9 changed files with 78 additions and 14 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package pl.allegro.tech.hermes.consumers.consumer.sender.http;
package pl.allegro.tech.hermes.api;

public class AvroMediaType {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.util.List;

import static javax.ws.rs.core.MediaType.APPLICATION_JSON;
import static pl.allegro.tech.hermes.api.AvroMediaType.AVRO_BINARY;

@Path("topics")
public interface TopicEndpoint {
Expand Down Expand Up @@ -69,7 +69,7 @@ List<String> queryList(
Response publishMessage(@PathParam("topicName") String qualifiedTopicName, String message);

@POST
@Consumes(MediaType.TEXT_PLAIN)
@Consumes(AVRO_BINARY)
@Produces(APPLICATION_JSON)
@Path("/{topicName}")
Response publishMessage(@PathParam("topicName") String qualifiedTopicName, byte[] message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,12 @@ public UnsupportedContentTypeException(Subscription subscription) {
));
}

public UnsupportedContentTypeException(String payloadContentType, Topic topic) {
super(String.format(
"Unsupported payload content type header %s for topic %s",
payloadContentType,
topic.getQualifiedName()
));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import static pl.allegro.tech.hermes.api.ContentType.AVRO;
import static pl.allegro.tech.hermes.common.http.MessageMetadataHeaders.BATCH_ID;
import static pl.allegro.tech.hermes.common.http.MessageMetadataHeaders.RETRY_COUNT;
import static pl.allegro.tech.hermes.consumers.consumer.sender.http.AvroMediaType.AVRO_BINARY;
import static pl.allegro.tech.hermes.api.AvroMediaType.AVRO_BINARY;

public class ApacheHttpClientMessageBatchSender implements MessageBatchSender {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import static pl.allegro.tech.hermes.common.http.MessageMetadataHeaders.MESSAGE_ID;
import static pl.allegro.tech.hermes.common.http.MessageMetadataHeaders.RETRY_COUNT;
import static pl.allegro.tech.hermes.common.http.MessageMetadataHeaders.SCHEMA_VERSION;
import static pl.allegro.tech.hermes.consumers.consumer.sender.http.AvroMediaType.AVRO_BINARY;
import static pl.allegro.tech.hermes.api.AvroMediaType.AVRO_BINARY;

class HttpRequestFactory {

Expand Down
Original file line number Diff line number Diff line change
@@ -1,25 +1,41 @@
package pl.allegro.tech.hermes.frontend.publishing.message;

import org.apache.avro.Schema;
import org.apache.commons.lang.StringUtils;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.common.message.wrapper.UnsupportedContentTypeException;
import tech.allegro.schema.json2avro.converter.JsonAvroConverter;

import static javax.ws.rs.core.MediaType.APPLICATION_JSON;
import static pl.allegro.tech.hermes.api.AvroMediaType.AVRO_BINARY;

public class MessageContentTypeEnforcer {

private final JsonAvroConverter converter = new JsonAvroConverter();

private static final String APPLICATION_JSON_WITH_DELIM = APPLICATION_JSON + ";";
private static final String AVRO_BINARY_WITH_DELIM = AVRO_BINARY + ";";

public byte[] enforceAvro(String payloadContentType, byte[] data, Schema schema) {
if (isJSON(payloadContentType)) {
public byte[] enforceAvro(String payloadContentType, byte[] data, Schema schema, Topic topic) {
String contentTypeLowerCase = StringUtils.lowerCase(payloadContentType);
if (isJSON(contentTypeLowerCase)) {
return converter.convertToAvro(data, schema);
} else if (isAvro(contentTypeLowerCase)) {
return data;
} else {
throw new UnsupportedContentTypeException(payloadContentType, topic);
}
return data;
}

private boolean isJSON(String contentType) {
return contentType != null && (contentType.length() > APPLICATION_JSON.length() ?
contentType.startsWith(APPLICATION_JSON_WITH_DELIM) : contentType.equals(APPLICATION_JSON));
return isOfType(contentType, APPLICATION_JSON, APPLICATION_JSON_WITH_DELIM);
}

private boolean isAvro(String contentType) {
return isOfType(contentType, AVRO_BINARY, AVRO_BINARY_WITH_DELIM);
}

private boolean isOfType(String contentType, String expectedContentType, String expectedWithDelim) {
return contentType != null && (contentType.equals(expectedContentType) || contentType.startsWith(expectedWithDelim));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ private AvroMessage createAvroMessage(HeaderMap headerMap, Topic topic, String m

AvroMessage message = new AvroMessage(
messageId,
enforcer.enforceAvro(headerMap.getFirst(Headers.CONTENT_TYPE_STRING), messageContent, schema.getSchema()),
enforcer.enforceAvro(headerMap.getFirst(Headers.CONTENT_TYPE_STRING), messageContent, schema.getSchema(), topic),
timestamp,
schema);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,15 @@

import org.apache.avro.Schema;
import org.junit.Test;
import pl.allegro.tech.hermes.api.ContentType;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.common.message.wrapper.UnsupportedContentTypeException;
import pl.allegro.tech.hermes.schema.CompiledSchema;
import pl.allegro.tech.hermes.schema.SchemaVersion;
import pl.allegro.tech.hermes.test.helper.avro.AvroUser;
import pl.allegro.tech.hermes.test.helper.builder.TopicBuilder;

import javax.ws.rs.core.MediaType;
import java.io.IOException;

import static org.assertj.core.api.Assertions.assertThat;
Expand All @@ -14,13 +19,14 @@ public class MessageContentTypeEnforcerTest {

private MessageContentTypeEnforcer enforcer = new MessageContentTypeEnforcer();

private Topic topic = TopicBuilder.topic("test.Topic").withContentType(ContentType.AVRO).build();
private AvroUser avroMessage = new AvroUser("Bob", 30, "black");
private CompiledSchema<Schema> schema = new CompiledSchema<>(avroMessage.getSchema(), SchemaVersion.valueOf(0));

@Test
public void shouldConvertToAvroWhenReceivedJSONOnAvroTopic() throws IOException {
// when
byte[] enforcedMessage = enforcer.enforceAvro("application/json", avroMessage.asJson().getBytes(), schema.getSchema());
byte[] enforcedMessage = enforcer.enforceAvro("application/json", avroMessage.asJson().getBytes(), schema.getSchema(), topic);

// then
assertThat(enforcedMessage).isEqualTo(avroMessage.asBytes());
Expand All @@ -29,7 +35,7 @@ public void shouldConvertToAvroWhenReceivedJSONOnAvroTopic() throws IOException
@Test
public void shouldStringContentTypeOfAdditionalOptionsWhenInterpretingIt() throws IOException {
// when
byte[] enforcedMessage = enforcer.enforceAvro("application/json;encoding=utf-8", avroMessage.asJson().getBytes(), schema.getSchema());
byte[] enforcedMessage = enforcer.enforceAvro("application/json;encoding=utf-8", avroMessage.asJson().getBytes(), schema.getSchema(), topic);

// then
assertThat(enforcedMessage).isEqualTo(avroMessage.asBytes());
Expand All @@ -38,10 +44,25 @@ public void shouldStringContentTypeOfAdditionalOptionsWhenInterpretingIt() throw
@Test
public void shouldNotConvertWhenReceivingAvroOnAvroTopic() throws IOException {
// when
byte[] enforcedMessage = enforcer.enforceAvro("avro/binary", avroMessage.asBytes(), schema.getSchema());
byte[] enforcedMessage = enforcer.enforceAvro("avro/binary", avroMessage.asBytes(), schema.getSchema(), topic);

// then
assertThat(enforcedMessage).isEqualTo(avroMessage.asBytes());
}

@Test
public void shouldBeCaseInsensitiveForPayloadContentType() throws IOException {
// when
byte[] enforcedMessage = enforcer.enforceAvro("AVRO/Binary", avroMessage.asBytes(), schema.getSchema(), topic);

// then
assertThat(enforcedMessage).isEqualTo(avroMessage.asBytes());
}

@Test(expected = UnsupportedContentTypeException.class)
public void shouldThrowUnsupportedContentTypeExceptionWhenReceivedWrongContentType() throws IOException {
// when
enforcer.enforceAvro(MediaType.TEXT_PLAIN, avroMessage.asBytes(), schema.getSchema(), topic);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@
import javax.ws.rs.core.Response;
import java.io.IOException;
import java.time.Clock;
import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

import static javax.ws.rs.core.MediaType.TEXT_PLAIN;
import static javax.ws.rs.core.Response.Status.BAD_REQUEST;
import static javax.ws.rs.core.Response.Status.CREATED;
import static javax.ws.rs.core.Response.Status.INTERNAL_SERVER_ERROR;
Expand Down Expand Up @@ -73,6 +75,23 @@ public void shouldPublishAvroAndConsumeJsonMessage() throws InterruptedException
remoteService.waitUntilReceived(json -> assertThatJson(json).isEqualTo(user.asJson()));
}

@Test
public void shouldNotPublishAvroWhenMessageIsNotJsonOrAvro() throws InterruptedException, ExecutionException, TimeoutException, IOException {
// given
Topic topic = operations.buildTopic(topic("publishAvroConsumeAvro.topic")
.withContentType(AVRO)
.build()
);
operations.saveSchema(topic, user.getSchemaAsString());
operations.createSubscription(topic, "subscription", HTTP_ENDPOINT_URL, ContentType.AVRO);

// when
Response response = publisher.publish("publishAvroConsumeAvro.topic", user.asJson(), Collections.singletonMap("Content-Type", TEXT_PLAIN));

// then
assertThat(response).hasStatus(BAD_REQUEST);
}

@Test
public void shouldPublishAvroAndConsumeAvroMessage() throws InterruptedException, ExecutionException, TimeoutException, IOException {
// given
Expand Down

0 comments on commit e304e84

Please sign in to comment.