mirror of
https://git.anonymousland.org/anonymousland/synapse-product.git
synced 2024-12-17 04:24:37 -05:00
Improve performance of getting unread counts in rooms (#13119)
This commit is contained in:
parent
cdc0259449
commit
92a0c18ef0
1
changelog.d/13119.misc
Normal file
1
changelog.d/13119.misc
Normal file
@ -0,0 +1 @@
|
|||||||
|
Reduce DB usage of `/sync` when a large number of unread messages have recently been sent in a room.
|
@ -270,6 +270,9 @@ class MockHomeserver:
|
|||||||
def get_instance_name(self) -> str:
|
def get_instance_name(self) -> str:
|
||||||
return "master"
|
return "master"
|
||||||
|
|
||||||
|
def should_send_federation(self) -> bool:
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
class Porter:
|
class Porter:
|
||||||
def __init__(
|
def __init__(
|
||||||
|
@ -87,7 +87,6 @@ class DataStore(
|
|||||||
RoomStore,
|
RoomStore,
|
||||||
RoomBatchStore,
|
RoomBatchStore,
|
||||||
RegistrationStore,
|
RegistrationStore,
|
||||||
StreamWorkerStore,
|
|
||||||
ProfileStore,
|
ProfileStore,
|
||||||
PresenceStore,
|
PresenceStore,
|
||||||
TransactionWorkerStore,
|
TransactionWorkerStore,
|
||||||
@ -112,6 +111,7 @@ class DataStore(
|
|||||||
SearchStore,
|
SearchStore,
|
||||||
TagsStore,
|
TagsStore,
|
||||||
AccountDataStore,
|
AccountDataStore,
|
||||||
|
StreamWorkerStore,
|
||||||
OpenIdStore,
|
OpenIdStore,
|
||||||
ClientIpWorkerStore,
|
ClientIpWorkerStore,
|
||||||
DeviceStore,
|
DeviceStore,
|
||||||
|
@ -25,8 +25,8 @@ from synapse.storage.database import (
|
|||||||
LoggingDatabaseConnection,
|
LoggingDatabaseConnection,
|
||||||
LoggingTransaction,
|
LoggingTransaction,
|
||||||
)
|
)
|
||||||
from synapse.storage.databases.main.events_worker import EventsWorkerStore
|
|
||||||
from synapse.storage.databases.main.receipts import ReceiptsWorkerStore
|
from synapse.storage.databases.main.receipts import ReceiptsWorkerStore
|
||||||
|
from synapse.storage.databases.main.stream import StreamWorkerStore
|
||||||
from synapse.util import json_encoder
|
from synapse.util import json_encoder
|
||||||
from synapse.util.caches.descriptors import cached
|
from synapse.util.caches.descriptors import cached
|
||||||
|
|
||||||
@ -122,7 +122,7 @@ def _deserialize_action(actions: str, is_highlight: bool) -> List[Union[dict, st
|
|||||||
return DEFAULT_NOTIF_ACTION
|
return DEFAULT_NOTIF_ACTION
|
||||||
|
|
||||||
|
|
||||||
class EventPushActionsWorkerStore(ReceiptsWorkerStore, EventsWorkerStore, SQLBaseStore):
|
class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBaseStore):
|
||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
database: DatabasePool,
|
database: DatabasePool,
|
||||||
@ -218,7 +218,7 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, EventsWorkerStore, SQLBas
|
|||||||
retcol="event_id",
|
retcol="event_id",
|
||||||
)
|
)
|
||||||
|
|
||||||
stream_ordering = self.get_stream_id_for_event_txn(txn, event_id) # type: ignore[attr-defined]
|
stream_ordering = self.get_stream_id_for_event_txn(txn, event_id)
|
||||||
|
|
||||||
return self._get_unread_counts_by_pos_txn(
|
return self._get_unread_counts_by_pos_txn(
|
||||||
txn, room_id, user_id, stream_ordering
|
txn, room_id, user_id, stream_ordering
|
||||||
@ -307,12 +307,22 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, EventsWorkerStore, SQLBas
|
|||||||
actions that have been deleted from `event_push_actions` table.
|
actions that have been deleted from `event_push_actions` table.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
# If there have been no events in the room since the stream ordering,
|
||||||
|
# there can't be any push actions either.
|
||||||
|
if not self._events_stream_cache.has_entity_changed(room_id, stream_ordering):
|
||||||
|
return 0, 0
|
||||||
|
|
||||||
clause = ""
|
clause = ""
|
||||||
args = [user_id, room_id, stream_ordering]
|
args = [user_id, room_id, stream_ordering]
|
||||||
if max_stream_ordering is not None:
|
if max_stream_ordering is not None:
|
||||||
clause = "AND ea.stream_ordering <= ?"
|
clause = "AND ea.stream_ordering <= ?"
|
||||||
args.append(max_stream_ordering)
|
args.append(max_stream_ordering)
|
||||||
|
|
||||||
|
# If the max stream ordering is less than the min stream ordering,
|
||||||
|
# then obviously there are zero push actions in that range.
|
||||||
|
if max_stream_ordering <= stream_ordering:
|
||||||
|
return 0, 0
|
||||||
|
|
||||||
sql = f"""
|
sql = f"""
|
||||||
SELECT
|
SELECT
|
||||||
COUNT(CASE WHEN notif = 1 THEN 1 END),
|
COUNT(CASE WHEN notif = 1 THEN 1 END),
|
||||||
|
@ -46,10 +46,12 @@ from typing import (
|
|||||||
Set,
|
Set,
|
||||||
Tuple,
|
Tuple,
|
||||||
cast,
|
cast,
|
||||||
|
overload,
|
||||||
)
|
)
|
||||||
|
|
||||||
import attr
|
import attr
|
||||||
from frozendict import frozendict
|
from frozendict import frozendict
|
||||||
|
from typing_extensions import Literal
|
||||||
|
|
||||||
from twisted.internet import defer
|
from twisted.internet import defer
|
||||||
|
|
||||||
@ -795,6 +797,24 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
|||||||
)
|
)
|
||||||
return RoomStreamToken(topo, stream_ordering)
|
return RoomStreamToken(topo, stream_ordering)
|
||||||
|
|
||||||
|
@overload
|
||||||
|
def get_stream_id_for_event_txn(
|
||||||
|
self,
|
||||||
|
txn: LoggingTransaction,
|
||||||
|
event_id: str,
|
||||||
|
allow_none: Literal[False] = False,
|
||||||
|
) -> int:
|
||||||
|
...
|
||||||
|
|
||||||
|
@overload
|
||||||
|
def get_stream_id_for_event_txn(
|
||||||
|
self,
|
||||||
|
txn: LoggingTransaction,
|
||||||
|
event_id: str,
|
||||||
|
allow_none: bool = False,
|
||||||
|
) -> Optional[int]:
|
||||||
|
...
|
||||||
|
|
||||||
def get_stream_id_for_event_txn(
|
def get_stream_id_for_event_txn(
|
||||||
self,
|
self,
|
||||||
txn: LoggingTransaction,
|
txn: LoggingTransaction,
|
||||||
|
@ -86,6 +86,8 @@ class EventPushActionsStoreTestCase(HomeserverTestCase):
|
|||||||
event.internal_metadata.is_outlier.return_value = False
|
event.internal_metadata.is_outlier.return_value = False
|
||||||
event.depth = stream
|
event.depth = stream
|
||||||
|
|
||||||
|
self.store._events_stream_cache.entity_has_changed(room_id, stream)
|
||||||
|
|
||||||
self.get_success(
|
self.get_success(
|
||||||
self.store.db_pool.simple_insert(
|
self.store.db_pool.simple_insert(
|
||||||
table="events",
|
table="events",
|
||||||
|
Loading…
Reference in New Issue
Block a user