Skip to content

Commit

Permalink
QPID-8571 : Non-unique consumer tags created for AMPQ 0-9-1
Browse files Browse the repository at this point in the history
  • Loading branch information
rgodfrey committed Jan 31, 2025
1 parent 05686f8 commit d31f04f
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0

/** Maps from consumer tag to subscription instance. Allows us to unsubscribe from a queue. */
private final Map<AMQShortString, ConsumerTarget_0_8> _tag2SubscriptionTargetMap = new HashMap<>();
private final Set<AMQShortString> _nonGeneratedTags = new HashSet<>();

private final MessageStore _messageStore;

Expand Down Expand Up @@ -567,7 +568,16 @@ private AMQShortString consumeFromSource(AMQShortString tag, Collection<MessageS
{
if (tag == null)
{
tag = AMQShortString.createAMQShortString("sgen_" + getNextConsumerTag());
do {
tag = AMQShortString.createAMQShortString("sgen_" + getNextConsumerTag());
}
while(_nonGeneratedTags.contains(tag));
}
else {
if(!_nonGeneratedTags.add(tag)) {
throw new ConsumerTagInUseException("Consumer already exists with same tag: " + tag);
}

}

if (_tag2SubscriptionTargetMap.containsKey(tag))
Expand Down Expand Up @@ -699,6 +709,7 @@ else if(value instanceof String || value instanceof AMQShortString)
| MessageSource.ConsumerAccessRefused e)
{
_tag2SubscriptionTargetMap.remove(tag);
_nonGeneratedTags.remove(tag);
throw e;
}
return tag;
Expand All @@ -716,6 +727,7 @@ private boolean unsubscribeConsumer(AMQShortString consumerTag)
LOGGER.debug("Unsubscribing consumer '{}' on channel {}", consumerTag, this);
}

_nonGeneratedTags.remove(consumerTag);
ConsumerTarget_0_8 target = _tag2SubscriptionTargetMap.remove(consumerTag);
if (target != null)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,7 @@
*/
package org.apache.qpid.tests.protocol.v0_8;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.CoreMatchers.*;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assumptions.assumeTrue;

Expand Down Expand Up @@ -292,6 +289,39 @@ public void consumeMessage() throws Exception
}
}

@Test
@SpecificationTest(section = "1.8.3.3", description = "If no consumer tag is given then the server should generate " +
"a unique tag, not clashing with an existing tag")
public void serverGeneratedTagUniqueness() throws Exception
{
try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();


String queueName = BrokerAdmin.TEST_QUEUE_NAME;
String consumerTag = "sgen_1";
interaction.negotiateOpen()
.channel().open()
.consumeResponse(ChannelOpenOkBody.class)
.basic().qosPrefetchCount(1)
.qos()
.consumeResponse(BasicQosOkBody.class)
.basic().consumeConsumerTag(consumerTag)
.consumeQueue(queueName)
.consume()
.consumeResponse(BasicConsumeOkBody.class)
.channel().flow(true)
.consumeResponse(ChannelFlowOkBody.class);

BasicConsumeOkBody consumeOK = interaction.basic().consumeConsumerTag("")
.consumeQueue(queueName)
.consume()
.consumeResponse(BasicConsumeOkBody.class).getLatestResponse(BasicConsumeOkBody.class);
assertThat(consumeOK.getConsumerTag().toString(), is(not(equalTo(consumerTag))));
}
}

@Test
@SpecificationTest(section = "1.8.3.13",
description = "The server MUST validate that a non-zero delivery-tag refers to a delivered message,"
Expand Down

0 comments on commit d31f04f

Please sign in to comment.