mirror of
https://git.anonymousland.org/anonymousland/synapse.git
synced 2025-12-10 11:44:53 -05:00
Merge branch 'develop' into uhoreg/cross_signing_fix_workers_notify
This commit is contained in:
commit
9c94b48bf1
122 changed files with 973 additions and 448 deletions
|
|
@ -30,6 +30,7 @@ stored in `synapse.storage.schema`.
|
|||
from synapse.storage.data_stores import DataStores
|
||||
from synapse.storage.data_stores.main import DataStore
|
||||
from synapse.storage.persist_events import EventsPersistenceStorage
|
||||
from synapse.storage.state import StateGroupStorage
|
||||
|
||||
__all__ = ["DataStores", "DataStore"]
|
||||
|
||||
|
|
@ -45,6 +46,7 @@ class Storage(object):
|
|||
self.main = stores.main
|
||||
|
||||
self.persistence = EventsPersistenceStorage(hs, stores)
|
||||
self.state = StateGroupStorage(hs, stores)
|
||||
|
||||
|
||||
def are_all_users_on_domain(txn, database_engine, domain):
|
||||
|
|
|
|||
|
|
@ -494,7 +494,7 @@ class SQLBaseStore(object):
|
|||
exception_callbacks = []
|
||||
|
||||
if LoggingContext.current_context() == LoggingContext.sentinel:
|
||||
logger.warn("Starting db txn '%s' from sentinel context", desc)
|
||||
logger.warning("Starting db txn '%s' from sentinel context", desc)
|
||||
|
||||
try:
|
||||
result = yield self.runWithConnection(
|
||||
|
|
@ -532,7 +532,7 @@ class SQLBaseStore(object):
|
|||
"""
|
||||
parent_context = LoggingContext.current_context()
|
||||
if parent_context == LoggingContext.sentinel:
|
||||
logger.warn(
|
||||
logger.warning(
|
||||
"Starting db connection from sentinel context: metrics will be lost"
|
||||
)
|
||||
parent_context = None
|
||||
|
|
@ -719,7 +719,7 @@ class SQLBaseStore(object):
|
|||
raise
|
||||
|
||||
# presumably we raced with another transaction: let's retry.
|
||||
logger.warn(
|
||||
logger.warning(
|
||||
"IntegrityError when upserting into %s; retrying: %s", table, e
|
||||
)
|
||||
|
||||
|
|
|
|||
|
|
@ -320,7 +320,7 @@ class DataStore(
|
|||
) u
|
||||
"""
|
||||
txn.execute(sql, (time_from,))
|
||||
count, = txn.fetchone()
|
||||
(count,) = txn.fetchone()
|
||||
return count
|
||||
|
||||
def count_r30_users(self):
|
||||
|
|
@ -399,7 +399,7 @@ class DataStore(
|
|||
|
||||
txn.execute(sql, (thirty_days_ago_in_secs, thirty_days_ago_in_secs))
|
||||
|
||||
count, = txn.fetchone()
|
||||
(count,) = txn.fetchone()
|
||||
results["all"] = count
|
||||
|
||||
return results
|
||||
|
|
|
|||
|
|
@ -863,7 +863,7 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
|
|||
)
|
||||
stream_row = txn.fetchone()
|
||||
if stream_row:
|
||||
offset_stream_ordering, = stream_row
|
||||
(offset_stream_ordering,) = stream_row
|
||||
rotate_to_stream_ordering = min(
|
||||
self.stream_ordering_day_ago, offset_stream_ordering
|
||||
)
|
||||
|
|
|
|||
|
|
@ -82,7 +82,7 @@ def _retry_on_integrity_error(func):
|
|||
@defer.inlineCallbacks
|
||||
def f(self, *args, **kwargs):
|
||||
try:
|
||||
res = yield func(self, *args, **kwargs)
|
||||
res = yield func(self, *args, delete_existing=False, **kwargs)
|
||||
except self.database_engine.module.IntegrityError:
|
||||
logger.exception("IntegrityError, retrying.")
|
||||
res = yield func(self, *args, delete_existing=True, **kwargs)
|
||||
|
|
@ -1125,7 +1125,7 @@ class EventsStore(
|
|||
AND stream_ordering > ?
|
||||
"""
|
||||
txn.execute(sql, (self.stream_ordering_day_ago,))
|
||||
count, = txn.fetchone()
|
||||
(count,) = txn.fetchone()
|
||||
return count
|
||||
|
||||
ret = yield self.runInteraction("count_messages", _count_messages)
|
||||
|
|
@ -1146,7 +1146,7 @@ class EventsStore(
|
|||
"""
|
||||
|
||||
txn.execute(sql, (like_clause, self.stream_ordering_day_ago))
|
||||
count, = txn.fetchone()
|
||||
(count,) = txn.fetchone()
|
||||
return count
|
||||
|
||||
ret = yield self.runInteraction("count_daily_sent_messages", _count_messages)
|
||||
|
|
@ -1161,7 +1161,7 @@ class EventsStore(
|
|||
AND stream_ordering > ?
|
||||
"""
|
||||
txn.execute(sql, (self.stream_ordering_day_ago,))
|
||||
count, = txn.fetchone()
|
||||
(count,) = txn.fetchone()
|
||||
return count
|
||||
|
||||
ret = yield self.runInteraction("count_daily_active_rooms", _count)
|
||||
|
|
@ -1646,7 +1646,7 @@ class EventsStore(
|
|||
""",
|
||||
(room_id,),
|
||||
)
|
||||
min_depth, = txn.fetchone()
|
||||
(min_depth,) = txn.fetchone()
|
||||
|
||||
logger.info("[purge] updating room_depth to %d", min_depth)
|
||||
|
||||
|
|
@ -1838,7 +1838,6 @@ class EventsStore(
|
|||
"room_stats_earliest_token",
|
||||
"rooms",
|
||||
"stream_ordering_to_exterm",
|
||||
"topics",
|
||||
"users_in_public_rooms",
|
||||
"users_who_share_private_rooms",
|
||||
# no useful index, but let's clear them anyway
|
||||
|
|
|
|||
|
|
@ -438,7 +438,7 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore):
|
|||
if not rows:
|
||||
return 0
|
||||
|
||||
upper_event_id, = rows[-1]
|
||||
(upper_event_id,) = rows[-1]
|
||||
|
||||
# Update the redactions with the received_ts.
|
||||
#
|
||||
|
|
|
|||
|
|
@ -249,7 +249,7 @@ class GroupServerStore(SQLBaseStore):
|
|||
WHERE group_id = ? AND category_id = ?
|
||||
"""
|
||||
txn.execute(sql, (group_id, category_id))
|
||||
order, = txn.fetchone()
|
||||
(order,) = txn.fetchone()
|
||||
|
||||
if existing:
|
||||
to_update = {}
|
||||
|
|
@ -509,7 +509,7 @@ class GroupServerStore(SQLBaseStore):
|
|||
WHERE group_id = ? AND role_id = ?
|
||||
"""
|
||||
txn.execute(sql, (group_id, role_id))
|
||||
order, = txn.fetchone()
|
||||
(order,) = txn.fetchone()
|
||||
|
||||
if existing:
|
||||
to_update = {}
|
||||
|
|
|
|||
|
|
@ -171,7 +171,7 @@ class MonthlyActiveUsersStore(SQLBaseStore):
|
|||
sql = "SELECT COALESCE(count(*), 0) FROM monthly_active_users"
|
||||
|
||||
txn.execute(sql)
|
||||
count, = txn.fetchone()
|
||||
(count,) = txn.fetchone()
|
||||
return count
|
||||
|
||||
return self.runInteraction("count_users", _count_users)
|
||||
|
|
|
|||
|
|
@ -143,7 +143,7 @@ class PushRulesWorkerStore(
|
|||
" WHERE user_id = ? AND ? < stream_id"
|
||||
)
|
||||
txn.execute(sql, (user_id, last_id))
|
||||
count, = txn.fetchone()
|
||||
(count,) = txn.fetchone()
|
||||
return bool(count)
|
||||
|
||||
return self.runInteraction(
|
||||
|
|
|
|||
|
|
@ -44,7 +44,7 @@ class PusherWorkerStore(SQLBaseStore):
|
|||
|
||||
r["data"] = json.loads(dataJson)
|
||||
except Exception as e:
|
||||
logger.warn(
|
||||
logger.warning(
|
||||
"Invalid JSON in data for pusher %d: %s, %s",
|
||||
r["id"],
|
||||
dataJson,
|
||||
|
|
|
|||
|
|
@ -459,7 +459,7 @@ class RegistrationWorkerStore(SQLBaseStore):
|
|||
WHERE appservice_id IS NULL
|
||||
"""
|
||||
)
|
||||
count, = txn.fetchone()
|
||||
(count,) = txn.fetchone()
|
||||
return count
|
||||
|
||||
ret = yield self.runInteraction("count_users", _count_users)
|
||||
|
|
|
|||
|
|
@ -927,7 +927,7 @@ class RoomMemberBackgroundUpdateStore(BackgroundUpdateStore):
|
|||
if not row or not row[0]:
|
||||
return processed, True
|
||||
|
||||
next_room, = row
|
||||
(next_room,) = row
|
||||
|
||||
sql = """
|
||||
UPDATE current_state_events
|
||||
|
|
|
|||
|
|
@ -196,7 +196,7 @@ class SearchBackgroundUpdateStore(BackgroundUpdateStore):
|
|||
" ON event_search USING GIN (vector)"
|
||||
)
|
||||
except psycopg2.ProgrammingError as e:
|
||||
logger.warn(
|
||||
logger.warning(
|
||||
"Ignoring error %r when trying to switch from GIST to GIN", e
|
||||
)
|
||||
|
||||
|
|
@ -672,7 +672,7 @@ class SearchStore(SearchBackgroundUpdateStore):
|
|||
)
|
||||
)
|
||||
txn.execute(query, (value, search_query))
|
||||
headline, = txn.fetchall()[0]
|
||||
(headline,) = txn.fetchall()[0]
|
||||
|
||||
# Now we need to pick the possible highlights out of the haedline
|
||||
# result.
|
||||
|
|
|
|||
|
|
@ -725,16 +725,18 @@ class StateGroupWorkerStore(
|
|||
member_filter, non_member_filter = state_filter.get_member_split()
|
||||
|
||||
# Now we look them up in the member and non-member caches
|
||||
non_member_state, incomplete_groups_nm, = (
|
||||
yield self._get_state_for_groups_using_cache(
|
||||
groups, self._state_group_cache, state_filter=non_member_filter
|
||||
)
|
||||
(
|
||||
non_member_state,
|
||||
incomplete_groups_nm,
|
||||
) = yield self._get_state_for_groups_using_cache(
|
||||
groups, self._state_group_cache, state_filter=non_member_filter
|
||||
)
|
||||
|
||||
member_state, incomplete_groups_m, = (
|
||||
yield self._get_state_for_groups_using_cache(
|
||||
groups, self._state_group_members_cache, state_filter=member_filter
|
||||
)
|
||||
(
|
||||
member_state,
|
||||
incomplete_groups_m,
|
||||
) = yield self._get_state_for_groups_using_cache(
|
||||
groups, self._state_group_members_cache, state_filter=member_filter
|
||||
)
|
||||
|
||||
state = dict(non_member_state)
|
||||
|
|
@ -1076,7 +1078,7 @@ class StateBackgroundUpdateStore(
|
|||
" WHERE id < ? AND room_id = ?",
|
||||
(state_group, room_id),
|
||||
)
|
||||
prev_group, = txn.fetchone()
|
||||
(prev_group,) = txn.fetchone()
|
||||
new_last_state_group = state_group
|
||||
|
||||
if prev_group:
|
||||
|
|
|
|||
|
|
@ -773,7 +773,7 @@ class StatsStore(StateDeltasStore):
|
|||
(room_id,),
|
||||
)
|
||||
|
||||
current_state_events_count, = txn.fetchone()
|
||||
(current_state_events_count,) = txn.fetchone()
|
||||
|
||||
users_in_room = self.get_users_in_room_txn(txn, room_id)
|
||||
|
||||
|
|
@ -863,7 +863,7 @@ class StatsStore(StateDeltasStore):
|
|||
""",
|
||||
(user_id,),
|
||||
)
|
||||
count, = txn.fetchone()
|
||||
(count,) = txn.fetchone()
|
||||
return count, pos
|
||||
|
||||
joined_rooms, pos = yield self.runInteraction(
|
||||
|
|
|
|||
|
|
@ -260,9 +260,7 @@ class EventsPersistenceStorage(object):
|
|||
self._event_persist_queue.handle_queue(room_id, persisting_queue)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _persist_events(
|
||||
self, events_and_contexts, backfilled=False, delete_existing=False
|
||||
):
|
||||
def _persist_events(self, events_and_contexts, backfilled=False):
|
||||
"""Calculates the change to current state and forward extremities, and
|
||||
persists the given events and with those updates.
|
||||
|
||||
|
|
@ -412,7 +410,6 @@ class EventsPersistenceStorage(object):
|
|||
state_delta_for_room=state_delta_for_room,
|
||||
new_forward_extremeties=new_forward_extremeties,
|
||||
backfilled=backfilled,
|
||||
delete_existing=delete_existing,
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
|
|
@ -550,7 +547,7 @@ class EventsPersistenceStorage(object):
|
|||
|
||||
if missing_event_ids:
|
||||
# Now pull out the state groups for any missing events from DB
|
||||
event_to_groups = yield self.state_store._get_state_group_for_events(
|
||||
event_to_groups = yield self.main_store._get_state_group_for_events(
|
||||
missing_event_ids
|
||||
)
|
||||
event_id_to_state_group.update(event_to_groups)
|
||||
|
|
|
|||
|
|
@ -19,6 +19,8 @@ from six import iteritems, itervalues
|
|||
|
||||
import attr
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse.api.constants import EventTypes
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
|
@ -322,3 +324,234 @@ class StateFilter(object):
|
|||
)
|
||||
|
||||
return member_filter, non_member_filter
|
||||
|
||||
|
||||
class StateGroupStorage(object):
|
||||
"""High level interface to fetching state for event.
|
||||
"""
|
||||
|
||||
def __init__(self, hs, stores):
|
||||
self.stores = stores
|
||||
|
||||
def get_state_group_delta(self, state_group):
|
||||
"""Given a state group try to return a previous group and a delta between
|
||||
the old and the new.
|
||||
|
||||
Returns:
|
||||
Deferred[Tuple[Optional[int], Optional[list[dict[tuple[str, str], str]]]]]):
|
||||
(prev_group, delta_ids)
|
||||
"""
|
||||
|
||||
return self.stores.main.get_state_group_delta(state_group)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_state_groups_ids(self, _room_id, event_ids):
|
||||
"""Get the event IDs of all the state for the state groups for the given events
|
||||
|
||||
Args:
|
||||
_room_id (str): id of the room for these events
|
||||
event_ids (iterable[str]): ids of the events
|
||||
|
||||
Returns:
|
||||
Deferred[dict[int, dict[tuple[str, str], str]]]:
|
||||
dict of state_group_id -> (dict of (type, state_key) -> event id)
|
||||
"""
|
||||
if not event_ids:
|
||||
return {}
|
||||
|
||||
event_to_groups = yield self.stores.main._get_state_group_for_events(event_ids)
|
||||
|
||||
groups = set(itervalues(event_to_groups))
|
||||
group_to_state = yield self.stores.main._get_state_for_groups(groups)
|
||||
|
||||
return group_to_state
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_state_ids_for_group(self, state_group):
|
||||
"""Get the event IDs of all the state in the given state group
|
||||
|
||||
Args:
|
||||
state_group (int)
|
||||
|
||||
Returns:
|
||||
Deferred[dict]: Resolves to a map of (type, state_key) -> event_id
|
||||
"""
|
||||
group_to_state = yield self._get_state_for_groups((state_group,))
|
||||
|
||||
return group_to_state[state_group]
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_state_groups(self, room_id, event_ids):
|
||||
""" Get the state groups for the given list of event_ids
|
||||
Returns:
|
||||
Deferred[dict[int, list[EventBase]]]:
|
||||
dict of state_group_id -> list of state events.
|
||||
"""
|
||||
if not event_ids:
|
||||
return {}
|
||||
|
||||
group_to_ids = yield self.get_state_groups_ids(room_id, event_ids)
|
||||
|
||||
state_event_map = yield self.stores.main.get_events(
|
||||
[
|
||||
ev_id
|
||||
for group_ids in itervalues(group_to_ids)
|
||||
for ev_id in itervalues(group_ids)
|
||||
],
|
||||
get_prev_content=False,
|
||||
)
|
||||
|
||||
return {
|
||||
group: [
|
||||
state_event_map[v]
|
||||
for v in itervalues(event_id_map)
|
||||
if v in state_event_map
|
||||
]
|
||||
for group, event_id_map in iteritems(group_to_ids)
|
||||
}
|
||||
|
||||
def _get_state_groups_from_groups(self, groups, state_filter):
|
||||
"""Returns the state groups for a given set of groups, filtering on
|
||||
types of state events.
|
||||
|
||||
Args:
|
||||
groups(list[int]): list of state group IDs to query
|
||||
state_filter (StateFilter): The state filter used to fetch state
|
||||
from the database.
|
||||
Returns:
|
||||
Deferred[dict[int, dict[tuple[str, str], str]]]:
|
||||
dict of state_group_id -> (dict of (type, state_key) -> event id)
|
||||
"""
|
||||
|
||||
return self.stores.main._get_state_groups_from_groups(groups, state_filter)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_state_for_events(self, event_ids, state_filter=StateFilter.all()):
|
||||
"""Given a list of event_ids and type tuples, return a list of state
|
||||
dicts for each event.
|
||||
Args:
|
||||
event_ids (list[string])
|
||||
state_filter (StateFilter): The state filter used to fetch state
|
||||
from the database.
|
||||
Returns:
|
||||
deferred: A dict of (event_id) -> (type, state_key) -> [state_events]
|
||||
"""
|
||||
event_to_groups = yield self.stores.main._get_state_group_for_events(event_ids)
|
||||
|
||||
groups = set(itervalues(event_to_groups))
|
||||
group_to_state = yield self.stores.main._get_state_for_groups(
|
||||
groups, state_filter
|
||||
)
|
||||
|
||||
state_event_map = yield self.stores.main.get_events(
|
||||
[ev_id for sd in itervalues(group_to_state) for ev_id in itervalues(sd)],
|
||||
get_prev_content=False,
|
||||
)
|
||||
|
||||
event_to_state = {
|
||||
event_id: {
|
||||
k: state_event_map[v]
|
||||
for k, v in iteritems(group_to_state[group])
|
||||
if v in state_event_map
|
||||
}
|
||||
for event_id, group in iteritems(event_to_groups)
|
||||
}
|
||||
|
||||
return {event: event_to_state[event] for event in event_ids}
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_state_ids_for_events(self, event_ids, state_filter=StateFilter.all()):
|
||||
"""
|
||||
Get the state dicts corresponding to a list of events, containing the event_ids
|
||||
of the state events (as opposed to the events themselves)
|
||||
|
||||
Args:
|
||||
event_ids(list(str)): events whose state should be returned
|
||||
state_filter (StateFilter): The state filter used to fetch state
|
||||
from the database.
|
||||
|
||||
Returns:
|
||||
A deferred dict from event_id -> (type, state_key) -> event_id
|
||||
"""
|
||||
event_to_groups = yield self.stores.main._get_state_group_for_events(event_ids)
|
||||
|
||||
groups = set(itervalues(event_to_groups))
|
||||
group_to_state = yield self.stores.main._get_state_for_groups(
|
||||
groups, state_filter
|
||||
)
|
||||
|
||||
event_to_state = {
|
||||
event_id: group_to_state[group]
|
||||
for event_id, group in iteritems(event_to_groups)
|
||||
}
|
||||
|
||||
return {event: event_to_state[event] for event in event_ids}
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_state_for_event(self, event_id, state_filter=StateFilter.all()):
|
||||
"""
|
||||
Get the state dict corresponding to a particular event
|
||||
|
||||
Args:
|
||||
event_id(str): event whose state should be returned
|
||||
state_filter (StateFilter): The state filter used to fetch state
|
||||
from the database.
|
||||
|
||||
Returns:
|
||||
A deferred dict from (type, state_key) -> state_event
|
||||
"""
|
||||
state_map = yield self.get_state_for_events([event_id], state_filter)
|
||||
return state_map[event_id]
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_state_ids_for_event(self, event_id, state_filter=StateFilter.all()):
|
||||
"""
|
||||
Get the state dict corresponding to a particular event
|
||||
|
||||
Args:
|
||||
event_id(str): event whose state should be returned
|
||||
state_filter (StateFilter): The state filter used to fetch state
|
||||
from the database.
|
||||
|
||||
Returns:
|
||||
A deferred dict from (type, state_key) -> state_event
|
||||
"""
|
||||
state_map = yield self.get_state_ids_for_events([event_id], state_filter)
|
||||
return state_map[event_id]
|
||||
|
||||
def _get_state_for_groups(self, groups, state_filter=StateFilter.all()):
|
||||
"""Gets the state at each of a list of state groups, optionally
|
||||
filtering by type/state_key
|
||||
|
||||
Args:
|
||||
groups (iterable[int]): list of state groups for which we want
|
||||
to get the state.
|
||||
state_filter (StateFilter): The state filter used to fetch state
|
||||
from the database.
|
||||
Returns:
|
||||
Deferred[dict[int, dict[tuple[str, str], str]]]:
|
||||
dict of state_group_id -> (dict of (type, state_key) -> event id)
|
||||
"""
|
||||
return self.stores.main._get_state_for_groups(groups, state_filter)
|
||||
|
||||
def store_state_group(
|
||||
self, event_id, room_id, prev_group, delta_ids, current_state_ids
|
||||
):
|
||||
"""Store a new set of state, returning a newly assigned state group.
|
||||
|
||||
Args:
|
||||
event_id (str): The event ID for which the state was calculated
|
||||
room_id (str)
|
||||
prev_group (int|None): A previous state group for the room, optional.
|
||||
delta_ids (dict|None): The delta between state at `prev_group` and
|
||||
`current_state_ids`, if `prev_group` was given. Same format as
|
||||
`current_state_ids`.
|
||||
current_state_ids (dict): The state to store. Map of (type, state_key)
|
||||
to event_id.
|
||||
|
||||
Returns:
|
||||
Deferred[int]: The state group ID
|
||||
"""
|
||||
return self.stores.main.store_state_group(
|
||||
event_id, room_id, prev_group, delta_ids, current_state_ids
|
||||
)
|
||||
|
|
|
|||
|
|
@ -46,7 +46,7 @@ def _load_current_id(db_conn, table, column, step=1):
|
|||
cur.execute("SELECT MAX(%s) FROM %s" % (column, table))
|
||||
else:
|
||||
cur.execute("SELECT MIN(%s) FROM %s" % (column, table))
|
||||
val, = cur.fetchone()
|
||||
(val,) = cur.fetchone()
|
||||
cur.close()
|
||||
current_id = int(val) if val else step
|
||||
return (max if step > 0 else min)(current_id, step)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue