Skip to content

Commit

Permalink
wip task api via unix domain socket
Browse files Browse the repository at this point in the history
  • Loading branch information
schmichael committed Feb 2, 2023
1 parent fe4ff5b commit 7e51b69
Show file tree
Hide file tree
Showing 12 changed files with 265 additions and 20 deletions.
83 changes: 83 additions & 0 deletions client/allocrunner/taskrunner/api_hook.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package taskrunner

import (
"context"
"errors"
"fmt"
"net"
"os"
"path/filepath"

log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
"github.com/hashicorp/nomad/client/config"
)

// apiHook is a task runner hook that exposes the HTTP API to a task over a
// unix domain socket created in the task's secrets directory.
type apiHook struct {
// srv serves the HTTP API on the listener created by Prestart.
srv config.APIListenerRegistrar
// logger is the hook's named logger (see newAPIHook).
logger log.Logger
// ln is the task's unix socket listener. It is nil until Prestart has
// successfully created the socket.
ln net.Listener
}

// newAPIHook builds an apiHook that will serve the task API through srv.
// The supplied logger is scoped with the hook's name so that every message
// the hook emits can be attributed to it.
func newAPIHook(srv config.APIListenerRegistrar, logger log.Logger) *apiHook {
	hook := new(apiHook)
	hook.srv = srv
	hook.logger = logger.Named(hook.Name())
	return hook
}

// Name returns the identifier of this hook. It is also used as the name of
// the hook's logger.
func (*apiHook) Name() string {
	const hookName = "api"
	return hookName
}

// Prestart creates a unix domain socket at <secrets dir>/run/nomad.socket for
// the task and starts serving the HTTP API on it in a background goroutine.
//
// If a listener already exists the call is a no-op. Any stale socket file
// left at the target path is removed before listening. An error is returned
// if the socket directory cannot be created, a stale socket cannot be
// removed, or the socket cannot be opened; errors that occur later while
// serving are only logged.
func (h *apiHook) Prestart(ctx context.Context, req *interfaces.TaskPrestartRequest, resp *interfaces.TaskPrestartResponse) error {
	if h.ln != nil {
		//TODO(schmichael) remove me
		h.logger.Trace("huh, called apiHook.Prestart twice")
		return nil
	}

	//TODO(schmichael) what dir & perms
	udsDir := filepath.Join(req.TaskDir.SecretsDir, "run")
	if err := os.MkdirAll(udsDir, 0o775); err != nil {
		return fmt.Errorf("error creating api socket directory: %w", err)
	}

	//TODO(schmichael) what name
	udsPath := filepath.Join(udsDir, "nomad.socket")
	// A socket file left behind by a previous run would make Listen fail.
	if err := os.RemoveAll(udsPath); err != nil {
		return fmt.Errorf("could not remove existing api socket: %w", err)
	}

	udsln, err := net.Listen("unix", udsPath)
	if err != nil {
		return fmt.Errorf("could not create api socket: %w", err)
	}

	// NOTE(review): ctx is handed to a goroutine that outlives this call;
	// confirm the caller does not cancel it once the prestart hooks finish.
	go func() {
		// Serve returns a channel of errors rather than an error, so the
		// errors must be received from it. Comparing the channel itself to
		// nil would log the channel and never observe a real error. The
		// channel is documented as being closed once the listener is no
		// longer served, which ends this loop.
		for err := range h.srv.Serve(ctx, udsln) {
			if err == nil || errors.Is(err, net.ErrClosed) {
				// net.ErrClosed is expected after Stop closes the listener.
				continue
			}
			//TODO(schmichael) probably ignore http.ErrServerClosed
			h.logger.Warn("error serving api", "error", err)
		}
	}()

	h.ln = udsln
	return nil
}

// Stop closes the task's API socket listener, if one was created by
// Prestart. It never returns an error: a failure to close is only logged,
// and a listener that is already closed is ignored.
func (h *apiHook) Stop(ctx context.Context, req *interfaces.TaskStopRequest, resp *interfaces.TaskStopResponse) error {
	if h.ln == nil {
		//TODO(schmichael) remove me
		h.logger.Trace("huh, no listener")
		return nil
	}

	// Forget the listener before closing it so that a later Prestart (for
	// example when the task is restarted) creates a fresh socket instead of
	// returning early because it still sees the closed listener.
	ln := h.ln
	h.ln = nil

	if err := ln.Close(); err != nil && !errors.Is(err, net.ErrClosed) {
		// hclog takes a message followed by key/value pairs; it does not
		// apply printf-style formatting to the message.
		h.logger.Trace("error closing task listener", "error", err)
	}
	return nil
}
6 changes: 5 additions & 1 deletion client/allocrunner/taskrunner/task_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,11 @@ func NewTaskRunner(config *Config) (*TaskRunner, error) {

// Use the client secret only as the initial value; the identity hook will
// update this with a workload identity if one is available
tr.setNomadToken(config.ClientConfig.Node.SecretID)
//TODO(schmichael) can we remove this entirely? seems like a huge
//security problem if we accidentally use it instead of the
//workload identity
//tr.setNomadToken(config.ClientConfig.Node.SecretID)
tr.setNomadToken("Hi, this is a fake token that should never work or be used! Remove me.")

// Initialize the runners hooks. Must come after initDriver so hooks
// can use tr.driverCapabilities
Expand Down
1 change: 1 addition & 0 deletions client/allocrunner/taskrunner/task_runner_hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ func (tr *TaskRunner) initHooks() {
newArtifactHook(tr, tr.getter, hookLogger),
newStatsHook(tr, tr.clientConfig.StatsCollectionInterval, hookLogger),
newDeviceHook(tr.devicemanager, hookLogger),
newAPIHook(tr.clientConfig.APIListenerRegistrar, hookLogger),
}

// If the task has a CSI block, add the hook.
Expand Down
17 changes: 17 additions & 0 deletions client/config/config.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package config

import (
"context"
"errors"
"fmt"
"net"
"reflect"
"strconv"
"strings"
Expand Down Expand Up @@ -301,10 +303,25 @@ type Config struct {
// used for template functions which require access to the Nomad API.
TemplateDialer *bufconndialer.BufConnWrapper

//TODO(schmichael) write something
APIListenerRegistrar APIListenerRegistrar

// Artifact configuration from the agent's config file.
Artifact *ArtifactConfig
}

// APIListenerRegistrar is implemented by HTTP API servers that can serve the
// API on additional listeners supplied by the client.
type APIListenerRegistrar interface {
// Serve the HTTP API on the provided listener. The returned channel may be
// used to receive errors and is closed when the listener is no longer being
// served. If the agent is shutting down the error will be
// http.ErrServerClosed.
//
// The context is needed because Serve may be called before the HTTP server
// has been initialized. If the context is canceled before the HTTP server
// is initialized, the context's error will be returned on the chan.
Serve(context.Context, net.Listener) chan error
}

// ClientTemplateConfig is configuration on the client specific to template
// rendering
type ClientTemplateConfig struct {
Expand Down
19 changes: 17 additions & 2 deletions command/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,11 @@ type Agent struct {
builtinListener net.Listener
builtinDialer *bufconndialer.BufConnWrapper

InmemSink *metrics.InmemSink
//TODO(schmichael) builtinServer is an HTTP server for attaching
//per-task listeners. Always requires auth.
builtinServer *builtinAPI

inmemSink *metrics.InmemSink
}

// NewAgent is used to create a new agent with the given configuration
Expand All @@ -124,7 +128,7 @@ func NewAgent(config *Config, logger log.InterceptLogger, logOutput io.Writer, i
config: config,
logOutput: logOutput,
shutdownCh: make(chan struct{}),
InmemSink: inmem,
inmemSink: inmem,
}

// Create the loggers
Expand Down Expand Up @@ -1017,9 +1021,15 @@ func (a *Agent) setupClient() error {
// running consul-template functions that utilize the Nomad API. We lazy
// load this into the client config, therefore this needs to happen before
// we call NewClient.
//TODO migrate to APIListenerRegistrar
a.builtinListener, a.builtinDialer = bufconndialer.New()
conf.TemplateDialer = a.builtinDialer

// Initialize builtin API server here for use in the client, but it won't
// accept connections until the HTTP servers are created.
a.builtinServer = newBuiltinAPI()
conf.APIListenerRegistrar = a.builtinServer

nomadClient, err := client.NewClient(
conf, a.consulCatalog, a.consulProxies, a.consulService, nil)
if err != nil {
Expand Down Expand Up @@ -1300,6 +1310,11 @@ func (a *Agent) GetConfig() *Config {
return a.config
}

// GetMetricsSink exposes the agent's in-memory metrics sink, i.e. the sink
// stored in the agent's inmemSink field.
func (a *Agent) GetMetricsSink() *metrics.InmemSink {
	sink := a.inmemSink
	return sink
}

// setupConsul creates the Consul client and starts its main Run loop.
func (a *Agent) setupConsul(consulConfig *config.ConsulConfig) error {
apiConf, err := consulConfig.ApiConfig()
Expand Down
8 changes: 4 additions & 4 deletions command/agent/agent_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,7 @@ func (s *HTTPServer) listServers(resp http.ResponseWriter, req *http.Request) (i
return nil, structs.ErrPermissionDenied
}

peers := s.agent.client.GetServers()
peers := client.GetServers()
sort.Strings(peers)
return peers, nil
}
Expand Down Expand Up @@ -468,9 +468,9 @@ func (s *HTTPServer) updateServers(resp http.ResponseWriter, req *http.Request)
}

// Set the servers list into the client
s.agent.logger.Trace("adding servers to the client's primary server list", "servers", servers, "path", "/v1/agent/servers", "method", "PUT")
s.logger.Trace("adding servers to the client's primary server list", "servers", servers, "path", "/v1/agent/servers", "method", "PUT")
if _, err := client.SetServers(servers); err != nil {
s.agent.logger.Error("failed adding servers to client's server list", "servers", servers, "error", err, "path", "/v1/agent/servers", "method", "PUT")
s.logger.Error("failed adding servers to client's server list", "servers", servers, "error", err, "path", "/v1/agent/servers", "method", "PUT")
//TODO is this the right error to return?
return nil, CodedError(400, err.Error())
}
Expand Down Expand Up @@ -708,7 +708,7 @@ func (s *HTTPServer) AgentHostRequest(resp http.ResponseWriter, req *http.Reques
// The RPC endpoint actually forwards the request to the correct
// agent, but we need to use the correct RPC interface.
localClient, remoteClient, localServer := s.rpcHandlerForNode(lookupNodeID)
s.agent.logger.Debug("s.rpcHandlerForNode()", "lookupNodeID", lookupNodeID, "serverID", serverID, "nodeID", nodeID, "localClient", localClient, "remoteClient", remoteClient, "localServer", localServer)
s.logger.Debug("s.rpcHandlerForNode()", "lookupNodeID", lookupNodeID, "serverID", serverID, "nodeID", nodeID, "localClient", localClient, "remoteClient", remoteClient, "localServer", localServer)

// Make the RPC call
if localClient {
Expand Down
2 changes: 1 addition & 1 deletion command/agent/alloc_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func (s *HTTPServer) ClientAllocRequest(resp http.ResponseWriter, req *http.Requ
case "exec":
return s.allocExec(allocID, resp, req)
case "snapshot":
if s.agent.client == nil {
if s.agent.Client() == nil {
return nil, clientNotRunning
}
return s.allocSnapshot(allocID, resp, req)
Expand Down
Loading

0 comments on commit 7e51b69

Please sign in to comment.