From 46a694fa0b0423b0b36a434a00c4c50cb01a976a Mon Sep 17 00:00:00 2001 From: Saifeddine ALOUI Date: Sun, 17 Mar 2024 15:10:18 +0100 Subject: [PATCH] fixed internet issues --- lollms/internet.py | 147 +++++++++++++++++++++++++++++++++++++++------ 1 file changed, 130 insertions(+), 17 deletions(-) diff --git a/lollms/internet.py b/lollms/internet.py index 9136d65..8567e90 100644 --- a/lollms/internet.py +++ b/lollms/internet.py @@ -1,4 +1,8 @@ from ascii_colors import ASCIIColors, trace_exception +from lollms.utilities import PackageManager +import time +import re + def get_favicon_url(url): import requests from bs4 import BeautifulSoup @@ -29,18 +33,136 @@ def format_url_parameter(value:str): return encoded_value +def wait_for_page(driver, step_delay=1): + # Get the initial page height + last_height = driver.execute_script("return document.body.scrollHeight") + + while True: + # Scroll to the bottom of the page + driver.execute_script("window.scrollTo(0, document.body.scrollHeight);") + + # Wait for the page to load new content + time.sleep(step_delay) + + # Get the new page height + new_height = driver.execute_script("return document.body.scrollHeight") + + # If the page height hasn't changed, exit the loop + if new_height == last_height: + break + + last_height = new_height + + +def prepare_chrome_driver(chromedriver_path = None): + from selenium import webdriver + from selenium.common.exceptions import TimeoutException + from selenium.webdriver.common.by import By + from selenium.webdriver.support.ui import WebDriverWait + from selenium.webdriver.support import expected_conditions as EC + + # Create a new instance of the Chrome driver + chrome_options = webdriver.ChromeOptions() + #chrome_options.add_argument("--no-sandbox") + chrome_options.add_argument("--disable-dev-shm-usage") + # chrome_options.add_argument("--remote-debugging-port=9222") + chrome_options.add_argument("--headless") # Run Chrome in headless mode + # chrome_options.add_argument('--ignore-certificate-errors') + # chrome_options.add_argument('--ignore-ssl-errors') + chrome_options.add_argument("--enable-third-party-cookies") + + # Set path to chromedriver executable (replace with your own path) + if chromedriver_path is None: + chromedriver_path = ""#"/snap/bin/chromium.chromedriver" + + # Create a new Chrome webdriver instance + try: + driver = webdriver.Chrome(executable_path=chromedriver_path, options=chrome_options) + except: + driver = webdriver.Chrome(options=chrome_options) + return driver + +def scrape_and_save(url, file_path=None, lollms_com=None, chromedriver_path=None, wait_step_delay=1): + if not PackageManager.check_package_installed("selenium"): + PackageManager.install_package("selenium") + if not PackageManager.check_package_installed("bs4"): + PackageManager.install_package("bs4") + + from bs4 import BeautifulSoup + + from selenium import webdriver + from selenium.common.exceptions import TimeoutException + from selenium.webdriver.common.by import By + from selenium.webdriver.support.ui import WebDriverWait + from selenium.webdriver.support import expected_conditions as EC + + driver = prepare_chrome_driver(chromedriver_path) + + # Navigate to the URL + driver.get(url) + wait_for_page(driver, wait_step_delay) + + # Parse the HTML content using BeautifulSoup + soup = BeautifulSoup(driver.page_source, 'html.parser') + + # Find the button that contains the text "accept" (case-insensitive) + accept_button = soup.find('button', text=lambda t: 'accept' in t.lower()) + + if accept_button: + # Click the button using Selenium + button_element = driver.find_element(By.XPATH, "//button[contains(translate(., 'ABCDEFGHIJKLMNOPQRSTUVWXYZ', 'abcdefghijklmnopqrstuvwxyz'), 'accept')]") + button_element.click() + print("Button clicked!") + else: + print("Button not found.") + # Find and click the "Continue reading" button (if available) + try: + continue_button = WebDriverWait(driver, 0).until( + EC.presence_of_element_located((By.XPATH, "//button[contains(text(), 'Continue reading')]")) + ) + continue_button.click() + wait_for_page(driver, wait_step_delay) + # Wait for the additional content to load + time.sleep(5) + except: + if lollms_com: + lollms_com.info("No 'Continue reading' button found. Proceeding with the current content.") + + # Parse the HTML content using BeautifulSoup + soup = BeautifulSoup(driver.page_source, 'html.parser') + + # Find all the text content in the webpage + text_content = soup.get_text() + text_content = re.sub(r'\n+', '\n', text_content) + + + if file_path: + # Save the text content as a text file + with open(file_path, 'w', encoding="utf-8") as file: + file.write(text_content) + if lollms_com: + lollms_com.info(f"Webpage content saved to {file_path}") + + # Close the driver + driver.quit() + + + return text_content + def get_relevant_text_block( url, driver, config, vectorizer, title=None, - brief=None + brief=None, + wait_step_delay=0.5 ): try: from bs4 import BeautifulSoup # Load the webpage driver.get(url) + wait_for_page(driver, wait_step_delay) # Wait for JavaScript to execute and get the final page source html_content = driver.page_source @@ -62,12 +184,17 @@ def get_relevant_text_block( except: ASCIIColors.warning(f"Couldn't scrape: {url}") -def extract_results(url, max_num, driver=None): +def extract_results(url, max_num, driver=None, wait_step_delay=0.5): from bs4 import BeautifulSoup # Load the webpage driver.get(url) + # Get the initial page height + last_height = driver.execute_script("return document.body.scrollHeight") + + wait_for_page(driver, wait_step_delay) + # Wait for JavaScript to execute and get the final page source html_content = driver.page_source @@ -134,21 +261,7 @@ def internet_search(query, chromedriver_path, config, model = None, quick_search formatted_text = "" nb_non_empty = 0 # Configure Chrome options - chrome_options = Options() - chrome_options.add_argument("--no-sandbox") - chrome_options.add_argument("--disable-dev-shm-usage") - chrome_options.add_argument("--remote-debugging-port=9222") - chrome_options.add_argument("--headless") # Run Chrome in headless mode - - # Set path to chromedriver executable (replace with your own path) - if chromedriver_path is None: - chromedriver_path = ""#"/snap/bin/chromium.chromedriver" - - # Create a new Chrome webdriver instance - try: - driver = webdriver.Chrome(executable_path=chromedriver_path, options=chrome_options) - except: - driver = webdriver.Chrome(options=chrome_options) + driver = prepare_chrome_driver(chromedriver_path) results = extract_results( f"https://duckduckgo.com/?q={format_url_parameter(query)}&t=h_&ia=web",