Merge branch 'develop' into daniel/3pidinvites

This commit is contained in:
Daniel Wagner-Hall 2015-10-15 11:51:55 +01:00
commit f38df51e8d
17 changed files with 759 additions and 589 deletions

View File

@ -38,6 +38,9 @@ for port in 8080 8081 8082; do
perl -p -i -e 's/^enable_registration:.*/enable_registration: true/g' $DIR/etc/$port.config perl -p -i -e 's/^enable_registration:.*/enable_registration: true/g' $DIR/etc/$port.config
echo "full_twisted_stacktraces: true" >> $DIR/etc/$port.config
echo "report_stats: false" >> $DIR/etc/$port.config
python -m synapse.app.homeserver \ python -m synapse.app.homeserver \
--config-path "$DIR/etc/$port.config" \ --config-path "$DIR/etc/$port.config" \
-D \ -D \

View File

@ -54,7 +54,7 @@ class Filtering(object):
] ]
room_level_definitions = [ room_level_definitions = [
"state", "events", "ephemeral" "state", "timeline", "ephemeral"
] ]
for key in top_level_definitions: for key in top_level_definitions:
@ -135,17 +135,23 @@ class Filter(object):
def __init__(self, filter_json): def __init__(self, filter_json):
self.filter_json = filter_json self.filter_json = filter_json
def filter_public_user_data(self, events): def timeline_limit(self):
return self._filter_on_key(events, ["public_user_data"]) return self.filter_json.get("room", {}).get("timeline", {}).get("limit", 10)
def filter_private_user_data(self, events): def presence_limit(self):
return self._filter_on_key(events, ["private_user_data"]) return self.filter_json.get("presence", {}).get("limit", 10)
def ephemeral_limit(self):
return self.filter_json.get("room", {}).get("ephemeral", {}).get("limit", 10)
def filter_presence(self, events):
return self._filter_on_key(events, ["presence"])
def filter_room_state(self, events): def filter_room_state(self, events):
return self._filter_on_key(events, ["room", "state"]) return self._filter_on_key(events, ["room", "state"])
def filter_room_events(self, events): def filter_room_timeline(self, events):
return self._filter_on_key(events, ["room", "events"]) return self._filter_on_key(events, ["room", "timeline"])
def filter_room_ephemeral(self, events): def filter_room_ephemeral(self, events):
return self._filter_on_key(events, ["room", "ephemeral"]) return self._filter_on_key(events, ["room", "ephemeral"])
@ -169,11 +175,34 @@ class Filter(object):
return [e for e in events if self._passes_definition(definition, e)] return [e for e in events if self._passes_definition(definition, e)]
def _passes_definition(self, definition, event): def _passes_definition(self, definition, event):
"""Check if the event passes the filter definition
Args:
definition(dict): The filter definition to check against
event(dict or Event): The event to check
Returns:
True if the event passes the filter in the definition
"""
if type(event) is dict:
room_id = event.get("room_id")
sender = event.get("sender")
event_type = event["type"]
else:
room_id = getattr(event, "room_id", None)
sender = getattr(event, "sender", None)
event_type = event.type
return self._event_passes_definition(
definition, room_id, sender, event_type
)
def _event_passes_definition(self, definition, room_id, sender,
event_type):
"""Check if the event passes through the given definition. """Check if the event passes through the given definition.
Args: Args:
definition(dict): The definition to check against. definition(dict): The definition to check against.
event(Event): The event to check. room_id(str): The id of the room this event is in or None.
sender(str): The sender of the event
event_type(str): The type of the event.
Returns: Returns:
True if the event passes through the filter. True if the event passes through the filter.
""" """
@ -185,8 +214,7 @@ class Filter(object):
# and 'not_types' then it is treated as only being in 'not_types') # and 'not_types' then it is treated as only being in 'not_types')
# room checks # room checks
if hasattr(event, "room_id"): if room_id is not None:
room_id = event.room_id
allow_rooms = definition.get("rooms", None) allow_rooms = definition.get("rooms", None)
reject_rooms = definition.get("not_rooms", None) reject_rooms = definition.get("not_rooms", None)
if reject_rooms and room_id in reject_rooms: if reject_rooms and room_id in reject_rooms:
@ -195,9 +223,7 @@ class Filter(object):
return False return False
# sender checks # sender checks
if hasattr(event, "sender"): if sender is not None:
# Should we be including event.state_key for some event types?
sender = event.sender
allow_senders = definition.get("senders", None) allow_senders = definition.get("senders", None)
reject_senders = definition.get("not_senders", None) reject_senders = definition.get("not_senders", None)
if reject_senders and sender in reject_senders: if reject_senders and sender in reject_senders:
@ -208,12 +234,12 @@ class Filter(object):
# type checks # type checks
if "not_types" in definition: if "not_types" in definition:
for def_type in definition["not_types"]: for def_type in definition["not_types"]:
if self._event_matches_type(event, def_type): if self._event_matches_type(event_type, def_type):
return False return False
if "types" in definition: if "types" in definition:
included = False included = False
for def_type in definition["types"]: for def_type in definition["types"]:
if self._event_matches_type(event, def_type): if self._event_matches_type(event_type, def_type):
included = True included = True
break break
if not included: if not included:
@ -221,9 +247,9 @@ class Filter(object):
return True return True
def _event_matches_type(self, event, def_type): def _event_matches_type(self, event_type, def_type):
if def_type.endswith("*"): if def_type.endswith("*"):
type_prefix = def_type[:-1] type_prefix = def_type[:-1]
return event.type.startswith(type_prefix) return event_type.startswith(type_prefix)
else: else:
return event.type == def_type return event_type == def_type

View File

@ -33,11 +33,9 @@ if __name__ == '__main__':
sys.stderr.writelines(message) sys.stderr.writelines(message)
sys.exit(1) sys.exit(1)
from synapse.storage.engines import create_engine, IncorrectDatabaseSetup from synapse.storage.engines import create_engine, IncorrectDatabaseSetup
from synapse.storage import ( from synapse.storage import are_all_users_on_domain
are_all_users_on_domain, UpgradeDatabaseException, from synapse.storage.prepare_database import UpgradeDatabaseException
)
from synapse.server import HomeServer from synapse.server import HomeServer

View File

@ -22,6 +22,7 @@ import yaml
from string import Template from string import Template
import os import os
import signal import signal
from synapse.util.debug import debug_deferreds
DEFAULT_LOG_CONFIG = Template(""" DEFAULT_LOG_CONFIG = Template("""
@ -69,6 +70,8 @@ class LoggingConfig(Config):
self.verbosity = config.get("verbose", 0) self.verbosity = config.get("verbose", 0)
self.log_config = self.abspath(config.get("log_config")) self.log_config = self.abspath(config.get("log_config"))
self.log_file = self.abspath(config.get("log_file")) self.log_file = self.abspath(config.get("log_file"))
if config.get("full_twisted_stacktraces"):
debug_deferreds()
def default_config(self, config_dir_path, server_name, **kwargs): def default_config(self, config_dir_path, server_name, **kwargs):
log_file = self.abspath("homeserver.log") log_file = self.abspath("homeserver.log")
@ -84,6 +87,11 @@ class LoggingConfig(Config):
# A yaml python logging config file # A yaml python logging config file
log_config: "%(log_config)s" log_config: "%(log_config)s"
# Stop twisted from discarding the stack traces of exceptions in
# deferreds by waiting a reactor tick before running a deferred's
# callbacks.
# full_twisted_stacktraces: true
""" % locals() """ % locals()
def read_arguments(self, args): def read_arguments(self, args):

View File

@ -242,7 +242,7 @@ class FederationHandler(BaseHandler):
@defer.inlineCallbacks @defer.inlineCallbacks
def _filter_events_for_server(self, server_name, room_id, events): def _filter_events_for_server(self, server_name, room_id, events):
event_to_state = yield self.store.get_state_for_events( event_to_state = yield self.store.get_state_for_events(
room_id, frozenset(e.event_id for e in events), frozenset(e.event_id for e in events),
types=( types=(
(EventTypes.RoomHistoryVisibility, ""), (EventTypes.RoomHistoryVisibility, ""),
(EventTypes.Member, None), (EventTypes.Member, None),

View File

@ -164,7 +164,7 @@ class MessageHandler(BaseHandler):
@defer.inlineCallbacks @defer.inlineCallbacks
def _filter_events_for_client(self, user_id, room_id, events): def _filter_events_for_client(self, user_id, room_id, events):
event_id_to_state = yield self.store.get_state_for_events( event_id_to_state = yield self.store.get_state_for_events(
room_id, frozenset(e.event_id for e in events), frozenset(e.event_id for e in events),
types=( types=(
(EventTypes.RoomHistoryVisibility, ""), (EventTypes.RoomHistoryVisibility, ""),
(EventTypes.Member, user_id), (EventTypes.Member, user_id),
@ -290,7 +290,7 @@ class MessageHandler(BaseHandler):
elif member_event.membership == Membership.LEAVE: elif member_event.membership == Membership.LEAVE:
key = (event_type, state_key) key = (event_type, state_key)
room_state = yield self.store.get_state_for_events( room_state = yield self.store.get_state_for_events(
room_id, [member_event.event_id], [key] [member_event.event_id], [key]
) )
data = room_state[member_event.event_id].get(key) data = room_state[member_event.event_id].get(key)
@ -314,7 +314,7 @@ class MessageHandler(BaseHandler):
room_state = yield self.state_handler.get_current_state(room_id) room_state = yield self.state_handler.get_current_state(room_id)
elif member_event.membership == Membership.LEAVE: elif member_event.membership == Membership.LEAVE:
room_state = yield self.store.get_state_for_events( room_state = yield self.store.get_state_for_events(
room_id, [member_event.event_id], None [member_event.event_id], None
) )
room_state = room_state[member_event.event_id] room_state = room_state[member_event.event_id]
@ -406,7 +406,7 @@ class MessageHandler(BaseHandler):
elif event.membership == Membership.LEAVE: elif event.membership == Membership.LEAVE:
room_end_token = "s%d" % (event.stream_ordering,) room_end_token = "s%d" % (event.stream_ordering,)
deferred_room_state = self.store.get_state_for_events( deferred_room_state = self.store.get_state_for_events(
event.room_id, [event.event_id], None [event.event_id], None
) )
deferred_room_state.addCallback( deferred_room_state.addCallback(
lambda states: states[event.event_id] lambda states: states[event.event_id]
@ -499,7 +499,7 @@ class MessageHandler(BaseHandler):
def _room_initial_sync_parted(self, user_id, room_id, pagin_config, def _room_initial_sync_parted(self, user_id, room_id, pagin_config,
member_event): member_event):
room_state = yield self.store.get_state_for_events( room_state = yield self.store.get_state_for_events(
member_event.room_id, [member_event.event_id], None [member_event.event_id], None
) )
room_state = room_state[member_event.event_id] room_state = room_state[member_event.event_id]

View File

@ -28,21 +28,28 @@ logger = logging.getLogger(__name__)
SyncConfig = collections.namedtuple("SyncConfig", [ SyncConfig = collections.namedtuple("SyncConfig", [
"user", "user",
"limit",
"gap",
"sort",
"backfill",
"filter", "filter",
]) ])
class RoomSyncResult(collections.namedtuple("RoomSyncResult", [ class TimelineBatch(collections.namedtuple("TimelineBatch", [
"room_id",
"limited",
"published",
"events",
"state",
"prev_batch", "prev_batch",
"events",
"limited",
])):
__slots__ = []
def __nonzero__(self):
"""Make the result appear empty if there are no updates. This is used
to tell if room needs to be part of the sync result.
"""
return bool(self.events)
class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [
"room_id",
"timeline",
"state",
"ephemeral", "ephemeral",
])): ])):
__slots__ = [] __slots__ = []
@ -51,14 +58,21 @@ class RoomSyncResult(collections.namedtuple("RoomSyncResult", [
"""Make the result appear empty if there are no updates. This is used """Make the result appear empty if there are no updates. This is used
to tell if room needs to be part of the sync result. to tell if room needs to be part of the sync result.
""" """
return bool(self.events or self.state or self.ephemeral) return bool(self.timeline or self.state or self.ephemeral)
class InvitedSyncResult(collections.namedtuple("InvitedSyncResult", [
"room_id",
"invite",
])):
__slots__ = []
class SyncResult(collections.namedtuple("SyncResult", [ class SyncResult(collections.namedtuple("SyncResult", [
"next_batch", # Token for the next sync "next_batch", # Token for the next sync
"private_user_data", # List of private events for the user. "presence", # List of presence events for the user.
"public_user_data", # List of public events for all users. "joined", # JoinedSyncResult for each joined room.
"rooms", # RoomSyncResult for each room. "invited", # InvitedSyncResult for each invited room.
])): ])):
__slots__ = [] __slots__ = []
@ -68,7 +82,7 @@ class SyncResult(collections.namedtuple("SyncResult", [
events. events.
""" """
return bool( return bool(
self.private_user_data or self.public_user_data or self.rooms self.presence or self.joined or self.invited
) )
@ -108,8 +122,8 @@ class SyncHandler(BaseHandler):
) )
result = yield self.notifier.wait_for_events( result = yield self.notifier.wait_for_events(
sync_config.user, room_ids, sync_config.user, room_ids, timeout, current_sync_callback,
sync_config.filter, timeout, current_sync_callback from_token=since_token
) )
defer.returnValue(result) defer.returnValue(result)
@ -121,11 +135,7 @@ class SyncHandler(BaseHandler):
if since_token is None: if since_token is None:
return self.initial_sync(sync_config) return self.initial_sync(sync_config)
else: else:
if sync_config.gap: return self.incremental_sync_with_gap(sync_config, since_token)
return self.incremental_sync_with_gap(sync_config, since_token)
else:
# TODO(mjark): Handle gapless sync
raise NotImplementedError()
@defer.inlineCallbacks @defer.inlineCallbacks
def initial_sync(self, sync_config): def initial_sync(self, sync_config):
@ -133,12 +143,6 @@ class SyncHandler(BaseHandler):
Returns: Returns:
A Deferred SyncResult. A Deferred SyncResult.
""" """
if sync_config.sort == "timeline,desc":
# TODO(mjark): Handle going through events in reverse order?.
# What does "most recent events" mean when applying the limits mean
# in this case?
raise NotImplementedError()
now_token = yield self.event_sources.get_current_token() now_token = yield self.event_sources.get_current_token()
presence_stream = self.event_sources.sources["presence"] presence_stream = self.event_sources.sources["presence"]
@ -155,33 +159,36 @@ class SyncHandler(BaseHandler):
membership_list=[Membership.INVITE, Membership.JOIN] membership_list=[Membership.INVITE, Membership.JOIN]
) )
# TODO (mjark): Does public mean "published"? joined = []
published_rooms = yield self.store.get_rooms(is_public=True) invited = []
published_room_ids = set(r["room_id"] for r in published_rooms)
rooms = []
for event in room_list: for event in room_list:
room_sync = yield self.initial_sync_for_room( if event.membership == Membership.JOIN:
event.room_id, sync_config, now_token, published_room_ids room_sync = yield self.initial_sync_for_joined_room(
) event.room_id, sync_config, now_token,
rooms.append(room_sync) )
joined.append(room_sync)
elif event.membership == Membership.INVITE:
invite = yield self.store.get_event(event.event_id)
invited.append(InvitedSyncResult(
room_id=event.room_id,
invite=invite,
))
defer.returnValue(SyncResult( defer.returnValue(SyncResult(
public_user_data=presence, presence=presence,
private_user_data=[], joined=joined,
rooms=rooms, invited=invited,
next_batch=now_token, next_batch=now_token,
)) ))
@defer.inlineCallbacks @defer.inlineCallbacks
def initial_sync_for_room(self, room_id, sync_config, now_token, def initial_sync_for_joined_room(self, room_id, sync_config, now_token):
published_room_ids):
"""Sync a room for a client which is starting without any state """Sync a room for a client which is starting without any state
Returns: Returns:
A Deferred RoomSyncResult. A Deferred JoinedSyncResult.
""" """
recents, prev_batch_token, limited = yield self.load_filtered_recents( batch = yield self.load_filtered_recents(
room_id, sync_config, now_token, room_id, sync_config, now_token,
) )
@ -190,13 +197,10 @@ class SyncHandler(BaseHandler):
) )
current_state_events = current_state.values() current_state_events = current_state.values()
defer.returnValue(RoomSyncResult( defer.returnValue(JoinedSyncResult(
room_id=room_id, room_id=room_id,
published=room_id in published_room_ids, timeline=batch,
events=recents,
prev_batch=prev_batch_token,
state=current_state_events, state=current_state_events,
limited=limited,
ephemeral=[], ephemeral=[],
)) ))
@ -207,19 +211,13 @@ class SyncHandler(BaseHandler):
Returns: Returns:
A Deferred SyncResult. A Deferred SyncResult.
""" """
if sync_config.sort == "timeline,desc":
# TODO(mjark): Handle going through events in reverse order?.
# What does "most recent events" mean when applying the limits mean
# in this case?
raise NotImplementedError()
now_token = yield self.event_sources.get_current_token() now_token = yield self.event_sources.get_current_token()
presence_source = self.event_sources.sources["presence"] presence_source = self.event_sources.sources["presence"]
presence, presence_key = yield presence_source.get_new_events_for_user( presence, presence_key = yield presence_source.get_new_events_for_user(
user=sync_config.user, user=sync_config.user,
from_key=since_token.presence_key, from_key=since_token.presence_key,
limit=sync_config.limit, limit=sync_config.filter.presence_limit(),
) )
now_token = now_token.copy_and_replace("presence_key", presence_key) now_token = now_token.copy_and_replace("presence_key", presence_key)
@ -227,7 +225,7 @@ class SyncHandler(BaseHandler):
typing, typing_key = yield typing_source.get_new_events_for_user( typing, typing_key = yield typing_source.get_new_events_for_user(
user=sync_config.user, user=sync_config.user,
from_key=since_token.typing_key, from_key=since_token.typing_key,
limit=sync_config.limit, limit=sync_config.filter.ephemeral_limit(),
) )
now_token = now_token.copy_and_replace("typing_key", typing_key) now_token = now_token.copy_and_replace("typing_key", typing_key)
@ -242,33 +240,37 @@ class SyncHandler(BaseHandler):
) )
if app_service: if app_service:
rooms = yield self.store.get_app_service_rooms(app_service) rooms = yield self.store.get_app_service_rooms(app_service)
room_ids = set(r.room_id for r in rooms) joined_room_ids = set(r.room_id for r in rooms)
else: else:
room_ids = yield rm_handler.get_joined_rooms_for_user( joined_room_ids = yield rm_handler.get_joined_rooms_for_user(
sync_config.user sync_config.user
) )
# TODO (mjark): Does public mean "published"? timeline_limit = sync_config.filter.timeline_limit()
published_rooms = yield self.store.get_rooms(is_public=True)
published_room_ids = set(r["room_id"] for r in published_rooms)
room_events, _ = yield self.store.get_room_events_stream( room_events, _ = yield self.store.get_room_events_stream(
sync_config.user.to_string(), sync_config.user.to_string(),
from_key=since_token.room_key, from_key=since_token.room_key,
to_key=now_token.room_key, to_key=now_token.room_key,
room_id=None, room_id=None,
limit=sync_config.limit + 1, limit=timeline_limit + 1,
) )
rooms = [] joined = []
if len(room_events) <= sync_config.limit: if len(room_events) <= timeline_limit:
# There is no gap in any of the rooms. Therefore we can just # There is no gap in any of the rooms. Therefore we can just
# partition the new events by room and return them. # partition the new events by room and return them.
invite_events = []
events_by_room_id = {} events_by_room_id = {}
for event in room_events: for event in room_events:
events_by_room_id.setdefault(event.room_id, []).append(event) events_by_room_id.setdefault(event.room_id, []).append(event)
if event.room_id not in joined_room_ids:
if (event.type == EventTypes.Member
and event.membership == Membership.INVITE
and event.state_key == sync_config.user.to_string()):
invite_events.append(event)
for room_id in room_ids: for room_id in joined_room_ids:
recents = events_by_room_id.get(room_id, []) recents = events_by_room_id.get(room_id, [])
state = [event for event in recents if event.is_state()] state = [event for event in recents if event.is_state()]
if recents: if recents:
@ -282,37 +284,47 @@ class SyncHandler(BaseHandler):
sync_config, room_id, state sync_config, room_id, state
) )
room_sync = RoomSyncResult( room_sync = JoinedSyncResult(
room_id=room_id, room_id=room_id,
published=room_id in published_room_ids, timeline=TimelineBatch(
events=recents, events=recents,
prev_batch=prev_batch, prev_batch=prev_batch,
limited=False,
),
state=state, state=state,
limited=False,
ephemeral=typing_by_room.get(room_id, []) ephemeral=typing_by_room.get(room_id, [])
) )
if room_sync: if room_sync:
rooms.append(room_sync) joined.append(room_sync)
else: else:
for room_id in room_ids: invite_events = yield self.store.get_invites_for_user(
sync_config.user.to_string()
)
for room_id in joined_room_ids:
room_sync = yield self.incremental_sync_with_gap_for_room( room_sync = yield self.incremental_sync_with_gap_for_room(
room_id, sync_config, since_token, now_token, room_id, sync_config, since_token, now_token,
published_room_ids, typing_by_room typing_by_room
) )
if room_sync: if room_sync:
rooms.append(room_sync) joined.append(room_sync)
invited = [
InvitedSyncResult(room_id=event.room_id, invite=event)
for event in invite_events
]
defer.returnValue(SyncResult( defer.returnValue(SyncResult(
public_user_data=presence, presence=presence,
private_user_data=[], joined=joined,
rooms=rooms, invited=invited,
next_batch=now_token, next_batch=now_token,
)) ))
@defer.inlineCallbacks @defer.inlineCallbacks
def _filter_events_for_client(self, user_id, room_id, events): def _filter_events_for_client(self, user_id, room_id, events):
event_id_to_state = yield self.store.get_state_for_events( event_id_to_state = yield self.store.get_state_for_events(
room_id, frozenset(e.event_id for e in events), frozenset(e.event_id for e in events),
types=( types=(
(EventTypes.RoomHistoryVisibility, ""), (EventTypes.RoomHistoryVisibility, ""),
(EventTypes.Member, user_id), (EventTypes.Member, user_id),
@ -361,12 +373,13 @@ class SyncHandler(BaseHandler):
limited = True limited = True
recents = [] recents = []
filtering_factor = 2 filtering_factor = 2
load_limit = max(sync_config.limit * filtering_factor, 100) timeline_limit = sync_config.filter.timeline_limit()
load_limit = max(timeline_limit * filtering_factor, 100)
max_repeat = 3 # Only try a few times per room, otherwise max_repeat = 3 # Only try a few times per room, otherwise
room_key = now_token.room_key room_key = now_token.room_key
end_key = room_key end_key = room_key
while limited and len(recents) < sync_config.limit and max_repeat: while limited and len(recents) < timeline_limit and max_repeat:
events, keys = yield self.store.get_recent_events_for_room( events, keys = yield self.store.get_recent_events_for_room(
room_id, room_id,
limit=load_limit + 1, limit=load_limit + 1,
@ -375,7 +388,7 @@ class SyncHandler(BaseHandler):
) )
(room_key, _) = keys (room_key, _) = keys
end_key = "s" + room_key.split('-')[-1] end_key = "s" + room_key.split('-')[-1]
loaded_recents = sync_config.filter.filter_room_events(events) loaded_recents = sync_config.filter.filter_room_timeline(events)
loaded_recents = yield self._filter_events_for_client( loaded_recents = yield self._filter_events_for_client(
sync_config.user.to_string(), room_id, loaded_recents, sync_config.user.to_string(), room_id, loaded_recents,
) )
@ -385,34 +398,37 @@ class SyncHandler(BaseHandler):
limited = False limited = False
max_repeat -= 1 max_repeat -= 1
if len(recents) > sync_config.limit: if len(recents) > timeline_limit:
recents = recents[-sync_config.limit:] limited = True
recents = recents[-timeline_limit:]
room_key = recents[0].internal_metadata.before room_key = recents[0].internal_metadata.before
prev_batch_token = now_token.copy_and_replace( prev_batch_token = now_token.copy_and_replace(
"room_key", room_key "room_key", room_key
) )
defer.returnValue((recents, prev_batch_token, limited)) defer.returnValue(TimelineBatch(
events=recents, prev_batch=prev_batch_token, limited=limited
))
@defer.inlineCallbacks @defer.inlineCallbacks
def incremental_sync_with_gap_for_room(self, room_id, sync_config, def incremental_sync_with_gap_for_room(self, room_id, sync_config,
since_token, now_token, since_token, now_token,
published_room_ids, typing_by_room): typing_by_room):
""" Get the incremental delta needed to bring the client up to date for """ Get the incremental delta needed to bring the client up to date for
the room. Gives the client the most recent events and the changes to the room. Gives the client the most recent events and the changes to
state. state.
Returns: Returns:
A Deferred RoomSyncResult A Deferred JoinedSyncResult
""" """
# TODO(mjark): Check for redactions we might have missed. # TODO(mjark): Check for redactions we might have missed.
recents, prev_batch_token, limited = yield self.load_filtered_recents( batch = yield self.load_filtered_recents(
room_id, sync_config, now_token, since_token, room_id, sync_config, now_token, since_token,
) )
logging.debug("Recents %r", recents) logging.debug("Recents %r", batch)
# TODO(mjark): This seems racy since this isn't being passed a # TODO(mjark): This seems racy since this isn't being passed a
# token to indicate what point in the stream this is # token to indicate what point in the stream this is
@ -435,13 +451,10 @@ class SyncHandler(BaseHandler):
sync_config, room_id, state_events_delta sync_config, room_id, state_events_delta
) )
room_sync = RoomSyncResult( room_sync = JoinedSyncResult(
room_id=room_id, room_id=room_id,
published=room_id in published_room_ids, timeline=batch,
events=recents,
prev_batch=prev_batch_token,
state=state_events_delta, state=state_events_delta,
limited=limited,
ephemeral=typing_by_room.get(room_id, []) ephemeral=typing_by_room.get(room_id, [])
) )

View File

@ -16,7 +16,7 @@
from twisted.internet import defer from twisted.internet import defer
from synapse.http.servlet import ( from synapse.http.servlet import (
RestServlet, parse_string, parse_integer, parse_boolean RestServlet, parse_string, parse_integer
) )
from synapse.handlers.sync import SyncConfig from synapse.handlers.sync import SyncConfig
from synapse.types import StreamToken from synapse.types import StreamToken
@ -26,6 +26,7 @@ from synapse.events.utils import (
from synapse.api.filtering import Filter from synapse.api.filtering import Filter
from ._base import client_v2_pattern from ._base import client_v2_pattern
import copy
import logging import logging
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -36,51 +37,44 @@ class SyncRestServlet(RestServlet):
GET parameters:: GET parameters::
timeout(int): How long to wait for new events in milliseconds. timeout(int): How long to wait for new events in milliseconds.
limit(int): Maxiumum number of events per room to return.
gap(bool): Create gaps the message history if limit is exceeded to
ensure that the client has the most recent messages. Defaults to
"true".
sort(str,str): tuple of sort key (e.g. "timeline") and direction
(e.g. "asc", "desc"). Defaults to "timeline,asc".
since(batch_token): Batch token when asking for incremental deltas. since(batch_token): Batch token when asking for incremental deltas.
set_presence(str): What state the device presence should be set to. set_presence(str): What state the device presence should be set to.
default is "online". default is "online".
backfill(bool): Should the HS request message history from other
servers. This may take a long time making it unsuitable for clients
expecting a prompt response. Defaults to "true".
filter(filter_id): A filter to apply to the events returned. filter(filter_id): A filter to apply to the events returned.
filter_*: Filter override parameters.
Response JSON:: Response JSON::
{ {
"next_batch": // batch token for the next /sync "next_batch": // batch token for the next /sync
"private_user_data": // private events for this user. "presence": // presence data for the user.
"public_user_data": // public events for all users including the "rooms": {
// public events for this user. "joined": { // Joined rooms being updated.
"rooms": [{ // List of rooms with updates. "${room_id}": { // Id of the room being updated
"room_id": // Id of the room being updated
"limited": // Was the per-room event limit exceeded?
"published": // Is the room published by our HS?
"event_map": // Map of EventID -> event JSON. "event_map": // Map of EventID -> event JSON.
"events": { // The recent events in the room if gap is "true" "timeline": { // The recent events in the room if gap is "true"
// otherwise the next events in the room. "limited": // Was the per-room event limit exceeded?
"batch": [] // list of EventIDs in the "event_map". // otherwise the next events in the room.
"prev_batch": // back token for getting previous events. "events": [] // list of EventIDs in the "event_map".
"prev_batch": // back token for getting previous events.
} }
"state": [] // list of EventIDs updating the current state to "state": {"events": []} // list of EventIDs updating the
// be what it should be at the end of the batch. // current state to be what it should
"ephemeral": [] // be at the end of the batch.
}] "ephemeral": {"events": []} // list of event objects
}
},
"invited": {}, // Invited rooms being updated.
"archived": {} // Archived rooms being updated.
}
} }
""" """
PATTERN = client_v2_pattern("/sync$") PATTERN = client_v2_pattern("/sync$")
ALLOWED_SORT = set(["timeline,asc", "timeline,desc"]) ALLOWED_PRESENCE = set(["online", "offline"])
ALLOWED_PRESENCE = set(["online", "offline", "idle"])
def __init__(self, hs): def __init__(self, hs):
super(SyncRestServlet, self).__init__() super(SyncRestServlet, self).__init__()
self.auth = hs.get_auth() self.auth = hs.get_auth()
self.event_stream_handler = hs.get_handlers().event_stream_handler
self.sync_handler = hs.get_handlers().sync_handler self.sync_handler = hs.get_handlers().sync_handler
self.clock = hs.get_clock() self.clock = hs.get_clock()
self.filtering = hs.get_filtering() self.filtering = hs.get_filtering()
@ -90,45 +84,29 @@ class SyncRestServlet(RestServlet):
user, token_id = yield self.auth.get_user_by_req(request) user, token_id = yield self.auth.get_user_by_req(request)
timeout = parse_integer(request, "timeout", default=0) timeout = parse_integer(request, "timeout", default=0)
limit = parse_integer(request, "limit", required=True)
gap = parse_boolean(request, "gap", default=True)
sort = parse_string(
request, "sort", default="timeline,asc",
allowed_values=self.ALLOWED_SORT
)
since = parse_string(request, "since") since = parse_string(request, "since")
set_presence = parse_string( set_presence = parse_string(
request, "set_presence", default="online", request, "set_presence", default="online",
allowed_values=self.ALLOWED_PRESENCE allowed_values=self.ALLOWED_PRESENCE
) )
backfill = parse_boolean(request, "backfill", default=False)
filter_id = parse_string(request, "filter", default=None) filter_id = parse_string(request, "filter", default=None)
logger.info( logger.info(
"/sync: user=%r, timeout=%r, limit=%r, gap=%r, sort=%r, since=%r," "/sync: user=%r, timeout=%r, since=%r,"
" set_presence=%r, backfill=%r, filter_id=%r" % ( " set_presence=%r, filter_id=%r" % (
user, timeout, limit, gap, sort, since, set_presence, user, timeout, since, set_presence, filter_id
backfill, filter_id
) )
) )
# TODO(mjark): Load filter and apply overrides.
try: try:
filter = yield self.filtering.get_user_filter( filter = yield self.filtering.get_user_filter(
user.localpart, filter_id user.localpart, filter_id
) )
except: except:
filter = Filter({}) filter = Filter({})
# filter = filter.apply_overrides(http_request)
# if filter.matches(event):
# # stuff
sync_config = SyncConfig( sync_config = SyncConfig(
user=user, user=user,
gap=gap,
limit=limit,
sort=sort,
backfill=backfill,
filter=filter, filter=filter,
) )
@ -137,43 +115,81 @@ class SyncRestServlet(RestServlet):
else: else:
since_token = None since_token = None
sync_result = yield self.sync_handler.wait_for_sync_for_user( if set_presence == "online":
sync_config, since_token=since_token, timeout=timeout yield self.event_stream_handler.started_stream(user)
)
try:
sync_result = yield self.sync_handler.wait_for_sync_for_user(
sync_config, since_token=since_token, timeout=timeout
)
finally:
if set_presence == "online":
self.event_stream_handler.stopped_stream(user)
time_now = self.clock.time_msec() time_now = self.clock.time_msec()
joined = self.encode_joined(
sync_result.joined, filter, time_now, token_id
)
invited = self.encode_invited(
sync_result.invited, filter, time_now, token_id
)
response_content = { response_content = {
"public_user_data": self.encode_user_data( "presence": self.encode_presence(
sync_result.public_user_data, filter, time_now sync_result.presence, filter, time_now
),
"private_user_data": self.encode_user_data(
sync_result.private_user_data, filter, time_now
),
"rooms": self.encode_rooms(
sync_result.rooms, filter, time_now, token_id
), ),
"rooms": {
"joined": joined,
"invited": invited,
"archived": {},
},
"next_batch": sync_result.next_batch.to_string(), "next_batch": sync_result.next_batch.to_string(),
} }
defer.returnValue((200, response_content)) defer.returnValue((200, response_content))
def encode_user_data(self, events, filter, time_now): def encode_presence(self, events, filter, time_now):
return events formatted = []
for event in events:
event = copy.deepcopy(event)
event['sender'] = event['content'].pop('user_id')
formatted.append(event)
return {"events": filter.filter_presence(formatted)}
def encode_rooms(self, rooms, filter, time_now, token_id): def encode_joined(self, rooms, filter, time_now, token_id):
return [ joined = {}
self.encode_room(room, filter, time_now, token_id) for room in rooms:
for room in rooms joined[room.room_id] = self.encode_room(
] room, filter, time_now, token_id
)
return joined
def encode_invited(self, rooms, filter, time_now, token_id):
invited = {}
for room in rooms:
invite = serialize_event(
room.invite, time_now, token_id=token_id,
event_format=format_event_for_client_v2_without_event_id,
)
invited_state = invite.get("unsigned", {}).pop("invite_room_state", [])
invited_state.append(invite)
invited[room.room_id] = {
"invite_state": {"events": invited_state}
}
return invited
@staticmethod @staticmethod
def encode_room(room, filter, time_now, token_id): def encode_room(room, filter, time_now, token_id):
event_map = {} event_map = {}
state_events = filter.filter_room_state(room.state) state_events = filter.filter_room_state(room.state)
recent_events = filter.filter_room_events(room.events) timeline_events = filter.filter_room_timeline(room.timeline.events)
ephemeral_events = filter.filter_room_ephemeral(room.ephemeral)
state_event_ids = [] state_event_ids = []
recent_event_ids = [] timeline_event_ids = []
for event in state_events: for event in state_events:
# TODO(mjark): Respect formatting requirements in the filter. # TODO(mjark): Respect formatting requirements in the filter.
event_map[event.event_id] = serialize_event( event_map[event.event_id] = serialize_event(
@ -182,24 +198,22 @@ class SyncRestServlet(RestServlet):
) )
state_event_ids.append(event.event_id) state_event_ids.append(event.event_id)
for event in recent_events: for event in timeline_events:
# TODO(mjark): Respect formatting requirements in the filter. # TODO(mjark): Respect formatting requirements in the filter.
event_map[event.event_id] = serialize_event( event_map[event.event_id] = serialize_event(
event, time_now, token_id=token_id, event, time_now, token_id=token_id,
event_format=format_event_for_client_v2_without_event_id, event_format=format_event_for_client_v2_without_event_id,
) )
recent_event_ids.append(event.event_id) timeline_event_ids.append(event.event_id)
result = { result = {
"room_id": room.room_id,
"event_map": event_map, "event_map": event_map,
"events": { "timeline": {
"batch": recent_event_ids, "events": timeline_event_ids,
"prev_batch": room.prev_batch.to_string(), "prev_batch": room.timeline.prev_batch.to_string(),
"limited": room.timeline.limited,
}, },
"state": state_event_ids, "state": {"events": state_event_ids},
"limited": room.limited, "ephemeral": {"events": ephemeral_events},
"published": room.published,
"ephemeral": room.ephemeral,
} }
return result return result

View File

@ -42,22 +42,12 @@ from .end_to_end_keys import EndToEndKeyStore
from .receipts import ReceiptsStore from .receipts import ReceiptsStore
import fnmatch
import imp
import logging import logging
import os
import re
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# Remember to update this number every time a change is made to database
# schema files, so the users will be informed on server restarts.
SCHEMA_VERSION = 24
dir_path = os.path.abspath(os.path.dirname(__file__))
# Number of msec of granularity to store the user IP 'last seen' time. Smaller # Number of msec of granularity to store the user IP 'last seen' time. Smaller
# times give more inserts into the database even for readonly API hits # times give more inserts into the database even for readonly API hits
# 120 seconds == 2 minutes # 120 seconds == 2 minutes
@ -158,371 +148,6 @@ class DataStore(RoomMemberStore, RoomStore,
) )
def read_schema(path):
""" Read the named database schema.
Args:
path: Path of the database schema.
Returns:
A string containing the database schema.
"""
with open(path) as schema_file:
return schema_file.read()
class PrepareDatabaseException(Exception):
pass
class UpgradeDatabaseException(PrepareDatabaseException):
pass
def prepare_database(db_conn, database_engine):
"""Prepares a database for usage. Will either create all necessary tables
or upgrade from an older schema version.
"""
try:
cur = db_conn.cursor()
version_info = _get_or_create_schema_state(cur, database_engine)
if version_info:
user_version, delta_files, upgraded = version_info
_upgrade_existing_database(
cur, user_version, delta_files, upgraded, database_engine
)
else:
_setup_new_database(cur, database_engine)
# cur.execute("PRAGMA user_version = %d" % (SCHEMA_VERSION,))
cur.close()
db_conn.commit()
except:
db_conn.rollback()
raise
def _setup_new_database(cur, database_engine):
"""Sets up the database by finding a base set of "full schemas" and then
applying any necessary deltas.
The "full_schemas" directory has subdirectories named after versions. This
function searches for the highest version less than or equal to
`SCHEMA_VERSION` and executes all .sql files in that directory.
The function will then apply all deltas for all versions after the base
version.
Example directory structure:
schema/
delta/
...
full_schemas/
3/
test.sql
...
11/
foo.sql
bar.sql
...
In the example foo.sql and bar.sql would be run, and then any delta files
for versions strictly greater than 11.
"""
current_dir = os.path.join(dir_path, "schema", "full_schemas")
directory_entries = os.listdir(current_dir)
valid_dirs = []
pattern = re.compile(r"^\d+(\.sql)?$")
for filename in directory_entries:
match = pattern.match(filename)
abs_path = os.path.join(current_dir, filename)
if match and os.path.isdir(abs_path):
ver = int(match.group(0))
if ver <= SCHEMA_VERSION:
valid_dirs.append((ver, abs_path))
else:
logger.warn("Unexpected entry in 'full_schemas': %s", filename)
if not valid_dirs:
raise PrepareDatabaseException(
"Could not find a suitable base set of full schemas"
)
max_current_ver, sql_dir = max(valid_dirs, key=lambda x: x[0])
logger.debug("Initialising schema v%d", max_current_ver)
directory_entries = os.listdir(sql_dir)
for filename in fnmatch.filter(directory_entries, "*.sql"):
sql_loc = os.path.join(sql_dir, filename)
logger.debug("Applying schema %s", sql_loc)
executescript(cur, sql_loc)
cur.execute(
database_engine.convert_param_style(
"INSERT INTO schema_version (version, upgraded)"
" VALUES (?,?)"
),
(max_current_ver, False,)
)
_upgrade_existing_database(
cur,
current_version=max_current_ver,
applied_delta_files=[],
upgraded=False,
database_engine=database_engine,
)
def _upgrade_existing_database(cur, current_version, applied_delta_files,
upgraded, database_engine):
"""Upgrades an existing database.
Delta files can either be SQL stored in *.sql files, or python modules
in *.py.
There can be multiple delta files per version. Synapse will keep track of
which delta files have been applied, and will apply any that haven't been
even if there has been no version bump. This is useful for development
where orthogonal schema changes may happen on separate branches.
Different delta files for the same version *must* be orthogonal and give
the same result when applied in any order. No guarantees are made on the
order of execution of these scripts.
This is a no-op of current_version == SCHEMA_VERSION.
Example directory structure:
schema/
delta/
11/
foo.sql
...
12/
foo.sql
bar.py
...
full_schemas/
...
In the example, if current_version is 11, then foo.sql will be run if and
only if `upgraded` is True. Then `foo.sql` and `bar.py` would be run in
some arbitrary order.
Args:
cur (Cursor)
current_version (int): The current version of the schema.
applied_delta_files (list): A list of deltas that have already been
applied.
upgraded (bool): Whether the current version was generated by having
applied deltas or from full schema file. If `True` the function
will never apply delta files for the given `current_version`, since
the current_version wasn't generated by applying those delta files.
"""
if current_version > SCHEMA_VERSION:
raise ValueError(
"Cannot use this database as it is too " +
"new for the server to understand"
)
start_ver = current_version
if not upgraded:
start_ver += 1
logger.debug("applied_delta_files: %s", applied_delta_files)
for v in range(start_ver, SCHEMA_VERSION + 1):
logger.debug("Upgrading schema to v%d", v)
delta_dir = os.path.join(dir_path, "schema", "delta", str(v))
try:
directory_entries = os.listdir(delta_dir)
except OSError:
logger.exception("Could not open delta dir for version %d", v)
raise UpgradeDatabaseException(
"Could not open delta dir for version %d" % (v,)
)
directory_entries.sort()
for file_name in directory_entries:
relative_path = os.path.join(str(v), file_name)
logger.debug("Found file: %s", relative_path)
if relative_path in applied_delta_files:
continue
absolute_path = os.path.join(
dir_path, "schema", "delta", relative_path,
)
root_name, ext = os.path.splitext(file_name)
if ext == ".py":
# This is a python upgrade module. We need to import into some
# package and then execute its `run_upgrade` function.
module_name = "synapse.storage.v%d_%s" % (
v, root_name
)
with open(absolute_path) as python_file:
module = imp.load_source(
module_name, absolute_path, python_file
)
logger.debug("Running script %s", relative_path)
module.run_upgrade(cur, database_engine)
elif ext == ".pyc":
# Sometimes .pyc files turn up anyway even though we've
# disabled their generation; e.g. from distribution package
# installers. Silently skip it
pass
elif ext == ".sql":
# A plain old .sql file, just read and execute it
logger.debug("Applying schema %s", relative_path)
executescript(cur, absolute_path)
else:
# Not a valid delta file.
logger.warn(
"Found directory entry that did not end in .py or"
" .sql: %s",
relative_path,
)
continue
# Mark as done.
cur.execute(
database_engine.convert_param_style(
"INSERT INTO applied_schema_deltas (version, file)"
" VALUES (?,?)",
),
(v, relative_path)
)
cur.execute("DELETE FROM schema_version")
cur.execute(
database_engine.convert_param_style(
"INSERT INTO schema_version (version, upgraded)"
" VALUES (?,?)",
),
(v, True)
)
def get_statements(f):
statement_buffer = ""
in_comment = False # If we're in a /* ... */ style comment
for line in f:
line = line.strip()
if in_comment:
# Check if this line contains an end to the comment
comments = line.split("*/", 1)
if len(comments) == 1:
continue
line = comments[1]
in_comment = False
# Remove inline block comments
line = re.sub(r"/\*.*\*/", " ", line)
# Does this line start a comment?
comments = line.split("/*", 1)
if len(comments) > 1:
line = comments[0]
in_comment = True
# Deal with line comments
line = line.split("--", 1)[0]
line = line.split("//", 1)[0]
# Find *all* semicolons. We need to treat first and last entry
# specially.
statements = line.split(";")
# We must prepend statement_buffer to the first statement
first_statement = "%s %s" % (
statement_buffer.strip(),
statements[0].strip()
)
statements[0] = first_statement
# Every entry, except the last, is a full statement
for statement in statements[:-1]:
yield statement.strip()
# The last entry did *not* end in a semicolon, so we store it for the
# next semicolon we find
statement_buffer = statements[-1].strip()
def executescript(txn, schema_path):
with open(schema_path, 'r') as f:
for statement in get_statements(f):
txn.execute(statement)
def _get_or_create_schema_state(txn, database_engine):
# Bluntly try creating the schema_version tables.
schema_path = os.path.join(
dir_path, "schema", "schema_version.sql",
)
executescript(txn, schema_path)
txn.execute("SELECT version, upgraded FROM schema_version")
row = txn.fetchone()
current_version = int(row[0]) if row else None
upgraded = bool(row[1]) if row else None
if current_version:
txn.execute(
database_engine.convert_param_style(
"SELECT file FROM applied_schema_deltas WHERE version >= ?"
),
(current_version,)
)
applied_deltas = [d for d, in txn.fetchall()]
return current_version, applied_deltas, upgraded
return None
def prepare_sqlite3_database(db_conn):
"""This function should be called before `prepare_database` on sqlite3
databases.
Since we changed the way we store the current schema version and handle
updates to schemas, we need a way to upgrade from the old method to the
new. This only affects sqlite databases since they were the only ones
supported at the time.
"""
with db_conn:
schema_path = os.path.join(
dir_path, "schema", "schema_version.sql",
)
create_schema = read_schema(schema_path)
db_conn.executescript(create_schema)
c = db_conn.execute("SELECT * FROM schema_version")
rows = c.fetchall()
c.close()
if not rows:
c = db_conn.execute("PRAGMA user_version")
row = c.fetchone()
c.close()
if row and row[0]:
db_conn.execute(
"REPLACE INTO schema_version (version, upgraded)"
" VALUES (?,?)",
(row[0], False)
)
def are_all_users_on_domain(txn, database_engine, domain): def are_all_users_on_domain(txn, database_engine, domain):
sql = database_engine.convert_param_style( sql = database_engine.convert_param_style(
"SELECT COUNT(*) FROM users WHERE name NOT LIKE ?" "SELECT COUNT(*) FROM users WHERE name NOT LIKE ?"

View File

@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from synapse.storage import prepare_database from synapse.storage.prepare_database import prepare_database
from ._base import IncorrectDatabaseSetup from ._base import IncorrectDatabaseSetup

View File

@ -13,7 +13,9 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from synapse.storage import prepare_database, prepare_sqlite3_database from synapse.storage.prepare_database import (
prepare_database, prepare_sqlite3_database
)
class Sqlite3Engine(object): class Sqlite3Engine(object):

View File

@ -0,0 +1,395 @@
# -*- coding: utf-8 -*-
# Copyright 2014, 2015 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import fnmatch
import imp
import logging
import os
import re
logger = logging.getLogger(__name__)
# Remember to update this number every time a change is made to database
# schema files, so the users will be informed on server restarts.
SCHEMA_VERSION = 24
dir_path = os.path.abspath(os.path.dirname(__file__))
def read_schema(path):
""" Read the named database schema.
Args:
path: Path of the database schema.
Returns:
A string containing the database schema.
"""
with open(path) as schema_file:
return schema_file.read()
class PrepareDatabaseException(Exception):
pass
class UpgradeDatabaseException(PrepareDatabaseException):
pass
def prepare_database(db_conn, database_engine):
"""Prepares a database for usage. Will either create all necessary tables
or upgrade from an older schema version.
"""
try:
cur = db_conn.cursor()
version_info = _get_or_create_schema_state(cur, database_engine)
if version_info:
user_version, delta_files, upgraded = version_info
_upgrade_existing_database(
cur, user_version, delta_files, upgraded, database_engine
)
else:
_setup_new_database(cur, database_engine)
# cur.execute("PRAGMA user_version = %d" % (SCHEMA_VERSION,))
cur.close()
db_conn.commit()
except:
db_conn.rollback()
raise
def _setup_new_database(cur, database_engine):
"""Sets up the database by finding a base set of "full schemas" and then
applying any necessary deltas.
The "full_schemas" directory has subdirectories named after versions. This
function searches for the highest version less than or equal to
`SCHEMA_VERSION` and executes all .sql files in that directory.
The function will then apply all deltas for all versions after the base
version.
Example directory structure:
schema/
delta/
...
full_schemas/
3/
test.sql
...
11/
foo.sql
bar.sql
...
In the example foo.sql and bar.sql would be run, and then any delta files
for versions strictly greater than 11.
"""
current_dir = os.path.join(dir_path, "schema", "full_schemas")
directory_entries = os.listdir(current_dir)
valid_dirs = []
pattern = re.compile(r"^\d+(\.sql)?$")
for filename in directory_entries:
match = pattern.match(filename)
abs_path = os.path.join(current_dir, filename)
if match and os.path.isdir(abs_path):
ver = int(match.group(0))
if ver <= SCHEMA_VERSION:
valid_dirs.append((ver, abs_path))
else:
logger.warn("Unexpected entry in 'full_schemas': %s", filename)
if not valid_dirs:
raise PrepareDatabaseException(
"Could not find a suitable base set of full schemas"
)
max_current_ver, sql_dir = max(valid_dirs, key=lambda x: x[0])
logger.debug("Initialising schema v%d", max_current_ver)
directory_entries = os.listdir(sql_dir)
for filename in fnmatch.filter(directory_entries, "*.sql"):
sql_loc = os.path.join(sql_dir, filename)
logger.debug("Applying schema %s", sql_loc)
executescript(cur, sql_loc)
cur.execute(
database_engine.convert_param_style(
"INSERT INTO schema_version (version, upgraded)"
" VALUES (?,?)"
),
(max_current_ver, False,)
)
_upgrade_existing_database(
cur,
current_version=max_current_ver,
applied_delta_files=[],
upgraded=False,
database_engine=database_engine,
)
def _upgrade_existing_database(cur, current_version, applied_delta_files,
upgraded, database_engine):
"""Upgrades an existing database.
Delta files can either be SQL stored in *.sql files, or python modules
in *.py.
There can be multiple delta files per version. Synapse will keep track of
which delta files have been applied, and will apply any that haven't been
even if there has been no version bump. This is useful for development
where orthogonal schema changes may happen on separate branches.
Different delta files for the same version *must* be orthogonal and give
the same result when applied in any order. No guarantees are made on the
order of execution of these scripts.
This is a no-op of current_version == SCHEMA_VERSION.
Example directory structure:
schema/
delta/
11/
foo.sql
...
12/
foo.sql
bar.py
...
full_schemas/
...
In the example, if current_version is 11, then foo.sql will be run if and
only if `upgraded` is True. Then `foo.sql` and `bar.py` would be run in
some arbitrary order.
Args:
cur (Cursor)
current_version (int): The current version of the schema.
applied_delta_files (list): A list of deltas that have already been
applied.
upgraded (bool): Whether the current version was generated by having
applied deltas or from full schema file. If `True` the function
will never apply delta files for the given `current_version`, since
the current_version wasn't generated by applying those delta files.
"""
if current_version > SCHEMA_VERSION:
raise ValueError(
"Cannot use this database as it is too " +
"new for the server to understand"
)
start_ver = current_version
if not upgraded:
start_ver += 1
logger.debug("applied_delta_files: %s", applied_delta_files)
for v in range(start_ver, SCHEMA_VERSION + 1):
logger.debug("Upgrading schema to v%d", v)
delta_dir = os.path.join(dir_path, "schema", "delta", str(v))
try:
directory_entries = os.listdir(delta_dir)
except OSError:
logger.exception("Could not open delta dir for version %d", v)
raise UpgradeDatabaseException(
"Could not open delta dir for version %d" % (v,)
)
directory_entries.sort()
for file_name in directory_entries:
relative_path = os.path.join(str(v), file_name)
logger.debug("Found file: %s", relative_path)
if relative_path in applied_delta_files:
continue
absolute_path = os.path.join(
dir_path, "schema", "delta", relative_path,
)
root_name, ext = os.path.splitext(file_name)
if ext == ".py":
# This is a python upgrade module. We need to import into some
# package and then execute its `run_upgrade` function.
module_name = "synapse.storage.v%d_%s" % (
v, root_name
)
with open(absolute_path) as python_file:
module = imp.load_source(
module_name, absolute_path, python_file
)
logger.debug("Running script %s", relative_path)
module.run_upgrade(cur, database_engine)
elif ext == ".pyc":
# Sometimes .pyc files turn up anyway even though we've
# disabled their generation; e.g. from distribution package
# installers. Silently skip it
pass
elif ext == ".sql":
# A plain old .sql file, just read and execute it
logger.debug("Applying schema %s", relative_path)
executescript(cur, absolute_path)
else:
# Not a valid delta file.
logger.warn(
"Found directory entry that did not end in .py or"
" .sql: %s",
relative_path,
)
continue
# Mark as done.
cur.execute(
database_engine.convert_param_style(
"INSERT INTO applied_schema_deltas (version, file)"
" VALUES (?,?)",
),
(v, relative_path)
)
cur.execute("DELETE FROM schema_version")
cur.execute(
database_engine.convert_param_style(
"INSERT INTO schema_version (version, upgraded)"
" VALUES (?,?)",
),
(v, True)
)
def get_statements(f):
statement_buffer = ""
in_comment = False # If we're in a /* ... */ style comment
for line in f:
line = line.strip()
if in_comment:
# Check if this line contains an end to the comment
comments = line.split("*/", 1)
if len(comments) == 1:
continue
line = comments[1]
in_comment = False
# Remove inline block comments
line = re.sub(r"/\*.*\*/", " ", line)
# Does this line start a comment?
comments = line.split("/*", 1)
if len(comments) > 1:
line = comments[0]
in_comment = True
# Deal with line comments
line = line.split("--", 1)[0]
line = line.split("//", 1)[0]
# Find *all* semicolons. We need to treat first and last entry
# specially.
statements = line.split(";")
# We must prepend statement_buffer to the first statement
first_statement = "%s %s" % (
statement_buffer.strip(),
statements[0].strip()
)
statements[0] = first_statement
# Every entry, except the last, is a full statement
for statement in statements[:-1]:
yield statement.strip()
# The last entry did *not* end in a semicolon, so we store it for the
# next semicolon we find
statement_buffer = statements[-1].strip()
def executescript(txn, schema_path):
with open(schema_path, 'r') as f:
for statement in get_statements(f):
txn.execute(statement)
def _get_or_create_schema_state(txn, database_engine):
# Bluntly try creating the schema_version tables.
schema_path = os.path.join(
dir_path, "schema", "schema_version.sql",
)
executescript(txn, schema_path)
txn.execute("SELECT version, upgraded FROM schema_version")
row = txn.fetchone()
current_version = int(row[0]) if row else None
upgraded = bool(row[1]) if row else None
if current_version:
txn.execute(
database_engine.convert_param_style(
"SELECT file FROM applied_schema_deltas WHERE version >= ?"
),
(current_version,)
)
applied_deltas = [d for d, in txn.fetchall()]
return current_version, applied_deltas, upgraded
return None
def prepare_sqlite3_database(db_conn):
"""This function should be called before `prepare_database` on sqlite3
databases.
Since we changed the way we store the current schema version and handle
updates to schemas, we need a way to upgrade from the old method to the
new. This only affects sqlite databases since they were the only ones
supported at the time.
"""
with db_conn:
schema_path = os.path.join(
dir_path, "schema", "schema_version.sql",
)
create_schema = read_schema(schema_path)
db_conn.executescript(create_schema)
c = db_conn.execute("SELECT * FROM schema_version")
rows = c.fetchall()
c.close()
if not rows:
c = db_conn.execute("PRAGMA user_version")
row = c.fetchone()
c.close()
if row and row[0]:
db_conn.execute(
"REPLACE INTO schema_version (version, upgraded)"
" VALUES (?,?)",
(row[0], False)
)

View File

@ -110,6 +110,20 @@ class RoomMemberStore(SQLBaseStore):
membership=membership, membership=membership,
).addCallback(self._get_events) ).addCallback(self._get_events)
def get_invites_for_user(self, user_id):
""" Get all the invite events for a user
Args:
user_id (str): The user ID.
Returns:
A deferred list of event objects.
"""
return self.get_rooms_for_user_where_membership_is(
user_id, [Membership.INVITE]
).addCallback(lambda invites: self._get_events([
invites.event_id for invite in invites
]))
def get_rooms_for_user_where_membership_is(self, user_id, membership_list): def get_rooms_for_user_where_membership_is(self, user_id, membership_list):
""" Get all the rooms for this user where the membership for this user """ Get all the rooms for this user where the membership for this user
matches one in the membership list. matches one in the membership list.

View File

@ -54,7 +54,7 @@ class StateStore(SQLBaseStore):
defer.returnValue({}) defer.returnValue({})
event_to_groups = yield self._get_state_group_for_events( event_to_groups = yield self._get_state_group_for_events(
room_id, event_ids, event_ids,
) )
groups = set(event_to_groups.values()) groups = set(event_to_groups.values())
@ -208,7 +208,7 @@ class StateStore(SQLBaseStore):
) )
@defer.inlineCallbacks @defer.inlineCallbacks
def get_state_for_events(self, room_id, event_ids, types): def get_state_for_events(self, event_ids, types):
"""Given a list of event_ids and type tuples, return a list of state """Given a list of event_ids and type tuples, return a list of state
dicts for each event. The state dicts will only have the type/state_keys dicts for each event. The state dicts will only have the type/state_keys
that are in the `types` list. that are in the `types` list.
@ -225,7 +225,7 @@ class StateStore(SQLBaseStore):
The dicts are mappings from (type, state_key) -> state_events The dicts are mappings from (type, state_key) -> state_events
""" """
event_to_groups = yield self._get_state_group_for_events( event_to_groups = yield self._get_state_group_for_events(
room_id, event_ids, event_ids,
) )
groups = set(event_to_groups.values()) groups = set(event_to_groups.values())
@ -251,8 +251,8 @@ class StateStore(SQLBaseStore):
) )
@cachedList(cache=_get_state_group_for_event.cache, list_name="event_ids", @cachedList(cache=_get_state_group_for_event.cache, list_name="event_ids",
num_args=2) num_args=1)
def _get_state_group_for_events(self, room_id, event_ids): def _get_state_group_for_events(self, event_ids):
"""Returns mapping event_id -> state_group """Returns mapping event_id -> state_group
""" """
def f(txn): def f(txn):

72
synapse/util/debug.py Normal file
View File

@ -0,0 +1,72 @@
# -*- coding: utf-8 -*-
# Copyright 2015 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from twisted.internet import defer, reactor
from functools import wraps
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
def debug_deferreds():
"""Cause all deferreds to wait for a reactor tick before running their
callbacks. This increases the chance of getting a stack trace out of
a defer.inlineCallback since the code waiting on the deferred will get
a chance to add an errback before the deferred runs."""
# Helper method for retrieving and restoring the current logging context
# around a callback.
def with_logging_context(fn):
context = LoggingContext.current_context()
def restore_context_callback(x):
with PreserveLoggingContext():
LoggingContext.thread_local.current_context = context
return fn(x)
return restore_context_callback
# We are going to modify the __init__ method of defer.Deferred so we
# need to get a copy of the old method so we can still call it.
old__init__ = defer.Deferred.__init__
# We need to create a deferred to bounce the callbacks through the reactor
# but we don't want to add a callback when we create that deferred so we
# we create a new type of deferred that uses the old __init__ method.
# This is safe as long as the old __init__ method doesn't invoke an
# __init__ using super.
class Bouncer(defer.Deferred):
__init__ = old__init__
# We'll add this as a callback to all Deferreds. Twisted will wait until
# the bouncer deferred resolves before calling the callbacks of the
# original deferred.
def bounce_callback(x):
bouncer = Bouncer()
reactor.callLater(0, with_logging_context(bouncer.callback), x)
return bouncer
# We'll add this as an errback to all Deferreds. Twisted will wait until
# the bouncer deferred resolves before calling the errbacks of the
# original deferred.
def bounce_errback(x):
bouncer = Bouncer()
reactor.callLater(0, with_logging_context(bouncer.errback), x)
return bouncer
@wraps(old__init__)
def new__init__(self, *args, **kargs):
old__init__(self, *args, **kargs)
self.addCallbacks(bounce_callback, bounce_errback)
defer.Deferred.__init__ = new__init__

View File

@ -345,9 +345,9 @@ class FilteringTestCase(unittest.TestCase):
) )
@defer.inlineCallbacks @defer.inlineCallbacks
def test_filter_public_user_data_match(self): def test_filter_presence_match(self):
user_filter_json = { user_filter_json = {
"public_user_data": { "presence": {
"types": ["m.*"] "types": ["m.*"]
} }
} }
@ -368,13 +368,13 @@ class FilteringTestCase(unittest.TestCase):
filter_id=filter_id, filter_id=filter_id,
) )
results = user_filter.filter_public_user_data(events=events) results = user_filter.filter_presence(events=events)
self.assertEquals(events, results) self.assertEquals(events, results)
@defer.inlineCallbacks @defer.inlineCallbacks
def test_filter_public_user_data_no_match(self): def test_filter_presence_no_match(self):
user_filter_json = { user_filter_json = {
"public_user_data": { "presence": {
"types": ["m.*"] "types": ["m.*"]
} }
} }
@ -395,7 +395,7 @@ class FilteringTestCase(unittest.TestCase):
filter_id=filter_id, filter_id=filter_id,
) )
results = user_filter.filter_public_user_data(events=events) results = user_filter.filter_presence(events=events)
self.assertEquals([], results) self.assertEquals([], results)
@defer.inlineCallbacks @defer.inlineCallbacks

View File

@ -16,7 +16,7 @@
from synapse.http.server import HttpServer from synapse.http.server import HttpServer
from synapse.api.errors import cs_error, CodeMessageException, StoreError from synapse.api.errors import cs_error, CodeMessageException, StoreError
from synapse.api.constants import EventTypes from synapse.api.constants import EventTypes
from synapse.storage import prepare_database from synapse.storage.prepare_database import prepare_database
from synapse.storage.engines import create_engine from synapse.storage.engines import create_engine
from synapse.server import HomeServer from synapse.server import HomeServer