Fix sending server up commands from workers (#6811)

Co-authored-by: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com>
This commit is contained in:
Erik Johnston 2020-01-30 16:42:11 +00:00 committed by GitHub
parent a5bab2d058
commit c3d4ad8afd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 36 additions and 13 deletions

1
changelog.d/6811.bugfix Normal file
View File

@ -0,0 +1 @@
Fix waking up other workers when remote server is detected to have come back online.

View File

@ -15,6 +15,7 @@
# limitations under the License. # limitations under the License.
import logging import logging
from typing import Any, Dict
from six.moves import urllib from six.moves import urllib
@ -352,7 +353,9 @@ class TransportLayerClient(object):
else: else:
path = _create_v1_path("/publicRooms") path = _create_v1_path("/publicRooms")
args = {"include_all_networks": "true" if include_all_networks else "false"} args = {
"include_all_networks": "true" if include_all_networks else "false"
} # type: Dict[str, Any]
if third_party_instance_id: if third_party_instance_id:
args["third_party_instance_id"] = (third_party_instance_id,) args["third_party_instance_id"] = (third_party_instance_id,)
if limit: if limit:

View File

@ -18,6 +18,7 @@
import functools import functools
import logging import logging
import re import re
from typing import Optional, Tuple, Type
from twisted.internet.defer import maybeDeferred from twisted.internet.defer import maybeDeferred
@ -267,6 +268,8 @@ class BaseFederationServlet(object):
returned. returned.
""" """
PATH = "" # Overridden in subclasses, the regex to match against the path.
REQUIRE_AUTH = True REQUIRE_AUTH = True
PREFIX = FEDERATION_V1_PREFIX # Allows specifying the API version PREFIX = FEDERATION_V1_PREFIX # Allows specifying the API version
@ -347,9 +350,6 @@ class BaseFederationServlet(object):
return response return response
# Extra logic that functools.wraps() doesn't finish
new_func.__self__ = func.__self__
return new_func return new_func
def register(self, server): def register(self, server):
@ -824,7 +824,7 @@ class PublicRoomList(BaseFederationServlet):
if not self.allow_access: if not self.allow_access:
raise FederationDeniedError(origin) raise FederationDeniedError(origin)
limit = int(content.get("limit", 100)) limit = int(content.get("limit", 100)) # type: Optional[int]
since_token = content.get("since", None) since_token = content.get("since", None)
search_filter = content.get("filter", None) search_filter = content.get("filter", None)
@ -971,7 +971,7 @@ class FederationGroupsAddRoomsConfigServlet(BaseFederationServlet):
if get_domain_from_id(requester_user_id) != origin: if get_domain_from_id(requester_user_id) != origin:
raise SynapseError(403, "requester_user_id doesn't match origin") raise SynapseError(403, "requester_user_id doesn't match origin")
result = await self.groups_handler.update_room_in_group( result = await self.handler.update_room_in_group(
group_id, requester_user_id, room_id, config_key, content group_id, requester_user_id, room_id, config_key, content
) )
@ -1422,11 +1422,13 @@ FEDERATION_SERVLET_CLASSES = (
On3pidBindServlet, On3pidBindServlet,
FederationVersionServlet, FederationVersionServlet,
RoomComplexityServlet, RoomComplexityServlet,
) ) # type: Tuple[Type[BaseFederationServlet], ...]
OPENID_SERVLET_CLASSES = (OpenIdUserInfo,) OPENID_SERVLET_CLASSES = (
OpenIdUserInfo,
) # type: Tuple[Type[BaseFederationServlet], ...]
ROOM_LIST_CLASSES = (PublicRoomList,) ROOM_LIST_CLASSES = (PublicRoomList,) # type: Tuple[Type[PublicRoomList], ...]
GROUP_SERVER_SERVLET_CLASSES = ( GROUP_SERVER_SERVLET_CLASSES = (
FederationGroupsProfileServlet, FederationGroupsProfileServlet,
@ -1447,17 +1449,19 @@ GROUP_SERVER_SERVLET_CLASSES = (
FederationGroupsAddRoomsServlet, FederationGroupsAddRoomsServlet,
FederationGroupsAddRoomsConfigServlet, FederationGroupsAddRoomsConfigServlet,
FederationGroupsSettingJoinPolicyServlet, FederationGroupsSettingJoinPolicyServlet,
) ) # type: Tuple[Type[BaseFederationServlet], ...]
GROUP_LOCAL_SERVLET_CLASSES = ( GROUP_LOCAL_SERVLET_CLASSES = (
FederationGroupsLocalInviteServlet, FederationGroupsLocalInviteServlet,
FederationGroupsRemoveLocalUserServlet, FederationGroupsRemoveLocalUserServlet,
FederationGroupsBulkPublicisedServlet, FederationGroupsBulkPublicisedServlet,
) ) # type: Tuple[Type[BaseFederationServlet], ...]
GROUP_ATTESTATION_SERVLET_CLASSES = (FederationGroupsRenewAttestaionServlet,) GROUP_ATTESTATION_SERVLET_CLASSES = (
FederationGroupsRenewAttestaionServlet,
) # type: Tuple[Type[BaseFederationServlet], ...]
DEFAULT_SERVLET_GROUPS = ( DEFAULT_SERVLET_GROUPS = (
"federation", "federation",

View File

@ -31,6 +31,7 @@ from .commands import (
Command, Command,
FederationAckCommand, FederationAckCommand,
InvalidateCacheCommand, InvalidateCacheCommand,
RemoteServerUpCommand,
RemovePusherCommand, RemovePusherCommand,
UserIpCommand, UserIpCommand,
UserSyncCommand, UserSyncCommand,
@ -210,6 +211,9 @@ class ReplicationClientHandler(AbstractReplicationClientHandler):
cmd = UserIpCommand(user_id, access_token, ip, user_agent, device_id, last_seen) cmd = UserIpCommand(user_id, access_token, ip, user_agent, device_id, last_seen)
self.send_command(cmd) self.send_command(cmd)
def send_remote_server_up(self, server: str):
self.send_command(RemoteServerUpCommand(server))
def await_sync(self, data): def await_sync(self, data):
"""Returns a deferred that is resolved when we receive a SYNC command """Returns a deferred that is resolved when we receive a SYNC command
with given data. with given data.

View File

@ -2,8 +2,8 @@ import twisted.internet
import synapse.api.auth import synapse.api.auth
import synapse.config.homeserver import synapse.config.homeserver
import synapse.crypto.keyring
import synapse.federation.sender import synapse.federation.sender
import synapse.federation.transaction_queue
import synapse.federation.transport.client import synapse.federation.transport.client
import synapse.handlers import synapse.handlers
import synapse.handlers.auth import synapse.handlers.auth
@ -17,6 +17,7 @@ import synapse.handlers.room_member
import synapse.handlers.set_password import synapse.handlers.set_password
import synapse.http.client import synapse.http.client
import synapse.notifier import synapse.notifier
import synapse.replication.tcp.client
import synapse.rest.media.v1.media_repository import synapse.rest.media.v1.media_repository
import synapse.server_notices.server_notices_manager import synapse.server_notices.server_notices_manager
import synapse.server_notices.server_notices_sender import synapse.server_notices.server_notices_sender
@ -27,6 +28,9 @@ class HomeServer(object):
@property @property
def config(self) -> synapse.config.homeserver.HomeServerConfig: def config(self) -> synapse.config.homeserver.HomeServerConfig:
pass pass
@property
def hostname(self) -> str:
pass
def get_auth(self) -> synapse.api.auth.Auth: def get_auth(self) -> synapse.api.auth.Auth:
pass pass
def get_auth_handler(self) -> synapse.handlers.auth.AuthHandler: def get_auth_handler(self) -> synapse.handlers.auth.AuthHandler:
@ -97,3 +101,9 @@ class HomeServer(object):
pass pass
def get_reactor(self) -> twisted.internet.base.ReactorBase: def get_reactor(self) -> twisted.internet.base.ReactorBase:
pass pass
def get_keyring(self) -> synapse.crypto.keyring.Keyring:
pass
def get_tcp_replication(
self,
) -> synapse.replication.tcp.client.ReplicationClientHandler:
pass

View File

@ -179,6 +179,7 @@ extras = all
commands = mypy \ commands = mypy \
synapse/api \ synapse/api \
synapse/config/ \ synapse/config/ \
synapse/federation/transport \
synapse/handlers/ui_auth \ synapse/handlers/ui_auth \
synapse/logging/ \ synapse/logging/ \
synapse/module_api \ synapse/module_api \