Skip to content

Commit

Permalink
Decorate request channel and changes type in tests
Browse files Browse the repository at this point in the history
  • Loading branch information
anuragagarwal561994 committed Mar 26, 2022
1 parent 0ef6cbd commit dd8ac9f
Show file tree
Hide file tree
Showing 6 changed files with 80 additions and 107 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,9 @@ public List<String> requestHeader(ApacheHttpClientRequest request, String name)
}

@Override
@Nullable
public Long requestContentLength(
ApacheHttpClientRequest request, @Nullable HttpResponse response) {
return null;
return request.requestContentLength();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,13 @@
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.http.EntityDetails;
import org.apache.hc.core5.http.HttpException;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.HttpRequest;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.nio.AsyncRequestProducer;
import org.apache.hc.core5.http.nio.ContentEncoder;
import org.apache.hc.core5.http.nio.DataStreamChannel;
import org.apache.hc.core5.http.nio.RequestChannel;
import org.apache.hc.core5.http.protocol.HttpContext;
import org.apache.hc.core5.http.protocol.HttpCoreContext;
import org.slf4j.Logger;
Expand Down Expand Up @@ -94,53 +95,64 @@ public DelegatingRequestProducer(
}

@Override
public HttpHost getTarget() {
return delegate.getTarget();
public void failed(Exception ex) {
delegate.failed(ex);
}

@Override
public HttpRequest generateRequest() throws IOException, HttpException {
HttpHost target = delegate.getTarget();
HttpRequest request = delegate.generateRequest();

ApacheHttpClientRequest otelRequest = new ApacheHttpClientRequest(target, request);

if (instrumenter().shouldStart(parentContext, otelRequest)) {
wrappedFutureCallback.context = instrumenter().start(parentContext, otelRequest);
wrappedFutureCallback.otelRequest = otelRequest;
}

return request;
public void sendRequest(RequestChannel channel, HttpContext context)
throws HttpException, IOException {
DelegatingRequestChannel requestChannel =
new DelegatingRequestChannel(channel, parentContext, wrappedFutureCallback);
delegate.sendRequest(requestChannel, context);
}

@Override
public void produceContent(ContentEncoder encoder, IOControl ioctrl) throws IOException {
delegate.produceContent(encoder, ioctrl);
public boolean isRepeatable() {
return delegate.isRepeatable();
}

@Override
public void requestCompleted(HttpContext context) {
delegate.requestCompleted(context);
public int available() {
return delegate.available();
}

@Override
public void failed(Exception ex) {
delegate.failed(ex);
public void produce(DataStreamChannel channel) throws IOException {
delegate.produce(channel);
}

@Override
public boolean isRepeatable() {
return delegate.isRepeatable();
public void releaseResources() {
delegate.releaseResources();
}
}

@Override
public void resetRequest() throws IOException {
delegate.resetRequest();
public static class DelegatingRequestChannel implements RequestChannel {
private final RequestChannel delegate;
private final Context parentContext;
private final WrappedFutureCallback<?> wrappedFutureCallback;

public DelegatingRequestChannel(
RequestChannel requestChannel,
Context parentContext,
WrappedFutureCallback<?> wrappedFutureCallback) {
this.delegate = requestChannel;
this.parentContext = parentContext;
this.wrappedFutureCallback = wrappedFutureCallback;
}

@Override
public void close() throws IOException {
delegate.close();
public void sendRequest(HttpRequest request, EntityDetails entityDetails, HttpContext context)
throws HttpException, IOException {
ApacheHttpClientRequest otelRequest = new ApacheHttpClientRequest(request, entityDetails);

if (instrumenter().shouldStart(parentContext, otelRequest)) {
wrappedFutureCallback.context = instrumenter().start(parentContext, otelRequest);
wrappedFutureCallback.otelRequest = otelRequest;
}

delegate.sendRequest(request, entityDetails, context);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
import java.util.Collections;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.hc.core5.http.EntityDetails;
import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.HttpRequest;
import org.apache.hc.core5.http.ProtocolVersion;
import org.slf4j.Logger;
Expand All @@ -26,15 +26,12 @@ public final class ApacheHttpClientRequest {
@Nullable private final URI uri;

private final HttpRequest delegate;
private final EntityDetails entityDetails;

public ApacheHttpClientRequest(HttpHost httpHost, HttpRequest httpRequest) {
URI calculatedUri = getUri(httpRequest);
if (calculatedUri != null && httpHost != null) {
uri = getCalculatedUri(httpHost, calculatedUri);
} else {
uri = calculatedUri;
}
delegate = httpRequest;
public ApacheHttpClientRequest(HttpRequest httpRequest, EntityDetails entityDetails) {
uri = getUri(httpRequest);
this.entityDetails = entityDetails;
this.delegate = httpRequest;
}

public List<String> getHeader(String name) {
Expand All @@ -53,6 +50,10 @@ static List<String> headersToList(Header[] headers) {
return headersList;
}

public long requestContentLength() {
return entityDetails.getContentLength();
}

public void setHeader(String name, String value) {
delegate.setHeader(name, value);
}
Expand Down Expand Up @@ -119,27 +120,4 @@ private static URI getUri(HttpRequest httpRequest) {
return null;
}
}

@Nullable
private static URI getCalculatedUri(HttpHost httpHost, URI uri) {
try {
String path = uri.getPath();
if (!path.startsWith("/")) {
// elasticsearch RestClient sends relative urls
// TODO(trask) add test for this and extend to Apache 4, 4.3 and 5
path = "/" + path;
}
return new URI(
httpHost.getSchemeName(),
null,
httpHost.getHostName(),
httpHost.getPort(),
path,
uri.getQuery(),
uri.getFragment());
} catch (URISyntaxException e) {
logger.debug(e.getMessage(), e);
return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,16 @@
package io.opentelemetry.javaagent.instrumentation.apachehttpasyncclient;

import io.opentelemetry.context.propagation.TextMapSetter;
import javax.annotation.Nullable;

enum HttpHeaderSetter implements TextMapSetter<ApacheHttpClientRequest> {
INSTANCE;

@Override
public void set(ApacheHttpClientRequest carrier, String key, String value) {
public void set(@Nullable ApacheHttpClientRequest carrier, String key, String value) {
if (carrier == null) {
return;
}
carrier.setHeader(key, value);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,21 @@ import io.opentelemetry.instrumentation.test.AgentTestTrait
import io.opentelemetry.instrumentation.test.base.HttpClientTest
import io.opentelemetry.instrumentation.testing.junit.http.AbstractHttpClientTest
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
import org.apache.hc.client5.http.async.methods.SimpleHttpRequest
import org.apache.hc.client5.http.async.methods.SimpleHttpResponse
import org.apache.hc.client5.http.config.RequestConfig
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient
import org.apache.hc.client5.http.impl.async.HttpAsyncClients
import org.apache.hc.core5.concurrent.FutureCallback
import org.apache.hc.core5.http.HttpHost
import org.apache.hc.core5.http.HttpResponse
import org.apache.hc.core5.http.message.BasicHeader
import org.apache.hc.core5.util.Timeout
import spock.lang.AutoCleanup
import spock.lang.Shared

import java.util.concurrent.CancellationException

abstract class ApacheHttpAsyncClientTest extends HttpClientTest<HttpUriRequest> implements AgentTestTrait {
abstract class ApacheHttpAsyncClientTest extends HttpClientTest<SimpleHttpRequest> implements AgentTestTrait {

@Shared
RequestConfig requestConfig = RequestConfig.custom()
Expand Down Expand Up @@ -70,7 +71,7 @@ abstract class ApacheHttpAsyncClientTest extends HttpClientTest<HttpUriRequest>
}

@Override
HttpUriRequest buildRequest(String method, URI uri, Map<String, String> headers) {
SimpleHttpRequest buildRequest(String method, URI uri, Map<String, String> headers) {
def request = createRequest(method, uri)
request.addHeader("user-agent", userAgent())
headers.entrySet().each {
Expand All @@ -89,18 +90,18 @@ abstract class ApacheHttpAsyncClientTest extends HttpClientTest<HttpUriRequest>
}

// compilation fails with @Override annotation on this method (groovy quirk?)
int sendRequest(HttpUriRequest request, String method, URI uri, Map<String, String> headers) {
int sendRequest(SimpleHttpRequest request, String method, URI uri, Map<String, String> headers) {
def response = executeRequest(request, uri)
response.entity?.content?.close() // Make sure the connection is closed.
return response.statusLine.statusCode
}

// compilation fails with @Override annotation on this method (groovy quirk?)
void sendRequestWithCallback(HttpUriRequest request, String method, URI uri, Map<String, String> headers, AbstractHttpClientTest.RequestResult requestResult) {
void sendRequestWithCallback(SimpleHttpRequest request, String method, URI uri, Map<String, String> headers, AbstractHttpClientTest.RequestResult requestResult) {
try {
executeRequestWithCallback(request, uri, new FutureCallback<HttpResponse>() {
executeRequestWithCallback(request, uri, new FutureCallback<SimpleHttpResponse>() {
@Override
void completed(HttpResponse httpResponse) {
void completed(SimpleHttpResponse httpResponse) {
httpResponse.entity?.content?.close() // Make sure the connection is closed.
requestResult.complete(httpResponse.statusLine.statusCode)
}
Expand All @@ -120,11 +121,11 @@ abstract class ApacheHttpAsyncClientTest extends HttpClientTest<HttpUriRequest>
}
}

abstract HttpUriRequest createRequest(String method, URI uri)
abstract SimpleHttpRequest createRequest(String method, URI uri)

abstract HttpResponse executeRequest(HttpUriRequest request, URI uri)
abstract SimpleHttpResponse executeRequest(SimpleHttpRequest request, URI uri)

abstract void executeRequestWithCallback(HttpUriRequest request, URI uri, FutureCallback<HttpResponse> callback)
abstract void executeRequestWithCallback(SimpleHttpRequest request, URI uri, FutureCallback<SimpleHttpResponse> callback)

static String fullPathFromURI(URI uri) {
StringBuilder builder = new StringBuilder()
Expand All @@ -147,53 +148,53 @@ abstract class ApacheHttpAsyncClientTest extends HttpClientTest<HttpUriRequest>

class ApacheClientUriRequest extends ApacheHttpAsyncClientTest {
@Override
HttpUriRequest createRequest(String method, URI uri) {
return new HttpUriRequest(method, uri)
SimpleHttpRequest createRequest(String method, URI uri) {
return new SimpleHttpRequest(method, uri)
}

@Override
HttpResponse executeRequest(HttpUriRequest request, URI uri) {
SimpleHttpResponse executeRequest(SimpleHttpRequest request, URI uri) {
return getClient(uri).execute(request, null).get()
}

@Override
void executeRequestWithCallback(HttpUriRequest request, URI uri, FutureCallback<HttpResponse> callback) {
void executeRequestWithCallback(SimpleHttpRequest request, URI uri, FutureCallback<SimpleHttpResponse> callback) {
getClient(uri).execute(request, callback)
}
}

class ApacheClientHostRequest extends ApacheHttpAsyncClientTest {
@Override
HttpUriRequest createRequest(String method, URI uri) {
SimpleHttpRequest createRequest(String method, URI uri) {
// also testing with absolute path below
return new HttpUriRequest(method, new URI(fullPathFromURI(uri)))
return new SimpleHttpRequest(method, new URI(fullPathFromURI(uri)))
}

@Override
HttpResponse executeRequest(HttpUriRequest request, URI uri) {
SimpleHttpResponse executeRequest(SimpleHttpRequest request, URI uri) {
return getClient(uri).execute(new HttpHost(uri.getHost(), uri.getPort(), uri.getScheme()), request, null).get()
}

@Override
void executeRequestWithCallback(HttpUriRequest request, URI uri, FutureCallback<HttpResponse> callback) {
void executeRequestWithCallback(SimpleHttpRequest request, URI uri, FutureCallback<SimpleHttpResponse> callback) {
getClient(uri).execute(new HttpHost(uri.getHost(), uri.getPort(), uri.getScheme()), request, callback)
}
}

class ApacheClientHostAbsoluteUriRequest extends ApacheHttpAsyncClientTest {

@Override
HttpUriRequest createRequest(String method, URI uri) {
return new HttpUriRequest(method, new URI(uri.toString()))
SimpleHttpRequest createRequest(String method, URI uri) {
return new SimpleHttpRequest(method, new URI(uri.toString()))
}

@Override
HttpResponse executeRequest(HttpUriRequest request, URI uri) {
SimpleHttpResponse executeRequest(SimpleHttpRequest request, URI uri) {
return getClient(uri).execute(new HttpHost(uri.getHost(), uri.getPort(), uri.getScheme()), request, null).get()
}

@Override
void executeRequestWithCallback(HttpUriRequest request, URI uri, FutureCallback<HttpResponse> callback) {
void executeRequestWithCallback(SimpleHttpRequest request, URI uri, FutureCallback<SimpleHttpResponse> callback) {
getClient(uri).execute(new HttpHost(uri.getHost(), uri.getPort(), uri.getScheme()), request, callback)
}
}

This file was deleted.

0 comments on commit dd8ac9f

Please sign in to comment.