Mirror of https://github.com/mudler/LocalAI.git (synced 2025-05-04 17:52:53 +00:00)
chore: allow to disable gallery endpoints, improve p2p connection handling (#3256)
* Add more debug messages
* feat: allow to disable gallery endpoints
* improve p2p messaging
* improve error handling
* Make sure to close the listening socket when context is exhausted

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Parent: d6b3fbb4ad
Commit: 7278bf3de8
@@ -64,6 +64,7 @@ type RunCMD struct {
 	EnableWatchdogBusy bool `env:"LOCALAI_WATCHDOG_BUSY,WATCHDOG_BUSY" default:"false" help:"Enable watchdog for stopping backends that are busy longer than the watchdog-busy-timeout" group:"backends"`
 	WatchdogBusyTimeout string `env:"LOCALAI_WATCHDOG_BUSY_TIMEOUT,WATCHDOG_BUSY_TIMEOUT" default:"5m" help:"Threshold beyond which a busy backend should be stopped" group:"backends"`
 	Federated bool `env:"LOCALAI_FEDERATED,FEDERATED" help:"Enable federated instance" group:"federated"`
+	DisableGalleryEndpoint bool `env:"LOCALAI_DISABLE_GALLERY_ENDPOINT,DISABLE_GALLERY_ENDPOINT" help:"Disable the gallery endpoints" group:"api"`
 }
 
 func (r *RunCMD) Run(ctx *cliContext.Context) error {
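The new field above follows the same struct-tag conventions as the existing watchdog flags; the tags look like alecthomas/kong conventions, where a flag can be populated from the command line or from the listed environment variables. A minimal, self-contained sketch of how such a tag-driven flag parses, assuming kong and simplified to a single env name:

    package main

    import (
    	"fmt"

    	"github.com/alecthomas/kong"
    )

    // RunCMD here is a cut-down stand-in for the real struct: one flag,
    // one env name, to show how the tag-driven parsing behaves.
    type RunCMD struct {
    	DisableGalleryEndpoint bool `env:"LOCALAI_DISABLE_GALLERY_ENDPOINT" help:"Disable the gallery endpoints"`
    }

    func main() {
    	var cmd RunCMD
    	kong.Parse(&cmd) // reads --disable-gallery-endpoint or the env variable
    	fmt.Println("gallery endpoints disabled:", cmd.DisableGalleryEndpoint)
    }

Setting LOCALAI_DISABLE_GALLERY_ENDPOINT=true in the environment would then flip the boolean without any command-line flag.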
@@ -164,6 +165,10 @@ func (r *RunCMD) Run(ctx *cliContext.Context) error {
 		opts = append(opts, config.DisableWebUI)
 	}
 
+	if r.DisableGalleryEndpoint {
+		opts = append(opts, config.DisableGalleryEndpoint)
+	}
+
 	if idleWatchDog || busyWatchDog {
 		opts = append(opts, config.EnableWatchDog)
 		if idleWatchDog {
@@ -57,6 +57,8 @@ type ApplicationConfig struct {
 	ModelsURL []string
 
 	WatchDogBusyTimeout, WatchDogIdleTimeout time.Duration
+
+	DisableGalleryEndpoint bool
 }
 
 type AppOption func(*ApplicationConfig)
@@ -131,6 +133,10 @@ var EnableWatchDogIdleCheck = func(o *ApplicationConfig) {
 	o.WatchDogIdle = true
 }
 
+var DisableGalleryEndpoint = func(o *ApplicationConfig) {
+	o.DisableGalleryEndpoint = true
+}
+
 var EnableWatchDogBusyCheck = func(o *ApplicationConfig) {
 	o.WatchDog = true
 	o.WatchDogBusy = true
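DisableGalleryEndpoint is defined as an AppOption, the functional-options pattern this config package already uses for EnableWatchDogIdleCheck and EnableWatchDogBusyCheck. A minimal sketch of the pattern end to end; the constructor is a simplified assumption, not the exact LocalAI wiring:

    package main

    import "fmt"

    type ApplicationConfig struct {
    	DisableGalleryEndpoint bool
    }

    type AppOption func(*ApplicationConfig)

    // Each option is a closure that mutates the config in place.
    var DisableGalleryEndpoint = func(o *ApplicationConfig) {
    	o.DisableGalleryEndpoint = true
    }

    // newApplicationConfig is a hypothetical constructor: it applies
    // whatever options were collected, as the CLI code above does.
    func newApplicationConfig(opts ...AppOption) *ApplicationConfig {
    	c := &ApplicationConfig{}
    	for _, opt := range opts {
    		opt(c)
    	}
    	return c
    }

    func main() {
    	var opts []AppOption
    	disableFlag := true // stand-in for r.DisableGalleryEndpoint
    	if disableFlag {
    		opts = append(opts, DisableGalleryEndpoint)
    	}
    	cfg := newApplicationConfig(opts...)
    	fmt.Println("gallery endpoints disabled:", cfg.DisableGalleryEndpoint)
    }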
@@ -21,17 +21,18 @@ func RegisterLocalAIRoutes(app *fiber.App,
 	app.Get("/swagger/*", swagger.HandlerDefault) // default
 
 	// LocalAI API endpoints
-	modelGalleryEndpointService := localai.CreateModelGalleryEndpointService(appConfig.Galleries, appConfig.ModelPath, galleryService)
-	app.Post("/models/apply", auth, modelGalleryEndpointService.ApplyModelGalleryEndpoint())
-	app.Post("/models/delete/:name", auth, modelGalleryEndpointService.DeleteModelGalleryEndpoint())
+	if !appConfig.DisableGalleryEndpoint {
+		modelGalleryEndpointService := localai.CreateModelGalleryEndpointService(appConfig.Galleries, appConfig.ModelPath, galleryService)
+		app.Post("/models/apply", auth, modelGalleryEndpointService.ApplyModelGalleryEndpoint())
+		app.Post("/models/delete/:name", auth, modelGalleryEndpointService.DeleteModelGalleryEndpoint())
 
-	app.Get("/models/available", auth, modelGalleryEndpointService.ListModelFromGalleryEndpoint())
-	app.Get("/models/galleries", auth, modelGalleryEndpointService.ListModelGalleriesEndpoint())
-	app.Post("/models/galleries", auth, modelGalleryEndpointService.AddModelGalleryEndpoint())
-	app.Delete("/models/galleries", auth, modelGalleryEndpointService.RemoveModelGalleryEndpoint())
-	app.Get("/models/jobs/:uuid", auth, modelGalleryEndpointService.GetOpStatusEndpoint())
-	app.Get("/models/jobs", auth, modelGalleryEndpointService.GetAllStatusEndpoint())
+		app.Get("/models/available", auth, modelGalleryEndpointService.ListModelFromGalleryEndpoint())
+		app.Get("/models/galleries", auth, modelGalleryEndpointService.ListModelGalleriesEndpoint())
+		app.Post("/models/galleries", auth, modelGalleryEndpointService.AddModelGalleryEndpoint())
+		app.Delete("/models/galleries", auth, modelGalleryEndpointService.RemoveModelGalleryEndpoint())
+		app.Get("/models/jobs/:uuid", auth, modelGalleryEndpointService.GetOpStatusEndpoint())
+		app.Get("/models/jobs", auth, modelGalleryEndpointService.GetAllStatusEndpoint())
+	}
 
 	app.Post("/tts", auth, localai.TTSEndpoint(cl, ml, appConfig))
 
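Because the gallery routes are only registered when the flag is unset, disabling them means Fiber never mounts the handlers and requests simply fall through to the framework's 404 handling. A minimal sketch of conditional registration, assuming gofiber/fiber v2 with a hypothetical handler body:

    package main

    import (
    	"log"

    	"github.com/gofiber/fiber/v2"
    )

    func main() {
    	app := fiber.New()

    	disableGallery := true // stand-in for appConfig.DisableGalleryEndpoint

    	// When disabled, the handler is never mounted, so POST /models/apply
    	// gets Fiber's default 404 rather than a feature-specific error.
    	if !disableGallery {
    		app.Post("/models/apply", func(c *fiber.Ctx) error {
    			return c.SendString("applying model") // hypothetical handler body
    		})
    	}

    	log.Fatal(app.Listen(":8080"))
    }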
@@ -80,7 +80,7 @@ func (fs *FederatedServer) SelectLeastUsedServer() string {
 	fs.Lock()
 	defer fs.Unlock()
 
-	log.Debug().Any("request_table", fs.requestTable).Msgf("Current request table")
+	log.Debug().Any("request_table", fs.requestTable).Msgf("SelectLeastUsedServer()")
 
 	// cycle over requestTable and find the entry with the lower number
 	// if there are multiple entries with the same number, select one randomly
@@ -93,7 +93,7 @@ func (fs *FederatedServer) SelectLeastUsedServer() string {
 			minKey = k
 		}
 	}
-	log.Debug().Any("requests_served", min).Msgf("Selected tunnel %s", minKey)
+	log.Debug().Any("requests_served", min).Any("request_table", fs.requestTable).Msgf("Selected tunnel %s", minKey)
 
 	return minKey
 }
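The selection logic these debug lines instrument is a min-scan over the request table. A sketch of that scan with a simplified signature; ties resolve in no fixed order because Go randomizes map iteration, which lines up with the "select one randomly" comment:

    package main

    import "fmt"

    // selectLeastUsed walks a request table and returns the key with the
    // lowest counter. Go's randomized map iteration order means ties are
    // broken arbitrarily from run to run.
    func selectLeastUsed(requestTable map[string]int) string {
    	minKey := ""
    	min := -1
    	for k, v := range requestTable {
    		if min == -1 || v < min {
    			min = v
    			minKey = k
    		}
    	}
    	return minKey
    }

    func main() {
    	table := map[string]int{"tunnel-a": 4, "tunnel-b": 1, "tunnel-c": 1}
    	fmt.Println("selected tunnel:", selectLeastUsed(table))
    }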
@@ -104,7 +104,7 @@ func (fs *FederatedServer) RecordRequest(nodeID string) {
 	// increment the counter for the nodeID in the requestTable
 	fs.requestTable[nodeID]++
 
-	log.Debug().Any("request_table", fs.requestTable).Msgf("Current request table")
+	log.Debug().Any("request_table", fs.requestTable).Any("request", nodeID).Msgf("Recording request")
 }
 
 func (fs *FederatedServer) ensureRecordExist(nodeID string) {
@@ -114,5 +114,5 @@ func (fs *FederatedServer) ensureRecordExist(nodeID string) {
 		fs.requestTable[nodeID] = 0
 	}
 
-	log.Debug().Any("request_table", fs.requestTable).Msgf("Current request table")
+	log.Debug().Any("request_table", fs.requestTable).Any("request", nodeID).Msgf("Ensure record exists")
 }
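All the logging changes in this file replace the generic "Current request table" message with call-site-specific messages and attach the acting node through zerolog's structured fields. A small sketch of the idiom; Any serializes an arbitrary value into the JSON entry:

    package main

    import "github.com/rs/zerolog/log"

    func main() {
    	requestTable := map[string]int{"node-a": 3, "node-b": 1}

    	// Any attaches arbitrary values to the structured entry, so the
    	// whole table and the acting node travel with the message.
    	log.Debug().
    		Any("request_table", requestTable).
    		Any("request", "node-b").
    		Msgf("Recording request")
    }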
@@ -46,7 +46,10 @@ func (fs *FederatedServer) proxy(ctx context.Context, node *node.Node) error {
 		return err
 	}
 	// ll.Info("Binding local port on", srcaddr)
+	go func() {
+		<-ctx.Done()
+		l.Close()
+	}()
 	ledger, _ := node.Ledger()
 
 	// Announce ourselves so nodes accepts our connection
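This added goroutine is the commit's "close the listening socket when context is exhausted" fix: Accept blocks indefinitely, and closing the listener from another goroutine is the standard way to unblock it. A self-contained sketch of the pattern using only the standard library:

    package main

    import (
    	"context"
    	"errors"
    	"net"
    	"time"
    )

    func serve(ctx context.Context) error {
    	l, err := net.Listen("tcp", "127.0.0.1:0")
    	if err != nil {
    		return err
    	}
    	// Accept blocks, so close the listener once the context is
    	// exhausted; the pending Accept then returns net.ErrClosed.
    	go func() {
    		<-ctx.Done()
    		l.Close()
    	}()
    	for {
    		conn, err := l.Accept()
    		if err != nil {
    			if errors.Is(err, net.ErrClosed) {
    				return nil // clean shutdown
    			}
    			return err
    		}
    		conn.Close() // a real server would handle the connection here
    	}
    }

    func main() {
    	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    	defer cancel()
    	serve(ctx)
    }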
@@ -54,17 +57,12 @@ func (fs *FederatedServer) proxy(ctx context.Context, node *node.Node) error {
 		ctx,
 		10*time.Second,
 		func() {
-			// Retrieve current ID for ip in the blockchain
-			//_, found := ledger.GetKey(protocol.UsersLedgerKey, node.Host().ID().String())
-			// If mismatch, update the blockchain
-			//if !found {
 			updatedMap := map[string]interface{}{}
 			updatedMap[node.Host().ID().String()] = &types.User{
 				PeerID:    node.Host().ID().String(),
 				Timestamp: time.Now().String(),
 			}
 			ledger.Add(protocol.UsersLedgerKey, updatedMap)
-			// }
 		},
 	)
 
@@ -51,6 +51,11 @@ func allocateLocalService(ctx context.Context, node *node.Node, listenAddr, serv
 		zlog.Error().Err(err).Msg("Error listening")
 		return err
 	}
+	go func() {
+		<-ctx.Done()
+		l.Close()
+	}()
+
 	// ll.Info("Binding local port on", srcaddr)
 
 	ledger, _ := node.Ledger()
@@ -60,17 +65,12 @@ func allocateLocalService(ctx context.Context, node *node.Node, listenAddr, serv
 		ctx,
 		10*time.Second,
 		func() {
-			// Retrieve current ID for ip in the blockchain
-			//_, found := ledger.GetKey(protocol.UsersLedgerKey, node.Host().ID().String())
-			// If mismatch, update the blockchain
-			//if !found {
 			updatedMap := map[string]interface{}{}
 			updatedMap[node.Host().ID().String()] = &types.User{
 				PeerID:    node.Host().ID().String(),
 				Timestamp: time.Now().String(),
 			}
 			ledger.Add(protocol.UsersLedgerKey, updatedMap)
-			// }
 		},
 	)
 
@@ -197,14 +197,13 @@ func discoveryTunnels(ctx context.Context, n *node.Node, token, servicesID strin
 				return
 			default:
 				time.Sleep(5 * time.Second)
-				zlog.Debug().Msg("Searching for workers")
 
 				data := ledger.LastBlock().Storage[servicesID]
 
 				zlog.Debug().Any("data", ledger.LastBlock().Storage).Msg("Ledger data")
 
 				for k, v := range data {
-					zlog.Info().Msgf("Found worker %s", k)
+					zlog.Debug().Msgf("New worker found in the ledger data '%s'", k)
 					nd := &NodeData{}
 					if err := v.Unmarshal(nd); err != nil {
 						zlog.Error().Msg("cannot unmarshal node data")
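The surrounding loop polls the ledger every few seconds and exits through the ctx.Done() case. A stripped-down sketch of that loop shape, with ledger access replaced by a hypothetical callback:

    package main

    import (
    	"context"
    	"fmt"
    	"time"
    )

    // poll sketches the control flow of discoveryTunnels: sleep, check,
    // and bail out as soon as the context is exhausted.
    func poll(ctx context.Context, check func()) {
    	for {
    		select {
    		case <-ctx.Done():
    			return // context exhausted: stop discovering
    		default:
    			time.Sleep(1 * time.Second)
    			check() // stand-in for reading ledger.LastBlock().Storage[servicesID]
    		}
    	}
    }

    func main() {
    	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
    	defer cancel()
    	poll(ctx, func() { fmt.Println("searching for workers") })
    }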
@@ -245,8 +244,10 @@ func ensureService(ctx context.Context, n *node.Node, nd *NodeData, sserv string
 		// Start the service
 		port, err := freeport.GetFreePort()
 		if err != nil {
-			fmt.Print(err)
+			zlog.Error().Err(err).Msgf("Could not allocate a free port for %s", nd.ID)
+			return
 		}
+
 		tunnelAddress := fmt.Sprintf("127.0.0.1:%d", port)
 		nd.TunnelAddress = tunnelAddress
 		service[nd.Name] = nodeServiceData{
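The error path now logs and returns instead of printing and falling through to use an invalid port. A stdlib-only stand-in that captures what a free-port helper does (listening on port 0 lets the OS pick an unused port), with the early return mirrored; this is a sketch, not the freeport library's actual implementation:

    package main

    import (
    	"fmt"
    	"net"
    )

    // getFreePort is a stdlib-only stand-in for freeport.GetFreePort:
    // listening on port 0 makes the OS pick an unused port.
    func getFreePort() (int, error) {
    	l, err := net.Listen("tcp", "127.0.0.1:0")
    	if err != nil {
    		return 0, err
    	}
    	defer l.Close()
    	return l.Addr().(*net.TCPAddr).Port, nil
    }

    func main() {
    	port, err := getFreePort()
    	if err != nil {
    		fmt.Println("could not allocate a free port:", err)
    		return // mirror the new early return in ensureService
    	}
    	fmt.Printf("tunnel address: 127.0.0.1:%d\n", port)
    }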
@@ -310,10 +311,6 @@ func ExposeService(ctx context.Context, host, port, token, servicesID string) er
 		ctx,
 		20*time.Second,
 		func() {
-			// Retrieve current ID for ip in the blockchain
-			//_, found := ledger.GetKey("services_localai", name)
-			// If mismatch, update the blockchain
-			//if !found {
 			updatedMap := map[string]interface{}{}
 			updatedMap[name] = &NodeData{
 				Name: name,
@@ -321,7 +318,6 @@ func ExposeService(ctx context.Context, host, port, token, servicesID string) er
 				ID: nodeID(name),
 			}
 			ledger.Add(servicesID, updatedMap)
-			// }
 		},
 	)
 
@@ -354,7 +350,10 @@ func newNodeOpts(token string) ([]node.Option, error) {
 	if loglevel == "" {
 		loglevel = "info"
 	}
+	libp2ploglevel := os.Getenv("LOCALAI_LIBP2P_LOGLEVEL")
+	if libp2ploglevel == "" {
+		libp2ploglevel = "info"
+	}
 	c := config.Config{
 		Limit: config.ResourceLimit{
 			Enable: noLimits,
@@ -363,7 +362,7 @@ func newNodeOpts(token string) ([]node.Option, error) {
 		NetworkToken:   token,
 		LowProfile:     false,
 		LogLevel:       loglevel,
-		Libp2pLogLevel: "fatal",
+		Libp2pLogLevel: libp2ploglevel,
 		Ledger: config.Ledger{
 			SyncInterval:     defaultInterval,
 			AnnounceInterval: defaultInterval,
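The hard-coded "fatal" libp2p log level becomes configurable through LOCALAI_LIBP2P_LOGLEVEL, following the same read-env-or-default shape as loglevel just above it. That shape factors naturally into a helper; envOrDefault below is a hypothetical name, not LocalAI code:

    package main

    import (
    	"fmt"
    	"os"
    )

    // envOrDefault captures the pattern above: read the variable,
    // fall back to a default when it is unset or empty.
    func envOrDefault(key, def string) string {
    	if v := os.Getenv(key); v != "" {
    		return v
    	}
    	return def
    }

    func main() {
    	libp2ploglevel := envOrDefault("LOCALAI_LIBP2P_LOGLEVEL", "info")
    	fmt.Println("libp2p log level:", libp2ploglevel)
    }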