Merge branch 'develop' into http_client_refactor

This commit is contained in:
David Baker 2014-11-20 17:49:48 +00:00
commit f1c7f8e813
35 changed files with 191 additions and 138 deletions

View File

@ -17,6 +17,8 @@
import logging
logger = logging.getLogger(__name__)
class Codes(object):
UNAUTHORIZED = "M_UNAUTHORIZED"
@ -38,7 +40,7 @@ class CodeMessageException(Exception):
"""An exception with integer code and message string attributes."""
def __init__(self, code, msg):
logging.error("%s: %s, %s", type(self).__name__, code, msg)
logger.info("%s: %s, %s", type(self).__name__, code, msg)
super(CodeMessageException, self).__init__("%d: %s" % (code, msg))
self.code = code
self.msg = msg
@ -140,7 +142,8 @@ def cs_exception(exception):
if isinstance(exception, CodeMessageException):
return exception.error_dict()
else:
logging.error("Unknown exception type: %s", type(exception))
logger.error("Unknown exception type: %s", type(exception))
return {}
def cs_error(msg, code=Codes.UNKNOWN, **kwargs):

View File

@ -83,6 +83,8 @@ class SynapseEvent(JsonEncodedObject):
"content",
]
outlier = False
def __init__(self, raises=True, **kwargs):
super(SynapseEvent, self).__init__(**kwargs)
# if "content" in kwargs:

View File

@ -116,7 +116,7 @@ class SynapseHomeServer(HomeServer):
# extra resources to existing nodes. See self._resource_id for the key.
resource_mappings = {}
for (full_path, resource) in desired_tree:
logging.info("Attaching %s to path %s", resource, full_path)
logger.info("Attaching %s to path %s", resource, full_path)
last_resource = self.root_resource
for path_seg in full_path.split('/')[1:-1]:
if not path_seg in last_resource.listNames():
@ -221,12 +221,12 @@ def setup():
db_name = hs.get_db_name()
logging.info("Preparing database: %s...", db_name)
logger.info("Preparing database: %s...", db_name)
with sqlite3.connect(db_name) as db_conn:
prepare_database(db_conn)
logging.info("Database prepared in %s.", db_name)
logger.info("Database prepared in %s.", db_name)
hs.get_db_pool()
@ -257,13 +257,16 @@ def setup():
else:
reactor.run()
def run():
with LoggingContext("run"):
reactor.run()
def main():
with LoggingContext("main"):
setup()
if __name__ == '__main__':
main()

View File

@ -27,6 +27,7 @@ PIDFILE="homeserver.pid"
GREEN = "\x1b[1;32m"
NORMAL = "\x1b[m"
def start():
if not os.path.exists(CONFIGFILE):
sys.stderr.write(
@ -43,12 +44,14 @@ def start():
subprocess.check_call(args)
print GREEN + "started" + NORMAL
def stop():
if os.path.exists(PIDFILE):
pid = int(open(PIDFILE).read())
os.kill(pid, signal.SIGTERM)
print GREEN + "stopped" + NORMAL
def main():
action = sys.argv[1] if sys.argv[1:] else "usage"
if action == "start":
@ -62,5 +65,6 @@ def main():
sys.stderr.write("Usage: %s [start|stop|restart]\n" % (sys.argv[0],))
sys.exit(1)
if __name__=='__main__':
if __name__ == "__main__":
main()

View File

@ -30,7 +30,7 @@ logger = logging.getLogger(__name__)
def check_event_content_hash(event, hash_algorithm=hashlib.sha256):
"""Check whether the hash for this PDU matches the contents"""
computed_hash = _compute_content_hash(event, hash_algorithm)
logging.debug("Expecting hash: %s", encode_base64(computed_hash.digest()))
logger.debug("Expecting hash: %s", encode_base64(computed_hash.digest()))
if computed_hash.name not in event.hashes:
raise SynapseError(
400,

View File

@ -135,7 +135,7 @@ class Keyring(object):
time_now_ms = self.clock.time_msec()
self.store.store_server_certificate(
yield self.store.store_server_certificate(
server_name,
server_name,
time_now_ms,
@ -143,7 +143,7 @@ class Keyring(object):
)
for key_id, key in verify_keys.items():
self.store.store_server_verify_key(
yield self.store.store_server_verify_key(
server_name, server_name, time_now_ms, key
)

View File

@ -24,6 +24,7 @@ from .units import Transaction, Edu
from .persistence import TransactionActions
from synapse.util.logutils import log_function
from synapse.util.logcontext import PreserveLoggingContext
import logging
@ -319,6 +320,7 @@ class ReplicationLayer(object):
logger.debug("[%s] Transacition is new", transaction.transaction_id)
with PreserveLoggingContext():
dl = []
for pdu in pdu_list:
dl.append(self._handle_new_pdu(transaction.origin, pdu))
@ -425,7 +427,9 @@ class ReplicationLayer(object):
time_now = self._clock.time_msec()
defer.returnValue((200, {
"state": [p.get_pdu_json(time_now) for p in res_pdus["state"]],
"auth_chain": [p.get_pdu_json(time_now) for p in res_pdus["auth_chain"]],
"auth_chain": [
p.get_pdu_json(time_now) for p in res_pdus["auth_chain"]
],
}))
@defer.inlineCallbacks
@ -436,7 +440,9 @@ class ReplicationLayer(object):
(
200,
{
"auth_chain": [a.get_pdu_json(time_now) for a in auth_pdus],
"auth_chain": [
a.get_pdu_json(time_now) for a in auth_pdus
],
}
)
)
@ -649,6 +655,7 @@ class _TransactionQueue(object):
(pdu, deferred, order)
)
with PreserveLoggingContext():
self._attempt_new_transaction(destination)
deferreds.append(deferred)
@ -669,6 +676,8 @@ class _TransactionQueue(object):
deferred.errback(failure)
else:
logger.exception("Failed to send edu", failure)
with PreserveLoggingContext():
self._attempt_new_transaction(destination).addErrback(eb)
return deferred

View File

@ -25,7 +25,6 @@ import logging
logger = logging.getLogger(__name__)
class Edu(JsonEncodedObject):
""" An Edu represents a piece of data sent from one homeserver to another.

View File

@ -112,7 +112,7 @@ class BaseHandler(object):
event.destinations = list(destinations)
self.notifier.on_new_room_event(event, extra_users=extra_users)
yield self.notifier.on_new_room_event(event, extra_users=extra_users)
federation_handler = self.hs.get_handlers().federation_handler
yield federation_handler.handle_new_event(event, snapshot)

View File

@ -128,8 +128,9 @@ class DirectoryHandler(BaseHandler):
"servers": result.servers,
})
else:
raise SynapseError(404, "Room alias \"%s\" not found" % (room_alias,))
raise SynapseError(
404, "Room alias \"%s\" not found" % (room_alias,)
)
@defer.inlineCallbacks
def send_room_alias_update_event(self, user_id, room_id):

View File

@ -56,7 +56,7 @@ class EventStreamHandler(BaseHandler):
self.clock.cancel_call_later(
self._stop_timer_per_user.pop(auth_user))
else:
self.distributor.fire(
yield self.distributor.fire(
"started_user_eventstream", auth_user
)
self._streams_per_user[auth_user] += 1
@ -65,8 +65,10 @@ class EventStreamHandler(BaseHandler):
pagin_config.from_token = None
rm_handler = self.hs.get_handlers().room_member_handler
logger.debug("BETA")
room_ids = yield rm_handler.get_rooms_for_user(auth_user)
logger.debug("ALPHA")
with PreserveLoggingContext():
events, tokens = yield self.notifier.get_events_for(
auth_user, room_ids, pagin_config, timeout
@ -93,7 +95,7 @@ class EventStreamHandler(BaseHandler):
logger.debug(
"_later stopped_user_eventstream %s", auth_user
)
self.distributor.fire(
yield self.distributor.fire(
"stopped_user_eventstream", auth_user
)
del self._stop_timer_per_user[auth_user]

View File

@ -122,7 +122,8 @@ class FederationHandler(BaseHandler):
event.origin, redacted_pdu_json
)
except SynapseError as e:
logger.warn("Signature check failed for %s redacted to %s",
logger.warn(
"Signature check failed for %s redacted to %s",
encode_canonical_json(pdu.get_pdu_json()),
encode_canonical_json(redacted_pdu_json),
)
@ -209,7 +210,7 @@ class FederationHandler(BaseHandler):
if event.type == RoomMemberEvent.TYPE:
if event.membership == Membership.JOIN:
user = self.hs.parse_userid(event.state_key)
self.distributor.fire(
yield self.distributor.fire(
"user_joined_room", user=user, room_id=event.room_id
)
@ -390,7 +391,8 @@ class FederationHandler(BaseHandler):
event.outlier = False
is_new_state = yield self.state_handler.annotate_event_with_state(event)
state_handler = self.state_handler
is_new_state = yield state_handler.annotate_event_with_state(event)
self.auth.check(event, raises=True)
# FIXME (erikj): All this is duplicated above :(
@ -414,7 +416,7 @@ class FederationHandler(BaseHandler):
if event.type == RoomMemberEvent.TYPE:
if event.membership == Membership.JOIN:
user = self.hs.parse_userid(event.state_key)
self.distributor.fire(
yield self.distributor.fire(
"user_joined_room", user=user, room_id=event.room_id
)

View File

@ -18,6 +18,7 @@ from twisted.internet import defer
from synapse.api.constants import Membership
from synapse.api.errors import RoomError
from synapse.streams.config import PaginationConfig
from synapse.util.logcontext import PreserveLoggingContext
from ._base import BaseHandler
import logging
@ -86,6 +87,7 @@ class MessageHandler(BaseHandler):
event, snapshot, suppress_auth=suppress_auth
)
with PreserveLoggingContext():
self.hs.get_handlers().presence_handler.bump_presence_active_time(
user
)
@ -340,8 +342,8 @@ class MessageHandler(BaseHandler):
)
presence.append(member_presence)
except Exception:
logger.exception("Failed to get member presence of %r",
m.user_id
logger.exception(
"Failed to get member presence of %r", m.user_id
)
defer.returnValue({

View File

@ -19,6 +19,7 @@ from synapse.api.errors import SynapseError, AuthError
from synapse.api.constants import PresenceState
from synapse.util.logutils import log_function
from synapse.util.logcontext import PreserveLoggingContext
from ._base import BaseHandler
@ -142,7 +143,7 @@ class PresenceHandler(BaseHandler):
return UserPresenceCache()
def registered_user(self, user):
self.store.create_presence(user.localpart)
return self.store.create_presence(user.localpart)
@defer.inlineCallbacks
def is_presence_visible(self, observer_user, observed_user):
@ -241,14 +242,12 @@ class PresenceHandler(BaseHandler):
was_level = self.STATE_LEVELS[statuscache.get_state()["presence"]]
now_level = self.STATE_LEVELS[state["presence"]]
yield defer.DeferredList([
self.store.set_presence_state(
yield self.store.set_presence_state(
target_user.localpart, state_to_store
),
self.distributor.fire(
)
yield self.distributor.fire(
"collect_presencelike_data", target_user, state
),
])
)
if now_level > was_level:
state["last_active"] = self.clock.time_msec()
@ -256,6 +255,7 @@ class PresenceHandler(BaseHandler):
now_online = state["presence"] != PresenceState.OFFLINE
was_polling = target_user in self._user_cachemap
with PreserveLoggingContext():
if now_online and not was_polling:
self.start_polling_presence(target_user, state=state)
elif not now_online and was_polling:
@ -277,7 +277,7 @@ class PresenceHandler(BaseHandler):
self._user_cachemap_latest_serial += 1
statuscache.update(state, serial=self._user_cachemap_latest_serial)
self.push_presence(user, statuscache=statuscache)
return self.push_presence(user, statuscache=statuscache)
@log_function
def started_user_eventstream(self, user):
@ -381,8 +381,10 @@ class PresenceHandler(BaseHandler):
yield self.store.set_presence_list_accepted(
observer_user.localpart, observed_user.to_string()
)
self.start_polling_presence(observer_user, target_user=observed_user)
with PreserveLoggingContext():
self.start_polling_presence(
observer_user, target_user=observed_user
)
@defer.inlineCallbacks
def deny_presence(self, observed_user, observer_user):
@ -401,7 +403,10 @@ class PresenceHandler(BaseHandler):
observer_user.localpart, observed_user.to_string()
)
self.stop_polling_presence(observer_user, target_user=observed_user)
with PreserveLoggingContext():
self.stop_polling_presence(
observer_user, target_user=observed_user
)
@defer.inlineCallbacks
def get_presence_list(self, observer_user, accepted=None):
@ -710,6 +715,7 @@ class PresenceHandler(BaseHandler):
if not self._remote_sendmap[user]:
del self._remote_sendmap[user]
with PreserveLoggingContext():
yield defer.DeferredList(deferreds)
@defer.inlineCallbacks

View File

@ -17,6 +17,7 @@ from twisted.internet import defer
from synapse.api.errors import SynapseError, AuthError, CodeMessageException
from synapse.api.constants import Membership
from synapse.util.logcontext import PreserveLoggingContext
from ._base import BaseHandler
@ -46,7 +47,7 @@ class ProfileHandler(BaseHandler):
)
def registered_user(self, user):
self.store.create_profile(user.localpart)
return self.store.create_profile(user.localpart)
@defer.inlineCallbacks
def get_displayname(self, target_user):
@ -152,6 +153,7 @@ class ProfileHandler(BaseHandler):
if not user.is_mine:
defer.returnValue(None)
with PreserveLoggingContext():
(displayname, avatar_url) = yield defer.gatherResults(
[
self.store.get_profile_displayname(user.localpart),

View File

@ -69,7 +69,7 @@ class RegistrationHandler(BaseHandler):
password_hash=password_hash
)
self.distributor.fire("registered_user", user)
yield self.distributor.fire("registered_user", user)
else:
# autogen a random user ID
attempts = 0

View File

@ -178,7 +178,9 @@ class RoomCreationHandler(BaseHandler):
if room_alias:
result["room_alias"] = room_alias.to_string()
directory_handler.send_room_alias_update_event(user_id, room_id)
yield directory_handler.send_room_alias_update_event(
user_id, room_id
)
defer.returnValue(result)
@ -211,7 +213,6 @@ class RoomCreationHandler(BaseHandler):
**event_keys
)
power_levels_event = self.event_factory.create_event(
etype=RoomPowerLevelsEvent.TYPE,
content={
@ -480,7 +481,7 @@ class RoomMemberHandler(BaseHandler):
)
user = self.hs.parse_userid(event.user_id)
self.distributor.fire(
yield self.distributor.fire(
"user_joined_room", user=user, room_id=room_id
)

View File

@ -131,11 +131,13 @@ class ContentRepoResource(resource.Resource):
request.setHeader('Content-Type', content_type)
# cache for at least a day.
# XXX: we might want to turn this off for data we don't want to recommend
# caching as it's sensitive or private - or at least select private.
# don't bother setting Expires as all our matrix clients are smart enough to
# be happy with Cache-Control (right?)
request.setHeader('Cache-Control', 'public,max-age=86400,s-maxage=86400')
# XXX: we might want to turn this off for data we don't want to
# recommend caching as it's sensitive or private - or at least
# select private. don't bother setting Expires as all our matrix
# clients are smart enough to be happy with Cache-Control (right?)
request.setHeader(
"Cache-Control", "public,max-age=86400,s-maxage=86400"
)
d = FileSender().beginFileTransfer(f, request)

View File

@ -138,8 +138,7 @@ class JsonResource(HttpServer, resource.Resource):
)
except CodeMessageException as e:
if isinstance(e, SynapseError):
logger.error("%s SynapseError: %s - %s", request, e.code,
e.msg)
logger.info("%s SynapseError: %s - %s", request, e.code, e.msg)
else:
logger.exception(e)
self._send_response(

View File

@ -17,6 +17,7 @@ from twisted.internet import defer
from synapse.util.logutils import log_function
from synapse.util.logcontext import PreserveLoggingContext
from synapse.util.async import run_on_reactor
import logging
@ -96,6 +97,7 @@ class Notifier(object):
listening to the room, and any listeners for the users in the
`extra_users` param.
"""
yield run_on_reactor()
room_id = event.room_id
room_source = self.event_sources.sources["room"]
@ -143,6 +145,7 @@ class Notifier(object):
Will wake up all listeners for the given users and rooms.
"""
yield run_on_reactor()
presence_source = self.event_sources.sources["presence"]
listeners = set()
@ -211,6 +214,7 @@ class Notifier(object):
timeout,
deferred,
)
def _timeout_listener():
# TODO (erikj): We should probably set to_token to the current
# max rather than reusing from_token.

View File

@ -26,7 +26,6 @@ import logging
logger = logging.getLogger(__name__)
class EventStreamRestServlet(RestServlet):
PATTERN = client_path_pattern("/events$")

View File

@ -117,8 +117,6 @@ class PresenceListRestServlet(RestServlet):
logger.exception("JSON parse error")
raise SynapseError(400, "Unable to parse content")
deferreds = []
if "invite" in content:
for u in content["invite"]:
if not isinstance(u, basestring):
@ -126,8 +124,9 @@ class PresenceListRestServlet(RestServlet):
if len(u) == 0:
continue
invited_user = self.hs.parse_userid(u)
deferreds.append(self.handlers.presence_handler.send_invite(
observer_user=user, observed_user=invited_user))
yield self.handlers.presence_handler.send_invite(
observer_user=user, observed_user=invited_user
)
if "drop" in content:
for u in content["drop"]:
@ -136,10 +135,9 @@ class PresenceListRestServlet(RestServlet):
if len(u) == 0:
continue
dropped_user = self.hs.parse_userid(u)
deferreds.append(self.handlers.presence_handler.drop(
observer_user=user, observed_user=dropped_user))
yield defer.DeferredList(deferreds)
yield self.handlers.presence_handler.drop(
observer_user=user, observed_user=dropped_user
)
defer.returnValue((200, {}))

View File

@ -508,7 +508,7 @@ def prepare_database(db_conn):
"new for the server to understand"
)
elif user_version < SCHEMA_VERSION:
logging.info(
logger.info(
"Upgrading database from version %d",
user_version
)

View File

@ -57,7 +57,7 @@ class LoggingTransaction(object):
if args and args[0]:
values = args[0]
sql_logger.debug(
"[SQL values] {%s} " + ", ".join(("<%s>",) * len(values)),
"[SQL values] {%s} " + ", ".join(("<%r>",) * len(values)),
self.name,
*values
)
@ -91,6 +91,7 @@ class SQLBaseStore(object):
def runInteraction(self, desc, func, *args, **kwargs):
"""Wraps the .runInteraction() method on the underlying db_pool."""
current_context = LoggingContext.current_context()
def inner_func(txn, *args, **kwargs):
with LoggingContext("runInteraction") as context:
current_context.copy_to(context)
@ -115,7 +116,6 @@ class SQLBaseStore(object):
"[TXN END] {%s} %f",
name, end - start
)
with PreserveLoggingContext():
result = yield self._db_pool.runInteraction(
inner_func, *args, **kwargs

View File

@ -75,7 +75,9 @@ class RegistrationStore(SQLBaseStore):
"VALUES (?,?,?)",
[user_id, password_hash, now])
except IntegrityError:
raise StoreError(400, "User ID already taken.", errcode=Codes.USER_IN_USE)
raise StoreError(
400, "User ID already taken.", errcode=Codes.USER_IN_USE
)
# it's possible for this to get a conflict, but only for a single user
# since tokens are namespaced based on their user ID

View File

@ -27,7 +27,9 @@ import logging
logger = logging.getLogger(__name__)
OpsLevel = collections.namedtuple("OpsLevel", ("ban_level", "kick_level", "redact_level"))
OpsLevel = collections.namedtuple("OpsLevel", (
"ban_level", "kick_level", "redact_level")
)
class RoomStore(SQLBaseStore):

View File

@ -177,8 +177,8 @@ class RoomMemberStore(SQLBaseStore):
return self._get_members_query(clause, vals)
def _get_members_query(self, where_clause, where_values):
return self._db_pool.runInteraction(
self._get_members_query_txn,
return self.runInteraction(
"get_members_query", self._get_members_query_txn,
where_clause, where_values
)

View File

@ -237,7 +237,8 @@ class StreamStore(SQLBaseStore):
sql = (
"SELECT *, (%(redacted)s) AS redacted FROM events"
" WHERE outlier = 0 AND room_id = ? AND %(bounds)s"
"ORDER BY topological_ordering %(order)s, stream_ordering %(order)s %(limit)s "
" ORDER BY topological_ordering %(order)s,"
" stream_ordering %(order)s %(limit)s"
) % {
"redacted": del_sql,
"bounds": bounds,

View File

@ -37,6 +37,7 @@ class Clock(object):
def call_later(self, delay, callback):
current_context = LoggingContext.current_context()
def wrapped_callback():
LoggingContext.thread_local.current_context = current_context
callback()

View File

@ -18,6 +18,7 @@ from twisted.internet import defer, reactor
from .logcontext import PreserveLoggingContext
@defer.inlineCallbacks
def sleep(seconds):
d = defer.Deferred()
@ -25,6 +26,7 @@ def sleep(seconds):
with PreserveLoggingContext():
yield d
def run_on_reactor():
""" This will cause the rest of the function to be invoked upon the next
iteration of the main loop

View File

@ -13,6 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from synapse.util.logcontext import PreserveLoggingContext
from twisted.internet import defer
import logging
@ -91,6 +93,7 @@ class Signal(object):
Each observer callable may return a Deferred."""
self.observers.append(observer)
@defer.inlineCallbacks
def fire(self, *args, **kwargs):
"""Invokes every callable in the observer list, passing in the args and
kwargs. Exceptions thrown by observers are logged but ignored. It is
@ -98,6 +101,7 @@ class Signal(object):
Returns a Deferred that will complete when all the observers have
completed."""
with PreserveLoggingContext():
deferreds = []
for observer in self.observers:
d = defer.maybeDeferred(observer, *args, **kwargs)
@ -114,6 +118,7 @@ class Signal(object):
raise failure
deferreds.append(d.addErrback(eb))
return defer.DeferredList(
result = yield defer.DeferredList(
deferreds, fireOnOneErrback=not self.suppress_failures
)
defer.returnValue(result)

View File

@ -1,6 +1,8 @@
import threading
import logging
logger = logging.getLogger(__name__)
class LoggingContext(object):
"""Additional context for log formatting. Contexts are scoped within a
@ -53,7 +55,7 @@ class LoggingContext(object):
None to avoid suppressing any exeptions that were thrown.
"""
if self.thread_local.current_context is not self:
logging.error(
logger.error(
"Current logging context %s is not the expected context %s",
self.thread_local.current_context,
self