Add paranoia checks to make sure that we evict stale NotificationListeners when we are about to process them

This commit is contained in:
Erik Johnston 2015-04-08 13:31:06 +01:00
parent 07d4041709
commit 65f5e4e3e4

View File

@ -62,7 +62,8 @@ class _NotificationListener(object):
self.rooms = rooms self.rooms = rooms
self.pending_notifications = [] def notified(self):
return self.deferred.called
def notify(self, notifier, events, start_token, end_token): def notify(self, notifier, events, start_token, end_token):
""" Inform whoever is listening about the new events. This will """ Inform whoever is listening about the new events. This will
@ -78,11 +79,15 @@ class _NotificationListener(object):
except defer.AlreadyCalledError: except defer.AlreadyCalledError:
pass pass
# Should the following be done be using intrusively linked lists?
# -- erikj
for room in self.rooms: for room in self.rooms:
lst = notifier.room_to_listeners.get(room, set()) lst = notifier.room_to_listeners.get(room, set())
lst.discard(self) lst.discard(self)
notifier.user_to_listeners.get(self.user, set()).discard(self) notifier.user_to_listeners.get(self.user, set()).discard(self)
if self.appservice: if self.appservice:
notifier.appservice_to_listeners.get( notifier.appservice_to_listeners.get(
self.appservice, set() self.appservice, set()
@ -161,10 +166,24 @@ class Notifier(object):
room_source = self.event_sources.sources["room"] room_source = self.event_sources.sources["room"]
listeners = self.room_to_listeners.get(room_id, set()).copy() room_listeners = self.room_to_listeners.get(room_id, set())
# Remove any 'stale' listeners.
for l in room_listeners.copy():
if l.notified():
room_listeners.discard(l)
listeners = room_listeners.copy()
for user in extra_users: for user in extra_users:
listeners |= self.user_to_listeners.get(user, set()).copy() user_listeners = self.user_to_listeners.get(user, set())
# Remove any 'stale' listeners.
for l in user_listeners.copy():
if l.notified():
user_listeners.discard(l)
listeners |= user_listeners
for appservice in self.appservice_to_listeners: for appservice in self.appservice_to_listeners:
# TODO (kegan): Redundant appservice listener checks? # TODO (kegan): Redundant appservice listener checks?
@ -173,9 +192,16 @@ class Notifier(object):
# receive *invites* for users they are interested in. Does this # receive *invites* for users they are interested in. Does this
# make the room_to_listeners check somewhat obselete? # make the room_to_listeners check somewhat obselete?
if appservice.is_interested(event): if appservice.is_interested(event):
listeners |= self.appservice_to_listeners.get( app_listeners = self.appservice_to_listeners.get(
appservice, set() appservice, set()
).copy() )
# Remove any 'stale' listeners.
for l in app_listeners.copy():
if l.notified():
app_listeners.discard(l)
listeners |= app_listeners
logger.debug("on_new_room_event listeners %s", listeners) logger.debug("on_new_room_event listeners %s", listeners)