cache event fanout & reversetunnel improvements
- cache now performs in-memory fanout of events, eliminating spurious event generation due to cache init/reset (see the fanout sketch below).

- removed old unused logic from reversetunnel agents.

- replaced seekpool with a simpler ttl-cache and a semaphore-like lease system (see the lease sketch below).

- added jittered backoff to agent connection attempts to reduce the "thundering herd" effect (see the backoff sketch below).

- improved reversetunnel logging.

- improved LB usage in tests.
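
Fanout sketch: a minimal, illustrative version of the in-memory fanout idea. The cache applies each event to its local collections first and only then re-emits it, so a subscriber never observes an event before the corresponding cached value is readable. All names below are assumptions for illustration; this is not the actual services.Fanout implementation.

```go
// Package fanout sketches an in-memory event fanout; illustrative only.
package fanout

import "sync"

// Event stands in for services.Event.
type Event struct {
	Type     string
	Resource string
}

// Fanout re-emits cache events to in-memory subscribers, so watchers
// are served by the cache process instead of each opening its own
// watcher against the backend.
type Fanout struct {
	mu   sync.Mutex
	subs []chan Event
}

// Subscribe registers a new buffered subscriber channel.
func (f *Fanout) Subscribe() <-chan Event {
	f.mu.Lock()
	defer f.mu.Unlock()
	ch := make(chan Event, 64)
	f.subs = append(f.subs, ch)
	return ch
}

// Emit delivers an event to every subscriber without blocking the
// cache's event loop; a subscriber with a full buffer has the event
// dropped here for brevity, where real code would close that watcher
// and force it to resync.
func (f *Fanout) Emit(e Event) {
	f.mu.Lock()
	defer f.mu.Unlock()
	for _, ch := range f.subs {
		select {
		case ch <- e:
		default:
		}
	}
}

// CloseWatchers closes every subscriber, signaling that the cache has
// reset and all watchers must resynchronize.
func (f *Fanout) CloseWatchers() {
	f.mu.Lock()
	defer f.mu.Unlock()
	for _, ch := range f.subs {
		close(ch)
	}
	f.subs = nil
}
```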
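
Lease sketch: the "semaphore-like lease system" presumably caps how many tunnel connections an agent pool holds or attempts at once. A hedged sketch using a buffered channel as the semaphore; all names are hypothetical, not the actual reversetunnel types.

```go
// Package lease sketches a semaphore-like lease pool; illustrative only.
package lease

// Pool grants at most cap(slots) concurrent leases; an agent must hold
// a lease while it maintains or attempts a tunnel connection.
type Pool struct {
	slots chan struct{}
}

// NewPool creates a pool permitting up to n concurrent leases.
func NewPool(n int) *Pool {
	return &Pool{slots: make(chan struct{}, n)}
}

// TryAcquire claims a lease without blocking and reports whether one
// was available.
func (p *Pool) TryAcquire() bool {
	select {
	case p.slots <- struct{}{}:
		return true
	default:
		return false
	}
}

// Release returns a previously acquired lease to the pool.
func (p *Pool) Release() {
	<-p.slots
}
```

An agent loop would call TryAcquire before dialing and Release when its connection dies, so the number of in-flight connections never exceeds the pool size.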
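
Backoff sketch: jitter spreads reconnection attempts out in time, so agents that all lost the same proxy do not retry in lockstep. A minimal sketch; the [base/2, base) window is an assumption, not necessarily the interval the commit uses.

```go
// Package backoff sketches a jittered retry delay; illustrative only.
package backoff

import (
	"math/rand"
	"time"
)

// JitteredDelay returns a random duration in [base/2, base). A fleet of
// agents sleeping for JitteredDelay(retryPeriod) between connection
// attempts spreads its reconnects out instead of producing a
// "thundering herd".
func JitteredDelay(base time.Duration) time.Duration {
	half := base / 2
	if half <= 0 {
		return base
	}
	return half + time.Duration(rand.Int63n(int64(half)))
}
```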
fspmarshall committed Apr 23, 2020
1 parent 703708a commit 4e9eed9
Showing 28 changed files with 1,387 additions and 1,078 deletions.
48 changes: 25 additions & 23 deletions integration/integration_test.go
@@ -2030,7 +2030,8 @@ func (s *IntSuite) TestDiscoveryRecovers(c *check.C) {
continue
}
if p.Config.Hostname == name {
- lb.RemoveBackend(*utils.MustParseAddr(p.Config.Proxy.ReverseTunnelListenAddr.Addr))
+ reverseTunnelPort := utils.MustParseAddr(p.Config.Proxy.ReverseTunnelListenAddr.Addr).Port(0)
+ c.Assert(lb.RemoveBackend(*utils.MustParseAddr(net.JoinHostPort(Loopback, strconv.Itoa(reverseTunnelPort)))), check.IsNil)
c.Assert(p.Close(), check.IsNil)
c.Assert(p.Wait(), check.IsNil)
return
@@ -2073,22 +2074,22 @@ func (s *IntSuite) TestDiscoveryRecovers(c *check.C) {
// create first numbered proxy
_, c0 := addNewMainProxy(pname(0))
// check that we now have two tunnel connections
- waitForProxyCount(remote, "cluster-main", 2)
+ c.Assert(waitForProxyCount(remote, "cluster-main", 2), check.IsNil)
// check that first numbered proxy is OK.
testProxyConn(&c0, false)
// remove the initial proxy.
- lb.RemoveBackend(mainProxyAddr)
- waitForProxyCount(remote, "cluster-main", 1)
+ c.Assert(lb.RemoveBackend(mainProxyAddr), check.IsNil)
+ c.Assert(waitForProxyCount(remote, "cluster-main", 1), check.IsNil)

// force bad state by iteratively removing previous proxy before
// adding next proxy; this ensures that discovery protocol's list of
// known proxies is all invalid.
for i := 0; i < 6; i++ {
prev, next := pname(i), pname(i+1)
killMainProxy(prev)
- waitForProxyCount(remote, "cluster-main", 0)
+ c.Assert(waitForProxyCount(remote, "cluster-main", 0), check.IsNil)
_, cn := addNewMainProxy(next)
- waitForProxyCount(remote, "cluster-main", 1)
+ c.Assert(waitForProxyCount(remote, "cluster-main", 1), check.IsNil)
testProxyConn(&cn, false)
}

@@ -2186,7 +2187,7 @@ func (s *IntSuite) TestDiscovery(c *check.C) {
c.Assert(output, check.Equals, "hello world\n")

// Now disconnect the main proxy and make sure it will reconnect eventually.
- lb.RemoveBackend(mainProxyAddr)
+ c.Assert(lb.RemoveBackend(mainProxyAddr), check.IsNil)
waitForActiveTunnelConnections(c, secondProxy, "cluster-remote", 1)

// Requests going via main proxy should fail.
@@ -2218,7 +2219,7 @@ func (s *IntSuite) TestDiscovery(c *check.C) {
c.Assert(err, check.IsNil)

// Wait for the remote cluster to detect the outbound connection is gone.
- waitForProxyCount(remote, "cluster-main", 1)
+ c.Assert(waitForProxyCount(remote, "cluster-main", 1), check.IsNil)

// Stop both clusters and remaining nodes.
c.Assert(remote.StopAll(), check.IsNil)
@@ -2337,7 +2338,7 @@ func (s *IntSuite) TestDiscoveryNode(c *check.C) {
c.Assert(output, check.Equals, "hello world\n")

// Remove second proxy from LB.
- lb.RemoveBackend(*proxyTwoBackend)
+ c.Assert(lb.RemoveBackend(*proxyTwoBackend), check.IsNil)
waitForActiveTunnelConnections(c, main.Tunnel, Site, 1)

// Requests going via main proxy will succeed. Requests going via second
@@ -2391,17 +2392,17 @@ func waitForActiveTunnelConnections(c *check.C, tunnel reversetunnel.Server, clu
// reach some value.
func waitForProxyCount(t *TeleInstance, clusterName string, count int) error {
var counts map[string]int

- for i := 0; i < 20; i++ {
+ start := time.Now()
+ for time.Since(start) < 17*time.Second {
counts = t.Pool.Counts()
if counts[clusterName] == count {
return nil
}

- time.Sleep(250 * time.Millisecond)
+ time.Sleep(500 * time.Millisecond)
}

- return trace.BadParameter("proxy count on %v: %v", clusterName, counts[clusterName])
+ return trace.BadParameter("proxy count on %v: %v (wanted %v)", clusterName, counts[clusterName], count)
}

// waitForNodeCount waits for a certain number of nodes to show up in the remote site.
@@ -3031,7 +3032,7 @@ func (s *IntSuite) TestRotateSuccess(c *check.C) {
c.Assert(err, check.IsNil)

// client works as is before servers have been rotated
- err = runAndMatch(clt, 3, []string{"echo", "hello world"}, ".*hello world.*")
+ err = runAndMatch(clt, 8, []string{"echo", "hello world"}, ".*hello world.*")
c.Assert(err, check.IsNil)

l.Infof("Service reloaded. Setting rotation state to %v", services.RotationPhaseUpdateServers)
@@ -3059,7 +3060,7 @@ func (s *IntSuite) TestRotateSuccess(c *check.C) {
c.Assert(err, check.IsNil)

// new client works
- err = runAndMatch(clt, 3, []string{"echo", "hello world"}, ".*hello world.*")
+ err = runAndMatch(clt, 8, []string{"echo", "hello world"}, ".*hello world.*")
c.Assert(err, check.IsNil)

l.Infof("Service reloaded. Setting rotation state to %v.", services.RotationPhaseStandby)
@@ -3080,7 +3081,7 @@ func (s *IntSuite) TestRotateSuccess(c *check.C) {
c.Assert(err, check.IsNil)

// new client still works
- err = runAndMatch(clt, 3, []string{"echo", "hello world"}, ".*hello world.*")
+ err = runAndMatch(clt, 8, []string{"echo", "hello world"}, ".*hello world.*")
c.Assert(err, check.IsNil)

l.Infof("Service reloaded. Rotation has completed. Shuttting down service.")
@@ -3175,7 +3176,7 @@ func (s *IntSuite) TestRotateRollback(c *check.C) {
c.Assert(err, check.IsNil)

// client works as is before servers have been rotated
- err = runAndMatch(clt, 3, []string{"echo", "hello world"}, ".*hello world.*")
+ err = runAndMatch(clt, 8, []string{"echo", "hello world"}, ".*hello world.*")
c.Assert(err, check.IsNil)

l.Infof("Service reloaded. Setting rotation state to %v", services.RotationPhaseUpdateServers)
@@ -3205,7 +3206,7 @@ func (s *IntSuite) TestRotateRollback(c *check.C) {
c.Assert(err, check.IsNil)

// old client works
- err = runAndMatch(clt, 3, []string{"echo", "hello world"}, ".*hello world.*")
+ err = runAndMatch(clt, 8, []string{"echo", "hello world"}, ".*hello world.*")
c.Assert(err, check.IsNil)

l.Infof("Service reloaded. Rotation has completed. Shuttting down service.")
@@ -3323,7 +3324,7 @@ func (s *IntSuite) TestRotateTrustedClusters(c *check.C) {
clt, err := main.NewClientWithCreds(cfg, *initialCreds)
c.Assert(err, check.IsNil)

- err = runAndMatch(clt, 6, []string{"echo", "hello world"}, ".*hello world.*")
+ err = runAndMatch(clt, 8, []string{"echo", "hello world"}, ".*hello world.*")
c.Assert(err, check.IsNil)

l.Infof("Setting rotation state to %v", services.RotationPhaseInit)
@@ -3375,7 +3376,7 @@ func (s *IntSuite) TestRotateTrustedClusters(c *check.C) {
c.Assert(err, check.IsNil)

// old client should work as is
- err = runAndMatch(clt, 6, []string{"echo", "hello world"}, ".*hello world.*")
+ err = runAndMatch(clt, 8, []string{"echo", "hello world"}, ".*hello world.*")
c.Assert(err, check.IsNil)

l.Infof("Service reloaded. Setting rotation state to %v", services.RotationPhaseUpdateServers)
@@ -3402,7 +3403,7 @@ c.Assert(err, check.IsNil)
c.Assert(err, check.IsNil)

// new client works
- err = runAndMatch(clt, 3, []string{"echo", "hello world"}, ".*hello world.*")
+ err = runAndMatch(clt, 8, []string{"echo", "hello world"}, ".*hello world.*")
c.Assert(err, check.IsNil)

l.Infof("Service reloaded. Setting rotation state to %v.", services.RotationPhaseStandby)
@@ -3425,7 +3426,7 @@
l.Infof("Phase completed.")

// new client still works
- err = runAndMatch(clt, 3, []string{"echo", "hello world"}, ".*hello world.*")
+ err = runAndMatch(clt, 8, []string{"echo", "hello world"}, ".*hello world.*")
c.Assert(err, check.IsNil)

l.Infof("Service reloaded. Rotation has completed. Shuttting down service.")
@@ -3525,6 +3526,7 @@ func runAndMatch(tc *client.TeleportClient, attempts int, command []string, patt
for i := 0; i < attempts; i++ {
err = tc.SSH(context.TODO(), command, false)
if err != nil {
+ time.Sleep(500 * time.Millisecond)
continue
}
out := output.String()
@@ -3534,7 +3536,7 @@
return nil
}
err = trace.CompareFailed("output %q did not match pattern %q", out, pattern)
- time.Sleep(250 * time.Millisecond)
+ time.Sleep(500 * time.Millisecond)
}
return err
}
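
The waitForProxyCount rewrite above trades a fixed attempt count for a wall-clock deadline, which keeps the overall timeout stable even when individual probes are slow. The same pattern as a standalone helper; the name and package are hypothetical, not part of the test suite.

```go
// Package poll sketches the deadline-based polling loop used above.
package poll

import (
	"fmt"
	"time"
)

// waitFor polls cond every interval until it returns true or timeout
// elapses.
func waitFor(cond func() bool, timeout, interval time.Duration) error {
	start := time.Now()
	for time.Since(start) < timeout {
		if cond() {
			return nil
		}
		time.Sleep(interval)
	}
	return fmt.Errorf("condition not met within %v", timeout)
}
```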
14 changes: 9 additions & 5 deletions lib/cache/cache.go
@@ -121,7 +121,7 @@ type Cache struct {
accessCache services.Access
dynamicAccessCache services.DynamicAccessExt
presenceCache services.Presence
- eventsCache services.Events
+ eventsFanout *services.Fanout

// closedFlag is set to indicate that the services are closed
closedFlag int32
@@ -275,7 +275,7 @@ func New(config Config) (*Cache, error) {
accessCache: local.NewAccessService(wrapper),
dynamicAccessCache: local.NewDynamicAccessService(wrapper),
presenceCache: local.NewPresenceService(wrapper),
- eventsCache: local.NewEventsService(config.Backend),
+ eventsFanout: services.NewFanout(),
Entry: log.WithFields(log.Fields{
trace.Component: config.Component,
}),
@@ -305,7 +305,7 @@
// to handle subscribers connected to the in-memory caches
// instead of reading from the backend.
func (c *Cache) NewWatcher(ctx context.Context, watch services.Watch) (services.Watcher, error) {
- return c.eventsCache.NewWatcher(ctx, watch)
+ return c.eventsFanout.NewWatcher(ctx, watch)
}

func (c *Cache) isClosed() bool {
@@ -337,7 +337,7 @@ func (c *Cache) update() {
// all watchers will be out of sync, because
// cache will reload its own watcher to the backend,
// so signal closure to reset the watchers
- c.Backend.CloseWatchers()
+ c.eventsFanout.CloseWatchers()
// events cache should be closed as well
c.Debugf("Reloading %v.", retry)
select {
@@ -528,7 +528,11 @@ func (c *Cache) processEvent(event services.Event) error {
c.Warningf("Skipping unsupported event %v.", event.Resource.GetKind())
return nil
}
- return collection.processEvent(event)
+ if err := collection.processEvent(event); err != nil {
+ 	return trace.Wrap(err)
+ }
+ c.eventsFanout.Emit(event)
+ return nil
}

// GetCertAuthority returns certificate authority by given id. Parameter loadSigningKeys
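
Because processEvent now applies an event to the local collection before emitting it into the fanout (hunk above), a watcher that receives an event can immediately read the corresponding value back from the cache. A hedged usage sketch: it assumes the services.Watcher interface exposes Events(), Done(), Error(), and Close() as Teleport watchers do elsewhere, and watchCAs and handle are hypothetical names.

```go
// Package example sketches a subscriber to the cache's event fanout.
package example

import (
	"context"

	"github.com/gravitational/teleport/lib/cache"
	"github.com/gravitational/teleport/lib/services"
	"github.com/gravitational/trace"
)

// watchCAs subscribes to cert-authority events via the cache.
func watchCAs(ctx context.Context, c *cache.Cache, handle func(services.Event)) error {
	watcher, err := c.NewWatcher(ctx, services.Watch{
		Kinds: []services.WatchKind{{Kind: services.KindCertAuthority}},
	})
	if err != nil {
		return trace.Wrap(err)
	}
	defer watcher.Close()
	for {
		select {
		case event := <-watcher.Events():
			// Emit runs only after the cache has applied the event to its
			// local collection, so reads of the cache here already reflect it.
			handle(event)
		case <-watcher.Done():
			// The fanout closes watchers on cache reset; the caller should
			// resubscribe and resync.
			return trace.Wrap(watcher.Error())
		}
	}
}
```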
10 changes: 3 additions & 7 deletions lib/cache/cache_test.go
@@ -414,13 +414,9 @@ func (s *CacheSuite) preferRecent(c *check.C) {
// service is back and the new value has propagated
p.backend.SetReadError(nil)

- // wait for watcher to restart
- select {
- case event := <-p.eventsC:
- c.Assert(event.Type, check.Equals, WatcherStarted)
- case <-time.After(time.Second):
- c.Fatalf("timeout waiting for event")
- }
+ // wait for watcher to restart successfully, ignoring any failed
+ // attempts which occurred before the backend became healthy again.
+ waitForEvent(c, p.eventsC, WatcherStarted, WatcherFailed)

// new value is available now
out, err = p.cache.GetCertAuthority(ca.GetID(), false)
