Created
October 7, 2025 14:33
-
-
Save matthewdavis/3ee92e700059e6f589dba4802ed14a45 to your computer and use it in GitHub Desktop.
Python script to compare Crew banking statements against a CSV exported from Monarch Money.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """CLI for reconciling Crew bank statements against a Monarch Money ledger CSV.""" | |
| import argparse | |
| import csv | |
| import os | |
| import re | |
| import subprocess | |
| from collections import defaultdict | |
| from dataclasses import dataclass, replace | |
| from datetime import datetime, date | |
| from typing import DefaultDict, Dict, List, Optional, Tuple | |
# Matches a leading ISO date (YYYY-MM-DD) that begins a transaction row.
DATE_RE = re.compile(r"^\s*(\d{4}-\d{2}-\d{2})\b")
# Matches a dollar amount with two decimals, e.g. "$1,234.56" or "12.00".
RIGHT_ALIGNED_AMOUNT_RE = re.compile(r"\$?([0-9]{1,3}(?:,[0-9]{3})*|[0-9]+)\.[0-9]{2}")
def normalize_vendor(value: str) -> str:
    """Uppercase, replace punctuation with spaces, and collapse whitespace.

    Produces the canonical key used for fuzzy merchant/category matching.
    """
    upper = value.upper()
    no_punct = re.sub(r"[^\w\s]", " ", upper)
    return re.sub(r"\s+", " ", no_punct).strip()
def lookup_hint(text: str, hints: Dict[str, str]) -> Optional[str]:
    """Return the hint whose key best matches *text* by containment.

    Either direction of containment counts (key in text or text in key);
    among matches, the longest key wins, first-seen breaking ties.
    """
    norm = normalize_vendor(text)
    if not norm:
        return None
    candidates = [key for key in hints if key and (key in norm or norm in key)]
    if not candidates:
        return None
    # max() keeps the first candidate with the greatest length, matching the
    # original strict-greater scan over dict insertion order.
    return hints[max(candidates, key=len)]
@dataclass
class Columns:
    """Character offsets of each column header in a `pdftotext -layout` table.

    Offsets are where "Date", "Description", "Deposits", "Withdrawals" and
    "Balance" start in the header row; rows are sliced at the same positions.
    """

    date_start: int
    desc_start: int
    deposits_start: int
    withdrawals_start: int
    balance_start: int

    def slices(self, line: str) -> Tuple[str, str, str, str, str]:
        """Split a row into (date, description, deposits, withdrawals, balance).

        Amounts are right-aligned under their headers, so each amount slice
        starts 4 characters before the header offset (clamped to the previous
        column) to catch digits that spill left of the column start.
        """
        date_slice = line[self.date_start:self.desc_start].strip()
        desc_slice = line[self.desc_start:self.deposits_start].rstrip()
        dep_start = max(self.deposits_start - 4, self.desc_start)
        wdr_start = max(self.withdrawals_start - 4, self.deposits_start)
        deposits_slice = line[dep_start:self.withdrawals_start].strip()
        withdrawals_slice = line[wdr_start:self.balance_start].strip()
        # BUGFIX: this slice previously started at withdrawals_start, which
        # pulled the entire withdrawals column into the balance slice; use the
        # same 4-char left margin as the other amount columns.
        bal_start = max(self.balance_start - 4, self.withdrawals_start)
        balance_slice = line[bal_start:].strip()
        return date_slice, desc_slice, deposits_slice, withdrawals_slice, balance_slice
@dataclass
class Txn:
    """A single transaction from either a bank statement or the ledger CSV."""

    date: str  # ISO date string, YYYY-MM-DD
    amount: float  # signed: positive for deposits, negative for withdrawals
    desc: str  # free-text description / original statement text
    extra: dict  # full source row (statement dict or ledger CSV row)
    matched: bool = False  # set by reconcile_fuzzy() when paired
def parse_date(d: str) -> date:
    """Parse a YYYY-MM-DD string into a date (raises ValueError otherwise)."""
    parsed = datetime.strptime(d, "%Y-%m-%d")
    return parsed.date()
def month_key(d: str) -> str:
    """Return the YYYY-MM month bucket of an ISO date string."""
    return d[:7]
def build_month_filter(month: Optional[str], months: Optional[str]) -> Optional[set[str]]:
    """Combine --month and --months values into a set of YYYY-MM keys.

    Returns None when no filter was requested (or nothing usable was given),
    which callers treat as "no month restriction".
    """
    selected: set[str] = set()
    if month:
        selected.add(month.strip())
    if months:
        selected.update(part.strip() for part in months.split(',') if part.strip())
    return selected or None
def in_date_window(
    d: str,
    months: Optional[set[str]] = None,
    start: Optional[str] = None,
    end: Optional[str] = None,
) -> bool:
    """Return True if ISO date *d* passes the month / start / end filters.

    An empty or None *months* set means no month restriction; start and end
    are inclusive YYYY-MM-DD bounds.
    """
    has_months = bool(months)
    if not has_months and start is None and end is None:
        return True
    day = parse_date(d)
    if has_months and month_key(d) not in months:
        return False
    if start is not None and day < parse_date(start):
        return False
    return end is None or day <= parse_date(end)
def pdftotext_layout(pdf_path: str) -> List[str]:
    """Run `pdftotext -layout` on the PDF and return stdout as lines.

    Raises subprocess.CalledProcessError if pdftotext exits non-zero.
    """
    completed = subprocess.run(
        ["pdftotext", "-layout", pdf_path, "-"],
        capture_output=True,
        text=True,
        check=True,
    )
    return completed.stdout.splitlines()
def extract_columns(header_line: str) -> Optional[Columns]:
    """Locate the five column offsets in a statement header row.

    Returns None if any expected header label is missing.
    """
    offsets = []
    for label in ("Date", "Description", "Deposits", "Withdrawals", "Balance"):
        position = header_line.find(label)
        if position < 0:
            return None
        offsets.append(position)
    return Columns(*offsets)
def parse_right_aligned_amount(s: str) -> Optional[float]:
    """Parse the right-most dollar amount in a column slice, if any.

    Returns None when no amount-shaped token is present or it fails to parse.
    """
    last = None
    for match in RIGHT_ALIGNED_AMOUNT_RE.finditer(s):
        last = match  # keep only the final (right-most) match
    if last is None:
        return None
    digits = last.group(0).lstrip('$').replace(',', '')
    try:
        return float(digits)
    except ValueError:
        return None
def clean_spaces(s: str) -> str:
    """Collapse all interior whitespace runs to single spaces and trim ends."""
    return " ".join(s.split())
def extract_transactions_from_lines(lines: List[str], source_pdf: str) -> List[dict]:
    """Parse `pdftotext -layout` output into transaction dicts.

    Walks the lines as a small state machine:
      1. Skip everything until a line reading exactly "Transactions".
      2. Learn column offsets from each header row ("Date ... Balance").
      3. Treat each line starting with an ISO date as a new transaction;
         subsequent non-date lines are description continuations.
      4. Stop at the "In case of errors" legal footer.

    Rows without a parsable amount are dropped from the result.
    """
    in_transactions = False
    cols: Optional[Columns] = None
    rows: List[dict] = []
    current: Optional[dict] = None  # txn being accumulated across wrapped lines
    for raw in lines:
        line = raw.rstrip("\n")
        if not in_transactions:
            # Ignore everything before the "Transactions" section marker.
            if line.strip() == "Transactions":
                in_transactions = True
            continue
        if "Date" in line and "Description" in line and "Balance" in line:
            # Header row (repeated on every page) — (re)learn column offsets.
            maybe = extract_columns(line)
            if maybe:
                cols = maybe
            continue
        if line.strip().startswith("In case of errors"):
            # Legal footer marks the end of the transaction table.
            break
        if "Account offered by" in line and "Member FDIC" in line:
            continue  # page-footer boilerplate
        if "— Account #" in line or "Account Statement" in line:
            continue  # page-header boilerplate
        if cols is None:
            # Cannot slice columns until a header row has been seen.
            continue
        date_match = DATE_RE.match(line)
        if date_match:
            # New transaction row: flush the previous one first.
            if current:
                rows.append(current)
                current = None
            date_str, desc, deposits, withdrawals, _ = cols.slices(line)
            # Re-slice the amount columns with a wider (8-char) left margin
            # than Columns.slices uses, to catch long right-aligned amounts.
            margin = 8
            dep_l = max(cols.deposits_start - margin, cols.desc_start)
            wdr_l = max(cols.withdrawals_start - margin, cols.deposits_start)
            deposits_tight = line[dep_l:cols.withdrawals_start]
            withdrawals_tight = line[wdr_l:cols.balance_start]
            dep_amt = parse_right_aligned_amount(deposits_tight)
            wdr_amt = parse_right_aligned_amount(withdrawals_tight)
            is_deposit = dep_amt is not None
            is_withdrawal = wdr_amt is not None
            amount: Optional[float] = None
            tx_type: Optional[str] = None
            if is_deposit and not is_withdrawal:
                amount = dep_amt
                tx_type = "deposit"
            elif is_withdrawal and not is_deposit:
                # Withdrawals are stored as negative amounts.
                amount = -abs(wdr_amt)
                tx_type = "withdrawal"
            elif is_deposit and is_withdrawal:
                # Both overlapping slices matched; keep the larger magnitude on
                # the assumption the smaller value is column bleed.
                dep_v = dep_amt if dep_amt is not None else 0.0
                wdr_v = wdr_amt if wdr_amt is not None else 0.0
                if abs(dep_v) >= abs(wdr_v):
                    amount = dep_v
                    tx_type = "deposit"
                else:
                    amount = -abs(wdr_v)
                    tx_type = "withdrawal"
            else:
                # No full amount parsed; a bare "$" in the deposit column is
                # treated as a zero-dollar deposit.
                if deposits_tight.strip().startswith("$"):
                    amount = 0.0
                    tx_type = "deposit"
            desc_clean = clean_spaces(desc)
            # Strip a stray trailing "$" (optionally followed by one digit)
            # left over from amount-column bleed. NOTE(review): the doubled
            # "$" end-anchors are redundant but harmless, since the collapsed
            # description contains no newlines.
            desc_clean = re.sub(r"\s*\$\d?$\s*$", "", desc_clean)
            current = {
                "date": date_str.strip(),
                "description": desc_clean,
                "amount": amount,
                "type": tx_type,
                "balance": None,  # balance column is sliced but never stored
                "source_pdf": os.path.basename(source_pdf),
            }
            continue
        if current is not None:
            # Continuation line: append extra description text to the open txn.
            padded = line + " " * max(0, cols.balance_start - len(line))
            _, desc_cont, _, _, _ = cols.slices(padded)
            desc_cont = clean_spaces(desc_cont)
            if desc_cont:
                current["description"] = clean_spaces(f"{current['description']} {desc_cont}")
            continue
    if current:
        rows.append(current)
    # Drop rows where no amount could be parsed (e.g. stray table lines).
    return [r for r in rows if r.get("amount") is not None]
def extract_pdf(pdf_path: str) -> List[dict]:
    """Extract transaction rows from one statement PDF via pdftotext."""
    return extract_transactions_from_lines(pdftotext_layout(pdf_path), pdf_path)
def read_extracted(path: str) -> List[Txn]:
    """Load transactions from a previously extracted statement CSV.

    Expects at least `date` and `amount` columns; `description` is optional.
    """
    txns: List[Txn] = []
    with open(path, newline="") as handle:
        for record in csv.DictReader(handle):
            txns.append(
                Txn(
                    date=record["date"].strip(),
                    amount=float(record["amount"]),
                    desc=record.get("description", "").strip(),
                    extra=record,
                )
            )
    return txns
def read_user_csv(path: str) -> List[Txn]:
    """Load transactions from the Monarch Money ledger CSV export.

    Uses "Original Statement" as the description, falling back to "Merchant".
    """
    txns: List[Txn] = []
    with open(path, newline="") as handle:
        for record in csv.DictReader(handle):
            description = (record.get("Original Statement") or record.get("Merchant") or "").strip()
            txns.append(
                Txn(
                    date=record["Date"].strip(),
                    amount=float(record["Amount"]),
                    desc=description,
                    extra=record,
                )
            )
    return txns
def build_merchant_category_hints(ledger: List[Txn]) -> Tuple[Dict[str, str], Dict[str, str]]:
    """Derive vendor-key lookup tables from the ledger.

    Returns (merchant_hints, category_hints): normalized vendor key -> the
    canonical merchant name, and vendor key -> the most frequent category.
    """
    merchant_hints: Dict[str, str] = {}
    category_votes: Dict[str, DefaultDict[str, int]] = {}
    for txn in ledger:
        merchant = txn.extra.get("Merchant", "").strip()
        original = txn.extra.get("Original Statement", "").strip()
        category = txn.extra.get("Category", "").strip()
        for candidate in (merchant, original):
            if not candidate:
                continue
            key = normalize_vendor(candidate)
            if not key:
                continue
            # First occurrence wins; prefer the Merchant column when present.
            if key not in merchant_hints:
                merchant_hints[key] = merchant or candidate
            if category:
                category_votes.setdefault(key, defaultdict(int))[category] += 1
    category_hints: Dict[str, str] = {
        key: max(votes.items(), key=lambda kv: kv[1])[0]
        for key, votes in category_votes.items()
        if votes
    }
    return merchant_hints, category_hints
def reconcile_fuzzy(
    statements: List[Txn],
    ledger: List[Txn],
    date_window_days: int = 2,
    amount_tolerance: float = 0.01,
) -> Tuple[List[Tuple[Txn, Txn, int, float]], List[Txn], List[Txn]]:
    """Greedily pair statement txns with ledger txns by amount and date.

    A candidate must have the same sign, an amount within
    *amount_tolerance*, and a date within *date_window_days*; the closest
    (by days, then amount diff, then position) wins. Mutates the `matched`
    flag on paired Txn objects on BOTH sides.

    Returns (matched pairs with day/amount deltas, unmatched statement txns,
    unmatched ledger txns).
    """
    candidates: List[Txn] = list(ledger)
    matched: List[Tuple[Txn, Txn, int, float]] = []
    for stmt in statements:
        stmt_day = parse_date(stmt.date)
        best_key: Optional[Tuple[int, float, int]] = None
        for idx, cand in enumerate(candidates):
            if cand.matched:
                continue
            if (stmt.amount >= 0) != (cand.amount >= 0):
                continue  # never match a deposit against a withdrawal
            diff = abs(stmt.amount - cand.amount)
            if diff > amount_tolerance:
                continue
            delta = abs((parse_date(cand.date) - stmt_day).days)
            if delta > date_window_days:
                continue
            key = (delta, diff, idx)
            if best_key is None or key < best_key:
                best_key = key
        if best_key is not None:
            winner = candidates[best_key[2]]
            winner.matched = True
            stmt.matched = True
            matched.append((stmt, winner, best_key[0], best_key[1]))
    leftover_statements = [t for t in statements if not t.matched]
    leftover_ledger = [t for t in candidates if not t.matched]
    return matched, leftover_statements, leftover_ledger
def write_report_detailed(
    matched: List[Tuple[Txn, Txn, int, float]],
    unmatched_statements: List[Txn],
    unmatched_ledger: List[Txn],
    out_dir: str,
    filename: str = "report.csv",
) -> str:
    """Write the combined reconciliation report CSV and return its path.

    Emits one row per matched pair, then one per unmatched statement txn,
    then one per unmatched ledger txn. Statement columns are prefixed s_*,
    ledger (Monarch CSV) columns c_*.
    """
    os.makedirs(out_dir, exist_ok=True)
    out_path = os.path.join(out_dir, filename)

    def ledger_columns(txn: Txn) -> List[str]:
        # The six c_* detail columns, pulled straight from the ledger row.
        keys = ("Merchant", "Category", "Original Statement", "Account", "Notes", "Tags")
        return [txn.extra.get(k, "") for k in keys]

    header = [
        "status",
        "s_date",
        "s_amount",
        "s_description",
        "s_source_pdf",
        "c_date",
        "c_amount",
        "c_merchant",
        "c_category",
        "c_original_statement",
        "c_account",
        "c_notes",
        "c_tags",
        "date_delta_days",
        "amount_diff",
    ]
    with open(out_path, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(header)
        for stmt, ledg, days, amt_diff in matched:
            writer.writerow([
                "matched",
                stmt.date,
                f"{stmt.amount:.2f}",
                stmt.desc,
                stmt.extra.get("source_pdf", ""),
                ledg.date,
                f"{ledg.amount:.2f}",
                *ledger_columns(ledg),
                days,
                f"{amt_diff:.2f}",
            ])
        for stmt in unmatched_statements:
            writer.writerow([
                "unmatched_statement",
                stmt.date,
                f"{stmt.amount:.2f}",
                stmt.desc,
                stmt.extra.get("source_pdf", ""),
                *([""] * 10),  # no ledger side, no deltas
            ])
        for ledg in unmatched_ledger:
            writer.writerow([
                "unmatched_csv",
                "",
                "",
                "",
                "",
                ledg.date,
                f"{ledg.amount:.2f}",
                *ledger_columns(ledg),
                "",
                "",
            ])
    return out_path
def guess_merchant(desc: str, merchant_hints: Dict[str, str]) -> str:
    """Best-effort merchant name for a raw statement description.

    Tries a direct hint lookup, then ACH/book-transfer counterparty
    extraction, then a cleaned-up prefix of the description.
    """
    text = desc.strip()
    if not text:
        return ""
    direct = lookup_hint(text, merchant_hints)
    if direct:
        return direct
    # Pull the counterparty out of ACH / book-transfer boilerplate.
    for pattern in (
        r"^INCOMING ACH\s+(?:CREDIT|DEBIT|DEPOSIT)\s+([^\[,]+)",
        r"^(?:DEPOSIT|WITHDRAWAL) for book transfer (?:from|to)\s+([^,]+)",
    ):
        found = re.match(pattern, text, re.IGNORECASE)
        if found:
            captured = found.group(1).strip()
            return lookup_hint(captured, merchant_hints) or captured
    base = text.split(' - ', 1)[0] if ' - ' in text else text
    tokens = base.split()
    # Drop trailing long reference numbers (6+ digits).
    while tokens and tokens[-1].isdigit() and len(tokens[-1]) >= 6:
        tokens.pop()
    base = ' '.join(tokens).strip()
    return lookup_hint(base, merchant_hints) or base
def guess_category(merchant: str, desc: str, category_hints: Dict[str, str]) -> str:
    """Best-effort category for a transaction.

    Tries ledger-derived hints first (merchant, then description), then a
    keyword cascade over the uppercased texts. Returns '' when nothing fits.
    """
    for candidate in (merchant, desc):
        hinted = lookup_hint(candidate, category_hints)
        if hinted:
            return hinted
    s = desc.upper()
    m = merchant.upper()
    # Ordered rules: the first category whose keyword appears in either the
    # description or the merchant wins.
    keyword_rules = [
        ('Transfer', ('TRANSFER', 'ACH', 'WIRE', 'ZELLE', 'VENMO', 'CASH APP')),
        ('Paychecks', ('PAYROLL', 'PAYCHECK', 'SALARY', 'DIRECT DEPOSIT')),
        ('Credit Card Payment', ('CARDMEMBER', 'CREDIT CARD PAYMENT', 'CARD PAYMENT', 'AMERICAN EXPRESS', 'AMEX', 'VISA PAYMENT')),
        ('Utilities', ('UTILITY', 'UTILITIES', 'ENERGY', 'ELECTRIC', 'POWER', 'GAS', 'WATER', 'SEWER')),
        ('Phone', ('PHONE', 'CELL', 'MOBILE', 'WIRELESS')),
        ('TV & Internet', ('INTERNET', 'STREAM', 'CABLE', 'BROADBAND')),
        ('Insurance', ('INSURANCE', 'PREMIUM')),
        ('Mortgage', ('MORTGAGE',)),
        ('Check', ('CHECK',)),
        ('Subscriptions', ('SUBSCRIPT', 'MEMBERSHIP')),
        ('Web Hosting', ('HOSTING', 'SERVER', 'VPS')),
        ('Entertainment & Recreation', ('ENTERTAINMENT', 'THEATER', 'CINEMA', 'GAME', 'GAMING')),
        ('Shopping', ('STORE', 'MARKET', 'SHOP', 'MART', 'GROCERY')),
        ('Donations', ('DONATION', 'CHURCH', 'TITHE')),
    ]
    for category, keywords in keyword_rules:
        if any(word in s or word in m for word in keywords):
            return category
    return ''
def write_unmatched_statements_csv(
    rows: List[Txn],
    path: str,
    merchant_hints: Dict[str, str],
    category_hints: Dict[str, str],
) -> str:
    """Write unmatched statement transactions as a Monarch-importable CSV.

    Merchant and Category are best-effort guesses derived from the ledger
    hints and should be reviewed before import. Returns the output path.
    """
    parent = os.path.dirname(path)
    # BUGFIX: os.makedirs("") raises FileNotFoundError; only create the parent
    # directory when the path actually has a directory component.
    if parent:
        os.makedirs(parent, exist_ok=True)
    fieldnames = [
        "Date",
        "Merchant",
        "Category",
        "Account",
        "Original Statement",
        "Notes",
        "Amount",
        "Tags",
    ]
    with open(path, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        for txn in rows:
            merchant = guess_merchant(txn.desc, merchant_hints)
            category = guess_category(merchant, txn.desc, category_hints)
            writer.writerow({
                "Date": txn.date,
                "Merchant": merchant,
                "Category": category,
                "Account": "Bank Account",  # fixed placeholder account label
                "Original Statement": txn.desc,
                "Notes": "",
                "Amount": f"{txn.amount:.2f}",
                "Tags": "",
            })
    return path
def cmd_reconcile(args: argparse.Namespace) -> None:
    """Run the full reconciliation workflow from parsed CLI arguments.

    Loads statement transactions (PDFs and/or extracted CSVs), loads the
    ledger CSV, fuzzy-matches the two sets, and writes report CSVs into
    ``args.out_dir`` (plus per-month reports when --per-month is set).
    """
    selected_months = build_month_filter(args.month, args.months)

    def load_statements_from_inputs(inputs: List[str]) -> List[Txn]:
        # Accepts files or directories; directories are scanned one level deep
        # and non-CSV/PDF entries inside them are silently skipped.
        txns: List[Txn] = []

        def load_file(path: str) -> None:
            lower = path.lower()
            if lower.endswith('.csv'):
                txns.extend(read_extracted(path))
            elif lower.endswith('.pdf'):
                for row in extract_pdf(path):
                    txns.append(Txn(date=row['date'], amount=float(row['amount']), desc=row.get('description', ''), extra=row))

        for inp in inputs:
            if os.path.isdir(inp):
                for name in sorted(os.listdir(inp)):
                    load_file(os.path.join(inp, name))
            elif inp.lower().endswith(('.csv', '.pdf')):
                load_file(inp)
            else:
                raise SystemExit(f"Unsupported --statements input: {inp}")
        return [t for t in txns if in_date_window(t.date, selected_months, args.start, args.end)]

    # Statements are already date-filtered by the loader (the original code
    # filtered them a second time, redundantly).
    statements = load_statements_from_inputs(args.statements)
    ledger_all = read_user_csv(args.ledger)
    # Hints come from the FULL ledger so merchant/category guesses benefit
    # from history outside the reconciliation window.
    merchant_hints, category_hints = build_merchant_category_hints(ledger_all)
    ledger = [t for t in ledger_all if in_date_window(t.date, selected_months, args.start, args.end)]
    matched, unmatched_statements, unmatched_ledger = reconcile_fuzzy(
        statements,
        ledger,
        date_window_days=args.date_window_days,
        amount_tolerance=args.amount_tolerance,
    )
    report_path = write_report_detailed(matched, unmatched_statements, unmatched_ledger, args.out_dir, filename="report.csv")
    print(f"Matched: {len(matched)}")
    print(f"Unmatched in statements: {len(unmatched_statements)}")
    print(f"Unmatched in csv: {len(unmatched_ledger)}")
    print(f"Report -> {report_path}")
    unmatched_export = write_unmatched_statements_csv(
        unmatched_statements,
        os.path.join(args.out_dir, "unmatched_statements.csv"),
        merchant_hints,
        category_hints,
    )
    print(f"Unmatched statements (Monarch import) -> {unmatched_export}")
    if args.per_month:
        by_month_statements: Dict[str, List[Txn]] = defaultdict(list)
        by_month_ledger: Dict[str, List[Txn]] = defaultdict(list)
        # BUGFIX: reset `matched` on the per-month copies. The global pass
        # above set matched=True on paired txns; plain replace(txn) copied
        # those stale flags, making reconcile_fuzzy skip already-matched
        # ledger rows so monthly reports showed ~0 matches.
        for txn in statements:
            by_month_statements[month_key(txn.date)].append(replace(txn, matched=False))
        for txn in ledger:
            by_month_ledger[month_key(txn.date)].append(replace(txn, matched=False))
        months = sorted(set(by_month_statements.keys()) | set(by_month_ledger.keys()))
        if selected_months:
            months = [m for m in months if m in selected_months]
        summary_path = os.path.join(args.out_dir, "monthly_summary.csv")
        with open(summary_path, "w", newline="") as f:
            writer = csv.writer(f)
            writer.writerow(["month", "matched", "unmatched_statement", "unmatched_csv", "report_path"])
            for month in months:
                stmts = by_month_statements.get(month, [])
                ledg = by_month_ledger.get(month, [])
                matched_m, um_s_m, um_l_m = reconcile_fuzzy(
                    stmts,
                    ledg,
                    args.date_window_days,
                    args.amount_tolerance,
                )
                month_dir = os.path.join(args.out_dir, month)
                month_report = write_report_detailed(matched_m, um_s_m, um_l_m, month_dir, filename=f"{month}-report.csv")
                writer.writerow([month, len(matched_m), len(um_s_m), len(um_l_m), month_report])
                write_unmatched_statements_csv(
                    um_s_m,
                    os.path.join(month_dir, f"{month}-unmatched_statements.csv"),
                    merchant_hints,
                    category_hints,
                )
        print(f"Monthly summary -> {summary_path}")
def build_parser() -> argparse.ArgumentParser:
    """Construct the command-line argument parser for the reconciler."""
    p = argparse.ArgumentParser(
        description="Reconcile Crew statement PDFs (and extracted CSVs) against a ledger CSV.",
    )
    # Inputs.
    p.add_argument("--statements", required=True, nargs='+',
                   help="Path(s) to extracted CSVs, statement PDFs, or directories containing them.")
    p.add_argument("--csv", dest="ledger", required=True,
                   help="Path to your ledger CSV (columns: Date, Amount, etc.).")
    # Outputs.
    p.add_argument("--out-dir", default="reconciliation",
                   help="Directory to write reconciliation outputs.")
    p.add_argument("--per-month", action="store_true",
                   help="Also write per-month detailed reports.")
    # Matching tolerances.
    p.add_argument("--date-window-days", type=int, default=2,
                   help="Allow matching within ±N days (default 2).")
    p.add_argument("--amount-tolerance", type=float, default=0.01,
                   help="Allowed absolute amount diff (default 0.01).")
    # Date filters.
    p.add_argument("--month", help="Limit reconciliation to a specific month YYYY-MM.")
    p.add_argument("--months",
                   help="Limit reconciliation to multiple months (comma-separated YYYY-MM values).")
    p.add_argument("--start", help="Limit reconciliation start date YYYY-MM-DD (inclusive).")
    p.add_argument("--end", help="Limit reconciliation end date YYYY-MM-DD (inclusive).")
    return p
def main(argv: Optional[List[str]] = None) -> None:
    """CLI entry point: parse arguments and run the reconciliation."""
    cmd_reconcile(build_parser().parse_args(argv))


if __name__ == "__main__":
    main()
Author
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
I place all my statements into the 'statements/' directory. The ledger CSV can go anywhere, but I keep mine in 'csv/' for organization's sake. Below is an example of what the output of this script would look like.
Then you can review the reconciliation/unmatched_statements.csv and fill in any missing categories if there are any, then you can use that file to import what is missing into Monarch.