mirror of
https://git.anonymousland.org/anonymousland/synapse-product.git
synced 2024-12-11 11:34:23 -05:00
Add basic admin cmd app
This commit is contained in:
parent
65dd5543f6
commit
9f3c0a8556
@ -48,7 +48,7 @@ def register_sighup(func):
|
|||||||
_sighup_callbacks.append(func)
|
_sighup_callbacks.append(func)
|
||||||
|
|
||||||
|
|
||||||
def start_worker_reactor(appname, config):
|
def start_worker_reactor(appname, config, run_command=reactor.run):
|
||||||
""" Run the reactor in the main process
|
""" Run the reactor in the main process
|
||||||
|
|
||||||
Daemonizes if necessary, and then configures some resources, before starting
|
Daemonizes if necessary, and then configures some resources, before starting
|
||||||
@ -57,6 +57,7 @@ def start_worker_reactor(appname, config):
|
|||||||
Args:
|
Args:
|
||||||
appname (str): application name which will be sent to syslog
|
appname (str): application name which will be sent to syslog
|
||||||
config (synapse.config.Config): config object
|
config (synapse.config.Config): config object
|
||||||
|
run_command (Callable[]): callable that actually runs the reactor
|
||||||
"""
|
"""
|
||||||
|
|
||||||
logger = logging.getLogger(config.worker_app)
|
logger = logging.getLogger(config.worker_app)
|
||||||
@ -69,11 +70,19 @@ def start_worker_reactor(appname, config):
|
|||||||
daemonize=config.worker_daemonize,
|
daemonize=config.worker_daemonize,
|
||||||
print_pidfile=config.print_pidfile,
|
print_pidfile=config.print_pidfile,
|
||||||
logger=logger,
|
logger=logger,
|
||||||
|
run_command=run_command,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def start_reactor(
|
def start_reactor(
|
||||||
appname, soft_file_limit, gc_thresholds, pid_file, daemonize, print_pidfile, logger
|
appname,
|
||||||
|
soft_file_limit,
|
||||||
|
gc_thresholds,
|
||||||
|
pid_file,
|
||||||
|
daemonize,
|
||||||
|
print_pidfile,
|
||||||
|
logger,
|
||||||
|
run_command=reactor.run,
|
||||||
):
|
):
|
||||||
""" Run the reactor in the main process
|
""" Run the reactor in the main process
|
||||||
|
|
||||||
@ -88,6 +97,7 @@ def start_reactor(
|
|||||||
daemonize (bool): true to run the reactor in a background process
|
daemonize (bool): true to run the reactor in a background process
|
||||||
print_pidfile (bool): whether to print the pid file, if daemonize is True
|
print_pidfile (bool): whether to print the pid file, if daemonize is True
|
||||||
logger (logging.Logger): logger instance to pass to Daemonize
|
logger (logging.Logger): logger instance to pass to Daemonize
|
||||||
|
run_command (Callable[]): callable that actually runs the reactor
|
||||||
"""
|
"""
|
||||||
|
|
||||||
install_dns_limiter(reactor)
|
install_dns_limiter(reactor)
|
||||||
@ -103,7 +113,8 @@ def start_reactor(
|
|||||||
change_resource_limit(soft_file_limit)
|
change_resource_limit(soft_file_limit)
|
||||||
if gc_thresholds:
|
if gc_thresholds:
|
||||||
gc.set_threshold(*gc_thresholds)
|
gc.set_threshold(*gc_thresholds)
|
||||||
reactor.run()
|
|
||||||
|
run_command()
|
||||||
|
|
||||||
if daemonize:
|
if daemonize:
|
||||||
if print_pidfile:
|
if print_pidfile:
|
||||||
|
198
synapse/app/admin_cmd.py
Normal file
198
synapse/app/admin_cmd.py
Normal file
@ -0,0 +1,198 @@
|
|||||||
|
#!/usr/bin/env python
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
# Copyright 2016 OpenMarket Ltd
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
import logging
|
||||||
|
import sys
|
||||||
|
|
||||||
|
from twisted.internet import defer, task
|
||||||
|
|
||||||
|
import synapse
|
||||||
|
from synapse.app import _base
|
||||||
|
from synapse.config._base import ConfigError
|
||||||
|
from synapse.config.homeserver import HomeServerConfig
|
||||||
|
from synapse.config.logger import setup_logging
|
||||||
|
from synapse.handlers.admin import FileExfiltrationWriter
|
||||||
|
from synapse.replication.slave.storage._base import BaseSlavedStore
|
||||||
|
from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
|
||||||
|
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
|
||||||
|
from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
|
||||||
|
from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore
|
||||||
|
from synapse.replication.slave.storage.devices import SlavedDeviceStore
|
||||||
|
from synapse.replication.slave.storage.events import SlavedEventStore
|
||||||
|
from synapse.replication.slave.storage.filtering import SlavedFilteringStore
|
||||||
|
from synapse.replication.slave.storage.groups import SlavedGroupServerStore
|
||||||
|
from synapse.replication.slave.storage.presence import SlavedPresenceStore
|
||||||
|
from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
|
||||||
|
from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
|
||||||
|
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
|
||||||
|
from synapse.replication.slave.storage.room import RoomStore
|
||||||
|
from synapse.replication.tcp.client import ReplicationClientHandler
|
||||||
|
from synapse.server import HomeServer
|
||||||
|
from synapse.storage.engines import create_engine
|
||||||
|
from synapse.util.logcontext import LoggingContext
|
||||||
|
from synapse.util.versionstring import get_version_string
|
||||||
|
|
||||||
|
logger = logging.getLogger("synapse.app.admin_cmd")
|
||||||
|
|
||||||
|
|
||||||
|
class AdminCmdSlavedStore(
|
||||||
|
SlavedReceiptsStore,
|
||||||
|
SlavedAccountDataStore,
|
||||||
|
SlavedApplicationServiceStore,
|
||||||
|
SlavedRegistrationStore,
|
||||||
|
SlavedFilteringStore,
|
||||||
|
SlavedPresenceStore,
|
||||||
|
SlavedGroupServerStore,
|
||||||
|
SlavedDeviceInboxStore,
|
||||||
|
SlavedDeviceStore,
|
||||||
|
SlavedPushRuleStore,
|
||||||
|
SlavedEventStore,
|
||||||
|
SlavedClientIpStore,
|
||||||
|
RoomStore,
|
||||||
|
BaseSlavedStore,
|
||||||
|
):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class AdminCmdServer(HomeServer):
|
||||||
|
DATASTORE_CLASS = AdminCmdSlavedStore
|
||||||
|
|
||||||
|
def _listen_http(self, listener_config):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def start_listening(self, listeners):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def build_tcp_replication(self):
|
||||||
|
return AdminCmdReplicationHandler(self)
|
||||||
|
|
||||||
|
|
||||||
|
class AdminCmdReplicationHandler(ReplicationClientHandler):
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def on_rdata(self, stream_name, token, rows):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def get_streams_to_replicate(self):
|
||||||
|
return {}
|
||||||
|
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def export_data_command(hs, user_id, directory):
|
||||||
|
"""Export data for a user.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
user_id (str)
|
||||||
|
directory (str|None): Directory to write output to. Will create a temp
|
||||||
|
directory if not specified.
|
||||||
|
"""
|
||||||
|
|
||||||
|
res = yield hs.get_handlers().admin_handler.exfiltrate_user_data(
|
||||||
|
user_id, FileExfiltrationWriter(user_id, directory=directory)
|
||||||
|
)
|
||||||
|
print(res)
|
||||||
|
|
||||||
|
|
||||||
|
def start(config_options):
|
||||||
|
parser = HomeServerConfig.create_argument_parser("Synapse Admin Command")
|
||||||
|
|
||||||
|
subparser = parser.add_subparsers(
|
||||||
|
title="Admin Commands",
|
||||||
|
description="Choose and admin command to perform.",
|
||||||
|
required=True,
|
||||||
|
dest="command",
|
||||||
|
metavar="<admin_command>",
|
||||||
|
help="The admin command to perform.",
|
||||||
|
)
|
||||||
|
export_data_parser = subparser.add_parser(
|
||||||
|
"export-data", help="Export all data for a user"
|
||||||
|
)
|
||||||
|
export_data_parser.add_argument("user_id", help="User to extra data from")
|
||||||
|
export_data_parser.add_argument(
|
||||||
|
"--output-directory",
|
||||||
|
action="store",
|
||||||
|
metavar="DIRECTORY",
|
||||||
|
required=False,
|
||||||
|
help="The directory to store the exported data in. Must be emtpy. Defaults"
|
||||||
|
" to creating a temp directory.",
|
||||||
|
)
|
||||||
|
|
||||||
|
try:
|
||||||
|
config, args = HomeServerConfig.load_config_with_parser(parser, config_options)
|
||||||
|
except ConfigError as e:
|
||||||
|
sys.stderr.write("\n" + str(e) + "\n")
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
if config.worker_app is not None:
|
||||||
|
assert config.worker_app == "synapse.app.admin_cmd"
|
||||||
|
|
||||||
|
# Update the config with some basic overrides so that don't have to specify
|
||||||
|
# a full worker config.
|
||||||
|
config.worker_app = "synapse.app.admin_cmd"
|
||||||
|
|
||||||
|
if (
|
||||||
|
not config.worker_daemonize
|
||||||
|
and not config.worker_log_file
|
||||||
|
and not config.worker_log_config
|
||||||
|
):
|
||||||
|
# Since we're meant to be run as a "command" let's not redirect stdio
|
||||||
|
# unless we've actually set log config.
|
||||||
|
config.no_redirect_stdio = True
|
||||||
|
|
||||||
|
# Explicitly disable background processes
|
||||||
|
config.update_user_directory = False
|
||||||
|
config.start_pushers = False
|
||||||
|
config.send_federation = False
|
||||||
|
|
||||||
|
setup_logging(config, use_worker_options=True)
|
||||||
|
|
||||||
|
synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts
|
||||||
|
|
||||||
|
database_engine = create_engine(config.database_config)
|
||||||
|
|
||||||
|
ss = AdminCmdServer(
|
||||||
|
config.server_name,
|
||||||
|
db_config=config.database_config,
|
||||||
|
config=config,
|
||||||
|
version_string="Synapse/" + get_version_string(synapse),
|
||||||
|
database_engine=database_engine,
|
||||||
|
)
|
||||||
|
|
||||||
|
ss.setup()
|
||||||
|
|
||||||
|
if args.command == "export-data":
|
||||||
|
command = lambda: export_data_command(ss, args.user_id, args.output_directory)
|
||||||
|
else:
|
||||||
|
# This shouldn't happen.
|
||||||
|
raise ConfigError("Unknown admin command %s" % (args.command,))
|
||||||
|
|
||||||
|
# We use task.react as the basic run command as it correctly handles tearing
|
||||||
|
# down the reactor when the deferreds resolve and setting the return value.
|
||||||
|
# We also make sure that `_base.start` gets run before we actually run the
|
||||||
|
# command.
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def run(_reactor):
|
||||||
|
with LoggingContext("command"):
|
||||||
|
yield _base.start(ss, [])
|
||||||
|
yield command()
|
||||||
|
|
||||||
|
_base.start_worker_reactor(
|
||||||
|
"synapse-admin-cmd", config, run_command=lambda: task.react(run)
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
with LoggingContext("main"):
|
||||||
|
start(sys.argv[1:])
|
@ -201,6 +201,26 @@ class Config(object):
|
|||||||
|
|
||||||
Returns: Config object.
|
Returns: Config object.
|
||||||
"""
|
"""
|
||||||
|
config_parser = cls.create_argument_parser(description)
|
||||||
|
obj, _ = cls.load_config_with_parser(config_parser, argv)
|
||||||
|
|
||||||
|
return obj
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def create_argument_parser(cls, description):
|
||||||
|
"""Create an ArgumentParser instance with all the config flags.
|
||||||
|
|
||||||
|
Doesn't support config-file-generation: used by the worker apps.
|
||||||
|
|
||||||
|
Used for workers where we want to add extra flags/subcommands.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
description (str): App description
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
ArgumentParser
|
||||||
|
"""
|
||||||
|
|
||||||
config_parser = argparse.ArgumentParser(description=description)
|
config_parser = argparse.ArgumentParser(description=description)
|
||||||
config_parser.add_argument(
|
config_parser.add_argument(
|
||||||
"-c",
|
"-c",
|
||||||
@ -219,9 +239,31 @@ class Config(object):
|
|||||||
" Defaults to the directory containing the last config file",
|
" Defaults to the directory containing the last config file",
|
||||||
)
|
)
|
||||||
|
|
||||||
obj = cls()
|
# We can only invoke `add_arguments` on an actual object, but
|
||||||
|
# `add_arguments` should be side effect free so this is probably fine.
|
||||||
|
cls().invoke_all("add_arguments", config_parser)
|
||||||
|
|
||||||
obj.invoke_all("add_arguments", config_parser)
|
return config_parser
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def load_config_with_parser(cls, config_parser, argv):
|
||||||
|
"""Parse the commandline and config files with the given parser
|
||||||
|
|
||||||
|
Doesn't support config-file-generation: used by the worker apps.
|
||||||
|
|
||||||
|
Used for workers where we want to add extra flags/subcommands.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
conifg_parser (ArgumentParser)
|
||||||
|
argv (list[str])
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
tuple[HomeServerConfig, argparse.Namespace]: Returns the parsed
|
||||||
|
config object and the parsed argparse.Namespace object from
|
||||||
|
`config_parser.parse_args(..)`
|
||||||
|
"""
|
||||||
|
|
||||||
|
obj = cls()
|
||||||
|
|
||||||
config_args = config_parser.parse_args(argv)
|
config_args = config_parser.parse_args(argv)
|
||||||
|
|
||||||
@ -244,7 +286,7 @@ class Config(object):
|
|||||||
|
|
||||||
obj.invoke_all("read_arguments", config_args)
|
obj.invoke_all("read_arguments", config_args)
|
||||||
|
|
||||||
return obj
|
return obj, config_args
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def load_or_generate_config(cls, description, argv):
|
def load_or_generate_config(cls, description, argv):
|
||||||
|
Loading…
Reference in New Issue
Block a user