diff --git a/README.md b/README.md
index 9c4b415aa5..b82ce57263 100644
--- a/README.md
+++ b/README.md
@@ -57,6 +57,25 @@ The `config` node holds the `YAML` that should be passed down as-is to the under
At this point, the Operator does *not* validate the contents of the configuration file: if the configuration is invalid, the instance will still be created but the underlying OpenTelemetry Collector might crash.
+
+### Upgrades
+
+As noted above, the OpenTelemetry Collector format is continuing to evolve. However, a best-effort attempt is made to upgrade all managed `OpenTelemetryCollector` resources.
+
+In certain scenarios, it may be desirable to prevent the operator from upgrading certain `OpenTelemetryCollector` resources. For example, when a resource is configured with a custom `.Spec.Image`, end users may wish to manage configuration themselves as opposed to having the operator upgrade it. This can be configured on a resource by resource basis with the exposed property `.Spec.UpgradeStrategy`.
+
+By configuring a resource's `.Spec.UpgradeStrategy` to `none`, the operator will skip the given instance during the upgrade routine.
+
+The default and only other acceptable value for `.Spec.UpgradeStrategy` is `automatic`.
+
+
+### `opentelemetry-operator` vs `OpenTelemetryCollector` Versioning
+
+By default, the operator ensures consistent versioning between itself and the managed `OpenTelemetryCollector` resources. That is, if the operator is based on version `0.40.0`, it will create resources with an underlying core `opentelemetry-collector` at version `0.40.0`.
+
+When a custom `Spec.Image` is used, the operator will not manage this versioning and upgrading. In this scenario, it is best practice that the operator version should match the underlying core version. Given a `OpenTelemetryCollector` resource with a `Spec.Image` configured to a custom image based on underlying core `opentelemetry-collector` at version `0.40.0`, it is recommended that the operator is kept at version `0.40.0`.
+
+
### Deployment modes
The `CustomResource` for the `OpenTelemetryCollector` exposes a property named `.Spec.Mode`, which can be used to specify whether the collector should run as a `DaemonSet`, `Sidecar`, or `Deployment` (default). Look at [this sample](https://github.com/open-telemetry/opentelemetry-operator/blob/main/tests/e2e/daemonset-features/00-install.yaml) for reference.
diff --git a/apis/v1alpha1/opentelemetrycollector_types.go b/apis/v1alpha1/opentelemetrycollector_types.go
index 04f35c07d8..170e2463c2 100644
--- a/apis/v1alpha1/opentelemetrycollector_types.go
+++ b/apis/v1alpha1/opentelemetrycollector_types.go
@@ -25,6 +25,10 @@ type OpenTelemetryCollectorSpec struct {
// +required
Config string `json:"config,omitempty"`
+ // UpgradeStrategy represents how the operator will handle upgrades to the CR when a newer version of the operator is deployed
+ // +optional
+ UpgradeStrategy UpgradeStrategy `json:"upgradeStrategy"`
+
// Args is the set of arguments to pass to the OpenTelemetry Collector binary
// +optional
Args map[string]string `json:"args,omitempty"`
diff --git a/apis/v1alpha1/opentelemetrycollector_webhook.go b/apis/v1alpha1/opentelemetrycollector_webhook.go
index 77499d11da..e6296121f0 100644
--- a/apis/v1alpha1/opentelemetrycollector_webhook.go
+++ b/apis/v1alpha1/opentelemetrycollector_webhook.go
@@ -43,6 +43,9 @@ func (r *OpenTelemetryCollector) Default() {
if len(r.Spec.Mode) == 0 {
r.Spec.Mode = ModeDeployment
}
+ if len(r.Spec.UpgradeStrategy) == 0 {
+ r.Spec.UpgradeStrategy = UpgradeStrategyAutomatic
+ }
if r.Labels == nil {
r.Labels = map[string]string{}
diff --git a/apis/v1alpha1/upgrade_strategy.go b/apis/v1alpha1/upgrade_strategy.go
new file mode 100644
index 0000000000..95e16f1210
--- /dev/null
+++ b/apis/v1alpha1/upgrade_strategy.go
@@ -0,0 +1,29 @@
+// Copyright The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package v1alpha1
+
+type (
+ // UpgradeStrategy represents how the operator will handle upgrades to the CR when a newer version of the operator is deployed
+ // +kubebuilder:validation:Enum=automatic;none
+ UpgradeStrategy string
+)
+
+const (
+ // UpgradeStrategyAutomatic specifies that the operator will automatically apply upgrades to the CR.
+ UpgradeStrategyAutomatic UpgradeStrategy = "automatic"
+
+ // UpgradeStrategyNone specifies that the operator will not apply any upgrades to the CR.
+ UpgradeStrategyNone UpgradeStrategy = "none"
+)
diff --git a/bundle/manifests/opentelemetry.io_opentelemetrycollectors.yaml b/bundle/manifests/opentelemetry.io_opentelemetrycollectors.yaml
index e475e4928a..edd8d3ad28 100644
--- a/bundle/manifests/opentelemetry.io_opentelemetrycollectors.yaml
+++ b/bundle/manifests/opentelemetry.io_opentelemetrycollectors.yaml
@@ -689,6 +689,13 @@ spec:
type: string
type: object
type: array
+ upgradeStrategy:
+ description: UpgradeStrategy represents how the operator will handle
+ upgrades to the CR when a newer version of the operator is deployed
+ enum:
+ - automatic
+ - none
+ type: string
volumeClaimTemplates:
description: VolumeClaimTemplates will provide stable storage using
PersistentVolumes. Only available when the mode=statefulset.
diff --git a/config/crd/bases/opentelemetry.io_opentelemetrycollectors.yaml b/config/crd/bases/opentelemetry.io_opentelemetrycollectors.yaml
index ce55cbdc73..e5c7760fb9 100644
--- a/config/crd/bases/opentelemetry.io_opentelemetrycollectors.yaml
+++ b/config/crd/bases/opentelemetry.io_opentelemetrycollectors.yaml
@@ -690,6 +690,13 @@ spec:
type: string
type: object
type: array
+ upgradeStrategy:
+ description: UpgradeStrategy represents how the operator will handle
+ upgrades to the CR when a newer version of the operator is deployed
+ enum:
+ - automatic
+ - none
+ type: string
volumeClaimTemplates:
description: VolumeClaimTemplates will provide stable storage using
PersistentVolumes. Only available when the mode=statefulset.
diff --git a/controllers/suite_test.go b/controllers/suite_test.go
index a07dc75b9d..1068a79be3 100644
--- a/controllers/suite_test.go
+++ b/controllers/suite_test.go
@@ -15,13 +15,21 @@
package controllers_test
import (
+ "context"
+ "crypto/tls"
"fmt"
+ "net"
"os"
"path/filepath"
+ "sync"
"testing"
+ "time"
"k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes/scheme"
+ "k8s.io/client-go/util/retry"
+ ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/envtest"
@@ -29,15 +37,24 @@ import (
// +kubebuilder:scaffold:imports
)
-var k8sClient client.Client
-var testEnv *envtest.Environment
-var testScheme *runtime.Scheme = scheme.Scheme
+var (
+ k8sClient client.Client
+ testEnv *envtest.Environment
+ testScheme *runtime.Scheme = scheme.Scheme
+ ctx context.Context
+ cancel context.CancelFunc
+)
func TestMain(m *testing.M) {
+ ctx, cancel = context.WithCancel(context.TODO())
+ defer cancel()
+
testEnv = &envtest.Environment{
CRDDirectoryPaths: []string{filepath.Join("..", "config", "crd", "bases")},
+ WebhookInstallOptions: envtest.WebhookInstallOptions{
+ Paths: []string{filepath.Join("..", "config", "webhook")},
+ },
}
-
cfg, err := testEnv.Start()
if err != nil {
fmt.Printf("failed to start testEnv: %v", err)
@@ -56,6 +73,65 @@ func TestMain(m *testing.M) {
os.Exit(1)
}
+ // start webhook server using Manager
+ webhookInstallOptions := &testEnv.WebhookInstallOptions
+ mgr, err := ctrl.NewManager(cfg, ctrl.Options{
+ Scheme: testScheme,
+ Host: webhookInstallOptions.LocalServingHost,
+ Port: webhookInstallOptions.LocalServingPort,
+ CertDir: webhookInstallOptions.LocalServingCertDir,
+ LeaderElection: false,
+ MetricsBindAddress: "0",
+ })
+ if err != nil {
+ fmt.Printf("failed to start webhook server: %v", err)
+ os.Exit(1)
+ }
+
+ if err := (&v1alpha1.OpenTelemetryCollector{}).SetupWebhookWithManager(mgr); err != nil {
+ fmt.Printf("failed to SetupWebhookWithManager: %v", err)
+ os.Exit(1)
+ }
+
+ ctx, cancel = context.WithCancel(context.TODO())
+ defer cancel()
+ go func() {
+ if err = mgr.Start(ctx); err != nil {
+ fmt.Printf("failed to start manager: %v", err)
+ os.Exit(1)
+ }
+ }()
+
+ // wait for the webhook server to get ready
+ wg := &sync.WaitGroup{}
+ wg.Add(1)
+ dialer := &net.Dialer{Timeout: time.Second}
+ addrPort := fmt.Sprintf("%s:%d", webhookInstallOptions.LocalServingHost, webhookInstallOptions.LocalServingPort)
+ go func(wg *sync.WaitGroup) {
+ defer wg.Done()
+ if err = retry.OnError(wait.Backoff{
+ Steps: 20,
+ Duration: 10 * time.Millisecond,
+ Factor: 1.5,
+ Jitter: 0.1,
+ Cap: time.Second * 30,
+ }, func(error) bool {
+ return true
+ }, func() error {
+ // #nosec G402
+ conn, err := tls.DialWithDialer(dialer, "tcp", addrPort, &tls.Config{InsecureSkipVerify: true})
+ if err != nil {
+ return err
+ }
+ _ = conn.Close()
+ return nil
+ }); err != nil {
+ fmt.Printf("failed to wait for webhook server to be ready: %v", err)
+ os.Exit(1)
+ }
+ }(wg)
+ wg.Wait()
+
code := m.Run()
err = testEnv.Stop()
diff --git a/docs/api.md b/docs/api.md
index 84abec6d1a..0e756e3f9a 100644
--- a/docs/api.md
+++ b/docs/api.md
@@ -508,6 +508,15 @@ OpenTelemetryCollectorSpec defines the desired state of OpenTelemetryCollector.
Toleration to schedule OpenTelemetry Collector pods. This is only relevant to daemonsets, statefulsets and deployments
false |
+
+ upgradeStrategy |
+ enum |
+
+ UpgradeStrategy represents how the operator will handle upgrades to the CR when a newer version of the operator is deployed
+
+ Enum: automatic, none
+ |
+ false |
volumeClaimTemplates |
[]object |
diff --git a/internal/webhookhandler/webhookhandler_suite_test.go b/internal/webhookhandler/webhookhandler_suite_test.go
index a6fffe952d..4c0b1c6814 100644
--- a/internal/webhookhandler/webhookhandler_suite_test.go
+++ b/internal/webhookhandler/webhookhandler_suite_test.go
@@ -15,13 +15,21 @@
package webhookhandler_test
import (
+ "context"
+ "crypto/tls"
"fmt"
+ "net"
"os"
"path/filepath"
+ "sync"
"testing"
+ "time"
"k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes/scheme"
+ "k8s.io/client-go/util/retry"
+ ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/envtest"
@@ -29,15 +37,24 @@ import (
// +kubebuilder:scaffold:imports
)
-var k8sClient client.Client
-var testEnv *envtest.Environment
-var testScheme *runtime.Scheme = scheme.Scheme
+var (
+ k8sClient client.Client
+ testEnv *envtest.Environment
+ testScheme *runtime.Scheme = scheme.Scheme
+ ctx context.Context
+ cancel context.CancelFunc
+)
func TestMain(m *testing.M) {
+ ctx, cancel = context.WithCancel(context.TODO())
+ defer cancel()
+
testEnv = &envtest.Environment{
CRDDirectoryPaths: []string{filepath.Join("..", "..", "config", "crd", "bases")},
+ WebhookInstallOptions: envtest.WebhookInstallOptions{
+ Paths: []string{filepath.Join("..", "..", "config", "webhook")},
+ },
}
-
cfg, err := testEnv.Start()
if err != nil {
fmt.Printf("failed to start testEnv: %v", err)
@@ -56,6 +73,65 @@ func TestMain(m *testing.M) {
os.Exit(1)
}
+ // start webhook server using Manager
+ webhookInstallOptions := &testEnv.WebhookInstallOptions
+ mgr, err := ctrl.NewManager(cfg, ctrl.Options{
+ Scheme: testScheme,
+ Host: webhookInstallOptions.LocalServingHost,
+ Port: webhookInstallOptions.LocalServingPort,
+ CertDir: webhookInstallOptions.LocalServingCertDir,
+ LeaderElection: false,
+ MetricsBindAddress: "0",
+ })
+ if err != nil {
+ fmt.Printf("failed to start webhook server: %v", err)
+ os.Exit(1)
+ }
+
+ if err := (&v1alpha1.OpenTelemetryCollector{}).SetupWebhookWithManager(mgr); err != nil {
+ fmt.Printf("failed to SetupWebhookWithManager: %v", err)
+ os.Exit(1)
+ }
+
+ ctx, cancel = context.WithCancel(context.TODO())
+ defer cancel()
+ go func() {
+ if err = mgr.Start(ctx); err != nil {
+ fmt.Printf("failed to start manager: %v", err)
+ os.Exit(1)
+ }
+ }()
+
+ // wait for the webhook server to get ready
+ wg := &sync.WaitGroup{}
+ wg.Add(1)
+ dialer := &net.Dialer{Timeout: time.Second}
+ addrPort := fmt.Sprintf("%s:%d", webhookInstallOptions.LocalServingHost, webhookInstallOptions.LocalServingPort)
+ go func(wg *sync.WaitGroup) {
+ defer wg.Done()
+ if err = retry.OnError(wait.Backoff{
+ Steps: 20,
+ Duration: 10 * time.Millisecond,
+ Factor: 1.5,
+ Jitter: 0.1,
+ Cap: time.Second * 30,
+ }, func(error) bool {
+ return true
+ }, func() error {
+ // #nosec G402
+ conn, err := tls.DialWithDialer(dialer, "tcp", addrPort, &tls.Config{InsecureSkipVerify: true})
+ if err != nil {
+ return err
+ }
+ _ = conn.Close()
+ return nil
+ }); err != nil {
+ fmt.Printf("failed to wait for webhook server to be ready: %v", err)
+ os.Exit(1)
+ }
+ }(wg)
+ wg.Wait()
+
code := m.Run()
err = testEnv.Stop()
diff --git a/pkg/collector/reconcile/suite_test.go b/pkg/collector/reconcile/suite_test.go
index 623f7dd534..9bd26991f0 100644
--- a/pkg/collector/reconcile/suite_test.go
+++ b/pkg/collector/reconcile/suite_test.go
@@ -16,11 +16,15 @@ package reconcile
import (
"context"
+ "crypto/tls"
"fmt"
"io/ioutil"
+ "net"
"os"
"path/filepath"
+ "sync"
"testing"
+ "time"
"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
@@ -30,8 +34,11 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/uuid"
+ "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/record"
+ "k8s.io/client-go/util/retry"
+ ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/envtest"
logf "sigs.k8s.io/controller-runtime/pkg/log"
@@ -40,12 +47,17 @@ import (
"github.com/open-telemetry/opentelemetry-operator/internal/config"
)
-var k8sClient client.Client
-var testEnv *envtest.Environment
-var testScheme *runtime.Scheme = scheme.Scheme
-var logger = logf.Log.WithName("unit-tests")
+var (
+ k8sClient client.Client
+ testEnv *envtest.Environment
+ testScheme *runtime.Scheme = scheme.Scheme
+ ctx context.Context
+ cancel context.CancelFunc
-var instanceUID = uuid.NewUUID()
+ logger = logf.Log.WithName("unit-tests")
+
+ instanceUID = uuid.NewUUID()
+)
const (
defaultCollectorImage = "default-collector"
@@ -53,10 +65,15 @@ const (
)
func TestMain(m *testing.M) {
+ ctx, cancel = context.WithCancel(context.TODO())
+ defer cancel()
+
testEnv = &envtest.Environment{
CRDDirectoryPaths: []string{filepath.Join("..", "..", "..", "config", "crd", "bases")},
+ WebhookInstallOptions: envtest.WebhookInstallOptions{
+ Paths: []string{filepath.Join("..", "..", "..", "config", "webhook")},
+ },
}
-
cfg, err := testEnv.Start()
if err != nil {
fmt.Printf("failed to start testEnv: %v", err)
@@ -75,6 +92,65 @@ func TestMain(m *testing.M) {
os.Exit(1)
}
+ // start webhook server using Manager
+ webhookInstallOptions := &testEnv.WebhookInstallOptions
+ mgr, err := ctrl.NewManager(cfg, ctrl.Options{
+ Scheme: testScheme,
+ Host: webhookInstallOptions.LocalServingHost,
+ Port: webhookInstallOptions.LocalServingPort,
+ CertDir: webhookInstallOptions.LocalServingCertDir,
+ LeaderElection: false,
+ MetricsBindAddress: "0",
+ })
+ if err != nil {
+ fmt.Printf("failed to start webhook server: %v", err)
+ os.Exit(1)
+ }
+
+ if err := (&v1alpha1.OpenTelemetryCollector{}).SetupWebhookWithManager(mgr); err != nil {
+ fmt.Printf("failed to SetupWebhookWithManager: %v", err)
+ os.Exit(1)
+ }
+
+ ctx, cancel = context.WithCancel(context.TODO())
+ defer cancel()
+ go func() {
+ if err = mgr.Start(ctx); err != nil {
+ fmt.Printf("failed to start manager: %v", err)
+ os.Exit(1)
+ }
+ }()
+
+ // wait for the webhook server to get ready
+ wg := &sync.WaitGroup{}
+ wg.Add(1)
+ dialer := &net.Dialer{Timeout: time.Second}
+ addrPort := fmt.Sprintf("%s:%d", webhookInstallOptions.LocalServingHost, webhookInstallOptions.LocalServingPort)
+ go func(wg *sync.WaitGroup) {
+ defer wg.Done()
+ if err = retry.OnError(wait.Backoff{
+ Steps: 20,
+ Duration: 10 * time.Millisecond,
+ Factor: 1.5,
+ Jitter: 0.1,
+ Cap: time.Second * 30,
+ }, func(error) bool {
+ return true
+ }, func() error {
+ // #nosec G402
+ conn, err := tls.DialWithDialer(dialer, "tcp", addrPort, &tls.Config{InsecureSkipVerify: true})
+ if err != nil {
+ return err
+ }
+ _ = conn.Close()
+ return nil
+ }); err != nil {
+ fmt.Printf("failed to wait for webhook server to be ready: %v", err)
+ os.Exit(1)
+ }
+ }(wg)
+ wg.Wait()
+
code := m.Run()
err = testEnv.Stop()
diff --git a/pkg/collector/upgrade/suite_test.go b/pkg/collector/upgrade/suite_test.go
index b7fdb25b35..84b3e43e7c 100644
--- a/pkg/collector/upgrade/suite_test.go
+++ b/pkg/collector/upgrade/suite_test.go
@@ -15,13 +15,21 @@
package upgrade_test
import (
+ "context"
+ "crypto/tls"
"fmt"
+ "net"
"os"
"path/filepath"
+ "sync"
"testing"
+ "time"
"k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes/scheme"
+ "k8s.io/client-go/util/retry"
+ ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/envtest"
@@ -29,13 +37,23 @@ import (
// +kubebuilder:scaffold:imports
)
-var k8sClient client.Client
-var testEnv *envtest.Environment
-var testScheme *runtime.Scheme = scheme.Scheme
+var (
+ k8sClient client.Client
+ testEnv *envtest.Environment
+ testScheme *runtime.Scheme = scheme.Scheme
+ ctx context.Context
+ cancel context.CancelFunc
+)
func TestMain(m *testing.M) {
+ ctx, cancel = context.WithCancel(context.TODO())
+ defer cancel()
+
testEnv = &envtest.Environment{
CRDDirectoryPaths: []string{filepath.Join("..", "..", "..", "config", "crd", "bases")},
+ WebhookInstallOptions: envtest.WebhookInstallOptions{
+ Paths: []string{filepath.Join("..", "..", "..", "config", "webhook")},
+ },
}
cfg, err := testEnv.Start()
@@ -56,6 +74,66 @@ func TestMain(m *testing.M) {
os.Exit(1)
}
+ // start webhook server using Manager
+ webhookInstallOptions := &testEnv.WebhookInstallOptions
+ mgr, err := ctrl.NewManager(cfg, ctrl.Options{
+ Scheme: testScheme,
+ Host: webhookInstallOptions.LocalServingHost,
+ Port: webhookInstallOptions.LocalServingPort,
+ CertDir: webhookInstallOptions.LocalServingCertDir,
+ LeaderElection: false,
+ MetricsBindAddress: "0",
+ })
+ if err != nil {
+ fmt.Printf("failed to start webhook server: %v", err)
+ os.Exit(1)
+ }
+
+ if err := (&v1alpha1.OpenTelemetryCollector{}).SetupWebhookWithManager(mgr); err != nil {
+ fmt.Printf("failed to SetupWebhookWithManager: %v", err)
+ os.Exit(1)
+ }
+
+ //+kubebuilder:scaffold:webhook
+
+ go func() {
+ if err = mgr.Start(ctx); err != nil {
+ fmt.Printf("failed to start manager: %v", err)
+ os.Exit(1)
+ }
+ }()
+
+ // wait for the webhook server to get ready
+ wg := &sync.WaitGroup{}
+ wg.Add(1)
+ dialer := &net.Dialer{Timeout: time.Second}
+ addrPort := fmt.Sprintf("%s:%d", webhookInstallOptions.LocalServingHost, webhookInstallOptions.LocalServingPort)
+ go func(wg *sync.WaitGroup) {
+ defer wg.Done()
+ if err = retry.OnError(wait.Backoff{
+ Steps: 20,
+ Duration: 10 * time.Millisecond,
+ Factor: 1.5,
+ Jitter: 0.1,
+ Cap: time.Second * 30,
+ }, func(error) bool {
+ return true
+ }, func() error {
+ // #nosec G402
+ conn, err := tls.DialWithDialer(dialer, "tcp", addrPort, &tls.Config{InsecureSkipVerify: true})
+ if err != nil {
+ return err
+ }
+ _ = conn.Close()
+ return nil
+ }); err != nil {
+ fmt.Printf("failed to wait for webhook server to be ready: %v", err)
+ os.Exit(1)
+ }
+
+ }(wg)
+ wg.Wait()
+
code := m.Run()
err = testEnv.Stop()
diff --git a/pkg/collector/upgrade/upgrade.go b/pkg/collector/upgrade/upgrade.go
index a48cf29cd6..efb00c023f 100644
--- a/pkg/collector/upgrade/upgrade.go
+++ b/pkg/collector/upgrade/upgrade.go
@@ -44,6 +44,11 @@ func ManagedInstances(ctx context.Context, logger logr.Logger, ver version.Versi
for i := range list.Items {
original := list.Items[i]
+ itemLogger := logger.WithValues("name", original.Name, "namespace", original.Namespace)
+ if original.Spec.UpgradeStrategy == v1alpha1.UpgradeStrategyNone {
+ itemLogger.Info("skipping instance upgrade due to UpgradeStrategy")
+ continue
+ }
upgraded, err := ManagedInstance(ctx, logger, ver, cl, original)
if err != nil {
// nothing to do at this level, just go to the next instance
@@ -55,18 +60,18 @@ func ManagedInstances(ctx context.Context, logger logr.Logger, ver version.Versi
st := upgraded.Status
patch := client.MergeFrom(&original)
if err := cl.Patch(ctx, &upgraded, patch); err != nil {
- logger.Error(err, "failed to apply changes to instance", "name", upgraded.Name, "namespace", upgraded.Namespace)
+ itemLogger.Error(err, "failed to apply changes to instance")
continue
}
// the status object requires its own update
upgraded.Status = st
if err := cl.Status().Patch(ctx, &upgraded, patch); err != nil {
- logger.Error(err, "failed to apply changes to instance's status object", "name", upgraded.Name, "namespace", upgraded.Namespace)
+ itemLogger.Error(err, "failed to apply changes to instance's status object")
continue
}
- logger.Info("instance upgraded", "name", upgraded.Name, "namespace", upgraded.Namespace, "version", upgraded.Status.Version)
+ itemLogger.Info("instance upgraded", "version", upgraded.Status.Version)
}
}
diff --git a/pkg/collector/upgrade/upgrade_test.go b/pkg/collector/upgrade/upgrade_test.go
index ddf4fc2d4d..e99fd6f31b 100644
--- a/pkg/collector/upgrade/upgrade_test.go
+++ b/pkg/collector/upgrade/upgrade_test.go
@@ -31,59 +31,55 @@ import (
var logger = logf.Log.WithName("unit-tests")
-func TestShouldUpgradeAllToLatest(t *testing.T) {
- // prepare
- nsn := types.NamespacedName{Name: "my-instance", Namespace: "default"}
- existing := v1alpha1.OpenTelemetryCollector{
- ObjectMeta: metav1.ObjectMeta{
- Name: nsn.Name,
- Namespace: nsn.Namespace,
- Labels: map[string]string{
- "app.kubernetes.io/managed-by": "opentelemetry-operator",
- },
- },
- }
- existing.Status.Version = "0.0.1" // this is the first version we have an upgrade function
- err := k8sClient.Create(context.Background(), &existing)
- require.NoError(t, err)
-
- err = k8sClient.Status().Update(context.Background(), &existing)
- require.NoError(t, err)
+func TestShouldUpgradeAllToLatestBasedOnUpgradeStrategy(t *testing.T) {
+ const beginV = "0.0.1" // this is the first version we have an upgrade function
currentV := version.Get()
currentV.OpenTelemetryCollector = upgrade.Latest.String()
- // sanity check
- persisted := &v1alpha1.OpenTelemetryCollector{}
- err = k8sClient.Get(context.Background(), nsn, persisted)
- require.NoError(t, err)
- require.Equal(t, "0.0.1", persisted.Status.Version)
+ for _, tt := range []struct {
+ strategy v1alpha1.UpgradeStrategy
+ expectedV string
+ }{
+ {v1alpha1.UpgradeStrategyAutomatic, upgrade.Latest.String()},
+ {v1alpha1.UpgradeStrategyNone, beginV},
+ } {
+ t.Run("spec.UpgradeStrategy = "+string(tt.strategy), func(t *testing.T) {
+ // prepare
+ nsn := types.NamespacedName{Name: "my-instance", Namespace: "default"}
+ existing := makeOtelcol(nsn)
+ existing.Status.Version = beginV
+ err := k8sClient.Create(context.Background(), &existing)
+ require.NoError(t, err)
- // test
- err = upgrade.ManagedInstances(context.Background(), logger, currentV, k8sClient)
- assert.NoError(t, err)
+ err = k8sClient.Status().Update(context.Background(), &existing)
+ require.NoError(t, err)
- // verify
- err = k8sClient.Get(context.Background(), nsn, persisted)
- assert.NoError(t, err)
- assert.Equal(t, upgrade.Latest.String(), persisted.Status.Version)
+ // sanity check
+ persisted := &v1alpha1.OpenTelemetryCollector{}
+ err = k8sClient.Get(context.Background(), nsn, persisted)
+ require.NoError(t, err)
+ require.Equal(t, beginV, persisted.Status.Version)
- // cleanup
- assert.NoError(t, k8sClient.Delete(context.Background(), &existing))
+ // test
+ err = upgrade.ManagedInstances(context.Background(), logger, currentV, k8sClient)
+ assert.NoError(t, err)
+
+ // verify
+ err = k8sClient.Get(context.Background(), nsn, persisted)
+ assert.NoError(t, err)
+ assert.Equal(t, upgrade.Latest.String(), persisted.Status.Version)
+
+ // cleanup
+ assert.NoError(t, k8sClient.Delete(context.Background(), &existing))
+ })
+ }
}
func TestUpgradeUpToLatestKnownVersion(t *testing.T) {
// prepare
nsn := types.NamespacedName{Name: "my-instance", Namespace: "default"}
- existing := v1alpha1.OpenTelemetryCollector{
- ObjectMeta: metav1.ObjectMeta{
- Name: nsn.Name,
- Namespace: nsn.Namespace,
- Labels: map[string]string{
- "app.kubernetes.io/managed-by": "opentelemetry-operator",
- },
- },
- }
+ existing := makeOtelcol(nsn)
existing.Status.Version = "0.8.0"
currentV := version.Get()
@@ -111,15 +107,7 @@ func TestVersionsShouldNotBeChanged(t *testing.T) {
t.Run(tt.desc, func(t *testing.T) {
// prepare
nsn := types.NamespacedName{Name: "my-instance", Namespace: "default"}
- existing := v1alpha1.OpenTelemetryCollector{
- ObjectMeta: metav1.ObjectMeta{
- Name: nsn.Name,
- Namespace: nsn.Namespace,
- Labels: map[string]string{
- "app.kubernetes.io/managed-by": "opentelemetry-operator",
- },
- },
- }
+ existing := makeOtelcol(nsn)
existing.Status.Version = tt.v
currentV := version.Get()
@@ -138,3 +126,15 @@ func TestVersionsShouldNotBeChanged(t *testing.T) {
})
}
}
+
+func makeOtelcol(nsn types.NamespacedName) v1alpha1.OpenTelemetryCollector {
+ return v1alpha1.OpenTelemetryCollector{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: nsn.Name,
+ Namespace: nsn.Namespace,
+ Labels: map[string]string{
+ "app.kubernetes.io/managed-by": "opentelemetry-operator",
+ },
+ },
+ }
+}