diff --git a/cluster/cluster.go b/cluster/cluster.go index 627d3268..51364707 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -52,6 +52,7 @@ func (c *cluster) Register(_ context.Context, req *clusterpb.RegisterRequest) (* } resp := &clusterpb.RegisterResponse{} + c.mu.Lock() for k, m := range c.members { if m.memberInfo.ServiceAddr == req.MemberInfo.ServiceAddr { // 节点异常崩溃,不会执行unregister,此时再次启动该节点,由于已存在注册信息,将再也无法成功注册,这里做个修改,先移除后重新注册 @@ -64,6 +65,7 @@ func (c *cluster) Register(_ context.Context, req *clusterpb.RegisterRequest) (* //return nil, fmt.Errorf("address %s has registered", req.MemberInfo.ServiceAddr) } } + c.mu.Unlock() // Notify registered node to update remote services newMember := &clusterpb.NewMemberRequest{MemberInfo: req.MemberInfo} diff --git a/examples/cluster/main.go b/examples/cluster/main.go index a1cc9c99..c2e31c0c 100644 --- a/examples/cluster/main.go +++ b/examples/cluster/main.go @@ -146,6 +146,7 @@ func runGate(args *cli.Context) error { nano.WithWSPath("/nano"), nano.WithCheckOriginFunc(func(_ *http.Request) bool { return true }), nano.WithDebugMode(), + nano.WithNodeId(2), // if you deploy multi gate, option set nodeId, default nodeId = os.Getpid() ) return nil } diff --git a/go.mod b/go.mod index 311cf128..c240e5ef 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/lonng/nano go 1.12 require ( + github.com/bwmarrin/snowflake v0.3.0 // indirect github.com/cpuguy83/go-md2man/v2 v2.0.0 // indirect github.com/google/uuid v1.2.0 // indirect github.com/gorilla/websocket v1.4.2 // indirect diff --git a/go.sum b/go.sum index 074497b1..9cbc15db 100644 --- a/go.sum +++ b/go.sum @@ -4,6 +4,8 @@ github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= +github.com/bwmarrin/snowflake v0.3.0 h1:xm67bEhkKh6ij1790JB83OujPR5CzNe8QuQqAgISZN0= +github.com/bwmarrin/snowflake v0.3.0/go.mod h1:NdZxfVWX+oR6y2K0o6qAYv6gIOP9rjG0/E9WsDpxqwE= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= diff --git a/options.go b/options.go index 5ae6f615..8f656213 100644 --- a/options.go +++ b/options.go @@ -11,6 +11,7 @@ import ( "github.com/lonng/nano/internal/message" "github.com/lonng/nano/pipeline" "github.com/lonng/nano/serialize" + "github.com/lonng/nano/service" "google.golang.org/grpc" ) @@ -152,3 +153,10 @@ func WithHandshakeValidator(fn func([]byte) error) Option { env.HandshakeValidator = fn } } + +// WithNodeId set nodeId use snowflake nodeId generate sessionId, default: pid +func WithNodeId(nodeId uint64) Option { + return func(opt *cluster.Options) { + service.ResetNodeId(nodeId) + } +} diff --git a/service/connection.go b/service/connection.go index 38eb4f0a..7c08c05e 100644 --- a/service/connection.go +++ b/service/connection.go @@ -21,11 +21,25 @@ package service import ( + "os" "sync/atomic" ) +type Connection interface { + Increment() + Decrement() + Count() int64 + Reset() + SessionID() int64 +} + +func ResetNodeId(nodeId uint64) { + Connections = newDefaultConnectionServer(nodeId) +} + // Connections is a global variable which is used by session. -var Connections = newConnectionService() +//var Connections = newConnectionService() +var Connections Connection = newDefaultConnectionServer(uint64(os.Getpid())) type connectionService struct { count int64 diff --git a/service/default.go b/service/default.go new file mode 100644 index 00000000..4f79709e --- /dev/null +++ b/service/default.go @@ -0,0 +1,48 @@ +package service + +import ( + "sync/atomic" + + "github.com/bwmarrin/snowflake" +) + +// implement Connection +type defaultConnectionServer struct { + count int64 + node *snowflake.Node +} + +func newDefaultConnectionServer(node uint64) *defaultConnectionServer { + dcs := &defaultConnectionServer{ + count: 0, + node: nil, + } + n := int64(node % 1000) // safety node value + dcs.node, _ = snowflake.NewNode(n) + return dcs +} + +// Increment increment the connection count +func (dcs *defaultConnectionServer) Increment() { + atomic.AddInt64(&dcs.count, 1) +} + +// Decrement decrement the connection count +func (dcs *defaultConnectionServer) Decrement() { + atomic.AddInt64(&dcs.count, -1) +} + +// Count returns the connection numbers in current +func (dcs *defaultConnectionServer) Count() int64 { + return atomic.LoadInt64(&dcs.count) +} + +// Reset reset the connection service status +func (dcs *defaultConnectionServer) Reset() { + atomic.StoreInt64(&dcs.count, 0) +} + +// SessionID returns the session id, (snowflake impl) +func (dcs *defaultConnectionServer) SessionID() int64 { + return dcs.node.Generate().Int64() +} diff --git a/service/default_test.go b/service/default_test.go new file mode 100644 index 00000000..6e711967 --- /dev/null +++ b/service/default_test.go @@ -0,0 +1,31 @@ +package service + +import ( + "testing" +) + +func TestNewDefaultConnectionServer(t *testing.T) { + service := newDefaultConnectionServer(101024) + w := make(chan bool, paraCount) + sidChan := make(chan int64, paraCount) + for i := 0; i < paraCount; i++ { + go func() { + service.Increment() + w <- true + sidChan <- service.SessionID() + }() + } + smap := make(map[int64]struct{}, paraCount) + for i := 0; i < paraCount; i++ { + <-w + sid := <-sidChan + if _, ok := smap[sid]; ok { + t.Error("wrong session id repeat") + } else { + smap[sid] = struct{}{} + } + } + if service.Count() != paraCount { + t.Error("wrong connection count") + } +}