Skip to content

Commit

Permalink
cluster: move Options to cluster package
Browse files Browse the repository at this point in the history
Signed-off-by: Lonng <[email protected]>
  • Loading branch information
lonng committed Jul 4, 2019
1 parent 58cab73 commit 26cc845
Show file tree
Hide file tree
Showing 5 changed files with 139 additions and 175 deletions.
109 changes: 0 additions & 109 deletions app.go

This file was deleted.

24 changes: 14 additions & 10 deletions cluster/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,22 +42,26 @@ import (
"google.golang.org/grpc"
)

// Node represents a node in nano cluster, which will contains a group of services.
// All services will register to cluster and messages will be forwarded to the node
// which provides respective service
type Node struct {
Label string
IsMaster bool // indicate if the current node is master
IsGate bool // indicate if the current node is gate
AdvertiseAddr string // master server service address
// Options contains some configurations for current node
type Options struct {
Pipeline pipeline.Pipeline
IsMaster bool
AdvertiseAddr string
RetryInterval time.Duration
ClientAddr string
ServiceAddr string
Components *component.Components
Label string
IsWebsocket bool
TSLCertificate string
TSLKey string
Pipeline pipeline.Pipeline
}

// Node represents a node in nano cluster, which will contains a group of services.
// All services will register to cluster and messages will be forwarded to the node
// which provides respective service
type Node struct {
Options // current node options
ServiceAddr string // current server service address (RPC)

cluster *cluster
handler *LocalHandler
Expand Down
5 changes: 3 additions & 2 deletions examples/cluster/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,13 +137,14 @@ func runGate(args *cli.Context) error {
log.Println("Remote master server address", masterAddr)

// Startup Nano server with the specified listen address
nano.ListenWS(listen,
nano.Listen(listen,
nano.WithAdvertiseAddr(masterAddr),
nano.WithClientAddr(gateAddr),
nano.WithComponents(gate.Services),
nano.WithSerializer(json.NewSerializer()),
nano.WithCheckOriginFunc(func(_ *http.Request) bool { return true }),
nano.WithIsWebsocket(true),
nano.WithWSPath("/nano"),
nano.WithCheckOriginFunc(func(_ *http.Request) bool { return true }),
nano.WithDebugMode(),
)
return nil
Expand Down
91 changes: 73 additions & 18 deletions interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,37 +21,92 @@
package nano

import (
"fmt"
"os"
"os/signal"
"sync/atomic"
"syscall"
"time"

"github.com/lonng/nano/cluster"
"github.com/lonng/nano/component"
"github.com/lonng/nano/internal/env"
"github.com/lonng/nano/internal/log"
"github.com/lonng/nano/internal/message"
"github.com/lonng/nano/internal/runtime"
"github.com/lonng/nano/scheduler"
)

var running int32

// Message is the alias of `message.Message`
type Message = message.Message

// Listen listens on the TCP network address addr
// and then calls Serve with handler to handle requests
// on incoming connections.
func Listen(addr string, opts ...Option) {
run(addr, false, "", "", opts...)
}
if atomic.AddInt32(&running, 1) != 1 {
log.Println("Nano has running")
return
}

// ListenWS listens on the TCP network address addr
// and then upgrades the HTTP server connection to the WebSocket protocol
// to handle requests on incoming connections.
func ListenWS(addr string, opts ...Option) {
run(addr, true, "", "", opts...)
}
opt := cluster.Options{
Components: &component.Components{},
}
for _, option := range opts {
option(&opt)
}

// Use listen address as client address in non-cluster mode
if !opt.IsMaster && opt.AdvertiseAddr == "" && opt.ClientAddr == "" {
log.Println("The current server running in singleton mode")
opt.ClientAddr = addr
}

// Set the retry interval to 3 secondes if doesn't set by user
if opt.RetryInterval == 0 {
opt.RetryInterval = time.Second * 3
}

// ListenWS listens on the TCP network address addr
// and then upgrades the HTTP server connection to the WebSocket protocol
// to handle requests on incoming connections.
func ListenWSTLS(addr string, certificate string, key string, opts ...Option) {
run(addr, true, certificate, key, opts...)
node := &cluster.Node{
Options: opt,
ServiceAddr: addr,
}
err := node.Startup()
if err != nil {
log.Fatalf("Node startup failed: %v", err)
}
runtime.CurrentNode = node

if node.ClientAddr != "" {
log.Println(fmt.Sprintf("Startup *Nano gate server* %s, client address: %v, service address: %s",
app.name, node.ClientAddr, node.ServiceAddr))
} else {
log.Println(fmt.Sprintf("Startup *Nano backend server* %s, service address %s",
app.name, node.ServiceAddr))
}

go scheduler.Sched()
sg := make(chan os.Signal)
signal.Notify(sg, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGKILL, syscall.SIGTERM)

select {
case <-env.Die:
log.Println("The app will shutdown in a few seconds")
case s := <-sg:
log.Println("Nano server got signal", s)
}

log.Println("Nano server is stopping...")

node.Shutdown()
runtime.CurrentNode = nil
scheduler.Close()
atomic.StoreInt32(&running, 0)
}

// Shutdown send a signal to let 'nano' shutdown itself.
func Shutdown() {
close(env.Die)
}

// SetLogger rewrites the default logger
func SetLogger(l log.Logger) {
log.SetLogger(l)
}
Loading

0 comments on commit 26cc845

Please sign in to comment.