fix(p2p): re-use p2p host when running federated mode (#3341)

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
This commit is contained in:
Ettore Di Giacinto 2024-08-20 20:14:17 +02:00 committed by GitHub
parent aca2c4196a
commit 2669f4738a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 13 additions and 16 deletions

View File

@ -145,15 +145,13 @@ func (r *RunCMD) Run(ctx *cliContext.Context) error {
if err != nil { if err != nil {
return err return err
} }
if err := p2p.ExposeService(context.Background(), "localhost", port, token, p2p.NetworkID(r.Peer2PeerNetworkID, p2p.FederatedID)); err != nil { fedCtx := context.Background()
return err node, err := p2p.ExposeService(fedCtx, "localhost", port, token, p2p.NetworkID(r.Peer2PeerNetworkID, p2p.FederatedID))
}
node, err := p2p.NewNode(token)
if err != nil { if err != nil {
return err return err
} }
if err := p2p.ServiceDiscoverer(context.Background(), node, token, p2p.NetworkID(r.Peer2PeerNetworkID, p2p.FederatedID), nil, false); err != nil { if err := p2p.ServiceDiscoverer(fedCtx, node, token, p2p.NetworkID(r.Peer2PeerNetworkID, p2p.FederatedID), nil, false); err != nil {
return err return err
} }
} }

View File

@ -60,7 +60,7 @@ func (r *P2P) Run(ctx *cliContext.Context) error {
p = r.RunnerPort p = r.RunnerPort
} }
err = p2p.ExposeService(context.Background(), address, p, r.Token, p2p.NetworkID(r.Peer2PeerNetworkID, p2p.WorkerID)) _, err = p2p.ExposeService(context.Background(), address, p, r.Token, p2p.NetworkID(r.Peer2PeerNetworkID, p2p.WorkerID))
if err != nil { if err != nil {
return err return err
} }
@ -100,7 +100,7 @@ func (r *P2P) Run(ctx *cliContext.Context) error {
} }
}() }()
err = p2p.ExposeService(context.Background(), address, fmt.Sprint(port), r.Token, p2p.NetworkID(r.Peer2PeerNetworkID, p2p.WorkerID)) _, err = p2p.ExposeService(context.Background(), address, fmt.Sprint(port), r.Token, p2p.NetworkID(r.Peer2PeerNetworkID, p2p.WorkerID))
if err != nil { if err != nil {
return err return err
} }

View File

@ -99,5 +99,4 @@ func (fs *FederatedServer) proxy(ctx context.Context, node *node.Node) error {
}() }()
} }
} }
} }

View File

@ -309,7 +309,7 @@ func ensureService(ctx context.Context, n *node.Node, nd *NodeData, sserv string
} }
// This is the P2P worker main // This is the P2P worker main
func ExposeService(ctx context.Context, host, port, token, servicesID string) error { func ExposeService(ctx context.Context, host, port, token, servicesID string) (*node.Node, error) {
if servicesID == "" { if servicesID == "" {
servicesID = defaultServicesID servicesID = defaultServicesID
} }
@ -317,7 +317,7 @@ func ExposeService(ctx context.Context, host, port, token, servicesID string) er
nodeOpts, err := newNodeOpts(token) nodeOpts, err := newNodeOpts(token)
if err != nil { if err != nil {
return err return nil, err
} }
// generate a random string for the name // generate a random string for the name
name := utils.RandString(10) name := utils.RandString(10)
@ -327,17 +327,17 @@ func ExposeService(ctx context.Context, host, port, token, servicesID string) er
services.RegisterService(llger, time.Duration(60)*time.Second, name, fmt.Sprintf("%s:%s", host, port))...) services.RegisterService(llger, time.Duration(60)*time.Second, name, fmt.Sprintf("%s:%s", host, port))...)
n, err := node.New(nodeOpts...) n, err := node.New(nodeOpts...)
if err != nil { if err != nil {
return fmt.Errorf("creating a new node: %w", err) return nil, fmt.Errorf("creating a new node: %w", err)
} }
err = n.Start(ctx) err = n.Start(ctx)
if err != nil { if err != nil {
return fmt.Errorf("creating a new node: %w", err) return n, fmt.Errorf("creating a new node: %w", err)
} }
ledger, err := n.Ledger() ledger, err := n.Ledger()
if err != nil { if err != nil {
return fmt.Errorf("creating a new node: %w", err) return n, fmt.Errorf("creating a new node: %w", err)
} }
ledger.Announce( ledger.Announce(
@ -354,7 +354,7 @@ func ExposeService(ctx context.Context, host, port, token, servicesID string) er
}, },
) )
return err return n, err
} }
func NewNode(token string) (*node.Node, error) { func NewNode(token string) (*node.Node, error) {

View File

@ -22,8 +22,8 @@ func ServiceDiscoverer(ctx context.Context, node *node.Node, token, servicesID s
return fmt.Errorf("not implemented") return fmt.Errorf("not implemented")
} }
func ExposeService(ctx context.Context, host, port, token, servicesID string) error { func ExposeService(ctx context.Context, host, port, token, servicesID string) (*node.Node, error) {
return fmt.Errorf("not implemented") return nil, fmt.Errorf("not implemented")
} }
func IsP2PEnabled() bool { func IsP2PEnabled() bool {