Created
March 20, 2026 13:34
-
-
Save kartben/8399015a8c6625fd5f2e4388360b98f1 to your computer and use it in GitHub Desktop.
mergeable
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| # SPDX-FileCopyrightText: Copyright The Zephyr Project Contributors | |
| # SPDX-License-Identifier: Apache-2.0 | |
| """ | |
| List open GitHub PRs that are **truly mergeable**: no merge conflicts (GitHub | |
| ``mergeable``) **and** all **required** status checks completed successfully. | |
| For each listed PR, shows failing **non-required** checks (if any). | |
| Uses the GitHub GraphQL API. Authenticate with ``gh auth login`` or set | |
| ``GITHUB_TOKEN`` in the environment. | |
| Example:: | |
| python3 mergeable_prs_nonrequired_failures.py | |
| python3 mergeable_prs_nonrequired_failures.py --owner zephyrproject-rtos --repo zephyr | |
| python3 mergeable_prs_nonrequired_failures.py --only-nonrequired-failures | |
| Progress bars are written to stderr when connected to a TTY (uses ``tqdm`` if | |
| installed; otherwise a simple counter). Use ``--no-progress`` to disable. | |
| """ | |
| from __future__ import annotations | |
| import argparse | |
| import json | |
| import os | |
| import subprocess | |
| import sys | |
| import urllib.error | |
| import urllib.request | |
| from collections.abc import Iterable | |
| from dataclasses import dataclass | |
| from typing import Any | |
| try: | |
| from tqdm import tqdm as _tqdm_bar | |
| except ImportError: | |
| _tqdm_bar = None | |
| GITHUB_GRAPHQL_URL = "https://api.github.com/graphql" | |
| # CheckRun.conclusion values that count as a failed check for reporting. | |
| _FAILED_CHECK_RUN_CONCLUSIONS = frozenset( | |
| { | |
| "FAILURE", | |
| "TIMED_OUT", | |
| "ACTION_REQUIRED", | |
| "STARTUP_FAILURE", | |
| "CANCELLED", | |
| } | |
| ) | |
| # Commit status API rollup (legacy StatusContext). | |
| _FAILED_STATUS_CONTEXT_STATES = frozenset({"FAILURE", "ERROR"}) | |
| # Required CheckRun is treated as satisfied when completed with one of these conclusions. | |
| _SUCCESS_CHECK_RUN_CONCLUSIONS = frozenset({"SUCCESS", "NEUTRAL", "SKIPPED"}) | |
| def _progress_disabled(no_progress_flag: bool) -> bool: | |
| return no_progress_flag or not sys.stderr.isatty() | |
| def _iter_with_progress( | |
| iterable: Iterable[dict[str, Any]], | |
| *, | |
| desc: str, | |
| unit: str, | |
| disable: bool, | |
| ) -> Iterable[dict[str, Any]]: | |
| if _tqdm_bar is not None and not disable: | |
| yield from _tqdm_bar( | |
| iterable, | |
| desc=desc, | |
| unit=unit, | |
| file=sys.stderr, | |
| leave=True, | |
| ) | |
| return | |
| n = 0 | |
| for item in iterable: | |
| n += 1 | |
| if not disable: | |
| print(f"\r{desc}: {n} {unit}...", end="", file=sys.stderr, flush=True) | |
| yield item | |
| if n and not disable: | |
| print(file=sys.stderr) | |
| class _FallbackTotalBar: | |
| def __init__(self, total: int, desc: str, unit: str, disable: bool) -> None: | |
| self._total = total | |
| self._desc = desc | |
| self._unit = unit | |
| self._disable = disable | |
| self._n = 0 | |
| def update(self, n: int = 1) -> None: | |
| self._n += n | |
| if self._disable or self._total <= 0: | |
| return | |
| print( | |
| f"\r{self._desc}: {self._n}/{self._total} {self._unit}", | |
| end="", | |
| file=sys.stderr, | |
| flush=True, | |
| ) | |
| def close(self) -> None: | |
| if not self._disable and self._total > 0: | |
| print(file=sys.stderr) | |
| def _total_progress_bar( | |
| total: int, | |
| *, | |
| desc: str, | |
| unit: str, | |
| disable: bool, | |
| ) -> Any: | |
| if _tqdm_bar is not None and not disable: | |
| return _tqdm_bar( | |
| total=total, | |
| desc=desc, | |
| unit=unit, | |
| file=sys.stderr, | |
| leave=True, | |
| ) | |
| return _FallbackTotalBar(total, desc, unit, disable) | |
| def _get_token() -> str: | |
| token = os.environ.get("GITHUB_TOKEN") | |
| if token: | |
| return token.strip() | |
| try: | |
| out = subprocess.run( | |
| ["gh", "auth", "token"], | |
| check=True, | |
| capture_output=True, | |
| text=True, | |
| ) | |
| except (FileNotFoundError, subprocess.CalledProcessError) as e: | |
| raise RuntimeError( | |
| "Need GitHub credentials: set GITHUB_TOKEN or run `gh auth login`." | |
| ) from e | |
| t = out.stdout.strip() | |
| if not t: | |
| raise RuntimeError("gh auth token returned empty output.") | |
| return t | |
| def _graphql(token: str, query: str, variables: dict[str, Any] | None = None) -> dict[str, Any]: | |
| body = json.dumps({"query": query, "variables": variables or {}}).encode() | |
| req = urllib.request.Request( | |
| GITHUB_GRAPHQL_URL, | |
| data=body, | |
| method="POST", | |
| headers={ | |
| "Authorization": f"Bearer {token}", | |
| "Content-Type": "application/json", | |
| "User-Agent": "zephyr-mergeable-pr-checks-script", | |
| }, | |
| ) | |
| try: | |
| with urllib.request.urlopen(req, timeout=120) as resp: | |
| payload = json.loads(resp.read().decode()) | |
| except urllib.error.HTTPError as e: | |
| detail = e.read().decode(errors="replace") | |
| raise RuntimeError(f"GraphQL HTTP {e.code}: {detail}") from e | |
| if payload.get("errors"): | |
| msgs = "; ".join(err.get("message", str(err)) for err in payload["errors"]) | |
| raise RuntimeError(f"GraphQL errors: {msgs}") | |
| return payload["data"] | |
| def _iter_open_prs(owner: str, repo: str, token: str) -> Iterable[dict[str, Any]]: | |
| cursor: str | None = None | |
| q = """ | |
| query ($owner: String!, $repo: String!, $cursor: String) { | |
| repository(owner: $owner, name: $repo) { | |
| pullRequests( | |
| first: 100 | |
| after: $cursor | |
| states: OPEN | |
| orderBy: { field: UPDATED_AT, direction: DESC } | |
| ) { | |
| pageInfo { | |
| hasNextPage | |
| endCursor | |
| } | |
| nodes { | |
| number | |
| mergeable | |
| title | |
| url | |
| } | |
| } | |
| } | |
| } | |
| """ | |
| while True: | |
| data = _graphql( | |
| token, | |
| q, | |
| {"owner": owner, "repo": repo, "cursor": cursor}, | |
| ) | |
| conn = data["repository"]["pullRequests"] | |
| yield from conn["nodes"] | |
| if not conn["pageInfo"]["hasNextPage"]: | |
| break | |
| cursor = conn["pageInfo"]["endCursor"] | |
| def _alias_for_pr(num: int) -> str: | |
| return f"pr_{num}" | |
| def _fetch_pr_check_rollups( | |
| owner: str, | |
| repo: str, | |
| token: str, | |
| pr_numbers: list[int], | |
| ) -> dict[int, dict[str, Any]]: | |
| """Return map pr_number -> pullRequest object from batched query.""" | |
| if not pr_numbers: | |
| return {} | |
| lines: list[str] = [ | |
| "query ($owner: String!, $repo: String!) {", | |
| ' repository(owner: $owner, name: $repo) {', | |
| ] | |
| for num in pr_numbers: | |
| alias = _alias_for_pr(num) | |
| lines.append(f" {alias}: pullRequest(number: {num}) {{") | |
| lines.append(" number") | |
| lines.append(" mergeable") | |
| lines.append(" title") | |
| lines.append(" url") | |
| lines.append(" commits(last: 1) {") | |
| lines.append(" nodes {") | |
| lines.append(" commit {") | |
| lines.append(" statusCheckRollup {") | |
| lines.append(" state") | |
| lines.append(" contexts(first: 100) {") | |
| lines.append(" pageInfo { hasNextPage endCursor }") | |
| lines.append(" totalCount") | |
| lines.append(" nodes {") | |
| lines.append(" __typename") | |
| lines.append(" ... on CheckRun {") | |
| lines.append(" name") | |
| lines.append(" conclusion") | |
| lines.append(" status") | |
| lines.append(f" isRequired(pullRequestNumber: {num})") | |
| lines.append( | |
| " checkSuite { workflowRun { workflow { name } } }" | |
| ) | |
| lines.append(" }") | |
| lines.append(" ... on StatusContext {") | |
| lines.append(" context") | |
| lines.append(" state") | |
| lines.append(f" isRequired(pullRequestNumber: {num})") | |
| lines.append(" }") | |
| lines.append(" }") | |
| lines.append(" }") | |
| lines.append(" }") | |
| lines.append(" }") | |
| lines.append(" }") | |
| lines.append(" }") | |
| lines.append(" }") | |
| lines.append(" }") | |
| lines.append("}") | |
| data = _graphql(token, "\n".join(lines), {"owner": owner, "repo": repo}) | |
| repo_obj = data["repository"] | |
| out: dict[int, dict[str, Any]] = {} | |
| for num in pr_numbers: | |
| alias = _alias_for_pr(num) | |
| pr = repo_obj.get(alias) | |
| if pr is not None: | |
| out[num] = pr | |
| return out | |
| @dataclass(frozen=True) | |
| class OptionalFailure: | |
| kind: str # "CheckRun" | "StatusContext" | "Meta" | |
| workflow_or_context: str | |
| name: str | |
| detail: str | |
| def _rollup_optional_failures(pr_number: int, pr_obj: dict[str, Any]) -> list[OptionalFailure]: | |
| failures: list[OptionalFailure] = [] | |
| try: | |
| commit = pr_obj["commits"]["nodes"][0]["commit"] | |
| except (IndexError, KeyError, TypeError): | |
| return failures | |
| rollup = commit.get("statusCheckRollup") or {} | |
| contexts = (rollup.get("contexts") or {}).get("nodes") or [] | |
| if (rollup.get("contexts") or {}).get("pageInfo", {}).get("hasNextPage"): | |
| # Rare: more than 100 checks; surface a hint. | |
| failures.append( | |
| OptionalFailure( | |
| kind="Meta", | |
| workflow_or_context="(pagination)", | |
| name="truncated", | |
| detail="More than 100 status contexts; re-run with smaller repo or extend script.", | |
| ) | |
| ) | |
| for node in contexts: | |
| if not node: | |
| continue | |
| typename = node.get("__typename") | |
| if typename == "CheckRun": | |
| if node.get("isRequired") is not False: | |
| continue | |
| if node.get("status") != "COMPLETED": | |
| continue | |
| conc = node.get("conclusion") | |
| if conc not in _FAILED_CHECK_RUN_CONCLUSIONS: | |
| continue | |
| suite = node.get("checkSuite") or {} | |
| wr = suite.get("workflowRun") or {} | |
| wf = (wr.get("workflow") or {}).get("name") | |
| wf = wf or "(no workflow)" | |
| name = node.get("name") or "(unnamed job)" | |
| failures.append( | |
| OptionalFailure( | |
| kind="CheckRun", | |
| workflow_or_context=wf, | |
| name=name, | |
| detail=f"conclusion={conc}", | |
| ) | |
| ) | |
| elif typename == "StatusContext": | |
| if node.get("isRequired") is not False: | |
| continue | |
| st = node.get("state") | |
| if st not in _FAILED_STATUS_CONTEXT_STATES: | |
| continue | |
| ctx = node.get("context") or "(context)" | |
| failures.append( | |
| OptionalFailure( | |
| kind="StatusContext", | |
| workflow_or_context=ctx, | |
| name=ctx, | |
| detail=f"state={st}", | |
| ) | |
| ) | |
| return failures | |
| def _required_checks_satisfied(pr_obj: dict[str, Any]) -> tuple[bool, list[str]]: | |
| """True if every required context is present and successful (rollup may omit checks).""" | |
| reasons: list[str] = [] | |
| try: | |
| commit = pr_obj["commits"]["nodes"][0]["commit"] | |
| except (IndexError, KeyError, TypeError): | |
| return False, ["no head commit on PR"] | |
| rollup = commit.get("statusCheckRollup") or {} | |
| contexts_conn = rollup.get("contexts") or {} | |
| if contexts_conn.get("pageInfo", {}).get("hasNextPage"): | |
| return False, [ | |
| "status contexts truncated (>100); cannot verify all required checks", | |
| ] | |
| for node in contexts_conn.get("nodes") or []: | |
| if not node: | |
| continue | |
| typename = node.get("__typename") | |
| if typename == "CheckRun": | |
| if node.get("isRequired") is not True: | |
| continue | |
| name = node.get("name") or "(unnamed check)" | |
| st = node.get("status") | |
| if st != "COMPLETED": | |
| reasons.append(f"required check not finished: {name!r} ({st})") | |
| continue | |
| conc = node.get("conclusion") | |
| if conc not in _SUCCESS_CHECK_RUN_CONCLUSIONS: | |
| reasons.append(f"required check not successful: {name!r} (conclusion={conc})") | |
| elif typename == "StatusContext": | |
| if node.get("isRequired") is not True: | |
| continue | |
| ctx = node.get("context") or "(context)" | |
| state = node.get("state") | |
| if state != "SUCCESS": | |
| reasons.append(f"required status context {ctx!r}: {state}") | |
| return (len(reasons) == 0, reasons) | |
| def _real_optional_failures(failures: list[OptionalFailure]) -> list[OptionalFailure]: | |
| """CheckRun / StatusContext only (excludes Meta pagination hints).""" | |
| return [f for f in failures if f.kind in ("CheckRun", "StatusContext")] | |
| def main() -> int: | |
| p = argparse.ArgumentParser( | |
| description=( | |
| "List open PRs with no merge conflicts and all required checks green, " | |
| "and any failing non-required checks." | |
| ) | |
| ) | |
| p.add_argument("--owner", default="zephyrproject-rtos", help="GitHub org or user") | |
| p.add_argument("--repo", default="zephyr", help="Repository name") | |
| p.add_argument( | |
| "--batch-size", | |
| type=int, | |
| default=12, | |
| metavar="N", | |
| help="Number of PRs to fetch per GraphQL request (default: 12)", | |
| ) | |
| p.add_argument( | |
| "--no-progress", | |
| action="store_true", | |
| help="Disable progress output on stderr", | |
| ) | |
| p.add_argument( | |
| "--only-nonrequired-failures", | |
| action="store_true", | |
| help=( | |
| "Print only PRs (among truly mergeable) that have at least one " | |
| "failing non-required check" | |
| ), | |
| ) | |
| args = p.parse_args() | |
| prog_off = _progress_disabled(args.no_progress) | |
| try: | |
| token = _get_token() | |
| except RuntimeError as e: | |
| print(str(e), file=sys.stderr) | |
| return 1 | |
| mergeable_prs: list[dict[str, Any]] = [] | |
| unknown_mergeable = 0 | |
| if not prog_off: | |
| print("Fetching open PRs (GraphQL)...", file=sys.stderr) | |
| for node in _iter_with_progress( | |
| _iter_open_prs(args.owner, args.repo, token), | |
| desc="Open PRs", | |
| unit="PR", | |
| disable=prog_off, | |
| ): | |
| m = node.get("mergeable") | |
| if m == "MERGEABLE": | |
| mergeable_prs.append(node) | |
| elif m == "UNKNOWN": | |
| unknown_mergeable += 1 | |
| extra = ( | |
| f" ({unknown_mergeable} open PR(s) have mergeable=UNKNOWN and are skipped)" | |
| if unknown_mergeable | |
| else "" | |
| ) | |
| print( | |
| f"Open PRs (no merge conflicts, GitHub mergeable): {len(mergeable_prs)}{extra}" | |
| ) | |
| print("(only PRs with all required checks successful are listed below)") | |
| if args.only_nonrequired_failures: | |
| print("(output limited to PRs with failing non-required checks)") | |
| if not mergeable_prs: | |
| print("No open PRs with mergeable state (no conflicts).") | |
| return 0 | |
| numbers = [int(n["number"]) for n in mergeable_prs] | |
| batch_size = max(1, args.batch_size) | |
| any_real_optional_failures = False | |
| prs_with_real_failures = 0 | |
| skipped_not_truly_mergeable = 0 | |
| n_truly_mergeable = 0 | |
| if not prog_off: | |
| print("Fetching check rollups for mergeable PRs...", file=sys.stderr) | |
| pbar = _total_progress_bar( | |
| len(numbers), | |
| desc="Check rollups", | |
| unit="PR", | |
| disable=prog_off, | |
| ) | |
| try: | |
| for i in range(0, len(numbers), batch_size): | |
| chunk = numbers[i : i + batch_size] | |
| rollups = _fetch_pr_check_rollups(args.owner, args.repo, token, chunk) | |
| pbar.update(len(chunk)) | |
| for num in chunk: | |
| pr_list_meta = next( | |
| (x for x in mergeable_prs if int(x["number"]) == num), None | |
| ) | |
| pr_obj = rollups.get(num) | |
| title = (pr_list_meta or {}).get("title") or (pr_obj or {}).get("title") or "" | |
| url = (pr_list_meta or {}).get("url") or (pr_obj or {}).get("url") or "" | |
| if not pr_obj: | |
| if args.only_nonrequired_failures: | |
| print( | |
| f"Warning: could not load check rollup for PR #{num}", | |
| file=sys.stderr, | |
| ) | |
| else: | |
| print(f"\n#{num} {title}\n {url}\n (could not load check rollup)") | |
| continue | |
| # Re-check mergeable in case it changed between list and detail fetch. | |
| if pr_obj.get("mergeable") != "MERGEABLE": | |
| continue | |
| req_ok, _req_reasons = _required_checks_satisfied(pr_obj) | |
| if not req_ok: | |
| skipped_not_truly_mergeable += 1 | |
| continue | |
| n_truly_mergeable += 1 | |
| fails = _rollup_optional_failures(num, pr_obj) | |
| real = _real_optional_failures(fails) | |
| if args.only_nonrequired_failures and not real: | |
| continue | |
| print(f"\n#{num} {title}\n {url}") | |
| if not fails: | |
| print(" Non-required failing checks: none") | |
| continue | |
| if real: | |
| any_real_optional_failures = True | |
| prs_with_real_failures += 1 | |
| print(" Non-required failing checks:") | |
| for f in fails: | |
| if f.kind == "Meta": | |
| print(f" ! {f.detail}") | |
| elif f.kind == "CheckRun": | |
| print( | |
| f" - workflow={f.workflow_or_context!r} " | |
| f"job={f.name!r} ({f.detail})" | |
| ) | |
| else: | |
| print( | |
| f" - status context={f.workflow_or_context!r} ({f.detail})" | |
| ) | |
| finally: | |
| pbar.close() | |
| if skipped_not_truly_mergeable: | |
| print( | |
| f"Note: omitted {skipped_not_truly_mergeable} PR(s) " | |
| "(required checks not all successful, incomplete, or rollup truncated).", | |
| file=sys.stderr, | |
| ) | |
| if args.only_nonrequired_failures: | |
| if n_truly_mergeable == 0: | |
| print( | |
| "\nNo truly mergeable PRs (no candidate had all required checks successful)." | |
| ) | |
| elif not any_real_optional_failures: | |
| print( | |
| "\nNo failing non-required checks on any truly mergeable PR." | |
| ) | |
| else: | |
| print( | |
| f"\nTotal: {prs_with_real_failures} truly mergeable PR(s) " | |
| "with failing non-required checks." | |
| ) | |
| elif n_truly_mergeable == 0: | |
| print( | |
| "\nNo truly mergeable PRs (no candidate had all required checks successful)." | |
| ) | |
| elif not any_real_optional_failures: | |
| print("\nSummary: no failing non-required checks on listed truly mergeable PRs.") | |
| return 0 | |
| if __name__ == "__main__": | |
| raise SystemExit(main()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment