Skip to content

Instantly share code, notes, and snippets.

@carterhudson
Last active March 21, 2026 21:45
Show Gist options
  • Select an option

  • Save carterhudson/62d10dd336038131541a592c6af8e928 to your computer and use it in GitHub Desktop.

Select an option

Save carterhudson/62d10dd336038131541a592c6af8e928 to your computer and use it in GitHub Desktop.
SlothServ watchdog daemon — automated media server management
"""
SlothServ watchdog daemon.
Run as: python3 -m watchdog (from scripts/)
or: python3 scripts/watchdog (from media-server/)
"""
"""
Watchdog daemon entry point.
Run as:
python3 /path/to/scripts/watchdog (launchd)
python3 -m watchdog (from scripts/)
"""
import sys
from pathlib import Path
# Make scripts/ (the directory that holds the watchdog/ package) importable
# however this file is launched, so that `from watchdog import ...` below
# resolves to this package for both `python3 -m watchdog` and
# `python3 path/to/watchdog`.
_scripts_dir = str(Path(__file__).resolve().parents[1])
if sys.path.count(_scripts_dir) == 0:
    sys.path.insert(0, _scripts_dir)
import time
import urllib.error
from watchdog import config
from watchdog.connectivity import init_service_urls, check_connectivity
from watchdog.sonarr import (
sync_watchlist,
snapshot_series,
detect_and_search_new_series,
handle_stuck_imports as sonarr_stuck_imports,
handle_failed_downloads as sonarr_failed_downloads,
blocklist_hygiene,
sweep_missing_episodes,
)
from watchdog import radarr
from watchdog.plex import monitor_sessions, detect_truncated_episodes
from watchdog.health import health_check
from watchdog.vpn import check_health as check_vpn_health
def main():
    """
    Run the watchdog: one-time setup, then an endless maintenance loop.

    Setup configures logging, loads the API keys, resolves the service
    URLs and takes an initial series snapshot (a snapshot failure is
    logged, not fatal).

    Each cycle runs every check/handler in a fixed order and then sleeps
    for ``config.CHECK_INTERVAL`` seconds. The first exception raised by a
    step aborts the remainder of that cycle; the loop itself never exits.
    """
    config.setup_logging()
    config.load_api_keys()
    config.logger.info("Watchdog started")
    init_service_urls()

    try:
        snapshot_series()
    except Exception as e:
        config.logger.error(f"Failed to snapshot series: {e}")

    while True:
        try:
            check_connectivity()
            check_vpn_health()
            health_check()
            sync_watchlist()
            detect_and_search_new_series()
            sonarr_stuck_imports()
            sonarr_failed_downloads()
            radarr.handle_stuck_imports()
            radarr.handle_failed_downloads()
            blocklist_hygiene()
            sweep_missing_episodes()
            detect_truncated_episodes()
            monitor_sessions()
        except urllib.error.HTTPError as e:
            # HTTPError is a subclass of URLError, so it must be caught
            # first: the server *was* reachable and answered with an error
            # status. The URL is deliberately not logged.
            config.logger.error(f"API request failed: HTTP {e.code} {e.reason}")
        except urllib.error.URLError:
            config.logger.debug("Sonarr not reachable")
        except Exception as e:
            config.logger.error(f"Unexpected error: {e}")
        time.sleep(config.CHECK_INTERVAL)
# Script entry point. A KeyboardInterrupt is treated as a requested
# shutdown and logged instead of surfacing as a traceback.
if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        config.logger.info("Watchdog stopped")
"""
Thin HTTP helpers for Sonarr, Radarr, and Plex APIs.
"""
import json
import urllib.request
from watchdog import config
def sonarr(method, path, data=None, timeout=30):
    """
    Send one request to the Sonarr API and return the decoded JSON reply.

    Args:
        method: HTTP verb ("GET", "POST", "PUT", "DELETE", ...).
        path: Path plus optional query string, appended to
            ``config.sonarr_url``.
        data: Optional JSON-serialisable payload. Anything other than
            ``None`` is sent, including empty containers such as ``{}``.
        timeout: Timeout in seconds handed to ``urlopen``.

    Returns:
        The parsed JSON body, or ``None`` for a 204 response or an empty
        body.

    Raises:
        urllib.error.HTTPError, urllib.error.URLError: propagated from
            ``urlopen``.
    """
    # `is not None` rather than truthiness: an empty dict/list is still a
    # payload the caller asked to send.
    body = None if data is None else json.dumps(data).encode()
    req = urllib.request.Request(
        f"{config.sonarr_url}{path}",
        data=body,
        headers={
            "X-Api-Key": config.sonarr_api_key,
            "Content-Type": "application/json",
        },
        method=method,
    )
    with urllib.request.urlopen(req, timeout=timeout) as r:
        if r.status == 204:
            return None
        raw = r.read()
    return json.loads(raw) if raw else None
def radarr(method, path, data=None, timeout=30):
    """
    Send one request to the Radarr API and return the decoded JSON reply.

    Does nothing and returns ``None`` when ``config.radarr_api_key`` is
    empty.

    Args:
        method: HTTP verb ("GET", "POST", "PUT", "DELETE", ...).
        path: Path plus optional query string, appended to
            ``config.radarr_url``.
        data: Optional JSON-serialisable payload. Anything other than
            ``None`` is sent, including empty containers such as ``{}``.
        timeout: Timeout in seconds handed to ``urlopen``.

    Returns:
        The parsed JSON body, or ``None`` for a 204 response, an empty
        body, or when no Radarr API key is configured.

    Raises:
        urllib.error.HTTPError, urllib.error.URLError: propagated from
            ``urlopen``.
    """
    if not config.radarr_api_key:
        return None
    # `is not None` rather than truthiness: an empty dict/list is still a
    # payload the caller asked to send.
    body = None if data is None else json.dumps(data).encode()
    req = urllib.request.Request(
        f"{config.radarr_url}{path}",
        data=body,
        headers={
            "X-Api-Key": config.radarr_api_key,
            "Content-Type": "application/json",
        },
        method=method,
    )
    with urllib.request.urlopen(req, timeout=timeout) as r:
        if r.status == 204:
            return None
        raw = r.read()
    return json.loads(raw) if raw else None
def plex(path):
    """
    GET ``path`` from the Plex server and return the parsed JSON body.

    The Plex token is appended as an ``X-Plex-Token`` query parameter,
    using ``&`` when ``path`` already has a query string and ``?``
    otherwise. Returns ``None`` without any request when no token is
    configured. The request uses a 15 second timeout.
    """
    if not config.plex_token:
        return None
    joiner = "?" if "?" not in path else "&"
    url = f"{config.plex_url}{path}{joiner}X-Plex-Token={config.plex_token}"
    request = urllib.request.Request(url, headers={"Accept": "application/json"})
    with urllib.request.urlopen(request, timeout=15) as response:
        payload = response.read()
    return json.loads(payload)
"""
Shared configuration, constants, mutable state, and logging setup.
Other modules import this module (not individual names) so mutations
to module-level variables are visible everywhere:
from watchdog import config
config.sonarr_url = "http://..."
"""
import logging
import logging.handlers
import os
import xml.etree.ElementTree as ET
from pathlib import Path
# ─── Paths ────────────────────────────────────────────────────────────
# Three directory levels above this file; log files live in its logs/ child.
BASE_DIR = Path(__file__).resolve().parent.parent.parent # media-server/
LOG_DIR = BASE_DIR / "logs"
# ─── Constants ────────────────────────────────────────────────────────
# Seconds the main loop sleeps between cycles.
CHECK_INTERVAL = 30
# Seconds slept between consecutive search commands sent to Sonarr/Radarr.
EPISODE_SEARCH_DELAY = 2
# Root folder and quality profile that reroute_anime() applies to anime series.
ANIME_ROOT = "/data/media/anime"
# NOTE(review): TV_ROOT / TV_QUALITY_PROFILE_ID are not referenced in the
# code visible here — presumably the non-anime defaults; confirm.
TV_ROOT = "/data/media/tv"
ANIME_QUALITY_PROFILE_ID = 8
TV_QUALITY_PROFILE_ID = 7
# Minimum seconds between runs of the periodic jobs (compared against
# time.time() deltas by the job that owns each interval).
MISSING_SWEEP_INTERVAL = 6 * 3600
BLOCKLIST_HYGIENE_INTERVAL = 4 * 3600
TRUNCATION_CHECK_INTERVAL = 6 * 3600
# Fraction of a show's median episode duration; shorter episodes are
# flagged as truncated.
TRUNCATION_THRESHOLD = 0.6
HEALTH_CHECK_INTERVAL = 3600
# NOTE(review): consumed by watchdog.vpn, which is not visible here —
# presumably seconds between VPN checks and the number of consecutive
# failures tolerated; confirm.
VPN_CHECK_INTERVAL = 60
VPN_UNHEALTHY_THRESHOLD = 3
COLIMA_BIN = "/opt/homebrew/bin/colima"
# Environment passed to subprocess calls: the current environment with
# PATH replaced by a fixed list that includes the Homebrew bin directory.
BREW_ENV = {**os.environ, "PATH": "/opt/homebrew/bin:/usr/local/bin:/usr/bin:/bin:/usr/sbin:/sbin"}
# ─── Mutable state (written by load_api_keys / init_service_urls) ─────
# Base URLs start at localhost and may be repointed at the Colima VM IP.
sonarr_url = "http://localhost:8989"
radarr_url = "http://localhost:7878"
plex_url = "http://localhost:32400"
# Empty strings mean "not loaded"; the Radarr and Plex helpers treat an
# empty value as "integration disabled".
sonarr_api_key = ""
radarr_api_key = ""
plex_token = ""
# Set to True to make the next health_check() run regardless of its interval.
force_health_check = False
# ─── Logger ───────────────────────────────────────────────────────────
# Shared logger; handlers are attached by setup_logging().
logger = logging.getLogger("watchdog")
def setup_logging():
    """
    Configure the shared ``logger``.

    Attaches a rotating file handler (``LOG_DIR/watchdog.log``, 5 MB per
    file, 3 backups, timestamped format) and a default ``StreamHandler``,
    then sets the level to INFO. Calling it again is a no-op, so log lines
    are never duplicated by repeated setup.
    """
    already_configured = any(
        isinstance(h, logging.handlers.RotatingFileHandler) for h in logger.handlers
    )
    if already_configured:
        return

    # parents=True so a missing intermediate directory does not abort startup.
    LOG_DIR.mkdir(parents=True, exist_ok=True)
    file_handler = logging.handlers.RotatingFileHandler(
        LOG_DIR / "watchdog.log", maxBytes=5_000_000, backupCount=3
    )
    file_handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(message)s"))
    logger.addHandler(file_handler)
    logger.addHandler(logging.StreamHandler())
    logger.setLevel(logging.INFO)
def load_api_keys():
    """
    Load the Sonarr and Radarr API keys and the Plex token from the
    services' config files under ``BASE_DIR/config`` into this module's
    globals.

    Sonarr is mandatory: a missing or unparseable config file propagates
    the parser's exception, and a missing or empty ``ApiKey`` raises
    ``ValueError``. Radarr and Plex are optional: on any failure the
    corresponding value is set to ``""`` and a warning is logged.

    Raises:
        OSError, xml.etree.ElementTree.ParseError: Sonarr config unreadable.
        ValueError: Sonarr config has no usable ``ApiKey``.
    """
    global sonarr_api_key, radarr_api_key, plex_token

    sonarr_tree = ET.parse(str(BASE_DIR / "config/sonarr/config.xml"))
    # findtext() yields None for a missing element and "" for an empty
    # one; normalise both so the key is always a str.
    sonarr_api_key = (sonarr_tree.findtext("ApiKey") or "").strip()
    if not sonarr_api_key:
        raise ValueError("Sonarr config.xml contains no ApiKey value")

    try:
        radarr_tree = ET.parse(str(BASE_DIR / "config/radarr/config.xml"))
        radarr_api_key = (radarr_tree.findtext("ApiKey") or "").strip()
    except Exception:
        # Optional integration: any read/parse problem just disables it.
        radarr_api_key = ""
    if not radarr_api_key:
        logger.warning("Could not read Radarr API key — Radarr integration disabled")

    prefs_path = BASE_DIR / "config/plex/Library/Application Support/Plex Media Server/Preferences.xml"
    try:
        plex_token = ET.parse(str(prefs_path)).getroot().get("PlexOnlineToken", "")
    except Exception:
        # Optional integration: any read/parse problem just disables it.
        plex_token = ""
    if not plex_token:
        logger.warning("Could not read Plex token — session monitoring disabled")
"""
Colima VM IP resolution and connectivity failover.
"""
import json
import subprocess
import time
import urllib.request
from pathlib import Path
from watchdog import config
log = config.logger
def resolve_colima_ip():
    """
    Return the address of the running Colima VM, or ``""`` if unknown.

    Runs ``colima ls --json`` (10 second timeout), parses each output line
    as a JSON object and returns the ``address`` of the first instance
    whose ``status`` is ``"Running"`` and whose address is non-empty.
    Any failure — non-zero exit, timeout, unparseable output — yields
    ``""``.
    """
    try:
        proc = subprocess.run(
            [config.COLIMA_BIN, "ls", "--json"],
            capture_output=True, text=True, timeout=10, env=config.BREW_ENV,
        )
        if proc.returncode != 0:
            return ""
        for raw_line in proc.stdout.strip().splitlines():
            instance = json.loads(raw_line)
            if instance.get("status") != "Running":
                continue
            address = instance.get("address", "")
            if address:
                return address
    except Exception:
        # Best effort: the callers fall back to localhost on "".
        pass
    return ""
def init_service_urls():
    """
    Point the service URLs at the Colima VM IP when Sonarr answers there.

    Resolves the VM IP and pings Sonarr on ``http://<ip>:8989/ping`` with
    a 5 second timeout. On success ``config.sonarr_url``,
    ``config.radarr_url`` and ``config.plex_url`` are rewritten to use the
    VM IP; otherwise the URLs are left untouched and the reason is logged.
    """
    vm_ip = resolve_colima_ip()
    if not vm_ip:
        log.info("Could not resolve Colima VM IP — using localhost")
        return

    try:
        req = urllib.request.Request(
            f"http://{vm_ip}:8989/ping",
            headers={"X-Api-Key": config.sonarr_api_key},
        )
        # Only reachability matters; the context manager closes the
        # response instead of leaving the connection open.
        with urllib.request.urlopen(req, timeout=5):
            pass
    except Exception:
        log.info(f"Colima VM IP {vm_ip} not reachable yet — using localhost")
        return

    config.sonarr_url = f"http://{vm_ip}:8989"
    config.radarr_url = f"http://{vm_ip}:7878"
    config.plex_url = f"http://{vm_ip}:32400"
    log.info(f"Using Colima VM IP directly: {vm_ip} (bypasses port forwarding)")
def _sonarr_responds(base_url):
    """Return True if ``GET {base_url}/ping`` succeeds within 5 seconds."""
    try:
        req = urllib.request.Request(
            f"{base_url}/ping",
            headers={"X-Api-Key": config.sonarr_api_key},
        )
        # The context manager closes the response; this probe runs every
        # cycle, so responses must not be left open.
        with urllib.request.urlopen(req, timeout=5):
            return True
    except Exception:
        return False


def _run_recovery_step(cmd, ok_message, timeout, cwd=None):
    """
    Run one recovery command and report its outcome.

    Logs ``ok_message`` on exit code 0 and a warning with the exit code
    and the start of stderr otherwise. A non-zero exit does not raise, so
    the caller continues with the next step; ``TimeoutExpired`` propagates.
    """
    result = subprocess.run(
        cmd, cwd=cwd, capture_output=True, text=True, errors="replace",
        timeout=timeout, env=config.BREW_ENV,
    )
    if result.returncode == 0:
        log.info(ok_message)
    else:
        log.warning(
            f" Command '{' '.join(cmd)}' exited with {result.returncode}: "
            f"{result.stderr.strip()[:200]}"
        )


def check_connectivity():
    """
    Verify Sonarr is reachable, escalating through three stages:

    1. Ping the current ``config.sonarr_url``; done if it answers.
    2. Ping Sonarr on the Colima VM IP; if that answers, repoint all three
       service URLs at the VM IP.
    3. If the ``sonarr`` container reports ``running`` but is still
       unreachable, take the compose stack down, restart Colima, bring
       the stack up, wait 30 seconds and re-resolve the service URLs.

    If the container is not running, or its state cannot be determined,
    nothing is restarted. Recovery failures are logged, never raised.
    """
    if _sonarr_responds(config.sonarr_url):
        return

    # Current URL failed — try the VM IP as failover.
    vm_ip = resolve_colima_ip()
    if vm_ip and _sonarr_responds(f"http://{vm_ip}:8989"):
        config.sonarr_url = f"http://{vm_ip}:8989"
        config.radarr_url = f"http://{vm_ip}:7878"
        config.plex_url = f"http://{vm_ip}:32400"
        log.warning(f"Port forwarding broken — switched to VM IP {vm_ip}")
        return

    # Neither URL works — only escalate when the container claims to be up.
    try:
        result = subprocess.run(
            ["docker", "inspect", "--format", "{{.State.Status}}", "sonarr"],
            capture_output=True, text=True, timeout=10, env=config.BREW_ENV,
        )
    except Exception:
        return
    if result.stdout.strip() != "running":
        return

    # Container running but completely unreachable — nuclear restart.
    log.warning("Sonarr unreachable via localhost and VM IP — restarting Colima")
    stack_dir = str(config.BASE_DIR)
    try:
        _run_recovery_step(["docker", "compose", "down"], " Containers stopped", 60, cwd=stack_dir)
        _run_recovery_step([config.COLIMA_BIN, "restart"], " Colima restarted", 120)
        _run_recovery_step(["docker", "compose", "up", "-d"], " Containers restarted", 60, cwd=stack_dir)
        time.sleep(30)
        init_service_urls()
    except subprocess.TimeoutExpired:
        log.error(" Recovery timed out — manual intervention may be needed (run: mrestart)")
    except Exception as e:
        log.error(f" Recovery failed: {e}")
"""
Health checks: database integrity auto-repair and import list staleness detection.
"""
import json
import shutil
import sqlite3 as _sqlite3
import subprocess
import time
import urllib.error
import urllib.request
from pathlib import Path
from watchdog import config
from watchdog.api import sonarr as sonarr_api, radarr as radarr_api
log = config.logger
# ─── Module state ─────────────────────────────────────────────────────
# time.time() value stored by health_check() each time it runs; the
# initial 0.0 makes the first call run immediately.
last_health_check = 0.0
# ─── Public entry point ──────────────────────────────────────────────
def health_check():
    """
    Run the periodic health check, at most once per
    ``config.HEALTH_CHECK_INTERVAL`` seconds unless
    ``config.force_health_check`` requests an immediate run.

    A run records the current time, clears the force flag, tests the
    Sonarr database (and the Radarr one when a Radarr key is configured)
    for corruption, then verifies the import lists against the Plex
    Watchlist.
    """
    global last_health_check
    current = time.time()
    interval_elapsed = current - last_health_check >= config.HEALTH_CHECK_INTERVAL
    if not (config.force_health_check or interval_elapsed):
        return

    last_health_check = current
    config.force_health_check = False
    log.info("Health check: testing DB integrity and import list health")

    _test_and_repair_db("sonarr")
    if config.radarr_api_key:
        _test_and_repair_db("radarr")
    _verify_import_lists()
# ─── DB integrity ────────────────────────────────────────────────────
def _test_and_repair_db(service):
    """
    Probe a service's API on a read path and a write path to detect
    database corruption.

    The read probe lists series (Sonarr) or movies (Radarr); the write
    probe posts a RefreshSeries / RefreshMovie command. An HTTP 500 whose
    body mentions "database disk image is malformed" triggers
    ``_repair_database`` and ends the test; any other 500 is logged.
    Other errors are ignored.
    """
    for_sonarr = service == "sonarr"
    call = sonarr_api if for_sonarr else radarr_api
    probes = (
        ("read", "GET", "/api/v3/series" if for_sonarr else "/api/v3/movie", None),
        ("write", "POST", "/api/v3/command",
         {"name": "RefreshSeries" if for_sonarr else "RefreshMovie"}),
    )
    for test_type, method, path, payload in probes:
        try:
            call(method, path, payload)
        except urllib.error.HTTPError as e:
            if e.code != 500:
                continue
            try:
                body = e.read().decode()
            except Exception:
                body = ""
            if "database disk image is malformed" in body:
                log.warning(f" {service} DB corruption detected ({test_type} path) — starting auto-repair")
                _repair_database(service)
                return
            log.error(f" {service} API 500 on {test_type} path (not DB corruption): {body[:200]}")
        except Exception:
            # Connectivity problems are handled elsewhere; this probe only
            # cares about HTTP 500 responses.
            pass
def _start_container(service):
    """Start the service's container via docker compose; failures are logged, not raised."""
    try:
        subprocess.run(
            ["docker", "compose", "start", service],
            cwd=str(config.BASE_DIR), capture_output=True, timeout=30, env=config.BREW_ENV,
        )
    except Exception as e:
        log.error(f" Failed to start {service}: {e}")


def _rebuild_db_file(service, db_path):
    """
    Replace ``db_path`` with a copy rebuilt from its own SQL dump.

    Backs the file up to ``<name>.db.bak``, dumps it with the sqlite3 CLI
    (``.recover`` first, ``.dump`` as fallback), loads the SQL into a
    fresh file and moves that over the original. Returns True when the
    database was replaced, False when a step failed (the original file is
    then left in place). May raise on timeouts or filesystem errors.
    """
    bak = db_path.with_suffix(".db.bak")
    shutil.copy2(str(db_path), str(bak))

    # Best-effort WAL checkpoint; a corrupt file may refuse it.
    try:
        conn = _sqlite3.connect(str(db_path))
        try:
            conn.execute("PRAGMA wal_checkpoint(TRUNCATE)")
        finally:
            conn.close()
    except Exception as e:
        log.debug(f" WAL checkpoint skipped: {e}")

    dump = subprocess.run(
        ["sqlite3", str(db_path), ".recover"],
        capture_output=True, text=True, timeout=120,
    )
    if dump.returncode != 0 or not dump.stdout.strip():
        log.warning(f" .recover failed or empty, falling back to .dump")
        dump = subprocess.run(
            ["sqlite3", str(db_path), ".dump"],
            capture_output=True, text=True, timeout=60,
        )
    # An empty dump would "rebuild" an empty database over the real one.
    if dump.returncode != 0 or not dump.stdout.strip():
        log.error(f" DB dump failed: {dump.stderr[:200]}")
        return False

    rebuilt = Path(f"/tmp/{service}_rebuilt.db")
    # A leftover file from an earlier failed attempt would make the SQL
    # load collide with existing tables.
    rebuilt.unlink(missing_ok=True)
    rebuild = subprocess.run(
        ["sqlite3", str(rebuilt)],
        input=dump.stdout, capture_output=True, text=True, timeout=60,
    )
    if rebuild.returncode != 0:
        log.error(f" DB rebuild failed: {rebuild.stderr[:200]}")
        rebuilt.unlink(missing_ok=True)
        return False

    shutil.move(str(rebuilt), str(db_path))
    # The old WAL/SHM files belong to the replaced database.
    db_path.with_suffix(".db-wal").unlink(missing_ok=True)
    db_path.with_suffix(".db-shm").unlink(missing_ok=True)
    log.info(f" {service} DB rebuilt successfully")
    return True


def _repair_database(service):
    """
    Stop container, rebuild DB via SQLite recover/dump, restart container.

    Once the container has been stopped it is always started again, even
    when the rebuild fails or raises, so a failed repair cannot leave the
    service down. After a successful rebuild the function waits 15
    seconds for the service to come up. Nothing is raised to the caller.
    """
    db_path = config.BASE_DIR / f"config/{service}/{service}.db"
    if not db_path.exists():
        log.error(f" {service} DB not found at {db_path}")
        return

    try:
        subprocess.run(
            ["docker", "compose", "stop", service],
            cwd=str(config.BASE_DIR), capture_output=True, timeout=30, env=config.BREW_ENV,
        )
        log.info(f" Stopped {service} container")
    except Exception as e:
        log.error(f" Failed to stop {service}: {e}")
        return

    rebuilt_ok = False
    try:
        rebuilt_ok = _rebuild_db_file(service, db_path)
    except Exception as e:
        log.error(f" {service} DB repair aborted: {e}")
    finally:
        _start_container(service)

    if rebuilt_ok:
        time.sleep(15)
        log.info(f" {service} container restarted")
# ─── Import list staleness ────────────────────────────────────────────
def _verify_import_lists():
    """
    Check that every Plex Watchlist title exists in Sonarr/Radarr.

    Fetches the Watchlist from discover.provider.plex.tv and compares
    lower-cased titles: shows against Sonarr titles and alternate titles,
    movies against Radarr titles (only when a Radarr key is configured).
    If anything is missing, the service's Plex import list is deleted and
    recreated to clear stale cached results. Does nothing without a Plex
    token, when the Watchlist cannot be fetched, or when it is empty.
    """
    if not config.plex_token:
        return

    watchlist_url = (
        "https://discover.provider.plex.tv/library/sections/watchlist/all"
        f"?X-Plex-Token={config.plex_token}"
    )
    try:
        req = urllib.request.Request(watchlist_url, headers={"Accept": "application/json"})
        with urllib.request.urlopen(req, timeout=15) as r:
            data = json.loads(r.read())
        watchlist = data.get("MediaContainer", {}).get("Metadata", [])
    except Exception as e:
        log.debug(f" Could not fetch Plex Watchlist: {e}")
        return

    if not watchlist:
        log.info(" Plex Watchlist empty — nothing to verify")
        return

    def titles_of(kind):
        # Lower-cased titles of the watchlist entries of one media type.
        return {entry["title"].lower() for entry in watchlist if entry.get("type") == kind}

    watchlist_shows = titles_of("show")
    watchlist_movies = titles_of("movie")

    if watchlist_shows:
        try:
            known_titles = set()
            for series in sonarr_api("GET", "/api/v3/series") or []:
                known_titles.add(series["title"].lower())
                known_titles.update(
                    alt["title"].lower() for alt in series.get("alternateTitles", [])
                )
            absent = watchlist_shows - known_titles
            if not absent:
                log.info(" Sonarr import list OK — all watchlist shows present")
            else:
                log.warning(
                    f" Sonarr missing {len(absent)} watchlist show(s): "
                    f"{', '.join(sorted(absent)[:5])}"
                )
                _recreate_import_list("sonarr")
        except Exception as e:
            log.error(f" Sonarr import list verification failed: {e}")

    if watchlist_movies and config.radarr_api_key:
        try:
            library = radarr_api("GET", "/api/v3/movie") or []
            known_titles = {movie["title"].lower() for movie in library}
            absent = watchlist_movies - known_titles
            if not absent:
                log.info(" Radarr import list OK — all watchlist movies present")
            else:
                log.warning(
                    f" Radarr missing {len(absent)} watchlist movie(s): "
                    f"{', '.join(sorted(absent)[:5])}"
                )
                _recreate_import_list("radarr")
        except Exception as e:
            log.error(f" Radarr import list verification failed: {e}")
def _recreate_import_list(service):
    """
    Delete and recreate every Plex import list of ``service`` ("sonarr"
    or "radarr"), writing the current Plex token into the recreated
    list's ``accessToken`` field. Errors are logged, not raised.
    """
    log.info(f" Recreating {service} import list to clear stale cache")
    call = radarr_api if service != "sonarr" else sonarr_api
    try:
        all_lists = call("GET", "/api/v3/importlist") or []
        plex_lists = [
            entry for entry in all_lists if entry.get("implementation") == "PlexImport"
        ]
        if not plex_lists:
            log.warning(f" No Plex import list found in {service}")
            return
        for entry in plex_lists:
            old_id = entry.get("id")
            # Same definition without its id, so the POST creates a new list.
            il_config = {key: value for key, value in entry.items() if key != "id"}
            for field in il_config.get("fields", []):
                if field["name"] == "accessToken":
                    field["value"] = config.plex_token
            call("DELETE", f"/api/v3/importlist/{old_id}")
            created = call("POST", "/api/v3/importlist", il_config)
            new_id = (created or {}).get("id", "?")
            log.info(f" Replaced import list {old_id} → {new_id} with fresh token")
    except Exception as e:
        log.error(f" Failed to recreate {service} import list: {e}")
"""
Plex handlers: session quality monitoring, truncated episode detection,
and anime subtitle verification.
"""
import json
import time
import urllib.request
from watchdog import config
from watchdog.api import sonarr as sonarr_api, plex as plex_api
log = config.logger
# ─── Module state ─────────────────────────────────────────────────────
# time.time() value stored by detect_truncated_episodes() each time it
# passes its interval gate; the initial 0.0 lets the first call through.
last_truncation_check = 0.0
# ─── Session quality monitoring ───────────────────────────────────────
def monitor_sessions():
    """
    Log a warning for every active Plex session that looks degraded.

    A session counts as degraded when its video stream is being
    transcoded, when it is relayed, or when its location is "wan". The
    warning lists the reasons plus device, resolution, bandwidth and the
    video/audio decisions. Does nothing without a Plex token or when the
    sessions endpoint cannot be read.
    """
    if not config.plex_token:
        return

    sessions_url = f"{config.plex_url}/status/sessions?X-Plex-Token={config.plex_token}"
    try:
        req = urllib.request.Request(sessions_url, headers={"Accept": "application/json"})
        with urllib.request.urlopen(req, timeout=10) as r:
            data = json.load(r)
    except Exception:
        return

    for entry in data.get("MediaContainer", {}).get("Metadata", []):
        title = entry.get("grandparentTitle", entry.get("title", "?"))
        ep_title = entry.get("title", "")

        player = entry.get("Player", {})
        if isinstance(player, list):
            # Player may arrive as a list; use its first element.
            player = player[0] if player else {}
        device = player.get("title", "?")
        platform = player.get("platform", "?")
        local = player.get("local", False)

        session = entry.get("Session", {})
        location = session.get("location", "?")
        bandwidth = session.get("bandwidth", 0)
        relayed = session.get("relayed", False)

        # Later media/streams overwrite earlier ones, so the last wins.
        video_decision = audio_decision = resolution = "?"
        for media in entry.get("Media", []):
            resolution = media.get("videoResolution", "?")
            for part in media.get("Part", []):
                for stream in part.get("Stream", []):
                    stream_type = stream.get("streamType")
                    if stream_type == 1:
                        video_decision = stream.get("decision", "?")
                    elif stream_type == 2:
                        audio_decision = stream.get("decision", "?")

        reasons = [
            label
            for triggered, label in (
                (video_decision == "transcode", "video transcoding"),
                (relayed, "relayed"),
                (location == "wan", "WAN connection"),
            )
            if triggered
        ]
        if not reasons:
            continue

        log.warning(
            f"Degraded stream: \"{title} - {ep_title}\" on {device} ({platform}) "
            f"— {', '.join(reasons)} | {resolution} {bandwidth}kbps "
            f"video={video_decision} audio={audio_decision} local={local}"
        )
# ─── Truncated episode + subtitle detection ───────────────────────────
def _median(values):
    """Return the median of a non-empty list of numbers."""
    ordered = sorted(values)
    mid = len(ordered) // 2
    if len(ordered) % 2:
        return ordered[mid]
    return (ordered[mid - 1] + ordered[mid]) / 2


def _match_sonarr_series(show_title, sonarr_series):
    """Return the first Sonarr series whose title or any alternate title equals ``show_title`` case-insensitively, else None."""
    wanted = show_title.lower()
    for series in sonarr_series:
        if series["title"].lower() == wanted:
            return series
        if any(alt["title"].lower() == wanted for alt in series.get("alternateTitles", [])):
            return series
    return None


def _has_english_subtitle(rating_key):
    """Return True if the Plex item has a stream with streamType 3 and languageCode "eng"."""
    detail = plex_api(f"/library/metadata/{rating_key}")
    meta = detail["MediaContainer"]["Metadata"][0]
    return any(
        stream.get("streamType") == 3 and stream.get("languageCode") == "eng"
        for media in meta.get("Media", [])
        for part in media.get("Part", [])
        for stream in part.get("Stream", [])
    )


def detect_truncated_episodes():
    """
    Two checks across all Plex show libraries:
    1. Truncation: episodes under TRUNCATION_THRESHOLD of the show's median
       duration are flagged as incomplete Usenet articles.
    2. Missing subtitles (anime only): episodes without an English subtitle
       track are flagged for re-download.
    Flagged files are deleted from Sonarr so the missing sweep re-searches.

    Runs at most once per ``config.TRUNCATION_CHECK_INTERVAL`` seconds and
    only when a Plex token is configured. Shows with fewer than three
    episodes, with a median duration under 10 minutes, or without a
    matching Sonarr series are skipped. A library marked "anime" is one
    whose Plex section title is "anime" (case-insensitive).
    """
    global last_truncation_check
    now = time.time()
    if now - last_truncation_check < config.TRUNCATION_CHECK_INTERVAL:
        return
    last_truncation_check = now
    if not config.plex_token:
        return

    log.info("Truncation/subtitle check: scanning Plex episodes")
    try:
        sections = plex_api("/library/sections")
        show_sections = [
            s for s in sections["MediaContainer"].get("Directory", [])
            if s["type"] == "show"
        ]
    except Exception as e:
        log.error(f" Failed to get Plex libraries: {e}")
        return

    sonarr_series = sonarr_api("GET", "/api/v3/series") or []
    # dict used as an insertion-ordered set: O(1) membership, stable order.
    bad_file_ids = {}

    for section in show_sections:
        is_anime = section.get("title", "").lower() == "anime"
        try:
            shows = plex_api(f"/library/sections/{section['key']}/all")
            shows = shows["MediaContainer"].get("Metadata", [])
        except Exception:
            continue

        for show in shows:
            try:
                eps_data = plex_api(f"/library/metadata/{show['ratingKey']}/allLeaves")
                episodes = eps_data["MediaContainer"].get("Metadata", [])
            except Exception:
                continue
            if len(episodes) < 3:
                continue

            # Durations are divided by 60000 to get minutes; zero or
            # missing values are ignored for the median.
            durations = [ep.get("duration", 0) / 60000 for ep in episodes]
            durations = [d for d in durations if d > 0]
            if not durations:
                continue
            median_min = _median(durations)
            if median_min < 10:
                continue
            threshold = median_min * config.TRUNCATION_THRESHOLD

            show_title = show["title"]
            matched_series = _match_sonarr_series(show_title, sonarr_series)
            if not matched_series:
                continue

            sonarr_eps = None  # fetched lazily, only once something is flagged
            for ep in episodes:
                ep_dur_min = ep.get("duration", 0) / 60000
                sn = ep.get("parentIndex", 0)
                en = ep.get("index", 0)

                reason = None
                if 0 < ep_dur_min < threshold:
                    reason = (
                        f"Truncated: {show_title} S{sn:02d}E{en:02d} "
                        f"({ep_dur_min:.1f} min, median ~{median_min:.0f} min)"
                    )
                if reason is None and is_anime:
                    try:
                        if not _has_english_subtitle(ep["ratingKey"]):
                            reason = (
                                f"No English subs: {show_title} S{sn:02d}E{en:02d}"
                            )
                    except Exception:
                        # Unknown subtitle state: do not flag the episode.
                        pass
                if reason is None:
                    continue

                if sonarr_eps is None:
                    try:
                        sonarr_eps = sonarr_api("GET", f"/api/v3/episode?seriesId={matched_series['id']}") or []
                    except Exception:
                        break

                for sep in sonarr_eps:
                    if sep["seasonNumber"] == sn and sep.get("episodeNumber") == en:
                        fid = sep.get("episodeFileId", 0)
                        if fid and fid not in bad_file_ids:
                            bad_file_ids[fid] = None
                            log.warning(f" {reason} — deleting file {fid}")
                        break

    if not bad_file_ids:
        log.info(" No truncated/subtitle-less episodes found")
        return

    deleted = 0
    for fid in bad_file_ids:
        try:
            sonarr_api("DELETE", f"/api/v3/episodefile/{fid}")
            deleted += 1
        except Exception as e:
            log.error(f" Failed to delete file {fid}: {e}")
    # When every delete failed, the per-file errors above are the report;
    # claiming "none found" here would be wrong.
    if deleted:
        log.info(f" Deleted {deleted} bad file(s) — missing sweep will re-search them")
"""
Radarr handlers: stuck imports, failed downloads.
"""
import time
from watchdog import config
from watchdog.api import radarr as api
log = config.logger
def handle_stuck_imports():
    """
    Auto-import Radarr queue items that are stuck in the "importing"
    state with a "warning" status.

    For each such item the manual-import scan is fetched and every file
    that has a matched movie and no permanent rejection (other than
    sample-related ones) is submitted in one ManualImport command.
    Does nothing without a Radarr API key or when the queue cannot be read.
    """
    if not config.radarr_api_key:
        return
    try:
        queue = api("GET", "/api/v3/queue?pageSize=200&includeUnknownMovieItems=true")
    except Exception:
        return

    def permanently_rejected(candidate):
        # Permanent rejections block the import unless they are about samples.
        return any(
            r.get("type") == "permanent" and "sample" not in r.get("reason", "").lower()
            for r in candidate.get("rejections", [])
        )

    for item in (queue or {}).get("records", []):
        stuck = (
            item.get("trackedDownloadStatus") == "warning"
            and item.get("trackedDownloadState") == "importing"
        )
        if not stuck:
            continue
        title = item.get("title", "?")[:60]
        movie_id = item.get("movieId")
        download_id = item.get("downloadId")
        if not (movie_id and download_id):
            continue

        log.info(f"Radarr stuck import: {title}")
        try:
            scan = api("GET",
                       f"/api/v3/manualimport?downloadId={download_id}"
                       f"&movieId={movie_id}&filterExistingFiles=false",
                       timeout=60)
            files = [
                {
                    "path": f["path"],
                    "movieId": f["movie"]["id"],
                    "quality": f["quality"],
                    "languages": f.get("languages", [{"id": 1, "name": "English"}]),
                    "indexerFlags": 0,
                    "downloadId": download_id,
                }
                for f in (scan or [])
                if f.get("movie") and not permanently_rejected(f)
            ]
            if not files:
                log.warning(f" No importable files for: {title}")
                continue
            result = api("POST", "/api/v3/command", {
                "name": "ManualImport",
                "files": files,
            })
            log.info(f" Radarr auto-imported {len(files)} file(s): {(result or {}).get('status', '?')}")
        except Exception as e:
            log.error(f" Radarr import error for {title}: {e}")
def handle_failed_downloads():
    """
    Remove failed Radarr downloads from the queue without blocklisting
    them, then trigger a new search for each affected movie.

    An item counts as failed when its tracked state is "importFailed",
    its tracked status is "error", or its status is "failed". Searches
    are spaced by ``config.EPISODE_SEARCH_DELAY`` seconds. Does nothing
    without a Radarr API key or when the queue cannot be read.
    """
    if not config.radarr_api_key:
        return
    try:
        queue = api("GET", "/api/v3/queue?pageSize=200&includeUnknownMovieItems=true")
    except Exception:
        return

    movie_ids_to_search = []
    for item in (queue or {}).get("records", []):
        state = item.get("trackedDownloadState", "")
        tracked_status = item.get("trackedDownloadStatus", "")
        status = item.get("status", "")
        if state != "importFailed" and tracked_status != "error" and status != "failed":
            continue

        title = item.get("title", "?")[:60]
        item_id = item["id"]
        movie_id = item.get("movieId")
        log.info(f"Radarr failed download: {title}")
        try:
            api("DELETE",
                f"/api/v3/queue/{item_id}?removeFromClient=true&blocklist=false&skipRedownload=true")
        except Exception as e:
            log.error(f" Error: {e}")
            continue
        log.info(" Removed from queue")
        if movie_id:
            movie_ids_to_search.append(movie_id)

    for mid in movie_ids_to_search:
        try:
            api("POST", "/api/v3/command", {
                "name": "MoviesSearch",
                "movieIds": [mid],
            })
            log.info(f" Re-searching movie {mid}")
            time.sleep(config.EPISODE_SEARCH_DELAY)
        except Exception as e:
            log.error(f" Movie search error for {mid}: {e}")
"""
Sonarr handlers: watchlist sync, new series detection, anime rerouting,
stuck imports, failed downloads, blocklist hygiene, missing episode sweep.
"""
import time
import urllib.error
from datetime import datetime, timezone
from watchdog import config
from watchdog.api import sonarr as api, radarr as radarr_api
log = config.logger
# ─── Module state ─────────────────────────────────────────────────────
# Series IDs already seen; detect_and_search_new_series() treats every
# current ID that is not in this set as newly added.
known_series_ids: set = set()
# Set to True by snapshot_series() once a snapshot has been taken.
initial_snapshot_taken = False
# NOTE(review): presumably the time.time() of the last missing-episode
# sweep; its reader/writer is outside the code visible here.
last_missing_sweep = 0.0
# time.time() value stored by blocklist_hygiene() when it last ran.
last_blocklist_hygiene = 0.0
# ─── Watchlist sync ───────────────────────────────────────────────────
def sync_watchlist():
    """
    Trigger ImportListSync on both Sonarr and Radarr.

    An HTTP 500 from either service sets ``config.force_health_check`` so
    the next health check runs immediately. Sonarr HTTP errors are
    re-raised to the caller; Radarr errors of any kind are swallowed.
    """
    command = {"name": "ImportListSync"}

    try:
        api("POST", "/api/v3/command", command)
    except urllib.error.HTTPError as e:
        if e.code == 500:
            config.force_health_check = True
        raise

    try:
        radarr_api("POST", "/api/v3/command", command)
    except urllib.error.HTTPError as e:
        if e.code == 500:
            config.force_health_check = True
    except Exception:
        # Radarr is optional; its failures must not abort the cycle.
        pass
# ─── Series snapshot + new series detection ───────────────────────────
def get_all_series():
    """Fetch every Sonarr series and return them as a dict keyed by series id."""
    by_id = {}
    for entry in api("GET", "/api/v3/series") or []:
        by_id[entry["id"]] = entry
    return by_id
def snapshot_series():
    """
    Record the IDs of all series currently in Sonarr as the baseline for
    new-series detection and mark the initial snapshot as taken.
    """
    global known_series_ids, initial_snapshot_taken
    known_series_ids = set(get_all_series())
    initial_snapshot_taken = True
    log.info(f"Snapshot: {len(known_series_ids)} existing series")
def is_anime(series):
    """Return True if the series' ``genres`` list contains the exact entry "Anime"."""
    genres = series.get("genres", [])
    return "Anime" in genres
def reroute_anime(series):
    """
    Move an anime series onto the anime root folder, series type and
    quality profile if it is not already configured that way.

    The ``series`` dict is updated in place and sent back to Sonarr with
    ``moveFiles=true``; the new path keeps the last component of the old
    path under ``config.ANIME_ROOT``. Non-anime series and series that
    are already routed are left untouched. API errors are logged.
    """
    sid, title = series["id"], series["title"]
    if not is_anime(series):
        return

    already_routed = (
        series["rootFolderPath"] == config.ANIME_ROOT
        and series["seriesType"] == "anime"
        and series["qualityProfileId"] == config.ANIME_QUALITY_PROFILE_ID
    )
    if already_routed:
        return

    log.info(f"Rerouting anime: {title} -> {config.ANIME_ROOT}")
    series.update({
        "rootFolderPath": config.ANIME_ROOT,
        "seriesType": "anime",
        "qualityProfileId": config.ANIME_QUALITY_PROFILE_ID,
    })
    # Keep the series' own folder name, only swap the parent directory.
    folder_name = series["path"].rpartition("/")[2]
    series["path"] = f"{config.ANIME_ROOT}/{folder_name}"

    try:
        api("PUT", f"/api/v3/series/{sid}?moveFiles=true", series)
    except Exception as e:
        log.error(f" Reroute error for {title}: {e}")
    else:
        log.info(f" Updated and moved {title}")
def detect_and_search_new_series():
    """
    Compare current series against our snapshot. For any new series:
    1. Reroute anime to the anime folder/profile/type
    2. Search episodes one-by-one starting from S01E01

    If no snapshot has been taken yet (for example because the startup
    snapshot failed), the current library becomes the baseline and nothing
    is searched; otherwise every existing series would look new. Each
    series is recorded as known as soon as it has been processed, so an
    error on a later series does not cause the earlier ones to be searched
    again on the next cycle.

    Raises:
        Whatever the Sonarr API helper raises while listing series or
        episodes; individual search commands are guarded.
    """
    global known_series_ids, initial_snapshot_taken
    current = get_all_series()
    current_ids = set(current.keys())

    if not initial_snapshot_taken:
        known_series_ids = current_ids
        initial_snapshot_taken = True
        log.info(f"Snapshot: {len(known_series_ids)} existing series")
        return

    new_ids = current_ids - known_series_ids
    for sid in new_ids:
        series = current[sid]
        title = series["title"]
        log.info(f"New series detected: {title} (ID: {sid})")
        reroute_anime(series)

        # The API helper returns None for an empty response.
        episodes = api("GET", f"/api/v3/episode?seriesId={sid}") or []
        # Season 0 is excluded from the search.
        missing = [
            ep for ep in episodes
            if ep.get("monitored") and not ep.get("hasFile") and ep["seasonNumber"] > 0
        ]
        missing.sort(key=lambda e: (e["seasonNumber"], e["episodeNumber"]))

        if not missing:
            log.info(f" No missing episodes for {title}")
        else:
            log.info(f" Searching {len(missing)} episodes one-by-one for {title}")
            for i, ep in enumerate(missing):
                label = f"S{ep['seasonNumber']:02d}E{ep['episodeNumber']:02d}"
                try:
                    api("POST", "/api/v3/command", {
                        "name": "EpisodeSearch",
                        "episodeIds": [ep["id"]],
                    })
                    log.info(f" [{i+1}/{len(missing)}] Searched {label}")
                except Exception as e:
                    log.error(f" Search error for {label}: {e}")
                if i < len(missing) - 1:
                    time.sleep(config.EPISODE_SEARCH_DELAY)

        known_series_ids.add(sid)

    # Also drops IDs of series that no longer exist in Sonarr.
    known_series_ids = current_ids
# ─── Stuck imports ────────────────────────────────────────────────────
def handle_stuck_imports():
    """
    Handle ALL queue items stuck with warnings, including:
    - "matched to series by ID" (name mismatch)
    - "Unable to determine if file is a sample" (obfuscated filenames)
    - Any other importable warning state

    For each item in the "importing" state with a "warning" status the
    manual-import scan is fetched, and every file that has a matched
    series and episodes and no permanent rejection (other than
    sample-related ones) is submitted in one ManualImport command.
    Empty API responses are treated as "nothing to do", matching the
    Radarr handler.

    Raises:
        Whatever the Sonarr API helper raises while reading the queue;
        per-item import errors are logged instead.
    """
    queue = api("GET", "/api/v3/queue?pageSize=200&includeUnknownSeriesItems=true")
    # The API helper returns None for an empty response.
    for item in (queue or {}).get("records", []):
        tracked_status = item.get("trackedDownloadStatus", "")
        tracked_state = item.get("trackedDownloadState", "")
        if tracked_status != "warning" or tracked_state != "importing":
            continue
        title = item.get("title", "?")[:60]
        series_id = item.get("seriesId")
        download_id = item.get("downloadId")
        if not series_id or not download_id:
            continue

        warning_reasons = [
            m
            for msg in item.get("statusMessages", [])
            for m in msg.get("messages", [])
        ]
        log.info(f"Stuck import detected: {title} — {'; '.join(warning_reasons)}")

        try:
            scan = api("GET",
                       f"/api/v3/manualimport?downloadId={download_id}"
                       f"&seriesId={series_id}&filterExistingFiles=false",
                       timeout=60)
            files = []
            for f in (scan or []):
                if not f.get("series") or not f.get("episodes"):
                    continue
                rejections = f.get("rejections", [])
                # Permanent rejections block the import unless they are
                # only about sample detection.
                has_permanent_block = any(
                    r.get("type") == "permanent" and "sample" not in r.get("reason", "").lower()
                    for r in rejections
                )
                if has_permanent_block:
                    continue
                files.append({
                    "path": f["path"],
                    "seriesId": f["series"]["id"],
                    "episodeIds": [e["id"] for e in f["episodes"]],
                    "quality": f["quality"],
                    "languages": f.get("languages", [{"id": 1, "name": "English"}]),
                    "indexerFlags": 0,
                    "releaseType": "singleEpisode",
                    "downloadId": download_id,
                })
            if files:
                result = api("POST", "/api/v3/command", {
                    "name": "ManualImport",
                    "files": files,
                })
                log.info(f" Auto-imported {len(files)} file(s): {(result or {}).get('status', '?')}")
            else:
                log.warning(f" No importable files for: {title}")
        except Exception as e:
            log.error(f" Import error for {title}: {e}")
# ─── Failed downloads ─────────────────────────────────────────────────
def handle_failed_downloads():
    """
    Catch all failure states and remove without blocklisting so a different
    release can be tried. Re-searches affected episodes individually.

    A queue record counts as failed when its trackedDownloadState is
    "importFailed", its trackedDownloadStatus is "error", or its status is
    "failed". Each failed record is deleted from the queue (and from the
    download client) without blocklisting. Every affected episode is then
    searched exactly once, with a pause of config.EPISODE_SEARCH_DELAY
    seconds between searches. Per-item errors are logged and skipped.
    """
    queue = api("GET", "/api/v3/queue?pageSize=200&includeUnknownSeriesItems=true")
    episode_ids_to_search = []
    for item in queue.get("records", []):
        state = item.get("trackedDownloadState", "")
        tracked_status = item.get("trackedDownloadStatus", "")
        status = item.get("status", "")
        is_failed = (
            state == "importFailed"
            or tracked_status == "error"
            or status == "failed"
        )
        if not is_failed:
            continue
        title = (item.get("title") or "?")[:60]
        log.info(f"Failed download detected: {title} (status={status} state={state} tracked={tracked_status})")
        item_id = item.get("id")
        if item_id is None:
            # Without an id there is nothing to delete; do not let one odd
            # record abort handling of the remaining failures.
            log.error(f" Retry error for {title}: queue record has no id")
            continue
        episode_id = item.get("episodeId")
        try:
            api("DELETE",
                f"/api/v3/queue/{item_id}?removeFromClient=true&blocklist=false&skipRedownload=true")
            log.info(" Removed failed item from queue")
            if episode_id:
                episode_ids_to_search.append(episode_id)
        except Exception as e:
            log.error(f" Retry error for {title}: {e}")

    # dict.fromkeys drops duplicate ids while keeping first-seen order, so an
    # episode referenced by several failed records is searched only once.
    for eid in dict.fromkeys(episode_ids_to_search):
        try:
            api("POST", "/api/v3/command", {
                "name": "EpisodeSearch",
                "episodeIds": [eid],
            })
            log.info(f" Re-searching episode {eid}")
            time.sleep(config.EPISODE_SEARCH_DELAY)
        except Exception as e:
            log.error(f" Episode search error for {eid}: {e}")
# ─── Blocklist hygiene ────────────────────────────────────────────────
def blocklist_hygiene():
    """
    Prevent the blocklist death spiral: when all releases for an episode are
    blocklisted, it can never be found again. Periodically clear blocklist
    entries for episodes that are still missing, then re-search them.

    Runs at most once every config.BLOCKLIST_HYGIENE_INTERVAL seconds; calls
    in between return immediately. An episode counts as missing when it is
    monitored, is not in season 0, has no file, and has an air date in the
    past. Only the first 1000 blocklist records are inspected.
    """
    global last_blocklist_hygiene
    now = time.time()
    if now - last_blocklist_hygiene < config.BLOCKLIST_HYGIENE_INTERVAL:
        return
    # Advanced before any API call, so a failed run waits a full interval.
    last_blocklist_hygiene = now
    log.info("Blocklist hygiene: scanning for stuck episodes")

    # One timestamp for the whole scan; air dates are compared to it as ISO strings.
    utcnow = datetime.now(timezone.utc).isoformat()
    missing_episode_ids = set()
    series_list = api("GET", "/api/v3/series")
    for s in series_list:
        stats = s.get("statistics", {})
        # Skip series whose file count already covers the episode count.
        if stats.get("episodeFileCount", 0) >= stats.get("episodeCount", 0):
            continue
        episodes = api("GET", f"/api/v3/episode?seriesId={s['id']}")
        for ep in episodes:
            if ep["seasonNumber"] == 0 or not ep.get("monitored") or ep.get("hasFile"):
                continue
            airdate = ep.get("airDateUtc", "")
            if airdate and airdate < utcnow:
                missing_episode_ids.add(ep["id"])
    if not missing_episode_ids:
        log.info(" No missing aired episodes — blocklist clean")
        return

    blocklist = api("GET", "/api/v3/blocklist?page=1&pageSize=1000")
    records = blocklist.get("records", [])
    cleared = 0
    episode_ids_to_search = set()
    for record in records:
        # NOTE(review): Sonarr's v3 blocklist resource appears to expose a flat
        # `episodeIds` list rather than nested `episodes` objects — confirm
        # against the API docs. Both shapes are accepted so a record is
        # matched whichever one the server returns.
        ep_ids = list(record.get("episodeIds") or [])
        ep_ids.extend(e.get("id") for e in record.get("episodes") or [])
        stuck = missing_episode_ids.intersection(ep_ids)
        if not stuck:
            continue
        try:
            api("DELETE", f"/api/v3/blocklist/{record['id']}")
            cleared += 1
            episode_ids_to_search.update(stuck)
        except Exception as e:
            log.error(f" Failed to clear blocklist entry {record['id']}: {e}")
    log.info(f" Cleared {cleared} blocklist entries for {len(episode_ids_to_search)} missing episodes")

    # Search each unblocked episode individually, pausing between requests.
    for eid in episode_ids_to_search:
        try:
            api("POST", "/api/v3/command", {"name": "EpisodeSearch", "episodeIds": [eid]})
            time.sleep(config.EPISODE_SEARCH_DELAY)
        except Exception as e:
            log.error(f" Re-search failed for episode {eid}: {e}")
# ─── Missing episode sweep ────────────────────────────────────────────
def sweep_missing_episodes():
    """
    Every MISSING_SWEEP_INTERVAL, find all missing aired episodes across all
    series and trigger individual episode searches.

    Calls made before config.MISSING_SWEEP_INTERVAL seconds have passed since
    the previous sweep return immediately. An episode counts as missing when
    it is in a season above 0, is monitored, has no file, and has a non-empty
    air date in the past. Episodes are searched in season/episode order with
    a pause of config.EPISODE_SEARCH_DELAY seconds after each successful
    request; a failed search is logged and the sweep continues.
    """
    global last_missing_sweep
    now = time.time()
    if now - last_missing_sweep < config.MISSING_SWEEP_INTERVAL:
        return
    # Advanced before any API call, so a failed sweep waits a full interval.
    last_missing_sweep = now
    log.info("Missing episode sweep: scanning all series")
    total_missing = 0
    total_searched = 0
    series_list = api("GET", "/api/v3/series")
    for s in series_list:
        stats = s.get("statistics", {})
        # Skip series whose file count already covers the episode count.
        if stats.get("episodeFileCount", 0) >= stats.get("episodeCount", 0):
            continue
        episodes = api("GET", f"/api/v3/episode?seriesId={s['id']}")
        # Taken per series: the pauses between searches can make a sweep long.
        utcnow = datetime.now(timezone.utc).isoformat()
        missing = [
            ep for ep in episodes
            if ep["seasonNumber"] > 0
            and ep.get("monitored")
            and not ep.get("hasFile")
            # `or ""` turns a null air date into "" so the comparison cannot
            # raise TypeError; "" itself is excluded, as in blocklist_hygiene.
            and "" < (ep.get("airDateUtc") or "") < utcnow
        ]
        if not missing:
            continue
        total_missing += len(missing)
        title = s["title"]
        log.info(f" {title}: {len(missing)} missing aired episodes — searching")
        for ep in sorted(missing, key=lambda e: (e["seasonNumber"], e["episodeNumber"])):
            label = f"S{ep['seasonNumber']:02d}E{ep['episodeNumber']:02d}"
            try:
                api("POST", "/api/v3/command", {"name": "EpisodeSearch", "episodeIds": [ep["id"]]})
                total_searched += 1
                log.info(f" Searched {label}")
                time.sleep(config.EPISODE_SEARCH_DELAY)
            except Exception as e:
                log.error(f" Search failed for {label}: {e}")
    log.info(f" Sweep complete: {total_searched}/{total_missing} episodes searched")
"""
VPN health monitoring and automatic server failover via gluetun.
"""
import random
import shutil
import subprocess
import time
from watchdog import config
# Logger object taken from the watchdog config module; used by every function below.
log = config.logger
# ─── Module state ─────────────────────────────────────────────────────
# time.time() value stored by check_health() each time it actually probes;
# 0.0 until the first probe.
last_vpn_check = 0.0
# Number of unhealthy probes in a row. check_health() resets it to 0 when
# the VPN is healthy again and just before it triggers a server rotation.
vpn_consecutive_failures = 0
# ─── Public entry point ──────────────────────────────────────────────
def check_health():
    """
    Monitor gluetun's Docker health status. If the VPN tunnel is down for
    VPN_UNHEALTHY_THRESHOLD consecutive checks, rotate to a different
    WireGuard server and restart gluetun.

    Probes at most once every config.VPN_CHECK_INTERVAL seconds; calls in
    between return immediately. The container is healthy only when
    `docker inspect` prints exactly "healthy". Any other output, or a probe
    that raises (timeout, docker missing, ...), counts as one unhealthy check.
    """
    global last_vpn_check, vpn_consecutive_failures
    now = time.time()
    if now - last_vpn_check < config.VPN_CHECK_INTERVAL:
        return
    last_vpn_check = now

    healthy = False
    try:
        result = subprocess.run(
            ["docker", "inspect", "--format", "{{.State.Health.Status}}", "gluetun"],
            capture_output=True, text=True, timeout=10, env=config.BREW_ENV,
        )
        healthy = result.stdout.strip() == "healthy"
    except Exception as e:
        # The failed probe still counts as unhealthy, but record why it
        # failed: enough of these in a row trigger a server rotation.
        log.warning(f"VPN health probe failed: {e}")

    if healthy:
        if vpn_consecutive_failures > 0:
            log.info(f"VPN recovered after {vpn_consecutive_failures} failed check(s)")
        vpn_consecutive_failures = 0
        return

    vpn_consecutive_failures += 1
    log.warning(f"VPN unhealthy ({vpn_consecutive_failures}/{config.VPN_UNHEALTHY_THRESHOLD})")
    if vpn_consecutive_failures < config.VPN_UNHEALTHY_THRESHOLD:
        return
    # Threshold reached: start counting afresh and switch servers.
    vpn_consecutive_failures = 0
    _rotate_server()
# ─── Server rotation ─────────────────────────────────────────────────
def _rotate_server():
    """
    Pick a different WireGuard server config and restart gluetun.

    Candidates are the `us-*.conf` files in the gluetun wireguard directory
    whose Endpoint differs from the one in the active `wg0.conf`. If none
    differ, or the endpoints cannot be read, the whole pool is used. The
    chosen file is copied over `wg0.conf` and gluetun is restarted with
    `docker compose restart`. A restart that raises or exits non-zero is
    logged as an error rather than reported as a success.
    """
    wg_dir = config.BASE_DIR / "config" / "gluetun" / "wireguard"
    active_conf = wg_dir / "wg0.conf"
    pool = sorted(wg_dir.glob("us-*.conf"))
    if not pool:
        log.error("No VPN server pool configs found — cannot rotate")
        return

    # An unreadable or endpoint-less active config is treated as endpoint "".
    current_endpoint = _read_endpoint(active_conf) or ""
    candidates = []
    for conf in pool:
        endpoint = _read_endpoint(conf)
        # Configs whose endpoint cannot be determined are never candidates.
        if endpoint is not None and endpoint != current_endpoint:
            candidates.append(conf)
    if not candidates:
        candidates = pool

    chosen = random.choice(candidates)
    log.info(f"Rotating VPN server: {chosen.stem}")
    shutil.copy2(str(chosen), str(active_conf))
    try:
        result = subprocess.run(
            ["docker", "compose", "restart", "gluetun"],
            cwd=str(config.BASE_DIR), capture_output=True, timeout=60, env=config.BREW_ENV,
        )
    except Exception as e:
        log.error(f" Failed to restart gluetun: {e}")
        return
    if result.returncode != 0:
        # subprocess.run does not raise on a non-zero exit unless check=True,
        # so the exit status must be inspected explicitly.
        stderr = result.stderr.decode(errors="replace").strip()
        log.error(f" Failed to restart gluetun: exit code {result.returncode}: {stderr}")
        return
    # Presumably gives the tunnel time to come back up — TODO confirm.
    time.sleep(15)
    log.info(f" gluetun restarted with {chosen.stem}")


def _read_endpoint(conf_path):
    """Return the value of the first Endpoint line in conf_path, or None if unreadable or absent."""
    try:
        lines = conf_path.read_text().splitlines()
    except (OSError, ValueError):
        # Missing/unreadable file or undecodable bytes: endpoint unknown.
        return None
    for line in lines:
        if line.strip().startswith("Endpoint"):
            _, sep, value = line.partition("=")
            # An Endpoint line without "=" carries no usable value.
            return value.strip() if sep else None
    return None
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment