From 3335738e3413be1679ad9ce6095aa42d849a23be Mon Sep 17 00:00:00 2001 From: Ettore Di Giacinto Date: Thu, 18 Jul 2024 18:19:56 +0200 Subject: [PATCH] refactor: move federated server logic to its own service Signed-off-by: Ettore Di Giacinto --- core/cli/federated.go | 115 +------------------------------- core/p2p/federatedServer.go | 13 ++++ core/p2p/federation.go | 127 ++++++++++++++++++++++++++++++++++++ core/p2p/p2p.go | 6 -- core/p2p/p2p_disabled.go | 4 ++ 5 files changed, 146 insertions(+), 119 deletions(-) create mode 100644 core/p2p/federatedServer.go create mode 100644 core/p2p/federation.go diff --git a/core/cli/federated.go b/core/cli/federated.go index b99ef4f8..83aba0f5 100644 --- a/core/cli/federated.go +++ b/core/cli/federated.go @@ -2,20 +2,9 @@ package cli import ( "context" - "errors" - "fmt" - "io" - "net" - "time" - - "math/rand/v2" cliContext "github.com/mudler/LocalAI/core/cli/context" "github.com/mudler/LocalAI/core/p2p" - "github.com/mudler/edgevpn/pkg/node" - "github.com/mudler/edgevpn/pkg/protocol" - "github.com/mudler/edgevpn/pkg/types" - "github.com/rs/zerolog/log" ) type FederatedCLI struct { @@ -24,107 +13,7 @@ type FederatedCLI struct { } func (f *FederatedCLI) Run(ctx *cliContext.Context) error { + fs := p2p.NewFederatedServer(f.Address, p2p.FederatedID, f.Peer2PeerToken) - n, err := p2p.NewNode(f.Peer2PeerToken) - if err != nil { - return fmt.Errorf("creating a new node: %w", err) - } - err = n.Start(context.Background()) - if err != nil { - return fmt.Errorf("creating a new node: %w", err) - } - - if err := p2p.ServiceDiscoverer(context.Background(), n, f.Peer2PeerToken, p2p.FederatedID, nil); err != nil { - return err - } - - return Proxy(context.Background(), n, f.Address, p2p.FederatedID) -} - -func Proxy(ctx context.Context, node *node.Node, listenAddr, service string) error { - - log.Info().Msgf("Allocating service '%s' on: %s", service, listenAddr) - // Open local port for listening - l, err := net.Listen("tcp", listenAddr) - if err != nil { - log.Error().Err(err).Msg("Error listening") - return err - } - // ll.Info("Binding local port on", srcaddr) - - ledger, _ := node.Ledger() - - // Announce ourselves so nodes accepts our connection - ledger.Announce( - ctx, - 10*time.Second, - func() { - // Retrieve current ID for ip in the blockchain - //_, found := ledger.GetKey(protocol.UsersLedgerKey, node.Host().ID().String()) - // If mismatch, update the blockchain - //if !found { - updatedMap := map[string]interface{}{} - updatedMap[node.Host().ID().String()] = &types.User{ - PeerID: node.Host().ID().String(), - Timestamp: time.Now().String(), - } - ledger.Add(protocol.UsersLedgerKey, updatedMap) - // } - }, - ) - - defer l.Close() - for { - select { - case <-ctx.Done(): - return errors.New("context canceled") - default: - log.Debug().Msg("New for connection") - // Listen for an incoming connection. - conn, err := l.Accept() - if err != nil { - fmt.Println("Error accepting: ", err.Error()) - continue - } - - // Handle connections in a new goroutine, forwarding to the p2p service - go func() { - var tunnelAddresses []string - for _, v := range p2p.GetAvailableNodes(p2p.FederatedID) { - if v.IsOnline() { - tunnelAddresses = append(tunnelAddresses, v.TunnelAddress) - } else { - log.Info().Msgf("Node %s is offline", v.ID) - } - } - - // open a TCP stream to one of the tunnels - // chosen randomly - // TODO: optimize this and track usage - tunnelAddr := tunnelAddresses[rand.IntN(len(tunnelAddresses))] - - tunnelConn, err := net.Dial("tcp", tunnelAddr) - if err != nil { - log.Error().Err(err).Msg("Error connecting to tunnel") - return - } - - log.Info().Msgf("Redirecting %s to %s", conn.LocalAddr().String(), tunnelConn.RemoteAddr().String()) - closer := make(chan struct{}, 2) - go copyStream(closer, tunnelConn, conn) - go copyStream(closer, conn, tunnelConn) - <-closer - - tunnelConn.Close() - conn.Close() - // ll.Infof("(service %s) Done handling %s", serviceID, l.Addr().String()) - }() - } - } - -} - -func copyStream(closer chan struct{}, dst io.Writer, src io.Reader) { - defer func() { closer <- struct{}{} }() // connection is closed, send signal to stop proxy - io.Copy(dst, src) + return fs.Start(context.Background()) } diff --git a/core/p2p/federatedServer.go b/core/p2p/federatedServer.go new file mode 100644 index 00000000..444a9109 --- /dev/null +++ b/core/p2p/federatedServer.go @@ -0,0 +1,13 @@ +package p2p + +type FederatedServer struct { + listenAddr, service, p2ptoken string +} + +func NewFederatedServer(listenAddr, service, p2pToken string) *FederatedServer { + return &FederatedServer{ + listenAddr: listenAddr, + service: service, + p2ptoken: p2pToken, + } +} diff --git a/core/p2p/federation.go b/core/p2p/federation.go new file mode 100644 index 00000000..e75e6c15 --- /dev/null +++ b/core/p2p/federation.go @@ -0,0 +1,127 @@ +//go:build p2p +// +build p2p + +package p2p + +import ( + "context" + "errors" + "fmt" + "io" + "net" + "time" + + "github.com/rs/zerolog/log" + + "math/rand/v2" + + "github.com/mudler/edgevpn/pkg/node" + "github.com/mudler/edgevpn/pkg/protocol" + "github.com/mudler/edgevpn/pkg/types" +) + +func (fs *FederatedServer) Start(ctx context.Context) error { + n, err := NewNode(fs.p2ptoken) + if err != nil { + return fmt.Errorf("creating a new node: %w", err) + } + err = n.Start(ctx) + if err != nil { + return fmt.Errorf("starting a new node: %w", err) + } + + if err := ServiceDiscoverer(ctx, n, fs.p2ptoken, FederatedID, nil); err != nil { + return err + } + + return fs.proxy(ctx, n) +} + +func (fs *FederatedServer) proxy(ctx context.Context, node *node.Node) error { + + log.Info().Msgf("Allocating service '%s' on: %s", fs.service, fs.listenAddr) + // Open local port for listening + l, err := net.Listen("tcp", fs.listenAddr) + if err != nil { + log.Error().Err(err).Msg("Error listening") + return err + } + // ll.Info("Binding local port on", srcaddr) + + ledger, _ := node.Ledger() + + // Announce ourselves so nodes accepts our connection + ledger.Announce( + ctx, + 10*time.Second, + func() { + // Retrieve current ID for ip in the blockchain + //_, found := ledger.GetKey(protocol.UsersLedgerKey, node.Host().ID().String()) + // If mismatch, update the blockchain + //if !found { + updatedMap := map[string]interface{}{} + updatedMap[node.Host().ID().String()] = &types.User{ + PeerID: node.Host().ID().String(), + Timestamp: time.Now().String(), + } + ledger.Add(protocol.UsersLedgerKey, updatedMap) + // } + }, + ) + + defer l.Close() + for { + select { + case <-ctx.Done(): + return errors.New("context canceled") + default: + log.Debug().Msg("New for connection") + // Listen for an incoming connection. + conn, err := l.Accept() + if err != nil { + fmt.Println("Error accepting: ", err.Error()) + continue + } + + // Handle connections in a new goroutine, forwarding to the p2p service + go handleConn(conn) + } + } + +} + +func handleConn(conn net.Conn) { + var tunnelAddresses []string + for _, v := range GetAvailableNodes(FederatedID) { + if v.IsOnline() { + tunnelAddresses = append(tunnelAddresses, v.TunnelAddress) + } else { + log.Info().Msgf("Node %s is offline", v.ID) + } + } + + // open a TCP stream to one of the tunnels + // chosen randomly + // TODO: optimize this and track usage + tunnelAddr := tunnelAddresses[rand.IntN(len(tunnelAddresses))] + + tunnelConn, err := net.Dial("tcp", tunnelAddr) + if err != nil { + log.Error().Err(err).Msg("Error connecting to tunnel") + return + } + + log.Info().Msgf("Redirecting %s to %s", conn.LocalAddr().String(), tunnelConn.RemoteAddr().String()) + closer := make(chan struct{}, 2) + go copyStream(closer, tunnelConn, conn) + go copyStream(closer, conn, tunnelConn) + <-closer + + tunnelConn.Close() + conn.Close() +} + +func copyStream(closer chan struct{}, dst io.Writer, src io.Reader) { + defer func() { closer <- struct{}{} }() // connection is closed, send signal to stop proxy + io.Copy(dst, src) +} diff --git a/core/p2p/p2p.go b/core/p2p/p2p.go index e0e46170..e65c5025 100644 --- a/core/p2p/p2p.go +++ b/core/p2p/p2p.go @@ -7,7 +7,6 @@ import ( "context" "errors" "fmt" - "io" "net" "os" "sync" @@ -137,11 +136,6 @@ func allocateLocalService(ctx context.Context, node *node.Node, listenAddr, serv } -func copyStream(closer chan struct{}, dst io.Writer, src io.Reader) { - defer func() { closer <- struct{}{} }() // connection is closed, send signal to stop proxy - io.Copy(dst, src) -} - // This is the main of the server (which keeps the env variable updated) // This starts a goroutine that keeps LLAMACPP_GRPC_SERVERS updated with the discovered services func ServiceDiscoverer(ctx context.Context, n *node.Node, token, servicesID string, discoveryFunc func()) error { diff --git a/core/p2p/p2p_disabled.go b/core/p2p/p2p_disabled.go index 340a4fb4..604fbdde 100644 --- a/core/p2p/p2p_disabled.go +++ b/core/p2p/p2p_disabled.go @@ -14,6 +14,10 @@ func GenerateToken() string { return "not implemented" } +func (fs *FederatedServer) Start(ctx context.Context) error { + return fmt.Errorf("not implemented") +} + func ServiceDiscoverer(ctx context.Context, node *node.Node, token, servicesID string, fn func()) error { return fmt.Errorf("not implemented") }