Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes

This commit is contained in:
Erik Johnston 2017-02-02 09:30:20 +00:00
commit f57cb21952
6 changed files with 65 additions and 10 deletions

View file

@ -504,7 +504,7 @@ class TransactionQueue(object):
code = e.code code = e.code
response = e.response response = e.response
if e.code == 429 or 500 <= e.code: if e.code in (401, 404, 429) or 500 <= e.code:
logger.info( logger.info(
"TX [%s] {%s} got %d response", "TX [%s] {%s} got %d response",
destination, txn_id, code destination, txn_id, code

View file

@ -14,6 +14,7 @@
# limitations under the License. # limitations under the License.
from synapse.api import errors from synapse.api import errors
from synapse.api.constants import EventTypes
from synapse.util import stringutils from synapse.util import stringutils
from synapse.util.async import Linearizer from synapse.util.async import Linearizer
from synapse.types import get_domain_from_id from synapse.types import get_domain_from_id
@ -221,15 +222,52 @@ class DeviceHandler(BaseHandler):
self.federation_sender.send_device_messages(host) self.federation_sender.send_device_messages(host)
@defer.inlineCallbacks @defer.inlineCallbacks
def get_user_ids_changed(self, user_id, from_device_key): def get_user_ids_changed(self, user_id, from_token):
"""Get list of users that have had the devices updated, or have newly
joined a room, that `user_id` may be interested in.
Args:
user_id (str)
from_token (StreamToken)
"""
rooms = yield self.store.get_rooms_for_user(user_id) rooms = yield self.store.get_rooms_for_user(user_id)
room_ids = set(r.room_id for r in rooms) room_ids = set(r.room_id for r in rooms)
user_ids_changed = set() # First we check if any devices have changed
changed = yield self.store.get_user_whose_devices_changed( changed = yield self.store.get_user_whose_devices_changed(
from_device_key from_token.device_list_key
) )
for other_user_id in changed:
# Then work out if any users have since joined
rooms_changed = self.store.get_rooms_that_changed(room_ids, from_token.room_key)
possibly_changed = set(changed)
for room_id in rooms_changed:
# Fetch (an approximation) of the current state at the time.
event_rows, token = yield self.store.get_recent_event_ids_for_room(
room_id, end_token=from_token.room_key, limit=1,
)
if event_rows:
last_event_id = event_rows[-1]["event_id"]
prev_state_ids = yield self.store.get_state_ids_for_event(last_event_id)
else:
prev_state_ids = {}
current_state_ids = yield self.state.get_current_state_ids(room_id)
# If there has been any change in membership, include them in the
# possibly changed list. We'll check if they are joined below,
# and we're not toooo worried about spuriously adding users.
for key, event_id in current_state_ids.iteritems():
etype, state_key = key
if etype == EventTypes.Member:
prev_event_id = prev_state_ids.get(key, None)
if not prev_event_id or prev_event_id != event_id:
possibly_changed.add(state_key)
user_ids_changed = set()
for other_user_id in possibly_changed:
other_rooms = yield self.store.get_rooms_for_user(other_user_id) other_rooms = yield self.store.get_rooms_for_user(other_user_id)
if room_ids.intersection(e.room_id for e in other_rooms): if room_ids.intersection(e.room_id for e in other_rooms):
user_ids_changed.add(other_user_id) user_ids_changed.add(other_user_id)

View file

@ -130,7 +130,8 @@ class SyncResult(collections.namedtuple("SyncResult", [
self.invited or self.invited or
self.archived or self.archived or
self.account_data or self.account_data or
self.to_device self.to_device or
self.device_lists
) )

View file

@ -189,7 +189,7 @@ class KeyChangesServlet(RestServlet):
user_id = requester.user.to_string() user_id = requester.user.to_string()
changed = yield self.device_handler.get_user_ids_changed( changed = yield self.device_handler.get_user_ids_changed(
user_id, from_token.device_list_key, user_id, from_token,
) )
defer.returnValue((200, { defer.returnValue((200, {

View file

@ -244,6 +244,20 @@ class StreamStore(SQLBaseStore):
defer.returnValue(results) defer.returnValue(results)
def get_rooms_that_changed(self, room_ids, from_key):
"""Given a list of rooms and a token, return rooms where there may have
been changes.
Args:
room_ids (list)
from_key (str): The room_key portion of a StreamToken
"""
from_key = RoomStreamToken.parse_stream_token(from_key).stream
return set(
room_id for room_id in room_ids
if self._events_stream_cache.has_entity_changed(room_id, from_key)
)
@defer.inlineCallbacks @defer.inlineCallbacks
def get_room_events_stream_for_room(self, room_id, from_key, to_key, limit=0, def get_room_events_stream_for_room(self, room_id, from_key, to_key, limit=0,
order='DESC'): order='DESC'):

View file

@ -129,11 +129,13 @@ class RetryDestinationLimiter(object):
# APIs may expect to never received e.g. a 404. It's important to # APIs may expect to never received e.g. a 404. It's important to
# handle 404 as some remote servers will return a 404 when the HS # handle 404 as some remote servers will return a 404 when the HS
# has been decommissioned. # has been decommissioned.
# If we get a 401, then we should probably back off since they
# won't accept our requests for at least a while.
# 429 is us being aggresively rate limited, so lets rate limit
# ourselves.
if exc_val.code == 404 and self.backoff_on_404: if exc_val.code == 404 and self.backoff_on_404:
valid_err_code = False valid_err_code = False
elif exc_val.code == 429: elif exc_val.code in (401, 429):
# 429 is us being aggresively rate limited, so lets rate limit
# ourselves.
valid_err_code = False valid_err_code = False
elif exc_val.code < 500: elif exc_val.code < 500:
valid_err_code = True valid_err_code = True