mirror of
https://git.anonymousland.org/anonymousland/synapse.git
synced 2025-05-02 12:16:09 -04:00
Add methods to events, account data and receipt slaves
Adds the methods needed by /sync to the slaved events, account data and receipt stores.
This commit is contained in:
parent
6bb9aacf9d
commit
81cf449daa
3 changed files with 81 additions and 6 deletions
|
@ -15,7 +15,10 @@
|
|||
|
||||
from ._base import BaseSlavedStore
|
||||
from ._slaved_id_tracker import SlavedIdTracker
|
||||
from synapse.storage import DataStore
|
||||
from synapse.storage.account_data import AccountDataStore
|
||||
from synapse.storage.tags import TagsStore
|
||||
from synapse.util.caches.stream_change_cache import StreamChangeCache
|
||||
|
||||
|
||||
class SlavedAccountDataStore(BaseSlavedStore):
|
||||
|
@ -25,6 +28,14 @@ class SlavedAccountDataStore(BaseSlavedStore):
|
|||
self._account_data_id_gen = SlavedIdTracker(
|
||||
db_conn, "account_data_max_stream_id", "stream_id",
|
||||
)
|
||||
self._account_data_stream_cache = StreamChangeCache(
|
||||
"AccountDataAndTagsChangeCache",
|
||||
self._account_data_id_gen.get_current_token(),
|
||||
)
|
||||
|
||||
get_account_data_for_user = (
|
||||
AccountDataStore.__dict__["get_account_data_for_user"]
|
||||
)
|
||||
|
||||
get_global_account_data_by_type_for_users = (
|
||||
AccountDataStore.__dict__["get_global_account_data_by_type_for_users"]
|
||||
|
@ -34,6 +45,16 @@ class SlavedAccountDataStore(BaseSlavedStore):
|
|||
AccountDataStore.__dict__["get_global_account_data_by_type_for_user"]
|
||||
)
|
||||
|
||||
get_tags_for_user = TagsStore.__dict__["get_tags_for_user"]
|
||||
|
||||
get_updated_tags = DataStore.get_updated_tags.__func__
|
||||
get_updated_account_data_for_user = (
|
||||
DataStore.get_updated_account_data_for_user.__func__
|
||||
)
|
||||
|
||||
def get_max_account_data_stream_id(self):
|
||||
return self._account_data_id_gen.get_current_token()
|
||||
|
||||
def stream_positions(self):
|
||||
result = super(SlavedAccountDataStore, self).stream_positions()
|
||||
position = self._account_data_id_gen.get_current_token()
|
||||
|
@ -47,15 +68,33 @@ class SlavedAccountDataStore(BaseSlavedStore):
|
|||
if stream:
|
||||
self._account_data_id_gen.advance(int(stream["position"]))
|
||||
for row in stream["rows"]:
|
||||
user_id, data_type = row[1:3]
|
||||
position, user_id, data_type = row[:3]
|
||||
self.get_global_account_data_by_type_for_user.invalidate(
|
||||
(data_type, user_id,)
|
||||
)
|
||||
self.get_account_data_for_user.invalidate((user_id,))
|
||||
self._account_data_stream_cache.entity_has_changed(
|
||||
user_id, position
|
||||
)
|
||||
|
||||
stream = result.get("room_account_data")
|
||||
if stream:
|
||||
self._account_data_id_gen.advance(int(stream["position"]))
|
||||
for row in stream["rows"]:
|
||||
position, user_id = row[:2]
|
||||
self.get_account_data_for_user.invalidate((user_id,))
|
||||
self._account_data_stream_cache.entity_has_changed(
|
||||
user_id, position
|
||||
)
|
||||
|
||||
stream = result.get("tag_account_data")
|
||||
if stream:
|
||||
self._account_data_id_gen.advance(int(stream["position"]))
|
||||
for row in stream["rows"]:
|
||||
position, user_id = row[:2]
|
||||
self.get_tags_for_user.invalidate((user_id,))
|
||||
self._account_data_stream_cache.entity_has_changed(
|
||||
user_id, position
|
||||
)
|
||||
|
||||
return super(SlavedAccountDataStore, self).process_replication(result)
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue