From 9e4f62ed5e63b29b02897b898e9b20e0cf104526 Mon Sep 17 00:00:00 2001 From: Minoru Osuka Date: Tue, 17 Mar 2020 14:05:21 +0900 Subject: [PATCH] Refactoring --- Makefile | 19 +++++--- README.md | 26 ++++++++--- cmd/cete/delete.go | 2 +- cmd/cete/get.go | 2 +- cmd/cete/main.go | 19 ++------ cmd/cete/set.go | 4 +- kvs/grpc_server.go | 17 +++---- kvs/grpc_service.go | 22 ++++----- kvs/http_handler.go | 2 +- kvs/http_server.go | 22 +++++---- kvs/kvs.go | 65 ++++++++++++--------------- kvs/raft_fsm.go | 50 ++++++++++----------- kvs/raft_server.go | 107 +++++++++++++++++++++----------------------- kvs/server.go | 31 +++++-------- log/logger.go | 2 +- 15 files changed, 189 insertions(+), 201 deletions(-) diff --git a/Makefile b/Makefile index c2f03af..77970ba 100644 --- a/Makefile +++ b/Makefile @@ -12,17 +12,16 @@ # See the License for the specific language governing permissions and # limitations under the License. -GOOS ?= linux -GOARCH ?= amd64 +GOOS ?= +GOARCH ?= +GO111MODULE ?= on CGO_ENABLED ?= 0 CGO_CFLAGS ?= CGO_LDFLAGS ?= BUILD_TAGS ?= -DOCKER_REPOSITORY ?= mosuka VERSION ?= BIN_EXT ?= - -GO := GOOS=$(GOOS) GOARCH=$(GOARCH) CGO_ENABLED=$(CGO_ENABLED) CGO_CFLAGS=$(CGO_CFLAGS) CGO_LDFLAGS=$(CGO_LDFLAGS) GO111MODULE=on go +DOCKER_REPOSITORY ?= mosuka PACKAGES = $(shell $(GO) list ./... | grep -v '/vendor/') @@ -30,6 +29,14 @@ PROTOBUFS = $(shell find . -name '*.proto' -print0 | xargs -0 -n1 dirname | sort TARGET_PACKAGES = $(shell find . -name 'main.go' -print0 | xargs -0 -n1 dirname | sort | uniq | grep -v /vendor/) +ifeq ($(GOOS),) + GOOS = $(shell go version | awk -F ' ' '{print $$NF}' | awk -F '/' '{print $$1}') +endif + +ifeq ($(GOARCH),) + GOARCH = $(shell go version | awk -F ' ' '{print $$NF}' | awk -F '/' '{print $$2}') +endif + ifeq ($(VERSION),) VERSION = latest endif @@ -39,6 +46,8 @@ ifeq ($(GOOS),windows) BIN_EXT = .exe endif +GO := GOOS=$(GOOS) GOARCH=$(GOARCH) CGO_ENABLED=$(CGO_ENABLED) CGO_CFLAGS=$(CGO_CFLAGS) CGO_LDFLAGS=$(CGO_LDFLAGS) GO111MODULE=$(GO111MODULE) go + .DEFAULT_GOAL := build .PHONY: protoc diff --git a/README.md b/README.md index 7792327..921ae16 100644 --- a/README.md +++ b/README.md @@ -97,11 +97,27 @@ $ make GOOS=darwin dist Starting cete is easy as follows: ```bash -$ ./bin/cete start --node-id=node1 --data-dir=/tmp/cete/node1 --bind-addr=:7000 --grpc-addr=:9000 --http-addr=:8000 +$ ./bin/cete start --id=node1 --bind-addr=:7000 --grpc-addr=:9000 --http-addr=:8000 --data-dir=/tmp/cete/node1 ``` -You can now set, get and delete data via CLI. +You can get node info as follows: +```bash +$ ./bin/cete node --grpc-addr=:9000 +``` + +The result of the above command is: + +```json +{ + "node": { + "bind_addr": ":7000", + "grpc_addr": ":9000", + "http_addr": ":8000", + "state": "Leader" + } +} +``` ### Setting a value by key via CLI @@ -173,14 +189,14 @@ $ curl -X DELETE 'http://127.0.0.1:8000/store/key1' Cete is easy to bring up the cluster. Cete node is already running, but that is not fault tolerant. If you need to increase the fault tolerance, bring up 2 more data nodes like so: ```bash -$ ./bin/cete start --node-id=node2 --data-dir=/tmp/cete/node2 --bind-addr=:7001 --grpc-addr=:9001 --http-addr=:8001 --join-addr=:9000 -$ ./bin/cete start --node-id=node3 --data-dir=/tmp/cete/node3 --bind-addr=:7002 --grpc-addr=:9002 --http-addr=:8002 --join-addr=:9000 +$ ./bin/cete start --id=node2 --bind-addr=:7001 --grpc-addr=:9001 --http-addr=:8001 --data-dir=/tmp/cete/node2 --peer-grpc-addr=:9000 +$ ./bin/cete start --id=node3 --bind-addr=:7002 --grpc-addr=:9002 --http-addr=:8002 --data-dir=/tmp/cete/node3 --peer-grpc-addr=:9000 ``` _Above example shows each Cete node running on the same host, so each node must listen on different ports. This would not be necessary if each node ran on a different host._ This instructs each new node to join an existing node, each node recognizes the joining clusters when started. -So you have a 3-node cluster. That way you can tolerate the failure of 1 node. You can check the peers with the following command: +So you have a 3-node cluster. That way you can tolerate the failure of 1 node. You can check the cluster with the following command: ```bash $ ./bin/cete cluster --grpc-addr=:9000 diff --git a/cmd/cete/delete.go b/cmd/cete/delete.go index ce7bd08..5dd804e 100644 --- a/cmd/cete/delete.go +++ b/cmd/cete/delete.go @@ -27,7 +27,7 @@ import ( func execDelete(c *cli.Context) error { grpcAddr := c.String("grpc-addr") - key := c.String("key") + key := c.Args().Get(0) if key == "" { err := errors.New("key argument must be set") return err diff --git a/cmd/cete/get.go b/cmd/cete/get.go index 3726037..9aefc65 100644 --- a/cmd/cete/get.go +++ b/cmd/cete/get.go @@ -27,7 +27,7 @@ import ( func execGet(c *cli.Context) error { grpcAddr := c.String("grpc-addr") - key := c.String("key") + key := c.Args().Get(0) if key == "" { err := errors.New("key argument must be set") return err diff --git a/cmd/cete/main.go b/cmd/cete/main.go index af8c0b7..2f1df8d 100644 --- a/cmd/cete/main.go +++ b/cmd/cete/main.go @@ -172,12 +172,8 @@ func main() { Value: ":9000", Usage: "gRPC address to connect to", }, - cli.StringFlag{ - Name: "key, k", - Value: "", - Usage: "key", - }, }, + ArgsUsage: "[key]", Action: execGet, }, { @@ -189,13 +185,8 @@ func main() { Value: ":9000", Usage: "gRPC address to connect to", }, - cli.StringFlag{ - Name: "key, k", - Value: "", - Usage: "key", - }, }, - ArgsUsage: "[value]", + ArgsUsage: "[key] [value]", Action: execSet, }, { @@ -207,12 +198,8 @@ func main() { Value: ":9000", Usage: "gRPC address to connect to", }, - cli.StringFlag{ - Name: "key, k", - Value: "", - Usage: "key", - }, }, + ArgsUsage: "[key]", Action: execDelete, }, { diff --git a/cmd/cete/set.go b/cmd/cete/set.go index 73ca6b4..2e3c945 100644 --- a/cmd/cete/set.go +++ b/cmd/cete/set.go @@ -27,13 +27,13 @@ import ( func execSet(c *cli.Context) error { grpcAddr := c.String("grpc-addr") - key := c.String("key") + key := c.Args().Get(0) if key == "" { err := errors.New("key argument must be set") return err } - value := c.Args().Get(0) + value := c.Args().Get(1) if value == "" { err := errors.New("value argument must be set") return err diff --git a/kvs/grpc_server.go b/kvs/grpc_server.go index ba3ace1..1ef0d07 100644 --- a/kvs/grpc_server.go +++ b/kvs/grpc_server.go @@ -27,13 +27,13 @@ import ( ) type GRPCServer struct { + grpcAddr string server *grpc.Server listener net.Listener logger *zap.Logger } -//func NewGRPCServer(grpcAddr string, raftService raftgrpc.RaftServiceServer, kvsService pbkvs.KVSServer, logger *log.Logger) (*GRPCServer, error) { func NewGRPCServer(grpcAddr string, kvsService pbkvs.KVSServer, logger *zap.Logger) (*GRPCServer, error) { grpcLogger := logger.Named("grpc") server := grpc.NewServer( @@ -53,7 +53,6 @@ func NewGRPCServer(grpcAddr string, kvsService pbkvs.KVSServer, logger *zap.Logg ), ) - //raftgrpc.RegisterRaftServiceServer(server, raftService) pbkvs.RegisterKVSServer(server, kvsService) listener, err := net.Listen("tcp", grpcAddr) @@ -63,6 +62,7 @@ func NewGRPCServer(grpcAddr string, kvsService pbkvs.KVSServer, logger *zap.Logg } return &GRPCServer{ + grpcAddr: grpcAddr, server: server, listener: listener, logger: logger, @@ -70,19 +70,16 @@ func NewGRPCServer(grpcAddr string, kvsService pbkvs.KVSServer, logger *zap.Logg } func (s *GRPCServer) Start() error { - err := s.server.Serve(s.listener) - if err != nil { - s.logger.Error("failed to start server", zap.String("addr", s.listener.Addr().String()), zap.Error(err)) - return err - } + go s.server.Serve(s.listener) + s.logger.Info("gRPC server started", zap.String("addr", s.grpcAddr)) return nil } func (s *GRPCServer) Stop() error { - //s.server.GracefulStop() - s.server.Stop() - s.logger.Info("server stopped") + s.server.GracefulStop() + //s.server.Stop() + s.logger.Info("gRPC server stopped", zap.String("addr", s.grpcAddr)) return nil } diff --git a/kvs/grpc_service.go b/kvs/grpc_service.go index 14b3e7a..87e3a6b 100644 --- a/kvs/grpc_service.go +++ b/kvs/grpc_service.go @@ -73,7 +73,7 @@ func (s *GRPCService) Join(ctx context.Context, req *pbkvs.JoinRequest) (*empty. err = client.Join(req) if err != nil { - s.logger.Error("failed to join node to the cluster", zap.Any("req", req), zap.Error(err)) + s.logger.Error("failed to forward request", zap.String("leaderAddr", string(leaderAddr)), zap.Error(err)) return resp, status.Error(codes.Internal, err.Error()) } @@ -82,14 +82,14 @@ func (s *GRPCService) Join(ctx context.Context, req *pbkvs.JoinRequest) (*empty. err := s.raftServer.Join(req) if err != nil { - s.logger.Error("failed to join node to the cluster", zap.Any("req", req), zap.Error(err)) + s.logger.Error("failed to join node to the cluster", zap.String("id", req.Id), zap.Error(err)) return resp, status.Error(codes.Internal, err.Error()) } // notify joinReqAny := &any.Any{} if err := protobuf.UnmarshalAny(req, joinReqAny); err != nil { - s.logger.Error("failed to unmarshal request to the watch data", zap.Any("req", req), zap.String("err", err.Error())) + s.logger.Error("failed to unmarshal request to the watch data", zap.String("id", req.Id), zap.String("err", err.Error())) } else { watchResp := &pbkvs.WatchResponse{ Event: pbkvs.WatchResponse_JOIN, @@ -129,7 +129,7 @@ func (s *GRPCService) Leave(ctx context.Context, req *pbkvs.LeaveRequest) (*empt err = client.Leave(req) if err != nil { - s.logger.Error("failed to leave node from the cluster", zap.Any("req", req), zap.Error(err)) + s.logger.Error("failed to forward request", zap.String("leaderAddr", string(leaderAddr)), zap.Error(err)) return resp, status.Error(codes.Internal, err.Error()) } @@ -208,10 +208,10 @@ func (s *GRPCService) Get(ctx context.Context, req *pbkvs.GetRequest) (*pbkvs.Ge if err != nil { switch err { case errors.ErrNotFound: - s.logger.Debug("key not found", zap.Any("req", req), zap.String("err", err.Error())) + s.logger.Debug("key not found", zap.Binary("key", req.Key), zap.String("err", err.Error())) return resp, status.Error(codes.NotFound, err.Error()) default: - s.logger.Debug("failed to get data", zap.Any("req", req), zap.String("err", err.Error())) + s.logger.Debug("failed to get data", zap.Binary("key", req.Key), zap.String("err", err.Error())) return resp, status.Error(codes.Internal, err.Error()) } } @@ -245,7 +245,7 @@ func (s *GRPCService) Put(ctx context.Context, req *pbkvs.PutRequest) (*empty.Em err = client.Put(req) if err != nil { - s.logger.Error("failed to put data", zap.Any("req", req), zap.Error(err)) + s.logger.Error("failed to forward request", zap.String("leaderAddr", string(leaderAddr)), zap.Error(err)) return resp, status.Error(codes.Internal, err.Error()) } @@ -302,7 +302,7 @@ func (s *GRPCService) Delete(ctx context.Context, req *pbkvs.DeleteRequest) (*em err = client.Delete(req) if err != nil { - s.logger.Error("failed to delete data", zap.Any("req", req), zap.Error(err)) + s.logger.Error("failed to forward request", zap.String("leaderAddr", string(leaderAddr)), zap.Error(err)) return resp, status.Error(codes.Internal, err.Error()) } @@ -312,14 +312,14 @@ func (s *GRPCService) Delete(ctx context.Context, req *pbkvs.DeleteRequest) (*em // delete value by key err := s.raftServer.Delete(req) if err != nil { - s.logger.Error("failed to delete data", zap.Any("req", req), zap.Error(err)) + s.logger.Error("failed to delete data", zap.Binary("key", req.Key), zap.Error(err)) return resp, status.Error(codes.Internal, err.Error()) } // notify deleteReqAny := &any.Any{} if err := protobuf.UnmarshalAny(req, deleteReqAny); err != nil { - s.logger.Error("failed to unmarshal request to the watch data", zap.Any("req", req), zap.String("err", err.Error())) + s.logger.Error("failed to unmarshal request to the watch data", zap.Binary("key", req.Key), zap.Error(err)) } else { watchResp := &pbkvs.WatchResponse{ Event: pbkvs.WatchResponse_DELETE, @@ -349,7 +349,7 @@ func (s *GRPCService) Watch(req *empty.Empty, server pbkvs.KVS_WatchServer) erro for resp := range chans { if err := server.Send(&resp); err != nil { - s.logger.Error("failed to send watch data", zap.Any("resp", resp), zap.String("err", err.Error())) + s.logger.Error("failed to send watch data", zap.String("event", resp.Event.String()), zap.Error(err)) return status.Error(codes.Internal, err.Error()) } } diff --git a/kvs/http_handler.go b/kvs/http_handler.go index 0e396ad..ed3e682 100644 --- a/kvs/http_handler.go +++ b/kvs/http_handler.go @@ -137,7 +137,7 @@ func (h *PutHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { }, ) httpStatus = http.StatusInternalServerError - h.logger.Error("failed to put data", zap.Error(err)) + h.logger.Error("failed to set data", zap.Error(err)) } else { httpStatus = http.StatusOK } diff --git a/kvs/http_server.go b/kvs/http_server.go index 3c50fe3..b7aba1d 100644 --- a/kvs/http_server.go +++ b/kvs/http_server.go @@ -26,6 +26,7 @@ import ( ) type HTTPServer struct { + httpAddr string listener net.Listener router *mux.Router @@ -57,17 +58,20 @@ func NewHTTPServer(httpAddr string, grpcAddr string, logger *zap.Logger) (*HTTPS router.Handle("/store/{path:.*}", NewDeleteHandler(grpcClient, logger)).Methods("DELETE") router.Handle("/metrics", promhttp.Handler()).Methods("GET") + httpLogger := logger.Named("http") + return &HTTPServer{ + httpAddr: httpAddr, listener: listener, router: router, grpcClient: grpcClient, logger: logger, - httpLogger: logger.Named("http"), + httpLogger: httpLogger, }, nil } func (s *HTTPServer) Start() error { - err := http.Serve( + go http.Serve( s.listener, accesslog.NewLoggingHandler( s.router, @@ -76,26 +80,20 @@ func (s *HTTPServer) Start() error { }, ), ) - if err != nil { - s.logger.Error("failed to start listener", zap.String("addr", s.listener.Addr().String()), zap.Error(err)) - return err - } + s.logger.Info("HTTP server started", zap.String("addr", s.httpAddr)) return nil } func (s *HTTPServer) Stop() error { - err := s.listener.Close() - if err != nil { + if err := s.listener.Close(); err != nil { s.logger.Error("failed to close listener", zap.String("addr", s.listener.Addr().String()), zap.Error(err)) - return err } - err = s.grpcClient.Close() - if err != nil { + if err := s.grpcClient.Close(); err != nil { s.logger.Error("failed to close gRPC client", zap.String("addr", s.grpcClient.conn.Target()), zap.Error(err)) - return err } + s.logger.Info("HTTP server stopped", zap.String("addr", s.httpAddr)) return nil } diff --git a/kvs/kvs.go b/kvs/kvs.go index 60732c6..90435b9 100644 --- a/kvs/kvs.go +++ b/kvs/kvs.go @@ -15,12 +15,12 @@ package kvs import ( - "go.uber.org/zap" "time" "github.com/dgraph-io/badger" ceteerrors "github.com/mosuka/cete/errors" pbkvs "github.com/mosuka/cete/protobuf/kvs" + "go.uber.org/zap" ) type KVS struct { @@ -38,7 +38,7 @@ func NewKVS(dir string, valueDir string, logger *zap.Logger) (*KVS, error) { db, err := badger.Open(opts) if err != nil { - logger.Error("failed to open database", zap.Any("opts", opts), zap.String("err", err.Error())) + logger.Error("failed to open database", zap.Any("opts", opts), zap.Error(err)) return nil, err } @@ -51,9 +51,8 @@ func NewKVS(dir string, valueDir string, logger *zap.Logger) (*KVS, error) { } func (b *KVS) Close() error { - err := b.db.Close() - if err != nil { - b.logger.Error("failed to close database", zap.String("err", err.Error())) + if err := b.db.Close(); err != nil { + b.logger.Error("failed to close database", zap.Error(err)) return err } @@ -64,10 +63,10 @@ func (b *KVS) Get(key []byte) ([]byte, error) { start := time.Now() var value []byte - err := b.db.View(func(txn *badger.Txn) error { + if err := b.db.View(func(txn *badger.Txn) error { item, err := txn.Get(key) if err != nil { - b.logger.Error("failed to get item", zap.Binary("key", key), zap.String("err", err.Error())) + b.logger.Error("failed to get item", zap.Binary("key", key), zap.Error(err)) return err } @@ -76,58 +75,54 @@ func (b *KVS) Get(key []byte) ([]byte, error) { return nil }) if err != nil { - b.logger.Error("failed to get item value", zap.Binary("key", key), zap.String("err", err.Error())) + b.logger.Error("failed to get item value", zap.Binary("key", key), zap.Error(err)) return err } return nil - }) - if err == badger.ErrKeyNotFound { - b.logger.Debug("not found", zap.Binary("key", key), zap.String("err", err.Error())) + }); err == badger.ErrKeyNotFound { + b.logger.Debug("not found", zap.Binary("key", key), zap.Error(err)) return nil, ceteerrors.ErrNotFound - } - if err != nil { - b.logger.Error("failed to get value", zap.Binary("key", key), zap.String("err", err.Error())) + } else if err != nil { + b.logger.Error("failed to get value", zap.Binary("key", key), zap.Error(err)) return nil, err } - b.logger.Debug("get", zap.Binary("key", key), zap.Binary("value", value), zap.Float64("time", float64(time.Since(start))/float64(time.Second))) + b.logger.Debug("get", zap.Binary("key", key), zap.Float64("time", float64(time.Since(start))/float64(time.Second))) return value, nil } func (b *KVS) Set(key []byte, value []byte) error { start := time.Now() - err := b.db.Update(func(txn *badger.Txn) error { + if err := b.db.Update(func(txn *badger.Txn) error { err := txn.Set(key, value) if err != nil { - b.logger.Error("failed to set item", zap.Binary("key", key), zap.Binary("value", value), zap.String("err", err.Error())) + b.logger.Error("failed to set item", zap.Binary("key", key), zap.Error(err)) return err } return nil - }) - if err != nil { - b.logger.Error("failed to set value", zap.Binary("key", key), zap.Binary("value", value), zap.String("err", err.Error())) + }); err != nil { + b.logger.Error("failed to set value", zap.Binary("key", key), zap.Error(err)) return err } - b.logger.Debug("set", zap.Binary("key", key), zap.Binary("value", value), zap.Float64("time", float64(time.Since(start))/float64(time.Second))) + b.logger.Debug("set", zap.Binary("key", key), zap.Float64("time", float64(time.Since(start))/float64(time.Second))) return nil } func (b *KVS) Delete(key []byte) error { start := time.Now() - err := b.db.Update(func(txn *badger.Txn) error { + if err := b.db.Update(func(txn *badger.Txn) error { err := txn.Delete(key) if err != nil { - b.logger.Error("failed to delete item", zap.Binary("key", key), zap.String("err", err.Error())) + b.logger.Error("failed to delete item", zap.Binary("key", key), zap.Error(err)) return err } return nil - }) - if err != nil { - b.logger.Error("failed to delete value", zap.Binary("key", key), zap.String("err", err.Error())) + }); err != nil { + b.logger.Error("failed to delete value", zap.Binary("key", key), zap.Error(err)) return err } @@ -145,7 +140,7 @@ func (b *KVS) SnapshotItems() <-chan *pbkvs.KeyValuePair { keyCount := uint64(0) - err := b.db.View(func(txn *badger.Txn) error { + if err := b.db.View(func(txn *badger.Txn) error { opts := badger.DefaultIteratorOptions opts.PrefetchSize = 10 it := txn.NewIterator(opts) @@ -156,29 +151,25 @@ func (b *KVS) SnapshotItems() <-chan *pbkvs.KeyValuePair { key := item.Key() var value []byte - err := item.Value(func(val []byte) error { + if err := item.Value(func(val []byte) error { value = append([]byte{}, val...) return nil - }) - if err != nil { - b.logger.Error("failed to get item value", zap.Binary("key", key), zap.String("err", err.Error())) + }); err != nil { + b.logger.Error("failed to get item value", zap.Binary("key", key), zap.Error(err)) return err } - kvp := &pbkvs.KeyValuePair{ + ch <- &pbkvs.KeyValuePair{ Key: append([]byte{}, key...), Value: append([]byte{}, value...), } - ch <- kvp keyCount = keyCount + 1 } ch <- nil return nil - }) - - if err != nil { - b.logger.Error("failed to snapshot items", zap.String("err", err.Error())) + }); err != nil { + b.logger.Error("failed to snapshot items", zap.Error(err)) return } diff --git a/kvs/raft_fsm.go b/kvs/raft_fsm.go index 4020c6f..4c242a5 100644 --- a/kvs/raft_fsm.go +++ b/kvs/raft_fsm.go @@ -40,13 +40,13 @@ type RaftFSM struct { func NewRaftFSM(path string, logger *zap.Logger) (*RaftFSM, error) { err := os.MkdirAll(path, 0755) if err != nil && !os.IsExist(err) { - logger.Error("failed to make directories", zap.String("path", path), zap.String("err", err.Error())) + logger.Error("failed to make directories", zap.String("path", path), zap.Error(err)) return nil, err } kvs, err := NewKVS(path, path, logger) if err != nil { - logger.Error("failed to create key value store", zap.String("path", path), zap.String("err", err.Error())) + logger.Error("failed to create key value store", zap.String("path", path), zap.Error(err)) return nil, err } @@ -60,7 +60,7 @@ func NewRaftFSM(path string, logger *zap.Logger) (*RaftFSM, error) { func (f *RaftFSM) Close() error { err := f.kvs.Close() if err != nil { - f.logger.Error("failed to close key value store", zap.String("err", err.Error())) + f.logger.Error("failed to close key value store", zap.Error(err)) return err } @@ -70,7 +70,7 @@ func (f *RaftFSM) Close() error { func (f *RaftFSM) Get(key []byte) ([]byte, error) { value, err := f.kvs.Get(key) if err != nil { - f.logger.Error("failed to get value", zap.Binary("key", key), zap.String("err", err.Error())) + f.logger.Error("failed to get value", zap.Binary("key", key), zap.Error(err)) return nil, err } @@ -80,7 +80,7 @@ func (f *RaftFSM) Get(key []byte) ([]byte, error) { func (f *RaftFSM) applySet(key []byte, value []byte) interface{} { err := f.kvs.Set(key, value) if err != nil { - f.logger.Error("failed to set value", zap.Binary("key", key), zap.Binary("value", value), zap.String("err", err.Error())) + f.logger.Error("failed to set value", zap.Binary("key", key), zap.Error(err)) return err } @@ -90,7 +90,7 @@ func (f *RaftFSM) applySet(key []byte, value []byte) interface{} { func (f *RaftFSM) applyDelete(key []byte) interface{} { err := f.kvs.Delete(key) if err != nil { - f.logger.Error("failed to delete value", zap.Binary("key", key), zap.String("err", err.Error())) + f.logger.Error("failed to delete value", zap.Binary("key", key), zap.Error(err)) return err } @@ -138,7 +138,7 @@ func (f *RaftFSM) Apply(l *raft.Log) interface{} { var c pbkvs.KVSCommand err := proto.Unmarshal(l.Data, &c) if err != nil { - f.logger.Error("failed to unmarshal key value store command", zap.Binary("data", l.Data), zap.String("err", err.Error())) + f.logger.Error("failed to unmarshal message bytes to KVS command", zap.Error(err)) return err } @@ -146,12 +146,12 @@ func (f *RaftFSM) Apply(l *raft.Log) interface{} { case pbkvs.KVSCommand_JOIN: joinRequestInstance, err := protobuf.MarshalAny(c.Data) if err != nil { - f.logger.Error("failed to marshal to request from any", zap.String("type", c.Type.String()), zap.Binary("data", l.Data), zap.String("err", err.Error())) + f.logger.Error("failed to marshal to request from KVS command request", zap.String("type", c.Type.String()), zap.Error(err)) return err } if joinRequestInstance == nil { err = errors.New("nil") - f.logger.Error("request is nil", zap.String("type", c.Type.String()), zap.Binary("data", l.Data), zap.String("err", err.Error())) + f.logger.Error("request is nil", zap.String("type", c.Type.String()), zap.Error(err)) return err } joinRequest := joinRequestInstance.(*pbkvs.JoinRequest) @@ -160,12 +160,12 @@ func (f *RaftFSM) Apply(l *raft.Log) interface{} { case pbkvs.KVSCommand_LEAVE: leaveRequestInstance, err := protobuf.MarshalAny(c.Data) if err != nil { - f.logger.Error("failed to marshal to request from any", zap.String("type", c.Type.String()), zap.Binary("data", l.Data), zap.String("err", err.Error())) + f.logger.Error("failed to marshal to request from KVS command request", zap.String("type", c.Type.String()), zap.Error(err)) return err } if leaveRequestInstance == nil { err = errors.New("nil") - f.logger.Error("request is nil", zap.String("type", c.Type.String()), zap.Binary("data", l.Data), zap.String("err", err.Error())) + f.logger.Error("request is nil", zap.String("type", c.Type.String()), zap.Error(err)) return err } leaveRequest := *leaveRequestInstance.(*pbkvs.LeaveRequest) @@ -174,12 +174,12 @@ func (f *RaftFSM) Apply(l *raft.Log) interface{} { case pbkvs.KVSCommand_PUT: putRequestInstance, err := protobuf.MarshalAny(c.Data) if err != nil { - f.logger.Error("failed to marshal to request from any", zap.String("type", c.Type.String()), zap.Binary("data", l.Data), zap.String("err", err.Error())) + f.logger.Error("failed to marshal to request from KVS command request", zap.String("type", c.Type.String()), zap.Error(err)) return err } if putRequestInstance == nil { err = errors.New("nil") - f.logger.Error("request is nil", zap.String("type", c.Type.String()), zap.Binary("data", l.Data), zap.String("err", err.Error())) + f.logger.Error("request is nil", zap.String("type", c.Type.String()), zap.Error(err)) return err } putRequest := *putRequestInstance.(*pbkvs.PutRequest) @@ -188,12 +188,12 @@ func (f *RaftFSM) Apply(l *raft.Log) interface{} { case pbkvs.KVSCommand_DELETE: deleteRequestInstance, err := protobuf.MarshalAny(c.Data) if err != nil { - f.logger.Error("failed to marshal to request from any", zap.String("type", c.Type.String()), zap.Binary("data", l.Data), zap.String("err", err.Error())) + f.logger.Error("failed to marshal to request from KVS command request", zap.String("type", c.Type.String()), zap.Error(err)) return err } if deleteRequestInstance == nil { err = errors.New("nil") - f.logger.Error("request is nil", zap.String("type", c.Type.String()), zap.Binary("data", l.Data), zap.String("err", err.Error())) + f.logger.Error("request is nil", zap.String("type", c.Type.String()), zap.Error(err)) return err } deleteRequest := *deleteRequestInstance.(*pbkvs.DeleteRequest) @@ -201,7 +201,7 @@ func (f *RaftFSM) Apply(l *raft.Log) interface{} { return f.applyDelete(deleteRequest.Key) default: err = errors.New("command type not support") - f.logger.Error("request is nil", zap.String("type", c.Type.String()), zap.Binary("data", l.Data), zap.String("err", err.Error())) + f.logger.Error("unsupported command", zap.String("type", c.Type.String()), zap.Error(err)) return err } } @@ -221,13 +221,13 @@ func (f *RaftFSM) Restore(rc io.ReadCloser) error { defer func() { err := rc.Close() if err != nil { - f.logger.Error("failed to close reader", zap.String("err", err.Error())) + f.logger.Error("failed to close reader", zap.Error(err)) } }() data, err := ioutil.ReadAll(rc) if err != nil { - f.logger.Error("failed to open reader", zap.String("err", err.Error())) + f.logger.Error("failed to open reader", zap.Error(err)) return err } @@ -238,22 +238,22 @@ func (f *RaftFSM) Restore(rc io.ReadCloser) error { kvp := &pbkvs.KeyValuePair{} err = buff.DecodeMessage(kvp) if err == io.ErrUnexpectedEOF { - f.logger.Debug("reached the EOF", zap.String("err", err.Error())) + f.logger.Debug("reached the EOF", zap.Error(err)) break } if err != nil { - f.logger.Error("failed to read key value pair", zap.String("err", err.Error())) + f.logger.Error("failed to read key value pair", zap.Error(err)) return err } // apply item to store err = f.kvs.Set(kvp.Key, kvp.Value) if err != nil { - f.logger.Error("failed to set key value pair to key value store", zap.String("err", err.Error())) + f.logger.Error("failed to set key value pair to key value store", zap.Error(err)) return err } - f.logger.Debug("restore", zap.Binary("key", kvp.Key), zap.Binary("value", kvp.Value)) + f.logger.Debug("restore", zap.Binary("key", kvp.Key)) keyCount = keyCount + 1 } @@ -277,7 +277,7 @@ func (f *KVSFSMSnapshot) Persist(sink raft.SnapshotSink) error { defer func() { err := sink.Close() if err != nil { - f.logger.Error("failed to close sink", zap.String("err", err.Error())) + f.logger.Error("failed to close sink", zap.Error(err)) } }() @@ -297,13 +297,13 @@ func (f *KVSFSMSnapshot) Persist(sink raft.SnapshotSink) error { buff := proto.NewBuffer([]byte{}) err := buff.EncodeMessage(kvp) if err != nil { - f.logger.Error("failed to encode key value pair", zap.String("err", err.Error())) + f.logger.Error("failed to encode key value pair", zap.Error(err)) return err } _, err = sink.Write(buff.Bytes()) if err != nil { - f.logger.Error("failed to write key value pair", zap.String("err", err.Error())) + f.logger.Error("failed to write key value pair", zap.Error(err)) return err } } diff --git a/kvs/raft_server.go b/kvs/raft_server.go index 1bdca6d..a522505 100644 --- a/kvs/raft_server.go +++ b/kvs/raft_server.go @@ -42,7 +42,6 @@ type RaftServer struct { fsm *RaftFSM - //transport *raftgrpc.RaftGRPCTransport transport *raft.NetworkTransport raft *raft.Raft @@ -61,7 +60,7 @@ func NewRaftServer(nodeId string, bindAddr string, grpcAddr string, httpAddr str fsmPath := filepath.Join(dataDir, "kvs") fsm, err := NewRaftFSM(fsmPath, logger) if err != nil { - logger.Error("failed to create FSM", zap.String("path", fsmPath), zap.String("err", err.Error())) + logger.Error("failed to create FSM", zap.String("path", fsmPath), zap.Error(err)) return nil, err } @@ -87,22 +86,20 @@ func (s *RaftServer) Start() error { addr, err := net.ResolveTCPAddr("tcp", s.bindAddr) if err != nil { - s.logger.Error("failed to resolve TCP address", zap.String("tcp", s.bindAddr), zap.String("err", err.Error())) + s.logger.Error("failed to resolve TCP address", zap.String("tcp", s.bindAddr), zap.Error(err)) return err } - //s.transport = raftgrpc.NewTransport(context.Background(), string(config.LocalID)) - //s.transport = raftgrpc.NewTransport(context.TODO(), s.nodeId) s.transport, err = raft.NewTCPTransport(s.bindAddr, addr, 3, 10*time.Second, ioutil.Discard) if err != nil { - s.logger.Error("failed to create TCP transport", zap.String("tcp", s.bindAddr), zap.String("err", err.Error())) + s.logger.Error("failed to create TCP transport", zap.String("tcp", s.bindAddr), zap.Error(err)) return err } // create snapshot store snapshotStore, err := raft.NewFileSnapshotStore(s.dataDir, 2, ioutil.Discard) if err != nil { - s.logger.Error("failed to create file snapshot store", zap.String("path", s.dataDir), zap.String("err", err.Error())) + s.logger.Error("failed to create file snapshot store", zap.String("path", s.dataDir), zap.Error(err)) return err } @@ -110,14 +107,14 @@ func (s *RaftServer) Start() error { raftLogStorePath := filepath.Join(s.dataDir, "raft.db") raftLogStore, err := raftboltdb.NewBoltStore(raftLogStorePath) if err != nil { - s.logger.Error("failed to create raft log store", zap.String("path", raftLogStorePath), zap.String("err", err.Error())) + s.logger.Error("failed to create raft log store", zap.String("path", raftLogStorePath), zap.Error(err)) return err } // create raft s.raft, err = raft.NewRaft(config, s.fsm, raftLogStore, raftLogStore, snapshotStore, s.transport) if err != nil { - s.logger.Error("failed to create raft", zap.Any("config", config), zap.String("err", err.Error())) + s.logger.Error("failed to create raft", zap.Any("config", config), zap.Error(err)) return err } @@ -137,9 +134,9 @@ func (s *RaftServer) Start() error { err = s.WaitForDetectLeader(timeout) if err != nil { if err == ceteerrors.ErrTimeout { - s.logger.Error("leader detection timed out", zap.Duration("timeout", timeout), zap.String("err", err.Error())) + s.logger.Error("leader detection timed out", zap.Duration("timeout", timeout), zap.Error(err)) } else { - s.logger.Error("failed to detect leader", zap.String("err", err.Error())) + s.logger.Error("failed to detect leader", zap.Error(err)) } return err } @@ -152,7 +149,7 @@ func (s *RaftServer) Start() error { } err = s.join(req) if err != nil { - s.logger.Error("failed to join node to the cluster", zap.Any("req", req), zap.String("err", err.Error())) + s.logger.Error("failed to join node to the cluster", zap.Any("req", req), zap.Error(err)) return err } } @@ -165,7 +162,7 @@ func (s *RaftServer) Start() error { // s.startUpdateCluster(500 * time.Millisecond) //}() - s.logger.Info("Raft server started") + s.logger.Info("Raft server started", zap.String("addr", s.bindAddr)) return nil } @@ -174,12 +171,11 @@ func (s *RaftServer) Stop() error { //s.stopUpdateCluster() - err := s.fsm.Close() - if err != nil { - s.logger.Error("failed to close FSM", zap.String("err", err.Error())) - return err + if err := s.fsm.Close(); err != nil { + s.logger.Error("failed to close FSM", zap.Error(err)) } + s.logger.Info("Raft server stopped", zap.String("addr", s.bindAddr)) return nil } @@ -197,12 +193,11 @@ func (s *RaftServer) startUpdateNode(checkInterval time.Duration) { defer ticker.Stop() timeout := 60 * time.Second - err := s.WaitForDetectLeader(timeout) - if err != nil { + if err := s.WaitForDetectLeader(timeout); err != nil { if err == ceteerrors.ErrTimeout { - s.logger.Error("leader detection timed out", zap.Duration("timeout", timeout), zap.String("err", err.Error())) + s.logger.Error("leader detection timed out", zap.Duration("timeout", timeout), zap.Error(err)) } else { - s.logger.Error("failed to detect leader", zap.String("err", err.Error())) + s.logger.Error("failed to detect leader", zap.Error(err)) } } @@ -254,7 +249,14 @@ func (s *RaftServer) startUpdateCluster(checkInterval time.Duration) { ticker := time.NewTicker(checkInterval) defer ticker.Stop() - s.WaitForDetectLeader(60 * time.Second) + timeout := 60 * time.Second + if err := s.WaitForDetectLeader(timeout); err != nil { + if err == ceteerrors.ErrTimeout { + s.logger.Error("leader detection timed out", zap.Duration("timeout", timeout), zap.Error(err)) + } else { + s.logger.Error("failed to detect leader", zap.Error(err)) + } + } for { select { @@ -361,7 +363,7 @@ func (s *RaftServer) stopUpdateCluster() { s.logger.Info("close peer client", zap.String("id", id), zap.String("addr", client.conn.Target())) err := client.Close() if err != nil { - s.logger.Info("failed to close peer client", zap.String("id", id), zap.String("addr", client.conn.Target()), zap.String("err", err.Error())) + s.logger.Info("failed to close peer client", zap.String("id", id), zap.String("addr", client.conn.Target()), zap.Error(err)) } } s.updateClusterMutex.Unlock() @@ -392,7 +394,7 @@ func (s *RaftServer) LeaderAddress(timeout time.Duration) (raft.ServerAddress, e } case <-timer.C: err := ceteerrors.ErrTimeout - s.logger.Error("failed to detect leader address", zap.String("err", err.Error())) + s.logger.Error("failed to detect leader address", zap.Error(err)) return "", err } } @@ -402,13 +404,13 @@ func (s *RaftServer) LeaderID(timeout time.Duration) (raft.ServerID, error) { cf := s.raft.GetConfiguration() err := cf.Error() if err != nil { - s.logger.Error("failed to get Raft configuration", zap.String("err", err.Error())) + s.logger.Error("failed to get Raft configuration", zap.Error(err)) return "", err } leaderAddr, err := s.LeaderAddress(timeout) if err != nil { - s.logger.Error("failed to get leader address", zap.String("err", err.Error())) + s.logger.Error("failed to get leader address", zap.Error(err)) return "", err } @@ -420,13 +422,13 @@ func (s *RaftServer) LeaderID(timeout time.Duration) (raft.ServerID, error) { } err = ceteerrors.ErrNotFoundLeader - s.logger.Error("failed to detect leader ID", zap.String("err", err.Error())) + s.logger.Error("failed to detect leader ID", zap.Error(err)) return "", err } func (s *RaftServer) WaitForDetectLeader(timeout time.Duration) error { if _, err := s.LeaderAddress(timeout); err != nil { - s.logger.Error("failed to wait for detect leader", zap.String("err", err.Error())) + s.logger.Error("failed to wait for detect leader", zap.Error(err)) return err } @@ -441,7 +443,7 @@ func (s *RaftServer) join(req *pbkvs.JoinRequest) error { nodeAny := &any.Any{} err := protobuf.UnmarshalAny(req, nodeAny) if err != nil { - s.logger.Error("failed to unmarshal request to the command data", zap.Any("req", req), zap.String("err", err.Error())) + s.logger.Error("failed to unmarshal request to the command data", zap.Any("req", req), zap.Error(err)) return err } @@ -452,13 +454,13 @@ func (s *RaftServer) join(req *pbkvs.JoinRequest) error { msg, err := proto.Marshal(c) if err != nil { - s.logger.Error("failed to marshal the command into the bytes as message", zap.String("err", err.Error())) + s.logger.Error("failed to marshal the command into the bytes as message", zap.Error(err)) return err } f := s.raft.Apply(msg, 10*time.Second) if err = f.Error(); err != nil { - s.logger.Error("failed to apply message", zap.String("err", err.Error())) + s.logger.Error("failed to apply message", zap.Error(err)) return err } @@ -469,7 +471,7 @@ func (s *RaftServer) Join(req *pbkvs.JoinRequest) error { cf := s.raft.GetConfiguration() err := cf.Error() if err != nil { - s.logger.Error("failed to get Raft configuration", zap.String("err", err.Error())) + s.logger.Error("failed to get Raft configuration", zap.Error(err)) return err } @@ -501,7 +503,7 @@ func (s *RaftServer) leave(req *pbkvs.LeaveRequest) error { nodeAny := &any.Any{} err := protobuf.UnmarshalAny(req, nodeAny) if err != nil { - s.logger.Error("failed to unmarshal request to the command data", zap.Any("req", req), zap.String("err", err.Error())) + s.logger.Error("failed to unmarshal request to the command data", zap.Any("req", req), zap.Error(err)) return err } @@ -512,13 +514,13 @@ func (s *RaftServer) leave(req *pbkvs.LeaveRequest) error { msg, err := proto.Marshal(c) if err != nil { - s.logger.Error("failed to marshal the command into the bytes as the message", zap.String("err", err.Error())) + s.logger.Error("failed to marshal the command into the bytes as the message", zap.Error(err)) return err } f := s.raft.Apply(msg, 10*time.Second) if err = f.Error(); err != nil { - s.logger.Error("failed to apply the message", zap.String("err", err.Error())) + s.logger.Error("failed to apply the message", zap.Error(err)) return err } @@ -631,10 +633,9 @@ func (s *RaftServer) Cluster() (*pbkvs.ClusterResponse, error) { } func (s *RaftServer) Snapshot() error { - f := s.raft.Snapshot() - if err := f.Error(); err != nil { - s.logger.Error("failed to snapshot", zap.Error(err)) - return err + if future := s.raft.Snapshot(); future.Error() != nil { + s.logger.Error("failed to snapshot", zap.Error(future.Error())) + return future.Error() } return nil @@ -643,7 +644,7 @@ func (s *RaftServer) Snapshot() error { func (s *RaftServer) Get(req *pbkvs.GetRequest) (*pbkvs.GetResponse, error) { value, err := s.fsm.Get(req.Key) if err != nil { - s.logger.Error("failed to get", zap.Any("req", req), zap.Error(err)) + s.logger.Error("failed to get", zap.Any("key", req.Key), zap.Error(err)) return nil, err } @@ -657,7 +658,7 @@ func (s *RaftServer) Get(req *pbkvs.GetRequest) (*pbkvs.GetResponse, error) { func (s *RaftServer) Set(req *pbkvs.PutRequest) error { kvpAny := &any.Any{} if err := protobuf.UnmarshalAny(req, kvpAny); err != nil { - s.logger.Error("failed to unmarshal request to the command data", zap.Any("req", req), zap.String("err", err.Error())) + s.logger.Error("failed to unmarshal request to the command data", zap.Binary("key", req.Key), zap.Error(err)) return err } @@ -668,14 +669,13 @@ func (s *RaftServer) Set(req *pbkvs.PutRequest) error { msg, err := proto.Marshal(c) if err != nil { - s.logger.Error("failed to marshal the command into the bytes as the message", zap.String("err", err.Error())) + s.logger.Error("failed to marshal the command into the bytes as the message", zap.Binary("key", req.Key), zap.Error(err)) return err } - f := s.raft.Apply(msg, 10*time.Second) - if err = f.Error(); err != nil { - s.logger.Error("failed to apply the message", zap.String("err", err.Error())) - return err + if future := s.raft.Apply(msg, 10*time.Second); future.Error() != nil { + s.logger.Error("failed to apply the message", zap.Error(future.Error())) + return future.Error() } return nil @@ -683,9 +683,8 @@ func (s *RaftServer) Set(req *pbkvs.PutRequest) error { func (s *RaftServer) Delete(req *pbkvs.DeleteRequest) error { kvpAny := &any.Any{} - err := protobuf.UnmarshalAny(req, kvpAny) - if err != nil { - s.logger.Error("failed to unmarshal request to the command data", zap.Any("req", req), zap.String("err", err.Error())) + if err := protobuf.UnmarshalAny(req, kvpAny); err != nil { + s.logger.Error("failed to unmarshal request to the command data", zap.Binary("key", req.Key), zap.Error(err)) return err } @@ -696,15 +695,13 @@ func (s *RaftServer) Delete(req *pbkvs.DeleteRequest) error { msg, err := proto.Marshal(c) if err != nil { - s.logger.Error("failed to marshal the command into the bytes as the message", zap.String("err", err.Error())) + s.logger.Error("failed to marshal the command into the bytes as the message", zap.Binary("key", req.Key), zap.Error(err)) return err } - f := s.raft.Apply(msg, 10*time.Second) - err = f.Error() - if err != nil { - s.logger.Error("failed to unmarshal request to the command data", zap.Any("req", req), zap.String("err", err.Error())) - return err + if future := s.raft.Apply(msg, 10*time.Second); future.Error() != nil { + s.logger.Error("failed to unmarshal request to the command data", zap.Binary("key", req.Key), zap.Error(future.Error())) + return future.Error() } return nil diff --git a/kvs/server.go b/kvs/server.go index f1727e2..17514fb 100644 --- a/kvs/server.go +++ b/kvs/server.go @@ -92,27 +92,20 @@ func NewServer(nodeId string, bindAddr string, grpcAddr string, httpAddr string, } func (s *Server) Start() { - go func() { - if err := s.raftServer.Start(); err != nil { - s.logger.Error("failed to start Raft server", zap.Error(err)) - return - } - }() + if err := s.raftServer.Start(); err != nil { + s.logger.Error("failed to start Raft server", zap.Error(err)) + return + } - go func() { - if err := s.grpcServer.Start(); err != nil { - s.logger.Error("failed to start gRPC server", zap.Error(err)) - return - } - }() + if err := s.grpcServer.Start(); err != nil { + s.logger.Error("failed to start gRPC server", zap.Error(err)) + return + } - go func() { - err := s.httpServer.Start() - if err != nil { - s.logger.Error("failed to start HTTP server", zap.Error(err)) - return - } - }() + if err := s.httpServer.Start(); err != nil { + s.logger.Error("failed to start HTTP server", zap.Error(err)) + return + } if !s.bootstrap { // create gRPC client diff --git a/log/logger.go b/log/logger.go index 5502c59..d810423 100644 --- a/log/logger.go +++ b/log/logger.go @@ -76,7 +76,7 @@ func NewLogger(logLevel string, logFilename string, logMaxSize int, logMaxBackup ), zap.AddCaller(), //zap.AddStacktrace(ll), - ) + ).Named("cete") return logger }