Skip to content

Commit

Permalink
Allow for 'empty' broadcast send attempt (#1876)
Browse files Browse the repository at this point in the history
  • Loading branch information
moscicky authored Jun 25, 2024
1 parent 012581a commit 2673b98
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,10 @@ public String getId() {
return id;
}

public synchronized Set<String> getSucceededUris() {
return succeededUris;
}

@Override
public Map<String, String> getExternalMetadata() {
return externalMetadata;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,12 @@
import pl.allegro.tech.hermes.consumers.consumer.sender.resolver.ResolvableEndpointAddress;

import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -58,17 +62,35 @@ public CompletableFuture<MessageSendingResult> send(Message message) {

private CompletableFuture<List<SingleMessageSendingResult>> sendMessage(Message message) {
try {
List<CompletableFuture<SingleMessageSendingResult>> results = collectResults(message);
Set<CompletableFuture<SingleMessageSendingResult>> results = collectResults(message);
return mergeResults(results);
} catch (EndpointAddressResolutionException exception) {
return CompletableFuture.completedFuture(Collections.singletonList(exceptionMapper.apply(exception)));
}
}

private List<CompletableFuture<SingleMessageSendingResult>> collectResults(
private Set<CompletableFuture<SingleMessageSendingResult>> collectResults(
Message message
) throws EndpointAddressResolutionException {
var currentResults = sendPendingMessages(message);
var results = new HashSet<>(currentResults);

// add previously succeeded uris to the result set so that successful uris from all attempts are retained.
// this way a MessageSendingResult can be considered successful even when the last send attempt
// did not send to any uri, e.g. because all uris returned by endpoint resolver were already sent to in the past.
for (String succeededUri : message.getSucceededUris()) {
try {
var uri = new URI(succeededUri);
var result = MessageSendingResult.succeededResult(uri);
results.add(CompletableFuture.completedFuture(result));
} catch (URISyntaxException exception) {
logger.error("Error while parsing already sent broadcast URI {}", succeededUri, exception);
}
}
return results;
}

private Set<CompletableFuture<SingleMessageSendingResult>> sendPendingMessages(Message message) throws EndpointAddressResolutionException {
final HttpRequestData requestData = new HttpRequestDataBuilder()
.withRawAddress(endpoint.getRawAddress())
.build();
Expand All @@ -80,16 +102,16 @@ private List<CompletableFuture<SingleMessageSendingResult>> collectResults(

if (resolvedUris.isEmpty()) {
logger.debug("Empty resolved URIs for message: {}", message.getId());
return Collections.emptyList();
return Collections.emptySet();
} else {
return resolvedUris.stream()
.map(uri -> requestFactory.buildRequest(message, uri, headers))
.map(this::processResponse)
.collect(Collectors.toList());
.collect(Collectors.toSet());
}
}

private CompletableFuture<List<SingleMessageSendingResult>> mergeResults(List<CompletableFuture<SingleMessageSendingResult>> results) {
private CompletableFuture<List<SingleMessageSendingResult>> mergeResults(Set<CompletableFuture<SingleMessageSendingResult>> results) {
return CompletableFuture.allOf(results.toArray(new CompletableFuture[results.size()]))
.thenApply(v -> results.stream()
.map(CompletableFuture::join)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ class JettyBroadCastMessageSenderTest extends Specification {
future.get(1, TimeUnit.SECONDS).succeeded()
}

def "should return not succeeded and retry later when endpoint resolver return no hosts"() {
def "should return not succeeded and retry later when endpoint resolver return no hosts and no message was sent previously"() {
given:
def address = Stub(ResolvableEndpointAddress) {
resolveAllFor(_ as Message) >> []
Expand All @@ -173,6 +173,49 @@ class JettyBroadCastMessageSenderTest extends Specification {
messageSendingResult.isRetryLater()
}

def "should return succeeded when endpoint resolver return no hosts and but message was sent previously"() {
given:
Message message = testMessage()
message.incrementRetryCounter([serviceEndpoints[0].url])
def address = Stub(ResolvableEndpointAddress) {
resolveAllFor(_ as Message) >> []

getRawAddress() >> endpoint
}

def httpRequestFactory = new DefaultHttpRequestFactory(client, 1000, 1000, new DefaultHttpMetadataAppender())
MessageSender messageSender = new JettyBroadCastMessageSender(httpRequestFactory, address,
requestHeadersProvider, resultHandlersProvider, Mock(ResilientMessageSender))

when:
def future = messageSender.send(message)

then:
MessageSendingResult messageSendingResult = future.get(1, TimeUnit.SECONDS)

messageSendingResult.succeeded()
}


def "should return succeeded when endpoint resolver returns the same urls that the message was already sent to"() {
given: "a message that was sent"
ConsumerRateLimiter rateLimiter = Mock(ConsumerRateLimiter) {
0 * registerSuccessfulSending()
}

serviceEndpoints.forEach { endpoint -> endpoint.expectMessages(TEST_MESSAGE_CONTENT) }

Message message = testMessage()
message.incrementRetryCounter(serviceEndpoints.collect { it.url })

when:
def future = getSender(rateLimiter).send(message)

then:
MessageSendingResult messageSendingResult = future.get(1, TimeUnit.SECONDS)
messageSendingResult.succeeded()
}

def cleanupSpec() {
wireMockServers.forEach { it.stop() }
client.stop()
Expand Down

0 comments on commit 2673b98

Please sign in to comment.