Skip to content

Instantly share code, notes, and snippets.

@igniteflow
Last active November 6, 2017 19:37
Show Gist options
  • Select an option

  • Save igniteflow/7021978 to your computer and use it in GitHub Desktop.

Select an option

Save igniteflow/7021978 to your computer and use it in GitHub Desktop.
Testing deferred tasks with Django Appengine
from django.test import TestCase
from google.appengine.ext import deferred
from google.appengine.ext import testbed
from foo.tasks import fake_task
class TasksTest(TestCase):
    """Example test that runs a deferred task synchronously.

    Every test gets an activated App Engine testbed with a task queue stub,
    so tasks enqueued through ``deferred.defer()`` can be read back from the
    stub and executed inline with ``deferred.run()``.
    """

    def setUp(self):
        """Activate a testbed and initialise its task queue stub."""
        self.testbed = testbed.Testbed()
        self.testbed.activate()
        # Deactivate via a cleanup instead of tearDown(): unittest skips
        # tearDown() when setUp() raises, but registered cleanups still run,
        # so a failure while initialising the stub below cannot leave the
        # testbed activated for subsequent tests.
        self.addCleanup(self.testbed.deactivate)
        # root_path is where queue.yaml lives
        self.testbed.init_taskqueue_stub(root_path='.')
        self.taskqueue_stub = self.testbed.get_stub(
            testbed.TASKQUEUE_SERVICE_NAME)

    def test_bar(self):
        """Enqueue one deferred task via ``fake_task`` and run it inline."""
        # call a function that triggers deferred.defer()
        fake_task()

        # Get the task out of the queue
        tasks = self.taskqueue_stub.get_filtered_tasks()
        self.assertEqual(1, len(tasks))

        # Run the task
        task = tasks[0]
        deferred.run(task.payload)

        # Assert that state changed as expected
        # TODO: placeholder -- ``...`` is always truthy, so this passes
        # vacuously; replace it with a real assertion about the task's effect.
        self.assertTrue(...)
@egalpin
Copy link

egalpin commented Nov 6, 2017

Thanks!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment