Skip to content

Instantly share code, notes, and snippets.

@acolomb
Created January 15, 2022 21:49
Show Gist options
  • Select an option

  • Save acolomb/a583b05b3389cb57a406bc979e2f4f3e to your computer and use it in GitHub Desktop.

Select an option

Save acolomb/a583b05b3389cb57a406bc979e2f4f3e to your computer and use it in GitHub Desktop.
Synchronized CANopen communication cycle for multiple remotes with deterministic, drift-free timing
import time
import sched
import logging
import threading
import functools
import math
logger = logging.getLogger(__name__)
class CommCycle:
"""Deterministic timing for cyclic communication and calculations."""
SYNC = 0.000 # [seconds]
SYNC_RESET_PRIO = 0
SYNC_SEND_PRIO = 1
FEEDBACK_AWAIT = 0.020 # [seconds]
FEEDBACK_CHECK_PRIO = 0
FEEDBACK_DEADLINE = 0.022 # [seconds]
CALCULATE = FEEDBACK_DEADLINE
COMMAND_SEND = 0.030 # [seconds]
CYCLE_FINISH = 0.039 # [seconds]
CYCLE_CHECK_PRIO = 0
# Overall duration of one calculation and communication cycle
CYCLE_PERIOD = 0.040 # [seconds]
def __init__(self):
self.start_time = 0.0
self.tasks = {}
self.queued = []
self._feedback_condition = threading.Condition()
self._expected_feedback = {}
self.schedule = sched.scheduler()
self.worker_thread = None
self.terminate_cb = None
def add_task(self, t, prio, task, override=False):
if not override and (t, prio) in self.tasks:
logger.warning('Not overriding cyclic task at %f prio %d', t, prio)
return
logger.debug('Cyclic task at %f prio %d', t, prio)
self.tasks[t, prio] = task
class EndTaskException(Exception):
def __call__(self):
raise self
def start(self, thread_name: str = None):
self.add_task(self.SYNC, self.SYNC_RESET_PRIO,
self.reset_feedback, override=True)
self.add_task(self.FEEDBACK_AWAIT, self.FEEDBACK_CHECK_PRIO,
self.check_feedback, override=True)
self.add_task(self.CYCLE_FINISH, self.CYCLE_CHECK_PRIO,
self.check_finished, override=True)
# Tasks will start after the first complete period
self.start_time = time.monotonic() + self.CYCLE_PERIOD
def run():
logger.info('Cyclic thread started')
while True:
try:
self.schedule.run()
except self.EndTaskException:
break
except Exception as e:
logger.exception('Aborting communication cycle')
if self.terminate_cb is not None:
self.terminate_cb(e)
break
self.clear_schedule()
logger.info('Cyclic thread ending')
if self.worker_thread is not None:
self.worker_thread.join()
self.worker_thread = threading.Thread(
target=run, name=thread_name or __name__,
daemon=True)
self.worker_thread.start()
self.queue_cycle()
def stop(self, delay: int = 0):
"""Abort the scheduled tasks and let the background thread end.
:param delay: Optional number of cycles to keep running for.
"""
if self.worker_thread is not None:
delay *= self.CYCLE_PERIOD
# Queue an invalid object to terminate the worker thread
self.schedule.enter(delay, 0, self.EndTaskException())
def join(self):
if self.worker_thread is not None:
self.worker_thread.join()
self.worker_thread = None
@property
def now(self):
return time.monotonic() - self.start_time
def check_feedback(self):
logger.debug('Checking feedback at %f', self.now)
with self._feedback_condition:
timeout = max(self.FEEDBACK_DEADLINE - self.now, 0)
completed = self._feedback_condition.wait_for(
self.feedback_complete, timeout)
logger.debug('Feedback completed at %f: %s', self.now, completed)
if not completed: # FIXME
# Discard all further tasks and prepare next cycle
self.clear_schedule()
self.queue_cycle()
def check_finished(self):
elapsed = self.now
if elapsed >= self.CYCLE_PERIOD:
# Cycle is running late, missed next starting point
logger.debug('Elapsed time overrun by %f s',
elapsed - self.CYCLE_PERIOD)
else:
logger.debug('Cycle finished in %f s', elapsed)
self.clear_schedule()
self.queue_cycle()
def clear_schedule(self):
if not self.schedule.empty():
logger.debug('Clearing schedule at %f', time.monotonic())
for event in self.queued:
try:
self.schedule.cancel(event)
except ValueError:
pass
self.queued.clear()
def queue_cycle(self):
delay_cycles = math.floor(self.now / self.CYCLE_PERIOD)
if delay_cycles > 0:
logger.debug('Skipping %d cycles to compensate overrun', delay_cycles)
# If the cycle was already overrun, advance start time to next sensible value
self.start_time += (1 + delay_cycles) * self.CYCLE_PERIOD
for (t, prio), task in self.tasks.items():
event = self.schedule.enterabs(
self.start_time + t, prio, task)
self.queued.append(event)
def expect_feedback(self, prio):
t = self.FEEDBACK_AWAIT
logger.debug('Feedback expected at %f prio %d', t, prio)
self._expected_feedback[prio] = False
def got_feedback(self, prio, *args):
t = self.FEEDBACK_AWAIT
delay = t - self.now
logger.debug('Feedback received with delay %f prio %d', delay, prio)
with self._feedback_condition:
self._expected_feedback[prio] = True
self._feedback_condition.notify_all()
def feedback_complete(self):
return all(self._expected_feedback.values())
def reset_feedback(self, done=False):
with self._feedback_condition: # FIXME needed?
for i in self._expected_feedback:
self._expected_feedback[i] = bool(done)
def add_command(self, prio, task):
t = self.COMMAND_SEND
self.add_task(t, prio, task)
def add_calculation(self, task, prio: int = None):
if prio is None:
try:
prio = 1 + max(prio for (t, prio) in self.tasks if t == self.CALCULATE)
except ValueError:
prio = 1
self.add_task(self.CALCULATE, prio, task)
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG)
cc = CommCycle()
def test_feedback():
logger.debug('SYNC')
time.sleep(0.005)
logger.debug('FB1')
cc.got_feedback(10)
time.sleep(0.005)
logger.debug('FB2')
cc.got_feedback(20)
def test_exception():
raise RuntimeError('Test')
cc.add_task(CommCycle.SYNC, 0, test_feedback)
cc.expect_feedback(10)
cc.expect_feedback(20)
cc.add_command(10, functools.partial(logger.debug, 'CMD'))
cc.terminate_cb = print
cc.start()
print('Interrupt to quit...')
while True:
try:
time.sleep(3)
except KeyboardInterrupt:
break
print('Provoking exception')
cc.add_task(CommCycle.SYNC, 1, test_exception)
time.sleep(2 * CommCycle.CYCLE_PERIOD)
cc.stop()
cc.join()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment