Last active
March 19, 2026 23:02
-
-
Save kevyonan/e353a7973a6ccbe1c6961a7d92093825 to your computer and use it in GitHub Desktop.
Interactive, single-file script for Computer Architecture students to practice cycle counting for register scoreboarding.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """ | |
| This is an interactive Python program intended as an educational tool for students to practice | |
| counting cycles for a basic register scoreboarding RISC machine. | |
| By: Kevin Yonan | |
| Parts of this were made through generative AI. | |
| License: Public Domain | |
| """ | |
| from __future__ import annotations | |
| from dataclasses import dataclass, field | |
| from typing import Dict, List, Optional, Set, Tuple | |
# Placeholder for an absent operand, an absent producer, or an idle table cell.
NONE_TOKEN = "-"
# Quiz replies that print the hint for the current question.
HELP_WORDS = {"?", "h", "help"}
# Quiz replies that reveal the correct cycle.
REVEAL_WORDS = {"answer", "reveal", "show"}
# Replies that end the session.
QUIT_WORDS = {"exit", "quit"}
# Shown in place of a stage cycle that is not visible yet.
HIDDEN_STAGE = "."
# Shown in functional-unit cells whose occupant has not been revealed yet.
HIDDEN_TOKEN = "?"
# The four scoreboard stages, in pipeline order.
STAGE_NAMES = ("ISSUE", "READ", "EXECUTE", "WRITE")
@dataclass
class InstructionGroup:
    """A named pool of functional units plus the opcodes those units execute."""

    # Group label.
    name: str
    # How many functional units the group contains.
    unit_count: int
    # Execution latency in cycles, keyed by opcode mnemonic.
    opcode_latencies: Dict[str, int]

    @property
    def opcodes(self) -> Set[str]:
        """Return the opcode mnemonics this group supports as a fresh set."""
        return {mnemonic for mnemonic in self.opcode_latencies}

    def latency_for(self, opcode: str) -> int:
        """Return the latency registered for *opcode*.

        The mnemonic is upper-cased before the lookup; a KeyError is raised
        when the upper-cased mnemonic is not a key of ``opcode_latencies``.
        """
        lookup_key = opcode.upper()
        return self.opcode_latencies[lookup_key]
@dataclass
class Instruction:
    """One parsed instruction of the stream being scheduled."""

    # Position of the instruction in the stream.
    index: int
    # Upper-cased opcode mnemonic.
    opcode: str
    # Destination register, or None when the instruction writes none.
    dest: Optional[str]
    # First source register, or None.
    src1: Optional[str]
    # Second source register, or None.
    src2: Optional[str]
    # Text used when the instruction is displayed.
    text: str
    # Name of the functional-unit group that executes the opcode.
    group_name: str
@dataclass
class FunctionalUnit:
    """Mutable occupancy record for a single functional unit."""

    # Unit label.
    name: str
    # Name of the group the unit belongs to.
    group_name: str
    # True while an instruction holds the unit.
    busy: bool = False
    # Index of the instruction holding the unit, or None when idle.
    instruction_index: Optional[int] = None
@dataclass
class InstructionState:
    """Scheduling progress of one instruction during a simulation run."""

    # The instruction being tracked.
    instruction: Instruction
    # Name of the functional unit claimed at issue, or None before issue.
    assigned_fu: Optional[str] = None
    # Index of the instruction that was the pending writer of src1 at issue, if any.
    src1_producer: Optional[int] = None
    # Index of the instruction that was the pending writer of src2 at issue, if any.
    src2_producer: Optional[int] = None
    # Cycle numbers of the four stages; None until the stage has happened.
    issue_cycle: Optional[int] = None
    read_cycle: Optional[int] = None
    exec_complete_cycle: Optional[int] = None
    write_cycle: Optional[int] = None
    # Countdown of execution cycles still to run once operands are read.
    remaining_exec_cycles: int = 0
    # Distinct reasons collected while the instruction was stalled at each stage.
    issue_wait_reasons: List[str] = field(default_factory=list)
    read_wait_reasons: List[str] = field(default_factory=list)
    write_wait_reasons: List[str] = field(default_factory=list)
@dataclass
class FunctionalUnitSnapshot:
    """Display-ready row of the functional-unit status table (all cells are text)."""

    # Unit label and the group it belongs to.
    fu_name: str
    group_name: str
    # Busy column text.
    busy: str
    # Label and opcode of the occupying instruction.
    instruction_label: str
    opcode: str
    # Destination register (Fi) and the two source registers (Fj, Fk).
    fi: str
    fj: str
    fk: str
    # Names of the units still expected to produce Fj and Fk.
    qj: str
    qk: str
    # Ready flags for Fj and Fk.
    rj: str
    rk: str
@dataclass
class CycleSnapshot:
    """Everything the live scoreboard view prints for one cycle."""

    # Cycle number the snapshot belongs to.
    cycle: int
    # Human-readable events that occurred during the cycle.
    events: List[str]
    # Per instruction: (label, text, issue, read, execute-complete, write).
    instruction_rows: List[Tuple[str, str, str, str, str, str]]
    # One row per functional unit.
    unit_rows: List[FunctionalUnitSnapshot]
    # Pairs of (register name, name of the unit that will write it).
    register_rows: List[Tuple[str, str]]
# Built-in instruction streams, keyed by the menu choice that selects them.
DEFAULT_SCENARIOS: Dict[str, List[str]] = {
    # Menu entry 1: "Classic mixed example".
    "1": [
        "LD F6, 34(R2)",
        "LD F2, 45(R3)",
        "MULTD F0, F2, F4",
        "SUBD F8, F6, F2",
        "DIVD F10, F0, F6",
        "ADDD F6, F8, F2",
    ],
    # Menu entry 2: "Short dependency chain".
    "2": [
        "LD F2, 0(R1)",
        "ADDD F4, F2, F6",
        "SUBD F8, F4, F2",
        "MULTD F10, F8, F12",
    ],
    # Menu entry 3: "WAR-delay example".
    "3": [
        "LD F2, 0(R1)",
        "ADDD F6, F2, F4",
        "DIVD F8, F6, F2",
        "ADDD F2, F10, F12",
    ],
}
def LabelForInstruction(index: int) -> str:
    """Return the display label of the instruction at *index*, e.g. ``I3``."""
    return "I{}".format(index)
def BuildDefaultGroups() -> List[InstructionGroup]:
    """Return the built-in configuration: four groups with one unit each.

    The list order (INT, MEMORY, FP_ADD, FP_MULDIV) is the order in which the
    groups are later displayed.
    """
    integer_alu = InstructionGroup(
        name="INT",
        unit_count=1,
        opcode_latencies={"ADD": 1, "SUB": 1, "ADDI": 1, "SUBI": 1},
    )
    memory_port = InstructionGroup(
        name="MEMORY",
        unit_count=1,
        opcode_latencies={"LD": 2, "L.D": 2, "SD": 2, "S.D": 2},
    )
    fp_adder = InstructionGroup(
        name="FP_ADD",
        unit_count=1,
        opcode_latencies={"ADDD": 2, "SUBD": 2, "ADD.D": 2, "SUB.D": 2},
    )
    fp_multiplier_divider = InstructionGroup(
        name="FP_MULDIV",
        unit_count=1,
        opcode_latencies={"MULTD": 10, "MUL.D": 10, "DIVD": 40, "DIV.D": 40},
    )
    return [integer_alu, memory_port, fp_adder, fp_multiplier_divider]
def BuildOpcodeMap(groups: List[InstructionGroup]) -> Dict[str, InstructionGroup]:
    """Map every upper-cased opcode mnemonic to the group that executes it.

    When several groups list the same mnemonic, the group appearing later in
    *groups* wins.
    """
    return {
        mnemonic.upper(): owner
        for owner in groups
        for mnemonic in owner.opcodes
    }
def BuildFunctionalUnits(groups: List[InstructionGroup]) -> Dict[str, List[FunctionalUnit]]:
    """Create the idle functional units of every group, keyed by group name.

    Units are named ``<group name><ordinal>`` with ordinals starting at 1.
    """
    units_by_group: Dict[str, List[FunctionalUnit]] = {}
    for group in groups:
        roster: List[FunctionalUnit] = []
        for ordinal in range(1, group.unit_count + 1):
            roster.append(FunctionalUnit(f"{group.name}{ordinal}", group.name))
        units_by_group[group.name] = roster
    return units_by_group
def NormalizeToken(token: Optional[str]) -> Optional[str]:
    """Return *token* stripped and upper-cased, or None when it names nothing.

    None, blank text and the NONE_TOKEN placeholder all normalize to None.
    """
    if token is None:
        return None
    canonical = token.strip().upper()
    if canonical and canonical != NONE_TOKEN:
        return canonical
    return None
def AppendUnique(target: List[str], items: List[str]) -> None:
    """Append to *target*, in place, each entry of *items* it does not hold yet."""
    for candidate in items:
        if candidate in target:
            continue
        target.append(candidate)
def FindFreeUnit(
    functional_units: Dict[str, List[FunctionalUnit]],
    group_name: str,
) -> Optional[FunctionalUnit]:
    """Return the first idle unit of *group_name*, or None when all are busy.

    Raises KeyError when *group_name* is not a key of *functional_units*.
    """
    idle_units = (unit for unit in functional_units[group_name] if not unit.busy)
    return next(idle_units, None)
def CleanInstructionTokens(line: str) -> List[str]:
    """Split an instruction line into tokens.

    Commas, parentheses and tabs act as separators alongside ordinary
    whitespace, so ``LD F6, 34(R2)`` yields ``["LD", "F6", "34", "R2"]``.
    """
    separators_to_space = str.maketrans({",": " ", "(": " ", ")": " ", "\t": " "})
    return line.translate(separators_to_space).split()
def ParseFlexibleFields(opcode: str, fields: List[str]) -> Tuple[Optional[str], Optional[str], Optional[str]]:
    """Pick (dest, src1, src2) out of the operand tokens of one instruction.

    Args:
        opcode: Opcode mnemonic; compared case-insensitively.
        fields: Operand tokens that followed the opcode.

    Returns:
        A ``(dest, src1, src2)`` tuple of normalized tokens, with None for
        every operand that is absent.

    Loads (LD / L.D) have a destination and at most one source; stores
    (SD / S.D) have no destination and up to two sources. Every other opcode
    is read positionally as ``dest, src1, src2``. An empty *fields* list
    yields ``(None, None, None)`` for every opcode instead of raising
    IndexError in the load/store branches.
    """
    mnemonic = opcode.upper()
    if not fields:
        return None, None, None

    first = NormalizeToken(fields[0])

    if mnemonic in {"LD", "L.D"}:
        if len(fields) == 1:
            return first, None, None
        # "LD F6, 34(R2)" tokenizes to [F6, 34, R2]: with three or more
        # fields the base register is the third one, not the offset.
        base = fields[1] if len(fields) == 2 else fields[2]
        return first, NormalizeToken(base), None

    if mnemonic in {"SD", "S.D"}:
        if len(fields) == 1:
            return None, first, None
        # Same layout as loads, but the first register is a source.
        base = fields[1] if len(fields) == 2 else fields[2]
        return None, first, NormalizeToken(base)

    src1 = NormalizeToken(fields[1]) if len(fields) > 1 else None
    src2 = NormalizeToken(fields[2]) if len(fields) > 2 else None
    return first, src1, src2
def ParseInstructionLine(
    line: str,
    index: int,
    opcode_map: Dict[str, InstructionGroup],
) -> Instruction:
    """Parse one line of assembly-like text into an Instruction.

    Args:
        line: Raw instruction text.
        index: Position the instruction will occupy in the stream.
        opcode_map: Upper-cased opcode mnemonic -> owning group.

    Raises:
        ValueError: The line is blank, has no operand, or uses an opcode
            that is missing from *opcode_map*.
    """
    stripped = line.strip()
    if stripped == "":
        raise ValueError("Instruction cannot be empty.")

    tokens = CleanInstructionTokens(stripped)
    if len(tokens) < 2:
        raise ValueError(
            "Each instruction needs at least an opcode and one operand. "
            "Examples: ADD R1, R2, R3 | LD R1, 0(R2) | SD R4, 8(R1)"
        )

    mnemonic = tokens[0].upper()
    if mnemonic not in opcode_map:
        raise ValueError(f"Unknown opcode '{tokens[0]}'. Add it to an instruction group first.")

    dest, src1, src2 = ParseFlexibleFields(mnemonic, tokens[1:])
    owner = opcode_map[mnemonic]

    # Show the user's own text when it starts with the opcode; otherwise
    # fall back to the normalized "OP DEST SRC1 SRC2" form.
    shouted = stripped.upper()
    if shouted.startswith(mnemonic):
        display_text = shouted
    else:
        operands = [operand or NONE_TOKEN for operand in (dest, src1, src2)]
        display_text = " ".join([mnemonic] + operands)

    return Instruction(
        index=index,
        opcode=mnemonic,
        dest=dest,
        src1=src1,
        src2=src2,
        text=display_text,
        group_name=owner.name,
    )
def GetIssueBlockers(
    state: InstructionState,
    functional_units: Dict[str, List[FunctionalUnit]],
    register_result_status: Dict[str, int],
) -> List[str]:
    """List the reasons the instruction cannot issue right now.

    Two conditions are checked: every unit of the instruction's group is
    busy (structural hazard), and the destination register already has a
    pending writer in *register_result_status* (WAW hazard). An empty list
    means the instruction may issue.
    """
    pending = state.instruction
    reasons: List[str] = []

    unit_available = FindFreeUnit(functional_units, pending.group_name) is not None
    if not unit_available:
        reasons.append(f"No free functional unit in group {pending.group_name}.")

    target = pending.dest
    if target is not None and target in register_result_status:
        writer_label = LabelForInstruction(register_result_status[target])
        reasons.append(f"WAW hazard on {target} with {writer_label}.")

    return reasons
def GetReadBlockers(
    state: InstructionState,
    states: List[InstructionState],
    current_cycle: int,
) -> List[str]:
    """List the producers the instruction still waits on before reading operands.

    A source is blocked while its producer has not written yet, or wrote in
    *current_cycle* or later. Sources without a recorded producer never block.
    """
    reasons: List[str] = []
    sources = (
        (state.src1_producer, state.instruction.src1),
        (state.src2_producer, state.instruction.src2),
    )
    for producer_index, register_name in sources:
        if producer_index is None:
            continue
        producer = states[producer_index]
        written_at = producer.write_cycle
        if written_at is not None and written_at < current_cycle:
            continue
        producer_label = LabelForInstruction(producer.instruction.index)
        reasons.append(f"Waiting for {producer_label} to write {register_name}.")
    return reasons
def GetWriteBlockers(
    state: InstructionState,
    states: List[InstructionState],
) -> List[str]:
    """List the WAR hazards that stop the instruction from writing its result.

    The write is blocked by every earlier instruction that has issued, has
    not read its operands yet, and names this instruction's destination as
    a source. Instructions without a destination are never blocked.
    """
    writer = state.instruction
    if writer.dest is None:
        return []

    reasons: List[str] = []
    for earlier in states:
        reader = earlier.instruction
        # Only older instructions that are issued but still awaiting their read matter.
        if (
            reader.index >= writer.index
            or earlier.issue_cycle is None
            or earlier.read_cycle is not None
        ):
            continue
        if writer.dest in (reader.src1, reader.src2):
            reader_label = LabelForInstruction(reader.index)
            reasons.append(f"WAR hazard: {reader_label} has not read {writer.dest} yet.")
    return reasons
def VisibleStageValue(stage_cycle: Optional[int], snapshot_cycle: int) -> str:
    """Return the stage cycle as text if it happened by *snapshot_cycle*, else HIDDEN_STAGE."""
    reached = stage_cycle is not None and stage_cycle <= snapshot_cycle
    return str(stage_cycle) if reached else HIDDEN_STAGE
def OperandReadyByEndOfCycle(
    producer_index: Optional[int],
    states: List[InstructionState],
    snapshot_cycle: int,
) -> bool:
    """Tell whether an operand's value is available at the end of *snapshot_cycle*.

    An operand without a producer is always ready; otherwise it is ready
    once the producer has written in *snapshot_cycle* or earlier.
    """
    if producer_index is None:
        return True
    written_at = states[producer_index].write_cycle
    if written_at is None:
        return False
    return written_at <= snapshot_cycle
def ReadyFlagForOperand(
    operand_name: Optional[str],
    producer_index: Optional[int],
    state: InstructionState,
    states: List[InstructionState],
    snapshot_cycle: int,
) -> str:
    """Return the Rj/Rk cell text for one operand at the end of *snapshot_cycle*.

    NONE_TOKEN is returned for an absent operand. "NO" is returned once the
    instruction has read its operands, or while the producer has not
    written; "YES" means the value is available and not yet read.
    """
    if operand_name is None:
        return NONE_TOKEN

    already_read = state.read_cycle is not None and state.read_cycle <= snapshot_cycle
    if already_read:
        return "NO"

    if OperandReadyByEndOfCycle(producer_index, states, snapshot_cycle):
        return "YES"
    return "NO"
def ProducerFuName(
    producer_index: Optional[int],
    states: List[InstructionState],
    snapshot_cycle: int,
) -> str:
    """Return the Qj/Qk cell text: the unit still expected to produce an operand.

    NONE_TOKEN is returned when there is no producer, when the producer has
    written by the end of *snapshot_cycle*, or when it holds no unit.
    """
    if producer_index is None:
        return NONE_TOKEN

    producer = states[producer_index]
    finished = producer.write_cycle is not None and producer.write_cycle <= snapshot_cycle
    if finished or not producer.assigned_fu:
        return NONE_TOKEN
    return producer.assigned_fu
def BuildCycleSnapshot(
    cycle: int,
    events: List[str],
    groups: List[InstructionGroup],
    states: List[InstructionState],
    functional_units: Dict[str, List[FunctionalUnit]],
    register_result_status: Dict[str, int],
) -> CycleSnapshot:
    """Capture the three scoreboard tables as display rows for *cycle*.

    Args:
        cycle: Cycle whose state is being recorded.
        events: Event messages of the cycle; copied into the snapshot.
        groups: Groups in the order their units should be listed.
        states: Per-instruction progress, indexed by instruction index.
        functional_units: Live units keyed by group name.
        register_result_status: Register name -> index of its pending writer.
    """
    instruction_rows: List[Tuple[str, str, str, str, str, str]] = [
        (
            LabelForInstruction(entry.instruction.index),
            entry.instruction.text,
            VisibleStageValue(entry.issue_cycle, cycle),
            VisibleStageValue(entry.read_cycle, cycle),
            VisibleStageValue(entry.exec_complete_cycle, cycle),
            VisibleStageValue(entry.write_cycle, cycle),
        )
        for entry in states
    ]

    # Every column after "busy" is blank for an idle unit.
    idle_cells = dict.fromkeys(
        ("instruction_label", "opcode", "fi", "fj", "fk", "qj", "qk", "rj", "rk"),
        NONE_TOKEN,
    )

    unit_rows: List[FunctionalUnitSnapshot] = []
    for group in groups:
        for unit in functional_units[group.name]:
            occupied = unit.busy and unit.instruction_index is not None
            if not occupied:
                unit_rows.append(
                    FunctionalUnitSnapshot(
                        fu_name=unit.name,
                        group_name=group.name,
                        busy="NO",
                        **idle_cells,
                    )
                )
                continue

            holder = states[unit.instruction_index]
            instr = holder.instruction
            unit_rows.append(
                FunctionalUnitSnapshot(
                    fu_name=unit.name,
                    group_name=group.name,
                    busy="YES",
                    instruction_label=LabelForInstruction(instr.index),
                    opcode=instr.opcode,
                    fi=instr.dest or NONE_TOKEN,
                    fj=instr.src1 or NONE_TOKEN,
                    fk=instr.src2 or NONE_TOKEN,
                    qj=ProducerFuName(holder.src1_producer, states, cycle),
                    qk=ProducerFuName(holder.src2_producer, states, cycle),
                    rj=ReadyFlagForOperand(instr.src1, holder.src1_producer, holder, states, cycle),
                    rk=ReadyFlagForOperand(instr.src2, holder.src2_producer, holder, states, cycle),
                )
            )

    # Registers with a pending writer, in alphabetical order.
    register_rows: List[Tuple[str, str]] = [
        (register_name, states[register_result_status[register_name]].assigned_fu or NONE_TOKEN)
        for register_name in sorted(register_result_status)
    ]

    return CycleSnapshot(
        cycle=cycle,
        events=list(events),
        instruction_rows=instruction_rows,
        unit_rows=unit_rows,
        register_rows=register_rows,
    )
def SimulateScoreboard(
    groups: List[InstructionGroup],
    instructions: List[Instruction],
) -> Tuple[List[InstructionState], Dict[int, CycleSnapshot]]:
    """Run the scoreboard cycle by cycle until every instruction has written.

    Each cycle performs four phases in this fixed order: issue (at most one
    instruction, in program order), read operands, execute, write result.
    A stage never happens in the same cycle as the previous stage of the
    same instruction.

    Returns:
        ``(states, snapshots)``: the final per-instruction timing records and
        one CycleSnapshot per simulated cycle, keyed by cycle number.

    Raises:
        RuntimeError: The run passes 10000 cycles, or a unit that was
            reported free at issue cannot be found.
    """
    functional_units = BuildFunctionalUnits(groups)
    register_result_status: Dict[str, int] = {}
    states = [InstructionState(instruction=instr) for instr in instructions]
    group_by_name = {group.name: group for group in groups}
    snapshots: Dict[int, CycleSnapshot] = {}
    cycle_limit = 10000
    next_issue_index = 0
    current_cycle = 1

    while any(state.write_cycle is None for state in states):
        if current_cycle > cycle_limit:
            raise RuntimeError("Simulation exceeded the cycle limit. Check the scenario or rules.")
        cycle_events: List[str] = []

        # Phase 1: issue. Only the next instruction in program order is considered.
        if next_issue_index < len(states):
            candidate = states[next_issue_index]
            instr = candidate.instruction
            blockers = GetIssueBlockers(candidate, functional_units, register_result_status)
            if blockers:
                AppendUnique(candidate.issue_wait_reasons, blockers)
            else:
                unit = FindFreeUnit(functional_units, instr.group_name)
                if unit is None:
                    raise RuntimeError("Internal functional-unit allocation error.")
                candidate.assigned_fu = unit.name
                unit.busy = True
                unit.instruction_index = instr.index
                # Producers are looked up before this instruction registers
                # itself as the writer of its own destination.
                if instr.src1 is not None:
                    candidate.src1_producer = register_result_status.get(instr.src1)
                if instr.src2 is not None:
                    candidate.src2_producer = register_result_status.get(instr.src2)
                if instr.dest is not None:
                    register_result_status[instr.dest] = instr.index
                candidate.issue_cycle = current_cycle
                cycle_events.append(
                    f"{LabelForInstruction(instr.index)} issued on {candidate.assigned_fu}."
                )
                next_issue_index += 1

        # Phase 2: read operands, for instructions issued in an earlier cycle.
        for state in states:
            awaiting_read = state.issue_cycle is not None and state.read_cycle is None
            if not awaiting_read or state.issue_cycle >= current_cycle:
                continue
            blockers = GetReadBlockers(state, states, current_cycle)
            if blockers:
                AppendUnique(state.read_wait_reasons, blockers)
                continue
            state.read_cycle = current_cycle
            owner_group = group_by_name[state.instruction.group_name]
            state.remaining_exec_cycles = owner_group.latency_for(state.instruction.opcode)
            cycle_events.append(
                f"{LabelForInstruction(state.instruction.index)} read its operands."
            )

        # Phase 3: execute. The countdown starts the cycle after the read.
        for state in states:
            executing = state.read_cycle is not None and state.exec_complete_cycle is None
            if not executing or current_cycle <= state.read_cycle:
                continue
            state.remaining_exec_cycles -= 1
            if state.remaining_exec_cycles == 0:
                state.exec_complete_cycle = current_cycle
                cycle_events.append(
                    f"{LabelForInstruction(state.instruction.index)} completed execution."
                )

        # Phase 4: write result, for instructions that finished executing earlier.
        for state in states:
            awaiting_write = state.exec_complete_cycle is not None and state.write_cycle is None
            if not awaiting_write or state.exec_complete_cycle >= current_cycle:
                continue
            blockers = GetWriteBlockers(state, states)
            if blockers:
                AppendUnique(state.write_wait_reasons, blockers)
                continue
            state.write_cycle = current_cycle
            cycle_events.append(
                f"{LabelForInstruction(state.instruction.index)} wrote its result."
            )

            # Clear the pending-writer entry only if it still points at this instruction.
            dest = state.instruction.dest
            if dest is not None and register_result_status.get(dest) == state.instruction.index:
                del register_result_status[dest]

            # Release the functional unit claimed at issue.
            if state.assigned_fu is not None:
                held_unit = next(
                    (
                        unit
                        for unit in functional_units[state.instruction.group_name]
                        if unit.name == state.assigned_fu
                    ),
                    None,
                )
                if held_unit is not None:
                    held_unit.busy = False
                    held_unit.instruction_index = None

        snapshots[current_cycle] = BuildCycleSnapshot(
            current_cycle,
            cycle_events,
            groups,
            states,
            functional_units,
            register_result_status,
        )
        current_cycle += 1

    return states, snapshots
def PrintConfiguration(groups: List[InstructionGroup]) -> None:
    """Print a table of every group's unit count and per-opcode latencies.

    Opcodes are listed alphabetically; the group name and unit count appear
    only on the first row of each group.
    """
    rule = "-" * 88
    print("\nFunctional-unit configuration")
    print(rule)
    print(f"{'Group':<18} {'Units':<8} {'Opcode':<12} {'Latency':<10}")
    print(rule)
    for group in groups:
        is_first_row = True
        for opcode, latency in sorted(group.opcode_latencies.items()):
            group_text = group.name if is_first_row else ""
            units_text = str(group.unit_count) if is_first_row else ""
            print(f"{group_text:<18} {units_text:<8} {opcode:<12} {latency:<10}")
            is_first_row = False
        print(rule)
def PrintAssumptions() -> None:
    """Print the timing rules the simulator applies, one numbered line per rule."""
    print("\nScoreboarding assumptions used by this program")
    rules = (
        "1. One instruction issues per cycle, in program order.",
        "2. Functional units stay busy from Issue until Write Result.",
        "3. A value written on cycle N can be read no earlier than cycle N + 1.",
        "4. Execution starts the cycle after Read Operands.",
        "5. A functional unit freed on cycle N can be reissued no earlier than cycle N + 1.",
        "6. Write Result happens no earlier than the cycle after Execute Complete.",
        "7. A single FU group may support multiple opcodes with different latencies.",
        "8. Live scoreboard snapshots show the state at the end of the displayed cycle.\n",
    )
    for rule in rules:
        print(rule)
def PrintInstructionList(instructions: List[Instruction]) -> None:
    """Print the instruction stream, one ``label: text`` line per instruction."""
    rule = "-" * 88
    print("Instruction stream")
    print(rule)
    for entry in instructions:
        label = LabelForInstruction(entry.index)
        print(f"{label}: {entry.text}")
    print(rule)
def BuildStageExplanation(
    state: InstructionState,
    groups: List[InstructionGroup],
    stage_name: str,
) -> str:
    """Explain in one sentence or two why a stage happened on its cycle.

    Args:
        state: Final timing record of the instruction.
        groups: Configured groups; used to look up the opcode latency.
        stage_name: "ISSUE", "READ", "EXECUTE" or "WRITE".

    Returns:
        The explanation text, or an empty string for any other stage name.
    """
    instr = state.instruction
    latency_by_group = {group.name: group for group in groups}
    # The latency is looked up for every stage, not only for EXECUTE.
    latency = latency_by_group[instr.group_name].latency_for(instr.opcode)

    if stage_name == "ISSUE":
        if not state.issue_wait_reasons:
            return f"Issued immediately on cycle {state.issue_cycle} using {state.assigned_fu}."
        reasons = " ".join(state.issue_wait_reasons)
        return (
            f"Issued on cycle {state.issue_cycle} using {state.assigned_fu}. "
            f"It waited because: {reasons}"
        )

    if stage_name == "READ":
        if not state.read_wait_reasons:
            return f"Read operands on the first possible cycle: {state.read_cycle}."
        reasons = " ".join(state.read_wait_reasons)
        return f"Read operands on cycle {state.read_cycle}. It waited because: {reasons}"

    if stage_name == "EXECUTE":
        return (
            f"{instr.opcode} uses FU group {instr.group_name} with latency {latency}. "
            f"Read happened on cycle {state.read_cycle}, so execution completed on "
            f"cycle {state.exec_complete_cycle}."
        )

    if stage_name == "WRITE":
        if not state.write_wait_reasons:
            return f"Wrote result on the first possible cycle: {state.write_cycle}."
        reasons = " ".join(state.write_wait_reasons)
        return f"Wrote result on cycle {state.write_cycle}. It waited because: {reasons}"

    return ""
def PrintScheduleTable(states: List[InstructionState]) -> None:
    """Print the complete answer key: the four stage cycles of every instruction."""
    rule = "-" * 88
    print("\nCorrect schedule")
    print(rule)
    print(
        f"{'Instr':<6} {'Instruction':<24} {'Issue':<7} {'Read':<7} "
        f"{'ExecC':<7} {'Write':<7}"
    )
    print(rule)
    for entry in states:
        label = LabelForInstruction(entry.instruction.index)
        print(
            f"{label:<6} {entry.instruction.text:<24} "
            f"{entry.issue_cycle:<7} {entry.read_cycle:<7} "
            f"{entry.exec_complete_cycle:<7} {entry.write_cycle:<7}"
        )
    print(rule)
def StageCycleForName(state: InstructionState, stage_name: str) -> Optional[int]:
    """Return the cycle recorded on *state* for the named stage.

    Raises:
        ValueError: *stage_name* is not ISSUE, READ, EXECUTE or WRITE.
    """
    attribute_by_stage = (
        ("ISSUE", "issue_cycle"),
        ("READ", "read_cycle"),
        ("EXECUTE", "exec_complete_cycle"),
        ("WRITE", "write_cycle"),
    )
    for known_stage, attribute in attribute_by_stage:
        if stage_name == known_stage:
            return getattr(state, attribute)
    raise ValueError(f"Unknown stage name: {stage_name}")
def EventTextForStage(state: InstructionState, stage_name: str) -> str:
    """Return the event sentence reporting that the instruction passed a stage.

    Raises:
        ValueError: *stage_name* is not ISSUE, READ, EXECUTE or WRITE.
    """
    label = LabelForInstruction(state.instruction.index)
    if stage_name == "ISSUE":
        # Only the issue event mentions the unit that was claimed.
        return f"{label} issued on {state.assigned_fu}."
    fixed_endings = (
        ("READ", " read its operands."),
        ("EXECUTE", " completed execution."),
        ("WRITE", " wrote its result."),
    )
    for known_stage, ending in fixed_endings:
        if stage_name == known_stage:
            return label + ending
    raise ValueError(f"Unknown stage name: {stage_name}")
def BuildSpoilerFreeSnapshot(
    actual_snapshot: CycleSnapshot,
    states: List[InstructionState],
    revealed_stages: Set[Tuple[int, str]],
) -> CycleSnapshot:
    """Derive the snapshot the student may see from the true one.

    Args:
        actual_snapshot: True machine state for the displayed cycle.
        states: Final timing records of all instructions.
        revealed_stages: ``(instruction index, stage name)`` pairs the
            student has already answered.

    Stage cycles, events and pending register writers appear only for
    revealed stages; a functional-unit row whose occupant's ISSUE stage is
    not revealed is masked with HIDDEN_TOKEN in every data column.
    """
    cycle = actual_snapshot.cycle

    def shown(index: int, stage_name: str, value: Optional[int]) -> str:
        # Reveal a stage cycle only after the student has answered it.
        return str(value) if (index, stage_name) in revealed_stages else HIDDEN_STAGE

    instruction_rows: List[Tuple[str, str, str, str, str, str]] = [
        (
            LabelForInstruction(entry.instruction.index),
            entry.instruction.text,
            shown(entry.instruction.index, "ISSUE", entry.issue_cycle),
            shown(entry.instruction.index, "READ", entry.read_cycle),
            shown(entry.instruction.index, "EXECUTE", entry.exec_complete_cycle),
            shown(entry.instruction.index, "WRITE", entry.write_cycle),
        )
        for entry in states
    ]

    # Events of this cycle, limited to stages that are already revealed.
    events: List[str] = [
        EventTextForStage(entry, stage_name)
        for entry in states
        for stage_name in STAGE_NAMES
        if (entry.instruction.index, stage_name) in revealed_stages
        and StageCycleForName(entry, stage_name) == cycle
    ]

    masked_cells = dict.fromkeys(
        ("busy", "instruction_label", "opcode", "fi", "fj", "fk", "qj", "qk", "rj", "rk"),
        HIDDEN_TOKEN,
    )
    unit_rows: List[FunctionalUnitSnapshot] = []
    for row in actual_snapshot.unit_rows:
        label = row.instruction_label
        holds_instruction = label.startswith("I") and label[1:].isdigit()
        if holds_instruction and (int(label[1:]), "ISSUE") not in revealed_stages:
            unit_rows.append(
                FunctionalUnitSnapshot(
                    fu_name=row.fu_name,
                    group_name=row.group_name,
                    **masked_cells,
                )
            )
        else:
            unit_rows.append(row)

    # Pending writers: issued by this cycle, issue revealed, not yet written.
    pending_writers: List[Tuple[str, str]] = []
    for entry in states:
        dest = entry.instruction.dest
        issued_by_now = (
            dest is not None
            and entry.issue_cycle is not None
            and entry.issue_cycle <= cycle
        )
        if not issued_by_now:
            continue
        if (entry.instruction.index, "ISSUE") not in revealed_stages:
            continue
        if entry.write_cycle is not None and entry.write_cycle <= cycle:
            continue
        pending_writers.append((dest, entry.assigned_fu or NONE_TOKEN))
    register_rows = sorted(pending_writers, key=lambda pair: pair[0])

    return CycleSnapshot(
        cycle=cycle,
        events=events,
        instruction_rows=instruction_rows,
        unit_rows=unit_rows,
        register_rows=register_rows,
    )
def PrintCycleSnapshot(snapshot: CycleSnapshot) -> None:
    """Print one snapshot: events, then the instruction, unit and register tables."""
    wide_rule = "-" * 88
    unit_rule = "-" * 132
    register_rule = "-" * 36

    print(f"\nLive scoreboard snapshot: end of cycle {snapshot.cycle}")
    print(wide_rule)
    if not snapshot.events:
        print("Events this cycle: none")
    else:
        print("Events this cycle:")
        for event in snapshot.events:
            print(f" - {event}")

    print("\nInstruction status")
    print(wide_rule)
    print(f"{'Instr':<6} {'Instruction':<24} {'Issue':<7} {'Read':<7} {'ExecC':<7} {'Write':<7}")
    print(wide_rule)
    for instruction_row in snapshot.instruction_rows:
        label, text, issue, read, exec_complete, write = instruction_row
        print(f"{label:<6} {text:<24} {issue:<7} {read:<7} {exec_complete:<7} {write:<7}")

    # (column title, row attribute, column width) for the functional-unit table.
    unit_columns = (
        ("FU", "fu_name", 16),
        ("Group", "group_name", 16),
        ("Busy", "busy", 6),
        ("Instr", "instruction_label", 6),
        ("Op", "opcode", 8),
        ("Fi", "fi", 8),
        ("Fj", "fj", 8),
        ("Fk", "fk", 8),
        ("Qj", "qj", 12),
        ("Qk", "qk", 12),
        ("Rj", "rj", 6),
        ("Rk", "rk", 6),
    )
    print("\nFunctional-unit status")
    print(unit_rule)
    print(" ".join(f"{title:<{width}}" for title, _, width in unit_columns))
    print(unit_rule)
    for unit_row in snapshot.unit_rows:
        print(
            " ".join(
                f"{getattr(unit_row, attribute):<{width}}"
                for _, attribute, width in unit_columns
            )
        )

    print("\nRegister-result status")
    print(register_rule)
    print(f"{'Register':<16} {'Writer FU':<16}")
    print(register_rule)
    if not snapshot.register_rows:
        print(f"{'(none)':<16} {NONE_TOKEN:<16}")
    else:
        for register_name, writer_fu in snapshot.register_rows:
            print(f"{register_name:<16} {writer_fu:<16}")
    print(register_rule)
def AskInteger(prompt: str) -> int:
    """Prompt repeatedly until the user types an integer, and return it.

    Raises:
        KeyboardInterrupt: The user typed one of QUIT_WORDS.
    """
    while True:
        reply = input(prompt).strip()
        if reply.lower() in QUIT_WORDS:
            raise KeyboardInterrupt
        try:
            value = int(reply)
        except ValueError:
            print("Please enter an integer.")
            continue
        return value
def _AskPositiveInteger(prompt: str, complaint: str) -> int:
    """Ask via AskInteger until the answer is greater than zero; print *complaint* otherwise."""
    while True:
        value = AskInteger(prompt)
        if value > 0:
            return value
        print(complaint)


def _AskCustomGroup(group_number: int, taken_names: Set[str]) -> InstructionGroup:
    """Interactively collect one functional-unit group, re-asking for every invalid value."""
    print(f"\nFU group {group_number}")

    while True:
        name = input("Group name: ").strip().upper()
        if not name:
            print("Group name cannot be empty.")
        elif name in taken_names:
            # Units and latencies are looked up by group name, so names must be unique.
            print(f"Group name '{name}' is already in use.")
        else:
            break

    unit_count = _AskPositiveInteger(
        "How many functional units in this group? ",
        "Unit count must be positive.",
    )
    opcode_count = _AskPositiveInteger(
        "How many opcodes does this group support? ",
        "You must provide at least one opcode.",
    )

    opcode_latencies: Dict[str, int] = {}
    for opcode_number in range(1, opcode_count + 1):
        # Validate the mnemonic before asking for its latency.
        while True:
            opcode = input(f" Opcode {opcode_number} name: ").strip().upper()
            if opcode:
                break
            print(" Opcode cannot be empty.")
        opcode_latencies[opcode] = _AskPositiveInteger(
            f" Latency for {opcode}: ",
            " Latency must be positive.",
        )

    return InstructionGroup(
        name=name,
        unit_count=unit_count,
        opcode_latencies=opcode_latencies,
    )


def AskGroupConfiguration() -> List[InstructionGroup]:
    """Ask the user for the built-in or a custom functional-unit configuration.

    Returns:
        The built-in groups for choice 1, or exactly as many custom groups
        as the user asked for with choice 2. Each invalid value is asked
        again on the spot, so a bad answer never drops a group.

    Raises:
        KeyboardInterrupt: The user typed one of QUIT_WORDS at the menu or
            at any integer prompt.
    """
    print("Choose a functional-unit configuration:")
    print("1. Built-in default")
    print("2. Custom interactive configuration")
    while True:
        choice = input("> ").strip()
        if choice.lower() in QUIT_WORDS:
            raise KeyboardInterrupt
        if choice == "1":
            return BuildDefaultGroups()
        if choice == "2":
            group_count = _AskPositiveInteger(
                "How many FU groups? ",
                "Group count must be positive.",
            )
            groups: List[InstructionGroup] = []
            for group_number in range(1, group_count + 1):
                taken_names = {group.name for group in groups}
                groups.append(_AskCustomGroup(group_number, taken_names))
            return groups
        print("Enter 1 or 2.")
def AskScenarioLines(opcode_map: Dict[str, InstructionGroup]) -> List[str]:
    """Ask which instruction stream to practice on and return its lines.

    Args:
        opcode_map: Upper-cased opcode mnemonic -> owning group; every
            returned line is checked against it.

    Returns:
        A new list of instruction lines, all of which parse with
        *opcode_map*. A built-in stream that uses opcodes missing from the
        current configuration is refused and the menu is shown again.

    Raises:
        KeyboardInterrupt: The user typed one of QUIT_WORDS.
    """
    print("\nChoose an instruction stream:")
    print("1. Classic mixed example")
    print("2. Short dependency chain")
    print("3. WAR-delay example")
    print("4. Enter your own instructions")
    while True:
        choice = input("> ").strip()
        if choice.lower() in QUIT_WORDS:
            raise KeyboardInterrupt

        if choice in DEFAULT_SCENARIOS:
            # Copy so callers cannot modify the shared built-in stream.
            candidate = list(DEFAULT_SCENARIOS[choice])
            try:
                for position, text in enumerate(candidate):
                    ParseInstructionLine(text, position, opcode_map)
            except ValueError as exc:
                # Typical with a custom configuration that lacks the built-in opcodes.
                print(f"This stream does not fit the current configuration: {exc}")
                print("Choose another stream or enter your own.")
                continue
            return candidate

        if choice == "4":
            print("\nEnter one instruction per line. [-1 to stop]")
            print("Accepted forms include:")
            print(" OP DEST SRC1 SRC2")
            print(" ADD R5, R3, R4")
            print(" LD R3, 34(R1)")
            print(" LD F1, #9")
            print(" SD R7, 4(R0)")
            print("Use '-' for a missing operand in normalized form.")
            lines: List[str] = []
            index = 0
            while True:
                line = input(f"{LabelForInstruction(index)}> ").strip()
                if line == "-1":
                    break
                # A bare quit word can never be a valid instruction (no operand).
                if line.lower() in QUIT_WORDS:
                    raise KeyboardInterrupt
                try:
                    ParseInstructionLine(line, index, opcode_map)
                except ValueError as exc:
                    print(f"Invalid instruction: {exc}")
                else:
                    lines.append(line)
                    index += 1
            return lines

        print("Enter 1, 2, 3, or 4.")
def AskGuessUntilCorrect(
    prompt: str,
    answer: int,
    help_text: str,
    explanation: str,
) -> None:
    """Keep asking one quiz question until the student types the right cycle.

    Help words print *help_text*, reveal words print the answer (which must
    still be typed), and a correct guess prints *explanation*.

    Raises:
        KeyboardInterrupt: The student typed one of QUIT_WORDS.
    """
    while True:
        reply = input(prompt).strip().lower()
        if reply in QUIT_WORDS:
            raise KeyboardInterrupt
        if reply in HELP_WORDS:
            print(help_text)
        elif reply in REVEAL_WORDS:
            print(f"The correct cycle is {answer}.")
            print("Now type that value to continue.")
        else:
            try:
                guess = int(reply)
            except ValueError:
                print("Enter an integer, 'help', 'show', or 'quit'.")
                continue
            offset = guess - answer
            if offset == 0:
                print("Correct.")
                print(explanation)
                return
            if offset < 0:
                print(f"Incorrect. That is {-offset} cycle(s) too early.")
            else:
                print(f"Incorrect. That is {offset} cycle(s) too late.")
def SnapshotForCycle(snapshots: Dict[int, CycleSnapshot], cycle: int) -> CycleSnapshot:
    """Return the snapshot saved for *cycle*.

    Raises:
        RuntimeError: No snapshot is stored under *cycle*.
    """
    found = snapshots.get(cycle)
    if found is not None:
        return found
    raise RuntimeError(f"Internal error: no saved snapshot for cycle {cycle}.")
def RunQuiz(
    states: List[InstructionState],
    groups: List[InstructionGroup],
    snapshots: Dict[int, CycleSnapshot],
) -> None:
    """Quiz the student on all four stage cycles of every instruction, in order.

    After each correct answer the stage is revealed and the spoiler-free
    machine view is printed. The view never moves backwards in time: it
    stays at the latest cycle shown so far when an answer is earlier.
    """
    print("\nQuiz started.")
    print("Commands during the quiz: help, show, quit")
    group_by_name = {group.name: group for group in groups}
    revealed_stages: Set[Tuple[int, str]] = set()
    max_display_cycle = 0

    def reveal_stage(instr_index: int, stage_name: str, cycle: int) -> None:
        # Mark the stage as answered and print the view for the latest cycle reached.
        nonlocal max_display_cycle
        revealed_stages.add((instr_index, stage_name))
        if cycle > max_display_cycle:
            max_display_cycle = cycle
        elif cycle < max_display_cycle:
            print(
                f"Live machine view remains at end of cycle {max_display_cycle} "
                f"to avoid rewinding time from cycle {cycle}."
            )
        actual_snapshot = SnapshotForCycle(snapshots, max_display_cycle)
        PrintCycleSnapshot(BuildSpoilerFreeSnapshot(actual_snapshot, states, revealed_stages))

    for state in states:
        instr = state.instruction
        print("\n" + "=" * 88)
        print(f"{LabelForInstruction(instr.index)}: {instr.text}")
        group = group_by_name[instr.group_name]
        latency = group.latency_for(instr.opcode)
        print(
            f"FU group: {group.name} | "
            f"Functional units: {group.unit_count} | "
            f"Opcode latency: {latency}"
        )

        # (stage, prompt, recorded cycle, hint), in the order they are asked.
        questions = (
            (
                "ISSUE",
                "Guess ISSUE cycle: ",
                state.issue_cycle,
                "Issue waits for a free functional unit and for any WAW hazard to clear.",
            ),
            (
                "READ",
                "Guess READ cycle: ",
                state.read_cycle,
                "Read waits until every needed producer has already written in an earlier cycle.",
            ),
            (
                "EXECUTE",
                "Guess EXECUTE-COMPLETE cycle: ",
                state.exec_complete_cycle,
                "Execute Complete is driven by that opcode's latency within its FU group.",
            ),
            (
                "WRITE",
                "Guess WRITE-RESULT cycle: ",
                state.write_cycle,
                "Write waits for WAR hazards to clear and happens after Execute Complete.",
            ),
        )
        for stage_name, prompt, recorded_cycle, hint in questions:
            # A missing cycle is quizzed as -1.
            answer = recorded_cycle or -1
            AskGuessUntilCorrect(
                prompt,
                answer,
                hint,
                BuildStageExplanation(state, groups, stage_name),
            )
            reveal_stage(instr.index, stage_name, answer)

    print("\nYou finished the entire instruction stream.")
    PrintScheduleTable(states)
def Main() -> None:
    """Run one interactive practice session from configuration to quiz.

    Typing a quit word, pressing Ctrl-C, or reaching end of input (closed or
    exhausted stdin) ends the session with a short message instead of a
    traceback. An instruction stream that cannot be parsed with the chosen
    configuration is reported and the session stops.
    """
    print("Register Scoreboarding Practice Program")
    print("=" * 88)
    try:
        groups = AskGroupConfiguration()
        opcode_map = BuildOpcodeMap(groups)
        scenario_lines = AskScenarioLines(opcode_map)
        try:
            instructions = [
                ParseInstructionLine(line, index, opcode_map)
                for index, line in enumerate(scenario_lines)
            ]
        except ValueError as exc:
            # E.g. a built-in stream using opcodes a custom configuration lacks.
            print(f"\nCannot run this instruction stream: {exc}")
            return
        PrintAssumptions()
        PrintConfiguration(groups)
        PrintInstructionList(instructions)
        states, snapshots = SimulateScoreboard(groups, instructions)
        RunQuiz(states, groups, snapshots)
    except (KeyboardInterrupt, EOFError):
        # input() raises EOFError when stdin ends before the session does.
        print("\nSession ended.")


if __name__ == "__main__":
    Main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment