Created
January 15, 2022 21:49
-
-
Save acolomb/a583b05b3389cb57a406bc979e2f4f3e to your computer and use it in GitHub Desktop.
Synchronized CANopen communication cycle for multiple remotes with deterministic, drift-free timing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import time | |
| import sched | |
| import logging | |
| import threading | |
| import functools | |
| import math | |
logger = logging.getLogger(__name__)


class CommCycle:
    """Deterministic timing for cyclic communication and calculations.

    Tasks are registered under a ``(time offset, priority)`` key.  Once per
    cycle every registered task is queued on a :class:`sched.scheduler` at
    ``start_time + offset``; a background thread runs the scheduler.  The
    cycle start time is only ever advanced by whole multiples of
    ``CYCLE_PERIOD``, so the time grid does not drift.
    """

    # Time offsets are relative to the start of the current cycle.
    SYNC = 0.000  # [seconds]
    SYNC_RESET_PRIO = 0
    SYNC_SEND_PRIO = 1  # not used inside this class; available to callers
    FEEDBACK_AWAIT = 0.020  # [seconds]
    FEEDBACK_CHECK_PRIO = 0
    FEEDBACK_DEADLINE = 0.022  # [seconds]
    CALCULATE = FEEDBACK_DEADLINE
    COMMAND_SEND = 0.030  # [seconds]
    CYCLE_FINISH = 0.039  # [seconds]
    CYCLE_CHECK_PRIO = 0

    # Overall duration of one calculation and communication cycle
    CYCLE_PERIOD = 0.040  # [seconds]

    def __init__(self):
        # Absolute time.monotonic() value at which the current cycle starts.
        self.start_time = 0.0
        # Maps (time offset, priority) -> callable taking no arguments.
        self.tasks = {}
        # Scheduler events queued for the current cycle, kept for cancelling.
        self.queued = []
        # Guards _expected_feedback and wakes up check_feedback().
        self._feedback_condition = threading.Condition()
        # Maps feedback priority -> True once it was received in this cycle.
        self._expected_feedback = {}
        self.schedule = sched.scheduler()
        self.worker_thread = None
        # Optional callable, invoked with the exception that aborted the cycle.
        self.terminate_cb = None

    def add_task(self, t, prio, task, override=False):
        """Register a callable to run in every cycle.

        :param t: Time offset from the cycle start, in seconds.
        :param prio: Scheduler priority among tasks at the same offset.
        :param task: Callable invoked without arguments.
        :param override: Replace an existing task with the same key.  If
            false and the key is taken, a warning is logged and the existing
            task is kept.
        """
        if not override and (t, prio) in self.tasks:
            logger.warning('Not overriding cyclic task at %f prio %d', t, prio)
            return
        logger.debug('Cyclic task at %f prio %d', t, prio)
        self.tasks[t, prio] = task

    class EndTaskException(Exception):
        """Exception instance usable as a scheduler action to end the worker."""

        def __call__(self):
            # Raising from within the scheduler makes schedule.run() return
            # control to the worker loop, which then terminates.
            raise self

    def start(self, thread_name: str = None):
        """Install the housekeeping tasks and start the background thread.

        If a previous worker thread exists, it is joined first.

        :param thread_name: Name for the worker thread, defaults to the
            module name.
        """
        self.add_task(self.SYNC, self.SYNC_RESET_PRIO,
                      self.reset_feedback, override=True)
        self.add_task(self.FEEDBACK_AWAIT, self.FEEDBACK_CHECK_PRIO,
                      self.check_feedback, override=True)
        self.add_task(self.CYCLE_FINISH, self.CYCLE_CHECK_PRIO,
                      self.check_finished, override=True)

        # Tasks will start after the first complete period
        self.start_time = time.monotonic() + self.CYCLE_PERIOD

        def run():
            logger.info('Cyclic thread started')
            while True:
                try:
                    self.schedule.run()
                except self.EndTaskException:
                    break
                except Exception as e:
                    logger.exception('Aborting communication cycle')
                    if self.terminate_cb is not None:
                        self.terminate_cb(e)
                    break
            self.clear_schedule()
            logger.info('Cyclic thread ending')

        if self.worker_thread is not None:
            self.worker_thread.join()

        # Queue the first cycle BEFORE the worker starts: schedule.run()
        # returns immediately on an empty queue, so starting the thread first
        # would make the loop above spin at full CPU load until events exist.
        self.queue_cycle()
        self.worker_thread = threading.Thread(
            target=run, name=thread_name or __name__,
            daemon=True)
        self.worker_thread.start()

    def stop(self, delay: int = 0):
        """Abort the scheduled tasks and let the background thread end.

        Does nothing if no worker thread is currently running.

        :param delay: Optional number of cycles to keep running for.
        """
        worker = self.worker_thread
        # Skip a worker that already ended (e.g. after a task raised): the
        # terminating event would never be consumed and would instantly end
        # the next worker created by a later start().
        if worker is not None and worker.is_alive():
            delay *= self.CYCLE_PERIOD
            # Queue an invalid object to terminate the worker thread
            self.schedule.enter(delay, 0, self.EndTaskException())

    def join(self):
        """Wait for the worker thread to end, then forget about it."""
        if self.worker_thread is not None:
            self.worker_thread.join()
        self.worker_thread = None

    @property
    def now(self):
        """Seconds elapsed since the start of the current cycle.

        Negative while the cycle start still lies in the future.
        """
        return time.monotonic() - self.start_time

    def check_feedback(self):
        """Wait until FEEDBACK_DEADLINE for all expected feedback.

        If feedback is still incomplete at the deadline, the remaining tasks
        of this cycle are cancelled and the next cycle is queued.
        """
        logger.debug('Checking feedback at %f', self.now)
        with self._feedback_condition:
            timeout = max(self.FEEDBACK_DEADLINE - self.now, 0)
            completed = self._feedback_condition.wait_for(
                self.feedback_complete, timeout)
        logger.debug('Feedback completed at %f: %s', self.now, completed)
        if not completed:  # FIXME (kept from the original, intent not stated)
            # Discard all further tasks and prepare next cycle
            self.clear_schedule()
            self.queue_cycle()

    def check_finished(self):
        """Log the cycle duration and queue the next cycle."""
        elapsed = self.now
        if elapsed >= self.CYCLE_PERIOD:
            # Cycle is running late, missed next starting point
            logger.debug('Elapsed time overrun by %f s',
                         elapsed - self.CYCLE_PERIOD)
        else:
            logger.debug('Cycle finished in %f s', elapsed)
        self.clear_schedule()
        self.queue_cycle()

    def clear_schedule(self):
        """Cancel every event that is still queued for the current cycle."""
        if not self.schedule.empty():
            logger.debug('Clearing schedule at %f', time.monotonic())
        for event in self.queued:
            try:
                self.schedule.cancel(event)
            except ValueError:
                # Event already ran or was cancelled; nothing left to do.
                pass
        self.queued.clear()

    def queue_cycle(self):
        """Advance the start time by whole periods and queue all tasks."""
        delay_cycles = math.floor(self.now / self.CYCLE_PERIOD)
        if delay_cycles > 0:
            logger.debug('Skipping %d cycles to compensate overrun', delay_cycles)
        # If the cycle was already overrun, advance start time to next sensible value
        self.start_time += (1 + delay_cycles) * self.CYCLE_PERIOD
        # Iterate over a snapshot: add_task() may be called from another
        # thread meanwhile, and growing a dict during iteration raises
        # RuntimeError, which would abort the whole communication cycle.
        for (t, prio), task in list(self.tasks.items()):
            event = self.schedule.enterabs(
                self.start_time + t, prio, task)
            self.queued.append(event)

    def expect_feedback(self, prio):
        """Require feedback identified by ``prio`` in every cycle."""
        t = self.FEEDBACK_AWAIT
        logger.debug('Feedback expected at %f prio %d', t, prio)
        # Take the lock: reset_feedback() iterates this dict on the worker
        # thread and must not see it change size.
        with self._feedback_condition:
            self._expected_feedback[prio] = False

    def got_feedback(self, prio, *args):
        """Mark feedback ``prio`` as received and wake up check_feedback().

        Extra positional arguments are accepted and ignored.
        """
        t = self.FEEDBACK_AWAIT
        # Positive when the feedback arrived before FEEDBACK_AWAIT.
        delay = t - self.now
        logger.debug('Feedback received with delay %f prio %d', delay, prio)
        with self._feedback_condition:
            self._expected_feedback[prio] = True
            self._feedback_condition.notify_all()

    def feedback_complete(self):
        """Return True if all expected feedback was received (or none is expected)."""
        return all(self._expected_feedback.values())

    def reset_feedback(self, done=False):
        """Set the received flag of every expected feedback to ``bool(done)``."""
        # The lock keeps got_feedback() / expect_feedback() calls from other
        # threads from interleaving with this loop.
        with self._feedback_condition:
            for i in self._expected_feedback:
                self._expected_feedback[i] = bool(done)

    def add_command(self, prio, task):
        """Register ``task`` at the COMMAND_SEND offset with priority ``prio``."""
        t = self.COMMAND_SEND
        self.add_task(t, prio, task)

    def add_calculation(self, task, prio: int = None):
        """Register ``task`` at the CALCULATE offset.

        :param task: Callable invoked without arguments.
        :param prio: Scheduler priority.  If omitted, one more than the
            highest priority already registered at CALCULATE, or 1 if there
            is none.
        """
        if prio is None:
            # tuple() takes a snapshot of the keys, see queue_cycle().
            prio = 1 + max(
                (p for (t, p) in tuple(self.tasks) if t == self.CALCULATE),
                default=0)
        self.add_task(self.CALCULATE, prio, task)
if __name__ == '__main__':
    # Demonstration: run the cycle with simulated feedback until interrupted,
    # then provoke an exception inside a cyclic task and shut down.
    logging.basicConfig(level=logging.DEBUG)
    cc = CommCycle()

    def test_feedback():
        """Simulate two remotes answering 5 ms and 10 ms after SYNC."""
        logger.debug('SYNC')
        time.sleep(0.005)
        logger.debug('FB1')
        cc.got_feedback(10)
        time.sleep(0.005)
        logger.debug('FB2')
        cc.got_feedback(20)

    def test_exception():
        """Raise from within a cyclic task to exercise the abort path."""
        raise RuntimeError('Test')

    # Must not use priority 0 here: start() installs reset_feedback at
    # (SYNC, SYNC_RESET_PRIO) with override=True, which would silently replace
    # this task so that no feedback is ever simulated.
    cc.add_task(CommCycle.SYNC, CommCycle.SYNC_SEND_PRIO, test_feedback)
    cc.expect_feedback(10)
    cc.expect_feedback(20)
    cc.add_command(10, functools.partial(logger.debug, 'CMD'))
    cc.terminate_cb = print
    cc.start()

    print('Interrupt to quit...')
    while True:
        try:
            time.sleep(3)
        except KeyboardInterrupt:
            break

    print('Provoking exception')
    # Next free priority at SYNC; an occupied key would be rejected by
    # add_task() because override is not requested.
    cc.add_task(CommCycle.SYNC, CommCycle.SYNC_SEND_PRIO + 1, test_exception)
    time.sleep(2 * CommCycle.CYCLE_PERIOD)
    cc.stop()
    cc.join()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment