diff --git a/node/node.go b/node/node.go index a9f767bf28f1d..8feeabe125bc3 100644 --- a/node/node.go +++ b/node/node.go @@ -324,47 +324,21 @@ func (n *Node) startIPC(apis []rpc.API) error { if n.ipcEndpoint == "" { return nil } - // Register all the APIs exposed by the services - handler := rpc.NewServer() - for _, api := range apis { - if err := handler.RegisterName(api.Namespace, api.Service); err != nil { - return err - } - n.log.Debug("IPC registered", "service", api.Service, "namespace", api.Namespace) - } - // All APIs registered, start the IPC listener - var ( - listener net.Listener - err error - ) - if listener, err = rpc.CreateIPCListener(n.ipcEndpoint); err != nil { + isClosed := func() bool { + n.lock.RLock() + defer n.lock.RUnlock() + return n.ipcListener == nil + } + + listener, handler, err := rpc.StartIPCEndpoint(isClosed, n.ipcEndpoint, apis) + if err != nil { return err } - go func() { - n.log.Info("IPC endpoint opened", "url", n.ipcEndpoint) - - for { - conn, err := listener.Accept() - if err != nil { - // Terminate if the listener was closed - n.lock.RLock() - closed := n.ipcListener == nil - n.lock.RUnlock() - if closed { - return - } - // Not closed, just some error; report and continue - n.log.Error("IPC accept failed", "err", err) - continue - } - log.Trace("Accepted RPC connection", "conn", conn.RemoteAddr()) - go handler.ServeCodec(rpc.NewCodec(conn), 0) - } - }() + // All listeners booted successfully n.ipcListener = listener n.ipcHandler = handler - + log.Info("IPC endpoint opened", "url", n.ipcEndpoint) return nil } @@ -388,30 +362,10 @@ func (n *Node) startHTTP(endpoint string, apis []rpc.API, modules []string, cors if endpoint == "" { return nil } - // Generate the whitelist based on the allowed modules - whitelist := make(map[string]bool) - for _, module := range modules { - whitelist[module] = true - } - // Register all the APIs exposed by the services - handler := rpc.NewServer() - for _, api := range apis { - if whitelist[api.Namespace] || (len(whitelist) == 0 && api.Public) { - if err := handler.RegisterName(api.Namespace, api.Service); err != nil { - return err - } - n.log.Debug("HTTP registered", "service", api.Service, "namespace", api.Namespace) - } - } - // All APIs registered, start the HTTP listener - var ( - listener net.Listener - err error - ) - if listener, err = net.Listen("tcp", endpoint); err != nil { + listener, handler, err := rpc.StartHTTPEndpoint(endpoint, apis, modules, cors, vhosts, n.config.HTTPWriteTimeout) + if err != nil { return err } - go rpc.NewHTTPServer(cors, vhosts, handler, n.config.HTTPWriteTimeout).Serve(listener) n.log.Info("HTTP endpoint opened", "url", fmt.Sprintf("http://%s", endpoint), "cors", strings.Join(cors, ","), "vhosts", strings.Join(vhosts, ",")) // All listeners booted successfully n.httpEndpoint = endpoint @@ -441,32 +395,11 @@ func (n *Node) startWS(endpoint string, apis []rpc.API, modules []string, wsOrig if endpoint == "" { return nil } - // Generate the whitelist based on the allowed modules - whitelist := make(map[string]bool) - for _, module := range modules { - whitelist[module] = true - } - // Register all the APIs exposed by the services - handler := rpc.NewServer() - for _, api := range apis { - if exposeAll || whitelist[api.Namespace] || (len(whitelist) == 0 && api.Public) { - if err := handler.RegisterName(api.Namespace, api.Service); err != nil { - return err - } - n.log.Debug("WebSocket registered", "service", api.Service, "namespace", api.Namespace) - } - } - // All APIs registered, start the HTTP listener - var ( - listener net.Listener - err error - ) - if listener, err = net.Listen("tcp", endpoint); err != nil { + listener, handler, err := rpc.StartWSEndpoint(endpoint, apis, modules, wsOrigins, exposeAll) + if err != nil { return err } - go rpc.NewWSServer(wsOrigins, handler).Serve(listener) n.log.Info("WebSocket endpoint opened", "url", fmt.Sprintf("ws://%s", listener.Addr())) - // All listeners booted successfully n.wsEndpoint = endpoint n.wsListener = listener diff --git a/rpc/endpoints.go b/rpc/endpoints.go index 1f5770a45d506..c3d828527170d 100644 --- a/rpc/endpoints.go +++ b/rpc/endpoints.go @@ -19,12 +19,72 @@ package rpc import ( "net" "strings" + "time" "github.com/XinFinOrg/XDPoSChain/log" ) +// StartHTTPEndpoint starts the HTTP RPC endpoint, configured with cors/vhosts/modules +func StartHTTPEndpoint(endpoint string, apis []API, modules []string, cors []string, vhosts []string, timeout time.Duration) (net.Listener, *Server, error) { + // Generate the whitelist based on the allowed modules + whitelist := make(map[string]bool) + for _, module := range modules { + whitelist[module] = true + } + // Register all the APIs exposed by the services + handler := NewServer() + for _, api := range apis { + if whitelist[api.Namespace] || (len(whitelist) == 0 && api.Public) { + if err := handler.RegisterName(api.Namespace, api.Service); err != nil { + return nil, nil, err + } + log.Debug("HTTP registered", "namespace", api.Namespace) + } + } + // All APIs registered, start the HTTP listener + var ( + listener net.Listener + err error + ) + if listener, err = net.Listen("tcp", endpoint); err != nil { + return nil, nil, err + } + go NewHTTPServer(cors, vhosts, handler, timeout).Serve(listener) + return listener, handler, err +} + +// StartWSEndpoint starts a websocket endpoint +func StartWSEndpoint(endpoint string, apis []API, modules []string, wsOrigins []string, exposeAll bool) (net.Listener, *Server, error) { + // Generate the whitelist based on the allowed modules + whitelist := make(map[string]bool) + for _, module := range modules { + whitelist[module] = true + } + // Register all the APIs exposed by the services + handler := NewServer() + for _, api := range apis { + if exposeAll || whitelist[api.Namespace] || (len(whitelist) == 0 && api.Public) { + if err := handler.RegisterName(api.Namespace, api.Service); err != nil { + return nil, nil, err + } + log.Debug("WebSocket registered", "service", api.Service, "namespace", api.Namespace) + } + } + // All APIs registered, start the HTTP listener + var ( + listener net.Listener + err error + ) + if listener, err = net.Listen("tcp", endpoint); err != nil { + return nil, nil, err + } + go NewWSServer(wsOrigins, handler).Serve(listener) + return listener, handler, err + +} + // StartIPCEndpoint starts an IPC endpoint. -func StartIPCEndpoint(ipcEndpoint string, apis []API) (net.Listener, *Server, error) { +func StartIPCEndpoint(isClosedFn func() bool, ipcEndpoint string, apis []API) (net.Listener, *Server, error) { // Register all the APIs exposed by the services. var ( handler = NewServer() @@ -36,6 +96,7 @@ func StartIPCEndpoint(ipcEndpoint string, apis []API) (net.Listener, *Server, er log.Info("IPC registration failed", "namespace", api.Namespace, "error", err) return nil, nil, err } + log.Debug("IPC registered", "service", api.Service, "namespace", api.Namespace) if _, ok := regMap[api.Namespace]; !ok { registered = append(registered, api.Namespace) regMap[api.Namespace] = struct{}{} @@ -43,10 +104,30 @@ func StartIPCEndpoint(ipcEndpoint string, apis []API) (net.Listener, *Server, er } log.Debug("IPCs registered", "namespaces", strings.Join(registered, ",")) // All APIs registered, start the IPC listener. - listener, err := ipcListen(ipcEndpoint) - if err != nil { + var ( + listener net.Listener + err error + ) + if listener, err = CreateIPCListener(ipcEndpoint); err != nil { return nil, nil, err } - go handler.ServeListener(listener) + go func() { + for { + conn, err := listener.Accept() + if err != nil { + // Terminate if the listener was closed + if isClosedFn() { + log.Info("IPC closed", "err", err) + return + } + // Not closed, just some error; report and continue + log.Error("IPC accept failed", "err", err) + continue + } + log.Trace("Accepted RPC connection", "conn", conn.RemoteAddr()) + go handler.ServeCodec(NewCodec(conn), 0) + } + }() + return listener, handler, nil }