feat(loader): enhance single active backend by treating as singleton (#5107)
Some checks are pending
Explorer deployment / build-linux (push) Waiting to run
GPU tests / ubuntu-latest (1.21.x) (push) Waiting to run
generate and publish intel docker caches / generate_caches (intel/oneapi-basekit:2025.0.0-0-devel-ubuntu22.04, linux/amd64, ubuntu-latest) (push) Waiting to run
build container images / hipblas-jobs (-aio-gpu-hipblas, rocm/dev-ubuntu-22.04:6.1, hipblas, true, ubuntu:22.04, extras, latest-gpu-hipblas, latest-aio-gpu-hipblas, --jobs=3 --output-sync=target, linux/amd64, arc-runner-set, auto, -hipblas-ffmpeg) (push) Waiting to run
build container images / hipblas-jobs (rocm/dev-ubuntu-22.04:6.1, hipblas, false, ubuntu:22.04, core, --jobs=3 --output-sync=target, linux/amd64, arc-runner-set, false, -hipblas-core) (push) Waiting to run
build container images / hipblas-jobs (rocm/dev-ubuntu-22.04:6.1, hipblas, false, ubuntu:22.04, extras, --jobs=3 --output-sync=target, linux/amd64, arc-runner-set, false, -hipblas) (push) Waiting to run
build container images / hipblas-jobs (rocm/dev-ubuntu-22.04:6.1, hipblas, true, ubuntu:22.04, core, --jobs=3 --output-sync=target, linux/amd64, arc-runner-set, false, -hipblas-ffmpeg-core) (push) Waiting to run
build container images / self-hosted-jobs (-aio-gpu-intel-f16, quay.io/go-skynet/intel-oneapi-base:latest, sycl_f16, true, ubuntu:22.04, extras, latest-gpu-intel-f16, latest-aio-gpu-intel-f16, --jobs=3 --output-sync=target, linux/amd64, arc-runner-set, auto, -sycl-f16-ffmpeg) (push) Waiting to run
build container images / self-hosted-jobs (-aio-gpu-intel-f32, quay.io/go-skynet/intel-oneapi-base:latest, sycl_f32, true, ubuntu:22.04, extras, latest-gpu-intel-f32, latest-aio-gpu-intel-f32, --jobs=3 --output-sync=target, linux/amd64, arc-runner-set, auto, -sycl-f32-ffmpeg) (push) Waiting to run
build container images / self-hosted-jobs (-aio-gpu-nvidia-cuda-11, ubuntu:22.04, cublas, 11, 7, true, extras, latest-gpu-nvidia-cuda-11, latest-aio-gpu-nvidia-cuda-11, --jobs=3 --output-sync=target, linux/amd64, arc-runner-set, auto, -cublas-cuda11-ffmpeg) (push) Waiting to run
build container images / self-hosted-jobs (-aio-gpu-nvidia-cuda-12, ubuntu:22.04, cublas, 12, 0, true, extras, latest-gpu-nvidia-cuda-12, latest-aio-gpu-nvidia-cuda-12, --jobs=3 --output-sync=target, linux/amd64, arc-runner-set, auto, -cublas-cuda12-ffmpeg) (push) Waiting to run
build container images / self-hosted-jobs (quay.io/go-skynet/intel-oneapi-base:latest, sycl_f16, false, ubuntu:22.04, core, --jobs=3 --output-sync=target, linux/amd64, arc-runner-set, false, -sycl-f16-core) (push) Waiting to run
build container images / self-hosted-jobs (quay.io/go-skynet/intel-oneapi-base:latest, sycl_f16, true, ubuntu:22.04, core, --jobs=3 --output-sync=target, linux/amd64, arc-runner-set, false, -sycl-f16-ffmpeg-core) (push) Waiting to run
build container images / self-hosted-jobs (quay.io/go-skynet/intel-oneapi-base:latest, sycl_f32, false, ubuntu:22.04, core, --jobs=3 --output-sync=target, linux/amd64, arc-runner-set, false, -sycl-f32-core) (push) Waiting to run
build container images / self-hosted-jobs (quay.io/go-skynet/intel-oneapi-base:latest, sycl_f32, true, ubuntu:22.04, core, --jobs=3 --output-sync=target, linux/amd64, arc-runner-set, false, -sycl-f32-ffmpeg-core) (push) Waiting to run
build container images / self-hosted-jobs (ubuntu:22.04, , , extras, --jobs=3 --output-sync=target, linux/amd64, arc-runner-set, auto, ) (push) Waiting to run
build container images / self-hosted-jobs (ubuntu:22.04, , true, extras, --jobs=3 --output-sync=target, linux/amd64, arc-runner-set, auto, -ffmpeg) (push) Waiting to run
build container images / self-hosted-jobs (ubuntu:22.04, cublas, 11, 7, , extras, --jobs=3 --output-sync=target, linux/amd64, arc-runner-set, false, -cublas-cuda11) (push) Waiting to run
build container images / self-hosted-jobs (ubuntu:22.04, cublas, 12, 0, , extras, --jobs=3 --output-sync=target, linux/amd64, arc-runner-set, false, -cublas-cuda12) (push) Waiting to run
build container images / core-image-build (-aio-cpu, ubuntu:22.04, , true, core, latest-cpu, latest-aio-cpu, --jobs=4 --output-sync=target, linux/amd64,linux/arm64, arc-runner-set, false, auto, -ffmpeg-core) (push) Waiting to run
build container images / core-image-build (ubuntu:22.04, cublas, 11, 7, , core, --jobs=4 --output-sync=target, linux/amd64, arc-runner-set, false, false, -cublas-cuda11-core) (push) Waiting to run
build container images / core-image-build (ubuntu:22.04, cublas, 11, 7, true, core, --jobs=4 --output-sync=target, linux/amd64, arc-runner-set, false, false, -cublas-cuda11-ffmpeg-core) (push) Waiting to run
build container images / core-image-build (ubuntu:22.04, cublas, 12, 0, , core, --jobs=4 --output-sync=target, linux/amd64, arc-runner-set, false, false, -cublas-cuda12-core) (push) Waiting to run
build container images / core-image-build (ubuntu:22.04, cublas, 12, 0, true, core, --jobs=4 --output-sync=target, linux/amd64, arc-runner-set, false, false, -cublas-cuda12-ffmpeg-core) (push) Waiting to run
build container images / core-image-build (ubuntu:22.04, vulkan, true, core, latest-vulkan-ffmpeg-core, --jobs=4 --output-sync=target, linux/amd64, arc-runner-set, false, false, -vulkan-ffmpeg-core) (push) Waiting to run
build container images / gh-runner (nvcr.io/nvidia/l4t-jetpack:r36.4.0, cublas, 12, 0, true, core, latest-nvidia-l4t-arm64-core, --jobs=4 --output-sync=target, linux/arm64, ubuntu-24.04-arm, true, false, -nvidia-l4t-arm64-core) (push) Waiting to run
Security Scan / tests (push) Waiting to run
Tests extras backends / tests-transformers (push) Waiting to run
Tests extras backends / tests-rerankers (push) Waiting to run
Tests extras backends / tests-diffusers (push) Waiting to run
Tests extras backends / tests-coqui (push) Waiting to run
tests / tests-linux (1.21.x) (push) Waiting to run
tests / tests-aio-container (push) Waiting to run
tests / tests-apple (1.21.x) (push) Waiting to run

feat(loader): enhance single active backend by treating at singleton

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
This commit is contained in:
Ettore Di Giacinto 2025-04-01 20:58:11 +02:00 committed by GitHub
parent c59975ab05
commit 2c425e9c69
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
24 changed files with 92 additions and 71 deletions

View File

@ -16,7 +16,7 @@ type Application struct {
func newApplication(appConfig *config.ApplicationConfig) *Application {
return &Application{
backendLoader: config.NewBackendConfigLoader(appConfig.ModelPath),
modelLoader: model.NewModelLoader(appConfig.ModelPath),
modelLoader: model.NewModelLoader(appConfig.ModelPath, appConfig.SingleBackend),
applicationConfig: appConfig,
templatesEvaluator: templates.NewEvaluator(appConfig.ModelPath),
}

View File

@ -143,7 +143,7 @@ func New(opts ...config.AppOption) (*Application, error) {
}()
}
if options.LoadToMemory != nil {
if options.LoadToMemory != nil && !options.SingleBackend {
for _, m := range options.LoadToMemory {
cfg, err := application.BackendLoader().LoadBackendConfigFileByNameDefaultOptions(m, options)
if err != nil {

View File

@ -17,6 +17,7 @@ func ModelEmbedding(s string, tokens []int, loader *model.ModelLoader, backendCo
if err != nil {
return nil, err
}
defer loader.Close()
var fn func() ([]float32, error)
switch model := inferenceModel.(type) {

View File

@ -16,6 +16,7 @@ func ImageGeneration(height, width, mode, step, seed int, positive_prompt, negat
if err != nil {
return nil, err
}
defer loader.Close()
fn := func() error {
_, err := inferenceModel.GenerateImage(

View File

@ -53,6 +53,7 @@ func ModelInference(ctx context.Context, s string, messages []schema.Message, im
if err != nil {
return nil, err
}
defer loader.Close()
var protoMessages []*proto.Message
// if we are using the tokenizer template, we need to convert the messages to proto messages

View File

@ -40,10 +40,6 @@ func ModelOptions(c config.BackendConfig, so *config.ApplicationConfig, opts ...
grpcOpts := grpcModelOpts(c)
defOpts = append(defOpts, model.WithLoadGRPCLoadModelOpts(grpcOpts))
if so.SingleBackend {
defOpts = append(defOpts, model.WithSingleActiveBackend())
}
if so.ParallelBackendRequests {
defOpts = append(defOpts, model.EnableParallelRequests)
}
@ -121,7 +117,7 @@ func grpcModelOpts(c config.BackendConfig) *pb.ModelOptions {
triggers := make([]*pb.GrammarTrigger, 0)
for _, t := range c.FunctionsConfig.GrammarConfig.GrammarTriggers {
triggers = append(triggers, &pb.GrammarTrigger{
Word: t.Word,
Word: t.Word,
})
}
@ -161,33 +157,33 @@ func grpcModelOpts(c config.BackendConfig) *pb.ModelOptions {
DisableLogStatus: c.DisableLogStatus,
DType: c.DType,
// LimitMMPerPrompt vLLM
LimitImagePerPrompt: int32(c.LimitMMPerPrompt.LimitImagePerPrompt),
LimitVideoPerPrompt: int32(c.LimitMMPerPrompt.LimitVideoPerPrompt),
LimitAudioPerPrompt: int32(c.LimitMMPerPrompt.LimitAudioPerPrompt),
MMProj: c.MMProj,
FlashAttention: c.FlashAttention,
CacheTypeKey: c.CacheTypeK,
CacheTypeValue: c.CacheTypeV,
NoKVOffload: c.NoKVOffloading,
YarnExtFactor: c.YarnExtFactor,
YarnAttnFactor: c.YarnAttnFactor,
YarnBetaFast: c.YarnBetaFast,
YarnBetaSlow: c.YarnBetaSlow,
NGQA: c.NGQA,
RMSNormEps: c.RMSNormEps,
MLock: mmlock,
RopeFreqBase: c.RopeFreqBase,
RopeScaling: c.RopeScaling,
Type: c.ModelType,
RopeFreqScale: c.RopeFreqScale,
NUMA: c.NUMA,
Embeddings: embeddings,
LowVRAM: lowVRAM,
NGPULayers: int32(nGPULayers),
MMap: mmap,
MainGPU: c.MainGPU,
Threads: int32(*c.Threads),
TensorSplit: c.TensorSplit,
LimitImagePerPrompt: int32(c.LimitMMPerPrompt.LimitImagePerPrompt),
LimitVideoPerPrompt: int32(c.LimitMMPerPrompt.LimitVideoPerPrompt),
LimitAudioPerPrompt: int32(c.LimitMMPerPrompt.LimitAudioPerPrompt),
MMProj: c.MMProj,
FlashAttention: c.FlashAttention,
CacheTypeKey: c.CacheTypeK,
CacheTypeValue: c.CacheTypeV,
NoKVOffload: c.NoKVOffloading,
YarnExtFactor: c.YarnExtFactor,
YarnAttnFactor: c.YarnAttnFactor,
YarnBetaFast: c.YarnBetaFast,
YarnBetaSlow: c.YarnBetaSlow,
NGQA: c.NGQA,
RMSNormEps: c.RMSNormEps,
MLock: mmlock,
RopeFreqBase: c.RopeFreqBase,
RopeScaling: c.RopeScaling,
Type: c.ModelType,
RopeFreqScale: c.RopeFreqScale,
NUMA: c.NUMA,
Embeddings: embeddings,
LowVRAM: lowVRAM,
NGPULayers: int32(nGPULayers),
MMap: mmap,
MainGPU: c.MainGPU,
Threads: int32(*c.Threads),
TensorSplit: c.TensorSplit,
// AutoGPTQ
ModelBaseName: c.AutoGPTQ.ModelBaseName,
Device: c.AutoGPTQ.Device,

View File

@ -12,10 +12,10 @@ import (
func Rerank(request *proto.RerankRequest, loader *model.ModelLoader, appConfig *config.ApplicationConfig, backendConfig config.BackendConfig) (*proto.RerankResult, error) {
opts := ModelOptions(backendConfig, appConfig)
rerankModel, err := loader.Load(opts...)
if err != nil {
return nil, err
}
defer loader.Close()
if rerankModel == nil {
return nil, fmt.Errorf("could not load rerank model")

View File

@ -26,10 +26,10 @@ func SoundGeneration(
opts := ModelOptions(backendConfig, appConfig)
soundGenModel, err := loader.Load(opts...)
if err != nil {
return "", nil, err
}
defer loader.Close()
if soundGenModel == nil {
return "", nil, fmt.Errorf("could not load sound generation model")

View File

@ -20,6 +20,7 @@ func TokenMetrics(
if err != nil {
return nil, err
}
defer loader.Close()
if model == nil {
return nil, fmt.Errorf("could not loadmodel model")

View File

@ -14,10 +14,10 @@ func ModelTokenize(s string, loader *model.ModelLoader, backendConfig config.Bac
opts := ModelOptions(backendConfig, appConfig)
inferenceModel, err = loader.Load(opts...)
if err != nil {
return schema.TokenizeResponse{}, err
}
defer loader.Close()
predictOptions := gRPCPredictOpts(backendConfig, loader.ModelPath)
predictOptions.Prompt = s

View File

@ -24,6 +24,7 @@ func ModelTranscription(audio, language string, translate bool, ml *model.ModelL
if err != nil {
return nil, err
}
defer ml.Close()
if transcriptionModel == nil {
return nil, fmt.Errorf("could not load transcription model")

View File

@ -23,10 +23,10 @@ func ModelTTS(
) (string, *proto.Result, error) {
opts := ModelOptions(backendConfig, appConfig, model.WithDefaultBackendString(model.PiperBackend))
ttsModel, err := loader.Load(opts...)
if err != nil {
return "", nil, err
}
defer loader.Close()
if ttsModel == nil {
return "", nil, fmt.Errorf("could not load tts model %q", backendConfig.Model)

View File

@ -19,6 +19,8 @@ func VAD(request *schema.VADRequest,
if err != nil {
return nil, err
}
defer ml.Close()
req := proto.VADRequest{
Audio: request.Audio,
}

View File

@ -74,7 +74,7 @@ func (t *SoundGenerationCMD) Run(ctx *cliContext.Context) error {
AssetsDestination: t.BackendAssetsPath,
ExternalGRPCBackends: externalBackends,
}
ml := model.NewModelLoader(opts.ModelPath)
ml := model.NewModelLoader(opts.ModelPath, opts.SingleBackend)
defer func() {
err := ml.StopAllGRPC()

View File

@ -32,7 +32,7 @@ func (t *TranscriptCMD) Run(ctx *cliContext.Context) error {
}
cl := config.NewBackendConfigLoader(t.ModelsPath)
ml := model.NewModelLoader(opts.ModelPath)
ml := model.NewModelLoader(opts.ModelPath, opts.SingleBackend)
if err := cl.LoadBackendConfigsFromPath(t.ModelsPath); err != nil {
return err
}

View File

@ -41,7 +41,7 @@ func (t *TTSCMD) Run(ctx *cliContext.Context) error {
AudioDir: outputDir,
AssetsDestination: t.BackendAssetsPath,
}
ml := model.NewModelLoader(opts.ModelPath)
ml := model.NewModelLoader(opts.ModelPath, opts.SingleBackend)
defer func() {
err := ml.StopAllGRPC()

View File

@ -21,6 +21,7 @@ func StoresSetEndpoint(sl *model.ModelLoader, appConfig *config.ApplicationConfi
if err != nil {
return err
}
defer sl.Close()
vals := make([][]byte, len(input.Values))
for i, v := range input.Values {
@ -48,6 +49,7 @@ func StoresDeleteEndpoint(sl *model.ModelLoader, appConfig *config.ApplicationCo
if err != nil {
return err
}
defer sl.Close()
if err := store.DeleteCols(c.Context(), sb, input.Keys); err != nil {
return err
@ -69,6 +71,7 @@ func StoresGetEndpoint(sl *model.ModelLoader, appConfig *config.ApplicationConfi
if err != nil {
return err
}
defer sl.Close()
keys, vals, err := store.GetCols(c.Context(), sb, input.Keys)
if err != nil {
@ -100,6 +103,7 @@ func StoresFindEndpoint(sl *model.ModelLoader, appConfig *config.ApplicationConf
if err != nil {
return err
}
defer sl.Close()
keys, vals, similarities, err := store.Find(c.Context(), sb, input.Key, input.Topk)
if err != nil {

View File

@ -40,7 +40,7 @@ func TestAssistantEndpoints(t *testing.T) {
cl := &config.BackendConfigLoader{}
//configsDir := "/tmp/localai/configs"
modelPath := "/tmp/localai/model"
var ml = model.NewModelLoader(modelPath)
var ml = model.NewModelLoader(modelPath, false)
appConfig := &config.ApplicationConfig{
ConfigsDir: configsDir,

View File

@ -50,11 +50,10 @@ func RegisterLocalAIRoutes(router *fiber.App,
router.Post("/v1/vad", vadChain...)
// Stores
sl := model.NewModelLoader("")
router.Post("/stores/set", localai.StoresSetEndpoint(sl, appConfig))
router.Post("/stores/delete", localai.StoresDeleteEndpoint(sl, appConfig))
router.Post("/stores/get", localai.StoresGetEndpoint(sl, appConfig))
router.Post("/stores/find", localai.StoresFindEndpoint(sl, appConfig))
router.Post("/stores/set", localai.StoresSetEndpoint(ml, appConfig))
router.Post("/stores/delete", localai.StoresDeleteEndpoint(ml, appConfig))
router.Post("/stores/get", localai.StoresGetEndpoint(ml, appConfig))
router.Post("/stores/find", localai.StoresFindEndpoint(ml, appConfig))
if !appConfig.DisableMetrics {
router.Get("/metrics", localai.LocalAIMetricsEndpoint())

View File

@ -509,7 +509,23 @@ func (ml *ModelLoader) stopActiveBackends(modelID string, singleActiveBackend bo
}
}
func (ml *ModelLoader) Close() {
if !ml.singletonMode {
return
}
ml.singletonLock.Unlock()
}
func (ml *ModelLoader) lockBackend() {
if !ml.singletonMode {
return
}
ml.singletonLock.Lock()
}
func (ml *ModelLoader) Load(opts ...Option) (grpc.Backend, error) {
ml.lockBackend() // grab the singleton lock if needed
o := NewOptions(opts...)
// Return earlier if we have a model already loaded
@ -520,7 +536,7 @@ func (ml *ModelLoader) Load(opts ...Option) (grpc.Backend, error) {
return m.GRPC(o.parallelRequests, ml.wd), nil
}
ml.stopActiveBackends(o.modelID, o.singleActiveBackend)
ml.stopActiveBackends(o.modelID, ml.singletonMode)
// if a backend is defined, return the loader directly
if o.backendString != "" {
@ -533,6 +549,7 @@ func (ml *ModelLoader) Load(opts ...Option) (grpc.Backend, error) {
// get backends embedded in the binary
autoLoadBackends, err := ml.ListAvailableBackends(o.assetDir)
if err != nil {
ml.Close() // we failed, release the lock
return nil, err
}
@ -564,5 +581,7 @@ func (ml *ModelLoader) Load(opts ...Option) (grpc.Backend, error) {
}
}
ml.Close() // make sure to release the lock in case of failure
return nil, fmt.Errorf("could not load model - all backends returned error: %s", err.Error())
}

View File

@ -18,16 +18,19 @@ import (
// TODO: Split ModelLoader and TemplateLoader? Just to keep things more organized. Left together to share a mutex until I look into that. Would split if we seperate directories for .bin/.yaml and .tmpl
type ModelLoader struct {
ModelPath string
mu sync.Mutex
models map[string]*Model
wd *WatchDog
ModelPath string
mu sync.Mutex
singletonLock sync.Mutex
singletonMode bool
models map[string]*Model
wd *WatchDog
}
func NewModelLoader(modelPath string) *ModelLoader {
func NewModelLoader(modelPath string, singleActiveBackend bool) *ModelLoader {
nml := &ModelLoader{
ModelPath: modelPath,
models: make(map[string]*Model),
ModelPath: modelPath,
models: make(map[string]*Model),
singletonMode: singleActiveBackend,
}
return nml

View File

@ -17,10 +17,9 @@ type Options struct {
externalBackends map[string]string
grpcAttempts int
grpcAttemptsDelay int
singleActiveBackend bool
parallelRequests bool
grpcAttempts int
grpcAttemptsDelay int
parallelRequests bool
}
type Option func(*Options)
@ -88,12 +87,6 @@ func WithContext(ctx context.Context) Option {
}
}
func WithSingleActiveBackend() Option {
return func(o *Options) {
o.singleActiveBackend = true
}
}
func WithModelID(id string) Option {
return func(o *Options) {
o.modelID = id

View File

@ -21,7 +21,7 @@ var _ = Describe("ModelLoader", func() {
// Setup the model loader with a test directory
modelPath = "/tmp/test_model_path"
os.Mkdir(modelPath, 0755)
modelLoader = model.NewModelLoader(modelPath)
modelLoader = model.NewModelLoader(modelPath, false)
})
AfterEach(func() {

View File

@ -70,7 +70,7 @@ var _ = Describe("Integration tests for the stores backend(s) and internal APIs"
model.WithModel("test"),
}
sl = model.NewModelLoader("")
sl = model.NewModelLoader("", false)
sc, err = sl.Load(storeOpts...)
Expect(err).ToNot(HaveOccurred())
Expect(sc).ToNot(BeNil())
@ -235,7 +235,7 @@ var _ = Describe("Integration tests for the stores backend(s) and internal APIs"
keys := [][]float32{{1.0, 0.0, 0.0}, {0.0, 1.0, 0.0}, {0.0, 0.0, 1.0}, {-1.0, 0.0, 0.0}}
vals := [][]byte{[]byte("x"), []byte("y"), []byte("z"), []byte("-z")}
err := store.SetCols(context.Background(), sc, keys, vals);
err := store.SetCols(context.Background(), sc, keys, vals)
Expect(err).ToNot(HaveOccurred())
_, _, sims, err := store.Find(context.Background(), sc, keys[0], 4)
@ -247,7 +247,7 @@ var _ = Describe("Integration tests for the stores backend(s) and internal APIs"
keys := [][]float32{{1.0, 0.0, 1.0}, {0.0, 2.0, 0.0}, {0.0, 0.0, -1.0}, {-1.0, 0.0, -1.0}}
vals := [][]byte{[]byte("x"), []byte("y"), []byte("z"), []byte("-z")}
err := store.SetCols(context.Background(), sc, keys, vals);
err := store.SetCols(context.Background(), sc, keys, vals)
Expect(err).ToNot(HaveOccurred())
_, _, sims, err := store.Find(context.Background(), sc, keys[0], 4)
@ -314,7 +314,7 @@ var _ = Describe("Integration tests for the stores backend(s) and internal APIs"
normalize(keys[6:])
err := store.SetCols(context.Background(), sc, keys, vals);
err := store.SetCols(context.Background(), sc, keys, vals)
Expect(err).ToNot(HaveOccurred())
expectTriangleEq(keys, vals)
@ -341,7 +341,7 @@ var _ = Describe("Integration tests for the stores backend(s) and internal APIs"
c += 1
}
err := store.SetCols(context.Background(), sc, keys, vals);
err := store.SetCols(context.Background(), sc, keys, vals)
Expect(err).ToNot(HaveOccurred())
expectTriangleEq(keys, vals)