commit ddb6a79b68
Merge branch 'matthew/gin_work_mem' into matthew/hit_the_gin

116 changed files with 4226 additions and 1834 deletions
synapse/storage/__init__.py

@@ -124,7 +124,6 @@ class DataStore(RoomMemberStore, RoomStore,
         )
 
         self._transaction_id_gen = IdGenerator(db_conn, "sent_transactions", "id")
-        self._state_groups_id_gen = IdGenerator(db_conn, "state_groups", "id")
         self._access_tokens_id_gen = IdGenerator(db_conn, "access_tokens", "id")
         self._event_reports_id_gen = IdGenerator(db_conn, "event_reports", "id")
         self._push_rule_id_gen = IdGenerator(db_conn, "push_rules", "id")
synapse/storage/_base.py

@@ -291,33 +291,33 @@ class SQLBaseStore(object):
     @defer.inlineCallbacks
     def runInteraction(self, desc, func, *args, **kwargs):
-        """Wraps the .runInteraction() method on the underlying db_pool."""
-        current_context = LoggingContext.current_context()
+        """Starts a transaction on the database and runs a given function
 
-        start_time = time.time() * 1000
+        Arguments:
+            desc (str): description of the transaction, for logging and metrics
+            func (func): callback function, which will be called with a
+                database transaction (twisted.enterprise.adbapi.Transaction) as
+                its first argument, followed by `args` and `kwargs`.
+
+            args (list): positional args to pass to `func`
+            kwargs (dict): named args to pass to `func`
+
+        Returns:
+            Deferred: The result of func
+        """
+        current_context = LoggingContext.current_context()
 
         after_callbacks = []
         final_callbacks = []
 
         def inner_func(conn, *args, **kwargs):
-            with LoggingContext("runInteraction") as context:
-                sql_scheduling_timer.inc_by(time.time() * 1000 - start_time)
-
-                if self.database_engine.is_connection_closed(conn):
-                    logger.debug("Reconnecting closed database connection")
-                    conn.reconnect()
-
-                current_context.copy_to(context)
-                return self._new_transaction(
-                    conn, desc, after_callbacks, final_callbacks, current_context,
-                    func, *args, **kwargs
-                )
+            return self._new_transaction(
+                conn, desc, after_callbacks, final_callbacks, current_context,
+                func, *args, **kwargs
+            )
 
         try:
-            with PreserveLoggingContext():
-                result = yield self._db_pool.runWithConnection(
-                    inner_func, *args, **kwargs
-                )
+            result = yield self.runWithConnection(inner_func, *args, **kwargs)
 
             for after_callback, after_args, after_kwargs in after_callbacks:
                 after_callback(*after_args, **after_kwargs)
@@ -329,14 +329,27 @@ class SQLBaseStore(object):
     @defer.inlineCallbacks
     def runWithConnection(self, func, *args, **kwargs):
-        """Wraps the .runInteraction() method on the underlying db_pool."""
+        """Wraps the .runWithConnection() method on the underlying db_pool.
+
+        Arguments:
+            func (func): callback function, which will be called with a
+                database connection (twisted.enterprise.adbapi.Connection) as
+                its first argument, followed by `args` and `kwargs`.
+            args (list): positional args to pass to `func`
+            kwargs (dict): named args to pass to `func`
+
+        Returns:
+            Deferred: The result of func
+        """
         current_context = LoggingContext.current_context()
 
         start_time = time.time() * 1000
 
         def inner_func(conn, *args, **kwargs):
             with LoggingContext("runWithConnection") as context:
-                sql_scheduling_timer.inc_by(time.time() * 1000 - start_time)
+                sched_duration_ms = time.time() * 1000 - start_time
+                sql_scheduling_timer.inc_by(sched_duration_ms)
+                current_context.add_database_scheduled(sched_duration_ms)
 
                 if self.database_engine.is_connection_closed(conn):
                     logger.debug("Reconnecting closed database connection")
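(Aside: the new docstrings above pin down the calling convention. A minimal usage sketch — the get_user_count method here is hypothetical, not part of this diff — might look like:)

    def get_user_count(self):
        def _get_user_count_txn(txn):
            # runs inside a single database transaction
            txn.execute("SELECT count(*) FROM users")
            return txn.fetchone()[0]

        # returns a Deferred firing with _get_user_count_txn's result
        return self.runInteraction("get_user_count", _get_user_count_txn)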
synapse/storage/engines/postgres.py

@@ -62,3 +62,9 @@ class PostgresEngine(object):
     def lock_table(self, txn, table):
         txn.execute("LOCK TABLE %s in EXCLUSIVE MODE" % (table,))
+
+    def get_next_state_group_id(self, txn):
+        """Returns an int that can be used as a new state_group ID
+        """
+        txn.execute("SELECT nextval('state_group_id_seq')")
+        return txn.fetchone()[0]
synapse/storage/engines/sqlite3.py

@@ -16,6 +16,7 @@
 from synapse.storage.prepare_database import prepare_database
 
 import struct
+import threading
 
 
 class Sqlite3Engine(object):
@@ -24,6 +25,11 @@ class Sqlite3Engine(object):
     def __init__(self, database_module, database_config):
         self.module = database_module
 
+        # The current max state_group, or None if we haven't looked
+        # in the DB yet.
+        self._current_state_group_id = None
+        self._current_state_group_id_lock = threading.Lock()
+
     def check_database(self, txn):
         pass
 
@@ -43,6 +49,19 @@ class Sqlite3Engine(object):
     def lock_table(self, txn, table):
         return
 
+    def get_next_state_group_id(self, txn):
+        """Returns an int that can be used as a new state_group ID
+        """
+        # We do application locking here since if we're using sqlite then
+        # we are a single process synapse.
+        with self._current_state_group_id_lock:
+            if self._current_state_group_id is None:
+                txn.execute("SELECT COALESCE(max(id), 0) FROM state_groups")
+                self._current_state_group_id = txn.fetchone()[0]
+
+            self._current_state_group_id += 1
+            return self._current_state_group_id
+
 
 # Following functions taken from: https://github.com/coleifer/peewee
 
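(Aside: both engines now implement the same contract — hand out monotonically increasing state group IDs inside a transaction — with engine-appropriate mechanics: postgres uses the state_group_id_seq sequence, which is safe across multiple processes, while sqlite uses an in-process lock, sufficient because a sqlite-backed synapse is single-process. A hedged sketch of a call site:)

    def _allocate_state_group(self, txn):
        # On postgres this runs SELECT nextval('state_group_id_seq');
        # on sqlite it bumps a lock-protected counter seeded from
        # COALESCE(max(id), 0) in state_groups.
        return self.database_engine.get_next_state_group_id(txn)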
synapse/storage/events.py

@@ -27,7 +27,7 @@ from synapse.util.logutils import log_function
 from synapse.util.metrics import Measure
 from synapse.api.constants import EventTypes
 from synapse.api.errors import SynapseError
-from synapse.state import resolve_events
+from synapse.state import resolve_events_with_factory
 from synapse.util.caches.descriptors import cached
 from synapse.types import get_domain_from_id
 
@@ -110,7 +110,7 @@ class _EventPeristenceQueue(object):
                 end_item.events_and_contexts.extend(events_and_contexts)
                 return end_item.deferred.observe()
 
-        deferred = ObservableDeferred(defer.Deferred())
+        deferred = ObservableDeferred(defer.Deferred(), consumeErrors=True)
 
         queue.append(self._EventPersistQueueItem(
             events_and_contexts=events_and_contexts,
@@ -146,18 +146,25 @@ class _EventPeristenceQueue(object):
             try:
                 queue = self._get_drainining_queue(room_id)
                 for item in queue:
+                    # handle_queue_loop runs in the sentinel logcontext, so
+                    # there is no need to preserve_fn when running the
+                    # callbacks on the deferred.
                     try:
                         ret = yield per_item_callback(item)
                         item.deferred.callback(ret)
-                    except Exception as e:
-                        item.deferred.errback(e)
+                    except Exception:
+                        item.deferred.errback()
             finally:
                 queue = self._event_persist_queues.pop(room_id, None)
                 if queue:
                     self._event_persist_queues[room_id] = queue
                 self._currently_persisting_rooms.discard(room_id)
 
-        preserve_fn(handle_queue_loop)()
+        # set handle_queue_loop off on the background. We don't want to
+        # attribute work done in it to the current request, so we drop the
+        # logcontext altogether.
+        with PreserveLoggingContext():
+            handle_queue_loop()
 
     def _get_drainining_queue(self, room_id):
         queue = self._event_persist_queues.setdefault(room_id, deque())
@@ -335,8 +342,20 @@ class EventsStore(SQLBaseStore):
 
         # NB: Assumes that we are only persisting events for one room
         # at a time.
+
+        # map room_id->list[event_ids] giving the new forward
+        # extremities in each room
         new_forward_extremeties = {}
+
+        # map room_id->(type,state_key)->event_id tracking the full
+        # state in each room after adding these events
         current_state_for_room = {}
+
+        # map room_id->(to_delete, to_insert) where each entry is
+        # a map (type,key)->event_id giving the state delta in each
+        # room
+        state_delta_for_room = {}
+
         if not backfilled:
             with Measure(self._clock, "_calculate_state_and_extrem"):
                 # Work out the new "current state" for each room.
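(Aside: illustrative contents for the three per-room maps described by the new comments — the room and event IDs below are made up:)

    new_forward_extremeties = {
        "!room:example.com": ["$extremity_a", "$extremity_b"],
    }
    current_state_for_room = {
        "!room:example.com": {
            ("m.room.member", "@alice:example.com"): "$member_ev",
        },
    }
    state_delta_for_room = {
        "!room:example.com": (
            {("m.room.topic", ""): "$old_topic_ev"},   # to_delete
            {("m.room.topic", ""): "$new_topic_ev"},   # to_insert
        ),
    }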
@@ -379,11 +398,19 @@ class EventsStore(SQLBaseStore):
                     if all_single_prev_not_state:
                         continue
 
-                    state = yield self._calculate_state_delta(
-                        room_id, ev_ctx_rm, new_latest_event_ids
-                    )
-                    if state:
-                        current_state_for_room[room_id] = state
+                    logger.info(
+                        "Calculating state delta for room %s", room_id,
+                    )
+                    current_state = yield self._get_new_state_after_events(
+                        ev_ctx_rm, new_latest_event_ids,
+                    )
+                    if current_state is not None:
+                        current_state_for_room[room_id] = current_state
+                        delta = yield self._calculate_state_delta(
+                            room_id, current_state,
+                        )
+                        if delta is not None:
+                            state_delta_for_room[room_id] = delta
 
             yield self.runInteraction(
                 "persist_events",
@@ -391,7 +418,7 @@ class EventsStore(SQLBaseStore):
                 events_and_contexts=chunk,
                 backfilled=backfilled,
                 delete_existing=delete_existing,
-                current_state_for_room=current_state_for_room,
+                state_delta_for_room=state_delta_for_room,
                 new_forward_extremeties=new_forward_extremeties,
             )
             persist_event_counter.inc_by(len(chunk))
@@ -408,7 +435,7 @@ class EventsStore(SQLBaseStore):
 
             event_counter.inc(event.type, origin_type, origin_entity)
 
-        for room_id, (_, _, new_state) in current_state_for_room.iteritems():
+        for room_id, new_state in current_state_for_room.iteritems():
             self.get_current_state_ids.prefill(
                 (room_id, ), new_state
             )
@@ -460,20 +487,22 @@ class EventsStore(SQLBaseStore):
         defer.returnValue(new_latest_event_ids)
 
     @defer.inlineCallbacks
-    def _calculate_state_delta(self, room_id, events_context, new_latest_event_ids):
-        """Calculate the new state deltas for a room.
+    def _get_new_state_after_events(self, events_context, new_latest_event_ids):
+        """Calculate the current state dict after adding some new events to
+        a room
 
-        Assumes that we are only persisting events for one room at a time.
+        Args:
+            events_context (list[(EventBase, EventContext)]):
+                events and contexts which are being added to the room
+
+            new_latest_event_ids (iterable[str]):
+                the new forward extremities for the room.
 
         Returns:
-            3-tuple (to_delete, to_insert, new_state) where both are state dicts,
-            i.e. (type, state_key) -> event_id. `to_delete` are the entries to
-            first be deleted from current_state_events, `to_insert` are entries
-            to insert. `new_state` is the full set of state.
-            May return None if there are no changes to be applied.
+            Deferred[dict[(str,str), str]|None]:
+                None if there are no changes to the room state, or
+                a dict of (type, state_key) -> event_id].
         """
         # Now we need to work out the different state sets for
         # each state extremities
         state_sets = []
         state_groups = set()
         missing_event_ids = []
@@ -516,18 +545,23 @@ class EventsStore(SQLBaseStore):
         state_sets.extend(group_to_state.itervalues())
 
         if not new_latest_event_ids:
-            current_state = {}
+            defer.returnValue({})
         elif was_updated:
             if len(state_sets) == 1:
                 # If there is only one state set, then we know what the current
                 # state is.
-                current_state = state_sets[0]
+                defer.returnValue(state_sets[0])
             else:
                 # We work out the current state by passing the state sets to the
                 # state resolution algorithm. It may ask for some events, including
                 # the events we have yet to persist, so we need a slightly more
                 # complicated event lookup function than simply looking the events
                 # up in the db.
+
+                logger.info(
+                    "Resolving state with %i state sets", len(state_sets),
+                )
+
                 events_map = {ev.event_id: ev for ev, _ in events_context}
 
                 @defer.inlineCallbacks
@@ -550,13 +584,26 @@ class EventsStore(SQLBaseStore):
                     to_return.update(evs)
                 defer.returnValue(to_return)
 
-            current_state = yield resolve_events(
+            current_state = yield resolve_events_with_factory(
                 state_sets,
                 state_map_factory=get_events,
             )
-        else:
-            return
+            defer.returnValue(current_state)
+
+    @defer.inlineCallbacks
+    def _calculate_state_delta(self, room_id, current_state):
+        """Calculate the new state deltas for a room.
+
+        Assumes that we are only persisting events for one room at a time.
+
+        Returns:
+            2-tuple (to_delete, to_insert) where both are state dicts,
+            i.e. (type, state_key) -> event_id. `to_delete` are the entries to
+            first be deleted from current_state_events, `to_insert` are entries
+            to insert.
+        """
         existing_state = yield self.get_current_state_ids(room_id)
 
         existing_events = set(existing_state.itervalues())
@@ -576,7 +623,7 @@ class EventsStore(SQLBaseStore):
             if ev_id in events_to_insert
         }
 
-        defer.returnValue((to_delete, to_insert, current_state))
+        defer.returnValue((to_delete, to_insert))
 
     @defer.inlineCallbacks
     def get_event(self, event_id, check_redacted=True,
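(Aside: a rough, self-contained sketch of the delta semantics that _calculate_state_delta now returns — not the method's exact code:)

    def compute_state_delta(existing_state, current_state):
        # both arguments are dicts of (type, state_key) -> event_id
        existing_events = set(existing_state.itervalues())
        new_events = set(current_state.itervalues())
        changed_events = existing_events ^ new_events
        if not changed_events:
            return None

        # entries pointing at events which stopped or started being current
        to_delete = {
            key: ev_id for key, ev_id in existing_state.iteritems()
            if ev_id in changed_events
        }
        events_to_insert = new_events - existing_events
        to_insert = {
            key: ev_id for key, ev_id in current_state.iteritems()
            if ev_id in events_to_insert
        }
        return to_delete, to_insert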
@@ -636,7 +683,7 @@ class EventsStore(SQLBaseStore):
 
     @log_function
     def _persist_events_txn(self, txn, events_and_contexts, backfilled,
-                            delete_existing=False, current_state_for_room={},
+                            delete_existing=False, state_delta_for_room={},
                             new_forward_extremeties={}):
         """Insert some number of room events into the necessary database tables.
 
@@ -652,7 +699,7 @@ class EventsStore(SQLBaseStore):
             delete_existing (bool): True to purge existing table rows for the
                 events from the database. This is useful when retrying due to
                 IntegrityError.
-            current_state_for_room (dict[str, (list[str], list[str])]):
+            state_delta_for_room (dict[str, (list[str], list[str])]):
                 The current-state delta for each room. For each room, a tuple
                 (to_delete, to_insert), being a list of event ids to be removed
                 from the current state, and a list of event ids to be added to
@@ -664,7 +711,7 @@ class EventsStore(SQLBaseStore):
         """
         max_stream_order = events_and_contexts[-1][0].internal_metadata.stream_ordering
 
-        self._update_current_state_txn(txn, current_state_for_room, max_stream_order)
+        self._update_current_state_txn(txn, state_delta_for_room, max_stream_order)
 
         self._update_forward_extremities_txn(
             txn,
@@ -708,9 +755,8 @@ class EventsStore(SQLBaseStore):
             events_and_contexts=events_and_contexts,
         )
 
-        # Insert into the state_groups, state_groups_state, and
-        # event_to_state_groups tables.
-        self._store_mult_state_groups_txn(txn, events_and_contexts)
+        # Insert into event_to_state_groups.
+        self._store_event_state_mappings_txn(txn, events_and_contexts)
 
         # _store_rejected_events_txn filters out any events which were
         # rejected, and returns the filtered list.
@@ -730,7 +776,7 @@ class EventsStore(SQLBaseStore):
 
     def _update_current_state_txn(self, txn, state_delta_by_room, max_stream_order):
         for room_id, current_state_tuple in state_delta_by_room.iteritems():
-            to_delete, to_insert, _ = current_state_tuple
+            to_delete, to_insert = current_state_tuple
             txn.executemany(
                 "DELETE FROM current_state_events WHERE event_id = ?",
                 [(ev_id,) for ev_id in to_delete.itervalues()],
@@ -945,10 +991,9 @@ class EventsStore(SQLBaseStore):
             # an outlier in the database. We now have some state at that
             # so we need to update the state_groups table with that state.
 
-            # insert into the state_group, state_groups_state and
-            # event_to_state_groups tables.
+            # insert into event_to_state_groups.
             try:
-                self._store_mult_state_groups_txn(txn, ((event, context),))
+                self._store_event_state_mappings_txn(txn, ((event, context),))
             except Exception:
                 logger.exception("")
                 raise
@@ -2018,16 +2063,32 @@ class EventsStore(SQLBaseStore):
         )
         return self.runInteraction("get_all_new_events", get_all_new_events_txn)
 
-    def delete_old_state(self, room_id, topological_ordering):
-        return self.runInteraction(
-            "delete_old_state",
-            self._delete_old_state_txn, room_id, topological_ordering
-        )
+    def purge_history(
+        self, room_id, topological_ordering, delete_local_events,
+    ):
+        """Deletes room history before a certain point
 
-    def _delete_old_state_txn(self, txn, room_id, topological_ordering):
-        """Deletes old room state
-        """
+        Args:
+            room_id (str):
+
+            topological_ordering (int):
+                minimum topo ordering to preserve
+
+            delete_local_events (bool):
+                if True, we will delete local events as well as remote ones
+                (instead of just marking them as outliers and deleting their
+                state groups).
+        """
+
+        return self.runInteraction(
+            "purge_history",
+            self._purge_history_txn, room_id, topological_ordering,
+            delete_local_events,
+        )
+
+    def _purge_history_txn(
+        self, txn, room_id, topological_ordering, delete_local_events,
+    ):
         # Tables that should be pruned:
         #     event_auth
         #     event_backward_extremities
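(Aside: a hypothetical call site for the renamed API — the room ID and ordering are made up, and this would run inside a defer.inlineCallbacks method:)

    # delete everything before topological ordering 12345, but keep
    # local events (they are merely marked as outliers)
    yield store.purge_history(
        "!room:example.com", 12345, delete_local_events=False,
    )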
@@ -2068,7 +2129,7 @@ class EventsStore(SQLBaseStore):
                 400, "topological_ordering is greater than forward extremeties"
             )
 
-        logger.debug("[purge] looking for events to delete")
+        logger.info("[purge] looking for events to delete")
 
         txn.execute(
             "SELECT event_id, state_key FROM events"
@@ -2080,16 +2141,16 @@ class EventsStore(SQLBaseStore):
 
         to_delete = [
             (event_id,) for event_id, state_key in event_rows
-            if state_key is None and not self.hs.is_mine_id(event_id)
+            if state_key is None and (
+                delete_local_events or not self.hs.is_mine_id(event_id)
+            )
         ]
         logger.info(
-            "[purge] found %i events before cutoff, of which %i are remote"
-            " non-state events to delete", len(event_rows), len(to_delete))
+            "[purge] found %i events before cutoff, of which %i can be deleted",
+            len(event_rows), len(to_delete),
+        )
 
         for event_id, state_key in event_rows:
             txn.call_after(self._get_state_group_for_event.invalidate, (event_id,))
 
-        logger.debug("[purge] Finding new backward extremities")
+        logger.info("[purge] Finding new backward extremities")
 
         # We calculate the new entries for the backward extremeties by finding
         # all events that point to events that are to be purged
@@ -2103,7 +2164,7 @@ class EventsStore(SQLBaseStore):
         )
         new_backwards_extrems = txn.fetchall()
 
-        logger.debug("[purge] replacing backward extremities: %r", new_backwards_extrems)
+        logger.info("[purge] replacing backward extremities: %r", new_backwards_extrems)
 
         txn.execute(
             "DELETE FROM event_backward_extremities WHERE room_id = ?",
@@ -2119,7 +2180,7 @@ class EventsStore(SQLBaseStore):
             ]
         )
 
-        logger.debug("[purge] finding redundant state groups")
+        logger.info("[purge] finding redundant state groups")
 
         # Get all state groups that are only referenced by events that are
         # to be deleted.
@@ -2136,15 +2197,15 @@ class EventsStore(SQLBaseStore):
         )
 
         state_rows = txn.fetchall()
-        logger.debug("[purge] found %i redundant state groups", len(state_rows))
+        logger.info("[purge] found %i redundant state groups", len(state_rows))
 
         # make a set of the redundant state groups, so that we can look them up
         # efficiently
         state_groups_to_delete = set([sg for sg, in state_rows])
 
         # Now we get all the state groups that rely on these state groups
-        logger.debug("[purge] finding state groups which depend on redundant"
-                     " state groups")
+        logger.info("[purge] finding state groups which depend on redundant"
+                    " state groups")
         remaining_state_groups = []
         for i in xrange(0, len(state_rows), 100):
             chunk = [sg for sg, in state_rows[i:i + 100]]
@@ -2169,7 +2230,7 @@ class EventsStore(SQLBaseStore):
         # Now we turn the state groups that reference to-be-deleted state
         # groups to non delta versions.
         for sg in remaining_state_groups:
-            logger.debug("[purge] de-delta-ing remaining state group %s", sg)
+            logger.info("[purge] de-delta-ing remaining state group %s", sg)
             curr_state = self._get_state_groups_from_groups_txn(
                 txn, [sg], types=None
             )
@@ -2206,7 +2267,7 @@ class EventsStore(SQLBaseStore):
             ],
         )
 
-        logger.debug("[purge] removing redundant state groups")
+        logger.info("[purge] removing redundant state groups")
         txn.executemany(
             "DELETE FROM state_groups_state WHERE state_group = ?",
             state_rows
@@ -2216,18 +2277,15 @@ class EventsStore(SQLBaseStore):
             state_rows
         )
 
-        # Delete all non-state
-        logger.debug("[purge] removing events from event_to_state_groups")
+        logger.info("[purge] removing events from event_to_state_groups")
         txn.executemany(
             "DELETE FROM event_to_state_groups WHERE event_id = ?",
             [(event_id,) for event_id, _ in event_rows]
         )
-
-        logger.debug("[purge] updating room_depth")
-        txn.execute(
-            "UPDATE room_depth SET min_depth = ? WHERE room_id = ?",
-            (topological_ordering, room_id,)
-        )
+        for event_id, _ in event_rows:
+            txn.call_after(self._get_state_group_for_event.invalidate, (
+                event_id,
+            ))
 
         # Delete all remote non-state events
         for table in (
@@ -2245,7 +2303,8 @@ class EventsStore(SQLBaseStore):
             "event_signatures",
             "rejections",
         ):
-            logger.debug("[purge] removing remote non-state events from %s", table)
+            logger.info("[purge] removing remote non-state events from %s",
+                        table)
 
             txn.executemany(
                 "DELETE FROM %s WHERE event_id = ?" % (table,),
@@ -2253,16 +2312,30 @@ class EventsStore(SQLBaseStore):
             )
 
         # Mark all state and own events as outliers
-        logger.debug("[purge] marking remaining events as outliers")
+        logger.info("[purge] marking remaining events as outliers")
         txn.executemany(
             "UPDATE events SET outlier = ?"
             " WHERE event_id = ?",
             [
                 (True, event_id,) for event_id, state_key in event_rows
-                if state_key is not None or self.hs.is_mine_id(event_id)
+                if state_key is not None or (
+                    not delete_local_events and self.hs.is_mine_id(event_id)
+                )
             ]
         )
 
+        # synapse tries to take out an exclusive lock on room_depth whenever it
+        # persists events (because upsert), and once we run this update, we
+        # will block that for the rest of our transaction.
+        #
+        # So, let's stick it at the end so that we don't block event
+        # persistence.
+        logger.info("[purge] updating room_depth")
+        txn.execute(
+            "UPDATE room_depth SET min_depth = ? WHERE room_id = ?",
+            (topological_ordering, room_id,)
+        )
+
+        logger.info("[purge] done")
+
     @defer.inlineCallbacks
synapse/storage/media_repository.py

@@ -29,9 +29,6 @@ class MediaRepositoryStore(BackgroundUpdateStore):
             where_clause='url_cache IS NOT NULL',
         )
 
-    def get_default_thumbnails(self, top_level_type, sub_type):
-        return []
-
     def get_local_media(self, media_id):
         """Get the metadata for a local piece of media
         Returns:
@@ -176,7 +173,14 @@ class MediaRepositoryStore(BackgroundUpdateStore):
             desc="store_cached_remote_media",
         )
 
-    def update_cached_last_access_time(self, origin_id_tuples, time_ts):
+    def update_cached_last_access_time(self, local_media, remote_media, time_ms):
+        """Updates the last access time of the given media
+
+        Args:
+            local_media (iterable[str]): Set of media_ids
+            remote_media (iterable[(str, str)]): Set of (server_name, media_id)
+            time_ms: Current time in milliseconds
+        """
         def update_cache_txn(txn):
             sql = (
                 "UPDATE remote_media_cache SET last_access_ts = ?"
@@ -184,8 +188,18 @@ class MediaRepositoryStore(BackgroundUpdateStore):
             )
 
             txn.executemany(sql, (
-                (time_ts, media_origin, media_id)
-                for media_origin, media_id in origin_id_tuples
+                (time_ms, media_origin, media_id)
+                for media_origin, media_id in remote_media
             ))
 
+            sql = (
+                "UPDATE local_media_repository SET last_access_ts = ?"
+                " WHERE media_id = ?"
+            )
+
+            txn.executemany(sql, (
+                (time_ms, media_id)
+                for media_id in local_media
+            ))
+
         return self.runInteraction("update_cached_last_access_time", update_cache_txn)
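(Aside: a hypothetical call of the widened method — media IDs and the timestamp are made up, inside a defer.inlineCallbacks method:)

    yield store.update_cached_last_access_time(
        local_media=["abcdefgh01234567"],
        remote_media=[("matrix.org", "ijklmnop89012345")],
        time_ms=1514764800000,  # current time in milliseconds
    )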
synapse/storage/prepare_database.py

@@ -25,7 +25,7 @@ logger = logging.getLogger(__name__)
 
 # Remember to update this number every time a change is made to database
 # schema files, so the users will be informed on server restarts.
-SCHEMA_VERSION = 46
+SCHEMA_VERSION = 47
 
 dir_path = os.path.abspath(os.path.dirname(__file__))
 
synapse/storage/room.py

@@ -16,11 +16,9 @@
 from twisted.internet import defer
 
 from synapse.api.errors import StoreError
+from synapse.storage.search import SearchStore
 from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
 
-from ._base import SQLBaseStore
-from .engines import PostgresEngine, Sqlite3Engine
-
 import collections
 import logging
 import ujson as json
@@ -40,7 +38,7 @@ RatelimitOverride = collections.namedtuple(
 )
 
 
-class RoomStore(SQLBaseStore):
+class RoomStore(SearchStore):
 
     @defer.inlineCallbacks
     def store_room(self, room_id, room_creator_user_id, is_public):
@@ -263,8 +261,8 @@ class RoomStore(SQLBaseStore):
             },
         )
 
-        self._store_event_search_txn(
-            txn, event, "content.topic", event.content["topic"]
+        self.store_event_search_txn(
+            txn, event, "content.topic", event.content["topic"],
         )
 
     def _store_room_name_txn(self, txn, event):
@@ -279,14 +277,14 @@ class RoomStore(SQLBaseStore):
             }
         )
 
-        self._store_event_search_txn(
-            txn, event, "content.name", event.content["name"]
+        self.store_event_search_txn(
+            txn, event, "content.name", event.content["name"],
         )
 
     def _store_room_message_txn(self, txn, event):
         if hasattr(event, "content") and "body" in event.content:
-            self._store_event_search_txn(
-                txn, event, "content.body", event.content["body"]
+            self.store_event_search_txn(
+                txn, event, "content.body", event.content["body"],
             )
 
     def _store_history_visibility_txn(self, txn, event):
@@ -308,31 +306,6 @@ class RoomStore(SQLBaseStore):
                 event.content[key]
             ))
 
-    def _store_event_search_txn(self, txn, event, key, value):
-        if isinstance(self.database_engine, PostgresEngine):
-            sql = (
-                "INSERT INTO event_search"
-                " (event_id, room_id, key, vector, stream_ordering, origin_server_ts)"
-                " VALUES (?,?,?,to_tsvector('english', ?),?,?)"
-            )
-            txn.execute(
-                sql,
-                (
-                    event.event_id, event.room_id, key, value,
-                    event.internal_metadata.stream_ordering,
-                    event.origin_server_ts,
-                )
-            )
-        elif isinstance(self.database_engine, Sqlite3Engine):
-            sql = (
-                "INSERT INTO event_search (event_id, room_id, key, value)"
-                " VALUES (?,?,?,?)"
-            )
-            txn.execute(sql, (event.event_id, event.room_id, key, value,))
-        else:
-            # This should be unreachable.
-            raise Exception("Unrecognized database engine")
-
     def add_event_report(self, room_id, event_id, user_id, reason, content,
                          received_ts):
         next_id = self._event_reports_id_gen.get_next()
@@ -533,73 +506,114 @@ class RoomStore(SQLBaseStore):
         )
         self.is_room_blocked.invalidate((room_id,))
 
+    def get_media_mxcs_in_room(self, room_id):
+        """Retrieves all the local and remote media MXC URIs in a given room
+
+        Args:
+            room_id (str)
+
+        Returns:
+            The local and remote media as a lists of tuples where the key is
+            the hostname and the value is the media ID.
+        """
+        def _get_media_mxcs_in_room_txn(txn):
+            local_mxcs, remote_mxcs = self._get_media_mxcs_in_room_txn(txn, room_id)
+            local_media_mxcs = []
+            remote_media_mxcs = []
+
+            # Convert the IDs to MXC URIs
+            for media_id in local_mxcs:
+                local_media_mxcs.append("mxc://%s/%s" % (self.hostname, media_id))
+            for hostname, media_id in remote_mxcs:
+                remote_media_mxcs.append("mxc://%s/%s" % (hostname, media_id))
+
+            return local_media_mxcs, remote_media_mxcs
+        return self.runInteraction("get_media_ids_in_room", _get_media_mxcs_in_room_txn)
+
     def quarantine_media_ids_in_room(self, room_id, quarantined_by):
         """For a room loops through all events with media and quarantines
         the associated media
         """
-        def _get_media_ids_in_room(txn):
-            mxc_re = re.compile("^mxc://([^/]+)/([^/#?]+)")
-
-            next_token = self.get_current_events_token() + 1
-
+        def _quarantine_media_in_room_txn(txn):
+            local_mxcs, remote_mxcs = self._get_media_mxcs_in_room_txn(txn, room_id)
             total_media_quarantined = 0
 
-            while next_token:
-                sql = """
-                    SELECT stream_ordering, content FROM events
-                    WHERE room_id = ?
-                        AND stream_ordering < ?
-                        AND contains_url = ? AND outlier = ?
-                    ORDER BY stream_ordering DESC
-                    LIMIT ?
-                """
-                txn.execute(sql, (room_id, next_token, True, False, 100))
-
-                next_token = None
-                local_media_mxcs = []
-                remote_media_mxcs = []
-                for stream_ordering, content_json in txn:
-                    next_token = stream_ordering
-                    content = json.loads(content_json)
-
-                    content_url = content.get("url")
-                    thumbnail_url = content.get("info", {}).get("thumbnail_url")
-
-                    for url in (content_url, thumbnail_url):
-                        if not url:
-                            continue
-                        matches = mxc_re.match(url)
-                        if matches:
-                            hostname = matches.group(1)
-                            media_id = matches.group(2)
-                            if hostname == self.hostname:
-                                local_media_mxcs.append(media_id)
-                            else:
-                                remote_media_mxcs.append((hostname, media_id))
-
-                # Now update all the tables to set the quarantined_by flag
-
-                txn.executemany("""
-                    UPDATE local_media_repository
-                    SET quarantined_by = ?
-                    WHERE media_id = ?
-                """, ((quarantined_by, media_id) for media_id in local_media_mxcs))
-
-                txn.executemany(
-                    """
-                        UPDATE remote_media_cache
-                        SET quarantined_by = ?
-                        WHERE media_origin AND media_id = ?
-                    """,
-                    (
-                        (quarantined_by, origin, media_id)
-                        for origin, media_id in remote_media_mxcs
-                    )
-                )
-
-                total_media_quarantined += len(local_media_mxcs)
-                total_media_quarantined += len(remote_media_mxcs)
-
-            return total_media_quarantined
-
-        return self.runInteraction("get_media_ids_in_room", _get_media_ids_in_room)
+            # Now update all the tables to set the quarantined_by flag
+
+            txn.executemany("""
+                UPDATE local_media_repository
+                SET quarantined_by = ?
+                WHERE media_id = ?
+            """, ((quarantined_by, media_id) for media_id in local_mxcs))
+
+            txn.executemany(
+                """
+                    UPDATE remote_media_cache
+                    SET quarantined_by = ?
+                    WHERE media_origin = ? AND media_id = ?
+                """,
+                (
+                    (quarantined_by, origin, media_id)
+                    for origin, media_id in remote_mxcs
+                )
+            )
+
+            total_media_quarantined += len(local_mxcs)
+            total_media_quarantined += len(remote_mxcs)
+
+            return total_media_quarantined
+
+        return self.runInteraction(
+            "quarantine_media_in_room",
+            _quarantine_media_in_room_txn,
+        )
+
+    def _get_media_mxcs_in_room_txn(self, txn, room_id):
+        """Retrieves all the local and remote media MXC URIs in a given room
+
+        Args:
+            txn (cursor)
+            room_id (str)
+
+        Returns:
+            The local and remote media as a lists of tuples where the key is
+            the hostname and the value is the media ID.
+        """
+        mxc_re = re.compile("^mxc://([^/]+)/([^/#?]+)")
+
+        next_token = self.get_current_events_token() + 1
+        local_media_mxcs = []
+        remote_media_mxcs = []
+
+        while next_token:
+            sql = """
+                SELECT stream_ordering, content FROM events
+                WHERE room_id = ?
+                    AND stream_ordering < ?
+                    AND contains_url = ? AND outlier = ?
+                ORDER BY stream_ordering DESC
+                LIMIT ?
+            """
+            txn.execute(sql, (room_id, next_token, True, False, 100))
+
+            next_token = None
+            for stream_ordering, content_json in txn:
+                next_token = stream_ordering
+                content = json.loads(content_json)
+
+                content_url = content.get("url")
+                thumbnail_url = content.get("info", {}).get("thumbnail_url")
+
+                for url in (content_url, thumbnail_url):
+                    if not url:
+                        continue
+                    matches = mxc_re.match(url)
+                    if matches:
+                        hostname = matches.group(1)
+                        media_id = matches.group(2)
+                        if hostname == self.hostname:
+                            local_media_mxcs.append(media_id)
+                        else:
+                            remote_media_mxcs.append((hostname, media_id))
+
+        return local_media_mxcs, remote_media_mxcs
synapse/storage/schema/delta/47/last_access_media.sql (new file, 16 lines)

@@ -0,0 +1,16 @@
+/* Copyright 2018 New Vector Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ALTER TABLE local_media_repository ADD COLUMN last_access_ts BIGINT;
synapse/storage/schema/delta/47/state_group_seq.py (new file, 37 lines)

@@ -0,0 +1,37 @@
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from synapse.storage.engines import PostgresEngine
+
+
+def run_create(cur, database_engine, *args, **kwargs):
+    if isinstance(database_engine, PostgresEngine):
+        # if we already have some state groups, we want to start making new
+        # ones with a higher id.
+        cur.execute("SELECT max(id) FROM state_groups")
+        row = cur.fetchone()
+
+        if row[0] is None:
+            start_val = 1
+        else:
+            start_val = row[0] + 1
+
+        cur.execute(
+            "CREATE SEQUENCE state_group_id_seq START WITH %s",
+            (start_val, ),
+        )
+
+
+def run_upgrade(*args, **kwargs):
+    pass
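(Aside: a worked example of the seeding logic above, with illustrative numbers: if state_groups already holds rows with max(id) = 1234, the sequence is created with START WITH 1235, so the first nextval('state_group_id_seq') on the postgres side returns 1235 and cannot collide with an existing group:)

    row = (1234,)                                   # result of SELECT max(id)
    start_val = 1 if row[0] is None else row[0] + 1
    assert start_val == 1235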
synapse/storage/search.py

@@ -13,19 +13,26 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from collections import namedtuple
+import logging
+import re
+import sys
+import ujson as json
+
 from twisted.internet import defer
 
 from .background_updates import BackgroundUpdateStore
 from synapse.api.errors import SynapseError
 from synapse.storage.engines import PostgresEngine, Sqlite3Engine
 
-import logging
-import re
-import ujson as json
-
 
 logger = logging.getLogger(__name__)
 
+SearchEntry = namedtuple('SearchEntry', [
+    'key', 'value', 'event_id', 'room_id', 'stream_ordering',
+    'origin_server_ts',
+])
+
 
 class SearchStore(BackgroundUpdateStore):
 
@@ -60,16 +67,17 @@ class SearchStore(BackgroundUpdateStore):
 
     @defer.inlineCallbacks
     def _background_reindex_search(self, progress, batch_size):
+        # we work through the events table from highest stream id to lowest
         target_min_stream_id = progress["target_min_stream_id_inclusive"]
         max_stream_id = progress["max_stream_id_exclusive"]
         rows_inserted = progress.get("rows_inserted", 0)
 
         INSERT_CLUMP_SIZE = 1000
         TYPES = ["m.room.name", "m.room.message", "m.room.topic"]
 
         def reindex_search_txn(txn):
             sql = (
-                "SELECT stream_ordering, event_id, room_id, type, content FROM events"
+                "SELECT stream_ordering, event_id, room_id, type, content, "
+                " origin_server_ts FROM events"
                 " WHERE ? <= stream_ordering AND stream_ordering < ?"
                 " AND (%s)"
                 " ORDER BY stream_ordering DESC"
@@ -78,6 +86,10 @@ class SearchStore(BackgroundUpdateStore):
 
             txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size))
 
+            # we could stream straight from the results into
+            # store_search_entries_txn with a generator function, but that
+            # would mean having two cursors open on the database at once.
+            # Instead we just build a list of results.
             rows = self.cursor_to_dict(txn)
             if not rows:
                 return 0
@@ -90,6 +102,8 @@ class SearchStore(BackgroundUpdateStore):
                 event_id = row["event_id"]
                 room_id = row["room_id"]
                 etype = row["type"]
+                stream_ordering = row["stream_ordering"]
+                origin_server_ts = row["origin_server_ts"]
                 try:
                     content = json.loads(row["content"])
                 except Exception:
@@ -104,6 +118,8 @@ class SearchStore(BackgroundUpdateStore):
                     elif etype == "m.room.name":
                         key = "content.name"
                         value = content["name"]
+                    else:
+                        raise Exception("unexpected event type %s" % etype)
                 except (KeyError, AttributeError):
                     # If the event is missing a necessary field then
                     # skip over it.
@@ -114,25 +130,16 @@ class SearchStore(BackgroundUpdateStore):
                     # then skip over it
                     continue
 
-                event_search_rows.append((event_id, room_id, key, value))
+                event_search_rows.append(SearchEntry(
+                    key=key,
+                    value=value,
+                    event_id=event_id,
+                    room_id=room_id,
+                    stream_ordering=stream_ordering,
+                    origin_server_ts=origin_server_ts,
+                ))
 
-            if isinstance(self.database_engine, PostgresEngine):
-                sql = (
-                    "INSERT INTO event_search (event_id, room_id, key, vector)"
-                    " VALUES (?,?,?,to_tsvector('english', ?))"
-                )
-            elif isinstance(self.database_engine, Sqlite3Engine):
-                sql = (
-                    "INSERT INTO event_search (event_id, room_id, key, value)"
-                    " VALUES (?,?,?,?)"
-                )
-            else:
-                # This should be unreachable.
-                raise Exception("Unrecognized database engine")
-
-            for index in range(0, len(event_search_rows), INSERT_CLUMP_SIZE):
-                clump = event_search_rows[index:index + INSERT_CLUMP_SIZE]
-                txn.executemany(sql, clump)
+            self.store_search_entries_txn(txn, event_search_rows)
 
             progress = {
                 "target_min_stream_id_inclusive": target_min_stream_id,
@@ -276,6 +283,92 @@ class SearchStore(BackgroundUpdateStore):
 
         defer.returnValue(num_rows)
 
+    def store_event_search_txn(self, txn, event, key, value):
+        """Add event to the search table
+
+        Args:
+            txn (cursor):
+            event (EventBase):
+            key (str):
+            value (str):
+        """
+        self.store_search_entries_txn(
+            txn,
+            (SearchEntry(
+                key=key,
+                value=value,
+                event_id=event.event_id,
+                room_id=event.room_id,
+                stream_ordering=event.internal_metadata.stream_ordering,
+                origin_server_ts=event.origin_server_ts,
+            ),),
+        )
+
+    def store_search_entries_txn(self, txn, entries):
+        """Add entries to the search table
+
+        Args:
+            txn (cursor):
+            entries (iterable[SearchEntry]):
+                entries to be added to the table
+        """
+        if isinstance(self.database_engine, PostgresEngine):
+            sql = (
+                "INSERT INTO event_search"
+                " (event_id, room_id, key, vector, stream_ordering, origin_server_ts)"
+                " VALUES (?,?,?,to_tsvector('english', ?),?,?)"
+            )
+
+            args = ((
+                entry.event_id, entry.room_id, entry.key, entry.value,
+                entry.stream_ordering, entry.origin_server_ts,
+            ) for entry in entries)
+
+            # inserts to a GIN index are normally batched up into a pending
+            # list, and then all committed together once the list gets to a
+            # certain size. The trouble with that is that postgres (pre-9.5)
+            # uses work_mem to determine the length of the list, and work_mem
+            # is typically very large.
+            #
+            # We therefore reduce work_mem while we do the insert.
+            #
+            # (postgres 9.5 uses the separate gin_pending_list_limit setting,
+            # so doesn't suffer the same problem, but changing work_mem will
+            # be harmless)
+
+            txn.execute("SET work_mem='256kB'")
+            try:
+                txn.executemany(sql, args)
+            except Exception:
+                # we need to reset work_mem, but doing so may throw a new
+                # exception and we want to preserve the original
+                t, v, tb = sys.exc_info()
+                try:
+                    txn.execute("RESET work_mem")
+                except Exception as e:
+                    logger.warn(
+                        "exception resetting work_mem during exception "
+                        "handling: %r",
+                        e,
+                    )
+                raise t, v, tb
+            else:
+                txn.execute("RESET work_mem")
+
+        elif isinstance(self.database_engine, Sqlite3Engine):
+            sql = (
+                "INSERT INTO event_search (event_id, room_id, key, value)"
+                " VALUES (?,?,?,?)"
+            )
+            args = ((
+                entry.event_id, entry.room_id, entry.key, entry.value,
+            ) for entry in entries)
+
+            txn.executemany(sql, args)
+        else:
+            # This should be unreachable.
+            raise Exception("Unrecognized database engine")
+
     @defer.inlineCallbacks
     def search_msgs(self, room_ids, search_term, keys):
         """Performs a full text search over events with given keys.
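(Aside: the work_mem comment above is the heart of this merge. A standalone sketch of the same SET/RESET pattern using psycopg2 — the driver synapse uses for postgres; the DSN and row values are placeholders:)

    import psycopg2

    conn = psycopg2.connect("dbname=synapse")
    with conn:
        with conn.cursor() as cur:
            # keep the GIN pending list small for this batch of inserts
            cur.execute("SET work_mem='256kB'")
            try:
                cur.executemany(
                    "INSERT INTO event_search (event_id, room_id, key, vector)"
                    " VALUES (%s, %s, %s, to_tsvector('english', %s))",
                    [("$ev1", "!room:example.com", "content.body", "hello")],
                )
            finally:
                cur.execute("RESET work_mem")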
synapse/storage/state.py

@@ -42,11 +42,8 @@ class _GetStateGroupDelta(namedtuple("_GetStateGroupDelta", ("prev_group", "delta_ids"))):
         return len(self.delta_ids) if self.delta_ids else 0
 
 
-class StateGroupReadStore(SQLBaseStore):
-    """The read-only parts of StateGroupStore
-
-    None of these functions write to the state tables, so are suitable for
-    including in the SlavedStores.
+class StateGroupWorkerStore(SQLBaseStore):
+    """The parts of StateGroupStore that can be called from workers.
     """
 
     STATE_GROUP_DEDUPLICATION_UPDATE_NAME = "state_group_state_deduplication"
@@ -54,7 +51,7 @@ class StateGroupReadStore(SQLBaseStore):
     CURRENT_STATE_INDEX_UPDATE_NAME = "current_state_members_idx"
 
     def __init__(self, db_conn, hs):
-        super(StateGroupReadStore, self).__init__(db_conn, hs)
+        super(StateGroupWorkerStore, self).__init__(db_conn, hs)
 
         self._state_group_cache = DictionaryCache(
             "*stateGroupCache*", 100000 * CACHE_SIZE_FACTOR
@@ -549,8 +546,117 @@ class StateGroupReadStore(SQLBaseStore):
 
         defer.returnValue(results)
 
+    def store_state_group(self, event_id, room_id, prev_group, delta_ids,
+                          current_state_ids):
+        """Store a new set of state, returning a newly assigned state group.
+
+        Args:
+            event_id (str): The event ID for which the state was calculated
+            room_id (str)
+            prev_group (int|None): A previous state group for the room, optional.
+            delta_ids (dict|None): The delta between state at `prev_group` and
+                `current_state_ids`, if `prev_group` was given. Same format as
+                `current_state_ids`.
+            current_state_ids (dict): The state to store. Map of (type, state_key)
+                to event_id.
+
+        Returns:
+            Deferred[int]: The state group ID
+        """
+        def _store_state_group_txn(txn):
+            if current_state_ids is None:
+                # AFAIK, this can never happen
+                raise Exception("current_state_ids cannot be None")
+
+            state_group = self.database_engine.get_next_state_group_id(txn)
+
+            self._simple_insert_txn(
+                txn,
+                table="state_groups",
+                values={
+                    "id": state_group,
+                    "room_id": room_id,
+                    "event_id": event_id,
+                },
+            )
+
+            # We persist as a delta if we can, while also ensuring the chain
+            # of deltas isn't tooo long, as otherwise read performance degrades.
+            if prev_group:
+                is_in_db = self._simple_select_one_onecol_txn(
+                    txn,
+                    table="state_groups",
+                    keyvalues={"id": prev_group},
+                    retcol="id",
+                    allow_none=True,
+                )
+                if not is_in_db:
+                    raise Exception(
+                        "Trying to persist state with unpersisted prev_group: %r"
+                        % (prev_group,)
+                    )
+
+                potential_hops = self._count_state_group_hops_txn(
+                    txn, prev_group
+                )
+            if prev_group and potential_hops < MAX_STATE_DELTA_HOPS:
+                self._simple_insert_txn(
+                    txn,
+                    table="state_group_edges",
+                    values={
+                        "state_group": state_group,
+                        "prev_state_group": prev_group,
+                    },
+                )
+
+                self._simple_insert_many_txn(
+                    txn,
+                    table="state_groups_state",
+                    values=[
+                        {
+                            "state_group": state_group,
+                            "room_id": room_id,
+                            "type": key[0],
+                            "state_key": key[1],
+                            "event_id": state_id,
+                        }
+                        for key, state_id in delta_ids.iteritems()
+                    ],
+                )
+            else:
+                self._simple_insert_many_txn(
+                    txn,
+                    table="state_groups_state",
+                    values=[
+                        {
+                            "state_group": state_group,
+                            "room_id": room_id,
+                            "type": key[0],
+                            "state_key": key[1],
+                            "event_id": state_id,
+                        }
+                        for key, state_id in current_state_ids.iteritems()
+                    ],
+                )
+
+            # Prefill the state group cache with this group.
+            # It's fine to use the sequence like this as the state group map
+            # is immutable. (If the map wasn't immutable then this prefill could
+            # race with another update)
+            txn.call_after(
+                self._state_group_cache.update,
+                self._state_group_cache.sequence,
+                key=state_group,
+                value=dict(current_state_ids),
+                full=True,
+            )
+
+            return state_group
+
+        return self.runInteraction("store_state_group", _store_state_group_txn)
+
 
-class StateStore(StateGroupReadStore, BackgroundUpdateStore):
+class StateStore(StateGroupWorkerStore, BackgroundUpdateStore):
     """ Keeps track of the state at a given event.
 
     This is done by the concept of `state groups`. Every event is a assigned
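(Aside: a hypothetical caller of the new store_state_group — IDs are made up, inside a defer.inlineCallbacks method. Passing prev_group and delta_ids stores the group as a delta; passing prev_group=None stores the full state:)

    state_group = yield store.store_state_group(
        event_id="$event:example.com",
        room_id="!room:example.com",
        prev_group=41,
        delta_ids={("m.room.topic", ""): "$topic_event"},
        current_state_ids=current_state_ids,
    )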
@@ -591,27 +697,12 @@ class StateStore(StateGroupReadStore, BackgroundUpdateStore):
             where_clause="type='m.room.member'",
         )
 
-    def _have_persisted_state_group_txn(self, txn, state_group):
-        txn.execute(
-            "SELECT count(*) FROM state_groups WHERE id = ?",
-            (state_group,)
-        )
-        row = txn.fetchone()
-        return row and row[0]
-
-    def _store_mult_state_groups_txn(self, txn, events_and_contexts):
+    def _store_event_state_mappings_txn(self, txn, events_and_contexts):
         state_groups = {}
         for event, context in events_and_contexts:
             if event.internal_metadata.is_outlier():
                 continue
 
-            if context.current_state_ids is None:
-                # AFAIK, this can never happen
-                logger.error(
-                    "Non-outlier event %s had current_state_ids==None",
-                    event.event_id)
-                continue
-
             # if the event was rejected, just give it the same state as its
             # predecessor.
             if context.rejected:
@@ -620,90 +711,6 @@ class StateStore(StateGroupReadStore, BackgroundUpdateStore):
 
             state_groups[event.event_id] = context.state_group
 
-            if self._have_persisted_state_group_txn(txn, context.state_group):
-                continue
-
-            self._simple_insert_txn(
-                txn,
-                table="state_groups",
-                values={
-                    "id": context.state_group,
-                    "room_id": event.room_id,
-                    "event_id": event.event_id,
-                },
-            )
-
-            # We persist as a delta if we can, while also ensuring the chain
-            # of deltas isn't tooo long, as otherwise read performance degrades.
-            if context.prev_group:
-                is_in_db = self._simple_select_one_onecol_txn(
-                    txn,
-                    table="state_groups",
-                    keyvalues={"id": context.prev_group},
-                    retcol="id",
-                    allow_none=True,
-                )
-                if not is_in_db:
-                    raise Exception(
-                        "Trying to persist state with unpersisted prev_group: %r"
-                        % (context.prev_group,)
-                    )
-
-                potential_hops = self._count_state_group_hops_txn(
-                    txn, context.prev_group
-                )
-            if context.prev_group and potential_hops < MAX_STATE_DELTA_HOPS:
-                self._simple_insert_txn(
-                    txn,
-                    table="state_group_edges",
-                    values={
-                        "state_group": context.state_group,
-                        "prev_state_group": context.prev_group,
-                    },
-                )
-
-                self._simple_insert_many_txn(
-                    txn,
-                    table="state_groups_state",
-                    values=[
-                        {
-                            "state_group": context.state_group,
-                            "room_id": event.room_id,
-                            "type": key[0],
-                            "state_key": key[1],
-                            "event_id": state_id,
-                        }
-                        for key, state_id in context.delta_ids.iteritems()
-                    ],
-                )
-            else:
-                self._simple_insert_many_txn(
-                    txn,
-                    table="state_groups_state",
-                    values=[
-                        {
-                            "state_group": context.state_group,
-                            "room_id": event.room_id,
-                            "type": key[0],
-                            "state_key": key[1],
-                            "event_id": state_id,
-                        }
-                        for key, state_id in context.current_state_ids.iteritems()
-                    ],
-                )
-
-            # Prefill the state group cache with this group.
-            # It's fine to use the sequence like this as the state group map
-            # is immutable. (If the map wasn't immutable then this prefill could
-            # race with another update)
-            txn.call_after(
-                self._state_group_cache.update,
-                self._state_group_cache.sequence,
-                key=context.state_group,
-                value=dict(context.current_state_ids),
-                full=True,
-            )
-
         self._simple_insert_many_txn(
             txn,
             table="event_to_state_groups",
@@ -763,9 +770,6 @@ class StateStore(StateGroupReadStore, BackgroundUpdateStore):
 
         return count
 
-    def get_next_state_group(self):
-        return self._state_groups_id_gen.get_next()
-
     @defer.inlineCallbacks
     def _background_deduplicate_state(self, progress, batch_size):
         """This background update will slowly deduplicate state by reencoding
synapse/storage/user_directory.py

@@ -641,8 +641,12 @@ class UserDirectoryStore(SQLBaseStore):
         """
 
         if self.hs.config.user_directory_search_all_users:
-            join_clause = ""
-            where_clause = "?<>''" # naughty hack to keep the same number of binds
+            # make s.user_id null to keep the ordering algorithm happy
+            join_clause = """
+                CROSS JOIN (SELECT NULL as user_id) AS s
+            """
+            join_args = ()
+            where_clause = "1=1"
         else:
             join_clause = """
                 LEFT JOIN users_in_public_rooms AS p USING (user_id)
@@ -651,6 +655,7 @@ class UserDirectoryStore(SQLBaseStore):
                     WHERE user_id = ? AND share_private
                 ) AS s USING (user_id)
             """
+            join_args = (user_id,)
             where_clause = "(s.user_id IS NOT NULL OR p.user_id IS NOT NULL)"
 
         if isinstance(self.database_engine, PostgresEngine):
@@ -692,7 +697,7 @@ class UserDirectoryStore(SQLBaseStore):
                     avatar_url IS NULL
                 LIMIT ?
             """ % (join_clause, where_clause)
-            args = (user_id, full_query, exact_query, prefix_query, limit + 1,)
+            args = join_args + (full_query, exact_query, prefix_query, limit + 1,)
         elif isinstance(self.database_engine, Sqlite3Engine):
             search_query = _parse_query_sqlite(search_term)
 
@@ -710,7 +715,7 @@ class UserDirectoryStore(SQLBaseStore):
                     avatar_url IS NULL
                 LIMIT ?
             """ % (join_clause, where_clause)
-            args = (user_id, search_query, limit + 1)
+            args = join_args + (search_query, limit + 1)
         else:
             # This should be unreachable.
             raise Exception("Unrecognized database engine")
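(Aside: the join_args change keeps the bind parameters aligned with whatever placeholders join_clause contributes; a simplified sketch, with names illustrative:)

    if search_all_users:
        join_args = ()              # CROSS JOIN contributes no placeholders
    else:
        join_args = (user_id,)      # matches "WHERE user_id = ?" in the join

    args = join_args + (search_query, limit + 1)
    txn.execute(sql % (join_clause, where_clause), args)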