Fix MultiWriterIdGenerator.current_position. (#8257)

It did not correctly handle IDs that finish being persisted out of
order, resulting in the `current_position` lagging until new IDs are
persisted.
This commit is contained in:
Erik Johnston 2020-09-08 14:26:54 +01:00 committed by GitHub
parent cca03dbec8
commit deedb91732
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 88 additions and 6 deletions

View file

@ -224,6 +224,10 @@ class MultiWriterIdGenerator:
# should be less than the minimum of this set (if not empty).
self._unfinished_ids = set() # type: Set[int]
# Set of local IDs that we've processed that are larger than the current
# position, due to there being smaller unpersisted IDs.
self._finished_ids = set() # type: Set[int]
# We track the max position where we know everything before has been
# persisted. This is done by a) looking at the min across all instances
# and b) noting that if we have seen a run of persisted positions
@ -348,17 +352,44 @@ class MultiWriterIdGenerator:
def _mark_id_as_finished(self, next_id: int):
    """The ID has finished being processed so we should advance the
    current position if possible.

    IDs may finish out of order. A finished ID is parked in
    `_finished_ids` for as long as a smaller ID is still unfinished, and
    is only folded into the current position once every smaller ID has
    finished too.

    Args:
        next_id: The ID that has just finished being processed.
    """

    with self._lock:
        self._unfinished_ids.discard(next_id)
        self._finished_ids.add(next_id)

        new_cur = None

        if self._unfinished_ids:
            # If there are unfinished IDs then the new position will be the
            # largest finished ID less than the minimum unfinished ID.
            min_unfinished = min(self._unfinished_ids)

            still_blocked = set()
            for finished_id in self._finished_ids:
                if finished_id < min_unfinished:
                    if new_cur is None or new_cur < finished_id:
                        new_cur = finished_id
                else:
                    # A smaller ID is still in flight, so this one cannot
                    # be counted towards the position yet.
                    still_blocked.add(finished_id)

            # Everything below the minimum unfinished ID is dropped, as it
            # is now covered by the new position.
            self._finished_ids = still_blocked
        else:
            # There are no unfinished IDs so the new position is simply the
            # largest finished one. `_finished_ids` cannot be empty here
            # because `next_id` was added above.
            new_cur = max(self._finished_ids)

            # We clear these out since they're now all less than or equal
            # to the new position.
            self._finished_ids.clear()

        # Truthiness check: skips both "nothing to advance to" (None) and a
        # position of 0, which equals the default used below.
        if new_cur:
            curr = self._current_positions.get(self._instance_name, 0)
            # Never move the position backwards.
            self._current_positions[self._instance_name] = max(curr, new_cur)

        self._add_persisted_position(next_id)
@ -428,7 +459,7 @@ class MultiWriterIdGenerator:
# We move the current min position up if the minimum current positions
# of all instances is higher (since by definition all positions less
# than that have been persisted).
min_curr = min(self._current_positions.values())
min_curr = min(self._current_positions.values(), default=0)
self._persisted_upto_position = max(min_curr, self._persisted_upto_position)
# We now iterate through the seen positions, discarding those that are