From 7d7a58e54cde67e83170f8b22cb1185ef2dc5ec3 Mon Sep 17 00:00:00 2001 From: bbdshow Date: Thu, 10 Nov 2022 00:44:31 +0800 Subject: [PATCH 1/3] Fix:for deploy multi Gate,use snowflake generate sessionId bugs: Register member slice maybe panic --- cluster/cluster.go | 2 ++ examples/cluster/main.go | 1 + go.mod | 1 + go.sum | 2 ++ internal/env/env.go | 5 +++++ options.go | 7 ++++++ service/connection.go | 12 +++++++++- service/default.go | 47 ++++++++++++++++++++++++++++++++++++++++ service/default_test.go | 31 ++++++++++++++++++++++++++ 9 files changed, 107 insertions(+), 1 deletion(-) create mode 100644 service/default.go create mode 100644 service/default_test.go diff --git a/cluster/cluster.go b/cluster/cluster.go index 627d3268..51364707 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -52,6 +52,7 @@ func (c *cluster) Register(_ context.Context, req *clusterpb.RegisterRequest) (* } resp := &clusterpb.RegisterResponse{} + c.mu.Lock() for k, m := range c.members { if m.memberInfo.ServiceAddr == req.MemberInfo.ServiceAddr { // 节点异常崩溃,不会执行unregister,此时再次启动该节点,由于已存在注册信息,将再也无法成功注册,这里做个修改,先移除后重新注册 @@ -64,6 +65,7 @@ func (c *cluster) Register(_ context.Context, req *clusterpb.RegisterRequest) (* //return nil, fmt.Errorf("address %s has registered", req.MemberInfo.ServiceAddr) } } + c.mu.Unlock() // Notify registered node to update remote services newMember := &clusterpb.NewMemberRequest{MemberInfo: req.MemberInfo} diff --git a/examples/cluster/main.go b/examples/cluster/main.go index a1cc9c99..7b326a3a 100644 --- a/examples/cluster/main.go +++ b/examples/cluster/main.go @@ -146,6 +146,7 @@ func runGate(args *cli.Context) error { nano.WithWSPath("/nano"), nano.WithCheckOriginFunc(func(_ *http.Request) bool { return true }), nano.WithDebugMode(), + nano.WithNodeId(1), // if you deploy multi gate, option set nodeId, default nodeId = os.Getpid() ) return nil } diff --git a/go.mod b/go.mod index 311cf128..c240e5ef 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/lonng/nano go 1.12 require ( + github.com/bwmarrin/snowflake v0.3.0 // indirect github.com/cpuguy83/go-md2man/v2 v2.0.0 // indirect github.com/google/uuid v1.2.0 // indirect github.com/gorilla/websocket v1.4.2 // indirect diff --git a/go.sum b/go.sum index 074497b1..9cbc15db 100644 --- a/go.sum +++ b/go.sum @@ -4,6 +4,8 @@ github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= +github.com/bwmarrin/snowflake v0.3.0 h1:xm67bEhkKh6ij1790JB83OujPR5CzNe8QuQqAgISZN0= +github.com/bwmarrin/snowflake v0.3.0/go.mod h1:NdZxfVWX+oR6y2K0o6qAYv6gIOP9rjG0/E9WsDpxqwE= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= diff --git a/internal/env/env.go b/internal/env/env.go index 971ebdaa..369fe936 100644 --- a/internal/env/env.go +++ b/internal/env/env.go @@ -24,6 +24,7 @@ package env import ( "net/http" + "os" "time" "github.com/lonng/nano/serialize" @@ -50,6 +51,8 @@ var ( Serializer serialize.Serializer GrpcOptions = []grpc.DialOption{grpc.WithInsecure()} + + NodeId uint64 // when cluster mode, as sessionId snowflake nodeId ) func init() { @@ -59,4 +62,6 @@ func init() { CheckOrigin = func(_ *http.Request) bool { return true } HandshakeValidator = func(_ []byte) error { return nil } Serializer = protobuf.NewSerializer() + // default use pid + NodeId = uint64(os.Getpid()) } diff --git a/options.go b/options.go index 5ae6f615..ed483d6b 100644 --- a/options.go +++ b/options.go @@ -152,3 +152,10 @@ func WithHandshakeValidator(fn func([]byte) error) Option { env.HandshakeValidator = fn } } + +// WithNodeId set nodeId use snowflake nodeId generate sessionId, default: pid +func WithNodeId(nodeId uint64) Option { + return func(opt *cluster.Options) { + env.NodeId = nodeId + } +} diff --git a/service/connection.go b/service/connection.go index 38eb4f0a..8f9ef4ac 100644 --- a/service/connection.go +++ b/service/connection.go @@ -21,11 +21,21 @@ package service import ( + "github.com/lonng/nano/internal/env" "sync/atomic" ) +type Connection interface { + Increment() + Decrement() + Count() int64 + Reset() + SessionID() int64 +} + // Connections is a global variable which is used by session. -var Connections = newConnectionService() +//var Connections = newConnectionService() +var Connections Connection = newDefaultConnectionServer(env.NodeId) type connectionService struct { count int64 diff --git a/service/default.go b/service/default.go new file mode 100644 index 00000000..35e97d15 --- /dev/null +++ b/service/default.go @@ -0,0 +1,47 @@ +package service + +import ( + "github.com/bwmarrin/snowflake" + "sync/atomic" +) + +// implement Connection +type defaultConnectionServer struct { + count int64 + node *snowflake.Node +} + +func newDefaultConnectionServer(node uint64) *defaultConnectionServer { + dcs := &defaultConnectionServer{ + count: 0, + node: nil, + } + n := int64(node % 1000) // safety node value + dcs.node, _ = snowflake.NewNode(n) + return dcs +} + +// Increment increment the connection count +func (dcs *defaultConnectionServer) Increment() { + atomic.AddInt64(&dcs.count, 1) +} + +// Decrement decrement the connection count +func (dcs *defaultConnectionServer) Decrement() { + atomic.AddInt64(&dcs.count, -1) +} + +// Count returns the connection numbers in current +func (dcs *defaultConnectionServer) Count() int64 { + return atomic.LoadInt64(&dcs.count) +} + +// Reset reset the connection service status +func (dcs *defaultConnectionServer) Reset() { + atomic.StoreInt64(&dcs.count, 0) +} + +// SessionID returns the session id, (snowflake impl) +func (dcs *defaultConnectionServer) SessionID() int64 { + return dcs.node.Generate().Int64() +} diff --git a/service/default_test.go b/service/default_test.go new file mode 100644 index 00000000..6e711967 --- /dev/null +++ b/service/default_test.go @@ -0,0 +1,31 @@ +package service + +import ( + "testing" +) + +func TestNewDefaultConnectionServer(t *testing.T) { + service := newDefaultConnectionServer(101024) + w := make(chan bool, paraCount) + sidChan := make(chan int64, paraCount) + for i := 0; i < paraCount; i++ { + go func() { + service.Increment() + w <- true + sidChan <- service.SessionID() + }() + } + smap := make(map[int64]struct{}, paraCount) + for i := 0; i < paraCount; i++ { + <-w + sid := <-sidChan + if _, ok := smap[sid]; ok { + t.Error("wrong session id repeat") + } else { + smap[sid] = struct{}{} + } + } + if service.Count() != paraCount { + t.Error("wrong connection count") + } +} From 8d1599c71c64c91efb076e5d5497360ba8134ba6 Mon Sep 17 00:00:00 2001 From: bbdshow Date: Thu, 10 Nov 2022 02:00:30 +0800 Subject: [PATCH 2/3] Fix options.WithNodeId --- options.go | 2 ++ service/connection.go | 7 ++++++- service/default.go | 3 ++- 3 files changed, 10 insertions(+), 2 deletions(-) diff --git a/options.go b/options.go index ed483d6b..3b008292 100644 --- a/options.go +++ b/options.go @@ -11,6 +11,7 @@ import ( "github.com/lonng/nano/internal/message" "github.com/lonng/nano/pipeline" "github.com/lonng/nano/serialize" + "github.com/lonng/nano/service" "google.golang.org/grpc" ) @@ -157,5 +158,6 @@ func WithHandshakeValidator(fn func([]byte) error) Option { func WithNodeId(nodeId uint64) Option { return func(opt *cluster.Options) { env.NodeId = nodeId + service.ResetNodeId() } } diff --git a/service/connection.go b/service/connection.go index 8f9ef4ac..934dc730 100644 --- a/service/connection.go +++ b/service/connection.go @@ -21,8 +21,9 @@ package service import ( - "github.com/lonng/nano/internal/env" "sync/atomic" + + "github.com/lonng/nano/internal/env" ) type Connection interface { @@ -33,6 +34,10 @@ type Connection interface { SessionID() int64 } +func ResetNodeId() { + Connections = newDefaultConnectionServer(env.NodeId) +} + // Connections is a global variable which is used by session. //var Connections = newConnectionService() var Connections Connection = newDefaultConnectionServer(env.NodeId) diff --git a/service/default.go b/service/default.go index 35e97d15..4f79709e 100644 --- a/service/default.go +++ b/service/default.go @@ -1,8 +1,9 @@ package service import ( - "github.com/bwmarrin/snowflake" "sync/atomic" + + "github.com/bwmarrin/snowflake" ) // implement Connection From c0a64120251a72dc5cf5ab61e331f6a6f7bc1590 Mon Sep 17 00:00:00 2001 From: bbdshow Date: Thu, 10 Nov 2022 09:02:13 +0800 Subject: [PATCH 3/3] Fix: del env.NodeId --- examples/cluster/main.go | 2 +- internal/env/env.go | 5 ----- options.go | 3 +-- service/connection.go | 9 ++++----- 4 files changed, 6 insertions(+), 13 deletions(-) diff --git a/examples/cluster/main.go b/examples/cluster/main.go index 7b326a3a..c2e31c0c 100644 --- a/examples/cluster/main.go +++ b/examples/cluster/main.go @@ -146,7 +146,7 @@ func runGate(args *cli.Context) error { nano.WithWSPath("/nano"), nano.WithCheckOriginFunc(func(_ *http.Request) bool { return true }), nano.WithDebugMode(), - nano.WithNodeId(1), // if you deploy multi gate, option set nodeId, default nodeId = os.Getpid() + nano.WithNodeId(2), // if you deploy multi gate, option set nodeId, default nodeId = os.Getpid() ) return nil } diff --git a/internal/env/env.go b/internal/env/env.go index 369fe936..971ebdaa 100644 --- a/internal/env/env.go +++ b/internal/env/env.go @@ -24,7 +24,6 @@ package env import ( "net/http" - "os" "time" "github.com/lonng/nano/serialize" @@ -51,8 +50,6 @@ var ( Serializer serialize.Serializer GrpcOptions = []grpc.DialOption{grpc.WithInsecure()} - - NodeId uint64 // when cluster mode, as sessionId snowflake nodeId ) func init() { @@ -62,6 +59,4 @@ func init() { CheckOrigin = func(_ *http.Request) bool { return true } HandshakeValidator = func(_ []byte) error { return nil } Serializer = protobuf.NewSerializer() - // default use pid - NodeId = uint64(os.Getpid()) } diff --git a/options.go b/options.go index 3b008292..8f656213 100644 --- a/options.go +++ b/options.go @@ -157,7 +157,6 @@ func WithHandshakeValidator(fn func([]byte) error) Option { // WithNodeId set nodeId use snowflake nodeId generate sessionId, default: pid func WithNodeId(nodeId uint64) Option { return func(opt *cluster.Options) { - env.NodeId = nodeId - service.ResetNodeId() + service.ResetNodeId(nodeId) } } diff --git a/service/connection.go b/service/connection.go index 934dc730..7c08c05e 100644 --- a/service/connection.go +++ b/service/connection.go @@ -21,9 +21,8 @@ package service import ( + "os" "sync/atomic" - - "github.com/lonng/nano/internal/env" ) type Connection interface { @@ -34,13 +33,13 @@ type Connection interface { SessionID() int64 } -func ResetNodeId() { - Connections = newDefaultConnectionServer(env.NodeId) +func ResetNodeId(nodeId uint64) { + Connections = newDefaultConnectionServer(nodeId) } // Connections is a global variable which is used by session. //var Connections = newConnectionService() -var Connections Connection = newDefaultConnectionServer(env.NodeId) +var Connections Connection = newDefaultConnectionServer(uint64(os.Getpid())) type connectionService struct { count int64