changed to list of states, added asr startup code

This commit is contained in:
Saifeddine ALOUI 2024-05-08 12:46:37 +02:00
parent 8ca192ea64
commit 835d85ce7b
6 changed files with 662 additions and 61 deletions

View File

@ -1,5 +1,5 @@
# =================== Lord Of Large Language Multimodal Systems Configuration file ===========================
version: 92
version: 93
binding_name: null
model_name: null
model_variant: null
@ -80,6 +80,10 @@ auto_show_browser: true
# copy to clipboard
copy_to_clipboard_add_all_details: false
# STT service
asr_enable: false
asr_base_url: http://localhost:9000
# Voice service
xtts_enable: false
xtts_base_url: http://localhost:8020

View File

@ -165,7 +165,9 @@ class LollmsApplication(LoLLMsCom):
messages = client.discussion.get_messages()
# Extract relevant information from messages
content = self._extract_content(messages)
def cb(str, MSG_TYPE, dict, list):
self.ShowBlockingMessage(f"Learning\n{str}")
content = self._extract_content(messages, cb)
# Generate title
title_prompt = "\n".join([
@ -181,61 +183,29 @@ class LollmsApplication(LoLLMsCom):
title = self._generate_text(title_prompt)
# Determine category
category_prompt = f"!@>system:Analyze the following title and content, and determine the most appropriate generic category that encompasses the main subject or theme. The category should be broad enough to include multiple related skill entries. Provide only the category name without any additional explanations or context:\n\nTitle:\n{title}\nContent:\n{content}\n\n!@>Category:\n"
category_prompt = f"!@>system:Analyze the following title, and determine the most appropriate generic category that encompasses the main subject or theme. The category should be broad enough to include multiple related skill entries. Provide only the category name without any additional explanations or context:\n\nTitle:\n{title}\n\n!@>Category:\n"
category = self._generate_text(category_prompt)
# Add entry to skills library
self.skills_library.add_entry(1, category, title, content)
return category, title, content
def _extract_content(self, messages:List[Message]):
ranked_messages = sorted(messages, key=lambda m: m.rank, reverse=True)
def _extract_content(self, messages:List[Message], callback = None):
message_content = ""
max_chunk_size = int(self.config.ctx_size * 0.75)
chunks = []
current_chunk = ""
current_chunk_tokens = 0
for message in ranked_messages:
for message in messages:
rank = message.rank
sender = message.sender
text = message.content
message_content = f"Rank {rank} - {sender}: {text}\n"
message_content += f"Rank {rank} - {sender}: {text}\n"
message_tokens = self.model.get_nb_tokens(message_content)
if current_chunk_tokens + message_tokens <= max_chunk_size:
current_chunk += message_content
current_chunk_tokens += message_tokens
else:
chunks.append(current_chunk)
current_chunk = message_content
current_chunk_tokens = message_tokens
if current_chunk:
chunks.append(current_chunk)
summarized_chunks = []
for chunk in chunks:
prompt = "\n".join([
"!@>system:",
"Analyzing the discussion chunk requires careful examination of each sub-question. First, carefully examine each sub-questions to extract key information components. Then, generate two-three intermediate thoughts as bullet-points representing steps towards an answer. Evaluate clarity, relevance, logical flow, and coverage of concepts using bullet-point evaluation. If a incorrect result is detected, explain and then Backtrack and explore alternative paths by substituting different high-scoring thoughts. Throughout the reasoning process aim to provide concise and detailed explanations of key information or specific skills or methods or code sections both of the good that are kept and the low scoring ones that are filtered out to enable learning from mistakes. Once a comprehensive bullet-point summary of important information has been constructed, synthesize these insights into a final comprehensive bullet-point summary of important information. This summary includes specific skills or methods or code sections that related to each sub-question.",
"!@>discussion snippet:",
f"{chunk}",
"!@>analysis:\n"])
max_tokens = self.config.ctx_size - self.model.get_nb_tokens(prompt)
if self.config.debug:
ASCIIColors.yellow(prompt)
summarized_chunk = self.model.generate(prompt, max_tokens)
if summarized_chunk:
summarized_chunks.append(summarized_chunk.strip())
else:
raise Exception("Couldn't generate text.")
summarized_content = "\n".join(summarized_chunks)
return summarized_content
return self.tasks_library.summerize_text(
message_content,
"\n".join([
"Extract useful information from this discussion."
]),
doc_name="discussion",
callback=callback)
def _generate_text(self, prompt):
@ -244,8 +214,6 @@ class LollmsApplication(LoLLMsCom):
return generated_text.strip()
def get_uploads_path(self, client_id):
return self.lollms_paths.personal_uploads_path
@ -827,7 +795,7 @@ class LollmsApplication(LoLLMsCom):
if knowledge=="":
knowledge=f"!@>knowledge:\n"
for i,(title, content) in enumerate(zip(skill_titles,skills)):
knowledge += f"!@>knowledge {i}:\ntitle:\n{title}\ncontent:\n{content}"
knowledge += f"!@>knowledge {i}:\ntitle:\n{title}\ncontent:\n{content}\n"
self.personality.step_end("Adding skills")
self.personality.step_end("Querying skills library")
except Exception as ex:

View File

@ -1,5 +1,5 @@
# =================== Lord Of Large Language Multimodal Systems Configuration file ===========================
version: 92
version: 93
binding_name: null
model_name: null
model_variant: null
@ -80,6 +80,10 @@ auto_show_browser: true
# copy to clipboard
copy_to_clipboard_add_all_details: false
# STT service
asr_enable: false
asr_base_url: http://localhost:9000
# Voice service
xtts_enable: false
xtts_base_url: http://localhost:8020

View File

@ -1690,7 +1690,7 @@ class AIPersonality:
class StateMachine:
def __init__(self, states_dict):
def __init__(self, states_list):
"""
states structure is the following
[
@ -1703,7 +1703,7 @@ class StateMachine:
}
]
"""
self.states_dict = states_dict
self.states_list = states_list
self.current_state_id = 0
self.callback = None
@ -1718,12 +1718,12 @@ class StateMachine:
ValueError: If no state is found with the given name or index.
"""
if isinstance(state, str):
for i, state_dict in enumerate(self.states_dict):
for i, state_dict in enumerate(self.states_list):
if state_dict["name"] == state:
self.current_state_id = i
return
elif isinstance(state, int):
if 0 <= state < len(self.states_dict):
if 0 <= state < len(self.states_list):
self.current_state_id = state
return
raise ValueError(f"No state found with name or index: {state}")
@ -1743,7 +1743,7 @@ class StateMachine:
if callback:
self.callback=callback
current_state = self.states_dict[self.current_state_id]
current_state = self.states_list[self.current_state_id]
commands = current_state["commands"]
command = command.strip()
@ -1892,10 +1892,10 @@ class APScript(StateMachine):
self,
personality :AIPersonality,
personality_config :TypedConfig,
states_dict :dict = {},
states_list :dict = {},
callback = None
) -> None:
super().__init__(states_dict)
super().__init__(states_list)
self.notify = personality.app.notify
self.personality = personality

View File

@ -0,0 +1,215 @@
# Title LollmsASR
# Licence: MIT
# Author : Paris Neo
# Adapted from the work of ahmetoner's whisper-asr-webservice
# check it out : https://github.com/ahmetoner/whisper-asr-webservice
# Here is a copy of the LICENCE https://github.com/ahmetoner/whisper-asr-webservice/blob/main/LICENCE
# All rights are reserved
from pathlib import Path
import sys
from lollms.app import LollmsApplication
from lollms.paths import LollmsPaths
from lollms.config import TypedConfig, ConfigTemplate, BaseConfig
from lollms.utilities import PackageManager
import time
import io
import sys
import requests
import os
import base64
import subprocess
import time
import json
import platform
import threading
from dataclasses import dataclass
from PIL import Image, PngImagePlugin
from enum import Enum
from typing import List, Dict, Any
import uuid
from ascii_colors import ASCIIColors, trace_exception
from lollms.paths import LollmsPaths
from lollms.utilities import git_pull, show_yes_no_dialog, run_python_script_in_env, create_conda_env, run_pip_in_env, environment_exists
import subprocess
import platform
def verify_asr(lollms_paths:LollmsPaths):
# Clone repository
root_dir = lollms_paths.personal_path
shared_folder = root_dir/"shared"
asr_path = shared_folder / "asr"
return asr_path.exists()
def install_asr(lollms_app:LollmsApplication):
ASCIIColors.green("asr installation started")
repo_url = "https://github.com/ParisNeo/whisper-asr-webservice.git"
root_dir = lollms_app.lollms_paths.personal_path
shared_folder = root_dir/"shared"
asr_path = shared_folder / "asr"
# Step 1: Clone or update the repository
if os.path.exists(asr_path):
print("Repository already exists. Pulling latest changes...")
try:
subprocess.run(["git", "-C", asr_path, "pull"], check=True)
except:
subprocess.run(["git", "clone", repo_url, asr_path], check=True)
else:
print("Cloning repository...")
subprocess.run(["git", "clone", repo_url, asr_path], check=True)
# Step 2: Create or update the Conda environment
if environment_exists("asr"):
print("Conda environment 'asr' already exists. Updating...")
# Here you might want to update the environment, e.g., update Python or dependencies
# This step is highly dependent on how you manage your Conda environments and might involve
# running `conda update` commands or similar.
else:
print("Creating Conda environment 'asr'...")
create_conda_env("asr", "3.10")
# Step 3: Install or update dependencies using your custom function
requirements_path = os.path.join(asr_path, "requirements.txt")
run_pip_in_env("asr", f"install .", cwd=asr_path)
# Step 4: Launch the server
# Assuming the server can be started with a Python script in the cloned repository
print("Launching asr API server...")
run_python_script_in_env("asr", "asr_api_server", cwd=asr_path)
print("asr API server setup and launch completed.")
ASCIIColors.cyan("Done")
ASCIIColors.cyan("Installing asr-api-server")
ASCIIColors.green("asr server installed successfully")
def get_asr(lollms_paths:LollmsPaths):
root_dir = lollms_paths.personal_path
shared_folder = root_dir/"shared"
asr_path = shared_folder / "asr"
asr_script_path = asr_path / "lollms_asr.py"
git_pull(asr_path)
if asr_script_path.exists():
ASCIIColors.success("lollms_asr found.")
ASCIIColors.success("Loading source file...",end="")
# use importlib to load the module from the file path
from lollms.services.asr.lollms_asr import LollmsASR
ASCIIColors.success("ok")
return LollmsASR
class LollmsASR:
has_controlnet = False
def __init__(
self,
app:LollmsApplication,
asr_base_url=None,
share=False,
max_retries=20,
voices_folder=None,
voice_samples_path="",
wait_for_service=True,
use_deep_speed=False,
use_streaming_mode = True
):
self.generation_threads = []
self.voices_folder = voices_folder
self.ready = False
if asr_base_url=="" or asr_base_url=="http://127.0.0.1:9000":
asr_base_url = None
# Get the current directory
lollms_paths = app.lollms_paths
self.app = app
root_dir = lollms_paths.personal_path
self.voice_samples_path = voice_samples_path
self.use_deep_speed = use_deep_speed
self.use_streaming_mode = use_streaming_mode
# Store the path to the script
if asr_base_url is None:
self.asr_base_url = "http://127.0.0.1:9000"
if not verify_asr(lollms_paths):
install_asr(app.lollms_paths)
else:
self.asr_base_url = asr_base_url
self.auto_asr_url = self.asr_base_url+"/asr"
shared_folder = root_dir/"shared"
self.asr_path = shared_folder / "asr"
ASCIIColors.red(" _ _ _ ___ ___ ___ ___________ ")
ASCIIColors.red("| | | | | | | \/ | / _ \ / ___| ___ \ ")
ASCIIColors.red("| | ___ | | | | | . . |___ / /_\ \\ `--.| |_/ /")
ASCIIColors.red("| | / _ \| | | | | |\/| / __| | _ | `--. \ / ")
ASCIIColors.red("| |___| (_) | |____| |____| | | \__ \ | | | |/\__/ / |\ \ ")
ASCIIColors.red("\_____/\___/\_____/\_____/\_| |_/___/ \_| |_/\____/\_| \_|")
ASCIIColors.red(" ______ ")
ASCIIColors.red(" |______| ")
ASCIIColors.red(" Forked from ahmetoner's asr server")
ASCIIColors.red(" Integration in lollms by ParisNeo using ahmetoner's webapi")
ASCIIColors.red(" Address :",end="")
ASCIIColors.yellow(f"{self.asr_base_url}")
self.output_folder = app.lollms_paths.personal_outputs_path/"audio_out"
self.output_folder.mkdir(parents=True, exist_ok=True)
if not self.wait_for_service(1,False):
ASCIIColors.info("Loading lollms_asr")
# Launch the Flask service using the appropriate script for the platform
self.process = self.run_asr_api_server()
# Wait until the service is available at http://127.0.0.1:9000/
if wait_for_service:
self.wait_for_service()
else:
self.wait_for_service_in_another_thread(max_retries=max_retries)
def run_asr_api_server(self):
# Get the path to the current Python interpreter
ASCIIColors.yellow("Loading asr ")
process = run_python_script_in_env("asr", f"app/webservice.py", wait= False, cwd=self.asr_path)
return process
def wait_for_service_in_another_thread(self, max_retries=150, show_warning=True):
thread = threading.Thread(target=self.wait_for_service, args=(max_retries, show_warning))
thread.start()
return thread
def wait_for_service(self, max_retries = 150, show_warning=True):
print(f"Waiting for asr service (max_retries={max_retries})")
url = f"{self.asr_base_url}/languages"
# Adjust this value as needed
retries = 0
while retries < max_retries or max_retries<0:
try:
response = requests.get(url)
if response.status_code == 200:
print(f"voices_folder is {self.voices_folder}.")
if self.voices_folder is not None:
print("Generating sample audio.")
voice_file = [v for v in self.voices_folder.iterdir() if v.suffix==".wav"]
self.tts_to_audio("asr is ready",voice_file[0].name)
print("Service is available.")
if self.app is not None:
self.app.success("asr Service is now available.")
self.ready = True
return True
except:
pass
retries += 1
ASCIIColors.yellow("Waiting for asr...")
time.sleep(5)
if show_warning:
print("Service did not become available within the given time.")
if self.app is not None:
self.app.error("asr Service did not become available within the given time.")
return False

View File

@ -4,14 +4,15 @@ from typing import Callable, List
from functools import partial
from datetime import datetime
from ascii_colors import ASCIIColors
from lollms.types import MSG_TYPE
from lollms.types import MSG_TYPE, SUMMARY_MODE
from lollms.com import LoLLMsCom
from lollms.utilities import PromptReshaper, remove_text_from_string
from safe_store import DocumentDecomposer
class TasksLibrary:
def __init__(self, lollms:LoLLMsCom) -> None:
def __init__(self, lollms:LoLLMsCom, callback: Callable[[str, MSG_TYPE, dict, list], bool]=None) -> None:
self.lollms = lollms
self.callback = callback
self.anti_prompts = [self.lollms.config.discussion_prompt_separator]+["!@>"]
def print_prompt(self, title, prompt):
@ -144,6 +145,218 @@ class TasksLibrary:
return gen
# Communications with the user
def step_start(self, step_text, callback: Callable[[str, MSG_TYPE, dict, list], bool]=None):
"""This triggers a step start
Args:
step_text (str): The step text
callback (callable, optional): A callable with this signature (str, MSG_TYPE) to send the step start to. Defaults to None.
"""
if not callback and self.callback:
callback = self.callback
if callback:
callback(step_text, MSG_TYPE.MSG_TYPE_STEP_START)
def step_end(self, step_text, status=True, callback: Callable[[str, int, dict, list], bool]=None):
"""This triggers a step end
Args:
step_text (str): The step text
callback (callable, optional): A callable with this signature (str, MSG_TYPE) to send the step end to. Defaults to None.
"""
if not callback and self.callback:
callback = self.callback
if callback:
callback(step_text, MSG_TYPE.MSG_TYPE_STEP_END, {'status':status})
def step(self, step_text, callback: Callable[[str, MSG_TYPE, dict, list], bool]=None):
"""This triggers a step information
Args:
step_text (str): The step text
callback (callable, optional): A callable with this signature (str, MSG_TYPE, dict, list) to send the step to. Defaults to None.
The callback has these fields:
- chunk
- Message Type : the type of message
- Parameters (optional) : a dictionary of parameters
- Metadata (optional) : a list of metadata
"""
if not callback and self.callback:
callback = self.callback
if callback:
callback(step_text, MSG_TYPE.MSG_TYPE_STEP)
def exception(self, ex, callback: Callable[[str, MSG_TYPE, dict, list], bool]=None):
"""This sends exception to the client
Args:
step_text (str): The step text
callback (callable, optional): A callable with this signature (str, MSG_TYPE, dict, list) to send the step to. Defaults to None.
The callback has these fields:
- chunk
- Message Type : the type of message
- Parameters (optional) : a dictionary of parameters
- Metadata (optional) : a list of metadata
"""
if not callback and self.callback:
callback = self.callback
if callback:
callback(str(ex), MSG_TYPE.MSG_TYPE_EXCEPTION)
def warning(self, warning:str, callback: Callable[[str, MSG_TYPE, dict, list], bool]=None):
"""This sends exception to the client
Args:
step_text (str): The step text
callback (callable, optional): A callable with this signature (str, MSG_TYPE, dict, list) to send the step to. Defaults to None.
The callback has these fields:
- chunk
- Message Type : the type of message
- Parameters (optional) : a dictionary of parameters
- Metadata (optional) : a list of metadata
"""
if not callback and self.callback:
callback = self.callback
if callback:
callback(warning, MSG_TYPE.MSG_TYPE_EXCEPTION)
def info(self, info:str, callback: Callable[[str, MSG_TYPE, dict, list], bool]=None):
"""This sends exception to the client
Args:
inf (str): The information to be sent
callback (callable, optional): A callable with this signature (str, MSG_TYPE, dict, list) to send the step to. Defaults to None.
The callback has these fields:
- chunk
- Message Type : the type of message
- Parameters (optional) : a dictionary of parameters
- Metadata (optional) : a list of metadata
"""
if not callback and self.callback:
callback = self.callback
if callback:
callback(info, MSG_TYPE.MSG_TYPE_INFO)
def json(self, title:str, json_infos:dict, callback: Callable[[str, int, dict, list], bool]=None, indent=4):
"""This sends json data to front end
Args:
step_text (dict): The step text
callback (callable, optional): A callable with this signature (str, MSG_TYPE, dict, list) to send the step to. Defaults to None.
The callback has these fields:
- chunk
- Message Type : the type of message
- Parameters (optional) : a dictionary of parameters
- Metadata (optional) : a list of metadata
"""
if not callback and self.callback:
callback = self.callback
if callback:
callback("", MSG_TYPE.MSG_TYPE_JSON_INFOS, metadata = [{"title":title, "content":json.dumps(json_infos, indent=indent)}])
def ui(self, html_ui:str, callback: Callable[[str, MSG_TYPE, dict, list], bool]=None):
"""This sends ui elements to front end
Args:
step_text (dict): The step text
callback (callable, optional): A callable with this signature (str, MSG_TYPE, dict, list) to send the step to. Defaults to None.
The callback has these fields:
- chunk
- Message Type : the type of message
- Parameters (optional) : a dictionary of parameters
- Metadata (optional) : a list of metadata
"""
if not callback and self.callback:
callback = self.callback
if callback:
callback(html_ui, MSG_TYPE.MSG_TYPE_UI)
def code(self, code:str, callback: Callable[[str, MSG_TYPE, dict, list], bool]=None):
"""This sends code to front end
Args:
step_text (dict): The step text
callback (callable, optional): A callable with this signature (str, MSG_TYPE, dict, list) to send the step to. Defaults to None.
The callback has these fields:
- chunk
- Message Type : the type of message
- Parameters (optional) : a dictionary of parameters
- Metadata (optional) : a list of metadata
"""
if not callback and self.callback:
callback = self.callback
if callback:
callback(code, MSG_TYPE.MSG_TYPE_CODE)
def chunk(self, full_text:str, callback: Callable[[str, MSG_TYPE, dict, list], bool]=None):
"""This sends full text to front end
Args:
step_text (dict): The step text
callback (callable, optional): A callable with this signature (str, MSG_TYPE) to send the text to. Defaults to None.
"""
if not callback and self.callback:
callback = self.callback
if callback:
callback(full_text, MSG_TYPE.MSG_TYPE_CHUNK)
def full(self, full_text:str, callback: Callable[[str, MSG_TYPE, dict, list], bool]=None, msg_type:MSG_TYPE = MSG_TYPE.MSG_TYPE_FULL):
"""This sends full text to front end
Args:
step_text (dict): The step text
callback (callable, optional): A callable with this signature (str, MSG_TYPE) to send the text to. Defaults to None.
"""
if not callback and self.callback:
callback = self.callback
if callback:
callback(full_text, msg_type)
def full_invisible_to_ai(self, full_text:str, callback: Callable[[str, MSG_TYPE, dict, list], bool]=None):
"""This sends full text to front end (INVISIBLE to AI)
Args:
step_text (dict): The step text
callback (callable, optional): A callable with this signature (str, MSG_TYPE) to send the text to. Defaults to None.
"""
if not callback and self.callback:
callback = self.callback
if callback:
callback(full_text, MSG_TYPE.MSG_TYPE_FULL_INVISIBLE_TO_AI)
def full_invisible_to_user(self, full_text:str, callback: Callable[[str, MSG_TYPE, dict, list], bool]=None):
"""This sends full text to front end (INVISIBLE to user)
Args:
step_text (dict): The step text
callback (callable, optional): A callable with this signature (str, MSG_TYPE) to send the text to. Defaults to None.
"""
if not callback and self.callback:
callback = self.callback
if callback:
callback(full_text, MSG_TYPE.MSG_TYPE_FULL_INVISIBLE_TO_USER)
def extract_code_blocks(self, text: str) -> List[dict]:
"""
This function extracts code blocks from a given text.
@ -243,3 +456,200 @@ class TasksLibrary:
message_translation_text = f"!@>instruction: Translate the following message to {language}.\nDo not translate any css or code, just the text and strings.\n!@>message:\n{prompt.replace('!@>','')}\n!@>translation:\n"
translated = self.fast_gen(message_translation_text, temperature=0.1, callback=self.sink)
return translated
def summerize_text(
self,
text,
summary_instruction="summerize",
doc_name="chunk",
answer_start="",
max_generation_size=3000,
max_summary_size=512,
callback=None,
chunk_summary_post_processing=None,
summary_mode=SUMMARY_MODE.SUMMARY_MODE_SEQUENCIAL
):
depth=0
tk = self.lollms.model.tokenize(text)
prev_len = len(tk)
document_chunks=None
while len(tk)>max_summary_size and (document_chunks is None or len(document_chunks)>1):
self.step_start(f"Comprerssing {doc_name}... [depth {depth+1}]")
chunk_size = int(self.lollms.config.ctx_size*0.6)
document_chunks = DocumentDecomposer.decompose_document(text, chunk_size, 0, self.lollms.model.tokenize, self.lollms.model.detokenize, True)
text = self.summerize_chunks(
document_chunks,
summary_instruction,
doc_name,
answer_start,
max_generation_size,
callback,
chunk_summary_post_processing=chunk_summary_post_processing,
summary_mode=summary_mode)
tk = self.lollms.model.tokenize(text)
tk = self.lollms.model.tokenize(text)
dtk_ln=prev_len-len(tk)
prev_len = len(tk)
self.step(f"Current text size : {prev_len}, max summary size : {max_summary_size}")
self.step_end(f"Comprerssing {doc_name}... [depth {depth+1}]")
depth += 1
if dtk_ln<=10: # it is not sumlmarizing
break
return text
def smart_data_extraction(
self,
text,
data_extraction_instruction="summerize",
final_task_instruction="reformulate with better wording",
doc_name="chunk",
answer_start="",
max_generation_size=3000,
max_summary_size=512,
callback=None,
chunk_summary_post_processing=None,
summary_mode=SUMMARY_MODE.SUMMARY_MODE_SEQUENCIAL
):
tk = self.lollms.model.tokenize(text)
prev_len = len(tk)
while len(tk)>max_summary_size:
chunk_size = int(self.lollms.config.ctx_size*0.6)
document_chunks = DocumentDecomposer.decompose_document(text, chunk_size, 0, self.lollms.model.tokenize, self.lollms.model.detokenize, True)
text = self.summerize_chunks(
document_chunks,
data_extraction_instruction,
doc_name,
answer_start,
max_generation_size,
callback,
chunk_summary_post_processing=chunk_summary_post_processing,
summary_mode=summary_mode
)
tk = self.lollms.model.tokenize(text)
dtk_ln=prev_len-len(tk)
prev_len = len(tk)
self.step(f"Current text size : {prev_len}, max summary size : {max_summary_size}")
if dtk_ln<=10: # it is not sumlmarizing
break
self.step_start(f"Rewriting ...")
text = self.summerize_chunks(
[text],
final_task_instruction,
doc_name, answer_start,
max_generation_size,
callback,
chunk_summary_post_processing=chunk_summary_post_processing
)
self.step_end(f"Rewriting ...")
return text
def summerize_chunks(
self,
chunks,
summary_instruction="summerize",
doc_name="chunk",
answer_start="",
max_generation_size=3000,
callback=None,
chunk_summary_post_processing=None,
summary_mode=SUMMARY_MODE.SUMMARY_MODE_SEQUENCIAL
):
if summary_mode==SUMMARY_MODE.SUMMARY_MODE_SEQUENCIAL:
summary = ""
for i, chunk in enumerate(chunks):
self.step_start(f" Summary of {doc_name} - Processing chunk : {i+1}/{len(chunks)}")
if summary !="":
summary = f"{answer_start}"+ self.fast_gen(
"\n".join([
f"!@>Document_chunk: {doc_name}:",
f"This is a cumulative summary step. Use the summary of the previous chunks and the current chunk of the document to make a new summary integrating information from both. Make sure not to loose information from previous summaries",
f"Summary of previous chunks",
f"{summary}",
f"current chunk:",
f"{chunk}",
f"!@>instruction: {summary_instruction}",
f"The summary should extract required information from the current chunk to increment the previous summary.",
f"Answer directly with the cumulative summary with no extra comments.",
f"!@>summary:",
f"{answer_start}"
]),
max_generation_size=max_generation_size,
callback=callback)
else:
summary = f"{answer_start}"+ self.fast_gen(
"\n".join([
f"!@>Document_chunk: {doc_name}:",
f"current chunk:",
f"{chunk}",
f"!@>instruction: {summary_instruction}",
f"Answer directly with the summary with no extra comments.",
f"!@>summary:",
f"{answer_start}"
]),
max_generation_size=max_generation_size,
callback=callback)
if chunk_summary_post_processing:
summary = chunk_summary_post_processing(summary)
self.step_end(f" Summary of {doc_name} - Processing chunk : {i+1}/{len(chunks)}")
return summary
else:
summeries = []
for i, chunk in enumerate(chunks):
self.step_start(f" Summary of {doc_name} - Processing chunk : {i+1}/{len(chunks)}")
summary = f"{answer_start}"+ self.fast_gen(
"\n".join([
f"!@>Document_chunk [{doc_name}]:",
f"{chunk}",
f"!@>instruction: {summary_instruction}",
f"Answer directly with the summary with no extra comments.",
f"!@>summary:",
f"{answer_start}"
]),
max_generation_size=max_generation_size,
callback=callback)
if chunk_summary_post_processing:
summary = chunk_summary_post_processing(summary)
summeries.append(summary)
self.step_end(f" Summary of {doc_name} - Processing chunk : {i+1}/{len(chunks)}")
return "\n".join(summeries)
def sequencial_chunks_summary(
self,
chunks,
summary_instruction="summerize",
doc_name="chunk",
answer_start="",
max_generation_size=3000,
callback=None,
chunk_summary_post_processing=None
):
summeries = []
for i, chunk in enumerate(chunks):
if i<len(chunks)-1:
chunk1 = chunks[i+1]
else:
chunk1=""
if i>0:
chunk=summary
self.step_start(f" Summary of {doc_name} - Processing chunk : {i+1}/{len(chunks)}")
summary = f"{answer_start}"+ self.fast_gen(
"\n".join([
f"!@>Document_chunk: {doc_name}:",
f"Block1:",
f"{chunk}",
f"Block2:",
f"{chunk1}",
f"!@>instruction: {summary_instruction}",
f"Answer directly with the summary with no extra comments.",
f"!@>summary:",
f"{answer_start}"
]),
max_generation_size=max_generation_size,
callback=callback)
if chunk_summary_post_processing:
summary = chunk_summary_post_processing(summary)
summeries.append(summary)
self.step_end(f" Summary of {doc_name} - Processing chunk : {i+1}/{len(chunks)}")
return "\n".join(summeries)