Fix occasional "Re-starting finished log context" from keyring (#8398)

* Fix test_verify_json_objects_for_server_awaits_previous_requests

It turns out that this wasn't really testing what it thought it was testing
(in particular, `check_context` was turning failures into success, which was
making the tests pass even though it wasn't clear they should have been.

It was also somewhat overcomplex - we can test what it was trying to test
without mocking out perspectives servers.

* Fix warnings about finished logcontexts in the keyring

We need to make sure that we finish the key fetching magic before we run the
verifying code, to ensure that we don't mess up our logcontexts.
This commit is contained in:
Richard van der Hoff 2020-09-25 12:29:54 +01:00 committed by GitHub
parent abd04b6af0
commit fec6f9ac17
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 101 additions and 90 deletions

1
changelog.d/8398.bugfix Normal file
View File

@ -0,0 +1 @@
Fix "Re-starting finished log context" warning when receiving an event we already had over federation.

View File

@ -42,7 +42,6 @@ from synapse.api.errors import (
) )
from synapse.logging.context import ( from synapse.logging.context import (
PreserveLoggingContext, PreserveLoggingContext,
current_context,
make_deferred_yieldable, make_deferred_yieldable,
preserve_fn, preserve_fn,
run_in_background, run_in_background,
@ -233,8 +232,6 @@ class Keyring:
""" """
try: try:
ctx = current_context()
# map from server name to a set of outstanding request ids # map from server name to a set of outstanding request ids
server_to_request_ids = {} server_to_request_ids = {}
@ -265,12 +262,8 @@ class Keyring:
# if there are no more requests for this server, we can drop the lock. # if there are no more requests for this server, we can drop the lock.
if not server_requests: if not server_requests:
with PreserveLoggingContext(ctx): logger.debug("Releasing key lookup lock on %s", server_name)
logger.debug("Releasing key lookup lock on %s", server_name) drop_server_lock(server_name)
# ... but not immediately, as that can cause stack explosions if
# we get a long queue of lookups.
self.clock.call_later(0, drop_server_lock, server_name)
return res return res
@ -335,20 +328,32 @@ class Keyring:
) )
# look for any requests which weren't satisfied # look for any requests which weren't satisfied
with PreserveLoggingContext(): while remaining_requests:
for verify_request in remaining_requests: verify_request = remaining_requests.pop()
verify_request.key_ready.errback( rq_str = (
SynapseError( "VerifyJsonRequest(server=%s, key_ids=%s, min_valid=%i)"
401, % (
"No key for %s with ids in %s (min_validity %i)" verify_request.server_name,
% ( verify_request.key_ids,
verify_request.server_name, verify_request.minimum_valid_until_ts,
verify_request.key_ids,
verify_request.minimum_valid_until_ts,
),
Codes.UNAUTHORIZED,
)
) )
)
# If we run the errback immediately, it may cancel our
# loggingcontext while we are still in it, so instead we
# schedule it for the next time round the reactor.
#
# (this also ensures that we don't get a stack overflow if we
# has a massive queue of lookups waiting for this server).
self.clock.call_later(
0,
verify_request.key_ready.errback,
SynapseError(
401,
"Failed to find any key to satisfy %s" % (rq_str,),
Codes.UNAUTHORIZED,
),
)
except Exception as err: except Exception as err:
# we don't really expect to get here, because any errors should already # we don't really expect to get here, because any errors should already
# have been caught and logged. But if we do, let's log the error and make # have been caught and logged. But if we do, let's log the error and make
@ -410,10 +415,23 @@ class Keyring:
# key was not valid at this point # key was not valid at this point
continue continue
with PreserveLoggingContext(): # we have a valid key for this request. If we run the callback
verify_request.key_ready.callback( # immediately, it may cancel our loggingcontext while we are still in
(server_name, key_id, fetch_key_result.verify_key) # it, so instead we schedule it for the next time round the reactor.
) #
# (this also ensures that we don't get a stack overflow if we had
# a massive queue of lookups waiting for this server).
logger.debug(
"Found key %s:%s for %s",
server_name,
key_id,
verify_request.request_name,
)
self.clock.call_later(
0,
verify_request.key_ready.callback,
(server_name, key_id, fetch_key_result.verify_key),
)
completed.append(verify_request) completed.append(verify_request)
break break

View File

@ -23,6 +23,7 @@ from nacl.signing import SigningKey
from signedjson.key import encode_verify_key_base64, get_verify_key from signedjson.key import encode_verify_key_base64, get_verify_key
from twisted.internet import defer from twisted.internet import defer
from twisted.internet.defer import Deferred, ensureDeferred
from synapse.api.errors import SynapseError from synapse.api.errors import SynapseError
from synapse.crypto import keyring from synapse.crypto import keyring
@ -33,7 +34,6 @@ from synapse.crypto.keyring import (
) )
from synapse.logging.context import ( from synapse.logging.context import (
LoggingContext, LoggingContext,
PreserveLoggingContext,
current_context, current_context,
make_deferred_yieldable, make_deferred_yieldable,
) )
@ -68,54 +68,40 @@ class MockPerspectiveServer:
class KeyringTestCase(unittest.HomeserverTestCase): class KeyringTestCase(unittest.HomeserverTestCase):
def make_homeserver(self, reactor, clock): def check_context(self, val, expected):
self.mock_perspective_server = MockPerspectiveServer()
self.http_client = Mock()
config = self.default_config()
config["trusted_key_servers"] = [
{
"server_name": self.mock_perspective_server.server_name,
"verify_keys": self.mock_perspective_server.get_verify_keys(),
}
]
return self.setup_test_homeserver(
handlers=None, http_client=self.http_client, config=config
)
def check_context(self, _, expected):
self.assertEquals(getattr(current_context(), "request", None), expected) self.assertEquals(getattr(current_context(), "request", None), expected)
return val
def test_verify_json_objects_for_server_awaits_previous_requests(self): def test_verify_json_objects_for_server_awaits_previous_requests(self):
key1 = signedjson.key.generate_signing_key(1) mock_fetcher = keyring.KeyFetcher()
mock_fetcher.get_keys = Mock()
kr = keyring.Keyring(self.hs, key_fetchers=(mock_fetcher,))
kr = keyring.Keyring(self.hs) # a signed object that we are going to try to validate
key1 = signedjson.key.generate_signing_key(1)
json1 = {} json1 = {}
signedjson.sign.sign_json(json1, "server10", key1) signedjson.sign.sign_json(json1, "server10", key1)
persp_resp = { # start off a first set of lookups. We make the mock fetcher block until this
"server_keys": [ # deferred completes.
self.mock_perspective_server.get_signed_key( first_lookup_deferred = Deferred()
"server10", signedjson.key.get_verify_key(key1)
)
]
}
persp_deferred = defer.Deferred()
async def get_perspectives(**kwargs): async def first_lookup_fetch(keys_to_fetch):
self.assertEquals(current_context().request, "11") self.assertEquals(current_context().request, "context_11")
with PreserveLoggingContext(): self.assertEqual(keys_to_fetch, {"server10": {get_key_id(key1): 0}})
await persp_deferred
return persp_resp
self.http_client.post_json.side_effect = get_perspectives await make_deferred_yieldable(first_lookup_deferred)
return {
"server10": {
get_key_id(key1): FetchKeyResult(get_verify_key(key1), 100)
}
}
# start off a first set of lookups mock_fetcher.get_keys.side_effect = first_lookup_fetch
@defer.inlineCallbacks
def first_lookup(): async def first_lookup():
with LoggingContext("11") as context_11: with LoggingContext("context_11") as context_11:
context_11.request = "11" context_11.request = "context_11"
res_deferreds = kr.verify_json_objects_for_server( res_deferreds = kr.verify_json_objects_for_server(
[("server10", json1, 0, "test10"), ("server11", {}, 0, "test11")] [("server10", json1, 0, "test10"), ("server11", {}, 0, "test11")]
@ -124,7 +110,7 @@ class KeyringTestCase(unittest.HomeserverTestCase):
# the unsigned json should be rejected pretty quickly # the unsigned json should be rejected pretty quickly
self.assertTrue(res_deferreds[1].called) self.assertTrue(res_deferreds[1].called)
try: try:
yield res_deferreds[1] await res_deferreds[1]
self.assertFalse("unsigned json didn't cause a failure") self.assertFalse("unsigned json didn't cause a failure")
except SynapseError: except SynapseError:
pass pass
@ -132,45 +118,51 @@ class KeyringTestCase(unittest.HomeserverTestCase):
self.assertFalse(res_deferreds[0].called) self.assertFalse(res_deferreds[0].called)
res_deferreds[0].addBoth(self.check_context, None) res_deferreds[0].addBoth(self.check_context, None)
yield make_deferred_yieldable(res_deferreds[0]) await make_deferred_yieldable(res_deferreds[0])
# let verify_json_objects_for_server finish its work before we kill the d0 = ensureDeferred(first_lookup())
# logcontext
yield self.clock.sleep(0)
d0 = first_lookup() mock_fetcher.get_keys.assert_called_once()
# wait a tick for it to send the request to the perspectives server
# (it first tries the datastore)
self.pump()
self.http_client.post_json.assert_called_once()
# a second request for a server with outstanding requests # a second request for a server with outstanding requests
# should block rather than start a second call # should block rather than start a second call
@defer.inlineCallbacks
def second_lookup(): async def second_lookup_fetch(keys_to_fetch):
with LoggingContext("12") as context_12: self.assertEquals(current_context().request, "context_12")
context_12.request = "12" return {
self.http_client.post_json.reset_mock() "server10": {
self.http_client.post_json.return_value = defer.Deferred() get_key_id(key1): FetchKeyResult(get_verify_key(key1), 100)
}
}
mock_fetcher.get_keys.reset_mock()
mock_fetcher.get_keys.side_effect = second_lookup_fetch
second_lookup_state = [0]
async def second_lookup():
with LoggingContext("context_12") as context_12:
context_12.request = "context_12"
res_deferreds_2 = kr.verify_json_objects_for_server( res_deferreds_2 = kr.verify_json_objects_for_server(
[("server10", json1, 0, "test")] [("server10", json1, 0, "test")]
) )
res_deferreds_2[0].addBoth(self.check_context, None) res_deferreds_2[0].addBoth(self.check_context, None)
yield make_deferred_yieldable(res_deferreds_2[0]) second_lookup_state[0] = 1
await make_deferred_yieldable(res_deferreds_2[0])
second_lookup_state[0] = 2
# let verify_json_objects_for_server finish its work before we kill the d2 = ensureDeferred(second_lookup())
# logcontext
yield self.clock.sleep(0)
d2 = second_lookup()
self.pump() self.pump()
self.http_client.post_json.assert_not_called() # the second request should be pending, but the fetcher should not yet have been
# called
self.assertEqual(second_lookup_state[0], 1)
mock_fetcher.get_keys.assert_not_called()
# complete the first request # complete the first request
persp_deferred.callback(persp_resp) first_lookup_deferred.callback(None)
# and now both verifications should succeed.
self.get_success(d0) self.get_success(d0)
self.get_success(d2) self.get_success(d2)