Merge branch 'develop' of github.com:matrix-org/synapse into erikj/soft_fail_impl

This commit is contained in:
Erik Johnston 2019-03-08 11:44:20 +00:00
commit 8c4896668f
40 changed files with 712 additions and 504 deletions

1
changelog.d/4537.feature Normal file
View File

@ -0,0 +1 @@
The user directory has been rewritten to make it faster, with less chance of falling behind on a large server.

1
changelog.d/4779.misc Normal file
View File

@ -0,0 +1 @@
Update URLs for riot.im icons and logos in the default notification templates.

1
changelog.d/4792.bugfix Normal file
View File

@ -0,0 +1 @@
Handle batch updates in worker replication protocol.

1
changelog.d/4801.feature Normal file
View File

@ -0,0 +1 @@
Include a default configuration file in the 'docs' directory.

1
changelog.d/4804.feature Normal file
View File

@ -0,0 +1 @@
Add configurable rate limiting to the /register endpoint.

1
changelog.d/4815.misc Normal file
View File

@ -0,0 +1 @@
Add some docstrings.

1
changelog.d/4816.misc Normal file
View File

@ -0,0 +1 @@
Add debug logger to try and track down #4422.

1
changelog.d/4817.misc Normal file
View File

@ -0,0 +1 @@
Make shutdown API send explanation message to room after users have been forced joined.

1
changelog.d/4818.bugfix Normal file
View File

@ -0,0 +1 @@
Fix bug where we didn't correctly throttle sending of USER_IP commands over replication.

1
changelog.d/4820.misc Normal file
View File

@ -0,0 +1 @@
Update example_log_config.yaml.

1
changelog.d/4824.misc Normal file
View File

@ -0,0 +1 @@
Document the `generate` option for the docker image.

1
changelog.d/4825.misc Normal file
View File

@ -0,0 +1 @@
Fix check-newsfragment for debian-only changes.

1
changelog.d/4828.misc Normal file
View File

@ -0,0 +1 @@
Add some debug logging for device list updates to help with #4828.

1
changelog.d/4829.bugfix Normal file
View File

@ -0,0 +1 @@
Fix potential race in handling missing updates in device list updates.

View File

@ -19,6 +19,7 @@ handlers:
# example output to console # example output to console
console: console:
class: logging.StreamHandler class: logging.StreamHandler
formatter: fmt
filters: [context] filters: [context]
# example output to file - to enable, edit 'root' config below. # example output to file - to enable, edit 'root' config below.
@ -29,7 +30,7 @@ handlers:
maxBytes: 100000000 maxBytes: 100000000
backupCount: 3 backupCount: 3
filters: [context] filters: [context]
encoding: utf8
root: root:
level: INFO level: INFO

6
debian/changelog vendored
View File

@ -1,3 +1,9 @@
matrix-synapse-py3 (0.99.3) UNRELEASED; urgency=medium
* Fix warning during preconfiguration. (Fixes: #4819)
-- Richard van der Hoff <richard@matrix.org> Thu, 07 Mar 2019 07:17:00 +0000
matrix-synapse-py3 (0.99.2) stable; urgency=medium matrix-synapse-py3 (0.99.2) stable; urgency=medium
* Fix overwriting of config settings on upgrade. * Fix overwriting of config settings on upgrade.

View File

@ -5,7 +5,11 @@ set -e
. /usr/share/debconf/confmodule . /usr/share/debconf/confmodule
# try to update the debconf db according to whatever is in the config files # try to update the debconf db according to whatever is in the config files
/opt/venvs/matrix-synapse/lib/manage_debconf.pl read || true #
# note that we may get run during preconfiguration, in which case the script
# will not yet be installed.
[ -x /opt/venvs/matrix-synapse/lib/manage_debconf.pl ] && \
/opt/venvs/matrix-synapse/lib/manage_debconf.pl read
db_input high matrix-synapse/server-name || true db_input high matrix-synapse/server-name || true
db_input high matrix-synapse/report-stats || true db_input high matrix-synapse/report-stats || true

View File

@ -28,7 +28,7 @@ with your postgres database.
docker run \ docker run \
-d \ -d \
--name synapse \ --name synapse \
-v ${DATA_PATH}:/data \ --mount type=volume,src=synapse-data,dst=/data \
-e SYNAPSE_SERVER_NAME=my.matrix.host \ -e SYNAPSE_SERVER_NAME=my.matrix.host \
-e SYNAPSE_REPORT_STATS=yes \ -e SYNAPSE_REPORT_STATS=yes \
matrixdotorg/synapse:latest matrixdotorg/synapse:latest
@ -87,10 +87,15 @@ Global settings:
* ``SYNAPSE_CONFIG_PATH``, path to a custom config file * ``SYNAPSE_CONFIG_PATH``, path to a custom config file
If ``SYNAPSE_CONFIG_PATH`` is set, you should generate a configuration file If ``SYNAPSE_CONFIG_PATH`` is set, you should generate a configuration file
then customize it manually. No other environment variable is required. then customize it manually: see [Generating a config
file](#generating-a-config-file).
Otherwise, a dynamic configuration file will be used. The following environment Otherwise, a dynamic configuration file will be used.
variables are available for configuration:
### Environment variables used to build a dynamic configuration file
The following environment variables are used to build the configuration file
when ``SYNAPSE_CONFIG_PATH`` is not set.
* ``SYNAPSE_SERVER_NAME`` (mandatory), the server public hostname. * ``SYNAPSE_SERVER_NAME`` (mandatory), the server public hostname.
* ``SYNAPSE_REPORT_STATS``, (mandatory, ``yes`` or ``no``), enable anonymous * ``SYNAPSE_REPORT_STATS``, (mandatory, ``yes`` or ``no``), enable anonymous
@ -143,3 +148,31 @@ Mail server specific values (will not send emails if not set):
any. any.
* ``SYNAPSE_SMTP_PASSWORD``, password for authenticating against the mail * ``SYNAPSE_SMTP_PASSWORD``, password for authenticating against the mail
server if any. server if any.
### Generating a config file
It is possible to generate a basic configuration file for use with
`SYNAPSE_CONFIG_PATH` using the `generate` commandline option. You will need to
specify values for `SYNAPSE_CONFIG_PATH`, `SYNAPSE_SERVER_NAME` and
`SYNAPSE_REPORT_STATS`, and mount a docker volume to store the data on. For
example:
```
docker run -it --rm
--mount type=volume,src=synapse-data,dst=/data \
-e SYNAPSE_CONFIG_PATH=/data/homeserver.yaml \
-e SYNAPSE_SERVER_NAME=my.matrix.host \
-e SYNAPSE_REPORT_STATS=yes \
matrixdotorg/synapse:latest generate
```
This will generate a `homeserver.yaml` in (typically)
`/var/lib/docker/volumes/synapse-data/_data`, which you can then customise and
use with:
```
docker run -d --name synapse \
--mount type=volume,src=synapse-data,dst=/data \
-e SYNAPSE_CONFIG_PATH=/data/homeserver.yaml \
matrixdotorg/synapse:latest
```

View File

@ -1,7 +1,12 @@
# This file is a reference to the configuration options which can be set in # The config is maintained as an up-to-date snapshot of the default
# homeserver.yaml. # homeserver.yaml configuration generated by Synapse.
# #
# Note that it is not quite ready to be used as-is. If you are starting from # It is intended to act as a reference for the default configuration,
# scratch, it is easier to generate the config files following the instructions # helping admins keep track of new options and other changes, and compare
# in INSTALL.md. # their configs with the current default. As such, many of the actual
# config values shown are placeholders.
#
# It is *not* intended to be copied and used as the basis for a real
# homeserver.yaml. Instead, if you are starting from scratch, please generate
# a fresh config using Synapse by following the instructions in INSTALL.md.

View File

@ -1,9 +1,14 @@
# This file is a reference to the configuration options which can be set in # The config is maintained as an up-to-date snapshot of the default
# homeserver.yaml. # homeserver.yaml configuration generated by Synapse.
# #
# Note that it is not quite ready to be used as-is. If you are starting from # It is intended to act as a reference for the default configuration,
# scratch, it is easier to generate the config files following the instructions # helping admins keep track of new options and other changes, and compare
# in INSTALL.md. # their configs with the current default. As such, many of the actual
# config values shown are placeholders.
#
# It is *not* intended to be copied and used as the basis for a real
# homeserver.yaml. Instead, if you are starting from scratch, please generate
# a fresh config using Synapse by following the instructions in INSTALL.md.
## Server ## ## Server ##
@ -393,6 +398,17 @@ federation_rc_reject_limit: 50
# #
federation_rc_concurrent: 3 federation_rc_concurrent: 3
# Number of registration requests a client can send per second.
# Defaults to 1/minute (0.17).
#
#rc_registration_requests_per_second: 0.17
# Number of registration requests a client can send before being
# throttled.
# Defaults to 3.
#
#rc_registration_request_burst_count: 3.0
# Directory where uploaded images and attachments are stored. # Directory where uploaded images and attachments are stored.
@ -580,6 +596,8 @@ turn_allow_guests: True
## Registration ## ## Registration ##
# Registration can be rate-limited using the parameters in the "Ratelimiting"
# section of this file.
# Enable registration for new users. # Enable registration for new users.
enable_registration: False enable_registration: False
@ -657,17 +675,6 @@ trusted_third_party_id_servers:
# #
autocreate_auto_join_rooms: true autocreate_auto_join_rooms: true
# Number of registration requests a client can send per second.
# Defaults to 1/minute (0.17).
#
#rc_registration_requests_per_second: 0.17
# Number of registration requests a client can send before being
# throttled.
# Defaults to 3.
#
#rc_registration_request_burst_count: 3.0
## Metrics ### ## Metrics ###

View File

@ -7,14 +7,12 @@ set -e
# make sure that origin/develop is up to date # make sure that origin/develop is up to date
git remote set-branches --add origin develop git remote set-branches --add origin develop
git fetch --depth=1 origin develop git fetch origin develop
UPSTREAM=origin/develop
# if there are changes in the debian directory, check that the debian changelog # if there are changes in the debian directory, check that the debian changelog
# has been updated # has been updated
if ! git diff --quiet $UPSTREAM... -- debian; then if ! git diff --quiet FETCH_HEAD... -- debian; then
if git diff --quiet $UPSTREAM... -- debian/changelog; then if git diff --quiet FETCH_HEAD... -- debian/changelog; then
echo "Updates to debian directory, but no update to the changelog." >&2 echo "Updates to debian directory, but no update to the changelog." >&2
exit 1 exit 1
fi fi
@ -22,7 +20,7 @@ fi
# if there are changes *outside* the debian directory, check that the # if there are changes *outside* the debian directory, check that the
# newsfragments have been updated. # newsfragments have been updated.
if git diff --name-only $UPSTREAM... | grep -qv '^develop/'; then if git diff --name-only FETCH_HEAD... | grep -qv '^debian/'; then
tox -e check-newsfragment tox -e check-newsfragment
fi fi
@ -31,7 +29,7 @@ echo "--------------------------"
echo echo
# check that any new newsfiles on this branch end with a full stop. # check that any new newsfiles on this branch end with a full stop.
for f in `git diff --name-only $UPSTREAM... -- changelog.d`; do for f in `git diff --name-only FETCH_HEAD... -- changelog.d`; do
lastchar=`tr -d '\n' < $f | tail -c 1` lastchar=`tr -d '\n' < $f | tail -c 1`
if [ $lastchar != '.' ]; then if [ $lastchar != '.' ]; then
echo -e "\e[31mERROR: newsfragment $f does not end with a '.'\e[39m" >&2 echo -e "\e[31mERROR: newsfragment $f does not end with a '.'\e[39m" >&2

View File

@ -27,6 +27,13 @@ class RatelimitConfig(Config):
self.federation_rc_reject_limit = config["federation_rc_reject_limit"] self.federation_rc_reject_limit = config["federation_rc_reject_limit"]
self.federation_rc_concurrent = config["federation_rc_concurrent"] self.federation_rc_concurrent = config["federation_rc_concurrent"]
self.rc_registration_requests_per_second = config.get(
"rc_registration_requests_per_second", 0.17,
)
self.rc_registration_request_burst_count = config.get(
"rc_registration_request_burst_count", 3,
)
def default_config(self, **kwargs): def default_config(self, **kwargs):
return """\ return """\
## Ratelimiting ## ## Ratelimiting ##
@ -62,4 +69,15 @@ class RatelimitConfig(Config):
# single server # single server
# #
federation_rc_concurrent: 3 federation_rc_concurrent: 3
# Number of registration requests a client can send per second.
# Defaults to 1/minute (0.17).
#
#rc_registration_requests_per_second: 0.17
# Number of registration requests a client can send before being
# throttled.
# Defaults to 3.
#
#rc_registration_request_burst_count: 3.0
""" """

View File

@ -54,13 +54,6 @@ class RegistrationConfig(Config):
config.get("disable_msisdn_registration", False) config.get("disable_msisdn_registration", False)
) )
self.rc_registration_requests_per_second = config.get(
"rc_registration_requests_per_second", 0.17,
)
self.rc_registration_request_burst_count = config.get(
"rc_registration_request_burst_count", 3,
)
def default_config(self, generate_secrets=False, **kwargs): def default_config(self, generate_secrets=False, **kwargs):
if generate_secrets: if generate_secrets:
registration_shared_secret = 'registration_shared_secret: "%s"' % ( registration_shared_secret = 'registration_shared_secret: "%s"' % (
@ -71,6 +64,8 @@ class RegistrationConfig(Config):
return """\ return """\
## Registration ## ## Registration ##
# Registration can be rate-limited using the parameters in the "Ratelimiting"
# section of this file.
# Enable registration for new users. # Enable registration for new users.
enable_registration: False enable_registration: False
@ -147,17 +142,6 @@ class RegistrationConfig(Config):
# users cannot be auto-joined since they do not exist. # users cannot be auto-joined since they do not exist.
# #
autocreate_auto_join_rooms: true autocreate_auto_join_rooms: true
# Number of registration requests a client can send per second.
# Defaults to 1/minute (0.17).
#
#rc_registration_requests_per_second: 0.17
# Number of registration requests a client can send before being
# throttled.
# Defaults to 3.
#
#rc_registration_request_burst_count: 3.0
""" % locals() """ % locals()
def add_arguments(self, parser): def add_arguments(self, parser):

View File

@ -402,6 +402,12 @@ class DeviceHandler(DeviceWorkerHandler):
user_id, device_ids, list(hosts) user_id, device_ids, list(hosts)
) )
for device_id in device_ids:
logger.debug(
"Notifying about update %r/%r, ID: %r", user_id, device_id,
position,
)
room_ids = yield self.store.get_rooms_for_user(user_id) room_ids = yield self.store.get_rooms_for_user(user_id)
yield self.notifier.on_new_event( yield self.notifier.on_new_event(
@ -409,7 +415,7 @@ class DeviceHandler(DeviceWorkerHandler):
) )
if hosts: if hosts:
logger.info("Sending device list update notif to: %r", hosts) logger.info("Sending device list update notif for %r to: %r", user_id, hosts)
for host in hosts: for host in hosts:
self.federation_sender.send_device_messages(host) self.federation_sender.send_device_messages(host)
@ -479,15 +485,26 @@ class DeviceListEduUpdater(object):
if get_domain_from_id(user_id) != origin: if get_domain_from_id(user_id) != origin:
# TODO: Raise? # TODO: Raise?
logger.warning("Got device list update edu for %r from %r", user_id, origin) logger.warning(
"Got device list update edu for %r/%r from %r",
user_id, device_id, origin,
)
return return
room_ids = yield self.store.get_rooms_for_user(user_id) room_ids = yield self.store.get_rooms_for_user(user_id)
if not room_ids: if not room_ids:
# We don't share any rooms with this user. Ignore update, as we # We don't share any rooms with this user. Ignore update, as we
# probably won't get any further updates. # probably won't get any further updates.
logger.warning(
"Got device list update edu for %r/%r, but don't share a room",
user_id, device_id,
)
return return
logger.debug(
"Received device list update for %r/%r", user_id, device_id,
)
self._pending_updates.setdefault(user_id, []).append( self._pending_updates.setdefault(user_id, []).append(
(device_id, stream_id, prev_ids, edu_content) (device_id, stream_id, prev_ids, edu_content)
) )
@ -505,10 +522,18 @@ class DeviceListEduUpdater(object):
# This can happen since we batch updates # This can happen since we batch updates
return return
for device_id, stream_id, prev_ids, content in pending_updates:
logger.debug(
"Handling update %r/%r, ID: %r, prev: %r ",
user_id, device_id, stream_id, prev_ids,
)
# Given a list of updates we check if we need to resync. This # Given a list of updates we check if we need to resync. This
# happens if we've missed updates. # happens if we've missed updates.
resync = yield self._need_to_do_resync(user_id, pending_updates) resync = yield self._need_to_do_resync(user_id, pending_updates)
logger.debug("Need to re-sync devices for %r? %r", user_id, resync)
if resync: if resync:
# Fetch all devices for the user. # Fetch all devices for the user.
origin = get_domain_from_id(user_id) origin = get_domain_from_id(user_id)
@ -561,11 +586,21 @@ class DeviceListEduUpdater(object):
) )
devices = [] devices = []
for device in devices:
logger.debug(
"Handling resync update %r/%r, ID: %r",
user_id, device["device_id"], stream_id,
)
yield self.store.update_remote_device_list_cache( yield self.store.update_remote_device_list_cache(
user_id, devices, stream_id, user_id, devices, stream_id,
) )
device_ids = [device["device_id"] for device in devices] device_ids = [device["device_id"] for device in devices]
yield self.device_handler.notify_device_update(user_id, device_ids) yield self.device_handler.notify_device_update(user_id, device_ids)
# We clobber the seen updates since we've re-synced from a given
# point.
self._seen_updates[user_id] = set([stream_id])
else: else:
# Simply update the single device, since we know that is the only # Simply update the single device, since we know that is the only
# change (because of the single prev_id matching the current cache) # change (because of the single prev_id matching the current cache)
@ -593,6 +628,11 @@ class DeviceListEduUpdater(object):
user_id user_id
) )
logger.debug(
"Current extremity for %r: %r",
user_id, extremity,
)
stream_id_in_updates = set() # stream_ids in updates list stream_id_in_updates = set() # stream_ids in updates list
for _, stream_id, prev_ids, _ in updates: for _, stream_id, prev_ids, _ in updates:
if not prev_ids: if not prev_ids:

View File

@ -61,7 +61,7 @@ class RegistrationHandler(BaseHandler):
self.user_directory_handler = hs.get_user_directory_handler() self.user_directory_handler = hs.get_user_directory_handler()
self.captcha_client = CaptchaServerHttpClient(hs) self.captcha_client = CaptchaServerHttpClient(hs)
self.identity_handler = self.hs.get_handlers().identity_handler self.identity_handler = self.hs.get_handlers().identity_handler
self.ratelimiter = hs.get_ratelimiter() self.ratelimiter = hs.get_registration_ratelimiter()
self._next_generated_user_id = None self._next_generated_user_id = None

View File

@ -39,6 +39,9 @@ from synapse.visibility import filter_events_for_client
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# Debug logger for https://github.com/matrix-org/synapse/issues/4422
issue4422_logger = logging.getLogger("synapse.handler.sync.4422_debug")
# Counts the number of times we returned a non-empty sync. `type` is one of # Counts the number of times we returned a non-empty sync. `type` is one of
# "initial_sync", "full_state_sync" or "incremental_sync", `lazy_loaded` is # "initial_sync", "full_state_sync" or "incremental_sync", `lazy_loaded` is
@ -962,6 +965,15 @@ class SyncHandler(object):
yield self._generate_sync_entry_for_groups(sync_result_builder) yield self._generate_sync_entry_for_groups(sync_result_builder)
# debug for https://github.com/matrix-org/synapse/issues/4422
for joined_room in sync_result_builder.joined:
room_id = joined_room.room_id
if room_id in newly_joined_rooms:
issue4422_logger.debug(
"Sync result for newly joined room %s: %r",
room_id, joined_room,
)
defer.returnValue(SyncResult( defer.returnValue(SyncResult(
presence=sync_result_builder.presence, presence=sync_result_builder.presence,
account_data=sync_result_builder.account_data, account_data=sync_result_builder.account_data,
@ -1425,6 +1437,17 @@ class SyncHandler(object):
old_mem_ev = yield self.store.get_event( old_mem_ev = yield self.store.get_event(
old_mem_ev_id, allow_none=True old_mem_ev_id, allow_none=True
) )
# debug for #4422
if has_join:
prev_membership = None
if old_mem_ev:
prev_membership = old_mem_ev.membership
issue4422_logger.debug(
"Previous membership for room %s with join: %s (event %s)",
room_id, prev_membership, old_mem_ev_id,
)
if not old_mem_ev or old_mem_ev.membership != Membership.JOIN: if not old_mem_ev or old_mem_ev.membership != Membership.JOIN:
newly_joined_rooms.append(room_id) newly_joined_rooms.append(room_id)
@ -1519,30 +1542,39 @@ class SyncHandler(object):
for room_id in sync_result_builder.joined_room_ids: for room_id in sync_result_builder.joined_room_ids:
room_entry = room_to_events.get(room_id, None) room_entry = room_to_events.get(room_id, None)
newly_joined = room_id in newly_joined_rooms
if room_entry: if room_entry:
events, start_key = room_entry events, start_key = room_entry
prev_batch_token = now_token.copy_and_replace("room_key", start_key) prev_batch_token = now_token.copy_and_replace("room_key", start_key)
room_entries.append(RoomSyncResultBuilder( entry = RoomSyncResultBuilder(
room_id=room_id, room_id=room_id,
rtype="joined", rtype="joined",
events=events, events=events,
newly_joined=room_id in newly_joined_rooms, newly_joined=newly_joined,
full_state=False, full_state=False,
since_token=None if room_id in newly_joined_rooms else since_token, since_token=None if newly_joined else since_token,
upto_token=prev_batch_token, upto_token=prev_batch_token,
)) )
else: else:
room_entries.append(RoomSyncResultBuilder( entry = RoomSyncResultBuilder(
room_id=room_id, room_id=room_id,
rtype="joined", rtype="joined",
events=[], events=[],
newly_joined=room_id in newly_joined_rooms, newly_joined=newly_joined,
full_state=False, full_state=False,
since_token=since_token, since_token=since_token,
upto_token=since_token, upto_token=since_token,
)) )
if newly_joined:
# debugging for https://github.com/matrix-org/synapse/issues/4422
issue4422_logger.debug(
"RoomSyncResultBuilder events for newly joined room %s: %r",
room_id, entry.events,
)
room_entries.append(entry)
defer.returnValue((room_entries, invited, newly_joined_rooms, newly_left_rooms)) defer.returnValue((room_entries, invited, newly_joined_rooms, newly_left_rooms))
@ -1663,6 +1695,13 @@ class SyncHandler(object):
newly_joined_room=newly_joined, newly_joined_room=newly_joined,
) )
if newly_joined:
# debug for https://github.com/matrix-org/synapse/issues/4422
issue4422_logger.debug(
"Timeline events after filtering in newly-joined room %s: %r",
room_id, batch,
)
# When we join the room (or the client requests full_state), we should # When we join the room (or the client requests full_state), we should
# send down any existing tags. Usually the user won't have tags in a # send down any existing tags. Usually the user won't have tags in a
# newly joined room, unless either a) they've joined before or b) the # newly joined room, unless either a) they've joined before or b) the
@ -1894,15 +1933,34 @@ def _calculate_state(
class SyncResultBuilder(object): class SyncResultBuilder(object):
"Used to help build up a new SyncResult for a user" """Used to help build up a new SyncResult for a user
Attributes:
sync_config (SyncConfig)
full_state (bool)
since_token (StreamToken)
now_token (StreamToken)
joined_room_ids (list[str])
# The following mirror the fields in a sync response
presence (list)
account_data (list)
joined (list[JoinedSyncResult])
invited (list[InvitedSyncResult])
archived (list[ArchivedSyncResult])
device (list)
groups (GroupsSyncResult|None)
to_device (list)
"""
def __init__(self, sync_config, full_state, since_token, now_token, def __init__(self, sync_config, full_state, since_token, now_token,
joined_room_ids): joined_room_ids):
""" """
Args: Args:
sync_config(SyncConfig) sync_config (SyncConfig)
full_state(bool): The full_state flag as specified by user full_state (bool): The full_state flag as specified by user
since_token(StreamToken): The token supplied by user, or None. since_token (StreamToken): The token supplied by user, or None.
now_token(StreamToken): The token to sync up to. now_token (StreamToken): The token to sync up to.
joined_room_ids (list[str]): List of rooms the user is joined to
""" """
self.sync_config = sync_config self.sync_config = sync_config
self.full_state = full_state self.full_state = full_state
@ -1930,8 +1988,8 @@ class RoomSyncResultBuilder(object):
Args: Args:
room_id(str) room_id(str)
rtype(str): One of `"joined"` or `"archived"` rtype(str): One of `"joined"` or `"archived"`
events(list): List of events to include in the room, (more events events(list[FrozenEvent]): List of events to include in the room
may be added when generating result). (more events may be added when generating result).
newly_joined(bool): If the user has newly joined the room newly_joined(bool): If the user has newly joined the room
full_state(bool): Whether the full state should be sent in result full_state(bool): Whether the full state should be sent in result
since_token(StreamToken): Earliest point to return events from, or None since_token(StreamToken): Earliest point to return events from, or None

View File

@ -15,7 +15,7 @@
import logging import logging
from six import iteritems from six import iteritems, iterkeys
from twisted.internet import defer from twisted.internet import defer
@ -63,10 +63,6 @@ class UserDirectoryHandler(object):
# When start up for the first time we need to populate the user_directory. # When start up for the first time we need to populate the user_directory.
# This is a set of user_id's we've inserted already # This is a set of user_id's we've inserted already
self.initially_handled_users = set() self.initially_handled_users = set()
self.initially_handled_users_in_public = set()
self.initially_handled_users_share = set()
self.initially_handled_users_share_private_room = set()
# The current position in the current_state_delta stream # The current position in the current_state_delta stream
self.pos = None self.pos = None
@ -140,7 +136,6 @@ class UserDirectoryHandler(object):
# FIXME(#3714): We should probably do this in the same worker as all # FIXME(#3714): We should probably do this in the same worker as all
# the other changes. # the other changes.
yield self.store.remove_from_user_dir(user_id) yield self.store.remove_from_user_dir(user_id)
yield self.store.remove_from_user_in_public_room(user_id)
@defer.inlineCallbacks @defer.inlineCallbacks
def _unsafe_process(self): def _unsafe_process(self):
@ -215,15 +210,13 @@ class UserDirectoryHandler(object):
logger.info("Processed all users") logger.info("Processed all users")
self.initially_handled_users = None self.initially_handled_users = None
self.initially_handled_users_in_public = None
self.initially_handled_users_share = None
self.initially_handled_users_share_private_room = None
yield self.store.update_user_directory_stream_pos(new_pos) yield self.store.update_user_directory_stream_pos(new_pos)
@defer.inlineCallbacks @defer.inlineCallbacks
def _handle_initial_room(self, room_id): def _handle_initial_room(self, room_id):
"""Called when we initially fill out user_directory one room at a time """
Called when we initially fill out user_directory one room at a time
""" """
is_in_room = yield self.store.is_host_joined(room_id, self.server_name) is_in_room = yield self.store.is_host_joined(room_id, self.server_name)
if not is_in_room: if not is_in_room:
@ -238,23 +231,15 @@ class UserDirectoryHandler(object):
unhandled_users = user_ids - self.initially_handled_users unhandled_users = user_ids - self.initially_handled_users
yield self.store.add_profiles_to_user_dir( yield self.store.add_profiles_to_user_dir(
room_id,
{user_id: users_with_profile[user_id] for user_id in unhandled_users}, {user_id: users_with_profile[user_id] for user_id in unhandled_users},
) )
self.initially_handled_users |= unhandled_users self.initially_handled_users |= unhandled_users
if is_public:
yield self.store.add_users_to_public_room(
room_id, user_ids=user_ids - self.initially_handled_users_in_public
)
self.initially_handled_users_in_public |= user_ids
# We now go and figure out the new users who share rooms with user entries # We now go and figure out the new users who share rooms with user entries
# We sleep aggressively here as otherwise it can starve resources. # We sleep aggressively here as otherwise it can starve resources.
# We also batch up inserts/updates, but try to avoid too many at once. # We also batch up inserts/updates, but try to avoid too many at once.
to_insert = set() to_insert = set()
to_update = set()
count = 0 count = 0
for user_id in user_ids: for user_id in user_ids:
if count % self.INITIAL_ROOM_SLEEP_COUNT == 0: if count % self.INITIAL_ROOM_SLEEP_COUNT == 0:
@ -277,44 +262,18 @@ class UserDirectoryHandler(object):
count += 1 count += 1
user_set = (user_id, other_user_id) user_set = (user_id, other_user_id)
if user_set in self.initially_handled_users_share_private_room:
continue
if user_set in self.initially_handled_users_share:
if is_public:
continue
to_update.add(user_set)
else:
to_insert.add(user_set) to_insert.add(user_set)
if is_public:
self.initially_handled_users_share.add(user_set)
else:
self.initially_handled_users_share_private_room.add(user_set)
if len(to_insert) > self.INITIAL_ROOM_BATCH_SIZE: if len(to_insert) > self.INITIAL_ROOM_BATCH_SIZE:
yield self.store.add_users_who_share_room( yield self.store.add_users_who_share_room(
room_id, not is_public, to_insert room_id, not is_public, to_insert
) )
to_insert.clear() to_insert.clear()
if len(to_update) > self.INITIAL_ROOM_BATCH_SIZE:
yield self.store.update_users_who_share_room(
room_id, not is_public, to_update
)
to_update.clear()
if to_insert: if to_insert:
yield self.store.add_users_who_share_room(room_id, not is_public, to_insert) yield self.store.add_users_who_share_room(room_id, not is_public, to_insert)
to_insert.clear() to_insert.clear()
if to_update:
yield self.store.update_users_who_share_room(
room_id, not is_public, to_update
)
to_update.clear()
@defer.inlineCallbacks @defer.inlineCallbacks
def _handle_deltas(self, deltas): def _handle_deltas(self, deltas):
"""Called with the state deltas to process """Called with the state deltas to process
@ -356,6 +315,7 @@ class UserDirectoryHandler(object):
user_ids = yield self.store.get_users_in_dir_due_to_room( user_ids = yield self.store.get_users_in_dir_due_to_room(
room_id room_id
) )
for user_id in user_ids: for user_id in user_ids:
yield self._handle_remove_user(room_id, user_id) yield self._handle_remove_user(room_id, user_id)
return return
@ -436,14 +396,20 @@ class UserDirectoryHandler(object):
# ignore the change # ignore the change
return return
if change:
users_with_profile = yield self.state.get_current_user_in_room(room_id) users_with_profile = yield self.state.get_current_user_in_room(room_id)
# Remove every user from the sharing tables for that room.
for user_id in iterkeys(users_with_profile):
yield self.store.remove_user_who_share_room(user_id, room_id)
# Then, re-add them to the tables.
# NOTE: this is not the most efficient method, as handle_new_user sets
# up local_user -> other_user and other_user_whos_local -> local_user,
# which when ran over an entire room, will result in the same values
# being added multiple times. The batching upserts shouldn't make this
# too bad, though.
for user_id, profile in iteritems(users_with_profile): for user_id, profile in iteritems(users_with_profile):
yield self._handle_new_user(room_id, user_id, profile) yield self._handle_new_user(room_id, user_id, profile)
else:
users = yield self.store.get_users_in_public_due_to_room(room_id)
for user_id in users:
yield self._handle_remove_user(room_id, user_id)
@defer.inlineCallbacks @defer.inlineCallbacks
def _handle_local_user(self, user_id): def _handle_local_user(self, user_id):
@ -457,7 +423,7 @@ class UserDirectoryHandler(object):
row = yield self.store.get_user_in_directory(user_id) row = yield self.store.get_user_in_directory(user_id)
if not row: if not row:
yield self.store.add_profiles_to_user_dir(None, {user_id: profile}) yield self.store.add_profiles_to_user_dir({user_id: profile})
@defer.inlineCallbacks @defer.inlineCallbacks
def _handle_new_user(self, room_id, user_id, profile): def _handle_new_user(self, room_id, user_id, profile):
@ -471,55 +437,27 @@ class UserDirectoryHandler(object):
row = yield self.store.get_user_in_directory(user_id) row = yield self.store.get_user_in_directory(user_id)
if not row: if not row:
yield self.store.add_profiles_to_user_dir(room_id, {user_id: profile}) yield self.store.add_profiles_to_user_dir({user_id: profile})
is_public = yield self.store.is_room_world_readable_or_publicly_joinable( is_public = yield self.store.is_room_world_readable_or_publicly_joinable(
room_id room_id
) )
# Now we update users who share rooms with users.
if is_public:
row = yield self.store.get_user_in_public_room(user_id)
if not row:
yield self.store.add_users_to_public_room(room_id, [user_id])
else:
logger.debug("Not adding new user to public dir, %r", user_id)
# Now we update users who share rooms with users. We do this by getting
# all the current users in the room and seeing which aren't already
# marked in the database as sharing with `user_id`
users_with_profile = yield self.state.get_current_user_in_room(room_id) users_with_profile = yield self.state.get_current_user_in_room(room_id)
to_insert = set() to_insert = set()
to_update = set()
# First, if they're our user then we need to update for every user
if self.is_mine_id(user_id):
is_appservice = self.store.get_if_app_services_interested_in_user(user_id) is_appservice = self.store.get_if_app_services_interested_in_user(user_id)
# First, if they're our user then we need to update for every user # We don't care about appservice users.
if self.is_mine_id(user_id) and not is_appservice: if not is_appservice:
# Returns a map of other_user_id -> shared_private. We only need
# to update mappings if for users that either don't share a room
# already (aren't in the map) or, if the room is private, those that
# only share a public room.
user_ids_shared = yield self.store.get_users_who_share_room_from_dir(
user_id
)
for other_user_id in users_with_profile: for other_user_id in users_with_profile:
if user_id == other_user_id: if user_id == other_user_id:
continue continue
shared_is_private = user_ids_shared.get(other_user_id)
if shared_is_private is True:
# We've already marked in the database they share a private room
continue
elif shared_is_private is False:
# They already share a public room, so only update if this is
# a private room
if not is_public:
to_update.add((user_id, other_user_id))
elif shared_is_private is None:
# This is the first time they both share a room
to_insert.add((user_id, other_user_id)) to_insert.add((user_id, other_user_id))
# Next we need to update for every local user in the room # Next we need to update for every local user in the room
@ -531,29 +469,11 @@ class UserDirectoryHandler(object):
other_user_id other_user_id
) )
if self.is_mine_id(other_user_id) and not is_appservice: if self.is_mine_id(other_user_id) and not is_appservice:
shared_is_private = yield self.store.get_if_users_share_a_room(
other_user_id, user_id
)
if shared_is_private is True:
# We've already marked in the database they share a private room
continue
elif shared_is_private is False:
# They already share a public room, so only update if this is
# a private room
if not is_public:
to_update.add((other_user_id, user_id))
elif shared_is_private is None:
# This is the first time they both share a room
to_insert.add((other_user_id, user_id)) to_insert.add((other_user_id, user_id))
if to_insert: if to_insert:
yield self.store.add_users_who_share_room(room_id, not is_public, to_insert) yield self.store.add_users_who_share_room(room_id, not is_public, to_insert)
if to_update:
yield self.store.update_users_who_share_room(
room_id, not is_public, to_update
)
@defer.inlineCallbacks @defer.inlineCallbacks
def _handle_remove_user(self, room_id, user_id): def _handle_remove_user(self, room_id, user_id):
"""Called when we might need to remove user to directory """Called when we might need to remove user to directory
@ -562,84 +482,16 @@ class UserDirectoryHandler(object):
room_id (str): room_id that user left or stopped being public that room_id (str): room_id that user left or stopped being public that
user_id (str) user_id (str)
""" """
logger.debug("Maybe removing user %r", user_id) logger.debug("Removing user %r", user_id)
row = yield self.store.get_user_in_directory(user_id) # Remove user from sharing tables
update_user_dir = row and row["room_id"] == room_id yield self.store.remove_user_who_share_room(user_id, room_id)
row = yield self.store.get_user_in_public_room(user_id) # Are they still in a room with members? If not, remove them entirely.
update_user_in_public = row and row["room_id"] == room_id users_in_room_with = yield self.store.get_users_who_share_room_from_dir(user_id)
if update_user_in_public or update_user_dir: if len(users_in_room_with) == 0:
# XXX: Make this faster?
rooms = yield self.store.get_rooms_for_user(user_id)
for j_room_id in rooms:
if not update_user_in_public and not update_user_dir:
break
is_in_room = yield self.store.is_host_joined(
j_room_id, self.server_name
)
if not is_in_room:
continue
if update_user_dir:
update_user_dir = False
yield self.store.update_user_in_user_dir(user_id, j_room_id)
is_public = yield self.store.is_room_world_readable_or_publicly_joinable(
j_room_id
)
if update_user_in_public and is_public:
yield self.store.update_user_in_public_user_list(user_id, j_room_id)
update_user_in_public = False
if update_user_dir:
yield self.store.remove_from_user_dir(user_id) yield self.store.remove_from_user_dir(user_id)
elif update_user_in_public:
yield self.store.remove_from_user_in_public_room(user_id)
# Now handle users_who_share_rooms.
# Get a list of user tuples that were in the DB due to this room and
# users (this includes tuples where the other user matches `user_id`)
user_tuples = yield self.store.get_users_in_share_dir_with_room_id(
user_id, room_id
)
for user_id, other_user_id in user_tuples:
# For each user tuple get a list of rooms that they still share,
# trying to find a private room, and update the entry in the DB
rooms = yield self.store.get_rooms_in_common_for_users(
user_id, other_user_id
)
# If they dont share a room anymore, remove the mapping
if not rooms:
yield self.store.remove_user_who_share_room(user_id, other_user_id)
continue
found_public_share = None
for j_room_id in rooms:
is_public = yield self.store.is_room_world_readable_or_publicly_joinable(
j_room_id
)
if is_public:
found_public_share = j_room_id
else:
found_public_share = None
yield self.store.update_users_who_share_room(
room_id, not is_public, [(user_id, other_user_id)]
)
break
if found_public_share:
yield self.store.update_users_who_share_room(
room_id, not is_public, [(user_id, other_user_id)]
)
@defer.inlineCallbacks @defer.inlineCallbacks
def _handle_profile_change(self, user_id, room_id, prev_event_id, event_id): def _handle_profile_change(self, user_id, room_id, prev_event_id, event_id):

View File

@ -43,6 +43,8 @@ class SlavedClientIpStore(BaseSlavedStore):
if last_seen is not None and (now - last_seen) < LAST_SEEN_GRANULARITY: if last_seen is not None and (now - last_seen) < LAST_SEEN_GRANULARITY:
return return
self.client_ip_last_seen.prefill(key, now)
self.hs.get_tcp_replication().send_user_ip( self.hs.get_tcp_replication().send_user_ip(
user_id, access_token, ip, user_agent, device_id, now user_id, access_token, ip, user_agent, device_id, now
) )

View File

@ -451,7 +451,7 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
@defer.inlineCallbacks @defer.inlineCallbacks
def subscribe_to_stream(self, stream_name, token): def subscribe_to_stream(self, stream_name, token):
"""Subscribe the remote to a streams. """Subscribe the remote to a stream.
This invloves checking if they've missed anything and sending those This invloves checking if they've missed anything and sending those
updates down if they have. During that time new updates for the stream updates down if they have. During that time new updates for the stream
@ -478,11 +478,36 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
# Now we can send any updates that came in while we were subscribing # Now we can send any updates that came in while we were subscribing
pending_rdata = self.pending_rdata.pop(stream_name, []) pending_rdata = self.pending_rdata.pop(stream_name, [])
updates = []
for token, update in pending_rdata: for token, update in pending_rdata:
# Only send updates newer than the current token # If the token is null, it is part of a batch update. Batches
if token > current_token: # are multiple updates that share a single token. To denote
# this, the token is set to None for all tokens in the batch
# except for the last. If we find a None token, we keep looking
# through tokens until we find one that is not None and then
# process all previous updates in the batch as if they had the
# final token.
if token is None:
# Store this update as part of a batch
updates.append(update)
continue
if token <= current_token:
# This update or batch of updates is older than
# current_token, dismiss it
updates = []
continue
updates.append(update)
# Send all updates that are part of this batch with the
# found token
for update in updates:
self.send_command(RdataCommand(stream_name, token, update)) self.send_command(RdataCommand(stream_name, token, update))
# Clear stored updates
updates = []
# They're now fully subscribed # They're now fully subscribed
self.replication_streams.add(stream_name) self.replication_streams.add(stream_name)
except Exception as e: except Exception as e:

View File

@ -6,11 +6,11 @@
<img alt="" class="sender_avatar" src="{{ message.sender_avatar_url|mxc_to_http(32,32) }}" /> <img alt="" class="sender_avatar" src="{{ message.sender_avatar_url|mxc_to_http(32,32) }}" />
{% else %} {% else %}
{% if message.sender_hash % 3 == 0 %} {% if message.sender_hash % 3 == 0 %}
<img class="sender_avatar" src="https://vector.im/beta/img/76cfa6.png" /> <img class="sender_avatar" src="https://riot.im/img/external/avatar-1.png" />
{% elif message.sender_hash % 3 == 1 %} {% elif message.sender_hash % 3 == 1 %}
<img class="sender_avatar" src="https://vector.im/beta/img/50e2c2.png" /> <img class="sender_avatar" src="https://riot.im/img/external/avatar-2.png" />
{% else %} {% else %}
<img class="sender_avatar" src="https://vector.im/beta/img/f4c371.png" /> <img class="sender_avatar" src="https://riot.im/img/external/avatar-3.png" />
{% endif %} {% endif %}
{% endif %} {% endif %}
{% endif %} {% endif %}

View File

@ -19,7 +19,7 @@
</td> </td>
<td class="logo"> <td class="logo">
{% if app_name == "Riot" %} {% if app_name == "Riot" %}
<img src="http://matrix.org/img/riot-logo-email.png" width="83" height="83" alt="[Riot]"/> <img src="http://riot.im/img/external/riot-logo-email.png" width="83" height="83" alt="[Riot]"/>
{% elif app_name == "Vector" %} {% elif app_name == "Vector" %}
<img src="http://matrix.org/img/vector-logo-email.png" width="64" height="83" alt="[Vector]"/> <img src="http://matrix.org/img/vector-logo-email.png" width="64" height="83" alt="[Vector]"/>
{% else %} {% else %}

View File

@ -5,11 +5,11 @@
<img alt="" src="{{ room.avatar_url|mxc_to_http(48,48) }}" /> <img alt="" src="{{ room.avatar_url|mxc_to_http(48,48) }}" />
{% else %} {% else %}
{% if room.hash % 3 == 0 %} {% if room.hash % 3 == 0 %}
<img alt="" src="https://vector.im/beta/img/76cfa6.png" /> <img alt="" src="https://riot.im/img/external/avatar-1.png" />
{% elif room.hash % 3 == 1 %} {% elif room.hash % 3 == 1 %}
<img alt="" src="https://vector.im/beta/img/50e2c2.png" /> <img alt="" src="https://riot.im/img/external/avatar-2.png" />
{% else %} {% else %}
<img alt="" src="https://vector.im/beta/img/f4c371.png" /> <img alt="" src="https://riot.im/img/external/avatar-3.png" />
{% endif %} {% endif %}
{% endif %} {% endif %}
</td> </td>

View File

@ -488,17 +488,6 @@ class ShutdownRoomRestServlet(ClientV1RestServlet):
) )
new_room_id = info["room_id"] new_room_id = info["room_id"]
yield self.event_creation_handler.create_and_send_nonmember_event(
room_creator_requester,
{
"type": "m.room.message",
"content": {"body": message, "msgtype": "m.text"},
"room_id": new_room_id,
"sender": new_room_user_id,
},
ratelimit=False,
)
requester_user_id = requester.user.to_string() requester_user_id = requester.user.to_string()
logger.info("Shutting down room %r", room_id) logger.info("Shutting down room %r", room_id)
@ -536,6 +525,17 @@ class ShutdownRoomRestServlet(ClientV1RestServlet):
kicked_users.append(user_id) kicked_users.append(user_id)
yield self.event_creation_handler.create_and_send_nonmember_event(
room_creator_requester,
{
"type": "m.room.message",
"content": {"body": message, "msgtype": "m.text"},
"room_id": new_room_id,
"sender": new_room_user_id,
},
ratelimit=False,
)
aliases_for_room = yield self.store.get_aliases_for_room(room_id) aliases_for_room = yield self.store.get_aliases_for_room(room_id)
yield self.store.update_aliases_for_room( yield self.store.update_aliases_for_room(

View File

@ -196,7 +196,7 @@ class RegisterRestServlet(RestServlet):
self.identity_handler = hs.get_handlers().identity_handler self.identity_handler = hs.get_handlers().identity_handler
self.room_member_handler = hs.get_room_member_handler() self.room_member_handler = hs.get_room_member_handler()
self.macaroon_gen = hs.get_macaroon_generator() self.macaroon_gen = hs.get_macaroon_generator()
self.ratelimiter = hs.get_ratelimiter() self.ratelimiter = hs.get_registration_ratelimiter()
self.clock = hs.get_clock() self.clock = hs.get_clock()
@interactive_auth_handler @interactive_auth_handler

View File

@ -206,6 +206,7 @@ class HomeServer(object):
self.clock = Clock(reactor) self.clock = Clock(reactor)
self.distributor = Distributor() self.distributor = Distributor()
self.ratelimiter = Ratelimiter() self.ratelimiter = Ratelimiter()
self.registration_ratelimiter = Ratelimiter()
self.datastore = None self.datastore = None
@ -251,6 +252,9 @@ class HomeServer(object):
def get_ratelimiter(self): def get_ratelimiter(self):
return self.ratelimiter return self.ratelimiter
def get_registration_ratelimiter(self):
return self.registration_ratelimiter
def build_federation_client(self): def build_federation_client(self):
return FederationClient(self) return FederationClient(self)

View File

@ -0,0 +1,47 @@
/* Copyright 2017 Vector Creations Ltd, 2019 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- Old disused version of the tables below.
DROP TABLE IF EXISTS users_who_share_rooms;
-- This is no longer used because it's duplicated by the users_who_share_public_rooms
DROP TABLE IF EXISTS users_in_public_rooms;
-- Tables keeping track of what users share rooms. This is a map of local users
-- to local or remote users, per room. Remote users cannot be in the user_id
-- column, only the other_user_id column. There are two tables, one for public
-- rooms and those for private rooms.
CREATE TABLE IF NOT EXISTS users_who_share_public_rooms (
user_id TEXT NOT NULL,
other_user_id TEXT NOT NULL,
room_id TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS users_who_share_private_rooms (
user_id TEXT NOT NULL,
other_user_id TEXT NOT NULL,
room_id TEXT NOT NULL
);
CREATE UNIQUE INDEX users_who_share_public_rooms_u_idx ON users_who_share_public_rooms(user_id, other_user_id, room_id);
CREATE INDEX users_who_share_public_rooms_r_idx ON users_who_share_public_rooms(room_id);
CREATE INDEX users_who_share_public_rooms_o_idx ON users_who_share_public_rooms(other_user_id);
CREATE UNIQUE INDEX users_who_share_private_rooms_u_idx ON users_who_share_private_rooms(user_id, other_user_id, room_id);
CREATE INDEX users_who_share_private_rooms_r_idx ON users_who_share_private_rooms(room_id);
CREATE INDEX users_who_share_private_rooms_o_idx ON users_who_share_private_rooms(other_user_id);
-- Make sure that we populate the tables initially by resetting the stream ID
UPDATE user_directory_stream_pos SET stream_id = NULL;

View File

@ -191,6 +191,25 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
@defer.inlineCallbacks @defer.inlineCallbacks
def get_room_events_stream_for_rooms(self, room_ids, from_key, to_key, limit=0, def get_room_events_stream_for_rooms(self, room_ids, from_key, to_key, limit=0,
order='DESC'): order='DESC'):
"""Get new room events in stream ordering since `from_key`.
Args:
room_id (str)
from_key (str): Token from which no events are returned before
to_key (str): Token from which no events are returned after. (This
is typically the current stream token)
limit (int): Maximum number of events to return
order (str): Either "DESC" or "ASC". Determines which events are
returned when the result is limited. If "DESC" then the most
recent `limit` events are returned, otherwise returns the
oldest `limit` events.
Returns:
Deferred[dict[str,tuple[list[FrozenEvent], str]]]
A map from room id to a tuple containing:
- list of recent events in the room
- stream ordering key for the start of the chunk of events returned.
"""
from_id = RoomStreamToken.parse_stream_token(from_key).stream from_id = RoomStreamToken.parse_stream_token(from_key).stream
room_ids = yield self._events_stream_cache.get_entities_changed( room_ids = yield self._events_stream_cache.get_entities_changed(

View File

@ -63,31 +63,14 @@ class UserDirectoryStore(SQLBaseStore):
defer.returnValue(False) defer.returnValue(False)
@defer.inlineCallbacks def add_profiles_to_user_dir(self, users_with_profile):
def add_users_to_public_room(self, room_id, user_ids):
"""Add user to the list of users in public rooms
Args:
room_id (str): A room_id that all users are in that is world_readable
or publically joinable
user_ids (list(str)): Users to add
"""
yield self._simple_insert_many(
table="users_in_public_rooms",
values=[{"user_id": user_id, "room_id": room_id} for user_id in user_ids],
desc="add_users_to_public_room",
)
for user_id in user_ids:
self.get_user_in_public_room.invalidate((user_id,))
def add_profiles_to_user_dir(self, room_id, users_with_profile):
"""Add profiles to the user directory """Add profiles to the user directory
Args: Args:
room_id (str): A room_id that all users are joined to
users_with_profile (dict): Users to add to directory in the form of users_with_profile (dict): Users to add to directory in the form of
mapping of user_id -> ProfileInfo mapping of user_id -> ProfileInfo
""" """
if isinstance(self.database_engine, PostgresEngine): if isinstance(self.database_engine, PostgresEngine):
# We weight the loclpart most highly, then display name and finally # We weight the loclpart most highly, then display name and finally
# server name # server name
@ -113,7 +96,7 @@ class UserDirectoryStore(SQLBaseStore):
INSERT INTO user_directory_search(user_id, value) INSERT INTO user_directory_search(user_id, value)
VALUES (?,?) VALUES (?,?)
""" """
args = ( args = tuple(
( (
user_id, user_id,
"%s %s" % (user_id, p.display_name) if p.display_name else user_id, "%s %s" % (user_id, p.display_name) if p.display_name else user_id,
@ -132,7 +115,7 @@ class UserDirectoryStore(SQLBaseStore):
values=[ values=[
{ {
"user_id": user_id, "user_id": user_id,
"room_id": room_id, "room_id": None,
"display_name": profile.display_name, "display_name": profile.display_name,
"avatar_url": profile.avatar_url, "avatar_url": profile.avatar_url,
} }
@ -250,16 +233,6 @@ class UserDirectoryStore(SQLBaseStore):
"update_profile_in_user_dir", _update_profile_in_user_dir_txn "update_profile_in_user_dir", _update_profile_in_user_dir_txn
) )
@defer.inlineCallbacks
def update_user_in_public_user_list(self, user_id, room_id):
yield self._simple_update_one(
table="users_in_public_rooms",
keyvalues={"user_id": user_id},
updatevalues={"room_id": room_id},
desc="update_user_in_public_user_list",
)
self.get_user_in_public_room.invalidate((user_id,))
def remove_from_user_dir(self, user_id): def remove_from_user_dir(self, user_id):
def _remove_from_user_dir_txn(txn): def _remove_from_user_dir_txn(txn):
self._simple_delete_txn( self._simple_delete_txn(
@ -269,62 +242,50 @@ class UserDirectoryStore(SQLBaseStore):
txn, table="user_directory_search", keyvalues={"user_id": user_id} txn, table="user_directory_search", keyvalues={"user_id": user_id}
) )
self._simple_delete_txn( self._simple_delete_txn(
txn, table="users_in_public_rooms", keyvalues={"user_id": user_id} txn,
table="users_who_share_public_rooms",
keyvalues={"user_id": user_id},
)
self._simple_delete_txn(
txn,
table="users_who_share_public_rooms",
keyvalues={"other_user_id": user_id},
)
self._simple_delete_txn(
txn,
table="users_who_share_private_rooms",
keyvalues={"user_id": user_id},
)
self._simple_delete_txn(
txn,
table="users_who_share_private_rooms",
keyvalues={"other_user_id": user_id},
) )
txn.call_after(self.get_user_in_directory.invalidate, (user_id,)) txn.call_after(self.get_user_in_directory.invalidate, (user_id,))
txn.call_after(self.get_user_in_public_room.invalidate, (user_id,))
return self.runInteraction("remove_from_user_dir", _remove_from_user_dir_txn) return self.runInteraction("remove_from_user_dir", _remove_from_user_dir_txn)
@defer.inlineCallbacks
def remove_from_user_in_public_room(self, user_id):
yield self._simple_delete(
table="users_in_public_rooms",
keyvalues={"user_id": user_id},
desc="remove_from_user_in_public_room",
)
self.get_user_in_public_room.invalidate((user_id,))
def get_users_in_public_due_to_room(self, room_id):
"""Get all user_ids that are in the room directory because they're
in the given room_id
"""
return self._simple_select_onecol(
table="users_in_public_rooms",
keyvalues={"room_id": room_id},
retcol="user_id",
desc="get_users_in_public_due_to_room",
)
@defer.inlineCallbacks @defer.inlineCallbacks
def get_users_in_dir_due_to_room(self, room_id): def get_users_in_dir_due_to_room(self, room_id):
"""Get all user_ids that are in the room directory because they're """Get all user_ids that are in the room directory because they're
in the given room_id in the given room_id
""" """
user_ids_dir = yield self._simple_select_onecol( user_ids_share_pub = yield self._simple_select_onecol(
table="user_directory", table="users_who_share_public_rooms",
keyvalues={"room_id": room_id}, keyvalues={"room_id": room_id},
retcol="user_id", retcol="other_user_id",
desc="get_users_in_dir_due_to_room", desc="get_users_in_dir_due_to_room",
) )
user_ids_pub = yield self._simple_select_onecol( user_ids_share_priv = yield self._simple_select_onecol(
table="users_in_public_rooms", table="users_who_share_private_rooms",
keyvalues={"room_id": room_id}, keyvalues={"room_id": room_id},
retcol="user_id", retcol="other_user_id",
desc="get_users_in_dir_due_to_room", desc="get_users_in_dir_due_to_room",
) )
user_ids_share = yield self._simple_select_onecol( user_ids = set(user_ids_share_pub)
table="users_who_share_rooms", user_ids.update(user_ids_share_priv)
keyvalues={"room_id": room_id},
retcol="user_id",
desc="get_users_in_dir_due_to_room",
)
user_ids = set(user_ids_dir)
user_ids.update(user_ids_pub)
user_ids.update(user_ids_share)
defer.returnValue(user_ids) defer.returnValue(user_ids)
@ -351,7 +312,7 @@ class UserDirectoryStore(SQLBaseStore):
defer.returnValue([name for name, in rows]) defer.returnValue([name for name, in rows])
def add_users_who_share_room(self, room_id, share_private, user_id_tuples): def add_users_who_share_room(self, room_id, share_private, user_id_tuples):
"""Insert entries into the users_who_share_rooms table. The first """Insert entries into the users_who_share_*_rooms table. The first
user should be a local user. user should be a local user.
Args: Args:
@ -361,109 +322,71 @@ class UserDirectoryStore(SQLBaseStore):
""" """
def _add_users_who_share_room_txn(txn): def _add_users_who_share_room_txn(txn):
self._simple_insert_many_txn(
if share_private:
tbl = "users_who_share_private_rooms"
else:
tbl = "users_who_share_public_rooms"
self._simple_upsert_many_txn(
txn, txn,
table="users_who_share_rooms", table=tbl,
values=[ key_names=["user_id", "other_user_id", "room_id"],
{ key_values=[
"user_id": user_id, (user_id, other_user_id, room_id)
"other_user_id": other_user_id,
"room_id": room_id,
"share_private": share_private,
}
for user_id, other_user_id in user_id_tuples for user_id, other_user_id in user_id_tuples
], ],
value_names=(),
value_values=None,
) )
for user_id, other_user_id in user_id_tuples: for user_id, other_user_id in user_id_tuples:
txn.call_after( txn.call_after(
self.get_users_who_share_room_from_dir.invalidate, (user_id,) self.get_users_who_share_room_from_dir.invalidate, (user_id,)
) )
txn.call_after(
self.get_if_users_share_a_room.invalidate, (user_id, other_user_id)
)
return self.runInteraction( return self.runInteraction(
"add_users_who_share_room", _add_users_who_share_room_txn "add_users_who_share_room", _add_users_who_share_room_txn
) )
def update_users_who_share_room(self, room_id, share_private, user_id_sets): def remove_user_who_share_room(self, user_id, room_id):
"""Updates entries in the users_who_share_rooms table. The first """
Deletes entries in the users_who_share_*_rooms table. The first
user should be a local user. user should be a local user.
Args: Args:
user_id (str)
room_id (str) room_id (str)
share_private (bool): Is the room private
user_id_tuples([(str, str)]): iterable of 2-tuple of user IDs.
"""
def _update_users_who_share_room_txn(txn):
sql = """
UPDATE users_who_share_rooms
SET room_id = ?, share_private = ?
WHERE user_id = ? AND other_user_id = ?
"""
txn.executemany(
sql, ((room_id, share_private, uid, oid) for uid, oid in user_id_sets)
)
for user_id, other_user_id in user_id_sets:
txn.call_after(
self.get_users_who_share_room_from_dir.invalidate, (user_id,)
)
txn.call_after(
self.get_if_users_share_a_room.invalidate, (user_id, other_user_id)
)
return self.runInteraction(
"update_users_who_share_room", _update_users_who_share_room_txn
)
def remove_user_who_share_room(self, user_id, other_user_id):
"""Deletes entries in the users_who_share_rooms table. The first
user should be a local user.
Args:
room_id (str)
share_private (bool): Is the room private
user_id_tuples([(str, str)]): iterable of 2-tuple of user IDs.
""" """
def _remove_user_who_share_room_txn(txn): def _remove_user_who_share_room_txn(txn):
self._simple_delete_txn( self._simple_delete_txn(
txn, txn,
table="users_who_share_rooms", table="users_who_share_private_rooms",
keyvalues={"user_id": user_id, "other_user_id": other_user_id}, keyvalues={"user_id": user_id, "room_id": room_id},
)
self._simple_delete_txn(
txn,
table="users_who_share_private_rooms",
keyvalues={"other_user_id": user_id, "room_id": room_id},
)
self._simple_delete_txn(
txn,
table="users_who_share_public_rooms",
keyvalues={"user_id": user_id, "room_id": room_id},
)
self._simple_delete_txn(
txn,
table="users_who_share_public_rooms",
keyvalues={"other_user_id": user_id, "room_id": room_id},
) )
txn.call_after( txn.call_after(
self.get_users_who_share_room_from_dir.invalidate, (user_id,) self.get_users_who_share_room_from_dir.invalidate, (user_id,)
) )
txn.call_after(
self.get_if_users_share_a_room.invalidate, (user_id, other_user_id)
)
return self.runInteraction( return self.runInteraction(
"remove_user_who_share_room", _remove_user_who_share_room_txn "remove_user_who_share_room", _remove_user_who_share_room_txn
) )
@cached(max_entries=500000)
def get_if_users_share_a_room(self, user_id, other_user_id):
"""Gets if users share a room.
Args:
user_id (str): Must be a local user_id
other_user_id (str)
Returns:
bool|None: None if they don't share a room, otherwise whether they
share a private room or not.
"""
return self._simple_select_one_onecol(
table="users_who_share_rooms",
keyvalues={"user_id": user_id, "other_user_id": other_user_id},
retcol="share_private",
allow_none=True,
desc="get_if_users_share_a_room",
)
@cachedInlineCallbacks(max_entries=500000, iterable=True) @cachedInlineCallbacks(max_entries=500000, iterable=True)
def get_users_who_share_room_from_dir(self, user_id): def get_users_who_share_room_from_dir(self, user_id):
"""Returns the set of users who share a room with `user_id` """Returns the set of users who share a room with `user_id`
@ -472,33 +395,30 @@ class UserDirectoryStore(SQLBaseStore):
user_id(str): Must be a local user user_id(str): Must be a local user
Returns: Returns:
dict: user_id -> share_private mapping list: user_id
""" """
rows = yield self._simple_select_list( rows = yield self._simple_select_onecol(
table="users_who_share_rooms", table="users_who_share_private_rooms",
keyvalues={"user_id": user_id}, keyvalues={"user_id": user_id},
retcols=("other_user_id", "share_private"), retcol="other_user_id",
desc="get_users_who_share_room_with_user", desc="get_users_who_share_room_with_user",
) )
defer.returnValue({row["other_user_id"]: row["share_private"] for row in rows}) pub_rows = yield self._simple_select_onecol(
table="users_who_share_public_rooms",
def get_users_in_share_dir_with_room_id(self, user_id, room_id): keyvalues={"user_id": user_id},
"""Get all user tuples that are in the users_who_share_rooms due to the retcol="other_user_id",
given room_id. desc="get_users_who_share_room_with_user",
Returns:
[(user_id, other_user_id)]: where one of the two will match the given
user_id.
"""
sql = """
SELECT user_id, other_user_id FROM users_who_share_rooms
WHERE room_id = ? AND (user_id = ? OR other_user_id = ?)
"""
return self._execute(
"get_users_in_share_dir_with_room_id", None, sql, room_id, user_id, user_id
) )
users = set(pub_rows)
users.update(rows)
# Remove the user themselves from this list.
users.discard(user_id)
defer.returnValue(list(users))
@defer.inlineCallbacks @defer.inlineCallbacks
def get_rooms_in_common_for_users(self, user_id, other_user_id): def get_rooms_in_common_for_users(self, user_id, other_user_id):
"""Given two user_ids find out the list of rooms they share. """Given two user_ids find out the list of rooms they share.
@ -532,12 +452,10 @@ class UserDirectoryStore(SQLBaseStore):
def _delete_all_from_user_dir_txn(txn): def _delete_all_from_user_dir_txn(txn):
txn.execute("DELETE FROM user_directory") txn.execute("DELETE FROM user_directory")
txn.execute("DELETE FROM user_directory_search") txn.execute("DELETE FROM user_directory_search")
txn.execute("DELETE FROM users_in_public_rooms") txn.execute("DELETE FROM users_who_share_public_rooms")
txn.execute("DELETE FROM users_who_share_rooms") txn.execute("DELETE FROM users_who_share_private_rooms")
txn.call_after(self.get_user_in_directory.invalidate_all) txn.call_after(self.get_user_in_directory.invalidate_all)
txn.call_after(self.get_user_in_public_room.invalidate_all)
txn.call_after(self.get_users_who_share_room_from_dir.invalidate_all) txn.call_after(self.get_users_who_share_room_from_dir.invalidate_all)
txn.call_after(self.get_if_users_share_a_room.invalidate_all)
return self.runInteraction( return self.runInteraction(
"delete_all_from_user_dir", _delete_all_from_user_dir_txn "delete_all_from_user_dir", _delete_all_from_user_dir_txn
@ -548,21 +466,11 @@ class UserDirectoryStore(SQLBaseStore):
return self._simple_select_one( return self._simple_select_one(
table="user_directory", table="user_directory",
keyvalues={"user_id": user_id}, keyvalues={"user_id": user_id},
retcols=("room_id", "display_name", "avatar_url"), retcols=("display_name", "avatar_url"),
allow_none=True, allow_none=True,
desc="get_user_in_directory", desc="get_user_in_directory",
) )
@cached()
def get_user_in_public_room(self, user_id):
return self._simple_select_one(
table="users_in_public_rooms",
keyvalues={"user_id": user_id},
retcols=("room_id",),
allow_none=True,
desc="get_user_in_public_room",
)
def get_user_directory_stream_pos(self): def get_user_directory_stream_pos(self):
return self._simple_select_one_onecol( return self._simple_select_one_onecol(
table="user_directory_stream_pos", table="user_directory_stream_pos",
@ -660,14 +568,15 @@ class UserDirectoryStore(SQLBaseStore):
where_clause = "1=1" where_clause = "1=1"
else: else:
join_clause = """ join_clause = """
LEFT JOIN users_in_public_rooms AS p USING (user_id)
LEFT JOIN ( LEFT JOIN (
SELECT other_user_id AS user_id FROM users_who_share_rooms SELECT other_user_id AS user_id FROM users_who_share_public_rooms
WHERE user_id = ? AND share_private UNION
) AS s USING (user_id) SELECT other_user_id AS user_id FROM users_who_share_private_rooms
WHERE user_id = ?
) AS p USING (user_id)
""" """
join_args = (user_id,) join_args = (user_id,)
where_clause = "(s.user_id IS NOT NULL OR p.user_id IS NOT NULL)" where_clause = "p.user_id IS NOT NULL"
if isinstance(self.database_engine, PostgresEngine): if isinstance(self.database_engine, PostgresEngine):
full_query, exact_query, prefix_query = _parse_query_postgres(search_term) full_query, exact_query, prefix_query = _parse_query_postgres(search_term)
@ -686,7 +595,7 @@ class UserDirectoryStore(SQLBaseStore):
%s %s
AND vector @@ to_tsquery('english', ?) AND vector @@ to_tsquery('english', ?)
ORDER BY ORDER BY
(CASE WHEN s.user_id IS NOT NULL THEN 4.0 ELSE 1.0 END) (CASE WHEN d.user_id IS NOT NULL THEN 4.0 ELSE 1.0 END)
* (CASE WHEN display_name IS NOT NULL THEN 1.2 ELSE 1.0 END) * (CASE WHEN display_name IS NOT NULL THEN 1.2 ELSE 1.0 END)
* (CASE WHEN avatar_url IS NOT NULL THEN 1.2 ELSE 1.0 END) * (CASE WHEN avatar_url IS NOT NULL THEN 1.2 ELSE 1.0 END)
* ( * (

View File

@ -14,78 +14,262 @@
# limitations under the License. # limitations under the License.
from mock import Mock from mock import Mock
from twisted.internet import defer
from synapse.api.constants import UserTypes from synapse.api.constants import UserTypes
from synapse.handlers.user_directory import UserDirectoryHandler from synapse.rest.client.v1 import admin, login, room
from synapse.storage.roommember import ProfileInfo from synapse.storage.roommember import ProfileInfo
from tests import unittest from tests import unittest
from tests.utils import setup_test_homeserver
class UserDirectoryHandlers(object): class UserDirectoryTestCase(unittest.HomeserverTestCase):
def __init__(self, hs): """
self.user_directory_handler = UserDirectoryHandler(hs) Tests the UserDirectoryHandler.
"""
servlets = [
login.register_servlets,
admin.register_servlets,
room.register_servlets,
]
class UserDirectoryTestCase(unittest.TestCase): def make_homeserver(self, reactor, clock):
""" Tests the UserDirectoryHandler. """
@defer.inlineCallbacks config = self.default_config()
def setUp(self): config.update_user_directory = True
hs = yield setup_test_homeserver(self.addCleanup) return self.setup_test_homeserver(config=config)
def prepare(self, reactor, clock, hs):
self.store = hs.get_datastore() self.store = hs.get_datastore()
hs.handlers = UserDirectoryHandlers(hs) self.handler = hs.get_user_directory_handler()
self.handler = hs.get_handlers().user_directory_handler
@defer.inlineCallbacks
def test_handle_local_profile_change_with_support_user(self): def test_handle_local_profile_change_with_support_user(self):
support_user_id = "@support:test" support_user_id = "@support:test"
yield self.store.register( self.get_success(
self.store.register(
user_id=support_user_id, user_id=support_user_id,
token="123", token="123",
password_hash=None, password_hash=None,
user_type=UserTypes.SUPPORT user_type=UserTypes.SUPPORT,
)
) )
yield self.handler.handle_local_profile_change(support_user_id, None) self.get_success(
profile = yield self.store.get_user_in_directory(support_user_id) self.handler.handle_local_profile_change(support_user_id, None)
)
profile = self.get_success(self.store.get_user_in_directory(support_user_id))
self.assertTrue(profile is None) self.assertTrue(profile is None)
display_name = 'display_name' display_name = 'display_name'
profile_info = ProfileInfo( profile_info = ProfileInfo(avatar_url='avatar_url', display_name=display_name)
avatar_url='avatar_url',
display_name=display_name,
)
regular_user_id = '@regular:test' regular_user_id = '@regular:test'
yield self.handler.handle_local_profile_change(regular_user_id, profile_info) self.get_success(
profile = yield self.store.get_user_in_directory(regular_user_id) self.handler.handle_local_profile_change(regular_user_id, profile_info)
)
profile = self.get_success(self.store.get_user_in_directory(regular_user_id))
self.assertTrue(profile['display_name'] == display_name) self.assertTrue(profile['display_name'] == display_name)
@defer.inlineCallbacks
def test_handle_user_deactivated_support_user(self): def test_handle_user_deactivated_support_user(self):
s_user_id = "@support:test" s_user_id = "@support:test"
self.get_success(
self.store.register( self.store.register(
user_id=s_user_id, user_id=s_user_id,
token="123", token="123",
password_hash=None, password_hash=None,
user_type=UserTypes.SUPPORT user_type=UserTypes.SUPPORT,
)
) )
self.store.remove_from_user_dir = Mock() self.store.remove_from_user_dir = Mock()
self.store.remove_from_user_in_public_room = Mock() self.store.remove_from_user_in_public_room = Mock()
yield self.handler.handle_user_deactivated(s_user_id) self.get_success(self.handler.handle_user_deactivated(s_user_id))
self.store.remove_from_user_dir.not_called() self.store.remove_from_user_dir.not_called()
self.store.remove_from_user_in_public_room.not_called() self.store.remove_from_user_in_public_room.not_called()
@defer.inlineCallbacks
def test_handle_user_deactivated_regular_user(self): def test_handle_user_deactivated_regular_user(self):
r_user_id = "@regular:test" r_user_id = "@regular:test"
self.get_success(
self.store.register(user_id=r_user_id, token="123", password_hash=None) self.store.register(user_id=r_user_id, token="123", password_hash=None)
)
self.store.remove_from_user_dir = Mock() self.store.remove_from_user_dir = Mock()
self.store.remove_from_user_in_public_room = Mock() self.get_success(self.handler.handle_user_deactivated(r_user_id))
yield self.handler.handle_user_deactivated(r_user_id)
self.store.remove_from_user_dir.called_once_with(r_user_id) self.store.remove_from_user_dir.called_once_with(r_user_id)
self.store.remove_from_user_in_public_room.assert_called_once_with(r_user_id)
def test_private_room(self):
"""
A user can be searched for only by people that are either in a public
room, or that share a private chat.
"""
u1 = self.register_user("user1", "pass")
u1_token = self.login(u1, "pass")
u2 = self.register_user("user2", "pass")
u2_token = self.login(u2, "pass")
u3 = self.register_user("user3", "pass")
# We do not add users to the directory until they join a room.
s = self.get_success(self.handler.search_users(u1, "user2", 10))
self.assertEqual(len(s["results"]), 0)
room = self.helper.create_room_as(u1, is_public=False, tok=u1_token)
self.helper.invite(room, src=u1, targ=u2, tok=u1_token)
self.helper.join(room, user=u2, tok=u2_token)
# Check we have populated the database correctly.
shares_public = self.get_users_who_share_public_rooms()
shares_private = self.get_users_who_share_private_rooms()
self.assertEqual(shares_public, [])
self.assertEqual(
self._compress_shared(shares_private), set([(u1, u2, room), (u2, u1, room)])
)
# We get one search result when searching for user2 by user1.
s = self.get_success(self.handler.search_users(u1, "user2", 10))
self.assertEqual(len(s["results"]), 1)
# We get NO search results when searching for user2 by user3.
s = self.get_success(self.handler.search_users(u3, "user2", 10))
self.assertEqual(len(s["results"]), 0)
# We get NO search results when searching for user3 by user1.
s = self.get_success(self.handler.search_users(u1, "user3", 10))
self.assertEqual(len(s["results"]), 0)
# User 2 then leaves.
self.helper.leave(room, user=u2, tok=u2_token)
# Check we have removed the values.
shares_public = self.get_users_who_share_public_rooms()
shares_private = self.get_users_who_share_private_rooms()
self.assertEqual(shares_public, [])
self.assertEqual(self._compress_shared(shares_private), set())
# User1 now gets no search results for any of the other users.
s = self.get_success(self.handler.search_users(u1, "user2", 10))
self.assertEqual(len(s["results"]), 0)
s = self.get_success(self.handler.search_users(u1, "user3", 10))
self.assertEqual(len(s["results"]), 0)
def _compress_shared(self, shared):
"""
Compress a list of users who share rooms dicts to a list of tuples.
"""
r = set()
for i in shared:
r.add((i["user_id"], i["other_user_id"], i["room_id"]))
return r
def get_users_who_share_public_rooms(self):
return self.get_success(
self.store._simple_select_list(
"users_who_share_public_rooms",
None,
["user_id", "other_user_id", "room_id"],
)
)
def get_users_who_share_private_rooms(self):
return self.get_success(
self.store._simple_select_list(
"users_who_share_private_rooms",
None,
["user_id", "other_user_id", "room_id"],
)
)
def test_initial(self):
"""
The user directory's initial handler correctly updates the search tables.
"""
u1 = self.register_user("user1", "pass")
u1_token = self.login(u1, "pass")
u2 = self.register_user("user2", "pass")
u2_token = self.login(u2, "pass")
u3 = self.register_user("user3", "pass")
u3_token = self.login(u3, "pass")
room = self.helper.create_room_as(u1, is_public=True, tok=u1_token)
self.helper.invite(room, src=u1, targ=u2, tok=u1_token)
self.helper.join(room, user=u2, tok=u2_token)
private_room = self.helper.create_room_as(u1, is_public=False, tok=u1_token)
self.helper.invite(private_room, src=u1, targ=u3, tok=u1_token)
self.helper.join(private_room, user=u3, tok=u3_token)
self.get_success(self.store.update_user_directory_stream_pos(None))
self.get_success(self.store.delete_all_from_user_dir())
shares_public = self.get_users_who_share_public_rooms()
shares_private = self.get_users_who_share_private_rooms()
self.assertEqual(shares_private, [])
self.assertEqual(shares_public, [])
# Reset the handled users caches
self.handler.initially_handled_users = set()
# Do the initial population
d = self.handler._do_initial_spam()
# This takes a while, so pump it a bunch of times to get through the
# sleep delays
for i in range(10):
self.pump(1)
self.get_success(d)
shares_public = self.get_users_who_share_public_rooms()
shares_private = self.get_users_who_share_private_rooms()
# User 1 and User 2 share public rooms
self.assertEqual(
self._compress_shared(shares_public), set([(u1, u2, room), (u2, u1, room)])
)
# User 1 and User 3 share private rooms
self.assertEqual(
self._compress_shared(shares_private),
set([(u1, u3, private_room), (u3, u1, private_room)]),
)
def test_search_all_users(self):
"""
Search all users = True means that a user does not have to share a
private room with the searching user or be in a public room to be search
visible.
"""
self.handler.search_all_users = True
self.hs.config.user_directory_search_all_users = True
u1 = self.register_user("user1", "pass")
u1_token = self.login(u1, "pass")
u2 = self.register_user("user2", "pass")
u2_token = self.login(u2, "pass")
u3 = self.register_user("user3", "pass")
# User 1 and User 2 join a room. User 3 never does.
room = self.helper.create_room_as(u1, is_public=True, tok=u1_token)
self.helper.invite(room, src=u1, targ=u2, tok=u1_token)
self.helper.join(room, user=u2, tok=u2_token)
self.get_success(self.store.update_user_directory_stream_pos(None))
self.get_success(self.store.delete_all_from_user_dir())
# Reset the handled users caches
self.handler.initially_handled_users = set()
# Do the initial population
d = self.handler._do_initial_spam()
# This takes a while, so pump it a bunch of times to get through the
# sleep delays
for i in range(10):
self.pump(1)
self.get_success(d)
# Despite not sharing a room, search_all_users means we get a search
# result.
s = self.get_success(self.handler.search_users(u1, u3, 10))
self.assertEqual(len(s["results"]), 1)

View File

@ -35,14 +35,12 @@ class UserDirectoryStoreTestCase(unittest.TestCase):
# alice and bob are both in !room_id. bobby is not but shares # alice and bob are both in !room_id. bobby is not but shares
# a homeserver with alice. # a homeserver with alice.
yield self.store.add_profiles_to_user_dir( yield self.store.add_profiles_to_user_dir(
"!room:id",
{ {
ALICE: ProfileInfo(None, "alice"), ALICE: ProfileInfo(None, "alice"),
BOB: ProfileInfo(None, "bob"), BOB: ProfileInfo(None, "bob"),
BOBBY: ProfileInfo(None, "bobby"), BOBBY: ProfileInfo(None, "bobby"),
}, },
) )
yield self.store.add_users_to_public_room("!room:id", [ALICE, BOB])
yield self.store.add_users_who_share_room( yield self.store.add_users_who_share_room(
"!room:id", False, ((ALICE, BOB), (BOB, ALICE)) "!room:id", False, ((ALICE, BOB), (BOB, ALICE))
) )