diff --git a/core/cli/federated.go b/core/cli/federated.go index b99ef4f8..b1de1840 100644 --- a/core/cli/federated.go +++ b/core/cli/federated.go @@ -34,7 +34,9 @@ func (f *FederatedCLI) Run(ctx *cliContext.Context) error { return fmt.Errorf("creating a new node: %w", err) } - if err := p2p.ServiceDiscoverer(context.Background(), n, f.Peer2PeerToken, p2p.FederatedID, nil); err != nil { + if err := p2p.ServiceDiscoverer(context.Background(), n, f.Peer2PeerToken, p2p.FederatedID, func(servicesID string, tunnel p2p.NodeData) { + log.Debug().Msgf("Discovered node: %s", tunnel.ID) + }); err != nil { return err } @@ -98,6 +100,10 @@ func Proxy(ctx context.Context, node *node.Node, listenAddr, service string) err } } + if len(tunnelAddresses) == 0 { + log.Error().Msg("No available nodes yet") + return + } // open a TCP stream to one of the tunnels // chosen randomly // TODO: optimize this and track usage diff --git a/core/cli/run.go b/core/cli/run.go index d7b45f77..b3d91632 100644 --- a/core/cli/run.go +++ b/core/cli/run.go @@ -119,7 +119,7 @@ func (r *RunCMD) Run(ctx *cliContext.Context) error { } log.Info().Msg("Starting P2P server discovery...") - if err := p2p.ServiceDiscoverer(context.Background(), node, token, "", func() { + if err := p2p.ServiceDiscoverer(context.Background(), node, token, "", func(serviceID string, node p2p.NodeData) { var tunnelAddresses []string for _, v := range p2p.GetAvailableNodes("") { if v.IsOnline() { diff --git a/core/p2p/p2p.go b/core/p2p/p2p.go index e0e46170..9b71f7de 100644 --- a/core/p2p/p2p.go +++ b/core/p2p/p2p.go @@ -144,7 +144,7 @@ func copyStream(closer chan struct{}, dst io.Writer, src io.Reader) { // This is the main of the server (which keeps the env variable updated) // This starts a goroutine that keeps LLAMACPP_GRPC_SERVERS updated with the discovered services -func ServiceDiscoverer(ctx context.Context, n *node.Node, token, servicesID string, discoveryFunc func()) error { +func ServiceDiscoverer(ctx context.Context, n *node.Node, token, servicesID string, discoveryFunc func(serviceID string, node NodeData)) error { if servicesID == "" { servicesID = defaultServicesID } @@ -166,7 +166,7 @@ func ServiceDiscoverer(ctx context.Context, n *node.Node, token, servicesID stri case tunnel := <-tunnels: AddNode(servicesID, tunnel) if discoveryFunc != nil { - discoveryFunc() + discoveryFunc(servicesID, tunnel) } } } diff --git a/core/p2p/p2p_disabled.go b/core/p2p/p2p_disabled.go index 340a4fb4..b1d1d04a 100644 --- a/core/p2p/p2p_disabled.go +++ b/core/p2p/p2p_disabled.go @@ -14,7 +14,7 @@ func GenerateToken() string { return "not implemented" } -func ServiceDiscoverer(ctx context.Context, node *node.Node, token, servicesID string, fn func()) error { +func ServiceDiscoverer(ctx context.Context, node *node.Node, token, servicesID string, fn func(string, NodeData)) error { return fmt.Errorf("not implemented") }