Synapse 1.47.1 (2021-11-23)

===========================
 
 This release fixes a security issue in the media store, affecting all prior releases of Synapse. Server administrators are encouraged to update Synapse as soon as possible. We are not aware of these vulnerabilities being exploited in the wild.
 
 Server administrators who are unable to update Synapse may use the workarounds described in the linked GitHub Security Advisory below.
 
 Security advisory
 -----------------
 
 The following issue is fixed in 1.47.1.
 
 - **[GHSA-3hfw-x7gx-437c](https://github.com/matrix-org/synapse/security/advisories/GHSA-3hfw-x7gx-437c) / [CVE-2021-41281](https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2021-41281): Path traversal when downloading remote media.**
 
   Synapse instances with the media repository enabled can be tricked into downloading a file from a remote server into an arbitrary directory, potentially outside the media store directory.
 
   The last two directories and file name of the path are chosen randomly by Synapse and cannot be controlled by an attacker, which limits the impact.
 
   Homeservers with the media repository disabled are unaffected. Homeservers configured with a federation whitelist are also unaffected.
 
   Fixed by [91f2bd090](https://github.com/matrix-org/synapse/commit/91f2bd090).
 -----BEGIN PGP SIGNATURE-----
 
 iQGzBAABCgAdFiEEWMTnW8Z8khaaf90R+84KzgcyGG8FAmGc1oUACgkQ+84Kzgcy
 GG9KbQv/e2KDS5Dfp0pxbVBDP368EaG7zh98/LiIPVzBHSj39BLDTaqCFSXYVlqd
 bQKKcYVt/iJFAcPnUPKV0qsqhrxbnaU1V0oeI8h5IaVUHIE5m8HVTMss5lvbLyIq
 flWbgYedoSL+vIFNFbsZqIGgFls0b0VDJJQCpl3gmkJgtruw/iFHaTnRjReooFpH
 ktK/Kt7iiGpgj2R9KsuZfmrUXJuCG4wiOjzkq/lYBwfMT5I0H8+wxG208YOvpcmb
 yVIPVCb7N2ol/6nrfFcH+vpMIuUjJnNn2XFVSEKyEBdh8oO19tVditMM3o1M/vSG
 A56D8lqcwhorr6utev1hFW/fxCTNkschxGSktAd9FN79b5OupKG1OlRI6bQJmuKa
 KecspFC/pxsB/tMRkp2XOGuIPuFXB2w9Uf4ZM3g2I1LaJ4omr/vX0f+1CDG+DhGd
 /xVif9Pw5AqRXdZQIBK2ZQq9DISdjcHhWgSrmQn5Hpp9uDrc4DR21RAXyApOd+vF
 cwP9WTnF
 =B6uZ
 -----END PGP SIGNATURE-----

Merge tag 'v1.47.1'

Synapse 1.47.1 (2021-11-23)
===========================

This release fixes a security issue in the media store, affecting all prior releases of Synapse. Server administrators are encouraged to update Synapse as soon as possible. We are not aware of these vulnerabilities being exploited in the wild.

Server administrators who are unable to update Synapse may use the workarounds described in the linked GitHub Security Advisory below.

Security advisory
-----------------

The following issue is fixed in 1.47.1.

- **[GHSA-3hfw-x7gx-437c](https://github.com/matrix-org/synapse/security/advisories/GHSA-3hfw-x7gx-437c) / [CVE-2021-41281](https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2021-41281): Path traversal when downloading remote media.**

  Synapse instances with the media repository enabled can be tricked into downloading a file from a remote server into an arbitrary directory, potentially outside the media store directory.

  The last two directories and file name of the path are chosen randomly by Synapse and cannot be controlled by an attacker, which limits the impact.

  Homeservers with the media repository disabled are unaffected. Homeservers configured with a federation whitelist are also unaffected.

  Fixed by [91f2bd090](https://github.com/matrix-org/synapse/commit/91f2bd090).
This commit is contained in:
Sean Quah 2021-11-23 12:39:09 +00:00
commit fcb9441791
8 changed files with 513 additions and 51 deletions

View File

@ -1,3 +1,26 @@
Synapse 1.47.1 (2021-11-23)
===========================
This release fixes a security issue in the media store, affecting all prior releases of Synapse. Server administrators are encouraged to update Synapse as soon as possible. We are not aware of these vulnerabilities being exploited in the wild.
Server administrators who are unable to update Synapse may use the workarounds described in the linked GitHub Security Advisory below.
Security advisory
-----------------
The following issue is fixed in 1.47.1.
- **[GHSA-3hfw-x7gx-437c](https://github.com/matrix-org/synapse/security/advisories/GHSA-3hfw-x7gx-437c) / [CVE-2021-41281](https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2021-41281): Path traversal when downloading remote media.**
Synapse instances with the media repository enabled can be tricked into downloading a file from a remote server into an arbitrary directory, potentially outside the media store directory.
The last two directories and file name of the path are chosen randomly by Synapse and cannot be controlled by an attacker, which limits the impact.
Homeservers with the media repository disabled are unaffected. Homeservers configured with a federation whitelist are also unaffected.
Fixed by [91f2bd090](https://github.com/matrix-org/synapse/commit/91f2bd090).
Synapse 1.47.0 (2021-11-17) Synapse 1.47.0 (2021-11-17)
=========================== ===========================

6
debian/changelog vendored
View File

@ -1,3 +1,9 @@
matrix-synapse-py3 (1.47.1) stable; urgency=medium
* New synapse release 1.47.1.
-- Synapse Packaging team <packages@matrix.org> Fri, 19 Nov 2021 13:44:32 +0000
matrix-synapse-py3 (1.47.0) stable; urgency=medium matrix-synapse-py3 (1.47.0) stable; urgency=medium
* New synapse release 1.47.0. * New synapse release 1.47.0.

View File

@ -47,7 +47,7 @@ try:
except ImportError: except ImportError:
pass pass
__version__ = "1.47.0" __version__ = "1.47.1"
if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)): if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)):
# We import here so that we don't have to install a bunch of deps when # We import here so that we don't have to install a bunch of deps when

View File

@ -29,7 +29,7 @@ from synapse.api.errors import Codes, SynapseError, cs_error
from synapse.http.server import finish_request, respond_with_json from synapse.http.server import finish_request, respond_with_json
from synapse.http.site import SynapseRequest from synapse.http.site import SynapseRequest
from synapse.logging.context import make_deferred_yieldable from synapse.logging.context import make_deferred_yieldable
from synapse.util.stringutils import is_ascii from synapse.util.stringutils import is_ascii, parse_and_validate_server_name
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -51,6 +51,19 @@ TEXT_CONTENT_TYPES = [
def parse_media_id(request: Request) -> Tuple[str, str, Optional[str]]: def parse_media_id(request: Request) -> Tuple[str, str, Optional[str]]:
"""Parses the server name, media ID and optional file name from the request URI
Also performs some rough validation on the server name.
Args:
request: The `Request`.
Returns:
A tuple containing the parsed server name, media ID and optional file name.
Raises:
SynapseError(404): if parsing or validation fail for any reason
"""
try: try:
# The type on postpath seems incorrect in Twisted 21.2.0. # The type on postpath seems incorrect in Twisted 21.2.0.
postpath: List[bytes] = request.postpath # type: ignore postpath: List[bytes] = request.postpath # type: ignore
@ -62,6 +75,9 @@ def parse_media_id(request: Request) -> Tuple[str, str, Optional[str]]:
server_name = server_name_bytes.decode("utf-8") server_name = server_name_bytes.decode("utf-8")
media_id = media_id_bytes.decode("utf8") media_id = media_id_bytes.decode("utf8")
# Validate the server name, raising if invalid
parse_and_validate_server_name(server_name)
file_name = None file_name = None
if len(postpath) > 2: if len(postpath) > 2:
try: try:

View File

@ -16,7 +16,8 @@
import functools import functools
import os import os
import re import re
from typing import Any, Callable, List, TypeVar, cast import string
from typing import Any, Callable, List, TypeVar, Union, cast
NEW_FORMAT_ID_RE = re.compile(r"^\d\d\d\d-\d\d-\d\d") NEW_FORMAT_ID_RE = re.compile(r"^\d\d\d\d-\d\d-\d\d")
@ -37,6 +38,85 @@ def _wrap_in_base_path(func: F) -> F:
return cast(F, _wrapped) return cast(F, _wrapped)
GetPathMethod = TypeVar(
"GetPathMethod", bound=Union[Callable[..., str], Callable[..., List[str]]]
)
def _wrap_with_jail_check(func: GetPathMethod) -> GetPathMethod:
"""Wraps a path-returning method to check that the returned path(s) do not escape
the media store directory.
The check is not expected to ever fail, unless `func` is missing a call to
`_validate_path_component`, or `_validate_path_component` is buggy.
Args:
func: The `MediaFilePaths` method to wrap. The method may return either a single
path, or a list of paths. Returned paths may be either absolute or relative.
Returns:
The method, wrapped with a check to ensure that the returned path(s) lie within
the media store directory. Raises a `ValueError` if the check fails.
"""
@functools.wraps(func)
def _wrapped(
self: "MediaFilePaths", *args: Any, **kwargs: Any
) -> Union[str, List[str]]:
path_or_paths = func(self, *args, **kwargs)
if isinstance(path_or_paths, list):
paths_to_check = path_or_paths
else:
paths_to_check = [path_or_paths]
for path in paths_to_check:
# path may be an absolute or relative path, depending on the method being
# wrapped. When "appending" an absolute path, `os.path.join` discards the
# previous path, which is desired here.
normalized_path = os.path.normpath(os.path.join(self.real_base_path, path))
if (
os.path.commonpath([normalized_path, self.real_base_path])
!= self.real_base_path
):
raise ValueError(f"Invalid media store path: {path!r}")
return path_or_paths
return cast(GetPathMethod, _wrapped)
ALLOWED_CHARACTERS = set(
string.ascii_letters
+ string.digits
+ "_-"
+ ".[]:" # Domain names, IPv6 addresses and ports in server names
)
FORBIDDEN_NAMES = {
"",
os.path.curdir, # "." for the current platform
os.path.pardir, # ".." for the current platform
}
def _validate_path_component(name: str) -> str:
"""Checks that the given string can be safely used as a path component
Args:
name: The path component to check.
Returns:
The path component if valid.
Raises:
ValueError: If `name` cannot be safely used as a path component.
"""
if not ALLOWED_CHARACTERS.issuperset(name) or name in FORBIDDEN_NAMES:
raise ValueError(f"Invalid path component: {name!r}")
return name
class MediaFilePaths: class MediaFilePaths:
"""Describes where files are stored on disk. """Describes where files are stored on disk.
@ -48,22 +128,46 @@ class MediaFilePaths:
def __init__(self, primary_base_path: str): def __init__(self, primary_base_path: str):
self.base_path = primary_base_path self.base_path = primary_base_path
# The media store directory, with all symlinks resolved.
self.real_base_path = os.path.realpath(primary_base_path)
# Refuse to initialize if paths cannot be validated correctly for the current
# platform.
assert os.path.sep not in ALLOWED_CHARACTERS
assert os.path.altsep not in ALLOWED_CHARACTERS
# On Windows, paths have all sorts of weirdness which `_validate_path_component`
# does not consider. In any case, the remote media store can't work correctly
# for certain homeservers there, since ":"s aren't allowed in paths.
assert os.name == "posix"
@_wrap_with_jail_check
def local_media_filepath_rel(self, media_id: str) -> str: def local_media_filepath_rel(self, media_id: str) -> str:
return os.path.join("local_content", media_id[0:2], media_id[2:4], media_id[4:]) return os.path.join(
"local_content",
_validate_path_component(media_id[0:2]),
_validate_path_component(media_id[2:4]),
_validate_path_component(media_id[4:]),
)
local_media_filepath = _wrap_in_base_path(local_media_filepath_rel) local_media_filepath = _wrap_in_base_path(local_media_filepath_rel)
@_wrap_with_jail_check
def local_media_thumbnail_rel( def local_media_thumbnail_rel(
self, media_id: str, width: int, height: int, content_type: str, method: str self, media_id: str, width: int, height: int, content_type: str, method: str
) -> str: ) -> str:
top_level_type, sub_type = content_type.split("/") top_level_type, sub_type = content_type.split("/")
file_name = "%i-%i-%s-%s-%s" % (width, height, top_level_type, sub_type, method) file_name = "%i-%i-%s-%s-%s" % (width, height, top_level_type, sub_type, method)
return os.path.join( return os.path.join(
"local_thumbnails", media_id[0:2], media_id[2:4], media_id[4:], file_name "local_thumbnails",
_validate_path_component(media_id[0:2]),
_validate_path_component(media_id[2:4]),
_validate_path_component(media_id[4:]),
_validate_path_component(file_name),
) )
local_media_thumbnail = _wrap_in_base_path(local_media_thumbnail_rel) local_media_thumbnail = _wrap_in_base_path(local_media_thumbnail_rel)
@_wrap_with_jail_check
def local_media_thumbnail_dir(self, media_id: str) -> str: def local_media_thumbnail_dir(self, media_id: str) -> str:
""" """
Retrieve the local store path of thumbnails of a given media_id Retrieve the local store path of thumbnails of a given media_id
@ -76,18 +180,24 @@ class MediaFilePaths:
return os.path.join( return os.path.join(
self.base_path, self.base_path,
"local_thumbnails", "local_thumbnails",
media_id[0:2], _validate_path_component(media_id[0:2]),
media_id[2:4], _validate_path_component(media_id[2:4]),
media_id[4:], _validate_path_component(media_id[4:]),
) )
@_wrap_with_jail_check
def remote_media_filepath_rel(self, server_name: str, file_id: str) -> str: def remote_media_filepath_rel(self, server_name: str, file_id: str) -> str:
return os.path.join( return os.path.join(
"remote_content", server_name, file_id[0:2], file_id[2:4], file_id[4:] "remote_content",
_validate_path_component(server_name),
_validate_path_component(file_id[0:2]),
_validate_path_component(file_id[2:4]),
_validate_path_component(file_id[4:]),
) )
remote_media_filepath = _wrap_in_base_path(remote_media_filepath_rel) remote_media_filepath = _wrap_in_base_path(remote_media_filepath_rel)
@_wrap_with_jail_check
def remote_media_thumbnail_rel( def remote_media_thumbnail_rel(
self, self,
server_name: str, server_name: str,
@ -101,11 +211,11 @@ class MediaFilePaths:
file_name = "%i-%i-%s-%s-%s" % (width, height, top_level_type, sub_type, method) file_name = "%i-%i-%s-%s-%s" % (width, height, top_level_type, sub_type, method)
return os.path.join( return os.path.join(
"remote_thumbnail", "remote_thumbnail",
server_name, _validate_path_component(server_name),
file_id[0:2], _validate_path_component(file_id[0:2]),
file_id[2:4], _validate_path_component(file_id[2:4]),
file_id[4:], _validate_path_component(file_id[4:]),
file_name, _validate_path_component(file_name),
) )
remote_media_thumbnail = _wrap_in_base_path(remote_media_thumbnail_rel) remote_media_thumbnail = _wrap_in_base_path(remote_media_thumbnail_rel)
@ -113,6 +223,7 @@ class MediaFilePaths:
# Legacy path that was used to store thumbnails previously. # Legacy path that was used to store thumbnails previously.
# Should be removed after some time, when most of the thumbnails are stored # Should be removed after some time, when most of the thumbnails are stored
# using the new path. # using the new path.
@_wrap_with_jail_check
def remote_media_thumbnail_rel_legacy( def remote_media_thumbnail_rel_legacy(
self, server_name: str, file_id: str, width: int, height: int, content_type: str self, server_name: str, file_id: str, width: int, height: int, content_type: str
) -> str: ) -> str:
@ -120,43 +231,66 @@ class MediaFilePaths:
file_name = "%i-%i-%s-%s" % (width, height, top_level_type, sub_type) file_name = "%i-%i-%s-%s" % (width, height, top_level_type, sub_type)
return os.path.join( return os.path.join(
"remote_thumbnail", "remote_thumbnail",
server_name, _validate_path_component(server_name),
file_id[0:2], _validate_path_component(file_id[0:2]),
file_id[2:4], _validate_path_component(file_id[2:4]),
file_id[4:], _validate_path_component(file_id[4:]),
file_name, _validate_path_component(file_name),
) )
def remote_media_thumbnail_dir(self, server_name: str, file_id: str) -> str: def remote_media_thumbnail_dir(self, server_name: str, file_id: str) -> str:
return os.path.join( return os.path.join(
self.base_path, self.base_path,
"remote_thumbnail", "remote_thumbnail",
server_name, _validate_path_component(server_name),
file_id[0:2], _validate_path_component(file_id[0:2]),
file_id[2:4], _validate_path_component(file_id[2:4]),
file_id[4:], _validate_path_component(file_id[4:]),
) )
@_wrap_with_jail_check
def url_cache_filepath_rel(self, media_id: str) -> str: def url_cache_filepath_rel(self, media_id: str) -> str:
if NEW_FORMAT_ID_RE.match(media_id): if NEW_FORMAT_ID_RE.match(media_id):
# Media id is of the form <DATE><RANDOM_STRING> # Media id is of the form <DATE><RANDOM_STRING>
# E.g.: 2017-09-28-fsdRDt24DS234dsf # E.g.: 2017-09-28-fsdRDt24DS234dsf
return os.path.join("url_cache", media_id[:10], media_id[11:]) return os.path.join(
"url_cache",
_validate_path_component(media_id[:10]),
_validate_path_component(media_id[11:]),
)
else: else:
return os.path.join("url_cache", media_id[0:2], media_id[2:4], media_id[4:]) return os.path.join(
"url_cache",
_validate_path_component(media_id[0:2]),
_validate_path_component(media_id[2:4]),
_validate_path_component(media_id[4:]),
)
url_cache_filepath = _wrap_in_base_path(url_cache_filepath_rel) url_cache_filepath = _wrap_in_base_path(url_cache_filepath_rel)
@_wrap_with_jail_check
def url_cache_filepath_dirs_to_delete(self, media_id: str) -> List[str]: def url_cache_filepath_dirs_to_delete(self, media_id: str) -> List[str]:
"The dirs to try and remove if we delete the media_id file" "The dirs to try and remove if we delete the media_id file"
if NEW_FORMAT_ID_RE.match(media_id): if NEW_FORMAT_ID_RE.match(media_id):
return [os.path.join(self.base_path, "url_cache", media_id[:10])] return [
os.path.join(
self.base_path, "url_cache", _validate_path_component(media_id[:10])
)
]
else: else:
return [ return [
os.path.join(self.base_path, "url_cache", media_id[0:2], media_id[2:4]), os.path.join(
os.path.join(self.base_path, "url_cache", media_id[0:2]), self.base_path,
"url_cache",
_validate_path_component(media_id[0:2]),
_validate_path_component(media_id[2:4]),
),
os.path.join(
self.base_path, "url_cache", _validate_path_component(media_id[0:2])
),
] ]
@_wrap_with_jail_check
def url_cache_thumbnail_rel( def url_cache_thumbnail_rel(
self, media_id: str, width: int, height: int, content_type: str, method: str self, media_id: str, width: int, height: int, content_type: str, method: str
) -> str: ) -> str:
@ -168,37 +302,46 @@ class MediaFilePaths:
if NEW_FORMAT_ID_RE.match(media_id): if NEW_FORMAT_ID_RE.match(media_id):
return os.path.join( return os.path.join(
"url_cache_thumbnails", media_id[:10], media_id[11:], file_name "url_cache_thumbnails",
_validate_path_component(media_id[:10]),
_validate_path_component(media_id[11:]),
_validate_path_component(file_name),
) )
else: else:
return os.path.join( return os.path.join(
"url_cache_thumbnails", "url_cache_thumbnails",
media_id[0:2], _validate_path_component(media_id[0:2]),
media_id[2:4], _validate_path_component(media_id[2:4]),
media_id[4:], _validate_path_component(media_id[4:]),
file_name, _validate_path_component(file_name),
) )
url_cache_thumbnail = _wrap_in_base_path(url_cache_thumbnail_rel) url_cache_thumbnail = _wrap_in_base_path(url_cache_thumbnail_rel)
@_wrap_with_jail_check
def url_cache_thumbnail_directory_rel(self, media_id: str) -> str: def url_cache_thumbnail_directory_rel(self, media_id: str) -> str:
# Media id is of the form <DATE><RANDOM_STRING> # Media id is of the form <DATE><RANDOM_STRING>
# E.g.: 2017-09-28-fsdRDt24DS234dsf # E.g.: 2017-09-28-fsdRDt24DS234dsf
if NEW_FORMAT_ID_RE.match(media_id): if NEW_FORMAT_ID_RE.match(media_id):
return os.path.join("url_cache_thumbnails", media_id[:10], media_id[11:]) return os.path.join(
"url_cache_thumbnails",
_validate_path_component(media_id[:10]),
_validate_path_component(media_id[11:]),
)
else: else:
return os.path.join( return os.path.join(
"url_cache_thumbnails", "url_cache_thumbnails",
media_id[0:2], _validate_path_component(media_id[0:2]),
media_id[2:4], _validate_path_component(media_id[2:4]),
media_id[4:], _validate_path_component(media_id[4:]),
) )
url_cache_thumbnail_directory = _wrap_in_base_path( url_cache_thumbnail_directory = _wrap_in_base_path(
url_cache_thumbnail_directory_rel url_cache_thumbnail_directory_rel
) )
@_wrap_with_jail_check
def url_cache_thumbnail_dirs_to_delete(self, media_id: str) -> List[str]: def url_cache_thumbnail_dirs_to_delete(self, media_id: str) -> List[str]:
"The dirs to try and remove if we delete the media_id thumbnails" "The dirs to try and remove if we delete the media_id thumbnails"
# Media id is of the form <DATE><RANDOM_STRING> # Media id is of the form <DATE><RANDOM_STRING>
@ -206,21 +349,35 @@ class MediaFilePaths:
if NEW_FORMAT_ID_RE.match(media_id): if NEW_FORMAT_ID_RE.match(media_id):
return [ return [
os.path.join( os.path.join(
self.base_path, "url_cache_thumbnails", media_id[:10], media_id[11:] self.base_path,
"url_cache_thumbnails",
_validate_path_component(media_id[:10]),
_validate_path_component(media_id[11:]),
),
os.path.join(
self.base_path,
"url_cache_thumbnails",
_validate_path_component(media_id[:10]),
), ),
os.path.join(self.base_path, "url_cache_thumbnails", media_id[:10]),
] ]
else: else:
return [ return [
os.path.join( os.path.join(
self.base_path, self.base_path,
"url_cache_thumbnails", "url_cache_thumbnails",
media_id[0:2], _validate_path_component(media_id[0:2]),
media_id[2:4], _validate_path_component(media_id[2:4]),
media_id[4:], _validate_path_component(media_id[4:]),
), ),
os.path.join( os.path.join(
self.base_path, "url_cache_thumbnails", media_id[0:2], media_id[2:4] self.base_path,
"url_cache_thumbnails",
_validate_path_component(media_id[0:2]),
_validate_path_component(media_id[2:4]),
),
os.path.join(
self.base_path,
"url_cache_thumbnails",
_validate_path_component(media_id[0:2]),
), ),
os.path.join(self.base_path, "url_cache_thumbnails", media_id[0:2]),
] ]

View File

@ -19,6 +19,8 @@ import string
from collections.abc import Iterable from collections.abc import Iterable
from typing import Optional, Tuple from typing import Optional, Tuple
from netaddr import valid_ipv6
from synapse.api.errors import Codes, SynapseError from synapse.api.errors import Codes, SynapseError
_string_with_symbols = string.digits + string.ascii_letters + ".,;:^&*-_+=#~@" _string_with_symbols = string.digits + string.ascii_letters + ".,;:^&*-_+=#~@"
@ -97,7 +99,10 @@ def parse_server_name(server_name: str) -> Tuple[str, Optional[int]]:
raise ValueError("Invalid server name '%s'" % server_name) raise ValueError("Invalid server name '%s'" % server_name)
VALID_HOST_REGEX = re.compile("\\A[0-9a-zA-Z.-]+\\Z") # An approximation of the domain name syntax in RFC 1035, section 2.3.1.
# NB: "\Z" is not equivalent to "$".
# The latter will match the position before a "\n" at the end of a string.
VALID_HOST_REGEX = re.compile("\\A[0-9a-zA-Z-]+(?:\\.[0-9a-zA-Z-]+)*\\Z")
def parse_and_validate_server_name(server_name: str) -> Tuple[str, Optional[int]]: def parse_and_validate_server_name(server_name: str) -> Tuple[str, Optional[int]]:
@ -122,13 +127,15 @@ def parse_and_validate_server_name(server_name: str) -> Tuple[str, Optional[int]
if host[0] == "[": if host[0] == "[":
if host[-1] != "]": if host[-1] != "]":
raise ValueError("Mismatched [...] in server name '%s'" % (server_name,)) raise ValueError("Mismatched [...] in server name '%s'" % (server_name,))
return host, port
# otherwise it should only be alphanumerics. # valid_ipv6 raises when given an empty string
if not VALID_HOST_REGEX.match(host): ipv6_address = host[1:-1]
raise ValueError( if not ipv6_address or not valid_ipv6(ipv6_address):
"Server name '%s' contains invalid characters" % (server_name,) raise ValueError(
) "Server name '%s' is not a valid IPv6 address" % (server_name,)
)
elif not VALID_HOST_REGEX.match(host):
raise ValueError("Server name '%s' has an invalid format" % (server_name,))
return host, port return host, port

View File

@ -36,8 +36,11 @@ class ServerNameTestCase(unittest.TestCase):
"localhost:http", # non-numeric port "localhost:http", # non-numeric port
"1234]", # smells like ipv6 literal but isn't "1234]", # smells like ipv6 literal but isn't
"[1234", "[1234",
"[1.2.3.4]",
"underscore_.com", "underscore_.com",
"percent%65.com", "percent%65.com",
"newline.com\n",
".empty-label.com",
"1234:5678:80", # too many colons "1234:5678:80", # too many colons
] ]
for i in test_data: for i in test_data:

View File

@ -11,6 +11,9 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import inspect
from typing import Iterable
from synapse.rest.media.v1.filepath import MediaFilePaths from synapse.rest.media.v1.filepath import MediaFilePaths
from tests import unittest from tests import unittest
@ -236,3 +239,250 @@ class MediaFilePathsTestCase(unittest.TestCase):
"/media_store/url_cache_thumbnails/Ge", "/media_store/url_cache_thumbnails/Ge",
], ],
) )
def test_server_name_validation(self):
"""Test validation of server names"""
self._test_path_validation(
[
"remote_media_filepath_rel",
"remote_media_filepath",
"remote_media_thumbnail_rel",
"remote_media_thumbnail",
"remote_media_thumbnail_rel_legacy",
"remote_media_thumbnail_dir",
],
parameter="server_name",
valid_values=[
"matrix.org",
"matrix.org:8448",
"matrix-federation.matrix.org",
"matrix-federation.matrix.org:8448",
"10.1.12.123",
"10.1.12.123:8448",
"[fd00:abcd::ffff]",
"[fd00:abcd::ffff]:8448",
],
invalid_values=[
"/matrix.org",
"matrix.org/..",
"matrix.org\x00",
"",
".",
"..",
"/",
],
)
def test_file_id_validation(self):
"""Test validation of local, remote and legacy URL cache file / media IDs"""
# File / media IDs get split into three parts to form paths, consisting of the
# first two characters, next two characters and rest of the ID.
valid_file_ids = [
"GerZNDnDZVjsOtardLuwfIBg",
# Unexpected, but produces an acceptable path:
"GerZN", # "N" becomes the last directory
]
invalid_file_ids = [
"/erZNDnDZVjsOtardLuwfIBg",
"Ge/ZNDnDZVjsOtardLuwfIBg",
"GerZ/DnDZVjsOtardLuwfIBg",
"GerZ/..",
"G\x00rZNDnDZVjsOtardLuwfIBg",
"Ger\x00NDnDZVjsOtardLuwfIBg",
"GerZNDnDZVjsOtardLuwfIBg\x00",
"",
"Ge",
"GerZ",
"GerZ.",
"..rZNDnDZVjsOtardLuwfIBg",
"Ge..NDnDZVjsOtardLuwfIBg",
"GerZ..",
"GerZ/",
]
self._test_path_validation(
[
"local_media_filepath_rel",
"local_media_filepath",
"local_media_thumbnail_rel",
"local_media_thumbnail",
"local_media_thumbnail_dir",
# Legacy URL cache media IDs
"url_cache_filepath_rel",
"url_cache_filepath",
# `url_cache_filepath_dirs_to_delete` is tested below.
"url_cache_thumbnail_rel",
"url_cache_thumbnail",
"url_cache_thumbnail_directory_rel",
"url_cache_thumbnail_directory",
"url_cache_thumbnail_dirs_to_delete",
],
parameter="media_id",
valid_values=valid_file_ids,
invalid_values=invalid_file_ids,
)
# `url_cache_filepath_dirs_to_delete` ignores what would be the last path
# component, so only the first 4 characters matter.
self._test_path_validation(
[
"url_cache_filepath_dirs_to_delete",
],
parameter="media_id",
valid_values=valid_file_ids,
invalid_values=[
"/erZNDnDZVjsOtardLuwfIBg",
"Ge/ZNDnDZVjsOtardLuwfIBg",
"G\x00rZNDnDZVjsOtardLuwfIBg",
"Ger\x00NDnDZVjsOtardLuwfIBg",
"",
"Ge",
"..rZNDnDZVjsOtardLuwfIBg",
"Ge..NDnDZVjsOtardLuwfIBg",
],
)
self._test_path_validation(
[
"remote_media_filepath_rel",
"remote_media_filepath",
"remote_media_thumbnail_rel",
"remote_media_thumbnail",
"remote_media_thumbnail_rel_legacy",
"remote_media_thumbnail_dir",
],
parameter="file_id",
valid_values=valid_file_ids,
invalid_values=invalid_file_ids,
)
def test_url_cache_media_id_validation(self):
"""Test validation of URL cache media IDs"""
self._test_path_validation(
[
"url_cache_filepath_rel",
"url_cache_filepath",
# `url_cache_filepath_dirs_to_delete` only cares about the date prefix
"url_cache_thumbnail_rel",
"url_cache_thumbnail",
"url_cache_thumbnail_directory_rel",
"url_cache_thumbnail_directory",
"url_cache_thumbnail_dirs_to_delete",
],
parameter="media_id",
valid_values=[
"2020-01-02_GerZNDnDZVjsOtar",
"2020-01-02_G", # Unexpected, but produces an acceptable path
],
invalid_values=[
"2020-01-02",
"2020-01-02-",
"2020-01-02-.",
"2020-01-02-..",
"2020-01-02-/",
"2020-01-02-/GerZNDnDZVjsOtar",
"2020-01-02-GerZNDnDZVjsOtar/..",
"2020-01-02-GerZNDnDZVjsOtar\x00",
],
)
def test_content_type_validation(self):
"""Test validation of thumbnail content types"""
self._test_path_validation(
[
"local_media_thumbnail_rel",
"local_media_thumbnail",
"remote_media_thumbnail_rel",
"remote_media_thumbnail",
"remote_media_thumbnail_rel_legacy",
"url_cache_thumbnail_rel",
"url_cache_thumbnail",
],
parameter="content_type",
valid_values=[
"image/jpeg",
],
invalid_values=[
"", # ValueError: not enough values to unpack
"image/jpeg/abc", # ValueError: too many values to unpack
"image/jpeg\x00",
],
)
def test_thumbnail_method_validation(self):
"""Test validation of thumbnail methods"""
self._test_path_validation(
[
"local_media_thumbnail_rel",
"local_media_thumbnail",
"remote_media_thumbnail_rel",
"remote_media_thumbnail",
"url_cache_thumbnail_rel",
"url_cache_thumbnail",
],
parameter="method",
valid_values=[
"crop",
"scale",
],
invalid_values=[
"/scale",
"scale/..",
"scale\x00",
"/",
],
)
def _test_path_validation(
self,
methods: Iterable[str],
parameter: str,
valid_values: Iterable[str],
invalid_values: Iterable[str],
):
"""Test that the specified methods validate the named parameter as expected
Args:
methods: The names of `MediaFilePaths` methods to test
parameter: The name of the parameter to test
valid_values: A list of parameter values that are expected to be accepted
invalid_values: A list of parameter values that are expected to be rejected
Raises:
AssertionError: If a value was accepted when it should have failed
validation.
ValueError: If a value failed validation when it should have been accepted.
"""
for method in methods:
get_path = getattr(self.filepaths, method)
parameters = inspect.signature(get_path).parameters
kwargs = {
"server_name": "matrix.org",
"media_id": "GerZNDnDZVjsOtardLuwfIBg",
"file_id": "GerZNDnDZVjsOtardLuwfIBg",
"width": 800,
"height": 600,
"content_type": "image/jpeg",
"method": "scale",
}
if get_path.__name__.startswith("url_"):
kwargs["media_id"] = "2020-01-02_GerZNDnDZVjsOtar"
kwargs = {k: v for k, v in kwargs.items() if k in parameters}
kwargs.pop(parameter)
for value in valid_values:
kwargs[parameter] = value
get_path(**kwargs)
# No exception should be raised
for value in invalid_values:
with self.assertRaises(ValueError):
kwargs[parameter] = value
path_or_list = get_path(**kwargs)
self.fail(
f"{value!r} unexpectedly passed validation: "
f"{method} returned {path_or_list!r}"
)