From 95700aa6b327f77e8a8a992377aeabc89bc20ee5 Mon Sep 17 00:00:00 2001 From: Brad Davidson Date: Wed, 22 Jan 2025 01:51:28 +0000 Subject: [PATCH] Update p2p boostrap helpers for Spegel v0.0.30 Signed-off-by: Brad Davidson --- pkg/clientaccess/token.go | 56 +++++++++--- pkg/spegel/bootstrap.go | 178 ++++++++++++++++++++++++-------------- pkg/spegel/spegel.go | 61 ++++++++----- 3 files changed, 200 insertions(+), 95 deletions(-) diff --git a/pkg/clientaccess/token.go b/pkg/clientaccess/token.go index 2995d67507d7..c7b22cedae1a 100644 --- a/pkg/clientaccess/token.go +++ b/pkg/clientaccess/token.go @@ -48,6 +48,9 @@ var ( // ClientOption is a callback to mutate the http client prior to use type ClientOption func(*http.Client) +// RequestOption is a callback to mutate the http request prior to use +type RequestOption func(*http.Request) + // Info contains fields that track parsed parts of a cluster join token type Info struct { *kubeadm.BootstrapTokenString @@ -240,7 +243,7 @@ func parseToken(token string) (*Info, error) { // If the CA bundle is not empty but does not contain any valid certs, it validates using // an empty CA bundle (which will always fail). // If valid cert+key paths can be loaded from the provided paths, they are used for client cert auth. 
-func GetHTTPClient(cacerts []byte, certFile, keyFile string, option ...ClientOption) *http.Client { +func GetHTTPClient(cacerts []byte, certFile, keyFile string, options ...any) *http.Client { if len(cacerts) == 0 { return defaultClient } @@ -265,8 +268,10 @@ func GetHTTPClient(cacerts []byte, certFile, keyFile string, option ...ClientOpt }, } - for _, o := range option { - o(client) + for _, o := range options { + if clientOption, ok := o.(ClientOption); ok { + clientOption(client) + } } return client } @@ -278,8 +283,14 @@ func WithTimeout(d time.Duration) ClientOption { } } +func WithHeader(k, v string) RequestOption { + return func(r *http.Request) { + r.Header.Add(k, v) + } +} + // Get makes a request to a subpath of info's BaseURL -func (i *Info) Get(path string, option ...ClientOption) ([]byte, error) { +func (i *Info) Get(path string, options ...any) ([]byte, error) { u, err := url.Parse(i.BaseURL) if err != nil { return nil, err @@ -290,11 +301,12 @@ func (i *Info) Get(path string, option ...ClientOption) ([]byte, error) { } p.Scheme = u.Scheme p.Host = u.Host - return get(p.String(), GetHTTPClient(i.CACerts, i.CertFile, i.KeyFile, option...), i.Username, i.Password, i.Token()) + client := GetHTTPClient(i.CACerts, i.CertFile, i.KeyFile, options...) + return get(p.String(), client, i.Username, i.Password, i.Token(), options...) } // Put makes a request to a subpath of info's BaseURL -func (i *Info) Put(path string, body []byte, option ...ClientOption) error { +func (i *Info) Put(path string, body []byte, options ...any) error { u, err := url.Parse(i.BaseURL) if err != nil { return err @@ -305,11 +317,12 @@ func (i *Info) Put(path string, body []byte, option ...ClientOption) error { } p.Scheme = u.Scheme p.Host = u.Host - return put(p.String(), body, GetHTTPClient(i.CACerts, i.CertFile, i.KeyFile, option...), i.Username, i.Password, i.Token()) + client := GetHTTPClient(i.CACerts, i.CertFile, i.KeyFile, options...) 
+ return put(p.String(), body, client, i.Username, i.Password, i.Token(), options...) } // Post makes a request to a subpath of info's BaseURL -func (i *Info) Post(path string, body []byte, option ...ClientOption) ([]byte, error) { +func (i *Info) Post(path string, body []byte, options ...any) ([]byte, error) { u, err := url.Parse(i.BaseURL) if err != nil { return nil, err @@ -320,7 +333,8 @@ func (i *Info) Post(path string, body []byte, option ...ClientOption) ([]byte, e } p.Scheme = u.Scheme p.Host = u.Host - return post(p.String(), body, GetHTTPClient(i.CACerts, i.CertFile, i.KeyFile, option...), i.Username, i.Password, i.Token()) + client := GetHTTPClient(i.CACerts, i.CertFile, i.KeyFile, options...) + return post(p.String(), body, client, i.Username, i.Password, i.Token(), options...) } // setServer sets the BaseURL and CACerts fields of the Info by connecting to the server @@ -402,7 +416,7 @@ func getCACerts(u url.URL) ([]byte, error) { // get makes a request to a url using a provided client and credentials, // returning the response body. 
-func get(u string, client *http.Client, username, password, token string) ([]byte, error) { +func get(u string, client *http.Client, username, password, token string, options ...any) ([]byte, error) { req, err := http.NewRequest(http.MethodGet, u, nil) if err != nil { return nil, err @@ -414,6 +428,12 @@ func get(u string, client *http.Client, username, password, token string) ([]byt req.SetBasicAuth(username, password) } + for _, o := range options { + if requestOption, ok := o.(RequestOption); ok { + requestOption(req) + } + } + resp, err := client.Do(req) if err != nil { return nil, err @@ -424,7 +444,7 @@ func get(u string, client *http.Client, username, password, token string) ([]byt // put makes a request to a url using a provided client and credentials, // only an error is returned -func put(u string, body []byte, client *http.Client, username, password, token string) error { +func put(u string, body []byte, client *http.Client, username, password, token string, options ...any) error { req, err := http.NewRequest(http.MethodPut, u, bytes.NewBuffer(body)) if err != nil { return err @@ -436,6 +456,12 @@ func put(u string, body []byte, client *http.Client, username, password, token s req.SetBasicAuth(username, password) } + for _, o := range options { + if requestOption, ok := o.(RequestOption); ok { + requestOption(req) + } + } + resp, err := client.Do(req) if err != nil { return err @@ -447,7 +473,7 @@ func put(u string, body []byte, client *http.Client, username, password, token s // post makes a request to a url using a provided client and credentials, // returning the response body and error. 
-func post(u string, body []byte, client *http.Client, username, password, token string) ([]byte, error) { +func post(u string, body []byte, client *http.Client, username, password, token string, options ...any) ([]byte, error) { req, err := http.NewRequest(http.MethodPost, u, bytes.NewBuffer(body)) if err != nil { return nil, err @@ -459,6 +485,12 @@ func post(u string, body []byte, client *http.Client, username, password, token req.SetBasicAuth(username, password) } + for _, o := range options { + if requestOption, ok := o.(RequestOption); ok { + requestOption(req) + } + } + resp, err := client.Do(req) if err != nil { return nil, err diff --git a/pkg/spegel/bootstrap.go b/pkg/spegel/bootstrap.go index b88e4040dfe5..7f79ef747ff3 100644 --- a/pkg/spegel/bootstrap.go +++ b/pkg/spegel/bootstrap.go @@ -2,7 +2,7 @@ package spegel import ( "context" - "math/rand" + "encoding/json" "os" "path/filepath" "strings" @@ -17,6 +17,7 @@ import ( "github.com/rancher/wrangler/v3/pkg/merr" "github.com/sirupsen/logrus" "github.com/spegel-org/spegel/pkg/routing" + "golang.org/x/sync/errgroup" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" @@ -39,13 +40,20 @@ func NewSelfBootstrapper() routing.Bootstrapper { return &selfBootstrapper{} } -func (s *selfBootstrapper) Run(_ context.Context, id string) error { +func (s *selfBootstrapper) Run(ctx context.Context, id string) error { s.id = id - return nil + return waitForDone(ctx) } -func (s *selfBootstrapper) Get() (*peer.AddrInfo, error) { - return peer.AddrInfoFromString(s.id) +func (s *selfBootstrapper) Get(ctx context.Context) ([]peer.AddrInfo, error) { + if s.id == "" { + return nil, errors.New("p2p peer not ready") + } + self, err := peer.AddrInfoFromString(s.id) + if err != nil { + return nil, err + } + return []peer.AddrInfo{*self}, nil } type agentBootstrapper struct { @@ -53,6 +61,8 @@ type agentBootstrapper struct { token string clientCert string clientKey string + 
kubeConfig string + info *clientaccess.Info } // NewAgentBootstrapper returns a p2p bootstrapper that retrieves a peer address from its server @@ -60,79 +70,104 @@ func NewAgentBootstrapper(server, token, dataDir string) routing.Bootstrapper { return &agentBootstrapper{ clientCert: filepath.Join(dataDir, "agent", "client-kubelet.crt"), clientKey: filepath.Join(dataDir, "agent", "client-kubelet.key"), + kubeConfig: filepath.Join(dataDir, "agent", "kubelet.kubeconfig"), server: server, token: token, } } -func (c *agentBootstrapper) Run(_ context.Context, _ string) error { - return nil +func (c *agentBootstrapper) Run(ctx context.Context, id string) error { + if c.server != "" && c.token != "" { + withCert := clientaccess.WithClientCertificate(c.clientCert, c.clientKey) + info, err := clientaccess.ParseAndValidateToken(c.server, c.token, withCert) + if err != nil { + return errors.Wrap(err, "failed to validate join token") + } + c.info = info + } + + client, err := util.GetClientSet(c.kubeConfig) + if err != nil { + return errors.Wrap(err, "failed to create kubernetes client") + } + nodes := client.CoreV1().Nodes() + + go wait.PollUntilContextCancel(ctx, 1*time.Second, true, func(ctx context.Context) (bool, error) { + nodeName := os.Getenv("NODE_NAME") + if nodeName == "" { + return false, nil + } + node, err := nodes.Get(ctx, nodeName, metav1.GetOptions{}) + if err != nil { + logrus.Debugf("Failed to update P2P address annotations and labels: %v", err) + return false, nil + } + + if node.Annotations == nil { + node.Annotations = map[string]string{} + } + node.Annotations[P2pAddressAnnotation] = id + if node.Labels == nil { + node.Labels = map[string]string{} + } + node.Labels[P2pEnabledLabel] = "true" + + if _, err = nodes.Update(ctx, node, metav1.UpdateOptions{}); err != nil { + logrus.Debugf("Failed to update P2P address annotations and labels: %v", err) + return false, nil + } + logrus.Infof("Node P2P address annotations and labels added: %s", id) + return true, 
nil + }) + return waitForDone(ctx) } -func (c *agentBootstrapper) Get() (*peer.AddrInfo, error) { +func (c *agentBootstrapper) Get(ctx context.Context) ([]peer.AddrInfo, error) { if c.server == "" || c.token == "" { return nil, errors.New("cannot get addresses without server and token") } - withCert := clientaccess.WithClientCertificate(c.clientCert, c.clientKey) - info, err := clientaccess.ParseAndValidateToken(c.server, c.token, withCert) - if err != nil { - return nil, err + if c.info == nil { + return nil, errors.New("client not ready") } - addr, err := info.Get("/v1-" + version.Program + "/p2p") + addr, err := c.info.Get("/v1-"+version.Program+"/p2p", clientaccess.WithHeader("Accept", "application/json")) if err != nil { return nil, err } - addrInfo, err := peer.AddrInfoFromString(string(addr)) - return addrInfo, err + // If the response cannot be decoded as a JSON list of addresses, fall back + // to using it as a legacy single-address response. + addrs := []string{} + if err := json.Unmarshal(addr, &addrs); err != nil { + addrs = append(addrs, string(addr)) + } + + addrInfos := []peer.AddrInfo{} + for _, addr := range addrs { + if addrInfo, err := peer.AddrInfoFromString(addr); err == nil { + addrInfos = append(addrInfos, *addrInfo) + } + } + return addrInfos, nil } type serverBootstrapper struct { controlConfig *config.Control } -// NewServerBootstrapper returns a p2p bootstrapper that returns an address from a random other cluster member. 
+// NewServerBootstrapper returns a p2p bootstrapper that returns an address from the Kubernetes node list func NewServerBootstrapper(controlConfig *config.Control) routing.Bootstrapper { return &serverBootstrapper{ controlConfig: controlConfig, } } -func (s *serverBootstrapper) Run(_ context.Context, id string) error { - s.controlConfig.Runtime.ClusterControllerStarts["spegel-p2p"] = func(ctx context.Context) { - nodes := s.controlConfig.Runtime.Core.Core().V1().Node() - _ = wait.PollUntilContextCancel(ctx, 1*time.Second, true, func(ctx context.Context) (bool, error) { - nodeName := os.Getenv("NODE_NAME") - if nodeName == "" { - return false, nil - } - node, err := nodes.Get(nodeName, metav1.GetOptions{}) - if err != nil { - return false, nil - } - - if node.Annotations == nil { - node.Annotations = map[string]string{} - } - node.Annotations[P2pAddressAnnotation] = id - if node.Labels == nil { - node.Labels = map[string]string{} - } - node.Labels[P2pEnabledLabel] = "true" - - if _, err = nodes.Update(node); err != nil { - return false, nil - } - logrus.Infof("Node P2P address annotations and labels added: %s", id) - return true, nil - }) - } - return nil +func (s *serverBootstrapper) Run(ctx context.Context, _ string) error { + return waitForDone(ctx) } -func (s *serverBootstrapper) Get() (addrInfo *peer.AddrInfo, err error) { +func (s *serverBootstrapper) Get(ctx context.Context) ([]peer.AddrInfo, error) { if s.controlConfig.Runtime.Core == nil { return nil, util.ErrCoreNotReady } @@ -146,8 +181,9 @@ func (s *serverBootstrapper) Get() (addrInfo *peer.AddrInfo, err error) { if err != nil { return nil, err } - for _, i := range rand.Perm(len(nodeList.Items)) { - node := nodeList.Items[i] + + addrs := []peer.AddrInfo{} + for _, node := range nodeList.Items { if node.Name == nodeName { // don't return our own address continue @@ -159,12 +195,12 @@ func (s *serverBootstrapper) Get() (addrInfo *peer.AddrInfo, err error) { if val, ok := 
node.Annotations[P2pAddressAnnotation]; ok { for _, addr := range strings.Split(val, ",") { if info, err := peer.AddrInfoFromString(addr); err == nil { - return info, nil + addrs = append(addrs, *info) } } } } - return nil, errors.New("no ready p2p peers found") + return addrs, nil } type chainingBootstrapper struct { @@ -172,6 +208,7 @@ type chainingBootstrapper struct { } // NewChainingBootstrapper returns a p2p bootstrapper that passes through to a list of bootstrappers. +// Addresses are returned from all bootstrappers that return successfully. func NewChainingBootstrapper(bootstrappers ...routing.Bootstrapper) routing.Bootstrapper { return &chainingBootstrapper{ bootstrappers: bootstrappers, @@ -179,23 +216,38 @@ func NewChainingBootstrapper(bootstrappers ...routing.Boot } func (c *chainingBootstrapper) Run(ctx context.Context, id string) error { + eg, ctx := errgroup.WithContext(ctx) + for i := range c.bootstrappers { + b := c.bootstrappers[i] + eg.Go(func() error { + return b.Run(ctx, id) + }) + } + return eg.Wait() +} + +func (c *chainingBootstrapper) Get(ctx context.Context) ([]peer.AddrInfo, error) { errs := merr.Errors{} - for _, b := range c.bootstrappers { - if err := b.Run(ctx, id); err != nil { + addrs := []peer.AddrInfo{} + for i := range c.bootstrappers { + b := c.bootstrappers[i] + as, err := b.Get(ctx) + if err != nil { errs = append(errs, err) + } else { + addrs = append(addrs, as...) } } - return merr.NewErrors(errs...) + if len(addrs) == 0 { + return nil, merr.NewErrors(errs...) + } + return addrs, nil } -func (c *chainingBootstrapper) Get() (*peer.AddrInfo, error) { - errs := merr.Errors{} - for _, b := range c.bootstrappers { - addr, err := b.Get() - if err == nil { - return addr, nil - } - errs = append(errs, err) +func waitForDone(ctx context.Context) error { + <-ctx.Done() + if err := ctx.Err(); err != nil && !errors.Is(err, context.Canceled) { + return err } - return nil, merr.NewErrors(errs...) 
+ return nil } diff --git a/pkg/spegel/spegel.go b/pkg/spegel/spegel.go index aacdadca84c7..5e10333d44af 100644 --- a/pkg/spegel/spegel.go +++ b/pkg/spegel/spegel.go @@ -2,6 +2,7 @@ package spegel import ( "context" + "encoding/json" "fmt" "log" "net" @@ -131,7 +132,7 @@ func (c *Config) Start(ctx context.Context, nodeConfig *config.Node) error { if logrus.IsLevelEnabled(logrus.DebugLevel) { level = ipfslog.LevelDebug stdlog := log.New(logrus.StandardLogger().Writer(), "spegel ", log.LstdFlags) - logger := stdr.NewWithOptions(stdlog, stdr.Options{Verbosity: ptr.To(10)}) + logger := stdr.NewWithOptions(stdlog, stdr.Options{Verbosity: ptr.To(7)}) ctx = logr.NewContext(ctx, logger) } ipfslog.SetAllLoggers(level) @@ -197,7 +198,7 @@ func (c *Config) Start(ctx context.Context, nodeConfig *config.Node) error { } router, err := routing.NewP2PRouter(ctx, routerAddr, c.Bootstrapper, c.RegistryPort, opts...) if err != nil { - return errors.Wrap(err, "failed to create p2p router") + return errors.Wrap(err, "failed to create P2P router") } go router.Run(ctx) @@ -216,13 +217,10 @@ func (c *Config) Start(ctx context.Context, nodeConfig *config.Node) error { registry.WithLogger(logr.FromContextOrDiscard(ctx)), } reg := registry.NewRegistry(ociClient, router, registryOpts...) 
- regSvr := reg.Server(":" + c.RegistryPort) - - // Close router on shutdown - go func() { - <-ctx.Done() - router.Close() - }() + regSvr, err := reg.Server(":" + c.RegistryPort) + if err != nil { + return errors.Wrap(err, "failed to create embedded registry server") + } // Track images available in containerd and publish via p2p router go state.Track(ctx, ociClient, router, resolveLatestTag) @@ -232,29 +230,52 @@ func (c *Config) Start(ctx context.Context, nodeConfig *config.Node) error { return err } mRouter.PathPrefix("/v2").Handler(regSvr.Handler) - mRouter.PathPrefix("/v1-" + version.Program + "/p2p").Handler(c.peerInfo()) + mRouter.PathPrefix("/v1-{program}/p2p").Handler(c.peerInfo()) // Wait up to 5 seconds for the p2p network to find peers. This will return // immediately if the node is bootstrapping from itself. - _ = wait.PollUntilContextTimeout(ctx, time.Second, resolveTimeout, true, func(_ context.Context) (bool, error) { - return router.Ready() - }) - + if err := wait.PollUntilContextTimeout(ctx, time.Second, resolveTimeout, true, func(_ context.Context) (bool, error) { + ready, _ := router.Ready(ctx) + return ready, nil + }); err != nil { + logrus.Warnf("Failed to wait for P2P mesh to become ready, will retry in the background: %v", err) + } return nil } // peerInfo sends a peer address retrieved from the bootstrapper via HTTP func (c *Config) peerInfo() http.HandlerFunc { return http.HandlerFunc(func(resp http.ResponseWriter, req *http.Request) { + info, err := c.Bootstrapper.Get(req.Context()) + if err != nil || len(info) == 0 { + http.Error(resp, err.Error(), http.StatusInternalServerError) + return + } + + addrs := []string{} + for _, ai := range info { + for _, ma := range ai.Addrs { + addrs = append(addrs, fmt.Sprintf("%s/p2p/%s", ma, ai.ID)) + } + } + client, _, _ := net.SplitHostPort(req.RemoteAddr) - info, err := c.Bootstrapper.Get() - if err != nil { - http.Error(resp, "Internal Error", http.StatusInternalServerError) + if 
req.Header.Get("Accept") == "application/json" { + b, err := json.Marshal(addrs) + if err != nil { + http.Error(resp, err.Error(), http.StatusInternalServerError) + return + } + logrus.Debugf("Serving p2p peer addrs %v to client at %s", addrs, client) + resp.Header().Set("Content-Type", "application/json") + resp.WriteHeader(http.StatusOK) + resp.Write(b) return } - logrus.Debugf("Serving p2p peer addr %s to client at %s", info, client) - resp.WriteHeader(http.StatusOK) + + logrus.Debugf("Serving p2p peer addr %v to client at %s", addrs[0], client) resp.Header().Set("Content-Type", "text/plain") - fmt.Fprintf(resp, "%s/p2p/%s", info.Addrs[0].String(), info.ID.String()) + resp.WriteHeader(http.StatusOK) + resp.Write([]byte(addrs[0])) }) }