Merge branch 'develop' into pushers

This commit is contained in:
David Baker 2014-12-02 13:50:05 +00:00
commit 7642d95d5e
44 changed files with 6670 additions and 255 deletions

View file

@ -69,7 +69,7 @@ SCHEMAS = [
# Remember to update this number every time an incompatible change is made to
# database schema files, so the users will be informed on server restarts.
SCHEMA_VERSION = 7
SCHEMA_VERSION = 8
class _RollbackButIsFineException(Exception):
@ -95,7 +95,8 @@ class DataStore(RoomMemberStore, RoomStore,
@defer.inlineCallbacks
@log_function
def persist_event(self, event, backfilled=False, is_new_state=True):
def persist_event(self, event, backfilled=False, is_new_state=True,
current_state=None):
stream_ordering = None
if backfilled:
if not self.min_token_deferred.called:
@ -111,6 +112,7 @@ class DataStore(RoomMemberStore, RoomStore,
backfilled=backfilled,
stream_ordering=stream_ordering,
is_new_state=is_new_state,
current_state=current_state,
)
except _RollbackButIsFineException:
pass
@ -139,7 +141,7 @@ class DataStore(RoomMemberStore, RoomStore,
@log_function
def _persist_event_txn(self, txn, event, backfilled, stream_ordering=None,
is_new_state=True):
is_new_state=True, current_state=None):
if event.type == RoomMemberEvent.TYPE:
self._store_room_member_txn(txn, event)
elif event.type == FeedbackEvent.TYPE:
@ -208,8 +210,27 @@ class DataStore(RoomMemberStore, RoomStore,
self._store_state_groups_txn(txn, event)
if current_state:
txn.execute(
"DELETE FROM current_state_events WHERE room_id = ?",
(event.room_id,)
)
for s in current_state:
self._simple_insert_txn(
txn,
"current_state_events",
{
"event_id": s.event_id,
"room_id": s.room_id,
"type": s.type,
"state_key": s.state_key,
},
or_replace=True,
)
is_state = hasattr(event, "state_key") and event.state_key is not None
if is_new_state and is_state:
if is_state:
vals = {
"event_id": event.event_id,
"room_id": event.room_id,
@ -227,17 +248,18 @@ class DataStore(RoomMemberStore, RoomStore,
or_replace=True,
)
self._simple_insert_txn(
txn,
"current_state_events",
{
"event_id": event.event_id,
"room_id": event.room_id,
"type": event.type,
"state_key": event.state_key,
},
or_replace=True,
)
if is_new_state:
self._simple_insert_txn(
txn,
"current_state_events",
{
"event_id": event.event_id,
"room_id": event.room_id,
"type": event.type,
"state_key": event.state_key,
},
or_replace=True,
)
for e_id, h in event.prev_state:
self._simple_insert_txn(
@ -314,7 +336,12 @@ class DataStore(RoomMemberStore, RoomStore,
txn, event.event_id, ref_alg, ref_hash_bytes
)
self._update_min_depth_for_room_txn(txn, event.room_id, event.depth)
if not outlier:
self._update_min_depth_for_room_txn(
txn,
event.room_id,
event.depth
)
def _store_redaction(self, txn, event):
txn.execute(

View file

@ -246,7 +246,10 @@ class SQLBaseStore(object):
raise StoreError(404, "No row found")
def _simple_select_onecol_txn(self, txn, table, keyvalues, retcol):
sql = "SELECT %(retcol)s FROM %(table)s WHERE %(where)s" % {
sql = (
"SELECT %(retcol)s FROM %(table)s WHERE %(where)s "
"ORDER BY rowid asc"
) % {
"retcol": retcol,
"table": table,
"where": " AND ".join("%s = ?" % k for k in keyvalues.keys()),
@ -299,7 +302,7 @@ class SQLBaseStore(object):
keyvalues : dict of column names and values to select the rows with
retcols : list of strings giving the names of the columns to return
"""
sql = "SELECT %s FROM %s WHERE %s" % (
sql = "SELECT %s FROM %s WHERE %s ORDER BY rowid asc" % (
", ".join(retcols),
table,
" AND ".join("%s = ?" % (k, ) for k in keyvalues)
@ -334,7 +337,7 @@ class SQLBaseStore(object):
retcols=None, allow_none=False):
""" Combined SELECT then UPDATE."""
if retcols:
select_sql = "SELECT %s FROM %s WHERE %s" % (
select_sql = "SELECT %s FROM %s WHERE %s ORDER BY rowid asc" % (
", ".join(retcols),
table,
" AND ".join("%s = ?" % (k) for k in keyvalues)
@ -461,7 +464,7 @@ class SQLBaseStore(object):
def _get_events_txn(self, txn, event_ids):
# FIXME (erikj): This should be batched?
sql = "SELECT * FROM events WHERE event_id = ?"
sql = "SELECT * FROM events WHERE event_id = ? ORDER BY rowid asc"
event_rows = []
for e_id in event_ids:
@ -478,7 +481,9 @@ class SQLBaseStore(object):
def _parse_events_txn(self, txn, rows):
events = [self._parse_event_from_row(r) for r in rows]
select_event_sql = "SELECT * FROM events WHERE event_id = ?"
select_event_sql = (
"SELECT * FROM events WHERE event_id = ? ORDER BY rowid asc"
)
for i, ev in enumerate(events):
signatures = self._get_event_signatures_txn(

View file

@ -0,0 +1,34 @@
/* Copyright 2014 OpenMarket Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
CREATE TABLE IF NOT EXISTS event_signatures_2 (
event_id TEXT,
signature_name TEXT,
key_id TEXT,
signature BLOB,
CONSTRAINT uniqueness UNIQUE (event_id, signature_name, key_id)
);
INSERT INTO event_signatures_2 (event_id, signature_name, key_id, signature)
SELECT event_id, signature_name, key_id, signature FROM event_signatures;
DROP TABLE event_signatures;
ALTER TABLE event_signatures_2 RENAME TO event_signatures;
CREATE INDEX IF NOT EXISTS event_signatures_id ON event_signatures (
event_id
);
PRAGMA user_version = 8;

View file

@ -42,7 +42,7 @@ CREATE TABLE IF NOT EXISTS event_signatures (
signature_name TEXT,
key_id TEXT,
signature BLOB,
CONSTRAINT uniqueness UNIQUE (event_id, key_id)
CONSTRAINT uniqueness UNIQUE (event_id, signature_name, key_id)
);
CREATE INDEX IF NOT EXISTS event_signatures_id ON event_signatures (

View file

@ -87,7 +87,7 @@ class StateStore(SQLBaseStore):
)
def _store_state_groups_txn(self, txn, event):
if not event.state_events:
if event.state_events is None:
return
state_group = event.state_group

View file

@ -283,7 +283,7 @@ class StreamStore(SQLBaseStore):
sql = (
"SELECT *, (%(redacted)s) AS redacted FROM events "
"WHERE room_id = ? AND stream_ordering <= ? "
"WHERE room_id = ? AND stream_ordering <= ? AND outlier = 0 "
"ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT ? "
) % {
"redacted": del_sql,