Merge branch 'develop' of github.com:matrix-org/synapse into release-v0.11.0

This commit is contained in:
Erik Johnston 2015-11-11 17:51:14 +00:00
commit 6341be45c6
24 changed files with 737 additions and 140 deletions

View File

@ -20,8 +20,8 @@ The overall architecture is::
https://somewhere.org/_matrix https://elsewhere.net/_matrix https://somewhere.org/_matrix https://elsewhere.net/_matrix
``#matrix:matrix.org`` is the official support room for Matrix, and can be ``#matrix:matrix.org`` is the official support room for Matrix, and can be
accessed by the web client at http://matrix.org/beta or via an IRC bridge at accessed by any client from https://matrix.org/blog/try-matrix-now or via IRC
irc://irc.freenode.net/matrix. bridge at irc://irc.freenode.net/matrix.
Synapse is currently in rapid development, but as of version 0.5 we believe it Synapse is currently in rapid development, but as of version 0.5 we believe it
is sufficiently stable to be run as an internet-facing service for real usage! is sufficiently stable to be run as an internet-facing service for real usage!
@ -77,14 +77,14 @@ Meanwhile, iOS and Android SDKs and clients are available from:
- https://github.com/matrix-org/matrix-android-sdk - https://github.com/matrix-org/matrix-android-sdk
We'd like to invite you to join #matrix:matrix.org (via We'd like to invite you to join #matrix:matrix.org (via
https://matrix.org/beta), run a homeserver, take a look at the Matrix spec at https://matrix.org/blog/try-matrix-now), run a homeserver, take a look at the
https://matrix.org/docs/spec and API docs at https://matrix.org/docs/api, Matrix spec at https://matrix.org/docs/spec and API docs at
experiment with the APIs and the demo clients, and report any bugs via https://matrix.org/docs/api, experiment with the APIs and the demo clients, and
https://matrix.org/jira. report any bugs via https://matrix.org/jira.
Thanks for using Matrix! Thanks for using Matrix!
[1] End-to-end encryption is currently in development [1] End-to-end encryption is currently in development - see https://matrix.org/git/olm
Synapse Installation Synapse Installation
==================== ====================

View File

@ -68,6 +68,7 @@ class EventTypes(object):
RoomHistoryVisibility = "m.room.history_visibility" RoomHistoryVisibility = "m.room.history_visibility"
CanonicalAlias = "m.room.canonical_alias" CanonicalAlias = "m.room.canonical_alias"
RoomAvatar = "m.room.avatar" RoomAvatar = "m.room.avatar"
GuestAccess = "m.room.guest_access"
# These are used for validation # These are used for validation
Message = "m.room.message" Message = "m.room.message"

View File

@ -439,6 +439,7 @@ def setup(config_options):
hs.get_pusherpool().start() hs.get_pusherpool().start()
hs.get_state_handler().start_caching() hs.get_state_handler().start_caching()
hs.get_datastore().start_profiling() hs.get_datastore().start_profiling()
hs.get_datastore().start_doing_background_updates()
hs.get_replication_layer().start_get_pdu_cache() hs.get_replication_layer().start_get_pdu_cache()
return hs return hs

View File

@ -175,6 +175,8 @@ class BaseHandler(object):
if not suppress_auth: if not suppress_auth:
self.auth.check(event, auth_events=context.current_state) self.auth.check(event, auth_events=context.current_state)
yield self.maybe_kick_guest_users(event, context.current_state.values())
if event.type == EventTypes.CanonicalAlias: if event.type == EventTypes.CanonicalAlias:
# Check the alias is acually valid (at this time at least) # Check the alias is acually valid (at this time at least)
room_alias_str = event.content.get("alias", None) room_alias_str = event.content.get("alias", None)
@ -282,3 +284,58 @@ class BaseHandler(object):
federation_handler.handle_new_event( federation_handler.handle_new_event(
event, destinations=destinations, event, destinations=destinations,
) )
@defer.inlineCallbacks
def maybe_kick_guest_users(self, event, current_state):
# Technically this function invalidates current_state by changing it.
# Hopefully this isn't that important to the caller.
if event.type == EventTypes.GuestAccess:
guest_access = event.content.get("guest_access", "forbidden")
if guest_access != "can_join":
yield self.kick_guest_users(current_state)
@defer.inlineCallbacks
def kick_guest_users(self, current_state):
for member_event in current_state:
try:
if member_event.type != EventTypes.Member:
continue
if not self.hs.is_mine(UserID.from_string(member_event.state_key)):
continue
if member_event.content["membership"] not in {
Membership.JOIN,
Membership.INVITE
}:
continue
if (
"kind" not in member_event.content
or member_event.content["kind"] != "guest"
):
continue
# We make the user choose to leave, rather than have the
# event-sender kick them. This is partially because we don't
# need to worry about power levels, and partially because guest
# users are a concept which doesn't hugely work over federation,
# and having homeservers have their own users leave keeps more
# of that decision-making and control local to the guest-having
# homeserver.
message_handler = self.hs.get_handlers().message_handler
yield message_handler.create_and_send_event(
{
"type": EventTypes.Member,
"state_key": member_event.state_key,
"content": {
"membership": Membership.LEAVE,
"kind": "guest"
},
"room_id": member_event.room_id,
"sender": member_event.state_key
},
ratelimit=False,
)
except Exception as e:
logger.warn("Error kicking guest user: %s" % (e,))

View File

@ -1097,8 +1097,6 @@ class FederationHandler(BaseHandler):
context = yield self._prep_event( context = yield self._prep_event(
origin, event, origin, event,
state=state, state=state,
backfilled=backfilled,
current_state=current_state,
auth_events=auth_events, auth_events=auth_events,
) )
@ -1121,7 +1119,6 @@ class FederationHandler(BaseHandler):
origin, origin,
ev_info["event"], ev_info["event"],
state=ev_info.get("state"), state=ev_info.get("state"),
backfilled=backfilled,
auth_events=ev_info.get("auth_events"), auth_events=ev_info.get("auth_events"),
) )
for ev_info in event_infos for ev_info in event_infos
@ -1208,8 +1205,7 @@ class FederationHandler(BaseHandler):
defer.returnValue((event_stream_id, max_stream_id)) defer.returnValue((event_stream_id, max_stream_id))
@defer.inlineCallbacks @defer.inlineCallbacks
def _prep_event(self, origin, event, state=None, backfilled=False, def _prep_event(self, origin, event, state=None, auth_events=None):
current_state=None, auth_events=None):
outlier = event.internal_metadata.is_outlier() outlier = event.internal_metadata.is_outlier()
context = yield self.state_handler.compute_event_context( context = yield self.state_handler.compute_event_context(
@ -1242,6 +1238,10 @@ class FederationHandler(BaseHandler):
context.rejected = RejectedReason.AUTH_ERROR context.rejected = RejectedReason.AUTH_ERROR
if event.type == EventTypes.GuestAccess:
full_context = yield self.store.get_current_state(room_id=event.room_id)
yield self.maybe_kick_guest_users(event, full_context)
defer.returnValue(context) defer.returnValue(context)
@defer.inlineCallbacks @defer.inlineCallbacks

View File

@ -167,7 +167,7 @@ class MessageHandler(BaseHandler):
@defer.inlineCallbacks @defer.inlineCallbacks
def create_and_send_event(self, event_dict, ratelimit=True, def create_and_send_event(self, event_dict, ratelimit=True,
token_id=None, txn_id=None): token_id=None, txn_id=None, is_guest=False):
""" Given a dict from a client, create and handle a new event. """ Given a dict from a client, create and handle a new event.
Creates an FrozenEvent object, filling out auth_events, prev_events, Creates an FrozenEvent object, filling out auth_events, prev_events,
@ -213,7 +213,7 @@ class MessageHandler(BaseHandler):
if event.type == EventTypes.Member: if event.type == EventTypes.Member:
member_handler = self.hs.get_handlers().room_member_handler member_handler = self.hs.get_handlers().room_member_handler
yield member_handler.change_membership(event, context) yield member_handler.change_membership(event, context, is_guest=is_guest)
else: else:
yield self.handle_new_client_event( yield self.handle_new_client_event(
event=event, event=event,

View File

@ -950,6 +950,7 @@ class PresenceHandler(BaseHandler):
) )
while len(self._remote_offline_serials) > MAX_OFFLINE_SERIALS: while len(self._remote_offline_serials) > MAX_OFFLINE_SERIALS:
self._remote_offline_serials.pop() # remove the oldest self._remote_offline_serials.pop() # remove the oldest
if user in self._user_cachemap:
del self._user_cachemap[user] del self._user_cachemap[user]
else: else:
# Remove the user from remote_offline_serials now that they're # Remove the user from remote_offline_serials now that they're

View File

@ -369,7 +369,7 @@ class RoomMemberHandler(BaseHandler):
remotedomains.add(member.domain) remotedomains.add(member.domain)
@defer.inlineCallbacks @defer.inlineCallbacks
def change_membership(self, event, context, do_auth=True): def change_membership(self, event, context, do_auth=True, is_guest=False):
""" Change the membership status of a user in a room. """ Change the membership status of a user in a room.
Args: Args:
@ -390,6 +390,20 @@ class RoomMemberHandler(BaseHandler):
# if this HS is not currently in the room, i.e. we have to do the # if this HS is not currently in the room, i.e. we have to do the
# invite/join dance. # invite/join dance.
if event.membership == Membership.JOIN: if event.membership == Membership.JOIN:
if is_guest:
guest_access = context.current_state.get(
(EventTypes.GuestAccess, ""),
None
)
is_guest_access_allowed = (
guest_access
and guest_access.content
and "guest_access" in guest_access.content
and guest_access.content["guest_access"] == "can_join"
)
if not is_guest_access_allowed:
raise AuthError(403, "Guest access not allowed")
yield self._do_join(event, context, do_auth=do_auth) yield self._do_join(event, context, do_auth=do_auth)
else: else:
if event.membership == Membership.LEAVE: if event.membership == Membership.LEAVE:
@ -582,7 +596,6 @@ class RoomMemberHandler(BaseHandler):
medium, medium,
address, address,
id_server, id_server,
display_name,
token_id, token_id,
txn_id txn_id
): ):
@ -609,7 +622,6 @@ class RoomMemberHandler(BaseHandler):
else: else:
yield self._make_and_store_3pid_invite( yield self._make_and_store_3pid_invite(
id_server, id_server,
display_name,
medium, medium,
address, address,
room_id, room_id,
@ -673,7 +685,6 @@ class RoomMemberHandler(BaseHandler):
def _make_and_store_3pid_invite( def _make_and_store_3pid_invite(
self, self,
id_server, id_server,
display_name,
medium, medium,
address, address,
room_id, room_id,
@ -681,7 +692,7 @@ class RoomMemberHandler(BaseHandler):
token_id, token_id,
txn_id txn_id
): ):
token, public_key, key_validity_url = ( token, public_key, key_validity_url, display_name = (
yield self._ask_id_server_for_third_party_invite( yield self._ask_id_server_for_third_party_invite(
id_server, id_server,
medium, medium,
@ -725,10 +736,11 @@ class RoomMemberHandler(BaseHandler):
# TODO: Check for success # TODO: Check for success
token = data["token"] token = data["token"]
public_key = data["public_key"] public_key = data["public_key"]
display_name = data["display_name"]
key_validity_url = "%s%s/_matrix/identity/api/v1/pubkey/isvalid" % ( key_validity_url = "%s%s/_matrix/identity/api/v1/pubkey/isvalid" % (
id_server_scheme, id_server, id_server_scheme, id_server,
) )
defer.returnValue((token, public_key, key_validity_url)) defer.returnValue((token, public_key, key_validity_url, display_name))
class RoomListHandler(BaseHandler): class RoomListHandler(BaseHandler):

View File

@ -272,7 +272,7 @@ class SyncHandler(BaseHandler):
def private_user_data_for_room(self, room_id, tags_by_room): def private_user_data_for_room(self, room_id, tags_by_room):
private_user_data = [] private_user_data = []
tags = tags_by_room.get(room_id) tags = tags_by_room.get(room_id)
if tags: if tags is not None:
private_user_data.append({ private_user_data.append({
"type": "m.tag", "type": "m.tag",
"content": {"tags": tags}, "content": {"tags": tags},
@ -311,8 +311,13 @@ class SyncHandler(BaseHandler):
ephemeral_by_room = {} ephemeral_by_room = {}
for event in typing: for event in typing:
room_id = event.pop("room_id") # we want to exclude the room_id from the event, but modifying the
ephemeral_by_room.setdefault(room_id, []).append(event) # result returned by the event source is poor form (it might cache
# the object)
room_id = event["room_id"]
event_copy = {k: v for (k, v) in event.iteritems()
if k != "room_id"}
ephemeral_by_room.setdefault(room_id, []).append(event_copy)
receipt_key = since_token.receipt_key if since_token else "0" receipt_key = since_token.receipt_key if since_token else "0"
@ -328,8 +333,11 @@ class SyncHandler(BaseHandler):
now_token = now_token.copy_and_replace("receipt_key", receipt_key) now_token = now_token.copy_and_replace("receipt_key", receipt_key)
for event in receipts: for event in receipts:
room_id = event.pop("room_id") room_id = event["room_id"]
ephemeral_by_room.setdefault(room_id, []).append(event) # exclude room id, as above
event_copy = {k: v for (k, v) in event.iteritems()
if k != "room_id"}
ephemeral_by_room.setdefault(room_id, []).append(event_copy)
defer.returnValue((now_token, ephemeral_by_room)) defer.returnValue((now_token, ephemeral_by_room))

View File

@ -175,7 +175,7 @@ class RoomSendEventRestServlet(ClientV1RestServlet):
@defer.inlineCallbacks @defer.inlineCallbacks
def on_POST(self, request, room_id, event_type, txn_id=None): def on_POST(self, request, room_id, event_type, txn_id=None):
user, token_id, _ = yield self.auth.get_user_by_req(request) user, token_id, _ = yield self.auth.get_user_by_req(request, allow_guest=True)
content = _parse_json(request) content = _parse_json(request)
msg_handler = self.handlers.message_handler msg_handler = self.handlers.message_handler
@ -220,7 +220,10 @@ class JoinRoomAliasServlet(ClientV1RestServlet):
@defer.inlineCallbacks @defer.inlineCallbacks
def on_POST(self, request, room_identifier, txn_id=None): def on_POST(self, request, room_identifier, txn_id=None):
user, token_id, _ = yield self.auth.get_user_by_req(request) user, token_id, is_guest = yield self.auth.get_user_by_req(
request,
allow_guest=True
)
# the identifier could be a room alias or a room id. Try one then the # the identifier could be a room alias or a room id. Try one then the
# other if it fails to parse, without swallowing other valid # other if it fails to parse, without swallowing other valid
@ -242,16 +245,20 @@ class JoinRoomAliasServlet(ClientV1RestServlet):
defer.returnValue((200, ret_dict)) defer.returnValue((200, ret_dict))
else: # room id else: # room id
msg_handler = self.handlers.message_handler msg_handler = self.handlers.message_handler
content = {"membership": Membership.JOIN}
if is_guest:
content["kind"] = "guest"
yield msg_handler.create_and_send_event( yield msg_handler.create_and_send_event(
{ {
"type": EventTypes.Member, "type": EventTypes.Member,
"content": {"membership": Membership.JOIN}, "content": content,
"room_id": identifier.to_string(), "room_id": identifier.to_string(),
"sender": user.to_string(), "sender": user.to_string(),
"state_key": user.to_string(), "state_key": user.to_string(),
}, },
token_id=token_id, token_id=token_id,
txn_id=txn_id, txn_id=txn_id,
is_guest=is_guest,
) )
defer.returnValue((200, {"room_id": identifier.to_string()})) defer.returnValue((200, {"room_id": identifier.to_string()}))
@ -319,7 +326,7 @@ class RoomMemberListRestServlet(ClientV1RestServlet):
})) }))
# TODO: Needs unit testing # TODO: Needs better unit testing
class RoomMessageListRestServlet(ClientV1RestServlet): class RoomMessageListRestServlet(ClientV1RestServlet):
PATTERN = client_path_pattern("/rooms/(?P<room_id>[^/]*)/messages$") PATTERN = client_path_pattern("/rooms/(?P<room_id>[^/]*)/messages$")
@ -459,7 +466,6 @@ class RoomMembershipRestServlet(ClientV1RestServlet):
content["medium"], content["medium"],
content["address"], content["address"],
content["id_server"], content["id_server"],
content["display_name"],
token_id, token_id,
txn_id txn_id
) )
@ -494,7 +500,7 @@ class RoomMembershipRestServlet(ClientV1RestServlet):
defer.returnValue((200, {})) defer.returnValue((200, {}))
def _has_3pid_invite_keys(self, content): def _has_3pid_invite_keys(self, content):
for key in {"id_server", "medium", "address", "display_name"}: for key in {"id_server", "medium", "address"}:
if key not in content: if key not in content:
return False return False
return True return True

View File

@ -0,0 +1,256 @@
# -*- coding: utf-8 -*-
# Copyright 2014, 2015 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ._base import SQLBaseStore
from twisted.internet import defer
import ujson as json
import logging
logger = logging.getLogger(__name__)
class BackgroundUpdatePerformance(object):
"""Tracks the how long a background update is taking to update its items"""
def __init__(self, name):
self.name = name
self.total_item_count = 0
self.total_duration_ms = 0
self.avg_item_count = 0
self.avg_duration_ms = 0
def update(self, item_count, duration_ms):
"""Update the stats after doing an update"""
self.total_item_count += item_count
self.total_duration_ms += duration_ms
# Exponential moving averages for the number of items updated and
# the duration.
self.avg_item_count += 0.1 * (item_count - self.avg_item_count)
self.avg_duration_ms += 0.1 * (duration_ms - self.avg_duration_ms)
def average_items_per_ms(self):
"""An estimate of how long it takes to do a single update.
Returns:
A duration in ms as a float
"""
if self.total_item_count == 0:
return None
else:
# Use the exponential moving average so that we can adapt to
# changes in how long the update process takes.
return float(self.avg_item_count) / float(self.avg_duration_ms)
def total_items_per_ms(self):
"""An estimate of how long it takes to do a single update.
Returns:
A duration in ms as a float
"""
if self.total_item_count == 0:
return None
else:
return float(self.total_item_count) / float(self.total_duration_ms)
class BackgroundUpdateStore(SQLBaseStore):
""" Background updates are updates to the database that run in the
background. Each update processes a batch of data at once. We attempt to
limit the impact of each update by monitoring how long each batch takes to
process and autotuning the batch size.
"""
MINIMUM_BACKGROUND_BATCH_SIZE = 100
DEFAULT_BACKGROUND_BATCH_SIZE = 100
BACKGROUND_UPDATE_INTERVAL_MS = 1000
BACKGROUND_UPDATE_DURATION_MS = 100
def __init__(self, hs):
super(BackgroundUpdateStore, self).__init__(hs)
self._background_update_performance = {}
self._background_update_queue = []
self._background_update_handlers = {}
self._background_update_timer = None
@defer.inlineCallbacks
def start_doing_background_updates(self):
while True:
if self._background_update_timer is not None:
return
sleep = defer.Deferred()
self._background_update_timer = self._clock.call_later(
self.BACKGROUND_UPDATE_INTERVAL_MS / 1000., sleep.callback, None
)
try:
yield sleep
finally:
self._background_update_timer = None
try:
result = yield self.do_background_update(
self.BACKGROUND_UPDATE_DURATION_MS
)
except:
logger.exception("Error doing update")
if result is None:
logger.info(
"No more background updates to do."
" Unscheduling background update task."
)
return
@defer.inlineCallbacks
def do_background_update(self, desired_duration_ms):
"""Does some amount of work on a background update
Args:
desired_duration_ms(float): How long we want to spend
updating.
Returns:
A deferred that completes once some amount of work is done.
The deferred will have a value of None if there is currently
no more work to do.
"""
if not self._background_update_queue:
updates = yield self._simple_select_list(
"background_updates",
keyvalues=None,
retcols=("update_name",),
)
for update in updates:
self._background_update_queue.append(update['update_name'])
if not self._background_update_queue:
defer.returnValue(None)
update_name = self._background_update_queue.pop(0)
self._background_update_queue.append(update_name)
update_handler = self._background_update_handlers[update_name]
performance = self._background_update_performance.get(update_name)
if performance is None:
performance = BackgroundUpdatePerformance(update_name)
self._background_update_performance[update_name] = performance
items_per_ms = performance.average_items_per_ms()
if items_per_ms is not None:
batch_size = int(desired_duration_ms * items_per_ms)
# Clamp the batch size so that we always make progress
batch_size = max(batch_size, self.MINIMUM_BACKGROUND_BATCH_SIZE)
else:
batch_size = self.DEFAULT_BACKGROUND_BATCH_SIZE
progress_json = yield self._simple_select_one_onecol(
"background_updates",
keyvalues={"update_name": update_name},
retcol="progress_json"
)
progress = json.loads(progress_json)
time_start = self._clock.time_msec()
items_updated = yield update_handler(progress, batch_size)
time_stop = self._clock.time_msec()
duration_ms = time_stop - time_start
logger.info(
"Updating %r. Updated %r items in %rms."
" (total_rate=%r/ms, current_rate=%r/ms, total_updated=%r)",
update_name, items_updated, duration_ms,
performance.total_items_per_ms(),
performance.average_items_per_ms(),
performance.total_item_count,
)
performance.update(items_updated, duration_ms)
defer.returnValue(len(self._background_update_performance))
def register_background_update_handler(self, update_name, update_handler):
"""Register a handler for doing a background update.
The handler should take two arguments:
* A dict of the current progress
* An integer count of the number of items to update in this batch.
The handler should return a deferred integer count of items updated.
The hander is responsible for updating the progress of the update.
Args:
update_name(str): The name of the update that this code handles.
update_handler(function): The function that does the update.
"""
self._background_update_handlers[update_name] = update_handler
def start_background_update(self, update_name, progress):
"""Starts a background update running.
Args:
update_name: The update to set running.
progress: The initial state of the progress of the update.
Returns:
A deferred that completes once the task has been added to the
queue.
"""
# Clear the background update queue so that we will pick up the new
# task on the next iteration of do_background_update.
self._background_update_queue = []
progress_json = json.dumps(progress)
return self._simple_insert(
"background_updates",
{"update_name": update_name, "progress_json": progress_json}
)
def _end_background_update(self, update_name):
"""Removes a completed background update task from the queue.
Args:
update_name(str): The name of the completed task to remove
Returns:
A deferred that completes once the task is removed.
"""
self._background_update_queue = [
name for name in self._background_update_queue if name != update_name
]
return self._simple_delete_one(
"background_updates", keyvalues={"update_name": update_name}
)
def _background_update_progress_txn(self, txn, update_name, progress):
"""Update the progress of a background update
Args:
txn(cursor): The transaction.
update_name(str): The name of the background update task
progress(dict): The progress of the update.
"""
progress_json = json.dumps(progress)
self._simple_update_one_txn(
txn,
"background_updates",
keyvalues={"update_name": update_name},
updatevalues={"progress_json": progress_json},
)

View File

@ -313,6 +313,8 @@ class EventsStore(SQLBaseStore):
self._store_redaction(txn, event) self._store_redaction(txn, event)
elif event.type == EventTypes.RoomHistoryVisibility: elif event.type == EventTypes.RoomHistoryVisibility:
self._store_history_visibility_txn(txn, event) self._store_history_visibility_txn(txn, event)
elif event.type == EventTypes.GuestAccess:
self._store_guest_access_txn(txn, event)
self._store_room_members_txn( self._store_room_members_txn(
txn, txn,

View File

@ -99,34 +99,39 @@ class RoomStore(SQLBaseStore):
""" """
def f(txn): def f(txn):
topic_subquery = ( def subquery(table_name, column_name=None):
"SELECT topics.event_id as event_id, " column_name = column_name or table_name
"topics.room_id as room_id, topic " return (
"FROM topics " "SELECT %(table_name)s.event_id as event_id, "
"%(table_name)s.room_id as room_id, %(column_name)s "
"FROM %(table_name)s "
"INNER JOIN current_state_events as c " "INNER JOIN current_state_events as c "
"ON c.event_id = topics.event_id " "ON c.event_id = %(table_name)s.event_id " % {
"column_name": column_name,
"table_name": table_name,
}
) )
name_subquery = (
"SELECT room_names.event_id as event_id, "
"room_names.room_id as room_id, name "
"FROM room_names "
"INNER JOIN current_state_events as c "
"ON c.event_id = room_names.event_id "
)
# We use non printing ascii character US (\x1F) as a separator
sql = ( sql = (
"SELECT r.room_id, max(n.name), max(t.topic)" "SELECT"
" r.room_id,"
" max(n.name),"
" max(t.topic),"
" max(v.history_visibility),"
" max(g.guest_access)"
" FROM rooms AS r" " FROM rooms AS r"
" LEFT JOIN (%(topic)s) AS t ON t.room_id = r.room_id" " LEFT JOIN (%(topic)s) AS t ON t.room_id = r.room_id"
" LEFT JOIN (%(name)s) AS n ON n.room_id = r.room_id" " LEFT JOIN (%(name)s) AS n ON n.room_id = r.room_id"
" LEFT JOIN (%(history_visibility)s) AS v ON v.room_id = r.room_id"
" LEFT JOIN (%(guest_access)s) AS g ON g.room_id = r.room_id"
" WHERE r.is_public = ?" " WHERE r.is_public = ?"
" GROUP BY r.room_id" " GROUP BY r.room_id" % {
) % { "topic": subquery("topics", "topic"),
"topic": topic_subquery, "name": subquery("room_names", "name"),
"name": name_subquery, "history_visibility": subquery("history_visibility"),
"guest_access": subquery("guest_access"),
} }
)
txn.execute(sql, (is_public,)) txn.execute(sql, (is_public,))
@ -156,10 +161,12 @@ class RoomStore(SQLBaseStore):
"room_id": r[0], "room_id": r[0],
"name": r[1], "name": r[1],
"topic": r[2], "topic": r[2],
"aliases": r[3], "world_readable": r[3] == "world_readable",
"guest_can_join": r[4] == "can_join",
"aliases": r[5],
} }
for r in rows for r in rows
if r[3] # We only return rooms that have at least one alias. if r[5] # We only return rooms that have at least one alias.
] ]
defer.returnValue(ret) defer.returnValue(ret)
@ -203,16 +210,22 @@ class RoomStore(SQLBaseStore):
) )
def _store_history_visibility_txn(self, txn, event): def _store_history_visibility_txn(self, txn, event):
if hasattr(event, "content") and "history_visibility" in event.content: self._store_content_index_txn(txn, event, "history_visibility")
def _store_guest_access_txn(self, txn, event):
self._store_content_index_txn(txn, event, "guest_access")
def _store_content_index_txn(self, txn, event, key):
if hasattr(event, "content") and key in event.content:
sql = ( sql = (
"INSERT INTO history_visibility" "INSERT INTO %(key)s"
" (event_id, room_id, history_visibility)" " (event_id, room_id, %(key)s)"
" VALUES (?, ?, ?)" " VALUES (?, ?, ?)" % {"key": key}
) )
txn.execute(sql, ( txn.execute(sql, (
event.event_id, event.event_id,
event.room_id, event.room_id,
event.content["history_visibility"] event.content[key]
)) ))
def _store_event_search_txn(self, txn, event, key, value): def _store_event_search_txn(self, txn, event, key, value):

View File

@ -0,0 +1,21 @@
/* Copyright 2015 OpenMarket Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
CREATE TABLE IF NOT EXISTS background_updates(
update_name TEXT NOT NULL, -- The name of the background update.
progress_json TEXT NOT NULL, -- The current progress of the update as JSON.
CONSTRAINT background_updates_uniqueness UNIQUE (update_name)
);

View File

@ -22,7 +22,7 @@ import ujson
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
POSTGRES_SQL = """ POSTGRES_TABLE = """
CREATE TABLE IF NOT EXISTS event_search ( CREATE TABLE IF NOT EXISTS event_search (
event_id TEXT, event_id TEXT,
room_id TEXT, room_id TEXT,
@ -31,22 +31,6 @@ CREATE TABLE IF NOT EXISTS event_search (
vector tsvector vector tsvector
); );
INSERT INTO event_search SELECT
event_id, room_id, json::json->>'sender', 'content.body',
to_tsvector('english', json::json->'content'->>'body')
FROM events NATURAL JOIN event_json WHERE type = 'm.room.message';
INSERT INTO event_search SELECT
event_id, room_id, json::json->>'sender', 'content.name',
to_tsvector('english', json::json->'content'->>'name')
FROM events NATURAL JOIN event_json WHERE type = 'm.room.name';
INSERT INTO event_search SELECT
event_id, room_id, json::json->>'sender', 'content.topic',
to_tsvector('english', json::json->'content'->>'topic')
FROM events NATURAL JOIN event_json WHERE type = 'm.room.topic';
CREATE INDEX event_search_fts_idx ON event_search USING gin(vector); CREATE INDEX event_search_fts_idx ON event_search USING gin(vector);
CREATE INDEX event_search_ev_idx ON event_search(event_id); CREATE INDEX event_search_ev_idx ON event_search(event_id);
CREATE INDEX event_search_ev_ridx ON event_search(room_id); CREATE INDEX event_search_ev_ridx ON event_search(room_id);
@ -61,67 +45,34 @@ SQLITE_TABLE = (
def run_upgrade(cur, database_engine, *args, **kwargs): def run_upgrade(cur, database_engine, *args, **kwargs):
if isinstance(database_engine, PostgresEngine): if isinstance(database_engine, PostgresEngine):
run_postgres_upgrade(cur) for statement in get_statements(POSTGRES_TABLE.splitlines()):
return
if isinstance(database_engine, Sqlite3Engine):
run_sqlite_upgrade(cur)
return
def run_postgres_upgrade(cur):
for statement in get_statements(POSTGRES_SQL.splitlines()):
cur.execute(statement) cur.execute(statement)
elif isinstance(database_engine, Sqlite3Engine):
def run_sqlite_upgrade(cur):
cur.execute(SQLITE_TABLE) cur.execute(SQLITE_TABLE)
else:
raise Exception("Unrecognized database engine")
rowid = -1 cur.execute("SELECT MIN(stream_ordering) FROM events")
while True: rows = cur.fetchall()
cur.execute( min_stream_id = rows[0][0]
"SELECT rowid, json FROM event_json"
" WHERE rowid > ?" cur.execute("SELECT MAX(stream_ordering) FROM events")
" ORDER BY rowid ASC LIMIT 100", rows = cur.fetchall()
(rowid,) max_stream_id = rows[0][0]
if min_stream_id is not None and max_stream_id is not None:
progress = {
"target_min_stream_id_inclusive": min_stream_id,
"max_stream_id_exclusive": max_stream_id + 1,
"rows_inserted": 0,
}
progress_json = ujson.dumps(progress)
sql = (
"INSERT into background_updates (update_name, progress_json)"
" VALUES (?, ?)"
) )
res = cur.fetchall() sql = database_engine.convert_param_style(sql)
if not res: cur.execute(sql, ("event_search", progress_json))
break
events = [
ujson.loads(js)
for _, js in res
]
rowid = max(rid for rid, _ in res)
rows = []
for ev in events:
content = ev.get("content", {})
body = content.get("body", None)
name = content.get("name", None)
topic = content.get("topic", None)
sender = ev.get("sender", None)
if ev["type"] == "m.room.message" and body:
rows.append((
ev["event_id"], ev["room_id"], sender, "content.body", body
))
if ev["type"] == "m.room.name" and name:
rows.append((
ev["event_id"], ev["room_id"], sender, "content.name", name
))
if ev["type"] == "m.room.topic" and topic:
rows.append((
ev["event_id"], ev["room_id"], sender, "content.topic", topic
))
if rows:
logger.info(rows)
cur.executemany(
"INSERT INTO event_search (event_id, room_id, sender, key, value)"
" VALUES (?,?,?,?,?)",
rows
)

View File

@ -0,0 +1,25 @@
/* Copyright 2015 OpenMarket Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* This is a manual index of guest_access content of state events,
* so that we can join on them in SELECT statements.
*/
CREATE TABLE IF NOT EXISTS guest_access(
event_id TEXT NOT NULL,
room_id TEXT NOT NULL,
guest_access TEXT NOT NULL,
UNIQUE (event_id)
);

View File

@ -18,7 +18,6 @@
* so that we can join on them in SELECT statements. * so that we can join on them in SELECT statements.
*/ */
CREATE TABLE IF NOT EXISTS history_visibility( CREATE TABLE IF NOT EXISTS history_visibility(
id INTEGER PRIMARY KEY,
event_id TEXT NOT NULL, event_id TEXT NOT NULL,
room_id TEXT NOT NULL, room_id TEXT NOT NULL,
history_visibility TEXT NOT NULL, history_visibility TEXT NOT NULL,

View File

@ -15,7 +15,7 @@
from twisted.internet import defer from twisted.internet import defer
from _base import SQLBaseStore from .background_updates import BackgroundUpdateStore
from synapse.api.errors import SynapseError from synapse.api.errors import SynapseError
from synapse.storage.engines import PostgresEngine, Sqlite3Engine from synapse.storage.engines import PostgresEngine, Sqlite3Engine
@ -25,7 +25,106 @@ import logging
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class SearchStore(SQLBaseStore): class SearchStore(BackgroundUpdateStore):
EVENT_SEARCH_UPDATE_NAME = "event_search"
def __init__(self, hs):
super(SearchStore, self).__init__(hs)
self.register_background_update_handler(
self.EVENT_SEARCH_UPDATE_NAME, self._background_reindex_search
)
@defer.inlineCallbacks
def _background_reindex_search(self, progress, batch_size):
target_min_stream_id = progress["target_min_stream_id_inclusive"]
max_stream_id = progress["max_stream_id_exclusive"]
rows_inserted = progress.get("rows_inserted", 0)
INSERT_CLUMP_SIZE = 1000
TYPES = ["m.room.name", "m.room.message", "m.room.topic"]
def reindex_search_txn(txn):
sql = (
"SELECT stream_ordering, event_id FROM events"
" WHERE ? <= stream_ordering AND stream_ordering < ?"
" AND (%s)"
" ORDER BY stream_ordering DESC"
" LIMIT ?"
) % (" OR ".join("type = '%s'" % (t,) for t in TYPES),)
txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size))
rows = txn.fetchall()
if not rows:
return 0
min_stream_id = rows[-1][0]
event_ids = [row[1] for row in rows]
events = self._get_events_txn(txn, event_ids)
event_search_rows = []
for event in events:
try:
event_id = event.event_id
room_id = event.room_id
content = event.content
if event.type == "m.room.message":
key = "content.body"
value = content["body"]
elif event.type == "m.room.topic":
key = "content.topic"
value = content["topic"]
elif event.type == "m.room.name":
key = "content.name"
value = content["name"]
except (KeyError, AttributeError):
# If the event is missing a necessary field then
# skip over it.
continue
event_search_rows.append((event_id, room_id, key, value))
if isinstance(self.database_engine, PostgresEngine):
sql = (
"INSERT INTO event_search (event_id, room_id, key, vector)"
" VALUES (?,?,?,to_tsvector('english', ?))"
)
elif isinstance(self.database_engine, Sqlite3Engine):
sql = (
"INSERT INTO event_search (event_id, room_id, key, value)"
" VALUES (?,?,?,?)"
)
else:
# This should be unreachable.
raise Exception("Unrecognized database engine")
for index in range(0, len(event_search_rows), INSERT_CLUMP_SIZE):
clump = event_search_rows[index:index + INSERT_CLUMP_SIZE]
txn.executemany(sql, clump)
progress = {
"target_min_stream_id_inclusive": target_min_stream_id,
"max_stream_id_exclusive": min_stream_id,
"rows_inserted": rows_inserted + len(event_search_rows)
}
self._background_update_progress_txn(
txn, self.EVENT_SEARCH_UPDATE_NAME, progress
)
return len(event_search_rows)
result = yield self.runInteraction(
self.EVENT_SEARCH_UPDATE_NAME, reindex_search_txn
)
if not result:
yield self._end_background_update(self.EVENT_SEARCH_UPDATE_NAME)
defer.returnValue(result)
@defer.inlineCallbacks @defer.inlineCallbacks
def search_msgs(self, room_ids, search_term, keys): def search_msgs(self, room_ids, search_term, keys):
"""Performs a full text search over events with given keys. """Performs a full text search over events with given keys.
@ -154,7 +253,7 @@ class SearchStore(SQLBaseStore):
) )
elif isinstance(self.database_engine, Sqlite3Engine): elif isinstance(self.database_engine, Sqlite3Engine):
sql = ( sql = (
"SELECT rank(matchinfo(event_search)) as rank, room_id, event_id" "SELECT rank(matchinfo(event_search)) as rank, room_id, event_id,"
" topological_ordering, stream_ordering" " topological_ordering, stream_ordering"
" FROM event_search" " FROM event_search"
" NATURAL JOIN events" " NATURAL JOIN events"

View File

@ -95,7 +95,7 @@ class TagsStore(SQLBaseStore):
if room_ids: if room_ids:
tags_by_room = yield self.get_tags_for_user(user_id) tags_by_room = yield self.get_tags_for_user(user_id)
for room_id in room_ids: for room_id in room_ids:
results[room_id] = tags_by_room[room_id] results[room_id] = tags_by_room.get(room_id, {})
defer.returnValue(results) defer.returnValue(results)

View File

@ -53,6 +53,14 @@ class Clock(object):
loop.stop() loop.stop()
def call_later(self, delay, callback, *args, **kwargs): def call_later(self, delay, callback, *args, **kwargs):
"""Call something later
Args:
delay(float): How long to wait in seconds.
callback(function): Function to call
*args: Postional arguments to pass to function.
**kwargs: Key arguments to pass to function.
"""
current_context = LoggingContext.current_context() current_context = LoggingContext.current_context()
def wrapped_callback(*args, **kwargs): def wrapped_callback(*args, **kwargs):

View File

@ -994,3 +994,59 @@ class RoomInitialSyncTestCase(RestTestCase):
} }
self.assertTrue(self.user_id in presence_by_user) self.assertTrue(self.user_id in presence_by_user)
self.assertEquals("m.presence", presence_by_user[self.user_id]["type"]) self.assertEquals("m.presence", presence_by_user[self.user_id]["type"])
class RoomMessageListTestCase(RestTestCase):
""" Tests /rooms/$room_id/messages REST events. """
user_id = "@sid1:red"
@defer.inlineCallbacks
def setUp(self):
self.mock_resource = MockHttpResource(prefix=PATH_PREFIX)
self.auth_user_id = self.user_id
hs = yield setup_test_homeserver(
"red",
http_client=None,
replication_layer=Mock(),
ratelimiter=NonCallableMock(spec_set=["send_message"]),
)
self.ratelimiter = hs.get_ratelimiter()
self.ratelimiter.send_message.return_value = (True, 0)
hs.get_handlers().federation_handler = Mock()
def _get_user_by_access_token(token=None, allow_guest=False):
return {
"user": UserID.from_string(self.auth_user_id),
"token_id": 1,
"is_guest": False,
}
hs.get_v1auth()._get_user_by_access_token = _get_user_by_access_token
def _insert_client_ip(*args, **kwargs):
return defer.succeed(None)
hs.get_datastore().insert_client_ip = _insert_client_ip
synapse.rest.client.v1.room.register_servlets(hs, self.mock_resource)
self.room_id = yield self.create_room_as(self.user_id)
@defer.inlineCallbacks
def test_topo_token_is_accepted(self):
token = "t1-0_0_0_0_0"
(code, response) = yield self.mock_resource.trigger_get(
"/rooms/%s/messages?access_token=x&from=%s" %
(self.room_id, token))
self.assertEquals(200, code)
self.assertTrue("start" in response)
self.assertEquals(token, response['start'])
self.assertTrue("chunk" in response)
self.assertTrue("end" in response)
@defer.inlineCallbacks
def test_stream_token_is_rejected(self):
(code, response) = yield self.mock_resource.trigger_get(
"/rooms/%s/messages?access_token=x&from=s0_0_0_0" %
self.room_id)
self.assertEquals(400, code)

View File

@ -0,0 +1,76 @@
from tests import unittest
from twisted.internet import defer
from synapse.api.constants import EventTypes
from synapse.types import UserID, RoomID, RoomAlias
from tests.utils import setup_test_homeserver
from mock import Mock
class BackgroundUpdateTestCase(unittest.TestCase):
@defer.inlineCallbacks
def setUp(self):
hs = yield setup_test_homeserver()
self.store = hs.get_datastore()
self.clock = hs.get_clock()
self.update_handler = Mock()
yield self.store.register_background_update_handler(
"test_update", self.update_handler
)
@defer.inlineCallbacks
def test_do_background_update(self):
desired_count = 1000;
duration_ms = 42;
@defer.inlineCallbacks
def update(progress, count):
self.clock.advance_time_msec(count * duration_ms)
progress = {"my_key": progress["my_key"] + 1}
yield self.store.runInteraction(
"update_progress",
self.store._background_update_progress_txn,
"test_update",
progress,
)
defer.returnValue(count)
self.update_handler.side_effect = update
yield self.store.start_background_update("test_update", {"my_key": 1})
self.update_handler.reset_mock()
result = yield self.store.do_background_update(
duration_ms * desired_count
)
self.assertIsNotNone(result)
self.update_handler.assert_called_once_with(
{"my_key": 1}, self.store.DEFAULT_BACKGROUND_BATCH_SIZE
)
@defer.inlineCallbacks
def update(progress, count):
yield self.store._end_background_update("test_update")
defer.returnValue(count)
self.update_handler.side_effect = update
self.update_handler.reset_mock()
result = yield self.store.do_background_update(
duration_ms * desired_count
)
self.assertIsNotNone(result)
self.update_handler.assert_called_once_with(
{"my_key": 2}, desired_count
)
self.update_handler.reset_mock()
result = yield self.store.do_background_update(
duration_ms * desired_count
)
self.assertIsNone(result)
self.assertFalse(self.update_handler.called)

View File

@ -73,6 +73,8 @@ class RoomStoreTestCase(unittest.TestCase):
"room_id": self.room.to_string(), "room_id": self.room.to_string(),
"topic": None, "topic": None,
"aliases": [self.alias.to_string()], "aliases": [self.alias.to_string()],
"world_readable": False,
"guest_can_join": False,
}, rooms[0]) }, rooms[0])

View File

@ -243,6 +243,9 @@ class MockClock(object):
else: else:
self.timers.append(t) self.timers.append(t)
def advance_time_msec(self, ms):
self.advance_time(ms / 1000.)
class SQLiteMemoryDbPool(ConnectionPool, object): class SQLiteMemoryDbPool(ConnectionPool, object):
def __init__(self): def __init__(self):