From 2a5b008fce5545ce36c4f557c3267afd5bd7ced8 Mon Sep 17 00:00:00 2001 From: Derek Su Date: Mon, 17 Oct 2022 10:54:49 +0800 Subject: [PATCH] proxy: add SnapshotHash and SnapshotHashStatus Longhorn 4210 Longhorn 3198 Signed-off-by: Derek Su --- pkg/client/proxy_snapshot.go | 67 ++++++++++++++++++++++++++++++++++-- pkg/client/proxy_types.go | 7 ++++ pkg/proxy/snapshot.go | 47 ++++++++++++++++++++++++- 3 files changed, 118 insertions(+), 3 deletions(-) diff --git a/pkg/client/proxy_snapshot.go b/pkg/client/proxy_snapshot.go index 8765f72c0..1fc09fa12 100644 --- a/pkg/client/proxy_snapshot.go +++ b/pkg/client/proxy_snapshot.go @@ -3,11 +3,10 @@ package client import ( "github.com/pkg/errors" - rpc "github.com/longhorn/longhorn-instance-manager/pkg/imrpc" - etypes "github.com/longhorn/longhorn-engine/pkg/types" eutil "github.com/longhorn/longhorn-engine/pkg/util" eptypes "github.com/longhorn/longhorn-engine/proto/ptypes" + rpc "github.com/longhorn/longhorn-instance-manager/pkg/imrpc" ) const ( @@ -279,3 +278,67 @@ func (c *ProxyClient) SnapshotRemove(serviceAddress string, names []string) (err return nil } + +func (c *ProxyClient) SnapshotHash(serviceAddress string, snapshotNames []string, rehash bool) (err error) { + input := map[string]string{ + "serviceAddress": serviceAddress, + } + if err := validateProxyMethodParameters(input); err != nil { + return errors.Wrap(err, "failed to hash snapshot") + } + + defer func() { + err = errors.Wrapf(err, "%v failed to hash snapshot", c.getProxyErrorPrefix(serviceAddress)) + }() + + req := &rpc.EngineSnapshotHashRequest{ + ProxyEngineRequest: &rpc.ProxyEngineRequest{ + Address: serviceAddress, + }, + SnapshotNames: snapshotNames, + Rehash: rehash, + } + _, err = c.service.SnapshotHash(getContextWithGRPCTimeout(c.ctx), req) + if err != nil { + return err + } + + return nil +} + +func (c *ProxyClient) SnapshotHashStatus(serviceAddress, snapshotName string) (status map[string]*SnapshotHashStatus, err error) { + input := map[string]string{ + "serviceAddress": serviceAddress, + } + if err := validateProxyMethodParameters(input); err != nil { + return nil, errors.Wrap(err, "failed to get snapshot hash status") + } + + defer func() { + err = errors.Wrapf(err, "%v failed to get snapshot hash status", c.getProxyErrorPrefix(serviceAddress)) + }() + + req := &rpc.EngineSnapshotHashStatusRequest{ + ProxyEngineRequest: &rpc.ProxyEngineRequest{ + Address: serviceAddress, + }, + SnapshotName: snapshotName, + } + + recv, err := c.service.SnapshotHashStatus(getContextWithGRPCTimeout(c.ctx), req) + if err != nil { + return nil, err + } + + status = make(map[string]*SnapshotHashStatus) + for k, v := range recv.Status { + status[k] = &SnapshotHashStatus{ + State: v.State, + Progress: int(v.Progress), + Checksum: v.Checksum, + Error: v.Error, + } + } + + return status, nil +} diff --git a/pkg/client/proxy_types.go b/pkg/client/proxy_types.go index 18b22a334..4424e2cda 100644 --- a/pkg/client/proxy_types.go +++ b/pkg/client/proxy_types.go @@ -73,3 +73,10 @@ type ReplicaRebuildStatus struct { State string FromReplicaAddress string } + +type SnapshotHashStatus struct { + State string + Progress int + Checksum string + Error string +} diff --git a/pkg/proxy/snapshot.go b/pkg/proxy/snapshot.go index 35bcf8dc3..f8cd1ad95 100644 --- a/pkg/proxy/snapshot.go +++ b/pkg/proxy/snapshot.go @@ -164,7 +164,7 @@ func (p *Proxy) SnapshotPurge(ctx context.Context, req *rpc.EngineSnapshotPurgeR func (p *Proxy) SnapshotPurgeStatus(ctx context.Context, req *rpc.ProxyEngineRequest) (resp *rpc.EngineSnapshotPurgeStatusProxyResponse, err error) { log := logrus.WithFields(logrus.Fields{"serviceURL": req.Address}) - log.Debug("Get snapshot purge status") + log.Debug("Getting snapshot purge status") task, err := esync.NewTask(ctx, req.Address) if err != nil { @@ -210,3 +210,48 @@ func (p *Proxy) SnapshotRemove(ctx context.Context, req *rpc.EngineSnapshotRemov return &empty.Empty{}, lastErr } + +func (p *Proxy) SnapshotHash(ctx context.Context, req *rpc.EngineSnapshotHashRequest) (resp *empty.Empty, err error) { + log := logrus.WithFields(logrus.Fields{"serviceURL": req.ProxyEngineRequest.Address}) + log.Debug("Hashing snapshot") + + task, err := esync.NewTask(ctx, req.ProxyEngineRequest.Address) + if err != nil { + return nil, err + } + + if err := task.HashSnapshot(req.SnapshotNames, req.Rehash); err != nil { + return nil, err + } + + return &empty.Empty{}, nil +} + +func (p *Proxy) SnapshotHashStatus(ctx context.Context, req *rpc.EngineSnapshotHashStatusRequest) (resp *rpc.EngineSnapshotHashStatusProxyResponse, err error) { + log := logrus.WithFields(logrus.Fields{"serviceURL": req.ProxyEngineRequest.Address}) + log.Debug("Getting snapshot hash status") + + task, err := esync.NewTask(ctx, req.ProxyEngineRequest.Address) + if err != nil { + return nil, err + } + + recv, err := task.HashSnapshotStatus(req.SnapshotName) + if err != nil { + return nil, err + } + + resp = &rpc.EngineSnapshotHashStatusProxyResponse{ + Status: map[string]*eptypes.SnapshotHashStatusResponse{}, + } + for k, v := range recv { + resp.Status[k] = &eptypes.SnapshotHashStatusResponse{ + State: v.State, + Progress: int32(v.Progress), + Checksum: v.Checksum, + Error: v.Error, + } + } + + return resp, nil +}