Skip to content

Commit

Permalink
Add iterators to unified resource cache
Browse files Browse the repository at this point in the history
This updates the UnifiedResourceCache with IterateResources as an
alternative to IterateUnifiedResources. The new function returns
an iterator instead of collecting and returning a page of results.
While this API may not entirely replace the current one, it offers
a better way for users that just want to iterate resources without
collecting them. Additionally, a few helper methods were included
for callers that might wish to only iterate one specific resource
type. Internally the UnifiedResourceCache was refactored to use the
same logic for all exposed iteration methods.
  • Loading branch information
rosstimothy committed Feb 13, 2025
1 parent e01d19d commit 54f0fc6
Show file tree
Hide file tree
Showing 2 changed files with 204 additions and 133 deletions.
86 changes: 35 additions & 51 deletions lib/auth/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -1520,62 +1520,46 @@ func (a *Server) runPeriodicOperations() {
}()
case heartbeatCheckKey:
go func() {
req := &proto.ListUnifiedResourcesRequest{Kinds: []string{types.KindNode}, SortBy: types.SortBy{Field: types.ResourceKind}}

for {
_, next, err := a.UnifiedResourceCache.IterateUnifiedResources(a.closeCtx,
func(rwl types.ResourceWithLabels) (bool, error) {
srv, ok := rwl.(types.Server)
if !ok {
return false, nil
}
if services.NodeHasMissedKeepAlives(srv) {
heartbeatsMissedByAuth.Inc()
}

if srv.GetSubKind() != types.SubKindOpenSSHNode {
return false, nil
}
// TODO(tross) DELETE in v20.0.0 - all invalid hostnames should have been sanitized by then.
if !validServerHostname(srv.GetHostname()) {
logger := a.logger.With("server", srv.GetName(), "hostname", srv.GetHostname())

logger.DebugContext(a.closeCtx, "sanitizing invalid static SSH server hostname")
// Any existing static hosts will not have their
// hostname sanitized since they don't heartbeat.
if err := sanitizeHostname(srv); err != nil {
logger.WarnContext(a.closeCtx, "failed to sanitize static SSH server hostname", "error", err)
return false, nil
}

if _, err := a.Services.UpdateNode(a.closeCtx, srv); err != nil && !trace.IsCompareFailed(err) {
logger.WarnContext(a.closeCtx, "failed to update SSH server hostname", "error", err)
}
} else if oldHostname, ok := srv.GetLabel(replacedHostnameLabel); ok && validServerHostname(oldHostname) {
// If the hostname has been replaced by a sanitized version, revert it back to the original
// if the original is valid under the most recent rules.
logger := a.logger.With("server", srv.GetName(), "old_hostname", oldHostname, "sanitized_hostname", srv.GetHostname())
if err := restoreSanitizedHostname(srv); err != nil {
logger.WarnContext(a.closeCtx, "failed to restore sanitized static SSH server hostname", "error", err)
return false, nil
}
if _, err := a.Services.UpdateNode(a.closeCtx, srv); err != nil && !trace.IsCompareFailed(err) {
logger.WarnContext(a.closeCtx, "Failed to update node hostname", "error", err)
}
}

return false, nil
},
req,
)
for srv, err := range a.UnifiedResourceCache.IterateNodes(a.closeCtx, "", types.SortBy{Field: types.ResourceKind}) {
if err != nil {
a.logger.ErrorContext(a.closeCtx, "Failed to load nodes for heartbeat metric calculation", "error", err)
return
}

req.StartKey = next
if req.StartKey == "" {
break
if services.NodeHasMissedKeepAlives(srv) {
heartbeatsMissedByAuth.Inc()
}

if srv.GetSubKind() != types.SubKindOpenSSHNode {
continue
}

// TODO(tross) DELETE in v20.0.0 - all invalid hostnames should have been sanitized by then.
if !validServerHostname(srv.GetHostname()) {
logger := a.logger.With("server", srv.GetName(), "hostname", srv.GetHostname())

logger.DebugContext(a.closeCtx, "sanitizing invalid static SSH server hostname")
// Any existing static hosts will not have their
// hostname sanitized since they don't heartbeat.
if err := sanitizeHostname(srv); err != nil {
logger.WarnContext(a.closeCtx, "failed to sanitize static SSH server hostname", "error", err)
continue
}

if _, err := a.Services.UpdateNode(a.closeCtx, srv); err != nil && !trace.IsCompareFailed(err) {
logger.WarnContext(a.closeCtx, "failed to update SSH server hostname", "error", err)
}
} else if oldHostname, ok := srv.GetLabel(replacedHostnameLabel); ok && validServerHostname(oldHostname) {
// If the hostname has been replaced by a sanitized version, revert it back to the original
// if the original is valid under the most recent rules.
logger := a.logger.With("server", srv.GetName(), "old_hostname", oldHostname, "sanitized_hostname", srv.GetHostname())
if err := restoreSanitizedHostname(srv); err != nil {
logger.WarnContext(a.closeCtx, "failed to restore sanitized static SSH server hostname", "error", err)
continue
}
if _, err := a.Services.UpdateNode(a.closeCtx, srv); err != nil && !trace.IsCompareFailed(err) {
logger.WarnContext(a.closeCtx, "Failed to update node hostname", "error", err)
}
}
}
}()
Expand Down
Loading

0 comments on commit 54f0fc6

Please sign in to comment.