From 58def2797970ef7fc1b0a3718d00f1a2d80d8144 Mon Sep 17 00:00:00 2001 From: Cole Miller Date: Mon, 7 Oct 2024 23:54:28 -0400 Subject: [PATCH 01/11] WIP Signed-off-by: Cole Miller --- app/app.go | 141 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 141 insertions(+) diff --git a/app/app.go b/app/app.go index 3c148277..0d5910ba 100644 --- a/app/app.go +++ b/app/app.go @@ -1,13 +1,18 @@ package app import ( + "archive/tar" + "bytes" "context" "crypto/tls" "database/sql" "fmt" + "io" + "io/ioutil" "net" "os" "path/filepath" + "regexp" "runtime" "sync" "sync/atomic" @@ -310,6 +315,142 @@ func New(dir string, options ...Option) (app *App, err error) { return app, nil } +// RecoveryKernel is a bundle of data that supports recovery of an App-managed dqlite cluster. +// +// When using App, the procedure to recover a cluster is as follows: +// +// 0. Make sure all nodes are stopped. +// 1. Call ReadLastEntryInfo on each node to determine which has the most up-to-date log. This is the "template node". +// 2. On the template node, create an empty recovery kernel. +// 3. On the template node, call PrepareRecovery to get a RecoveryKernel. +// 4. Transfer the RecoveryKernel to each remaining node. +// 5. On each remaining node, call Propagate. +// 6. Restart the cluster. +type RecoveryKernel struct { + // Opaque contains data that is used during recovery but should not otherwise be inspected. + Opaque bytes.Buffer + // Cluster is the list of nodes that should be in the cluster after recovery. It should not be mutated directly. + Cluster []dqlite.NodeInfo +} + +var belongsInKernelPattern = regexp.MustCompile(`\A(metadata[1-2]|[0-9]{16}-[0-9]{16}|open-[0-9]+|snapshot-[0-9]+-[0-9]+-[0-9]+(.meta)?)\z`) + +func belongsInKernel(name string) bool { + return belongsInKernelPattern.MatchString(name) +} + +func PrepareRecovery(dir string, address string, cluster []dqlite.NodeInfo) (*RecoveryKernel, error) { + var me *dqlite.NodeInfo + for _, node := range cluster { + if node.Address == address { + me = &node + break + } + } + if me == nil { + return nil, fmt.Errorf("address %q not in provided cluster list %v", address, cluster) + } + + if err := dqlite.ReconfigureMembershipExt(dir, cluster); err != nil { + return nil, errors.Wrapf(err, "reconfigure membership") + } + store, err := client.NewYamlNodeStore(filepath.Join(dir, storeFile)) + if err != nil { + return nil, errors.Wrapf(err, "create node store") + } + if err := store.Set(context.Background(), cluster); err != nil { + return nil, errors.Wrapf(err, "write node store") + } + if err := fileMarshal(dir, filepath.Join(dir, infoFile), me); err != nil { + return nil, errors.Wrapf(err, "write info file") + } + + files, err := ioutil.ReadDir(dir) + if err != nil { + return nil, errors.Wrapf(err, "read %q", dir) + } + var tarball bytes.Buffer + tw := tar.NewWriter(&tarball) + defer tw.Close() + for _, file := range files { + if file.IsDir() || !belongsInKernel(file.Name()) { + continue + } + path := filepath.Join(dir, file.Name()) + hdr, err := tar.FileInfoHeader(file, "") + if err != nil { + return nil, errors.Wrapf(err, "file info header for %q", path) + } + if err = tw.WriteHeader(hdr); err != nil { + return nil, errors.Wrapf(err, "write header for %q", path) + } + src, err := os.Open(path) + if err != nil { + return nil, errors.Wrapf(err, "open %q", path) + } + if _, err := io.Copy(tw, src); err != nil { + return nil, errors.Wrapf(err, "read %q", path) + } + } + if err = tw.Flush(); err != nil { + return nil, errors.Wrapf(err, "finish tarball for %q", dir) + } + return &RecoveryKernel{tarball, cluster}, nil +} + +func (kern *RecoveryKernel) Propagate(dir string, address string) error { + var me *dqlite.NodeInfo + for _, node := range kern.Cluster { + if node.Address == address { + me = &node + break + } + } + if me == nil { + return fmt.Errorf("address %q not in provided cluster list %v", address, kern.Cluster) + } + + info, err := os.Stat(dir) + if err != nil { + return errors.Wrapf(err, "stat %q", dir) + } + if err := os.RemoveAll(dir); err != nil { + return errors.Wrapf(err, "remove %q", dir) + } + if err := os.Mkdir(dir, info.Mode()); err != nil { + return errors.Wrapf(err, "create %q", dir) + } + + tr := tar.NewReader(&kern.Opaque) + var hdr *tar.Header + for hdr, err = nil, nil; err == nil; hdr, err = tr.Next() { + path := filepath.Join(dir, hdr.Name) + dest, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY, os.FileMode(hdr.Mode)) + if err != nil { + return errors.Wrapf(err, "create %q", path) + } + if _, err := io.Copy(dest, tr); err != nil { + return errors.Wrapf(err, "write %q", path) + } + } + if err != io.EOF { + return errors.Wrapf(err, "read tarball for %q", dir) + } + + store, err := client.NewYamlNodeStore(filepath.Join(dir, storeFile)) + if err != nil { + return errors.Wrapf(err, "create node store") + } + if err := store.Set(context.Background(), kern.Cluster); err != nil { + return errors.Wrapf(err, "write node store") + } + if err := fileMarshal(dir, filepath.Join(dir, infoFile), me); err != nil { + return errors.Wrapf(err, "write info file") + } + + return nil +} + // Handover transfers all responsibilities for this node (such has leadership // and voting rights) to another node, if one is available. // From 5ab9fa4dc84c6219b45fec2bf7c93f5c5926659d Mon Sep 17 00:00:00 2001 From: Cole Miller Date: Thu, 10 Oct 2024 15:58:18 -0400 Subject: [PATCH 02/11] WIP Signed-off-by: Cole Miller --- go.mod | 3 ++- go.sum | 4 ++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/go.mod b/go.mod index 13c9e8e0..76b00fd3 100644 --- a/go.mod +++ b/go.mod @@ -13,6 +13,7 @@ require ( github.com/spf13/cobra v1.2.1 github.com/stretchr/testify v1.7.0 golang.org/x/sync v0.0.0-20210220032951-036812b2e83c - golang.org/x/sys v0.0.0-20210616094352-59db8d763f22 + golang.org/x/sys v0.26.0 + golang.org/x/term v0.25.0 // indirect gopkg.in/yaml.v2 v2.4.0 ) diff --git a/go.sum b/go.sum index 5dc872c8..4e49bbd3 100644 --- a/go.sum +++ b/go.sum @@ -399,7 +399,11 @@ golang.org/x/sys v0.0.0-20210403161142-5e06dd20ab57/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210616094352-59db8d763f22 h1:RqytpXGR1iVNX7psjB3ff8y7sNFinVFvkx1c8SjBkio= golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo= +golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.25.0 h1:WtHI/ltw4NvSUig5KARz9h521QvRC8RmF/cuYqifU24= +golang.org/x/term v0.25.0/go.mod h1:RPyXicDX+6vLxogjjRxjgD2TKtmAO6NZBsBRfrOLu7M= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= From 71f9e8e667ee54a7979d3af01f8f343f9f79b0ea Mon Sep 17 00:00:00 2001 From: Cole Miller Date: Thu, 10 Oct 2024 16:07:40 -0400 Subject: [PATCH 03/11] cmd Signed-off-by: Cole Miller --- cmd/dqlite-recover/dqlite-recover.go | 61 ++++++++++++++++++++++++++++ 1 file changed, 61 insertions(+) create mode 100644 cmd/dqlite-recover/dqlite-recover.go diff --git a/cmd/dqlite-recover/dqlite-recover.go b/cmd/dqlite-recover/dqlite-recover.go new file mode 100644 index 00000000..336b05cc --- /dev/null +++ b/cmd/dqlite-recover/dqlite-recover.go @@ -0,0 +1,61 @@ +package main + +import ( + "encoding/gob" + "flag" + "fmt" + "io" + "io/ioutil" + "log" + "os" + + "github.com/canonical/go-dqlite/app" + "golang.org/x/term" +) + +func main() { + dir := flag.String("dir", "", "local data directory") + address := flag.String("address", "", "new address of this node") + mode := flag.String("mode", "", "'prepare' or 'propagate'") + flag.Parse() + if *address == "" { + log.Fatal("address is required") + } + if *dir == "" { + log.Fatal("dir is required") + } + + switch *mode { + case "prepare": + kern, err := app.PrepareRecovery(*dir, *address) + if err != nil { + log.Fatal(err) + } + var sink io.Writer + if term.IsTerminal(1 /* STDOUT_FILENO */) { + base := fmt.Sprintf("dqlite-recover-kernel-%s-%s", *dir, *address) + f, err := ioutil.TempFile(".", base) + if err != nil { + log.Fatal(err) + } + sink = f + } else { + sink = os.Stdout + } + enc := gob.NewEncoder(sink) + if err := enc.Encode(kern); err != nil { + log.Fatal(err) + } + case "propagate": + dec := gob.NewDecoder(os.Stdin) + var kern app.RecoveryKernel + if err := dec.Decode(&kern); err != nil { + log.Fatal(err) + } + if err := kern.Propagate(*dir, *address); err != nil { + log.Fatal(err) + } + default: + log.Fatal("mode must be 'prepare' or 'propagate'") + } +} From bf93a5d25e77a0c886483d24c17b9589c648e3ed Mon Sep 17 00:00:00 2001 From: Cole Miller Date: Tue, 15 Oct 2024 12:41:56 -0400 Subject: [PATCH 04/11] Test App recovery, fix bugs Signed-off-by: Cole Miller --- app/app.go | 141 ---------------------------------- app/app_go1.18_test.go | 140 --------------------------------- app/recovery.go | 155 +++++++++++++++++++++++++++++++++++++ app/recovery_test.go | 170 +++++++++++++++++++++++++++++++++++++++++ 4 files changed, 325 insertions(+), 281 deletions(-) delete mode 100644 app/app_go1.18_test.go create mode 100644 app/recovery.go create mode 100644 app/recovery_test.go diff --git a/app/app.go b/app/app.go index 0d5910ba..3c148277 100644 --- a/app/app.go +++ b/app/app.go @@ -1,18 +1,13 @@ package app import ( - "archive/tar" - "bytes" "context" "crypto/tls" "database/sql" "fmt" - "io" - "io/ioutil" "net" "os" "path/filepath" - "regexp" "runtime" "sync" "sync/atomic" @@ -315,142 +310,6 @@ func New(dir string, options ...Option) (app *App, err error) { return app, nil } -// RecoveryKernel is a bundle of data that supports recovery of an App-managed dqlite cluster. -// -// When using App, the procedure to recover a cluster is as follows: -// -// 0. Make sure all nodes are stopped. -// 1. Call ReadLastEntryInfo on each node to determine which has the most up-to-date log. This is the "template node". -// 2. On the template node, create an empty recovery kernel. -// 3. On the template node, call PrepareRecovery to get a RecoveryKernel. -// 4. Transfer the RecoveryKernel to each remaining node. -// 5. On each remaining node, call Propagate. -// 6. Restart the cluster. -type RecoveryKernel struct { - // Opaque contains data that is used during recovery but should not otherwise be inspected. - Opaque bytes.Buffer - // Cluster is the list of nodes that should be in the cluster after recovery. It should not be mutated directly. - Cluster []dqlite.NodeInfo -} - -var belongsInKernelPattern = regexp.MustCompile(`\A(metadata[1-2]|[0-9]{16}-[0-9]{16}|open-[0-9]+|snapshot-[0-9]+-[0-9]+-[0-9]+(.meta)?)\z`) - -func belongsInKernel(name string) bool { - return belongsInKernelPattern.MatchString(name) -} - -func PrepareRecovery(dir string, address string, cluster []dqlite.NodeInfo) (*RecoveryKernel, error) { - var me *dqlite.NodeInfo - for _, node := range cluster { - if node.Address == address { - me = &node - break - } - } - if me == nil { - return nil, fmt.Errorf("address %q not in provided cluster list %v", address, cluster) - } - - if err := dqlite.ReconfigureMembershipExt(dir, cluster); err != nil { - return nil, errors.Wrapf(err, "reconfigure membership") - } - store, err := client.NewYamlNodeStore(filepath.Join(dir, storeFile)) - if err != nil { - return nil, errors.Wrapf(err, "create node store") - } - if err := store.Set(context.Background(), cluster); err != nil { - return nil, errors.Wrapf(err, "write node store") - } - if err := fileMarshal(dir, filepath.Join(dir, infoFile), me); err != nil { - return nil, errors.Wrapf(err, "write info file") - } - - files, err := ioutil.ReadDir(dir) - if err != nil { - return nil, errors.Wrapf(err, "read %q", dir) - } - var tarball bytes.Buffer - tw := tar.NewWriter(&tarball) - defer tw.Close() - for _, file := range files { - if file.IsDir() || !belongsInKernel(file.Name()) { - continue - } - path := filepath.Join(dir, file.Name()) - hdr, err := tar.FileInfoHeader(file, "") - if err != nil { - return nil, errors.Wrapf(err, "file info header for %q", path) - } - if err = tw.WriteHeader(hdr); err != nil { - return nil, errors.Wrapf(err, "write header for %q", path) - } - src, err := os.Open(path) - if err != nil { - return nil, errors.Wrapf(err, "open %q", path) - } - if _, err := io.Copy(tw, src); err != nil { - return nil, errors.Wrapf(err, "read %q", path) - } - } - if err = tw.Flush(); err != nil { - return nil, errors.Wrapf(err, "finish tarball for %q", dir) - } - return &RecoveryKernel{tarball, cluster}, nil -} - -func (kern *RecoveryKernel) Propagate(dir string, address string) error { - var me *dqlite.NodeInfo - for _, node := range kern.Cluster { - if node.Address == address { - me = &node - break - } - } - if me == nil { - return fmt.Errorf("address %q not in provided cluster list %v", address, kern.Cluster) - } - - info, err := os.Stat(dir) - if err != nil { - return errors.Wrapf(err, "stat %q", dir) - } - if err := os.RemoveAll(dir); err != nil { - return errors.Wrapf(err, "remove %q", dir) - } - if err := os.Mkdir(dir, info.Mode()); err != nil { - return errors.Wrapf(err, "create %q", dir) - } - - tr := tar.NewReader(&kern.Opaque) - var hdr *tar.Header - for hdr, err = nil, nil; err == nil; hdr, err = tr.Next() { - path := filepath.Join(dir, hdr.Name) - dest, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY, os.FileMode(hdr.Mode)) - if err != nil { - return errors.Wrapf(err, "create %q", path) - } - if _, err := io.Copy(dest, tr); err != nil { - return errors.Wrapf(err, "write %q", path) - } - } - if err != io.EOF { - return errors.Wrapf(err, "read tarball for %q", dir) - } - - store, err := client.NewYamlNodeStore(filepath.Join(dir, storeFile)) - if err != nil { - return errors.Wrapf(err, "create node store") - } - if err := store.Set(context.Background(), kern.Cluster); err != nil { - return errors.Wrapf(err, "write node store") - } - if err := fileMarshal(dir, filepath.Join(dir, infoFile), me); err != nil { - return errors.Wrapf(err, "write info file") - } - - return nil -} - // Handover transfers all responsibilities for this node (such has leadership // and voting rights) to another node, if one is available. // diff --git a/app/app_go1.18_test.go b/app/app_go1.18_test.go deleted file mode 100644 index 3fbe6b3d..00000000 --- a/app/app_go1.18_test.go +++ /dev/null @@ -1,140 +0,0 @@ -//go:build go1.18 -// +build go1.18 - -package app_test - -// import ( -// "context" -// "crypto/tls" -// "net" -// "testing" - -// "github.com/canonical/go-dqlite/app" -// "github.com/canonical/go-dqlite/client" -// "github.com/quic-go/quic-go" -// "github.com/stretchr/testify/assert" -// "github.com/stretchr/testify/require" -// ) - -// // quic.Stream doesn't implement net.Conn, so we need to wrap it. -// type quicConn struct { -// quic.Stream -// } - -// func (c *quicConn) LocalAddr() net.Addr { -// return nil -// } - -// func (c *quicConn) RemoteAddr() net.Addr { -// return nil -// } - -// // TestExternalConnWithQUIC creates a 3-member cluster using external quic connection -// // and ensures the cluster is successfully created, and that the connection is -// // handled manually. -// func TestExternalConnWithQUIC(t *testing.T) { -// externalAddr1 := "127.0.0.1:9191" -// externalAddr2 := "127.0.0.1:9292" -// externalAddr3 := "127.0.0.1:9393" -// acceptCh1 := make(chan net.Conn) -// acceptCh2 := make(chan net.Conn) -// acceptCh3 := make(chan net.Conn) - -// dialFunc := func(ctx context.Context, addr string) (net.Conn, error) { -// conn, err := quic.DialAddrContext(ctx, addr, &tls.Config{InsecureSkipVerify: true, NextProtos: []string{"quic"}}, nil) -// require.NoError(t, err) - -// stream, err := conn.OpenStreamSync(ctx) -// require.NoError(t, err) - -// return &quicConn{ -// Stream: stream, -// }, nil -// } - -// cert, pool := loadCert(t) -// tlsconfig := app.SimpleListenTLSConfig(cert, pool) -// tlsconfig.NextProtos = []string{"quic"} -// tlsconfig.ClientAuth = tls.NoClientCert - -// serveQUIC := func(addr string, acceptCh chan net.Conn, cleanups chan func()) { -// lis, err := quic.ListenAddr(addr, tlsconfig, nil) -// require.NoError(t, err) - -// ctx, cancel := context.WithCancel(context.Background()) - -// go func() { -// for { -// select { -// case <-ctx.Done(): -// return -// default: -// conn, err := lis.Accept(context.Background()) -// if err != nil { -// return -// } - -// stream, err := conn.AcceptStream(context.Background()) -// if err != nil { -// return -// } - -// acceptCh <- &quicConn{ -// Stream: stream, -// } -// } -// } -// }() - -// cleanup := func() { -// cancel() -// require.NoError(t, lis.Close()) -// } - -// cleanups <- cleanup -// } - -// liscleanups := make(chan func(), 3) -// // Start up three listeners. -// go serveQUIC(externalAddr1, acceptCh1, liscleanups) -// go serveQUIC(externalAddr2, acceptCh2, liscleanups) -// go serveQUIC(externalAddr3, acceptCh3, liscleanups) - -// defer func() { -// for i := 0; i < 3; i++ { -// cleanup := <-liscleanups -// cleanup() -// } -// close(liscleanups) -// }() - -// app1, cleanup := newAppWithNoTLS(t, app.WithAddress(externalAddr1), app.WithExternalConn(dialFunc, acceptCh1)) -// defer cleanup() - -// app2, cleanup := newAppWithNoTLS(t, app.WithAddress(externalAddr2), app.WithExternalConn(dialFunc, acceptCh2), app.WithCluster([]string{externalAddr1})) -// defer cleanup() - -// require.NoError(t, app2.Ready(context.Background())) - -// app3, cleanup := newAppWithNoTLS(t, app.WithAddress(externalAddr3), app.WithExternalConn(dialFunc, acceptCh3), app.WithCluster([]string{externalAddr1})) -// defer cleanup() - -// require.NoError(t, app3.Ready(context.Background())) - -// // Get a client from the first node (likely the leader). -// cli, err := app1.Leader(context.Background()) -// require.NoError(t, err) -// defer cli.Close() - -// // Ensure entries exist for each cluster member. -// cluster, err := cli.Cluster(context.Background()) -// require.NoError(t, err) -// assert.Equal(t, externalAddr1, cluster[0].Address) -// assert.Equal(t, externalAddr2, cluster[1].Address) -// assert.Equal(t, externalAddr3, cluster[2].Address) - -// // Every cluster member should be a voter. -// assert.Equal(t, client.Voter, cluster[0].Role) -// assert.Equal(t, client.Voter, cluster[1].Role) -// assert.Equal(t, client.Voter, cluster[2].Role) -// } diff --git a/app/recovery.go b/app/recovery.go new file mode 100644 index 00000000..bc176650 --- /dev/null +++ b/app/recovery.go @@ -0,0 +1,155 @@ +package app + +import ( + "archive/tar" + "bytes" + "context" + "fmt" + "io" + "io/ioutil" + "os" + "path/filepath" + "regexp" + + dqlite "github.com/canonical/go-dqlite" + "github.com/canonical/go-dqlite/client" + "github.com/pkg/errors" +) + +// RecoveryKernel is a bundle of data that supports recovery of an App-managed dqlite cluster. +// +// When using App, the procedure to recover a cluster is as follows: +// +// 0. Make sure all nodes are stopped. +// 1. Call ReadLastEntryInfo on each node to determine which has the most up-to-date log. This is the "template node". +// 2. On the template node, create an empty recovery kernel. +// 3. On the template node, call PrepareRecovery to get a RecoveryKernel. +// 4. Transfer the RecoveryKernel to each remaining node. This can be done, for example, by gob-encoding it. +// 5. On each remaining node, call Propagate. +// 6. Restart the cluster. +type RecoveryKernel struct { + // Opaque contains data that is used during recovery but should not otherwise be inspected. + Opaque []byte + // Cluster is the list of nodes that should be in the cluster after recovery. It should not be mutated directly. + Cluster []dqlite.NodeInfo +} + +// The files we recover are: +// +// - metadata1 and metadata2 +// - closed segments like 0000000000000001-0000000000000002 +// - open segments like open-3 +// - snapshots like snapshot-1-75776-57638163 and their accompanying .meta files +var belongsInKernelPattern = regexp.MustCompile(`\A(metadata[1-2]|[0-9]{16}-[0-9]{16}|open-[0-9]+|snapshot-[0-9]+-[0-9]+-[0-9]+(.meta)?)\z`) + +func PrepareRecovery(dir string, address string, cluster []dqlite.NodeInfo) (*RecoveryKernel, error) { + var me *dqlite.NodeInfo + for _, node := range cluster { + if node.Address == address { + me = &node + break + } + } + if me == nil { + return nil, fmt.Errorf("address %q not in provided cluster list %v", address, cluster) + } + + if err := dqlite.ReconfigureMembershipExt(dir, cluster); err != nil { + return nil, errors.Wrapf(err, "reconfigure membership") + } + store, err := client.NewYamlNodeStore(filepath.Join(dir, storeFile)) + if err != nil { + return nil, errors.Wrapf(err, "create node store") + } + if err := store.Set(context.Background(), cluster); err != nil { + return nil, errors.Wrapf(err, "write node store") + } + if err := fileMarshal(dir, infoFile, me); err != nil { + return nil, errors.Wrapf(err, "write info file") + } + + files, err := ioutil.ReadDir(dir) + if err != nil { + return nil, errors.Wrapf(err, "read %q", dir) + } + var tarball bytes.Buffer + tw := tar.NewWriter(&tarball) + defer tw.Close() + for _, file := range files { + if file.IsDir() || !belongsInKernelPattern.MatchString(file.Name()) { + continue + } + path := filepath.Join(dir, file.Name()) + hdr, err := tar.FileInfoHeader(file, "") + if err != nil { + return nil, errors.Wrapf(err, "file info header for %q", path) + } + if err = tw.WriteHeader(hdr); err != nil { + return nil, errors.Wrapf(err, "write header for %q", path) + } + src, err := os.Open(path) + if err != nil { + return nil, errors.Wrapf(err, "open %q", path) + } + if _, err := io.Copy(tw, src); err != nil { + return nil, errors.Wrapf(err, "read %q", path) + } + } + if err = tw.Flush(); err != nil { + return nil, errors.Wrapf(err, "finish tarball for %q", dir) + } + return &RecoveryKernel{tarball.Bytes(), cluster}, nil +} + +func (kern *RecoveryKernel) Propagate(dir string, address string) error { + var me *dqlite.NodeInfo + for _, node := range kern.Cluster { + if node.Address == address { + me = &node + break + } + } + if me == nil { + return fmt.Errorf("address %q not in provided cluster list %v", address, kern.Cluster) + } + + info, err := os.Stat(dir) + if err != nil { + return errors.Wrapf(err, "stat %q", dir) + } + if err := os.RemoveAll(dir); err != nil { + return errors.Wrapf(err, "remove %q", dir) + } + if err := os.Mkdir(dir, info.Mode()); err != nil { + return errors.Wrapf(err, "create %q", dir) + } + + tr := tar.NewReader(bytes.NewBuffer(kern.Opaque)) + var hdr *tar.Header + for hdr, err = tr.Next(); err == nil; hdr, err = tr.Next() { + path := filepath.Join(dir, hdr.Name) + dest, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY, os.FileMode(hdr.Mode)) + if err != nil { + return errors.Wrapf(err, "create %q", path) + } + if _, err := io.Copy(dest, tr); err != nil { + return errors.Wrapf(err, "write %q", path) + } + } + if err != io.EOF { + return errors.Wrapf(err, "read tarball for %q", dir) + } + + store, err := client.NewYamlNodeStore(filepath.Join(dir, storeFile)) + if err != nil { + return errors.Wrapf(err, "create node store") + } + if err := store.Set(context.Background(), kern.Cluster); err != nil { + return errors.Wrapf(err, "write node store") + } + if err := fileMarshal(dir, infoFile, me); err != nil { + return errors.Wrapf(err, "write info file") + } + + return nil +} diff --git a/app/recovery_test.go b/app/recovery_test.go new file mode 100644 index 00000000..b5acbf51 --- /dev/null +++ b/app/recovery_test.go @@ -0,0 +1,170 @@ +package app_test + +import ( + "bytes" + "context" + "encoding/gob" + "fmt" + "io/ioutil" + "testing" + "time" + + "github.com/canonical/go-dqlite" + "github.com/canonical/go-dqlite/app" + "github.com/canonical/go-dqlite/client" + "github.com/canonical/go-dqlite/logging" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// Run a cluster using App and then recover it. +func TestRecovery(t *testing.T) { + dirs := make([]string, 3) + for i := range dirs { + dir, err := ioutil.TempDir("", "dqlite-test-") + require.NoError(t, err) + dirs[i] = dir + } + + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + apps := make([]*app.App, 3) + addrs := []string{"@1", "@2", "@3"} + for i := range apps { + app, err := app.New(dirs[i], app.WithAddress(addrs[i]), app.WithCluster(addrs[:i]), app.WithLogFunc(makeLogFunc(t, i))) + require.NoError(t, err) + err = app.Ready(ctx) + require.NoError(t, err) + apps[i] = app + } + + // run a transaction + db, err := apps[0].Open(ctx, "test.db") + require.NoError(t, err) + _, err = db.ExecContext(ctx, `CREATE TABLE foo (n INTEGER); INSERT INTO foo VALUES (42)`) + require.NoError(t, err) + assert.NoError(t, err) + + // stop the third node so it won't receive data + err = apps[2].Close() + require.NoError(t, err) + + // another transaction + db, err = apps[0].Open(ctx, "test.db") + require.NoError(t, err) + _, err = db.ExecContext(ctx, `INSERT INTO foo VALUES (17)`) + require.NoError(t, err) + err = db.Close() + require.NoError(t, err) + + // close the leader + err = apps[0].Close() + require.NoError(t, err) + + // can't commit a change + shortCtx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + db, err = apps[1].Open(shortCtx, "test.db") + require.Error(t, err) + + // take down the final node for recovery + err = apps[1].Close() + require.NoError(t, err) + + infos := make([]dqlite.LastEntryInfo, 3) + for i, dir := range dirs { + info, err := dqlite.ReadLastEntryInfo(dir) + require.NoError(t, err) + infos[i] = info + } + require.Equal(t, dqlite.LastEntryInfo{Term: 1, Index: 8}, infos[0]) + require.Equal(t, dqlite.LastEntryInfo{Term: 1, Index: 8}, infos[1]) + require.Equal(t, dqlite.LastEntryInfo{Term: 1, Index: 7}, infos[2]) + require.True(t, infos[2].Before(infos[1])) + + cluster := []client.NodeInfo{ + {Address: "@2", ID: dqlite.GenerateID("@2"), Role: client.Voter}, + {Address: "@3", ID: dqlite.GenerateID("@3"), Role: client.Voter}, + } + kern, err := app.PrepareRecovery(dirs[1], addrs[1], cluster) + require.NoError(t, err) + + var buf bytes.Buffer + enc := gob.NewEncoder(&buf) + err = enc.Encode(kern) + require.NoError(t, err) + decodedKern := &app.RecoveryKernel{} + dec := gob.NewDecoder(&buf) + err = dec.Decode(decodedKern) + require.NoError(t, err) + require.Equal(t, kern, decodedKern) + + err = kern.Propagate(dirs[2], addrs[2]) + require.NoError(t, err) + + for i := 1; i < 3; i++ { + // No app.WithCluster this time since recovery has written the node store. + app, err := app.New(dirs[i], app.WithAddress(addrs[i]), app.WithLogFunc(makeLogFunc(t, i))) + require.NoError(t, err) + apps[i] = app + // Don't call app.Ready yet, since it's not possible to have a leader after + // only app[1] has been started. + } + for i := 1; i < 3; i++ { + err = apps[i].Ready(ctx) + require.NoError(t, err) + } + + // Only two nodes in the configuration. + cli, err := apps[1].Client(ctx) + require.NoError(t, err) + foundCluster, err := cli.Cluster(ctx) + require.NoError(t, err) + require.ElementsMatch(t, cluster, foundCluster) + + // The data from before is still there. + db, err = apps[1].Open(ctx, "test.db") + require.NoError(t, err) + rows, err := db.QueryContext(ctx, `SELECT n FROM foo`) + require.NoError(t, err) + defer rows.Close() + var values []int + for rows.Next() { + var n int + err = rows.Scan(&n) + require.NoError(t, err) + values = append(values, n) + } + require.NoError(t, rows.Err()) + assert.ElementsMatch(t, []int{42, 17}, values) + + // We can commmit a new transaction. + _, err = db.ExecContext(ctx, `INSERT INTO foo VALUES (99)`) + require.NoError(t, err) + + err = apps[2].Close() + require.NoError(t, err) + err = apps[1].Close() + require.NoError(t, err) + + for i := 1; i < 3; i++ { + info, err := dqlite.ReadLastEntryInfo(dirs[i]) + require.NoError(t, err) + infos[i] = info + } + // Term 2 started with the election of a new leader after recovery. + // Beyond the previous highest index we have three new entries: the + // new configuration forced by recovery, a barrier run by the leader + // to discover the commit index before querying, and the final transaction. + require.Equal(t, infos[1:], []dqlite.LastEntryInfo{ + {Term: 2, Index: 11}, + {Term: 2, Index: 11}, + }) +} + +func makeLogFunc(t *testing.T, index int) logging.Func { + return func(l client.LogLevel, format string, a ...interface{}) { + format = fmt.Sprintf("%s - %d: %s: %s\n", time.Now().Format("15:04:01.000"), index, l.String(), format) + t.Logf(format, a...) + } +} From 706bdc63571a3c0ca8837cfd457de0a84d0865f0 Mon Sep 17 00:00:00 2001 From: Cole Miller Date: Tue, 15 Oct 2024 12:56:35 -0400 Subject: [PATCH 05/11] Check for bad node IDs, avoid loop trap Signed-off-by: Cole Miller --- app/recovery.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/app/recovery.go b/app/recovery.go index bc176650..ee6ff9e0 100644 --- a/app/recovery.go +++ b/app/recovery.go @@ -44,10 +44,12 @@ var belongsInKernelPattern = regexp.MustCompile(`\A(metadata[1-2]|[0-9]{16}-[0-9 func PrepareRecovery(dir string, address string, cluster []dqlite.NodeInfo) (*RecoveryKernel, error) { var me *dqlite.NodeInfo - for _, node := range cluster { - if node.Address == address { + for i := range cluster { + node := cluster[i] + if node.ID == 0 { + return nil, fmt.Errorf("node ID may not be zero") + } else if node.Address == address { me = &node - break } } if me == nil { From 606ec0615a3132893f52f114680b791f9c991fc0 Mon Sep 17 00:00:00 2001 From: Cole Miller Date: Tue, 15 Oct 2024 13:29:42 -0400 Subject: [PATCH 06/11] Module fixes Signed-off-by: Cole Miller --- go.mod | 3 ++- go.sum | 4 ++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/go.mod b/go.mod index c089d851..2d72c073 100644 --- a/go.mod +++ b/go.mod @@ -15,7 +15,8 @@ require ( github.com/spf13/cobra v1.8.1 github.com/stretchr/testify v1.7.0 golang.org/x/sync v0.8.0 - golang.org/x/sys v0.0.0-20211117180635-dee7805ff2e1 + golang.org/x/sys v0.26.0 + golang.org/x/term v0.25.0 // indirect gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect gopkg.in/yaml.v2 v2.4.0 ) diff --git a/go.sum b/go.sum index 6a668b3d..25f099db 100644 --- a/go.sum +++ b/go.sum @@ -38,6 +38,10 @@ golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20211117180635-dee7805ff2e1 h1:kwrAHlwJ0DUBZwQ238v+Uod/3eZ8B2K5rYsUHBQvzmI= golang.org/x/sys v0.0.0-20211117180635-dee7805ff2e1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo= +golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/term v0.25.0 h1:WtHI/ltw4NvSUig5KARz9h521QvRC8RmF/cuYqifU24= +golang.org/x/term v0.25.0/go.mod h1:RPyXicDX+6vLxogjjRxjgD2TKtmAO6NZBsBRfrOLu7M= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= From cbd0d21849745fcef97cca7e415b00f4ca244b7c Mon Sep 17 00:00:00 2001 From: Cole Miller Date: Tue, 15 Oct 2024 13:33:01 -0400 Subject: [PATCH 07/11] Back out sys update again Signed-off-by: Cole Miller --- go.mod | 2 +- go.sum | 2 -- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 2d72c073..a4449162 100644 --- a/go.mod +++ b/go.mod @@ -15,7 +15,7 @@ require ( github.com/spf13/cobra v1.8.1 github.com/stretchr/testify v1.7.0 golang.org/x/sync v0.8.0 - golang.org/x/sys v0.26.0 + golang.org/x/sys v0.0.0-20211117180635-dee7805ff2e1 golang.org/x/term v0.25.0 // indirect gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect gopkg.in/yaml.v2 v2.4.0 diff --git a/go.sum b/go.sum index 25f099db..37063d43 100644 --- a/go.sum +++ b/go.sum @@ -38,8 +38,6 @@ golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20211117180635-dee7805ff2e1 h1:kwrAHlwJ0DUBZwQ238v+Uod/3eZ8B2K5rYsUHBQvzmI= golang.org/x/sys v0.0.0-20211117180635-dee7805ff2e1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo= -golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.25.0 h1:WtHI/ltw4NvSUig5KARz9h521QvRC8RmF/cuYqifU24= golang.org/x/term v0.25.0/go.mod h1:RPyXicDX+6vLxogjjRxjgD2TKtmAO6NZBsBRfrOLu7M= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= From f1a89a113cc548ae930e058b095a36a6b22fce4f Mon Sep 17 00:00:00 2001 From: Cole Miller Date: Tue, 15 Oct 2024 13:45:49 -0400 Subject: [PATCH 08/11] Try to fix it again Signed-off-by: Cole Miller --- go.mod | 2 +- go.sum | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index a4449162..c2dbeb5b 100644 --- a/go.mod +++ b/go.mod @@ -16,7 +16,7 @@ require ( github.com/stretchr/testify v1.7.0 golang.org/x/sync v0.8.0 golang.org/x/sys v0.0.0-20211117180635-dee7805ff2e1 - golang.org/x/term v0.25.0 // indirect + golang.org/x/term v0.1.0 gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect gopkg.in/yaml.v2 v2.4.0 ) diff --git a/go.sum b/go.sum index 37063d43..4b5aa349 100644 --- a/go.sum +++ b/go.sum @@ -36,10 +36,11 @@ github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5Cc github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211117180635-dee7805ff2e1 h1:kwrAHlwJ0DUBZwQ238v+Uod/3eZ8B2K5rYsUHBQvzmI= golang.org/x/sys v0.0.0-20211117180635-dee7805ff2e1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/term v0.25.0 h1:WtHI/ltw4NvSUig5KARz9h521QvRC8RmF/cuYqifU24= -golang.org/x/term v0.25.0/go.mod h1:RPyXicDX+6vLxogjjRxjgD2TKtmAO6NZBsBRfrOLu7M= +golang.org/x/term v0.1.0 h1:g6Z6vPFA9dYBAF7DWcH6sCcOntplXsDKcliusYijMlw= +golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= From 355c5010397b8236852ca45b030db5546a868a09 Mon Sep 17 00:00:00 2001 From: Cole Miller Date: Tue, 15 Oct 2024 14:01:24 -0400 Subject: [PATCH 09/11] Fix recover cmd Signed-off-by: Cole Miller --- cmd/dqlite-recover/dqlite-recover.go | 23 ++++++++++++++++++++--- 1 file changed, 20 insertions(+), 3 deletions(-) diff --git a/cmd/dqlite-recover/dqlite-recover.go b/cmd/dqlite-recover/dqlite-recover.go index 336b05cc..69cd6d86 100644 --- a/cmd/dqlite-recover/dqlite-recover.go +++ b/cmd/dqlite-recover/dqlite-recover.go @@ -1,6 +1,7 @@ package main import ( + "context" "encoding/gob" "flag" "fmt" @@ -10,6 +11,7 @@ import ( "os" "github.com/canonical/go-dqlite/app" + "github.com/canonical/go-dqlite/client" "golang.org/x/term" ) @@ -17,17 +19,32 @@ func main() { dir := flag.String("dir", "", "local data directory") address := flag.String("address", "", "new address of this node") mode := flag.String("mode", "", "'prepare' or 'propagate'") + clusterPath := flag.String("cluster", "", "path to node store describing the new configuration (prepare mode only)") flag.Parse() if *address == "" { - log.Fatal("address is required") + log.Fatal("-address is required") } if *dir == "" { - log.Fatal("dir is required") + log.Fatal("-dir is required") + } + if *mode == "prepare" && *clusterPath == "" { + log.Fatal("-cluster is required for prepare mode") + } + if *mode == "propagate" && *clusterPath != "" { + log.Fatal("-cluster is forbidden for propagate mode") } switch *mode { case "prepare": - kern, err := app.PrepareRecovery(*dir, *address) + store, err := client.DefaultNodeStore(*clusterPath) + if err != nil { + log.Fatal(err) + } + cluster, err := store.Get(context.Background()) + if err != nil { + log.Fatal(err) + } + kern, err := app.PrepareRecovery(*dir, *address, cluster) if err != nil { log.Fatal(err) } From cd790e53e1488c9cf34ee8a7263407922c2f45c8 Mon Sep 17 00:00:00 2001 From: Cole Miller Date: Tue, 15 Oct 2024 15:53:52 -0400 Subject: [PATCH 10/11] Fix staticcheck Signed-off-by: Cole Miller --- app/recovery_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/recovery_test.go b/app/recovery_test.go index b5acbf51..8f1cb4bf 100644 --- a/app/recovery_test.go +++ b/app/recovery_test.go @@ -64,7 +64,7 @@ func TestRecovery(t *testing.T) { // can't commit a change shortCtx, cancel := context.WithTimeout(context.Background(), 1*time.Second) defer cancel() - db, err = apps[1].Open(shortCtx, "test.db") + _, err = apps[1].Open(shortCtx, "test.db") require.Error(t, err) // take down the final node for recovery From 97822075e61a2628c6bf96905a0e265bd56e1a98 Mon Sep 17 00:00:00 2001 From: Cole Miller Date: Tue, 15 Oct 2024 19:11:48 -0400 Subject: [PATCH 11/11] Drive-by shell refactor Signed-off-by: Cole Miller --- internal/shell/shell.go | 52 +++++++++++++++++------------------------ 1 file changed, 22 insertions(+), 30 deletions(-) diff --git a/internal/shell/shell.go b/internal/shell/shell.go index 0ef58c0f..fb1d7e86 100644 --- a/internal/shell/shell.go +++ b/internal/shell/shell.go @@ -65,28 +65,25 @@ func New(database string, store client.NodeStore, options ...Option) (*Shell, er // Process a single input line. func (s *Shell) Process(ctx context.Context, line string) (string, error) { - switch line { + parts := strings.Split(strings.TrimLeft(line, " "), " ") + cmd := parts[0] + switch cmd { case ".cluster": - return s.processCluster(ctx, line) + return s.processCluster(ctx) case ".leader": - return s.processLeader(ctx, line) + return s.processLeader(ctx) case ".help": return s.processHelp(), nil - } - if strings.HasPrefix(strings.ToLower(strings.TrimLeft(line, " ")), ".remove") { - return s.processRemove(ctx, line) - } - if strings.HasPrefix(strings.ToLower(strings.TrimLeft(line, " ")), ".describe") { - return s.processDescribe(ctx, line) - } - if strings.HasPrefix(strings.ToLower(strings.TrimLeft(line, " ")), ".weight") { - return s.processWeight(ctx, line) - } - if strings.HasPrefix(strings.ToLower(strings.TrimLeft(line, " ")), ".dump") { - return s.processDump(ctx, line) - } - if strings.HasPrefix(strings.ToLower(strings.TrimLeft(line, " ")), ".reconfigure") { - return s.processReconfigure(ctx, line) + case ".remove": + return s.processRemove(ctx, parts) + case ".describe": + return s.processDescribe(ctx, parts) + case ".weight": + return s.processWeight(ctx, parts) + case ".dump": + return s.processDump(ctx, parts) + case ".reconfigure": + return s.processReconfigure(ctx, parts) } return s.processQuery(ctx, line) } @@ -106,7 +103,7 @@ Enter a SQL statement to execute it, or one of the following built-in commands: `[1:] } -func (s *Shell) processCluster(ctx context.Context, line string) (string, error) { +func (s *Shell) processCluster(ctx context.Context) (string, error) { cli, err := client.FindLeader(ctx, s.store, client.WithDialFunc(s.dial)) if err != nil { return "", err @@ -137,7 +134,7 @@ func (s *Shell) processCluster(ctx context.Context, line string) (string, error) return result, nil } -func (s *Shell) processLeader(ctx context.Context, line string) (string, error) { +func (s *Shell) processLeader(ctx context.Context) (string, error) { cli, err := client.FindLeader(ctx, s.store, client.WithDialFunc(s.dial)) if err != nil { return "", err @@ -152,8 +149,7 @@ func (s *Shell) processLeader(ctx context.Context, line string) (string, error) return leader.Address, nil } -func (s *Shell) processRemove(ctx context.Context, line string) (string, error) { - parts := strings.Split(line, " ") +func (s *Shell) processRemove(ctx context.Context, parts []string) (string, error) { if len(parts) != 2 { return "", fmt.Errorf("bad command format, should be: .remove
") } @@ -179,8 +175,7 @@ func (s *Shell) processRemove(ctx context.Context, line string) (string, error) return "", fmt.Errorf("no node has address %q", address) } -func (s *Shell) processDescribe(ctx context.Context, line string) (string, error) { - parts := strings.Split(line, " ") +func (s *Shell) processDescribe(ctx context.Context, parts []string) (string, error) { if len(parts) != 2 { return "", fmt.Errorf("bad command format, should be: .describe
") } @@ -211,8 +206,7 @@ func (s *Shell) processDescribe(ctx context.Context, line string) (string, error return result, nil } -func (s *Shell) processDump(ctx context.Context, line string) (string, error) { - parts := strings.Split(line, " ") +func (s *Shell) processDump(ctx context.Context, parts []string) (string, error) { if len(parts) < 2 || len(parts) > 3 { return "NOK", fmt.Errorf("bad command format, should be: .dump
[]") } @@ -247,8 +241,7 @@ func (s *Shell) processDump(ctx context.Context, line string) (string, error) { return "OK", nil } -func (s *Shell) processReconfigure(ctx context.Context, line string) (string, error) { - parts := strings.Split(line, " ") +func (s *Shell) processReconfigure(ctx context.Context, parts []string) (string, error) { if len(parts) != 3 { //lint:ignore ST1005 intentional long prosy error message return "NOK", fmt.Errorf("bad command format, should be: .reconfigure \n" + @@ -293,8 +286,7 @@ func (s *Shell) processReconfigure(ctx context.Context, line string) (string, er return "OK", nil } -func (s *Shell) processWeight(ctx context.Context, line string) (string, error) { - parts := strings.Split(line, " ") +func (s *Shell) processWeight(ctx context.Context, parts []string) (string, error) { if len(parts) != 3 { return "", fmt.Errorf("bad command format, should be: .weight
") }