Merge branch 'develop' into pushers

Conflicts:
	synapse/api/errors.py
	synapse/server.py
	synapse/storage/__init__.py
This commit is contained in:
David Baker 2014-12-18 15:15:22 +00:00
commit b56730bb6e
99 changed files with 4650 additions and 2176 deletions

View file

@ -15,12 +15,8 @@
from twisted.internet import defer
from synapse.api.events.room import (
RoomMemberEvent, RoomTopicEvent, FeedbackEvent, RoomNameEvent,
RoomRedactionEvent,
)
from synapse.util.logutils import log_function
from synapse.api.constants import EventTypes
from .directory import DirectoryStore
from .feedback import FeedbackStore
@ -34,11 +30,13 @@ from .transactions import TransactionStore
from .keys import KeyStore
from .event_federation import EventFederationStore
from .pusher import PusherStore
from .media_repository import MediaRepositoryStore
from .state import StateStore
from .signatures import SignatureStore
from syutil.base64util import decode_base64
from syutil.jsonutil import encode_canonical_json
from synapse.crypto.event_signing import compute_event_reference_hash
@ -63,7 +61,8 @@ SCHEMAS = [
"state",
"event_edges",
"event_signatures",
"pusher"
"pusher",
"media_repository",
]
@ -83,11 +82,13 @@ class DataStore(RoomMemberStore, RoomStore,
RegistrationStore, StreamStore, ProfileStore, FeedbackStore,
PresenceStore, TransactionStore,
DirectoryStore, KeyStore, StateStore, SignatureStore,
EventFederationStore, PusherStore, ):
EventFederationStore,
MediaRepositoryStore,
PusherStore,
):
def __init__(self, hs):
super(DataStore, self).__init__(hs)
self.event_factory = hs.get_event_factory()
self.hs = hs
self.min_token_deferred = self._get_min_token()
@ -95,8 +96,8 @@ class DataStore(RoomMemberStore, RoomStore,
@defer.inlineCallbacks
@log_function
def persist_event(self, event, backfilled=False, is_new_state=True,
current_state=None):
def persist_event(self, event, context, backfilled=False,
is_new_state=True, current_state=None):
stream_ordering = None
if backfilled:
if not self.min_token_deferred.called:
@ -109,6 +110,7 @@ class DataStore(RoomMemberStore, RoomStore,
"persist_event",
self._persist_event_txn,
event=event,
context=context,
backfilled=backfilled,
stream_ordering=stream_ordering,
is_new_state=is_new_state,
@ -119,50 +121,66 @@ class DataStore(RoomMemberStore, RoomStore,
@defer.inlineCallbacks
def get_event(self, event_id, allow_none=False):
events_dict = yield self._simple_select_one(
"events",
{"event_id": event_id},
[
"event_id",
"type",
"room_id",
"content",
"unrecognized_keys",
"depth",
],
allow_none=allow_none,
)
events = yield self._get_events([event_id])
if not events_dict:
defer.returnValue(None)
if not events:
if allow_none:
defer.returnValue(None)
else:
raise RuntimeError("Could not find event %s" % (event_id,))
event = yield self._parse_events([events_dict])
defer.returnValue(event[0])
defer.returnValue(events[0])
@log_function
def _persist_event_txn(self, txn, event, backfilled, stream_ordering=None,
is_new_state=True, current_state=None):
if event.type == RoomMemberEvent.TYPE:
def _persist_event_txn(self, txn, event, context, backfilled,
stream_ordering=None, is_new_state=True,
current_state=None):
if event.type == EventTypes.Member:
self._store_room_member_txn(txn, event)
elif event.type == FeedbackEvent.TYPE:
elif event.type == EventTypes.Feedback:
self._store_feedback_txn(txn, event)
elif event.type == RoomNameEvent.TYPE:
elif event.type == EventTypes.Name:
self._store_room_name_txn(txn, event)
elif event.type == RoomTopicEvent.TYPE:
elif event.type == EventTypes.Topic:
self._store_room_topic_txn(txn, event)
elif event.type == RoomRedactionEvent.TYPE:
elif event.type == EventTypes.Redaction:
self._store_redaction(txn, event)
outlier = False
if hasattr(event, "outlier"):
outlier = event.outlier
if hasattr(event.internal_metadata, "outlier"):
outlier = event.internal_metadata.outlier
event_dict = {
k: v
for k, v in event.get_dict().items()
if k not in [
"redacted",
"redacted_because",
]
}
metadata_json = encode_canonical_json(
event.internal_metadata.get_dict()
)
self._simple_insert_txn(
txn,
table="event_json",
values={
"event_id": event.event_id,
"room_id": event.room_id,
"internal_metadata": metadata_json.decode("UTF-8"),
"json": encode_canonical_json(event_dict).decode("UTF-8"),
},
or_replace=True,
)
vals = {
"topological_ordering": event.depth,
"event_id": event.event_id,
"type": event.type,
"room_id": event.room_id,
"content": json.dumps(event.content),
"content": json.dumps(event.get_dict()["content"]),
"processed": True,
"outlier": outlier,
"depth": event.depth,
@ -173,7 +191,7 @@ class DataStore(RoomMemberStore, RoomStore,
unrec = {
k: v
for k, v in event.get_full_dict().items()
for k, v in event.get_dict().items()
if k not in vals.keys() and k not in [
"redacted",
"redacted_because",
@ -208,7 +226,8 @@ class DataStore(RoomMemberStore, RoomStore,
room_id=event.room_id,
)
self._store_state_groups_txn(txn, event)
if not outlier:
self._store_state_groups_txn(txn, event, context)
if current_state:
txn.execute(
@ -302,16 +321,6 @@ class DataStore(RoomMemberStore, RoomStore,
txn, event.event_id, hash_alg, hash_bytes,
)
if hasattr(event, "signatures"):
logger.debug("sigs: %s", event.signatures)
for name, sigs in event.signatures.items():
for key_id, signature_base64 in sigs.items():
signature_bytes = decode_base64(signature_base64)
self._store_event_signature_txn(
txn, event.event_id, name, key_id,
signature_bytes,
)
for prev_event_id, prev_hashes in event.prev_events:
for alg, hash_base64 in prev_hashes.items():
hash_bytes = decode_base64(hash_base64)
@ -413,86 +422,6 @@ class DataStore(RoomMemberStore, RoomStore,
],
)
def snapshot_room(self, event):
"""Snapshot the room for an update by a user
Args:
room_id (synapse.types.RoomId): The room to snapshot.
user_id (synapse.types.UserId): The user to snapshot the room for.
state_type (str): Optional state type to snapshot.
state_key (str): Optional state key to snapshot.
Returns:
synapse.storage.Snapshot: A snapshot of the state of the room.
"""
def _snapshot(txn):
prev_events = self._get_latest_events_in_room(
txn,
event.room_id
)
prev_state = None
state_key = None
if hasattr(event, "state_key"):
state_key = event.state_key
prev_state = self._get_latest_state_in_room(
txn,
event.room_id,
type=event.type,
state_key=state_key,
)
return Snapshot(
store=self,
room_id=event.room_id,
user_id=event.user_id,
prev_events=prev_events,
prev_state=prev_state,
state_type=event.type,
state_key=state_key,
)
return self.runInteraction("snapshot_room", _snapshot)
class Snapshot(object):
"""Snapshot of the state of a room
Args:
store (DataStore): The datastore.
room_id (RoomId): The room of the snapshot.
user_id (UserId): The user this snapshot is for.
prev_events (list): The list of event ids this snapshot is after.
membership_state (RoomMemberEvent): The current state of the user in
the room.
state_type (str, optional): State type captured by the snapshot
state_key (str, optional): State key captured by the snapshot
prev_state_pdu (PduEntry, optional): pdu id of
the previous value of the state type and key in the room.
"""
def __init__(self, store, room_id, user_id, prev_events,
prev_state, state_type=None, state_key=None):
self.store = store
self.room_id = room_id
self.user_id = user_id
self.prev_events = prev_events
self.prev_state = prev_state
self.state_type = state_type
self.state_key = state_key
def fill_out_prev_events(self, event):
if not hasattr(event, "prev_events"):
event.prev_events = [
(event_id, hashes)
for event_id, hashes, _ in self.prev_events
]
if self.prev_events:
event.depth = max([int(v) for _, _, v in self.prev_events]) + 1
else:
event.depth = 0
if not hasattr(event, "prev_state") and self.prev_state is not None:
event.prev_state = self.prev_state
def schema_path(schema):
""" Get a filesystem path for the named database schema
@ -520,6 +449,14 @@ def read_schema(schema):
return schema_file.read()
class PrepareDatabaseException(Exception):
pass
class UpgradeDatabaseException(PrepareDatabaseException):
pass
def prepare_database(db_conn):
""" Set up all the dbs. Since all the *.sql have IF NOT EXISTS, so we
don't have to worry about overwriting existing content.
@ -544,6 +481,10 @@ def prepare_database(db_conn):
# Run every version since after the current version.
for v in range(user_version + 1, SCHEMA_VERSION + 1):
if v == 10:
raise UpgradeDatabaseException(
"No delta for version 10"
)
sql_script = read_schema("delta/v%d" % (v))
c.executescript(sql_script)