Merge branch 'develop' into application-services

This commit is contained in:
Kegan Dougal 2015-02-05 14:28:03 +00:00
commit 951690e54d
27 changed files with 738 additions and 423 deletions

View file

@ -131,21 +131,144 @@ class DataStore(RoomMemberStore, RoomStore,
pass
@defer.inlineCallbacks
def get_event(self, event_id, allow_none=False):
events = yield self._get_events([event_id])
def get_event(self, event_id, check_redacted=True,
get_prev_content=False, allow_rejected=False,
allow_none=False):
"""Get an event from the database by event_id.
if not events:
if allow_none:
defer.returnValue(None)
else:
raise RuntimeError("Could not find event %s" % (event_id,))
Args:
event_id (str): The event_id of the event to fetch
check_redacted (bool): If True, check if event has been redacted
and redact it.
get_prev_content (bool): If True and event is a state event,
include the previous states content in the unsigned field.
allow_rejected (bool): If True return rejected events.
allow_none (bool): If True, return None if no event found, if
False throw an exception.
defer.returnValue(events[0])
Returns:
Deferred : A FrozenEvent.
"""
event = yield self.runInteraction(
"get_event", self._get_event_txn,
event_id,
check_redacted=check_redacted,
get_prev_content=get_prev_content,
allow_rejected=allow_rejected,
)
if not event and not allow_none:
raise RuntimeError("Could not find event %s" % (event_id,))
defer.returnValue(event)
@log_function
def _persist_event_txn(self, txn, event, context, backfilled,
stream_ordering=None, is_new_state=True,
current_state=None):
# We purposefully do this first since if we include a `current_state`
# key, we *want* to update the `current_state_events` table
if current_state:
txn.execute(
"DELETE FROM current_state_events WHERE room_id = ?",
(event.room_id,)
)
for s in current_state:
self._simple_insert_txn(
txn,
"current_state_events",
{
"event_id": s.event_id,
"room_id": s.room_id,
"type": s.type,
"state_key": s.state_key,
},
or_replace=True,
)
if event.is_state() and is_new_state:
if not backfilled and not context.rejected:
self._simple_insert_txn(
txn,
table="state_forward_extremities",
values={
"event_id": event.event_id,
"room_id": event.room_id,
"type": event.type,
"state_key": event.state_key,
},
or_replace=True,
)
for prev_state_id, _ in event.prev_state:
self._simple_delete_txn(
txn,
table="state_forward_extremities",
keyvalues={
"event_id": prev_state_id,
}
)
outlier = event.internal_metadata.is_outlier()
if not outlier:
self._store_state_groups_txn(txn, event, context)
self._update_min_depth_for_room_txn(
txn,
event.room_id,
event.depth
)
self._handle_prev_events(
txn,
outlier=outlier,
event_id=event.event_id,
prev_events=event.prev_events,
room_id=event.room_id,
)
have_persisted = self._simple_select_one_onecol_txn(
txn,
table="event_json",
keyvalues={"event_id": event.event_id},
retcol="event_id",
allow_none=True,
)
metadata_json = encode_canonical_json(
event.internal_metadata.get_dict()
)
# If we have already persisted this event, we don't need to do any
# more processing.
# The processing above must be done on every call to persist event,
# since they might not have happened on previous calls. For example,
# if we are persisting an event that we had persisted as an outlier,
# but is no longer one.
if have_persisted:
if not outlier:
sql = (
"UPDATE event_json SET internal_metadata = ?"
" WHERE event_id = ?"
)
txn.execute(
sql,
(metadata_json.decode("UTF-8"), event.event_id,)
)
sql = (
"UPDATE events SET outlier = 0"
" WHERE event_id = ?"
)
txn.execute(
sql,
(event.event_id,)
)
return
if event.type == EventTypes.Member:
self._store_room_member_txn(txn, event)
elif event.type == EventTypes.Feedback:
@ -157,8 +280,6 @@ class DataStore(RoomMemberStore, RoomStore,
elif event.type == EventTypes.Redaction:
self._store_redaction(txn, event)
outlier = event.internal_metadata.is_outlier()
event_dict = {
k: v
for k, v in event.get_dict().items()
@ -168,10 +289,6 @@ class DataStore(RoomMemberStore, RoomStore,
]
}
metadata_json = encode_canonical_json(
event.internal_metadata.get_dict()
)
self._simple_insert_txn(
txn,
table="event_json",
@ -227,41 +344,10 @@ class DataStore(RoomMemberStore, RoomStore,
)
raise _RollbackButIsFineException("_persist_event")
self._handle_prev_events(
txn,
outlier=outlier,
event_id=event.event_id,
prev_events=event.prev_events,
room_id=event.room_id,
)
if not outlier:
self._store_state_groups_txn(txn, event, context)
if context.rejected:
self._store_rejections_txn(txn, event.event_id, context.rejected)
if current_state:
txn.execute(
"DELETE FROM current_state_events WHERE room_id = ?",
(event.room_id,)
)
for s in current_state:
self._simple_insert_txn(
txn,
"current_state_events",
{
"event_id": s.event_id,
"room_id": s.room_id,
"type": s.type,
"state_key": s.state_key,
},
or_replace=True,
)
is_state = hasattr(event, "state_key") and event.state_key is not None
if is_state:
if event.is_state():
vals = {
"event_id": event.event_id,
"room_id": event.room_id,
@ -269,6 +355,7 @@ class DataStore(RoomMemberStore, RoomStore,
"state_key": event.state_key,
}
# TODO: How does this work with backfilling?
if hasattr(event, "replaces_state"):
vals["prev_state"] = event.replaces_state
@ -305,28 +392,6 @@ class DataStore(RoomMemberStore, RoomStore,
or_ignore=True,
)
if not backfilled and not context.rejected:
self._simple_insert_txn(
txn,
table="state_forward_extremities",
values={
"event_id": event.event_id,
"room_id": event.room_id,
"type": event.type,
"state_key": event.state_key,
},
or_replace=True,
)
for prev_state_id, _ in event.prev_state:
self._simple_delete_txn(
txn,
table="state_forward_extremities",
keyvalues={
"event_id": prev_state_id,
}
)
for hash_alg, hash_base64 in event.hashes.items():
hash_bytes = decode_base64(hash_base64)
self._store_event_content_hash_txn(
@ -357,13 +422,6 @@ class DataStore(RoomMemberStore, RoomStore,
txn, event.event_id, ref_alg, ref_hash_bytes
)
if not outlier:
self._update_min_depth_for_room_txn(
txn,
event.room_id,
event.depth
)
def _store_redaction(self, txn, event):
txn.execute(
"INSERT OR IGNORE INTO redactions "
@ -480,6 +538,9 @@ class DataStore(RoomMemberStore, RoomStore,
the rejected reason string if we rejected the event, else maps to
None.
"""
if not event_ids:
return defer.succeed({})
def f(txn):
sql = (
"SELECT e.event_id, reason FROM events as e "

View file

@ -29,7 +29,7 @@ class PusherStore(SQLBaseStore):
@defer.inlineCallbacks
def get_pushers_by_app_id_and_pushkey(self, app_id_and_pushkey):
sql = (
"SELECT id, user_name, kind, instance_handle, app_id,"
"SELECT id, user_name, kind, profile_tag, app_id,"
"app_display_name, device_display_name, pushkey, ts, data, "
"last_token, last_success, failing_since "
"FROM pushers "
@ -45,7 +45,7 @@ class PusherStore(SQLBaseStore):
"id": r[0],
"user_name": r[1],
"kind": r[2],
"instance_handle": r[3],
"profile_tag": r[3],
"app_id": r[4],
"app_display_name": r[5],
"device_display_name": r[6],
@ -64,7 +64,7 @@ class PusherStore(SQLBaseStore):
@defer.inlineCallbacks
def get_all_pushers(self):
sql = (
"SELECT id, user_name, kind, instance_handle, app_id,"
"SELECT id, user_name, kind, profile_tag, app_id,"
"app_display_name, device_display_name, pushkey, ts, data, "
"last_token, last_success, failing_since "
"FROM pushers"
@ -77,7 +77,7 @@ class PusherStore(SQLBaseStore):
"id": r[0],
"user_name": r[1],
"kind": r[2],
"instance_handle": r[3],
"profile_tag": r[3],
"app_id": r[4],
"app_display_name": r[5],
"device_display_name": r[6],
@ -94,7 +94,7 @@ class PusherStore(SQLBaseStore):
defer.returnValue(ret)
@defer.inlineCallbacks
def add_pusher(self, user_name, instance_handle, kind, app_id,
def add_pusher(self, user_name, profile_tag, kind, app_id,
app_display_name, device_display_name,
pushkey, pushkey_ts, lang, data):
try:
@ -107,7 +107,7 @@ class PusherStore(SQLBaseStore):
dict(
user_name=user_name,
kind=kind,
instance_handle=instance_handle,
profile_tag=profile_tag,
app_display_name=app_display_name,
device_display_name=device_display_name,
ts=pushkey_ts,
@ -158,7 +158,7 @@ class PushersTable(Table):
"id",
"user_name",
"kind",
"instance_handle",
"profile_tag",
"app_id",
"app_display_name",
"device_display_name",

View file

@ -24,7 +24,7 @@ CREATE TABLE IF NOT EXISTS rejections(
CREATE TABLE IF NOT EXISTS pushers (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_name TEXT NOT NULL,
instance_handle varchar(32) NOT NULL,
profile_tag varchar(32) NOT NULL,
kind varchar(8) NOT NULL,
app_id varchar(64) NOT NULL,
app_display_name varchar(64) NOT NULL,

View file

@ -16,7 +16,7 @@
CREATE TABLE IF NOT EXISTS pushers (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_name TEXT NOT NULL,
instance_handle varchar(32) NOT NULL,
profile_tag varchar(32) NOT NULL,
kind varchar(8) NOT NULL,
app_id varchar(64) NOT NULL,
app_display_name varchar(64) NOT NULL,