mirror of
https://git.anonymousland.org/anonymousland/synapse.git
synced 2025-01-12 10:39:27 -05:00
Move necessary storage functions to worker classes
This commit is contained in:
parent
1e2bed9656
commit
62ace05c45
@ -1430,88 +1430,6 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
|
||||
(event.event_id, event.redacts)
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def have_events_in_timeline(self, event_ids):
|
||||
"""Given a list of event ids, check if we have already processed and
|
||||
stored them as non outliers.
|
||||
"""
|
||||
rows = yield self._simple_select_many_batch(
|
||||
table="events",
|
||||
retcols=("event_id",),
|
||||
column="event_id",
|
||||
iterable=list(event_ids),
|
||||
keyvalues={"outlier": False},
|
||||
desc="have_events_in_timeline",
|
||||
)
|
||||
|
||||
defer.returnValue(set(r["event_id"] for r in rows))
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def have_seen_events(self, event_ids):
|
||||
"""Given a list of event ids, check if we have already processed them.
|
||||
|
||||
Args:
|
||||
event_ids (iterable[str]):
|
||||
|
||||
Returns:
|
||||
Deferred[set[str]]: The events we have already seen.
|
||||
"""
|
||||
results = set()
|
||||
|
||||
def have_seen_events_txn(txn, chunk):
|
||||
sql = (
|
||||
"SELECT event_id FROM events as e WHERE e.event_id IN (%s)"
|
||||
% (",".join("?" * len(chunk)), )
|
||||
)
|
||||
txn.execute(sql, chunk)
|
||||
for (event_id, ) in txn:
|
||||
results.add(event_id)
|
||||
|
||||
# break the input up into chunks of 100
|
||||
input_iterator = iter(event_ids)
|
||||
for chunk in iter(lambda: list(itertools.islice(input_iterator, 100)),
|
||||
[]):
|
||||
yield self.runInteraction(
|
||||
"have_seen_events",
|
||||
have_seen_events_txn,
|
||||
chunk,
|
||||
)
|
||||
defer.returnValue(results)
|
||||
|
||||
def get_seen_events_with_rejections(self, event_ids):
|
||||
"""Given a list of event ids, check if we rejected them.
|
||||
|
||||
Args:
|
||||
event_ids (list[str])
|
||||
|
||||
Returns:
|
||||
Deferred[dict[str, str|None):
|
||||
Has an entry for each event id we already have seen. Maps to
|
||||
the rejected reason string if we rejected the event, else maps
|
||||
to None.
|
||||
"""
|
||||
if not event_ids:
|
||||
return defer.succeed({})
|
||||
|
||||
def f(txn):
|
||||
sql = (
|
||||
"SELECT e.event_id, reason FROM events as e "
|
||||
"LEFT JOIN rejections as r ON e.event_id = r.event_id "
|
||||
"WHERE e.event_id = ?"
|
||||
)
|
||||
|
||||
res = {}
|
||||
for event_id in event_ids:
|
||||
txn.execute(sql, (event_id,))
|
||||
row = txn.fetchone()
|
||||
if row:
|
||||
_, rejected = row
|
||||
res[event_id] = rejected
|
||||
|
||||
return res
|
||||
|
||||
return self.runInteraction("get_rejection_reasons", f)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def count_daily_messages(self):
|
||||
"""
|
||||
|
@ -12,7 +12,9 @@
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import itertools
|
||||
import logging
|
||||
|
||||
from collections import namedtuple
|
||||
|
||||
from canonicaljson import json
|
||||
@ -442,3 +444,85 @@ class EventsWorkerStore(SQLBaseStore):
|
||||
self._get_event_cache.prefill((original_ev.event_id,), cache_entry)
|
||||
|
||||
defer.returnValue(cache_entry)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def have_events_in_timeline(self, event_ids):
|
||||
"""Given a list of event ids, check if we have already processed and
|
||||
stored them as non outliers.
|
||||
"""
|
||||
rows = yield self._simple_select_many_batch(
|
||||
table="events",
|
||||
retcols=("event_id",),
|
||||
column="event_id",
|
||||
iterable=list(event_ids),
|
||||
keyvalues={"outlier": False},
|
||||
desc="have_events_in_timeline",
|
||||
)
|
||||
|
||||
defer.returnValue(set(r["event_id"] for r in rows))
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def have_seen_events(self, event_ids):
|
||||
"""Given a list of event ids, check if we have already processed them.
|
||||
|
||||
Args:
|
||||
event_ids (iterable[str]):
|
||||
|
||||
Returns:
|
||||
Deferred[set[str]]: The events we have already seen.
|
||||
"""
|
||||
results = set()
|
||||
|
||||
def have_seen_events_txn(txn, chunk):
|
||||
sql = (
|
||||
"SELECT event_id FROM events as e WHERE e.event_id IN (%s)"
|
||||
% (",".join("?" * len(chunk)), )
|
||||
)
|
||||
txn.execute(sql, chunk)
|
||||
for (event_id, ) in txn:
|
||||
results.add(event_id)
|
||||
|
||||
# break the input up into chunks of 100
|
||||
input_iterator = iter(event_ids)
|
||||
for chunk in iter(lambda: list(itertools.islice(input_iterator, 100)),
|
||||
[]):
|
||||
yield self.runInteraction(
|
||||
"have_seen_events",
|
||||
have_seen_events_txn,
|
||||
chunk,
|
||||
)
|
||||
defer.returnValue(results)
|
||||
|
||||
def get_seen_events_with_rejections(self, event_ids):
|
||||
"""Given a list of event ids, check if we rejected them.
|
||||
|
||||
Args:
|
||||
event_ids (list[str])
|
||||
|
||||
Returns:
|
||||
Deferred[dict[str, str|None):
|
||||
Has an entry for each event id we already have seen. Maps to
|
||||
the rejected reason string if we rejected the event, else maps
|
||||
to None.
|
||||
"""
|
||||
if not event_ids:
|
||||
return defer.succeed({})
|
||||
|
||||
def f(txn):
|
||||
sql = (
|
||||
"SELECT e.event_id, reason FROM events as e "
|
||||
"LEFT JOIN rejections as r ON e.event_id = r.event_id "
|
||||
"WHERE e.event_id = ?"
|
||||
)
|
||||
|
||||
res = {}
|
||||
for event_id in event_ids:
|
||||
txn.execute(sql, (event_id,))
|
||||
row = txn.fetchone()
|
||||
if row:
|
||||
_, rejected = row
|
||||
res[event_id] = rejected
|
||||
|
||||
return res
|
||||
|
||||
return self.runInteraction("get_rejection_reasons", f)
|
||||
|
@ -41,6 +41,22 @@ RatelimitOverride = collections.namedtuple(
|
||||
|
||||
|
||||
class RoomWorkerStore(SQLBaseStore):
|
||||
def get_room(self, room_id):
|
||||
"""Retrieve a room.
|
||||
|
||||
Args:
|
||||
room_id (str): The ID of the room to retrieve.
|
||||
Returns:
|
||||
A namedtuple containing the room information, or an empty list.
|
||||
"""
|
||||
return self._simple_select_one(
|
||||
table="rooms",
|
||||
keyvalues={"room_id": room_id},
|
||||
retcols=("room_id", "is_public", "creator"),
|
||||
desc="get_room",
|
||||
allow_none=True,
|
||||
)
|
||||
|
||||
def get_public_room_ids(self):
|
||||
return self._simple_select_onecol(
|
||||
table="rooms",
|
||||
@ -215,22 +231,6 @@ class RoomStore(RoomWorkerStore, SearchStore):
|
||||
logger.error("store_room with room_id=%s failed: %s", room_id, e)
|
||||
raise StoreError(500, "Problem creating room.")
|
||||
|
||||
def get_room(self, room_id):
|
||||
"""Retrieve a room.
|
||||
|
||||
Args:
|
||||
room_id (str): The ID of the room to retrieve.
|
||||
Returns:
|
||||
A namedtuple containing the room information, or an empty list.
|
||||
"""
|
||||
return self._simple_select_one(
|
||||
table="rooms",
|
||||
keyvalues={"room_id": room_id},
|
||||
retcols=("room_id", "is_public", "creator"),
|
||||
desc="get_room",
|
||||
allow_none=True,
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def set_room_is_public(self, room_id, is_public):
|
||||
def set_room_is_public_txn(txn, next_id):
|
||||
|
Loading…
Reference in New Issue
Block a user