diff --git a/changelog.d/13585.bugfix b/changelog.d/13585.bugfix new file mode 100644 index 000000000..664b986c5 --- /dev/null +++ b/changelog.d/13585.bugfix @@ -0,0 +1 @@ +Fix loading the current stream position behind the actual position. diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py index 3c13859fa..2dfe4c0b6 100644 --- a/synapse/storage/util/id_generators.py +++ b/synapse/storage/util/id_generators.py @@ -460,8 +460,17 @@ class MultiWriterIdGenerator(AbstractStreamIdGenerator): # Cast safety: this corresponds to the types returned by the query above. rows.extend(cast(Iterable[Tuple[str, int]], cur)) - # Sort so that we handle rows in order for each instance. - rows.sort() + # Sort by stream_id (ascending, lowest -> highest) so that we handle + # rows in order for each instance because we don't want to overwrite + # the current_position of an instance to a lower stream ID than + # we're actually at. + def sort_by_stream_id_key_func(row: Tuple[str, int]) -> int: + (instance, stream_id) = row + # If `stream_id` is ever `None`, we will see a `TypeError: '<' + # not supported between instances of 'NoneType' and 'X'` error. + return stream_id + + rows.sort(key=sort_by_stream_id_key_func) with self._lock: for (