mirror of
https://github.com/matrix-org/pantalaimon.git
synced 2025-01-05 21:01:08 -05:00
client: Fetch old room messages if there was a limited room timeline.
This patch adds the ability to fetch the whole room history. Note that this is still incomplete as it not fetch the whole history if it gets interrupted. Another deficiency is that it always tries to start fetching room history after a restart even though we know that it doesn't need to do so.
This commit is contained in:
parent
c7ca5d9851
commit
8c3cfbc0dd
@ -19,6 +19,7 @@ from functools import partial
|
||||
from pprint import pformat
|
||||
from typing import Any, Dict, Optional
|
||||
|
||||
import attr
|
||||
from aiohttp.client_exceptions import ClientConnectionError
|
||||
from jsonschema import Draft4Validator, FormatChecker, validators
|
||||
from nio import (AsyncClient, ClientConfig, EncryptionError, KeysQueryResponse,
|
||||
@ -103,6 +104,12 @@ class InvalidOrderByError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
@attr.s
|
||||
class FetchTask:
|
||||
room_id = attr.ib(type=str)
|
||||
token = attr.ib(type=str)
|
||||
|
||||
|
||||
class PanClient(AsyncClient):
|
||||
"""A wrapper class around a nio AsyncClient extending its functionality."""
|
||||
|
||||
@ -139,6 +146,9 @@ class PanClient(AsyncClient):
|
||||
self.send_semaphores = defaultdict(asyncio.Semaphore)
|
||||
self.send_decision_queues = dict() # type: asyncio.Queue
|
||||
|
||||
self.history_fetcher_task = None
|
||||
self.history_fetch_queue = asyncio.Queue()
|
||||
|
||||
self.add_to_device_callback(
|
||||
self.key_verification_cb,
|
||||
KeyVerificationEvent
|
||||
@ -177,8 +187,12 @@ class PanClient(AsyncClient):
|
||||
display_name,
|
||||
avatar_url
|
||||
)
|
||||
|
||||
if column_id:
|
||||
self.index.add_event(column_id, event, room.room_id)
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
@property
|
||||
def unable_to_decrypt(self):
|
||||
@ -200,6 +214,55 @@ class PanClient(AsyncClient):
|
||||
message = UpdateDevicesMessage()
|
||||
await self.queue.put(message)
|
||||
|
||||
async def fetcher_loop(self):
|
||||
while True:
|
||||
try:
|
||||
await asyncio.sleep(5)
|
||||
|
||||
fetch_task = await self.history_fetch_queue.get()
|
||||
|
||||
try:
|
||||
room = self.rooms[fetch_task.room_id]
|
||||
except KeyError:
|
||||
# The room is missing from our client, we probably left the
|
||||
# room.
|
||||
continue
|
||||
|
||||
try:
|
||||
logger.debug("Fetching room history for {}".format(
|
||||
room.display_name
|
||||
))
|
||||
response = await self.room_messages(fetch_task.room_id,
|
||||
fetch_task.token)
|
||||
except ClientConnectionError:
|
||||
self.history_fetch_queue.put(fetch_task)
|
||||
|
||||
# The chunk was empyt, we're at the start of the timeline.
|
||||
if not response.chunk:
|
||||
continue
|
||||
|
||||
for event in response.chunk:
|
||||
if not isinstance(event, (
|
||||
RoomMessageText,
|
||||
RoomMessageMedia,
|
||||
RoomEncryptedMedia,
|
||||
RoomTopicEvent,
|
||||
RoomNameEvent
|
||||
)):
|
||||
continue
|
||||
|
||||
if not self.store_message_cb(room, event):
|
||||
# The event was already in our store, we catched up.
|
||||
break
|
||||
else:
|
||||
# There may be even more events to fetch, add a new task to
|
||||
# the queue.
|
||||
await self.history_fetch_queue.put(
|
||||
FetchTask(room.room_id, response.end)
|
||||
)
|
||||
except asyncio.CancelledError:
|
||||
return
|
||||
|
||||
async def sync_tasks(self, response):
|
||||
try:
|
||||
await asyncio.gather(*self.key_verificatins_tasks)
|
||||
@ -213,6 +276,16 @@ class PanClient(AsyncClient):
|
||||
|
||||
self.index.commit()
|
||||
|
||||
for room_id, room in response.rooms.join.items():
|
||||
if room.timeline.limited:
|
||||
logger.info("Room {} had a limited timeline, queueing "
|
||||
"room for history fetching.".format(
|
||||
self.rooms[room_id].display_name
|
||||
))
|
||||
await self.history_fetch_queue.put(
|
||||
FetchTask(room_id, room.timeline.prev_batch)
|
||||
)
|
||||
|
||||
async def keys_query_cb(self, response):
|
||||
await self.send_update_devcies()
|
||||
|
||||
@ -309,6 +382,8 @@ class PanClient(AsyncClient):
|
||||
|
||||
task = loop.create_task(self.sync_forever(timeout, sync_filter))
|
||||
self.task = task
|
||||
self.history_fetcher_task = loop.create_task(self.fetcher_loop())
|
||||
|
||||
return task
|
||||
|
||||
async def start_sas(self, message, device):
|
||||
@ -466,11 +541,15 @@ class PanClient(AsyncClient):
|
||||
"""Stop the client loop."""
|
||||
logger.info("Stopping the sync loop")
|
||||
|
||||
if not self.task or self.task.done():
|
||||
return
|
||||
if self.task and not self.task.done():
|
||||
self.task.cancel()
|
||||
await self.task
|
||||
self.task = None
|
||||
|
||||
self.task.cancel()
|
||||
await self.task
|
||||
if self.history_fetcher_task and not self.history_fetcher_task.done():
|
||||
self.history_fetcher_task.cancel()
|
||||
await self.history_fetcher_task
|
||||
self.history_fetcher_task = None
|
||||
|
||||
def pan_decrypt_event(
|
||||
self,
|
||||
|
Loading…
Reference in New Issue
Block a user