diff --git a/server/client.go b/server/client.go index b5a64012657..706ea7d918f 100644 --- a/server/client.go +++ b/server/client.go @@ -87,6 +87,7 @@ func (cf *clientFlag) clear(c clientFlag) { type client struct { // Here first because of use of atomics, and memory alignment. stats + mpay int64 mu sync.Mutex typ int cid uint64 @@ -94,7 +95,6 @@ type client struct { opts clientOpts start time.Time nc net.Conn - mpay int ncs string bw *bufio.Writer srv *Server @@ -524,8 +524,8 @@ func (c *client) maxConnExceeded() { c.closeConnection() } -func (c *client) maxPayloadViolation(sz int) { - c.Errorf("%s: %d vs %d", ErrMaxPayload.Error(), sz, c.mpay) +func (c *client) maxPayloadViolation(sz int, max int64) { + c.Errorf("%s: %d vs %d", ErrMaxPayload.Error(), sz, max) c.sendErr("Maximum Payload Violation") c.closeConnection() } @@ -712,8 +712,9 @@ func (c *client) processPub(arg []byte) error { if c.pa.size < 0 { return fmt.Errorf("processPub Bad or Missing Size: '%s'", arg) } - if c.mpay > 0 && c.pa.size > c.mpay { - c.maxPayloadViolation(c.pa.size) + maxPayload := atomic.LoadInt64(&c.mpay) + if maxPayload > 0 && int64(c.pa.size) > maxPayload { + c.maxPayloadViolation(c.pa.size, maxPayload) return ErrMaxPayload } diff --git a/server/configs/reload/max_connections.conf b/server/configs/reload/max_connections.conf new file mode 100644 index 00000000000..c791761632e --- /dev/null +++ b/server/configs/reload/max_connections.conf @@ -0,0 +1,6 @@ +# Copyright 2017 Apcera Inc. All rights reserved. + +listen: localhost:-1 +log_file: "/tmp/gnatsd.log" + +max_connections: 1 diff --git a/server/configs/reload/max_payload.conf b/server/configs/reload/max_payload.conf new file mode 100644 index 00000000000..d1eed42462c --- /dev/null +++ b/server/configs/reload/max_payload.conf @@ -0,0 +1,6 @@ +# Copyright 2017 Apcera Inc. All rights reserved. 
+ +listen: localhost:-1 +log_file: "/tmp/gnatsd.log" + +max_payload: 1 diff --git a/server/configs/reload/reload.conf b/server/configs/reload/reload.conf index 4ed067094ae..b24c2a42297 100644 --- a/server/configs/reload/reload.conf +++ b/server/configs/reload/reload.conf @@ -1,10 +1,19 @@ # Copyright 2017 Apcera Inc. All rights reserved. # logging options -debug: true # enable on reload -trace: true # enable on reload -logtime: false -log_file: "/tmp/gnatsd.log" +debug: true # enable on reload +trace: true # enable on reload +logtime: true # enable on reload +syslog: true # enable on reload +remote_syslog: "udp://localhost:514" # change on reload +log_file: "/tmp/gnatsd-2.log" # change on reload + +pid_file: "/tmp/gnatsd.pid" # change on reload +max_control_line: 512 # change on reload +ping_interval: 5 # change on reload +ping_max: 1 # change on reload +write_deadline: "2s" # change on reload +max_payload: 1024 # change on reload # Enable TLS on reload tls { diff --git a/server/reload.go b/server/reload.go index c14da1c1901..3d8479617ca 100644 --- a/server/reload.go +++ b/server/reload.go @@ -9,6 +9,8 @@ import ( "net/url" "reflect" "strings" + "sync/atomic" + "time" ) // FlagSnapshot captures the server options as specified by CLI flags at @@ -45,8 +47,7 @@ type traceOption struct { newValue bool } -// Apply is a no-op because authorization will be reloaded after options are -// applied +// Apply is a no-op because logging will be reloaded after options are applied. func (t *traceOption) Apply(server *Server) { server.Noticef("Reloaded: trace = %v", t.newValue) } @@ -57,12 +58,56 @@ type debugOption struct { newValue bool } -// Apply is a no-op because authorization will be reloaded after options are -// applied +// Apply is a no-op because logging will be reloaded after options are applied. 
func (d *debugOption) Apply(server *Server) { server.Noticef("Reloaded: debug = %v", d.newValue) } +// logtimeOption implements the option interface for the `logtime` setting. +type logtimeOption struct { + loggingOption + newValue bool +} + +// Apply is a no-op because logging will be reloaded after options are applied. +func (l *logtimeOption) Apply(server *Server) { + server.Noticef("Reloaded: logtime = %v", l.newValue) +} + +// logfileOption implements the option interface for the `log_file` setting. +type logfileOption struct { + loggingOption + newValue string +} + +// Apply is a no-op because logging will be reloaded after options are applied. +func (l *logfileOption) Apply(server *Server) { + server.Noticef("Reloaded: log_file = %v", l.newValue) +} + +// syslogOption implements the option interface for the `syslog` setting. +type syslogOption struct { + loggingOption + newValue bool +} + +// Apply is a no-op because logging will be reloaded after options are applied. +func (s *syslogOption) Apply(server *Server) { + server.Noticef("Reloaded: syslog = %v", s.newValue) +} + +// remoteSyslogOption implements the option interface for the `remote_syslog` +// setting. +type remoteSyslogOption struct { + loggingOption + newValue string +} + +// Apply is a no-op because logging will be reloaded after options are applied. +func (r *remoteSyslogOption) Apply(server *Server) { + server.Noticef("Reloaded: remote_syslog = %v", r.newValue) +} + // noopOption is a base struct that provides default no-op behaviors. type noopOption struct{} @@ -236,6 +281,134 @@ func (r *routesOption) Apply(server *Server) { server.Noticef("Reloaded: cluster routes") } +// maxConnOption implements the option interface for the `max_connections` +// setting. +type maxConnOption struct { + noopOption + newValue int +} + +// Apply the max connections change by closing random connections til we are +// below the limit if necessary. 
+func (m *maxConnOption) Apply(server *Server) { + server.mu.Lock() + var ( + clients = make([]*client, len(server.clients)) + i = 0 + ) + // Map iteration is random, which allows us to close random connections. + for _, client := range server.clients { + clients[i] = client + i++ + } + server.mu.Unlock() + + if m.newValue > 0 && len(clients) > m.newValue { + // Close connections til we are within the limit. + var ( + numClose = len(clients) - m.newValue + closed = 0 + ) + for _, client := range clients { + client.maxConnExceeded() + closed++ + if closed >= numClose { + break + } + } + server.Noticef("Closed %d connections to fall within max_connections", closed) + } + server.Noticef("Reloaded: max_connections = %v", m.newValue) +} + +// pidFileOption implements the option interface for the `pid_file` setting. +type pidFileOption struct { + noopOption + newValue string +} + +// Apply the setting by logging the pid to the new file. +func (p *pidFileOption) Apply(server *Server) { + if p.newValue == "" { + return + } + if err := server.logPid(); err != nil { + server.Errorf("Failed to write pidfile: %v", err) + } + server.Noticef("Reloaded: pid_file = %v", p.newValue) +} + +// maxControlLineOption implements the option interface for the +// `max_control_line` setting. +type maxControlLineOption struct { + noopOption + newValue int +} + +// Apply is a no-op because the max control line will be reloaded after options +// are applied +func (m *maxControlLineOption) Apply(server *Server) { + server.Noticef("Reloaded: max_control_line = %d", m.newValue) +} + +// maxPayloadOption implements the option interface for the `max_payload` +// setting. +type maxPayloadOption struct { + noopOption + newValue int +} + +// Apply the setting by updating the server info and each client. 
+func (m *maxPayloadOption) Apply(server *Server) {
+	server.mu.Lock()
+	server.info.MaxPayload = m.newValue
+	server.generateServerInfoJSON()
+	for _, client := range server.clients {
+		atomic.StoreInt64(&client.mpay, int64(m.newValue))
+	}
+	server.mu.Unlock()
+	server.Noticef("Reloaded: max_payload = %d", m.newValue)
+}
+
+// pingIntervalOption implements the option interface for the `ping_interval`
+// setting.
+type pingIntervalOption struct {
+	noopOption
+	newValue time.Duration
+}
+
+// Apply is a no-op because the ping interval will be reloaded after options
+// are applied.
+func (p *pingIntervalOption) Apply(server *Server) {
+	server.Noticef("Reloaded: ping_interval = %s", p.newValue)
+}
+
+// maxPingsOutOption implements the option interface for the `ping_max`
+// setting.
+type maxPingsOutOption struct {
+	noopOption
+	newValue int
+}
+
+// Apply is a no-op because the max pings out will be reloaded after options
+// are applied.
+func (m *maxPingsOutOption) Apply(server *Server) {
+	server.Noticef("Reloaded: ping_max = %d", m.newValue)
+}
+
+// writeDeadlineOption implements the option interface for the `write_deadline`
+// setting.
+type writeDeadlineOption struct {
+	noopOption
+	newValue time.Duration
+}
+
+// Apply is a no-op because the write deadline will be reloaded after options
+// are applied.
+func (w *writeDeadlineOption) Apply(server *Server) {
+	server.Noticef("Reloaded: write_deadline = %s", w.newValue)
+}
+
 // Reload reads the current configuration file and applies any supported
 // changes. This returns an error if the server was not started with a config
 // file or an option which doesn't support hot-swapping was changed.
@@ -296,6 +469,14 @@ func (s *Server) diffOptions(newOpts *Options) ([]option, error) { diffOpts = append(diffOpts, &traceOption{newValue: newValue.(bool)}) case "debug": diffOpts = append(diffOpts, &debugOption{newValue: newValue.(bool)}) + case "logtime": + diffOpts = append(diffOpts, &logtimeOption{newValue: newValue.(bool)}) + case "logfile": + diffOpts = append(diffOpts, &logfileOption{newValue: newValue.(string)}) + case "syslog": + diffOpts = append(diffOpts, &syslogOption{newValue: newValue.(bool)}) + case "remotesyslog": + diffOpts = append(diffOpts, &remoteSyslogOption{newValue: newValue.(string)}) case "tlsconfig": diffOpts = append(diffOpts, &tlsOption{newValue: newValue.(*tls.Config)}) case "tlstimeout": @@ -319,6 +500,20 @@ func (s *Server) diffOptions(newOpts *Options) ([]option, error) { case "routes": add, remove := diffRoutes(oldValue.([]*url.URL), newValue.([]*url.URL)) diffOpts = append(diffOpts, &routesOption{add: add, remove: remove}) + case "maxconn": + diffOpts = append(diffOpts, &maxConnOption{newValue: newValue.(int)}) + case "pidfile": + diffOpts = append(diffOpts, &pidFileOption{newValue: newValue.(string)}) + case "maxcontrolline": + diffOpts = append(diffOpts, &maxControlLineOption{newValue: newValue.(int)}) + case "maxpayload": + diffOpts = append(diffOpts, &maxPayloadOption{newValue: newValue.(int)}) + case "pinginterval": + diffOpts = append(diffOpts, &pingIntervalOption{newValue: newValue.(time.Duration)}) + case "maxpingsout": + diffOpts = append(diffOpts, &maxPingsOutOption{newValue: newValue.(int)}) + case "writedeadline": + diffOpts = append(diffOpts, &writeDeadlineOption{newValue: newValue.(time.Duration)}) case "nolog": // Ignore NoLog option since it's not parsed and only used in // testing. 
diff --git a/server/reload_test.go b/server/reload_test.go index 94c602ba258..33595503562 100644 --- a/server/reload_test.go +++ b/server/reload_test.go @@ -181,6 +181,18 @@ func TestConfigReload(t *testing.T) { if !updated.Debug { t.Fatal("Expected Debug to be true") } + if !updated.Logtime { + t.Fatal("Expected Logtime to be true") + } + if updated.LogFile != "/tmp/gnatsd-2.log" { + t.Fatalf("LogFile is incorrect.\nexpected: /tmp/gnatsd-2.log\ngot: %s", updated.LogFile) + } + if !updated.Syslog { + t.Fatal("Expected Syslog to be true") + } + if updated.RemoteSyslog != "udp://localhost:514" { + t.Fatalf("RemoteSyslog is incorrect.\nexpected: udp://localhost:514\ngot: %s", updated.RemoteSyslog) + } if updated.TLSConfig == nil { t.Fatal("Expected TLSConfig to be non-nil") } @@ -205,6 +217,24 @@ func TestConfigReload(t *testing.T) { if !updated.Cluster.NoAdvertise { t.Fatal("Expected NoAdvertise to be true") } + if updated.PidFile != "/tmp/gnatsd.pid" { + t.Fatalf("PidFile is incorrect.\nexpected: /tmp/gnatsd.pid\ngot: %s", updated.PidFile) + } + if updated.MaxControlLine != 512 { + t.Fatalf("MaxControlLine is incorrect.\nexpected: 512\ngot: %d", updated.MaxControlLine) + } + if updated.PingInterval != 5*time.Second { + t.Fatalf("PingInterval is incorrect.\nexpected 5s\ngot: %s", updated.PingInterval) + } + if updated.MaxPingsOut != 1 { + t.Fatalf("MaxPingsOut is incorrect.\nexpected 1\ngot: %d", updated.MaxPingsOut) + } + if updated.WriteDeadline != 2*time.Second { + t.Fatalf("WriteDeadline is incorrect.\nexpected 2s\ngot: %s", updated.WriteDeadline) + } + if updated.MaxPayload != 1024 { + t.Fatalf("MaxPayload is incorrect.\nexpected 1024\ngot: %d", updated.MaxPayload) + } } // Ensure Reload supports TLS config changes. Test this by starting a server @@ -1342,6 +1372,137 @@ func TestConfigReloadClusterRoutes(t *testing.T) { } } +// Ensure Reload supports changing the max connections. 
Test this by starting a +// server with no max connections, connecting two clients, reloading with a +// max connections of one, and ensuring one client is disconnected. +func TestConfigReloadMaxConnections(t *testing.T) { + server, opts, config := runServerWithSymlinkConfig(t, "tmp.conf", "./configs/reload/basic.conf") + defer os.Remove(config) + defer server.Shutdown() + + // Make two connections. + addr := fmt.Sprintf("nats://%s:%d", opts.Host, server.Addr().(*net.TCPAddr).Port) + nc1, err := nats.Connect(addr) + if err != nil { + t.Fatalf("Error creating client: %v", err) + } + defer nc1.Close() + closed := make(chan struct{}, 1) + nc1.SetDisconnectHandler(func(*nats.Conn) { + closed <- struct{}{} + }) + nc2, err := nats.Connect(addr) + if err != nil { + t.Fatalf("Error creating client: %v", err) + } + defer nc2.Close() + nc2.SetDisconnectHandler(func(*nats.Conn) { + closed <- struct{}{} + }) + + if numClients := server.NumClients(); numClients != 2 { + t.Fatalf("Expected 2 clients, got %d", numClients) + } + + // Set max connections to one. + if err := os.Remove(config); err != nil { + t.Fatalf("Error deleting symlink: %v", err) + } + if err := os.Symlink("./configs/reload/max_connections.conf", config); err != nil { + t.Fatalf("Error creating symlink: %v (ensure you have privileges)", err) + } + if err := server.Reload(); err != nil { + t.Fatalf("Error reloading config: %v", err) + } + + // Ensure one connection was closed. + select { + case <-closed: + case <-time.After(2 * time.Second): + t.Fatal("Expected to be disconnected") + } + + if numClients := server.NumClients(); numClients != 1 { + t.Fatalf("Expected 1 client, got %d", numClients) + } + + // Ensure new connections fail. + _, err = nats.Connect(addr) + if err == nil { + t.Fatal("Expected error on connect") + } +} + +// Ensure reload supports changing the max payload size. 
Test this by starting +// a server with the default size limit, ensuring publishes work, reloading +// with a restrictive limit, and ensuring publishing an oversized message fails +// and disconnects the client. +func TestConfigReloadMaxPayload(t *testing.T) { + server, opts, config := runServerWithSymlinkConfig(t, "tmp.conf", "./configs/reload/basic.conf") + defer os.Remove(config) + defer server.Shutdown() + + addr := fmt.Sprintf("nats://%s:%d", opts.Host, server.Addr().(*net.TCPAddr).Port) + nc, err := nats.Connect(addr) + if err != nil { + t.Fatalf("Error creating client: %v", err) + } + defer nc.Close() + closed := make(chan struct{}) + nc.SetDisconnectHandler(func(*nats.Conn) { + closed <- struct{}{} + }) + + conn, err := nats.Connect(addr) + if err != nil { + t.Fatalf("Error creating client: %v", err) + } + defer conn.Close() + sub, err := conn.SubscribeSync("foo") + if err != nil { + t.Fatalf("Error subscribing: %v", err) + } + conn.Flush() + + // Ensure we can publish as a sanity check. + if err := nc.Publish("foo", []byte("hello")); err != nil { + t.Fatalf("Error publishing: %v", err) + } + nc.Flush() + _, err = sub.NextMsg(2 * time.Second) + if err != nil { + t.Fatalf("Error receiving message: %v", err) + } + + // Set max payload to one. + if err := os.Remove(config); err != nil { + t.Fatalf("Error deleting symlink: %v", err) + } + if err := os.Symlink("./configs/reload/max_payload.conf", config); err != nil { + t.Fatalf("Error creating symlink: %v (ensure you have privileges)", err) + } + if err := server.Reload(); err != nil { + t.Fatalf("Error reloading config: %v", err) + } + + // Ensure oversized messages don't get delivered and the client is + // disconnected. 
+ if err := nc.Publish("foo", []byte("hello")); err != nil { + t.Fatalf("Error publishing: %v", err) + } + nc.Flush() + _, err = sub.NextMsg(20 * time.Millisecond) + if err != nats.ErrTimeout { + t.Fatalf("Expected ErrTimeout, got: %v", err) + } + + select { + case <-closed: + case <-time.After(2 * time.Second): + t.Fatal("Expected to be disconnected") + } +} + func runServerWithSymlinkConfig(t *testing.T, symlinkName, configName string) (*Server, *Options, string) { opts, config := newOptionsWithSymlinkConfig(t, symlinkName, configName) opts.NoLog = true diff --git a/server/server.go b/server/server.go index cec881e3768..d340c2de40f 100644 --- a/server/server.go +++ b/server/server.go @@ -224,12 +224,9 @@ func (s *Server) isRunning() bool { return s.running } -func (s *Server) logPid() { +func (s *Server) logPid() error { pidStr := strconv.Itoa(os.Getpid()) - err := ioutil.WriteFile(s.getOpts().PidFile, []byte(pidStr), 0660) - if err != nil { - PrintAndDie(fmt.Sprintf("Could not write pidfile: %v\n", err)) - } + return ioutil.WriteFile(s.getOpts().PidFile, []byte(pidStr), 0660) } // Start up the server, this will block. @@ -252,7 +249,9 @@ func (s *Server) Start() { // Log the pid to a file if opts.PidFile != _EMPTY_ { - s.logPid() + if err := s.logPid(); err != nil { + PrintAndDie(fmt.Sprintf("Could not write pidfile: %v\n", err)) + } } // Start monitoring if needed @@ -638,7 +637,10 @@ func (s *Server) HTTPHandler() http.Handler { } func (s *Server) createClient(conn net.Conn) *client { - c := &client{srv: s, nc: conn, opts: defaultOpts, mpay: s.info.MaxPayload, start: time.Now()} + // Snapshot server options. + opts := s.getOpts() + + c := &client{srv: s, nc: conn, opts: defaultOpts, mpay: int64(opts.MaxPayload), start: time.Now()} // Grab JSON info string s.mu.Lock() @@ -673,9 +675,6 @@ func (s *Server) createClient(conn net.Conn) *client { return c } - // Snapshot server options. 
- opts := s.getOpts() - // If there is a max connections specified, check that adding // this new client would not push us over the max if opts.MaxConn > 0 && len(s.clients) >= opts.MaxConn {