Merge pull request #692 from matrix-org/markjh/replicate_reshuffle

Separate generating the replication response...
This commit is contained in:
Mark Haines 2016-04-05 13:23:36 +01:00
commit 2e308a3a38

View File

@ -145,32 +145,43 @@ class ReplicationResource(Resource):
timeout = parse_integer(request, "timeout", 10 * 1000)
request.setHeader(b"Content-Type", b"application/json")
writer = _Writer(request)
request_streams = {
name: parse_integer(request, name)
for names in STREAM_NAMES for name in names
}
request_streams["streams"] = parse_string(request, "streams")
def replicate():
return self.replicate(request_streams, limit)
result = yield self.notifier.wait_for_replication(replicate, timeout)
request.write(json.dumps(result, ensure_ascii=False))
finish_request(request)
@defer.inlineCallbacks
def replicate():
def replicate(self, request_streams, limit):
writer = _Writer()
current_token = yield self.current_replication_token()
logger.info("Replicating up to %r", current_token)
yield self.account_data(writer, current_token, limit)
yield self.events(writer, current_token, limit)
yield self.presence(writer, current_token) # TODO: implement limit
yield self.typing(writer, current_token) # TODO: implement limit
yield self.receipts(writer, current_token, limit)
yield self.push_rules(writer, current_token, limit)
yield self.pushers(writer, current_token, limit)
yield self.state(writer, current_token, limit)
self.streams(writer, current_token)
yield self.account_data(writer, current_token, limit, request_streams)
yield self.events(writer, current_token, limit, request_streams)
# TODO: implement limit
yield self.presence(writer, current_token, request_streams)
yield self.typing(writer, current_token, request_streams)
yield self.receipts(writer, current_token, limit, request_streams)
yield self.push_rules(writer, current_token, limit, request_streams)
yield self.pushers(writer, current_token, limit, request_streams)
yield self.state(writer, current_token, limit, request_streams)
self.streams(writer, current_token, request_streams)
logger.info("Replicated %d rows", writer.total)
defer.returnValue(writer.total)
defer.returnValue(writer.finish())
yield self.notifier.wait_for_replication(replicate, timeout)
writer.finish()
def streams(self, writer, current_token):
request_token = parse_string(writer.request, "streams")
def streams(self, writer, current_token, request_streams):
request_token = request_streams.get("streams")
streams = []
@ -195,9 +206,9 @@ class ReplicationResource(Resource):
)
@defer.inlineCallbacks
def events(self, writer, current_token, limit):
request_events = parse_integer(writer.request, "events")
request_backfill = parse_integer(writer.request, "backfill")
def events(self, writer, current_token, limit, request_streams):
request_events = request_streams.get("events")
request_backfill = request_streams.get("backfill")
if request_events is not None or request_backfill is not None:
if request_events is None:
@ -228,10 +239,10 @@ class ReplicationResource(Resource):
)
@defer.inlineCallbacks
def presence(self, writer, current_token):
def presence(self, writer, current_token, request_streams):
current_position = current_token.presence
request_presence = parse_integer(writer.request, "presence")
request_presence = request_streams.get("presence")
if request_presence is not None:
presence_rows = yield self.presence_handler.get_all_presence_updates(
@ -244,10 +255,10 @@ class ReplicationResource(Resource):
))
@defer.inlineCallbacks
def typing(self, writer, current_token):
def typing(self, writer, current_token, request_streams):
current_position = current_token.presence
request_typing = parse_integer(writer.request, "typing")
request_typing = request_streams.get("typing")
if request_typing is not None:
typing_rows = yield self.typing_handler.get_all_typing_updates(
@ -258,10 +269,10 @@ class ReplicationResource(Resource):
))
@defer.inlineCallbacks
def receipts(self, writer, current_token, limit):
def receipts(self, writer, current_token, limit, request_streams):
current_position = current_token.receipts
request_receipts = parse_integer(writer.request, "receipts")
request_receipts = request_streams.get("receipts")
if request_receipts is not None:
receipts_rows = yield self.store.get_all_updated_receipts(
@ -272,12 +283,12 @@ class ReplicationResource(Resource):
))
@defer.inlineCallbacks
def account_data(self, writer, current_token, limit):
def account_data(self, writer, current_token, limit, request_streams):
current_position = current_token.account_data
user_account_data = parse_integer(writer.request, "user_account_data")
room_account_data = parse_integer(writer.request, "room_account_data")
tag_account_data = parse_integer(writer.request, "tag_account_data")
user_account_data = request_streams.get("user_account_data")
room_account_data = request_streams.get("room_account_data")
tag_account_data = request_streams.get("tag_account_data")
if user_account_data is not None or room_account_data is not None:
if user_account_data is None:
@ -303,10 +314,10 @@ class ReplicationResource(Resource):
))
@defer.inlineCallbacks
def push_rules(self, writer, current_token, limit):
def push_rules(self, writer, current_token, limit, request_streams):
current_position = current_token.push_rules
push_rules = parse_integer(writer.request, "push_rules")
push_rules = request_streams.get("push_rules")
if push_rules is not None:
rows = yield self.store.get_all_push_rule_updates(
@ -318,10 +329,11 @@ class ReplicationResource(Resource):
))
@defer.inlineCallbacks
def pushers(self, writer, current_token, limit):
def pushers(self, writer, current_token, limit, request_streams):
current_position = current_token.pushers
pushers = parse_integer(writer.request, "pushers")
pushers = request_streams.get("pushers")
if pushers is not None:
updated, deleted = yield self.store.get_all_updated_pushers(
pushers, current_position, limit
@ -336,10 +348,11 @@ class ReplicationResource(Resource):
))
@defer.inlineCallbacks
def state(self, writer, current_token, limit):
def state(self, writer, current_token, limit, request_streams):
current_position = current_token.state
state = parse_integer(writer.request, "state")
state = request_streams.get("state")
if state is not None:
state_groups, state_group_state = (
yield self.store.get_all_new_state_groups(
@ -356,9 +369,8 @@ class ReplicationResource(Resource):
class _Writer(object):
"""Writes the streams as a JSON object as the response to the request"""
def __init__(self, request):
def __init__(self):
self.streams = {}
self.request = request
self.total = 0
def write_header_and_rows(self, name, rows, fields, position=None):
@ -377,8 +389,7 @@ class _Writer(object):
self.total += len(rows)
def finish(self):
self.request.write(json.dumps(self.streams, ensure_ascii=False))
finish_request(self.request)
return self.streams
class _ReplicationToken(collections.namedtuple("_ReplicationToken", (