client: Store the history fetcher tasks to the db.

This commit is contained in:
Damir Jelić 2019-06-12 15:39:08 +02:00
parent 1ad2a3af28
commit b4e60b603a
3 changed files with 110 additions and 18 deletions

View File

@ -19,7 +19,6 @@ from functools import partial
from pprint import pformat from pprint import pformat
from typing import Any, Dict, Optional from typing import Any, Dict, Optional
import attr
from aiohttp.client_exceptions import ClientConnectionError from aiohttp.client_exceptions import ClientConnectionError
from jsonschema import Draft4Validator, FormatChecker, validators from jsonschema import Draft4Validator, FormatChecker, validators
from nio import (AsyncClient, ClientConfig, EncryptionError, KeysQueryResponse, from nio import (AsyncClient, ClientConfig, EncryptionError, KeysQueryResponse,
@ -33,6 +32,7 @@ from nio.store import SqliteStore
from pantalaimon.index import Index from pantalaimon.index import Index
from pantalaimon.log import logger from pantalaimon.log import logger
from pantalaimon.store import FetchTask
from pantalaimon.thread_messages import (DaemonResponse, InviteSasSignal, from pantalaimon.thread_messages import (DaemonResponse, InviteSasSignal,
SasDoneSignal, ShowSasSignal, SasDoneSignal, ShowSasSignal,
UpdateDevicesMessage) UpdateDevicesMessage)
@ -104,12 +104,6 @@ class InvalidOrderByError(Exception):
pass pass
@attr.s
class FetchTask:
room_id = attr.ib(type=str)
token = attr.ib(type=str)
class PanClient(AsyncClient): class PanClient(AsyncClient):
"""A wrapper class around a nio AsyncClient extending its functionality.""" """A wrapper class around a nio AsyncClient extending its functionality."""
@ -216,10 +210,23 @@ class PanClient(AsyncClient):
message = UpdateDevicesMessage() message = UpdateDevicesMessage()
await self.queue.put(message) await self.queue.put(message)
def delete_fetcher_task(self, task):
self.pan_store.delete_fetcher_task(
self.server_name,
self.user_id,
task
)
async def fetcher_loop(self): async def fetcher_loop(self):
for t in self.pan_store.load_fetcher_tasks(
self.server_name,
self.user_id
):
await self.history_fetch_queue.put(t)
while True: while True:
try: try:
await asyncio.sleep(5) await asyncio.sleep(3)
fetch_task = await self.history_fetch_queue.get() fetch_task = await self.history_fetch_queue.get()
@ -228,6 +235,7 @@ class PanClient(AsyncClient):
except KeyError: except KeyError:
# The room is missing from our client, we probably left the # The room is missing from our client, we probably left the
# room. # room.
self.delete_fetcher_task(fetch_task)
continue continue
try: try:
@ -239,8 +247,9 @@ class PanClient(AsyncClient):
except ClientConnectionError: except ClientConnectionError:
self.history_fetch_queue.put(fetch_task) self.history_fetch_queue.put(fetch_task)
# The chunk was empyt, we're at the start of the timeline. # The chunk was empty, we're at the start of the timeline.
if not response.chunk: if not response.chunk:
self.delete_fetcher_task(fetch_task)
continue continue
for event in response.chunk: for event in response.chunk:
@ -259,10 +268,13 @@ class PanClient(AsyncClient):
else: else:
# There may be even more events to fetch, add a new task to # There may be even more events to fetch, add a new task to
# the queue. # the queue.
await self.history_fetch_queue.put( task = FetchTask(room.room_id, response.end)
FetchTask(room.room_id, response.end) self.pan_store.save_fetcher_task(self.server_name,
) self.user_id, task)
except asyncio.CancelledError: await self.history_fetch_queue.put(task)
self.delete_fetcher_task(fetch_task)
except (asyncio.CancelledError, KeyboardInterrupt):
return return
async def sync_tasks(self, response): async def sync_tasks(self, response):
@ -290,9 +302,11 @@ class PanClient(AsyncClient):
"room for history fetching.".format( "room for history fetching.".format(
self.rooms[room_id].display_name self.rooms[room_id].display_name
)) ))
await self.history_fetch_queue.put( task = FetchTask(room_id, room.timeline.prev_batch)
FetchTask(room_id, room.timeline.prev_batch) self.pan_store.save_fetcher_task(self.server_name,
) self.user_id, task)
await self.history_fetch_queue.put(task)
async def keys_query_cb(self, response): async def keys_query_cb(self, response):
await self.send_update_devcies() await self.send_update_devcies()
@ -593,6 +607,8 @@ class PanClient(AsyncClient):
await self.history_fetcher_task await self.history_fetcher_task
self.history_fetcher_task = None self.history_fetcher_task = None
self.history_fetch_queue = asyncio.Queue()
def pan_decrypt_event( def pan_decrypt_event(
self, self,
event_dict, event_dict,

View File

@ -26,6 +26,12 @@ from peewee import (SQL, DateTimeField, DoesNotExist, ForeignKeyField, Model,
SqliteDatabase, TextField) SqliteDatabase, TextField)
@attr.s
class FetchTask:
room_id = attr.ib(type=str)
token = attr.ib(type=str)
class DictField(TextField): class DictField(TextField):
def python_value(self, value): # pragma: no cover def python_value(self, value): # pragma: no cover
return json.loads(value) return json.loads(value)
@ -109,6 +115,18 @@ class PanSyncTokens(Model):
constraints = [SQL("UNIQUE(user_id)")] constraints = [SQL("UNIQUE(user_id)")]
class PanFetcherTasks(Model):
user = ForeignKeyField(
model=ServerUsers,
column_name="user_id",
backref="fetcher_tasks")
room_id = TextField()
token = TextField()
class Meta:
constraints = [SQL("UNIQUE(user_id, room_id, token)")]
@attr.s @attr.s
class ClientInfo: class ClientInfo:
user_id = attr.ib(type=str) user_id = attr.ib(type=str)
@ -131,7 +149,8 @@ class PanStore:
Profile, Profile,
Event, Event,
UserMessages, UserMessages,
PanSyncTokens PanSyncTokens,
PanFetcherTasks
] ]
def __attrs_post_init__(self): def __attrs_post_init__(self):
@ -165,6 +184,38 @@ class PanStore:
except DoesNotExist: except DoesNotExist:
return None return None
@use_database
def save_fetcher_task(self, server, pan_user, task):
server = Servers.get(name=server)
user = ServerUsers.get(server=server, user_id=pan_user)
PanFetcherTasks.replace(
user=user,
room_id=task.room_id,
token=task.token
).execute()
def load_fetcher_tasks(self, server, pan_user):
server = Servers.get(name=server)
user = ServerUsers.get(server=server, user_id=pan_user)
tasks = []
for t in user.fetcher_tasks:
tasks.append(FetchTask(t.room_id, t.token))
return tasks
def delete_fetcher_task(self, server, pan_user, task):
server = Servers.get(name=server)
user = ServerUsers.get(server=server, user_id=pan_user)
PanFetcherTasks.delete().where(
PanFetcherTasks.user == user,
PanFetcherTasks.room_id == task.room_id,
PanFetcherTasks.token == task.token
).execute()
@use_database @use_database
def save_token(self, server, pan_user, token): def save_token(self, server, pan_user, token):
# type: (str, str, str) -> None # type: (str, str, str) -> None

View File

@ -4,6 +4,7 @@ from nio import RoomMessage
from conftest import faker from conftest import faker
from pantalaimon.index import Index from pantalaimon.index import Index
from pantalaimon.store import FetchTask
TEST_ROOM = "!SVkFJHzfwvuaIEawgC:localhost" TEST_ROOM = "!SVkFJHzfwvuaIEawgC:localhost"
TEST_ROOM2 = "!testroom:localhost" TEST_ROOM2 = "!testroom:localhost"
@ -125,4 +126,28 @@ class TestClass(object):
assert not panstore.load_token("example", user) assert not panstore.load_token("example", user)
panstore.save_token("example", user, "abc123") panstore.save_token("example", user, "abc123")
assert "abc123" == panstore.load_token("example", user) assert panstore.load_token("example", user) == "abc123"
def test_fetcher_tasks(self, panstore_with_users):
panstore = panstore_with_users
accounts = panstore.load_all_users()
user, _ = accounts[0]
task = FetchTask(TEST_ROOM, "abc1234")
task2 = FetchTask(TEST_ROOM2, "abc1234")
assert not panstore.load_fetcher_tasks("example", user)
panstore.save_fetcher_task("example", user, task)
panstore.save_fetcher_task("example", user, task2)
tasks = panstore.load_fetcher_tasks("example", user)
assert task in tasks
assert task2 in tasks
panstore.delete_fetcher_task("example", user, task)
tasks = panstore.load_fetcher_tasks("example", user)
assert task not in tasks
assert task2 in tasks