Skip to content

Instantly share code, notes, and snippets.

@matthewdavis
Created October 7, 2025 14:33
Show Gist options
  • Select an option

  • Save matthewdavis/3ee92e700059e6f589dba4802ed14a45 to your computer and use it in GitHub Desktop.

Select an option

Save matthewdavis/3ee92e700059e6f589dba4802ed14a45 to your computer and use it in GitHub Desktop.
Python script to compare Crew banking statements against a CSV exported from Monarch Money.
#!/usr/bin/env python3
"""CLI for reconciling Crew bank statements against a Monarch Money ledger CSV."""
import argparse
import csv
import os
import re
import subprocess
from collections import defaultdict
from dataclasses import dataclass, replace
from datetime import datetime, date
from typing import DefaultDict, Dict, List, Optional, Tuple
# Matches an ISO date (YYYY-MM-DD) at the start of a statement line, allowing leading spaces.
DATE_RE = re.compile(r"^\s*(\d{4}-\d{2}-\d{2})\b")
# Matches a dollar amount with optional "$" and thousands separators, e.g. "$1,234.56" or "12.00".
RIGHT_ALIGNED_AMOUNT_RE = re.compile(r"\$?([0-9]{1,3}(?:,[0-9]{3})*|[0-9]+)\.[0-9]{2}")
def normalize_vendor(value: str) -> str:
    """Uppercase *value*, replace punctuation with spaces, and collapse whitespace."""
    upper = value.upper()
    no_punct = re.sub(r"[^\w\s]", " ", upper)
    return re.sub(r"\s+", " ", no_punct).strip()
def lookup_hint(text: str, hints: Dict[str, str]) -> Optional[str]:
    """Return the hint whose key has the best containment overlap with *text*.

    Keys and *text* are compared after normalization; either direction of
    containment qualifies, and the longest key wins (first seen on ties).
    """
    norm = normalize_vendor(text)
    if not norm:
        return None
    winner: Optional[str] = None
    for key in hints:
        if not key:
            continue
        if key in norm or norm in key:
            if winner is None or len(key) > len(winner):
                winner = key
    return hints[winner] if winner is not None else None
@dataclass
class Columns:
    """Character offsets of the column headers on a Crew statement page.

    Produced by ``extract_columns`` from a ``pdftotext -layout`` header line;
    each field is the 0-based column index where that header label begins.
    """

    date_start: int         # offset of "Date"
    desc_start: int         # offset of "Description"
    deposits_start: int     # offset of "Deposits"
    withdrawals_start: int  # offset of "Withdrawals"
    balance_start: int      # offset of "Balance"

    def slices(self, line: str) -> Tuple[str, str, str, str, str]:
        """Split *line* into (date, description, deposits, withdrawals, balance) text."""
        date_slice = line[self.date_start:self.desc_start].strip()
        desc_slice = line[self.desc_start:self.deposits_start].rstrip()
        # Amounts are right-aligned, so widen each amount slice 4 chars to the
        # left (clamped to the previous column) to catch digits that begin
        # slightly before the header label.
        dep_start = max(self.deposits_start - 4, self.desc_start)
        wdr_start = max(self.withdrawals_start - 4, self.deposits_start)
        deposits_slice = line[dep_start:self.withdrawals_start].strip()
        withdrawals_slice = line[wdr_start:self.balance_start].strip()
        # NOTE(review): this slice starts at withdrawals_start rather than
        # balance_start, so it also contains the withdrawals column. Callers
        # in this file discard the balance value; confirm intent before
        # relying on it.
        balance_slice = line[self.withdrawals_start:].strip()
        return date_slice, desc_slice, deposits_slice, withdrawals_slice, balance_slice
@dataclass
class Txn:
    """A single transaction from either a statement PDF or the ledger CSV."""

    date: str       # ISO date string, YYYY-MM-DD
    amount: float   # signed amount; withdrawals are negative (see extract_transactions_from_lines)
    desc: str       # free-text description / original statement text
    extra: dict     # the raw source row (extracted-statement dict or ledger CSV row)
    matched: bool = False  # flipped to True by reconcile_fuzzy when paired
def parse_date(d: str) -> date:
    """Convert a ``YYYY-MM-DD`` string into a :class:`datetime.date`."""
    parsed = datetime.strptime(d, "%Y-%m-%d")
    return parsed.date()
def month_key(d: str) -> str:
    """Return the ``YYYY-MM`` prefix of an ISO date string."""
    return d[0:7]
def build_month_filter(month: Optional[str], months: Optional[str]) -> Optional[set[str]]:
    """Combine the ``--month`` and ``--months`` CLI values into a set of YYYY-MM keys.

    Returns None when neither option was given (meaning "no month filter").
    """
    if not month and not months:
        return None
    selected: set[str] = set()
    if month:
        selected.add(month.strip())
    if months:
        selected.update(part.strip() for part in months.split(',') if part.strip())
    return selected or None
def in_date_window(
    d: str,
    months: Optional[set[str]] = None,
    start: Optional[str] = None,
    end: Optional[str] = None,
) -> bool:
    """True when ISO date *d* satisfies the optional month-set / start / end bounds."""
    # No constraints at all: everything passes (and *d* is never parsed).
    if not months and start is None and end is None:
        return True
    day = parse_date(d)
    if months and month_key(d) not in months:
        return False
    if start is not None and day < parse_date(start):
        return False
    return not (end is not None and day > parse_date(end))
def pdftotext_layout(pdf_path: str) -> List[str]:
    """Run ``pdftotext -layout`` on *pdf_path* and return its stdout as lines.

    Raises CalledProcessError if the tool exits non-zero (check=True).
    """
    cmd = ["pdftotext", "-layout", pdf_path, "-"]
    completed = subprocess.run(cmd, check=True, capture_output=True, text=True)
    return completed.stdout.splitlines()
def extract_columns(header_line: str) -> Optional[Columns]:
    """Locate the five column headers in *header_line*; None if any is missing."""
    offsets = []
    for label in ("Date", "Description", "Deposits", "Withdrawals", "Balance"):
        pos = header_line.find(label)
        if pos < 0:
            return None
        offsets.append(pos)
    return Columns(*offsets)
def parse_right_aligned_amount(s: str) -> Optional[float]:
    """Parse the right-most dollar amount in *s* (e.g. ``$1,234.56``), or None."""
    last = None
    # re caches compiled patterns, so repeating the literal here is cheap.
    for candidate in re.finditer(r"\$?([0-9]{1,3}(?:,[0-9]{3})*|[0-9]+)\.[0-9]{2}", s):
        last = candidate
    if last is None:
        return None
    digits = last.group(0).lstrip('$').replace(',', '')
    try:
        return float(digits)
    except ValueError:
        return None
def clean_spaces(s: str) -> str:
    """Collapse internal whitespace runs to single spaces and strip the ends."""
    collapsed = re.sub(r"\s+", " ", s)
    return collapsed.strip()
def extract_transactions_from_lines(lines: List[str], source_pdf: str) -> List[dict]:
    """Parse ``pdftotext -layout`` lines from a Crew statement into row dicts.

    Walks the page text looking for the "Transactions" section, learns column
    offsets from the header row, then starts a transaction at each dated line
    and folds subsequent wrapped description lines into it.

    Returns dicts with keys: date, description, amount (signed; withdrawals
    negative), type ("deposit"/"withdrawal"), balance (always None here),
    source_pdf. Rows whose amount could not be parsed are dropped.
    """
    in_transactions = False
    cols: Optional[Columns] = None
    rows: List[dict] = []
    current: Optional[dict] = None  # transaction being accumulated (may span lines)
    for raw in lines:
        line = raw.rstrip("\n")
        # Skip everything before the "Transactions" heading.
        if not in_transactions:
            if line.strip() == "Transactions":
                in_transactions = True
            continue
        # The header row yields the column offsets (may repeat per page).
        if "Date" in line and "Description" in line and "Balance" in line:
            maybe = extract_columns(line)
            if maybe:
                cols = maybe
            continue
        # Legal boilerplate marks the end of the transaction listing.
        if line.strip().startswith("In case of errors"):
            break
        # Page furniture to ignore.
        if "Account offered by" in line and "Member FDIC" in line:
            continue
        if "— Account #" in line or "Account Statement" in line:
            continue
        if cols is None:
            continue
        date_match = DATE_RE.match(line)
        if date_match:
            # A new dated line closes out the previous transaction.
            if current:
                rows.append(current)
                current = None
            # deposits/withdrawals from slices() are unused; tighter re-slices
            # below (wider left margin of 8) are used for amount parsing.
            date_str, desc, deposits, withdrawals, _ = cols.slices(line)
            margin = 8
            dep_l = max(cols.deposits_start - margin, cols.desc_start)
            wdr_l = max(cols.withdrawals_start - margin, cols.deposits_start)
            deposits_tight = line[dep_l:cols.withdrawals_start]
            withdrawals_tight = line[wdr_l:cols.balance_start]
            dep_amt = parse_right_aligned_amount(deposits_tight)
            wdr_amt = parse_right_aligned_amount(withdrawals_tight)
            is_deposit = dep_amt is not None
            is_withdrawal = wdr_amt is not None
            amount: Optional[float] = None
            tx_type: Optional[str] = None
            if is_deposit and not is_withdrawal:
                amount = dep_amt
                tx_type = "deposit"
            elif is_withdrawal and not is_deposit:
                # Withdrawals are stored as negative amounts.
                amount = -abs(wdr_amt)
                tx_type = "withdrawal"
            elif is_deposit and is_withdrawal:
                # Both overlapping slices matched; assume the larger magnitude
                # is the real figure.
                dep_v = dep_amt if dep_amt is not None else 0.0
                wdr_v = wdr_amt if wdr_amt is not None else 0.0
                if abs(dep_v) >= abs(wdr_v):
                    amount = dep_v
                    tx_type = "deposit"
                else:
                    amount = -abs(wdr_v)
                    tx_type = "withdrawal"
            else:
                # No parseable amount, but a bare "$" in the deposits column
                # is treated as a zero-value deposit.
                if deposits_tight.strip().startswith("$"):
                    amount = 0.0
                    tx_type = "deposit"
            desc_clean = clean_spaces(desc)
            # Strip a trailing "$" (optionally followed by a digit) left over
            # from the amount column bleeding into the description.
            # NOTE(review): the mid-pattern "$" here is an end-of-string
            # anchor, so the pattern effectively matches "\s*\$\d?" at the end
            # only — presumably r"\s*\$\d?\s*$" was intended; confirm.
            desc_clean = re.sub(r"\s*\$\d?$\s*$", "", desc_clean)
            current = {
                "date": date_str.strip(),
                "description": desc_clean,
                "amount": amount,
                "type": tx_type,
                "balance": None,  # balance column is not captured (see Columns.slices)
                "source_pdf": os.path.basename(source_pdf),
            }
            continue
        # Undated line inside the section: continuation of the description.
        if current is not None:
            # Pad so the column slices stay in range on short lines.
            padded = line + " " * max(0, cols.balance_start - len(line))
            _, desc_cont, _, _, _ = cols.slices(padded)
            desc_cont = clean_spaces(desc_cont)
            if desc_cont:
                current["description"] = clean_spaces(f"{current['description']} {desc_cont}")
            continue
    # Flush the last open transaction.
    if current:
        rows.append(current)
    return [r for r in rows if r.get("amount") is not None]
def extract_pdf(pdf_path: str) -> List[dict]:
    """Extract transaction row dicts from one statement PDF."""
    return extract_transactions_from_lines(pdftotext_layout(pdf_path), pdf_path)
def read_extracted(path: str) -> List[Txn]:
    """Load a previously-extracted statement CSV (columns: date, amount, description)."""
    txns: List[Txn] = []
    with open(path, newline="") as fh:
        reader = csv.DictReader(fh)
        for record in reader:
            txns.append(
                Txn(
                    date=record["date"].strip(),
                    amount=float(record["amount"]),
                    desc=record.get("description", "").strip(),
                    extra=record,
                )
            )
    return txns
def read_user_csv(path: str) -> List[Txn]:
    """Load the Monarch Money ledger export (columns: Date, Amount, Merchant, ...)."""
    txns: List[Txn] = []
    with open(path, newline="") as fh:
        for record in csv.DictReader(fh):
            # Prefer the raw bank text; fall back to the curated merchant name.
            description = (record.get("Original Statement") or record.get("Merchant") or "").strip()
            txns.append(
                Txn(
                    date=record["Date"].strip(),
                    amount=float(record["Amount"]),
                    desc=description,
                    extra=record,
                )
            )
    return txns
def build_merchant_category_hints(ledger: List[Txn]) -> Tuple[Dict[str, str], Dict[str, str]]:
    """Learn merchant-name and category hints from the ledger, keyed by normalized text.

    Returns (merchant_hints, category_hints): for each normalized merchant /
    original-statement string, the canonical merchant name and the
    majority-vote category.
    """
    merchant_hints: Dict[str, str] = {}
    category_votes: Dict[str, DefaultDict[str, int]] = {}
    for txn in ledger:
        merchant = txn.extra.get("Merchant", "").strip()
        original = txn.extra.get("Original Statement", "").strip()
        category = txn.extra.get("Category", "").strip()
        for text in (merchant, original):
            if not text:
                continue
            key = normalize_vendor(text)
            if not key:
                continue
            # Prefer the curated Merchant name as the canonical display value.
            merchant_hints.setdefault(key, merchant or text)
            if category:
                category_votes.setdefault(key, defaultdict(int))[category] += 1
    # The most-voted category per key wins (first seen on ties).
    category_hints: Dict[str, str] = {
        key: max(votes.items(), key=lambda kv: kv[1])[0]
        for key, votes in category_votes.items()
        if votes
    }
    return merchant_hints, category_hints
def reconcile_fuzzy(
    statements: List[Txn],
    ledger: List[Txn],
    date_window_days: int = 2,
    amount_tolerance: float = 0.01,
) -> Tuple[List[Tuple[Txn, Txn, int, float]], List[Txn], List[Txn]]:
    """Greedily pair each statement transaction with its best ledger match.

    A ledger candidate must have the same sign, an amount within
    *amount_tolerance*, and a date within *date_window_days*. Among
    candidates, the smallest (day delta, amount diff, list index) tuple wins.

    Side effect: sets ``matched = True`` on the paired Txn objects in BOTH
    input lists; ledger entries already flagged matched are skipped.

    Returns (matched pairs as (statement, ledger, day_delta, amount_diff),
    unmatched statements, unmatched ledger entries).
    """
    unmatched_ledger: List[Txn] = list(ledger)  # shallow copy; Txn objects are shared
    matched: List[Tuple[Txn, Txn, int, float]] = []
    for stmt_txn in statements:
        stmt_date = parse_date(stmt_txn.date)
        stmt_amt = stmt_txn.amount
        best: Optional[Tuple[int, float, int]] = None  # (days, amt_diff, idx) of best so far
        best_idx: Optional[int] = None
        for idx, ledger_txn in enumerate(unmatched_ledger):
            if ledger_txn.matched:
                continue
            # Deposits only match deposits, withdrawals only withdrawals.
            if (stmt_amt >= 0) != (ledger_txn.amount >= 0):
                continue
            amt_diff = abs(stmt_amt - ledger_txn.amount)
            if amt_diff > amount_tolerance:
                continue
            days = abs((parse_date(ledger_txn.date) - stmt_date).days)
            if days > date_window_days:
                continue
            # Lexicographic preference: closer date, then closer amount, then
            # earlier ledger position.
            key = (days, amt_diff, idx)
            if best is None or key < best:
                best = key
                best_idx = idx
        if best_idx is not None and best is not None:
            ledger_txn = unmatched_ledger[best_idx]
            ledger_txn.matched = True
            stmt_txn.matched = True
            matched.append((stmt_txn, ledger_txn, best[0], best[1]))
    unmatched_statements = [t for t in statements if not t.matched]
    unmatched_ledger = [t for t in unmatched_ledger if not t.matched]
    return matched, unmatched_statements, unmatched_ledger
def write_report_detailed(
    matched: List[Tuple[Txn, Txn, int, float]],
    unmatched_statements: List[Txn],
    unmatched_ledger: List[Txn],
    out_dir: str,
    filename: str = "report.csv",
) -> str:
    """Write the combined reconciliation report CSV into *out_dir* and return its path.

    One row per matched pair, then one per unmatched statement transaction,
    then one per unmatched ledger (CSV) transaction; unused columns are blank.
    """
    os.makedirs(out_dir, exist_ok=True)
    out_path = os.path.join(out_dir, filename)

    def stmt_fields(txn: Txn) -> list:
        # The four statement-side columns (s_*).
        return [txn.date, f"{txn.amount:.2f}", txn.desc, txn.extra.get("source_pdf", "")]

    def ledger_fields(txn: Txn) -> list:
        # The eight ledger-side columns (c_*).
        return [
            txn.date,
            f"{txn.amount:.2f}",
            txn.extra.get("Merchant", ""),
            txn.extra.get("Category", ""),
            txn.extra.get("Original Statement", ""),
            txn.extra.get("Account", ""),
            txn.extra.get("Notes", ""),
            txn.extra.get("Tags", ""),
        ]

    header = [
        "status",
        "s_date", "s_amount", "s_description", "s_source_pdf",
        "c_date", "c_amount", "c_merchant", "c_category",
        "c_original_statement", "c_account", "c_notes", "c_tags",
        "date_delta_days", "amount_diff",
    ]
    with open(out_path, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(header)
        for stmt_txn, ledger_txn, days, amt_diff in matched:
            writer.writerow(
                ["matched"]
                + stmt_fields(stmt_txn)
                + ledger_fields(ledger_txn)
                + [days, f"{amt_diff:.2f}"]
            )
        for stmt_txn in unmatched_statements:
            writer.writerow(["unmatched_statement"] + stmt_fields(stmt_txn) + [""] * 10)
        for ledger_txn in unmatched_ledger:
            writer.writerow(["unmatched_csv"] + [""] * 4 + ledger_fields(ledger_txn) + [""] * 2)
    return out_path
def guess_merchant(desc: str, merchant_hints: Dict[str, str]) -> str:
    """Best-effort merchant name for a statement description.

    Tries the learned hints first, then ACH / book-transfer counterparty
    patterns, then the text before " - " with trailing long numeric ids removed.
    """
    text = desc.strip()
    if not text:
        return ""
    direct = lookup_hint(text, merchant_hints)
    if direct:
        return direct
    # Counterparty extraction patterns, in priority order.
    for pattern in (
        r"^INCOMING ACH\s+(?:CREDIT|DEBIT|DEPOSIT)\s+([^\[,]+)",
        r"^(?:DEPOSIT|WITHDRAWAL) for book transfer (?:from|to)\s+([^,]+)",
    ):
        hit = re.match(pattern, text, re.IGNORECASE)
        if hit:
            candidate = hit.group(1).strip()
            return lookup_hint(candidate, merchant_hints) or candidate
    # Fall back to the text before " - ", minus trailing long (>= 6 digit) ids.
    left = text.split(' - ', 1)[0] if ' - ' in text else text
    tokens = left.split()
    while tokens and tokens[-1].isdigit() and len(tokens[-1]) >= 6:
        tokens.pop()
    left = ' '.join(tokens).strip()
    return lookup_hint(left, merchant_hints) or left
def guess_category(merchant: str, desc: str, category_hints: Dict[str, str]) -> str:
    """Best-effort category: learned hints first, then ordered keyword heuristics."""
    for text in (merchant, desc):
        hint = lookup_hint(text, category_hints)
        if hint:
            return hint
    haystacks = (desc.upper(), merchant.upper())
    # Ordered rules: the first rule with any keyword present in either the
    # description or the merchant name wins.
    rules: List[Tuple[List[str], str]] = [
        (['TRANSFER', 'ACH', 'WIRE', 'ZELLE', 'VENMO', 'CASH APP'], 'Transfer'),
        (['PAYROLL', 'PAYCHECK', 'SALARY', 'DIRECT DEPOSIT'], 'Paychecks'),
        (['CARDMEMBER', 'CREDIT CARD PAYMENT', 'CARD PAYMENT', 'AMERICAN EXPRESS', 'AMEX', 'VISA PAYMENT'], 'Credit Card Payment'),
        (['UTILITY', 'UTILITIES', 'ENERGY', 'ELECTRIC', 'POWER', 'GAS', 'WATER', 'SEWER'], 'Utilities'),
        (['PHONE', 'CELL', 'MOBILE', 'WIRELESS'], 'Phone'),
        (['INTERNET', 'STREAM', 'CABLE', 'BROADBAND'], 'TV & Internet'),
        (['INSURANCE', 'PREMIUM'], 'Insurance'),
        (['MORTGAGE'], 'Mortgage'),
        (['CHECK'], 'Check'),
        (['SUBSCRIPT', 'MEMBERSHIP'], 'Subscriptions'),
        (['HOSTING', 'SERVER', 'VPS'], 'Web Hosting'),
        (['ENTERTAINMENT', 'THEATER', 'CINEMA', 'GAME', 'GAMING'], 'Entertainment & Recreation'),
        (['STORE', 'MARKET', 'SHOP', 'MART', 'GROCERY'], 'Shopping'),
        (['DONATION', 'CHURCH', 'TITHE'], 'Donations'),
    ]
    for keywords, category in rules:
        if any(word in hay for word in keywords for hay in haystacks):
            return category
    return ''
def write_unmatched_statements_csv(
    rows: List[Txn],
    path: str,
    merchant_hints: Dict[str, str],
    category_hints: Dict[str, str],
) -> str:
    """Write unmatched statement transactions as a Monarch-importable CSV.

    Merchant and category are guessed from the learned hints so the export
    needs minimal manual editing before import. Returns *path*.
    """
    # Fix: os.path.dirname() returns "" for a bare filename, and
    # os.makedirs("") raises FileNotFoundError even with exist_ok=True —
    # only create the parent directory when there is one.
    parent = os.path.dirname(path)
    if parent:
        os.makedirs(parent, exist_ok=True)
    fieldnames = [
        "Date",
        "Merchant",
        "Category",
        "Account",
        "Original Statement",
        "Notes",
        "Amount",
        "Tags",
    ]
    with open(path, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        for txn in rows:
            merchant = guess_merchant(txn.desc, merchant_hints)
            category = guess_category(merchant, txn.desc, category_hints)
            writer.writerow({
                "Date": txn.date,
                "Merchant": merchant,
                "Category": category,
                "Account": "Bank Account",  # fixed label expected by the Monarch import
                "Original Statement": txn.desc,
                "Notes": "",
                "Amount": f"{txn.amount:.2f}",
                "Tags": "",
            })
    return path
def cmd_reconcile(args: argparse.Namespace) -> None:
    """Run the full reconciliation: load, match, and write reports.

    Reads statement inputs (PDFs, extracted CSVs, or directories of either)
    and the ledger CSV, fuzzy-matches them, and writes report.csv plus a
    Monarch-importable unmatched_statements.csv; with --per-month it also
    writes per-month reports and a monthly summary.
    """
    selected_months = build_month_filter(args.month, args.months)

    def load_statements_from_inputs(inputs: List[str]) -> List[Txn]:
        # Each input may be a directory (scanned for .csv/.pdf), a single
        # extracted CSV, or a single statement PDF.
        txns: List[Txn] = []
        for inp in inputs:
            if os.path.isdir(inp):
                for name in sorted(os.listdir(inp)):
                    path = os.path.join(inp, name)
                    if name.lower().endswith('.csv'):
                        txns.extend(read_extracted(path))
                    elif name.lower().endswith('.pdf'):
                        rows = extract_pdf(path)
                        for row in rows:
                            txns.append(Txn(date=row['date'], amount=float(row['amount']), desc=row.get('description', ''), extra=row))
            elif inp.lower().endswith('.csv'):
                txns.extend(read_extracted(inp))
            elif inp.lower().endswith('.pdf'):
                rows = extract_pdf(inp)
                for row in rows:
                    txns.append(Txn(date=row['date'], amount=float(row['amount']), desc=row.get('description', ''), extra=row))
            else:
                raise SystemExit(f"Unsupported --statements input: {inp}")
        return [t for t in txns if in_date_window(t.date, selected_months, args.start, args.end)]

    statements = load_statements_from_inputs(args.statements)
    ledger_all = read_user_csv(args.ledger)
    # Hints are learned from the FULL ledger, not just the filtered window.
    merchant_hints, category_hints = build_merchant_category_hints(ledger_all)
    # (Statements were already window-filtered above; this re-filter is a
    # harmless no-op kept for symmetry with the ledger filter.)
    statements = [t for t in statements if in_date_window(t.date, selected_months, args.start, args.end)]
    ledger = [t for t in ledger_all if in_date_window(t.date, selected_months, args.start, args.end)]
    matched, unmatched_statements, unmatched_ledger = reconcile_fuzzy(
        statements,
        ledger,
        date_window_days=args.date_window_days,
        amount_tolerance=args.amount_tolerance,
    )
    report_path = write_report_detailed(matched, unmatched_statements, unmatched_ledger, args.out_dir, filename="report.csv")
    print(f"Matched: {len(matched)}")
    print(f"Unmatched in statements: {len(unmatched_statements)}")
    print(f"Unmatched in csv: {len(unmatched_ledger)}")
    print(f"Report -> {report_path}")
    unmatched_export = write_unmatched_statements_csv(
        unmatched_statements,
        os.path.join(args.out_dir, "unmatched_statements.csv"),
        merchant_hints,
        category_hints,
    )
    print(f"Unmatched statements (Monarch import) -> {unmatched_export}")
    if args.per_month:
        by_month_statements: Dict[str, List[Txn]] = defaultdict(list)
        by_month_ledger: Dict[str, List[Txn]] = defaultdict(list)
        # Fix: the global reconcile above set matched=True on most Txns, and
        # a plain replace(txn) copies that flag, so the per-month pass would
        # skip nearly every ledger entry and report ~0 matches. Reset the
        # flag on the copies so each month reconciles from scratch.
        for txn in statements:
            by_month_statements[month_key(txn.date)].append(replace(txn, matched=False))
        for txn in ledger:
            by_month_ledger[month_key(txn.date)].append(replace(txn, matched=False))
        months = sorted(set(by_month_statements.keys()) | set(by_month_ledger.keys()))
        if selected_months:
            months = [m for m in months if m in selected_months]
        summary_path = os.path.join(args.out_dir, "monthly_summary.csv")
        with open(summary_path, "w", newline="") as f:
            writer = csv.writer(f)
            writer.writerow(["month", "matched", "unmatched_statement", "unmatched_csv", "report_path"])
            for month in months:
                stmts = by_month_statements.get(month, [])
                ledg = by_month_ledger.get(month, [])
                matched_m, um_s_m, um_l_m = reconcile_fuzzy(
                    stmts,
                    ledg,
                    args.date_window_days,
                    args.amount_tolerance,
                )
                month_dir = os.path.join(args.out_dir, month)
                month_report = write_report_detailed(matched_m, um_s_m, um_l_m, month_dir, filename=f"{month}-report.csv")
                writer.writerow([month, len(matched_m), len(um_s_m), len(um_l_m), month_report])
                write_unmatched_statements_csv(
                    um_s_m,
                    os.path.join(month_dir, f"{month}-unmatched_statements.csv"),
                    merchant_hints,
                    category_hints,
                )
        print(f"Monthly summary -> {summary_path}")
def build_parser() -> argparse.ArgumentParser:
    """Construct the command-line argument parser for the reconciliation CLI."""
    parser = argparse.ArgumentParser(
        description="Reconcile Crew statement PDFs (and extracted CSVs) against a ledger CSV.",
    )
    parser.add_argument(
        "--statements", required=True, nargs='+',
        help="Path(s) to extracted CSVs, statement PDFs, or directories containing them.",
    )
    parser.add_argument(
        "--csv", dest="ledger", required=True,
        help="Path to your ledger CSV (columns: Date, Amount, etc.).",
    )
    parser.add_argument(
        "--out-dir", default="reconciliation",
        help="Directory to write reconciliation outputs.",
    )
    parser.add_argument(
        "--date-window-days", type=int, default=2,
        help="Allow matching within ±N days (default 2).",
    )
    parser.add_argument(
        "--amount-tolerance", type=float, default=0.01,
        help="Allowed absolute amount diff (default 0.01).",
    )
    parser.add_argument(
        "--per-month", action="store_true",
        help="Also write per-month detailed reports.",
    )
    parser.add_argument("--month", help="Limit reconciliation to a specific month YYYY-MM.")
    parser.add_argument(
        "--months",
        help="Limit reconciliation to multiple months (comma-separated YYYY-MM values).",
    )
    parser.add_argument("--start", help="Limit reconciliation start date YYYY-MM-DD (inclusive).")
    parser.add_argument("--end", help="Limit reconciliation end date YYYY-MM-DD (inclusive).")
    return parser
def main(argv: Optional[List[str]] = None) -> None:
    """CLI entry point: parse *argv* (or sys.argv) and run the reconciliation."""
    args = build_parser().parse_args(argv)
    cmd_reconcile(args)
# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
@matthewdavis
Copy link
Author

matthewdavis commented Oct 7, 2025

I place all my statements into the 'statements/' dir. Then put a CSV somewhere — I put mine in csv/ for organization's sake. An example of what the output of this script would be:


$ ./crew_toolkit.py --statements statements/ --csv csv/transactions-07-oct-2025.csv 
Matched:                 267
Unmatched in statements: 23
Unmatched in csv:        15
Report -> reconciliation/report.csv
Unmatched statements (Monarch import) -> reconciliation/unmatched_statements.csv

Then you can review the reconciliation/unmatched_statements.csv and fill in any missing categories if there are any, then you can use that file to import what is missing into Monarch.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment