Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

728 payload content-type check & error handling #738

Merged
merged 3 commits into from
Mar 14, 2017
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package pl.allegro.tech.hermes.consumers.consumer.sender.http;
package pl.allegro.tech.hermes.api;

public class AvroMediaType {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.util.List;

import static javax.ws.rs.core.MediaType.APPLICATION_JSON;
import static pl.allegro.tech.hermes.api.AvroMediaType.AVRO_BINARY;

@Path("topics")
public interface TopicEndpoint {
Expand Down Expand Up @@ -69,7 +69,7 @@ List<String> queryList(
Response publishMessage(@PathParam("topicName") String qualifiedTopicName, String message);

@POST
@Consumes(MediaType.TEXT_PLAIN)
@Consumes(AVRO_BINARY)
@Produces(APPLICATION_JSON)
@Path("/{topicName}")
Response publishMessage(@PathParam("topicName") String qualifiedTopicName, byte[] message);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package pl.allegro.tech.hermes.common.message.wrapper;

import org.apache.avro.Schema;
import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.common.exception.InternalProcessingException;
Expand All @@ -22,4 +23,12 @@ public UnsupportedContentTypeException(Subscription subscription) {
));
}

public UnsupportedContentTypeException(String payloadContentType, Schema schema) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why Avro Schema here? i don't get it, we probably want topic name, not schema name

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's right. In most cases schema name matches topic name, and tests didn't catch that typo.
Fixed

super(String.format(
"Unsupported payload content type %s for %s",
payloadContentType,
schema.getFullName()
));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import static pl.allegro.tech.hermes.api.ContentType.AVRO;
import static pl.allegro.tech.hermes.common.http.MessageMetadataHeaders.BATCH_ID;
import static pl.allegro.tech.hermes.common.http.MessageMetadataHeaders.RETRY_COUNT;
import static pl.allegro.tech.hermes.consumers.consumer.sender.http.AvroMediaType.AVRO_BINARY;
import static pl.allegro.tech.hermes.api.AvroMediaType.AVRO_BINARY;

public class ApacheHttpClientMessageBatchSender implements MessageBatchSender {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import static pl.allegro.tech.hermes.common.http.MessageMetadataHeaders.MESSAGE_ID;
import static pl.allegro.tech.hermes.common.http.MessageMetadataHeaders.RETRY_COUNT;
import static pl.allegro.tech.hermes.common.http.MessageMetadataHeaders.SCHEMA_VERSION;
import static pl.allegro.tech.hermes.consumers.consumer.sender.http.AvroMediaType.AVRO_BINARY;
import static pl.allegro.tech.hermes.api.AvroMediaType.AVRO_BINARY;

class HttpRequestFactory {

Expand Down
Original file line number Diff line number Diff line change
@@ -1,25 +1,39 @@
package pl.allegro.tech.hermes.frontend.publishing.message;

import org.apache.avro.Schema;
import pl.allegro.tech.hermes.common.message.wrapper.UnsupportedContentTypeException;
import tech.allegro.schema.json2avro.converter.JsonAvroConverter;

import static javax.ws.rs.core.MediaType.APPLICATION_JSON;
import static pl.allegro.tech.hermes.api.AvroMediaType.AVRO_BINARY;

public class MessageContentTypeEnforcer {

private final JsonAvroConverter converter = new JsonAvroConverter();

private static final String APPLICATION_JSON_WITH_DELIM = APPLICATION_JSON + ";";
private static final String AVRO_BINARY_WITH_DELIM = AVRO_BINARY + ";";

public byte[] enforceAvro(String payloadContentType, byte[] data, Schema schema) {
if (isJSON(payloadContentType)) {
return converter.convertToAvro(data, schema);
} else if (isAvro(payloadContentType)) {
return data;
} else {
throw new UnsupportedContentTypeException(payloadContentType, schema);
}
return data;
}

private boolean isJSON(String contentType) {
return contentType != null && (contentType.length() > APPLICATION_JSON.length() ?
contentType.startsWith(APPLICATION_JSON_WITH_DELIM) : contentType.equals(APPLICATION_JSON));
return isOfType(contentType, APPLICATION_JSON, APPLICATION_JSON_WITH_DELIM);
}

private boolean isAvro(String contentType) {
return isOfType(contentType, AVRO_BINARY, AVRO_BINARY_WITH_DELIM);
}

private boolean isOfType(String contentType, String expectedContentType, String expectedWithDelim) {
return contentType != null && (contentType.length() > expectedContentType.length() ?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't we just check startsWith?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also, content type is not case sensitive (https://www.w3.org/Protocols/rfc1341/4_Content-Type.html) so we can match with String.toLowerCase

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

checking with startsWith doesn't enforce ';' to be the first character after content-type value.
It would accept values like 'application/jsonANYTEXT'
Do we want to allow such values?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no, the current algorithm is fine

Copy link
Contributor

@chemicL chemicL Mar 10, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point.
We can still simplify the check

contentType != null && (contentType.equals(expectedContentType)) || contentType.startsWith(expectedWithDelim))

String.equals has a check for length.

contentType.startsWith(expectedWithDelim) : contentType.equals(expectedContentType));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@

import org.apache.avro.Schema;
import org.junit.Test;
import pl.allegro.tech.hermes.common.message.wrapper.UnsupportedContentTypeException;
import pl.allegro.tech.hermes.schema.CompiledSchema;
import pl.allegro.tech.hermes.schema.SchemaVersion;
import pl.allegro.tech.hermes.test.helper.avro.AvroUser;

import javax.ws.rs.core.MediaType;
import java.io.IOException;

import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -44,4 +46,10 @@ public void shouldNotConvertWhenReceivingAvroOnAvroTopic() throws IOException {
assertThat(enforcedMessage).isEqualTo(avroMessage.asBytes());
}

@Test(expected = UnsupportedContentTypeException.class)
public void shouldThrowUnsupportedContentTypeExceptionWhenReceivedWrongContentType() throws IOException {
// when
enforcer.enforceAvro(MediaType.TEXT_PLAIN, avroMessage.asBytes(), schema.getSchema());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@
import javax.ws.rs.core.Response;
import java.io.IOException;
import java.time.Clock;
import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

import static javax.ws.rs.core.MediaType.TEXT_PLAIN;
import static javax.ws.rs.core.Response.Status.BAD_REQUEST;
import static javax.ws.rs.core.Response.Status.CREATED;
import static javax.ws.rs.core.Response.Status.INTERNAL_SERVER_ERROR;
Expand Down Expand Up @@ -73,6 +75,23 @@ public void shouldPublishAvroAndConsumeJsonMessage() throws InterruptedException
remoteService.waitUntilReceived(json -> assertThatJson(json).isEqualTo(user.asJson()));
}

@Test
public void shouldNotPublishAvroWhenMessageIsNotJsonOrAvro() throws InterruptedException, ExecutionException, TimeoutException, IOException {
// given
Topic topic = operations.buildTopic(topic("publishAvroConsumeAvro.topic")
.withContentType(AVRO)
.build()
);
operations.saveSchema(topic, user.getSchemaAsString());
operations.createSubscription(topic, "subscription", HTTP_ENDPOINT_URL, ContentType.AVRO);

// when
Response response = publisher.publish("publishAvroConsumeAvro.topic", user.asJson(), Collections.singletonMap("Content-Type", TEXT_PLAIN));

// then
assertThat(response).hasStatus(BAD_REQUEST);
}

@Test
public void shouldPublishAvroAndConsumeAvroMessage() throws InterruptedException, ExecutionException, TimeoutException, IOException {
// given
Expand Down