From 2673b98b138cffeb6591291dcc8aae3c2d85783e Mon Sep 17 00:00:00 2001 From: Maciej Moscicki Date: Tue, 25 Jun 2024 13:00:39 +0200 Subject: [PATCH] Allow for 'empty' broadcast send attempt (#1876) --- .../hermes/consumers/consumer/Message.java | 4 ++ .../http/JettyBroadCastMessageSender.java | 32 ++++++++++--- .../JettyBroadCastMessageSenderTest.groovy | 45 ++++++++++++++++++- 3 files changed, 75 insertions(+), 6 deletions(-) diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/Message.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/Message.java index c0d5c5df17..c0ed185669 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/Message.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/Message.java @@ -138,6 +138,10 @@ public String getId() { return id; } + public synchronized Set getSucceededUris() { + return succeededUris; + } + @Override public Map getExternalMetadata() { return externalMetadata; diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/JettyBroadCastMessageSender.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/JettyBroadCastMessageSender.java index 73cabc57e8..31e86f7a45 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/JettyBroadCastMessageSender.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/JettyBroadCastMessageSender.java @@ -17,8 +17,12 @@ import pl.allegro.tech.hermes.consumers.consumer.sender.resolver.ResolvableEndpointAddress; import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; import java.util.Collections; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.function.Function; import java.util.stream.Collectors; @@ -58,17 +62,35 @@ public CompletableFuture send(Message message) { private CompletableFuture> sendMessage(Message message) { try { - List> results = collectResults(message); + Set> results = collectResults(message); return mergeResults(results); } catch (EndpointAddressResolutionException exception) { return CompletableFuture.completedFuture(Collections.singletonList(exceptionMapper.apply(exception))); } } - private List> collectResults( + private Set> collectResults( Message message ) throws EndpointAddressResolutionException { + var currentResults = sendPendingMessages(message); + var results = new HashSet<>(currentResults); + + // add previously succeeded uris to the result set so that successful uris from all attempts are retained. + // this way a MessageSendingResult can be considered successful even when the last send attempt + // did not send to any uri, e.g. because all uris returned by endpoint resolver were already sent to in the past. + for (String succeededUri : message.getSucceededUris()) { + try { + var uri = new URI(succeededUri); + var result = MessageSendingResult.succeededResult(uri); + results.add(CompletableFuture.completedFuture(result)); + } catch (URISyntaxException exception) { + logger.error("Error while parsing already sent broadcast URI {}", succeededUri, exception); + } + } + return results; + } + private Set> sendPendingMessages(Message message) throws EndpointAddressResolutionException { final HttpRequestData requestData = new HttpRequestDataBuilder() .withRawAddress(endpoint.getRawAddress()) .build(); @@ -80,16 +102,16 @@ private List> collectResults( if (resolvedUris.isEmpty()) { logger.debug("Empty resolved URIs for message: {}", message.getId()); - return Collections.emptyList(); + return Collections.emptySet(); } else { return resolvedUris.stream() .map(uri -> requestFactory.buildRequest(message, uri, headers)) .map(this::processResponse) - .collect(Collectors.toList()); + .collect(Collectors.toSet()); } } - private CompletableFuture> mergeResults(List> results) { + private CompletableFuture> mergeResults(Set> results) { return CompletableFuture.allOf(results.toArray(new CompletableFuture[results.size()])) .thenApply(v -> results.stream() .map(CompletableFuture::join) diff --git a/hermes-consumers/src/test/groovy/pl/allegro/tech/hermes/consumers/consumer/sender/http/JettyBroadCastMessageSenderTest.groovy b/hermes-consumers/src/test/groovy/pl/allegro/tech/hermes/consumers/consumer/sender/http/JettyBroadCastMessageSenderTest.groovy index 06d1bf7832..10688bb01a 100644 --- a/hermes-consumers/src/test/groovy/pl/allegro/tech/hermes/consumers/consumer/sender/http/JettyBroadCastMessageSenderTest.groovy +++ b/hermes-consumers/src/test/groovy/pl/allegro/tech/hermes/consumers/consumer/sender/http/JettyBroadCastMessageSenderTest.groovy @@ -150,7 +150,7 @@ class JettyBroadCastMessageSenderTest extends Specification { future.get(1, TimeUnit.SECONDS).succeeded() } - def "should return not succeeded and retry later when endpoint resolver return no hosts"() { + def "should return not succeeded and retry later when endpoint resolver return no hosts and no message was sent previously"() { given: def address = Stub(ResolvableEndpointAddress) { resolveAllFor(_ as Message) >> [] @@ -173,6 +173,49 @@ class JettyBroadCastMessageSenderTest extends Specification { messageSendingResult.isRetryLater() } + def "should return succeeded when endpoint resolver return no hosts and but message was sent previously"() { + given: + Message message = testMessage() + message.incrementRetryCounter([serviceEndpoints[0].url]) + def address = Stub(ResolvableEndpointAddress) { + resolveAllFor(_ as Message) >> [] + + getRawAddress() >> endpoint + } + + def httpRequestFactory = new DefaultHttpRequestFactory(client, 1000, 1000, new DefaultHttpMetadataAppender()) + MessageSender messageSender = new JettyBroadCastMessageSender(httpRequestFactory, address, + requestHeadersProvider, resultHandlersProvider, Mock(ResilientMessageSender)) + + when: + def future = messageSender.send(message) + + then: + MessageSendingResult messageSendingResult = future.get(1, TimeUnit.SECONDS) + + messageSendingResult.succeeded() + } + + + def "should return succeeded when endpoint resolver returns the same urls that the message was already sent to"() { + given: "a message that was sent" + ConsumerRateLimiter rateLimiter = Mock(ConsumerRateLimiter) { + 0 * registerSuccessfulSending() + } + + serviceEndpoints.forEach { endpoint -> endpoint.expectMessages(TEST_MESSAGE_CONTENT) } + + Message message = testMessage() + message.incrementRetryCounter(serviceEndpoints.collect { it.url }) + + when: + def future = getSender(rateLimiter).send(message) + + then: + MessageSendingResult messageSendingResult = future.get(1, TimeUnit.SECONDS) + messageSendingResult.succeeded() + } + def cleanupSpec() { wireMockServers.forEach { it.stop() } client.stop()