Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sasha/ha #853

Closed
wants to merge 32 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
8c9a10e
use local cache instead of fs cache
klizhentas Mar 5, 2017
16921d8
some experiments
klizhentas Mar 5, 2017
418958d
more work on caching
klizhentas Mar 7, 2017
0605cb9
interim commit
klizhentas Mar 7, 2017
9b95e59
Merge branch 'master' into sasha/ha
klizhentas Mar 14, 2017
fd58ca1
work on throttling
klizhentas Mar 14, 2017
285547f
hello
klizhentas Mar 16, 2017
03e5187
some work
klizhentas Mar 17, 2017
eed6847
a bit more work
klizhentas Mar 17, 2017
1fa596d
move ttl to metadata
klizhentas Mar 18, 2017
9ed80d4
add client caching
klizhentas Mar 19, 2017
c51ff53
Merge branch 'master' into sasha/ha
klizhentas Mar 19, 2017
29b1b23
supply cache configuration
klizhentas Mar 19, 2017
0248c10
improve logs and syntax
klizhentas Mar 19, 2017
89e9856
fix
klizhentas Mar 20, 2017
59d27bd
cleanup tracer
klizhentas Mar 20, 2017
ef3ce98
fix tracer
klizhentas Mar 20, 2017
6dbda51
update dependencies
klizhentas Mar 20, 2017
7d8d8cb
fix all tests
klizhentas Mar 20, 2017
c48682f
write integration tests
klizhentas Mar 21, 2017
f80f74e
revert back logging
klizhentas Mar 21, 2017
fefe7a4
proxy
klizhentas Mar 21, 2017
5352b2f
fix connection leak
klizhentas Mar 21, 2017
9e7bd6b
Merge branch 'master' into sasha/ha
klizhentas Mar 21, 2017
6b244b9
fix IPv6 Jenkins issue
klizhentas Mar 21, 2017
e00f9d8
see if that's a timing issue
klizhentas Mar 21, 2017
76f3843
better tests
klizhentas Mar 22, 2017
c5c76ad
fix tests
klizhentas Mar 22, 2017
88515d4
Merge branch 'master' into sasha/ha
klizhentas Mar 22, 2017
3ab775c
handle expiring nodes
klizhentas Mar 22, 2017
4deade1
Merge branch 'master' into sasha/ha
russjones Apr 7, 2017
437a677
Code review comments and fixes to make tests pass.
russjones Apr 7, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Godeps/Godeps.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ $(BUILDDIR)/teleport: $(LIBS) tool/teleport/*.go tool/teleport/common/*.go
$(BUILDDIR)/tsh: $(LIBS) tool/tsh/*.go
go build -o $(BUILDDIR)/tsh -i $(BUILDFLAGS) ./tool/tsh

.PHONY: goinstall
goinstall:
go install github.com/gravitational/teleport/tool/tsh
go install github.com/gravitational/teleport/tool/teleport
go install github.com/gravitational/teleport/tool/tctl

#
# make install will installs system-wide teleport
#
Expand Down
60 changes: 60 additions & 0 deletions integration/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/gravitational/teleport/lib/reversetunnel"
"github.com/gravitational/teleport/lib/service"
"github.com/gravitational/teleport/lib/services"
"github.com/gravitational/teleport/lib/utils"
"github.com/gravitational/trace"
)

Expand Down Expand Up @@ -49,6 +50,10 @@ type TeleInstance struct {
Process *service.TeleportProcess
Config *service.Config
Tunnel reversetunnel.Server

// Nodes is a list of additional nodes
// started with this instance
Nodes []*service.TeleportProcess
}

type User struct {
Expand Down Expand Up @@ -245,6 +250,13 @@ func (i *TeleInstance) CreateEx(trustedSecrets []*InstanceSecrets, tconf *servic
Params: backend.Params{"path": dataDir},
}
tconf.Keygen = testauthority.New()
tconf.Auth.StaticTokens = []services.ProvisionToken{
{
Roles: []teleport.Role{teleport.RoleNode, teleport.RoleProxy},
Token: "token",
},
}

i.Config = tconf
i.Process, err = service.NewTeleport(tconf)
if err != nil {
Expand Down Expand Up @@ -287,6 +299,37 @@ func (i *TeleInstance) CreateEx(trustedSecrets []*InstanceSecrets, tconf *servic
return nil
}

// StartNode starts SSH node and connects it to the cluster
func (i *TeleInstance) StartNode(name string, sshPort, proxyWebPort, proxySSHPort int) error {
dataDir, err := ioutil.TempDir("", "cluster-"+i.Secrets.SiteName)
if err != nil {
return trace.Wrap(err)
}
tconf := service.MakeDefaultConfig()
tconf.HostUUID = name
tconf.Hostname = name
tconf.DataDir = dataDir
tconf.Auth.Enabled = false
tconf.Proxy.Enabled = true
tconf.SSH.Enabled = true
tconf.SSH.Addr.Addr = net.JoinHostPort(i.Hostname, fmt.Sprintf("%v", sshPort))
authServer := utils.MustParseAddr(net.JoinHostPort(i.Hostname, i.GetPortAuth()))
tconf.AuthServers = append(tconf.AuthServers, *authServer)
tconf.Token = "token"
tconf.Proxy.SSHAddr.Addr = net.JoinHostPort(i.Hostname, fmt.Sprintf("%v", proxySSHPort))
tconf.Proxy.WebAddr.Addr = net.JoinHostPort(i.Hostname, fmt.Sprintf("%v", proxyWebPort))
tconf.Proxy.DisableWebUI = true
// Enable cachingx
tconf.CachePolicy = service.CachePolicy{Enabled: true}

process, err := service.NewTeleport(tconf)
if err != nil {
return trace.Wrap(err)
}
i.Nodes = append(i.Nodes, process)
return process.Start()
}

// Reset re-creates the teleport instance based on the same configuration
// This is needed if you want to stop the instance, reset it and start again
func (i *TeleInstance) Reset() (err error) {
Expand Down Expand Up @@ -425,13 +468,30 @@ func (i *TeleInstance) NewClient(login string, site string, host string, port in
return tc, nil
}

// StopNodes stops additional nodes
func (i *TeleInstance) StopNodes() error {
var errors []error
for _, node := range i.Nodes {
if err := node.Close(); err != nil {
errors = append(errors, err)
log.Errorf("failed closing extra node %v", err)
}
if err := node.Wait(); err != nil {
errors = append(errors, err)
log.Errorf("failed stopping extra node %v", err)
}
}
return trace.NewAggregate(errors...)
}

func (i *TeleInstance) Stop(removeData bool) error {
if i.Config != nil && removeData {
err := os.RemoveAll(i.Config.DataDir)
if err != nil {
log.Error("failed removing temporary local Teleport directory", err)
}
}

log.Infof("Asking Teleport to stop")
err := i.Process.Close()
if err != nil {
Expand Down
72 changes: 70 additions & 2 deletions integration/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,12 +370,12 @@ func (s *IntSuite) TestInvalidLogins(c *check.C) {
c.Assert(err, check.ErrorMatches, "cluster wrong-site not found")
}

// TestTwoSites creates two teleport sites: "a" and "b" and
// TestTwoClusters creates two teleport clusters: "a" and "b" and
// creates a tunnel from A to B.
//
// Then it executes an SSH command on A by connecting directly
// to A and by connecting to B via B<->A tunnel
func (s *IntSuite) TestTwoSites(c *check.C) {
func (s *IntSuite) TestTwoClusters(c *check.C) {
username := s.me.Username

a := NewInstance("site-A", HostID, Host, s.getPorts(5), s.priv, s.pub)
Expand Down Expand Up @@ -451,6 +451,74 @@ func (s *IntSuite) TestTwoSites(c *check.C) {
c.Assert(a.Stop(true), check.IsNil)
}

// TestHA tests scenario when auth server for the cluster goes down
// and we switch to local persistent caches
func (s *IntSuite) TestHA(c *check.C) {
username := s.me.Username

a := NewInstance("cluster-a", HostID, Host, s.getPorts(5), s.priv, s.pub)
b := NewInstance("cluster-b", HostID, Host, s.getPorts(5), s.priv, s.pub)

a.AddUser(username, []string{username})
b.AddUser(username, []string{username})

c.Assert(b.Create(a.Secrets.AsSlice(), false, nil), check.IsNil)
c.Assert(a.Create(b.Secrets.AsSlice(), true, nil), check.IsNil)

c.Assert(b.Start(), check.IsNil)
c.Assert(a.Start(), check.IsNil)

nodePorts := s.getPorts(3)
sshPort, proxyWebPort, proxySSHPort := nodePorts[0], nodePorts[1], nodePorts[2]
c.Assert(a.StartNode("cluster-a-node", sshPort, proxyWebPort, proxySSHPort), check.IsNil)

// wait for both sites to see each other via their reverse tunnels (for up to 10 seconds)
abortTime := time.Now().Add(time.Second * 10)
for len(b.Tunnel.GetSites()) < 2 && len(b.Tunnel.GetSites()) < 2 {
time.Sleep(time.Millisecond * 2000)
if time.Now().After(abortTime) {
c.Fatalf("two sites do not see each other: tunnels are not working")
}
}

cmd := []string{"echo", "hello world"}
tc, err := b.NewClient(username, "cluster-a", "127.0.0.1", sshPort)
c.Assert(err, check.IsNil)
output := &bytes.Buffer{}
tc.Stdout = output
c.Assert(err, check.IsNil)
// try to execute an SSH command using the same old client to Site-B
// "site-A" and "site-B" reverse tunnels are supposed to reconnect,
// and 'tc' (client) is also supposed to reconnect
for i := 0; i < 10; i++ {
time.Sleep(time.Millisecond * 50)
err = tc.SSH(context.TODO(), cmd, false)
if err == nil {
break
}
}
c.Assert(err, check.IsNil)
c.Assert(output.String(), check.Equals, "hello world\n")
// stop auth server a now
c.Assert(a.Stop(true), check.IsNil)

// try to execute an SSH command using the same old client to Site-B
// "site-A" and "site-B" reverse tunnels are supposed to reconnect,
// and 'tc' (client) is also supposed to reconnect
for i := 0; i < 10; i++ {
time.Sleep(time.Millisecond * 50)
err = tc.SSH(context.TODO(), cmd, false)
if err == nil {
break
}
}
c.Assert(err, check.IsNil)

// stop cluster and remaining nodes
c.Assert(b.Stop(true), check.IsNil)
c.Assert(b.StopNodes(), check.IsNil)
}

// getPorts helper returns a range of unallocated ports available for litening on
func (s *IntSuite) getPorts(num int) []int {
if len(s.ports) < num {
Expand Down
12 changes: 8 additions & 4 deletions lib/auth/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,30 +17,34 @@ limitations under the License.
package auth

import (
"time"

"github.com/gravitational/teleport/lib/services"
)

// AccessPoint is an API interface implemented by a certificate authority (CA)
type AccessPoint interface {
// GetReverseTunnels returns a list of reverse tunnels
GetReverseTunnels() ([]services.ReverseTunnel, error)

// GetDomainName returns domain name AKA ("cluster name") of the auth
// server / certificate authority (CA)
GetDomainName() (string, error)

// GetNamespaces returns a list of namespaces
GetNamespaces() ([]services.Namespace, error)

// GetNamespace returns namespace by name
GetNamespace(name string) (*services.Namespace, error)

// GetServers returns a list of registered servers
GetNodes(namespace string) ([]services.Server, error)

// UpsertServer registers server presence, permanently if ttl is 0 or
// for the specified duration with second resolution if it's >= 1 second
UpsertNode(s services.Server, ttl time.Duration) error
UpsertNode(s services.Server) error

// UpsertProxy registers server presence, permanently if ttl is 0 or
// for the specified duration with second resolution if it's >= 1 second
UpsertProxy(s services.Server, ttl time.Duration) error
UpsertProxy(s services.Server) error

// GetProxies returns a list of proxy servers registered in the cluster
GetProxies() ([]services.Server, error)
Expand Down
26 changes: 19 additions & 7 deletions lib/auth/apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/gravitational/trace"
"github.com/julienschmidt/httprouter"

"github.com/jonboulle/clockwork"
"github.com/tstranex/u2f"
)

Expand All @@ -50,12 +51,14 @@ type APIConfig struct {
type APIServer struct {
APIConfig
httprouter.Router
clockwork.Clock
}

// NewAPIServer returns a new instance of APIServer HTTP handler
func NewAPIServer(config *APIConfig) http.Handler {
srv := APIServer{
APIConfig: *config,
Clock: clockwork.NewRealClock(),
}
srv.Router = *httprouter.New()

Expand Down Expand Up @@ -228,19 +231,21 @@ func (s *APIServer) upsertServer(auth ClientI, role teleport.Role, w http.Respon
// if server sent "local" IP address to us, replace the ip/host part with the remote address we see
// on the socket, but keep the original port:
server.SetAddr(utils.ReplaceLocalhost(server.GetAddr(), r.RemoteAddr))

if req.TTL != 0 {
server.SetTTL(s, req.TTL)
}
switch role {
case teleport.RoleNode:
server.SetNamespace(p.ByName("namespace"))
if err := auth.UpsertNode(server, req.TTL); err != nil {
if err := auth.UpsertNode(server); err != nil {
return nil, trace.Wrap(err)
}
case teleport.RoleAuth:
if err := auth.UpsertAuthServer(server, req.TTL); err != nil {
if err := auth.UpsertAuthServer(server); err != nil {
return nil, trace.Wrap(err)
}
case teleport.RoleProxy:
if err := auth.UpsertProxy(server, req.TTL); err != nil {
if err := auth.UpsertProxy(server); err != nil {
return nil, trace.Wrap(err)
}
}
Expand Down Expand Up @@ -316,7 +321,8 @@ func (s *APIServer) upsertReverseTunnel(auth ClientI, w http.ResponseWriter, r *
if err != nil {
return nil, trace.Wrap(err)
}
if err := auth.UpsertReverseTunnel(tun, req.TTL); err != nil {
tun.SetTTL(s, req.TTL)
if err := auth.UpsertReverseTunnel(tun); err != nil {
return nil, trace.Wrap(err)
}
return message("ok"), nil
Expand Down Expand Up @@ -775,7 +781,10 @@ func (s *APIServer) upsertCertAuthority(auth ClientI, w http.ResponseWriter, r *
if err != nil {
return nil, trace.Wrap(err)
}
if err := auth.UpsertCertAuthority(ca, req.TTL); err != nil {
if req.TTL != 0 {
ca.SetTTL(s, req.TTL)
}
if err := auth.UpsertCertAuthority(ca); err != nil {
return nil, trace.Wrap(err)
}
return message("ok"), nil
Expand Down Expand Up @@ -1019,7 +1028,10 @@ func (s *APIServer) upsertOIDCConnector(auth ClientI, w http.ResponseWriter, r *
if err != nil {
return nil, trace.Wrap(err)
}
err = auth.UpsertOIDCConnector(connector, req.TTL)
if req.TTL != 0 {
connector.SetTTL(s, req.TTL)
}
err = auth.UpsertOIDCConnector(connector)
if err != nil {
return nil, trace.Wrap(err)
}
Expand Down
Loading