Use new EventPersistenceStore

This commit is contained in:
Erik Johnston 2019-10-23 12:02:36 +01:00
parent 73cf63784b
commit 3ca4c7c516
11 changed files with 44 additions and 19 deletions

View File

@ -109,6 +109,7 @@ class FederationHandler(BaseHandler):
self.hs = hs self.hs = hs
self.store = hs.get_datastore() self.store = hs.get_datastore()
self.storage = hs.get_storage()
self.federation_client = hs.get_federation_client() self.federation_client = hs.get_federation_client()
self.state_handler = hs.get_state_handler() self.state_handler = hs.get_state_handler()
self.server_name = hs.hostname self.server_name = hs.hostname
@ -2648,7 +2649,7 @@ class FederationHandler(BaseHandler):
backfilled=backfilled, backfilled=backfilled,
) )
else: else:
max_stream_id = yield self.store.persist_events( max_stream_id = yield self.storage.persistence.persist_events(
event_and_contexts, backfilled=backfilled event_and_contexts, backfilled=backfilled
) )

View File

@ -234,6 +234,7 @@ class EventCreationHandler(object):
self.hs = hs self.hs = hs
self.auth = hs.get_auth() self.auth = hs.get_auth()
self.store = hs.get_datastore() self.store = hs.get_datastore()
self.storage = hs.get_storage()
self.state = hs.get_state_handler() self.state = hs.get_state_handler()
self.clock = hs.get_clock() self.clock = hs.get_clock()
self.validator = EventValidator() self.validator = EventValidator()
@ -868,7 +869,7 @@ class EventCreationHandler(object):
if prev_state_ids: if prev_state_ids:
raise AuthError(403, "Changing the room create event is forbidden") raise AuthError(403, "Changing the room create event is forbidden")
(event_stream_id, max_stream_id) = yield self.store.persist_event( event_stream_id, max_stream_id = yield self.storage.persistence.persist_event(
event, context=context event, context=context
) )

View File

@ -95,6 +95,7 @@ from synapse.server_notices.worker_server_notices_sender import (
WorkerServerNoticesSender, WorkerServerNoticesSender,
) )
from synapse.state import StateHandler, StateResolutionHandler from synapse.state import StateHandler, StateResolutionHandler
from synapse.storage import DataStores, Storage
from synapse.streams.events import EventSources from synapse.streams.events import EventSources
from synapse.util import Clock from synapse.util import Clock
from synapse.util.distributor import Distributor from synapse.util.distributor import Distributor
@ -196,6 +197,7 @@ class HomeServer(object):
"account_validity_handler", "account_validity_handler",
"saml_handler", "saml_handler",
"event_client_serializer", "event_client_serializer",
"storage",
] ]
REQUIRED_ON_MASTER_STARTUP = ["user_directory_handler", "stats_handler"] REQUIRED_ON_MASTER_STARTUP = ["user_directory_handler", "stats_handler"]
@ -225,6 +227,7 @@ class HomeServer(object):
self.registration_ratelimiter = Ratelimiter() self.registration_ratelimiter = Ratelimiter()
self.datastore = None self.datastore = None
self.datastores = None
# Other kwargs are explicit dependencies # Other kwargs are explicit dependencies
for depname in kwargs: for depname in kwargs:
@ -234,6 +237,7 @@ class HomeServer(object):
logger.info("Setting up.") logger.info("Setting up.")
with self.get_db_conn() as conn: with self.get_db_conn() as conn:
self.datastore = self.DATASTORE_CLASS(conn, self) self.datastore = self.DATASTORE_CLASS(conn, self)
self.datastores = DataStores(self.datastore, conn, self)
conn.commit() conn.commit()
logger.info("Finished setting up.") logger.info("Finished setting up.")
@ -266,7 +270,7 @@ class HomeServer(object):
return self.clock return self.clock
def get_datastore(self): def get_datastore(self):
return self.datastore return self.datastores.main
def get_config(self): def get_config(self):
return self.config return self.config
@ -537,6 +541,9 @@ class HomeServer(object):
def build_event_client_serializer(self): def build_event_client_serializer(self):
return EventClientSerializer(self) return EventClientSerializer(self)
def build_storage(self) -> Storage:
return Storage(self, self.datastores)
def remove_pusher(self, app_id, push_key, user_id): def remove_pusher(self, app_id, push_key, user_id):
return self.get_pusherpool().remove_pusher(app_id, push_key, user_id) return self.get_pusherpool().remove_pusher(app_id, push_key, user_id)

View File

@ -41,6 +41,7 @@ class BaseSlavedStoreTestCase(unittest.HomeserverTestCase):
def prepare(self, reactor, clock, hs): def prepare(self, reactor, clock, hs):
self.master_store = self.hs.get_datastore() self.master_store = self.hs.get_datastore()
self.storage = hs.get_storage()
self.slaved_store = self.STORE_TYPE(self.hs.get_db_conn(), self.hs) self.slaved_store = self.STORE_TYPE(self.hs.get_db_conn(), self.hs)
self.event_id = 0 self.event_id = 0

View File

@ -234,7 +234,9 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase):
type="m.room.member", sender=USER_ID_2, key=USER_ID_2, membership="join" type="m.room.member", sender=USER_ID_2, key=USER_ID_2, membership="join"
) )
msg, msgctx = self.build_event() msg, msgctx = self.build_event()
self.get_success(self.master_store.persist_events([(j2, j2ctx), (msg, msgctx)])) self.get_success(
self.storage.persistence.persist_events([(j2, j2ctx), (msg, msgctx)])
)
self.replicate() self.replicate()
event_source = RoomEventSource(self.hs) event_source = RoomEventSource(self.hs)
@ -290,10 +292,12 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase):
if backfill: if backfill:
self.get_success( self.get_success(
self.master_store.persist_events([(event, context)], backfilled=True) self.storage.persistence.persist_events(
[(event, context)], backfilled=True
)
) )
else: else:
self.get_success(self.master_store.persist_event(event, context)) self.get_success(self.storage.persistence.persist_event(event, context))
return event return event

View File

@ -39,6 +39,7 @@ class RedactionTestCase(unittest.HomeserverTestCase):
def prepare(self, reactor, clock, hs): def prepare(self, reactor, clock, hs):
self.store = hs.get_datastore() self.store = hs.get_datastore()
self.storage = hs.get_storage()
self.event_builder_factory = hs.get_event_builder_factory() self.event_builder_factory = hs.get_event_builder_factory()
self.event_creation_handler = hs.get_event_creation_handler() self.event_creation_handler = hs.get_event_creation_handler()
@ -73,7 +74,7 @@ class RedactionTestCase(unittest.HomeserverTestCase):
self.event_creation_handler.create_new_client_event(builder) self.event_creation_handler.create_new_client_event(builder)
) )
self.get_success(self.store.persist_event(event, context)) self.get_success(self.storage.persistence.persist_event(event, context))
return event return event
@ -95,7 +96,7 @@ class RedactionTestCase(unittest.HomeserverTestCase):
self.event_creation_handler.create_new_client_event(builder) self.event_creation_handler.create_new_client_event(builder)
) )
self.get_success(self.store.persist_event(event, context)) self.get_success(self.storage.persistence.persist_event(event, context))
return event return event
@ -116,7 +117,7 @@ class RedactionTestCase(unittest.HomeserverTestCase):
self.event_creation_handler.create_new_client_event(builder) self.event_creation_handler.create_new_client_event(builder)
) )
self.get_success(self.store.persist_event(event, context)) self.get_success(self.storage.persistence.persist_event(event, context))
return event return event
@ -263,7 +264,7 @@ class RedactionTestCase(unittest.HomeserverTestCase):
) )
) )
self.get_success(self.store.persist_event(event_1, context_1)) self.get_success(self.storage.persistence.persist_event(event_1, context_1))
event_2, context_2 = self.get_success( event_2, context_2 = self.get_success(
self.event_creation_handler.create_new_client_event( self.event_creation_handler.create_new_client_event(
@ -282,7 +283,7 @@ class RedactionTestCase(unittest.HomeserverTestCase):
) )
) )
) )
self.get_success(self.store.persist_event(event_2, context_2)) self.get_success(self.storage.persistence.persist_event(event_2, context_2))
# fetch one of the redactions # fetch one of the redactions
fetched = self.get_success(self.store.get_event(redaction_event_id1)) fetched = self.get_success(self.store.get_event(redaction_event_id1))

View File

@ -62,6 +62,7 @@ class RoomEventsStoreTestCase(unittest.TestCase):
# Room events need the full datastore, for persist_event() and # Room events need the full datastore, for persist_event() and
# get_room_state() # get_room_state()
self.store = hs.get_datastore() self.store = hs.get_datastore()
self.storage = hs.get_storage()
self.event_factory = hs.get_event_factory() self.event_factory = hs.get_event_factory()
self.room = RoomID.from_string("!abcde:test") self.room = RoomID.from_string("!abcde:test")
@ -72,7 +73,7 @@ class RoomEventsStoreTestCase(unittest.TestCase):
@defer.inlineCallbacks @defer.inlineCallbacks
def inject_room_event(self, **kwargs): def inject_room_event(self, **kwargs):
yield self.store.persist_event( yield self.storage.persistence.persist_event(
self.event_factory.create_event(room_id=self.room.to_string(), **kwargs) self.event_factory.create_event(room_id=self.room.to_string(), **kwargs)
) )

View File

@ -44,6 +44,7 @@ class RoomMemberStoreTestCase(unittest.HomeserverTestCase):
# We can't test the RoomMemberStore on its own without the other event # We can't test the RoomMemberStore on its own without the other event
# storage logic # storage logic
self.store = hs.get_datastore() self.store = hs.get_datastore()
self.storage = hs.get_storage()
self.event_builder_factory = hs.get_event_builder_factory() self.event_builder_factory = hs.get_event_builder_factory()
self.event_creation_handler = hs.get_event_creation_handler() self.event_creation_handler = hs.get_event_creation_handler()
@ -70,7 +71,7 @@ class RoomMemberStoreTestCase(unittest.HomeserverTestCase):
self.event_creation_handler.create_new_client_event(builder) self.event_creation_handler.create_new_client_event(builder)
) )
self.get_success(self.store.persist_event(event, context)) self.get_success(self.storage.persistence.persist_event(event, context))
return event return event

View File

@ -34,6 +34,7 @@ class StateStoreTestCase(tests.unittest.TestCase):
hs = yield tests.utils.setup_test_homeserver(self.addCleanup) hs = yield tests.utils.setup_test_homeserver(self.addCleanup)
self.store = hs.get_datastore() self.store = hs.get_datastore()
self.storage = hs.get_storage()
self.event_builder_factory = hs.get_event_builder_factory() self.event_builder_factory = hs.get_event_builder_factory()
self.event_creation_handler = hs.get_event_creation_handler() self.event_creation_handler = hs.get_event_creation_handler()
@ -63,7 +64,7 @@ class StateStoreTestCase(tests.unittest.TestCase):
builder builder
) )
yield self.store.persist_event(event, context) yield self.storage.persistence.persist_event(event, context)
return event return event

View File

@ -36,6 +36,7 @@ class FilterEventsForServerTestCase(tests.unittest.TestCase):
self.event_creation_handler = self.hs.get_event_creation_handler() self.event_creation_handler = self.hs.get_event_creation_handler()
self.event_builder_factory = self.hs.get_event_builder_factory() self.event_builder_factory = self.hs.get_event_builder_factory()
self.store = self.hs.get_datastore() self.store = self.hs.get_datastore()
self.storage = self.hs.get_storage()
yield create_room(self.hs, TEST_ROOM_ID, "@someone:ROOM") yield create_room(self.hs, TEST_ROOM_ID, "@someone:ROOM")
@ -137,7 +138,7 @@ class FilterEventsForServerTestCase(tests.unittest.TestCase):
event, context = yield self.event_creation_handler.create_new_client_event( event, context = yield self.event_creation_handler.create_new_client_event(
builder builder
) )
yield self.hs.get_datastore().persist_event(event, context) yield self.storage.persistence.persist_event(event, context)
return event return event
@defer.inlineCallbacks @defer.inlineCallbacks
@ -159,7 +160,7 @@ class FilterEventsForServerTestCase(tests.unittest.TestCase):
builder builder
) )
yield self.hs.get_datastore().persist_event(event, context) yield self.storage.persistence.persist_event(event, context)
return event return event
@defer.inlineCallbacks @defer.inlineCallbacks
@ -180,7 +181,7 @@ class FilterEventsForServerTestCase(tests.unittest.TestCase):
builder builder
) )
yield self.hs.get_datastore().persist_event(event, context) yield self.storage.persistence.persist_event(event, context)
return event return event
@defer.inlineCallbacks @defer.inlineCallbacks

View File

@ -326,10 +326,16 @@ def setup_test_homeserver(
if homeserverToUse.__name__ == "TestHomeServer": if homeserverToUse.__name__ == "TestHomeServer":
hs.setup_master() hs.setup_master()
else: else:
# If we have been given an explicit datastore we probably want to mock
# out the DataStores somehow too. This all feels a bit wrong, but then
# mocking the stores feels wrong too.
datastores = Mock(datastore=datastore)
hs = homeserverToUse( hs = homeserverToUse(
name, name,
db_pool=None, db_pool=None,
datastore=datastore, datastore=datastore,
datastores=datastores,
config=config, config=config,
version_string="Synapse/tests", version_string="Synapse/tests",
database_engine=db_engine, database_engine=db_engine,
@ -647,7 +653,7 @@ def create_room(hs, room_id, creator_id):
creator_id (str) creator_id (str)
""" """
store = hs.get_datastore() persistence_store = hs.get_storage().persistence
event_builder_factory = hs.get_event_builder_factory() event_builder_factory = hs.get_event_builder_factory()
event_creation_handler = hs.get_event_creation_handler() event_creation_handler = hs.get_event_creation_handler()
@ -664,4 +670,4 @@ def create_room(hs, room_id, creator_id):
event, context = yield event_creation_handler.create_new_client_event(builder) event, context = yield event_creation_handler.create_new_client_event(builder)
yield store.persist_event(event, context) yield persistence_store.persist_event(event, context)