|
#!/usr/bin/env python3
|
|
# /// script
|
|
# requires-python = ">=3.11"
|
|
# ///
|
|
#
|
|
# (PEP 723 inline metadata — `uv run csv_to_ynab_json.py` uses this Python bound;
|
|
# no third-party packages; stdlib only.)
|
|
"""
|
|
Convert YNAB CSV exports (Plan + Register) into JSON shaped like a full plan
|
|
export. Field meanings and enums follow the YNAB API (see OpenAPI spec).
|
|
|
|
Spec: https://api.ynab.com/papi/open_api_spec.yaml
|
|
|
|
Limitations (CSV does not contain):
|
|
- Server-issued UUIDs, goals, category notes, payee locations, splits,
|
|
scheduled transactions, real account balances, direct-import flags.
|
|
|
|
Dependencies: Python 3.11+ standard library only (uses datetime.UTC).
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import csv
|
|
import json
|
|
import re
|
|
import tempfile
|
|
import uuid
|
|
import zipfile
|
|
from collections import defaultdict
|
|
from datetime import UTC, datetime
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
# Namespace for uuid5 so generated IDs are deterministic across runs
# (same inputs -> same UUIDs). This is the standard DNS namespace UUID,
# 6ba7b810-9dad-11d1-80b4-00c04fd430c8.
_NS = uuid.NAMESPACE_DNS

# YNAB export / API labels for the Ready to Assign inflow category (these are
# category labels, not user payees).
# See MonthSummaryBase.income in the OpenAPI spec.
_INFLOW_GROUP = "Inflow"
_RTA_CATEGORY = "Ready to Assign"
|
|
|
|
|
|
def _uid(kind: str, *parts: str) -> str:
    """Return a deterministic UUIDv5 string for *kind* and its identifying *parts*.

    The hashed name is ``kind`` followed by ``|`` and the parts joined with
    ``|``, so identical inputs always produce the same ID.
    """
    joined_parts = "|".join(parts)
    hashed_name = f"{kind}|{joined_parts}"
    return str(uuid.uuid5(_NS, hashed_name))
|
|
|
|
|
|
def parse_money(s: str) -> int:
    """Convert a CSV currency string to YNAB milliunits (1000 = $1.00).

    Accepts an optional ``$`` sign, ``,`` thousands separators, a leading
    ``-`` and accounting-style parentheses for negatives, e.g.
    ``"$1,234.56"`` -> ``1234560`` and ``"(12.34)"`` -> ``-12340``.

    Returns:
        The amount in milliunits, rounded half-to-even. ``0`` is returned for
        empty, missing (``None``), unparseable, or non-finite (``nan`` /
        ``inf``) input, so one malformed cell never aborts a conversion.
    """
    # Function-scope import keeps the block self-contained; Decimal gives exact
    # base-10 arithmetic, so "x.xxx5" inputs round on their true value rather
    # than on a binary-float approximation.
    from decimal import ROUND_HALF_EVEN, Decimal

    if not s or not str(s).strip():
        return 0

    text = str(s).strip().replace("$", "").replace(",", "")
    negative = False
    if text.startswith("-"):
        negative = True
        text = text[1:].strip()
    if text.startswith("(") and text.endswith(")"):
        # Accounting notation: (12.34) means -12.34.
        negative = True
        text = text[1:-1].strip()

    try:
        value = Decimal(text)
        if not value.is_finite():
            # "nan" / "inf" parse successfully but are not amounts.
            return 0
        milliunits = int((value * 1000).to_integral_value(rounding=ROUND_HALF_EVEN))
    except (ArithmeticError, ValueError):
        # decimal.InvalidOperation / Overflow are ArithmeticError subclasses.
        return 0

    return -milliunits if negative else milliunits
|
|
|
|
|
|
def parse_month_label(label: str) -> str | None:
    """Turn a month label into the ISO date of that month's first day.

    Both abbreviated (``'Mar 2024'``) and full (``'March 2024'``) month names
    are tried, in that order; the result looks like ``'2024-03-01'``.
    Returns ``None`` when neither format matches.
    """
    text = label.strip()
    for fmt in ("%b %Y", "%B %Y"):
        try:
            parsed = datetime.strptime(text, fmt)
        except ValueError:
            continue
        return f"{parsed.year:04d}-{parsed.month:02d}-01"
    return None
|
|
|
|
|
|
def guess_account_type(name: str) -> str:
    """Map an account display name to an AccountType enum value.

    The result is one of the strings from OpenAPI
    ``components/schemas/AccountType`` (``checking``, ``savings``, ``cash``,
    ``creditCard``, ``lineOfCredit``, ``otherAsset``, ``otherLiability``,
    ``mortgage``, ``autoLoan``, ``studentLoan``, ``personalLoan``,
    ``medicalDebt``, ``otherDebt``). Matching is a case-insensitive keyword
    heuristic evaluated top to bottom; the first rule that matches wins and
    ``checking`` is the fallback.
    """
    n = name.lower()

    if "mortgage" in n or "home loan" in n:
        return "mortgage"
    # A HELOC is always a line of credit; plain "home equity" is too, unless the
    # name says it is a loan (handled by the loan rules further down).
    if "heloc" in n or ("home equity" in n and "loan" not in n):
        return "lineOfCredit"
    if "line of credit" in n or re.search(r"\bloc\b", n):
        return "lineOfCredit"
    if "student loan" in n:
        return "studentLoan"
    if "auto loan" in n or "car loan" in n:
        return "autoLoan"
    if "personal loan" in n:
        return "personalLoan"
    if "medical" in n and "debt" in n:
        return "medicalDebt"
    if "credit card" in n or "store card" in n:
        return "creditCard"
    if re.search(r"\b(visa|mastercard|amex|discover)\b", n):
        return "creditCard"
    if n.rstrip().endswith(" card"):
        return "creditCard"
    if "savings" in n:
        return "savings"
    if re.search(r"\bcash\b", n) or "wallet" in n:
        return "cash"
    # "ira" must be a whole word: a bare substring test would also hit names
    # such as "Kira" or "Miranda" and push everyday accounts off-budget.
    if re.search(r"\bira\b", n) or any(
        x in n for x in ("401k", "401(k)", "brokerage", "investment")
    ):
        return "otherAsset"
    # "equity"/"retirement" indicate an asset unless the name describes a
    # credit line or a loan (e.g. "Home Equity Loan" is a debt).
    if (
        any(x in n for x in ("equity", "retirement"))
        and "line" not in n
        and "loan" not in n
    ):
        return "otherAsset"
    if any(x in n for x in ("crypto", "bitcoin", "precious metal")):
        return "otherAsset"
    if "loan" in n:
        return "otherDebt"
    if "liability" in n:
        return "otherLiability"
    return "checking"
|
|
|
|
|
|
def guess_on_budget(acct_type: str) -> bool:
|
|
"""Tracking / debt accounts are off-budget; everyday accounts on-budget."""
|
|
off_budget = frozenset(
|
|
{
|
|
"mortgage",
|
|
"autoLoan",
|
|
"studentLoan",
|
|
"personalLoan",
|
|
"medicalDebt",
|
|
"otherDebt",
|
|
"otherAsset",
|
|
"otherLiability",
|
|
}
|
|
)
|
|
return acct_type not in off_budget
|
|
|
|
|
|
def cleared_status(raw: str | None) -> str:
    """Normalize a CSV "Cleared" cell to a TransactionClearedStatus value.

    Args:
        raw: Cell text, matched case-insensitively after trimming. ``None`` is
            accepted because ``csv.DictReader`` fills missing trailing cells of
            a short row with ``None``.

    Returns:
        ``"reconciled"`` or ``"uncleared"`` when the cell says so; anything
        else (including blank or missing) maps to ``"cleared"``.
    """
    status = (raw or "").strip().lower()
    if status in ("reconciled", "uncleared"):
        return status
    return "cleared"
|
|
|
|
|
|
def transfer_target_name(payee: str) -> str | None:
    """Extract the counterpart name from a transfer payee.

    A transfer payee has the form "Transfer : <name>". After trimming the
    payee, the text following that exact prefix is returned (trimmed as well).
    ``None`` is returned when the prefix is absent or nothing follows it.

    The remainder is typically the destination account's display name as
    exported by YNAB (i.e. it matches another account's name).
    """
    marker = "Transfer : "
    cleaned = payee.strip()
    if cleaned[: len(marker)] != marker:
        return None
    target = cleaned.removeprefix(marker).strip()
    return target or None
|
|
|
|
|
|
def empty_category_template(
    cid: str,
    gid: str,
    name: str,
    hidden: bool,
    budgeted: int,
    activity: int,
    balance: int,
) -> dict[str, Any]:
    """Build a category dict with the given identity and amounts.

    ``original_category_group_id``, ``note`` and every ``goal_*`` field are
    ``None`` (``goal_target`` is ``0``), and ``deleted`` is ``False``. Key
    order is fixed because it determines the order in the written JSON.
    """
    category: dict[str, Any] = {
        "id": cid,
        "category_group_id": gid,
        "name": name,
        "hidden": hidden,
        "original_category_group_id": None,
        "note": None,
        "budgeted": budgeted,
        "activity": activity,
        "balance": balance,
    }
    goal_fields = (
        "goal_type",
        "goal_needs_whole_amount",
        "goal_day",
        "goal_cadence",
        "goal_cadence_frequency",
        "goal_creation_month",
        "goal_target",
        "goal_target_month",
        "goal_percentage_complete",
        "goal_months_to_budget",
        "goal_under_funded",
        "goal_overall_funded",
        "goal_overall_left",
        "goal_snoozed_at",
    )
    for field in goal_fields:
        # goal_target is the only goal field that is 0 rather than null.
        category[field] = 0 if field == "goal_target" else None
    category["deleted"] = False
    return category
|
|
|
|
|
|
def load_plan(path: str | Path) -> tuple[dict[tuple[str, str], str], dict[str, dict[tuple[str, str], dict[str, int]]]]:
    """Read the Plan CSV into per-month category amounts.

    Rows with neither a category group nor a category, and rows whose
    ``Month`` label cannot be parsed, are skipped.

    Returns:
        A pair ``(month_for_key, by_month)``:

        * ``month_for_key`` maps each ``(group, category)`` to the ISO month
          of the last row seen for it.
        * ``by_month`` maps ISO month -> ``(group, category)`` ->
          ``{"budgeted", "activity", "balance"}`` in milliunits, taken from
          the ``Assigned``, ``Activity`` and ``Available`` columns.
    """

    def _blank_cell() -> dict[str, int]:
        return {"budgeted": 0, "activity": 0, "balance": 0}

    by_month: dict[str, dict[tuple[str, str], dict[str, int]]] = defaultdict(
        lambda: defaultdict(_blank_cell)
    )
    month_for_key: dict[tuple[str, str], str] = {}

    with open(path, newline="", encoding="utf-8-sig") as handle:
        for record in csv.DictReader(handle):
            group = (record.get("Category Group") or "").strip()
            category = (record.get("Category") or "").strip()
            if not (group or category):
                continue
            month_iso = parse_month_label((record.get("Month") or "").strip())
            if not month_iso:
                continue
            key = (group, category)
            month_for_key[key] = month_iso
            cell = by_month[month_iso][key]
            cell["budgeted"] = parse_money(record.get("Assigned", ""))
            cell["activity"] = parse_money(record.get("Activity", ""))
            cell["balance"] = parse_money(record.get("Available", ""))

    return month_for_key, dict(by_month)
|
|
|
|
|
|
def collect_category_keys_from_register(path: str | Path) -> set[tuple[str, str]]:
    """Return every distinct ``(category group, category)`` pair in the Register CSV.

    Both values are trimmed; rows where both are blank are ignored, but a pair
    with only one side filled in is still included.
    """
    with open(path, newline="", encoding="utf-8-sig") as handle:
        pairs = (
            (
                (record.get("Category Group") or "").strip(),
                (record.get("Category") or "").strip(),
            )
            for record in csv.DictReader(handle)
        )
        # Materialize inside the ``with`` block: the generator reads lazily.
        return {pair for pair in pairs if any(pair)}
|
|
|
|
|
|
def _is_rta_row(row: dict[str, str]) -> bool:
    """Return True when *row* is categorized as 'Inflow' / 'Ready to Assign'."""
    group = (row.get("Category Group") or "").strip()
    if group != _INFLOW_GROUP:
        return False
    category = (row.get("Category") or "").strip()
    return category == _RTA_CATEGORY
|
|
|
|
|
|
def _register_row_amount(row: dict[str, str]) -> int:
    """Signed amount of a register row in milliunits: ``Inflow`` minus ``Outflow``."""
    inflow = parse_money(row.get("Inflow", ""))
    outflow = parse_money(row.get("Outflow", ""))
    return inflow - outflow
|
|
|
|
|
|
def _scan_register_metrics(path: str | Path) -> tuple[dict[str, int], dict[str, int]]:
    """Total register amounts per month, split by Ready to Assign vs. the rest.

    Per YNAB API MonthSummaryBase:
    - income: total transaction amounts in 'Inflow: Ready to Assign' for the month
    - activity_month: total transaction amounts excluding the RTA inflow category

    Returns ``(income_by_month, activity_by_month)``, both keyed by the first
    seven characters of the ``Date`` cell. Rows whose date is shorter than
    seven characters are skipped.

    NOTE(review): the key is only a ``YYYY-MM`` month if the export's dates
    are ISO formatted (``YYYY-MM-DD``) — confirm against the export settings.
    """
    income_totals: dict[str, int] = defaultdict(int)
    other_totals: dict[str, int] = defaultdict(int)

    with open(path, newline="", encoding="utf-8-sig") as handle:
        for record in csv.DictReader(handle):
            date_text = (record.get("Date") or "").strip()
            if len(date_text) < 7:
                continue
            bucket = income_totals if _is_rta_row(record) else other_totals
            bucket[date_text[:7]] += _register_row_amount(record)

    return dict(income_totals), dict(other_totals)
|
|
|
|
|
|
def find_plan_and_register_csvs_in_zip(zip_path: Path) -> tuple[Path, Path]:
    """Locate the Plan and Register CSV members inside an export zip (any folder).

    Member names ending in ``Plan.csv`` / ``Register.csv`` (case-insensitive)
    are candidates. macOS archive metadata (``__MACOSX/`` folders and ``._*``
    resource-fork files) is ignored, since those entries mirror the real file
    names and would otherwise make the pairing ambiguous.

    When several candidates exist, the pair sharing the same base name (the
    file name with `` - Plan.csv`` / `` - Register.csv`` removed) is chosen,
    provided exactly one such pair exists.

    Returns:
        ``(plan_member, register_member)`` as paths relative to the zip root.

    Raises:
        SystemExit: if a Plan or Register CSV is missing, or no unique pair
            can be determined.
    """
    with zipfile.ZipFile(zip_path) as zf:
        names = [n for n in zf.namelist() if not n.endswith("/")]

    def is_archive_metadata(member: str) -> bool:
        parts = member.split("/")
        return "__MACOSX" in parts or parts[-1].startswith("._")

    names = [n for n in names if not is_archive_metadata(n)]

    plans = [n for n in names if re.search(r"Plan\.csv$", n, re.I)]
    regs = [n for n in names if re.search(r"Register\.csv$", n, re.I)]

    if not plans:
        raise SystemExit(f"No *Plan.csv file found in {zip_path}")
    if not regs:
        raise SystemExit(f"No *Register.csv file found in {zip_path}")

    if len(plans) == 1 and len(regs) == 1:
        return Path(plans[0]), Path(regs[0])

    def stem_base(path_str: str, kind: str) -> str:
        # File name with a trailing " - <kind>.csv" removed (spacing and case
        # around the dash are tolerated).
        base = Path(path_str).name
        base = re.sub(r"(?i)\s*-\s*" + re.escape(kind) + r"\.csv\s*$", "", base)
        return base.strip()

    plans_by_base = {stem_base(p, "Plan"): p for p in plans}
    regs_by_base = {stem_base(r, "Register"): r for r in regs}
    common = plans_by_base.keys() & regs_by_base.keys()
    if len(common) == 1:
        base = next(iter(common))
        return Path(plans_by_base[base]), Path(regs_by_base[base])

    raise SystemExit(
        f"Could not pair Plan/Register CSVs in {zip_path}. Found plans={plans!r} registers={regs!r}"
    )
|
|
|
|
|
|
def extract_zip(zip_path: Path) -> tuple[Path, Path, tempfile.TemporaryDirectory[str]]:
    """Extract the Plan and Register CSVs of an export zip to a temp directory.

    Only the two CSV members are extracted. The returned paths are the ones
    reported by ``ZipFile.extract``, so they always point at the files that
    were actually written inside the temp directory.

    Returns:
        ``(plan_path, register_path, tempdir)``. The caller owns ``tempdir``
        and must call ``tempdir.cleanup()`` when done with the files.

    Raises:
        SystemExit: if the CSVs cannot be located in the zip or are missing
            after extraction. The temp directory is removed before the error
            propagates.
    """
    # Locate the members first so an unusable archive fails before any temp
    # directory has been created.
    plan_member, reg_member = find_plan_and_register_csvs_in_zip(zip_path)
    members = {"Plan": plan_member, "Register": reg_member}

    tmp = tempfile.TemporaryDirectory(prefix="ynab_csv_export_")
    try:
        root = Path(tmp.name)
        extracted: dict[str, Path] = {}
        with zipfile.ZipFile(zip_path) as zf:
            for info in zf.infolist():
                if info.is_dir():
                    continue
                member_path = Path(info.filename)
                for kind, wanted in members.items():
                    if kind not in extracted and member_path == wanted:
                        extracted[kind] = Path(zf.extract(info, root))

        for kind, wanted in members.items():
            target = extracted.get(kind)
            if target is None or not target.is_file():
                shown = target if target is not None else root / wanted
                raise SystemExit(f"Extracted {kind} path missing: {shown}")
    except BaseException:
        # Do not leave a half-populated temp directory behind on failure.
        tmp.cleanup()
        raise

    return extracted["Plan"], extracted["Register"], tmp
|
|
|
|
|
|
def _read_register_rows(register_path: Path) -> list[dict[str, Any]]:
    """Read every Register CSV row once, tagging each with its 0-based ``row_index``."""
    with open(register_path, newline="", encoding="utf-8-sig") as f:
        return [{"row_index": i, **row} for i, row in enumerate(csv.DictReader(f))]


def _transfer_outflow(row: dict[str, Any]) -> int:
    """Outflow minus Inflow of a row in milliunits (positive = money leaving the account)."""
    return parse_money(row.get("Outflow", "")) - parse_money(row.get("Inflow", ""))


def _pair_transfer_rows(raw_rows: list[dict[str, Any]]) -> dict[int, tuple[str, str]]:
    """Match the two sides of each transfer and assign them linked transaction IDs.

    Transfer rows are grouped by (date, absolute amount, unordered account
    pair). A group is linked only when it holds exactly two rows that name
    each other's account and carry opposite amounts; any other group is left
    unlinked (for example two identical transfers on the same day).

    Returns:
        ``row_index -> (own transaction id, counterpart transaction id)``.
    """
    pending: dict[tuple[Any, ...], list[tuple[dict[str, Any], str, str, int]]] = defaultdict(list)
    for row in raw_rows:
        target = transfer_target_name((row.get("Payee") or "").strip())
        if target is None:
            continue
        outflow = _transfer_outflow(row)
        if outflow == 0:
            continue
        account = (row.get("Account") or "").strip()
        date_s = (row.get("Date") or "").strip()
        key = (date_s, abs(outflow), tuple(sorted([account, target])))
        pending[key].append((row, account, target, outflow))

    pair_id: dict[int, tuple[str, str]] = {}
    for (date_s, magnitude, (name_a, name_b)), sides in pending.items():
        if len(sides) != 2:
            continue
        (r1, a1, t1, out1), (r2, a2, t2, out2) = sides
        if not (t1 == a2 and t2 == a1):
            continue
        if out1 + out2 != 0:
            # Both rows move money in the same direction, so they are two
            # separate transfers rather than the two sides of one.
            continue
        base = _uid("xfer", date_s, str(magnitude), name_a, name_b)
        id_a = f"{base}_{date_s}"
        id_b = f"{base}_t_{date_s}"
        # The inflow side (negative outflow) gets id_a, the outflow side id_b.
        if out1 < 0:
            pair_id[r1["row_index"]] = (id_a, id_b)
            pair_id[r2["row_index"]] = (id_b, id_a)
        else:
            pair_id[r1["row_index"]] = (id_b, id_a)
            pair_id[r2["row_index"]] = (id_a, id_b)
    return pair_id


def build_budget(
    plan_path: str | Path,
    register_path: str | Path,
    budget_name: str,
) -> dict[str, Any]:
    """Build the full plan-export JSON structure from the Plan and Register CSVs.

    Args:
        plan_path: Plan CSV (month / category amounts).
        register_path: Register CSV (transactions).
        budget_name: Name stored in the output; also seeds the budget ID.

    Returns:
        ``{"data": {"budget": {...}, "server_knowledge": 0}}`` containing
        accounts, payees, category groups, categories, months and
        transactions. All IDs are deterministic (derived from names and row
        contents), account balances are left at 0, and the split / scheduled
        transaction lists are empty.

    Notes:
        * Rows without a payee get ``payee_id: None``; no payee entry is
          created for a blank name.
        * Transfer rows never carry a category.
        * NOTE(review): month ``income`` / ``activity`` are matched on the
          first seven characters of the register ``Date``; this presumably
          requires ISO (``YYYY-MM-DD``) dates in the export — confirm.
    """
    plan_path = Path(plan_path)
    register_path = Path(register_path)

    month_for_key, plan_by_month = load_plan(plan_path)
    reg_cats = collect_category_keys_from_register(register_path)
    # The register is read once here and reused for accounts, payees, transfer
    # pairing and transactions.
    raw_rows = _read_register_rows(register_path)

    # Only fully named (group, category) pairs become categories.
    all_keys: set[tuple[str, str]] = {
        (g, c) for g, c in set(month_for_key) | reg_cats if g and c
    }

    group_names = sorted({g for g, _ in all_keys})
    group_meta: dict[str, dict[str, Any]] = {
        g: {"id": _uid("category_group", g), "name": g, "hidden": False, "deleted": False}
        for g in group_names
    }

    # Inserted in sorted order, so iterating cat_ids is already sorted by (group, category).
    cat_ids: dict[tuple[str, str], str] = {
        (g, c): _uid("category", g, c) for g, c in sorted(all_keys)
    }

    # --- Accounts, in order of first appearance in the register ---------------
    account_names: list[str] = []
    seen_acc: set[str] = set()
    for row in raw_rows:
        name = (row.get("Account") or "").strip()
        if name and name not in seen_acc:
            seen_acc.add(name)
            account_names.append(name)

    accounts: list[dict[str, Any]] = []
    account_by_name: dict[str, dict[str, Any]] = {}
    for name in account_names:
        atype = guess_account_type(name)
        acct = {
            "id": _uid("account", name),
            "name": name,
            "type": atype,
            "on_budget": guess_on_budget(atype),
            "closed": False,
            "note": None,
            "balance": 0,
            "cleared_balance": 0,
            "uncleared_balance": 0,
            "transfer_payee_id": _uid("transfer_payee", name),
            "direct_import_linked": False,
            "direct_import_in_error": False,
            "last_reconciled_at": None,
            "debt_original_balance": None,
            "debt_interest_rates": {},
            "debt_minimum_payments": {},
            "debt_escrow_amounts": {},
            "deleted": False,
        }
        accounts.append(acct)
        account_by_name[name] = acct

    # --- Payees: one transfer payee per account, then regular payees ----------
    payees: list[dict[str, Any]] = []
    payee_by_name: dict[str, str] = {}

    for name in account_names:
        acct = account_by_name[name]
        pid = acct["transfer_payee_id"]
        payees.append(
            {
                "id": pid,
                "name": f"Transfer : {name}",
                "transfer_account_id": acct["id"],
                "deleted": False,
            }
        )
        payee_by_name[f"Transfer : {name}"] = pid

    def ensure_payee(pname: str) -> str:
        """Return the ID of the regular payee *pname*, registering it if new."""
        pid = payee_by_name.get(pname)
        if not pid:
            pid = _uid("payee", pname)
            payees.append({"id": pid, "name": pname, "transfer_account_id": None, "deleted": False})
            payee_by_name[pname] = pid
        return pid

    payee_names = {
        p
        for row in raw_rows
        if (p := (row.get("Payee") or "").strip()) and transfer_target_name(p) is None
    }
    for pname in sorted(payee_names):
        ensure_payee(pname)

    # --- Months and categories ------------------------------------------------
    all_months = sorted(plan_by_month)
    if all_months:
        first_m, last_m = all_months[0], all_months[-1]
    else:
        first_m = last_m = datetime.now(UTC).strftime("%Y-%m-01")

    budget_id = _uid("budget", budget_name)

    zero_cell = {"budgeted": 0, "activity": 0, "balance": 0}

    def categories_for(cells: dict[tuple[str, str], dict[str, int]]) -> list[dict[str, Any]]:
        """Category dicts for every known category, using *cells* for the amounts."""
        layer: list[dict[str, Any]] = []
        for (g, c), cid in cat_ids.items():
            nums = cells.get((g, c), zero_cell)
            layer.append(
                empty_category_template(
                    cid,
                    group_meta[g]["id"],
                    c,
                    g == "Hidden Categories",
                    nums["budgeted"],
                    nums["activity"],
                    nums["balance"],
                )
            )
        return layer

    # Root-level categories carry the amounts of the latest plan month.
    root_categories = categories_for(plan_by_month.get(last_m, {}))

    income_by_month, activity_register_by_month = _scan_register_metrics(register_path)

    months_out: list[dict[str, Any]] = []
    for month_iso in sorted(plan_by_month, reverse=True):
        cats_layer = categories_for(plan_by_month[month_iso])
        ym = month_iso[:7]
        months_out.append(
            {
                "month": month_iso,
                "note": "",
                "income": income_by_month.get(ym, 0),
                "budgeted": sum(cat["budgeted"] for cat in cats_layer),
                # API: month.activity = transaction totals excluding RTA inflow (not Plan rollups).
                "activity": activity_register_by_month.get(ym, 0),
                "to_be_budgeted": 0,
                "age_of_money": None,
                "deleted": False,
                "categories": cats_layer,
            }
        )

    # --- Transactions ---------------------------------------------------------
    pair_id = _pair_transfer_rows(raw_rows)
    valid_flags = {"red", "orange", "yellow", "green", "blue", "purple"}

    transactions: list[dict[str, Any]] = []
    for row in raw_rows:
        acc_name = (row.get("Account") or "").strip()
        acct = account_by_name.get(acc_name)
        if not acct:
            continue
        amount = _register_row_amount(row)
        payee = (row.get("Payee") or "").strip()
        memo = (row.get("Memo") or "").strip() or None
        date_s = (row.get("Date") or "").strip()
        g = (row.get("Category Group") or "").strip()
        c = (row.get("Category") or "").strip()

        cat_id: str | None = None
        xfer_account: str | None = None
        payee_id: str | None = None
        tt = transfer_target_name(payee)
        if tt is not None:
            # Transfers never carry a category.
            target = account_by_name.get(tt)
            if target:
                payee_id = target["transfer_payee_id"]
                xfer_account = target["id"]
            else:
                # Counterpart account is not in the register: keep the payee
                # text as a plain payee.
                payee_id = ensure_payee(payee)
        else:
            if payee:
                payee_id = ensure_payee(payee)
            # cat_ids only holds pairs where both names are non-empty.
            cat_id = cat_ids.get((g, c))

        transfer_transaction_id: str | None = None
        if row["row_index"] in pair_id:
            tx_id, transfer_transaction_id = pair_id[row["row_index"]]
        else:
            tx_id = f"{_uid('tx', acc_name, date_s, str(amount), str(row['row_index']))}_{date_s}"

        flag = (row.get("Flag") or "").strip().lower() or None
        if flag not in valid_flags:
            flag = None

        transactions.append(
            {
                "id": tx_id,
                "date": date_s,
                "amount": amount,
                "memo": memo,
                # ``or ""``: DictReader yields None for cells missing from short rows.
                "cleared": cleared_status(row.get("Cleared") or ""),
                "approved": True,
                "flag_color": flag,
                "account_id": acct["id"],
                "payee_id": payee_id,
                "category_id": cat_id,
                "transfer_account_id": xfer_account,
                "transfer_transaction_id": transfer_transaction_id,
                "matched_transaction_id": None,
                "import_id": None,
                "import_payee_name": None,
                "import_payee_name_original": None,
                "debt_transaction_type": None,
                "deleted": False,
            }
        )

    # Newest first; the ID breaks ties so the order is stable across runs.
    transactions.sort(key=lambda t: (t["date"], t["id"]), reverse=True)

    now = datetime.now(UTC).strftime("%Y-%m-%dT%H:%M:%SZ")

    budget: dict[str, Any] = {
        "id": budget_id,
        "name": budget_name,
        "last_modified_on": now,
        "date_format": {"format": "YYYY-MM-DD"},
        "currency_format": {
            "iso_code": "USD",
            "example_format": "123,456.78",
            "decimal_digits": 2,
            "decimal_separator": ".",
            "symbol_first": True,
            "group_separator": ",",
            "currency_symbol": "$",
            "display_symbol": True,
        },
        "first_month": first_m,
        "last_month": last_m,
        "accounts": accounts,
        "payees": payees,
        "payee_locations": [],
        "category_groups": [group_meta[g] for g in group_names],
        "categories": root_categories,
        "months": months_out,
        "transactions": transactions,
        "subtransactions": [],
        "scheduled_transactions": [],
        "scheduled_subtransactions": [],
    }

    return {"data": {"budget": budget, "server_knowledge": 0}}
|
|
|
|
|
|
def _budget_name_from_plan_path(plan_path: Path) -> str:
    """Derive a budget name from the Plan CSV file name.

    Everything from " as of " onward is dropped, then a trailing
    "- Plan.csv" (or a file name that is just "Plan.csv"). Falls back to
    ``"Budget"`` when nothing is left, e.g.
    ``"My Budget as of 2024-03-01 - Plan.csv"`` -> ``"My Budget"`` and
    ``"Plan.csv"`` -> ``"Budget"``.
    """
    base = plan_path.name
    name = re.sub(r"\s+as of\s+.*$", "", base, flags=re.I)
    # ``^`` alternative: a bare "Plan.csv" carries no budget name at all.
    name = re.sub(r"(?:^|\s*-\s*)Plan\.csv$", "", name, flags=re.I).strip()
    return name or "Budget"
|
|
|
|
|
|
def default_output_path(*, plan_path: Path | None, zip_path: Path | None) -> Path:
    """Pick the default JSON output path next to the input.

    With a zip, the zip's suffix is replaced by ``.json``. Otherwise the Plan
    CSV's file name is used with a trailing ' - Plan.csv' / ' - Register.csv'
    removed (falling back to the file's stem if that leaves nothing) and
    ``.json`` appended, in the same directory.

    Raises:
        ValueError: if both *zip_path* and *plan_path* are ``None``.
    """
    if zip_path is not None:
        return zip_path.with_suffix(".json")
    if plan_path is None:
        raise ValueError("default_output_path requires zip_path or plan_path")

    stem = plan_path.name
    for pattern in (r"(?i)\s*-\s*Plan\.csv\s*$", r"(?i)\s*-\s*Register\.csv\s*$"):
        stem = re.sub(pattern, "", stem)
    stem = stem.strip() or plan_path.stem
    return plan_path.with_name(f"{stem}.json")
|
|
|
|
|
|
def _budget_name_from_zip_path(zip_path: Path) -> str:
    """Derive a budget name from the export zip's file name.

    Starting from the name without its suffix, a leading "YNAB Export - " and
    everything from " as of " onward are removed (case-insensitively).
    ``"Budget"`` is returned when nothing remains.
    """
    name = zip_path.stem
    for pattern in (r"^\s*YNAB\s+Export\s*-\s*", r"\s+as of\s+.*$"):
        name = re.sub(pattern, "", name, flags=re.I).strip()
    return name or "Budget"
|
|
|
|
|
|
def main() -> None:
    """Command-line entry point: parse arguments, convert, and write the JSON file.

    Input is either ``--zip`` or both ``--plan`` and ``--register``; mixing
    the two modes or giving an incomplete CSV pair is a usage error. When
    ``--output`` / ``--budget-name`` are omitted they are derived from the
    zip or Plan file name. The output's parent directory is created if needed.
    """
    parser = argparse.ArgumentParser(
        description="Convert YNAB Plan.csv + Register.csv to JSON aligned with the YNAB API plan export shape."
    )
    parser.add_argument(
        "-z",
        "--zip",
        type=Path,
        help="Zip export containing *Plan.csv and *Register.csv (e.g. YNAB Export - ….zip)",
    )
    parser.add_argument(
        "-p",
        "--plan",
        type=Path,
        help="Path to Plan.csv (requires --register; mutually exclusive with --zip)",
    )
    parser.add_argument(
        "-r",
        "--register",
        type=Path,
        help="Path to Register.csv (requires --plan; mutually exclusive with --zip)",
    )
    parser.add_argument(
        "-o",
        "--output",
        type=Path,
        default=None,
        help="Output .json path (default: derived from Plan filename or zip name)",
    )
    parser.add_argument(
        "-n",
        "--budget-name",
        default=None,
        help="Budget name in JSON (default: derived from Plan filename or zip name)",
    )
    args = parser.parse_args()

    zip_mode = args.zip is not None
    any_csv_given = not (args.plan is None and args.register is None)
    if zip_mode and any_csv_given:
        parser.error("Use either --zip or --plan/--register together, not both.")
    if not zip_mode and (args.plan is None or args.register is None):
        parser.error("Provide --zip, or both --plan and --register.")

    destination = args.output
    if destination is None:
        destination = default_output_path(
            plan_path=None if zip_mode else args.plan,
            zip_path=args.zip if zip_mode else None,
        )

    if zip_mode:
        if not args.zip.is_file():
            parser.error(f"Not a file: {args.zip}")
        plan_csv, register_csv, workdir = extract_zip(args.zip)
        # Leaving the ``with`` block removes the temp directory, also on error.
        with workdir:
            budget_name = args.budget_name or _budget_name_from_zip_path(args.zip)
            result = build_budget(plan_csv, register_csv, budget_name)
    else:
        for candidate in (args.plan, args.register):
            if not candidate.is_file():
                parser.error(f"Not a file: {candidate}")
        budget_name = args.budget_name or _budget_name_from_plan_path(args.plan)
        result = build_budget(args.plan, args.register, budget_name)

    destination.parent.mkdir(parents=True, exist_ok=True)
    with open(destination, "w", encoding="utf-8") as sink:
        json.dump(result, sink, indent=2, ensure_ascii=False)
        sink.write("\n")
|
|
|
|
|
|
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
|