lollms/examples/document_questions/document_questions.py

172 lines
5.6 KiB
Python
Raw Normal View History

2023-06-19 16:51:41 +02:00
import argparse
import socketio
from pathlib import Path
from lollms import MSG_TYPE
from lollms.helpers import ASCIIColors
2023-06-19 21:37:19 +02:00
from lollms.paths import LollmsPaths
2023-06-19 16:51:41 +02:00
import time
import json
from pathlib import Path
import PyPDF2
from docx import Document
from bs4 import BeautifulSoup
import json
import csv
from pptx import Presentation
2023-06-19 21:37:19 +02:00
import sys
2023-06-19 16:51:41 +02:00
# Create the module-level Socket.IO client. Nothing is connected here; the
# connection is opened later by sio.connect() inside test_generate_text().
sio = socketio.Client()
def read_pdf_file(file_path):
    """Return the text of every page of a PDF as a single string.

    Args:
        file_path: Path of the PDF file to open (opened in binary mode).

    Returns:
        The text extracted from each page, concatenated in page order with
        no separator inserted between pages.
    """
    with open(file_path, 'rb') as pdf_stream:
        reader = PyPDF2.PdfReader(pdf_stream)
        # Extraction must happen while the stream is still open.
        return "".join(page.extract_text() for page in reader.pages)
def read_docx_file(file_path):
    """Return the paragraphs of a .docx document as newline-terminated text.

    Args:
        file_path: Path of the document handed to ``Document``.

    Returns:
        A string made of each paragraph's text followed by a newline.
    """
    document = Document(file_path)
    return "".join(para.text + "\n" for para in document.paragraphs)
def read_json_file(file_path):
    """Parse a JSON file and return the decoded Python object.

    Args:
        file_path: Path of the JSON file to read.

    Returns:
        Whatever ``json.load`` produces for the file (dict, list, str, ...).

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the content is not valid JSON.
    """
    # Decode explicitly as UTF-8 so non-ASCII content does not depend on the
    # platform's default locale encoding.
    with open(file_path, 'r', encoding='utf-8') as file:
        return json.load(file)
def read_csv_file(file_path):
    """Read a CSV file and return its rows.

    Args:
        file_path: Path of the CSV file to read.

    Returns:
        A list of rows, each row being a list of string fields.

    Raises:
        OSError: If the file cannot be opened.
        csv.Error: If the content cannot be parsed by ``csv.reader``.
    """
    # newline='' lets the csv module do its own line-ending handling, so that
    # newlines embedded in quoted fields are preserved as written.
    # UTF-8 is stated explicitly to match read_text_file.
    with open(file_path, 'r', newline='', encoding='utf-8') as file:
        return list(csv.reader(file))
def read_html_file(file_path):
    """Return the visible text of an HTML file with the markup removed.

    Args:
        file_path: Path of the HTML file (opened in text mode).

    Returns:
        The string produced by BeautifulSoup's ``get_text()`` for the
        document parsed with the 'html.parser' backend.
    """
    with open(file_path, 'r') as html_file:
        parsed = BeautifulSoup(html_file, 'html.parser')
        return parsed.get_text()
def read_pptx_file(file_path):
    """Return the text of a .pptx presentation, one paragraph per line.

    Every shape that has a text frame is visited on every slide, in document
    order. The runs of a paragraph are joined back to back, and each paragraph
    is terminated with a newline (the same convention as read_docx_file), so
    words from neighbouring paragraphs or shapes are never fused together.

    Args:
        file_path: Path of the presentation handed to ``Presentation``.

    Returns:
        The newline-terminated paragraphs as a single string; an empty
        string when no shape carries text.
    """
    prs = Presentation(file_path)
    paragraphs = []
    for slide in prs.slides:
        for shape in slide.shapes:
            if not shape.has_text_frame:
                continue
            for paragraph in shape.text_frame.paragraphs:
                paragraphs.append("".join(run.text for run in paragraph.runs))
    return "".join(line + "\n" for line in paragraphs)
def read_text_file(file_path):
    """Return the full content of a UTF-8 encoded text file.

    Args:
        file_path: Path of the file to read.

    Returns:
        The decoded file content as one string.
    """
    with open(file_path, mode='r', encoding='utf-8') as text_stream:
        return text_stream.read()
def chunk(input_text, word_count):
    """Split text into lower-cased chunks of at most ``word_count`` words.

    The text is lower-cased, split on any whitespace, and the words are
    regrouped with single spaces between them.

    Args:
        input_text: Text to split.
        word_count: Maximum number of words per chunk.

    Returns:
        A list of chunk strings; an empty list when the text is empty or
        ``word_count`` is 0.
    """
    if not input_text or word_count == 0:
        return []

    words = [token.strip(' ') for token in input_text.lower().split()]

    pieces = []
    for start in range(0, len(words), word_count):
        pieces.append(' '.join(words[start:start + word_count]))
    return pieces
# Module-level handler registered on the shared client for generated text.
# test_generate_text() later registers another handler under the same name.
@sio.event
def text_generated(data):
    """Print the payload received with the generated text."""
    print('Generated text:', data)
2023-06-19 21:37:19 +02:00
def _load_document_text(doc_path):
    """Return the text of ``doc_path`` using the reader matching its suffix, or None if unsupported."""
    readers = {
        ".pdf": read_pdf_file,
        ".docx": read_docx_file,
        ".pptx": read_pptx_file,
        ".html": read_html_file,
        ".htm": read_html_file,
        ".txt": read_text_file,
        ".md": read_text_file,
        # These two readers return structured data, so flatten it to text.
        ".json": lambda p: json.dumps(read_json_file(p), ensure_ascii=False),
        ".csv": lambda p: "\n".join(" ".join(row) for row in read_csv_file(p)),
    }
    reader = readers.get(doc_path.suffix.lower())
    if reader is None:
        return None
    return reader(doc_path)


def _read_non_empty_lines(path):
    """Return the stripped, non-blank lines of a UTF-8 text file."""
    with open(path, 'r', encoding='utf-8') as handle:
        return [line.strip() for line in handle if line.strip()]


def test_generate_text(host, port, lollms_paths:LollmsPaths):
    """Ask a lollms Socket.IO server which document chunks answer each question.

    Three files are used, all inside ``lollms_paths.personal_data_path``:

    * ``docs.txt``      - one document path per line (blank lines ignored)
    * ``questions.txt`` - one question per line (blank lines ignored)
    * ``outputs.txt``   - rewritten on each run; for every question it
      receives the question, a "Useful chunks" line, then one line per chunk
      the model answered "yes" for.

    Every listed document is read with the reader matching its suffix and cut
    into 512-word chunks. Once connected, each (question, chunk) pair is sent
    as a yes/no prompt through the 'generate_text' event and the reply is
    awaited before the next prompt is sent.

    Args:
        host: Socket.IO server host.
        port: Socket.IO server port.
        lollms_paths: Object whose ``personal_data_path`` holds the three
            files described above.

    Raises:
        SystemExit: With status 1 when docs.txt or questions.txt is missing,
            or when no usable text could be loaded from the listed documents.

    Note:
        This call blocks in ``sio.wait()`` after connecting.
    """
    docs = lollms_paths.personal_data_path/"docs.txt"
    questions_path = lollms_paths.personal_data_path/"questions.txt"
    outputs = lollms_paths.personal_data_path/"outputs.txt"

    # Missing inputs are errors: report them and exit with a failure status.
    if not docs.exists():
        ASCIIColors.error(f"Documents file {docs} does not exist")
        sys.exit(1)
    if not questions_path.exists():
        ASCIIColors.error(f"Questions file {questions_path} does not exist")
        sys.exit(1)

    print("Loading files")
    files = [Path(entry) for entry in _read_non_empty_lines(docs)]

    # Gather the chunks of every readable document before talking to the server.
    file_chunks = []
    for doc_path in files:
        if not doc_path.exists():
            ASCIIColors.error(f"Document {doc_path} does not exist")
            continue
        text = _load_document_text(doc_path)
        if text is None:
            ASCIIColors.error(f"Unsupported document type: {doc_path}")
            continue
        file_chunks.extend(chunk(text, 512))

    if not file_chunks:
        ASCIIColors.error(f"No usable text found in the documents listed in {docs}")
        sys.exit(1)

    # Shared between the handlers below: the latest answer and whether it has arrived.
    infos = {
        "is_ready": False,
        "answer": ""
    }

    # Event handler for successful connection
    @sio.event
    def connect():
        print('Connected to Socket.IO server')
        questions = _read_non_empty_lines(questions_path)

        # Open the report once so that every question is kept, not only the last.
        with open(outputs, 'w', encoding='utf-8') as report:
            for question in questions:
                print(f"Question:{question}")
                useful_chunks = []
                for doc_chunk in file_chunks:
                    prompt = "Read text chunk, then answer question by yes or no\nText Chunk:\n"+doc_chunk+"\nQ: "+question+"\nA:"
                    # Trigger the 'generate_text' event with the prompt
                    infos["is_ready"] = False
                    print(f"Sending prompt:{prompt}")
                    sio.emit('generate_text', {'prompt': prompt, 'personality':-1, "n_predicts":1024})
                    # NOTE(review): this polls until text_generated() below flips the
                    # flag; confirm the client delivers that event while this handler
                    # is still running.
                    while not infos["is_ready"]:
                        time.sleep(0.1)
                    # Tolerate surrounding whitespace and trailing text ("Yes.", " yes").
                    if infos["answer"].strip().lower().startswith("yes"):
                        ASCIIColors.info("Found useful chunk")
                        useful_chunks.append(doc_chunk)

                report.write(f"{question}\nUseful chunks\n")
                report.writelines(f"{useful}\n" for useful in useful_chunks)
                # Flush per question: the script keeps running in sio.wait().
                report.flush()

    @sio.event
    def text_chunk(data):
        print(data["chunk"],end="",flush=True)

    @sio.event
    def text_generated(data):
        infos["answer"] = data["text"]
        infos["is_ready"] = True

    print(f"Connecting to http://{host}:{port}")
    # Connect to the Socket.IO server
    sio.connect(f'http://{host}:{port}')
    # Start the event loop
    sio.wait()
if __name__ == '__main__':
    # Command-line interface: where to reach the Socket.IO server.
    cli = argparse.ArgumentParser(description='Socket.IO endpoint test')
    cli.add_argument('--host', type=str, default='localhost', help='Socket.IO server host')
    cli.add_argument('--port', type=int, default=9601, help='Socket.IO server port')
    # NOTE: --text-file is accepted, but its value is not read anywhere below.
    default_text_file = str(Path(__file__).parent/"example_text_gen.txt")
    cli.add_argument('--text-file', type=str, default=default_text_file, help='Path to the text file')
    options = cli.parse_args()

    # Resolve the lollms folders, then run the test with the provided arguments.
    test_generate_text(options.host, options.port, LollmsPaths.find_paths(force_local=False))