mirror of
https://git.anonymousland.org/anonymousland/synapse.git
synced 2025-12-10 12:42:40 -05:00
Replace returnValue with return (#5736)
This commit is contained in:
parent
18a466b84e
commit
4806651744
177 changed files with 1359 additions and 1513 deletions
|
|
@ -498,7 +498,7 @@ class DataStore(
|
|||
)
|
||||
count = yield self.runInteraction("get_users_paginate", self.get_user_count_txn)
|
||||
retval = {"users": users, "total": count}
|
||||
defer.returnValue(retval)
|
||||
return retval
|
||||
|
||||
def search_users(self, term):
|
||||
"""Function to search users list for one or more users with
|
||||
|
|
|
|||
|
|
@ -513,7 +513,7 @@ class SQLBaseStore(object):
|
|||
after_callback(*after_args, **after_kwargs)
|
||||
raise
|
||||
|
||||
defer.returnValue(result)
|
||||
return result
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def runWithConnection(self, func, *args, **kwargs):
|
||||
|
|
@ -553,7 +553,7 @@ class SQLBaseStore(object):
|
|||
with PreserveLoggingContext():
|
||||
result = yield self._db_pool.runWithConnection(inner_func, *args, **kwargs)
|
||||
|
||||
defer.returnValue(result)
|
||||
return result
|
||||
|
||||
@staticmethod
|
||||
def cursor_to_dict(cursor):
|
||||
|
|
@ -615,8 +615,8 @@ class SQLBaseStore(object):
|
|||
# a cursor after we receive an error from the db.
|
||||
if not or_ignore:
|
||||
raise
|
||||
defer.returnValue(False)
|
||||
defer.returnValue(True)
|
||||
return False
|
||||
return True
|
||||
|
||||
@staticmethod
|
||||
def _simple_insert_txn(txn, table, values):
|
||||
|
|
@ -708,7 +708,7 @@ class SQLBaseStore(object):
|
|||
insertion_values,
|
||||
lock=lock,
|
||||
)
|
||||
defer.returnValue(result)
|
||||
return result
|
||||
except self.database_engine.module.IntegrityError as e:
|
||||
attempts += 1
|
||||
if attempts >= 5:
|
||||
|
|
@ -1121,7 +1121,7 @@ class SQLBaseStore(object):
|
|||
results = []
|
||||
|
||||
if not iterable:
|
||||
defer.returnValue(results)
|
||||
return results
|
||||
|
||||
# iterables can not be sliced, so convert it to a list first
|
||||
it_list = list(iterable)
|
||||
|
|
@ -1142,7 +1142,7 @@ class SQLBaseStore(object):
|
|||
|
||||
results.extend(rows)
|
||||
|
||||
defer.returnValue(results)
|
||||
return results
|
||||
|
||||
@classmethod
|
||||
def _simple_select_many_txn(cls, txn, table, column, iterable, keyvalues, retcols):
|
||||
|
|
|
|||
|
|
@ -111,9 +111,9 @@ class AccountDataWorkerStore(SQLBaseStore):
|
|||
)
|
||||
|
||||
if result:
|
||||
defer.returnValue(json.loads(result))
|
||||
return json.loads(result)
|
||||
else:
|
||||
defer.returnValue(None)
|
||||
return None
|
||||
|
||||
@cached(num_args=2)
|
||||
def get_account_data_for_room(self, user_id, room_id):
|
||||
|
|
@ -264,11 +264,9 @@ class AccountDataWorkerStore(SQLBaseStore):
|
|||
on_invalidate=cache_context.invalidate,
|
||||
)
|
||||
if not ignored_account_data:
|
||||
defer.returnValue(False)
|
||||
return False
|
||||
|
||||
defer.returnValue(
|
||||
ignored_user_id in ignored_account_data.get("ignored_users", {})
|
||||
)
|
||||
return ignored_user_id in ignored_account_data.get("ignored_users", {})
|
||||
|
||||
|
||||
class AccountDataStore(AccountDataWorkerStore):
|
||||
|
|
@ -332,7 +330,7 @@ class AccountDataStore(AccountDataWorkerStore):
|
|||
)
|
||||
|
||||
result = self._account_data_id_gen.get_current_token()
|
||||
defer.returnValue(result)
|
||||
return result
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def add_account_data_for_user(self, user_id, account_data_type, content):
|
||||
|
|
@ -373,7 +371,7 @@ class AccountDataStore(AccountDataWorkerStore):
|
|||
)
|
||||
|
||||
result = self._account_data_id_gen.get_current_token()
|
||||
defer.returnValue(result)
|
||||
return result
|
||||
|
||||
def _update_max_stream_id(self, next_id):
|
||||
"""Update the max stream_id
|
||||
|
|
|
|||
|
|
@ -145,7 +145,7 @@ class ApplicationServiceTransactionWorkerStore(
|
|||
for service in as_list:
|
||||
if service.id == res["as_id"]:
|
||||
services.append(service)
|
||||
defer.returnValue(services)
|
||||
return services
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_appservice_state(self, service):
|
||||
|
|
@ -164,9 +164,9 @@ class ApplicationServiceTransactionWorkerStore(
|
|||
desc="get_appservice_state",
|
||||
)
|
||||
if result:
|
||||
defer.returnValue(result.get("state"))
|
||||
return result.get("state")
|
||||
return
|
||||
defer.returnValue(None)
|
||||
return None
|
||||
|
||||
def set_appservice_state(self, service, state):
|
||||
"""Set the application service state.
|
||||
|
|
@ -298,15 +298,13 @@ class ApplicationServiceTransactionWorkerStore(
|
|||
)
|
||||
|
||||
if not entry:
|
||||
defer.returnValue(None)
|
||||
return None
|
||||
|
||||
event_ids = json.loads(entry["event_ids"])
|
||||
|
||||
events = yield self.get_events_as_list(event_ids)
|
||||
|
||||
defer.returnValue(
|
||||
AppServiceTransaction(service=service, id=entry["txn_id"], events=events)
|
||||
)
|
||||
return AppServiceTransaction(service=service, id=entry["txn_id"], events=events)
|
||||
|
||||
def _get_last_txn(self, txn, service_id):
|
||||
txn.execute(
|
||||
|
|
@ -360,7 +358,7 @@ class ApplicationServiceTransactionWorkerStore(
|
|||
|
||||
events = yield self.get_events_as_list(event_ids)
|
||||
|
||||
defer.returnValue((upper_bound, events))
|
||||
return (upper_bound, events)
|
||||
|
||||
|
||||
class ApplicationServiceTransactionStore(ApplicationServiceTransactionWorkerStore):
|
||||
|
|
|
|||
|
|
@ -115,7 +115,7 @@ class BackgroundUpdateStore(SQLBaseStore):
|
|||
" Unscheduling background update task."
|
||||
)
|
||||
self._all_done = True
|
||||
defer.returnValue(None)
|
||||
return None
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def has_completed_background_updates(self):
|
||||
|
|
@ -127,11 +127,11 @@ class BackgroundUpdateStore(SQLBaseStore):
|
|||
# if we've previously determined that there is nothing left to do, that
|
||||
# is easy
|
||||
if self._all_done:
|
||||
defer.returnValue(True)
|
||||
return True
|
||||
|
||||
# obviously, if we have things in our queue, we're not done.
|
||||
if self._background_update_queue:
|
||||
defer.returnValue(False)
|
||||
return False
|
||||
|
||||
# otherwise, check if there are updates to be run. This is important,
|
||||
# as we may be running on a worker which doesn't perform the bg updates
|
||||
|
|
@ -144,9 +144,9 @@ class BackgroundUpdateStore(SQLBaseStore):
|
|||
)
|
||||
if not updates:
|
||||
self._all_done = True
|
||||
defer.returnValue(True)
|
||||
return True
|
||||
|
||||
defer.returnValue(False)
|
||||
return False
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def do_next_background_update(self, desired_duration_ms):
|
||||
|
|
@ -173,14 +173,14 @@ class BackgroundUpdateStore(SQLBaseStore):
|
|||
|
||||
if not self._background_update_queue:
|
||||
# no work left to do
|
||||
defer.returnValue(None)
|
||||
return None
|
||||
|
||||
# pop from the front, and add back to the back
|
||||
update_name = self._background_update_queue.pop(0)
|
||||
self._background_update_queue.append(update_name)
|
||||
|
||||
res = yield self._do_background_update(update_name, desired_duration_ms)
|
||||
defer.returnValue(res)
|
||||
return res
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _do_background_update(self, update_name, desired_duration_ms):
|
||||
|
|
@ -231,7 +231,7 @@ class BackgroundUpdateStore(SQLBaseStore):
|
|||
|
||||
performance.update(items_updated, duration_ms)
|
||||
|
||||
defer.returnValue(len(self._background_update_performance))
|
||||
return len(self._background_update_performance)
|
||||
|
||||
def register_background_update_handler(self, update_name, update_handler):
|
||||
"""Register a handler for doing a background update.
|
||||
|
|
@ -266,7 +266,7 @@ class BackgroundUpdateStore(SQLBaseStore):
|
|||
@defer.inlineCallbacks
|
||||
def noop_update(progress, batch_size):
|
||||
yield self._end_background_update(update_name)
|
||||
defer.returnValue(1)
|
||||
return 1
|
||||
|
||||
self.register_background_update_handler(update_name, noop_update)
|
||||
|
||||
|
|
@ -370,7 +370,7 @@ class BackgroundUpdateStore(SQLBaseStore):
|
|||
logger.info("Adding index %s to %s", index_name, table)
|
||||
yield self.runWithConnection(runner)
|
||||
yield self._end_background_update(update_name)
|
||||
defer.returnValue(1)
|
||||
return 1
|
||||
|
||||
self.register_background_update_handler(update_name, updater)
|
||||
|
||||
|
|
|
|||
|
|
@ -104,7 +104,7 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
|
|||
|
||||
yield self.runWithConnection(f)
|
||||
yield self._end_background_update("user_ips_drop_nonunique_index")
|
||||
defer.returnValue(1)
|
||||
return 1
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _analyze_user_ip(self, progress, batch_size):
|
||||
|
|
@ -121,7 +121,7 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
|
|||
|
||||
yield self._end_background_update("user_ips_analyze")
|
||||
|
||||
defer.returnValue(1)
|
||||
return 1
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _remove_user_ip_dupes(self, progress, batch_size):
|
||||
|
|
@ -291,7 +291,7 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
|
|||
if last:
|
||||
yield self._end_background_update("user_ips_remove_dupes")
|
||||
|
||||
defer.returnValue(batch_size)
|
||||
return batch_size
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def insert_client_ip(
|
||||
|
|
@ -401,7 +401,7 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
|
|||
"device_id": did,
|
||||
"last_seen": last_seen,
|
||||
}
|
||||
defer.returnValue(ret)
|
||||
return ret
|
||||
|
||||
@classmethod
|
||||
def _get_last_client_ip_by_device_txn(cls, txn, user_id, device_id, retcols):
|
||||
|
|
@ -461,14 +461,12 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
|
|||
((row["access_token"], row["ip"]), (row["user_agent"], row["last_seen"]))
|
||||
for row in rows
|
||||
)
|
||||
defer.returnValue(
|
||||
list(
|
||||
{
|
||||
"access_token": access_token,
|
||||
"ip": ip,
|
||||
"user_agent": user_agent,
|
||||
"last_seen": last_seen,
|
||||
}
|
||||
for (access_token, ip), (user_agent, last_seen) in iteritems(results)
|
||||
)
|
||||
return list(
|
||||
{
|
||||
"access_token": access_token,
|
||||
"ip": ip,
|
||||
"user_agent": user_agent,
|
||||
"last_seen": last_seen,
|
||||
}
|
||||
for (access_token, ip), (user_agent, last_seen) in iteritems(results)
|
||||
)
|
||||
|
|
|
|||
|
|
@ -92,7 +92,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
|
|||
user_id, last_deleted_stream_id
|
||||
)
|
||||
if not has_changed:
|
||||
defer.returnValue(0)
|
||||
return 0
|
||||
|
||||
def delete_messages_for_device_txn(txn):
|
||||
sql = (
|
||||
|
|
@ -115,7 +115,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
|
|||
last_deleted_stream_id, up_to_stream_id
|
||||
)
|
||||
|
||||
defer.returnValue(count)
|
||||
return count
|
||||
|
||||
def get_new_device_msgs_for_remote(
|
||||
self, destination, last_stream_id, current_stream_id, limit
|
||||
|
|
@ -263,7 +263,7 @@ class DeviceInboxStore(DeviceInboxWorkerStore, BackgroundUpdateStore):
|
|||
destination, stream_id
|
||||
)
|
||||
|
||||
defer.returnValue(self._device_inbox_id_gen.get_current_token())
|
||||
return self._device_inbox_id_gen.get_current_token()
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def add_messages_from_remote_to_device_inbox(
|
||||
|
|
@ -312,7 +312,7 @@ class DeviceInboxStore(DeviceInboxWorkerStore, BackgroundUpdateStore):
|
|||
for user_id in local_messages_by_user_then_device.keys():
|
||||
self._device_inbox_stream_cache.entity_has_changed(user_id, stream_id)
|
||||
|
||||
defer.returnValue(stream_id)
|
||||
return stream_id
|
||||
|
||||
def _add_messages_to_local_device_inbox_txn(
|
||||
self, txn, stream_id, messages_by_user_then_device
|
||||
|
|
@ -426,4 +426,4 @@ class DeviceInboxStore(DeviceInboxWorkerStore, BackgroundUpdateStore):
|
|||
|
||||
yield self._end_background_update(self.DEVICE_INBOX_STREAM_ID)
|
||||
|
||||
defer.returnValue(1)
|
||||
return 1
|
||||
|
|
|
|||
|
|
@ -71,7 +71,7 @@ class DeviceWorkerStore(SQLBaseStore):
|
|||
desc="get_devices_by_user",
|
||||
)
|
||||
|
||||
defer.returnValue({d["device_id"]: d for d in devices})
|
||||
return {d["device_id"]: d for d in devices}
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_devices_by_remote(self, destination, from_stream_id, limit):
|
||||
|
|
@ -88,7 +88,7 @@ class DeviceWorkerStore(SQLBaseStore):
|
|||
destination, int(from_stream_id)
|
||||
)
|
||||
if not has_changed:
|
||||
defer.returnValue((now_stream_id, []))
|
||||
return (now_stream_id, [])
|
||||
|
||||
# We retrieve n+1 devices from the list of outbound pokes where n is
|
||||
# our outbound device update limit. We then check if the very last
|
||||
|
|
@ -111,7 +111,7 @@ class DeviceWorkerStore(SQLBaseStore):
|
|||
|
||||
# Return an empty list if there are no updates
|
||||
if not updates:
|
||||
defer.returnValue((now_stream_id, []))
|
||||
return (now_stream_id, [])
|
||||
|
||||
# if we have exceeded the limit, we need to exclude any results with the
|
||||
# same stream_id as the last row.
|
||||
|
|
@ -147,13 +147,13 @@ class DeviceWorkerStore(SQLBaseStore):
|
|||
# skip that stream_id and return an empty list, and continue with the next
|
||||
# stream_id next time.
|
||||
if not query_map:
|
||||
defer.returnValue((stream_id_cutoff, []))
|
||||
return (stream_id_cutoff, [])
|
||||
|
||||
results = yield self._get_device_update_edus_by_remote(
|
||||
destination, from_stream_id, query_map
|
||||
)
|
||||
|
||||
defer.returnValue((now_stream_id, results))
|
||||
return (now_stream_id, results)
|
||||
|
||||
def _get_devices_by_remote_txn(
|
||||
self, txn, destination, from_stream_id, now_stream_id, limit
|
||||
|
|
@ -232,7 +232,7 @@ class DeviceWorkerStore(SQLBaseStore):
|
|||
|
||||
results.append(result)
|
||||
|
||||
defer.returnValue(results)
|
||||
return results
|
||||
|
||||
def _get_last_device_update_for_remote_user(
|
||||
self, destination, user_id, from_stream_id
|
||||
|
|
@ -330,7 +330,7 @@ class DeviceWorkerStore(SQLBaseStore):
|
|||
else:
|
||||
results[user_id] = yield self._get_cached_devices_for_user(user_id)
|
||||
|
||||
defer.returnValue((user_ids_not_in_cache, results))
|
||||
return (user_ids_not_in_cache, results)
|
||||
|
||||
@cachedInlineCallbacks(num_args=2, tree=True)
|
||||
def _get_cached_user_device(self, user_id, device_id):
|
||||
|
|
@ -340,7 +340,7 @@ class DeviceWorkerStore(SQLBaseStore):
|
|||
retcol="content",
|
||||
desc="_get_cached_user_device",
|
||||
)
|
||||
defer.returnValue(db_to_json(content))
|
||||
return db_to_json(content)
|
||||
|
||||
@cachedInlineCallbacks()
|
||||
def _get_cached_devices_for_user(self, user_id):
|
||||
|
|
@ -350,9 +350,9 @@ class DeviceWorkerStore(SQLBaseStore):
|
|||
retcols=("device_id", "content"),
|
||||
desc="_get_cached_devices_for_user",
|
||||
)
|
||||
defer.returnValue(
|
||||
{device["device_id"]: db_to_json(device["content"]) for device in devices}
|
||||
)
|
||||
return {
|
||||
device["device_id"]: db_to_json(device["content"]) for device in devices
|
||||
}
|
||||
|
||||
def get_devices_with_keys_by_user(self, user_id):
|
||||
"""Get all devices (with any device keys) for a user
|
||||
|
|
@ -482,7 +482,7 @@ class DeviceWorkerStore(SQLBaseStore):
|
|||
results = {user_id: None for user_id in user_ids}
|
||||
results.update({row["user_id"]: row["stream_id"] for row in rows})
|
||||
|
||||
defer.returnValue(results)
|
||||
return results
|
||||
|
||||
|
||||
class DeviceStore(DeviceWorkerStore, BackgroundUpdateStore):
|
||||
|
|
@ -543,7 +543,7 @@ class DeviceStore(DeviceWorkerStore, BackgroundUpdateStore):
|
|||
"""
|
||||
key = (user_id, device_id)
|
||||
if self.device_id_exists_cache.get(key, None):
|
||||
defer.returnValue(False)
|
||||
return False
|
||||
|
||||
try:
|
||||
inserted = yield self._simple_insert(
|
||||
|
|
@ -557,7 +557,7 @@ class DeviceStore(DeviceWorkerStore, BackgroundUpdateStore):
|
|||
or_ignore=True,
|
||||
)
|
||||
self.device_id_exists_cache.prefill(key, True)
|
||||
defer.returnValue(inserted)
|
||||
return inserted
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
"store_device with device_id=%s(%r) user_id=%s(%r)"
|
||||
|
|
@ -780,7 +780,7 @@ class DeviceStore(DeviceWorkerStore, BackgroundUpdateStore):
|
|||
hosts,
|
||||
stream_id,
|
||||
)
|
||||
defer.returnValue(stream_id)
|
||||
return stream_id
|
||||
|
||||
def _add_device_change_txn(self, txn, user_id, device_ids, hosts, stream_id):
|
||||
now = self._clock.time_msec()
|
||||
|
|
@ -889,4 +889,4 @@ class DeviceStore(DeviceWorkerStore, BackgroundUpdateStore):
|
|||
|
||||
yield self.runWithConnection(f)
|
||||
yield self._end_background_update(DROP_DEVICE_LIST_STREAMS_NON_UNIQUE_INDEXES)
|
||||
defer.returnValue(1)
|
||||
return 1
|
||||
|
|
|
|||
|
|
@ -46,7 +46,7 @@ class DirectoryWorkerStore(SQLBaseStore):
|
|||
)
|
||||
|
||||
if not room_id:
|
||||
defer.returnValue(None)
|
||||
return None
|
||||
return
|
||||
|
||||
servers = yield self._simple_select_onecol(
|
||||
|
|
@ -57,10 +57,10 @@ class DirectoryWorkerStore(SQLBaseStore):
|
|||
)
|
||||
|
||||
if not servers:
|
||||
defer.returnValue(None)
|
||||
return None
|
||||
return
|
||||
|
||||
defer.returnValue(RoomAliasMapping(room_id, room_alias.to_string(), servers))
|
||||
return RoomAliasMapping(room_id, room_alias.to_string(), servers)
|
||||
|
||||
def get_room_alias_creator(self, room_alias):
|
||||
return self._simple_select_one_onecol(
|
||||
|
|
@ -125,7 +125,7 @@ class DirectoryStore(DirectoryWorkerStore):
|
|||
raise SynapseError(
|
||||
409, "Room alias %s already exists" % room_alias.to_string()
|
||||
)
|
||||
defer.returnValue(ret)
|
||||
return ret
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def delete_room_alias(self, room_alias):
|
||||
|
|
@ -133,7 +133,7 @@ class DirectoryStore(DirectoryWorkerStore):
|
|||
"delete_room_alias", self._delete_room_alias_txn, room_alias
|
||||
)
|
||||
|
||||
defer.returnValue(room_id)
|
||||
return room_id
|
||||
|
||||
def _delete_room_alias_txn(self, txn, room_alias):
|
||||
txn.execute(
|
||||
|
|
|
|||
|
|
@ -61,7 +61,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
|
|||
|
||||
row["session_data"] = json.loads(row["session_data"])
|
||||
|
||||
defer.returnValue(row)
|
||||
return row
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def set_e2e_room_key(self, user_id, version, room_id, session_id, room_key):
|
||||
|
|
@ -118,7 +118,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
|
|||
try:
|
||||
version = int(version)
|
||||
except ValueError:
|
||||
defer.returnValue({"rooms": {}})
|
||||
return {"rooms": {}}
|
||||
|
||||
keyvalues = {"user_id": user_id, "version": version}
|
||||
if room_id:
|
||||
|
|
@ -151,7 +151,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
|
|||
"session_data": json.loads(row["session_data"]),
|
||||
}
|
||||
|
||||
defer.returnValue(sessions)
|
||||
return sessions
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def delete_e2e_room_keys(self, user_id, version, room_id=None, session_id=None):
|
||||
|
|
|
|||
|
|
@ -41,7 +41,7 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
|
|||
dict containing "key_json", "device_display_name".
|
||||
"""
|
||||
if not query_list:
|
||||
defer.returnValue({})
|
||||
return {}
|
||||
|
||||
results = yield self.runInteraction(
|
||||
"get_e2e_device_keys",
|
||||
|
|
@ -55,7 +55,7 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
|
|||
for device_id, device_info in iteritems(device_keys):
|
||||
device_info["keys"] = db_to_json(device_info.pop("key_json"))
|
||||
|
||||
defer.returnValue(results)
|
||||
return results
|
||||
|
||||
def _get_e2e_device_keys_txn(
|
||||
self, txn, query_list, include_all_devices=False, include_deleted_devices=False
|
||||
|
|
@ -130,9 +130,7 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
|
|||
desc="add_e2e_one_time_keys_check",
|
||||
)
|
||||
|
||||
defer.returnValue(
|
||||
{(row["algorithm"], row["key_id"]): row["key_json"] for row in rows}
|
||||
)
|
||||
return {(row["algorithm"], row["key_id"]): row["key_json"] for row in rows}
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def add_e2e_one_time_keys(self, user_id, device_id, time_now, new_keys):
|
||||
|
|
|
|||
|
|
@ -131,9 +131,9 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
|
|||
)
|
||||
|
||||
if not rows:
|
||||
defer.returnValue(0)
|
||||
return 0
|
||||
else:
|
||||
defer.returnValue(max(row["depth"] for row in rows))
|
||||
return max(row["depth"] for row in rows)
|
||||
|
||||
def _get_oldest_events_in_room_txn(self, txn, room_id):
|
||||
return self._simple_select_onecol_txn(
|
||||
|
|
@ -169,7 +169,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
|
|||
# make sure that we don't completely ignore the older events.
|
||||
res = res[0:5] + random.sample(res[5:], 5)
|
||||
|
||||
defer.returnValue(res)
|
||||
return res
|
||||
|
||||
def get_latest_event_ids_and_hashes_in_room(self, room_id):
|
||||
"""
|
||||
|
|
@ -411,7 +411,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
|
|||
limit,
|
||||
)
|
||||
events = yield self.get_events_as_list(ids)
|
||||
defer.returnValue(events)
|
||||
return events
|
||||
|
||||
def _get_missing_events(self, txn, room_id, earliest_events, latest_events, limit):
|
||||
|
||||
|
|
@ -463,7 +463,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
|
|||
desc="get_successor_events",
|
||||
)
|
||||
|
||||
defer.returnValue([row["event_id"] for row in rows])
|
||||
return [row["event_id"] for row in rows]
|
||||
|
||||
|
||||
class EventFederationStore(EventFederationWorkerStore):
|
||||
|
|
@ -654,4 +654,4 @@ class EventFederationStore(EventFederationWorkerStore):
|
|||
if not result:
|
||||
yield self._end_background_update(self.EVENT_AUTH_STATE_ONLY)
|
||||
|
||||
defer.returnValue(batch_size)
|
||||
return batch_size
|
||||
|
|
|
|||
|
|
@ -100,7 +100,7 @@ class EventPushActionsWorkerStore(SQLBaseStore):
|
|||
user_id,
|
||||
last_read_event_id,
|
||||
)
|
||||
defer.returnValue(ret)
|
||||
return ret
|
||||
|
||||
def _get_unread_counts_by_receipt_txn(
|
||||
self, txn, room_id, user_id, last_read_event_id
|
||||
|
|
@ -178,7 +178,7 @@ class EventPushActionsWorkerStore(SQLBaseStore):
|
|||
return [r[0] for r in txn]
|
||||
|
||||
ret = yield self.runInteraction("get_push_action_users_in_range", f)
|
||||
defer.returnValue(ret)
|
||||
return ret
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_unread_push_actions_for_user_in_range_for_http(
|
||||
|
|
@ -279,7 +279,7 @@ class EventPushActionsWorkerStore(SQLBaseStore):
|
|||
|
||||
# Take only up to the limit. We have to stop at the limit because
|
||||
# one of the subqueries may have hit the limit.
|
||||
defer.returnValue(notifs[:limit])
|
||||
return notifs[:limit]
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_unread_push_actions_for_user_in_range_for_email(
|
||||
|
|
@ -380,7 +380,7 @@ class EventPushActionsWorkerStore(SQLBaseStore):
|
|||
notifs.sort(key=lambda r: -(r["received_ts"] or 0))
|
||||
|
||||
# Now return the first `limit`
|
||||
defer.returnValue(notifs[:limit])
|
||||
return notifs[:limit]
|
||||
|
||||
def get_if_maybe_push_in_range_for_user(self, user_id, min_stream_ordering):
|
||||
"""A fast check to see if there might be something to push for the
|
||||
|
|
@ -477,7 +477,7 @@ class EventPushActionsWorkerStore(SQLBaseStore):
|
|||
keyvalues={"event_id": event_id},
|
||||
desc="remove_push_actions_from_staging",
|
||||
)
|
||||
defer.returnValue(res)
|
||||
return res
|
||||
except Exception:
|
||||
# this method is called from an exception handler, so propagating
|
||||
# another exception here really isn't helpful - there's nothing
|
||||
|
|
@ -732,7 +732,7 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
|
|||
push_actions = yield self.runInteraction("get_push_actions_for_user", f)
|
||||
for pa in push_actions:
|
||||
pa["actions"] = _deserialize_action(pa["actions"], pa["highlight"])
|
||||
defer.returnValue(push_actions)
|
||||
return push_actions
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_time_of_last_push_action_before(self, stream_ordering):
|
||||
|
|
@ -749,7 +749,7 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
|
|||
return txn.fetchone()
|
||||
|
||||
result = yield self.runInteraction("get_time_of_last_push_action_before", f)
|
||||
defer.returnValue(result[0] if result else None)
|
||||
return result[0] if result else None
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_latest_push_action_stream_ordering(self):
|
||||
|
|
@ -758,7 +758,7 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
|
|||
return txn.fetchone()
|
||||
|
||||
result = yield self.runInteraction("get_latest_push_action_stream_ordering", f)
|
||||
defer.returnValue(result[0] or 0)
|
||||
return result[0] or 0
|
||||
|
||||
def _remove_push_actions_for_event_id_txn(self, txn, room_id, event_id):
|
||||
# Sad that we have to blow away the cache for the whole room here
|
||||
|
|
|
|||
|
|
@ -223,7 +223,7 @@ def _retry_on_integrity_error(func):
|
|||
except self.database_engine.module.IntegrityError:
|
||||
logger.exception("IntegrityError, retrying.")
|
||||
res = yield func(self, *args, delete_existing=True, **kwargs)
|
||||
defer.returnValue(res)
|
||||
return res
|
||||
|
||||
return f
|
||||
|
||||
|
|
@ -309,7 +309,7 @@ class EventsStore(
|
|||
|
||||
max_persisted_id = yield self._stream_id_gen.get_current_token()
|
||||
|
||||
defer.returnValue(max_persisted_id)
|
||||
return max_persisted_id
|
||||
|
||||
@defer.inlineCallbacks
|
||||
@log_function
|
||||
|
|
@ -334,7 +334,7 @@ class EventsStore(
|
|||
yield make_deferred_yieldable(deferred)
|
||||
|
||||
max_persisted_id = yield self._stream_id_gen.get_current_token()
|
||||
defer.returnValue((event.internal_metadata.stream_ordering, max_persisted_id))
|
||||
return (event.internal_metadata.stream_ordering, max_persisted_id)
|
||||
|
||||
def _maybe_start_persisting(self, room_id):
|
||||
@defer.inlineCallbacks
|
||||
|
|
@ -595,7 +595,7 @@ class EventsStore(
|
|||
stale = latest_event_ids & result
|
||||
stale_forward_extremities_counter.observe(len(stale))
|
||||
|
||||
defer.returnValue(result)
|
||||
return result
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _get_events_which_are_prevs(self, event_ids):
|
||||
|
|
@ -633,7 +633,7 @@ class EventsStore(
|
|||
"_get_events_which_are_prevs", _get_events_which_are_prevs_txn, chunk
|
||||
)
|
||||
|
||||
defer.returnValue(results)
|
||||
return results
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _get_prevs_before_rejected(self, event_ids):
|
||||
|
|
@ -695,7 +695,7 @@ class EventsStore(
|
|||
"_get_prevs_before_rejected", _get_prevs_before_rejected_txn, chunk
|
||||
)
|
||||
|
||||
defer.returnValue(existing_prevs)
|
||||
return existing_prevs
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _get_new_state_after_events(
|
||||
|
|
@ -796,7 +796,7 @@ class EventsStore(
|
|||
# If they old and new groups are the same then we don't need to do
|
||||
# anything.
|
||||
if old_state_groups == new_state_groups:
|
||||
defer.returnValue((None, None))
|
||||
return (None, None)
|
||||
|
||||
if len(new_state_groups) == 1 and len(old_state_groups) == 1:
|
||||
# If we're going from one state group to another, lets check if
|
||||
|
|
@ -813,7 +813,7 @@ class EventsStore(
|
|||
# the current state in memory then lets also return that,
|
||||
# but it doesn't matter if we don't.
|
||||
new_state = state_groups_map.get(new_state_group)
|
||||
defer.returnValue((new_state, delta_ids))
|
||||
return (new_state, delta_ids)
|
||||
|
||||
# Now that we have calculated new_state_groups we need to get
|
||||
# their state IDs so we can resolve to a single state set.
|
||||
|
|
@ -825,7 +825,7 @@ class EventsStore(
|
|||
if len(new_state_groups) == 1:
|
||||
# If there is only one state group, then we know what the current
|
||||
# state is.
|
||||
defer.returnValue((state_groups_map[new_state_groups.pop()], None))
|
||||
return (state_groups_map[new_state_groups.pop()], None)
|
||||
|
||||
# Ok, we need to defer to the state handler to resolve our state sets.
|
||||
|
||||
|
|
@ -854,7 +854,7 @@ class EventsStore(
|
|||
state_res_store=StateResolutionStore(self),
|
||||
)
|
||||
|
||||
defer.returnValue((res.state, None))
|
||||
return (res.state, None)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _calculate_state_delta(self, room_id, current_state):
|
||||
|
|
@ -877,7 +877,7 @@ class EventsStore(
|
|||
if ev_id != existing_state.get(key)
|
||||
}
|
||||
|
||||
defer.returnValue((to_delete, to_insert))
|
||||
return (to_delete, to_insert)
|
||||
|
||||
@log_function
|
||||
def _persist_events_txn(
|
||||
|
|
@ -1564,7 +1564,7 @@ class EventsStore(
|
|||
return count
|
||||
|
||||
ret = yield self.runInteraction("count_messages", _count_messages)
|
||||
defer.returnValue(ret)
|
||||
return ret
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def count_daily_sent_messages(self):
|
||||
|
|
@ -1585,7 +1585,7 @@ class EventsStore(
|
|||
return count
|
||||
|
||||
ret = yield self.runInteraction("count_daily_sent_messages", _count_messages)
|
||||
defer.returnValue(ret)
|
||||
return ret
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def count_daily_active_rooms(self):
|
||||
|
|
@ -1600,7 +1600,7 @@ class EventsStore(
|
|||
return count
|
||||
|
||||
ret = yield self.runInteraction("count_daily_active_rooms", _count)
|
||||
defer.returnValue(ret)
|
||||
return ret
|
||||
|
||||
def get_current_backfill_token(self):
|
||||
"""The current minimum token that backfilled events have reached"""
|
||||
|
|
@ -2183,7 +2183,7 @@ class EventsStore(
|
|||
"""
|
||||
to_1, so_1 = yield self._get_event_ordering(event_id1)
|
||||
to_2, so_2 = yield self._get_event_ordering(event_id2)
|
||||
defer.returnValue((to_1, so_1) > (to_2, so_2))
|
||||
return (to_1, so_1) > (to_2, so_2)
|
||||
|
||||
@cachedInlineCallbacks(max_entries=5000)
|
||||
def _get_event_ordering(self, event_id):
|
||||
|
|
@ -2197,9 +2197,7 @@ class EventsStore(
|
|||
if not res:
|
||||
raise SynapseError(404, "Could not find event %s" % (event_id,))
|
||||
|
||||
defer.returnValue(
|
||||
(int(res["topological_ordering"]), int(res["stream_ordering"]))
|
||||
)
|
||||
return (int(res["topological_ordering"]), int(res["stream_ordering"]))
|
||||
|
||||
def get_all_updated_current_state_deltas(self, from_token, to_token, limit):
|
||||
def get_all_updated_current_state_deltas_txn(txn):
|
||||
|
|
|
|||
|
|
@ -135,7 +135,7 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore):
|
|||
if not result:
|
||||
yield self._end_background_update(self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME)
|
||||
|
||||
defer.returnValue(result)
|
||||
return result
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _background_reindex_origin_server_ts(self, progress, batch_size):
|
||||
|
|
@ -212,7 +212,7 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore):
|
|||
if not result:
|
||||
yield self._end_background_update(self.EVENT_ORIGIN_SERVER_TS_NAME)
|
||||
|
||||
defer.returnValue(result)
|
||||
return result
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _cleanup_extremities_bg_update(self, progress, batch_size):
|
||||
|
|
@ -396,4 +396,4 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore):
|
|||
"_cleanup_extremities_bg_update_drop_table", _drop_table_txn
|
||||
)
|
||||
|
||||
defer.returnValue(num_handled)
|
||||
return num_handled
|
||||
|
|
|
|||
|
|
@ -157,7 +157,7 @@ class EventsWorkerStore(SQLBaseStore):
|
|||
if event is None and not allow_none:
|
||||
raise NotFoundError("Could not find event %s" % (event_id,))
|
||||
|
||||
defer.returnValue(event)
|
||||
return event
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_events(
|
||||
|
|
@ -187,7 +187,7 @@ class EventsWorkerStore(SQLBaseStore):
|
|||
allow_rejected=allow_rejected,
|
||||
)
|
||||
|
||||
defer.returnValue({e.event_id: e for e in events})
|
||||
return {e.event_id: e for e in events}
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_events_as_list(
|
||||
|
|
@ -217,7 +217,7 @@ class EventsWorkerStore(SQLBaseStore):
|
|||
"""
|
||||
|
||||
if not event_ids:
|
||||
defer.returnValue([])
|
||||
return []
|
||||
|
||||
# there may be duplicates so we cast the list to a set
|
||||
event_entry_map = yield self._get_events_from_cache_or_db(
|
||||
|
|
@ -305,7 +305,7 @@ class EventsWorkerStore(SQLBaseStore):
|
|||
event.unsigned["prev_content"] = prev.content
|
||||
event.unsigned["prev_sender"] = prev.sender
|
||||
|
||||
defer.returnValue(events)
|
||||
return events
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _get_events_from_cache_or_db(self, event_ids, allow_rejected=False):
|
||||
|
|
@ -452,7 +452,7 @@ class EventsWorkerStore(SQLBaseStore):
|
|||
without having to create a new transaction for each request for events.
|
||||
"""
|
||||
if not events:
|
||||
defer.returnValue({})
|
||||
return {}
|
||||
|
||||
events_d = defer.Deferred()
|
||||
with self._event_fetch_lock:
|
||||
|
|
@ -496,7 +496,7 @@ class EventsWorkerStore(SQLBaseStore):
|
|||
)
|
||||
)
|
||||
|
||||
defer.returnValue({e.event.event_id: e for e in res if e})
|
||||
return {e.event.event_id: e for e in res if e}
|
||||
|
||||
def _fetch_event_rows(self, txn, event_ids):
|
||||
"""Fetch event rows from the database
|
||||
|
|
@ -609,7 +609,7 @@ class EventsWorkerStore(SQLBaseStore):
|
|||
|
||||
self._get_event_cache.prefill((original_ev.event_id,), cache_entry)
|
||||
|
||||
defer.returnValue(cache_entry)
|
||||
return cache_entry
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _maybe_redact_event_row(self, original_ev, redactions):
|
||||
|
|
@ -679,7 +679,7 @@ class EventsWorkerStore(SQLBaseStore):
|
|||
desc="have_events_in_timeline",
|
||||
)
|
||||
|
||||
defer.returnValue(set(r["event_id"] for r in rows))
|
||||
return set(r["event_id"] for r in rows)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def have_seen_events(self, event_ids):
|
||||
|
|
@ -705,7 +705,7 @@ class EventsWorkerStore(SQLBaseStore):
|
|||
input_iterator = iter(event_ids)
|
||||
for chunk in iter(lambda: list(itertools.islice(input_iterator, 100)), []):
|
||||
yield self.runInteraction("have_seen_events", have_seen_events_txn, chunk)
|
||||
defer.returnValue(results)
|
||||
return results
|
||||
|
||||
def get_seen_events_with_rejections(self, event_ids):
|
||||
"""Given a list of event ids, check if we rejected them.
|
||||
|
|
@ -816,4 +816,4 @@ class EventsWorkerStore(SQLBaseStore):
|
|||
# it.
|
||||
complexity_v1 = round(state_events / 500, 2)
|
||||
|
||||
defer.returnValue({"v1": complexity_v1})
|
||||
return {"v1": complexity_v1}
|
||||
|
|
|
|||
|
|
@ -15,8 +15,6 @@
|
|||
|
||||
from canonicaljson import encode_canonical_json
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse.api.errors import Codes, SynapseError
|
||||
from synapse.util.caches.descriptors import cachedInlineCallbacks
|
||||
|
||||
|
|
@ -41,7 +39,7 @@ class FilteringStore(SQLBaseStore):
|
|||
desc="get_user_filter",
|
||||
)
|
||||
|
||||
defer.returnValue(db_to_json(def_json))
|
||||
return db_to_json(def_json)
|
||||
|
||||
def add_user_filter(self, user_localpart, user_filter):
|
||||
def_json = encode_canonical_json(user_filter)
|
||||
|
|
|
|||
|
|
@ -307,15 +307,13 @@ class GroupServerStore(SQLBaseStore):
|
|||
desc="get_group_categories",
|
||||
)
|
||||
|
||||
defer.returnValue(
|
||||
{
|
||||
row["category_id"]: {
|
||||
"is_public": row["is_public"],
|
||||
"profile": json.loads(row["profile"]),
|
||||
}
|
||||
for row in rows
|
||||
return {
|
||||
row["category_id"]: {
|
||||
"is_public": row["is_public"],
|
||||
"profile": json.loads(row["profile"]),
|
||||
}
|
||||
)
|
||||
for row in rows
|
||||
}
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_group_category(self, group_id, category_id):
|
||||
|
|
@ -328,7 +326,7 @@ class GroupServerStore(SQLBaseStore):
|
|||
|
||||
category["profile"] = json.loads(category["profile"])
|
||||
|
||||
defer.returnValue(category)
|
||||
return category
|
||||
|
||||
def upsert_group_category(self, group_id, category_id, profile, is_public):
|
||||
"""Add/update room category for group
|
||||
|
|
@ -370,15 +368,13 @@ class GroupServerStore(SQLBaseStore):
|
|||
desc="get_group_roles",
|
||||
)
|
||||
|
||||
defer.returnValue(
|
||||
{
|
||||
row["role_id"]: {
|
||||
"is_public": row["is_public"],
|
||||
"profile": json.loads(row["profile"]),
|
||||
}
|
||||
for row in rows
|
||||
return {
|
||||
row["role_id"]: {
|
||||
"is_public": row["is_public"],
|
||||
"profile": json.loads(row["profile"]),
|
||||
}
|
||||
)
|
||||
for row in rows
|
||||
}
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_group_role(self, group_id, role_id):
|
||||
|
|
@ -391,7 +387,7 @@ class GroupServerStore(SQLBaseStore):
|
|||
|
||||
role["profile"] = json.loads(role["profile"])
|
||||
|
||||
defer.returnValue(role)
|
||||
return role
|
||||
|
||||
def upsert_group_role(self, group_id, role_id, profile, is_public):
|
||||
"""Add/remove user role
|
||||
|
|
@ -960,7 +956,7 @@ class GroupServerStore(SQLBaseStore):
|
|||
_register_user_group_membership_txn,
|
||||
next_id,
|
||||
)
|
||||
defer.returnValue(res)
|
||||
return res
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def create_group(
|
||||
|
|
@ -1057,9 +1053,9 @@ class GroupServerStore(SQLBaseStore):
|
|||
|
||||
now = int(self._clock.time_msec())
|
||||
if row and now < row["valid_until_ms"]:
|
||||
defer.returnValue(json.loads(row["attestation_json"]))
|
||||
return json.loads(row["attestation_json"])
|
||||
|
||||
defer.returnValue(None)
|
||||
return None
|
||||
|
||||
def get_joined_groups(self, user_id):
|
||||
return self._simple_select_onecol(
|
||||
|
|
|
|||
|
|
@ -173,7 +173,7 @@ class MonthlyActiveUsersStore(SQLBaseStore):
|
|||
)
|
||||
if user_id:
|
||||
count = count + 1
|
||||
defer.returnValue(count)
|
||||
return count
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def upsert_monthly_active_user(self, user_id):
|
||||
|
|
|
|||
|
|
@ -90,9 +90,7 @@ class PresenceStore(SQLBaseStore):
|
|||
presence_states,
|
||||
)
|
||||
|
||||
defer.returnValue(
|
||||
(stream_orderings[-1], self._presence_id_gen.get_current_token())
|
||||
)
|
||||
return (stream_orderings[-1], self._presence_id_gen.get_current_token())
|
||||
|
||||
def _update_presence_txn(self, txn, stream_orderings, presence_states):
|
||||
for stream_id, state in zip(stream_orderings, presence_states):
|
||||
|
|
@ -180,7 +178,7 @@ class PresenceStore(SQLBaseStore):
|
|||
for row in rows:
|
||||
row["currently_active"] = bool(row["currently_active"])
|
||||
|
||||
defer.returnValue({row["user_id"]: UserPresenceState(**row) for row in rows})
|
||||
return {row["user_id"]: UserPresenceState(**row) for row in rows}
|
||||
|
||||
def get_current_presence_token(self):
|
||||
return self._presence_id_gen.get_current_token()
|
||||
|
|
|
|||
|
|
@ -34,15 +34,13 @@ class ProfileWorkerStore(SQLBaseStore):
|
|||
except StoreError as e:
|
||||
if e.code == 404:
|
||||
# no match
|
||||
defer.returnValue(ProfileInfo(None, None))
|
||||
return ProfileInfo(None, None)
|
||||
return
|
||||
else:
|
||||
raise
|
||||
|
||||
defer.returnValue(
|
||||
ProfileInfo(
|
||||
avatar_url=profile["avatar_url"], display_name=profile["displayname"]
|
||||
)
|
||||
return ProfileInfo(
|
||||
avatar_url=profile["avatar_url"], display_name=profile["displayname"]
|
||||
)
|
||||
|
||||
def get_profile_displayname(self, user_localpart):
|
||||
|
|
@ -168,7 +166,7 @@ class ProfileStore(ProfileWorkerStore):
|
|||
)
|
||||
|
||||
if res:
|
||||
defer.returnValue(True)
|
||||
return True
|
||||
|
||||
res = yield self._simple_select_one_onecol(
|
||||
table="group_invites",
|
||||
|
|
@ -179,4 +177,4 @@ class ProfileStore(ProfileWorkerStore):
|
|||
)
|
||||
|
||||
if res:
|
||||
defer.returnValue(True)
|
||||
return True
|
||||
|
|
|
|||
|
|
@ -120,7 +120,7 @@ class PushRulesWorkerStore(
|
|||
|
||||
rules = _load_rules(rows, enabled_map)
|
||||
|
||||
defer.returnValue(rules)
|
||||
return rules
|
||||
|
||||
@cachedInlineCallbacks(max_entries=5000)
|
||||
def get_push_rules_enabled_for_user(self, user_id):
|
||||
|
|
@ -130,9 +130,7 @@ class PushRulesWorkerStore(
|
|||
retcols=("user_name", "rule_id", "enabled"),
|
||||
desc="get_push_rules_enabled_for_user",
|
||||
)
|
||||
defer.returnValue(
|
||||
{r["rule_id"]: False if r["enabled"] == 0 else True for r in results}
|
||||
)
|
||||
return {r["rule_id"]: False if r["enabled"] == 0 else True for r in results}
|
||||
|
||||
def have_push_rules_changed_for_user(self, user_id, last_id):
|
||||
if not self.push_rules_stream_cache.has_entity_changed(user_id, last_id):
|
||||
|
|
@ -160,7 +158,7 @@ class PushRulesWorkerStore(
|
|||
)
|
||||
def bulk_get_push_rules(self, user_ids):
|
||||
if not user_ids:
|
||||
defer.returnValue({})
|
||||
return {}
|
||||
|
||||
results = {user_id: [] for user_id in user_ids}
|
||||
|
||||
|
|
@ -182,7 +180,7 @@ class PushRulesWorkerStore(
|
|||
for user_id, rules in results.items():
|
||||
results[user_id] = _load_rules(rules, enabled_map_by_user.get(user_id, {}))
|
||||
|
||||
defer.returnValue(results)
|
||||
return results
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def move_push_rule_from_room_to_room(self, new_room_id, user_id, rule):
|
||||
|
|
@ -253,7 +251,7 @@ class PushRulesWorkerStore(
|
|||
result = yield self._bulk_get_push_rules_for_room(
|
||||
event.room_id, state_group, current_state_ids, event=event
|
||||
)
|
||||
defer.returnValue(result)
|
||||
return result
|
||||
|
||||
@cachedInlineCallbacks(num_args=2, cache_context=True)
|
||||
def _bulk_get_push_rules_for_room(
|
||||
|
|
@ -312,7 +310,7 @@ class PushRulesWorkerStore(
|
|||
|
||||
rules_by_user = {k: v for k, v in rules_by_user.items() if v is not None}
|
||||
|
||||
defer.returnValue(rules_by_user)
|
||||
return rules_by_user
|
||||
|
||||
@cachedList(
|
||||
cached_method_name="get_push_rules_enabled_for_user",
|
||||
|
|
@ -322,7 +320,7 @@ class PushRulesWorkerStore(
|
|||
)
|
||||
def bulk_get_push_rules_enabled(self, user_ids):
|
||||
if not user_ids:
|
||||
defer.returnValue({})
|
||||
return {}
|
||||
|
||||
results = {user_id: {} for user_id in user_ids}
|
||||
|
||||
|
|
@ -336,7 +334,7 @@ class PushRulesWorkerStore(
|
|||
for row in rows:
|
||||
enabled = bool(row["enabled"])
|
||||
results.setdefault(row["user_name"], {})[row["rule_id"]] = enabled
|
||||
defer.returnValue(results)
|
||||
return results
|
||||
|
||||
|
||||
class PushRuleStore(PushRulesWorkerStore):
|
||||
|
|
|
|||
|
|
@ -63,7 +63,7 @@ class PusherWorkerStore(SQLBaseStore):
|
|||
ret = yield self._simple_select_one_onecol(
|
||||
"pushers", {"user_name": user_id}, "id", allow_none=True
|
||||
)
|
||||
defer.returnValue(ret is not None)
|
||||
return ret is not None
|
||||
|
||||
def get_pushers_by_app_id_and_pushkey(self, app_id, pushkey):
|
||||
return self.get_pushers_by({"app_id": app_id, "pushkey": pushkey})
|
||||
|
|
@ -95,7 +95,7 @@ class PusherWorkerStore(SQLBaseStore):
|
|||
],
|
||||
desc="get_pushers_by",
|
||||
)
|
||||
defer.returnValue(self._decode_pushers_rows(ret))
|
||||
return self._decode_pushers_rows(ret)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_all_pushers(self):
|
||||
|
|
@ -106,7 +106,7 @@ class PusherWorkerStore(SQLBaseStore):
|
|||
return self._decode_pushers_rows(rows)
|
||||
|
||||
rows = yield self.runInteraction("get_all_pushers", get_pushers)
|
||||
defer.returnValue(rows)
|
||||
return rows
|
||||
|
||||
def get_all_updated_pushers(self, last_id, current_id, limit):
|
||||
if last_id == current_id:
|
||||
|
|
@ -205,7 +205,7 @@ class PusherWorkerStore(SQLBaseStore):
|
|||
result = {user_id: False for user_id in user_ids}
|
||||
result.update({r["user_name"]: True for r in rows})
|
||||
|
||||
defer.returnValue(result)
|
||||
return result
|
||||
|
||||
|
||||
class PusherStore(PusherWorkerStore):
|
||||
|
|
@ -343,7 +343,7 @@ class PusherStore(PusherWorkerStore):
|
|||
"throttle_ms": row["throttle_ms"],
|
||||
}
|
||||
|
||||
defer.returnValue(params_by_room)
|
||||
return params_by_room
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def set_throttle_params(self, pusher_id, room_id, params):
|
||||
|
|
|
|||
|
|
@ -58,7 +58,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
|
|||
@cachedInlineCallbacks()
|
||||
def get_users_with_read_receipts_in_room(self, room_id):
|
||||
receipts = yield self.get_receipts_for_room(room_id, "m.read")
|
||||
defer.returnValue(set(r["user_id"] for r in receipts))
|
||||
return set(r["user_id"] for r in receipts)
|
||||
|
||||
@cached(num_args=2)
|
||||
def get_receipts_for_room(self, room_id, receipt_type):
|
||||
|
|
@ -92,7 +92,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
|
|||
desc="get_receipts_for_user",
|
||||
)
|
||||
|
||||
defer.returnValue({row["room_id"]: row["event_id"] for row in rows})
|
||||
return {row["room_id"]: row["event_id"] for row in rows}
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_receipts_for_user_with_orderings(self, user_id, receipt_type):
|
||||
|
|
@ -110,16 +110,14 @@ class ReceiptsWorkerStore(SQLBaseStore):
|
|||
return txn.fetchall()
|
||||
|
||||
rows = yield self.runInteraction("get_receipts_for_user_with_orderings", f)
|
||||
defer.returnValue(
|
||||
{
|
||||
row[0]: {
|
||||
"event_id": row[1],
|
||||
"topological_ordering": row[2],
|
||||
"stream_ordering": row[3],
|
||||
}
|
||||
for row in rows
|
||||
return {
|
||||
row[0]: {
|
||||
"event_id": row[1],
|
||||
"topological_ordering": row[2],
|
||||
"stream_ordering": row[3],
|
||||
}
|
||||
)
|
||||
for row in rows
|
||||
}
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_linearized_receipts_for_rooms(self, room_ids, to_key, from_key=None):
|
||||
|
|
@ -147,7 +145,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
|
|||
room_ids, to_key, from_key=from_key
|
||||
)
|
||||
|
||||
defer.returnValue([ev for res in results.values() for ev in res])
|
||||
return [ev for res in results.values() for ev in res]
|
||||
|
||||
def get_linearized_receipts_for_room(self, room_id, to_key, from_key=None):
|
||||
"""Get receipts for a single room for sending to clients.
|
||||
|
|
@ -197,7 +195,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
|
|||
rows = yield self.runInteraction("get_linearized_receipts_for_room", f)
|
||||
|
||||
if not rows:
|
||||
defer.returnValue([])
|
||||
return []
|
||||
|
||||
content = {}
|
||||
for row in rows:
|
||||
|
|
@ -205,9 +203,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
|
|||
row["user_id"]
|
||||
] = json.loads(row["data"])
|
||||
|
||||
defer.returnValue(
|
||||
[{"type": "m.receipt", "room_id": room_id, "content": content}]
|
||||
)
|
||||
return [{"type": "m.receipt", "room_id": room_id, "content": content}]
|
||||
|
||||
@cachedList(
|
||||
cached_method_name="_get_linearized_receipts_for_room",
|
||||
|
|
@ -217,7 +213,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
|
|||
)
|
||||
def _get_linearized_receipts_for_rooms(self, room_ids, to_key, from_key=None):
|
||||
if not room_ids:
|
||||
defer.returnValue({})
|
||||
return {}
|
||||
|
||||
def f(txn):
|
||||
if from_key:
|
||||
|
|
@ -264,7 +260,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
|
|||
room_id: [results[room_id]] if room_id in results else []
|
||||
for room_id in room_ids
|
||||
}
|
||||
defer.returnValue(results)
|
||||
return results
|
||||
|
||||
def get_all_updated_receipts(self, last_id, current_id, limit=None):
|
||||
if last_id == current_id:
|
||||
|
|
@ -468,7 +464,7 @@ class ReceiptsStore(ReceiptsWorkerStore):
|
|||
)
|
||||
|
||||
if event_ts is None:
|
||||
defer.returnValue(None)
|
||||
return None
|
||||
|
||||
now = self._clock.time_msec()
|
||||
logger.debug(
|
||||
|
|
@ -482,7 +478,7 @@ class ReceiptsStore(ReceiptsWorkerStore):
|
|||
|
||||
max_persisted_id = self._receipts_id_gen.get_current_token()
|
||||
|
||||
defer.returnValue((stream_id, max_persisted_id))
|
||||
return (stream_id, max_persisted_id)
|
||||
|
||||
def insert_graph_receipt(self, room_id, receipt_type, user_id, event_ids, data):
|
||||
return self.runInteraction(
|
||||
|
|
|
|||
|
|
@ -75,12 +75,12 @@ class RegistrationWorkerStore(SQLBaseStore):
|
|||
|
||||
info = yield self.get_user_by_id(user_id)
|
||||
if not info:
|
||||
defer.returnValue(False)
|
||||
return False
|
||||
|
||||
now = self.clock.time_msec()
|
||||
trial_duration_ms = self.config.mau_trial_days * 24 * 60 * 60 * 1000
|
||||
is_trial = (now - info["creation_ts"] * 1000) < trial_duration_ms
|
||||
defer.returnValue(is_trial)
|
||||
return is_trial
|
||||
|
||||
@cached()
|
||||
def get_user_by_access_token(self, token):
|
||||
|
|
@ -115,7 +115,7 @@ class RegistrationWorkerStore(SQLBaseStore):
|
|||
allow_none=True,
|
||||
desc="get_expiration_ts_for_user",
|
||||
)
|
||||
defer.returnValue(res)
|
||||
return res
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def set_account_validity_for_user(
|
||||
|
|
@ -190,7 +190,7 @@ class RegistrationWorkerStore(SQLBaseStore):
|
|||
desc="get_user_from_renewal_token",
|
||||
)
|
||||
|
||||
defer.returnValue(res)
|
||||
return res
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_renewal_token_for_user(self, user_id):
|
||||
|
|
@ -209,7 +209,7 @@ class RegistrationWorkerStore(SQLBaseStore):
|
|||
desc="get_renewal_token_for_user",
|
||||
)
|
||||
|
||||
defer.returnValue(res)
|
||||
return res
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_users_expiring_soon(self):
|
||||
|
|
@ -237,7 +237,7 @@ class RegistrationWorkerStore(SQLBaseStore):
|
|||
self.config.account_validity.renew_at,
|
||||
)
|
||||
|
||||
defer.returnValue(res)
|
||||
return res
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def set_renewal_mail_status(self, user_id, email_sent):
|
||||
|
|
@ -280,7 +280,7 @@ class RegistrationWorkerStore(SQLBaseStore):
|
|||
desc="is_server_admin",
|
||||
)
|
||||
|
||||
defer.returnValue(res if res else False)
|
||||
return res if res else False
|
||||
|
||||
def _query_for_auth(self, txn, token):
|
||||
sql = (
|
||||
|
|
@ -311,7 +311,7 @@ class RegistrationWorkerStore(SQLBaseStore):
|
|||
res = yield self.runInteraction(
|
||||
"is_support_user", self.is_support_user_txn, user_id
|
||||
)
|
||||
defer.returnValue(res)
|
||||
return res
|
||||
|
||||
def is_support_user_txn(self, txn, user_id):
|
||||
res = self._simple_select_one_onecol_txn(
|
||||
|
|
@ -349,7 +349,7 @@ class RegistrationWorkerStore(SQLBaseStore):
|
|||
return 0
|
||||
|
||||
ret = yield self.runInteraction("count_users", _count_users)
|
||||
defer.returnValue(ret)
|
||||
return ret
|
||||
|
||||
def count_daily_user_type(self):
|
||||
"""
|
||||
|
|
@ -395,7 +395,7 @@ class RegistrationWorkerStore(SQLBaseStore):
|
|||
return count
|
||||
|
||||
ret = yield self.runInteraction("count_users", _count_users)
|
||||
defer.returnValue(ret)
|
||||
return ret
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def find_next_generated_user_id_localpart(self):
|
||||
|
|
@ -425,7 +425,7 @@ class RegistrationWorkerStore(SQLBaseStore):
|
|||
if i not in found:
|
||||
return i
|
||||
|
||||
defer.returnValue(
|
||||
return (
|
||||
(
|
||||
yield self.runInteraction(
|
||||
"find_next_generated_user_id", _find_next_generated_user_id
|
||||
|
|
@ -447,7 +447,7 @@ class RegistrationWorkerStore(SQLBaseStore):
|
|||
user_id = yield self.runInteraction(
|
||||
"get_user_id_by_threepid", self.get_user_id_by_threepid_txn, medium, address
|
||||
)
|
||||
defer.returnValue(user_id)
|
||||
return user_id
|
||||
|
||||
def get_user_id_by_threepid_txn(self, txn, medium, address):
|
||||
"""Returns user id from threepid
|
||||
|
|
@ -487,7 +487,7 @@ class RegistrationWorkerStore(SQLBaseStore):
|
|||
["medium", "address", "validated_at", "added_at"],
|
||||
"user_get_threepids",
|
||||
)
|
||||
defer.returnValue(ret)
|
||||
return ret
|
||||
|
||||
def user_delete_threepid(self, user_id, medium, address):
|
||||
return self._simple_delete(
|
||||
|
|
@ -677,7 +677,7 @@ class RegistrationStore(
|
|||
if end:
|
||||
yield self._end_background_update("users_set_deactivated_flag")
|
||||
|
||||
defer.returnValue(batch_size)
|
||||
return batch_size
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def add_access_token_to_user(self, user_id, token, device_id, valid_until_ms):
|
||||
|
|
@ -957,7 +957,7 @@ class RegistrationStore(
|
|||
desc="is_guest",
|
||||
)
|
||||
|
||||
defer.returnValue(res if res else False)
|
||||
return res if res else False
|
||||
|
||||
def add_user_pending_deactivation(self, user_id):
|
||||
"""
|
||||
|
|
@ -1024,7 +1024,7 @@ class RegistrationStore(
|
|||
|
||||
yield self._end_background_update("user_threepids_grandfather")
|
||||
|
||||
defer.returnValue(1)
|
||||
return 1
|
||||
|
||||
def get_threepid_validation_session(
|
||||
self, medium, client_secret, address=None, sid=None, validated=True
|
||||
|
|
@ -1337,4 +1337,4 @@ class RegistrationStore(
|
|||
)
|
||||
|
||||
# Convert the integer into a boolean.
|
||||
defer.returnValue(res == 1)
|
||||
return res == 1
|
||||
|
|
|
|||
|
|
@ -17,8 +17,6 @@ import logging
|
|||
|
||||
import attr
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse.api.constants import RelationTypes
|
||||
from synapse.api.errors import SynapseError
|
||||
from synapse.storage._base import SQLBaseStore
|
||||
|
|
@ -363,7 +361,7 @@ class RelationsWorkerStore(SQLBaseStore):
|
|||
return
|
||||
|
||||
edit_event = yield self.get_event(edit_id, allow_none=True)
|
||||
defer.returnValue(edit_event)
|
||||
return edit_event
|
||||
|
||||
def has_user_annotated_event(self, parent_id, event_type, aggregation_key, sender):
|
||||
"""Check if a user has already annotated an event with the same key
|
||||
|
|
|
|||
|
|
@ -193,14 +193,12 @@ class RoomWorkerStore(SQLBaseStore):
|
|||
)
|
||||
|
||||
if row:
|
||||
defer.returnValue(
|
||||
RatelimitOverride(
|
||||
messages_per_second=row["messages_per_second"],
|
||||
burst_count=row["burst_count"],
|
||||
)
|
||||
return RatelimitOverride(
|
||||
messages_per_second=row["messages_per_second"],
|
||||
burst_count=row["burst_count"],
|
||||
)
|
||||
else:
|
||||
defer.returnValue(None)
|
||||
return None
|
||||
|
||||
|
||||
class RoomStore(RoomWorkerStore, SearchStore):
|
||||
|
|
|
|||
|
|
@ -108,7 +108,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
|
|||
room_id, on_invalidate=cache_context.invalidate
|
||||
)
|
||||
hosts = frozenset(get_domain_from_id(user_id) for user_id in user_ids)
|
||||
defer.returnValue(hosts)
|
||||
return hosts
|
||||
|
||||
@cached(max_entries=100000, iterable=True)
|
||||
def get_users_in_room(self, room_id):
|
||||
|
|
@ -253,8 +253,8 @@ class RoomMemberWorkerStore(EventsWorkerStore):
|
|||
invites = yield self.get_invited_rooms_for_user(user_id)
|
||||
for invite in invites:
|
||||
if invite.room_id == room_id:
|
||||
defer.returnValue(invite)
|
||||
defer.returnValue(None)
|
||||
return invite
|
||||
return None
|
||||
|
||||
def get_rooms_for_user_where_membership_is(self, user_id, membership_list):
|
||||
""" Get all the rooms for this user where the membership for this user
|
||||
|
|
@ -347,11 +347,9 @@ class RoomMemberWorkerStore(EventsWorkerStore):
|
|||
rooms = yield self.get_rooms_for_user_where_membership_is(
|
||||
user_id, membership_list=[Membership.JOIN]
|
||||
)
|
||||
defer.returnValue(
|
||||
frozenset(
|
||||
GetRoomsForUserWithStreamOrdering(r.room_id, r.stream_ordering)
|
||||
for r in rooms
|
||||
)
|
||||
return frozenset(
|
||||
GetRoomsForUserWithStreamOrdering(r.room_id, r.stream_ordering)
|
||||
for r in rooms
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
|
|
@ -361,7 +359,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
|
|||
rooms = yield self.get_rooms_for_user_with_stream_ordering(
|
||||
user_id, on_invalidate=on_invalidate
|
||||
)
|
||||
defer.returnValue(frozenset(r.room_id for r in rooms))
|
||||
return frozenset(r.room_id for r in rooms)
|
||||
|
||||
@cachedInlineCallbacks(max_entries=500000, cache_context=True, iterable=True)
|
||||
def get_users_who_share_room_with_user(self, user_id, cache_context):
|
||||
|
|
@ -378,7 +376,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
|
|||
)
|
||||
user_who_share_room.update(user_ids)
|
||||
|
||||
defer.returnValue(user_who_share_room)
|
||||
return user_who_share_room
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_joined_users_from_context(self, event, context):
|
||||
|
|
@ -394,7 +392,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
|
|||
result = yield self._get_joined_users_from_context(
|
||||
event.room_id, state_group, current_state_ids, event=event, context=context
|
||||
)
|
||||
defer.returnValue(result)
|
||||
return result
|
||||
|
||||
def get_joined_users_from_state(self, room_id, state_entry):
|
||||
state_group = state_entry.state_group
|
||||
|
|
@ -508,7 +506,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
|
|||
avatar_url=to_ascii(event.content.get("avatar_url", None)),
|
||||
)
|
||||
|
||||
defer.returnValue(users_in_room)
|
||||
return users_in_room
|
||||
|
||||
@cachedInlineCallbacks(max_entries=10000)
|
||||
def is_host_joined(self, room_id, host):
|
||||
|
|
@ -533,14 +531,14 @@ class RoomMemberWorkerStore(EventsWorkerStore):
|
|||
rows = yield self._execute("is_host_joined", None, sql, room_id, like_clause)
|
||||
|
||||
if not rows:
|
||||
defer.returnValue(False)
|
||||
return False
|
||||
|
||||
user_id = rows[0][0]
|
||||
if get_domain_from_id(user_id) != host:
|
||||
# This can only happen if the host name has something funky in it
|
||||
raise Exception("Invalid host name")
|
||||
|
||||
defer.returnValue(True)
|
||||
return True
|
||||
|
||||
@cachedInlineCallbacks()
|
||||
def was_host_joined(self, room_id, host):
|
||||
|
|
@ -573,14 +571,14 @@ class RoomMemberWorkerStore(EventsWorkerStore):
|
|||
rows = yield self._execute("was_host_joined", None, sql, room_id, like_clause)
|
||||
|
||||
if not rows:
|
||||
defer.returnValue(False)
|
||||
return False
|
||||
|
||||
user_id = rows[0][0]
|
||||
if get_domain_from_id(user_id) != host:
|
||||
# This can only happen if the host name has something funky in it
|
||||
raise Exception("Invalid host name")
|
||||
|
||||
defer.returnValue(True)
|
||||
return True
|
||||
|
||||
def get_joined_hosts(self, room_id, state_entry):
|
||||
state_group = state_entry.state_group
|
||||
|
|
@ -607,7 +605,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
|
|||
cache = self._get_joined_hosts_cache(room_id)
|
||||
joined_hosts = yield cache.get_destinations(state_entry)
|
||||
|
||||
defer.returnValue(joined_hosts)
|
||||
return joined_hosts
|
||||
|
||||
@cached(max_entries=10000)
|
||||
def _get_joined_hosts_cache(self, room_id):
|
||||
|
|
@ -637,7 +635,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
|
|||
return rows[0][0]
|
||||
|
||||
count = yield self.runInteraction("did_forget_membership", f)
|
||||
defer.returnValue(count == 0)
|
||||
return count == 0
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_rooms_user_has_been_in(self, user_id):
|
||||
|
|
@ -847,7 +845,7 @@ class RoomMemberStore(RoomMemberWorkerStore):
|
|||
if not result:
|
||||
yield self._end_background_update(_MEMBERSHIP_PROFILE_UPDATE_NAME)
|
||||
|
||||
defer.returnValue(result)
|
||||
return result
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _background_current_state_membership(self, progress, batch_size):
|
||||
|
|
@ -905,7 +903,7 @@ class RoomMemberStore(RoomMemberWorkerStore):
|
|||
if finished:
|
||||
yield self._end_background_update(_CURRENT_STATE_MEMBERSHIP_UPDATE_NAME)
|
||||
|
||||
defer.returnValue(row_count)
|
||||
return row_count
|
||||
|
||||
|
||||
class _JoinedHostsCache(object):
|
||||
|
|
@ -933,7 +931,7 @@ class _JoinedHostsCache(object):
|
|||
state_entry(synapse.state._StateCacheEntry)
|
||||
"""
|
||||
if state_entry.state_group == self.state_group:
|
||||
defer.returnValue(frozenset(self.hosts_to_joined_users))
|
||||
return frozenset(self.hosts_to_joined_users)
|
||||
|
||||
with (yield self.linearizer.queue(())):
|
||||
if state_entry.state_group == self.state_group:
|
||||
|
|
@ -970,7 +968,7 @@ class _JoinedHostsCache(object):
|
|||
else:
|
||||
self.state_group = object()
|
||||
self._len = sum(len(v) for v in itervalues(self.hosts_to_joined_users))
|
||||
defer.returnValue(frozenset(self.hosts_to_joined_users))
|
||||
return frozenset(self.hosts_to_joined_users)
|
||||
|
||||
def __len__(self):
|
||||
return self._len
|
||||
|
|
|
|||
|
|
@ -166,7 +166,7 @@ class SearchStore(BackgroundUpdateStore):
|
|||
if not result:
|
||||
yield self._end_background_update(self.EVENT_SEARCH_UPDATE_NAME)
|
||||
|
||||
defer.returnValue(result)
|
||||
return result
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _background_reindex_gin_search(self, progress, batch_size):
|
||||
|
|
@ -209,7 +209,7 @@ class SearchStore(BackgroundUpdateStore):
|
|||
yield self.runWithConnection(create_index)
|
||||
|
||||
yield self._end_background_update(self.EVENT_SEARCH_USE_GIN_POSTGRES_NAME)
|
||||
defer.returnValue(1)
|
||||
return 1
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _background_reindex_search_order(self, progress, batch_size):
|
||||
|
|
@ -287,7 +287,7 @@ class SearchStore(BackgroundUpdateStore):
|
|||
if not finished:
|
||||
yield self._end_background_update(self.EVENT_SEARCH_ORDER_UPDATE_NAME)
|
||||
|
||||
defer.returnValue(num_rows)
|
||||
return num_rows
|
||||
|
||||
def store_event_search_txn(self, txn, event, key, value):
|
||||
"""Add event to the search table
|
||||
|
|
@ -454,17 +454,15 @@ class SearchStore(BackgroundUpdateStore):
|
|||
|
||||
count = sum(row["count"] for row in count_results if row["room_id"] in room_ids)
|
||||
|
||||
defer.returnValue(
|
||||
{
|
||||
"results": [
|
||||
{"event": event_map[r["event_id"]], "rank": r["rank"]}
|
||||
for r in results
|
||||
if r["event_id"] in event_map
|
||||
],
|
||||
"highlights": highlights,
|
||||
"count": count,
|
||||
}
|
||||
)
|
||||
return {
|
||||
"results": [
|
||||
{"event": event_map[r["event_id"]], "rank": r["rank"]}
|
||||
for r in results
|
||||
if r["event_id"] in event_map
|
||||
],
|
||||
"highlights": highlights,
|
||||
"count": count,
|
||||
}
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def search_rooms(self, room_ids, search_term, keys, limit, pagination_token=None):
|
||||
|
|
@ -599,22 +597,20 @@ class SearchStore(BackgroundUpdateStore):
|
|||
|
||||
count = sum(row["count"] for row in count_results if row["room_id"] in room_ids)
|
||||
|
||||
defer.returnValue(
|
||||
{
|
||||
"results": [
|
||||
{
|
||||
"event": event_map[r["event_id"]],
|
||||
"rank": r["rank"],
|
||||
"pagination_token": "%s,%s"
|
||||
% (r["origin_server_ts"], r["stream_ordering"]),
|
||||
}
|
||||
for r in results
|
||||
if r["event_id"] in event_map
|
||||
],
|
||||
"highlights": highlights,
|
||||
"count": count,
|
||||
}
|
||||
)
|
||||
return {
|
||||
"results": [
|
||||
{
|
||||
"event": event_map[r["event_id"]],
|
||||
"rank": r["rank"],
|
||||
"pagination_token": "%s,%s"
|
||||
% (r["origin_server_ts"], r["stream_ordering"]),
|
||||
}
|
||||
for r in results
|
||||
if r["event_id"] in event_map
|
||||
],
|
||||
"highlights": highlights,
|
||||
"count": count,
|
||||
}
|
||||
|
||||
def _find_highlights_in_postgres(self, search_query, events):
|
||||
"""Given a list of events and a search term, return a list of words
|
||||
|
|
|
|||
|
|
@ -59,7 +59,7 @@ class SignatureWorkerStore(SQLBaseStore):
|
|||
for e_id, h in hashes.items()
|
||||
}
|
||||
|
||||
defer.returnValue(list(hashes.items()))
|
||||
return list(hashes.items())
|
||||
|
||||
def _get_event_reference_hashes_txn(self, txn, event_id):
|
||||
"""Get all the hashes for a given PDU.
|
||||
|
|
|
|||
|
|
@ -422,7 +422,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
|
|||
|
||||
# Retrieve the room's create event
|
||||
create_event = yield self.get_create_event_for_room(room_id)
|
||||
defer.returnValue(create_event.content.get("room_version", "1"))
|
||||
return create_event.content.get("room_version", "1")
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_room_predecessor(self, room_id):
|
||||
|
|
@ -442,7 +442,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
|
|||
create_event = yield self.get_create_event_for_room(room_id)
|
||||
|
||||
# Return predecessor if present
|
||||
defer.returnValue(create_event.content.get("predecessor", None))
|
||||
return create_event.content.get("predecessor", None)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_create_event_for_room(self, room_id):
|
||||
|
|
@ -466,7 +466,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
|
|||
|
||||
# Retrieve the room's create event and return
|
||||
create_event = yield self.get_event(create_id)
|
||||
defer.returnValue(create_event)
|
||||
return create_event
|
||||
|
||||
@cached(max_entries=100000, iterable=True)
|
||||
def get_current_state_ids(self, room_id):
|
||||
|
|
@ -563,7 +563,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
|
|||
if not event:
|
||||
return
|
||||
|
||||
defer.returnValue(event.content.get("canonical_alias"))
|
||||
return event.content.get("canonical_alias")
|
||||
|
||||
@cached(max_entries=10000, iterable=True)
|
||||
def get_state_group_delta(self, state_group):
|
||||
|
|
@ -613,14 +613,14 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
|
|||
dict of state_group_id -> (dict of (type, state_key) -> event id)
|
||||
"""
|
||||
if not event_ids:
|
||||
defer.returnValue({})
|
||||
return {}
|
||||
|
||||
event_to_groups = yield self._get_state_group_for_events(event_ids)
|
||||
|
||||
groups = set(itervalues(event_to_groups))
|
||||
group_to_state = yield self._get_state_for_groups(groups)
|
||||
|
||||
defer.returnValue(group_to_state)
|
||||
return group_to_state
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_state_ids_for_group(self, state_group):
|
||||
|
|
@ -634,7 +634,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
|
|||
"""
|
||||
group_to_state = yield self._get_state_for_groups((state_group,))
|
||||
|
||||
defer.returnValue(group_to_state[state_group])
|
||||
return group_to_state[state_group]
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_state_groups(self, room_id, event_ids):
|
||||
|
|
@ -645,7 +645,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
|
|||
dict of state_group_id -> list of state events.
|
||||
"""
|
||||
if not event_ids:
|
||||
defer.returnValue({})
|
||||
return {}
|
||||
|
||||
group_to_ids = yield self.get_state_groups_ids(room_id, event_ids)
|
||||
|
||||
|
|
@ -658,16 +658,14 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
|
|||
get_prev_content=False,
|
||||
)
|
||||
|
||||
defer.returnValue(
|
||||
{
|
||||
group: [
|
||||
state_event_map[v]
|
||||
for v in itervalues(event_id_map)
|
||||
if v in state_event_map
|
||||
]
|
||||
for group, event_id_map in iteritems(group_to_ids)
|
||||
}
|
||||
)
|
||||
return {
|
||||
group: [
|
||||
state_event_map[v]
|
||||
for v in itervalues(event_id_map)
|
||||
if v in state_event_map
|
||||
]
|
||||
for group, event_id_map in iteritems(group_to_ids)
|
||||
}
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _get_state_groups_from_groups(self, groups, state_filter):
|
||||
|
|
@ -694,7 +692,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
|
|||
)
|
||||
results.update(res)
|
||||
|
||||
defer.returnValue(results)
|
||||
return results
|
||||
|
||||
def _get_state_groups_from_groups_txn(
|
||||
self, txn, groups, state_filter=StateFilter.all()
|
||||
|
|
@ -829,7 +827,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
|
|||
for event_id, group in iteritems(event_to_groups)
|
||||
}
|
||||
|
||||
defer.returnValue({event: event_to_state[event] for event in event_ids})
|
||||
return {event: event_to_state[event] for event in event_ids}
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_state_ids_for_events(self, event_ids, state_filter=StateFilter.all()):
|
||||
|
|
@ -855,7 +853,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
|
|||
for event_id, group in iteritems(event_to_groups)
|
||||
}
|
||||
|
||||
defer.returnValue({event: event_to_state[event] for event in event_ids})
|
||||
return {event: event_to_state[event] for event in event_ids}
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_state_for_event(self, event_id, state_filter=StateFilter.all()):
|
||||
|
|
@ -871,7 +869,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
|
|||
A deferred dict from (type, state_key) -> state_event
|
||||
"""
|
||||
state_map = yield self.get_state_for_events([event_id], state_filter)
|
||||
defer.returnValue(state_map[event_id])
|
||||
return state_map[event_id]
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_state_ids_for_event(self, event_id, state_filter=StateFilter.all()):
|
||||
|
|
@ -887,7 +885,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
|
|||
A deferred dict from (type, state_key) -> state_event
|
||||
"""
|
||||
state_map = yield self.get_state_ids_for_events([event_id], state_filter)
|
||||
defer.returnValue(state_map[event_id])
|
||||
return state_map[event_id]
|
||||
|
||||
@cached(max_entries=50000)
|
||||
def _get_state_group_for_event(self, event_id):
|
||||
|
|
@ -917,7 +915,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
|
|||
desc="_get_state_group_for_events",
|
||||
)
|
||||
|
||||
defer.returnValue({row["event_id"]: row["state_group"] for row in rows})
|
||||
return {row["event_id"]: row["state_group"] for row in rows}
|
||||
|
||||
def _get_state_for_group_using_cache(self, cache, group, state_filter):
|
||||
"""Checks if group is in cache. See `_get_state_for_groups`
|
||||
|
|
@ -997,7 +995,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
|
|||
incomplete_groups = incomplete_groups_m | incomplete_groups_nm
|
||||
|
||||
if not incomplete_groups:
|
||||
defer.returnValue(state)
|
||||
return state
|
||||
|
||||
cache_sequence_nm = self._state_group_cache.sequence
|
||||
cache_sequence_m = self._state_group_members_cache.sequence
|
||||
|
|
@ -1024,7 +1022,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
|
|||
# everything we need from the database anyway.
|
||||
state[group] = state_filter.filter_state(group_state_dict)
|
||||
|
||||
defer.returnValue(state)
|
||||
return state
|
||||
|
||||
def _get_state_for_groups_using_cache(self, groups, cache, state_filter):
|
||||
"""Gets the state at each of a list of state groups, optionally
|
||||
|
|
@ -1498,7 +1496,7 @@ class StateStore(StateGroupWorkerStore, BackgroundUpdateStore):
|
|||
self.STATE_GROUP_DEDUPLICATION_UPDATE_NAME
|
||||
)
|
||||
|
||||
defer.returnValue(result * BATCH_SIZE_SCALE_FACTOR)
|
||||
return result * BATCH_SIZE_SCALE_FACTOR
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _background_index_state(self, progress, batch_size):
|
||||
|
|
@ -1528,4 +1526,4 @@ class StateStore(StateGroupWorkerStore, BackgroundUpdateStore):
|
|||
|
||||
yield self._end_background_update(self.STATE_GROUP_INDEX_UPDATE_NAME)
|
||||
|
||||
defer.returnValue(1)
|
||||
return 1
|
||||
|
|
|
|||
|
|
@ -66,7 +66,7 @@ class StatsStore(StateDeltasStore):
|
|||
|
||||
if not self.stats_enabled:
|
||||
yield self._end_background_update("populate_stats_createtables")
|
||||
defer.returnValue(1)
|
||||
return 1
|
||||
|
||||
# Get all the rooms that we want to process.
|
||||
def _make_staging_area(txn):
|
||||
|
|
@ -120,7 +120,7 @@ class StatsStore(StateDeltasStore):
|
|||
self.get_earliest_token_for_room_stats.invalidate_all()
|
||||
|
||||
yield self._end_background_update("populate_stats_createtables")
|
||||
defer.returnValue(1)
|
||||
return 1
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _populate_stats_cleanup(self, progress, batch_size):
|
||||
|
|
@ -129,7 +129,7 @@ class StatsStore(StateDeltasStore):
|
|||
"""
|
||||
if not self.stats_enabled:
|
||||
yield self._end_background_update("populate_stats_cleanup")
|
||||
defer.returnValue(1)
|
||||
return 1
|
||||
|
||||
position = yield self._simple_select_one_onecol(
|
||||
TEMP_TABLE + "_position", None, "position"
|
||||
|
|
@ -143,14 +143,14 @@ class StatsStore(StateDeltasStore):
|
|||
yield self.runInteraction("populate_stats_cleanup", _delete_staging_area)
|
||||
|
||||
yield self._end_background_update("populate_stats_cleanup")
|
||||
defer.returnValue(1)
|
||||
return 1
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _populate_stats_process_rooms(self, progress, batch_size):
|
||||
|
||||
if not self.stats_enabled:
|
||||
yield self._end_background_update("populate_stats_process_rooms")
|
||||
defer.returnValue(1)
|
||||
return 1
|
||||
|
||||
# If we don't have progress filed, delete everything.
|
||||
if not progress:
|
||||
|
|
@ -186,7 +186,7 @@ class StatsStore(StateDeltasStore):
|
|||
# No more rooms -- complete the transaction.
|
||||
if not rooms_to_work_on:
|
||||
yield self._end_background_update("populate_stats_process_rooms")
|
||||
defer.returnValue(1)
|
||||
return 1
|
||||
|
||||
logger.info(
|
||||
"Processing the next %d rooms of %d remaining",
|
||||
|
|
@ -303,9 +303,9 @@ class StatsStore(StateDeltasStore):
|
|||
|
||||
if processed_event_count > batch_size:
|
||||
# Don't process any more rooms, we've hit our batch size.
|
||||
defer.returnValue(processed_event_count)
|
||||
return processed_event_count
|
||||
|
||||
defer.returnValue(processed_event_count)
|
||||
return processed_event_count
|
||||
|
||||
def delete_all_stats(self):
|
||||
"""
|
||||
|
|
|
|||
|
|
@ -300,7 +300,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
|||
)
|
||||
|
||||
if not room_ids:
|
||||
defer.returnValue({})
|
||||
return {}
|
||||
|
||||
results = {}
|
||||
room_ids = list(room_ids)
|
||||
|
|
@ -323,7 +323,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
|||
)
|
||||
results.update(dict(zip(rm_ids, res)))
|
||||
|
||||
defer.returnValue(results)
|
||||
return results
|
||||
|
||||
def get_rooms_that_changed(self, room_ids, from_key):
|
||||
"""Given a list of rooms and a token, return rooms where there may have
|
||||
|
|
@ -364,7 +364,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
|||
the chunk of events returned.
|
||||
"""
|
||||
if from_key == to_key:
|
||||
defer.returnValue(([], from_key))
|
||||
return ([], from_key)
|
||||
|
||||
from_id = RoomStreamToken.parse_stream_token(from_key).stream
|
||||
to_id = RoomStreamToken.parse_stream_token(to_key).stream
|
||||
|
|
@ -374,7 +374,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
|||
)
|
||||
|
||||
if not has_changed:
|
||||
defer.returnValue(([], from_key))
|
||||
return ([], from_key)
|
||||
|
||||
def f(txn):
|
||||
sql = (
|
||||
|
|
@ -407,7 +407,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
|||
# get.
|
||||
key = from_key
|
||||
|
||||
defer.returnValue((ret, key))
|
||||
return (ret, key)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_membership_changes_for_user(self, user_id, from_key, to_key):
|
||||
|
|
@ -415,14 +415,14 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
|||
to_id = RoomStreamToken.parse_stream_token(to_key).stream
|
||||
|
||||
if from_key == to_key:
|
||||
defer.returnValue([])
|
||||
return []
|
||||
|
||||
if from_id:
|
||||
has_changed = self._membership_stream_cache.has_entity_changed(
|
||||
user_id, int(from_id)
|
||||
)
|
||||
if not has_changed:
|
||||
defer.returnValue([])
|
||||
return []
|
||||
|
||||
def f(txn):
|
||||
sql = (
|
||||
|
|
@ -447,7 +447,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
|||
|
||||
self._set_before_and_after(ret, rows, topo_order=False)
|
||||
|
||||
defer.returnValue(ret)
|
||||
return ret
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_recent_events_for_room(self, room_id, limit, end_token):
|
||||
|
|
@ -477,7 +477,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
|||
|
||||
self._set_before_and_after(events, rows)
|
||||
|
||||
defer.returnValue((events, token))
|
||||
return (events, token)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_recent_event_ids_for_room(self, room_id, limit, end_token):
|
||||
|
|
@ -496,7 +496,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
|||
"""
|
||||
# Allow a zero limit here, and no-op.
|
||||
if limit == 0:
|
||||
defer.returnValue(([], end_token))
|
||||
return ([], end_token)
|
||||
|
||||
end_token = RoomStreamToken.parse(end_token)
|
||||
|
||||
|
|
@ -511,7 +511,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
|||
# We want to return the results in ascending order.
|
||||
rows.reverse()
|
||||
|
||||
defer.returnValue((rows, token))
|
||||
return (rows, token)
|
||||
|
||||
def get_room_event_after_stream_ordering(self, room_id, stream_ordering):
|
||||
"""Gets details of the first event in a room at or after a stream ordering
|
||||
|
|
@ -549,12 +549,12 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
|||
"""
|
||||
token = yield self.get_room_max_stream_ordering()
|
||||
if room_id is None:
|
||||
defer.returnValue("s%d" % (token,))
|
||||
return "s%d" % (token,)
|
||||
else:
|
||||
topo = yield self.runInteraction(
|
||||
"_get_max_topological_txn", self._get_max_topological_txn, room_id
|
||||
)
|
||||
defer.returnValue("t%d-%d" % (topo, token))
|
||||
return "t%d-%d" % (topo, token)
|
||||
|
||||
def get_stream_token_for_event(self, event_id):
|
||||
"""The stream token for an event
|
||||
|
|
@ -674,14 +674,12 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
|||
[e for e in results["after"]["event_ids"]], get_prev_content=True
|
||||
)
|
||||
|
||||
defer.returnValue(
|
||||
{
|
||||
"events_before": events_before,
|
||||
"events_after": events_after,
|
||||
"start": results["before"]["token"],
|
||||
"end": results["after"]["token"],
|
||||
}
|
||||
)
|
||||
return {
|
||||
"events_before": events_before,
|
||||
"events_after": events_after,
|
||||
"start": results["before"]["token"],
|
||||
"end": results["after"]["token"],
|
||||
}
|
||||
|
||||
def _get_events_around_txn(
|
||||
self, txn, room_id, event_id, before_limit, after_limit, event_filter
|
||||
|
|
@ -785,7 +783,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
|||
|
||||
events = yield self.get_events_as_list(event_ids)
|
||||
|
||||
defer.returnValue((upper_bound, events))
|
||||
return (upper_bound, events)
|
||||
|
||||
def get_federation_out_pos(self, typ):
|
||||
return self._simple_select_one_onecol(
|
||||
|
|
@ -939,7 +937,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
|||
|
||||
self._set_before_and_after(events, rows)
|
||||
|
||||
defer.returnValue((events, token))
|
||||
return (events, token)
|
||||
|
||||
|
||||
class StreamStore(StreamWorkerStore):
|
||||
|
|
|
|||
|
|
@ -66,7 +66,7 @@ class TagsWorkerStore(AccountDataWorkerStore):
|
|||
room_id string, tag string and content string.
|
||||
"""
|
||||
if last_id == current_id:
|
||||
defer.returnValue([])
|
||||
return []
|
||||
|
||||
def get_all_updated_tags_txn(txn):
|
||||
sql = (
|
||||
|
|
@ -107,7 +107,7 @@ class TagsWorkerStore(AccountDataWorkerStore):
|
|||
)
|
||||
results.extend(tags)
|
||||
|
||||
defer.returnValue(results)
|
||||
return results
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_updated_tags(self, user_id, stream_id):
|
||||
|
|
@ -135,7 +135,7 @@ class TagsWorkerStore(AccountDataWorkerStore):
|
|||
user_id, int(stream_id)
|
||||
)
|
||||
if not changed:
|
||||
defer.returnValue({})
|
||||
return {}
|
||||
|
||||
room_ids = yield self.runInteraction("get_updated_tags", get_updated_tags_txn)
|
||||
|
||||
|
|
@ -145,7 +145,7 @@ class TagsWorkerStore(AccountDataWorkerStore):
|
|||
for room_id in room_ids:
|
||||
results[room_id] = tags_by_room.get(room_id, {})
|
||||
|
||||
defer.returnValue(results)
|
||||
return results
|
||||
|
||||
def get_tags_for_room(self, user_id, room_id):
|
||||
"""Get all the tags for the given room
|
||||
|
|
@ -194,7 +194,7 @@ class TagsStore(TagsWorkerStore):
|
|||
self.get_tags_for_user.invalidate((user_id,))
|
||||
|
||||
result = self._account_data_id_gen.get_current_token()
|
||||
defer.returnValue(result)
|
||||
return result
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def remove_tag_from_room(self, user_id, room_id, tag):
|
||||
|
|
@ -217,7 +217,7 @@ class TagsStore(TagsWorkerStore):
|
|||
self.get_tags_for_user.invalidate((user_id,))
|
||||
|
||||
result = self._account_data_id_gen.get_current_token()
|
||||
defer.returnValue(result)
|
||||
return result
|
||||
|
||||
def _update_revision_txn(self, txn, user_id, room_id, next_id):
|
||||
"""Update the latest revision of the tags for the given user and room.
|
||||
|
|
|
|||
|
|
@ -147,7 +147,7 @@ class TransactionStore(SQLBaseStore):
|
|||
|
||||
result = self._destination_retry_cache.get(destination, SENTINEL)
|
||||
if result is not SENTINEL:
|
||||
defer.returnValue(result)
|
||||
return result
|
||||
|
||||
result = yield self.runInteraction(
|
||||
"get_destination_retry_timings",
|
||||
|
|
@ -158,7 +158,7 @@ class TransactionStore(SQLBaseStore):
|
|||
# We don't hugely care about race conditions between getting and
|
||||
# invalidating the cache, since we time out fairly quickly anyway.
|
||||
self._destination_retry_cache[destination] = result
|
||||
defer.returnValue(result)
|
||||
return result
|
||||
|
||||
def _get_destination_retry_timings(self, txn, destination):
|
||||
result = self._simple_select_one_txn(
|
||||
|
|
|
|||
|
|
@ -109,7 +109,7 @@ class UserDirectoryStore(StateDeltasStore, BackgroundUpdateStore):
|
|||
yield self._simple_insert(TEMP_TABLE + "_position", {"position": new_pos})
|
||||
|
||||
yield self._end_background_update("populate_user_directory_createtables")
|
||||
defer.returnValue(1)
|
||||
return 1
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _populate_user_directory_cleanup(self, progress, batch_size):
|
||||
|
|
@ -131,7 +131,7 @@ class UserDirectoryStore(StateDeltasStore, BackgroundUpdateStore):
|
|||
)
|
||||
|
||||
yield self._end_background_update("populate_user_directory_cleanup")
|
||||
defer.returnValue(1)
|
||||
return 1
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _populate_user_directory_process_rooms(self, progress, batch_size):
|
||||
|
|
@ -177,7 +177,7 @@ class UserDirectoryStore(StateDeltasStore, BackgroundUpdateStore):
|
|||
# No more rooms -- complete the transaction.
|
||||
if not rooms_to_work_on:
|
||||
yield self._end_background_update("populate_user_directory_process_rooms")
|
||||
defer.returnValue(1)
|
||||
return 1
|
||||
|
||||
logger.info(
|
||||
"Processing the next %d rooms of %d remaining"
|
||||
|
|
@ -257,9 +257,9 @@ class UserDirectoryStore(StateDeltasStore, BackgroundUpdateStore):
|
|||
|
||||
if processed_event_count > batch_size:
|
||||
# Don't process any more rooms, we've hit our batch size.
|
||||
defer.returnValue(processed_event_count)
|
||||
return processed_event_count
|
||||
|
||||
defer.returnValue(processed_event_count)
|
||||
return processed_event_count
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _populate_user_directory_process_users(self, progress, batch_size):
|
||||
|
|
@ -268,7 +268,7 @@ class UserDirectoryStore(StateDeltasStore, BackgroundUpdateStore):
|
|||
"""
|
||||
if not self.hs.config.user_directory_search_all_users:
|
||||
yield self._end_background_update("populate_user_directory_process_users")
|
||||
defer.returnValue(1)
|
||||
return 1
|
||||
|
||||
def _get_next_batch(txn):
|
||||
sql = "SELECT user_id FROM %s LIMIT %s" % (
|
||||
|
|
@ -298,7 +298,7 @@ class UserDirectoryStore(StateDeltasStore, BackgroundUpdateStore):
|
|||
# No more users -- complete the transaction.
|
||||
if not users_to_work_on:
|
||||
yield self._end_background_update("populate_user_directory_process_users")
|
||||
defer.returnValue(1)
|
||||
return 1
|
||||
|
||||
logger.info(
|
||||
"Processing the next %d users of %d remaining"
|
||||
|
|
@ -322,7 +322,7 @@ class UserDirectoryStore(StateDeltasStore, BackgroundUpdateStore):
|
|||
progress,
|
||||
)
|
||||
|
||||
defer.returnValue(len(users_to_work_on))
|
||||
return len(users_to_work_on)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def is_room_world_readable_or_publicly_joinable(self, room_id):
|
||||
|
|
@ -344,16 +344,16 @@ class UserDirectoryStore(StateDeltasStore, BackgroundUpdateStore):
|
|||
join_rule_ev = yield self.get_event(join_rules_id, allow_none=True)
|
||||
if join_rule_ev:
|
||||
if join_rule_ev.content.get("join_rule") == JoinRules.PUBLIC:
|
||||
defer.returnValue(True)
|
||||
return True
|
||||
|
||||
hist_vis_id = current_state_ids.get((EventTypes.RoomHistoryVisibility, ""))
|
||||
if hist_vis_id:
|
||||
hist_vis_ev = yield self.get_event(hist_vis_id, allow_none=True)
|
||||
if hist_vis_ev:
|
||||
if hist_vis_ev.content.get("history_visibility") == "world_readable":
|
||||
defer.returnValue(True)
|
||||
return True
|
||||
|
||||
defer.returnValue(False)
|
||||
return False
|
||||
|
||||
def update_profile_in_user_dir(self, user_id, display_name, avatar_url):
|
||||
"""
|
||||
|
|
@ -499,7 +499,7 @@ class UserDirectoryStore(StateDeltasStore, BackgroundUpdateStore):
|
|||
user_ids = set(user_ids_share_pub)
|
||||
user_ids.update(user_ids_share_priv)
|
||||
|
||||
defer.returnValue(user_ids)
|
||||
return user_ids
|
||||
|
||||
def add_users_who_share_private_room(self, room_id, user_id_tuples):
|
||||
"""Insert entries into the users_who_share_private_rooms table. The first
|
||||
|
|
@ -609,7 +609,7 @@ class UserDirectoryStore(StateDeltasStore, BackgroundUpdateStore):
|
|||
|
||||
users = set(pub_rows)
|
||||
users.update(rows)
|
||||
defer.returnValue(list(users))
|
||||
return list(users)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_rooms_in_common_for_users(self, user_id, other_user_id):
|
||||
|
|
@ -635,7 +635,7 @@ class UserDirectoryStore(StateDeltasStore, BackgroundUpdateStore):
|
|||
"get_rooms_in_common_for_users", None, sql, user_id, other_user_id
|
||||
)
|
||||
|
||||
defer.returnValue([room_id for room_id, in rows])
|
||||
return [room_id for room_id, in rows]
|
||||
|
||||
def delete_all_from_user_dir(self):
|
||||
"""Delete the entire user directory
|
||||
|
|
@ -782,7 +782,7 @@ class UserDirectoryStore(StateDeltasStore, BackgroundUpdateStore):
|
|||
|
||||
limited = len(results) > limit
|
||||
|
||||
defer.returnValue({"limited": limited, "results": results})
|
||||
return {"limited": limited, "results": results}
|
||||
|
||||
|
||||
def _parse_query_sqlite(search_term):
|
||||
|
|
|
|||
|
|
@ -12,9 +12,8 @@
|
|||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import operator
|
||||
|
||||
from twisted.internet import defer
|
||||
import operator
|
||||
|
||||
from synapse.storage._base import SQLBaseStore
|
||||
from synapse.util.caches.descriptors import cached, cachedList
|
||||
|
|
@ -67,7 +66,7 @@ class UserErasureWorkerStore(SQLBaseStore):
|
|||
|
||||
erased_users = yield self.runInteraction("are_users_erased", _get_erased_users)
|
||||
res = dict((u, u in erased_users) for u in user_ids)
|
||||
defer.returnValue(res)
|
||||
return res
|
||||
|
||||
|
||||
class UserErasureStore(UserErasureWorkerStore):
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue