Last active
March 16, 2026 03:33
-
-
Save AndBondStyle/7fc2838fdcd09702b22145edb2c787d4 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| import sys | |
| import os | |
| import subprocess as sp | |
| from pathlib import Path | |
| import venv | |
| def is_venv(): | |
| return (hasattr(sys, "real_prefix") or | |
| (hasattr(sys, "base_prefix") and sys.base_prefix != sys.prefix)) | |
| def setup_and_relaunch(): | |
| venv_dir = Path("~/.setup/venv").expanduser() | |
| if not venv_dir.exists(): | |
| print("Creating virtual environment...") | |
| venv.create(venv_dir, with_pip=True) | |
| packages = ["nicegui", "pyserial"] | |
| sp.check_call([str(venv_dir / "bin" / "pip"), "install"] + packages) | |
| python_path = str(venv_dir / "bin" / "python") | |
| os.execv(python_path, [python_path] + sys.argv) | |
| if not is_venv(): | |
| setup_and_relaunch() | |
| sys.exit(0) | |
| # ---------------------------------------------------------------- | |
| import asyncio | |
| import time | |
| import abc | |
| from typing import Callable, ClassVar | |
| from nicegui import ui, app | |
| from dataclasses import dataclass | |
| import traceback | |
| from serial import Serial | |
| import struct | |
| import math | |
| import yaml | |
| import re | |
| class BaseTest(abc.ABC): | |
| name: ClassVar[str] = ... | |
| tests: ClassVar[dict] = {} | |
| def __init__(self): | |
| self.status = "NOT RUN" | |
| self.log_func: Callable[[str], None] = None | |
| self.output: ui.column = None | |
| self.run_btn: ui.button = None | |
| def __init_subclass__(cls, **kwargs): | |
| super().__init_subclass__(**kwargs) | |
| BaseTest.tests[cls] = True | |
| def log(self, msg: str, color: str = "default"): | |
| COLORS = { | |
| "red": "\x1b[91m", | |
| "green": "\x1b[92m", | |
| "yellow": "\x1b[93m", | |
| "blue": "\x1b[94m", | |
| "reset": "\x1b[0m", | |
| } | |
| if color != "default": | |
| reset = COLORS["reset"] | |
| code = COLORS.get(color, reset) | |
| msg = f"{code}{msg}{reset}" | |
| self.log_func(msg) | |
| async def shell(self, cmd: str, check: bool = True, timeout: float = 10, **kwargs): | |
| self.log(f">>> $ {cmd}") | |
| proc = await asyncio.create_subprocess_shell( | |
| cmd, | |
| stdout=asyncio.subprocess.PIPE, | |
| stderr=asyncio.subprocess.PIPE, | |
| ) | |
| logs = "" | |
| async def read_stream(stream): | |
| nonlocal logs | |
| while True: | |
| line = await stream.readline() | |
| if not line: break | |
| line = line.decode().rstrip() | |
| logs += line + "\n" | |
| self.log(line) | |
| try: | |
| gather = asyncio.gather(read_stream(proc.stdout), read_stream(proc.stderr)) | |
| await asyncio.wait_for(gather, timeout=timeout) | |
| await proc.wait() | |
| except asyncio.TimeoutError: | |
| proc.kill() | |
| await proc.wait() | |
| assert False, f"Command timed out after {timeout}s" | |
| assert not check or proc.returncode == 0, f"Bad return code: {proc.returncode}" | |
| return logs | |
| async def run(self): | |
| self.log(f"\r\n===== START: {self.name} =====\r\n", color="yellow") | |
| try: | |
| self.output.clear() | |
| self.output.set_visibility(False) | |
| self.run_btn.disable() | |
| self.status = "RUNNING" | |
| res = await self.test() | |
| self.status = "PASS" if res else "FAIL" | |
| except AssertionError as err: | |
| self.log(f"----- ASSERTION FAILED -----\n", color="red") | |
| self.log(str(err) + "\n") | |
| self.status = "FAIL" | |
| except Exception as err: | |
| self.log(f"----- UNHANDLED TEST ERROR -----\n", color="red") | |
| self.log(traceback.format_exc().replace("\n", "\n\r")) | |
| self.status = "FAIL" | |
| color = "green" if self.status == "PASS" else "red" | |
| self.log(f"\r\n===== [{self.status}] {self.name} =====", color=color) | |
| self.run_btn.enable() | |
| @abc.abstractmethod | |
| async def test(self): | |
| pass | |
| class CameraTest(BaseTest): | |
| name = "CAMERA" | |
| async def test(self): | |
| logs = await self.shell("rpicam-hello --list-cameras") | |
| lines = logs.splitlines() | |
| count = sum(1 for line in lines if line.strip() and line[0].isdigit()) | |
| assert count == 1, f"Expected 1 camera, found {count}" | |
| output_file = Path("/tmp/camera-test.png") | |
| output_file.unlink(missing_ok=True) | |
| cmd = ( | |
| "rpicam-still --timeout 1000 --encoding png " | |
| f"--width 1280 --height 720 --output {output_file}" | |
| ) | |
| await self.shell(cmd) | |
| assert output_file.exists(), f"Output file missing: {output_file}" | |
| with self.output.classes(add="h-60 items-center"): | |
| self.output.set_visibility(True) | |
| img = ui.image(output_file).classes("max-h-full").props("fit=contain") | |
| img.force_reload() | |
| return True | |
| class SpeakerMicTest(BaseTest): | |
| name = "SPEAKER + MIC" | |
| async def test(self): | |
| output_file = Path("/tmp/mic-test.wav") | |
| output_file.unlink(missing_ok=True) | |
| record_proc = self.shell( | |
| "arecord -D plughw:0 -c2 -r 48000 -f S32_LE -t wav " | |
| f"-V stereo -d 5 -v {output_file}" | |
| ) | |
| speaker_proc = self.shell("speaker-test -t wav -c 1 -l 3") | |
| await asyncio.gather(record_proc, speaker_proc) | |
| assert output_file.exists(), f"Output file missing: {output_file}" | |
| assert output_file.stat().st_size != 0, f"Output file empty: {output_file}" | |
| self.log("NOTICE: Check recorded audio manually", color="yellow") | |
| with self.output: | |
| self.output.set_visibility(True) | |
| ui.audio(output_file).classes("w-full") | |
| return True | |
| class MCUTest(BaseTest): | |
| name = "STM COMM + IMU CHECK" | |
| @dataclass | |
| class Packet: | |
| accel_x: float | |
| accel_y: float | |
| accel_z: float | |
| gyro_x: float | |
| gyro_y: float | |
| gyro_z: float | |
| mag_x: float | |
| mag_y: float | |
| mag_z: float | |
| left_angle: float | |
| right_angle: float | |
| left_speed: float | |
| right_speed: float | |
| left_lidar: int | |
| right_lidar: int | |
| _sep: ClassVar[int] = 0x7E | |
| _fmt: ClassVar[str] = "<hhhhhhhhhffffHHx" | |
| _size: ClassVar[int] = struct.calcsize(_fmt) | |
| @classmethod | |
| def unpack(cls, buff: bytes): | |
| if len(buff) != cls._size: return | |
| values = struct.unpack(cls._fmt, buff) | |
| return cls( | |
| accel_x=values[0], | |
| accel_y=values[1], | |
| accel_z=values[2], | |
| gyro_x=values[3] * 250.0 / 32768.0 / 180 * math.pi, | |
| gyro_y=values[4] * 250.0 / 32768.0 / 180 * math.pi, | |
| gyro_z=values[5] * 250.0 / 32768.0 / 180 * math.pi, | |
| mag_x=values[6], | |
| mag_y=values[7], | |
| mag_z=values[8], | |
| left_angle=values[9], | |
| right_angle=values[10], | |
| left_speed=values[11], | |
| right_speed=values[12], | |
| left_lidar=values[13], | |
| right_lidar=values[14], | |
| ) | |
| def crc8(self, data: bytes, poly: int = 0x31, init: int = 0xFF) -> int: | |
| crc = init | |
| for byte in data: | |
| crc ^= byte | |
| for _ in range(8): | |
| if crc & 0x80: crc = (crc << 1) ^ poly | |
| else: crc <<= 1 | |
| crc &= 0xFF | |
| return crc | |
| def send_cmd(self, ser: Serial, left: float, right: float, log: bool = True): | |
| buf = struct.pack("<ffB", left, right, 0) | |
| buf = b"\xA0" + buf | |
| buf = b"\x7E" + buf + self.crc8(buf).to_bytes() | |
| if log: self.log(f">>> {':'.join(f'{x:02X}' for x in buf)}") | |
| ser.write(buf) | |
| ser.flush() | |
| async def read_serial(self, ser: Serial, timeout: float = 2.0, packets: list = None, logfmt: str = None, log_packets: bool = True): | |
| if logfmt is None: | |
| logfmt = ( | |
| "gyro: [{0.gyro_x:.2f}, {0.gyro_y:.2f}, {0.gyro_z:.2f}] | " | |
| "accel: [{0.accel_x:.2f}, {0.accel_y:.2f}, {0.accel_z:.2f}] | " | |
| "mag: [{0.mag_x:.2f}, {0.mag_y:.2f}, {0.mag_z:.2f}]\r\n" | |
| "speed: [{0.left_speed:.2f}, {0.right_speed:.2f}] | " | |
| "angle: [{0.left_angle:.2f}, {0.right_angle:.2f}] | " | |
| "lidar: [{0.left_lidar:.2f}, {0.right_lidar:.2f}]" | |
| ) | |
| data = b"" | |
| packets = [] if packets is None else packets | |
| start = time.perf_counter() | |
| while time.perf_counter() - start < timeout: | |
| if not ser.in_waiting: | |
| await asyncio.sleep(0.1) | |
| continue | |
| chunk = ser.read_until(b"\x7E") | |
| data += chunk | |
| if len(data) >= self.Packet._size: | |
| last_sep_index = -1 | |
| for i in range(len(data) - 1, -1, -1): | |
| if data[i] == 0x7E: | |
| last_sep_index = i | |
| break | |
| # self.log(f"last sep: {last_sep_index}") | |
| if last_sep_index == -1: continue | |
| if last_sep_index < self.Packet._size: continue | |
| chunk = data[last_sep_index - self.Packet._size + 1:last_sep_index + 1] | |
| if log_packets: self.log(f"<<< {':'.join(f'{x:02X}' for x in chunk)}") | |
| if chunk.count(0x7E) > 1: | |
| self.log("!!! WARNING: 0x7E inside packet - skipipng") | |
| continue | |
| pack = self.Packet.unpack(chunk) | |
| assert pack is not None, "Failed to parse packet" | |
| # self.log(f"unpack: {pack}") | |
| self.log(logfmt.format(pack)) | |
| if pack is not None: | |
| packets.append(pack) | |
| data = data[last_sep_index + 1:-1] | |
| await asyncio.sleep(0) | |
| return packets | |
| def open_serial(self): | |
| ser = Serial("/dev/ttyAMA0", baudrate=115200, timeout=0.1, exclusive=True) | |
| ser.reset_input_buffer() | |
| ser.reset_output_buffer() | |
| return ser | |
| async def test(self): | |
| ser = self.open_serial() | |
| packets = await self.read_serial(ser) | |
| if not packets: | |
| msg = "No valid packets decoded" | |
| self.log(msg, color="red") | |
| return False | |
| good_imu_count = 0 | |
| for pack in packets: | |
| gyro_ok = (pack.gyro_x != 0 or pack.gyro_y != 0 or pack.gyro_z != 0) | |
| accel_ok = pack.accel_z > 1000 # accel z axis is gravity (in popugai's) | |
| good_imu_count += gyro_ok and accel_ok | |
| msg = f"Good IMU packets: {good_imu_count} out of {len(packets)} total" | |
| self.log(msg, color="yellow") | |
| assert good_imu_count > len(packets) / 2, "Broken IMU" | |
| return True | |
| class MoveTest(MCUTest): | |
| name = "SIMPLE MOVE TEST" | |
| async def test(self): | |
| ser = self.open_serial() | |
| packets = [] | |
| SPEED = 10.0 | |
| MOVES = [ | |
| (+SPEED, +SPEED, 1.0), | |
| (-SPEED, +SPEED, 1.0), | |
| (+SPEED, +SPEED, 1.0), | |
| (+SPEED, -SPEED, 1.0), | |
| ] | |
| RESULTS = [] | |
| async def move_func(): | |
| for left, right, delay in MOVES: | |
| self.log(f"\r\n***** MOVE: [{left:5.1f}, {right:5.1f}] *****\r\n") | |
| self.send_cmd(ser, left, right) | |
| packet_index = len(packets) - 1 | |
| await asyncio.sleep(delay) | |
| RESULTS.append(packets[packet_index:]) | |
| try: | |
| read_func = self.read_serial( | |
| ser, | |
| packets=packets, | |
| timeout=sum(x[2] for x in MOVES), | |
| logfmt="speed: [{0.left_speed:6.1f}, {0.right_speed:6.1f}]", | |
| log_packets=False, | |
| ) | |
| await asyncio.gather(move_func(), read_func) | |
| self.send_cmd(ser, 0.0, 0.0) | |
| self.log("\r\n") | |
| for (left, right, _), results in zip(MOVES, RESULTS): | |
| assert len(results), f"No packets received for move: [{left:.1f}, {right:.1f}]" | |
| left_err = [left - x.left_speed for x in results] | |
| left_rms = math.sqrt(sum(x ** 2 for x in left_err) / len(left_err)) | |
| right_err = [right - x.right_speed for x in results] | |
| right_rms = math.sqrt(sum(x ** 2 for x in right_err) / len(right_err)) | |
| self.log( | |
| f"MOVE [{left:6.1f}, {right:6.1f}] " | |
| f"SPEED RMS ERR: [{left_rms:.3f}, {right_rms:.3f}]" | |
| ) | |
| self.log("\r\nNOTICE: Check speed RMS errors manually", color="yellow") | |
| finally: | |
| self.send_cmd(ser, 0.0, 0.0, log=False) | |
| return True | |
| class DockerROSTest(BaseTest): | |
| name = "DOCKER + ROS" | |
| EXPECTED_NODES = [ | |
| "/hwnode", | |
| "/ld19_node", | |
| "/camera_node", | |
| "/icp_odom", | |
| "/robot_state_publisher", | |
| "/foxglove_bridge", | |
| "/slam_toolbox", | |
| "/nav2_container", | |
| "/bt_navigator", | |
| "/behavior_server", | |
| "/controller_server", | |
| "/planner_server", | |
| "/global_costmap/global_costmap", | |
| "/local_costmap/local_costmap", | |
| "/lifecycle_manager_navigation", | |
| ] | |
| TOLERANCE = 0.9 | |
| TRANSFORMS = [ | |
| ("lidar", "base_link", -1), | |
| ("base_link", "odom", 10.0), | |
| ("odom", "map", 1.0), | |
| ] | |
| TOPICS = [ | |
| ("/scan", 10.0, 3), | |
| ("/hardware/imu", 50.0, 3), | |
| ("/hardware/odom", 50.0, 3), | |
| ("/camera_node/image_raw/compressed", 25.0, 3), | |
| ("/icp/odom", 10.0, 3), | |
| ("/local_costmap/costmap", 1.5, 10), | |
| ("/global_costmap/costmap", 0.5, 10), | |
| ("/map", 0.2, 20), | |
| ] | |
| async def _docker_exec(self, cmd: str, check: bool = True, timeout: float = 10.0) -> str: | |
| cmd = f"docker exec ros bash -c 'export PS1=\"dummy\" && source ~/.bashrc && {cmd}'" | |
| return await self.shell(cmd, check=check, timeout=timeout) | |
| async def _start_container(self): | |
| Path("~/robomarvel/.autostart").expanduser().touch() | |
| await self.shell("docker stop ros", check=False, timeout=10) | |
| await self.shell("docker start ros", timeout=10) | |
| self.log(f"Waiting for ROS to initialize...", color="yellow") | |
| await asyncio.sleep(5) | |
| async def _check_nodes(self): | |
| self.log("\n--- Checking ROS nodes ---", color="yellow") | |
| output = await self._docker_exec("ros2 node list", timeout=10) | |
| nodes = set(line.strip() for line in output.splitlines() if line.strip()) | |
| missing = set(self.EXPECTED_NODES) - nodes | |
| assert not missing, f"Missing nodes: {sorted(missing)}" | |
| self.log(f"All {len(self.EXPECTED_NODES)} expected nodes found", color="green") | |
| async def _check_transforms(self): | |
| self.log("\n--- Checking transforms via view_frames ---", color="yellow") | |
| cmd = "ros2 run tf2_tools view_frames -o /tmp/frames --wait-time 5" | |
| output = await self._docker_exec(cmd, timeout=10) | |
| match = re.search(r'Result:tf2_msgs\.srv\.FrameGraph_Response\(frame_yaml="(.*)"\)', output, re.DOTALL) | |
| assert match, "Could not extract frame_yaml from view_frames output" | |
| data = match.group(1) | |
| data = data.replace('\\"', '"') | |
| data = data.replace("\\n", "\n") | |
| self.log(f"\r\nVIEW FRAMES RESULT:", color="yellow") | |
| self.log(f"{data.replace("\n", "\r\n")}") | |
| frames = yaml.safe_load(data) | |
| for child, parent, min_rate in self.TRANSFORMS: | |
| self.log(f"Checking transform: {child} -> {parent}") | |
| assert child in frames, f"Child frame '{child}' not found" | |
| info = frames[child] | |
| actual_parent = info.get("parent") | |
| assert actual_parent == parent, f"Transform {child} has parent '{actual_parent}', expected '{parent}'" | |
| if min_rate > 0: | |
| rate = info.get("rate") | |
| assert rate is not None, f"Rate not available for dynamic transform {child}" | |
| assert rate >= min_rate * self.TOLERANCE, f"Transform {child} rate too low: {rate:.3f} Hz < {min_rate} Hz" | |
| self.log(f"--> OK: rate = {rate:.3f} Hz (≥ {min_rate})", color="green") | |
| else: | |
| self.log(f"--> OK: static transform", color="green") | |
| async def _check_topic(self, topic: str, min_rate: float, timeout: float): | |
| self.log(f"\n--- Checking topic: {topic} ---", color="yellow") | |
| cmd = f"timeout {timeout} ros2 topic hz {topic}" | |
| output = await self._docker_exec(cmd, timeout=timeout + 1, check=False) | |
| avg_rate = None | |
| for line in output.splitlines(): | |
| if "average rate" in line: | |
| match = re.search(r"average rate:\s+([0-9.]+)", line) | |
| if match: | |
| avg_rate = float(match.group(1)) | |
| break | |
| if avg_rate is None or avg_rate == 0.0: | |
| if "no new messages" in output or not output.strip(): | |
| assert False, f"No messages received" | |
| assert False, f"Could not determine message rate" | |
| assert avg_rate >= min_rate * self.TOLERANCE, f"Rate too low: {avg_rate:.1f} Hz < {min_rate} Hz" | |
| self.log(f"Topic rate OK: {avg_rate:.3f} >> {min_rate} Hz", color="green") | |
| async def test(self): | |
| try: | |
| await self._start_container() | |
| await self._check_nodes() | |
| await self._check_transforms() | |
| for topic, min_rate, timeout in self.TOPICS: | |
| await self._check_topic(topic, min_rate, timeout) | |
| return True | |
| finally: | |
| self.log("\r\nStopping container...") | |
| await self.shell("docker stop ros", check=False, timeout=10) | |
| # ---------------------------------------------------------------- | |
| class StatusLabel(ui.label): | |
| COLORS = { | |
| "running": "text-warning", | |
| "pass": "text-positive", | |
| "fail": "text-negative", | |
| "not run": "text-grey", | |
| } | |
| def _handle_text_change(self, text: str) -> None: | |
| super()._handle_text_change(text) | |
| self.classes(remove=(" ".join(self.COLORS.values()))) | |
| res = self.COLORS.get(text.lower()) | |
| if res is not None: self.classes(add=res) | |
| INTRO = """ | |
| ___ ___ __ __ _____ _____ _____ ___ __ __ _____ ___ ___ _____ ___ | |
| │ _ ╲ _ ) ╲╱ │ ╱ __╲ ╲ ╱ ╱ __│_ _│ __│ ╲╱ │ │_ _│ __╱ __│_ _╱ __│ | |
| │ ╱ _ ╲ │╲╱│ │ ╲__ ╲╲ V ╱╲__ ╲ │ │ │ _││ │╲╱│ │ │ │ │ _│╲__ ╲ │ │ ╲__ ╲ | |
| │_│_╲___╱_│ │_│ │___╱ │_│ │___╱ │_│ │___│_│ │_│ │_│ │___│___╱ │_│ │___╱ | |
| """ | |
| @ui.page("/") | |
| def main_page(): | |
| ui.dark_mode(value=True) | |
| ui.query(".nicegui-content").classes("p-0") | |
| test_instances = [] | |
| with ui.splitter(value=25).classes("w-full h-screen") as splitter: | |
| with splitter.before: | |
| with ui.column().classes("w-full p-4 overflow-auto"): | |
| with ui.row().classes("w-full items-center"): | |
| ui.label("RBM SYSTEM TESTS").classes("text-h5 col") | |
| async def run_all(): | |
| run_all_btn.disable() | |
| for test in test_instances: | |
| await test.run() | |
| run_all_btn.enable() | |
| run_all_btn = ui.button("RUN ALL", on_click=run_all, icon="play_arrow") | |
| ui.separator() | |
| for test_class in BaseTest.tests.keys(): | |
| test = test_class() | |
| test_instances.append(test) | |
| with ui.row().classes("items-center gap-4 p-2 border rounded w-full").style("border-color: rgba(255, 255, 255, 0.25)"): | |
| ui.label(test.name).classes("font-bold col") | |
| StatusLabel().bind_text_from(test, "status") | |
| btn = ui.button("RUN", on_click=test.run).props("outline") | |
| output = ui.row().classes("w-full") | |
| output.set_visibility(False) | |
| test.log_func = lambda line: terminal.write(line + "\r\n") | |
| test.output = output | |
| test.run_btn = btn | |
| with splitter.after: | |
| with ui.column().classes("w-full h-full p-4"): | |
| terminal = ui.xterm().classes("size-full") | |
| ui.element("q-resize-observer").on("resize", terminal.fit) | |
| intro = INTRO + "—" * (len(INTRO.splitlines()[-1]) + 1) + "\n" | |
| intro = intro.replace("\n", "\r\n") | |
| terminal.write(intro) | |
| sp.check_call("docker stop ros", shell=True, stderr=sp.DEVNULL, stdout=sp.DEVNULL) | |
| ui.run(title="RBM TESTS", show=False, port=8888, favicon="✅", reconnect_timeout=10) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment