diff --git a/app.go b/app.go index c389dfd6..d967b594 100644 --- a/app.go +++ b/app.go @@ -51,7 +51,7 @@ func run(addr string, isWs bool, certificate string, key string, opts ...Option) } // Use listen address as client address in non-cluster mode - if opt.advertiseAddr == "" && opt.clientAddr == "" { + if !opt.isMaster && opt.advertiseAddr == "" && opt.clientAddr == "" { log.Println("The current server running in singleton mode") opt.clientAddr = addr } diff --git a/cluster/handler.go b/cluster/handler.go index 22b1ebc7..39cf5aae 100644 --- a/cluster/handler.go +++ b/cluster/handler.go @@ -370,7 +370,7 @@ func (h *LocalHandler) localProcess(handler *component.Handler, lastMid uint64, } args := []reflect.Value{handler.Receiver, reflect.ValueOf(session), reflect.ValueOf(data)} - scheduler.PushTask(func() { + task := func() { switch v := session.NetworkEntity().(type) { case *agent: v.lastMid = lastMid @@ -384,5 +384,31 @@ func (h *LocalHandler) localProcess(handler *component.Handler, lastMid uint64, log.Println(fmt.Sprintf("Service %s error: %+v", msg.Route, err)) } } - }) + } + + index := strings.LastIndex(msg.Route, ".") + if index < 0 { + log.Println(fmt.Sprintf("nano/handler: invalid route %s", msg.Route)) + return + } + + // A message can be dispatch to global thread or a user customized thread + service := msg.Route[:index] + if s, found := h.localServices[service]; found && s.SchedName != "" { + sched := session.Value(s.SchedName) + if sched == nil { + log.Println(fmt.Sprintf("nanl/handler: cannot found `schedular.LocalScheduler` by %s", s.SchedName)) + return + } + + local, ok := sched.(scheduler.LocalScheduler) + if !ok { + log.Println(fmt.Sprintf("nanl/handler: Type %T does not implement the `schedular.LocalScheduler` interface", + sched)) + return + } + local.Schedule(task) + } else { + scheduler.PushTask(task) + } } diff --git a/component/options.go b/component/options.go index af7a74f7..2c253319 100644 --- a/component/options.go +++ b/component/options.go @@ -22,8 +22,9 @@ package component type ( options struct { - name string // component name - nameFunc func(string) string // rename handler name + name string // component name + nameFunc func(string) string // rename handler name + schedName string // schedName name } // Option used to customize handler @@ -44,3 +45,10 @@ func WithNameFunc(fn func(string) string) Option { opt.nameFunc = fn } } + +// WithSchedulerName set the name of the service scheduler +func WithSchedulerName(name string) Option { + return func(opt *options) { + opt.schedName = name + } +} diff --git a/component/service.go b/component/service.go index b27d723e..f709632c 100644 --- a/component/service.go +++ b/component/service.go @@ -38,11 +38,12 @@ type ( // Service implements a specific service, some of it's methods will be // called when the correspond events is occurred. Service struct { - Name string // name of service - Type reflect.Type // type of the receiver - Receiver reflect.Value // receiver of methods for the service - Handlers map[string]*Handler // registered methods - Options options // options + Name string // name of service + Type reflect.Type // type of the receiver + Receiver reflect.Value // receiver of methods for the service + Handlers map[string]*Handler // registered methods + SchedName string // name of scheduler variable in session data + Options options // options } ) @@ -62,6 +63,7 @@ func NewService(comp Component, opts []Option) *Service { } else { s.Name = reflect.Indirect(s.Receiver).Type().Name() } + s.SchedName = s.Options.schedName return s } diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index 53c3f4a6..8900b334 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -35,6 +35,11 @@ const ( sessionCloseBacklog = 1 << 8 ) +// LocalScheduler schedules task to a customized goroutine +type LocalScheduler interface { + Schedule(Task) +} + type Task func() type Hook func()