BDG Geodesix Nightly Sync — pushes affiliate/shopping articles to Geodesix S3 data queue
#!/usr/bin/env python3
"""
Geodesix Nightly Sync
=====================
Pushes BDG affiliate/shopping articles (all sites) to the Geodesix data pipeline.

Rules:
- Tags: "affiliate" OR "shopping"
- Exclude: sponsored=true
- Exclude: any tag slug/name containing "sponsored"
- All BDG consumer sites
- Runs nightly

Usage:
    python3 geodesix_nightly_sync.py

Credentials:
- Typeset: ~/.openclaw/workspace/.secrets/typeset.json
- Geodesix AWS: ~/.openclaw/workspace/.secrets/geodesix.json
  (keys: access_key_id, secret_access_key, data_folder, bucket, optional region)

S3 format (reverse-engineered from DataQueueClient.php):
    Bucket:      geodesix-data-queue
    Content key: {data_folder}/queue/html/{domain}/html_{md5(url)}.json
    Log key:     {data_folder}/log/html_{md5(url)}.json
    Body:        JSON envelope with version, uuid, type, uri, domain, data (html string), relations
"""
import hashlib
import json
import logging
import os
import sys
import time
from datetime import datetime, timezone
from urllib.parse import urlparse

import boto3
import requests
from botocore.exceptions import ClientError

# ── Logging ──────────────────────────────────────────────────────────────────
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[logging.StreamHandler(sys.stdout)]
)
log = logging.getLogger(__name__)
# ── Config ───────────────────────────────────────────────────────────────────
TYPESET_ENDPOINT = "https://graph.bustle.com"
TYPESET_SECRETS_PATH = os.path.expanduser("~/.openclaw/workspace/.secrets/typeset.json")
GSX_SECRETS_PATH = os.path.expanduser("~/.openclaw/workspace/.secrets/geodesix.json")

# Load Geodesix credentials from the secrets file
with open(GSX_SECRETS_PATH) as _f:
    _gsx = json.load(_f)
GSX_ACCESS_KEY = _gsx["access_key_id"]
GSX_SECRET_KEY = _gsx["secret_access_key"]
GSX_DATA_FOLDER = _gsx["data_folder"]
GSX_BUCKET = _gsx["bucket"]
GSX_REGION = _gsx.get("region", "us-east-1")

DATA_TYPE = "html"

# All BDG consumer sites
SITES = [
    "BUSTLE", "NYLON", "W", "SCARY_MOMMY", "ELITE_DAILY",
    "INVERSE", "ZOE", "FATHERLY", "ROMPER", "INPUT",
]

# Tags that make an article eligible (must have at least one)
INCLUDE_TAGS = {"affiliate", "shopping"}

# Delay between HTML fetches (be polite to our own CDN)
FETCH_DELAY_SEC = 0.5
# ── Typeset Auth ─────────────────────────────────────────────────────────────
def load_typeset_secrets():
    with open(TYPESET_SECRETS_PATH) as f:
        return json.load(f)


def refresh_typeset_token(secrets):
    log.info("Refreshing Typeset token...")
    r = requests.post(
        TYPESET_ENDPOINT,
        json={"query": f'mutation {{ login(input: {{email: "{secrets["username"]}", password: "{secrets["password"]}"}}) {{ authToken }} }}'},
        headers={"Content-Type": "application/json"},
        timeout=30
    )
    # Fail loudly on HTTP errors rather than crashing on a malformed body below
    r.raise_for_status()
    token = r.json()["data"]["login"]["authToken"]
    secrets["token"] = token
    with open(TYPESET_SECRETS_PATH, "w") as f:
        json.dump(secrets, f, indent=2)
    log.info("Token refreshed.")
    return token


def gql(query, variables=None, token=None):
    r = requests.post(
        TYPESET_ENDPOINT,
        json={"query": query, "variables": variables or {}},
        headers={"Content-Type": "application/json", "Authorization": token},
        timeout=30
    )
    data = r.json()
    if "errors" in data and any("UNAUTHENTICATED" in str(e) for e in data["errors"]):
        raise PermissionError("Typeset token expired")
    return data
# ── Article Query ────────────────────────────────────────────────────────────
PAGES_QUERY = """
query GetPages($sites: [SiteName!], $after: String) {
  pageConnection(
    first: 100,
    order: DESC,
    sponsored: false,
    sites: $sites,
    after: $after
  ) {
    pageInfo { hasNextPage endCursor }
    edges {
      node {
        id
        title
        url
        state
        sponsored
        tags { name slug }
      }
    }
  }
}
"""
def is_eligible(page):
    """Return True if page matches affiliate/shopping criteria (not sponsored)."""
    tags = page.get("tags") or []
    tag_slugs = {(t.get("slug") or "").lower() for t in tags}
    tag_names = {(t.get("name") or "").lower() for t in tags}
    all_tag_strings = tag_slugs | tag_names
    # Must have at least one include tag
    if not (INCLUDE_TAGS & tag_slugs):
        return False
    # Exclude if any tag contains "sponsored"
    if any("sponsored" in t for t in all_tag_strings):
        return False
    # Exclude sponsored pages (defensive; the query already filters sponsored: false)
    if page.get("sponsored"):
        return False
    # Must be published
    if page.get("state") != "PUBLISHED":
        return False
    return True
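# Illustrative outcomes for a PUBLISHED, non-sponsored page (hypothetical tag slugs):
#   {"affiliate"}                      -> eligible
#   {"shopping", "sponsored-content"}  -> excluded ("sponsored" substring match)
#   {"fashion"}                        -> excluded (no include tag)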
def fetch_eligible_articles(token):
    """Paginate through all sites and return eligible articles."""
    eligible = []
    total_scanned = 0
    for site in SITES:
        log.info(f"Scanning {site}...")
        cursor = None
        site_count = 0
        while True:
            data = gql(PAGES_QUERY, variables={"sites": [site], "after": cursor}, token=token)
            conn = data.get("data", {}).get("pageConnection", {})
            edges = conn.get("edges", [])
            page_info = conn.get("pageInfo", {})
            for edge in edges:
                page = edge["node"]
                total_scanned += 1
                if is_eligible(page):
                    page["site"] = site
                    eligible.append(page)
                    site_count += 1
            if not page_info.get("hasNextPage"):
                break
            cursor = page_info["endCursor"]
            time.sleep(0.1)
        log.info(f"  {site}: {site_count} eligible articles found")
    log.info(f"Total scanned: {total_scanned} | Total eligible: {len(eligible)}")
    return eligible
# ── HTML Fetch ───────────────────────────────────────────────────────────────
def fetch_article_html(url):
    """Fetch rendered HTML from the live article URL."""
    try:
        r = requests.get(
            url,
            headers={"User-Agent": "BDG-Geodesix-Sync/1.0"},
            timeout=30
        )
        r.raise_for_status()
        return r.text
    except Exception as e:
        log.warning(f"Failed to fetch {url}: {e}")
        return None
# ── Geodesix Push (matches DataQueueClient.php exactly) ──────────────────────
def make_uuid(url):
    """Replicate PHP: $uuid = $dataType . '_' . md5($uri)"""
    return f"{DATA_TYPE}_{hashlib.md5(url.encode('utf-8')).hexdigest()}"


def get_domain(url):
    """Replicate PHP: parse_url + str_replace('www.', '', $domain)"""
    parsed = urlparse(url)
    return parsed.netloc.replace("www.", "")
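# Illustrative, with a hypothetical URL:
#   get_domain("https://www.bustle.com/style/some-slug")  -> "bustle.com"
#   make_uuid("https://www.bustle.com/style/some-slug")   -> "html_<32-char md5 hex>"
# Note: like PHP's str_replace, .replace() strips "www." anywhere in the host,
# so a hypothetical "blog.www.example.com" would also become "blog.example.com".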
def push_to_geodesix(s3, url, html):
    """
    Push article HTML to the Geodesix S3 queue.
    Replicates DataQueueClient::pushData() exactly:
    - Content key: {data_folder}/queue/html/{domain}/{uuid}.json
    - Log key: {data_folder}/log/{uuid}.json
    - Body: JSON envelope matching the PHP SDK's object_data structure
    """
    domain = get_domain(url)
    uuid = make_uuid(url)
    queue_name = f"{GSX_DATA_FOLDER}/queue"
    object_key = f"{queue_name}/{DATA_TYPE}/{domain}/{uuid}.json"
    log_key = f"{GSX_DATA_FOLDER}/log/{uuid}.json"

    now = datetime.now(timezone.utc)
    now_ts = int(now.timestamp())
    now_str = now.isoformat()

    # Content envelope — matches PHP $object_data
    object_data = {
        "version": "1.0",
        "uuid": uuid,
        "type": DATA_TYPE,
        "repository": GSX_DATA_FOLDER,
        "time_str": now_str,
        "time": now_ts,
        "uri": url,
        "domain": domain,
        "data": html,
        "relations": []
    }

    # Log envelope — matches PHP $log_json
    path = urlparse(url).path.strip("/").replace("/", " ")
    log_data = {
        "time": now_ts,
        "uuid": uuid,
        "uri": url,
        "domain": domain,
        "data_type": DATA_TYPE,
        "repository": GSX_DATA_FOLDER,
        "title": path or domain
    }

    try:
        s3.put_object(Bucket=GSX_BUCKET, Key=object_key, Body=json.dumps(object_data).encode("utf-8"))
        s3.put_object(Bucket=GSX_BUCKET, Key=log_key, Body=json.dumps(log_data).encode("utf-8"))
        return uuid
    except ClientError as e:
        log.error(f"S3 push failed for {url}: {e}")
        return None
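# Because object keys are derived from md5(url), re-running the sync simply
# overwrites the same S3 objects, so the nightly job is idempotent per URL.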
# ── Main ─────────────────────────────────────────────────────────────────────
def main():
    log.info("=== Geodesix Nightly Sync Starting ===")
    log.info(f"Data folder: {GSX_DATA_FOLDER} | Bucket: {GSX_BUCKET} | Sites: {len(SITES)}")
    if not all([GSX_ACCESS_KEY, GSX_SECRET_KEY, GSX_DATA_FOLDER]):
        log.error("Missing credentials in ~/.openclaw/workspace/.secrets/geodesix.json")
        sys.exit(1)

    # Load Typeset credentials
    secrets = load_typeset_secrets()
    token = secrets["token"]

    # Fetch eligible articles from Typeset
    log.info("Querying Typeset for affiliate/shopping articles...")
    try:
        articles = fetch_eligible_articles(token)
    except PermissionError:
        token = refresh_typeset_token(secrets)
        articles = fetch_eligible_articles(token)

    if not articles:
        log.info("No eligible articles found. Exiting.")
        return

    # Initialize S3 client with Geodesix credentials
    s3 = boto3.client(
        "s3",
        region_name=GSX_REGION,
        aws_access_key_id=GSX_ACCESS_KEY,
        aws_secret_access_key=GSX_SECRET_KEY,
    )

    pushed = 0
    failed = 0
    for i, article in enumerate(articles, 1):
        url = article.get("url")
        if not url:
            log.warning(f"Skipping article {article['id']} — no URL")
            continue
        log.info(f"[{i}/{len(articles)}] {article.get('site')} | {url}")
        html = fetch_article_html(url)
        if not html:
            failed += 1
            continue
        uuid = push_to_geodesix(s3, url, html)
        if uuid:
            pushed += 1
            log.info(f"  ✓ {uuid}")
        else:
            failed += 1
        time.sleep(FETCH_DELAY_SEC)

    log.info(f"=== Done === Pushed: {pushed} | Failed: {failed} | Total: {len(articles)}")


if __name__ == "__main__":
    main()