#!/usr/bin/env python3

"""
Youtube Watchmarker [1] to Grayjay [2] history migration, using channel and
video info from MW Metadata [3].

[1] https://github.com/sniklaus/youtube-watchmarker
[2] https://grayjay.app
[3] https://mattw.io/youtube-metadata
"""

from base64 import b64decode
from contextlib import closing
from dataclasses import dataclass
from datetime import datetime
import json
import logging
from pathlib import Path
import re
import shutil
import sqlite3
from typing import NotRequired, TypedDict

import isodate


# --- Youtube Watchmarker export schema -------------------------------------

class YTWMHistory(TypedDict):
    strIdent: str       # 11-char YouTube video ID
    strTitle: str
    intTimestamp: int   # watch time, milliseconds since epoch
    intCount: int


# --- MW Metadata export schema ---------------------------------------------

class MWImage(TypedDict):
    url: str
    width: int
    height: int


class MWChannelSnippet(TypedDict):
    title: str
    description: str
    customUrl: NotRequired[str]
    thumbnails: dict[str, MWImage]


class MWChannelStats(TypedDict):
    subscriberCount: str


class MWChannelBrandingImage(TypedDict):
    bannerExternalUrl: str


class MWChannelBranding(TypedDict):
    image: NotRequired[MWChannelBrandingImage]


class MWChannel(TypedDict):
    id: str
    snippet: MWChannelSnippet
    statistics: MWChannelStats
    brandingSettings: MWChannelBranding


class MWVideoSnippet(TypedDict):
    publishedAt: str    # ISO 8601 datetime
    channelId: str
    title: str
    thumbnails: dict[str, MWImage]


class MWVideoContent(TypedDict):
    duration: str       # ISO 8601 duration


class MWVideoStats(TypedDict):
    viewCount: NotRequired[str]


class MWVideo(TypedDict):
    id: str
    snippet: MWVideoSnippet
    contentDetails: MWVideoContent
    statistics: MWVideoStats


# --- Grayjay SQLite `Serialized` blob schema (PascalCase keys) --------------

class GJDBThumb(TypedDict):
    Url: str
    Quality: int


class GJDBThumbs(TypedDict):
    Sources: list[GJDBThumb]


class GJDBID(TypedDict):
    Platform: str
    Value: str
    PluginID: str
    ClaimType: int
    ClaimFieldType: int


class GJDBAuthor(TypedDict):
    ID: GJDBID
    Name: str
    Url: str
    Thumbnail: str
    Subscribers: int


class GJDBVideo(TypedDict):
    ContentType: int
    Thumbnails: GJDBThumbs
    Duration: int
    ViewCount: int | None
    IsLive: bool
    Metadata: dict
    ID: GJDBID
    DateTime: int
    Name: str
    Author: GJDBAuthor
    Url: str
    ShareUrl: str | None
    BackendUrl: str | None
    IsDetailObject: bool


class GJDBHistory(TypedDict):
    Video: GJDBVideo
    Position: int
    Date: str


@dataclass
class GJDBRow:
    """One row of the Grayjay `history` table."""
    id: int
    url: str
    position: int
    date: str           # "YYYY-MM-DD HH:MM:SS.ffffff"
    name: str
    data: GJDBHistory   # decoded `Serialized` column


# --- Grayjay backup cache schema (camelCase keys) ---------------------------

class GJVideoID(TypedDict):
    platform: str
    value: str
    pluginId: str


class GJChannelID(GJVideoID):
    claimType: int


class GJChannelBase(TypedDict):
    id: GJChannelID
    name: str
    url: str
    thumbnail: str
    subscribers: int


class GJChannel(GJChannelBase):
    banner: NotRequired[str]
    description: str
    links: dict[str, str]
    urlAlternatives: list[str]


class GJThumb(TypedDict):
    url: str
    quality: int


class GJThumbs(TypedDict):
    sources: list[GJThumb]


class GJVideo(TypedDict):
    id: GJVideoID
    name: str
    thumbnails: GJThumbs
    author: GJChannelBase
    datetime: int
    url: str
    shareUrl: str
    duration: int
    viewCount: NotRequired[int]


@dataclass
class GJHistory:
    """One entry of the Grayjay `stores/history` file ("|||"-delimited)."""
    url: str
    watch_timestamp: int
    watch_progress: int
    title: str

    @classmethod
    def decode(cls, raw: str) -> "GJHistory":
        # maxsplit=3: the title is the last field and may itself contain "|||"
        url, watch_time, watch_progress, title = raw.split("|||", 3)
        return cls(url, int(watch_time), int(watch_progress), title)

    def encode(self) -> str:
        return "|||".join((self.url, str(self.watch_timestamp), str(self.watch_progress), self.title))


LOG = logging.getLogger()

# Grayjay's built-in YouTube plugin UUID.
_PLUGIN_ID = "35ae969a-a7db-11ed-afa1-0242ac120002"


def get_url_id(url: str) -> str | None:
    """Extract the 11-char video ID from a YouTube watch URL, or None."""
    match = re.search(r"[?&]v=(.{11})(&|$)", url)
    return match.group(1) if match else None


def normalise_url(url: str) -> str:
    """Canonicalise any watch URL to the www.youtube.com form (dedup key)."""
    url_id = get_url_id(url)
    return f"https://www.youtube.com/watch?v={url_id}" if url_id else url


def _format_db_date(moment: datetime) -> str:
    """Format a local timestamp the way Grayjay serialises dates: seven
    fractional digits (C# ticks) and a colon-separated UTC offset.

    Built from %z because the %:z directive is not portable: CPython hands
    strftime formats to the platform C library, and e.g. glibc does not
    implement %:z.
    """
    local = moment.astimezone()
    offset = local.strftime("%z")  # e.g. "+0100"
    return local.strftime("%Y-%m-%dT%H:%M:%S.%f") + f"0{offset[:3]}:{offset[3:]}"


def main(
    ytwm_database: Path,
    mw_channels: list[Path] | None,
    mw_videos: list[Path] | None,
    gj_input: Path | None,
    gj_output: Path,
    gj_database: Path | None,
    force_output: bool,
):
    """Merge a Youtube Watchmarker export into a Grayjay backup directory
    (and, optionally, directly into a Grayjay SQLite history database).

    :param ytwm_database: base64-encoded JSON export from Youtube Watchmarker
    :param mw_channels: MW Metadata channel export files (optional)
    :param mw_videos: MW Metadata video export files (optional)
    :param gj_input: existing unpacked Grayjay backup to start from (optional)
    :param gj_output: target directory for the updated backup
    :param gj_database: Grayjay SQLite database to insert history into (optional)
    :param force_output: delete and recreate gj_output if it already exists
    :raises RuntimeError: if gj_output exists and force_output is False
    """
    LOG.info("Loading Youtube Watchmarker history: %s", ytwm_database)
    with open(ytwm_database, "rb") as fp:
        history: list[YTWMHistory] = json.loads(b64decode(fp.read()))

    channel_meta = dict[str, MWChannel]()
    for channel_meta_path in (mw_channels or ()):
        LOG.info("Loading MW Metadata channels: %s", channel_meta_path)
        with open(channel_meta_path, errors="surrogateescape") as fp:
            channel_meta |= {item["id"]: item for item in json.load(fp)}

    video_meta = dict[str, MWVideo]()
    for video_meta_path in (mw_videos or ()):
        LOG.info("Loading MW Metadata videos: %s", video_meta_path)
        with open(video_meta_path, errors="surrogateescape") as fp:
            video_meta |= {item["id"]: item for item in json.load(fp)}

    if gj_output.exists():
        if not force_output:
            raise RuntimeError("Output path already exists (use --force to overwrite)")
        LOG.info("Removing existing output files: %s", gj_output)
        shutil.rmtree(gj_output)
    if gj_input:
        LOG.info("Copying existing Grayjay backup: %s -> %s", gj_input, gj_output)
        shutil.copytree(gj_input, gj_output)
    else:
        LOG.info("Creating new Grayjay backup: %s", gj_output)
        gj_output.mkdir()
        (gj_output / "stores").mkdir()

    channel_cache_path = gj_output / "cache_channels"
    if channel_cache_path.is_file():
        LOG.info("Loading existing Grayjay channel cache: %s", channel_cache_path)
        with open(channel_cache_path) as fp:
            raw_channels: list[GJChannel] = json.load(fp)
        channel_cache = {item["id"]["value"]: item for item in raw_channels}
    else:
        channel_cache = {}

    video_cache_path = gj_output / "cache_videos"
    if video_cache_path.is_file():
        LOG.info("Loading existing Grayjay video cache: %s", video_cache_path)
        with open(video_cache_path) as fp:
            raw_videos: list[GJVideo] = json.load(fp)
        video_cache = {item["id"]["value"]: item for item in raw_videos}
    else:
        video_cache = {}

    history_store_path = gj_output / "stores" / "history"
    history_store = dict[tuple[str, int], GJHistory]()
    if history_store_path.is_file():
        LOG.info("Loading existing Grayjay history: %s", history_store_path)
        with open(history_store_path) as fp:
            for raw in json.load(fp):
                item = GJHistory.decode(raw)
                history_store[(normalise_url(item.url), item.watch_timestamp)] = item

    LOG.info("Building history (start: %s items)", len(history_store))
    for item in history:
        video_id = item["strIdent"]
        video_url = f"https://youtube.com/watch?v={video_id}"
        try:
            video = video_meta[video_id]
        except KeyError:
            LOG.debug("Missing video metadata: %s", video_id)
            video = None
        watch_timestamp = int(item["intTimestamp"] / 1000)
        if video:
            duration = int(isodate.parse_duration(video["contentDetails"]["duration"]).total_seconds())
        else:
            # Unknown duration: mark a nominal 1s so progress == duration reads as fully watched.
            duration = 1
        # Key by normalised URL so new entries dedup against those loaded from
        # an existing store (which are keyed the same way) and against the
        # database rows later on.
        store_key = (normalise_url(video_url), watch_timestamp)
        if store_key not in history_store:
            LOG.debug("Adding to history store: %s at %s", video_id, watch_timestamp)
            if video:
                title = video["snippet"]["title"]
            elif item["strTitle"] != "YouTube":
                title = item["strTitle"]
            else:
                # "YouTube" is the watchmarker's placeholder for an unknown title.
                title = ""
            history_store[store_key] = GJHistory(
                url=video_url,
                watch_timestamp=watch_timestamp,
                watch_progress=duration,
                title=title,
            )
        if not video:
            continue
        if video_id not in video_cache:
            channel_id = video["snippet"]["channelId"]
            channel_url = f"https://www.youtube.com/channel/{channel_id}"
            try:
                channel = channel_meta[channel_id]
            except KeyError:
                # Without channel metadata we cannot build a cache entry;
                # the plain history entry above is still kept.
                LOG.debug("Missing channel metadata: %s", channel_id)
                continue
            channel_base = GJChannelBase({
                "id": {
                    "platform": "YouTube",
                    "value": channel_id,
                    "pluginId": _PLUGIN_ID,
                    "claimType": 2,
                },
                "name": channel["snippet"]["title"],
                "url": channel_url,
                # Highest-resolution thumbnail wins.
                "thumbnail": sorted(
                    channel["snippet"]["thumbnails"].values(),
                    key=lambda thumb: max(thumb["width"], thumb["height"]),
                )[-1]["url"],
                "subscribers": int(channel["statistics"]["subscriberCount"]),
            })
            if channel_id not in channel_cache:
                LOG.debug("Adding to channel cache: %s", channel_id)
                channel_cache[channel_id] = GJChannel({
                    **channel_base,
                    "description": channel["snippet"]["description"],
                    "links": {},
                    "urlAlternatives": [channel_url],
                })
                try:
                    channel_cache[channel_id]["urlAlternatives"].append(
                        f"https://www.youtube.com/{channel['snippet']['customUrl']}")
                except KeyError:
                    pass  # customUrl is optional
                try:
                    channel_cache[channel_id]["banner"] = channel["brandingSettings"]["image"]["bannerExternalUrl"]
                except KeyError:
                    pass  # banner is optional
            LOG.debug("Adding to video cache: %s", video_id)
            video_cache[video_id] = GJVideo({
                "id": {
                    "platform": "YouTube",
                    "value": video_id,
                    "pluginId": _PLUGIN_ID,
                },
                "name": video["snippet"]["title"],
                "thumbnails": {
                    "sources": [
                        {
                            "url": thumb["url"],
                            "quality": max(thumb["width"], thumb["height"]),
                        }
                        for thumb in video["snippet"]["thumbnails"].values()
                    ],
                },
                "author": channel_base,
                "datetime": int(isodate.parse_datetime(video["snippet"]["publishedAt"]).timestamp()),
                "url": video_url,
                "shareUrl": video_url,
                "duration": duration,
            })
            try:
                video_cache[video_id]["viewCount"] = int(video["statistics"]["viewCount"])
            except KeyError:
                pass  # viewCount is optional
    LOG.info("Built history (end: %s items)", len(history_store))

    LOG.info("Saving Grayjay channel cache: %s", channel_cache_path)
    with open(channel_cache_path, "w") as fp:
        json.dump(list(channel_cache.values()), fp)
    LOG.info("Saving Grayjay video cache: %s", video_cache_path)
    with open(video_cache_path, "w") as fp:
        json.dump(list(video_cache.values()), fp)
    LOG.info("Saving Grayjay history store: %s", history_store_path)
    with open(history_store_path, "w") as fp:
        json.dump([item.encode() for item in history_store.values()], fp)

    if not gj_database:
        return

    LOG.info("Loading database history: %s", gj_database)
    # closing() actually closes the connection; sqlite3's own context manager
    # only commits/rolls back the transaction.
    with closing(sqlite3.connect(gj_database)) as conn, conn:
        cur = conn.cursor()
        db_store = dict[tuple[str, int], GJDBRow]()
        for raw in cur.execute("SELECT ID, Url, Position, DateTime, Name, Serialized FROM history"):
            row = GJDBRow(*raw[:-1], json.loads(raw[-1]))
            date = isodate.parse_datetime(row.date.replace(" ", "T"))
            db_store[(normalise_url(row.url), int(date.timestamp()))] = row
        LOG.info("Building history (start: %s items)", len(db_store))
        inserts = list[tuple[str, int, str, str, sqlite3.Binary]]()
        for (key_url, key_timestamp), item in history_store.items():
            # history_store and db_store are both keyed by the normalised URL,
            # so dedup is a direct key comparison.
            if (key_url, key_timestamp) in db_store:
                continue
            video_id = get_url_id(item.url)
            if not video_id:
                continue
            date = datetime.fromtimestamp(item.watch_timestamp)
            try:
                video_info = video_cache[video_id]
            except KeyError:
                LOG.debug("Missing video cache: %s", video_id)
                # Minimal record: no metadata, so duration falls back to the
                # stored watch progress and author is left empty.
                serial = GJDBHistory({
                    "Video": {
                        "ContentType": 1,
                        "Thumbnails": {
                            "Sources": [],
                        },
                        "Duration": item.watch_progress,
                        "IsLive": False,
                        "Metadata": {},
                        "ID": {
                            "Platform": "YouTube",
                            "Value": video_id,
                            "PluginID": _PLUGIN_ID,
                            "ClaimType": 0,
                            "ClaimFieldType": -1
                        },
                        "Name": item.title,
                        "Author": {
                            "ID": {},
                        },
                        "Url": item.url,
                        "ShareUrl": item.url,
                        "BackendUrl": None,
                        "IsDetailObject": False
                    },
                    "Position": item.watch_progress,
                    "Date": _format_db_date(date),
                })
            else:
                serial = GJDBHistory({
                    "Video": {
                        "ContentType": 1,
                        "Thumbnails": {
                            "Sources": [
                                {
                                    "Url": thumb["url"],
                                    "Quality": thumb["quality"],
                                }
                                for thumb in video_info["thumbnails"]["sources"]
                            ],
                        },
                        "Duration": video_info["duration"],
                        "ViewCount": video_info.get("viewCount"),
                        "IsLive": False,
                        "Metadata": {},
                        "ID": {
                            "Platform": "YouTube",
                            "Value": video_id,
                            "PluginID": _PLUGIN_ID,
                            "ClaimType": 0,
                            "ClaimFieldType": -1
                        },
                        "DateTime": video_info["datetime"],
                        "Name": video_info["name"],
                        "Author": {
                            "ID": {
                                "Platform": "YouTube",
                                "Value": video_info["author"]["id"]["value"],
                                "PluginID": _PLUGIN_ID,
                                "ClaimType": 2,
                                "ClaimFieldType": -1
                            },
                            "Name": video_info["author"]["name"],
                            "Url": video_info["author"]["url"],
                            "Thumbnail": video_info["author"]["thumbnail"],
                            "Subscribers": video_info["author"]["subscribers"],
                        },
                        "Url": video_info["url"],
                        "ShareUrl": video_info["shareUrl"],
                        "BackendUrl": None,
                        "IsDetailObject": False
                    },
                    "Position": item.watch_progress,
                    "Date": _format_db_date(date),
                })
            LOG.debug("Adding to insert queue: %s", video_id)
            inserts.append((
                serial["Video"]["Url"],
                serial["Position"],
                date.strftime("%Y-%m-%d %H:%M:%S.%f"),
                serial["Video"]["Name"],
                sqlite3.Binary(json.dumps(serial, separators=(",", ":")).encode()),
            ))
        LOG.info("Built history (pending: %s items)", len(inserts))
        if inserts:
            LOG.info("Saving Grayjay database: %s", gj_database)
            cur.executemany(
                "INSERT INTO history (Url, Position, DateTime, Name, Serialized) VALUES (?, ?, ?, ?, ?)",
                inserts)


if __name__ == "__main__":
    import argparse
    p = argparse.ArgumentParser()
    p.add_argument("-d", "--debug", action="store_true", help="show debug logging")
    p.add_argument("--history", type=Path, required=True, help="(Youtube Watchmarker) database export")
    p.add_argument("--channels", type=Path, nargs="+", help="(MW Metadata) channel metadata export(s)")
    p.add_argument("--videos", type=Path, nargs="+", help="(MW Metadata) video metadata export(s)")
    # Optional: main() skips the database step when this is omitted.
    p.add_argument("--database", type=Path, help="(Grayjay) database file to import history")
    p.add_argument("--input", type=Path, help="(Grayjay) existing unpacked backup to update")
    p.add_argument("--output", type=Path, required=True, help="(Grayjay) new unpacked backup target")
    p.add_argument("--force", action="store_true", help="delete and recreate existing output")
    a = p.parse_args()
    logging.basicConfig(level=logging.DEBUG if a.debug else logging.INFO)
    main(a.history, a.channels, a.videos, a.input, a.output, a.database, a.force)