diff --git a/main.go b/main.go index e56318c71c6..6a1c1a36d2d 100644 --- a/main.go +++ b/main.go @@ -50,6 +50,7 @@ Cluster Options: --routes Routes to solicit and connect --cluster Cluster URL for solicited routes --no_advertise Advertise known cluster IPs to clients + --connect_retries For implicit routes, number of connect retries Common Options: @@ -110,6 +111,7 @@ func main() { flag.StringVar(&opts.Cluster.ListenStr, "cluster", "", "Cluster url from which members can solicit routes.") flag.StringVar(&opts.Cluster.ListenStr, "cluster_listen", "", "Cluster url from which members can solicit routes.") flag.BoolVar(&opts.Cluster.NoAdvertise, "no_advertise", false, "Advertise known cluster IPs to clients.") + flag.IntVar(&opts.Cluster.ConnectRetries, "connect_retries", 0, "For implicit routes, number of connect retries") flag.BoolVar(&showTLSHelp, "help_tls", false, "TLS help.") flag.BoolVar(&opts.TLS, "tls", false, "Enable TLS.") flag.BoolVar(&opts.TLSVerify, "tlsverify", false, "Enable TLS with client verification.") diff --git a/server/configs/cluster.conf b/server/configs/cluster.conf index 3ce5ece894c..cc134fef902 100644 --- a/server/configs/cluster.conf +++ b/server/configs/cluster.conf @@ -32,4 +32,6 @@ cluster { nats-route://foo:bar@localhost:4246 ] + no_advertise: true + connect_retries: 2 } diff --git a/server/opts.go b/server/opts.go index 455d496eb3f..a02962cdbe3 100644 --- a/server/opts.go +++ b/server/opts.go @@ -33,15 +33,16 @@ type Permissions struct { // Options for clusters. type ClusterOpts struct { - Host string `json:"addr"` - Port int `json:"cluster_port"` - Username string `json:"-"` - Password string `json:"-"` - AuthTimeout float64 `json:"auth_timeout"` - TLSTimeout float64 `json:"-"` - TLSConfig *tls.Config `json:"-"` - ListenStr string `json:"-"` - NoAdvertise bool `json:"-"` + Host string `json:"addr"` + Port int `json:"cluster_port"` + Username string `json:"-"` + Password string `json:"-"` + AuthTimeout float64 `json:"auth_timeout"` + TLSTimeout float64 `json:"-"` + TLSConfig *tls.Config `json:"-"` + ListenStr string `json:"-"` + NoAdvertise bool `json:"-"` + ConnectRetries int `json:"-"` } // Options block for gnatsd server. @@ -314,6 +315,8 @@ func parseCluster(cm map[string]interface{}, opts *Options) error { opts.Cluster.TLSTimeout = tc.Timeout case "no_advertise": opts.Cluster.NoAdvertise = mv.(bool) + case "connect_retries": + opts.Cluster.ConnectRetries = int(mv.(int64)) } } return nil @@ -647,6 +650,9 @@ func MergeOptions(fileOpts, flagOpts *Options) *Options { if flagOpts.Cluster.NoAdvertise { opts.Cluster.NoAdvertise = true } + if flagOpts.Cluster.ConnectRetries != 0 { + opts.Cluster.ConnectRetries = flagOpts.Cluster.ConnectRetries + } if flagOpts.RoutesStr != "" { mergeRoutes(&opts, flagOpts) } diff --git a/server/opts_test.go b/server/opts_test.go index 54ac3c2a018..a8302ef0be2 100644 --- a/server/opts_test.go +++ b/server/opts_test.go @@ -191,7 +191,8 @@ func TestMergeOverrides(t *testing.T) { PingInterval: 60 * time.Second, MaxPingsOut: 3, Cluster: ClusterOpts{ - NoAdvertise: true, + NoAdvertise: true, + ConnectRetries: 2, }, } fopts, err := ProcessConfigFile("./configs/test.conf") @@ -207,7 +208,8 @@ func TestMergeOverrides(t *testing.T) { HTTPPort: DEFAULT_HTTP_PORT, ProfPort: 6789, Cluster: ClusterOpts{ - NoAdvertise: true, + NoAdvertise: true, + ConnectRetries: 2, }, } merged := MergeOptions(fopts, opts) diff --git a/server/route.go b/server/route.go index f9e04343c8f..6e004bc16ed 100644 --- a/server/route.go +++ b/server/route.go @@ -689,18 +689,25 @@ func (s *Server) reConnectToRoute(rURL *url.URL, rtype RouteType) { func (s *Server) connectToRoute(rURL *url.URL, tryForEver bool) { defer s.grWG.Done() + attempts := 0 for s.isRunning() && rURL != nil { Debugf("Trying to connect to route on %s", rURL.Host) conn, err := net.DialTimeout("tcp", rURL.Host, DEFAULT_ROUTE_DIAL) if err != nil { Debugf("Error trying to connect to route: %v", err) + if !tryForEver { + if s.opts.Cluster.ConnectRetries <= 0 { + return + } + attempts++ + if attempts > s.opts.Cluster.ConnectRetries { + return + } + } select { case <-s.rcQuit: return case <-time.After(DEFAULT_ROUTE_CONNECT): - if !tryForEver { - return - } continue } } diff --git a/server/routes_test.go b/server/routes_test.go index 14cbeeb7167..e0f5f90d7b2 100644 --- a/server/routes_test.go +++ b/server/routes_test.go @@ -27,11 +27,13 @@ func TestRouteConfig(t *testing.T) { Password: "bella", AuthTimeout: 1.0, Cluster: ClusterOpts{ - Host: "127.0.0.1", - Port: 4244, - Username: "route_user", - Password: "top_secret", - AuthTimeout: 1.0, + Host: "127.0.0.1", + Port: 4244, + Username: "route_user", + Password: "top_secret", + AuthTimeout: 1.0, + NoAdvertise: true, + ConnectRetries: 2, }, LogFile: "/tmp/nats_cluster_test.log", PidFile: "/tmp/nats_cluster_test.pid", diff --git a/test/route_discovery_test.go b/test/route_discovery_test.go index 64b4434dd0f..5d33bd1daba 100644 --- a/test/route_discovery_test.go +++ b/test/route_discovery_test.go @@ -3,6 +3,7 @@ package test import ( + "bufio" "encoding/json" "fmt" "io/ioutil" @@ -638,3 +639,65 @@ func TestSeedReturnIPInInfo(t *testing.T) { t.Fatalf("Expected IP %s, got %s", s1, s2) } } + +func TestImplicitRouteRetry(t *testing.T) { + srvSeed, optsSeed := runSeedServer(t) + defer srvSeed.Shutdown() + + optsA := nextServerOpts(optsSeed) + optsA.Routes = server.RoutesFromStr(fmt.Sprintf("nats://%s:%d", optsSeed.Cluster.Host, optsSeed.Cluster.Port)) + optsA.Cluster.ConnectRetries = 5 + srvA := RunServer(optsA) + defer srvA.Shutdown() + + optsB := nextServerOpts(optsA) + rcb := createRouteConn(t, optsSeed.Cluster.Host, optsSeed.Cluster.Port) + defer rcb.Close() + rcbID := "ServerB" + routeBSend, routeBExpect := setupRouteEx(t, rcb, optsB, rcbID) + routeBExpect(infoRe) + // register ourselves via INFO + rbInfo := server.Info{ID: rcbID, Host: optsB.Cluster.Host, Port: optsB.Cluster.Port} + b, _ := json.Marshal(rbInfo) + infoJSON := fmt.Sprintf(server.InfoProto, b) + routeBSend(infoJSON) + routeBSend("PING\r\n") + routeBExpect(pongRe) + + // srvA should try to connect. Wait to make sure that it fails. + time.Sleep(1200 * time.Millisecond) + + // Setup a fake route listen for routeB + rbListen, err := net.Listen("tcp", fmt.Sprintf("%s:%d", optsB.Cluster.Host, optsB.Cluster.Port)) + if err != nil { + t.Fatalf("Error during listen: %v", err) + } + c, err := rbListen.Accept() + if err != nil { + t.Fatalf("Error during accept: %v", err) + } + defer c.Close() + + br := bufio.NewReaderSize(c, 32768) + // Consume CONNECT and INFO + for i := 0; i < 2; i++ { + c.SetReadDeadline(time.Now().Add(2 * time.Second)) + buf, _, err := br.ReadLine() + c.SetReadDeadline(time.Time{}) + if err != nil { + t.Fatalf("Error reading: %v", err) + } + if i == 0 { + continue + } + buf = buf[len("INFO "):] + info := &server.Info{} + if err := json.Unmarshal(buf, info); err != nil { + t.Fatalf("Error during unmarshal: %v", err) + } + // Check INFO is from server A. + if info.ID != srvA.ID() { + t.Fatalf("Expected CONNECT from %v, got CONNECT from %v", srvA.ID(), info.ID) + } + } +}