mirror of
https://github.com/ParisNeo/lollms.git
synced 2025-01-02 19:06:41 +00:00
172 lines
5.6 KiB
Python
172 lines
5.6 KiB
Python
import argparse
|
|
import socketio
|
|
from pathlib import Path
|
|
from lollms import MSG_TYPE
|
|
from lollms.helpers import ASCIIColors
|
|
from lollms.paths import LollmsPaths
|
|
import time
|
|
import json
|
|
from pathlib import Path
|
|
import PyPDF2
|
|
from docx import Document
|
|
from bs4 import BeautifulSoup
|
|
import json
|
|
import csv
|
|
from pptx import Presentation
|
|
import sys
|
|
|
|
# Connect to the Socket.IO server
|
|
sio = socketio.Client()
|
|
|
|
|
|
|
|
def read_pdf_file(file_path):
|
|
with open(file_path, 'rb') as file:
|
|
pdf_reader = PyPDF2.PdfReader(file)
|
|
text = ""
|
|
for page in pdf_reader.pages:
|
|
text += page.extract_text()
|
|
return text
|
|
|
|
|
|
def read_docx_file(file_path):
|
|
doc = Document(file_path)
|
|
text = ""
|
|
for paragraph in doc.paragraphs:
|
|
text += paragraph.text + "\n"
|
|
return text
|
|
|
|
|
|
def read_json_file(file_path):
|
|
with open(file_path, 'r') as file:
|
|
data = json.load(file)
|
|
return data
|
|
|
|
|
|
def read_csv_file(file_path):
|
|
with open(file_path, 'r') as file:
|
|
csv_reader = csv.reader(file)
|
|
lines = [row for row in csv_reader]
|
|
return lines
|
|
|
|
|
|
def read_html_file(file_path):
|
|
with open(file_path, 'r') as file:
|
|
soup = BeautifulSoup(file, 'html.parser')
|
|
text = soup.get_text()
|
|
return text
|
|
|
|
def read_pptx_file(file_path):
|
|
prs = Presentation(file_path)
|
|
text = ""
|
|
for slide in prs.slides:
|
|
for shape in slide.shapes:
|
|
if shape.has_text_frame:
|
|
for paragraph in shape.text_frame.paragraphs:
|
|
for run in paragraph.runs:
|
|
text += run.text
|
|
return text
|
|
|
|
def read_text_file(file_path):
|
|
with open(file_path, 'r', encoding='utf-8') as file:
|
|
content = file.read()
|
|
return content
|
|
|
|
def chunk(input_text, word_count):
|
|
# Check for empty text case (word count = 0)
|
|
if not input_text or word_count == 0:
|
|
return []
|
|
|
|
# Split the string into words using .split() method. Remove leading and trailing whitespace characters from each word by replacing them with an empty strinng ""
|
|
cleaned_words = [word.strip(' ') for word in input_text.lower().split()]
|
|
|
|
return [' '.join(cleaned_words[i:i+word_count]) for i in range(0,len(cleaned_words),word_count)]
|
|
|
|
# Event handler for receiving generated text
|
|
@sio.event
|
|
def text_generated(data):
|
|
print('Generated text:', data)
|
|
|
|
def test_generate_text(host, port, lollms_paths:LollmsPaths):
|
|
docs=lollms_paths.personal_data_path/"docs.txt"
|
|
questions_path = lollms_paths.personal_data_path/"questions.txt"
|
|
outputs=lollms_paths.personal_data_path/"outputs.txt"
|
|
|
|
if not docs.exists():
|
|
ASCIIColors.error(f"Documents file {docs} does not exist")
|
|
sys.exit(0)
|
|
|
|
if not questions_path.exists():
|
|
sys.exit(0)
|
|
|
|
files = []
|
|
# Read the text file and split by multiple newlines
|
|
print("Loading files")
|
|
with open(docs, 'r') as file:
|
|
file = file.read().split('\n')
|
|
files.append(Path(file[0]))
|
|
|
|
for file in files:
|
|
infos={
|
|
"is_ready":False,
|
|
"answer":""
|
|
}
|
|
if file.suffix.lower()==".pdf":
|
|
txt = read_pdf_file(file)
|
|
|
|
file_chunks = chunk(txt,512)
|
|
# Event handler for successful connection
|
|
@sio.event
|
|
def connect():
|
|
print('Connected to Socket.IO server')
|
|
questions = []
|
|
with open(questions_path, 'r') as file:
|
|
questions = file.read().split('\n')
|
|
|
|
for question in questions:
|
|
print(f"Question:{question}")
|
|
useful_chunks=[]
|
|
for chunk in file_chunks:
|
|
prompt="Read text chunk, then answer question by yes or no\nText Chunk:\n"+chunk+"\nQ: "+question+"\nA:"
|
|
# Trigger the 'generate_text' event with the prompt
|
|
infos["is_ready"]=False
|
|
print(f"Sending prompt:{prompt}")
|
|
sio.emit('generate_text', {'prompt': prompt, 'personality':-1, "n_predicts":1024})
|
|
while infos["is_ready"]==False:
|
|
time.sleep(0.1)
|
|
if(infos["answer"].lower()=="yes"):
|
|
ASCIIColors.info("Found useful chunk")
|
|
useful_chunks.append(chunk)
|
|
with open(outputs, 'w') as file:
|
|
file.writelines([question,"Useful chunks"])
|
|
for i in range(len(useful_chunks)):
|
|
file.writelines([useful_chunks[i]])
|
|
|
|
|
|
@sio.event
|
|
def text_chunk(data):
|
|
print(data["chunk"],end="",flush=True)
|
|
|
|
@sio.event
|
|
def text_generated(data):
|
|
infos["answer"]=data["text"]
|
|
infos["is_ready"]=True
|
|
|
|
print(f"Connecting to http://{host}:{port}")
|
|
# Connect to the Socket.IO server
|
|
sio.connect(f'http://{host}:{port}')
|
|
|
|
# Start the event loop
|
|
sio.wait()
|
|
|
|
if __name__ == '__main__':
|
|
# Parse command-line arguments
|
|
parser = argparse.ArgumentParser(description='Socket.IO endpoint test')
|
|
parser.add_argument('--host', type=str, default='localhost', help='Socket.IO server host')
|
|
parser.add_argument('--port', type=int, default=9601, help='Socket.IO server port')
|
|
parser.add_argument('--text-file', type=str, default=str(Path(__file__).parent/"example_text_gen.txt"),help='Path to the text file')
|
|
args = parser.parse_args()
|
|
lollms_paths = LollmsPaths.find_paths(force_local=False)
|
|
# Run the test with provided arguments
|
|
test_generate_text(args.host, args.port, lollms_paths)
|