From c7357a98722667070ca8834333b06ce985466057 Mon Sep 17 00:00:00 2001 From: Ettore Di Giacinto Date: Thu, 18 Jul 2024 14:44:31 +0200 Subject: [PATCH] fix: short-circuit when nodes aren't detected (#2909) Fixes: ``` panic: invalid argument to IntN goroutine 401 [running]: math/rand/v2.(*Rand).IntN(...) /home/mudler/_git/go/pkg/mod/golang.org/toolchain@v0.0.1-go1.22.4.linux-amd64/src/math/rand/v2/rand.go:190 math/rand/v2.IntN(...) /home/mudler/_git/go/pkg/mod/golang.org/toolchain@v0.0.1-go1.22.4.linux-amd64/src/math/rand/v2/rand.go:307 github.com/mudler/LocalAI/core/cli.Proxy.func2() /home/mudler/_git/LocalAI/core/cli/federated.go:104 +0x76e created by github.com/mudler/LocalAI/core/cli.Proxy in goroutine 1 /home/mudler/_git/LocalAI/core/cli/federated.go:91 +0x3c5 ``` When no nodes are found and something is trying to hit the federated endpoint (and no tunnels are ready yet). Signed-off-by: Ettore Di Giacinto --- core/cli/federated.go | 8 +++++++- core/cli/run.go | 2 +- core/p2p/p2p.go | 4 ++-- core/p2p/p2p_disabled.go | 2 +- 4 files changed, 11 insertions(+), 5 deletions(-) diff --git a/core/cli/federated.go b/core/cli/federated.go index b99ef4f8..b1de1840 100644 --- a/core/cli/federated.go +++ b/core/cli/federated.go @@ -34,7 +34,9 @@ func (f *FederatedCLI) Run(ctx *cliContext.Context) error { return fmt.Errorf("creating a new node: %w", err) } - if err := p2p.ServiceDiscoverer(context.Background(), n, f.Peer2PeerToken, p2p.FederatedID, nil); err != nil { + if err := p2p.ServiceDiscoverer(context.Background(), n, f.Peer2PeerToken, p2p.FederatedID, func(servicesID string, tunnel p2p.NodeData) { + log.Debug().Msgf("Discovered node: %s", tunnel.ID) + }); err != nil { return err } @@ -98,6 +100,10 @@ func Proxy(ctx context.Context, node *node.Node, listenAddr, service string) err } } + if len(tunnelAddresses) == 0 { + log.Error().Msg("No available nodes yet") + return + } // open a TCP stream to one of the tunnels // chosen randomly // TODO: optimize this and track usage diff --git a/core/cli/run.go b/core/cli/run.go index d7b45f77..b3d91632 100644 --- a/core/cli/run.go +++ b/core/cli/run.go @@ -119,7 +119,7 @@ func (r *RunCMD) Run(ctx *cliContext.Context) error { } log.Info().Msg("Starting P2P server discovery...") - if err := p2p.ServiceDiscoverer(context.Background(), node, token, "", func() { + if err := p2p.ServiceDiscoverer(context.Background(), node, token, "", func(serviceID string, node p2p.NodeData) { var tunnelAddresses []string for _, v := range p2p.GetAvailableNodes("") { if v.IsOnline() { diff --git a/core/p2p/p2p.go b/core/p2p/p2p.go index e0e46170..9b71f7de 100644 --- a/core/p2p/p2p.go +++ b/core/p2p/p2p.go @@ -144,7 +144,7 @@ func copyStream(closer chan struct{}, dst io.Writer, src io.Reader) { // This is the main of the server (which keeps the env variable updated) // This starts a goroutine that keeps LLAMACPP_GRPC_SERVERS updated with the discovered services -func ServiceDiscoverer(ctx context.Context, n *node.Node, token, servicesID string, discoveryFunc func()) error { +func ServiceDiscoverer(ctx context.Context, n *node.Node, token, servicesID string, discoveryFunc func(serviceID string, node NodeData)) error { if servicesID == "" { servicesID = defaultServicesID } @@ -166,7 +166,7 @@ func ServiceDiscoverer(ctx context.Context, n *node.Node, token, servicesID stri case tunnel := <-tunnels: AddNode(servicesID, tunnel) if discoveryFunc != nil { - discoveryFunc() + discoveryFunc(servicesID, tunnel) } } } diff --git a/core/p2p/p2p_disabled.go b/core/p2p/p2p_disabled.go index 340a4fb4..b1d1d04a 100644 --- a/core/p2p/p2p_disabled.go +++ b/core/p2p/p2p_disabled.go @@ -14,7 +14,7 @@ func GenerateToken() string { return "not implemented" } -func ServiceDiscoverer(ctx context.Context, node *node.Node, token, servicesID string, fn func()) error { +func ServiceDiscoverer(ctx context.Context, node *node.Node, token, servicesID string, fn func(string, NodeData)) error { return fmt.Errorf("not implemented") }