From 7e51b69f6aa95c0a27485cd1e54f917ef5b5890f Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Mon, 31 Oct 2022 16:52:02 -0700 Subject: [PATCH 01/18] wip task api via unix domain socket --- client/allocrunner/taskrunner/api_hook.go | 83 +++++++++++ client/allocrunner/taskrunner/task_runner.go | 6 +- .../taskrunner/task_runner_hooks.go | 1 + client/config/config.go | 17 +++ command/agent/agent.go | 19 ++- command/agent/agent_endpoint.go | 8 +- command/agent/alloc_endpoint.go | 2 +- command/agent/http.go | 132 ++++++++++++++++-- command/agent/job_endpoint.go | 2 +- command/agent/metrics_endpoint.go | 4 +- command/agent/variable_endpoint.go | 3 +- nomad/structs/structs.go | 8 ++ 12 files changed, 265 insertions(+), 20 deletions(-) create mode 100644 client/allocrunner/taskrunner/api_hook.go diff --git a/client/allocrunner/taskrunner/api_hook.go b/client/allocrunner/taskrunner/api_hook.go new file mode 100644 index 00000000000..c2d0261ec7e --- /dev/null +++ b/client/allocrunner/taskrunner/api_hook.go @@ -0,0 +1,83 @@ +package taskrunner + +import ( + "context" + "errors" + "fmt" + "net" + "os" + "path/filepath" + + log "github.com/hashicorp/go-hclog" + "github.com/hashicorp/nomad/client/allocrunner/interfaces" + "github.com/hashicorp/nomad/client/config" +) + +type apiHook struct { + srv config.APIListenerRegistrar + logger log.Logger + ln net.Listener +} + +func newAPIHook(srv config.APIListenerRegistrar, logger log.Logger) *apiHook { + h := &apiHook{ + srv: srv, + } + h.logger = logger.Named(h.Name()) + return h +} + +func (*apiHook) Name() string { + return "api" +} + +func (h *apiHook) Prestart(ctx context.Context, req *interfaces.TaskPrestartRequest, resp *interfaces.TaskPrestartResponse) error { + if h.ln != nil { + //TODO(schmichael) remove me + h.logger.Trace("huh, called apiHook.Prestart twice") + return nil + } + + //TODO(schmichael) what dir & perms + udsDir := filepath.Join(req.TaskDir.SecretsDir, "run") + if err := os.MkdirAll(udsDir, 0o775); err != nil { + return fmt.Errorf("error creating api socket directory: %w", err) + } + + //TODO(schmichael) what name + udsPath := filepath.Join(udsDir, "nomad.socket") + if err := os.RemoveAll(udsPath); err != nil { + return fmt.Errorf("could not remove existing api socket: %w", err) + } + + udsln, err := net.Listen("unix", udsPath) + if err != nil { + return fmt.Errorf("could not create api socket: %w", err) + } + + go func() { + if err := h.srv.Serve(ctx, udsln); err != nil { + //TODO(schmichael) probably ignore http.ErrServerClosed + h.logger.Warn("error serving api", "error", err) + } + }() + + h.ln = udsln + return nil +} + +func (h *apiHook) Stop(ctx context.Context, req *interfaces.TaskStopRequest, resp *interfaces.TaskStopResponse) error { + if h.ln == nil { + //TODO(schmichael) remove me + h.logger.Trace("huh, no listener") + return nil + } + + if err := h.ln.Close(); err != nil { + if errors.Is(err, net.ErrClosed) { + return nil + } + h.logger.Trace("error closing task listener: %v", err) + } + return nil +} diff --git a/client/allocrunner/taskrunner/task_runner.go b/client/allocrunner/taskrunner/task_runner.go index e8107f19b4f..ee0dace73ab 100644 --- a/client/allocrunner/taskrunner/task_runner.go +++ b/client/allocrunner/taskrunner/task_runner.go @@ -426,7 +426,11 @@ func NewTaskRunner(config *Config) (*TaskRunner, error) { // Use the client secret only as the initial value; the identity hook will // update this with a workload identity if one is available - tr.setNomadToken(config.ClientConfig.Node.SecretID) + //TODO(schmichael) can we remove this entirely? seems like a huge + //security problem if we use accidentally use it instead of the + //workload identity + //tr.setNomadToken(config.ClientConfig.Node.SecretID) + tr.setNomadToken("Hi, this is a fake token that should never work or be used! Remove me.") // Initialize the runners hooks. Must come after initDriver so hooks // can use tr.driverCapabilities diff --git a/client/allocrunner/taskrunner/task_runner_hooks.go b/client/allocrunner/taskrunner/task_runner_hooks.go index 7c1b73dbd9b..4dd9a6b7969 100644 --- a/client/allocrunner/taskrunner/task_runner_hooks.go +++ b/client/allocrunner/taskrunner/task_runner_hooks.go @@ -68,6 +68,7 @@ func (tr *TaskRunner) initHooks() { newArtifactHook(tr, tr.getter, hookLogger), newStatsHook(tr, tr.clientConfig.StatsCollectionInterval, hookLogger), newDeviceHook(tr.devicemanager, hookLogger), + newAPIHook(tr.clientConfig.APIListenerRegistrar, hookLogger), } // If the task has a CSI block, add the hook. diff --git a/client/config/config.go b/client/config/config.go index 2945f6daa70..8c6ae89545e 100644 --- a/client/config/config.go +++ b/client/config/config.go @@ -1,8 +1,10 @@ package config import ( + "context" "errors" "fmt" + "net" "reflect" "strconv" "strings" @@ -301,10 +303,25 @@ type Config struct { // used for template functions which require access to the Nomad API. TemplateDialer *bufconndialer.BufConnWrapper + //TODO(schmichael) write something + APIListenerRegistrar APIListenerRegistrar + // Artifact configuration from the agent's config file. Artifact *ArtifactConfig } +type APIListenerRegistrar interface { + // Serve the HTTP API on the provided listener. The returned channel may be + // used to receive errors and is closed when the listener is no longer being + // served. If the agent is shutting down the error will be + // http.ErrServerClosed. + // + // The context is because Serve may be called before the HTTP server has been + // initialized. If the context is canceled before the HTTP server is + // initialized, the context's error will be returned on the chan. + Serve(context.Context, net.Listener) chan error +} + // ClientTemplateConfig is configuration on the client specific to template // rendering type ClientTemplateConfig struct { diff --git a/command/agent/agent.go b/command/agent/agent.go index ae3d3e6613c..54d4142035d 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -115,7 +115,11 @@ type Agent struct { builtinListener net.Listener builtinDialer *bufconndialer.BufConnWrapper - InmemSink *metrics.InmemSink + //TODO(schmichael) builtinServer is an HTTP server for attaching + //per-task listeners. Always requires auth. + builtinServer *builtinAPI + + inmemSink *metrics.InmemSink } // NewAgent is used to create a new agent with the given configuration @@ -124,7 +128,7 @@ func NewAgent(config *Config, logger log.InterceptLogger, logOutput io.Writer, i config: config, logOutput: logOutput, shutdownCh: make(chan struct{}), - InmemSink: inmem, + inmemSink: inmem, } // Create the loggers @@ -1017,9 +1021,15 @@ func (a *Agent) setupClient() error { // running consul-template functions that utilize the Nomad API. We lazy // load this into the client config, therefore this needs to happen before // we call NewClient. + //TODO migrate to APIListenerRegistrar a.builtinListener, a.builtinDialer = bufconndialer.New() conf.TemplateDialer = a.builtinDialer + // Initialize builtin API server here for use in the client, but it won't + // accept connections until the HTTP servers are created. + a.builtinServer = newBuiltinAPI() + conf.APIListenerRegistrar = a.builtinServer + nomadClient, err := client.NewClient( conf, a.consulCatalog, a.consulProxies, a.consulService, nil) if err != nil { @@ -1300,6 +1310,11 @@ func (a *Agent) GetConfig() *Config { return a.config } +// GetMetricsSink returns the metrics sink. +func (a *Agent) GetMetricsSink() *metrics.InmemSink { + return a.inmemSink +} + // setupConsul creates the Consul client and starts its main Run loop. func (a *Agent) setupConsul(consulConfig *config.ConsulConfig) error { apiConf, err := consulConfig.ApiConfig() diff --git a/command/agent/agent_endpoint.go b/command/agent/agent_endpoint.go index 898001ca8cc..21df7c3b3b2 100644 --- a/command/agent/agent_endpoint.go +++ b/command/agent/agent_endpoint.go @@ -440,7 +440,7 @@ func (s *HTTPServer) listServers(resp http.ResponseWriter, req *http.Request) (i return nil, structs.ErrPermissionDenied } - peers := s.agent.client.GetServers() + peers := client.GetServers() sort.Strings(peers) return peers, nil } @@ -468,9 +468,9 @@ func (s *HTTPServer) updateServers(resp http.ResponseWriter, req *http.Request) } // Set the servers list into the client - s.agent.logger.Trace("adding servers to the client's primary server list", "servers", servers, "path", "/v1/agent/servers", "method", "PUT") + s.logger.Trace("adding servers to the client's primary server list", "servers", servers, "path", "/v1/agent/servers", "method", "PUT") if _, err := client.SetServers(servers); err != nil { - s.agent.logger.Error("failed adding servers to client's server list", "servers", servers, "error", err, "path", "/v1/agent/servers", "method", "PUT") + s.logger.Error("failed adding servers to client's server list", "servers", servers, "error", err, "path", "/v1/agent/servers", "method", "PUT") //TODO is this the right error to return? return nil, CodedError(400, err.Error()) } @@ -708,7 +708,7 @@ func (s *HTTPServer) AgentHostRequest(resp http.ResponseWriter, req *http.Reques // The RPC endpoint actually forwards the request to the correct // agent, but we need to use the correct RPC interface. localClient, remoteClient, localServer := s.rpcHandlerForNode(lookupNodeID) - s.agent.logger.Debug("s.rpcHandlerForNode()", "lookupNodeID", lookupNodeID, "serverID", serverID, "nodeID", nodeID, "localClient", localClient, "remoteClient", remoteClient, "localServer", localServer) + s.logger.Debug("s.rpcHandlerForNode()", "lookupNodeID", lookupNodeID, "serverID", serverID, "nodeID", nodeID, "localClient", localClient, "remoteClient", remoteClient, "localServer", localServer) // Make the RPC call if localClient { diff --git a/command/agent/alloc_endpoint.go b/command/agent/alloc_endpoint.go index 18b4dfdc3f3..ee071e7dbed 100644 --- a/command/agent/alloc_endpoint.go +++ b/command/agent/alloc_endpoint.go @@ -222,7 +222,7 @@ func (s *HTTPServer) ClientAllocRequest(resp http.ResponseWriter, req *http.Requ case "exec": return s.allocExec(allocID, resp, req) case "snapshot": - if s.agent.client == nil { + if s.agent.Client() == nil { return nil, clientNotRunning } return s.allocSnapshot(allocID, resp, req) diff --git a/command/agent/http.go b/command/agent/http.go index da3580e7728..766f65f8206 100644 --- a/command/agent/http.go +++ b/command/agent/http.go @@ -2,6 +2,7 @@ package agent import ( "bytes" + "context" "crypto/tls" "encoding/json" "errors" @@ -26,8 +27,10 @@ import ( "golang.org/x/time/rate" "github.com/hashicorp/nomad/acl" + "github.com/hashicorp/nomad/client" "github.com/hashicorp/nomad/helper/noxssrw" "github.com/hashicorp/nomad/helper/tlsutil" + "github.com/hashicorp/nomad/nomad" "github.com/hashicorp/nomad/nomad/structs" ) @@ -74,9 +77,18 @@ var ( type handlerFn func(resp http.ResponseWriter, req *http.Request) (interface{}, error) type handlerByteFn func(resp http.ResponseWriter, req *http.Request) ([]byte, error) +type RPCer interface { + RPC(string, any, any) error + Server() *nomad.Server + Client() *client.Client + Stats() map[string]map[string]string + GetConfig() *Config + GetMetricsSink() *metrics.InmemSink +} + // HTTPServer is used to wrap an Agent and expose it over an HTTP interface type HTTPServer struct { - agent *Agent + agent RPCer mux *http.ServeMux listener net.Listener listenerCh chan struct{} @@ -170,7 +182,7 @@ func NewHTTPServers(agent *Agent, config *Config) ([]*HTTPServer, error) { srvs = append(srvs, srv) } - // This HTTP server is only create when running in client mode, otherwise + // This HTTP server is only created when running in client mode, otherwise // the builtinDialer and builtinListener will be nil. if agent.builtinDialer != nil && agent.builtinListener != nil { srv := &HTTPServer{ @@ -185,12 +197,15 @@ func NewHTTPServers(agent *Agent, config *Config) ([]*HTTPServer, error) { srv.registerHandlers(config.EnableDebug) + // builtinServer adds a wrapper to always authenticate requests httpServer := http.Server{ Addr: srv.Addr, - Handler: srv.mux, + Handler: newAuthMiddleware(srv, srv.mux), ErrorLog: newHTTPServerLogger(srv.logger), } + agent.builtinServer.SetServer(&httpServer) + go func() { defer close(srv.listenerCh) httpServer.Serve(agent.builtinListener) @@ -199,6 +214,8 @@ func NewHTTPServers(agent *Agent, config *Config) ([]*HTTPServer, error) { srvs = append(srvs, srv) } + // This HTTP server is only + if serverInitializationErrors != nil { for _, srv := range srvs { srv.Shutdown() @@ -465,7 +482,8 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) { s.mux.Handle("/v1/vars", wrapCORS(s.wrap(s.VariablesListRequest))) s.mux.Handle("/v1/var/", wrapCORSWithAllowedMethods(s.wrap(s.VariableSpecificRequest), "HEAD", "GET", "PUT", "DELETE")) - uiConfigEnabled := s.agent.config.UI != nil && s.agent.config.UI.Enabled + agentConfig := s.agent.GetConfig() + uiConfigEnabled := agentConfig.UI != nil && agentConfig.UI.Enabled if uiEnabled && uiConfigEnabled { s.mux.Handle("/ui/", http.StripPrefix("/ui/", s.handleUI(http.FileServer(&UIAssetWrapper{FileSystem: assetFS()})))) @@ -484,7 +502,7 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) { s.mux.Handle("/", s.handleRootFallthrough()) if enableDebug { - if !s.agent.config.DevMode { + if !agentConfig.DevMode { s.logger.Warn("enable_debug is set to true. This is insecure and should not be enabled in production") } s.mux.HandleFunc("/debug/pprof/", pprof.Index) @@ -498,6 +516,53 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) { s.registerEnterpriseHandlers() } +type builtinAPI struct { + srv *http.Server + srvReadyCh chan struct{} +} + +func newBuiltinAPI() *builtinAPI { + return &builtinAPI{ + srvReadyCh: make(chan struct{}), + } +} + +// SetServer sets the API HTTP server for Serve to add listeners to. +// +// It must be called exactly once and will panic if called more than once. +func (b *builtinAPI) SetServer(srv *http.Server) { + select { + case <-b.srvReadyCh: + panic(fmt.Sprintf("SetServer called twice. first=%p second=%p", b.srv, srv)) + default: + } + b.srv = srv + close(b.srvReadyCh) +} + +// Serve the HTTP API on the listener unless the context is canceled before the +// HTTP API is ready to serve listeners. A non-nil error will always be +// returned on the chan. +func (b *builtinAPI) Serve(ctx context.Context, l net.Listener) chan error { + errCh := make(chan error, 1) + select { + case <-ctx.Done(): + // Caller canceled context before server was ready. + errCh <- ctx.Err() + close(errCh) + return errCh + case <-b.srvReadyCh: + // Server ready for listeners! Continue on... + } + + go func() { + defer close(errCh) + errCh <- b.srv.Serve(l) + }() + + return errCh +} + // HTTPCodedError is used to provide the HTTP error code type HTTPCodedError interface { error @@ -591,7 +656,7 @@ func errCodeFromHandler(err error) (int, string) { // wrap is used to wrap functions to make them more convenient func (s *HTTPServer) wrap(handler func(resp http.ResponseWriter, req *http.Request) (interface{}, error)) func(resp http.ResponseWriter, req *http.Request) { f := func(resp http.ResponseWriter, req *http.Request) { - setHeaders(resp, s.agent.config.HTTPAPIResponseHeaders) + setHeaders(resp, s.agent.GetConfig().HTTPAPIResponseHeaders) // Invoke the handler reqURL := req.URL.String() start := time.Now() @@ -673,7 +738,7 @@ func (s *HTTPServer) wrap(handler func(resp http.ResponseWriter, req *http.Reque // Handler functions are responsible for setting Content-Type Header func (s *HTTPServer) wrapNonJSON(handler func(resp http.ResponseWriter, req *http.Request) ([]byte, error)) func(resp http.ResponseWriter, req *http.Request) { f := func(resp http.ResponseWriter, req *http.Request) { - setHeaders(resp, s.agent.config.HTTPAPIResponseHeaders) + setHeaders(resp, s.agent.GetConfig().HTTPAPIResponseHeaders) // Invoke the handler reqURL := req.URL.String() start := time.Now() @@ -817,7 +882,7 @@ func (s *HTTPServer) parseRegion(req *http.Request, r *string) { if other := req.URL.Query().Get("region"); other != "" { *r = other } else if *r == "" { - *r = s.agent.config.Region + *r = s.agent.GetConfig().Region } } @@ -976,3 +1041,54 @@ func wrapCORS(f func(http.ResponseWriter, *http.Request)) http.Handler { func wrapCORSWithAllowedMethods(f func(http.ResponseWriter, *http.Request), methods ...string) http.Handler { return allowCORSWithMethods(methods...).Handler(http.HandlerFunc(f)) } + +// TODO(schmichael) caching - see client/acl.go +// TODO(schmichael) where should this thing live +type authMiddleware struct { + srv *HTTPServer + wrapped http.Handler +} + +func newAuthMiddleware(srv *HTTPServer, h http.Handler) http.Handler { + return &authMiddleware{ + srv: srv, + wrapped: h, + } +} + +func (a *authMiddleware) ServeHTTP(resp http.ResponseWriter, req *http.Request) { + args := structs.GenericRequest{} + reply := structs.ACLWhoAmIResponse{} + if a.srv.parse(resp, req, &args.Region, &args.QueryOptions) { + // Error parsing request, 400 + resp.WriteHeader(http.StatusBadRequest) + resp.Write([]byte("Invalid request parameters\n")) + return + } + + if args.AuthToken == "" { + // 401 instead of 403 since no token was present. + resp.WriteHeader(http.StatusUnauthorized) + resp.Write([]byte("Authorization required\n")) + return + } + + if err := a.srv.agent.RPC("ACL.WhoAmI", &args, &reply); err != nil { + a.srv.logger.Error("error authenticating built API request", "error", err, "url", req.URL, "method", req.Method) + resp.WriteHeader(500) + resp.Write([]byte("Server error authenticating request\n")) + return + } + + //TODO(schmichael) this is janky but works? + // Require an acl token or workload identity + if reply.Identity == nil || (reply.Identity.ACLToken == nil && reply.Identity.Claims == nil) { + resp.WriteHeader(http.StatusForbidden) + resp.Write([]byte("Forbidden\n")) + return + } + + a.srv.logger.Trace("Authenticated request", "id", reply.Identity, "method", req.Method, "url", req.URL) + a.wrapped.ServeHTTP(resp, req) + return +} diff --git a/command/agent/job_endpoint.go b/command/agent/job_endpoint.go index 47f5fce9cb7..407e782b660 100644 --- a/command/agent/job_endpoint.go +++ b/command/agent/job_endpoint.go @@ -819,7 +819,7 @@ func (s *HTTPServer) apiJobAndRequestToStructs(job *api.Job, req *http.Request, queryRegion := req.URL.Query().Get("region") requestRegion, jobRegion := regionForJob( - job, queryRegion, writeReq.Region, s.agent.config.Region, + job, queryRegion, writeReq.Region, s.agent.GetConfig().Region, ) sJob := ApiJobToStructJob(job) diff --git a/command/agent/metrics_endpoint.go b/command/agent/metrics_endpoint.go index 7233492ae04..90686cb3e41 100644 --- a/command/agent/metrics_endpoint.go +++ b/command/agent/metrics_endpoint.go @@ -25,14 +25,14 @@ func (s *HTTPServer) MetricsRequest(resp http.ResponseWriter, req *http.Request) // Only return Prometheus formatted metrics if the user has enabled // this functionality. - if !s.agent.config.Telemetry.PrometheusMetrics { + if !s.agent.GetConfig().Telemetry.PrometheusMetrics { return nil, CodedError(http.StatusUnsupportedMediaType, "Prometheus is not enabled") } s.prometheusHandler().ServeHTTP(resp, req) return nil, nil } - return s.agent.InmemSink.DisplayMetrics(resp, req) + return s.agent.GetMetricsSink().DisplayMetrics(resp, req) } func (s *HTTPServer) prometheusHandler() http.Handler { diff --git a/command/agent/variable_endpoint.go b/command/agent/variable_endpoint.go index 17f55ac9c38..bbf8b03bc9d 100644 --- a/command/agent/variable_endpoint.go +++ b/command/agent/variable_endpoint.go @@ -16,7 +16,8 @@ func (s *HTTPServer) VariablesListRequest(resp http.ResponseWriter, req *http.Re args := structs.VariablesListRequest{} if s.parse(resp, req, &args.Region, &args.QueryOptions) { - return nil, nil + //TODO(schmichael) shouldn't we return something here?! + return nil, CodedError(http.StatusBadRequest, "failed to parse parameters") } var out structs.VariablesListResponse diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index eb05ade656b..82553c1b552 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -10632,6 +10632,14 @@ type IdentityClaims struct { jwt.RegisteredClaims } +// TODO(schmichael) this is just for debugging, feel free to remove +// String version of the identity. No secrets, safe for display. +func (ic *IdentityClaims) String() string { + //TODO(schmichael) use "jwt" or something vendor specific like "nwi" + //for Nomad Workload Identity here? + return fmt.Sprintf("jwt:%s/%s/%s/%s", ic.Namespace, ic.JobID, ic.AllocationID, ic.TaskName) +} + // AllocationDiff is another named type for Allocation (to use the same fields), // which is used to represent the delta for an Allocation. If you need a method // defined on the al From 228aef922ca26f67b9cf6983f48166183a4c790d Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Thu, 2 Feb 2023 14:33:57 -0800 Subject: [PATCH 02/18] make whoami always validate jwts; fix race between api serve & context --- client/allocrunner/taskrunner/api_hook.go | 9 +++++---- client/config/config.go | 10 ++++------ command/agent/http.go | 17 +++++------------ nomad/acl_endpoint.go | 13 ++++++++++++- nomad/structs/structs.go | 12 ++++++------ 5 files changed, 32 insertions(+), 29 deletions(-) diff --git a/client/allocrunner/taskrunner/api_hook.go b/client/allocrunner/taskrunner/api_hook.go index c2d0261ec7e..6a3f7575eaf 100644 --- a/client/allocrunner/taskrunner/api_hook.go +++ b/client/allocrunner/taskrunner/api_hook.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "net" + "net/http" "os" "path/filepath" @@ -44,7 +45,6 @@ func (h *apiHook) Prestart(ctx context.Context, req *interfaces.TaskPrestartRequ return fmt.Errorf("error creating api socket directory: %w", err) } - //TODO(schmichael) what name udsPath := filepath.Join(udsDir, "nomad.socket") if err := os.RemoveAll(udsPath); err != nil { return fmt.Errorf("could not remove existing api socket: %w", err) @@ -56,9 +56,10 @@ func (h *apiHook) Prestart(ctx context.Context, req *interfaces.TaskPrestartRequ } go func() { - if err := h.srv.Serve(ctx, udsln); err != nil { - //TODO(schmichael) probably ignore http.ErrServerClosed - h.logger.Warn("error serving api", "error", err) + // Cannot use Prestart's context as it is closed after all prestart hooks + // have been closed. + if err := h.srv.Serve(context.TODO(), udsln); err != http.ErrServerClosed { + h.logger.Error("error serving api", "error", err) } }() diff --git a/client/config/config.go b/client/config/config.go index 8c6ae89545e..cf2ffc7b905 100644 --- a/client/config/config.go +++ b/client/config/config.go @@ -311,15 +311,13 @@ type Config struct { } type APIListenerRegistrar interface { - // Serve the HTTP API on the provided listener. The returned channel may be - // used to receive errors and is closed when the listener is no longer being - // served. If the agent is shutting down the error will be - // http.ErrServerClosed. + // Serve the HTTP API on the provided listener. If the agent is shutting down + // the error will be http.ErrServerClosed. // // The context is because Serve may be called before the HTTP server has been // initialized. If the context is canceled before the HTTP server is - // initialized, the context's error will be returned on the chan. - Serve(context.Context, net.Listener) chan error + // initialized, the context's error will be returned. + Serve(context.Context, net.Listener) error } // ClientTemplateConfig is configuration on the client specific to template diff --git a/command/agent/http.go b/command/agent/http.go index 766f65f8206..1bc541a7697 100644 --- a/command/agent/http.go +++ b/command/agent/http.go @@ -542,25 +542,17 @@ func (b *builtinAPI) SetServer(srv *http.Server) { // Serve the HTTP API on the listener unless the context is canceled before the // HTTP API is ready to serve listeners. A non-nil error will always be -// returned on the chan. -func (b *builtinAPI) Serve(ctx context.Context, l net.Listener) chan error { - errCh := make(chan error, 1) +// returned, but http.ErrServerClosed can likely be ignored. +func (b *builtinAPI) Serve(ctx context.Context, l net.Listener) error { select { case <-ctx.Done(): // Caller canceled context before server was ready. - errCh <- ctx.Err() - close(errCh) - return errCh + return ctx.Err() case <-b.srvReadyCh: // Server ready for listeners! Continue on... } - go func() { - defer close(errCh) - errCh <- b.srv.Serve(l) - }() - - return errCh + return b.srv.Serve(l) } // HTTPCodedError is used to provide the HTTP error code @@ -1083,6 +1075,7 @@ func (a *authMiddleware) ServeHTTP(resp http.ResponseWriter, req *http.Request) //TODO(schmichael) this is janky but works? // Require an acl token or workload identity if reply.Identity == nil || (reply.Identity.ACLToken == nil && reply.Identity.Claims == nil) { + a.srv.logger.Debug("Failed to authenticated Task API request", "method", req.Method, "url", req.URL) resp.WriteHeader(http.StatusForbidden) resp.Write([]byte("Forbidden\n")) return diff --git a/nomad/acl_endpoint.go b/nomad/acl_endpoint.go index 3df43b4d73e..58064b70995 100644 --- a/nomad/acl_endpoint.go +++ b/nomad/acl_endpoint.go @@ -2102,7 +2102,9 @@ func (a *ACL) GetAuthMethods( } // WhoAmI is a RPC for debugging authentication. This endpoint returns the same -// AuthenticatedIdentity that will be used by RPC handlers. +// AuthenticatedIdentity that will be used by RPC handlers, but unlike other +// endpoints will try to authenticate workload identities even if ACLs are +// disabled. // // TODO: At some point we might want to give this an equivalent HTTP endpoint // once other Workload Identity work is solidified @@ -2118,6 +2120,15 @@ func (a *ACL) WhoAmI(args *structs.GenericRequest, reply *structs.ACLWhoAmIRespo defer metrics.MeasureSince([]string{"nomad", "acl", "whoami"}, time.Now()) + if !a.srv.config.ACLEnabled { + // Authenticate never verifies claimed when ACLs are disabled, but since + // this endpoint is explicitly for resolving identities, always try to + // verify any claims. + if claims, _ := a.srv.VerifyClaim(args.AuthToken); claims != nil { + args.SetIdentity(&structs.AuthenticatedIdentity{Claims: claims}) + } + } + reply.Identity = args.GetIdentity() return nil } diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 82553c1b552..a76e5c2ab6e 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -501,12 +501,6 @@ func (ai *AuthenticatedIdentity) GetClaims() *IdentityClaims { return ai.Claims } -type RequestWithIdentity interface { - GetAuthToken() string - SetIdentity(identity *AuthenticatedIdentity) - GetIdentity() *AuthenticatedIdentity -} - func (ai *AuthenticatedIdentity) String() string { if ai == nil { return "unauthenticated" @@ -523,6 +517,12 @@ func (ai *AuthenticatedIdentity) String() string { return fmt.Sprintf("%s:%s", ai.TLSName, ai.RemoteIP.String()) } +type RequestWithIdentity interface { + GetAuthToken() string + SetIdentity(identity *AuthenticatedIdentity) + GetIdentity() *AuthenticatedIdentity +} + // QueryMeta allows a query response to include potentially // useful metadata about a query type QueryMeta struct { From 3f6b5e160a50ca829323af29fb3cafb8c406a257 Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Thu, 2 Feb 2023 16:49:20 -0800 Subject: [PATCH 03/18] cleanup socket handling code --- .../allocrunner/interfaces/task_lifecycle.go | 3 ++ client/allocrunner/taskrunner/api_hook.go | 44 +++++++++---------- .../taskrunner/task_runner_hooks.go | 4 +- command/agent/http.go | 12 ++++- 4 files changed, 38 insertions(+), 25 deletions(-) diff --git a/client/allocrunner/interfaces/task_lifecycle.go b/client/allocrunner/interfaces/task_lifecycle.go index 1bf61bd5a0a..cb4cdd0af26 100644 --- a/client/allocrunner/interfaces/task_lifecycle.go +++ b/client/allocrunner/interfaces/task_lifecycle.go @@ -186,6 +186,9 @@ type TaskStopRequest struct { // ExistingState is previously set hook data and should only be // read. Stop hooks cannot alter state. ExistingState map[string]string + + // TaskDir contains the task's directory tree on the host + TaskDir *allocdir.TaskDir } type TaskStopResponse struct{} diff --git a/client/allocrunner/taskrunner/api_hook.go b/client/allocrunner/taskrunner/api_hook.go index 6a3f7575eaf..5b5e1e74e85 100644 --- a/client/allocrunner/taskrunner/api_hook.go +++ b/client/allocrunner/taskrunner/api_hook.go @@ -9,18 +9,18 @@ import ( "os" "path/filepath" - log "github.com/hashicorp/go-hclog" + "github.com/hashicorp/go-hclog" "github.com/hashicorp/nomad/client/allocrunner/interfaces" "github.com/hashicorp/nomad/client/config" ) type apiHook struct { srv config.APIListenerRegistrar - logger log.Logger + logger hclog.Logger ln net.Listener } -func newAPIHook(srv config.APIListenerRegistrar, logger log.Logger) *apiHook { +func newAPIHook(srv config.APIListenerRegistrar, logger hclog.Logger) *apiHook { h := &apiHook{ srv: srv, } @@ -32,14 +32,7 @@ func (*apiHook) Name() string { return "api" } -func (h *apiHook) Prestart(ctx context.Context, req *interfaces.TaskPrestartRequest, resp *interfaces.TaskPrestartResponse) error { - if h.ln != nil { - //TODO(schmichael) remove me - h.logger.Trace("huh, called apiHook.Prestart twice") - return nil - } - - //TODO(schmichael) what dir & perms +func (h *apiHook) Prestart(_ context.Context, req *interfaces.TaskPrestartRequest, resp *interfaces.TaskPrestartResponse) error { udsDir := filepath.Join(req.TaskDir.SecretsDir, "run") if err := os.MkdirAll(udsDir, 0o775); err != nil { return fmt.Errorf("error creating api socket directory: %w", err) @@ -58,7 +51,13 @@ func (h *apiHook) Prestart(ctx context.Context, req *interfaces.TaskPrestartRequ go func() { // Cannot use Prestart's context as it is closed after all prestart hooks // have been closed. - if err := h.srv.Serve(context.TODO(), udsln); err != http.ErrServerClosed { + if err := h.srv.Serve(context.TODO(), udsln); err != nil { + if errors.Is(err, http.ErrServerClosed) { + return + } + if errors.Is(err, net.ErrClosed) { + return + } h.logger.Error("error serving api", "error", err) } }() @@ -68,17 +67,18 @@ func (h *apiHook) Prestart(ctx context.Context, req *interfaces.TaskPrestartRequ } func (h *apiHook) Stop(ctx context.Context, req *interfaces.TaskStopRequest, resp *interfaces.TaskStopResponse) error { - if h.ln == nil { - //TODO(schmichael) remove me - h.logger.Trace("huh, no listener") - return nil - } - - if err := h.ln.Close(); err != nil { - if errors.Is(err, net.ErrClosed) { - return nil + if h.ln != nil { + if err := h.ln.Close(); err != nil { + if !errors.Is(err, net.ErrClosed) { + h.logger.Trace("error closing task listener: %v", err) + } } - h.logger.Trace("error closing task listener: %v", err) } + + // Best-effort at cleaining things up. Alloc dir cleanup will remove it if + // this fails for any reason. + udsPath := filepath.Join(req.TaskDir.SecretsDir, "run", "nomad.socket") + _ = os.RemoveAll(udsPath) + return nil } diff --git a/client/allocrunner/taskrunner/task_runner_hooks.go b/client/allocrunner/taskrunner/task_runner_hooks.go index 4dd9a6b7969..4c498995bdb 100644 --- a/client/allocrunner/taskrunner/task_runner_hooks.go +++ b/client/allocrunner/taskrunner/task_runner_hooks.go @@ -432,7 +432,9 @@ func (tr *TaskRunner) stop() error { tr.logger.Trace("running stop hook", "name", name, "start", start) } - req := interfaces.TaskStopRequest{} + req := interfaces.TaskStopRequest{ + TaskDir: tr.taskDir, + } origHookState := tr.hookState(name) if origHookState != nil { diff --git a/command/agent/http.go b/command/agent/http.go index 1bc541a7697..cca6f3f75fd 100644 --- a/command/agent/http.go +++ b/command/agent/http.go @@ -516,6 +516,14 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) { s.registerEnterpriseHandlers() } +// builtinAPI is a wrapper around serving the HTTP API to arbitrary listeners +// such as the Task API. It is necessary because the HTTP servers are created +// *after* the client has been initialized, so this wrapper blocks Serve +// requests from task api hooks until the HTTP server is setup and ready to +// accept from new listeners. +// +// bufconndialer provides similar functionality to consul-template except it +// satisfies the Dialer API as opposed to the Serve(Listener) API. type builtinAPI struct { srv *http.Server srvReadyCh chan struct{} @@ -542,7 +550,8 @@ func (b *builtinAPI) SetServer(srv *http.Server) { // Serve the HTTP API on the listener unless the context is canceled before the // HTTP API is ready to serve listeners. A non-nil error will always be -// returned, but http.ErrServerClosed can likely be ignored. +// returned, but http.ErrServerClosed and net.ErrClosed can likely be ignored +// as they indicate the server or listener is being shutdown. func (b *builtinAPI) Serve(ctx context.Context, l net.Listener) error { select { case <-ctx.Done(): @@ -1072,7 +1081,6 @@ func (a *authMiddleware) ServeHTTP(resp http.ResponseWriter, req *http.Request) return } - //TODO(schmichael) this is janky but works? // Require an acl token or workload identity if reply.Identity == nil || (reply.Identity.ACLToken == nil && reply.Identity.Claims == nil) { a.srv.logger.Debug("Failed to authenticated Task API request", "method", req.Method, "url", req.URL) From f75c7d4444709499048efdb293638296da283009 Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Fri, 3 Feb 2023 12:04:02 -0800 Subject: [PATCH 04/18] shorten path and add test --- .../allocrunner/interfaces/task_lifecycle.go | 2 - client/allocrunner/taskrunner/api_hook.go | 58 +++++--- .../allocrunner/taskrunner/api_hook_test.go | 140 ++++++++++++++++++ .../taskrunner/task_runner_hooks.go | 2 +- client/config/config.go | 3 +- 5 files changed, 181 insertions(+), 24 deletions(-) create mode 100644 client/allocrunner/taskrunner/api_hook_test.go diff --git a/client/allocrunner/interfaces/task_lifecycle.go b/client/allocrunner/interfaces/task_lifecycle.go index cb4cdd0af26..3ea51c4a90a 100644 --- a/client/allocrunner/interfaces/task_lifecycle.go +++ b/client/allocrunner/interfaces/task_lifecycle.go @@ -33,8 +33,6 @@ import ( +-----------+ *Kill (forces terminal) - -Link: http://stable.ascii-flow.appspot.com/#Draw4489375405966393064/1824429135 */ // TaskHook is a lifecycle hook into the life cycle of a task runner. diff --git a/client/allocrunner/taskrunner/api_hook.go b/client/allocrunner/taskrunner/api_hook.go index 5b5e1e74e85..13144c211f8 100644 --- a/client/allocrunner/taskrunner/api_hook.go +++ b/client/allocrunner/taskrunner/api_hook.go @@ -3,26 +3,39 @@ package taskrunner import ( "context" "errors" - "fmt" "net" "net/http" "os" "path/filepath" "github.com/hashicorp/go-hclog" + "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/allocrunner/interfaces" "github.com/hashicorp/nomad/client/config" ) +// apiHook exposes the Task API. The Task API allows task's to access the Nomad +// HTTP API without having to discover and connect to an agent's address. +// Instead a unix socket is provided in a standard location. To prevent access +// by untrusted workloads the Task API always requires authentication even when +// ACLs are disabled. +// +// The Task API hook largely soft-fails as there are a number of ways creating +// the unix socket could fail (the most common one being path length +// restrictions), and it is assumed most tasks won't require access to the Task +// API anyway. Tasks that do require access are expected to crash and get +// rescheduled should they land on a client who Task API hook soft-fails. type apiHook struct { - srv config.APIListenerRegistrar - logger hclog.Logger - ln net.Listener + shutdownCtx context.Context + srv config.APIListenerRegistrar + logger hclog.Logger + ln net.Listener } -func newAPIHook(srv config.APIListenerRegistrar, logger hclog.Logger) *apiHook { +func newAPIHook(shutdownCtx context.Context, srv config.APIListenerRegistrar, logger hclog.Logger) *apiHook { h := &apiHook{ - srv: srv, + shutdownCtx: shutdownCtx, + srv: srv, } h.logger = logger.Named(h.Name()) return h @@ -33,25 +46,21 @@ func (*apiHook) Name() string { } func (h *apiHook) Prestart(_ context.Context, req *interfaces.TaskPrestartRequest, resp *interfaces.TaskPrestartResponse) error { - udsDir := filepath.Join(req.TaskDir.SecretsDir, "run") - if err := os.MkdirAll(udsDir, 0o775); err != nil { - return fmt.Errorf("error creating api socket directory: %w", err) - } - - udsPath := filepath.Join(udsDir, "nomad.socket") + udsPath := apiSocketPath(req.TaskDir) if err := os.RemoveAll(udsPath); err != nil { - return fmt.Errorf("could not remove existing api socket: %w", err) + h.logger.Warn("error removing task api socket", "path", udsPath, "error", err) } udsln, err := net.Listen("unix", udsPath) if err != nil { - return fmt.Errorf("could not create api socket: %w", err) + h.logger.Warn("error listening on task api socket", "path", udsPath, "error", err) + return nil } go func() { // Cannot use Prestart's context as it is closed after all prestart hooks - // have been closed. - if err := h.srv.Serve(context.TODO(), udsln); err != nil { + // have been closed, but we do want to try to cleanup on shutdown. + if err := h.srv.Serve(h.shutdownCtx, udsln); err != nil { if errors.Is(err, http.ErrServerClosed) { return } @@ -70,15 +79,26 @@ func (h *apiHook) Stop(ctx context.Context, req *interfaces.TaskStopRequest, res if h.ln != nil { if err := h.ln.Close(); err != nil { if !errors.Is(err, net.ErrClosed) { - h.logger.Trace("error closing task listener: %v", err) + h.logger.Debug("error closing task listener: %v", err) } } + h.ln = nil } // Best-effort at cleaining things up. Alloc dir cleanup will remove it if // this fails for any reason. - udsPath := filepath.Join(req.TaskDir.SecretsDir, "run", "nomad.socket") - _ = os.RemoveAll(udsPath) + _ = os.RemoveAll(apiSocketPath(req.TaskDir)) return nil } + +// apiSocketPath returns the path to the Task API socket. +// +// The path needs to be as short as possible because of the low limits on the +// sun_path char array imposed by the syscall used to create unix sockets. +// +// See https://github.com/hashicorp/nomad/pull/13971 for an example of the +// sadness this causes. +func apiSocketPath(taskDir *allocdir.TaskDir) string { + return filepath.Join(taskDir.SecretsDir, "api.sock") +} diff --git a/client/allocrunner/taskrunner/api_hook_test.go b/client/allocrunner/taskrunner/api_hook_test.go new file mode 100644 index 00000000000..85b2c7e1975 --- /dev/null +++ b/client/allocrunner/taskrunner/api_hook_test.go @@ -0,0 +1,140 @@ +package taskrunner + +import ( + "context" + "io/fs" + "net" + "os" + "path/filepath" + "strings" + "testing" + + "github.com/hashicorp/nomad/ci" + "github.com/hashicorp/nomad/client/allocdir" + "github.com/hashicorp/nomad/client/allocrunner/interfaces" + "github.com/hashicorp/nomad/helper/testlog" + "github.com/shoenig/test/must" +) + +type testAPIListenerRegistrar struct { + cb func(net.Listener) error +} + +func (n testAPIListenerRegistrar) Serve(_ context.Context, ln net.Listener) error { + if n.cb != nil { + return n.cb(ln) + } + return nil +} + +// TestAPIHook_SoftFail asserts that the Task API Hook soft fails and does not +// return errors. +func TestAPIHook_SoftFail(t *testing.T) { + ci.Parallel(t) + + // Use a SecretsDir that will always exceed Unix socket path length + // limits (sun_path) + dst := filepath.Join(t.TempDir(), strings.Repeat("_NOMAD_TEST_", 500)) + + ctx := context.Background() + srv := testAPIListenerRegistrar{} + logger := testlog.HCLogger(t) + h := newAPIHook(ctx, srv, logger) + + req := &interfaces.TaskPrestartRequest{ + TaskDir: &allocdir.TaskDir{ + SecretsDir: dst, + }, + } + resp := &interfaces.TaskPrestartResponse{} + + err := h.Prestart(ctx, req, resp) + must.NoError(t, err) + + // listener should not have been set + must.Nil(t, h.ln) + + // File should not have been created + _, err = os.Stat(dst) + must.Error(t, err) + + // Assert stop also soft-fails + stopReq := &interfaces.TaskStopRequest{ + TaskDir: req.TaskDir, + } + stopResp := &interfaces.TaskStopResponse{} + err = h.Stop(ctx, stopReq, stopResp) + must.NoError(t, err) + + // File should not have been created + _, err = os.Stat(dst) + must.Error(t, err) +} + +// TestAPIHook_Ok asserts that the Task API Hook creates and cleans up a +// socket. +func TestAPIHook_Ok(t *testing.T) { + ci.Parallel(t) + + // If this test fails it may be because TempDir() + /api.sock is longer than + // the unix socket path length limit (sun_path) in which case the test should + // use a different temporary directory on that platform. + dst := t.TempDir() + + // Write "ok" and close the connection and listener + srv := testAPIListenerRegistrar{ + cb: func(ln net.Listener) error { + conn, err := ln.Accept() + if err != nil { + return err + } + if _, err = conn.Write([]byte("ok")); err != nil { + return err + } + conn.Close() + return nil + }, + } + + ctx := context.Background() + logger := testlog.HCLogger(t) + h := newAPIHook(ctx, srv, logger) + + req := &interfaces.TaskPrestartRequest{ + TaskDir: &allocdir.TaskDir{ + SecretsDir: dst, + }, + } + resp := &interfaces.TaskPrestartResponse{} + + err := h.Prestart(ctx, req, resp) + must.NoError(t, err) + + // File should have been created + sockDst := apiSocketPath(req.TaskDir) + stat, err := os.Stat(sockDst) + must.NoError(t, err) + must.True(t, stat.Mode()&fs.ModeSocket != 0, + must.Sprintf("expected %q to be a unix socket but got %s", sockDst, stat.Mode())) + + // Assert the listener is working + conn, err := net.Dial("unix", sockDst) + must.NoError(t, err) + buf := make([]byte, 2) + _, err = conn.Read(buf) + must.NoError(t, err) + must.Eq(t, []byte("ok"), buf) + conn.Close() + + // Assert stop cleans up + stopReq := &interfaces.TaskStopRequest{ + TaskDir: req.TaskDir, + } + stopResp := &interfaces.TaskStopResponse{} + err = h.Stop(ctx, stopReq, stopResp) + must.NoError(t, err) + + // File should be gone + _, err = net.Dial("unix", sockDst) + must.Error(t, err) +} diff --git a/client/allocrunner/taskrunner/task_runner_hooks.go b/client/allocrunner/taskrunner/task_runner_hooks.go index 4c498995bdb..7ea50198288 100644 --- a/client/allocrunner/taskrunner/task_runner_hooks.go +++ b/client/allocrunner/taskrunner/task_runner_hooks.go @@ -68,7 +68,7 @@ func (tr *TaskRunner) initHooks() { newArtifactHook(tr, tr.getter, hookLogger), newStatsHook(tr, tr.clientConfig.StatsCollectionInterval, hookLogger), newDeviceHook(tr.devicemanager, hookLogger), - newAPIHook(tr.clientConfig.APIListenerRegistrar, hookLogger), + newAPIHook(tr.shutdownCtx, tr.clientConfig.APIListenerRegistrar, hookLogger), } // If the task has a CSI block, add the hook. diff --git a/client/config/config.go b/client/config/config.go index cf2ffc7b905..6523c978761 100644 --- a/client/config/config.go +++ b/client/config/config.go @@ -311,8 +311,7 @@ type Config struct { } type APIListenerRegistrar interface { - // Serve the HTTP API on the provided listener. If the agent is shutting down - // the error will be http.ErrServerClosed. + // Serve the HTTP API on the provided listener. // // The context is because Serve may be called before the HTTP server has been // initialized. If the context is canceled before the HTTP server is From 6c71f18fc471a061eb598ea9728a3819751b6826 Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Fri, 3 Feb 2023 12:20:57 -0800 Subject: [PATCH 05/18] windows will be the end of me --- client/allocrunner/taskrunner/api_hook_test.go | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/client/allocrunner/taskrunner/api_hook_test.go b/client/allocrunner/taskrunner/api_hook_test.go index 85b2c7e1975..e5ce0822ab1 100644 --- a/client/allocrunner/taskrunner/api_hook_test.go +++ b/client/allocrunner/taskrunner/api_hook_test.go @@ -6,6 +6,7 @@ import ( "net" "os" "path/filepath" + "runtime" "strings" "testing" @@ -112,10 +113,14 @@ func TestAPIHook_Ok(t *testing.T) { // File should have been created sockDst := apiSocketPath(req.TaskDir) - stat, err := os.Stat(sockDst) - must.NoError(t, err) - must.True(t, stat.Mode()&fs.ModeSocket != 0, - must.Sprintf("expected %q to be a unix socket but got %s", sockDst, stat.Mode())) + + // Stat on Windows fails on sockets + if runtime.GOOS != "windows" { + stat, err := os.Stat(sockDst) + must.NoError(t, err) + must.True(t, stat.Mode()&fs.ModeSocket != 0, + must.Sprintf("expected %q to be a unix socket but got %s", sockDst, stat.Mode())) + } // Assert the listener is working conn, err := net.Dial("unix", sockDst) From e41683f85e15e1e3a6ac136227c62ed00c77cd6d Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Fri, 3 Feb 2023 13:05:05 -0800 Subject: [PATCH 06/18] stub out task api in tests --- client/config/testing.go | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/client/config/testing.go b/client/config/testing.go index 02f87984f57..145b1626d39 100644 --- a/client/config/testing.go +++ b/client/config/testing.go @@ -1,7 +1,9 @@ package config import ( + "context" "io/ioutil" + "net" "os" "path/filepath" "time" @@ -74,5 +76,14 @@ func TestClientConfig(t testing.T) (*Config, func()) { // Same as default; necessary for task Event messages conf.MaxKillTimeout = 30 * time.Second + // Provide a stub APIListenerRegistrar implementation + conf.APIListenerRegistrar = noopAPIListenerRegistrar{} + return conf, cleanup } + +type noopAPIListenerRegistrar struct{} + +func (noopAPIListenerRegistrar) Serve(_ context.Context, _ net.Listener) error { + return nil +} From b80763b960bcef904c2ca431819286c5fd4b987d Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Fri, 3 Feb 2023 14:23:21 -0800 Subject: [PATCH 07/18] fix panic on task restart --- client/allocrunner/taskrunner/api_hook.go | 5 +++++ command/agent/http.go | 6 +++--- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/client/allocrunner/taskrunner/api_hook.go b/client/allocrunner/taskrunner/api_hook.go index 13144c211f8..81974e79a86 100644 --- a/client/allocrunner/taskrunner/api_hook.go +++ b/client/allocrunner/taskrunner/api_hook.go @@ -46,6 +46,11 @@ func (*apiHook) Name() string { } func (h *apiHook) Prestart(_ context.Context, req *interfaces.TaskPrestartRequest, resp *interfaces.TaskPrestartResponse) error { + if h.ln != nil { + // Listener already set. Task is probably restarting. + return nil + } + udsPath := apiSocketPath(req.TaskDir) if err := os.RemoveAll(udsPath); err != nil { h.logger.Warn("error removing task api socket", "path", udsPath, "error", err) diff --git a/command/agent/http.go b/command/agent/http.go index cca6f3f75fd..1ab0a657db0 100644 --- a/command/agent/http.go +++ b/command/agent/http.go @@ -1063,14 +1063,14 @@ func (a *authMiddleware) ServeHTTP(resp http.ResponseWriter, req *http.Request) if a.srv.parse(resp, req, &args.Region, &args.QueryOptions) { // Error parsing request, 400 resp.WriteHeader(http.StatusBadRequest) - resp.Write([]byte("Invalid request parameters\n")) + resp.Write([]byte(http.StatusText(http.StatusBadRequest))) return } if args.AuthToken == "" { // 401 instead of 403 since no token was present. resp.WriteHeader(http.StatusUnauthorized) - resp.Write([]byte("Authorization required\n")) + resp.Write([]byte(http.StatusText(http.StatusUnauthorized))) return } @@ -1085,7 +1085,7 @@ func (a *authMiddleware) ServeHTTP(resp http.ResponseWriter, req *http.Request) if reply.Identity == nil || (reply.Identity.ACLToken == nil && reply.Identity.Claims == nil) { a.srv.logger.Debug("Failed to authenticated Task API request", "method", req.Method, "url", req.URL) resp.WriteHeader(http.StatusForbidden) - resp.Write([]byte("Forbidden\n")) + resp.Write([]byte(http.StatusText(http.StatusForbidden))) return } From f222ad18b11aecb740c1c295c0bb4f81a9e2fd7b Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Fri, 3 Feb 2023 16:08:54 -0800 Subject: [PATCH 08/18] must make task api uds world writable --- client/allocrunner/taskrunner/api_hook.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/client/allocrunner/taskrunner/api_hook.go b/client/allocrunner/taskrunner/api_hook.go index 81974e79a86..c7bb34bf716 100644 --- a/client/allocrunner/taskrunner/api_hook.go +++ b/client/allocrunner/taskrunner/api_hook.go @@ -62,6 +62,10 @@ func (h *apiHook) Prestart(_ context.Context, req *interfaces.TaskPrestartReques return nil } + if err := os.Chmod(udsPath, 0o777); err != nil { + h.logger.Warn("error setting task api socket permissions", "path", udsPath, "error", err) + } + go func() { // Cannot use Prestart's context as it is closed after all prestart hooks // have been closed, but we do want to try to cleanup on shutdown. From ee0e42390b39e78b895cf8af38597bb376221820 Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Fri, 3 Feb 2023 16:09:33 -0800 Subject: [PATCH 09/18] task api tests --- e2e/workload_id/input/api-auth.nomad.hcl | 99 ++++++++++++++++++++ e2e/workload_id/input/api-win.nomad.hcl | 36 ++++++++ e2e/workload_id/taskapi_test.go | 111 +++++++++++++++++++++++ 3 files changed, 246 insertions(+) create mode 100644 e2e/workload_id/input/api-auth.nomad.hcl create mode 100644 e2e/workload_id/input/api-win.nomad.hcl create mode 100644 e2e/workload_id/taskapi_test.go diff --git a/e2e/workload_id/input/api-auth.nomad.hcl b/e2e/workload_id/input/api-auth.nomad.hcl new file mode 100644 index 00000000000..331f492c97f --- /dev/null +++ b/e2e/workload_id/input/api-auth.nomad.hcl @@ -0,0 +1,99 @@ +job "api-auth" { + datacenters = ["dc1"] + type = "batch" + + constraint { + attribute = "${attr.kernel.name}" + value = "linux" + } + + group "api-auth" { + + # none task should get a 401 response + task "none" { + driver = "docker" + config { + image = "curlimages/curl:7.87.0" + args = [ + "--unix-socket", "${NOMAD_SECRETS_DIR}/api.sock", + "-v", + "localhost/v1/agent/health", + ] + } + resources { + cpu = 16 + memory = 32 + disk = 64 + } + } + + # bad task should get a 403 response + task "bad" { + driver = "docker" + config { + image = "curlimages/curl:7.87.0" + args = [ + "--unix-socket", "${NOMAD_SECRETS_DIR}/api.sock", + "-H", "X-Nomad-Token: 37297754-3b87-41da-9ac7-d98fd934deed", + "-v", + "localhost/v1/agent/health", + ] + } + resources { + cpu = 16 + memory = 32 + disk = 64 + } + } + + # docker-wid task should succeed due to using workload identity + task "docker-wid" { + driver = "docker" + + config { + image = "curlimages/curl:7.87.0" + args = [ + "--unix-socket", "${NOMAD_SECRETS_DIR}/api.sock", + "-H", "Authorization: Bearer ${NOMAD_TOKEN}", + "-v", + "localhost/v1/agent/health", + ] + } + + identity { + env = true + } + + resources { + cpu = 16 + memory = 32 + disk = 64 + } + } + + # exec-wid task should succeed due to using workload identity + task "exec-wid" { + driver = "exec" + + config { + command = "curl" + args = [ + "-H", "Authorization: Bearer ${NOMAD_TOKEN}", + "--unix-socket", "${NOMAD_SECRETS_DIR}/api.sock", + "-v", + "localhost/v1/agent/health", + ] + } + + identity { + env = true + } + + resources { + cpu = 16 + memory = 32 + disk = 64 + } + } + } +} diff --git a/e2e/workload_id/input/api-win.nomad.hcl b/e2e/workload_id/input/api-win.nomad.hcl new file mode 100644 index 00000000000..7552500f32a --- /dev/null +++ b/e2e/workload_id/input/api-win.nomad.hcl @@ -0,0 +1,36 @@ +job "api-win" { + datacenters = ["dc1"] + type = "batch" + + constraint { + attribute = "${attr.kernel.name}" + value = "windows" + } + + constraint { + attribute = "${attr.cpu.arch}" + value = "amd64" + } + + group "api-win" { + + task "win" { + driver = "raw_exec" + config { + command = "powershell" + args = ["local/curl-7.87.0_4-win64-mingw/bin/curl.exe -H \"Authorization: Bearer $env:NOMAD_TOKEN\" --unix-socket $env:NOMAD_SECRETS_DIR/api.sock -v localhost:4646/v1/agent/health"] + } + artifact { + source = "https://curl.se/windows/dl-7.87.0_4/curl-7.87.0_4-win64-mingw.zip" + } + identity { + env = true + } + resources { + cpu = 16 + memory = 32 + disk = 64 + } + } + } +} diff --git a/e2e/workload_id/taskapi_test.go b/e2e/workload_id/taskapi_test.go new file mode 100644 index 00000000000..3c636d367e1 --- /dev/null +++ b/e2e/workload_id/taskapi_test.go @@ -0,0 +1,111 @@ +package main + +import ( + "fmt" + "io" + "net/http" + "testing" + + "github.com/hashicorp/nomad/e2e/e2eutil" + "github.com/hashicorp/nomad/helper/uuid" + "github.com/shoenig/test" + "github.com/shoenig/test/must" +) + +// TestTaskAPI runs subtets exercising the Task API related functionality. +// Bundled with Workload Identity as that's a prereq for the Task API to work. +func TestTaskAPI(t *testing.T) { + nomad := e2eutil.NomadClient(t) + + e2eutil.WaitForLeader(t, nomad) + e2eutil.WaitForNodesReady(t, nomad, 1) + + t.Run("testTaskAPI_Auth", testTaskAPIAuth) + t.Run("testTaskAPI_Windows", testTaskAPIWindows) +} + +func testTaskAPIAuth(t *testing.T) { + nomad := e2eutil.NomadClient(t) + jobID := "api-auth-" + uuid.Short() + jobIDs := []string{jobID} + t.Cleanup(e2eutil.CleanupJobsAndGC(t, &jobIDs)) + + // start job + allocs := e2eutil.RegisterAndWaitForAllocs(t, nomad, "./input/api-auth.nomad.hcl", jobID, "") + must.Len(t, 1, allocs) + allocID := allocs[0].ID + + // wait for batch alloc to complete + alloc := e2eutil.WaitForAllocStopped(t, nomad, allocID) + must.Eq(t, alloc.ClientStatus, "complete") + + assertions := []struct { + task string + suffix string + }{ + { + task: "none", + suffix: http.StatusText(http.StatusUnauthorized), + }, + { + task: "bad", + suffix: http.StatusText(http.StatusForbidden), + }, + { + task: "docker-wid", + suffix: `"ok":true}}`, + }, + { + task: "exec-wid", + suffix: `"ok":true}}`, + }, + } + + // Ensure the assertions and input file match + must.Len(t, len(assertions), alloc.Job.TaskGroups[0].Tasks, + must.Sprintf("test and jobspec mismatch")) + + for _, tc := range assertions { + logFile := fmt.Sprintf("alloc/logs/%s.stdout.0", tc.task) + fd, err := nomad.AllocFS().Cat(alloc, logFile, nil) + must.NoError(t, err) + logBytes, err := io.ReadAll(fd) + must.NoError(t, err) + logs := string(logBytes) + + ps := must.Sprintf("Task: %s Logs: < Date: Fri, 3 Feb 2023 16:49:43 -0800 Subject: [PATCH 10/18] remove redundant return --- command/agent/http.go | 1 - 1 file changed, 1 deletion(-) diff --git a/command/agent/http.go b/command/agent/http.go index 1ab0a657db0..2e181172d15 100644 --- a/command/agent/http.go +++ b/command/agent/http.go @@ -1091,5 +1091,4 @@ func (a *authMiddleware) ServeHTTP(resp http.ResponseWriter, req *http.Request) a.srv.logger.Trace("Authenticated request", "id", reply.Identity, "method", req.Method, "url", req.URL) a.wrapped.ServeHTTP(resp, req) - return } From 66b68e8342144ac2dac196fd699e93ee2f0fc97d Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Fri, 3 Feb 2023 16:58:24 -0800 Subject: [PATCH 11/18] fix panic in consul integration test --- client/config/testing.go | 6 +++--- command/agent/consul/int_test.go | 29 +++++++++++++++-------------- 2 files changed, 18 insertions(+), 17 deletions(-) diff --git a/client/config/testing.go b/client/config/testing.go index 145b1626d39..adb703de2c2 100644 --- a/client/config/testing.go +++ b/client/config/testing.go @@ -77,13 +77,13 @@ func TestClientConfig(t testing.T) (*Config, func()) { conf.MaxKillTimeout = 30 * time.Second // Provide a stub APIListenerRegistrar implementation - conf.APIListenerRegistrar = noopAPIListenerRegistrar{} + conf.APIListenerRegistrar = NoopAPIListenerRegistrar{} return conf, cleanup } -type noopAPIListenerRegistrar struct{} +type NoopAPIListenerRegistrar struct{} -func (noopAPIListenerRegistrar) Serve(_ context.Context, _ net.Listener) error { +func (NoopAPIListenerRegistrar) Serve(_ context.Context, _ net.Listener) error { return nil } diff --git a/command/agent/consul/int_test.go b/command/agent/consul/int_test.go index 1fd6a6fcdb6..c48f56ec83f 100644 --- a/command/agent/consul/int_test.go +++ b/command/agent/consul/int_test.go @@ -46,7 +46,7 @@ func TestConsul_Integration(t *testing.T) { // Create an embedded Consul server testconsul, err := testutil.NewTestServerConfigT(t, func(c *testutil.TestServerConfig) { - c.Peering = nil // fix for older versions of Consul (<1.13.0) that don't support peering + c.Peering = nil // fix for older versions of Consul (<1.13.0) that don't support peering // If -v wasn't specified squelch consul logging if !testing.Verbose() { c.Stdout = ioutil.Discard @@ -152,19 +152,20 @@ func TestConsul_Integration(t *testing.T) { // Build the config config := &taskrunner.Config{ - Alloc: alloc, - ClientConfig: conf, - Consul: serviceClient, - Task: task, - TaskDir: taskDir, - Logger: logger, - Vault: vclient, - StateDB: state.NoopDB{}, - StateUpdater: logUpdate, - DeviceManager: devicemanager.NoopMockManager(), - DriverManager: drivermanager.TestDriverManager(t), - StartConditionMetCh: closedCh, - ServiceRegWrapper: wrapper.NewHandlerWrapper(logger, serviceClient, regMock.NewServiceRegistrationHandler(logger)), + Alloc: alloc, + ClientConfig: conf, + Consul: serviceClient, + Task: task, + TaskDir: taskDir, + Logger: logger, + Vault: vclient, + StateDB: state.NoopDB{}, + StateUpdater: logUpdate, + DeviceManager: devicemanager.NoopMockManager(), + DriverManager: drivermanager.TestDriverManager(t), + StartConditionMetCh: closedCh, + ServiceRegWrapper: wrapper.NewHandlerWrapper(logger, serviceClient, regMock.NewServiceRegistrationHandler(logger)), + APIListenerRegistrar: config.NoopAPIListenerRegistrar{}, } tr, err := taskrunner.NewTaskRunner(config) From 49ab5cbe28d9f07553193f5fc602282f68ed9671 Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Fri, 3 Feb 2023 16:59:37 -0800 Subject: [PATCH 12/18] changelog --- .changelog/15864.txt | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 .changelog/15864.txt diff --git a/.changelog/15864.txt b/.changelog/15864.txt new file mode 100644 index 00000000000..b91ffba97b5 --- /dev/null +++ b/.changelog/15864.txt @@ -0,0 +1,3 @@ +```release-note:improvement +client: added http api access for tasks via unix socket +``` From a5adab9b61da50b0fe08edb279e30b2af84b6471 Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Fri, 3 Feb 2023 17:06:01 -0800 Subject: [PATCH 13/18] revert initial token hack --- client/allocrunner/taskrunner/task_runner.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/client/allocrunner/taskrunner/task_runner.go b/client/allocrunner/taskrunner/task_runner.go index ee0dace73ab..e8107f19b4f 100644 --- a/client/allocrunner/taskrunner/task_runner.go +++ b/client/allocrunner/taskrunner/task_runner.go @@ -426,11 +426,7 @@ func NewTaskRunner(config *Config) (*TaskRunner, error) { // Use the client secret only as the initial value; the identity hook will // update this with a workload identity if one is available - //TODO(schmichael) can we remove this entirely? seems like a huge - //security problem if we use accidentally use it instead of the - //workload identity - //tr.setNomadToken(config.ClientConfig.Node.SecretID) - tr.setNomadToken("Hi, this is a fake token that should never work or be used! Remove me.") + tr.setNomadToken(config.ClientConfig.Node.SecretID) // Initialize the runners hooks. Must come after initDriver so hooks // can use tr.driverCapabilities From 4114588f0f314305c7e06cfac41aa84e3eb2ea4e Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Fri, 3 Feb 2023 17:13:54 -0800 Subject: [PATCH 14/18] cleanup this mess --- client/config/config.go | 7 ++++++- command/agent/agent.go | 5 ++--- command/agent/http.go | 8 ++++---- nomad/structs/structs.go | 8 -------- 4 files changed, 12 insertions(+), 16 deletions(-) diff --git a/client/config/config.go b/client/config/config.go index 6523c978761..92ba262860c 100644 --- a/client/config/config.go +++ b/client/config/config.go @@ -303,7 +303,12 @@ type Config struct { // used for template functions which require access to the Nomad API. TemplateDialer *bufconndialer.BufConnWrapper - //TODO(schmichael) write something + // APIListenerRegistrar allows the client to registers listeners created at + // runtime (eg the Task API) with the agent's HTTP server. Since the agent + // creates the HTTP *after* the client starts, we have to use this shim to + // pass listeners back to the agent. + // This is the same design as the bufconndialer but for the + // http.Serve(listener) API instead of the net.Dial API. APIListenerRegistrar APIListenerRegistrar // Artifact configuration from the agent's config file. diff --git a/command/agent/agent.go b/command/agent/agent.go index 54d4142035d..d5703f72104 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -115,8 +115,8 @@ type Agent struct { builtinListener net.Listener builtinDialer *bufconndialer.BufConnWrapper - //TODO(schmichael) builtinServer is an HTTP server for attaching - //per-task listeners. Always requires auth. + // builtinServer is an HTTP server for attaching per-task listeners. Always + // requires auth. builtinServer *builtinAPI inmemSink *metrics.InmemSink @@ -1021,7 +1021,6 @@ func (a *Agent) setupClient() error { // running consul-template functions that utilize the Nomad API. We lazy // load this into the client config, therefore this needs to happen before // we call NewClient. - //TODO migrate to APIListenerRegistrar a.builtinListener, a.builtinDialer = bufconndialer.New() conf.TemplateDialer = a.builtinDialer diff --git a/command/agent/http.go b/command/agent/http.go index 2e181172d15..b51d6696a47 100644 --- a/command/agent/http.go +++ b/command/agent/http.go @@ -214,8 +214,6 @@ func NewHTTPServers(agent *Agent, config *Config) ([]*HTTPServer, error) { srvs = append(srvs, srv) } - // This HTTP server is only - if serverInitializationErrors != nil { for _, srv := range srvs { srv.Shutdown() @@ -1043,8 +1041,10 @@ func wrapCORSWithAllowedMethods(f func(http.ResponseWriter, *http.Request), meth return allowCORSWithMethods(methods...).Handler(http.HandlerFunc(f)) } -// TODO(schmichael) caching - see client/acl.go -// TODO(schmichael) where should this thing live +// authMiddleware implements the http.Handler interface to enforce +// authentication for *all* requests. Even with ACLs enabled there are +// endpoints which are accessible without authenticating. This middleware is +// used for the Task API to enfoce authentication for all API access. type authMiddleware struct { srv *HTTPServer wrapped http.Handler diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index a76e5c2ab6e..8ad29418f42 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -10632,14 +10632,6 @@ type IdentityClaims struct { jwt.RegisteredClaims } -// TODO(schmichael) this is just for debugging, feel free to remove -// String version of the identity. No secrets, safe for display. -func (ic *IdentityClaims) String() string { - //TODO(schmichael) use "jwt" or something vendor specific like "nwi" - //for Nomad Workload Identity here? - return fmt.Sprintf("jwt:%s/%s/%s/%s", ic.Namespace, ic.JobID, ic.AllocationID, ic.TaskName) -} - // AllocationDiff is another named type for Allocation (to use the same fields), // which is used to represent the delta for an Allocation. If you need a method // defined on the al From ba34c9db99d314c4a26a86ad571d4f65cea77ed7 Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Mon, 6 Feb 2023 10:29:24 -0800 Subject: [PATCH 15/18] hclfmt --- e2e/workload_id/input/api-auth.nomad.hcl | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/e2e/workload_id/input/api-auth.nomad.hcl b/e2e/workload_id/input/api-auth.nomad.hcl index 331f492c97f..dae1346973c 100644 --- a/e2e/workload_id/input/api-auth.nomad.hcl +++ b/e2e/workload_id/input/api-auth.nomad.hcl @@ -14,7 +14,7 @@ job "api-auth" { driver = "docker" config { image = "curlimages/curl:7.87.0" - args = [ + args = [ "--unix-socket", "${NOMAD_SECRETS_DIR}/api.sock", "-v", "localhost/v1/agent/health", @@ -32,7 +32,7 @@ job "api-auth" { driver = "docker" config { image = "curlimages/curl:7.87.0" - args = [ + args = [ "--unix-socket", "${NOMAD_SECRETS_DIR}/api.sock", "-H", "X-Nomad-Token: 37297754-3b87-41da-9ac7-d98fd934deed", "-v", @@ -52,7 +52,7 @@ job "api-auth" { config { image = "curlimages/curl:7.87.0" - args = [ + args = [ "--unix-socket", "${NOMAD_SECRETS_DIR}/api.sock", "-H", "Authorization: Bearer ${NOMAD_TOKEN}", "-v", From fc8f5b89d42c0b7e19e4582dab382a9bbdc71ede Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Mon, 6 Feb 2023 10:35:01 -0800 Subject: [PATCH 16/18] opportunistically use least privileges for task api socket --- client/allocrunner/taskrunner/api_hook.go | 30 ++++--- .../allocrunner/taskrunner/api_hook_test.go | 28 ++++++- helper/users/lookup.go | 83 +++++++++++++++++-- helper/users/lookup_linux_test.go | 39 ++++++++- helper/users/lookup_windows_test.go | 18 +++- 5 files changed, 176 insertions(+), 22 deletions(-) diff --git a/client/allocrunner/taskrunner/api_hook.go b/client/allocrunner/taskrunner/api_hook.go index c7bb34bf716..003d5fecdd3 100644 --- a/client/allocrunner/taskrunner/api_hook.go +++ b/client/allocrunner/taskrunner/api_hook.go @@ -7,11 +7,13 @@ import ( "net/http" "os" "path/filepath" + "sync" "github.com/hashicorp/go-hclog" "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/allocrunner/interfaces" "github.com/hashicorp/nomad/client/config" + "github.com/hashicorp/nomad/helper/users" ) // apiHook exposes the Task API. The Task API allows task's to access the Nomad @@ -29,7 +31,12 @@ type apiHook struct { shutdownCtx context.Context srv config.APIListenerRegistrar logger hclog.Logger - ln net.Listener + + // Lock listener as it is updated from multiple hooks. + lock sync.Mutex + + // Listener is the unix domain socket of the task api for this taks. + ln net.Listener } func newAPIHook(shutdownCtx context.Context, srv config.APIListenerRegistrar, logger hclog.Logger) *apiHook { @@ -46,26 +53,22 @@ func (*apiHook) Name() string { } func (h *apiHook) Prestart(_ context.Context, req *interfaces.TaskPrestartRequest, resp *interfaces.TaskPrestartResponse) error { + h.lock.Lock() + defer h.lock.Unlock() + if h.ln != nil { // Listener already set. Task is probably restarting. return nil } udsPath := apiSocketPath(req.TaskDir) - if err := os.RemoveAll(udsPath); err != nil { - h.logger.Warn("error removing task api socket", "path", udsPath, "error", err) - } - - udsln, err := net.Listen("unix", udsPath) + udsln, err := users.SocketFileFor(h.logger, udsPath, req.Task.User) if err != nil { - h.logger.Warn("error listening on task api socket", "path", udsPath, "error", err) + // Soft-fail and let the task fail if it requires the task api. + h.logger.Warn("error creating task api socket", "path", udsPath, "error", err) return nil } - if err := os.Chmod(udsPath, 0o777); err != nil { - h.logger.Warn("error setting task api socket permissions", "path", udsPath, "error", err) - } - go func() { // Cannot use Prestart's context as it is closed after all prestart hooks // have been closed, but we do want to try to cleanup on shutdown. @@ -76,7 +79,7 @@ func (h *apiHook) Prestart(_ context.Context, req *interfaces.TaskPrestartReques if errors.Is(err, net.ErrClosed) { return } - h.logger.Error("error serving api", "error", err) + h.logger.Error("error serving task api", "error", err) } }() @@ -85,6 +88,9 @@ func (h *apiHook) Prestart(_ context.Context, req *interfaces.TaskPrestartReques } func (h *apiHook) Stop(ctx context.Context, req *interfaces.TaskStopRequest, resp *interfaces.TaskStopResponse) error { + h.lock.Lock() + defer h.lock.Unlock() + if h.ln != nil { if err := h.ln.Close(); err != nil { if !errors.Is(err, net.ErrClosed) { diff --git a/client/allocrunner/taskrunner/api_hook_test.go b/client/allocrunner/taskrunner/api_hook_test.go index e5ce0822ab1..164d4433d58 100644 --- a/client/allocrunner/taskrunner/api_hook_test.go +++ b/client/allocrunner/taskrunner/api_hook_test.go @@ -7,13 +7,17 @@ import ( "os" "path/filepath" "runtime" + "strconv" "strings" + "syscall" "testing" "github.com/hashicorp/nomad/ci" "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/allocrunner/interfaces" "github.com/hashicorp/nomad/helper/testlog" + "github.com/hashicorp/nomad/helper/users" + "github.com/hashicorp/nomad/nomad/structs" "github.com/shoenig/test/must" ) @@ -35,7 +39,7 @@ func TestAPIHook_SoftFail(t *testing.T) { // Use a SecretsDir that will always exceed Unix socket path length // limits (sun_path) - dst := filepath.Join(t.TempDir(), strings.Repeat("_NOMAD_TEST_", 500)) + dst := filepath.Join(t.TempDir(), strings.Repeat("_NOMAD_TEST_", 100)) ctx := context.Background() srv := testAPIListenerRegistrar{} @@ -43,6 +47,7 @@ func TestAPIHook_SoftFail(t *testing.T) { h := newAPIHook(ctx, srv, logger) req := &interfaces.TaskPrestartRequest{ + Task: &structs.Task{}, // needs to be non-nil for Task.User lookup TaskDir: &allocdir.TaskDir{ SecretsDir: dst, }, @@ -102,6 +107,9 @@ func TestAPIHook_Ok(t *testing.T) { h := newAPIHook(ctx, srv, logger) req := &interfaces.TaskPrestartRequest{ + Task: &structs.Task{ + User: "nobody", + }, TaskDir: &allocdir.TaskDir{ SecretsDir: dst, }, @@ -114,12 +122,28 @@ func TestAPIHook_Ok(t *testing.T) { // File should have been created sockDst := apiSocketPath(req.TaskDir) - // Stat on Windows fails on sockets + // Stat and chown fail on Windows, so skip these checks if runtime.GOOS != "windows" { stat, err := os.Stat(sockDst) must.NoError(t, err) must.True(t, stat.Mode()&fs.ModeSocket != 0, must.Sprintf("expected %q to be a unix socket but got %s", sockDst, stat.Mode())) + + nobody, _ := users.Lookup("nobody") + if syscall.Getuid() == 0 && nobody != nil { + t.Logf("root and nobody exists: testing file perms") + + // We're root and nobody exists! Check perms + must.Eq(t, fs.FileMode(0o600), stat.Mode().Perm()) + + sysStat, ok := stat.Sys().(*syscall.Stat_t) + must.True(t, ok, must.Sprintf("expected stat.Sys() to be a *syscall.Stat_t on %s but found %T", + runtime.GOOS, stat.Sys())) + + nobodyUID, err := strconv.Atoi(nobody.Uid) + must.NoError(t, err) + must.Eq(t, nobodyUID, int(sysStat.Uid)) + } } // Assert the listener is working diff --git a/helper/users/lookup.go b/helper/users/lookup.go index 7c5bafc8683..1d75d0db80c 100644 --- a/helper/users/lookup.go +++ b/helper/users/lookup.go @@ -2,11 +2,13 @@ package users import ( "fmt" + "net" "os" "os/user" "strconv" "sync" + "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-multierror" ) @@ -36,6 +38,23 @@ func Current() (*user.User, error) { return user.Current() } +// UIDforUser returns the UID for the specified username or returns an error. +// +// Will always fail on Windows and Plan 9. +func UIDforUser(username string) (int, error) { + u, err := Lookup(username) + if err != nil { + return 0, err + } + + uid, err := strconv.Atoi(u.Uid) + if err != nil { + return 0, fmt.Errorf("error parsing uid: %w", err) + } + + return uid, nil +} + // WriteFileFor is like os.WriteFile except if possible it chowns the file to // the specified user (possibly from Task.User) and sets the permissions to // 0o600. @@ -45,6 +64,8 @@ func Current() (*user.User, error) { // // On failure a multierror with both the original and fallback errors will be // returned. +// +// See SocketFileFor if writing a unix socket file. func WriteFileFor(path string, contents []byte, username string) error { // Don't even bother trying to chown to an empty username var origErr error @@ -72,16 +93,11 @@ func WriteFileFor(path string, contents []byte, username string) error { } func writeFileFor(path string, contents []byte, username string) error { - user, err := Lookup(username) + uid, err := UIDforUser(username) if err != nil { return err } - uid, err := strconv.Atoi(user.Uid) - if err != nil { - return fmt.Errorf("error parsing uid: %w", err) - } - if err := os.WriteFile(path, contents, 0o600); err != nil { return err } @@ -95,3 +111,58 @@ func writeFileFor(path string, contents []byte, username string) error { return nil } + +// SocketFileFor creates a unix domain socket file on the specified path and, +// if possible, makes it usable by only the specified user. Failing that it +// will leave the socket open to all users. Non-fatal errors are logged. +// +// See WriteFileFor if writing a regular file. +func SocketFileFor(logger hclog.Logger, path, username string) (net.Listener, error) { + if err := os.RemoveAll(path); err != nil { + logger.Warn("error removing socket", "path", path, "error", err) + } + + udsln, err := net.Listen("unix", path) + if err != nil { + return nil, err + } + + if username != "" { + // Try to set perms on socket file to least privileges. + if err := setSocketOwner(path, username); err == nil { + // Success! Exit early + return udsln, nil + } + + // This error is expected to always occur in some environments (Windows, + // non-root agents), so don't log above Trace. + logger.Trace("failed to set user on socket", "path", path, "user", username, "error", err) + } + + // Opportunistic least privileges failed above, so make sure anyone can use + // the socket. + if err := os.Chmod(path, 0o666); err != nil { + logger.Warn("error setting socket permissions", "path", path, "error", err) + } + + return udsln, nil +} + +func setSocketOwner(path, username string) error { + uid, err := UIDforUser(username) + if err != nil { + return err + } + + if err := os.Chown(path, uid, -1); err != nil { + return err + } + + if err := os.Chmod(path, 0o600); err != nil { + // Awkward situation that is hopefully impossible to reach where we could + // chown the socket but not change its mode. + return err + } + + return nil +} diff --git a/helper/users/lookup_linux_test.go b/helper/users/lookup_linux_test.go index 1559befbbb7..0855949b79c 100644 --- a/helper/users/lookup_linux_test.go +++ b/helper/users/lookup_linux_test.go @@ -11,6 +11,7 @@ import ( "syscall" "testing" + "github.com/hashicorp/nomad/helper/testlog" "github.com/shoenig/test/must" "golang.org/x/sys/unix" ) @@ -58,7 +59,7 @@ func TestWriteFileFor_Linux(t *testing.T) { stat, err := os.Lstat(path) must.NoError(t, err) must.True(t, stat.Mode().IsRegular(), - must.Sprintf("expected %s to be a normal file but found %#o", path, stat.Mode())) + must.Sprintf("expected %s to be a regular file but found %#o", path, stat.Mode())) linuxStat, ok := stat.Sys().(*syscall.Stat_t) must.True(t, ok, must.Sprintf("expected stat.Sys() to be a *syscall.Stat_t but found %T", stat.Sys())) @@ -78,3 +79,39 @@ func TestWriteFileFor_Linux(t *testing.T) { must.Eq(t, 0o666&(^umask), int(stat.Mode())) } } + +// TestSocketFileFor_Linux asserts that when running as root on Linux socket +// files are created with least permissions. If running as non-root then we +// leave the socket file as world writable. +func TestSocketFileFor_Linux(t *testing.T) { + path := filepath.Join(t.TempDir(), "api.sock") + + logger := testlog.HCLogger(t) + ln, err := SocketFileFor(logger, path, "nobody") + must.NoError(t, err) + must.NotNil(t, ln) + defer ln.Close() + + stat, err := os.Lstat(path) + must.NoError(t, err) + must.False(t, stat.Mode().IsRegular(), + must.Sprintf("expected %s to be a regular file but found %#o", path, stat.Mode())) + + linuxStat, ok := stat.Sys().(*syscall.Stat_t) + must.True(t, ok, must.Sprintf("expected stat.Sys() to be a *syscall.Stat_t but found %T", stat.Sys())) + + current, err := Current() + must.NoError(t, err) + + if current.Username == "root" { + t.Logf("Running as root: asserting %s is owned by nobody", path) + nobody, err := Lookup("nobody") + must.NoError(t, err) + must.Eq(t, nobody.Uid, fmt.Sprintf("%d", linuxStat.Uid)) + must.Eq(t, 0o600, int(stat.Mode().Perm())) + } else { + t.Logf("Running as non-root: asserting %s is world writable", path) + must.Eq(t, current.Uid, fmt.Sprintf("%d", linuxStat.Uid)) + must.Eq(t, 0o666, int(stat.Mode().Perm())) + } +} diff --git a/helper/users/lookup_windows_test.go b/helper/users/lookup_windows_test.go index 688f740061a..4a5345793ad 100644 --- a/helper/users/lookup_windows_test.go +++ b/helper/users/lookup_windows_test.go @@ -44,7 +44,23 @@ func TestWriteFileFor_Windows(t *testing.T) { stat, err := os.Lstat(path) must.NoError(t, err) must.True(t, stat.Mode().IsRegular(), - must.Sprintf("expected %s to be a normal file but found %#o", path, stat.Mode())) + must.Sprintf("expected %s to be a regular file but found %#o", path, stat.Mode())) + + // Assert Windows hits the fallback world-accessible case + must.Eq(t, 0o666, stat.Mode().Perm()) +} + +// TestSocketFileFor_Windows asserts that socket files cannot be chowned on +// windows. +func TestSocketFileFor_Windows(t *testing.T) { + path := filepath.Join(t.TempDir(), "api.sock") + + ln, err := SocketFileFor(testlog.HCLogger(t), path, "Administrator") + must.NoError(t, err) + must.NotNil(t, ln) + defer ln.Close() + stat, err := os.Lstat(path) + must.NoError(t, err) // Assert Windows hits the fallback world-accessible case must.Eq(t, 0o666, stat.Mode().Perm()) From e137a8c6251013309daa8d309da4be3648754ce9 Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Mon, 6 Feb 2023 10:40:13 -0800 Subject: [PATCH 17/18] Fix typo in client/config/config.go Co-authored-by: Seth Hoenig --- client/config/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/config/config.go b/client/config/config.go index 92ba262860c..466230ddce1 100644 --- a/client/config/config.go +++ b/client/config/config.go @@ -303,7 +303,7 @@ type Config struct { // used for template functions which require access to the Nomad API. TemplateDialer *bufconndialer.BufConnWrapper - // APIListenerRegistrar allows the client to registers listeners created at + // APIListenerRegistrar allows the client to register listeners created at // runtime (eg the Task API) with the agent's HTTP server. Since the agent // creates the HTTP *after* the client starts, we have to use this shim to // pass listeners back to the agent. From e9d02dbc3840e9535033066c4267db1501a7c88e Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Mon, 6 Feb 2023 10:46:32 -0800 Subject: [PATCH 18/18] fix test --- command/agent/consul/int_test.go | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/command/agent/consul/int_test.go b/command/agent/consul/int_test.go index c48f56ec83f..02124f108ee 100644 --- a/command/agent/consul/int_test.go +++ b/command/agent/consul/int_test.go @@ -61,6 +61,7 @@ func TestConsul_Integration(t *testing.T) { conf := config.DefaultConfig() conf.Node = mock.Node() conf.ConsulConfig.Addr = testconsul.HTTPAddr + conf.APIListenerRegistrar = config.NoopAPIListenerRegistrar{} consulConfig, err := conf.ConsulConfig.ApiConfig() if err != nil { t.Fatalf("error generating consul config: %v", err) @@ -152,20 +153,19 @@ func TestConsul_Integration(t *testing.T) { // Build the config config := &taskrunner.Config{ - Alloc: alloc, - ClientConfig: conf, - Consul: serviceClient, - Task: task, - TaskDir: taskDir, - Logger: logger, - Vault: vclient, - StateDB: state.NoopDB{}, - StateUpdater: logUpdate, - DeviceManager: devicemanager.NoopMockManager(), - DriverManager: drivermanager.TestDriverManager(t), - StartConditionMetCh: closedCh, - ServiceRegWrapper: wrapper.NewHandlerWrapper(logger, serviceClient, regMock.NewServiceRegistrationHandler(logger)), - APIListenerRegistrar: config.NoopAPIListenerRegistrar{}, + Alloc: alloc, + ClientConfig: conf, + Consul: serviceClient, + Task: task, + TaskDir: taskDir, + Logger: logger, + Vault: vclient, + StateDB: state.NoopDB{}, + StateUpdater: logUpdate, + DeviceManager: devicemanager.NoopMockManager(), + DriverManager: drivermanager.TestDriverManager(t), + StartConditionMetCh: closedCh, + ServiceRegWrapper: wrapper.NewHandlerWrapper(logger, serviceClient, regMock.NewServiceRegistrationHandler(logger)), } tr, err := taskrunner.NewTaskRunner(config)