Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed bug leading to semaphore leak when filtering messages #1891

Merged
merged 16 commits into from
Aug 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@
import java.util.Optional;
import java.util.Set;

import static java.util.stream.Collectors.toList;

/**
* Implementation note: this class is partially mutable and may be accessed from multiple
* threads involved in message lifecycle, it must be thread safe.
Expand Down Expand Up @@ -53,6 +51,8 @@ public class Message implements FilterableMessage {

private long currentMessageBackoff = -1;

private boolean isFiltered = false;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we must ensure that the this value is visible to all threads (see the comment on top of this class)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have checked it and this is always used within a single thread.


public Message(String id,
String topic,
byte[] content,
Expand Down Expand Up @@ -122,7 +122,7 @@ public boolean isTtlExceeded(long ttlMillis) {

public synchronized void incrementRetryCounter(Collection<URI> succeededUris) {
this.retryCounter++;
this.succeededUris.addAll(succeededUris.stream().map(URI::toString).collect(toList()));
this.succeededUris.addAll(succeededUris.stream().map(URI::toString).toList());
}

public synchronized int getRetryCounter() {
Expand Down Expand Up @@ -206,6 +206,14 @@ public String getSubscription() {
return subscription;
}

public synchronized boolean isFiltered() {
return isFiltered;
}

public synchronized void setFiltered(boolean filtered) {
isFiltered = filtered;
}

public static class Builder {
private String id;
private PartitionOffset partitionOffset;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,15 +124,19 @@ public void consume(Runnable signalsInterrupt) {
if (maybeMessage.isPresent()) {
Message message = maybeMessage.get();

if (logger.isDebugEnabled()) {
logger.debug(
"Read message {} partition {} offset {}",
message.getContentType(), message.getPartition(), message.getOffset()
);
if (message.isFiltered()) {
profiler.flushMeasurements(ConsumerRun.FILTERED);
} else {
if (logger.isDebugEnabled()) {
logger.debug(
"Read message {} partition {} offset {}",
message.getContentType(), message.getPartition(), message.getOffset()
);
}

Message convertedMessage = messageConverterResolver.converterFor(message, subscription).convert(message, topic);
sendMessage(convertedMessage, profiler);
}

Message convertedMessage = messageConverterResolver.converterFor(message, subscription).convert(message, topic);
sendMessage(convertedMessage, profiler);
} else {
pendingOffsets.releaseSlot();
profiler.flushMeasurements(ConsumerRun.EMPTY);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,14 @@ private Optional<Message> readAndTransform(Subscription subscription, String bat
if (maybeMessage.isPresent()) {
Message message = maybeMessage.get();

Message transformed = messageConverterResolver.converterFor(message, subscription).convert(message, topic);
transformed = message().fromMessage(transformed).withData(wrap(subscription, transformed)).build();
if (!message.isFiltered()) {
Message transformed = messageConverterResolver.converterFor(message, subscription).convert(message, topic);
transformed = message().fromMessage(transformed).withData(wrap(subscription, transformed)).build();

trackers.get(subscription).logInflight(toMessageMetadata(transformed, subscription, batchId));
trackers.get(subscription).logInflight(toMessageMetadata(transformed, subscription, batchId));

return Optional.of(transformed);
return Optional.of(transformed);
}
}
return Optional.empty();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
package pl.allegro.tech.hermes.consumers.consumer.profiling;

public enum ConsumerRun {
EMPTY, DELIVERED, DISCARDED, RETRIED
EMPTY, DELIVERED, DISCARDED, RETRIED, FILTERED
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,21 @@

public interface MessageReceiver {

/**
* Retrieves the next available message from the queue.
*
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❤️

* <p>Depending on the context, the returned {@link Optional} can contain:
* <ul>
* <li>A {@link Message} that contains a valid message ready to be sent.</li>
* <li>A {@link Message} with the `isFiltered` flag set, indicating that the message
* has been filtered and should be skipped during processing or sending.</li>
* <li>{@code null}, indicating that there are no messages currently available in the queue.</li>
* </ul>
*
* @return an {@link Optional} containing the next {@link Message} if available;
* an {@link Optional} containing a filtered message if it should be skipped;
* or an empty {@link Optional} if there are no messages in the queue.
*/
Optional<Message> next();

default void stop() {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,14 @@ public FilteringMessageReceiver(MessageReceiver receiver,

@Override
public Optional<Message> next() {
return receiver.next().map(message ->
allow(message) ? message : null
);
return receiver.next().map(this::filter);
}

private boolean allow(Message message) {
private Message filter(Message message) {
FilterResult result = filterChain.apply(message);
filteredMessageHandler.handle(result, message, subscription);
return !result.isFiltered();
message.setFiltered(result.isFiltered());
return message;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import org.junit.jupiter.api.extension.RegisterExtension;
import pl.allegro.tech.hermes.api.BatchSubscriptionPolicy;
import pl.allegro.tech.hermes.api.ContentType;
import pl.allegro.tech.hermes.api.MessageFilterSpecification;
import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.integrationtests.setup.HermesExtension;
Expand All @@ -23,10 +24,14 @@
import static com.github.tomakehurst.wiremock.client.WireMock.post;
import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;
import static com.github.tomakehurst.wiremock.stubbing.Scenario.STARTED;
import static com.google.common.collect.ImmutableMap.of;
import static java.util.Arrays.stream;
import static org.awaitility.Awaitility.waitAtMost;
import static pl.allegro.tech.hermes.api.BatchSubscriptionPolicy.Builder.batchSubscriptionPolicy;
import static pl.allegro.tech.hermes.api.TopicWithSchema.topicWithSchema;
import static pl.allegro.tech.hermes.integrationtests.assertions.HermesAssertions.assertThatMetrics;
import static pl.allegro.tech.hermes.test.helper.builder.SubscriptionBuilder.subscription;
import static pl.allegro.tech.hermes.test.helper.builder.SubscriptionBuilder.subscriptionWithRandomName;
import static pl.allegro.tech.hermes.test.helper.builder.TopicBuilder.topicWithRandomName;

public class BatchDeliveryTest {
Expand All @@ -39,10 +44,19 @@ public class BatchDeliveryTest {
@RegisterExtension
public static final TestSubscribersExtension subscribers = new TestSubscribersExtension();

static final AvroUser BOB = new AvroUser("Bob", 50, "blue");

static final AvroUser ALICE = new AvroUser("Alice", 20, "magenta");

private static final TestMessage[] SMALL_BATCH = TestMessage.simpleMessages(2);

private static final TestMessage SINGLE_MESSAGE = TestMessage.simple();

private static final TestMessage SINGLE_MESSAGE_FILTERED = BOB.asTestMessage();

private static final MessageFilterSpecification MESSAGE_NAME_FILTER =
new MessageFilterSpecification(of("type", "jsonpath", "path", ".name", "matcher", "^Bob.*"));

@Test
public void shouldDeliverMessagesInBatch() {
// given
Expand All @@ -67,6 +81,43 @@ public void shouldDeliverMessagesInBatch() {
expectSingleBatch(subscriber, SMALL_BATCH);
}

@Test
public void shouldFilterIncomingEventsForBatch() {
// given
TestSubscriber subscriber = subscribers.createSubscriber();
Topic topic = hermes.initHelper().createTopic(topicWithRandomName().build());
final Subscription subscription = hermes.initHelper().createSubscription(subscriptionWithRandomName(topic.getName(), subscriber.getEndpoint())
.withSubscriptionPolicy(buildBatchPolicy()
.withBatchSize(2)
.withBatchTime(3)
.withBatchVolume(1024)
.build())
.withFilter(MESSAGE_NAME_FILTER)
.build());

// when
hermes.api().publishUntilSuccess(topic.getQualifiedName(), BOB.asJson());
hermes.api().publishUntilSuccess(topic.getQualifiedName(), ALICE.asJson());

// then
expectSingleBatch(subscriber, SINGLE_MESSAGE_FILTERED);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestions:

  1. check the value of metric subscription.filtered-out
  2. publish bob first - he is getting filtered
  3. maybe set batch size to 2, and batch time to some small value?

waitAtMost(Duration.ofSeconds(10)).untilAsserted(() ->
hermes.api().getConsumersMetrics()
.expectStatus()
.isOk()
.expectBody(String.class)
.value((body) -> assertThatMetrics(body)
.contains("hermes_consumers_subscription_filtered_out_total")
.withLabels(
"group", topic.getName().getGroupName(),
"subscription", subscription.getName(),
"topic", topic.getName().getName()
)
.withValue(1.0)
)
);
}

@Test
public void shouldDeliverBatchInGivenTimePeriod() {
// given
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,69 @@ public void shouldReportMetricForFilteredSubscription() {
);
}

@Test
public void shouldNotIncreaseInflightForFilteredSubscription() {
// given
TestSubscriber subscriber = subscribers.createSubscriber(503);
Topic topic = hermes.initHelper().createTopic(topicWithRandomName().build());
final Subscription subscription = hermes.initHelper().createSubscription(
subscription(topic, "subscription")
.withEndpoint(subscriber.getEndpoint())
.withSubscriptionPolicy(
subscriptionPolicy()
.withMessageTtl(3600)
.withInflightSize(1)
.build()
)
.withFilter(filterMatchingHeaderByPattern("Trace-Id", "^vte.*"))
.build()
);
TestMessage unfiltered = TestMessage.of("msg", "unfiltered");
TestMessage filtered = TestMessage.of("msg", "filtered");

// when
hermes.api().publishUntilSuccess(topic.getQualifiedName(), filtered.body(), header("Trace-Id", "otherTraceId"));
hermes.api().publishUntilSuccess(topic.getQualifiedName(), filtered.body(), header("Trace-Id", "otherTraceId"));

// then
waitAtMost(Duration.ofSeconds(10)).untilAsserted(() ->
hermes.api().getConsumersMetrics()
.expectStatus()
.isOk()
.expectBody(String.class)
.value((body) -> assertThatMetrics(body)
.contains("hermes_consumers_subscription_inflight")
.withLabels(
"group", topic.getName().getGroupName(),
"subscription", subscription.getName(),
"topic", topic.getName().getName()
)
.withValue(0.0)
)
);

// when
hermes.api().publishUntilSuccess(topic.getQualifiedName(), unfiltered.body(), header("Trace-Id", "vte12"));
hermes.api().publishUntilSuccess(topic.getQualifiedName(), unfiltered.body(), header("Trace-Id", "vte12"));

// then
waitAtMost(Duration.ofSeconds(10)).untilAsserted(() ->
hermes.api().getConsumersMetrics()
.expectStatus()
.isOk()
.expectBody(String.class)
.value((body) -> assertThatMetrics(body)
.contains("hermes_consumers_subscription_inflight")
.withLabels(
"group", topic.getName().getGroupName(),
"subscription", subscription.getName(),
"topic", topic.getName().getName()
)
.withValue(1.0)
)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why 1.0 instead of 2.0?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because previously we set subscriptionPolicy().withInflightSize(1) and we want to be sure, that this value wouldn't be exceeded when there are more messages and there were messages filtered previously.

);
}

@Test
public void shouldReportMetricsForSuccessfulBatchDelivery() {
// given
Expand Down
Loading