package grpc import ( "context" "fmt" "io" "sync" "time" pb "github.com/mudler/LocalAI/pkg/grpc/proto" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" ) type Client struct { address string busy bool parallel bool sync.Mutex opMutex sync.Mutex wd WatchDog } type WatchDog interface { Mark(address string) UnMark(address string) } func (c *Client) IsBusy() bool { c.Lock() defer c.Unlock() return c.busy } func (c *Client) setBusy(v bool) { c.Lock() c.busy = v c.Unlock() } func (c *Client) wdMark() { if c.wd != nil { c.wd.Mark(c.address) } } func (c *Client) wdUnMark() { if c.wd != nil { c.wd.UnMark(c.address) } } func (c *Client) HealthCheck(ctx context.Context) (bool, error) { if !c.parallel { c.opMutex.Lock() defer c.opMutex.Unlock() } c.setBusy(true) defer c.setBusy(false) conn, err := grpc.Dial(c.address, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { return false, err } defer conn.Close() client := pb.NewBackendClient(conn) // The healthcheck call shouldn't take long time ctx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() res, err := client.Health(ctx, &pb.HealthMessage{}) if err != nil { return false, err } if string(res.Message) == "OK" { return true, nil } return false, fmt.Errorf("health check failed: %s", res.Message) } func (c *Client) Embeddings(ctx context.Context, in *pb.PredictOptions, opts ...grpc.CallOption) (*pb.EmbeddingResult, error) { if !c.parallel { c.opMutex.Lock() defer c.opMutex.Unlock() } c.setBusy(true) defer c.setBusy(false) c.wdMark() defer c.wdUnMark() conn, err := grpc.Dial(c.address, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { return nil, err } defer conn.Close() client := pb.NewBackendClient(conn) return client.Embedding(ctx, in, opts...) } func (c *Client) Predict(ctx context.Context, in *pb.PredictOptions, opts ...grpc.CallOption) (*pb.Reply, error) { if !c.parallel { c.opMutex.Lock() defer c.opMutex.Unlock() } c.setBusy(true) defer c.setBusy(false) c.wdMark() defer c.wdUnMark() conn, err := grpc.Dial(c.address, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { return nil, err } defer conn.Close() client := pb.NewBackendClient(conn) return client.Predict(ctx, in, opts...) } func (c *Client) LoadModel(ctx context.Context, in *pb.ModelOptions, opts ...grpc.CallOption) (*pb.Result, error) { if !c.parallel { c.opMutex.Lock() defer c.opMutex.Unlock() } c.setBusy(true) defer c.setBusy(false) c.wdMark() defer c.wdUnMark() conn, err := grpc.Dial(c.address, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { return nil, err } defer conn.Close() client := pb.NewBackendClient(conn) return client.LoadModel(ctx, in, opts...) } func (c *Client) PredictStream(ctx context.Context, in *pb.PredictOptions, f func(s []byte), opts ...grpc.CallOption) error { if !c.parallel { c.opMutex.Lock() defer c.opMutex.Unlock() } c.setBusy(true) defer c.setBusy(false) c.wdMark() defer c.wdUnMark() conn, err := grpc.Dial(c.address, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { return err } defer conn.Close() client := pb.NewBackendClient(conn) stream, err := client.PredictStream(ctx, in, opts...) if err != nil { return err } for { feature, err := stream.Recv() if err == io.EOF { break } if err != nil { fmt.Println("Error", err) return err } f(feature.GetMessage()) } return nil } func (c *Client) GenerateImage(ctx context.Context, in *pb.GenerateImageRequest, opts ...grpc.CallOption) (*pb.Result, error) { if !c.parallel { c.opMutex.Lock() defer c.opMutex.Unlock() } c.setBusy(true) defer c.setBusy(false) c.wdMark() defer c.wdUnMark() conn, err := grpc.Dial(c.address, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { return nil, err } defer conn.Close() client := pb.NewBackendClient(conn) return client.GenerateImage(ctx, in, opts...) } func (c *Client) TTS(ctx context.Context, in *pb.TTSRequest, opts ...grpc.CallOption) (*pb.Result, error) { if !c.parallel { c.opMutex.Lock() defer c.opMutex.Unlock() } c.setBusy(true) defer c.setBusy(false) c.wdMark() defer c.wdUnMark() conn, err := grpc.Dial(c.address, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { return nil, err } defer conn.Close() client := pb.NewBackendClient(conn) return client.TTS(ctx, in, opts...) } func (c *Client) SoundGeneration(ctx context.Context, in *pb.SoundGenerationRequest, opts ...grpc.CallOption) (*pb.Result, error) { if !c.parallel { c.opMutex.Lock() defer c.opMutex.Unlock() } c.setBusy(true) defer c.setBusy(false) c.wdMark() defer c.wdUnMark() conn, err := grpc.Dial(c.address, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { return nil, err } defer conn.Close() client := pb.NewBackendClient(conn) return client.SoundGeneration(ctx, in, opts...) } func (c *Client) AudioTranscription(ctx context.Context, in *pb.TranscriptRequest, opts ...grpc.CallOption) (*pb.TranscriptResult, error) { if !c.parallel { c.opMutex.Lock() defer c.opMutex.Unlock() } c.setBusy(true) defer c.setBusy(false) c.wdMark() defer c.wdUnMark() conn, err := grpc.Dial(c.address, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { return nil, err } defer conn.Close() client := pb.NewBackendClient(conn) return client.AudioTranscription(ctx, in, opts...) } func (c *Client) TokenizeString(ctx context.Context, in *pb.PredictOptions, opts ...grpc.CallOption) (*pb.TokenizationResponse, error) { if !c.parallel { c.opMutex.Lock() defer c.opMutex.Unlock() } c.setBusy(true) defer c.setBusy(false) c.wdMark() defer c.wdUnMark() conn, err := grpc.Dial(c.address, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { return nil, err } defer conn.Close() client := pb.NewBackendClient(conn) res, err := client.TokenizeString(ctx, in, opts...) if err != nil { return nil, err } return res, nil } func (c *Client) Status(ctx context.Context) (*pb.StatusResponse, error) { if !c.parallel { c.opMutex.Lock() defer c.opMutex.Unlock() } c.setBusy(true) defer c.setBusy(false) conn, err := grpc.Dial(c.address, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { return nil, err } defer conn.Close() client := pb.NewBackendClient(conn) return client.Status(ctx, &pb.HealthMessage{}) } func (c *Client) StoresSet(ctx context.Context, in *pb.StoresSetOptions, opts ...grpc.CallOption) (*pb.Result, error) { if !c.parallel { c.opMutex.Lock() defer c.opMutex.Unlock() } c.setBusy(true) defer c.setBusy(false) c.wdMark() defer c.wdUnMark() conn, err := grpc.Dial(c.address, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { return nil, err } defer conn.Close() client := pb.NewBackendClient(conn) return client.StoresSet(ctx, in, opts...) } func (c *Client) StoresDelete(ctx context.Context, in *pb.StoresDeleteOptions, opts ...grpc.CallOption) (*pb.Result, error) { if !c.parallel { c.opMutex.Lock() defer c.opMutex.Unlock() } c.wdMark() defer c.wdUnMark() c.setBusy(true) defer c.setBusy(false) conn, err := grpc.Dial(c.address, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { return nil, err } defer conn.Close() client := pb.NewBackendClient(conn) return client.StoresDelete(ctx, in, opts...) } func (c *Client) StoresGet(ctx context.Context, in *pb.StoresGetOptions, opts ...grpc.CallOption) (*pb.StoresGetResult, error) { if !c.parallel { c.opMutex.Lock() defer c.opMutex.Unlock() } c.setBusy(true) defer c.setBusy(false) c.wdMark() defer c.wdUnMark() conn, err := grpc.Dial(c.address, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { return nil, err } defer conn.Close() client := pb.NewBackendClient(conn) return client.StoresGet(ctx, in, opts...) } func (c *Client) StoresFind(ctx context.Context, in *pb.StoresFindOptions, opts ...grpc.CallOption) (*pb.StoresFindResult, error) { if !c.parallel { c.opMutex.Lock() defer c.opMutex.Unlock() } c.setBusy(true) defer c.setBusy(false) c.wdMark() defer c.wdUnMark() conn, err := grpc.Dial(c.address, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { return nil, err } defer conn.Close() client := pb.NewBackendClient(conn) return client.StoresFind(ctx, in, opts...) } func (c *Client) Rerank(ctx context.Context, in *pb.RerankRequest, opts ...grpc.CallOption) (*pb.RerankResult, error) { if !c.parallel { c.opMutex.Lock() defer c.opMutex.Unlock() } c.setBusy(true) defer c.setBusy(false) c.wdMark() defer c.wdUnMark() conn, err := grpc.Dial(c.address, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { return nil, err } defer conn.Close() client := pb.NewBackendClient(conn) return client.Rerank(ctx, in, opts...) } func (c *Client) GetTokenMetrics(ctx context.Context, in *pb.MetricsRequest, opts ...grpc.CallOption) (*pb.MetricsResponse, error) { if !c.parallel { c.opMutex.Lock() defer c.opMutex.Unlock() } c.setBusy(true) defer c.setBusy(false) c.wdMark() defer c.wdUnMark() conn, err := grpc.Dial(c.address, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { return nil, err } defer conn.Close() client := pb.NewBackendClient(conn) return client.GetMetrics(ctx, in, opts...) }