Last active
November 6, 2017 19:37
-
-
Save igniteflow/7021978 to your computer and use it in GitHub Desktop.
Testing deferred tasks with Django Appengine
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from django.test import TestCase | |
| from google.appengine.ext import deferred | |
| from google.appengine.ext import testbed | |
| from foo.tasks import fake_task | |
class TasksTest(TestCase):
    """Example test that enqueues a deferred task and runs it in-process.

    The App Engine testbed task queue stub is installed for each test so
    that tasks added to the queue can be fetched and executed by the test
    itself.
    """

    def setUp(self):
        """Activate the testbed and install the task queue stub."""
        self.testbed = testbed.Testbed()
        self.testbed.activate()
        # Register deactivation immediately: unittest does not call
        # tearDown() when setUp() raises, but registered cleanups still run,
        # so a failure while initialising the stub below cannot leave the
        # testbed activated for subsequent tests.
        self.addCleanup(self.testbed.deactivate)
        # root_path is where queue.yaml lives
        self.testbed.init_taskqueue_stub(root_path='.')
        self.taskqueue_stub = self.testbed.get_stub(
            testbed.TASKQUEUE_SERVICE_NAME)

    def test_bar(self):
        """Enqueue via fake_task(), then run the queued task and check it."""
        # Call a function that triggers deferred.defer()
        fake_task()

        # Get the task out of the queue; exactly one is expected
        tasks = self.taskqueue_stub.get_filtered_tasks()
        self.assertEqual(1, len(tasks))

        # Run the task by handing its payload to the deferred library
        task = tasks[0]
        deferred.run(task.payload)

        # Assert that state changed as expected.
        # NOTE: `...` is a placeholder carried over from the example; as
        # written this line verifies nothing about the task. Replace it with
        # a real condition on the state fake_task's deferred work modifies.
        self.assertTrue(...)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Thanks!