upgraded stt

This commit is contained in:
Saifeddine ALOUI 2024-05-26 21:42:27 +02:00
parent 2d0c4e76be
commit 22f553d9a7
8 changed files with 145 additions and 85 deletions

View File

@ -167,6 +167,10 @@ class LoLLMsCom:
"""
pass
def emit_socket_io_info(self, name, data, client_id):
pass
def notify(
self,
content:str,

View File

@ -7,7 +7,7 @@ from lollms.utilities import PackageManager
from ascii_colors import trace_exception
# Ensure necessary packages are installed
if not PackageManager.check_package_installed("beautifulsoup4"):
if not PackageManager.check_package_installed("bs4"):
PackageManager.install_package("beautifulsoup4")
if not PackageManager.check_package_installed("html2text"):
PackageManager.install_package("html2text")

View File

@ -8,7 +8,7 @@ License: Apache 2.0
"""
from lollms.utilities import PackageManager
from lollms.com import LoLLMsCom
from lollms.utilities import trace_exception, run_async
from lollms.utilities import trace_exception, run_async, install_conda_package
from lollms.types import MSG_TYPE, SENDER_TYPES
from lollms.client_session import Session
from ascii_colors import ASCIIColors
@ -16,6 +16,7 @@ import platform
from functools import partial
import subprocess
from collections import deque
from scipy.signal import butter, lfilter
import os
import threading
@ -47,15 +48,7 @@ import matplotlib.pyplot as plt
import matplotlib
matplotlib.use('Agg')
if not PackageManager.check_package_installed("whisper"):
PackageManager.install_package("openai-whisper")
try:
import conda.cli
conda.cli.main("install", "conda-forge::ffmpeg", "-y")
except:
ASCIIColors.bright_red("Couldn't install ffmpeg. whisper won't work. Please install it manually")
import whisper
import socketio
from lollms.com import LoLLMsCom
@ -92,7 +85,37 @@ from lollms.function_call import FunctionCalling_Library
from lollms.client_session import Client
from datetime import datetime
import math
import sys
def update_progress_bar(silence_counter, max_silence):
bar_length = 40 # Length of the progress bar
progress = silence_counter / max_silence
block = int(round(bar_length * progress))
# Determine the color based on progress
if progress < 0.5:
color = ASCIIColors.color_bright_green
else:
color = ASCIIColors.color_bright_red
bar = "#" * block + "-" * (bar_length - block)
sys.stdout.write("\r")
ASCIIColors.print(f"silence_counter: {silence_counter} |{bar}| {round(progress * 100, 2)}%", color=color, end="")
sys.stdout.flush()
# Step 1: Define your high-pass and low-pass filters (theyre like the bouncers for your audio club)
def butter_bandpass(lowcut, highcut, fs, order=5):
nyquist = 0.5 * fs
low = lowcut / nyquist
high = highcut / nyquist
b, a = butter(order, [low, high], btype='band')
return b, a
def bandpass_filter(data, lowcut, highcut, fs, order=5):
b, a = butter_bandpass(lowcut, highcut, fs, order=order)
y = lfilter(b, a, data)
return y
class RTCom:
def __init__(
@ -108,25 +131,16 @@ class RTCom:
rate=44100,
channels=1,
buffer_size=10,
model="small.en",
snd_input_device=None,
snd_output_device=None,
logs_folder="logs",
voice=None,
block_while_talking=True,
context_size=4096
):
self.sio = sio
self.lc = lc
self.client = client
self.tts = LollmsTTS(self.lc)
self.tl = TasksLibrary(self.lc)
self.block_listening = False
if not voice:
voices = self.get_voices()
voice = voices[0]
self.voice = voice
self.context_size = context_size
self.personality = personality
self.rate = rate
@ -171,32 +185,6 @@ class RTCom:
self.transcribed_files = deque()
self.buffer_lock = threading.Condition()
self.transcribed_lock = threading.Condition()
self.lc.ShowBlockingMessage("Loading whisper...")
ASCIIColors.info("Loading whisper...", end="",flush=True)
self.model = model
self.whisper = whisper.load_model(model)
self.lc.HideBlockingMessage()
ASCIIColors.success("OK")
def get_date_time(self):
now = datetime.now()
return now.strftime("%Y-%m-%d %H:%M:%S")
def calculator_function(self, expression: str) -> float:
try:
# Add the math module functions to the local namespace
allowed_names = {k: v for k, v in math.__dict__.items() if not k.startswith("__")}
# Evaluate the expression safely using the allowed names
result = eval(expression, {"__builtins__": None}, allowed_names)
return result
except Exception as e:
return str(e)
def take_a_photo(self):
return "Couldn't take a photo"
def start_recording(self):
self.recording = True
@ -224,8 +212,12 @@ class RTCom:
# self._save_histogram(self.audio_values)
def callback(self, indata, frames, time, status):
max_scilence = int((self.rate / frames) * self.silence_duration)
if not self.block_listening:
# Transform the buffer into a numpy array (like turning a frog into a prince)
audio_data = np.frombuffer(indata, dtype=np.int16)
# Apply the bandpass filter to the incoming audio data
audio_data = bandpass_filter(audio_data, lowcut=300, highcut=3000, fs=self.rate)
max_value = np.max(audio_data)
min_value = np.min(audio_data)
@ -237,6 +229,7 @@ class RTCom:
self.audio_values.extend(audio_data)
self.total_frames += frames
ASCIIColors.red(f" max_value: {max_value}", end="")
if max_value < self.threshold:
self.silence_counter += 1
self.current_silence_duration += frames
@ -248,21 +241,19 @@ class RTCom:
if self.current_silence_duration > self.longest_silence_duration:
self.longest_silence_duration = self.current_silence_duration
if self.silence_counter > (self.rate / frames) * self.silence_duration:
ASCIIColors.red("Silence counter reached threshold")
if self.silence_counter > max_scilence:
trimmed_frames = self._trim_silence(self.frames)
sound_percentage = self._calculate_sound_percentage(trimmed_frames)
ASCIIColors.red(f"Sound percentage {sound_percentage}")
if sound_percentage >= self.sound_threshold_percentage:
ASCIIColors.red(f"Sound percentage {sound_percentage}")
ASCIIColors.red("\nSilence counter reached threshold")
self._save_wav(self.frames)
self.frames = []
self.silence_counter = 0
self.total_frames = 0
self.sound_frames = 0
else:
ASCIIColors.red(f"Appending data")
ASCIIColors.yellow(f"silence_counter: {self.silence_counter}")
print(f"silence duration: {(self.rate / frames) * self.silence_duration}")
update_progress_bar(self.silence_counter, max_scilence)
self.frames.append(indata.copy())
else:
self.frames = []
@ -385,7 +376,7 @@ class RTCom:
ASCIIColors.red(" -------------------------------------------------")
self.lc.info("Talking")
ASCIIColors.green("<<TALKING>>")
self.lc.tts.tts_audio(lollms_text, speaker=self.voice, file_name_or_path=str(Path(self.logs_folder)/filename)+"_answer.wav")
self.lc.tts.tts_audio(lollms_text, file_name_or_path=str(Path(self.logs_folder)/filename)+"_answer.wav")
except Exception as ex:
trace_exception(ex)
self.block_listening = False
@ -400,6 +391,81 @@ class RTCom:
return voices
return []
from pathlib import Path
import os
import sounddevice as sd
import threading
import datetime
import wave
class AudioNinja:
def __init__(self, lc:LollmsApplication, logs_folder='logs', device=None):
"""
Initialize the AudioNinja with a LollmsApplication object,
a log folder, and an optional recording device.
Args:
lc (LollmsApplication): The LollmsApplication object for communication.
logs_folder (str): The folder to save recordings. Default is 'logs'.
device (int or str): The recording device index or name. Default is None.
"""
self.lc = lc
self.logs_folder = Path(logs_folder)
self.device = device
self.recording_thread = None
self.is_recording = False
self.frames = []
if not self.logs_folder.exists():
self.logs_folder.mkdir(parents=True, exist_ok=True)
self.lc.info(f"AudioNinja is ready to strike from the shadows! Logging to '{self.logs_folder}' with device '{self.device}'")
def _record_audio(self):
"""
Internal method to handle audio recording callback.
"""
def callback(indata, frames, time, status):
if self.is_recording:
self.frames.append(indata.copy())
self.lc.info("Ninja is capturing sounds... Shhh!")
with sd.InputStream(callback=callback, device=self.device):
while self.is_recording:
sd.sleep(1000)
def start_recording(self):
"""
Start the audio recording.
"""
if not self.is_recording:
self.is_recording = True
self.frames = []
self.recording_thread = threading.Thread(target=self._record_audio)
self.recording_thread.start()
self.lc.info("Ninja recording started! 🥷🔴")
def stop_recording(self):
"""
Stop the audio recording.
"""
if self.is_recording:
self.is_recording = False
self.recording_thread.join()
self._save_recording()
self.lc.info("Ninja recording stopped! 🥷⚪️")
def _save_recording(self):
"""
Save the recorded audio to a .wav file.
"""
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
filename = self.logs_folder / f"recording_{timestamp}.wav"
with wave.open(filename, 'wb') as wf:
wf.setnchannels(1)
wf.setsampwidth(sd.default.dtype[0].itemsize)
wf.setframerate(44100)
wf.writeframes(b''.join(self.frames))
self.lc.info(f"Ninja stored the audio file at '{filename}'! 🥷📂")
class WebcamImageSender:
"""
@ -535,17 +601,6 @@ class MusicPlayer(threading.Thread):
class RealTimeTranscription:
def __init__(self, callback):
if not PackageManager.check_package_installed('pyaudio'):
try:
import conda.cli
conda.cli.main("install", "anaconda::pyaudio", "-y")
except:
ASCIIColors.bright_red("Couldn't install pyaudio. whisper won't work. Please install it manually")
import pyaudio
# Initialize Whisper ASR
print("Loading whisper ...", end="")
self.whisper = whisper.load_model("base")
print("ok")
# Set up PyAudio
self.p = pyaudio.PyAudio()

View File

@ -995,24 +995,16 @@ class AIPersonality:
self.new_message("")
self.info(f"Transcribing ... ")
self.step_start("Transcribing ... ")
if self.whisper is None:
if not PackageManager.check_package_installed("whisper"):
PackageManager.install_package("openai-whisper")
try:
import conda.cli
conda.cli.main("install", "conda-forge::ffmpeg", "-y")
except:
ASCIIColors.bright_red("Couldn't install ffmpeg. whisper won't work. Please install it manually")
import whisper
self.whisper = whisper.load_model("base")
result = self.whisper.transcribe(str(path))
if self.app.stt is None:
self.InfoMessage("No STT service is up.\nPlease configure your default STT service in the settings page.")
return
text = self.app.stt.transcribe(str(path))
transcription_fn = str(path)+".txt"
with open(transcription_fn, "w", encoding="utf-8") as f:
f.write(result["text"])
f.write(text)
self.info(f"File saved to {transcription_fn}")
self.full(result["text"])
self.full(text)
self.step_end("Transcribing ... ")
elif path.suffix in [".png",".jpg",".jpeg",".gif",".bmp",".svg",".webp"]:
self.image_files.append(path)

View File

@ -85,11 +85,11 @@ class LollmsOpenAITTS(LollmsTTS):
response.write_to_file(speech_file_path)
def tts_audio(self, text, speaker, file_name_or_path:Path|str=None, language="en", use_threading=False):
def tts_audio(self, text, speaker:str=None, file_name_or_path:Path|str=None, language="en", use_threading=False):
speech_file_path = file_name_or_path
response = self.client.audio.speech.create(
model=self.model,
voice=self.voice,
voice=self.voice if speaker is None else speaker,
input=text,
response_format="wav"

View File

@ -18,9 +18,18 @@ from ascii_colors import ASCIIColors, trace_exception
from lollms.paths import LollmsPaths
import subprocess
if not PackageManager.check_package_installed("whisper"):
install_conda_package("ffmpeg")
PackageManager.install_package("whisper")
try:
if not PackageManager.check_package_installed("whisper"):
PackageManager.install_package("openai-whisper")
try:
install_conda_package("conda-forge::ffmpeg")
except Exception as ex:
trace_exception(ex)
ASCIIColors.red("Couldn't install ffmpeg")
except:
PackageManager.install_package("git+https://github.com/openai/whisper.git")
import whisper

View File

@ -291,7 +291,7 @@ class LollmsXTTS(LollmsTTS):
return file_name_or_path
def tts_audio(self, text, speaker, file_name_or_path:Path|str=None, language="en", use_threading=False):
def tts_audio(self, text, speaker=None, file_name_or_path:Path|str=None, language="en", use_threading=False):
voice=self.app.config.xtts_current_voice if speaker is None else speaker
index = find_first_available_file_index(self.output_folder, "voice_sample_",".wav")
output_fn=f"voice_sample_{index}.wav" if file_name_or_path is None else file_name_or_path

View File

@ -75,7 +75,7 @@ class LollmsTTS:
"""
pass
def tts_audio(self, text, speaker, file_name_or_path: Path | str = None, language="en", use_threading=False):
def tts_audio(self, text, speaker=None, file_name_or_path: Path | str = None, language="en", use_threading=False):
"""
Converts the given text to speech and returns the audio data.