mirror of
https://git.anonymousland.org/anonymousland/synapse.git
synced 2025-12-15 16:21:00 -05:00
Merge branch 'develop' into media_repository
This commit is contained in:
commit
61fc37e467
42 changed files with 6635 additions and 231 deletions
|
|
@ -16,4 +16,4 @@
|
|||
""" This is a reference implementation of a synapse home server.
|
||||
"""
|
||||
|
||||
__version__ = "0.5.0"
|
||||
__version__ = "0.5.4"
|
||||
|
|
|
|||
|
|
@ -38,79 +38,66 @@ class Auth(object):
|
|||
self.store = hs.get_datastore()
|
||||
self.state = hs.get_state_handler()
|
||||
|
||||
def check(self, event, raises=False):
|
||||
def check(self, event, auth_events):
|
||||
""" Checks if this event is correctly authed.
|
||||
|
||||
Returns:
|
||||
True if the auth checks pass.
|
||||
Raises:
|
||||
AuthError if there was a problem authorising this event. This will
|
||||
be raised only if raises=True.
|
||||
"""
|
||||
try:
|
||||
if hasattr(event, "room_id"):
|
||||
if event.old_state_events is None:
|
||||
# Oh, we don't know what the state of the room was, so we
|
||||
# are trusting that this is allowed (at least for now)
|
||||
logger.warn("Trusting event: %s", event.event_id)
|
||||
return True
|
||||
|
||||
if hasattr(event, "outlier") and event.outlier is True:
|
||||
# TODO (erikj): Auth for outliers is done differently.
|
||||
return True
|
||||
|
||||
if event.type == RoomCreateEvent.TYPE:
|
||||
# FIXME
|
||||
return True
|
||||
|
||||
# FIXME: Temp hack
|
||||
if event.type == RoomAliasesEvent.TYPE:
|
||||
return True
|
||||
|
||||
if event.type == RoomMemberEvent.TYPE:
|
||||
allowed = self.is_membership_change_allowed(event)
|
||||
if allowed:
|
||||
logger.debug("Allowing! %s", event)
|
||||
else:
|
||||
logger.debug("Denying! %s", event)
|
||||
return allowed
|
||||
|
||||
self.check_event_sender_in_room(event)
|
||||
self._can_send_event(event)
|
||||
|
||||
if event.type == RoomPowerLevelsEvent.TYPE:
|
||||
self._check_power_levels(event)
|
||||
|
||||
if event.type == RoomRedactionEvent.TYPE:
|
||||
self._check_redaction(event)
|
||||
|
||||
logger.debug("Allowing! %s", event)
|
||||
if not hasattr(event, "room_id"):
|
||||
raise AuthError(500, "Event has no room_id: %s" % event)
|
||||
if auth_events is None:
|
||||
# Oh, we don't know what the state of the room was, so we
|
||||
# are trusting that this is allowed (at least for now)
|
||||
logger.warn("Trusting event: %s", event.event_id)
|
||||
return True
|
||||
else:
|
||||
raise AuthError(500, "Unknown event: %s" % event)
|
||||
|
||||
if event.type == RoomCreateEvent.TYPE:
|
||||
# FIXME
|
||||
return True
|
||||
|
||||
# FIXME: Temp hack
|
||||
if event.type == RoomAliasesEvent.TYPE:
|
||||
return True
|
||||
|
||||
if event.type == RoomMemberEvent.TYPE:
|
||||
allowed = self.is_membership_change_allowed(
|
||||
event, auth_events
|
||||
)
|
||||
if allowed:
|
||||
logger.debug("Allowing! %s", event)
|
||||
else:
|
||||
logger.debug("Denying! %s", event)
|
||||
return allowed
|
||||
|
||||
self.check_event_sender_in_room(event, auth_events)
|
||||
self._can_send_event(event, auth_events)
|
||||
|
||||
if event.type == RoomPowerLevelsEvent.TYPE:
|
||||
self._check_power_levels(event, auth_events)
|
||||
|
||||
if event.type == RoomRedactionEvent.TYPE:
|
||||
self._check_redaction(event, auth_events)
|
||||
|
||||
logger.debug("Allowing! %s", event)
|
||||
except AuthError as e:
|
||||
logger.info(
|
||||
"Event auth check failed on event %s with msg: %s",
|
||||
event, e.msg
|
||||
)
|
||||
logger.info("Denying! %s", event)
|
||||
if raises:
|
||||
raise
|
||||
|
||||
return False
|
||||
raise
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def check_joined_room(self, room_id, user_id):
|
||||
try:
|
||||
member = yield self.store.get_room_member(
|
||||
room_id=room_id,
|
||||
user_id=user_id
|
||||
)
|
||||
self._check_joined_room(member, user_id, room_id)
|
||||
defer.returnValue(member)
|
||||
except AttributeError:
|
||||
pass
|
||||
defer.returnValue(None)
|
||||
member = yield self.state.get_current_state(
|
||||
room_id=room_id,
|
||||
event_type=RoomMemberEvent.TYPE,
|
||||
state_key=user_id
|
||||
)
|
||||
self._check_joined_room(member, user_id, room_id)
|
||||
defer.returnValue(member)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def check_host_in_room(self, room_id, host):
|
||||
|
|
@ -130,9 +117,9 @@ class Auth(object):
|
|||
|
||||
defer.returnValue(False)
|
||||
|
||||
def check_event_sender_in_room(self, event):
|
||||
def check_event_sender_in_room(self, event, auth_events):
|
||||
key = (RoomMemberEvent.TYPE, event.user_id, )
|
||||
member_event = event.state_events.get(key)
|
||||
member_event = auth_events.get(key)
|
||||
|
||||
return self._check_joined_room(
|
||||
member_event,
|
||||
|
|
@ -147,14 +134,14 @@ class Auth(object):
|
|||
))
|
||||
|
||||
@log_function
|
||||
def is_membership_change_allowed(self, event):
|
||||
def is_membership_change_allowed(self, event, auth_events):
|
||||
membership = event.content["membership"]
|
||||
|
||||
# Check if this is the room creator joining:
|
||||
if len(event.prev_events) == 1 and Membership.JOIN == membership:
|
||||
# Get room creation event:
|
||||
key = (RoomCreateEvent.TYPE, "", )
|
||||
create = event.old_state_events.get(key)
|
||||
create = auth_events.get(key)
|
||||
if create and event.prev_events[0][0] == create.event_id:
|
||||
if create.content["creator"] == event.state_key:
|
||||
return True
|
||||
|
|
@ -163,19 +150,19 @@ class Auth(object):
|
|||
|
||||
# get info about the caller
|
||||
key = (RoomMemberEvent.TYPE, event.user_id, )
|
||||
caller = event.old_state_events.get(key)
|
||||
caller = auth_events.get(key)
|
||||
|
||||
caller_in_room = caller and caller.membership == Membership.JOIN
|
||||
caller_invited = caller and caller.membership == Membership.INVITE
|
||||
|
||||
# get info about the target
|
||||
key = (RoomMemberEvent.TYPE, target_user_id, )
|
||||
target = event.old_state_events.get(key)
|
||||
target = auth_events.get(key)
|
||||
|
||||
target_in_room = target and target.membership == Membership.JOIN
|
||||
|
||||
key = (RoomJoinRulesEvent.TYPE, "", )
|
||||
join_rule_event = event.old_state_events.get(key)
|
||||
join_rule_event = auth_events.get(key)
|
||||
if join_rule_event:
|
||||
join_rule = join_rule_event.content.get(
|
||||
"join_rule", JoinRules.INVITE
|
||||
|
|
@ -186,11 +173,13 @@ class Auth(object):
|
|||
user_level = self._get_power_level_from_event_state(
|
||||
event,
|
||||
event.user_id,
|
||||
auth_events,
|
||||
)
|
||||
|
||||
ban_level, kick_level, redact_level = (
|
||||
self._get_ops_level_from_event_state(
|
||||
event
|
||||
event,
|
||||
auth_events,
|
||||
)
|
||||
)
|
||||
|
||||
|
|
@ -213,7 +202,10 @@ class Auth(object):
|
|||
|
||||
# Invites are valid iff caller is in the room and target isn't.
|
||||
if not caller_in_room: # caller isn't joined
|
||||
raise AuthError(403, "You are not in room %s." % event.room_id)
|
||||
raise AuthError(
|
||||
403,
|
||||
"%s not in room %s." % (event.user_id, event.room_id,)
|
||||
)
|
||||
elif target_in_room: # the target is already in the room.
|
||||
raise AuthError(403, "%s is already in the room." %
|
||||
target_user_id)
|
||||
|
|
@ -236,7 +228,10 @@ class Auth(object):
|
|||
# TODO (erikj): Implement kicks.
|
||||
|
||||
if not caller_in_room: # trying to leave a room you aren't joined
|
||||
raise AuthError(403, "You are not in room %s." % event.room_id)
|
||||
raise AuthError(
|
||||
403,
|
||||
"%s not in room %s." % (target_user_id, event.room_id,)
|
||||
)
|
||||
elif target_user_id != event.user_id:
|
||||
if kick_level:
|
||||
kick_level = int(kick_level)
|
||||
|
|
@ -260,9 +255,9 @@ class Auth(object):
|
|||
|
||||
return True
|
||||
|
||||
def _get_power_level_from_event_state(self, event, user_id):
|
||||
def _get_power_level_from_event_state(self, event, user_id, auth_events):
|
||||
key = (RoomPowerLevelsEvent.TYPE, "", )
|
||||
power_level_event = event.old_state_events.get(key)
|
||||
power_level_event = auth_events.get(key)
|
||||
level = None
|
||||
if power_level_event:
|
||||
level = power_level_event.content.get("users", {}).get(user_id)
|
||||
|
|
@ -270,16 +265,16 @@ class Auth(object):
|
|||
level = power_level_event.content.get("users_default", 0)
|
||||
else:
|
||||
key = (RoomCreateEvent.TYPE, "", )
|
||||
create_event = event.old_state_events.get(key)
|
||||
create_event = auth_events.get(key)
|
||||
if (create_event is not None and
|
||||
create_event.content["creator"] == user_id):
|
||||
return 100
|
||||
|
||||
return level
|
||||
|
||||
def _get_ops_level_from_event_state(self, event):
|
||||
def _get_ops_level_from_event_state(self, event, auth_events):
|
||||
key = (RoomPowerLevelsEvent.TYPE, "", )
|
||||
power_level_event = event.old_state_events.get(key)
|
||||
power_level_event = auth_events.get(key)
|
||||
|
||||
if power_level_event:
|
||||
return (
|
||||
|
|
@ -375,6 +370,11 @@ class Auth(object):
|
|||
key = (RoomMemberEvent.TYPE, event.user_id, )
|
||||
member_event = event.old_state_events.get(key)
|
||||
|
||||
key = (RoomCreateEvent.TYPE, "", )
|
||||
create_event = event.old_state_events.get(key)
|
||||
if create_event:
|
||||
auth_events.append(create_event.event_id)
|
||||
|
||||
if join_rule_event:
|
||||
join_rule = join_rule_event.content.get("join_rule")
|
||||
is_public = join_rule == JoinRules.PUBLIC if join_rule else False
|
||||
|
|
@ -406,9 +406,9 @@ class Auth(object):
|
|||
event.auth_events = zip(auth_events, hashes)
|
||||
|
||||
@log_function
|
||||
def _can_send_event(self, event):
|
||||
def _can_send_event(self, event, auth_events):
|
||||
key = (RoomPowerLevelsEvent.TYPE, "", )
|
||||
send_level_event = event.old_state_events.get(key)
|
||||
send_level_event = auth_events.get(key)
|
||||
send_level = None
|
||||
if send_level_event:
|
||||
send_level = send_level_event.content.get("events", {}).get(
|
||||
|
|
@ -432,6 +432,7 @@ class Auth(object):
|
|||
user_level = self._get_power_level_from_event_state(
|
||||
event,
|
||||
event.user_id,
|
||||
auth_events,
|
||||
)
|
||||
|
||||
if user_level:
|
||||
|
|
@ -468,14 +469,16 @@ class Auth(object):
|
|||
|
||||
return True
|
||||
|
||||
def _check_redaction(self, event):
|
||||
def _check_redaction(self, event, auth_events):
|
||||
user_level = self._get_power_level_from_event_state(
|
||||
event,
|
||||
event.user_id,
|
||||
auth_events,
|
||||
)
|
||||
|
||||
_, _, redact_level = self._get_ops_level_from_event_state(
|
||||
event
|
||||
event,
|
||||
auth_events,
|
||||
)
|
||||
|
||||
if user_level < redact_level:
|
||||
|
|
@ -484,7 +487,7 @@ class Auth(object):
|
|||
"You don't have permission to redact events"
|
||||
)
|
||||
|
||||
def _check_power_levels(self, event):
|
||||
def _check_power_levels(self, event, auth_events):
|
||||
user_list = event.content.get("users", {})
|
||||
# Validate users
|
||||
for k, v in user_list.items():
|
||||
|
|
@ -499,7 +502,7 @@ class Auth(object):
|
|||
raise SynapseError(400, "Not a valid power level: %s" % (v,))
|
||||
|
||||
key = (event.type, event.state_key, )
|
||||
current_state = event.old_state_events.get(key)
|
||||
current_state = auth_events.get(key)
|
||||
|
||||
if not current_state:
|
||||
return
|
||||
|
|
@ -507,6 +510,7 @@ class Auth(object):
|
|||
user_level = self._get_power_level_from_event_state(
|
||||
event,
|
||||
event.user_id,
|
||||
auth_events,
|
||||
)
|
||||
|
||||
# Check other levels:
|
||||
|
|
|
|||
|
|
@ -125,6 +125,7 @@ class SynapseEvent(JsonEncodedObject):
|
|||
pdu_json.pop("outlier", None)
|
||||
pdu_json.pop("replaces_state", None)
|
||||
pdu_json.pop("redacted", None)
|
||||
pdu_json.pop("prev_content", None)
|
||||
state_hash = pdu_json.pop("state_hash", None)
|
||||
if state_hash is not None:
|
||||
pdu_json.setdefault("unsigned", {})["state_hash"] = state_hash
|
||||
|
|
|
|||
|
|
@ -52,12 +52,18 @@ class LoggingConfig(Config):
|
|||
if self.log_config is None:
|
||||
|
||||
level = logging.INFO
|
||||
level_for_storage = logging.INFO
|
||||
if self.verbosity:
|
||||
level = logging.DEBUG
|
||||
if self.verbosity > 1:
|
||||
level_for_storage = logging.DEBUG
|
||||
|
||||
# FIXME: we need a logging.WARN for a -q quiet option
|
||||
logger = logging.getLogger('')
|
||||
logger.setLevel(level)
|
||||
|
||||
logging.getLogger('synapse.storage').setLevel(level_for_storage)
|
||||
|
||||
formatter = logging.Formatter(log_format)
|
||||
if self.log_file:
|
||||
handler = logging.FileHandler(self.log_file)
|
||||
|
|
|
|||
|
|
@ -35,8 +35,11 @@ class ServerConfig(Config):
|
|||
if not args.content_addr:
|
||||
host = args.server_name
|
||||
if ':' not in host:
|
||||
host = "%s:%d" % (host, args.bind_port)
|
||||
args.content_addr = "https://%s" % (host,)
|
||||
host = "%s:%d" % (host, args.unsecure_port)
|
||||
else:
|
||||
host = host.split(':')[0]
|
||||
host = "%s:%d" % (host, args.unsecure_port)
|
||||
args.content_addr = "http://%s" % (host,)
|
||||
|
||||
self.content_addr = args.content_addr
|
||||
|
||||
|
|
|
|||
|
|
@ -281,6 +281,22 @@ class ReplicationLayer(object):
|
|||
|
||||
defer.returnValue(pdus)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
@log_function
|
||||
def get_event_auth(self, destination, context, event_id):
|
||||
res = yield self.transport_layer.get_event_auth(
|
||||
destination, context, event_id,
|
||||
)
|
||||
|
||||
auth_chain = [
|
||||
self.event_from_pdu_json(p, outlier=True)
|
||||
for p in res["auth_chain"]
|
||||
]
|
||||
|
||||
auth_chain.sort(key=lambda e: e.depth)
|
||||
|
||||
defer.returnValue(auth_chain)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
@log_function
|
||||
def on_backfill_request(self, origin, context, versions, limit):
|
||||
|
|
@ -481,11 +497,17 @@ class ReplicationLayer(object):
|
|||
# FIXME: We probably want to do something with the auth_chain given
|
||||
# to us
|
||||
|
||||
# auth_chain = [
|
||||
# Pdu(outlier=True, **p) for p in content.get("auth_chain", [])
|
||||
# ]
|
||||
auth_chain = [
|
||||
self.event_from_pdu_json(p, outlier=True)
|
||||
for p in content.get("auth_chain", [])
|
||||
]
|
||||
|
||||
defer.returnValue(state)
|
||||
auth_chain.sort(key=lambda e: e.depth)
|
||||
|
||||
defer.returnValue({
|
||||
"state": state,
|
||||
"auth_chain": auth_chain,
|
||||
})
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def send_invite(self, destination, context, event_id, pdu):
|
||||
|
|
@ -543,20 +565,34 @@ class ReplicationLayer(object):
|
|||
state = None
|
||||
|
||||
# We need to make sure we have all the auth events.
|
||||
for e_id, _ in pdu.auth_events:
|
||||
exists = yield self._get_persisted_pdu(
|
||||
origin,
|
||||
e_id,
|
||||
do_auth=False
|
||||
)
|
||||
|
||||
if not exists:
|
||||
yield self.get_pdu(
|
||||
origin,
|
||||
event_id=e_id,
|
||||
outlier=True,
|
||||
)
|
||||
logger.debug("Processed pdu %s", e_id)
|
||||
# for e_id, _ in pdu.auth_events:
|
||||
# exists = yield self._get_persisted_pdu(
|
||||
# origin,
|
||||
# e_id,
|
||||
# do_auth=False
|
||||
# )
|
||||
#
|
||||
# if not exists:
|
||||
# try:
|
||||
# logger.debug(
|
||||
# "_handle_new_pdu fetch missing auth event %s from %s",
|
||||
# e_id,
|
||||
# origin,
|
||||
# )
|
||||
#
|
||||
# yield self.get_pdu(
|
||||
# origin,
|
||||
# event_id=e_id,
|
||||
# outlier=True,
|
||||
# )
|
||||
#
|
||||
# logger.debug("Processed pdu %s", e_id)
|
||||
# except:
|
||||
# logger.warn(
|
||||
# "Failed to get auth event %s from %s",
|
||||
# e_id,
|
||||
# origin
|
||||
# )
|
||||
|
||||
# Get missing pdus if necessary.
|
||||
if not pdu.outlier:
|
||||
|
|
@ -565,6 +601,11 @@ class ReplicationLayer(object):
|
|||
pdu.room_id
|
||||
)
|
||||
|
||||
logger.debug(
|
||||
"_handle_new_pdu min_depth for %s: %d",
|
||||
pdu.room_id, min_depth
|
||||
)
|
||||
|
||||
if min_depth and pdu.depth > min_depth:
|
||||
for event_id, hashes in pdu.prev_events:
|
||||
exists = yield self._get_persisted_pdu(
|
||||
|
|
@ -574,11 +615,14 @@ class ReplicationLayer(object):
|
|||
)
|
||||
|
||||
if not exists:
|
||||
logger.debug("Requesting pdu %s", event_id)
|
||||
logger.debug(
|
||||
"_handle_new_pdu requesting pdu %s",
|
||||
event_id
|
||||
)
|
||||
|
||||
try:
|
||||
yield self.get_pdu(
|
||||
pdu.origin,
|
||||
origin,
|
||||
event_id=event_id,
|
||||
)
|
||||
logger.debug("Processed pdu %s", event_id)
|
||||
|
|
@ -588,12 +632,17 @@ class ReplicationLayer(object):
|
|||
else:
|
||||
# We need to get the state at this event, since we have reached
|
||||
# a backward extremity edge.
|
||||
logger.debug(
|
||||
"_handle_new_pdu getting state for %s",
|
||||
pdu.room_id
|
||||
)
|
||||
state = yield self.get_state_for_context(
|
||||
origin, pdu.room_id, pdu.event_id,
|
||||
)
|
||||
|
||||
if not backfilled:
|
||||
ret = yield self.handler.on_receive_pdu(
|
||||
origin,
|
||||
pdu,
|
||||
backfilled=backfilled,
|
||||
state=state,
|
||||
|
|
@ -804,7 +853,10 @@ class _TransactionQueue(object):
|
|||
|
||||
# Ensures we don't continue until all callbacks on that
|
||||
# deferred have fired
|
||||
yield deferred
|
||||
try:
|
||||
yield deferred
|
||||
except:
|
||||
pass
|
||||
|
||||
logger.debug("TX [%s] Yielded to callbacks", destination)
|
||||
|
||||
|
|
@ -816,7 +868,8 @@ class _TransactionQueue(object):
|
|||
logger.exception(e)
|
||||
|
||||
for deferred in deferreds:
|
||||
deferred.errback(e)
|
||||
if not deferred.called:
|
||||
deferred.errback(e)
|
||||
|
||||
finally:
|
||||
# We want to be *very* sure we delete this after we stop processing
|
||||
|
|
|
|||
|
|
@ -78,7 +78,7 @@ class BaseHandler(object):
|
|||
|
||||
if not suppress_auth:
|
||||
logger.debug("Authing...")
|
||||
self.auth.check(event, raises=True)
|
||||
self.auth.check(event, auth_events=event.old_state_events)
|
||||
logger.debug("Authed")
|
||||
else:
|
||||
logger.debug("Suppressed auth.")
|
||||
|
|
|
|||
|
|
@ -53,8 +53,12 @@ class EventStreamHandler(BaseHandler):
|
|||
if auth_user not in self._streams_per_user:
|
||||
self._streams_per_user[auth_user] = 0
|
||||
if auth_user in self._stop_timer_per_user:
|
||||
self.clock.cancel_call_later(
|
||||
self._stop_timer_per_user.pop(auth_user))
|
||||
try:
|
||||
self.clock.cancel_call_later(
|
||||
self._stop_timer_per_user.pop(auth_user)
|
||||
)
|
||||
except:
|
||||
logger.exception("Failed to cancel event timer")
|
||||
else:
|
||||
yield self.distributor.fire(
|
||||
"started_user_eventstream", auth_user
|
||||
|
|
@ -95,10 +99,12 @@ class EventStreamHandler(BaseHandler):
|
|||
logger.debug(
|
||||
"_later stopped_user_eventstream %s", auth_user
|
||||
)
|
||||
|
||||
self._stop_timer_per_user.pop(auth_user, None)
|
||||
|
||||
yield self.distributor.fire(
|
||||
"stopped_user_eventstream", auth_user
|
||||
)
|
||||
del self._stop_timer_per_user[auth_user]
|
||||
|
||||
logger.debug("Scheduling _later: for %s", auth_user)
|
||||
self._stop_timer_per_user[auth_user] = (
|
||||
|
|
|
|||
|
|
@ -18,13 +18,16 @@
|
|||
from ._base import BaseHandler
|
||||
|
||||
from synapse.api.events.utils import prune_event
|
||||
from synapse.api.errors import AuthError, FederationError, SynapseError
|
||||
from synapse.api.events.room import RoomMemberEvent
|
||||
from synapse.api.errors import (
|
||||
AuthError, FederationError, SynapseError, StoreError,
|
||||
)
|
||||
from synapse.api.events.room import RoomMemberEvent, RoomCreateEvent
|
||||
from synapse.api.constants import Membership
|
||||
from synapse.util.logutils import log_function
|
||||
from synapse.util.async import run_on_reactor
|
||||
from synapse.crypto.event_signing import (
|
||||
compute_event_signature, check_event_content_hash
|
||||
compute_event_signature, check_event_content_hash,
|
||||
add_hashes_and_signatures,
|
||||
)
|
||||
from syutil.jsonutil import encode_canonical_json
|
||||
|
||||
|
|
@ -98,7 +101,7 @@ class FederationHandler(BaseHandler):
|
|||
|
||||
@log_function
|
||||
@defer.inlineCallbacks
|
||||
def on_receive_pdu(self, pdu, backfilled, state=None):
|
||||
def on_receive_pdu(self, origin, pdu, backfilled, state=None):
|
||||
""" Called by the ReplicationLayer when we have a new pdu. We need to
|
||||
do auth checks and put it through the StateHandler.
|
||||
"""
|
||||
|
|
@ -109,7 +112,7 @@ class FederationHandler(BaseHandler):
|
|||
# If we are currently in the process of joining this room, then we
|
||||
# queue up events for later processing.
|
||||
if event.room_id in self.room_queues:
|
||||
self.room_queues[event.room_id].append(pdu)
|
||||
self.room_queues[event.room_id].append((pdu, origin))
|
||||
return
|
||||
|
||||
logger.debug("Processing event: %s", event.event_id)
|
||||
|
|
@ -141,15 +144,62 @@ class FederationHandler(BaseHandler):
|
|||
)
|
||||
event = redacted_event
|
||||
|
||||
is_new_state = yield self.state_handler.annotate_event_with_state(
|
||||
event,
|
||||
old_state=state
|
||||
)
|
||||
|
||||
logger.debug("Event: %s", event)
|
||||
|
||||
# FIXME (erikj): Awful hack to make the case where we are not currently
|
||||
# in the room work
|
||||
current_state = None
|
||||
is_in_room = yield self.auth.check_host_in_room(
|
||||
event.room_id,
|
||||
self.server_name
|
||||
)
|
||||
if not is_in_room and not event.outlier:
|
||||
logger.debug("Got event for room we're not in.")
|
||||
|
||||
replication_layer = self.replication_layer
|
||||
auth_chain = yield replication_layer.get_event_auth(
|
||||
origin,
|
||||
context=event.room_id,
|
||||
event_id=event.event_id,
|
||||
)
|
||||
|
||||
for e in auth_chain:
|
||||
e.outlier = True
|
||||
try:
|
||||
yield self._handle_new_event(e, fetch_missing=False)
|
||||
except:
|
||||
logger.exception(
|
||||
"Failed to parse auth event %s",
|
||||
e.event_id,
|
||||
)
|
||||
|
||||
if not state:
|
||||
state = yield replication_layer.get_state_for_context(
|
||||
origin,
|
||||
context=event.room_id,
|
||||
event_id=event.event_id,
|
||||
)
|
||||
|
||||
current_state = state
|
||||
|
||||
if state:
|
||||
for e in state:
|
||||
e.outlier = True
|
||||
try:
|
||||
yield self._handle_new_event(e)
|
||||
except:
|
||||
logger.exception(
|
||||
"Failed to parse state event %s",
|
||||
e.event_id,
|
||||
)
|
||||
|
||||
try:
|
||||
self.auth.check(event, raises=True)
|
||||
yield self._handle_new_event(
|
||||
event,
|
||||
state=state,
|
||||
backfilled=backfilled,
|
||||
current_state=current_state,
|
||||
)
|
||||
except AuthError as e:
|
||||
raise FederationError(
|
||||
"ERROR",
|
||||
|
|
@ -158,43 +208,17 @@ class FederationHandler(BaseHandler):
|
|||
affected=event.event_id,
|
||||
)
|
||||
|
||||
is_new_state = is_new_state and not backfilled
|
||||
|
||||
# TODO: Implement something in federation that allows us to
|
||||
# respond to PDU.
|
||||
|
||||
yield self.store.persist_event(
|
||||
event,
|
||||
backfilled,
|
||||
is_new_state=is_new_state
|
||||
)
|
||||
|
||||
room = yield self.store.get_room(event.room_id)
|
||||
|
||||
if not room:
|
||||
# Huh, let's try and get the current state
|
||||
try:
|
||||
yield self.replication_layer.get_state_for_context(
|
||||
event.origin, event.room_id, event.event_id,
|
||||
)
|
||||
|
||||
hosts = yield self.store.get_joined_hosts_for_room(
|
||||
event.room_id
|
||||
)
|
||||
if self.hs.hostname in hosts:
|
||||
try:
|
||||
yield self.store.store_room(
|
||||
room_id=event.room_id,
|
||||
room_creator_user_id="",
|
||||
is_public=False,
|
||||
)
|
||||
except:
|
||||
pass
|
||||
except:
|
||||
logger.exception(
|
||||
"Failed to get current state for room %s",
|
||||
event.room_id
|
||||
yield self.store.store_room(
|
||||
room_id=event.room_id,
|
||||
room_creator_user_id="",
|
||||
is_public=False,
|
||||
)
|
||||
except StoreError:
|
||||
logger.exception("Failed to store room.")
|
||||
|
||||
if not backfilled:
|
||||
extra_users = []
|
||||
|
|
@ -255,11 +279,23 @@ class FederationHandler(BaseHandler):
|
|||
pdu=event
|
||||
)
|
||||
|
||||
|
||||
|
||||
defer.returnValue(pdu)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def on_event_auth(self, event_id):
|
||||
auth = yield self.store.get_auth_chain(event_id)
|
||||
|
||||
for event in auth:
|
||||
event.signatures.update(
|
||||
compute_event_signature(
|
||||
event,
|
||||
self.hs.hostname,
|
||||
self.hs.config.signing_key[0]
|
||||
)
|
||||
)
|
||||
|
||||
defer.returnValue([e for e in auth])
|
||||
|
||||
@log_function
|
||||
|
|
@ -276,6 +312,8 @@ class FederationHandler(BaseHandler):
|
|||
We suspend processing of any received events from this room until we
|
||||
have finished processing the join.
|
||||
"""
|
||||
logger.debug("Joining %s to %s", joinee, room_id)
|
||||
|
||||
pdu = yield self.replication_layer.make_join(
|
||||
target_host,
|
||||
room_id,
|
||||
|
|
@ -298,19 +336,29 @@ class FederationHandler(BaseHandler):
|
|||
|
||||
try:
|
||||
event.event_id = self.event_factory.create_event_id()
|
||||
event.origin = self.hs.hostname
|
||||
event.content = content
|
||||
|
||||
state = yield self.replication_layer.send_join(
|
||||
if not hasattr(event, "signatures"):
|
||||
event.signatures = {}
|
||||
|
||||
add_hashes_and_signatures(
|
||||
event,
|
||||
self.hs.hostname,
|
||||
self.hs.config.signing_key[0],
|
||||
)
|
||||
|
||||
ret = yield self.replication_layer.send_join(
|
||||
target_host,
|
||||
event
|
||||
)
|
||||
|
||||
logger.debug("do_invite_join state: %s", state)
|
||||
state = ret["state"]
|
||||
auth_chain = ret["auth_chain"]
|
||||
auth_chain.sort(key=lambda e: e.depth)
|
||||
|
||||
yield self.state_handler.annotate_event_with_state(
|
||||
event,
|
||||
old_state=state
|
||||
)
|
||||
logger.debug("do_invite_join auth_chain: %s", auth_chain)
|
||||
logger.debug("do_invite_join state: %s", state)
|
||||
|
||||
logger.debug("do_invite_join event: %s", event)
|
||||
|
||||
|
|
@ -324,34 +372,50 @@ class FederationHandler(BaseHandler):
|
|||
# FIXME
|
||||
pass
|
||||
|
||||
for e in auth_chain:
|
||||
e.outlier = True
|
||||
try:
|
||||
yield self._handle_new_event(e, fetch_missing=False)
|
||||
except:
|
||||
logger.exception(
|
||||
"Failed to parse auth event %s",
|
||||
e.event_id,
|
||||
)
|
||||
|
||||
for e in state:
|
||||
# FIXME: Auth these.
|
||||
e.outlier = True
|
||||
try:
|
||||
yield self._handle_new_event(
|
||||
e,
|
||||
fetch_missing=True
|
||||
)
|
||||
except:
|
||||
logger.exception(
|
||||
"Failed to parse state event %s",
|
||||
e.event_id,
|
||||
)
|
||||
|
||||
yield self.state_handler.annotate_event_with_state(
|
||||
e,
|
||||
)
|
||||
|
||||
yield self.store.persist_event(
|
||||
e,
|
||||
backfilled=False,
|
||||
is_new_state=True
|
||||
)
|
||||
|
||||
yield self.store.persist_event(
|
||||
yield self._handle_new_event(
|
||||
event,
|
||||
backfilled=False,
|
||||
is_new_state=True
|
||||
state=state,
|
||||
current_state=state,
|
||||
)
|
||||
|
||||
yield self.notifier.on_new_room_event(
|
||||
event, extra_users=[joinee]
|
||||
)
|
||||
|
||||
logger.debug("Finished joining %s to %s", joinee, room_id)
|
||||
finally:
|
||||
room_queue = self.room_queues[room_id]
|
||||
del self.room_queues[room_id]
|
||||
|
||||
for p in room_queue:
|
||||
for p, origin in room_queue:
|
||||
try:
|
||||
yield self.on_receive_pdu(p, backfilled=False)
|
||||
self.on_receive_pdu(origin, p, backfilled=False)
|
||||
except:
|
||||
pass
|
||||
logger.exception("Couldn't handle pdu")
|
||||
|
||||
defer.returnValue(True)
|
||||
|
||||
|
|
@ -375,7 +439,7 @@ class FederationHandler(BaseHandler):
|
|||
|
||||
yield self.state_handler.annotate_event_with_state(event)
|
||||
yield self.auth.add_auth_events(event)
|
||||
self.auth.check(event, raises=True)
|
||||
self.auth.check(event, auth_events=event.old_state_events)
|
||||
|
||||
pdu = event
|
||||
|
||||
|
|
@ -391,17 +455,7 @@ class FederationHandler(BaseHandler):
|
|||
|
||||
event.outlier = False
|
||||
|
||||
state_handler = self.state_handler
|
||||
is_new_state = yield state_handler.annotate_event_with_state(event)
|
||||
self.auth.check(event, raises=True)
|
||||
|
||||
# FIXME (erikj): All this is duplicated above :(
|
||||
|
||||
yield self.store.persist_event(
|
||||
event,
|
||||
backfilled=False,
|
||||
is_new_state=is_new_state
|
||||
)
|
||||
yield self._handle_new_event(event)
|
||||
|
||||
extra_users = []
|
||||
if event.type == RoomMemberEvent.TYPE:
|
||||
|
|
@ -414,7 +468,7 @@ class FederationHandler(BaseHandler):
|
|||
)
|
||||
|
||||
if event.type == RoomMemberEvent.TYPE:
|
||||
if event.membership == Membership.JOIN:
|
||||
if event.content["membership"] == Membership.JOIN:
|
||||
user = self.hs.parse_userid(event.state_key)
|
||||
yield self.distributor.fire(
|
||||
"user_joined_room", user=user, room_id=event.room_id
|
||||
|
|
@ -508,7 +562,17 @@ class FederationHandler(BaseHandler):
|
|||
else:
|
||||
del results[(event.type, event.state_key)]
|
||||
|
||||
defer.returnValue(results.values())
|
||||
res = results.values()
|
||||
for event in res:
|
||||
event.signatures.update(
|
||||
compute_event_signature(
|
||||
event,
|
||||
self.hs.hostname,
|
||||
self.hs.config.signing_key[0]
|
||||
)
|
||||
)
|
||||
|
||||
defer.returnValue(res)
|
||||
else:
|
||||
defer.returnValue([])
|
||||
|
||||
|
|
@ -541,6 +605,17 @@ class FederationHandler(BaseHandler):
|
|||
)
|
||||
|
||||
if event:
|
||||
# FIXME: This is a temporary work around where we occasionally
|
||||
# return events slightly differently than when they were
|
||||
# originally signed
|
||||
event.signatures.update(
|
||||
compute_event_signature(
|
||||
event,
|
||||
self.hs.hostname,
|
||||
self.hs.config.signing_key[0]
|
||||
)
|
||||
)
|
||||
|
||||
if do_auth:
|
||||
in_room = yield self.auth.check_host_in_room(
|
||||
event.room_id,
|
||||
|
|
@ -565,3 +640,78 @@ class FederationHandler(BaseHandler):
|
|||
)
|
||||
while waiters:
|
||||
waiters.pop().callback(None)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _handle_new_event(self, event, state=None, backfilled=False,
|
||||
current_state=None, fetch_missing=True):
|
||||
is_new_state = yield self.state_handler.annotate_event_with_state(
|
||||
event,
|
||||
old_state=state
|
||||
)
|
||||
|
||||
if event.old_state_events:
|
||||
known_ids = set(
|
||||
[s.event_id for s in event.old_state_events.values()]
|
||||
)
|
||||
for e_id, _ in event.auth_events:
|
||||
if e_id not in known_ids:
|
||||
e = yield self.store.get_event(
|
||||
e_id,
|
||||
allow_none=True,
|
||||
)
|
||||
|
||||
if not e:
|
||||
# TODO: Do some conflict res to make sure that we're
|
||||
# not the ones who are wrong.
|
||||
logger.info(
|
||||
"Rejecting %s as %s not in %s",
|
||||
event.event_id, e_id, known_ids,
|
||||
)
|
||||
raise AuthError(403, "Auth events are stale")
|
||||
|
||||
auth_events = event.old_state_events
|
||||
else:
|
||||
# We need to get the auth events from somewhere.
|
||||
|
||||
# TODO: Don't just hit the DBs?
|
||||
|
||||
auth_events = {}
|
||||
for e_id, _ in event.auth_events:
|
||||
e = yield self.store.get_event(
|
||||
e_id,
|
||||
allow_none=True,
|
||||
)
|
||||
|
||||
if not e:
|
||||
e = yield self.replication_layer.get_pdu(
|
||||
event.origin, e_id, outlier=True
|
||||
)
|
||||
|
||||
if e and fetch_missing:
|
||||
try:
|
||||
yield self.on_receive_pdu(event.origin, e, False)
|
||||
except:
|
||||
logger.exception(
|
||||
"Failed to parse auth event %s",
|
||||
e_id,
|
||||
)
|
||||
|
||||
if not e:
|
||||
logger.warn("Can't find auth event %s.", e_id)
|
||||
|
||||
auth_events[(e.type, e.state_key)] = e
|
||||
|
||||
if event.type == RoomMemberEvent.TYPE and not event.auth_events:
|
||||
if len(event.prev_events) == 1:
|
||||
c = yield self.store.get_event(event.prev_events[0][0])
|
||||
if c.type == RoomCreateEvent.TYPE:
|
||||
auth_events[(c.type, c.state_key)] = c
|
||||
|
||||
self.auth.check(event, auth_events=auth_events)
|
||||
|
||||
yield self.store.persist_event(
|
||||
event,
|
||||
backfilled=backfilled,
|
||||
is_new_state=(is_new_state and not backfilled),
|
||||
current_state=current_state,
|
||||
)
|
||||
|
|
|
|||
|
|
@ -306,7 +306,7 @@ class MessageHandler(BaseHandler):
|
|||
auth_user = self.hs.parse_userid(user_id)
|
||||
|
||||
# TODO: These concurrently
|
||||
state_tuples = yield self.store.get_current_state(room_id)
|
||||
state_tuples = yield self.state_handler.get_current_state(room_id)
|
||||
state = [self.hs.serialize_event(x) for x in state_tuples]
|
||||
|
||||
member_event = (yield self.store.get_room_member(
|
||||
|
|
|
|||
|
|
@ -651,12 +651,13 @@ class PresenceHandler(BaseHandler):
|
|||
logger.debug("Incoming presence update from %s", user)
|
||||
|
||||
observers = set(self._remote_recvmap.get(user, set()))
|
||||
if observers:
|
||||
logger.debug(" | %d interested local observers %r", len(observers), observers)
|
||||
|
||||
rm_handler = self.homeserver.get_handlers().room_member_handler
|
||||
room_ids = yield rm_handler.get_rooms_for_user(user)
|
||||
|
||||
if not observers and not room_ids:
|
||||
continue
|
||||
if room_ids:
|
||||
logger.debug(" | %d interested room IDs %r", len(room_ids), room_ids)
|
||||
|
||||
state = dict(push)
|
||||
del state["user_id"]
|
||||
|
|
@ -678,6 +679,10 @@ class PresenceHandler(BaseHandler):
|
|||
self._user_cachemap_latest_serial += 1
|
||||
statuscache.update(state, serial=self._user_cachemap_latest_serial)
|
||||
|
||||
if not observers and not room_ids:
|
||||
logger.debug(" | no interested observers or room IDs")
|
||||
continue
|
||||
|
||||
self.push_update_to_clients(
|
||||
observed_user=user,
|
||||
users_to_push=observers,
|
||||
|
|
@ -799,6 +804,7 @@ class PresenceEventSource(object):
|
|||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
@log_function
|
||||
def get_new_events_for_user(self, user, from_key, limit):
|
||||
from_key = int(from_key)
|
||||
|
||||
|
|
@ -811,7 +817,8 @@ class PresenceEventSource(object):
|
|||
# TODO(paul): use a DeferredList ? How to limit concurrency.
|
||||
for observed_user in cachemap.keys():
|
||||
cached = cachemap[observed_user]
|
||||
if not (from_key < cached.serial):
|
||||
|
||||
if cached.serial <= from_key:
|
||||
continue
|
||||
|
||||
if (yield self.is_visible(observer_user, observed_user)):
|
||||
|
|
|
|||
|
|
@ -519,7 +519,11 @@ class RoomMemberHandler(BaseHandler):
|
|||
user_id=user.to_string(), membership_list=membership_list
|
||||
)
|
||||
|
||||
defer.returnValue([r.room_id for r in rooms])
|
||||
# For some reason the list of events contains duplicates
|
||||
# TODO(paul): work out why because I really don't think it should
|
||||
room_ids = set(r.room_id for r in rooms)
|
||||
|
||||
defer.returnValue(room_ids)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _do_local_membership_update(self, event, membership, snapshot,
|
||||
|
|
|
|||
|
|
@ -148,7 +148,7 @@ class RoomStateEventRestServlet(RestServlet):
|
|||
content = _parse_json(request)
|
||||
|
||||
event = self.event_factory.create_event(
|
||||
etype=urllib.unquote(event_type),
|
||||
etype=event_type, # already urldecoded
|
||||
content=content,
|
||||
room_id=urllib.unquote(room_id),
|
||||
user_id=user.to_string(),
|
||||
|
|
@ -327,7 +327,9 @@ class RoomMessageListRestServlet(RestServlet):
|
|||
@defer.inlineCallbacks
|
||||
def on_GET(self, request, room_id):
|
||||
user = yield self.auth.get_user_by_req(request)
|
||||
pagination_config = PaginationConfig.from_request(request)
|
||||
pagination_config = PaginationConfig.from_request(request,
|
||||
default_limit=10,
|
||||
)
|
||||
with_feedback = "feedback" in request.args
|
||||
handler = self.handlers.message_handler
|
||||
msgs = yield handler.get_messages(
|
||||
|
|
|
|||
|
|
@ -82,7 +82,7 @@ class StateHandler(object):
|
|||
if hasattr(event, "outlier") and event.outlier:
|
||||
event.state_group = None
|
||||
event.old_state_events = None
|
||||
event.state_events = {}
|
||||
event.state_events = None
|
||||
defer.returnValue(False)
|
||||
return
|
||||
|
||||
|
|
|
|||
|
|
@ -69,7 +69,7 @@ SCHEMAS = [
|
|||
|
||||
# Remember to update this number every time an incompatible change is made to
|
||||
# database schema files, so the users will be informed on server restarts.
|
||||
SCHEMA_VERSION = 7
|
||||
SCHEMA_VERSION = 8
|
||||
|
||||
|
||||
class _RollbackButIsFineException(Exception):
|
||||
|
|
@ -97,7 +97,8 @@ class DataStore(RoomMemberStore, RoomStore,
|
|||
|
||||
@defer.inlineCallbacks
|
||||
@log_function
|
||||
def persist_event(self, event, backfilled=False, is_new_state=True):
|
||||
def persist_event(self, event, backfilled=False, is_new_state=True,
|
||||
current_state=None):
|
||||
stream_ordering = None
|
||||
if backfilled:
|
||||
if not self.min_token_deferred.called:
|
||||
|
|
@ -113,6 +114,7 @@ class DataStore(RoomMemberStore, RoomStore,
|
|||
backfilled=backfilled,
|
||||
stream_ordering=stream_ordering,
|
||||
is_new_state=is_new_state,
|
||||
current_state=current_state,
|
||||
)
|
||||
except _RollbackButIsFineException:
|
||||
pass
|
||||
|
|
@ -141,7 +143,7 @@ class DataStore(RoomMemberStore, RoomStore,
|
|||
|
||||
@log_function
|
||||
def _persist_event_txn(self, txn, event, backfilled, stream_ordering=None,
|
||||
is_new_state=True):
|
||||
is_new_state=True, current_state=None):
|
||||
if event.type == RoomMemberEvent.TYPE:
|
||||
self._store_room_member_txn(txn, event)
|
||||
elif event.type == FeedbackEvent.TYPE:
|
||||
|
|
@ -210,8 +212,27 @@ class DataStore(RoomMemberStore, RoomStore,
|
|||
|
||||
self._store_state_groups_txn(txn, event)
|
||||
|
||||
if current_state:
|
||||
txn.execute(
|
||||
"DELETE FROM current_state_events WHERE room_id = ?",
|
||||
(event.room_id,)
|
||||
)
|
||||
|
||||
for s in current_state:
|
||||
self._simple_insert_txn(
|
||||
txn,
|
||||
"current_state_events",
|
||||
{
|
||||
"event_id": s.event_id,
|
||||
"room_id": s.room_id,
|
||||
"type": s.type,
|
||||
"state_key": s.state_key,
|
||||
},
|
||||
or_replace=True,
|
||||
)
|
||||
|
||||
is_state = hasattr(event, "state_key") and event.state_key is not None
|
||||
if is_new_state and is_state:
|
||||
if is_state:
|
||||
vals = {
|
||||
"event_id": event.event_id,
|
||||
"room_id": event.room_id,
|
||||
|
|
@ -229,17 +250,18 @@ class DataStore(RoomMemberStore, RoomStore,
|
|||
or_replace=True,
|
||||
)
|
||||
|
||||
self._simple_insert_txn(
|
||||
txn,
|
||||
"current_state_events",
|
||||
{
|
||||
"event_id": event.event_id,
|
||||
"room_id": event.room_id,
|
||||
"type": event.type,
|
||||
"state_key": event.state_key,
|
||||
},
|
||||
or_replace=True,
|
||||
)
|
||||
if is_new_state:
|
||||
self._simple_insert_txn(
|
||||
txn,
|
||||
"current_state_events",
|
||||
{
|
||||
"event_id": event.event_id,
|
||||
"room_id": event.room_id,
|
||||
"type": event.type,
|
||||
"state_key": event.state_key,
|
||||
},
|
||||
or_replace=True,
|
||||
)
|
||||
|
||||
for e_id, h in event.prev_state:
|
||||
self._simple_insert_txn(
|
||||
|
|
@ -316,7 +338,12 @@ class DataStore(RoomMemberStore, RoomStore,
|
|||
txn, event.event_id, ref_alg, ref_hash_bytes
|
||||
)
|
||||
|
||||
self._update_min_depth_for_room_txn(txn, event.room_id, event.depth)
|
||||
if not outlier:
|
||||
self._update_min_depth_for_room_txn(
|
||||
txn,
|
||||
event.room_id,
|
||||
event.depth
|
||||
)
|
||||
|
||||
def _store_redaction(self, txn, event):
|
||||
txn.execute(
|
||||
|
|
|
|||
|
|
@ -218,7 +218,9 @@ class RoomMemberStore(SQLBaseStore):
|
|||
"ON m.event_id = c.event_id "
|
||||
"WHERE m.membership = 'join' "
|
||||
"AND (%(clause)s) "
|
||||
"GROUP BY m.room_id HAVING COUNT(m.room_id) = ?"
|
||||
# TODO(paul): We've got duplicate rows in the database somewhere
|
||||
# so we have to DISTINCT m.user_id here
|
||||
"GROUP BY m.room_id HAVING COUNT(DISTINCT m.user_id) = ?"
|
||||
) % {"clause": user_list_clause}
|
||||
|
||||
args = list(user_id_list)
|
||||
|
|
|
|||
34
synapse/storage/schema/delta/v8.sql
Normal file
34
synapse/storage/schema/delta/v8.sql
Normal file
|
|
@ -0,0 +1,34 @@
|
|||
/* Copyright 2014 OpenMarket Ltd
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
CREATE TABLE IF NOT EXISTS event_signatures_2 (
|
||||
event_id TEXT,
|
||||
signature_name TEXT,
|
||||
key_id TEXT,
|
||||
signature BLOB,
|
||||
CONSTRAINT uniqueness UNIQUE (event_id, signature_name, key_id)
|
||||
);
|
||||
|
||||
INSERT INTO event_signatures_2 (event_id, signature_name, key_id, signature)
|
||||
SELECT event_id, signature_name, key_id, signature FROM event_signatures;
|
||||
|
||||
DROP TABLE event_signatures;
|
||||
ALTER TABLE event_signatures_2 RENAME TO event_signatures;
|
||||
|
||||
CREATE INDEX IF NOT EXISTS event_signatures_id ON event_signatures (
|
||||
event_id
|
||||
);
|
||||
|
||||
PRAGMA user_version = 8;
|
||||
|
|
@ -42,7 +42,7 @@ CREATE TABLE IF NOT EXISTS event_signatures (
|
|||
signature_name TEXT,
|
||||
key_id TEXT,
|
||||
signature BLOB,
|
||||
CONSTRAINT uniqueness UNIQUE (event_id, key_id)
|
||||
CONSTRAINT uniqueness UNIQUE (event_id, signature_name, key_id)
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS event_signatures_id ON event_signatures (
|
||||
|
|
|
|||
|
|
@ -87,7 +87,7 @@ class StateStore(SQLBaseStore):
|
|||
)
|
||||
|
||||
def _store_state_groups_txn(self, txn, event):
|
||||
if not event.state_events:
|
||||
if event.state_events is None:
|
||||
return
|
||||
|
||||
state_group = event.state_group
|
||||
|
|
|
|||
|
|
@ -283,7 +283,7 @@ class StreamStore(SQLBaseStore):
|
|||
|
||||
sql = (
|
||||
"SELECT *, (%(redacted)s) AS redacted FROM events "
|
||||
"WHERE room_id = ? AND stream_ordering <= ? "
|
||||
"WHERE room_id = ? AND stream_ordering <= ? AND outlier = 0 "
|
||||
"ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT ? "
|
||||
) % {
|
||||
"redacted": del_sql,
|
||||
|
|
|
|||
|
|
@ -47,7 +47,8 @@ class PaginationConfig(object):
|
|||
self.limit = int(limit) if limit is not None else None
|
||||
|
||||
@classmethod
|
||||
def from_request(cls, request, raise_invalid_params=True):
|
||||
def from_request(cls, request, raise_invalid_params=True,
|
||||
default_limit=None):
|
||||
def get_param(name, default=None):
|
||||
lst = request.args.get(name, [])
|
||||
if len(lst) > 1:
|
||||
|
|
@ -84,6 +85,9 @@ class PaginationConfig(object):
|
|||
if limit is not None and not limit.isdigit():
|
||||
raise SynapseError(400, "'limit' parameter must be an integer.")
|
||||
|
||||
if limit is None:
|
||||
limit = default_limit
|
||||
|
||||
try:
|
||||
return PaginationConfig(from_tok, to_tok, direction, limit)
|
||||
except:
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue