From 2fc787c341ff540e5880932f116498ec0ed7a2c2 Mon Sep 17 00:00:00 2001 From: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com> Date: Tue, 31 May 2022 17:35:29 +0100 Subject: [PATCH] Add config options for media retention (#12732) --- changelog.d/12732.feature | 1 + .../configuration/config_documentation.md | 29 ++- synapse/config/repository.py | 16 ++ synapse/rest/media/v1/media_repository.py | 71 +++++- tests/rest/media/test_media_retention.py | 238 ++++++++++++++++++ 5 files changed, 353 insertions(+), 2 deletions(-) create mode 100644 changelog.d/12732.feature create mode 100644 tests/rest/media/test_media_retention.py diff --git a/changelog.d/12732.feature b/changelog.d/12732.feature new file mode 100644 index 000000000..3c73363d2 --- /dev/null +++ b/changelog.d/12732.feature @@ -0,0 +1 @@ +Add new `media_retention` options to the homeserver config for routinely cleaning up non-recently accessed media. \ No newline at end of file diff --git a/docs/usage/configuration/config_documentation.md b/docs/usage/configuration/config_documentation.md index 88b9e5744..1c75a23a3 100644 --- a/docs/usage/configuration/config_documentation.md +++ b/docs/usage/configuration/config_documentation.md @@ -1459,7 +1459,7 @@ federation_rr_transactions_per_room_per_second: 40 ``` --- ## Media Store ## -Config options relating to Synapse media store. +Config options related to Synapse's media store. --- Config option: `enable_media_repo` @@ -1563,6 +1563,33 @@ thumbnail_sizes: height: 600 method: scale ``` +--- +Config option: `media_retention` + +Controls whether local media and entries in the remote media cache +(media that is downloaded from other homeservers) should be removed +under certain conditions, typically for the purpose of saving space. + +Purging media files will be the carried out by the media worker +(that is, the worker that has the `enable_media_repo` homeserver config +option set to 'true'). This may be the main process. + +The `media_retention.local_media_lifetime` and +`media_retention.remote_media_lifetime` config options control whether +media will be purged if it has not been accessed in a given amount of +time. Note that media is 'accessed' when loaded in a room in a client, or +otherwise downloaded by a local or remote user. If the media has never +been accessed, the media's creation time is used instead. Both thumbnails +and the original media will be removed. If either of these options are unset, +then media of that type will not be purged. + +Example configuration: +```yaml +media_retention: + local_media_lifetime: 90d + remote_media_lifetime: 14d +``` +--- Config option: `url_preview_enabled` This setting determines whether the preview URL API is enabled. diff --git a/synapse/config/repository.py b/synapse/config/repository.py index 98d8a1662..f9c55143c 100644 --- a/synapse/config/repository.py +++ b/synapse/config/repository.py @@ -223,6 +223,22 @@ class ContentRepositoryConfig(Config): "url_preview_accept_language" ) or ["en"] + media_retention = config.get("media_retention") or {} + + self.media_retention_local_media_lifetime_ms = None + local_media_lifetime = media_retention.get("local_media_lifetime") + if local_media_lifetime is not None: + self.media_retention_local_media_lifetime_ms = self.parse_duration( + local_media_lifetime + ) + + self.media_retention_remote_media_lifetime_ms = None + remote_media_lifetime = media_retention.get("remote_media_lifetime") + if remote_media_lifetime is not None: + self.media_retention_remote_media_lifetime_ms = self.parse_duration( + remote_media_lifetime + ) + def generate_config_section(self, data_dir_path: str, **kwargs: Any) -> str: assert data_dir_path is not None media_store = os.path.join(data_dir_path, "media_store") diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py index 3e5d6c629..20af36653 100644 --- a/synapse/rest/media/v1/media_repository.py +++ b/synapse/rest/media/v1/media_repository.py @@ -65,7 +65,12 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) -UPDATE_RECENTLY_ACCESSED_TS = 60 * 1000 +# How often to run the background job to update the "recently accessed" +# attribute of local and remote media. +UPDATE_RECENTLY_ACCESSED_TS = 60 * 1000 # 1 minute +# How often to run the background job to check for local and remote media +# that should be purged according to the configured media retention settings. +MEDIA_RETENTION_CHECK_PERIOD_MS = 60 * 60 * 1000 # 1 hour class MediaRepository: @@ -122,11 +127,36 @@ class MediaRepository: self._start_update_recently_accessed, UPDATE_RECENTLY_ACCESSED_TS ) + # Media retention configuration options + self._media_retention_local_media_lifetime_ms = ( + hs.config.media.media_retention_local_media_lifetime_ms + ) + self._media_retention_remote_media_lifetime_ms = ( + hs.config.media.media_retention_remote_media_lifetime_ms + ) + + # Check whether local or remote media retention is configured + if ( + hs.config.media.media_retention_local_media_lifetime_ms is not None + or hs.config.media.media_retention_remote_media_lifetime_ms is not None + ): + # Run the background job to apply media retention rules routinely, + # with the duration between runs dictated by the homeserver config. + self.clock.looping_call( + self._start_apply_media_retention_rules, + MEDIA_RETENTION_CHECK_PERIOD_MS, + ) + def _start_update_recently_accessed(self) -> Deferred: return run_as_background_process( "update_recently_accessed_media", self._update_recently_accessed ) + def _start_apply_media_retention_rules(self) -> Deferred: + return run_as_background_process( + "apply_media_retention_rules", self._apply_media_retention_rules + ) + async def _update_recently_accessed(self) -> None: remote_media = self.recently_accessed_remotes self.recently_accessed_remotes = set() @@ -835,6 +865,45 @@ class MediaRepository: return {"width": m_width, "height": m_height} + async def _apply_media_retention_rules(self) -> None: + """ + Purge old local and remote media according to the media retention rules + defined in the homeserver config. + """ + # Purge remote media + if self._media_retention_remote_media_lifetime_ms is not None: + # Calculate a threshold timestamp derived from the configured lifetime. Any + # media that has not been accessed since this timestamp will be removed. + remote_media_threshold_timestamp_ms = ( + self.clock.time_msec() - self._media_retention_remote_media_lifetime_ms + ) + + logger.info( + "Purging remote media last accessed before" + f" {remote_media_threshold_timestamp_ms}" + ) + + await self.delete_old_remote_media( + before_ts=remote_media_threshold_timestamp_ms + ) + + # And now do the same for local media + if self._media_retention_local_media_lifetime_ms is not None: + # This works the same as the remote media threshold + local_media_threshold_timestamp_ms = ( + self.clock.time_msec() - self._media_retention_local_media_lifetime_ms + ) + + logger.info( + "Purging local media last accessed before" + f" {local_media_threshold_timestamp_ms}" + ) + + await self.delete_old_local_media( + before_ts=local_media_threshold_timestamp_ms, + keep_profiles=True, + ) + async def delete_old_remote_media(self, before_ts: int) -> Dict[str, int]: old_media = await self.store.get_remote_media_before(before_ts) diff --git a/tests/rest/media/test_media_retention.py b/tests/rest/media/test_media_retention.py new file mode 100644 index 000000000..b98a5cd58 --- /dev/null +++ b/tests/rest/media/test_media_retention.py @@ -0,0 +1,238 @@ +# Copyright 2022 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import io +from typing import Iterable, Optional, Tuple + +from twisted.test.proto_helpers import MemoryReactor + +from synapse.rest import admin +from synapse.rest.client import login, register, room +from synapse.server import HomeServer +from synapse.types import UserID +from synapse.util import Clock + +from tests import unittest +from tests.unittest import override_config +from tests.utils import MockClock + + +class MediaRetentionTestCase(unittest.HomeserverTestCase): + + ONE_DAY_IN_MS = 24 * 60 * 60 * 1000 + THIRTY_DAYS_IN_MS = 30 * ONE_DAY_IN_MS + + servlets = [ + room.register_servlets, + login.register_servlets, + register.register_servlets, + admin.register_servlets_for_client_rest_resource, + ] + + def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer: + # We need to be able to test advancing time in the homeserver, so we + # replace the test homeserver's default clock with a MockClock, which + # supports advancing time. + return self.setup_test_homeserver(clock=MockClock()) + + def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: + self.remote_server_name = "remote.homeserver" + self.store = hs.get_datastores().main + + # Create a user to upload media with + test_user_id = self.register_user("alice", "password") + + # Inject media (3 images each; recently accessed, old access, never accessed) + # into both the local store and the remote cache + media_repository = hs.get_media_repository() + test_media_content = b"example string" + + def _create_media_and_set_last_accessed( + last_accessed_ms: Optional[int], + ) -> str: + # "Upload" some media to the local media store + mxc_uri = self.get_success( + media_repository.create_content( + media_type="text/plain", + upload_name=None, + content=io.BytesIO(test_media_content), + content_length=len(test_media_content), + auth_user=UserID.from_string(test_user_id), + ) + ) + + media_id = mxc_uri.split("/")[-1] + + # Set the last recently accessed time for this media + if last_accessed_ms is not None: + self.get_success( + self.store.update_cached_last_access_time( + local_media=(media_id,), + remote_media=(), + time_ms=last_accessed_ms, + ) + ) + + return media_id + + def _cache_remote_media_and_set_last_accessed( + media_id: str, last_accessed_ms: Optional[int] + ) -> str: + # Pretend to cache some remote media + self.get_success( + self.store.store_cached_remote_media( + origin=self.remote_server_name, + media_id=media_id, + media_type="text/plain", + media_length=1, + time_now_ms=clock.time_msec(), + upload_name="testfile.txt", + filesystem_id="abcdefg12345", + ) + ) + + # Set the last recently accessed time for this media + if last_accessed_ms is not None: + self.get_success( + hs.get_datastores().main.update_cached_last_access_time( + local_media=(), + remote_media=((self.remote_server_name, media_id),), + time_ms=last_accessed_ms, + ) + ) + + return media_id + + # Start with the local media store + self.local_recently_accessed_media = _create_media_and_set_last_accessed( + self.THIRTY_DAYS_IN_MS + ) + self.local_not_recently_accessed_media = _create_media_and_set_last_accessed( + self.ONE_DAY_IN_MS + ) + self.local_never_accessed_media = _create_media_and_set_last_accessed(None) + + # And now the remote media store + self.remote_recently_accessed_media = _cache_remote_media_and_set_last_accessed( + "a", self.THIRTY_DAYS_IN_MS + ) + self.remote_not_recently_accessed_media = ( + _cache_remote_media_and_set_last_accessed("b", self.ONE_DAY_IN_MS) + ) + # Remote media will always have a "last accessed" attribute, as it would not + # be fetched from the remote homeserver unless instigated by a user. + + @override_config( + { + "media_retention": { + # Enable retention for local media + "local_media_lifetime": "30d" + # Cached remote media should not be purged + } + } + ) + def test_local_media_retention(self) -> None: + """ + Tests that local media that have not been accessed recently is purged, while + cached remote media is unaffected. + """ + # Advance 31 days (in seconds) + self.reactor.advance(31 * 24 * 60 * 60) + + # Check that media has been correctly purged. + # Local media accessed <30 days ago should still exist. + # Remote media should be unaffected. + self._assert_if_mxc_uris_purged( + purged=[ + ( + self.hs.config.server.server_name, + self.local_not_recently_accessed_media, + ), + (self.hs.config.server.server_name, self.local_never_accessed_media), + ], + not_purged=[ + (self.hs.config.server.server_name, self.local_recently_accessed_media), + (self.remote_server_name, self.remote_recently_accessed_media), + (self.remote_server_name, self.remote_not_recently_accessed_media), + ], + ) + + @override_config( + { + "media_retention": { + # Enable retention for cached remote media + "remote_media_lifetime": "30d" + # Local media should not be purged + } + } + ) + def test_remote_media_cache_retention(self) -> None: + """ + Tests that entries from the remote media cache that have not been accessed + recently is purged, while local media is unaffected. + """ + # Advance 31 days (in seconds) + self.reactor.advance(31 * 24 * 60 * 60) + + # Check that media has been correctly purged. + # Local media should be unaffected. + # Remote media accessed <30 days ago should still exist. + self._assert_if_mxc_uris_purged( + purged=[ + (self.remote_server_name, self.remote_not_recently_accessed_media), + ], + not_purged=[ + (self.remote_server_name, self.remote_recently_accessed_media), + (self.hs.config.server.server_name, self.local_recently_accessed_media), + ( + self.hs.config.server.server_name, + self.local_not_recently_accessed_media, + ), + (self.hs.config.server.server_name, self.local_never_accessed_media), + ], + ) + + def _assert_if_mxc_uris_purged( + self, purged: Iterable[Tuple[str, str]], not_purged: Iterable[Tuple[str, str]] + ) -> None: + def _assert_mxc_uri_purge_state( + server_name: str, media_id: str, expect_purged: bool + ) -> None: + """Given an MXC URI, assert whether it has been purged or not.""" + if server_name == self.hs.config.server.server_name: + found_media_dict = self.get_success( + self.store.get_local_media(media_id) + ) + else: + found_media_dict = self.get_success( + self.store.get_cached_remote_media(server_name, media_id) + ) + + mxc_uri = f"mxc://{server_name}/{media_id}" + + if expect_purged: + self.assertIsNone( + found_media_dict, msg=f"{mxc_uri} unexpectedly not purged" + ) + else: + self.assertIsNotNone( + found_media_dict, + msg=f"{mxc_uri} unexpectedly purged", + ) + + # Assert that the given MXC URIs have either been correctly purged or not. + for server_name, media_id in purged: + _assert_mxc_uri_purge_state(server_name, media_id, expect_purged=True) + for server_name, media_id in not_purged: + _assert_mxc_uri_purge_state(server_name, media_id, expect_purged=False)