Skip to content

Commit

Permalink
Updated dir backend to a flat keyspace. Added UpsertItems endpoint to
Browse files Browse the repository at this point in the history
all backends to support bulk insertion. Added UpsertNodes endpoint,
which is used by the state cache to speed up GetNodes.
  • Loading branch information
russjones committed Jul 11, 2018
1 parent 2a690ef commit 3a592d2
Show file tree
Hide file tree
Showing 34 changed files with 985 additions and 457 deletions.
2 changes: 1 addition & 1 deletion integration/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,7 @@ func (i *TeleInstance) GenerateConfig(trustedSecrets []*InstanceSecrets, tconf *
tconf.AuthServers = append(tconf.AuthServers, tconf.Auth.SSHAddr)
tconf.Auth.StorageConfig = backend.Config{
Type: dir.GetName(),
Params: backend.Params{"path": dataDir},
Params: backend.Params{"path": dataDir + string(os.PathListSeparator) + defaults.BackendDir},
}

tconf.Keygen = testauthority.New()
Expand Down
2 changes: 1 addition & 1 deletion integration/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ func (s *IntSuite) TestAuditOn(c *check.C) {
for {
select {
case <-tickCh:
nodesInSite, err := site.GetNodes(defaults.Namespace)
nodesInSite, err := site.GetNodes(defaults.Namespace, services.SkipValidation())
if err != nil && !trace.IsNotFound(err) {
return trace.Wrap(err)
}
Expand Down
2 changes: 1 addition & 1 deletion lib/auth/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type AccessPoint interface {
GetNamespace(name string) (*services.Namespace, error)

// GetServers returns a list of registered servers
GetNodes(namespace string) ([]services.Server, error)
GetNodes(namespace string, opts ...services.MarshalOption) ([]services.Server, error)

// UpsertServer registers server presence, permanently if ttl is 0 or
// for the specified duration with second resolution if it's >= 1 second
Expand Down
40 changes: 39 additions & 1 deletion lib/auth/apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ func NewAPIServer(config *APIConfig) http.Handler {

// Servers and presence heartbeat
srv.POST("/:version/namespaces/:namespace/nodes", srv.withAuth(srv.upsertNode))
srv.PUT("/:version/namespaces/:namespace/nodes", srv.withAuth(srv.upsertNodes))
srv.GET("/:version/namespaces/:namespace/nodes", srv.withAuth(srv.getNodes))
srv.POST("/:version/authservers", srv.withAuth(srv.upsertAuthServer))
srv.GET("/:version/authservers", srv.withAuth(srv.getAuthServers))
Expand Down Expand Up @@ -315,6 +316,34 @@ func (s *APIServer) upsertServer(auth ClientI, role teleport.Role, w http.Respon
return message("ok"), nil
}

type upsertNodesReq struct {
Nodes json.RawMessage `json:"nodes"`
Namespace string `json:"namespace"`
}

// upsertNodes is used to bulk insert nodes into the backend.
func (s *APIServer) upsertNodes(auth ClientI, w http.ResponseWriter, r *http.Request, p httprouter.Params, version string) (interface{}, error) {
var req upsertNodesReq
if err := httplib.ReadJSON(r, &req); err != nil {
return nil, trace.Wrap(err)
}
if !services.IsValidNamespace(req.Namespace) {
return nil, trace.BadParameter("invalid namespace %q", req.Namespace)
}

nodes, err := services.GetServerMarshaler().UnmarshalServers(req.Nodes)
if err != nil {
return nil, trace.Wrap(err)
}

err = auth.UpsertNodes(req.Namespace, nodes)
if err != nil {
return nil, trace.Wrap(err)
}

return message("ok"), nil
}

// upsertNode is called by remote SSH nodes when they ping back into the auth service
func (s *APIServer) upsertNode(auth ClientI, w http.ResponseWriter, r *http.Request, p httprouter.Params, version string) (interface{}, error) {
return s.upsertServer(auth, teleport.RoleNode, w, r, p, version)
Expand All @@ -326,7 +355,16 @@ func (s *APIServer) getNodes(auth ClientI, w http.ResponseWriter, r *http.Reques
if !services.IsValidNamespace(namespace) {
return nil, trace.BadParameter("invalid namespace %q", namespace)
}
servers, err := auth.GetNodes(namespace)
skipValidation, _, err := httplib.ParseBool(r.URL.Query(), "skip_validation")
if err != nil {
return nil, trace.Wrap(err)
}
var opts []services.MarshalOption
if skipValidation {
opts = append(opts, services.SkipValidation())
}

servers, err := auth.GetNodes(namespace, opts...)
if err != nil {
return nil, trace.Wrap(err)
}
Expand Down
2 changes: 1 addition & 1 deletion lib/auth/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -1053,7 +1053,7 @@ func (s *AuthServer) DeleteNamespace(namespace string) error {
if namespace == defaults.Namespace {
return trace.AccessDenied("can't delete default namespace")
}
nodes, err := s.Presence.GetNodes(namespace)
nodes, err := s.Presence.GetNodes(namespace, services.SkipValidation())
if err != nil {
return trace.Wrap(err)
}
Expand Down
15 changes: 13 additions & 2 deletions lib/auth/auth_with_roles.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,17 @@ func (a *AuthWithRoles) GenerateServerKeys(req GenerateServerKeysRequest) (*Pack
return a.authServer.GenerateServerKeys(req)
}

// UpsertNodes bulk upserts nodes into the backend.
func (a *AuthWithRoles) UpsertNodes(namespace string, servers []services.Server) error {
if err := a.action(namespace, services.KindNode, services.VerbCreate); err != nil {
return trace.Wrap(err)
}
if err := a.action(namespace, services.KindNode, services.VerbUpdate); err != nil {
return trace.Wrap(err)
}
return a.authServer.UpsertNodes(namespace, servers)
}

func (a *AuthWithRoles) UpsertNode(s services.Server) error {
if err := a.action(s.GetNamespace(), services.KindNode, services.VerbCreate); err != nil {
return trace.Wrap(err)
Expand All @@ -306,11 +317,11 @@ func (a *AuthWithRoles) UpsertNode(s services.Server) error {
return a.authServer.UpsertNode(s)
}

func (a *AuthWithRoles) GetNodes(namespace string) ([]services.Server, error) {
func (a *AuthWithRoles) GetNodes(namespace string, opts ...services.MarshalOption) ([]services.Server, error) {
if err := a.action(namespace, services.KindNode, services.VerbList); err != nil {
return nil, trace.Wrap(err)
}
return a.authServer.GetNodes(namespace)
return a.authServer.GetNodes(namespace, opts...)
}

func (a *AuthWithRoles) UpsertAuthServer(s services.Server) error {
Expand Down
36 changes: 33 additions & 3 deletions lib/auth/clt.go
Original file line number Diff line number Diff line change
Expand Up @@ -563,27 +563,57 @@ func (c *Client) UpsertNode(s services.Server) error {
return trace.Wrap(err)
}

// UpsertNodes bulk inserts nodes.
func (c *Client) UpsertNodes(namespace string, servers []services.Server) error {
if namespace == "" {
return trace.BadParameter("missing node namespace")
}

bytes, err := services.GetServerMarshaler().MarshalServers(servers)
if err != nil {
return trace.Wrap(err)
}
args := &upsertNodesReq{
Namespace: namespace,
Nodes: bytes,
}
_, err = c.PutJSON(c.Endpoint("namespaces", namespace, "nodes"), args)
return trace.Wrap(err)
}

// GetNodes returns the list of servers registered in the cluster.
func (c *Client) GetNodes(namespace string) ([]services.Server, error) {
func (c *Client) GetNodes(namespace string, opts ...services.MarshalOption) ([]services.Server, error) {
if namespace == "" {
return nil, trace.BadParameter(MissingNamespaceError)
}
out, err := c.Get(c.Endpoint("namespaces", namespace, "nodes"), url.Values{})
cfg, err := services.CollectOptions(opts)
if err != nil {
return nil, trace.Wrap(err)
}

out, err := c.Get(c.Endpoint("namespaces", namespace, "nodes"), url.Values{
"skip_validation": []string{fmt.Sprintf("%t", cfg.SkipValidation)},
})
if err != nil {
return nil, trace.Wrap(err)
}

var items []json.RawMessage
if err := json.Unmarshal(out.Bytes(), &items); err != nil {
return nil, trace.Wrap(err)
}
re := make([]services.Server, len(items))
for i, raw := range items {
s, err := services.GetServerMarshaler().UnmarshalServer(raw, services.KindNode)
s, err := services.GetServerMarshaler().UnmarshalServer(
raw,
services.KindNode,
opts...)
if err != nil {
return nil, trace.Wrap(err)
}
re[i] = s
}

return re, nil
}

Expand Down
Loading

0 comments on commit 3a592d2

Please sign in to comment.