Created February 9, 2026 23:18
Synapse Convention Review & Migration Tool — AST-based linter with --fix mode + interactive convention reviewer for 11 dimensions
#!/usr/bin/env python3
"""
Interactive convention review and migration tool for Synapse.

Scans the codebase for 11 convention dimensions, presents statistics,
collects user decisions, and generates migration artifacts.

@decision DEC-CONV-001
@title Convention review tool architecture
@status accepted
@rationale Implements interactive review workflow with 11 convention dimensions.
Uses AST + tokenize for detection, presents stats, collects user choices,
generates config and migration reports. Detector functions are pure and
operate on parsed file data for efficiency. Session manages interactive
flow, OutputGenerator handles artifacts. CLI provides review, status,
migrate, and guide subcommands.
"""
import os
import sys
import io
import ast
import re
import json
import argparse
import tokenize
import datetime

# ============================================================================
# Detector Functions
# ============================================================================

def detect_single_quotes(filepath, source, source_lines, tree, tokens):
    """Detect single-quoted strings (excluding triple-quoted)."""
    results = []
    for tok in tokens:
        if tok.type == tokenize.STRING:
            # Strip prefix (r, b, f, u, etc.)
            text = tok.string
            stripped = text.lstrip("rfbRFBuU")
            if stripped.startswith("'") and not stripped.startswith("'''"):
                lineno = tok.start[0]
                results.append(
                    (
                        lineno,
                        source_lines[lineno - 1]
                        if lineno <= len(source_lines)
                        else text,
                    )
                )
    return results

def detect_double_quotes(filepath, source, source_lines, tree, tokens):
    """Detect double-quoted strings (excluding triple-quoted)."""
    results = []
    for tok in tokens:
        if tok.type == tokenize.STRING:
            text = tok.string
            stripped = text.lstrip("rfbRFBuU")
            if stripped.startswith('"') and not stripped.startswith('"""'):
                lineno = tok.start[0]
                results.append(
                    (
                        lineno,
                        source_lines[lineno - 1]
                        if lineno <= len(source_lines)
                        else text,
                    )
                )
    return results

def detect_triple_single_quotes(filepath, source, source_lines, tree, tokens):
    """Detect triple single-quoted strings."""
    results = []
    for tok in tokens:
        if tok.type == tokenize.STRING:
            text = tok.string
            stripped = text.lstrip("rfbRFBuU")
            if stripped.startswith("'''"):
                lineno = tok.start[0]
                results.append(
                    (
                        lineno,
                        source_lines[lineno - 1]
                        if lineno <= len(source_lines)
                        else text,
                    )
                )
    return results

def detect_triple_double_quotes(filepath, source, source_lines, tree, tokens):
    """Detect triple double-quoted strings."""
    results = []
    for tok in tokens:
        if tok.type == tokenize.STRING:
            text = tok.string
            stripped = text.lstrip("rfbRFBuU")
            if stripped.startswith('"""'):
                lineno = tok.start[0]
                results.append(
                    (
                        lineno,
                        source_lines[lineno - 1]
                        if lineno <= len(source_lines)
                        else text,
                    )
                )
    return results

def detect_single_blank_lines(filepath, source, source_lines, tree, tokens):
    """Detect single blank line before def/class."""
    results = []
    for i, line in enumerate(source_lines):
        stripped = line.lstrip()
        if (
            stripped.startswith("def ")
            or stripped.startswith("class ")
            or stripped.startswith("async def ")
        ):
            # Check if exactly 1 blank line precedes
            if i > 0 and source_lines[i - 1].strip() == "":
                if i == 1 or source_lines[i - 2].strip() != "":
                    results.append((i + 1, line))
    return results

def detect_double_blank_lines(filepath, source, source_lines, tree, tokens):
    """Detect double blank lines before def/class."""
    results = []
    for i, line in enumerate(source_lines):
        stripped = line.lstrip()
        if (
            stripped.startswith("def ")
            or stripped.startswith("class ")
            or stripped.startswith("async def ")
        ):
            # Check if 2+ blank lines precede
            if (
                i >= 2
                and source_lines[i - 1].strip() == ""
                and source_lines[i - 2].strip() == ""
            ):
                results.append((i + 1, line))
    return results

def detect_regex_import(filepath, source, source_lines, tree, tokens):
    """Detect 'import regex' or 'from regex import'."""
    results = []
    for node in ast.walk(tree):
        if isinstance(node, ast.Import):
            for alias in node.names:
                if alias.name == "regex":
                    results.append(
                        (
                            node.lineno,
                            source_lines[node.lineno - 1]
                            if node.lineno <= len(source_lines)
                            else "import regex",
                        )
                    )
        elif isinstance(node, ast.ImportFrom):
            if node.module == "regex":
                results.append(
                    (
                        node.lineno,
                        source_lines[node.lineno - 1]
                        if node.lineno <= len(source_lines)
                        else "from regex import",
                    )
                )
    return results

def detect_re_import(filepath, source, source_lines, tree, tokens):
    """Detect 'import re' or 'from re import'."""
    results = []
    for node in ast.walk(tree):
        if isinstance(node, ast.Import):
            for alias in node.names:
                if alias.name == "re":
                    results.append(
                        (
                            node.lineno,
                            source_lines[node.lineno - 1]
                            if node.lineno <= len(source_lines)
                            else "import re",
                        )
                    )
        elif isinstance(node, ast.ImportFrom):
            if node.module == "re":
                results.append(
                    (
                        node.lineno,
                        source_lines[node.lineno - 1]
                        if node.lineno <= len(source_lines)
                        else "from re import",
                    )
                )
    return results

def detect_logger_percent(filepath, source, source_lines, tree, tokens):
    """Detect logger calls with %s lazy interpolation."""
    results = []
    logger_methods = {
        "debug",
        "info",
        "warning",
        "error",
        "critical",
        "log",
        "exception",
    }
    for node in ast.walk(tree):
        if isinstance(node, ast.Call):
            # Check if it's logger.method() or self.logger.method()
            if (
                isinstance(node.func, ast.Attribute)
                and node.func.attr in logger_methods
            ):
                if (
                    node.args
                    and isinstance(node.args[0], ast.Constant)
                    and isinstance(node.args[0].value, str)
                ):
                    if (
                        "%s" in node.args[0].value
                        or "%d" in node.args[0].value
                        or "%r" in node.args[0].value
                    ):
                        results.append(
                            (
                                node.lineno,
                                source_lines[node.lineno - 1]
                                if node.lineno <= len(source_lines)
                                else str(node.args[0].value),
                            )
                        )
    return results

def detect_logger_format(filepath, source, source_lines, tree, tokens):
    """Detect logger calls with .format() eager interpolation."""
    results = []
    logger_methods = {
        "debug",
        "info",
        "warning",
        "error",
        "critical",
        "log",
        "exception",
    }
    for node in ast.walk(tree):
        if isinstance(node, ast.Call):
            if (
                isinstance(node.func, ast.Attribute)
                and node.func.attr in logger_methods
            ):
                if node.args and isinstance(node.args[0], ast.Call):
                    if (
                        isinstance(node.args[0].func, ast.Attribute)
                        and node.args[0].func.attr == "format"
                    ):
                        results.append(
                            (
                                node.lineno,
                                source_lines[node.lineno - 1]
                                if node.lineno <= len(source_lines)
                                else "format()",
                            )
                        )
    return results

def detect_exception_keyword_args(filepath, source, source_lines, tree, tokens):
    """Detect raise s_exc.X(mesg=...) keyword style."""
    results = []
    for node in ast.walk(tree):
        if isinstance(node, ast.Raise):
            if isinstance(node.exc, ast.Call):
                # Check if any keyword args present
                if node.exc.keywords:
                    for kw in node.exc.keywords:
                        if kw.arg == "mesg":
                            results.append(
                                (
                                    node.lineno,
                                    source_lines[node.lineno - 1]
                                    if node.lineno <= len(source_lines)
                                    else "mesg=",
                                )
                            )
                            break
    return results

def detect_exception_positional_args(filepath, source, source_lines, tree, tokens):
    """Detect raise s_exc.X('msg') positional style."""
    results = []
    for node in ast.walk(tree):
        if isinstance(node, ast.Raise):
            if isinstance(node.exc, ast.Call):
                # Check if positional args present and no mesg= keyword
                if node.exc.args and not any(
                    kw.arg == "mesg" for kw in node.exc.keywords
                ):
                    results.append(
                        (
                            node.lineno,
                            source_lines[node.lineno - 1]
                            if node.lineno <= len(source_lines)
                            else "positional",
                        )
                    )
    return results

def detect_camelcase_methods(filepath, source, source_lines, tree, tokens):
    """Detect camelCase method names."""
    results = []
    pattern = re.compile(r"^_?[a-z][a-zA-Z0-9]*$")
    for node in ast.walk(tree):
        if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
            name = node.name
            # Skip dunders, test methods, single-word lowercase
            if name.startswith("__") or name.startswith("test_"):
                continue
            # Must match pattern and contain mixed case
            if pattern.match(name) and any(c.isupper() for c in name):
                results.append((node.lineno, f"def {name}"))
    return results

def detect_snakecase_methods(filepath, source, source_lines, tree, tokens):
    """Detect snake_case method names."""
    results = []
    for node in ast.walk(tree):
        if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
            name = node.name
            # Skip dunders
            if name.startswith("__"):
                continue
            # Must contain underscore in non-prefix part and be lowercase
            stripped = name.lstrip("_")
            if "_" in stripped and stripped.islower():
                results.append((node.lineno, f"def {name}"))
    return results

def detect_param_valu(filepath, source, source_lines, tree, tokens):
    """Detect parameters named 'valu'."""
    results = []
    for node in ast.walk(tree):
        if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
            for arg in node.args.args + node.args.posonlyargs + node.args.kwonlyargs:
                if arg.arg == "valu":
                    results.append((node.lineno, f"def {node.name}(..., valu, ...)"))
    return results

def detect_param_value(filepath, source, source_lines, tree, tokens):
    """Detect parameters named 'value'."""
    results = []
    for node in ast.walk(tree):
        if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
            for arg in node.args.args + node.args.posonlyargs + node.args.kwonlyargs:
                if arg.arg == "value":
                    results.append((node.lineno, f"def {node.name}(..., value, ...)"))
    return results

def detect_getter_methods(filepath, source, source_lines, tree, tokens):
    """Detect explicit getter-style methods (getX pattern) without @property."""
    results = []
    for node in ast.walk(tree):
        if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
            # Check if method name starts with 'get' and has no @property
            if node.name.startswith("get") and len(node.name) > 3:
                has_property = any(
                    isinstance(dec, ast.Name) and dec.id == "property"
                    for dec in node.decorator_list
                )
                if not has_property:
                    results.append((node.lineno, f"def {node.name}"))
    return results

def detect_property_methods(filepath, source, source_lines, tree, tokens):
    """Detect methods with @property decorator."""
    results = []
    for node in ast.walk(tree):
        if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
            for dec in node.decorator_list:
                if isinstance(dec, ast.Name) and dec.id == "property":
                    results.append((node.lineno, f"@property def {node.name}"))
                    break
    return results

def _base_is_syntest(base):
    """Check if a base class node refers to SynTest or StormPkgTest."""
    known = ("SynTest", "StormPkgTest")
    if isinstance(base, ast.Name) and base.id in known:
        return True
    if isinstance(base, ast.Attribute) and base.attr in known:
        return True
    return False

def detect_syntest_base(filepath, source, source_lines, tree, tokens):
    """Detect test classes inheriting from SynTest/StormPkgTest."""
    if "synapse/tests/" not in filepath.replace("\\", "/"):
        return []
    results = []
    for node in ast.walk(tree):
        if isinstance(node, ast.ClassDef):
            for base in node.bases:
                if _base_is_syntest(base):
                    base_name = base.id if isinstance(base, ast.Name) else base.attr
                    results.append((node.lineno, f"class {node.name}({base_name})"))
                    break
    return results

def detect_other_test_base(filepath, source, source_lines, tree, tokens):
    """Detect test classes NOT inheriting from SynTest."""
    if "synapse/tests/" not in filepath.replace("\\", "/"):
        return []
    results = []
    for node in ast.walk(tree):
        if isinstance(node, ast.ClassDef):
            has_test_methods = any(
                isinstance(n, (ast.FunctionDef, ast.AsyncFunctionDef))
                and n.name.startswith("test_")
                for n in node.body
            )
            if has_test_methods:
                inherits_syntest = any(_base_is_syntest(base) for base in node.bases)
                if not inherits_syntest:
                    results.append((node.lineno, f"class {node.name}"))
    return results

def detect_async_test_methods(filepath, source, source_lines, tree, tokens):
    """Detect async def test_* in test classes."""
    if "synapse/tests/" not in filepath.replace("\\", "/"):
        return []
    results = []
    for node in ast.walk(tree):
        if isinstance(node, ast.ClassDef):
            for item in node.body:
                if isinstance(item, ast.AsyncFunctionDef) and item.name.startswith(
                    "test_"
                ):
                    results.append((item.lineno, f"async def {item.name}"))
    return results

def detect_sync_test_methods(filepath, source, source_lines, tree, tokens):
    """Detect non-async def test_* in test classes."""
    if "synapse/tests/" not in filepath.replace("\\", "/"):
        return []
    results = []
    for node in ast.walk(tree):
        if isinstance(node, ast.ClassDef):
            for item in node.body:
                if isinstance(item, ast.FunctionDef) and item.name.startswith("test_"):
                    results.append((item.lineno, f"def {item.name}"))
    return results

# ============================================================================
# ConventionDimension Class
# ============================================================================

class ConventionDimension:
    """Defines one convention with two sides (A and B)."""

    def __init__(
        self,
        name,
        code,
        description,
        option_a,
        option_b,
        auto_fixable=False,
        fix_rule=None,
    ):
        self.name = name
        self.code = code
        self.description = description
        self.option_a = option_a  # {label, detector}
        self.option_b = option_b  # {label, detector}
        self.auto_fixable = auto_fixable
        self.fix_rule = fix_rule
        # Populated by scan
        self.count_a = 0
        self.count_b = 0
        self.files_a = set()
        self.files_b = set()
        self.samples_a = []
        self.samples_b = []

    def scan_with_data(self, file_data):
        """Run both detectors against all files, populate counts and samples."""
        for filepath, (source, source_lines, tree, tokens) in file_data.items():
            # Run detector A
            results_a = self.option_a["detector"](
                filepath, source, source_lines, tree, tokens
            )
            if results_a:
                self.count_a += len(results_a)
                self.files_a.add(filepath)
                for lineno, text in results_a[:3]:
                    if len(self.samples_a) < 3:
                        self.samples_a.append((filepath, lineno, text))
            # Run detector B
            results_b = self.option_b["detector"](
                filepath, source, source_lines, tree, tokens
            )
            if results_b:
                self.count_b += len(results_b)
                self.files_b.add(filepath)
                for lineno, text in results_b[:3]:
                    if len(self.samples_b) < 3:
                        self.samples_b.append((filepath, lineno, text))

    def total(self):
        return self.count_a + self.count_b

    def pct_a(self):
        total = self.total()
        return (self.count_a / total * 100) if total else 0

    def pct_b(self):
        total = self.total()
        return (self.count_b / total * 100) if total else 0

# ============================================================================
# Scan Infrastructure
# ============================================================================

def _collect_files(paths):
    """Walk paths and return list of .py files, excluding vendor/."""
    files = []
    for path in paths:
        if os.path.isfile(path) and path.endswith(".py"):
            files.append(path)
        elif os.path.isdir(path):
            for root, dirs, filenames in os.walk(path):
                dirs[:] = [d for d in dirs if d != "vendor" and not d.startswith(".")]
                for f in sorted(filenames):
                    if f.endswith(".py"):
                        files.append(os.path.join(root, f))
    return files

def _parse_file(filepath):
    """Return (source, source_lines, tree, tokens) or None on error."""
    try:
        with open(filepath, "r", encoding="utf-8") as f:
            source = f.read()
    except (OSError, UnicodeDecodeError):
        return None
    source_lines = source.splitlines()
    try:
        tree = ast.parse(source, filename=filepath)
    except SyntaxError:
        return None
    try:
        tokens = list(tokenize.generate_tokens(io.StringIO(source).readline))
    except tokenize.TokenError:
        tokens = []
    return source, source_lines, tree, tokens

# ============================================================================
# Build Dimensions
# ============================================================================

def _build_dimensions():
    """Return list of all 11 ConventionDimension instances."""
    return [
        ConventionDimension(
            name="String Quoting",
            code="string_quoting",
            description="Single vs double quotes for strings",
            option_a={
                "label": "Single quotes ('...)",
                "detector": detect_single_quotes,
            },
            option_b={
                "label": 'Double quotes ("...)',
                "detector": detect_double_quotes,
            },
            auto_fixable=True,
            fix_rule="SYN020",
        ),
        ConventionDimension(
            name="Docstring Quoting",
            code="docstring_quoting",
            description="Triple single vs triple double quotes for docstrings",
            option_a={
                "label": "Triple single (''')",
                "detector": detect_triple_single_quotes,
            },
            option_b={
                "label": 'Triple double (""")',
                "detector": detect_triple_double_quotes,
            },
            auto_fixable=True,
            fix_rule="SYN021",
        ),
        ConventionDimension(
            name="Blank Lines Between Defs",
            code="blank_lines",
            description="Single vs double blank lines before function/class definitions",
            option_a={
                "label": "Single blank line",
                "detector": detect_single_blank_lines,
            },
            option_b={
                "label": "Double blank lines",
                "detector": detect_double_blank_lines,
            },
            auto_fixable=True,
            fix_rule="SYN030",
        ),
        ConventionDimension(
            name="Regex Module",
            code="regex_module",
            description="Use of regex vs re module",
            option_a={"label": "import regex", "detector": detect_regex_import},
            option_b={"label": "import re", "detector": detect_re_import},
            auto_fixable=True,
            fix_rule="SYN004",
        ),
        ConventionDimension(
            name="Logger Interpolation",
            code="logger_interpolation",
            description="Lazy %s vs eager .format() in logger calls",
            option_a={
                "label": "%s lazy interpolation",
                "detector": detect_logger_percent,
            },
            option_b={"label": ".format() eager", "detector": detect_logger_format},
            auto_fixable=True,
            fix_rule="SYN041",
        ),
        ConventionDimension(
            name="Exception Args",
            code="exception_args",
            description="Keyword vs positional args for exception construction",
            option_a={
                "label": "Keyword (mesg=)",
                "detector": detect_exception_keyword_args,
            },
            option_b={
                "label": "Positional",
                "detector": detect_exception_positional_args,
            },
            auto_fixable=True,
            fix_rule="SYN050",
        ),
        ConventionDimension(
            name="Method Naming",
            code="method_naming",
            description="camelCase vs snake_case for method names",
            option_a={"label": "camelCase", "detector": detect_camelcase_methods},
            option_b={"label": "snake_case", "detector": detect_snakecase_methods},
            auto_fixable=False,
        ),
        ConventionDimension(
            name="Parameter Naming",
            code="parameter_naming",
            description="valu vs value for parameter names",
            option_a={"label": "valu", "detector": detect_param_valu},
            option_b={"label": "value", "detector": detect_param_value},
            auto_fixable=False,
        ),
        ConventionDimension(
            name="Property Usage",
            code="property_usage",
            description="Explicit getters vs @property decorators",
            option_a={
                "label": "Explicit getters (getX)",
                "detector": detect_getter_methods,
            },
            option_b={"label": "@property", "detector": detect_property_methods},
            auto_fixable=False,
        ),
        ConventionDimension(
            name="Test Base Class",
            code="test_base_class",
            description="SynTest vs other test base classes",
            option_a={"label": "SynTest/StormPkgTest", "detector": detect_syntest_base},
            option_b={
                "label": "Other base classes",
                "detector": detect_other_test_base,
            },
            auto_fixable=False,
        ),
        ConventionDimension(
            name="Test Method Style",
            code="test_method_style",
            description="async vs sync test methods",
            option_a={
                "label": "async def test_*",
                "detector": detect_async_test_methods,
            },
            option_b={
                "label": "def test_* (sync)",
                "detector": detect_sync_test_methods,
            },
            auto_fixable=False,
        ),
    ]
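
# Adding a twelfth dimension is a matter of writing two pure detector
# functions with the standard (filepath, source, source_lines, tree, tokens)
# signature and appending one more entry above. A hypothetical sketch
# (detect_tab_indent / detect_space_indent do not exist in this file):
#
#   ConventionDimension(
#       name="Indentation",
#       code="indentation",
#       description="Tabs vs spaces for indentation",
#       option_a={"label": "Tabs", "detector": detect_tab_indent},
#       option_b={"label": "Spaces", "detector": detect_space_indent},
#       auto_fixable=False,
#   ),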

# ============================================================================
# ReviewSession Class
# ============================================================================

class ReviewSession:
    """Runs all dimension scans, presents interactive UI, collects decisions."""

    def __init__(self, dimensions):
        self.dimensions = dimensions
        self.decisions = {}

    def scan(self, paths):
        """Scan all files and populate all dimensions."""
        files = _collect_files(paths)
        print(f"Scanning {len(files)} files...\n")
        file_data = {}
        for fp in files:
            parsed = _parse_file(fp)
            if parsed:
                file_data[fp] = parsed
        for dim in self.dimensions:
            dim.scan_with_data(file_data)

    def present(self, dim, index, total):
        """Print stats for one dimension."""
        print("=" * 60)
        print(f" Convention {index} of {total}: {dim.name}")
        print("=" * 60)
        print()
        print(f" [A] {dim.option_a['label']}")
        print(
            f" {dim.count_a:,} instances across {len(dim.files_a)} files ({dim.pct_a():.1f}%)"
        )
        print()
        print(f" [B] {dim.option_b['label']}")
        print(
            f" {dim.count_b:,} instances across {len(dim.files_b)} files ({dim.pct_b():.1f}%)"
        )
        print()
        if dim.samples_a or dim.samples_b:
            print(" Samples:")
            for fp, ln, txt in dim.samples_a[:2]:
                print(f" A: {fp}:{ln} {txt.strip()[:60]}")
            for fp, ln, txt in dim.samples_b[:2]:
                print(f" B: {fp}:{ln} {txt.strip()[:60]}")
            print()
        if dim.auto_fixable:
            minority = dim.count_b if dim.count_a >= dim.count_b else dim.count_a
            minority_files = (
                len(dim.files_b) if dim.count_a >= dim.count_b else len(dim.files_a)
            )
            print(f" Auto-fixable: YES ({dim.fix_rule} --fix)")
            print(f" Migration scope: {minority_files} files, ~{minority} changes")
        else:
            print(" Auto-fixable: NO (requires manual migration)")
        print()

    def prompt_choice(self, dim):
        """Ask user for A/B/S choice. Returns 'a', 'b', or 's'."""
        while True:
            choice = (
                input(
                    f" Choice: [A] {dim.option_a['label']} [B] {dim.option_b['label']} [S] Skip\n > "
                )
                .strip()
                .lower()
            )
            if choice in ("a", "b", "s"):
                return choice
            print(" Please enter A, B, or S")

    def run_interactive(self, paths):
        """Main interactive loop."""
        self.scan(paths)
        total = len(self.dimensions)
        for i, dim in enumerate(self.dimensions, 1):
            self.present(dim, i, total)
            choice = self.prompt_choice(dim)
            if choice == "s":
                print(" -> Skipped\n")
                continue
            canonical = (
                dim.option_a["label"] if choice == "a" else dim.option_b["label"]
            )
            self.decisions[dim.code] = {
                "choice": choice,
                "canonical": canonical,
                "rule": dim.fix_rule or dim.code.upper(),
                "auto_fixable": dim.auto_fixable,
                "violation_count": dim.count_b if choice == "a" else dim.count_a,
                "files_affected": len(dim.files_b if choice == "a" else dim.files_a),
                "timestamp": datetime.datetime.now(datetime.timezone.utc).isoformat(),
            }
            print(f" ✓ Canonical: {canonical}")
            vcount = self.decisions[dim.code]["violation_count"]
            fcount = self.decisions[dim.code]["files_affected"]
            print(f" {vcount} instances to migrate across {fcount} files\n")
        self._print_summary()
        return self.decisions

    def _print_summary(self):
        """Print final summary."""
        print("=" * 60)
        print(" Review Complete")
        print("=" * 60)
        print(f"\n Decisions recorded: {len(self.decisions)}/{len(self.dimensions)}")
        fixable = {k: v for k, v in self.decisions.items() if v["auto_fixable"]}
        if fixable:
            print("\n Auto-fixable migrations available:")
            for code, dec in fixable.items():
                print(
                    f" {dec['rule']} {code}: {dec['violation_count']} changes across {dec['files_affected']} files"
                )
        print()

# ============================================================================
# OutputGenerator Class
# ============================================================================

class OutputGenerator:
    """Writes .synapse_conventions.json and migration reports."""

    @staticmethod
    def to_config(decisions, output_path=".synapse_conventions.json"):
        """Write decisions to config file."""
        data = {
            "version": 1,
            "reviewed_at": datetime.datetime.now(datetime.timezone.utc).isoformat(),
            "reviewed_by": "maintainer",
            "dimensions": decisions,
        }
        with open(output_path, "w") as f:
            json.dump(data, f, indent=2)
            f.write("\n")
        return output_path

    @staticmethod
    def from_config(config_path=".synapse_conventions.json"):
        """Load decisions from config file."""
        with open(config_path, "r") as f:
            return json.load(f)
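
    # The config round-tripped by to_config()/from_config() has this shape
    # (counts and timestamps below are illustrative):
    #
    #   {
    #       "version": 1,
    #       "reviewed_at": "2026-02-09T23:18:00+00:00",
    #       "reviewed_by": "maintainer",
    #       "dimensions": {
    #           "string_quoting": {
    #               "choice": "a",
    #               "canonical": "Single quotes ('...)",
    #               "rule": "SYN020",
    #               "auto_fixable": true,
    #               "violation_count": 142,
    #               "files_affected": 37,
    #               "timestamp": "2026-02-09T23:18:00+00:00"
    #           }
    #       }
    #   }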

    @staticmethod
    def to_migration_report(decisions):
        """Generate migration report text."""
        lines = ["Convention Review Migration Report", "=" * 40, ""]
        for code, dec in decisions.items():
            status = "AUTO" if dec["auto_fixable"] else "MANUAL"
            lines.append(f"[{status}] {code}: {dec['canonical']}")
            lines.append(
                f" {dec['violation_count']} violations in {dec['files_affected']} files"
            )
            if dec["auto_fixable"]:
                lines.append(
                    f" Fix: synapse_stylecheck.py --fix --select {dec['rule']}"
                )
            lines.append("")
        return "\n".join(lines)

# ============================================================================
# CLI Commands
# ============================================================================

def cmd_review(args):
    """Interactive convention review command."""
    dimensions = _build_dimensions()
    session = ReviewSession(dimensions)
    decisions = session.run_interactive(args.paths)
    config_path = OutputGenerator.to_config(decisions, args.output)
    print(f" Configuration saved to: {config_path}")
    report = OutputGenerator.to_migration_report(decisions)
    report_path = args.output.replace(".json", "_report.txt")
    with open(report_path, "w") as f:
        f.write(report)
    print(f" Migration report saved to: {report_path}\n")

def cmd_status(args):
    """Show adoption status for approved conventions."""
    try:
        config = OutputGenerator.from_config(args.config)
    except FileNotFoundError:
        print(f"Error: Config file not found: {args.config}")
        print('Run "review" command first to generate config.')
        return 1
    dimensions = _build_dimensions()
    files = _collect_files(args.paths)
    file_data = {}
    for fp in files:
        parsed = _parse_file(fp)
        if parsed:
            file_data[fp] = parsed
    print(f"Convention Adoption Status ({len(files)} files scanned)\n")
    for dim in dimensions:
        if dim.code not in config["dimensions"]:
            continue
        dim.scan_with_data(file_data)
        dec = config["dimensions"][dim.code]
        if dec["choice"] == "a":
            adopted = dim.count_a
            remaining = dim.count_b
        else:
            adopted = dim.count_b
            remaining = dim.count_a
        total = adopted + remaining
        pct = (adopted / total * 100) if total else 100
        bar = "█" * int(pct / 5) + "░" * (20 - int(pct / 5))
        fixable = " [auto-fixable]" if dec["auto_fixable"] else ""
        print(f" {dim.name:25s} {bar} {pct:5.1f}% ({remaining} remaining){fixable}")
    print()

def cmd_migrate(args):
    """Generate migration branches from approved config.

    For each auto-fixable dimension in the config:
      1. Create branch: style/fix-{RULE}-{dimension}
      2. Run: synapse_stylecheck.py --fix --select {RULE} synapse/
      3. Validate: ruff check + pycodestyle
      4. Commit with descriptive message
      5. Optionally push and create PR via gh
    """
    import subprocess
    import shutil
    try:
        config = OutputGenerator.from_config(args.config)
    except FileNotFoundError:
        print(f"Error: Config file not found: {args.config}")
        return 1
    stylecheck = os.path.join(os.path.dirname(__file__), "synapse_stylecheck.py")
    if not os.path.exists(stylecheck):
        print(f"Error: synapse_stylecheck.py not found at {stylecheck}")
        return 1
    python = sys.executable
    dimensions = config.get("dimensions", {})
    fixable = {k: v for k, v in dimensions.items() if v.get("auto_fixable")}
    if args.dimension:
        if args.dimension not in fixable:
            print(f'Error: Dimension "{args.dimension}" not found or not auto-fixable.')
            print(f"Available: {', '.join(fixable.keys())}")
            return 1
        fixable = {args.dimension: fixable[args.dimension]}
    if not fixable:
        print("No auto-fixable dimensions found in config.")
        return 0
    print(f"Migration plan: {len(fixable)} dimension(s)\n")
    for dim_code, dec in fixable.items():
        rule = dec["rule"]
        canonical = dec["canonical"]
        branch_name = f"style/fix-{rule}-{dim_code}".lower().replace(" ", "-")
        print(f"{'=' * 60}")
        print(f" {dim_code}: {canonical} ({rule})")
        print(f" Branch: {branch_name}")
        print(
            f" Scope: {dec['violation_count']} changes in {dec['files_affected']} files"
        )
        print(f"{'=' * 60}")
        if args.dry_run:
            print(f" [dry-run] Would create branch: {branch_name}")
            print(
                f" [dry-run] Would run: {python} {stylecheck} --fix --select {rule} synapse/"
            )
            print(" [dry-run] Would validate with ruff + pycodestyle")
            print(
                f" [dry-run] Would commit: Style: migrate {dim_code} to {canonical} ({rule})"
            )
            print()
            continue
        # Create branch
        result = subprocess.run(
            ["git", "checkout", "-b", branch_name], capture_output=True, text=True
        )
        if result.returncode != 0:
            # Branch may already exist
            result = subprocess.run(
                ["git", "checkout", branch_name], capture_output=True, text=True
            )
            if result.returncode != 0:
                print(f" Error creating/switching to branch: {result.stderr.strip()}")
                continue
        # Apply fixes
        print(" Applying fixes...")
        result = subprocess.run(
            [python, stylecheck, "--fix", "--select", rule, "synapse/"],
            capture_output=True,
            text=True,
        )
        print(f" {result.stdout.strip()}")
        # Validate with ruff
        if shutil.which("ruff"):
            print(" Validating with ruff...")
            ruff_result = subprocess.run(
                ["ruff", "check", "synapse/", "--select", "E,W,F"],
                capture_output=True,
                text=True,
            )
            if ruff_result.returncode != 0:
                print(f" Warning: ruff found issues:\n{ruff_result.stdout[:500]}")
        # Stage and commit
        commit_msg = f"Style: migrate {dim_code} to {canonical} ({rule})"
        subprocess.run(["git", "add", "-A"], capture_output=True)
        result = subprocess.run(
            ["git", "commit", "-m", commit_msg], capture_output=True, text=True
        )
        if result.returncode == 0:
            print(f" Committed: {commit_msg}")
        else:
            print(" No changes to commit (already clean?)")
        # Push and create PR if gh is available
        if shutil.which("gh") and not args.dry_run:
            print(" Pushing branch...")
            subprocess.run(
                ["git", "push", "-u", "origin", branch_name],
                capture_output=True,
                text=True,
            )
            pr_title = f"Style: migrate to {canonical} ({rule})"
            pr_body = (
                f"## Summary\n\n"
                f"- Migrate {dim_code} convention to: **{canonical}**\n"
                f"- Rule: {rule}\n"
                f"- Changes: ~{dec['violation_count']} across {dec['files_affected']} files\n"
                f"- Applied automatically via `synapse_stylecheck.py --fix --select {rule}`\n\n"
                f"## Verification\n\n"
                f"- [ ] ruff check passes\n"
                f"- [ ] pycodestyle passes\n"
                f"- [ ] Tests pass\n"
            )
            result = subprocess.run(
                ["gh", "pr", "create", "--title", pr_title, "--body", pr_body],
                capture_output=True,
                text=True,
            )
            if result.returncode == 0:
                print(f" PR created: {result.stdout.strip()}")
            else:
                print(f" PR creation skipped: {result.stderr.strip()[:200]}")
        # Return to original branch
        subprocess.run(["git", "checkout", "-"], capture_output=True, text=True)
        print()
    print("Migration complete.")
    return 0

# RST generation helpers

_RST_HEADER = """\
.. _style-conventions:

Synapse Convention Reference
============================

.. note::
   This file was auto-generated by ``synapse_convention_review.py guide``.
   Conventions were approved via interactive review on {reviewed_at}.

"""

_RST_SECTION = """\
{name}
{underline}

**Canonical:** {canonical}

{description}

.. list-table::
   :header-rows: 1
   :widths: 10 45 45

   * -
     - DO
     - DON'T
   * - Example
     - ``{do_example}``
     - ``{dont_example}``

"""

# Map dimension codes to DO/DON'T example pairs
_CONVENTION_EXAMPLES = {
    "string_quoting": {
        "a": ("name = 'cortex'", 'name = "cortex"'),
        "b": ('name = "cortex"', "name = 'cortex'"),
    },
    "docstring_quoting": {
        "a": ("'''Return the node ndef.'''", '"""Return the node ndef."""'),
        "b": ('"""Return the node ndef."""', "'''Return the node ndef.'''"),
    },
    "blank_lines": {
        "a": (
            "def foo():\\n pass\\n\\ndef bar():",
            "def foo():\\n pass\\n\\n\\ndef bar():",
        ),
        "b": (
            "def foo():\\n pass\\n\\n\\ndef bar():",
            "def foo():\\n pass\\n\\ndef bar():",
        ),
    },
    "regex_module": {
        "a": ("import regex as re", "import re"),
        "b": ("import re", "import regex as re"),
    },
    "logger_interpolation": {
        "a": ("logger.info('count %d', n)", "logger.info('count {}'.format(n))"),
        "b": ("logger.info('count {}'.format(n))", "logger.info('count %d', n)"),
    },
    "exception_args": {
        "a": ("raise s_exc.BadArg(mesg='invalid')", "raise s_exc.BadArg('invalid')"),
        "b": ("raise s_exc.BadArg('invalid')", "raise s_exc.BadArg(mesg='invalid')"),
    },
    "method_naming": {
        "a": ("def getNodeInfo(self):", "def get_node_info(self):"),
        "b": ("def get_node_info(self):", "def getNodeInfo(self):"),
    },
    "parameter_naming": {
        "a": ("def setName(self, valu):", "def setName(self, value):"),
        "b": ("def setName(self, value):", "def setName(self, valu):"),
    },
    "property_usage": {
        "a": ("def getName(self):", "@property\\ndef name(self):"),
        "b": ("@property\\ndef name(self):", "def getName(self):"),
    },
    "test_base_class": {
        "a": ("class MyTest(s_t_utils.SynTest):", "class MyTest(unittest.TestCase):"),
        "b": ("class MyTest(unittest.TestCase):", "class MyTest(s_t_utils.SynTest):"),
    },
    "test_method_style": {
        "a": ("async def test_foo(self):", "def test_foo(self):"),
        "b": ("def test_foo(self):", "async def test_foo(self):"),
    },
}

# Human-readable descriptions for RST
_CONVENTION_DESCRIPTIONS = {
    "string_quoting": "All string literals should use single quotes unless the string contains a single quote character.",
    "docstring_quoting": "All docstrings and triple-quoted strings should use triple single quotes.",
    "blank_lines": "Use a single blank line between function and class definitions, not the PEP 8 double blank line.",
    "regex_module": "Use the ``regex`` module (imported as ``re``) instead of the stdlib ``re`` module for better Unicode support.",
    "logger_interpolation": "Use ``%s``-style lazy interpolation in logger calls instead of ``.format()`` for performance.",
    "exception_args": "Use keyword arguments (``mesg=``) when constructing ``SynErr`` exceptions.",
    "method_naming": "Public methods use camelCase naming. Test methods and dunder methods are exceptions.",
    "parameter_naming": "Use ``valu`` instead of ``value`` as a parameter name, following long-standing Synapse convention.",
    "property_usage": "Avoid the ``@property`` decorator as it is incompatible with Telepath RPC. Use explicit getter methods.",
    "test_base_class": "Test classes should inherit from ``SynTest`` or ``StormPkgTest``.",
    "test_method_style": "Test methods should be ``async def`` to work with the async test infrastructure.",
}

def cmd_guide(args):
    """Generate RST style guide from approved conventions."""
    try:
        config = OutputGenerator.from_config(args.config)
    except FileNotFoundError:
        print(f"Error: Config file not found: {args.config}")
        return 1
    dimensions = config.get("dimensions", {})
    reviewed_at = config.get("reviewed_at", "unknown")
    rst = _RST_HEADER.format(reviewed_at=reviewed_at[:10])
    # Build dimension lookup for ordering
    dim_order = [
        "string_quoting",
        "docstring_quoting",
        "blank_lines",
        "regex_module",
        "logger_interpolation",
        "exception_args",
        "method_naming",
        "parameter_naming",
        "property_usage",
        "test_base_class",
        "test_method_style",
    ]
    for dim_code in dim_order:
        if dim_code not in dimensions:
            continue
        dec = dimensions[dim_code]
        choice = dec["choice"]
        canonical = dec["canonical"]
        name = dim_code.replace("_", " ").title()
        description = _CONVENTION_DESCRIPTIONS.get(dim_code, "")
        examples = _CONVENTION_EXAMPLES.get(dim_code, {}).get(choice, ("", ""))
        do_example, dont_example = examples
        fixable_note = ""
        if dec.get("auto_fixable"):
            fixable_note = f"\n*Auto-fixable:* ``synapse_stylecheck.py --fix --select {dec['rule']}``\n"
        rst += _RST_SECTION.format(
            name=name,
            underline="-" * len(name),
            canonical=canonical,
            description=description + fixable_note,
            do_example=do_example,
            dont_example=dont_example,
        )
    # Write output
    output_path = args.output
    os.makedirs(os.path.dirname(output_path) or ".", exist_ok=True)
    with open(output_path, "w") as f:
        f.write(rst)
    print(f"Style guide written to: {output_path}")
    print(f" {len(dimensions)} conventions documented")
    return 0

# ============================================================================
# CLI Setup
# ============================================================================

def build_parser():
    """Build argument parser."""
    parser = argparse.ArgumentParser(
        prog="synapse_convention_review",
        description="Interactive convention review and migration tool for Synapse",
    )
    sub = parser.add_subparsers(dest="command")
    # review
    p_review = sub.add_parser("review", help="Interactive convention review")
    p_review.add_argument("paths", nargs="+", help="Files or directories to scan")
    p_review.add_argument(
        "--output", default=".synapse_conventions.json", help="Config output path"
    )
    # status
    p_status = sub.add_parser("status", help="Show adoption status")
    p_status.add_argument("paths", nargs="+", help="Files or directories to scan")
    p_status.add_argument(
        "--config", default=".synapse_conventions.json", help="Config file path"
    )
    # migrate
    p_migrate = sub.add_parser("migrate", help="Generate PRs from approved config")
    p_migrate.add_argument("--config", default=".synapse_conventions.json")
    p_migrate.add_argument("--dimension", help="Only migrate this dimension")
    p_migrate.add_argument(
        "--dry-run", action="store_true", help="Preview without creating branches/PRs"
    )
    # guide
    p_guide = sub.add_parser("guide", help="Generate RST style guide")
    p_guide.add_argument("--config", default=".synapse_conventions.json")
    p_guide.add_argument(
        "--output", default="docs/synapse/devguides/style_guide_conventions.rst"
    )
    return parser

def main():
    """Main entry point."""
    parser = build_parser()
    args = parser.parse_args()
    if not args.command:
        parser.print_help()
        return 0
    if args.command == "review":
        return cmd_review(args)
    elif args.command == "status":
        return cmd_status(args)
    elif args.command == "migrate":
        return cmd_migrate(args)
    elif args.command == "guide":
        return cmd_guide(args)
    else:
        parser.print_help()
        return 1

if __name__ == "__main__":
    sys.exit(main() or 0)
synapse_stylecheck.py
#!/usr/bin/env python
"""
Synapse Style Checker - AST-based linter for Synapse-specific conventions.

Checks for project-specific style rules that ruff and pycodestyle cannot enforce.
Uses Python's ast module for precise analysis with zero external dependencies.

@decision: Uses ast module instead of regex for rule checking. AST gives precise
node-level analysis (distinguishing imports from strings from comments) with
fewer false positives. tokenize is used alongside ast for string-literal quote
checking since ast normalizes quotes away. Single-file design matches the
existing scripts/ layout (pep8_staged_files.py).

Exit codes:
    0 - No violations found
    1 - Violations found
    2 - Tool error (bad arguments, parse failures, etc.)

Usage:
    python scripts/synapse_stylecheck.py synapse/
    python scripts/synapse_stylecheck.py --select SYN00 synapse/lib/base.py
    python scripts/synapse_stylecheck.py --ignore SYN020,SYN030 synapse/
    python scripts/synapse_stylecheck.py --format json synapse/
    python scripts/synapse_stylecheck.py --stats synapse/
"""
import io
import os
import re
import ast
import sys
import json
import difflib
import fnmatch
import argparse
import tokenize
import collections

# ---------------------------------------------------------------------------
# Violation
# ---------------------------------------------------------------------------

class Violation:

    __slots__ = ("code", "filepath", "line", "col", "message")

    def __init__(self, code, filepath, line, col, message):
        self.code = code
        self.filepath = filepath
        self.line = line
        self.col = col
        self.message = message

    def __repr__(self):
        return f"{self.filepath}:{self.line}:{self.col}: {self.code} {self.message}"

    def to_dict(self):
        return {
            "code": self.code,
            "filepath": self.filepath,
            "line": self.line,
            "col": self.col,
            "message": self.message,
        }
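
    # __repr__ renders the conventional linter form, e.g.
    # "synapse/lib/base.py:42:8: SYN020 Use single quotes instead of double quotes"
    # (path and position illustrative); to_dict() supplies the per-violation
    # records for the --format json output mode described in the module docstring.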

# ---------------------------------------------------------------------------
# Base rule
# ---------------------------------------------------------------------------

class BaseRule:
    """Base class for all lint rules."""

    code = None
    message = None
    fixable = False

    def check(self, tree, filepath, source_lines):
        """Yield Violation instances."""
        raise NotImplementedError

    def fix(self, source, violation):
        """Return modified source string with this violation fixed, or None if it can't be fixed."""
        return None

    def _viol(self, filepath, line, col, message=None):
        return Violation(self.code, filepath, line, col, message or self.message)

# ---------------------------------------------------------------------------
# noqa support
# ---------------------------------------------------------------------------

def _noqa_lines(source_lines):
    """Return a dict mapping line number -> set of suppressed codes."""
    noqa = {}
    for i, line in enumerate(source_lines, 1):
        m = re.search(r"#\s*noqa:\s*([A-Z0-9,\s]+)", line)
        if m:
            codes = {c.strip() for c in m.group(1).split(",")}
            noqa[i] = codes
        elif re.search(r"#\s*noqa\s*$", line):
            noqa[i] = {"*"}
    return noqa
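
# Suppression is per line, following the usual linter convention. For example
# (hypothetical source lines):
#
#   import re       # noqa: SYN004   -> _noqa_lines() maps this line to {'SYN004'}
#   x = "double"    # noqa           -> bare noqa maps to {'*'} (all rules)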
| # --------------------------------------------------------------------------- | |
| # Import rules | |
| # --------------------------------------------------------------------------- | |
| class SYN001(BaseRule): | |
| """Synapse import missing s_ alias.""" | |
| code = "SYN001" | |
| def check(self, tree, filepath, source_lines): | |
| for node in ast.walk(tree): | |
| if isinstance(node, ast.Import): | |
| for alias in node.names: | |
| if alias.name.startswith("synapse"): | |
| if alias.asname is None: | |
| yield self._viol( | |
| filepath, | |
| node.lineno, | |
| node.col_offset, | |
| f"Synapse import must use s_ alias: 'import {alias.name}'", | |
| ) | |
| class SYN002(BaseRule): | |
| """Synapse import with wrong alias.""" | |
| code = "SYN002" | |
| # Known aliases that don't follow the standard pattern | |
| _KNOWN_ALIASES = { | |
| "synapse.lib.platforms.common": {"s_pcommon"}, | |
| "synapse.cryotank": {"s_cryotank", "s_cryo"}, | |
| } | |
| def check(self, tree, filepath, source_lines): | |
| for node in ast.walk(tree): | |
| if isinstance(node, ast.Import): | |
| for alias in node.names: | |
| if not alias.name.startswith("synapse"): | |
| continue | |
| if alias.asname is None: | |
| continue # Handled by SYN001 | |
| # Skip vendor imports — they use their own conventions | |
| if ".vendor." in alias.name: | |
| continue | |
| # The fundamental rule: synapse imports must use s_ prefix | |
| # (or s_t_ for tests) | |
| if not alias.asname.startswith("s_"): | |
| yield self._viol( | |
| filepath, | |
| node.lineno, | |
| node.col_offset, | |
| f"Synapse import alias must start with 's_': " | |
| f"got '{alias.asname}' for 'import {alias.name}'", | |
| ) | |
| class SYN003(BaseRule): | |
| """Star import from synapse modules.""" | |
| code = "SYN003" | |
| def check(self, tree, filepath, source_lines): | |
| for node in ast.walk(tree): | |
| if isinstance(node, ast.ImportFrom): | |
| if node.module and node.module.startswith("synapse"): | |
| for alias in node.names: | |
| if alias.name == "*": | |
| yield self._viol( | |
| filepath, | |
| node.lineno, | |
| node.col_offset, | |
| f"Star import from synapse module: 'from {node.module} import *'", | |
| ) | |
| class SYN004(BaseRule): | |
| """re module imported instead of regex.""" | |
| code = "SYN004" | |
| fixable = True | |
| def check(self, tree, filepath, source_lines): | |
| for node in ast.walk(tree): | |
| if isinstance(node, ast.Import): | |
| for alias in node.names: | |
| if alias.name == "re": | |
| yield self._viol( | |
| filepath, | |
| node.lineno, | |
| node.col_offset, | |
| "Use 'regex' module instead of 're' (unicode bugs in re)", | |
| ) | |
| elif isinstance(node, ast.ImportFrom): | |
| if node.module == "re": | |
| yield self._viol( | |
| filepath, | |
| node.lineno, | |
| node.col_offset, | |
| "Use 'regex' module instead of 're' (unicode bugs in re)", | |
| ) | |
| def fix(self, source, violation): | |
| lines = source.splitlines(True) | |
| line_idx = violation.line - 1 | |
| line = lines[line_idx] | |
| if "import re\n" in line or line.rstrip() == "import re": | |
| lines[line_idx] = line.replace("import re", "import regex as re", 1) | |
| return "".join(lines) | |
| if line.strip().startswith("from re import"): | |
| # from re import X → from regex import X | |
| lines[line_idx] = line.replace("from re ", "from regex ", 1) | |
| return "".join(lines) | |
| return None | |
| # --------------------------------------------------------------------------- | |
| # Naming rules | |
| # --------------------------------------------------------------------------- | |
| _CAMEL_RE = re.compile(r"^_?[a-z][a-zA-Z0-9]*$") | |
| _SNAKE_RE = re.compile(r"^_?[a-z][a-z0-9_]*$") | |
| _DUNDER_RE = re.compile(r"^__[a-z][a-z0-9_]*__$") | |
| def _is_camel(name): | |
| return bool(_CAMEL_RE.match(name)) | |
| def _is_snake(name): | |
| return bool(_SNAKE_RE.match(name)) | |
| def _is_dunder(name): | |
| return bool(_DUNDER_RE.match(name)) | |
| def _is_single_word(name): | |
| stripped = name.lstrip("_") | |
| return "_" not in stripped and stripped == stripped.lower() | |
| class SYN010(BaseRule): | |
| """Public method uses snake_case instead of camelCase.""" | |
| code = "SYN010" | |
| def check(self, tree, filepath, source_lines): | |
| for node in ast.walk(tree): | |
| if not isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)): | |
| continue | |
| name = node.name | |
| # Skip dunder methods | |
| if _is_dunder(name): | |
| continue | |
| # Skip test methods | |
| if name.startswith("test_"): | |
| continue | |
| # Skip setUp/tearDown family | |
| if name in ( | |
| "setUp", | |
| "tearDown", | |
| "setUpClass", | |
| "tearDownClass", | |
| "setUpModule", | |
| "tearDownModule", | |
| ): | |
| continue | |
| # Skip single-word names (ambiguous) | |
| if _is_single_word(name): | |
| continue | |
| # Skip private names that are single word after stripping leading underscores | |
| stripped = name.lstrip("_") | |
| if _is_single_word(stripped): | |
| continue | |
| # Check if it looks like snake_case (has underscores in non-private part) | |
| if "_" in stripped and not _is_camel(name): | |
| # Suggest camelCase version | |
| parts = stripped.split("_") | |
| suggested = parts[0] + "".join(p.capitalize() for p in parts[1:]) | |
| prefix = "_" * (len(name) - len(stripped)) | |
| suggested = prefix + suggested | |
| yield self._viol( | |
| filepath, | |
| node.lineno, | |
| node.col_offset, | |
| f"Public method should use camelCase: '{name}' -> '{suggested}'", | |
| ) | |
| class SYN011(BaseRule): | |
| """Parameter named value instead of valu.""" | |
| code = "SYN011" | |
| def check(self, tree, filepath, source_lines): | |
| for node in ast.walk(tree): | |
| if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)): | |
| for arg in node.args.args: | |
| if arg.arg == "value": | |
| yield self._viol( | |
| filepath, | |
| node.lineno, | |
| arg.col_offset, | |
| f"Parameter should be 'valu' not 'value' in '{node.name}'", | |
| ) | |
| class SYN012(BaseRule): | |
| """Logger not initialized as logger = logging.getLogger(__name__).""" | |
| code = "SYN012" | |
| def check(self, tree, filepath, source_lines): | |
| has_logging_import = False | |
| has_logger = False | |
| for node in ast.walk(tree): | |
| if isinstance(node, ast.Import): | |
| for alias in node.names: | |
| if alias.name == "logging": | |
| has_logging_import = True | |
| if isinstance(node, ast.Assign): | |
| for target in node.targets: | |
| if isinstance(target, ast.Name) and target.id == "logger": | |
| if ( | |
| isinstance(node.value, ast.Call) | |
| and isinstance(node.value.func, ast.Attribute) | |
| and isinstance(node.value.func.value, ast.Name) | |
| and node.value.func.value.id == "logging" | |
| and node.value.func.attr == "getLogger" | |
| ): | |
| has_logger = True | |
| if has_logging_import and not has_logger: | |
| for node in ast.walk(tree): | |
| if isinstance(node, ast.Import): | |
| for alias in node.names: | |
| if alias.name == "logging": | |
| yield self._viol( | |
| filepath, | |
| node.lineno, | |
| 0, | |
| "Module imports logging but missing " | |
| "'logger = logging.getLogger(__name__)'", | |
| ) | |
| return | |
| # --------------------------------------------------------------------------- | |
| # String rules | |
| # --------------------------------------------------------------------------- | |
| def _tokenize_source(source_lines): | |
| """Tokenize source lines and return list of tokens.""" | |
| source = "\n".join(source_lines) | |
| return list(tokenize.generate_tokens(io.StringIO(source).readline)) | |
| class SYN020(BaseRule): | |
| """Double-quote string where single quote would work.""" | |
| code = "SYN020" | |
| fixable = True | |
| def check(self, tree, filepath, source_lines): | |
| try: | |
| tokens = _tokenize_source(source_lines) | |
| except tokenize.TokenError: | |
| return | |
| for tok_type, tok_string, tok_start, tok_end, tok_line in tokens: | |
| if tok_type != tokenize.STRING: | |
| continue | |
| # Strip string prefix (b, r, f, u combinations) | |
| stripped = tok_string | |
| while stripped and stripped[0] in "bBuUrRfF": | |
| stripped = stripped[1:] | |
| if not stripped: | |
| continue | |
| # Skip triple-quoted (handled by SYN021) | |
| if stripped.startswith('"""') or stripped.startswith("'''"): | |
| continue | |
| if stripped.startswith('"'): | |
| # Check if single-quoting would work (string contains single quotes) | |
| inner = stripped[1:-1] | |
| if "'" not in inner: | |
| yield self._viol( | |
| filepath, | |
| tok_start[0], | |
| tok_start[1], | |
| "Use single quotes instead of double quotes", | |
| ) | |
| def fix(self, source, violation): | |
| lines = source.splitlines(True) | |
| line_idx = violation.line - 1 | |
| line = lines[line_idx] | |
| # Tokenize just this line to find the string token | |
| try: | |
| tokens = list(tokenize.generate_tokens(io.StringIO(line).readline)) | |
| except tokenize.TokenError: | |
| return None | |
| for tok_type, tok_string, tok_start, tok_end, tok_line in tokens: | |
| if tok_type != tokenize.STRING: | |
| continue | |
| if tok_start[1] != violation.col: | |
| continue | |
| # Strip prefix | |
| prefix = "" | |
| rest = tok_string | |
| while rest and rest[0] in "bBuUrRfF": | |
| prefix += rest[0] | |
| rest = rest[1:] | |
| if not rest.startswith('"') or rest.startswith('"""'): | |
| return None | |
| inner = rest[1:-1] | |
| if "'" in inner: | |
| return None | |
| new_tok = prefix + "'" + inner + "'" | |
| new_line = line[: tok_start[1]] + new_tok + line[tok_end[1] :] | |
| lines[line_idx] = new_line | |
| return "".join(lines) | |
| return None | |
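| # Illustrative only: SYN020 --fix rewrites | |
| #     name = "woot"    ->    name = 'woot' | |
| # but leaves strings containing a single quote untouched, since | |
| # single-quoting them would break the literal: | |
| #     mesg = "don't"   # unchanged | |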
| class SYN021(BaseRule): | |
| """Triple double-quote docstring (should be triple single-quote).""" | |
| code = "SYN021" | |
| fixable = True | |
| def check(self, tree, filepath, source_lines): | |
| try: | |
| tokens = _tokenize_source(source_lines) | |
| except tokenize.TokenError: | |
| return | |
| for tok_type, tok_string, tok_start, tok_end, tok_line in tokens: | |
| if tok_type != tokenize.STRING: | |
| continue | |
| stripped = tok_string | |
| while stripped and stripped[0] in "bBuUrRfF": | |
| stripped = stripped[1:] | |
| if stripped.startswith('"""'): | |
| yield self._viol( | |
| filepath, | |
| tok_start[0], | |
| tok_start[1], | |
| "Use triple single-quotes (''') instead of triple double-quotes (\"\"\")", | |
| ) | |
| def fix(self, source, violation): | |
| # For multiline triple-quoted strings, work on full source via tokenize | |
| try: | |
| tokens = list(tokenize.generate_tokens(io.StringIO(source).readline)) | |
| except tokenize.TokenError: | |
| return None | |
| for tok_type, tok_string, tok_start, tok_end, tok_line in tokens: | |
| if tok_type != tokenize.STRING: | |
| continue | |
| if tok_start[0] != violation.line or tok_start[1] != violation.col: | |
| continue | |
| prefix = "" | |
| rest = tok_string | |
| while rest and rest[0] in "bBuUrRfF": | |
| prefix += rest[0] | |
| rest = rest[1:] | |
| if not rest.startswith('"""'): | |
| return None | |
| inner = rest[3:-3] | |
| # Bail out if the body would collide with the new delimiters: | |
| # e.g. """he said 'hi'""" -> '''he said 'hi'''' is a syntax error. | |
| if "'''" in inner or inner.endswith("'"): | |
| return None | |
| new_tok = prefix + "'''" + inner + "'''" | |
| # Reconstruct source by replacing token span | |
| source_lines_raw = source.splitlines(True) | |
| # Build position-based replacement | |
| start_line = tok_start[0] - 1 | |
| start_col = tok_start[1] | |
| end_line = tok_end[0] - 1 | |
| end_col = tok_end[1] | |
| before = ( | |
| "".join(source_lines_raw[:start_line]) | |
| + source_lines_raw[start_line][:start_col] | |
| ) | |
| after = source_lines_raw[end_line][end_col:] + "".join( | |
| source_lines_raw[end_line + 1 :] | |
| ) | |
| return before + new_tok + after | |
| return None | |
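| # Illustrative only: SYN021 --fix converts docstring delimiters: | |
| #     """Do the thing."""    ->    '''Do the thing.''' | |
| # The body is kept verbatim; only the quoting style changes. | |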
| # --------------------------------------------------------------------------- | |
| # Formatting rules | |
| # --------------------------------------------------------------------------- | |
| class SYN030(BaseRule): | |
| """Double blank line between definitions (should be single).""" | |
| code = "SYN030" | |
| fixable = True | |
| def check(self, tree, filepath, source_lines): | |
| # Collect line numbers of all def/class statements | |
| def_lines = set() | |
| for node in ast.walk(tree): | |
| if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef)): | |
| def_lines.add(node.lineno) | |
| # Look for double blank lines preceding definitions. | |
| # Note: decorated definitions are keyed on the 'def' line, so blank | |
| # lines above the decorator itself are not inspected. | |
| for i in range(2, len(source_lines)): | |
| line_num = i + 1 # 1-indexed | |
| if line_num not in def_lines: | |
| continue | |
| # Check if the two preceding lines are both blank | |
| if source_lines[i - 1].strip() == "" and source_lines[i - 2].strip() == "": | |
| yield self._viol( | |
| filepath, | |
| line_num, | |
| 0, | |
| "Use single blank line between definitions, not double", | |
| ) | |
| def fix(self, source, violation): | |
| lines = source.splitlines(True) | |
| # violation.line is the def line. The two preceding lines are blank. | |
| # Remove one of them (the one at violation.line - 2, i.e. index violation.line - 3) | |
| blank_idx = violation.line - 3 # 0-indexed, the first of the two blanks | |
| if blank_idx >= 0 and blank_idx < len(lines) and lines[blank_idx].strip() == "": | |
| del lines[blank_idx] | |
| return "".join(lines) | |
| return None | |
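| # Illustrative only: SYN030 --fix collapses | |
| #     def foo(): ... | |
| #     <blank> | |
| #     <blank> | |
| #     def bar(): ... | |
| # down to a single blank line between the two definitions. | |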
| # --------------------------------------------------------------------------- | |
| # Pattern rules | |
| # --------------------------------------------------------------------------- | |
| class SYN040(BaseRule): | |
| """@property decorator used (incompatible with Telepath).""" | |
| code = "SYN040" | |
| def check(self, tree, filepath, source_lines): | |
| for node in ast.walk(tree): | |
| if not isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)): | |
| continue | |
| for deco in node.decorator_list: | |
| if isinstance(deco, ast.Name) and deco.id == "property": | |
| yield self._viol( | |
| filepath, | |
| deco.lineno, | |
| deco.col_offset, | |
| f"@property decorator is incompatible with Telepath: '{node.name}'", | |
| ) | |
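| # Illustrative only: SYN040 flags | |
| #     @property | |
| #     def size(self): ... | |
| # The convention (per the rule above) is a plain getter method, e.g. a | |
| # hypothetical getSize(self), keeping the call explicit across Telepath. | |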
| class SYN041(BaseRule): | |
| """Logger .format() call (should use %s interpolation).""" | |
| code = "SYN041" | |
| fixable = True | |
| _LOG_METHODS = {"debug", "info", "warning", "error", "critical", "exception"} | |
| def check(self, tree, filepath, source_lines): | |
| for node in ast.walk(tree): | |
| if not isinstance(node, ast.Call): | |
| continue | |
| func = node.func | |
| if not isinstance(func, ast.Attribute): | |
| continue | |
| if func.attr not in self._LOG_METHODS: | |
| continue | |
| # Check if any argument is a .format() call | |
| for arg in node.args: | |
| if ( | |
| isinstance(arg, ast.Call) | |
| and isinstance(arg.func, ast.Attribute) | |
| and arg.func.attr == "format" | |
| ): | |
| yield self._viol( | |
| filepath, | |
| node.lineno, | |
| node.col_offset, | |
| f"Logger should use %s interpolation, not .format(): " | |
| f"'{func.attr}()'", | |
| ) | |
| break | |
| def fix(self, source, violation): | |
| # Parse and find the Call node at the violation line | |
| try: | |
| tree = ast.parse(source) | |
| except SyntaxError: | |
| return None | |
| for node in ast.walk(tree): | |
| if not isinstance(node, ast.Call): | |
| continue | |
| if not isinstance(node.func, ast.Attribute): | |
| continue | |
| if node.func.attr not in self._LOG_METHODS: | |
| continue | |
| if node.lineno != violation.line: | |
| continue | |
| # Only rewrite single-line calls; splicing a multiline call back | |
| # into one line below would corrupt the file. | |
| if getattr(node, "end_lineno", node.lineno) != node.lineno: | |
| return None | |
| # Find the .format() arg | |
| for i, arg in enumerate(node.args): | |
| if ( | |
| isinstance(arg, ast.Call) | |
| and isinstance(arg.func, ast.Attribute) | |
| and arg.func.attr == "format" | |
| ): | |
| # Get the format string (the value the .format is called on) | |
| fmt_str_node = arg.func.value | |
| if not isinstance(fmt_str_node, ast.Constant) or not isinstance( | |
| fmt_str_node.value, str | |
| ): | |
| return None | |
| fmt_str = fmt_str_node.value | |
| fmt_args = list(arg.args) | |
| # Only handle simple auto-numbered '{}' placeholders; bail on | |
| # keyword or indexed placeholders rather than emit a bad rewrite. | |
| if arg.keywords or fmt_str.count("{}") != len(fmt_args): | |
| return None | |
| new_fmt = fmt_str.replace("{}", "%s") | |
| # Rebuild: logger.info('msg %s', x) | |
| # Replace the format call arg with the new format string + args | |
| node.args[i] = ast.Constant(value=new_fmt) | |
| node.args[i + 1 : i + 1] = fmt_args | |
| # Use ast.unparse for the whole call, then splice back | |
| new_call = ast.unparse(node) | |
| lines = source.splitlines(True) | |
| # Get the full line span of the original call | |
| line_idx = violation.line - 1 | |
| # Simple single-line replacement | |
| old_line = lines[line_idx] | |
| indent = len(old_line) - len(old_line.lstrip()) | |
| lines[line_idx] = " " * indent + new_call + "\n" | |
| return "".join(lines) | |
| return None | |
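| # Illustrative only: SYN041 --fix rewrites a simple case such as | |
| #     logger.info('got {} items'.format(n)) | |
| # into lazy %s interpolation: | |
| #     logger.info('got %s items', n) | |
| # so the message is only built when the log level is enabled. | |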
| # --------------------------------------------------------------------------- | |
| # Exception rules | |
| # --------------------------------------------------------------------------- | |
| class SYN050(BaseRule): | |
| """raise s_exc.X(...) with positional args (should use keyword args with mesg=).""" | |
| code = "SYN050" | |
| fixable = True | |
| def check(self, tree, filepath, source_lines): | |
| for node in ast.walk(tree): | |
| if not isinstance(node, ast.Raise): | |
| continue | |
| exc = node.exc | |
| if exc is None: | |
| continue | |
| if not isinstance(exc, ast.Call): | |
| continue | |
| func = exc.func | |
| is_syn_exc = False | |
| if isinstance(func, ast.Attribute): | |
| if isinstance(func.value, ast.Name): | |
| if func.value.id in ("s_exc", "s_common"): | |
| is_syn_exc = True | |
| if not is_syn_exc: | |
| continue | |
| # Has positional arguments — should use keyword args | |
| if exc.args: | |
| yield self._viol( | |
| filepath, | |
| node.lineno, | |
| node.col_offset, | |
| "Exception should use keyword arguments: add mesg= parameter", | |
| ) | |
| def fix(self, source, violation): | |
| try: | |
| tree = ast.parse(source) | |
| except SyntaxError: | |
| return None | |
| for node in ast.walk(tree): | |
| if not isinstance(node, ast.Raise): | |
| continue | |
| if node.lineno != violation.line: | |
| continue | |
| # Only rewrite single-line raise statements; splicing a multiline | |
| # statement back into one line below would corrupt the file. | |
| if getattr(node, "end_lineno", node.lineno) != node.lineno: | |
| return None | |
| exc = node.exc | |
| if not isinstance(exc, ast.Call): | |
| return None | |
| if not exc.args: | |
| return None | |
| # Convert first positional arg to mesg= keyword | |
| first_arg = exc.args.pop(0) | |
| exc.keywords.insert(0, ast.keyword(arg="mesg", value=first_arg)) | |
| new_stmt = ast.unparse(node) | |
| lines = source.splitlines(True) | |
| line_idx = violation.line - 1 | |
| old_line = lines[line_idx] | |
| indent = len(old_line) - len(old_line.lstrip()) | |
| lines[line_idx] = " " * indent + new_stmt + "\n" | |
| return "".join(lines) | |
| return None | |
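| # Illustrative only (exception name hypothetical): SYN050 --fix rewrites | |
| #     raise s_exc.BadArg('bad value') | |
| # into keyword form: | |
| #     raise s_exc.BadArg(mesg='bad value') | |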
| # --------------------------------------------------------------------------- | |
| # Test rules | |
| # --------------------------------------------------------------------------- | |
| def _is_test_file(filepath): | |
| """Check if a file is in the synapse tests directory.""" | |
| norm = filepath.replace("\\", "/") | |
| return "/synapse/tests/" in norm or norm.startswith("synapse/tests/") | |
| def _attr_to_str(node): | |
| """Convert an ast.Attribute chain to dotted string.""" | |
| parts = [] | |
| while isinstance(node, ast.Attribute): | |
| parts.append(node.attr) | |
| node = node.value | |
| if isinstance(node, ast.Name): | |
| parts.append(node.id) | |
| return ".".join(reversed(parts)) | |
| return None | |
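| # Illustrative only: for the AST of 's_t_utils.SynTest', _attr_to_str() | |
| # returns the dotted string 's_t_utils.SynTest'; anything that is not a | |
| # plain Name/Attribute chain yields None. | |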
| class SYN060(BaseRule): | |
| """Test class doesn't inherit from SynTest.""" | |
| code = "SYN060" | |
| _KNOWN_BASES = { | |
| "SynTest", | |
| "StormPkgTest", | |
| "s_t_utils.SynTest", | |
| "s_test.SynTest", | |
| "s_t_utils.StormPkgTest", | |
| "s_test.StormPkgTest", | |
| } | |
| def check(self, tree, filepath, source_lines): | |
| if not _is_test_file(filepath): | |
| return | |
| for node in ast.walk(tree): | |
| if not isinstance(node, ast.ClassDef): | |
| continue | |
| # Only check classes that have test_ methods | |
| has_test_method = False | |
| for item in node.body: | |
| if isinstance(item, (ast.FunctionDef, ast.AsyncFunctionDef)): | |
| if item.name.startswith("test_"): | |
| has_test_method = True | |
| break | |
| if not has_test_method: | |
| continue | |
| base_names = set() | |
| for base in node.bases: | |
| if isinstance(base, ast.Name): | |
| base_names.add(base.id) | |
| elif isinstance(base, ast.Attribute): | |
| name = _attr_to_str(base) | |
| if name: | |
| base_names.add(name) | |
| if not base_names.intersection(self._KNOWN_BASES): | |
| yield self._viol( | |
| filepath, | |
| node.lineno, | |
| node.col_offset, | |
| f"Test class '{node.name}' should inherit from SynTest", | |
| ) | |
| class SYN061(BaseRule): | |
| """Non-async test method.""" | |
| code = "SYN061" | |
| def check(self, tree, filepath, source_lines): | |
| if not _is_test_file(filepath): | |
| return | |
| for node in ast.walk(tree): | |
| if not isinstance(node, ast.ClassDef): | |
| continue | |
| for item in node.body: | |
| if isinstance(item, ast.FunctionDef) and item.name.startswith("test_"): | |
| yield self._viol( | |
| filepath, | |
| item.lineno, | |
| item.col_offset, | |
| f"Test method '{item.name}' should be async", | |
| ) | |
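| # Illustrative only (method name hypothetical): in a synapse/tests/ file, | |
| # SYN061 flags the sync form and accepts the async one. | |
| #     def test_foo(self): ...          # flagged | |
| #     async def test_foo(self): ...    # ok | |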
| # --------------------------------------------------------------------------- | |
| # Rule registry | |
| # --------------------------------------------------------------------------- | |
| ALL_RULES = [ | |
| SYN001(), | |
| SYN002(), | |
| SYN003(), | |
| SYN004(), | |
| SYN010(), | |
| SYN011(), | |
| SYN012(), | |
| SYN020(), | |
| SYN021(), | |
| SYN030(), | |
| SYN040(), | |
| SYN041(), | |
| SYN050(), | |
| SYN060(), | |
| SYN061(), | |
| ] | |
| # --------------------------------------------------------------------------- | |
| # Style checker | |
| # --------------------------------------------------------------------------- | |
| # fnmatch and difflib are used below; imported here so this section is | |
| # self-contained (a re-import is harmless if they were imported earlier). | |
| import fnmatch | |
| import difflib | |
| class StyleChecker: | |
| """Runs all rules against source files and collects violations.""" | |
| EXCLUDE_PATTERNS = [ | |
| "synapse/vendor/*", | |
| "*/synapse/vendor/*", | |
| "synapse/lookup/*", | |
| "*/synapse/lookup/*", | |
| ] | |
| def __init__(self, rules=None, select=None, ignore=None): | |
| rules = rules or ALL_RULES | |
| if select: | |
| rules = [r for r in rules if any(r.code.startswith(s) for s in select)] | |
| if ignore: | |
| rules = [r for r in rules if r.code not in ignore] | |
| self.rules = rules | |
| def _is_excluded(self, filepath): | |
| norm = filepath.replace("\\", "/") | |
| for pat in self.EXCLUDE_PATTERNS: | |
| if fnmatch.fnmatch(norm, pat): | |
| return True | |
| return False | |
| def check_file(self, filepath): | |
| """Check a single file and return list of Violations.""" | |
| if self._is_excluded(filepath): | |
| return [] | |
| try: | |
| with open(filepath, "r", encoding="utf-8") as f: | |
| source = f.read() | |
| except (OSError, UnicodeDecodeError): | |
| return [] | |
| source_lines = source.splitlines() | |
| try: | |
| tree = ast.parse(source, filename=filepath) | |
| except SyntaxError: | |
| return [] | |
| noqa = _noqa_lines(source_lines) | |
| violations = [] | |
| for rule in self.rules: | |
| for v in rule.check(tree, filepath, source_lines): | |
| line_noqa = noqa.get(v.line, set()) | |
| if "*" in line_noqa or v.code in line_noqa: | |
| continue | |
| violations.append(v) | |
| violations.sort(key=lambda v: (v.line, v.col, v.code)) | |
| return violations | |
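| # Illustrative only: check_file() honors suppressions collected by | |
| # _noqa_lines() (defined earlier, not shown here): a line whose noqa set | |
| # contains the rule code, or the '*' wildcard, is skipped. The exact | |
| # comment syntax is whatever _noqa_lines() parses. | |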
| def check_paths(self, paths): | |
| """Check multiple paths (files or directories).""" | |
| all_violations = [] | |
| files_checked = 0 | |
| for path in paths: | |
| if os.path.isfile(path): | |
| if path.endswith(".py"): | |
| viols = self.check_file(path) | |
| all_violations.extend(viols) | |
| files_checked += 1 | |
| elif os.path.isdir(path): | |
| for root, dirs, files in os.walk(path): | |
| dirs[:] = [ | |
| d for d in dirs if d != "vendor" and not d.startswith(".") | |
| ] | |
| for fname in sorted(files): | |
| if not fname.endswith(".py"): | |
| continue | |
| fpath = os.path.join(root, fname) | |
| viols = self.check_file(fpath) | |
| all_violations.extend(viols) | |
| files_checked += 1 | |
| return all_violations, files_checked | |
| def fix_file(self, filepath, dry_run=False, diff=False): | |
| """Fix all auto-fixable violations in a file. | |
| Returns (fixes_applied: int, diff_text: str or None). | |
| """ | |
| if self._is_excluded(filepath): | |
| return 0, None | |
| try: | |
| with open(filepath, "r", encoding="utf-8") as f: | |
| original = f.read() | |
| except (OSError, UnicodeDecodeError): | |
| return 0, None | |
| source = original | |
| fixes = 0 | |
| # Keep fixing until no more fixable violations | |
| for _ in range(100): # safety limit | |
| source_lines = source.splitlines() | |
| try: | |
| tree = ast.parse(source, filename=filepath) | |
| except SyntaxError: | |
| break | |
| noqa = _noqa_lines(source_lines) | |
| fixable_viols = [] | |
| for rule in self.rules: | |
| if not rule.fixable: | |
| continue | |
| for v in rule.check(tree, filepath, source_lines): | |
| line_noqa = noqa.get(v.line, set()) | |
| if "*" in line_noqa or v.code in line_noqa: | |
| continue | |
| fixable_viols.append((rule, v)) | |
| if not fixable_viols: | |
| break | |
| # Apply first fix only (then re-scan to keep line numbers valid) | |
| rule, viol = fixable_viols[0] | |
| new_source = rule.fix(source, viol) | |
| if new_source is None or new_source == source: | |
| # Can't fix this one; stop rather than retry it forever. | |
| # (A per-violation skip list would let later fixes proceed.) | |
| break | |
| source = new_source | |
| fixes += 1 | |
| if fixes == 0: | |
| return 0, None | |
| diff_text = None | |
| if diff or dry_run: | |
| diff_text = "".join( | |
| difflib.unified_diff( | |
| original.splitlines(True), | |
| source.splitlines(True), | |
| fromfile=filepath, | |
| tofile=filepath, | |
| ) | |
| ) | |
| if not dry_run: | |
| with open(filepath, "w", encoding="utf-8") as f: | |
| f.write(source) | |
| return fixes, diff_text | |
| def fix_paths(self, paths, dry_run=False, diff=False): | |
| """Fix all auto-fixable violations in files/directories. | |
| Returns (total_fixes, files_fixed, diff_texts). | |
| """ | |
| total_fixes = 0 | |
| files_fixed = 0 | |
| diff_texts = [] | |
| for path in paths: | |
| if os.path.isfile(path): | |
| if path.endswith(".py"): | |
| nfixes, dtxt = self.fix_file(path, dry_run=dry_run, diff=diff) | |
| total_fixes += nfixes | |
| if nfixes > 0: | |
| files_fixed += 1 | |
| if dtxt: | |
| diff_texts.append(dtxt) | |
| elif os.path.isdir(path): | |
| for root, dirs, files in os.walk(path): | |
| dirs[:] = [ | |
| d for d in dirs if d != "vendor" and not d.startswith(".") | |
| ] | |
| for fname in sorted(files): | |
| if not fname.endswith(".py"): | |
| continue | |
| fpath = os.path.join(root, fname) | |
| nfixes, dtxt = self.fix_file(fpath, dry_run=dry_run, diff=diff) | |
| total_fixes += nfixes | |
| if nfixes > 0: | |
| files_fixed += 1 | |
| if dtxt: | |
| diff_texts.append(dtxt) | |
| return total_fixes, files_fixed, diff_texts | |
| # --------------------------------------------------------------------------- | |
| # Output formatters | |
| # --------------------------------------------------------------------------- | |
| def format_text(violations, files_checked): | |
| lines = [] | |
| for v in violations: | |
| lines.append(str(v)) | |
| if violations: | |
| file_count = len({v.filepath for v in violations}) | |
| lines.append("") | |
| lines.append( | |
| f"Found {len(violations)} violation(s) across " | |
| f"{file_count} file(s) ({files_checked} files checked)" | |
| ) | |
| else: | |
| lines.append(f"All clean! ({files_checked} files checked)") | |
| return "\n".join(lines) | |
| def format_json(violations, files_checked): | |
| data = { | |
| "violations": [v.to_dict() for v in violations], | |
| "summary": { | |
| "total_violations": len(violations), | |
| "files_with_violations": len({v.filepath for v in violations}), | |
| "files_checked": files_checked, | |
| }, | |
| } | |
| return json.dumps(data, indent=2) | |
| def format_stats(violations, files_checked): | |
| lines = [] | |
| by_code = collections.Counter(v.code for v in violations) | |
| by_file = collections.Counter(v.filepath for v in violations) | |
| lines.append(f"Files checked: {files_checked}") | |
| lines.append(f"Total violations: {len(violations)}") | |
| lines.append("") | |
| if by_code: | |
| lines.append("By rule:") | |
| for code, count in by_code.most_common(): | |
| lines.append(f" {code}: {count}") | |
| lines.append("") | |
| lines.append("Top files:") | |
| for fpath, count in by_file.most_common(10): | |
| lines.append(f" {fpath}: {count}") | |
| return "\n".join(lines) | |
| # --------------------------------------------------------------------------- | |
| # CLI | |
| # --------------------------------------------------------------------------- | |
| def build_parser(): | |
| parser = argparse.ArgumentParser( | |
| prog="synapse_stylecheck", | |
| description="Synapse-specific style checker (AST-based)", | |
| ) | |
| parser.add_argument( | |
| "paths", | |
| nargs="+", | |
| help="Files or directories to check", | |
| ) | |
| parser.add_argument( | |
| "--select", | |
| help="Comma-separated rule prefixes to select (e.g. SYN00,SYN01)", | |
| ) | |
| parser.add_argument( | |
| "--ignore", | |
| help="Comma-separated rule codes to ignore (e.g. SYN020,SYN030)", | |
| ) | |
| parser.add_argument( | |
| "--format", | |
| choices=["text", "json", "stats"], | |
| default="text", | |
| help="Output format (default: text)", | |
| ) | |
| parser.add_argument( | |
| "--stats", | |
| action="store_true", | |
| help="Shortcut for --format stats", | |
| ) | |
| parser.add_argument( | |
| "--fix", | |
| action="store_true", | |
| help="Auto-fix fixable violations in-place", | |
| ) | |
| parser.add_argument( | |
| "--dry-run", | |
| action="store_true", | |
| help="Show what would be fixed without modifying files", | |
| ) | |
| parser.add_argument( | |
| "--diff", | |
| action="store_true", | |
| help="Show unified diff of fixes", | |
| ) | |
| return parser | |
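| # Illustrative invocations (script name and paths hypothetical): | |
| #     python synapse_stylecheck.py synapse/                       # report violations | |
| #     python synapse_stylecheck.py synapse/ --stats               # per-rule counts | |
| #     python synapse_stylecheck.py synapse/ --select SYN02 --dry-run | |
| #     python synapse_stylecheck.py synapse/ --fix --diff | |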
| def main(argv=None): | |
| parser = build_parser() | |
| args = parser.parse_args(argv) | |
| select = [s.strip() for s in args.select.split(",")] if args.select else None | |
| ignore = {s.strip() for s in args.ignore.split(",")} if args.ignore else None | |
| # Handle fix mode | |
| if args.fix or args.dry_run: | |
| dry_run = args.dry_run | |
| show_diff = args.diff or args.dry_run # dry-run always shows diff | |
| checker = StyleChecker(select=select, ignore=ignore) | |
| total_fixes, files_fixed, diff_texts = checker.fix_paths( | |
| args.paths, dry_run=dry_run, diff=show_diff | |
| ) | |
| if diff_texts: | |
| for dt in diff_texts: | |
| print(dt) | |
| if dry_run: | |
| print( | |
| f"\nDry run: {total_fixes} fix(es) across {files_fixed} file(s) would be applied" | |
| ) | |
| else: | |
| print(f"\nApplied {total_fixes} fix(es) across {files_fixed} file(s)") | |
| return 0 | |
| # Normal check mode | |
| fmt = "stats" if args.stats else args.format | |
| checker = StyleChecker(select=select, ignore=ignore) | |
| violations, files_checked = checker.check_paths(args.paths) | |
| if fmt == "json": | |
| output = format_json(violations, files_checked) | |
| elif fmt == "stats": | |
| output = format_stats(violations, files_checked) | |
| else: | |
| output = format_text(violations, files_checked) | |
| print(output) | |
| if violations: | |
| return 1 | |
| return 0 | |
| if __name__ == "__main__": | |
| sys.exit(main()) |