Fix threadsafety in ThreadedMemoryReactorClock (#8497)

This could, very occasionally, cause:

```
tests.test_visibility.FilterEventsForServerTestCase.test_large_room
===============================================================================
[ERROR]
Traceback (most recent call last):
  File "/src/tests/rest/media/v1/test_media_storage.py", line 86, in test_ensure_media_is_in_local_cache
    self.wait_on_thread(x)
  File "/src/tests/unittest.py", line 296, in wait_on_thread
    self.reactor.advance(0.01)
  File "/src/.tox/py35/lib/python3.5/site-packages/twisted/internet/task.py", line 826, in advance
    self._sortCalls()
  File "/src/.tox/py35/lib/python3.5/site-packages/twisted/internet/task.py", line 787, in _sortCalls
    self.calls.sort(key=lambda a: a.getTime())
builtins.ValueError: list modified during sort

tests.rest.media.v1.test_media_storage.MediaStorageTests.test_ensure_media_is_in_local_cache
```
This commit is contained in:
Richard van der Hoff 2020-10-09 17:22:25 +01:00 committed by GitHub
parent ca2db5dd0c
commit 9789b1fba5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 33 additions and 4 deletions

1
changelog.d/8497.misc Normal file
View File

@ -0,0 +1 @@
Fix a threadsafety bug in unit tests.

View File

@ -1,8 +1,11 @@
import json import json
import logging import logging
from collections import deque
from io import SEEK_END, BytesIO from io import SEEK_END, BytesIO
from typing import Callable
import attr import attr
from typing_extensions import Deque
from zope.interface import implementer from zope.interface import implementer
from twisted.internet import address, threads, udp from twisted.internet import address, threads, udp
@ -251,6 +254,7 @@ class ThreadedMemoryReactorClock(MemoryReactorClock):
self._tcp_callbacks = {} self._tcp_callbacks = {}
self._udp = [] self._udp = []
lookups = self.lookups = {} lookups = self.lookups = {}
self._thread_callbacks = deque() # type: Deque[Callable[[], None]]()
@implementer(IResolverSimple) @implementer(IResolverSimple)
class FakeResolver: class FakeResolver:
@ -272,10 +276,10 @@ class ThreadedMemoryReactorClock(MemoryReactorClock):
""" """
Make the callback fire in the next reactor iteration. Make the callback fire in the next reactor iteration.
""" """
d = Deferred() cb = lambda: callback(*args, **kwargs)
d.addCallback(lambda x: callback(*args, **kwargs)) # it's not safe to call callLater() here, so we append the callback to a
self.callLater(0, d.callback, True) # separate queue.
return d self._thread_callbacks.append(cb)
def getThreadPool(self): def getThreadPool(self):
return self.threadpool return self.threadpool
@ -303,6 +307,30 @@ class ThreadedMemoryReactorClock(MemoryReactorClock):
return conn return conn
def advance(self, amount):
# first advance our reactor's time, and run any "callLater" callbacks that
# makes ready
super().advance(amount)
# now run any "callFromThread" callbacks
while True:
try:
callback = self._thread_callbacks.popleft()
except IndexError:
break
callback()
# check for more "callLater" callbacks added by the thread callback
# This isn't required in a regular reactor, but it ends up meaning that
# our database queries can complete in a single call to `advance` [1] which
# simplifies tests.
#
# [1]: we replace the threadpool backing the db connection pool with a
# mock ThreadPool which doesn't really use threads; but we still use
# reactor.callFromThread to feed results back from the db functions to the
# main thread.
super().advance(0)
class ThreadPool: class ThreadPool:
""" """