Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactoring #11

Merged
merged 1 commit into from
Mar 17, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 14 additions & 5 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -12,24 +12,31 @@
# See the License for the specific language governing permissions and
# limitations under the License.

GOOS ?= linux
GOARCH ?= amd64
GOOS ?=
GOARCH ?=
GO111MODULE ?= on
CGO_ENABLED ?= 0
CGO_CFLAGS ?=
CGO_LDFLAGS ?=
BUILD_TAGS ?=
DOCKER_REPOSITORY ?= mosuka
VERSION ?=
BIN_EXT ?=

GO := GOOS=$(GOOS) GOARCH=$(GOARCH) CGO_ENABLED=$(CGO_ENABLED) CGO_CFLAGS=$(CGO_CFLAGS) CGO_LDFLAGS=$(CGO_LDFLAGS) GO111MODULE=on go
DOCKER_REPOSITORY ?= mosuka

PACKAGES = $(shell $(GO) list ./... | grep -v '/vendor/')

PROTOBUFS = $(shell find . -name '*.proto' -print0 | xargs -0 -n1 dirname | sort | uniq | grep -v /vendor/)

TARGET_PACKAGES = $(shell find . -name 'main.go' -print0 | xargs -0 -n1 dirname | sort | uniq | grep -v /vendor/)

ifeq ($(GOOS),)
GOOS = $(shell go version | awk -F ' ' '{print $$NF}' | awk -F '/' '{print $$1}')
endif

ifeq ($(GOARCH),)
GOARCH = $(shell go version | awk -F ' ' '{print $$NF}' | awk -F '/' '{print $$2}')
endif

ifeq ($(VERSION),)
VERSION = latest
endif
Expand All @@ -39,6 +46,8 @@ ifeq ($(GOOS),windows)
BIN_EXT = .exe
endif

GO := GOOS=$(GOOS) GOARCH=$(GOARCH) CGO_ENABLED=$(CGO_ENABLED) CGO_CFLAGS=$(CGO_CFLAGS) CGO_LDFLAGS=$(CGO_LDFLAGS) GO111MODULE=$(GO111MODULE) go

.DEFAULT_GOAL := build

.PHONY: protoc
Expand Down
26 changes: 21 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,27 @@ $ make GOOS=darwin dist
Starting cete is easy as follows:

```bash
$ ./bin/cete start --node-id=node1 --data-dir=/tmp/cete/node1 --bind-addr=:7000 --grpc-addr=:9000 --http-addr=:8000
$ ./bin/cete start --id=node1 --bind-addr=:7000 --grpc-addr=:9000 --http-addr=:8000 --data-dir=/tmp/cete/node1
```

You can now set, get and delete data via CLI.
You can get node info as follows:

```bash
$ ./bin/cete node --grpc-addr=:9000
```

The result of the above command is:

```json
{
"node": {
"bind_addr": ":7000",
"grpc_addr": ":9000",
"http_addr": ":8000",
"state": "Leader"
}
}
```

### Setting a value by key via CLI

Expand Down Expand Up @@ -173,14 +189,14 @@ $ curl -X DELETE 'http://127.0.0.1:8000/store/key1'
Cete is easy to bring up the cluster. Cete node is already running, but that is not fault tolerant. If you need to increase the fault tolerance, bring up 2 more data nodes like so:

```bash
$ ./bin/cete start --node-id=node2 --data-dir=/tmp/cete/node2 --bind-addr=:7001 --grpc-addr=:9001 --http-addr=:8001 --join-addr=:9000
$ ./bin/cete start --node-id=node3 --data-dir=/tmp/cete/node3 --bind-addr=:7002 --grpc-addr=:9002 --http-addr=:8002 --join-addr=:9000
$ ./bin/cete start --id=node2 --bind-addr=:7001 --grpc-addr=:9001 --http-addr=:8001 --data-dir=/tmp/cete/node2 --peer-grpc-addr=:9000
$ ./bin/cete start --id=node3 --bind-addr=:7002 --grpc-addr=:9002 --http-addr=:8002 --data-dir=/tmp/cete/node3 --peer-grpc-addr=:9000
```

_Above example shows each Cete node running on the same host, so each node must listen on different ports. This would not be necessary if each node ran on a different host._

This instructs each new node to join an existing node, each node recognizes the joining clusters when started.
So you have a 3-node cluster. That way you can tolerate the failure of 1 node. You can check the peers with the following command:
So you have a 3-node cluster. That way you can tolerate the failure of 1 node. You can check the cluster with the following command:

```bash
$ ./bin/cete cluster --grpc-addr=:9000
Expand Down
2 changes: 1 addition & 1 deletion cmd/cete/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
func execDelete(c *cli.Context) error {
grpcAddr := c.String("grpc-addr")

key := c.String("key")
key := c.Args().Get(0)
if key == "" {
err := errors.New("key argument must be set")
return err
Expand Down
2 changes: 1 addition & 1 deletion cmd/cete/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
func execGet(c *cli.Context) error {
grpcAddr := c.String("grpc-addr")

key := c.String("key")
key := c.Args().Get(0)
if key == "" {
err := errors.New("key argument must be set")
return err
Expand Down
19 changes: 3 additions & 16 deletions cmd/cete/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,12 +172,8 @@ func main() {
Value: ":9000",
Usage: "gRPC address to connect to",
},
cli.StringFlag{
Name: "key, k",
Value: "",
Usage: "key",
},
},
ArgsUsage: "[key]",
Action: execGet,
},
{
Expand All @@ -189,13 +185,8 @@ func main() {
Value: ":9000",
Usage: "gRPC address to connect to",
},
cli.StringFlag{
Name: "key, k",
Value: "",
Usage: "key",
},
},
ArgsUsage: "[value]",
ArgsUsage: "[key] [value]",
Action: execSet,
},
{
Expand All @@ -207,12 +198,8 @@ func main() {
Value: ":9000",
Usage: "gRPC address to connect to",
},
cli.StringFlag{
Name: "key, k",
Value: "",
Usage: "key",
},
},
ArgsUsage: "[key]",
Action: execDelete,
},
{
Expand Down
4 changes: 2 additions & 2 deletions cmd/cete/set.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,13 @@ import (
func execSet(c *cli.Context) error {
grpcAddr := c.String("grpc-addr")

key := c.String("key")
key := c.Args().Get(0)
if key == "" {
err := errors.New("key argument must be set")
return err
}

value := c.Args().Get(0)
value := c.Args().Get(1)
if value == "" {
err := errors.New("value argument must be set")
return err
Expand Down
17 changes: 7 additions & 10 deletions kvs/grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,13 @@ import (
)

type GRPCServer struct {
grpcAddr string
server *grpc.Server
listener net.Listener

logger *zap.Logger
}

//func NewGRPCServer(grpcAddr string, raftService raftgrpc.RaftServiceServer, kvsService pbkvs.KVSServer, logger *log.Logger) (*GRPCServer, error) {
func NewGRPCServer(grpcAddr string, kvsService pbkvs.KVSServer, logger *zap.Logger) (*GRPCServer, error) {
grpcLogger := logger.Named("grpc")
server := grpc.NewServer(
Expand All @@ -53,7 +53,6 @@ func NewGRPCServer(grpcAddr string, kvsService pbkvs.KVSServer, logger *zap.Logg
),
)

//raftgrpc.RegisterRaftServiceServer(server, raftService)
pbkvs.RegisterKVSServer(server, kvsService)

listener, err := net.Listen("tcp", grpcAddr)
Expand All @@ -63,26 +62,24 @@ func NewGRPCServer(grpcAddr string, kvsService pbkvs.KVSServer, logger *zap.Logg
}

return &GRPCServer{
grpcAddr: grpcAddr,
server: server,
listener: listener,
logger: logger,
}, nil
}

func (s *GRPCServer) Start() error {
err := s.server.Serve(s.listener)
if err != nil {
s.logger.Error("failed to start server", zap.String("addr", s.listener.Addr().String()), zap.Error(err))
return err
}
go s.server.Serve(s.listener)

s.logger.Info("gRPC server started", zap.String("addr", s.grpcAddr))
return nil
}

func (s *GRPCServer) Stop() error {
//s.server.GracefulStop()
s.server.Stop()
s.logger.Info("server stopped")
s.server.GracefulStop()
//s.server.Stop()

s.logger.Info("gRPC server stopped", zap.String("addr", s.grpcAddr))
return nil
}
22 changes: 11 additions & 11 deletions kvs/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (s *GRPCService) Join(ctx context.Context, req *pbkvs.JoinRequest) (*empty.

err = client.Join(req)
if err != nil {
s.logger.Error("failed to join node to the cluster", zap.Any("req", req), zap.Error(err))
s.logger.Error("failed to forward request", zap.String("leaderAddr", string(leaderAddr)), zap.Error(err))
return resp, status.Error(codes.Internal, err.Error())
}

Expand All @@ -82,14 +82,14 @@ func (s *GRPCService) Join(ctx context.Context, req *pbkvs.JoinRequest) (*empty.

err := s.raftServer.Join(req)
if err != nil {
s.logger.Error("failed to join node to the cluster", zap.Any("req", req), zap.Error(err))
s.logger.Error("failed to join node to the cluster", zap.String("id", req.Id), zap.Error(err))
return resp, status.Error(codes.Internal, err.Error())
}

// notify
joinReqAny := &any.Any{}
if err := protobuf.UnmarshalAny(req, joinReqAny); err != nil {
s.logger.Error("failed to unmarshal request to the watch data", zap.Any("req", req), zap.String("err", err.Error()))
s.logger.Error("failed to unmarshal request to the watch data", zap.String("id", req.Id), zap.String("err", err.Error()))
} else {
watchResp := &pbkvs.WatchResponse{
Event: pbkvs.WatchResponse_JOIN,
Expand Down Expand Up @@ -129,7 +129,7 @@ func (s *GRPCService) Leave(ctx context.Context, req *pbkvs.LeaveRequest) (*empt

err = client.Leave(req)
if err != nil {
s.logger.Error("failed to leave node from the cluster", zap.Any("req", req), zap.Error(err))
s.logger.Error("failed to forward request", zap.String("leaderAddr", string(leaderAddr)), zap.Error(err))
return resp, status.Error(codes.Internal, err.Error())
}

Expand Down Expand Up @@ -208,10 +208,10 @@ func (s *GRPCService) Get(ctx context.Context, req *pbkvs.GetRequest) (*pbkvs.Ge
if err != nil {
switch err {
case errors.ErrNotFound:
s.logger.Debug("key not found", zap.Any("req", req), zap.String("err", err.Error()))
s.logger.Debug("key not found", zap.Binary("key", req.Key), zap.String("err", err.Error()))
return resp, status.Error(codes.NotFound, err.Error())
default:
s.logger.Debug("failed to get data", zap.Any("req", req), zap.String("err", err.Error()))
s.logger.Debug("failed to get data", zap.Binary("key", req.Key), zap.String("err", err.Error()))
return resp, status.Error(codes.Internal, err.Error())
}
}
Expand Down Expand Up @@ -245,7 +245,7 @@ func (s *GRPCService) Put(ctx context.Context, req *pbkvs.PutRequest) (*empty.Em

err = client.Put(req)
if err != nil {
s.logger.Error("failed to put data", zap.Any("req", req), zap.Error(err))
s.logger.Error("failed to forward request", zap.String("leaderAddr", string(leaderAddr)), zap.Error(err))
return resp, status.Error(codes.Internal, err.Error())
}

Expand Down Expand Up @@ -302,7 +302,7 @@ func (s *GRPCService) Delete(ctx context.Context, req *pbkvs.DeleteRequest) (*em

err = client.Delete(req)
if err != nil {
s.logger.Error("failed to delete data", zap.Any("req", req), zap.Error(err))
s.logger.Error("failed to forward request", zap.String("leaderAddr", string(leaderAddr)), zap.Error(err))
return resp, status.Error(codes.Internal, err.Error())
}

Expand All @@ -312,14 +312,14 @@ func (s *GRPCService) Delete(ctx context.Context, req *pbkvs.DeleteRequest) (*em
// delete value by key
err := s.raftServer.Delete(req)
if err != nil {
s.logger.Error("failed to delete data", zap.Any("req", req), zap.Error(err))
s.logger.Error("failed to delete data", zap.Binary("key", req.Key), zap.Error(err))
return resp, status.Error(codes.Internal, err.Error())
}

// notify
deleteReqAny := &any.Any{}
if err := protobuf.UnmarshalAny(req, deleteReqAny); err != nil {
s.logger.Error("failed to unmarshal request to the watch data", zap.Any("req", req), zap.String("err", err.Error()))
s.logger.Error("failed to unmarshal request to the watch data", zap.Binary("key", req.Key), zap.Error(err))
} else {
watchResp := &pbkvs.WatchResponse{
Event: pbkvs.WatchResponse_DELETE,
Expand Down Expand Up @@ -349,7 +349,7 @@ func (s *GRPCService) Watch(req *empty.Empty, server pbkvs.KVS_WatchServer) erro

for resp := range chans {
if err := server.Send(&resp); err != nil {
s.logger.Error("failed to send watch data", zap.Any("resp", resp), zap.String("err", err.Error()))
s.logger.Error("failed to send watch data", zap.String("event", resp.Event.String()), zap.Error(err))
return status.Error(codes.Internal, err.Error())
}
}
Expand Down
2 changes: 1 addition & 1 deletion kvs/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func (h *PutHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
},
)
httpStatus = http.StatusInternalServerError
h.logger.Error("failed to put data", zap.Error(err))
h.logger.Error("failed to set data", zap.Error(err))
} else {
httpStatus = http.StatusOK
}
Expand Down
22 changes: 10 additions & 12 deletions kvs/http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
)

type HTTPServer struct {
httpAddr string
listener net.Listener
router *mux.Router

Expand Down Expand Up @@ -57,17 +58,20 @@ func NewHTTPServer(httpAddr string, grpcAddr string, logger *zap.Logger) (*HTTPS
router.Handle("/store/{path:.*}", NewDeleteHandler(grpcClient, logger)).Methods("DELETE")
router.Handle("/metrics", promhttp.Handler()).Methods("GET")

httpLogger := logger.Named("http")

return &HTTPServer{
httpAddr: httpAddr,
listener: listener,
router: router,
grpcClient: grpcClient,
logger: logger,
httpLogger: logger.Named("http"),
httpLogger: httpLogger,
}, nil
}

func (s *HTTPServer) Start() error {
err := http.Serve(
go http.Serve(
s.listener,
accesslog.NewLoggingHandler(
s.router,
Expand All @@ -76,26 +80,20 @@ func (s *HTTPServer) Start() error {
},
),
)
if err != nil {
s.logger.Error("failed to start listener", zap.String("addr", s.listener.Addr().String()), zap.Error(err))
return err
}

s.logger.Info("HTTP server started", zap.String("addr", s.httpAddr))
return nil
}

func (s *HTTPServer) Stop() error {
err := s.listener.Close()
if err != nil {
if err := s.listener.Close(); err != nil {
s.logger.Error("failed to close listener", zap.String("addr", s.listener.Addr().String()), zap.Error(err))
return err
}

err = s.grpcClient.Close()
if err != nil {
if err := s.grpcClient.Close(); err != nil {
s.logger.Error("failed to close gRPC client", zap.String("addr", s.grpcClient.conn.Target()), zap.Error(err))
return err
}

s.logger.Info("HTTP server stopped", zap.String("addr", s.httpAddr))
return nil
}
Loading