Support routing edu's to multiple instances (#9042)

This is in preparation for moving `SendToDeviceServlet` off master
This commit is contained in:
Erik Johnston 2021-01-07 18:07:28 +00:00 committed by GitHub
parent e34df813ce
commit 5e99a94502
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 17 additions and 5 deletions

1
changelog.d/9042.feature Normal file
View File

@ -0,0 +1 @@
Add experimental support for handling and persistence of to-device messages to happen on worker processes.

View File

@ -15,6 +15,7 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import logging import logging
import random
from typing import ( from typing import (
TYPE_CHECKING, TYPE_CHECKING,
Any, Any,
@ -860,8 +861,10 @@ class FederationHandlerRegistry:
) # type: Dict[str, Callable[[str, dict], Awaitable[None]]] ) # type: Dict[str, Callable[[str, dict], Awaitable[None]]]
self.query_handlers = {} # type: Dict[str, Callable[[dict], Awaitable[None]]] self.query_handlers = {} # type: Dict[str, Callable[[dict], Awaitable[None]]]
# Map from type to instance name that we should route EDU handling to. # Map from type to instance names that we should route EDU handling to.
self._edu_type_to_instance = {} # type: Dict[str, str] # We randomly choose one instance from the list to route to for each new
# EDU received.
self._edu_type_to_instance = {} # type: Dict[str, List[str]]
def register_edu_handler( def register_edu_handler(
self, edu_type: str, handler: Callable[[str, JsonDict], Awaitable[None]] self, edu_type: str, handler: Callable[[str, JsonDict], Awaitable[None]]
@ -905,7 +908,12 @@ class FederationHandlerRegistry:
def register_instance_for_edu(self, edu_type: str, instance_name: str): def register_instance_for_edu(self, edu_type: str, instance_name: str):
"""Register that the EDU handler is on a different instance than master. """Register that the EDU handler is on a different instance than master.
""" """
self._edu_type_to_instance[edu_type] = instance_name self._edu_type_to_instance[edu_type] = [instance_name]
def register_instances_for_edu(self, edu_type: str, instance_names: List[str]):
"""Register that the EDU handler is on multiple instances.
"""
self._edu_type_to_instance[edu_type] = instance_names
async def on_edu(self, edu_type: str, origin: str, content: dict): async def on_edu(self, edu_type: str, origin: str, content: dict):
if not self.config.use_presence and edu_type == "m.presence": if not self.config.use_presence and edu_type == "m.presence":
@ -924,8 +932,11 @@ class FederationHandlerRegistry:
return return
# Check if we can route it somewhere else that isn't us # Check if we can route it somewhere else that isn't us
route_to = self._edu_type_to_instance.get(edu_type, "master") instances = self._edu_type_to_instance.get(edu_type, ["master"])
if route_to != self._instance_name: if self._instance_name not in instances:
# Pick an instance randomly so that we don't overload one.
route_to = random.choice(instances)
try: try:
await self._send_edu( await self._send_edu(
instance_name=route_to, instance_name=route_to,