Skip to content

Commit

Permalink
Feature/add http custom headers (#580)
Browse files Browse the repository at this point in the history
* refactor: Sort methods by modifier, append ConfigureAwait(false) consistent, remove unnecessary preprocessor directives

* feat: Add default HTTP custom header
  • Loading branch information
HofmeisterAn authored Aug 17, 2022
1 parent 4d313ee commit 6285c9a
Show file tree
Hide file tree
Showing 3 changed files with 147 additions and 125 deletions.
22 changes: 11 additions & 11 deletions src/Docker.DotNet/DockerApiStreamedResponse.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,19 @@

namespace Docker.DotNet
{
internal class DockerApiStreamedResponse
internal sealed class DockerApiStreamedResponse
{
public HttpStatusCode StatusCode { get; private set; }

public Stream Body { get; private set; }

public HttpResponseHeaders Headers { get; private set; }

public DockerApiStreamedResponse(HttpStatusCode statusCode, Stream body, HttpResponseHeaders headers)
{
this.StatusCode = statusCode;
this.Body = body;
this.Headers = headers;
StatusCode = statusCode;
Body = body;
Headers = headers;
}

public HttpStatusCode StatusCode { get; }

public Stream Body { get; }

public HttpResponseHeaders Headers { get; }
}
}
}
176 changes: 94 additions & 82 deletions src/Docker.DotNet/DockerClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,35 +5,35 @@
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Net.Http.Client;

#if (NETSTANDARD1_6 || NETSTANDARD2_0)
using System.Net.Sockets;
#endif

namespace Docker.DotNet
{
public sealed class DockerClient : IDockerClient
{
internal readonly IEnumerable<ApiResponseErrorHandlingDelegate> NoErrorHandlers = Enumerable.Empty<ApiResponseErrorHandlingDelegate>();

private const string UserAgent = "Docker.DotNet";

private static readonly TimeSpan s_InfiniteTimeout = TimeSpan.FromMilliseconds(Timeout.Infinite);
private static readonly TimeSpan SInfiniteTimeout = Timeout.InfiniteTimeSpan;

private readonly HttpClient _client;

private readonly Uri _endpointBaseUri;

internal readonly IEnumerable<ApiResponseErrorHandlingDelegate> NoErrorHandlers = Enumerable.Empty<ApiResponseErrorHandlingDelegate>();
private readonly Version _requestedApiVersion;

internal DockerClient(DockerClientConfiguration configuration, Version requestedApiVersion)
{
Configuration = configuration;
_requestedApiVersion = requestedApiVersion;
JsonSerializer = new JsonSerializer();

Configuration = configuration;
DefaultTimeout = configuration.DefaultTimeout;

JsonSerializer = new JsonSerializer();
Images = new ImageOperations(this);
Containers = new ContainerOperations(this);
System = new SystemOperations(this);
Expand Down Expand Up @@ -73,15 +73,13 @@ internal DockerClient(DockerClientConfiguration configuration, Version requested
uri = new UriBuilder("http", pipeName).Uri;
handler = new ManagedHandler(async (host, port, cancellationToken) =>
{
int timeout = (int)this.Configuration.NamedPipeConnectTimeout.TotalMilliseconds;
var timeout = (int)Configuration.NamedPipeConnectTimeout.TotalMilliseconds;
var stream = new NamedPipeClientStream(serverName, pipeName, PipeDirection.InOut, PipeOptions.Asynchronous);
var dockerStream = new DockerPipeStream(stream);

#if NET45
await Task.Run(() => stream.Connect(timeout), cancellationToken);
#else
await stream.ConnectAsync(timeout, cancellationToken);
#endif
await stream.ConnectAsync(timeout, cancellationToken)
.ConfigureAwait(false);

return dockerStream;
});

Expand All @@ -101,18 +99,19 @@ internal DockerClient(DockerClientConfiguration configuration, Version requested
handler = new ManagedHandler();
break;

#if (NETSTANDARD1_6 || NETSTANDARD2_0)
case "unix":
var pipeString = uri.LocalPath;
handler = new ManagedHandler(async (string host, int port, CancellationToken cancellationToken) =>
handler = new ManagedHandler(async (host, port, cancellationToken) =>
{
var sock = new Socket(AddressFamily.Unix, SocketType.Stream, ProtocolType.Unspecified);
await sock.ConnectAsync(new UnixDomainSocketEndPoint(pipeString));

await sock.ConnectAsync(new Microsoft.Net.Http.Client.UnixDomainSocketEndPoint(pipeString))
.ConfigureAwait(false);

return sock;
});
uri = new UriBuilder("http", uri.Segments.Last()).Uri;
break;
#endif

default:
throw new Exception($"Unknown URL scheme {configuration.EndpointBaseUri.Scheme}");
Expand All @@ -121,8 +120,7 @@ internal DockerClient(DockerClientConfiguration configuration, Version requested
_endpointBaseUri = uri;

_client = new HttpClient(Configuration.Credentials.GetHandler(handler), true);
DefaultTimeout = Configuration.DefaultTimeout;
_client.Timeout = s_InfiniteTimeout;
_client.Timeout = SInfiniteTimeout;
}

public DockerClientConfiguration Configuration { get; }
Expand Down Expand Up @@ -151,6 +149,11 @@ internal DockerClient(DockerClientConfiguration configuration, Version requested

internal JsonSerializer JsonSerializer { get; }

public void Dispose()
{
Configuration.Dispose();
}

internal Task<DockerApiResponse> MakeRequestAsync(
IEnumerable<ApiResponseErrorHandlingDelegate> errorHandlers,
HttpMethod method,
Expand Down Expand Up @@ -190,7 +193,7 @@ internal Task<DockerApiResponse> MakeRequestAsync(
IDictionary<string, string> headers,
CancellationToken token)
{
return MakeRequestAsync(errorHandlers, method, path, queryString, body, headers, this.DefaultTimeout, token);
return MakeRequestAsync(errorHandlers, method, path, queryString, body, headers, DefaultTimeout, token);
}

internal async Task<DockerApiResponse> MakeRequestAsync(
Expand All @@ -203,12 +206,16 @@ internal async Task<DockerApiResponse> MakeRequestAsync(
TimeSpan timeout,
CancellationToken token)
{
var response = await PrivateMakeRequestAsync(timeout, HttpCompletionOption.ResponseContentRead, method, path, queryString, headers, body, token).ConfigureAwait(false);
var response = await PrivateMakeRequestAsync(timeout, HttpCompletionOption.ResponseContentRead, method, path, queryString, headers, body, token)
.ConfigureAwait(false);

using (response)
{
await HandleIfErrorResponseAsync(response.StatusCode, response, errorHandlers);
await HandleIfErrorResponseAsync(response.StatusCode, response, errorHandlers)
.ConfigureAwait(false);

var responseBody = await response.Content.ReadAsStringAsync().ConfigureAwait(false);
var responseBody = await response.Content.ReadAsStringAsync()
.ConfigureAwait(false);

return new DockerApiResponse(response.StatusCode, responseBody);
}
Expand Down Expand Up @@ -253,7 +260,7 @@ internal Task<Stream> MakeRequestForStreamAsync(
IDictionary<string, string> headers,
CancellationToken token)
{
return MakeRequestForStreamAsync(errorHandlers, method, path, queryString, body, headers, s_InfiniteTimeout, token);
return MakeRequestForStreamAsync(errorHandlers, method, path, queryString, body, headers, SInfiniteTimeout, token);
}

internal async Task<Stream> MakeRequestForStreamAsync(
Expand All @@ -266,23 +273,25 @@ internal async Task<Stream> MakeRequestForStreamAsync(
TimeSpan timeout,
CancellationToken token)
{
var response = await PrivateMakeRequestAsync(timeout, HttpCompletionOption.ResponseHeadersRead, method, path, queryString, headers, body, token).ConfigureAwait(false);
var response = await PrivateMakeRequestAsync(timeout, HttpCompletionOption.ResponseHeadersRead, method, path, queryString, headers, body, token)
.ConfigureAwait(false);

await HandleIfErrorResponseAsync(response.StatusCode, response, errorHandlers);
await HandleIfErrorResponseAsync(response.StatusCode, response, errorHandlers)
.ConfigureAwait(false);

return await response.Content.ReadAsStreamAsync().ConfigureAwait(false);
return await response.Content.ReadAsStreamAsync()
.ConfigureAwait(false);
}

internal async Task<HttpResponseMessage> MakeRequestForRawResponseAsync(
internal Task<HttpResponseMessage> MakeRequestForRawResponseAsync(
HttpMethod method,
string path,
IQueryString queryString,
IRequestContent body,
IDictionary<string, string> headers,
CancellationToken token)
{
var response = await PrivateMakeRequestAsync(s_InfiniteTimeout, HttpCompletionOption.ResponseHeadersRead, method, path, queryString, headers, body, token).ConfigureAwait(false);
return response;
return PrivateMakeRequestAsync(SInfiniteTimeout, HttpCompletionOption.ResponseHeadersRead, method, path, queryString, headers, body, token);
}

internal async Task<DockerApiStreamedResponse> MakeRequestForStreamedResponseAsync(
Expand All @@ -292,11 +301,14 @@ internal async Task<DockerApiStreamedResponse> MakeRequestForStreamedResponseAsy
IQueryString queryString,
CancellationToken cancellationToken)
{
var response = await PrivateMakeRequestAsync(s_InfiniteTimeout, HttpCompletionOption.ResponseHeadersRead, method, path, queryString, null, null, cancellationToken);
var response = await PrivateMakeRequestAsync(SInfiniteTimeout, HttpCompletionOption.ResponseHeadersRead, method, path, queryString, null, null, cancellationToken)
.ConfigureAwait(false);

await HandleIfErrorResponseAsync(response.StatusCode, response, errorHandlers);
await HandleIfErrorResponseAsync(response.StatusCode, response, errorHandlers)
.ConfigureAwait(false);

var body = await response.Content.ReadAsStreamAsync();
var body = await response.Content.ReadAsStreamAsync()
.ConfigureAwait(false);

return new DockerApiStreamedResponse(response.StatusCode, body, response.Headers);
}
Expand All @@ -310,7 +322,7 @@ internal Task<WriteClosableStream> MakeRequestForHijackedStreamAsync(
IDictionary<string, string> headers,
CancellationToken cancellationToken)
{
return MakeRequestForHijackedStreamAsync(errorHandlers, method, path, queryString, body, headers, s_InfiniteTimeout, cancellationToken);
return MakeRequestForHijackedStreamAsync(errorHandlers, method, path, queryString, body, headers, SInfiniteTimeout, cancellationToken);
}

internal async Task<WriteClosableStream> MakeRequestForHijackedStreamAsync(
Expand All @@ -323,18 +335,20 @@ internal async Task<WriteClosableStream> MakeRequestForHijackedStreamAsync(
TimeSpan timeout,
CancellationToken cancellationToken)
{
var response = await PrivateMakeRequestAsync(timeout, HttpCompletionOption.ResponseHeadersRead, method, path, queryString, headers, body, cancellationToken).ConfigureAwait(false);
var response = await PrivateMakeRequestAsync(timeout, HttpCompletionOption.ResponseHeadersRead, method, path, queryString, headers, body, cancellationToken)
.ConfigureAwait(false);

await HandleIfErrorResponseAsync(response.StatusCode, response, errorHandlers);
await HandleIfErrorResponseAsync(response.StatusCode, response, errorHandlers)
.ConfigureAwait(false);

var content = response.Content as HttpConnectionResponseContent;
if (content == null)
if (!(response.Content is HttpConnectionResponseContent content))
{
throw new NotSupportedException("message handler does not support hijacked streams");
}

var stream = await content.ReadAsStreamAsync()
.ConfigureAwait(false);

return (WriteClosableStream)stream;
}

Expand All @@ -350,25 +364,56 @@ private async Task<HttpResponseMessage> PrivateMakeRequestAsync(
{
var request = PrepareRequest(method, path, queryString, headers, data);

if (timeout != s_InfiniteTimeout)
if (timeout != SInfiniteTimeout)
{
using (var timeoutTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken))
{
timeoutTokenSource.CancelAfter(timeout);
return await _client.SendAsync(request, completionOption, timeoutTokenSource.Token).ConfigureAwait(false);
return await _client.SendAsync(request, completionOption, timeoutTokenSource.Token)
.ConfigureAwait(false);
}
}

var tcs = new TaskCompletionSource<HttpResponseMessage>();
using (cancellationToken.Register(() => tcs.SetCanceled()))
{
return await await Task.WhenAny(tcs.Task, _client.SendAsync(request, completionOption, cancellationToken)).ConfigureAwait(false);
return await await Task.WhenAny(tcs.Task, _client.SendAsync(request, completionOption, cancellationToken))
.ConfigureAwait(false);
}
}

internal HttpRequestMessage PrepareRequest(HttpMethod method, string path, IQueryString queryString, IDictionary<string, string> headers, IRequestContent data)
{
if (string.IsNullOrEmpty(path))
{
throw new ArgumentNullException(nameof(path));
}

var request = new HttpRequestMessage(method, HttpUtility.BuildUri(_endpointBaseUri, _requestedApiVersion, path, queryString));
request.Version = new Version(1, 1);
request.Headers.Add("User-Agent", UserAgent);

var customHeaders = headers == null
? Configuration.DefaultHttpRequestHeaders
: Configuration.DefaultHttpRequestHeaders.Concat(headers);

foreach (var header in customHeaders)
{
request.Headers.Add(header.Key, header.Value);
}

if (data != null)
{
var requestContent = data.GetContent(); // make the call only once.
request.Content = requestContent;
}

return request;
}

private async Task HandleIfErrorResponseAsync(HttpStatusCode statusCode, HttpResponseMessage response, IEnumerable<ApiResponseErrorHandlingDelegate> handlers)
{
bool isErrorResponse = statusCode < HttpStatusCode.OK || statusCode >= HttpStatusCode.BadRequest;
var isErrorResponse = statusCode < HttpStatusCode.OK || statusCode >= HttpStatusCode.BadRequest;

string responseBody = null;

Expand All @@ -377,7 +422,8 @@ private async Task HandleIfErrorResponseAsync(HttpStatusCode statusCode, HttpRes
// If it is not an error response, we do not read the response body because the caller may wish to consume it.
// If it is an error response, we do because there is nothing else going to be done with it anyway and
// we want to report the response body in the error message as it contains potentially useful info.
responseBody = await response.Content.ReadAsStringAsync().ConfigureAwait(false);
responseBody = await response.Content.ReadAsStringAsync()
.ConfigureAwait(false);
}

// If no customer handlers just default the response.
Expand All @@ -396,9 +442,9 @@ private async Task HandleIfErrorResponseAsync(HttpStatusCode statusCode, HttpRes
}
}

public async Task HandleIfErrorResponseAsync(HttpStatusCode statusCode, HttpResponseMessage response)
private async Task HandleIfErrorResponseAsync(HttpStatusCode statusCode, HttpResponseMessage response)
{
bool isErrorResponse = statusCode < HttpStatusCode.OK || statusCode >= HttpStatusCode.BadRequest;
var isErrorResponse = statusCode < HttpStatusCode.OK || statusCode >= HttpStatusCode.BadRequest;

string responseBody = null;

Expand All @@ -407,7 +453,8 @@ public async Task HandleIfErrorResponseAsync(HttpStatusCode statusCode, HttpResp
// If it is not an error response, we do not read the response body because the caller may wish to consume it.
// If it is an error response, we do because there is nothing else going to be done with it anyway and
// we want to report the response body in the error message as it contains potentially useful info.
responseBody = await response.Content.ReadAsStringAsync().ConfigureAwait(false);
responseBody = await response.Content.ReadAsStringAsync()
.ConfigureAwait(false);
}

// No custom handler was fired. Default the response for generic success/failures.
Expand All @@ -416,41 +463,6 @@ public async Task HandleIfErrorResponseAsync(HttpStatusCode statusCode, HttpResp
throw new DockerApiException(statusCode, responseBody);
}
}

internal HttpRequestMessage PrepareRequest(HttpMethod method, string path, IQueryString queryString, IDictionary<string, string> headers, IRequestContent data)
{
if (string.IsNullOrEmpty(path))
{
throw new ArgumentNullException(nameof(path));
}

var request = new HttpRequestMessage(method, HttpUtility.BuildUri(_endpointBaseUri, this._requestedApiVersion, path, queryString));

request.Version = new Version(1, 1);

request.Headers.Add("User-Agent", UserAgent);

if (headers != null)
{
foreach (var header in headers)
{
request.Headers.Add(header.Key, header.Value);
}
}

if (data != null)
{
var requestContent = data.GetContent(); // make the call only once.
request.Content = requestContent;
}

return request;
}

public void Dispose()
{
Configuration.Dispose();
}
}

internal delegate void ApiResponseErrorHandlingDelegate(HttpStatusCode statusCode, string responseBody);
Expand Down
Loading

0 comments on commit 6285c9a

Please sign in to comment.