Mirror of https://git.anonymousland.org/anonymousland/synapse.git (synced 2025-08-18 06:57:49 -04:00)

Merge remote-tracking branch 'upstream/release-v1.39'

Commit 9754df5623: 260 changed files with 3981 additions and 2390 deletions
@@ -92,14 +92,12 @@ class BackgroundUpdater:
         self.db_pool = database

         # if a background update is currently running, its name.
-        self._current_background_update = None  # type: Optional[str]
+        self._current_background_update: Optional[str] = None

-        self._background_update_performance = (
-            {}
-        )  # type: Dict[str, BackgroundUpdatePerformance]
-        self._background_update_handlers = (
-            {}
-        )  # type: Dict[str, Callable[[JsonDict, int], Awaitable[int]]]
+        self._background_update_performance: Dict[str, BackgroundUpdatePerformance] = {}
+        self._background_update_handlers: Dict[
+            str, Callable[[JsonDict, int], Awaitable[int]]
+        ] = {}
         self._all_done = False

     def start_doing_background_updates(self) -> None:
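Most of the hunks in this merge apply the same mechanical change: `# type:` comments are replaced with PEP 526 inline variable annotations, which type checkers read directly and which remove the need to wrap an assignment in parentheses just to fit the comment. A minimal sketch of the two spellings (class and attribute names here are illustrative, not Synapse's):

from typing import Dict, Optional


class Example:
    def __init__(self) -> None:
        # Old spelling: the annotation lives in a trailing comment, so long
        # types forced the assignment to be split across several lines.
        #   self._current_update = None  # type: Optional[str]
        #
        # New spelling (PEP 526): the annotation is part of the statement.
        self._current_update: Optional[str] = None
        self._performance: Dict[str, int] = {}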
@@ -411,7 +409,7 @@ class BackgroundUpdater:
             c.execute(sql)

         if isinstance(self.db_pool.engine, engines.PostgresEngine):
-            runner = create_index_psql  # type: Optional[Callable[[Connection], None]]
+            runner: Optional[Callable[[Connection], None]] = create_index_psql
         elif psql_only:
             runner = None
         else:

@@ -670,8 +670,8 @@ class DatabasePool:
         Returns:
             The result of func
         """
-        after_callbacks = []  # type: List[_CallbackListEntry]
-        exception_callbacks = []  # type: List[_CallbackListEntry]
+        after_callbacks: List[_CallbackListEntry] = []
+        exception_callbacks: List[_CallbackListEntry] = []

         if not current_context():
             logger.warning("Starting db txn '%s' from sentinel context", desc)

@@ -907,7 +907,7 @@ class DatabasePool:
         # The sort is to ensure that we don't rely on dictionary iteration
         # order.
         keys, vals = zip(
-            *[zip(*(sorted(i.items(), key=lambda kv: kv[0]))) for i in values if i]
+            *(zip(*(sorted(i.items(), key=lambda kv: kv[0]))) for i in values if i)
         )

         for k in keys:
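The only change here is swapping a list comprehension for a generator expression inside the `*` unpacking; behaviour is identical, it just avoids materialising an intermediate list. For reference, a small worked example of what the sort-then-double-zip produces (values chosen purely for illustration):

values = [{"b": 2, "a": 1}, {"a": 3, "b": 4}]

keys, vals = zip(
    *(zip(*sorted(i.items(), key=lambda kv: kv[0])) for i in values if i)
)

# Sorting each row's items first means every row yields its columns in the
# same order, regardless of dict insertion order:
assert keys == (("a", "b"), ("a", "b"))
assert vals == ((1, 2), (3, 4))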
@ -1090,7 +1090,7 @@ class DatabasePool:
|
|||
return False
|
||||
|
||||
# We didn't find any existing rows, so insert a new one
|
||||
allvalues = {} # type: Dict[str, Any]
|
||||
allvalues: Dict[str, Any] = {}
|
||||
allvalues.update(keyvalues)
|
||||
allvalues.update(values)
|
||||
allvalues.update(insertion_values)
|
||||
|
@ -1121,7 +1121,7 @@ class DatabasePool:
|
|||
values: The nonunique columns and their new values
|
||||
insertion_values: additional key/values to use only when inserting
|
||||
"""
|
||||
allvalues = {} # type: Dict[str, Any]
|
||||
allvalues: Dict[str, Any] = {}
|
||||
allvalues.update(keyvalues)
|
||||
allvalues.update(insertion_values or {})
|
||||
|
||||
|
@ -1257,7 +1257,7 @@ class DatabasePool:
|
|||
value_values: A list of each row's value column values.
|
||||
Ignored if value_names is empty.
|
||||
"""
|
||||
allnames = [] # type: List[str]
|
||||
allnames: List[str] = []
|
||||
allnames.extend(key_names)
|
||||
allnames.extend(value_names)
|
||||
|
||||
|
@ -1566,7 +1566,7 @@ class DatabasePool:
|
|||
"""
|
||||
keyvalues = keyvalues or {}
|
||||
|
||||
results = [] # type: List[Dict[str, Any]]
|
||||
results: List[Dict[str, Any]] = []
|
||||
|
||||
if not iterable:
|
||||
return results
|
||||
|
@ -1978,7 +1978,7 @@ class DatabasePool:
|
|||
raise ValueError("order_direction must be one of 'ASC' or 'DESC'.")
|
||||
|
||||
where_clause = "WHERE " if filters or keyvalues or exclude_keyvalues else ""
|
||||
arg_list = [] # type: List[Any]
|
||||
arg_list: List[Any] = []
|
||||
if filters:
|
||||
where_clause += " AND ".join("%s LIKE ?" % (k,) for k in filters)
|
||||
arg_list += list(filters.values())
|
||||
|
|
|
@ -48,9 +48,7 @@ def _make_exclusive_regex(
|
|||
]
|
||||
if exclusive_user_regexes:
|
||||
exclusive_user_regex = "|".join("(" + r + ")" for r in exclusive_user_regexes)
|
||||
exclusive_user_pattern = re.compile(
|
||||
exclusive_user_regex
|
||||
) # type: Optional[Pattern]
|
||||
exclusive_user_pattern: Optional[Pattern] = re.compile(exclusive_user_regex)
|
||||
else:
|
||||
# We handle this case specially otherwise the constructed regex
|
||||
# will always match
|
||||
|
|
|
@ -203,9 +203,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
|
|||
"delete_messages_for_device", delete_messages_for_device_txn
|
||||
)
|
||||
|
||||
log_kv(
|
||||
{"message": "deleted {} messages for device".format(count), "count": count}
|
||||
)
|
||||
log_kv({"message": f"deleted {count} messages for device", "count": count})
|
||||
|
||||
# Update the cache, ensuring that we only ever increase the value
|
||||
last_deleted_stream_id = self._last_device_delete_cache.get(
|
||||
|
|
|
@ -247,7 +247,7 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore):
|
|||
|
||||
txn.execute(sql, query_params)
|
||||
|
||||
result = {} # type: Dict[str, Dict[str, Optional[DeviceKeyLookupResult]]]
|
||||
result: Dict[str, Dict[str, Optional[DeviceKeyLookupResult]]] = {}
|
||||
for (user_id, device_id, display_name, key_json) in txn:
|
||||
if include_deleted_devices:
|
||||
deleted_devices.remove((user_id, device_id))
|
||||
|
|
|
@ -62,9 +62,9 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
|
|||
)
|
||||
|
||||
# Cache of event ID to list of auth event IDs and their depths.
|
||||
self._event_auth_cache = LruCache(
|
||||
self._event_auth_cache: LruCache[str, List[Tuple[str, int]]] = LruCache(
|
||||
500000, "_event_auth_cache", size_callback=len
|
||||
) # type: LruCache[str, List[Tuple[str, int]]]
|
||||
)
|
||||
|
||||
self._clock.looping_call(self._get_stats_for_federation_staging, 30 * 1000)
|
||||
|
||||
|
@ -137,10 +137,10 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
|
|||
initial_events = set(event_ids)
|
||||
|
||||
# All the events that we've found that are reachable from the events.
|
||||
seen_events = set() # type: Set[str]
|
||||
seen_events: Set[str] = set()
|
||||
|
||||
# A map from chain ID to max sequence number of the given events.
|
||||
event_chains = {} # type: Dict[int, int]
|
||||
event_chains: Dict[int, int] = {}
|
||||
|
||||
sql = """
|
||||
SELECT event_id, chain_id, sequence_number
|
||||
|
@ -182,7 +182,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
|
|||
"""
|
||||
|
||||
# A map from chain ID to max sequence number *reachable* from any event ID.
|
||||
chains = {} # type: Dict[int, int]
|
||||
chains: Dict[int, int] = {}
|
||||
|
||||
# Add all linked chains reachable from initial set of chains.
|
||||
for batch in batch_iter(event_chains, 1000):
|
||||
|
@ -353,14 +353,14 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
|
|||
initial_events = set(state_sets[0]).union(*state_sets[1:])
|
||||
|
||||
# Map from event_id -> (chain ID, seq no)
|
||||
chain_info = {} # type: Dict[str, Tuple[int, int]]
|
||||
chain_info: Dict[str, Tuple[int, int]] = {}
|
||||
|
||||
# Map from chain ID -> seq no -> event Id
|
||||
chain_to_event = {} # type: Dict[int, Dict[int, str]]
|
||||
chain_to_event: Dict[int, Dict[int, str]] = {}
|
||||
|
||||
# All the chains that we've found that are reachable from the state
|
||||
# sets.
|
||||
seen_chains = set() # type: Set[int]
|
||||
seen_chains: Set[int] = set()
|
||||
|
||||
sql = """
|
||||
SELECT event_id, chain_id, sequence_number
|
||||
|
@ -392,9 +392,9 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
|
|||
|
||||
# Corresponds to `state_sets`, except as a map from chain ID to max
|
||||
# sequence number reachable from the state set.
|
||||
set_to_chain = [] # type: List[Dict[int, int]]
|
||||
set_to_chain: List[Dict[int, int]] = []
|
||||
for state_set in state_sets:
|
||||
chains = {} # type: Dict[int, int]
|
||||
chains: Dict[int, int] = {}
|
||||
set_to_chain.append(chains)
|
||||
|
||||
for event_id in state_set:
|
||||
|
@ -446,7 +446,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
|
|||
|
||||
# Mapping from chain ID to the range of sequence numbers that should be
|
||||
# pulled from the database.
|
||||
chain_to_gap = {} # type: Dict[int, Tuple[int, int]]
|
||||
chain_to_gap: Dict[int, Tuple[int, int]] = {}
|
||||
|
||||
for chain_id in seen_chains:
|
||||
min_seq_no = min(chains.get(chain_id, 0) for chains in set_to_chain)
|
||||
|
@ -555,7 +555,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
|
|||
}
|
||||
|
||||
# The sorted list of events whose auth chains we should walk.
|
||||
search = [] # type: List[Tuple[int, str]]
|
||||
search: List[Tuple[int, str]] = []
|
||||
|
||||
# We need to get the depth of the initial events for sorting purposes.
|
||||
sql = """
|
||||
|
@ -578,7 +578,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
|
|||
search.sort()
|
||||
|
||||
# Map from event to its auth events
|
||||
event_to_auth_events = {} # type: Dict[str, Set[str]]
|
||||
event_to_auth_events: Dict[str, Set[str]] = {}
|
||||
|
||||
base_sql = """
|
||||
SELECT a.event_id, auth_id, depth
|
||||
|
@@ -1230,7 +1230,9 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
                 "SELECT coalesce(min(received_ts), 0) FROM federation_inbound_events_staging"
             )

-            (age,) = txn.fetchone()
+            (received_ts,) = txn.fetchone()
+
+            age = self._clock.time_msec() - received_ts

             return count, age

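The previous code reported the raw `received_ts` timestamp as the "age" of the oldest staged federation event; the fix subtracts it from the current time so the value is a duration. A trivial sketch of the corrected calculation (hypothetical helper, not Synapse's API):

def staging_age_ms(oldest_received_ts_ms: int, now_ms: int) -> int:
    # Age of the oldest row in the staging table, in milliseconds.
    return now_ms - oldest_received_ts_ms


assert staging_age_ms(1_000, 5_000) == 4_000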
@ -759,7 +759,7 @@ class EventPushActionsWorkerStore(SQLBaseStore):
|
|||
# object because we might not have the same amount of rows in each of them. To do
|
||||
# this, we use a dict indexed on the user ID and room ID to make it easier to
|
||||
# populate.
|
||||
summaries = {} # type: Dict[Tuple[str, str], _EventPushSummary]
|
||||
summaries: Dict[Tuple[str, str], _EventPushSummary] = {}
|
||||
for row in txn:
|
||||
summaries[(row[0], row[1])] = _EventPushSummary(
|
||||
unread_count=row[2],
|
||||
|
|
|
@ -109,10 +109,8 @@ class PersistEventsStore:
|
|||
|
||||
# Ideally we'd move these ID gens here, unfortunately some other ID
|
||||
# generators are chained off them so doing so is a bit of a PITA.
|
||||
self._backfill_id_gen = (
|
||||
self.store._backfill_id_gen
|
||||
) # type: MultiWriterIdGenerator
|
||||
self._stream_id_gen = self.store._stream_id_gen # type: MultiWriterIdGenerator
|
||||
self._backfill_id_gen: MultiWriterIdGenerator = self.store._backfill_id_gen
|
||||
self._stream_id_gen: MultiWriterIdGenerator = self.store._stream_id_gen
|
||||
|
||||
# This should only exist on instances that are configured to write
|
||||
assert (
|
||||
|
@ -221,7 +219,7 @@ class PersistEventsStore:
|
|||
Returns:
|
||||
Filtered event ids
|
||||
"""
|
||||
results = [] # type: List[str]
|
||||
results: List[str] = []
|
||||
|
||||
def _get_events_which_are_prevs_txn(txn, batch):
|
||||
sql = """
|
||||
|
@ -508,7 +506,7 @@ class PersistEventsStore:
|
|||
"""
|
||||
|
||||
# Map from event ID to chain ID/sequence number.
|
||||
chain_map = {} # type: Dict[str, Tuple[int, int]]
|
||||
chain_map: Dict[str, Tuple[int, int]] = {}
|
||||
|
||||
# Set of event IDs to calculate chain ID/seq numbers for.
|
||||
events_to_calc_chain_id_for = set(event_to_room_id)
|
||||
|
@ -817,8 +815,8 @@ class PersistEventsStore:
|
|||
# new chain if the sequence number has already been allocated.
|
||||
#
|
||||
|
||||
existing_chains = set() # type: Set[int]
|
||||
tree = [] # type: List[Tuple[str, Optional[str]]]
|
||||
existing_chains: Set[int] = set()
|
||||
tree: List[Tuple[str, Optional[str]]] = []
|
||||
|
||||
# We need to do this in a topologically sorted order as we want to
|
||||
# generate chain IDs/sequence numbers of an event's auth events before
|
||||
|
@ -848,7 +846,7 @@ class PersistEventsStore:
|
|||
)
|
||||
txn.execute(sql % (clause,), args)
|
||||
|
||||
chain_to_max_seq_no = {row[0]: row[1] for row in txn} # type: Dict[Any, int]
|
||||
chain_to_max_seq_no: Dict[Any, int] = {row[0]: row[1] for row in txn}
|
||||
|
||||
# Allocate the new events chain ID/sequence numbers.
|
||||
#
|
||||
|
@ -858,8 +856,8 @@ class PersistEventsStore:
|
|||
# number of new chain IDs in one call, replacing all temporary
|
||||
# objects with real allocated chain IDs.
|
||||
|
||||
unallocated_chain_ids = set() # type: Set[object]
|
||||
new_chain_tuples = {} # type: Dict[str, Tuple[Any, int]]
|
||||
unallocated_chain_ids: Set[object] = set()
|
||||
new_chain_tuples: Dict[str, Tuple[Any, int]] = {}
|
||||
for event_id, auth_event_id in tree:
|
||||
# If we reference an auth_event_id we fetch the allocated chain ID,
|
||||
# either from the existing `chain_map` or the newly generated
|
||||
|
@ -870,7 +868,7 @@ class PersistEventsStore:
|
|||
if not existing_chain_id:
|
||||
existing_chain_id = chain_map[auth_event_id]
|
||||
|
||||
new_chain_tuple = None # type: Optional[Tuple[Any, int]]
|
||||
new_chain_tuple: Optional[Tuple[Any, int]] = None
|
||||
if existing_chain_id:
|
||||
# We found a chain ID/sequence number candidate, check its
|
||||
# not already taken.
|
||||
|
@ -897,9 +895,9 @@ class PersistEventsStore:
|
|||
)
|
||||
|
||||
# Map from potentially temporary chain ID to real chain ID
|
||||
chain_id_to_allocated_map = dict(
|
||||
chain_id_to_allocated_map: Dict[Any, int] = dict(
|
||||
zip(unallocated_chain_ids, newly_allocated_chain_ids)
|
||||
) # type: Dict[Any, int]
|
||||
)
|
||||
chain_id_to_allocated_map.update((c, c) for c in existing_chains)
|
||||
|
||||
return {
|
||||
|
@ -1175,9 +1173,9 @@ class PersistEventsStore:
|
|||
Returns:
|
||||
list[(EventBase, EventContext)]: filtered list
|
||||
"""
|
||||
new_events_and_contexts = (
|
||||
OrderedDict()
|
||||
) # type: OrderedDict[str, Tuple[EventBase, EventContext]]
|
||||
new_events_and_contexts: OrderedDict[
|
||||
str, Tuple[EventBase, EventContext]
|
||||
] = OrderedDict()
|
||||
for event, context in events_and_contexts:
|
||||
prev_event_context = new_events_and_contexts.get(event.event_id)
|
||||
if prev_event_context:
|
||||
|
@ -1205,7 +1203,7 @@ class PersistEventsStore:
|
|||
we are persisting
|
||||
backfilled (bool): True if the events were backfilled
|
||||
"""
|
||||
depth_updates = {} # type: Dict[str, int]
|
||||
depth_updates: Dict[str, int] = {}
|
||||
for event, context in events_and_contexts:
|
||||
# Remove any existing cache entries for the event_ids
|
||||
txn.call_after(self.store._invalidate_get_event_cache, event.event_id)
|
||||
|
@@ -1580,11 +1578,11 @@ class PersistEventsStore:
         # invalidate the cache for the redacted event
         txn.call_after(self.store._invalidate_get_event_cache, event.redacts)

-        self.db_pool.simple_insert_txn(
+        self.db_pool.simple_upsert_txn(
             txn,
             table="redactions",
+            keyvalues={"event_id": event.event_id},
             values={
-                "event_id": event.event_id,
                 "redacts": event.redacts,
                 "received_ts": self._clock.time_msec(),
             },
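Switching from `simple_insert_txn` to `simple_upsert_txn` presumably lets a redaction that is persisted more than once (for example on retry or backfill) update the existing row keyed on `event_id` instead of erroring. On PostgreSQL a generic upsert of this shape boils down to something like the following; this is an illustration of the general pattern, not the exact SQL Synapse emits:

UPSERT_REDACTION_SQL = """
    INSERT INTO redactions (event_id, redacts, received_ts)
    VALUES (?, ?, ?)
    ON CONFLICT (event_id)
    DO UPDATE SET redacts = EXCLUDED.redacts, received_ts = EXCLUDED.received_ts
"""


def store_redaction(txn, event_id: str, redacts: str, now_ms: int) -> None:
    # keyvalues become the conflict target; values are the columns refreshed
    # on conflict. (Placeholder style varies by database driver.)
    txn.execute(UPSERT_REDACTION_SQL, (event_id, redacts, now_ms))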
@ -1885,7 +1883,7 @@ class PersistEventsStore:
|
|||
),
|
||||
)
|
||||
|
||||
room_to_event_ids = {} # type: Dict[str, List[str]]
|
||||
room_to_event_ids: Dict[str, List[str]] = {}
|
||||
for e, _ in events_and_contexts:
|
||||
room_to_event_ids.setdefault(e.room_id, []).append(e.event_id)
|
||||
|
||||
|
@ -2012,10 +2010,6 @@ class PersistEventsStore:
|
|||
|
||||
Forward extremities are handled when we first start persisting the events.
|
||||
"""
|
||||
events_by_room = {} # type: Dict[str, List[EventBase]]
|
||||
for ev in events:
|
||||
events_by_room.setdefault(ev.room_id, []).append(ev)
|
||||
|
||||
query = (
|
||||
"INSERT INTO event_backward_extremities (event_id, room_id)"
|
||||
" SELECT ?, ? WHERE NOT EXISTS ("
|
||||
|
|
|
@ -960,9 +960,9 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
|
|||
event_to_types = {row[0]: (row[1], row[2]) for row in rows}
|
||||
|
||||
# Calculate the new last position we've processed up to.
|
||||
new_last_depth = rows[-1][3] if rows else last_depth # type: int
|
||||
new_last_stream = rows[-1][4] if rows else last_stream # type: int
|
||||
new_last_room_id = rows[-1][5] if rows else "" # type: str
|
||||
new_last_depth: int = rows[-1][3] if rows else last_depth
|
||||
new_last_stream: int = rows[-1][4] if rows else last_stream
|
||||
new_last_room_id: str = rows[-1][5] if rows else ""
|
||||
|
||||
# Map from room_id to last depth/stream_ordering processed for the room,
|
||||
# excluding the last room (which we're likely still processing). We also
|
||||
|
@ -989,7 +989,7 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
|
|||
retcols=("event_id", "auth_id"),
|
||||
)
|
||||
|
||||
event_to_auth_chain = {} # type: Dict[str, List[str]]
|
||||
event_to_auth_chain: Dict[str, List[str]] = {}
|
||||
for row in auth_events:
|
||||
event_to_auth_chain.setdefault(row["event_id"], []).append(row["auth_id"])
|
||||
|
||||
|
|
|
@ -1365,10 +1365,10 @@ class EventsWorkerStore(SQLBaseStore):
|
|||
# we need to make sure that, for every stream id in the results, we get *all*
|
||||
# the rows with that stream id.
|
||||
|
||||
rows = await self.db_pool.runInteraction(
|
||||
rows: List[Tuple] = await self.db_pool.runInteraction(
|
||||
"get_all_updated_current_state_deltas",
|
||||
get_all_updated_current_state_deltas_txn,
|
||||
) # type: List[Tuple]
|
||||
)
|
||||
|
||||
# if we've got fewer rows than the limit, we're good
|
||||
if len(rows) < target_row_count:
|
||||
|
@ -1469,7 +1469,7 @@ class EventsWorkerStore(SQLBaseStore):
|
|||
"""
|
||||
|
||||
mapping = {}
|
||||
txn_id_to_event = {} # type: Dict[Tuple[str, int, str], str]
|
||||
txn_id_to_event: Dict[Tuple[str, int, str], str] = {}
|
||||
|
||||
for event in events:
|
||||
token_id = getattr(event.internal_metadata, "token_id", None)
|
||||
|
|
|
@@ -27,8 +27,11 @@ from synapse.util import json_encoder
 _DEFAULT_CATEGORY_ID = ""
 _DEFAULT_ROLE_ID = ""


 # A room in a group.
-_RoomInGroup = TypedDict("_RoomInGroup", {"room_id": str, "is_public": bool})
+class _RoomInGroup(TypedDict):
+    room_id: str
+    is_public: bool
+

 class GroupServerWorkerStore(SQLBaseStore):
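The functional and class-based `TypedDict` forms are equivalent to a type checker; the class form is generally preferred when the keys are valid identifiers, since it reads like a normal class and supports docstrings and inheritance. A quick sketch of using the class form (the room ID here is made up):

from typing import TypedDict  # on Python < 3.8 this comes from typing_extensions


class _RoomInGroup(TypedDict):
    room_id: str
    is_public: bool


room: _RoomInGroup = {"room_id": "!abc:example.org", "is_public": True}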
@ -92,6 +95,7 @@ class GroupServerWorkerStore(SQLBaseStore):
|
|||
"is_public": False # Whether this is a public room or not
|
||||
}
|
||||
"""
|
||||
|
||||
# TODO: Pagination
|
||||
|
||||
def _get_rooms_in_group_txn(txn):
|
||||
|
|
|
@@ -78,7 +78,11 @@ class LockStore(SQLBaseStore):
         """Called when the server is shutting down"""
         logger.info("Dropping held locks due to shutdown")

-        for (lock_name, lock_key), token in self._live_tokens.items():
+        # We need to take a copy of the tokens dict as dropping the locks will
+        # cause the dictionary to change.
+        tokens = dict(self._live_tokens)
+
+        for (lock_name, lock_key), token in tokens.items():
             await self._drop_lock(lock_name, lock_key, token)

         logger.info("Dropped locks due to shutdown")

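The fix takes a snapshot of `self._live_tokens` before iterating, because `_drop_lock` removes entries from that same dict, and mutating a dict while iterating over it raises `RuntimeError: dictionary changed size during iteration`. A self-contained sketch of the failure and the fix:

live_tokens = {("name", "key1"): "tok1", ("name", "key2"): "tok2"}


def drop(lock_name: str, lock_key: str) -> None:
    # Simulates _drop_lock: removing the entry mutates the dict.
    live_tokens.pop((lock_name, lock_key), None)


# for (name, key), _tok in live_tokens.items():  # RuntimeError mid-iteration
#     drop(name, key)

for (name, key), _tok in dict(live_tokens).items():  # iterate over a copy
    drop(name, key)

assert not live_tokens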
@ -316,11 +316,140 @@ class ServerMetricsStore(EventPushActionsWorkerStore, SQLBaseStore):
|
|||
|
||||
return await self.db_pool.runInteraction("count_r30_users", _count_r30_users)
|
||||
|
||||
async def count_r30v2_users(self) -> Dict[str, int]:
|
||||
"""
|
||||
Counts the number of 30 day retained users, defined as users that:
|
||||
- Appear more than once in the past 60 days
|
||||
- Have more than 30 days between the most and least recent appearances that
|
||||
occurred in the past 60 days.
|
||||
|
||||
(This is the second version of this metric, hence R30'v2')
|
||||
|
||||
Returns:
|
||||
A mapping from client type to the number of 30-day retained users for that client.
|
||||
|
||||
The dict keys are:
|
||||
- "all" (a combined number of users across any and all clients)
|
||||
- "android" (Element Android)
|
||||
- "ios" (Element iOS)
|
||||
- "electron" (Element Desktop)
|
||||
- "web" (any web application -- it's not possible to distinguish Element Web here)
|
||||
"""
|
||||
|
||||
def _count_r30v2_users(txn):
|
||||
thirty_days_in_secs = 86400 * 30
|
||||
now = int(self._clock.time())
|
||||
sixty_days_ago_in_secs = now - 2 * thirty_days_in_secs
|
||||
one_day_from_now_in_secs = now + 86400
|
||||
|
||||
# This is the 'per-platform' count.
|
||||
sql = """
|
||||
SELECT
|
||||
client_type,
|
||||
count(client_type)
|
||||
FROM
|
||||
(
|
||||
SELECT
|
||||
user_id,
|
||||
CASE
|
||||
WHEN
|
||||
LOWER(user_agent) LIKE '%%riot%%' OR
|
||||
LOWER(user_agent) LIKE '%%element%%'
|
||||
THEN CASE
|
||||
WHEN
|
||||
LOWER(user_agent) LIKE '%%electron%%'
|
||||
THEN 'electron'
|
||||
WHEN
|
||||
LOWER(user_agent) LIKE '%%android%%'
|
||||
THEN 'android'
|
||||
WHEN
|
||||
LOWER(user_agent) LIKE '%%ios%%'
|
||||
THEN 'ios'
|
||||
ELSE 'unknown'
|
||||
END
|
||||
WHEN
|
||||
LOWER(user_agent) LIKE '%%mozilla%%' OR
|
||||
LOWER(user_agent) LIKE '%%gecko%%'
|
||||
THEN 'web'
|
||||
ELSE 'unknown'
|
||||
END as client_type
|
||||
FROM
|
||||
user_daily_visits
|
||||
WHERE
|
||||
timestamp > ?
|
||||
AND
|
||||
timestamp < ?
|
||||
GROUP BY
|
||||
user_id,
|
||||
client_type
|
||||
HAVING
|
||||
max(timestamp) - min(timestamp) > ?
|
||||
) AS temp
|
||||
GROUP BY
|
||||
client_type
|
||||
;
|
||||
"""
|
||||
|
||||
# We initialise all the client types to zero, so we get an explicit
|
||||
# zero if they don't appear in the query results
|
||||
results = {"ios": 0, "android": 0, "web": 0, "electron": 0}
|
||||
txn.execute(
|
||||
sql,
|
||||
(
|
||||
sixty_days_ago_in_secs * 1000,
|
||||
one_day_from_now_in_secs * 1000,
|
||||
thirty_days_in_secs * 1000,
|
||||
),
|
||||
)
|
||||
|
||||
for row in txn:
|
||||
if row[0] == "unknown":
|
||||
continue
|
||||
results[row[0]] = row[1]
|
||||
|
||||
# This is the 'all users' count.
|
||||
sql = """
|
||||
SELECT COUNT(*) FROM (
|
||||
SELECT
|
||||
1
|
||||
FROM
|
||||
user_daily_visits
|
||||
WHERE
|
||||
timestamp > ?
|
||||
AND
|
||||
timestamp < ?
|
||||
GROUP BY
|
||||
user_id
|
||||
HAVING
|
||||
max(timestamp) - min(timestamp) > ?
|
||||
) AS r30_users
|
||||
"""
|
||||
|
||||
txn.execute(
|
||||
sql,
|
||||
(
|
||||
sixty_days_ago_in_secs * 1000,
|
||||
one_day_from_now_in_secs * 1000,
|
||||
thirty_days_in_secs * 1000,
|
||||
),
|
||||
)
|
||||
row = txn.fetchone()
|
||||
if row is None:
|
||||
results["all"] = 0
|
||||
else:
|
||||
results["all"] = row[0]
|
||||
|
||||
return results
|
||||
|
||||
return await self.db_pool.runInteraction(
|
||||
"count_r30v2_users", _count_r30v2_users
|
||||
)
|
||||
|
||||
def _get_start_of_day(self):
|
||||
"""
|
||||
Returns millisecond unixtime for start of UTC day.
|
||||
"""
|
||||
now = time.gmtime()
|
||||
now = time.gmtime(self._clock.time())
|
||||
today_start = calendar.timegm((now.tm_year, now.tm_mon, now.tm_mday, 0, 0, 0))
|
||||
return today_start * 1000
|
||||
|
||||
|
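In `_get_start_of_day`, `time.gmtime()` previously ignored the homeserver clock; passing `self._clock.time()` keeps the calculation on the same (mockable) clock as the rest of the metrics code, presumably so tests with a fake clock behave consistently. The computation itself, pulled out as a standalone sketch:

import calendar
import time


def start_of_utc_day_ms(now_secs: float) -> int:
    # Millisecond unixtime for 00:00 UTC of the day containing now_secs.
    tm = time.gmtime(now_secs)
    return calendar.timegm((tm.tm_year, tm.tm_mon, tm.tm_mday, 0, 0, 0)) * 1000


assert start_of_utc_day_ms(0) == 0
assert start_of_utc_day_ms(86_461) == 86_400_000  # one day + 61s -> start of day 2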
@@ -352,7 +481,7 @@ class ServerMetricsStore(EventPushActionsWorkerStore, SQLBaseStore):
                 ) udv
                 ON u.user_id = udv.user_id AND u.device_id=udv.device_id
                 INNER JOIN users ON users.name=u.user_id
-                WHERE last_seen > ? AND last_seen <= ?
+                WHERE ? <= last_seen AND last_seen < ?
                 AND udv.timestamp IS NULL AND users.is_guest=0
                 AND users.appservice_id IS NULL
                 GROUP BY u.user_id, u.device_id

@ -115,7 +115,7 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
|
|||
logger.info("[purge] looking for events to delete")
|
||||
|
||||
should_delete_expr = "state_key IS NULL"
|
||||
should_delete_params = () # type: Tuple[Any, ...]
|
||||
should_delete_params: Tuple[Any, ...] = ()
|
||||
if not delete_local_events:
|
||||
should_delete_expr += " AND event_id NOT LIKE ?"
|
||||
|
||||
|
@ -215,6 +215,7 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
|
|||
"event_relations",
|
||||
"event_search",
|
||||
"rejections",
|
||||
"redactions",
|
||||
):
|
||||
logger.info("[purge] removing events from %s", table)
|
||||
|
||||
|
@ -392,7 +393,6 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
|
|||
"room_memberships",
|
||||
"room_stats_state",
|
||||
"room_stats_current",
|
||||
"room_stats_historical",
|
||||
"room_stats_earliest_token",
|
||||
"rooms",
|
||||
"stream_ordering_to_exterm",
|
||||
|
|
|
@ -79,9 +79,9 @@ class PushRulesWorkerStore(
|
|||
super().__init__(database, db_conn, hs)
|
||||
|
||||
if hs.config.worker.worker_app is None:
|
||||
self._push_rules_stream_id_gen = StreamIdGenerator(
|
||||
db_conn, "push_rules_stream", "stream_id"
|
||||
) # type: Union[StreamIdGenerator, SlavedIdTracker]
|
||||
self._push_rules_stream_id_gen: Union[
|
||||
StreamIdGenerator, SlavedIdTracker
|
||||
] = StreamIdGenerator(db_conn, "push_rules_stream", "stream_id")
|
||||
else:
|
||||
self._push_rules_stream_id_gen = SlavedIdTracker(
|
||||
db_conn, "push_rules_stream", "stream_id"
|
||||
|
|
|
@ -1744,7 +1744,7 @@ class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore):
|
|||
|
||||
items = keyvalues.items()
|
||||
where_clause = " AND ".join(k + " = ?" for k, _ in items)
|
||||
values = [v for _, v in items] # type: List[Union[str, int]]
|
||||
values: List[Union[str, int]] = [v for _, v in items]
|
||||
# Conveniently, refresh_tokens and access_tokens both use the user_id and device_id fields. Only caveat
|
||||
# is the `except_token_id` param that is tricky to get right, so for now we're just using the same where
|
||||
# clause and values before we handle that. This seems to be only used in the "set password" handler.
|
||||
|
|
|
@ -25,6 +25,7 @@ from synapse.api.room_versions import RoomVersion, RoomVersions
|
|||
from synapse.storage._base import SQLBaseStore, db_to_json
|
||||
from synapse.storage.database import DatabasePool, LoggingTransaction
|
||||
from synapse.storage.databases.main.search import SearchStore
|
||||
from synapse.storage.types import Cursor
|
||||
from synapse.types import JsonDict, ThirdPartyInstanceID
|
||||
from synapse.util import json_encoder
|
||||
from synapse.util.caches.descriptors import cached
|
||||
|
@ -1022,10 +1023,22 @@ class RoomWorkerStore(SQLBaseStore):
|
|||
)
|
||||
|
||||
|
||||
class RoomBackgroundUpdateStore(SQLBaseStore):
|
||||
class _BackgroundUpdates:
|
||||
REMOVE_TOMESTONED_ROOMS_BG_UPDATE = "remove_tombstoned_rooms_from_directory"
|
||||
ADD_ROOMS_ROOM_VERSION_COLUMN = "add_rooms_room_version_column"
|
||||
POPULATE_ROOM_DEPTH_MIN_DEPTH2 = "populate_room_depth_min_depth2"
|
||||
REPLACE_ROOM_DEPTH_MIN_DEPTH = "replace_room_depth_min_depth"
|
||||
|
||||
|
||||
_REPLACE_ROOM_DEPTH_SQL_COMMANDS = (
|
||||
"DROP TRIGGER populate_min_depth2_trigger ON room_depth",
|
||||
"DROP FUNCTION populate_min_depth2()",
|
||||
"ALTER TABLE room_depth DROP COLUMN min_depth",
|
||||
"ALTER TABLE room_depth RENAME COLUMN min_depth2 TO min_depth",
|
||||
)
|
||||
|
||||
|
||||
class RoomBackgroundUpdateStore(SQLBaseStore):
|
||||
def __init__(self, database: DatabasePool, db_conn, hs):
|
||||
super().__init__(database, db_conn, hs)
|
||||
|
||||
|
@ -1037,15 +1050,25 @@ class RoomBackgroundUpdateStore(SQLBaseStore):
|
|||
)
|
||||
|
||||
self.db_pool.updates.register_background_update_handler(
|
||||
self.REMOVE_TOMESTONED_ROOMS_BG_UPDATE,
|
||||
_BackgroundUpdates.REMOVE_TOMESTONED_ROOMS_BG_UPDATE,
|
||||
self._remove_tombstoned_rooms_from_directory,
|
||||
)
|
||||
|
||||
self.db_pool.updates.register_background_update_handler(
|
||||
self.ADD_ROOMS_ROOM_VERSION_COLUMN,
|
||||
_BackgroundUpdates.ADD_ROOMS_ROOM_VERSION_COLUMN,
|
||||
self._background_add_rooms_room_version_column,
|
||||
)
|
||||
|
||||
# BG updates to change the type of room_depth.min_depth
|
||||
self.db_pool.updates.register_background_update_handler(
|
||||
_BackgroundUpdates.POPULATE_ROOM_DEPTH_MIN_DEPTH2,
|
||||
self._background_populate_room_depth_min_depth2,
|
||||
)
|
||||
self.db_pool.updates.register_background_update_handler(
|
||||
_BackgroundUpdates.REPLACE_ROOM_DEPTH_MIN_DEPTH,
|
||||
self._background_replace_room_depth_min_depth,
|
||||
)
|
||||
|
||||
async def _background_insert_retention(self, progress, batch_size):
|
||||
"""Retrieves a list of all rooms within a range and inserts an entry for each of
|
||||
them into the room_retention table.
|
||||
|
@ -1164,7 +1187,9 @@ class RoomBackgroundUpdateStore(SQLBaseStore):
|
|||
new_last_room_id = room_id
|
||||
|
||||
self.db_pool.updates._background_update_progress_txn(
|
||||
txn, self.ADD_ROOMS_ROOM_VERSION_COLUMN, {"room_id": new_last_room_id}
|
||||
txn,
|
||||
_BackgroundUpdates.ADD_ROOMS_ROOM_VERSION_COLUMN,
|
||||
{"room_id": new_last_room_id},
|
||||
)
|
||||
|
||||
return False
|
||||
|
@ -1176,7 +1201,7 @@ class RoomBackgroundUpdateStore(SQLBaseStore):
|
|||
|
||||
if end:
|
||||
await self.db_pool.updates._end_background_update(
|
||||
self.ADD_ROOMS_ROOM_VERSION_COLUMN
|
||||
_BackgroundUpdates.ADD_ROOMS_ROOM_VERSION_COLUMN
|
||||
)
|
||||
|
||||
return batch_size
|
||||
|
@ -1215,7 +1240,7 @@ class RoomBackgroundUpdateStore(SQLBaseStore):
|
|||
|
||||
if not rooms:
|
||||
await self.db_pool.updates._end_background_update(
|
||||
self.REMOVE_TOMESTONED_ROOMS_BG_UPDATE
|
||||
_BackgroundUpdates.REMOVE_TOMESTONED_ROOMS_BG_UPDATE
|
||||
)
|
||||
return 0
|
||||
|
||||
|
@ -1224,7 +1249,7 @@ class RoomBackgroundUpdateStore(SQLBaseStore):
|
|||
await self.set_room_is_public(room_id, False)
|
||||
|
||||
await self.db_pool.updates._background_update_progress(
|
||||
self.REMOVE_TOMESTONED_ROOMS_BG_UPDATE, {"room_id": rooms[-1]}
|
||||
_BackgroundUpdates.REMOVE_TOMESTONED_ROOMS_BG_UPDATE, {"room_id": rooms[-1]}
|
||||
)
|
||||
|
||||
return len(rooms)
|
||||
|
@ -1268,6 +1293,71 @@ class RoomBackgroundUpdateStore(SQLBaseStore):
|
|||
|
||||
return max_ordering is None
|
||||
|
||||
async def _background_populate_room_depth_min_depth2(
|
||||
self, progress: JsonDict, batch_size: int
|
||||
) -> int:
|
||||
"""Populate room_depth.min_depth2
|
||||
|
||||
This is to deal with the fact that min_depth was initially created as a
|
||||
32-bit integer field.
|
||||
"""
|
||||
|
||||
def process(txn: Cursor) -> int:
|
||||
last_room = progress.get("last_room", "")
|
||||
txn.execute(
|
||||
"""
|
||||
UPDATE room_depth SET min_depth2=min_depth
|
||||
WHERE room_id IN (
|
||||
SELECT room_id FROM room_depth WHERE room_id > ?
|
||||
ORDER BY room_id LIMIT ?
|
||||
)
|
||||
RETURNING room_id;
|
||||
""",
|
||||
(last_room, batch_size),
|
||||
)
|
||||
row_count = txn.rowcount
|
||||
if row_count == 0:
|
||||
return 0
|
||||
last_room = max(row[0] for row in txn)
|
||||
logger.info("populated room_depth up to %s", last_room)
|
||||
|
||||
self.db_pool.updates._background_update_progress_txn(
|
||||
txn,
|
||||
_BackgroundUpdates.POPULATE_ROOM_DEPTH_MIN_DEPTH2,
|
||||
{"last_room": last_room},
|
||||
)
|
||||
return row_count
|
||||
|
||||
result = await self.db_pool.runInteraction(
|
||||
"_background_populate_min_depth2", process
|
||||
)
|
||||
|
||||
if result != 0:
|
||||
return result
|
||||
|
||||
await self.db_pool.updates._end_background_update(
|
||||
_BackgroundUpdates.POPULATE_ROOM_DEPTH_MIN_DEPTH2
|
||||
)
|
||||
return 0
|
||||
|
||||
async def _background_replace_room_depth_min_depth(
|
||||
self, progress: JsonDict, batch_size: int
|
||||
) -> int:
|
||||
"""Drop the old 'min_depth' column and rename 'min_depth2' into its place."""
|
||||
|
||||
def process(txn: Cursor) -> None:
|
||||
for sql in _REPLACE_ROOM_DEPTH_SQL_COMMANDS:
|
||||
logger.info("completing room_depth migration: %s", sql)
|
||||
txn.execute(sql)
|
||||
|
||||
await self.db_pool.runInteraction("_background_replace_room_depth", process)
|
||||
|
||||
await self.db_pool.updates._end_background_update(
|
||||
_BackgroundUpdates.REPLACE_ROOM_DEPTH_MIN_DEPTH,
|
||||
)
|
||||
|
||||
return 0
|
||||
|
||||
|
||||
class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
|
||||
def __init__(self, database: DatabasePool, db_conn, hs):
|
||||
|
|
|
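The two background updates registered above are the back half of a four-step, lock-friendly migration that changes `room_depth.min_depth` to a BIGINT on PostgreSQL; the front half lives in the `61/03recreate_min_depth.py` delta further down. Summarised as data, with the statements abridged from this diff:

ROOM_DEPTH_MIGRATION_STEPS = (
    # 1. Add the wide column alongside the old one.
    "ALTER TABLE room_depth ADD COLUMN min_depth2 BIGINT",
    # 2. A trigger keeps min_depth2 in step with new writes while the backfill runs.
    "CREATE TRIGGER populate_min_depth2_trigger BEFORE INSERT OR UPDATE ON room_depth ...",
    # 3. The 'populate_room_depth_min_depth2' background update copies old rows in batches.
    "UPDATE room_depth SET min_depth2 = min_depth WHERE room_id IN (...)",
    # 4. 'replace_room_depth_min_depth' drops the trigger and the old column, then renames.
    "ALTER TABLE room_depth RENAME COLUMN min_depth2 TO min_depth",
)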
@ -649,7 +649,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
|
|||
event_to_memberships = await self._get_joined_profiles_from_event_ids(
|
||||
missing_member_event_ids
|
||||
)
|
||||
users_in_room.update((row for row in event_to_memberships.values() if row))
|
||||
users_in_room.update(row for row in event_to_memberships.values() if row)
|
||||
|
||||
if event is not None and event.type == EventTypes.Member:
|
||||
if event.membership == Membership.JOIN:
|
||||
|
@ -703,13 +703,22 @@ class RoomMemberWorkerStore(EventsWorkerStore):
|
|||
|
||||
@cached(max_entries=10000)
|
||||
async def is_host_joined(self, room_id: str, host: str) -> bool:
|
||||
return await self._check_host_room_membership(room_id, host, Membership.JOIN)
|
||||
|
||||
@cached(max_entries=10000)
|
||||
async def is_host_invited(self, room_id: str, host: str) -> bool:
|
||||
return await self._check_host_room_membership(room_id, host, Membership.INVITE)
|
||||
|
||||
async def _check_host_room_membership(
|
||||
self, room_id: str, host: str, membership: str
|
||||
) -> bool:
|
||||
if "%" in host or "_" in host:
|
||||
raise Exception("Invalid host name")
|
||||
|
||||
sql = """
|
||||
SELECT state_key FROM current_state_events AS c
|
||||
INNER JOIN room_memberships AS m USING (event_id)
|
||||
WHERE m.membership = 'join'
|
||||
WHERE m.membership = ?
|
||||
AND type = 'm.room.member'
|
||||
AND c.room_id = ?
|
||||
AND state_key LIKE ?
|
||||
|
@ -722,7 +731,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
|
|||
like_clause = "%:" + host
|
||||
|
||||
rows = await self.db_pool.execute(
|
||||
"is_host_joined", None, sql, room_id, like_clause
|
||||
"is_host_joined", None, sql, membership, room_id, like_clause
|
||||
)
|
||||
|
||||
if not rows:
|
||||
|
|
|
@ -26,7 +26,6 @@ from synapse.api.constants import EventTypes, Membership
|
|||
from synapse.api.errors import StoreError
|
||||
from synapse.storage.database import DatabasePool
|
||||
from synapse.storage.databases.main.state_deltas import StateDeltasStore
|
||||
from synapse.storage.engines import PostgresEngine
|
||||
from synapse.types import JsonDict
|
||||
from synapse.util.caches.descriptors import cached
|
||||
|
||||
|
@ -49,14 +48,6 @@ ABSOLUTE_STATS_FIELDS = {
|
|||
"user": ("joined_rooms",),
|
||||
}
|
||||
|
||||
# these fields are per-timeslice and so should be reset to 0 upon a new slice
|
||||
# You can draw these stats on a histogram.
|
||||
# Example: number of events sent locally during a time slice
|
||||
PER_SLICE_FIELDS = {
|
||||
"room": ("total_events", "total_event_bytes"),
|
||||
"user": ("invites_sent", "rooms_created", "total_events", "total_event_bytes"),
|
||||
}
|
||||
|
||||
TYPE_TO_TABLE = {"room": ("room_stats", "room_id"), "user": ("user_stats", "user_id")}
|
||||
|
||||
# these are the tables (& ID columns) which contain our actual subjects
|
||||
|
@ -106,7 +97,6 @@ class StatsStore(StateDeltasStore):
|
|||
self.server_name = hs.hostname
|
||||
self.clock = self.hs.get_clock()
|
||||
self.stats_enabled = hs.config.stats_enabled
|
||||
self.stats_bucket_size = hs.config.stats_bucket_size
|
||||
|
||||
self.stats_delta_processing_lock = DeferredLock()
|
||||
|
||||
|
@ -122,22 +112,6 @@ class StatsStore(StateDeltasStore):
|
|||
self.db_pool.updates.register_noop_background_update("populate_stats_cleanup")
|
||||
self.db_pool.updates.register_noop_background_update("populate_stats_prepare")
|
||||
|
||||
def quantise_stats_time(self, ts):
|
||||
"""
|
||||
Quantises a timestamp to be a multiple of the bucket size.
|
||||
|
||||
Args:
|
||||
ts (int): the timestamp to quantise, in milliseconds since the Unix
|
||||
Epoch
|
||||
|
||||
Returns:
|
||||
int: a timestamp which
|
||||
- is divisible by the bucket size;
|
||||
- is no later than `ts`; and
|
||||
- is the largest such timestamp.
|
||||
"""
|
||||
return (ts // self.stats_bucket_size) * self.stats_bucket_size
|
||||
|
||||
async def _populate_stats_process_users(self, progress, batch_size):
|
||||
"""
|
||||
This is a background update which regenerates statistics for users.
|
||||
|
@ -288,56 +262,6 @@ class StatsStore(StateDeltasStore):
|
|||
desc="update_room_state",
|
||||
)
|
||||
|
||||
async def get_statistics_for_subject(
|
||||
self, stats_type: str, stats_id: str, start: str, size: int = 100
|
||||
) -> List[dict]:
|
||||
"""
|
||||
Get statistics for a given subject.
|
||||
|
||||
Args:
|
||||
stats_type: The type of subject
|
||||
stats_id: The ID of the subject (e.g. room_id or user_id)
|
||||
start: Pagination start. Number of entries, not timestamp.
|
||||
size: How many entries to return.
|
||||
|
||||
Returns:
|
||||
A list of dicts, where the dict has the keys of
|
||||
ABSOLUTE_STATS_FIELDS[stats_type], and "bucket_size" and "end_ts".
|
||||
"""
|
||||
return await self.db_pool.runInteraction(
|
||||
"get_statistics_for_subject",
|
||||
self._get_statistics_for_subject_txn,
|
||||
stats_type,
|
||||
stats_id,
|
||||
start,
|
||||
size,
|
||||
)
|
||||
|
||||
def _get_statistics_for_subject_txn(
|
||||
self, txn, stats_type, stats_id, start, size=100
|
||||
):
|
||||
"""
|
||||
Transaction-bound version of L{get_statistics_for_subject}.
|
||||
"""
|
||||
|
||||
table, id_col = TYPE_TO_TABLE[stats_type]
|
||||
selected_columns = list(
|
||||
ABSOLUTE_STATS_FIELDS[stats_type] + PER_SLICE_FIELDS[stats_type]
|
||||
)
|
||||
|
||||
slice_list = self.db_pool.simple_select_list_paginate_txn(
|
||||
txn,
|
||||
table + "_historical",
|
||||
"end_ts",
|
||||
start,
|
||||
size,
|
||||
retcols=selected_columns + ["bucket_size", "end_ts"],
|
||||
keyvalues={id_col: stats_id},
|
||||
order_direction="DESC",
|
||||
)
|
||||
|
||||
return slice_list
|
||||
|
||||
@cached()
|
||||
async def get_earliest_token_for_stats(
|
||||
self, stats_type: str, id: str
|
||||
|
@ -451,14 +375,10 @@ class StatsStore(StateDeltasStore):
|
|||
|
||||
table, id_col = TYPE_TO_TABLE[stats_type]
|
||||
|
||||
quantised_ts = self.quantise_stats_time(int(ts))
|
||||
end_ts = quantised_ts + self.stats_bucket_size
|
||||
|
||||
# Lets be paranoid and check that all the given field names are known
|
||||
abs_field_names = ABSOLUTE_STATS_FIELDS[stats_type]
|
||||
slice_field_names = PER_SLICE_FIELDS[stats_type]
|
||||
for field in chain(fields.keys(), absolute_field_overrides.keys()):
|
||||
if field not in abs_field_names and field not in slice_field_names:
|
||||
if field not in abs_field_names:
|
||||
# guard against potential SQL injection dodginess
|
||||
raise ValueError(
|
||||
"%s is not a recognised field"
|
||||
|
@ -491,20 +411,6 @@ class StatsStore(StateDeltasStore):
|
|||
additive_relatives=deltas_of_absolute_fields,
|
||||
)
|
||||
|
||||
per_slice_additive_relatives = {
|
||||
key: fields.get(key, 0) for key in slice_field_names
|
||||
}
|
||||
self._upsert_copy_from_table_with_additive_relatives_txn(
|
||||
txn=txn,
|
||||
into_table=table + "_historical",
|
||||
keyvalues={id_col: stats_id},
|
||||
extra_dst_insvalues={"bucket_size": self.stats_bucket_size},
|
||||
extra_dst_keyvalues={"end_ts": end_ts},
|
||||
additive_relatives=per_slice_additive_relatives,
|
||||
src_table=table + "_current",
|
||||
copy_columns=abs_field_names,
|
||||
)
|
||||
|
||||
def _upsert_with_additive_relatives_txn(
|
||||
self, txn, table, keyvalues, absolutes, additive_relatives
|
||||
):
|
||||
|
@@ -528,7 +434,7 @@ class StatsStore(StateDeltasStore):
         ]

         relative_updates = [
-            "%(field)s = EXCLUDED.%(field)s + %(table)s.%(field)s"
+            "%(field)s = EXCLUDED.%(field)s + COALESCE(%(table)s.%(field)s, 0)"
             % {"table": table, "field": field}
             for field in additive_relatives.keys()
         ]
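The `COALESCE` matters because in SQL `NULL + anything` is `NULL`: if an existing row holds `NULL` in one of the additive columns (for example a statistics column added after the row was created), the old expression would keep it `NULL` forever instead of starting to count from zero. An illustration of the two generated fragments (field and table names chosen to match the surrounding code, but hypothetical for this example):

table, field = "room_stats_current", "knocked_members"

old_fragment = "%(field)s = EXCLUDED.%(field)s + %(table)s.%(field)s" % {
    "table": table,
    "field": field,
}
new_fragment = (
    "%(field)s = EXCLUDED.%(field)s + COALESCE(%(table)s.%(field)s, 0)"
    % {"table": table, "field": field}
)

assert new_fragment == (
    "knocked_members = EXCLUDED.knocked_members"
    " + COALESCE(room_stats_current.knocked_members, 0)"
)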
@ -568,205 +474,13 @@ class StatsStore(StateDeltasStore):
|
|||
self.db_pool.simple_insert_txn(txn, table, merged_dict)
|
||||
else:
|
||||
for (key, val) in additive_relatives.items():
|
||||
current_row[key] += val
|
||||
if current_row[key] is None:
|
||||
current_row[key] = val
|
||||
else:
|
||||
current_row[key] += val
|
||||
current_row.update(absolutes)
|
||||
self.db_pool.simple_update_one_txn(txn, table, keyvalues, current_row)
|
||||
|
||||
def _upsert_copy_from_table_with_additive_relatives_txn(
|
||||
self,
|
||||
txn,
|
||||
into_table,
|
||||
keyvalues,
|
||||
extra_dst_keyvalues,
|
||||
extra_dst_insvalues,
|
||||
additive_relatives,
|
||||
src_table,
|
||||
copy_columns,
|
||||
):
|
||||
"""Updates the historic stats table with latest updates.
|
||||
|
||||
This involves copying "absolute" fields from the `_current` table, and
|
||||
adding relative fields to any existing values.
|
||||
|
||||
Args:
|
||||
txn: Transaction
|
||||
into_table (str): The destination table to UPSERT the row into
|
||||
keyvalues (dict[str, any]): Row-identifying key values
|
||||
extra_dst_keyvalues (dict[str, any]): Additional keyvalues
|
||||
for `into_table`.
|
||||
extra_dst_insvalues (dict[str, any]): Additional values to insert
|
||||
on new row creation for `into_table`.
|
||||
additive_relatives (dict[str, any]): Fields that will be added onto
|
||||
if existing row present. (Must be disjoint from copy_columns.)
|
||||
src_table (str): The source table to copy from
|
||||
copy_columns (iterable[str]): The list of columns to copy
|
||||
"""
|
||||
if self.database_engine.can_native_upsert:
|
||||
ins_columns = chain(
|
||||
keyvalues,
|
||||
copy_columns,
|
||||
additive_relatives,
|
||||
extra_dst_keyvalues,
|
||||
extra_dst_insvalues,
|
||||
)
|
||||
sel_exprs = chain(
|
||||
keyvalues,
|
||||
copy_columns,
|
||||
(
|
||||
"?"
|
||||
for _ in chain(
|
||||
additive_relatives, extra_dst_keyvalues, extra_dst_insvalues
|
||||
)
|
||||
),
|
||||
)
|
||||
keyvalues_where = ("%s = ?" % f for f in keyvalues)
|
||||
|
||||
sets_cc = ("%s = EXCLUDED.%s" % (f, f) for f in copy_columns)
|
||||
sets_ar = (
|
||||
"%s = EXCLUDED.%s + %s.%s" % (f, f, into_table, f)
|
||||
for f in additive_relatives
|
||||
)
|
||||
|
||||
sql = """
|
||||
INSERT INTO %(into_table)s (%(ins_columns)s)
|
||||
SELECT %(sel_exprs)s
|
||||
FROM %(src_table)s
|
||||
WHERE %(keyvalues_where)s
|
||||
ON CONFLICT (%(keyvalues)s)
|
||||
DO UPDATE SET %(sets)s
|
||||
""" % {
|
||||
"into_table": into_table,
|
||||
"ins_columns": ", ".join(ins_columns),
|
||||
"sel_exprs": ", ".join(sel_exprs),
|
||||
"keyvalues_where": " AND ".join(keyvalues_where),
|
||||
"src_table": src_table,
|
||||
"keyvalues": ", ".join(
|
||||
chain(keyvalues.keys(), extra_dst_keyvalues.keys())
|
||||
),
|
||||
"sets": ", ".join(chain(sets_cc, sets_ar)),
|
||||
}
|
||||
|
||||
qargs = list(
|
||||
chain(
|
||||
additive_relatives.values(),
|
||||
extra_dst_keyvalues.values(),
|
||||
extra_dst_insvalues.values(),
|
||||
keyvalues.values(),
|
||||
)
|
||||
)
|
||||
txn.execute(sql, qargs)
|
||||
else:
|
||||
self.database_engine.lock_table(txn, into_table)
|
||||
src_row = self.db_pool.simple_select_one_txn(
|
||||
txn, src_table, keyvalues, copy_columns
|
||||
)
|
||||
all_dest_keyvalues = {**keyvalues, **extra_dst_keyvalues}
|
||||
dest_current_row = self.db_pool.simple_select_one_txn(
|
||||
txn,
|
||||
into_table,
|
||||
keyvalues=all_dest_keyvalues,
|
||||
retcols=list(chain(additive_relatives.keys(), copy_columns)),
|
||||
allow_none=True,
|
||||
)
|
||||
|
||||
if dest_current_row is None:
|
||||
merged_dict = {
|
||||
**keyvalues,
|
||||
**extra_dst_keyvalues,
|
||||
**extra_dst_insvalues,
|
||||
**src_row,
|
||||
**additive_relatives,
|
||||
}
|
||||
self.db_pool.simple_insert_txn(txn, into_table, merged_dict)
|
||||
else:
|
||||
for (key, val) in additive_relatives.items():
|
||||
src_row[key] = dest_current_row[key] + val
|
||||
self.db_pool.simple_update_txn(
|
||||
txn, into_table, all_dest_keyvalues, src_row
|
||||
)
|
||||
|
||||
async def get_changes_room_total_events_and_bytes(
|
||||
self, min_pos: int, max_pos: int
|
||||
) -> Tuple[Dict[str, Dict[str, int]], Dict[str, Dict[str, int]]]:
|
||||
"""Fetches the counts of events in the given range of stream IDs.
|
||||
|
||||
Args:
|
||||
min_pos
|
||||
max_pos
|
||||
|
||||
Returns:
|
||||
Mapping of room ID to field changes.
|
||||
"""
|
||||
|
||||
return await self.db_pool.runInteraction(
|
||||
"stats_incremental_total_events_and_bytes",
|
||||
self.get_changes_room_total_events_and_bytes_txn,
|
||||
min_pos,
|
||||
max_pos,
|
||||
)
|
||||
|
||||
def get_changes_room_total_events_and_bytes_txn(
|
||||
self, txn, low_pos: int, high_pos: int
|
||||
) -> Tuple[Dict[str, Dict[str, int]], Dict[str, Dict[str, int]]]:
|
||||
"""Gets the total_events and total_event_bytes counts for rooms and
|
||||
senders, in a range of stream_orderings (including backfilled events).
|
||||
|
||||
Args:
|
||||
txn
|
||||
low_pos: Low stream ordering
|
||||
high_pos: High stream ordering
|
||||
|
||||
Returns:
|
||||
The room and user deltas for total_events/total_event_bytes in the
|
||||
format of `stats_id` -> fields
|
||||
"""
|
||||
|
||||
if low_pos >= high_pos:
|
||||
# nothing to do here.
|
||||
return {}, {}
|
||||
|
||||
if isinstance(self.database_engine, PostgresEngine):
|
||||
new_bytes_expression = "OCTET_LENGTH(json)"
|
||||
else:
|
||||
new_bytes_expression = "LENGTH(CAST(json AS BLOB))"
|
||||
|
||||
sql = """
|
||||
SELECT events.room_id, COUNT(*) AS new_events, SUM(%s) AS new_bytes
|
||||
FROM events INNER JOIN event_json USING (event_id)
|
||||
WHERE (? < stream_ordering AND stream_ordering <= ?)
|
||||
OR (? <= stream_ordering AND stream_ordering <= ?)
|
||||
GROUP BY events.room_id
|
||||
""" % (
|
||||
new_bytes_expression,
|
||||
)
|
||||
|
||||
txn.execute(sql, (low_pos, high_pos, -high_pos, -low_pos))
|
||||
|
||||
room_deltas = {
|
||||
room_id: {"total_events": new_events, "total_event_bytes": new_bytes}
|
||||
for room_id, new_events, new_bytes in txn
|
||||
}
|
||||
|
||||
sql = """
|
||||
SELECT events.sender, COUNT(*) AS new_events, SUM(%s) AS new_bytes
|
||||
FROM events INNER JOIN event_json USING (event_id)
|
||||
WHERE (? < stream_ordering AND stream_ordering <= ?)
|
||||
OR (? <= stream_ordering AND stream_ordering <= ?)
|
||||
GROUP BY events.sender
|
||||
""" % (
|
||||
new_bytes_expression,
|
||||
)
|
||||
|
||||
txn.execute(sql, (low_pos, high_pos, -high_pos, -low_pos))
|
||||
|
||||
user_deltas = {
|
||||
user_id: {"total_events": new_events, "total_event_bytes": new_bytes}
|
||||
for user_id, new_events, new_bytes in txn
|
||||
if self.hs.is_mine_id(user_id)
|
||||
}
|
||||
|
||||
return room_deltas, user_deltas
|
||||
|
||||
async def _calculate_and_set_initial_state_for_room(
|
||||
self, room_id: str
|
||||
) -> Tuple[dict, dict, int]:
|
||||
|
@ -893,6 +607,7 @@ class StatsStore(StateDeltasStore):
|
|||
"invited_members": membership_counts.get(Membership.INVITE, 0),
|
||||
"left_members": membership_counts.get(Membership.LEAVE, 0),
|
||||
"banned_members": membership_counts.get(Membership.BAN, 0),
|
||||
"knocked_members": membership_counts.get(Membership.KNOCK, 0),
|
||||
"local_users_in_room": len(local_users_in_room),
|
||||
},
|
||||
)
|
||||
|
|
|
@ -1085,9 +1085,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta):
|
|||
# stream token (as returned by `RoomStreamToken.get_max_stream_pos`) and
|
||||
# then filtering the results.
|
||||
if from_token.topological is not None:
|
||||
from_bound = (
|
||||
from_token.as_historical_tuple()
|
||||
) # type: Tuple[Optional[int], int]
|
||||
from_bound: Tuple[Optional[int], int] = from_token.as_historical_tuple()
|
||||
elif direction == "b":
|
||||
from_bound = (
|
||||
None,
|
||||
|
@ -1099,7 +1097,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta):
|
|||
from_token.stream,
|
||||
)
|
||||
|
||||
to_bound = None # type: Optional[Tuple[Optional[int], int]]
|
||||
to_bound: Optional[Tuple[Optional[int], int]] = None
|
||||
if to_token:
|
||||
if to_token.topological is not None:
|
||||
to_bound = to_token.as_historical_tuple()
|
||||
|
|
|
@ -42,7 +42,7 @@ class TagsWorkerStore(AccountDataWorkerStore):
|
|||
"room_tags", {"user_id": user_id}, ["room_id", "tag", "content"]
|
||||
)
|
||||
|
||||
tags_by_room = {} # type: Dict[str, Dict[str, JsonDict]]
|
||||
tags_by_room: Dict[str, Dict[str, JsonDict]] = {}
|
||||
for row in rows:
|
||||
room_tags = tags_by_room.setdefault(row["room_id"], {})
|
||||
room_tags[row["tag"]] = db_to_json(row["content"])
|
||||
|
|
|
@ -224,12 +224,12 @@ class UIAuthWorkerStore(SQLBaseStore):
|
|||
self, txn: LoggingTransaction, session_id: str, key: str, value: Any
|
||||
):
|
||||
# Get the current value.
|
||||
result = self.db_pool.simple_select_one_txn(
|
||||
result: Dict[str, Any] = self.db_pool.simple_select_one_txn( # type: ignore
|
||||
txn,
|
||||
table="ui_auth_sessions",
|
||||
keyvalues={"session_id": session_id},
|
||||
retcols=("serverdict",),
|
||||
) # type: Dict[str, Any] # type: ignore
|
||||
)
|
||||
|
||||
# Update it and add it back to the database.
|
||||
serverdict = db_to_json(result["serverdict"])
|
||||
|
|
|
@ -307,7 +307,7 @@ class EventsPersistenceStorage:
|
|||
matched the transaction ID; the existing event is returned in such
|
||||
a case.
|
||||
"""
|
||||
partitioned = {} # type: Dict[str, List[Tuple[EventBase, EventContext]]]
|
||||
partitioned: Dict[str, List[Tuple[EventBase, EventContext]]] = {}
|
||||
for event, ctx in events_and_contexts:
|
||||
partitioned.setdefault(event.room_id, []).append((event, ctx))
|
||||
|
||||
|
@ -384,7 +384,7 @@ class EventsPersistenceStorage:
|
|||
A dictionary of event ID to event ID we didn't persist as we already
|
||||
had another event persisted with the same TXN ID.
|
||||
"""
|
||||
replaced_events = {} # type: Dict[str, str]
|
||||
replaced_events: Dict[str, str] = {}
|
||||
if not events_and_contexts:
|
||||
return replaced_events
|
||||
|
||||
|
@ -440,16 +440,14 @@ class EventsPersistenceStorage:
|
|||
# Set of remote users which were in rooms the server has left. We
|
||||
# should check if we still share any rooms and if not we mark their
|
||||
# device lists as stale.
|
||||
potentially_left_users = set() # type: Set[str]
|
||||
potentially_left_users: Set[str] = set()
|
||||
|
||||
if not backfilled:
|
||||
with Measure(self._clock, "_calculate_state_and_extrem"):
|
||||
# Work out the new "current state" for each room.
|
||||
# We do this by working out what the new extremities are and then
|
||||
# calculating the state from that.
|
||||
events_by_room = (
|
||||
{}
|
||||
) # type: Dict[str, List[Tuple[EventBase, EventContext]]]
|
||||
events_by_room: Dict[str, List[Tuple[EventBase, EventContext]]] = {}
|
||||
for event, context in chunk:
|
||||
events_by_room.setdefault(event.room_id, []).append(
|
||||
(event, context)
|
||||
|
@ -622,9 +620,9 @@ class EventsPersistenceStorage:
|
|||
)
|
||||
|
||||
# Remove any events which are prev_events of any existing events.
|
||||
existing_prevs = await self.persist_events_store._get_events_which_are_prevs(
|
||||
result
|
||||
) # type: Collection[str]
|
||||
existing_prevs: Collection[
|
||||
str
|
||||
] = await self.persist_events_store._get_events_which_are_prevs(result)
|
||||
result.difference_update(existing_prevs)
|
||||
|
||||
# Finally handle the case where the new events have soft-failed prev
|
||||
|
|
|
@ -256,7 +256,7 @@ def _setup_new_database(
|
|||
for database in databases
|
||||
)
|
||||
|
||||
directory_entries = [] # type: List[_DirectoryListing]
|
||||
directory_entries: List[_DirectoryListing] = []
|
||||
for directory in directories:
|
||||
directory_entries.extend(
|
||||
_DirectoryListing(file_name, os.path.join(directory, file_name))
|
||||
|
@ -424,10 +424,10 @@ def _upgrade_existing_database(
|
|||
directories.append(os.path.join(schema_path, database, "delta", str(v)))
|
||||
|
||||
# Used to check if we have any duplicate file names
|
||||
file_name_counter = Counter() # type: CounterType[str]
|
||||
file_name_counter: CounterType[str] = Counter()
|
||||
|
||||
# Now find which directories have anything of interest.
|
||||
directory_entries = [] # type: List[_DirectoryListing]
|
||||
directory_entries: List[_DirectoryListing] = []
|
||||
for directory in directories:
|
||||
logger.debug("Looking for schema deltas in %s", directory)
|
||||
try:
|
||||
|
@ -639,7 +639,7 @@ def get_statements(f: Iterable[str]) -> Generator[str, None, None]:
|
|||
|
||||
|
||||
def executescript(txn: Cursor, schema_path: str) -> None:
|
||||
with open(schema_path, "r") as f:
|
||||
with open(schema_path) as f:
|
||||
execute_statements_from_stream(txn, f)
|
||||
|
||||
|
||||
|
|
|
@@ -12,7 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

-SCHEMA_VERSION = 60
+SCHEMA_VERSION = 61
 """Represents the expectations made by the codebase about the database schema

 This should be incremented whenever the codebase changes its requirements on the

@@ -21,6 +21,10 @@ older versions of Synapse).

 See `README.md <synapse/storage/schema/README.md>`_ for more information on how this
 works.

+Changes in SCHEMA_VERSION = 61:
+ - The `user_stats_historical` and `room_stats_historical` tables are not written and
+   are not read (previously, they were written but not read).
+
 """

@ -0,0 +1,23 @@
|
|||
/* Copyright 2021 The Matrix.org Foundation C.I.C
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
-- we use bigint elsewhere in the database for appservice txn ids (notably,
|
||||
-- application_services_state.last_txn), and generally we use bigints everywhere else
|
||||
-- we have monotonic counters, so let's bring this one in line.
|
||||
--
|
||||
-- assuming there aren't thousands of rows for decommisioned/non-functional ASes, this
|
||||
-- table should be pretty small, so safe to do a synchronous ALTER TABLE.
|
||||
|
||||
ALTER TABLE application_services_txns ALTER COLUMN txn_id SET DATA TYPE BIGINT;
|
|
@ -0,0 +1,18 @@
|
|||
/* Copyright 2021 The Matrix.org Foundation C.I.C
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
-- this index is redundant; there is another UNIQUE index on this table.
|
||||
DROP INDEX IF EXISTS room_depth_room;
|
||||
|
synapse/storage/schema/main/delta/61/03recreate_min_depth.py (new file, 70 lines)
@ -0,0 +1,70 @@
|
|||
# Copyright 2021 The Matrix.org Foundation C.I.C.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""
|
||||
This migration handles the process of changing the type of `room_depth.min_depth` to
|
||||
a BIGINT.
|
||||
"""
|
||||
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine
|
||||
from synapse.storage.types import Cursor
|
||||
|
||||
|
||||
def run_create(cur: Cursor, database_engine: BaseDatabaseEngine, *args, **kwargs):
|
||||
if not isinstance(database_engine, PostgresEngine):
|
||||
# this only applies to postgres - sqlite does not distinguish between big and
|
||||
# little ints.
|
||||
return
|
||||
|
||||
# First add a new column to contain the bigger min_depth
|
||||
cur.execute("ALTER TABLE room_depth ADD COLUMN min_depth2 BIGINT")
|
||||
|
||||
# Create a trigger which will keep it populated.
|
||||
cur.execute(
|
||||
"""
|
||||
CREATE OR REPLACE FUNCTION populate_min_depth2() RETURNS trigger AS $BODY$
|
||||
BEGIN
|
||||
new.min_depth2 := new.min_depth;
|
||||
RETURN NEW;
|
||||
END;
|
||||
$BODY$ LANGUAGE plpgsql
|
||||
"""
|
||||
)
|
||||
|
||||
cur.execute(
|
||||
"""
|
||||
CREATE TRIGGER populate_min_depth2_trigger BEFORE INSERT OR UPDATE ON room_depth
|
||||
FOR EACH ROW
|
||||
EXECUTE PROCEDURE populate_min_depth2()
|
||||
"""
|
||||
)
|
||||
|
||||
# Start a bg process to populate it for old rooms
|
||||
cur.execute(
|
||||
"""
|
||||
INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
|
||||
(6103, 'populate_room_depth_min_depth2', '{}')
|
||||
"""
|
||||
)
|
||||
|
||||
# and another to switch them over once it completes.
|
||||
cur.execute(
|
||||
"""
|
||||
INSERT INTO background_updates (ordering, update_name, progress_json, depends_on) VALUES
|
||||
(6103, 'replace_room_depth_min_depth', '{}', 'populate_room_depth2')
|
||||
"""
|
||||
)
|
||||
|
||||
|
||||
def run_upgrade(cur: Cursor, database_engine: BaseDatabaseEngine, *args, **kwargs):
|
||||
pass
|
|
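For context, a minimal sketch of the kind of batched backfill that the 'populate_room_depth_min_depth2' background update registered above has to perform; the function name, batching scheme and psycopg2-style placeholders are illustrative assumptions, not the actual Synapse handler:

from typing import Any


def backfill_min_depth2_batch(cur: Any, last_room_id: str, batch_size: int) -> str:
    """Copy min_depth into min_depth2 for one batch of rooms (ordered by room_id),
    returning the last room_id processed so the caller can record it as progress."""
    cur.execute(
        "SELECT room_id FROM room_depth WHERE room_id > %s ORDER BY room_id LIMIT %s",
        (last_room_id, batch_size),
    )
    room_ids = [row[0] for row in cur.fetchall()]
    if not room_ids:
        return last_room_id  # nothing left to backfill
    cur.execute(
        "UPDATE room_depth SET min_depth2 = min_depth WHERE room_id = ANY(%s)",
        (room_ids,),
    )
    return room_ids[-1]

Once the backfill completes, the dependent 'replace_room_depth_min_depth' update registered above can switch the columns over.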
@@ -0,0 +1,34 @@
/* Copyright 2021 The Matrix.org Foundation C.I.C
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */


-- By default the postgres statistics collector massively underestimates the
-- number of distinct state groups in the `state_groups_state` table, which can
-- cause postgres to use table scans for queries for multiple state groups.
--
-- To work around this we can manually tell postgres the number of distinct state
-- groups there are by setting `n_distinct` (a negative value here is the number
-- of distinct values divided by the number of rows, so -0.02 means on average
-- there are 50 rows per distinct value). We don't need a particularly
-- accurate number here, as a) we just want it to always use index scans and b)
-- our estimate is going to be better than the one made by the statistics
-- collector.

ALTER TABLE state_groups_state ALTER COLUMN state_group SET (n_distinct = -0.02);

-- Ideally we'd do an `ANALYZE state_groups_state (state_group)` here so that
-- the above gets picked up immediately, but that can take a bit of time so we
-- rely on the autovacuum eventually getting run and doing that in the
-- background for us.
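As a rough sanity check (illustrative, not part of the migration; the DSN is hypothetical), the per-column override can be read back from pg_attribute.attoptions, and the -0.02 ratio can be turned into an absolute estimate using the planner's row count:

import psycopg2  # assumed driver for a Postgres-backed Synapse

conn = psycopg2.connect("dbname=synapse")
with conn, conn.cursor() as cur:
    cur.execute(
        """
        SELECT attoptions FROM pg_attribute
        WHERE attrelid = 'state_groups_state'::regclass AND attname = 'state_group'
        """
    )
    print(cur.fetchone())  # e.g. (['n_distinct=-0.02'],)

    cur.execute("SELECT reltuples FROM pg_class WHERE relname = 'state_groups_state'")
    (approx_rows,) = cur.fetchone()
    # A negative n_distinct is a fraction of the row count, so the planner's
    # estimate of distinct state groups becomes 0.02 * approx_rows
    # (i.e. roughly 50 rows per state group on average).
    print("estimated distinct state groups:", 0.02 * approx_rows)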
@@ -91,7 +91,7 @@ class StateFilter:
         Returns:
             The new state filter.
         """
-        type_dict = {}  # type: Dict[str, Optional[Set[str]]]
+        type_dict: Dict[str, Optional[Set[str]]] = {}
         for typ, s in types:
             if typ in type_dict:
                 if type_dict[typ] is None:
@@ -194,7 +194,7 @@ class StateFilter:
         """

         where_clause = ""
-        where_args = []  # type: List[str]
+        where_args: List[str] = []

         if self.is_full():
             return where_clause, where_args
@@ -112,7 +112,7 @@ class StreamIdGenerator:
         # insertion ordering will ensure it's in the correct ordering.
         #
         # The key and values are the same, but we never look at the values.
-        self._unfinished_ids = OrderedDict()  # type: OrderedDict[int, int]
+        self._unfinished_ids: OrderedDict[int, int] = OrderedDict()

     def get_next(self):
         """
@@ -236,15 +236,15 @@ class MultiWriterIdGenerator:
         # Note: If we are a negative stream then we still store all the IDs as
         # positive to make life easier for us, and simply negate the IDs when we
         # return them.
-        self._current_positions = {}  # type: Dict[str, int]
+        self._current_positions: Dict[str, int] = {}

         # Set of local IDs that we're still processing. The current position
         # should be less than the minimum of this set (if not empty).
-        self._unfinished_ids = set()  # type: Set[int]
+        self._unfinished_ids: Set[int] = set()

         # Set of local IDs that we've processed that are larger than the current
         # position, due to there being smaller unpersisted IDs.
-        self._finished_ids = set()  # type: Set[int]
+        self._finished_ids: Set[int] = set()

         # We track the max position where we know everything before has been
         # persisted. This is done by a) looking at the min across all instances
@@ -265,7 +265,7 @@ class MultiWriterIdGenerator:
         self._persisted_upto_position = (
             min(self._current_positions.values()) if self._current_positions else 1
         )
-        self._known_persisted_positions = []  # type: List[int]
+        self._known_persisted_positions: List[int] = []

         self._sequence_gen = PostgresSequenceGenerator(sequence_name)

@@ -465,7 +465,7 @@ class MultiWriterIdGenerator:
             self._unfinished_ids.discard(next_id)
             self._finished_ids.add(next_id)

-            new_cur = None  # type: Optional[int]
+            new_cur: Optional[int] = None

             if self._unfinished_ids:
                 # If there are unfinished IDs then the new position will be the
@@ -208,10 +208,10 @@ class LocalSequenceGenerator(SequenceGenerator):
                 get_next_id_txn; should return the current maximum id
         """
         # the callback. this is cleared after it is called, so that it can be GCed.
-        self._callback = get_first_callback  # type: Optional[GetFirstCallbackType]
+        self._callback: Optional[GetFirstCallbackType] = get_first_callback

         # The current max value, or None if we haven't looked in the DB yet.
-        self._current_max_id = None  # type: Optional[int]
+        self._current_max_id: Optional[int] = None
         self._lock = threading.Lock()

     def get_next_id_txn(self, txn: Cursor) -> int:
@@ -274,7 +274,7 @@ def build_sequence_generator(
         `check_consistency` details.
     """
     if isinstance(database_engine, PostgresEngine):
-        seq = PostgresSequenceGenerator(sequence_name)  # type: SequenceGenerator
+        seq: SequenceGenerator = PostgresSequenceGenerator(sequence_name)
     else:
         seq = LocalSequenceGenerator(get_first_callback)

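The annotation hunks above (StateFilter, StreamIdGenerator, MultiWriterIdGenerator, LocalSequenceGenerator, build_sequence_generator) all apply the same mechanical change: mypy `# type:` comments are replaced with inline (PEP 526) variable annotations. A minimal sketch of the pattern, with made-up names:

from typing import Dict, Optional

# before:
#     cache = {}  # type: Dict[str, Optional[int]]
# after:
cache: Dict[str, Optional[int]] = {}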