Merge branch 'master' into default_miro

This commit is contained in:
Ettore Di Giacinto 2024-07-18 23:21:35 +02:00 committed by GitHub
commit 7c27c91b2d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 225 additions and 349 deletions

View File

@ -8,9 +8,10 @@ jobs:
MODEL_NAME: hermes-2-theta-llama-3-8b
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Checkout code
uses: actions/checkout@v3
with:
fetch-depth: 0 # needed to checkout all branches for this Action to work
ref: "${{ github.event.pull_request.merge_commit_sha }}"
- uses: mudler/localai-github-action@v1
with:
model: 'hermes-2-theta-llama-3-8b' # Any from models.localai.io, or from huggingface.com with: "huggingface://<repository>/file"
@ -21,6 +22,7 @@ jobs:
json_diff_file_output: diff.json
raw_diff_file_output: diff.txt
file_output_only: "true"
base_branch: ${{ github.event.pull_request.base.sha }}
- name: Show diff
env:
DIFF: ${{ steps.git-diff-action.outputs.raw-diff-path }}

View File

@ -8,7 +8,7 @@ DETECT_LIBS?=true
# llama.cpp versions
GOLLAMA_REPO?=https://github.com/go-skynet/go-llama.cpp
GOLLAMA_VERSION?=2b57a8ae43e4699d3dc5d1496a1ccd42922993be
CPPLLAMA_VERSION?=b3283448ce9a5098226afe1d8648ccc578511fe4
CPPLLAMA_VERSION?=705b7ecf60e667ced57c15d67aa86865e3cc7aa7
# gpt4all version
GPT4ALL_REPO?=https://github.com/nomic-ai/gpt4all

View File

@ -2,135 +2,20 @@ package cli
import (
"context"
"errors"
"fmt"
"io"
"net"
"time"
"math/rand/v2"
cliContext "github.com/mudler/LocalAI/core/cli/context"
"github.com/mudler/LocalAI/core/p2p"
"github.com/mudler/edgevpn/pkg/node"
"github.com/mudler/edgevpn/pkg/protocol"
"github.com/mudler/edgevpn/pkg/types"
"github.com/rs/zerolog/log"
)
type FederatedCLI struct {
Address string `env:"LOCALAI_ADDRESS,ADDRESS" default:":8080" help:"Bind address for the API server" group:"api"`
Peer2PeerToken string `env:"LOCALAI_P2P_TOKEN,P2P_TOKEN,TOKEN" name:"p2ptoken" help:"Token for P2P mode (optional)" group:"p2p"`
LoadBalanced bool `env:"LOCALAI_LOAD_BALANCED,LOAD_BALANCED" default:"false" help:"Enable load balancing" group:"p2p"`
}
func (f *FederatedCLI) Run(ctx *cliContext.Context) error {
n, err := p2p.NewNode(f.Peer2PeerToken)
if err != nil {
return fmt.Errorf("creating a new node: %w", err)
}
err = n.Start(context.Background())
if err != nil {
return fmt.Errorf("creating a new node: %w", err)
}
fs := p2p.NewFederatedServer(f.Address, p2p.FederatedID, f.Peer2PeerToken, f.LoadBalanced)
if err := p2p.ServiceDiscoverer(context.Background(), n, f.Peer2PeerToken, p2p.FederatedID, func(servicesID string, tunnel p2p.NodeData) {
log.Debug().Msgf("Discovered node: %s", tunnel.ID)
}); err != nil {
return err
}
return Proxy(context.Background(), n, f.Address, p2p.FederatedID)
}
func Proxy(ctx context.Context, node *node.Node, listenAddr, service string) error {
log.Info().Msgf("Allocating service '%s' on: %s", service, listenAddr)
// Open local port for listening
l, err := net.Listen("tcp", listenAddr)
if err != nil {
log.Error().Err(err).Msg("Error listening")
return err
}
// ll.Info("Binding local port on", srcaddr)
ledger, _ := node.Ledger()
// Announce ourselves so nodes accepts our connection
ledger.Announce(
ctx,
10*time.Second,
func() {
// Retrieve current ID for ip in the blockchain
//_, found := ledger.GetKey(protocol.UsersLedgerKey, node.Host().ID().String())
// If mismatch, update the blockchain
//if !found {
updatedMap := map[string]interface{}{}
updatedMap[node.Host().ID().String()] = &types.User{
PeerID: node.Host().ID().String(),
Timestamp: time.Now().String(),
}
ledger.Add(protocol.UsersLedgerKey, updatedMap)
// }
},
)
defer l.Close()
for {
select {
case <-ctx.Done():
return errors.New("context canceled")
default:
log.Debug().Msg("New for connection")
// Listen for an incoming connection.
conn, err := l.Accept()
if err != nil {
fmt.Println("Error accepting: ", err.Error())
continue
}
// Handle connections in a new goroutine, forwarding to the p2p service
go func() {
var tunnelAddresses []string
for _, v := range p2p.GetAvailableNodes(p2p.FederatedID) {
if v.IsOnline() {
tunnelAddresses = append(tunnelAddresses, v.TunnelAddress)
} else {
log.Info().Msgf("Node %s is offline", v.ID)
}
}
if len(tunnelAddresses) == 0 {
log.Error().Msg("No available nodes yet")
return
}
// open a TCP stream to one of the tunnels
// chosen randomly
// TODO: optimize this and track usage
tunnelAddr := tunnelAddresses[rand.IntN(len(tunnelAddresses))]
tunnelConn, err := net.Dial("tcp", tunnelAddr)
if err != nil {
log.Error().Err(err).Msg("Error connecting to tunnel")
return
}
log.Info().Msgf("Redirecting %s to %s", conn.LocalAddr().String(), tunnelConn.RemoteAddr().String())
closer := make(chan struct{}, 2)
go copyStream(closer, tunnelConn, conn)
go copyStream(closer, conn, tunnelConn)
<-closer
tunnelConn.Close()
conn.Close()
// ll.Infof("(service %s) Done handling %s", serviceID, l.Addr().String())
}()
}
}
}
func copyStream(closer chan struct{}, dst io.Writer, src io.Reader) {
defer func() { closer <- struct{}{} }() // connection is closed, send signal to stop proxy
io.Copy(dst, src)
return fs.Start(context.Background())
}

47
core/p2p/federated.go Normal file
View File

@ -0,0 +1,47 @@
package p2p
const FederatedID = "federated"
type FederatedServer struct {
listenAddr, service, p2ptoken string
requestTable map[string]int
loadBalanced bool
}
func NewFederatedServer(listenAddr, service, p2pToken string, loadBalanced bool) *FederatedServer {
return &FederatedServer{
listenAddr: listenAddr,
service: service,
p2ptoken: p2pToken,
requestTable: map[string]int{},
loadBalanced: loadBalanced,
}
}
func (fs *FederatedServer) SelectLeastUsedServer() string {
// cycle over requestTable and find the entry with the lower number
// if there are multiple entries with the same number, select one randomly
// if there are no entries, return an empty string
var min int
var minKey string
for k, v := range fs.requestTable {
if min == 0 || v < min {
min = v
minKey = k
}
}
return minKey
}
func (fs *FederatedServer) RecordRequest(nodeID string) {
// increment the counter for the nodeID in the requestTable
fs.requestTable[nodeID]++
}
func (fs *FederatedServer) EnsureRecordExist(nodeID string) {
// if the nodeID is not in the requestTable, add it with a counter of 0
_, ok := fs.requestTable[nodeID]
if !ok {
fs.requestTable[nodeID] = 0
}
}

View File

@ -0,0 +1,140 @@
//go:build p2p
// +build p2p
package p2p
import (
"context"
"errors"
"fmt"
"net"
"time"
"math/rand/v2"
"github.com/mudler/edgevpn/pkg/node"
"github.com/mudler/edgevpn/pkg/protocol"
"github.com/mudler/edgevpn/pkg/types"
"github.com/rs/zerolog/log"
)
func (f *FederatedServer) Start(ctx context.Context) error {
n, err := NewNode(f.p2ptoken)
if err != nil {
return fmt.Errorf("creating a new node: %w", err)
}
err = n.Start(ctx)
if err != nil {
return fmt.Errorf("creating a new node: %w", err)
}
if err := ServiceDiscoverer(ctx, n, f.p2ptoken, f.service, func(servicesID string, tunnel NodeData) {
log.Debug().Msgf("Discovered node: %s", tunnel.ID)
}); err != nil {
return err
}
return f.proxy(ctx, n)
}
func (fs *FederatedServer) proxy(ctx context.Context, node *node.Node) error {
log.Info().Msgf("Allocating service '%s' on: %s", fs.service, fs.listenAddr)
// Open local port for listening
l, err := net.Listen("tcp", fs.listenAddr)
if err != nil {
log.Error().Err(err).Msg("Error listening")
return err
}
// ll.Info("Binding local port on", srcaddr)
ledger, _ := node.Ledger()
// Announce ourselves so nodes accepts our connection
ledger.Announce(
ctx,
10*time.Second,
func() {
// Retrieve current ID for ip in the blockchain
//_, found := ledger.GetKey(protocol.UsersLedgerKey, node.Host().ID().String())
// If mismatch, update the blockchain
//if !found {
updatedMap := map[string]interface{}{}
updatedMap[node.Host().ID().String()] = &types.User{
PeerID: node.Host().ID().String(),
Timestamp: time.Now().String(),
}
ledger.Add(protocol.UsersLedgerKey, updatedMap)
// }
},
)
defer l.Close()
for {
select {
case <-ctx.Done():
return errors.New("context canceled")
default:
log.Debug().Msg("New for connection")
// Listen for an incoming connection.
conn, err := l.Accept()
if err != nil {
fmt.Println("Error accepting: ", err.Error())
continue
}
// Handle connections in a new goroutine, forwarding to the p2p service
go func() {
var tunnelAddresses []string
for _, v := range GetAvailableNodes(fs.service) {
if v.IsOnline() {
tunnelAddresses = append(tunnelAddresses, v.TunnelAddress)
} else {
log.Info().Msgf("Node %s is offline", v.ID)
}
}
if len(tunnelAddresses) == 0 {
log.Error().Msg("No available nodes yet")
return
}
tunnelAddr := ""
if fs.loadBalanced {
for _, t := range tunnelAddresses {
fs.EnsureRecordExist(t)
}
tunnelAddr = fs.SelectLeastUsedServer()
log.Debug().Msgf("Selected tunnel %s", tunnelAddr)
if tunnelAddr == "" {
tunnelAddr = tunnelAddresses[rand.IntN(len(tunnelAddresses))]
}
fs.RecordRequest(tunnelAddr)
} else {
tunnelAddr = tunnelAddresses[rand.IntN(len(tunnelAddresses))]
}
tunnelConn, err := net.Dial("tcp", tunnelAddr)
if err != nil {
log.Error().Err(err).Msg("Error connecting to tunnel")
return
}
log.Info().Msgf("Redirecting %s to %s", conn.LocalAddr().String(), tunnelConn.RemoteAddr().String())
closer := make(chan struct{}, 2)
go copyStream(closer, tunnelConn, conn)
go copyStream(closer, conn, tunnelConn)
<-closer
tunnelConn.Close()
conn.Close()
// ll.Infof("(service %s) Done handling %s", serviceID, l.Addr().String())
}()
}
}
}

View File

@ -6,7 +6,6 @@ import (
)
const defaultServicesID = "services_localai"
const FederatedID = "federated"
type NodeData struct {
Name string

View File

@ -137,11 +137,6 @@ func allocateLocalService(ctx context.Context, node *node.Node, listenAddr, serv
}
func copyStream(closer chan struct{}, dst io.Writer, src io.Reader) {
defer func() { closer <- struct{}{} }() // connection is closed, send signal to stop proxy
io.Copy(dst, src)
}
// This is the main of the server (which keeps the env variable updated)
// This starts a goroutine that keeps LLAMACPP_GRPC_SERVERS updated with the discovered services
func ServiceDiscoverer(ctx context.Context, n *node.Node, token, servicesID string, discoveryFunc func(serviceID string, node NodeData)) error {
@ -396,3 +391,8 @@ func newNodeOpts(token string) ([]node.Option, error) {
return nodeOpts, nil
}
func copyStream(closer chan struct{}, dst io.Writer, src io.Reader) {
defer func() { closer <- struct{}{} }() // connection is closed, send signal to stop proxy
io.Copy(dst, src)
}

View File

@ -14,6 +14,10 @@ func GenerateToken() string {
return "not implemented"
}
func (f *FederatedServer) Start(ctx context.Context) error {
return fmt.Errorf("not implemented")
}
func ServiceDiscoverer(ctx context.Context, node *node.Node, token, servicesID string, fn func(string, NodeData)) error {
return fmt.Errorf("not implemented")
}

View File

@ -700,18 +700,6 @@ const docTemplate = `{
}
}
},
"functions.Argument": {
"type": "object",
"properties": {
"properties": {
"type": "object",
"additionalProperties": true
},
"type": {
"type": "string"
}
}
},
"functions.Function": {
"type": "object",
"properties": {
@ -727,48 +715,19 @@ const docTemplate = `{
}
}
},
"functions.FunctionName": {
"type": "object",
"properties": {
"const": {
"type": "string"
}
}
},
"functions.FunctionProperties": {
"type": "object",
"properties": {
"arguments": {
"$ref": "#/definitions/functions.Argument"
},
"function": {
"$ref": "#/definitions/functions.FunctionName"
}
}
},
"functions.ItemFunction": {
"functions.Item": {
"type": "object",
"properties": {
"properties": {
"$ref": "#/definitions/functions.FunctionProperties"
"type": "object",
"additionalProperties": true
},
"type": {
"type": "string"
}
}
},
"functions.ItemName": {
"type": "object",
"properties": {
"properties": {
"$ref": "#/definitions/functions.NameProperties"
},
"type": {
"type": "string"
}
}
},
"functions.JSONFunctionStructureFunction": {
"functions.JSONFunctionStructure": {
"type": "object",
"properties": {
"$defs": {
@ -778,49 +737,17 @@ const docTemplate = `{
"anyOf": {
"type": "array",
"items": {
"$ref": "#/definitions/functions.ItemFunction"
"$ref": "#/definitions/functions.Item"
}
},
"oneOf": {
"type": "array",
"items": {
"$ref": "#/definitions/functions.ItemFunction"
"$ref": "#/definitions/functions.Item"
}
}
}
},
"functions.JSONFunctionStructureName": {
"type": "object",
"properties": {
"$defs": {
"type": "object",
"additionalProperties": true
},
"anyOf": {
"type": "array",
"items": {
"$ref": "#/definitions/functions.ItemName"
}
},
"oneOf": {
"type": "array",
"items": {
"$ref": "#/definitions/functions.ItemName"
}
}
}
},
"functions.NameProperties": {
"type": "object",
"properties": {
"arguments": {
"$ref": "#/definitions/functions.Argument"
},
"name": {
"$ref": "#/definitions/functions.FunctionName"
}
}
},
"functions.Tool": {
"type": "object",
"properties": {
@ -1488,10 +1415,7 @@ const docTemplate = `{
"type": "string"
},
"grammar_json_functions": {
"$ref": "#/definitions/functions.JSONFunctionStructureFunction"
},
"grammar_json_name": {
"$ref": "#/definitions/functions.JSONFunctionStructureName"
"$ref": "#/definitions/functions.JSONFunctionStructure"
},
"ignore_eos": {
"type": "boolean"

View File

@ -693,18 +693,6 @@
}
}
},
"functions.Argument": {
"type": "object",
"properties": {
"properties": {
"type": "object",
"additionalProperties": true
},
"type": {
"type": "string"
}
}
},
"functions.Function": {
"type": "object",
"properties": {
@ -720,48 +708,19 @@
}
}
},
"functions.FunctionName": {
"type": "object",
"properties": {
"const": {
"type": "string"
}
}
},
"functions.FunctionProperties": {
"type": "object",
"properties": {
"arguments": {
"$ref": "#/definitions/functions.Argument"
},
"function": {
"$ref": "#/definitions/functions.FunctionName"
}
}
},
"functions.ItemFunction": {
"functions.Item": {
"type": "object",
"properties": {
"properties": {
"$ref": "#/definitions/functions.FunctionProperties"
"type": "object",
"additionalProperties": true
},
"type": {
"type": "string"
}
}
},
"functions.ItemName": {
"type": "object",
"properties": {
"properties": {
"$ref": "#/definitions/functions.NameProperties"
},
"type": {
"type": "string"
}
}
},
"functions.JSONFunctionStructureFunction": {
"functions.JSONFunctionStructure": {
"type": "object",
"properties": {
"$defs": {
@ -771,49 +730,17 @@
"anyOf": {
"type": "array",
"items": {
"$ref": "#/definitions/functions.ItemFunction"
"$ref": "#/definitions/functions.Item"
}
},
"oneOf": {
"type": "array",
"items": {
"$ref": "#/definitions/functions.ItemFunction"
"$ref": "#/definitions/functions.Item"
}
}
}
},
"functions.JSONFunctionStructureName": {
"type": "object",
"properties": {
"$defs": {
"type": "object",
"additionalProperties": true
},
"anyOf": {
"type": "array",
"items": {
"$ref": "#/definitions/functions.ItemName"
}
},
"oneOf": {
"type": "array",
"items": {
"$ref": "#/definitions/functions.ItemName"
}
}
}
},
"functions.NameProperties": {
"type": "object",
"properties": {
"arguments": {
"$ref": "#/definitions/functions.Argument"
},
"name": {
"$ref": "#/definitions/functions.FunctionName"
}
}
},
"functions.Tool": {
"type": "object",
"properties": {
@ -1481,10 +1408,7 @@
"type": "string"
},
"grammar_json_functions": {
"$ref": "#/definitions/functions.JSONFunctionStructureFunction"
},
"grammar_json_name": {
"$ref": "#/definitions/functions.JSONFunctionStructureName"
"$ref": "#/definitions/functions.JSONFunctionStructure"
},
"ignore_eos": {
"type": "boolean"

View File

@ -7,14 +7,6 @@ definitions:
url:
type: string
type: object
functions.Argument:
properties:
properties:
additionalProperties: true
type: object
type:
type: string
type: object
functions.Function:
properties:
description:
@ -25,67 +17,28 @@ definitions:
additionalProperties: true
type: object
type: object
functions.FunctionName:
properties:
const:
type: string
type: object
functions.FunctionProperties:
properties:
arguments:
$ref: '#/definitions/functions.Argument'
function:
$ref: '#/definitions/functions.FunctionName'
type: object
functions.ItemFunction:
functions.Item:
properties:
properties:
$ref: '#/definitions/functions.FunctionProperties'
additionalProperties: true
type: object
type:
type: string
type: object
functions.ItemName:
properties:
properties:
$ref: '#/definitions/functions.NameProperties'
type:
type: string
type: object
functions.JSONFunctionStructureFunction:
functions.JSONFunctionStructure:
properties:
$defs:
additionalProperties: true
type: object
anyOf:
items:
$ref: '#/definitions/functions.ItemFunction'
$ref: '#/definitions/functions.Item'
type: array
oneOf:
items:
$ref: '#/definitions/functions.ItemFunction'
$ref: '#/definitions/functions.Item'
type: array
type: object
functions.JSONFunctionStructureName:
properties:
$defs:
additionalProperties: true
type: object
anyOf:
items:
$ref: '#/definitions/functions.ItemName'
type: array
oneOf:
items:
$ref: '#/definitions/functions.ItemName'
type: array
type: object
functions.NameProperties:
properties:
arguments:
$ref: '#/definitions/functions.Argument'
name:
$ref: '#/definitions/functions.FunctionName'
type: object
functions.Tool:
properties:
function:
@ -538,9 +491,7 @@ definitions:
description: A grammar to constrain the LLM output
type: string
grammar_json_functions:
$ref: '#/definitions/functions.JSONFunctionStructureFunction'
grammar_json_name:
$ref: '#/definitions/functions.JSONFunctionStructureName'
$ref: '#/definitions/functions.JSONFunctionStructure'
ignore_eos:
type: boolean
input: {}