Remove slaved id tracker (#14376)

This matches the multi instance writer ID generator class which can
both handle advancing the current token over replication and by calling
the database.
This commit is contained in:
Nick Mills-Barrett 2022-11-14 17:31:36 +00:00 committed by GitHub
parent e226513c0f
commit 36097e88c4
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 74 additions and 176 deletions

View file

@ -186,11 +186,13 @@ class StreamIdGenerator(AbstractStreamIdGenerator):
column: str,
extra_tables: Iterable[Tuple[str, str]] = (),
step: int = 1,
is_writer: bool = True,
) -> None:
assert step != 0
self._lock = threading.Lock()
self._step: int = step
self._current: int = _load_current_id(db_conn, table, column, step)
self._is_writer = is_writer
for table, column in extra_tables:
self._current = (max if step > 0 else min)(
self._current, _load_current_id(db_conn, table, column, step)
@ -204,9 +206,11 @@ class StreamIdGenerator(AbstractStreamIdGenerator):
self._unfinished_ids: OrderedDict[int, int] = OrderedDict()
def advance(self, instance_name: str, new_id: int) -> None:
# `StreamIdGenerator` should only be used when there is a single writer,
# so replication should never happen.
raise Exception("Replication is not supported by StreamIdGenerator")
# Advance should never be called on a writer instance, only over replication
if self._is_writer:
raise Exception("Replication is not supported by writer StreamIdGenerator")
self._current = (max if self._step > 0 else min)(self._current, new_id)
def get_next(self) -> AsyncContextManager[int]:
with self._lock:
@ -249,6 +253,9 @@ class StreamIdGenerator(AbstractStreamIdGenerator):
return _AsyncCtxManagerWrapper(manager())
def get_current_token(self) -> int:
if self._is_writer:
return self._current
with self._lock:
if self._unfinished_ids:
return next(iter(self._unfinished_ids)) - self._step