mirror of
https://github.com/mudler/LocalAI.git
synced 2024-12-19 20:57:54 +00:00
refactor: move federated server logic to its own service
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
This commit is contained in:
parent
27e16a00fa
commit
3335738e34
@ -2,20 +2,9 @@ package cli
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
|
||||||
"fmt"
|
|
||||||
"io"
|
|
||||||
"net"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"math/rand/v2"
|
|
||||||
|
|
||||||
cliContext "github.com/mudler/LocalAI/core/cli/context"
|
cliContext "github.com/mudler/LocalAI/core/cli/context"
|
||||||
"github.com/mudler/LocalAI/core/p2p"
|
"github.com/mudler/LocalAI/core/p2p"
|
||||||
"github.com/mudler/edgevpn/pkg/node"
|
|
||||||
"github.com/mudler/edgevpn/pkg/protocol"
|
|
||||||
"github.com/mudler/edgevpn/pkg/types"
|
|
||||||
"github.com/rs/zerolog/log"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type FederatedCLI struct {
|
type FederatedCLI struct {
|
||||||
@ -24,107 +13,7 @@ type FederatedCLI struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (f *FederatedCLI) Run(ctx *cliContext.Context) error {
|
func (f *FederatedCLI) Run(ctx *cliContext.Context) error {
|
||||||
|
fs := p2p.NewFederatedServer(f.Address, p2p.FederatedID, f.Peer2PeerToken)
|
||||||
|
|
||||||
n, err := p2p.NewNode(f.Peer2PeerToken)
|
return fs.Start(context.Background())
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("creating a new node: %w", err)
|
|
||||||
}
|
|
||||||
err = n.Start(context.Background())
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("creating a new node: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := p2p.ServiceDiscoverer(context.Background(), n, f.Peer2PeerToken, p2p.FederatedID, nil); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return Proxy(context.Background(), n, f.Address, p2p.FederatedID)
|
|
||||||
}
|
|
||||||
|
|
||||||
func Proxy(ctx context.Context, node *node.Node, listenAddr, service string) error {
|
|
||||||
|
|
||||||
log.Info().Msgf("Allocating service '%s' on: %s", service, listenAddr)
|
|
||||||
// Open local port for listening
|
|
||||||
l, err := net.Listen("tcp", listenAddr)
|
|
||||||
if err != nil {
|
|
||||||
log.Error().Err(err).Msg("Error listening")
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
// ll.Info("Binding local port on", srcaddr)
|
|
||||||
|
|
||||||
ledger, _ := node.Ledger()
|
|
||||||
|
|
||||||
// Announce ourselves so nodes accepts our connection
|
|
||||||
ledger.Announce(
|
|
||||||
ctx,
|
|
||||||
10*time.Second,
|
|
||||||
func() {
|
|
||||||
// Retrieve current ID for ip in the blockchain
|
|
||||||
//_, found := ledger.GetKey(protocol.UsersLedgerKey, node.Host().ID().String())
|
|
||||||
// If mismatch, update the blockchain
|
|
||||||
//if !found {
|
|
||||||
updatedMap := map[string]interface{}{}
|
|
||||||
updatedMap[node.Host().ID().String()] = &types.User{
|
|
||||||
PeerID: node.Host().ID().String(),
|
|
||||||
Timestamp: time.Now().String(),
|
|
||||||
}
|
|
||||||
ledger.Add(protocol.UsersLedgerKey, updatedMap)
|
|
||||||
// }
|
|
||||||
},
|
|
||||||
)
|
|
||||||
|
|
||||||
defer l.Close()
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return errors.New("context canceled")
|
|
||||||
default:
|
|
||||||
log.Debug().Msg("New for connection")
|
|
||||||
// Listen for an incoming connection.
|
|
||||||
conn, err := l.Accept()
|
|
||||||
if err != nil {
|
|
||||||
fmt.Println("Error accepting: ", err.Error())
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// Handle connections in a new goroutine, forwarding to the p2p service
|
|
||||||
go func() {
|
|
||||||
var tunnelAddresses []string
|
|
||||||
for _, v := range p2p.GetAvailableNodes(p2p.FederatedID) {
|
|
||||||
if v.IsOnline() {
|
|
||||||
tunnelAddresses = append(tunnelAddresses, v.TunnelAddress)
|
|
||||||
} else {
|
|
||||||
log.Info().Msgf("Node %s is offline", v.ID)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// open a TCP stream to one of the tunnels
|
|
||||||
// chosen randomly
|
|
||||||
// TODO: optimize this and track usage
|
|
||||||
tunnelAddr := tunnelAddresses[rand.IntN(len(tunnelAddresses))]
|
|
||||||
|
|
||||||
tunnelConn, err := net.Dial("tcp", tunnelAddr)
|
|
||||||
if err != nil {
|
|
||||||
log.Error().Err(err).Msg("Error connecting to tunnel")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Info().Msgf("Redirecting %s to %s", conn.LocalAddr().String(), tunnelConn.RemoteAddr().String())
|
|
||||||
closer := make(chan struct{}, 2)
|
|
||||||
go copyStream(closer, tunnelConn, conn)
|
|
||||||
go copyStream(closer, conn, tunnelConn)
|
|
||||||
<-closer
|
|
||||||
|
|
||||||
tunnelConn.Close()
|
|
||||||
conn.Close()
|
|
||||||
// ll.Infof("(service %s) Done handling %s", serviceID, l.Addr().String())
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
func copyStream(closer chan struct{}, dst io.Writer, src io.Reader) {
|
|
||||||
defer func() { closer <- struct{}{} }() // connection is closed, send signal to stop proxy
|
|
||||||
io.Copy(dst, src)
|
|
||||||
}
|
}
|
||||||
|
13
core/p2p/federatedServer.go
Normal file
13
core/p2p/federatedServer.go
Normal file
@ -0,0 +1,13 @@
|
|||||||
|
package p2p
|
||||||
|
|
||||||
|
type FederatedServer struct {
|
||||||
|
listenAddr, service, p2ptoken string
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewFederatedServer(listenAddr, service, p2pToken string) *FederatedServer {
|
||||||
|
return &FederatedServer{
|
||||||
|
listenAddr: listenAddr,
|
||||||
|
service: service,
|
||||||
|
p2ptoken: p2pToken,
|
||||||
|
}
|
||||||
|
}
|
127
core/p2p/federation.go
Normal file
127
core/p2p/federation.go
Normal file
@ -0,0 +1,127 @@
|
|||||||
|
//go:build p2p
|
||||||
|
// +build p2p
|
||||||
|
|
||||||
|
package p2p
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"net"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/rs/zerolog/log"
|
||||||
|
|
||||||
|
"math/rand/v2"
|
||||||
|
|
||||||
|
"github.com/mudler/edgevpn/pkg/node"
|
||||||
|
"github.com/mudler/edgevpn/pkg/protocol"
|
||||||
|
"github.com/mudler/edgevpn/pkg/types"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (fs *FederatedServer) Start(ctx context.Context) error {
|
||||||
|
n, err := NewNode(fs.p2ptoken)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("creating a new node: %w", err)
|
||||||
|
}
|
||||||
|
err = n.Start(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("starting a new node: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := ServiceDiscoverer(ctx, n, fs.p2ptoken, FederatedID, nil); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return fs.proxy(ctx, n)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fs *FederatedServer) proxy(ctx context.Context, node *node.Node) error {
|
||||||
|
|
||||||
|
log.Info().Msgf("Allocating service '%s' on: %s", fs.service, fs.listenAddr)
|
||||||
|
// Open local port for listening
|
||||||
|
l, err := net.Listen("tcp", fs.listenAddr)
|
||||||
|
if err != nil {
|
||||||
|
log.Error().Err(err).Msg("Error listening")
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
// ll.Info("Binding local port on", srcaddr)
|
||||||
|
|
||||||
|
ledger, _ := node.Ledger()
|
||||||
|
|
||||||
|
// Announce ourselves so nodes accepts our connection
|
||||||
|
ledger.Announce(
|
||||||
|
ctx,
|
||||||
|
10*time.Second,
|
||||||
|
func() {
|
||||||
|
// Retrieve current ID for ip in the blockchain
|
||||||
|
//_, found := ledger.GetKey(protocol.UsersLedgerKey, node.Host().ID().String())
|
||||||
|
// If mismatch, update the blockchain
|
||||||
|
//if !found {
|
||||||
|
updatedMap := map[string]interface{}{}
|
||||||
|
updatedMap[node.Host().ID().String()] = &types.User{
|
||||||
|
PeerID: node.Host().ID().String(),
|
||||||
|
Timestamp: time.Now().String(),
|
||||||
|
}
|
||||||
|
ledger.Add(protocol.UsersLedgerKey, updatedMap)
|
||||||
|
// }
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
defer l.Close()
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return errors.New("context canceled")
|
||||||
|
default:
|
||||||
|
log.Debug().Msg("New for connection")
|
||||||
|
// Listen for an incoming connection.
|
||||||
|
conn, err := l.Accept()
|
||||||
|
if err != nil {
|
||||||
|
fmt.Println("Error accepting: ", err.Error())
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Handle connections in a new goroutine, forwarding to the p2p service
|
||||||
|
go handleConn(conn)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func handleConn(conn net.Conn) {
|
||||||
|
var tunnelAddresses []string
|
||||||
|
for _, v := range GetAvailableNodes(FederatedID) {
|
||||||
|
if v.IsOnline() {
|
||||||
|
tunnelAddresses = append(tunnelAddresses, v.TunnelAddress)
|
||||||
|
} else {
|
||||||
|
log.Info().Msgf("Node %s is offline", v.ID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// open a TCP stream to one of the tunnels
|
||||||
|
// chosen randomly
|
||||||
|
// TODO: optimize this and track usage
|
||||||
|
tunnelAddr := tunnelAddresses[rand.IntN(len(tunnelAddresses))]
|
||||||
|
|
||||||
|
tunnelConn, err := net.Dial("tcp", tunnelAddr)
|
||||||
|
if err != nil {
|
||||||
|
log.Error().Err(err).Msg("Error connecting to tunnel")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Info().Msgf("Redirecting %s to %s", conn.LocalAddr().String(), tunnelConn.RemoteAddr().String())
|
||||||
|
closer := make(chan struct{}, 2)
|
||||||
|
go copyStream(closer, tunnelConn, conn)
|
||||||
|
go copyStream(closer, conn, tunnelConn)
|
||||||
|
<-closer
|
||||||
|
|
||||||
|
tunnelConn.Close()
|
||||||
|
conn.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
func copyStream(closer chan struct{}, dst io.Writer, src io.Reader) {
|
||||||
|
defer func() { closer <- struct{}{} }() // connection is closed, send signal to stop proxy
|
||||||
|
io.Copy(dst, src)
|
||||||
|
}
|
@ -7,7 +7,6 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
|
||||||
"net"
|
"net"
|
||||||
"os"
|
"os"
|
||||||
"sync"
|
"sync"
|
||||||
@ -137,11 +136,6 @@ func allocateLocalService(ctx context.Context, node *node.Node, listenAddr, serv
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func copyStream(closer chan struct{}, dst io.Writer, src io.Reader) {
|
|
||||||
defer func() { closer <- struct{}{} }() // connection is closed, send signal to stop proxy
|
|
||||||
io.Copy(dst, src)
|
|
||||||
}
|
|
||||||
|
|
||||||
// This is the main of the server (which keeps the env variable updated)
|
// This is the main of the server (which keeps the env variable updated)
|
||||||
// This starts a goroutine that keeps LLAMACPP_GRPC_SERVERS updated with the discovered services
|
// This starts a goroutine that keeps LLAMACPP_GRPC_SERVERS updated with the discovered services
|
||||||
func ServiceDiscoverer(ctx context.Context, n *node.Node, token, servicesID string, discoveryFunc func()) error {
|
func ServiceDiscoverer(ctx context.Context, n *node.Node, token, servicesID string, discoveryFunc func()) error {
|
||||||
|
@ -14,6 +14,10 @@ func GenerateToken() string {
|
|||||||
return "not implemented"
|
return "not implemented"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (fs *FederatedServer) Start(ctx context.Context) error {
|
||||||
|
return fmt.Errorf("not implemented")
|
||||||
|
}
|
||||||
|
|
||||||
func ServiceDiscoverer(ctx context.Context, node *node.Node, token, servicesID string, fn func()) error {
|
func ServiceDiscoverer(ctx context.Context, node *node.Node, token, servicesID string, fn func()) error {
|
||||||
return fmt.Errorf("not implemented")
|
return fmt.Errorf("not implemented")
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user