diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py index 9aba9f13f..7938fe7bc 100644 --- a/synapse/rest/media/v1/media_repository.py +++ b/synapse/rest/media/v1/media_repository.py @@ -27,6 +27,9 @@ from .identicon_resource import IdenticonResource from .preview_url_resource import PreviewUrlResource from .filepath import MediaFilePaths from .thumbnailer import Thumbnailer +from .storage_provider import ( + StorageProviderWrapper, FileStorageProviderBackend, +) from .media_storage import MediaStorage from synapse.http.matrixfederationclient import MatrixFederationHttpClient @@ -66,10 +69,6 @@ class MediaRepository(object): self.primary_base_path = hs.config.media_store_path self.filepaths = MediaFilePaths(self.primary_base_path) - self.backup_base_path = hs.config.backup_media_store_path - - self.synchronous_backup_media_store = hs.config.synchronous_backup_media_store - self.dynamic_thumbnails = hs.config.dynamic_thumbnails self.thumbnail_requirements = hs.config.thumbnail_requirements @@ -77,7 +76,27 @@ class MediaRepository(object): self.recently_accessed_remotes = set() - self.media_storage = MediaStorage(self.primary_base_path, self.filepaths) + # List of StorageProvider's where we should search for media and + # potentially upload to. + self.storage_providers = [] + + # TODO: Move this into config and allow other storage providers to be + # defined. + if hs.config.backup_media_store_path: + backend = FileStorageProviderBackend( + self.primary_base_path, hs.config.backup_media_store_path, + ) + provider = StorageProviderWrapper( + backend, + store=True, + store_synchronous=hs.config.synchronous_backup_media_store, + store_remote=True, + ) + self.storage_providers.append(provider) + + self.media_storage = MediaStorage( + self.primary_base_path, self.filepaths, self.storage_providers, + ) self.clock.looping_call( self._update_recently_accessed_remotes, diff --git a/synapse/rest/media/v1/media_storage.py b/synapse/rest/media/v1/media_storage.py index 052745e6f..a1ec6cadb 100644 --- a/synapse/rest/media/v1/media_storage.py +++ b/synapse/rest/media/v1/media_storage.py @@ -32,9 +32,10 @@ class MediaStorage(object): """Responsible for storing/fetching files from local sources. """ - def __init__(self, local_media_directory, filepaths): + def __init__(self, local_media_directory, filepaths, storage_providers): self.local_media_directory = local_media_directory self.filepaths = filepaths + self.storage_providers = storage_providers @defer.inlineCallbacks def store_file(self, source, file_info): @@ -90,11 +91,12 @@ class MediaStorage(object): finished_called = [False] + @defer.inlineCallbacks def finish(): - # This will be used later when we want to hit out to other storage - # places + for provider in self.storage_providers: + yield provider.store_file(path, file_info) + finished_called[0] = True - return defer.succeed(None) try: with open(fname, "wb") as f: @@ -127,6 +129,11 @@ class MediaStorage(object): if os.path.exists(local_path): defer.returnValue(FileResponder(open(local_path, "rb"))) + for provider in self.storage_providers: + res = yield provider.fetch(path, file_info) + if res: + defer.returnValue(res) + defer.returnValue(None) def _file_info_to_path(self, file_info): diff --git a/synapse/rest/media/v1/storage_provider.py b/synapse/rest/media/v1/storage_provider.py new file mode 100644 index 000000000..2ad602e10 --- /dev/null +++ b/synapse/rest/media/v1/storage_provider.py @@ -0,0 +1,127 @@ +# -*- coding: utf-8 -*- +# Copyright 2018 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer, threads + +from .media_storage import FileResponder + +from synapse.util.logcontext import preserve_fn + +import logging +import os +import shutil + + +logger = logging.getLogger(__name__) + + +class StorageProvider(object): + """A storage provider is a service that can store uploaded media and + retrieve them. + """ + def store_file(self, path, file_info): + """Store the file described by file_info. The actual contents can be + retrieved by reading the file in file_info.upload_path. + + Args: + path (str): Relative path of file in local cache + file_info (FileInfo) + + Returns: + Deferred + """ + pass + + def fetch(self, path, file_info): + """Attempt to fetch the file described by file_info and stream it + into writer. + + Args: + path (str): Relative path of file in local cache + file_info (FileInfo) + + Returns: + Deferred(Responder): Returns a Responder if the provider has the file, + otherwise returns None. + """ + pass + + +class StorageProviderWrapper(StorageProvider): + """Wraps a storage provider and provides various config options + + Args: + backend (StorageProvider) + store (bool): Whether to store new files or not. + store_synchronous (bool): Whether to wait for file to be successfully + uploaded, or todo the upload in the backgroud. + store_remote (bool): Whether remote media should be uploaded + """ + def __init__(self, backend, store, store_synchronous, store_remote): + self.backend = backend + self.store = store + self.store_synchronous = store_synchronous + self.store_remote = store_remote + + def store_file(self, path, file_info): + if not self.store: + return defer.succeed(None) + + if file_info.server_name and not self.store_remote: + return defer.succeed(None) + + if self.store_synchronous: + return self.backend.store_file(path, file_info) + else: + # TODO: Handle errors. + preserve_fn(self.backend.store_file)(path, file_info) + return defer.succeed(None) + + def fetch(self, path, file_info): + return self.backend.fetch(path, file_info) + + +class FileStorageProviderBackend(StorageProvider): + """A storage provider that stores files in a directory on a filesystem. + + Args: + cache_directory (str): Base path of the local media repository + base_directory (str): Base path to store new files + """ + + def __init__(self, cache_directory, base_directory): + self.cache_directory = cache_directory + self.base_directory = base_directory + + def store_file(self, path, file_info): + """See StorageProvider.store_file""" + + primary_fname = os.path.join(self.cache_directory, path) + backup_fname = os.path.join(self.base_directory, path) + + dirname = os.path.dirname(backup_fname) + if not os.path.exists(dirname): + os.makedirs(dirname) + + return threads.deferToThread( + shutil.copyfile, primary_fname, backup_fname, + ) + + def fetch(self, path, file_info): + """See StorageProvider.fetch""" + + backup_fname = os.path.join(self.base_directory, path) + if os.path.isfile(backup_fname): + return FileResponder(open(backup_fname, "rb"))