Merge branch 'develop' of github.com:matrix-org/synapse into erikj/split_sig_fed

This commit is contained in:
Erik Johnston 2018-03-01 16:59:39 +00:00
commit 2ad4d5b5bb
7 changed files with 428 additions and 445 deletions

View File

@ -22,9 +22,8 @@ from synapse.storage.event_push_actions import EventPushActionsWorkerStore
from synapse.storage.events_worker import EventsWorkerStore from synapse.storage.events_worker import EventsWorkerStore
from synapse.storage.roommember import RoomMemberWorkerStore from synapse.storage.roommember import RoomMemberWorkerStore
from synapse.storage.state import StateGroupWorkerStore from synapse.storage.state import StateGroupWorkerStore
from synapse.storage.stream import StreamStore from synapse.storage.stream import StreamWorkerStore
from synapse.storage.signatures import SignatureWorkerStore from synapse.storage.signatures import SignatureWorkerStore
from synapse.util.caches.stream_change_cache import StreamChangeCache
from ._base import BaseSlavedStore from ._base import BaseSlavedStore
from ._slaved_id_tracker import SlavedIdTracker from ._slaved_id_tracker import SlavedIdTracker
@ -43,70 +42,30 @@ logger = logging.getLogger(__name__)
class SlavedEventStore(EventFederationWorkerStore, class SlavedEventStore(EventFederationWorkerStore,
RoomMemberWorkerStore, RoomMemberWorkerStore,
EventPushActionsWorkerStore, EventPushActionsWorkerStore,
StreamWorkerStore,
EventsWorkerStore, EventsWorkerStore,
StateGroupWorkerStore, StateGroupWorkerStore,
SignatureWorkerStore, SignatureWorkerStore,
BaseSlavedStore): BaseSlavedStore):
def __init__(self, db_conn, hs): def __init__(self, db_conn, hs):
super(SlavedEventStore, self).__init__(db_conn, hs)
self._stream_id_gen = SlavedIdTracker( self._stream_id_gen = SlavedIdTracker(
db_conn, "events", "stream_ordering", db_conn, "events", "stream_ordering",
) )
self._backfill_id_gen = SlavedIdTracker( self._backfill_id_gen = SlavedIdTracker(
db_conn, "events", "stream_ordering", step=-1 db_conn, "events", "stream_ordering", step=-1
) )
events_max = self._stream_id_gen.get_current_token()
event_cache_prefill, min_event_val = self._get_cache_dict(
db_conn, "events",
entity_column="room_id",
stream_column="stream_ordering",
max_value=events_max,
)
self._events_stream_cache = StreamChangeCache(
"EventsRoomStreamChangeCache", min_event_val,
prefilled_cache=event_cache_prefill,
)
self._membership_stream_cache = StreamChangeCache(
"MembershipStreamChangeCache", events_max,
)
self.stream_ordering_month_ago = 0 super(SlavedEventStore, self).__init__(db_conn, hs)
self._stream_order_on_start = self.get_room_max_stream_ordering()
# Cached functions can't be accessed through a class instance so we need # Cached functions can't be accessed through a class instance so we need
# to reach inside the __dict__ to extract them. # to reach inside the __dict__ to extract them.
get_recent_event_ids_for_room = ( def get_room_max_stream_ordering(self):
StreamStore.__dict__["get_recent_event_ids_for_room"] return self._stream_id_gen.get_current_token()
)
has_room_changed_since = DataStore.has_room_changed_since.__func__
get_membership_changes_for_user = ( def get_room_min_stream_ordering(self):
DataStore.get_membership_changes_for_user.__func__ return self._backfill_id_gen.get_current_token()
)
get_room_events_max_id = DataStore.get_room_events_max_id.__func__
get_room_events_stream_for_room = (
DataStore.get_room_events_stream_for_room.__func__
)
get_events_around = DataStore.get_events_around.__func__
get_recent_events_for_room = DataStore.get_recent_events_for_room.__func__
get_room_events_stream_for_rooms = (
DataStore.get_room_events_stream_for_rooms.__func__
)
get_stream_token_for_event = DataStore.get_stream_token_for_event.__func__
_set_before_and_after = staticmethod(DataStore._set_before_and_after)
_get_events_around_txn = DataStore._get_events_around_txn.__func__
get_room_max_stream_ordering = DataStore.get_room_max_stream_ordering.__func__
get_all_new_events_stream = DataStore.get_all_new_events_stream.__func__
get_federation_out_pos = DataStore.get_federation_out_pos.__func__
update_federation_out_pos = DataStore.update_federation_out_pos.__func__
def stream_positions(self): def stream_positions(self):
result = super(SlavedEventStore, self).stream_positions() result = super(SlavedEventStore, self).stream_positions()

View File

@ -14,32 +14,19 @@
# limitations under the License. # limitations under the License.
from ._base import BaseSlavedStore from ._base import BaseSlavedStore
from synapse.storage import DataStore from synapse.storage.room import RoomWorkerStore
from synapse.storage.room import RoomStore
from ._slaved_id_tracker import SlavedIdTracker from ._slaved_id_tracker import SlavedIdTracker
class RoomStore(BaseSlavedStore): class RoomStore(RoomWorkerStore, BaseSlavedStore):
def __init__(self, db_conn, hs): def __init__(self, db_conn, hs):
super(RoomStore, self).__init__(db_conn, hs) super(RoomStore, self).__init__(db_conn, hs)
self._public_room_id_gen = SlavedIdTracker( self._public_room_id_gen = SlavedIdTracker(
db_conn, "public_room_list_stream", "stream_id" db_conn, "public_room_list_stream", "stream_id"
) )
get_public_room_ids = DataStore.get_public_room_ids.__func__ def get_current_public_room_stream_id(self):
get_current_public_room_stream_id = ( return self._public_room_id_gen.get_current_token()
DataStore.get_current_public_room_stream_id.__func__
)
get_public_room_ids_at_stream_id = (
RoomStore.__dict__["get_public_room_ids_at_stream_id"]
)
get_public_room_ids_at_stream_id_txn = (
DataStore.get_public_room_ids_at_stream_id_txn.__func__
)
get_published_at_stream_id_txn = (
DataStore.get_published_at_stream_id_txn.__func__
)
get_public_room_changes = DataStore.get_public_room_changes.__func__
def stream_positions(self): def stream_positions(self):
result = super(RoomStore, self).stream_positions() result = super(RoomStore, self).stream_positions()

View File

@ -20,7 +20,6 @@ from synapse.storage.devices import DeviceStore
from .appservice import ( from .appservice import (
ApplicationServiceStore, ApplicationServiceTransactionStore ApplicationServiceStore, ApplicationServiceTransactionStore
) )
from ._base import LoggingTransaction
from .directory import DirectoryStore from .directory import DirectoryStore
from .events import EventsStore from .events import EventsStore
from .presence import PresenceStore, UserPresenceState from .presence import PresenceStore, UserPresenceState
@ -141,22 +140,6 @@ class DataStore(RoomMemberStore, RoomStore,
else: else:
self._cache_id_gen = None self._cache_id_gen = None
events_max = self._stream_id_gen.get_current_token()
event_cache_prefill, min_event_val = self._get_cache_dict(
db_conn, "events",
entity_column="room_id",
stream_column="stream_ordering",
max_value=events_max,
)
self._events_stream_cache = StreamChangeCache(
"EventsRoomStreamChangeCache", min_event_val,
prefilled_cache=event_cache_prefill,
)
self._membership_stream_cache = StreamChangeCache(
"MembershipStreamChangeCache", events_max,
)
self._presence_on_startup = self._get_active_presence(db_conn) self._presence_on_startup = self._get_active_presence(db_conn)
presence_cache_prefill, min_presence_val = self._get_cache_dict( presence_cache_prefill, min_presence_val = self._get_cache_dict(
@ -204,6 +187,7 @@ class DataStore(RoomMemberStore, RoomStore,
"DeviceListFederationStreamChangeCache", device_list_max, "DeviceListFederationStreamChangeCache", device_list_max,
) )
events_max = self._stream_id_gen.get_current_token()
curr_state_delta_prefill, min_curr_state_delta_id = self._get_cache_dict( curr_state_delta_prefill, min_curr_state_delta_id = self._get_cache_dict(
db_conn, "current_state_delta_stream", db_conn, "current_state_delta_stream",
entity_column="room_id", entity_column="room_id",
@ -228,20 +212,6 @@ class DataStore(RoomMemberStore, RoomStore,
prefilled_cache=_group_updates_prefill, prefilled_cache=_group_updates_prefill,
) )
cur = LoggingTransaction(
db_conn.cursor(),
name="_find_stream_orderings_for_times_txn",
database_engine=self.database_engine,
after_callbacks=[],
final_callbacks=[],
)
self._find_stream_orderings_for_times_txn(cur)
cur.close()
self.find_stream_orderings_looping_call = self._clock.looping_call(
self._find_stream_orderings_for_times, 10 * 60 * 1000
)
self._stream_order_on_start = self.get_room_max_stream_ordering() self._stream_order_on_start = self.get_room_max_stream_ordering()
self._min_stream_order_on_start = self.get_room_min_stream_ordering() self._min_stream_order_on_start = self.get_room_min_stream_ordering()

View File

@ -14,7 +14,7 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from ._base import SQLBaseStore from synapse.storage._base import SQLBaseStore, LoggingTransaction
from twisted.internet import defer from twisted.internet import defer
from synapse.util.async import sleep from synapse.util.async import sleep
from synapse.util.caches.descriptors import cachedInlineCallbacks from synapse.util.caches.descriptors import cachedInlineCallbacks
@ -64,6 +64,27 @@ def _deserialize_action(actions, is_highlight):
class EventPushActionsWorkerStore(SQLBaseStore): class EventPushActionsWorkerStore(SQLBaseStore):
def __init__(self, db_conn, hs):
super(EventPushActionsWorkerStore, self).__init__(db_conn, hs)
# These get correctly set by _find_stream_orderings_for_times_txn
self.stream_ordering_month_ago = None
self.stream_ordering_day_ago = None
cur = LoggingTransaction(
db_conn.cursor(),
name="_find_stream_orderings_for_times_txn",
database_engine=self.database_engine,
after_callbacks=[],
final_callbacks=[],
)
self._find_stream_orderings_for_times_txn(cur)
cur.close()
self.find_stream_orderings_looping_call = self._clock.looping_call(
self._find_stream_orderings_for_times, 10 * 60 * 1000
)
@cachedInlineCallbacks(num_args=3, tree=True, max_entries=5000) @cachedInlineCallbacks(num_args=3, tree=True, max_entries=5000)
def get_unread_event_push_actions_by_room_for_user( def get_unread_event_push_actions_by_room_for_user(
self, room_id, user_id, last_read_event_id self, room_id, user_id, last_read_event_id
@ -443,6 +464,69 @@ class EventPushActionsWorkerStore(SQLBaseStore):
desc="remove_push_actions_from_staging", desc="remove_push_actions_from_staging",
) )
@defer.inlineCallbacks
def _find_stream_orderings_for_times(self):
yield self.runInteraction(
"_find_stream_orderings_for_times",
self._find_stream_orderings_for_times_txn
)
def _find_stream_orderings_for_times_txn(self, txn):
logger.info("Searching for stream ordering 1 month ago")
self.stream_ordering_month_ago = self._find_first_stream_ordering_after_ts_txn(
txn, self._clock.time_msec() - 30 * 24 * 60 * 60 * 1000
)
logger.info(
"Found stream ordering 1 month ago: it's %d",
self.stream_ordering_month_ago
)
logger.info("Searching for stream ordering 1 day ago")
self.stream_ordering_day_ago = self._find_first_stream_ordering_after_ts_txn(
txn, self._clock.time_msec() - 24 * 60 * 60 * 1000
)
logger.info(
"Found stream ordering 1 day ago: it's %d",
self.stream_ordering_day_ago
)
def _find_first_stream_ordering_after_ts_txn(self, txn, ts):
"""
Find the stream_ordering of the first event that was received after
a given timestamp. This is relatively slow as there is no index on
received_ts but we can then use this to delete push actions before
this.
received_ts must necessarily be in the same order as stream_ordering
and stream_ordering is indexed, so we manually binary search using
stream_ordering
"""
txn.execute("SELECT MAX(stream_ordering) FROM events")
max_stream_ordering = txn.fetchone()[0]
if max_stream_ordering is None:
return 0
range_start = 0
range_end = max_stream_ordering
sql = (
"SELECT received_ts FROM events"
" WHERE stream_ordering > ?"
" ORDER BY stream_ordering"
" LIMIT 1"
)
while range_end - range_start > 1:
middle = int((range_end + range_start) / 2)
txn.execute(sql, (middle,))
middle_ts = txn.fetchone()[0]
if ts > middle_ts:
range_start = middle
else:
range_end = middle
return range_end
class EventPushActionsStore(EventPushActionsWorkerStore): class EventPushActionsStore(EventPushActionsWorkerStore):
EPA_HIGHLIGHT_INDEX = "epa_highlight_index" EPA_HIGHLIGHT_INDEX = "epa_highlight_index"
@ -650,69 +734,6 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
WHERE room_id = ? AND user_id = ? AND stream_ordering <= ? WHERE room_id = ? AND user_id = ? AND stream_ordering <= ?
""", (room_id, user_id, stream_ordering)) """, (room_id, user_id, stream_ordering))
@defer.inlineCallbacks
def _find_stream_orderings_for_times(self):
yield self.runInteraction(
"_find_stream_orderings_for_times",
self._find_stream_orderings_for_times_txn
)
def _find_stream_orderings_for_times_txn(self, txn):
logger.info("Searching for stream ordering 1 month ago")
self.stream_ordering_month_ago = self._find_first_stream_ordering_after_ts_txn(
txn, self._clock.time_msec() - 30 * 24 * 60 * 60 * 1000
)
logger.info(
"Found stream ordering 1 month ago: it's %d",
self.stream_ordering_month_ago
)
logger.info("Searching for stream ordering 1 day ago")
self.stream_ordering_day_ago = self._find_first_stream_ordering_after_ts_txn(
txn, self._clock.time_msec() - 24 * 60 * 60 * 1000
)
logger.info(
"Found stream ordering 1 day ago: it's %d",
self.stream_ordering_day_ago
)
def _find_first_stream_ordering_after_ts_txn(self, txn, ts):
"""
Find the stream_ordering of the first event that was received after
a given timestamp. This is relatively slow as there is no index on
received_ts but we can then use this to delete push actions before
this.
received_ts must necessarily be in the same order as stream_ordering
and stream_ordering is indexed, so we manually binary search using
stream_ordering
"""
txn.execute("SELECT MAX(stream_ordering) FROM events")
max_stream_ordering = txn.fetchone()[0]
if max_stream_ordering is None:
return 0
range_start = 0
range_end = max_stream_ordering
sql = (
"SELECT received_ts FROM events"
" WHERE stream_ordering > ?"
" ORDER BY stream_ordering"
" LIMIT 1"
)
while range_end - range_start > 1:
middle = int((range_end + range_start) / 2)
txn.execute(sql, (middle,))
middle_ts = txn.fetchone()[0]
if ts > middle_ts:
range_start = middle
else:
range_end = middle
return range_end
@defer.inlineCallbacks @defer.inlineCallbacks
def _rotate_notifs(self): def _rotate_notifs(self):
if self._doing_notif_rotation or self.stream_ordering_day_ago is None: if self._doing_notif_rotation or self.stream_ordering_day_ago is None:

View File

@ -16,6 +16,7 @@
from twisted.internet import defer from twisted.internet import defer
from synapse.api.errors import StoreError from synapse.api.errors import StoreError
from synapse.storage._base import SQLBaseStore
from synapse.storage.search import SearchStore from synapse.storage.search import SearchStore
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
@ -38,7 +39,126 @@ RatelimitOverride = collections.namedtuple(
) )
class RoomStore(SearchStore): class RoomWorkerStore(SQLBaseStore):
def get_public_room_ids(self):
return self._simple_select_onecol(
table="rooms",
keyvalues={
"is_public": True,
},
retcol="room_id",
desc="get_public_room_ids",
)
@cached(num_args=2, max_entries=100)
def get_public_room_ids_at_stream_id(self, stream_id, network_tuple):
"""Get pulbic rooms for a particular list, or across all lists.
Args:
stream_id (int)
network_tuple (ThirdPartyInstanceID): The list to use (None, None)
means the main list, None means all lsits.
"""
return self.runInteraction(
"get_public_room_ids_at_stream_id",
self.get_public_room_ids_at_stream_id_txn,
stream_id, network_tuple=network_tuple
)
def get_public_room_ids_at_stream_id_txn(self, txn, stream_id,
network_tuple):
return {
rm
for rm, vis in self.get_published_at_stream_id_txn(
txn, stream_id, network_tuple=network_tuple
).items()
if vis
}
def get_published_at_stream_id_txn(self, txn, stream_id, network_tuple):
if network_tuple:
# We want to get from a particular list. No aggregation required.
sql = ("""
SELECT room_id, visibility FROM public_room_list_stream
INNER JOIN (
SELECT room_id, max(stream_id) AS stream_id
FROM public_room_list_stream
WHERE stream_id <= ? %s
GROUP BY room_id
) grouped USING (room_id, stream_id)
""")
if network_tuple.appservice_id is not None:
txn.execute(
sql % ("AND appservice_id = ? AND network_id = ?",),
(stream_id, network_tuple.appservice_id, network_tuple.network_id,)
)
else:
txn.execute(
sql % ("AND appservice_id IS NULL",),
(stream_id,)
)
return dict(txn)
else:
# We want to get from all lists, so we need to aggregate the results
logger.info("Executing full list")
sql = ("""
SELECT room_id, visibility
FROM public_room_list_stream
INNER JOIN (
SELECT
room_id, max(stream_id) AS stream_id, appservice_id,
network_id
FROM public_room_list_stream
WHERE stream_id <= ?
GROUP BY room_id, appservice_id, network_id
) grouped USING (room_id, stream_id)
""")
txn.execute(
sql,
(stream_id,)
)
results = {}
# A room is visible if its visible on any list.
for room_id, visibility in txn:
results[room_id] = bool(visibility) or results.get(room_id, False)
return results
def get_public_room_changes(self, prev_stream_id, new_stream_id,
network_tuple):
def get_public_room_changes_txn(txn):
then_rooms = self.get_public_room_ids_at_stream_id_txn(
txn, prev_stream_id, network_tuple
)
now_rooms_dict = self.get_published_at_stream_id_txn(
txn, new_stream_id, network_tuple
)
now_rooms_visible = set(
rm for rm, vis in now_rooms_dict.items() if vis
)
now_rooms_not_visible = set(
rm for rm, vis in now_rooms_dict.items() if not vis
)
newly_visible = now_rooms_visible - then_rooms
newly_unpublished = now_rooms_not_visible & then_rooms
return newly_visible, newly_unpublished
return self.runInteraction(
"get_public_room_changes", get_public_room_changes_txn
)
class RoomStore(RoomWorkerStore, SearchStore):
@defer.inlineCallbacks @defer.inlineCallbacks
def store_room(self, room_id, room_creator_user_id, is_public): def store_room(self, room_id, room_creator_user_id, is_public):
@ -225,16 +345,6 @@ class RoomStore(SearchStore):
) )
self.hs.get_notifier().on_new_replication_data() self.hs.get_notifier().on_new_replication_data()
def get_public_room_ids(self):
return self._simple_select_onecol(
table="rooms",
keyvalues={
"is_public": True,
},
retcol="room_id",
desc="get_public_room_ids",
)
def get_room_count(self): def get_room_count(self):
"""Retrieve a list of all rooms """Retrieve a list of all rooms
""" """
@ -326,113 +436,6 @@ class RoomStore(SearchStore):
def get_current_public_room_stream_id(self): def get_current_public_room_stream_id(self):
return self._public_room_id_gen.get_current_token() return self._public_room_id_gen.get_current_token()
@cached(num_args=2, max_entries=100)
def get_public_room_ids_at_stream_id(self, stream_id, network_tuple):
"""Get pulbic rooms for a particular list, or across all lists.
Args:
stream_id (int)
network_tuple (ThirdPartyInstanceID): The list to use (None, None)
means the main list, None means all lsits.
"""
return self.runInteraction(
"get_public_room_ids_at_stream_id",
self.get_public_room_ids_at_stream_id_txn,
stream_id, network_tuple=network_tuple
)
def get_public_room_ids_at_stream_id_txn(self, txn, stream_id,
network_tuple):
return {
rm
for rm, vis in self.get_published_at_stream_id_txn(
txn, stream_id, network_tuple=network_tuple
).items()
if vis
}
def get_published_at_stream_id_txn(self, txn, stream_id, network_tuple):
if network_tuple:
# We want to get from a particular list. No aggregation required.
sql = ("""
SELECT room_id, visibility FROM public_room_list_stream
INNER JOIN (
SELECT room_id, max(stream_id) AS stream_id
FROM public_room_list_stream
WHERE stream_id <= ? %s
GROUP BY room_id
) grouped USING (room_id, stream_id)
""")
if network_tuple.appservice_id is not None:
txn.execute(
sql % ("AND appservice_id = ? AND network_id = ?",),
(stream_id, network_tuple.appservice_id, network_tuple.network_id,)
)
else:
txn.execute(
sql % ("AND appservice_id IS NULL",),
(stream_id,)
)
return dict(txn)
else:
# We want to get from all lists, so we need to aggregate the results
logger.info("Executing full list")
sql = ("""
SELECT room_id, visibility
FROM public_room_list_stream
INNER JOIN (
SELECT
room_id, max(stream_id) AS stream_id, appservice_id,
network_id
FROM public_room_list_stream
WHERE stream_id <= ?
GROUP BY room_id, appservice_id, network_id
) grouped USING (room_id, stream_id)
""")
txn.execute(
sql,
(stream_id,)
)
results = {}
# A room is visible if its visible on any list.
for room_id, visibility in txn:
results[room_id] = bool(visibility) or results.get(room_id, False)
return results
def get_public_room_changes(self, prev_stream_id, new_stream_id,
network_tuple):
def get_public_room_changes_txn(txn):
then_rooms = self.get_public_room_ids_at_stream_id_txn(
txn, prev_stream_id, network_tuple
)
now_rooms_dict = self.get_published_at_stream_id_txn(
txn, new_stream_id, network_tuple
)
now_rooms_visible = set(
rm for rm, vis in now_rooms_dict.items() if vis
)
now_rooms_not_visible = set(
rm for rm, vis in now_rooms_dict.items() if not vis
)
newly_visible = now_rooms_visible - then_rooms
newly_unpublished = now_rooms_not_visible & then_rooms
return newly_visible, newly_unpublished
return self.runInteraction(
"get_public_room_changes", get_public_room_changes_txn
)
def get_all_new_public_rooms(self, prev_id, current_id, limit): def get_all_new_public_rooms(self, prev_id, current_id, limit):
def get_all_new_public_rooms(txn): def get_all_new_public_rooms(txn):
sql = (""" sql = ("""

View File

@ -35,13 +35,17 @@ what sort order was used:
from twisted.internet import defer from twisted.internet import defer
from ._base import SQLBaseStore from synapse.storage._base import SQLBaseStore
from synapse.storage.events import EventsWorkerStore
from synapse.util.caches.descriptors import cached from synapse.util.caches.descriptors import cached
from synapse.api.constants import EventTypes from synapse.api.constants import EventTypes
from synapse.types import RoomStreamToken from synapse.types import RoomStreamToken
from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.logcontext import make_deferred_yieldable, preserve_fn from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
from synapse.storage.engines import PostgresEngine, Sqlite3Engine from synapse.storage.engines import PostgresEngine, Sqlite3Engine
import abc
import logging import logging
@ -143,81 +147,41 @@ def filter_to_clause(event_filter):
return " AND ".join(clauses), args return " AND ".join(clauses), args
class StreamStore(SQLBaseStore): class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
@defer.inlineCallbacks """This is an abstract base class where subclasses must implement
def get_appservice_room_stream(self, service, from_key, to_key, limit=0): `get_room_max_stream_ordering` and `get_room_min_stream_ordering`
# NB this lives here instead of appservice.py so we can reuse the which can be called in the initializer.
# 'private' StreamToken class in this file. """
if limit:
limit = max(limit, MAX_STREAM_SIZE)
else:
limit = MAX_STREAM_SIZE
# From and to keys should be integers from ordering. __metaclass__ = abc.ABCMeta
from_id = RoomStreamToken.parse_stream_token(from_key)
to_id = RoomStreamToken.parse_stream_token(to_key)
if from_key == to_key: def __init__(self, db_conn, hs):
defer.returnValue(([], to_key)) super(StreamWorkerStore, self).__init__(db_conn, hs)
return
# select all the events between from/to with a sensible limit events_max = self.get_room_max_stream_ordering()
sql = ( event_cache_prefill, min_event_val = self._get_cache_dict(
"SELECT e.event_id, e.room_id, e.type, s.state_key, " db_conn, "events",
"e.stream_ordering FROM events AS e " entity_column="room_id",
"LEFT JOIN state_events as s ON " stream_column="stream_ordering",
"e.event_id = s.event_id " max_value=events_max,
"WHERE e.stream_ordering > ? AND e.stream_ordering <= ? " )
"ORDER BY stream_ordering ASC LIMIT %(limit)d " self._events_stream_cache = StreamChangeCache(
) % { "EventsRoomStreamChangeCache", min_event_val,
"limit": limit prefilled_cache=event_cache_prefill,
} )
self._membership_stream_cache = StreamChangeCache(
def f(txn): "MembershipStreamChangeCache", events_max,
# pull out all the events between the tokens
txn.execute(sql, (from_id.stream, to_id.stream,))
rows = self.cursor_to_dict(txn)
# Logic:
# - We want ALL events which match the AS room_id regex
# - We want ALL events which match the rooms represented by the AS
# room_alias regex
# - We want ALL events for rooms that AS users have joined.
# This is currently supported via get_app_service_rooms (which is
# used for the Notifier listener rooms). We can't reasonably make a
# SQL query for these room IDs, so we'll pull all the events between
# from/to and filter in python.
rooms_for_as = self._get_app_service_rooms_txn(txn, service)
room_ids_for_as = [r.room_id for r in rooms_for_as]
def app_service_interested(row):
if row["room_id"] in room_ids_for_as:
return True
if row["type"] == EventTypes.Member:
if service.is_interested_in_user(row.get("state_key")):
return True
return False
return [r for r in rows if app_service_interested(r)]
rows = yield self.runInteraction("get_appservice_room_stream", f)
ret = yield self._get_events(
[r["event_id"] for r in rows],
get_prev_content=True
) )
self._set_before_and_after(ret, rows, topo_order=from_id is None) self._stream_order_on_start = self.get_room_max_stream_ordering()
if rows: @abc.abstractmethod
key = "s%d" % max(r["stream_ordering"] for r in rows) def get_room_max_stream_ordering(self):
else: raise NotImplementedError()
# Assume we didn't get anything because there was nothing to
# get.
key = to_key
defer.returnValue((ret, key)) @abc.abstractmethod
def get_room_min_stream_ordering(self):
raise NotImplementedError()
@defer.inlineCallbacks @defer.inlineCallbacks
def get_room_events_stream_for_rooms(self, room_ids, from_key, to_key, limit=0, def get_room_events_stream_for_rooms(self, room_ids, from_key, to_key, limit=0,
@ -380,88 +344,6 @@ class StreamStore(SQLBaseStore):
defer.returnValue(ret) defer.returnValue(ret)
@defer.inlineCallbacks
def paginate_room_events(self, room_id, from_key, to_key=None,
direction='b', limit=-1, event_filter=None):
# Tokens really represent positions between elements, but we use
# the convention of pointing to the event before the gap. Hence
# we have a bit of asymmetry when it comes to equalities.
args = [False, room_id]
if direction == 'b':
order = "DESC"
bounds = upper_bound(
RoomStreamToken.parse(from_key), self.database_engine
)
if to_key:
bounds = "%s AND %s" % (bounds, lower_bound(
RoomStreamToken.parse(to_key), self.database_engine
))
else:
order = "ASC"
bounds = lower_bound(
RoomStreamToken.parse(from_key), self.database_engine
)
if to_key:
bounds = "%s AND %s" % (bounds, upper_bound(
RoomStreamToken.parse(to_key), self.database_engine
))
filter_clause, filter_args = filter_to_clause(event_filter)
if filter_clause:
bounds += " AND " + filter_clause
args.extend(filter_args)
if int(limit) > 0:
args.append(int(limit))
limit_str = " LIMIT ?"
else:
limit_str = ""
sql = (
"SELECT * FROM events"
" WHERE outlier = ? AND room_id = ? AND %(bounds)s"
" ORDER BY topological_ordering %(order)s,"
" stream_ordering %(order)s %(limit)s"
) % {
"bounds": bounds,
"order": order,
"limit": limit_str
}
def f(txn):
txn.execute(sql, args)
rows = self.cursor_to_dict(txn)
if rows:
topo = rows[-1]["topological_ordering"]
toke = rows[-1]["stream_ordering"]
if direction == 'b':
# Tokens are positions between events.
# This token points *after* the last event in the chunk.
# We need it to point to the event before it in the chunk
# when we are going backwards so we subtract one from the
# stream part.
toke -= 1
next_token = str(RoomStreamToken(topo, toke))
else:
# TODO (erikj): We should work out what to do here instead.
next_token = to_key if to_key else from_key
return rows, next_token,
rows, token = yield self.runInteraction("paginate_room_events", f)
events = yield self._get_events(
[r["event_id"] for r in rows],
get_prev_content=True
)
self._set_before_and_after(events, rows)
defer.returnValue((events, token))
@defer.inlineCallbacks @defer.inlineCallbacks
def get_recent_events_for_room(self, room_id, limit, end_token, from_token=None): def get_recent_events_for_room(self, room_id, limit, end_token, from_token=None):
rows, token = yield self.get_recent_event_ids_for_room( rows, token = yield self.get_recent_event_ids_for_room(
@ -542,7 +424,7 @@ class StreamStore(SQLBaseStore):
`room_id` causes it to return the current room specific topological `room_id` causes it to return the current room specific topological
token. token.
""" """
token = yield self._stream_id_gen.get_current_token() token = yield self.get_room_max_stream_ordering()
if room_id is None: if room_id is None:
defer.returnValue("s%d" % (token,)) defer.returnValue("s%d" % (token,))
else: else:
@ -552,12 +434,6 @@ class StreamStore(SQLBaseStore):
) )
defer.returnValue("t%d-%d" % (topo, token)) defer.returnValue("t%d-%d" % (topo, token))
def get_room_max_stream_ordering(self):
return self._stream_id_gen.get_current_token()
def get_room_min_stream_ordering(self):
return self._backfill_id_gen.get_current_token()
def get_stream_token_for_event(self, event_id): def get_stream_token_for_event(self, event_id):
"""The stream token for an event """The stream token for an event
Args: Args:
@ -832,3 +708,168 @@ class StreamStore(SQLBaseStore):
def has_room_changed_since(self, room_id, stream_id): def has_room_changed_since(self, room_id, stream_id):
return self._events_stream_cache.has_entity_changed(room_id, stream_id) return self._events_stream_cache.has_entity_changed(room_id, stream_id)
class StreamStore(StreamWorkerStore):
def get_room_max_stream_ordering(self):
return self._stream_id_gen.get_current_token()
def get_room_min_stream_ordering(self):
return self._backfill_id_gen.get_current_token()
@defer.inlineCallbacks
def get_appservice_room_stream(self, service, from_key, to_key, limit=0):
# NB this lives here instead of appservice.py so we can reuse the
# 'private' StreamToken class in this file.
if limit:
limit = max(limit, MAX_STREAM_SIZE)
else:
limit = MAX_STREAM_SIZE
# From and to keys should be integers from ordering.
from_id = RoomStreamToken.parse_stream_token(from_key)
to_id = RoomStreamToken.parse_stream_token(to_key)
if from_key == to_key:
defer.returnValue(([], to_key))
return
# select all the events between from/to with a sensible limit
sql = (
"SELECT e.event_id, e.room_id, e.type, s.state_key, "
"e.stream_ordering FROM events AS e "
"LEFT JOIN state_events as s ON "
"e.event_id = s.event_id "
"WHERE e.stream_ordering > ? AND e.stream_ordering <= ? "
"ORDER BY stream_ordering ASC LIMIT %(limit)d "
) % {
"limit": limit
}
def f(txn):
# pull out all the events between the tokens
txn.execute(sql, (from_id.stream, to_id.stream,))
rows = self.cursor_to_dict(txn)
# Logic:
# - We want ALL events which match the AS room_id regex
# - We want ALL events which match the rooms represented by the AS
# room_alias regex
# - We want ALL events for rooms that AS users have joined.
# This is currently supported via get_app_service_rooms (which is
# used for the Notifier listener rooms). We can't reasonably make a
# SQL query for these room IDs, so we'll pull all the events between
# from/to and filter in python.
rooms_for_as = self._get_app_service_rooms_txn(txn, service)
room_ids_for_as = [r.room_id for r in rooms_for_as]
def app_service_interested(row):
if row["room_id"] in room_ids_for_as:
return True
if row["type"] == EventTypes.Member:
if service.is_interested_in_user(row.get("state_key")):
return True
return False
return [r for r in rows if app_service_interested(r)]
rows = yield self.runInteraction("get_appservice_room_stream", f)
ret = yield self._get_events(
[r["event_id"] for r in rows],
get_prev_content=True
)
self._set_before_and_after(ret, rows, topo_order=from_id is None)
if rows:
key = "s%d" % max(r["stream_ordering"] for r in rows)
else:
# Assume we didn't get anything because there was nothing to
# get.
key = to_key
defer.returnValue((ret, key))
@defer.inlineCallbacks
def paginate_room_events(self, room_id, from_key, to_key=None,
direction='b', limit=-1, event_filter=None):
# Tokens really represent positions between elements, but we use
# the convention of pointing to the event before the gap. Hence
# we have a bit of asymmetry when it comes to equalities.
args = [False, room_id]
if direction == 'b':
order = "DESC"
bounds = upper_bound(
RoomStreamToken.parse(from_key), self.database_engine
)
if to_key:
bounds = "%s AND %s" % (bounds, lower_bound(
RoomStreamToken.parse(to_key), self.database_engine
))
else:
order = "ASC"
bounds = lower_bound(
RoomStreamToken.parse(from_key), self.database_engine
)
if to_key:
bounds = "%s AND %s" % (bounds, upper_bound(
RoomStreamToken.parse(to_key), self.database_engine
))
filter_clause, filter_args = filter_to_clause(event_filter)
if filter_clause:
bounds += " AND " + filter_clause
args.extend(filter_args)
if int(limit) > 0:
args.append(int(limit))
limit_str = " LIMIT ?"
else:
limit_str = ""
sql = (
"SELECT * FROM events"
" WHERE outlier = ? AND room_id = ? AND %(bounds)s"
" ORDER BY topological_ordering %(order)s,"
" stream_ordering %(order)s %(limit)s"
) % {
"bounds": bounds,
"order": order,
"limit": limit_str
}
def f(txn):
txn.execute(sql, args)
rows = self.cursor_to_dict(txn)
if rows:
topo = rows[-1]["topological_ordering"]
toke = rows[-1]["stream_ordering"]
if direction == 'b':
# Tokens are positions between events.
# This token points *after* the last event in the chunk.
# We need it to point to the event before it in the chunk
# when we are going backwards so we subtract one from the
# stream part.
toke -= 1
next_token = str(RoomStreamToken(topo, toke))
else:
# TODO (erikj): We should work out what to do here instead.
next_token = to_key if to_key else from_key
return rows, next_token,
rows, token = yield self.runInteraction("paginate_room_events", f)
events = yield self._get_events(
[r["event_id"] for r in rows],
get_prev_content=True
)
self._set_before_and_after(events, rows)
defer.returnValue((events, token))

View File

@ -299,10 +299,6 @@ def preserve_fn(f):
Useful for wrapping functions that return a deferred which you don't yield Useful for wrapping functions that return a deferred which you don't yield
on. on.
""" """
def reset_context(result):
LoggingContext.set_current_context(LoggingContext.sentinel)
return result
def g(*args, **kwargs): def g(*args, **kwargs):
current = LoggingContext.current_context() current = LoggingContext.current_context()
res = f(*args, **kwargs) res = f(*args, **kwargs)
@ -323,12 +319,11 @@ def preserve_fn(f):
# which is supposed to have a single entry and exit point. But # which is supposed to have a single entry and exit point. But
# by spawning off another deferred, we are effectively # by spawning off another deferred, we are effectively
# adding a new exit point.) # adding a new exit point.)
res.addBoth(reset_context) res.addBoth(_set_context_cb, LoggingContext.sentinel)
return res return res
return g return g
@defer.inlineCallbacks
def make_deferred_yieldable(deferred): def make_deferred_yieldable(deferred):
"""Given a deferred, make it follow the Synapse logcontext rules: """Given a deferred, make it follow the Synapse logcontext rules:
@ -342,9 +337,16 @@ def make_deferred_yieldable(deferred):
(This is more-or-less the opposite operation to preserve_fn.) (This is more-or-less the opposite operation to preserve_fn.)
""" """
with PreserveLoggingContext(): if isinstance(deferred, defer.Deferred) and not deferred.called:
r = yield deferred prev_context = LoggingContext.set_current_context(LoggingContext.sentinel)
defer.returnValue(r) deferred.addBoth(_set_context_cb, prev_context)
return deferred
def _set_context_cb(result, context):
"""A callback function which just sets the logging context"""
LoggingContext.set_current_context(context)
return result
# modules to ignore in `logcontext_tracer` # modules to ignore in `logcontext_tracer`