2024-03-01 10:19:53 -05:00
package services
import (
"context"
"fmt"
"strings"
"github.com/go-skynet/LocalAI/core/config"
"github.com/go-skynet/LocalAI/core/schema"
"github.com/go-skynet/LocalAI/pkg/grpc/proto"
"github.com/go-skynet/LocalAI/pkg/model"
"github.com/rs/zerolog/log"
gopsutil "github.com/shirou/gopsutil/v3/process"
)
2024-04-17 23:33:49 +02:00
type BackendMonitor struct {
2024-03-01 10:19:53 -05:00
configLoader * config . BackendConfigLoader
modelLoader * model . ModelLoader
options * config . ApplicationConfig // Taking options in case we need to inspect ExternalGRPCBackends, though that's out of scope for now, hence the name.
}
2024-04-17 23:33:49 +02:00
func NewBackendMonitor ( configLoader * config . BackendConfigLoader , modelLoader * model . ModelLoader , appConfig * config . ApplicationConfig ) BackendMonitor {
return BackendMonitor {
2024-03-01 10:19:53 -05:00
configLoader : configLoader ,
modelLoader : modelLoader ,
options : appConfig ,
}
}
2024-04-17 23:33:49 +02:00
func ( bm BackendMonitor ) getModelLoaderIDFromModelName ( modelName string ) ( string , error ) {
config , exists := bm . configLoader . GetBackendConfig ( modelName )
2024-03-01 10:19:53 -05:00
var backendId string
if exists {
backendId = config . Model
} else {
// Last ditch effort: use it raw, see if a backend happens to match.
backendId = modelName
}
if ! strings . HasSuffix ( backendId , ".bin" ) {
backendId = fmt . Sprintf ( "%s.bin" , backendId )
}
return backendId , nil
}
2024-04-17 23:33:49 +02:00
func ( bm * BackendMonitor ) SampleLocalBackendProcess ( model string ) ( * schema . BackendMonitorResponse , error ) {
config , exists := bm . configLoader . GetBackendConfig ( model )
2024-03-01 10:19:53 -05:00
var backend string
if exists {
backend = config . Model
} else {
// Last ditch effort: use it raw, see if a backend happens to match.
backend = model
}
if ! strings . HasSuffix ( backend , ".bin" ) {
backend = fmt . Sprintf ( "%s.bin" , backend )
}
2024-04-17 23:33:49 +02:00
pid , err := bm . modelLoader . GetGRPCPID ( backend )
2024-03-01 10:19:53 -05:00
if err != nil {
2024-04-04 02:24:22 -05:00
log . Error ( ) . Err ( err ) . Str ( "model" , model ) . Msg ( "failed to find GRPC pid" )
2024-03-01 10:19:53 -05:00
return nil , err
}
// Name is slightly frightening but this does _not_ create a new process, rather it looks up an existing process by PID.
backendProcess , err := gopsutil . NewProcess ( int32 ( pid ) )
if err != nil {
2024-04-04 02:24:22 -05:00
log . Error ( ) . Err ( err ) . Str ( "model" , model ) . Int ( "pid" , pid ) . Msg ( "error getting process info" )
2024-03-01 10:19:53 -05:00
return nil , err
}
memInfo , err := backendProcess . MemoryInfo ( )
if err != nil {
2024-04-04 02:24:22 -05:00
log . Error ( ) . Err ( err ) . Str ( "model" , model ) . Int ( "pid" , pid ) . Msg ( "error getting memory info" )
2024-03-01 10:19:53 -05:00
return nil , err
}
memPercent , err := backendProcess . MemoryPercent ( )
if err != nil {
2024-04-04 02:24:22 -05:00
log . Error ( ) . Err ( err ) . Str ( "model" , model ) . Int ( "pid" , pid ) . Msg ( "error getting memory percent" )
2024-03-01 10:19:53 -05:00
return nil , err
}
cpuPercent , err := backendProcess . CPUPercent ( )
if err != nil {
2024-04-04 02:24:22 -05:00
log . Error ( ) . Err ( err ) . Str ( "model" , model ) . Int ( "pid" , pid ) . Msg ( "error getting cpu percent" )
2024-03-01 10:19:53 -05:00
return nil , err
}
return & schema . BackendMonitorResponse {
MemoryInfo : memInfo ,
MemoryPercent : memPercent ,
CPUPercent : cpuPercent ,
} , nil
}
2024-04-17 23:33:49 +02:00
func ( bm BackendMonitor ) CheckAndSample ( modelName string ) ( * proto . StatusResponse , error ) {
backendId , err := bm . getModelLoaderIDFromModelName ( modelName )
2024-03-01 10:19:53 -05:00
if err != nil {
return nil , err
}
2024-04-17 23:33:49 +02:00
modelAddr := bm . modelLoader . CheckIsLoaded ( backendId )
2024-03-01 10:19:53 -05:00
if modelAddr == "" {
return nil , fmt . Errorf ( "backend %s is not currently loaded" , backendId )
}
status , rpcErr := modelAddr . GRPC ( false , nil ) . Status ( context . TODO ( ) )
if rpcErr != nil {
log . Warn ( ) . Msgf ( "backend %s experienced an error retrieving status info: %s" , backendId , rpcErr . Error ( ) )
2024-04-17 23:33:49 +02:00
val , slbErr := bm . SampleLocalBackendProcess ( backendId )
2024-03-01 10:19:53 -05:00
if slbErr != nil {
return nil , fmt . Errorf ( "backend %s experienced an error retrieving status info via rpc: %s, then failed local node process sample: %s" , backendId , rpcErr . Error ( ) , slbErr . Error ( ) )
}
return & proto . StatusResponse {
State : proto . StatusResponse_ERROR ,
Memory : & proto . MemoryUsageData {
Total : val . MemoryInfo . VMS ,
Breakdown : map [ string ] uint64 {
"gopsutil-RSS" : val . MemoryInfo . RSS ,
} ,
} ,
} , nil
}
return status , nil
}
2024-04-17 23:33:49 +02:00
func ( bm BackendMonitor ) ShutdownModel ( modelName string ) error {
backendId , err := bm . getModelLoaderIDFromModelName ( modelName )
2024-03-01 10:19:53 -05:00
if err != nil {
return err
}
2024-04-17 23:33:49 +02:00
return bm . modelLoader . ShutdownModel ( backendId )
2024-03-01 10:19:53 -05:00
}