Skip to content

Commit

Permalink
Fix Interop issue: Amqp body section can be AmqpValue or AmqpSequence (
Browse files Browse the repository at this point in the history
…Azure#66)

* support interop with 2 other body types - amqp-value & amqp-sequence
* receivedEvent.getBody() should throw and communicate - in case of interop issues
* fix flaky CIT issues
* Change exception name from Illegal to Unexpected
* implement reSend path for other Body section types
  • Loading branch information
SreeramGarlapati committed Mar 17, 2017
1 parent 180bc11 commit ad9fddb
Show file tree
Hide file tree
Showing 6 changed files with 260 additions and 17 deletions.
41 changes: 33 additions & 8 deletions azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/EventData.java
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.qpid.proton.Proton;
Expand All @@ -21,6 +22,9 @@
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
import org.apache.qpid.proton.amqp.messaging.Data;
import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
import org.apache.qpid.proton.amqp.messaging.AmqpSequence;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.Section;
import org.apache.qpid.proton.message.Message;

import com.microsoft.azure.servicebus.amqp.AmqpConstants;
Expand All @@ -36,11 +40,14 @@ public class EventData implements Serializable

transient private Binary bodyData;

private final Class bodyType;

private Map<String, Object> properties;
private SystemProperties systemProperties;

private EventData()
{
this.bodyType = Data.class;
}

/**
Expand Down Expand Up @@ -72,19 +79,31 @@ private EventData()
if (amqpMessage.getCorrelationId() != null) receiveProperties.put(AmqpConstants.AMQP_PROPERTY_CORRELATION_ID, amqpMessage.getCorrelationId());
if (amqpMessage.getContentType() != null) receiveProperties.put(AmqpConstants.AMQP_PROPERTY_CONTENT_TYPE, amqpMessage.getContentType());
if (amqpMessage.getContentEncoding() != null) receiveProperties.put(AmqpConstants.AMQP_PROPERTY_CONTENT_ENCODING, amqpMessage.getContentEncoding());
if (amqpMessage.getProperties().getAbsoluteExpiryTime() != null) receiveProperties.put(AmqpConstants.AMQP_PROPERTY_ABSOLUTE_EXPRITY_time, amqpMessage.getExpiryTime());
if (amqpMessage.getProperties().getAbsoluteExpiryTime() != null) receiveProperties.put(AmqpConstants.AMQP_PROPERTY_ABSOLUTE_EXPRITY_TIME, amqpMessage.getExpiryTime());
if (amqpMessage.getProperties().getCreationTime() != null) receiveProperties.put(AmqpConstants.AMQP_PROPERTY_CREATION_TIME, amqpMessage.getCreationTime());
if (amqpMessage.getGroupId() != null) receiveProperties.put(AmqpConstants.AMQP_PROPERTY_GROUP_ID, amqpMessage.getGroupId());
if (amqpMessage.getProperties().getGroupSequence() != null) receiveProperties.put(AmqpConstants.AMQP_PROPERTY_GROUP_SEQUENCE, amqpMessage.getGroupSequence());
if (amqpMessage.getReplyToGroupId() != null) receiveProperties.put(AmqpConstants.AMQP_PROPERTY_REPLY_TO_GROUP_ID, amqpMessage.getReplyToGroupId());
}

this.systemProperties = new SystemProperties(receiveProperties);
this.properties = amqpMessage.getApplicationProperties() == null ? null
: ((Map<String, Object>)(amqpMessage.getApplicationProperties().getValue()));

this.bodyData = amqpMessage.getBody() == null ? null : ((Data) amqpMessage.getBody()).getValue();


Section bodySection = amqpMessage.getBody();
if (bodySection != null) {
this.bodyType = bodySection.getClass();
if (bodySection instanceof Data)
this.bodyData = ((Data) bodySection).getValue();
else if (bodySection instanceof AmqpValue)
receiveProperties.put(AmqpConstants.AMQP_VALUE, ((AmqpValue) bodySection).getValue());
else if (bodySection instanceof AmqpSequence)
receiveProperties.put(AmqpConstants.AMQP_SEQUENCE, ((AmqpSequence) bodySection).getValue());
}
else {
this.bodyType = Data.class;
}

this.systemProperties = new SystemProperties(receiveProperties);
amqpMessage.clear();
}

Expand Down Expand Up @@ -181,7 +200,11 @@ public EventData(ByteBuffer buffer)
*/
public byte[] getBody()
{
return this.bodyData == null ? null : this.bodyData.getArray();
if (this.bodyType != Data.class) {
throw new UnexpectedEventDataBodyException(this.bodyType);
}

return this.bodyData == null ? null : this.bodyData.getArray();
}

/**
Expand Down Expand Up @@ -279,12 +302,14 @@ Message toAmqpMessage()
case AmqpConstants.AMQP_PROPERTY_CORRELATION_ID: amqpMessage.setCorrelationId(systemProperty.getValue()); break;
case AmqpConstants.AMQP_PROPERTY_CONTENT_TYPE: amqpMessage.setContentType((String) systemProperty.getValue()); break;
case AmqpConstants.AMQP_PROPERTY_CONTENT_ENCODING: amqpMessage.setContentEncoding((String) systemProperty.getValue()); break;
case AmqpConstants.AMQP_PROPERTY_ABSOLUTE_EXPRITY_time: amqpMessage.setExpiryTime((long) systemProperty.getValue()); break;
case AmqpConstants.AMQP_PROPERTY_ABSOLUTE_EXPRITY_TIME: amqpMessage.setExpiryTime((long) systemProperty.getValue()); break;
case AmqpConstants.AMQP_PROPERTY_CREATION_TIME: amqpMessage.setCreationTime((long) systemProperty.getValue()); break;
case AmqpConstants.AMQP_PROPERTY_GROUP_ID: amqpMessage.setGroupId((String) systemProperty.getValue()); break;
case AmqpConstants.AMQP_PROPERTY_GROUP_SEQUENCE: amqpMessage.setGroupSequence((long) systemProperty.getValue()); break;
case AmqpConstants.AMQP_PROPERTY_REPLY_TO_GROUP_ID: amqpMessage.setReplyToGroupId((String) systemProperty.getValue()); break;
default: throw new RuntimeException("unreachable");
case AmqpConstants.AMQP_VALUE: amqpMessage.setBody(new AmqpValue(systemProperty.getValue())); break;
case AmqpConstants.AMQP_SEQUENCE: amqpMessage.setBody(new AmqpSequence((List) systemProperty.getValue())); break;
default: throw new RuntimeException("unreachable");
}
else
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright (c) Microsoft. All rights reserved.
* Licensed under the MIT license. See LICENSE file in the project root for full license information.
*/
package com.microsoft.azure.eventhubs;

import java.util.HashMap;
import java.util.Map;

import org.apache.qpid.proton.amqp.messaging.AmqpSequence;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.Data;

import com.microsoft.azure.servicebus.amqp.AmqpConstants;

public class UnexpectedEventDataBodyException extends RuntimeException {

private static final Map<Class, String> KNOWN_SECTIONS = new HashMap<Class, String>() {{
put(AmqpValue.class, AmqpConstants.AMQP_VALUE);
put(AmqpSequence.class, AmqpConstants.AMQP_SEQUENCE);
}};

private final Class bodySection;

public UnexpectedEventDataBodyException(final Class actualBodySection) {
super(KNOWN_SECTIONS.containsKey(actualBodySection)
? String.format("AmqpMessage Body Section will be available in %s.getBody() only if it is of type: %s. " +
"If AmqpMessage has any other type as part of Body Section - it will be added to %s.getSystemProperties()." +
" Use '%s' as Key to fetch this from %s.getSystemProperties().",
EventData.class, Data.class, EventData.class, KNOWN_SECTIONS.get(actualBodySection), EventData.class)
: "AmqpMessage Body Section cannot be mapped to any EventData section.");
this.bodySection = actualBodySection;
}

// used for testing
public String getSystemPropertyName() {
return KNOWN_SECTIONS.get(this.bodySection);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@ private AmqpConstants() { }
add(AMQP_PROPERTY_CORRELATION_ID);
add(AMQP_PROPERTY_CONTENT_TYPE);
add(AMQP_PROPERTY_CONTENT_ENCODING);
add(AMQP_PROPERTY_ABSOLUTE_EXPRITY_time);
add(AMQP_PROPERTY_ABSOLUTE_EXPRITY_TIME);
add(AMQP_PROPERTY_CREATION_TIME);
add(AMQP_PROPERTY_GROUP_ID);
add(AMQP_PROPERTY_GROUP_SEQUENCE);
add(AMQP_PROPERTY_REPLY_TO_GROUP_ID);
add(AmqpConstants.AMQP_VALUE);
add(AmqpConstants.AMQP_SEQUENCE);
}});

public static final String APACHE = "apache.org";
Expand Down Expand Up @@ -65,11 +67,14 @@ private AmqpConstants() { }
public static final String AMQP_PROPERTY_CORRELATION_ID = "correlation-id";
public static final String AMQP_PROPERTY_CONTENT_TYPE = "content-type";
public static final String AMQP_PROPERTY_CONTENT_ENCODING = "content-encoding";
public static final String AMQP_PROPERTY_ABSOLUTE_EXPRITY_time = "absolute-expiry-time";
public static final String AMQP_PROPERTY_ABSOLUTE_EXPRITY_TIME = "absolute-expiry-time";
public static final String AMQP_PROPERTY_CREATION_TIME = "creation-time";
public static final String AMQP_PROPERTY_GROUP_ID = "group-id";
public static final String AMQP_PROPERTY_GROUP_SEQUENCE = "group-sequence";
public static final String AMQP_PROPERTY_REPLY_TO_GROUP_ID = "reply-to-group-id";

public static final String AMQP_VALUE = "amqp:amqp-value:*";
public static final String AMQP_SEQUENCE = "amqp:amqp-sequence:list";

public static final Symbol ENABLE_RECEIVER_RUNTIME_METRIC_NAME = Symbol.valueOf(VENDOR + ":enable-receiver-runtime-metric");
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.apache.qpid.proton.amqp.messaging.Data;
import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
import org.apache.qpid.proton.message.Message;

import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
Expand All @@ -36,7 +37,7 @@
import com.microsoft.azure.servicebus.ServiceBusException;
import com.microsoft.azure.servicebus.amqp.AmqpConstants;

public class InteropTest extends ApiTestBase
public class InteropAmqpPropertiesTest extends ApiTestBase
{
static EventHubClient ehClient;
static MessagingFactory msgFactory;
Expand Down Expand Up @@ -83,8 +84,8 @@ public void accept(EventData eData)
&& eData.getSystemProperties().get(AmqpConstants.AMQP_PROPERTY_REPLY_TO_GROUP_ID).equals(originalMessage.getReplyToGroupId()));
Assert.assertTrue(eData.getSystemProperties().containsKey(AmqpConstants.AMQP_PROPERTY_REPLY_TO)
&& eData.getSystemProperties().get(AmqpConstants.AMQP_PROPERTY_REPLY_TO).equals(originalMessage.getReplyTo()));
Assert.assertTrue(eData.getSystemProperties().containsKey(AmqpConstants.AMQP_PROPERTY_ABSOLUTE_EXPRITY_time)
&& eData.getSystemProperties().get(AmqpConstants.AMQP_PROPERTY_ABSOLUTE_EXPRITY_time).equals(originalMessage.getExpiryTime()));
Assert.assertTrue(eData.getSystemProperties().containsKey(AmqpConstants.AMQP_PROPERTY_ABSOLUTE_EXPRITY_TIME)
&& eData.getSystemProperties().get(AmqpConstants.AMQP_PROPERTY_ABSOLUTE_EXPRITY_TIME).equals(originalMessage.getExpiryTime()));

Assert.assertTrue(eData.getSystemProperties().containsKey(msgAnnotation)
&& eData.getSystemProperties().get(msgAnnotation).equals(originalMessage.getMessageAnnotations().getValue().get(Symbol.getSymbol(msgAnnotation))));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
/*
* Copyright (c) Microsoft. All rights reserved.
* Licensed under the MIT license. See LICENSE file in the project root for full license information.
*/
package com.microsoft.azure.eventhubs.eventdata;

import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.ExecutionException;
import java.util.LinkedList;
import java.util.List;

import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.messaging.AmqpSequence;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.Data;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.message.Message;

import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventhubs.EventHubClient;
import com.microsoft.azure.eventhubs.UnexpectedEventDataBodyException;
import com.microsoft.azure.eventhubs.PartitionReceiver;
import com.microsoft.azure.eventhubs.PartitionSender;
import com.microsoft.azure.eventhubs.lib.ApiTestBase;
import com.microsoft.azure.eventhubs.lib.TestContext;
import com.microsoft.azure.servicebus.ConnectionStringBuilder;
import com.microsoft.azure.servicebus.MessageSender;
import com.microsoft.azure.servicebus.MessagingFactory;
import com.microsoft.azure.servicebus.ServiceBusException;
import com.microsoft.azure.servicebus.amqp.AmqpConstants;

public class InteropEventBodyTest extends ApiTestBase {

static EventHubClient ehClient;
static MessagingFactory msgFactory;
static PartitionReceiver receiver;
static MessageSender partitionMsgSender;
static PartitionSender partitionSender;

static final String partitionId = "0";
static EventData receivedEvent;
static EventData reSentAndReceivedEvent;
static Message reSendAndReceivedMessage;

@BeforeClass
public static void initialize() throws ServiceBusException, IOException, InterruptedException, ExecutionException
{
final ConnectionStringBuilder connStrBuilder = TestContext.getConnectionString();
final String connectionString = connStrBuilder.toString();

ehClient = EventHubClient.createFromConnectionStringSync(connectionString);
msgFactory = MessagingFactory.createFromConnectionString(connectionString).get();
receiver = ehClient.createReceiverSync(TestContext.getConsumerGroupName(), partitionId, Instant.now());
partitionSender = ehClient.createPartitionSenderSync(partitionId);
partitionMsgSender = MessageSender.create(msgFactory, "link1", connStrBuilder.getEntityPath() + "/partitions/" + partitionId).get();

// run out of messages in that specific partition - to account for clock-skew with Instant.now() on test machine vs eventhubs service
receiver.setReceiveTimeout(Duration.ofSeconds(5));
Iterable<EventData> clockSkewEvents;
do {
clockSkewEvents = receiver.receiveSync(100);
} while (clockSkewEvents != null && clockSkewEvents.iterator().hasNext());
}

@Test
public void interopWithProtonAmqpMessageBodyAsAmqpValue() throws ServiceBusException, InterruptedException, ExecutionException
{
Message originalMessage = Proton.message();
String payload = "testmsg";
originalMessage.setBody(new AmqpValue(payload));
partitionMsgSender.send(originalMessage).get();
receivedEvent = receiver.receiveSync(10).iterator().next();

Assert.assertEquals(payload, receivedEvent.getSystemProperties().get(AmqpConstants.AMQP_VALUE));

try {
receivedEvent.getBody();
Assert.assertTrue(false); // this line shouldn't be reachable
} catch (UnexpectedEventDataBodyException exception) {
Assert.assertEquals(AmqpConstants.AMQP_VALUE, exception.getSystemPropertyName());
}

partitionSender.sendSync(receivedEvent);
reSentAndReceivedEvent = receiver.receiveSync(10).iterator().next();
Assert.assertEquals(payload, reSentAndReceivedEvent.getSystemProperties().get(AmqpConstants.AMQP_VALUE));

try {
reSentAndReceivedEvent.getBody();
Assert.assertTrue(false); // this line shouldn't be reachable
} catch (UnexpectedEventDataBodyException exception) {
Assert.assertEquals(AmqpConstants.AMQP_VALUE, exception.getSystemPropertyName());
}
}

@Test
public void interopWithProtonAmqpMessageBodyAsAmqpSequence() throws ServiceBusException, InterruptedException, ExecutionException
{
Message originalMessage = Proton.message();
String payload = "testmsg";
LinkedList<Data> datas = new LinkedList<>();
datas.add(new Data(new Binary(payload.getBytes())));
originalMessage.setBody(new AmqpSequence(datas));

partitionMsgSender.send(originalMessage).get();
receivedEvent = receiver.receiveSync(10).iterator().next();

Assert.assertEquals(payload, new String(((List<Data>)(receivedEvent.getSystemProperties().get(AmqpConstants.AMQP_SEQUENCE))).get(0).getValue().getArray()));

try {
receivedEvent.getBody();
Assert.assertTrue(false); // getBody() should throw; this line shouldn't be reachable
} catch (UnexpectedEventDataBodyException exception) {
Assert.assertEquals(AmqpConstants.AMQP_SEQUENCE, exception.getSystemPropertyName());
}

partitionSender.sendSync(receivedEvent);
reSentAndReceivedEvent = receiver.receiveSync(10).iterator().next();
Assert.assertEquals(payload, new String(((List<Data>)(reSentAndReceivedEvent.getSystemProperties().get(AmqpConstants.AMQP_SEQUENCE))).get(0).getValue().getArray()));

try {
reSentAndReceivedEvent.getBody();
Assert.assertTrue(false); // getBody() should throw; this line shouldn't be reachable
} catch (UnexpectedEventDataBodyException exception) {
Assert.assertEquals(AmqpConstants.AMQP_SEQUENCE, exception.getSystemPropertyName());
}
}

@AfterClass
public static void cleanup() throws ServiceBusException
{
if (partitionMsgSender != null)
partitionMsgSender.closeSync();

if (receiver != null)
receiver.closeSync();

if (ehClient != null)
ehClient.closeSync();

if (msgFactory != null)
msgFactory.closeSync();
}
}
Loading

0 comments on commit ad9fddb

Please sign in to comment.