Skip to content

Commit

Permalink
fix(Schema Registry): append '/' to RestService.BaseAddress and rem…
Browse files Browse the repository at this point in the history
…ove '/' from subject requests (#996)

* fix(Schema Registry): append '/' to `RestService.BaseAddress` and remove '/' from subject requests

* remove starting '/' from config requests

* use trim instead of `Regex.Replace`
  • Loading branch information
jonathansant authored and mhowlett committed Jul 8, 2019
1 parent 1c05a0f commit bdc66b8
Showing 1 changed file with 38 additions and 32 deletions.
70 changes: 38 additions & 32 deletions src/Confluent.SchemaRegistry/Rest/RestService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@
//
// Refer to LICENSE for more information.

using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using System;
using System.Collections.Generic;
using System.Net;
using System.Linq;
using System.Net;
using System.Net.Http;
using System;
using System.Text;
using System.Threading.Tasks;
using Newtonsoft.Json.Linq;
using Newtonsoft.Json;


namespace Confluent.SchemaRegistry
Expand Down Expand Up @@ -52,23 +52,29 @@ internal class RestService : IRestService
/// Initializes a new instance of the RestService class.
/// </summary>
public RestService(string schemaRegistryUrl, int timeoutMs, string username, string password)
{
var authorizationHeader = username != null && password != null
{
var authorizationHeader = username != null && password != null
? new System.Net.Http.Headers.AuthenticationHeaderValue("Basic", Convert.ToBase64String(Encoding.UTF8.GetBytes($"{username}:{password}")))
: null;

this.clients = schemaRegistryUrl
.Split(',')
.Select(uri => uri.StartsWith("http", StringComparison.Ordinal) ? uri : "http://" + uri) // need http or https - use http if not present.
.Select(uri =>
{
var client = new HttpClient() { BaseAddress = new Uri(uri, UriKind.Absolute), Timeout = TimeSpan.FromMilliseconds(timeoutMs) };
if (authorizationHeader != null) { client.DefaultRequestHeaders.Authorization = authorizationHeader; }
return client;
})
.Select(SanitizeUri)// need http or https - use http if not present.
.Select(uri =>
{
var client = new HttpClient { BaseAddress = new Uri(uri, UriKind.Absolute), Timeout = TimeSpan.FromMilliseconds(timeoutMs) };
if (authorizationHeader != null) { client.DefaultRequestHeaders.Authorization = authorizationHeader; }
return client;
})
.ToList();
}

private static string SanitizeUri(string uri)
{
var sanitized = uri.StartsWith("http", StringComparison.Ordinal) ? uri : $"http://{uri}";
return $"{sanitized.TrimEnd('/')}/";
}

#region Base Requests

private async Task<HttpResponseMessage> ExecuteOnOneInstanceAsync(Func<HttpRequestMessage> createRequest)
Expand All @@ -83,8 +89,8 @@ private async Task<HttpResponseMessage> ExecuteOnOneInstanceAsync(Func<HttpReque

string aggregatedErrorMessage = null;
HttpResponseMessage response = null;
bool firstError = true;
bool firstError = true;

int startClientIndex;
lock (lastClientUsedLock)
{
Expand Down Expand Up @@ -219,75 +225,75 @@ private HttpRequestMessage CreateRequest(string endPoint, HttpMethod method, par
#region Schemas

public async Task<string> GetSchemaAsync(int id)
=> (await RequestAsync<SchemaString>($"/schemas/ids/{id}", HttpMethod.Get)
=> (await RequestAsync<SchemaString>($"schemas/ids/{id}", HttpMethod.Get)
.ConfigureAwait(continueOnCapturedContext: false)).Schema;

#endregion Schemas

#region Subjects

public async Task<List<string>> GetSubjectsAsync()
=> await RequestListOfAsync<string>("/subjects", HttpMethod.Get)
=> await RequestListOfAsync<string>("subjects", HttpMethod.Get)
.ConfigureAwait(continueOnCapturedContext: false);

public async Task<List<int>> GetSubjectVersionsAsync(string subject)
=> await RequestListOfAsync<int>($"/subjects/{subject}/versions", HttpMethod.Get)
=> await RequestListOfAsync<int>($"subjects/{subject}/versions", HttpMethod.Get)
.ConfigureAwait(continueOnCapturedContext: false);

public async Task<Schema> GetSchemaAsync(string subject, int version)
=> await RequestAsync<Schema>($"/subjects/{subject}/versions/{version}", HttpMethod.Get)
=> await RequestAsync<Schema>($"subjects/{subject}/versions/{version}", HttpMethod.Get)
.ConfigureAwait(continueOnCapturedContext: false);

public async Task<Schema> GetLatestSchemaAsync(string subject)
=> await RequestAsync<Schema>($"/subjects/{subject}/versions/latest", HttpMethod.Get)
=> await RequestAsync<Schema>($"subjects/{subject}/versions/latest", HttpMethod.Get)
.ConfigureAwait(continueOnCapturedContext: false);

public async Task<int> RegisterSchemaAsync(string subject, string schema)
=> (await RequestAsync<SchemaId>($"/subjects/{subject}/versions", HttpMethod.Post, new SchemaString(schema))
=> (await RequestAsync<SchemaId>($"subjects/{subject}/versions", HttpMethod.Post, new SchemaString(schema))
.ConfigureAwait(continueOnCapturedContext: false)).Id;

public async Task<Schema> CheckSchemaAsync(string subject, string schema, bool ignoreDeletedSchemas)
=> await RequestAsync<Schema>($"/subjects/{subject}?deleted={!ignoreDeletedSchemas}", HttpMethod.Post, new SchemaString(schema))
=> await RequestAsync<Schema>($"subjects/{subject}?deleted={!ignoreDeletedSchemas}", HttpMethod.Post, new SchemaString(schema))
.ConfigureAwait(continueOnCapturedContext: false);

public async Task<Schema> CheckSchemaAsync(string subject, string schema)
=> await RequestAsync<Schema>($"/subjects/{subject}", HttpMethod.Post, new SchemaString(schema))
=> await RequestAsync<Schema>($"subjects/{subject}", HttpMethod.Post, new SchemaString(schema))
.ConfigureAwait(continueOnCapturedContext: false);

#endregion Subjects

#region Compatibility

public async Task<bool> TestCompatibilityAsync(string subject, int versionId, string schema)
=> (await RequestAsync<CompatibilityCheck>($"/compatibility/subjects/{subject}/versions/{versionId}", HttpMethod.Post, new SchemaString(schema))
=> (await RequestAsync<CompatibilityCheck>($"compatibility/subjects/{subject}/versions/{versionId}", HttpMethod.Post, new SchemaString(schema))
.ConfigureAwait(continueOnCapturedContext: false)).IsCompatible;

public async Task<bool> TestLatestCompatibilityAsync(string subject, string schema)
=> (await RequestAsync<CompatibilityCheck>($"/compatibility/subjects/{subject}/versions/latest", HttpMethod.Post, new SchemaString(schema))
=> (await RequestAsync<CompatibilityCheck>($"compatibility/subjects/{subject}/versions/latest", HttpMethod.Post, new SchemaString(schema))
.ConfigureAwait(continueOnCapturedContext: false)).IsCompatible;

#endregion Compatibility

#region Config

public async Task<Compatibility> GetGlobalCompatibilityAsync()
=> (await RequestAsync<Config>("/config", HttpMethod.Get)
=> (await RequestAsync<Config>("config", HttpMethod.Get)
.ConfigureAwait(continueOnCapturedContext: false)).CompatibilityLevel;

public async Task<Compatibility> GetCompatibilityAsync(string subject)
=> (await RequestAsync<Config>($"/config/{subject}", HttpMethod.Get)
=> (await RequestAsync<Config>($"config/{subject}", HttpMethod.Get)
.ConfigureAwait(continueOnCapturedContext: false)).CompatibilityLevel;

public async Task<Config> SetGlobalCompatibilityAsync(Compatibility compatibility)
=> await RequestAsync<Config>("/config", HttpMethod.Put, new Config(compatibility))
=> await RequestAsync<Config>("config", HttpMethod.Put, new Config(compatibility))
.ConfigureAwait(continueOnCapturedContext: false);

public async Task<Config> SetCompatibilityAsync(string subject, Compatibility compatibility)
=> await RequestAsync<Config>($"/config/{subject}", HttpMethod.Put, new Config(compatibility))
.ConfigureAwait(continueOnCapturedContext: false);
=> await RequestAsync<Config>($"config/{subject}", HttpMethod.Put, new Config(compatibility))
.ConfigureAwait(continueOnCapturedContext: false);

#endregion Config


public void Dispose()
{
Dispose(true);
Expand Down

0 comments on commit bdc66b8

Please sign in to comment.