Prefill state caches

This commit is contained in:
Erik Johnston 2017-05-02 10:40:31 +01:00
parent 29ded770b1
commit bfbc907cec
3 changed files with 20 additions and 6 deletions

View File

@ -60,12 +60,12 @@ class LoggingTransaction(object):
object.__setattr__(self, "database_engine", database_engine) object.__setattr__(self, "database_engine", database_engine)
object.__setattr__(self, "after_callbacks", after_callbacks) object.__setattr__(self, "after_callbacks", after_callbacks)
def call_after(self, callback, *args): def call_after(self, callback, *args, **kwargs):
"""Call the given callback on the main twisted thread after the """Call the given callback on the main twisted thread after the
transaction has finished. Used to invalidate the caches on the transaction has finished. Used to invalidate the caches on the
correct thread. correct thread.
""" """
self.after_callbacks.append((callback, args)) self.after_callbacks.append((callback, args, kwargs))
def __getattr__(self, name): def __getattr__(self, name):
return getattr(self.txn, name) return getattr(self.txn, name)
@ -319,8 +319,8 @@ class SQLBaseStore(object):
inner_func, *args, **kwargs inner_func, *args, **kwargs
) )
finally: finally:
for after_callback, after_args in after_callbacks: for after_callback, after_args, after_kwargs in after_callbacks:
after_callback(*after_args) after_callback(*after_args, **after_kwargs)
defer.returnValue(result) defer.returnValue(result)
@defer.inlineCallbacks @defer.inlineCallbacks

View File

@ -374,6 +374,7 @@ class EventsStore(SQLBaseStore):
new_forward_extremeties=new_forward_extremeties, new_forward_extremeties=new_forward_extremeties,
) )
persist_event_counter.inc_by(len(chunk)) persist_event_counter.inc_by(len(chunk))
for event, context in chunk: for event, context in chunk:
if context.app_service: if context.app_service:
origin_type = "local" origin_type = "local"
@ -387,6 +388,11 @@ class EventsStore(SQLBaseStore):
event_counter.inc(event.type, origin_type, origin_entity) event_counter.inc(event.type, origin_type, origin_entity)
for room_id, (_, _, new_state) in current_state_for_room.iteritems():
self.get_current_state_ids.prefill(
(room_id, ), new_state
)
@defer.inlineCallbacks @defer.inlineCallbacks
def _calculate_new_extremeties(self, room_id, event_contexts, latest_event_ids): def _calculate_new_extremeties(self, room_id, event_contexts, latest_event_ids):
"""Calculates the new forward extremeties for a room given events to """Calculates the new forward extremeties for a room given events to
@ -545,7 +551,7 @@ class EventsStore(SQLBaseStore):
if ev_id in events_to_insert if ev_id in events_to_insert
} }
defer.returnValue((to_delete, to_insert)) defer.returnValue((to_delete, to_insert, current_state))
@defer.inlineCallbacks @defer.inlineCallbacks
def get_event(self, event_id, check_redacted=True, def get_event(self, event_id, check_redacted=True,
@ -698,7 +704,7 @@ class EventsStore(SQLBaseStore):
def _update_current_state_txn(self, txn, state_delta_by_room): def _update_current_state_txn(self, txn, state_delta_by_room):
for room_id, current_state_tuple in state_delta_by_room.iteritems(): for room_id, current_state_tuple in state_delta_by_room.iteritems():
to_delete, to_insert = current_state_tuple to_delete, to_insert, _ = current_state_tuple
txn.executemany( txn.executemany(
"DELETE FROM current_state_events WHERE event_id = ?", "DELETE FROM current_state_events WHERE event_id = ?",
[(ev_id,) for ev_id in to_delete.itervalues()], [(ev_id,) for ev_id in to_delete.itervalues()],

View File

@ -227,6 +227,14 @@ class StateStore(SQLBaseStore):
], ],
) )
txn.call_after(
self._state_group_cache.update,
self._state_group_cache.sequence,
key=context.state_group,
value=context.current_state_ids,
full=True,
)
self._simple_insert_many_txn( self._simple_insert_many_txn(
txn, txn,
table="event_to_state_groups", table="event_to_state_groups",