Skip to content

Commit

Permalink
Support for k8s 1.16 and cert-manager 0.11.0+ (#18)
Browse files Browse the repository at this point in the history
* Added support for k8s 1.16. Updated to newer version of CertManager
  • Loading branch information
winromulus authored Oct 31, 2019
1 parent 1a58147 commit 65aca81
Show file tree
Hide file tree
Showing 19 changed files with 224 additions and 187 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@
{
public static class CertManagerConstants
{
public static string CrdGroup => "certmanager.k8s.io";
public static string CertificatePlural => "certificates";
public static string CertificateNameLabel => "certmanager.k8s.io/certificate-name";
public static string CertificateKind => "Certificate";
public static string CertificatePlural => "certificates";

public static string CrdGroup => "cert-manager.io";
public static string CertificateNameLabel => "cert-manager.io/certificate-name";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,5 @@ namespace ES.Kubernetes.Reflector.CertManager.Events
{
public class InternalCertificateWatcherEvent : WatcherEvent<Certificate>
{
public string CertificateResourceDefinitionVersion { get; set; }
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
using ES.Kubernetes.Reflector.Core.Events;
using System.Collections.Generic;
using ES.Kubernetes.Reflector.Core.Events;
using k8s.Models;

namespace ES.Kubernetes.Reflector.CertManager.Events
{
public class InternalSecretWatcherEvent : WatcherEvent<V1Secret>
{
public string CertificateResourceDefinitionVersion { get; set; }
public List<string> CertificateResourceDefinitionVersions { get; set; }
}
}
115 changes: 67 additions & 48 deletions ES.Kubernetes.Reflector.CertManager/Monitor.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using ES.Kubernetes.Reflector.CertManager.Constants;
Expand All @@ -20,24 +22,26 @@ namespace ES.Kubernetes.Reflector.CertManager
{
public class Monitor : IHostedService, IHealthCheck
{
private readonly ManagedWatcher<Certificate> _certificatesWatcher;
private readonly ManagedWatcher<V1beta1CustomResourceDefinition> _crdWatcher;
private readonly Func<ManagedWatcher<Certificate, object>> _certificatesWatcherFactory;

private readonly Dictionary<string, ManagedWatcher<Certificate, object>> _certificatesWatchers =
new Dictionary<string, ManagedWatcher<Certificate, object>>();

private readonly ManagedWatcher<V1CustomResourceDefinition, V1CustomResourceDefinitionList> _crdWatcher;
private readonly FeederQueue<WatcherEvent> _eventQueue;
private readonly ILogger<Monitor> _logger;
private readonly IMediator _mediator;
private readonly ManagedWatcher<V1Secret> _secretsWatcher;

private string _crdVersion;
private readonly ManagedWatcher<V1Secret, V1SecretList> _secretsWatcher;

public Monitor(ILogger<Monitor> logger,
ManagedWatcher<V1beta1CustomResourceDefinition> crdWatcher,
ManagedWatcher<Certificate> certificatesWatcher,
ManagedWatcher<V1Secret> secretsWatcher,
ManagedWatcher<V1CustomResourceDefinition, V1CustomResourceDefinitionList> crdWatcher,
Func<ManagedWatcher<Certificate, object>> certificatesWatcherFactory,
ManagedWatcher<V1Secret, V1SecretList> secretsWatcher,
IMediator mediator)
{
_logger = logger;
_crdWatcher = crdWatcher;
_certificatesWatcher = certificatesWatcher;
_certificatesWatcherFactory = certificatesWatcherFactory;
_secretsWatcher = secretsWatcher;
_mediator = mediator;

Expand All @@ -47,17 +51,14 @@ public Monitor(ILogger<Monitor> logger,
_secretsWatcher.OnStateChanged = OnWatcherStateChanged;
_secretsWatcher.EventHandlerFactory = e =>
_eventQueue.FeedAsync(new InternalSecretWatcherEvent
{ Item = e.Item, Type = e.Type, CertificateResourceDefinitionVersion = _crdVersion });
{
Item = e.Item, Type = e.Type,
CertificateResourceDefinitionVersions = _certificatesWatchers.Keys.ToList()
});
_secretsWatcher.RequestFactory = async c =>
await c.ListSecretForAllNamespacesWithHttpMessagesAsync(watch: true);


_certificatesWatcher.OnStateChanged = OnWatcherStateChanged;
_certificatesWatcher.EventHandlerFactory = e =>
_eventQueue.FeedAsync(new InternalCertificateWatcherEvent
{ Item = e.Item, Type = e.Type, CertificateResourceDefinitionVersion = _crdVersion });


_crdWatcher.EventHandlerFactory = OnCrdEvent;
_crdWatcher.RequestFactory = async c =>
await c.ListCustomResourceDefinitionWithHttpMessagesAsync(watch: true);
Expand All @@ -66,22 +67,31 @@ public Monitor(ILogger<Monitor> logger,
switch (update.State)
{
case ManagedWatcherState.Closed:
_logger.LogDebug("{type} watcher {state}", typeof(V1beta1CustomResourceDefinition).Name,
_logger.LogDebug("{type} watcher {state}", typeof(V1CustomResourceDefinition).Name,
update.State);
await sender.Start();
break;
case ManagedWatcherState.Faulted:
_logger.LogError(update.Exception, "{type} watcher {state}",
typeof(V1beta1CustomResourceDefinition).Name, update.State);
typeof(V1CustomResourceDefinition).Name, update.State);
break;
default:
_logger.LogDebug("{type} watcher {state}", typeof(V1beta1CustomResourceDefinition).Name,
_logger.LogDebug("{type} watcher {state}", typeof(V1CustomResourceDefinition).Name,
update.State);
break;
}
};
}

public Task<HealthCheckResult> CheckHealthAsync(HealthCheckContext context,
CancellationToken cancellationToken = new CancellationToken())
{
return Task.FromResult(_crdWatcher.IsFaulted || _secretsWatcher.IsFaulted ||
_certificatesWatchers.Values.Any(s => s.IsFaulted)
? HealthCheckResult.Unhealthy()
: HealthCheckResult.Healthy());
}

public async Task StartAsync(CancellationToken cancellationToken)
{
await _crdWatcher.Start();
Expand All @@ -90,30 +100,32 @@ public async Task StartAsync(CancellationToken cancellationToken)
public async Task StopAsync(CancellationToken cancellationToken)
{
await _crdWatcher.Stop();
await _certificatesWatcher.Stop();
foreach (var certificatesWatcher in _certificatesWatchers.Values) await certificatesWatcher.Stop();
await _secretsWatcher.Stop();
}

private async Task OnWatcherStateChanged<TS>(ManagedWatcher<TS, WatcherEvent<TS>> sender,
private async Task OnWatcherStateChanged<TS, TSL>(ManagedWatcher<TS, TSL, WatcherEvent<TS>> sender,
ManagedWatcherStateUpdate update) where TS : class, IKubernetesObject
{
var tag = sender.Tag ?? string.Empty;
switch (update.State)
{
case ManagedWatcherState.Closed:
_logger.LogDebug("{type} watcher {state}", typeof(TS).Name, update.State);
_logger.LogDebug("{type} watcher {tag} {state}", typeof(TS).Name, tag, update.State);
await _secretsWatcher.Stop();
await _certificatesWatcher.Stop();
foreach (var certificatesWatcher in _certificatesWatchers.Values) await certificatesWatcher.Stop();

await _eventQueue.WaitAndClear();

await _secretsWatcher.Start();
await _certificatesWatcher.Start();
foreach (var certificatesWatcher in _certificatesWatchers.Values) await certificatesWatcher.Start();
break;
case ManagedWatcherState.Faulted:
_logger.LogError(update.Exception, "{type} watcher {state}", typeof(TS).Name, update.State);
_logger.LogError(update.Exception, "{type} watcher {tag} {state}", typeof(TS).Name, tag,
update.State);
break;
default:
_logger.LogDebug("{type} watcher {state}", typeof(TS).Name, update.State);
_logger.LogDebug("{type} watcher {tag} {state}", typeof(TS).Name, tag, update.State);
break;
}
}
Expand All @@ -131,47 +143,54 @@ private async Task OnEventHandlingError(WatcherEvent e, Exception ex)
_logger.LogError(ex, "Failed to process {eventType} {kind} {@id} due to exception",
e.Type, e.Item.Kind, id);
await _secretsWatcher.Stop();
await _certificatesWatcher.Stop();
foreach (var certificatesWatcher in _certificatesWatchers.Values) await certificatesWatcher.Stop();
_eventQueue.Clear();

_logger.LogTrace("Watchers restarting");
await _secretsWatcher.Start();
await _certificatesWatcher.Start();
foreach (var certificatesWatcher in _certificatesWatchers.Values) await certificatesWatcher.Start();
_logger.LogTrace("Watchers restarted");
}


private async Task OnCrdEvent(WatcherEvent<V1beta1CustomResourceDefinition> request)
private async Task OnCrdEvent(WatcherEvent<V1CustomResourceDefinition> request)
{
if (request.Type != WatchEventType.Added && request.Type != WatchEventType.Modified) return;
if (request.Item.Spec?.Names == null) return;

if (request.Item.Spec.Group != CertManagerConstants.CrdGroup ||
request.Item.Spec.Names.Kind != CertManagerConstants.CertificateKind) return;
if (request.Item.Spec.Version == _crdVersion) return;
var versions = request.Item.Spec.Versions.Select(s => s.Name).ToList();
if (versions.TrueForAll(s => _certificatesWatchers.ContainsKey(s))) return;

_crdVersion = request.Item.Spec.Version;
_logger.LogInformation("{crdType} {kind} version updated to {crdGroup}/{version}",
typeof(V1beta1CustomResourceDefinition).Name,
CertManagerConstants.CertificateKind,
CertManagerConstants.CrdGroup,
request.Item.Spec.Version);
_logger.LogInformation("{crdType} {kind} in group {group} versions updated to {versions}",
typeof(V1CustomResourceDefinition).Name,
request.Item.Spec.Names.Kind,
request.Item.Spec.Group,
versions);

await _certificatesWatcher.Stop();
foreach (var certificatesWatcher in _certificatesWatchers.Values) await certificatesWatcher.Stop();
await _secretsWatcher.Stop();

_certificatesWatcher.RequestFactory = async client =>
await client.ListClusterCustomObjectWithHttpMessagesAsync(request.Item.Spec.Group,
request.Item.Spec.Version, request.Item.Spec.Names.Plural, watch: true, timeoutSeconds: (int)TimeSpan.FromHours(1).TotalSeconds);
_certificatesWatchers.Clear();

await _certificatesWatcher.Start();
await _secretsWatcher.Start();
}
foreach (var version in versions)
{
var watcher = _certificatesWatcherFactory();
watcher.Tag = version;
watcher.OnStateChanged = OnWatcherStateChanged;
watcher.EventHandlerFactory = e =>
_eventQueue.FeedAsync(new InternalCertificateWatcherEvent {Item = e.Item, Type = e.Type});
watcher.RequestFactory = async client => await client.ListClusterCustomObjectWithHttpMessagesAsync(
request.Item.Spec.Group,
version, request.Item.Spec.Names.Plural, watch: true,
timeoutSeconds: (int) TimeSpan.FromHours(1).TotalSeconds);
_certificatesWatchers.Add(version, watcher);
}

public Task<HealthCheckResult> CheckHealthAsync(HealthCheckContext context, CancellationToken cancellationToken = new CancellationToken())
{
return Task.FromResult(_crdWatcher.IsFaulted || _secretsWatcher.IsFaulted || _certificatesWatcher.IsFaulted
? HealthCheckResult.Unhealthy()
: HealthCheckResult.Healthy());

foreach (var certificatesWatcher in _certificatesWatchers.Values) await certificatesWatcher.Start();
await _secretsWatcher.Start();
}
}
}
6 changes: 4 additions & 2 deletions ES.Kubernetes.Reflector.CertManager/Resources/Certificate.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@ public class Certificate : IKubernetesObject
[JsonProperty(PropertyName = "metadata")]
public V1ObjectMeta Metadata { get; set; }

[JsonProperty(PropertyName = "spec")] public SpecDefinition Spec { get; set; }
[JsonProperty(PropertyName = "spec")]
public SpecDefinition Spec { get; set; }

[JsonProperty(PropertyName = "apiVersion")]
public string ApiVersion { get; set; }

[JsonProperty(PropertyName = "kind")] public string Kind { get; set; }
[JsonProperty(PropertyName = "kind")]
public string Kind { get; set; }

public class SpecDefinition
{
Expand Down
35 changes: 20 additions & 15 deletions ES.Kubernetes.Reflector.CertManager/SecretEtcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -74,21 +74,26 @@ public async Task Handle(InternalSecretWatcherEvent notification, CancellationTo
CertManagerConstants.CertificateKind, certificateId, secretId);

Certificate certificate = null;
try
{
var certificateJObject = await _client.GetNamespacedCustomObjectAsync(CertManagerConstants.CrdGroup,
notification.CertificateResourceDefinitionVersion, metadata.NamespaceProperty,
CertManagerConstants.CertificatePlural,
certificateName, cancellationToken);
certificate = ((JObject) certificateJObject).ToObject<Certificate>();
}
catch (HttpOperationException exception) when (exception.Response.StatusCode == HttpStatusCode.NotFound)
{
_logger.LogDebug("Could not find {kind} {@id}",
CertManagerConstants.CertificateKind, certificateId);
}

if (certificate != null) await Annotate(secret, certificate);
foreach (var certificateResourceDefinitionVersion in notification.CertificateResourceDefinitionVersions)
try
{
var certificateJObject = await _client.GetNamespacedCustomObjectAsync(
CertManagerConstants.CrdGroup,
certificateResourceDefinitionVersion, metadata.NamespaceProperty,
CertManagerConstants.CertificatePlural,
certificateName, cancellationToken);
certificate = ((JObject) certificateJObject).ToObject<Certificate>();
}
catch (HttpOperationException exception) when (exception.Response.StatusCode ==
HttpStatusCode.NotFound)
{
}

if (certificate != null)
await Annotate(secret, certificate);
else
_logger.LogDebug("Could not find {kind} {@id}", CertManagerConstants.CertificateKind,
certificateId);
}
}

Expand Down
19 changes: 10 additions & 9 deletions ES.Kubernetes.Reflector.ConfigMaps/Mirror.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,21 @@

namespace ES.Kubernetes.Reflector.ConfigMaps
{
public class Mirror : ResourceMirror<V1ConfigMap>, IHostedService, IHealthCheck
public class Mirror : ResourceMirror<V1ConfigMap, V1ConfigMapList>, IHostedService, IHealthCheck
{
public Mirror(ILogger<Mirror> logger, IKubernetes client,
ManagedWatcher<V1ConfigMap> configMapWatcher,
ManagedWatcher<V1Namespace> namespaceWatcher)
ManagedWatcher<V1ConfigMap, V1ConfigMapList> configMapWatcher,
ManagedWatcher<V1Namespace, V1NamespaceList> namespaceWatcher)
: base(logger, client, configMapWatcher, namespaceWatcher)
{
}

public Task<HealthCheckResult> CheckHealthAsync(HealthCheckContext context,
CancellationToken cancellationToken = new CancellationToken())
{
return Task.FromResult(IsFaulted ? HealthCheckResult.Unhealthy() : HealthCheckResult.Healthy());
}


public async Task StartAsync(CancellationToken cancellationToken)
{
Expand All @@ -38,7 +44,7 @@ public async Task StopAsync(CancellationToken cancellationToken)
}


protected override async Task<HttpOperationResponse> OnResourceWatcher(IKubernetes client)
protected override async Task<HttpOperationResponse<V1ConfigMapList>> OnResourceWatcher(IKubernetes client)
{
return await client.ListConfigMapForAllNamespacesWithHttpMessagesAsync(watch: true);
}
Expand Down Expand Up @@ -98,10 +104,5 @@ protected override async Task OnResourcePatch(IKubernetes client, V1ConfigMap ta
await client.PatchNamespacedConfigMapWithHttpMessagesAsync(new V1Patch(patch),
target.Metadata.Name, target.Metadata.NamespaceProperty);
}

public Task<HealthCheckResult> CheckHealthAsync(HealthCheckContext context, CancellationToken cancellationToken = new CancellationToken())
{
return Task.FromResult(IsFaulted ? HealthCheckResult.Unhealthy() : HealthCheckResult.Healthy());
}
}
}
2 changes: 1 addition & 1 deletion ES.Kubernetes.Reflector.Core/CoreModule.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ public class CoreModule : Module
{
protected override void Load(ContainerBuilder builder)
{
builder.RegisterGeneric(typeof(ManagedWatcher<>));
builder.RegisterGeneric(typeof(ManagedWatcher<,>));
builder.RegisterGeneric(typeof(ManagedWatcher<,,>));


builder.Register(s =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Autofac" Version="4.9.2" />
<PackageReference Include="KubernetesClient" Version="1.5.19" />
<PackageReference Include="Autofac" Version="4.9.4" />
<PackageReference Include="KubernetesClient" Version="1.6.3" />
<PackageReference Include="MediatR" Version="7.0.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="2.2.0" />
<PackageReference Include="Microsoft.Extensions.Diagnostics.HealthChecks" Version="2.2.5" />
Expand Down
Loading

0 comments on commit 65aca81

Please sign in to comment.