diff --git a/backend/cpp/llama/grpc-server.cpp b/backend/cpp/llama/grpc-server.cpp index 04c6586c..3bbf7ce0 100644 --- a/backend/cpp/llama/grpc-server.cpp +++ b/backend/cpp/llama/grpc-server.cpp @@ -158,8 +158,8 @@ static std::vector base64_decode(const std::string & encoded_string) // enum task_type { - COMPLETION_TASK, - CANCEL_TASK + TASK_TYPE_COMPLETION, + TASK_TYPE_CANCEL, }; struct task_server { @@ -458,8 +458,12 @@ struct llama_client_slot } bool has_budget(gpt_params &global_params) { + if (params.n_predict == -1 && global_params.n_predict == -1) + { + return true; // limitless + } n_remaining = -1; - if(params.n_predict != -1) + if (params.n_predict != -1) { n_remaining = params.n_predict - n_decoded; } @@ -467,7 +471,7 @@ struct llama_client_slot { n_remaining = global_params.n_predict - n_decoded; } - return n_remaining > 0 || n_remaining == -1; // no budget || limitless + return n_remaining > 0; // no budget } bool available() const { @@ -1113,7 +1117,7 @@ struct llama_server_context } // check the limits - if (slot.n_decoded > 2 && slot.has_next_token && !slot.has_budget(params)) + if (slot.n_decoded > 0 && slot.has_next_token && !slot.has_budget(params)) { slot.stopped_limit = true; slot.has_next_token = false; @@ -1177,8 +1181,9 @@ struct llama_server_context return slot.images.size() > 0; } - void send_error(task_server& task, std::string error) + void send_error(task_server& task, const std::string &error) { + LOG_TEE("task %i - error: %s\n", task.id, error.c_str()); std::unique_lock lock(mutex_results); task_result res; res.id = task.id; @@ -1276,7 +1281,7 @@ struct llama_server_context { std::vector probs_output = {}; const std::vector to_send_toks = llama_tokenize(ctx, tkn.text_to_send, false); - size_t probs_pos = std::min(slot.sent_token_probs_index, slot.generated_token_probs.size()); + size_t probs_pos = std::min(slot.sent_token_probs_index, slot.generated_token_probs.size()); size_t probs_stop_pos = std::min(slot.sent_token_probs_index + to_send_toks.size(), slot.generated_token_probs.size()); if (probs_pos < probs_stop_pos) { @@ -1336,7 +1341,7 @@ struct llama_server_context { probs = std::vector( slot.generated_token_probs.begin(), - slot.generated_token_probs.begin() + slot.sent_token_probs_index); + slot.generated_token_probs.end()); } res.result_json["completion_probabilities"] = probs_vector_to_json(ctx, probs); } @@ -1346,6 +1351,11 @@ struct llama_server_context res.result_json["oaicompat_token_ctr"] = slot.n_decoded; res.result_json["model"] = slot.oaicompat_model; } + queue_results.push_back(res); + condition_results.notify_all(); + + // done with results, unlock + lock.unlock(); // parent multitask, if any, needs to be updated if (slot.multitask_id != -1) @@ -1353,8 +1363,6 @@ struct llama_server_context update_multi_task(slot.multitask_id, slot.task_id, res); } - queue_results.push_back(res); - condition_results.notify_all(); } void send_embedding(llama_client_slot &slot) @@ -1399,11 +1407,11 @@ struct llama_server_context task.data = std::move(data); task.infill_mode = infill; task.embedding_mode = embedding; - task.type = COMPLETION_TASK; + task.type = TASK_TYPE_COMPLETION; task.multitask_id = multitask_id; // when a completion task's prompt array is not a singleton, we split it into multiple requests - if (task.data.at("prompt").size() > 1) + if (task.data.count("prompt") && task.data.at("prompt").size() > 1) { lock.unlock(); // entering new func scope return split_multiprompt_task(task); @@ -1521,7 +1529,7 @@ struct llama_server_context std::unique_lock lock(mutex_tasks); task_server task; task.id = id_gen++; - task.type = CANCEL_TASK; + task.type = TASK_TYPE_CANCEL; task.target_id = task_id; queue_tasks.push_back(task); condition_tasks.notify_one(); @@ -1551,32 +1559,41 @@ struct llama_server_context void process_tasks() { std::unique_lock lock(mutex_tasks); + std::vector deferred_tasks; while (!queue_tasks.empty()) { task_server task = queue_tasks.front(); queue_tasks.erase(queue_tasks.begin()); switch (task.type) { - case COMPLETION_TASK: { + case TASK_TYPE_COMPLETION: { llama_client_slot *slot = get_slot(json_value(task.data, "slot_id", -1)); if (slot == nullptr) { - LOG_TEE("slot unavailable\n"); - // send error result - send_error(task, "slot unavailable"); - return; + // if no slot is available, we defer this task for processing later + deferred_tasks.push_back(task); + break; } if (task.data.contains("system_prompt")) { + if (!all_slots_are_idle) { + send_error(task, "system prompt can only be updated when all slots are idle"); + break; + } process_system_prompt_data(task.data["system_prompt"]); + // reset cache_tokens for all slots + for (llama_client_slot &slot : slots) + { + slot.cache_tokens.clear(); + } } slot->reset(); - slot->infill = task.infill_mode; - slot->embedding = task.embedding_mode; - slot->task_id = task.id; + slot->infill = task.infill_mode; + slot->embedding = task.embedding_mode; + slot->task_id = task.id; slot->multitask_id = task.multitask_id; if (!launch_slot_with_data(slot, task.data)) @@ -1586,7 +1603,7 @@ struct llama_server_context break; } } break; - case CANCEL_TASK: { // release slot linked with the task id + case TASK_TYPE_CANCEL: { // release slot linked with the task id for (auto & slot : slots) { if (slot.task_id == task.target_id) @@ -1599,7 +1616,14 @@ struct llama_server_context } } + // add all the deferred tasks back the the queue + for (task_server &task : deferred_tasks) + { + queue_tasks.push_back(task); + } + // remove finished multitasks from the queue of multitasks, and add the corresponding result to the result queue + std::vector agg_results; auto queue_iterator = queue_multitasks.begin(); while (queue_iterator != queue_multitasks.end()) { @@ -1620,8 +1644,7 @@ struct llama_server_context } aggregate_result.result_json = json{ "results", result_jsons }; - std::lock_guard lock(mutex_results); - queue_results.push_back(aggregate_result); + agg_results.push_back(aggregate_result); condition_results.notify_all(); queue_iterator = queue_multitasks.erase(queue_iterator); @@ -1631,14 +1654,19 @@ struct llama_server_context ++queue_iterator; } } + // done with tasks, unlock + lock.unlock(); + + // copy aggregate results of complete multi-tasks to the results queue + std::lock_guard lock_results(mutex_results); + queue_results.insert(queue_results.end(), agg_results.begin(), agg_results.end()); } bool update_slots() { // attend tasks process_tasks(); - // update the system prompt wait until all slots are idle state - if (system_need_update && all_slots_are_idle) + if (system_need_update) { LOG_TEE("updating system prompt\n"); update_system_prompt(); @@ -1714,7 +1742,6 @@ struct llama_server_context llama_batch_add(batch, slot.sampled, system_tokens.size() + slot.n_past, { slot.id }, true); - slot.n_decoded += 1; slot.n_past += 1; } @@ -1729,7 +1756,8 @@ struct llama_server_context const bool has_prompt = slot.prompt.is_array() || (slot.prompt.is_string() && !slot.prompt.get().empty()) || !slot.images.empty(); // empty prompt passed -> release the slot and send empty response - if (slot.state == IDLE && slot.command == LOAD_PROMPT && !has_prompt) + // note: infill mode allows empty prompt + if (slot.state == IDLE && slot.command == LOAD_PROMPT && !has_prompt && !slot.infill) { slot.release(); slot.print_timings(); @@ -1832,7 +1860,7 @@ struct llama_server_context slot.cache_tokens = prompt_tokens; - if (slot.n_past == slot.num_prompt_tokens) + if (slot.n_past == slot.num_prompt_tokens && slot.n_past > 0) { // we have to evaluate at least 1 token to generate logits. LOG_TEE("slot %d : we have to evaluate at least 1 token to generate logits\n", slot.id); @@ -1932,6 +1960,7 @@ struct llama_server_context llama_sampling_accept(slot.ctx_sampling, ctx, id, true); + slot.n_decoded += 1; if (slot.n_decoded == 1) { slot.t_start_genereration = ggml_time_us(); @@ -2023,7 +2052,7 @@ json oaicompat_completion_params_parse( // // https://platform.openai.com/docs/api-reference/chat/create llama_sampling_params default_sparams; - llama_params["model"] = json_value(body, "model", std::string("uknown")); + llama_params["model"] = json_value(body, "model", std::string("unknown")); llama_params["prompt"] = format_chatml(body["messages"]); // OpenAI 'messages' to llama.cpp 'prompt' llama_params["cache_prompt"] = json_value(body, "cache_prompt", false); llama_params["temperature"] = json_value(body, "temperature", 0.0); @@ -2095,8 +2124,8 @@ static json format_final_response_oaicompat(const json &request, const task_resu {"object", streaming ? "chat.completion.chunk" : "chat.completion"}, {"usage", json{{"completion_tokens", num_tokens_predicted}, - {"prompt_tokens", num_prompt_tokens}, - {"total_tokens", num_tokens_predicted + num_prompt_tokens}}}, + {"prompt_tokens", num_prompt_tokens}, + {"total_tokens", num_tokens_predicted + num_prompt_tokens}}}, {"id", gen_chatcmplid()}}; if (server_verbose) { @@ -2439,7 +2468,7 @@ static void params_parse(const backend::ModelOptions* request, } else { params.n_parallel = 1; } - + params.cont_batching = true; // TODO: Add yarn if (!request->tensorsplit().empty()) {