Use a custom scheme & the worker name for replication requests. (#15578)

All the information needed is already in the `instance_map`, so
use that instead of passing the hostname / IP & port manually
for each replication request.

This consolidates logic for future improvements of using e.g.
UNIX sockets for workers.
This commit is contained in:
Jason Little 2023-05-23 08:05:30 -05:00 committed by GitHub
parent 5b18a217ca
commit 1df0221bda
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 35 additions and 19 deletions

1
changelog.d/15578.misc Normal file
View File

@ -0,0 +1 @@
Allow connecting to HTTP Replication Endpoints by using `worker_name` when constructing the request.

View File

@ -835,6 +835,7 @@ class ReplicationClient(BaseHttpClient):
self.agent: IAgent = ReplicationAgent( self.agent: IAgent = ReplicationAgent(
hs.get_reactor(), hs.get_reactor(),
hs.config.worker.instance_map,
contextFactory=hs.get_http_client_context_factory(), contextFactory=hs.get_http_client_context_factory(),
pool=pool, pool=pool,
) )

View File

@ -13,7 +13,7 @@
# limitations under the License. # limitations under the License.
import logging import logging
from typing import Optional from typing import Dict, Optional
from zope.interface import implementer from zope.interface import implementer
@ -32,6 +32,7 @@ from twisted.web.iweb import (
IResponse, IResponse,
) )
from synapse.config.workers import InstanceLocationConfig
from synapse.types import ISynapseReactor from synapse.types import ISynapseReactor
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -44,9 +45,11 @@ class ReplicationEndpointFactory:
def __init__( def __init__(
self, self,
reactor: ISynapseReactor, reactor: ISynapseReactor,
instance_map: Dict[str, InstanceLocationConfig],
context_factory: IPolicyForHTTPS, context_factory: IPolicyForHTTPS,
) -> None: ) -> None:
self.reactor = reactor self.reactor = reactor
self.instance_map = instance_map
self.context_factory = context_factory self.context_factory = context_factory
def endpointForURI(self, uri: URI) -> IStreamClientEndpoint: def endpointForURI(self, uri: URI) -> IStreamClientEndpoint:
@ -58,15 +61,29 @@ class ReplicationEndpointFactory:
Returns: The correct client endpoint object Returns: The correct client endpoint object
""" """
if uri.scheme in (b"http", b"https"): # The given URI has a special scheme and includes the worker name. The
endpoint = HostnameEndpoint(self.reactor, uri.host, uri.port) # actual connection details are pulled from the instance map.
if uri.scheme == b"https": worker_name = uri.netloc.decode("utf-8")
scheme = self.instance_map[worker_name].scheme()
if scheme in ("http", "https"):
endpoint = HostnameEndpoint(
self.reactor,
self.instance_map[worker_name].host,
self.instance_map[worker_name].port,
)
if scheme == "https":
endpoint = wrapClientTLS( endpoint = wrapClientTLS(
self.context_factory.creatorForNetloc(uri.host, uri.port), endpoint # The 'port' argument below isn't actually used by the function
self.context_factory.creatorForNetloc(
self.instance_map[worker_name].host,
self.instance_map[worker_name].port,
),
endpoint,
) )
return endpoint return endpoint
else: else:
raise SchemeNotSupported(f"Unsupported scheme: {uri.scheme!r}") raise SchemeNotSupported(f"Unsupported scheme: {scheme}")
@implementer(IAgent) @implementer(IAgent)
@ -80,6 +97,7 @@ class ReplicationAgent(_AgentBase):
def __init__( def __init__(
self, self,
reactor: ISynapseReactor, reactor: ISynapseReactor,
instance_map: Dict[str, InstanceLocationConfig],
contextFactory: IPolicyForHTTPS, contextFactory: IPolicyForHTTPS,
connectTimeout: Optional[float] = None, connectTimeout: Optional[float] = None,
bindAddress: Optional[bytes] = None, bindAddress: Optional[bytes] = None,
@ -102,7 +120,9 @@ class ReplicationAgent(_AgentBase):
created. created.
""" """
_AgentBase.__init__(self, reactor, pool) _AgentBase.__init__(self, reactor, pool)
endpoint_factory = ReplicationEndpointFactory(reactor, contextFactory) endpoint_factory = ReplicationEndpointFactory(
reactor, instance_map, contextFactory
)
self._endpointFactory = endpoint_factory self._endpointFactory = endpoint_factory
def request( def request(

View File

@ -219,11 +219,7 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
with outgoing_gauge.track_inprogress(): with outgoing_gauge.track_inprogress():
if instance_name == local_instance_name: if instance_name == local_instance_name:
raise Exception("Trying to send HTTP request to self") raise Exception("Trying to send HTTP request to self")
if instance_name in instance_map: if instance_name not in instance_map:
host = instance_map[instance_name].host
port = instance_map[instance_name].port
tls = instance_map[instance_name].tls
else:
raise Exception( raise Exception(
"Instance %r not in 'instance_map' config" % (instance_name,) "Instance %r not in 'instance_map' config" % (instance_name,)
) )
@ -271,13 +267,11 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
"Unknown METHOD on %s replication endpoint" % (cls.NAME,) "Unknown METHOD on %s replication endpoint" % (cls.NAME,)
) )
# Here the protocol is hard coded to be http by default or https in case the replication # Hard code a special scheme to show this only used for replication. The
# port is set to have tls true. # instance_name will be passed into the ReplicationEndpointFactory to
scheme = "https" if tls else "http" # determine connection details from the instance_map.
uri = "%s://%s:%s/_synapse/replication/%s/%s" % ( uri = "synapse-replication://%s/_synapse/replication/%s/%s" % (
scheme, instance_name,
host,
port,
cls.NAME, cls.NAME,
"/".join(url_args), "/".join(url_args),
) )