diff --git a/internal/pkg/agent/application/upgrade/marker_access_common.go b/internal/pkg/agent/application/upgrade/marker_access_common.go new file mode 100644 index 00000000000..fc069466c2c --- /dev/null +++ b/internal/pkg/agent/application/upgrade/marker_access_common.go @@ -0,0 +1,32 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package upgrade + +import ( + "fmt" + "os" +) + +func writeMarkerFileCommon(markerFile string, markerBytes []byte, shouldFsync bool) error { + f, err := os.OpenFile(markerFile, os.O_WRONLY|os.O_CREATE, 0600) + if err != nil { + return fmt.Errorf("failed to open upgrade marker file for writing: %w", err) + } + defer f.Close() + + if _, err := f.Write(markerBytes); err != nil { + return fmt.Errorf("failed to write upgrade marker file: %w", err) + } + + if !shouldFsync { + return nil + } + + if err := f.Sync(); err != nil { + return fmt.Errorf("failed to sync upgrade marker file to disk: %w", err) + } + + return nil +} diff --git a/internal/pkg/agent/application/upgrade/marker_access_other.go b/internal/pkg/agent/application/upgrade/marker_access_other.go index ed854160e94..fbcfeae5726 100644 --- a/internal/pkg/agent/application/upgrade/marker_access_other.go +++ b/internal/pkg/agent/application/upgrade/marker_access_other.go @@ -25,6 +25,6 @@ func readMarkerFile(markerFile string) ([]byte, error) { // On non-Windows platforms, writeMarkerFile simply writes the marker file. // See marker_access_windows.go for behavior on Windows platforms. -func writeMarkerFile(markerFile string, markerBytes []byte) error { - return os.WriteFile(markerFilePath(), markerBytes, 0600) +func writeMarkerFile(markerFile string, markerBytes []byte, shouldFsync bool) error { + return writeMarkerFileCommon(markerFile, markerBytes, shouldFsync) } diff --git a/internal/pkg/agent/application/upgrade/marker_access_test.go b/internal/pkg/agent/application/upgrade/marker_access_test.go new file mode 100644 index 00000000000..3f1ff637eaa --- /dev/null +++ b/internal/pkg/agent/application/upgrade/marker_access_test.go @@ -0,0 +1,26 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package upgrade + +import ( + "os" + "path/filepath" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestWriteMarkerFile(t *testing.T) { + tmpDir := t.TempDir() + markerFile := filepath.Join(tmpDir, markerFilename) + + markerBytes := []byte("foo bar") + err := writeMarkerFile(markerFile, markerBytes, true) + require.NoError(t, err) + + data, err := os.ReadFile(markerFile) + require.NoError(t, err) + require.Equal(t, markerBytes, data) +} diff --git a/internal/pkg/agent/application/upgrade/marker_access_windows.go b/internal/pkg/agent/application/upgrade/marker_access_windows.go index 673a57eeabf..cb37f9c0e88 100644 --- a/internal/pkg/agent/application/upgrade/marker_access_windows.go +++ b/internal/pkg/agent/application/upgrade/marker_access_windows.go @@ -49,9 +49,9 @@ func readMarkerFile(markerFile string) ([]byte, error) { // mechanism is necessary since the marker file could be accessed by multiple // processes (the Upgrade Watcher and the main Agent process) at the same time, // which could fail on Windows. -func writeMarkerFile(markerFile string, markerBytes []byte) error { +func writeMarkerFile(markerFile string, markerBytes []byte, shouldFsync bool) error { writeFn := func() error { - return os.WriteFile(markerFile, markerBytes, 0600) + return writeMarkerFileCommon(markerFile, markerBytes, shouldFsync) } if err := accessMarkerFileWithRetries(writeFn); err != nil { diff --git a/internal/pkg/agent/application/upgrade/step_mark.go b/internal/pkg/agent/application/upgrade/step_mark.go index b5743582317..bca67d307f0 100644 --- a/internal/pkg/agent/application/upgrade/step_mark.go +++ b/internal/pkg/agent/application/upgrade/step_mark.go @@ -194,7 +194,10 @@ func loadMarker(markerFile string) (*UpdateMarker, error) { }, nil } -func SaveMarker(marker *UpdateMarker) error { +// SaveMarker serializes and persists the given upgrade marker to disk. +// For critical upgrade transitions, pass shouldFsync as true so the marker +// file is immediately flushed to persistent storage. +func SaveMarker(marker *UpdateMarker, shouldFsync bool) error { makerSerializer := &updateMarkerSerializer{ Hash: marker.Hash, UpdatedOn: marker.UpdatedOn, @@ -209,7 +212,7 @@ func SaveMarker(marker *UpdateMarker) error { return err } - return writeMarkerFile(markerFilePath(), markerBytes) + return writeMarkerFile(markerFilePath(), markerBytes, shouldFsync) } func markerFilePath() string { diff --git a/internal/pkg/agent/application/upgrade/upgrade.go b/internal/pkg/agent/application/upgrade/upgrade.go index 0873a0d4d69..b299dd3fd75 100644 --- a/internal/pkg/agent/application/upgrade/upgrade.go +++ b/internal/pkg/agent/application/upgrade/upgrade.go @@ -253,7 +253,7 @@ func (u *Upgrader) Ack(ctx context.Context, acker acker.Acker) error { marker.Acked = true - return SaveMarker(marker) + return SaveMarker(marker, false) } func (u *Upgrader) MarkerWatcher() MarkerWatcher { diff --git a/internal/pkg/agent/cmd/watch.go b/internal/pkg/agent/cmd/watch.go index b3d5727175c..a895e10df5a 100644 --- a/internal/pkg/agent/cmd/watch.go +++ b/internal/pkg/agent/cmd/watch.go @@ -126,7 +126,7 @@ func watchCmd(log *logp.Logger, cfg *configuration.Configuration) error { } marker.Details.SetState(details.StateRollback) - err = upgrade.SaveMarker(marker) + err = upgrade.SaveMarker(marker, true) if err != nil { log.Errorf("unable to save upgrade marker before attempting to rollback: %s", err.Error()) } @@ -136,7 +136,7 @@ func watchCmd(log *logp.Logger, cfg *configuration.Configuration) error { log.Error("rollback failed", err) marker.Details.Fail(err) - err = upgrade.SaveMarker(marker) + err = upgrade.SaveMarker(marker, true) if err != nil { log.Errorf("unable to save upgrade marker after rollback failed: %s", err.Error()) } @@ -146,7 +146,7 @@ func watchCmd(log *logp.Logger, cfg *configuration.Configuration) error { // watch succeeded - upgrade was successful! marker.Details.SetState(details.StateCompleted) - err = upgrade.SaveMarker(marker) + err = upgrade.SaveMarker(marker, false) if err != nil { log.Errorf("unable to save upgrade marker after successful watch: %s", err.Error()) }