Skip to content

Commit

Permalink
Use k8s events instead of status messages (open-telemetry#707)
Browse files Browse the repository at this point in the history
* Storing upgrade status on events

Signed-off-by: Yuri Sa <[email protected]>

* Fixing Lint nits

Signed-off-by: Yuri Sa <[email protected]>

* Fixing test checks and converting messages to events

Signed-off-by: Yuri Sa <[email protected]>

* Fixing upgrade test

Signed-off-by: Yuri Sa <[email protected]>

* Fixing var value of expected version

Signed-off-by: Yuri Sa <[email protected]>

* Fixing var value of expected version

Signed-off-by: Yuri Sa <[email protected]>

* Change upgrade script for v0.19.0

Signed-off-by: Yuri Sa <[email protected]>

* Created const for RecordBufferSize and removed commented lines

Signed-off-by: Yuri Sa <[email protected]>

* Changed function signature and type name

Signed-off-by: Yuri Sa <[email protected]>

* Changed variable exposure

Signed-off-by: Yuri Sa <[email protected]>
  • Loading branch information
yuriolisa authored Feb 22, 2022
1 parent e6cad70 commit cbdb6ee
Show file tree
Hide file tree
Showing 25 changed files with 232 additions and 116 deletions.
10 changes: 8 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/healthz"
Expand Down Expand Up @@ -225,10 +226,15 @@ func addDependencies(_ context.Context, mgr ctrl.Manager, cfg config.Config, v v
if err != nil {
return fmt.Errorf("failed to start the auto-detect mechanism: %w", err)
}

// adds the upgrade mechanism to be executed once the manager is ready
err = mgr.Add(manager.RunnableFunc(func(c context.Context) error {
return collectorupgrade.ManagedInstances(c, ctrl.Log.WithName("collector-upgrade"), v, mgr.GetClient())
up := &collectorupgrade.VersionUpgrade{
Log: ctrl.Log.WithName("collector-upgrade"),
Version: v,
Client: mgr.GetClient(),
Recorder: record.NewFakeRecorder(collectorupgrade.RecordBufferSize),
}
return up.ManagedInstances(c)
}))
if err != nil {
return fmt.Errorf("failed to upgrade OpenTelemetryCollector instances: %w", err)
Expand Down
42 changes: 26 additions & 16 deletions pkg/collector/upgrade/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,34 +22,44 @@ import (

semver "github.com/Masterminds/semver/v3"
"github.com/go-logr/logr"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/open-telemetry/opentelemetry-operator/apis/v1alpha1"
"github.com/open-telemetry/opentelemetry-operator/internal/version"
)

// VersionUpgrade bundles the dependencies used by ManagedInstances and
// ManagedInstance to bring OpenTelemetryCollector instances up to date.
type VersionUpgrade struct {
// Log is the logger used for progress and error messages during the upgrade.
Log logr.Logger
// Version describes the running operator; its OpenTelemetryCollector field is
// written to the instance's Status.Version once all upgrade steps have run.
Version version.Version
// Client is used to list the managed instances and to patch both the
// upgraded resource and its status.
Client client.Client
// Recorder receives the events emitted by the per-version upgrade steps
// (for example, when a deprecated processor or argument is removed).
Recorder record.EventRecorder
}

// RecordBufferSize is the buffer size handed to record.NewFakeRecorder when a
// Recorder is built for a VersionUpgrade.
// NOTE(review): presumably this caps how many events the fake recorder can hold
// before they must be drained — confirm that a single upgrade run cannot emit
// more events than this.
const RecordBufferSize int = 10

// ManagedInstances finds all the otelcol instances for the current operator and upgrades them, if necessary.
func ManagedInstances(ctx context.Context, logger logr.Logger, ver version.Version, cl client.Client) error {
logger.Info("looking for managed instances to upgrade")
func (u VersionUpgrade) ManagedInstances(ctx context.Context) error {
u.Log.Info("looking for managed instances to upgrade")

opts := []client.ListOption{
client.MatchingLabels(map[string]string{
"app.kubernetes.io/managed-by": "opentelemetry-operator",
}),
}
list := &v1alpha1.OpenTelemetryCollectorList{}
if err := cl.List(ctx, list, opts...); err != nil {
if err := u.Client.List(ctx, list, opts...); err != nil {
return fmt.Errorf("failed to list: %w", err)
}

for i := range list.Items {
original := list.Items[i]
itemLogger := logger.WithValues("name", original.Name, "namespace", original.Namespace)
itemLogger := u.Log.WithValues("name", original.Name, "namespace", original.Namespace)
if original.Spec.UpgradeStrategy == v1alpha1.UpgradeStrategyNone {
itemLogger.Info("skipping instance upgrade due to UpgradeStrategy")
continue
}
upgraded, err := ManagedInstance(ctx, logger, ver, cl, original)
upgraded, err := u.ManagedInstance(ctx, original)
if err != nil {
// nothing to do at this level, just go to the next instance
continue
Expand All @@ -59,14 +69,14 @@ func ManagedInstances(ctx context.Context, logger logr.Logger, ver version.Versi
// the resource update overrides the status, so, keep it so that we can reset it later
st := upgraded.Status
patch := client.MergeFrom(&original)
if err := cl.Patch(ctx, &upgraded, patch); err != nil {
if err := u.Client.Patch(ctx, &upgraded, patch); err != nil {
itemLogger.Error(err, "failed to apply changes to instance")
continue
}

// the status object requires its own update
upgraded.Status = st
if err := cl.Status().Patch(ctx, &upgraded, patch); err != nil {
if err := u.Client.Status().Patch(ctx, &upgraded, patch); err != nil {
itemLogger.Error(err, "failed to apply changes to instance's status object")
continue
}
Expand All @@ -76,48 +86,48 @@ func ManagedInstances(ctx context.Context, logger logr.Logger, ver version.Versi
}

if len(list.Items) == 0 {
logger.Info("no instances to upgrade")
u.Log.Info("no instances to upgrade")
}

return nil
}

// ManagedInstance performs the necessary changes to bring the given otelcol instance to the current version.
func ManagedInstance(ctx context.Context, logger logr.Logger, currentV version.Version, cl client.Client, otelcol v1alpha1.OpenTelemetryCollector) (v1alpha1.OpenTelemetryCollector, error) {
func (u VersionUpgrade) ManagedInstance(ctx context.Context, otelcol v1alpha1.OpenTelemetryCollector) (v1alpha1.OpenTelemetryCollector, error) {
// this is likely a new instance, assume it's already up to date
if otelcol.Status.Version == "" {
return otelcol, nil
}

instanceV, err := semver.NewVersion(otelcol.Status.Version)
if err != nil {
logger.Error(err, "failed to parse version for OpenTelemetry Collector instance", "name", otelcol.Name, "namespace", otelcol.Namespace, "version", otelcol.Status.Version)
u.Log.Error(err, "failed to parse version for OpenTelemetry Collector instance", "name", otelcol.Name, "namespace", otelcol.Namespace, "version", otelcol.Status.Version)
return otelcol, err
}

if instanceV.GreaterThan(&Latest.Version) {
logger.Info("skipping upgrade for OpenTelemetry Collector instance, as it's newer than our latest version", "name", otelcol.Name, "namespace", otelcol.Namespace, "version", otelcol.Status.Version, "latest", Latest.Version.String())
u.Log.Info("skipping upgrade for OpenTelemetry Collector instance, as it's newer than our latest version", "name", otelcol.Name, "namespace", otelcol.Namespace, "version", otelcol.Status.Version, "latest", Latest.Version.String())
return otelcol, nil
}

for _, available := range versions {
if available.GreaterThan(instanceV) {
upgraded, err := available.upgrade(cl, &otelcol)
upgraded, err := available.upgrade(u, &otelcol) //available.upgrade(params., &otelcol)

if err != nil {
logger.Error(err, "failed to upgrade managed otelcol instances", "name", otelcol.Name, "namespace", otelcol.Namespace)
u.Log.Error(err, "failed to upgrade managed otelcol instances", "name", otelcol.Name, "namespace", otelcol.Namespace)
return otelcol, err
}

logger.V(1).Info("step upgrade", "name", otelcol.Name, "namespace", otelcol.Namespace, "version", available.String())
u.Log.V(1).Info("step upgrade", "name", otelcol.Name, "namespace", otelcol.Namespace, "version", available.String())
upgraded.Status.Version = available.String()
otelcol = *upgraded
}
}

// at the end of the process, we are up to date with the latest known version, which is what we have from versions.txt
otelcol.Status.Version = currentV.OpenTelemetryCollector
otelcol.Status.Version = u.Version.OpenTelemetryCollector

logger.V(1).Info("final version", "name", otelcol.Name, "namespace", otelcol.Namespace, "version", otelcol.Status.Version)
u.Log.V(1).Info("final version", "name", otelcol.Name, "namespace", otelcol.Namespace, "version", otelcol.Status.Version)
return otelcol, nil
}
27 changes: 23 additions & 4 deletions pkg/collector/upgrade/upgrade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
logf "sigs.k8s.io/controller-runtime/pkg/log"

"github.com/open-telemetry/opentelemetry-operator/apis/v1alpha1"
Expand Down Expand Up @@ -60,9 +61,15 @@ func TestShouldUpgradeAllToLatestBasedOnUpgradeStrategy(t *testing.T) {
err = k8sClient.Get(context.Background(), nsn, persisted)
require.NoError(t, err)
require.Equal(t, beginV, persisted.Status.Version)
up := &upgrade.VersionUpgrade{
Log: logger,
Version: currentV,
Client: k8sClient,
Recorder: record.NewFakeRecorder(upgrade.RecordBufferSize),
}

// test
err = upgrade.ManagedInstances(context.Background(), logger, currentV, k8sClient)
err = up.ManagedInstances(context.Background())
assert.NoError(t, err)

// verify
Expand All @@ -84,9 +91,14 @@ func TestUpgradeUpToLatestKnownVersion(t *testing.T) {

currentV := version.Get()
currentV.OpenTelemetryCollector = "0.10.0" // we don't have a 0.10.0 upgrade, but we have a 0.9.0

up := &upgrade.VersionUpgrade{
Log: logger,
Version: currentV,
Client: k8sClient,
Recorder: record.NewFakeRecorder(upgrade.RecordBufferSize),
}
// test
res, err := upgrade.ManagedInstance(context.Background(), logger, currentV, k8sClient, existing)
res, err := up.ManagedInstance(context.Background(), existing)

// verify
assert.NoError(t, err)
Expand All @@ -113,8 +125,15 @@ func TestVersionsShouldNotBeChanged(t *testing.T) {
currentV := version.Get()
currentV.OpenTelemetryCollector = upgrade.Latest.String()

up := &upgrade.VersionUpgrade{
Log: logger,
Version: currentV,
Client: k8sClient,
Recorder: record.NewFakeRecorder(upgrade.RecordBufferSize),
}

// test
res, err := upgrade.ManagedInstance(context.Background(), logger, currentV, k8sClient, existing)
res, err := up.ManagedInstance(context.Background(), existing)
if tt.failureExpected {
assert.Error(t, err)
} else {
Expand Down
10 changes: 7 additions & 3 deletions pkg/collector/upgrade/v0_15_0.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,17 @@
package upgrade

import (
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/open-telemetry/opentelemetry-operator/apis/v1alpha1"

corev1 "k8s.io/api/core/v1"
)

func upgrade0_15_0(cl client.Client, otelcol *v1alpha1.OpenTelemetryCollector) (*v1alpha1.OpenTelemetryCollector, error) {
func upgrade0_15_0(u VersionUpgrade, otelcol *v1alpha1.OpenTelemetryCollector) (*v1alpha1.OpenTelemetryCollector, error) {
delete(otelcol.Spec.Args, "--new-metrics")
delete(otelcol.Spec.Args, "--legacy-metrics")
existing := &corev1.ConfigMap{}
updated := existing.DeepCopy()
u.Recorder.Event(updated, "Normal", "Upgrade", "upgrade to v0.15.0 dropped the deprecated metrics arguments")

return otelcol, nil
}
9 changes: 8 additions & 1 deletion pkg/collector/upgrade/v0_15_0_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"

"github.com/open-telemetry/opentelemetry-operator/apis/v1alpha1"
"github.com/open-telemetry/opentelemetry-operator/internal/version"
Expand Down Expand Up @@ -54,7 +55,13 @@ func TestRemoveMetricsTypeFlags(t *testing.T) {
require.Contains(t, existing.Spec.Args, "--legacy-metrics")

// test
res, err := upgrade.ManagedInstance(context.Background(), logger, version.Get(), nil, existing)
up := &upgrade.VersionUpgrade{
Log: logger,
Version: version.Get(),
Client: nil,
Recorder: record.NewFakeRecorder(upgrade.RecordBufferSize),
}
res, err := up.ManagedInstance(context.Background(), existing)
assert.NoError(t, err)

// verify
Expand Down
17 changes: 12 additions & 5 deletions pkg/collector/upgrade/v0_19_0.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@ import (
"strings"

"gopkg.in/yaml.v2"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/open-telemetry/opentelemetry-operator/apis/v1alpha1"
"github.com/open-telemetry/opentelemetry-operator/pkg/collector/adapters"

corev1 "k8s.io/api/core/v1"
)

func upgrade0_19_0(cl client.Client, otelcol *v1alpha1.OpenTelemetryCollector) (*v1alpha1.OpenTelemetryCollector, error) {
func upgrade0_19_0(u VersionUpgrade, otelcol *v1alpha1.OpenTelemetryCollector) (*v1alpha1.OpenTelemetryCollector, error) {
if len(otelcol.Spec.Config) == 0 {
return otelcol, nil
}
Expand All @@ -47,7 +48,9 @@ func upgrade0_19_0(cl client.Client, otelcol *v1alpha1.OpenTelemetryCollector) (
// Remove deprecated queued_retry processor
if strings.HasPrefix(k.(string), "queued_retry") {
delete(processors, k)
otelcol.Status.Messages = append(otelcol.Status.Messages, fmt.Sprintf("upgrade to v0.19.0 removed the processor %q", k))
existing := &corev1.ConfigMap{}
updated := existing.DeepCopy()
u.Recorder.Event(updated, "Normal", "Upgrade", fmt.Sprintf("upgrade to v0.19.0 removed the processor %q", k))
continue
}

Expand All @@ -71,7 +74,9 @@ func upgrade0_19_0(cl client.Client, otelcol *v1alpha1.OpenTelemetryCollector) (

processor["attributes"] = attributes
delete(processor, "type")
otelcol.Status.Messages = append(otelcol.Status.Messages, fmt.Sprintf("upgrade to v0.19.0 migrated the property 'type' for processor %q", k))
existing := &corev1.ConfigMap{}
updated := existing.DeepCopy()
u.Recorder.Event(updated, "Normal", "Upgrade", fmt.Sprintf("upgrade to v0.19.0 migrated the property 'type' for processor %q", k))
}

// handle labels
Expand All @@ -95,7 +100,9 @@ func upgrade0_19_0(cl client.Client, otelcol *v1alpha1.OpenTelemetryCollector) (

processor["attributes"] = attributes
delete(processor, "labels")
otelcol.Status.Messages = append(otelcol.Status.Messages, fmt.Sprintf("upgrade to v0.19.0 migrated the property 'labels' for processor %q", k))
existing := &corev1.ConfigMap{}
updated := existing.DeepCopy()
u.Recorder.Event(updated, "Normal", "Upgrade", fmt.Sprintf("upgrade to v0.19.0 migrated the property 'labels' for processor %q", k))
}

processors[k] = processor
Expand Down
28 changes: 22 additions & 6 deletions pkg/collector/upgrade/v0_19_0_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"

"github.com/open-telemetry/opentelemetry-operator/apis/v1alpha1"
"github.com/open-telemetry/opentelemetry-operator/internal/version"
Expand Down Expand Up @@ -58,15 +59,20 @@ func TestRemoveQueuedRetryProcessor(t *testing.T) {
require.Contains(t, existing.Spec.Config, "num_workers: 123") // checking one property is sufficient

// test
res, err := upgrade.ManagedInstance(context.Background(), logger, version.Get(), nil, existing)
up := &upgrade.VersionUpgrade{
Log: logger,
Version: version.Get(),
Client: nil,
Recorder: record.NewFakeRecorder(upgrade.RecordBufferSize),
}
res, err := up.ManagedInstance(context.Background(), existing)
assert.NoError(t, err)

// verify
assert.NotContains(t, res.Spec.Config, "queued_retry:")
assert.Contains(t, res.Spec.Config, "otherprocessor:")
assert.NotContains(t, res.Spec.Config, "queued_retry/second:")
assert.NotContains(t, res.Spec.Config, "num_workers: 123") // checking one property is sufficient
assert.Contains(t, res.Status.Messages[0], "upgrade to v0.19.0 removed the processor")
}

func TestMigrateResourceType(t *testing.T) {
Expand All @@ -90,7 +96,13 @@ func TestMigrateResourceType(t *testing.T) {
existing.Status.Version = "0.18.0"

// test
res, err := upgrade.ManagedInstance(context.Background(), logger, version.Get(), nil, existing)
up := &upgrade.VersionUpgrade{
Log: logger,
Version: version.Get(),
Client: nil,
Recorder: record.NewFakeRecorder(upgrade.RecordBufferSize),
}
res, err := up.ManagedInstance(context.Background(), existing)
assert.NoError(t, err)

// verify
Expand All @@ -101,7 +113,6 @@ func TestMigrateResourceType(t *testing.T) {
key: opencensus.type
value: some-type
`, res.Spec.Config)
assert.Contains(t, res.Status.Messages[0], "upgrade to v0.19.0 migrated the property 'type' for processor")
}

func TestMigrateLabels(t *testing.T) {
Expand All @@ -127,7 +138,13 @@ func TestMigrateLabels(t *testing.T) {
existing.Status.Version = "0.18.0"

// test
res, err := upgrade.ManagedInstance(context.Background(), logger, version.Get(), nil, existing)
up := &upgrade.VersionUpgrade{
Log: logger,
Version: version.Get(),
Client: nil,
Recorder: record.NewFakeRecorder(upgrade.RecordBufferSize),
}
res, err := up.ManagedInstance(context.Background(), existing)
assert.NoError(t, err)

actual, err := adapters.ConfigFromString(res.Spec.Config)
Expand All @@ -139,5 +156,4 @@ func TestMigrateLabels(t *testing.T) {
// verify
assert.Len(t, actualAttrs, 2)
assert.Nil(t, actualProcessor["labels"])
assert.Contains(t, res.Status.Messages[0], "upgrade to v0.19.0 migrated the property 'labels' for processor")
}
Loading

0 comments on commit cbdb6ee

Please sign in to comment.