merge develop pydoc for _get_state_for_groups

This commit is contained in:
Matthew Hodgson 2018-07-19 11:26:04 +01:00
commit be3adfc331
370 changed files with 6458 additions and 3962 deletions

View file

@ -15,51 +15,49 @@
# limitations under the License.
import datetime
from dateutil import tz
import time
import logging
import time
from synapse.storage.devices import DeviceStore
from .appservice import (
ApplicationServiceStore, ApplicationServiceTransactionStore
)
from .directory import DirectoryStore
from .events import EventsStore
from .presence import PresenceStore, UserPresenceState
from .profile import ProfileStore
from .registration import RegistrationStore
from .room import RoomStore
from .roommember import RoomMemberStore
from .stream import StreamStore
from .transactions import TransactionStore
from .keys import KeyStore
from .event_federation import EventFederationStore
from .pusher import PusherStore
from .push_rule import PushRuleStore
from .media_repository import MediaRepositoryStore
from .rejections import RejectionsStore
from .event_push_actions import EventPushActionsStore
from .deviceinbox import DeviceInboxStore
from .group_server import GroupServerStore
from .state import StateStore
from .signatures import SignatureStore
from .filtering import FilteringStore
from .end_to_end_keys import EndToEndKeyStore
from .receipts import ReceiptsStore
from .search import SearchStore
from .tags import TagsStore
from .account_data import AccountDataStore
from .openid import OpenIdStore
from .client_ips import ClientIpStore
from .user_directory import UserDirectoryStore
from .util.id_generators import IdGenerator, StreamIdGenerator, ChainedIdGenerator
from .engines import PostgresEngine
from dateutil import tz
from synapse.api.constants import PresenceState
from synapse.storage.devices import DeviceStore
from synapse.storage.user_erasure_store import UserErasureStore
from synapse.util.caches.stream_change_cache import StreamChangeCache
from .account_data import AccountDataStore
from .appservice import ApplicationServiceStore, ApplicationServiceTransactionStore
from .client_ips import ClientIpStore
from .deviceinbox import DeviceInboxStore
from .directory import DirectoryStore
from .end_to_end_keys import EndToEndKeyStore
from .engines import PostgresEngine
from .event_federation import EventFederationStore
from .event_push_actions import EventPushActionsStore
from .events import EventsStore
from .filtering import FilteringStore
from .group_server import GroupServerStore
from .keys import KeyStore
from .media_repository import MediaRepositoryStore
from .openid import OpenIdStore
from .presence import PresenceStore, UserPresenceState
from .profile import ProfileStore
from .push_rule import PushRuleStore
from .pusher import PusherStore
from .receipts import ReceiptsStore
from .registration import RegistrationStore
from .rejections import RejectionsStore
from .room import RoomStore
from .roommember import RoomMemberStore
from .search import SearchStore
from .signatures import SignatureStore
from .state import StateStore
from .stream import StreamStore
from .tags import TagsStore
from .transactions import TransactionStore
from .user_directory import UserDirectoryStore
from .util.id_generators import ChainedIdGenerator, IdGenerator, StreamIdGenerator
logger = logging.getLogger(__name__)
@ -88,6 +86,7 @@ class DataStore(RoomMemberStore, RoomStore,
DeviceInboxStore,
UserDirectoryStore,
GroupServerStore,
UserErasureStore,
):
def __init__(self, db_conn, hs):

View file

@ -13,22 +13,21 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import sys
import threading
import time
from synapse.api.errors import StoreError
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
from synapse.util.caches.descriptors import Cache
from synapse.storage.engines import PostgresEngine
from six import iteritems, iterkeys, itervalues
from six.moves import intern, range
from prometheus_client import Histogram
from twisted.internet import defer
import sys
import time
import threading
from six import itervalues, iterkeys, iteritems
from six.moves import intern, range
from synapse.api.errors import StoreError
from synapse.storage.engines import PostgresEngine
from synapse.util.caches.descriptors import Cache
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
logger = logging.getLogger(__name__)
@ -221,7 +220,7 @@ class SQLBaseStore(object):
self._clock.looping_call(loop, 10000)
def _new_transaction(self, conn, desc, after_callbacks, exception_callbacks,
logging_context, func, *args, **kwargs):
func, *args, **kwargs):
start = time.time()
txn_id = self._TXN_ID
@ -285,8 +284,7 @@ class SQLBaseStore(object):
end = time.time()
duration = end - start
if logging_context is not None:
logging_context.add_database_transaction(duration)
LoggingContext.current_context().add_database_transaction(duration)
transaction_logger.debug("[TXN END] {%s} %f sec", name, duration)
@ -310,19 +308,15 @@ class SQLBaseStore(object):
Returns:
Deferred: The result of func
"""
current_context = LoggingContext.current_context()
after_callbacks = []
exception_callbacks = []
def inner_func(conn, *args, **kwargs):
return self._new_transaction(
conn, desc, after_callbacks, exception_callbacks, current_context,
func, *args, **kwargs
)
try:
result = yield self.runWithConnection(inner_func, *args, **kwargs)
result = yield self.runWithConnection(
self._new_transaction,
desc, after_callbacks, exception_callbacks, func,
*args, **kwargs
)
for after_callback, after_args, after_kwargs in after_callbacks:
after_callback(*after_args, **after_kwargs)
@ -347,22 +341,25 @@ class SQLBaseStore(object):
Returns:
Deferred: The result of func
"""
current_context = LoggingContext.current_context()
parent_context = LoggingContext.current_context()
if parent_context == LoggingContext.sentinel:
logger.warn(
"Starting db connection from sentinel context: metrics will be lost",
)
parent_context = None
start_time = time.time()
def inner_func(conn, *args, **kwargs):
with LoggingContext("runWithConnection") as context:
with LoggingContext("runWithConnection", parent_context) as context:
sched_duration_sec = time.time() - start_time
sql_scheduling_timer.observe(sched_duration_sec)
current_context.add_database_scheduled(sched_duration_sec)
context.add_database_scheduled(sched_duration_sec)
if self.database_engine.is_connection_closed(conn):
logger.debug("Reconnecting closed database connection")
conn.reconnect()
current_context.copy_to(context)
return func(conn, *args, **kwargs)
with PreserveLoggingContext():

View file

@ -14,17 +14,17 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
import logging
from canonicaljson import json
from twisted.internet import defer
from synapse.storage._base import SQLBaseStore
from synapse.storage.util.id_generators import StreamIdGenerator
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.caches.descriptors import cached, cachedList, cachedInlineCallbacks
import abc
import simplejson as json
import logging
logger = logging.getLogger(__name__)
@ -114,25 +114,6 @@ class AccountDataWorkerStore(SQLBaseStore):
else:
defer.returnValue(None)
@cachedList(cached_method_name="get_global_account_data_by_type_for_user",
num_args=2, list_name="user_ids", inlineCallbacks=True)
def get_global_account_data_by_type_for_users(self, data_type, user_ids):
rows = yield self._simple_select_many_batch(
table="account_data",
column="user_id",
iterable=user_ids,
keyvalues={
"account_data_type": data_type,
},
retcols=("user_id", "content",),
desc="get_global_account_data_by_type_for_users",
)
defer.returnValue({
row["user_id"]: json.loads(row["content"]) if row["content"] else None
for row in rows
})
@cached(num_args=2)
def get_account_data_for_room(self, user_id, room_id):
"""Get all the client account_data for a user for a room.

View file

@ -15,14 +15,16 @@
# limitations under the License.
import logging
import re
import simplejson as json
from canonicaljson import json
from twisted.internet import defer
from synapse.appservice import AppServiceTransaction
from synapse.config.appservice import load_appservices
from synapse.storage.events import EventsWorkerStore
from ._base import SQLBaseStore
from ._base import SQLBaseStore
logger = logging.getLogger(__name__)

View file

@ -12,15 +12,17 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import synapse.util.async
from ._base import SQLBaseStore
from . import engines
import logging
from canonicaljson import json
from twisted.internet import defer
import simplejson as json
import logging
from synapse.metrics.background_process_metrics import run_as_background_process
from . import engines
from ._base import SQLBaseStore
logger = logging.getLogger(__name__)
@ -87,12 +89,16 @@ class BackgroundUpdateStore(SQLBaseStore):
self._background_update_handlers = {}
self._all_done = False
@defer.inlineCallbacks
def start_doing_background_updates(self):
logger.info("Starting background schema updates")
run_as_background_process(
"background_updates", self._run_background_updates,
)
@defer.inlineCallbacks
def _run_background_updates(self):
logger.info("Starting background schema updates")
while True:
yield synapse.util.async.sleep(
yield self.hs.get_clock().sleep(
self.BACKGROUND_UPDATE_INTERVAL_MS / 1000.)
try:

View file

@ -15,15 +15,15 @@
import logging
from twisted.internet import defer, reactor
from ._base import Cache
from . import background_updates
from synapse.util.caches import CACHE_SIZE_FACTOR
from six import iteritems
from twisted.internet import defer
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util.caches import CACHE_SIZE_FACTOR
from . import background_updates
from ._base import Cache
logger = logging.getLogger(__name__)
@ -70,7 +70,9 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
self._client_ip_looper = self._clock.looping_call(
self._update_client_ips_batch, 5 * 1000
)
reactor.addSystemEventTrigger("before", "shutdown", self._update_client_ips_batch)
self.hs.get_reactor().addSystemEventTrigger(
"before", "shutdown", self._update_client_ips_batch
)
def insert_client_ip(self, user_id, access_token, ip, user_agent, device_id,
now=None):
@ -92,10 +94,16 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
self._batch_row_update[key] = (user_agent, device_id, now)
def _update_client_ips_batch(self):
to_update = self._batch_row_update
self._batch_row_update = {}
return self.runInteraction(
"_update_client_ips_batch", self._update_client_ips_batch_txn, to_update
def update():
to_update = self._batch_row_update
self._batch_row_update = {}
return self.runInteraction(
"_update_client_ips_batch", self._update_client_ips_batch_txn,
to_update,
)
run_as_background_process(
"update_client_ips", update,
)
def _update_client_ips_batch_txn(self, txn, to_update):

View file

@ -14,14 +14,14 @@
# limitations under the License.
import logging
import simplejson
from canonicaljson import json
from twisted.internet import defer
from .background_updates import BackgroundUpdateStore
from synapse.util.caches.expiringcache import ExpiringCache
from .background_updates import BackgroundUpdateStore
logger = logging.getLogger(__name__)
@ -85,7 +85,7 @@ class DeviceInboxStore(BackgroundUpdateStore):
)
rows = []
for destination, edu in remote_messages_by_destination.items():
edu_json = simplejson.dumps(edu)
edu_json = json.dumps(edu)
rows.append((destination, stream_id, now_ms, edu_json))
txn.executemany(sql, rows)
@ -177,7 +177,7 @@ class DeviceInboxStore(BackgroundUpdateStore):
" WHERE user_id = ?"
)
txn.execute(sql, (user_id,))
message_json = simplejson.dumps(messages_by_device["*"])
message_json = json.dumps(messages_by_device["*"])
for row in txn:
# Add the message for all devices for this user on this
# server.
@ -199,7 +199,7 @@ class DeviceInboxStore(BackgroundUpdateStore):
# Only insert into the local inbox if the device exists on
# this server
device = row[0]
message_json = simplejson.dumps(messages_by_device[device])
message_json = json.dumps(messages_by_device[device])
messages_json_for_user[device] = message_json
if messages_json_for_user:
@ -253,7 +253,7 @@ class DeviceInboxStore(BackgroundUpdateStore):
messages = []
for row in txn:
stream_pos = row[0]
messages.append(simplejson.loads(row[1]))
messages.append(json.loads(row[1]))
if len(messages) < limit:
stream_pos = current_stream_id
return (messages, stream_pos)
@ -389,7 +389,7 @@ class DeviceInboxStore(BackgroundUpdateStore):
messages = []
for row in txn:
stream_pos = row[0]
messages.append(simplejson.loads(row[1]))
messages.append(json.loads(row[1]))
if len(messages) < limit:
stream_pos = current_stream_id
return (messages, stream_pos)

View file

@ -13,15 +13,17 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import simplejson as json
from six import iteritems, itervalues
from canonicaljson import json
from twisted.internet import defer
from synapse.api.errors import StoreError
from ._base import SQLBaseStore, Cache
from synapse.util.caches.descriptors import cached, cachedList, cachedInlineCallbacks
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList
from six import itervalues, iteritems
from ._base import Cache, SQLBaseStore
logger = logging.getLogger(__name__)

View file

@ -13,15 +13,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from ._base import SQLBaseStore
from synapse.util.caches.descriptors import cached
from synapse.api.errors import SynapseError
from collections import namedtuple
from twisted.internet import defer
from collections import namedtuple
from synapse.api.errors import SynapseError
from synapse.util.caches.descriptors import cached
from ._base import SQLBaseStore
RoomAliasMapping = namedtuple(
"RoomAliasMapping",

View file

@ -12,17 +12,16 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from six import iteritems
from canonicaljson import encode_canonical_json, json
from twisted.internet import defer
from synapse.util.caches.descriptors import cached
from canonicaljson import encode_canonical_json
import simplejson as json
from ._base import SQLBaseStore
from six import iteritems
class EndToEndKeyStore(SQLBaseStore):
def set_e2e_device_keys(self, user_id, device_id, time_now, device_keys):

View file

@ -13,13 +13,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from ._base import IncorrectDatabaseSetup
from .postgres import PostgresEngine
from .sqlite3 import Sqlite3Engine
import importlib
import platform
from ._base import IncorrectDatabaseSetup
from .postgres import PostgresEngine
from .sqlite3 import Sqlite3Engine
SUPPORTED_MODULE = {
"sqlite3": Sqlite3Engine,

View file

@ -13,11 +13,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from synapse.storage.prepare_database import prepare_database
import struct
import threading
from synapse.storage.prepare_database import prepare_database
class Sqlite3Engine(object):
single_threaded = True

View file

@ -12,23 +12,21 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import random
from six.moves import range
from six.moves.queue import Empty, PriorityQueue
from unpaddedbase64 import encode_base64
from twisted.internet import defer
from synapse.api.errors import StoreError
from synapse.storage._base import SQLBaseStore
from synapse.storage.events import EventsWorkerStore
from synapse.storage.signatures import SignatureWorkerStore
from synapse.api.errors import StoreError
from synapse.util.caches.descriptors import cached
from unpaddedbase64 import encode_base64
import logging
from six.moves.queue import PriorityQueue, Empty
from six.moves import range
logger = logging.getLogger(__name__)

View file

@ -14,16 +14,17 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from synapse.storage._base import SQLBaseStore, LoggingTransaction
from twisted.internet import defer
from synapse.util.async import sleep
from synapse.util.caches.descriptors import cachedInlineCallbacks
import logging
import simplejson as json
from six import iteritems
from canonicaljson import json
from twisted.internet import defer
from synapse.storage._base import LoggingTransaction, SQLBaseStore
from synapse.util.caches.descriptors import cachedInlineCallbacks
logger = logging.getLogger(__name__)
@ -84,6 +85,8 @@ class EventPushActionsWorkerStore(SQLBaseStore):
self.find_stream_orderings_looping_call = self._clock.looping_call(
self._find_stream_orderings_for_times, 10 * 60 * 1000
)
self._rotate_delay = 3
self._rotate_count = 10000
@cachedInlineCallbacks(num_args=3, tree=True, max_entries=5000)
def get_unread_event_push_actions_by_room_for_user(
@ -800,7 +803,7 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
)
if caught_up:
break
yield sleep(5)
yield self.hs.get_clock().sleep(self._rotate_delay)
finally:
self._doing_notif_rotation = False
@ -821,8 +824,8 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
txn.execute("""
SELECT stream_ordering FROM event_push_actions
WHERE stream_ordering > ?
ORDER BY stream_ordering ASC LIMIT 1 OFFSET 50000
""", (old_rotate_stream_ordering,))
ORDER BY stream_ordering ASC LIMIT 1 OFFSET ?
""", (old_rotate_stream_ordering, self._rotate_count))
stream_row = txn.fetchone()
if stream_row:
offset_stream_ordering, = stream_row

View file

@ -14,36 +14,34 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from collections import OrderedDict, deque, namedtuple
from functools import wraps
import itertools
import logging
from collections import OrderedDict, deque, namedtuple
from functools import wraps
from six import iteritems, itervalues
from six.moves import range
from canonicaljson import json
from prometheus_client import Counter
import simplejson as json
from twisted.internet import defer
from synapse.storage.events_worker import EventsWorkerStore
from synapse.util.async import ObservableDeferred
from synapse.util.frozenutils import frozendict_json_encoder
from synapse.util.logcontext import (
PreserveLoggingContext, make_deferred_yieldable,
)
from synapse.util.logutils import log_function
from synapse.util.metrics import Measure
import synapse.metrics
from synapse.api.constants import EventTypes
from synapse.api.errors import SynapseError
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
from synapse.types import get_domain_from_id, RoomStreamToken
import synapse.metrics
# these are only included to make the type annotations work
from synapse.events import EventBase # noqa: F401
from synapse.events.snapshot import EventContext # noqa: F401
from six.moves import range
from six import itervalues, iteritems
from prometheus_client import Counter
from synapse.events import EventBase # noqa: F401
from synapse.events.snapshot import EventContext # noqa: F401
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.events_worker import EventsWorkerStore
from synapse.types import RoomStreamToken, get_domain_from_id
from synapse.util.async import ObservableDeferred
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
from synapse.util.frozenutils import frozendict_json_encoder
from synapse.util.logcontext import make_deferred_yieldable
from synapse.util.logutils import log_function
from synapse.util.metrics import Measure
logger = logging.getLogger(__name__)
@ -158,11 +156,8 @@ class _EventPeristenceQueue(object):
self._event_persist_queues[room_id] = queue
self._currently_persisting_rooms.discard(room_id)
# set handle_queue_loop off on the background. We don't want to
# attribute work done in it to the current request, so we drop the
# logcontext altogether.
with PreserveLoggingContext():
handle_queue_loop()
# set handle_queue_loop off in the background
run_as_background_process("persist_events", handle_queue_loop)
def _get_drainining_queue(self, room_id):
queue = self._event_persist_queues.setdefault(room_id, deque())
@ -800,7 +795,8 @@ class EventsStore(EventsWorkerStore):
]
)
self._curr_state_delta_stream_cache.entity_has_changed(
txn.call_after(
self._curr_state_delta_stream_cache.entity_has_changed,
room_id, max_stream_order,
)
@ -1044,7 +1040,6 @@ class EventsStore(EventsWorkerStore):
"event_edge_hashes",
"event_edges",
"event_forward_extremities",
"event_push_actions",
"event_reference_hashes",
"event_search",
"event_signatures",
@ -1064,6 +1059,14 @@ class EventsStore(EventsWorkerStore):
[(ev.event_id,) for ev, _ in events_and_contexts]
)
for table in (
"event_push_actions",
):
txn.executemany(
"DELETE FROM %s WHERE room_id = ? AND event_id = ?" % (table,),
[(ev.event_id,) for ev, _ in events_and_contexts]
)
def _store_event_txn(self, txn, events_and_contexts):
"""Insert new events into the event and event_json tables

View file

@ -12,27 +12,29 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ._base import SQLBaseStore
from twisted.internet import defer, reactor
from synapse.events import FrozenEvent
from synapse.events.utils import prune_event
from synapse.util.logcontext import (
PreserveLoggingContext, make_deferred_yieldable, run_in_background,
)
from synapse.util.metrics import Measure
from synapse.api.errors import SynapseError
import logging
from collections import namedtuple
import logging
import simplejson as json
from canonicaljson import json
from twisted.internet import defer
from synapse.api.errors import SynapseError
# these are only included to make the type annotations work
from synapse.events import EventBase # noqa: F401
from synapse.events.snapshot import EventContext # noqa: F401
from synapse.events import EventBase # noqa: F401
from synapse.events import FrozenEvent
from synapse.events.snapshot import EventContext # noqa: F401
from synapse.events.utils import prune_event
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util.logcontext import (
LoggingContext,
PreserveLoggingContext,
make_deferred_yieldable,
run_in_background,
)
from synapse.util.metrics import Measure
from ._base import SQLBaseStore
logger = logging.getLogger(__name__)
@ -145,6 +147,9 @@ class EventsWorkerStore(SQLBaseStore):
missing_events_ids = [e for e in event_ids if e not in event_entry_map]
if missing_events_ids:
log_ctx = LoggingContext.current_context()
log_ctx.record_event_fetch(len(missing_events_ids))
missing_events = yield self._enqueue_events(
missing_events_ids,
check_redacted=check_redacted,
@ -218,32 +223,47 @@ class EventsWorkerStore(SQLBaseStore):
"""Takes a database connection and waits for requests for events from
the _event_fetch_list queue.
"""
event_list = []
i = 0
while True:
with self._event_fetch_lock:
event_list = self._event_fetch_list
self._event_fetch_list = []
if not event_list:
single_threaded = self.database_engine.single_threaded
if single_threaded or i > EVENT_QUEUE_ITERATIONS:
self._event_fetch_ongoing -= 1
return
else:
self._event_fetch_lock.wait(EVENT_QUEUE_TIMEOUT_S)
i += 1
continue
i = 0
self._fetch_event_list(conn, event_list)
def _fetch_event_list(self, conn, event_list):
"""Handle a load of requests from the _event_fetch_list queue
Args:
conn (twisted.enterprise.adbapi.Connection): database connection
event_list (list[Tuple[list[str], Deferred]]):
The fetch requests. Each entry consists of a list of event
ids to be fetched, and a deferred to be completed once the
events have been fetched.
"""
with Measure(self._clock, "_fetch_event_list"):
try:
with self._event_fetch_lock:
event_list = self._event_fetch_list
self._event_fetch_list = []
if not event_list:
single_threaded = self.database_engine.single_threaded
if single_threaded or i > EVENT_QUEUE_ITERATIONS:
self._event_fetch_ongoing -= 1
return
else:
self._event_fetch_lock.wait(EVENT_QUEUE_TIMEOUT_S)
i += 1
continue
i = 0
event_id_lists = zip(*event_list)[0]
event_ids = [
item for sublist in event_id_lists for item in sublist
]
rows = self._new_transaction(
conn, "do_fetch", [], [], None, self._fetch_event_rows, event_ids
conn, "do_fetch", [], [],
self._fetch_event_rows, event_ids,
)
row_dict = {
@ -265,7 +285,7 @@ class EventsWorkerStore(SQLBaseStore):
except Exception:
logger.exception("Failed to callback")
with PreserveLoggingContext():
reactor.callFromThread(fire, event_list, row_dict)
self.hs.get_reactor().callFromThread(fire, event_list, row_dict)
except Exception as e:
logger.exception("do_fetch")
@ -276,9 +296,8 @@ class EventsWorkerStore(SQLBaseStore):
with PreserveLoggingContext():
d.errback(e)
if event_list:
with PreserveLoggingContext():
reactor.callFromThread(fire, event_list)
with PreserveLoggingContext():
self.hs.get_reactor().callFromThread(fire, event_list)
@defer.inlineCallbacks
def _enqueue_events(self, events, check_redacted=True, allow_rejected=False):
@ -304,10 +323,11 @@ class EventsWorkerStore(SQLBaseStore):
should_start = False
if should_start:
with PreserveLoggingContext():
self.runWithConnection(
self._do_fetch
)
run_as_background_process(
"fetch_events",
self.runWithConnection,
self._do_fetch,
)
logger.debug("Loading %d events", len(events))
with PreserveLoggingContext():

View file

@ -13,14 +13,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from canonicaljson import encode_canonical_json, json
from twisted.internet import defer
from ._base import SQLBaseStore
from synapse.api.errors import SynapseError, Codes
from synapse.api.errors import Codes, SynapseError
from synapse.util.caches.descriptors import cachedInlineCallbacks
from canonicaljson import encode_canonical_json
import simplejson as json
from ._base import SQLBaseStore
class FilteringStore(SQLBaseStore):

View file

@ -14,15 +14,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from canonicaljson import json
from twisted.internet import defer
from synapse.api.errors import SynapseError
from ._base import SQLBaseStore
import simplejson as json
# The category ID for the "default" category. We don't store as null in the
# database to avoid the fun of null != null
_DEFAULT_CATEGORY_ID = ""

View file

@ -13,17 +13,19 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from ._base import SQLBaseStore
from synapse.util.caches.descriptors import cachedInlineCallbacks
import hashlib
import logging
from twisted.internet import defer
import six
import OpenSSL
from signedjson.key import decode_verify_key_bytes
import hashlib
import logging
import OpenSSL
from twisted.internet import defer
from synapse.util.caches.descriptors import cachedInlineCallbacks
from ._base import SQLBaseStore
logger = logging.getLogger(__name__)

View file

@ -20,7 +20,6 @@ import logging
import os
import re
logger = logging.getLogger(__name__)

View file

@ -13,14 +13,16 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from ._base import SQLBaseStore
from synapse.api.constants import PresenceState
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList
from synapse.util import batch_iter
from collections import namedtuple
from twisted.internet import defer
from synapse.api.constants import PresenceState
from synapse.util import batch_iter
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList
from ._base import SQLBaseStore
class UserPresenceState(namedtuple("UserPresenceState",
("user_id", "state", "last_active_ts",

View file

@ -15,8 +15,8 @@
from twisted.internet import defer
from synapse.storage.roommember import ProfileInfo
from synapse.api.errors import StoreError
from synapse.storage.roommember import ProfileInfo
from ._base import SQLBaseStore

View file

@ -14,20 +14,23 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from ._base import SQLBaseStore
import abc
import logging
from canonicaljson import json
from twisted.internet import defer
from synapse.api.constants import EventTypes
from synapse.push.baserules import list_with_base_rules
from synapse.storage.appservice import ApplicationServiceWorkerStore
from synapse.storage.pusher import PusherWorkerStore
from synapse.storage.receipts import ReceiptsWorkerStore
from synapse.storage.roommember import RoomMemberWorkerStore
from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList
from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.push.baserules import list_with_base_rules
from synapse.api.constants import EventTypes
from twisted.internet import defer
import abc
import logging
import simplejson as json
from ._base import SQLBaseStore
logger = logging.getLogger(__name__)

View file

@ -14,16 +14,16 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from ._base import SQLBaseStore
from twisted.internet import defer
import logging
import types
from canonicaljson import encode_canonical_json
from canonicaljson import encode_canonical_json, json
from twisted.internet import defer
from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList
import logging
import simplejson as json
import types
from ._base import SQLBaseStore
logger = logging.getLogger(__name__)

View file

@ -14,17 +14,18 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from ._base import SQLBaseStore
from .util.id_generators import StreamIdGenerator
from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList, cached
from synapse.util.caches.stream_change_cache import StreamChangeCache
import abc
import logging
from canonicaljson import json
from twisted.internet import defer
import abc
import logging
import simplejson as json
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList
from synapse.util.caches.stream_change_cache import StreamChangeCache
from ._base import SQLBaseStore
from .util.id_generators import StreamIdGenerator
logger = logging.getLogger(__name__)
@ -139,7 +140,9 @@ class ReceiptsWorkerStore(SQLBaseStore):
"""
room_ids = set(room_ids)
if from_key:
if from_key is not None:
# Only ask the database about rooms where there have been new
# receipts added since `from_key`
room_ids = yield self._receipts_stream_cache.get_entities_changed(
room_ids, from_key
)
@ -150,7 +153,6 @@ class ReceiptsWorkerStore(SQLBaseStore):
defer.returnValue([ev for res in results.values() for ev in res])
@cachedInlineCallbacks(num_args=3, tree=True)
def get_linearized_receipts_for_room(self, room_id, to_key, from_key=None):
"""Get receipts for a single room for sending to clients.
@ -161,7 +163,19 @@ class ReceiptsWorkerStore(SQLBaseStore):
from the start.
Returns:
list: A list of receipts.
Deferred[list]: A list of receipts.
"""
if from_key is not None:
# Check the cache first to see if any new receipts have been added
# since`from_key`. If not we can no-op.
if not self._receipts_stream_cache.has_entity_changed(room_id, from_key):
defer.succeed([])
return self._get_linearized_receipts_for_room(room_id, to_key, from_key)
@cachedInlineCallbacks(num_args=3, tree=True)
def _get_linearized_receipts_for_room(self, room_id, to_key, from_key=None):
"""See get_linearized_receipts_for_room
"""
def f(txn):
if from_key:
@ -210,7 +224,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
"content": content,
}])
@cachedList(cached_method_name="get_linearized_receipts_for_room",
@cachedList(cached_method_name="_get_linearized_receipts_for_room",
list_name="room_ids", num_args=3, inlineCallbacks=True)
def _get_linearized_receipts_for_rooms(self, room_ids, to_key, from_key=None):
if not room_ids:
@ -372,7 +386,7 @@ class ReceiptsStore(ReceiptsWorkerStore):
self.get_receipts_for_user.invalidate, (user_id, receipt_type)
)
# FIXME: This shouldn't invalidate the whole cache
txn.call_after(self.get_linearized_receipts_for_room.invalidate_many, (room_id,))
txn.call_after(self._get_linearized_receipts_for_room.invalidate_many, (room_id,))
txn.call_after(
self._receipts_stream_cache.entity_has_changed,
@ -492,7 +506,7 @@ class ReceiptsStore(ReceiptsWorkerStore):
self.get_receipts_for_user.invalidate, (user_id, receipt_type)
)
# FIXME: This shouldn't invalidate the whole cache
txn.call_after(self.get_linearized_receipts_for_room.invalidate_many, (room_id,))
txn.call_after(self._get_linearized_receipts_for_room.invalidate_many, (room_id,))
self._simple_delete_txn(
txn,

View file

@ -15,15 +15,15 @@
import re
from six.moves import range
from twisted.internet import defer
from synapse.api.errors import StoreError, Codes
from synapse.api.errors import Codes, StoreError
from synapse.storage import background_updates
from synapse.storage._base import SQLBaseStore
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
from six.moves import range
class RegistrationWorkerStore(SQLBaseStore):
@cached()
@ -460,15 +460,6 @@ class RegistrationStore(RegistrationWorkerStore,
defer.returnValue(ret['user_id'])
defer.returnValue(None)
def user_delete_threepids(self, user_id):
return self._simple_delete(
"user_threepids",
keyvalues={
"user_id": user_id,
},
desc="user_delete_threepids",
)
def user_delete_threepid(self, user_id, medium, address):
return self._simple_delete(
"user_threepids",
@ -632,7 +623,9 @@ class RegistrationStore(RegistrationWorkerStore,
Removes the given user to the table of users who need to be parted from all the
rooms they're in, effectively marking that user as fully deactivated.
"""
return self._simple_delete_one(
# XXX: This should be simple_delete_one but we failed to put a unique index on
# the table, so somehow duplicate entries have ended up in it.
return self._simple_delete(
"users_pending_deactivation",
keyvalues={
"user_id": user_id,

View file

@ -13,10 +13,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from ._base import SQLBaseStore
import logging
from ._base import SQLBaseStore
logger = logging.getLogger(__name__)

View file

@ -13,6 +13,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import collections
import logging
import re
from canonicaljson import json
from twisted.internet import defer
from synapse.api.errors import StoreError
@ -20,11 +26,6 @@ from synapse.storage._base import SQLBaseStore
from synapse.storage.search import SearchStore
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
import collections
import logging
import simplejson as json
import re
logger = logging.getLogger(__name__)

View file

@ -14,24 +14,23 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from twisted.internet import defer
import logging
from collections import namedtuple
from six import iteritems, itervalues
from canonicaljson import json
from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
from synapse.storage.events import EventsWorkerStore
from synapse.types import get_domain_from_id
from synapse.util.async import Linearizer
from synapse.util.caches import intern_string
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
from synapse.util.stringutils import to_ascii
from synapse.api.constants import Membership, EventTypes
from synapse.types import get_domain_from_id
import logging
import simplejson as json
from six import itervalues, iteritems
logger = logging.getLogger(__name__)
@ -455,7 +454,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
defer.returnValue(joined_hosts)
@cached(max_entries=10000, iterable=True)
@cached(max_entries=10000)
def _get_joined_hosts_cache(self, room_id):
return _JoinedHostsCache(self, room_id)

View file

@ -14,11 +14,11 @@
import logging
from synapse.storage.prepare_database import get_statements
from synapse.storage.engines import PostgresEngine, Sqlite3Engine
import simplejson
from synapse.storage.engines import PostgresEngine, Sqlite3Engine
from synapse.storage.prepare_database import get_statements
logger = logging.getLogger(__name__)

View file

@ -14,10 +14,10 @@
import logging
from synapse.storage.prepare_database import get_statements
import simplejson
from synapse.storage.prepare_database import get_statements
logger = logging.getLogger(__name__)

View file

@ -12,10 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from synapse.config.appservice import load_appservices
from six.moves import range
from synapse.config.appservice import load_appservices
logger = logging.getLogger(__name__)

View file

@ -12,12 +12,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import simplejson
from synapse.storage.engines import PostgresEngine
from synapse.storage.prepare_database import get_statements
import logging
import simplejson
logger = logging.getLogger(__name__)

View file

@ -12,11 +12,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from synapse.storage.prepare_database import get_statements
import logging
import simplejson
from synapse.storage.prepare_database import get_statements
logger = logging.getLogger(__name__)

View file

@ -14,7 +14,6 @@
import time
ALTER_TABLE = "ALTER TABLE remote_media_cache ADD COLUMN last_access_ts BIGINT"

View file

@ -12,11 +12,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from synapse.storage.prepare_database import get_statements
from synapse.storage.engines import PostgresEngine
import logging
from synapse.storage.engines import PostgresEngine
from synapse.storage.prepare_database import get_statements
logger = logging.getLogger(__name__)

View file

@ -12,10 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from synapse.storage.engines import PostgresEngine
import logging
from synapse.storage.engines import PostgresEngine
logger = logging.getLogger(__name__)

View file

@ -12,10 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from synapse.storage.engines import PostgresEngine
import logging
from synapse.storage.engines import PostgresEngine
logger = logging.getLogger(__name__)

View file

@ -12,11 +12,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from synapse.storage.prepare_database import get_statements
from synapse.storage.engines import PostgresEngine
import logging
from synapse.storage.engines import PostgresEngine
from synapse.storage.prepare_database import get_statements
logger = logging.getLogger(__name__)
DROP_INDICES = """

View file

@ -14,8 +14,8 @@
import logging
from synapse.storage.prepare_database import get_statements
from synapse.storage.engines import PostgresEngine, Sqlite3Engine
from synapse.storage.prepare_database import get_statements
logger = logging.getLogger(__name__)

View file

@ -0,0 +1,21 @@
/* Copyright 2018 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- a table of users who have requested that their details be erased
CREATE TABLE erased_users (
user_id TEXT NOT NULL
);
CREATE UNIQUE INDEX erased_users_user ON erased_users(user_id);

View file

@ -13,19 +13,21 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from collections import namedtuple
import logging
import re
import simplejson as json
from collections import namedtuple
from six import string_types
from canonicaljson import json
from twisted.internet import defer
from .background_updates import BackgroundUpdateStore
from synapse.api.errors import SynapseError
from synapse.storage.engines import PostgresEngine, Sqlite3Engine
from .background_updates import BackgroundUpdateStore
logger = logging.getLogger(__name__)
SearchEntry = namedtuple('SearchEntry', [

View file

@ -13,15 +13,17 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from twisted.internet import defer
import six
from ._base import SQLBaseStore
from unpaddedbase64 import encode_base64
from twisted.internet import defer
from synapse.crypto.event_signing import compute_event_reference_hash
from synapse.util.caches.descriptors import cached, cachedList
from ._base import SQLBaseStore
# py2 sqlite has buffer hardcoded as only binary type, so we must use it,
# despite being deprecated and removed in favor of memoryview
if six.PY2:

View file

@ -13,8 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from collections import namedtuple
import logging
from collections import namedtuple
from six import iteritems, itervalues
from six.moves import range
@ -23,10 +23,11 @@ from twisted.internet import defer
from synapse.storage.background_updates import BackgroundUpdateStore
from synapse.storage.engines import PostgresEngine
from synapse.util.caches import intern_string, get_cache_factor_for
from synapse.util.caches import get_cache_factor_for, intern_string
from synapse.util.caches.descriptors import cached, cachedList
from synapse.util.caches.dictionary_cache import DictionaryCache
from synapse.util.stringutils import to_ascii
from ._base import SQLBaseStore
logger = logging.getLogger(__name__)
@ -585,19 +586,24 @@ class StateGroupWorkerStore(SQLBaseStore):
@defer.inlineCallbacks
def _get_state_for_groups(self, groups, types=None):
"""Given list of groups returns dict of group -> list of state events
with matching types.
"""Gets the state at each of a list of state groups, optionally
filtering by type/state_key
Args:
groups(list[int]): list of groups whose state to query
types(list[str|None, str|None]|None): List of 2-tuples of the form
(`type`, `state_key`), where a `state_key` of `None` matches all
state_keys for the `type`. Presence of type of `None` indicates
that types not in the list should not be filtered out. If None,
all events are returned.
groups (iterable[int]): list of state groups for which we want
to get the state.
types (None|iterable[(None|str, None|str)]):
indicates the state type/keys required. If None, the whole
state is fetched and returned.
Otherwise, each entry should be a `(type, state_key)` tuple to
include in the response. A `state_key` of None is a wildcard
meaning that we require all state with that type. A `type` of None
indicates that types not in the list should not be filtered out.
Returns:
dict of group -> list of state events
Deferred[dict[int, dict[(type, state_key), EventBase]]]
a dictionary mapping from state group to state dictionary.
"""
if types:
types = frozenset(types)
@ -606,7 +612,7 @@ class StateGroupWorkerStore(SQLBaseStore):
if types is not None:
for group in set(groups):
state_dict_ids, _, got_all = self._get_some_state_from_cache(
group, types
group, types,
)
results[group] = state_dict_ids
@ -627,26 +633,40 @@ class StateGroupWorkerStore(SQLBaseStore):
# Okay, so we have some missing_types, lets fetch them.
cache_seq_num = self._state_group_cache.sequence
# the DictionaryCache knows if it has *all* the state, but
# does not know if it has all of the keys of a particular type,
# which makes wildcard lookups expensive unless we have a complete
# cache. Hence, if we are doing a wildcard lookup, populate the
# cache fully so that we can do an efficient lookup next time.
if types and any(k is None for (t, k) in types):
types_to_fetch = None
else:
types_to_fetch = types
group_to_state_dict = yield self._get_state_groups_from_groups(
missing_groups, types
missing_groups, types_to_fetch,
)
# Now we want to update the cache with all the things we fetched
# from the database.
for group, group_state_dict in iteritems(group_to_state_dict):
state_dict = results[group]
state_dict.update(
((intern_string(k[0]), intern_string(k[1])), to_ascii(v))
for k, v in iteritems(group_state_dict)
)
# update the result, filtering by `types`.
if types:
for k, v in iteritems(group_state_dict):
(typ, _) = k
if k in types or (typ, None) in types:
state_dict[k] = v
else:
state_dict.update(group_state_dict)
# update the cache with all the things we fetched from the
# database.
self._state_group_cache.update(
cache_seq_num,
key=group,
value=state_dict,
full=(types is None),
known_absent=types,
value=group_state_dict,
fetched_keys=types_to_fetch,
)
defer.returnValue(results)
@ -753,7 +773,6 @@ class StateGroupWorkerStore(SQLBaseStore):
self._state_group_cache.sequence,
key=state_group,
value=dict(current_state_ids),
full=True,
)
return state_group

View file

@ -33,22 +33,20 @@ what sort order was used:
and stream ordering columns respectively.
"""
import abc
import logging
from collections import namedtuple
from six.moves import range
from twisted.internet import defer
from synapse.storage._base import SQLBaseStore
from synapse.storage.engines import PostgresEngine
from synapse.storage.events import EventsWorkerStore
from synapse.types import RoomStreamToken
from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
from synapse.storage.engines import PostgresEngine
import abc
import logging
from six.moves import range
from collections import namedtuple
logger = logging.getLogger(__name__)

View file

@ -14,16 +14,17 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from synapse.storage.account_data import AccountDataWorkerStore
from synapse.util.caches.descriptors import cached
from twisted.internet import defer
import simplejson as json
import logging
from six.moves import range
from canonicaljson import json
from twisted.internet import defer
from synapse.storage.account_data import AccountDataWorkerStore
from synapse.util.caches.descriptors import cached
logger = logging.getLogger(__name__)

View file

@ -13,18 +13,18 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from ._base import SQLBaseStore
from synapse.util.caches.descriptors import cached
from twisted.internet import defer
import six
from canonicaljson import encode_canonical_json
import logging
from collections import namedtuple
import logging
import simplejson as json
import six
from canonicaljson import encode_canonical_json, json
from twisted.internet import defer
from synapse.util.caches.descriptors import cached
from ._base import SQLBaseStore
# py2 sqlite has buffer hardcoded as only binary type, so we must use it,
# despite being deprecated and removed in favor of memoryview

View file

@ -13,19 +13,19 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from twisted.internet import defer
from ._base import SQLBaseStore
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
from synapse.api.constants import EventTypes, JoinRules
from synapse.storage.engines import PostgresEngine, Sqlite3Engine
from synapse.types import get_domain_from_id, get_localpart_from_id
import logging
import re
from six import iteritems
import re
import logging
from twisted.internet import defer
from synapse.api.constants import EventTypes, JoinRules
from synapse.storage.engines import PostgresEngine, Sqlite3Engine
from synapse.types import get_domain_from_id, get_localpart_from_id
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
from ._base import SQLBaseStore
logger = logging.getLogger(__name__)
@ -265,7 +265,7 @@ class UserDirectoryStore(SQLBaseStore):
self.get_user_in_public_room.invalidate((user_id,))
def get_users_in_public_due_to_room(self, room_id):
"""Get all user_ids that are in the room directory becuase they're
"""Get all user_ids that are in the room directory because they're
in the given room_id
"""
return self._simple_select_onecol(
@ -277,7 +277,7 @@ class UserDirectoryStore(SQLBaseStore):
@defer.inlineCallbacks
def get_users_in_dir_due_to_room(self, room_id):
"""Get all user_ids that are in the room directory becuase they're
"""Get all user_ids that are in the room directory because they're
in the given room_id
"""
user_ids_dir = yield self._simple_select_onecol(

View file

@ -0,0 +1,103 @@
# -*- coding: utf-8 -*-
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import operator
from twisted.internet import defer
from synapse.storage._base import SQLBaseStore
from synapse.util.caches.descriptors import cached, cachedList
class UserErasureWorkerStore(SQLBaseStore):
@cached()
def is_user_erased(self, user_id):
"""
Check if the given user id has requested erasure
Args:
user_id (str): full user id to check
Returns:
Deferred[bool]: True if the user has requested erasure
"""
return self._simple_select_onecol(
table="erased_users",
keyvalues={"user_id": user_id},
retcol="1",
desc="is_user_erased",
).addCallback(operator.truth)
@cachedList(
cached_method_name="is_user_erased",
list_name="user_ids",
inlineCallbacks=True,
)
def are_users_erased(self, user_ids):
"""
Checks which users in a list have requested erasure
Args:
user_ids (iterable[str]): full user id to check
Returns:
Deferred[dict[str, bool]]:
for each user, whether the user has requested erasure.
"""
# this serves the dual purpose of (a) making sure we can do len and
# iterate it multiple times, and (b) avoiding duplicates.
user_ids = tuple(set(user_ids))
def _get_erased_users(txn):
txn.execute(
"SELECT user_id FROM erased_users WHERE user_id IN (%s)" % (
",".join("?" * len(user_ids))
),
user_ids,
)
return set(r[0] for r in txn)
erased_users = yield self.runInteraction(
"are_users_erased", _get_erased_users,
)
res = dict((u, u in erased_users) for u in user_ids)
defer.returnValue(res)
class UserErasureStore(UserErasureWorkerStore):
def mark_user_erased(self, user_id):
"""Indicate that user_id wishes their message history to be erased.
Args:
user_id (str): full user_id to be erased
"""
def f(txn):
# first check if they are already in the list
txn.execute(
"SELECT 1 FROM erased_users WHERE user_id = ?",
(user_id, )
)
if txn.fetchone():
return
# they are not already there: do the insert.
txn.execute(
"INSERT INTO erased_users (user_id) VALUES (?)",
(user_id, )
)
self._invalidate_cache_and_stream(
txn, self.is_user_erased, (user_id,)
)
return self.runInteraction("mark_user_erased", f)

View file

@ -13,9 +13,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from collections import deque
import contextlib
import threading
from collections import deque
class IdGenerator(object):