Add m.room.deletion. If an event is deleted it will be returned to clients 'pruned', i.e. all client specified keys will be removed.

This commit is contained in:
Erik Johnston 2014-09-23 15:28:32 +01:00
parent 231afe464a
commit 78af6bbb98
9 changed files with 144 additions and 21 deletions

View file

@ -24,6 +24,7 @@ from synapse.api.events.room import (
RoomAddStateLevelEvent,
RoomSendEventLevelEvent,
RoomOpsPowerLevelsEvent,
RoomDeletionEvent,
)
from synapse.util.logutils import log_function
@ -61,7 +62,7 @@ SCHEMAS = [
# Remember to update this number every time an incompatible change is made to
# database schema files, so the users will be informed on server restarts.
SCHEMA_VERSION = 3
SCHEMA_VERSION = 4
class _RollbackButIsFineException(Exception):
@ -182,6 +183,8 @@ class DataStore(RoomMemberStore, RoomStore,
self._store_send_event_level(txn, event)
elif event.type == RoomOpsPowerLevelsEvent.TYPE:
self._store_ops_level(txn, event)
elif event.type == RoomDeletionEvent.TYPE:
self._store_deletion(txn, event)
vals = {
"topological_ordering": event.depth,
@ -203,7 +206,7 @@ class DataStore(RoomMemberStore, RoomStore,
unrec = {
k: v
for k, v in event.get_full_dict().items()
if k not in vals.keys()
if k not in vals.keys() and k is not "deleted"
}
vals["unrecognized_keys"] = json.dumps(unrec)
@ -241,14 +244,32 @@ class DataStore(RoomMemberStore, RoomStore,
}
)
def _store_deletion(self, txn, event):
event_id = event.event_id
deletes = event.deletes
# We check if this new delete deletes an old delete or has been
# deleted by a previous delete that we received out of order.
sql = "SELECT * FROM deletions WHERE event_id = ? OR deletes = ?"
txn.execute(sql, (deletes, event_id))
if txn.fetchall():
sql = "DELETE FROM deletions WHERE event_id = ? OR deletes = ?"
txn.execute(sql, (deletes, event_id, ))
else:
sql = "INSERT INTO deletions (event_id, deletes) VALUES (?,?)"
txn.execute(sql, (event_id, deletes))
@defer.inlineCallbacks
def get_current_state(self, room_id, event_type=None, state_key=""):
sql = (
"SELECT e.* FROM events as e "
"SELECT e.*, (%(deleted)s) AS deleted FROM events as e "
"INNER JOIN current_state_events as c ON e.event_id = c.event_id "
"INNER JOIN state_events as s ON e.event_id = s.event_id "
"WHERE c.room_id = ? "
)
) % {
"deleted": "e.event_id IN (SELECT deletes FROM deletions)",
}
if event_type:
sql += " AND s.type = ? AND s.state_key = ? "

View file

@ -17,6 +17,7 @@ import logging
from twisted.internet import defer
from synapse.api.errors import StoreError
from synapse.api.events.utils import prune_event
from synapse.util.logutils import log_function
import collections
@ -345,7 +346,7 @@ class SQLBaseStore(object):
return self.runInteraction(func)
def _parse_event_from_row(self, row_dict):
d = copy.deepcopy({k: v for k, v in row_dict.items() if v})
d = copy.deepcopy({k: v for k, v in row_dict.items()})
d.pop("stream_ordering", None)
d.pop("topological_ordering", None)
@ -373,8 +374,8 @@ class SQLBaseStore(object):
sql = "SELECT * FROM events WHERE event_id = ?"
for ev in events:
if hasattr(ev, "prev_state"):
# Load previous state_content.
if hasattr(ev, "prev_state"):
# Load previous state_content.
# TODO: Should we be pulling this out above?
cursor = txn.execute(sql, (ev.prev_state,))
prevs = self.cursor_to_dict(cursor)
@ -382,8 +383,31 @@ class SQLBaseStore(object):
prev = self._parse_event_from_row(prevs[0])
ev.prev_content = prev.content
if not hasattr(ev, "deleted"):
logger.debug("Doesn't have deleted key: %s", ev)
ev.deleted = self._has_been_deleted_txn(txn, ev)
if ev.deleted:
# Get the deletion event.
sql = "SELECT * FROM events WHERE event_id = ?"
txn.execute(sql, (ev.deleted,))
del_evs = self._parse_events_txn(
txn, self.cursor_to_dict(txn)
)
if del_evs:
prune_event(ev)
ev.pruned = del_evs[0]
return events
def _has_been_deleted_txn(self, txn, event):
sql = "SELECT * FROM deletions WHERE deletes = ?"
txn.execute(sql, (event.event_id,))
return len(txn.fetchall()) > 0
class Table(object):
""" A base class used to store information about a particular table.
"""

View file

@ -182,14 +182,21 @@ class RoomMemberStore(SQLBaseStore):
)
def _get_members_query_txn(self, txn, where_clause, where_values):
del_sql = (
"SELECT event_id FROM deletions WHERE deletes = e.event_id"
)
sql = (
"SELECT e.* FROM events as e "
"SELECT e.*, (%(deleted)s) AS deleted FROM events as e "
"INNER JOIN room_memberships as m "
"ON e.event_id = m.event_id "
"INNER JOIN current_state_events as c "
"ON m.event_id = c.event_id "
"WHERE %s "
) % (where_clause,)
"WHERE %(where)s "
) % {
"deleted": del_sql,
"where": where_clause,
}
txn.execute(sql, where_values)
rows = self.cursor_to_dict(txn)

View file

@ -1,5 +1,7 @@
CREATE TABLE IF NOT EXISTS deletions (
event_id TEXT NOT NULL,
deletes TEXT NOT NULL,
CONSTRAINT ev_uniq UNIQUE (event_id)
deletes TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS deletions_event_id ON deletions (event_id);
CREATE INDEX IF NOT EXISTS deletions_deletes ON deletions (deletes);

View file

@ -157,6 +157,10 @@ class StreamStore(SQLBaseStore):
"WHERE m.user_id = ? "
)
del_sql = (
"SELECT event_id FROM deletions WHERE deletes = e.event_id"
)
if limit:
limit = max(limit, MAX_STREAM_SIZE)
else:
@ -171,13 +175,14 @@ class StreamStore(SQLBaseStore):
return
sql = (
"SELECT * FROM events as e WHERE "
"SELECT *, (%(deleted)s) AS deleted FROM events AS e WHERE "
"((room_id IN (%(current)s)) OR "
"(event_id IN (%(invites)s))) "
"AND e.stream_ordering > ? AND e.stream_ordering <= ? "
"AND e.outlier = 0 "
"ORDER BY stream_ordering ASC LIMIT %(limit)d "
) % {
"deleted": del_sql,
"current": current_room_membership_sql,
"invites": membership_sql,
"limit": limit
@ -224,11 +229,20 @@ class StreamStore(SQLBaseStore):
else:
limit_str = ""
del_sql = (
"SELECT event_id FROM deletions WHERE deletes = events.event_id"
)
sql = (
"SELECT * FROM events "
"SELECT *, (%(deleted)s) AS deleted FROM events "
"WHERE outlier = 0 AND room_id = ? AND %(bounds)s "
"ORDER BY topological_ordering %(order)s, stream_ordering %(order)s %(limit)s "
) % {"bounds": bounds, "order": order, "limit": limit_str}
) % {
"deleted": del_sql,
"bounds": bounds,
"order": order,
"limit": limit_str
}
rows = yield self._execute_and_decode(
sql,
@ -257,11 +271,17 @@ class StreamStore(SQLBaseStore):
with_feedback=False):
# TODO (erikj): Handle compressed feedback
del_sql = (
"SELECT event_id FROM deletions WHERE deletes = events.event_id"
)
sql = (
"SELECT * FROM events "
"SELECT *, (%(deleted)s) AS deleted FROM events "
"WHERE room_id = ? AND stream_ordering <= ? "
"ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT ? "
)
) % {
"deleted": del_sql,
}
rows = yield self._execute_and_decode(
sql,