diff --git a/app.go b/app.go deleted file mode 100644 index 7cc9abe3..00000000 --- a/app.go +++ /dev/null @@ -1,109 +0,0 @@ -// Copyright (c) nano Authors. All Rights Reserved. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in all -// copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -// SOFTWARE. - -package nano - -import ( - "fmt" - "os" - "os/signal" - "sync/atomic" - "syscall" - "time" - - "github.com/lonng/nano/cluster" - "github.com/lonng/nano/component" - "github.com/lonng/nano/internal/env" - "github.com/lonng/nano/internal/log" - "github.com/lonng/nano/internal/runtime" - "github.com/lonng/nano/scheduler" -) - -var running int32 - -func run(addr string, isWs bool, certificate string, key string, opts ...Option) { - if atomic.AddInt32(&running, 1) != 1 { - log.Println("Nano has running") - return - } - - opt := &options{ - components: &component.Components{}, - } - for _, option := range opts { - option(opt) - } - - // Use listen address as client address in non-cluster mode - if !opt.isMaster && opt.advertiseAddr == "" && opt.clientAddr == "" { - log.Println("The current server running in singleton mode") - opt.clientAddr = addr - } - - // Set the retry interval to 3 secondes if doesn't set by user - if opt.retryInterval == 0 { - opt.retryInterval = time.Second * 3 - } - - node := &cluster.Node{ - Label: opt.label, - IsMaster: opt.isMaster, - AdvertiseAddr: opt.advertiseAddr, - RetryInterval: opt.retryInterval, - ClientAddr: opt.clientAddr, - ServiceAddr: addr, - Components: opt.components, - IsWebsocket: isWs, - TSLCertificate: certificate, - TSLKey: key, - Pipeline: opt.pipeline, - } - err := node.Startup() - if err != nil { - log.Fatalf("Node startup failed: %v", err) - } - runtime.CurrentNode = node - - if node.ClientAddr != "" { - log.Println(fmt.Sprintf("Startup *Nano gate server* %s, client address: %v, service address: %s", - app.name, node.ClientAddr, node.ServiceAddr)) - } else { - log.Println(fmt.Sprintf("Startup *Nano backend server* %s, service address %s", - app.name, node.ServiceAddr)) - } - - go scheduler.Sched() - sg := make(chan os.Signal) - signal.Notify(sg, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGKILL, syscall.SIGTERM) - - select { - case <-env.Die: - log.Println("The app will shutdown in a few seconds") - case s := <-sg: - log.Println("Nano server got signal", s) - } - - log.Println("Nano server is stopping...") - - node.Shutdown() - runtime.CurrentNode = nil - scheduler.Close() - atomic.StoreInt32(&running, 0) -} diff --git a/cluster/node.go b/cluster/node.go index 952768e1..6a7b797a 100644 --- a/cluster/node.go +++ b/cluster/node.go @@ -42,22 +42,26 @@ import ( "google.golang.org/grpc" ) -// Node represents a node in nano cluster, which will contains a group of services. -// All services will register to cluster and messages will be forwarded to the node -// which provides respective service -type Node struct { - Label string - IsMaster bool // indicate if the current node is master - IsGate bool // indicate if the current node is gate - AdvertiseAddr string // master server service address +// Options contains some configurations for current node +type Options struct { + Pipeline pipeline.Pipeline + IsMaster bool + AdvertiseAddr string RetryInterval time.Duration ClientAddr string - ServiceAddr string Components *component.Components + Label string IsWebsocket bool TSLCertificate string TSLKey string - Pipeline pipeline.Pipeline +} + +// Node represents a node in nano cluster, which will contains a group of services. +// All services will register to cluster and messages will be forwarded to the node +// which provides respective service +type Node struct { + Options // current node options + ServiceAddr string // current server service address (RPC) cluster *cluster handler *LocalHandler diff --git a/examples/cluster/main.go b/examples/cluster/main.go index b48a68ad..8809d27b 100644 --- a/examples/cluster/main.go +++ b/examples/cluster/main.go @@ -137,13 +137,14 @@ func runGate(args *cli.Context) error { log.Println("Remote master server address", masterAddr) // Startup Nano server with the specified listen address - nano.ListenWS(listen, + nano.Listen(listen, nano.WithAdvertiseAddr(masterAddr), nano.WithClientAddr(gateAddr), nano.WithComponents(gate.Services), nano.WithSerializer(json.NewSerializer()), - nano.WithCheckOriginFunc(func(_ *http.Request) bool { return true }), + nano.WithIsWebsocket(true), nano.WithWSPath("/nano"), + nano.WithCheckOriginFunc(func(_ *http.Request) bool { return true }), nano.WithDebugMode(), ) return nil diff --git a/interface.go b/interface.go index 83dd46bb..41aa307b 100644 --- a/interface.go +++ b/interface.go @@ -21,37 +21,92 @@ package nano import ( + "fmt" + "os" + "os/signal" + "sync/atomic" + "syscall" + "time" + + "github.com/lonng/nano/cluster" + "github.com/lonng/nano/component" "github.com/lonng/nano/internal/env" "github.com/lonng/nano/internal/log" + "github.com/lonng/nano/internal/message" + "github.com/lonng/nano/internal/runtime" + "github.com/lonng/nano/scheduler" ) +var running int32 + +// Message is the alias of `message.Message` +type Message = message.Message + // Listen listens on the TCP network address addr // and then calls Serve with handler to handle requests // on incoming connections. func Listen(addr string, opts ...Option) { - run(addr, false, "", "", opts...) -} + if atomic.AddInt32(&running, 1) != 1 { + log.Println("Nano has running") + return + } -// ListenWS listens on the TCP network address addr -// and then upgrades the HTTP server connection to the WebSocket protocol -// to handle requests on incoming connections. -func ListenWS(addr string, opts ...Option) { - run(addr, true, "", "", opts...) -} + opt := cluster.Options{ + Components: &component.Components{}, + } + for _, option := range opts { + option(&opt) + } + + // Use listen address as client address in non-cluster mode + if !opt.IsMaster && opt.AdvertiseAddr == "" && opt.ClientAddr == "" { + log.Println("The current server running in singleton mode") + opt.ClientAddr = addr + } + + // Set the retry interval to 3 secondes if doesn't set by user + if opt.RetryInterval == 0 { + opt.RetryInterval = time.Second * 3 + } -// ListenWS listens on the TCP network address addr -// and then upgrades the HTTP server connection to the WebSocket protocol -// to handle requests on incoming connections. -func ListenWSTLS(addr string, certificate string, key string, opts ...Option) { - run(addr, true, certificate, key, opts...) + node := &cluster.Node{ + Options: opt, + ServiceAddr: addr, + } + err := node.Startup() + if err != nil { + log.Fatalf("Node startup failed: %v", err) + } + runtime.CurrentNode = node + + if node.ClientAddr != "" { + log.Println(fmt.Sprintf("Startup *Nano gate server* %s, client address: %v, service address: %s", + app.name, node.ClientAddr, node.ServiceAddr)) + } else { + log.Println(fmt.Sprintf("Startup *Nano backend server* %s, service address %s", + app.name, node.ServiceAddr)) + } + + go scheduler.Sched() + sg := make(chan os.Signal) + signal.Notify(sg, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGKILL, syscall.SIGTERM) + + select { + case <-env.Die: + log.Println("The app will shutdown in a few seconds") + case s := <-sg: + log.Println("Nano server got signal", s) + } + + log.Println("Nano server is stopping...") + + node.Shutdown() + runtime.CurrentNode = nil + scheduler.Close() + atomic.StoreInt32(&running, 0) } // Shutdown send a signal to let 'nano' shutdown itself. func Shutdown() { close(env.Die) } - -// SetLogger rewrites the default logger -func SetLogger(l log.Logger) { - log.SetLogger(l) -} diff --git a/options.go b/options.go index 7adcf26c..c9996c91 100644 --- a/options.go +++ b/options.go @@ -4,41 +4,31 @@ import ( "net/http" "time" + "github.com/lonng/nano/cluster" "github.com/lonng/nano/component" "github.com/lonng/nano/internal/env" + "github.com/lonng/nano/internal/log" "github.com/lonng/nano/internal/message" "github.com/lonng/nano/pipeline" "github.com/lonng/nano/serialize" "google.golang.org/grpc" ) -type ( - options struct { - pipeline pipeline.Pipeline - isMaster bool - advertiseAddr string - retryInterval time.Duration - clientAddr string - components *component.Components - label string - } - - Option func(*options) -) +type Option func(*cluster.Options) func WithPipeline(pipeline pipeline.Pipeline) Option { - return func(opt *options) { - opt.pipeline = pipeline + return func(opt *cluster.Options) { + opt.Pipeline = pipeline } } // WithAdvertiseAddr sets the advertise address option, it will be the listen address in // master node and an advertise address which cluster member to connect func WithAdvertiseAddr(addr string, retryInterval ...time.Duration) Option { - return func(opt *options) { - opt.advertiseAddr = addr + return func(opt *cluster.Options) { + opt.AdvertiseAddr = addr if len(retryInterval) > 0 { - opt.retryInterval = retryInterval[0] + opt.RetryInterval = retryInterval[0] } } } @@ -47,74 +37,74 @@ func WithAdvertiseAddr(addr string, retryInterval ...time.Duration) Option { // cluster members. Will select an available port automatically if no member address // setting and panic if no available port func WithClientAddr(addr string) Option { - return func(opt *options) { - opt.clientAddr = addr + return func(opt *cluster.Options) { + opt.ClientAddr = addr } } // WithMaster sets the option to indicate whether the current node is master node func WithMaster() Option { - return func(opt *options) { - opt.isMaster = true + return func(opt *cluster.Options) { + opt.IsMaster = true } } // WithGrpcOptions sets the grpc dial options func WithGrpcOptions(opts ...grpc.DialOption) Option { - return func(_ *options) { + return func(_ *cluster.Options) { env.GrpcOptions = append(env.GrpcOptions, opts...) } } -// WithComponents sets the components +// WithComponents sets the Components func WithComponents(components *component.Components) Option { - return func(opt *options) { - opt.components = components + return func(opt *cluster.Options) { + opt.Components = components } } // WithHeartbeatInterval sets Heartbeat time interval func WithHeartbeatInterval(d time.Duration) Option { - return func(_ *options) { + return func(_ *cluster.Options) { env.Heartbeat = d } } // WithCheckOriginFunc sets the function that check `Origin` in http headers func WithCheckOriginFunc(fn func(*http.Request) bool) Option { - return func(opt *options) { + return func(opt *cluster.Options) { env.CheckOrigin = fn } } // WithDebugMode let 'nano' to run under Debug mode. func WithDebugMode() Option { - return func(_ *options) { + return func(_ *cluster.Options) { env.Debug = true } } // SetDictionary sets routes map func WithDictionary(dict map[string]uint16) Option { - return func(_ *options) { + return func(_ *cluster.Options) { message.SetDictionary(dict) } } func WithWSPath(path string) Option { - return func(_ *options) { + return func(_ *cluster.Options) { env.WSPath = path } } -// SetTimerPrecision set the ticker precision, and time precision can not less +// SetTimerPrecision sets the ticker precision, and time precision can not less // than a Millisecond, and can not change after application running. The default // precision is time.Second func WithTimerPrecision(precision time.Duration) Option { if precision < time.Millisecond { panic("time precision can not less than a Millisecond") } - return func(_ *options) { + return func(_ *cluster.Options) { env.TimerPrecision = precision } } @@ -122,13 +112,36 @@ func WithTimerPrecision(precision time.Duration) Option { // WithSerializer customizes application serializer, which automatically Marshal // and UnMarshal handler payload func WithSerializer(serializer serialize.Serializer) Option { - return func(opt *options) { + return func(opt *cluster.Options) { env.Serializer = serializer } } +// WithLabel sets the current node label in cluster func WithLabel(label string) Option { - return func(opt *options) { - opt.label = label + return func(opt *cluster.Options) { + opt.Label = label + } +} + +// WithIsWebsocket indicates whether current node WebSocket is enabled +func WithIsWebsocket(enableWs bool) Option { + return func(opt *cluster.Options) { + opt.IsWebsocket = enableWs + } +} + +// WithTSLConfig sets the `key` and `certificate` of TSL +func WithTSLConfig(certificate, key string) Option { + return func(opt *cluster.Options) { + opt.TSLCertificate = certificate + opt.TSLKey = key + } +} + +// WithLogger overrides the default logger +func WithLogger(l log.Logger) Option { + return func(opt *cluster.Options) { + log.SetLogger(l) } }