diff --git a/changelogs/unreleased/1854-prydonius b/changelogs/unreleased/1854-prydonius new file mode 100644 index 0000000000..535b433bf2 --- /dev/null +++ b/changelogs/unreleased/1854-prydonius @@ -0,0 +1 @@ +report restore progress in PodVolumeRestores and expose progress in the velero restore describe --details command diff --git a/pkg/apis/velero/v1/pod_volume_restore.go b/pkg/apis/velero/v1/pod_volume_restore.go index 98668e002f..bee99e84d4 100644 --- a/pkg/apis/velero/v1/pod_volume_restore.go +++ b/pkg/apis/velero/v1/pod_volume_restore.go @@ -66,6 +66,11 @@ type PodVolumeRestoreStatus struct { // Completion time is recorded even on failed restores. // The server's time is used for CompletionTimestamps CompletionTimestamp metav1.Time `json:"completionTimestamp"` + + // Progress holds the total number of bytes of the snapshot and the current + // number of restored bytes. This can be used to display progress information + // about the restore operation. + Progress PodVolumeOperationProgress `json:"progress"` } // +genclient diff --git a/pkg/apis/velero/v1/zz_generated.deepcopy.go b/pkg/apis/velero/v1/zz_generated.deepcopy.go index 0cc4aaa2d3..d60e159a13 100644 --- a/pkg/apis/velero/v1/zz_generated.deepcopy.go +++ b/pkg/apis/velero/v1/zz_generated.deepcopy.go @@ -835,6 +835,7 @@ func (in *PodVolumeRestoreStatus) DeepCopyInto(out *PodVolumeRestoreStatus) { *out = *in in.StartTimestamp.DeepCopyInto(&out.StartTimestamp) in.CompletionTimestamp.DeepCopyInto(&out.CompletionTimestamp) + out.Progress = in.Progress return } diff --git a/pkg/cmd/util/output/restore_describer.go b/pkg/cmd/util/output/restore_describer.go index d81ab951b4..e8e0bb268d 100644 --- a/pkg/cmd/util/output/restore_describer.go +++ b/pkg/cmd/util/output/restore_describer.go @@ -188,8 +188,7 @@ func describePodVolumeRestores(d *Describer, restores []v1.PodVolumeRestore, det restoresByPod := new(volumesByPod) for _, restore := range restoresByPhase[phase] { - // TODO(adnan): replace last parameter with progress from status (#1749) - restoresByPod.Add(restore.Spec.Pod.Namespace, restore.Spec.Pod.Name, restore.Spec.Volume, phase, v1.PodVolumeOperationProgress{}) + restoresByPod.Add(restore.Spec.Pod.Namespace, restore.Spec.Pod.Name, restore.Spec.Volume, phase, restore.Status.Progress) } d.Printf("\t%s:\n", phase) diff --git a/pkg/controller/pod_volume_restore_controller.go b/pkg/controller/pod_volume_restore_controller.go index c3ff87bbb1..0b8189de52 100644 --- a/pkg/controller/pod_volume_restore_controller.go +++ b/pkg/controller/pod_volume_restore_controller.go @@ -42,7 +42,6 @@ import ( listers "github.com/heptio/velero/pkg/generated/listers/velero/v1" "github.com/heptio/velero/pkg/restic" "github.com/heptio/velero/pkg/util/boolptr" - veleroexec "github.com/heptio/velero/pkg/util/exec" "github.com/heptio/velero/pkg/util/filesystem" "github.com/heptio/velero/pkg/util/kube" ) @@ -339,7 +338,7 @@ func (c *podVolumeRestoreController) restorePodVolume(req *velerov1api.PodVolume var stdout, stderr string - if stdout, stderr, err = veleroexec.RunCommand(resticCmd.Cmd()); err != nil { + if stdout, stderr, err = restic.RunRestore(resticCmd, log, c.updateRestoreProgressFunc(req, log)); err != nil { return errors.Wrapf(err, "error running restic restore, cmd=%s, stdout=%s, stderr=%s", resticCmd.String(), stdout, stderr) } log.Debugf("Ran command=%s, stdout=%s, stderr=%s", resticCmd.String(), stdout, stderr) @@ -416,3 +415,15 @@ func (c *podVolumeRestoreController) failRestore(req *velerov1api.PodVolumeResto } return nil } + +// updateRestoreProgressFunc returns a func that takes progress info and patches +// the PVR with the new progress +func (c *podVolumeRestoreController) updateRestoreProgressFunc(req *velerov1api.PodVolumeRestore, log logrus.FieldLogger) func(velerov1api.PodVolumeOperationProgress) { + return func(progress velerov1api.PodVolumeOperationProgress) { + if _, err := c.patchPodVolumeRestore(req, func(r *velerov1api.PodVolumeRestore) { + r.Status.Progress = progress + }); err != nil { + log.WithError(err).Error("error updating PodVolumeRestore progress") + } + } +} diff --git a/pkg/restic/command_factory.go b/pkg/restic/command_factory.go index 6217fba67f..16625cd3f0 100644 --- a/pkg/restic/command_factory.go +++ b/pkg/restic/command_factory.go @@ -112,3 +112,13 @@ func UnlockCommand(repoIdentifier string) *Command { RepoIdentifier: repoIdentifier, } } + +func StatsCommand(repoIdentifier, passwordFile, snapshotID string) *Command { + return &Command{ + Command: "stats", + RepoIdentifier: repoIdentifier, + PasswordFile: passwordFile, + Args: []string{snapshotID}, + ExtraFlags: []string{"--json"}, + } +} diff --git a/pkg/restic/command_factory_test.go b/pkg/restic/command_factory_test.go index d4236198f5..e8145a28f6 100644 --- a/pkg/restic/command_factory_test.go +++ b/pkg/restic/command_factory_test.go @@ -117,3 +117,13 @@ func TestForgetCommand(t *testing.T) { assert.Equal(t, "repo-id", c.RepoIdentifier) assert.Equal(t, []string{"snapshot-id"}, c.Args) } + +func TestStatsCommand(t *testing.T) { + c := StatsCommand("repo-id", "password-file", "snapshot-id") + + assert.Equal(t, "stats", c.Command) + assert.Equal(t, "repo-id", c.RepoIdentifier) + assert.Equal(t, "password-file", c.PasswordFile) + assert.Equal(t, []string{"snapshot-id"}, c.Args) + assert.Equal(t, []string{"--json"}, c.ExtraFlags) +} diff --git a/pkg/restic/exec_commands.go b/pkg/restic/exec_commands.go index b021774d7b..58fa6b26de 100644 --- a/pkg/restic/exec_commands.go +++ b/pkg/restic/exec_commands.go @@ -27,10 +27,14 @@ import ( velerov1api "github.com/heptio/velero/pkg/apis/velero/v1" "github.com/heptio/velero/pkg/util/exec" + "github.com/heptio/velero/pkg/util/filesystem" ) +const restoreProgressCheckInterval = 10 * time.Second const backupProgressCheckInterval = 10 * time.Second +var fileSystem = filesystem.NewFileSystem() + type backupStatusLine struct { MessageType string `json:"message_type"` // seen in status lines @@ -171,3 +175,98 @@ func getSummaryLine(b []byte) ([]byte, error) { } return b[summaryLineIdx : summaryLineIdx+newLineIdx], nil } + +// RunRestore runs a `restic restore` command and monitors the volume size to +// provide progress updates to the caller. +func RunRestore(restoreCmd *Command, log logrus.FieldLogger, updateFunc func(velerov1api.PodVolumeOperationProgress)) (string, string, error) { + snapshotSize, err := getSnapshotSize(restoreCmd.RepoIdentifier, restoreCmd.PasswordFile, restoreCmd.Args[0]) + if err != nil { + return "", "", err + } + + updateFunc(velerov1api.PodVolumeOperationProgress{ + TotalBytes: snapshotSize, + }) + + // create a channel to signal when to end the goroutine scanning for progress + // updates + quit := make(chan struct{}) + + go func() { + ticker := time.NewTicker(restoreProgressCheckInterval) + for { + select { + case <-ticker.C: + volumeSize, err := getVolumeSize(restoreCmd.Dir) + if err != nil { + log.WithError(err).Errorf("error getting restic restore progress") + } + + updateFunc(velerov1api.PodVolumeOperationProgress{ + TotalBytes: snapshotSize, + BytesDone: volumeSize, + }) + case <-quit: + ticker.Stop() + return + } + } + }() + + stdout, stderr, err := exec.RunCommand(restoreCmd.Cmd()) + quit <- struct{}{} + + // update progress to 100% + updateFunc(velerov1api.PodVolumeOperationProgress{ + TotalBytes: snapshotSize, + BytesDone: snapshotSize, + }) + + return stdout, stderr, err +} + +func getSnapshotSize(repoIdentifier, passwordFile, snapshotID string) (int64, error) { + cmd := StatsCommand(repoIdentifier, passwordFile, snapshotID) + + stdout, stderr, err := exec.RunCommand(cmd.Cmd()) + if err != nil { + return 0, errors.Wrapf(err, "error running command, stderr=%s", stderr) + } + + var snapshotStats struct { + TotalSize int64 `json:"total_size"` + } + + if err := json.Unmarshal([]byte(stdout), &snapshotStats); err != nil { + return 0, errors.Wrap(err, "error unmarshalling restic stats result") + } + + if snapshotStats.TotalSize == 0 { + return 0, errors.Errorf("error getting snapshot size %+v", snapshotStats) + } + + return snapshotStats.TotalSize, nil +} + +func getVolumeSize(path string) (int64, error) { + var size int64 + + files, err := fileSystem.ReadDir(path) + if err != nil { + return 0, errors.Wrapf(err, "error reading directory %s", path) + } + + for _, file := range files { + if file.IsDir() { + s, err := getVolumeSize(fmt.Sprintf("%s/%s", path, file.Name())) + if err != nil { + return 0, err + } + size += s + } else { + size += file.Size() + } + } + + return size, nil +} diff --git a/pkg/restic/exec_commands_test.go b/pkg/restic/exec_commands_test.go index ee0257eae0..a7b4d4af5e 100644 --- a/pkg/restic/exec_commands_test.go +++ b/pkg/restic/exec_commands_test.go @@ -20,6 +20,9 @@ import ( "testing" "github.com/stretchr/testify/assert" + + "github.com/heptio/velero/pkg/test" + "github.com/heptio/velero/pkg/util/filesystem" ) func Test_getSummaryLine(t *testing.T) { @@ -78,3 +81,28 @@ third line }) } } + +func Test_getVolumeSize(t *testing.T) { + files := map[string][]byte{ + "/file1.txt": []byte("file1"), + "/file2.txt": []byte("file2"), + "/file3.txt": []byte("file3"), + "/files/file4.txt": []byte("file4"), + "/files/nested/file5.txt": []byte("file5"), + } + fakefs := test.NewFakeFileSystem() + + var expectedSize int64 + for path, content := range files { + fakefs.WithFile(path, content) + expectedSize += int64(len(content)) + } + + fileSystem = fakefs + defer func() { fileSystem = filesystem.NewFileSystem() }() + + actualSize, err := getVolumeSize("/") + + assert.NoError(t, err) + assert.Equal(t, expectedSize, actualSize) +}