diff --git a/internal/pkg/agent/application/coordinator/coordinator.go b/internal/pkg/agent/application/coordinator/coordinator.go index 1129ef96e32..bfd14a91381 100644 --- a/internal/pkg/agent/application/coordinator/coordinator.go +++ b/internal/pkg/agent/application/coordinator/coordinator.go @@ -481,7 +481,6 @@ func (c *Coordinator) Upgrade(ctx context.Context, version string, sourceURI str } det := details.NewDetails(version, details.StateRequested, actionID) det.RegisterObserver(c.SetUpgradeDetails) - det.RegisterObserver(c.logUpgradeDetails) cb, err := c.upgradeMgr.Upgrade(ctx, version, sourceURI, action, det, skipVerifyOverride, skipDefaultPgp, pgpBytes...) if err != nil { diff --git a/internal/pkg/agent/application/coordinator/coordinator_state.go b/internal/pkg/agent/application/coordinator/coordinator_state.go index 22394ee52e0..0d6ba22bddd 100644 --- a/internal/pkg/agent/application/coordinator/coordinator_state.go +++ b/internal/pkg/agent/application/coordinator/coordinator_state.go @@ -120,6 +120,8 @@ func (c *Coordinator) setOverrideState(overrideState *coordinatorOverrideState) func (c *Coordinator) setUpgradeDetails(upgradeDetails *details.Details) { c.state.UpgradeDetails = upgradeDetails c.stateNeedsRefresh = true + + c.logUpgradeDetails(upgradeDetails) } // Forward the current state to the broadcaster and clear the stateNeedsRefresh diff --git a/internal/pkg/agent/application/upgrade/marker_watcher.go b/internal/pkg/agent/application/upgrade/marker_watcher.go index 2d16d5a8ee0..69eaa5b2245 100644 --- a/internal/pkg/agent/application/upgrade/marker_watcher.go +++ b/internal/pkg/agent/application/upgrade/marker_watcher.go @@ -29,6 +29,7 @@ type MarkerFileWatcher struct { updateCh chan UpdateMarker upgradeStarted atomic.Bool + lastMarker *UpdateMarker } func newMarkerFileWatcher(upgradeMarkerFilePath string, logger *logger.Logger) MarkerWatcher { @@ -100,6 +101,16 @@ func (mfw *MarkerFileWatcher) Run(ctx context.Context) error { // Upgrade marker file was created or updated; read its contents // and send them over the update channel. mfw.processMarker(version.GetAgentPackageVersion(), version.Commit()) + case e.Op&(fsnotify.Remove) != 0: + // Upgrade marker file was removed. + // - Upgrade could've been rolled back + // - Upgrade could've been successful + // If last known Upgrade Details state is not `UPG_ROLLBACK`, assume + // upgrade was successful + if mfw.lastMarker != nil && mfw.lastMarker.Details != nil && mfw.lastMarker.Details.State != details.StateRollback { + mfw.lastMarker.Details = nil + mfw.updateCh <- *mfw.lastMarker + } } case <-doInitialRead: mfw.processMarker(version.GetAgentPackageVersion(), version.Commit()) @@ -129,12 +140,16 @@ func (mfw *MarkerFileWatcher) processMarker(currentVersion string, commit string // isn't for some reason, we fallback to explicitly setting that state as // part of the upgrade details in the marker. if marker.PrevVersion == currentVersion && marker.PrevHash == commit && !mfw.upgradeStarted.Load() { + // If there are no upgrade details in the marker or the state in the + // details is not set for some reason, we assume the worst and + // explicitly set the state to UPG_ROLLBACK if marker.Details == nil { marker.Details = details.NewDetails("unknown", details.StateRollback, marker.GetActionID()) - } else { + } else if marker.Details.State == "" { marker.Details.SetState(details.StateRollback) } } + mfw.lastMarker = marker mfw.updateCh <- *marker } diff --git a/internal/pkg/agent/application/upgrade/marker_watcher_test.go b/internal/pkg/agent/application/upgrade/marker_watcher_test.go index 956f87df02a..42af901687a 100644 --- a/internal/pkg/agent/application/upgrade/marker_watcher_test.go +++ b/internal/pkg/agent/application/upgrade/marker_watcher_test.go @@ -128,18 +128,18 @@ details: State: details.StateRollback, }, }, - "same_version_with_details_wrong_state": { + "same_version_with_details_some_state": { markerFileContents: ` prev_version: 8.9.2 details: target_version: 8.9.2 - state: UPG_WATCHING + state: UPG_REPLACING `, upgradeStarted: false, expectedErrLogMsg: false, expectedDetails: &details.Details{ TargetVersion: "8.9.2", - State: details.StateRollback, + State: details.StateReplacing, }, }, "different_version": { @@ -185,7 +185,7 @@ details: expectedErrLogMsg: false, expectedDetails: &details.Details{ TargetVersion: "8.9.2", - State: details.StateRollback, + State: details.StateWatching, }, }, "same_version_same_hash_no_details": { diff --git a/internal/pkg/agent/application/upgrade/upgrade.go b/internal/pkg/agent/application/upgrade/upgrade.go index b299dd3fd75..15a92f4bab7 100644 --- a/internal/pkg/agent/application/upgrade/upgrade.go +++ b/internal/pkg/agent/application/upgrade/upgrade.go @@ -198,7 +198,6 @@ func (u *Upgrader) Upgrade(ctx context.Context, version string, sourceURI string return nil, err } - det.SetState(details.StateWatching) if err := u.markUpgrade(ctx, u.log, newHash, action, det); err != nil { u.log.Errorw("Rolling back: marking upgrade failed", "error.message", err) rollbackInstall(ctx, u.log, newHash) diff --git a/internal/pkg/agent/cmd/watch.go b/internal/pkg/agent/cmd/watch.go index a895e10df5a..69f0b0b033f 100644 --- a/internal/pkg/agent/cmd/watch.go +++ b/internal/pkg/agent/cmd/watch.go @@ -28,7 +28,6 @@ import ( "github.com/elastic/elastic-agent/internal/pkg/config" "github.com/elastic/elastic-agent/internal/pkg/release" "github.com/elastic/elastic-agent/pkg/core/logger" - agtversion "github.com/elastic/elastic-agent/pkg/version" "github.com/elastic/elastic-agent/version" ) @@ -105,51 +104,26 @@ func watchCmd(log *logp.Logger, cfg *configuration.Configuration) error { return nil } + // About to start watching the upgrade. Initialize upgrade details and save them in the + // upgrade marker. + upgradeDetails := initUpgradeDetails(marker, upgrade.SaveMarker, log) + errorCheckInterval := cfg.Settings.Upgrade.Watcher.ErrorCheck.Interval ctx := context.Background() if err := watch(ctx, tilGrace, errorCheckInterval, log); err != nil { log.Error("Error detected, proceeding to rollback: %v", err) - // If we are upgrading from version >= 8.12.0, marker.Details should be non-nil - // because the Agent we upgraded FROM would've written upgrade details in the upgrade - // marker. However, if we're upgrading from version < 8.12.0, the marker won't - // contain upgrade details, so we populate them now. - if marker.Details == nil { - fromVersion, err := agtversion.ParseVersion(marker.PrevVersion) - if err != nil { - log.Warnf("upgrade details are nil, but unable to parse version being upgraded from [%s]: %s", marker.PrevVersion, err.Error()) - } else if fromVersion.Less(*agtversion.NewParsedSemVer(8, 12, 0, "", "")) { - log.Warnf("upgrade details are unexpectedly nil, upgrading from version [%s]", marker.PrevVersion) - } - - marker.Details = details.NewDetails(version.GetAgentPackageVersion(), details.StateRollback, marker.GetActionID()) - } - - marker.Details.SetState(details.StateRollback) - err = upgrade.SaveMarker(marker, true) - if err != nil { - log.Errorf("unable to save upgrade marker before attempting to rollback: %s", err.Error()) - } - + upgradeDetails.SetState(details.StateRollback) err = upgrade.Rollback(ctx, log, marker.PrevHash, marker.Hash) if err != nil { log.Error("rollback failed", err) - - marker.Details.Fail(err) - err = upgrade.SaveMarker(marker, true) - if err != nil { - log.Errorf("unable to save upgrade marker after rollback failed: %s", err.Error()) - } + upgradeDetails.Fail(err) } return err } // watch succeeded - upgrade was successful! - marker.Details.SetState(details.StateCompleted) - err = upgrade.SaveMarker(marker, false) - if err != nil { - log.Errorf("unable to save upgrade marker after successful watch: %s", err.Error()) - } + upgradeDetails.SetState(details.StateCompleted) // cleanup older versions, // in windows it might leave self untouched, this will get cleaned up @@ -260,3 +234,19 @@ func getConfig(streams *cli.IOStreams) *configuration.Configuration { return cfg } + +func initUpgradeDetails(marker *upgrade.UpdateMarker, saveMarker func(*upgrade.UpdateMarker, bool) error, log *logp.Logger) *details.Details { + upgradeDetails := details.NewDetails(version.GetAgentPackageVersion(), details.StateWatching, marker.GetActionID()) + upgradeDetails.RegisterObserver(func(details *details.Details) { + marker.Details = details + if err := saveMarker(marker, true); err != nil { + if details != nil { + log.Errorf("unable to save upgrade marker after setting upgrade details (state = %s): %s", details.State, err.Error()) + } else { + log.Errorf("unable to save upgrade marker after clearing upgrade details: %s", err.Error()) + } + } + }) + + return upgradeDetails +} diff --git a/internal/pkg/agent/cmd/watch_test.go b/internal/pkg/agent/cmd/watch_test.go new file mode 100644 index 00000000000..8a3494d042c --- /dev/null +++ b/internal/pkg/agent/cmd/watch_test.go @@ -0,0 +1,78 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package cmd + +import ( + "testing" + + "go.uber.org/zap/zapcore" + + "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/details" + "github.com/elastic/elastic-agent/internal/pkg/agent/errors" + + "github.com/elastic/elastic-agent/internal/pkg/fleetapi" + + "github.com/stretchr/testify/require" + + "github.com/elastic/elastic-agent/pkg/core/logger" + + "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade" +) + +func TestInitUpgradeDetails(t *testing.T) { + testMarker := &upgrade.UpdateMarker{ + Action: &fleetapi.ActionUpgrade{ + ActionID: "foobar", + }, + } + + saveCount := 0 + mockSaveMarker := func(marker *upgrade.UpdateMarker, _ bool) error { + saveCount++ + if saveCount <= 3 { + testMarker = marker + return nil + } + return errors.New("some error") + } + + log, obs := logger.NewTesting("initUpgradeDetails") + + upgradeDetails := initUpgradeDetails(testMarker, mockSaveMarker, log) + + // Verify initial state + require.NotNil(t, testMarker.Details) + require.Equal(t, details.StateWatching, testMarker.Details.State) + require.Equal(t, 0, obs.Len()) + + // Verify state after changing details state + upgradeDetails.SetState(details.StateRollback) + require.NotNil(t, testMarker.Details) + require.Equal(t, details.StateRollback, testMarker.Details.State) + require.Equal(t, 0, obs.Len()) + + // Verify state after clearing details state + upgradeDetails.SetState(details.StateCompleted) + require.Nil(t, testMarker.Details) + require.Equal(t, 0, obs.Len()) + + // Verify state after changing details state and there's an + // error saving the marker + upgradeDetails.SetState(details.StateRollback) + require.NotNil(t, testMarker.Details) + require.Equal(t, 1, obs.Len()) + logs := obs.TakeAll() + require.Equal(t, zapcore.ErrorLevel, logs[0].Level) + require.Equal(t, `unable to save upgrade marker after setting upgrade details (state = UPG_ROLLBACK): some error`, logs[0].Message) + + // Verify state after clearing details state and there's an + // error saving the marker + upgradeDetails.SetState(details.StateCompleted) + require.Nil(t, testMarker.Details) + require.Equal(t, 1, obs.Len()) + logs = obs.TakeAll() + require.Equal(t, zapcore.ErrorLevel, logs[0].Level) + require.Equal(t, `unable to save upgrade marker after clearing upgrade details: some error`, logs[0].Message) +} diff --git a/pkg/testing/fixture.go b/pkg/testing/fixture.go index c117556ad37..63f4cc65588 100644 --- a/pkg/testing/fixture.go +++ b/pkg/testing/fixture.go @@ -19,11 +19,11 @@ import ( "testing" "time" - "github.com/elastic/elastic-agent/internal/pkg/agent/application/paths" - "github.com/otiai10/copy" "gopkg.in/yaml.v2" + "github.com/elastic/elastic-agent/internal/pkg/agent/application/paths" + "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/details" "github.com/elastic/elastic-agent/pkg/component" "github.com/elastic/elastic-agent/pkg/control" "github.com/elastic/elastic-agent/pkg/control/v2/client" @@ -572,9 +572,10 @@ func (e *ExecErr) Unwrap() error { // ExecStatus executes the status subcommand on the prepared Elastic Agent binary. // It returns the parsed output and the error from the execution. Keep in mind // the agent exits with status 1 if it's unhealthy, but it still outputs the -// status successfully. Therefore, a not empty AgentStatusOutput is valid +// status successfully. Therefore, a non-empty AgentStatusOutput is valid // regardless of the error. An empty AgentStatusOutput and non nil error -// means the output could not be parsed. +// means the output could not be parsed. Use AgentStatusOutput.IsZero() to +// determine if the returned AgentStatusOutput is empty or not. // It should work with any 8.6+ agent func (f *Fixture) ExecStatus(ctx context.Context, opts ...process.CmdOption) (AgentStatusOutput, error) { out, err := f.Exec(ctx, []string{"status", "--output", "json"}, opts...) @@ -1007,8 +1008,13 @@ type AgentStatusOutput struct { } `json:"meta"` } `json:"version_info,omitempty"` } `json:"components"` - FleetState int `json:"FleetState"` - FleetMessage string `json:"FleetMessage"` + FleetState int `json:"FleetState"` + FleetMessage string `json:"FleetMessage"` + UpgradeDetails *details.Details `json:"upgrade_details"` +} + +func (aso *AgentStatusOutput) IsZero() bool { + return aso.Info.ID == "" } type AgentInspectOutput struct { diff --git a/testing/integration/upgrade_downgrade_test.go b/testing/integration/upgrade_downgrade_test.go index 0f36e98a99c..a1b763dd600 100644 --- a/testing/integration/upgrade_downgrade_test.go +++ b/testing/integration/upgrade_downgrade_test.go @@ -85,6 +85,10 @@ func TestStandaloneDowngradeToSpecificSnapshotBuild(t *testing.T) { t.Logf("Testing Elastic Agent upgrade from %s to %s...", define.Version(), endParsedVersion.String()) - err = upgradetest.PerformUpgrade(ctx, startFixture, endFixture, t) + // We pass the upgradetest.WithDisableUpgradeWatcherUpgradeDetailsCheck option here because the endFixture + // is fetched from the artifacts API and it may not contain changes in the Upgrade Watcher whose effects are + // being asserted upon in upgradetest.PerformUpgrade. + // TODO: Stop passing this option and remove these comments once 8.13.0 has been released. + err = upgradetest.PerformUpgrade(ctx, startFixture, endFixture, t, upgradetest.WithDisableUpgradeWatcherUpgradeDetailsCheck()) assert.NoError(t, err) } diff --git a/testing/integration/upgrade_rollback_test.go b/testing/integration/upgrade_rollback_test.go index 3ebd5b336bd..2ce5fbb7b19 100644 --- a/testing/integration/upgrade_rollback_test.go +++ b/testing/integration/upgrade_rollback_test.go @@ -136,7 +136,7 @@ inputs: require.NoError(t, err) require.NotNil(t, state.UpgradeDetails) - require.Equal(t, details.StateRollback, state.UpgradeDetails.State) + require.Equal(t, details.StateRollback, details.State(state.UpgradeDetails.State)) } // rollback should stop the watcher @@ -268,7 +268,7 @@ func TestStandaloneUpgradeRollbackOnRestarts(t *testing.T) { require.NoError(t, err) require.NotNil(t, state.UpgradeDetails) - require.Equal(t, details.StateRollback, state.UpgradeDetails.State) + require.Equal(t, details.StateRollback, details.State(state.UpgradeDetails.State)) } // rollback should stop the watcher diff --git a/testing/upgradetest/upgrader.go b/testing/upgradetest/upgrader.go index 1a89b40a50c..281e2cbe05c 100644 --- a/testing/upgradetest/upgrader.go +++ b/testing/upgradetest/upgrader.go @@ -7,10 +7,12 @@ package upgradetest import ( "context" "encoding/json" + "errors" "fmt" "path/filepath" "time" + "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/details" v1client "github.com/elastic/elastic-agent/pkg/control/v1/client" v2proto "github.com/elastic/elastic-agent/pkg/control/v2/cproto" atesting "github.com/elastic/elastic-agent/pkg/testing" @@ -32,6 +34,10 @@ type upgradeOpts struct { customPgp *CustomPGP customWatcherCfg string + // TODO: should be removed along with all references once 8.13.0 has been released. + // See also WithDisableUpgradeWatcherUpgradeDetailsCheck. + disableUpgradeWatcherUpgradeDetailsCheck bool + preInstallHook func() error postInstallHook func() error preUpgradeHook func() error @@ -106,6 +112,17 @@ func WithCustomWatcherConfig(cfg string) upgradeOpt { } } +// WithDisableUpgradeWatcherUpgradeDetailsCheck disables any assertions for +// upgrade details that are being set by the Upgrade Watcher. This option is +// useful in upgrade tests where the end Agent version does not contain changes +// in the Upgrade Watcher whose effects are being asserted upon in PerformUpgrade. +// TODO: should be removed along with all references once 8.13.0 has been released. +func WithDisableUpgradeWatcherUpgradeDetailsCheck() upgradeOpt { + return func(opts *upgradeOpts) { + opts.disableUpgradeWatcherUpgradeDetailsCheck = true + } +} + // PerformUpgrade performs the upgrading of the Elastic Agent. func PerformUpgrade( ctx context.Context, @@ -158,6 +175,18 @@ func PerformUpgrade( return fmt.Errorf("failed to get end agent build version info: %w", err) } + // For asserting on the effects of any Upgrade Watcher changes made in 8.13.0, we need + // the endVersion to be >= 8.13.0. Otherwise, these assertions will fail as those changes + // won't be present in the Upgrade Watcher. So we disable these assertions if the endVersion + // is < 8.13.0. + endVersion, err := version.ParseVersion(endVersionInfo.Binary.Version) + if err != nil { + return fmt.Errorf("failed to parse version of upgraded Agent binary: %w", err) + } + + upgradeOpts.disableUpgradeWatcherUpgradeDetailsCheck = upgradeOpts.disableUpgradeWatcherUpgradeDetailsCheck || + endVersion.Less(*version.NewParsedSemVer(8, 13, 0, "", "")) + if upgradeOpts.preInstallHook != nil { if err := upgradeOpts.preInstallHook(); err != nil { return fmt.Errorf("pre install hook failed: %w", err) @@ -250,6 +279,16 @@ func PerformUpgrade( } logger.Logf("upgrade watcher started") + // Check that, while the Upgrade Watcher is running, the upgrade details in Agent status + // show the state as UPG_WATCHING. + if !upgradeOpts.disableUpgradeWatcherUpgradeDetailsCheck { + logger.Logf("Checking upgrade details state while Upgrade Watcher is running") + if err := waitUpgradeDetailsState(ctx, startFixture, details.StateWatching, 2*time.Minute, 10*time.Second, logger); err != nil { + // error context added by waitUpgradeDetailsState + return err + } + } + if upgradeOpts.postUpgradeHook != nil { if err := upgradeOpts.postUpgradeHook(); err != nil { return fmt.Errorf("post upgrade hook failed: %w", err) @@ -281,6 +320,16 @@ func PerformUpgrade( } logger.Logf("upgrade watcher finished") + // Check that, upon successful upgrade, the upgrade details have been cleared out + // from Agent status. + if !upgradeOpts.disableUpgradeWatcherUpgradeDetailsCheck { + logger.Logf("Checking upgrade details state after successful upgrade") + if err := waitUpgradeDetailsState(ctx, startFixture, "", 2*time.Minute, 10*time.Second, logger); err != nil { + // error context added by checkUpgradeDetailsState + return err + } + } + // now that the watcher has stopped lets ensure that it's still the expected // version, otherwise it's possible that it was rolled back to the original version err = CheckHealthyAndVersion(ctx, startFixture, endVersionInfo.Binary) @@ -288,6 +337,7 @@ func PerformUpgrade( // error context added by CheckHealthyAndVersion return err } + return nil } @@ -385,3 +435,68 @@ func WaitHealthyAndVersion(ctx context.Context, f *atesting.Fixture, versionInfo } } } + +func waitUpgradeDetailsState(ctx context.Context, f *atesting.Fixture, expectedState details.State, timeout time.Duration, interval time.Duration, logger Logger) error { + versionStr, err := f.ExecVersion(ctx) + if err != nil { + return fmt.Errorf("failed to get Agent version: %w", err) + } + + versionParsed, err := version.ParseVersion(versionStr.Binary.Version) + if err != nil { + return fmt.Errorf("failed to parse version [%s]: %w", versionStr.Binary.Version, err) + } + + // Upgrade details are only available in Agent version >= 8.12.0 + versionUpgradeDetailsAvailable := version.NewParsedSemVer(8, 12, 0, "", "") + if versionParsed.Less(*versionUpgradeDetailsAvailable) { + logger.Logf("upgrade details functionality not implemented in Agent version [%s]. Skipping check for upgrade details state.", versionParsed.String()) + return nil + } + + ctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + t := time.NewTicker(interval) + defer t.Stop() + + var lastErr error + for { + select { + case <-ctx.Done(): + if lastErr != nil { + return fmt.Errorf("failed waiting for status: %w", errors.Join(ctx.Err(), lastErr)) + } + return ctx.Err() + case <-t.C: + status, err := f.ExecStatus(ctx) + if err != nil && status.IsZero() { + lastErr = err + continue + } + + if expectedState == "" { + if status.UpgradeDetails == nil { + // Expected and actual match, so we're good + return nil + } + + lastErr = errors.New("upgrade details found in status but they were expected to be absent") + continue + } + + if status.UpgradeDetails == nil { + lastErr = fmt.Errorf("upgrade details not found in status but expected upgrade details state was [%s]", expectedState) + continue + } + + // Neither expected nor actual are nil, so compare the two + if status.UpgradeDetails.State == expectedState { + return nil + } + + lastErr = fmt.Errorf("upgrade details state in status [%s] is not the same as expected upgrade details state [%s]", status.UpgradeDetails.State, expectedState) + continue + } + } +}