Use worker_ prefixes for worker config, use existing support for multiple config files

This commit is contained in:
Mark Haines 2016-06-16 17:29:50 +01:00
parent 364d616792
commit a352b68acf
5 changed files with 33 additions and 79 deletions

View File

@ -111,7 +111,7 @@ class PusherServer(HomeServer):
def remove_pusher(self, app_id, push_key, user_id): def remove_pusher(self, app_id, push_key, user_id):
http_client = self.get_simple_http_client() http_client = self.get_simple_http_client()
replication_url = self.worker_config.replication_url replication_url = self.config.worker_replication_url
url = replication_url + "/remove_pushers" url = replication_url + "/remove_pushers"
return http_client.post_json_get_json(url, { return http_client.post_json_get_json(url, {
"remove": [{ "remove": [{
@ -165,7 +165,7 @@ class PusherServer(HomeServer):
def replicate(self): def replicate(self):
http_client = self.get_simple_http_client() http_client = self.get_simple_http_client()
store = self.get_datastore() store = self.get_datastore()
replication_url = self.worker_config.replication_url replication_url = self.config.worker_replication_url
pusher_pool = self.get_pusherpool() pusher_pool = self.get_pusherpool()
clock = self.get_clock() clock = self.get_clock()
@ -240,11 +240,8 @@ class PusherServer(HomeServer):
logger.exception("Error replicating from %r", replication_url) logger.exception("Error replicating from %r", replication_url)
yield sleep(30) yield sleep(30)
def get_event_cache_size(self):
return self.worker_config.event_cache_size
def start(config_options):
def setup(worker_name, config_options):
try: try:
config = HomeServerConfig.load_config( config = HomeServerConfig.load_config(
"Synapse pusher", config_options "Synapse pusher", config_options
@ -253,9 +250,9 @@ def setup(worker_name, config_options):
sys.stderr.write("\n" + e.message + "\n") sys.stderr.write("\n" + e.message + "\n")
sys.exit(1) sys.exit(1)
worker_config = config.workers[worker_name] assert config.worker_app == "synapse.app.pusher"
setup_logging(worker_config.log_config, worker_config.log_file) setup_logging(config.worker_log_config, config.worker_log_file)
if config.start_pushers: if config.start_pushers:
sys.stderr.write( sys.stderr.write(
@ -275,20 +272,19 @@ def setup(worker_name, config_options):
config.server_name, config.server_name,
db_config=config.database_config, db_config=config.database_config,
config=config, config=config,
worker_config=worker_config,
version_string=get_version_string("Synapse", synapse), version_string=get_version_string("Synapse", synapse),
database_engine=database_engine, database_engine=database_engine,
) )
ps.setup() ps.setup()
ps.start_listening(worker_config.listeners) ps.start_listening(config.worker_listeners)
def run(): def run():
with LoggingContext("run"): with LoggingContext("run"):
logger.info("Running") logger.info("Running")
change_resource_limit(worker_config.soft_file_limit) change_resource_limit(config.soft_file_limit)
if worker_config.gc_thresholds: if config.gc_thresholds:
ps.set_threshold(worker_config.gc_thresholds) ps.set_threshold(config.gc_thresholds)
reactor.run() reactor.run()
def start(): def start():
@ -298,10 +294,10 @@ def setup(worker_name, config_options):
reactor.callWhenRunning(start) reactor.callWhenRunning(start)
if worker_config.daemonize: if config.worker_daemonize:
daemon = Daemonize( daemon = Daemonize(
app="synapse-pusher", app="synapse-pusher",
pid=worker_config.pid_file, pid=config.worker_pid_file,
action=run, action=run,
auto_close_fds=False, auto_close_fds=False,
verbose=True, verbose=True,
@ -314,5 +310,4 @@ def setup(worker_name, config_options):
if __name__ == '__main__': if __name__ == '__main__':
with LoggingContext("main"): with LoggingContext("main"):
worker_name = sys.argv[1] ps = start(sys.argv[1:])
ps = setup(worker_name, sys.argv[2:])

View File

@ -97,7 +97,7 @@ class SynchrotronPresence(object):
self.http_client = hs.get_simple_http_client() self.http_client = hs.get_simple_http_client()
self.store = hs.get_datastore() self.store = hs.get_datastore()
self.user_to_num_current_syncs = {} self.user_to_num_current_syncs = {}
self.syncing_users_url = hs.worker_config.replication_url + "/syncing_users" self.syncing_users_url = hs.config.worker_replication_url + "/syncing_users"
self.clock = hs.get_clock() self.clock = hs.get_clock()
active_presence = self.store.take_presence_startup_info() active_presence = self.store.take_presence_startup_info()
@ -305,7 +305,7 @@ class SynchrotronServer(HomeServer):
def replicate(self): def replicate(self):
http_client = self.get_simple_http_client() http_client = self.get_simple_http_client()
store = self.get_datastore() store = self.get_datastore()
replication_url = self.worker_config.replication_url replication_url = self.config.worker_replication_url
clock = self.get_clock() clock = self.get_clock()
notifier = self.get_notifier() notifier = self.get_notifier()
presence_handler = self.get_presence_handler() presence_handler = self.get_presence_handler()
@ -403,11 +403,8 @@ class SynchrotronServer(HomeServer):
def build_typing_handler(self): def build_typing_handler(self):
return SynchrotronTyping(self) return SynchrotronTyping(self)
def get_event_cache_size(self):
return self.worker_config.event_cache_size
def start(config_options):
def start(worker_name, config_options):
try: try:
config = HomeServerConfig.load_config( config = HomeServerConfig.load_config(
"Synapse synchrotron", config_options "Synapse synchrotron", config_options
@ -416,9 +413,9 @@ def start(worker_name, config_options):
sys.stderr.write("\n" + e.message + "\n") sys.stderr.write("\n" + e.message + "\n")
sys.exit(1) sys.exit(1)
worker_config = config.workers[worker_name] assert config.worker_app == "synapse.app.synchrotron"
setup_logging(worker_config.log_config, worker_config.log_file) setup_logging(config.worker_log_config, config.worker_log_file)
database_engine = create_engine(config.database_config) database_engine = create_engine(config.database_config)
@ -426,21 +423,20 @@ def start(worker_name, config_options):
config.server_name, config.server_name,
db_config=config.database_config, db_config=config.database_config,
config=config, config=config,
worker_config=worker_config,
version_string=get_version_string("Synapse", synapse), version_string=get_version_string("Synapse", synapse),
database_engine=database_engine, database_engine=database_engine,
application_service_handler=SynchrotronApplicationService(), application_service_handler=SynchrotronApplicationService(),
) )
ss.setup() ss.setup()
ss.start_listening(worker_config.listeners) ss.start_listening(config.worker_listeners)
def run(): def run():
with LoggingContext("run"): with LoggingContext("run"):
logger.info("Running") logger.info("Running")
change_resource_limit(worker_config.soft_file_limit) change_resource_limit(config.soft_file_limit)
if worker_config.gc_thresholds: if config.gc_thresholds:
ss.set_threshold(worker_config.gc_thresholds) ss.set_threshold(config.gc_thresholds)
reactor.run() reactor.run()
def start(): def start():
@ -449,10 +445,10 @@ def start(worker_name, config_options):
reactor.callWhenRunning(start) reactor.callWhenRunning(start)
if worker_config.daemonize: if config.worker_daemonize:
daemon = Daemonize( daemon = Daemonize(
app="synapse-synchrotron", app="synapse-synchrotron",
pid=worker_config.pid_file, pid=config.worker_pid_file,
action=run, action=run,
auto_close_fds=False, auto_close_fds=False,
verbose=True, verbose=True,
@ -465,5 +461,4 @@ def start(worker_name, config_options):
if __name__ == '__main__': if __name__ == '__main__':
with LoggingContext("main"): with LoggingContext("main"):
worker_name = sys.argv[1] start(sys.argv[1:])
start(worker_name, sys.argv[2:])

View File

@ -13,52 +13,19 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import collections
from ._base import Config from ._base import Config
from .server import read_gc_thresholds
Worker = collections.namedtuple("Worker", [
"app",
"listeners",
"pid_file",
"daemonize",
"log_file",
"log_config",
"event_cache_size",
"soft_file_limit",
"gc_thresholds",
"replication_url",
])
def read_worker_config(config):
return Worker(
app=config["app"],
listeners=config.get("listeners", []),
pid_file=config.get("pid_file"),
daemonize=config["daemonize"],
log_file=config.get("log_file"),
log_config=config.get("log_config"),
event_cache_size=Config.parse_size(config.get("event_cache_size", "10K")),
soft_file_limit=config.get("soft_file_limit"),
gc_thresholds=read_gc_thresholds(config.get("gc_thresholds")),
replication_url=config.get("replication_url"),
)
class WorkerConfig(Config): class WorkerConfig(Config):
"""The workers are processes run separately to the main synapse process. """The workers are processes run separately to the main synapse process.
Each worker has a name that identifies it within the config file.
They have their own pid_file and listener configuration. They use the They have their own pid_file and listener configuration. They use the
replication_url to talk to the main synapse process. They have their replication_url to talk to the main synapse process."""
own cache size tuning, gc threshold tuning and open file limits."""
def read_config(self, config): def read_config(self, config):
workers = config.get("workers", {}) self.worker_app = config.get("worker_app")
self.worker_listeners = config.get("worker_listeners")
self.workers = { self.worker_daemonize = config.get("worker_daemonize")
worker_name: read_worker_config(worker_config) self.worker_pid_file = config.get("worker_pid_file")
for worker_name, worker_config in workers.items() self.worker_log_file = config.get("worker_log_file")
} self.worker_log_config = config.get("worker_log_config")
self.worker_replication_url = config.get("worker_replication_url")

View File

@ -236,9 +236,6 @@ class HomeServer(object):
def remove_pusher(self, app_id, push_key, user_id): def remove_pusher(self, app_id, push_key, user_id):
return self.get_pusherpool().remove_pusher(app_id, push_key, user_id) return self.get_pusherpool().remove_pusher(app_id, push_key, user_id)
def get_event_cache_size(self):
return self.config.event_cache_size
def _make_dependency_method(depname): def _make_dependency_method(depname):
def _get(hs): def _get(hs):

View File

@ -166,7 +166,7 @@ class SQLBaseStore(object):
self._get_event_counters = PerformanceCounters() self._get_event_counters = PerformanceCounters()
self._get_event_cache = Cache("*getEvent*", keylen=3, lru=True, self._get_event_cache = Cache("*getEvent*", keylen=3, lru=True,
max_entries=hs.get_event_cache_size()) max_entries=hs.config.event_cache_size)
self._state_group_cache = DictionaryCache( self._state_group_cache = DictionaryCache(
"*stateGroupCache*", 2000 * CACHE_SIZE_FACTOR "*stateGroupCache*", 2000 * CACHE_SIZE_FACTOR