Created
December 15, 2025 03:51
-
-
Save REASY/4010e427ac377ca2e25023b6c02bbd4f to your computer and use it in GitHub Desktop.
Python: Multi-Threading with GIL and Free-Threading
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import threading | |
| import sys | |
| import time | |
| import os | |
| from typing import List, NamedTuple | |
| # Optional import for visualization | |
| try: | |
| import matplotlib.pyplot as plt | |
| import matplotlib.patches as mpatches | |
| HAS_MATPLOTLIB = True | |
| except ImportError: | |
| HAS_MATPLOTLIB = False | |
| # --- Configuration --- | |
| CHUNK_SIZE = 1_000 # Smaller chunks for better resolution | |
| TOTAL_ITERS = 20_000_000 # Total work per thread | |
| NUM_THREADS = 8 # Keep it clean | |
| class TaskEvent(NamedTuple): | |
| thread_id: int | |
| start_time: float | |
| end_time: float | |
| class CpuBoundTask: | |
| def __init__(self, thread_id: int, collector: ThreadSafeResultCollector): | |
| self.thread_id = thread_id | |
| self.num_chunks = TOTAL_ITERS // CHUNK_SIZE | |
| self.collector = collector | |
| def run(self) -> None: | |
| local_events: List[TaskEvent] = [] | |
| for _ in range(self.num_chunks): | |
| t0 = time.perf_counter() | |
| # Burn CPU | |
| val = 0 | |
| for i in range(CHUNK_SIZE): | |
| val += i % 256 | |
| t1 = time.perf_counter() | |
| local_events.append(TaskEvent(self.thread_id, t0, t1)) | |
| self.collector.add_result(local_events) | |
| class ThreadSafeResultCollector: | |
| def __init__(self) -> None: | |
| self._results: List[TaskEvent] = [] | |
| self._lock = threading.Lock() | |
| def add_result(self, events: List[TaskEvent]) -> None: | |
| with self._lock: | |
| self._results.extend(events) | |
| def get_results(self) -> List[TaskEvent]: | |
| with self._lock: | |
| # Return a shallow copy to prevent external modification issues | |
| return list(self._results) | |
| def plot_timeline(history: List[TaskEvent]): | |
| if not history: | |
| print("No history recorded.") | |
| return | |
| if not HAS_MATPLOTLIB: | |
| print("Error: matplotlib is not installed. Visualization disabled.") | |
| return | |
| # Normalize times | |
| min_t = min(e.start_time for e in history) | |
| max_t = max(e.end_time for e in history) | |
| total_duration_ms = (max_t - min_t) * 1000 | |
| # Setup Plot | |
| fig, ax = plt.subplots(figsize=(12, 6)) | |
| # Distinct colors for threads | |
| colors = plt.cm.get_cmap("tab10", NUM_THREADS) | |
| # Plot each chunk as a horizontal bar | |
| # (y, width, height, left) | |
| bar_height = 0.8 | |
| print("Rendering plot...") | |
| for tid in range(NUM_THREADS): | |
| events = sorted( | |
| [e for e in history if e.thread_id == tid], key=lambda x: x.start_time | |
| ) | |
| if not events: | |
| continue | |
| # Add broken barh | |
| ranges = [] | |
| for e in events: | |
| start_norm = (e.start_time - min_t) * 1000 # Convert to ms | |
| duration = (e.end_time - e.start_time) * 1000 | |
| ranges.append((start_norm, duration)) | |
| ax.broken_barh( | |
| ranges, (tid - bar_height / 2, bar_height), facecolors=colors(tid) | |
| ) | |
| # Detect GIL status for title | |
| is_gil_enabled = True | |
| if hasattr(sys, "_is_gil_enabled"): | |
| is_gil_enabled = sys._is_gil_enabled() # type: ignore | |
| mode = ( | |
| "Standard (GIL Enabled)" if is_gil_enabled else "Free-Threading (GIL Disabled)" | |
| ) | |
| ax.set_yticks(range(NUM_THREADS)) | |
| ax.set_yticklabels([f"Thread {i}" for i in range(NUM_THREADS)]) | |
| ax.set_xlabel("Time (milliseconds)") | |
| # Updated title with total duration | |
| title = ( | |
| f"Python Thread Scheduling Visualization\n" | |
| f"{sys.version.split()[0]} - {mode}\n" | |
| f"Total Duration: {total_duration_ms:.2f} ms" | |
| ) | |
| ax.set_title(title) | |
| ax.grid(True, axis="x", linestyle="--", alpha=0.7) | |
| plt.tight_layout() | |
| plt.show() | |
| def report_text(history: List[TaskEvent]): | |
| if not history: | |
| print("No history recorded.") | |
| return | |
| min_t = min(e.start_time for e in history) | |
| max_t = max(e.end_time for e in history) | |
| total_duration = max_t - min_t | |
| print("-" * 40) | |
| print(f"Total Execution Time: {total_duration:.4f}s") | |
| print("-" * 40) | |
| # Calculate per-thread active time | |
| thread_durations = {} | |
| for e in history: | |
| thread_durations[e.thread_id] = thread_durations.get(e.thread_id, 0.0) + ( | |
| e.end_time - e.start_time | |
| ) | |
| print("Active CPU Time per Thread:") | |
| for tid in sorted(thread_durations.keys()): | |
| print(f" Thread {tid}: {thread_durations[tid]:.4f}s") | |
| def main() -> None: | |
| # Detect GIL status | |
| is_gil_enabled = True | |
| if hasattr(sys, "_is_gil_enabled"): | |
| is_gil_enabled = sys._is_gil_enabled() # type: ignore | |
| mode = "Standard (GIL)" if is_gil_enabled else "Free-Threading (No GIL)" | |
| print(f"Python: {sys.version}") | |
| print(f"Mode: {mode}") | |
| print(f"Running {NUM_THREADS} threads...") | |
| collector = ThreadSafeResultCollector() | |
| threads = [ | |
| threading.Thread(target=CpuBoundTask(i, collector).run) | |
| for i in range(NUM_THREADS) | |
| ] | |
| start_time = time.time() | |
| for t in threads: | |
| t.start() | |
| for t in threads: | |
| t.join() | |
| # Check env var. Default to False (Text only) unless explicitly set to '1' or 'true' | |
| visualize_env = os.environ.get("VISUALIZE", "0").lower() | |
| should_visualize = visualize_env in ("1", "true", "yes", "on") | |
| if should_visualize: | |
| plot_timeline(collector.get_results()) | |
| else: | |
| report_text(collector.get_results()) | |
| print("\nTip: Set VISUALIZE=1 to see the Matplotlib graph.") | |
| if __name__ == "__main__": | |
| main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment

