Split out stream store

This commit is contained in:
Erik Johnston 2018-03-01 13:56:03 +00:00
parent 17445e6701
commit f793bc3877
3 changed files with 202 additions and 210 deletions

View File

@ -22,9 +22,8 @@ from synapse.storage.event_push_actions import EventPushActionsWorkerStore
from synapse.storage.events_worker import EventsWorkerStore from synapse.storage.events_worker import EventsWorkerStore
from synapse.storage.roommember import RoomMemberWorkerStore from synapse.storage.roommember import RoomMemberWorkerStore
from synapse.storage.state import StateGroupWorkerStore from synapse.storage.state import StateGroupWorkerStore
from synapse.storage.stream import StreamStore from synapse.storage.stream import StreamWorkerStore
from synapse.storage.signatures import SignatureStore from synapse.storage.signatures import SignatureStore
from synapse.util.caches.stream_change_cache import StreamChangeCache
from ._base import BaseSlavedStore from ._base import BaseSlavedStore
from ._slaved_id_tracker import SlavedIdTracker from ._slaved_id_tracker import SlavedIdTracker
@ -41,34 +40,20 @@ logger = logging.getLogger(__name__)
class SlavedEventStore(RoomMemberWorkerStore, EventPushActionsWorkerStore, class SlavedEventStore(RoomMemberWorkerStore, EventPushActionsWorkerStore,
EventsWorkerStore, StateGroupWorkerStore, StreamWorkerStore, EventsWorkerStore, StateGroupWorkerStore,
BaseSlavedStore): BaseSlavedStore):
def __init__(self, db_conn, hs): def __init__(self, db_conn, hs):
super(SlavedEventStore, self).__init__(db_conn, hs)
self._stream_id_gen = SlavedIdTracker( self._stream_id_gen = SlavedIdTracker(
db_conn, "events", "stream_ordering", db_conn, "events", "stream_ordering",
) )
self._backfill_id_gen = SlavedIdTracker( self._backfill_id_gen = SlavedIdTracker(
db_conn, "events", "stream_ordering", step=-1 db_conn, "events", "stream_ordering", step=-1
) )
events_max = self._stream_id_gen.get_current_token()
event_cache_prefill, min_event_val = self._get_cache_dict( super(SlavedEventStore, self).__init__(db_conn, hs)
db_conn, "events",
entity_column="room_id",
stream_column="stream_ordering",
max_value=events_max,
)
self._events_stream_cache = StreamChangeCache(
"EventsRoomStreamChangeCache", min_event_val,
prefilled_cache=event_cache_prefill,
)
self._membership_stream_cache = StreamChangeCache(
"MembershipStreamChangeCache", events_max,
)
self.stream_ordering_month_ago = 0 self.stream_ordering_month_ago = 0
self._stream_order_on_start = self.get_room_max_stream_ordering()
# Cached functions can't be accessed through a class instance so we need # Cached functions can't be accessed through a class instance so we need
# to reach inside the __dict__ to extract them. # to reach inside the __dict__ to extract them.
@ -76,30 +61,6 @@ class SlavedEventStore(RoomMemberWorkerStore, EventPushActionsWorkerStore,
"get_latest_event_ids_in_room" "get_latest_event_ids_in_room"
] ]
get_recent_event_ids_for_room = (
StreamStore.__dict__["get_recent_event_ids_for_room"]
)
has_room_changed_since = DataStore.has_room_changed_since.__func__
get_membership_changes_for_user = (
DataStore.get_membership_changes_for_user.__func__
)
get_room_events_max_id = DataStore.get_room_events_max_id.__func__
get_room_events_stream_for_room = (
DataStore.get_room_events_stream_for_room.__func__
)
get_events_around = DataStore.get_events_around.__func__
get_recent_events_for_room = DataStore.get_recent_events_for_room.__func__
get_room_events_stream_for_rooms = (
DataStore.get_room_events_stream_for_rooms.__func__
)
get_stream_token_for_event = DataStore.get_stream_token_for_event.__func__
_set_before_and_after = staticmethod(DataStore._set_before_and_after)
_get_events_around_txn = DataStore._get_events_around_txn.__func__
get_backfill_events = DataStore.get_backfill_events.__func__ get_backfill_events = DataStore.get_backfill_events.__func__
_get_backfill_events = DataStore._get_backfill_events.__func__ _get_backfill_events = DataStore._get_backfill_events.__func__
get_missing_events = DataStore.get_missing_events.__func__ get_missing_events = DataStore.get_missing_events.__func__
@ -120,8 +81,11 @@ class SlavedEventStore(RoomMemberWorkerStore, EventPushActionsWorkerStore,
get_all_new_events_stream = DataStore.get_all_new_events_stream.__func__ get_all_new_events_stream = DataStore.get_all_new_events_stream.__func__
get_federation_out_pos = DataStore.get_federation_out_pos.__func__ def get_room_max_stream_ordering(self):
update_federation_out_pos = DataStore.update_federation_out_pos.__func__ return self._stream_id_gen.get_current_token()
def get_room_min_stream_ordering(self):
return self._backfill_id_gen.get_current_token()
get_latest_event_ids_and_hashes_in_room = ( get_latest_event_ids_and_hashes_in_room = (
DataStore.get_latest_event_ids_and_hashes_in_room.__func__ DataStore.get_latest_event_ids_and_hashes_in_room.__func__

View File

@ -148,14 +148,6 @@ class DataStore(RoomMemberStore, RoomStore,
stream_column="stream_ordering", stream_column="stream_ordering",
max_value=events_max, max_value=events_max,
) )
self._events_stream_cache = StreamChangeCache(
"EventsRoomStreamChangeCache", min_event_val,
prefilled_cache=event_cache_prefill,
)
self._membership_stream_cache = StreamChangeCache(
"MembershipStreamChangeCache", events_max,
)
self._presence_on_startup = self._get_active_presence(db_conn) self._presence_on_startup = self._get_active_presence(db_conn)

View File

@ -35,13 +35,17 @@ what sort order was used:
from twisted.internet import defer from twisted.internet import defer
from ._base import SQLBaseStore from synapse.storage._base import SQLBaseStore
from synapse.storage.events import EventsWorkerStore
from synapse.util.caches.descriptors import cached from synapse.util.caches.descriptors import cached
from synapse.api.constants import EventTypes from synapse.api.constants import EventTypes
from synapse.types import RoomStreamToken from synapse.types import RoomStreamToken
from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.logcontext import make_deferred_yieldable, preserve_fn from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
from synapse.storage.engines import PostgresEngine, Sqlite3Engine from synapse.storage.engines import PostgresEngine, Sqlite3Engine
import abc
import logging import logging
@ -143,81 +147,28 @@ def filter_to_clause(event_filter):
return " AND ".join(clauses), args return " AND ".join(clauses), args
class StreamStore(SQLBaseStore): class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
@defer.inlineCallbacks __metaclass__ = abc.ABCMeta
def get_appservice_room_stream(self, service, from_key, to_key, limit=0):
# NB this lives here instead of appservice.py so we can reuse the
# 'private' StreamToken class in this file.
if limit:
limit = max(limit, MAX_STREAM_SIZE)
else:
limit = MAX_STREAM_SIZE
# From and to keys should be integers from ordering. def __init__(self, db_conn, hs):
from_id = RoomStreamToken.parse_stream_token(from_key) super(StreamWorkerStore, self).__init__(db_conn, hs)
to_id = RoomStreamToken.parse_stream_token(to_key)
if from_key == to_key: events_max = self.get_room_max_stream_ordering()
defer.returnValue(([], to_key)) event_cache_prefill, min_event_val = self._get_cache_dict(
return db_conn, "events",
entity_column="room_id",
# select all the events between from/to with a sensible limit stream_column="stream_ordering",
sql = ( max_value=events_max,
"SELECT e.event_id, e.room_id, e.type, s.state_key, " )
"e.stream_ordering FROM events AS e " self._events_stream_cache = StreamChangeCache(
"LEFT JOIN state_events as s ON " "EventsRoomStreamChangeCache", min_event_val,
"e.event_id = s.event_id " prefilled_cache=event_cache_prefill,
"WHERE e.stream_ordering > ? AND e.stream_ordering <= ? " )
"ORDER BY stream_ordering ASC LIMIT %(limit)d " self._membership_stream_cache = StreamChangeCache(
) % { "MembershipStreamChangeCache", events_max,
"limit": limit
}
def f(txn):
# pull out all the events between the tokens
txn.execute(sql, (from_id.stream, to_id.stream,))
rows = self.cursor_to_dict(txn)
# Logic:
# - We want ALL events which match the AS room_id regex
# - We want ALL events which match the rooms represented by the AS
# room_alias regex
# - We want ALL events for rooms that AS users have joined.
# This is currently supported via get_app_service_rooms (which is
# used for the Notifier listener rooms). We can't reasonably make a
# SQL query for these room IDs, so we'll pull all the events between
# from/to and filter in python.
rooms_for_as = self._get_app_service_rooms_txn(txn, service)
room_ids_for_as = [r.room_id for r in rooms_for_as]
def app_service_interested(row):
if row["room_id"] in room_ids_for_as:
return True
if row["type"] == EventTypes.Member:
if service.is_interested_in_user(row.get("state_key")):
return True
return False
return [r for r in rows if app_service_interested(r)]
rows = yield self.runInteraction("get_appservice_room_stream", f)
ret = yield self._get_events(
[r["event_id"] for r in rows],
get_prev_content=True
) )
self._set_before_and_after(ret, rows, topo_order=from_id is None) self._stream_order_on_start = self.get_room_max_stream_ordering()
if rows:
key = "s%d" % max(r["stream_ordering"] for r in rows)
else:
# Assume we didn't get anything because there was nothing to
# get.
key = to_key
defer.returnValue((ret, key))
@defer.inlineCallbacks @defer.inlineCallbacks
def get_room_events_stream_for_rooms(self, room_ids, from_key, to_key, limit=0, def get_room_events_stream_for_rooms(self, room_ids, from_key, to_key, limit=0,
@ -380,88 +331,6 @@ class StreamStore(SQLBaseStore):
defer.returnValue(ret) defer.returnValue(ret)
@defer.inlineCallbacks
def paginate_room_events(self, room_id, from_key, to_key=None,
direction='b', limit=-1, event_filter=None):
# Tokens really represent positions between elements, but we use
# the convention of pointing to the event before the gap. Hence
# we have a bit of asymmetry when it comes to equalities.
args = [False, room_id]
if direction == 'b':
order = "DESC"
bounds = upper_bound(
RoomStreamToken.parse(from_key), self.database_engine
)
if to_key:
bounds = "%s AND %s" % (bounds, lower_bound(
RoomStreamToken.parse(to_key), self.database_engine
))
else:
order = "ASC"
bounds = lower_bound(
RoomStreamToken.parse(from_key), self.database_engine
)
if to_key:
bounds = "%s AND %s" % (bounds, upper_bound(
RoomStreamToken.parse(to_key), self.database_engine
))
filter_clause, filter_args = filter_to_clause(event_filter)
if filter_clause:
bounds += " AND " + filter_clause
args.extend(filter_args)
if int(limit) > 0:
args.append(int(limit))
limit_str = " LIMIT ?"
else:
limit_str = ""
sql = (
"SELECT * FROM events"
" WHERE outlier = ? AND room_id = ? AND %(bounds)s"
" ORDER BY topological_ordering %(order)s,"
" stream_ordering %(order)s %(limit)s"
) % {
"bounds": bounds,
"order": order,
"limit": limit_str
}
def f(txn):
txn.execute(sql, args)
rows = self.cursor_to_dict(txn)
if rows:
topo = rows[-1]["topological_ordering"]
toke = rows[-1]["stream_ordering"]
if direction == 'b':
# Tokens are positions between events.
# This token points *after* the last event in the chunk.
# We need it to point to the event before it in the chunk
# when we are going backwards so we subtract one from the
# stream part.
toke -= 1
next_token = str(RoomStreamToken(topo, toke))
else:
# TODO (erikj): We should work out what to do here instead.
next_token = to_key if to_key else from_key
return rows, next_token,
rows, token = yield self.runInteraction("paginate_room_events", f)
events = yield self._get_events(
[r["event_id"] for r in rows],
get_prev_content=True
)
self._set_before_and_after(events, rows)
defer.returnValue((events, token))
@defer.inlineCallbacks @defer.inlineCallbacks
def get_recent_events_for_room(self, room_id, limit, end_token, from_token=None): def get_recent_events_for_room(self, room_id, limit, end_token, from_token=None):
rows, token = yield self.get_recent_event_ids_for_room( rows, token = yield self.get_recent_event_ids_for_room(
@ -542,7 +411,7 @@ class StreamStore(SQLBaseStore):
`room_id` causes it to return the current room specific topological `room_id` causes it to return the current room specific topological
token. token.
""" """
token = yield self._stream_id_gen.get_current_token() token = yield self.get_room_max_stream_ordering()
if room_id is None: if room_id is None:
defer.returnValue("s%d" % (token,)) defer.returnValue("s%d" % (token,))
else: else:
@ -552,11 +421,13 @@ class StreamStore(SQLBaseStore):
) )
defer.returnValue("t%d-%d" % (topo, token)) defer.returnValue("t%d-%d" % (topo, token))
@abc.abstractmethod
def get_room_max_stream_ordering(self): def get_room_max_stream_ordering(self):
return self._stream_id_gen.get_current_token() raise NotImplementedError()
@abc.abstractmethod
def get_room_min_stream_ordering(self): def get_room_min_stream_ordering(self):
return self._backfill_id_gen.get_current_token() raise NotImplementedError()
def get_stream_token_for_event(self, event_id): def get_stream_token_for_event(self, event_id):
"""The stream token for an event """The stream token for an event
@ -832,3 +703,168 @@ class StreamStore(SQLBaseStore):
def has_room_changed_since(self, room_id, stream_id): def has_room_changed_since(self, room_id, stream_id):
return self._events_stream_cache.has_entity_changed(room_id, stream_id) return self._events_stream_cache.has_entity_changed(room_id, stream_id)
class StreamStore(StreamWorkerStore):
def get_room_max_stream_ordering(self):
return self._stream_id_gen.get_current_token()
def get_room_min_stream_ordering(self):
return self._backfill_id_gen.get_current_token()
@defer.inlineCallbacks
def get_appservice_room_stream(self, service, from_key, to_key, limit=0):
# NB this lives here instead of appservice.py so we can reuse the
# 'private' StreamToken class in this file.
if limit:
limit = max(limit, MAX_STREAM_SIZE)
else:
limit = MAX_STREAM_SIZE
# From and to keys should be integers from ordering.
from_id = RoomStreamToken.parse_stream_token(from_key)
to_id = RoomStreamToken.parse_stream_token(to_key)
if from_key == to_key:
defer.returnValue(([], to_key))
return
# select all the events between from/to with a sensible limit
sql = (
"SELECT e.event_id, e.room_id, e.type, s.state_key, "
"e.stream_ordering FROM events AS e "
"LEFT JOIN state_events as s ON "
"e.event_id = s.event_id "
"WHERE e.stream_ordering > ? AND e.stream_ordering <= ? "
"ORDER BY stream_ordering ASC LIMIT %(limit)d "
) % {
"limit": limit
}
def f(txn):
# pull out all the events between the tokens
txn.execute(sql, (from_id.stream, to_id.stream,))
rows = self.cursor_to_dict(txn)
# Logic:
# - We want ALL events which match the AS room_id regex
# - We want ALL events which match the rooms represented by the AS
# room_alias regex
# - We want ALL events for rooms that AS users have joined.
# This is currently supported via get_app_service_rooms (which is
# used for the Notifier listener rooms). We can't reasonably make a
# SQL query for these room IDs, so we'll pull all the events between
# from/to and filter in python.
rooms_for_as = self._get_app_service_rooms_txn(txn, service)
room_ids_for_as = [r.room_id for r in rooms_for_as]
def app_service_interested(row):
if row["room_id"] in room_ids_for_as:
return True
if row["type"] == EventTypes.Member:
if service.is_interested_in_user(row.get("state_key")):
return True
return False
return [r for r in rows if app_service_interested(r)]
rows = yield self.runInteraction("get_appservice_room_stream", f)
ret = yield self._get_events(
[r["event_id"] for r in rows],
get_prev_content=True
)
self._set_before_and_after(ret, rows, topo_order=from_id is None)
if rows:
key = "s%d" % max(r["stream_ordering"] for r in rows)
else:
# Assume we didn't get anything because there was nothing to
# get.
key = to_key
defer.returnValue((ret, key))
@defer.inlineCallbacks
def paginate_room_events(self, room_id, from_key, to_key=None,
direction='b', limit=-1, event_filter=None):
# Tokens really represent positions between elements, but we use
# the convention of pointing to the event before the gap. Hence
# we have a bit of asymmetry when it comes to equalities.
args = [False, room_id]
if direction == 'b':
order = "DESC"
bounds = upper_bound(
RoomStreamToken.parse(from_key), self.database_engine
)
if to_key:
bounds = "%s AND %s" % (bounds, lower_bound(
RoomStreamToken.parse(to_key), self.database_engine
))
else:
order = "ASC"
bounds = lower_bound(
RoomStreamToken.parse(from_key), self.database_engine
)
if to_key:
bounds = "%s AND %s" % (bounds, upper_bound(
RoomStreamToken.parse(to_key), self.database_engine
))
filter_clause, filter_args = filter_to_clause(event_filter)
if filter_clause:
bounds += " AND " + filter_clause
args.extend(filter_args)
if int(limit) > 0:
args.append(int(limit))
limit_str = " LIMIT ?"
else:
limit_str = ""
sql = (
"SELECT * FROM events"
" WHERE outlier = ? AND room_id = ? AND %(bounds)s"
" ORDER BY topological_ordering %(order)s,"
" stream_ordering %(order)s %(limit)s"
) % {
"bounds": bounds,
"order": order,
"limit": limit_str
}
def f(txn):
txn.execute(sql, args)
rows = self.cursor_to_dict(txn)
if rows:
topo = rows[-1]["topological_ordering"]
toke = rows[-1]["stream_ordering"]
if direction == 'b':
# Tokens are positions between events.
# This token points *after* the last event in the chunk.
# We need it to point to the event before it in the chunk
# when we are going backwards so we subtract one from the
# stream part.
toke -= 1
next_token = str(RoomStreamToken(topo, toke))
else:
# TODO (erikj): We should work out what to do here instead.
next_token = to_key if to_key else from_key
return rows, next_token,
rows, token = yield self.runInteraction("paginate_room_events", f)
events = yield self._get_events(
[r["event_id"] for r in rows],
get_prev_content=True
)
self._set_before_and_after(events, rows)
defer.returnValue((events, token))