Skip to content

lms_public.schedule_tasks

schedule_periodic_tasks(user_id) async

Schedule 2 periodic tasks: 1. Update courses every Thursday at midnight 2. Check messages every 5 minutes

Parameters:

Name Type Description Default
user_id int

The id of the LMSUser instance

required
Source code in src/lms_public/schedule_tasks.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
async def schedule_periodic_tasks(user_id: int):
    """Schedule 2 periodic tasks:
    1. Update courses every Thursday at midnight
    2. Check messages every 5 minutes

    Args:
        user_id (int): The id of the LMSUser instance
    """

    # Task 1: Every Thursday at midnight
    thursday_schedule, _ = await CrontabSchedule.objects.aget_or_create(
        minute="0",
        hour="0",
        day_of_week="4",  # Thursday
        day_of_month="*",
        month_of_year="*",
    )
    await PeriodicTask.objects.aupdate_or_create(
        name=f"Update courses for user {user_id}",
        task="lms_public.tasks.update_user_courses_task",
        defaults={
            "crontab": thursday_schedule,
            "args": json.dumps([user_id]),
        },
    )

    # Task 2: Every 5 minutes (or user-defined interval)
    user_pref, _ = await UserNotificationPreference.objects.aget_or_create(
        user_id=user_id,
        defaults={"public_lms_interval_minutes": 5},
    )
    interval_schedule, _ = await IntervalSchedule.objects.aget_or_create(
        every=user_pref.public_lms_interval_minutes,
        period=IntervalSchedule.MINUTES,
    )
    await PeriodicTask.objects.aupdate_or_create(
        name=f"Check messages for user {user_id}",
        task="lms_public.tasks.check_new_messages_task",
        defaults={
            "interval": interval_schedule,
            "args": json.dumps([user_id]),
        },
    )