|
#!/usr/bin/env python3 |
|
""" |
|
Survey flake.nix vs shell.nix adoption on GitHub. |
|
|
|
Uses the GitHub REST API (via `gh api`) to: |
|
1. Count root-level flake.nix / shell.nix / default.nix / flake.lock files |
|
2. Sample N repos from each population |
|
3. Check each sampled repo for the *other* file to measure overlap |
|
4. Detect flake-compat shims in shell.nix |
|
5. Sample repo creation years and .envrc presence |
|
|
|
Requires: gh CLI (authenticated), Python 3.13 |
|
|
|
Usage: |
|
./nix-flake-survey.py --sample-size 300 --output report.md |
|
""" |
|
|
|
from __future__ import annotations |
|
|
|
import argparse |
|
import base64 |
|
import json |
|
import random |
|
import subprocess |
|
import sys |
|
import time |
|
from collections import Counter |
|
from dataclasses import dataclass, field |
|
from pathlib import Path |
|
|
|
|
|
def gh_api(path: str, *, params: dict[str, str] | None = None) -> dict | None:
    """Call `gh api` and return parsed JSON, or None on any failure.

    Failure modes all map to None so callers can treat the probe as a
    404: non-zero exit status, malformed JSON on stdout, a request that
    exceeds the 30s timeout, and a missing/unexecutable `gh` binary.
    """
    cmd = ["gh", "api"]
    if params:
        # With -X GET, `gh` turns -f fields into query-string parameters.
        cmd += ["-X", "GET"]
        for k, v in params.items():
            cmd += ["-f", f"{k}={v}"]
    cmd.append(path)
    try:
        out = subprocess.run(
            cmd, capture_output=True, text=True, check=True, timeout=30
        )
        return json.loads(out.stdout)
    except (
        subprocess.CalledProcessError,
        # BUG FIX: timeout=30 raises TimeoutExpired, which was previously
        # uncaught — one hung request crashed the entire survey run.
        subprocess.TimeoutExpired,
        json.JSONDecodeError,
        OSError,  # `gh` not installed or not executable
    ):
        return None
|
|
|
|
|
def search_count(query: str) -> int:
    """Return the (approximate) total_count for a code-search query."""
    payload = gh_api("search/code", params={"q": query})
    if not payload:
        return 0
    return payload.get("total_count", 0)
|
|
|
|
|
def search_repos(query: str, n: int) -> list[str]:
    """Collect up to n unique repo full_names matching a code search query.

    The search endpoint enforces a strict secondary rate limit
    (30 req/min), so an empty page is retried with growing backoff
    before we give up on the query.
    """
    found: set[str] = set()
    page, attempts = 1, 0
    while len(found) < n and page <= 10:  # API caps at 1000 results
        payload = gh_api(
            "search/code",
            params={"q": query, "per_page": "100", "page": str(page)},
        )
        items = payload.get("items") if payload else None
        if not items:
            if attempts >= 3:
                break
            attempts += 1
            wait = 20 * attempts
            print(
                f" search empty/rate-limited, retry {attempts} in {wait}s",
                file=sys.stderr,
            )
            time.sleep(wait)
            continue
        attempts = 0
        found.update(item["repository"]["full_name"] for item in items)
        page += 1
        time.sleep(6)  # stay under 10 req/min to be safe
    return sorted(found)[:n]
|
|
|
|
|
def has_file(repo: str, path: str) -> bool:
    """True if `path` exists on the repo's default branch (contents-API probe)."""
    response = gh_api(f"repos/{repo}/contents/{path}")
    return response is not None
|
|
|
|
|
def get_file(repo: str, path: str) -> str:
    """Fetch decoded file contents, or empty string on failure.

    The contents API returns base64 (possibly with embedded newlines,
    which b64decode tolerates); decoding with errors="replace" means
    bad UTF-8 never raises.
    """
    res = gh_api(f"repos/{repo}/contents/{path}")
    if not res or "content" not in res:
        return ""
    try:
        return base64.b64decode(res["content"]).decode("utf-8", errors="replace")
    except (ValueError, TypeError):
        # Narrowed from a blanket `except Exception`: b64decode raises
        # binascii.Error (a ValueError subclass) on corrupt input and
        # TypeError on non-string payloads; anything else would be a
        # real bug that should surface, not be swallowed.
        return ""
|
|
|
|
|
def repo_created_year(repo: str) -> str | None:
    """Return the 4-digit creation year of `repo`, or None when lookup fails."""
    meta = gh_api(f"repos/{repo}")
    if not meta:
        return None
    return meta["created_at"][:4]
|
|
|
|
|
@dataclass
class SurveyResult:
    """Accumulated counts, samples, and partitions produced by run_survey()."""

    # Approximate global code-search totals, keyed by filename ("flake.nix", ...).
    totals: dict[str, int] = field(default_factory=dict)
    # Sampled repo full_names ("owner/name") from each population.
    shell_sample: list[str] = field(default_factory=list)
    flake_sample: list[str] = field(default_factory=list)
    # shell.nix-sampled repos partitioned by whether flake.nix also exists.
    shell_both: list[str] = field(default_factory=list)
    shell_only: list[str] = field(default_factory=list)
    # Subset of shell_both whose shell.nix mentions flake-compat/getFlake.
    shell_compat: list[str] = field(default_factory=list)
    # flake.nix-sampled repos partitioned by whether shell.nix also exists.
    flake_both: list[str] = field(default_factory=list)
    flake_only: list[str] = field(default_factory=list)
    # Count of .envrc files among the first 100 repos of each sample.
    shell_envrc: int = 0
    flake_envrc: int = 0
    # Creation-year histograms for random subsamples of the "-only" repos.
    shell_years: Counter[str] = field(default_factory=Counter)
    flake_years: Counter[str] = field(default_factory=Counter)
|
|
|
|
|
def run_survey(sample_size: int) -> SurveyResult:
    """Run the full survey pipeline and return the collected data.

    Stages (all network-bound, paced to respect GitHub's search rate
    limits): global totals -> repo sampling -> overlap probing ->
    .envrc probing -> creation-year sampling. Progress goes to stderr.
    """
    r = SurveyResult()

    print("== Fetching total counts ==", file=sys.stderr)
    for f in ("flake.nix", "shell.nix", "default.nix", "flake.lock"):
        # path:/ restricts matches to repo-root files only.
        r.totals[f] = search_count(f"filename:{f} path:/")
        print(f" {f}: {r.totals[f]}", file=sys.stderr)
        time.sleep(2)

    print(f"== Sampling {sample_size} repos each ==", file=sys.stderr)
    r.shell_sample = search_repos("filename:shell.nix path:/", sample_size)
    r.flake_sample = search_repos("filename:flake.nix path:/", sample_size)
    print(
        f" shell={len(r.shell_sample)} flake={len(r.flake_sample)}",
        file=sys.stderr,
    )

    print("== Checking shell.nix repos for flake.nix ==", file=sys.stderr)
    for i, repo in enumerate(r.shell_sample, 1):
        if has_file(repo, "flake.nix"):
            r.shell_both.append(repo)
            # Only "both" repos can carry a flake-compat shim in shell.nix,
            # so the file body is fetched just for this branch.
            content = get_file(repo, "shell.nix")
            if "flake-compat" in content or "getFlake" in content:
                r.shell_compat.append(repo)
        else:
            r.shell_only.append(repo)
        if i % 50 == 0:
            print(f" {i}/{len(r.shell_sample)}", file=sys.stderr)

    print("== Checking flake.nix repos for shell.nix ==", file=sys.stderr)
    for i, repo in enumerate(r.flake_sample, 1):
        if has_file(repo, "shell.nix"):
            r.flake_both.append(repo)
        else:
            r.flake_only.append(repo)
        if i % 50 == 0:
            print(f" {i}/{len(r.flake_sample)}", file=sys.stderr)

    print("== Checking .envrc (first 100 each) ==", file=sys.stderr)
    # bool is an int subclass, so sum() counts the True probes.
    r.shell_envrc = sum(has_file(repo, ".envrc") for repo in r.shell_sample[:100])
    r.flake_envrc = sum(has_file(repo, ".envrc") for repo in r.flake_sample[:100])

    print("== Sampling creation years (30 each) ==", file=sys.stderr)
    # Years are drawn from the "-only" partitions so the two histograms
    # describe disjoint repo populations.
    for repo in random.sample(r.shell_only, min(30, len(r.shell_only))):
        if year := repo_created_year(repo):
            r.shell_years[year] += 1
    for repo in random.sample(r.flake_only, min(30, len(r.flake_only))):
        if year := repo_created_year(repo):
            r.flake_years[year] += 1

    return r
|
|
|
|
|
def render_report(r: SurveyResult) -> str:
    """Render the survey results as a Markdown report string.

    Robust against degenerate runs: when a sample came back empty
    (e.g. the search API was rate-limited throughout), extrapolated
    estimates are reported as 0 instead of raising ZeroDivisionError.
    """
    n_shell = len(r.shell_sample)
    n_flake = len(r.flake_sample)

    def pct(a: int, b: int) -> str:
        # Percentage with one decimal; "0.0" when the denominator is empty.
        return f"{100 * a / b:.1f}" if b else "0.0"

    def share(total: int, part: int, sample: int) -> int:
        # Extrapolate part/sample of the global total; 0 for an empty sample
        # (previously this division raised ZeroDivisionError).
        return int(total * part / sample) if sample else 0

    # Project sampled proportions onto the global search totals. The "both"
    # overlap is bracketed: each population's own "both" share gives one bound.
    shell_only_est = share(r.totals["shell.nix"], len(r.shell_only), n_shell)
    flake_only_est = share(r.totals["flake.nix"], len(r.flake_only), n_flake)
    overlap_lo = share(r.totals["flake.nix"], len(r.flake_both), n_flake)
    overlap_hi = share(r.totals["shell.nix"], len(r.shell_both), n_shell)

    def year_bar(years: Counter[str]) -> str:
        # One line per year, chronological, with a bar of ▌ marks (1 per repo).
        return "\n".join(f"{y} {'▌' * c}" for y, c in sorted(years.items()))

    return f"""# flake.nix vs shell.nix on GitHub — Adoption Report

**Generated by:** `nix-flake-survey.py`
**Sample size:** {n_shell} / {n_flake} repos

## Raw Totals (root-level)

| File | Count |
|---|---:|
| shell.nix | {r.totals["shell.nix"]:,} |
| flake.nix | {r.totals["flake.nix"]:,} |
| default.nix | {r.totals["default.nix"]:,} |
| flake.lock | {r.totals["flake.lock"]:,} |

## Overlap (sampled)

**shell.nix repos (n={n_shell}):**
- Only shell.nix: {len(r.shell_only)} ({pct(len(r.shell_only), n_shell)}%)
- Has both: {len(r.shell_both)} ({pct(len(r.shell_both), n_shell)}%)
- flake-compat shim: {len(r.shell_compat)} ({pct(len(r.shell_compat), len(r.shell_both) or 1)}% of "both")

**flake.nix repos (n={n_flake}):**
- Only flake.nix: {len(r.flake_only)} ({pct(len(r.flake_only), n_flake)}%)
- Has both: {len(r.flake_both)} ({pct(len(r.flake_both), n_flake)}%)

## Extrapolated Estimates

| Category | Estimate |
|---|---:|
| shell.nix only | ~{shell_only_est:,} |
| flake.nix only | ~{flake_only_est:,} |
| Both | ~{overlap_lo:,}–{overlap_hi:,} |

## Repo Creation Years

Random subsample of 30 repos per category. Each `▌` = 1 repo created
that year.

**shell.nix-only:**

```
{year_bar(r.shell_years)}
```

**flake.nix-only:**

```
{year_bar(r.flake_years)}
```

## direnv (.envrc) Adoption

- shell.nix repos: {r.shell_envrc}/100
- flake.nix repos: {r.flake_envrc}/100

## Methodology

1. **Totals** via `GET /search/code?q=filename:X path:/` → `.total_count`
2. **Samples** from first {n_shell}/{n_flake} results of each search
3. **Overlap** by probing `GET /repos/{{owner}}/{{name}}/contents/{{file}}`
   for each sampled repo (404 = absent; check exit code, not stdout)
4. **Compat shims** detected by grepping shell.nix for
   `flake-compat|getFlake`
5. **Years** from `GET /repos/{{owner}}/{{name}}` → `.created_at[:4]`
   on a random subsample of 30

Caveats: GitHub search counts are approximate (±10%), results are
capped at 1000 and ordered by relevance (popularity bias), forks not
filtered. Margin of error ≈ ±5pp at n=300.
"""
|
|
|
|
|
def main() -> None:
    """CLI entry point: parse args, run the survey, write and echo the report."""
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--sample-size", type=int, default=300)
    parser.add_argument("--output", type=Path, default=Path("report.md"))
    opts = parser.parse_args()

    survey = run_survey(opts.sample_size)
    report_md = render_report(survey)
    opts.output.write_text(report_md)
    print(f"\nReport written to {opts.output}", file=sys.stderr)
    print(report_md)
|
|
|
|
|
# Run only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()