mirror of
https://github.com/ParisNeo/lollms.git
synced 2025-04-13 22:02:56 +00:00
enhanced
This commit is contained in:
parent
0c21805e55
commit
f7759f0f76
@ -1486,22 +1486,12 @@ Don't forget encapsulate the code inside a markdown code tag. This is mandatory.
|
||||
await client.generation_routine
|
||||
try:
|
||||
if len(context_details.function_calls)>0:
|
||||
codes = self.personality.extract_code_blocks(client.generated_text)
|
||||
for function_call in context_details.function_calls:
|
||||
fc:FunctionCall = function_call["class"]
|
||||
for code in codes:
|
||||
if code["type"]=="function":
|
||||
infos = json.loads(code["content"])
|
||||
if infos["function_name"]==function_call["name"]:
|
||||
if fc.function_type == FunctionType.CLASSIC:
|
||||
context_details.ai_output = client.generated_text
|
||||
output = fc.execute(context_details,**infos["function_parameters"])
|
||||
await self.new_message(client_id,"System","",message_type=MSG_OPERATION_TYPE.MSG_OPERATION_TYPE_SET_CONTENT, sender_type=SENDER_TYPES.SENDER_TYPES_AI)
|
||||
self.update_message(client_id, "output")
|
||||
|
||||
if fc.function_type == FunctionType.CONTEXT_UPDATE:
|
||||
process_output = fc.process_output(context_details, client.generated_text)
|
||||
await self.set_message_content(process_output,client_id=client_id)
|
||||
self.set_message_content(process_output,client_id=client_id)
|
||||
|
||||
except Exception as ex:
|
||||
trace_exception(ex)
|
||||
|
||||
@ -1718,9 +1708,26 @@ Don't forget encapsulate the code inside a markdown code tag. This is mandatory.
|
||||
self.cancel_gen = False
|
||||
|
||||
ASCIIColors.yellow("Closing message")
|
||||
await self.close_message(client_id)
|
||||
|
||||
client.processing = False
|
||||
try:
|
||||
if len(context_details.function_calls)>0:
|
||||
codes = self.personality.extract_code_blocks(client.generated_text)
|
||||
for function_call in context_details.function_calls:
|
||||
fc:FunctionCall = function_call["class"]
|
||||
for code in codes:
|
||||
if code["type"]=="function":
|
||||
infos = json.loads(code["content"])
|
||||
if infos["function_name"]==function_call["name"]:
|
||||
if fc.function_type == FunctionType.CLASSIC:
|
||||
context_details.ai_output = client.generated_text
|
||||
output = fc.execute(context_details,**infos["function_parameters"])
|
||||
if output[0]=="<":
|
||||
await self.new_message(client_id,"System",output,message_type=MSG_OPERATION_TYPE.MSG_OPERATION_TYPE_UI, sender_type=SENDER_TYPES.SENDER_TYPES_AI)
|
||||
else:
|
||||
await self.new_message(client_id,"System",output,message_type=MSG_OPERATION_TYPE.MSG_OPERATION_TYPE_SET_CONTENT, sender_type=SENDER_TYPES.SENDER_TYPES_AI)
|
||||
|
||||
except Exception as ex:
|
||||
trace_exception(ex)
|
||||
# Clients are now kept forever
|
||||
# if client.schedule_for_deletion:
|
||||
# self.session.remove_client(client.client_id, client.client_id)
|
||||
@ -1754,12 +1761,14 @@ Don't forget encapsulate the code inside a markdown code tag. This is mandatory.
|
||||
},
|
||||
to=client_id,
|
||||
)
|
||||
await self.close_message(client_id)
|
||||
else:
|
||||
self.cancel_gen = False
|
||||
# No discussion available
|
||||
ASCIIColors.warning("No discussion selected!!!")
|
||||
|
||||
self.error("No discussion selected!!!", client_id=client_id)
|
||||
await self.close_message(client_id)
|
||||
|
||||
return ""
|
||||
|
||||
|
@ -664,7 +664,7 @@ class AIPersonality:
|
||||
if callback:
|
||||
callback(step_text, MSG_OPERATION_TYPE.MSG_OPERATION_TYPE_STEP_START)
|
||||
|
||||
def step_end(self, step_text, status=True, callback: Callable[[str, int, dict, list], bool]=None):
|
||||
def step_end(self, step_text, success=True, callback: Callable[[str, int, dict, list], bool]=None):
|
||||
"""This triggers a step end
|
||||
|
||||
Args:
|
||||
@ -675,8 +675,11 @@ class AIPersonality:
|
||||
callback = self.callback
|
||||
|
||||
if callback:
|
||||
callback(step_text, MSG_OPERATION_TYPE.MSG_OPERATION_TYPE_STEP_END_SUCCESS)
|
||||
|
||||
if success:
|
||||
callback(step_text, MSG_OPERATION_TYPE.MSG_OPERATION_TYPE_STEP_END_SUCCESS)
|
||||
else:
|
||||
callback(step_text, MSG_OPERATION_TYPE.MSG_OPERATION_TYPE_STEP_END_FAILURE)
|
||||
|
||||
def step(self, step_text, callback: Callable[[str | list | None, MSG_OPERATION_TYPE, str, Any | None], bool]=None):
|
||||
"""This triggers a step information
|
||||
|
||||
|
@ -65,6 +65,78 @@ import sys
|
||||
import subprocess
|
||||
from typing import Union, List
|
||||
|
||||
# Pre-compile regex for efficiency
|
||||
# Characters typically invalid in filenames across OSes + ASCII control characters
|
||||
_INVALID_FILENAME_CHARS_RE = re.compile(r'[<>:"/\\|?*\x00-\x1F]')
|
||||
# Reserved filenames in Windows (case-insensitive)
|
||||
_WINDOWS_RESERVED_NAMES_RE = re.compile(
|
||||
r"^(CON|PRN|AUX|NUL|COM[1-9]|LPT[1-9])$", re.IGNORECASE
|
||||
)
|
||||
# Matches one or more dots or spaces at the beginning or end of a string
|
||||
_LEADING_TRAILING_DOTS_SPACES_RE = re.compile(r"^[. ]+|[. ]+$")
|
||||
|
||||
def safe_filename(
|
||||
filename: str,
|
||||
replacement: str = '_',
|
||||
max_len: int = 200,
|
||||
default_name: str = "unnamed_file"
|
||||
) -> str:
|
||||
"""
|
||||
Sanitizes a string to create a safe filename for most operating systems.
|
||||
|
||||
This function performs several steps:
|
||||
1. Converts input to string.
|
||||
2. Replaces characters invalid in Windows/Linux/macOS filenames with `replacement`.
|
||||
3. Removes leading and trailing periods and spaces (problematic on Windows).
|
||||
4. Truncates the filename to `max_len`.
|
||||
5. Checks if the result is empty or consists only of periods, returning `default_name` if so.
|
||||
6. Prepends `replacement` if the name matches a reserved Windows filename (e.g., CON, PRN).
|
||||
|
||||
Args:
|
||||
filename: The original string to sanitize.
|
||||
replacement: Character(s) to substitute for invalid characters. Defaults to '_'.
|
||||
max_len: The maximum allowed length for the final filename. Defaults to 200.
|
||||
default_name: The name returned if sanitization results in an empty or invalid string.
|
||||
Defaults to "unnamed_file".
|
||||
|
||||
Returns:
|
||||
A sanitized string suitable for use as a filename component.
|
||||
"""
|
||||
if not isinstance(filename, str):
|
||||
filename = str(filename) # Ensure we're working with a string
|
||||
|
||||
# 1. Replace invalid characters using pre-compiled regex
|
||||
sanitized = _INVALID_FILENAME_CHARS_RE.sub(replacement, filename)
|
||||
|
||||
# 2. Remove leading/trailing dots and spaces
|
||||
sanitized = _LEADING_TRAILING_DOTS_SPACES_RE.sub('', sanitized)
|
||||
|
||||
# 3. Truncate to maximum length
|
||||
if len(sanitized) > max_len:
|
||||
sanitized = sanitized[:max_len]
|
||||
# Re-apply step 2 in case truncation created trailing dots/spaces
|
||||
sanitized = _LEADING_TRAILING_DOTS_SPACES_RE.sub('', sanitized)
|
||||
|
||||
# 4. Check if the result is empty or only periods (edge case)
|
||||
if not sanitized or all(c == '.' for c in sanitized):
|
||||
return default_name
|
||||
|
||||
# 5. Check against reserved Windows names (case-insensitive)
|
||||
if _WINDOWS_RESERVED_NAMES_RE.match(sanitized):
|
||||
sanitized = replacement + sanitized
|
||||
# Re-truncate if the prefix made it too long
|
||||
if len(sanitized) > max_len:
|
||||
sanitized = sanitized[:max_len]
|
||||
# Re-apply step 2 again after potential truncation
|
||||
sanitized = _LEADING_TRAILING_DOTS_SPACES_RE.sub('', sanitized)
|
||||
|
||||
|
||||
# Final check for empty string after all modifications
|
||||
if not sanitized:
|
||||
return default_name
|
||||
|
||||
return sanitized
|
||||
|
||||
def run_with_current_interpreter(
|
||||
script_path: Union[str, Path],
|
||||
args: List[str] = None
|
||||
|
Loading…
x
Reference in New Issue
Block a user