Fix bugs in the /keys/changes api

* `get_forward_extremeties_for_room` takes a numeric `stream_ordering`. We were
  passing a `RoomStreamToken`, which meant that it returned the *current*
  extremities, rather than those corresponding to the `from_token`. However:
* `get_state_ids_for_events` required a second ('types') parameter; this meant
  that a `TypeError` was thrown and we ended up acting as though there was *no*
  prev state.
* `get_state_ids_for_events` actually returns a map from event_id to state
  dictionary - just looking up the state keys in it again meant that we acted
  as though there was no prev state. We now check if each member's state has
  changed since *any* of the extremities.

Also add/fix some comments.
This commit is contained in:
Richard van der Hoff 2017-02-14 13:59:50 +00:00
parent d7457c7661
commit fc2f29c1d0
3 changed files with 58 additions and 11 deletions

View File

@ -12,7 +12,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from synapse.api import errors from synapse.api import errors
from synapse.api.constants import EventTypes from synapse.api.constants import EventTypes
from synapse.util import stringutils from synapse.util import stringutils
@ -246,30 +245,51 @@ class DeviceHandler(BaseHandler):
# Then work out if any users have since joined # Then work out if any users have since joined
rooms_changed = self.store.get_rooms_that_changed(room_ids, from_token.room_key) rooms_changed = self.store.get_rooms_that_changed(room_ids, from_token.room_key)
stream_ordering = RoomStreamToken.parse_stream_token(
from_token.room_key).stream
possibly_changed = set(changed) possibly_changed = set(changed)
for room_id in rooms_changed: for room_id in rooms_changed:
# Fetch the current state at the time. # Fetch the current state at the time.
stream_ordering = RoomStreamToken.parse_stream_token(from_token.room_key)
try: try:
event_ids = yield self.store.get_forward_extremeties_for_room( event_ids = yield self.store.get_forward_extremeties_for_room(
room_id, stream_ordering=stream_ordering room_id, stream_ordering=stream_ordering
) )
prev_state_ids = yield self.store.get_state_ids_for_events(event_ids) except errors.StoreError:
except: # we have purged the stream_ordering index since the stream
prev_state_ids = {} # ordering: treat it the same as a new room
event_ids = []
current_state_ids = yield self.state.get_current_state_ids(room_id) current_state_ids = yield self.state.get_current_state_ids(room_id)
# special-case for an empty prev state: include all members
# in the changed list
if not event_ids:
for key, event_id in current_state_ids.iteritems():
etype, state_key = key
if etype != EventTypes.Member:
continue
possibly_changed.add(state_key)
continue
# mapping from event_id -> state_dict
prev_state_ids = yield self.store.get_state_ids_for_events(event_ids)
# If there has been any change in membership, include them in the # If there has been any change in membership, include them in the
# possibly changed list. We'll check if they are joined below, # possibly changed list. We'll check if they are joined below,
# and we're not toooo worried about spuriously adding users. # and we're not toooo worried about spuriously adding users.
for key, event_id in current_state_ids.iteritems(): for key, event_id in current_state_ids.iteritems():
etype, state_key = key etype, state_key = key
if etype == EventTypes.Member: if etype != EventTypes.Member:
prev_event_id = prev_state_ids.get(key, None) continue
# check if this member has changed since any of the extremities
# at the stream_ordering, and add them to the list if so.
for state_dict in prev_state_ids.values():
prev_event_id = state_dict.get(key, None)
if not prev_event_id or prev_event_id != event_id: if not prev_event_id or prev_event_id != event_id:
possibly_changed.add(state_key) possibly_changed.add(state_key)
break
users_who_share_room = yield self.store.get_users_who_share_room_with_user( users_who_share_room = yield self.store.get_users_who_share_room_with_user(
user_id user_id

View File

@ -281,15 +281,30 @@ class EventFederationStore(SQLBaseStore):
) )
def get_forward_extremeties_for_room(self, room_id, stream_ordering): def get_forward_extremeties_for_room(self, room_id, stream_ordering):
"""For a given room_id and stream_ordering, return the forward
extremeties of the room at that point in "time".
Throws a StoreError if we have since purged the index for
stream_orderings from that point.
Args:
room_id (str):
stream_ordering (int):
Returns:
deferred, which resolves to a list of event_ids
"""
# We want to make the cache more effective, so we clamp to the last # We want to make the cache more effective, so we clamp to the last
# change before the given ordering. # change before the given ordering.
last_change = self._events_stream_cache.get_max_pos_of_last_change(room_id) last_change = self._events_stream_cache.get_max_pos_of_last_change(room_id)
# We don't always have a full stream_to_exterm_id table, e.g. after # We don't always have a full stream_to_exterm_id table, e.g. after
# the upgrade that introduced it, so we make sure we never ask for a # the upgrade that introduced it, so we make sure we never ask for a
# try and pin to a stream_ordering from before a restart # stream_ordering from before a restart
last_change = max(self._stream_order_on_start, last_change) last_change = max(self._stream_order_on_start, last_change)
# provided the last_change is recent enough, we now clamp the requested
# stream_ordering to it.
if last_change > self.stream_ordering_month_ago: if last_change > self.stream_ordering_month_ago:
stream_ordering = min(last_change, stream_ordering) stream_ordering = min(last_change, stream_ordering)

View File

@ -413,7 +413,19 @@ class StateStore(SQLBaseStore):
defer.returnValue({event: event_to_state[event] for event in event_ids}) defer.returnValue({event: event_to_state[event] for event in event_ids})
@defer.inlineCallbacks @defer.inlineCallbacks
def get_state_ids_for_events(self, event_ids, types): def get_state_ids_for_events(self, event_ids, types=None):
"""
Get the state dicts corresponding to a list of events
Args:
event_ids(list(str)): events whose state should be returned
types(list[(str, str)]|None): List of (type, state_key) tuples
which are used to filter the state fetched. May be None, which
matches any key
Returns:
A deferred dict from event_id -> (type, state_key) -> state_event
"""
event_to_groups = yield self._get_state_group_for_events( event_to_groups = yield self._get_state_group_for_events(
event_ids, event_ids,
) )