 diff --git a/.github/release.yml b/.github/release.yml index 8c2c11f9..eee7f6ec 100644 --- a/.github/release.yml +++ b/.github/release.yml @@ -13,6 +13,9 @@ changelog: labels: - bug - regression + - title: "🖧 P2P area" + labels: + - area/p2p - title: Exciting New Features 🎉 labels: - Semver-Minor diff --git a/Makefile b/Makefile index df9b9474..99b16e66 100644 --- a/Makefile +++ b/Makefile @@ -53,8 +53,8 @@ RANDOM := $(shell bash -c 'echo $$RANDOM') VERSION?=$(shell git describe --always --tags || echo "dev" ) # go tool nm ./local-ai | grep Commit LD_FLAGS?= -override LD_FLAGS += -X "github.com/go-skynet/LocalAI/internal.Version=$(VERSION)" -override LD_FLAGS += -X "github.com/go-skynet/LocalAI/internal.Commit=$(shell git rev-parse HEAD)" +override LD_FLAGS += -X "github.com/mudler/LocalAI/internal.Version=$(VERSION)" +override LD_FLAGS += -X "github.com/mudler/LocalAI/internal.Commit=$(shell git rev-parse HEAD)" OPTIONAL_TARGETS?= @@ -147,7 +147,7 @@ endif # glibc-static or glibc-devel-static required ifeq ($(STATIC),true) - LD_FLAGS=-linkmode external -extldflags -static + LD_FLAGS+=-linkmode external -extldflags -static endif ifeq ($(findstring stablediffusion,$(GO_TAGS)),stablediffusion) diff --git a/core/cli/cli.go b/core/cli/cli.go index b88db7b2..0fed33fd 100644 --- a/core/cli/cli.go +++ b/core/cli/cli.go @@ -9,6 +9,7 @@ var CLI struct { cliContext.Context `embed:""` Run RunCMD `cmd:"" help:"Run LocalAI, this the default command if no other command is specified. Run 'local-ai run --help' for more information" default:"withargs"` + Federated FederatedCLI `cmd:"" help:"Run LocalAI in federated mode"` Models ModelsCMD `cmd:"" help:"Manage LocalAI models and definitions"` TTS TTSCMD `cmd:"" help:"Convert text to speech"` Transcript TranscriptCMD `cmd:"" help:"Convert audio to text"` 
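 The new `Federated` subcommand above plugs into the existing Kong-style CLI struct. As a rough illustration of how such a subcommand gets dispatched (a minimal sketch assuming the alecthomas/kong parser that the struct tags point to; the trimmed-down field and flag below are illustrative, not part of this patch): ```go package main import ( "fmt" "github.com/alecthomas/kong" ) // FederatedCLI mirrors the shape of the command added in core/cli/federated.go, // trimmed down for illustration. type FederatedCLI struct { Address string `default:":8080" help:"Bind address for the federated proxy"` } // Run is invoked by kong when `federated` is the selected subcommand. func (f *FederatedCLI) Run() error { fmt.Println("would start the federated proxy on", f.Address) return nil } var cli struct { Federated FederatedCLI `cmd:"" help:"Run LocalAI in federated mode"` } func main() { // e.g. `./app federated --address :9090` ctx := kong.Parse(&cli) ctx.FatalIfErrorf(ctx.Run()) } ``` 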
 diff --git a/core/cli/federated.go b/core/cli/federated.go new file mode 100644 index 00000000..b99ef4f8 --- /dev/null +++ b/core/cli/federated.go @@ -0,0 +1,130 @@ +package cli + +import ( + "context" + "errors" + "fmt" + "io" + "net" + "time" + + "math/rand/v2" + + cliContext "github.com/mudler/LocalAI/core/cli/context" + "github.com/mudler/LocalAI/core/p2p" + "github.com/mudler/edgevpn/pkg/node" + "github.com/mudler/edgevpn/pkg/protocol" + "github.com/mudler/edgevpn/pkg/types" + "github.com/rs/zerolog/log" +) + +type FederatedCLI struct { + Address string `env:"LOCALAI_ADDRESS,ADDRESS" default:":8080" help:"Bind address for the API server" group:"api"` + Peer2PeerToken string `env:"LOCALAI_P2P_TOKEN,P2P_TOKEN,TOKEN" name:"p2ptoken" help:"Token for P2P mode (optional)" group:"p2p"` +} + +func (f *FederatedCLI) Run(ctx *cliContext.Context) error { + + n, err := p2p.NewNode(f.Peer2PeerToken) + if err != nil { + return fmt.Errorf("creating a new node: %w", err) + } + err = n.Start(context.Background()) + if err != nil { + return fmt.Errorf("starting the node: %w", err) + } + + if err := p2p.ServiceDiscoverer(context.Background(), n, f.Peer2PeerToken, p2p.FederatedID, nil); err != nil { + return err + } + + return Proxy(context.Background(), n, f.Address, p2p.FederatedID) +} + +func Proxy(ctx context.Context, node *node.Node, listenAddr, service string) error { + + log.Info().Msgf("Allocating service '%s' on: %s", service, listenAddr) + // Open local port for listening + l, err := net.Listen("tcp", listenAddr) + if err != nil { + log.Error().Err(err).Msg("Error listening") + return err + } + // ll.Info("Binding local port on", srcaddr) + + ledger, _ := node.Ledger() + + // Announce ourselves so that other nodes accept our connection + ledger.Announce( + ctx, + 10*time.Second, + func() { + // Retrieve current ID for ip in the blockchain + //_, found := ledger.GetKey(protocol.UsersLedgerKey, node.Host().ID().String()) + // If mismatch, update the blockchain + //if !found { + updatedMap := map[string]interface{}{} + updatedMap[node.Host().ID().String()] = &types.User{ + PeerID: node.Host().ID().String(), + Timestamp: time.Now().String(), + } + ledger.Add(protocol.UsersLedgerKey, updatedMap) + // } + }, + ) + + defer l.Close() + for { + select { + case <-ctx.Done(): + return errors.New("context canceled") + default: + log.Debug().Msg("Waiting for a new connection") + // Listen for an incoming connection. + conn, err := l.Accept() + if err != nil { + fmt.Println("Error accepting: ", err.Error()) + continue + } + + // Handle connections in a new goroutine, forwarding to the p2p service + go func() { + var tunnelAddresses []string + for _, v := range p2p.GetAvailableNodes(p2p.FederatedID) { + if v.IsOnline() { + tunnelAddresses = append(tunnelAddresses, v.TunnelAddress) + } else { + log.Info().Msgf("Node %s is offline", v.ID) + } + } + + // Guard against an empty federation: rand.IntN panics on zero. + if len(tunnelAddresses) == 0 { + log.Error().Msg("No online nodes available, dropping connection") + conn.Close() + return + } + + // Open a TCP stream to one of the tunnels, chosen randomly + // TODO: optimize this and track usage + tunnelAddr := tunnelAddresses[rand.IntN(len(tunnelAddresses))] + + tunnelConn, err := net.Dial("tcp", tunnelAddr) + if err != nil { + log.Error().Err(err).Msg("Error connecting to tunnel") + conn.Close() + return + } + + log.Info().Msgf("Redirecting %s to %s", conn.LocalAddr().String(), tunnelConn.RemoteAddr().String()) + closer := make(chan struct{}, 2) + go copyStream(closer, tunnelConn, conn) + go copyStream(closer, conn, tunnelConn) + <-closer + + tunnelConn.Close() + conn.Close() + // ll.Infof("(service %s) Done handling %s", serviceID, l.Addr().String()) + }() + } + } + +} + +func copyStream(closer chan struct{}, dst io.Writer, src io.Reader) { + defer func() { closer <- struct{}{} }() // connection closed, signal the proxy loop to clean up + io.Copy(dst, src) +} 
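 Because the federated proxy above forwards raw TCP to whichever online node it picks, a client does not need to know about the federation at all: it can talk to the proxy address exactly as it would to a single LocalAI instance. A minimal sketch of that, assuming `local-ai federated` is listening on its default `:8080` and at least one node is online (the model name and payload are placeholders, not taken from this patch): ```go package main import ( "bytes" "fmt" "io" "net/http" ) func main() { // The proxy is transparent, so the usual OpenAI-compatible endpoint works unchanged. payload := []byte(`{"model":"some-model","messages":[{"role":"user","content":"Hello"}]}`) resp, err := http.Post("http://localhost:8080/v1/chat/completions", "application/json", bytes.NewReader(payload)) if err != nil { panic(err) } defer resp.Body.Close() body, _ := io.ReadAll(resp.Body) fmt.Println(resp.Status) fmt.Println(string(body)) } ``` 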
 diff --git a/core/cli/run.go b/core/cli/run.go index 2d19b072..4a313391 100644 --- a/core/cli/run.go +++ b/core/cli/run.go @@ -3,6 +3,8 @@ package cli import ( "context" "fmt" + "net" + "os" "strings" "time" @@ -50,7 +52,7 @@ type RunCMD struct { DisableWebUI bool `env:"LOCALAI_DISABLE_WEBUI,DISABLE_WEBUI" default:"false" help:"Disable webui" group:"api"` OpaqueErrors bool `env:"LOCALAI_OPAQUE_ERRORS" default:"false" help:"If true, all error responses are replaced with blank 500 errors. This is intended only for hardening against information leaks and is normally not recommended." group:"api"` Peer2Peer bool `env:"LOCALAI_P2P,P2P" name:"p2p" default:"false" help:"Enable P2P mode" group:"p2p"` - Peer2PeerToken string `env:"LOCALAI_P2P_TOKEN,P2P_TOKEN" name:"p2ptoken" help:"Token for P2P mode (optional)" group:"p2p"` + Peer2PeerToken string `env:"LOCALAI_P2P_TOKEN,P2P_TOKEN,TOKEN" name:"p2ptoken" help:"Token for P2P mode (optional)" group:"p2p"` ParallelRequests bool `env:"LOCALAI_PARALLEL_REQUESTS,PARALLEL_REQUESTS" help:"Enable backends to handle multiple requests in parallel if they support it (e.g.: llama.cpp or vllm)" group:"backends"` SingleActiveBackend bool `env:"LOCALAI_SINGLE_ACTIVE_BACKEND,SINGLE_ACTIVE_BACKEND" help:"Allow only one backend to be run at a time" group:"backends"` PreloadBackendOnly bool `env:"LOCALAI_PRELOAD_BACKEND_ONLY,PRELOAD_BACKEND_ONLY" default:"false" help:"Do not launch the API services, only the preloaded models / backends are started (useful for multi-node setups)" group:"backends"` @@ -59,6 +61,7 @@ type RunCMD struct { WatchdogIdleTimeout string `env:"LOCALAI_WATCHDOG_IDLE_TIMEOUT,WATCHDOG_IDLE_TIMEOUT" default:"15m" help:"Threshold beyond which an idle backend should be stopped" group:"backends"` EnableWatchdogBusy bool `env:"LOCALAI_WATCHDOG_BUSY,WATCHDOG_BUSY" default:"false" help:"Enable watchdog for stopping backends that are busy longer than the watchdog-busy-timeout" group:"backends"` WatchdogBusyTimeout string `env:"LOCALAI_WATCHDOG_BUSY_TIMEOUT,WATCHDOG_BUSY_TIMEOUT" default:"5m" help:"Threshold beyond which a busy backend should be stopped" group:"backends"` + Federated bool `env:"LOCALAI_FEDERATED,FEDERATED" help:"Enable federated instance" group:"federated"` } func (r *RunCMD) Run(ctx *cliContext.Context) error { @@ -91,9 +94,10 @@ func (r *RunCMD) Run(ctx *cliContext.Context) error { config.WithOpaqueErrors(r.OpaqueErrors), } + token := "" if r.Peer2Peer || r.Peer2PeerToken != "" { log.Info().Msg("P2P mode enabled") - token := r.Peer2PeerToken + token = r.Peer2PeerToken if token == "" { // IF no token is provided, and p2p is enabled, // we generate one and wait for the user to pick up the token (this is for interactive) @@ -104,14 +108,46 @@ func (r *RunCMD) Run(ctx *cliContext.Context) error { log.Info().Msg("To use the token, you can run the following command in another node or terminal:") fmt.Printf("export TOKEN=\"%s\"\nlocal-ai worker p2p-llama-cpp-rpc\n", token) - - // Ask for user confirmation - log.Info().Msg("Press a button to proceed") - var input string - fmt.Scanln(&input) } + opts = append(opts, config.WithP2PToken(token)) + + node, err := p2p.NewNode(token) + if err != nil { + return err + } + log.Info().Msg("Starting P2P server discovery...") - if err := p2p.LLamaCPPRPCServerDiscoverer(context.Background(), token); err != nil { + if err := p2p.ServiceDiscoverer(context.Background(), node, token, "", func() { + var tunnelAddresses []string + for _, v := range p2p.GetAvailableNodes("") { + if v.IsOnline() { + tunnelAddresses = append(tunnelAddresses, v.TunnelAddress) + } else { + log.Info().Msgf("Node %s is offline", v.ID) + } + } + tunnelEnvVar := strings.Join(tunnelAddresses, ",") + + os.Setenv("LLAMACPP_GRPC_SERVERS", tunnelEnvVar) + log.Debug().Msgf("setting LLAMACPP_GRPC_SERVERS to %s", tunnelEnvVar) + }); err != nil { + return err + } + } + + if r.Federated { + _, port, err := net.SplitHostPort(r.Address) + if err != nil { + return err + } + if err := p2p.ExposeService(context.Background(), "localhost", port, token, p2p.FederatedID); err != nil { + return err + } + node, err := p2p.NewNode(token) + if err != nil { + return err + } + if err := p2p.ServiceDiscoverer(context.Background(), node, token, p2p.FederatedID, nil); err != nil { return err } } 
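 The discovery callback above rewrites `LLAMACPP_GRPC_SERVERS` whenever the set of online workers changes, so the llama.cpp gRPC backend is expected to always see a current, comma-separated list of local tunnel endpoints. A small sketch of that flattening step in isolation (the addresses are made up): ```go package main import ( "fmt" "os" "strings" ) func main() { // Pretend these are the local tunnel endpoints reported by P2P discovery. tunnelAddresses := []string{"127.0.0.1:34567", "127.0.0.1:45678"} // Same shape as the callback in run.go: join and export for the backend to pick up. tunnelEnvVar := strings.Join(tunnelAddresses, ",") os.Setenv("LLAMACPP_GRPC_SERVERS", tunnelEnvVar) fmt.Println(os.Getenv("LLAMACPP_GRPC_SERVERS")) // 127.0.0.1:34567,127.0.0.1:45678 } ``` 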
 diff --git a/core/cli/worker/worker_p2p.go b/core/cli/worker/worker_p2p.go index 4651c36e..2eb5cb94 100644 --- a/core/cli/worker/worker_p2p.go +++ b/core/cli/worker/worker_p2p.go @@ -20,7 +20,7 @@ import ( type P2P struct { WorkerFlags `embed:""` - Token string `env:"LOCALAI_TOKEN,TOKEN" help:"JSON list of galleries"` + Token string `env:"LOCALAI_TOKEN,LOCALAI_P2P_TOKEN,TOKEN" help:"P2P token to use"` NoRunner bool `env:"LOCALAI_NO_RUNNER,NO_RUNNER" help:"Do not start the llama-cpp-rpc-server"` RunnerAddress string `env:"LOCALAI_RUNNER_ADDRESS,RUNNER_ADDRESS" help:"Address of the llama-cpp-rpc-server"` RunnerPort string `env:"LOCALAI_RUNNER_PORT,RUNNER_PORT" help:"Port of the llama-cpp-rpc-server"` @@ -59,7 +59,7 @@ func (r *P2P) Run(ctx *cliContext.Context) error { p = r.RunnerPort } - err = p2p.BindLLamaCPPWorker(context.Background(), address, p, r.Token) + err = p2p.ExposeService(context.Background(), address, p, r.Token, "") if err != nil { return err } @@ -99,7 +99,7 @@ func (r *P2P) Run(ctx *cliContext.Context) error { } }() - err = p2p.BindLLamaCPPWorker(context.Background(), address, fmt.Sprint(port), r.Token) + err = p2p.ExposeService(context.Background(), address, fmt.Sprint(port), r.Token, "") if err != nil { return err } diff --git a/core/config/application_config.go b/core/config/application_config.go index 65c716f8..1bac349b 100644 --- a/core/config/application_config.go +++ b/core/config/application_config.go @@ -32,6 +32,7 @@ type ApplicationConfig struct { CORSAllowOrigins string ApiKeys []string OpaqueErrors bool + P2PToken string ModelLibraryURL string @@ -95,6 +96,12 @@ func WithCsrf(b bool) AppOption { } } +func WithP2PToken(s string) AppOption { + return func(o *ApplicationConfig) { + o.P2PToken = s + } +} + func WithModelLibraryURL(url string) AppOption { return func(o *ApplicationConfig) { o.ModelLibraryURL = url diff --git a/core/http/elements/gallery.go b/core/http/elements/gallery.go index 373de038..3b3741d8 100644 --- a/core/http/elements/gallery.go +++ b/core/http/elements/gallery.go @@ -7,6 +7,7 @@ import ( "github.com/chasefleming/elem-go" "github.com/chasefleming/elem-go/attrs" "github.com/mudler/LocalAI/core/gallery" + "github.com/mudler/LocalAI/core/p2p" "github.com/mudler/LocalAI/core/services" "github.com/mudler/LocalAI/pkg/xsync" ) @@ -15,6 +16,14 @@ const ( noImage = "https://upload.wikimedia.org/wikipedia/commons/6/65/No-Image-Placeholder.svg" ) +func renderElements(n []elem.Node) string { + render := "" + for _, r := range n { + render += r.Render() + } + return render +} + func DoneProgress(galleryID, text string, showDelete bool) string { var modelName = galleryID // Split by @ and grab the name @@ -72,6 +81,135 @@ func ProgressBar(progress string) string { ).Render() } +func P2PNodeStats(nodes []p2p.NodeData) string { + /* + Total Workers Detected: {{ len .Nodes }} + {{ $online := 0 }} + {{ range .Nodes }} + {{ if .IsOnline }} + {{ $online = add $online 1 }} + {{ end }} + {{ end }} + Total Online Workers: {{$online}} + Status: {{ if .IsOnline }}Online{{ else }}Offline{{ end }} + Federated Nodes: + You can start LocalAI in federated mode to share your instance, or start the federated server to balance requests between nodes of the federation. + + Workers (llama.cpp): + You can start llama.cpp workers to distribute weights between the workers and offload part of the computation. To start a new worker, you can use the CLI or Docker. + + 