mirror of
https://github.com/ParisNeo/lollms.git
synced 2024-12-21 21:47:54 +00:00
fixed internet issues
This commit is contained in:
parent
ddba6ed364
commit
46a694fa0b
@ -1,4 +1,8 @@
|
||||
from ascii_colors import ASCIIColors, trace_exception
|
||||
from lollms.utilities import PackageManager
|
||||
import time
|
||||
import re
|
||||
|
||||
def get_favicon_url(url):
|
||||
import requests
|
||||
from bs4 import BeautifulSoup
|
||||
@ -29,18 +33,136 @@ def format_url_parameter(value:str):
|
||||
return encoded_value
|
||||
|
||||
|
||||
def wait_for_page(driver, step_delay=1):
|
||||
# Get the initial page height
|
||||
last_height = driver.execute_script("return document.body.scrollHeight")
|
||||
|
||||
while True:
|
||||
# Scroll to the bottom of the page
|
||||
driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
|
||||
|
||||
# Wait for the page to load new content
|
||||
time.sleep(step_delay)
|
||||
|
||||
# Get the new page height
|
||||
new_height = driver.execute_script("return document.body.scrollHeight")
|
||||
|
||||
# If the page height hasn't changed, exit the loop
|
||||
if new_height == last_height:
|
||||
break
|
||||
|
||||
last_height = new_height
|
||||
|
||||
|
||||
def prepare_chrome_driver(chromedriver_path = None):
|
||||
from selenium import webdriver
|
||||
from selenium.common.exceptions import TimeoutException
|
||||
from selenium.webdriver.common.by import By
|
||||
from selenium.webdriver.support.ui import WebDriverWait
|
||||
from selenium.webdriver.support import expected_conditions as EC
|
||||
|
||||
# Create a new instance of the Chrome driver
|
||||
chrome_options = webdriver.ChromeOptions()
|
||||
#chrome_options.add_argument("--no-sandbox")
|
||||
chrome_options.add_argument("--disable-dev-shm-usage")
|
||||
# chrome_options.add_argument("--remote-debugging-port=9222")
|
||||
chrome_options.add_argument("--headless") # Run Chrome in headless mode
|
||||
# chrome_options.add_argument('--ignore-certificate-errors')
|
||||
# chrome_options.add_argument('--ignore-ssl-errors')
|
||||
chrome_options.add_argument("--enable-third-party-cookies")
|
||||
|
||||
# Set path to chromedriver executable (replace with your own path)
|
||||
if chromedriver_path is None:
|
||||
chromedriver_path = ""#"/snap/bin/chromium.chromedriver"
|
||||
|
||||
# Create a new Chrome webdriver instance
|
||||
try:
|
||||
driver = webdriver.Chrome(executable_path=chromedriver_path, options=chrome_options)
|
||||
except:
|
||||
driver = webdriver.Chrome(options=chrome_options)
|
||||
return driver
|
||||
|
||||
def scrape_and_save(url, file_path=None, lollms_com=None, chromedriver_path=None, wait_step_delay=1):
|
||||
if not PackageManager.check_package_installed("selenium"):
|
||||
PackageManager.install_package("selenium")
|
||||
if not PackageManager.check_package_installed("bs4"):
|
||||
PackageManager.install_package("bs4")
|
||||
|
||||
from bs4 import BeautifulSoup
|
||||
|
||||
from selenium import webdriver
|
||||
from selenium.common.exceptions import TimeoutException
|
||||
from selenium.webdriver.common.by import By
|
||||
from selenium.webdriver.support.ui import WebDriverWait
|
||||
from selenium.webdriver.support import expected_conditions as EC
|
||||
|
||||
driver = prepare_chrome_driver(chromedriver_path)
|
||||
|
||||
# Navigate to the URL
|
||||
driver.get(url)
|
||||
wait_for_page(driver, wait_step_delay)
|
||||
|
||||
# Parse the HTML content using BeautifulSoup
|
||||
soup = BeautifulSoup(driver.page_source, 'html.parser')
|
||||
|
||||
# Find the button that contains the text "accept" (case-insensitive)
|
||||
accept_button = soup.find('button', text=lambda t: 'accept' in t.lower())
|
||||
|
||||
if accept_button:
|
||||
# Click the button using Selenium
|
||||
button_element = driver.find_element(By.XPATH, "//button[contains(translate(., 'ABCDEFGHIJKLMNOPQRSTUVWXYZ', 'abcdefghijklmnopqrstuvwxyz'), 'accept')]")
|
||||
button_element.click()
|
||||
print("Button clicked!")
|
||||
else:
|
||||
print("Button not found.")
|
||||
# Find and click the "Continue reading" button (if available)
|
||||
try:
|
||||
continue_button = WebDriverWait(driver, 0).until(
|
||||
EC.presence_of_element_located((By.XPATH, "//button[contains(text(), 'Continue reading')]"))
|
||||
)
|
||||
continue_button.click()
|
||||
wait_for_page(driver, wait_step_delay)
|
||||
# Wait for the additional content to load
|
||||
time.sleep(5)
|
||||
except:
|
||||
if lollms_com:
|
||||
lollms_com.info("No 'Continue reading' button found. Proceeding with the current content.")
|
||||
|
||||
# Parse the HTML content using BeautifulSoup
|
||||
soup = BeautifulSoup(driver.page_source, 'html.parser')
|
||||
|
||||
# Find all the text content in the webpage
|
||||
text_content = soup.get_text()
|
||||
text_content = re.sub(r'\n+', '\n', text_content)
|
||||
|
||||
|
||||
if file_path:
|
||||
# Save the text content as a text file
|
||||
with open(file_path, 'w', encoding="utf-8") as file:
|
||||
file.write(text_content)
|
||||
if lollms_com:
|
||||
lollms_com.info(f"Webpage content saved to {file_path}")
|
||||
|
||||
# Close the driver
|
||||
driver.quit()
|
||||
|
||||
|
||||
return text_content
|
||||
|
||||
def get_relevant_text_block(
|
||||
url,
|
||||
driver,
|
||||
config,
|
||||
vectorizer,
|
||||
title=None,
|
||||
brief=None
|
||||
brief=None,
|
||||
wait_step_delay=0.5
|
||||
):
|
||||
try:
|
||||
from bs4 import BeautifulSoup
|
||||
# Load the webpage
|
||||
driver.get(url)
|
||||
wait_for_page(driver, wait_step_delay)
|
||||
|
||||
# Wait for JavaScript to execute and get the final page source
|
||||
html_content = driver.page_source
|
||||
@ -62,12 +184,17 @@ def get_relevant_text_block(
|
||||
except:
|
||||
ASCIIColors.warning(f"Couldn't scrape: {url}")
|
||||
|
||||
def extract_results(url, max_num, driver=None):
|
||||
def extract_results(url, max_num, driver=None, wait_step_delay=0.5):
|
||||
from bs4 import BeautifulSoup
|
||||
|
||||
# Load the webpage
|
||||
driver.get(url)
|
||||
|
||||
# Get the initial page height
|
||||
last_height = driver.execute_script("return document.body.scrollHeight")
|
||||
|
||||
wait_for_page(driver, wait_step_delay)
|
||||
|
||||
# Wait for JavaScript to execute and get the final page source
|
||||
html_content = driver.page_source
|
||||
|
||||
@ -134,21 +261,7 @@ def internet_search(query, chromedriver_path, config, model = None, quick_search
|
||||
formatted_text = ""
|
||||
nb_non_empty = 0
|
||||
# Configure Chrome options
|
||||
chrome_options = Options()
|
||||
chrome_options.add_argument("--no-sandbox")
|
||||
chrome_options.add_argument("--disable-dev-shm-usage")
|
||||
chrome_options.add_argument("--remote-debugging-port=9222")
|
||||
chrome_options.add_argument("--headless") # Run Chrome in headless mode
|
||||
|
||||
# Set path to chromedriver executable (replace with your own path)
|
||||
if chromedriver_path is None:
|
||||
chromedriver_path = ""#"/snap/bin/chromium.chromedriver"
|
||||
|
||||
# Create a new Chrome webdriver instance
|
||||
try:
|
||||
driver = webdriver.Chrome(executable_path=chromedriver_path, options=chrome_options)
|
||||
except:
|
||||
driver = webdriver.Chrome(options=chrome_options)
|
||||
driver = prepare_chrome_driver(chromedriver_path)
|
||||
|
||||
results = extract_results(
|
||||
f"https://duckduckgo.com/?q={format_url_parameter(query)}&t=h_&ia=web",
|
||||
|
Loading…
Reference in New Issue
Block a user