Skip to content

Instantly share code, notes, and snippets.

@bloodearnest
Created March 18, 2026 13:18
Show Gist options
  • Select an option

  • Save bloodearnest/23b20fc0f34c2322a41a429666ede057 to your computer and use it in GitHub Desktop.

Select an option

Save bloodearnest/23b20fc0f34c2322a41a429666ede057 to your computer and use it in GitHub Desktop.
Benchmark docker volume IO options
#!/usr/bin/env python3
"""
Standalone benchmark for comparing Docker storage strategies.
Run with:
uvx python scripts/docker_copy_benchmark.py
or:
uvx --from . python scripts/docker_copy_benchmark.py
"""
from __future__ import annotations
import argparse
import os
import shutil
import statistics
import subprocess
import sys
import tarfile
import tempfile
import textwrap
import time
import uuid
from pathlib import Path
# Images and in-container paths used by every benchmark strategy.
MANAGEMENT_IMAGE = "ghcr.io/opensafely-core/busybox"
RUNTIME_IMAGE = "ghcr.io/opensafely-core/python:v2"
MOUNT_POINT = "/workspace"
SOURCE_NAME = "source.bin"
READ_SCRIPT_NAME = "read.py"
WRITE_SCRIPT_NAME = "write.py"
OUTPUT_NAME = "output-copy.bin"
DEFAULT_FILE_SIZE_GIB = 1.0
DEFAULT_REPEATS = 3

# Workload run inside the runtime container: time a full read of the
# source file and report it on a parseable "RESULT ..." line.
READ_SCRIPT = """
import pathlib
import sys
import time

source = pathlib.Path(sys.argv[1])
started = time.perf_counter()
data = source.read_bytes()
elapsed = time.perf_counter() - started
print(f"RESULT bytes={len(data)} read_seconds={elapsed:.6f}")
"""

# Workload run inside the runtime container: time reading the source and
# writing (plus fsync) a copy, reporting both on a "RESULT ..." line.
# NOTE: the `with` body must be indented — without it the container-side
# script is not valid Python.
WRITE_SCRIPT = """
import os
import pathlib
import sys
import time

source = pathlib.Path(sys.argv[1])
dest = pathlib.Path(sys.argv[2])

started = time.perf_counter()
data = source.read_bytes()
read_elapsed = time.perf_counter() - started

started = time.perf_counter()
with dest.open("wb") as handle:
    handle.write(data)
    handle.flush()
    os.fsync(handle.fileno())
write_elapsed = time.perf_counter() - started

print(
    f"RESULT bytes_read={len(data)} bytes_written={dest.stat().st_size} "
    f"read_seconds={read_elapsed:.6f} write_seconds={write_elapsed:.6f}"
)
"""
def run(cmd, **kwargs):
    """Execute *cmd* as a subprocess, raising CalledProcessError on failure."""
    completed = subprocess.run(cmd, check=True, **kwargs)
    return completed
def docker(*args, **kwargs):
    """Invoke the docker CLI with *args*, delegating execution to run()."""
    cmd = ["docker"]
    cmd.extend(args)
    return run(cmd, **kwargs)
def docker_version():
    """Return the Docker server version string reported by the daemon."""
    result = docker(
        "version",
        "--format",
        "{{.Server.Version}}",
        capture_output=True,
        text=True,
    )
    version = result.stdout.strip()
    return version
def default_cache_dir():
    """Return the platform-appropriate cache directory for source files."""
    app_name = "docker-copy-benchmark"
    if sys.platform == "win32":
        local = os.environ.get("LOCALAPPDATA") or str(Path.home() / "AppData" / "Local")
        return Path(local) / app_name
    if sys.platform == "darwin":
        return Path.home() / "Library" / "Caches" / app_name
    # Linux and everything else: honour XDG_CACHE_HOME when set.
    xdg_cache = os.environ.get("XDG_CACHE_HOME", Path.home() / ".cache")
    return Path(xdg_cache) / app_name
def ensure_source_file(cache_dir, size_bytes):
    """Create (or reuse) a random file of *size_bytes* under *cache_dir*.

    Returns ``(path, cache_hit)`` where *cache_hit* is True when an
    existing file of exactly the right size was reused.
    """
    cache_dir.mkdir(parents=True, exist_ok=True)
    path = cache_dir / f"source-{size_bytes}.bin"
    if path.exists() and path.stat().st_size == size_bytes:
        return path, True
    # Build under a temporary name so a partially-written file can never
    # be mistaken for a valid cached source.
    tmp_path = path.with_suffix(".tmp")
    chunk_size = 8 * 1024 * 1024
    with tmp_path.open("wb") as handle:
        written = 0
        while written < size_bytes:
            step = min(chunk_size, size_bytes - written)
            handle.write(os.urandom(step))
            written += step
    tmp_path.replace(path)
    return path, False
def ensure_images():
    """Pull both images up front so download time never leaks into timings."""
    images = (MANAGEMENT_IMAGE, RUNTIME_IMAGE)
    for image in images:
        print(f"Pulling image: {image}")
        docker("pull", image)
def create_workload_scripts(directory):
    """Write the read and write workload scripts into *directory*.

    Returns ``(read_path, write_path)``.
    """
    written = []
    for name, body in (
        (READ_SCRIPT_NAME, READ_SCRIPT),
        (WRITE_SCRIPT_NAME, WRITE_SCRIPT),
    ):
        target = directory / name
        target.write_text(textwrap.dedent(body).strip() + "\n")
        written.append(target)
    return tuple(written)
def parse_result(output):
    """Parse key=value fields from the first "RESULT " line in *output*.

    Raises RuntimeError when no RESULT line is present.
    """
    for line in output.splitlines():
        if not line.startswith("RESULT "):
            continue
        pairs = (token.split("=", 1) for token in line.split()[1:])
        return {key: value for key, value in pairs}
    raise RuntimeError(f"Did not find RESULT line in output:\n{output}")
def create_manager():
    """Create a fresh named volume plus a detached helper container mounting it.

    Returns ``(volume_name, container_name)``.
    """
    suffix = uuid.uuid4().hex[:12]
    volume_name = f"docker-copy-bench-{suffix}"
    container_name = f"{volume_name}-manager"
    docker("volume", "create", "--name", volume_name, capture_output=True)
    run_args = [
        "run",
        "--detach",
        "--interactive",
        "--init",
        "--name",
        container_name,
        "--mount",
        f"type=volume,source={volume_name},target={MOUNT_POINT}",
        "--network",
        "none",
        MANAGEMENT_IMAGE,
        "sh",
    ]
    docker(*run_args, capture_output=True)
    return volume_name, container_name
def cleanup_manager(container_name, volume_name):
    """Force-remove the helper container, then delete its backing volume."""
    for cli_args in (
        ("container", "rm", "--force", container_name),
        ("volume", "rm", volume_name),
    ):
        docker(*cli_args, capture_output=True)
def mount_args(strategy, source):
    """Return the ``--mount`` flag pair for *strategy*.

    "bind" uses a bind mount of a host directory; any other strategy
    mounts a named volume.
    """
    kind = "volume"
    if strategy == "bind":
        kind = "bind"
    return ["--mount", f"type={kind},source={source},target={MOUNT_POINT}"]
def run_runtime_command(strategy, source, script_name, source_path, output_path=None):
    """Run *script_name* inside the runtime image against the mounted data.

    Returns ``(parsed RESULT fields, raw stdout)``.
    """
    trailing = [] if output_path is None else [str(output_path)]
    cmd = [
        "run",
        "--rm",
        "--network",
        "none",
        *mount_args(strategy, source),
        RUNTIME_IMAGE,
        "python",
        f"{MOUNT_POINT}/{script_name}",
        str(source_path),
        *trailing,
    ]
    response = docker(*cmd, capture_output=True, text=True)
    return parse_result(response.stdout), response.stdout
def copy_file(src, dest):
    """Copy *src* to *dest* (creating parent dirs); return elapsed seconds."""
    dest.parent.mkdir(parents=True, exist_ok=True)
    t0 = time.perf_counter()
    shutil.copyfile(src, dest)
    t1 = time.perf_counter()
    return t1 - t0
def prepare_bind(work_dir, cached_source, read_script, write_script):
    """Copy source + workload scripts into the bind-mounted work dir.

    Returns the elapsed seconds for the whole staging step.
    """
    started = time.perf_counter()
    for host_path, target_name in (
        (cached_source, SOURCE_NAME),
        (read_script, READ_SCRIPT_NAME),
        (write_script, WRITE_SCRIPT_NAME),
    ):
        shutil.copyfile(host_path, work_dir / target_name)
    return time.perf_counter() - started
def prepare_volume_docker_cp(manager_name, cached_source, read_script, write_script):
    """Stage source + workload scripts into the manager's volume via ``docker cp``.

    Returns the elapsed seconds for the whole staging step. The three
    previously duplicated ``docker cp`` invocations are driven by a single
    loop over (host path, in-volume name) pairs.
    """
    transfers = (
        (cached_source, SOURCE_NAME),
        (read_script, READ_SCRIPT_NAME),
        (write_script, WRITE_SCRIPT_NAME),
    )
    started = time.perf_counter()
    for host_path, target_name in transfers:
        docker(
            "cp",
            "--follow-link",
            str(host_path),
            f"{manager_name}:{MOUNT_POINT}/{target_name}",
            capture_output=True,
        )
    return time.perf_counter() - started
def stream_tar(manager_name, cached_source, read_script, write_script):
    """Stream source + workload scripts into the manager container's volume
    as an uncompressed tar archive piped to ``tar -x`` over stdin.

    Raises CalledProcessError if the in-container tar exits non-zero.
    """
    # `docker exec -i` keeps stdin open so the archive can be piped in.
    process = subprocess.Popen(
        [
            "docker",
            "exec",
            "-i",
            manager_name,
            "tar",
            "-xf",
            "-",
            "-C",
            MOUNT_POINT,
        ],
        stdin=subprocess.PIPE,
    )
    try:
        # mode="w|" writes a non-seekable stream, which is what a pipe needs.
        with tarfile.open(fileobj=process.stdin, mode="w|") as tar:
            tar.add(cached_source, arcname=SOURCE_NAME)
            tar.add(read_script, arcname=READ_SCRIPT_NAME)
            tar.add(write_script, arcname=WRITE_SCRIPT_NAME)
    finally:
        # Close stdin so tar sees EOF; must happen before wait() below or
        # the container-side tar would block forever.
        if process.stdin is not None:
            process.stdin.close()
    return_code = process.wait()
    if return_code != 0:
        raise subprocess.CalledProcessError(return_code, process.args)
def prepare_volume_tar(manager_name, cached_source, read_script, write_script):
    """Time stream_tar() staging the inputs into the manager's volume."""
    t0 = time.perf_counter()
    stream_tar(manager_name, cached_source, read_script, write_script)
    t1 = time.perf_counter()
    return t1 - t0
def export_bind(work_dir, export_dir):
    """Copy the workload's output file out of the bind-mounted work dir.

    Returns the elapsed seconds of the copy.
    """
    src = work_dir / OUTPUT_NAME
    dest = export_dir / OUTPUT_NAME
    return copy_file(src, dest)
def export_volume(manager_name, export_dir):
    """``docker cp`` the output file out of the manager's volume.

    Returns the elapsed seconds of the copy.
    """
    export_dir.mkdir(parents=True, exist_ok=True)
    destination = export_dir / OUTPUT_NAME
    t0 = time.perf_counter()
    cp_args = (
        "cp",
        f"{manager_name}:{MOUNT_POINT}/{OUTPUT_NAME}",
        str(destination),
    )
    docker(*cp_args, capture_output=True)
    return time.perf_counter() - t0
def verify_file_size(path, expected_size):
    """Raise RuntimeError unless *path* exists with exactly *expected_size* bytes."""
    if not path.exists():
        raise RuntimeError(f"Expected file to exist: {path}")
    actual = path.stat().st_size
    if actual == expected_size:
        return
    raise RuntimeError(f"Expected {path} to be {expected_size} bytes, got {actual}")
def summarize(values):
    """Return min/median/mean/max statistics for the sequence *values*."""
    stats = {"min": min(values), "max": max(values)}
    stats["median"] = statistics.median(values)
    stats["mean"] = statistics.mean(values)
    return stats
def format_seconds(value):
    """Render a duration as seconds with two decimal places, e.g. "1.23s"."""
    return "{:.2f}s".format(value)
def print_summary(results):
    """Print per-strategy median timings, slowdowns relative to the fastest
    strategy, and a staging-vs-runtime cost split.

    *results* maps strategy name -> list of per-repeat timing dicts as
    returned by run_strategy(). Uses statistics.median() directly instead
    of summarize(), which computed min/mean/max only to discard them.
    """
    medians = {}
    print("\nSummary")
    print(
        f"{'strategy':<12} {'prepare':>10} {'cold':>10} {'warm':>10} "
        f"{'write-read':>12} {'write-write':>12} {'export':>10} {'total':>10}"
    )
    for strategy in ("bind", "docker-cp", "tar-stream"):
        samples = results.get(strategy)
        if not samples:
            continue

        # Median per field is robust against a single slow outlier repeat.
        def median_of(field):
            return statistics.median(item[field] for item in samples)

        prepare = median_of("prepare")
        cold = median_of("cold_read")
        warm = median_of("warm_read")
        write_read = median_of("write_read")
        write_write = median_of("write_write")
        export = median_of("export")
        total = prepare + cold + warm + write_read + write_write + export
        medians[strategy] = {
            "prepare": prepare,
            "cold": cold,
            "warm": warm,
            "write_read": write_read,
            "write_write": write_write,
            "export": export,
            "total": total,
        }
        print(
            f"{strategy:<12} {format_seconds(prepare):>10} {format_seconds(cold):>10} "
            f"{format_seconds(warm):>10} {format_seconds(write_read):>12} "
            f"{format_seconds(write_write):>12} {format_seconds(export):>10} "
            f"{format_seconds(total):>10}"
        )
    if not medians:
        return
    fastest_strategy = min(medians, key=lambda strategy: medians[strategy]["total"])
    fastest_total = medians[fastest_strategy]["total"]
    print("\nRelative to fastest")
    for strategy in medians:
        slowdown = medians[strategy]["total"] / fastest_total
        if strategy == fastest_strategy:
            print(f" {strategy}: baseline")
        else:
            print(f" {strategy}: {slowdown:.2f}x slower than {fastest_strategy}")
    print("\nMedian cost split")
    for strategy, values in medians.items():
        staging = values["prepare"] + values["export"]
        runtime = (
            values["cold"]
            + values["warm"]
            + values["write_read"]
            + values["write_write"]
        )
        dominant = "staging/export dominates" if staging > runtime else "runtime dominates"
        print(
            f" {strategy}: staging+export {format_seconds(staging)}, "
            f"runtime {format_seconds(runtime)} ({dominant})"
        )
def run_strategy(strategy, repetition, root_dir, cached_source, source_size, read_script, write_script):
    """Run one benchmark repetition for *strategy* and return its timings.

    Stages the inputs (bind copy, docker cp, or tar stream), performs a
    cold read, a warm read, and a read+write pass inside the runtime
    container, exports the output file, and sanity-checks byte counts.
    Returns a dict of per-phase seconds plus the exported file path.
    """
    work_dir = root_dir / strategy / f"repeat-{repetition}" / "work"
    export_dir = root_dir / strategy / f"repeat-{repetition}" / "export"
    work_dir.mkdir(parents=True, exist_ok=True)
    export_dir.mkdir(parents=True, exist_ok=True)
    manager_name = None
    volume_name = None
    try:
        if strategy == "bind":
            prepare_seconds = prepare_bind(work_dir, cached_source, read_script, write_script)
            runtime_source = str(work_dir.resolve())
        else:
            # Volume strategies need a helper container to stage files into.
            volume_name, manager_name = create_manager()
            runtime_source = volume_name
            if strategy == "docker-cp":
                prepare_seconds = prepare_volume_docker_cp(
                    manager_name, cached_source, read_script, write_script
                )
            else:
                prepare_seconds = prepare_volume_tar(
                    manager_name, cached_source, read_script, write_script
                )
        # First read is "cold"; the second repeat of the same read is
        # labelled "warm" (presumably benefiting from caching — the
        # difference is what the benchmark measures).
        read_result, _ = run_runtime_command(
            strategy,
            runtime_source,
            READ_SCRIPT_NAME,
            Path(MOUNT_POINT) / SOURCE_NAME,
        )
        warm_result, _ = run_runtime_command(
            strategy,
            runtime_source,
            READ_SCRIPT_NAME,
            Path(MOUNT_POINT) / SOURCE_NAME,
        )
        write_result, _ = run_runtime_command(
            strategy,
            runtime_source,
            WRITE_SCRIPT_NAME,
            Path(MOUNT_POINT) / SOURCE_NAME,
            Path(MOUNT_POINT) / OUTPUT_NAME,
        )
        # Every measured transfer must have moved the full source size.
        if int(read_result["bytes"]) != source_size:
            raise RuntimeError("Cold-read byte count did not match source size")
        if int(warm_result["bytes"]) != source_size:
            raise RuntimeError("Warm-read byte count did not match source size")
        if int(write_result["bytes_read"]) != source_size:
            raise RuntimeError("Write-read byte count did not match source size")
        if int(write_result["bytes_written"]) != source_size:
            raise RuntimeError("Write-write byte count did not match source size")
        if strategy == "bind":
            verify_file_size(work_dir / OUTPUT_NAME, source_size)
            export_seconds = export_bind(work_dir, export_dir)
        else:
            export_seconds = export_volume(manager_name, export_dir)
        verify_file_size(export_dir / OUTPUT_NAME, source_size)
        return {
            "prepare": prepare_seconds,
            "cold_read": float(read_result["read_seconds"]),
            "warm_read": float(warm_result["read_seconds"]),
            "write_read": float(write_result["read_seconds"]),
            "write_write": float(write_result["write_seconds"]),
            "export": export_seconds,
            "exported_path": export_dir / OUTPUT_NAME,
        }
    finally:
        # Always tear down the per-repeat helper container and volume.
        if manager_name is not None and volume_name is not None:
            cleanup_manager(manager_name, volume_name)
def parse_args():
    """Parse and validate the benchmark's command-line options."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--strategy",
        action="append",
        choices=["bind", "docker-cp", "tar-stream"],
        help="Run only the named strategy. Repeat to choose multiple.",
    )
    parser.add_argument("--repeats", type=int, default=DEFAULT_REPEATS)
    parser.add_argument("--file-size-gib", type=float, default=DEFAULT_FILE_SIZE_GIB)
    parser.add_argument("--cache-dir", type=Path, default=default_cache_dir())
    parser.add_argument("--work-dir", type=Path)
    parser.add_argument("--keep-artifacts", action="store_true")
    args = parser.parse_args()
    # Reject out-of-range numeric options that argparse itself accepts.
    checks = (
        (args.repeats < 1, "--repeats must be >= 1"),
        (args.file_size_gib <= 0, "--file-size-gib must be > 0"),
    )
    for failed, message in checks:
        if failed:
            raise SystemExit(message)
    return args
def main():
    """Entry point: stage inputs, run every selected strategy, and report.

    Pulls images, builds (or reuses) the cached random source file, runs
    each strategy for the requested number of repeats, prints per-repeat
    timings plus a final summary, then cleans up the work root unless the
    user asked to keep artifacts or supplied their own --work-dir.
    """
    args = parse_args()
    strategies = args.strategy or ["bind", "docker-cp", "tar-stream"]
    source_size = int(args.file_size_gib * (1024**3))
    if source_size < 1:
        raise SystemExit("--file-size-gib is too small")
    ensure_images()
    cached_source, cache_hit = ensure_source_file(args.cache_dir, source_size)
    if args.work_dir is None:
        # Auto-created work root: removed afterwards unless --keep-artifacts.
        root_dir_obj = Path(tempfile.mkdtemp(prefix="docker-copy-benchmark-"))
        cleanup_root = not args.keep_artifacts
    else:
        # A caller-supplied work dir is never cleaned up.
        root_dir_obj = args.work_dir.resolve()
        root_dir_obj.mkdir(parents=True, exist_ok=True)
        cleanup_root = False
    script_dir = root_dir_obj / "scripts"
    script_dir.mkdir(parents=True, exist_ok=True)
    read_script, write_script = create_workload_scripts(script_dir)
    # Record the environment alongside the measurements for reproducibility.
    print("Docker storage benchmark")
    print(f"Host platform: {sys.platform}")
    print(f"Python version: {sys.version.split()[0]}")
    print(f"Docker version: {docker_version()}")
    print(f"Management image: {MANAGEMENT_IMAGE}")
    print(f"Runtime image: {RUNTIME_IMAGE}")
    print(f"Source file: {cached_source}")
    print(f"Source size: {source_size} bytes")
    print(f"Source cache hit: {'yes' if cache_hit else 'no'}")
    print(f"Work root: {root_dir_obj}")
    print(f"Repeats: {args.repeats}")
    print(f"Strategies: {', '.join(strategies)}")
    results = {strategy: [] for strategy in strategies}
    try:
        for strategy in strategies:
            print(f"\nStrategy: {strategy}")
            for repetition in range(1, args.repeats + 1):
                result = run_strategy(
                    strategy,
                    repetition,
                    root_dir_obj,
                    cached_source,
                    source_size,
                    read_script,
                    write_script,
                )
                results[strategy].append(result)
                print(
                    f" repeat {repetition}: prepare {format_seconds(result['prepare'])}"
                    f" | cold-read {format_seconds(result['cold_read'])}"
                    f" | warm-read {format_seconds(result['warm_read'])}"
                    f" | write-read {format_seconds(result['write_read'])}"
                    f" | write-write {format_seconds(result['write_write'])}"
                    f" | export {format_seconds(result['export'])}"
                )
        print_summary(results)
        if cleanup_root:
            print("\nArtifacts were cleaned up after the run.")
        else:
            print("\nLast exported files")
            for strategy in strategies:
                if results[strategy]:
                    print(f" {strategy}: {results[strategy][-1]['exported_path']}")
    finally:
        # Cleanup runs even when a strategy raised part-way through.
        if cleanup_root:
            shutil.rmtree(root_dir_obj, ignore_errors=True)


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment