Skip to content

Commit

Permalink
Make filtered message receiver always returning a message
Browse files Browse the repository at this point in the history
  • Loading branch information
szczygiel-m committed Aug 14, 2024
1 parent ffcc75b commit d2a4130
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ public class Message implements FilterableMessage {

private long currentMessageBackoff = -1;

private boolean isFiltered = false;

public Message(String id,
String topic,
byte[] content,
Expand Down Expand Up @@ -122,7 +124,7 @@ public boolean isTtlExceeded(long ttlMillis) {

public synchronized void incrementRetryCounter(Collection<URI> succeededUris) {
this.retryCounter++;
this.succeededUris.addAll(succeededUris.stream().map(URI::toString).collect(toList()));
this.succeededUris.addAll(succeededUris.stream().map(URI::toString).toList());
}

public synchronized int getRetryCounter() {
Expand Down Expand Up @@ -206,6 +208,14 @@ public String getSubscription() {
return subscription;
}

public boolean isFiltered() {
return isFiltered;
}

public void setFiltered(boolean filtered) {
isFiltered = filtered;
}

public static class Builder {
private String id;
private PartitionOffset partitionOffset;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,15 +124,19 @@ public void consume(Runnable signalsInterrupt) {
if (maybeMessage.isPresent()) {
Message message = maybeMessage.get();

if (logger.isDebugEnabled()) {
logger.debug(
"Read message {} partition {} offset {}",
message.getContentType(), message.getPartition(), message.getOffset()
);
if (message.isFiltered()) {
profiler.flushMeasurements(ConsumerRun.FILTERED);
} else {
if (logger.isDebugEnabled()) {
logger.debug(
"Read message {} partition {} offset {}",
message.getContentType(), message.getPartition(), message.getOffset()
);
}

Message convertedMessage = messageConverterResolver.converterFor(message, subscription).convert(message, topic);
sendMessage(convertedMessage, profiler);
}

Message convertedMessage = messageConverterResolver.converterFor(message, subscription).convert(message, topic);
sendMessage(convertedMessage, profiler);
} else {
pendingOffsets.releaseSlot();
profiler.flushMeasurements(ConsumerRun.EMPTY);
Expand Down Expand Up @@ -190,7 +194,7 @@ private void initializeMessageReceiver() {
rateLimiter,
loadRecorder,
metrics,
pendingOffsets::markAsFiltered
pendingOffsets::markAsProcessed
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,6 @@ public void markAsProcessed(SubscriptionPartitionOffset subscriptionPartitionOff
slots.put(subscriptionPartitionOffset, MessageState.PROCESSED);
}

public void markAsFiltered(SubscriptionPartitionOffset subscriptionPartitionOffset) {
slots.put(subscriptionPartitionOffset, MessageState.FILTERED);
}

public boolean tryAcquireSlot(Duration processingInterval) throws InterruptedException {
if (inflightSemaphore.tryAcquire(processingInterval.toMillis(), TimeUnit.MILLISECONDS)) {
if (maxPendingOffsetsSemaphore.tryAcquire(processingInterval.toMillis(), TimeUnit.MILLISECONDS)) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
package pl.allegro.tech.hermes.consumers.consumer.profiling;

public enum ConsumerRun {
EMPTY, DELIVERED, DISCARDED, RETRIED
EMPTY, DELIVERED, DISCARDED, RETRIED, FILTERED
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,14 @@ public FilteringMessageReceiver(MessageReceiver receiver,

@Override
public Optional<Message> next() {
return receiver.next().map(message ->
allow(message) ? message : null
);
return receiver.next().map(this::filter);
}

private boolean allow(Message message) {
private Message filter(Message message) {
FilterResult result = filterChain.apply(message);
filteredMessageHandler.handle(result, message, subscription);
return !result.isFiltered();
message.setFiltered(result.isFiltered());
return message;
}

@Override
Expand Down

0 comments on commit d2a4130

Please sign in to comment.