Merge branch 'develop' into matthew/preview_urls

This commit is contained in:
Matthew Hodgson 2016-03-29 01:20:25 +01:00
commit e0c2490a14
29 changed files with 615 additions and 318 deletions

View File

@ -814,17 +814,16 @@ class Auth(object):
return auth_ids return auth_ids
@log_function def _get_send_level(self, etype, state_key, auth_events):
def _can_send_event(self, event, auth_events):
key = (EventTypes.PowerLevels, "", ) key = (EventTypes.PowerLevels, "", )
send_level_event = auth_events.get(key) send_level_event = auth_events.get(key)
send_level = None send_level = None
if send_level_event: if send_level_event:
send_level = send_level_event.content.get("events", {}).get( send_level = send_level_event.content.get("events", {}).get(
event.type etype
) )
if send_level is None: if send_level is None:
if hasattr(event, "state_key"): if state_key is not None:
send_level = send_level_event.content.get( send_level = send_level_event.content.get(
"state_default", 50 "state_default", 50
) )
@ -838,6 +837,13 @@ class Auth(object):
else: else:
send_level = 0 send_level = 0
return send_level
@log_function
def _can_send_event(self, event, auth_events):
send_level = self._get_send_level(
event.type, event.get("state_key", None), auth_events
)
user_level = self._get_user_power_level(event.user_id, auth_events) user_level = self._get_user_power_level(event.user_id, auth_events)
if user_level < send_level: if user_level < send_level:
@ -982,3 +988,43 @@ class Auth(object):
"You don't have permission to add ops level greater " "You don't have permission to add ops level greater "
"than your own" "than your own"
) )
@defer.inlineCallbacks
def check_can_change_room_list(self, room_id, user):
"""Check if the user is allowed to edit the room's entry in the
published room list.
Args:
room_id (str)
user (UserID)
"""
is_admin = yield self.is_server_admin(user)
if is_admin:
defer.returnValue(True)
user_id = user.to_string()
yield self.check_joined_room(room_id, user_id)
# We currently require the user is a "moderator" in the room. We do this
# by checking if they would (theoretically) be able to change the
# m.room.aliases events
power_level_event = yield self.state.get_current_state(
room_id, EventTypes.PowerLevels, ""
)
auth_events = {}
if power_level_event:
auth_events[(EventTypes.PowerLevels, "")] = power_level_event
send_level = self._get_send_level(
EventTypes.Aliases, "", auth_events
)
user_level = self._get_user_power_level(user_id, auth_events)
if user_level < send_level:
raise AuthError(
403,
"This server requires you to be a moderator in the room to"
" edit its room list entry"
)

View File

@ -14,6 +14,7 @@
# limitations under the License. # limitations under the License.
from synapse.util.frozenutils import freeze from synapse.util.frozenutils import freeze
from synapse.util.caches import intern_dict
# Whether we should use frozen_dict in FrozenEvent. Using frozen_dicts prevents # Whether we should use frozen_dict in FrozenEvent. Using frozen_dicts prevents
@ -140,6 +141,10 @@ class FrozenEvent(EventBase):
unsigned = dict(event_dict.pop("unsigned", {})) unsigned = dict(event_dict.pop("unsigned", {}))
# We intern these strings because they turn up a lot (especially when
# caching).
event_dict = intern_dict(event_dict)
if USE_FROZEN_DICTS: if USE_FROZEN_DICTS:
frozen_dict = freeze(event_dict) frozen_dict = freeze(event_dict)
else: else:

View File

@ -418,6 +418,7 @@ class FederationClient(FederationBase):
"Failed to make_%s via %s: %s", "Failed to make_%s via %s: %s",
membership, destination, e.message membership, destination, e.message
) )
raise
raise RuntimeError("Failed to send to any server.") raise RuntimeError("Failed to send to any server.")

View File

@ -531,7 +531,6 @@ class FederationServer(FederationBase):
yield self.handler.on_receive_pdu( yield self.handler.on_receive_pdu(
origin, origin,
pdu, pdu,
backfilled=False,
state=state, state=state,
auth_chain=auth_chain, auth_chain=auth_chain,
) )

View File

@ -175,7 +175,7 @@ class BaseFederationServlet(object):
class FederationSendServlet(BaseFederationServlet): class FederationSendServlet(BaseFederationServlet):
PATH = "/send/([^/]*)/" PATH = "/send/(?P<transaction_id>[^/]*)/"
def __init__(self, handler, server_name, **kwargs): def __init__(self, handler, server_name, **kwargs):
super(FederationSendServlet, self).__init__( super(FederationSendServlet, self).__init__(
@ -250,7 +250,7 @@ class FederationPullServlet(BaseFederationServlet):
class FederationEventServlet(BaseFederationServlet): class FederationEventServlet(BaseFederationServlet):
PATH = "/event/([^/]*)/" PATH = "/event/(?P<event_id>[^/]*)/"
# This is when someone asks for a data item for a given server data_id pair. # This is when someone asks for a data item for a given server data_id pair.
def on_GET(self, origin, content, query, event_id): def on_GET(self, origin, content, query, event_id):
@ -258,7 +258,7 @@ class FederationEventServlet(BaseFederationServlet):
class FederationStateServlet(BaseFederationServlet): class FederationStateServlet(BaseFederationServlet):
PATH = "/state/([^/]*)/" PATH = "/state/(?P<context>[^/]*)/"
# This is when someone asks for all data for a given context. # This is when someone asks for all data for a given context.
def on_GET(self, origin, content, query, context): def on_GET(self, origin, content, query, context):
@ -270,7 +270,7 @@ class FederationStateServlet(BaseFederationServlet):
class FederationBackfillServlet(BaseFederationServlet): class FederationBackfillServlet(BaseFederationServlet):
PATH = "/backfill/([^/]*)/" PATH = "/backfill/(?P<context>[^/]*)/"
def on_GET(self, origin, content, query, context): def on_GET(self, origin, content, query, context):
versions = query["v"] versions = query["v"]
@ -285,7 +285,7 @@ class FederationBackfillServlet(BaseFederationServlet):
class FederationQueryServlet(BaseFederationServlet): class FederationQueryServlet(BaseFederationServlet):
PATH = "/query/([^/]*)" PATH = "/query/(?P<query_type>[^/]*)"
# This is when we receive a server-server Query # This is when we receive a server-server Query
def on_GET(self, origin, content, query, query_type): def on_GET(self, origin, content, query, query_type):
@ -296,7 +296,7 @@ class FederationQueryServlet(BaseFederationServlet):
class FederationMakeJoinServlet(BaseFederationServlet): class FederationMakeJoinServlet(BaseFederationServlet):
PATH = "/make_join/([^/]*)/([^/]*)" PATH = "/make_join/(?P<context>[^/]*)/(?P<user_id>[^/]*)"
@defer.inlineCallbacks @defer.inlineCallbacks
def on_GET(self, origin, content, query, context, user_id): def on_GET(self, origin, content, query, context, user_id):
@ -305,7 +305,7 @@ class FederationMakeJoinServlet(BaseFederationServlet):
class FederationMakeLeaveServlet(BaseFederationServlet): class FederationMakeLeaveServlet(BaseFederationServlet):
PATH = "/make_leave/([^/]*)/([^/]*)" PATH = "/make_leave/(?P<context>[^/]*)/(?P<user_id>[^/]*)"
@defer.inlineCallbacks @defer.inlineCallbacks
def on_GET(self, origin, content, query, context, user_id): def on_GET(self, origin, content, query, context, user_id):
@ -314,7 +314,7 @@ class FederationMakeLeaveServlet(BaseFederationServlet):
class FederationSendLeaveServlet(BaseFederationServlet): class FederationSendLeaveServlet(BaseFederationServlet):
PATH = "/send_leave/([^/]*)/([^/]*)" PATH = "/send_leave/(?P<room_id>[^/]*)/(?P<txid>[^/]*)"
@defer.inlineCallbacks @defer.inlineCallbacks
def on_PUT(self, origin, content, query, room_id, txid): def on_PUT(self, origin, content, query, room_id, txid):
@ -323,14 +323,14 @@ class FederationSendLeaveServlet(BaseFederationServlet):
class FederationEventAuthServlet(BaseFederationServlet): class FederationEventAuthServlet(BaseFederationServlet):
PATH = "/event_auth/([^/]*)/([^/]*)" PATH = "/event_auth(?P<context>[^/]*)/(?P<event_id>[^/]*)"
def on_GET(self, origin, content, query, context, event_id): def on_GET(self, origin, content, query, context, event_id):
return self.handler.on_event_auth(origin, context, event_id) return self.handler.on_event_auth(origin, context, event_id)
class FederationSendJoinServlet(BaseFederationServlet): class FederationSendJoinServlet(BaseFederationServlet):
PATH = "/send_join/([^/]*)/([^/]*)" PATH = "/send_join/(?P<context>[^/]*)/(?P<event_id>[^/]*)"
@defer.inlineCallbacks @defer.inlineCallbacks
def on_PUT(self, origin, content, query, context, event_id): def on_PUT(self, origin, content, query, context, event_id):
@ -341,7 +341,7 @@ class FederationSendJoinServlet(BaseFederationServlet):
class FederationInviteServlet(BaseFederationServlet): class FederationInviteServlet(BaseFederationServlet):
PATH = "/invite/([^/]*)/([^/]*)" PATH = "/invite/(?P<context>[^/]*)/(?P<event_id>[^/]*)"
@defer.inlineCallbacks @defer.inlineCallbacks
def on_PUT(self, origin, content, query, context, event_id): def on_PUT(self, origin, content, query, context, event_id):
@ -352,7 +352,7 @@ class FederationInviteServlet(BaseFederationServlet):
class FederationThirdPartyInviteExchangeServlet(BaseFederationServlet): class FederationThirdPartyInviteExchangeServlet(BaseFederationServlet):
PATH = "/exchange_third_party_invite/([^/]*)" PATH = "/exchange_third_party_invite/(?P<room_id>[^/]*)"
@defer.inlineCallbacks @defer.inlineCallbacks
def on_PUT(self, origin, content, query, room_id): def on_PUT(self, origin, content, query, room_id):
@ -381,7 +381,7 @@ class FederationClientKeysClaimServlet(BaseFederationServlet):
class FederationQueryAuthServlet(BaseFederationServlet): class FederationQueryAuthServlet(BaseFederationServlet):
PATH = "/query_auth/([^/]*)/([^/]*)" PATH = "/query_auth/(?P<context>[^/]*)/(?P<event_id>[^/]*)"
@defer.inlineCallbacks @defer.inlineCallbacks
def on_POST(self, origin, content, query, context, event_id): def on_POST(self, origin, content, query, context, event_id):
@ -394,7 +394,7 @@ class FederationQueryAuthServlet(BaseFederationServlet):
class FederationGetMissingEventsServlet(BaseFederationServlet): class FederationGetMissingEventsServlet(BaseFederationServlet):
# TODO(paul): Why does this path alone end with "/?" optional? # TODO(paul): Why does this path alone end with "/?" optional?
PATH = "/get_missing_events/([^/]*)/?" PATH = "/get_missing_events/(?P<room_id>[^/]*)/?"
@defer.inlineCallbacks @defer.inlineCallbacks
def on_POST(self, origin, content, query, room_id): def on_POST(self, origin, content, query, room_id):

View File

@ -317,3 +317,25 @@ class DirectoryHandler(BaseHandler):
is_admin = yield self.auth.is_server_admin(UserID.from_string(user_id)) is_admin = yield self.auth.is_server_admin(UserID.from_string(user_id))
defer.returnValue(is_admin) defer.returnValue(is_admin)
@defer.inlineCallbacks
def edit_published_room_list(self, requester, room_id, visibility):
"""Edit the entry of the room in the published room list.
requester
room_id (str)
visibility (str): "public" or "private"
"""
if requester.is_guest:
raise AuthError(403, "Guests cannot edit the published room list")
if visibility not in ["public", "private"]:
raise SynapseError(400, "Invalid visibility setting")
room = yield self.store.get_room(room_id)
if room is None:
raise SynapseError(400, "Unknown room")
yield self.auth.check_can_change_room_list(room_id, requester.user)
yield self.store.set_room_is_public(room_id, visibility == "public")

View File

@ -102,7 +102,7 @@ class FederationHandler(BaseHandler):
@log_function @log_function
@defer.inlineCallbacks @defer.inlineCallbacks
def on_receive_pdu(self, origin, pdu, backfilled, state=None, def on_receive_pdu(self, origin, pdu, state=None,
auth_chain=None): auth_chain=None):
""" Called by the ReplicationLayer when we have a new pdu. We need to """ Called by the ReplicationLayer when we have a new pdu. We need to
do auth checks and put it through the StateHandler. do auth checks and put it through the StateHandler.
@ -123,7 +123,6 @@ class FederationHandler(BaseHandler):
# FIXME (erikj): Awful hack to make the case where we are not currently # FIXME (erikj): Awful hack to make the case where we are not currently
# in the room work # in the room work
current_state = None
is_in_room = yield self.auth.check_host_in_room( is_in_room = yield self.auth.check_host_in_room(
event.room_id, event.room_id,
self.server_name self.server_name
@ -186,8 +185,6 @@ class FederationHandler(BaseHandler):
origin, origin,
event, event,
state=state, state=state,
backfilled=backfilled,
current_state=current_state,
) )
except AuthError as e: except AuthError as e:
raise FederationError( raise FederationError(
@ -216,18 +213,17 @@ class FederationHandler(BaseHandler):
except StoreError: except StoreError:
logger.exception("Failed to store room.") logger.exception("Failed to store room.")
if not backfilled: extra_users = []
extra_users = [] if event.type == EventTypes.Member:
if event.type == EventTypes.Member: target_user_id = event.state_key
target_user_id = event.state_key target_user = UserID.from_string(target_user_id)
target_user = UserID.from_string(target_user_id) extra_users.append(target_user)
extra_users.append(target_user)
with PreserveLoggingContext(): with PreserveLoggingContext():
self.notifier.on_new_room_event( self.notifier.on_new_room_event(
event, event_stream_id, max_stream_id, event, event_stream_id, max_stream_id,
extra_users=extra_users extra_users=extra_users
) )
if event.type == EventTypes.Member: if event.type == EventTypes.Member:
if event.membership == Membership.JOIN: if event.membership == Membership.JOIN:
@ -647,7 +643,7 @@ class FederationHandler(BaseHandler):
continue continue
try: try:
self.on_receive_pdu(origin, p, backfilled=False) self.on_receive_pdu(origin, p)
except: except:
logger.exception("Couldn't handle pdu") logger.exception("Couldn't handle pdu")
@ -779,7 +775,6 @@ class FederationHandler(BaseHandler):
event_stream_id, max_stream_id = yield self.store.persist_event( event_stream_id, max_stream_id = yield self.store.persist_event(
event, event,
context=context, context=context,
backfilled=False,
) )
target_user = UserID.from_string(event.state_key) target_user = UserID.from_string(event.state_key)
@ -819,7 +814,6 @@ class FederationHandler(BaseHandler):
event_stream_id, max_stream_id = yield self.store.persist_event( event_stream_id, max_stream_id = yield self.store.persist_event(
event, event,
context=context, context=context,
backfilled=False,
) )
target_user = UserID.from_string(event.state_key) target_user = UserID.from_string(event.state_key)
@ -1074,8 +1068,7 @@ class FederationHandler(BaseHandler):
@defer.inlineCallbacks @defer.inlineCallbacks
@log_function @log_function
def _handle_new_event(self, origin, event, state=None, backfilled=False, def _handle_new_event(self, origin, event, state=None, auth_events=None):
current_state=None, auth_events=None):
outlier = event.internal_metadata.is_outlier() outlier = event.internal_metadata.is_outlier()
@ -1085,7 +1078,7 @@ class FederationHandler(BaseHandler):
auth_events=auth_events, auth_events=auth_events,
) )
if not backfilled and not event.internal_metadata.is_outlier(): if not event.internal_metadata.is_outlier():
action_generator = ActionGenerator(self.hs) action_generator = ActionGenerator(self.hs)
yield action_generator.handle_push_actions_for_event( yield action_generator.handle_push_actions_for_event(
event, context, self event, context, self
@ -1094,9 +1087,7 @@ class FederationHandler(BaseHandler):
event_stream_id, max_stream_id = yield self.store.persist_event( event_stream_id, max_stream_id = yield self.store.persist_event(
event, event,
context=context, context=context,
backfilled=backfilled, is_new_state=not outlier,
is_new_state=(not outlier and not backfilled),
current_state=current_state,
) )
defer.returnValue((context, event_stream_id, max_stream_id)) defer.returnValue((context, event_stream_id, max_stream_id))
@ -1194,7 +1185,6 @@ class FederationHandler(BaseHandler):
event_stream_id, max_stream_id = yield self.store.persist_event( event_stream_id, max_stream_id = yield self.store.persist_event(
event, new_event_context, event, new_event_context,
backfilled=False,
is_new_state=True, is_new_state=True,
current_state=state, current_state=state,
) )

View File

@ -25,6 +25,7 @@ from synapse.api.constants import (
from synapse.api.errors import AuthError, StoreError, SynapseError, Codes from synapse.api.errors import AuthError, StoreError, SynapseError, Codes
from synapse.util import stringutils, unwrapFirstError from synapse.util import stringutils, unwrapFirstError
from synapse.util.logcontext import preserve_context_over_fn from synapse.util.logcontext import preserve_context_over_fn
from synapse.util.caches.response_cache import ResponseCache
from signedjson.sign import verify_signed_json from signedjson.sign import verify_signed_json
from signedjson.key import decode_verify_key_bytes from signedjson.key import decode_verify_key_bytes
@ -119,7 +120,8 @@ class RoomCreationHandler(BaseHandler):
invite_3pid_list = config.get("invite_3pid", []) invite_3pid_list = config.get("invite_3pid", [])
is_public = config.get("visibility", None) == "public" visibility = config.get("visibility", None)
is_public = visibility == "public"
# autogen room IDs and try to create it. We may clash, so just # autogen room IDs and try to create it. We may clash, so just
# try a few times till one goes through, giving up eventually. # try a few times till one goes through, giving up eventually.
@ -155,9 +157,9 @@ class RoomCreationHandler(BaseHandler):
preset_config = config.get( preset_config = config.get(
"preset", "preset",
RoomCreationPreset.PUBLIC_CHAT RoomCreationPreset.PRIVATE_CHAT
if is_public if visibility == "private"
else RoomCreationPreset.PRIVATE_CHAT else RoomCreationPreset.PUBLIC_CHAT
) )
raw_initial_state = config.get("initial_state", []) raw_initial_state = config.get("initial_state", [])
@ -938,61 +940,79 @@ class RoomMemberHandler(BaseHandler):
class RoomListHandler(BaseHandler): class RoomListHandler(BaseHandler):
def __init__(self, hs):
super(RoomListHandler, self).__init__(hs)
self.response_cache = ResponseCache()
def get_public_room_list(self):
result = self.response_cache.get(())
if not result:
result = self.response_cache.set((), self._get_public_room_list())
return result
@defer.inlineCallbacks @defer.inlineCallbacks
def get_public_room_list(self): def _get_public_room_list(self):
room_ids = yield self.store.get_public_room_ids() room_ids = yield self.store.get_public_room_ids()
@defer.inlineCallbacks @defer.inlineCallbacks
def handle_room(room_id): def handle_room(room_id):
aliases = yield self.store.get_aliases_for_room(room_id) aliases = yield self.store.get_aliases_for_room(room_id)
if not aliases:
defer.returnValue(None)
state = yield self.state_handler.get_current_state(room_id) # We pull each bit of state out indvidually to avoid pulling the
# full state into memory. Due to how the caching works this should
# be fairly quick, even if not originally in the cache.
def get_state(etype, state_key):
return self.state_handler.get_current_state(room_id, etype, state_key)
result = {"aliases": aliases, "room_id": room_id} # Double check that this is actually a public room.
join_rules_event = yield get_state(EventTypes.JoinRules, "")
if join_rules_event:
join_rule = join_rules_event.content.get("join_rule", None)
if join_rule and join_rule != JoinRules.PUBLIC:
defer.returnValue(None)
name_event = state.get((EventTypes.Name, ""), None) result = {"room_id": room_id}
if aliases:
result["aliases"] = aliases
name_event = yield get_state(EventTypes.Name, "")
if name_event: if name_event:
name = name_event.content.get("name", None) name = name_event.content.get("name", None)
if name: if name:
result["name"] = name result["name"] = name
topic_event = state.get((EventTypes.Topic, ""), None) topic_event = yield get_state(EventTypes.Topic, "")
if topic_event: if topic_event:
topic = topic_event.content.get("topic", None) topic = topic_event.content.get("topic", None)
if topic: if topic:
result["topic"] = topic result["topic"] = topic
canonical_event = state.get((EventTypes.CanonicalAlias, ""), None) canonical_event = yield get_state(EventTypes.CanonicalAlias, "")
if canonical_event: if canonical_event:
canonical_alias = canonical_event.content.get("alias", None) canonical_alias = canonical_event.content.get("alias", None)
if canonical_alias: if canonical_alias:
result["canonical_alias"] = canonical_alias result["canonical_alias"] = canonical_alias
visibility_event = state.get((EventTypes.RoomHistoryVisibility, ""), None) visibility_event = yield get_state(EventTypes.RoomHistoryVisibility, "")
visibility = None visibility = None
if visibility_event: if visibility_event:
visibility = visibility_event.content.get("history_visibility", None) visibility = visibility_event.content.get("history_visibility", None)
result["world_readable"] = visibility == "world_readable" result["world_readable"] = visibility == "world_readable"
guest_event = state.get((EventTypes.GuestAccess, ""), None) guest_event = yield get_state(EventTypes.GuestAccess, "")
guest = None guest = None
if guest_event: if guest_event:
guest = guest_event.content.get("guest_access", None) guest = guest_event.content.get("guest_access", None)
result["guest_can_join"] = guest == "can_join" result["guest_can_join"] = guest == "can_join"
avatar_event = state.get(("m.room.avatar", ""), None) avatar_event = yield get_state("m.room.avatar", "")
if avatar_event: if avatar_event:
avatar_url = avatar_event.content.get("url", None) avatar_url = avatar_event.content.get("url", None)
if avatar_url: if avatar_url:
result["avatar_url"] = avatar_url result["avatar_url"] = avatar_url
result["num_joined_members"] = sum( joined_users = yield self.store.get_users_in_room(room_id)
1 for (event_type, _), ev in state.items() result["num_joined_members"] = len(joined_users)
if event_type == EventTypes.Member and ev.membership == Membership.JOIN
)
defer.returnValue(result) defer.returnValue(result)

View File

@ -20,6 +20,7 @@ from synapse.api.constants import Membership, EventTypes
from synapse.util import unwrapFirstError from synapse.util import unwrapFirstError
from synapse.util.logcontext import LoggingContext, preserve_fn from synapse.util.logcontext import LoggingContext, preserve_fn
from synapse.util.metrics import Measure from synapse.util.metrics import Measure
from synapse.util.caches.response_cache import ResponseCache
from synapse.push.clientformat import format_push_rules_for_user from synapse.push.clientformat import format_push_rules_for_user
from twisted.internet import defer from twisted.internet import defer
@ -35,6 +36,7 @@ SyncConfig = collections.namedtuple("SyncConfig", [
"user", "user",
"filter_collection", "filter_collection",
"is_guest", "is_guest",
"request_key",
]) ])
@ -136,8 +138,8 @@ class SyncHandler(BaseHandler):
super(SyncHandler, self).__init__(hs) super(SyncHandler, self).__init__(hs)
self.event_sources = hs.get_event_sources() self.event_sources = hs.get_event_sources()
self.clock = hs.get_clock() self.clock = hs.get_clock()
self.response_cache = ResponseCache()
@defer.inlineCallbacks
def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0, def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0,
full_state=False): full_state=False):
"""Get the sync for a client if we have new data for it now. Otherwise """Get the sync for a client if we have new data for it now. Otherwise
@ -146,7 +148,19 @@ class SyncHandler(BaseHandler):
Returns: Returns:
A Deferred SyncResult. A Deferred SyncResult.
""" """
result = self.response_cache.get(sync_config.request_key)
if not result:
result = self.response_cache.set(
sync_config.request_key,
self._wait_for_sync_for_user(
sync_config, since_token, timeout, full_state
)
)
return result
@defer.inlineCallbacks
def _wait_for_sync_for_user(self, sync_config, since_token, timeout,
full_state):
context = LoggingContext.current_context() context = LoggingContext.current_context()
if context: if context:
if since_token is None: if since_token is None:

View File

@ -18,6 +18,7 @@ from synapse.api.errors import (
cs_exception, SynapseError, CodeMessageException, UnrecognizedRequestError, Codes cs_exception, SynapseError, CodeMessageException, UnrecognizedRequestError, Codes
) )
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
from synapse.util.caches import intern_dict
import synapse.metrics import synapse.metrics
import synapse.events import synapse.events
@ -229,11 +230,12 @@ class JsonResource(HttpServer, resource.Resource):
else: else:
servlet_classname = "%r" % callback servlet_classname = "%r" % callback
args = [ kwargs = intern_dict({
urllib.unquote(u).decode("UTF-8") if u else u for u in m.groups() name: urllib.unquote(value).decode("UTF-8") if value else value
] for name, value in m.groupdict().items()
})
callback_return = yield callback(request, *args) callback_return = yield callback(request, **kwargs)
if callback_return is not None: if callback_return is not None:
code, response = callback_return code, response = callback_return
self._send_response(request, code, response) self._send_response(request, code, response)

View File

@ -317,7 +317,7 @@ class Pusher(object):
@defer.inlineCallbacks @defer.inlineCallbacks
def _get_badge_count(self): def _get_badge_count(self):
invites, joins = yield defer.gatherResults([ invites, joins = yield defer.gatherResults([
self.store.get_invites_for_user(self.user_id), self.store.get_invited_rooms_for_user(self.user_id),
self.store.get_rooms_for_user(self.user_id), self.store.get_rooms_for_user(self.user_id),
], consumeErrors=True) ], consumeErrors=True)

View File

@ -160,7 +160,27 @@ BASE_APPEND_OVRRIDE_RULES = [
'actions': [ 'actions': [
'dont_notify', 'dont_notify',
] ]
} },
# Will we sometimes want to know about people joining and leaving?
# Perhaps: if so, this could be expanded upon. Seems the most usual case
# is that we don't though. We add this override rule so that even if
# the room rule is set to notify, we don't get notifications about
# join/leave/avatar/displayname events.
# See also: https://matrix.org/jira/browse/SYN-607
{
'rule_id': 'global/override/.m.rule.member_event',
'conditions': [
{
'kind': 'event_match',
'key': 'type',
'pattern': 'm.room.member',
'_id': '_member',
}
],
'actions': [
'dont_notify'
]
},
] ]
@ -261,25 +281,6 @@ BASE_APPEND_UNDERRIDE_RULES = [
} }
] ]
}, },
# This is too simple: https://matrix.org/jira/browse/SYN-607
# Removing for now
# {
# 'rule_id': 'global/underride/.m.rule.member_event',
# 'conditions': [
# {
# 'kind': 'event_match',
# 'key': 'type',
# 'pattern': 'm.room.member',
# '_id': '_member',
# }
# ],
# 'actions': [
# 'notify', {
# 'set_tweak': 'highlight',
# 'value': False
# }
# ]
# },
{ {
'rule_id': 'global/underride/.m.rule.message', 'rule_id': 'global/underride/.m.rule.message',
'conditions': [ 'conditions': [

View File

@ -107,7 +107,9 @@ class BulkPushRuleEvaluator:
users_dict.items(), [event], {event.event_id: current_state} users_dict.items(), [event], {event.event_id: current_state}
) )
evaluator = PushRuleEvaluatorForEvent(event, len(self.users_in_room)) room_members = yield self.store.get_users_in_room(self.room_id)
evaluator = PushRuleEvaluatorForEvent(event, len(room_members))
condition_cache = {} condition_cache = {}

View File

@ -30,6 +30,7 @@ logger = logging.getLogger(__name__)
def register_servlets(hs, http_server): def register_servlets(hs, http_server):
ClientDirectoryServer(hs).register(http_server) ClientDirectoryServer(hs).register(http_server)
ClientDirectoryListServer(hs).register(http_server)
class ClientDirectoryServer(ClientV1RestServlet): class ClientDirectoryServer(ClientV1RestServlet):
@ -137,3 +138,44 @@ class ClientDirectoryServer(ClientV1RestServlet):
) )
defer.returnValue((200, {})) defer.returnValue((200, {}))
class ClientDirectoryListServer(ClientV1RestServlet):
PATTERNS = client_path_patterns("/directory/list/room/(?P<room_id>[^/]*)$")
def __init__(self, hs):
super(ClientDirectoryListServer, self).__init__(hs)
self.store = hs.get_datastore()
@defer.inlineCallbacks
def on_GET(self, request, room_id):
room = yield self.store.get_room(room_id)
if room is None:
raise SynapseError(400, "Unknown room")
defer.returnValue((200, {
"visibility": "public" if room["is_public"] else "private"
}))
@defer.inlineCallbacks
def on_PUT(self, request, room_id):
requester = yield self.auth.get_user_by_req(request)
content = parse_json_object_from_request(request)
visibility = content.get("visibility", "public")
yield self.handlers.directory_handler.edit_published_room_list(
requester, room_id, visibility,
)
defer.returnValue((200, {}))
@defer.inlineCallbacks
def on_DELETE(self, request, room_id):
requester = yield self.auth.get_user_by_req(request)
yield self.handlers.directory_handler.edit_published_room_list(
requester, room_id, "private",
)
defer.returnValue((200, {}))

View File

@ -115,6 +115,8 @@ class SyncRestServlet(RestServlet):
) )
) )
request_key = (user, timeout, since, filter_id, full_state)
if filter_id: if filter_id:
if filter_id.startswith('{'): if filter_id.startswith('{'):
try: try:
@ -134,6 +136,7 @@ class SyncRestServlet(RestServlet):
user=user, user=user,
filter_collection=filter, filter_collection=filter,
is_guest=requester.is_guest, is_guest=requester.is_guest,
request_key=request_key,
) )
if since is not None: if since is not None:

View File

@ -18,6 +18,7 @@ from twisted.internet import defer
from synapse.util.logutils import log_function from synapse.util.logutils import log_function
from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.metrics import Measure
from synapse.api.constants import EventTypes from synapse.api.constants import EventTypes
from synapse.api.errors import AuthError from synapse.api.errors import AuthError
from synapse.api.auth import AuthEventTypes from synapse.api.auth import AuthEventTypes
@ -27,6 +28,7 @@ from collections import namedtuple
import logging import logging
import hashlib import hashlib
import os
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -34,8 +36,11 @@ logger = logging.getLogger(__name__)
KeyStateTuple = namedtuple("KeyStateTuple", ("context", "type", "state_key")) KeyStateTuple = namedtuple("KeyStateTuple", ("context", "type", "state_key"))
SIZE_OF_CACHE = 1000 CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.1))
EVICTION_TIMEOUT_SECONDS = 20
SIZE_OF_CACHE = int(1000 * CACHE_SIZE_FACTOR)
EVICTION_TIMEOUT_SECONDS = 60 * 60
class _StateCacheEntry(object): class _StateCacheEntry(object):
@ -85,16 +90,8 @@ class StateHandler(object):
""" """
event_ids = yield self.store.get_latest_event_ids_in_room(room_id) event_ids = yield self.store.get_latest_event_ids_in_room(room_id)
cache = None res = yield self.resolve_state_groups(room_id, event_ids)
if self._state_cache is not None: state = res[1]
cache = self._state_cache.get(frozenset(event_ids), None)
if cache:
cache.ts = self.clock.time_msec()
state = cache.state
else:
res = yield self.resolve_state_groups(room_id, event_ids)
state = res[1]
if event_type: if event_type:
defer.returnValue(state.get((event_type, state_key))) defer.returnValue(state.get((event_type, state_key)))
@ -186,20 +183,6 @@ class StateHandler(object):
""" """
logger.debug("resolve_state_groups event_ids %s", event_ids) logger.debug("resolve_state_groups event_ids %s", event_ids)
if self._state_cache is not None:
cache = self._state_cache.get(frozenset(event_ids), None)
if cache and cache.state_group:
cache.ts = self.clock.time_msec()
prev_state = cache.state.get((event_type, state_key), None)
if prev_state:
prev_state = prev_state.event_id
prev_states = [prev_state]
else:
prev_states = []
defer.returnValue(
(cache.state_group, cache.state, prev_states)
)
state_groups = yield self.store.get_state_groups( state_groups = yield self.store.get_state_groups(
room_id, event_ids room_id, event_ids
) )
@ -209,7 +192,7 @@ class StateHandler(object):
state_groups.keys() state_groups.keys()
) )
group_names = set(state_groups.keys()) group_names = frozenset(state_groups.keys())
if len(group_names) == 1: if len(group_names) == 1:
name, state_list = state_groups.items().pop() name, state_list = state_groups.items().pop()
state = { state = {
@ -223,29 +206,38 @@ class StateHandler(object):
else: else:
prev_states = [] prev_states = []
if self._state_cache is not None:
cache = _StateCacheEntry(
state=state,
state_group=name,
ts=self.clock.time_msec()
)
self._state_cache[frozenset(event_ids)] = cache
defer.returnValue((name, state, prev_states)) defer.returnValue((name, state, prev_states))
if self._state_cache is not None:
cache = self._state_cache.get(group_names, None)
if cache and cache.state_group:
cache.ts = self.clock.time_msec()
event_dict = yield self.store.get_events(cache.state.values())
state = {(e.type, e.state_key): e for e in event_dict.values()}
prev_state = state.get((event_type, state_key), None)
if prev_state:
prev_state = prev_state.event_id
prev_states = [prev_state]
else:
prev_states = []
defer.returnValue(
(cache.state_group, state, prev_states)
)
new_state, prev_states = self._resolve_events( new_state, prev_states = self._resolve_events(
state_groups.values(), event_type, state_key state_groups.values(), event_type, state_key
) )
if self._state_cache is not None: if self._state_cache is not None:
cache = _StateCacheEntry( cache = _StateCacheEntry(
state=new_state, state={key: event.event_id for key, event in new_state.items()},
state_group=None, state_group=None,
ts=self.clock.time_msec() ts=self.clock.time_msec()
) )
self._state_cache[frozenset(event_ids)] = cache self._state_cache[group_names] = cache
defer.returnValue((None, new_state, prev_states)) defer.returnValue((None, new_state, prev_states))
@ -263,48 +255,49 @@ class StateHandler(object):
from (type, state_key) to event. prev_states is a list of event_ids. from (type, state_key) to event. prev_states is a list of event_ids.
:rtype: (dict[(str, str), synapse.events.FrozenEvent], list[str]) :rtype: (dict[(str, str), synapse.events.FrozenEvent], list[str])
""" """
state = {} with Measure(self.clock, "state._resolve_events"):
for st in state_sets: state = {}
for e in st: for st in state_sets:
state.setdefault( for e in st:
(e.type, e.state_key), state.setdefault(
{} (e.type, e.state_key),
)[e.event_id] = e {}
)[e.event_id] = e
unconflicted_state = { unconflicted_state = {
k: v.values()[0] for k, v in state.items() k: v.values()[0] for k, v in state.items()
if len(v.values()) == 1 if len(v.values()) == 1
} }
conflicted_state = { conflicted_state = {
k: v.values() k: v.values()
for k, v in state.items() for k, v in state.items()
if len(v.values()) > 1 if len(v.values()) > 1
} }
if event_type: if event_type:
prev_states_events = conflicted_state.get( prev_states_events = conflicted_state.get(
(event_type, state_key), [] (event_type, state_key), []
) )
prev_states = [s.event_id for s in prev_states_events] prev_states = [s.event_id for s in prev_states_events]
else: else:
prev_states = [] prev_states = []
auth_events = { auth_events = {
k: e for k, e in unconflicted_state.items() k: e for k, e in unconflicted_state.items()
if k[0] in AuthEventTypes if k[0] in AuthEventTypes
} }
try: try:
resolved_state = self._resolve_state_events( resolved_state = self._resolve_state_events(
conflicted_state, auth_events conflicted_state, auth_events
) )
except: except:
logger.exception("Failed to resolve state") logger.exception("Failed to resolve state")
raise raise
new_state = unconflicted_state new_state = unconflicted_state
new_state.update(resolved_state) new_state.update(resolved_state)
return new_state, prev_states return new_state, prev_states

View File

@ -18,6 +18,7 @@ from synapse.api.errors import StoreError
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
from synapse.util.caches.dictionary_cache import DictionaryCache from synapse.util.caches.dictionary_cache import DictionaryCache
from synapse.util.caches.descriptors import Cache from synapse.util.caches.descriptors import Cache
from synapse.util.caches import intern_dict
import synapse.metrics import synapse.metrics
@ -26,6 +27,10 @@ from twisted.internet import defer
import sys import sys
import time import time
import threading import threading
import os
CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.1))
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -163,7 +168,9 @@ class SQLBaseStore(object):
self._get_event_cache = Cache("*getEvent*", keylen=3, lru=True, self._get_event_cache = Cache("*getEvent*", keylen=3, lru=True,
max_entries=hs.config.event_cache_size) max_entries=hs.config.event_cache_size)
self._state_group_cache = DictionaryCache("*stateGroupCache*", 2000) self._state_group_cache = DictionaryCache(
"*stateGroupCache*", 2000 * CACHE_SIZE_FACTOR
)
self._event_fetch_lock = threading.Condition() self._event_fetch_lock = threading.Condition()
self._event_fetch_list = [] self._event_fetch_list = []
@ -344,7 +351,7 @@ class SQLBaseStore(object):
""" """
col_headers = list(column[0] for column in cursor.description) col_headers = list(column[0] for column in cursor.description)
results = list( results = list(
dict(zip(col_headers, row)) for row in cursor.fetchall() intern_dict(dict(zip(col_headers, row))) for row in cursor.fetchall()
) )
return results return results

View File

@ -155,7 +155,7 @@ class DirectoryStore(SQLBaseStore):
return room_id return room_id
@cached() @cached(max_entries=5000)
def get_aliases_for_room(self, room_id): def get_aliases_for_room(self, room_id):
return self._simple_select_onecol( return self._simple_select_onecol(
"room_aliases", "room_aliases",

View File

@ -49,7 +49,7 @@ class EventPushActionsStore(SQLBaseStore):
) )
self._simple_insert_many_txn(txn, "event_push_actions", values) self._simple_insert_many_txn(txn, "event_push_actions", values)
@cachedInlineCallbacks(num_args=3, lru=True, tree=True) @cachedInlineCallbacks(num_args=3, lru=True, tree=True, max_entries=5000)
def get_unread_event_push_actions_by_room_for_user( def get_unread_event_push_actions_by_room_for_user(
self, room_id, user_id, last_read_event_id self, room_id, user_id, last_read_event_id
): ):

View File

@ -101,30 +101,16 @@ class EventsStore(SQLBaseStore):
@defer.inlineCallbacks @defer.inlineCallbacks
@log_function @log_function
def persist_event(self, event, context, backfilled=False, def persist_event(self, event, context,
is_new_state=True, current_state=None): is_new_state=True, current_state=None):
stream_ordering = None
if backfilled:
self.min_stream_token -= 1
stream_ordering = self.min_stream_token
if stream_ordering is None:
stream_ordering_manager = self._stream_id_gen.get_next()
else:
@contextmanager
def stream_ordering_manager():
yield stream_ordering
stream_ordering_manager = stream_ordering_manager()
try: try:
with stream_ordering_manager as stream_ordering: with self._stream_id_gen.get_next() as stream_ordering:
event.internal_metadata.stream_ordering = stream_ordering event.internal_metadata.stream_ordering = stream_ordering
yield self.runInteraction( yield self.runInteraction(
"persist_event", "persist_event",
self._persist_event_txn, self._persist_event_txn,
event=event, event=event,
context=context, context=context,
backfilled=backfilled,
is_new_state=is_new_state, is_new_state=is_new_state,
current_state=current_state, current_state=current_state,
) )
@ -165,13 +151,38 @@ class EventsStore(SQLBaseStore):
defer.returnValue(events[0] if events else None) defer.returnValue(events[0] if events else None)
@defer.inlineCallbacks
def get_events(self, event_ids, check_redacted=True,
get_prev_content=False, allow_rejected=False):
"""Get events from the database
Args:
event_ids (list): The event_ids of the events to fetch
check_redacted (bool): If True, check if event has been redacted
and redact it.
get_prev_content (bool): If True and event is a state event,
include the previous states content in the unsigned field.
allow_rejected (bool): If True return rejected events.
Returns:
Deferred : Dict from event_id to event.
"""
events = yield self._get_events(
event_ids,
check_redacted=check_redacted,
get_prev_content=get_prev_content,
allow_rejected=allow_rejected,
)
defer.returnValue({e.event_id: e for e in events})
@log_function @log_function
def _persist_event_txn(self, txn, event, context, backfilled, def _persist_event_txn(self, txn, event, context,
is_new_state=True, current_state=None): is_new_state=True, current_state=None):
# We purposefully do this first since if we include a `current_state` # We purposefully do this first since if we include a `current_state`
# key, we *want* to update the `current_state_events` table # key, we *want* to update the `current_state_events` table
if current_state: if current_state:
txn.call_after(self.get_current_state_for_key.invalidate_all) txn.call_after(self._get_current_state_for_key.invalidate_all)
txn.call_after(self.get_rooms_for_user.invalidate_all) txn.call_after(self.get_rooms_for_user.invalidate_all)
txn.call_after(self.get_users_in_room.invalidate, (event.room_id,)) txn.call_after(self.get_users_in_room.invalidate, (event.room_id,))
txn.call_after(self.get_joined_hosts_for_room.invalidate, (event.room_id,)) txn.call_after(self.get_joined_hosts_for_room.invalidate, (event.room_id,))
@ -198,7 +209,7 @@ class EventsStore(SQLBaseStore):
return self._persist_events_txn( return self._persist_events_txn(
txn, txn,
[(event, context)], [(event, context)],
backfilled=backfilled, backfilled=False,
is_new_state=is_new_state, is_new_state=is_new_state,
) )
@ -455,7 +466,7 @@ class EventsStore(SQLBaseStore):
for event, _ in state_events_and_contexts: for event, _ in state_events_and_contexts:
if not context.rejected: if not context.rejected:
txn.call_after( txn.call_after(
self.get_current_state_for_key.invalidate, self._get_current_state_for_key.invalidate,
(event.room_id, event.type, event.state_key,) (event.room_id, event.type, event.state_key,)
) )

View File

@ -62,18 +62,17 @@ class ReceiptsStore(SQLBaseStore):
@cachedInlineCallbacks(num_args=2) @cachedInlineCallbacks(num_args=2)
def get_receipts_for_user(self, user_id, receipt_type): def get_receipts_for_user(self, user_id, receipt_type):
def f(txn): rows = yield self._simple_select_list(
sql = ( table="receipts_linearized",
"SELECT room_id,event_id " keyvalues={
"FROM receipts_linearized " "user_id": user_id,
"WHERE user_id = ? AND receipt_type = ? " "receipt_type": receipt_type,
) },
txn.execute(sql, (user_id, receipt_type)) retcols=("room_id", "event_id"),
return txn.fetchall() desc="get_receipts_for_user",
)
defer.returnValue(dict( defer.returnValue({row["room_id"]: row["event_id"] for row in rows})
(yield self.runInteraction("get_receipts_for_user", f))
))
@defer.inlineCallbacks @defer.inlineCallbacks
def get_linearized_receipts_for_rooms(self, room_ids, to_key, from_key=None): def get_linearized_receipts_for_rooms(self, room_ids, to_key, from_key=None):

View File

@ -77,6 +77,14 @@ class RoomStore(SQLBaseStore):
allow_none=True, allow_none=True,
) )
def set_room_is_public(self, room_id, is_public):
return self._simple_update_one(
table="rooms",
keyvalues={"room_id": room_id},
updatevalues={"is_public": is_public},
desc="set_room_is_public",
)
def get_public_room_ids(self): def get_public_room_ids(self):
return self._simple_select_onecol( return self._simple_select_onecol(
table="rooms", table="rooms",

View File

@ -115,19 +115,17 @@ class RoomMemberStore(SQLBaseStore):
).addCallback(self._get_events) ).addCallback(self._get_events)
@cached() @cached()
def get_invites_for_user(self, user_id): def get_invited_rooms_for_user(self, user_id):
""" Get all the invite events for a user """ Get all the rooms the user is invited to
Args: Args:
user_id (str): The user ID. user_id (str): The user ID.
Returns: Returns:
A deferred list of event objects. A deferred list of RoomsForUser.
""" """
return self.get_rooms_for_user_where_membership_is( return self.get_rooms_for_user_where_membership_is(
user_id, [Membership.INVITE] user_id, [Membership.INVITE]
).addCallback(lambda invites: self._get_events([ )
invite.event_id for invite in invites
]))
def get_leave_and_ban_events_for_user(self, user_id): def get_leave_and_ban_events_for_user(self, user_id):
""" Get all the leave events for a user """ Get all the leave events for a user

View File

@ -0,0 +1,23 @@
/* Copyright 2016 OpenMarket Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/* This release removes the restriction that published rooms must have an alias,
* so we go back and ensure the only 'public' rooms are ones with an alias.
* We use (1 = 0) and (1 = 1) so that it works in both postgres and sqlite
*/
UPDATE rooms SET is_public = (1 = 0) WHERE is_public = (1 = 1) AND room_id not in (
SELECT room_id FROM room_aliases
);

View File

@ -14,9 +14,8 @@
# limitations under the License. # limitations under the License.
from ._base import SQLBaseStore from ._base import SQLBaseStore
from synapse.util.caches.descriptors import ( from synapse.util.caches.descriptors import cached, cachedList
cached, cachedInlineCallbacks, cachedList from synapse.util.caches import intern_string
)
from twisted.internet import defer from twisted.internet import defer
@ -155,8 +154,14 @@ class StateStore(SQLBaseStore):
events = yield self._get_events(event_ids, get_prev_content=False) events = yield self._get_events(event_ids, get_prev_content=False)
defer.returnValue(events) defer.returnValue(events)
@cachedInlineCallbacks(num_args=3) @defer.inlineCallbacks
def get_current_state_for_key(self, room_id, event_type, state_key): def get_current_state_for_key(self, room_id, event_type, state_key):
event_ids = yield self._get_current_state_for_key(room_id, event_type, state_key)
events = yield self._get_events(event_ids, get_prev_content=False)
defer.returnValue(events)
@cached(num_args=3)
def _get_current_state_for_key(self, room_id, event_type, state_key):
def f(txn): def f(txn):
sql = ( sql = (
"SELECT event_id FROM current_state_events" "SELECT event_id FROM current_state_events"
@ -167,12 +172,10 @@ class StateStore(SQLBaseStore):
txn.execute(sql, args) txn.execute(sql, args)
results = txn.fetchall() results = txn.fetchall()
return [r[0] for r in results] return [r[0] for r in results]
event_ids = yield self.runInteraction("get_current_state_for_key", f) return self.runInteraction("get_current_state_for_key", f)
events = yield self._get_events(event_ids, get_prev_content=False)
defer.returnValue(events)
def _get_state_groups_from_groups(self, groups, types): def _get_state_groups_from_groups(self, groups, types):
"""Returns dictionary state_group -> state event ids """Returns dictionary state_group -> (dict of (type, state_key) -> event id)
""" """
def f(txn, groups): def f(txn, groups):
if types is not None: if types is not None:
@ -183,7 +186,8 @@ class StateStore(SQLBaseStore):
where_clause = "" where_clause = ""
sql = ( sql = (
"SELECT state_group, event_id FROM state_groups_state WHERE" "SELECT state_group, event_id, type, state_key"
" FROM state_groups_state WHERE"
" state_group IN (%s) %s" % ( " state_group IN (%s) %s" % (
",".join("?" for _ in groups), ",".join("?" for _ in groups),
where_clause, where_clause,
@ -199,7 +203,8 @@ class StateStore(SQLBaseStore):
results = {} results = {}
for row in rows: for row in rows:
results.setdefault(row["state_group"], []).append(row["event_id"]) key = (row["type"], row["state_key"])
results.setdefault(row["state_group"], {})[key] = row["event_id"]
return results return results
chunks = [groups[i:i + 100] for i in xrange(0, len(groups), 100)] chunks = [groups[i:i + 100] for i in xrange(0, len(groups), 100)]
@ -296,7 +301,7 @@ class StateStore(SQLBaseStore):
where a `state_key` of `None` matches all state_keys for the where a `state_key` of `None` matches all state_keys for the
`type`. `type`.
""" """
is_all, state_dict = self._state_group_cache.get(group) is_all, state_dict_ids = self._state_group_cache.get(group)
type_to_key = {} type_to_key = {}
missing_types = set() missing_types = set()
@ -308,7 +313,7 @@ class StateStore(SQLBaseStore):
if type_to_key.get(typ, object()) is not None: if type_to_key.get(typ, object()) is not None:
type_to_key.setdefault(typ, set()).add(state_key) type_to_key.setdefault(typ, set()).add(state_key)
if (typ, state_key) not in state_dict: if (typ, state_key) not in state_dict_ids:
missing_types.add((typ, state_key)) missing_types.add((typ, state_key))
sentinel = object() sentinel = object()
@ -326,7 +331,7 @@ class StateStore(SQLBaseStore):
got_all = not (missing_types or types is None) got_all = not (missing_types or types is None)
return { return {
k: v for k, v in state_dict.items() k: v for k, v in state_dict_ids.items()
if include(k[0], k[1]) if include(k[0], k[1])
}, missing_types, got_all }, missing_types, got_all
@ -340,8 +345,9 @@ class StateStore(SQLBaseStore):
Args: Args:
group: The state group to lookup group: The state group to lookup
""" """
is_all, state_dict = self._state_group_cache.get(group) is_all, state_dict_ids = self._state_group_cache.get(group)
return state_dict, is_all
return state_dict_ids, is_all
@defer.inlineCallbacks @defer.inlineCallbacks
def _get_state_for_groups(self, groups, types=None): def _get_state_for_groups(self, groups, types=None):
@ -354,84 +360,72 @@ class StateStore(SQLBaseStore):
missing_groups = [] missing_groups = []
if types is not None: if types is not None:
for group in set(groups): for group in set(groups):
state_dict, missing_types, got_all = self._get_some_state_from_cache( state_dict_ids, missing_types, got_all = self._get_some_state_from_cache(
group, types group, types
) )
results[group] = state_dict results[group] = state_dict_ids
if not got_all: if not got_all:
missing_groups.append(group) missing_groups.append(group)
else: else:
for group in set(groups): for group in set(groups):
state_dict, got_all = self._get_all_state_from_cache( state_dict_ids, got_all = self._get_all_state_from_cache(
group group
) )
results[group] = state_dict
results[group] = state_dict_ids
if not got_all: if not got_all:
missing_groups.append(group) missing_groups.append(group)
if not missing_groups: if missing_groups:
defer.returnValue({ # Okay, so we have some missing_types, lets fetch them.
group: { cache_seq_num = self._state_group_cache.sequence
type_tuple: event
for type_tuple, event in state.items()
if event
}
for group, state in results.items()
})
# Okay, so we have some missing_types, lets fetch them. group_to_state_dict = yield self._get_state_groups_from_groups(
cache_seq_num = self._state_group_cache.sequence missing_groups, types
)
group_state_dict = yield self._get_state_groups_from_groups( # Now we want to update the cache with all the things we fetched
missing_groups, types # from the database.
) for group, group_state_dict in group_to_state_dict.items():
if types:
# We delibrately put key -> None mappings into the cache to
# cache absence of the key, on the assumption that if we've
# explicitly asked for some types then we will probably ask
# for them again.
state_dict = {
(intern_string(etype), intern_string(state_key)): None
for (etype, state_key) in types
}
state_dict.update(results[group])
results[group] = state_dict
else:
state_dict = results[group]
state_dict.update(group_state_dict)
self._state_group_cache.update(
cache_seq_num,
key=group,
value=state_dict,
full=(types is None),
)
state_events = yield self._get_events( state_events = yield self._get_events(
[e_id for l in group_state_dict.values() for e_id in l], [ev_id for sd in results.values() for ev_id in sd.values()],
get_prev_content=False get_prev_content=False
) )
state_events = {e.event_id: e for e in state_events} state_events = {e.event_id: e for e in state_events}
# Now we want to update the cache with all the things we fetched
# from the database.
for group, state_ids in group_state_dict.items():
if types:
# We delibrately put key -> None mappings into the cache to
# cache absence of the key, on the assumption that if we've
# explicitly asked for some types then we will probably ask
# for them again.
state_dict = {key: None for key in types}
state_dict.update(results[group])
results[group] = state_dict
else:
state_dict = results[group]
for event_id in state_ids:
try:
state_event = state_events[event_id]
state_dict[(state_event.type, state_event.state_key)] = state_event
except KeyError:
# Hmm. So we do don't have that state event? Interesting.
logger.warn(
"Can't find state event %r for state group %r",
event_id, group,
)
self._state_group_cache.update(
cache_seq_num,
key=group,
value=state_dict,
full=(types is None),
)
# Remove all the entries with None values. The None values were just # Remove all the entries with None values. The None values were just
# used for bookkeeping in the cache. # used for bookkeeping in the cache.
for group, state_dict in results.items(): for group, state_dict in results.items():
results[group] = { results[group] = {
key: event for key, event in state_dict.items() if event key: state_events[event_id]
for key, event_id in state_dict.items()
if event_id and event_id in state_events
} }
defer.returnValue(results) defer.returnValue(results)

View File

@ -36,7 +36,7 @@ what sort order was used:
from twisted.internet import defer from twisted.internet import defer
from ._base import SQLBaseStore from ._base import SQLBaseStore
from synapse.util.caches.descriptors import cachedInlineCallbacks from synapse.util.caches.descriptors import cached
from synapse.api.constants import EventTypes from synapse.api.constants import EventTypes
from synapse.types import RoomStreamToken from synapse.types import RoomStreamToken
from synapse.util.logcontext import preserve_fn from synapse.util.logcontext import preserve_fn
@ -465,9 +465,25 @@ class StreamStore(SQLBaseStore):
defer.returnValue((events, token)) defer.returnValue((events, token))
@cachedInlineCallbacks(num_args=4) @defer.inlineCallbacks
def get_recent_events_for_room(self, room_id, limit, end_token, from_token=None): def get_recent_events_for_room(self, room_id, limit, end_token, from_token=None):
rows, token = yield self.get_recent_event_ids_for_room(
room_id, limit, end_token, from_token
)
logger.debug("stream before")
events = yield self._get_events(
[r["event_id"] for r in rows],
get_prev_content=True
)
logger.debug("stream after")
self._set_before_and_after(events, rows)
defer.returnValue((events, token))
@cached(num_args=4)
def get_recent_event_ids_for_room(self, room_id, limit, end_token, from_token=None):
end_token = RoomStreamToken.parse_stream_token(end_token) end_token = RoomStreamToken.parse_stream_token(end_token)
if from_token is None: if from_token is None:
@ -517,21 +533,10 @@ class StreamStore(SQLBaseStore):
return rows, token return rows, token
rows, token = yield self.runInteraction( return self.runInteraction(
"get_recent_events_for_room", get_recent_events_for_room_txn "get_recent_events_for_room", get_recent_events_for_room_txn
) )
logger.debug("stream before")
events = yield self._get_events(
[r["event_id"] for r in rows],
get_prev_content=True
)
logger.debug("stream after")
self._set_before_and_after(events, rows)
defer.returnValue((events, token))
@defer.inlineCallbacks @defer.inlineCallbacks
def get_room_events_max_id(self, direction='f'): def get_room_events_max_id(self, direction='f'):
token = yield self._stream_id_gen.get_max_token() token = yield self._stream_id_gen.get_max_token()

View File

@ -14,6 +14,10 @@
# limitations under the License. # limitations under the License.
import synapse.metrics import synapse.metrics
from lrucache import LruCache
import os
CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.1))
DEBUG_CACHES = False DEBUG_CACHES = False
@ -25,3 +29,56 @@ cache_counter = metrics.register_cache(
lambda: {(name,): len(caches_by_name[name]) for name in caches_by_name.keys()}, lambda: {(name,): len(caches_by_name[name]) for name in caches_by_name.keys()},
labels=["name"], labels=["name"],
) )
_string_cache = LruCache(int(5000 * CACHE_SIZE_FACTOR))
caches_by_name["string_cache"] = _string_cache
KNOWN_KEYS = {
key: key for key in
(
"auth_events",
"content",
"depth",
"event_id",
"hashes",
"origin",
"origin_server_ts",
"prev_events",
"room_id",
"sender",
"signatures",
"state_key",
"type",
"unsigned",
"user_id",
)
}
def intern_string(string):
"""Takes a (potentially) unicode string and interns using custom cache
"""
return _string_cache.setdefault(string, string)
def intern_dict(dictionary):
"""Takes a dictionary and interns well known keys and their values
"""
return {
KNOWN_KEYS.get(key, key): _intern_known_values(key, value)
for key, value in dictionary.items()
}
def _intern_known_values(key, value):
intern_str_keys = ("event_id", "room_id")
intern_unicode_keys = ("sender", "user_id", "type", "state_key")
if key in intern_str_keys:
return intern(value.encode('ascii'))
if key in intern_unicode_keys:
return intern_string(value)
return value

View File

@ -29,6 +29,16 @@ def enumerate_leaves(node, depth):
yield m yield m
class _Node(object):
__slots__ = ["prev_node", "next_node", "key", "value"]
def __init__(self, prev_node, next_node, key, value):
self.prev_node = prev_node
self.next_node = next_node
self.key = key
self.value = value
class LruCache(object): class LruCache(object):
""" """
Least-recently-used cache. Least-recently-used cache.
@ -38,10 +48,9 @@ class LruCache(object):
def __init__(self, max_size, keylen=1, cache_type=dict): def __init__(self, max_size, keylen=1, cache_type=dict):
cache = cache_type() cache = cache_type()
self.cache = cache # Used for introspection. self.cache = cache # Used for introspection.
list_root = [] list_root = _Node(None, None, None, None)
list_root[:] = [list_root, list_root, None, None] list_root.next_node = list_root
list_root.prev_node = list_root
PREV, NEXT, KEY, VALUE = 0, 1, 2, 3
lock = threading.Lock() lock = threading.Lock()
@ -55,36 +64,36 @@ class LruCache(object):
def add_node(key, value): def add_node(key, value):
prev_node = list_root prev_node = list_root
next_node = prev_node[NEXT] next_node = prev_node.next_node
node = [prev_node, next_node, key, value] node = _Node(prev_node, next_node, key, value)
prev_node[NEXT] = node prev_node.next_node = node
next_node[PREV] = node next_node.prev_node = node
cache[key] = node cache[key] = node
def move_node_to_front(node): def move_node_to_front(node):
prev_node = node[PREV] prev_node = node.prev_node
next_node = node[NEXT] next_node = node.next_node
prev_node[NEXT] = next_node prev_node.next_node = next_node
next_node[PREV] = prev_node next_node.prev_node = prev_node
prev_node = list_root prev_node = list_root
next_node = prev_node[NEXT] next_node = prev_node.next_node
node[PREV] = prev_node node.prev_node = prev_node
node[NEXT] = next_node node.next_node = next_node
prev_node[NEXT] = node prev_node.next_node = node
next_node[PREV] = node next_node.prev_node = node
def delete_node(node): def delete_node(node):
prev_node = node[PREV] prev_node = node.prev_node
next_node = node[NEXT] next_node = node.next_node
prev_node[NEXT] = next_node prev_node.next_node = next_node
next_node[PREV] = prev_node next_node.prev_node = prev_node
@synchronized @synchronized
def cache_get(key, default=None): def cache_get(key, default=None):
node = cache.get(key, None) node = cache.get(key, None)
if node is not None: if node is not None:
move_node_to_front(node) move_node_to_front(node)
return node[VALUE] return node.value
else: else:
return default return default
@ -93,25 +102,25 @@ class LruCache(object):
node = cache.get(key, None) node = cache.get(key, None)
if node is not None: if node is not None:
move_node_to_front(node) move_node_to_front(node)
node[VALUE] = value node.value = value
else: else:
add_node(key, value) add_node(key, value)
if len(cache) > max_size: if len(cache) > max_size:
todelete = list_root[PREV] todelete = list_root.prev_node
delete_node(todelete) delete_node(todelete)
cache.pop(todelete[KEY], None) cache.pop(todelete.key, None)
@synchronized @synchronized
def cache_set_default(key, value): def cache_set_default(key, value):
node = cache.get(key, None) node = cache.get(key, None)
if node is not None: if node is not None:
return node[VALUE] return node.value
else: else:
add_node(key, value) add_node(key, value)
if len(cache) > max_size: if len(cache) > max_size:
todelete = list_root[PREV] todelete = list_root.prev_node
delete_node(todelete) delete_node(todelete)
cache.pop(todelete[KEY], None) cache.pop(todelete.key, None)
return value return value
@synchronized @synchronized
@ -119,8 +128,8 @@ class LruCache(object):
node = cache.get(key, None) node = cache.get(key, None)
if node: if node:
delete_node(node) delete_node(node)
cache.pop(node[KEY], None) cache.pop(node.key, None)
return node[VALUE] return node.value
else: else:
return default return default
@ -137,8 +146,8 @@ class LruCache(object):
@synchronized @synchronized
def cache_clear(): def cache_clear():
list_root[NEXT] = list_root list_root.next_node = list_root
list_root[PREV] = list_root list_root.prev_node = list_root
cache.clear() cache.clear()
@synchronized @synchronized

View File

@ -0,0 +1,46 @@
# -*- coding: utf-8 -*-
# Copyright 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from synapse.util.async import ObservableDeferred
class ResponseCache(object):
"""
This caches a deferred response. Until the deferred completes it will be
returned from the cache. This means that if the client retries the request
while the response is still being computed, that original response will be
used rather than trying to compute a new response.
"""
def __init__(self):
self.pending_result_cache = {} # Requests that haven't finished yet.
def get(self, key):
result = self.pending_result_cache.get(key)
if result is not None:
return result.observe()
else:
return None
def set(self, key, deferred):
result = ObservableDeferred(deferred)
self.pending_result_cache[key] = result
def remove(r):
self.pending_result_cache.pop(key, None)
return r
result.addBoth(remove)
return result.observe()