mirror of
https://git.anonymousland.org/anonymousland/synapse-product.git
synced 2025-01-23 07:30:59 -05:00
Remove the unused public_room_list_stream (#10565)
Co-authored-by: Patrick Cloke <clokep@users.noreply.github.com>
This commit is contained in:
parent
c8132f4a31
commit
84469bdac7
1
changelog.d/10565.misc
Normal file
1
changelog.d/10565.misc
Normal file
@ -0,0 +1 @@
|
|||||||
|
Remove the unused public rooms replication stream.
|
@ -38,7 +38,6 @@ from synapse.replication.slave.storage.groups import SlavedGroupServerStore
|
|||||||
from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
|
from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
|
||||||
from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
|
from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
|
||||||
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
|
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
|
||||||
from synapse.replication.slave.storage.room import RoomStore
|
|
||||||
from synapse.server import HomeServer
|
from synapse.server import HomeServer
|
||||||
from synapse.util.logcontext import LoggingContext
|
from synapse.util.logcontext import LoggingContext
|
||||||
from synapse.util.versionstring import get_version_string
|
from synapse.util.versionstring import get_version_string
|
||||||
@ -58,7 +57,6 @@ class AdminCmdSlavedStore(
|
|||||||
SlavedPushRuleStore,
|
SlavedPushRuleStore,
|
||||||
SlavedEventStore,
|
SlavedEventStore,
|
||||||
SlavedClientIpStore,
|
SlavedClientIpStore,
|
||||||
RoomStore,
|
|
||||||
BaseSlavedStore,
|
BaseSlavedStore,
|
||||||
):
|
):
|
||||||
pass
|
pass
|
||||||
|
@ -64,7 +64,6 @@ from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
|
|||||||
from synapse.replication.slave.storage.pushers import SlavedPusherStore
|
from synapse.replication.slave.storage.pushers import SlavedPusherStore
|
||||||
from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
|
from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
|
||||||
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
|
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
|
||||||
from synapse.replication.slave.storage.room import RoomStore
|
|
||||||
from synapse.rest.admin import register_servlets_for_media_repo
|
from synapse.rest.admin import register_servlets_for_media_repo
|
||||||
from synapse.rest.client import (
|
from synapse.rest.client import (
|
||||||
account_data,
|
account_data,
|
||||||
@ -114,6 +113,7 @@ from synapse.storage.databases.main.monthly_active_users import (
|
|||||||
MonthlyActiveUsersWorkerStore,
|
MonthlyActiveUsersWorkerStore,
|
||||||
)
|
)
|
||||||
from synapse.storage.databases.main.presence import PresenceStore
|
from synapse.storage.databases.main.presence import PresenceStore
|
||||||
|
from synapse.storage.databases.main.room import RoomWorkerStore
|
||||||
from synapse.storage.databases.main.search import SearchStore
|
from synapse.storage.databases.main.search import SearchStore
|
||||||
from synapse.storage.databases.main.stats import StatsStore
|
from synapse.storage.databases.main.stats import StatsStore
|
||||||
from synapse.storage.databases.main.transactions import TransactionWorkerStore
|
from synapse.storage.databases.main.transactions import TransactionWorkerStore
|
||||||
@ -237,7 +237,7 @@ class GenericWorkerSlavedStore(
|
|||||||
ClientIpWorkerStore,
|
ClientIpWorkerStore,
|
||||||
SlavedEventStore,
|
SlavedEventStore,
|
||||||
SlavedKeyStore,
|
SlavedKeyStore,
|
||||||
RoomStore,
|
RoomWorkerStore,
|
||||||
DirectoryStore,
|
DirectoryStore,
|
||||||
SlavedApplicationServiceStore,
|
SlavedApplicationServiceStore,
|
||||||
SlavedRegistrationStore,
|
SlavedRegistrationStore,
|
||||||
|
@ -1,37 +0,0 @@
|
|||||||
# Copyright 2015, 2016 OpenMarket Ltd
|
|
||||||
#
|
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
# you may not use this file except in compliance with the License.
|
|
||||||
# You may obtain a copy of the License at
|
|
||||||
#
|
|
||||||
# http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
#
|
|
||||||
# Unless required by applicable law or agreed to in writing, software
|
|
||||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
# See the License for the specific language governing permissions and
|
|
||||||
# limitations under the License.
|
|
||||||
|
|
||||||
from synapse.replication.tcp.streams import PublicRoomsStream
|
|
||||||
from synapse.storage.database import DatabasePool
|
|
||||||
from synapse.storage.databases.main.room import RoomWorkerStore
|
|
||||||
|
|
||||||
from ._base import BaseSlavedStore
|
|
||||||
from ._slaved_id_tracker import SlavedIdTracker
|
|
||||||
|
|
||||||
|
|
||||||
class RoomStore(RoomWorkerStore, BaseSlavedStore):
|
|
||||||
def __init__(self, database: DatabasePool, db_conn, hs):
|
|
||||||
super().__init__(database, db_conn, hs)
|
|
||||||
self._public_room_id_gen = SlavedIdTracker(
|
|
||||||
db_conn, "public_room_list_stream", "stream_id"
|
|
||||||
)
|
|
||||||
|
|
||||||
def get_current_public_room_stream_id(self):
|
|
||||||
return self._public_room_id_gen.get_current_token()
|
|
||||||
|
|
||||||
def process_replication_rows(self, stream_name, instance_name, token, rows):
|
|
||||||
if stream_name == PublicRoomsStream.NAME:
|
|
||||||
self._public_room_id_gen.advance(instance_name, token)
|
|
||||||
|
|
||||||
return super().process_replication_rows(stream_name, instance_name, token, rows)
|
|
@ -32,7 +32,6 @@ from synapse.replication.tcp.streams._base import (
|
|||||||
GroupServerStream,
|
GroupServerStream,
|
||||||
PresenceFederationStream,
|
PresenceFederationStream,
|
||||||
PresenceStream,
|
PresenceStream,
|
||||||
PublicRoomsStream,
|
|
||||||
PushersStream,
|
PushersStream,
|
||||||
PushRulesStream,
|
PushRulesStream,
|
||||||
ReceiptsStream,
|
ReceiptsStream,
|
||||||
@ -57,7 +56,6 @@ STREAMS_MAP = {
|
|||||||
PushRulesStream,
|
PushRulesStream,
|
||||||
PushersStream,
|
PushersStream,
|
||||||
CachesStream,
|
CachesStream,
|
||||||
PublicRoomsStream,
|
|
||||||
DeviceListsStream,
|
DeviceListsStream,
|
||||||
ToDeviceStream,
|
ToDeviceStream,
|
||||||
FederationStream,
|
FederationStream,
|
||||||
@ -79,7 +77,6 @@ __all__ = [
|
|||||||
"PushRulesStream",
|
"PushRulesStream",
|
||||||
"PushersStream",
|
"PushersStream",
|
||||||
"CachesStream",
|
"CachesStream",
|
||||||
"PublicRoomsStream",
|
|
||||||
"DeviceListsStream",
|
"DeviceListsStream",
|
||||||
"ToDeviceStream",
|
"ToDeviceStream",
|
||||||
"TagAccountDataStream",
|
"TagAccountDataStream",
|
||||||
|
@ -447,31 +447,6 @@ class CachesStream(Stream):
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
class PublicRoomsStream(Stream):
|
|
||||||
"""The public rooms list changed"""
|
|
||||||
|
|
||||||
PublicRoomsStreamRow = namedtuple(
|
|
||||||
"PublicRoomsStreamRow",
|
|
||||||
(
|
|
||||||
"room_id", # str
|
|
||||||
"visibility", # str
|
|
||||||
"appservice_id", # str, optional
|
|
||||||
"network_id", # str, optional
|
|
||||||
),
|
|
||||||
)
|
|
||||||
|
|
||||||
NAME = "public_rooms"
|
|
||||||
ROW_TYPE = PublicRoomsStreamRow
|
|
||||||
|
|
||||||
def __init__(self, hs):
|
|
||||||
store = hs.get_datastore()
|
|
||||||
super().__init__(
|
|
||||||
hs.get_instance_name(),
|
|
||||||
current_token_without_instance(store.get_current_public_room_stream_id),
|
|
||||||
store.get_all_new_public_rooms,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
class DeviceListsStream(Stream):
|
class DeviceListsStream(Stream):
|
||||||
"""Either a user has updated their devices or a remote server needs to be
|
"""Either a user has updated their devices or a remote server needs to be
|
||||||
told about a device update.
|
told about a device update.
|
||||||
|
@ -127,9 +127,6 @@ class DataStore(
|
|||||||
self._clock = hs.get_clock()
|
self._clock = hs.get_clock()
|
||||||
self.database_engine = database.engine
|
self.database_engine = database.engine
|
||||||
|
|
||||||
self._public_room_id_gen = StreamIdGenerator(
|
|
||||||
db_conn, "public_room_list_stream", "stream_id"
|
|
||||||
)
|
|
||||||
self._device_list_id_gen = StreamIdGenerator(
|
self._device_list_id_gen = StreamIdGenerator(
|
||||||
db_conn,
|
db_conn,
|
||||||
"device_lists_stream",
|
"device_lists_stream",
|
||||||
@ -170,6 +167,7 @@ class DataStore(
|
|||||||
sequence_name="cache_invalidation_stream_seq",
|
sequence_name="cache_invalidation_stream_seq",
|
||||||
writers=[],
|
writers=[],
|
||||||
)
|
)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
self._cache_id_gen = None
|
self._cache_id_gen = None
|
||||||
|
|
||||||
|
@ -890,55 +890,6 @@ class RoomWorkerStore(SQLBaseStore):
|
|||||||
|
|
||||||
return total_media_quarantined
|
return total_media_quarantined
|
||||||
|
|
||||||
async def get_all_new_public_rooms(
|
|
||||||
self, instance_name: str, last_id: int, current_id: int, limit: int
|
|
||||||
) -> Tuple[List[Tuple[int, tuple]], int, bool]:
|
|
||||||
"""Get updates for public rooms replication stream.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
instance_name: The writer we want to fetch updates from. Unused
|
|
||||||
here since there is only ever one writer.
|
|
||||||
last_id: The token to fetch updates from. Exclusive.
|
|
||||||
current_id: The token to fetch updates up to. Inclusive.
|
|
||||||
limit: The requested limit for the number of rows to return. The
|
|
||||||
function may return more or fewer rows.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
A tuple consisting of: the updates, a token to use to fetch
|
|
||||||
subsequent updates, and whether we returned fewer rows than exists
|
|
||||||
between the requested tokens due to the limit.
|
|
||||||
|
|
||||||
The token returned can be used in a subsequent call to this
|
|
||||||
function to get further updatees.
|
|
||||||
|
|
||||||
The updates are a list of 2-tuples of stream ID and the row data
|
|
||||||
"""
|
|
||||||
if last_id == current_id:
|
|
||||||
return [], current_id, False
|
|
||||||
|
|
||||||
def get_all_new_public_rooms(txn):
|
|
||||||
sql = """
|
|
||||||
SELECT stream_id, room_id, visibility, appservice_id, network_id
|
|
||||||
FROM public_room_list_stream
|
|
||||||
WHERE stream_id > ? AND stream_id <= ?
|
|
||||||
ORDER BY stream_id ASC
|
|
||||||
LIMIT ?
|
|
||||||
"""
|
|
||||||
|
|
||||||
txn.execute(sql, (last_id, current_id, limit))
|
|
||||||
updates = [(row[0], row[1:]) for row in txn]
|
|
||||||
limited = False
|
|
||||||
upto_token = current_id
|
|
||||||
if len(updates) >= limit:
|
|
||||||
upto_token = updates[-1][0]
|
|
||||||
limited = True
|
|
||||||
|
|
||||||
return updates, upto_token, limited
|
|
||||||
|
|
||||||
return await self.db_pool.runInteraction(
|
|
||||||
"get_all_new_public_rooms", get_all_new_public_rooms
|
|
||||||
)
|
|
||||||
|
|
||||||
async def get_rooms_for_retention_period_in_range(
|
async def get_rooms_for_retention_period_in_range(
|
||||||
self, min_ms: Optional[int], max_ms: Optional[int], include_null: bool = False
|
self, min_ms: Optional[int], max_ms: Optional[int], include_null: bool = False
|
||||||
) -> Dict[str, dict]:
|
) -> Dict[str, dict]:
|
||||||
@ -1410,34 +1361,17 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
|
|||||||
StoreError if the room could not be stored.
|
StoreError if the room could not be stored.
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
|
await self.db_pool.simple_insert(
|
||||||
def store_room_txn(txn, next_id):
|
"rooms",
|
||||||
self.db_pool.simple_insert_txn(
|
{
|
||||||
txn,
|
"room_id": room_id,
|
||||||
"rooms",
|
"creator": room_creator_user_id,
|
||||||
{
|
"is_public": is_public,
|
||||||
"room_id": room_id,
|
"room_version": room_version.identifier,
|
||||||
"creator": room_creator_user_id,
|
"has_auth_chain_index": True,
|
||||||
"is_public": is_public,
|
},
|
||||||
"room_version": room_version.identifier,
|
desc="store_room",
|
||||||
"has_auth_chain_index": True,
|
)
|
||||||
},
|
|
||||||
)
|
|
||||||
if is_public:
|
|
||||||
self.db_pool.simple_insert_txn(
|
|
||||||
txn,
|
|
||||||
table="public_room_list_stream",
|
|
||||||
values={
|
|
||||||
"stream_id": next_id,
|
|
||||||
"room_id": room_id,
|
|
||||||
"visibility": is_public,
|
|
||||||
},
|
|
||||||
)
|
|
||||||
|
|
||||||
async with self._public_room_id_gen.get_next() as next_id:
|
|
||||||
await self.db_pool.runInteraction(
|
|
||||||
"store_room_txn", store_room_txn, next_id
|
|
||||||
)
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error("store_room with room_id=%s failed: %s", room_id, e)
|
logger.error("store_room with room_id=%s failed: %s", room_id, e)
|
||||||
raise StoreError(500, "Problem creating room.")
|
raise StoreError(500, "Problem creating room.")
|
||||||
@ -1470,49 +1404,14 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
|
|||||||
lock=False,
|
lock=False,
|
||||||
)
|
)
|
||||||
|
|
||||||
async def set_room_is_public(self, room_id, is_public):
|
async def set_room_is_public(self, room_id: str, is_public: bool) -> None:
|
||||||
def set_room_is_public_txn(txn, next_id):
|
await self.db_pool.simple_update_one(
|
||||||
self.db_pool.simple_update_one_txn(
|
table="rooms",
|
||||||
txn,
|
keyvalues={"room_id": room_id},
|
||||||
table="rooms",
|
updatevalues={"is_public": is_public},
|
||||||
keyvalues={"room_id": room_id},
|
desc="set_room_is_public",
|
||||||
updatevalues={"is_public": is_public},
|
)
|
||||||
)
|
|
||||||
|
|
||||||
entries = self.db_pool.simple_select_list_txn(
|
|
||||||
txn,
|
|
||||||
table="public_room_list_stream",
|
|
||||||
keyvalues={
|
|
||||||
"room_id": room_id,
|
|
||||||
"appservice_id": None,
|
|
||||||
"network_id": None,
|
|
||||||
},
|
|
||||||
retcols=("stream_id", "visibility"),
|
|
||||||
)
|
|
||||||
|
|
||||||
entries.sort(key=lambda r: r["stream_id"])
|
|
||||||
|
|
||||||
add_to_stream = True
|
|
||||||
if entries:
|
|
||||||
add_to_stream = bool(entries[-1]["visibility"]) != is_public
|
|
||||||
|
|
||||||
if add_to_stream:
|
|
||||||
self.db_pool.simple_insert_txn(
|
|
||||||
txn,
|
|
||||||
table="public_room_list_stream",
|
|
||||||
values={
|
|
||||||
"stream_id": next_id,
|
|
||||||
"room_id": room_id,
|
|
||||||
"visibility": is_public,
|
|
||||||
"appservice_id": None,
|
|
||||||
"network_id": None,
|
|
||||||
},
|
|
||||||
)
|
|
||||||
|
|
||||||
async with self._public_room_id_gen.get_next() as next_id:
|
|
||||||
await self.db_pool.runInteraction(
|
|
||||||
"set_room_is_public", set_room_is_public_txn, next_id
|
|
||||||
)
|
|
||||||
self.hs.get_notifier().on_new_replication_data()
|
self.hs.get_notifier().on_new_replication_data()
|
||||||
|
|
||||||
async def set_room_is_public_appservice(
|
async def set_room_is_public_appservice(
|
||||||
@ -1533,68 +1432,33 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
|
|||||||
list.
|
list.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def set_room_is_public_appservice_txn(txn, next_id):
|
if is_public:
|
||||||
if is_public:
|
await self.db_pool.simple_upsert(
|
||||||
try:
|
table="appservice_room_list",
|
||||||
self.db_pool.simple_insert_txn(
|
|
||||||
txn,
|
|
||||||
table="appservice_room_list",
|
|
||||||
values={
|
|
||||||
"appservice_id": appservice_id,
|
|
||||||
"network_id": network_id,
|
|
||||||
"room_id": room_id,
|
|
||||||
},
|
|
||||||
)
|
|
||||||
except self.database_engine.module.IntegrityError:
|
|
||||||
# We've already inserted, nothing to do.
|
|
||||||
return
|
|
||||||
else:
|
|
||||||
self.db_pool.simple_delete_txn(
|
|
||||||
txn,
|
|
||||||
table="appservice_room_list",
|
|
||||||
keyvalues={
|
|
||||||
"appservice_id": appservice_id,
|
|
||||||
"network_id": network_id,
|
|
||||||
"room_id": room_id,
|
|
||||||
},
|
|
||||||
)
|
|
||||||
|
|
||||||
entries = self.db_pool.simple_select_list_txn(
|
|
||||||
txn,
|
|
||||||
table="public_room_list_stream",
|
|
||||||
keyvalues={
|
keyvalues={
|
||||||
"room_id": room_id,
|
|
||||||
"appservice_id": appservice_id,
|
"appservice_id": appservice_id,
|
||||||
"network_id": network_id,
|
"network_id": network_id,
|
||||||
|
"room_id": room_id,
|
||||||
},
|
},
|
||||||
retcols=("stream_id", "visibility"),
|
values={},
|
||||||
|
insertion_values={
|
||||||
|
"appservice_id": appservice_id,
|
||||||
|
"network_id": network_id,
|
||||||
|
"room_id": room_id,
|
||||||
|
},
|
||||||
|
desc="set_room_is_public_appservice_true",
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
await self.db_pool.simple_delete(
|
||||||
|
table="appservice_room_list",
|
||||||
|
keyvalues={
|
||||||
|
"appservice_id": appservice_id,
|
||||||
|
"network_id": network_id,
|
||||||
|
"room_id": room_id,
|
||||||
|
},
|
||||||
|
desc="set_room_is_public_appservice_false",
|
||||||
)
|
)
|
||||||
|
|
||||||
entries.sort(key=lambda r: r["stream_id"])
|
|
||||||
|
|
||||||
add_to_stream = True
|
|
||||||
if entries:
|
|
||||||
add_to_stream = bool(entries[-1]["visibility"]) != is_public
|
|
||||||
|
|
||||||
if add_to_stream:
|
|
||||||
self.db_pool.simple_insert_txn(
|
|
||||||
txn,
|
|
||||||
table="public_room_list_stream",
|
|
||||||
values={
|
|
||||||
"stream_id": next_id,
|
|
||||||
"room_id": room_id,
|
|
||||||
"visibility": is_public,
|
|
||||||
"appservice_id": appservice_id,
|
|
||||||
"network_id": network_id,
|
|
||||||
},
|
|
||||||
)
|
|
||||||
|
|
||||||
async with self._public_room_id_gen.get_next() as next_id:
|
|
||||||
await self.db_pool.runInteraction(
|
|
||||||
"set_room_is_public_appservice",
|
|
||||||
set_room_is_public_appservice_txn,
|
|
||||||
next_id,
|
|
||||||
)
|
|
||||||
self.hs.get_notifier().on_new_replication_data()
|
self.hs.get_notifier().on_new_replication_data()
|
||||||
|
|
||||||
async def add_event_report(
|
async def add_event_report(
|
||||||
@ -1787,9 +1651,6 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
|
|||||||
"get_event_reports_paginate", _get_event_reports_paginate_txn
|
"get_event_reports_paginate", _get_event_reports_paginate_txn
|
||||||
)
|
)
|
||||||
|
|
||||||
def get_current_public_room_stream_id(self):
|
|
||||||
return self._public_room_id_gen.get_current_token()
|
|
||||||
|
|
||||||
async def block_room(self, room_id: str, user_id: str) -> None:
|
async def block_room(self, room_id: str, user_id: str) -> None:
|
||||||
"""Marks the room as blocked. Can be called multiple times.
|
"""Marks the room as blocked. Can be called multiple times.
|
||||||
|
|
||||||
|
@ -12,7 +12,7 @@
|
|||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
SCHEMA_VERSION = 62
|
SCHEMA_VERSION = 63
|
||||||
"""Represents the expectations made by the codebase about the database schema
|
"""Represents the expectations made by the codebase about the database schema
|
||||||
|
|
||||||
This should be incremented whenever the codebase changes its requirements on the
|
This should be incremented whenever the codebase changes its requirements on the
|
||||||
@ -25,6 +25,11 @@ for more information on how this works.
|
|||||||
Changes in SCHEMA_VERSION = 61:
|
Changes in SCHEMA_VERSION = 61:
|
||||||
- The `user_stats_historical` and `room_stats_historical` tables are not written and
|
- The `user_stats_historical` and `room_stats_historical` tables are not written and
|
||||||
are not read (previously, they were written but not read).
|
are not read (previously, they were written but not read).
|
||||||
|
|
||||||
|
Changes in SCHEMA_VERSION = 63:
|
||||||
|
- The `public_room_list_stream` table is not written nor read to
|
||||||
|
(previously, it was written and read to, but not for any significant purpose).
|
||||||
|
https://github.com/matrix-org/synapse/pull/10565
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user