From 05f70044875e18747c08605740f3a59169e998bf Mon Sep 17 00:00:00 2001
From: Ettore Di Giacinto
Date: Tue, 1 Apr 2025 00:01:10 +0200
Subject: [PATCH] fix: race during stop of active backends (#5106)

* chore: drop double call to stop all backends, refactors

Signed-off-by: Ettore Di Giacinto

* fix: do lock when cycling to models to delete

Signed-off-by: Ettore Di Giacinto

---------

Signed-off-by: Ettore Di Giacinto
---
 .env                      |  3 +++
 pkg/model/initializers.go | 20 ++++++++++++--------
 pkg/model/loader.go       | 20 --------------------
 pkg/model/process.go      | 39 ++++++++++++++++++++++++++++++---------
 4 files changed, 45 insertions(+), 37 deletions(-)

diff --git a/.env b/.env
index ee8db74e..86596105 100644
--- a/.env
+++ b/.env
@@ -29,6 +29,9 @@
 ## Enable/Disable single backend (useful if only one GPU is available)
 # LOCALAI_SINGLE_ACTIVE_BACKEND=true
 
+# Forces shutdown of the backends if busy (only if LOCALAI_SINGLE_ACTIVE_BACKEND is set)
+# LOCALAI_FORCE_BACKEND_SHUTDOWN=true
+
 ## Specify a build type. Available: cublas, openblas, clblas.
 ## cuBLAS: This is a GPU-accelerated version of the complete standard BLAS (Basic Linear Algebra Subprograms) library. It's provided by Nvidia and is part of their CUDA toolkit.
 ## OpenBLAS: This is an open-source implementation of the BLAS library that aims to provide highly optimized code for various platforms. It includes support for multi-threading and can be compiled to use hardware-specific features for additional performance. OpenBLAS can run on many kinds of hardware, including CPUs from Intel, AMD, and ARM.
diff --git a/pkg/model/initializers.go b/pkg/model/initializers.go
index 1a51eb2a..12a1a972 100644
--- a/pkg/model/initializers.go
+++ b/pkg/model/initializers.go
@@ -473,8 +473,6 @@ func (ml *ModelLoader) backendLoader(opts ...Option) (client grpc.Backend, err e
 		backend = realBackend
 	}
 
-	ml.stopActiveBackends(o.modelID, o.singleActiveBackend)
-
 	var backendToConsume string
 
 	switch backend {
@@ -497,13 +495,17 @@ func (ml *ModelLoader) backendLoader(opts ...Option) (client grpc.Backend, err e
 }
 
 func (ml *ModelLoader) stopActiveBackends(modelID string, singleActiveBackend bool) {
+	if !singleActiveBackend {
+		return
+	}
+
 	// If we can have only one backend active, kill all the others (except external backends)
-	if singleActiveBackend {
-		log.Debug().Msgf("Stopping all backends except '%s'", modelID)
-		err := ml.StopGRPC(allExcept(modelID))
-		if err != nil {
-			log.Error().Err(err).Str("keptModel", modelID).Msg("error while shutting down all backends except for the keptModel - greedyloader continuing")
-		}
+
+	// Stop all backends except the one we are going to load
+	log.Debug().Msgf("Stopping all backends except '%s'", modelID)
+	err := ml.StopGRPC(allExcept(modelID))
+	if err != nil {
+		log.Error().Err(err).Str("keptModel", modelID).Msg("error while shutting down all backends except for the keptModel - greedyloader continuing")
 	}
 }
 
@@ -520,10 +522,12 @@ func (ml *ModelLoader) Load(opts ...Option) (grpc.Backend, error) {
 
 	ml.stopActiveBackends(o.modelID, o.singleActiveBackend)
 
+	// if a backend is defined, return the loader directly
 	if o.backendString != "" {
 		return ml.backendLoader(opts...)
 	}
 
+	// Otherwise scan for backends in the asset directory
 	var err error
 
 	// get backends embedded in the binary
diff --git a/pkg/model/loader.go b/pkg/model/loader.go
index bb9bdd8a..c25662d3 100644
--- a/pkg/model/loader.go
+++ b/pkg/model/loader.go
@@ -142,26 +142,6 @@ func (ml *ModelLoader) LoadModel(modelID, modelName string, loader func(string,
 func (ml *ModelLoader) ShutdownModel(modelName string) error {
 	ml.mu.Lock()
 	defer ml.mu.Unlock()
-	model, ok := ml.models[modelName]
-	if !ok {
-		return fmt.Errorf("model %s not found", modelName)
-	}
-
-	retries := 1
-	for model.GRPC(false, ml.wd).IsBusy() {
-		log.Debug().Msgf("%s busy. Waiting.", modelName)
-		dur := time.Duration(retries*2) * time.Second
-		if dur > retryTimeout {
-			dur = retryTimeout
-		}
-		time.Sleep(dur)
-		retries++
-
-		if retries > 10 && os.Getenv("LOCALAI_FORCE_BACKEND_SHUTDOWN") == "true" {
-			log.Warn().Msgf("Model %s is still busy after %d retries. Forcing shutdown.", modelName, retries)
-			break
-		}
-	}
 
 	return ml.deleteProcess(modelName)
 }
diff --git a/pkg/model/process.go b/pkg/model/process.go
index c27fbda3..2e8369a0 100644
--- a/pkg/model/process.go
+++ b/pkg/model/process.go
@@ -9,25 +9,43 @@ import (
 	"strconv"
 	"strings"
 	"syscall"
+	"time"
 
 	"github.com/hpcloud/tail"
 	process "github.com/mudler/go-processmanager"
 	"github.com/rs/zerolog/log"
 )
 
+var forceBackendShutdown bool = os.Getenv("LOCALAI_FORCE_BACKEND_SHUTDOWN") == "true"
+
 func (ml *ModelLoader) deleteProcess(s string) error {
+	model, ok := ml.models[s]
+	if !ok {
+		log.Debug().Msgf("Model %s not found", s)
+		return fmt.Errorf("model %s not found", s)
+	}
+	defer delete(ml.models, s)
+	retries := 1
+	for model.GRPC(false, ml.wd).IsBusy() {
+		log.Debug().Msgf("%s busy. Waiting.", s)
+		dur := time.Duration(retries*2) * time.Second
+		if dur > retryTimeout {
+			dur = retryTimeout
+		}
+		time.Sleep(dur)
+		retries++
+
+		if retries > 10 && forceBackendShutdown {
+			log.Warn().Msgf("Model %s is still busy after %d retries. Forcing shutdown.", s, retries)
+			break
+		}
+	}
+
 	log.Debug().Msgf("Deleting process %s", s)
 
-	m, exists := ml.models[s]
-	if !exists {
-		log.Error().Msgf("Model does not exist %s", s)
-		// Nothing to do
-		return nil
-	}
-
-	process := m.Process()
+	process := model.Process()
 	if process == nil {
 		log.Error().Msgf("No process for %s", s)
 		// Nothing to do as there is no process
 		return nil
 	}
@@ -44,9 +62,12 @@ func (ml *ModelLoader) deleteProcess(s string) error {
 
 func (ml *ModelLoader) StopGRPC(filter GRPCProcessFilter) error {
 	var err error = nil
+	ml.mu.Lock()
+	defer ml.mu.Unlock()
+
 	for k, m := range ml.models {
 		if filter(k, m.Process()) {
-			e := ml.ShutdownModel(k)
+			e := ml.deleteProcess(k)
 			err = errors.Join(err, e)
 		}
 	}
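
For illustration only, not part of the patch: a minimal, self-contained Go sketch of the pattern the diff converges on — one mutex guards the model map, the stop path holds that lock for the whole iteration while deleting entries, and the busy-wait/forced-shutdown logic lives in a single place. All names here (Loader, Model, StopAllExcept, the force flag) are illustrative stand-ins, not LocalAI APIs.

package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

const retryTimeout = 10 * time.Second

// Model stands in for a loaded backend process.
type Model struct {
	name string
	busy bool
}

func (m *Model) IsBusy() bool { return m.busy }

// Loader stands in for the model loader; mu guards the models map.
type Loader struct {
	mu     sync.Mutex
	models map[string]*Model
	force  bool // illustrative stand-in for LOCALAI_FORCE_BACKEND_SHUTDOWN
}

// deleteProcess expects the caller to hold l.mu, mirroring the patch, where
// both the shutdown path and StopGRPC take the lock before deleting entries.
func (l *Loader) deleteProcess(name string) error {
	m, ok := l.models[name]
	if !ok {
		return fmt.Errorf("model %s not found", name)
	}
	defer delete(l.models, name)

	// Back off (with a capped delay) while the backend is busy,
	// optionally forcing shutdown after enough retries.
	retries := 1
	for m.IsBusy() {
		dur := time.Duration(retries*2) * time.Second
		if dur > retryTimeout {
			dur = retryTimeout
		}
		time.Sleep(dur)
		retries++
		if retries > 10 && l.force {
			break
		}
	}
	fmt.Printf("stopping %s\n", name)
	return nil
}

// StopAllExcept holds the lock for the whole iteration, so no concurrent
// load or shutdown can mutate the map while entries are being removed.
func (l *Loader) StopAllExcept(keep string) error {
	l.mu.Lock()
	defer l.mu.Unlock()

	var err error
	for name := range l.models {
		if name != keep {
			err = errors.Join(err, l.deleteProcess(name))
		}
	}
	return err
}

func main() {
	l := &Loader{models: map[string]*Model{
		"llama": {name: "llama"},
		"bert":  {name: "bert"},
	}}
	if err := l.StopAllExcept("llama"); err != nil {
		fmt.Println("error:", err)
	}
	fmt.Println("models left:", len(l.models))
}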