diff --git a/changelog.d/12353.misc b/changelog.d/12353.misc new file mode 100644 index 000000000..1d681fb0e --- /dev/null +++ b/changelog.d/12353.misc @@ -0,0 +1 @@ +Convert `Linearizer` tests from `inlineCallbacks` to async. diff --git a/tests/util/test_linearizer.py b/tests/util/test_linearizer.py index c4a3917b2..fa132391a 100644 --- a/tests/util/test_linearizer.py +++ b/tests/util/test_linearizer.py @@ -13,160 +13,202 @@ # See the License for the specific language governing permissions and # limitations under the License. +from typing import Callable, Hashable, Tuple + from twisted.internet import defer, reactor -from twisted.internet.defer import CancelledError +from twisted.internet.base import ReactorBase +from twisted.internet.defer import CancelledError, Deferred from synapse.logging.context import LoggingContext, current_context -from synapse.util import Clock from synapse.util.async_helpers import Linearizer from tests import unittest class LinearizerTestCase(unittest.TestCase): - @defer.inlineCallbacks - def test_linearizer(self): + def _start_task( + self, linearizer: Linearizer, key: Hashable + ) -> Tuple["Deferred[None]", "Deferred[None]", Callable[[], None]]: + """Starts a task which acquires the linearizer lock, blocks, then completes. + + Args: + linearizer: The `Linearizer`. + key: The `Linearizer` key. + + Returns: + A tuple containing: + * A cancellable `Deferred` for the entire task. + * A `Deferred` that resolves once the task acquires the lock. + * A function that unblocks the task. Must be called by the caller + to allow the task to release the lock and complete. + """ + acquired_d: "Deferred[None]" = Deferred() + unblock_d: "Deferred[None]" = Deferred() + + async def task() -> None: + with await linearizer.queue(key): + acquired_d.callback(None) + await unblock_d + + d = defer.ensureDeferred(task()) + + def unblock() -> None: + unblock_d.callback(None) + # The next task, if it exists, will acquire the lock and require a kick of + # the reactor to advance. + self._pump() + + return d, acquired_d, unblock + + def _pump(self) -> None: + """Pump the reactor to advance `Linearizer`s.""" + assert isinstance(reactor, ReactorBase) + while reactor.getDelayedCalls(): + reactor.runUntilCurrent() + + def test_linearizer(self) -> None: + """Tests that a task is queued up behind an earlier task.""" linearizer = Linearizer() key = object() - d1 = linearizer.queue(key) - cm1 = yield d1 + _, acquired_d1, unblock1 = self._start_task(linearizer, key) + self.assertTrue(acquired_d1.called) - d2 = linearizer.queue(key) - self.assertFalse(d2.called) + _, acquired_d2, unblock2 = self._start_task(linearizer, key) + self.assertFalse(acquired_d2.called) - with cm1: - self.assertFalse(d2.called) + # Once the first task is done, the second task can continue. + unblock1() + self.assertTrue(acquired_d2.called) - with (yield d2): - pass + unblock2() - @defer.inlineCallbacks - def test_linearizer_is_queued(self): + def test_linearizer_is_queued(self) -> None: + """Tests `Linearizer.is_queued`. + + Runs through the same scenario as `test_linearizer`. + """ linearizer = Linearizer() key = object() - d1 = linearizer.queue(key) - cm1 = yield d1 + _, acquired_d1, unblock1 = self._start_task(linearizer, key) + self.assertTrue(acquired_d1.called) - # Since d1 gets called immediately, "is_queued" should return false. + # Since the first task acquires the lock immediately, "is_queued" should return + # false. self.assertFalse(linearizer.is_queued(key)) - d2 = linearizer.queue(key) - self.assertFalse(d2.called) + _, acquired_d2, unblock2 = self._start_task(linearizer, key) + self.assertFalse(acquired_d2.called) - # Now d2 is queued up behind successful completion of cm1 + # Now the second task is queued up behind the first. self.assertTrue(linearizer.is_queued(key)) - with cm1: - self.assertFalse(d2.called) + unblock1() - # cm1 still not done, so d2 still queued. - self.assertTrue(linearizer.is_queued(key)) - - # And now d2 is called and nothing is in the queue again + # And now the second task acquires the lock and nothing is in the queue again. + self.assertTrue(acquired_d2.called) self.assertFalse(linearizer.is_queued(key)) - with (yield d2): - self.assertFalse(linearizer.is_queued(key)) - + unblock2() self.assertFalse(linearizer.is_queued(key)) - def test_lots_of_queued_things(self): - # we have one slow thing, and lots of fast things queued up behind it. - # it should *not* explode the stack. + def test_lots_of_queued_things(self) -> None: + """Tests lots of fast things queued up behind a slow thing. + + The stack should *not* explode when the slow thing completes. + """ linearizer = Linearizer() + key = "" - @defer.inlineCallbacks - def func(i, sleep=False): + async def func(i: int) -> None: with LoggingContext("func(%s)" % i) as lc: - with (yield linearizer.queue("")): + with (await linearizer.queue(key)): self.assertEqual(current_context(), lc) - if sleep: - yield Clock(reactor).sleep(0) self.assertEqual(current_context(), lc) - func(0, sleep=True) + _, _, unblock = self._start_task(linearizer, key) for i in range(1, 100): - func(i) + defer.ensureDeferred(func(i)) - return func(1000) + d = defer.ensureDeferred(func(1000)) + unblock() + self.successResultOf(d) - @defer.inlineCallbacks - def test_multiple_entries(self): + def test_multiple_entries(self) -> None: + """Tests a `Linearizer` with a concurrency above 1.""" limiter = Linearizer(max_count=3) key = object() - d1 = limiter.queue(key) - cm1 = yield d1 + _, acquired_d1, unblock1 = self._start_task(limiter, key) + self.assertTrue(acquired_d1.called) - d2 = limiter.queue(key) - cm2 = yield d2 + _, acquired_d2, unblock2 = self._start_task(limiter, key) + self.assertTrue(acquired_d2.called) - d3 = limiter.queue(key) - cm3 = yield d3 + _, acquired_d3, unblock3 = self._start_task(limiter, key) + self.assertTrue(acquired_d3.called) - d4 = limiter.queue(key) - self.assertFalse(d4.called) + # These next two tasks have to wait. + _, acquired_d4, unblock4 = self._start_task(limiter, key) + self.assertFalse(acquired_d4.called) - d5 = limiter.queue(key) - self.assertFalse(d5.called) + _, acquired_d5, unblock5 = self._start_task(limiter, key) + self.assertFalse(acquired_d5.called) - with cm1: - self.assertFalse(d4.called) - self.assertFalse(d5.called) + # Once the first task completes, the fourth task can continue. + unblock1() + self.assertTrue(acquired_d4.called) + self.assertFalse(acquired_d5.called) - cm4 = yield d4 - self.assertFalse(d5.called) + # Once the third task completes, the fifth task can continue. + unblock3() + self.assertTrue(acquired_d5.called) - with cm3: - self.assertFalse(d5.called) + # Make all tasks finish. + unblock2() + unblock4() + unblock5() - cm5 = yield d5 + # The next task shouldn't have to wait. + _, acquired_d6, unblock6 = self._start_task(limiter, key) + self.assertTrue(acquired_d6) + unblock6() - with cm2: - pass - - with cm4: - pass - - with cm5: - pass - - d6 = limiter.queue(key) - with (yield d6): - pass - - @defer.inlineCallbacks - def test_cancellation(self): + def test_cancellation(self) -> None: + """Tests cancellation while waiting for a `Linearizer`.""" linearizer = Linearizer() key = object() - d1 = linearizer.queue(key) - cm1 = yield d1 + d1, acquired_d1, unblock1 = self._start_task(linearizer, key) + self.assertTrue(acquired_d1.called) - d2 = linearizer.queue(key) - self.assertFalse(d2.called) + # Create a second task, waiting for the first task. + d2, acquired_d2, _ = self._start_task(linearizer, key) + self.assertFalse(acquired_d2.called) - d3 = linearizer.queue(key) - self.assertFalse(d3.called) + # Create a third task, waiting for the second task. + d3, acquired_d3, unblock3 = self._start_task(linearizer, key) + self.assertFalse(acquired_d3.called) + # Cancel the waiting second task. d2.cancel() - with cm1: - pass + unblock1() + self.successResultOf(d1) self.assertTrue(d2.called) - try: - yield d2 - self.fail("Expected d2 to raise CancelledError") - except CancelledError: - pass + self.failureResultOf(d2, CancelledError) - with (yield d3): - pass + # The third task should continue running. + self.assertTrue( + acquired_d3.called, + "Third task did not get the lock after the second task was cancelled", + ) + unblock3() + self.successResultOf(d3)