Skip to content

Instantly share code, notes, and snippets.

@shyan1
Forked from mplewis/safe_schedule.py
Created September 18, 2023 05:12
Show Gist options
  • Select an option

  • Save shyan1/e06038332b0cb8381297019da1482f70 to your computer and use it in GitHub Desktop.

Select an option

Save shyan1/e06038332b0cb8381297019da1482f70 to your computer and use it in GitHub Desktop.

Revisions

  1. @mplewis mplewis revised this gist Jul 28, 2014. 1 changed file with 38 additions and 0 deletions.
    38 changes: 38 additions & 0 deletions test_safe_schedule.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,38 @@
    #!/usr/bin/env python3

    import time
    from safe_schedule import SafeScheduler


    def good_task_1():
    print('Good Task 1')


    def good_task_2():
    print('Good Task 2')


    def good_task_3():
    print('Good Task 3')


    def bad_task_1():
    print('Bad Task 1')
    print(1/0)


    def bad_task_2():
    print('Bad Task 2')
    raise Exception('Something went wrong!')


    scheduler = SafeScheduler()
    scheduler.every(3).seconds.do(good_task_1)
    scheduler.every(5).seconds.do(bad_task_1)
    scheduler.every(7).seconds.do(good_task_2)
    scheduler.every(8).seconds.do(bad_task_2)
    scheduler.every(12).seconds.do(good_task_3)

    while True:
    scheduler.run_pending()
    time.sleep(1)
  2. @mplewis mplewis created this gist Jul 28, 2014.
    36 changes: 36 additions & 0 deletions safe_schedule.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,36 @@
    import logging
    from traceback import format_exc
    import datetime

    from schedule import Scheduler


    logger = logging.getLogger('schedule')


    class SafeScheduler(Scheduler):
    """
    An implementation of Scheduler that catches jobs that fail, logs their
    exception tracebacks as errors, optionally reschedules the jobs for their
    next run time, and keeps going.
    Use this to run jobs that may or may not crash without worrying about
    whether other jobs will run or if they'll crash the entire script.
    """

    def __init__(self, reschedule_on_failure=True):
    """
    If reschedule_on_failure is True, jobs will be rescheduled for their
    next run as if they had completed successfully. If False, they'll run
    on the next run_pending() tick.
    """
    self.reschedule_on_failure = reschedule_on_failure
    super().__init__()

    def _run_job(self, job):
    try:
    super()._run_job(job)
    except Exception:
    logger.error(format_exc())
    job.last_run = datetime.datetime.now()
    job._schedule_next_run()