From d3b535f07018d40721fe668b6bd793ab0ced2a29 Mon Sep 17 00:00:00 2001 From: Will Hunt Date: Tue, 22 Sep 2020 15:17:41 +0100 Subject: [PATCH 1/5] Add option to forget sessions when inserting a new session --- pantalaimon/client.py | 6 ++---- pantalaimon/config.py | 7 +++++-- pantalaimon/daemon.py | 1 + pantalaimon/store.py | 20 ++++++++++++++++++++ 4 files changed, 28 insertions(+), 6 deletions(-) diff --git a/pantalaimon/client.py b/pantalaimon/client.py index b2d2a72..7d9d7a1 100644 --- a/pantalaimon/client.py +++ b/pantalaimon/client.py @@ -18,7 +18,6 @@ from collections import defaultdict from pprint import pformat from typing import Any, Dict, Optional from urllib.parse import urlparse - from aiohttp.client_exceptions import ClientConnectionError from jsonschema import Draft4Validator, FormatChecker, validators from playhouse.sqliteq import SqliteQueueDatabase @@ -50,10 +49,9 @@ from nio import ( ) from nio.crypto import Sas from nio.store import SqliteStore - from pantalaimon.index import INDEXING_ENABLED from pantalaimon.log import logger -from pantalaimon.store import FetchTask, MediaInfo +from pantalaimon.store import FetchTask, MediaInfo, PanSqliteStore from pantalaimon.thread_messages import ( DaemonResponse, InviteSasSignal, @@ -162,7 +160,7 @@ class PanClient(AsyncClient): media_info=None, ): config = config or AsyncClientConfig( - store=store_class or SqliteStore, store_name="pan.db" + store=store_class or PanSqliteStore, store_name="pan.db" ) super().__init__(homeserver, user_id, device_id, store_path, config, ssl, proxy) diff --git a/pantalaimon/config.py b/pantalaimon/config.py index 2a01714..65dc62b 100644 --- a/pantalaimon/config.py +++ b/pantalaimon/config.py @@ -121,6 +121,8 @@ class ServerConfig: the room history. history_fetch_delay (int): The delay between room history fetching requests in seconds. + store_forgetful (bool): Enable or disable discarding of previous sessions + from the store. """ name = attr.ib(type=str) @@ -137,7 +139,7 @@ class ServerConfig: index_encrypted_only = attr.ib(type=bool, default=True) indexing_batch_size = attr.ib(type=int, default=100) history_fetch_delay = attr.ib(type=int, default=3) - + store_forgetful = attr.ib(type=bool, default=False) @attr.s class PanConfig: @@ -201,7 +203,7 @@ class PanConfig: proxy = section.geturl("Proxy") search_requests = section.getboolean("SearchRequests") index_encrypted_only = section.getboolean("IndexEncryptedOnly") - + store_forgetful = config["Default"].getboolean("StoreForgetful") indexing_batch_size = section.getint("IndexingBatchSize") if not 1 < indexing_batch_size <= 1000: @@ -243,6 +245,7 @@ class PanConfig: index_encrypted_only, indexing_batch_size, history_fetch_delay / 1000, + store_forgetful, ) self.servers[section_name] = server_conf diff --git a/pantalaimon/daemon.py b/pantalaimon/daemon.py index 9efee6b..720ca81 100755 --- a/pantalaimon/daemon.py +++ b/pantalaimon/daemon.py @@ -150,6 +150,7 @@ class ProxyDaemon: pan_client.user_id = user_id pan_client.access_token = token pan_client.load_store() + pan_client.store.forgetful = self.conf.store_forgetful self.pan_clients[user_id] = pan_client loop.create_task( diff --git a/pantalaimon/store.py b/pantalaimon/store.py index 910df64..4a82d06 100644 --- a/pantalaimon/store.py +++ b/pantalaimon/store.py @@ -25,6 +25,9 @@ from nio.store import ( DeviceTrustState, use_database, use_database_atomic, + SqliteStore, + MegolmInboundSessions, + ForwardedChains ) from peewee import SQL, DoesNotExist, ForeignKeyField, Model, SqliteDatabase, TextField from cachetools import LRUCache @@ -372,3 +375,20 @@ class PanStore: store[account.user_id] = device_store return store + +class PanSqliteStore(SqliteStore): + forgetful = False + + @use_database + def save_inbound_group_session(self, session): + """Save the provided Megolm inbound group session to the database. + Args: + session (InboundGroupSession): The session to save. + """ + # Delete previous sessions + if self.forgetful: + MegolmInboundSessions.delete().where( + (MegolmInboundSessions.sender_key == session.sender_key) | + (MegolmInboundSessions.room_id == session.room_id) + ).execute() + super().save_inbound_group_session(session) \ No newline at end of file From 7446cfc084d195ad3a9736d68f00cd6953f075eb Mon Sep 17 00:00:00 2001 From: Will Hunt Date: Thu, 14 Jan 2021 17:57:20 +0000 Subject: [PATCH 2/5] Add supporting options for disallowing the sync on startup and stopping the sync after some time --- pantalaimon/client.py | 36 +++++++++++++++++++++++++++++++++++- pantalaimon/config.py | 13 +++++++++++++ pantalaimon/daemon.py | 8 +++++--- 3 files changed, 53 insertions(+), 4 deletions(-) diff --git a/pantalaimon/client.py b/pantalaimon/client.py index b2d2a72..01b3295 100644 --- a/pantalaimon/client.py +++ b/pantalaimon/client.py @@ -14,6 +14,7 @@ import asyncio import os +import time from collections import defaultdict from pprint import pformat from typing import Any, Dict, Optional @@ -177,6 +178,7 @@ class PanClient(AsyncClient): self.pan_store = pan_store self.pan_conf = pan_conf self.media_info = media_info + self.last_sync_request_ts = 0 if INDEXING_ENABLED: logger.info("Indexing enabled.") @@ -199,6 +201,7 @@ class PanClient(AsyncClient): self.send_semaphores = defaultdict(asyncio.Semaphore) self.send_decision_queues = dict() # type: asyncio.Queue self.last_sync_token = None + self.last_sync_task = None self.history_fetcher_task = None self.history_fetch_queue = asyncio.Queue() @@ -523,6 +526,22 @@ class PanClient(AsyncClient): ) await self.send_update_device(device) + def ensure_sync_running(self, loop_sleep_time=100): + self.last_sync_request_ts = int(time.time()) + if self.task is None: + self.start_loop(loop_sleep_time) + + async def can_stop_sync(self): + try: + while True: + await asyncio.sleep(self.pan_conf.sync_stop_after) + if time.time() - self.last_sync_request_ts > self.pan_conf.sync_stop_after: + await self.loop_stop() + break + except (asyncio.CancelledError, KeyboardInterrupt): + return + + def start_loop(self, loop_sleep_time=100): """Start a loop that runs forever and keeps on syncing with the server. @@ -541,6 +560,7 @@ class PanClient(AsyncClient): sync_filter = {"room": {"state": {"lazy_load_members": True}}} next_batch = self.pan_store.load_token(self.server_name, self.user_id) self.last_sync_token = next_batch + self.last_sync_request_ts = int(time.time()) # We don't store any room state so initial sync needs to be with the # full_state parameter. Subsequent ones are normal. @@ -555,6 +575,10 @@ class PanClient(AsyncClient): ) self.task = task + if self.pan_conf.sync_stop_after > 0: + self.last_sync_task = loop.create_task(self.can_stop_sync()) + + return task async def start_sas(self, message, device): @@ -774,7 +798,7 @@ class PanClient(AsyncClient): async def loop_stop(self): """Stop the client loop.""" - logger.info("Stopping the sync loop") + logger.info(f"Stopping the sync loop for {self.user_id}") if self.task and not self.task.done(): self.task.cancel() @@ -786,6 +810,16 @@ class PanClient(AsyncClient): self.task = None + if self.last_sync_task and not self.last_sync_task.done(): + self.last_sync_task.cancel() + + try: + await self.last_sync_task + except KeyboardInterrupt: + pass + + self.last_sync_task = None + if self.history_fetcher_task and not self.history_fetcher_task.done(): self.history_fetcher_task.cancel() diff --git a/pantalaimon/config.py b/pantalaimon/config.py index 2a01714..47ca53d 100644 --- a/pantalaimon/config.py +++ b/pantalaimon/config.py @@ -39,6 +39,8 @@ class PanConfigParser(configparser.ConfigParser): "IndexingBatchSize": "100", "HistoryFetchDelay": "3000", "DebugEncryption": "False", + "SyncOnStartup": "True", + "StopSyncingTimeout": "0" }, converters={ "address": parse_address, @@ -121,6 +123,10 @@ class ServerConfig: the room history. history_fetch_delay (int): The delay between room history fetching requests in seconds. + sync_on_startup (bool): Begin syncing all accounts registered with + pantalaimon on startup. + sync_stop_after (int): The number of seconds to wait since the + client has requested a /sync, before stopping a sync. """ name = attr.ib(type=str) @@ -137,6 +143,8 @@ class ServerConfig: index_encrypted_only = attr.ib(type=bool, default=True) indexing_batch_size = attr.ib(type=int, default=100) history_fetch_delay = attr.ib(type=int, default=3) + sync_on_startup = attr.ib(type=bool, default=True) + sync_stop_after = attr.ib(type=int, default=0) @attr.s @@ -204,6 +212,9 @@ class PanConfig: indexing_batch_size = section.getint("IndexingBatchSize") + sync_on_startup = section.getboolean("SyncOnStartup") + sync_stop_after = section.getint("SyncStopAfter") + if not 1 < indexing_batch_size <= 1000: raise PanConfigError( "The indexing batch size needs to be " @@ -243,6 +254,8 @@ class PanConfig: index_encrypted_only, indexing_batch_size, history_fetch_delay / 1000, + sync_on_startup, + sync_stop_after, ) self.servers[section_name] = server_conf diff --git a/pantalaimon/daemon.py b/pantalaimon/daemon.py index 55b67a2..e98512b 100755 --- a/pantalaimon/daemon.py +++ b/pantalaimon/daemon.py @@ -15,12 +15,12 @@ import asyncio import json import os +import time import urllib.parse import concurrent.futures from json import JSONDecodeError from typing import Any, Dict from uuid import uuid4 - import aiohttp import attr import keyring @@ -159,8 +159,8 @@ class ProxyDaemon: ) loop.create_task(pan_client.send_update_devices(pan_client.device_store)) - - pan_client.start_loop() + if self.conf.sync_on_startup: + pan_client.start_loop() async def _find_client(self, access_token): client_info = self.client_info.get(access_token, None) @@ -736,6 +736,8 @@ class ProxyDaemon: if not client: return self._unknown_token + client.ensure_sync_running() + sync_filter = request.query.get("filter", None) query = CIMultiDict(request.query) From 60e6692c4f1887787b394508f6395d7ea241b351 Mon Sep 17 00:00:00 2001 From: Will Hunt Date: Fri, 16 Apr 2021 11:43:14 +0100 Subject: [PATCH 3/5] Changes --- pantalaimon/client.py | 4 ++-- pantalaimon/config.py | 12 ++++++------ pantalaimon/daemon.py | 6 +++--- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/pantalaimon/client.py b/pantalaimon/client.py index 01b3295..4e0ea85 100644 --- a/pantalaimon/client.py +++ b/pantalaimon/client.py @@ -575,8 +575,8 @@ class PanClient(AsyncClient): ) self.task = task - if self.pan_conf.sync_stop_after > 0: - self.last_sync_task = loop.create_task(self.can_stop_sync()) + # if self.pan_conf.sync_stop_after > 0: + # self.last_sync_task = loop.create_task(self.can_stop_sync()) return task diff --git a/pantalaimon/config.py b/pantalaimon/config.py index 47ca53d..0d09d5c 100644 --- a/pantalaimon/config.py +++ b/pantalaimon/config.py @@ -39,8 +39,8 @@ class PanConfigParser(configparser.ConfigParser): "IndexingBatchSize": "100", "HistoryFetchDelay": "3000", "DebugEncryption": "False", - "SyncOnStartup": "True", - "StopSyncingTimeout": "0" + "SyncOnStartup": "False", + "StopSyncingTimeout": "600" }, converters={ "address": parse_address, @@ -143,8 +143,8 @@ class ServerConfig: index_encrypted_only = attr.ib(type=bool, default=True) indexing_batch_size = attr.ib(type=int, default=100) history_fetch_delay = attr.ib(type=int, default=3) - sync_on_startup = attr.ib(type=bool, default=True) - sync_stop_after = attr.ib(type=int, default=0) + sync_on_startup = attr.ib(type=bool, default=False) + sync_stop_after = attr.ib(type=int, default=600) @attr.s @@ -212,8 +212,8 @@ class PanConfig: indexing_batch_size = section.getint("IndexingBatchSize") - sync_on_startup = section.getboolean("SyncOnStartup") - sync_stop_after = section.getint("SyncStopAfter") + sync_on_startup = False #section.getboolean("SyncOnStartup") + sync_stop_after = 600 #section.getint("SyncStopAfter") if not 1 < indexing_batch_size <= 1000: raise PanConfigError( diff --git a/pantalaimon/daemon.py b/pantalaimon/daemon.py index 07384c2..b814614 100755 --- a/pantalaimon/daemon.py +++ b/pantalaimon/daemon.py @@ -924,8 +924,8 @@ class ProxyDaemon: # The room isn't encrypted just forward the message. if not encrypt: - content_msgtype = content["msgtype"] - if content_msgtype in ["m.image", "m.video", "m.audio", "m.file"] or msgtype == "m.room.avatar": + content_msgtype = content.get("msgtype") + if content_msgtype is not None and content_msgtype in ["m.image", "m.video", "m.audio", "m.file"] or msgtype == "m.room.avatar": try: content = await self._map_decrypted_uri("url", content, request, client) return await self.forward_to_web(request, data=json.dumps(content), token=client.access_token) @@ -940,7 +940,7 @@ class ProxyDaemon: async def _send(ignore_unverified=False): try: - content_msgtype = content["msgtype"] + content_msgtype = content.get("msgtype") if content_msgtype in ["m.image", "m.video", "m.audio", "m.file"] or msgtype == "m.room.avatar": upload_info, media_info = self._get_upload_and_media_info("url", content) if not upload_info or not media_info: From 93db4a02ab3aad61278097975a5682848ad5086a Mon Sep 17 00:00:00 2001 From: Will Hunt Date: Fri, 14 May 2021 14:42:49 +0100 Subject: [PATCH 4/5] default to forgetful --- pantalaimon/config.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pantalaimon/config.py b/pantalaimon/config.py index 5421cf3..ca29eae 100644 --- a/pantalaimon/config.py +++ b/pantalaimon/config.py @@ -147,7 +147,7 @@ class ServerConfig: history_fetch_delay = attr.ib(type=int, default=3) sync_on_startup = attr.ib(type=bool, default=False) sync_stop_after = attr.ib(type=int, default=600) - store_forgetful = attr.ib(type=bool, default=False) + store_forgetful = attr.ib(type=bool, default=True) @attr.s class PanConfig: From e590cc9dbf9c4ca023b41fa25873ba34ecfd9d9d Mon Sep 17 00:00:00 2001 From: Will Hunt Date: Thu, 3 Jun 2021 12:29:35 +0100 Subject: [PATCH 5/5] syntax fix --- pantalaimon/config.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pantalaimon/config.py b/pantalaimon/config.py index 791c03c..ce8f127 100644 --- a/pantalaimon/config.py +++ b/pantalaimon/config.py @@ -40,7 +40,7 @@ class PanConfigParser(configparser.ConfigParser): "HistoryFetchDelay": "3000", "DebugEncryption": "False", "SyncOnStartup": "False", - "StopSyncingTimeout": "600" + "StopSyncingTimeout": "600", "DropOldKeys": "False", }, converters={