mirror of
https://git.anonymousland.org/anonymousland/synapse.git
synced 2025-08-01 15:06:03 -04:00
Update all the workers and master to use TCP replication
This commit is contained in:
parent
3a1f3f8388
commit
36c28bc467
9 changed files with 255 additions and 397 deletions
|
@ -26,8 +26,8 @@ from synapse.replication.slave.storage.directory import DirectoryStore
|
|||
from synapse.replication.slave.storage.events import SlavedEventStore
|
||||
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
|
||||
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
|
||||
from synapse.replication.tcp.client import ReplicationClientHandler
|
||||
from synapse.storage.engines import create_engine
|
||||
from synapse.util.async import sleep
|
||||
from synapse.util.httpresourcetree import create_resource_tree
|
||||
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
|
||||
from synapse.util.manhole import manhole
|
||||
|
@ -36,7 +36,7 @@ from synapse.util.versionstring import get_version_string
|
|||
|
||||
from synapse import events
|
||||
|
||||
from twisted.internet import reactor, defer
|
||||
from twisted.internet import reactor
|
||||
from twisted.web.resource import Resource
|
||||
|
||||
from daemonize import Daemonize
|
||||
|
@ -120,30 +120,23 @@ class AppserviceServer(HomeServer):
|
|||
else:
|
||||
logger.warn("Unrecognized listener type: %s", listener["type"])
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def replicate(self):
|
||||
http_client = self.get_simple_http_client()
|
||||
store = self.get_datastore()
|
||||
replication_url = self.config.worker_replication_url
|
||||
appservice_handler = self.get_application_service_handler()
|
||||
self.get_tcp_replication().start_replication(self)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def replicate(results):
|
||||
stream = results.get("events")
|
||||
if stream:
|
||||
max_stream_id = stream["position"]
|
||||
yield appservice_handler.notify_interested_services(max_stream_id)
|
||||
def build_tcp_replication(self):
|
||||
return ASReplicationHandler(self)
|
||||
|
||||
while True:
|
||||
try:
|
||||
args = store.stream_positions()
|
||||
args["timeout"] = 30000
|
||||
result = yield http_client.get_json(replication_url, args=args)
|
||||
yield store.process_replication(result)
|
||||
replicate(result)
|
||||
except:
|
||||
logger.exception("Error replicating from %r", replication_url)
|
||||
yield sleep(30)
|
||||
|
||||
class ASReplicationHandler(ReplicationClientHandler):
|
||||
def __init__(self, hs):
|
||||
super(ASReplicationHandler, self).__init__(hs.get_datastore())
|
||||
self.appservice_handler = hs.get_application_service_handler()
|
||||
|
||||
def on_rdata(self, stream_name, token, rows):
|
||||
super(ASReplicationHandler, self).on_rdata(stream_name, token, rows)
|
||||
|
||||
if stream_name == "events":
|
||||
max_stream_id = self.store.get_room_max_stream_ordering()
|
||||
self.appservice_handler.notify_interested_services(max_stream_id)
|
||||
|
||||
|
||||
def start(config_options):
|
||||
|
@ -199,7 +192,6 @@ def start(config_options):
|
|||
reactor.run()
|
||||
|
||||
def start():
|
||||
ps.replicate()
|
||||
ps.get_datastore().start_profiling()
|
||||
ps.get_state_handler().start_caching()
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue