# Copyright 2023 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from typing import TYPE_CHECKING, Any, Dict, List, Optional from synapse.storage._base import SQLBaseStore, db_to_json from synapse.storage.database import ( DatabasePool, LoggingDatabaseConnection, LoggingTransaction, make_in_list_sql_clause, ) from synapse.types import JsonDict, JsonMapping, ScheduledTask, TaskStatus from synapse.util import json_encoder if TYPE_CHECKING: from synapse.server import HomeServer class TaskSchedulerWorkerStore(SQLBaseStore): def __init__( self, database: DatabasePool, db_conn: LoggingDatabaseConnection, hs: "HomeServer", ): super().__init__(database, db_conn, hs) @staticmethod def _convert_row_to_task(row: Dict[str, Any]) -> ScheduledTask: row["status"] = TaskStatus(row["status"]) if row["params"] is not None: row["params"] = db_to_json(row["params"]) if row["result"] is not None: row["result"] = db_to_json(row["result"]) return ScheduledTask(**row) async def get_scheduled_tasks( self, *, actions: Optional[List[str]] = None, resource_id: Optional[str] = None, statuses: Optional[List[TaskStatus]] = None, max_timestamp: Optional[int] = None, ) -> List[ScheduledTask]: """Get a list of scheduled tasks from the DB. Args: actions: Limit the returned tasks to those specific action names resource_id: Limit the returned tasks to the specific resource id, if specified statuses: Limit the returned tasks to the specific statuses max_timestamp: Limit the returned tasks to the ones that have a timestamp inferior to the specified one Returns: a list of `ScheduledTask`, ordered by increasing timestamps """ def get_scheduled_tasks_txn(txn: LoggingTransaction) -> List[Dict[str, Any]]: clauses: List[str] = [] args: List[Any] = [] if resource_id: clauses.append("resource_id = ?") args.append(resource_id) if actions is not None: clause, temp_args = make_in_list_sql_clause( txn.database_engine, "action", actions ) clauses.append(clause) args.extend(temp_args) if statuses is not None: clause, temp_args = make_in_list_sql_clause( txn.database_engine, "status", statuses ) clauses.append(clause) args.extend(temp_args) if max_timestamp is not None: clauses.append("timestamp <= ?") args.append(max_timestamp) sql = "SELECT * FROM scheduled_tasks" if clauses: sql = sql + " WHERE " + " AND ".join(clauses) sql = sql + " ORDER BY timestamp" txn.execute(sql, args) return self.db_pool.cursor_to_dict(txn) rows = await self.db_pool.runInteraction( "get_scheduled_tasks", get_scheduled_tasks_txn ) return [TaskSchedulerWorkerStore._convert_row_to_task(row) for row in rows] async def insert_scheduled_task(self, task: ScheduledTask) -> None: """Insert a specified `ScheduledTask` in the DB. Args: task: the `ScheduledTask` to insert """ await self.db_pool.simple_insert( "scheduled_tasks", { "id": task.id, "action": task.action, "status": task.status, "timestamp": task.timestamp, "resource_id": task.resource_id, "params": None if task.params is None else json_encoder.encode(task.params), "result": None if task.result is None else json_encoder.encode(task.result), "error": task.error, }, desc="insert_scheduled_task", ) async def update_scheduled_task( self, id: str, timestamp: int, *, status: Optional[TaskStatus] = None, result: Optional[JsonMapping] = None, error: Optional[str] = None, ) -> bool: """Update a scheduled task in the DB with some new value(s). Args: id: id of the `ScheduledTask` to update timestamp: new timestamp of the task status: new status of the task result: new result of the task error: new error of the task Returns: `False` if no matching row was found, `True` otherwise """ updatevalues: JsonDict = {"timestamp": timestamp} if status is not None: updatevalues["status"] = status if result is not None: updatevalues["result"] = json_encoder.encode(result) if error is not None: updatevalues["error"] = error nb_rows = await self.db_pool.simple_update( "scheduled_tasks", {"id": id}, updatevalues, desc="update_scheduled_task", ) return nb_rows > 0 async def get_scheduled_task(self, id: str) -> Optional[ScheduledTask]: """Get a specific `ScheduledTask` from its id. Args: id: the id of the task to retrieve Returns: the task if available, `None` otherwise """ row = await self.db_pool.simple_select_one( table="scheduled_tasks", keyvalues={"id": id}, retcols=( "id", "action", "status", "timestamp", "resource_id", "params", "result", "error", ), allow_none=True, desc="get_scheduled_task", ) return TaskSchedulerWorkerStore._convert_row_to_task(row) if row else None async def delete_scheduled_task(self, id: str) -> None: """Delete a specific task from its id. Args: id: the id of the task to delete """ await self.db_pool.simple_delete( "scheduled_tasks", keyvalues={"id": id}, desc="delete_scheduled_task", )