Skip to content

Instantly share code, notes, and snippets.

@kartben
Created March 20, 2026 13:34
Show Gist options
  • Select an option

  • Save kartben/8399015a8c6625fd5f2e4388360b98f1 to your computer and use it in GitHub Desktop.

Select an option

Save kartben/8399015a8c6625fd5f2e4388360b98f1 to your computer and use it in GitHub Desktop.
mergeable
#!/usr/bin/env python3
# SPDX-FileCopyrightText: Copyright The Zephyr Project Contributors
# SPDX-License-Identifier: Apache-2.0
"""
List open GitHub PRs that are **truly mergeable**: no merge conflicts (GitHub
``mergeable``) **and** all **required** status checks completed successfully.
For each listed PR, shows failing **non-required** checks (if any).
Uses the GitHub GraphQL API. Authenticate with ``gh auth login`` or set
``GITHUB_TOKEN`` in the environment.
Example::
python3 mergeable_prs_nonrequired_failures.py
python3 mergeable_prs_nonrequired_failures.py --owner zephyrproject-rtos --repo zephyr
python3 mergeable_prs_nonrequired_failures.py --only-nonrequired-failures
Progress bars are written to stderr when connected to a TTY (uses ``tqdm`` if
installed; otherwise a simple counter). Use ``--no-progress`` to disable.
"""
from __future__ import annotations
import argparse
import json
import os
import subprocess
import sys
import urllib.error
import urllib.request
from collections.abc import Iterable
from dataclasses import dataclass
from typing import Any
try:
from tqdm import tqdm as _tqdm_bar
except ImportError:
_tqdm_bar = None
# Single GraphQL endpoint; every request in this script goes through it.
GITHUB_GRAPHQL_URL = "https://api.github.com/graphql"
# CheckRun.conclusion values that count as a failed check for reporting.
# NOTE: CANCELLED is deliberately treated as a failure worth reporting.
_FAILED_CHECK_RUN_CONCLUSIONS = frozenset(
    {
        "FAILURE",
        "TIMED_OUT",
        "ACTION_REQUIRED",
        "STARTUP_FAILURE",
        "CANCELLED",
    }
)
# Commit status API rollup (legacy StatusContext).
_FAILED_STATUS_CONTEXT_STATES = frozenset({"FAILURE", "ERROR"})
# Required CheckRun is treated as satisfied when completed with one of these conclusions.
# NEUTRAL/SKIPPED do not block merging on GitHub, hence count as satisfied.
_SUCCESS_CHECK_RUN_CONCLUSIONS = frozenset({"SUCCESS", "NEUTRAL", "SKIPPED"})
def _progress_disabled(no_progress_flag: bool) -> bool:
return no_progress_flag or not sys.stderr.isatty()
def _iter_with_progress(
    iterable: Iterable[dict[str, Any]],
    *,
    desc: str,
    unit: str,
    disable: bool,
) -> Iterable[dict[str, Any]]:
    """Yield every item of *iterable*, drawing progress on stderr.

    Uses tqdm when it is installed and progress is enabled; otherwise falls
    back to a simple in-place counter. With ``disable=True`` items are passed
    through silently.
    """
    if not disable and _tqdm_bar is not None:
        yield from _tqdm_bar(
            iterable,
            desc=desc,
            unit=unit,
            file=sys.stderr,
            leave=True,
        )
        return
    count = 0
    for item in iterable:
        count += 1
        if not disable:
            # "\r" redraws the counter on the same stderr line.
            print(f"\r{desc}: {count} {unit}...", end="", file=sys.stderr, flush=True)
        yield item
    # Terminate the progress line only if something was actually drawn.
    if count and not disable:
        print(file=sys.stderr)
class _FallbackTotalBar:
def __init__(self, total: int, desc: str, unit: str, disable: bool) -> None:
self._total = total
self._desc = desc
self._unit = unit
self._disable = disable
self._n = 0
def update(self, n: int = 1) -> None:
self._n += n
if self._disable or self._total <= 0:
return
print(
f"\r{self._desc}: {self._n}/{self._total} {self._unit}",
end="",
file=sys.stderr,
flush=True,
)
def close(self) -> None:
if not self._disable and self._total > 0:
print(file=sys.stderr)
def _total_progress_bar(
    total: int,
    *,
    desc: str,
    unit: str,
    disable: bool,
) -> Any:
    """Return a progress bar object for a workload of known *total* size.

    Prefers a real tqdm bar when available and enabled; otherwise returns the
    stderr-based fallback (which honors ``disable`` itself).
    """
    use_tqdm = _tqdm_bar is not None and not disable
    if not use_tqdm:
        return _FallbackTotalBar(total, desc, unit, disable)
    return _tqdm_bar(
        total=total,
        desc=desc,
        unit=unit,
        file=sys.stderr,
        leave=True,
    )
def _get_token() -> str:
token = os.environ.get("GITHUB_TOKEN")
if token:
return token.strip()
try:
out = subprocess.run(
["gh", "auth", "token"],
check=True,
capture_output=True,
text=True,
)
except (FileNotFoundError, subprocess.CalledProcessError) as e:
raise RuntimeError(
"Need GitHub credentials: set GITHUB_TOKEN or run `gh auth login`."
) from e
t = out.stdout.strip()
if not t:
raise RuntimeError("gh auth token returned empty output.")
return t
def _graphql(token: str, query: str, variables: dict[str, Any] | None = None) -> dict[str, Any]:
    """POST a GraphQL *query* to GitHub and return the ``data`` payload.

    Raises:
        RuntimeError: on an HTTP error response or when the GraphQL reply
            contains an ``errors`` array.
    """
    request_body = {"query": query, "variables": variables or {}}
    req = urllib.request.Request(
        GITHUB_GRAPHQL_URL,
        data=json.dumps(request_body).encode(),
        method="POST",
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
            "User-Agent": "zephyr-mergeable-pr-checks-script",
        },
    )
    try:
        with urllib.request.urlopen(req, timeout=120) as resp:
            result = json.loads(resp.read().decode())
    except urllib.error.HTTPError as err:
        detail = err.read().decode(errors="replace")
        raise RuntimeError(f"GraphQL HTTP {err.code}: {detail}") from err
    errors = result.get("errors")
    if errors:
        # Collapse all GraphQL-level errors into one message.
        msgs = "; ".join(e.get("message", str(e)) for e in errors)
        raise RuntimeError(f"GraphQL errors: {msgs}")
    return result["data"]
def _iter_open_prs(owner: str, repo: str, token: str) -> Iterable[dict[str, Any]]:
    """Yield every open PR node of *owner*/*repo*, newest-updated first.

    Pages through the GraphQL connection 100 PRs at a time; each yielded node
    carries ``number``, ``mergeable``, ``title`` and ``url``.
    """
    query = """
query ($owner: String!, $repo: String!, $cursor: String) {
repository(owner: $owner, name: $repo) {
pullRequests(
first: 100
after: $cursor
states: OPEN
orderBy: { field: UPDATED_AT, direction: DESC }
) {
pageInfo {
hasNextPage
endCursor
}
nodes {
number
mergeable
title
url
}
}
}
}
"""
    cursor: str | None = None
    while True:
        result = _graphql(
            token,
            query,
            {"owner": owner, "repo": repo, "cursor": cursor},
        )
        page = result["repository"]["pullRequests"]
        for node in page["nodes"]:
            yield node
        page_info = page["pageInfo"]
        if not page_info["hasNextPage"]:
            return
        cursor = page_info["endCursor"]
def _alias_for_pr(num: int) -> str:
return f"pr_{num}"
def _fetch_pr_check_rollups(
    owner: str,
    repo: str,
    token: str,
    pr_numbers: list[int],
) -> dict[int, dict[str, Any]]:
    """Return map pr_number -> pullRequest object from batched query.

    Builds one GraphQL document containing an aliased ``pullRequest`` field
    per PR number (alias from :func:`_alias_for_pr`), so a whole batch is
    fetched in a single round trip. ``isRequired`` is resolved per PR via
    its ``pullRequestNumber`` argument on both CheckRun and StatusContext.
    """
    if not pr_numbers:
        return {}
    # Assemble the query text line by line; variables cover owner/repo only,
    # PR numbers are interpolated directly into the aliased fields.
    lines: list[str] = [
        "query ($owner: String!, $repo: String!) {",
        ' repository(owner: $owner, name: $repo) {',
    ]
    for num in pr_numbers:
        alias = _alias_for_pr(num)
        lines.append(f" {alias}: pullRequest(number: {num}) {{")
        lines.append(" number")
        lines.append(" mergeable")
        lines.append(" title")
        lines.append(" url")
        # Only the head commit's rollup matters for check status.
        lines.append(" commits(last: 1) {")
        lines.append(" nodes {")
        lines.append(" commit {")
        lines.append(" statusCheckRollup {")
        lines.append(" state")
        # First 100 contexts only; pageInfo lets callers detect truncation.
        lines.append(" contexts(first: 100) {")
        lines.append(" pageInfo { hasNextPage endCursor }")
        lines.append(" totalCount")
        lines.append(" nodes {")
        lines.append(" __typename")
        lines.append(" ... on CheckRun {")
        lines.append(" name")
        lines.append(" conclusion")
        lines.append(" status")
        lines.append(f" isRequired(pullRequestNumber: {num})")
        lines.append(
            " checkSuite { workflowRun { workflow { name } } }"
        )
        lines.append(" }")
        lines.append(" ... on StatusContext {")
        lines.append(" context")
        lines.append(" state")
        lines.append(f" isRequired(pullRequestNumber: {num})")
        lines.append(" }")
        lines.append(" }")
        lines.append(" }")
        lines.append(" }")
        lines.append(" }")
        lines.append(" }")
        lines.append(" }")
        lines.append(" }")
    lines.append(" }")
    lines.append("}")
    data = _graphql(token, "\n".join(lines), {"owner": owner, "repo": repo})
    repo_obj = data["repository"]
    out: dict[int, dict[str, Any]] = {}
    for num in pr_numbers:
        alias = _alias_for_pr(num)
        pr = repo_obj.get(alias)
        # An alias can come back null if the PR disappeared between the
        # listing query and this detail query; such PRs are simply omitted.
        if pr is not None:
            out[num] = pr
    return out
@dataclass(frozen=True)
class OptionalFailure:
    """One failing non-required check (or a meta hint) on a PR head commit."""

    kind: str  # "CheckRun" | "StatusContext" | "Meta"
    # Workflow name for a CheckRun; status context string for a StatusContext.
    workflow_or_context: str
    # Job/check name; for StatusContext this mirrors workflow_or_context.
    name: str
    # Human-readable reason, e.g. "conclusion=FAILURE" or "state=ERROR".
    detail: str
def _rollup_optional_failures(pr_number: int, pr_obj: dict[str, Any]) -> list[OptionalFailure]:
failures: list[OptionalFailure] = []
try:
commit = pr_obj["commits"]["nodes"][0]["commit"]
except (IndexError, KeyError, TypeError):
return failures
rollup = commit.get("statusCheckRollup") or {}
contexts = (rollup.get("contexts") or {}).get("nodes") or []
if (rollup.get("contexts") or {}).get("pageInfo", {}).get("hasNextPage"):
# Rare: more than 100 checks; surface a hint.
failures.append(
OptionalFailure(
kind="Meta",
workflow_or_context="(pagination)",
name="truncated",
detail="More than 100 status contexts; re-run with smaller repo or extend script.",
)
)
for node in contexts:
if not node:
continue
typename = node.get("__typename")
if typename == "CheckRun":
if node.get("isRequired") is not False:
continue
if node.get("status") != "COMPLETED":
continue
conc = node.get("conclusion")
if conc not in _FAILED_CHECK_RUN_CONCLUSIONS:
continue
suite = node.get("checkSuite") or {}
wr = suite.get("workflowRun") or {}
wf = (wr.get("workflow") or {}).get("name")
wf = wf or "(no workflow)"
name = node.get("name") or "(unnamed job)"
failures.append(
OptionalFailure(
kind="CheckRun",
workflow_or_context=wf,
name=name,
detail=f"conclusion={conc}",
)
)
elif typename == "StatusContext":
if node.get("isRequired") is not False:
continue
st = node.get("state")
if st not in _FAILED_STATUS_CONTEXT_STATES:
continue
ctx = node.get("context") or "(context)"
failures.append(
OptionalFailure(
kind="StatusContext",
workflow_or_context=ctx,
name=ctx,
detail=f"state={st}",
)
)
return failures
def _required_checks_satisfied(pr_obj: dict[str, Any]) -> tuple[bool, list[str]]:
"""True if every required context is present and successful (rollup may omit checks)."""
reasons: list[str] = []
try:
commit = pr_obj["commits"]["nodes"][0]["commit"]
except (IndexError, KeyError, TypeError):
return False, ["no head commit on PR"]
rollup = commit.get("statusCheckRollup") or {}
contexts_conn = rollup.get("contexts") or {}
if contexts_conn.get("pageInfo", {}).get("hasNextPage"):
return False, [
"status contexts truncated (>100); cannot verify all required checks",
]
for node in contexts_conn.get("nodes") or []:
if not node:
continue
typename = node.get("__typename")
if typename == "CheckRun":
if node.get("isRequired") is not True:
continue
name = node.get("name") or "(unnamed check)"
st = node.get("status")
if st != "COMPLETED":
reasons.append(f"required check not finished: {name!r} ({st})")
continue
conc = node.get("conclusion")
if conc not in _SUCCESS_CHECK_RUN_CONCLUSIONS:
reasons.append(f"required check not successful: {name!r} (conclusion={conc})")
elif typename == "StatusContext":
if node.get("isRequired") is not True:
continue
ctx = node.get("context") or "(context)"
state = node.get("state")
if state != "SUCCESS":
reasons.append(f"required status context {ctx!r}: {state}")
return (len(reasons) == 0, reasons)
def _real_optional_failures(failures: list[OptionalFailure]) -> list[OptionalFailure]:
"""CheckRun / StatusContext only (excludes Meta pagination hints)."""
return [f for f in failures if f.kind in ("CheckRun", "StatusContext")]
def main() -> int:
    """CLI entry point: report truly mergeable open PRs and their non-required failures.

    Phase 1 pages through all open PRs and keeps those GitHub reports as
    conflict-free (``mergeable == "MERGEABLE"``). Phase 2 fetches check
    rollups in batches, keeps only PRs whose required checks are all
    successful, and prints any failing non-required checks.

    Returns:
        Process exit code: 0 on success (including "nothing to report"),
        1 when GitHub credentials cannot be obtained.
    """
    p = argparse.ArgumentParser(
        description=(
            "List open PRs with no merge conflicts and all required checks green, "
            "and any failing non-required checks."
        )
    )
    p.add_argument("--owner", default="zephyrproject-rtos", help="GitHub org or user")
    p.add_argument("--repo", default="zephyr", help="Repository name")
    p.add_argument(
        "--batch-size",
        type=int,
        default=12,
        metavar="N",
        help="Number of PRs to fetch per GraphQL request (default: 12)",
    )
    p.add_argument(
        "--no-progress",
        action="store_true",
        help="Disable progress output on stderr",
    )
    p.add_argument(
        "--only-nonrequired-failures",
        action="store_true",
        help=(
            "Print only PRs (among truly mergeable) that have at least one "
            "failing non-required check"
        ),
    )
    args = p.parse_args()
    prog_off = _progress_disabled(args.no_progress)
    try:
        token = _get_token()
    except RuntimeError as e:
        print(str(e), file=sys.stderr)
        return 1
    # Phase 1: collect conflict-free PRs; mergeable=UNKNOWN means GitHub has
    # not computed mergeability yet, so those are counted and skipped.
    mergeable_prs: list[dict[str, Any]] = []
    unknown_mergeable = 0
    if not prog_off:
        print("Fetching open PRs (GraphQL)...", file=sys.stderr)
    for node in _iter_with_progress(
        _iter_open_prs(args.owner, args.repo, token),
        desc="Open PRs",
        unit="PR",
        disable=prog_off,
    ):
        m = node.get("mergeable")
        if m == "MERGEABLE":
            mergeable_prs.append(node)
        elif m == "UNKNOWN":
            unknown_mergeable += 1
    extra = (
        f" ({unknown_mergeable} open PR(s) have mergeable=UNKNOWN and are skipped)"
        if unknown_mergeable
        else ""
    )
    print(
        f"Open PRs (no merge conflicts, GitHub mergeable): {len(mergeable_prs)}{extra}"
    )
    print("(only PRs with all required checks successful are listed below)")
    if args.only_nonrequired_failures:
        print("(output limited to PRs with failing non-required checks)")
    if not mergeable_prs:
        print("No open PRs with mergeable state (no conflicts).")
        return 0
    numbers = [int(n["number"]) for n in mergeable_prs]
    # FIX: index list metadata by PR number once. The previous per-PR
    # next(x for x in mergeable_prs ...) linear scan made the reporting loop
    # O(n^2) in the number of mergeable PRs.
    meta_by_number: dict[int, dict[str, Any]] = {
        int(n["number"]): n for n in mergeable_prs
    }
    batch_size = max(1, args.batch_size)
    any_real_optional_failures = False
    prs_with_real_failures = 0
    skipped_not_truly_mergeable = 0
    n_truly_mergeable = 0
    if not prog_off:
        print("Fetching check rollups for mergeable PRs...", file=sys.stderr)
    pbar = _total_progress_bar(
        len(numbers),
        desc="Check rollups",
        unit="PR",
        disable=prog_off,
    )
    try:
        # Phase 2: fetch check rollups in batches and report per PR.
        for i in range(0, len(numbers), batch_size):
            chunk = numbers[i : i + batch_size]
            rollups = _fetch_pr_check_rollups(args.owner, args.repo, token, chunk)
            pbar.update(len(chunk))
            for num in chunk:
                pr_list_meta = meta_by_number.get(num)
                pr_obj = rollups.get(num)
                # Prefer listing metadata; fall back to the detail object.
                title = (pr_list_meta or {}).get("title") or (pr_obj or {}).get("title") or ""
                url = (pr_list_meta or {}).get("url") or (pr_obj or {}).get("url") or ""
                if not pr_obj:
                    if args.only_nonrequired_failures:
                        print(
                            f"Warning: could not load check rollup for PR #{num}",
                            file=sys.stderr,
                        )
                    else:
                        print(f"\n#{num} {title}\n {url}\n (could not load check rollup)")
                    continue
                # Re-check mergeable in case it changed between list and detail fetch.
                if pr_obj.get("mergeable") != "MERGEABLE":
                    continue
                req_ok, _req_reasons = _required_checks_satisfied(pr_obj)
                if not req_ok:
                    skipped_not_truly_mergeable += 1
                    continue
                n_truly_mergeable += 1
                fails = _rollup_optional_failures(num, pr_obj)
                real = _real_optional_failures(fails)
                if args.only_nonrequired_failures and not real:
                    continue
                print(f"\n#{num} {title}\n {url}")
                if not fails:
                    print(" Non-required failing checks: none")
                    continue
                if real:
                    any_real_optional_failures = True
                    prs_with_real_failures += 1
                print(" Non-required failing checks:")
                for f in fails:
                    if f.kind == "Meta":
                        print(f" ! {f.detail}")
                    elif f.kind == "CheckRun":
                        print(
                            f" - workflow={f.workflow_or_context!r} "
                            f"job={f.name!r} ({f.detail})"
                        )
                    else:
                        print(
                            f" - status context={f.workflow_or_context!r} ({f.detail})"
                        )
    finally:
        # Always terminate the progress line, even on an exception mid-batch.
        pbar.close()
    if skipped_not_truly_mergeable:
        print(
            f"Note: omitted {skipped_not_truly_mergeable} PR(s) "
            "(required checks not all successful, incomplete, or rollup truncated).",
            file=sys.stderr,
        )
    # Summary depends on the filtering mode.
    if args.only_nonrequired_failures:
        if n_truly_mergeable == 0:
            print(
                "\nNo truly mergeable PRs (no candidate had all required checks successful)."
            )
        elif not any_real_optional_failures:
            print(
                "\nNo failing non-required checks on any truly mergeable PR."
            )
        else:
            print(
                f"\nTotal: {prs_with_real_failures} truly mergeable PR(s) "
                "with failing non-required checks."
            )
    elif n_truly_mergeable == 0:
        print(
            "\nNo truly mergeable PRs (no candidate had all required checks successful)."
        )
    elif not any_real_optional_failures:
        print("\nSummary: no failing non-required checks on listed truly mergeable PRs.")
    return 0
if __name__ == "__main__":
    # SystemExit propagates main()'s integer return value as the shell exit code.
    raise SystemExit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment