mirror of
https://git.anonymousland.org/anonymousland/synapse-product.git
synced 2025-10-07 16:38:26 -04:00
Merge branch 'develop' into travis/login-terms
This commit is contained in:
commit
7ede650956
85 changed files with 313 additions and 190 deletions
|
@ -27,4 +27,4 @@ try:
|
|||
except ImportError:
|
||||
pass
|
||||
|
||||
__version__ = "0.33.5.1"
|
||||
__version__ = "0.33.6"
|
||||
|
|
|
@ -17,6 +17,7 @@ import gc
|
|||
import logging
|
||||
import sys
|
||||
|
||||
import psutil
|
||||
from daemonize import Daemonize
|
||||
|
||||
from twisted.internet import error, reactor
|
||||
|
@ -24,12 +25,6 @@ from twisted.internet import error, reactor
|
|||
from synapse.util import PreserveLoggingContext
|
||||
from synapse.util.rlimit import change_resource_limit
|
||||
|
||||
try:
|
||||
import affinity
|
||||
except Exception:
|
||||
affinity = None
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
|
@ -89,15 +84,20 @@ def start_reactor(
|
|||
with PreserveLoggingContext():
|
||||
logger.info("Running")
|
||||
if cpu_affinity is not None:
|
||||
if not affinity:
|
||||
quit_with_error(
|
||||
"Missing package 'affinity' required for cpu_affinity\n"
|
||||
"option\n\n"
|
||||
"Install by running:\n\n"
|
||||
" pip install affinity\n\n"
|
||||
)
|
||||
logger.info("Setting CPU affinity to %s" % cpu_affinity)
|
||||
affinity.set_process_affinity_mask(0, cpu_affinity)
|
||||
# Turn the bitmask into bits, reverse it so we go from 0 up
|
||||
mask_to_bits = bin(cpu_affinity)[2:][::-1]
|
||||
|
||||
cpus = []
|
||||
cpu_num = 0
|
||||
|
||||
for i in mask_to_bits:
|
||||
if i == "1":
|
||||
cpus.append(cpu_num)
|
||||
cpu_num += 1
|
||||
|
||||
p = psutil.Process()
|
||||
p.cpu_affinity(cpus)
|
||||
|
||||
change_resource_limit(soft_file_limit)
|
||||
if gc_thresholds:
|
||||
gc.set_threshold(*gc_thresholds)
|
||||
|
|
|
@ -178,6 +178,9 @@ def start(config_options):
|
|||
|
||||
setup_logging(config, use_worker_options=True)
|
||||
|
||||
# This should only be done on the user directory worker or the master
|
||||
config.update_user_directory = False
|
||||
|
||||
events.USE_FROZEN_DICTS = config.use_frozen_dicts
|
||||
|
||||
database_engine = create_engine(config.database_config)
|
||||
|
|
|
@ -28,6 +28,7 @@ from synapse.config.logger import setup_logging
|
|||
from synapse.http.site import SynapseSite
|
||||
from synapse.metrics import RegistryProxy
|
||||
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
|
||||
from synapse.replication.slave.storage._base import __func__
|
||||
from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
|
||||
from synapse.replication.slave.storage.events import SlavedEventStore
|
||||
from synapse.replication.slave.storage.pushers import SlavedPusherStore
|
||||
|
@ -49,31 +50,31 @@ class PusherSlaveStore(
|
|||
SlavedAccountDataStore
|
||||
):
|
||||
update_pusher_last_stream_ordering_and_success = (
|
||||
DataStore.update_pusher_last_stream_ordering_and_success.__func__
|
||||
__func__(DataStore.update_pusher_last_stream_ordering_and_success)
|
||||
)
|
||||
|
||||
update_pusher_failing_since = (
|
||||
DataStore.update_pusher_failing_since.__func__
|
||||
__func__(DataStore.update_pusher_failing_since)
|
||||
)
|
||||
|
||||
update_pusher_last_stream_ordering = (
|
||||
DataStore.update_pusher_last_stream_ordering.__func__
|
||||
__func__(DataStore.update_pusher_last_stream_ordering)
|
||||
)
|
||||
|
||||
get_throttle_params_by_room = (
|
||||
DataStore.get_throttle_params_by_room.__func__
|
||||
__func__(DataStore.get_throttle_params_by_room)
|
||||
)
|
||||
|
||||
set_throttle_params = (
|
||||
DataStore.set_throttle_params.__func__
|
||||
__func__(DataStore.set_throttle_params)
|
||||
)
|
||||
|
||||
get_time_of_last_push_action_before = (
|
||||
DataStore.get_time_of_last_push_action_before.__func__
|
||||
__func__(DataStore.get_time_of_last_push_action_before)
|
||||
)
|
||||
|
||||
get_profile_displayname = (
|
||||
DataStore.get_profile_displayname.__func__
|
||||
__func__(DataStore.get_profile_displayname)
|
||||
)
|
||||
|
||||
|
||||
|
|
|
@ -33,7 +33,7 @@ from synapse.http.server import JsonResource
|
|||
from synapse.http.site import SynapseSite
|
||||
from synapse.metrics import RegistryProxy
|
||||
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
|
||||
from synapse.replication.slave.storage._base import BaseSlavedStore
|
||||
from synapse.replication.slave.storage._base import BaseSlavedStore, __func__
|
||||
from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
|
||||
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
|
||||
from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
|
||||
|
@ -147,7 +147,7 @@ class SynchrotronPresence(object):
|
|||
and haven't come back yet. If there are poke the master about them.
|
||||
"""
|
||||
now = self.clock.time_msec()
|
||||
for user_id, last_sync_ms in self.users_going_offline.items():
|
||||
for user_id, last_sync_ms in list(self.users_going_offline.items()):
|
||||
if now - last_sync_ms > 10 * 1000:
|
||||
self.users_going_offline.pop(user_id, None)
|
||||
self.send_user_sync(user_id, False, last_sync_ms)
|
||||
|
@ -156,9 +156,9 @@ class SynchrotronPresence(object):
|
|||
# TODO Hows this supposed to work?
|
||||
pass
|
||||
|
||||
get_states = PresenceHandler.get_states.__func__
|
||||
get_state = PresenceHandler.get_state.__func__
|
||||
current_state_for_users = PresenceHandler.current_state_for_users.__func__
|
||||
get_states = __func__(PresenceHandler.get_states)
|
||||
get_state = __func__(PresenceHandler.get_state)
|
||||
current_state_for_users = __func__(PresenceHandler.current_state_for_users)
|
||||
|
||||
def user_syncing(self, user_id, affect_presence):
|
||||
if affect_presence:
|
||||
|
@ -208,7 +208,7 @@ class SynchrotronPresence(object):
|
|||
) for row in rows]
|
||||
|
||||
for state in states:
|
||||
self.user_to_current_state[row.user_id] = state
|
||||
self.user_to_current_state[state.user_id] = state
|
||||
|
||||
stream_id = token
|
||||
yield self.notify_from_replication(states, stream_id)
|
||||
|
|
|
@ -28,6 +28,7 @@ from synapse.metrics import (
|
|||
event_processing_loop_room_count,
|
||||
)
|
||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
from synapse.util import log_failure
|
||||
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
|
||||
from synapse.util.metrics import Measure
|
||||
|
||||
|
@ -36,17 +37,6 @@ logger = logging.getLogger(__name__)
|
|||
events_processed_counter = Counter("synapse_handlers_appservice_events_processed", "")
|
||||
|
||||
|
||||
def log_failure(failure):
|
||||
logger.error(
|
||||
"Application Services Failure",
|
||||
exc_info=(
|
||||
failure.type,
|
||||
failure.value,
|
||||
failure.getTracebackObject()
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
class ApplicationServicesHandler(object):
|
||||
|
||||
def __init__(self, hs):
|
||||
|
@ -112,7 +102,10 @@ class ApplicationServicesHandler(object):
|
|||
|
||||
if not self.started_scheduler:
|
||||
def start_scheduler():
|
||||
return self.scheduler.start().addErrback(log_failure)
|
||||
return self.scheduler.start().addErrback(
|
||||
log_failure, "Application Services Failure",
|
||||
)
|
||||
|
||||
run_as_background_process("as_scheduler", start_scheduler)
|
||||
self.started_scheduler = True
|
||||
|
||||
|
|
|
@ -20,6 +20,8 @@ import logging
|
|||
|
||||
from six import iteritems, itervalues
|
||||
|
||||
from prometheus_client import Counter
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse.api.constants import EventTypes, Membership
|
||||
|
@ -36,6 +38,19 @@ from synapse.visibility import filter_events_for_client
|
|||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# Counts the number of times we returned a non-empty sync. `type` is one of
|
||||
# "initial_sync", "full_state_sync" or "incremental_sync", `lazy_loaded` is
|
||||
# "true" or "false" depending on if the request asked for lazy loaded members or
|
||||
# not.
|
||||
non_empty_sync_counter = Counter(
|
||||
"synapse_handlers_sync_nonempty_total",
|
||||
"Count of non empty sync responses. type is initial_sync/full_state_sync"
|
||||
"/incremental_sync. lazy_loaded indicates if lazy loaded members were "
|
||||
"enabled for that request.",
|
||||
["type", "lazy_loaded"],
|
||||
)
|
||||
|
||||
# Store the cache that tracks which lazy-loaded members have been sent to a given
|
||||
# client for no more than 30 minutes.
|
||||
LAZY_LOADED_MEMBERS_CACHE_MAX_AGE = 30 * 60 * 1000
|
||||
|
@ -227,14 +242,16 @@ class SyncHandler(object):
|
|||
@defer.inlineCallbacks
|
||||
def _wait_for_sync_for_user(self, sync_config, since_token, timeout,
|
||||
full_state):
|
||||
if since_token is None:
|
||||
sync_type = "initial_sync"
|
||||
elif full_state:
|
||||
sync_type = "full_state_sync"
|
||||
else:
|
||||
sync_type = "incremental_sync"
|
||||
|
||||
context = LoggingContext.current_context()
|
||||
if context:
|
||||
if since_token is None:
|
||||
context.tag = "initial_sync"
|
||||
elif full_state:
|
||||
context.tag = "full_state_sync"
|
||||
else:
|
||||
context.tag = "incremental_sync"
|
||||
context.tag = sync_type
|
||||
|
||||
if timeout == 0 or since_token is None or full_state:
|
||||
# we are going to return immediately, so don't bother calling
|
||||
|
@ -242,7 +259,6 @@ class SyncHandler(object):
|
|||
result = yield self.current_sync_for_user(
|
||||
sync_config, since_token, full_state=full_state,
|
||||
)
|
||||
defer.returnValue(result)
|
||||
else:
|
||||
def current_sync_callback(before_token, after_token):
|
||||
return self.current_sync_for_user(sync_config, since_token)
|
||||
|
@ -251,7 +267,15 @@ class SyncHandler(object):
|
|||
sync_config.user.to_string(), timeout, current_sync_callback,
|
||||
from_token=since_token,
|
||||
)
|
||||
defer.returnValue(result)
|
||||
|
||||
if result:
|
||||
if sync_config.filter_collection.lazy_load_members():
|
||||
lazy_loaded = "true"
|
||||
else:
|
||||
lazy_loaded = "false"
|
||||
non_empty_sync_counter.labels(sync_type, lazy_loaded).inc()
|
||||
|
||||
defer.returnValue(result)
|
||||
|
||||
def current_sync_for_user(self, sync_config, since_token=None,
|
||||
full_state=False):
|
||||
|
|
|
@ -58,7 +58,10 @@ REQUIREMENTS = {
|
|||
"msgpack-python>=0.3.0": ["msgpack"],
|
||||
"phonenumbers>=8.2.0": ["phonenumbers"],
|
||||
"six>=1.10": ["six"],
|
||||
"prometheus_client>=0.0.18": ["prometheus_client"],
|
||||
|
||||
# prometheus_client 0.4.0 changed the format of counter metrics
|
||||
# (cf https://github.com/matrix-org/synapse/issues/4001)
|
||||
"prometheus_client>=0.0.18,<0.4.0": ["prometheus_client"],
|
||||
|
||||
# we use attr.s(slots), which arrived in 16.0.0
|
||||
"attrs>=16.0.0": ["attr>=16.0.0"],
|
||||
|
@ -79,9 +82,6 @@ CONDITIONAL_REQUIREMENTS = {
|
|||
"psutil": {
|
||||
"psutil>=2.0.0": ["psutil>=2.0.0"],
|
||||
},
|
||||
"affinity": {
|
||||
"affinity": ["affinity"],
|
||||
},
|
||||
"postgres": {
|
||||
"psycopg2>=2.6": ["psycopg2"]
|
||||
}
|
||||
|
|
|
@ -15,6 +15,8 @@
|
|||
|
||||
import logging
|
||||
|
||||
import six
|
||||
|
||||
from synapse.storage._base import SQLBaseStore
|
||||
from synapse.storage.engines import PostgresEngine
|
||||
|
||||
|
@ -23,6 +25,13 @@ from ._slaved_id_tracker import SlavedIdTracker
|
|||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def __func__(inp):
|
||||
if six.PY3:
|
||||
return inp
|
||||
else:
|
||||
return inp.__func__
|
||||
|
||||
|
||||
class BaseSlavedStore(SQLBaseStore):
|
||||
def __init__(self, db_conn, hs):
|
||||
super(BaseSlavedStore, self).__init__(db_conn, hs)
|
||||
|
|
|
@ -17,7 +17,7 @@ from synapse.storage import DataStore
|
|||
from synapse.util.caches.expiringcache import ExpiringCache
|
||||
from synapse.util.caches.stream_change_cache import StreamChangeCache
|
||||
|
||||
from ._base import BaseSlavedStore
|
||||
from ._base import BaseSlavedStore, __func__
|
||||
from ._slaved_id_tracker import SlavedIdTracker
|
||||
|
||||
|
||||
|
@ -43,11 +43,11 @@ class SlavedDeviceInboxStore(BaseSlavedStore):
|
|||
expiry_ms=30 * 60 * 1000,
|
||||
)
|
||||
|
||||
get_to_device_stream_token = DataStore.get_to_device_stream_token.__func__
|
||||
get_new_messages_for_device = DataStore.get_new_messages_for_device.__func__
|
||||
get_new_device_msgs_for_remote = DataStore.get_new_device_msgs_for_remote.__func__
|
||||
delete_messages_for_device = DataStore.delete_messages_for_device.__func__
|
||||
delete_device_msgs_for_remote = DataStore.delete_device_msgs_for_remote.__func__
|
||||
get_to_device_stream_token = __func__(DataStore.get_to_device_stream_token)
|
||||
get_new_messages_for_device = __func__(DataStore.get_new_messages_for_device)
|
||||
get_new_device_msgs_for_remote = __func__(DataStore.get_new_device_msgs_for_remote)
|
||||
delete_messages_for_device = __func__(DataStore.delete_messages_for_device)
|
||||
delete_device_msgs_for_remote = __func__(DataStore.delete_device_msgs_for_remote)
|
||||
|
||||
def stream_positions(self):
|
||||
result = super(SlavedDeviceInboxStore, self).stream_positions()
|
||||
|
|
|
@ -13,23 +13,14 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import six
|
||||
|
||||
from synapse.storage import DataStore
|
||||
from synapse.storage.end_to_end_keys import EndToEndKeyStore
|
||||
from synapse.util.caches.stream_change_cache import StreamChangeCache
|
||||
|
||||
from ._base import BaseSlavedStore
|
||||
from ._base import BaseSlavedStore, __func__
|
||||
from ._slaved_id_tracker import SlavedIdTracker
|
||||
|
||||
|
||||
def __func__(inp):
|
||||
if six.PY3:
|
||||
return inp
|
||||
else:
|
||||
return inp.__func__
|
||||
|
||||
|
||||
class SlavedDeviceStore(BaseSlavedStore):
|
||||
def __init__(self, db_conn, hs):
|
||||
super(SlavedDeviceStore, self).__init__(db_conn, hs)
|
||||
|
|
|
@ -16,7 +16,7 @@
|
|||
from synapse.storage import DataStore
|
||||
from synapse.util.caches.stream_change_cache import StreamChangeCache
|
||||
|
||||
from ._base import BaseSlavedStore
|
||||
from ._base import BaseSlavedStore, __func__
|
||||
from ._slaved_id_tracker import SlavedIdTracker
|
||||
|
||||
|
||||
|
@ -33,9 +33,9 @@ class SlavedGroupServerStore(BaseSlavedStore):
|
|||
"_group_updates_stream_cache", self._group_updates_id_gen.get_current_token(),
|
||||
)
|
||||
|
||||
get_groups_changes_for_user = DataStore.get_groups_changes_for_user.__func__
|
||||
get_group_stream_token = DataStore.get_group_stream_token.__func__
|
||||
get_all_groups_for_user = DataStore.get_all_groups_for_user.__func__
|
||||
get_groups_changes_for_user = __func__(DataStore.get_groups_changes_for_user)
|
||||
get_group_stream_token = __func__(DataStore.get_group_stream_token)
|
||||
get_all_groups_for_user = __func__(DataStore.get_all_groups_for_user)
|
||||
|
||||
def stream_positions(self):
|
||||
result = super(SlavedGroupServerStore, self).stream_positions()
|
||||
|
|
|
@ -16,7 +16,7 @@
|
|||
from synapse.storage import DataStore
|
||||
from synapse.storage.keys import KeyStore
|
||||
|
||||
from ._base import BaseSlavedStore
|
||||
from ._base import BaseSlavedStore, __func__
|
||||
|
||||
|
||||
class SlavedKeyStore(BaseSlavedStore):
|
||||
|
@ -24,11 +24,11 @@ class SlavedKeyStore(BaseSlavedStore):
|
|||
"_get_server_verify_key"
|
||||
]
|
||||
|
||||
get_server_verify_keys = DataStore.get_server_verify_keys.__func__
|
||||
store_server_verify_key = DataStore.store_server_verify_key.__func__
|
||||
get_server_verify_keys = __func__(DataStore.get_server_verify_keys)
|
||||
store_server_verify_key = __func__(DataStore.store_server_verify_key)
|
||||
|
||||
get_server_certificate = DataStore.get_server_certificate.__func__
|
||||
store_server_certificate = DataStore.store_server_certificate.__func__
|
||||
get_server_certificate = __func__(DataStore.get_server_certificate)
|
||||
store_server_certificate = __func__(DataStore.store_server_certificate)
|
||||
|
||||
get_server_keys_json = DataStore.get_server_keys_json.__func__
|
||||
store_server_keys_json = DataStore.store_server_keys_json.__func__
|
||||
get_server_keys_json = __func__(DataStore.get_server_keys_json)
|
||||
store_server_keys_json = __func__(DataStore.store_server_keys_json)
|
||||
|
|
|
@ -17,7 +17,7 @@ from synapse.storage import DataStore
|
|||
from synapse.storage.presence import PresenceStore
|
||||
from synapse.util.caches.stream_change_cache import StreamChangeCache
|
||||
|
||||
from ._base import BaseSlavedStore
|
||||
from ._base import BaseSlavedStore, __func__
|
||||
from ._slaved_id_tracker import SlavedIdTracker
|
||||
|
||||
|
||||
|
@ -34,8 +34,8 @@ class SlavedPresenceStore(BaseSlavedStore):
|
|||
"PresenceStreamChangeCache", self._presence_id_gen.get_current_token()
|
||||
)
|
||||
|
||||
_get_active_presence = DataStore._get_active_presence.__func__
|
||||
take_presence_startup_info = DataStore.take_presence_startup_info.__func__
|
||||
_get_active_presence = __func__(DataStore._get_active_presence)
|
||||
take_presence_startup_info = __func__(DataStore.take_presence_startup_info)
|
||||
_get_presence_for_user = PresenceStore.__dict__["_get_presence_for_user"]
|
||||
get_presence_for_users = PresenceStore.__dict__["get_presence_for_users"]
|
||||
|
||||
|
|
|
@ -38,6 +38,7 @@ from synapse.storage.background_updates import BackgroundUpdateStore
|
|||
from synapse.storage.event_federation import EventFederationStore
|
||||
from synapse.storage.events_worker import EventsWorkerStore
|
||||
from synapse.types import RoomStreamToken, get_domain_from_id
|
||||
from synapse.util import batch_iter
|
||||
from synapse.util.async_helpers import ObservableDeferred
|
||||
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
|
||||
from synapse.util.frozenutils import frozendict_json_encoder
|
||||
|
@ -386,12 +387,10 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
|
|||
)
|
||||
|
||||
for room_id, ev_ctx_rm in iteritems(events_by_room):
|
||||
# Work out new extremities by recursively adding and removing
|
||||
# the new events.
|
||||
latest_event_ids = yield self.get_latest_event_ids_in_room(
|
||||
room_id
|
||||
)
|
||||
new_latest_event_ids = yield self._calculate_new_extremeties(
|
||||
new_latest_event_ids = yield self._calculate_new_extremities(
|
||||
room_id, ev_ctx_rm, latest_event_ids
|
||||
)
|
||||
|
||||
|
@ -400,6 +399,12 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
|
|||
# No change in extremities, so no change in state
|
||||
continue
|
||||
|
||||
# there should always be at least one forward extremity.
|
||||
# (except during the initial persistence of the send_join
|
||||
# results, in which case there will be no existing
|
||||
# extremities, so we'll `continue` above and skip this bit.)
|
||||
assert new_latest_event_ids, "No forward extremities left!"
|
||||
|
||||
new_forward_extremeties[room_id] = new_latest_event_ids
|
||||
|
||||
len_1 = (
|
||||
|
@ -517,44 +522,79 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
|
|||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _calculate_new_extremeties(self, room_id, event_contexts, latest_event_ids):
|
||||
"""Calculates the new forward extremeties for a room given events to
|
||||
def _calculate_new_extremities(self, room_id, event_contexts, latest_event_ids):
|
||||
"""Calculates the new forward extremities for a room given events to
|
||||
persist.
|
||||
|
||||
Assumes that we are only persisting events for one room at a time.
|
||||
"""
|
||||
new_latest_event_ids = set(latest_event_ids)
|
||||
# First, add all the new events to the list
|
||||
new_latest_event_ids.update(
|
||||
event.event_id for event, ctx in event_contexts
|
||||
|
||||
# we're only interested in new events which aren't outliers and which aren't
|
||||
# being rejected.
|
||||
new_events = [
|
||||
event for event, ctx in event_contexts
|
||||
if not event.internal_metadata.is_outlier() and not ctx.rejected
|
||||
]
|
||||
|
||||
# start with the existing forward extremities
|
||||
result = set(latest_event_ids)
|
||||
|
||||
# add all the new events to the list
|
||||
result.update(
|
||||
event.event_id for event in new_events
|
||||
)
|
||||
# Now remove all events that are referenced by the to-be-added events
|
||||
new_latest_event_ids.difference_update(
|
||||
|
||||
# Now remove all events which are prev_events of any of the new events
|
||||
result.difference_update(
|
||||
e_id
|
||||
for event, ctx in event_contexts
|
||||
for event in new_events
|
||||
for e_id, _ in event.prev_events
|
||||
if not event.internal_metadata.is_outlier() and not ctx.rejected
|
||||
)
|
||||
|
||||
# And finally remove any events that are referenced by previously added
|
||||
# events.
|
||||
rows = yield self._simple_select_many_batch(
|
||||
table="event_edges",
|
||||
column="prev_event_id",
|
||||
iterable=list(new_latest_event_ids),
|
||||
retcols=["prev_event_id"],
|
||||
keyvalues={
|
||||
"is_state": False,
|
||||
},
|
||||
desc="_calculate_new_extremeties",
|
||||
)
|
||||
# Finally, remove any events which are prev_events of any existing events.
|
||||
existing_prevs = yield self._get_events_which_are_prevs(result)
|
||||
result.difference_update(existing_prevs)
|
||||
|
||||
new_latest_event_ids.difference_update(
|
||||
row["prev_event_id"] for row in rows
|
||||
)
|
||||
defer.returnValue(result)
|
||||
|
||||
defer.returnValue(new_latest_event_ids)
|
||||
@defer.inlineCallbacks
|
||||
def _get_events_which_are_prevs(self, event_ids):
|
||||
"""Filter the supplied list of event_ids to get those which are prev_events of
|
||||
existing (non-outlier/rejected) events.
|
||||
|
||||
Args:
|
||||
event_ids (Iterable[str]): event ids to filter
|
||||
|
||||
Returns:
|
||||
Deferred[List[str]]: filtered event ids
|
||||
"""
|
||||
results = []
|
||||
|
||||
def _get_events(txn, batch):
|
||||
sql = """
|
||||
SELECT prev_event_id
|
||||
FROM event_edges
|
||||
INNER JOIN events USING (event_id)
|
||||
LEFT JOIN rejections USING (event_id)
|
||||
WHERE
|
||||
prev_event_id IN (%s)
|
||||
AND NOT events.outlier
|
||||
AND rejections.event_id IS NULL
|
||||
""" % (
|
||||
",".join("?" for _ in batch),
|
||||
)
|
||||
|
||||
txn.execute(sql, batch)
|
||||
results.extend(r[0] for r in txn)
|
||||
|
||||
for chunk in batch_iter(event_ids, 100):
|
||||
yield self.runInteraction(
|
||||
"_get_events_which_are_prevs",
|
||||
_get_events,
|
||||
chunk,
|
||||
)
|
||||
|
||||
defer.returnValue(results)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _get_new_state_after_events(self, room_id, events_context, old_latest_event_ids,
|
||||
|
@ -586,10 +626,6 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
|
|||
the new current state is only returned if we've already calculated
|
||||
it.
|
||||
"""
|
||||
|
||||
if not new_latest_event_ids:
|
||||
return
|
||||
|
||||
# map from state_group to ((type, key) -> event_id) state map
|
||||
state_groups_map = {}
|
||||
|
||||
|
|
|
@ -630,7 +630,21 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
|||
|
||||
@defer.inlineCallbacks
|
||||
def get_all_new_events_stream(self, from_id, current_id, limit):
|
||||
"""Get all new events"""
|
||||
"""Get all new events
|
||||
|
||||
Returns all events with from_id < stream_ordering <= current_id.
|
||||
|
||||
Args:
|
||||
from_id (int): the stream_ordering of the last event we processed
|
||||
current_id (int): the stream_ordering of the most recently processed event
|
||||
limit (int): the maximum number of events to return
|
||||
|
||||
Returns:
|
||||
Deferred[Tuple[int, list[FrozenEvent]]]: A tuple of (next_id, events), where
|
||||
`next_id` is the next value to pass as `from_id` (it will either be the
|
||||
stream_ordering of the last returned event, or, if fewer than `limit` events
|
||||
were found, `current_id`.
|
||||
"""
|
||||
|
||||
def get_all_new_events_stream_txn(txn):
|
||||
sql = (
|
||||
|
|
|
@ -68,7 +68,10 @@ class Clock(object):
|
|||
"""
|
||||
call = task.LoopingCall(f)
|
||||
call.clock = self._reactor
|
||||
call.start(msec / 1000.0, now=False)
|
||||
d = call.start(msec / 1000.0, now=False)
|
||||
d.addErrback(
|
||||
log_failure, "Looping call died", consumeErrors=False,
|
||||
)
|
||||
return call
|
||||
|
||||
def call_later(self, delay, callback, *args, **kwargs):
|
||||
|
@ -109,3 +112,29 @@ def batch_iter(iterable, size):
|
|||
sourceiter = iter(iterable)
|
||||
# call islice until it returns an empty tuple
|
||||
return iter(lambda: tuple(islice(sourceiter, size)), ())
|
||||
|
||||
|
||||
def log_failure(failure, msg, consumeErrors=True):
|
||||
"""Creates a function suitable for passing to `Deferred.addErrback` that
|
||||
logs any failures that occur.
|
||||
|
||||
Args:
|
||||
msg (str): Message to log
|
||||
consumeErrors (bool): If true consumes the failure, otherwise passes
|
||||
on down the callback chain
|
||||
|
||||
Returns:
|
||||
func(Failure)
|
||||
"""
|
||||
|
||||
logger.error(
|
||||
msg,
|
||||
exc_info=(
|
||||
failure.type,
|
||||
failure.value,
|
||||
failure.getTracebackObject()
|
||||
)
|
||||
)
|
||||
|
||||
if not consumeErrors:
|
||||
return failure
|
||||
|
|
|
@ -324,14 +324,13 @@ def filter_events_for_server(store, server_name, events):
|
|||
# server's domain.
|
||||
#
|
||||
# event_to_state_ids contains lots of duplicates, so it turns out to be
|
||||
# cheaper to build a complete set of unique
|
||||
# ((type, state_key), event_id) tuples, and then filter out the ones we
|
||||
# don't want.
|
||||
# cheaper to build a complete event_id => (type, state_key) dict, and then
|
||||
# filter out the ones we don't want
|
||||
#
|
||||
state_key_to_event_id_set = {
|
||||
e
|
||||
event_id_to_state_key = {
|
||||
event_id: key
|
||||
for key_to_eid in itervalues(event_to_state_ids)
|
||||
for e in key_to_eid.items()
|
||||
for key, event_id in iteritems(key_to_eid)
|
||||
}
|
||||
|
||||
def include(typ, state_key):
|
||||
|
@ -346,7 +345,7 @@ def filter_events_for_server(store, server_name, events):
|
|||
|
||||
event_map = yield store.get_events([
|
||||
e_id
|
||||
for key, e_id in state_key_to_event_id_set
|
||||
for e_id, key in iteritems(event_id_to_state_key)
|
||||
if include(key[0], key[1])
|
||||
])
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue