Skip to content

Instantly share code, notes, and snippets.

@juanandresgs
Created February 9, 2026 23:18
Show Gist options
  • Select an option

  • Save juanandresgs/5ed1aa00c0c6bbd28a73c81915891b88 to your computer and use it in GitHub Desktop.

Select an option

Save juanandresgs/5ed1aa00c0c6bbd28a73c81915891b88 to your computer and use it in GitHub Desktop.
Synapse Convention Review & Migration Tool — AST-based linter with --fix mode + interactive convention reviewer for 11 dimensions
#!/usr/bin/env python3
"""
Interactive convention review and migration tool for Synapse.
Scans the codebase for 11 convention dimensions, presents statistics,
collects user decisions, and generates migration artifacts.
@decision DEC-CONV-001
@title Convention review tool architecture
@status accepted
@rationale Implements interactive review workflow with 11 convention dimensions.
Uses AST + tokenize for detection, presents stats, collects user choices,
generates config and migration reports. Detector functions are pure and
operate on parsed file data for efficiency. Session manages interactive
flow, OutputGenerator handles artifacts. CLI provides review, status,
migrate, and guide subcommands.
"""
import os
import sys
import io
import ast
import re
import json
import argparse
import tokenize
import datetime
import collections
# ============================================================================
# Detector Functions
# ============================================================================
def detect_single_quotes(filepath, source, source_lines, tree, tokens):
    """Detect single-quoted strings (excluding triple-quoted)."""
    found = []
    for token in tokens:
        if token.type != tokenize.STRING:
            continue
        # Drop any string prefix (r, b, f, u in either case) before inspecting quotes.
        body = token.string.lstrip("rfbRFBuU")
        if body.startswith("'") and not body.startswith("'''"):
            row = token.start[0]
            snippet = source_lines[row - 1] if row <= len(source_lines) else token.string
            found.append((row, snippet))
    return found
def detect_double_quotes(filepath, source, source_lines, tree, tokens):
    """Detect double-quoted strings (excluding triple-quoted)."""
    found = []
    for token in tokens:
        if token.type != tokenize.STRING:
            continue
        # Ignore r/b/f/u prefixes so the quote character is what we test.
        body = token.string.lstrip("rfbRFBuU")
        if body.startswith('"') and not body.startswith('"""'):
            row = token.start[0]
            snippet = source_lines[row - 1] if row <= len(source_lines) else token.string
            found.append((row, snippet))
    return found
def detect_triple_single_quotes(filepath, source, source_lines, tree, tokens):
    """Detect triple single-quoted strings."""
    found = []
    for token in tokens:
        if token.type != tokenize.STRING:
            continue
        if token.string.lstrip("rfbRFBuU").startswith("'''"):
            row = token.start[0]
            snippet = source_lines[row - 1] if row <= len(source_lines) else token.string
            found.append((row, snippet))
    return found
def detect_triple_double_quotes(filepath, source, source_lines, tree, tokens):
    """Detect triple double-quoted strings."""
    found = []
    for token in tokens:
        if token.type != tokenize.STRING:
            continue
        if token.string.lstrip("rfbRFBuU").startswith('"""'):
            row = token.start[0]
            snippet = source_lines[row - 1] if row <= len(source_lines) else token.string
            found.append((row, snippet))
    return found
def detect_single_blank_lines(filepath, source, source_lines, tree, tokens):
    """Detect single blank line before def/class."""
    hits = []
    heads = ("def ", "class ", "async def ")
    for idx, raw in enumerate(source_lines):
        if not raw.lstrip().startswith(heads):
            continue
        # Exactly one blank line directly above: the previous line is blank
        # and the one before that (if any) is not.
        if idx > 0 and not source_lines[idx - 1].strip():
            if idx == 1 or source_lines[idx - 2].strip():
                hits.append((idx + 1, raw))
    return hits
def detect_double_blank_lines(filepath, source, source_lines, tree, tokens):
    """Detect double blank lines before def/class."""
    hits = []
    heads = ("def ", "class ", "async def ")
    for idx, raw in enumerate(source_lines):
        if not raw.lstrip().startswith(heads):
            continue
        # Two or more blank lines directly above the definition.
        if (
            idx >= 2
            and not source_lines[idx - 1].strip()
            and not source_lines[idx - 2].strip()
        ):
            hits.append((idx + 1, raw))
    return hits
def detect_regex_import(filepath, source, source_lines, tree, tokens):
    """Detect 'import regex' or 'from regex import'."""
    hits = []
    nlines = len(source_lines)

    def grab(lineno, fallback):
        # Record the actual source line when available, else a placeholder.
        hits.append((lineno, source_lines[lineno - 1] if lineno <= nlines else fallback))

    for node in ast.walk(tree):
        if isinstance(node, ast.Import):
            for alias in node.names:
                if alias.name == "regex":
                    grab(node.lineno, "import regex")
        elif isinstance(node, ast.ImportFrom) and node.module == "regex":
            grab(node.lineno, "from regex import")
    return hits
def detect_re_import(filepath, source, source_lines, tree, tokens):
    """Detect 'import re' or 'from re import'."""
    hits = []
    nlines = len(source_lines)

    def grab(lineno, fallback):
        hits.append((lineno, source_lines[lineno - 1] if lineno <= nlines else fallback))

    for node in ast.walk(tree):
        if isinstance(node, ast.Import):
            for alias in node.names:
                if alias.name == "re":
                    grab(node.lineno, "import re")
        elif isinstance(node, ast.ImportFrom) and node.module == "re":
            grab(node.lineno, "from re import")
    return hits
def detect_logger_percent(filepath, source, source_lines, tree, tokens):
    """Detect logger calls with %s lazy interpolation."""
    methods = {"debug", "info", "warning", "error", "critical", "log", "exception"}
    hits = []
    for node in ast.walk(tree):
        if not isinstance(node, ast.Call):
            continue
        # Any attribute call whose name matches a logging method, e.g.
        # logger.info(...) or self.logger.warning(...).
        if not (isinstance(node.func, ast.Attribute) and node.func.attr in methods):
            continue
        if not node.args:
            continue
        first = node.args[0]
        if not (isinstance(first, ast.Constant) and isinstance(first.value, str)):
            continue
        if any(marker in first.value for marker in ("%s", "%d", "%r")):
            row = node.lineno
            snippet = source_lines[row - 1] if row <= len(source_lines) else str(first.value)
            hits.append((row, snippet))
    return hits
def detect_logger_format(filepath, source, source_lines, tree, tokens):
    """Detect logger calls with .format() eager interpolation."""
    methods = {"debug", "info", "warning", "error", "critical", "log", "exception"}
    hits = []
    for node in ast.walk(tree):
        if not isinstance(node, ast.Call):
            continue
        if not (isinstance(node.func, ast.Attribute) and node.func.attr in methods):
            continue
        if not node.args:
            continue
        # First positional arg must itself be a call to something.format(...).
        first = node.args[0]
        if not isinstance(first, ast.Call):
            continue
        if isinstance(first.func, ast.Attribute) and first.func.attr == "format":
            row = node.lineno
            snippet = source_lines[row - 1] if row <= len(source_lines) else "format()"
            hits.append((row, snippet))
    return hits
def detect_exception_keyword_args(filepath, source, source_lines, tree, tokens):
    """Detect raise s_exc.X(mesg=...) keyword style."""
    hits = []
    for node in ast.walk(tree):
        if not (isinstance(node, ast.Raise) and isinstance(node.exc, ast.Call)):
            continue
        # At most one hit per raise: duplicate keyword names are a
        # SyntaxError, so 'mesg' can appear at most once anyway.
        if any(kw.arg == "mesg" for kw in node.exc.keywords):
            row = node.lineno
            snippet = source_lines[row - 1] if row <= len(source_lines) else "mesg="
            hits.append((row, snippet))
    return hits
def detect_exception_positional_args(filepath, source, source_lines, tree, tokens):
    """Detect raise s_exc.X('msg') positional style."""
    hits = []
    for node in ast.walk(tree):
        if not (isinstance(node, ast.Raise) and isinstance(node.exc, ast.Call)):
            continue
        has_mesg_kw = any(kw.arg == "mesg" for kw in node.exc.keywords)
        # Positional style: at least one positional arg and no mesg= keyword.
        if node.exc.args and not has_mesg_kw:
            row = node.lineno
            snippet = source_lines[row - 1] if row <= len(source_lines) else "positional"
            hits.append((row, snippet))
    return hits
def detect_camelcase_methods(filepath, source, source_lines, tree, tokens):
    """Detect camelCase method names."""
    # Optional leading underscore, then lowercase start, alnum body.
    shape = re.compile(r"^_?[a-z][a-zA-Z0-9]*$")
    hits = []
    for node in ast.walk(tree):
        if not isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
            continue
        name = node.name
        # Dunders and test methods are exempt from the naming convention.
        if name.startswith(("__", "test_")):
            continue
        # camelCase = matches the shape AND contains at least one uppercase.
        if shape.match(name) and name != name.lower():
            hits.append((node.lineno, f"def {name}"))
    return hits
def detect_snakecase_methods(filepath, source, source_lines, tree, tokens):
    """Detect snake_case method names."""
    hits = []
    for node in ast.walk(tree):
        if not isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
            continue
        name = node.name
        if name.startswith("__"):  # dunders exempt
            continue
        # snake_case = lowercase with an underscore after any leading ones.
        core = name.lstrip("_")
        if "_" in core and core.islower():
            hits.append((node.lineno, f"def {name}"))
    return hits
def detect_param_valu(filepath, source, source_lines, tree, tokens):
    """Detect parameters named 'valu'."""
    hits = []
    for node in ast.walk(tree):
        if not isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
            continue
        # Duplicate parameter names are a SyntaxError, so 'valu' appears
        # at most once per signature.
        params = node.args.args + node.args.posonlyargs + node.args.kwonlyargs
        if any(p.arg == "valu" for p in params):
            hits.append((node.lineno, f"def {node.name}(..., valu, ...)"))
    return hits
def detect_param_value(filepath, source, source_lines, tree, tokens):
    """Detect parameters named 'value'."""
    hits = []
    for node in ast.walk(tree):
        if not isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
            continue
        params = node.args.args + node.args.posonlyargs + node.args.kwonlyargs
        if any(p.arg == "value" for p in params):
            hits.append((node.lineno, f"def {node.name}(..., value, ...)"))
    return hits
def detect_getter_methods(filepath, source, source_lines, tree, tokens):
    """Detect explicit getter-style methods (getX pattern) without @property."""
    hits = []
    for node in ast.walk(tree):
        if not isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
            continue
        # 'get' plus at least one more character, e.g. getName — not bare 'get'.
        if not (node.name.startswith("get") and len(node.name) > 3):
            continue
        decorated = any(
            isinstance(dec, ast.Name) and dec.id == "property"
            for dec in node.decorator_list
        )
        if not decorated:
            hits.append((node.lineno, f"def {node.name}"))
    return hits
def detect_property_methods(filepath, source, source_lines, tree, tokens):
    """Detect methods with @property decorator."""
    hits = []
    for node in ast.walk(tree):
        if not isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
            continue
        # One hit per method regardless of how many decorators it carries.
        if any(isinstance(dec, ast.Name) and dec.id == "property"
               for dec in node.decorator_list):
            hits.append((node.lineno, f"@property def {node.name}"))
    return hits
def _base_is_syntest(base):
"""Check if a base class node refers to SynTest or StormPkgTest."""
known = ("SynTest", "StormPkgTest")
if isinstance(base, ast.Name) and base.id in known:
return True
if isinstance(base, ast.Attribute) and base.attr in known:
return True
return False
def detect_syntest_base(filepath, source, source_lines, tree, tokens):
"""Detect test classes inheriting from SynTest/StormPkgTest."""
if "synapse/tests/" not in filepath.replace("\\", "/"):
return []
results = []
for node in ast.walk(tree):
if isinstance(node, ast.ClassDef):
for base in node.bases:
if _base_is_syntest(base):
base_name = base.id if isinstance(base, ast.Name) else base.attr
results.append((node.lineno, f"class {node.name}({base_name})"))
break
return results
def detect_other_test_base(filepath, source, source_lines, tree, tokens):
"""Detect test classes NOT inheriting from SynTest."""
if "synapse/tests/" not in filepath.replace("\\", "/"):
return []
results = []
for node in ast.walk(tree):
if isinstance(node, ast.ClassDef):
has_test_methods = any(
isinstance(n, (ast.FunctionDef, ast.AsyncFunctionDef))
and n.name.startswith("test_")
for n in node.body
)
if has_test_methods:
inherits_syntest = any(_base_is_syntest(base) for base in node.bases)
if not inherits_syntest:
results.append((node.lineno, f"class {node.name}"))
return results
def detect_async_test_methods(filepath, source, source_lines, tree, tokens):
"""Detect async def test_* in test classes."""
if "synapse/tests/" not in filepath.replace("\\", "/"):
return []
results = []
for node in ast.walk(tree):
if isinstance(node, ast.ClassDef):
for item in node.body:
if isinstance(item, ast.AsyncFunctionDef) and item.name.startswith(
"test_"
):
results.append((item.lineno, f"async def {item.name}"))
return results
def detect_sync_test_methods(filepath, source, source_lines, tree, tokens):
"""Detect non-async def test_* in test classes."""
if "synapse/tests/" not in filepath.replace("\\", "/"):
return []
results = []
for node in ast.walk(tree):
if isinstance(node, ast.ClassDef):
for item in node.body:
if isinstance(item, ast.FunctionDef) and item.name.startswith("test_"):
results.append((item.lineno, f"def {item.name}"))
return results
# ============================================================================
# ConventionDimension Class
# ============================================================================
class ConventionDimension:
    """Defines one convention with two sides (A and B)."""

    def __init__(self, name, code, description, option_a, option_b,
                 auto_fixable=False, fix_rule=None):
        self.name = name
        self.code = code
        self.description = description
        self.option_a = option_a  # {label, detector}
        self.option_b = option_b  # {label, detector}
        self.auto_fixable = auto_fixable
        self.fix_rule = fix_rule
        # Scan results, populated by scan_with_data().
        self.count_a = 0
        self.count_b = 0
        self.files_a = set()
        self.files_b = set()
        self.samples_a = []  # up to 3 (filepath, lineno, text) tuples
        self.samples_b = []

    def scan_with_data(self, file_data):
        """Run both detectors against all files, populate counts and samples."""
        for filepath, (source, lines, tree, tokens) in file_data.items():
            # Side A first, then side B, matching presentation order.
            for side in ("a", "b"):
                option = self.option_a if side == "a" else self.option_b
                matches = option["detector"](filepath, source, lines, tree, tokens)
                if not matches:
                    continue
                if side == "a":
                    self.count_a += len(matches)
                    self.files_a.add(filepath)
                    samples = self.samples_a
                else:
                    self.count_b += len(matches)
                    self.files_b.add(filepath)
                    samples = self.samples_b
                # Keep at most 3 samples overall, at most 3 per file.
                for lineno, text in matches[:3]:
                    if len(samples) < 3:
                        samples.append((filepath, lineno, text))

    def total(self):
        return self.count_a + self.count_b

    def pct_a(self):
        denom = self.total()
        return (self.count_a / denom * 100) if denom else 0

    def pct_b(self):
        denom = self.total()
        return (self.count_b / denom * 100) if denom else 0
# ============================================================================
# Scan Infrastructure
# ============================================================================
def _collect_files(paths):
"""Walk paths and return list of .py files, excluding vendor/."""
files = []
for path in paths:
if os.path.isfile(path) and path.endswith(".py"):
files.append(path)
elif os.path.isdir(path):
for root, dirs, filenames in os.walk(path):
dirs[:] = [d for d in dirs if d != "vendor" and not d.startswith(".")]
for f in sorted(filenames):
if f.endswith(".py"):
files.append(os.path.join(root, f))
return files
def _parse_file(filepath):
"""Return (source, source_lines, tree, tokens) or None on error."""
try:
with open(filepath, "r", encoding="utf-8") as f:
source = f.read()
except (OSError, UnicodeDecodeError):
return None
source_lines = source.splitlines()
try:
tree = ast.parse(source, filename=filepath)
except SyntaxError:
return None
try:
tokens = list(tokenize.generate_tokens(io.StringIO(source).readline))
except tokenize.TokenError:
tokens = []
return source, source_lines, tree, tokens
# ============================================================================
# Build Dimensions
# ============================================================================
def _build_dimensions():
    """Return list of all 11 ConventionDimension instances.

    Order here defines presentation order in the interactive review.
    Each option dict maps 'label' (shown to the user) to 'detector'
    (one of the detect_* functions above). Dimensions marked
    auto_fixable carry the stylecheck rule code used by `migrate`.
    """
    return [
        ConventionDimension(
            name="String Quoting",
            code="string_quoting",
            description="Single vs double quotes for strings",
            option_a={
                "label": "Single quotes ('...)",
                "detector": detect_single_quotes,
            },
            option_b={
                "label": 'Double quotes ("...)',
                "detector": detect_double_quotes,
            },
            auto_fixable=True,
            fix_rule="SYN020",
        ),
        ConventionDimension(
            name="Docstring Quoting",
            code="docstring_quoting",
            description="Triple single vs triple double quotes for docstrings",
            option_a={
                "label": "Triple single (''')",
                "detector": detect_triple_single_quotes,
            },
            option_b={
                "label": 'Triple double (""")',
                "detector": detect_triple_double_quotes,
            },
            auto_fixable=True,
            fix_rule="SYN021",
        ),
        ConventionDimension(
            name="Blank Lines Between Defs",
            code="blank_lines",
            description="Single vs double blank lines before function/class definitions",
            option_a={
                "label": "Single blank line",
                "detector": detect_single_blank_lines,
            },
            option_b={
                "label": "Double blank lines",
                "detector": detect_double_blank_lines,
            },
            auto_fixable=True,
            fix_rule="SYN030",
        ),
        ConventionDimension(
            name="Regex Module",
            code="regex_module",
            description="Use of regex vs re module",
            option_a={"label": "import regex", "detector": detect_regex_import},
            option_b={"label": "import re", "detector": detect_re_import},
            auto_fixable=True,
            fix_rule="SYN004",
        ),
        ConventionDimension(
            name="Logger Interpolation",
            code="logger_interpolation",
            description="Lazy %s vs eager .format() in logger calls",
            option_a={
                "label": "%s lazy interpolation",
                "detector": detect_logger_percent,
            },
            option_b={"label": ".format() eager", "detector": detect_logger_format},
            auto_fixable=True,
            fix_rule="SYN041",
        ),
        ConventionDimension(
            name="Exception Args",
            code="exception_args",
            description="Keyword vs positional args for exception construction",
            option_a={
                "label": "Keyword (mesg=)",
                "detector": detect_exception_keyword_args,
            },
            option_b={
                "label": "Positional",
                "detector": detect_exception_positional_args,
            },
            auto_fixable=True,
            fix_rule="SYN050",
        ),
        # The remaining dimensions require human judgement to migrate, so
        # they are not auto_fixable and carry no stylecheck rule.
        ConventionDimension(
            name="Method Naming",
            code="method_naming",
            description="camelCase vs snake_case for method names",
            option_a={"label": "camelCase", "detector": detect_camelcase_methods},
            option_b={"label": "snake_case", "detector": detect_snakecase_methods},
            auto_fixable=False,
        ),
        ConventionDimension(
            name="Parameter Naming",
            code="parameter_naming",
            description="valu vs value for parameter names",
            option_a={"label": "valu", "detector": detect_param_valu},
            option_b={"label": "value", "detector": detect_param_value},
            auto_fixable=False,
        ),
        ConventionDimension(
            name="Property Usage",
            code="property_usage",
            description="Explicit getters vs @property decorators",
            option_a={
                "label": "Explicit getters (getX)",
                "detector": detect_getter_methods,
            },
            option_b={"label": "@property", "detector": detect_property_methods},
            auto_fixable=False,
        ),
        ConventionDimension(
            name="Test Base Class",
            code="test_base_class",
            description="SynTest vs other test base classes",
            option_a={"label": "SynTest/StormPkgTest", "detector": detect_syntest_base},
            option_b={
                "label": "Other base classes",
                "detector": detect_other_test_base,
            },
            auto_fixable=False,
        ),
        ConventionDimension(
            name="Test Method Style",
            code="test_method_style",
            description="async vs sync test methods",
            option_a={
                "label": "async def test_*",
                "detector": detect_async_test_methods,
            },
            option_b={
                "label": "def test_* (sync)",
                "detector": detect_sync_test_methods,
            },
            auto_fixable=False,
        ),
    ]
# ============================================================================
# ReviewSession Class
# ============================================================================
class ReviewSession:
    """Runs all dimension scans, presents interactive UI, collects decisions."""

    def __init__(self, dimensions):
        # dimensions: list of ConventionDimension, in presentation order.
        self.dimensions = dimensions
        # decisions: dim.code -> decision record; filled by run_interactive().
        self.decisions = {}

    def scan(self, paths):
        """Scan all files and populate all dimensions."""
        files = _collect_files(paths)
        print(f"Scanning {len(files)} files...\n")
        # Parse each file exactly once; unparseable files are skipped
        # (_parse_file returns None) and all dimensions share the result.
        file_data = {}
        for fp in files:
            parsed = _parse_file(fp)
            if parsed:
                file_data[fp] = parsed
        for dim in self.dimensions:
            dim.scan_with_data(file_data)

    def present(self, dim, index, total):
        """Print stats for one dimension: counts, sample lines, fixability."""
        print("=" * 60)
        print(f" Convention {index} of {total}: {dim.name}")
        print("=" * 60)
        print()
        print(f" [A] {dim.option_a['label']}")
        print(
            f" {dim.count_a:,} instances across {len(dim.files_a)} files ({dim.pct_a():.1f}%)"
        )
        print()
        print(f" [B] {dim.option_b['label']}")
        print(
            f" {dim.count_b:,} instances across {len(dim.files_b)} files ({dim.pct_b():.1f}%)"
        )
        print()
        if dim.samples_a or dim.samples_b:
            print(" Samples:")
            # Show at most 2 sample lines per side, truncated to 60 chars.
            for fp, ln, txt in dim.samples_a[:2]:
                print(f" A: {fp}:{ln} {txt.strip()[:60]}")
            for fp, ln, txt in dim.samples_b[:2]:
                print(f" B: {fp}:{ln} {txt.strip()[:60]}")
            print()
        if dim.auto_fixable:
            # Migration scope is the minority side: whichever side has
            # fewer instances is the one that would be rewritten.
            minority = dim.count_b if dim.count_a >= dim.count_b else dim.count_a
            minority_files = (
                len(dim.files_b) if dim.count_a >= dim.count_b else len(dim.files_a)
            )
            print(f" Auto-fixable: YES ({dim.fix_rule} --fix)")
            print(f" Migration scope: {minority_files} files, ~{minority} changes")
        else:
            print(f" Auto-fixable: NO (requires manual migration)")
        print()

    def prompt_choice(self, dim):
        """Ask user for A/B/S choice. Returns 'a', 'b', or 's'.

        Loops until one of the three accepted answers is entered.
        """
        while True:
            choice = (
                input(
                    f" Choice: [A] {dim.option_a['label']} [B] {dim.option_b['label']} [S] Skip\n > "
                )
                .strip()
                .lower()
            )
            if choice in ("a", "b", "s"):
                return choice
            print(" Please enter A, B, or S")

    def run_interactive(self, paths):
        """Main interactive loop.

        Scans paths, presents each dimension, records the user's choice
        (skips leave no record), prints a summary, and returns the
        decisions dict.
        """
        self.scan(paths)
        total = len(self.dimensions)
        for i, dim in enumerate(self.dimensions, 1):
            self.present(dim, i, total)
            choice = self.prompt_choice(dim)
            if choice == "s":
                print(f" -> Skipped\n")
                continue
            canonical = (
                dim.option_a["label"] if choice == "a" else dim.option_b["label"]
            )
            # Violations are everything on the non-chosen side.
            self.decisions[dim.code] = {
                "choice": choice,
                "canonical": canonical,
                "rule": dim.fix_rule or dim.code.upper(),
                "auto_fixable": dim.auto_fixable,
                "violation_count": dim.count_b if choice == "a" else dim.count_a,
                "files_affected": len(dim.files_b if choice == "a" else dim.files_a),
                "timestamp": datetime.datetime.now(datetime.timezone.utc).isoformat(),
            }
            print(f" ✓ Canonical: {canonical}")
            vcount = self.decisions[dim.code]["violation_count"]
            fcount = self.decisions[dim.code]["files_affected"]
            print(f" {vcount} instances to migrate across {fcount} files\n")
        self._print_summary()
        return self.decisions

    def _print_summary(self):
        """Print final summary of recorded decisions and fixable migrations."""
        print("=" * 60)
        print(" Review Complete")
        print("=" * 60)
        print(f"\n Decisions recorded: {len(self.decisions)}/{len(self.dimensions)}")
        fixable = {k: v for k, v in self.decisions.items() if v["auto_fixable"]}
        if fixable:
            print("\n Auto-fixable migrations available:")
            for code, dec in fixable.items():
                print(
                    f" {dec['rule']} {code}: {dec['violation_count']} changes across {dec['files_affected']} files"
                )
        print()
# ============================================================================
# OutputGenerator Class
# ============================================================================
class OutputGenerator:
    """Writes .synapse_conventions.json and migration reports."""

    @staticmethod
    def to_config(decisions, output_path=".synapse_conventions.json"):
        """Write decisions to config file."""
        payload = {
            "version": 1,
            "reviewed_at": datetime.datetime.now(datetime.timezone.utc).isoformat(),
            "reviewed_by": "maintainer",
            "dimensions": decisions,
        }
        with open(output_path, "w") as fd:
            json.dump(payload, fd, indent=2)
            fd.write("\n")  # trailing newline for POSIX-friendly files
        return output_path

    @staticmethod
    def from_config(config_path=".synapse_conventions.json"):
        """Load decisions from config file."""
        with open(config_path, "r") as fd:
            return json.load(fd)

    @staticmethod
    def to_migration_report(decisions):
        """Generate migration report text."""
        out = ["Convention Review Migration Report", "=" * 40, ""]
        for code, dec in decisions.items():
            tag = "AUTO" if dec["auto_fixable"] else "MANUAL"
            out.append(f"[{tag}] {code}: {dec['canonical']}")
            out.append(
                f" {dec['violation_count']} violations in {dec['files_affected']} files"
            )
            if dec["auto_fixable"]:
                out.append(
                    f" Fix: synapse_stylecheck.py --fix --select {dec['rule']}"
                )
            out.append("")
        return "\n".join(out)
# ============================================================================
# CLI Commands
# ============================================================================
def cmd_review(args):
    """Interactive convention review command.

    Runs the interactive review session over args.paths, writes the JSON
    config to args.output, and writes a companion text report next to it.
    """
    dimensions = _build_dimensions()
    session = ReviewSession(dimensions)
    decisions = session.run_interactive(args.paths)
    config_path = OutputGenerator.to_config(decisions, args.output)
    print(f" Configuration saved to: {config_path}")
    report = OutputGenerator.to_migration_report(decisions)
    # Derive the report path from the config path. The previous
    # str.replace(".json", ...) matched ".json" anywhere in the path and,
    # for outputs without a .json suffix, produced the SAME path as the
    # config — silently overwriting it with the report.
    root, ext = os.path.splitext(args.output)
    if ext == ".json":
        report_path = root + "_report.txt"
    else:
        report_path = args.output + "_report.txt"
    with open(report_path, "w") as f:
        f.write(report)
    print(f" Migration report saved to: {report_path}\n")
def cmd_status(args):
    """Show adoption status for approved conventions.

    Re-scans args.paths and prints an adoption-percentage bar for each
    dimension recorded in the config. Returns 0 on success, 1 when the
    config file is missing (consistent with the other cmd_* handlers).
    """
    try:
        config = OutputGenerator.from_config(args.config)
    except FileNotFoundError:
        print(f"Error: Config file not found: {args.config}")
        print('Run "review" command first to generate config.')
        return 1
    dimensions = _build_dimensions()
    files = _collect_files(args.paths)
    file_data = {}
    for fp in files:
        parsed = _parse_file(fp)
        if parsed:
            file_data[fp] = parsed
    print(f"Convention Adoption Status ({len(files)} files scanned)\n")
    for dim in dimensions:
        # Only report dimensions the maintainer actually decided on.
        if dim.code not in config["dimensions"]:
            continue
        dim.scan_with_data(file_data)
        dec = config["dimensions"][dim.code]
        if dec["choice"] == "a":
            adopted = dim.count_a
            remaining = dim.count_b
        else:
            adopted = dim.count_b
            remaining = dim.count_a
        total = adopted + remaining
        # A dimension with zero instances counts as fully adopted.
        pct = (adopted / total * 100) if total else 100
        # 20-char bar, one glyph per 5%.
        bar = "█" * int(pct / 5) + "░" * (20 - int(pct / 5))
        fixable = " [auto-fixable]" if dec["auto_fixable"] else ""
        print(f" {dim.name:25s} {bar} {pct:5.1f}% ({remaining} remaining){fixable}")
    print()
    return 0
def cmd_migrate(args):
    """Generate migration branches from approved config.

    For each auto-fixable dimension in the config:
      1. Create branch: style/fix-{RULE}-{dimension}
      2. Run: synapse_stylecheck.py --fix --select {RULE} synapse/
      3. Validate: ruff check + pycodestyle
      4. Commit with descriptive message
      5. Optionally push and create PR via gh

    Returns 0 on success, 1 on missing config/stylecheck or bad --dimension.
    """
    import subprocess
    import shutil
    try:
        config = OutputGenerator.from_config(args.config)
    except FileNotFoundError:
        print(f"Error: Config file not found: {args.config}")
        return 1
    # The fixer script is expected to live next to this tool.
    stylecheck = os.path.join(os.path.dirname(__file__), "synapse_stylecheck.py")
    if not os.path.exists(stylecheck):
        print(f"Error: synapse_stylecheck.py not found at {stylecheck}")
        return 1
    python = sys.executable
    dimensions = config.get("dimensions", {})
    fixable = {k: v for k, v in dimensions.items() if v.get("auto_fixable")}
    if args.dimension:
        # Narrow the plan to a single requested dimension.
        if args.dimension not in fixable:
            print(f'Error: Dimension "{args.dimension}" not found or not auto-fixable.')
            print(f"Available: {', '.join(fixable.keys())}")
            return 1
        fixable = {args.dimension: fixable[args.dimension]}
    if not fixable:
        print("No auto-fixable dimensions found in config.")
        return 0
    print(f"Migration plan: {len(fixable)} dimension(s)\n")
    for dim_code, dec in fixable.items():
        rule = dec["rule"]
        canonical = dec["canonical"]
        branch_name = f"style/fix-{rule}-{dim_code}".lower().replace(" ", "-")
        print(f"{'=' * 60}")
        print(f" {dim_code}: {canonical} ({rule})")
        print(f" Branch: {branch_name}")
        print(
            f" Scope: {dec['violation_count']} changes in {dec['files_affected']} files"
        )
        print(f"{'=' * 60}")
        if args.dry_run:
            # Dry run: describe every step without touching git or files.
            print(f" [dry-run] Would create branch: {branch_name}")
            print(
                f" [dry-run] Would run: {python} {stylecheck} --fix --select {rule} synapse/"
            )
            print(f" [dry-run] Would validate with ruff + pycodestyle")
            print(
                f" [dry-run] Would commit: Style: migrate {dim_code} to {canonical} ({rule})"
            )
            print()
            continue
        # Create branch; fall back to checking out an existing one.
        result = subprocess.run(
            ["git", "checkout", "-b", branch_name], capture_output=True, text=True
        )
        if result.returncode != 0:
            # Branch may already exist
            result = subprocess.run(
                ["git", "checkout", branch_name], capture_output=True, text=True
            )
            if result.returncode != 0:
                print(f" Error creating/switching to branch: {result.stderr.strip()}")
                continue
        # Apply fixes
        print(f" Applying fixes...")
        result = subprocess.run(
            [python, stylecheck, "--fix", "--select", rule, "synapse/"],
            capture_output=True,
            text=True,
        )
        print(f" {result.stdout.strip()}")
        # Validate with ruff, but only warn — a failed check does not abort.
        if shutil.which("ruff"):
            print(f" Validating with ruff...")
            ruff_result = subprocess.run(
                ["ruff", "check", "synapse/", "--select", "E,W,F"],
                capture_output=True,
                text=True,
            )
            if ruff_result.returncode != 0:
                print(f" Warning: ruff found issues:\n{ruff_result.stdout[:500]}")
        # Stage and commit
        commit_msg = f"Style: migrate {dim_code} to {canonical} ({rule})"
        subprocess.run(["git", "add", "-A"], capture_output=True)
        result = subprocess.run(
            ["git", "commit", "-m", commit_msg], capture_output=True, text=True
        )
        if result.returncode == 0:
            print(f" Committed: {commit_msg}")
        else:
            # Typically means the fixer produced no diff.
            print(f" No changes to commit (already clean?)")
        # Push and create PR if gh is available. (The dry_run re-check is
        # redundant here — dry runs `continue` above — but harmless.)
        if shutil.which("gh") and not args.dry_run:
            print(f" Pushing branch...")
            subprocess.run(
                ["git", "push", "-u", "origin", branch_name],
                capture_output=True,
                text=True,
            )
            pr_title = f"Style: migrate to {canonical} ({rule})"
            pr_body = (
                f"## Summary\n\n"
                f"- Migrate {dim_code} convention to: **{canonical}**\n"
                f"- Rule: {rule}\n"
                f"- Changes: ~{dec['violation_count']} across {dec['files_affected']} files\n"
                f"- Applied automatically via `synapse_stylecheck.py --fix --select {rule}`\n\n"
                f"## Verification\n\n"
                f"- [ ] ruff check passes\n"
                f"- [ ] pycodestyle passes\n"
                f"- [ ] Tests pass\n"
            )
            result = subprocess.run(
                ["gh", "pr", "create", "--title", pr_title, "--body", pr_body],
                capture_output=True,
                text=True,
            )
            if result.returncode == 0:
                print(f" PR created: {result.stdout.strip()}")
            else:
                print(f" PR creation skipped: {result.stderr.strip()[:200]}")
        # Return to the branch we started from before the next dimension.
        subprocess.run(["git", "checkout", "-"], capture_output=True, text=True)
        print()
    print("Migration complete.")
    return 0
# RST generation helpers
_RST_HEADER = """\
.. _style-conventions:
Synapse Convention Reference
============================
.. note::
This file was auto-generated by ``synapse_convention_review.py guide``.
Conventions were approved via interactive review on {reviewed_at}.
"""
_RST_SECTION = """\
{name}
{underline}
**Canonical:** {canonical}
{description}
.. list-table::
:header-rows: 1
:widths: 10 45 45
* -
- DO
- DON'T
* - Example
- ``{do_example}``
- ``{dont_example}``
"""
# Map dimension codes to DO/DON'T example pairs
_CONVENTION_EXAMPLES = {
"string_quoting": {
"a": ("name = 'cortex'", 'name = "cortex"'),
"b": ('name = "cortex"', "name = 'cortex'"),
},
"docstring_quoting": {
"a": ("'''Return the node ndef.'''", '"""Return the node ndef."""'),
"b": ('"""Return the node ndef."""', "'''Return the node ndef.'''"),
},
"blank_lines": {
"a": (
"def foo():\\n pass\\n\\ndef bar():",
"def foo():\\n pass\\n\\n\\ndef bar():",
),
"b": (
"def foo():\\n pass\\n\\n\\ndef bar():",
"def foo():\\n pass\\n\\ndef bar():",
),
},
"regex_module": {
"a": ("import regex as re", "import re"),
"b": ("import re", "import regex as re"),
},
"logger_interpolation": {
"a": ("logger.info('count %d', n)", "logger.info('count {}'.format(n))"),
"b": ("logger.info('count {}'.format(n))", "logger.info('count %d', n)"),
},
"exception_args": {
"a": ("raise s_exc.BadArg(mesg='invalid')", "raise s_exc.BadArg('invalid')"),
"b": ("raise s_exc.BadArg('invalid')", "raise s_exc.BadArg(mesg='invalid')"),
},
"method_naming": {
"a": ("def getNodeInfo(self):", "def get_node_info(self):"),
"b": ("def get_node_info(self):", "def getNodeInfo(self):"),
},
"parameter_naming": {
"a": ("def setName(self, valu):", "def setName(self, value):"),
"b": ("def setName(self, value):", "def setName(self, valu):"),
},
"property_usage": {
"a": ("def getName(self):", "@property\\ndef name(self):"),
"b": ("@property\\ndef name(self):", "def getName(self):"),
},
"test_base_class": {
"a": ("class MyTest(s_t_utils.SynTest):", "class MyTest(unittest.TestCase):"),
"b": ("class MyTest(unittest.TestCase):", "class MyTest(s_t_utils.SynTest):"),
},
"test_method_style": {
"a": ("async def test_foo(self):", "def test_foo(self):"),
"b": ("def test_foo(self):", "async def test_foo(self):"),
},
}
# Human-readable descriptions for RST
_CONVENTION_DESCRIPTIONS = {
"string_quoting": "All string literals should use single quotes unless the string contains a single quote character.",
"docstring_quoting": "All docstrings and triple-quoted strings should use triple single quotes.",
"blank_lines": "Use a single blank line between function and class definitions, not the PEP 8 double blank line.",
"regex_module": "Use the ``regex`` module (imported as ``re``) instead of the stdlib ``re`` module for better Unicode support.",
"logger_interpolation": "Use ``%s``-style lazy interpolation in logger calls instead of ``.format()`` for performance.",
"exception_args": "Use keyword arguments (``mesg=``) when constructing ``SynErr`` exceptions.",
"method_naming": "Public methods use camelCase naming. Test methods and dunder methods are exceptions.",
"parameter_naming": "Use ``valu`` instead of ``value`` as a parameter name to avoid shadowing the builtin.",
"property_usage": "Avoid ``@property`` decorator as it is incompatible with Telepath RPC. Use explicit getter methods.",
"test_base_class": "Test classes should inherit from ``SynTest`` or ``StormPkgTest``.",
"test_method_style": "Test methods should be ``async def`` to work with the async test infrastructure.",
}
def cmd_guide(args):
    """Generate RST style guide from approved conventions.

    Reads the approved-conventions config, renders one RST section per
    reviewed dimension (in canonical order), and writes the result to
    args.output. Returns 0 on success, 1 when the config is missing.
    """
    try:
        config = OutputGenerator.from_config(args.config)
    except FileNotFoundError:
        print(f"Error: Config file not found: {args.config}")
        return 1
    dimensions = config.get("dimensions", {})
    reviewed_at = config.get("reviewed_at", "unknown")
    # Accumulate sections and join once rather than string-concatenating.
    parts = [_RST_HEADER.format(reviewed_at=reviewed_at[:10])]
    ordered = (
        "string_quoting",
        "docstring_quoting",
        "blank_lines",
        "regex_module",
        "logger_interpolation",
        "exception_args",
        "method_naming",
        "parameter_naming",
        "property_usage",
        "test_base_class",
        "test_method_style",
    )
    for dim_code in ordered:
        decision = dimensions.get(dim_code)
        if decision is None:
            continue
        title = dim_code.replace("_", " ").title()
        pair = _CONVENTION_EXAMPLES.get(dim_code, {}).get(decision["choice"], ("", ""))
        note = ""
        if decision.get("auto_fixable"):
            note = f"\n*Auto-fixable:* ``synapse_stylecheck.py --fix --select {decision['rule']}``\n"
        parts.append(
            _RST_SECTION.format(
                name=title,
                underline="-" * len(title),
                canonical=decision["canonical"],
                description=_CONVENTION_DESCRIPTIONS.get(dim_code, "") + note,
                do_example=pair[0],
                dont_example=pair[1],
            )
        )
    rst = "".join(parts)
    output_path = args.output
    os.makedirs(os.path.dirname(output_path) or ".", exist_ok=True)
    with open(output_path, "w") as f:
        f.write(rst)
    print(f"Style guide written to: {output_path}")
    print(f" {len(dimensions)} conventions documented")
    return 0
# ============================================================================
# CLI Setup
# ============================================================================
def build_parser():
    """Construct the argparse CLI: review/status/migrate/guide subcommands."""
    parser = argparse.ArgumentParser(
        prog="synapse_convention_review",
        description="Interactive convention review and migration tool for Synapse",
    )
    sub = parser.add_subparsers(dest="command")

    review = sub.add_parser("review", help="Interactive convention review")
    review.add_argument("paths", nargs="+", help="Files or directories to scan")
    review.add_argument(
        "--output", default=".synapse_conventions.json", help="Config output path"
    )

    status = sub.add_parser("status", help="Show adoption status")
    status.add_argument("paths", nargs="+", help="Files or directories to scan")
    status.add_argument(
        "--config", default=".synapse_conventions.json", help="Config file path"
    )

    migrate = sub.add_parser("migrate", help="Generate PRs from approved config")
    migrate.add_argument("--config", default=".synapse_conventions.json")
    migrate.add_argument("--dimension", help="Only migrate this dimension")
    migrate.add_argument(
        "--dry-run", action="store_true", help="Preview without creating branches/PRs"
    )

    guide = sub.add_parser("guide", help="Generate RST style guide")
    guide.add_argument("--config", default=".synapse_conventions.json")
    guide.add_argument(
        "--output", default="docs/synapse/devguides/style_guide_conventions.rst"
    )
    return parser
def main():
    """CLI entry point: parse arguments and dispatch to the subcommand handler."""
    parser = build_parser()
    args = parser.parse_args()
    if not args.command:
        parser.print_help()
        return 0
    # Dispatch table keyed by subcommand name.
    handlers = {
        "review": cmd_review,
        "status": cmd_status,
        "migrate": cmd_migrate,
        "guide": cmd_guide,
    }
    handler = handlers.get(args.command)
    if handler is None:
        parser.print_help()
        return 1
    return handler(args)
if __name__ == "__main__":
    # main() returns an int exit code; 'or 0' maps a None return to success.
    sys.exit(main() or 0)
#!/usr/bin/env python
"""
Synapse Style Checker - AST-based linter for Synapse-specific conventions.
Checks for project-specific style rules that ruff and pycodestyle cannot enforce.
Uses Python's ast module for precise analysis with zero external dependencies.
@decision: Uses ast module instead of regex for rule checking. AST gives precise
node-level analysis (distinguishing imports from strings from comments) with
fewer false positives. tokenize is used alongside ast for string-literal quote
checking since ast normalizes quotes away. Single-file design matches the
existing scripts/ layout (pep8_staged_files.py).
Exit codes:
0 - No violations found
1 - Violations found
2 - Tool error (bad arguments, parse failures, etc.)
Usage:
python scripts/synapse_stylecheck.py synapse/
python scripts/synapse_stylecheck.py --select SYN00 synapse/lib/base.py
python scripts/synapse_stylecheck.py --ignore SYN020,SYN030 synapse/
python scripts/synapse_stylecheck.py --format json synapse/
python scripts/synapse_stylecheck.py --stats synapse/
"""
import io
import os
import re
import ast
import sys
import json
import difflib
import fnmatch
import argparse
import tokenize
import collections
# ---------------------------------------------------------------------------
# Violation
# ---------------------------------------------------------------------------
class Violation:
    """A single style violation at a specific location in a source file."""

    __slots__ = ("code", "filepath", "line", "col", "message")

    def __init__(self, code, filepath, line, col, message):
        self.code = code
        self.filepath = filepath
        self.line = line
        self.col = col
        self.message = message

    def __repr__(self):
        # flake8-style location string: path:line:col: CODE message
        return f"{self.filepath}:{self.line}:{self.col}: {self.code} {self.message}"

    def to_dict(self):
        """Return a JSON-serializable dict of this violation's fields."""
        return {name: getattr(self, name) for name in self.__slots__}
# ---------------------------------------------------------------------------
# Base rule
# ---------------------------------------------------------------------------
class BaseRule:
    """Abstract base for lint rules.

    Subclasses set ``code`` (and optionally ``message``/``fixable``) and
    implement check(); fixable rules additionally override fix().
    """

    code = None
    message = None
    fixable = False

    def check(self, tree, filepath, source_lines):
        """Yield Violation instances found in the parsed file."""
        raise NotImplementedError

    def fix(self, source, violation):
        """Return source with this violation repaired, or None when unfixable."""
        return None

    def _viol(self, filepath, line, col, message=None):
        # Fall back to the class-level message when no override is supplied.
        if message is None:
            message = self.message
        return Violation(self.code, filepath, line, col, message)
# ---------------------------------------------------------------------------
# noqa support
# ---------------------------------------------------------------------------
def _noqa_lines(source_lines):
"""Return a dict mapping line number -> set of suppressed codes."""
noqa = {}
for i, line in enumerate(source_lines, 1):
m = re.search(r"#\s*noqa:\s*([A-Z0-9,\s]+)", line)
if m:
codes = {c.strip() for c in m.group(1).split(",")}
noqa[i] = codes
elif re.search(r"#\s*noqa\s*$", line):
noqa[i] = {"*"}
return noqa
# ---------------------------------------------------------------------------
# Import rules
# ---------------------------------------------------------------------------
class SYN001(BaseRule):
    """Synapse import missing s_ alias."""

    code = "SYN001"

    def check(self, tree, filepath, source_lines):
        for node in ast.walk(tree):
            if not isinstance(node, ast.Import):
                continue
            # Every 'import synapse...' must carry an alias; the alias
            # spelling itself is validated separately by SYN002.
            for alias in node.names:
                if alias.name.startswith("synapse") and alias.asname is None:
                    yield self._viol(
                        filepath,
                        node.lineno,
                        node.col_offset,
                        f"Synapse import must use s_ alias: 'import {alias.name}'",
                    )
class SYN002(BaseRule):
    """Synapse import with wrong alias."""

    code = "SYN002"

    # Known aliases that don't follow the standard pattern.
    # NOTE(review): currently unreferenced by check() — looks intended as a
    # future allow-list; confirm before removing.
    _KNOWN_ALIASES = {
        "synapse.lib.platforms.common": {"s_pcommon"},
        "synapse.cryotank": {"s_cryotank", "s_cryo"},
    }

    def check(self, tree, filepath, source_lines):
        for node in ast.walk(tree):
            if not isinstance(node, ast.Import):
                continue
            for alias in node.names:
                if not alias.name.startswith("synapse"):
                    continue
                if alias.asname is None:
                    continue  # Handled by SYN001
                # Skip vendor imports — they use their own conventions
                if ".vendor." in alias.name:
                    continue
                # The fundamental rule: synapse imports must use the s_
                # prefix (s_t_ for tests also satisfies this).
                if alias.asname.startswith("s_"):
                    continue
                yield self._viol(
                    filepath,
                    node.lineno,
                    node.col_offset,
                    f"Synapse import alias must start with 's_': "
                    f"got '{alias.asname}' for 'import {alias.name}'",
                )
class SYN003(BaseRule):
    """Star import from synapse modules."""

    code = "SYN003"

    def check(self, tree, filepath, source_lines):
        for node in ast.walk(tree):
            if not isinstance(node, ast.ImportFrom):
                continue
            if node.module is None or not node.module.startswith("synapse"):
                continue
            for alias in node.names:
                if alias.name != "*":
                    continue
                yield self._viol(
                    filepath,
                    node.lineno,
                    node.col_offset,
                    f"Star import from synapse module: 'from {node.module} import *'",
                )
class SYN004(BaseRule):
    """re module imported instead of regex."""

    code = "SYN004"
    fixable = True

    _MESG = "Use 'regex' module instead of 're' (unicode bugs in re)"

    def check(self, tree, filepath, source_lines):
        for node in ast.walk(tree):
            if isinstance(node, ast.Import):
                for alias in node.names:
                    if alias.name == "re":
                        yield self._viol(
                            filepath, node.lineno, node.col_offset, self._MESG
                        )
            elif isinstance(node, ast.ImportFrom) and node.module == "re":
                yield self._viol(filepath, node.lineno, node.col_offset, self._MESG)

    def fix(self, source, violation):
        """Rewrite 'import re' / 'from re import X' to the regex module.

        Returns None for forms it can't rewrite safely (e.g. 'import re as x').
        """
        lines = source.splitlines(True)
        idx = violation.line - 1
        text = lines[idx]
        if "import re\n" in text or text.rstrip() == "import re":
            lines[idx] = text.replace("import re", "import regex as re", 1)
            return "".join(lines)
        if text.strip().startswith("from re import"):
            # from re import X -> from regex import X
            lines[idx] = text.replace("from re ", "from regex ", 1)
            return "".join(lines)
        return None
# ---------------------------------------------------------------------------
# Naming rules
# ---------------------------------------------------------------------------
_CAMEL_RE = re.compile(r"^_?[a-z][a-zA-Z0-9]*$")
_SNAKE_RE = re.compile(r"^_?[a-z][a-z0-9_]*$")
_DUNDER_RE = re.compile(r"^__[a-z][a-z0-9_]*__$")
def _is_camel(name):
return bool(_CAMEL_RE.match(name))
def _is_snake(name):
return bool(_SNAKE_RE.match(name))
def _is_dunder(name):
return bool(_DUNDER_RE.match(name))
def _is_single_word(name):
stripped = name.lstrip("_")
return "_" not in stripped and stripped == stripped.lower()
class SYN010(BaseRule):
    """Public method uses snake_case instead of camelCase."""

    code = "SYN010"

    # unittest lifecycle hooks are already camelCase and must not be flagged.
    _LIFECYCLE = (
        "setUp",
        "tearDown",
        "setUpClass",
        "tearDownClass",
        "setUpModule",
        "tearDownModule",
    )

    def check(self, tree, filepath, source_lines):
        for node in ast.walk(tree):
            if not isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
                continue
            name = node.name
            core = name.lstrip("_")
            # Skip dunders, test methods, and unittest hooks.
            if _is_dunder(name) or name.startswith("test_"):
                continue
            if name in self._LIFECYCLE:
                continue
            # Skip single-word names (ambiguous), with or without leading
            # privacy underscores.
            if _is_single_word(name) or _is_single_word(core):
                continue
            # Flag names with underscores in the non-private part.
            if "_" in core and not _is_camel(name):
                # Build the camelCase suggestion, preserving leading underscores.
                words = core.split("_")
                camel = words[0] + "".join(w.capitalize() for w in words[1:])
                suggested = "_" * (len(name) - len(core)) + camel
                yield self._viol(
                    filepath,
                    node.lineno,
                    node.col_offset,
                    f"Public method should use camelCase: '{name}' -> '{suggested}'",
                )
class SYN011(BaseRule):
    """Parameter named value instead of valu."""

    code = "SYN011"

    def check(self, tree, filepath, source_lines):
        """Flag any function parameter named 'value' (the project uses 'valu').

        Checks positional-only, regular, and keyword-only parameter groups;
        the previous implementation only inspected args.args and therefore
        missed 'value' declared positional-only or keyword-only.
        """
        for node in ast.walk(tree):
            if not isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
                continue
            params = node.args.posonlyargs + node.args.args + node.args.kwonlyargs
            for arg in params:
                if arg.arg == "value":
                    yield self._viol(
                        filepath,
                        node.lineno,
                        arg.col_offset,
                        f"Parameter should be 'valu' not 'value' in '{node.name}'",
                    )
class SYN012(BaseRule):
    """Logger not initialized as logger = logging.getLogger(__name__)."""

    code = "SYN012"

    @staticmethod
    def _is_getlogger_call(value):
        # Matches exactly the form: logging.getLogger(...)
        return (
            isinstance(value, ast.Call)
            and isinstance(value.func, ast.Attribute)
            and value.func.attr == "getLogger"
            and isinstance(value.func.value, ast.Name)
            and value.func.value.id == "logging"
        )

    def check(self, tree, filepath, source_lines):
        imports_logging = False
        defines_logger = False
        for node in ast.walk(tree):
            if isinstance(node, ast.Import):
                if any(alias.name == "logging" for alias in node.names):
                    imports_logging = True
            elif isinstance(node, ast.Assign):
                for target in node.targets:
                    if (
                        isinstance(target, ast.Name)
                        and target.id == "logger"
                        and self._is_getlogger_call(node.value)
                    ):
                        defines_logger = True
        if not imports_logging or defines_logger:
            return
        # Report once, at the first 'import logging' statement.
        for node in ast.walk(tree):
            if not isinstance(node, ast.Import):
                continue
            for alias in node.names:
                if alias.name == "logging":
                    yield self._viol(
                        filepath,
                        node.lineno,
                        0,
                        "Module imports logging but missing "
                        "'logger = logging.getLogger(__name__)'",
                    )
                    return
# ---------------------------------------------------------------------------
# String rules
# ---------------------------------------------------------------------------
def _tokenize_source(source_lines):
"""Tokenize source lines and return list of tokens."""
source = "\n".join(source_lines)
return list(tokenize.generate_tokens(io.StringIO(source).readline))
class SYN020(BaseRule):
    """Double-quote string where single quote would work.

    Quote style is erased by ast, so both check() and fix() work on raw
    tokenize tokens instead of the parse tree.
    """
    code = "SYN020"
    fixable = True
    def check(self, tree, filepath, source_lines):
        try:
            tokens = _tokenize_source(source_lines)
        except tokenize.TokenError:
            # Unbalanced/incomplete source; nothing to report.
            return
        for tok_type, tok_string, tok_start, tok_end, tok_line in tokens:
            if tok_type != tokenize.STRING:
                continue
            # Strip string prefix (b, r, f, u combinations)
            stripped = tok_string
            while stripped and stripped[0] in "bBuUrRfF":
                stripped = stripped[1:]
            if not stripped:
                continue
            # Skip triple-quoted (handled by SYN021)
            if stripped.startswith('"""') or stripped.startswith("'''"):
                continue
            if stripped.startswith('"'):
                # Check if single-quoting would work (string contains single quotes)
                inner = stripped[1:-1]
                if "'" not in inner:
                    yield self._viol(
                        filepath,
                        tok_start[0],
                        tok_start[1],
                        "Use single quotes instead of double quotes",
                    )
    def fix(self, source, violation):
        """Swap the flagged string's double quotes for single quotes.

        Returns None when the token can't be relocated at the recorded
        column or re-quoting would change the literal (body contains ').
        """
        lines = source.splitlines(True)
        line_idx = violation.line - 1
        line = lines[line_idx]
        # Tokenize just this line to find the string token
        try:
            tokens = list(tokenize.generate_tokens(io.StringIO(line).readline))
        except tokenize.TokenError:
            return None
        for tok_type, tok_string, tok_start, tok_end, tok_line in tokens:
            if tok_type != tokenize.STRING:
                continue
            # Columns are per-line, so the violation's column from the full-file
            # scan still identifies the token in this single-line tokenization.
            if tok_start[1] != violation.col:
                continue
            # Strip prefix
            prefix = ""
            rest = tok_string
            while rest and rest[0] in "bBuUrRfF":
                prefix += rest[0]
                rest = rest[1:]
            if not rest.startswith('"') or rest.startswith('"""'):
                return None
            inner = rest[1:-1]
            if "'" in inner:
                return None
            new_tok = prefix + "'" + inner + "'"
            new_line = line[: tok_start[1]] + new_tok + line[tok_end[1] :]
            lines[line_idx] = new_line
            return "".join(lines)
        return None
class SYN021(BaseRule):
    """Triple double-quote docstring (should be triple single-quote)."""

    code = "SYN021"
    fixable = True

    def check(self, tree, filepath, source_lines):
        """Flag every triple-double-quoted string token (docstrings included)."""
        try:
            tokens = _tokenize_source(source_lines)
        except tokenize.TokenError:
            return
        for tok_type, tok_string, tok_start, tok_end, tok_line in tokens:
            if tok_type != tokenize.STRING:
                continue
            # Strip any b/r/f/u prefix before examining the quote style.
            stripped = tok_string
            while stripped and stripped[0] in "bBuUrRfF":
                stripped = stripped[1:]
            if stripped.startswith('"""'):
                yield self._viol(
                    filepath,
                    tok_start[0],
                    tok_start[1],
                    "Use triple single-quotes (''') instead of triple double-quotes (\"\"\")",
                )

    def fix(self, source, violation):
        """Convert a triple-double-quoted string to triple single quotes.

        Returns None when the rewrite would corrupt the literal: a body
        containing ''' , or beginning/ending with a single quote, cannot be
        wrapped in ''' delimiters (e.g. \"\"\"x'\"\"\" would become the
        invalid '''x''''). The previous implementation rewrote those cases
        and produced syntax errors.
        """
        # For multiline triple-quoted strings, work on the full source.
        try:
            tokens = list(tokenize.generate_tokens(io.StringIO(source).readline))
        except tokenize.TokenError:
            return None
        for tok_type, tok_string, tok_start, tok_end, tok_line in tokens:
            if tok_type != tokenize.STRING:
                continue
            if tok_start[0] != violation.line or tok_start[1] != violation.col:
                continue
            prefix = ""
            rest = tok_string
            while rest and rest[0] in "bBuUrRfF":
                prefix += rest[0]
                rest = rest[1:]
            if not rest.startswith('"""'):
                return None
            inner = rest[3:-3]
            # Guard against rewrites that would change or break the literal.
            if "'''" in inner or inner.startswith("'") or inner.endswith("'"):
                return None
            new_tok = prefix + "'''" + inner + "'''"
            # Splice the replacement over the original token's exact span.
            source_lines_raw = source.splitlines(True)
            start_line = tok_start[0] - 1
            start_col = tok_start[1]
            end_line = tok_end[0] - 1
            end_col = tok_end[1]
            before = (
                "".join(source_lines_raw[:start_line])
                + source_lines_raw[start_line][:start_col]
            )
            after = source_lines_raw[end_line][end_col:] + "".join(
                source_lines_raw[end_line + 1 :]
            )
            return before + new_tok + after
        return None
# ---------------------------------------------------------------------------
# Formatting rules
# ---------------------------------------------------------------------------
class SYN030(BaseRule):
    """Double blank line between definitions (should be single)."""

    code = "SYN030"
    fixable = True

    def check(self, tree, filepath, source_lines):
        # 1-based line numbers of every def/class statement.
        def_lines = {
            node.lineno
            for node in ast.walk(tree)
            if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef))
        }
        # A definition preceded by two blank lines violates the single-blank rule.
        for idx in range(2, len(source_lines)):
            lineno = idx + 1
            if lineno not in def_lines:
                continue
            if not source_lines[idx - 1].strip() and not source_lines[idx - 2].strip():
                yield self._viol(
                    filepath,
                    lineno,
                    0,
                    "Use single blank line between definitions, not double",
                )

    def fix(self, source, violation):
        """Drop the first of the two blank lines preceding the definition."""
        lines = source.splitlines(True)
        blank_idx = violation.line - 3  # 0-indexed: first blank of the pair
        if 0 <= blank_idx < len(lines) and not lines[blank_idx].strip():
            del lines[blank_idx]
            return "".join(lines)
        return None
# ---------------------------------------------------------------------------
# Pattern rules
# ---------------------------------------------------------------------------
class SYN040(BaseRule):
    """@property decorator used (incompatible with Telepath)."""

    code = "SYN040"

    def check(self, tree, filepath, source_lines):
        for node in ast.walk(tree):
            if not isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
                continue
            # Only the bare-name form (@property) is matched; dotted decorators
            # are Attribute nodes and fall through.
            for deco in node.decorator_list:
                if not (isinstance(deco, ast.Name) and deco.id == "property"):
                    continue
                yield self._viol(
                    filepath,
                    deco.lineno,
                    deco.col_offset,
                    f"@property decorator is incompatible with Telepath: '{node.name}'",
                )
class SYN041(BaseRule):
    """Logger .format() call (should use %s interpolation)."""

    code = "SYN041"
    fixable = True

    _LOG_METHODS = {"debug", "info", "warning", "error", "critical", "exception"}

    def check(self, tree, filepath, source_lines):
        for node in ast.walk(tree):
            if not isinstance(node, ast.Call):
                continue
            func = node.func
            if not isinstance(func, ast.Attribute):
                continue
            if func.attr not in self._LOG_METHODS:
                continue
            # Flag the call once if any argument is a .format() call.
            for arg in node.args:
                if (
                    isinstance(arg, ast.Call)
                    and isinstance(arg.func, ast.Attribute)
                    and arg.func.attr == "format"
                ):
                    yield self._viol(
                        filepath,
                        node.lineno,
                        node.col_offset,
                        f"Logger should use %s interpolation, not .format(): "
                        f"'{func.attr}()'",
                    )
                    break

    def fix(self, source, violation):
        """Rewrite logger.x('...{}...'.format(a)) as logger.x('...%s...', a).

        Only rewrites when it is provably safe:
        - the format string uses only auto-numbered '{}' placeholders (no
          '{0}', '{name}', '{x!r}', and no escaped '{{'/'}}'), and their
          count matches the positional argument count exactly;
        - .format() has no keyword arguments (they would be dropped);
        - the string contains no literal '%' (logging's %-formatter would
          re-interpret it).
        The previous implementation rewrote those cases anyway, silently
        producing broken log calls, and corrupted multi-line calls by
        replacing only their first source line; the full node span is
        replaced here.
        """
        try:
            tree = ast.parse(source)
        except SyntaxError:
            return None
        for node in ast.walk(tree):
            if not isinstance(node, ast.Call):
                continue
            if not isinstance(node.func, ast.Attribute):
                continue
            if node.func.attr not in self._LOG_METHODS:
                continue
            if node.lineno != violation.line:
                continue
            for i, arg in enumerate(node.args):
                if not (
                    isinstance(arg, ast.Call)
                    and isinstance(arg.func, ast.Attribute)
                    and arg.func.attr == "format"
                ):
                    continue
                fmt_node = arg.func.value
                if not isinstance(fmt_node, ast.Constant) or not isinstance(
                    fmt_node.value, str
                ):
                    return None
                if arg.keywords:
                    return None
                fmt_str = fmt_node.value
                if "%" in fmt_str or "{{" in fmt_str or "}}" in fmt_str:
                    return None
                if re.search(r"\{[^{}]+\}", fmt_str):
                    return None
                if fmt_str.count("{}") != len(arg.args):
                    return None
                fmt_args = list(arg.args)
                # Rebuild as: logger.info('msg %s', x)
                node.args[i] = ast.Constant(value=fmt_str.replace("{}", "%s"))
                node.args[i + 1 : i + 1] = fmt_args
                new_call = ast.unparse(node)
                lines = source.splitlines(True)
                start = node.lineno - 1
                end = node.end_lineno - 1
                old_line = lines[start]
                indent = len(old_line) - len(old_line.lstrip())
                # Replace the call's full line span (handles multi-line calls).
                lines[start : end + 1] = [" " * indent + new_call + "\n"]
                return "".join(lines)
        return None
# ---------------------------------------------------------------------------
# Exception rules
# ---------------------------------------------------------------------------
class SYN050(BaseRule):
    """raise s_exc.X(...) with positional args (should use keyword args with mesg=)."""

    code = "SYN050"
    fixable = True

    def check(self, tree, filepath, source_lines):
        for node in ast.walk(tree):
            if not isinstance(node, ast.Raise):
                continue
            exc = node.exc
            if exc is None or not isinstance(exc, ast.Call):
                continue
            # Only exceptions raised via the s_exc/s_common aliases.
            func = exc.func
            is_syn_exc = (
                isinstance(func, ast.Attribute)
                and isinstance(func.value, ast.Name)
                and func.value.id in ("s_exc", "s_common")
            )
            if not is_syn_exc:
                continue
            # Has positional arguments — should use keyword args.
            if exc.args:
                yield self._viol(
                    filepath,
                    node.lineno,
                    node.col_offset,
                    "Exception should use keyword arguments: add mesg= parameter",
                )

    def fix(self, source, violation):
        """Rewrite the single positional exception argument as mesg=.

        Only fixes calls with exactly one positional argument and no existing
        mesg= keyword. The previous implementation converted the first of
        several positional args and then, on the fix loop's re-scan pass,
        inserted a second duplicate mesg= keyword — invalid syntax that was
        written to disk. It also corrupted multi-line raise statements by
        replacing only their first line; the full node span is replaced here.
        """
        try:
            tree = ast.parse(source)
        except SyntaxError:
            return None
        for node in ast.walk(tree):
            if not isinstance(node, ast.Raise):
                continue
            if node.lineno != violation.line:
                continue
            exc = node.exc
            if not isinstance(exc, ast.Call):
                return None
            if len(exc.args) != 1:
                return None
            if any(kw.arg == "mesg" for kw in exc.keywords):
                return None  # already has mesg=; ambiguous to merge
            mesg_value = exc.args.pop(0)
            exc.keywords.insert(0, ast.keyword(arg="mesg", value=mesg_value))
            new_stmt = ast.unparse(node)
            lines = source.splitlines(True)
            start = node.lineno - 1
            end = node.end_lineno - 1
            old_line = lines[start]
            indent = len(old_line) - len(old_line.lstrip())
            lines[start : end + 1] = [" " * indent + new_stmt + "\n"]
            return "".join(lines)
        return None
# ---------------------------------------------------------------------------
# Test rules
# ---------------------------------------------------------------------------
def _is_test_file(filepath):
"""Check if a file is in the synapse tests directory."""
norm = filepath.replace("\\", "/")
return "/synapse/tests/" in norm or norm.startswith("synapse/tests/")
def _attr_to_str(node):
"""Convert an ast.Attribute chain to dotted string."""
parts = []
while isinstance(node, ast.Attribute):
parts.append(node.attr)
node = node.value
if isinstance(node, ast.Name):
parts.append(node.id)
return ".".join(reversed(parts))
return None
class SYN060(BaseRule):
    """Test class doesn't inherit from SynTest."""

    code = "SYN060"

    _KNOWN_BASES = {
        "SynTest",
        "StormPkgTest",
        "s_t_utils.SynTest",
        "s_test.SynTest",
        "s_t_utils.StormPkgTest",
        "s_test.StormPkgTest",
    }

    @staticmethod
    def _has_test_methods(classnode):
        # Only classes defining at least one test_ method are checked.
        return any(
            isinstance(item, (ast.FunctionDef, ast.AsyncFunctionDef))
            and item.name.startswith("test_")
            for item in classnode.body
        )

    def check(self, tree, filepath, source_lines):
        if not _is_test_file(filepath):
            return
        for node in ast.walk(tree):
            if not isinstance(node, ast.ClassDef):
                continue
            if not self._has_test_methods(node):
                continue
            bases = set()
            for base in node.bases:
                if isinstance(base, ast.Name):
                    bases.add(base.id)
                elif isinstance(base, ast.Attribute):
                    dotted = _attr_to_str(base)
                    if dotted:
                        bases.add(dotted)
            if bases.isdisjoint(self._KNOWN_BASES):
                yield self._viol(
                    filepath,
                    node.lineno,
                    node.col_offset,
                    f"Test class '{node.name}' should inherit from SynTest",
                )
class SYN061(BaseRule):
    """Non-async test method."""

    code = "SYN061"

    def check(self, tree, filepath, source_lines):
        if not _is_test_file(filepath):
            return
        for node in ast.walk(tree):
            if not isinstance(node, ast.ClassDef):
                continue
            # Plain (sync) FunctionDef test methods only; AsyncFunctionDef
            # nodes are a different type and never match.
            for item in node.body:
                if not isinstance(item, ast.FunctionDef):
                    continue
                if not item.name.startswith("test_"):
                    continue
                yield self._viol(
                    filepath,
                    item.lineno,
                    item.col_offset,
                    f"Test method '{item.name}' should be async",
                )
# ---------------------------------------------------------------------------
# Rule registry
# ---------------------------------------------------------------------------
# One shared instance of every rule, in code order; StyleChecker defaults to
# this list and narrows it via its select/ignore arguments.
ALL_RULES = [
    SYN001(),
    SYN002(),
    SYN003(),
    SYN004(),
    SYN010(),
    SYN011(),
    SYN012(),
    SYN020(),
    SYN021(),
    SYN030(),
    SYN040(),
    SYN041(),
    SYN050(),
    SYN060(),
    SYN061(),
]
# ---------------------------------------------------------------------------
# Style checker
# ---------------------------------------------------------------------------
class StyleChecker:
    """Runs all rules against source files and collects violations."""

    # Vendored and generated lookup code is never checked.
    EXCLUDE_PATTERNS = [
        "synapse/vendor/*",
        "*/synapse/vendor/*",
        "synapse/lookup/*",
        "*/synapse/lookup/*",
    ]

    def __init__(self, rules=None, select=None, ignore=None):
        """
        Args:
            rules: rule instances to run (defaults to ALL_RULES).
            select: iterable of code prefixes; only matching rules run.
            ignore: container of exact rule codes to skip.
        """
        rules = rules or ALL_RULES
        if select:
            rules = [r for r in rules if any(r.code.startswith(s) for s in select)]
        if ignore:
            rules = [r for r in rules if r.code not in ignore]
        self.rules = rules

    def _is_excluded(self, filepath):
        # Normalize Windows separators before glob matching.
        norm = filepath.replace("\\", "/")
        for pat in self.EXCLUDE_PATTERNS:
            if fnmatch.fnmatch(norm, pat):
                return True
        return False

    def check_file(self, filepath):
        """Check a single file and return a sorted list of Violations.

        Excluded, unreadable, and unparsable files yield an empty list.
        Violations suppressed by # noqa comments are dropped.
        """
        if self._is_excluded(filepath):
            return []
        try:
            with open(filepath, "r", encoding="utf-8") as f:
                source = f.read()
        except (OSError, UnicodeDecodeError):
            return []
        source_lines = source.splitlines()
        try:
            tree = ast.parse(source, filename=filepath)
        except SyntaxError:
            return []
        noqa = _noqa_lines(source_lines)
        violations = []
        for rule in self.rules:
            for v in rule.check(tree, filepath, source_lines):
                line_noqa = noqa.get(v.line, set())
                if "*" in line_noqa or v.code in line_noqa:
                    continue
                violations.append(v)
        violations.sort(key=lambda v: (v.line, v.col, v.code))
        return violations

    @staticmethod
    def _iter_python_files(paths):
        # Yield .py files from the given files/directories, pruning vendor
        # and hidden directories; listings are sorted for deterministic output.
        for path in paths:
            if os.path.isfile(path):
                if path.endswith(".py"):
                    yield path
            elif os.path.isdir(path):
                for root, dirs, files in os.walk(path):
                    dirs[:] = [
                        d for d in dirs if d != "vendor" and not d.startswith(".")
                    ]
                    for fname in sorted(files):
                        if fname.endswith(".py"):
                            yield os.path.join(root, fname)

    def check_paths(self, paths):
        """Check multiple paths (files or directories).

        Returns (all_violations, files_checked).
        """
        all_violations = []
        files_checked = 0
        for fpath in self._iter_python_files(paths):
            all_violations.extend(self.check_file(fpath))
            files_checked += 1
        return all_violations, files_checked

    def fix_file(self, filepath, dry_run=False, diff=False):
        """Fix all auto-fixable violations in a file.

        Returns (fixes_applied: int, diff_text: str or None).

        Fixes are applied one at a time, re-scanning after each so line
        numbers stay valid. Unlike the original implementation — which
        stopped at the first violation its rule declined to fix — each pass
        tries every remaining fixable violation in order, so one unfixable
        case no longer blocks the rest of the file.
        """
        if self._is_excluded(filepath):
            return 0, None
        try:
            with open(filepath, "r", encoding="utf-8") as f:
                original = f.read()
        except (OSError, UnicodeDecodeError):
            return 0, None
        source = original
        fixes = 0
        for _ in range(100):  # safety limit against fix/check ping-pong
            source_lines = source.splitlines()
            try:
                tree = ast.parse(source, filename=filepath)
            except SyntaxError:
                break
            noqa = _noqa_lines(source_lines)
            fixable_viols = []
            for rule in self.rules:
                if not rule.fixable:
                    continue
                for v in rule.check(tree, filepath, source_lines):
                    line_noqa = noqa.get(v.line, set())
                    if "*" in line_noqa or v.code in line_noqa:
                        continue
                    fixable_viols.append((rule, v))
            if not fixable_viols:
                break
            # Apply the first violation that actually produces a change, then
            # re-scan so subsequent fixes see fresh line numbers.
            applied = False
            for rule, viol in fixable_viols:
                new_source = rule.fix(source, viol)
                if new_source is not None and new_source != source:
                    source = new_source
                    fixes += 1
                    applied = True
                    break
            if not applied:
                break  # nothing left that any rule is willing to fix
        if fixes == 0:
            return 0, None
        diff_text = None
        if diff or dry_run:
            diff_text = "".join(
                difflib.unified_diff(
                    original.splitlines(True),
                    source.splitlines(True),
                    fromfile=filepath,
                    tofile=filepath,
                )
            )
        if not dry_run:
            with open(filepath, "w", encoding="utf-8") as f:
                f.write(source)
        return fixes, diff_text

    def fix_paths(self, paths, dry_run=False, diff=False):
        """Fix all auto-fixable violations in files/directories.

        Returns (total_fixes, files_fixed, diff_texts).
        """
        total_fixes = 0
        files_fixed = 0
        diff_texts = []
        for fpath in self._iter_python_files(paths):
            nfixes, dtxt = self.fix_file(fpath, dry_run=dry_run, diff=diff)
            total_fixes += nfixes
            if nfixes > 0:
                files_fixed += 1
                if dtxt:
                    diff_texts.append(dtxt)
        return total_fixes, files_fixed, diff_texts
# ---------------------------------------------------------------------------
# Output formatters
# ---------------------------------------------------------------------------
def format_text(violations, files_checked):
    """Render violations as flake8-style lines plus a summary line."""
    out = [str(v) for v in violations]
    if not violations:
        out.append(f"All clean! ({files_checked} files checked)")
        return "\n".join(out)
    nfiles = len({v.filepath for v in violations})
    out.append("")
    out.append(
        f"Found {len(violations)} violation(s) across "
        f"{nfiles} file(s) ({files_checked} files checked)"
    )
    return "\n".join(out)
def format_json(violations, files_checked):
    """Render violations and summary counts as a pretty-printed JSON document."""
    summary = {
        "total_violations": len(violations),
        "files_with_violations": len({v.filepath for v in violations}),
        "files_checked": files_checked,
    }
    records = [v.to_dict() for v in violations]
    return json.dumps({"violations": records, "summary": summary}, indent=2)
def format_stats(violations, files_checked):
    """Render aggregate counts: totals, per-rule breakdown, and worst files."""
    out = [
        f"Files checked: {files_checked}",
        f"Total violations: {len(violations)}",
        "",
    ]
    by_code = collections.Counter(v.code for v in violations)
    if by_code:
        out.append("By rule:")
        for code, count in by_code.most_common():
            out.append(f"  {code}: {count}")
        out.append("")
        out.append("Top files:")
        by_file = collections.Counter(v.filepath for v in violations)
        for fpath, count in by_file.most_common(10):
            out.append(f"  {fpath}: {count}")
    return "\n".join(out)
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def build_parser():
    """Build the CLI argument parser for the style checker."""
    parser = argparse.ArgumentParser(
        prog="synapse_stylecheck",
        description="Synapse-specific style checker (AST-based)",
    )
    parser.add_argument("paths", nargs="+", help="Files or directories to check")
    parser.add_argument(
        "--select",
        help="Comma-separated rule prefixes to select (e.g. SYN00,SYN01)",
    )
    parser.add_argument(
        "--ignore",
        help="Comma-separated rule codes to ignore (e.g. SYN020,SYN030)",
    )
    parser.add_argument(
        "--format",
        choices=["text", "json", "stats"],
        default="text",
        help="Output format (default: text)",
    )
    parser.add_argument(
        "--stats", action="store_true", help="Shortcut for --format stats"
    )
    parser.add_argument(
        "--fix", action="store_true", help="Auto-fix fixable violations in-place"
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Show what would be fixed without modifying files",
    )
    parser.add_argument(
        "--diff", action="store_true", help="Show unified diff of fixes"
    )
    return parser
def main(argv=None):
    """CLI entry point; returns the process exit code (0 clean, 1 violations)."""
    args = build_parser().parse_args(argv)
    select = None
    if args.select:
        select = [s.strip() for s in args.select.split(",")]
    ignore = None
    if args.ignore:
        ignore = {s.strip() for s in args.ignore.split(",")}
    checker = StyleChecker(select=select, ignore=ignore)
    # Fix mode (--dry-run alone also enters it, and always shows the diff).
    if args.fix or args.dry_run:
        show_diff = args.diff or args.dry_run
        total_fixes, files_fixed, diff_texts = checker.fix_paths(
            args.paths, dry_run=args.dry_run, diff=show_diff
        )
        for dt in diff_texts:
            print(dt)
        if args.dry_run:
            print(
                f"\nDry run: {total_fixes} fix(es) across {files_fixed} file(s) would be applied"
            )
        else:
            print(f"\nApplied {total_fixes} fix(es) across {files_fixed} file(s)")
        return 0
    # Normal check mode
    violations, files_checked = checker.check_paths(args.paths)
    fmt = "stats" if args.stats else args.format
    if fmt == "json":
        output = format_json(violations, files_checked)
    elif fmt == "stats":
        output = format_stats(violations, files_checked)
    else:
        output = format_text(violations, files_checked)
    print(output)
    return 1 if violations else 0
if __name__ == "__main__":
    # Exit code: 0 = clean, 1 = violations found (2 reserved for tool errors
    # per the module docstring).
    sys.exit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment