mirror of
https://mau.dev/maunium/synapse.git
synced 2024-10-01 01:36:05 -04:00
258 lines
7.3 KiB
Python
258 lines
7.3 KiB
Python
|
# -*- coding: utf-8 -*-
|
||
|
# Copyright 2014, 2015 OpenMarket Ltd
|
||
|
#
|
||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||
|
# you may not use this file except in compliance with the License.
|
||
|
# You may obtain a copy of the License at
|
||
|
#
|
||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||
|
#
|
||
|
# Unless required by applicable law or agreed to in writing, software
|
||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||
|
# See the License for the specific language governing permissions and
|
||
|
# limitations under the License.
|
||
|
|
||
|
from twisted.internet import defer
|
||
|
|
||
|
from synapse.api.urls import FEDERATION_PREFIX as PREFIX
|
||
|
from synapse.util.logutils import log_function
|
||
|
|
||
|
import logging
|
||
|
import json
|
||
|
|
||
|
|
||
|
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
||
|
|
||
|
|
||
|
class TransportLayerClient(object):
    """Sends federation HTTP requests to other servers.

    The methods here read ``self.client`` (used via ``get_json`` and
    ``put_json``) and ``self.server_name``. Neither attribute is assigned in
    this class, so both must be provided by whatever this class is combined
    with.
    """

    @log_function
    def get_context_state(self, destination, context, event_id=None):
        """ Requests all state for a given context (i.e. room) from the
        given server.

        Args:
            destination (str): The host name of the remote home server we want
                to get the state from.
            context (str): The name of the context we want the state of
            event_id (str|None): If truthy, sent to the remote server as the
                `event_id` query argument; otherwise no query arguments are
                sent.

        Returns:
            Deferred: Results in a dict received from the remote homeserver.
        """
        logger.debug("get_context_state dest=%s, context=%s",
                     destination, context)

        subpath = "/state/%s/" % (context,)

        args = {}
        if event_id:
            args["event_id"] = event_id

        return self._do_request_for_transaction(
            destination, subpath, args=args
        )

    @log_function
    def get_event(self, destination, event_id):
        """ Requests the pdu with the given id from the given server.

        Args:
            destination (str): The host name of the remote home server we want
                to get the event from.
            event_id (str): The id of the event being requested.

        Returns:
            Deferred: Results in a dict received from the remote homeserver.
        """
        logger.debug("get_pdu dest=%s, event_id=%s",
                     destination, event_id)

        subpath = "/event/%s/" % (event_id, )

        return self._do_request_for_transaction(destination, subpath)

    @log_function
    def backfill(self, dest, context, event_tuples, limit):
        """ Requests `limit` previous PDUs in a given context before list of
        PDUs.

        Args:
            dest (str)
            context (str)
            event_tuples (list)
            limit (int)

        Returns:
            Deferred: Results in a dict received from the remote homeserver.
            NOTE: if `event_tuples` is empty no request is made and a bare
            `None` (not a Deferred) is returned.
        """
        # "%s" already applies str() to its argument, so `limit` is passed
        # through as-is; the rendered message is unchanged.
        logger.debug(
            "backfill dest=%s, context=%s, event_tuples=%s, limit=%s",
            dest, context, repr(event_tuples), limit
        )

        if not event_tuples:
            # TODO: raise?
            return

        subpath = "/backfill/%s/" % (context,)

        args = {
            "v": event_tuples,
            "limit": [str(limit)],
        }

        return self._do_request_for_transaction(
            dest,
            subpath,
            args=args,
        )

    @defer.inlineCallbacks
    @log_function
    def send_transaction(self, transaction, json_data_callback=None):
        """ Sends the given Transaction to its destination

        Args:
            transaction (Transaction)
            json_data_callback: Passed straight through to
                `self.client.put_json`.

        Returns:
            Deferred: Results of the deferred is a tuple in the form of
            (response_code, response_body) where the response_body is a
            python dict decoded from json

        Raises:
            RuntimeError: if the transaction's destination is this server.
        """
        logger.debug(
            "send_data dest=%s, txid=%s",
            transaction.destination, transaction.transaction_id
        )

        if transaction.destination == self.server_name:
            raise RuntimeError("Transport layer cannot send to itself!")

        # FIXME: This is only used by the tests. The actual json sent is
        # generated by the json_data_callback.
        json_data = transaction.get_dict()

        # Format with an explicit 1-tuple, as the sibling methods do, so the
        # path is built correctly whatever type transaction_id is.
        path = PREFIX + "/send/%s/" % (transaction.transaction_id,)

        code, response = yield self.client.put_json(
            transaction.destination,
            path=path,
            data=json_data,
            json_data_callback=json_data_callback,
        )

        logger.debug(
            "send_data dest=%s, txid=%s, got response: %d",
            transaction.destination, transaction.transaction_id, code
        )

        defer.returnValue((code, response))

    @defer.inlineCallbacks
    @log_function
    def make_query(self, destination, query_type, args, retry_on_dns_fail):
        """ Sends a GET to `/query/<query_type>` on the given server.

        Args:
            destination (str)
            query_type (str): Appended to the `/query/` path.
            args (dict): Passed to the HTTP client as the request args.
            retry_on_dns_fail (bool): Passed straight through to
                `self.client.get_json`.

        Returns:
            Deferred: Results in whatever `self.client.get_json` produced.
        """
        path = PREFIX + "/query/%s" % (query_type,)

        response = yield self.client.get_json(
            destination=destination,
            path=path,
            args=args,
            retry_on_dns_fail=retry_on_dns_fail,
        )

        defer.returnValue(response)

    @defer.inlineCallbacks
    @log_function
    def make_join(self, destination, context, user_id, retry_on_dns_fail=True):
        """ Sends a GET to `/make_join/<context>/<user_id>` on the given
        server.

        Args:
            destination (str)
            context (str)
            user_id (str)
            retry_on_dns_fail (bool): Passed straight through to
                `self.client.get_json`.

        Returns:
            Deferred: Results in whatever `self.client.get_json` produced.
        """
        path = PREFIX + "/make_join/%s/%s" % (context, user_id,)

        response = yield self.client.get_json(
            destination=destination,
            path=path,
            retry_on_dns_fail=retry_on_dns_fail,
        )

        defer.returnValue(response)

    @defer.inlineCallbacks
    @log_function
    def send_join(self, destination, context, event_id, content):
        """ PUTs `content` to `/send_join/<context>/<event_id>` on the given
        server.

        Args:
            destination (str)
            context (str)
            event_id (str)
            content: The request body handed to `self.client.put_json`.

        Returns:
            Deferred: Results in the response body decoded with `json.loads`.

        Raises:
            RuntimeError: if the response code is not in the 2xx range.
        """
        path = PREFIX + "/send_join/%s/%s" % (
            context,
            event_id,
        )

        code, response_body = yield self.client.put_json(
            destination=destination,
            path=path,
            data=content,
        )

        if not 200 <= code < 300:
            # Interpolate the code into the message; passing it as a second
            # positional argument would leave the "%d" unformatted.
            raise RuntimeError("Got %d from send_join" % (code,))

        defer.returnValue(json.loads(response_body))

    @defer.inlineCallbacks
    @log_function
    def send_invite(self, destination, context, event_id, content):
        """ PUTs `content` to `/invite/<context>/<event_id>` on the given
        server.

        Args:
            destination (str)
            context (str)
            event_id (str)
            content: The request body handed to `self.client.put_json`.

        Returns:
            Deferred: Results in the response body decoded with `json.loads`.

        Raises:
            RuntimeError: if the response code is not in the 2xx range.
        """
        path = PREFIX + "/invite/%s/%s" % (
            context,
            event_id,
        )

        code, response_body = yield self.client.put_json(
            destination=destination,
            path=path,
            data=content,
        )

        if not 200 <= code < 300:
            # Interpolate the code into the message; passing it as a second
            # positional argument would leave the "%d" unformatted.
            raise RuntimeError("Got %d from send_invite" % (code,))

        defer.returnValue(json.loads(response_body))

    @defer.inlineCallbacks
    @log_function
    def get_event_auth(self, destination, context, event_id):
        """ Sends a GET to `/event_auth/<context>/<event_id>` on the given
        server.

        Args:
            destination (str)
            context (str)
            event_id (str)

        Returns:
            Deferred: Results in whatever `self.client.get_json` produced.
        """
        path = PREFIX + "/event_auth/%s/%s" % (
            context,
            event_id,
        )

        response = yield self.client.get_json(
            destination=destination,
            path=path,
        )

        defer.returnValue(response)

    @defer.inlineCallbacks
    @log_function
    def _do_request_for_transaction(self, destination, subpath, args=None):
        """
        Args:
            destination (str)
            subpath (str): Appended to the federation prefix to form the
                request path.
            args (dict|None): This is passed directly to the HttpClient. An
                empty dict is used when omitted.

        Returns:
            Deferred: Results in a dict.
        """
        # Build a fresh dict per call rather than sharing one mutable default
        # object between every invocation.
        if args is None:
            args = {}

        data = yield self.client.get_json(
            destination,
            path=PREFIX + subpath,
            args=args,
        )

        # Add certain keys to the JSON, ready for decoding as a Transaction
        data.update(
            origin=destination,
            destination=self.server_name,
            transaction_id=None
        )

        defer.returnValue(data)
|