added generation with images as well as a new function call library

Saifeddine ALOUI 2024-05-18 03:14:05 +02:00
parent 379f8116bb
commit 42ebabfe0d
4 changed files with 452 additions and 9 deletions

lollms/functions.py (new file, 70 lines added)

@@ -0,0 +1,70 @@
from functools import partial
from lollms.tasks import TasksLibrary
from typing import Dict, Any, List


class FunctionCalling_Library:
    def __init__(self, tasks_library: TasksLibrary):
        self.tl = tasks_library
        self.function_definitions = []

    def register_function(self, function_name, function_callable, function_description, function_parameters):
        self.function_definitions.append({
            "function_name": function_name,
            "function": function_callable,
            "function_description": function_description,
            "function_parameters": function_parameters
        })

    def unregister_function(self, function_name):
        self.function_definitions = [func for func in self.function_definitions if func["function_name"] != function_name]

    def execute_function_calls(self, function_calls: List[Dict[str, Any]]) -> List[Any]:
        """
        Executes the function calls with the parameters extracted from the generated text,
        using the registered function definitions to find the right function to execute.

        Args:
            function_calls (List[Dict[str, Any]]): A list of dictionaries representing the function calls.

        Returns:
            List[Any]: A list of results from executing the function calls.
        """
        results = []
        # Convert the registered function definitions to a dict for easier lookup
        functions_dict = {func['function_name']: func['function'] for func in self.function_definitions}

        for call in function_calls:
            function_name = call.get("function_name")
            parameters = call.get("function_parameters", [])
            function = functions_dict.get(function_name)

            if function:
                try:
                    # Parameters may be a list (positional) or a dict that maps directly to the function's arguments.
                    if type(parameters) == list:
                        result = function(*parameters)
                    elif type(parameters) == dict:
                        result = function(**parameters)
                    results.append(result)
                except TypeError as e:
                    # Handle cases where the function call fails due to incorrect parameters, etc.
                    results.append(f"Error calling {function_name}: {e}")
            else:
                results.append(f"Function {function_name} not found.")

        return results

    def generate_with_functions(self, prompt):
        # generate_with_function_calls is provided by TasksLibrary
        ai_response, function_calls = self.tl.generate_with_function_calls(prompt, self.function_definitions)
        return ai_response, function_calls

    def generate_with_functions_with_images(self, prompt, image_files):
        # generate_with_function_calls_and_images is provided by TasksLibrary
        if len(image_files) > 0:
            ai_response, function_calls = self.tl.generate_with_function_calls_and_images(prompt, image_files, self.function_definitions)
        else:
            ai_response, function_calls = self.tl.generate_with_function_calls(prompt, self.function_definitions)
        return ai_response, function_calls
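For reference, a minimal usage sketch of the new FunctionCalling_Library follows. The get_current_time helper and the tasks_library instance are illustrative placeholders; the sketch assumes a TasksLibrary that is already wired to a loaded LoLLMs model.

# Minimal usage sketch (tasks_library is assumed to be an existing, model-backed TasksLibrary)
from datetime import datetime
from lollms.functions import FunctionCalling_Library

def get_current_time() -> str:
    # Illustrative function exposed to the model
    return datetime.now().strftime("%H:%M")

fcl = FunctionCalling_Library(tasks_library)
fcl.register_function(
    "get_current_time",                          # function_name
    get_current_time,                            # function_callable
    "Returns the current local time as HH:MM",   # function_description
    []                                           # function_parameters (none expected)
)
ai_response, function_calls = fcl.generate_with_functions("What time is it right now?")
results = fcl.execute_function_calls(function_calls)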

(next changed file)

@@ -3384,7 +3384,7 @@ The AI should respond in this format using data from actions_list:
        # Combine the function descriptions with the original prompt.
        function_info = ' '.join(function_descriptions)
-        upgraded_prompt = f"{function_info} {prompt}"
+        upgraded_prompt = f"{function_info}\n{prompt}"
        return upgraded_prompt

(next changed file)

@@ -9,6 +9,7 @@ description:
"""
from fastapi import APIRouter, Request, Body, Response
from fastapi.responses import PlainTextResponse
from lollms.server.elf_server import LOLLMSElfServer
from pydantic import BaseModel
from starlette.responses import StreamingResponse
@@ -23,7 +24,7 @@ import random
import string
import json
from enum import Enum
import asyncio
import base64
from datetime import datetime
@@ -192,7 +193,7 @@ async def lollms_generate(request: LollmsGenerateRequest):
                    temperature=request.temperature or elf_server.config.temperature
                )
                completion_tokens = len(elf_server.binding.tokenize(reception_manager.reception_buffer))
-                return reception_manager.reception_buffer
+                return PlainTextResponse(reception_manager.reception_buffer)
        else:
            return None
    except Exception as ex:
@@ -200,7 +201,160 @@ async def lollms_generate(request: LollmsGenerateRequest):
        elf_server.error(ex)
        return {"status":False,"error":str(ex)}

class LollmsGenerateRequest(BaseModel):
    prompt: str
    images: List[str]
    model_name: Optional[str] = None
    personality: Optional[int] = -1
    n_predict: Optional[int] = 1024
    stream: bool = False
    temperature: float = 0.1
    top_k: Optional[int] = 50
    top_p: Optional[float] = 0.95
    repeat_penalty: Optional[float] = 0.8
    repeat_last_n: Optional[int] = 40
    seed: Optional[int] = None
    n_threads: Optional[int] = 8

@router.post("/lollms_generate_with_images")
async def lollms_generate_with_images(request: LollmsGenerateRequest):
""" Endpoint for generating text from prompts using the LoLLMs fastAPI server.
Args:
Data model for the Generate with images Request.
Attributes:
- prompt: str : representing the input text prompt for text generation.
- images: str : a list of 64 bits encoded images
- model_name: Optional[str] = None : The name of the model to be used (it should be one of the current models)
- personality : Optional[int] = None : The name of the mounted personality to be used (if a personality is None, the endpoint will just return a completion text). To get the list of mounted personalities, just use /list_mounted_personalities
- n_predict: int representing the number of predictions to generate.
- stream: bool indicating whether to stream the generated text or not.
- temperature: float representing the temperature parameter for text generation.
- top_k: int representing the top_k parameter for text generation.
- top_p: float representing the top_p parameter for text generation.
- repeat_penalty: float representing the repeat_penalty parameter for text generation.
- repeat_last_n: int representing the repeat_last_n parameter for text generation.
- seed: int representing the seed for text generation.
- n_threads: int representing the number of threads for text generation.
Returns:
- If the elf_server binding is not None:
- If stream is True, returns a StreamingResponse of generated text chunks.
- If stream is False, returns the generated text as a string.
- If the elf_server binding is None, returns None.
"""
    try:
        headers = { 'Content-Type': 'text/event-stream', 'Cache-Control': 'no-cache', 'Connection': 'keep-alive',}
        reception_manager = RECEPTION_MANAGER()
        prompt = request.prompt
        encoded_images = request.images
        n_predict = request.n_predict if request.n_predict > 0 else 1024
        stream = request.stream
        prompt_tokens = len(elf_server.binding.tokenize(prompt))
        if elf_server.binding is not None:
            image_files = []
            images_path = elf_server.lollms_paths.personal_outputs_path / "tmp_images"
            images_path.mkdir(parents=True, exist_ok=True)
            for i, encoded_image in enumerate(encoded_images):
                image_path = images_path / f'image_{i}.png'
                with open(image_path, 'wb') as image_file:
                    image_file.write(base64.b64decode(encoded_image))
                image_files.append(image_path)
            if stream:
                new_output = {"new_values": []}
                async def generate_chunks():
                    lk = threading.Lock()
                    def callback(chunk, chunk_type: MSG_TYPE = MSG_TYPE.MSG_TYPE_CHUNK):
                        if elf_server.cancel_gen:
                            return False
                        if chunk is None:
                            return
                        rx = reception_manager.new_chunk(chunk)
                        if rx.status != ROLE_CHANGE_DECISION.MOVE_ON:
                            if rx.status == ROLE_CHANGE_DECISION.PROGRESSING:
                                return True
                            elif rx.status == ROLE_CHANGE_DECISION.ROLE_CHANGED:
                                return False
                            else:
                                chunk = chunk + rx.value
                        # Yield each chunk of data
                        lk.acquire()
                        try:
                            new_output["new_values"].append(reception_manager.chunk)
                            lk.release()
                            return True
                        except Exception as ex:
                            trace_exception(ex)
                            lk.release()
                            return False

                    def chunks_builder():
                        if request.model_name in elf_server.binding.list_models() and elf_server.binding.model_name != request.model_name:
                            elf_server.binding.build_model(request.model_name)

                        elf_server.binding.generate_with_images(
                            prompt,
                            image_files,
                            n_predict,
                            callback=callback,
                            temperature=request.temperature or elf_server.config.temperature
                        )
                        reception_manager.done = True

                    thread = threading.Thread(target=chunks_builder)
                    thread.start()
                    current_index = 0
                    while (not reception_manager.done and elf_server.cancel_gen == False):
                        while (not reception_manager.done and len(new_output["new_values"]) == 0):
                            time.sleep(0.001)
                        lk.acquire()
                        for i in range(len(new_output["new_values"])):
                            current_index += 1
                            yield (new_output["new_values"][i])
                        new_output["new_values"] = []
                        lk.release()
                    elf_server.cancel_gen = False

                return StreamingResponse(generate_chunks(), media_type="text/plain", headers=headers)
            else:
                def callback(chunk, chunk_type: MSG_TYPE = MSG_TYPE.MSG_TYPE_CHUNK):
                    # Accumulate each chunk of data
                    if chunk is None:
                        return True
                    rx = reception_manager.new_chunk(chunk)
                    if rx.status != ROLE_CHANGE_DECISION.MOVE_ON:
                        if rx.status == ROLE_CHANGE_DECISION.PROGRESSING:
                            return True
                        elif rx.status == ROLE_CHANGE_DECISION.ROLE_CHANGED:
                            return False
                        else:
                            chunk = chunk + rx.value
                    return True

                elf_server.binding.generate_with_images(
                    prompt,
                    image_files,
                    n_predict,
                    callback=callback,
                    temperature=request.temperature or elf_server.config.temperature
                )
                completion_tokens = len(elf_server.binding.tokenize(reception_manager.reception_buffer))
                return PlainTextResponse(reception_manager.reception_buffer)
        else:
            return None
    except Exception as ex:
        trace_exception(ex)
        elf_server.error(ex)
        return {"status": False, "error": str(ex)}
# ----------------------- Open AI ----------------------------------------
class Message(BaseModel):
    role: str

(next changed file)

@@ -1,14 +1,14 @@
import sys
-from typing import Callable, List
+from typing import Callable, List, Dict, Any, Optional
from functools import partial
from datetime import datetime
from ascii_colors import ASCIIColors
from lollms.types import MSG_TYPE, SUMMARY_MODE
from lollms.com import LoLLMsCom
-from lollms.utilities import PromptReshaper, remove_text_from_string
+from lollms.utilities import PromptReshaper, remove_text_from_string, process_ai_output
from safe_store import DocumentDecomposer
import json

class TasksLibrary:
    def __init__(self, lollms:LoLLMsCom, callback: Callable[[str, MSG_TYPE, dict, list], bool]=None) -> None:
        self.lollms = lollms
@@ -92,7 +92,27 @@ class TasksLibrary:
            repeat_last_n=repeat_last_n if repeat_last_n is not None else self.lollms.config.repeat_last_n if self.lollms.config.override_personality_model_parameters else self.lollms.personality.model_repeat_last_n,
        ).strip()
        return self.bot_says

    def generate_with_images(self, prompt, images, max_size, temperature=None, top_k=None, top_p=None, repeat_penalty=None, repeat_last_n=None, callback=None, debug=False, show_progress=False):
        ASCIIColors.info("Text generation started: Warming up")
        self.nb_received_tokens = 0
        self.bot_says = ""
        if debug:
            self.print_prompt("gen", prompt)

        self.lollms.model.generate_with_images(
            prompt,
            images,
            max_size,
            partial(self.process, callback=callback, show_progress=show_progress),
            temperature=self.lollms.config.model_temperature if temperature is None else temperature,
            top_k=self.lollms.config.model_top_k if top_k is None else top_k,
            top_p=self.lollms.config.model_top_p if top_p is None else top_p,
            repeat_penalty=self.lollms.config.model_repeat_penalty if repeat_penalty is None else repeat_penalty,
            repeat_last_n=self.lollms.config.model_repeat_last_n if repeat_last_n is None else repeat_last_n
        ).strip()
        return self.bot_says
    def fast_gen(
            self,
            prompt: str,
@@ -145,7 +165,62 @@ class TasksLibrary:
        return gen
    def fast_gen_with_images(self, prompt: str, images: list, max_generation_size: int = None, placeholders: dict = {}, sacrifice: list = ["previous_discussion"], debug: bool = False, callback=None, show_progress=False) -> str:
        """
        Fast way to generate text from a text prompt and a list of images.

        This method takes in a prompt, a list of image paths, a maximum generation size, optional placeholders, a sacrifice list, and a debug flag.
        It reshapes the context before performing text generation by adjusting and cropping the number of tokens.

        Parameters:
        - prompt (str): The input prompt for text generation.
        - images (list): A list of image file paths to send to the model along with the prompt.
        - max_generation_size (int): The maximum number of tokens to generate.
        - placeholders (dict, optional): A dictionary of placeholders to be replaced in the prompt. Defaults to an empty dictionary.
        - sacrifice (list, optional): A list of placeholders to sacrifice if the window is bigger than the context size minus the number of tokens to generate. Defaults to ["previous_discussion"].
        - debug (bool, optional): Flag to enable/disable debug mode. Defaults to False.

        Returns:
        - str: The generated text after removing special tokens ("<s>" and "</s>") and stripping any leading/trailing whitespace.
        """
        prompt = "\n".join([
            "!@>system: I am an AI assistant that can converse and analyze images. When asked to locate something in an image you send, I will reply with:",
            "boundingbox(image_index, label, left, top, width, height)",
            "Where:",
            "image_index: 0-based index of the image",
            "label: brief description of what is located",
            "left, top: x,y coordinates of top-left box corner (0-1 scale)",
            "width, height: box dimensions as fraction of image size",
            "Coordinates have origin (0,0) at top-left, (1,1) at bottom-right.",
            "For other queries, I will respond conversationally to the best of my abilities.",
            prompt
        ])
        if debug == False:
            debug = self.lollms.config.debug

        if max_generation_size is None:
            prompt_size = self.lollms.model.tokenize(prompt)
            max_generation_size = self.lollms.model.config.ctx_size - len(prompt_size)

        pr = PromptReshaper(prompt)
        prompt = pr.build(placeholders,
                          self.lollms.model.tokenize,
                          self.lollms.model.detokenize,
                          self.lollms.model.config.ctx_size - max_generation_size,
                          sacrifice
                          )
        ntk = len(self.lollms.model.tokenize(prompt))
        max_generation_size = min(self.lollms.model.config.ctx_size - ntk, max_generation_size)
        # TODO: add show progress
        gen = self.generate_with_images(prompt, images, max_generation_size, callback=callback, show_progress=show_progress).strip().replace("</s>", "").replace("<s>", "")
        try:
            gen = process_ai_output(gen, images, "/discussions/")
        except Exception as ex:
            pass
        if debug:
            self.print_prompt("prompt", prompt + gen)

        return gen
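A short sketch of how the new fast_gen_with_images helper might be called; tl stands for an existing TasksLibrary backed by a vision-capable binding, and the image path is illustrative.

# Illustrative call (tl: an existing TasksLibrary backed by a vision-capable model)
answer = tl.fast_gen_with_images(
    "Where is the cat in this picture?",
    ["/path/to/picture.png"],       # example image path
    max_generation_size=512,
)
# Given the system prompt injected above, a locating query may come back as something like:
#   boundingbox(0, "cat", 0.42, 0.31, 0.20, 0.25)
print(answer)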
    # Communications with the user
    def step_start(self, step_text, callback: Callable[[str, MSG_TYPE, dict, list], bool]=None):
@@ -653,3 +728,147 @@ class TasksLibrary:
            self.step_end(f" Summary of {doc_name} - Processing chunk : {i+1}/{len(chunks)}")
        return "\n".join(summeries)

    #======================= Function calls
    def _upgrade_prompt_with_function_info(self, prompt: str, functions: List[Dict[str, Any]]) -> str:
        """
        Upgrades the prompt with information about function calls.

        Args:
            prompt (str): The original prompt.
            functions (List[Dict[str, Any]]): A list of dictionaries describing functions that can be called.

        Returns:
            str: The upgraded prompt that includes information about the function calls.
        """
        function_descriptions = [
            "!@>information: If you need to call a function to fulfill the user request, use a function markdown tag with the function call in the following json format:",
            "```function",
            "{",
            '"function_name":the name of the function to be called,',
            '"function_parameters": a list of parameter values',
            "}",
            "```",
            "You can call multiple functions in one generation.",
            "Each function call needs to be in a separate function markdown tag.",
            "Do not add the status of the execution as it will be added automatically by the system.",
            "If you want to get the output of the function before answering the user, then use the keyword @<NEXT>@ at the end of your message.",
            "!@>List of possible functions to be called:\n"
        ]
        for function in functions:
            description = f"{function['function_name']}: {function['function_description']}\nparameters:{function['function_parameters']}"
            function_descriptions.append(description)

        # Combine the function descriptions with the original prompt.
        function_info = ' '.join(function_descriptions)
        upgraded_prompt = f"{function_info}\n{prompt}"
        return upgraded_prompt
    def extract_function_calls_as_json(self, text: str) -> List[Dict[str, Any]]:
        """
        Extracts function calls formatted as JSON inside markdown code blocks.

        Args:
            text (str): The generated text containing JSON markdown entries for function calls.

        Returns:
            List[Dict[str, Any]]: A list of dictionaries representing the function calls.
        """
        # Extract markdown code blocks that contain JSON.
        code_blocks = self.extract_code_blocks(text)

        # Filter out and parse JSON entries.
        function_calls = []
        for block in code_blocks:
            if block["type"] == "function":
                content = block.get("content", "")
                try:
                    # Attempt to parse the JSON content of the code block.
                    function_call = json.loads(content)
                    if type(function_call) == dict:
                        function_calls.append(function_call)
                    elif type(function_call) == list:
                        function_calls += function_call
                except json.JSONDecodeError:
                    # If the content is not valid JSON, skip it.
                    continue

        return function_calls
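To illustrate the format this extractor expects, the snippet below feeds it a hand-written model reply containing one function markdown block (the add_numbers function is hypothetical). Assuming extract_code_blocks tags the fenced block as type "function", a single call dictionary should be returned.

# Illustrative extraction (tl: an existing TasksLibrary instance)
generated_text = (
    "Let me compute that for you.\n"
    "```function\n"
    '{"function_name": "add_numbers", "function_parameters": {"a": 2, "b": 3}}\n'
    "```\n"
    "@<NEXT>@"
)
calls = tl.extract_function_calls_as_json(generated_text)
# Expected result: [{"function_name": "add_numbers", "function_parameters": {"a": 2, "b": 3}}]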
    def execute_function_calls(self, function_calls: List[Dict[str, Any]], function_definitions: List[Dict[str, Any]]) -> List[Any]:
        """
        Executes the function calls with the parameters extracted from the generated text,
        using the original functions list to find the right function to execute.

        Args:
            function_calls (List[Dict[str, Any]]): A list of dictionaries representing the function calls.
            function_definitions (List[Dict[str, Any]]): The original list of functions with their descriptions and callable objects.

        Returns:
            List[Any]: A list of results from executing the function calls.
        """
        results = []
        # Convert function_definitions to a dict for easier lookup
        functions_dict = {func['function_name']: func['function'] for func in function_definitions}

        for call in function_calls:
            function_name = call.get("function_name")
            parameters = call.get("function_parameters", [])
            function = functions_dict.get(function_name)

            if function:
                try:
                    # Parameters may be a list (positional) or a dict that maps directly to the function's arguments.
                    if type(parameters) == list:
                        result = function(*parameters)
                    elif type(parameters) == dict:
                        result = function(**parameters)
                    results.append(result)
                except TypeError as e:
                    # Handle cases where the function call fails due to incorrect parameters, etc.
                    results.append(f"Error calling {function_name}: {e}")
            else:
                results.append(f"Function {function_name} not found.")

        return results
    def generate_with_function_calls(self, prompt: str, functions: List[Dict[str, Any]], max_answer_length: Optional[int] = None) -> List[Dict[str, Any]]:
        """
        Performs text generation with function calls.

        Args:
            prompt (str): The full prompt (including conditioning, user discussion, extra data, and the user prompt).
            functions (List[Dict[str, Any]]): A list of dictionaries describing functions that can be called.
            max_answer_length (int, optional): Maximum string length allowed for the generated text.

        Returns:
            A tuple of (generated_text, function_calls), where function_calls is a list of dictionaries with the function names and parameters to execute.
        """
        # Upgrade the prompt with information about the function calls.
        upgraded_prompt = self._upgrade_prompt_with_function_info(prompt, functions)

        # Generate the initial text based on the upgraded prompt.
        generated_text = self.fast_gen(upgraded_prompt, max_answer_length)

        # Extract the function calls from the generated text.
        function_calls = self.extract_function_calls_as_json(generated_text)

        return generated_text, function_calls
    def generate_with_function_calls_and_images(self, prompt: str, images: list, functions: List[Dict[str, Any]], max_answer_length: Optional[int] = None) -> List[Dict[str, Any]]:
        """
        Performs text generation with function calls, using images as part of the context.

        Args:
            prompt (str): The full prompt (including conditioning, user discussion, extra data, and the user prompt).
            images (list): A list of image file paths to send to the model along with the prompt.
            functions (List[Dict[str, Any]]): A list of dictionaries describing functions that can be called.
            max_answer_length (int, optional): Maximum string length allowed for the generated text.

        Returns:
            A tuple of (generated_text, function_calls), where function_calls is a list of dictionaries with the function names and parameters to execute.
        """
        # Upgrade the prompt with information about the function calls.
        upgraded_prompt = self._upgrade_prompt_with_function_info(prompt, functions)

        # Generate the initial text based on the upgraded prompt.
        generated_text = self.fast_gen_with_images(upgraded_prompt, images, max_answer_length)

        # Extract the function calls from the generated text.
        function_calls = self.extract_function_calls_as_json(generated_text)

        return generated_text, function_calls
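The @<NEXT>@ keyword advertised in _upgrade_prompt_with_function_info suggests a two-pass flow that this commit leaves to the caller: execute the requested functions, then call the model again with the results. A sketch of such a caller-side loop follows; the "!@>function results:" header is an assumption for illustration, not an API of the library.

# Caller-side sketch: run the requested functions, then re-prompt with their results.
ai_response, function_calls = tl.generate_with_function_calls(prompt, functions)
if function_calls and "@<NEXT>@" in ai_response:
    results = tl.execute_function_calls(function_calls, functions)
    followup_prompt = (
        prompt
        + "\n!@>function results:\n"             # assumed header format
        + "\n".join(str(r) for r in results)
    )
    ai_response, _ = tl.generate_with_function_calls(followup_prompt, functions)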