Schedule 2 periodic tasks:
1. Update courses every Thursday at midnight
2. Check messages at the user's configured interval (every 5 minutes by default)
Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `user_id` | `int` | The id of the LMSUser instance | required |
Source code in src/lms_public/schedule_tasks.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
async def schedule_periodic_tasks(user_id: int):
    """Create or update the two periodic tasks that belong to one user.

    1. Update courses every Thursday at midnight.
    2. Check messages at the user's notification interval (5 minutes when
       no preference row exists yet).

    Calling this again for the same user updates the existing task entries
    in place, so it can be used to re-apply a changed interval.

    Args:
        user_id (int): The id of the LMSUser instance
    """
    # Both tasks receive the same positional arguments, serialized as JSON.
    task_args = json.dumps([user_id])

    # Task 1: Every Thursday at midnight
    thursday_schedule, _ = await CrontabSchedule.objects.aget_or_create(
        minute="0",
        hour="0",
        day_of_week="4",  # Thursday
        day_of_month="*",
        month_of_year="*",
    )
    # Look the entry up by name only and carry the task path in ``defaults``:
    # an entry that already has this name but points at a different task path
    # is then updated, instead of missing the lookup and attempting a second
    # insert under the same name.
    # NOTE(review): assumes PeriodicTask.name is unique, as in
    # django-celery-beat — confirm against the imported model.
    await PeriodicTask.objects.aupdate_or_create(
        name=f"Update courses for user {user_id}",
        defaults={
            "task": "lms_public.tasks.update_user_courses_task",
            "crontab": thursday_schedule,
            "args": task_args,
        },
    )

    # Task 2: Every 5 minutes (or user-defined interval)
    user_pref, _ = await UserNotificationPreference.objects.aget_or_create(
        user_id=user_id,
        defaults={"public_lms_interval_minutes": 5},
    )
    interval_schedule, _ = await IntervalSchedule.objects.aget_or_create(
        every=user_pref.public_lms_interval_minutes,
        period=IntervalSchedule.MINUTES,
    )
    await PeriodicTask.objects.aupdate_or_create(
        name=f"Check messages for user {user_id}",
        defaults={
            "task": "lms_public.tasks.check_new_messages_task",
            "interval": interval_schedule,
            "args": task_args,
        },
    )
|