mirror of
https://github.com/mudler/LocalAI.git
synced 2024-12-18 20:27:57 +00:00
feat(explorer): make possible to run sync in a separate process (#3224)
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
This commit is contained in:
parent
4dfa085339
commit
9729d2ae37
@ -14,6 +14,9 @@ type ExplorerCMD struct {
|
|||||||
PoolDatabase string `env:"LOCALAI_POOL_DATABASE,POOL_DATABASE" default:"explorer.json" help:"Path to the pool database" group:"api"`
|
PoolDatabase string `env:"LOCALAI_POOL_DATABASE,POOL_DATABASE" default:"explorer.json" help:"Path to the pool database" group:"api"`
|
||||||
ConnectionTimeout string `env:"LOCALAI_CONNECTION_TIMEOUT,CONNECTION_TIMEOUT" default:"2m" help:"Connection timeout for the explorer" group:"api"`
|
ConnectionTimeout string `env:"LOCALAI_CONNECTION_TIMEOUT,CONNECTION_TIMEOUT" default:"2m" help:"Connection timeout for the explorer" group:"api"`
|
||||||
ConnectionErrorThreshold int `env:"LOCALAI_CONNECTION_ERROR_THRESHOLD,CONNECTION_ERROR_THRESHOLD" default:"3" help:"Connection failure threshold for the explorer" group:"api"`
|
ConnectionErrorThreshold int `env:"LOCALAI_CONNECTION_ERROR_THRESHOLD,CONNECTION_ERROR_THRESHOLD" default:"3" help:"Connection failure threshold for the explorer" group:"api"`
|
||||||
|
|
||||||
|
WithSync bool `env:"LOCALAI_WITH_SYNC,WITH_SYNC" default:"false" help:"Enable sync with the network" group:"api"`
|
||||||
|
OnlySync bool `env:"LOCALAI_ONLY_SYNC,ONLY_SYNC" default:"false" help:"Only sync with the network" group:"api"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *ExplorerCMD) Run(ctx *cliContext.Context) error {
|
func (e *ExplorerCMD) Run(ctx *cliContext.Context) error {
|
||||||
@ -27,10 +30,20 @@ func (e *ExplorerCMD) Run(ctx *cliContext.Context) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
ds := explorer.NewDiscoveryServer(db, dur, e.ConnectionErrorThreshold)
|
|
||||||
|
|
||||||
go ds.Start(context.Background())
|
if e.WithSync {
|
||||||
appHTTP := http.Explorer(db, ds)
|
ds := explorer.NewDiscoveryServer(db, dur, e.ConnectionErrorThreshold)
|
||||||
|
go ds.Start(context.Background(), true)
|
||||||
|
}
|
||||||
|
|
||||||
|
if e.OnlySync {
|
||||||
|
ds := explorer.NewDiscoveryServer(db, dur, e.ConnectionErrorThreshold)
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
return ds.Start(ctx, false)
|
||||||
|
}
|
||||||
|
|
||||||
|
appHTTP := http.Explorer(db)
|
||||||
|
|
||||||
return appHTTP.Listen(e.Address)
|
return appHTTP.Listen(e.Address)
|
||||||
}
|
}
|
||||||
|
@ -7,58 +7,83 @@ import (
|
|||||||
"os"
|
"os"
|
||||||
"sort"
|
"sort"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
|
"github.com/gofrs/flock"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Database is a simple JSON database for storing and retrieving p2p network tokens and a name and description.
|
// Database is a simple JSON database for storing and retrieving p2p network tokens and a name and description.
|
||||||
type Database struct {
|
type Database struct {
|
||||||
sync.RWMutex
|
|
||||||
path string
|
path string
|
||||||
data map[string]TokenData
|
data map[string]TokenData
|
||||||
|
flock *flock.Flock
|
||||||
|
sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
// TokenData is a p2p network token with a name and description.
|
// TokenData is a p2p network token with a name and description.
|
||||||
type TokenData struct {
|
type TokenData struct {
|
||||||
Name string `json:"name"`
|
Name string `json:"name"`
|
||||||
Description string `json:"description"`
|
Description string `json:"description"`
|
||||||
|
Clusters []ClusterData
|
||||||
|
Failures int
|
||||||
|
}
|
||||||
|
|
||||||
|
type ClusterData struct {
|
||||||
|
Workers []string
|
||||||
|
Type string
|
||||||
|
NetworkID string
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewDatabase creates a new Database with the given path.
|
// NewDatabase creates a new Database with the given path.
|
||||||
func NewDatabase(path string) (*Database, error) {
|
func NewDatabase(path string) (*Database, error) {
|
||||||
|
fileLock := flock.New(path + ".lock")
|
||||||
db := &Database{
|
db := &Database{
|
||||||
data: make(map[string]TokenData),
|
data: make(map[string]TokenData),
|
||||||
path: path,
|
path: path,
|
||||||
|
flock: fileLock,
|
||||||
}
|
}
|
||||||
return db, db.load()
|
return db, db.load()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get retrieves a Token from the Database by its token.
|
// Get retrieves a Token from the Database by its token.
|
||||||
func (db *Database) Get(token string) (TokenData, bool) {
|
func (db *Database) Get(token string) (TokenData, bool) {
|
||||||
db.RLock()
|
db.flock.Lock() // we are making sure that the file is not being written to
|
||||||
defer db.RUnlock()
|
defer db.flock.Unlock()
|
||||||
|
db.Lock() // we are making sure that is safe if called by another instance in the same process
|
||||||
|
defer db.Unlock()
|
||||||
|
db.load()
|
||||||
t, ok := db.data[token]
|
t, ok := db.data[token]
|
||||||
return t, ok
|
return t, ok
|
||||||
}
|
}
|
||||||
|
|
||||||
// Set stores a Token in the Database by its token.
|
// Set stores a Token in the Database by its token.
|
||||||
func (db *Database) Set(token string, t TokenData) error {
|
func (db *Database) Set(token string, t TokenData) error {
|
||||||
|
db.flock.Lock()
|
||||||
|
defer db.flock.Unlock()
|
||||||
db.Lock()
|
db.Lock()
|
||||||
|
defer db.Unlock()
|
||||||
|
db.load()
|
||||||
db.data[token] = t
|
db.data[token] = t
|
||||||
db.Unlock()
|
|
||||||
|
|
||||||
return db.Save()
|
return db.save()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Delete removes a Token from the Database by its token.
|
// Delete removes a Token from the Database by its token.
|
||||||
func (db *Database) Delete(token string) error {
|
func (db *Database) Delete(token string) error {
|
||||||
|
db.flock.Lock()
|
||||||
|
defer db.flock.Unlock()
|
||||||
db.Lock()
|
db.Lock()
|
||||||
|
defer db.Unlock()
|
||||||
|
db.load()
|
||||||
delete(db.data, token)
|
delete(db.data, token)
|
||||||
db.Unlock()
|
return db.save()
|
||||||
return db.Save()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *Database) TokenList() []string {
|
func (db *Database) TokenList() []string {
|
||||||
db.RLock()
|
db.flock.Lock()
|
||||||
defer db.RUnlock()
|
defer db.flock.Unlock()
|
||||||
|
db.Lock()
|
||||||
|
defer db.Unlock()
|
||||||
|
db.load()
|
||||||
tokens := []string{}
|
tokens := []string{}
|
||||||
for k := range db.data {
|
for k := range db.data {
|
||||||
tokens = append(tokens, k)
|
tokens = append(tokens, k)
|
||||||
@ -74,9 +99,6 @@ func (db *Database) TokenList() []string {
|
|||||||
|
|
||||||
// load reads the Database from disk.
|
// load reads the Database from disk.
|
||||||
func (db *Database) load() error {
|
func (db *Database) load() error {
|
||||||
db.Lock()
|
|
||||||
defer db.Unlock()
|
|
||||||
|
|
||||||
if _, err := os.Stat(db.path); os.IsNotExist(err) {
|
if _, err := os.Stat(db.path); os.IsNotExist(err) {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -91,10 +113,7 @@ func (db *Database) load() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Save writes the Database to disk.
|
// Save writes the Database to disk.
|
||||||
func (db *Database) Save() error {
|
func (db *Database) save() error {
|
||||||
db.RLock()
|
|
||||||
defer db.RUnlock()
|
|
||||||
|
|
||||||
// Marshal db.data into JSON
|
// Marshal db.data into JSON
|
||||||
// Write the JSON to the file
|
// Write the JSON to the file
|
||||||
f, err := os.Create(db.path)
|
f, err := os.Create(db.path)
|
||||||
|
@ -16,22 +16,10 @@ import (
|
|||||||
type DiscoveryServer struct {
|
type DiscoveryServer struct {
|
||||||
sync.Mutex
|
sync.Mutex
|
||||||
database *Database
|
database *Database
|
||||||
networkState *NetworkState
|
|
||||||
connectionTime time.Duration
|
connectionTime time.Duration
|
||||||
failures map[string]int
|
|
||||||
errorThreshold int
|
errorThreshold int
|
||||||
}
|
}
|
||||||
|
|
||||||
type NetworkState struct {
|
|
||||||
Networks map[string]Network
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *DiscoveryServer) NetworkState() *NetworkState {
|
|
||||||
s.Lock()
|
|
||||||
defer s.Unlock()
|
|
||||||
return s.networkState
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewDiscoveryServer creates a new DiscoveryServer with the given Database.
|
// NewDiscoveryServer creates a new DiscoveryServer with the given Database.
|
||||||
// it keeps the db state in sync with the network state
|
// it keeps the db state in sync with the network state
|
||||||
func NewDiscoveryServer(db *Database, dur time.Duration, failureThreshold int) *DiscoveryServer {
|
func NewDiscoveryServer(db *Database, dur time.Duration, failureThreshold int) *DiscoveryServer {
|
||||||
@ -44,11 +32,7 @@ func NewDiscoveryServer(db *Database, dur time.Duration, failureThreshold int) *
|
|||||||
return &DiscoveryServer{
|
return &DiscoveryServer{
|
||||||
database: db,
|
database: db,
|
||||||
connectionTime: dur,
|
connectionTime: dur,
|
||||||
networkState: &NetworkState{
|
|
||||||
Networks: map[string]Network{},
|
|
||||||
},
|
|
||||||
errorThreshold: failureThreshold,
|
errorThreshold: failureThreshold,
|
||||||
failures: make(map[string]int),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -116,10 +100,10 @@ func (s *DiscoveryServer) runBackground() {
|
|||||||
|
|
||||||
if hasWorkers {
|
if hasWorkers {
|
||||||
s.Lock()
|
s.Lock()
|
||||||
s.networkState.Networks[token] = Network{
|
data, _ := s.database.Get(token)
|
||||||
Clusters: ledgerK,
|
(&data).Clusters = ledgerK
|
||||||
}
|
(&data).Failures = 0
|
||||||
delete(s.failures, token)
|
s.database.Set(token, data)
|
||||||
s.Unlock()
|
s.Unlock()
|
||||||
} else {
|
} else {
|
||||||
s.failedToken(token)
|
s.failedToken(token)
|
||||||
@ -132,27 +116,23 @@ func (s *DiscoveryServer) runBackground() {
|
|||||||
func (s *DiscoveryServer) failedToken(token string) {
|
func (s *DiscoveryServer) failedToken(token string) {
|
||||||
s.Lock()
|
s.Lock()
|
||||||
defer s.Unlock()
|
defer s.Unlock()
|
||||||
s.failures[token]++
|
data, _ := s.database.Get(token)
|
||||||
|
(&data).Failures++
|
||||||
|
s.database.Set(token, data)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *DiscoveryServer) deleteFailedConnections() {
|
func (s *DiscoveryServer) deleteFailedConnections() {
|
||||||
s.Lock()
|
s.Lock()
|
||||||
defer s.Unlock()
|
defer s.Unlock()
|
||||||
for k, v := range s.failures {
|
for _, t := range s.database.TokenList() {
|
||||||
if v > s.errorThreshold {
|
data, _ := s.database.Get(t)
|
||||||
log.Info().Any("network", k).Msg("Network has been removed from the database")
|
if data.Failures > s.errorThreshold {
|
||||||
s.database.Delete(k)
|
log.Info().Any("token", t).Msg("Token has been removed from the database")
|
||||||
delete(s.failures, k)
|
s.database.Delete(t)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type ClusterData struct {
|
|
||||||
Workers []string
|
|
||||||
Type string
|
|
||||||
NetworkID string
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *DiscoveryServer) retrieveNetworkData(c context.Context, ledger *blockchain.Ledger, networkData chan ClusterData) {
|
func (s *DiscoveryServer) retrieveNetworkData(c context.Context, ledger *blockchain.Ledger, networkData chan ClusterData) {
|
||||||
clusters := map[string]ClusterData{}
|
clusters := map[string]ClusterData{}
|
||||||
|
|
||||||
@ -217,7 +197,7 @@ func (s *DiscoveryServer) retrieveNetworkData(c context.Context, ledger *blockch
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Start the discovery server. This is meant to be run in to a goroutine.
|
// Start the discovery server. This is meant to be run in to a goroutine.
|
||||||
func (s *DiscoveryServer) Start(ctx context.Context) error {
|
func (s *DiscoveryServer) Start(ctx context.Context, keepRunning bool) error {
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
@ -225,6 +205,9 @@ func (s *DiscoveryServer) Start(ctx context.Context) error {
|
|||||||
default:
|
default:
|
||||||
// Collect data
|
// Collect data
|
||||||
s.runBackground()
|
s.runBackground()
|
||||||
|
if !keepRunning {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -11,7 +11,6 @@ import (
|
|||||||
|
|
||||||
func Dashboard() func(*fiber.Ctx) error {
|
func Dashboard() func(*fiber.Ctx) error {
|
||||||
return func(c *fiber.Ctx) error {
|
return func(c *fiber.Ctx) error {
|
||||||
|
|
||||||
summary := fiber.Map{
|
summary := fiber.Map{
|
||||||
"Title": "LocalAI API - " + internal.PrintableVersion(),
|
"Title": "LocalAI API - " + internal.PrintableVersion(),
|
||||||
"Version": internal.PrintableVersion(),
|
"Version": internal.PrintableVersion(),
|
||||||
@ -34,26 +33,24 @@ type AddNetworkRequest struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type Network struct {
|
type Network struct {
|
||||||
explorer.Network
|
|
||||||
explorer.TokenData
|
explorer.TokenData
|
||||||
Token string `json:"token"`
|
Token string `json:"token"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func ShowNetworks(db *explorer.Database, ds *explorer.DiscoveryServer) func(*fiber.Ctx) error {
|
func ShowNetworks(db *explorer.Database) func(*fiber.Ctx) error {
|
||||||
return func(c *fiber.Ctx) error {
|
return func(c *fiber.Ctx) error {
|
||||||
networkState := ds.NetworkState()
|
|
||||||
results := []Network{}
|
results := []Network{}
|
||||||
for token, network := range networkState.Networks {
|
for _, token := range db.TokenList() {
|
||||||
networkData, exists := db.Get(token) // get the token data
|
networkData, exists := db.Get(token) // get the token data
|
||||||
hasWorkers := false
|
hasWorkers := false
|
||||||
for _, cluster := range network.Clusters {
|
for _, cluster := range networkData.Clusters {
|
||||||
if len(cluster.Workers) > 0 {
|
if len(cluster.Workers) > 0 {
|
||||||
hasWorkers = true
|
hasWorkers = true
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if exists && hasWorkers {
|
if exists && hasWorkers {
|
||||||
results = append(results, Network{Network: network, TokenData: networkData, Token: token})
|
results = append(results, Network{TokenData: networkData, Token: token})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -10,7 +10,7 @@ import (
|
|||||||
"github.com/mudler/LocalAI/core/http/routes"
|
"github.com/mudler/LocalAI/core/http/routes"
|
||||||
)
|
)
|
||||||
|
|
||||||
func Explorer(db *explorer.Database, discoveryServer *explorer.DiscoveryServer) *fiber.App {
|
func Explorer(db *explorer.Database) *fiber.App {
|
||||||
|
|
||||||
fiberCfg := fiber.Config{
|
fiberCfg := fiber.Config{
|
||||||
Views: renderEngine(),
|
Views: renderEngine(),
|
||||||
@ -22,7 +22,7 @@ func Explorer(db *explorer.Database, discoveryServer *explorer.DiscoveryServer)
|
|||||||
|
|
||||||
app := fiber.New(fiberCfg)
|
app := fiber.New(fiberCfg)
|
||||||
|
|
||||||
routes.RegisterExplorerRoutes(app, db, discoveryServer)
|
routes.RegisterExplorerRoutes(app, db)
|
||||||
|
|
||||||
httpFS := http.FS(embedDirStatic)
|
httpFS := http.FS(embedDirStatic)
|
||||||
|
|
||||||
|
@ -6,8 +6,8 @@ import (
|
|||||||
"github.com/mudler/LocalAI/core/http/endpoints/explorer"
|
"github.com/mudler/LocalAI/core/http/endpoints/explorer"
|
||||||
)
|
)
|
||||||
|
|
||||||
func RegisterExplorerRoutes(app *fiber.App, db *coreExplorer.Database, ds *coreExplorer.DiscoveryServer) {
|
func RegisterExplorerRoutes(app *fiber.App, db *coreExplorer.Database) {
|
||||||
app.Get("/", explorer.Dashboard())
|
app.Get("/", explorer.Dashboard())
|
||||||
app.Post("/network/add", explorer.AddNetwork(db))
|
app.Post("/network/add", explorer.AddNetwork(db))
|
||||||
app.Get("/networks", explorer.ShowNetworks(db, ds))
|
app.Get("/networks", explorer.ShowNetworks(db))
|
||||||
}
|
}
|
||||||
|
@ -236,6 +236,7 @@ func ensureService(ctx context.Context, n *node.Node, nd *NodeData, sserv string
|
|||||||
if ndService, found := service[nd.Name]; !found {
|
if ndService, found := service[nd.Name]; !found {
|
||||||
if !nd.IsOnline() {
|
if !nd.IsOnline() {
|
||||||
// if node is offline and not present, do nothing
|
// if node is offline and not present, do nothing
|
||||||
|
zlog.Debug().Msgf("Node %s is offline", nd.ID)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
newCtxm, cancel := context.WithCancel(ctx)
|
newCtxm, cancel := context.WithCancel(ctx)
|
||||||
|
1
go.mod
1
go.mod
@ -67,6 +67,7 @@ require (
|
|||||||
github.com/cpuguy83/go-md2man/v2 v2.0.4 // indirect
|
github.com/cpuguy83/go-md2man/v2 v2.0.4 // indirect
|
||||||
github.com/go-task/slim-sprig/v3 v3.0.0 // indirect
|
github.com/go-task/slim-sprig/v3 v3.0.0 // indirect
|
||||||
github.com/go-viper/mapstructure/v2 v2.0.0 // indirect
|
github.com/go-viper/mapstructure/v2 v2.0.0 // indirect
|
||||||
|
github.com/gofrs/flock v0.12.1 // indirect
|
||||||
github.com/labstack/echo/v4 v4.12.0 // indirect
|
github.com/labstack/echo/v4 v4.12.0 // indirect
|
||||||
github.com/labstack/gommon v0.4.2 // indirect
|
github.com/labstack/gommon v0.4.2 // indirect
|
||||||
github.com/moby/docker-image-spec v1.3.1 // indirect
|
github.com/moby/docker-image-spec v1.3.1 // indirect
|
||||||
|
2
go.sum
2
go.sum
@ -204,6 +204,8 @@ github.com/gofiber/template/html/v2 v2.1.2 h1:wkK/mYJ3nIhongTkG3t0QgV4ADdgOYJYVS
|
|||||||
github.com/gofiber/template/html/v2 v2.1.2/go.mod h1:E98Z/FzvpaSib06aWEgYk6GXNf3ctoyaJH8yW5ay5ak=
|
github.com/gofiber/template/html/v2 v2.1.2/go.mod h1:E98Z/FzvpaSib06aWEgYk6GXNf3ctoyaJH8yW5ay5ak=
|
||||||
github.com/gofiber/utils v1.1.0 h1:vdEBpn7AzIUJRhe+CiTOJdUcTg4Q9RK+pEa0KPbLdrM=
|
github.com/gofiber/utils v1.1.0 h1:vdEBpn7AzIUJRhe+CiTOJdUcTg4Q9RK+pEa0KPbLdrM=
|
||||||
github.com/gofiber/utils v1.1.0/go.mod h1:poZpsnhBykfnY1Mc0KeEa6mSHrS3dV0+oBWyeQmb2e0=
|
github.com/gofiber/utils v1.1.0/go.mod h1:poZpsnhBykfnY1Mc0KeEa6mSHrS3dV0+oBWyeQmb2e0=
|
||||||
|
github.com/gofrs/flock v0.12.1 h1:MTLVXXHf8ekldpJk3AKicLij9MdwOWkZ+a/jHHZby9E=
|
||||||
|
github.com/gofrs/flock v0.12.1/go.mod h1:9zxTsyu5xtJ9DK+1tFZyibEV7y3uwDxPPfbxeeHCoD0=
|
||||||
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
|
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
|
||||||
github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o=
|
github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o=
|
||||||
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
|
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
|
||||||
|
Loading…
Reference in New Issue
Block a user