Created
January 14, 2017 00:32
-
-
Save glowka/a3936877cb754d30e3ff753e1b94d67b to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from __future__ import absolute_import | |
| from datetime import timedelta | |
| from celery.schedules import schedule, schedstate | |
| from celery.utils.timeutils import timedelta_seconds, make_aware, is_naive | |
| class schedulesince(schedule): | |
| """ | |
| Basic celery schedule with additional `since` parameter used for calculating | |
| time of execution at exact periods from `since`. | |
| Without that param starting moment is depending on not fully controlled events, | |
| such as last execution of that task or time of worker start. | |
| Use: | |
| schedulesince(run_every=timedelta(days=5), since=datetime(2015,1, 1)) | |
| will be called on 1. 6. 11. of jan regardless of worker start time | |
| """ | |
| def __init__(self, *args, **kwargs): | |
| self._since = kwargs.pop('since', None) | |
| super(schedulesince, self).__init__(*args, **kwargs) | |
| def _get_since(self): | |
| if self._since is not None: | |
| return self.maybe_set_tz(self._since) | |
| return None | |
| def _set_since(self, val): | |
| self._since = val | |
| since = property(_get_since, _set_since) | |
| def maybe_set_tz(self, dt): | |
| if is_naive(dt): | |
| return make_aware(dt, self.tz) | |
| return dt | |
| def is_due(self, last_run_at): | |
| if self.since is None: # no since arg? no due, and re-run in 100s | |
| return schedstate(is_due=False, next=100.0) | |
| last_period_since_mom = self.last_since_for_dt(last_run_at) | |
| remaining_delta = self.remaining_estimate(last_period_since_mom) | |
| remaining_seconds = timedelta_seconds(remaining_delta) | |
| if remaining_seconds == 0: | |
| next_period_since_mom = self.last_since_for_dt(self.now()) + timedelta(seconds=self.seconds) | |
| next_period_since_mom_seconds = timedelta_seconds(next_period_since_mom - self.maybe_make_aware(self.now())) | |
| return schedstate(is_due=True, next=next_period_since_mom_seconds) | |
| return schedstate(is_due=False, next=remaining_seconds) | |
| def last_since_for_dt(self, dt): | |
| since = self.maybe_make_aware(self.since) | |
| aware_dt = self.maybe_make_aware(dt) | |
| if aware_dt > since: | |
| seconds_since = timedelta_seconds(aware_dt - since) | |
| return since + timedelta(seconds=int(seconds_since / self.seconds) * self.seconds) | |
| else: | |
| return since - timedelta(seconds=self.seconds) | |
| def __eq__(self, other): | |
| return super(schedulesince, self).__eq__(other) and self.since == other.since | |
| def __reduce__(self): | |
| # note: use '_since' when serializing, pickle uses __dict__ (or other similar method) | |
| # for that case, so property setter does not do the job | |
| return super(schedulesince, self).__reduce__() + ({'_since': self._since},) | |
| def __repr__(self): | |
| return '<freq: {0.human_seconds}, since date {0.since}>'.format(self) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment