diff --git a/cluster/cluster.go b/cluster/cluster.go index e24883eb98..5fedba2728 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -1,6 +1,7 @@ package cluster import ( + "context" "errors" "github.com/weaveworks/flux" @@ -25,11 +26,11 @@ const ( // are distinct interfaces. type Cluster interface { // Get all of the services (optionally, from a specific namespace), excluding those - AllWorkloads(maybeNamespace string) ([]Workload, error) - SomeWorkloads([]flux.ResourceID) ([]Workload, error) + AllWorkloads(ctx context.Context, maybeNamespace string) ([]Workload, error) + SomeWorkloads(ctx context.Context, ids []flux.ResourceID) ([]Workload, error) IsAllowedResource(flux.ResourceID) bool Ping() error - Export() ([]byte, error) + Export(ctx context.Context) ([]byte, error) Sync(SyncSet) error PublicSSHKey(regenerate bool) (ssh.PublicKey, error) } diff --git a/cluster/kubernetes/images.go b/cluster/kubernetes/images.go index dccf306840..e4fdcc912e 100644 --- a/cluster/kubernetes/images.go +++ b/cluster/kubernetes/images.go @@ -1,6 +1,7 @@ package kubernetes import ( + "context" "fmt" "github.com/go-kit/kit/log" @@ -122,8 +123,9 @@ func mergeCredentials(log func(...interface{}) error, // ImagesToFetch is a k8s specific method to get a list of images to update along with their credentials func (c *Cluster) ImagesToFetch() registry.ImageCreds { allImageCreds := make(registry.ImageCreds) + ctx := context.Background() - namespaces, err := c.getAllowedAndExistingNamespaces() + namespaces, err := c.getAllowedAndExistingNamespaces(ctx) if err != nil { c.logger.Log("err", errors.Wrap(err, "getting namespaces")) return allImageCreds @@ -132,7 +134,7 @@ func (c *Cluster) ImagesToFetch() registry.ImageCreds { for _, ns := range namespaces { seenCreds := make(map[string]registry.Credentials) for kind, resourceKind := range resourceKinds { - workloads, err := resourceKind.getWorkloads(c, ns.Name) + workloads, err := resourceKind.getWorkloads(ctx, c, ns.Name) if err != nil { if apierrors.IsNotFound(err) || apierrors.IsForbidden(err) { // Skip unsupported or forbidden resource kinds diff --git a/cluster/kubernetes/kubernetes.go b/cluster/kubernetes/kubernetes.go index 5de47558a6..a9000fa5ee 100644 --- a/cluster/kubernetes/kubernetes.go +++ b/cluster/kubernetes/kubernetes.go @@ -2,6 +2,7 @@ package kubernetes import ( "bytes" + "context" "encoding/json" "fmt" "sync" @@ -127,7 +128,7 @@ func NewCluster(client ExtendedClient, applier Applier, sshKeyRing ssh.KeyRing, // SomeWorkloads returns the workloads named, missing out any that don't // exist in the cluster or aren't in an allowed namespace. // They do not necessarily have to be returned in the order requested. -func (c *Cluster) SomeWorkloads(ids []flux.ResourceID) (res []cluster.Workload, err error) { +func (c *Cluster) SomeWorkloads(ctx context.Context, ids []flux.ResourceID) (res []cluster.Workload, err error) { var workloads []cluster.Workload for _, id := range ids { if !c.IsAllowedResource(id) { @@ -141,7 +142,7 @@ func (c *Cluster) SomeWorkloads(ids []flux.ResourceID) (res []cluster.Workload, continue } - workload, err := resourceKind.getWorkload(c, ns, name) + workload, err := resourceKind.getWorkload(ctx, c, ns, name) if err != nil { if apierrors.IsForbidden(err) || apierrors.IsNotFound(err) { continue @@ -161,8 +162,8 @@ func (c *Cluster) SomeWorkloads(ids []flux.ResourceID) (res []cluster.Workload, // AllWorkloads returns all workloads in allowed namespaces matching the criteria; that is, in // the namespace (or any namespace if that argument is empty) -func (c *Cluster) AllWorkloads(namespace string) (res []cluster.Workload, err error) { - namespaces, err := c.getAllowedAndExistingNamespaces() +func (c *Cluster) AllWorkloads(ctx context.Context, namespace string) (res []cluster.Workload, err error) { + namespaces, err := c.getAllowedAndExistingNamespaces(ctx) if err != nil { return nil, errors.Wrap(err, "getting namespaces") } @@ -174,7 +175,7 @@ func (c *Cluster) AllWorkloads(namespace string) (res []cluster.Workload, err er } for kind, resourceKind := range resourceKinds { - workloads, err := resourceKind.getWorkloads(c, ns.Name) + workloads, err := resourceKind.getWorkloads(ctx, c, ns.Name) if err != nil { switch { case apierrors.IsNotFound(err): @@ -219,10 +220,10 @@ func (c *Cluster) Ping() error { } // Export exports cluster resources -func (c *Cluster) Export() ([]byte, error) { +func (c *Cluster) Export(ctx context.Context) ([]byte, error) { var config bytes.Buffer - namespaces, err := c.getAllowedAndExistingNamespaces() + namespaces, err := c.getAllowedAndExistingNamespaces(ctx) if err != nil { return nil, errors.Wrap(err, "getting namespaces") } @@ -240,7 +241,7 @@ func (c *Cluster) Export() ([]byte, error) { } for _, resourceKind := range resourceKinds { - workloads, err := resourceKind.getWorkloads(c, ns.Name) + workloads, err := resourceKind.getWorkloads(ctx, c, ns.Name) if err != nil { switch { case apierrors.IsNotFound(err): @@ -281,7 +282,7 @@ func (c *Cluster) PublicSSHKey(regenerate bool) (ssh.PublicKey, error) { // the Flux instance is expected to have access to and can look for resources inside of. // It returns a list of all namespaces unless an explicit list of allowed namespaces // has been set on the Cluster instance. -func (c *Cluster) getAllowedAndExistingNamespaces() ([]apiv1.Namespace, error) { +func (c *Cluster) getAllowedAndExistingNamespaces(ctx context.Context) ([]apiv1.Namespace, error) { if len(c.allowedNamespaces) > 0 { nsList := []apiv1.Namespace{} for _, name := range c.allowedNamespaces { diff --git a/cluster/kubernetes/kubernetes_test.go b/cluster/kubernetes/kubernetes_test.go index 4e16c6c290..6ebfee91c9 100644 --- a/cluster/kubernetes/kubernetes_test.go +++ b/cluster/kubernetes/kubernetes_test.go @@ -1,6 +1,7 @@ package kubernetes import ( + "context" "reflect" "testing" @@ -28,7 +29,7 @@ func testGetAllowedNamespaces(t *testing.T, namespace []string, expected []strin client := ExtendedClient{coreClient: clientset} c := NewCluster(client, nil, nil, log.NewNopLogger(), namespace, []string{}) - namespaces, err := c.getAllowedAndExistingNamespaces() + namespaces, err := c.getAllowedAndExistingNamespaces(context.Background()) if err != nil { t.Errorf("The error should be nil, not: %s", err) } diff --git a/cluster/kubernetes/resourcekinds.go b/cluster/kubernetes/resourcekinds.go index dab1bd207b..2958227e61 100644 --- a/cluster/kubernetes/resourcekinds.go +++ b/cluster/kubernetes/resourcekinds.go @@ -1,6 +1,7 @@ package kubernetes import ( + "context" "strings" apiapps "k8s.io/api/apps/v1" @@ -30,8 +31,8 @@ const AntecedentAnnotation = "flux.weave.works/antecedent" // Kind registry type resourceKind interface { - getWorkload(c *Cluster, namespace, name string) (workload, error) - getWorkloads(c *Cluster, namespace string) ([]workload, error) + getWorkload(ctx context.Context, c *Cluster, namespace, name string) (workload, error) + getWorkloads(ctx context.Context, c *Cluster, namespace string) ([]workload, error) } var ( @@ -114,7 +115,7 @@ func (w workload) toClusterWorkload(resourceID flux.ResourceID) cluster.Workload type deploymentKind struct{} -func (dk *deploymentKind) getWorkload(c *Cluster, namespace, name string) (workload, error) { +func (dk *deploymentKind) getWorkload(ctx context.Context, c *Cluster, namespace, name string) (workload, error) { deployment, err := c.client.AppsV1().Deployments(namespace).Get(name, meta_v1.GetOptions{}) if err != nil { return workload{}, err @@ -123,7 +124,7 @@ func (dk *deploymentKind) getWorkload(c *Cluster, namespace, name string) (workl return makeDeploymentWorkload(deployment), nil } -func (dk *deploymentKind) getWorkloads(c *Cluster, namespace string) ([]workload, error) { +func (dk *deploymentKind) getWorkloads(ctx context.Context, c *Cluster, namespace string) ([]workload, error) { deployments, err := c.client.AppsV1().Deployments(namespace).List(meta_v1.ListOptions{}) if err != nil { return nil, err @@ -191,7 +192,7 @@ func makeDeploymentWorkload(deployment *apiapps.Deployment) workload { type daemonSetKind struct{} -func (dk *daemonSetKind) getWorkload(c *Cluster, namespace, name string) (workload, error) { +func (dk *daemonSetKind) getWorkload(ctx context.Context, c *Cluster, namespace, name string) (workload, error) { daemonSet, err := c.client.AppsV1().DaemonSets(namespace).Get(name, meta_v1.GetOptions{}) if err != nil { return workload{}, err @@ -200,7 +201,7 @@ func (dk *daemonSetKind) getWorkload(c *Cluster, namespace, name string) (worklo return makeDaemonSetWorkload(daemonSet), nil } -func (dk *daemonSetKind) getWorkloads(c *Cluster, namespace string) ([]workload, error) { +func (dk *daemonSetKind) getWorkloads(ctx context.Context, c *Cluster, namespace string) ([]workload, error) { daemonSets, err := c.client.AppsV1().DaemonSets(namespace).List(meta_v1.ListOptions{}) if err != nil { return nil, err @@ -252,7 +253,7 @@ func makeDaemonSetWorkload(daemonSet *apiapps.DaemonSet) workload { type statefulSetKind struct{} -func (dk *statefulSetKind) getWorkload(c *Cluster, namespace, name string) (workload, error) { +func (dk *statefulSetKind) getWorkload(ctx context.Context, c *Cluster, namespace, name string) (workload, error) { statefulSet, err := c.client.AppsV1().StatefulSets(namespace).Get(name, meta_v1.GetOptions{}) if err != nil { return workload{}, err @@ -261,7 +262,7 @@ func (dk *statefulSetKind) getWorkload(c *Cluster, namespace, name string) (work return makeStatefulSetWorkload(statefulSet), nil } -func (dk *statefulSetKind) getWorkloads(c *Cluster, namespace string) ([]workload, error) { +func (dk *statefulSetKind) getWorkloads(ctx context.Context, c *Cluster, namespace string) ([]workload, error) { statefulSets, err := c.client.AppsV1().StatefulSets(namespace).List(meta_v1.ListOptions{}) if err != nil { return nil, err @@ -345,7 +346,7 @@ func makeStatefulSetWorkload(statefulSet *apiapps.StatefulSet) workload { type cronJobKind struct{} -func (dk *cronJobKind) getWorkload(c *Cluster, namespace, name string) (workload, error) { +func (dk *cronJobKind) getWorkload(ctx context.Context, c *Cluster, namespace, name string) (workload, error) { cronJob, err := c.client.BatchV1beta1().CronJobs(namespace).Get(name, meta_v1.GetOptions{}) if err != nil { return workload{}, err @@ -354,7 +355,7 @@ func (dk *cronJobKind) getWorkload(c *Cluster, namespace, name string) (workload return makeCronJobWorkload(cronJob), nil } -func (dk *cronJobKind) getWorkloads(c *Cluster, namespace string) ([]workload, error) { +func (dk *cronJobKind) getWorkloads(ctx context.Context, c *Cluster, namespace string) ([]workload, error) { cronJobs, err := c.client.BatchV1beta1().CronJobs(namespace).List(meta_v1.ListOptions{}) if err != nil { return nil, err @@ -382,7 +383,7 @@ func makeCronJobWorkload(cronJob *apibatch.CronJob) workload { type fluxHelmReleaseKind struct{} -func (fhr *fluxHelmReleaseKind) getWorkload(c *Cluster, namespace, name string) (workload, error) { +func (fhr *fluxHelmReleaseKind) getWorkload(ctx context.Context, c *Cluster, namespace, name string) (workload, error) { fluxHelmRelease, err := c.client.HelmV1alpha2().FluxHelmReleases(namespace).Get(name, meta_v1.GetOptions{}) if err != nil { return workload{}, err @@ -390,7 +391,7 @@ func (fhr *fluxHelmReleaseKind) getWorkload(c *Cluster, namespace, name string) return makeFluxHelmReleaseWorkload(fluxHelmRelease), nil } -func (fhr *fluxHelmReleaseKind) getWorkloads(c *Cluster, namespace string) ([]workload, error) { +func (fhr *fluxHelmReleaseKind) getWorkloads(ctx context.Context, c *Cluster, namespace string) ([]workload, error) { fluxHelmReleases, err := c.client.HelmV1alpha2().FluxHelmReleases(namespace).List(meta_v1.ListOptions{}) if err != nil { return nil, err @@ -444,7 +445,7 @@ func createK8sFHRContainers(values map[string]interface{}) []apiv1.Container { type helmReleaseKind struct{} -func (hr *helmReleaseKind) getWorkload(c *Cluster, namespace, name string) (workload, error) { +func (hr *helmReleaseKind) getWorkload(ctx context.Context, c *Cluster, namespace, name string) (workload, error) { helmRelease, err := c.client.FluxV1beta1().HelmReleases(namespace).Get(name, meta_v1.GetOptions{}) if err != nil { return workload{}, err @@ -452,7 +453,7 @@ func (hr *helmReleaseKind) getWorkload(c *Cluster, namespace, name string) (work return makeHelmReleaseWorkload(helmRelease), nil } -func (hr *helmReleaseKind) getWorkloads(c *Cluster, namespace string) ([]workload, error) { +func (hr *helmReleaseKind) getWorkloads(ctx context.Context, c *Cluster, namespace string) ([]workload, error) { helmReleases, err := c.client.FluxV1beta1().HelmReleases(namespace).List(meta_v1.ListOptions{}) if err != nil { return nil, err diff --git a/cluster/kubernetes/sync.go b/cluster/kubernetes/sync.go index acc858be45..502de85fe1 100644 --- a/cluster/kubernetes/sync.go +++ b/cluster/kubernetes/sync.go @@ -2,6 +2,7 @@ package kubernetes import ( "bytes" + "context" "crypto/sha1" "crypto/sha256" "encoding/base64" @@ -292,7 +293,7 @@ func (c *Cluster) listAllowedResources( } // List resources only from the allowed namespaces - namespaces, err := c.getAllowedAndExistingNamespaces() + namespaces, err := c.getAllowedAndExistingNamespaces(context.Background()) if err != nil { return nil, err } diff --git a/cluster/mock/mock.go b/cluster/mock/mock.go index ae6c47bef6..a16633fd09 100644 --- a/cluster/mock/mock.go +++ b/cluster/mock/mock.go @@ -2,6 +2,7 @@ package mock import ( "bytes" + "context" "github.com/weaveworks/flux" "github.com/weaveworks/flux/cluster" @@ -14,11 +15,11 @@ import ( // Doubles as a cluster.Cluster and cluster.Manifests implementation type Mock struct { - AllWorkloadsFunc func(maybeNamespace string) ([]cluster.Workload, error) - SomeWorkloadsFunc func([]flux.ResourceID) ([]cluster.Workload, error) + AllWorkloadsFunc func(ctx context.Context, maybeNamespace string) ([]cluster.Workload, error) + SomeWorkloadsFunc func(ctx context.Context, ids []flux.ResourceID) ([]cluster.Workload, error) IsAllowedResourceFunc func(flux.ResourceID) bool PingFunc func() error - ExportFunc func() ([]byte, error) + ExportFunc func(ctx context.Context) ([]byte, error) SyncFunc func(cluster.SyncSet) error PublicSSHKeyFunc func(regenerate bool) (ssh.PublicKey, error) SetWorkloadContainerImageFunc func(def []byte, id flux.ResourceID, container string, newImageID image.Ref) ([]byte, error) @@ -33,12 +34,12 @@ type Mock struct { var _ cluster.Cluster = &Mock{} var _ manifests.Manifests = &Mock{} -func (m *Mock) AllWorkloads(maybeNamespace string) ([]cluster.Workload, error) { - return m.AllWorkloadsFunc(maybeNamespace) +func (m *Mock) AllWorkloads(ctx context.Context, maybeNamespace string) ([]cluster.Workload, error) { + return m.AllWorkloadsFunc(ctx, maybeNamespace) } -func (m *Mock) SomeWorkloads(s []flux.ResourceID) ([]cluster.Workload, error) { - return m.SomeWorkloadsFunc(s) +func (m *Mock) SomeWorkloads(ctx context.Context, ids []flux.ResourceID) ([]cluster.Workload, error) { + return m.SomeWorkloadsFunc(ctx, ids) } func (m *Mock) IsAllowedResource(id flux.ResourceID) bool { @@ -49,8 +50,8 @@ func (m *Mock) Ping() error { return m.PingFunc() } -func (m *Mock) Export() ([]byte, error) { - return m.ExportFunc() +func (m *Mock) Export(ctx context.Context) ([]byte, error) { + return m.ExportFunc(ctx) } func (m *Mock) Sync(c cluster.SyncSet) error { diff --git a/daemon/daemon.go b/daemon/daemon.go index f7be8b0082..eee5991b40 100644 --- a/daemon/daemon.go +++ b/daemon/daemon.go @@ -71,7 +71,7 @@ func (d *Daemon) Ping(ctx context.Context) error { } func (d *Daemon) Export(ctx context.Context) ([]byte, error) { - return d.Cluster.Export() + return d.Cluster.Export(ctx) } func (d *Daemon) getManifestStore(checkout *git.Checkout) (manifests.Store, error) { @@ -122,9 +122,9 @@ func (d *Daemon) ListServicesWithOptions(ctx context.Context, opts v11.ListServi var clusterWorkloads []cluster.Workload var err error if len(opts.Services) > 0 { - clusterWorkloads, err = d.Cluster.SomeWorkloads(opts.Services) + clusterWorkloads, err = d.Cluster.SomeWorkloads(ctx, opts.Services) } else { - clusterWorkloads, err = d.Cluster.AllWorkloads(opts.Namespace) + clusterWorkloads, err = d.Cluster.AllWorkloads(ctx, opts.Namespace) } if err != nil { return nil, errors.Wrap(err, "getting workloads from cluster") @@ -199,12 +199,12 @@ func (d *Daemon) ListImagesWithOptions(ctx context.Context, opts v10.ListImagesO if err != nil { return nil, errors.Wrap(err, "treating workload spec as ID") } - workloads, err = d.Cluster.SomeWorkloads([]flux.ResourceID{id}) + workloads, err = d.Cluster.SomeWorkloads(ctx, []flux.ResourceID{id}) if err != nil { return nil, errors.Wrap(err, "getting some workloads") } } else { - workloads, err = d.Cluster.AllWorkloads(opts.Namespace) + workloads, err = d.Cluster.AllWorkloads(ctx, opts.Namespace) if err != nil { return nil, errors.Wrap(err, "getting all workloads") } diff --git a/daemon/daemon_test.go b/daemon/daemon_test.go index 6600c5df31..bf27c521df 100644 --- a/daemon/daemon_test.go +++ b/daemon/daemon_test.go @@ -587,7 +587,7 @@ func TestDaemon_Automated(t *testing.T) { }, }, } - k8s.SomeWorkloadsFunc = func([]flux.ResourceID) ([]cluster.Workload, error) { + k8s.SomeWorkloadsFunc = func(ctx context.Context, ids []flux.ResourceID) ([]cluster.Workload, error) { return []cluster.Workload{workload}, nil } start() @@ -613,7 +613,7 @@ func TestDaemon_Automated_semver(t *testing.T) { }, }, } - k8s.SomeWorkloadsFunc = func([]flux.ResourceID) ([]cluster.Workload, error) { + k8s.SomeWorkloadsFunc = func(ctx context.Context, ids []flux.ResourceID) ([]cluster.Workload, error) { return []cluster.Workload{workload}, nil } start() @@ -675,7 +675,7 @@ func mockDaemon(t *testing.T) (*Daemon, func(), func(), *mock.Mock, *mockEventWr var k8s *mock.Mock { k8s = &mock.Mock{} - k8s.AllWorkloadsFunc = func(maybeNamespace string) ([]cluster.Workload, error) { + k8s.AllWorkloadsFunc = func(ctx context.Context, maybeNamespace string) ([]cluster.Workload, error) { if maybeNamespace == ns { return []cluster.Workload{ singleService, @@ -686,9 +686,9 @@ func mockDaemon(t *testing.T) (*Daemon, func(), func(), *mock.Mock, *mockEventWr return []cluster.Workload{}, nil } k8s.IsAllowedResourceFunc = func(flux.ResourceID) bool { return true } - k8s.ExportFunc = func() ([]byte, error) { return testBytes, nil } + k8s.ExportFunc = func(ctx context.Context) ([]byte, error) { return testBytes, nil } k8s.PingFunc = func() error { return nil } - k8s.SomeWorkloadsFunc = func([]flux.ResourceID) ([]cluster.Workload, error) { + k8s.SomeWorkloadsFunc = func(ctx context.Context, ids []flux.ResourceID) ([]cluster.Workload, error) { return []cluster.Workload{ singleService, }, nil diff --git a/daemon/images.go b/daemon/images.go index 6fca631d55..6404b825b9 100644 --- a/daemon/images.go +++ b/daemon/images.go @@ -29,7 +29,7 @@ func (d *Daemon) pollForNewImages(logger log.Logger) { return } // Find images to check - workloads, err := d.Cluster.SomeWorkloads(candidateWorkloads.IDs()) + workloads, err := d.Cluster.SomeWorkloads(ctx, candidateWorkloads.IDs()) if err != nil { logger.Log("error", errors.Wrap(err, "checking workloads for new images")) return diff --git a/daemon/sync_test.go b/daemon/sync_test.go index 7e65bc3bc1..2c0d190657 100644 --- a/daemon/sync_test.go +++ b/daemon/sync_test.go @@ -44,7 +44,7 @@ func daemon(t *testing.T) (*Daemon, func()) { repo, repoCleanup := gittest.Repo(t) k8s = &mock.Mock{} - k8s.ExportFunc = func() ([]byte, error) { return nil, nil } + k8s.ExportFunc = func(ctx context.Context) ([]byte, error) { return nil, nil } events = &mockEventWriter{} diff --git a/release/context.go b/release/context.go index c307e951f7..ac279ae4b1 100644 --- a/release/context.go +++ b/release/context.go @@ -85,7 +85,7 @@ func (rc *ReleaseContext) SelectWorkloads(ctx context.Context, results update.Re } // Ask the cluster about those that we're still interested in - definedAndRunning, err := rc.cluster.SomeWorkloads(toAskClusterAbout) + definedAndRunning, err := rc.cluster.SomeWorkloads(ctx, toAskClusterAbout) if err != nil { return nil, err } diff --git a/release/releaser_test.go b/release/releaser_test.go index 8b05056248..f4d7330812 100644 --- a/release/releaser_test.go +++ b/release/releaser_test.go @@ -145,10 +145,10 @@ var ( func mockCluster(running ...cluster.Workload) *mock.Mock { return &mock.Mock{ - AllWorkloadsFunc: func(string) ([]cluster.Workload, error) { + AllWorkloadsFunc: func(ctx context.Context, maybeNamespace string) ([]cluster.Workload, error) { return running, nil }, - SomeWorkloadsFunc: func(ids []flux.ResourceID) ([]cluster.Workload, error) { + SomeWorkloadsFunc: func(ctx context.Context, ids []flux.ResourceID) ([]cluster.Workload, error) { var res []cluster.Workload for _, id := range ids { for _, svc := range running {