Skip to content

Commit

Permalink
cluster,example: propagate session closed event to backend servers
Browse files Browse the repository at this point in the history
  • Loading branch information
lonng committed Jun 30, 2019
1 parent dce2405 commit abfc3be
Show file tree
Hide file tree
Showing 8 changed files with 60 additions and 12 deletions.
17 changes: 17 additions & 0 deletions cluster/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,23 @@ func (h *LocalHandler) handle(conn net.Conn) {

// guarantee agent related resource be destroyed
defer func() {
remotes := agent.session.Router().Remote()
request := &clusterpb.SessionClosedRequest{
SessionId: agent.session.ID(),
}
for _, remote := range remotes {
pool, err := h.currentNode.rpcClient.getConnPool(remote)
if err != nil {
log.Println("Cannot retrieve connection pool for address", remote, err)
continue
}
client := clusterpb.NewMemberClient(pool.Get())
_, err = client.SessionClosed(context.Background(), request)
if err != nil {
log.Println("Cannot closed session in remote address", remote, err)
}
}

agent.Close()
if env.Debug {
log.Println(fmt.Sprintf("Session read goroutine exit, SessionID=%d, UID=%d", agent.session.ID(), agent.session.UID()))
Expand Down
10 changes: 4 additions & 6 deletions cluster/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,28 +382,26 @@ func (n *Node) DelMember(_ context.Context, req *clusterpb.DelMemberRequest) (*c
return &clusterpb.DelMemberResponse{}, nil
}

// SessionClosed implements the MemberServer interface
func (n *Node) SessionClosed(_ context.Context, req *clusterpb.SessionClosedRequest) (*clusterpb.SessionClosedResponse, error) {
n.mu.Lock()
s, found := n.sessions[req.SessionId]
delete(n.sessions, req.SessionId)
n.mu.Unlock()
if found {
scheduler.PushTask(func() {
session.Lifetime.Close(s)
})
scheduler.PushTask(func() { session.Lifetime.Close(s) })
}
return &clusterpb.SessionClosedResponse{}, nil
}

// CloseSession implements the MemberServer interface
func (n *Node) CloseSession(_ context.Context, req *clusterpb.CloseSessionRequest) (*clusterpb.CloseSessionResponse, error) {
n.mu.Lock()
s, found := n.sessions[req.SessionId]
delete(n.sessions, req.SessionId)
n.mu.Unlock()
if found {
scheduler.PushTask(func() {
session.Lifetime.Close(s)
})
s.Close()
}
return &clusterpb.CloseSessionResponse{}, nil
}
4 changes: 4 additions & 0 deletions examples/cluster/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/lonng/nano/examples/cluster/gate"
"github.com/lonng/nano/examples/cluster/master"
"github.com/lonng/nano/serialize/json"
"github.com/lonng/nano/session"
"github.com/pingcap/errors"
"github.com/urfave/cli"
)
Expand Down Expand Up @@ -101,6 +102,9 @@ func runMaster(args *cli.Context) error {
}
}()

// Register session closed callback
session.Lifetime.OnClosed(master.OnSessionClosed)

// Startup Nano server with the specified listen address
nano.Listen(listen,
nano.WithMaster(),
Expand Down
1 change: 0 additions & 1 deletion examples/cluster/master/http_api.go

This file was deleted.

9 changes: 8 additions & 1 deletion examples/cluster/master/init.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package master

import "github.com/lonng/nano/component"
import (
"github.com/lonng/nano/component"
"github.com/lonng/nano/session"
)

var (
// All services in master server
Expand All @@ -14,3 +17,7 @@ var (
func init() {
Services.Register(topicService)
}

func OnSessionClosed(s *session.Session) {
topicService.userDisconnected(s)
}
19 changes: 16 additions & 3 deletions examples/cluster/master/topic_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package master

import (
"fmt"
"log"

"github.com/lonng/nano"
"github.com/lonng/nano/component"
Expand Down Expand Up @@ -32,10 +33,12 @@ func newTopicService() *TopicService {
}

func (ts *TopicService) NewUser(s *session.Session, msg *protocol.NewUserRequest) error {
// exists users

ts.nextUid++
uid := ts.nextUid
if err := s.Bind(uid); err != nil {
return errors.Trace(err)
}

user := &User{
session: s,
nickname: msg.Nickname,
Expand All @@ -48,7 +51,7 @@ func (ts *TopicService) NewUser(s *session.Session, msg *protocol.NewUserRequest
Content: fmt.Sprintf("User user join: %v", msg.Nickname),
}
if err := ts.group.Broadcast("onNewUser", broadcast); err != nil {
return err
return errors.Trace(err)
}
return ts.group.Add(s)
}
Expand All @@ -60,3 +63,13 @@ type OpenTopicRequest struct {
func (ts *TopicService) OpenTopic(s *session.Session, msg *OpenTopicRequest) error {
return errors.Errorf("not implemented: %v", msg)
}

func (ts *TopicService) userDisconnected(s *session.Session) {
uid := s.UID()
delete(ts.users, uid)
if err := ts.group.Leave(s); err != nil {
log.Println("Remove user from group failed", s.UID(), err)
return
}
log.Println("User session disconnected", s.UID())
}
2 changes: 1 addition & 1 deletion examples/cluster/master/web/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
<li v-for="msg in messages">[<span style="color:red;">{{msg.name}}</span>]{{msg.content}}</li>
</ul>
<div class="controls">
<input type="text" v-model="nickname">
<text type="text" v-model="nickname">
<input type="text" v-model="inputMessage">
<input type="button" v-on:click="sendMessage" value="Send">
</div>
Expand Down
10 changes: 10 additions & 0 deletions session/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,13 @@ func (r *Router) Find(service string) (string, bool) {
}
return v.(string), true
}

// Remote returns all remote service that have been registered to router
func (r *Router) Remote() []string {
var addrs []string
r.routes.Range(func(_, value interface{}) bool {
addrs = append(addrs, value.(string))
return true
})
return addrs
}

0 comments on commit abfc3be

Please sign in to comment.