Skip to content

Instantly share code, notes, and snippets.

@kevyonan
Last active March 19, 2026 23:02
Show Gist options
  • Select an option

  • Save kevyonan/e353a7973a6ccbe1c6961a7d92093825 to your computer and use it in GitHub Desktop.

Select an option

Save kevyonan/e353a7973a6ccbe1c6961a7d92093825 to your computer and use it in GitHub Desktop.
interactive, single-file script for Computer Architecture students to practice cycle-counting for register scoreboarding.
"""
This is an interactive Python program intended as an educational tool for students to practice
counting cycles for a basic register scoreboarding RISC machine.
By: Kevin Yonan
Parts of this was made through generative AI.
License: Public Domain
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Set, Tuple
NONE_TOKEN = "-"
HELP_WORDS = {"help", "h", "?"}
REVEAL_WORDS = {"show", "reveal", "answer"}
QUIT_WORDS = {"quit", "exit"}
HIDDEN_STAGE = "."
HIDDEN_TOKEN = "?"
STAGE_NAMES = ("ISSUE", "READ", "EXECUTE", "WRITE")
@dataclass
class InstructionGroup:
name: str
unit_count: int
opcode_latencies: Dict[str, int]
@property
def opcodes(self) -> Set[str]:
return set(self.opcode_latencies.keys())
def latency_for(self, opcode: str) -> int:
return self.opcode_latencies[opcode.upper()]
@dataclass
class Instruction:
index: int
opcode: str
dest: Optional[str]
src1: Optional[str]
src2: Optional[str]
text: str
group_name: str
@dataclass
class FunctionalUnit:
name: str
group_name: str
busy: bool = False
instruction_index: Optional[int] = None
@dataclass
class InstructionState:
instruction: Instruction
assigned_fu: Optional[str] = None
src1_producer: Optional[int] = None
src2_producer: Optional[int] = None
issue_cycle: Optional[int] = None
read_cycle: Optional[int] = None
exec_complete_cycle: Optional[int] = None
write_cycle: Optional[int] = None
remaining_exec_cycles: int = 0
issue_wait_reasons: List[str] = field(default_factory=list)
read_wait_reasons: List[str] = field(default_factory=list)
write_wait_reasons: List[str] = field(default_factory=list)
@dataclass
class FunctionalUnitSnapshot:
fu_name: str
group_name: str
busy: str
instruction_label: str
opcode: str
fi: str
fj: str
fk: str
qj: str
qk: str
rj: str
rk: str
@dataclass
class CycleSnapshot:
cycle: int
events: List[str]
instruction_rows: List[Tuple[str, str, str, str, str, str]]
unit_rows: List[FunctionalUnitSnapshot]
register_rows: List[Tuple[str, str]]
DEFAULT_SCENARIOS: Dict[str, List[str]] = {
"1": [
"LD F6, 34(R2)",
"LD F2, 45(R3)",
"MULTD F0, F2, F4",
"SUBD F8, F6, F2",
"DIVD F10, F0, F6",
"ADDD F6, F8, F2",
],
"2": [
"LD F2, 0(R1)",
"ADDD F4, F2, F6",
"SUBD F8, F4, F2",
"MULTD F10, F8, F12",
],
"3": [
"LD F2, 0(R1)",
"ADDD F6, F2, F4",
"DIVD F8, F6, F2",
"ADDD F2, F10, F12",
],
}
def LabelForInstruction(index: int) -> str:
return f"I{index}"
def BuildDefaultGroups() -> List[InstructionGroup]:
return [
InstructionGroup("INT", 1, {"ADD": 1, "SUB": 1, "ADDI": 1, "SUBI": 1}),
InstructionGroup("MEMORY", 1, {"LD": 2, "L.D": 2, "SD": 2, "S.D": 2}),
InstructionGroup("FP_ADD", 1, {"ADDD": 2, "SUBD": 2, "ADD.D": 2, "SUB.D": 2}),
InstructionGroup("FP_MULDIV", 1, {"MULTD": 10, "MUL.D": 10, "DIVD": 40, "DIV.D": 40}),
]
def BuildOpcodeMap(groups: List[InstructionGroup]) -> Dict[str, InstructionGroup]:
opcode_map: Dict[str, InstructionGroup] = {}
for group in groups:
for opcode in group.opcodes:
opcode_map[opcode.upper()] = group
return opcode_map
def BuildFunctionalUnits(groups: List[InstructionGroup]) -> Dict[str, List[FunctionalUnit]]:
functional_units: Dict[str, List[FunctionalUnit]] = {}
for group in groups:
functional_units[group.name] = [
FunctionalUnit(f"{group.name}{index + 1}", group.name)
for index in range(group.unit_count)
]
return functional_units
def NormalizeToken(token: Optional[str]) -> Optional[str]:
if token is None:
return None
token = token.strip().upper()
if not token or token == NONE_TOKEN:
return None
return token
def AppendUnique(target: List[str], items: List[str]) -> None:
for item in items:
if item not in target:
target.append(item)
def FindFreeUnit(
functional_units: Dict[str, List[FunctionalUnit]],
group_name: str,
) -> Optional[FunctionalUnit]:
for unit in functional_units[group_name]:
if not unit.busy:
return unit
return None
def CleanInstructionTokens(line: str) -> List[str]:
spaced = line.replace(",", " ")
spaced = spaced.replace("(", " ").replace(")", " ")
spaced = spaced.replace("\t", " ")
return [part for part in spaced.split() if part]
def ParseFlexibleFields(opcode: str, fields: List[str]) -> Tuple[Optional[str], Optional[str], Optional[str]]:
opcode = opcode.upper()
if opcode in {"LD", "L.D"}:
if len(fields) == 1:
return NormalizeToken(fields[0]), None, None
if len(fields) == 2:
return NormalizeToken(fields[0]), NormalizeToken(fields[1]), None
return NormalizeToken(fields[0]), NormalizeToken(fields[2]), None
if opcode in {"SD", "S.D"}:
if len(fields) == 1:
return None, NormalizeToken(fields[0]), None
if len(fields) == 2:
return None, NormalizeToken(fields[0]), NormalizeToken(fields[1])
return None, NormalizeToken(fields[0]), NormalizeToken(fields[2])
dest = NormalizeToken(fields[0]) if len(fields) > 0 else None
src1 = NormalizeToken(fields[1]) if len(fields) > 1 else None
src2 = NormalizeToken(fields[2]) if len(fields) > 2 else None
return dest, src1, src2
def ParseInstructionLine(
line: str,
index: int,
opcode_map: Dict[str, InstructionGroup],
) -> Instruction:
cleaned = line.strip()
if not cleaned:
raise ValueError("Instruction cannot be empty.")
parts = CleanInstructionTokens(cleaned)
if len(parts) < 2:
raise ValueError(
"Each instruction needs at least an opcode and one operand. "
"Examples: ADD R1, R2, R3 | LD R1, 0(R2) | SD R4, 8(R1)"
)
opcode = parts[0].upper()
if opcode not in opcode_map:
raise ValueError(f"Unknown opcode '{parts[0]}'. Add it to an instruction group first.")
dest, src1, src2 = ParseFlexibleFields(opcode, parts[1:])
group = opcode_map[opcode]
if cleaned.upper().startswith(opcode):
text = cleaned.upper()
else:
visible = [opcode, dest or NONE_TOKEN, src1 or NONE_TOKEN, src2 or NONE_TOKEN]
text = " ".join(visible)
return Instruction(
index=index,
opcode=opcode,
dest=dest,
src1=src1,
src2=src2,
text=text,
group_name=group.name,
)
def GetIssueBlockers(
state: InstructionState,
functional_units: Dict[str, List[FunctionalUnit]],
register_result_status: Dict[str, int],
) -> List[str]:
instr = state.instruction
blockers: List[str] = []
if FindFreeUnit(functional_units, instr.group_name) is None:
blockers.append(f"No free functional unit in group {instr.group_name}.")
if instr.dest is not None and instr.dest in register_result_status:
writer_index = register_result_status[instr.dest]
blockers.append(f"WAW hazard on {instr.dest} with {LabelForInstruction(writer_index)}.")
return blockers
def GetReadBlockers(
state: InstructionState,
states: List[InstructionState],
current_cycle: int,
) -> List[str]:
blockers: List[str] = []
if state.src1_producer is not None:
producer = states[state.src1_producer]
if producer.write_cycle is None or producer.write_cycle >= current_cycle:
blockers.append(
f"Waiting for {LabelForInstruction(producer.instruction.index)} to write "
f"{state.instruction.src1}."
)
if state.src2_producer is not None:
producer = states[state.src2_producer]
if producer.write_cycle is None or producer.write_cycle >= current_cycle:
blockers.append(
f"Waiting for {LabelForInstruction(producer.instruction.index)} to write "
f"{state.instruction.src2}."
)
return blockers
def GetWriteBlockers(
state: InstructionState,
states: List[InstructionState],
) -> List[str]:
instr = state.instruction
if instr.dest is None:
return []
blockers: List[str] = []
for other in states:
if other.instruction.index >= instr.index:
continue
if other.issue_cycle is None:
continue
if other.read_cycle is not None:
continue
if other.instruction.src1 == instr.dest or other.instruction.src2 == instr.dest:
blockers.append(
f"WAR hazard: {LabelForInstruction(other.instruction.index)} has not read "
f"{instr.dest} yet."
)
return blockers
def VisibleStageValue(stage_cycle: Optional[int], snapshot_cycle: int) -> str:
if stage_cycle is None or stage_cycle > snapshot_cycle:
return HIDDEN_STAGE
return str(stage_cycle)
def OperandReadyByEndOfCycle(
producer_index: Optional[int],
states: List[InstructionState],
snapshot_cycle: int,
) -> bool:
if producer_index is None:
return True
producer = states[producer_index]
return producer.write_cycle is not None and producer.write_cycle <= snapshot_cycle
def ReadyFlagForOperand(
operand_name: Optional[str],
producer_index: Optional[int],
state: InstructionState,
states: List[InstructionState],
snapshot_cycle: int,
) -> str:
if operand_name is None:
return NONE_TOKEN
if state.read_cycle is not None and state.read_cycle <= snapshot_cycle:
return "NO"
return "YES" if OperandReadyByEndOfCycle(producer_index, states, snapshot_cycle) else "NO"
def ProducerFuName(
producer_index: Optional[int],
states: List[InstructionState],
snapshot_cycle: int,
) -> str:
if producer_index is None:
return NONE_TOKEN
producer = states[producer_index]
if producer.write_cycle is not None and producer.write_cycle <= snapshot_cycle:
return NONE_TOKEN
return producer.assigned_fu or NONE_TOKEN
def BuildCycleSnapshot(
cycle: int,
events: List[str],
groups: List[InstructionGroup],
states: List[InstructionState],
functional_units: Dict[str, List[FunctionalUnit]],
register_result_status: Dict[str, int],
) -> CycleSnapshot:
instruction_rows: List[Tuple[str, str, str, str, str, str]] = []
for state in states:
instruction_rows.append(
(
LabelForInstruction(state.instruction.index),
state.instruction.text,
VisibleStageValue(state.issue_cycle, cycle),
VisibleStageValue(state.read_cycle, cycle),
VisibleStageValue(state.exec_complete_cycle, cycle),
VisibleStageValue(state.write_cycle, cycle),
)
)
unit_rows: List[FunctionalUnitSnapshot] = []
for group in groups:
for unit in functional_units[group.name]:
if unit.busy and unit.instruction_index is not None:
state = states[unit.instruction_index]
instr = state.instruction
unit_rows.append(
FunctionalUnitSnapshot(
fu_name=unit.name,
group_name=group.name,
busy="YES",
instruction_label=LabelForInstruction(instr.index),
opcode=instr.opcode,
fi=instr.dest or NONE_TOKEN,
fj=instr.src1 or NONE_TOKEN,
fk=instr.src2 or NONE_TOKEN,
qj=ProducerFuName(state.src1_producer, states, cycle),
qk=ProducerFuName(state.src2_producer, states, cycle),
rj=ReadyFlagForOperand(instr.src1, state.src1_producer, state, states, cycle),
rk=ReadyFlagForOperand(instr.src2, state.src2_producer, state, states, cycle),
)
)
else:
unit_rows.append(
FunctionalUnitSnapshot(
fu_name=unit.name,
group_name=group.name,
busy="NO",
instruction_label=NONE_TOKEN,
opcode=NONE_TOKEN,
fi=NONE_TOKEN,
fj=NONE_TOKEN,
fk=NONE_TOKEN,
qj=NONE_TOKEN,
qk=NONE_TOKEN,
rj=NONE_TOKEN,
rk=NONE_TOKEN,
)
)
register_rows: List[Tuple[str, str]] = []
for register_name in sorted(register_result_status.keys()):
writer_state = states[register_result_status[register_name]]
register_rows.append((register_name, writer_state.assigned_fu or NONE_TOKEN))
return CycleSnapshot(
cycle=cycle,
events=list(events),
instruction_rows=instruction_rows,
unit_rows=unit_rows,
register_rows=register_rows,
)
def SimulateScoreboard(
groups: List[InstructionGroup],
instructions: List[Instruction],
) -> Tuple[List[InstructionState], Dict[int, CycleSnapshot]]:
functional_units = BuildFunctionalUnits(groups)
register_result_status: Dict[str, int] = {}
states = [InstructionState(instruction=instr) for instr in instructions]
group_by_name = {group.name: group for group in groups}
snapshots: Dict[int, CycleSnapshot] = {}
next_issue_index = 0
current_cycle = 1
max_cycles = 10000
def all_done() -> bool:
return all(state.write_cycle is not None for state in states)
while not all_done():
if current_cycle > max_cycles:
raise RuntimeError("Simulation exceeded the cycle limit. Check the scenario or rules.")
cycle_events: List[str] = []
if next_issue_index < len(states):
state = states[next_issue_index]
blockers = GetIssueBlockers(state, functional_units, register_result_status)
if blockers:
AppendUnique(state.issue_wait_reasons, blockers)
else:
free_unit = FindFreeUnit(functional_units, state.instruction.group_name)
if free_unit is None:
raise RuntimeError("Internal functional-unit allocation error.")
state.assigned_fu = free_unit.name
free_unit.busy = True
free_unit.instruction_index = state.instruction.index
if state.instruction.src1 is not None:
state.src1_producer = register_result_status.get(state.instruction.src1)
if state.instruction.src2 is not None:
state.src2_producer = register_result_status.get(state.instruction.src2)
if state.instruction.dest is not None:
register_result_status[state.instruction.dest] = state.instruction.index
state.issue_cycle = current_cycle
cycle_events.append(f"{LabelForInstruction(state.instruction.index)} issued on {state.assigned_fu}.")
next_issue_index += 1
for state in states:
if state.issue_cycle is None or state.read_cycle is not None:
continue
if state.issue_cycle >= current_cycle:
continue
blockers = GetReadBlockers(state, states, current_cycle)
if blockers:
AppendUnique(state.read_wait_reasons, blockers)
continue
state.read_cycle = current_cycle
group = group_by_name[state.instruction.group_name]
state.remaining_exec_cycles = group.latency_for(state.instruction.opcode)
cycle_events.append(
f"{LabelForInstruction(state.instruction.index)} read its operands."
)
for state in states:
if state.read_cycle is None or state.exec_complete_cycle is not None:
continue
if current_cycle <= state.read_cycle:
continue
state.remaining_exec_cycles -= 1
if state.remaining_exec_cycles == 0:
state.exec_complete_cycle = current_cycle
cycle_events.append(f"{LabelForInstruction(state.instruction.index)} completed execution.")
for state in states:
if state.exec_complete_cycle is None or state.write_cycle is not None:
continue
if state.exec_complete_cycle >= current_cycle:
continue
blockers = GetWriteBlockers(state, states)
if blockers:
AppendUnique(state.write_wait_reasons, blockers)
continue
state.write_cycle = current_cycle
cycle_events.append(
f"{LabelForInstruction(state.instruction.index)} wrote its result."
)
if (
state.instruction.dest is not None
and register_result_status.get(state.instruction.dest)
== state.instruction.index
):
del register_result_status[state.instruction.dest]
if state.assigned_fu is not None:
for unit in functional_units[state.instruction.group_name]:
if unit.name == state.assigned_fu:
unit.busy = False
unit.instruction_index = None
break
snapshots[current_cycle] = BuildCycleSnapshot(
current_cycle,
cycle_events,
groups,
states,
functional_units,
register_result_status,
)
current_cycle += 1
return states, snapshots
def PrintConfiguration(groups: List[InstructionGroup]) -> None:
print("\nFunctional-unit configuration")
print("-" * 88)
print(f"{'Group':<18} {'Units':<8} {'Opcode':<12} {'Latency':<10}")
print("-" * 88)
for group in groups:
opcodes = sorted(group.opcode_latencies.items())
for row_index, (opcode, latency) in enumerate(opcodes):
group_text = group.name if row_index == 0 else ""
units_text = str(group.unit_count) if row_index == 0 else ""
print(f"{group_text:<18} {units_text:<8} {opcode:<12} {latency:<10}")
print("-" * 88)
def PrintAssumptions() -> None:
print("\nScoreboarding assumptions used by this program")
print("1. One instruction issues per cycle, in program order.")
print("2. Functional units stay busy from Issue until Write Result.")
print("3. A value written on cycle N can be read no earlier than cycle N + 1.")
print("4. Execution starts the cycle after Read Operands.")
print("5. A functional unit freed on cycle N can be reissued no earlier than cycle N + 1.")
print("6. Write Result happens no earlier than the cycle after Execute Complete.")
print("7. A single FU group may support multiple opcodes with different latencies.")
print("8. Live scoreboard snapshots show the state at the end of the displayed cycle.\n")
def PrintInstructionList(instructions: List[Instruction]) -> None:
print("Instruction stream")
print("-" * 88)
for instr in instructions:
print(f"{LabelForInstruction(instr.index)}: {instr.text}")
print("-" * 88)
def BuildStageExplanation(
state: InstructionState,
groups: List[InstructionGroup],
stage_name: str,
) -> str:
instr = state.instruction
group_by_name = {group.name: group for group in groups}
latency = group_by_name[instr.group_name].latency_for(instr.opcode)
if stage_name == "ISSUE":
if state.issue_wait_reasons:
return (
f"Issued on cycle {state.issue_cycle} using {state.assigned_fu}. "
f"It waited because: {' '.join(state.issue_wait_reasons)}"
)
return f"Issued immediately on cycle {state.issue_cycle} using {state.assigned_fu}."
if stage_name == "READ":
if state.read_wait_reasons:
return (
f"Read operands on cycle {state.read_cycle}. "
f"It waited because: {' '.join(state.read_wait_reasons)}"
)
return f"Read operands on the first possible cycle: {state.read_cycle}."
if stage_name == "EXECUTE":
return (
f"{instr.opcode} uses FU group {instr.group_name} with latency {latency}. "
f"Read happened on cycle {state.read_cycle}, so execution completed on "
f"cycle {state.exec_complete_cycle}."
)
if stage_name == "WRITE":
if state.write_wait_reasons:
return (
f"Wrote result on cycle {state.write_cycle}. "
f"It waited because: {' '.join(state.write_wait_reasons)}"
)
return f"Wrote result on the first possible cycle: {state.write_cycle}."
return ""
def PrintScheduleTable(states: List[InstructionState]) -> None:
print("\nCorrect schedule")
print("-" * 88)
print(
f"{'Instr':<6} {'Instruction':<24} {'Issue':<7} {'Read':<7} "
f"{'ExecC':<7} {'Write':<7}"
)
print("-" * 88)
for state in states:
print(
f"{LabelForInstruction(state.instruction.index):<6} {state.instruction.text:<24} "
f"{state.issue_cycle:<7} {state.read_cycle:<7} "
f"{state.exec_complete_cycle:<7} {state.write_cycle:<7}"
)
print("-" * 88)
def StageCycleForName(state: InstructionState, stage_name: str) -> Optional[int]:
if stage_name == "ISSUE":
return state.issue_cycle
if stage_name == "READ":
return state.read_cycle
if stage_name == "EXECUTE":
return state.exec_complete_cycle
if stage_name == "WRITE":
return state.write_cycle
raise ValueError(f"Unknown stage name: {stage_name}")
def EventTextForStage(state: InstructionState, stage_name: str) -> str:
label = LabelForInstruction(state.instruction.index)
if stage_name == "ISSUE":
return f"{label} issued on {state.assigned_fu}."
if stage_name == "READ":
return f"{label} read its operands."
if stage_name == "EXECUTE":
return f"{label} completed execution."
if stage_name == "WRITE":
return f"{label} wrote its result."
raise ValueError(f"Unknown stage name: {stage_name}")
def BuildSpoilerFreeSnapshot(
actual_snapshot: CycleSnapshot,
states: List[InstructionState],
revealed_stages: Set[Tuple[int, str]],
) -> CycleSnapshot:
cycle = actual_snapshot.cycle
instruction_rows: List[Tuple[str, str, str, str, str, str]] = []
for state in states:
index = state.instruction.index
instruction_rows.append(
(
LabelForInstruction(index),
state.instruction.text,
str(state.issue_cycle) if (index, "ISSUE") in revealed_stages else HIDDEN_STAGE,
str(state.read_cycle) if (index, "READ") in revealed_stages else HIDDEN_STAGE,
str(state.exec_complete_cycle) if (index, "EXECUTE") in revealed_stages else HIDDEN_STAGE,
str(state.write_cycle) if (index, "WRITE") in revealed_stages else HIDDEN_STAGE,
)
)
events: List[str] = []
for state in states:
index = state.instruction.index
for stage_name in STAGE_NAMES:
if (index, stage_name) not in revealed_stages:
continue
if StageCycleForName(state, stage_name) == cycle:
events.append(EventTextForStage(state, stage_name))
unit_rows: List[FunctionalUnitSnapshot] = []
for row in actual_snapshot.unit_rows:
if row.instruction_label.startswith("I") and row.instruction_label[1:].isdigit():
instr_index = int(row.instruction_label[1:])
if (instr_index, "ISSUE") not in revealed_stages:
unit_rows.append(
FunctionalUnitSnapshot(
fu_name=row.fu_name,
group_name=row.group_name,
busy=HIDDEN_TOKEN,
instruction_label=HIDDEN_TOKEN,
opcode=HIDDEN_TOKEN,
fi=HIDDEN_TOKEN,
fj=HIDDEN_TOKEN,
fk=HIDDEN_TOKEN,
qj=HIDDEN_TOKEN,
qk=HIDDEN_TOKEN,
rj=HIDDEN_TOKEN,
rk=HIDDEN_TOKEN,
)
)
continue
unit_rows.append(row)
register_rows: List[Tuple[str, str]] = []
for state in states:
index = state.instruction.index
dest = state.instruction.dest
if dest is None or state.issue_cycle is None or state.issue_cycle > cycle:
continue
if (index, "ISSUE") not in revealed_stages:
continue
if state.write_cycle is not None and state.write_cycle <= cycle:
continue
register_rows.append((dest, state.assigned_fu or NONE_TOKEN))
register_rows.sort(key=lambda item: item[0])
return CycleSnapshot(
cycle=cycle,
events=events,
instruction_rows=instruction_rows,
unit_rows=unit_rows,
register_rows=register_rows,
)
def PrintCycleSnapshot(snapshot: CycleSnapshot) -> None:
print(f"\nLive scoreboard snapshot: end of cycle {snapshot.cycle}")
print("-" * 88)
if snapshot.events:
print("Events this cycle:")
for event in snapshot.events:
print(f" - {event}")
else:
print("Events this cycle: none")
print("\nInstruction status")
print("-" * 88)
print(f"{'Instr':<6} {'Instruction':<24} {'Issue':<7} {'Read':<7} {'ExecC':<7} {'Write':<7}")
print("-" * 88)
for label, text, issue, read, exec_complete, write in snapshot.instruction_rows:
print(f"{label:<6} {text:<24} {issue:<7} {read:<7} {exec_complete:<7} {write:<7}")
print("\nFunctional-unit status")
print("-" * 132)
print(
f"{'FU':<16} {'Group':<16} {'Busy':<6} {'Instr':<6} {'Op':<8} "
f"{'Fi':<8} {'Fj':<8} {'Fk':<8} {'Qj':<12} {'Qk':<12} {'Rj':<6} {'Rk':<6}"
)
print("-" * 132)
for row in snapshot.unit_rows:
print(
f"{row.fu_name:<16} {row.group_name:<16} {row.busy:<6} {row.instruction_label:<6} {row.opcode:<8} "
f"{row.fi:<8} {row.fj:<8} {row.fk:<8} {row.qj:<12} {row.qk:<12} {row.rj:<6} {row.rk:<6}"
)
print("\nRegister-result status")
print("-" * 36)
print(f"{'Register':<16} {'Writer FU':<16}")
print("-" * 36)
if snapshot.register_rows:
for register_name, writer_fu in snapshot.register_rows:
print(f"{register_name:<16} {writer_fu:<16}")
else:
print(f"{'(none)':<16} {NONE_TOKEN:<16}")
print("-" * 36)
def AskInteger(prompt: str) -> int:
while True:
text = input(prompt).strip()
if text.lower() in QUIT_WORDS:
raise KeyboardInterrupt
try:
return int(text)
except ValueError:
print("Please enter an integer.")
def AskGroupConfiguration() -> List[InstructionGroup]:
print("Choose a functional-unit configuration:")
print("1. Built-in default")
print("2. Custom interactive configuration")
while True:
choice = input("> ").strip()
if choice == "1":
return BuildDefaultGroups()
if choice == "2":
group_count = AskInteger("How many FU groups? ")
groups: List[InstructionGroup] = []
for group_index in range(group_count):
print(f"\nFU group {group_index + 1}")
name = input("Group name: ").strip().upper()
unit_count = AskInteger("How many functional units in this group? ")
opcode_count = AskInteger("How many opcodes does this group support? ")
opcode_latencies: Dict[str, int] = {}
for opcode_index in range(opcode_count):
while True:
opcode = input(f" Opcode {opcode_index + 1} name: ").strip().upper()
latency = AskInteger(f" Latency for {opcode}: ")
if not opcode:
print(" Opcode cannot be empty.")
continue
if latency <= 0:
print(" Latency must be positive.")
continue
opcode_latencies[opcode] = latency
break
if not name:
print("Group name cannot be empty.")
continue
if not opcode_latencies:
print("You must provide at least one opcode.")
continue
if unit_count <= 0:
print("Unit count must be positive.")
continue
groups.append(
InstructionGroup(
name=name,
unit_count=unit_count,
opcode_latencies=opcode_latencies,
)
)
return groups
print("Enter 1 or 2.")
def AskScenarioLines(opcode_map: Dict[str, InstructionGroup]) -> List[str]:
print("\nChoose an instruction stream:")
print("1. Classic mixed example")
print("2. Short dependency chain")
print("3. WAR-delay example")
print("4. Enter your own instructions")
while True:
choice = input("> ").strip()
if choice in DEFAULT_SCENARIOS:
return DEFAULT_SCENARIOS[choice]
if choice == "4":
print("\nEnter one instruction per line. [-1 to stop]")
print("Accepted forms include:")
print(" OP DEST SRC1 SRC2")
print(" ADD R5, R3, R4")
print(" LD R3, 34(R1)")
print(" LD F1, #9")
print(" SD R7, 4(R0)")
print("Use '-' for a missing operand in normalized form.")
lines: List[str] = []
index = 0
while True:
line = input(f"{LabelForInstruction(index)}> ").strip()
if line=="-1":
break
try:
ParseInstructionLine(line, index, opcode_map)
lines.append(line)
index += 1
except ValueError as exc:
print(f"Invalid instruction: {exc}")
return lines
print("Enter 1, 2, 3, or 4.")
def AskGuessUntilCorrect(
prompt: str,
answer: int,
help_text: str,
explanation: str,
) -> None:
while True:
text = input(prompt).strip().lower()
if text in QUIT_WORDS:
raise KeyboardInterrupt
if text in HELP_WORDS:
print(help_text)
continue
if text in REVEAL_WORDS:
print(f"The correct cycle is {answer}.")
print("Now type that value to continue.")
continue
try:
guess = int(text)
except ValueError:
print("Enter an integer, 'help', 'show', or 'quit'.")
continue
if guess == answer:
print("Correct.")
print(explanation)
return
if guess < answer:
print(f"Incorrect. That is {answer - guess} cycle(s) too early.")
else:
print(f"Incorrect. That is {guess - answer} cycle(s) too late.")
def SnapshotForCycle(snapshots: Dict[int, CycleSnapshot], cycle: int) -> CycleSnapshot:
snapshot = snapshots.get(cycle)
if snapshot is None:
raise RuntimeError(f"Internal error: no saved snapshot for cycle {cycle}.")
return snapshot
def RunQuiz(
states: List[InstructionState],
groups: List[InstructionGroup],
snapshots: Dict[int, CycleSnapshot],
) -> None:
print("\nQuiz started.")
print("Commands during the quiz: help, show, quit")
group_by_name = {group.name: group for group in groups}
revealed_stages: Set[Tuple[int, str]] = set()
max_display_cycle = 0
for state in states:
instr = state.instruction
print("\n" + "=" * 88)
print(f"{LabelForInstruction(instr.index)}: {instr.text}")
group = group_by_name[instr.group_name]
latency = group.latency_for(instr.opcode)
print(
f"FU group: {group.name} | "
f"Functional units: {group.unit_count} | "
f"Opcode latency: {latency}"
)
def reveal_stage(stage_name: str, cycle: int) -> None:
nonlocal max_display_cycle
revealed_stages.add((instr.index, stage_name))
display_cycle = max(max_display_cycle, cycle)
if display_cycle > max_display_cycle:
max_display_cycle = display_cycle
elif cycle < max_display_cycle:
print(
f"Live machine view remains at end of cycle {max_display_cycle} "
f"to avoid rewinding time from cycle {cycle}."
)
actual_snapshot = SnapshotForCycle(snapshots, max_display_cycle)
PrintCycleSnapshot(BuildSpoilerFreeSnapshot(actual_snapshot, states, revealed_stages))
AskGuessUntilCorrect(
"Guess ISSUE cycle: ",
state.issue_cycle or -1,
"Issue waits for a free functional unit and for any WAW hazard to clear.",
BuildStageExplanation(state, groups, "ISSUE"),
)
reveal_stage("ISSUE", state.issue_cycle or -1)
AskGuessUntilCorrect(
"Guess READ cycle: ",
state.read_cycle or -1,
"Read waits until every needed producer has already written in an earlier cycle.",
BuildStageExplanation(state, groups, "READ"),
)
reveal_stage("READ", state.read_cycle or -1)
AskGuessUntilCorrect(
"Guess EXECUTE-COMPLETE cycle: ",
state.exec_complete_cycle or -1,
"Execute Complete is driven by that opcode's latency within its FU group.",
BuildStageExplanation(state, groups, "EXECUTE"),
)
reveal_stage("EXECUTE", state.exec_complete_cycle or -1)
AskGuessUntilCorrect(
"Guess WRITE-RESULT cycle: ",
state.write_cycle or -1,
"Write waits for WAR hazards to clear and happens after Execute Complete.",
BuildStageExplanation(state, groups, "WRITE"),
)
reveal_stage("WRITE", state.write_cycle or -1)
print("\nYou finished the entire instruction stream.")
PrintScheduleTable(states)
def Main() -> None:
print("Register Scoreboarding Practice Program")
print("=" * 88)
try:
groups = AskGroupConfiguration()
opcode_map = BuildOpcodeMap(groups)
scenario_lines = AskScenarioLines(opcode_map)
instructions = [
ParseInstructionLine(line, index, opcode_map)
for index, line in enumerate(scenario_lines)
]
PrintAssumptions()
PrintConfiguration(groups)
PrintInstructionList(instructions)
states, snapshots = SimulateScoreboard(groups, instructions)
RunQuiz(states, groups, snapshots)
except KeyboardInterrupt:
print("\nSession ended.")
if __name__ == "__main__":
Main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment