From fdd95d1d8695d005f7047bd03fbe6a2257652c60 Mon Sep 17 00:00:00 2001
From: Ettore Di Giacinto
Date: Thu, 16 Nov 2023 08:20:05 +0100
Subject: [PATCH] feat: allow to run parallel requests (#1290)

* feat: allow to run parallel requests

Signed-off-by: Ettore Di Giacinto

* fixup

Signed-off-by: Ettore Di Giacinto

---------

Signed-off-by: Ettore Di Giacinto
---
 .env                           |  5 ++-
 api/backend/options.go         |  4 ++
 api/localai/backend_monitor.go |  4 +-
 api/options/options.go         |  9 ++++-
 main.go                        | 11 +++++-
 pkg/model/initializers.go      | 69 +++++++++++++++++++++-------------
 pkg/model/loader.go            | 26 ++++++++-----
 pkg/model/options.go           |  5 +++
 pkg/model/process.go           |  2 +-
 9 files changed, 91 insertions(+), 44 deletions(-)

diff --git a/.env b/.env
index 8e0b0500..c4691deb 100644
--- a/.env
+++ b/.env
@@ -69,4 +69,7 @@ MODELS_PATH=/models
 # PYTHON_GRPC_MAX_WORKERS=1
 
 ### Define the number of parallel LLAMA.cpp workers (Defaults to 1)
-# LLAMACPP_PARALLEL=1
\ No newline at end of file
+# LLAMACPP_PARALLEL=1
+
+### Enable to run parallel requests
+# PARALLEL_REQUESTS=true
\ No newline at end of file
diff --git a/api/backend/options.go b/api/backend/options.go
index e8913f8c..c83cb92b 100644
--- a/api/backend/options.go
+++ b/api/backend/options.go
@@ -16,6 +16,10 @@ func modelOpts(c config.Config, o *options.Option, opts []model.Option) []model.
 		opts = append(opts, model.WithSingleActiveBackend())
 	}
 
+	if o.ParallelBackendRequests {
+		opts = append(opts, model.EnableParallelRequests)
+	}
+
 	if c.GRPC.Attempts != 0 {
 		opts = append(opts, model.WithGRPCAttempts(c.GRPC.Attempts))
 	}
diff --git a/api/localai/backend_monitor.go b/api/localai/backend_monitor.go
index e8b53556..e841c5f8 100644
--- a/api/localai/backend_monitor.go
+++ b/api/localai/backend_monitor.go
@@ -125,11 +125,11 @@ func BackendMonitorEndpoint(bm BackendMonitor) func(c *fiber.Ctx) error {
 
 		client := bm.options.Loader.CheckIsLoaded(backendId)
 
-		if client == nil {
+		if client == "" {
 			return fmt.Errorf("backend %s is not currently loaded", backendId)
 		}
 
-		status, rpcErr := client.Status(context.TODO())
+		status, rpcErr := client.GRPC().Status(context.TODO())
 		if rpcErr != nil {
 			log.Warn().Msgf("backend %s experienced an error retrieving status info: %s", backendId, rpcErr.Error())
 			val, slbErr := bm.SampleLocalBackendProcess(backendId)
diff --git a/api/options/options.go b/api/options/options.go
index 13c9d607..9488e549 100644
--- a/api/options/options.go
+++ b/api/options/options.go
@@ -5,9 +5,9 @@ import (
 	"embed"
 	"encoding/json"
 
+	"github.com/go-skynet/LocalAI/metrics"
 	"github.com/go-skynet/LocalAI/pkg/gallery"
 	model "github.com/go-skynet/LocalAI/pkg/model"
-	"github.com/go-skynet/LocalAI/metrics"
 	"github.com/rs/zerolog/log"
 )
 
@@ -36,7 +36,8 @@ type Option struct {
 
 	AutoloadGalleries bool
 
-	SingleBackend bool
+	SingleBackend           bool
+	ParallelBackendRequests bool
 }
 
 type AppOption func(*Option)
@@ -66,6 +67,10 @@ var EnableSingleBackend = func(o *Option) {
 	o.SingleBackend = true
 }
 
+var EnableParallelBackendRequests = func(o *Option) {
+	o.ParallelBackendRequests = true
+}
+
 var EnableGalleriesAutoload = func(o *Option) {
 	o.AutoloadGalleries = true
 }
diff --git a/main.go b/main.go
index 73a18fd0..bc9d1bba 100644
--- a/main.go
+++ b/main.go
@@ -16,9 +16,9 @@ import (
 	config "github.com/go-skynet/LocalAI/api/config"
 	"github.com/go-skynet/LocalAI/api/options"
 	"github.com/go-skynet/LocalAI/internal"
+	"github.com/go-skynet/LocalAI/metrics"
 	"github.com/go-skynet/LocalAI/pkg/gallery"
 	model "github.com/go-skynet/LocalAI/pkg/model"
-	"github.com/go-skynet/LocalAI/metrics"
 	"github.com/rs/zerolog"
 	"github.com/rs/zerolog/log"
 	progressbar "github.com/schollz/progressbar/v3"
@@ -63,6 +63,11 @@ func main() {
 			EnvVars: []string{"SINGLE_ACTIVE_BACKEND"},
 			Usage:   "Allow only one backend to be running.",
 		},
+		&cli.BoolFlag{
+			Name:    "parallel-requests",
+			EnvVars: []string{"PARALLEL_REQUESTS"},
+			Usage:   "Enable backends to handle multiple requests in parallel. This is for backends that supports multiple requests in parallel, like llama.cpp or vllm",
+		},
 		&cli.BoolFlag{
 			Name:    "cors",
 			EnvVars: []string{"CORS"},
@@ -193,7 +198,9 @@ For a list of compatible model, check out: https://localai.io/model-compatibilit
 				options.WithUploadLimitMB(ctx.Int("upload-limit")),
 				options.WithApiKeys(ctx.StringSlice("api-keys")),
 			}
-
+			if ctx.Bool("parallel-requests") {
+				opts = append(opts, options.EnableParallelBackendRequests)
+			}
 			if ctx.Bool("single-active-backend") {
 				opts = append(opts, options.EnableSingleBackend)
 			}
diff --git a/pkg/model/initializers.go b/pkg/model/initializers.go
index 535e21e5..c303e64d 100644
--- a/pkg/model/initializers.go
+++ b/pkg/model/initializers.go
@@ -61,11 +61,11 @@ var AutoLoadBackends []string = []string{
 
 // starts the grpcModelProcess for the backend, and returns a grpc client
 // It also loads the model
-func (ml *ModelLoader) grpcModel(backend string, o *Options) func(string, string) (*grpc.Client, error) {
-	return func(modelName, modelFile string) (*grpc.Client, error) {
+func (ml *ModelLoader) grpcModel(backend string, o *Options) func(string, string) (ModelAddress, error) {
+	return func(modelName, modelFile string) (ModelAddress, error) {
 		log.Debug().Msgf("Loading Model %s with gRPC (file: %s) (backend: %s): %+v", modelName, modelFile, backend, *o)
 
-		var client *grpc.Client
+		var client ModelAddress
 
 		getFreeAddress := func() (string, error) {
 			port, err := freeport.GetFreePort()
@@ -82,46 +82,46 @@ func (ml *ModelLoader) grpcModel(backend string, o *Options) func(string, string
 			if _, err := os.Stat(uri); err == nil {
 				serverAddress, err := getFreeAddress()
 				if err != nil {
-					return nil, fmt.Errorf("failed allocating free ports: %s", err.Error())
+					return "", fmt.Errorf("failed allocating free ports: %s", err.Error())
 				}
 				// Make sure the process is executable
 				if err := ml.startProcess(uri, o.model, serverAddress); err != nil {
-					return nil, err
+					return "", err
 				}
 
 				log.Debug().Msgf("GRPC Service Started")
 
-				client = grpc.NewClient(serverAddress)
+				client = ModelAddress(serverAddress)
 			} else {
 				// address
-				client = grpc.NewClient(uri)
+				client = ModelAddress(uri)
 			}
 		} else {
 			grpcProcess := filepath.Join(o.assetDir, "backend-assets", "grpc", backend)
 			// Check if the file exists
 			if _, err := os.Stat(grpcProcess); os.IsNotExist(err) {
-				return nil, fmt.Errorf("grpc process not found: %s. some backends(stablediffusion, tts) require LocalAI compiled with GO_TAGS", grpcProcess)
+				return "", fmt.Errorf("grpc process not found: %s. some backends(stablediffusion, tts) require LocalAI compiled with GO_TAGS", grpcProcess)
 			}
 
 			serverAddress, err := getFreeAddress()
 			if err != nil {
-				return nil, fmt.Errorf("failed allocating free ports: %s", err.Error())
+				return "", fmt.Errorf("failed allocating free ports: %s", err.Error())
 			}
 
 			// Make sure the process is executable
 			if err := ml.startProcess(grpcProcess, o.model, serverAddress); err != nil {
-				return nil, err
+				return "", err
 			}
 
 			log.Debug().Msgf("GRPC Service Started")
 
-			client = grpc.NewClient(serverAddress)
+			client = ModelAddress(serverAddress)
 		}
 
 		// Wait for the service to start up
 		ready := false
 		for i := 0; i < o.grpcAttempts; i++ {
-			if client.HealthCheck(context.Background()) {
+			if client.GRPC().HealthCheck(context.Background()) {
 				log.Debug().Msgf("GRPC Service Ready")
 				ready = true
 				break
@@ -131,7 +131,7 @@ func (ml *ModelLoader) grpcModel(backend string, o *Options) func(string, string
 
 		if !ready {
 			log.Debug().Msgf("GRPC Service NOT ready")
-			return nil, fmt.Errorf("grpc service not ready")
+			return "", fmt.Errorf("grpc service not ready")
 		}
 
 		options := *o.gRPCOptions
@@ -140,19 +140,30 @@ func (ml *ModelLoader) grpcModel(backend string, o *Options) func(string, string
 
 		log.Debug().Msgf("GRPC: Loading model with options: %+v", options)
 
-		res, err := client.LoadModel(o.context, &options)
+		res, err := client.GRPC().LoadModel(o.context, &options)
 		if err != nil {
-			return nil, fmt.Errorf("could not load model: %w", err)
+			return "", fmt.Errorf("could not load model: %w", err)
 		}
 		if !res.Success {
-			return nil, fmt.Errorf("could not load model (no success): %s", res.Message)
+			return "", fmt.Errorf("could not load model (no success): %s", res.Message)
 		}
 
 		return client, nil
 	}
 }
 
-func (ml *ModelLoader) BackendLoader(opts ...Option) (model *grpc.Client, err error) {
+func (ml *ModelLoader) resolveAddress(addr ModelAddress, parallel bool) (*grpc.Client, error) {
+	if parallel {
+		return addr.GRPC(), nil
+	}
+
+	if _, ok := ml.grpcClients[string(addr)]; !ok {
+		ml.grpcClients[string(addr)] = addr.GRPC()
+	}
+	return ml.grpcClients[string(addr)], nil
+}
+
+func (ml *ModelLoader) BackendLoader(opts ...Option) (client *grpc.Client, err error) {
 	o := NewOptions(opts...)
 
 	log.Debug().Msgf("Loading model %s from %s", o.backendString, o.model)
@@ -166,22 +177,25 @@ func (ml *ModelLoader) BackendLoader(opts ...Option) (model *grpc.Client, err er
 		ml.mu.Unlock()
 	}
 
-	// if an external backend is provided, use it
-	_, externalBackendExists := o.externalBackends[backend]
-	if externalBackendExists {
-		return ml.LoadModel(o.model, ml.grpcModel(backend, o))
-	}
+	var backendToConsume string
 
 	switch backend {
 	case Gpt4AllLlamaBackend, Gpt4AllMptBackend, Gpt4AllJBackend, Gpt4All:
 		o.gRPCOptions.LibrarySearchPath = filepath.Join(o.assetDir, "backend-assets", "gpt4all")
-		return ml.LoadModel(o.model, ml.grpcModel(Gpt4All, o))
+		backendToConsume = Gpt4All
 	case PiperBackend:
 		o.gRPCOptions.LibrarySearchPath = filepath.Join(o.assetDir, "backend-assets", "espeak-ng-data")
-		return ml.LoadModel(o.model, ml.grpcModel(PiperBackend, o))
+		backendToConsume = PiperBackend
 	default:
-		return ml.LoadModel(o.model, ml.grpcModel(backend, o))
+		backendToConsume = backend
 	}
+
+	addr, err := ml.LoadModel(o.model, ml.grpcModel(backendToConsume, o))
+	if err != nil {
+		return nil, err
+	}
+
+	return ml.resolveAddress(addr, o.parallelRequests)
 }
 
 func (ml *ModelLoader) GreedyLoader(opts ...Option) (*grpc.Client, error) {
@@ -190,10 +204,11 @@ func (ml *ModelLoader) GreedyLoader(opts ...Option) (*grpc.Client, error) {
 	ml.mu.Lock()
 	// Return earlier if we have a model already loaded
 	// (avoid looping through all the backends)
-	if m := ml.CheckIsLoaded(o.model); m != nil {
+	if m := ml.CheckIsLoaded(o.model); m != "" {
 		log.Debug().Msgf("Model '%s' already loaded", o.model)
 		ml.mu.Unlock()
-		return m, nil
+
+		return ml.resolveAddress(m, o.parallelRequests)
 	}
 	// If we can have only one backend active, kill all the others (except external backends)
 	if o.singleActiveBackend {
diff --git a/pkg/model/loader.go b/pkg/model/loader.go
index e4a4437c..c9471f1c 100644
--- a/pkg/model/loader.go
+++ b/pkg/model/loader.go
@@ -59,15 +59,23 @@ type ModelLoader struct {
 	ModelPath string
 	mu        sync.Mutex
 	// TODO: this needs generics
-	models map[string]*grpc.Client
+	grpcClients map[string]*grpc.Client
+	models      map[string]ModelAddress
 	grpcProcesses map[string]*process.Process
 	templates     map[TemplateType]map[string]*template.Template
 }
 
+type ModelAddress string
+
+func (m ModelAddress) GRPC() *grpc.Client {
+	return grpc.NewClient(string(m))
+}
+
 func NewModelLoader(modelPath string) *ModelLoader {
 	nml := &ModelLoader{
 		ModelPath: modelPath,
-		models:    make(map[string]*grpc.Client),
+		grpcClients: make(map[string]*grpc.Client),
+		models:      make(map[string]ModelAddress),
 		templates: make(map[TemplateType]map[string]*template.Template),
 		grpcProcesses: make(map[string]*process.Process),
 	}
@@ -98,12 +106,12 @@ func (ml *ModelLoader) ListModels() ([]string, error) {
 	return models, nil
 }
 
-func (ml *ModelLoader) LoadModel(modelName string, loader func(string, string) (*grpc.Client, error)) (*grpc.Client, error) {
+func (ml *ModelLoader) LoadModel(modelName string, loader func(string, string) (ModelAddress, error)) (ModelAddress, error) {
 	ml.mu.Lock()
 	defer ml.mu.Unlock()
 
 	// Check if we already have a loaded model
-	if model := ml.CheckIsLoaded(modelName); model != nil {
+	if model := ml.CheckIsLoaded(modelName); model != "" {
 		return model, nil
 	}
 
@@ -113,7 +121,7 @@ func (ml *ModelLoader) LoadModel(modelName string, loader func(string, string) (
 
 	model, err := loader(modelName, modelFile)
 	if err != nil {
-		return nil, err
+		return "", err
 	}
 
 	// TODO: Add a helper method to iterate all prompt templates associated with a config if and only if it's YAML?
@@ -138,24 +146,24 @@ func (ml *ModelLoader) ShutdownModel(modelName string) error {
 	return ml.deleteProcess(modelName)
 }
 
-func (ml *ModelLoader) CheckIsLoaded(s string) *grpc.Client {
+func (ml *ModelLoader) CheckIsLoaded(s string) ModelAddress {
 	if m, ok := ml.models[s]; ok {
 		log.Debug().Msgf("Model already loaded in memory: %s", s)
 
-		if !m.HealthCheck(context.Background()) {
+		if !m.GRPC().HealthCheck(context.Background()) {
 			log.Debug().Msgf("GRPC Model not responding: %s", s)
 			if !ml.grpcProcesses[s].IsAlive() {
 				log.Debug().Msgf("GRPC Process is not responding: %s", s)
 				// stop and delete the process, this forces to re-load the model and re-create again the service
 				ml.deleteProcess(s)
-				return nil
+				return ""
 			}
 		}
 
 		return m
 	}
 
-	return nil
+	return ""
 }
 
 func (ml *ModelLoader) EvaluateTemplateForPrompt(templateType TemplateType, templateName string, in PromptTemplateData) (string, error) {
diff --git a/pkg/model/options.go b/pkg/model/options.go
index faaf6fb2..5748be9b 100644
--- a/pkg/model/options.go
+++ b/pkg/model/options.go
@@ -20,10 +20,15 @@ type Options struct {
 	grpcAttempts        int
 	grpcAttemptsDelay   int
 	singleActiveBackend bool
+	parallelRequests    bool
 }
 
 type Option func(*Options)
 
+var EnableParallelRequests = func(o *Options) {
+	o.parallelRequests = true
+}
+
 func WithExternalBackend(name string, uri string) Option {
 	return func(o *Options) {
 		if o.externalBackends == nil {
diff --git a/pkg/model/process.go b/pkg/model/process.go
index 156f4195..7048499d 100644
--- a/pkg/model/process.go
+++ b/pkg/model/process.go
@@ -17,7 +17,7 @@ import (
 func (ml *ModelLoader) StopAllExcept(s string) {
 	ml.StopGRPC(func(id string, p *process.Process) bool {
 		if id != s {
-			for ml.models[id].IsBusy() {
+			for ml.models[id].GRPC().IsBusy() {
 				log.Debug().Msgf("%s busy. Waiting.", id)
 				time.Sleep(2 * time.Second)
 			}