forked-synapse/synapse/federation/sender/transaction_manager.py

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

197 lines
6.8 KiB
Python
Raw Normal View History

2019-03-13 16:02:56 -04:00
#
2023-11-21 15:29:58 -05:00
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright (C) 2023 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.
#
# Originally licensed under the Apache License, Version 2.0:
# <http://www.apache.org/licenses/LICENSE-2.0>.
#
# [This file includes modifications made by New Vector Limited]
2019-03-13 16:02:56 -04:00
#
#
import logging
from typing import TYPE_CHECKING, List
2019-03-13 16:02:56 -04:00
from prometheus_client import Gauge
from synapse.api.constants import EduTypes
2019-03-13 16:02:56 -04:00
from synapse.api.errors import HttpResponseException
from synapse.events import EventBase
2019-03-13 16:02:56 -04:00
from synapse.federation.persistence import TransactionActions
from synapse.federation.units import Edu, Transaction
from synapse.logging.opentracing import (
extract_text_map,
set_tag,
start_active_span_follows_from,
tags,
whitelisted_homeserver,
)
from synapse.types import JsonDict
from synapse.util import json_decoder
2019-03-13 16:02:56 -04:00
from synapse.util.metrics import measure_func
if TYPE_CHECKING:
import synapse.server
2019-03-13 16:02:56 -04:00
# Module-level logger used for the general "TX [...]" transaction messages.
logger = logging.getLogger(__name__)
# Separate logger which, when enabled at DEBUG, records the device list /
# signing key update EDUs included in each outgoing transaction.
issue_8631_logger = logging.getLogger("synapse.8631_debug")
2019-03-13 16:02:56 -04:00
# Gauge labelled by destination server name. After a transaction is sent
# successfully it is set to `origin_server_ts / 1000` of the last PDU in that
# transaction, but only for destinations listed in the homeserver's
# `federation_metrics_domains` config.
last_pdu_ts_metric = Gauge(
"synapse_federation_last_sent_pdu_time",
"The timestamp of the last PDU which was successfully sent to the given domain",
labelnames=("server_name",),
)
2019-03-13 16:02:56 -04:00
2020-09-04 06:54:56 -04:00
class TransactionManager:
    """Builds federation transactions and sends them to remote servers.

    One instance is shared between PerDestinationQueue objects.
    """

    def __init__(self, hs: "synapse.server.HomeServer"):
        """Collect the collaborators needed for sending from the homeserver.

        Args:
            hs: The homeserver supplying the hostname, clock, main datastore,
                federation transport client and federation config.
        """
        self._server_name = hs.hostname
        # NB: `measure_func` requires this attribute to be called `clock`.
        self.clock = hs.get_clock()
        self._store = hs.get_datastores().main

        self._transaction_actions = TransactionActions(self._store)
        self._transport_layer = hs.get_federation_transport_client()

        self._federation_metrics_domains = (
            hs.config.federation.federation_metrics_domains
        )

        # HACK: seed the transaction ID counter with the current time in
        # milliseconds so that IDs are unique.
        self._next_txn_id = int(self.clock.time_msec())

    @measure_func("_send_new_transaction")
    async def send_new_transaction(
        self,
        destination: str,
        pdus: List[EventBase],
        edus: List[Edu],
    ) -> None:
        """Build a transaction from the given PDUs and EDUs and send it.

        Args:
            destination: The destination to send to (e.g. 'example.org')
            pdus: In-order list of PDUs to send
            edus: List of EDUs to send

        Raises:
            HttpResponseException: re-raised unchanged if the transport layer
                raises it, after the opentracing error tag has been set and
                the response code has been logged.
        """
        # The transaction-sending opentracing span follows on from every EDU
        # in the transaction. There is no active span at this point, so
        # without those links the span would have no causality and would be
        # forgotten if the remote never received the EDUs.
        whitelisted = whitelisted_homeserver(destination)
        follows_from = []

        for edu in edus:
            raw_context = edu.get_context()
            if raw_context:
                follows_from.append(
                    extract_text_map(json_decoder.decode(raw_context))
                )
            # NOTE(review): the context is stripped when the destination *is*
            # whitelisted -- confirm that this polarity is intended.
            if whitelisted:
                edu.strip_context()

        with start_active_span_follows_from("send_transaction", follows_from):
            logger.debug("TX [%s] _attempt_new_transaction", destination)

            txn_id = str(self._next_txn_id)
            pdu_count = len(pdus)
            edu_count = len(edus)

            logger.debug(
                "TX [%s] {%s} Attempting new transaction (pdus: %d, edus: %d)",
                destination,
                txn_id,
                pdu_count,
                edu_count,
            )

            created_ts = int(self.clock.time_msec())
            pdu_payloads = [pdu.get_pdu_json() for pdu in pdus]
            edu_payloads = [edu.get_dict() for edu in edus]
            transaction = Transaction(
                origin_server_ts=created_ts,
                transaction_id=txn_id,
                origin=self._server_name,
                destination=destination,
                pdus=pdu_payloads,
                edus=edu_payloads,
            )

            # The ID has been consumed; move on so the next transaction gets
            # a fresh one.
            self._next_txn_id += 1

            logger.info(
                "TX [%s] {%s} Sending transaction [%s], (PDUs: %d, EDUs: %d)",
                destination,
                txn_id,
                transaction.transaction_id,
                pdu_count,
                edu_count,
            )

            if issue_8631_logger.isEnabledFor(logging.DEBUG):
                device_update_types = {
                    EduTypes.DEVICE_LIST_UPDATE,
                    EduTypes.SIGNING_KEY_UPDATE,
                }
                device_list_updates = [
                    edu.content
                    for edu in edus
                    if edu.edu_type in device_update_types
                ]
                if device_list_updates:
                    issue_8631_logger.debug(
                        "about to send txn [%s] including device list updates: %s",
                        transaction.transaction_id,
                        device_list_updates,
                    )

            # Actually send the transaction

            # FIXME (erikj): This is a bit of a hack to make the Pdu age
            # keys work
            # FIXME (richardv): I also believe it no longer works. We (now?) store
            #  "age_ts" in "unsigned" rather than at the top level. See
            #  https://github.com/matrix-org/synapse/issues/8429.
            def json_data_cb() -> JsonDict:
                """Return the transaction dict, converting each PDU's
                top-level "age_ts" into an "age" entry under "unsigned"."""
                data = transaction.get_dict()
                now = int(self.clock.time_msec())
                for pdu_json in data.get("pdus", ()):
                    if "age_ts" not in pdu_json:
                        continue
                    unsigned = pdu_json.setdefault("unsigned", {})
                    unsigned["age"] = now - int(pdu_json["age_ts"])
                    del pdu_json["age_ts"]
                return data

            try:
                response = await self._transport_layer.send_transaction(
                    transaction, json_data_cb
                )
            except HttpResponseException as e:
                status = e.code

                set_tag(tags.ERROR, True)

                logger.info(
                    "TX [%s] {%s} got %d response", destination, txn_id, status
                )
                raise

            logger.info("TX [%s] {%s} got 200 response", destination, txn_id)

            # The response may carry a per-event result; log any that
            # contain an error.
            pdu_results = response.get("pdus", {})
            for event_id, result in pdu_results.items():
                if "error" in result:
                    logger.warning(
                        "TX [%s] {%s} Remote returned error for %s: %s",
                        destination,
                        txn_id,
                        event_id,
                        result,
                    )

            # Record the timestamp of the last PDU sent, but only for the
            # destinations configured for federation metrics.
            if pdus and destination in self._federation_metrics_domains:
                newest_pdu = pdus[-1]
                last_pdu_ts_metric.labels(server_name=destination).set(
                    newest_pdu.origin_server_ts / 1000
                )