mirror of
https://github.com/SevaSk/ecoute.git
synced 2024-12-24 06:46:38 +00:00
117 lines
4.9 KiB
Python
117 lines
4.9 KiB
Python
import whisper
|
|
import torch
|
|
import wave
|
|
import os
|
|
import threading
|
|
from tempfile import NamedTemporaryFile
|
|
import custom_speech_recognition as sr
|
|
import io
|
|
from datetime import timedelta
|
|
import pyaudiowpatch as pyaudio
|
|
from AudioRecorder import DefaultMicRecorder, DefaultSpeakerRecorder
|
|
from heapq import merge
|
|
|
|
PHRASE_TIMEOUT = 3.01
|
|
|
|
MAX_PHRASES = 10
|
|
|
|
class AudioTranscriber:
|
|
def __init__(self, default_mic : DefaultMicRecorder, default_speaker : DefaultSpeakerRecorder):
|
|
self.mic_transcript_data = []
|
|
self.speaker_transcript_data = []
|
|
self.transcript_changed_event = threading.Event()
|
|
self.audio_model = whisper.load_model(os.getcwd() + r'\tiny.en' + '.pt')
|
|
|
|
self.mic_sample_rate = default_mic.source.SAMPLE_RATE
|
|
self.mic_sample_width = default_mic.source.SAMPLE_WIDTH
|
|
self.mic_channels = default_mic.num_channels
|
|
|
|
self.speaker_sample_rate = default_speaker.source.SAMPLE_RATE
|
|
self.speaker_sample_width = default_speaker.source.SAMPLE_WIDTH
|
|
self.speaker_channels = default_speaker.num_channels
|
|
|
|
def create_transcription_from_queue(self, audio_queue):
|
|
mic_last_sample = bytes()
|
|
speaker_last_sample = bytes()
|
|
|
|
mic_last_spoken = None
|
|
speaker_last_spoken = None
|
|
|
|
mic_start_new_phrase = True
|
|
speaker_start_new_phrase = True
|
|
|
|
while True:
|
|
top_of_queue = audio_queue.get()
|
|
who_spoke = top_of_queue[0]
|
|
data = top_of_queue[1]
|
|
time_spoken = top_of_queue[2]
|
|
|
|
if who_spoke == "You":
|
|
if mic_last_spoken and time_spoken - mic_last_spoken > timedelta(seconds=PHRASE_TIMEOUT):
|
|
mic_last_sample = bytes()
|
|
mic_start_new_phrase = True
|
|
else:
|
|
mic_start_new_phrase = False
|
|
|
|
mic_last_sample += data
|
|
mic_last_spoken = time_spoken
|
|
|
|
mic_temp_file = NamedTemporaryFile().name
|
|
audio_data = sr.AudioData(mic_last_sample, self.mic_sample_rate, self.mic_sample_width)
|
|
wav_data = io.BytesIO(audio_data.get_wav_data())
|
|
with open(mic_temp_file, 'w+b') as f:
|
|
f.write(wav_data.read())
|
|
|
|
result = self.audio_model.transcribe(mic_temp_file, fp16=torch.cuda.is_available())
|
|
text = result['text'].strip()
|
|
|
|
if text != '' and text.lower() != 'you':
|
|
if mic_start_new_phrase or len(self.mic_transcript_data) == 0:
|
|
if len(self.mic_transcript_data) > MAX_PHRASES:
|
|
self.mic_transcript_data.pop()
|
|
self.mic_transcript_data = [(who_spoke + ": [" + text + ']\n\n', time_spoken)] + self.mic_transcript_data
|
|
self.transcript_changed_event.set()
|
|
else:
|
|
self.mic_transcript_data[0] = (who_spoke + ": [" + text + ']\n\n',
|
|
time_spoken)
|
|
self.transcript_changed_event.set()
|
|
else:
|
|
if speaker_last_spoken and time_spoken - speaker_last_spoken > timedelta(seconds=PHRASE_TIMEOUT):
|
|
speaker_last_sample = bytes()
|
|
speaker_start_new_phrase = True
|
|
else:
|
|
speaker_start_new_phrase = False
|
|
|
|
speaker_last_sample += data
|
|
speaker_last_spoken = time_spoken
|
|
|
|
speaker_temp_file = NamedTemporaryFile().name
|
|
|
|
with wave.open(speaker_temp_file, 'wb') as wf:
|
|
wf.setnchannels(self.speaker_channels)
|
|
p = pyaudio.PyAudio()
|
|
wf.setsampwidth(p.get_sample_size(pyaudio.paInt16))
|
|
wf.setframerate(self.speaker_sample_rate)
|
|
wf.writeframes(speaker_last_sample)
|
|
|
|
result = self.audio_model.transcribe(speaker_temp_file, fp16=torch.cuda.is_available())
|
|
text = result['text'].strip()
|
|
|
|
if text != '' and text.lower() != 'you':
|
|
if speaker_start_new_phrase or len(self.speaker_transcript_data) == 0:
|
|
if len(self.mic_transcript_data) > MAX_PHRASES:
|
|
self.mic_transcript_data.pop()
|
|
self.speaker_transcript_data = [(who_spoke + ": [" + text + ']\n\n', time_spoken)] + self.speaker_transcript_data
|
|
self.transcript_changed_event.set()
|
|
|
|
else:
|
|
self.speaker_transcript_data[0] = (who_spoke + ": [" + text + ']\n\n',
|
|
time_spoken)
|
|
self.transcript_changed_event.set()
|
|
|
|
def get_transcript(self):
|
|
key = lambda x : x[1]
|
|
transcript_tuple = list(merge(self.mic_transcript_data, self.speaker_transcript_data, key=key, reverse=True))
|
|
transcript_tuple = transcript_tuple[0:MAX_PHRASES]
|
|
return "".join([t[0] for t in transcript_tuple])
|
|
|