Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
fix:tests
Browse files Browse the repository at this point in the history
cunla committed Oct 15, 2024
1 parent 8d2bc63 commit cf85c2f
Showing 1 changed file with 20 additions and 19 deletions.
39 changes: 20 additions & 19 deletions scheduler/tests/test_mgmt_cmds.py
Original file line number Diff line number Diff line change
@@ -233,26 +233,28 @@ def test_import__should_schedule_job(self):
call_command("import", filename=self.tmpfile.name)
# assert
self.assertEqual(1, Task.objects.filter(task_type=TaskType.ONCE).count())
self.assertEqual(1, Task.objects.filter(task_type=TaskType.REPEATABLE).count())
db_job = Task.objects.filter(task_type=TaskType.ONCE).first()
attrs = ["name", "queue", "callable", "enabled", "timeout"]
for attr in attrs:
self.assertEqual(getattr(jobs[0], attr), getattr(db_job, attr))

def test_import__should_schedule_job_yaml(self):
jobs = list()
jobs.append(task_factory(ScheduledTask, enabled=True, instance_only=True))
jobs.append(task_factory(RepeatableTask, enabled=True, instance_only=True))
res = yaml.dump([j.to_dict() for j in jobs], default_flow_style=False)
tasks = list()
tasks.append(task_factory(ScheduledTask, enabled=True, instance_only=True))
tasks.append(task_factory(RepeatableTask, enabled=True, instance_only=True))
res = yaml.dump([j.to_dict() for j in tasks], default_flow_style=False)
self.tmpfile.write(res)
self.tmpfile.flush()
# act
call_command("import", filename=self.tmpfile.name, format="yaml")
# assert
self.assertEqual(1, Task.objects.filter(task_type=TaskType.ONCE).count())
db_job = Task.objects.filter(task_type=TaskType.ONCE).objects.first()
self.assertEqual(1, Task.objects.filter(task_type=TaskType.REPEATABLE).count())
db_job = Task.objects.filter(task_type=TaskType.ONCE).first()
attrs = ["name", "queue", "callable", "enabled", "timeout"]
for attr in attrs:
self.assertEqual(getattr(jobs[0], attr), getattr(db_job, attr))
self.assertEqual(getattr(tasks[0], attr), getattr(db_job, attr))

def test_import__should_schedule_job_yaml_without_yaml_lib(self):
jobs = list()
@@ -295,10 +297,10 @@ def test_import__should_schedule_job_reset(self):
self.assertEqual(getattr(jobs[1], attr), getattr(db_job, attr))

def test_import__should_schedule_job_update_existing(self):
jobs = list()
task_factory(ScheduledTask, enabled=True)
jobs.append(task_factory(ScheduledTask, enabled=True))
res = json.dumps([j.to_dict() for j in jobs])
tasks = list()
tasks.append(task_factory(ScheduledTask, enabled=True))
tasks.append(task_factory(ScheduledTask, enabled=True))
res = json.dumps([j.to_dict() for j in tasks])
self.tmpfile.write(res)
self.tmpfile.flush()
# act
@@ -309,17 +311,16 @@ def test_import__should_schedule_job_update_existing(self):
)
# assert
self.assertEqual(2, Task.objects.filter(task_type=TaskType.ONCE).count())
db_job = Task.objects.filter(task_type=TaskType.ONCE).get(name=jobs[0].name)
self.assertNotEqual(jobs[0].id, db_job.id)
db_job = Task.objects.filter(task_type=TaskType.ONCE).get(name=tasks[0].name)
attrs = ["name", "queue", "callable", "enabled", "timeout"]
for attr in attrs:
self.assertEqual(getattr(jobs[0], attr), getattr(db_job, attr))
self.assertEqual(getattr(tasks[0], attr), getattr(db_job, attr))

def test_import__should_schedule_job_without_update_existing(self):
jobs = list()
task_factory(ScheduledTask, enabled=True)
jobs.append(task_factory(ScheduledTask, enabled=True))
res = json.dumps([j.to_dict() for j in jobs])
tasks = list()
tasks.append(task_factory(ScheduledTask, enabled=True))
tasks.append(task_factory(ScheduledTask, enabled=True))
res = json.dumps([j.to_dict() for j in tasks])
self.tmpfile.write(res)
self.tmpfile.flush()
# act
@@ -329,7 +330,7 @@ def test_import__should_schedule_job_without_update_existing(self):
)
# assert
self.assertEqual(2, Task.objects.filter(task_type=TaskType.ONCE).count())
db_job = Task.objects.get(name=jobs[0].name)
db_job = Task.objects.get(name=tasks[0].name)
attrs = ["id", "name", "queue", "callable", "enabled", "timeout"]
for attr in attrs:
self.assertEqual(getattr(jobs[0], attr), getattr(db_job, attr))
self.assertEqual(getattr(tasks[0], attr), getattr(db_job, attr))

0 comments on commit cf85c2f

Please sign in to comment.