//go:build p2p // +build p2p package p2p import ( "context" "errors" "fmt" "io" "net" "os" "sync" "time" "github.com/ipfs/go-log" "github.com/libp2p/go-libp2p/core/peer" "github.com/mudler/LocalAI/pkg/utils" "github.com/mudler/edgevpn/pkg/config" "github.com/mudler/edgevpn/pkg/node" "github.com/mudler/edgevpn/pkg/protocol" "github.com/mudler/edgevpn/pkg/services" "github.com/mudler/edgevpn/pkg/types" "github.com/phayes/freeport" zlog "github.com/rs/zerolog/log" "github.com/mudler/edgevpn/pkg/logger" ) func GenerateToken() string { // Generates a new config and exit newData := node.GenerateNewConnectionData(900) return newData.Base64() } func IsP2PEnabled() bool { return true } func nodeID(s string) string { hostname, _ := os.Hostname() return fmt.Sprintf("%s-%s", hostname, s) } func allocateLocalService(ctx context.Context, node *node.Node, listenAddr, service string) error { zlog.Info().Msgf("Allocating service '%s' on: %s", service, listenAddr) // Open local port for listening l, err := net.Listen("tcp", listenAddr) if err != nil { zlog.Error().Err(err).Msg("Error listening") return err } // ll.Info("Binding local port on", srcaddr) ledger, _ := node.Ledger() // Announce ourselves so nodes accepts our connection ledger.Announce( ctx, 10*time.Second, func() { // Retrieve current ID for ip in the blockchain //_, found := ledger.GetKey(protocol.UsersLedgerKey, node.Host().ID().String()) // If mismatch, update the blockchain //if !found { updatedMap := map[string]interface{}{} updatedMap[node.Host().ID().String()] = &types.User{ PeerID: node.Host().ID().String(), Timestamp: time.Now().String(), } ledger.Add(protocol.UsersLedgerKey, updatedMap) // } }, ) defer l.Close() for { select { case <-ctx.Done(): return errors.New("context canceled") default: zlog.Debug().Msg("New for connection") // Listen for an incoming connection. conn, err := l.Accept() if err != nil { fmt.Println("Error accepting: ", err.Error()) continue } // Handle connections in a new goroutine, forwarding to the p2p service go func() { // Retrieve current ID for ip in the blockchain existingValue, found := ledger.GetKey(protocol.ServicesLedgerKey, service) service := &types.Service{} existingValue.Unmarshal(service) // If mismatch, update the blockchain if !found { zlog.Error().Msg("Service not found on blockchain") conn.Close() // ll.Debugf("service '%s' not found on blockchain", serviceID) return } // Decode the Peer d, err := peer.Decode(service.PeerID) if err != nil { zlog.Error().Msg("cannot decode peer") conn.Close() // ll.Debugf("could not decode peer '%s'", service.PeerID) return } // Open a stream stream, err := node.Host().NewStream(ctx, d, protocol.ServiceProtocol.ID()) if err != nil { zlog.Error().Msg("cannot open stream peer") conn.Close() // ll.Debugf("could not open stream '%s'", err.Error()) return } // ll.Debugf("(service %s) Redirecting", serviceID, l.Addr().String()) zlog.Info().Msgf("Redirecting %s to %s", conn.LocalAddr().String(), stream.Conn().RemoteMultiaddr().String()) closer := make(chan struct{}, 2) go copyStream(closer, stream, conn) go copyStream(closer, conn, stream) <-closer stream.Close() conn.Close() // ll.Infof("(service %s) Done handling %s", serviceID, l.Addr().String()) }() } } } // This is the main of the server (which keeps the env variable updated) // This starts a goroutine that keeps LLAMACPP_GRPC_SERVERS updated with the discovered services func ServiceDiscoverer(ctx context.Context, n *node.Node, token, servicesID string, discoveryFunc func(serviceID string, node NodeData)) error { if servicesID == "" { servicesID = defaultServicesID } tunnels, err := discoveryTunnels(ctx, n, token, servicesID) if err != nil { return err } // TODO: discoveryTunnels should return all the nodes that are available? // In this way we updated availableNodes here instead of appending // e.g. we have a LastSeen field in NodeData that is updated in discoveryTunnels // each time the node is seen // In this case the below function should be idempotent and just keep track of the nodes go func() { for { select { case <-ctx.Done(): zlog.Error().Msg("Discoverer stopped") return case tunnel := <-tunnels: AddNode(servicesID, tunnel) if discoveryFunc != nil { discoveryFunc(servicesID, tunnel) } } } }() return nil } func discoveryTunnels(ctx context.Context, n *node.Node, token, servicesID string) (chan NodeData, error) { tunnels := make(chan NodeData) err := n.Start(ctx) if err != nil { return nil, fmt.Errorf("creating a new node: %w", err) } ledger, err := n.Ledger() if err != nil { return nil, fmt.Errorf("creating a new node: %w", err) } // get new services, allocate and return to the channel // TODO: // a function ensureServices that: // - starts a service if not started, if the worker is Online // - checks that workers are Online, if not cancel the context of allocateLocalService // - discoveryTunnels should return all the nodes and addresses associated with it // - the caller should take now care of the fact that we are always returning fresh informations go func() { for { select { case <-ctx.Done(): zlog.Error().Msg("Discoverer stopped") return default: time.Sleep(5 * time.Second) zlog.Debug().Msg("Searching for workers") data := ledger.LastBlock().Storage[servicesID] for k, v := range data { zlog.Info().Msgf("Found worker %s", k) nd := &NodeData{} if err := v.Unmarshal(nd); err != nil { zlog.Error().Msg("cannot unmarshal node data") continue } ensureService(ctx, n, nd, k) muservice.Lock() if _, ok := service[nd.Name]; ok { tunnels <- service[nd.Name].NodeData } muservice.Unlock() } } } }() return tunnels, err } type nodeServiceData struct { NodeData NodeData CancelFunc context.CancelFunc } var service = map[string]nodeServiceData{} var muservice sync.Mutex func ensureService(ctx context.Context, n *node.Node, nd *NodeData, sserv string) { muservice.Lock() defer muservice.Unlock() if ndService, found := service[nd.Name]; !found { if !nd.IsOnline() { // if node is offline and not present, do nothing return } newCtxm, cancel := context.WithCancel(ctx) // Start the service port, err := freeport.GetFreePort() if err != nil { fmt.Print(err) } tunnelAddress := fmt.Sprintf("127.0.0.1:%d", port) nd.TunnelAddress = tunnelAddress service[nd.Name] = nodeServiceData{ NodeData: *nd, CancelFunc: cancel, } go allocateLocalService(newCtxm, n, tunnelAddress, sserv) zlog.Debug().Msgf("Starting service %s on %s", sserv, tunnelAddress) } else { // Check if the service is still alive // if not cancel the context if !nd.IsOnline() && !ndService.NodeData.IsOnline() { ndService.CancelFunc() delete(service, nd.Name) zlog.Info().Msgf("Node %s is offline, deleting", nd.ID) } else if nd.IsOnline() { // update last seen inside service nd.TunnelAddress = ndService.NodeData.TunnelAddress service[nd.Name] = nodeServiceData{ NodeData: *nd, CancelFunc: ndService.CancelFunc, } zlog.Debug().Msgf("Node %s is still online", nd.ID) } } } // This is the P2P worker main func ExposeService(ctx context.Context, host, port, token, servicesID string) error { if servicesID == "" { servicesID = defaultServicesID } llger := logger.New(log.LevelFatal) nodeOpts, err := newNodeOpts(token) if err != nil { return err } // generate a random string for the name name := utils.RandString(10) // Register the service nodeOpts = append(nodeOpts, services.RegisterService(llger, time.Duration(60)*time.Second, name, fmt.Sprintf("%s:%s", host, port))...) n, err := node.New(nodeOpts...) if err != nil { return fmt.Errorf("creating a new node: %w", err) } err = n.Start(ctx) if err != nil { return fmt.Errorf("creating a new node: %w", err) } ledger, err := n.Ledger() if err != nil { return fmt.Errorf("creating a new node: %w", err) } ledger.Announce( ctx, 20*time.Second, func() { // Retrieve current ID for ip in the blockchain //_, found := ledger.GetKey("services_localai", name) // If mismatch, update the blockchain //if !found { updatedMap := map[string]interface{}{} updatedMap[name] = &NodeData{ Name: name, LastSeen: time.Now(), ID: nodeID(name), } ledger.Add(servicesID, updatedMap) // } }, ) return err } func NewNode(token string) (*node.Node, error) { nodeOpts, err := newNodeOpts(token) if err != nil { return nil, err } n, err := node.New(nodeOpts...) if err != nil { return nil, fmt.Errorf("creating a new node: %w", err) } return n, nil } func newNodeOpts(token string) ([]node.Option, error) { llger := logger.New(log.LevelFatal) defaultInterval := 10 * time.Second // TODO: move this up, expose more config options when creating a node noDHT := os.Getenv("LOCALAI_P2P_DISABLE_DHT") == "true" noLimits := os.Getenv("LOCALAI_P2P_DISABLE_LIMITS") == "true" loglevel := "info" c := config.Config{ Limit: config.ResourceLimit{ Enable: !noLimits, MaxConns: 100, }, NetworkToken: token, LowProfile: false, LogLevel: loglevel, Libp2pLogLevel: "fatal", Ledger: config.Ledger{ SyncInterval: defaultInterval, AnnounceInterval: defaultInterval, }, NAT: config.NAT{ Service: true, Map: true, RateLimit: true, RateLimitGlobal: 10, RateLimitPeer: 10, RateLimitInterval: defaultInterval, }, Discovery: config.Discovery{ DHT: noDHT, MDNS: true, Interval: 30 * time.Second, }, Connection: config.Connection{ HolePunch: true, AutoRelay: true, MaxConnections: 100, }, } nodeOpts, _, err := c.ToOpts(llger) if err != nil { return nil, fmt.Errorf("parsing options: %w", err) } nodeOpts = append(nodeOpts, services.Alive(30*time.Second, 900*time.Second, 15*time.Minute)...) return nodeOpts, nil } func copyStream(closer chan struct{}, dst io.Writer, src io.Reader) { defer func() { closer <- struct{}{} }() // connection is closed, send signal to stop proxy io.Copy(dst, src) }