chore(refactor): track grpcProcess in the model structure (#3663)

* chore(refactor): track grpcProcess in the model structure

This avoids having to handle the data belonging to the same model in two
separate places, and makes the state easier to track and to guard with a
mutex.

This also fixes race conditions when accessing the model.
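
The shape of the change, as a minimal sketch (hypothetical
loaderBefore/loaderAfter names, a trimmed-down Model, and error logging
simplified; the real types are in the diffs below):

    package model

    import (
        "sync"

        process "github.com/mudler/go-processmanager"
    )

    // Trimmed-down stand-in for the real Model type.
    type Model struct{ p *process.Process }

    func (m *Model) Process() *process.Process { return m.p }

    // Before: two maps keyed by model name that had to be kept in sync.
    type loaderBefore struct {
        mu            sync.Mutex
        models        map[string]*Model
        grpcProcesses map[string]*process.Process
    }

    // After: the process handle travels with the model, so deleting a
    // model can no longer leave a stale process entry behind.
    type loaderAfter struct {
        mu     sync.Mutex
        models map[string]*Model // each Model carries its own *process.Process
    }

    func (ml *loaderAfter) deleteProcess(name string) {
        ml.mu.Lock()
        defer ml.mu.Unlock()
        if m, ok := ml.models[name]; ok {
            if p := m.Process(); p != nil {
                _ = p.Stop() // the real code logs this error instead of discarding it
            }
        }
        delete(ml.models, name)
    }

As part of this, LoadModel now takes the loader mutex for the whole load
(see the loader diff below), so concurrent loads of the same model are
serialized instead of only synchronizing on the final map update.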

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* chore(tests): run protogen-go before starting aio tests

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* chore(tests): install protoc in aio tests

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

---------

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

Ettore Di Giacinto 2024-09-26 12:44:55 +02:00 committed by GitHub
parent 3d12d2037c
commit fa5c98549a
7 changed files with 71 additions and 44 deletions

@@ -178,13 +178,22 @@ jobs:
         uses: actions/checkout@v4
         with:
           submodules: true
+      - name: Dependencies
+        run: |
+          # Install protoc
+          curl -L -s https://github.com/protocolbuffers/protobuf/releases/download/v26.1/protoc-26.1-linux-x86_64.zip -o protoc.zip && \
+          unzip -j -d /usr/local/bin protoc.zip bin/protoc && \
+          rm protoc.zip
+          go install google.golang.org/protobuf/cmd/protoc-gen-go@v1.34.2
+          go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@1958fcbe2ca8bd93af633f11e97d44e567e945af
+          PATH="$PATH:$HOME/go/bin" make protogen-go
       - name: Build images
         run: |
           docker build --build-arg FFMPEG=true --build-arg IMAGE_TYPE=extras --build-arg EXTRA_BACKENDS=rerankers --build-arg MAKEFLAGS="--jobs=5 --output-sync=target" -t local-ai:tests -f Dockerfile .
           BASE_IMAGE=local-ai:tests DOCKER_AIO_IMAGE=local-ai-aio:test make docker-aio
       - name: Test
         run: |
-          LOCALAI_MODELS_DIR=$PWD/models LOCALAI_IMAGE_TAG=test LOCALAI_IMAGE=local-ai-aio \
+          PATH="$PATH:$HOME/go/bin" LOCALAI_MODELS_DIR=$PWD/models LOCALAI_IMAGE_TAG=test LOCALAI_IMAGE=local-ai-aio \
           make run-e2e-aio
       - name: Setup tmate session if tests fail
         if: ${{ failure() }}

@@ -468,7 +468,7 @@ run-e2e-image:
 	ls -liah $(abspath ./tests/e2e-fixtures)
 	docker run -p 5390:8080 -e MODELS_PATH=/models -e THREADS=1 -e DEBUG=true -d --rm -v $(TEST_DIR):/models --gpus all --name e2e-tests-$(RANDOM) localai-tests

-run-e2e-aio:
+run-e2e-aio: protogen-go
 	@echo 'Running e2e AIO tests'
 	$(GOCMD) run github.com/onsi/ginkgo/v2/ginkgo --flake-attempts 5 -v -r ./tests/e2e-aio

@@ -304,18 +304,19 @@ func (ml *ModelLoader) grpcModel(backend string, o *Options) func(string, string
 				return nil, fmt.Errorf("failed allocating free ports: %s", err.Error())
 			}
 			// Make sure the process is executable
-			if err := ml.startProcess(uri, o.model, serverAddress); err != nil {
+			process, err := ml.startProcess(uri, o.model, serverAddress)
+			if err != nil {
 				log.Error().Err(err).Str("path", uri).Msg("failed to launch ")
 				return nil, err
 			}

 			log.Debug().Msgf("GRPC Service Started")

-			client = NewModel(modelName, serverAddress)
+			client = NewModel(modelName, serverAddress, process)
 		} else {
 			log.Debug().Msg("external backend is uri")
 			// address
-			client = NewModel(modelName, uri)
+			client = NewModel(modelName, uri, nil)
 		}
 	} else {
 		grpcProcess := backendPath(o.assetDir, backend)
@@ -346,13 +347,14 @@ func (ml *ModelLoader) grpcModel(backend string, o *Options) func(string, string
 		args, grpcProcess = library.LoadLDSO(o.assetDir, args, grpcProcess)

 		// Make sure the process is executable in any circumstance
-		if err := ml.startProcess(grpcProcess, o.model, serverAddress, args...); err != nil {
+		process, err := ml.startProcess(grpcProcess, o.model, serverAddress, args...)
+		if err != nil {
 			return nil, err
 		}

 		log.Debug().Msgf("GRPC Service Started")

-		client = NewModel(modelName, serverAddress)
+		client = NewModel(modelName, serverAddress, process)
 	}

 	log.Debug().Msgf("Wait for the service to start up")
@@ -374,6 +376,7 @@ func (ml *ModelLoader) grpcModel(backend string, o *Options) func(string, string
 	if !ready {
 		log.Debug().Msgf("GRPC Service NOT ready")
+		ml.deleteProcess(o.model)
 		return nil, fmt.Errorf("grpc service not ready")
 	}
@@ -385,9 +388,11 @@ func (ml *ModelLoader) grpcModel(backend string, o *Options) func(string, string
 	res, err := client.GRPC(o.parallelRequests, ml.wd).LoadModel(o.context, &options)
 	if err != nil {
+		ml.deleteProcess(o.model)
 		return nil, fmt.Errorf("could not load model: %w", err)
 	}
 	if !res.Success {
+		ml.deleteProcess(o.model)
 		return nil, fmt.Errorf("could not load model (no success): %s", res.Message)
 	}

@@ -13,7 +13,6 @@ import (
 	"github.com/mudler/LocalAI/pkg/utils"
-	process "github.com/mudler/go-processmanager"
 	"github.com/rs/zerolog/log"
 )
@@ -21,20 +20,18 @@ import (
 // TODO: Split ModelLoader and TemplateLoader? Just to keep things more organized. Left together to share a mutex until I look into that. Would split if we seperate directories for .bin/.yaml and .tmpl
 type ModelLoader struct {
-	ModelPath     string
-	mu            sync.Mutex
-	models        map[string]*Model
-	grpcProcesses map[string]*process.Process
-	templates     *templates.TemplateCache
-	wd            *WatchDog
+	ModelPath string
+	mu        sync.Mutex
+	models    map[string]*Model
+	templates *templates.TemplateCache
+	wd        *WatchDog
 }

 func NewModelLoader(modelPath string) *ModelLoader {
 	nml := &ModelLoader{
-		ModelPath:     modelPath,
-		models:        make(map[string]*Model),
-		templates:     templates.NewTemplateCache(modelPath),
-		grpcProcesses: make(map[string]*process.Process),
+		ModelPath: modelPath,
+		models:    make(map[string]*Model),
+		templates: templates.NewTemplateCache(modelPath),
 	}

 	return nml
@@ -127,6 +124,8 @@ func (ml *ModelLoader) LoadModel(modelName string, loader func(string, string) (
 	modelFile := filepath.Join(ml.ModelPath, modelName)
 	log.Debug().Msgf("Loading model in memory from file: %s", modelFile)

+	ml.mu.Lock()
+	defer ml.mu.Unlock()
 	model, err := loader(modelName, modelFile)
 	if err != nil {
 		return nil, err
@@ -136,8 +135,6 @@ func (ml *ModelLoader) LoadModel(modelName string, loader func(string, string) (
 		return nil, fmt.Errorf("loader didn't return a model")
 	}

-	ml.mu.Lock()
-	defer ml.mu.Unlock()
 	ml.models[modelName] = model

 	return model, nil
@@ -146,14 +143,13 @@ func (ml *ModelLoader) LoadModel(modelName string, loader func(string, string) (
 func (ml *ModelLoader) ShutdownModel(modelName string) error {
 	ml.mu.Lock()
 	defer ml.mu.Unlock()
-	_, ok := ml.models[modelName]
+	model, ok := ml.models[modelName]
 	if !ok {
 		return fmt.Errorf("model %s not found", modelName)
 	}

 	retries := 1
-	for ml.models[modelName].GRPC(false, ml.wd).IsBusy() {
+	for model.GRPC(false, ml.wd).IsBusy() {
 		log.Debug().Msgf("%s busy. Waiting.", modelName)
 		dur := time.Duration(retries*2) * time.Second
 		if dur > retryTimeout {
@@ -185,8 +181,8 @@ func (ml *ModelLoader) CheckIsLoaded(s string) *Model {
 	if !alive {
 		log.Warn().Msgf("GRPC Model not responding: %s", err.Error())
 		log.Warn().Msgf("Deleting the process in order to recreate it")
-		process, exists := ml.grpcProcesses[s]
-		if !exists {
+		process := m.Process()
+		if process == nil {
 			log.Error().Msgf("Process not found for '%s' and the model is not responding anymore !", s)
 			return m
 		}

@@ -63,7 +63,7 @@ var _ = Describe("ModelLoader", func() {
 	Context("LoadModel", func() {
 		It("should load a model and keep it in memory", func() {
-			mockModel = model.NewModel("foo", "test.model")
+			mockModel = model.NewModel("foo", "test.model", nil)

 			mockLoader := func(modelName, modelFile string) (*model.Model, error) {
 				return mockModel, nil
@@ -88,7 +88,7 @@ var _ = Describe("ModelLoader", func() {
 	Context("ShutdownModel", func() {
 		It("should shutdown a loaded model", func() {
-			mockModel = model.NewModel("foo", "test.model")
+			mockModel = model.NewModel("foo", "test.model", nil)

 			mockLoader := func(modelName, modelFile string) (*model.Model, error) {
 				return mockModel, nil

@@ -1,20 +1,32 @@
 package model

-import grpc "github.com/mudler/LocalAI/pkg/grpc"
+import (
+	"sync"
+
+	grpc "github.com/mudler/LocalAI/pkg/grpc"
+	process "github.com/mudler/go-processmanager"
+)

 type Model struct {
 	ID      string `json:"id"`
 	address string
 	client  grpc.Backend
+	process *process.Process
+	sync.Mutex
 }

-func NewModel(ID, address string) *Model {
+func NewModel(ID, address string, process *process.Process) *Model {
 	return &Model{
 		ID:      ID,
 		address: address,
+		process: process,
 	}
 }

+func (m *Model) Process() *process.Process {
+	return m.process
+}
+
 func (m *Model) GRPC(parallel bool, wd *WatchDog) grpc.Backend {
 	if m.client != nil {
 		return m.client
@@ -25,6 +37,8 @@ func (m *Model) GRPC(parallel bool, wd *WatchDog) grpc.Backend {
 		enableWD = true
 	}

+	m.Lock()
+	defer m.Unlock()
 	m.client = grpc.NewClient(m.address, parallel, wd, enableWD)
 	return m.client
 }

@@ -16,20 +16,22 @@ import (
 )

 func (ml *ModelLoader) deleteProcess(s string) error {
-	if _, exists := ml.grpcProcesses[s]; exists {
-		if err := ml.grpcProcesses[s].Stop(); err != nil {
-			log.Error().Err(err).Msgf("(deleteProcess) error while deleting grpc process %s", s)
+	if m, exists := ml.models[s]; exists {
+		process := m.Process()
+		if process != nil {
+			if err := process.Stop(); err != nil {
+				log.Error().Err(err).Msgf("(deleteProcess) error while deleting process %s", s)
+			}
 		}
 	}
-	delete(ml.grpcProcesses, s)
 	delete(ml.models, s)
 	return nil
 }

 func (ml *ModelLoader) StopGRPC(filter GRPCProcessFilter) error {
 	var err error = nil
-	for k, p := range ml.grpcProcesses {
-		if filter(k, p) {
+	for k, m := range ml.models {
+		if filter(k, m.Process()) {
 			e := ml.ShutdownModel(k)
 			err = errors.Join(err, e)
 		}
@@ -44,17 +46,20 @@ func (ml *ModelLoader) StopAllGRPC() error {
 func (ml *ModelLoader) GetGRPCPID(id string) (int, error) {
 	ml.mu.Lock()
 	defer ml.mu.Unlock()
-	p, exists := ml.grpcProcesses[id]
+	p, exists := ml.models[id]
 	if !exists {
 		return -1, fmt.Errorf("no grpc backend found for %s", id)
 	}
-	return strconv.Atoi(p.PID)
+	if p.Process() == nil {
+		return -1, fmt.Errorf("no grpc backend found for %s", id)
+	}
+	return strconv.Atoi(p.Process().PID)
 }

-func (ml *ModelLoader) startProcess(grpcProcess, id string, serverAddress string, args ...string) error {
+func (ml *ModelLoader) startProcess(grpcProcess, id string, serverAddress string, args ...string) (*process.Process, error) {
 	// Make sure the process is executable
 	if err := os.Chmod(grpcProcess, 0700); err != nil {
-		return err
+		return nil, err
 	}

 	log.Debug().Msgf("Loading GRPC Process: %s", grpcProcess)
@@ -63,7 +68,7 @@ func (ml *ModelLoader) startProcess(grpcProcess, id string, serverAddress string
 	workDir, err := filepath.Abs(filepath.Dir(grpcProcess))
 	if err != nil {
-		return err
+		return nil, err
 	}

 	grpcControlProcess := process.New(
@@ -79,10 +84,8 @@ func (ml *ModelLoader) startProcess(grpcProcess, id string, serverAddress string
 		ml.wd.AddAddressModelMap(serverAddress, id)
 	}

-	ml.grpcProcesses[id] = grpcControlProcess
-
 	if err := grpcControlProcess.Run(); err != nil {
-		return err
+		return grpcControlProcess, err
 	}

 	log.Debug().Msgf("GRPC Service state dir: %s", grpcControlProcess.StateDir())
@@ -116,5 +119,5 @@ func (ml *ModelLoader) startProcess(grpcProcess, id string, serverAddress string
 		}
 	}()

-	return nil
+	return grpcControlProcess, nil
 }