diff --git a/core/cli/federated.go b/core/cli/federated.go index 271babca..b917812c 100644 --- a/core/cli/federated.go +++ b/core/cli/federated.go @@ -10,13 +10,14 @@ import ( type FederatedCLI struct { Address string `env:"LOCALAI_ADDRESS,ADDRESS" default:":8080" help:"Bind address for the API server" group:"api"` Peer2PeerToken string `env:"LOCALAI_P2P_TOKEN,P2P_TOKEN,TOKEN" name:"p2ptoken" help:"Token for P2P mode (optional)" group:"p2p"` - LoadBalanced bool `env:"LOCALAI_LOAD_BALANCED,LOAD_BALANCED" default:"false" help:"Enable load balancing" group:"p2p"` + RandomWorker bool `env:"LOCALAI_RANDOM_WORKER,RANDOM_WORKER" default:"false" help:"Select a random worker from the pool" group:"p2p"` Peer2PeerNetworkID string `env:"LOCALAI_P2P_NETWORK_ID,P2P_NETWORK_ID" help:"Network ID for P2P mode, can be set arbitrarly by the user for grouping a set of instances." group:"p2p"` + TargetWorker string `env:"LOCALAI_TARGET_WORKER,TARGET_WORKER" help:"Target worker to run the federated server on" group:"p2p"` } func (f *FederatedCLI) Run(ctx *cliContext.Context) error { - fs := p2p.NewFederatedServer(f.Address, p2p.NetworkID(f.Peer2PeerNetworkID, p2p.FederatedID), f.Peer2PeerToken, f.LoadBalanced) + fs := p2p.NewFederatedServer(f.Address, p2p.NetworkID(f.Peer2PeerNetworkID, p2p.FederatedID), f.Peer2PeerToken, !f.RandomWorker, f.TargetWorker) return fs.Start(context.Background()) } diff --git a/core/p2p/federated.go b/core/p2p/federated.go index 3ac3ff91..8e468ef6 100644 --- a/core/p2p/federated.go +++ b/core/p2p/federated.go @@ -1,6 +1,12 @@ package p2p -import "fmt" +import ( + "fmt" + "math/rand/v2" + "sync" + + "github.com/rs/zerolog/log" +) const FederatedID = "federated" @@ -12,22 +18,70 @@ func NetworkID(networkID, serviceID string) string { } type FederatedServer struct { + sync.Mutex listenAddr, service, p2ptoken string requestTable map[string]int loadBalanced bool + workerTarget string } -func NewFederatedServer(listenAddr, service, p2pToken string, loadBalanced bool) *FederatedServer { +func NewFederatedServer(listenAddr, service, p2pToken string, loadBalanced bool, workerTarget string) *FederatedServer { return &FederatedServer{ listenAddr: listenAddr, service: service, p2ptoken: p2pToken, requestTable: map[string]int{}, loadBalanced: loadBalanced, + workerTarget: workerTarget, + } +} + +func (fs *FederatedServer) RandomServer() string { + var tunnelAddresses []string + for _, v := range GetAvailableNodes(fs.service) { + if v.IsOnline() { + tunnelAddresses = append(tunnelAddresses, v.TunnelAddress) + } else { + delete(fs.requestTable, v.TunnelAddress) // make sure it's not tracked + log.Info().Msgf("Node %s is offline", v.ID) + } + } + + if len(tunnelAddresses) == 0 { + return "" + } + + return tunnelAddresses[rand.IntN(len(tunnelAddresses))] +} + +func (fs *FederatedServer) syncTableStatus() { + fs.Lock() + defer fs.Unlock() + currentTunnels := make(map[string]struct{}) + + for _, v := range GetAvailableNodes(fs.service) { + if v.IsOnline() { + fs.ensureRecordExist(v.TunnelAddress) + currentTunnels[v.TunnelAddress] = struct{}{} + } + } + + // delete tunnels that don't exist anymore + for t := range fs.requestTable { + if _, ok := currentTunnels[t]; !ok { + delete(fs.requestTable, t) + } } } func (fs *FederatedServer) SelectLeastUsedServer() string { + fs.syncTableStatus() + + fs.Lock() + defer fs.Unlock() + + log.Debug().Any("request_table", fs.requestTable).Msgf("Current request table") + // cycle over requestTable and find the entry with the lower number // if there are multiple entries with the same number, select one randomly // if there are no entries, return an empty string @@ -39,18 +93,26 @@ func (fs *FederatedServer) SelectLeastUsedServer() string { minKey = k } } + log.Debug().Any("requests_served", min).Msgf("Selected tunnel %s", minKey) + return minKey } func (fs *FederatedServer) RecordRequest(nodeID string) { + fs.Lock() + defer fs.Unlock() // increment the counter for the nodeID in the requestTable fs.requestTable[nodeID]++ + + log.Debug().Any("request_table", fs.requestTable).Msgf("Current request table") } -func (fs *FederatedServer) EnsureRecordExist(nodeID string) { +func (fs *FederatedServer) ensureRecordExist(nodeID string) { // if the nodeID is not in the requestTable, add it with a counter of 0 _, ok := fs.requestTable[nodeID] if !ok { fs.requestTable[nodeID] = 0 } + + log.Debug().Any("request_table", fs.requestTable).Msgf("Current request table") } diff --git a/core/p2p/federated_server.go b/core/p2p/federated_server.go index 75da97ec..6d7ccd46 100644 --- a/core/p2p/federated_server.go +++ b/core/p2p/federated_server.go @@ -10,8 +10,6 @@ import ( "net" "time" - "math/rand/v2" - "github.com/mudler/edgevpn/pkg/node" "github.com/mudler/edgevpn/pkg/protocol" "github.com/mudler/edgevpn/pkg/types" @@ -76,7 +74,7 @@ func (fs *FederatedServer) proxy(ctx context.Context, node *node.Node) error { case <-ctx.Done(): return errors.New("context canceled") default: - log.Debug().Msg("New for connection") + log.Debug().Msgf("New connection from %s", l.Addr().String()) // Listen for an incoming connection. conn, err := l.Accept() if err != nil { @@ -86,37 +84,33 @@ func (fs *FederatedServer) proxy(ctx context.Context, node *node.Node) error { // Handle connections in a new goroutine, forwarding to the p2p service go func() { - var tunnelAddresses []string - for _, v := range GetAvailableNodes(fs.service) { - if v.IsOnline() { - tunnelAddresses = append(tunnelAddresses, v.TunnelAddress) - } else { - log.Info().Msgf("Node %s is offline", v.ID) + tunnelAddr := "" + + if fs.workerTarget != "" { + for _, v := range GetAvailableNodes(fs.service) { + if v.ID == fs.workerTarget { + tunnelAddr = v.TunnelAddress + break + } } + } else if fs.loadBalanced { + log.Debug().Msgf("Load balancing request") + + tunnelAddr = fs.SelectLeastUsedServer() + if tunnelAddr == "" { + tunnelAddr = fs.RandomServer() + } + + } else { + tunnelAddr = fs.RandomServer() } - if len(tunnelAddresses) == 0 { + if tunnelAddr == "" { log.Error().Msg("No available nodes yet") return } - tunnelAddr := "" - - if fs.loadBalanced { - for _, t := range tunnelAddresses { - fs.EnsureRecordExist(t) - } - - tunnelAddr = fs.SelectLeastUsedServer() - log.Debug().Msgf("Selected tunnel %s", tunnelAddr) - if tunnelAddr == "" { - tunnelAddr = tunnelAddresses[rand.IntN(len(tunnelAddresses))] - } - - fs.RecordRequest(tunnelAddr) - } else { - tunnelAddr = tunnelAddresses[rand.IntN(len(tunnelAddresses))] - } + log.Debug().Msgf("Selected tunnel %s", tunnelAddr) tunnelConn, err := net.Dial("tcp", tunnelAddr) if err != nil { @@ -132,7 +126,10 @@ func (fs *FederatedServer) proxy(ctx context.Context, node *node.Node) error { tunnelConn.Close() conn.Close() - // ll.Infof("(service %s) Done handling %s", serviceID, l.Addr().String()) + + if fs.loadBalanced { + fs.RecordRequest(tunnelAddr) + } }() } } diff --git a/core/p2p/p2p.go b/core/p2p/p2p.go index bfa12287..af2106be 100644 --- a/core/p2p/p2p.go +++ b/core/p2p/p2p.go @@ -181,7 +181,6 @@ func discoveryTunnels(ctx context.Context, n *node.Node, token, servicesID strin if err != nil { return nil, fmt.Errorf("creating a new node: %w", err) } - // get new services, allocate and return to the channel // TODO: @@ -201,6 +200,9 @@ func discoveryTunnels(ctx context.Context, n *node.Node, token, servicesID strin zlog.Debug().Msg("Searching for workers") data := ledger.LastBlock().Storage[servicesID] + + zlog.Debug().Any("data", ledger.LastBlock().Storage).Msg("Ledger data") + for k, v := range data { zlog.Info().Msgf("Found worker %s", k) nd := &NodeData{}