Python script to spider a URL, convert each page to Markdown, and write the results to a directory
import os
import requests
from bs4 import BeautifulSoup
from markdownify import markdownify
from urllib.parse import urljoin, urlparse
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm
from ratelimit import limits, sleep_and_retry
import logging
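
# Dependencies (assumed PyPI package names matching the imports above):
#   pip install requests beautifulsoup4 markdownify tqdm ratelimit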

# Rate limiting - 1 request per second by default
CALLS_PER_SECOND = 1
RETRY_COUNT = 3
RETRY_DELAY = 2

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@sleep_and_retry
@limits(calls=CALLS_PER_SECOND, period=1)
def rate_limited_request(url, session):
    # A timeout keeps a hung server from stalling a worker thread indefinitely
    return session.get(url, timeout=10)
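
# The two ratelimit decorators cooperate: @limits raises RateLimitException once
# CALLS_PER_SECOND calls land within the 1-second period, and @sleep_and_retry
# catches it, sleeps the calling thread until the window resets, then retries.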

def normalize_url(url):
    """Normalize URLs to avoid duplication"""
    parsed = urlparse(url)
    # Remove trailing slashes and normalize to lowercase
    path = parsed.path.rstrip('/').lower()
    return f"{parsed.scheme}://{parsed.netloc}{path}"

def validate_content(content):
    """Basic content validation"""
    if not content or len(content.strip()) < 50:  # Arbitrary minimum length
        return False
    return True

def process_url(url, session, base_url, ignore_paths, stay_in_path):
    """Process a single URL with retry logic"""
    for attempt in range(RETRY_COUNT):
        try:
            response = rate_limited_request(url, session)
            response.raise_for_status()
            soup = BeautifulSoup(response.text, 'html.parser')
            new_urls = set()
            for link in soup.find_all("a", href=True):
                # Resolve relative links against the current page, not the crawl root
                absolute_url = urljoin(url, link['href'])
                if not should_ignore_url(absolute_url, base_url, ignore_paths, stay_in_path):
                    new_urls.add(normalize_url(absolute_url))
            return url, new_urls, response.text
        except requests.RequestException as e:
            if attempt == RETRY_COUNT - 1:
                logger.error(f"Failed to fetch {url} after {RETRY_COUNT} attempts: {e}")
                return url, set(), None
            time.sleep(RETRY_DELAY)
    return url, set(), None
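
# process_url always returns a (url, discovered_urls, html_or_None) triple; a
# None body signals that every attempt failed, so callers can skip the page
# without handling an exception.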

def collect_urls(base_url, ignore_paths=None, stay_in_path=False, max_workers=5):
    if ignore_paths is None:
        ignore_paths = []
    visited = set()
    to_visit = {normalize_url(base_url)}
    all_urls = set()
    session = requests.Session()
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        with tqdm(total=len(to_visit), desc="Collecting URLs") as pbar:
            while to_visit:
                # Process up to max_workers frontier URLs in parallel
                future_to_url = {
                    executor.submit(process_url, url, session, base_url, ignore_paths, stay_in_path): url
                    for url in list(to_visit)[:max_workers]
                }
                for future in as_completed(future_to_url):
                    url = future_to_url[future]
                    # Mark the URL done before inspecting the result so a
                    # failure can never leave it stuck in the frontier
                    visited.add(url)
                    to_visit.discard(url)
                    try:
                        _, new_urls, content = future.result()
                        if content and validate_content(content):
                            all_urls.add(url)
                        to_visit.update(new_urls - visited)
                    except Exception as e:
                        logger.error(f"Error processing {url}: {e}")
                    pbar.update(1)
                    pbar.total = len(to_visit) + len(visited)
    return all_urls

def convert_urls_to_markdown(urls, output_dir, base_url, ignore_paths, stay_in_path, content_selector="body", max_workers=5):
    os.makedirs(output_dir, exist_ok=True)
    session = requests.Session()

    def convert_single_url(url):
        try:
            response = rate_limited_request(url, session)
            response.raise_for_status()
            soup = BeautifulSoup(response.text, 'html.parser')
            main_pane = soup.select_one(content_selector)
            if main_pane and validate_content(str(main_pane)):
                markdown_content = markdownify(str(main_pane))
                output_file = os.path.join(output_dir, f"{sanitize_filename(url)}.md")
                with open(output_file, 'w', encoding='utf-8') as f:
                    f.write(markdown_content)
                return True
            return False
        except requests.RequestException as e:
            logger.error(f"Failed to convert {url}: {e}")
            return False

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Map each future back to its URL so log messages name the right page
        future_to_url = {executor.submit(convert_single_url, url): url for url in urls}
        with tqdm(total=len(urls), desc="Converting to Markdown") as pbar:
            for future in as_completed(future_to_url):
                url = future_to_url[future]
                try:
                    if future.result():
                        logger.info(f"Successfully converted {url}")
                except Exception as e:
                    logger.error(f"Error converting {url}: {e}")
                pbar.update(1)

def should_ignore_url(url, base_url, ignore_paths, stay_in_path):
    parsed_url = urlparse(url)
    parsed_base = urlparse(base_url)
    # Ignore URLs outside the base domain
    if parsed_url.netloc != parsed_base.netloc:
        return True
    # Stay within base path if enabled
    if stay_in_path and not parsed_url.path.startswith(parsed_base.path):
        return True
    # Ignore specific paths
    for ignore_path in ignore_paths:
        if ignore_path in parsed_url.path:
            return True
    return False
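
# Example (illustrative), with base_url "https://fabricjs.com/docs",
# ignore_paths ["docs/old-docs"], and stay_in_path=True:
#   https://fabricjs.com/docs/api          -> kept
#   https://fabricjs.com/blog/post         -> ignored (outside the base path)
#   https://fabricjs.com/docs/old-docs/x   -> ignored (matches an ignore path)
#   https://github.com/fabricjs/fabric.js  -> ignored (different domain)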

def sanitize_filename(url):
    """Convert URL to a valid filename by replacing invalid characters"""
    # Remove the protocol and domain
    parsed = urlparse(url)
    filename = parsed.path
    if parsed.query:
        filename += '_' + parsed.query
    if parsed.fragment:
        filename += '_' + parsed.fragment
    # Replace invalid filename characters with underscores
    invalid_chars = '<>:"/\\|?*'
    for char in invalid_chars:
        filename = filename.replace(char, '_')
    # Remove leading/trailing spaces and dots
    filename = filename.strip('. ')
    # Use 'index' for empty filenames
    if not filename:
        filename = 'index'
    return filename
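
# Example (illustrative): the path, query, and fragment are joined with
# underscores and slashes replaced, so
#   sanitize_filename("https://fabricjs.com/docs/api?v=6#intro")
#   -> "_docs_api_v=6_intro"   (written to disk as "_docs_api_v=6_intro.md")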

def convert_url_to_markdown_link(url, base_url, ignore_paths, stay_in_path):
    """Convert URLs to local markdown file references"""
    # Note: helper for rewriting in-page links; not currently called anywhere
    if should_ignore_url(url, base_url, ignore_paths, stay_in_path):
        return url
    normalized = normalize_url(url)
    return f"{sanitize_filename(normalized)}.md"

if __name__ == "__main__":
    print("Spider-Mark Configuration\n")
    BASE_URL = input("Enter base URL [https://fabricjs.com/docs]: ").strip() or "https://fabricjs.com/docs"
    OUTPUT_DIR = input("Enter output directory [output_markdown]: ").strip() or "output_markdown"
    CONTENT_SELECTOR = input("Enter content selector [div.main-pane]: ").strip() or "div.main-pane"

    IGNORE_PATHS = ['docs/old-docs']
    print(f"Default ignore paths: {IGNORE_PATHS}")
    keep_default = input("Keep default ignore paths? (y/n) [y]: ").lower().strip() != 'n'
    if not keep_default:
        IGNORE_PATHS = []
        while True:
            path = input("Enter path to ignore (press Enter when done): ").strip()
            if not path:
                break
            IGNORE_PATHS.append(path)

    STAY_IN_PATH = input("Stay in base path? (y/n) [y]: ").lower().strip() != 'n'
    MAX_WORKERS = int(input("Number of parallel workers [5]: ") or "5")

    print("\nConfiguration Summary:")
    print(f"Base URL: {BASE_URL}")
    print(f"Output Directory: {OUTPUT_DIR}")
    print(f"Content Selector: {CONTENT_SELECTOR}")
    print(f"Ignored Paths: {IGNORE_PATHS}")
    print(f"Stay in Path: {STAY_IN_PATH}")
    print(f"Max Workers: {MAX_WORKERS}")

    proceed = input("\nProceed with these settings? (y/n) [y]: ").lower().strip() != 'n'
    if proceed:
        logger.info("Phase 1: Collecting URLs...")
        all_urls = collect_urls(BASE_URL, IGNORE_PATHS, STAY_IN_PATH, MAX_WORKERS)
        logger.info("Found URLs:")
        for url in all_urls:
            logger.info(f"- {url}")
        logger.info("Phase 2: Converting to Markdown...")
        convert_urls_to_markdown(all_urls, OUTPUT_DIR, BASE_URL, IGNORE_PATHS, STAY_IN_PATH, CONTENT_SELECTOR, MAX_WORKERS)
        logger.info("Conversion complete!")