Upgraded coding system

This commit is contained in:
Saifeddine ALOUI 2024-12-15 02:17:33 +01:00
parent d259103fdf
commit 759810a349
4 changed files with 625 additions and 533 deletions

View File

@ -252,11 +252,6 @@ class LollmsApplication(LoLLMsCom):
def add_discussion_to_skills_library(self, client: Client): def add_discussion_to_skills_library(self, client: Client):
end_header_id_template = self.config.end_header_id_template
separator_template = self.config.separator_template
system_message_template = self.config.system_message_template
messages = client.discussion.get_messages() messages = client.discussion.get_messages()
# Extract relevant information from messages # Extract relevant information from messages
@ -269,21 +264,19 @@ class LollmsApplication(LoLLMsCom):
self.tasks_library.callback = bk_cb self.tasks_library.callback = bk_cb
# Generate title # Generate title
title_prompt = f"{separator_template}".join([ title_prompt = f"{self.separator_template}".join([
f"{self.start_header_id_template}{system_message_template}{end_header_id_template}Generate a concise and descriptive title for the following content.", f"{self.system_full_header}Generate a concise and descriptive title and category for the following content:",
"The title should summarize the main topic or subject of the content.", content
"Do not mention the format of the content (e.g., bullet points, discussion, etc.) in the title.",
"Provide only the title without any additional explanations or context.",
f"{self.start_header_id_template}content{end_header_id_template}",
f"{content}",
f"{self.start_header_id_template}title{end_header_id_template}"
]) ])
template = f"{self.separator_template}".join([
title = self._generate_text(title_prompt) "{",
' "title":"here you put the title"',
# Determine category ' "category":"here you put the category"',
category_prompt = f"{self.system_full_header}Analyze the following title, and determine the most appropriate generic category that encompasses the main subject or theme. The category should be broad enough to include multiple related skill entries. Provide only the category name without any additional explanations or context:\n\nTitle:\n{title}\n{separator_template}{self.start_header_id_template}Category:\n" "}"])
category = self._generate_text(category_prompt) language = "json"
title_category_json = json.loads(self._generate_code(title_prompt, template, language))
title = title_category_json["title"]
category = title_category_json["category"]
# Add entry to skills library # Add entry to skills library
self.skills_library.add_entry(1, category, title, content) self.skills_library.add_entry(1, category, title, content)
@ -318,7 +311,11 @@ class LollmsApplication(LoLLMsCom):
max_tokens = min(self.config.ctx_size - self.model.get_nb_tokens(prompt),self.config.max_n_predict if self.config.max_n_predict else self.config.ctx_size- self.model.get_nb_tokens(prompt)) max_tokens = min(self.config.ctx_size - self.model.get_nb_tokens(prompt),self.config.max_n_predict if self.config.max_n_predict else self.config.ctx_size- self.model.get_nb_tokens(prompt))
generated_text = self.model.generate(prompt, max_tokens) generated_text = self.model.generate(prompt, max_tokens)
return generated_text.strip() return generated_text.strip()
def _generate_code(self, prompt, template, language):
    """Generate a single structured code block through the personality's code generator.

    Args:
        prompt: The instruction describing what to generate.
        template: Example/skeleton of the expected answer, passed through to
            ``generate_code`` (presumably a JSON/YAML skeleton — confirm with callers).
        language: Code-tag language label (e.g. ``"json"``).

    Returns:
        The extracted code content returned by ``personality.generate_code``.
    """
    # Token budget: remaining context after the prompt, further capped by
    # max_n_predict when that config value is set (falsy value disables the cap).
    max_tokens = min(self.config.ctx_size - self.model.get_nb_tokens(prompt),self.config.max_n_predict if self.config.max_n_predict else self.config.ctx_size- self.model.get_nb_tokens(prompt))
    generated_code = self.personality.generate_code(prompt, self.personality.image_files, template, language, max_size= max_tokens)
    return generated_code
def get_uploads_path(self, client_id): def get_uploads_path(self, client_id):
return self.lollms_paths.personal_uploads_path return self.lollms_paths.personal_uploads_path
@ -888,7 +885,7 @@ class LollmsApplication(LoLLMsCom):
def recover_discussion(self,client_id, message_index=-1): def recover_discussion(self,client_id, message_index=-1):
messages = self.session.get_client(client_id).discussion.get_messages() messages = self.session.get_client(client_id).discussion.get_messages()
discussion="" discussion=""
for msg in messages: for msg in messages[:-1]:
if message_index!=-1 and msg>message_index: if message_index!=-1 and msg>message_index:
break break
discussion += "\n" + self.config.discussion_prompt_separator + msg.sender + ": " + msg.content.strip() discussion += "\n" + self.config.discussion_prompt_separator + msg.sender + ": " + msg.content.strip()
@ -1252,17 +1249,12 @@ class LollmsApplication(LoLLMsCom):
if discussion is None: if discussion is None:
discussion = self.recover_discussion(client_id) discussion = self.recover_discussion(client_id)
self.personality.step_start("Building query") self.personality.step_start("Building query")
query = self.personality.generate_code(f"""{self.system_full_header} query = self.personality.generate_code(f"""Your task is to carefully read the provided discussion and reformulate {self.config.user_name}'s request concisely.
Your task is to carefully read the provided discussion and reformulate {self.config.user_name}'s request concisely. {self.system_custom_header("discussion")}
The reformulation must be placed inside a json markdown tag like this:
```json
{{
"request": the reformulated request
}}
```
{self.system_custom_header("discussion:")}
{discussion[-2048:]} {discussion[-2048:]}
{self.system_custom_header("search query:")}""", callback=self.personality.sink) """, template="""{
"request": "the reformulated request"
}""", callback=self.personality.sink)
query_code = json.loads(query) query_code = json.loads(query)
query = query_code["request"] query = query_code["request"]
self.personality.step_end("Building query") self.personality.step_end("Building query")

View File

@ -43,13 +43,18 @@ from lollms.com import LoLLMsCom
from lollms.helpers import trace_exception from lollms.helpers import trace_exception
from lollms.utilities import PackageManager from lollms.utilities import PackageManager
import pipmaster as pm
import inspect import inspect
from lollms.code_parser import compress_js, compress_python, compress_html from lollms.code_parser import compress_js, compress_python, compress_html
import requests import requests
from bs4 import BeautifulSoup from bs4 import BeautifulSoup
import pipmaster as pm
# Ensure PyQt5 is present before importing its widgets; installs it on first run.
# NOTE(review): installing a package as an import-time side effect blocks module
# import and requires network access — consider moving behind an explicit setup step.
if not pm.is_installed("PyQt5"):
    pm.install("PyQt5")
import sys
from PyQt5.QtWidgets import QApplication, QLineEdit, QButtonGroup, QRadioButton, QVBoxLayout, QWidget, QMessageBox
def get_element_id(url, text): def get_element_id(url, text):
response = requests.get(url) response = requests.get(url)
@ -721,17 +726,100 @@ class AIPersonality:
return gen return gen
def generate_codes(self, prompt, max_size = None, temperature = None, top_k = None, top_p=None, repeat_penalty=None, repeat_last_n=None, callback=None, debug=False ): def generate_codes(
if len(self.image_files)>0: self,
response = self.generate_with_images(self.system_custom_header("Generation infos")+ "Generated code must be put inside the adequate markdown code tag. Use this template:\n```language name\nCode\n```\n" + self.separator_template + prompt, self.image_files, max_size, temperature, top_k, top_p, repeat_penalty, repeat_last_n, callback, debug=debug) prompt,
else: images=[],
response = self.generate(self.system_custom_header("Generation infos")+ "Generated code must be put inside the adequate markdown code tag. Use this template:\n```language name\nCode\n```\n" + self.separator_template + prompt, max_size, temperature, top_k, top_p, repeat_penalty, repeat_last_n, callback, debug=debug) template=None,
codes = self.extract_code_blocks(response) language="json",
return codes code_tag_format="markdown", # or "html"
max_size = None,
def generate_code(self, prompt, images=[], max_size = None, temperature = None, top_k = None, top_p=None, repeat_penalty=None, repeat_last_n=None, callback=None, debug=False, return_full_generated_code=False, accept_all_if_no_code_tags_is_present=False, max_continues=5): temperature = None,
top_k = None,
top_p=None,
repeat_penalty=None,
repeat_last_n=None,
callback=None,
debug=False,
return_full_generated_code=False,
):
response_full = "" response_full = ""
full_prompt = self.system_custom_header("Generation infos")+ "Generated code must be put inside the adequate markdown code tag. Use this template:\n```language name\nCode\n```\nMake sure only a single code tag is generated at each dialogue turn." + self.separator_template + self.system_custom_header("User prompt")+ prompt + self.separator_template + self.ai_custom_header("generated code") full_prompt = f"""{self.system_full_header}Act as code generation assistant who answers with a single code tag content.
{prompt}
Make sure only a single code tag is generated at each dialogue turn.
"""
if template:
full_prompt += "Here is a template of the answer:\n"
if code_tag_format=="markdown":
full_prompt += f"""```{language}
{template}
```
The generated code must be placed inside the markdown code tag.
"""
elif code_tag_format=="html":
full_prompt +=f"""<code language="{language}">
{template}
</code>
The generated code must be placed inside the html code tag.
"""
full_prompt += self.ai_custom_header("assistant")
if len(self.image_files)>0:
response = self.generate_with_images(full_prompt, self.image_files, max_size, temperature, top_k, top_p, repeat_penalty, repeat_last_n, callback, debug=debug)
elif len(images)>0:
response = self.generate_with_images(full_prompt, images, max_size, temperature, top_k, top_p, repeat_penalty, repeat_last_n, callback, debug=debug)
else:
response = self.generate(full_prompt, max_size, temperature, top_k, top_p, repeat_penalty, repeat_last_n, callback, debug=debug)
response_full += response
codes = self.extract_code_blocks(response)
if return_full_generated_code:
return codes, response_full
else:
return codes
def generate_code(
self,
prompt,
images=[],
template=None,
language="json",
code_tag_format="markdown", # or "html"
max_size = None,
temperature = None,
top_k = None,
top_p=None,
repeat_penalty=None,
repeat_last_n=None,
callback=None,
debug=False,
return_full_generated_code=False,
accept_all_if_no_code_tags_is_present=False,
max_continues=5
):
response_full = ""
full_prompt = f"""{self.system_full_header}Act as a code generation assistant who answers with a single code tag content.
{self.system_custom_header("user")}
{prompt}
Make sure only a single code tag is generated at each dialogue turn.
"""
if template:
full_prompt += "Here is a template of the answer:\n"
if code_tag_format=="markdown":
full_prompt += f"""You must answer with the code placed inside the markdown code tag like this:
```{language}
{template}
```
Don't forget to close the markdown code tag
"""
elif code_tag_format=="html":
full_prompt +=f"""You must answer with the code placed inside the html code tag like this:
<code language="{language}">
{template}
</code>
Don't forget to close the html code tag
"""
full_prompt += self.ai_custom_header("assistant")
if len(self.image_files)>0: if len(self.image_files)>0:
response = self.generate_with_images(full_prompt, self.image_files, max_size, temperature, top_k, top_p, repeat_penalty, repeat_last_n, callback, debug=debug) response = self.generate_with_images(full_prompt, self.image_files, max_size, temperature, top_k, top_p, repeat_penalty, repeat_last_n, callback, debug=debug)
elif len(images)>0: elif len(images)>0:
@ -771,9 +859,49 @@ class AIPersonality:
else: else:
return None return None
def generate_text(self, prompt, images=[], max_size = None, temperature = None, top_k = None, top_p=None, repeat_penalty=None, repeat_last_n=None, callback=None, debug=False, return_full_generated_code=False, accept_all_if_no_code_tags_is_present=False):
def generate_text(
self,
prompt,
images=[],
template=None,
code_tag_format="markdown", # or "html"
max_size = None,
temperature = None,
top_k = None,
top_p=None,
repeat_penalty=None,
repeat_last_n=None,
callback=None,
debug=False,
return_full_generated_code=False,
accept_all_if_no_code_tags_is_present=False,
max_continues=5
):
response_full = "" response_full = ""
full_prompt = self.system_custom_header("Generation infos")+ "Generated text content must be put inside a markdown code tag. Use this template:\n```\nText\n```\nMake sure only a single text tag is generated at each dialogue turn." + self.separator_template + self.system_custom_header("User prompt")+ prompt + self.separator_template + self.ai_custom_header("generated answer") full_prompt = f"""{self.system_full_header}Act as a json generation assistant who answers with a single json code tag content.
{self.system_custom_header("user")}
{prompt}
Make sure only a single code tag is generated at each dialogue turn.
"""
if template:
full_prompt += "Here is a template of the answer:\n"
if code_tag_format=="markdown":
full_prompt += f"""You must answer with the code placed inside the markdown code tag like this:
```json
{template}
```
Don't forget to close the markdown code tag
"""
elif code_tag_format=="html":
full_prompt +=f"""You must answer with the code placed inside the html code tag like this:
<code language="json">
{template}
</code>
Don't forget to close the html code tag
"""
full_prompt += self.ai_custom_header("assistant")
if len(self.image_files)>0: if len(self.image_files)>0:
response = self.generate_with_images(full_prompt, self.image_files, max_size, temperature, top_k, top_p, repeat_penalty, repeat_last_n, callback, debug=debug) response = self.generate_with_images(full_prompt, self.image_files, max_size, temperature, top_k, top_p, repeat_penalty, repeat_last_n, callback, debug=debug)
elif len(images)>0: elif len(images)>0:
@ -790,8 +918,9 @@ class AIPersonality:
if len(codes)>0: if len(codes)>0:
if not codes[-1]["is_complete"]: if not codes[-1]["is_complete"]:
code = "\n".join(codes[-1]["content"].split("\n")[:-1]) code = "\n".join(codes[-1]["content"].split("\n")[:-1])
while not codes[-1]["is_complete"]: nb_continues = 0
response = self.generate(prompt+code+self.user_full_header+"continue the text. Start from last line and continue the text. Put the text inside a markdown code tag."+self.separator_template+self.ai_full_header, max_size, temperature, top_k, top_p, repeat_penalty, repeat_last_n, callback, debug=debug) while not codes[-1]["is_complete"] and nb_continues<max_continues:
response = self.generate(full_prompt+code+self.user_full_header+"continue the code. Start from last line and continue the code. Put the code inside a markdown code tag."+self.separator_template+self.ai_full_header, max_size, temperature, top_k, top_p, repeat_penalty, repeat_last_n, callback, debug=debug)
response_full += response response_full += response
codes = self.extract_code_blocks(response) codes = self.extract_code_blocks(response)
if len(codes)==0: if len(codes)==0:
@ -801,6 +930,7 @@ class AIPersonality:
code +="\n"+ "\n".join(codes[-1]["content"].split("\n")[:-1]) code +="\n"+ "\n".join(codes[-1]["content"].split("\n")[:-1])
else: else:
code +="\n"+ "\n".join(codes[-1]["content"].split("\n")) code +="\n"+ "\n".join(codes[-1]["content"].split("\n"))
nb_continues += 1
else: else:
code = codes[-1]["content"] code = codes[-1]["content"]
@ -809,13 +939,14 @@ class AIPersonality:
else: else:
return code return code
else: else:
return None return None
def generate_structured_content(self, def generate_structured_content(self,
prompt, prompt,
template, images = [],
template = {},
single_shot=False, single_shot=False,
output_format="yaml"): output_format="json"):
""" """
Generate structured content (YAML/JSON) either in single-shot or step-by-step mode. Generate structured content (YAML/JSON) either in single-shot or step-by-step mode.
@ -836,26 +967,25 @@ class AIPersonality:
if single_shot: if single_shot:
# Generate all content at once for powerful LLMs # Generate all content at once for powerful LLMs
full_prompt = f"""Generate {output_format.upper()} content for: {prompt} full_prompt = f"""Generate {output_format} content for: {prompt}
Use this structure:
{output_data}
""" """
if self.config.debug and not self.processor: if self.config.debug and not self.processor:
ASCIIColors.highlight(full_prompt,"source_document_title", ASCIIColors.color_yellow, ASCIIColors.color_red, False) ASCIIColors.highlight(full_prompt,"source_document_title", ASCIIColors.color_yellow, ASCIIColors.color_red, False)
response = self.generate_code(full_prompt, callback=self.sink, accept_all_if_no_code_tags_is_present=True) if output_format=="yaml":
output_data = yaml.dumps(output_data)
code = self.generate_code(full_prompt, images, output_data, output_format, callback=self.sink, accept_all_if_no_code_tags_is_present=True)
# Parse the response based on format # Parse the response based on format
if output_format == "yaml": if output_format == "yaml":
try: try:
cleaned_response = response.replace("```yaml", "").replace("```", "").strip() output_data = yaml.safe_load(code)
output_data = yaml.safe_load(cleaned_response)
except yaml.YAMLError: except yaml.YAMLError:
# If parsing fails, fall back to step-by-step # If parsing fails, fall back to step-by-step
single_shot = False single_shot = False
elif output_format == "json": elif output_format == "json":
try: try:
cleaned_response = response.replace("```json", "").replace("```", "").strip() output_data = json.loads(code)
output_data = json.loads(cleaned_response)
except json.JSONDecodeError: except json.JSONDecodeError:
# If parsing fails, fall back to step-by-step # If parsing fails, fall back to step-by-step
single_shot = False single_shot = False
@ -865,12 +995,10 @@ Use this structure:
for field, field_info in template.items(): for field, field_info in template.items():
if "prompt" in field_info: if "prompt" in field_info:
field_prompt = field_info["prompt"].format(main_prompt=prompt) field_prompt = field_info["prompt"].format(main_prompt=prompt)
response = self.generate_code(field_prompt, callback=self.sink, accept_all_if_no_code_tags_is_present=True ) code = self.generate_code(field_prompt, images, output_data, callback=self.sink, accept_all_if_no_code_tags_is_present=True )
# Clean up the response
cleaned_response = response.strip()
# Apply any field-specific processing # Apply any field-specific processing
if "processor" in field_info: if "processor" in field_info:
cleaned_response = field_info["processor"](cleaned_response) cleaned_response = field_info["processor"](code)
output_data[field] = cleaned_response output_data[field] = cleaned_response
# Format the output string # Format the output string
@ -3002,113 +3130,79 @@ class APScript(StateMachine):
codes = self.extract_code_blocks(response) codes = self.extract_code_blocks(response)
return codes return codes
def generate_code(self, prompt, images=[], max_size = None, temperature = None, top_k = None, top_p=None, repeat_penalty=None, repeat_last_n=None, callback=None, debug=False, return_full_generated_code=False, accept_all_if_no_code_tags_is_present=False):
    """Ask the model for a single markdown-fenced code block and return its content.

    Builds a system prompt instructing the model to emit exactly one fenced code
    tag, generates (with images when any are attached), and extracts the last
    code block from the response. Incomplete blocks are re-prompted to continue.

    Args:
        prompt: User request describing the code to generate.
        images: Extra image paths used only when the personality has no images.
            NOTE(review): mutable default argument — shared across calls.
        max_size, temperature, top_k, top_p, repeat_penalty, repeat_last_n:
            Generation parameters forwarded to the model.
        callback: Streaming callback forwarded to the generator.
        debug: Forwarded to the generator for verbose output.
        return_full_generated_code: When True, also return the raw concatenated
            model responses alongside the extracted code.
        accept_all_if_no_code_tags_is_present: When True and no code fence is
            found, return the raw response instead of None.

    Returns:
        The extracted code string (or ``(code, response_full)``), the raw
        response when no fences were found and acceptance is enabled, or
        ``None`` (or ``(None, None)``) when nothing usable was produced.
    """
    response_full = ""
    full_prompt = f"""{self.system_custom_header("system")}You are a code generation assistant.
Your objective is to generate code as requested by the user and format the output as markdown.
Generated code must be put inside the adequate markdown code tag.
Use this code generation template:
```language name (ex: python, json, javascript...)
Code
```
Make sure only a single code tag is generated at each dialogue turn.
{self.separator_template}{self.system_custom_header("user")}{prompt}
{self.separator_template}{self.ai_custom_header("assistant")}"""
    if self.config.debug:
        ASCIIColors.red("Generation request prompt:")
        ASCIIColors.yellow(full_prompt)
    # Personality-attached images take priority over caller-supplied ones.
    if len(self.personality.image_files)>0:
        response = self.personality.generate_with_images(full_prompt, self.personality.image_files, max_size, temperature, top_k, top_p, repeat_penalty, repeat_last_n, callback, debug=debug)
    elif len(images)>0:
        response = self.personality.generate_with_images(full_prompt, images, max_size, temperature, top_k, top_p, repeat_penalty, repeat_last_n, callback, debug=debug)
    else:
        response = self.personality.generate(full_prompt, max_size, temperature, top_k, top_p, repeat_penalty, repeat_last_n, callback, debug=debug)
    response_full += response
    codes = self.extract_code_blocks(response)
    if self.config.debug:
        ASCIIColors.red("Generated codes:")
        ASCIIColors.green(codes)
    if len(codes)==0 and accept_all_if_no_code_tags_is_present:
        if return_full_generated_code:
            return response, response_full
        else:
            return response
    if len(codes)>0:
        if not codes[-1]["is_complete"]:
            # Drop the (possibly truncated) last line before asking for a continuation.
            code = "\n".join(codes[-1]["content"].split("\n")[:-1])
            # NOTE(review): this continuation loop has no iteration cap — if the
            # model keeps returning incomplete fenced blocks it never terminates.
            while not codes[-1]["is_complete"]:
                response = self.personality.generate(prompt+code+self.user_full_header+"continue the code. Start from last line and continue the code. Put the code inside a markdown code tag."+self.separator_template+self.ai_full_header, max_size, temperature, top_k, top_p, repeat_penalty, repeat_last_n, callback, debug=debug)
                response_full += response
                codes = self.extract_code_blocks(response)
                if len(codes)==0:
                    break
                else:
                    if not codes[-1]["is_complete"]:
                        code +="\n"+ "\n".join(codes[-1]["content"].split("\n")[:-1])
                    else:
                        code +="\n"+ "\n".join(codes[-1]["content"].split("\n"))
        else:
            code = codes[-1]["content"]

        if return_full_generated_code:
            return code, response_full
        else:
            return code
    else:
        if return_full_generated_code:
            return None, None
        else:
            return None
def generate_text(self, prompt, images=[], max_size = None, temperature = None, top_k = None, top_p=None, repeat_penalty=None, repeat_last_n=None, callback=None, debug=False, return_full_generated_code=False, accept_all_if_no_code_tags_is_present=False):
response_full = "" def generate_code(
full_prompt = f""" self,
{self.system_custom_header("system")} prompt,
You are a text generation assistant. images=[],
Generated text content must be put inside a markdown code tag. template=None,
Use this template: language="json",
``` code_tag_format="markdown", # or "html"
Text max_size = None,
``` temperature = None,
Make sure only a single text tag is generated at each dialogue turn. top_k = None,
{self.separator_template}{self.system_custom_header("User prompt")}{prompt} top_p=None,
{self.separator_template}{self.ai_custom_header("assistant")}""" repeat_penalty=None,
if len(self.personality.image_files)>0: repeat_last_n=None,
response = self.personality.generate_with_images(full_prompt, self.personality.image_files, max_size, temperature, top_k, top_p, repeat_penalty, repeat_last_n, callback, debug=debug) callback=None,
elif len(images)>0: debug=False,
response = self.personality.generate_with_images(full_prompt, images, max_size, temperature, top_k, top_p, repeat_penalty, repeat_last_n, callback, debug=debug) return_full_generated_code=False,
else: accept_all_if_no_code_tags_is_present=False,
response = self.personality.generate(full_prompt, max_size, temperature, top_k, top_p, repeat_penalty, repeat_last_n, callback, debug=debug) max_continues=5
response_full += response ):
codes = self.extract_code_blocks(response) return self.personality.generate_code(prompt,
if len(codes)==0 and accept_all_if_no_code_tags_is_present: images,
if return_full_generated_code: template,
return response, response_full language,
else: code_tag_format, # or "html"
return response max_size,
if len(codes)>0: temperature,
if not codes[-1]["is_complete"]: top_k,
code = "\n".join(codes[-1]["content"].split("\n")[:-1]) top_p,
while not codes[-1]["is_complete"]: repeat_penalty,
response = self.personality.generate(prompt+code+self.user_full_header+"continue the text. Start from last line and continue the text. Put the text inside a markdown code tag."+self.separator_template+self.ai_full_header, max_size, temperature, top_k, top_p, repeat_penalty, repeat_last_n, callback, debug=debug) repeat_last_n,
response_full += response callback,
codes = self.extract_code_blocks(response) debug,
if len(codes)==0: return_full_generated_code,
break accept_all_if_no_code_tags_is_present,
else: max_continues
if not codes[-1]["is_complete"]: )
code +="\n"+ "\n".join(codes[-1]["content"].split("\n")[:-1])
else: def generate_text(
code +="\n"+ "\n".join(codes[-1]["content"].split("\n")) self,
else: prompt,
code = codes[-1]["content"] images=[],
template=None,
if return_full_generated_code: code_tag_format="markdown", # or "html"
return code, response_full max_size = None,
else: temperature = None,
return code top_k = None,
else: top_p=None,
return None repeat_penalty=None,
repeat_last_n=None,
callback=None,
debug=False,
return_full_generated_code=False,
accept_all_if_no_code_tags_is_present=False,
max_continues=5
):
return self.personality.generate_text(prompt,
images,
template,
code_tag_format, # or "html"
max_size,
temperature,
top_k,
top_p,
repeat_penalty,
repeat_last_n,
callback,
debug,
return_full_generated_code,
accept_all_if_no_code_tags_is_present,
max_continues
)
def generate_structured_content(self, def generate_structured_content(self,
prompt, prompt,
@ -3131,42 +3225,33 @@ Make sure only a single text tag is generated at each dialogue turn.
# Initialize the output dictionary with default values # Initialize the output dictionary with default values
output_data = {} output_data = {}
for field, field_info in template.items(): for field, field_info in template.items():
output_data[field] = field_info.get("default", "") output_data[field] = field_info.get("default", f'[{field_info.get(f"prompt","")}]')
if single_shot: if single_shot:
# Generate all content at once for powerful LLMs # Generate all content at once for powerful LLMs
full_prompt = f"Generate {output_format.lower()} content for: {prompt}"
if output_format=="yaml": if output_format=="yaml":
full_prompt = f"""Generate {output_format.upper()} content for: {prompt} template = f"""{yaml.dump(output_data, default_flow_style=False)}
Use this structure:
```yaml
{yaml.dump(output_data, default_flow_style=False)}
```
""" """
elif output_format=="json": elif output_format=="json":
full_prompt = f"""Generate {output_format.lower()} content for: {prompt} template = f"""{json.dumps(output_data)}
Use this structure:
```json
{json.dumps(output_data)}
```
""" """
if self.config.debug: if self.config.debug:
ASCIIColors.green(full_prompt) ASCIIColors.green(full_prompt)
if self.config.debug and not self.personality.processor: if self.config.debug and not self.personality.processor:
ASCIIColors.highlight(full_prompt,"source_document_title", ASCIIColors.color_yellow, ASCIIColors.color_red, False) ASCIIColors.highlight(full_prompt,"source_document_title", ASCIIColors.color_yellow, ASCIIColors.color_red, False)
response = self.generate_code(full_prompt, callback=self.sink, accept_all_if_no_code_tags_is_present=True) code = self.generate_code(full_prompt, self.personality.image_files, template, language=output_format, callback=self.sink, accept_all_if_no_code_tags_is_present=True)
# Parse the response based on format # Parse the response based on format
if output_format == "yaml": if output_format == "yaml":
try: try:
cleaned_response = response.replace("```yaml", "").replace("```", "").strip() output_data = yaml.safe_load(code)
output_data = yaml.safe_load(cleaned_response)
except yaml.YAMLError: except yaml.YAMLError:
# If parsing fails, fall back to step-by-step # If parsing fails, fall back to step-by-step
single_shot = False single_shot = False
elif output_format == "json": elif output_format == "json":
try: try:
cleaned_response = response.replace("```json", "").replace("```", "").strip() output_data = json.loads(code)
output_data = json.loads(cleaned_response)
except json.JSONDecodeError: except json.JSONDecodeError:
# If parsing fails, fall back to step-by-step # If parsing fails, fall back to step-by-step
single_shot = False single_shot = False
@ -3176,9 +3261,13 @@ Use this structure:
for field, field_info in template.items(): for field, field_info in template.items():
if "prompt" in field_info: if "prompt" in field_info:
field_prompt = field_info["prompt"].format(main_prompt=prompt) field_prompt = field_info["prompt"].format(main_prompt=prompt)
response = self.generate_text(field_prompt, callback=self.sink, accept_all_if_no_code_tags_is_present=True ) template = f"""{{
"{field}": [The value of {field}]
}}
"""
code = self.generate_code(field_prompt, self.personality.image_files, template, "json", callback=self.sink, accept_all_if_no_code_tags_is_present=True )
# Clean up the response # Clean up the response
cleaned_response = response.strip() cleaned_response = json.loads(code)[field]
# Apply any field-specific processing # Apply any field-specific processing
if "processor" in field_info: if "processor" in field_info:
cleaned_response = field_info["processor"](cleaned_response) cleaned_response = field_info["processor"](cleaned_response)
@ -4401,38 +4490,30 @@ transition-all duration-300 ease-in-out">
def build_and_execute_python_code(self,context, instructions, execution_function_signature, extra_imports=""): def build_and_execute_python_code(self,context, instructions, execution_function_signature, extra_imports=""):
start_header_id_template = self.config.start_header_id_template start_header_id_template = self.config.start_header_id_template
end_header_id_template = self.config.end_header_id_template end_header_id_template = self.config.end_header_id_template
system_message_template = self.config.system_message_template
code = "```python\n"+self.fast_gen( code = self.generate_code(self.build_prompt([
self.build_prompt([ self.system_custom_header('context'),
f"{start_header_id_template}context{end_header_id_template}",
context, context,
f"{start_header_id_template}{system_message_template}{end_header_id_template}", self.system_full_header,
f"{instructions}", f"{instructions}",
f"Here is the signature of the function:\n{execution_function_signature}",
"Don't call the function, just write it", "Don't call the function, just write it",
"Do not provide usage example.", "Do not provide usage example.",
"The code must me without comments", "The code must me without comments",
f"{start_header_id_template}coder{end_header_id_template}Sure, in the following code, I import the necessary libraries, then define the function as you asked.", ],2), self.personality.image_files, execution_function_signature, "python")
"The function is ready to be used in your code and performs the task as you asked:",
"```python\n"
],2), callback=self.sink)
code = code.replace("```python\n```python\n", "```python\n").replace("```\n```","```")
code=self.extract_code_blocks(code)
if len(code)>0:
# Perform the search query # Perform the search query
code = code[0]["content"] code = code["content"]
code = "\n".join([ code = "\n".join([
extra_imports, extra_imports,
code code
]) ])
ASCIIColors.magenta(code) ASCIIColors.magenta(code)
module_name = 'custom_module' module_name = 'custom_module'
spec = importlib.util.spec_from_loader(module_name, loader=None) spec = importlib.util.spec_from_loader(module_name, loader=None)
module = importlib.util.module_from_spec(spec) module = importlib.util.module_from_spec(spec)
exec(code, module.__dict__) exec(code, module.__dict__)
return module, code return module, code
def yes_no(self, question: str, context:str="", max_answer_length: int = 50, conditionning="") -> bool: def yes_no(self, question: str, context:str="", max_answer_length: int = 50, conditionning="") -> bool:
@ -4448,7 +4529,7 @@ transition-all duration-300 ease-in-out">
""" """
return self.multichoice_question(question, ["no","yes"], context, max_answer_length, conditionning=conditionning)>0 return self.multichoice_question(question, ["no","yes"], context, max_answer_length, conditionning=conditionning)>0
def multichoice_question(self, question: str, possible_answers:list, context:str = "", max_answer_length: int = 50, conditionning="") -> int: def multichoice_question(self, question: str, possible_answers:list, context:str = "", max_answer_length: int = 1024, conditionning="", return_justification=False) -> int:
""" """
Interprets a multi-choice question from a users response. This function expects only one choice as true. All other choices are considered false. If none are correct, returns -1. Interprets a multi-choice question from a users response. This function expects only one choice as true. All other choices are considered false. If none are correct, returns -1.
@ -4463,38 +4544,45 @@ transition-all duration-300 ease-in-out">
""" """
choices = "\n".join([f"{i}. {possible_answer}" for i, possible_answer in enumerate(possible_answers)]) choices = "\n".join([f"{i}. {possible_answer}" for i, possible_answer in enumerate(possible_answers)])
elements = [conditionning] if conditionning!="" else [] elements = [conditionning] if conditionning!="" else []
elements += [
f"{self.system_full_header}",
"Answer this multi choices question in form of a json in this form:\n",
"""```json
{
"justification": "A justification for your choice",
"choice_index": the index of the choice made
}
```
""",
]
if context!="": if context!="":
elements+=[ elements+=[
self.system_custom_header("Context"), self.system_custom_header("context"),
f"{context}", f"{context}",
] ]
elements += [
"Answer this multi choices question about the context:\n",
]
elements += [ elements += [
self.system_custom_header("question"), self.system_custom_header("question"),
question, question,
self.system_custom_header("possible answers"), self.system_custom_header("possible answers"),
f"{choices}", f"{choices}",
] ]
elements += [self.system_custom_header("answer")]
prompt = self.build_prompt(elements) prompt = self.build_prompt(elements)
code = self.generate_code(prompt, self.personality.image_files, max_answer_length, temperature=0.1, top_k=50, top_p=0.9, repeat_penalty=1.0, repeat_last_n=50, callback=self.sink).strip().replace("</s>","").replace("<s>","") code = self.generate_code(
prompt,
self.personality.image_files,"""{
"choice_index": [an int representing the index of the choice made]
"justification": "[Justify the choice]",
}""",
max_size= max_answer_length,
temperature=0.1,
top_k=50,
top_p=0.9,
repeat_penalty=1.0,
repeat_last_n=50,
callback=self.sink
)
if len(code)>0: if len(code)>0:
json_code = json.loads(code) json_code = json.loads(code)
selection = json_code["choice_index"] selection = json_code["choice_index"]
self.print_prompt("Multi choice selection",prompt+code) self.print_prompt("Multi choice selection",prompt+code)
try: try:
return int(selection) if return_justification:
return int(selection), json_code["justification"]
else:
return int(selection)
except: except:
ASCIIColors.cyan("Model failed to answer the question") ASCIIColors.cyan("Model failed to answer the question")
return -1 return -1
@ -4608,58 +4696,71 @@ transition-all duration-300 ease-in-out">
if callback: if callback:
callback(step_text, MSG_OPERATION_TYPE.MSG_OPERATION_TYPE_STEP_PROGRESS, {'progress':progress}) callback(step_text, MSG_OPERATION_TYPE.MSG_OPERATION_TYPE_STEP_PROGRESS, {'progress':progress})
def ask_user(self, question): def ask_user(self, question):
import tkinter as tk try:
from tkinter import simpledialog app = QApplication(sys.argv)
root = tk.Tk() input_field = QLineEdit(question)
root.withdraw() # Hide the main window input_field.setWindowTitle("Input")
input_field.exec_()
answer = simpledialog.askstring("Input", question, parent=root) answer = input_field.text()
input_field.deleteLater()
root.destroy() # Ensure the hidden root window is properly closed return answer
except:
return answer ASCIIColors.warning(question)
def ask_user_yes_no(self, question): def ask_user_yes_no(self, question):
import tkinter as tk try:
from tkinter import messagebox app = QApplication(sys.argv)
root = tk.Tk() msg = QMessageBox()
root.withdraw() # Hide the main window msg.setIcon(QMessageBox.Question)
msg.setText(question)
response = messagebox.askyesno("Question", question) msg.setStandardButtons(QMessageBox.Yes | QMessageBox.No)
response = msg.exec_()
root.destroy() # Ensure the hidden root window is properly closed return response == QMessageBox.Yes
except:
return response print(question)
def ask_user_multichoice_question(self, question, choices, default=None):
import tkinter as tk
from tkinter import ttk
def on_ok():
nonlocal result
result = var.get()
root.quit()
root = tk.Tk()
root.title("Question")
def ask_user_multichoice_question(self, question, choices, default=None):
frame = ttk.Frame(root, padding="10") try:
frame.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S)) app = QApplication(sys.argv)
window = QWidget()
ttk.Label(frame, text=question).grid(column=0, row=0, sticky=tk.W, pady=5) layout = QVBoxLayout()
window.setLayout(layout)
var = tk.StringVar(value=default if default in choices else choices[0])
label = QLabel(question)
for i, choice in enumerate(choices): layout.addWidget(label)
ttk.Radiobutton(frame, text=choice, variable=var, value=choice).grid(column=0, row=i+1, sticky=tk.W, padx=20)
button_group = QButtonGroup()
ttk.Button(frame, text="OK", command=on_ok).grid(column=0, row=len(choices)+1, pady=10) for i, choice in enumerate(choices):
button = QRadioButton(choice)
root.protocol("WM_DELETE_WINDOW", on_ok) # Handle window close button_group.addButton(button)
layout.addWidget(button)
result = None
root.mainloop() if default is not None:
for button in button_group.buttons():
return result if button.text() == default:
button.setChecked(True)
break
def on_ok():
nonlocal result
result = [button.text() for button in button_group.buttons() if button.isChecked()]
window.close()
button = QPushButton("OK")
button.clicked.connect(on_ok)
layout.addWidget(button)
window.show()
result = None
sys.exit(app.exec_())
return result
except:
ASCIIColors.error(question)
def new_message(self, message_text:str, message_type:MSG_OPERATION_TYPE= MSG_OPERATION_TYPE.MSG_OPERATION_TYPE_SET_CONTENT, metadata=[], callback: Callable[[str, int, dict, list, AIPersonality], bool]=None): def new_message(self, message_text:str, message_type:MSG_OPERATION_TYPE= MSG_OPERATION_TYPE.MSG_OPERATION_TYPE_SET_CONTENT, metadata=[], callback: Callable[[str, int, dict, list, AIPersonality], bool]=None):
"""This sends step rogress to front end """This sends step rogress to front end

View File

@ -24,167 +24,178 @@ from functools import partial
import os import os
import re import re
import threading import threading
import pipmaster as pm
if not pm.is_installed("PyQt5"):
pm.install("PyQt5")
import sys
from PyQt5.QtWidgets import QApplication, QFileDialog, QInputDialog
from pathlib import Path
from PyQt5.QtCore import Qt
from typing import Optional, Dict
# ----------------------- Defining router and main class ------------------------------ # ----------------------- Defining router and main class ------------------------------
router = APIRouter() router = APIRouter()
lollmsElfServer = LOLLMSElfServer.get_instance() lollmsElfServer = LOLLMSElfServer.get_instance()
# Tools
def open_folder() -> Optional[Path]: def open_folder() -> Optional[Path]:
"""
Opens a folder selection dialog and returns the selected folder path.
Returns:
Optional[Path]: The path of the selected folder or None if no folder was selected.
"""
import tkinter as tk
from tkinter import filedialog
try: try:
# Create a new Tkinter root window and hide it app = QApplication(sys.argv)
root = tk.Tk()
root.withdraw()
# Make the window appear on top # Créer une instance de QFileDialog au lieu d'utiliser la méthode statique
root.attributes('-topmost', True) dialog = QFileDialog()
dialog.setOption(QFileDialog.DontUseNativeDialog, True)
dialog.setWindowFlag(Qt.WindowStaysOnTopHint, True)
dialog.setFileMode(QFileDialog.Directory)
dialog.setOption(QFileDialog.ShowDirsOnly, True)
# Open the folder selection dialog # Afficher le dialogue et le mettre au premier plan
folder_path = filedialog.askdirectory() dialog.show()
dialog.raise_()
dialog.activateWindow()
# Destroy the root window if dialog.exec_() == QFileDialog.Accepted:
root.destroy() selected_folder = dialog.selectedFiles()[0]
return Path(selected_folder)
if folder_path:
return Path(folder_path)
else: else:
return None return None
except Exception as e: except Exception as e:
print(f"An error occurred: {e}") print(f"Une erreur s'est produite : {e}")
return None return None
def open_file(file_types: List[str]) -> Optional[Path]: def open_file(file_types: List[str]) -> Optional[Path]:
"""
Opens a file selection dialog and returns the selected file path.
Args:
file_types (List[str]): A list of file types to filter in the dialog (e.g., ["*.txt", "*.pdf"]).
Returns:
Optional[Path]: The path of the selected file or None if no file was selected.
"""
import tkinter as tk
from tkinter import filedialog
try: try:
# Create a new Tkinter root window and hide it app = QApplication(sys.argv)
root = tk.Tk()
root.withdraw()
# Make the window appear on top # Créer une instance de QFileDialog
root.attributes('-topmost', True) dialog = QFileDialog()
dialog.setOption(QFileDialog.DontUseNativeDialog, True)
dialog.setWindowFlag(Qt.WindowStaysOnTopHint, True)
dialog.setFileMode(QFileDialog.ExistingFile)
dialog.setNameFilter(';;'.join(file_types))
# Open the file selection dialog # Afficher le dialogue et le mettre au premier plan
file_path = filedialog.askopenfilename(filetypes=[("Files", file_types)]) dialog.show()
dialog.raise_()
dialog.activateWindow()
# Destroy the root window if dialog.exec_() == QFileDialog.Accepted:
root.destroy() selected_file = dialog.selectedFiles()[0]
return Path(selected_file)
if file_path:
return Path(file_path)
else: else:
return None return None
except Exception as e: except Exception as e:
print(f"An error occurred: {e}") print(f"Une erreur s'est produite : {e}")
return None return None
def select_rag_database(client) -> Optional[Dict[str, Path]]: def select_rag_database(client) -> Optional[Dict[str, Path]]:
""" """
Opens a folder selection dialog and then a string input dialog to get the database name. Opens a folder selection dialog and then a string input dialog to get the database name using PyQt5.
Returns: Returns:
Optional[Dict[str, Path]]: A dictionary with the database name and the database path, or None if no folder was selected. Optional[Dict[str, Path]]: A dictionary with the database name and the database path, or None if no folder was selected.
""" """
try: try:
import tkinter as tk # Create a QApplication instance
from tkinter import simpledialog, filedialog app = QApplication.instance()
# Create a new Tkinter root window and hide it if not app:
root = tk.Tk() app = QApplication(sys.argv)
root.withdraw()
# Make the window appear on top
root.attributes('-topmost', True)
# Open the folder selection dialog # Open the folder selection dialog
folder_path = filedialog.askdirectory() dialog = QFileDialog()
dialog.setOption(QFileDialog.DontUseNativeDialog, True)
dialog.setWindowFlag(Qt.WindowStaysOnTopHint, True)
dialog.setWindowModality(Qt.ApplicationModal)
dialog.raise_()
dialog.activateWindow()
# Add a custom filter to show network folders
dialog.setNameFilter("All Files (*)")
dialog.setViewMode(QFileDialog.List)
if folder_path: # Show the dialog modally
# Ask for the database name if dialog.exec_() == QFileDialog.Accepted:
db_name = simpledialog.askstring("Database Name", "Please enter the database name:") folder_path = dialog.selectedFiles()[0] # Get the selected folder path
if folder_path:
# Destroy the root window # Bring the input dialog to the foreground as well
root.destroy() input_dialog = QInputDialog()
input_dialog.setWindowFlags(input_dialog.windowFlags() | Qt.WindowStaysOnTopHint)
if db_name: input_dialog.setWindowModality(Qt.ApplicationModal)
try: input_dialog.setOption(QInputDialog.DontUseNativeDialog, True)
lollmsElfServer.ShowBlockingMessage("Adding a new database.") input_dialog.setWindowFlag(Qt.WindowStaysOnTopHint, True)
if not PackageManager.check_package_installed_with_version("lollmsvectordb","0.6.0"): input_dialog.setWindowModality(Qt.ApplicationModal)
PackageManager.install_or_update("lollmsvectordb") input_dialog.raise_()
input_dialog.activateWindow()
db_name, ok = input_dialog.getText(None, "Database Name", "Please enter the database name:")
if ok and db_name:
try:
lollmsElfServer.ShowBlockingMessage("Adding a new database.")
if not PackageManager.check_package_installed_with_version("lollmsvectordb","0.6.0"):
PackageManager.install_or_update("lollmsvectordb")
from lollmsvectordb import VectorDatabase
from lollmsvectordb.text_document_loader import TextDocumentsLoader
from lollmsvectordb.lollms_tokenizers.tiktoken_tokenizer import TikTokenTokenizer
if lollmsElfServer.config.rag_vectorizer == "semantic":
from lollmsvectordb.lollms_vectorizers.semantic_vectorizer import SemanticVectorizer
v = SemanticVectorizer(lollmsElfServer.config.rag_vectorizer_model)
elif lollmsElfServer.config.rag_vectorizer == "tfidf":
from lollmsvectordb.lollms_vectorizers.tfidf_vectorizer import TFIDFVectorizer
v = TFIDFVectorizer()
elif lollmsElfServer.config.rag_vectorizer == "openai":
from lollmsvectordb.lollms_vectorizers.openai_vectorizer import OpenAIVectorizer
v = OpenAIVectorizer(lollmsElfServer.config.rag_vectorizer_openai_key)
elif lollmsElfServer.config.rag_vectorizer == "ollama":
from lollmsvectordb.lollms_vectorizers.ollama_vectorizer import OllamaVectorizer
v = OllamaVectorizer(lollmsElfServer.config.rag_vectorizer_model, lollmsElfServer.config.rag_service_url)
vdb = VectorDatabase(Path(folder_path)/f"{db_name}.sqlite", v, lollmsElfServer.model if lollmsElfServer.model else TikTokenTokenizer())
# Get all files in the folder
folder = Path(folder_path)
file_types = [f"**/*{f}" if lollmsElfServer.config.rag_follow_subfolders else f"*{f}" for f in TextDocumentsLoader.get_supported_file_types()]
files = []
for file_type in file_types:
files.extend(folder.glob(file_type))
# Load and add each document to the database
for fn in files:
try:
text = TextDocumentsLoader.read_file(fn)
title = fn.stem # Use the file name without extension as the title
lollmsElfServer.ShowBlockingMessage(f"Adding a new database.\nAdding {title}")
vdb.add_document(title, text, fn)
print(f"Added document: {title}")
except Exception as e:
lollmsElfServer.error(f"Failed to add document {fn}: {e}")
print(f"Failed to add document {fn}: {e}")
if vdb.new_data: #New files are added, need reindexing
lollmsElfServer.ShowBlockingMessage(f"Adding a new database.\nIndexing the database...")
vdb.build_index()
ASCIIColors.success("OK")
lollmsElfServer.HideBlockingMessage()
run_async(partial(lollmsElfServer.sio.emit,'rag_db_added', {"database_name": db_name, "database_path": str(folder_path)}, to=client.client_id))
except Exception as ex:
trace_exception(ex)
lollmsElfServer.HideBlockingMessage()
from lollmsvectordb import VectorDatabase return {"database_name": db_name, "database_path": Path(folder_path)}
from lollmsvectordb.text_document_loader import TextDocumentsLoader else:
from lollmsvectordb.lollms_tokenizers.tiktoken_tokenizer import TikTokenTokenizer return None
if lollmsElfServer.config.rag_vectorizer == "semantic":
from lollmsvectordb.lollms_vectorizers.semantic_vectorizer import SemanticVectorizer
v = SemanticVectorizer(lollmsElfServer.config.rag_vectorizer_model)
elif lollmsElfServer.config.rag_vectorizer == "tfidf":
from lollmsvectordb.lollms_vectorizers.tfidf_vectorizer import TFIDFVectorizer
v = TFIDFVectorizer()
elif lollmsElfServer.config.rag_vectorizer == "openai":
from lollmsvectordb.lollms_vectorizers.openai_vectorizer import OpenAIVectorizer
v = OpenAIVectorizer(lollmsElfServer.config.rag_vectorizer_openai_key)
elif lollmsElfServer.config.rag_vectorizer == "ollama":
from lollmsvectordb.lollms_vectorizers.ollama_vectorizer import OllamaVectorizer
v = OllamaVectorizer(lollmsElfServer.config.rag_vectorizer_model, lollmsElfServer.config.rag_service_url)
vdb = VectorDatabase(Path(folder_path)/f"{db_name}.sqlite", v, lollmsElfServer.model if lollmsElfServer.model else TikTokenTokenizer())
# Get all files in the folder
folder = Path(folder_path)
file_types = [f"**/*{f}" if lollmsElfServer.config.rag_follow_subfolders else f"*{f}" for f in TextDocumentsLoader.get_supported_file_types()]
files = []
for file_type in file_types:
files.extend(folder.glob(file_type))
# Load and add each document to the database
for fn in files:
try:
text = TextDocumentsLoader.read_file(fn)
title = fn.stem # Use the file name without extension as the title
lollmsElfServer.ShowBlockingMessage(f"Adding a new database.\nAdding {title}")
vdb.add_document(title, text, fn)
print(f"Added document: {title}")
except Exception as e:
lollmsElfServer.error(f"Failed to add document {fn}: {e}")
print(f"Failed to add document {fn}: {e}")
if vdb.new_data: #New files are added, need reindexing
lollmsElfServer.ShowBlockingMessage(f"Adding a new database.\nIndexing the database...")
vdb.build_index()
ASCIIColors.success("OK")
lollmsElfServer.HideBlockingMessage()
run_async(partial(lollmsElfServer.sio.emit,'rag_db_added', {"database_name": db_name, "database_path": str(folder_path)}, to=client.client_id))
except Exception as ex:
trace_exception(ex)
lollmsElfServer.HideBlockingMessage()
else: else:
return None return None
else:
# Destroy the root window if no folder was selected
root.destroy()
return None
except Exception as e: except Exception as e:
print(f"An error occurred: {e}") print(f"An error occurred: {e}")
return None return None
def find_rag_database_by_name(entries: List[str], name: str) -> Optional[str]: def find_rag_database_by_name(entries: List[str], name: str) -> Optional[str]:
""" """
Finds an entry in the list by its name. Finds an entry in the list by its name.
@ -315,75 +326,72 @@ async def vectorize_folder(database_infos: FolderInfos):
db_name = parts[0] db_name = parts[0]
folder_path = sanitize_path(parts[1], True) folder_path = sanitize_path(parts[1], True)
else: else:
import tkinter as tk # Create a QApplication instance
from tkinter import simpledialog, filedialog app = QApplication.instance()
# Create a new Tkinter root window and hide it if not app:
root = tk.Tk() app = QApplication(sys.argv)
root.withdraw()
# Make the window appear on top
root.attributes('-topmost', True)
# Ask for the database name # Ask for the database name
db_name = simpledialog.askstring("Database Name", "Please enter the database name:") db_name, ok = QInputDialog.getText(None, "Database Name", "Please enter the database name:")
folder_path = database_infos.db_path folder_path = database_infos.db_path
if db_name: if not ok or not db_name:
try: return
lollmsElfServer.ShowBlockingMessage("Revectorizing the database.")
if not PackageManager.check_package_installed_with_version("lollmsvectordb","0.6.0"): try:
PackageManager.install_or_update("lollmsvectordb") lollmsElfServer.ShowBlockingMessage("Revectorizing the database.")
if not PackageManager.check_package_installed_with_version("lollmsvectordb","0.6.0"):
PackageManager.install_or_update("lollmsvectordb")
from lollmsvectordb.lollms_vectorizers.semantic_vectorizer import SemanticVectorizer
from lollmsvectordb import VectorDatabase
from lollmsvectordb.text_document_loader import TextDocumentsLoader
from lollmsvectordb.lollms_tokenizers.tiktoken_tokenizer import TikTokenTokenizer
if lollmsElfServer.config.rag_vectorizer == "semantic":
from lollmsvectordb.lollms_vectorizers.semantic_vectorizer import SemanticVectorizer from lollmsvectordb.lollms_vectorizers.semantic_vectorizer import SemanticVectorizer
from lollmsvectordb import VectorDatabase v = SemanticVectorizer(lollmsElfServer.config.rag_vectorizer_model)
from lollmsvectordb.text_document_loader import TextDocumentsLoader elif lollmsElfServer.config.rag_vectorizer == "tfidf":
from lollmsvectordb.lollms_tokenizers.tiktoken_tokenizer import TikTokenTokenizer from lollmsvectordb.lollms_vectorizers.tfidf_vectorizer import TFIDFVectorizer
v = TFIDFVectorizer()
elif lollmsElfServer.config.rag_vectorizer == "openai":
from lollmsvectordb.lollms_vectorizers.openai_vectorizer import OpenAIVectorizer
v = OpenAIVectorizer(lollmsElfServer.config.rag_vectorizer_openai_key)
elif lollmsElfServer.config.rag_vectorizer == "ollama":
from lollmsvectordb.lollms_vectorizers.ollama_vectorizer import OllamaVectorizer
v = OllamaVectorizer(lollmsElfServer.config.rag_vectorizer_model, lollmsElfServer.config.rag_service_url)
vector_db_path = Path(folder_path)/f"{db_name}.sqlite"
if lollmsElfServer.config.rag_vectorizer == "semantic": vdb = VectorDatabase(vector_db_path, v, lollmsElfServer.model if lollmsElfServer.model else TikTokenTokenizer(), reset=True)
from lollmsvectordb.lollms_vectorizers.semantic_vectorizer import SemanticVectorizer vdb.new_data = True
v = SemanticVectorizer(lollmsElfServer.config.rag_vectorizer_model) # Get all files in the folder
elif lollmsElfServer.config.rag_vectorizer == "tfidf": folder = Path(folder_path)
from lollmsvectordb.lollms_vectorizers.tfidf_vectorizer import TFIDFVectorizer file_types = [f"**/*{f}" if lollmsElfServer.config.rag_follow_subfolders else f"*{f}" for f in TextDocumentsLoader.get_supported_file_types()]
v = TFIDFVectorizer() files = []
elif lollmsElfServer.config.rag_vectorizer == "openai": for file_type in file_types:
from lollmsvectordb.lollms_vectorizers.openai_vectorizer import OpenAIVectorizer files.extend(folder.glob(file_type))
v = OpenAIVectorizer(lollmsElfServer.config.rag_vectorizer_openai_key)
elif lollmsElfServer.config.rag_vectorizer == "ollama": # Load and add each document to the database
from lollmsvectordb.lollms_vectorizers.ollama_vectorizer import OllamaVectorizer for fn in files:
v = OllamaVectorizer(lollmsElfServer.config.rag_vectorizer_model, lollmsElfServer.config.rag_service_url) try:
text = TextDocumentsLoader.read_file(fn)
title = fn.stem # Use the file name without extension as the title
lollmsElfServer.ShowBlockingMessage(f"Adding a new database.\nAdding {title}")
vdb.add_document(title, text, fn)
print(f"Added document: {title}")
except Exception as e:
lollmsElfServer.error(f"Failed to add document {fn}: {e}")
if vdb.new_data: #New files are added, need reindexing
lollmsElfServer.ShowBlockingMessage(f"Adding a new database.\nIndexing the database...")
vdb.build_index()
ASCIIColors.success("OK")
lollmsElfServer.HideBlockingMessage()
run_async(partial(lollmsElfServer.sio.emit,'rag_db_added', {"database_name": db_name, "database_path": str(folder_path)}, to=client.client_id))
vector_db_path = Path(folder_path)/f"{db_name}.sqlite" except Exception as ex:
trace_exception(ex)
vdb = VectorDatabase(vector_db_path, v, lollmsElfServer.model if lollmsElfServer.model else TikTokenTokenizer(), reset=True) lollmsElfServer.HideBlockingMessage()
vdb.new_data = True
# Get all files in the folder
folder = Path(folder_path)
file_types = [f"**/*{f}" if lollmsElfServer.config.rag_follow_subfolders else f"*{f}" for f in TextDocumentsLoader.get_supported_file_types()]
files = []
for file_type in file_types:
files.extend(folder.glob(file_type))
# Load and add each document to the database
for fn in files:
try:
text = TextDocumentsLoader.read_file(fn)
title = fn.stem # Use the file name without extension as the title
lollmsElfServer.ShowBlockingMessage(f"Adding a new database.\nAdding {title}")
vdb.add_document(title, text, fn)
print(f"Added document: {title}")
except Exception as e:
lollmsElfServer.error(f"Failed to add document {fn}: {e}")
if vdb.new_data: #New files are added, need reindexing
lollmsElfServer.ShowBlockingMessage(f"Adding a new database.\nIndexing the database...")
vdb.build_index()
ASCIIColors.success("OK")
lollmsElfServer.HideBlockingMessage()
run_async(partial(lollmsElfServer.sio.emit,'rag_db_added', {"database_name": db_name, "database_path": str(folder_path)}, to=client.client_id))
except Exception as ex:
trace_exception(ex)
lollmsElfServer.HideBlockingMessage()
lollmsElfServer.rag_thread = threading.Thread(target=process) lollmsElfServer.rag_thread = threading.Thread(target=process)
lollmsElfServer.rag_thread.start() lollmsElfServer.rag_thread.start()

View File

@ -45,6 +45,13 @@ import pipmaster as pm
if not pm.is_installed("Pillow"): if not pm.is_installed("Pillow"):
pm.install("Pillow") pm.install("Pillow")
from PIL import Image from PIL import Image
if not pm.is_installed("PyQt5"):
pm.install("PyQt5")
import sys
from PyQt5.QtWidgets import QApplication, QButtonGroup, QRadioButton, QVBoxLayout, QWidget, QPushButton, QMessageBox
from PyQt5.QtCore import Qt
import os import os
import subprocess import subprocess
import sys import sys
@ -397,72 +404,60 @@ def show_console_custom_dialog(title, text, options):
def show_custom_dialog(title, text, options): def show_custom_dialog(title, text, options):
try: try:
import tkinter as tk app = QApplication(sys.argv)
from tkinter import simpledialog window = QWidget()
class CustomDialog(simpledialog.Dialog): layout = QVBoxLayout()
def __init__(self, parent, title, options, root): window.setLayout(layout)
self.options = options
self.root = root label = QLabel(text)
self.buttons = [] layout.addWidget(label)
self.result_value = ""
super().__init__(parent, title) button_group = QButtonGroup()
def do_ok(self, option): for i, option in enumerate(options):
self.result_value = option button = QRadioButton(option)
self.ok(option) button_group.addButton(button)
self.root.destroy() layout.addWidget(button)
def body(self, master):
for option in self.options: def on_ok():
button = tk.Button(master, text=option, command=partial(self.do_ok, option)) nonlocal result
button.pack(side="left", fill="x") result = [button.text() for button in button_group.buttons() if button.isChecked()]
self.buttons.append(button) window.close()
def apply(self): button = QPushButton("OK")
self.result = self.options[0] # Default value button.clicked.connect(on_ok)
root = tk.Tk() layout.addWidget(button)
root.withdraw()
root.attributes('-topmost', True) window.show()
d = CustomDialog(root, title=title, options=options, root=root) result = None
try: sys.exit(app.exec_())
d.mainloop()
except Exception as ex:
pass
result = d.result_value
return result return result
except Exception as ex: except:
ASCIIColors.error(ex) print(title)
return show_console_custom_dialog(title, text, options)
def show_yes_no_dialog(title, text): def show_yes_no_dialog(title, text):
try: try:
if sys.platform.startswith('win'): app = QApplication.instance() or QApplication(sys.argv)
return show_windows_dialog(title, text)
elif sys.platform.startswith('darwin'): # Create a message box with Yes/No buttons
return show_macos_dialog(title, text) msg = QMessageBox()
elif sys.platform.startswith('linux'): msg.setIcon(QMessageBox.Question)
return show_linux_dialog(title, text) msg.setText(text)
else: msg.setWindowTitle(title)
return console_dialog(title, text) msg.setStandardButtons(QMessageBox.Yes | QMessageBox.No)
# Ensure the dialog comes to the foreground
msg.setWindowFlags(msg.windowFlags() | Qt.WindowStaysOnTopHint)
msg.raise_()
msg.activateWindow()
# Execute the dialog and return True if 'Yes' was clicked, False otherwise
return msg.exec_() == QMessageBox.Yes
except Exception as ex: except Exception as ex:
print(f"Error: {ex}") print(f"Error: {ex}")
return console_dialog(title, text) return console_dialog(title, text)
def show_windows_dialog(title, text):
from ctypes import windll
result = windll.user32.MessageBoxW(0, text, title, 4 | 0x40000)
return result == 6 # 6 means "Yes"
def show_macos_dialog(title, text):
script = f'tell app "System Events" to display dialog "{text}" buttons {{"No", "Yes"}} default button "Yes" with title "{title}"'
result = subprocess.run(['osascript', '-e', script], capture_output=True, text=True)
return "Yes" in result.stdout
def show_linux_dialog(title, text):
zenity_path = Path('/usr/bin/zenity')
if zenity_path.exists():
result = subprocess.run([str(zenity_path), '--question', '--title', title, '--text', text], capture_output=True)
return result.returncode == 0
else:
return console_dialog(title, text)
def console_dialog(title, text): def console_dialog(title, text):
print(f"{title}\n{text}") print(f"{title}\n{text}")
@ -473,22 +468,18 @@ def console_dialog(title, text):
print("Invalid input. Please enter 'yes' or 'no'.") print("Invalid input. Please enter 'yes' or 'no'.")
def show_message_dialog(title, text): def show_message_dialog(title, text):
import tkinter as tk try:
from tkinter import messagebox app = QApplication(sys.argv)
# Create a new Tkinter root window and hide it msg = QMessageBox()
root = tk.Tk() msg.setOption(QMessageBox.DontUseNativeDialog, True)
root.withdraw() msg.setWindowFlag(Qt.WindowStaysOnTopHint, True)
msg.setIcon(QMessageBox.Information)
# Make the window appear on top msg.setText(text)
root.attributes('-topmost', True) msg.setWindowTitle(title)
result = msg.question(None, title, text, QMessageBox.Yes | QMessageBox.No)
# Show the dialog box return result == QMessageBox.Yes
result = messagebox.askquestion(title, text) except:
print(title)
# Destroy the root window
root.destroy()
return result
def is_linux(): def is_linux():