Store invites in a separate table.

This commit is contained in:
Erik Johnston 2016-04-04 15:52:01 +01:00
parent d76d89323c
commit 3d76b7cb2b
5 changed files with 124 additions and 32 deletions

View file

@ -36,7 +36,7 @@ RoomsForUser = namedtuple(
class RoomMemberStore(SQLBaseStore):
def _store_room_members_txn(self, txn, events):
def _store_room_members_txn(self, txn, events, backfilled):
"""Store a room member in the database.
"""
self._simple_insert_many_txn(
@ -62,6 +62,41 @@ class RoomMemberStore(SQLBaseStore):
self._membership_stream_cache.entity_has_changed,
event.state_key, event.internal_metadata.stream_ordering
)
txn.call_after(
self.get_invited_rooms_for_user.invalidate, (event.state_key,)
)
is_mine = self.hs.is_mine_id(event.state_key)
is_new_state = not backfilled and (
not event.internal_metadata.is_outlier()
or event.internal_metadata.is_invite_from_remote()
)
if is_new_state and is_mine:
if event.membership == Membership.INVITE:
self._simple_insert_txn(
txn,
table="invites",
values={
"event_id": event.event_id,
"invitee": event.state_key,
"inviter": event.sender,
"room_id": event.room_id,
"stream_id": event.internal_metadata.stream_ordering,
}
)
else:
sql = (
"UPDATE invites SET stream_id = ?, replaced_by = ? WHERE"
" room_id = ? AND invitee = ? AND locally_rejected is NULL"
" AND replaced_by is NULL"
)
txn.execute(sql, (
event.internal_metadata.stream_ordering,
event.event_id,
event.room_id,
event.state_key,
))
def get_room_member(self, user_id, room_id):
"""Retrieve the current state of a room member.
@ -127,6 +162,14 @@ class RoomMemberStore(SQLBaseStore):
user_id, [Membership.INVITE]
)
@defer.inlineCallbacks
def get_inviter(self, user_id, room_id):
invites = yield self.get_invited_rooms_for_user(user_id)
for invite in invites:
if invite.room_id == room_id:
defer.returnValue(invite)
defer.returnValue(None)
def get_leave_and_ban_events_for_user(self, user_id):
""" Get all the leave events for a user
Args:
@ -163,29 +206,55 @@ class RoomMemberStore(SQLBaseStore):
def _get_rooms_for_user_where_membership_is_txn(self, txn, user_id,
membership_list):
where_clause = "user_id = ? AND (%s) AND forgotten = 0" % (
" OR ".join(["membership = ?" for _ in membership_list]),
)
args = [user_id]
args.extend(membership_list)
do_invite = Membership.INVITE in membership_list
membership_list = [m for m in membership_list if m != Membership.INVITE]
sql = (
"SELECT m.room_id, m.sender, m.membership, m.event_id, e.stream_ordering"
" FROM current_state_events as c"
" INNER JOIN room_memberships as m"
" ON m.event_id = c.event_id"
" INNER JOIN events as e"
" ON e.event_id = c.event_id"
" AND m.room_id = c.room_id"
" AND m.user_id = c.state_key"
" WHERE %s"
) % (where_clause,)
results = []
if membership_list:
where_clause = "user_id = ? AND (%s) AND forgotten = 0" % (
" OR ".join(["membership = ?" for _ in membership_list]),
)
txn.execute(sql, args)
return [
RoomsForUser(**r) for r in self.cursor_to_dict(txn)
]
args = [user_id]
args.extend(membership_list)
sql = (
"SELECT m.room_id, m.sender, m.membership, m.event_id, e.stream_ordering"
" FROM current_state_events as c"
" INNER JOIN room_memberships as m"
" ON m.event_id = c.event_id"
" INNER JOIN events as e"
" ON e.event_id = c.event_id"
" AND m.room_id = c.room_id"
" AND m.user_id = c.state_key"
" WHERE %s"
) % (where_clause,)
txn.execute(sql, args)
results = [
RoomsForUser(**r) for r in self.cursor_to_dict(txn)
]
if do_invite:
sql = (
"SELECT i.room_id, inviter, i.event_id, e.stream_ordering"
" FROM invites as i"
" INNER JOIN events as e USING (event_id)"
" WHERE invitee = ? AND locally_rejected is NULL"
" AND replaced_by is NULL"
)
txn.execute(sql, (user_id,))
results.extend(RoomsForUser(
room_id=r["room_id"],
sender=r["inviter"],
event_id=r["event_id"],
stream_ordering=r["stream_ordering"],
membership=Membership.INVITE,
) for r in self.cursor_to_dict(txn))
return results
@cached(max_entries=5000)
def get_joined_hosts_for_room(self, room_id):