anonymousland-synapse/synapse/rest/media/v1/storage_provider.py

166 lines
5.3 KiB
Python
Raw Normal View History

2018-01-08 17:19:55 +00:00
# -*- coding: utf-8 -*-
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import inspect
2018-07-09 16:09:20 +10:00
import logging
import os
import shutil
from typing import Optional
2018-01-08 17:19:55 +00:00
from synapse.config._base import Config
from synapse.logging.context import defer_to_thread, run_in_background
2018-01-08 17:19:55 +00:00
from ._base import FileInfo, Responder
2018-07-09 16:09:20 +10:00
from .media_storage import FileResponder
2018-01-08 17:19:55 +00:00
logger = logging.getLogger(__name__)
class StorageProvider:
    """Interface for a service that can persist uploaded media and hand it
    back again on request.

    Both methods are coroutines whose bodies here do nothing; concrete
    providers override them.
    """

    async def store_file(self, path: str, file_info: FileInfo):
        """Persist the file that file_info describes.

        The actual contents can be obtained by reading the file at
        file_info.upload_path.

        Args:
            path: Relative path of file in local cache
            file_info: The metadata of the file.
        """
        return None

    async def fetch(self, path: str, file_info: FileInfo) -> Optional[Responder]:
        """Try to retrieve the file that file_info describes.

        Args:
            path: Relative path of file in local cache
            file_info: The metadata of the file.

        Returns:
            A Responder if the provider has the file, otherwise None.
        """
        return None
class StorageProviderWrapper(StorageProvider):
    """Wraps a storage provider and provides various config options

    Args:
        backend: The storage provider to wrap.
        store_local: Whether to store new local files or not.
        store_synchronous: Whether to wait for file to be successfully
            uploaded, or to do the upload in the background.
        store_remote: Whether remote media should be uploaded
    """

    def __init__(
        self,
        backend: StorageProvider,
        store_local: bool,
        store_synchronous: bool,
        store_remote: bool,
    ):
        self.backend = backend
        self.store_local = store_local
        self.store_synchronous = store_synchronous
        self.store_remote = store_remote

    def __str__(self):
        return "StorageProviderWrapper[%s]" % (self.backend,)

    async def store_file(self, path, file_info):
        """Forward a store request to the backend, honouring the config flags.

        Files with a falsy file_info.server_name are treated as local and
        skipped unless store_local is set; files with a server_name are
        treated as remote and skipped unless store_remote is set.

        Args:
            path: Relative path of file in local cache
            file_info: The metadata of the file.

        Returns:
            When store_synchronous is set and the backend returns an
            awaitable, the awaited result of the backend. Otherwise None.
        """
        if not file_info.server_name and not self.store_local:
            return None

        if file_info.server_name and not self.store_remote:
            return None

        if self.store_synchronous:
            # store_file is supposed to return an Awaitable, but guard
            # against improper implementations.
            result = self.backend.store_file(path, file_info)
            if inspect.isawaitable(result):
                return await result
            return None

        async def store():
            # Nothing awaits this coroutine from here, so a failure can only
            # be reported through the log.
            try:
                result = self.backend.store_file(path, file_info)
                if inspect.isawaitable(result):
                    return await result
            except Exception:
                logger.exception("Error storing file")

        # Not awaited: the caller gets control back without waiting for the
        # backend to finish.
        run_in_background(store)
        return None

    async def fetch(self, path, file_info):
        """Forward a fetch request to the backend.

        Args:
            path: Relative path of file in local cache
            file_info: The metadata of the file.

        Returns:
            Whatever the backend's fetch produces: a Responder if it has the
            file, otherwise None.
        """
        # fetch is supposed to return an Awaitable, but guard against
        # improper implementations.
        result = self.backend.fetch(path, file_info)
        if inspect.isawaitable(result):
            return await result
        # A backend that answered synchronously has already produced its
        # Responder (or None); pass it through instead of discarding it.
        return result
2018-01-08 17:19:55 +00:00
class FileStorageProviderBackend(StorageProvider):
    """A storage provider that stores files in a directory on a filesystem.

    Args:
        hs (HomeServer)
        config: The config returned by `parse_config`.
    """

    def __init__(self, hs, config):
        self.hs = hs
        # Directory the source files are copied from.
        self.cache_directory = hs.config.media_store_path
        # Directory the copies are written to and fetched from.
        self.base_directory = config

    def __str__(self):
        return "FileStorageProviderBackend[%s]" % (self.base_directory,)

    async def store_file(self, path, file_info):
        """See StorageProvider.store_file

        Copies the file at `path` under the cache directory to the same
        relative `path` under the base directory, creating any missing parent
        directories first.

        Returns:
            The awaited result of running shutil.copyfile via defer_to_thread.
        """
        primary_fname = os.path.join(self.cache_directory, path)
        backup_fname = os.path.join(self.base_directory, path)

        # exist_ok=True rather than an exists() check followed by makedirs():
        # two stores targeting the same new directory could both pass the
        # check, and the second makedirs() would then raise FileExistsError.
        os.makedirs(os.path.dirname(backup_fname), exist_ok=True)

        return await defer_to_thread(
            self.hs.get_reactor(), shutil.copyfile, primary_fname, backup_fname
        )

    async def fetch(self, path, file_info):
        """See StorageProvider.fetch

        Returns:
            A FileResponder wrapping the file opened for binary reading if a
            regular file exists at `path` under the base directory, otherwise
            None.
        """
        backup_fname = os.path.join(self.base_directory, path)
        if os.path.isfile(backup_fname):
            # NOTE(review): the open handle is handed to FileResponder;
            # presumably it is responsible for closing it - confirm.
            return FileResponder(open(backup_fname, "rb"))
        return None

    @staticmethod
    def parse_config(config):
        """Called on startup to parse config supplied. This should parse
        the config and raise if there is a problem.

        The returned value is passed into the constructor.

        In this case we only care about a single param, the directory, so let's
        just pull that out.
        """
        return Config.ensure_directory(config["directory"])