Skip to content

Commit

Permalink
proxy: add SnapshotHash and SnapshotHashStatus
Browse files Browse the repository at this point in the history
Longhorn 4210
Longhorn 3198

Signed-off-by: Derek Su <[email protected]>
  • Loading branch information
derekbit committed Oct 17, 2022
1 parent c459ada commit 2a5b008
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 3 deletions.
67 changes: 65 additions & 2 deletions pkg/client/proxy_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,10 @@ package client
import (
"github.com/pkg/errors"

rpc "github.com/longhorn/longhorn-instance-manager/pkg/imrpc"

etypes "github.com/longhorn/longhorn-engine/pkg/types"
eutil "github.com/longhorn/longhorn-engine/pkg/util"
eptypes "github.com/longhorn/longhorn-engine/proto/ptypes"
rpc "github.com/longhorn/longhorn-instance-manager/pkg/imrpc"
)

const (
Expand Down Expand Up @@ -279,3 +278,67 @@ func (c *ProxyClient) SnapshotRemove(serviceAddress string, names []string) (err

return nil
}

func (c *ProxyClient) SnapshotHash(serviceAddress string, snapshotNames []string, rehash bool) (err error) {
input := map[string]string{
"serviceAddress": serviceAddress,
}
if err := validateProxyMethodParameters(input); err != nil {
return errors.Wrap(err, "failed to hash snapshot")
}

defer func() {
err = errors.Wrapf(err, "%v failed to hash snapshot", c.getProxyErrorPrefix(serviceAddress))
}()

req := &rpc.EngineSnapshotHashRequest{
ProxyEngineRequest: &rpc.ProxyEngineRequest{
Address: serviceAddress,
},
SnapshotNames: snapshotNames,
Rehash: rehash,
}
_, err = c.service.SnapshotHash(getContextWithGRPCTimeout(c.ctx), req)
if err != nil {
return err
}

return nil
}

func (c *ProxyClient) SnapshotHashStatus(serviceAddress, snapshotName string) (status map[string]*SnapshotHashStatus, err error) {
input := map[string]string{
"serviceAddress": serviceAddress,
}
if err := validateProxyMethodParameters(input); err != nil {
return nil, errors.Wrap(err, "failed to get snapshot hash status")
}

defer func() {
err = errors.Wrapf(err, "%v failed to get snapshot hash status", c.getProxyErrorPrefix(serviceAddress))
}()

req := &rpc.EngineSnapshotHashStatusRequest{
ProxyEngineRequest: &rpc.ProxyEngineRequest{
Address: serviceAddress,
},
SnapshotName: snapshotName,
}

recv, err := c.service.SnapshotHashStatus(getContextWithGRPCTimeout(c.ctx), req)
if err != nil {
return nil, err
}

status = make(map[string]*SnapshotHashStatus)
for k, v := range recv.Status {
status[k] = &SnapshotHashStatus{
State: v.State,
Progress: int(v.Progress),
Checksum: v.Checksum,
Error: v.Error,
}
}

return status, nil
}
7 changes: 7 additions & 0 deletions pkg/client/proxy_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,3 +73,10 @@ type ReplicaRebuildStatus struct {
State string
FromReplicaAddress string
}

type SnapshotHashStatus struct {
State string
Progress int
Checksum string
Error string
}
47 changes: 46 additions & 1 deletion pkg/proxy/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func (p *Proxy) SnapshotPurge(ctx context.Context, req *rpc.EngineSnapshotPurgeR

func (p *Proxy) SnapshotPurgeStatus(ctx context.Context, req *rpc.ProxyEngineRequest) (resp *rpc.EngineSnapshotPurgeStatusProxyResponse, err error) {
log := logrus.WithFields(logrus.Fields{"serviceURL": req.Address})
log.Debug("Get snapshot purge status")
log.Debug("Getting snapshot purge status")

task, err := esync.NewTask(ctx, req.Address)
if err != nil {
Expand Down Expand Up @@ -210,3 +210,48 @@ func (p *Proxy) SnapshotRemove(ctx context.Context, req *rpc.EngineSnapshotRemov

return &empty.Empty{}, lastErr
}

func (p *Proxy) SnapshotHash(ctx context.Context, req *rpc.EngineSnapshotHashRequest) (resp *empty.Empty, err error) {
log := logrus.WithFields(logrus.Fields{"serviceURL": req.ProxyEngineRequest.Address})
log.Debug("Hashing snapshot")

task, err := esync.NewTask(ctx, req.ProxyEngineRequest.Address)
if err != nil {
return nil, err
}

if err := task.HashSnapshot(req.SnapshotNames, req.Rehash); err != nil {
return nil, err
}

return &empty.Empty{}, nil
}

func (p *Proxy) SnapshotHashStatus(ctx context.Context, req *rpc.EngineSnapshotHashStatusRequest) (resp *rpc.EngineSnapshotHashStatusProxyResponse, err error) {
log := logrus.WithFields(logrus.Fields{"serviceURL": req.ProxyEngineRequest.Address})
log.Debug("Getting snapshot hash status")

task, err := esync.NewTask(ctx, req.ProxyEngineRequest.Address)
if err != nil {
return nil, err
}

recv, err := task.HashSnapshotStatus(req.SnapshotName)
if err != nil {
return nil, err
}

resp = &rpc.EngineSnapshotHashStatusProxyResponse{
Status: map[string]*eptypes.SnapshotHashStatusResponse{},
}
for k, v := range recv {
resp.Status[k] = &eptypes.SnapshotHashStatusResponse{
State: v.State,
Progress: int32(v.Progress),
Checksum: v.Checksum,
Error: v.Error,
}
}

return resp, nil
}

0 comments on commit 2a5b008

Please sign in to comment.