Merge branch 'develop' of github.com:matrix-org/synapse into erikj/ensure_round_trip

This commit is contained in:
Erik Johnston 2017-06-26 14:02:44 +01:00
commit e3cbec10c1
26 changed files with 535 additions and 58 deletions

View File

@ -1,3 +1,11 @@
Changes in synapse v0.21.1 (2017-06-15)
=======================================
Bug fixes:
* Fix bug in anonymous usage statistic reporting (PR #2281)
Changes in synapse v0.21.0 (2017-05-18)
=======================================

View File

@ -16,4 +16,4 @@
""" This is a reference implementation of a Matrix home server.
"""
__version__ = "0.21.0"
__version__ = "0.21.1"

View File

@ -25,8 +25,8 @@ from synapse.http.site import SynapseSite
from synapse.http.server import JsonResource
from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.rest.client.v2_alpha import user_directory

View File

@ -33,6 +33,7 @@ from .jwt import JWTConfig
from .password_auth_providers import PasswordAuthProviderConfig
from .emailconfig import EmailConfig
from .workers import WorkerConfig
from .push import PushConfig
class HomeServerConfig(TlsConfig, ServerConfig, DatabaseConfig, LoggingConfig,
@ -40,7 +41,7 @@ class HomeServerConfig(TlsConfig, ServerConfig, DatabaseConfig, LoggingConfig,
VoipConfig, RegistrationConfig, MetricsConfig, ApiConfig,
AppServiceConfig, KeyConfig, SAML2Config, CasConfig,
JWTConfig, PasswordConfig, EmailConfig,
WorkerConfig, PasswordAuthProviderConfig,):
WorkerConfig, PasswordAuthProviderConfig, PushConfig,):
pass

45
synapse/config/push.py Normal file
View File

@ -0,0 +1,45 @@
# -*- coding: utf-8 -*-
# Copyright 2015, 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ._base import Config
class PushConfig(Config):
def read_config(self, config):
self.push_redact_content = False
push_config = config.get("email", {})
self.push_redact_content = push_config.get("redact_content", False)
def default_config(self, config_dir_path, server_name, **kwargs):
return """
# Control how push messages are sent to google/apple to notifications.
# Normally every message said in a room with one or more people using
# mobile devices will be posted to a push server hosted by matrix.org
# which is registered with google and apple in order to allow push
# notifications to be sent to these mobile devices.
#
# Setting redact_content to true will make the push messages contain no
# message content which will provide increased privacy. This is a
# temporary solution pending improvements to Android and iPhone apps
# to get content from the app rather than the notification.
#
# For modern android devices the notification content will still appear
# because it is loaded by the app. iPhone, however will send a
# notification saying only that a message arrived and who it came from.
#
#push:
# redact_content: false
"""

View File

@ -1068,6 +1068,10 @@ class FederationHandler(BaseHandler):
"""
event = pdu
is_blocked = yield self.store.is_room_blocked(event.room_id)
if is_blocked:
raise SynapseError(403, "This room has been blocked on this server")
event.internal_metadata.outlier = True
event.internal_metadata.invite_from_remote = True

View File

@ -61,7 +61,7 @@ class RoomCreationHandler(BaseHandler):
}
@defer.inlineCallbacks
def create_room(self, requester, config):
def create_room(self, requester, config, ratelimit=True):
""" Creates a new room.
Args:
@ -75,6 +75,7 @@ class RoomCreationHandler(BaseHandler):
"""
user_id = requester.user.to_string()
if ratelimit:
yield self.ratelimit(requester)
if "room_alias_name" in config:
@ -167,6 +168,7 @@ class RoomCreationHandler(BaseHandler):
initial_state=initial_state,
creation_content=creation_content,
room_alias=room_alias,
power_level_content_override=config.get("power_level_content_override", {})
)
if "name" in config:
@ -245,7 +247,8 @@ class RoomCreationHandler(BaseHandler):
invite_list,
initial_state,
creation_content,
room_alias
room_alias,
power_level_content_override,
):
def create(etype, content, **kwargs):
e = {
@ -291,7 +294,15 @@ class RoomCreationHandler(BaseHandler):
ratelimit=False,
)
if (EventTypes.PowerLevels, '') not in initial_state:
# We treat the power levels override specially as this needs to be one
# of the first events that get sent into a room.
pl_content = initial_state.pop((EventTypes.PowerLevels, ''), None)
if pl_content is not None:
yield send(
etype=EventTypes.PowerLevels,
content=pl_content,
)
else:
power_level_content = {
"users": {
creator_id: 100,
@ -316,6 +327,8 @@ class RoomCreationHandler(BaseHandler):
for invitee in invite_list:
power_level_content["users"][invitee] = 100
power_level_content.update(power_level_content_override)
yield send(
etype=EventTypes.PowerLevels,
content=power_level_content,

View File

@ -203,6 +203,11 @@ class RoomMemberHandler(BaseHandler):
if not remote_room_hosts:
remote_room_hosts = []
if effective_membership_state not in ("leave", "ban",):
is_blocked = yield self.store.is_room_blocked(room_id)
if is_blocked:
raise SynapseError(403, "This room has been blocked on this server")
latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id)
current_state_ids = yield self.state_handler.get_current_state_ids(
room_id, latest_event_ids=latest_event_ids,
@ -369,6 +374,11 @@ class RoomMemberHandler(BaseHandler):
# so don't really fit into the general auth process.
raise AuthError(403, "Guest access not allowed")
if event.membership not in (Membership.LEAVE, Membership.BAN):
is_blocked = yield self.store.is_room_blocked(room_id)
if is_blocked:
raise SynapseError(403, "This room has been blocked on this server")
yield message_handler.handle_new_client_event(
requester,
event,

View File

@ -42,6 +42,8 @@ class UserDirectoyHandler(object):
"""
INITIAL_SLEEP_MS = 50
INITIAL_SLEEP_COUNT = 100
INITIAL_BATCH_SIZE = 100
def __init__(self, hs):
self.store = hs.get_datastore()
@ -126,6 +128,7 @@ class UserDirectoyHandler(object):
if not deltas:
return
logger.info("Handling %d state deltas", len(deltas))
yield self._handle_deltas(deltas)
self.pos = deltas[-1]["stream_id"]
@ -187,9 +190,9 @@ class UserDirectoyHandler(object):
if is_public:
yield self.store.add_users_to_public_room(
room_id,
user_ids=unhandled_users - self.initially_handled_users_in_public
user_ids=user_ids - self.initially_handled_users_in_public
)
self.initially_handled_users_in_public != unhandled_users
self.initially_handled_users_in_public |= user_ids
# We now go and figure out the new users who share rooms with user entries
# We sleep aggressively here as otherwise it can starve resources.
@ -198,18 +201,22 @@ class UserDirectoyHandler(object):
to_update = set()
count = 0
for user_id in user_ids:
if count % 100 == 0:
if count % self.INITIAL_SLEEP_COUNT == 0:
yield sleep(self.INITIAL_SLEEP_MS / 1000.)
if not self.is_mine_id(user_id):
count += 1
continue
if self.store.get_if_app_services_interested_in_user(user_id):
count += 1
continue
for other_user_id in user_ids:
if user_id == other_user_id:
continue
if count % 100 == 0:
if count % self.INITIAL_SLEEP_COUNT == 0:
yield sleep(self.INITIAL_SLEEP_MS / 1000.)
count += 1
@ -230,13 +237,13 @@ class UserDirectoyHandler(object):
else:
self.initially_handled_users_share_private_room.add(user_set)
if len(to_insert) > 100:
if len(to_insert) > self.INITIAL_BATCH_SIZE:
yield self.store.add_users_who_share_room(
room_id, not is_public, to_insert,
)
to_insert.clear()
if len(to_update) > 100:
if len(to_update) > self.INITIAL_BATCH_SIZE:
yield self.store.update_users_who_share_room(
room_id, not is_public, to_update,
)
@ -294,7 +301,7 @@ class UserDirectoyHandler(object):
room_id, self.server_name,
)
if not is_in_room:
logger.debug("Server left room: %r", room_id)
logger.info("Server left room: %r", room_id)
# Fetch all the users that we marked as being in user
# directory due to being in the room and then check if
# need to remove those users or not
@ -411,8 +418,10 @@ class UserDirectoyHandler(object):
to_insert = set()
to_update = set()
is_appservice = self.store.get_if_app_services_interested_in_user(user_id)
# First, if they're our user then we need to update for every user
if self.is_mine_id(user_id):
if self.is_mine_id(user_id) and not is_appservice:
# Returns a map of other_user_id -> shared_private. We only need
# to update mappings if for users that either don't share a room
# already (aren't in the map) or, if the room is private, those that
@ -443,7 +452,10 @@ class UserDirectoyHandler(object):
if user_id == other_user_id:
continue
if self.is_mine_id(other_user_id):
is_appservice = self.store.get_if_app_services_interested_in_user(
other_user_id
)
if self.is_mine_id(other_user_id) and not is_appservice:
shared_is_private = yield self.store.get_if_users_share_a_room(
other_user_id, user_id,
)

View File

@ -275,7 +275,7 @@ class HttpPusher(object):
if event.type == 'm.room.member':
d['notification']['membership'] = event.content['membership']
d['notification']['user_is_target'] = event.state_key == self.user_id
if 'content' in event:
if not self.hs.config.push_redact_content and 'content' in event:
d['notification']['content'] = event.content
# We no longer send aliases separately, instead, we send the human

View File

@ -16,6 +16,7 @@
from ._base import BaseSlavedStore
from synapse.storage import DataStore
from synapse.config.appservice import load_appservices
from synapse.storage.appservice import _make_exclusive_regex
class SlavedApplicationServiceStore(BaseSlavedStore):
@ -25,6 +26,7 @@ class SlavedApplicationServiceStore(BaseSlavedStore):
hs.config.server_name,
hs.config.app_service_config_files
)
self.exclusive_user_regex = _make_exclusive_regex(self.services_cache)
get_app_service_by_token = DataStore.get_app_service_by_token.__func__
get_app_service_by_user_id = DataStore.get_app_service_by_user_id.__func__
@ -38,3 +40,6 @@ class SlavedApplicationServiceStore(BaseSlavedStore):
get_appservice_state = DataStore.get_appservice_state.__func__
set_appservice_last_pos = DataStore.set_appservice_last_pos.__func__
set_appservice_state = DataStore.set_appservice_state.__func__
get_if_app_services_interested_in_user = (
DataStore.get_if_app_services_interested_in_user.__func__
)

View File

@ -15,8 +15,9 @@
from twisted.internet import defer
from synapse.api.constants import Membership
from synapse.api.errors import AuthError, SynapseError
from synapse.types import UserID
from synapse.types import UserID, create_requester
from synapse.http.servlet import parse_json_object_from_request
from .base import ClientV1RestServlet, client_path_patterns
@ -157,6 +158,142 @@ class DeactivateAccountRestServlet(ClientV1RestServlet):
defer.returnValue((200, {}))
class ShutdownRoomRestServlet(ClientV1RestServlet):
"""Shuts down a room by removing all local users from the room and blocking
all future invites and joins to the room. Any local aliases will be repointed
to a new room created by `new_room_user_id` and kicked users will be auto
joined to the new room.
"""
PATTERNS = client_path_patterns("/admin/shutdown_room/(?P<room_id>[^/]+)")
DEFAULT_MESSAGE = (
"Sharing illegal content on this server is not permitted and rooms in"
" violatation will be blocked."
)
def __init__(self, hs):
super(ShutdownRoomRestServlet, self).__init__(hs)
self.store = hs.get_datastore()
self.handlers = hs.get_handlers()
self.state = hs.get_state_handler()
@defer.inlineCallbacks
def on_POST(self, request, room_id):
requester = yield self.auth.get_user_by_req(request)
is_admin = yield self.auth.is_server_admin(requester.user)
if not is_admin:
raise AuthError(403, "You are not a server admin")
content = parse_json_object_from_request(request)
new_room_user_id = content.get("new_room_user_id")
if not new_room_user_id:
raise SynapseError(400, "Please provide field `new_room_user_id`")
room_creator_requester = create_requester(new_room_user_id)
message = content.get("message", self.DEFAULT_MESSAGE)
room_name = content.get("room_name", "Content Violation Notification")
info = yield self.handlers.room_creation_handler.create_room(
room_creator_requester,
config={
"preset": "public_chat",
"name": room_name,
"power_level_content_override": {
"users_default": -10,
},
},
ratelimit=False,
)
new_room_id = info["room_id"]
msg_handler = self.handlers.message_handler
yield msg_handler.create_and_send_nonmember_event(
room_creator_requester,
{
"type": "m.room.message",
"content": {"body": message, "msgtype": "m.text"},
"room_id": new_room_id,
"sender": new_room_user_id,
},
ratelimit=False,
)
requester_user_id = requester.user.to_string()
logger.info("Shutting down room %r", room_id)
yield self.store.block_room(room_id, requester_user_id)
users = yield self.state.get_current_user_in_room(room_id)
kicked_users = []
for user_id in users:
if not self.hs.is_mine_id(user_id):
continue
logger.info("Kicking %r from %r...", user_id, room_id)
target_requester = create_requester(user_id)
yield self.handlers.room_member_handler.update_membership(
requester=target_requester,
target=target_requester.user,
room_id=room_id,
action=Membership.LEAVE,
content={},
ratelimit=False
)
yield self.handlers.room_member_handler.forget(target_requester.user, room_id)
yield self.handlers.room_member_handler.update_membership(
requester=target_requester,
target=target_requester.user,
room_id=new_room_id,
action=Membership.JOIN,
content={},
ratelimit=False
)
kicked_users.append(user_id)
aliases_for_room = yield self.store.get_aliases_for_room(room_id)
yield self.store.update_aliases_for_room(
room_id, new_room_id, requester_user_id
)
defer.returnValue((200, {
"kicked_users": kicked_users,
"local_aliases": aliases_for_room,
"new_room_id": new_room_id,
}))
class QuarantineMediaInRoom(ClientV1RestServlet):
"""Quarantines all media in a room so that no one can download it via
this server.
"""
PATTERNS = client_path_patterns("/admin/quarantine_media/(?P<room_id>[^/]+)")
def __init__(self, hs):
super(QuarantineMediaInRoom, self).__init__(hs)
self.store = hs.get_datastore()
@defer.inlineCallbacks
def on_POST(self, request, room_id):
requester = yield self.auth.get_user_by_req(request)
is_admin = yield self.auth.is_server_admin(requester.user)
if not is_admin:
raise AuthError(403, "You are not a server admin")
num_quarantined = yield self.store.quarantine_media_ids_in_room(
room_id, requester.user.to_string(),
)
defer.returnValue((200, {"num_quarantined": num_quarantined}))
class ResetPasswordRestServlet(ClientV1RestServlet):
"""Post request to allow an administrator reset password for a user.
This need a user have a administrator access in Synapse.
@ -353,3 +490,5 @@ def register_servlets(hs, http_server):
ResetPasswordRestServlet(hs).register(http_server)
GetUsersPaginatedRestServlet(hs).register(http_server)
SearchUsersRestServlet(hs).register(http_server)
ShutdownRoomRestServlet(hs).register(http_server)
QuarantineMediaInRoom(hs).register(http_server)

View File

@ -66,13 +66,18 @@ class DownloadResource(Resource):
@defer.inlineCallbacks
def _respond_local_file(self, request, media_id, name):
media_info = yield self.store.get_local_media(media_id)
if not media_info:
if not media_info or media_info["quarantined_by"]:
respond_404(request)
return
media_type = media_info["media_type"]
media_length = media_info["media_length"]
upload_name = name if name else media_info["upload_name"]
if media_info["url_cache"]:
# TODO: Check the file still exists, if it doesn't we can redownload
# it from the url `media_info["url_cache"]`
file_path = self.filepaths.url_cache_filepath(media_id)
else:
file_path = self.filepaths.local_media_filepath(media_id)
yield respond_with_file(

View File

@ -71,3 +71,21 @@ class MediaFilePaths(object):
self.base_path, "remote_thumbnail", server_name,
file_id[0:2], file_id[2:4], file_id[4:],
)
def url_cache_filepath(self, media_id):
return os.path.join(
self.base_path, "url_cache",
media_id[0:2], media_id[2:4], media_id[4:]
)
def url_cache_thumbnail(self, media_id, width, height, content_type,
method):
top_level_type, sub_type = content_type.split("/")
file_name = "%i-%i-%s-%s-%s" % (
width, height, top_level_type, sub_type, method
)
return os.path.join(
self.base_path, "url_cache_thumbnails",
media_id[0:2], media_id[2:4], media_id[4:],
file_name
)

View File

@ -135,6 +135,8 @@ class MediaRepository(object):
media_info = yield self._download_remote_file(
server_name, media_id
)
elif media_info["quarantined_by"]:
raise NotFoundError()
else:
self.recently_accessed_remotes.add((server_name, media_id))
yield self.store.update_cached_last_access_time(
@ -324,13 +326,17 @@ class MediaRepository(object):
defer.returnValue(t_path)
@defer.inlineCallbacks
def _generate_local_thumbnails(self, media_id, media_info):
def _generate_local_thumbnails(self, media_id, media_info, url_cache=False):
media_type = media_info["media_type"]
requirements = self._get_thumbnail_requirements(media_type)
if not requirements:
return
if url_cache:
input_path = self.filepaths.url_cache_filepath(media_id)
else:
input_path = self.filepaths.local_media_filepath(media_id)
thumbnailer = Thumbnailer(input_path)
m_width = thumbnailer.width
m_height = thumbnailer.height
@ -358,6 +364,11 @@ class MediaRepository(object):
for t_width, t_height, t_type in scales:
t_method = "scale"
if url_cache:
t_path = self.filepaths.url_cache_thumbnail(
media_id, t_width, t_height, t_type, t_method
)
else:
t_path = self.filepaths.local_media_thumbnail(
media_id, t_width, t_height, t_type, t_method
)
@ -375,6 +386,11 @@ class MediaRepository(object):
# thumbnail.
continue
t_method = "crop"
if url_cache:
t_path = self.filepaths.url_cache_thumbnail(
media_id, t_width, t_height, t_type, t_method
)
else:
t_path = self.filepaths.local_media_thumbnail(
media_id, t_width, t_height, t_type, t_method
)

View File

@ -164,7 +164,7 @@ class PreviewUrlResource(Resource):
if _is_media(media_info['media_type']):
dims = yield self.media_repo._generate_local_thumbnails(
media_info['filesystem_id'], media_info
media_info['filesystem_id'], media_info, url_cache=True,
)
og = {
@ -210,7 +210,7 @@ class PreviewUrlResource(Resource):
if _is_media(image_info['media_type']):
# TODO: make sure we don't choke on white-on-transparent images
dims = yield self.media_repo._generate_local_thumbnails(
image_info['filesystem_id'], image_info
image_info['filesystem_id'], image_info, url_cache=True,
)
if dims:
og["og:image:width"] = dims['width']
@ -256,7 +256,7 @@ class PreviewUrlResource(Resource):
# XXX: horrible duplication with base_resource's _download_remote_file()
file_id = random_string(24)
fname = self.filepaths.local_media_filepath(file_id)
fname = self.filepaths.url_cache_filepath(file_id)
self.media_repo._makedirs(fname)
try:
@ -303,6 +303,7 @@ class PreviewUrlResource(Resource):
upload_name=download_name,
media_length=length,
user_id=user,
url_cache=url,
)
except Exception as e:

View File

@ -81,7 +81,7 @@ class ThumbnailResource(Resource):
method, m_type):
media_info = yield self.store.get_local_media(media_id)
if not media_info:
if not media_info or media_info["quarantined_by"]:
respond_404(request)
return
@ -101,6 +101,13 @@ class ThumbnailResource(Resource):
t_type = thumbnail_info["thumbnail_type"]
t_method = thumbnail_info["thumbnail_method"]
if media_info["url_cache"]:
# TODO: Check the file still exists, if it doesn't we can redownload
# it from the url `media_info["url_cache"]`
file_path = self.filepaths.url_cache_thumbnail(
media_id, t_width, t_height, t_type, t_method,
)
else:
file_path = self.filepaths.local_media_thumbnail(
media_id, t_width, t_height, t_type, t_method,
)
@ -117,7 +124,7 @@ class ThumbnailResource(Resource):
desired_type):
media_info = yield self.store.get_local_media(media_id)
if not media_info:
if not media_info or media_info["quarantined_by"]:
respond_404(request)
return
@ -134,8 +141,17 @@ class ThumbnailResource(Resource):
t_type = info["thumbnail_type"] == desired_type
if t_w and t_h and t_method and t_type:
if media_info["url_cache"]:
# TODO: Check the file still exists, if it doesn't we can redownload
# it from the url `media_info["url_cache"]`
file_path = self.filepaths.url_cache_thumbnail(
media_id, desired_width, desired_height, desired_type,
desired_method,
)
else:
file_path = self.filepaths.local_media_thumbnail(
media_id, desired_width, desired_height, desired_type, desired_method,
media_id, desired_width, desired_height, desired_type,
desired_method,
)
yield respond_with_file(request, desired_type, file_path)
return

View File

@ -27,6 +27,25 @@ from ._base import SQLBaseStore
logger = logging.getLogger(__name__)
def _make_exclusive_regex(services_cache):
# We precompie a regex constructed from all the regexes that the AS's
# have registered for exclusive users.
exclusive_user_regexes = [
regex.pattern
for service in services_cache
for regex in service.get_exlusive_user_regexes()
]
if exclusive_user_regexes:
exclusive_user_regex = "|".join("(" + r + ")" for r in exclusive_user_regexes)
exclusive_user_regex = re.compile(exclusive_user_regex)
else:
# We handle this case specially otherwise the constructed regex
# will always match
exclusive_user_regex = None
return exclusive_user_regex
class ApplicationServiceStore(SQLBaseStore):
def __init__(self, hs):
@ -36,21 +55,7 @@ class ApplicationServiceStore(SQLBaseStore):
hs.hostname,
hs.config.app_service_config_files
)
# We precompie a regex constructed from all the regexes that the AS's
# have registered for exclusive users.
exclusive_user_regexes = [
regex.pattern
for service in self.services_cache
for regex in service.get_exlusive_user_regexes()
]
if exclusive_user_regexes:
exclusive_user_regex = "|".join("(" + r + ")" for r in exclusive_user_regexes)
self.exclusive_user_regex = re.compile(exclusive_user_regex)
else:
# We handle this case specially otherwise the constructed regex
# will always match
self.exclusive_user_regex = None
self.exclusive_user_regex = _make_exclusive_regex(self.services_cache)
def get_app_services(self):
return self.services_cache

View File

@ -170,3 +170,17 @@ class DirectoryStore(SQLBaseStore):
"room_alias",
desc="get_aliases_for_room",
)
def update_aliases_for_room(self, old_room_id, new_room_id, creator):
def _update_aliases_for_room_txn(txn):
sql = "UPDATE room_aliases SET room_id = ?, creator = ? WHERE room_id = ?"
txn.execute(sql, (new_room_id, creator, old_room_id,))
self._invalidate_cache_and_stream(
txn, self.get_aliases_for_room, (old_room_id,)
)
self._invalidate_cache_and_stream(
txn, self.get_aliases_for_room, (new_room_id,)
)
return self.runInteraction(
"_update_aliases_for_room_txn", _update_aliases_for_room_txn
)

View File

@ -19,6 +19,7 @@ from ._base import SQLBaseStore
from synapse.api.errors import SynapseError, Codes
from synapse.util.caches.descriptors import cachedInlineCallbacks
from canonicaljson import encode_canonical_json
import simplejson as json
@ -46,11 +47,20 @@ class FilteringStore(SQLBaseStore):
defer.returnValue(json.loads(str(def_json).decode("utf-8")))
def add_user_filter(self, user_localpart, user_filter):
def_json = json.dumps(user_filter).encode("utf-8")
def_json = encode_canonical_json(user_filter)
# Need an atomic transaction to SELECT the maximal ID so far then
# INSERT a new one
def _do_txn(txn):
sql = (
"SELECT filter_id FROM user_filters "
"WHERE user_id = ? AND filter_json = ?"
)
txn.execute(sql, (user_localpart, def_json))
filter_id_response = txn.fetchone()
if filter_id_response is not None:
return filter_id_response[0]
sql = (
"SELECT MAX(filter_id) FROM user_filters "
"WHERE user_id = ?"

View File

@ -30,13 +30,16 @@ class MediaRepositoryStore(SQLBaseStore):
return self._simple_select_one(
"local_media_repository",
{"media_id": media_id},
("media_type", "media_length", "upload_name", "created_ts"),
(
"media_type", "media_length", "upload_name", "created_ts",
"quarantined_by", "url_cache",
),
allow_none=True,
desc="get_local_media",
)
def store_local_media(self, media_id, media_type, time_now_ms, upload_name,
media_length, user_id):
media_length, user_id, url_cache=None):
return self._simple_insert(
"local_media_repository",
{
@ -46,6 +49,7 @@ class MediaRepositoryStore(SQLBaseStore):
"upload_name": upload_name,
"media_length": media_length,
"user_id": user_id.to_string(),
"url_cache": url_cache,
},
desc="store_local_media",
)
@ -138,7 +142,7 @@ class MediaRepositoryStore(SQLBaseStore):
{"media_origin": origin, "media_id": media_id},
(
"media_type", "media_length", "upload_name", "created_ts",
"filesystem_id",
"filesystem_id", "quarantined_by",
),
allow_none=True,
desc="get_cached_remote_media",

View File

@ -24,6 +24,7 @@ from .engines import PostgresEngine, Sqlite3Engine
import collections
import logging
import ujson as json
import re
logger = logging.getLogger(__name__)
@ -507,3 +508,98 @@ class RoomStore(SQLBaseStore):
))
else:
defer.returnValue(None)
@cached(max_entries=10000)
def is_room_blocked(self, room_id):
return self._simple_select_one_onecol(
table="blocked_rooms",
keyvalues={
"room_id": room_id,
},
retcol="1",
allow_none=True,
desc="is_room_blocked",
)
@defer.inlineCallbacks
def block_room(self, room_id, user_id):
yield self._simple_insert(
table="blocked_rooms",
values={
"room_id": room_id,
"user_id": user_id,
},
desc="block_room",
)
self.is_room_blocked.invalidate((room_id,))
def quarantine_media_ids_in_room(self, room_id, quarantined_by):
"""For a room loops through all events with media and quarantines
the associated media
"""
def _get_media_ids_in_room(txn):
mxc_re = re.compile("^mxc://([^/]+)/([^/#?]+)")
next_token = self.get_current_events_token() + 1
total_media_quarantined = 0
while next_token:
sql = """
SELECT stream_ordering, content FROM events
WHERE room_id = ?
AND stream_ordering < ?
AND contains_url = ? AND outlier = ?
ORDER BY stream_ordering DESC
LIMIT ?
"""
txn.execute(sql, (room_id, next_token, True, False, 100))
next_token = None
local_media_mxcs = []
remote_media_mxcs = []
for stream_ordering, content_json in txn:
next_token = stream_ordering
content = json.loads(content_json)
content_url = content.get("url")
thumbnail_url = content.get("info", {}).get("thumbnail_url")
for url in (content_url, thumbnail_url):
if not url:
continue
matches = mxc_re.match(url)
if matches:
hostname = matches.group(1)
media_id = matches.group(2)
if hostname == self.hostname:
local_media_mxcs.append(media_id)
else:
remote_media_mxcs.append((hostname, media_id))
# Now update all the tables to set the quarantined_by flag
txn.executemany("""
UPDATE local_media_repository
SET quarantined_by = ?
WHERE media_id = ?
""", ((quarantined_by, media_id) for media_id in local_media_mxcs))
txn.executemany(
"""
UPDATE remote_media_cache
SET quarantined_by = ?
WHERE media_origin AND media_id = ?
""",
(
(quarantined_by, origin, media_id)
for origin, media_id in remote_media_mxcs
)
)
total_media_quarantined += len(local_media_mxcs)
total_media_quarantined += len(remote_media_mxcs)
return total_media_quarantined
return self.runInteraction("get_media_ids_in_room", _get_media_ids_in_room)

View File

@ -0,0 +1,21 @@
/* Copyright 2017 Vector Creations Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
CREATE TABLE blocked_rooms (
room_id TEXT NOT NULL,
user_id TEXT NOT NULL -- Admin who blocked the room
);
CREATE UNIQUE INDEX blocked_rooms_idx ON blocked_rooms(room_id);

View File

@ -0,0 +1,17 @@
/* Copyright 2017 Vector Creations Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
ALTER TABLE local_media_repository ADD COLUMN quarantined_by TEXT;
ALTER TABLE remote_media_cache ADD COLUMN quarantined_by TEXT;

View File

@ -0,0 +1,16 @@
/* Copyright 2017 Vector Creations Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
ALTER TABLE local_media_repository ADD COLUMN url_cache TEXT;

View File

@ -439,6 +439,7 @@ class UserDirectoryStore(SQLBaseStore):
},
retcol="share_private",
allow_none=True,
desc="get_if_users_share_a_room",
)
@cachedInlineCallbacks(max_entries=500000, iterable=True)