@closetgeekshow
Last active November 20, 2024 04:45
Python script to spider a URL, convert its contents to Markdown, and save the results in a directory.
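
Note: besides the standard library, the script imports requests, bs4 (BeautifulSoup), markdownify, tqdm and ratelimit, so those packages need to be installed first (for example with pip install requests beautifulsoup4 markdownify tqdm ratelimit).
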
import os
import requests
from bs4 import BeautifulSoup
from markdownify import markdownify
from urllib.parse import urljoin, urlparse
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm
from ratelimit import limits, sleep_and_retry
import logging

# Rate limiting - 1 request per second by default
CALLS_PER_SECOND = 1
RETRY_COUNT = 3
RETRY_DELAY = 2

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@sleep_and_retry
@limits(calls=CALLS_PER_SECOND, period=1)
def rate_limited_request(url, session):
    """Fetch a URL through the shared session, respecting the rate limit."""
    return session.get(url)

def normalize_url(url):
    """Normalize URLs to avoid duplication"""
    parsed = urlparse(url)
    # Remove trailing slashes and normalize to lowercase
    path = parsed.path.rstrip('/').lower()
    return f"{parsed.scheme}://{parsed.netloc}{path}"

def validate_content(content):
    """Basic content validation"""
    if not content or len(content.strip()) < 50:  # Arbitrary minimum length
        return False
    return True

def process_url(url, session, base_url, ignore_paths, stay_in_path):
    """Process a single URL with retry logic"""
    for attempt in range(RETRY_COUNT):
        try:
            response = rate_limited_request(url, session)
            response.raise_for_status()
            soup = BeautifulSoup(response.text, 'html.parser')
            new_urls = set()
            for link in soup.find_all("a", href=True):
                # Resolve relative links against the page they appear on
                absolute_url = urljoin(url, link['href'])
                if not should_ignore_url(absolute_url, base_url, ignore_paths, stay_in_path):
                    new_urls.add(normalize_url(absolute_url))
            return url, new_urls, response.text
        except requests.RequestException as e:
            if attempt == RETRY_COUNT - 1:
                logger.error(f"Failed to fetch {url} after {RETRY_COUNT} attempts: {e}")
                return url, set(), None
            time.sleep(RETRY_DELAY)
    return url, set(), None

def collect_urls(base_url, ignore_paths=None, stay_in_path=False, max_workers=5):
    """Crawl outward from base_url and return the set of URLs worth converting."""
    if ignore_paths is None:
        ignore_paths = []
    visited = set()
    to_visit = {normalize_url(base_url)}
    all_urls = set()
    session = requests.Session()
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        with tqdm(total=len(to_visit), desc="Collecting URLs") as pbar:
            while to_visit:
                # Process a batch of URLs in parallel
                future_to_url = {
                    executor.submit(process_url, url, session, base_url, ignore_paths, stay_in_path): url
                    for url in list(to_visit)[:max_workers]
                }
                for future in as_completed(future_to_url):
                    url = future_to_url[future]
                    try:
                        current_url, new_urls, content = future.result()
                        visited.add(url)
                        to_visit.remove(url)
                        if content and validate_content(content):
                            all_urls.add(url)
                        to_visit.update(new_urls - visited)
                        pbar.update(1)
                        pbar.total = len(to_visit) + len(visited)
                    except Exception as e:
                        logger.error(f"Error processing {url}: {e}")
                        # Drop the URL so a persistent failure cannot loop forever
                        visited.add(url)
                        to_visit.discard(url)
    return all_urls

def convert_urls_to_markdown(urls, output_dir, base_url, ignore_paths, stay_in_path, content_selector="body", max_workers=5):
    """Fetch each URL, extract the selected content and write it out as Markdown."""
    os.makedirs(output_dir, exist_ok=True)
    session = requests.Session()

    def convert_single_url(url):
        try:
            response = rate_limited_request(url, session)
            response.raise_for_status()
            soup = BeautifulSoup(response.text, 'html.parser')
            main_pane = soup.select_one(content_selector)
            if main_pane and validate_content(str(main_pane)):
                markdown_content = markdownify(str(main_pane))
                output_file = os.path.join(output_dir, f"{sanitize_filename(url)}.md")
                with open(output_file, 'w', encoding='utf-8') as f:
                    f.write(markdown_content)
                return True
            return False
        except requests.RequestException as e:
            logger.error(f"Failed to convert {url}: {e}")
            return False

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Map each future back to its URL so log messages name the right page
        future_to_url = {executor.submit(convert_single_url, url): url for url in urls}
        with tqdm(total=len(urls), desc="Converting to Markdown") as pbar:
            for future in as_completed(future_to_url):
                url = future_to_url[future]
                try:
                    if future.result():
                        logger.info(f"Successfully converted {url}")
                except Exception as e:
                    logger.error(f"Error converting {url}: {e}")
                pbar.update(1)

def should_ignore_url(url, base_url, ignore_paths, stay_in_path):
    parsed_url = urlparse(url)
    parsed_base = urlparse(base_url)
    # Ignore URLs outside the base domain
    if parsed_url.netloc != parsed_base.netloc:
        return True
    # Stay within the base path if enabled
    if stay_in_path and not parsed_url.path.startswith(parsed_base.path):
        return True
    # Ignore specific paths
    for ignore_path in ignore_paths:
        if ignore_path in parsed_url.path:
            return True
    return False

def sanitize_filename(url):
    """Convert a URL to a valid filename by replacing invalid characters"""
    # Drop the protocol and domain
    parsed = urlparse(url)
    filename = parsed.path
    if parsed.query:
        filename += '_' + parsed.query
    if parsed.fragment:
        filename += '_' + parsed.fragment
    # Replace invalid filename characters with underscores
    invalid_chars = '<>:"/\\|?*'
    for char in invalid_chars:
        filename = filename.replace(char, '_')
    # Remove leading/trailing spaces and dots
    filename = filename.strip('. ')
    # Use 'index' for empty filenames
    if not filename:
        filename = 'index'
    return filename

def convert_url_to_markdown_link(url, base_url, ignore_paths, stay_in_path):
    """Convert URLs to local markdown file references"""
    if should_ignore_url(url, base_url, ignore_paths, stay_in_path):
        return url
    normalized = normalize_url(url)
    return f"{sanitize_filename(normalized)}.md"

if __name__ == "__main__":
    print("Spider-Mark Configuration\n")
    BASE_URL = input("Enter base URL [https://fabricjs.com/docs]: ").strip() or "https://fabricjs.com/docs"
    OUTPUT_DIR = input("Enter output directory [output_markdown]: ").strip() or "output_markdown"
    CONTENT_SELECTOR = input("Enter content selector [div.main-pane]: ").strip() or "div.main-pane"

    IGNORE_PATHS = ['docs/old-docs']
    if IGNORE_PATHS:
        print(f"Default ignore paths: {IGNORE_PATHS}")
        keep_default = input("Keep default ignore paths? (y/n) [y]: ").lower().strip() != 'n'
        if not keep_default:
            IGNORE_PATHS = []
            while True:
                path = input("Enter path to ignore (press Enter when done): ").strip()
                if not path:
                    break
                IGNORE_PATHS.append(path)
    else:
        while True:
            path = input("Enter path to ignore (press Enter when done): ").strip()
            if not path:
                break
            IGNORE_PATHS.append(path)

    STAY_IN_PATH = input("Stay in base path? (y/n) [y]: ").lower().strip() != 'n'
    MAX_WORKERS = int(input("Number of parallel workers [5]: ") or "5")

    print("\nConfiguration Summary:")
    print(f"Base URL: {BASE_URL}")
    print(f"Output Directory: {OUTPUT_DIR}")
    print(f"Content Selector: {CONTENT_SELECTOR}")
    print(f"Ignored Paths: {IGNORE_PATHS}")
    print(f"Stay in Path: {STAY_IN_PATH}")
    print(f"Max Workers: {MAX_WORKERS}")

    proceed = input("\nProceed with these settings? (y/n) [y]: ").lower().strip() != 'n'
    if proceed:
        logger.info("Phase 1: Collecting URLs...")
        all_urls = collect_urls(BASE_URL, IGNORE_PATHS, STAY_IN_PATH, MAX_WORKERS)
        logger.info("Found URLs:")
        for url in all_urls:
            logger.info(f"- {url}")
        logger.info("\nStarting conversion...")
        convert_urls_to_markdown(all_urls, OUTPUT_DIR, BASE_URL, IGNORE_PATHS, STAY_IN_PATH, CONTENT_SELECTOR, MAX_WORKERS)
        logger.info("Conversion complete!")