diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 510f07dd7..c7082b83a 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -14,10 +14,11 @@ # limitations under the License. -from twisted.internet import defer, reactor +from twisted.internet import defer, reactor, protocol from twisted.internet.error import DNSLookupError from twisted.web.client import readBody, _AgentBase, _URI from twisted.web.http_headers import Headers +from twisted.web._newclient import ResponseDone from synapse.http.endpoint import matrix_federation_endpoint from synapse.util.async import sleep @@ -227,7 +228,7 @@ class MatrixFederationHttpClient(object): @defer.inlineCallbacks def get_json(self, destination, path, args={}, retry_on_dns_fail=True): - """ Get's some json from the given host homeserver and path + """ GETs some json from the given host homeserver and path Args: destination (str): The remote server to send the HTTP request @@ -235,9 +236,6 @@ class MatrixFederationHttpClient(object): path (str): The HTTP path. args (dict): A dictionary used to create query strings, defaults to None. - **Note**: The value of each key is assumed to be an iterable - and *not* a string. - Returns: Deferred: Succeeds when we get *any* HTTP response. @@ -272,6 +270,48 @@ class MatrixFederationHttpClient(object): defer.returnValue(json.loads(body)) + @defer.inlineCallbacks + def get_file(self, destination, path, output_stream, args={}, + retry_on_dns_fail=True): + """GETs a file from a given homeserver + Args: + destination (str): The remote server to send the HTTP request to. + path (str): The HTTP path to GET. + output_stream (file): File to write the response body to. + args (dict): Optional dictionary used to create the query string. + Returns: + A (int,dict) tuple of the file length and a dict of the response + headers. + """ + + encoded_args = {} + for k, vs in args.items(): + if isinstance(vs, basestring): + vs = [vs] + encoded_args[k] = [v.encode("UTF-8") for v in vs] + + query_bytes = urllib.urlencode(encoded_args, True) + logger.debug("Query bytes: %s Retry DNS: %s", args, retry_on_dns_fail) + + def body_callback(method, url_bytes, headers_dict): + self.sign_request(destination, method, url_bytes, headers_dict) + return None + + response = yield self._create_request( + destination.encode("ascii"), + "GET", + path.encode("ascii"), + query_bytes=query_bytes, + body_callback=body_callback, + retry_on_dns_fail=retry_on_dns_fail + ) + + headers = dict(response.headers.getAllRawHeaders()) + + length = yield _readBodyToFile(response, output_stream) + + defer.returnValue((length, headers)) + def _getEndpoint(self, reactor, destination): return matrix_federation_endpoint( reactor, destination, timeout=10, @@ -279,6 +319,29 @@ class MatrixFederationHttpClient(object): ) +class _ReadBodyToFileProtocol(protocol.Protocol): + def __init__(self, stream, deferred): + self.stream = stream + self.deferred = deferred + self.length = 0 + + def dataReceived(self, data): + self.stream.write(data) + self.length += len(data) + + def connectionLost(self, reason): + if reason.check(ResponseDone): + self.deferred.callback(self.length) + else: + self.deferred.errback(reason) + + +def _readBodyToFile(response, stream): + d = defer.Deferred() + response.deliverBody(_ReadBodyToFileProtocol(stream, d)) + return d + + def _print_ex(e): if hasattr(e, "reasons") and e.reasons: for ex in e.reasons: diff --git a/synapse/media/v1/download_resource.py b/synapse/media/v1/download_resource.py new file mode 100644 index 000000000..c243f16a7 --- /dev/null +++ b/synapse/media/v1/download_resource.py @@ -0,0 +1,194 @@ +# -*- coding: utf-8 -*- +# Copyright 2014 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from synapse.http.server import respond_with_json +from synapse.util.stringutils import random_string +from synapse.api.errors import ( + cs_exception, CodeMessageException, cs_error, Codes +) + +from twisted.protocols.basic import FileSender +from twisted.web.resource import Resource +from twisted.web.server import NOT_DONE_YET +from twisted.internet import defer + +import os + +import logging + +logger = logging.getLogger(__name__) + + +class DownloadResource(Resource): + isLeaf = True + + def __init__(self, hs, filepaths): + Resource.__init__(self) + self.client = hs.get_http_client() + self.clock = hs.get_clock() + self.server_name = hs.hostname + self.store = hs.get_datastore() + self.filepaths = filepaths + + def render_GET(self, request): + self._async_render_GET(request) + return NOT_DONE_YET + + def _respond_404(self, request): + respond_with_json( + request, 404, + cs_error( + "Not found %r" % (request.postpath,), + code=Codes.NOT_FOUND, + ), + send_cors=True + ) + + @defer.inlineCallbacks + def _async_render_GET(self, request): + + try: + server_name, media_id = request.postpath + except: + self._respond_404(request) + return + + try: + if server_name == self.server_name: + yield self._respond_local_file(request, media_id) + else: + yield self._respond_remote_file(request, server_name, media_id) + except CodeMessageException as e: + logger.exception(e) + respond_with_json(request, e.code, cs_exception(e), send_cors=True) + except: + logger.exception("Failed to serve file") + respond_with_json( + request, + 500, + {"error": "Internal server error"}, + send_cors=True + ) + + @defer.inlineCallbacks + def _download_remote_file(self, server_name, media_id): + filesystem_id = random_string(24) + + fname = self.filepaths.remote_media_filepath( + server_name, filesystem_id + ) + os.makedirs(os.path.dirname(fname)) + + try: + with open(fname, "wb") as f: + length, headers = yield self.client.get_file( + server_name, + "/".join(( + "/_matrix/media/v1/download", server_name, media_id, + )), + output_stream=f, + ) + except: + os.remove(fname) + raise + + media_type = headers["Content-Type"][0] + time_now_ms = self.clock.time_msec() + + yield self.store.store_cached_remote_media( + origin=server_name, + media_id=media_id, + media_type=media_type, + time_now_ms=self.clock.time_msec(), + upload_name=None, + media_length=length, + filesystem_id=filesystem_id, + ) + + defer.returnValue({ + "media_type": media_type, + "media_length": length, + "upload_name": None, + "created_ts": time_now_ms, + "filesystem_id": filesystem_id, + }) + + @defer.inlineCallbacks + def _respond_remote_file(self, request, server_name, media_id): + media_info = yield self.store.get_cached_remote_media( + server_name, media_id + ) + + if not media_info: + media_info = yield self._download_remote_file( + server_name, media_id + ) + + filesystem_id = media_info["filesystem_id"] + + file_path = self.filepaths.remote_media_filepath( + server_name, filesystem_id + ) + + if os.path.isfile(file_path): + media_type = media_info["media_type"] + request.setHeader(b"Content-Type", media_type.encode("UTF-8")) + + # cache for at least a day. + # XXX: we might want to turn this off for data we don't want to + # recommend caching as it's sensitive or private - or at least + # select private. don't bother setting Expires as all our + # clients are smart enough to be happy with Cache-Control + request.setHeader( + b"Cache-Control", b"public,max-age=86400,s-maxage=86400" + ) + + with open(file_path, "rb") as f: + yield FileSender().beginFileTransfer(f, request) + + request.finish() + else: + self._respond_404() + + @defer.inlineCallbacks + def _respond_local_file(self, request, media_id): + media_info = yield self.store.get_local_media(media_id) + if not media_info: + self._respond_404() + return + + file_path = self.filepaths.local_media_filepath(media_id) + + logger.debug("Searching for %s", file_path) + + if os.path.isfile(file_path): + media_type = media_info["media_type"] + request.setHeader(b"Content-Type", media_type.encode("UTF-8")) + + # cache for at least a day. + # XXX: we might want to turn this off for data we don't want to + # recommend caching as it's sensitive or private - or at least + # select private. don't bother setting Expires as all our + # clients are smart enough to be happy with Cache-Control + request.setHeader( + b"Cache-Control", b"public,max-age=86400,s-maxage=86400" + ) + + with open(file_path, "rb") as f: + yield FileSender().beginFileTransfer(f, request) + + request.finish() + else: + self._respond_404() diff --git a/synapse/media/v1/media_repository.py b/synapse/media/v1/media_repository.py index afd92874c..e0a4cd01e 100644 --- a/synapse/media/v1/media_repository.py +++ b/synapse/media/v1/media_repository.py @@ -14,6 +14,7 @@ # limitations under the License. from .upload_resource import UploadResource +from .download_resource import DownloadResource from .filepath import MediaFilePaths from twisted.web.resource import Resource @@ -62,3 +63,4 @@ class MediaRepositoryResource(Resource): Resource.__init__(self) filepaths = MediaFilePaths(hs.config.media_store_path) self.putChild("upload", UploadResource(hs, filepaths)) + self.putChild("download", DownloadResource(hs, filepaths)) diff --git a/synapse/media/v1/upload_resource.py b/synapse/media/v1/upload_resource.py index 2919fee12..91bcc5caf 100644 --- a/synapse/media/v1/upload_resource.py +++ b/synapse/media/v1/upload_resource.py @@ -20,7 +20,8 @@ from synapse.api.errors import ( cs_exception, SynapseError, CodeMessageException ) -from twisted.web import server, resource +from twisted.web.resource import Resource +from twisted.web.server import NOT_DONE_YET from twisted.internet import defer import os @@ -30,9 +31,11 @@ import logging logger = logging.getLogger(__name__) -class UploadResource(resource.Resource): +class UploadResource(Resource): + isLeaf = True def __init__(self, hs, filepaths): + Resource.__init__(self) self.auth = hs.get_auth() self.clock = hs.get_clock() self.store = hs.get_datastore() @@ -41,11 +44,11 @@ class UploadResource(resource.Resource): def render_POST(self, request): self._async_render_POST(request) - return server.NOT_DONE_YET + return NOT_DONE_YET def render_OPTIONS(self, request): respond_with_json(request, 200, {}, send_cors=True) - return server.NOT_DONE_YET + return NOT_DONE_YET @defer.inlineCallbacks def _async_render_POST(self, request): diff --git a/synapse/storage/media_repository.py b/synapse/storage/media_repository.py index eda191ad5..2d3a2d1cc 100644 --- a/synapse/storage/media_repository.py +++ b/synapse/storage/media_repository.py @@ -22,13 +22,13 @@ class MediaRepositoryStore(SQLBaseStore): def get_local_media(self, media_id): """Get the metadata for a local piece of media Returns: - None if the media_id doesn't exist. + None if the meia_id doesn't exist. """ return self._simple_select_one( "local_media_repository", {"media_id": media_id}, ("media_type", "media_length", "upload_name", "created_ts"), - True, + allow_none=True, ) def store_local_media(self, media_id, media_type, time_now_ms, upload_name, @@ -73,7 +73,11 @@ class MediaRepositoryStore(SQLBaseStore): return self._simple_select_one( "remote_media_cache", {"media_origin": origin, "media_id": media_id}, - ("media_type", "media_length", "upload_name", "created_ts"), + ( + "media_type", "media_length", "upload_name", "created_ts", + "filesystem_id", + ), + allow_none=True, ) def store_cached_remote_media(self, origin, media_id, media_type,