mirror of
https://git.anonymousland.org/anonymousland/synapse.git
synced 2025-05-02 10:36:06 -04:00
Initial worker impl
This commit is contained in:
parent
a9d6fa8b2b
commit
6aa5bc8635
6 changed files with 328 additions and 7 deletions
|
@ -112,6 +112,12 @@ AccountDataStreamRow = namedtuple("AccountDataStream", (
|
|||
"data_type", # str
|
||||
"data", # dict
|
||||
))
|
||||
CurrentStateDeltaStreamRow = namedtuple("CurrentStateDeltaStream", (
|
||||
"room_id", # str
|
||||
"type", # str
|
||||
"state_key", # str
|
||||
"event_id", # str, optional
|
||||
))
|
||||
|
||||
|
||||
class Stream(object):
|
||||
|
@ -443,6 +449,21 @@ class AccountDataStream(Stream):
|
|||
defer.returnValue(results)
|
||||
|
||||
|
||||
class CurrentStateDeltaStream(Stream):
|
||||
"""Current state for a room was changed
|
||||
"""
|
||||
NAME = "current_state_deltas"
|
||||
ROW_TYPE = CurrentStateDeltaStreamRow
|
||||
|
||||
def __init__(self, hs):
|
||||
store = hs.get_datastore()
|
||||
|
||||
self.current_token = store.get_max_current_state_delta_stream_id
|
||||
self.update_function = store.get_all_updated_current_state_deltas
|
||||
|
||||
super(CurrentStateDeltaStream, self).__init__(hs)
|
||||
|
||||
|
||||
STREAMS_MAP = {
|
||||
stream.NAME: stream
|
||||
for stream in (
|
||||
|
@ -460,5 +481,6 @@ STREAMS_MAP = {
|
|||
FederationStream,
|
||||
TagAccountDataStream,
|
||||
AccountDataStream,
|
||||
CurrentStateDeltaStream,
|
||||
)
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue