Skip to content

Commit

Permalink
Merge pull request #577 from aihnat/481_add_retries_delays_in_client
Browse files Browse the repository at this point in the history
(Resolving #481) adding exponential delay between retries
  • Loading branch information
druminski authored Sep 16, 2016
2 parents 52fbd47 + c3f53f7 commit 673c859
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 25 deletions.
1 change: 1 addition & 0 deletions docs/docs/user/java-client.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ Client should be always built using `HermesClientBuilder`, which allows on setti
HermesClient client = HermesClientBuilder.hermesClient(...)
.withURI(...) // Hermes URI
.withRetries(...) // how many times retry in case of errors, default: 3
.withRetrySleep(...) // initial and max delay between consecutive retries in milliseconds, default: 100ms (initial), 300ms (max)
.withDefaultContentType(...) // what Content-Type to use when none set, default: application/json
.withDefaultHeaderValue(...) // append default headers added to each message
.withMetrics(metricsRegistry) // see Metrics section below
Expand Down
1 change: 1 addition & 0 deletions hermes-client/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ dependencies {
provided group: 'com.squareup.okhttp3', name: 'okhttp', version: '3.2.0'
provided group: 'org.eclipse.jetty.alpn', name: 'alpn-api', version: versions.alpn_api

compile group: 'net.jodah', name: 'failsafe', version: '0.9.3'

testCompile group: 'org.spockframework', name: 'spock-core', version: versions.spock
testCompile group: 'com.github.tomakehurst', name: 'wiremock', version: versions.wiremock
Expand Down
Original file line number Diff line number Diff line change
@@ -1,42 +1,55 @@
package pl.allegro.tech.hermes.client;

import net.jodah.failsafe.Failsafe;
import net.jodah.failsafe.RetryPolicy;

import java.net.URI;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Predicate;

import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.stream.IntStream.range;
import static pl.allegro.tech.hermes.client.HermesMessage.hermesMessage;

public class HermesClient {

private final HermesSender sender;
private final String uri;
private final Map<String, String> defaultHeaders;
private final int retries;
private final Predicate<HermesResponse> retryCondition;
private final AtomicInteger currentlySending = new AtomicInteger(0);
private final RetryPolicy retryPolicy;
private final ScheduledExecutorService scheduler;
private volatile boolean shutdown = false;

HermesClient(HermesSender sender,
URI uri,
Map<String, String> defaultHeaders,
int retries,
Predicate<HermesResponse> retryCondition) {
Predicate<HermesResponse> retryCondition,
long retrySleepInMillis,
long maxRetrySleepInMillis,
ScheduledExecutorService scheduler) {
this.sender = sender;
this.uri = createUri(uri);
this.defaultHeaders = Collections.unmodifiableMap(new HashMap<>(defaultHeaders));
this.retries = retries;
this.retryCondition = retryCondition;
this.retryPolicy = createRetryPolicy(retries, retryCondition, retrySleepInMillis, maxRetrySleepInMillis);
this.scheduler = scheduler;
}

private RetryPolicy createRetryPolicy(int retries, Predicate<HermesResponse> retryCondition,
long retrySleepInMillis, long maxRetrySleepInMillis) {
RetryPolicy retryPolicy = new RetryPolicy().withMaxRetries(retries)
.retryIf(retryCondition::test);
if (retrySleepInMillis > 0) {
retryPolicy.withBackoff(retrySleepInMillis, maxRetrySleepInMillis, TimeUnit.MILLISECONDS);
}
return retryPolicy;
}

private String createUri(URI uri) {
Expand Down Expand Up @@ -77,12 +90,14 @@ public CompletableFuture<HermesResponse> publish(HermesMessage message) {
return completedWithShutdownException();
}
HermesMessage.appendDefaults(message, defaultHeaders);
return publish(message, (response) -> retryCondition.test(response) ? sendOnce(message) : completedFuture(response));
return publishWithRetries(message);
}

private CompletableFuture<HermesResponse> publish(HermesMessage message, Function<HermesResponse, CompletionStage<HermesResponse>> retryDecision) {
private CompletableFuture<HermesResponse> publishWithRetries(HermesMessage message) {
currentlySending.incrementAndGet();
return range(0, retries).boxed().reduce(sendOnce(message), (future, attempt) -> future.thenCompose(retryDecision), (future, attempt) -> future)
return Failsafe.with(retryPolicy)
.with(scheduler)
.future(() -> sendOnce(message))
.whenComplete((response, ex) -> currentlySending.decrementAndGet());
}

Expand All @@ -98,7 +113,9 @@ private CompletableFuture<HermesResponse> completedWithShutdownException() {

public CompletableFuture<Void> closeAsync(long pollInterval) {
shutdown = true;
return new HermesClientTermination(pollInterval).observe(() -> currentlySending.get() == 0);
return new HermesClientTermination(pollInterval)
.observe(() -> currentlySending.get() == 0)
.whenComplete((response, ex) -> scheduler.shutdown());
}

public void close(long pollInterval, long timeout) throws InterruptedException, TimeoutException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@
import com.codahale.metrics.MetricRegistry;
import pl.allegro.tech.hermes.client.metrics.MetricsHermesSender;

import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.net.URI;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Predicate;
import java.util.function.Supplier;

public class HermesClientBuilder {

Expand All @@ -15,6 +18,9 @@ public class HermesClientBuilder {
private final Map<String, String> defaultHeaders = new HashMap<>();
private int retries = 3;
private Predicate<HermesResponse> retryCondition = new HermesClientBasicRetryCondition();
private long retrySleepInMillis = 100;
private long maxRetrySleepInMillis = 300;
private Supplier<ScheduledExecutorService> schedulerFactory = Executors::newSingleThreadScheduledExecutor;

public HermesClientBuilder(HermesSender sender) {
this.sender = sender;
Expand All @@ -26,7 +32,8 @@ public static HermesClientBuilder hermesClient(HermesSender sender) {
}

public HermesClient build() {
return new HermesClient(sender, uri, defaultHeaders, retries, retryCondition);
return new HermesClient(sender, uri, defaultHeaders, retries, retryCondition, retrySleepInMillis,
maxRetrySleepInMillis, schedulerFactory.get());
}

public HermesClientBuilder withURI(URI uri) {
Expand Down Expand Up @@ -58,4 +65,20 @@ public HermesClientBuilder withRetries(int retries, Predicate<HermesResponse> re
this.retryCondition = retryCondition;
return withRetries(retries);
}

public HermesClientBuilder withRetrySleep(long retrySleepInMillis) {
this.retrySleepInMillis = retrySleepInMillis;
return this;
}

public HermesClientBuilder withRetrySleep(long retrySleepInMillis, long maxRetrySleepInMillis) {
this.retrySleepInMillis = retrySleepInMillis;
this.maxRetrySleepInMillis = maxRetrySleepInMillis;
return this;
}

public HermesClientBuilder withScheduler(ScheduledExecutorService scheduler) {
this.schedulerFactory = () -> scheduler;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ class HermesClientMetricsTest extends Specification {

def "should register latency timer in sanitized path"() {
given:
HermesClient client = hermesClient({uri, msg -> completedFuture({201} as HermesResponse)}).withMetrics(metrics).build()
HermesClient client = hermesClient({uri, msg -> completedFuture({201} as HermesResponse)})
.withRetrySleep(0)
.withMetrics(metrics).build()

when:
client.publish("com.group.topic", "123").join()
Expand All @@ -26,7 +28,9 @@ class HermesClientMetricsTest extends Specification {

def "should close timer on exceptional completion and log failure metric"() {
given:
HermesClient client = hermesClient({uri, msg -> failingFuture(new RuntimeException())}).withMetrics(metrics).build()
HermesClient client = hermesClient({uri, msg -> failingFuture(new RuntimeException())})
.withRetrySleep(0)
.withMetrics(metrics).build()

when:
silence({ client.publish("com.group.topic", "123").join() })
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,10 @@ class HermesClientTest extends Specification {

def "should interpret message as failed for status different than 201 or 202"() {
given:
HermesClient client = hermesClient({uri, msg -> statusFuture(status)}).withURI(create(HERMES_URI)).build()
HermesClient client = hermesClient({uri, msg -> statusFuture(status)})
.withURI(create(HERMES_URI))
.withRetrySleep(0)
.build()

when:
HermesResponse response = client.publish(TOPIC, CONTENT_TYPE, CONTENT).join()
Expand All @@ -76,11 +79,13 @@ class HermesClientTest extends Specification {
def "should retry on http failure"() {
given:
CountDownLatch latch = new CountDownLatch(5)
HermesClient client = hermesClient(getCountDownSender(latch, (Integer) status)).withRetries(5).build()
HermesClient client = hermesClient(getCountDownSender(latch, (Integer) status))
.withRetries(5)
.withRetrySleep(0)
.build()

when:
client.publish(TOPIC, CONTENT_TYPE, CONTENT).join()

then:
latch.count == 0

Expand All @@ -91,7 +96,10 @@ class HermesClientTest extends Specification {
def "should retry on sender exception"() {
given:
CountDownLatch latch = new CountDownLatch(5)
HermesClient client = hermesClient(getExceptionallyFailingCountDownSender(latch)).withRetries(5).build()
HermesClient client = hermesClient(getExceptionallyFailingCountDownSender(latch))
.withRetries(5)
.withRetrySleep(0)
.build()

when:
client.publish(TOPIC, CONTENT_TYPE, CONTENT).join()
Expand All @@ -103,7 +111,9 @@ class HermesClientTest extends Specification {
def "should not retry when supplied retry condition says it should not retry"() {
given:
CountDownLatch latch = new CountDownLatch(2)
HermesClient client = hermesClient(getCountDownSender(latch, 503)).withRetries(5, {false}).build()
HermesClient client = hermesClient(getCountDownSender(latch, 503))
.withRetries(5, {false})
.build()

when:
client.publish(TOPIC, CONTENT_TYPE, CONTENT).join()
Expand All @@ -115,7 +125,10 @@ class HermesClientTest extends Specification {
def "should not retry when one of the attempts succeeds to send"() {
given:
CountDownLatch latch = new CountDownLatch(5)
HermesClient client = hermesClient(getCountDownSender(latch, {latch.getCount() > 2 ? 408 : 201})).withRetries(5).build()
HermesClient client = hermesClient(getCountDownSender(latch, {latch.getCount() > 2 ? 408 : 201}))
.withRetries(5)
.withRetrySleep(0)
.build()

when:
client.publish(TOPIC, CONTENT_TYPE, CONTENT).join()
Expand All @@ -127,7 +140,10 @@ class HermesClientTest extends Specification {
def "should wait until all sent after shutdown"() {
given:
CountDownLatch latch = new CountDownLatch(5)
HermesClient client = hermesClient(getCountDownDelayedSender(latch, 408, 20)).withRetries(5).build()
HermesClient client = hermesClient(getCountDownDelayedSender(latch, 408, 20))
.withRetries(5)
.withRetrySleep(0)
.build()

when:
client.publish(TOPIC, CONTENT_TYPE, CONTENT)
Expand All @@ -154,7 +170,10 @@ class HermesClientTest extends Specification {
def "should keep retrying on sender exception after shutdown"() {
given:
CountDownLatch latch = new CountDownLatch(5)
HermesClient client = hermesClient(getExceptionallyFailingCountDownSender(latch, 20)).withRetries(5).build()
HermesClient client = hermesClient(getExceptionallyFailingCountDownSender(latch, 20))
.withRetries(5)
.withRetrySleep(0)
.build()

when:
client.publish(TOPIC, CONTENT_TYPE, CONTENT).join()
Expand Down Expand Up @@ -201,6 +220,21 @@ class HermesClientTest extends Specification {
headers['Header'] == 'OtherValue'
}

def "should retry on sender exception when retry sleep is provided"() {
given:
CountDownLatch latch = new CountDownLatch(2)
HermesClient client = hermesClient(getExceptionallyFailingCountDownSender(latch))
.withRetries(2)
.withRetrySleep(10)
.build()

when:
client.publish(HermesMessage.hermesMessage(TOPIC, CONTENT).build()).join()

then:
latch.count == 0
}

private HermesSender getExceptionallyFailingCountDownSender(CountDownLatch latch, long delay) {
{ uri, msg ->
def future = new CompletableFuture()
Expand Down

0 comments on commit 673c859

Please sign in to comment.