Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add requeued message count metric #1476

Merged
merged 5 commits into from
Nov 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,14 @@
<spotless.check.skip>true</spotless.check.skip>
<slf4j.version>1.7.36</slf4j.version>
<metrics.version>4.2.28</metrics.version>
<micrometer.version>1.13.6</micrometer.version>
<opentelemetry.version>1.43.0</opentelemetry.version>
<micrometer.version>1.13.7</micrometer.version>
<opentelemetry.version>1.44.1</opentelemetry.version>
<jackson.version>2.18.1</jackson.version>
<logback.version>1.2.13</logback.version>
<junit.jupiter.version>5.11.3</junit.jupiter.version>
<mockito.version>5.14.2</mockito.version>
<assertj.version>3.26.3</assertj.version>
<micrometer-tracing-test.version>1.3.5</micrometer-tracing-test.version>
<micrometer-tracing-test.version>1.3.6</micrometer-tracing-test.version>
<micrometer-docs-generator.version>1.0.4</micrometer-docs-generator.version>
<jetty.version>9.4.56.v20240826</jetty.version>
<bouncycastle.version>1.79</bouncycastle.version>
Expand Down
10 changes: 9 additions & 1 deletion src/main/java/com/rabbitmq/client/MetricsCollector.java
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2007-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
//
// This software, the RabbitMQ Java client library, is triple-licensed under the
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
Expand Down Expand Up @@ -60,8 +60,16 @@ default void basicPublishUnrouted(Channel channel) {

void basicNack(Channel channel, long deliveryTag);

default void basicNack(Channel channel, long deliveryTag, boolean requeue) {
this.basicNack(channel, deliveryTag);
}

void basicReject(Channel channel, long deliveryTag);

default void basicReject(Channel channel, long deliveryTag, boolean requeue) {
this.basicReject(channel, deliveryTag);
}

void basicConsume(Channel channel, String consumerTag, boolean autoAck);

void basicCancel(Channel channel, String consumerTag);
Expand Down
12 changes: 11 additions & 1 deletion src/main/java/com/rabbitmq/client/NoOpMetricsCollector.java
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2007-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
//
// This software, the RabbitMQ Java client library, is triple-licensed under the
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
Expand Down Expand Up @@ -50,11 +50,21 @@ public void basicNack(Channel channel, long deliveryTag) {

}

@Override
public void basicNack(Channel channel, long deliveryTag, boolean requeue) {

}

@Override
public void basicReject(Channel channel, long deliveryTag) {

}

@Override
public void basicReject(Channel channel, long deliveryTag, boolean requeue) {

}

@Override
public void basicConsume(Channel channel, String consumerTag, boolean autoAck) {

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2007-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
//
// This software, the RabbitMQ Java client library, is triple-licensed under the
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
Expand Down Expand Up @@ -43,7 +43,7 @@ public abstract class AbstractMetricsCollector implements MetricsCollector {

private final Runnable markAcknowledgedMessageAction = () -> markAcknowledgedMessage();

private final Runnable markRejectedMessageAction = () -> markRejectedMessage();
private final Function<Boolean, Runnable> markRejectedMessageAction;

private final Runnable markMessagePublishAcknowledgedAction = () -> markMessagePublishAcknowledged();

Expand All @@ -53,6 +53,12 @@ public abstract class AbstractMetricsCollector implements MetricsCollector {

private static final Function<ChannelState, Set<Long>> GET_UNCONFIRMED_DTAGS = channelState -> channelState.unconfirmedMessageDeliveryTags;

public AbstractMetricsCollector() {
Runnable rejectRequeue = () -> markRejectedMessage(true);
Runnable rejectNoRequeue = () -> markRejectedMessage(false);
this.markRejectedMessageAction = requeue -> requeue ? rejectRequeue : rejectNoRequeue;
}

@Override
public void newConnection(final Connection connection) {
try {
Expand Down Expand Up @@ -237,17 +243,27 @@ public void basicAck(Channel channel, long deliveryTag, boolean multiple) {

@Override
public void basicNack(Channel channel, long deliveryTag) {
// replaced by #basicNack(Channel, long, boolean)
}

@Override
public void basicNack(Channel channel, long deliveryTag, boolean requeue) {
try {
updateChannelStateAfterAckReject(channel, deliveryTag, true, GET_UNACKED_DTAGS, markRejectedMessageAction);
updateChannelStateAfterAckReject(channel, deliveryTag, true, GET_UNACKED_DTAGS, markRejectedMessageAction.apply(requeue));
} catch(Exception e) {
LOGGER.info("Error while computing metrics in basicNack: " + e.getMessage());
}
}

@Override
public void basicReject(Channel channel, long deliveryTag) {
// replaced by #basicReject(Channel, long, boolean)
}

@Override
public void basicReject(Channel channel, long deliveryTag, boolean requeue) {
try {
updateChannelStateAfterAckReject(channel, deliveryTag, false, GET_UNACKED_DTAGS, markRejectedMessageAction);
updateChannelStateAfterAckReject(channel, deliveryTag, false, GET_UNACKED_DTAGS, markRejectedMessageAction.apply(requeue));
} catch(Exception e) {
LOGGER.info("Error while computing metrics in basicReject: " + e.getMessage());
}
Expand Down Expand Up @@ -408,9 +424,18 @@ private ChannelState(Channel channel) {

/**
* Marks the event of a rejected message.
*
* @deprecated Use {@link #markRejectedMessage(boolean)} instead
*/
protected abstract void markRejectedMessage();

/**
* Marks the event of a rejected message.
*/
protected void markRejectedMessage(boolean requeue) {
this.markRejectedMessage();
}

/**
* Marks the event of a message publishing acknowledgement.
*/
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/com/rabbitmq/client/impl/ChannelN.java
Original file line number Diff line number Diff line change
Expand Up @@ -1213,7 +1213,7 @@ public void basicNack(long deliveryTag, boolean multiple, boolean requeue)
throws IOException
{
transmit(new Basic.Nack(deliveryTag, multiple, requeue));
metricsCollector.basicNack(this, deliveryTag);
metricsCollector.basicNack(this, deliveryTag, requeue);
}

/** Public API - {@inheritDoc} */
Expand All @@ -1222,7 +1222,7 @@ public void basicReject(long deliveryTag, boolean requeue)
throws IOException
{
transmit(new Basic.Reject(deliveryTag, requeue));
metricsCollector.basicReject(this, deliveryTag);
metricsCollector.basicReject(this, deliveryTag, requeue);
}

/** Public API - {@inheritDoc} */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ public class MicrometerMetricsCollector extends AbstractMetricsCollector {

private final Counter rejectedMessages;

private final Counter requeuedMessages;

public MicrometerMetricsCollector(MeterRegistry registry) {
this(registry, "rabbitmq");
}
Expand Down Expand Up @@ -90,6 +92,7 @@ public MicrometerMetricsCollector(Function<Metrics, Object> metricsCreator) {
this.ackedPublishedMessages = (Counter) metricsCreator.apply(ACKED_PUBLISHED_MESSAGES);
this.nackedPublishedMessages = (Counter) metricsCreator.apply(NACKED_PUBLISHED_MESSAGES);
this.unroutedPublishedMessages = (Counter) metricsCreator.apply(UNROUTED_PUBLISHED_MESSAGES);
this.requeuedMessages = (Counter) metricsCreator.apply(REQUEUED_MESSAGES);
}

@Override
Expand Down Expand Up @@ -133,7 +136,16 @@ protected void markAcknowledgedMessage() {
}

@Override
@SuppressWarnings("deprecation")
protected void markRejectedMessage() {

}

@Override
protected void markRejectedMessage(boolean requeue) {
if (requeue) {
requeuedMessages.increment();
}
rejectedMessages.increment();
}

Expand Down Expand Up @@ -192,6 +204,10 @@ public Counter getRejectedMessages() {
return rejectedMessages;
}

public Counter getRequeuedMessages() {
return requeuedMessages;
}

public enum Metrics {
CONNECTIONS {
@Override
Expand Down Expand Up @@ -229,6 +245,12 @@ Object create(MeterRegistry registry, String prefix, Iterable<Tag> tags) {
return registry.counter(prefix + ".rejected", tags);
}
},
REQUEUED_MESSAGES {
@Override
Object create(MeterRegistry registry, String prefix, Iterable<Tag> tags) {
return registry.counter(prefix + ".requeued", tags);
}
},
FAILED_TO_PUBLISH_MESSAGES {
@Override
Object create(MeterRegistry registry, String prefix, Iterable<Tag> tags) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
// Copyright (c) 2023-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
//
// This software, the RabbitMQ Java client library, is triple-licensed under the
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
Expand Down Expand Up @@ -46,6 +46,7 @@ public class OpenTelemetryMetricsCollector extends AbstractMetricsCollector {
private final LongCounter ackedPublishedMessagesCounter;
private final LongCounter nackedPublishedMessagesCounter;
private final LongCounter unroutedPublishedMessagesCounter;
private final LongCounter requeuedMessagesCounter;

public OpenTelemetryMetricsCollector(OpenTelemetry openTelemetry) {
this(openTelemetry, "rabbitmq");
Expand Down Expand Up @@ -100,6 +101,12 @@ public OpenTelemetryMetricsCollector(final OpenTelemetry openTelemetry, final St
.setDescription("The number of messages rejected from the RabbitMQ server")
.build();

// requeuedPublishedMessages
this.requeuedMessagesCounter = meter.counterBuilder(prefix + ".requeued")
.setUnit("{messages}")
.setDescription("The number of re-queued messages to the RabbitMQ server")
.build();

// failedToPublishMessages
this.failedToPublishMessagesCounter = meter.counterBuilder(prefix + ".failed_to_publish")
.setUnit("{messages}")
Expand Down Expand Up @@ -166,7 +173,16 @@ protected void markAcknowledgedMessage() {
}

@Override
@SuppressWarnings("deprecation")
protected void markRejectedMessage() {

}

@Override
protected void markRejectedMessage(boolean requeue) {
if (requeue) {
requeuedMessagesCounter.add(1L, attributes);
}
rejectedMessagesCounter.add(1L, attributes);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2007-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
//
// This software, the RabbitMQ Java client library, is triple-licensed under the
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
Expand Down Expand Up @@ -41,12 +41,12 @@ public class StandardMetricsCollector extends AbstractMetricsCollector {
private final Meter consumedMessages;
private final Meter acknowledgedMessages;
private final Meter rejectedMessages;
private final Meter requeuedMessages;
private final Meter failedToPublishMessages;
private final Meter publishAcknowledgedMessages;
private final Meter publishNacknowledgedMessages;
private final Meter publishUnroutedMessages;


public StandardMetricsCollector(MetricRegistry registry, String metricsPrefix) {
this.registry = registry;
this.connections = registry.counter(metricsPrefix+".connections");
Expand All @@ -59,6 +59,7 @@ public StandardMetricsCollector(MetricRegistry registry, String metricsPrefix) {
this.consumedMessages = registry.meter(metricsPrefix+".consumed");
this.acknowledgedMessages = registry.meter(metricsPrefix+".acknowledged");
this.rejectedMessages = registry.meter(metricsPrefix+".rejected");
this.requeuedMessages = registry.meter(metricsPrefix+".requeued");
}

public StandardMetricsCollector() {
Expand Down Expand Up @@ -110,7 +111,16 @@ protected void markAcknowledgedMessage() {
}

@Override
@SuppressWarnings("deprecation")
protected void markRejectedMessage() {

}

@Override
protected void markRejectedMessage(boolean requeue) {
if (requeue) {
requeuedMessages.mark();
}
rejectedMessages.mark();
}

Expand Down Expand Up @@ -157,6 +167,10 @@ public Meter getRejectedMessages() {
return rejectedMessages;
}

public Meter getRequeuedMessages() {
return this.requeuedMessages;
}

public Meter getFailedToPublishMessages() {
return failedToPublishMessages;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public void basicNack(long deliveryTag, boolean multiple, boolean requeue) throw
return;
}
transmit(new Basic.Nack(realTag, multiple, requeue));
metricsCollector.basicNack(this, deliveryTag);
metricsCollector.basicNack(this, deliveryTag, requeue);
}

@Override
Expand All @@ -137,7 +137,7 @@ public void basicReject(long deliveryTag, boolean requeue) throws IOException {
long realTag = deliveryTag - activeDeliveryTagOffset;
if (realTag > 0) {
transmit(new Basic.Reject(realTag, requeue));
metricsCollector.basicReject(this, deliveryTag);
metricsCollector.basicReject(this, deliveryTag, requeue);
}
}

Expand Down
Loading