Run Black. (#5482)

This commit is contained in:
Amber Brown 2019-06-20 19:32:02 +10:00 committed by GitHub
parent 7dcf984075
commit 32e7c9e7f2
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
376 changed files with 9142 additions and 10388 deletions

View file

@ -50,16 +50,20 @@ logger = logging.getLogger(__name__)
notified_presence_counter = Counter("synapse_handler_presence_notified_presence", "")
federation_presence_out_counter = Counter(
"synapse_handler_presence_federation_presence_out", "")
"synapse_handler_presence_federation_presence_out", ""
)
presence_updates_counter = Counter("synapse_handler_presence_presence_updates", "")
timers_fired_counter = Counter("synapse_handler_presence_timers_fired", "")
federation_presence_counter = Counter("synapse_handler_presence_federation_presence", "")
federation_presence_counter = Counter(
"synapse_handler_presence_federation_presence", ""
)
bump_active_time_counter = Counter("synapse_handler_presence_bump_active_time", "")
get_updates_counter = Counter("synapse_handler_presence_get_updates", "", ["type"])
notify_reason_counter = Counter(
"synapse_handler_presence_notify_reason", "", ["reason"])
"synapse_handler_presence_notify_reason", "", ["reason"]
)
state_transition_counter = Counter(
"synapse_handler_presence_state_transition", "", ["from", "to"]
)
@ -90,7 +94,6 @@ assert LAST_ACTIVE_GRANULARITY < IDLE_TIMER
class PresenceHandler(object):
def __init__(self, hs):
"""
@ -110,31 +113,26 @@ class PresenceHandler(object):
federation_registry = hs.get_federation_registry()
federation_registry.register_edu_handler(
"m.presence", self.incoming_presence
)
federation_registry.register_edu_handler("m.presence", self.incoming_presence)
active_presence = self.store.take_presence_startup_info()
# A dictionary of the current state of users. This is prefilled with
# non-offline presence from the DB. We should fetch from the DB if
# we can't find a users presence in here.
self.user_to_current_state = {
state.user_id: state
for state in active_presence
}
self.user_to_current_state = {state.user_id: state for state in active_presence}
LaterGauge(
"synapse_handlers_presence_user_to_current_state_size", "", [],
lambda: len(self.user_to_current_state)
"synapse_handlers_presence_user_to_current_state_size",
"",
[],
lambda: len(self.user_to_current_state),
)
now = self.clock.time_msec()
for state in active_presence:
self.wheel_timer.insert(
now=now,
obj=state.user_id,
then=state.last_active_ts + IDLE_TIMER,
now=now, obj=state.user_id, then=state.last_active_ts + IDLE_TIMER
)
self.wheel_timer.insert(
now=now,
@ -193,27 +191,21 @@ class PresenceHandler(object):
"handle_presence_timeouts", self._handle_timeouts
)
self.clock.call_later(
30,
self.clock.looping_call,
run_timeout_handler,
5000,
)
self.clock.call_later(30, self.clock.looping_call, run_timeout_handler, 5000)
def run_persister():
return run_as_background_process(
"persist_presence_changes", self._persist_unpersisted_changes
)
self.clock.call_later(
60,
self.clock.looping_call,
run_persister,
60 * 1000,
)
self.clock.call_later(60, self.clock.looping_call, run_persister, 60 * 1000)
LaterGauge("synapse_handlers_presence_wheel_timer_size", "", [],
lambda: len(self.wheel_timer))
LaterGauge(
"synapse_handlers_presence_wheel_timer_size",
"",
[],
lambda: len(self.wheel_timer),
)
# Used to handle sending of presence to newly joined users/servers
if hs.config.use_presence:
@ -241,15 +233,17 @@ class PresenceHandler(object):
logger.info(
"Performing _on_shutdown. Persisting %d unpersisted changes",
len(self.user_to_current_state)
len(self.user_to_current_state),
)
if self.unpersisted_users_changes:
yield self.store.update_presence([
self.user_to_current_state[user_id]
for user_id in self.unpersisted_users_changes
])
yield self.store.update_presence(
[
self.user_to_current_state[user_id]
for user_id in self.unpersisted_users_changes
]
)
logger.info("Finished _on_shutdown")
@defer.inlineCallbacks
@ -261,13 +255,10 @@ class PresenceHandler(object):
self.unpersisted_users_changes = set()
if unpersisted:
logger.info(
"Persisting %d upersisted presence updates", len(unpersisted)
logger.info("Persisting %d upersisted presence updates", len(unpersisted))
yield self.store.update_presence(
[self.user_to_current_state[user_id] for user_id in unpersisted]
)
yield self.store.update_presence([
self.user_to_current_state[user_id]
for user_id in unpersisted
])
@defer.inlineCallbacks
def _update_states(self, new_states):
@ -303,10 +294,11 @@ class PresenceHandler(object):
)
new_state, should_notify, should_ping = handle_update(
prev_state, new_state,
prev_state,
new_state,
is_mine=self.is_mine_id(user_id),
wheel_timer=self.wheel_timer,
now=now
now=now,
)
self.user_to_current_state[user_id] = new_state
@ -328,7 +320,8 @@ class PresenceHandler(object):
self.unpersisted_users_changes -= set(to_notify.keys())
to_federation_ping = {
user_id: state for user_id, state in to_federation_ping.items()
user_id: state
for user_id, state in to_federation_ping.items()
if user_id not in to_notify
}
if to_federation_ping:
@ -351,8 +344,8 @@ class PresenceHandler(object):
# Check whether the lists of syncing processes from an external
# process have expired.
expired_process_ids = [
process_id for process_id, last_update
in self.external_process_last_updated_ms.items()
process_id
for process_id, last_update in self.external_process_last_updated_ms.items()
if now - last_update > EXTERNAL_PROCESS_EXPIRY
]
for process_id in expired_process_ids:
@ -362,9 +355,7 @@ class PresenceHandler(object):
self.external_process_last_update.pop(process_id)
states = [
self.user_to_current_state.get(
user_id, UserPresenceState.default(user_id)
)
self.user_to_current_state.get(user_id, UserPresenceState.default(user_id))
for user_id in users_to_check
]
@ -394,9 +385,7 @@ class PresenceHandler(object):
prev_state = yield self.current_state_for_user(user_id)
new_fields = {
"last_active_ts": self.clock.time_msec(),
}
new_fields = {"last_active_ts": self.clock.time_msec()}
if prev_state.state == PresenceState.UNAVAILABLE:
new_fields["state"] = PresenceState.ONLINE
@ -430,15 +419,23 @@ class PresenceHandler(object):
if prev_state.state == PresenceState.OFFLINE:
# If they're currently offline then bring them online, otherwise
# just update the last sync times.
yield self._update_states([prev_state.copy_and_replace(
state=PresenceState.ONLINE,
last_active_ts=self.clock.time_msec(),
last_user_sync_ts=self.clock.time_msec(),
)])
yield self._update_states(
[
prev_state.copy_and_replace(
state=PresenceState.ONLINE,
last_active_ts=self.clock.time_msec(),
last_user_sync_ts=self.clock.time_msec(),
)
]
)
else:
yield self._update_states([prev_state.copy_and_replace(
last_user_sync_ts=self.clock.time_msec(),
)])
yield self._update_states(
[
prev_state.copy_and_replace(
last_user_sync_ts=self.clock.time_msec()
)
]
)
@defer.inlineCallbacks
def _end():
@ -446,9 +443,13 @@ class PresenceHandler(object):
self.user_to_num_current_syncs[user_id] -= 1
prev_state = yield self.current_state_for_user(user_id)
yield self._update_states([prev_state.copy_and_replace(
last_user_sync_ts=self.clock.time_msec(),
)])
yield self._update_states(
[
prev_state.copy_and_replace(
last_user_sync_ts=self.clock.time_msec()
)
]
)
except Exception:
logger.exception("Error updating presence after sync")
@ -469,7 +470,8 @@ class PresenceHandler(object):
"""
if self.hs.config.use_presence:
syncing_user_ids = {
user_id for user_id, count in self.user_to_num_current_syncs.items()
user_id
for user_id, count in self.user_to_num_current_syncs.items()
if count
}
for user_ids in self.external_process_to_current_syncs.values():
@ -479,7 +481,9 @@ class PresenceHandler(object):
return set()
@defer.inlineCallbacks
def update_external_syncs_row(self, process_id, user_id, is_syncing, sync_time_msec):
def update_external_syncs_row(
self, process_id, user_id, is_syncing, sync_time_msec
):
"""Update the syncing users for an external process as a delta.
Args:
@ -500,20 +504,22 @@ class PresenceHandler(object):
updates = []
if is_syncing and user_id not in process_presence:
if prev_state.state == PresenceState.OFFLINE:
updates.append(prev_state.copy_and_replace(
state=PresenceState.ONLINE,
last_active_ts=sync_time_msec,
last_user_sync_ts=sync_time_msec,
))
updates.append(
prev_state.copy_and_replace(
state=PresenceState.ONLINE,
last_active_ts=sync_time_msec,
last_user_sync_ts=sync_time_msec,
)
)
else:
updates.append(prev_state.copy_and_replace(
last_user_sync_ts=sync_time_msec,
))
updates.append(
prev_state.copy_and_replace(last_user_sync_ts=sync_time_msec)
)
process_presence.add(user_id)
elif user_id in process_presence:
updates.append(prev_state.copy_and_replace(
last_user_sync_ts=sync_time_msec,
))
updates.append(
prev_state.copy_and_replace(last_user_sync_ts=sync_time_msec)
)
if not is_syncing:
process_presence.discard(user_id)
@ -537,12 +543,12 @@ class PresenceHandler(object):
prev_states = yield self.current_state_for_users(process_presence)
time_now_ms = self.clock.time_msec()
yield self._update_states([
prev_state.copy_and_replace(
last_user_sync_ts=time_now_ms,
)
for prev_state in itervalues(prev_states)
])
yield self._update_states(
[
prev_state.copy_and_replace(last_user_sync_ts=time_now_ms)
for prev_state in itervalues(prev_states)
]
)
self.external_process_last_updated_ms.pop(process_id, None)
@defer.inlineCallbacks
@ -574,8 +580,7 @@ class PresenceHandler(object):
missing = [user_id for user_id, state in iteritems(states) if not state]
if missing:
new = {
user_id: UserPresenceState.default(user_id)
for user_id in missing
user_id: UserPresenceState.default(user_id) for user_id in missing
}
states.update(new)
self.user_to_current_state.update(new)
@ -593,8 +598,10 @@ class PresenceHandler(object):
room_ids_to_states, users_to_states = parties
self.notifier.on_new_event(
"presence_key", stream_id, rooms=room_ids_to_states.keys(),
users=[UserID.from_string(u) for u in users_to_states]
"presence_key",
stream_id,
rooms=room_ids_to_states.keys(),
users=[UserID.from_string(u) for u in users_to_states],
)
self._push_to_remotes(states)
@ -605,8 +612,10 @@ class PresenceHandler(object):
room_ids_to_states, users_to_states = parties
self.notifier.on_new_event(
"presence_key", stream_id, rooms=room_ids_to_states.keys(),
users=[UserID.from_string(u) for u in users_to_states]
"presence_key",
stream_id,
rooms=room_ids_to_states.keys(),
users=[UserID.from_string(u) for u in users_to_states],
)
def _push_to_remotes(self, states):
@ -631,15 +640,15 @@ class PresenceHandler(object):
user_id = push.get("user_id", None)
if not user_id:
logger.info(
"Got presence update from %r with no 'user_id': %r",
origin, push,
"Got presence update from %r with no 'user_id': %r", origin, push
)
continue
if get_domain_from_id(user_id) != origin:
logger.info(
"Got presence update from %r with bad 'user_id': %r",
origin, user_id,
origin,
user_id,
)
continue
@ -647,14 +656,12 @@ class PresenceHandler(object):
if not presence_state:
logger.info(
"Got presence update from %r with no 'presence_state': %r",
origin, push,
origin,
push,
)
continue
new_fields = {
"state": presence_state,
"last_federation_update_ts": now,
}
new_fields = {"state": presence_state, "last_federation_update_ts": now}
last_active_ago = push.get("last_active_ago", None)
if last_active_ago is not None:
@ -672,10 +679,7 @@ class PresenceHandler(object):
@defer.inlineCallbacks
def get_state(self, target_user, as_event=False):
results = yield self.get_states(
[target_user.to_string()],
as_event=as_event,
)
results = yield self.get_states([target_user.to_string()], as_event=as_event)
defer.returnValue(results[0])
@ -699,13 +703,15 @@ class PresenceHandler(object):
now = self.clock.time_msec()
if as_event:
defer.returnValue([
{
"type": "m.presence",
"content": format_user_presence_state(state, now),
}
for state in updates
])
defer.returnValue(
[
{
"type": "m.presence",
"content": format_user_presence_state(state, now),
}
for state in updates
]
)
else:
defer.returnValue(updates)
@ -717,7 +723,9 @@ class PresenceHandler(object):
presence = state["presence"]
valid_presence = (
PresenceState.ONLINE, PresenceState.UNAVAILABLE, PresenceState.OFFLINE
PresenceState.ONLINE,
PresenceState.UNAVAILABLE,
PresenceState.OFFLINE,
)
if presence not in valid_presence:
raise SynapseError(400, "Invalid presence state")
@ -726,9 +734,7 @@ class PresenceHandler(object):
prev_state = yield self.current_state_for_user(user_id)
new_fields = {
"state": presence
}
new_fields = {"state": presence}
if not ignore_status_msg:
msg = status_msg if presence != PresenceState.OFFLINE else None
@ -877,8 +883,7 @@ class PresenceHandler(object):
hosts = set(host for host in hosts if host != self.server_name)
self.federation.send_presence_to_destinations(
states=[state],
destinations=hosts,
states=[state], destinations=hosts
)
else:
# A remote user has joined the room, so we need to:
@ -904,7 +909,8 @@ class PresenceHandler(object):
# default state.
now = self.clock.time_msec()
states = [
state for state in states.values()
state
for state in states.values()
if state.state != PresenceState.OFFLINE
or now - state.last_active_ts < 7 * 24 * 60 * 60 * 1000
or state.status_msg is not None
@ -912,8 +918,7 @@ class PresenceHandler(object):
if states:
self.federation.send_presence_to_destinations(
states=states,
destinations=[get_domain_from_id(user_id)],
states=states, destinations=[get_domain_from_id(user_id)]
)
@ -937,7 +942,10 @@ def should_notify(old_state, new_state):
notify_reason_counter.labels("current_active_change").inc()
return True
if new_state.last_active_ts - old_state.last_active_ts > LAST_ACTIVE_GRANULARITY:
if (
new_state.last_active_ts - old_state.last_active_ts
> LAST_ACTIVE_GRANULARITY
):
# Only notify about last active bumps if we're not currently acive
if not new_state.currently_active:
notify_reason_counter.labels("last_active_change_online").inc()
@ -958,9 +966,7 @@ def format_user_presence_state(state, now, include_user_id=True):
The "user_id" is optional so that this function can be used to format presence
updates for client /sync responses and for federation /send requests.
"""
content = {
"presence": state.state,
}
content = {"presence": state.state}
if include_user_id:
content["user_id"] = state.user_id
if state.last_active_ts:
@ -986,8 +992,15 @@ class PresenceEventSource(object):
@defer.inlineCallbacks
@log_function
def get_new_events(self, user, from_key, room_ids=None, include_offline=True,
explicit_room_id=None, **kwargs):
def get_new_events(
self,
user,
from_key,
room_ids=None,
include_offline=True,
explicit_room_id=None,
**kwargs
):
# The process for getting presence events are:
# 1. Get the rooms the user is in.
# 2. Get the list of user in the rooms.
@ -1030,7 +1043,7 @@ class PresenceEventSource(object):
if from_key:
user_ids_changed = stream_change_cache.get_entities_changed(
users_interested_in, from_key,
users_interested_in, from_key
)
else:
user_ids_changed = users_interested_in
@ -1040,10 +1053,16 @@ class PresenceEventSource(object):
if include_offline:
defer.returnValue((list(updates.values()), max_token))
else:
defer.returnValue(([
s for s in itervalues(updates)
if s.state != PresenceState.OFFLINE
], max_token))
defer.returnValue(
(
[
s
for s in itervalues(updates)
if s.state != PresenceState.OFFLINE
],
max_token,
)
)
def get_current_key(self):
return self.store.get_current_presence_token()
@ -1061,13 +1080,13 @@ class PresenceEventSource(object):
users_interested_in.add(user_id) # So that we receive our own presence
users_who_share_room = yield self.store.get_users_who_share_room_with_user(
user_id, on_invalidate=cache_context.invalidate,
user_id, on_invalidate=cache_context.invalidate
)
users_interested_in.update(users_who_share_room)
if explicit_room_id:
user_ids = yield self.store.get_users_in_room(
explicit_room_id, on_invalidate=cache_context.invalidate,
explicit_room_id, on_invalidate=cache_context.invalidate
)
users_interested_in.update(user_ids)
@ -1123,9 +1142,7 @@ def handle_timeout(state, is_mine, syncing_user_ids, now):
if now - state.last_active_ts > IDLE_TIMER:
# Currently online, but last activity ages ago so auto
# idle
state = state.copy_and_replace(
state=PresenceState.UNAVAILABLE,
)
state = state.copy_and_replace(state=PresenceState.UNAVAILABLE)
changed = True
elif now - state.last_active_ts > LAST_ACTIVE_GRANULARITY:
# So that we send down a notification that we've
@ -1145,8 +1162,7 @@ def handle_timeout(state, is_mine, syncing_user_ids, now):
sync_or_active = max(state.last_user_sync_ts, state.last_active_ts)
if now - sync_or_active > SYNC_ONLINE_TIMEOUT:
state = state.copy_and_replace(
state=PresenceState.OFFLINE,
status_msg=None,
state=PresenceState.OFFLINE, status_msg=None
)
changed = True
else:
@ -1155,10 +1171,7 @@ def handle_timeout(state, is_mine, syncing_user_ids, now):
# no one gets stuck online forever.
if now - state.last_federation_update_ts > FEDERATION_TIMEOUT:
# The other side seems to have disappeared.
state = state.copy_and_replace(
state=PresenceState.OFFLINE,
status_msg=None,
)
state = state.copy_and_replace(state=PresenceState.OFFLINE, status_msg=None)
changed = True
return state if changed else None
@ -1193,21 +1206,17 @@ def handle_update(prev_state, new_state, is_mine, wheel_timer, now):
if new_state.state == PresenceState.ONLINE:
# Idle timer
wheel_timer.insert(
now=now,
obj=user_id,
then=new_state.last_active_ts + IDLE_TIMER
now=now, obj=user_id, then=new_state.last_active_ts + IDLE_TIMER
)
active = now - new_state.last_active_ts < LAST_ACTIVE_GRANULARITY
new_state = new_state.copy_and_replace(
currently_active=active,
)
new_state = new_state.copy_and_replace(currently_active=active)
if active:
wheel_timer.insert(
now=now,
obj=user_id,
then=new_state.last_active_ts + LAST_ACTIVE_GRANULARITY
then=new_state.last_active_ts + LAST_ACTIVE_GRANULARITY,
)
if new_state.state != PresenceState.OFFLINE:
@ -1215,29 +1224,25 @@ def handle_update(prev_state, new_state, is_mine, wheel_timer, now):
wheel_timer.insert(
now=now,
obj=user_id,
then=new_state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT
then=new_state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT,
)
last_federate = new_state.last_federation_update_ts
if now - last_federate > FEDERATION_PING_INTERVAL:
# Been a while since we've poked remote servers
new_state = new_state.copy_and_replace(
last_federation_update_ts=now,
)
new_state = new_state.copy_and_replace(last_federation_update_ts=now)
federation_ping = True
else:
wheel_timer.insert(
now=now,
obj=user_id,
then=new_state.last_federation_update_ts + FEDERATION_TIMEOUT
then=new_state.last_federation_update_ts + FEDERATION_TIMEOUT,
)
# Check whether the change was something worth notifying about
if should_notify(prev_state, new_state):
new_state = new_state.copy_and_replace(
last_federation_update_ts=now,
)
new_state = new_state.copy_and_replace(last_federation_update_ts=now)
persist_and_notify = True
return new_state, persist_and_notify, federation_ping