Add a replication stream for direct to device messages

This commit is contained in:
Mark Haines 2016-08-31 10:38:58 +01:00
parent 55fc17cf4b
commit 1aa3e1d287
4 changed files with 77 additions and 3 deletions

View file

@ -42,6 +42,7 @@ STREAM_NAMES = (
("pushers",),
("state",),
("caches",),
("to_device",),
)
@ -144,6 +145,7 @@ class ReplicationResource(Resource):
pushers_token,
state_token,
caches_token,
int(stream_token.to_device_key),
))
@request_handler()
@ -193,6 +195,7 @@ class ReplicationResource(Resource):
yield self.pushers(writer, current_token, limit, request_streams)
yield self.state(writer, current_token, limit, request_streams)
yield self.caches(writer, current_token, limit, request_streams)
yield self.to_device(writer, current_token, limit, request_streams)
self.streams(writer, current_token, request_streams)
logger.info("Replicated %d rows", writer.total)
@ -398,6 +401,20 @@ class ReplicationResource(Resource):
"position", "cache_func", "keys", "invalidation_ts"
))
@defer.inlineCallbacks
def to_device(self, writer, current_token, limit, request_streams):
current_position = current_token.to_device
to_device = request_streams.get("to_device")
if to_device is not None:
to_device_rows = yield self.store.get_all_new_device_messages(
to_device, current_position, limit
)
writer.write_header_and_rows("to_device", to_device_rows, (
"position", "user_id", "device_id", "message_json"
))
class _Writer(object):
"""Writes the streams as a JSON object as the response to the request"""
@ -426,7 +443,7 @@ class _Writer(object):
class _ReplicationToken(collections.namedtuple("_ReplicationToken", (
"events", "presence", "typing", "receipts", "account_data", "backfill",
"push_rules", "pushers", "state", "caches",
"push_rules", "pushers", "state", "caches", "to_device",
))):
__slots__ = []