Saifeddine ALOUI 2023-11-30 22:49:33 +01:00
parent 96e5cc297c
commit dde31ab4d4


@@ -16,7 +16,6 @@ from lollms.utilities import PromptReshaper, PackageManager
import pkg_resources
from pathlib import Path
from PIL import Image
-from typing import Optional, List
import re
from datetime import datetime
import importlib
@@ -26,14 +25,20 @@ import yaml
from ascii_colors import ASCIIColors
import time
from lollms.types import MSG_TYPE
from typing import Any, Callable, Dict, List, Optional, Type
import json
from safe_store import TextVectorizer, GenericDataLoader, VisualizationMethod, VectorizationMethod
from functools import partial
import sys
from lollms.helpers import trace_exception
from lollms.utilities import PackageManager

if not PackageManager.check_package_installed("jsonschema"):
    PackageManager.install_package("jsonschema")
from jsonschema import Draft7Validator
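The guard above is a reusable check-then-install idiom for optional dependencies. A minimal sketch of the same pattern, where `requests` is only an illustrative stand-in package name, not something this commit touches:

```python
from lollms.utilities import PackageManager

# Illustrative only: ensure an optional dependency exists before importing it.
# "requests" is a stand-in name, not a package this commit uses.
if not PackageManager.check_package_installed("requests"):
    PackageManager.install_package("requests")
import requests  # safe once the package is guaranteed to be present
```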
def is_package_installed(package_name):
    try:
        dist = pkg_resources.get_distribution(package_name)
@@ -55,6 +60,24 @@ def install_package(package_name):
        print(f"{package_name} has been successfully installed.")
def fix_json(json_text):
    try:
        # Try to load the JSON string as-is
        json_obj = json.loads(json_text)
        return json_obj
    except json.JSONDecodeError:
        # If there's a syntax error, try to fix it using jsonschema.
        # Draft7Validator requires a schema; an empty one accepts anything.
        validator = Draft7Validator({})
        errors = list(validator.iter_errors(json_text))
        for error in errors:
            # Check if the error is due to a missing closing bracket
            if error.validator == 'additionalProperties' and error.validator_value == False:
                # Find the position of the error in the JSON string
                position = error.absolute_path.pop()
                # Add the missing closing bracket at the position
                repaired_json_text = json_text[:position] + '}' + json_text[position:]
                return fix_json(repaired_json_text)
        return None  # give up if no repair rule matched
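A hypothetical call showing how fix_json is meant to be used on model output that should be JSON (the sample string is invented; whether a broken string can actually be repaired depends on the heuristic above):

```python
# Hypothetical example: parse model output that should be a JSON plan.
raw = '{"actions": [{"name": "search", "parameters": {"query": "weather"}}]}'
parsed = fix_json(raw)
if parsed is not None:
    print(parsed["actions"][0]["name"])  # -> search
else:
    print("Could not repair the JSON.")
```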
class AIPersonality:

    # Extra
@@ -289,6 +312,47 @@ Date: {{date}}
        ASCIIColors.red(" *-*-*-*-*-*-*-*")

    def fast_gen_with_images(self, prompt: str, images: list, max_generation_size: int = None, placeholders: dict = {}, sacrifice: list = ["previous_discussion"], debug: bool = False, callback=None, show_progress=False) -> str:
        """
        Fast way to generate text from a prompt and a list of images.

        This method takes in a prompt, a list of images, a maximum generation size, optional placeholders, a sacrifice list, and a debug flag.
        It reshapes the context before performing text generation by adjusting and cropping the number of tokens.

        Parameters:
        - prompt (str): The input prompt for text generation.
        - images (list): Images to pass to the underlying multimodal model.
        - max_generation_size (int): The maximum number of tokens to generate.
        - placeholders (dict, optional): A dictionary of placeholders to be replaced in the prompt. Defaults to an empty dictionary.
        - sacrifice (list, optional): A list of placeholders to sacrifice if the window is bigger than the context size minus the number of tokens to generate. Defaults to ["previous_discussion"].
        - debug (bool, optional): Flag to enable/disable debug mode. Defaults to False.

        Returns:
        - str: The generated text after removing special tokens ("<s>" and "</s>") and stripping any leading/trailing whitespace.
        """
        if not debug:
            debug = self.config.debug

        if max_generation_size is None:
            prompt_size = self.model.tokenize(prompt)
            max_generation_size = self.model.config.ctx_size - len(prompt_size)

        pr = PromptReshaper(prompt)
        prompt = pr.build(placeholders,
                          self.model.tokenize,
                          self.model.detokenize,
                          self.model.config.ctx_size - max_generation_size,
                          sacrifice
                          )
        ntk = len(self.model.tokenize(prompt))
        max_generation_size = min(self.model.config.ctx_size - ntk, max_generation_size)
        # TODO: add show progress
        gen = self.generate_with_images(prompt, images, max_generation_size, callback=callback, show_progress=show_progress).strip().replace("</s>", "").replace("<s>", "")
        if debug:
            self.print_prompt("prompt", prompt + gen)

        return gen
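A hypothetical call site for the new method; the `personality` object, prompt format, and image path are assumptions for illustration:

```python
# Assumes `personality` is an initialized AIPersonality bound to a
# vision-capable model and that "photo.jpg" exists on disk.
answer = personality.fast_gen_with_images(
    "!@>user: Describe this photo.\n!@>assistant:",
    images=["photo.jpg"],
    max_generation_size=256,
    show_progress=True,  # prints the live tokens/s line emitted by process()
)
print(answer)
```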
    def fast_gen(self, prompt: str, max_generation_size: int = None, placeholders: dict = {}, sacrifice: list = ["previous_discussion"], debug: bool = False, callback=None, show_progress=False) -> str:
        """
        Fast way to generate text from a prompt.
@@ -324,7 +388,7 @@ Date: {{date}}
        max_generation_size = min(self.model.config.ctx_size - ntk, max_generation_size)
        # TODO: add show progress
-        gen = self.generate(prompt, max_generation_size, callback=callback).strip().replace("</s>", "").replace("<s>", "")
+        gen = self.generate(prompt, max_generation_size, callback=callback, show_progress=show_progress).strip().replace("</s>", "").replace("<s>", "")
        if debug:
            self.print_prompt("prompt", prompt + gen)
@@ -348,10 +412,22 @@ Date: {{date}}
        return string

-    def process(self, text:str, message_type:MSG_TYPE, callback=None):
+    def process(self, text: str, message_type: MSG_TYPE, callback=None, show_progress=False):
        if text is None:
            return True
        bot_says = self.bot_says + text
+        if show_progress:
+            if self.nb_received_tokens == 0:
+                self.start_time = datetime.now()
+            dt = (datetime.now() - self.start_time).seconds
+            if dt == 0:
+                dt = 1
+            spd = self.nb_received_tokens / dt
+            ASCIIColors.green(f"Received {self.nb_received_tokens} tokens (speed: {spd:.2f}t/s)", end="\r", flush=True)
+            sys.stdout = sys.__stdout__
+            sys.stdout.flush()
        self.nb_received_tokens += 1
        antiprompt = self.detect_antiprompt(bot_says)
        if antiprompt:
@@ -363,9 +439,29 @@ Date: {{date}}
                callback(text, MSG_TYPE.MSG_TYPE_CHUNK)
            self.bot_says = bot_says
            return True
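process() forwards each generated chunk to the caller-supplied callback together with a MSG_TYPE tag. A minimal sketch of a compatible callback (returning True to keep generation going is assumed from the surrounding lollms conventions):

```python
from lollms.types import MSG_TYPE

# Minimal sketch of a streaming callback usable with generate()/generate_with_images().
def print_chunk(chunk: str, msg_type: MSG_TYPE) -> bool:
    if msg_type == MSG_TYPE.MSG_TYPE_CHUNK:
        print(chunk, end="", flush=True)
    return True  # assumed convention: True means keep generating

# e.g. personality.generate(prompt, 128, callback=print_chunk, show_progress=True)
```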
-    def generate(self, prompt, max_size, temperature = None, top_k = None, top_p=None, repeat_penalty=None, callback=None, debug=False ):
+    def generate_with_images(self, prompt, images, max_size, temperature=None, top_k=None, top_p=None, repeat_penalty=None, callback=None, debug=False, show_progress=False):
+        ASCIIColors.info("Text generation started: Warming up")
+        self.nb_received_tokens = 0
+        self.bot_says = ""
+        if debug:
+            self.print_prompt("gen", prompt)
+
+        self.model.generate_with_images(
+            prompt,
+            images,
+            max_size,
+            partial(self.process, callback=callback, show_progress=show_progress),
+            temperature=self.model_temperature if temperature is None else temperature,
+            top_k=self.model_top_k if top_k is None else top_k,
+            top_p=self.model_top_p if top_p is None else top_p,
+            repeat_penalty=self.model_repeat_penalty if repeat_penalty is None else repeat_penalty,
+        ).strip()
+        # The streamed text accumulates in self.bot_says via process().
+        return self.bot_says
+
+    def generate(self, prompt, max_size, temperature=None, top_k=None, top_p=None, repeat_penalty=None, callback=None, debug=False, show_progress=False):
        ASCIIColors.info("Text generation started: Warming up")
        self.nb_received_tokens = 0
        self.bot_says = ""
        if debug:
            self.print_prompt("gen", prompt)
@@ -373,7 +469,7 @@ Date: {{date}}
        self.model.generate(
            prompt,
            max_size,
-            partial(self.process, callback=callback),
+            partial(self.process, callback=callback, show_progress=show_progress),
            temperature=self.model_temperature if temperature is None else temperature,
            top_k=self.model_top_k if top_k is None else top_k,
            top_p=self.model_top_p if top_p is None else top_p,
@@ -1306,7 +1402,94 @@ class StateMachine:
        else:
            raise ValueError(f"Command '{command}' not found in current state and no default function defined.")
class LoLLMsActionParameters:
    def __init__(self, name: str, parameter_type: Type, range: Optional[List] = None, options: Optional[List] = None, value: Any = None) -> None:
        self.name = name
        self.parameter_type = parameter_type
        self.range = range
        self.options = options
        self.value = value

    def __str__(self) -> str:
        parameter_dict = {
            'name': self.name,
            'parameter_type': self.parameter_type.__name__,
            'value': self.value
        }
        if self.range is not None:
            parameter_dict['range'] = self.range
        if self.options is not None:
            parameter_dict['options'] = self.options
        return json.dumps(parameter_dict, indent=4)

    @staticmethod
    def from_str(string: str) -> 'LoLLMsActionParameters':
        parameter_dict = json.loads(string)
        name = parameter_dict['name']
        # Note: eval on an arbitrary type name is unsafe; only use with trusted input
        parameter_type = eval(parameter_dict['parameter_type'])
        range = parameter_dict.get('range', None)
        options = parameter_dict.get('options', None)
        value = parameter_dict['value']
        return LoLLMsActionParameters(name, parameter_type, range, options, value)
class LoLLMsActionParametersEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, LoLLMsActionParameters):
            parameter_dict = {
                'name': obj.name,
                'parameter_type': obj.parameter_type.__name__,
                'value': obj.value
            }
            if obj.range is not None:
                parameter_dict['range'] = obj.range
            if obj.options is not None:
                parameter_dict['options'] = obj.options
            return parameter_dict
        return super().default(obj)
class LoLLMsAction:
    def __init__(self, name, parameters: List[LoLLMsActionParameters], callback: Callable) -> None:
        self.name = name
        self.parameters = parameters
        self.callback = callback

    def __str__(self) -> str:
        action_dict = {
            'name': self.name,
            'parameters': self.parameters
        }
        return json.dumps(action_dict, indent=4, cls=LoLLMsActionParametersEncoder)

    @staticmethod
    def from_str(string: str) -> 'LoLLMsAction':
        action_dict = json.loads(string)
        name = action_dict['name']
        # Parameters deserialize as dicts; re-serialize each one before handing it
        # to LoLLMsActionParameters.from_str, which expects a JSON string.
        parameters = [LoLLMsActionParameters.from_str(json.dumps(param)) for param in action_dict['parameters']]
        return LoLLMsAction(name, parameters, None)

    def run(self) -> None:
        args = {param.name: param.value for param in self.parameters}
        self.callback(**args)
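To see how the pieces fit together, a small illustrative example (the `greet` action is invented; it assumes the classes above are in scope):

```python
# Illustrative only: declare one action with a typed parameter and run it.
def greet(name: str) -> None:
    print(f"Hello, {name}!")

name_param = LoLLMsActionParameters("name", str, value="Alice")
action = LoLLMsAction("greet", [name_param], greet)

print(action)  # JSON view, serialized through LoLLMsActionParametersEncoder
action.run()   # -> Hello, Alice!
```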
def generate_actions(potential_actions: List[LoLLMsAction], parsed_text: dict) -> List[LoLLMsAction]:
    actions = []
    try:
        for action_data in parsed_text["actions"]:
            name = action_data['name']
            parameters = action_data['parameters']
            matching_action = next((action for action in potential_actions if action.name == name), None)
            if matching_action:
                # Clone the declared action so its parameter values can be filled in
                action = LoLLMsAction.from_str(str(matching_action))
                action.callback = matching_action.callback
                for param_name, param_value in parameters.items():
                    matching_param = next((param for param in action.parameters if param.name == param_name), None)
                    if matching_param:
                        matching_param.value = param_value
                actions.append(action)
    except json.JSONDecodeError:
        print("Invalid JSON format.")
    return actions
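generate_actions() matches the names in the model's parsed plan against the declared actions and copies the parameter values over. Continuing the invented `greet` example from above:

```python
# Hypothetical parsed model output, in the shape plan()/plan_with_images()
# obtain after fix_json().
parsed = {"actions": [{"name": "greet", "parameters": {"name": "Bob"}}]}

for chosen in generate_actions([action], parsed):
    chosen.run()  # -> Hello, Bob!
```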
class APScript(StateMachine):
    """
@@ -1472,6 +1655,8 @@ class APScript(StateMachine):
        with open(path, 'w') as file:
            yaml.dump(data, file)

+    def generate_with_images(self, prompt, images, max_size, temperature=None, top_k=None, top_p=None, repeat_penalty=None, callback=None, debug=False):
+        return self.personality.generate_with_images(prompt, images, max_size, temperature, top_k, top_p, repeat_penalty, callback, debug=debug)

    def generate(self, prompt, max_size, temperature=None, top_k=None, top_p=None, repeat_penalty=None, callback=None, debug=False):
        return self.personality.generate(prompt, max_size, temperature, top_k, top_p, repeat_penalty, callback, debug=debug)
@@ -1777,7 +1962,99 @@ class APScript(StateMachine):
        global_prompt = f"!@>instruction: Create a title for the following prompt:\n!@>prompt:{prompt}\n!@>title:"
        title = self.fast_gen(global_prompt, max_title_length)
        return title
    def plan_with_images(self, request: str, images: list, actions_list: list = [LoLLMsAction], context: str = "", max_answer_length: int = 512) -> List[LoLLMsAction]:
        """
        Creates a plan out of a request, a list of images, and a context.

        Args:
            request (str): The request posed by the user.
            images (list): Images to pass to the underlying multimodal model.
            actions_list (list, optional): The actions the planner may choose from.
            context (str, optional): Extra context to ground the plan.
            max_answer_length (int, optional): Maximum number of tokens allowed for the generated plan. Defaults to 512.

        Returns:
            List[LoLLMsAction]: The list of actions extracted from the model's plan.
        """
        template = """!@>instruction:
Act as plan builder, a tool capable of making plans to perform the user requested operation.
"""
        if len(actions_list) > 0:
            template += """The plan builder is an AI that responds in json format
!@>actions_list:
[
{{actions_list}}
]
"""
        if context != "":
            template += """!@>Context:
{{context}}"""
        template += """!@>request: {{request}}
"""
        template += """!@>planner: Analyzing the user prompt.\n!@>planner: Plan has been established.\n!@>planner: list of actions in json format:\n```json\n"""
        pr = PromptReshaper(template)
        prompt = pr.build({
                "context": context,
                "request": request,
                "actions_list": ",\n".join([f"{action}" for action in actions_list])
            },
            self.personality.model.tokenize,
            self.personality.model.detokenize,
            self.personality.model.config.ctx_size,
            ["previous_discussion"]
        )
        gen = self.generate_with_images(prompt, images, max_answer_length).strip().replace("</s>", "").replace("<s>", "")
        gen = self.remove_backticks(gen)
        self.print_prompt("full", prompt + gen)
        # Re-add the opening of the JSON object before attempting to parse/repair it
        gen = "{\n \"actions\": [\n" + gen
        gen = fix_json(gen)
        return generate_actions(actions_list, gen)
    def plan(self, request: str, actions_list: list = [LoLLMsAction], context: str = "", max_answer_length: int = 512) -> List[LoLLMsAction]:
        """
        Creates a plan out of a request and a context.

        Args:
            request (str): The request posed by the user.
            actions_list (list, optional): The actions the planner may choose from.
            context (str, optional): Extra context to ground the plan.
            max_answer_length (int, optional): Maximum number of tokens allowed for the generated plan. Defaults to 512.

        Returns:
            List[LoLLMsAction]: The list of actions extracted from the model's plan.
        """
        template = """!@>instruction:
Act as plan builder, a tool capable of making plans to perform the user requested operation.
"""
        if len(actions_list) > 0:
            template += """The plan builder is an AI that responds in json format
!@>actions_list:
[
{{actions_list}}
]
"""
        if context != "":
            template += """!@>Context:
{{context}}"""
        template += """!@>request: {{request}}
"""
        template += """!@>planner: Analyzing the user prompt.\n!@>planner: Plan has been established.\n!@>planner: list of actions in json format:\n```json\n{\n \"actions\": [\n"""
        pr = PromptReshaper(template)
        prompt = pr.build({
                "context": context,
                "request": request,
                "actions_list": ",\n".join([f"{action}" for action in actions_list])
            },
            self.personality.model.tokenize,
            self.personality.model.detokenize,
            self.personality.model.config.ctx_size,
            ["previous_discussion"]
        )
        gen = self.generate(prompt, max_answer_length).strip().replace("</s>", "").replace("<s>", "")
        gen = self.remove_backticks(gen)
        self.print_prompt("full", prompt + gen)
        # The prompt already opened the JSON object, so restore that opening
        # scaffold before parsing/repairing the completion
        gen = "{\n \"actions\": [\n" + gen
        gen = fix_json(gen)
        return generate_actions(actions_list, gen)
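From inside an APScript, the planner can then be driven end to end. A hedged sketch, where the action, the request, and the surrounding script instance are assumptions:

```python
# Hypothetical use from inside an APScript method (`self` is the script).
actions = [
    LoLLMsAction(
        "greet",
        [LoLLMsActionParameters("name", str, value="")],
        lambda name: print(f"Hello, {name}!"),
    )
]
for step in self.plan("Say hello to Alice", actions_list=actions):
    step.run()
```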
    def yes_no(self, question: str, context: str = "", max_answer_length: int = 50) -> bool:
        """
        Analyzes a question in an optional context and answers it with yes (True) or no (False).
@@ -1951,6 +2228,26 @@ Act as prompt ranker, a tool capable of ranking the user prompt. The ranks are r
        ASCIIColors.yellow(prompt)
        ASCIIColors.red(" *-*-*-*-*-*-*-*")
    def fast_gen_with_images(self, prompt: str, images: list, max_generation_size: int = None, placeholders: dict = {}, sacrifice: list = ["previous_discussion"], debug: bool = False, callback=None, show_progress=False) -> str:
        """
        Fast way to generate text from a prompt and a list of images.

        This method takes in a prompt, a list of images, a maximum generation size, optional placeholders, a sacrifice list, and a debug flag.
        It reshapes the context before performing text generation by adjusting and cropping the number of tokens.

        Parameters:
        - prompt (str): The input prompt for text generation.
        - images (list): Images to pass to the underlying multimodal model.
        - max_generation_size (int): The maximum number of tokens to generate.
        - placeholders (dict, optional): A dictionary of placeholders to be replaced in the prompt. Defaults to an empty dictionary.
        - sacrifice (list, optional): A list of placeholders to sacrifice if the window is bigger than the context size minus the number of tokens to generate. Defaults to ["previous_discussion"].
        - debug (bool, optional): Flag to enable/disable debug mode. Defaults to False.

        Returns:
        - str: The generated text after removing special tokens ("<s>" and "</s>") and stripping any leading/trailing whitespace.
        """
        return self.personality.fast_gen_with_images(prompt=prompt, images=images, max_generation_size=max_generation_size, placeholders=placeholders, sacrifice=sacrifice, debug=debug, callback=callback, show_progress=show_progress)
    def fast_gen(self, prompt: str, max_generation_size: int = None, placeholders: dict = {}, sacrifice: list = ["previous_discussion"], debug: bool = False, callback=None, show_progress=False) -> str:
        """
        Fast way to generate text from a prompt.
@@ -1968,7 +2265,7 @@ Act as prompt ranker, a tool capable of ranking the user prompt. The ranks are r
        Returns:
        - str: The generated text after removing special tokens ("<s>" and "</s>") and stripping any leading/trailing whitespace.
        """
-        return self.personality.fast_gen(prompt=prompt,max_generation_size=max_generation_size,placeholders=placeholders, sacrifice=sacrifice, debug=debug, callback=callback)
+        return self.personality.fast_gen(prompt=prompt, max_generation_size=max_generation_size, placeholders=placeholders, sacrifice=sacrifice, debug=debug, callback=callback, show_progress=show_progress)

    # Helper method to convert outputs path to url