Reduce max time we wait for stream positions (#14881)

Now that we wait for stream positions whenever we do a HTTP replication
hit, we need to be less brutal in the case where we do timeout (as we
have bugs around this).
This commit is contained in:
Erik Johnston 2023-01-20 21:04:33 +00:00 committed by GitHub
parent 65d0386693
commit 0ec12a3753
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 12 additions and 12 deletions

1
changelog.d/14881.misc Normal file
View File

@ -0,0 +1 @@
Reduce max time we wait for stream positions.

View File

@ -352,7 +352,6 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
instance_name=instance_name, instance_name=instance_name,
stream_name=stream_name, stream_name=stream_name,
position=position, position=position,
raise_on_timeout=False,
) )
return result return result
@ -414,7 +413,6 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
instance_name=content[_STREAM_POSITION_KEY]["instance_name"], instance_name=content[_STREAM_POSITION_KEY]["instance_name"],
stream_name=stream_name, stream_name=stream_name,
position=position, position=position,
raise_on_timeout=False,
) )
if self.CACHE: if self.CACHE:

View File

@ -59,7 +59,7 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# How long we allow callers to wait for replication updates before timing out. # How long we allow callers to wait for replication updates before timing out.
_WAIT_FOR_REPLICATION_TIMEOUT_SECONDS = 30 _WAIT_FOR_REPLICATION_TIMEOUT_SECONDS = 5
class DirectTcpReplicationClientFactory(ReconnectingClientFactory): class DirectTcpReplicationClientFactory(ReconnectingClientFactory):
@ -326,7 +326,6 @@ class ReplicationDataHandler:
instance_name: str, instance_name: str,
stream_name: str, stream_name: str,
position: int, position: int,
raise_on_timeout: bool = True,
) -> None: ) -> None:
"""Wait until this instance has received updates up to and including """Wait until this instance has received updates up to and including
the given stream position. the given stream position.
@ -335,8 +334,6 @@ class ReplicationDataHandler:
instance_name instance_name
stream_name stream_name
position position
raise_on_timeout: Whether to raise an exception if we time out
waiting for the updates, or if we log an error and return.
""" """
if instance_name == self._instance_name: if instance_name == self._instance_name:
@ -365,19 +362,23 @@ class ReplicationDataHandler:
# We measure here to get in flight counts and average waiting time. # We measure here to get in flight counts and average waiting time.
with Measure(self._clock, "repl.wait_for_stream_position"): with Measure(self._clock, "repl.wait_for_stream_position"):
logger.info("Waiting for repl stream %r to reach %s", stream_name, position) logger.info(
"Waiting for repl stream %r to reach %s (%s)",
stream_name,
position,
instance_name,
)
try: try:
await make_deferred_yieldable(deferred) await make_deferred_yieldable(deferred)
except defer.TimeoutError: except defer.TimeoutError:
logger.error("Timed out waiting for stream %s", stream_name) logger.error("Timed out waiting for stream %s", stream_name)
if raise_on_timeout:
raise
return return
logger.info( logger.info(
"Finished waiting for repl stream %r to reach %s", stream_name, position "Finished waiting for repl stream %r to reach %s (%s)",
stream_name,
position,
instance_name,
) )
def stop_pusher(self, user_id: str, app_id: str, pushkey: str) -> None: def stop_pusher(self, user_id: str, app_id: str, pushkey: str) -> None: