Prevent the media store from writing outside of the configured directory

Also tighten validation of server names by forbidding invalid characters
in IPv6 addresses and empty domain labels.
This commit is contained in:
Sean Quah 2021-11-19 13:39:15 +00:00
parent 9f9d82aa84
commit 91f2bd0907
5 changed files with 483 additions and 50 deletions

View file

@ -11,6 +11,9 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import inspect
from typing import Iterable
from synapse.rest.media.v1.filepath import MediaFilePaths
from tests import unittest
@ -236,3 +239,250 @@ class MediaFilePathsTestCase(unittest.TestCase):
"/media_store/url_cache_thumbnails/Ge",
],
)
def test_server_name_validation(self):
"""Test validation of server names"""
self._test_path_validation(
[
"remote_media_filepath_rel",
"remote_media_filepath",
"remote_media_thumbnail_rel",
"remote_media_thumbnail",
"remote_media_thumbnail_rel_legacy",
"remote_media_thumbnail_dir",
],
parameter="server_name",
valid_values=[
"matrix.org",
"matrix.org:8448",
"matrix-federation.matrix.org",
"matrix-federation.matrix.org:8448",
"10.1.12.123",
"10.1.12.123:8448",
"[fd00:abcd::ffff]",
"[fd00:abcd::ffff]:8448",
],
invalid_values=[
"/matrix.org",
"matrix.org/..",
"matrix.org\x00",
"",
".",
"..",
"/",
],
)
def test_file_id_validation(self):
"""Test validation of local, remote and legacy URL cache file / media IDs"""
# File / media IDs get split into three parts to form paths, consisting of the
# first two characters, next two characters and rest of the ID.
valid_file_ids = [
"GerZNDnDZVjsOtardLuwfIBg",
# Unexpected, but produces an acceptable path:
"GerZN", # "N" becomes the last directory
]
invalid_file_ids = [
"/erZNDnDZVjsOtardLuwfIBg",
"Ge/ZNDnDZVjsOtardLuwfIBg",
"GerZ/DnDZVjsOtardLuwfIBg",
"GerZ/..",
"G\x00rZNDnDZVjsOtardLuwfIBg",
"Ger\x00NDnDZVjsOtardLuwfIBg",
"GerZNDnDZVjsOtardLuwfIBg\x00",
"",
"Ge",
"GerZ",
"GerZ.",
"..rZNDnDZVjsOtardLuwfIBg",
"Ge..NDnDZVjsOtardLuwfIBg",
"GerZ..",
"GerZ/",
]
self._test_path_validation(
[
"local_media_filepath_rel",
"local_media_filepath",
"local_media_thumbnail_rel",
"local_media_thumbnail",
"local_media_thumbnail_dir",
# Legacy URL cache media IDs
"url_cache_filepath_rel",
"url_cache_filepath",
# `url_cache_filepath_dirs_to_delete` is tested below.
"url_cache_thumbnail_rel",
"url_cache_thumbnail",
"url_cache_thumbnail_directory_rel",
"url_cache_thumbnail_directory",
"url_cache_thumbnail_dirs_to_delete",
],
parameter="media_id",
valid_values=valid_file_ids,
invalid_values=invalid_file_ids,
)
# `url_cache_filepath_dirs_to_delete` ignores what would be the last path
# component, so only the first 4 characters matter.
self._test_path_validation(
[
"url_cache_filepath_dirs_to_delete",
],
parameter="media_id",
valid_values=valid_file_ids,
invalid_values=[
"/erZNDnDZVjsOtardLuwfIBg",
"Ge/ZNDnDZVjsOtardLuwfIBg",
"G\x00rZNDnDZVjsOtardLuwfIBg",
"Ger\x00NDnDZVjsOtardLuwfIBg",
"",
"Ge",
"..rZNDnDZVjsOtardLuwfIBg",
"Ge..NDnDZVjsOtardLuwfIBg",
],
)
self._test_path_validation(
[
"remote_media_filepath_rel",
"remote_media_filepath",
"remote_media_thumbnail_rel",
"remote_media_thumbnail",
"remote_media_thumbnail_rel_legacy",
"remote_media_thumbnail_dir",
],
parameter="file_id",
valid_values=valid_file_ids,
invalid_values=invalid_file_ids,
)
def test_url_cache_media_id_validation(self):
"""Test validation of URL cache media IDs"""
self._test_path_validation(
[
"url_cache_filepath_rel",
"url_cache_filepath",
# `url_cache_filepath_dirs_to_delete` only cares about the date prefix
"url_cache_thumbnail_rel",
"url_cache_thumbnail",
"url_cache_thumbnail_directory_rel",
"url_cache_thumbnail_directory",
"url_cache_thumbnail_dirs_to_delete",
],
parameter="media_id",
valid_values=[
"2020-01-02_GerZNDnDZVjsOtar",
"2020-01-02_G", # Unexpected, but produces an acceptable path
],
invalid_values=[
"2020-01-02",
"2020-01-02-",
"2020-01-02-.",
"2020-01-02-..",
"2020-01-02-/",
"2020-01-02-/GerZNDnDZVjsOtar",
"2020-01-02-GerZNDnDZVjsOtar/..",
"2020-01-02-GerZNDnDZVjsOtar\x00",
],
)
def test_content_type_validation(self):
"""Test validation of thumbnail content types"""
self._test_path_validation(
[
"local_media_thumbnail_rel",
"local_media_thumbnail",
"remote_media_thumbnail_rel",
"remote_media_thumbnail",
"remote_media_thumbnail_rel_legacy",
"url_cache_thumbnail_rel",
"url_cache_thumbnail",
],
parameter="content_type",
valid_values=[
"image/jpeg",
],
invalid_values=[
"", # ValueError: not enough values to unpack
"image/jpeg/abc", # ValueError: too many values to unpack
"image/jpeg\x00",
],
)
def test_thumbnail_method_validation(self):
"""Test validation of thumbnail methods"""
self._test_path_validation(
[
"local_media_thumbnail_rel",
"local_media_thumbnail",
"remote_media_thumbnail_rel",
"remote_media_thumbnail",
"url_cache_thumbnail_rel",
"url_cache_thumbnail",
],
parameter="method",
valid_values=[
"crop",
"scale",
],
invalid_values=[
"/scale",
"scale/..",
"scale\x00",
"/",
],
)
def _test_path_validation(
self,
methods: Iterable[str],
parameter: str,
valid_values: Iterable[str],
invalid_values: Iterable[str],
):
"""Test that the specified methods validate the named parameter as expected
Args:
methods: The names of `MediaFilePaths` methods to test
parameter: The name of the parameter to test
valid_values: A list of parameter values that are expected to be accepted
invalid_values: A list of parameter values that are expected to be rejected
Raises:
AssertionError: If a value was accepted when it should have failed
validation.
ValueError: If a value failed validation when it should have been accepted.
"""
for method in methods:
get_path = getattr(self.filepaths, method)
parameters = inspect.signature(get_path).parameters
kwargs = {
"server_name": "matrix.org",
"media_id": "GerZNDnDZVjsOtardLuwfIBg",
"file_id": "GerZNDnDZVjsOtardLuwfIBg",
"width": 800,
"height": 600,
"content_type": "image/jpeg",
"method": "scale",
}
if get_path.__name__.startswith("url_"):
kwargs["media_id"] = "2020-01-02_GerZNDnDZVjsOtar"
kwargs = {k: v for k, v in kwargs.items() if k in parameters}
kwargs.pop(parameter)
for value in valid_values:
kwargs[parameter] = value
get_path(**kwargs)
# No exception should be raised
for value in invalid_values:
with self.assertRaises(ValueError):
kwargs[parameter] = value
path_or_list = get_path(**kwargs)
self.fail(
f"{value!r} unexpectedly passed validation: "
f"{method} returned {path_or_list!r}"
)