Skip to content

Instantly share code, notes, and snippets.

@petrbouda
Last active June 22, 2024 19:00
Show Gist options
  • Select an option

  • Save petrbouda/92647b243eac71b089eb4fb2cfa90bf2 to your computer and use it in GitHub Desktop.

Select an option

Save petrbouda/92647b243eac71b089eb4fb2cfa90bf2 to your computer and use it in GitHub Desktop.
Retry Mechanism for JDK HTTP Client
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.http.HttpResponse.BodyHandler;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Predicate;
public class HttpInvocation<T> {
private static final Logger LOG = LoggerFactory.getLogger(HttpInvocation.class);
/**
* Retry on all exceptions that inherits from IOException:
* <ul>
* <li>{@link java.net.http.HttpTimeoutException}</li>
* <li>{@link java.net.http.HttpConnectTimeoutException}</li>
* <li>{@link java.nio.channels.ClosedChannelException}</li>
* <li>{@link sun.net.ConnectionResetException}</li>
* </ul>
*/
private static final Predicate<Throwable> DEFAULT_RETRY_ON_THROWABLE =
ex -> ex instanceof IOException;
/**
* A default number of maximum retries on both types <b>on-response</b> and <b>on-throwable</b>
*/
private static final int DEFAULT_MAX_ATTEMPTS = 5;
/**
* When a retry on-response exceeded then throw an exception by default.
*/
private static final boolean DEFAULT_THROW_WHEN_RETRY_ON_RESPONSE_EXCEEDED = true;
/**
* By default it waits 5 seconds between two retries.
*/
private static final Duration DEFAULT_RETRY_DELAY = Duration.ofSeconds(5);
/**
* If there is no {@link BodyHandler} specified, then discard entire incoming entity in a response.
*/
private static final BodyHandler<Void> DEFAULT_BODY_HANDLER = HttpResponse.BodyHandlers.discarding();
private final HttpClient client;
private final HttpRequest request;
private final BodyHandler<T> handler;
private final AtomicInteger attempts = new AtomicInteger();
private final Predicate<HttpResponse<?>> retryOnResponse;
private final Predicate<Throwable> retryOnThrowable;
private final int maxAttempts;
private final boolean throwWhenRetryOnResponseExceeded;
private final Executor delayedExecutor;
private HttpInvocation(Builder<T> builder) {
this.client = builder.client != null
? builder.client : HttpClient.newHttpClient();
this.request = builder.request;
this.handler = builder.bodyHandler;
this.maxAttempts = builder.maxAttempts != null
? builder.maxAttempts : DEFAULT_MAX_ATTEMPTS;
this.retryOnResponse = builder.retryOnResponse != null
? builder.retryOnResponse : DEFAULT_RETRY_ON_RESPONSE;
this.retryOnThrowable = builder.retryOnThrowable != null
? builder.retryOnThrowable : DEFAULT_RETRY_ON_THROWABLE;
this.throwWhenRetryOnResponseExceeded = builder.throwWhenRetryOnResponseExceeded != null
? builder.throwWhenRetryOnResponseExceeded : DEFAULT_THROW_WHEN_RETRY_ON_RESPONSE_EXCEEDED;
Duration delay = builder.retryDelay != null
? builder.retryDelay
: DEFAULT_RETRY_DELAY;
this.delayedExecutor = CompletableFuture.delayedExecutor(
delay.toMillis(), TimeUnit.MILLISECONDS);
}
/**
* Retry on all Server Response (status code >= 500).
*
* @return default implementation of retry-on-response based on a status code.
*/
private static <T> Predicate<HttpResponse<T>> defaultRetryOnResponse() {
return resp -> resp.statusCode() >= 500;
}
/**
* Invokes a configured {@link HttpInvocation} using {@link Builder} and
* handle exceptions, incorrect responses and retries with a configured
* delay.
*
* @return a completable future with a completed response or failed in
* case of any exception.
*/
public CompletableFuture<HttpResponse<T>> invoke() {
attempts.incrementAndGet();
return client.sendAsync(request, handler)
.thenApply(resp -> {
if (retryOnResponse.test(resp)) {
return attemptRetry(resp, null);
} else {
return CompletableFuture.completedFuture(resp);
}
})
.exceptionally(ex -> {
// All internal exceptions are wrapped by `CompletionException`
if (retryOnThrowable.test(ex.getCause())) {
return attemptRetry(null, ex);
} else {
return CompletableFuture.failedFuture(ex);
}
})
.thenCompose(Function.identity());
}
/**
* It tries to invoke the request again if there is any remaining attempt, or handle the situation
* when a threshold of maximum attempts was exceeded.
*
* @param response a failed response or <b>NULL</b>.
* @param throwable a thrown exception or <b>NULL</b>.
* @return a new completable future with a next attempt, or a failed response/exception in a case
* of exceeded attempts.
*/
private CompletableFuture<HttpResponse<T>> attemptRetry(HttpResponse<T> response, Throwable throwable) {
if (attemptsRemains()) {
LOG.warn("Retrying: attempt={} path={}", attempts.get() + 1, request.uri());
return CompletableFuture.supplyAsync(this::invoke, delayedExecutor)
.thenCompose(Function.identity());
} else {
return handleRetryExceeded(response, throwable);
}
}
/**
* Defines the handler for an exceeded retry attempts. If the last attempt failed because of
* an exception then throw it immediately. However, if the attempt failed on a regular response and
* status code, them there are two possible behaviors based on the property {@link #throwWhenRetryOnResponseExceeded}.
* <ul>
* <li><b>TRUE</b> when {@link #maxAttempts} is exceeded then an exception is thrown</li>
* <li><b>FALSE</b> when {@link #maxAttempts} is exceeded then the latest {@link HttpResponse}
* is returned</li>
* </ul>
*
* @param response the very latest response object
* @return a new completable future with a completed or failed state
* depending on {@link #throwWhenRetryOnResponseExceeded}
*/
private CompletableFuture<HttpResponse<T>> handleRetryExceeded(
HttpResponse<T> response, Throwable throwable) {
if (throwable != null || throwWhenRetryOnResponseExceeded) {
Throwable ex = throwable == null
? new RuntimeException("Retries exceeded: status-code=" + response.statusCode())
: throwable;
return CompletableFuture.failedFuture(ex);
} else {
return CompletableFuture.completedFuture(response);
}
}
/**
* Returns <b>TRUE</b> if the number of retries has not exceeded the predefined
* {@link #maxAttempts} value.
*
* @return <b>TRUE</b> if some attempts still remaining.
*/
private boolean attemptsRemains() {
return attempts.get() < maxAttempts;
}
/**
* Creates a builder without an explicit {@link BodyHandler} which means that the default
* {@link #DEFAULT_BODY_HANDLER} (discarding) with a return type {@link Void}.
*
* @param request an http request to invoke.
* @return a builder with predefined <b>request</b> and a body-handler {@link #DEFAULT_BODY_HANDLER}.
*/
public static Builder<Void> builder(HttpRequest request) {
return new Builder<>(request, DEFAULT_BODY_HANDLER);
}
/**
* Creates a builder along with a {@link BodyHandler} that determines the return type
* defined by a generic <b>T</b>.
*
* @param request an http request to invoke.
* @param bodyHandler a handler to process an incoming entity in a response.
* @param <T> a type of a body of incoming entity.
* @return a builder with predefined <b>request</b> and <b>bodyHandler</b>.
*/
public static <T> Builder<T> builder(HttpRequest request, BodyHandler<T> bodyHandler) {
return new Builder<>(request, bodyHandler);
}
public static final class Builder<T> {
private final HttpRequest request;
private final BodyHandler<T> bodyHandler;
private HttpClient client;
private Integer maxAttempts;
private Duration retryDelay;
private Predicate<HttpResponse<?>> retryOnResponse;
private Predicate<Throwable> retryOnThrowable;
private Boolean throwWhenRetryOnResponseExceeded;
public Builder(HttpRequest request, BodyHandler<T> bodyHandler) {
this.request = request;
this.bodyHandler = bodyHandler;
}
public Builder<T> withHttpClient(HttpClient client) {
this.client = client;
return this;
}
public Builder<T> withMaxAttempts(int maxAttempts) {
this.maxAttempts = maxAttempts;
return this;
}
public Builder<T> withRetryDelay(Duration retryDelay) {
this.retryDelay = retryDelay;
return this;
}
public Builder<T> withRetryOnResponse(Predicate<HttpResponse<?>> retryOnResponse) {
this.retryOnResponse = retryOnResponse;
return this;
}
public Builder<T> withRetryOnThrowable(Predicate<Throwable> retryOnThrowable) {
this.retryOnThrowable = retryOnThrowable;
return this;
}
public Builder<T> withThrowWhenRetryOnResponseExceeded(boolean throwWhenRetryOnResponseExceeded) {
this.throwWhenRetryOnResponseExceeded = throwWhenRetryOnResponseExceeded;
return this;
}
public HttpInvocation<T> build() {
return new HttpInvocation<>(this);
}
}
}
import ch.qos.logback.classic.spi.ILoggingEvent;
import ... <different-gist> AssertAppender;
import ...MockServerContainer;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.function.Executable;
import org.mockserver.client.MockServerClient;
import org.mockserver.matchers.Times;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import java.net.ConnectException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockserver.model.HttpRequest.request;
import static org.mockserver.model.HttpResponse.response;
@Testcontainers
class HttpInvocationTest {
private static final String PATH = "/rest/api/latest";
@Container
private static final MockServerContainer CONTAINER = new MockServerContainer();
private static HttpRequest REQUEST;
@BeforeAll
public static void setup() {
REQUEST = HttpRequest.newBuilder()
.uri(CONTAINER.url().resolve(PATH))
.GET()
.build();
}
@Test
public void singleSuccessInvocationWithoutBody() throws Exception {
MockServerClient mockClient = CONTAINER.cleanClient();
mockClient
.when(request()
.withMethod("GET")
.withPath(PATH)
)
.respond(response()
.withStatusCode(200)
);
HttpResponse<Void> response = HttpInvocation.builder(REQUEST)
.build()
.invoke()
.get(1, TimeUnit.SECONDS);
assertEquals(200, response.statusCode());
}
@Test
public void singleSuccessInvocationWithBody() throws Exception {
MockServerClient mockClient = CONTAINER.cleanClient();
mockClient
.when(request()
.withMethod("GET")
.withPath(PATH)
)
.respond(response()
.withStatusCode(200)
.withBody("Body")
);
HttpResponse<String> response =
HttpInvocation.builder(REQUEST, HttpResponse.BodyHandlers.ofString())
.build()
.invoke()
.get(1, TimeUnit.SECONDS);
assertEquals(200, response.statusCode());
assertEquals("Body", response.body());
}
@Test
public void successfulWithRetryOnResponseWithBody() throws Exception {
MockServerClient mockClient = CONTAINER.cleanClient();
mockClient
.when(request()
.withMethod("GET")
.withPath(PATH)
)
.respond(response()
.withStatusCode(200)
.withBody("Body")
);
HttpResponse<String> response =
HttpInvocation.builder(REQUEST, HttpResponse.BodyHandlers.ofString())
.withRetryOnResponse(resp -> !resp.body().equals("Weird Body"))
.build()
.invoke()
.get(1, TimeUnit.SECONDS);
assertEquals(200, response.statusCode());
assertEquals("Body", response.body());
}
@Test
public void successfulRetry() throws Exception {
MockServerClient mockClient = CONTAINER.cleanClient();
mockClient
.when(request()
.withMethod("GET")
.withPath(PATH),
Times.exactly(2)
)
.respond(response()
.withStatusCode(500)
);
mockClient
.when(request()
.withMethod("GET")
.withPath(PATH)
)
.respond(response()
.withStatusCode(200)
.withBody("Body")
);
Predicate<ILoggingEvent> assertion = event ->
event.toString().startsWith("[WARN] Retrying: attempt=3");
try (var appender = new AssertAppender(HttpInvocation.class, assertion)) {
appender.start();
HttpResponse<String> response =
HttpInvocation.builder(REQUEST, HttpResponse.BodyHandlers.ofString())
.withMaxAttempts(3)
.withRetryDelay(Duration.ofMillis(100))
.build()
.invoke()
.get(1, TimeUnit.SECONDS);
appender.waitForAssertion(Duration.ofSeconds(1));
assertEquals(200, response.statusCode());
assertEquals("Body", response.body());
}
}
@Test
public void attemptsExceededOnResponseThrowException() {
MockServerClient mockClient = CONTAINER.cleanClient();
mockClient
.when(request()
.withMethod("GET")
.withPath(PATH)
)
.respond(response()
.withStatusCode(500)
);
Executable executable =
() -> HttpInvocation.builder(REQUEST, HttpResponse.BodyHandlers.ofString())
.withMaxAttempts(3)
.withRetryDelay(Duration.ofMillis(100))
.withThrowWhenRetryOnResponseExceeded(true)
.build()
.invoke()
.get(1, TimeUnit.SECONDS);
ExecutionException ex = assertThrows(ExecutionException.class, executable);
assertEquals("java.lang.RuntimeException: Retries exceeded: status-code=500", ex.getMessage());
}
@Test
public void attemptsExceededOnResponseReturnResponse() throws Exception {
MockServerClient mockClient = CONTAINER.cleanClient();
mockClient
.when(request()
.withMethod("GET")
.withPath(PATH)
)
.respond(response()
.withStatusCode(500)
);
HttpResponse<String> response = HttpInvocation.builder(REQUEST, HttpResponse.BodyHandlers.ofString())
.withMaxAttempts(3)
.withRetryDelay(Duration.ofMillis(100))
.withThrowWhenRetryOnResponseExceeded(false)
.build()
.invoke()
.get(1, TimeUnit.SECONDS);
assertEquals(500, response.statusCode());
}
@Test
public void attemptsExceededOnIOException() throws Exception {
HttpRequest request = HttpRequest.newBuilder(URI.create("http://non-existing"))
.build();
Predicate<ILoggingEvent> assertion = event ->
event.toString().startsWith("[WARN] Retrying: attempt=3");
try (var appender = new AssertAppender(HttpInvocation.class, assertion)) {
appender.start();
HttpClient httpClient = HttpClient.newBuilder()
.connectTimeout(Duration.ofMillis(10))
.build();
Executable executable =
() -> HttpInvocation.builder(request, HttpResponse.BodyHandlers.ofString())
.withHttpClient(httpClient)
.withMaxAttempts(3)
.withRetryDelay(Duration.ofMillis(100))
.withThrowWhenRetryOnResponseExceeded(false)
.build()
.invoke()
.get(1, TimeUnit.SECONDS);
ExecutionException ex = assertThrows(ExecutionException.class, executable);
appender.waitForAssertion(Duration.ofSeconds(1));
assertEquals(ConnectException.class, ex.getCause().getClass());
}
}
}
@petrbouda
Copy link
Copy Markdown
Author

petrbouda commented May 25, 2022 via email

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment