Merge pull request #889 from matrix-org/markjh/synctl_workers

Optionally start or stop workers in synctl.
This commit is contained in:
Mark Haines 2016-06-21 17:58:19 +01:00 committed by GitHub
commit 774f3a692c

View File

@ -14,11 +14,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
import argparse
import collections
import glob
import os
import os.path
import subprocess
import signal
import subprocess
import sys
import yaml
SYNAPSE = ["python", "-B", "-m", "synapse.app.homeserver"]
@ -28,60 +31,181 @@ RED = "\x1b[1;31m"
NORMAL = "\x1b[m"
def write(message, colour=NORMAL, stream=sys.stdout):
if colour == NORMAL:
stream.write(message + "\n")
else:
stream.write(colour + message + NORMAL + "\n")
def start(configfile):
print ("Starting ...")
write("Starting ...")
args = SYNAPSE
args.extend(["--daemonize", "-c", configfile])
try:
subprocess.check_call(args)
print (GREEN + "started" + NORMAL)
write("started synapse.app.homeserver(%r)" % (configfile,), colour=GREEN)
except subprocess.CalledProcessError as e:
print (
RED +
"error starting (exit code: %d); see above for logs" % e.returncode +
NORMAL
write(
"error starting (exit code: %d); see above for logs" % e.returncode,
colour=RED,
)
def stop(pidfile):
def start_worker(app, configfile, worker_configfile):
args = [
"python", "-B",
"-m", app,
"-c", configfile,
"-c", worker_configfile
]
try:
subprocess.check_call(args)
write("started %s(%r)" % (app, worker_configfile), colour=GREEN)
except subprocess.CalledProcessError as e:
write(
"error starting %s(%r) (exit code: %d); see above for logs" % (
app, worker_configfile, e.returncode,
),
colour=RED,
)
def stop(pidfile, app):
if os.path.exists(pidfile):
pid = int(open(pidfile).read())
os.kill(pid, signal.SIGTERM)
print (GREEN + "stopped" + NORMAL)
write("stopped %s" % (app,), colour=GREEN)
Worker = collections.namedtuple("Worker", [
"app", "configfile", "pidfile", "cache_factor"
])
def main():
configfile = sys.argv[2] if len(sys.argv) == 3 else "homeserver.yaml"
if not os.path.exists(configfile):
sys.stderr.write(
"No config file found\n"
"To generate a config file, run '%s -c %s --generate-config"
" --server-name=<server name>'\n" % (
" ".join(SYNAPSE), configfile
parser = argparse.ArgumentParser()
parser.add_argument(
"action",
choices=["start", "stop", "restart"],
help="whether to start, stop or restart the synapse",
)
parser.add_argument(
"configfile",
nargs="?",
default="homeserver.yaml",
help="the homeserver config file, defaults to homserver.yaml",
)
parser.add_argument(
"-w", "--worker",
metavar="WORKERCONFIG",
help="start or stop a single worker",
)
parser.add_argument(
"-a", "--all-processes",
metavar="WORKERCONFIGDIR",
help="start or stop all the workers in the given directory"
" and the main synapse process",
)
options = parser.parse_args()
if options.worker and options.all_processes:
write(
'Cannot use "--worker" with "--all-processes"',
stream=sys.stderr
)
sys.exit(1)
config = yaml.load(open(configfile))
configfile = options.configfile
if not os.path.exists(configfile):
write(
"No config file found\n"
"To generate a config file, run '%s -c %s --generate-config"
" --server-name=<server name>'\n" % (
" ".join(SYNAPSE), options.configfile
),
stream=sys.stderr,
)
sys.exit(1)
with open(configfile) as stream:
config = yaml.load(stream)
pidfile = config["pid_file"]
cache_factor = config.get("synctl_cache_factor", None)
cache_factor = config.get("synctl_cache_factor")
start_stop_synapse = True
if cache_factor:
os.environ["SYNAPSE_CACHE_FACTOR"] = str(cache_factor)
action = sys.argv[1] if sys.argv[1:] else "usage"
if action == "start":
start(configfile)
elif action == "stop":
stop(pidfile)
elif action == "restart":
stop(pidfile)
start(configfile)
else:
sys.stderr.write("Usage: %s [start|stop|restart] [configfile]\n" % (sys.argv[0],))
worker_configfiles = []
if options.worker:
start_stop_synapse = False
worker_configfile = options.worker
if not os.path.exists(worker_configfile):
write(
"No worker config found at %r" % (worker_configfile,),
stream=sys.stderr,
)
sys.exit(1)
worker_configfiles.append(worker_configfile)
if options.all_processes:
worker_configdir = options.all_processes
if not os.path.isdir(worker_configdir):
write(
"No worker config directory found at %r" % (worker_configdir,),
stream=sys.stderr,
)
sys.exit(1)
worker_configfiles.extend(sorted(glob.glob(
os.path.join(worker_configdir, "*.yaml")
)))
workers = []
for worker_configfile in worker_configfiles:
with open(worker_configfile) as stream:
worker_config = yaml.load(stream)
worker_app = worker_config["worker_app"]
worker_pidfile = worker_config["worker_pid_file"]
worker_daemonize = worker_config["worker_daemonize"]
assert worker_daemonize # TODO print something more user friendly
worker_cache_factor = worker_config.get("synctl_cache_factor")
workers.append(Worker(
worker_app, worker_configfile, worker_pidfile, worker_cache_factor,
))
action = options.action
if action == "stop" or action == "restart":
for worker in workers:
stop(worker.pidfile, worker.app)
if start_stop_synapse:
stop(pidfile, "synapse.app.homeserver")
# TODO: Wait for synapse to actually shutdown before starting it again
if action == "start" or action == "restart":
if start_stop_synapse:
start(configfile)
for worker in workers:
if worker.cache_factor:
os.environ["SYNAPSE_CACHE_FACTOR"] = str(worker.cache_factor)
start_worker(worker.app, configfile, worker.configfile)
if cache_factor:
os.environ["SYNAPSE_CACHE_FACTOR"] = str(cache_factor)
else:
os.environ.pop("SYNAPSE_CACHE_FACTOR", None)
if __name__ == "__main__":