mirror of
https://github.com/ParisNeo/lollms.git
synced 2025-02-01 08:48:19 +00:00
enhanced function calls
This commit is contained in:
parent
1e11d05eb9
commit
eaba5573ba
@ -10,7 +10,7 @@ from lollms.utilities import PackageManager
|
|||||||
from ascii_colors import trace_exception
|
from ascii_colors import trace_exception
|
||||||
|
|
||||||
# Installing necessary packages
|
# Installing necessary packages
|
||||||
if not PackageManager.check_package_installed("youtube-transcript-api"):
|
if not PackageManager.check_package_installed("youtube_transcript_api"):
|
||||||
PackageManager.install_package("youtube-transcript-api")
|
PackageManager.install_package("youtube-transcript-api")
|
||||||
|
|
||||||
# Importing the package after installation
|
# Importing the package after installation
|
||||||
|
74
lollms/functions/youtube/download_transcript_by_channel.py
Normal file
74
lollms/functions/youtube/download_transcript_by_channel.py
Normal file
@ -0,0 +1,74 @@
|
|||||||
|
# Lollms function call definition file
|
||||||
|
# File Name: download_channel_transcripts.py
|
||||||
|
# Author: ParisNeo
|
||||||
|
# Description: This function takes a YouTube channel name, scans all their videos using web scraping, and downloads the transcripts. Each transcript is saved in a folder as a text file.
|
||||||
|
|
||||||
|
# Importing necessary libraries
|
||||||
|
from functools import partial
|
||||||
|
from typing import List
|
||||||
|
from lollms.utilities import PackageManager
|
||||||
|
from ascii_colors import trace_exception
|
||||||
|
import pathlib
|
||||||
|
import requests
|
||||||
|
from bs4 import BeautifulSoup
|
||||||
|
|
||||||
|
# Installing necessary packages
|
||||||
|
if not PackageManager.check_package_installed("youtube_transcript_api"):
|
||||||
|
PackageManager.install_package("youtube-transcript-api")
|
||||||
|
|
||||||
|
# Importing the package after installation
|
||||||
|
from youtube_transcript_api import YouTubeTranscriptApi
|
||||||
|
|
||||||
|
def download_channel_transcripts(channel_url: str, output_folder: str) -> str:
    """
    Scrape a YouTube channel page for video links and download each video's transcript.

    Each transcript is saved in the output folder as ``<video_id>.txt``.

    Parameters:
        channel_url (str): The URL of the YouTube channel page to scrape.
        output_folder (str): The folder where transcript text files will be saved
            (created if it does not exist).

    Returns:
        str: A success message, or the value returned by trace_exception on a
        top-level failure (per-video failures are logged and skipped).

    NOTE(review): channel pages are largely JS-rendered, so static scraping may
    find few or no "watch?v=" links — verify against a real channel URL.
    """
    try:
        # Create output folder if it doesn't exist
        output_folder_path = pathlib.Path(output_folder)
        output_folder_path.mkdir(parents=True, exist_ok=True)

        # Get channel page content; timeout prevents hanging forever on a dead host
        response = requests.get(channel_url, timeout=30)
        response.raise_for_status()
        soup = BeautifulSoup(response.content, "html.parser")

        # Find all video links and extract the video id.
        # Split on '&' as well: hrefs like "watch?v=ID&t=42s" would otherwise
        # yield an invalid id with trailing query parameters.
        video_links = soup.find_all("a", href=True)
        video_ids = [
            link['href'].split('v=')[1].split('&')[0]
            for link in video_links
            if "watch?v=" in link['href']
        ]

        # Remove duplicates (a video is typically linked several times per page)
        video_ids = list(set(video_ids))

        # Download transcripts and save to files; one bad video must not abort the rest
        for video_id in video_ids:
            try:
                transcript = YouTubeTranscriptApi.get_transcript(video_id)
                transcript_text = " ".join([entry['text'] for entry in transcript])

                output_file_path = output_folder_path / f"{video_id}.txt"
                output_file_path.write_text(transcript_text, encoding='utf-8')
            except Exception as e:
                trace_exception(e)

        return "Transcripts downloaded successfully!"
    except Exception as e:
        return trace_exception(e)
|
def download_channel_transcripts_function(output_folder: str):
    """
    Build the Lollms function-call descriptor for download_channel_transcripts.

    Parameters:
        output_folder (str): Folder where transcripts will be saved; pre-bound
            into the callable via functools.partial so the model only supplies
            the channel URL.

    Returns:
        dict: Descriptor with the function name, the partially-applied callable,
        a description, and the parameter schema exposed to the model.
    """
    return {
        "function_name": "download_channel_transcripts",
        # output_folder is fixed here, so it is deliberately absent from
        # function_parameters below.
        "function": partial(download_channel_transcripts, output_folder=output_folder),
        # Fixed: the function takes a channel URL, not a channel name — the
        # previous wording could make the model supply a bare channel name.
        "function_description": "This function takes a YouTube channel URL, scans all their videos using web scraping, and downloads the transcripts. Each transcript is saved in a folder as a text file.",
        "function_parameters": [
            {"name": "channel_url", "type": "str"},
        ]
    }
|
@ -33,6 +33,7 @@ from lollms.tti import LollmsTTI
|
|||||||
import subprocess
|
import subprocess
|
||||||
import shutil
|
import shutil
|
||||||
from tqdm import tqdm
|
from tqdm import tqdm
|
||||||
|
import threading
|
||||||
|
|
||||||
if not PackageManager.check_package_installed("websocket"):
|
if not PackageManager.check_package_installed("websocket"):
|
||||||
PackageManager.install_or_update("websocket-client")
|
PackageManager.install_or_update("websocket-client")
|
||||||
@ -224,10 +225,14 @@ class LollmsComfyUI(LollmsTTI):
|
|||||||
|
|
||||||
# Wait until the service is available at http://127.0.0.1:8188//
|
# Wait until the service is available at http://127.0.0.1:8188//
|
||||||
if wait_for_service:
|
if wait_for_service:
|
||||||
self.wait_for_service(max_retries=max_retries)
|
self.wait_for_service()
|
||||||
else:
|
else:
|
||||||
ASCIIColors.warning("We are not waiting for the SD service to be up.\nThis means that you may need to wait a bit before you can use it.")
|
self.wait_for_service_in_another_thread(max_retries=max_retries)
|
||||||
|
|
||||||
|
def wait_for_service_in_another_thread(self, max_retries=150, show_warning=True):
    """Launch wait_for_service on a background thread and return the started thread."""
    worker = threading.Thread(
        target=self.wait_for_service,
        args=(max_retries, show_warning),
    )
    worker.start()
    return worker
|
||||||
|
|
||||||
def wait_for_service(self, max_retries = 50, show_warning=True):
|
def wait_for_service(self, max_retries = 50, show_warning=True):
|
||||||
url = f"{self.comfyui_base_url}"
|
url = f"{self.comfyui_base_url}"
|
||||||
|
@ -127,11 +127,20 @@ class LollmsDiffusers(LollmsTTI):
|
|||||||
PackageManager.install_or_update("sentencepiece")
|
PackageManager.install_or_update("sentencepiece")
|
||||||
PackageManager.install_or_update("accelerate")
|
PackageManager.install_or_update("accelerate")
|
||||||
try:
|
try:
|
||||||
from diffusers import AutoPipelineForText2Image, AutoPipelineForImage2Image#PixArtSigmaPipeline
|
if "stable-diffusion-3" in app.config.diffusers_model:
|
||||||
self.model = AutoPipelineForText2Image.from_pretrained(
|
from diffusers import StableDiffusion3Pipeline # AutoPipelineForImage2Image#PixArtSigmaPipeline
|
||||||
app.config.diffusers_model, torch_dtype=torch.float16, cache_dir=self.models_dir,
|
self.model = StableDiffusion3Pipeline.from_pretrained(
|
||||||
use_safetensors=True,
|
app.config.diffusers_model, torch_dtype=torch.float16, cache_dir=self.models_dir,
|
||||||
)
|
use_safetensors=True,
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
from diffusers import AutoPipelineForText2Image # AutoPipelineForImage2Image#PixArtSigmaPipeline
|
||||||
|
self.model = AutoPipelineForText2Image.from_pretrained(
|
||||||
|
app.config.diffusers_model, torch_dtype=torch.float16, cache_dir=self.models_dir,
|
||||||
|
use_safetensors=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
# AutoPipelineForText2Image
|
||||||
# self.model = StableDiffusionPipeline.from_pretrained(
|
# self.model = StableDiffusionPipeline.from_pretrained(
|
||||||
# "CompVis/stable-diffusion-v1-4", torch_dtype=torch.float16, cache_dir=self.models_dir,
|
# "CompVis/stable-diffusion-v1-4", torch_dtype=torch.float16, cache_dir=self.models_dir,
|
||||||
# use_safetensors=True,
|
# use_safetensors=True,
|
||||||
|
Loading…
x
Reference in New Issue
Block a user