Add a timestamp to USER_SYNC command

This timestamp is used to indicate when the user last sync'd
This commit is contained in:
Erik Johnston 2017-03-31 11:46:20 +01:00
parent 9d0170ac6c
commit 36d2b66f90
4 changed files with 24 additions and 16 deletions

View File

@ -511,7 +511,7 @@ class PresenceHandler(object):
self.external_process_to_current_syncs[process_id] = syncing_user_ids self.external_process_to_current_syncs[process_id] = syncing_user_ids
@defer.inlineCallbacks @defer.inlineCallbacks
def update_external_syncs_row(self, process_id, user_id, is_syncing): def update_external_syncs_row(self, process_id, user_id, is_syncing, sync_time_msec):
"""Update the syncing users for an external process as a delta. """Update the syncing users for an external process as a delta.
Args: Args:
@ -520,6 +520,7 @@ class PresenceHandler(object):
as user start and stop syncing against a given process. as user start and stop syncing against a given process.
user_id (str): The user who has started or stopped syncing user_id (str): The user who has started or stopped syncing
is_syncing (bool): Whether or not the user is now syncing is_syncing (bool): Whether or not the user is now syncing
sync_time_msec(int): Time in ms when the user was last syncing
""" """
with (yield self.external_sync_linearizer.queue(process_id)): with (yield self.external_sync_linearizer.queue(process_id)):
prev_state = yield self.current_state_for_user(user_id) prev_state = yield self.current_state_for_user(user_id)
@ -527,24 +528,23 @@ class PresenceHandler(object):
process_presence = self.external_process_to_current_syncs.setdefault( process_presence = self.external_process_to_current_syncs.setdefault(
process_id, set() process_id, set()
) )
time_now_ms = self.clock.time_msec()
updates = [] updates = []
if is_syncing and user_id not in process_presence: if is_syncing and user_id not in process_presence:
if prev_state.state == PresenceState.OFFLINE: if prev_state.state == PresenceState.OFFLINE:
updates.append(prev_state.copy_and_replace( updates.append(prev_state.copy_and_replace(
state=PresenceState.ONLINE, state=PresenceState.ONLINE,
last_active_ts=time_now_ms, last_active_ts=sync_time_msec,
last_user_sync_ts=time_now_ms, last_user_sync_ts=sync_time_msec,
)) ))
else: else:
updates.append(prev_state.copy_and_replace( updates.append(prev_state.copy_and_replace(
last_user_sync_ts=time_now_ms, last_user_sync_ts=sync_time_msec,
)) ))
process_presence.add(user_id) process_presence.add(user_id)
elif user_id in process_presence: elif user_id in process_presence:
updates.append(prev_state.copy_and_replace( updates.append(prev_state.copy_and_replace(
last_user_sync_ts=time_now_ms, last_user_sync_ts=sync_time_msec,
)) ))
if not is_syncing: if not is_syncing:
@ -553,7 +553,7 @@ class PresenceHandler(object):
if updates: if updates:
yield self._update_states(updates) yield self._update_states(updates)
self.external_process_last_updated_ms[process_id] = time_now_ms self.external_process_last_updated_ms[process_id] = self.clock.time_msec()
@defer.inlineCallbacks @defer.inlineCallbacks
def update_external_syncs_clear(self, process_id): def update_external_syncs_clear(self, process_id):

View File

@ -189,29 +189,34 @@ class UserSyncCommand(Command):
"""Sent by the client to inform the server that a user has started or """Sent by the client to inform the server that a user has started or
stopped syncing. Used to calculate presence on the master. stopped syncing. Used to calculate presence on the master.
Includes a timestamp of when the last user sync was.
Format:: Format::
USER_SYNC <user_id> <state> USER_SYNC <user_id> <state> <last_sync_ms>
Where <state> is either "start" or "stop" Where <state> is either "start" or "stop"
""" """
NAME = "USER_SYNC" NAME = "USER_SYNC"
def __init__(self, user_id, is_syncing): def __init__(self, user_id, is_syncing, last_sync_ms):
self.user_id = user_id self.user_id = user_id
self.is_syncing = is_syncing self.is_syncing = is_syncing
self.last_sync_ms = last_sync_ms
@classmethod @classmethod
def from_line(cls, line): def from_line(cls, line):
user_id, state = line.split(" ", 1) user_id, state, last_sync_ms = line.split(" ", 2)
if state not in ("start", "end"): if state not in ("start", "end"):
raise Exception("Invalid USER_SYNC state %r" % (state,)) raise Exception("Invalid USER_SYNC state %r" % (state,))
return cls(user_id, state == "start") return cls(user_id, state == "start", int(last_sync_ms))
def to_line(self): def to_line(self):
return " ".join((self.user_id, "start" if self.is_syncing else "end")) return " ".join((
self.user_id, "start" if self.is_syncing else "end", str(self.last_sync_ms),
))
class FederationAckCommand(Command): class FederationAckCommand(Command):

View File

@ -368,7 +368,9 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
self.name = cmd.data self.name = cmd.data
def on_USER_SYNC(self, cmd): def on_USER_SYNC(self, cmd):
self.streamer.on_user_sync(self.conn_id, cmd.user_id, cmd.is_syncing) self.streamer.on_user_sync(
self.conn_id, cmd.user_id, cmd.is_syncing, cmd.last_sync_ms,
)
def on_REPLICATE(self, cmd): def on_REPLICATE(self, cmd):
stream_name = cmd.stream_name stream_name = cmd.stream_name
@ -481,8 +483,9 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
# Tell the server if we have any users currently syncing (should only # Tell the server if we have any users currently syncing (should only
# happen on synchrotrons) # happen on synchrotrons)
currently_syncing = self.handler.get_currently_syncing_users() currently_syncing = self.handler.get_currently_syncing_users()
now = self.clock.time_msec()
for user_id in currently_syncing: for user_id in currently_syncing:
self.send_command(UserSyncCommand(user_id, True)) self.send_command(UserSyncCommand(user_id, True, now))
# We've now finished connecting to so inform the client handler # We've now finished connecting to so inform the client handler
self.handler.update_connection(self) self.handler.update_connection(self)

View File

@ -220,12 +220,12 @@ class ReplicationStreamer(object):
self.federation_sender.federation_ack(token) self.federation_sender.federation_ack(token)
@measure_func("repl.on_user_sync") @measure_func("repl.on_user_sync")
def on_user_sync(self, conn_id, user_id, is_syncing): def on_user_sync(self, conn_id, user_id, is_syncing, last_sync_ms):
"""A client has started/stopped syncing on a worker. """A client has started/stopped syncing on a worker.
""" """
user_sync_counter.inc() user_sync_counter.inc()
self.presence_handler.update_external_syncs_row( self.presence_handler.update_external_syncs_row(
conn_id, user_id, is_syncing conn_id, user_id, is_syncing, last_sync_ms,
) )
@measure_func("repl.on_remove_pusher") @measure_func("repl.on_remove_pusher")