Skip to content

Instantly share code, notes, and snippets.

@glowka
Created January 14, 2017 00:32
Show Gist options
  • Select an option

  • Save glowka/a3936877cb754d30e3ff753e1b94d67b to your computer and use it in GitHub Desktop.

Select an option

Save glowka/a3936877cb754d30e3ff753e1b94d67b to your computer and use it in GitHub Desktop.
from __future__ import absolute_import
from datetime import timedelta
from celery.schedules import schedule, schedstate
from celery.utils.timeutils import timedelta_seconds, make_aware, is_naive
class schedulesince(schedule):
"""
Basic celery schedule with additional `since` parameter used for calculating
time of execution at exact periods from `since`.
Without that param starting moment is depending on not fully controlled events,
such as last execution of that task or time of worker start.
Use:
schedulesince(run_every=timedelta(days=5), since=datetime(2015,1, 1))
will be called on 1. 6. 11. of jan regardless of worker start time
"""
def __init__(self, *args, **kwargs):
self._since = kwargs.pop('since', None)
super(schedulesince, self).__init__(*args, **kwargs)
def _get_since(self):
if self._since is not None:
return self.maybe_set_tz(self._since)
return None
def _set_since(self, val):
self._since = val
since = property(_get_since, _set_since)
def maybe_set_tz(self, dt):
if is_naive(dt):
return make_aware(dt, self.tz)
return dt
def is_due(self, last_run_at):
if self.since is None: # no since arg? no due, and re-run in 100s
return schedstate(is_due=False, next=100.0)
last_period_since_mom = self.last_since_for_dt(last_run_at)
remaining_delta = self.remaining_estimate(last_period_since_mom)
remaining_seconds = timedelta_seconds(remaining_delta)
if remaining_seconds == 0:
next_period_since_mom = self.last_since_for_dt(self.now()) + timedelta(seconds=self.seconds)
next_period_since_mom_seconds = timedelta_seconds(next_period_since_mom - self.maybe_make_aware(self.now()))
return schedstate(is_due=True, next=next_period_since_mom_seconds)
return schedstate(is_due=False, next=remaining_seconds)
def last_since_for_dt(self, dt):
since = self.maybe_make_aware(self.since)
aware_dt = self.maybe_make_aware(dt)
if aware_dt > since:
seconds_since = timedelta_seconds(aware_dt - since)
return since + timedelta(seconds=int(seconds_since / self.seconds) * self.seconds)
else:
return since - timedelta(seconds=self.seconds)
def __eq__(self, other):
return super(schedulesince, self).__eq__(other) and self.since == other.since
def __reduce__(self):
# note: use '_since' when serializing, pickle uses __dict__ (or other similar method)
# for that case, so property setter does not do the job
return super(schedulesince, self).__reduce__() + ({'_since': self._since},)
def __repr__(self):
return '<freq: {0.human_seconds}, since date {0.since}>'.format(self)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment