Fix wait_for_stream_position for multiple waiters. (#8196)

This fixes a bug where having multiple callers waiting on the same
stream and position will cause it to try and compare two deferreds,
which fails (due to the sorted list having an entry of `Tuple[int,
Deferred]`).
This commit is contained in:
Erik Johnston 2020-08-28 17:12:45 +01:00 committed by GitHub
parent d58fda99ff
commit 3b4556cf87
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 6 additions and 5 deletions

1
changelog.d/8196.misc Normal file
View File

@ -0,0 +1 @@
Fix `wait_for_stream_position` to allow multiple waiters on same stream ID.

View File

@ -66,7 +66,9 @@ REQUIREMENTS = [
"msgpack>=0.5.2", "msgpack>=0.5.2",
"phonenumbers>=8.2.0", "phonenumbers>=8.2.0",
"prometheus_client>=0.0.18,<0.9.0", "prometheus_client>=0.0.18,<0.9.0",
# we use attr.validators.deep_iterable, which arrived in 19.1.0 # we use attr.validators.deep_iterable, which arrived in 19.1.0 (Note:
# Fedora 31 only has 19.1, so if we want to upgrade we should wait until 33
# is out in November.)
"attrs>=19.1.0", "attrs>=19.1.0",
"netaddr>=0.7.18", "netaddr>=0.7.18",
"Jinja2>=2.9", "Jinja2>=2.9",

View File

@ -14,7 +14,6 @@
# limitations under the License. # limitations under the License.
"""A replication client for use by synapse workers. """A replication client for use by synapse workers.
""" """
import heapq
import logging import logging
from typing import TYPE_CHECKING, Dict, List, Tuple from typing import TYPE_CHECKING, Dict, List, Tuple
@ -219,9 +218,8 @@ class ReplicationDataHandler:
waiting_list = self._streams_to_waiters.setdefault(stream_name, []) waiting_list = self._streams_to_waiters.setdefault(stream_name, [])
# We insert into the list using heapq as it is more efficient than waiting_list.append((position, deferred))
# pushing then resorting each time. waiting_list.sort(key=lambda t: t[0])
heapq.heappush(waiting_list, (position, deferred))
# We measure here to get in flight counts and average waiting time. # We measure here to get in flight counts and average waiting time.
with Measure(self._clock, "repl.wait_for_stream_position"): with Measure(self._clock, "repl.wait_for_stream_position"):