mirror of
https://git.anonymousland.org/anonymousland/synapse.git
synced 2025-04-27 18:06:08 -04:00
Merge pull request #2934 from matrix-org/erikj/cache_fix
Fix bug with delayed cache invalidation stream
This commit is contained in:
commit
42174946f8
@ -48,16 +48,16 @@ class LoggingTransaction(object):
|
|||||||
passed to the constructor. Adds logging and metrics to the .execute()
|
passed to the constructor. Adds logging and metrics to the .execute()
|
||||||
method."""
|
method."""
|
||||||
__slots__ = [
|
__slots__ = [
|
||||||
"txn", "name", "database_engine", "after_callbacks", "final_callbacks",
|
"txn", "name", "database_engine", "after_callbacks", "exception_callbacks",
|
||||||
]
|
]
|
||||||
|
|
||||||
def __init__(self, txn, name, database_engine, after_callbacks,
|
def __init__(self, txn, name, database_engine, after_callbacks,
|
||||||
final_callbacks):
|
exception_callbacks):
|
||||||
object.__setattr__(self, "txn", txn)
|
object.__setattr__(self, "txn", txn)
|
||||||
object.__setattr__(self, "name", name)
|
object.__setattr__(self, "name", name)
|
||||||
object.__setattr__(self, "database_engine", database_engine)
|
object.__setattr__(self, "database_engine", database_engine)
|
||||||
object.__setattr__(self, "after_callbacks", after_callbacks)
|
object.__setattr__(self, "after_callbacks", after_callbacks)
|
||||||
object.__setattr__(self, "final_callbacks", final_callbacks)
|
object.__setattr__(self, "exception_callbacks", exception_callbacks)
|
||||||
|
|
||||||
def call_after(self, callback, *args, **kwargs):
|
def call_after(self, callback, *args, **kwargs):
|
||||||
"""Call the given callback on the main twisted thread after the
|
"""Call the given callback on the main twisted thread after the
|
||||||
@ -66,8 +66,8 @@ class LoggingTransaction(object):
|
|||||||
"""
|
"""
|
||||||
self.after_callbacks.append((callback, args, kwargs))
|
self.after_callbacks.append((callback, args, kwargs))
|
||||||
|
|
||||||
def call_finally(self, callback, *args, **kwargs):
|
def call_on_exception(self, callback, *args, **kwargs):
|
||||||
self.final_callbacks.append((callback, args, kwargs))
|
self.exception_callbacks.append((callback, args, kwargs))
|
||||||
|
|
||||||
def __getattr__(self, name):
|
def __getattr__(self, name):
|
||||||
return getattr(self.txn, name)
|
return getattr(self.txn, name)
|
||||||
@ -215,7 +215,7 @@ class SQLBaseStore(object):
|
|||||||
|
|
||||||
self._clock.looping_call(loop, 10000)
|
self._clock.looping_call(loop, 10000)
|
||||||
|
|
||||||
def _new_transaction(self, conn, desc, after_callbacks, final_callbacks,
|
def _new_transaction(self, conn, desc, after_callbacks, exception_callbacks,
|
||||||
logging_context, func, *args, **kwargs):
|
logging_context, func, *args, **kwargs):
|
||||||
start = time.time() * 1000
|
start = time.time() * 1000
|
||||||
txn_id = self._TXN_ID
|
txn_id = self._TXN_ID
|
||||||
@ -236,7 +236,7 @@ class SQLBaseStore(object):
|
|||||||
txn = conn.cursor()
|
txn = conn.cursor()
|
||||||
txn = LoggingTransaction(
|
txn = LoggingTransaction(
|
||||||
txn, name, self.database_engine, after_callbacks,
|
txn, name, self.database_engine, after_callbacks,
|
||||||
final_callbacks,
|
exception_callbacks,
|
||||||
)
|
)
|
||||||
r = func(txn, *args, **kwargs)
|
r = func(txn, *args, **kwargs)
|
||||||
conn.commit()
|
conn.commit()
|
||||||
@ -308,11 +308,11 @@ class SQLBaseStore(object):
|
|||||||
current_context = LoggingContext.current_context()
|
current_context = LoggingContext.current_context()
|
||||||
|
|
||||||
after_callbacks = []
|
after_callbacks = []
|
||||||
final_callbacks = []
|
exception_callbacks = []
|
||||||
|
|
||||||
def inner_func(conn, *args, **kwargs):
|
def inner_func(conn, *args, **kwargs):
|
||||||
return self._new_transaction(
|
return self._new_transaction(
|
||||||
conn, desc, after_callbacks, final_callbacks, current_context,
|
conn, desc, after_callbacks, exception_callbacks, current_context,
|
||||||
func, *args, **kwargs
|
func, *args, **kwargs
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -321,9 +321,10 @@ class SQLBaseStore(object):
|
|||||||
|
|
||||||
for after_callback, after_args, after_kwargs in after_callbacks:
|
for after_callback, after_args, after_kwargs in after_callbacks:
|
||||||
after_callback(*after_args, **after_kwargs)
|
after_callback(*after_args, **after_kwargs)
|
||||||
finally:
|
except: # noqa: E722, as we reraise the exception this is fine.
|
||||||
for after_callback, after_args, after_kwargs in final_callbacks:
|
for after_callback, after_args, after_kwargs in exception_callbacks:
|
||||||
after_callback(*after_args, **after_kwargs)
|
after_callback(*after_args, **after_kwargs)
|
||||||
|
raise
|
||||||
|
|
||||||
defer.returnValue(result)
|
defer.returnValue(result)
|
||||||
|
|
||||||
@ -1000,7 +1001,8 @@ class SQLBaseStore(object):
|
|||||||
# __exit__ called after the transaction finishes.
|
# __exit__ called after the transaction finishes.
|
||||||
ctx = self._cache_id_gen.get_next()
|
ctx = self._cache_id_gen.get_next()
|
||||||
stream_id = ctx.__enter__()
|
stream_id = ctx.__enter__()
|
||||||
txn.call_finally(ctx.__exit__, None, None, None)
|
txn.call_on_exception(ctx.__exit__, None, None, None)
|
||||||
|
txn.call_after(ctx.__exit__, None, None, None)
|
||||||
txn.call_after(self.hs.get_notifier().on_new_replication_data)
|
txn.call_after(self.hs.get_notifier().on_new_replication_data)
|
||||||
|
|
||||||
self._simple_insert_txn(
|
self._simple_insert_txn(
|
||||||
|
@ -76,7 +76,7 @@ class EventPushActionsWorkerStore(SQLBaseStore):
|
|||||||
name="_find_stream_orderings_for_times_txn",
|
name="_find_stream_orderings_for_times_txn",
|
||||||
database_engine=self.database_engine,
|
database_engine=self.database_engine,
|
||||||
after_callbacks=[],
|
after_callbacks=[],
|
||||||
final_callbacks=[],
|
exception_callbacks=[],
|
||||||
)
|
)
|
||||||
self._find_stream_orderings_for_times_txn(cur)
|
self._find_stream_orderings_for_times_txn(cur)
|
||||||
cur.close()
|
cur.close()
|
||||||
|
Loading…
x
Reference in New Issue
Block a user