Track device IDs for pushers (#13831)

Second half of the MSC3881 implementation
This commit is contained in:
Brendan Abolivier 2022-09-21 16:31:53 +01:00 committed by GitHub
parent 0fd2f2d460
commit ccca14140a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 154 additions and 10 deletions

View file

@ -124,7 +124,7 @@ class PusherWorkerStore(SQLBaseStore):
id, user_name, access_token, profile_tag, kind, app_id,
app_display_name, device_display_name, pushkey, ts, lang, data,
last_stream_ordering, last_success, failing_since,
COALESCE(enabled, TRUE) AS enabled
COALESCE(enabled, TRUE) AS enabled, device_id
FROM pushers
"""
@ -477,7 +477,74 @@ class PusherWorkerStore(SQLBaseStore):
return number_deleted
class PusherStore(PusherWorkerStore):
class PusherBackgroundUpdatesStore(SQLBaseStore):
def __init__(
self,
database: DatabasePool,
db_conn: LoggingDatabaseConnection,
hs: "HomeServer",
):
super().__init__(database, db_conn, hs)
self.db_pool.updates.register_background_update_handler(
"set_device_id_for_pushers", self._set_device_id_for_pushers
)
async def _set_device_id_for_pushers(
self, progress: JsonDict, batch_size: int
) -> int:
"""Background update to populate the device_id column of the pushers table."""
last_pusher_id = progress.get("pusher_id", 0)
def set_device_id_for_pushers_txn(txn: LoggingTransaction) -> int:
txn.execute(
"""
SELECT p.id, at.device_id
FROM pushers AS p
INNER JOIN access_tokens AS at
ON p.access_token = at.id
WHERE
p.access_token IS NOT NULL
AND at.device_id IS NOT NULL
AND p.id > ?
ORDER BY p.id
LIMIT ?
""",
(last_pusher_id, batch_size),
)
rows = self.db_pool.cursor_to_dict(txn)
if len(rows) == 0:
return 0
self.db_pool.simple_update_many_txn(
txn=txn,
table="pushers",
key_names=("id",),
key_values=[(row["id"],) for row in rows],
value_names=("device_id",),
value_values=[(row["device_id"],) for row in rows],
)
self.db_pool.updates._background_update_progress_txn(
txn, "set_device_id_for_pushers", {"pusher_id": rows[-1]["id"]}
)
return len(rows)
nb_processed = await self.db_pool.runInteraction(
"set_device_id_for_pushers", set_device_id_for_pushers_txn
)
if nb_processed < batch_size:
await self.db_pool.updates._end_background_update(
"set_device_id_for_pushers"
)
return nb_processed
class PusherStore(PusherWorkerStore, PusherBackgroundUpdatesStore):
def get_pushers_stream_token(self) -> int:
return self._pushers_id_gen.get_current_token()
@ -496,6 +563,7 @@ class PusherStore(PusherWorkerStore):
last_stream_ordering: int,
profile_tag: str = "",
enabled: bool = True,
device_id: Optional[str] = None,
) -> None:
async with self._pushers_id_gen.get_next() as stream_id:
# no need to lock because `pushers` has a unique key on
@ -515,6 +583,7 @@ class PusherStore(PusherWorkerStore):
"profile_tag": profile_tag,
"id": stream_id,
"enabled": enabled,
"device_id": device_id,
},
desc="add_pusher",
lock=False,