Fix media repository failing when media store path contains symlinks (#11446)

This commit is contained in:
Sean Quah 2021-12-02 16:05:24 +00:00 committed by GitHub
parent 435f044807
commit 858d80bf0f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 176 additions and 41 deletions

View file

@ -43,47 +43,75 @@ GetPathMethod = TypeVar(
)
def _wrap_with_jail_check(func: GetPathMethod) -> GetPathMethod:
def _wrap_with_jail_check(relative: bool) -> Callable[[GetPathMethod], GetPathMethod]:
"""Wraps a path-returning method to check that the returned path(s) do not escape
the media store directory.
The path-returning method may return either a single path, or a list of paths.
The check is not expected to ever fail, unless `func` is missing a call to
`_validate_path_component`, or `_validate_path_component` is buggy.
Args:
func: The `MediaFilePaths` method to wrap. The method may return either a single
path, or a list of paths. Returned paths may be either absolute or relative.
relative: A boolean indicating whether the wrapped method returns paths relative
to the media store directory.
Returns:
The method, wrapped with a check to ensure that the returned path(s) lie within
the media store directory. Raises a `ValueError` if the check fails.
A method which will wrap a path-returning method, adding a check to ensure that
the returned path(s) lie within the media store directory. The check will raise
a `ValueError` if it fails.
"""
@functools.wraps(func)
def _wrapped(
self: "MediaFilePaths", *args: Any, **kwargs: Any
) -> Union[str, List[str]]:
path_or_paths = func(self, *args, **kwargs)
def _wrap_with_jail_check_inner(func: GetPathMethod) -> GetPathMethod:
@functools.wraps(func)
def _wrapped(
self: "MediaFilePaths", *args: Any, **kwargs: Any
) -> Union[str, List[str]]:
path_or_paths = func(self, *args, **kwargs)
if isinstance(path_or_paths, list):
paths_to_check = path_or_paths
else:
paths_to_check = [path_or_paths]
if isinstance(path_or_paths, list):
paths_to_check = path_or_paths
else:
paths_to_check = [path_or_paths]
for path in paths_to_check:
# path may be an absolute or relative path, depending on the method being
# wrapped. When "appending" an absolute path, `os.path.join` discards the
# previous path, which is desired here.
normalized_path = os.path.normpath(os.path.join(self.real_base_path, path))
if (
os.path.commonpath([normalized_path, self.real_base_path])
!= self.real_base_path
):
raise ValueError(f"Invalid media store path: {path!r}")
for path in paths_to_check:
# Construct the path that will ultimately be used.
# We cannot guess whether `path` is relative to the media store
# directory, since the media store directory may itself be a relative
# path.
if relative:
path = os.path.join(self.base_path, path)
normalized_path = os.path.normpath(path)
return path_or_paths
# Now that `normpath` has eliminated `../`s and `./`s from the path,
# `os.path.commonpath` can be used to check whether it lies within the
# media store directory.
if (
os.path.commonpath([normalized_path, self.normalized_base_path])
!= self.normalized_base_path
):
# The path resolves to outside the media store directory,
# or `self.base_path` is `.`, which is an unlikely configuration.
raise ValueError(f"Invalid media store path: {path!r}")
return cast(GetPathMethod, _wrapped)
# Note that `os.path.normpath`/`abspath` has a subtle caveat:
# `a/b/c/../c` will normalize to `a/b/c`, but the former refers to a
# different path if `a/b/c` is a symlink. That is, the check above is
# not perfect and may allow a certain restricted subset of untrustworthy
# paths through. Since the check above is secondary to the main
# `_validate_path_component` checks, it's less important for it to be
# perfect.
#
# As an alternative, `os.path.realpath` will resolve symlinks, but
# proves problematic if there are symlinks inside the media store.
# eg. if `url_store/` is symlinked to elsewhere, its canonical path
# won't match that of the main media store directory.
return path_or_paths
return cast(GetPathMethod, _wrapped)
return _wrap_with_jail_check_inner
ALLOWED_CHARACTERS = set(
@ -127,9 +155,7 @@ class MediaFilePaths:
def __init__(self, primary_base_path: str):
self.base_path = primary_base_path
# The media store directory, with all symlinks resolved.
self.real_base_path = os.path.realpath(primary_base_path)
self.normalized_base_path = os.path.normpath(self.base_path)
# Refuse to initialize if paths cannot be validated correctly for the current
# platform.
@ -140,7 +166,7 @@ class MediaFilePaths:
# for certain homeservers there, since ":"s aren't allowed in paths.
assert os.name == "posix"
@_wrap_with_jail_check
@_wrap_with_jail_check(relative=True)
def local_media_filepath_rel(self, media_id: str) -> str:
return os.path.join(
"local_content",
@ -151,7 +177,7 @@ class MediaFilePaths:
local_media_filepath = _wrap_in_base_path(local_media_filepath_rel)
@_wrap_with_jail_check
@_wrap_with_jail_check(relative=True)
def local_media_thumbnail_rel(
self, media_id: str, width: int, height: int, content_type: str, method: str
) -> str:
@ -167,7 +193,7 @@ class MediaFilePaths:
local_media_thumbnail = _wrap_in_base_path(local_media_thumbnail_rel)
@_wrap_with_jail_check
@_wrap_with_jail_check(relative=False)
def local_media_thumbnail_dir(self, media_id: str) -> str:
"""
Retrieve the local store path of thumbnails of a given media_id
@ -185,7 +211,7 @@ class MediaFilePaths:
_validate_path_component(media_id[4:]),
)
@_wrap_with_jail_check
@_wrap_with_jail_check(relative=True)
def remote_media_filepath_rel(self, server_name: str, file_id: str) -> str:
return os.path.join(
"remote_content",
@ -197,7 +223,7 @@ class MediaFilePaths:
remote_media_filepath = _wrap_in_base_path(remote_media_filepath_rel)
@_wrap_with_jail_check
@_wrap_with_jail_check(relative=True)
def remote_media_thumbnail_rel(
self,
server_name: str,
@ -223,7 +249,7 @@ class MediaFilePaths:
# Legacy path that was used to store thumbnails previously.
# Should be removed after some time, when most of the thumbnails are stored
# using the new path.
@_wrap_with_jail_check
@_wrap_with_jail_check(relative=True)
def remote_media_thumbnail_rel_legacy(
self, server_name: str, file_id: str, width: int, height: int, content_type: str
) -> str:
@ -238,6 +264,7 @@ class MediaFilePaths:
_validate_path_component(file_name),
)
@_wrap_with_jail_check(relative=False)
def remote_media_thumbnail_dir(self, server_name: str, file_id: str) -> str:
return os.path.join(
self.base_path,
@ -248,7 +275,7 @@ class MediaFilePaths:
_validate_path_component(file_id[4:]),
)
@_wrap_with_jail_check
@_wrap_with_jail_check(relative=True)
def url_cache_filepath_rel(self, media_id: str) -> str:
if NEW_FORMAT_ID_RE.match(media_id):
# Media id is of the form <DATE><RANDOM_STRING>
@ -268,7 +295,7 @@ class MediaFilePaths:
url_cache_filepath = _wrap_in_base_path(url_cache_filepath_rel)
@_wrap_with_jail_check
@_wrap_with_jail_check(relative=False)
def url_cache_filepath_dirs_to_delete(self, media_id: str) -> List[str]:
"The dirs to try and remove if we delete the media_id file"
if NEW_FORMAT_ID_RE.match(media_id):
@ -290,7 +317,7 @@ class MediaFilePaths:
),
]
@_wrap_with_jail_check
@_wrap_with_jail_check(relative=True)
def url_cache_thumbnail_rel(
self, media_id: str, width: int, height: int, content_type: str, method: str
) -> str:
@ -318,7 +345,7 @@ class MediaFilePaths:
url_cache_thumbnail = _wrap_in_base_path(url_cache_thumbnail_rel)
@_wrap_with_jail_check
@_wrap_with_jail_check(relative=True)
def url_cache_thumbnail_directory_rel(self, media_id: str) -> str:
# Media id is of the form <DATE><RANDOM_STRING>
# E.g.: 2017-09-28-fsdRDt24DS234dsf
@ -341,7 +368,7 @@ class MediaFilePaths:
url_cache_thumbnail_directory_rel
)
@_wrap_with_jail_check
@_wrap_with_jail_check(relative=False)
def url_cache_thumbnail_dirs_to_delete(self, media_id: str) -> List[str]:
"The dirs to try and remove if we delete the media_id thumbnails"
# Media id is of the form <DATE><RANDOM_STRING>