Mirror of https://git.anonymousland.org/anonymousland/synapse.git, synced 2025-05-07 23:45:04 -04:00
Remove functionality associated with unused historical stats tables (#9721)
Fixes #9602
parent aa78064869
commit f6767abc05
10 changed files with 22 additions and 572 deletions
@@ -26,7 +26,6 @@ from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import StoreError
from synapse.storage.database import DatabasePool
from synapse.storage.databases.main.state_deltas import StateDeltasStore
from synapse.storage.engines import PostgresEngine
from synapse.types import JsonDict
from synapse.util.caches.descriptors import cached
@@ -49,14 +48,6 @@ ABSOLUTE_STATS_FIELDS = {
    "user": ("joined_rooms",),
}

# these fields are per-timeslice and so should be reset to 0 upon a new slice
# You can draw these stats on a histogram.
# Example: number of events sent locally during a time slice
PER_SLICE_FIELDS = {
    "room": ("total_events", "total_event_bytes"),
    "user": ("invites_sent", "rooms_created", "total_events", "total_event_bytes"),
}

TYPE_TO_TABLE = {"room": ("room_stats", "room_id"), "user": ("user_stats", "user_id")}

# these are the tables (& ID columns) which contain our actual subjects
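The comments above draw the distinction that absolute fields carry a running value forward from slice to slice, while per-slice fields start again from zero whenever a new time slice begins. A toy Python sketch of that difference, with invented numbers rather than anything from the codebase:

# Two consecutive buckets for one room; the values are made up for illustration.
previous_slice = {"joined_members": 10, "total_events": 42}
new_slice = {
    "joined_members": previous_slice["joined_members"],  # absolute: carried forward
    "total_events": 0,                                    # per-slice: reset to 0
}
new_slice["joined_members"] += 1  # a join updates the running value
new_slice["total_events"] += 7    # events in this slice are counted from zero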
@@ -106,7 +97,6 @@ class StatsStore(StateDeltasStore):
        self.server_name = hs.hostname
        self.clock = self.hs.get_clock()
        self.stats_enabled = hs.config.stats_enabled
        self.stats_bucket_size = hs.config.stats_bucket_size

        self.stats_delta_processing_lock = DeferredLock()
@@ -122,22 +112,6 @@ class StatsStore(StateDeltasStore):
        self.db_pool.updates.register_noop_background_update("populate_stats_cleanup")
        self.db_pool.updates.register_noop_background_update("populate_stats_prepare")

    def quantise_stats_time(self, ts):
        """
        Quantises a timestamp to be a multiple of the bucket size.

        Args:
            ts (int): the timestamp to quantise, in milliseconds since the Unix
                Epoch

        Returns:
            int: a timestamp which
              - is divisible by the bucket size;
              - is no later than `ts`; and
              - is the largest such timestamp.
        """
        return (ts // self.stats_bucket_size) * self.stats_bucket_size

    async def _populate_stats_process_users(self, progress, batch_size):
        """
        This is a background update which regenerates statistics for users.
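The removed quantise_stats_time helper is plain integer arithmetic: a timestamp is rounded down to the start of its bucket. A minimal standalone sketch (the bucket size and timestamps are assumed values, not Synapse defaults):

def quantise(ts: int, bucket_size: int) -> int:
    # Round ts down to the nearest multiple of bucket_size (both in milliseconds).
    return (ts // bucket_size) * bucket_size

# With an assumed one-day bucket of 86_400_000 ms:
assert quantise(1_600_000_123_456, 86_400_000) == 1_599_955_200_000
assert quantise(1_599_955_200_000, 86_400_000) == 1_599_955_200_000  # already aligned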
@@ -288,56 +262,6 @@ class StatsStore(StateDeltasStore):
            desc="update_room_state",
        )

    async def get_statistics_for_subject(
        self, stats_type: str, stats_id: str, start: str, size: int = 100
    ) -> List[dict]:
        """
        Get statistics for a given subject.

        Args:
            stats_type: The type of subject
            stats_id: The ID of the subject (e.g. room_id or user_id)
            start: Pagination start. Number of entries, not timestamp.
            size: How many entries to return.

        Returns:
            A list of dicts, where the dict has the keys of
            ABSOLUTE_STATS_FIELDS[stats_type], and "bucket_size" and "end_ts".
        """
        return await self.db_pool.runInteraction(
            "get_statistics_for_subject",
            self._get_statistics_for_subject_txn,
            stats_type,
            stats_id,
            start,
            size,
        )

    def _get_statistics_for_subject_txn(
        self, txn, stats_type, stats_id, start, size=100
    ):
        """
        Transaction-bound version of L{get_statistics_for_subject}.
        """

        table, id_col = TYPE_TO_TABLE[stats_type]
        selected_columns = list(
            ABSOLUTE_STATS_FIELDS[stats_type] + PER_SLICE_FIELDS[stats_type]
        )

        slice_list = self.db_pool.simple_select_list_paginate_txn(
            txn,
            table + "_historical",
            "end_ts",
            start,
            size,
            retcols=selected_columns + ["bucket_size", "end_ts"],
            keyvalues={id_col: stats_id},
            order_direction="DESC",
        )

        return slice_list

    @cached()
    async def get_earliest_token_for_stats(
        self, stats_type: str, id: str
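For orientation, _get_statistics_for_subject_txn pages through the *_historical table via simple_select_list_paginate_txn. Assuming that helper orders by the given column and paginates with LIMIT/OFFSET, the query it issues is roughly shaped like the sketch below (illustrative only, not the exact generated SQL; the subject ID and column list are hypothetical):

# Hypothetical rendering for stats_type="room", stats_id="!abc:example.org",
# start=0, size=100; the selected column list is abridged.
rough_sql = """
    SELECT joined_members, ..., bucket_size, end_ts
    FROM room_stats_historical
    WHERE room_id = ?
    ORDER BY end_ts DESC
    LIMIT ? OFFSET ?
"""
rough_args = ("!abc:example.org", 100, 0)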
@@ -451,14 +375,10 @@ class StatsStore(StateDeltasStore):

        table, id_col = TYPE_TO_TABLE[stats_type]

        quantised_ts = self.quantise_stats_time(int(ts))
        end_ts = quantised_ts + self.stats_bucket_size

        # Lets be paranoid and check that all the given field names are known
        abs_field_names = ABSOLUTE_STATS_FIELDS[stats_type]
        slice_field_names = PER_SLICE_FIELDS[stats_type]
        for field in chain(fields.keys(), absolute_field_overrides.keys()):
            if field not in abs_field_names and field not in slice_field_names:
            if field not in abs_field_names:
                # guard against potential SQL injection dodginess
                raise ValueError(
                    "%s is not a recognised field"
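The loop above is a whitelist check: field names are later interpolated directly into SQL strings rather than bound as parameters, so anything not present in the known field tuples must be rejected up front. A minimal sketch of the same pattern (abridged field tuples and a hypothetical helper name):

ALLOWED_FIELDS = {"room": ("joined_members", "total_events"), "user": ("joined_rooms",)}  # abridged

def check_field_names(stats_type: str, fields: dict) -> None:
    # Reject anything that is not a known column before it reaches an SQL string.
    for field in fields:
        if field not in ALLOWED_FIELDS[stats_type]:
            raise ValueError("%s is not a recognised field" % (field,))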
@@ -491,20 +411,6 @@ class StatsStore(StateDeltasStore):
            additive_relatives=deltas_of_absolute_fields,
        )

        per_slice_additive_relatives = {
            key: fields.get(key, 0) for key in slice_field_names
        }
        self._upsert_copy_from_table_with_additive_relatives_txn(
            txn=txn,
            into_table=table + "_historical",
            keyvalues={id_col: stats_id},
            extra_dst_insvalues={"bucket_size": self.stats_bucket_size},
            extra_dst_keyvalues={"end_ts": end_ts},
            additive_relatives=per_slice_additive_relatives,
            src_table=table + "_current",
            copy_columns=abs_field_names,
        )

    def _upsert_with_additive_relatives_txn(
        self, txn, table, keyvalues, absolutes, additive_relatives
    ):
@@ -572,201 +478,6 @@ class StatsStore(StateDeltasStore):
                current_row.update(absolutes)
                self.db_pool.simple_update_one_txn(txn, table, keyvalues, current_row)

    def _upsert_copy_from_table_with_additive_relatives_txn(
        self,
        txn,
        into_table,
        keyvalues,
        extra_dst_keyvalues,
        extra_dst_insvalues,
        additive_relatives,
        src_table,
        copy_columns,
    ):
        """Updates the historic stats table with latest updates.

        This involves copying "absolute" fields from the `_current` table, and
        adding relative fields to any existing values.

        Args:
            txn: Transaction
            into_table (str): The destination table to UPSERT the row into
            keyvalues (dict[str, any]): Row-identifying key values
            extra_dst_keyvalues (dict[str, any]): Additional keyvalues
                for `into_table`.
            extra_dst_insvalues (dict[str, any]): Additional values to insert
                on new row creation for `into_table`.
            additive_relatives (dict[str, any]): Fields that will be added onto
                if existing row present. (Must be disjoint from copy_columns.)
            src_table (str): The source table to copy from
            copy_columns (iterable[str]): The list of columns to copy
        """
        if self.database_engine.can_native_upsert:
            ins_columns = chain(
                keyvalues,
                copy_columns,
                additive_relatives,
                extra_dst_keyvalues,
                extra_dst_insvalues,
            )
            sel_exprs = chain(
                keyvalues,
                copy_columns,
                (
                    "?"
                    for _ in chain(
                        additive_relatives, extra_dst_keyvalues, extra_dst_insvalues
                    )
                ),
            )
            keyvalues_where = ("%s = ?" % f for f in keyvalues)

            sets_cc = ("%s = EXCLUDED.%s" % (f, f) for f in copy_columns)
            sets_ar = (
                "%s = EXCLUDED.%s + %s.%s" % (f, f, into_table, f)
                for f in additive_relatives
            )

            sql = """
                INSERT INTO %(into_table)s (%(ins_columns)s)
                SELECT %(sel_exprs)s
                FROM %(src_table)s
                WHERE %(keyvalues_where)s
                ON CONFLICT (%(keyvalues)s)
                DO UPDATE SET %(sets)s
            """ % {
                "into_table": into_table,
                "ins_columns": ", ".join(ins_columns),
                "sel_exprs": ", ".join(sel_exprs),
                "keyvalues_where": " AND ".join(keyvalues_where),
                "src_table": src_table,
                "keyvalues": ", ".join(
                    chain(keyvalues.keys(), extra_dst_keyvalues.keys())
                ),
                "sets": ", ".join(chain(sets_cc, sets_ar)),
            }

            qargs = list(
                chain(
                    additive_relatives.values(),
                    extra_dst_keyvalues.values(),
                    extra_dst_insvalues.values(),
                    keyvalues.values(),
                )
            )
            txn.execute(sql, qargs)
        else:
            self.database_engine.lock_table(txn, into_table)
            src_row = self.db_pool.simple_select_one_txn(
                txn, src_table, keyvalues, copy_columns
            )
            all_dest_keyvalues = {**keyvalues, **extra_dst_keyvalues}
            dest_current_row = self.db_pool.simple_select_one_txn(
                txn,
                into_table,
                keyvalues=all_dest_keyvalues,
                retcols=list(chain(additive_relatives.keys(), copy_columns)),
                allow_none=True,
            )

            if dest_current_row is None:
                merged_dict = {
                    **keyvalues,
                    **extra_dst_keyvalues,
                    **extra_dst_insvalues,
                    **src_row,
                    **additive_relatives,
                }
                self.db_pool.simple_insert_txn(txn, into_table, merged_dict)
            else:
                for (key, val) in additive_relatives.items():
                    src_row[key] = dest_current_row[key] + val
                self.db_pool.simple_update_txn(
                    txn, into_table, all_dest_keyvalues, src_row
                )

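To make the native-upsert branch concrete: with hypothetical arguments such as keyvalues={"room_id": ...}, copy_columns=("joined_members",), additive_relatives={"total_events": 5}, extra_dst_keyvalues={"end_ts": ...} and extra_dst_insvalues={"bucket_size": ...}, the string formatting above would render roughly the statement below (a sketch derived from the code, not copied from the source):

example_sql = """
    INSERT INTO room_stats_historical
        (room_id, joined_members, total_events, end_ts, bucket_size)
    SELECT room_id, joined_members, ?, ?, ?
    FROM room_stats_current
    WHERE room_id = ?
    ON CONFLICT (room_id, end_ts)
    DO UPDATE SET joined_members = EXCLUDED.joined_members,
        total_events = EXCLUDED.total_events + room_stats_historical.total_events
"""
# Bound parameters follow the qargs order above: additive_relatives,
# extra_dst_keyvalues, extra_dst_insvalues, then keyvalues,
# e.g. (5, <end_ts>, <bucket_size>, <room_id>).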
    async def get_changes_room_total_events_and_bytes(
        self, min_pos: int, max_pos: int
    ) -> Tuple[Dict[str, Dict[str, int]], Dict[str, Dict[str, int]]]:
        """Fetches the counts of events in the given range of stream IDs.

        Args:
            min_pos
            max_pos

        Returns:
            Mapping of room ID to field changes.
        """

        return await self.db_pool.runInteraction(
            "stats_incremental_total_events_and_bytes",
            self.get_changes_room_total_events_and_bytes_txn,
            min_pos,
            max_pos,
        )

    def get_changes_room_total_events_and_bytes_txn(
        self, txn, low_pos: int, high_pos: int
    ) -> Tuple[Dict[str, Dict[str, int]], Dict[str, Dict[str, int]]]:
        """Gets the total_events and total_event_bytes counts for rooms and
        senders, in a range of stream_orderings (including backfilled events).

        Args:
            txn
            low_pos: Low stream ordering
            high_pos: High stream ordering

        Returns:
            The room and user deltas for total_events/total_event_bytes in the
            format of `stats_id` -> fields
        """

        if low_pos >= high_pos:
            # nothing to do here.
            return {}, {}

        if isinstance(self.database_engine, PostgresEngine):
            new_bytes_expression = "OCTET_LENGTH(json)"
        else:
            new_bytes_expression = "LENGTH(CAST(json AS BLOB))"

        sql = """
            SELECT events.room_id, COUNT(*) AS new_events, SUM(%s) AS new_bytes
            FROM events INNER JOIN event_json USING (event_id)
            WHERE (? < stream_ordering AND stream_ordering <= ?)
                OR (? <= stream_ordering AND stream_ordering <= ?)
            GROUP BY events.room_id
        """ % (
            new_bytes_expression,
        )

        txn.execute(sql, (low_pos, high_pos, -high_pos, -low_pos))

        room_deltas = {
            room_id: {"total_events": new_events, "total_event_bytes": new_bytes}
            for room_id, new_events, new_bytes in txn
        }

        sql = """
            SELECT events.sender, COUNT(*) AS new_events, SUM(%s) AS new_bytes
            FROM events INNER JOIN event_json USING (event_id)
            WHERE (? < stream_ordering AND stream_ordering <= ?)
                OR (? <= stream_ordering AND stream_ordering <= ?)
            GROUP BY events.sender
        """ % (
            new_bytes_expression,
        )

        txn.execute(sql, (low_pos, high_pos, -high_pos, -low_pos))

        user_deltas = {
            user_id: {"total_events": new_events, "total_event_bytes": new_bytes}
            for user_id, new_events, new_bytes in txn
            if self.hs.is_mine_id(user_id)
        }

        return room_deltas, user_deltas

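The doubled WHERE clause in both queries exists because backfilled events are stored with negative stream orderings; the second interval mirrors the first, so one pass counts both live and backfilled events for the same advance of the stream. A small Python sketch of the predicate (illustrative only):

def in_window(stream_ordering: int, low_pos: int, high_pos: int) -> bool:
    # Live events fall in (low_pos, high_pos]; backfilled events use negative
    # orderings, so the mirrored interval [-high_pos, -low_pos] covers them.
    return (low_pos < stream_ordering <= high_pos) or (
        -high_pos <= stream_ordering <= -low_pos
    )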
    async def _calculate_and_set_initial_state_for_room(
        self, room_id: str
    ) -> Tuple[dict, dict, int]: