Move DB pool and helper functions into dedicated Database class

This commit is contained in:
Erik Johnston 2019-12-04 13:52:46 +00:00
parent ddbbfc9512
commit 756d4942f5
62 changed files with 2377 additions and 2295 deletions

View file

@ -143,7 +143,7 @@ class EventsStore(
)
return txn.fetchall()
res = yield self.runInteraction("read_forward_extremities", fetch)
res = yield self.db.runInteraction("read_forward_extremities", fetch)
self._current_forward_extremities_amount = c_counter(list(x[0] for x in res))
@_retry_on_integrity_error
@ -208,7 +208,7 @@ class EventsStore(
for (event, context), stream in zip(events_and_contexts, stream_orderings):
event.internal_metadata.stream_ordering = stream
yield self.runInteraction(
yield self.db.runInteraction(
"persist_events",
self._persist_events_txn,
events_and_contexts=events_and_contexts,
@ -281,7 +281,7 @@ class EventsStore(
results.extend(r[0] for r in txn if not json.loads(r[1]).get("soft_failed"))
for chunk in batch_iter(event_ids, 100):
yield self.runInteraction(
yield self.db.runInteraction(
"_get_events_which_are_prevs", _get_events_which_are_prevs_txn, chunk
)
@ -345,7 +345,7 @@ class EventsStore(
existing_prevs.add(prev_event_id)
for chunk in batch_iter(event_ids, 100):
yield self.runInteraction(
yield self.db.runInteraction(
"_get_prevs_before_rejected", _get_prevs_before_rejected_txn, chunk
)
@ -432,7 +432,7 @@ class EventsStore(
# event's auth chain, but its easier for now just to store them (and
# it doesn't take much storage compared to storing the entire event
# anyway).
self.simple_insert_many_txn(
self.db.simple_insert_many_txn(
txn,
table="event_auth",
values=[
@ -580,12 +580,12 @@ class EventsStore(
self, txn, new_forward_extremities, max_stream_order
):
for room_id, new_extrem in iteritems(new_forward_extremities):
self.simple_delete_txn(
self.db.simple_delete_txn(
txn, table="event_forward_extremities", keyvalues={"room_id": room_id}
)
txn.call_after(self.get_latest_event_ids_in_room.invalidate, (room_id,))
self.simple_insert_many_txn(
self.db.simple_insert_many_txn(
txn,
table="event_forward_extremities",
values=[
@ -598,7 +598,7 @@ class EventsStore(
# new stream_ordering to new forward extremeties in the room.
# This allows us to later efficiently look up the forward extremeties
# for a room before a given stream_ordering
self.simple_insert_many_txn(
self.db.simple_insert_many_txn(
txn,
table="stream_ordering_to_exterm",
values=[
@ -722,7 +722,7 @@ class EventsStore(
# change in outlier status to our workers.
stream_order = event.internal_metadata.stream_ordering
state_group_id = context.state_group
self.simple_insert_txn(
self.db.simple_insert_txn(
txn,
table="ex_outlier_stream",
values={
@ -794,7 +794,7 @@ class EventsStore(
d.pop("redacted_because", None)
return d
self.simple_insert_many_txn(
self.db.simple_insert_many_txn(
txn,
table="event_json",
values=[
@ -811,7 +811,7 @@ class EventsStore(
],
)
self.simple_insert_many_txn(
self.db.simple_insert_many_txn(
txn,
table="events",
values=[
@ -841,7 +841,7 @@ class EventsStore(
# If we're persisting an unredacted event we go and ensure
# that we mark any redactions that reference this event as
# requiring censoring.
self.simple_update_txn(
self.db.simple_update_txn(
txn,
table="redactions",
keyvalues={"redacts": event.event_id},
@ -983,7 +983,7 @@ class EventsStore(
state_values.append(vals)
self.simple_insert_many_txn(txn, table="state_events", values=state_values)
self.db.simple_insert_many_txn(txn, table="state_events", values=state_values)
# Prefill the event cache
self._add_to_cache(txn, events_and_contexts)
@ -1014,7 +1014,7 @@ class EventsStore(
)
txn.execute(sql + clause, args)
rows = self.cursor_to_dict(txn)
rows = self.db.cursor_to_dict(txn)
for row in rows:
event = ev_map[row["event_id"]]
if not row["rejects"] and not row["redacts"]:
@ -1032,7 +1032,7 @@ class EventsStore(
# invalidate the cache for the redacted event
txn.call_after(self._invalidate_get_event_cache, event.redacts)
self.simple_insert_txn(
self.db.simple_insert_txn(
txn,
table="redactions",
values={
@ -1077,7 +1077,9 @@ class EventsStore(
LIMIT ?
"""
rows = yield self.execute("_censor_redactions_fetch", None, sql, before_ts, 100)
rows = yield self.db.execute(
"_censor_redactions_fetch", None, sql, before_ts, 100
)
updates = []
@ -1109,14 +1111,14 @@ class EventsStore(
if pruned_json:
self._censor_event_txn(txn, event_id, pruned_json)
self.simple_update_one_txn(
self.db.simple_update_one_txn(
txn,
table="redactions",
keyvalues={"event_id": redaction_id},
updatevalues={"have_censored": True},
)
yield self.runInteraction("_update_censor_txn", _update_censor_txn)
yield self.db.runInteraction("_update_censor_txn", _update_censor_txn)
def _censor_event_txn(self, txn, event_id, pruned_json):
"""Censor an event by replacing its JSON in the event_json table with the
@ -1127,7 +1129,7 @@ class EventsStore(
event_id (str): The ID of the event to censor.
pruned_json (str): The pruned JSON
"""
self.simple_update_one_txn(
self.db.simple_update_one_txn(
txn,
table="event_json",
keyvalues={"event_id": event_id},
@ -1153,7 +1155,7 @@ class EventsStore(
(count,) = txn.fetchone()
return count
ret = yield self.runInteraction("count_messages", _count_messages)
ret = yield self.db.runInteraction("count_messages", _count_messages)
return ret
@defer.inlineCallbacks
@ -1174,7 +1176,7 @@ class EventsStore(
(count,) = txn.fetchone()
return count
ret = yield self.runInteraction("count_daily_sent_messages", _count_messages)
ret = yield self.db.runInteraction("count_daily_sent_messages", _count_messages)
return ret
@defer.inlineCallbacks
@ -1189,7 +1191,7 @@ class EventsStore(
(count,) = txn.fetchone()
return count
ret = yield self.runInteraction("count_daily_active_rooms", _count)
ret = yield self.db.runInteraction("count_daily_active_rooms", _count)
return ret
def get_current_backfill_token(self):
@ -1241,7 +1243,7 @@ class EventsStore(
return new_event_updates
return self.runInteraction(
return self.db.runInteraction(
"get_all_new_forward_event_rows", get_all_new_forward_event_rows
)
@ -1286,7 +1288,7 @@ class EventsStore(
return new_event_updates
return self.runInteraction(
return self.db.runInteraction(
"get_all_new_backfill_event_rows", get_all_new_backfill_event_rows
)
@ -1379,7 +1381,7 @@ class EventsStore(
backward_ex_outliers,
)
return self.runInteraction("get_all_new_events", get_all_new_events_txn)
return self.db.runInteraction("get_all_new_events", get_all_new_events_txn)
def purge_history(self, room_id, token, delete_local_events):
"""Deletes room history before a certain point
@ -1399,7 +1401,7 @@ class EventsStore(
deleted events.
"""
return self.runInteraction(
return self.db.runInteraction(
"purge_history",
self._purge_history_txn,
room_id,
@ -1647,7 +1649,7 @@ class EventsStore(
Deferred[List[int]]: The list of state groups to delete.
"""
return self.runInteraction("purge_room", self._purge_room_txn, room_id)
return self.db.runInteraction("purge_room", self._purge_room_txn, room_id)
def _purge_room_txn(self, txn, room_id):
# First we fetch all the state groups that should be deleted, before
@ -1766,7 +1768,7 @@ class EventsStore(
to delete.
"""
return self.runInteraction(
return self.db.runInteraction(
"purge_unreferenced_state_groups",
self._purge_unreferenced_state_groups,
room_id,
@ -1778,7 +1780,7 @@ class EventsStore(
"[purge] found %i state groups to delete", len(state_groups_to_delete)
)
rows = self.simple_select_many_txn(
rows = self.db.simple_select_many_txn(
txn,
table="state_group_edges",
column="prev_state_group",
@ -1805,15 +1807,15 @@ class EventsStore(
curr_state = self._get_state_groups_from_groups_txn(txn, [sg])
curr_state = curr_state[sg]
self.simple_delete_txn(
self.db.simple_delete_txn(
txn, table="state_groups_state", keyvalues={"state_group": sg}
)
self.simple_delete_txn(
self.db.simple_delete_txn(
txn, table="state_group_edges", keyvalues={"state_group": sg}
)
self.simple_insert_many_txn(
self.db.simple_insert_many_txn(
txn,
table="state_groups_state",
values=[
@ -1850,7 +1852,7 @@ class EventsStore(
state group.
"""
rows = yield self.simple_select_many_batch(
rows = yield self.db.simple_select_many_batch(
table="state_group_edges",
column="prev_state_group",
iterable=state_groups,
@ -1869,7 +1871,7 @@ class EventsStore(
state_groups_to_delete (list[int]): State groups to delete
"""
return self.runInteraction(
return self.db.runInteraction(
"purge_room_state",
self._purge_room_state_txn,
room_id,
@ -1880,7 +1882,7 @@ class EventsStore(
# first we have to delete the state groups states
logger.info("[purge] removing %s from state_groups_state", room_id)
self.simple_delete_many_txn(
self.db.simple_delete_many_txn(
txn,
table="state_groups_state",
column="state_group",
@ -1891,7 +1893,7 @@ class EventsStore(
# ... and the state group edges
logger.info("[purge] removing %s from state_group_edges", room_id)
self.simple_delete_many_txn(
self.db.simple_delete_many_txn(
txn,
table="state_group_edges",
column="state_group",
@ -1902,7 +1904,7 @@ class EventsStore(
# ... and the state groups
logger.info("[purge] removing %s from state_groups", room_id)
self.simple_delete_many_txn(
self.db.simple_delete_many_txn(
txn,
table="state_groups",
column="id",
@ -1919,7 +1921,7 @@ class EventsStore(
@cachedInlineCallbacks(max_entries=5000)
def _get_event_ordering(self, event_id):
res = yield self.simple_select_one(
res = yield self.db.simple_select_one(
table="events",
retcols=["topological_ordering", "stream_ordering"],
keyvalues={"event_id": event_id},
@ -1942,7 +1944,7 @@ class EventsStore(
txn.execute(sql, (from_token, to_token, limit))
return txn.fetchall()
return self.runInteraction(
return self.db.runInteraction(
"get_all_updated_current_state_deltas",
get_all_updated_current_state_deltas_txn,
)
@ -1960,7 +1962,7 @@ class EventsStore(
room_id (str): The ID of the room the event was sent to.
topological_ordering (int): The position of the event in the room's topology.
"""
return self.simple_insert_many_txn(
return self.db.simple_insert_many_txn(
txn=txn,
table="event_labels",
values=[
@ -1982,7 +1984,7 @@ class EventsStore(
event_id (str): The event ID the expiry timestamp is associated with.
expiry_ts (int): The timestamp at which to expire (delete) the event.
"""
return self.simple_insert_txn(
return self.db.simple_insert_txn(
txn=txn,
table="event_expiry",
values={"event_id": event_id, "expiry_ts": expiry_ts},
@ -2031,7 +2033,7 @@ class EventsStore(
txn, "_get_event_cache", (event.event_id,)
)
yield self.runInteraction("delete_expired_event", delete_expired_event_txn)
yield self.db.runInteraction("delete_expired_event", delete_expired_event_txn)
def _delete_event_expiry_txn(self, txn, event_id):
"""Delete the expiry timestamp associated with an event ID without deleting the
@ -2041,7 +2043,7 @@ class EventsStore(
txn (LoggingTransaction): The transaction to use to perform the deletion.
event_id (str): The event ID to delete the associated expiry timestamp of.
"""
return self.simple_delete_txn(
return self.db.simple_delete_txn(
txn=txn, table="event_expiry", keyvalues={"event_id": event_id}
)
@ -2065,7 +2067,7 @@ class EventsStore(
return txn.fetchone()
return self.runInteraction(
return self.db.runInteraction(
desc="get_next_event_to_expire", func=get_next_event_to_expire_txn
)