@tylr
Created March 19, 2026 14:02
BDG Geodesix Nightly Sync — pushes affiliate/shopping articles to Geodesix S3 data queue
#!/usr/bin/env python3
"""
Geodesix Nightly Sync
=====================
Pushes BDG affiliate/shopping articles (all sites) to Geodesix data pipeline.
Rules:
- Tags: "affiliate" OR "shopping"
- Exclude: sponsored=true
- Exclude: any tag slug/name containing "sponsored"
- All BDG consumer sites
- Runs nightly
Usage:
python3 geodesix_nightly_sync.py
Credentials:
- Typeset: ~/.openclaw/workspace/.secrets/typeset.json
- Geodesix AWS: ~/.openclaw/workspace/.secrets/geodesix.json
  (keys: access_key_id, secret_access_key, data_folder, bucket, optional region)
S3 format (reverse-engineered from DataQueueClient.php):
Bucket: geodesix-data-queue
Content key: {data_folder}/queue/html/{domain}/html_{md5(url)}.json
Log key: {data_folder}/log/html_{md5(url)}.json
Body: JSON envelope with version, uuid, type, uri, domain, data (html string), relations
"""
import os
import sys
import json
import time
import hashlib
import logging
import requests
import boto3
from datetime import datetime, timezone
from urllib.parse import urlparse
from botocore.exceptions import ClientError
# ── Logging ──────────────────────────────────────────────────────────────────
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[logging.StreamHandler(sys.stdout)],
)
log = logging.getLogger(__name__)
# ── Config ───────────────────────────────────────────────────────────────────
TYPESET_ENDPOINT = "https://graph.bustle.com"
TYPESET_SECRETS_PATH = os.path.expanduser("~/.openclaw/workspace/.secrets/typeset.json")
GSX_SECRETS_PATH = os.path.expanduser("~/.openclaw/workspace/.secrets/geodesix.json")
# Load Geodesix credentials from secrets file
with open(GSX_SECRETS_PATH) as _f:
    _gsx = json.load(_f)
GSX_ACCESS_KEY = _gsx["access_key_id"]
GSX_SECRET_KEY = _gsx["secret_access_key"]
GSX_DATA_FOLDER = _gsx["data_folder"]
GSX_BUCKET = _gsx["bucket"]
GSX_REGION = _gsx.get("region", "us-east-1")
DATA_TYPE = "html"
# All BDG consumer sites
SITES = [
    "BUSTLE", "NYLON", "W", "SCARY_MOMMY", "ELITE_DAILY",
    "INVERSE", "ZOE", "FATHERLY", "ROMPER", "INPUT",
]
# Tags that make an article eligible (must have at least one)
INCLUDE_TAGS = {"affiliate", "shopping"}
# Delay between HTML fetches (be polite to our own CDN)
FETCH_DELAY_SEC = 0.5
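# For reference, geodesix.json is expected to provide the keys read above.
# Illustrative shape (values elided, not real credentials):
#
#   {
#     "access_key_id": "...",
#     "secret_access_key": "...",
#     "data_folder": "...",
#     "bucket": "geodesix-data-queue",
#     "region": "us-east-1"
#   }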
# ── Typeset Auth ──────────────────────────────────────────────────────────────
def load_typeset_secrets():
    with open(TYPESET_SECRETS_PATH) as f:
        return json.load(f)

def refresh_typeset_token(secrets):
    log.info("Refreshing Typeset token...")
    r = requests.post(
        TYPESET_ENDPOINT,
        json={"query": f'mutation {{ login(input: {{email: "{secrets["username"]}", password: "{secrets["password"]}"}}) {{ authToken }} }}'},
        headers={"Content-Type": "application/json"},
        timeout=30,
    )
    r.raise_for_status()
    token = r.json()["data"]["login"]["authToken"]
    secrets["token"] = token
    with open(TYPESET_SECRETS_PATH, "w") as f:
        json.dump(secrets, f, indent=2)
    log.info("Token refreshed.")
    return token
def gql(query, variables=None, token=None):
    r = requests.post(
        TYPESET_ENDPOINT,
        json={"query": query, "variables": variables or {}},
        headers={"Content-Type": "application/json", "Authorization": token},
        timeout=30,
    )
    data = r.json()
    if "errors" in data and any("UNAUTHENTICATED" in str(e) for e in data["errors"]):
        raise PermissionError("Typeset token expired")
    return data
# ── Article Query ─────────────────────────────────────────────────────────────
PAGES_QUERY = """
query GetPages($sites: [SiteName!], $after: String) {
  pageConnection(
    first: 100,
    order: DESC,
    sponsored: false,
    sites: $sites,
    after: $after
  ) {
    pageInfo { hasNextPage endCursor }
    edges {
      node {
        id
        title
        url
        state
        sponsored
        tags { name slug }
      }
    }
  }
}
"""
def is_eligible(page):
    """Return True if page matches affiliate/shopping criteria (not sponsored)."""
    tags = page.get("tags") or []
    tag_slugs = {(t.get("slug") or "").lower() for t in tags}
    tag_names = {(t.get("name") or "").lower() for t in tags}
    all_tag_strings = tag_slugs | tag_names
    # Must have at least one include tag (matched against slugs only)
    if not (INCLUDE_TAGS & tag_slugs):
        return False
    # Exclude if any tag slug or name contains "sponsored"
    if any("sponsored" in t for t in all_tag_strings):
        return False
    # Must be published
    if page.get("state") != "PUBLISHED":
        return False
    return True
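# Example (hypothetical page dict): {"state": "PUBLISHED",
# "tags": [{"slug": "shopping", "name": "Shopping"}]} passes; adding a tag
# such as {"slug": "sponsored-content"} excludes it via the substring check.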
def fetch_eligible_articles(token):
    """Paginate through all sites and return eligible articles."""
    eligible = []
    total_scanned = 0
    for site in SITES:
        log.info(f"Scanning {site}...")
        cursor = None
        site_count = 0
        while True:
            data = gql(PAGES_QUERY, variables={"sites": [site], "after": cursor}, token=token)
            # GraphQL can return "data": null alongside errors, so guard with `or {}`
            conn = (data.get("data") or {}).get("pageConnection") or {}
            edges = conn.get("edges", [])
            page_info = conn.get("pageInfo", {})
            for edge in edges:
                page = edge["node"]
                total_scanned += 1
                if is_eligible(page):
                    page["site"] = site
                    eligible.append(page)
                    site_count += 1
            if not page_info.get("hasNextPage"):
                break
            cursor = page_info["endCursor"]
            time.sleep(0.1)
        log.info(f" {site}: {site_count} eligible articles found")
    log.info(f"Total scanned: {total_scanned} | Total eligible: {len(eligible)}")
    return eligible
# ── HTML Fetch ────────────────────────────────────────────────────────────────
def fetch_article_html(url):
    """Fetch rendered HTML from the live article URL."""
    try:
        r = requests.get(
            url,
            headers={"User-Agent": "BDG-Geodesix-Sync/1.0"},
            timeout=30,
        )
        r.raise_for_status()
        return r.text
    except Exception as e:
        log.warning(f"Failed to fetch {url}: {e}")
        return None
# ── Geodesix Push (matches DataQueueClient.php exactly) ──────────────────────
def make_uuid(url):
    """Replicate PHP: $uuid = $dataType . '_' . md5($uri)"""
    return f"{DATA_TYPE}_{hashlib.md5(url.encode('utf-8')).hexdigest()}"

def get_domain(url):
    """Replicate PHP: parse_url + str_replace('www.', '', $domain)"""
    # Like PHP's str_replace, this removes every "www." occurrence, not just a leading one.
    parsed = urlparse(url)
    return parsed.netloc.replace("www.", "")
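# Example (illustrative URL): for "https://www.bustle.com/style/some-article",
# get_domain() returns "bustle.com" and make_uuid() returns "html_" followed
# by the 32-character hex md5 digest of the full URL string.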
def push_to_geodesix(s3, url, html):
    """
    Push article HTML to Geodesix S3 queue.
    Replicates DataQueueClient::pushData() exactly:
    - Content key: {data_folder}/queue/html/{domain}/{uuid}.json
    - Log key: {data_folder}/log/{uuid}.json
    - Body: JSON envelope matching the PHP SDK's object_data structure
    """
    domain = get_domain(url)
    uuid = make_uuid(url)
    queue_name = f"{GSX_DATA_FOLDER}/queue"
    object_key = f"{queue_name}/{DATA_TYPE}/{domain}/{uuid}.json"
    log_key = f"{GSX_DATA_FOLDER}/log/{uuid}.json"
    now = datetime.now(timezone.utc)
    now_ts = int(now.timestamp())
    now_str = now.isoformat()
    # Content envelope — matches PHP $object_data
    object_data = {
        "version": "1.0",
        "uuid": uuid,
        "type": DATA_TYPE,
        "repository": GSX_DATA_FOLDER,
        "time_str": now_str,
        "time": now_ts,
        "uri": url,
        "domain": domain,
        "data": html,
        "relations": [],
    }
    # Log envelope — matches PHP $log_json
    path = urlparse(url).path.strip("/").replace("/", " ")
    log_data = {
        "time": now_ts,
        "uuid": uuid,
        "uri": url,
        "domain": domain,
        "data_type": DATA_TYPE,
        "repository": GSX_DATA_FOLDER,
        "title": path or domain,
    }
    try:
        s3.put_object(Bucket=GSX_BUCKET, Key=object_key, Body=json.dumps(object_data).encode("utf-8"))
        s3.put_object(Bucket=GSX_BUCKET, Key=log_key, Body=json.dumps(log_data).encode("utf-8"))
        return uuid
    except ClientError as e:
        log.error(f"S3 push failed for {url}: {e}")
        return None
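# Example key layout (illustrative, with GSX_DATA_FOLDER="bdg"):
#   content: bdg/queue/html/bustle.com/html_<md5>.json
#   log:     bdg/log/html_<md5>.json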
# ── Main ──────────────────────────────────────────────────────────────────────
def main():
    log.info("=== Geodesix Nightly Sync Starting ===")
    log.info(f"Data folder: {GSX_DATA_FOLDER} | Bucket: {GSX_BUCKET} | Sites: {len(SITES)}")
    if not all([GSX_ACCESS_KEY, GSX_SECRET_KEY, GSX_DATA_FOLDER]):
        log.error("Missing credentials in ~/.openclaw/workspace/.secrets/geodesix.json")
        sys.exit(1)
    # Load Typeset credentials (token may be absent on a first run; the
    # PermissionError path below refreshes it)
    secrets = load_typeset_secrets()
    token = secrets.get("token")
    # Fetch eligible articles from Typeset
    log.info("Querying Typeset for affiliate/shopping articles...")
    try:
        articles = fetch_eligible_articles(token)
    except PermissionError:
        token = refresh_typeset_token(secrets)
        articles = fetch_eligible_articles(token)
    if not articles:
        log.info("No eligible articles found. Exiting.")
        return
    # Initialize S3 client with Geodesix credentials
    s3 = boto3.client(
        "s3",
        region_name=GSX_REGION,
        aws_access_key_id=GSX_ACCESS_KEY,
        aws_secret_access_key=GSX_SECRET_KEY,
    )
    pushed = 0
    failed = 0
    for i, article in enumerate(articles, 1):
        url = article.get("url")
        if not url:
            log.warning(f"Skipping article {article['id']} — no URL")
            continue
        log.info(f"[{i}/{len(articles)}] {article.get('site')} | {url}")
        html = fetch_article_html(url)
        if not html:
            failed += 1
            continue
        uuid = push_to_geodesix(s3, url, html)
        if uuid:
            pushed += 1
            log.info(f" ✓ {uuid}")
        else:
            failed += 1
        time.sleep(FETCH_DELAY_SEC)
    log.info(f"=== Done === Pushed: {pushed} | Failed: {failed} | Total: {len(articles)}")
if __name__ == "__main__":
main()