mirror of
https://git.anonymousland.org/anonymousland/synapse.git
synced 2025-11-12 11:26:37 -05:00
Merge branch 'develop' into matthew/gin_work_mem
This commit is contained in:
commit
bb9f0f3cdb
87 changed files with 2822 additions and 1122 deletions
|
|
@ -291,33 +291,33 @@ class SQLBaseStore(object):
|
|||
|
||||
@defer.inlineCallbacks
|
||||
def runInteraction(self, desc, func, *args, **kwargs):
|
||||
"""Wraps the .runInteraction() method on the underlying db_pool."""
|
||||
current_context = LoggingContext.current_context()
|
||||
"""Starts a transaction on the database and runs a given function
|
||||
|
||||
start_time = time.time() * 1000
|
||||
Arguments:
|
||||
desc (str): description of the transaction, for logging and metrics
|
||||
func (func): callback function, which will be called with a
|
||||
database transaction (twisted.enterprise.adbapi.Transaction) as
|
||||
its first argument, followed by `args` and `kwargs`.
|
||||
|
||||
args (list): positional args to pass to `func`
|
||||
kwargs (dict): named args to pass to `func`
|
||||
|
||||
Returns:
|
||||
Deferred: The result of func
|
||||
"""
|
||||
current_context = LoggingContext.current_context()
|
||||
|
||||
after_callbacks = []
|
||||
final_callbacks = []
|
||||
|
||||
def inner_func(conn, *args, **kwargs):
|
||||
with LoggingContext("runInteraction") as context:
|
||||
sql_scheduling_timer.inc_by(time.time() * 1000 - start_time)
|
||||
|
||||
if self.database_engine.is_connection_closed(conn):
|
||||
logger.debug("Reconnecting closed database connection")
|
||||
conn.reconnect()
|
||||
|
||||
current_context.copy_to(context)
|
||||
return self._new_transaction(
|
||||
conn, desc, after_callbacks, final_callbacks, current_context,
|
||||
func, *args, **kwargs
|
||||
)
|
||||
return self._new_transaction(
|
||||
conn, desc, after_callbacks, final_callbacks, current_context,
|
||||
func, *args, **kwargs
|
||||
)
|
||||
|
||||
try:
|
||||
with PreserveLoggingContext():
|
||||
result = yield self._db_pool.runWithConnection(
|
||||
inner_func, *args, **kwargs
|
||||
)
|
||||
result = yield self.runWithConnection(inner_func, *args, **kwargs)
|
||||
|
||||
for after_callback, after_args, after_kwargs in after_callbacks:
|
||||
after_callback(*after_args, **after_kwargs)
|
||||
|
|
@ -329,14 +329,27 @@ class SQLBaseStore(object):
|
|||
|
||||
@defer.inlineCallbacks
|
||||
def runWithConnection(self, func, *args, **kwargs):
|
||||
"""Wraps the .runInteraction() method on the underlying db_pool."""
|
||||
"""Wraps the .runWithConnection() method on the underlying db_pool.
|
||||
|
||||
Arguments:
|
||||
func (func): callback function, which will be called with a
|
||||
database connection (twisted.enterprise.adbapi.Connection) as
|
||||
its first argument, followed by `args` and `kwargs`.
|
||||
args (list): positional args to pass to `func`
|
||||
kwargs (dict): named args to pass to `func`
|
||||
|
||||
Returns:
|
||||
Deferred: The result of func
|
||||
"""
|
||||
current_context = LoggingContext.current_context()
|
||||
|
||||
start_time = time.time() * 1000
|
||||
|
||||
def inner_func(conn, *args, **kwargs):
|
||||
with LoggingContext("runWithConnection") as context:
|
||||
sql_scheduling_timer.inc_by(time.time() * 1000 - start_time)
|
||||
sched_duration_ms = time.time() * 1000 - start_time
|
||||
sql_scheduling_timer.inc_by(sched_duration_ms)
|
||||
current_context.add_database_scheduled(sched_duration_ms)
|
||||
|
||||
if self.database_engine.is_connection_closed(conn):
|
||||
logger.debug("Reconnecting closed database connection")
|
||||
|
|
|
|||
|
|
@ -27,7 +27,7 @@ from synapse.util.logutils import log_function
|
|||
from synapse.util.metrics import Measure
|
||||
from synapse.api.constants import EventTypes
|
||||
from synapse.api.errors import SynapseError
|
||||
from synapse.state import resolve_events
|
||||
from synapse.state import resolve_events_with_factory
|
||||
from synapse.util.caches.descriptors import cached
|
||||
from synapse.types import get_domain_from_id
|
||||
|
||||
|
|
@ -110,7 +110,7 @@ class _EventPeristenceQueue(object):
|
|||
end_item.events_and_contexts.extend(events_and_contexts)
|
||||
return end_item.deferred.observe()
|
||||
|
||||
deferred = ObservableDeferred(defer.Deferred())
|
||||
deferred = ObservableDeferred(defer.Deferred(), consumeErrors=True)
|
||||
|
||||
queue.append(self._EventPersistQueueItem(
|
||||
events_and_contexts=events_and_contexts,
|
||||
|
|
@ -146,18 +146,25 @@ class _EventPeristenceQueue(object):
|
|||
try:
|
||||
queue = self._get_drainining_queue(room_id)
|
||||
for item in queue:
|
||||
# handle_queue_loop runs in the sentinel logcontext, so
|
||||
# there is no need to preserve_fn when running the
|
||||
# callbacks on the deferred.
|
||||
try:
|
||||
ret = yield per_item_callback(item)
|
||||
item.deferred.callback(ret)
|
||||
except Exception as e:
|
||||
item.deferred.errback(e)
|
||||
except Exception:
|
||||
item.deferred.errback()
|
||||
finally:
|
||||
queue = self._event_persist_queues.pop(room_id, None)
|
||||
if queue:
|
||||
self._event_persist_queues[room_id] = queue
|
||||
self._currently_persisting_rooms.discard(room_id)
|
||||
|
||||
preserve_fn(handle_queue_loop)()
|
||||
# set handle_queue_loop off on the background. We don't want to
|
||||
# attribute work done in it to the current request, so we drop the
|
||||
# logcontext altogether.
|
||||
with PreserveLoggingContext():
|
||||
handle_queue_loop()
|
||||
|
||||
def _get_drainining_queue(self, room_id):
|
||||
queue = self._event_persist_queues.setdefault(room_id, deque())
|
||||
|
|
@ -528,6 +535,12 @@ class EventsStore(SQLBaseStore):
|
|||
# the events we have yet to persist, so we need a slightly more
|
||||
# complicated event lookup function than simply looking the events
|
||||
# up in the db.
|
||||
|
||||
logger.info(
|
||||
"Resolving state for %s with %i state sets",
|
||||
room_id, len(state_sets),
|
||||
)
|
||||
|
||||
events_map = {ev.event_id: ev for ev, _ in events_context}
|
||||
|
||||
@defer.inlineCallbacks
|
||||
|
|
@ -550,7 +563,7 @@ class EventsStore(SQLBaseStore):
|
|||
to_return.update(evs)
|
||||
defer.returnValue(to_return)
|
||||
|
||||
current_state = yield resolve_events(
|
||||
current_state = yield resolve_events_with_factory(
|
||||
state_sets,
|
||||
state_map_factory=get_events,
|
||||
)
|
||||
|
|
|
|||
|
|
@ -29,9 +29,6 @@ class MediaRepositoryStore(BackgroundUpdateStore):
|
|||
where_clause='url_cache IS NOT NULL',
|
||||
)
|
||||
|
||||
def get_default_thumbnails(self, top_level_type, sub_type):
|
||||
return []
|
||||
|
||||
def get_local_media(self, media_id):
|
||||
"""Get the metadata for a local piece of media
|
||||
Returns:
|
||||
|
|
@ -176,7 +173,14 @@ class MediaRepositoryStore(BackgroundUpdateStore):
|
|||
desc="store_cached_remote_media",
|
||||
)
|
||||
|
||||
def update_cached_last_access_time(self, origin_id_tuples, time_ts):
|
||||
def update_cached_last_access_time(self, local_media, remote_media, time_ms):
|
||||
"""Updates the last access time of the given media
|
||||
|
||||
Args:
|
||||
local_media (iterable[str]): Set of media_ids
|
||||
remote_media (iterable[(str, str)]): Set of (server_name, media_id)
|
||||
time_ms: Current time in milliseconds
|
||||
"""
|
||||
def update_cache_txn(txn):
|
||||
sql = (
|
||||
"UPDATE remote_media_cache SET last_access_ts = ?"
|
||||
|
|
@ -184,8 +188,18 @@ class MediaRepositoryStore(BackgroundUpdateStore):
|
|||
)
|
||||
|
||||
txn.executemany(sql, (
|
||||
(time_ts, media_origin, media_id)
|
||||
for media_origin, media_id in origin_id_tuples
|
||||
(time_ms, media_origin, media_id)
|
||||
for media_origin, media_id in remote_media
|
||||
))
|
||||
|
||||
sql = (
|
||||
"UPDATE local_media_repository SET last_access_ts = ?"
|
||||
" WHERE media_id = ?"
|
||||
)
|
||||
|
||||
txn.executemany(sql, (
|
||||
(time_ms, media_id)
|
||||
for media_id in local_media
|
||||
))
|
||||
|
||||
return self.runInteraction("update_cached_last_access_time", update_cache_txn)
|
||||
|
|
|
|||
|
|
@ -25,7 +25,7 @@ logger = logging.getLogger(__name__)
|
|||
|
||||
# Remember to update this number every time a change is made to database
|
||||
# schema files, so the users will be informed on server restarts.
|
||||
SCHEMA_VERSION = 46
|
||||
SCHEMA_VERSION = 47
|
||||
|
||||
dir_path = os.path.abspath(os.path.dirname(__file__))
|
||||
|
||||
|
|
|
|||
|
|
@ -591,7 +591,7 @@ class RoomStore(SQLBaseStore):
|
|||
"""
|
||||
UPDATE remote_media_cache
|
||||
SET quarantined_by = ?
|
||||
WHERE media_origin AND media_id = ?
|
||||
WHERE media_origin = ? AND media_id = ?
|
||||
""",
|
||||
(
|
||||
(quarantined_by, origin, media_id)
|
||||
|
|
|
|||
16
synapse/storage/schema/delta/47/last_access_media.sql
Normal file
16
synapse/storage/schema/delta/47/last_access_media.sql
Normal file
|
|
@ -0,0 +1,16 @@
|
|||
/* Copyright 2018 New Vector Ltd
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
ALTER TABLE local_media_repository ADD COLUMN last_access_ts BIGINT;
|
||||
|
|
@ -641,8 +641,12 @@ class UserDirectoryStore(SQLBaseStore):
|
|||
"""
|
||||
|
||||
if self.hs.config.user_directory_search_all_users:
|
||||
join_clause = ""
|
||||
where_clause = "?<>''" # naughty hack to keep the same number of binds
|
||||
# make s.user_id null to keep the ordering algorithm happy
|
||||
join_clause = """
|
||||
CROSS JOIN (SELECT NULL as user_id) AS s
|
||||
"""
|
||||
join_args = ()
|
||||
where_clause = "1=1"
|
||||
else:
|
||||
join_clause = """
|
||||
LEFT JOIN users_in_public_rooms AS p USING (user_id)
|
||||
|
|
@ -651,6 +655,7 @@ class UserDirectoryStore(SQLBaseStore):
|
|||
WHERE user_id = ? AND share_private
|
||||
) AS s USING (user_id)
|
||||
"""
|
||||
join_args = (user_id,)
|
||||
where_clause = "(s.user_id IS NOT NULL OR p.user_id IS NOT NULL)"
|
||||
|
||||
if isinstance(self.database_engine, PostgresEngine):
|
||||
|
|
@ -692,7 +697,7 @@ class UserDirectoryStore(SQLBaseStore):
|
|||
avatar_url IS NULL
|
||||
LIMIT ?
|
||||
""" % (join_clause, where_clause)
|
||||
args = (user_id, full_query, exact_query, prefix_query, limit + 1,)
|
||||
args = join_args + (full_query, exact_query, prefix_query, limit + 1,)
|
||||
elif isinstance(self.database_engine, Sqlite3Engine):
|
||||
search_query = _parse_query_sqlite(search_term)
|
||||
|
||||
|
|
@ -710,7 +715,7 @@ class UserDirectoryStore(SQLBaseStore):
|
|||
avatar_url IS NULL
|
||||
LIMIT ?
|
||||
""" % (join_clause, where_clause)
|
||||
args = (user_id, search_query, limit + 1)
|
||||
args = join_args + (search_query, limit + 1)
|
||||
else:
|
||||
# This should be unreachable.
|
||||
raise Exception("Unrecognized database engine")
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue