Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix:for deploy multi Gate,use snowflake generate sessionId #90

Merged
merged 3 commits into from
Nov 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ func (c *cluster) Register(_ context.Context, req *clusterpb.RegisterRequest) (*
}

resp := &clusterpb.RegisterResponse{}
c.mu.Lock()
for k, m := range c.members {
if m.memberInfo.ServiceAddr == req.MemberInfo.ServiceAddr {
// 节点异常崩溃,不会执行unregister,此时再次启动该节点,由于已存在注册信息,将再也无法成功注册,这里做个修改,先移除后重新注册
Expand All @@ -64,6 +65,7 @@ func (c *cluster) Register(_ context.Context, req *clusterpb.RegisterRequest) (*
//return nil, fmt.Errorf("address %s has registered", req.MemberInfo.ServiceAddr)
}
}
c.mu.Unlock()

// Notify registered node to update remote services
newMember := &clusterpb.NewMemberRequest{MemberInfo: req.MemberInfo}
Expand Down
1 change: 1 addition & 0 deletions examples/cluster/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ func runGate(args *cli.Context) error {
nano.WithWSPath("/nano"),
nano.WithCheckOriginFunc(func(_ *http.Request) bool { return true }),
nano.WithDebugMode(),
nano.WithNodeId(2), // if you deploy multi gate, option set nodeId, default nodeId = os.Getpid()
)
return nil
}
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/lonng/nano
go 1.12

require (
github.com/bwmarrin/snowflake v0.3.0 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.0 // indirect
github.com/google/uuid v1.2.0 // indirect
github.com/gorilla/websocket v1.4.2 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/bwmarrin/snowflake v0.3.0 h1:xm67bEhkKh6ij1790JB83OujPR5CzNe8QuQqAgISZN0=
github.com/bwmarrin/snowflake v0.3.0/go.mod h1:NdZxfVWX+oR6y2K0o6qAYv6gIOP9rjG0/E9WsDpxqwE=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
Expand Down
8 changes: 8 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/lonng/nano/internal/message"
"github.com/lonng/nano/pipeline"
"github.com/lonng/nano/serialize"
"github.com/lonng/nano/service"
"google.golang.org/grpc"
)

Expand Down Expand Up @@ -152,3 +153,10 @@ func WithHandshakeValidator(fn func([]byte) error) Option {
env.HandshakeValidator = fn
}
}

// WithNodeId set nodeId use snowflake nodeId generate sessionId, default: pid
func WithNodeId(nodeId uint64) Option {
return func(opt *cluster.Options) {
service.ResetNodeId(nodeId)
}
}
16 changes: 15 additions & 1 deletion service/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,25 @@
package service

import (
"os"
"sync/atomic"
)

type Connection interface {
Increment()
Decrement()
Count() int64
Reset()
SessionID() int64
}

func ResetNodeId(nodeId uint64) {
Connections = newDefaultConnectionServer(nodeId)
}

// Connections is a global variable which is used by session.
var Connections = newConnectionService()
//var Connections = newConnectionService()
var Connections Connection = newDefaultConnectionServer(uint64(os.Getpid()))

type connectionService struct {
count int64
Expand Down
48 changes: 48 additions & 0 deletions service/default.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package service

import (
"sync/atomic"

"github.com/bwmarrin/snowflake"
)

// implement Connection
type defaultConnectionServer struct {
count int64
node *snowflake.Node
}

func newDefaultConnectionServer(node uint64) *defaultConnectionServer {
dcs := &defaultConnectionServer{
count: 0,
node: nil,
}
n := int64(node % 1000) // safety node value
dcs.node, _ = snowflake.NewNode(n)
return dcs
}

// Increment increment the connection count
func (dcs *defaultConnectionServer) Increment() {
atomic.AddInt64(&dcs.count, 1)
}

// Decrement decrement the connection count
func (dcs *defaultConnectionServer) Decrement() {
atomic.AddInt64(&dcs.count, -1)
}

// Count returns the connection numbers in current
func (dcs *defaultConnectionServer) Count() int64 {
return atomic.LoadInt64(&dcs.count)
}

// Reset reset the connection service status
func (dcs *defaultConnectionServer) Reset() {
atomic.StoreInt64(&dcs.count, 0)
}

// SessionID returns the session id, (snowflake impl)
func (dcs *defaultConnectionServer) SessionID() int64 {
return dcs.node.Generate().Int64()
}
31 changes: 31 additions & 0 deletions service/default_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package service

import (
"testing"
)

func TestNewDefaultConnectionServer(t *testing.T) {
service := newDefaultConnectionServer(101024)
w := make(chan bool, paraCount)
sidChan := make(chan int64, paraCount)
for i := 0; i < paraCount; i++ {
go func() {
service.Increment()
w <- true
sidChan <- service.SessionID()
}()
}
smap := make(map[int64]struct{}, paraCount)
for i := 0; i < paraCount; i++ {
<-w
sid := <-sidChan
if _, ok := smap[sid]; ok {
t.Error("wrong session id repeat")
} else {
smap[sid] = struct{}{}
}
}
if service.Count() != paraCount {
t.Error("wrong connection count")
}
}