feat(explorer): relax token deletion with error threshold (#3211)

feat(explorer): relax token deletion with error threshold

Signed-off-by: Ettore Di Giacinto <mudler@users.noreply.github.com>
This commit is contained in:
Ettore Di Giacinto 2024-08-10 20:50:57 +02:00 committed by GitHub
parent 0c0bc18c94
commit 8627bc2dd4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 39 additions and 13 deletions

View File

@ -13,6 +13,7 @@ type ExplorerCMD struct {
Address string `env:"LOCALAI_ADDRESS,ADDRESS" default:":8080" help:"Bind address for the API server" group:"api"` Address string `env:"LOCALAI_ADDRESS,ADDRESS" default:":8080" help:"Bind address for the API server" group:"api"`
PoolDatabase string `env:"LOCALAI_POOL_DATABASE,POOL_DATABASE" default:"explorer.json" help:"Path to the pool database" group:"api"` PoolDatabase string `env:"LOCALAI_POOL_DATABASE,POOL_DATABASE" default:"explorer.json" help:"Path to the pool database" group:"api"`
ConnectionTimeout string `env:"LOCALAI_CONNECTION_TIMEOUT,CONNECTION_TIMEOUT" default:"2m" help:"Connection timeout for the explorer" group:"api"` ConnectionTimeout string `env:"LOCALAI_CONNECTION_TIMEOUT,CONNECTION_TIMEOUT" default:"2m" help:"Connection timeout for the explorer" group:"api"`
ConnectionErrorThreshold int `env:"LOCALAI_CONNECTION_ERROR_THRESHOLD,CONNECTION_ERROR_THRESHOLD" default:"3" help:"Connection failure threshold for the explorer" group:"api"`
} }
func (e *ExplorerCMD) Run(ctx *cliContext.Context) error { func (e *ExplorerCMD) Run(ctx *cliContext.Context) error {
@ -26,7 +27,7 @@ func (e *ExplorerCMD) Run(ctx *cliContext.Context) error {
if err != nil { if err != nil {
return err return err
} }
ds := explorer.NewDiscoveryServer(db, dur) ds := explorer.NewDiscoveryServer(db, dur, e.ConnectionErrorThreshold)
go ds.Start(context.Background()) go ds.Start(context.Background())
appHTTP := http.Explorer(db, ds) appHTTP := http.Explorer(db, ds)

View File

@ -18,6 +18,8 @@ type DiscoveryServer struct {
database *Database database *Database
networkState *NetworkState networkState *NetworkState
connectionTime time.Duration connectionTime time.Duration
failures map[string]int
errorThreshold int
} }
type NetworkState struct { type NetworkState struct {
@ -32,16 +34,20 @@ func (s *DiscoveryServer) NetworkState() *NetworkState {
// NewDiscoveryServer creates a new DiscoveryServer with the given Database. // NewDiscoveryServer creates a new DiscoveryServer with the given Database.
// it keeps the db state in sync with the network state // it keeps the db state in sync with the network state
func NewDiscoveryServer(db *Database, dur time.Duration) *DiscoveryServer { func NewDiscoveryServer(db *Database, dur time.Duration, failureThreshold int) *DiscoveryServer {
if dur == 0 { if dur == 0 {
dur = 50 * time.Second dur = 50 * time.Second
} }
if failureThreshold == 0 {
failureThreshold = 3
}
return &DiscoveryServer{ return &DiscoveryServer{
database: db, database: db,
connectionTime: dur, connectionTime: dur,
networkState: &NetworkState{ networkState: &NetworkState{
Networks: map[string]Network{}, Networks: map[string]Network{},
}, },
errorThreshold: failureThreshold,
} }
} }
@ -66,21 +72,21 @@ func (s *DiscoveryServer) runBackground() {
n, err := p2p.NewNode(token) n, err := p2p.NewNode(token)
if err != nil { if err != nil {
log.Err(err).Msg("Failed to create node") log.Err(err).Msg("Failed to create node")
s.database.Delete(token) s.failedToken(token)
continue continue
} }
err = n.Start(c) err = n.Start(c)
if err != nil { if err != nil {
log.Err(err).Msg("Failed to start node") log.Err(err).Msg("Failed to start node")
s.database.Delete(token) s.failedToken(token)
continue continue
} }
ledger, err := n.Ledger() ledger, err := n.Ledger()
if err != nil { if err != nil {
log.Err(err).Msg("Failed to start ledger") log.Err(err).Msg("Failed to start ledger")
s.database.Delete(token) s.failedToken(token)
continue continue
} }
@ -114,8 +120,27 @@ func (s *DiscoveryServer) runBackground() {
} }
s.Unlock() s.Unlock()
} else { } else {
log.Info().Any("network", token).Msg("No workers found in the network. Removing it from the database") s.failedToken(token)
s.database.Delete(token) }
}
s.deleteFailedConnections()
}
// failedToken records one more failure for the given network token.
//
// The counter is only incremented here; deciding whether the token has failed
// often enough to be removed is left to deleteFailedConnections.
func (s *DiscoveryServer) failedToken(token string) {
	s.Lock()
	defer s.Unlock()

	// The failures map is not allocated by the constructor, and assigning to
	// a nil map panics, so allocate it on first use while holding the lock.
	if s.failures == nil {
		s.failures = make(map[string]int)
	}
	s.failures[token]++
}
// deleteFailedConnections sweeps the per-token failure counters. Every token
// whose count is strictly greater than errorThreshold is deleted from the
// database and its counter is dropped. The server lock is held for the whole
// sweep.
func (s *DiscoveryServer) deleteFailedConnections() {
s.Lock()
defer s.Unlock()
for k, v := range s.failures {
// Strict comparison: a token is removed only once its count exceeds the threshold.
if v > s.errorThreshold {
log.Info().Any("network", k).Msg("Network has been removed from the database")
s.database.Delete(k)
// Forget the counter; Go permits removing entries while ranging over a map.
delete(s.failures, k)
} }
} }
} }