From a68d3e1a31cddab44d4bb1b494c3c2faca790f8c Mon Sep 17 00:00:00 2001 From: Ettore Di Giacinto Date: Sat, 17 Aug 2024 09:59:40 +0200 Subject: [PATCH] fix(p2p): allocate tunnels only when needed Signed-off-by: Ettore Di Giacinto --- core/cli/run.go | 4 ++-- core/p2p/federated_server.go | 2 +- core/p2p/p2p.go | 33 ++++++++++++++++++--------------- core/p2p/p2p_disabled.go | 2 +- 4 files changed, 22 insertions(+), 19 deletions(-) diff --git a/core/cli/run.go b/core/cli/run.go index b2f73ef935a7..23939548e6b1 100644 --- a/core/cli/run.go +++ b/core/cli/run.go @@ -135,7 +135,7 @@ func (r *RunCMD) Run(ctx *cliContext.Context) error { os.Setenv("LLAMACPP_GRPC_SERVERS", tunnelEnvVar) log.Debug().Msgf("setting LLAMACPP_GRPC_SERVERS to %s", tunnelEnvVar) - }); err != nil { + }, true); err != nil { return err } } @@ -153,7 +153,7 @@ func (r *RunCMD) Run(ctx *cliContext.Context) error { return err } - if err := p2p.ServiceDiscoverer(context.Background(), node, token, p2p.NetworkID(r.Peer2PeerNetworkID, p2p.FederatedID), nil); err != nil { + if err := p2p.ServiceDiscoverer(context.Background(), node, token, p2p.NetworkID(r.Peer2PeerNetworkID, p2p.FederatedID), nil, false); err != nil { return err } } diff --git a/core/p2p/federated_server.go b/core/p2p/federated_server.go index acd6e7bf9031..c356ae9605ac 100644 --- a/core/p2p/federated_server.go +++ b/core/p2p/federated_server.go @@ -29,7 +29,7 @@ func (f *FederatedServer) Start(ctx context.Context) error { if err := ServiceDiscoverer(ctx, n, f.p2ptoken, f.service, func(servicesID string, tunnel NodeData) { log.Debug().Msgf("Discovered node: %s", tunnel.ID) - }); err != nil { + }, true); err != nil { return err } diff --git a/core/p2p/p2p.go b/core/p2p/p2p.go index 758cb621b059..c1039f58ae5d 100644 --- a/core/p2p/p2p.go +++ b/core/p2p/p2p.go @@ -139,11 +139,11 @@ func allocateLocalService(ctx context.Context, node *node.Node, listenAddr, serv // This is the main of the server (which keeps the env variable updated) // This starts a goroutine that keeps LLAMACPP_GRPC_SERVERS updated with the discovered services -func ServiceDiscoverer(ctx context.Context, n *node.Node, token, servicesID string, discoveryFunc func(serviceID string, node NodeData)) error { +func ServiceDiscoverer(ctx context.Context, n *node.Node, token, servicesID string, discoveryFunc func(serviceID string, node NodeData), allocate bool) error { if servicesID == "" { servicesID = defaultServicesID } - tunnels, err := discoveryTunnels(ctx, n, token, servicesID) + tunnels, err := discoveryTunnels(ctx, n, token, servicesID, allocate) if err != nil { return err } @@ -170,7 +170,7 @@ func ServiceDiscoverer(ctx context.Context, n *node.Node, token, servicesID stri return nil } -func discoveryTunnels(ctx context.Context, n *node.Node, token, servicesID string) (chan NodeData, error) { +func discoveryTunnels(ctx context.Context, n *node.Node, token, servicesID string, allocate bool) (chan NodeData, error) { tunnels := make(chan NodeData) err := n.Start(ctx) @@ -209,7 +209,7 @@ func discoveryTunnels(ctx context.Context, n *node.Node, token, servicesID strin zlog.Error().Msg("cannot unmarshal node data") continue } - ensureService(ctx, n, nd, k) + ensureService(ctx, n, nd, k, allocate) muservice.Lock() if _, ok := service[nd.Name]; ok { tunnels <- service[nd.Name].NodeData @@ -231,7 +231,7 @@ type nodeServiceData struct { var service = map[string]nodeServiceData{} var muservice sync.Mutex -func ensureService(ctx context.Context, n *node.Node, nd *NodeData, sserv string) { +func ensureService(ctx context.Context, n *node.Node, nd *NodeData, sserv string, allocate bool) { muservice.Lock() defer muservice.Unlock() if ndService, found := service[nd.Name]; !found { @@ -240,22 +240,25 @@ func ensureService(ctx context.Context, n *node.Node, nd *NodeData, sserv string zlog.Debug().Msgf("Node %s is offline", nd.ID) return } + newCtxm, cancel := context.WithCancel(ctx) - // Start the service - port, err := freeport.GetFreePort() - if err != nil { - zlog.Error().Err(err).Msgf("Could not allocate a free port for %s", nd.ID) - return - } + if allocate { + // Start the service + port, err := freeport.GetFreePort() + if err != nil { + zlog.Error().Err(err).Msgf("Could not allocate a free port for %s", nd.ID) + return + } - tunnelAddress := fmt.Sprintf("127.0.0.1:%d", port) - nd.TunnelAddress = tunnelAddress + tunnelAddress := fmt.Sprintf("127.0.0.1:%d", port) + nd.TunnelAddress = tunnelAddress + go allocateLocalService(newCtxm, n, tunnelAddress, sserv) + zlog.Debug().Msgf("Starting service %s on %s", sserv, tunnelAddress) + } service[nd.Name] = nodeServiceData{ NodeData: *nd, CancelFunc: cancel, } - go allocateLocalService(newCtxm, n, tunnelAddress, sserv) - zlog.Debug().Msgf("Starting service %s on %s", sserv, tunnelAddress) } else { // Check if the service is still alive // if not cancel the context diff --git a/core/p2p/p2p_disabled.go b/core/p2p/p2p_disabled.go index ab1d69dc26be..92241f42e0f3 100644 --- a/core/p2p/p2p_disabled.go +++ b/core/p2p/p2p_disabled.go @@ -18,7 +18,7 @@ func (f *FederatedServer) Start(ctx context.Context) error { return fmt.Errorf("not implemented") } -func ServiceDiscoverer(ctx context.Context, node *node.Node, token, servicesID string, fn func(string, NodeData)) error { +func ServiceDiscoverer(ctx context.Context, node *node.Node, token, servicesID string, fn func(string, NodeData), allocate bool) error { return fmt.Errorf("not implemented") }