From 4184280d4e13728b2aeb11867af79f0c6a98f678 Mon Sep 17 00:00:00 2001 From: Tulir Asokan Date: Thu, 7 Mar 2024 16:14:06 +0200 Subject: [PATCH] Add basic scheduler for plugins --- maubot/plugin_base.py | 5 ++ maubot/scheduler.py | 159 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 164 insertions(+) create mode 100644 maubot/scheduler.py diff --git a/maubot/plugin_base.py b/maubot/plugin_base.py index d7b6bbd..5e967dc 100644 --- a/maubot/plugin_base.py +++ b/maubot/plugin_base.py @@ -27,6 +27,8 @@ from mautrix.util.async_db import Database, UpgradeTable from mautrix.util.config import BaseProxyConfig from mautrix.util.logging import TraceLogger +from .scheduler import BasicScheduler + if TYPE_CHECKING: from .client import MaubotMatrixClient from .loader import BasePluginLoader @@ -40,6 +42,7 @@ class Plugin(ABC): log: TraceLogger loop: AbstractEventLoop loader: BasePluginLoader + sched: BasicScheduler config: BaseProxyConfig | None database: Engine | Database | None webapp: PluginWebApp | None @@ -58,6 +61,7 @@ class Plugin(ABC): webapp_url: str | None, loader: BasePluginLoader, ) -> None: + self.sched = BasicScheduler(log=log.getChild("scheduler")) self.client = client self.loop = loop self.http = http @@ -117,6 +121,7 @@ class Plugin(ABC): self.client.remove_event_handler(event_type, func) if self.webapp is not None: self.webapp.clear() + self.sched.stop() await self.stop() async def stop(self) -> None: diff --git a/maubot/scheduler.py b/maubot/scheduler.py new file mode 100644 index 0000000..0cb39ed --- /dev/null +++ b/maubot/scheduler.py @@ -0,0 +1,159 @@ +# maubot - A plugin-based Matrix bot system. +# Copyright (C) 2024 Tulir Asokan +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . +from __future__ import annotations + +from typing import Awaitable, Callable +import asyncio +import logging + + +class BasicScheduler: + background_loop: asyncio.Task | None + tasks: set[asyncio.Task] + log: logging.Logger + + def __init__(self, log: logging.Logger) -> None: + self.log = log + self.tasks = set() + + def _find_caller(self) -> str: + try: + file_name, line_number, function_name, _ = self.log.findCaller() + return f"{function_name} at {file_name}:{line_number}" + except ValueError: + return "unknown function" + + def run_periodically( + self, + period: float | int, + func: Callable[[], Awaitable], + run_task_in_background: bool = False, + catch_errors: bool = True, + ) -> asyncio.Task: + """ + Run a function periodically in the background. + + Args: + period: The period in seconds between each call to the function. + func: The function to run. No parameters will be provided, + use :meth:`functools.partial` if you need to pass parameters. + run_task_in_background: If ``True``, the function will be run in a background task. + If ``False`` (the default), the loop will wait for the task to return before + sleeping for the next period. + catch_errors: Whether the scheduler should catch and log any errors. + If ``False``, errors will be raised, and the caller must await the returned task + to find errors. This parameter has no effect if ``run_task_in_background`` + is ``True``. + + Returns: + The asyncio task object representing the background loop. + """ + task = asyncio.create_task( + self._call_periodically( + period, + func, + caller=self._find_caller(), + catch_errors=catch_errors, + run_task_in_background=run_task_in_background, + ) + ) + self._register_task(task) + return task + + def run_later( + self, delay: float | int, coro: Awaitable, catch_errors: bool = True + ) -> asyncio.Task: + """ + Run a coroutine after a delay. + + Examples: + >>> self.sched.run_later(5, self.async_task(meow=True)) + + Args: + delay: The delay in seconds to await the coroutine after. + coro: The coroutine to await. + catch_errors: Whether the scheduler should catch and log any errors. + If ``False``, errors will be raised, and the caller must await the returned task + to find errors. + + Returns: + The asyncio task object representing the scheduled task. + """ + task = asyncio.create_task( + self._call_with_delay( + delay, coro, caller=self._find_caller(), catch_errors=catch_errors + ) + ) + self._register_task(task) + return task + + def _register_task(self, task: asyncio.Task) -> None: + self.tasks.add(task) + task.add_done_callback(self.tasks.discard) + + async def _call_periodically( + self, + period: float | int, + func: Callable[[], Awaitable], + caller: str, + catch_errors: bool, + run_task_in_background: bool, + ) -> None: + while True: + try: + await asyncio.sleep(period) + if run_task_in_background: + self._register_task( + asyncio.create_task(self._call_periodically_background(func(), caller)) + ) + else: + await func() + except asyncio.CancelledError: + raise + except Exception: + if catch_errors: + self.log.exception(f"Uncaught error in background loop (created in {caller})") + else: + raise + + async def _call_periodically_background(self, coro: Awaitable, caller: str) -> None: + try: + await coro + except asyncio.CancelledError: + raise + except Exception: + self.log.exception(f"Uncaught error in background loop subtask (created in {caller})") + + async def _call_with_delay( + self, delay: float | int, coro: Awaitable, caller: str, catch_errors: bool + ) -> None: + try: + await asyncio.sleep(delay) + await coro + except asyncio.CancelledError: + raise + except Exception: + if catch_errors: + self.log.exception(f"Uncaught error in scheduled task (created in {caller})") + else: + raise + + def stop(self) -> None: + """ + Stop all scheduled tasks and background loops. + """ + for task in self.tasks: + task.cancel(msg="Scheduler stopped")