-
Notifications
You must be signed in to change notification settings - Fork 25.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Client: Wrap synchronous exceptions #28919
Changes from 7 commits
211d31b
d823914
fdabee2
966bdde
17c10f0
888c6bd
b5fa54c
6b3e0b5
990444a
7212e84
fa47ddb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -38,6 +38,7 @@ | |
import org.apache.http.client.protocol.HttpClientContext; | ||
import org.apache.http.client.utils.URIBuilder; | ||
import org.apache.http.concurrent.FutureCallback; | ||
import org.apache.http.conn.ConnectTimeoutException; | ||
import org.apache.http.impl.auth.BasicScheme; | ||
import org.apache.http.impl.client.BasicAuthCache; | ||
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient; | ||
|
@@ -47,6 +48,7 @@ | |
|
||
import java.io.Closeable; | ||
import java.io.IOException; | ||
import java.net.SocketTimeoutException; | ||
import java.net.URI; | ||
import java.net.URISyntaxException; | ||
import java.util.ArrayList; | ||
|
@@ -201,6 +203,14 @@ public Response performRequest(String method, String endpoint, Map<String, Strin | |
* they previously failed (the more failures, the later they will be retried). In case of failures all of the alive nodes (or dead | ||
* nodes that deserve a retry) are retried until one responds or none of them does, in which case an {@link IOException} will be thrown. | ||
* | ||
* This method works by performing an asynchronous call and the waiting | ||
* for the result. If the asynchronous call throws and exception we wrap | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. s/and/an There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks! |
||
* it and rethrow it so that the stack trace attached to the exception | ||
* contains the call site. While we attempt to preserve the original | ||
* exception this isn't always possible and likely haven't covered all of | ||
* the cases. You can get the original exception from | ||
* {@link Exception#getCause()}. | ||
* | ||
* @param method the http method | ||
* @param endpoint the path of the request (without host and port) | ||
* @param params the query_string parameters | ||
|
@@ -218,7 +228,8 @@ public Response performRequest(String method, String endpoint, Map<String, Strin | |
HttpEntity entity, HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory, | ||
Header... headers) throws IOException { | ||
SyncResponseListener listener = new SyncResponseListener(maxRetryTimeoutMillis); | ||
performRequestAsync(method, endpoint, params, entity, httpAsyncResponseConsumerFactory, listener, headers); | ||
performRequestAsyncNoCatch(method, endpoint, params, entity, httpAsyncResponseConsumerFactory, | ||
listener, headers); | ||
return listener.get(); | ||
} | ||
|
||
|
@@ -293,43 +304,50 @@ public void performRequestAsync(String method, String endpoint, Map<String, Stri | |
HttpEntity entity, HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory, | ||
ResponseListener responseListener, Header... headers) { | ||
try { | ||
Objects.requireNonNull(params, "params must not be null"); | ||
Map<String, String> requestParams = new HashMap<>(params); | ||
//ignore is a special parameter supported by the clients, shouldn't be sent to es | ||
String ignoreString = requestParams.remove("ignore"); | ||
Set<Integer> ignoreErrorCodes; | ||
if (ignoreString == null) { | ||
if (HttpHead.METHOD_NAME.equals(method)) { | ||
//404 never causes error if returned for a HEAD request | ||
ignoreErrorCodes = Collections.singleton(404); | ||
} else { | ||
ignoreErrorCodes = Collections.emptySet(); | ||
} | ||
performRequestAsyncNoCatch(method, endpoint, params, entity, httpAsyncResponseConsumerFactory, | ||
responseListener, headers); | ||
} catch (Exception e) { | ||
responseListener.onFailure(e); | ||
} | ||
} | ||
|
||
void performRequestAsyncNoCatch(String method, String endpoint, Map<String, String> params, | ||
HttpEntity entity, HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory, | ||
ResponseListener responseListener, Header... headers) { | ||
Objects.requireNonNull(params, "params must not be null"); | ||
Map<String, String> requestParams = new HashMap<>(params); | ||
//ignore is a special parameter supported by the clients, shouldn't be sent to es | ||
String ignoreString = requestParams.remove("ignore"); | ||
Set<Integer> ignoreErrorCodes; | ||
if (ignoreString == null) { | ||
if (HttpHead.METHOD_NAME.equals(method)) { | ||
//404 never causes error if returned for a HEAD request | ||
ignoreErrorCodes = Collections.singleton(404); | ||
} else { | ||
String[] ignoresArray = ignoreString.split(","); | ||
ignoreErrorCodes = new HashSet<>(); | ||
if (HttpHead.METHOD_NAME.equals(method)) { | ||
//404 never causes error if returned for a HEAD request | ||
ignoreErrorCodes.add(404); | ||
} | ||
for (String ignoreCode : ignoresArray) { | ||
try { | ||
ignoreErrorCodes.add(Integer.valueOf(ignoreCode)); | ||
} catch (NumberFormatException e) { | ||
throw new IllegalArgumentException("ignore value should be a number, found [" + ignoreString + "] instead", e); | ||
} | ||
ignoreErrorCodes = Collections.emptySet(); | ||
} | ||
} else { | ||
String[] ignoresArray = ignoreString.split(","); | ||
ignoreErrorCodes = new HashSet<>(); | ||
if (HttpHead.METHOD_NAME.equals(method)) { | ||
//404 never causes error if returned for a HEAD request | ||
ignoreErrorCodes.add(404); | ||
} | ||
for (String ignoreCode : ignoresArray) { | ||
try { | ||
ignoreErrorCodes.add(Integer.valueOf(ignoreCode)); | ||
} catch (NumberFormatException e) { | ||
throw new IllegalArgumentException("ignore value should be a number, found [" + ignoreString + "] instead", e); | ||
} | ||
} | ||
URI uri = buildUri(pathPrefix, endpoint, requestParams); | ||
HttpRequestBase request = createHttpRequest(method, uri, entity); | ||
setHeaders(request, headers); | ||
FailureTrackingResponseListener failureTrackingResponseListener = new FailureTrackingResponseListener(responseListener); | ||
long startTime = System.nanoTime(); | ||
performRequestAsync(startTime, nextHost(), request, ignoreErrorCodes, httpAsyncResponseConsumerFactory, | ||
failureTrackingResponseListener); | ||
} catch (Exception e) { | ||
responseListener.onFailure(e); | ||
} | ||
URI uri = buildUri(pathPrefix, endpoint, requestParams); | ||
HttpRequestBase request = createHttpRequest(method, uri, entity); | ||
setHeaders(request, headers); | ||
FailureTrackingResponseListener failureTrackingResponseListener = new FailureTrackingResponseListener(responseListener); | ||
long startTime = System.nanoTime(); | ||
performRequestAsync(startTime, nextHost(), request, ignoreErrorCodes, httpAsyncResponseConsumerFactory, | ||
failureTrackingResponseListener); | ||
} | ||
|
||
private void performRequestAsync(final long startTime, final HostTuple<Iterator<HttpHost>> hostTuple, final HttpRequestBase request, | ||
|
@@ -674,12 +692,30 @@ Response get() throws IOException { | |
e.addSuppressed(exception); | ||
throw e; | ||
} | ||
//try and leave the exception untouched as much as possible but we don't want to just add throws Exception clause everywhere | ||
/* | ||
* Wrap and rethrow whatever exception we received, copying the type | ||
* where possible so the synchronous API looks as much as possible | ||
* like the asynchronous API. We wrap the exception so that the caller's | ||
* signature shows up in any exception we throw. | ||
*/ | ||
if (exception instanceof ResponseException) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would have thought to use reflection here to look for a constructor of the appropriate shape (taking an There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I thought about the reflection solution here and rejected it, mostly because we tend to dislike it in the rest of Elasticsearch. I think the trouble with reflection here is that it is difficult to reason about. Not in the "what is going to happen?" sense, but in the "are the ctors that I call going to do the right thing with the things I give them?" sense. For what it is worth I'd like to abstract async http client from the Elasticsearch client eventually and I don't think the reflection based approach would play well with that. I certainly understand that this will change the exceptions that we throw in the cases where I don't have an explicit branch though. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Or is it a way to wrap the exceptions thrown (all or parts) already in the async call into a "CustomException" ? kind of like the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think the exception unwrapping is going to be tricky to get perfect. I'd be ok with some kind of rethrown exception. I mean, ultimately, that is what we end up with if we fall through all the @javanna, what do you think? You've been working in this area a lot longer than I have. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As far as I understand what it all boils down to here is the different IOExceptions that can be thrown by the underlying http client, there may be more than the ones that we have a branch for. I think the current solution is good enough, I even wonder if anybody is ever going to catch socket timeout rather connect timeout etc. maybe all those could just be generic IOExceptions like we already do below and one has to look at the cause to see what it really is? I think that would be reasonable too, but we do want to rethrow ResponseException as a proper ResponseException as it's our own (like we already do). I do not have anything against the proposed reflection solution either, but maybe what we have now is easier to reason about and we can always add branches if we missed something. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe we should document what we do with exceptions in the sync methods, so people know what to expect. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. another idea, not sure though how much it makes sense, could be to add our own specific runtime exception (instead of a generic one) with a good name that indicates that it's used only for wrapping, and always use that one, and document well that users have to look at the cause. Then we may want to also remove all the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I keep coming back to "I'd like to make async httpclient an implementation detail one day" and I think this kind of ++ on adding javadoc for this. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @nik9000 do you think it would be helpful to add another branch for SSLException ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Fine with me! I can do it. |
||
throw new ResponseException((ResponseException) exception); | ||
} | ||
if (exception instanceof ConnectTimeoutException) { | ||
ConnectTimeoutException e = new ConnectTimeoutException(exception.getMessage()); | ||
e.initCause(exception); | ||
throw e; | ||
} | ||
if (exception instanceof SocketTimeoutException) { | ||
SocketTimeoutException e = new SocketTimeoutException(exception.getMessage()); | ||
e.initCause(exception); | ||
throw e; | ||
} | ||
if (exception instanceof IOException) { | ||
throw (IOException) exception; | ||
throw new IOException(exception.getMessage(), exception); | ||
} | ||
if (exception instanceof RuntimeException){ | ||
throw (RuntimeException) exception; | ||
throw new RuntimeException(exception.getMessage(), exception); | ||
} | ||
throw new RuntimeException("error while performing request", exception); | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -35,6 +35,7 @@ | |
import org.apache.http.message.BasicStatusLine; | ||
import org.apache.http.nio.protocol.HttpAsyncRequestProducer; | ||
import org.apache.http.nio.protocol.HttpAsyncResponseConsumer; | ||
import org.junit.After; | ||
import org.junit.Before; | ||
import org.mockito.invocation.InvocationOnMock; | ||
import org.mockito.stubbing.Answer; | ||
|
@@ -44,6 +45,8 @@ | |
import java.util.Collections; | ||
import java.util.HashSet; | ||
import java.util.Set; | ||
import java.util.concurrent.ExecutorService; | ||
import java.util.concurrent.Executors; | ||
import java.util.concurrent.Future; | ||
|
||
import static org.elasticsearch.client.RestClientTestUtil.randomErrorNoRetryStatusCode; | ||
|
@@ -66,6 +69,7 @@ | |
*/ | ||
public class RestClientMultipleHostsTests extends RestClientTestCase { | ||
|
||
private ExecutorService exec = Executors.newFixedThreadPool(1); | ||
private RestClient restClient; | ||
private HttpHost[] httpHosts; | ||
private HostsTrackingFailureListener failureListener; | ||
|
@@ -79,23 +83,28 @@ public void createRestClient() throws IOException { | |
@Override | ||
public Future<HttpResponse> answer(InvocationOnMock invocationOnMock) throws Throwable { | ||
HttpAsyncRequestProducer requestProducer = (HttpAsyncRequestProducer) invocationOnMock.getArguments()[0]; | ||
HttpUriRequest request = (HttpUriRequest)requestProducer.generateRequest(); | ||
HttpHost httpHost = requestProducer.getTarget(); | ||
final HttpUriRequest request = (HttpUriRequest)requestProducer.generateRequest(); | ||
final HttpHost httpHost = requestProducer.getTarget(); | ||
HttpClientContext context = (HttpClientContext) invocationOnMock.getArguments()[2]; | ||
assertThat(context.getAuthCache().get(httpHost), instanceOf(BasicScheme.class)); | ||
FutureCallback<HttpResponse> futureCallback = (FutureCallback<HttpResponse>) invocationOnMock.getArguments()[3]; | ||
final FutureCallback<HttpResponse> futureCallback = (FutureCallback<HttpResponse>) invocationOnMock.getArguments()[3]; | ||
//return the desired status code or exception depending on the path | ||
if (request.getURI().getPath().equals("/soe")) { | ||
futureCallback.failed(new SocketTimeoutException(httpHost.toString())); | ||
} else if (request.getURI().getPath().equals("/coe")) { | ||
futureCallback.failed(new ConnectTimeoutException(httpHost.toString())); | ||
} else if (request.getURI().getPath().equals("/ioe")) { | ||
futureCallback.failed(new IOException(httpHost.toString())); | ||
} else { | ||
int statusCode = Integer.parseInt(request.getURI().getPath().substring(1)); | ||
StatusLine statusLine = new BasicStatusLine(new ProtocolVersion("http", 1, 1), statusCode, ""); | ||
futureCallback.completed(new BasicHttpResponse(statusLine)); | ||
} | ||
exec.execute(new Runnable() { | ||
@Override | ||
public void run() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I believe that I can't do that here because these tests are compiled with source compatibility set to 1.7. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh.. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The non-test code has to be compiled against 1.7 because we want folks still on 1.7 to be able to use it. The tests kind of come along for the ride mostly to make sure that everything works properly in 1.7. For things like that it is a bit of a pain though. |
||
if (request.getURI().getPath().equals("/soe")) { | ||
futureCallback.failed(new SocketTimeoutException(httpHost.toString())); | ||
} else if (request.getURI().getPath().equals("/coe")) { | ||
futureCallback.failed(new ConnectTimeoutException(httpHost.toString())); | ||
} else if (request.getURI().getPath().equals("/ioe")) { | ||
futureCallback.failed(new IOException(httpHost.toString())); | ||
} else { | ||
int statusCode = Integer.parseInt(request.getURI().getPath().substring(1)); | ||
StatusLine statusLine = new BasicStatusLine(new ProtocolVersion("http", 1, 1), statusCode, ""); | ||
futureCallback.completed(new BasicHttpResponse(statusLine)); | ||
} | ||
} | ||
}); | ||
return null; | ||
} | ||
}); | ||
|
@@ -108,6 +117,14 @@ public Future<HttpResponse> answer(InvocationOnMock invocationOnMock) throws Thr | |
restClient = new RestClient(httpClient, 10000, new Header[0], httpHosts, null, failureListener); | ||
} | ||
|
||
/** | ||
* Shutdown the executor so we don't leak threads into other test runs. | ||
*/ | ||
@After | ||
public void shutdownExec() { | ||
exec.shutdown(); | ||
} | ||
|
||
public void testRoundRobinOkStatusCodes() throws IOException { | ||
int numIters = RandomNumbers.randomIntBetween(getRandom(), 1, 5); | ||
for (int i = 0; i < numIters; i++) { | ||
|
@@ -142,7 +159,7 @@ public void testRoundRobinNoRetryErrors() throws IOException { | |
} else { | ||
fail("request should have failed"); | ||
} | ||
} catch(ResponseException e) { | ||
} catch (ResponseException e) { | ||
if (method.equals("HEAD") && statusCode == 404) { | ||
throw e; | ||
} | ||
|
@@ -162,7 +179,12 @@ public void testRoundRobinRetryErrors() throws IOException { | |
try { | ||
restClient.performRequest(randomHttpMethod(getRandom()), retryEndpoint); | ||
fail("request should have failed"); | ||
} catch(ResponseException e) { | ||
} catch (ResponseException e) { | ||
/* | ||
* Unwrap the top level failure that was added so the stack trace contains | ||
* the caller. It wraps the exception that contains the failed hosts. | ||
*/ | ||
e = (ResponseException) e.getCause(); | ||
Set<HttpHost> hostsSet = new HashSet<>(); | ||
Collections.addAll(hostsSet, httpHosts); | ||
//first request causes all the hosts to be blacklisted, the returned exception holds one suppressed exception each | ||
|
@@ -182,7 +204,12 @@ public void testRoundRobinRetryErrors() throws IOException { | |
} | ||
} while(e != null); | ||
assertEquals("every host should have been used but some weren't: " + hostsSet, 0, hostsSet.size()); | ||
} catch(IOException e) { | ||
} catch (IOException e) { | ||
/* | ||
* Unwrap the top level failure that was added so the stack trace contains | ||
* the caller. It wraps the exception that contains the failed hosts. | ||
*/ | ||
e = (IOException) e.getCause(); | ||
Set<HttpHost> hostsSet = new HashSet<>(); | ||
Collections.addAll(hostsSet, httpHosts); | ||
//first request causes all the hosts to be blacklisted, the returned exception holds one suppressed exception each | ||
|
@@ -212,15 +239,20 @@ public void testRoundRobinRetryErrors() throws IOException { | |
try { | ||
restClient.performRequest(randomHttpMethod(getRandom()), retryEndpoint); | ||
fail("request should have failed"); | ||
} catch(ResponseException e) { | ||
} catch (ResponseException e) { | ||
Response response = e.getResponse(); | ||
assertThat(response.getStatusLine().getStatusCode(), equalTo(Integer.parseInt(retryEndpoint.substring(1)))); | ||
assertTrue("host [" + response.getHost() + "] not found, most likely used multiple times", | ||
hostsSet.remove(response.getHost())); | ||
//after the first request, all hosts are blacklisted, a single one gets resurrected each time | ||
failureListener.assertCalled(response.getHost()); | ||
assertEquals(0, e.getSuppressed().length); | ||
} catch(IOException e) { | ||
} catch (IOException e) { | ||
/* | ||
* Unwrap the top level failure that was added so the stack trace contains | ||
* the caller. It wraps the exception that contains the failed hosts. | ||
*/ | ||
e = (IOException) e.getCause(); | ||
HttpHost httpHost = HttpHost.create(e.getMessage()); | ||
assertTrue("host [" + httpHost + "] not found, most likely used multiple times", hostsSet.remove(httpHost)); | ||
//after the first request, all hosts are blacklisted, a single one gets resurrected each time | ||
|
@@ -238,8 +270,7 @@ public void testRoundRobinRetryErrors() throws IOException { | |
Response response; | ||
try { | ||
response = restClient.performRequest(randomHttpMethod(getRandom()), "/" + statusCode); | ||
} | ||
catch(ResponseException e) { | ||
} catch (ResponseException e) { | ||
response = e.getResponse(); | ||
} | ||
assertThat(response.getStatusLine().getStatusCode(), equalTo(statusCode)); | ||
|
@@ -257,12 +288,17 @@ public void testRoundRobinRetryErrors() throws IOException { | |
try { | ||
restClient.performRequest(randomHttpMethod(getRandom()), retryEndpoint); | ||
fail("request should have failed"); | ||
} catch(ResponseException e) { | ||
} catch (ResponseException e) { | ||
Response response = e.getResponse(); | ||
assertThat(response.getStatusLine().getStatusCode(), equalTo(Integer.parseInt(retryEndpoint.substring(1)))); | ||
assertThat(response.getHost(), equalTo(selectedHost)); | ||
failureListener.assertCalled(selectedHost); | ||
} catch(IOException e) { | ||
/* | ||
* Unwrap the top level failure that was added so the stack trace contains | ||
* the caller. It wraps the exception that contains the failed hosts. | ||
*/ | ||
e = (IOException) e.getCause(); | ||
HttpHost httpHost = HttpHost.create(e.getMessage()); | ||
assertThat(httpHost, equalTo(selectedHost)); | ||
failureListener.assertCalled(selectedHost); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove "the" ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
++