mirror of
https://git.anonymousland.org/anonymousland/synapse.git
synced 2025-05-02 11:26:09 -04:00
Store state groups separately from events (#2784)
* Split state group persist into seperate storage func * Add per database engine code for state group id gen * Move store_state_group to StateReadStore This allows other workers to use it, and so resolve state. * Hook up store_state_group * Fix tests * Rename _store_mult_state_groups_txn * Rename StateGroupReadStore * Remove redundant _have_persisted_state_group_txn * Update comments * Comment compute_event_context * Set start val for state_group_id_seq ... otherwise we try to recreate old state groups * Update comments * Don't store state for outliers * Update comment * Update docstring as state groups are ints
This commit is contained in:
parent
b31bf0bb51
commit
3d33eef6fc
12 changed files with 341 additions and 204 deletions
|
@ -124,7 +124,6 @@ class DataStore(RoomMemberStore, RoomStore,
|
|||
)
|
||||
|
||||
self._transaction_id_gen = IdGenerator(db_conn, "sent_transactions", "id")
|
||||
self._state_groups_id_gen = IdGenerator(db_conn, "state_groups", "id")
|
||||
self._access_tokens_id_gen = IdGenerator(db_conn, "access_tokens", "id")
|
||||
self._event_reports_id_gen = IdGenerator(db_conn, "event_reports", "id")
|
||||
self._push_rule_id_gen = IdGenerator(db_conn, "push_rules", "id")
|
||||
|
|
|
@ -62,3 +62,9 @@ class PostgresEngine(object):
|
|||
|
||||
def lock_table(self, txn, table):
|
||||
txn.execute("LOCK TABLE %s in EXCLUSIVE MODE" % (table,))
|
||||
|
||||
def get_next_state_group_id(self, txn):
|
||||
"""Returns an int that can be used as a new state_group ID
|
||||
"""
|
||||
txn.execute("SELECT nextval('state_group_id_seq')")
|
||||
return txn.fetchone()[0]
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
from synapse.storage.prepare_database import prepare_database
|
||||
|
||||
import struct
|
||||
import threading
|
||||
|
||||
|
||||
class Sqlite3Engine(object):
|
||||
|
@ -24,6 +25,11 @@ class Sqlite3Engine(object):
|
|||
def __init__(self, database_module, database_config):
|
||||
self.module = database_module
|
||||
|
||||
# The current max state_group, or None if we haven't looked
|
||||
# in the DB yet.
|
||||
self._current_state_group_id = None
|
||||
self._current_state_group_id_lock = threading.Lock()
|
||||
|
||||
def check_database(self, txn):
|
||||
pass
|
||||
|
||||
|
@ -43,6 +49,19 @@ class Sqlite3Engine(object):
|
|||
def lock_table(self, txn, table):
|
||||
return
|
||||
|
||||
def get_next_state_group_id(self, txn):
|
||||
"""Returns an int that can be used as a new state_group ID
|
||||
"""
|
||||
# We do application locking here since if we're using sqlite then
|
||||
# we are a single process synapse.
|
||||
with self._current_state_group_id_lock:
|
||||
if self._current_state_group_id is None:
|
||||
txn.execute("SELECT COALESCE(max(id), 0) FROM state_groups")
|
||||
self._current_state_group_id = txn.fetchone()[0]
|
||||
|
||||
self._current_state_group_id += 1
|
||||
return self._current_state_group_id
|
||||
|
||||
|
||||
# Following functions taken from: https://github.com/coleifer/peewee
|
||||
|
||||
|
|
|
@ -755,9 +755,8 @@ class EventsStore(SQLBaseStore):
|
|||
events_and_contexts=events_and_contexts,
|
||||
)
|
||||
|
||||
# Insert into the state_groups, state_groups_state, and
|
||||
# event_to_state_groups tables.
|
||||
self._store_mult_state_groups_txn(txn, events_and_contexts)
|
||||
# Insert into event_to_state_groups.
|
||||
self._store_event_state_mappings_txn(txn, events_and_contexts)
|
||||
|
||||
# _store_rejected_events_txn filters out any events which were
|
||||
# rejected, and returns the filtered list.
|
||||
|
@ -992,10 +991,9 @@ class EventsStore(SQLBaseStore):
|
|||
# an outlier in the database. We now have some state at that
|
||||
# so we need to update the state_groups table with that state.
|
||||
|
||||
# insert into the state_group, state_groups_state and
|
||||
# event_to_state_groups tables.
|
||||
# insert into event_to_state_groups.
|
||||
try:
|
||||
self._store_mult_state_groups_txn(txn, ((event, context),))
|
||||
self._store_event_state_mappings_txn(txn, ((event, context),))
|
||||
except Exception:
|
||||
logger.exception("")
|
||||
raise
|
||||
|
|
37
synapse/storage/schema/delta/47/state_group_seq.py
Normal file
37
synapse/storage/schema/delta/47/state_group_seq.py
Normal file
|
@ -0,0 +1,37 @@
|
|||
# Copyright 2018 New Vector Ltd
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from synapse.storage.engines import PostgresEngine
|
||||
|
||||
|
||||
def run_create(cur, database_engine, *args, **kwargs):
|
||||
if isinstance(database_engine, PostgresEngine):
|
||||
# if we already have some state groups, we want to start making new
|
||||
# ones with a higher id.
|
||||
cur.execute("SELECT max(id) FROM state_groups")
|
||||
row = cur.fetchone()
|
||||
|
||||
if row[0] is None:
|
||||
start_val = 1
|
||||
else:
|
||||
start_val = row[0] + 1
|
||||
|
||||
cur.execute(
|
||||
"CREATE SEQUENCE state_group_id_seq START WITH %s",
|
||||
(start_val, ),
|
||||
)
|
||||
|
||||
|
||||
def run_upgrade(*args, **kwargs):
|
||||
pass
|
|
@ -42,11 +42,8 @@ class _GetStateGroupDelta(namedtuple("_GetStateGroupDelta", ("prev_group", "delt
|
|||
return len(self.delta_ids) if self.delta_ids else 0
|
||||
|
||||
|
||||
class StateGroupReadStore(SQLBaseStore):
|
||||
"""The read-only parts of StateGroupStore
|
||||
|
||||
None of these functions write to the state tables, so are suitable for
|
||||
including in the SlavedStores.
|
||||
class StateGroupWorkerStore(SQLBaseStore):
|
||||
"""The parts of StateGroupStore that can be called from workers.
|
||||
"""
|
||||
|
||||
STATE_GROUP_DEDUPLICATION_UPDATE_NAME = "state_group_state_deduplication"
|
||||
|
@ -54,7 +51,7 @@ class StateGroupReadStore(SQLBaseStore):
|
|||
CURRENT_STATE_INDEX_UPDATE_NAME = "current_state_members_idx"
|
||||
|
||||
def __init__(self, db_conn, hs):
|
||||
super(StateGroupReadStore, self).__init__(db_conn, hs)
|
||||
super(StateGroupWorkerStore, self).__init__(db_conn, hs)
|
||||
|
||||
self._state_group_cache = DictionaryCache(
|
||||
"*stateGroupCache*", 100000 * CACHE_SIZE_FACTOR
|
||||
|
@ -549,8 +546,117 @@ class StateGroupReadStore(SQLBaseStore):
|
|||
|
||||
defer.returnValue(results)
|
||||
|
||||
def store_state_group(self, event_id, room_id, prev_group, delta_ids,
|
||||
current_state_ids):
|
||||
"""Store a new set of state, returning a newly assigned state group.
|
||||
|
||||
class StateStore(StateGroupReadStore, BackgroundUpdateStore):
|
||||
Args:
|
||||
event_id (str): The event ID for which the state was calculated
|
||||
room_id (str)
|
||||
prev_group (int|None): A previous state group for the room, optional.
|
||||
delta_ids (dict|None): The delta between state at `prev_group` and
|
||||
`current_state_ids`, if `prev_group` was given. Same format as
|
||||
`current_state_ids`.
|
||||
current_state_ids (dict): The state to store. Map of (type, state_key)
|
||||
to event_id.
|
||||
|
||||
Returns:
|
||||
Deferred[int]: The state group ID
|
||||
"""
|
||||
def _store_state_group_txn(txn):
|
||||
if current_state_ids is None:
|
||||
# AFAIK, this can never happen
|
||||
raise Exception("current_state_ids cannot be None")
|
||||
|
||||
state_group = self.database_engine.get_next_state_group_id(txn)
|
||||
|
||||
self._simple_insert_txn(
|
||||
txn,
|
||||
table="state_groups",
|
||||
values={
|
||||
"id": state_group,
|
||||
"room_id": room_id,
|
||||
"event_id": event_id,
|
||||
},
|
||||
)
|
||||
|
||||
# We persist as a delta if we can, while also ensuring the chain
|
||||
# of deltas isn't tooo long, as otherwise read performance degrades.
|
||||
if prev_group:
|
||||
is_in_db = self._simple_select_one_onecol_txn(
|
||||
txn,
|
||||
table="state_groups",
|
||||
keyvalues={"id": prev_group},
|
||||
retcol="id",
|
||||
allow_none=True,
|
||||
)
|
||||
if not is_in_db:
|
||||
raise Exception(
|
||||
"Trying to persist state with unpersisted prev_group: %r"
|
||||
% (prev_group,)
|
||||
)
|
||||
|
||||
potential_hops = self._count_state_group_hops_txn(
|
||||
txn, prev_group
|
||||
)
|
||||
if prev_group and potential_hops < MAX_STATE_DELTA_HOPS:
|
||||
self._simple_insert_txn(
|
||||
txn,
|
||||
table="state_group_edges",
|
||||
values={
|
||||
"state_group": state_group,
|
||||
"prev_state_group": prev_group,
|
||||
},
|
||||
)
|
||||
|
||||
self._simple_insert_many_txn(
|
||||
txn,
|
||||
table="state_groups_state",
|
||||
values=[
|
||||
{
|
||||
"state_group": state_group,
|
||||
"room_id": room_id,
|
||||
"type": key[0],
|
||||
"state_key": key[1],
|
||||
"event_id": state_id,
|
||||
}
|
||||
for key, state_id in delta_ids.iteritems()
|
||||
],
|
||||
)
|
||||
else:
|
||||
self._simple_insert_many_txn(
|
||||
txn,
|
||||
table="state_groups_state",
|
||||
values=[
|
||||
{
|
||||
"state_group": state_group,
|
||||
"room_id": room_id,
|
||||
"type": key[0],
|
||||
"state_key": key[1],
|
||||
"event_id": state_id,
|
||||
}
|
||||
for key, state_id in current_state_ids.iteritems()
|
||||
],
|
||||
)
|
||||
|
||||
# Prefill the state group cache with this group.
|
||||
# It's fine to use the sequence like this as the state group map
|
||||
# is immutable. (If the map wasn't immutable then this prefill could
|
||||
# race with another update)
|
||||
txn.call_after(
|
||||
self._state_group_cache.update,
|
||||
self._state_group_cache.sequence,
|
||||
key=state_group,
|
||||
value=dict(current_state_ids),
|
||||
full=True,
|
||||
)
|
||||
|
||||
return state_group
|
||||
|
||||
return self.runInteraction("store_state_group", _store_state_group_txn)
|
||||
|
||||
|
||||
class StateStore(StateGroupWorkerStore, BackgroundUpdateStore):
|
||||
""" Keeps track of the state at a given event.
|
||||
|
||||
This is done by the concept of `state groups`. Every event is a assigned
|
||||
|
@ -591,27 +697,12 @@ class StateStore(StateGroupReadStore, BackgroundUpdateStore):
|
|||
where_clause="type='m.room.member'",
|
||||
)
|
||||
|
||||
def _have_persisted_state_group_txn(self, txn, state_group):
|
||||
txn.execute(
|
||||
"SELECT count(*) FROM state_groups WHERE id = ?",
|
||||
(state_group,)
|
||||
)
|
||||
row = txn.fetchone()
|
||||
return row and row[0]
|
||||
|
||||
def _store_mult_state_groups_txn(self, txn, events_and_contexts):
|
||||
def _store_event_state_mappings_txn(self, txn, events_and_contexts):
|
||||
state_groups = {}
|
||||
for event, context in events_and_contexts:
|
||||
if event.internal_metadata.is_outlier():
|
||||
continue
|
||||
|
||||
if context.current_state_ids is None:
|
||||
# AFAIK, this can never happen
|
||||
logger.error(
|
||||
"Non-outlier event %s had current_state_ids==None",
|
||||
event.event_id)
|
||||
continue
|
||||
|
||||
# if the event was rejected, just give it the same state as its
|
||||
# predecessor.
|
||||
if context.rejected:
|
||||
|
@ -620,90 +711,6 @@ class StateStore(StateGroupReadStore, BackgroundUpdateStore):
|
|||
|
||||
state_groups[event.event_id] = context.state_group
|
||||
|
||||
if self._have_persisted_state_group_txn(txn, context.state_group):
|
||||
continue
|
||||
|
||||
self._simple_insert_txn(
|
||||
txn,
|
||||
table="state_groups",
|
||||
values={
|
||||
"id": context.state_group,
|
||||
"room_id": event.room_id,
|
||||
"event_id": event.event_id,
|
||||
},
|
||||
)
|
||||
|
||||
# We persist as a delta if we can, while also ensuring the chain
|
||||
# of deltas isn't tooo long, as otherwise read performance degrades.
|
||||
if context.prev_group:
|
||||
is_in_db = self._simple_select_one_onecol_txn(
|
||||
txn,
|
||||
table="state_groups",
|
||||
keyvalues={"id": context.prev_group},
|
||||
retcol="id",
|
||||
allow_none=True,
|
||||
)
|
||||
if not is_in_db:
|
||||
raise Exception(
|
||||
"Trying to persist state with unpersisted prev_group: %r"
|
||||
% (context.prev_group,)
|
||||
)
|
||||
|
||||
potential_hops = self._count_state_group_hops_txn(
|
||||
txn, context.prev_group
|
||||
)
|
||||
if context.prev_group and potential_hops < MAX_STATE_DELTA_HOPS:
|
||||
self._simple_insert_txn(
|
||||
txn,
|
||||
table="state_group_edges",
|
||||
values={
|
||||
"state_group": context.state_group,
|
||||
"prev_state_group": context.prev_group,
|
||||
},
|
||||
)
|
||||
|
||||
self._simple_insert_many_txn(
|
||||
txn,
|
||||
table="state_groups_state",
|
||||
values=[
|
||||
{
|
||||
"state_group": context.state_group,
|
||||
"room_id": event.room_id,
|
||||
"type": key[0],
|
||||
"state_key": key[1],
|
||||
"event_id": state_id,
|
||||
}
|
||||
for key, state_id in context.delta_ids.iteritems()
|
||||
],
|
||||
)
|
||||
else:
|
||||
self._simple_insert_many_txn(
|
||||
txn,
|
||||
table="state_groups_state",
|
||||
values=[
|
||||
{
|
||||
"state_group": context.state_group,
|
||||
"room_id": event.room_id,
|
||||
"type": key[0],
|
||||
"state_key": key[1],
|
||||
"event_id": state_id,
|
||||
}
|
||||
for key, state_id in context.current_state_ids.iteritems()
|
||||
],
|
||||
)
|
||||
|
||||
# Prefill the state group cache with this group.
|
||||
# It's fine to use the sequence like this as the state group map
|
||||
# is immutable. (If the map wasn't immutable then this prefill could
|
||||
# race with another update)
|
||||
txn.call_after(
|
||||
self._state_group_cache.update,
|
||||
self._state_group_cache.sequence,
|
||||
key=context.state_group,
|
||||
value=dict(context.current_state_ids),
|
||||
full=True,
|
||||
)
|
||||
|
||||
self._simple_insert_many_txn(
|
||||
txn,
|
||||
table="event_to_state_groups",
|
||||
|
@ -763,9 +770,6 @@ class StateStore(StateGroupReadStore, BackgroundUpdateStore):
|
|||
|
||||
return count
|
||||
|
||||
def get_next_state_group(self):
|
||||
return self._state_groups_id_gen.get_next()
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _background_deduplicate_state(self, progress, batch_size):
|
||||
"""This background update will slowly deduplicate state by reencoding
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue