LocalAI/backend/cpp/llama/utils.hpp
Ettore Di Giacinto adb24214c6
Some checks failed
Explorer deployment / build-linux (push) Has been cancelled
GPU tests / ubuntu-latest (1.21.x) (push) Has been cancelled
generate and publish intel docker caches / generate_caches (intel/oneapi-basekit:2025.1.0-0-devel-ubuntu22.04, linux/amd64, ubuntu-latest) (push) Has been cancelled
build container images / hipblas-jobs (-aio-gpu-hipblas, rocm/dev-ubuntu-22.04:6.1, hipblas, true, ubuntu:22.04, extras, latest-gpu-hipblas, latest-aio-gpu-hipblas, --jobs=3 --output-sync=target, linux/amd64, arc-runner-set, auto, -hipblas) (push) Has been cancelled
build container images / hipblas-jobs (rocm/dev-ubuntu-22.04:6.1, hipblas, true, ubuntu:22.04, core, latest-gpu-hipblas-core, --jobs=3 --output-sync=target, linux/amd64, arc-runner-set, false, -hipblas-core) (push) Has been cancelled
build container images / self-hosted-jobs (-aio-gpu-intel-f16, quay.io/go-skynet/intel-oneapi-base:latest, sycl_f16, true, ubuntu:22.04, extras, latest-gpu-intel-f16, latest-aio-gpu-intel-f16, --jobs=3 --output-sync=target, linux/amd64, arc-runner-set, auto, -sycl-f16) (push) Has been cancelled
build container images / self-hosted-jobs (-aio-gpu-intel-f32, quay.io/go-skynet/intel-oneapi-base:latest, sycl_f32, true, ubuntu:22.04, extras, latest-gpu-intel-f32, latest-aio-gpu-intel-f32, --jobs=3 --output-sync=target, linux/amd64, arc-runner-set, auto, -sycl-f32) (push) Has been cancelled
build container images / self-hosted-jobs (-aio-gpu-nvidia-cuda-11, ubuntu:22.04, cublas, 11, 7, true, extras, latest-gpu-nvidia-cuda-11, latest-aio-gpu-nvidia-cuda-11, --jobs=3 --output-sync=target, linux/amd64, arc-runner-set, auto, -cublas-cuda11) (push) Has been cancelled
build container images / self-hosted-jobs (-aio-gpu-nvidia-cuda-12, ubuntu:22.04, cublas, 12, 0, true, extras, latest-gpu-nvidia-cuda-12, latest-aio-gpu-nvidia-cuda-12, --jobs=3 --output-sync=target, linux/amd64, arc-runner-set, auto, -cublas-cuda12) (push) Has been cancelled
build container images / self-hosted-jobs (quay.io/go-skynet/intel-oneapi-base:latest, sycl_f16, true, ubuntu:22.04, core, latest-gpu-intel-f16-core, --jobs=3 --output-sync=target, linux/amd64, arc-runner-set, false, -sycl-f16-core) (push) Has been cancelled
build container images / self-hosted-jobs (quay.io/go-skynet/intel-oneapi-base:latest, sycl_f32, true, ubuntu:22.04, core, latest-gpu-intel-f32-core, --jobs=3 --output-sync=target, linux/amd64, arc-runner-set, false, -sycl-f32-core) (push) Has been cancelled
build container images / self-hosted-jobs (ubuntu:22.04, , true, extras, --jobs=3 --output-sync=target, linux/amd64, arc-runner-set, auto, ) (push) Has been cancelled
build container images / core-image-build (-aio-cpu, ubuntu:22.04, , true, core, latest-cpu, latest-aio-cpu, --jobs=4 --output-sync=target, linux/amd64,linux/arm64, arc-runner-set, false, auto, -core) (push) Has been cancelled
build container images / core-image-build (ubuntu:22.04, cublas, 11, 7, true, core, latest-gpu-nvidia-cuda-12-core, --jobs=4 --output-sync=target, linux/amd64, arc-runner-set, false, false, -cublas-cuda11-core) (push) Has been cancelled
build container images / core-image-build (ubuntu:22.04, cublas, 12, 0, true, core, latest-gpu-nvidia-cuda-12-core, --jobs=4 --output-sync=target, linux/amd64, arc-runner-set, false, false, -cublas-cuda12-core) (push) Has been cancelled
build container images / core-image-build (ubuntu:22.04, vulkan, true, core, latest-gpu-vulkan-core, --jobs=4 --output-sync=target, linux/amd64, arc-runner-set, false, false, -vulkan-core) (push) Has been cancelled
build container images / gh-runner (nvcr.io/nvidia/l4t-jetpack:r36.4.0, cublas, 12, 0, true, core, latest-nvidia-l4t-arm64-core, --jobs=4 --output-sync=target, linux/arm64, ubuntu-24.04-arm, true, false, -nvidia-l4t-arm64-core) (push) Has been cancelled
Security Scan / tests (push) Has been cancelled
Tests extras backends / tests-transformers (push) Has been cancelled
Tests extras backends / tests-rerankers (push) Has been cancelled
Tests extras backends / tests-diffusers (push) Has been cancelled
Tests extras backends / tests-coqui (push) Has been cancelled
tests / tests-linux (1.21.x) (push) Has been cancelled
tests / tests-aio-container (push) Has been cancelled
tests / tests-apple (1.21.x) (push) Has been cancelled
Update swagger / swagger (push) Has been cancelled
Check if checksums are up-to-date / checksum_check (push) Has been cancelled
Bump dependencies / bump (mudler/LocalAI) (push) Has been cancelled
Bump dependencies / bump (main, PABannier/bark.cpp, BARKCPP_VERSION) (push) Has been cancelled
Bump dependencies / bump (master, ggerganov/whisper.cpp, WHISPER_CPP_VERSION) (push) Has been cancelled
Bump dependencies / bump (master, ggml-org/llama.cpp, CPPLLAMA_VERSION) (push) Has been cancelled
Bump dependencies / bump (master, leejet/stable-diffusion.cpp, STABLEDIFFUSION_GGML_VERSION) (push) Has been cancelled
Bump dependencies / bump (master, mudler/go-piper, PIPER_VERSION) (push) Has been cancelled
Bump dependencies / bump (master, mudler/go-stable-diffusion, STABLEDIFFUSION_VERSION) (push) Has been cancelled
generate and publish GRPC docker caches / generate_caches (ubuntu:22.04, linux/amd64,linux/arm64, arc-runner-set) (push) Has been cancelled
chore(deps): bump llama.cpp to b34c859146630dff136943abc9852ca173a7c9d6 (#5323)
chore(deps): bump llama.cpp to 'b34c859146630dff136943abc9852ca173a7c9d6'

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
2025-05-06 11:21:25 +02:00

483 lines
14 KiB
C++
Vendored

// https://github.com/ggerganov/llama.cpp/blob/master/tools/server/utils.hpp
#pragma once
#include <string>
#include <vector>
#include <set>
#include <mutex>
#include <condition_variable>
#include <unordered_map>
#include "json.hpp"
#include "../mtmd/clip.h"
using json = nlohmann::json;
extern bool server_verbose;
#ifndef SERVER_VERBOSE
#define SERVER_VERBOSE 1
#endif
#if SERVER_VERBOSE != 1
#define LOG_VERBOSE(MSG, ...)
#else
#define LOG_VERBOSE(MSG, ...) \
do \
{ \
if (server_verbose) \
{ \
server_log("VERBOSE", __func__, __LINE__, MSG, __VA_ARGS__); \
} \
} while (0)
#endif
#define LOG_ERROR( MSG, ...) server_log("ERROR", __func__, __LINE__, MSG, __VA_ARGS__)
#define LOG_WARNING(MSG, ...) server_log("WARNING", __func__, __LINE__, MSG, __VA_ARGS__)
#define LOG_INFO( MSG, ...) server_log("INFO", __func__, __LINE__, MSG, __VA_ARGS__)
//
// parallel
//
enum server_state {
SERVER_STATE_LOADING_MODEL, // Server is starting up, model not fully loaded yet
SERVER_STATE_READY, // Server is ready and model is loaded
SERVER_STATE_ERROR // An error occurred, load_model failed
};
enum task_type {
TASK_TYPE_COMPLETION,
TASK_TYPE_CANCEL,
TASK_TYPE_NEXT_RESPONSE
};
struct task_server {
int id = -1; // to be filled by llama_server_queue
int target_id;
task_type type;
json data;
bool infill_mode = false;
bool embedding_mode = false;
int multitask_id = -1;
};
struct task_result {
int id;
int multitask_id = -1;
bool stop;
bool error;
json result_json;
};
struct task_multi {
int id;
std::set<int> subtasks_remaining{};
std::vector<task_result> results{};
};
// TODO: can become bool if we can't find use of more states
enum slot_state
{
IDLE,
PROCESSING,
};
enum slot_command
{
NONE,
LOAD_PROMPT,
RELEASE,
};
struct slot_params
{
bool stream = true;
bool cache_prompt = false; // remember the prompt to avoid reprocessing all prompt
uint32_t seed = -1; // RNG seed
int32_t n_keep = 0; // number of tokens to keep from initial prompt
int32_t n_predict = -1; // new tokens to predict
std::vector<std::string> antiprompt;
json input_prefix;
json input_suffix;
};
struct slot_image
{
int32_t id;
bool request_encode_image = false;
float * image_embedding = nullptr;
int32_t image_tokens = 0;
clip_image_u8 * img_data;
std::string prefix_prompt; // before of this image
};
// completion token output with probabilities
struct completion_token_output
{
struct token_prob
{
llama_token tok;
float prob;
};
std::vector<token_prob> probs;
llama_token tok;
std::string text_to_send;
};
static inline void server_log(const char *level, const char *function, int line,
const char *message, const nlohmann::ordered_json &extra)
{
nlohmann::ordered_json log
{
{"timestamp", time(nullptr)},
{"level", level},
{"function", function},
{"line", line},
{"message", message},
};
if (!extra.empty())
{
log.merge_patch(extra);
}
const std::string str = log.dump(-1, ' ', false, json::error_handler_t::replace);
printf("%.*s\n", (int)str.size(), str.data());
fflush(stdout);
}
//
// server utils
//
template <typename T>
static T json_value(const json &body, const std::string &key, const T &default_value)
{
// Fallback null to default value
return body.contains(key) && !body.at(key).is_null()
? body.value(key, default_value)
: default_value;
}
inline std::string format_chatml(std::vector<json> messages)
{
std::ostringstream chatml_msgs;
for (auto it = messages.begin(); it != messages.end(); ++it) {
chatml_msgs << "<|im_start|>"
<< json_value(*it, "role", std::string("user")) << '\n';
chatml_msgs << json_value(*it, "content", std::string(""))
<< "<|im_end|>\n";
}
chatml_msgs << "<|im_start|>assistant" << '\n';
return chatml_msgs.str();
}
//
// work queue utils
//
struct llama_server_queue {
int id = 0;
std::mutex mutex_tasks;
// queues
std::vector<task_server> queue_tasks;
std::vector<task_server> queue_tasks_deferred;
std::vector<task_multi> queue_multitasks;
std::condition_variable condition_tasks;
// callback functions
std::function<void(task_server&)> callback_new_task;
std::function<void(task_multi&)> callback_finish_multitask;
std::function<void(void)> callback_all_task_finished;
// Add a new task to the end of the queue
int post(task_server task) {
std::unique_lock<std::mutex> lock(mutex_tasks);
if (task.id == -1) {
task.id = id++;
}
queue_tasks.push_back(std::move(task));
condition_tasks.notify_one();
return task.id;
}
// Add a new task, but defer until one slot is available
void defer(task_server task) {
std::unique_lock<std::mutex> lock(mutex_tasks);
queue_tasks_deferred.push_back(std::move(task));
}
// Get the next id for creating anew task
int get_new_id() {
std::unique_lock<std::mutex> lock(mutex_tasks);
return id++;
}
// Register function to process a new task
void on_new_task(std::function<void(task_server&)> callback) {
callback_new_task = callback;
}
// Register function to process a multitask
void on_finish_multitask(std::function<void(task_multi&)> callback) {
callback_finish_multitask = callback;
}
// Register the function to be called when the batch of tasks is finished
void on_all_tasks_finished(std::function<void(void)> callback) {
callback_all_task_finished = callback;
}
// Call when the state of one slot is changed
void notify_slot_changed() {
// move deferred tasks back to main loop
std::unique_lock<std::mutex> lock(mutex_tasks);
for (auto & task : queue_tasks_deferred) {
queue_tasks.push_back(std::move(task));
}
queue_tasks_deferred.clear();
}
// Start the main loop. This call is blocking
[[noreturn]]
void start_loop() {
while (true) {
// new task arrived
LOG_VERBOSE("have new task", {});
{
while (true)
{
std::unique_lock<std::mutex> lock(mutex_tasks);
if (queue_tasks.empty()) {
lock.unlock();
break;
}
task_server task = queue_tasks.front();
queue_tasks.erase(queue_tasks.begin());
lock.unlock();
LOG_VERBOSE("callback_new_task", {});
callback_new_task(task);
}
LOG_VERBOSE("callback_all_task_finished", {});
// process and update all the multitasks
auto queue_iterator = queue_multitasks.begin();
while (queue_iterator != queue_multitasks.end())
{
if (queue_iterator->subtasks_remaining.empty())
{
// all subtasks done == multitask is done
task_multi current_multitask = *queue_iterator;
callback_finish_multitask(current_multitask);
// remove this multitask
queue_iterator = queue_multitasks.erase(queue_iterator);
}
else
{
++queue_iterator;
}
}
// all tasks in the current loop is finished
callback_all_task_finished();
}
LOG_VERBOSE("wait for new task", {});
// wait for new task
{
std::unique_lock<std::mutex> lock(mutex_tasks);
if (queue_tasks.empty()) {
condition_tasks.wait(lock, [&]{
return !queue_tasks.empty();
});
}
}
}
}
//
// functions to manage multitasks
//
// add a multitask by specifying the id of all subtask (subtask is a task_server)
void add_multitask(int multitask_id, std::vector<int>& sub_ids)
{
std::lock_guard<std::mutex> lock(mutex_tasks);
task_multi multi;
multi.id = multitask_id;
std::copy(sub_ids.begin(), sub_ids.end(), std::inserter(multi.subtasks_remaining, multi.subtasks_remaining.end()));
queue_multitasks.push_back(multi);
}
// updatethe remaining subtasks, while appending results to multitask
void update_multitask(int multitask_id, int subtask_id, task_result& result)
{
std::lock_guard<std::mutex> lock(mutex_tasks);
for (auto& multitask : queue_multitasks)
{
if (multitask.id == multitask_id)
{
multitask.subtasks_remaining.erase(subtask_id);
multitask.results.push_back(result);
}
}
}
};
struct llama_server_response {
typedef std::function<void(int, int, task_result&)> callback_multitask_t;
callback_multitask_t callback_update_multitask;
// for keeping track of all tasks waiting for the result
std::set<int> waiting_task_ids;
// the main result queue
std::vector<task_result> queue_results;
std::mutex mutex_results;
std::condition_variable condition_results;
void add_waiting_task_id(int task_id) {
std::unique_lock<std::mutex> lock(mutex_results);
waiting_task_ids.insert(task_id);
}
void remove_waiting_task_id(int task_id) {
std::unique_lock<std::mutex> lock(mutex_results);
waiting_task_ids.erase(task_id);
}
// This function blocks the thread until there is a response for this task_id
task_result recv(int task_id) {
while (true)
{
std::unique_lock<std::mutex> lock(mutex_results);
condition_results.wait(lock, [&]{
return !queue_results.empty();
});
LOG_VERBOSE("condition_results unblock", {});
for (int i = 0; i < (int) queue_results.size(); i++)
{
if (queue_results[i].id == task_id)
{
assert(queue_results[i].multitask_id == -1);
task_result res = queue_results[i];
queue_results.erase(queue_results.begin() + i);
return res;
}
}
}
// should never reach here
}
// Register the function to update multitask
void on_multitask_update(callback_multitask_t callback) {
callback_update_multitask = callback;
}
// Send a new result to a waiting task_id
void send(task_result result) {
std::unique_lock<std::mutex> lock(mutex_results);
LOG_VERBOSE("send new result", {});
for (auto& task_id : waiting_task_ids) {
// LOG_TEE("waiting task id %i \n", task_id);
// for now, tasks that have associated parent multitasks just get erased once multitask picks up the result
if (result.multitask_id == task_id)
{
LOG_VERBOSE("callback_update_multitask", {});
callback_update_multitask(task_id, result.id, result);
continue;
}
if (result.id == task_id)
{
LOG_VERBOSE("queue_results.push_back", {});
queue_results.push_back(result);
condition_results.notify_one();
return;
}
}
}
};
//
// base64 utils (TODO: move to common in the future)
//
static const std::string base64_chars =
"ABCDEFGHIJKLMNOPQRSTUVWXYZ"
"abcdefghijklmnopqrstuvwxyz"
"0123456789+/";
static inline bool is_base64(uint8_t c)
{
return (isalnum(c) || (c == '+') || (c == '/'));
}
static inline std::vector<uint8_t> base64_decode(const std::string & encoded_string)
{
int i = 0;
int j = 0;
int in_ = 0;
int in_len = encoded_string.size();
uint8_t char_array_4[4];
uint8_t char_array_3[3];
std::vector<uint8_t> ret;
while (in_len-- && (encoded_string[in_] != '=') && is_base64(encoded_string[in_]))
{
char_array_4[i++] = encoded_string[in_]; in_++;
if (i == 4)
{
for (i = 0; i <4; i++)
{
char_array_4[i] = base64_chars.find(char_array_4[i]);
}
char_array_3[0] = ((char_array_4[0] ) << 2) + ((char_array_4[1] & 0x30) >> 4);
char_array_3[1] = ((char_array_4[1] & 0xf) << 4) + ((char_array_4[2] & 0x3c) >> 2);
char_array_3[2] = ((char_array_4[2] & 0x3) << 6) + char_array_4[3];
for (i = 0; (i < 3); i++)
{
ret.push_back(char_array_3[i]);
}
i = 0;
}
}
if (i)
{
for (j = i; j <4; j++)
{
char_array_4[j] = 0;
}
for (j = 0; j <4; j++)
{
char_array_4[j] = base64_chars.find(char_array_4[j]);
}
char_array_3[0] = ((char_array_4[0] ) << 2) + ((char_array_4[1] & 0x30) >> 4);
char_array_3[1] = ((char_array_4[1] & 0xf) << 4) + ((char_array_4[2] & 0x3c) >> 2);
char_array_3[2] = ((char_array_4[2] & 0x3) << 6) + char_array_4[3];
for (j = 0; (j < i - 1); j++)
{
ret.push_back(char_array_3[j]);
}
}
return ret;
}