Skip to content

Commit

Permalink
Refactoring (#21)
Browse files Browse the repository at this point in the history
  • Loading branch information
mosuka authored Mar 22, 2020
1 parent 7c41598 commit 0d1f8e0
Show file tree
Hide file tree
Showing 27 changed files with 602 additions and 685 deletions.
158 changes: 158 additions & 0 deletions client/grpc_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
// Copyright (c) 2020 Minoru Osuka
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package client

import (
"context"
"log"
"math"

"github.com/golang/protobuf/ptypes/empty"
"github.com/mosuka/cete/errors"
"github.com/mosuka/cete/protobuf"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

type GRPCClient struct {
ctx context.Context
cancel context.CancelFunc
conn *grpc.ClientConn
client protobuf.KVSClient

logger *log.Logger
}

func NewGRPCClient(address string) (*GRPCClient, error) {
baseCtx := context.TODO()
ctx, cancel := context.WithCancel(baseCtx)

dialOpts := []grpc.DialOption{
grpc.WithInsecure(),
grpc.WithDefaultCallOptions(
grpc.MaxCallSendMsgSize(math.MaxInt64),
grpc.MaxCallRecvMsgSize(math.MaxInt64),
),
}

conn, err := grpc.DialContext(ctx, address, dialOpts...)
if err != nil {
cancel()
return nil, err
}

return &GRPCClient{
ctx: ctx,
cancel: cancel,
conn: conn,
client: protobuf.NewKVSClient(conn),
}, nil
}

func (c *GRPCClient) Close() error {
c.cancel()
if c.conn != nil {
return c.conn.Close()
}

return c.ctx.Err()
}

func (c *GRPCClient) Target() string {
return c.conn.Target()
}

func (c *GRPCClient) Join(req *protobuf.JoinRequest, opts ...grpc.CallOption) error {
if _, err := c.client.Join(c.ctx, req, opts...); err != nil {
return err
}

return nil
}

func (c *GRPCClient) Leave(req *protobuf.LeaveRequest, opts ...grpc.CallOption) error {
if _, err := c.client.Leave(c.ctx, req, opts...); err != nil {
return err
}

return nil
}

func (c *GRPCClient) Node(opts ...grpc.CallOption) (*protobuf.NodeResponse, error) {
if resp, err := c.client.Node(c.ctx, &empty.Empty{}, opts...); err != nil {
return nil, err
} else {
return resp, nil
}
}

func (c *GRPCClient) Cluster(opts ...grpc.CallOption) (*protobuf.ClusterResponse, error) {
if resp, err := c.client.Cluster(c.ctx, &empty.Empty{}, opts...); err != nil {
return nil, err
} else {
return resp, nil
}
}

func (c *GRPCClient) Snapshot(opts ...grpc.CallOption) error {
if _, err := c.client.Snapshot(c.ctx, &empty.Empty{}); err != nil {
return err
}

return nil
}

func (c *GRPCClient) Get(req *protobuf.GetRequest, opts ...grpc.CallOption) (*protobuf.GetResponse, error) {
if resp, err := c.client.Get(c.ctx, req, opts...); err != nil {
st, _ := status.FromError(err)
switch st.Code() {
case codes.NotFound:
return nil, errors.ErrNotFound
default:
return nil, err
}
} else {
return resp, nil
}
}

func (c *GRPCClient) Put(req *protobuf.PutRequest, opts ...grpc.CallOption) error {
if _, err := c.client.Put(c.ctx, req, opts...); err != nil {
return err
}

return nil
}

func (c *GRPCClient) Delete(req *protobuf.DeleteRequest, opts ...grpc.CallOption) error {
if _, err := c.client.Delete(c.ctx, req, opts...); err != nil {
return err
}

return nil
}

func (c *GRPCClient) Watch(req *empty.Empty, opts ...grpc.CallOption) (protobuf.KVS_WatchClient, error) {
return c.client.Watch(c.ctx, req, opts...)
}

func (c *GRPCClient) Metrics(opts ...grpc.CallOption) (*protobuf.MetricsResponse, error) {
if resp, err := c.client.Metrics(c.ctx, &empty.Empty{}, opts...); err != nil {
return nil, err
} else {
return resp, nil
}
}
12 changes: 6 additions & 6 deletions cmd/cete/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,22 @@ import (
"fmt"
"os"

"github.com/mosuka/cete/kvs"
"github.com/mosuka/cete/client"
"github.com/urfave/cli"
)

func execCluster(c *cli.Context) error {
grpcAddr := c.String("grpc-addr")
func execCluster(ctx *cli.Context) error {
grpcAddr := ctx.String("grpc-addr")

client, err := kvs.NewGRPCClient(grpcAddr)
c, err := client.NewGRPCClient(grpcAddr)
if err != nil {
return err
}
defer func() {
_ = client.Close()
_ = c.Close()
}()

resp, err := client.Cluster()
resp, err := c.Cluster()
if err != nil {
return err
}
Expand Down
19 changes: 9 additions & 10 deletions cmd/cete/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,34 +17,33 @@ package main
import (
"errors"

"github.com/mosuka/cete/kvs"
pbkvs "github.com/mosuka/cete/protobuf/kvs"
"github.com/mosuka/cete/client"
"github.com/mosuka/cete/protobuf"
"github.com/urfave/cli"
)

func execDelete(c *cli.Context) error {
grpcAddr := c.String("grpc-addr")
func execDelete(ctx *cli.Context) error {
grpcAddr := ctx.String("grpc-addr")

key := c.Args().Get(0)
key := ctx.Args().Get(0)
if key == "" {
err := errors.New("key argument must be set")
return err
}

req := &pbkvs.DeleteRequest{
req := &protobuf.DeleteRequest{
Key: key,
}

client, err := kvs.NewGRPCClient(grpcAddr)
c, err := client.NewGRPCClient(grpcAddr)
if err != nil {
return err
}
defer func() {
_ = client.Close()
_ = c.Close()
}()

err = client.Delete(req)
if err != nil {
if err = c.Delete(req); err != nil {
return err
}

Expand Down
24 changes: 12 additions & 12 deletions cmd/cete/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,41 +19,41 @@ import (
"fmt"
"os"

"github.com/mosuka/cete/kvs"
pbkvs "github.com/mosuka/cete/protobuf/kvs"
"github.com/mosuka/cete/client"
"github.com/mosuka/cete/protobuf"
"github.com/urfave/cli"
)

func execGet(c *cli.Context) error {
grpcAddr := c.String("grpc-addr")
func execGet(ctx *cli.Context) error {
grpcAddr := ctx.String("grpc-addr")

key := c.Args().Get(0)
key := ctx.Args().Get(0)
if key == "" {
err := errors.New("key argument must be set")
return err
}

req := &pbkvs.GetRequest{
req := &protobuf.GetRequest{
Key: key,
}

client, err := kvs.NewGRPCClient(grpcAddr)
c, err := client.NewGRPCClient(grpcAddr)
if err != nil {
return err
}
defer func() {
_ = client.Close()
_ = c.Close()
}()

resp, err := client.Get(req)
resp, err := c.Get(req)
if err != nil {
return err
}

// key does not exist
if resp.Value == nil {
return nil
}
//if resp.Value == nil {
// return nil
//}

_, _ = fmt.Fprintln(os.Stdout, fmt.Sprintf("%s", string(resp.Value)))

Expand Down
26 changes: 13 additions & 13 deletions cmd/cete/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,55 +17,55 @@ package main
import (
"errors"

"github.com/mosuka/cete/kvs"
pbkvs "github.com/mosuka/cete/protobuf/kvs"
"github.com/mosuka/cete/client"
"github.com/mosuka/cete/protobuf"
"github.com/urfave/cli"
)

func execJoin(c *cli.Context) error {
grpcAddr := c.String("grpc-addr")
func execJoin(ctx *cli.Context) error {
grpcAddr := ctx.String("grpc-addr")

id := c.Args().Get(0)
id := ctx.Args().Get(0)
if id == "" {
err := errors.New("id argument must be set")
return err
}

targetGrpcAddr := c.Args().Get(1)
targetGrpcAddr := ctx.Args().Get(1)
if targetGrpcAddr == "" {
err := errors.New("address argument must be set")
return err
}

targetClient, err := kvs.NewGRPCClient(targetGrpcAddr)
t, err := client.NewGRPCClient(targetGrpcAddr)
if err != nil {
return err
}
defer func() {
_ = targetClient.Close()
_ = t.Close()
}()

nodeResp, err := targetClient.Node()
nodeResp, err := t.Node()
if err != nil {
return err
}

req := &pbkvs.JoinRequest{
req := &protobuf.JoinRequest{
Id: id,
BindAddr: nodeResp.Node.BindAddr,
GrpcAddr: nodeResp.Node.GrpcAddr,
HttpAddr: nodeResp.Node.HttpAddr,
}

client, err := kvs.NewGRPCClient(grpcAddr)
c, err := client.NewGRPCClient(grpcAddr)
if err != nil {
return err
}
defer func() {
_ = client.Close()
_ = c.Close()
}()

err = client.Join(req)
err = c.Join(req)
if err != nil {
return err
}
Expand Down
18 changes: 9 additions & 9 deletions cmd/cete/leave.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,33 +17,33 @@ package main
import (
"errors"

"github.com/mosuka/cete/kvs"
pbkvs "github.com/mosuka/cete/protobuf/kvs"
"github.com/mosuka/cete/client"
"github.com/mosuka/cete/protobuf"
"github.com/urfave/cli"
)

func execLeave(c *cli.Context) error {
grpcAddr := c.String("grpc-addr")
func execLeave(ctx *cli.Context) error {
grpcAddr := ctx.String("grpc-addr")

id := c.Args().Get(0)
id := ctx.Args().Get(0)
if id == "" {
err := errors.New("id argument must be set")
return err
}

req := &pbkvs.LeaveRequest{
req := &protobuf.LeaveRequest{
Id: id,
}

client, err := kvs.NewGRPCClient(grpcAddr)
c, err := client.NewGRPCClient(grpcAddr)
if err != nil {
return err
}
defer func() {
_ = client.Close()
_ = c.Close()
}()

err = client.Leave(req)
err = c.Leave(req)
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit 0d1f8e0

Please sign in to comment.