mirror of
https://mau.dev/maunium/synapse.git
synced 2024-10-01 01:36:05 -04:00
Merge branch 'develop' of github.com:matrix-org/synapse into release-v0.14.0
This commit is contained in:
commit
7a3815b372
21
README.rst
21
README.rst
@ -525,7 +525,6 @@ Logging In To An Existing Account
|
|||||||
Just enter the ``@localpart:my.domain.here`` Matrix user ID and password into
|
Just enter the ``@localpart:my.domain.here`` Matrix user ID and password into
|
||||||
the form and click the Login button.
|
the form and click the Login button.
|
||||||
|
|
||||||
|
|
||||||
Identity Servers
|
Identity Servers
|
||||||
================
|
================
|
||||||
|
|
||||||
@ -545,6 +544,26 @@ as the primary means of identity and E2E encryption is not complete. As such,
|
|||||||
we are running a single identity server (https://matrix.org) at the current
|
we are running a single identity server (https://matrix.org) at the current
|
||||||
time.
|
time.
|
||||||
|
|
||||||
|
Password reset
|
||||||
|
==============
|
||||||
|
|
||||||
|
If a user has registered an email address to their account using an identity
|
||||||
|
server, they can request a password-reset token via clients such as Vector.
|
||||||
|
|
||||||
|
A manual password reset can be done via direct database access as follows.
|
||||||
|
|
||||||
|
First calculate the hash of the new password:
|
||||||
|
|
||||||
|
$ source ~/.synapse/bin/activate
|
||||||
|
$ ./scripts/hash_password
|
||||||
|
Password:
|
||||||
|
Confirm password:
|
||||||
|
$2a$12$xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
|
||||||
|
|
||||||
|
Then update the `users` table in the database:
|
||||||
|
|
||||||
|
UPDATE users SET password_hash='$2a$12$xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'
|
||||||
|
WHERE name='@test:test.com';
|
||||||
|
|
||||||
Where's the spec?!
|
Where's the spec?!
|
||||||
==================
|
==================
|
||||||
|
@ -86,9 +86,12 @@ def used_names(prefix, item, defs, names):
|
|||||||
for name, funcs in defs.get('class', {}).items():
|
for name, funcs in defs.get('class', {}).items():
|
||||||
used_names(prefix + name + ".", name, funcs, names)
|
used_names(prefix + name + ".", name, funcs, names)
|
||||||
|
|
||||||
|
path = prefix.rstrip('.')
|
||||||
for used in defs.get('uses', ()):
|
for used in defs.get('uses', ()):
|
||||||
if used in names:
|
if used in names:
|
||||||
names[used].setdefault('used', {}).setdefault(item, []).append(prefix.rstrip('.'))
|
if item:
|
||||||
|
names[item].setdefault('uses', []).append(used)
|
||||||
|
names[used].setdefault('used', {}).setdefault(item, []).append(path)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
@ -113,6 +116,10 @@ if __name__ == '__main__':
|
|||||||
"--referrers", default=0, type=int,
|
"--referrers", default=0, type=int,
|
||||||
help="Include referrers up to the given depth"
|
help="Include referrers up to the given depth"
|
||||||
)
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"--referred", default=0, type=int,
|
||||||
|
help="Include referred down to the given depth"
|
||||||
|
)
|
||||||
parser.add_argument(
|
parser.add_argument(
|
||||||
"--format", default="yaml",
|
"--format", default="yaml",
|
||||||
help="Output format, one of 'yaml' or 'dot'"
|
help="Output format, one of 'yaml' or 'dot'"
|
||||||
@ -161,6 +168,20 @@ if __name__ == '__main__':
|
|||||||
continue
|
continue
|
||||||
result[name] = definition
|
result[name] = definition
|
||||||
|
|
||||||
|
referred_depth = args.referred
|
||||||
|
referred = set()
|
||||||
|
while referred_depth:
|
||||||
|
referred_depth -= 1
|
||||||
|
for entry in result.values():
|
||||||
|
for uses in entry.get("uses", ()):
|
||||||
|
referred.add(uses)
|
||||||
|
for name, definition in names.items():
|
||||||
|
if not name in referred:
|
||||||
|
continue
|
||||||
|
if ignore and any(pattern.match(name) for pattern in ignore):
|
||||||
|
continue
|
||||||
|
result[name] = definition
|
||||||
|
|
||||||
if args.format == 'yaml':
|
if args.format == 'yaml':
|
||||||
yaml.dump(result, sys.stdout, default_flow_style=False)
|
yaml.dump(result, sys.stdout, default_flow_style=False)
|
||||||
elif args.format == 'dot':
|
elif args.format == 'dot':
|
||||||
|
@ -1 +0,0 @@
|
|||||||
perl -MCrypt::Random -MCrypt::Eksblowfish::Bcrypt -e 'print Crypt::Eksblowfish::Bcrypt::bcrypt("secret", "\$2\$12\$" . Crypt::Eksblowfish::Bcrypt::en_base64(Crypt::Random::makerandom_octet(Length=>16)))."\n"'
|
|
39
scripts/hash_password
Executable file
39
scripts/hash_password
Executable file
@ -0,0 +1,39 @@
|
|||||||
|
#!/usr/bin/env python
|
||||||
|
|
||||||
|
import argparse
|
||||||
|
import bcrypt
|
||||||
|
import getpass
|
||||||
|
|
||||||
|
bcrypt_rounds=12
|
||||||
|
|
||||||
|
def prompt_for_pass():
|
||||||
|
password = getpass.getpass("Password: ")
|
||||||
|
|
||||||
|
if not password:
|
||||||
|
raise Exception("Password cannot be blank.")
|
||||||
|
|
||||||
|
confirm_password = getpass.getpass("Confirm password: ")
|
||||||
|
|
||||||
|
if password != confirm_password:
|
||||||
|
raise Exception("Passwords do not match.")
|
||||||
|
|
||||||
|
return password
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
parser = argparse.ArgumentParser(
|
||||||
|
description="Calculate the hash of a new password, so that passwords"
|
||||||
|
" can be reset")
|
||||||
|
parser.add_argument(
|
||||||
|
"-p", "--password",
|
||||||
|
default=None,
|
||||||
|
help="New password for user. Will prompt if omitted.",
|
||||||
|
)
|
||||||
|
|
||||||
|
args = parser.parse_args()
|
||||||
|
password = args.password
|
||||||
|
|
||||||
|
if not password:
|
||||||
|
password = prompt_for_pass()
|
||||||
|
|
||||||
|
print bcrypt.hashpw(password, bcrypt.gensalt(bcrypt_rounds))
|
||||||
|
|
@ -814,17 +814,16 @@ class Auth(object):
|
|||||||
|
|
||||||
return auth_ids
|
return auth_ids
|
||||||
|
|
||||||
@log_function
|
def _get_send_level(self, etype, state_key, auth_events):
|
||||||
def _can_send_event(self, event, auth_events):
|
|
||||||
key = (EventTypes.PowerLevels, "", )
|
key = (EventTypes.PowerLevels, "", )
|
||||||
send_level_event = auth_events.get(key)
|
send_level_event = auth_events.get(key)
|
||||||
send_level = None
|
send_level = None
|
||||||
if send_level_event:
|
if send_level_event:
|
||||||
send_level = send_level_event.content.get("events", {}).get(
|
send_level = send_level_event.content.get("events", {}).get(
|
||||||
event.type
|
etype
|
||||||
)
|
)
|
||||||
if send_level is None:
|
if send_level is None:
|
||||||
if hasattr(event, "state_key"):
|
if state_key is not None:
|
||||||
send_level = send_level_event.content.get(
|
send_level = send_level_event.content.get(
|
||||||
"state_default", 50
|
"state_default", 50
|
||||||
)
|
)
|
||||||
@ -838,6 +837,13 @@ class Auth(object):
|
|||||||
else:
|
else:
|
||||||
send_level = 0
|
send_level = 0
|
||||||
|
|
||||||
|
return send_level
|
||||||
|
|
||||||
|
@log_function
|
||||||
|
def _can_send_event(self, event, auth_events):
|
||||||
|
send_level = self._get_send_level(
|
||||||
|
event.type, event.get("state_key", None), auth_events
|
||||||
|
)
|
||||||
user_level = self._get_user_power_level(event.user_id, auth_events)
|
user_level = self._get_user_power_level(event.user_id, auth_events)
|
||||||
|
|
||||||
if user_level < send_level:
|
if user_level < send_level:
|
||||||
@ -982,3 +988,43 @@ class Auth(object):
|
|||||||
"You don't have permission to add ops level greater "
|
"You don't have permission to add ops level greater "
|
||||||
"than your own"
|
"than your own"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def check_can_change_room_list(self, room_id, user):
|
||||||
|
"""Check if the user is allowed to edit the room's entry in the
|
||||||
|
published room list.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
room_id (str)
|
||||||
|
user (UserID)
|
||||||
|
"""
|
||||||
|
|
||||||
|
is_admin = yield self.is_server_admin(user)
|
||||||
|
if is_admin:
|
||||||
|
defer.returnValue(True)
|
||||||
|
|
||||||
|
user_id = user.to_string()
|
||||||
|
yield self.check_joined_room(room_id, user_id)
|
||||||
|
|
||||||
|
# We currently require the user is a "moderator" in the room. We do this
|
||||||
|
# by checking if they would (theoretically) be able to change the
|
||||||
|
# m.room.aliases events
|
||||||
|
power_level_event = yield self.state.get_current_state(
|
||||||
|
room_id, EventTypes.PowerLevels, ""
|
||||||
|
)
|
||||||
|
|
||||||
|
auth_events = {}
|
||||||
|
if power_level_event:
|
||||||
|
auth_events[(EventTypes.PowerLevels, "")] = power_level_event
|
||||||
|
|
||||||
|
send_level = self._get_send_level(
|
||||||
|
EventTypes.Aliases, "", auth_events
|
||||||
|
)
|
||||||
|
user_level = self._get_user_power_level(user_id, auth_events)
|
||||||
|
|
||||||
|
if user_level < send_level:
|
||||||
|
raise AuthError(
|
||||||
|
403,
|
||||||
|
"This server requires you to be a moderator in the room to"
|
||||||
|
" edit its room list entry"
|
||||||
|
)
|
||||||
|
@ -14,6 +14,7 @@
|
|||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
from synapse.util.frozenutils import freeze
|
from synapse.util.frozenutils import freeze
|
||||||
|
from synapse.util.caches import intern_dict
|
||||||
|
|
||||||
|
|
||||||
# Whether we should use frozen_dict in FrozenEvent. Using frozen_dicts prevents
|
# Whether we should use frozen_dict in FrozenEvent. Using frozen_dicts prevents
|
||||||
@ -140,6 +141,10 @@ class FrozenEvent(EventBase):
|
|||||||
|
|
||||||
unsigned = dict(event_dict.pop("unsigned", {}))
|
unsigned = dict(event_dict.pop("unsigned", {}))
|
||||||
|
|
||||||
|
# We intern these strings because they turn up a lot (especially when
|
||||||
|
# caching).
|
||||||
|
event_dict = intern_dict(event_dict)
|
||||||
|
|
||||||
if USE_FROZEN_DICTS:
|
if USE_FROZEN_DICTS:
|
||||||
frozen_dict = freeze(event_dict)
|
frozen_dict = freeze(event_dict)
|
||||||
else:
|
else:
|
||||||
@ -168,5 +173,7 @@ class FrozenEvent(EventBase):
|
|||||||
|
|
||||||
def __repr__(self):
|
def __repr__(self):
|
||||||
return "<FrozenEvent event_id='%s', type='%s', state_key='%s'>" % (
|
return "<FrozenEvent event_id='%s', type='%s', state_key='%s'>" % (
|
||||||
self.event_id, self.type, self.get("state_key", None),
|
self.get("event_id", None),
|
||||||
|
self.get("type", None),
|
||||||
|
self.get("state_key", None),
|
||||||
)
|
)
|
||||||
|
@ -418,6 +418,7 @@ class FederationClient(FederationBase):
|
|||||||
"Failed to make_%s via %s: %s",
|
"Failed to make_%s via %s: %s",
|
||||||
membership, destination, e.message
|
membership, destination, e.message
|
||||||
)
|
)
|
||||||
|
raise
|
||||||
|
|
||||||
raise RuntimeError("Failed to send to any server.")
|
raise RuntimeError("Failed to send to any server.")
|
||||||
|
|
||||||
|
@ -137,8 +137,8 @@ class FederationServer(FederationBase):
|
|||||||
logger.exception("Failed to handle PDU")
|
logger.exception("Failed to handle PDU")
|
||||||
|
|
||||||
if hasattr(transaction, "edus"):
|
if hasattr(transaction, "edus"):
|
||||||
for edu in [Edu(**x) for x in transaction.edus]:
|
for edu in (Edu(**x) for x in transaction.edus):
|
||||||
self.received_edu(
|
yield self.received_edu(
|
||||||
transaction.origin,
|
transaction.origin,
|
||||||
edu.edu_type,
|
edu.edu_type,
|
||||||
edu.content
|
edu.content
|
||||||
@ -161,11 +161,17 @@ class FederationServer(FederationBase):
|
|||||||
)
|
)
|
||||||
defer.returnValue((200, response))
|
defer.returnValue((200, response))
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
def received_edu(self, origin, edu_type, content):
|
def received_edu(self, origin, edu_type, content):
|
||||||
received_edus_counter.inc()
|
received_edus_counter.inc()
|
||||||
|
|
||||||
if edu_type in self.edu_handlers:
|
if edu_type in self.edu_handlers:
|
||||||
self.edu_handlers[edu_type](origin, content)
|
try:
|
||||||
|
yield self.edu_handlers[edu_type](origin, content)
|
||||||
|
except SynapseError as e:
|
||||||
|
logger.info("Failed to handle edu %r: %r", edu_type, e)
|
||||||
|
except Exception as e:
|
||||||
|
logger.exception("Failed to handle edu %r", edu_type, e)
|
||||||
else:
|
else:
|
||||||
logger.warn("Received EDU of type %s with no handler", edu_type)
|
logger.warn("Received EDU of type %s with no handler", edu_type)
|
||||||
|
|
||||||
@ -525,7 +531,6 @@ class FederationServer(FederationBase):
|
|||||||
yield self.handler.on_receive_pdu(
|
yield self.handler.on_receive_pdu(
|
||||||
origin,
|
origin,
|
||||||
pdu,
|
pdu,
|
||||||
backfilled=False,
|
|
||||||
state=state,
|
state=state,
|
||||||
auth_chain=auth_chain,
|
auth_chain=auth_chain,
|
||||||
)
|
)
|
||||||
|
@ -175,7 +175,7 @@ class BaseFederationServlet(object):
|
|||||||
|
|
||||||
|
|
||||||
class FederationSendServlet(BaseFederationServlet):
|
class FederationSendServlet(BaseFederationServlet):
|
||||||
PATH = "/send/([^/]*)/"
|
PATH = "/send/(?P<transaction_id>[^/]*)/"
|
||||||
|
|
||||||
def __init__(self, handler, server_name, **kwargs):
|
def __init__(self, handler, server_name, **kwargs):
|
||||||
super(FederationSendServlet, self).__init__(
|
super(FederationSendServlet, self).__init__(
|
||||||
@ -250,7 +250,7 @@ class FederationPullServlet(BaseFederationServlet):
|
|||||||
|
|
||||||
|
|
||||||
class FederationEventServlet(BaseFederationServlet):
|
class FederationEventServlet(BaseFederationServlet):
|
||||||
PATH = "/event/([^/]*)/"
|
PATH = "/event/(?P<event_id>[^/]*)/"
|
||||||
|
|
||||||
# This is when someone asks for a data item for a given server data_id pair.
|
# This is when someone asks for a data item for a given server data_id pair.
|
||||||
def on_GET(self, origin, content, query, event_id):
|
def on_GET(self, origin, content, query, event_id):
|
||||||
@ -258,7 +258,7 @@ class FederationEventServlet(BaseFederationServlet):
|
|||||||
|
|
||||||
|
|
||||||
class FederationStateServlet(BaseFederationServlet):
|
class FederationStateServlet(BaseFederationServlet):
|
||||||
PATH = "/state/([^/]*)/"
|
PATH = "/state/(?P<context>[^/]*)/"
|
||||||
|
|
||||||
# This is when someone asks for all data for a given context.
|
# This is when someone asks for all data for a given context.
|
||||||
def on_GET(self, origin, content, query, context):
|
def on_GET(self, origin, content, query, context):
|
||||||
@ -270,7 +270,7 @@ class FederationStateServlet(BaseFederationServlet):
|
|||||||
|
|
||||||
|
|
||||||
class FederationBackfillServlet(BaseFederationServlet):
|
class FederationBackfillServlet(BaseFederationServlet):
|
||||||
PATH = "/backfill/([^/]*)/"
|
PATH = "/backfill/(?P<context>[^/]*)/"
|
||||||
|
|
||||||
def on_GET(self, origin, content, query, context):
|
def on_GET(self, origin, content, query, context):
|
||||||
versions = query["v"]
|
versions = query["v"]
|
||||||
@ -285,7 +285,7 @@ class FederationBackfillServlet(BaseFederationServlet):
|
|||||||
|
|
||||||
|
|
||||||
class FederationQueryServlet(BaseFederationServlet):
|
class FederationQueryServlet(BaseFederationServlet):
|
||||||
PATH = "/query/([^/]*)"
|
PATH = "/query/(?P<query_type>[^/]*)"
|
||||||
|
|
||||||
# This is when we receive a server-server Query
|
# This is when we receive a server-server Query
|
||||||
def on_GET(self, origin, content, query, query_type):
|
def on_GET(self, origin, content, query, query_type):
|
||||||
@ -296,7 +296,7 @@ class FederationQueryServlet(BaseFederationServlet):
|
|||||||
|
|
||||||
|
|
||||||
class FederationMakeJoinServlet(BaseFederationServlet):
|
class FederationMakeJoinServlet(BaseFederationServlet):
|
||||||
PATH = "/make_join/([^/]*)/([^/]*)"
|
PATH = "/make_join/(?P<context>[^/]*)/(?P<user_id>[^/]*)"
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def on_GET(self, origin, content, query, context, user_id):
|
def on_GET(self, origin, content, query, context, user_id):
|
||||||
@ -305,7 +305,7 @@ class FederationMakeJoinServlet(BaseFederationServlet):
|
|||||||
|
|
||||||
|
|
||||||
class FederationMakeLeaveServlet(BaseFederationServlet):
|
class FederationMakeLeaveServlet(BaseFederationServlet):
|
||||||
PATH = "/make_leave/([^/]*)/([^/]*)"
|
PATH = "/make_leave/(?P<context>[^/]*)/(?P<user_id>[^/]*)"
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def on_GET(self, origin, content, query, context, user_id):
|
def on_GET(self, origin, content, query, context, user_id):
|
||||||
@ -314,7 +314,7 @@ class FederationMakeLeaveServlet(BaseFederationServlet):
|
|||||||
|
|
||||||
|
|
||||||
class FederationSendLeaveServlet(BaseFederationServlet):
|
class FederationSendLeaveServlet(BaseFederationServlet):
|
||||||
PATH = "/send_leave/([^/]*)/([^/]*)"
|
PATH = "/send_leave/(?P<room_id>[^/]*)/(?P<txid>[^/]*)"
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def on_PUT(self, origin, content, query, room_id, txid):
|
def on_PUT(self, origin, content, query, room_id, txid):
|
||||||
@ -323,14 +323,14 @@ class FederationSendLeaveServlet(BaseFederationServlet):
|
|||||||
|
|
||||||
|
|
||||||
class FederationEventAuthServlet(BaseFederationServlet):
|
class FederationEventAuthServlet(BaseFederationServlet):
|
||||||
PATH = "/event_auth/([^/]*)/([^/]*)"
|
PATH = "/event_auth(?P<context>[^/]*)/(?P<event_id>[^/]*)"
|
||||||
|
|
||||||
def on_GET(self, origin, content, query, context, event_id):
|
def on_GET(self, origin, content, query, context, event_id):
|
||||||
return self.handler.on_event_auth(origin, context, event_id)
|
return self.handler.on_event_auth(origin, context, event_id)
|
||||||
|
|
||||||
|
|
||||||
class FederationSendJoinServlet(BaseFederationServlet):
|
class FederationSendJoinServlet(BaseFederationServlet):
|
||||||
PATH = "/send_join/([^/]*)/([^/]*)"
|
PATH = "/send_join/(?P<context>[^/]*)/(?P<event_id>[^/]*)"
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def on_PUT(self, origin, content, query, context, event_id):
|
def on_PUT(self, origin, content, query, context, event_id):
|
||||||
@ -341,7 +341,7 @@ class FederationSendJoinServlet(BaseFederationServlet):
|
|||||||
|
|
||||||
|
|
||||||
class FederationInviteServlet(BaseFederationServlet):
|
class FederationInviteServlet(BaseFederationServlet):
|
||||||
PATH = "/invite/([^/]*)/([^/]*)"
|
PATH = "/invite/(?P<context>[^/]*)/(?P<event_id>[^/]*)"
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def on_PUT(self, origin, content, query, context, event_id):
|
def on_PUT(self, origin, content, query, context, event_id):
|
||||||
@ -352,7 +352,7 @@ class FederationInviteServlet(BaseFederationServlet):
|
|||||||
|
|
||||||
|
|
||||||
class FederationThirdPartyInviteExchangeServlet(BaseFederationServlet):
|
class FederationThirdPartyInviteExchangeServlet(BaseFederationServlet):
|
||||||
PATH = "/exchange_third_party_invite/([^/]*)"
|
PATH = "/exchange_third_party_invite/(?P<room_id>[^/]*)"
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def on_PUT(self, origin, content, query, room_id):
|
def on_PUT(self, origin, content, query, room_id):
|
||||||
@ -381,7 +381,7 @@ class FederationClientKeysClaimServlet(BaseFederationServlet):
|
|||||||
|
|
||||||
|
|
||||||
class FederationQueryAuthServlet(BaseFederationServlet):
|
class FederationQueryAuthServlet(BaseFederationServlet):
|
||||||
PATH = "/query_auth/([^/]*)/([^/]*)"
|
PATH = "/query_auth/(?P<context>[^/]*)/(?P<event_id>[^/]*)"
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def on_POST(self, origin, content, query, context, event_id):
|
def on_POST(self, origin, content, query, context, event_id):
|
||||||
@ -394,7 +394,7 @@ class FederationQueryAuthServlet(BaseFederationServlet):
|
|||||||
|
|
||||||
class FederationGetMissingEventsServlet(BaseFederationServlet):
|
class FederationGetMissingEventsServlet(BaseFederationServlet):
|
||||||
# TODO(paul): Why does this path alone end with "/?" optional?
|
# TODO(paul): Why does this path alone end with "/?" optional?
|
||||||
PATH = "/get_missing_events/([^/]*)/?"
|
PATH = "/get_missing_events/(?P<room_id>[^/]*)/?"
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def on_POST(self, origin, content, query, room_id):
|
def on_POST(self, origin, content, query, room_id):
|
||||||
|
@ -35,6 +35,7 @@ logger = logging.getLogger(__name__)
|
|||||||
|
|
||||||
|
|
||||||
class AuthHandler(BaseHandler):
|
class AuthHandler(BaseHandler):
|
||||||
|
SESSION_EXPIRE_MS = 48 * 60 * 60 * 1000
|
||||||
|
|
||||||
def __init__(self, hs):
|
def __init__(self, hs):
|
||||||
super(AuthHandler, self).__init__(hs)
|
super(AuthHandler, self).__init__(hs)
|
||||||
@ -66,15 +67,18 @@ class AuthHandler(BaseHandler):
|
|||||||
'auth' key: this method prompts for auth if none is sent.
|
'auth' key: this method prompts for auth if none is sent.
|
||||||
clientip (str): The IP address of the client.
|
clientip (str): The IP address of the client.
|
||||||
Returns:
|
Returns:
|
||||||
A tuple of (authed, dict, dict) where authed is true if the client
|
A tuple of (authed, dict, dict, session_id) where authed is true if
|
||||||
has successfully completed an auth flow. If it is true, the first
|
the client has successfully completed an auth flow. If it is true
|
||||||
dict contains the authenticated credentials of each stage.
|
the first dict contains the authenticated credentials of each stage.
|
||||||
|
|
||||||
If authed is false, the first dictionary is the server response to
|
If authed is false, the first dictionary is the server response to
|
||||||
the login request and should be passed back to the client.
|
the login request and should be passed back to the client.
|
||||||
|
|
||||||
In either case, the second dict contains the parameters for this
|
In either case, the second dict contains the parameters for this
|
||||||
request (which may have been given only in a previous call).
|
request (which may have been given only in a previous call).
|
||||||
|
|
||||||
|
session_id is the ID of this session, either passed in by the client
|
||||||
|
or assigned by the call to check_auth
|
||||||
"""
|
"""
|
||||||
|
|
||||||
authdict = None
|
authdict = None
|
||||||
@ -103,7 +107,10 @@ class AuthHandler(BaseHandler):
|
|||||||
|
|
||||||
if not authdict:
|
if not authdict:
|
||||||
defer.returnValue(
|
defer.returnValue(
|
||||||
(False, self._auth_dict_for_flows(flows, session), clientdict)
|
(
|
||||||
|
False, self._auth_dict_for_flows(flows, session),
|
||||||
|
clientdict, session['id']
|
||||||
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
if 'creds' not in session:
|
if 'creds' not in session:
|
||||||
@ -122,12 +129,11 @@ class AuthHandler(BaseHandler):
|
|||||||
for f in flows:
|
for f in flows:
|
||||||
if len(set(f) - set(creds.keys())) == 0:
|
if len(set(f) - set(creds.keys())) == 0:
|
||||||
logger.info("Auth completed with creds: %r", creds)
|
logger.info("Auth completed with creds: %r", creds)
|
||||||
self._remove_session(session)
|
defer.returnValue((True, creds, clientdict, session['id']))
|
||||||
defer.returnValue((True, creds, clientdict))
|
|
||||||
|
|
||||||
ret = self._auth_dict_for_flows(flows, session)
|
ret = self._auth_dict_for_flows(flows, session)
|
||||||
ret['completed'] = creds.keys()
|
ret['completed'] = creds.keys()
|
||||||
defer.returnValue((False, ret, clientdict))
|
defer.returnValue((False, ret, clientdict, session['id']))
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def add_oob_auth(self, stagetype, authdict, clientip):
|
def add_oob_auth(self, stagetype, authdict, clientip):
|
||||||
@ -154,6 +160,43 @@ class AuthHandler(BaseHandler):
|
|||||||
defer.returnValue(True)
|
defer.returnValue(True)
|
||||||
defer.returnValue(False)
|
defer.returnValue(False)
|
||||||
|
|
||||||
|
def get_session_id(self, clientdict):
|
||||||
|
"""
|
||||||
|
Gets the session ID for a client given the client dictionary
|
||||||
|
:param clientdict: The dictionary sent by the client in the request
|
||||||
|
:return: The string session ID the client sent. If the client did not
|
||||||
|
send a session ID, returns None.
|
||||||
|
"""
|
||||||
|
sid = None
|
||||||
|
if clientdict and 'auth' in clientdict:
|
||||||
|
authdict = clientdict['auth']
|
||||||
|
if 'session' in authdict:
|
||||||
|
sid = authdict['session']
|
||||||
|
return sid
|
||||||
|
|
||||||
|
def set_session_data(self, session_id, key, value):
|
||||||
|
"""
|
||||||
|
Store a key-value pair into the sessions data associated with this
|
||||||
|
request. This data is stored server-side and cannot be modified by
|
||||||
|
the client.
|
||||||
|
:param session_id: (string) The ID of this session as returned from check_auth
|
||||||
|
:param key: (string) The key to store the data under
|
||||||
|
:param value: (any) The data to store
|
||||||
|
"""
|
||||||
|
sess = self._get_session_info(session_id)
|
||||||
|
sess.setdefault('serverdict', {})[key] = value
|
||||||
|
self._save_session(sess)
|
||||||
|
|
||||||
|
def get_session_data(self, session_id, key, default=None):
|
||||||
|
"""
|
||||||
|
Retrieve data stored with set_session_data
|
||||||
|
:param session_id: (string) The ID of this session as returned from check_auth
|
||||||
|
:param key: (string) The key to store the data under
|
||||||
|
:param default: (any) Value to return if the key has not been set
|
||||||
|
"""
|
||||||
|
sess = self._get_session_info(session_id)
|
||||||
|
return sess.setdefault('serverdict', {}).get(key, default)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _check_password_auth(self, authdict, _):
|
def _check_password_auth(self, authdict, _):
|
||||||
if "user" not in authdict or "password" not in authdict:
|
if "user" not in authdict or "password" not in authdict:
|
||||||
@ -455,11 +498,18 @@ class AuthHandler(BaseHandler):
|
|||||||
def _save_session(self, session):
|
def _save_session(self, session):
|
||||||
# TODO: Persistent storage
|
# TODO: Persistent storage
|
||||||
logger.debug("Saving session %s", session)
|
logger.debug("Saving session %s", session)
|
||||||
|
session["last_used"] = self.hs.get_clock().time_msec()
|
||||||
self.sessions[session["id"]] = session
|
self.sessions[session["id"]] = session
|
||||||
|
self._prune_sessions()
|
||||||
|
|
||||||
def _remove_session(self, session):
|
def _prune_sessions(self):
|
||||||
logger.debug("Removing session %s", session)
|
for sid, sess in self.sessions.items():
|
||||||
del self.sessions[session["id"]]
|
last_used = 0
|
||||||
|
if 'last_used' in sess:
|
||||||
|
last_used = sess['last_used']
|
||||||
|
now = self.hs.get_clock().time_msec()
|
||||||
|
if last_used < now - AuthHandler.SESSION_EXPIRE_MS:
|
||||||
|
del self.sessions[sid]
|
||||||
|
|
||||||
def hash(self, password):
|
def hash(self, password):
|
||||||
"""Computes a secure hash of password.
|
"""Computes a secure hash of password.
|
||||||
|
@ -32,6 +32,8 @@ class DirectoryHandler(BaseHandler):
|
|||||||
def __init__(self, hs):
|
def __init__(self, hs):
|
||||||
super(DirectoryHandler, self).__init__(hs)
|
super(DirectoryHandler, self).__init__(hs)
|
||||||
|
|
||||||
|
self.state = hs.get_state_handler()
|
||||||
|
|
||||||
self.federation = hs.get_replication_layer()
|
self.federation = hs.get_replication_layer()
|
||||||
self.federation.register_query_handler(
|
self.federation.register_query_handler(
|
||||||
"directory", self.on_directory_query
|
"directory", self.on_directory_query
|
||||||
@ -93,7 +95,7 @@ class DirectoryHandler(BaseHandler):
|
|||||||
yield self._create_association(room_alias, room_id, servers)
|
yield self._create_association(room_alias, room_id, servers)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def delete_association(self, user_id, room_alias):
|
def delete_association(self, requester, user_id, room_alias):
|
||||||
# association deletion for human users
|
# association deletion for human users
|
||||||
|
|
||||||
can_delete = yield self._user_can_delete_alias(room_alias, user_id)
|
can_delete = yield self._user_can_delete_alias(room_alias, user_id)
|
||||||
@ -112,7 +114,25 @@ class DirectoryHandler(BaseHandler):
|
|||||||
errcode=Codes.EXCLUSIVE
|
errcode=Codes.EXCLUSIVE
|
||||||
)
|
)
|
||||||
|
|
||||||
yield self._delete_association(room_alias)
|
room_id = yield self._delete_association(room_alias)
|
||||||
|
|
||||||
|
try:
|
||||||
|
yield self.send_room_alias_update_event(
|
||||||
|
requester,
|
||||||
|
requester.user.to_string(),
|
||||||
|
room_id
|
||||||
|
)
|
||||||
|
|
||||||
|
yield self._update_canonical_alias(
|
||||||
|
requester,
|
||||||
|
requester.user.to_string(),
|
||||||
|
room_id,
|
||||||
|
room_alias,
|
||||||
|
)
|
||||||
|
except AuthError as e:
|
||||||
|
logger.info("Failed to update alias events: %s", e)
|
||||||
|
|
||||||
|
defer.returnValue(room_id)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def delete_appservice_association(self, service, room_alias):
|
def delete_appservice_association(self, service, room_alias):
|
||||||
@ -129,11 +149,9 @@ class DirectoryHandler(BaseHandler):
|
|||||||
if not self.hs.is_mine(room_alias):
|
if not self.hs.is_mine(room_alias):
|
||||||
raise SynapseError(400, "Room alias must be local")
|
raise SynapseError(400, "Room alias must be local")
|
||||||
|
|
||||||
yield self.store.delete_room_alias(room_alias)
|
room_id = yield self.store.delete_room_alias(room_alias)
|
||||||
|
|
||||||
# TODO - Looks like _update_room_alias_event has never been implemented
|
defer.returnValue(room_id)
|
||||||
# if room_id:
|
|
||||||
# yield self._update_room_alias_events(user_id, room_id)
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def get_association(self, room_alias):
|
def get_association(self, room_alias):
|
||||||
@ -233,6 +251,29 @@ class DirectoryHandler(BaseHandler):
|
|||||||
ratelimit=False
|
ratelimit=False
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def _update_canonical_alias(self, requester, user_id, room_id, room_alias):
|
||||||
|
alias_event = yield self.state.get_current_state(
|
||||||
|
room_id, EventTypes.CanonicalAlias, ""
|
||||||
|
)
|
||||||
|
|
||||||
|
alias_str = room_alias.to_string()
|
||||||
|
if not alias_event or alias_event.content.get("alias", "") != alias_str:
|
||||||
|
return
|
||||||
|
|
||||||
|
msg_handler = self.hs.get_handlers().message_handler
|
||||||
|
yield msg_handler.create_and_send_nonmember_event(
|
||||||
|
requester,
|
||||||
|
{
|
||||||
|
"type": EventTypes.CanonicalAlias,
|
||||||
|
"state_key": "",
|
||||||
|
"room_id": room_id,
|
||||||
|
"sender": user_id,
|
||||||
|
"content": {},
|
||||||
|
},
|
||||||
|
ratelimit=False
|
||||||
|
)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def get_association_from_room_alias(self, room_alias):
|
def get_association_from_room_alias(self, room_alias):
|
||||||
result = yield self.store.get_association_from_room_alias(
|
result = yield self.store.get_association_from_room_alias(
|
||||||
@ -276,3 +317,25 @@ class DirectoryHandler(BaseHandler):
|
|||||||
|
|
||||||
is_admin = yield self.auth.is_server_admin(UserID.from_string(user_id))
|
is_admin = yield self.auth.is_server_admin(UserID.from_string(user_id))
|
||||||
defer.returnValue(is_admin)
|
defer.returnValue(is_admin)
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def edit_published_room_list(self, requester, room_id, visibility):
|
||||||
|
"""Edit the entry of the room in the published room list.
|
||||||
|
|
||||||
|
requester
|
||||||
|
room_id (str)
|
||||||
|
visibility (str): "public" or "private"
|
||||||
|
"""
|
||||||
|
if requester.is_guest:
|
||||||
|
raise AuthError(403, "Guests cannot edit the published room list")
|
||||||
|
|
||||||
|
if visibility not in ["public", "private"]:
|
||||||
|
raise SynapseError(400, "Invalid visibility setting")
|
||||||
|
|
||||||
|
room = yield self.store.get_room(room_id)
|
||||||
|
if room is None:
|
||||||
|
raise SynapseError(400, "Unknown room")
|
||||||
|
|
||||||
|
yield self.auth.check_can_change_room_list(room_id, requester.user)
|
||||||
|
|
||||||
|
yield self.store.set_room_is_public(room_id, visibility == "public")
|
||||||
|
@ -18,7 +18,6 @@ from twisted.internet import defer
|
|||||||
from synapse.util.logutils import log_function
|
from synapse.util.logutils import log_function
|
||||||
from synapse.types import UserID
|
from synapse.types import UserID
|
||||||
from synapse.events.utils import serialize_event
|
from synapse.events.utils import serialize_event
|
||||||
from synapse.util.logcontext import preserve_context_over_fn
|
|
||||||
from synapse.api.constants import Membership, EventTypes
|
from synapse.api.constants import Membership, EventTypes
|
||||||
from synapse.events import EventBase
|
from synapse.events import EventBase
|
||||||
|
|
||||||
@ -31,20 +30,6 @@ import random
|
|||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
def started_user_eventstream(distributor, user):
|
|
||||||
return preserve_context_over_fn(
|
|
||||||
distributor.fire,
|
|
||||||
"started_user_eventstream", user
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def stopped_user_eventstream(distributor, user):
|
|
||||||
return preserve_context_over_fn(
|
|
||||||
distributor.fire,
|
|
||||||
"stopped_user_eventstream", user
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
class EventStreamHandler(BaseHandler):
|
class EventStreamHandler(BaseHandler):
|
||||||
|
|
||||||
def __init__(self, hs):
|
def __init__(self, hs):
|
||||||
@ -63,61 +48,6 @@ class EventStreamHandler(BaseHandler):
|
|||||||
|
|
||||||
self.notifier = hs.get_notifier()
|
self.notifier = hs.get_notifier()
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
|
||||||
def started_stream(self, user):
|
|
||||||
"""Tells the presence handler that we have started an eventstream for
|
|
||||||
the user:
|
|
||||||
|
|
||||||
Args:
|
|
||||||
user (User): The user who started a stream.
|
|
||||||
Returns:
|
|
||||||
A deferred that completes once their presence has been updated.
|
|
||||||
"""
|
|
||||||
if user not in self._streams_per_user:
|
|
||||||
# Make sure we set the streams per user to 1 here rather than
|
|
||||||
# setting it to zero and incrementing the value below.
|
|
||||||
# Otherwise this may race with stopped_stream causing the
|
|
||||||
# user to be erased from the map before we have a chance
|
|
||||||
# to increment it.
|
|
||||||
self._streams_per_user[user] = 1
|
|
||||||
if user in self._stop_timer_per_user:
|
|
||||||
try:
|
|
||||||
self.clock.cancel_call_later(
|
|
||||||
self._stop_timer_per_user.pop(user)
|
|
||||||
)
|
|
||||||
except:
|
|
||||||
logger.exception("Failed to cancel event timer")
|
|
||||||
else:
|
|
||||||
yield started_user_eventstream(self.distributor, user)
|
|
||||||
else:
|
|
||||||
self._streams_per_user[user] += 1
|
|
||||||
|
|
||||||
def stopped_stream(self, user):
|
|
||||||
"""If there are no streams for a user this starts a timer that will
|
|
||||||
notify the presence handler that we haven't got an event stream for
|
|
||||||
the user unless the user starts a new stream in 30 seconds.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
user (User): The user who stopped a stream.
|
|
||||||
"""
|
|
||||||
self._streams_per_user[user] -= 1
|
|
||||||
if not self._streams_per_user[user]:
|
|
||||||
del self._streams_per_user[user]
|
|
||||||
|
|
||||||
# 30 seconds of grace to allow the client to reconnect again
|
|
||||||
# before we think they're gone
|
|
||||||
def _later():
|
|
||||||
logger.debug("_later stopped_user_eventstream %s", user)
|
|
||||||
|
|
||||||
self._stop_timer_per_user.pop(user, None)
|
|
||||||
|
|
||||||
return stopped_user_eventstream(self.distributor, user)
|
|
||||||
|
|
||||||
logger.debug("Scheduling _later: for %s", user)
|
|
||||||
self._stop_timer_per_user[user] = (
|
|
||||||
self.clock.call_later(30, _later)
|
|
||||||
)
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
@log_function
|
@log_function
|
||||||
def get_stream(self, auth_user_id, pagin_config, timeout=0,
|
def get_stream(self, auth_user_id, pagin_config, timeout=0,
|
||||||
|
@ -102,7 +102,7 @@ class FederationHandler(BaseHandler):
|
|||||||
|
|
||||||
@log_function
|
@log_function
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def on_receive_pdu(self, origin, pdu, backfilled, state=None,
|
def on_receive_pdu(self, origin, pdu, state=None,
|
||||||
auth_chain=None):
|
auth_chain=None):
|
||||||
""" Called by the ReplicationLayer when we have a new pdu. We need to
|
""" Called by the ReplicationLayer when we have a new pdu. We need to
|
||||||
do auth checks and put it through the StateHandler.
|
do auth checks and put it through the StateHandler.
|
||||||
@ -123,7 +123,6 @@ class FederationHandler(BaseHandler):
|
|||||||
|
|
||||||
# FIXME (erikj): Awful hack to make the case where we are not currently
|
# FIXME (erikj): Awful hack to make the case where we are not currently
|
||||||
# in the room work
|
# in the room work
|
||||||
current_state = None
|
|
||||||
is_in_room = yield self.auth.check_host_in_room(
|
is_in_room = yield self.auth.check_host_in_room(
|
||||||
event.room_id,
|
event.room_id,
|
||||||
self.server_name
|
self.server_name
|
||||||
@ -186,8 +185,6 @@ class FederationHandler(BaseHandler):
|
|||||||
origin,
|
origin,
|
||||||
event,
|
event,
|
||||||
state=state,
|
state=state,
|
||||||
backfilled=backfilled,
|
|
||||||
current_state=current_state,
|
|
||||||
)
|
)
|
||||||
except AuthError as e:
|
except AuthError as e:
|
||||||
raise FederationError(
|
raise FederationError(
|
||||||
@ -216,18 +213,17 @@ class FederationHandler(BaseHandler):
|
|||||||
except StoreError:
|
except StoreError:
|
||||||
logger.exception("Failed to store room.")
|
logger.exception("Failed to store room.")
|
||||||
|
|
||||||
if not backfilled:
|
extra_users = []
|
||||||
extra_users = []
|
if event.type == EventTypes.Member:
|
||||||
if event.type == EventTypes.Member:
|
target_user_id = event.state_key
|
||||||
target_user_id = event.state_key
|
target_user = UserID.from_string(target_user_id)
|
||||||
target_user = UserID.from_string(target_user_id)
|
extra_users.append(target_user)
|
||||||
extra_users.append(target_user)
|
|
||||||
|
|
||||||
with PreserveLoggingContext():
|
with PreserveLoggingContext():
|
||||||
self.notifier.on_new_room_event(
|
self.notifier.on_new_room_event(
|
||||||
event, event_stream_id, max_stream_id,
|
event, event_stream_id, max_stream_id,
|
||||||
extra_users=extra_users
|
extra_users=extra_users
|
||||||
)
|
)
|
||||||
|
|
||||||
if event.type == EventTypes.Member:
|
if event.type == EventTypes.Member:
|
||||||
if event.membership == Membership.JOIN:
|
if event.membership == Membership.JOIN:
|
||||||
@ -647,7 +643,7 @@ class FederationHandler(BaseHandler):
|
|||||||
continue
|
continue
|
||||||
|
|
||||||
try:
|
try:
|
||||||
self.on_receive_pdu(origin, p, backfilled=False)
|
self.on_receive_pdu(origin, p)
|
||||||
except:
|
except:
|
||||||
logger.exception("Couldn't handle pdu")
|
logger.exception("Couldn't handle pdu")
|
||||||
|
|
||||||
@ -779,7 +775,6 @@ class FederationHandler(BaseHandler):
|
|||||||
event_stream_id, max_stream_id = yield self.store.persist_event(
|
event_stream_id, max_stream_id = yield self.store.persist_event(
|
||||||
event,
|
event,
|
||||||
context=context,
|
context=context,
|
||||||
backfilled=False,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
target_user = UserID.from_string(event.state_key)
|
target_user = UserID.from_string(event.state_key)
|
||||||
@ -813,7 +808,21 @@ class FederationHandler(BaseHandler):
|
|||||||
target_hosts,
|
target_hosts,
|
||||||
signed_event
|
signed_event
|
||||||
)
|
)
|
||||||
defer.returnValue(None)
|
|
||||||
|
context = yield self.state_handler.compute_event_context(event)
|
||||||
|
|
||||||
|
event_stream_id, max_stream_id = yield self.store.persist_event(
|
||||||
|
event,
|
||||||
|
context=context,
|
||||||
|
)
|
||||||
|
|
||||||
|
target_user = UserID.from_string(event.state_key)
|
||||||
|
self.notifier.on_new_room_event(
|
||||||
|
event, event_stream_id, max_stream_id,
|
||||||
|
extra_users=[target_user],
|
||||||
|
)
|
||||||
|
|
||||||
|
defer.returnValue(event)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _make_and_verify_event(self, target_hosts, room_id, user_id, membership,
|
def _make_and_verify_event(self, target_hosts, room_id, user_id, membership,
|
||||||
@ -1059,8 +1068,7 @@ class FederationHandler(BaseHandler):
|
|||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
@log_function
|
@log_function
|
||||||
def _handle_new_event(self, origin, event, state=None, backfilled=False,
|
def _handle_new_event(self, origin, event, state=None, auth_events=None):
|
||||||
current_state=None, auth_events=None):
|
|
||||||
|
|
||||||
outlier = event.internal_metadata.is_outlier()
|
outlier = event.internal_metadata.is_outlier()
|
||||||
|
|
||||||
@ -1070,7 +1078,7 @@ class FederationHandler(BaseHandler):
|
|||||||
auth_events=auth_events,
|
auth_events=auth_events,
|
||||||
)
|
)
|
||||||
|
|
||||||
if not backfilled and not event.internal_metadata.is_outlier():
|
if not event.internal_metadata.is_outlier():
|
||||||
action_generator = ActionGenerator(self.hs)
|
action_generator = ActionGenerator(self.hs)
|
||||||
yield action_generator.handle_push_actions_for_event(
|
yield action_generator.handle_push_actions_for_event(
|
||||||
event, context, self
|
event, context, self
|
||||||
@ -1079,9 +1087,7 @@ class FederationHandler(BaseHandler):
|
|||||||
event_stream_id, max_stream_id = yield self.store.persist_event(
|
event_stream_id, max_stream_id = yield self.store.persist_event(
|
||||||
event,
|
event,
|
||||||
context=context,
|
context=context,
|
||||||
backfilled=backfilled,
|
is_new_state=not outlier,
|
||||||
is_new_state=(not outlier and not backfilled),
|
|
||||||
current_state=current_state,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
defer.returnValue((context, event_stream_id, max_stream_id))
|
defer.returnValue((context, event_stream_id, max_stream_id))
|
||||||
@ -1179,7 +1185,6 @@ class FederationHandler(BaseHandler):
|
|||||||
|
|
||||||
event_stream_id, max_stream_id = yield self.store.persist_event(
|
event_stream_id, max_stream_id = yield self.store.persist_event(
|
||||||
event, new_event_context,
|
event, new_event_context,
|
||||||
backfilled=False,
|
|
||||||
is_new_state=True,
|
is_new_state=True,
|
||||||
current_state=state,
|
current_state=state,
|
||||||
)
|
)
|
||||||
|
@ -73,14 +73,6 @@ FEDERATION_PING_INTERVAL = 25 * 60 * 1000
|
|||||||
assert LAST_ACTIVE_GRANULARITY < IDLE_TIMER
|
assert LAST_ACTIVE_GRANULARITY < IDLE_TIMER
|
||||||
|
|
||||||
|
|
||||||
def user_presence_changed(distributor, user, statuscache):
|
|
||||||
return distributor.fire("user_presence_changed", user, statuscache)
|
|
||||||
|
|
||||||
|
|
||||||
def collect_presencelike_data(distributor, user, content):
|
|
||||||
return distributor.fire("collect_presencelike_data", user, content)
|
|
||||||
|
|
||||||
|
|
||||||
class PresenceHandler(BaseHandler):
|
class PresenceHandler(BaseHandler):
|
||||||
|
|
||||||
def __init__(self, hs):
|
def __init__(self, hs):
|
||||||
|
@ -47,7 +47,8 @@ class RegistrationHandler(BaseHandler):
|
|||||||
self._next_generated_user_id = None
|
self._next_generated_user_id = None
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def check_username(self, localpart, guest_access_token=None):
|
def check_username(self, localpart, guest_access_token=None,
|
||||||
|
assigned_user_id=None):
|
||||||
yield run_on_reactor()
|
yield run_on_reactor()
|
||||||
|
|
||||||
if urllib.quote(localpart.encode('utf-8')) != localpart:
|
if urllib.quote(localpart.encode('utf-8')) != localpart:
|
||||||
@ -60,6 +61,15 @@ class RegistrationHandler(BaseHandler):
|
|||||||
user = UserID(localpart, self.hs.hostname)
|
user = UserID(localpart, self.hs.hostname)
|
||||||
user_id = user.to_string()
|
user_id = user.to_string()
|
||||||
|
|
||||||
|
if assigned_user_id:
|
||||||
|
if user_id == assigned_user_id:
|
||||||
|
return
|
||||||
|
else:
|
||||||
|
raise SynapseError(
|
||||||
|
400,
|
||||||
|
"A different user ID has already been registered for this session",
|
||||||
|
)
|
||||||
|
|
||||||
yield self.check_user_id_not_appservice_exclusive(user_id)
|
yield self.check_user_id_not_appservice_exclusive(user_id)
|
||||||
|
|
||||||
users = yield self.store.get_users_by_id_case_insensitive(user_id)
|
users = yield self.store.get_users_by_id_case_insensitive(user_id)
|
||||||
|
@ -119,7 +119,8 @@ class RoomCreationHandler(BaseHandler):
|
|||||||
|
|
||||||
invite_3pid_list = config.get("invite_3pid", [])
|
invite_3pid_list = config.get("invite_3pid", [])
|
||||||
|
|
||||||
is_public = config.get("visibility", None) == "public"
|
visibility = config.get("visibility", None)
|
||||||
|
is_public = visibility == "public"
|
||||||
|
|
||||||
# autogen room IDs and try to create it. We may clash, so just
|
# autogen room IDs and try to create it. We may clash, so just
|
||||||
# try a few times till one goes through, giving up eventually.
|
# try a few times till one goes through, giving up eventually.
|
||||||
@ -155,9 +156,9 @@ class RoomCreationHandler(BaseHandler):
|
|||||||
|
|
||||||
preset_config = config.get(
|
preset_config = config.get(
|
||||||
"preset",
|
"preset",
|
||||||
RoomCreationPreset.PUBLIC_CHAT
|
RoomCreationPreset.PRIVATE_CHAT
|
||||||
if is_public
|
if visibility == "private"
|
||||||
else RoomCreationPreset.PRIVATE_CHAT
|
else RoomCreationPreset.PUBLIC_CHAT
|
||||||
)
|
)
|
||||||
|
|
||||||
raw_initial_state = config.get("initial_state", [])
|
raw_initial_state = config.get("initial_state", [])
|
||||||
@ -946,53 +947,62 @@ class RoomListHandler(BaseHandler):
|
|||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def handle_room(room_id):
|
def handle_room(room_id):
|
||||||
aliases = yield self.store.get_aliases_for_room(room_id)
|
aliases = yield self.store.get_aliases_for_room(room_id)
|
||||||
if not aliases:
|
|
||||||
defer.returnValue(None)
|
|
||||||
|
|
||||||
state = yield self.state_handler.get_current_state(room_id)
|
# We pull each bit of state out indvidually to avoid pulling the
|
||||||
|
# full state into memory. Due to how the caching works this should
|
||||||
|
# be fairly quick, even if not originally in the cache.
|
||||||
|
def get_state(etype, state_key):
|
||||||
|
return self.state_handler.get_current_state(room_id, etype, state_key)
|
||||||
|
|
||||||
result = {"aliases": aliases, "room_id": room_id}
|
# Double check that this is actually a public room.
|
||||||
|
join_rules_event = yield get_state(EventTypes.JoinRules, "")
|
||||||
|
if join_rules_event:
|
||||||
|
join_rule = join_rules_event.content.get("join_rule", None)
|
||||||
|
if join_rule and join_rule != JoinRules.PUBLIC:
|
||||||
|
defer.returnValue(None)
|
||||||
|
|
||||||
name_event = state.get((EventTypes.Name, ""), None)
|
result = {"room_id": room_id}
|
||||||
|
if aliases:
|
||||||
|
result["aliases"] = aliases
|
||||||
|
|
||||||
|
name_event = yield get_state(EventTypes.Name, "")
|
||||||
if name_event:
|
if name_event:
|
||||||
name = name_event.content.get("name", None)
|
name = name_event.content.get("name", None)
|
||||||
if name:
|
if name:
|
||||||
result["name"] = name
|
result["name"] = name
|
||||||
|
|
||||||
topic_event = state.get((EventTypes.Topic, ""), None)
|
topic_event = yield get_state(EventTypes.Topic, "")
|
||||||
if topic_event:
|
if topic_event:
|
||||||
topic = topic_event.content.get("topic", None)
|
topic = topic_event.content.get("topic", None)
|
||||||
if topic:
|
if topic:
|
||||||
result["topic"] = topic
|
result["topic"] = topic
|
||||||
|
|
||||||
canonical_event = state.get((EventTypes.CanonicalAlias, ""), None)
|
canonical_event = yield get_state(EventTypes.CanonicalAlias, "")
|
||||||
if canonical_event:
|
if canonical_event:
|
||||||
canonical_alias = canonical_event.content.get("alias", None)
|
canonical_alias = canonical_event.content.get("alias", None)
|
||||||
if canonical_alias:
|
if canonical_alias:
|
||||||
result["canonical_alias"] = canonical_alias
|
result["canonical_alias"] = canonical_alias
|
||||||
|
|
||||||
visibility_event = state.get((EventTypes.RoomHistoryVisibility, ""), None)
|
visibility_event = yield get_state(EventTypes.RoomHistoryVisibility, "")
|
||||||
visibility = None
|
visibility = None
|
||||||
if visibility_event:
|
if visibility_event:
|
||||||
visibility = visibility_event.content.get("history_visibility", None)
|
visibility = visibility_event.content.get("history_visibility", None)
|
||||||
result["world_readable"] = visibility == "world_readable"
|
result["world_readable"] = visibility == "world_readable"
|
||||||
|
|
||||||
guest_event = state.get((EventTypes.GuestAccess, ""), None)
|
guest_event = yield get_state(EventTypes.GuestAccess, "")
|
||||||
guest = None
|
guest = None
|
||||||
if guest_event:
|
if guest_event:
|
||||||
guest = guest_event.content.get("guest_access", None)
|
guest = guest_event.content.get("guest_access", None)
|
||||||
result["guest_can_join"] = guest == "can_join"
|
result["guest_can_join"] = guest == "can_join"
|
||||||
|
|
||||||
avatar_event = state.get(("m.room.avatar", ""), None)
|
avatar_event = yield get_state("m.room.avatar", "")
|
||||||
if avatar_event:
|
if avatar_event:
|
||||||
avatar_url = avatar_event.content.get("url", None)
|
avatar_url = avatar_event.content.get("url", None)
|
||||||
if avatar_url:
|
if avatar_url:
|
||||||
result["avatar_url"] = avatar_url
|
result["avatar_url"] = avatar_url
|
||||||
|
|
||||||
result["num_joined_members"] = sum(
|
joined_users = yield self.store.get_users_in_room(room_id)
|
||||||
1 for (event_type, _), ev in state.items()
|
result["num_joined_members"] = len(joined_users)
|
||||||
if event_type == EventTypes.Member and ev.membership == Membership.JOIN
|
|
||||||
)
|
|
||||||
|
|
||||||
defer.returnValue(result)
|
defer.returnValue(result)
|
||||||
|
|
||||||
|
@ -18,6 +18,7 @@ from synapse.api.errors import (
|
|||||||
cs_exception, SynapseError, CodeMessageException, UnrecognizedRequestError, Codes
|
cs_exception, SynapseError, CodeMessageException, UnrecognizedRequestError, Codes
|
||||||
)
|
)
|
||||||
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
|
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
|
||||||
|
from synapse.util.caches import intern_dict
|
||||||
import synapse.metrics
|
import synapse.metrics
|
||||||
import synapse.events
|
import synapse.events
|
||||||
|
|
||||||
@ -229,11 +230,12 @@ class JsonResource(HttpServer, resource.Resource):
|
|||||||
else:
|
else:
|
||||||
servlet_classname = "%r" % callback
|
servlet_classname = "%r" % callback
|
||||||
|
|
||||||
args = [
|
kwargs = intern_dict({
|
||||||
urllib.unquote(u).decode("UTF-8") if u else u for u in m.groups()
|
name: urllib.unquote(value).decode("UTF-8") if value else value
|
||||||
]
|
for name, value in m.groupdict().items()
|
||||||
|
})
|
||||||
|
|
||||||
callback_return = yield callback(request, *args)
|
callback_return = yield callback(request, **kwargs)
|
||||||
if callback_return is not None:
|
if callback_return is not None:
|
||||||
code, response = callback_return
|
code, response = callback_return
|
||||||
self._send_response(request, code, response)
|
self._send_response(request, code, response)
|
||||||
|
@ -282,6 +282,12 @@ class Notifier(object):
|
|||||||
|
|
||||||
self.notify_replication()
|
self.notify_replication()
|
||||||
|
|
||||||
|
def on_new_replication_data(self):
|
||||||
|
"""Used to inform replication listeners that something has happend
|
||||||
|
without waking up any of the normal user event streams"""
|
||||||
|
with PreserveLoggingContext():
|
||||||
|
self.notify_replication()
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def wait_for_events(self, user_id, timeout, callback, room_ids=None,
|
def wait_for_events(self, user_id, timeout, callback, room_ids=None,
|
||||||
from_token=StreamToken.START):
|
from_token=StreamToken.START):
|
||||||
|
@ -317,7 +317,7 @@ class Pusher(object):
|
|||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _get_badge_count(self):
|
def _get_badge_count(self):
|
||||||
invites, joins = yield defer.gatherResults([
|
invites, joins = yield defer.gatherResults([
|
||||||
self.store.get_invites_for_user(self.user_id),
|
self.store.get_invited_rooms_for_user(self.user_id),
|
||||||
self.store.get_rooms_for_user(self.user_id),
|
self.store.get_rooms_for_user(self.user_id),
|
||||||
], consumeErrors=True)
|
], consumeErrors=True)
|
||||||
|
|
||||||
|
@ -107,7 +107,9 @@ class BulkPushRuleEvaluator:
|
|||||||
users_dict.items(), [event], {event.event_id: current_state}
|
users_dict.items(), [event], {event.event_id: current_state}
|
||||||
)
|
)
|
||||||
|
|
||||||
evaluator = PushRuleEvaluatorForEvent(event, len(self.users_in_room))
|
room_members = yield self.store.get_users_in_room(self.room_id)
|
||||||
|
|
||||||
|
evaluator = PushRuleEvaluatorForEvent(event, len(room_members))
|
||||||
|
|
||||||
condition_cache = {}
|
condition_cache = {}
|
||||||
|
|
||||||
|
@ -37,6 +37,7 @@ STREAM_NAMES = (
|
|||||||
("user_account_data", "room_account_data", "tag_account_data",),
|
("user_account_data", "room_account_data", "tag_account_data",),
|
||||||
("backfill",),
|
("backfill",),
|
||||||
("push_rules",),
|
("push_rules",),
|
||||||
|
("pushers",),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@ -65,6 +66,7 @@ class ReplicationResource(Resource):
|
|||||||
* "tag_account_data": Per room per user tags.
|
* "tag_account_data": Per room per user tags.
|
||||||
* "backfill": Old events that have been backfilled from other servers.
|
* "backfill": Old events that have been backfilled from other servers.
|
||||||
* "push_rules": Per user changes to push rules.
|
* "push_rules": Per user changes to push rules.
|
||||||
|
* "pushers": Per user changes to their pushers.
|
||||||
|
|
||||||
The API takes two additional query parameters:
|
The API takes two additional query parameters:
|
||||||
|
|
||||||
@ -120,6 +122,7 @@ class ReplicationResource(Resource):
|
|||||||
stream_token = yield self.sources.get_current_token()
|
stream_token = yield self.sources.get_current_token()
|
||||||
backfill_token = yield self.store.get_current_backfill_token()
|
backfill_token = yield self.store.get_current_backfill_token()
|
||||||
push_rules_token, room_stream_token = self.store.get_push_rules_stream_token()
|
push_rules_token, room_stream_token = self.store.get_push_rules_stream_token()
|
||||||
|
pushers_token = self.store.get_pushers_stream_token()
|
||||||
|
|
||||||
defer.returnValue(_ReplicationToken(
|
defer.returnValue(_ReplicationToken(
|
||||||
room_stream_token,
|
room_stream_token,
|
||||||
@ -129,6 +132,7 @@ class ReplicationResource(Resource):
|
|||||||
int(stream_token.account_data_key),
|
int(stream_token.account_data_key),
|
||||||
backfill_token,
|
backfill_token,
|
||||||
push_rules_token,
|
push_rules_token,
|
||||||
|
pushers_token,
|
||||||
))
|
))
|
||||||
|
|
||||||
@request_handler
|
@request_handler
|
||||||
@ -151,6 +155,7 @@ class ReplicationResource(Resource):
|
|||||||
yield self.typing(writer, current_token) # TODO: implement limit
|
yield self.typing(writer, current_token) # TODO: implement limit
|
||||||
yield self.receipts(writer, current_token, limit)
|
yield self.receipts(writer, current_token, limit)
|
||||||
yield self.push_rules(writer, current_token, limit)
|
yield self.push_rules(writer, current_token, limit)
|
||||||
|
yield self.pushers(writer, current_token, limit)
|
||||||
self.streams(writer, current_token)
|
self.streams(writer, current_token)
|
||||||
|
|
||||||
logger.info("Replicated %d rows", writer.total)
|
logger.info("Replicated %d rows", writer.total)
|
||||||
@ -297,6 +302,24 @@ class ReplicationResource(Resource):
|
|||||||
"priority_class", "priority", "conditions", "actions"
|
"priority_class", "priority", "conditions", "actions"
|
||||||
))
|
))
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def pushers(self, writer, current_token, limit):
|
||||||
|
current_position = current_token.pushers
|
||||||
|
|
||||||
|
pushers = parse_integer(writer.request, "pushers")
|
||||||
|
if pushers is not None:
|
||||||
|
updated, deleted = yield self.store.get_all_updated_pushers(
|
||||||
|
pushers, current_position, limit
|
||||||
|
)
|
||||||
|
writer.write_header_and_rows("pushers", updated, (
|
||||||
|
"position", "user_id", "access_token", "profile_tag", "kind",
|
||||||
|
"app_id", "app_display_name", "device_display_name", "pushkey",
|
||||||
|
"ts", "lang", "data"
|
||||||
|
))
|
||||||
|
writer.write_header_and_rows("deleted", deleted, (
|
||||||
|
"position", "user_id", "app_id", "pushkey"
|
||||||
|
))
|
||||||
|
|
||||||
|
|
||||||
class _Writer(object):
|
class _Writer(object):
|
||||||
"""Writes the streams as a JSON object as the response to the request"""
|
"""Writes the streams as a JSON object as the response to the request"""
|
||||||
@ -327,7 +350,7 @@ class _Writer(object):
|
|||||||
|
|
||||||
class _ReplicationToken(collections.namedtuple("_ReplicationToken", (
|
class _ReplicationToken(collections.namedtuple("_ReplicationToken", (
|
||||||
"events", "presence", "typing", "receipts", "account_data", "backfill",
|
"events", "presence", "typing", "receipts", "account_data", "backfill",
|
||||||
"push_rules"
|
"push_rules", "pushers"
|
||||||
))):
|
))):
|
||||||
__slots__ = []
|
__slots__ = []
|
||||||
|
|
||||||
|
@ -30,6 +30,7 @@ logger = logging.getLogger(__name__)
|
|||||||
|
|
||||||
def register_servlets(hs, http_server):
|
def register_servlets(hs, http_server):
|
||||||
ClientDirectoryServer(hs).register(http_server)
|
ClientDirectoryServer(hs).register(http_server)
|
||||||
|
ClientDirectoryListServer(hs).register(http_server)
|
||||||
|
|
||||||
|
|
||||||
class ClientDirectoryServer(ClientV1RestServlet):
|
class ClientDirectoryServer(ClientV1RestServlet):
|
||||||
@ -127,8 +128,9 @@ class ClientDirectoryServer(ClientV1RestServlet):
|
|||||||
room_alias = RoomAlias.from_string(room_alias)
|
room_alias = RoomAlias.from_string(room_alias)
|
||||||
|
|
||||||
yield dir_handler.delete_association(
|
yield dir_handler.delete_association(
|
||||||
user.to_string(), room_alias
|
requester, user.to_string(), room_alias
|
||||||
)
|
)
|
||||||
|
|
||||||
logger.info(
|
logger.info(
|
||||||
"User %s deleted alias %s",
|
"User %s deleted alias %s",
|
||||||
user.to_string(),
|
user.to_string(),
|
||||||
@ -136,3 +138,44 @@ class ClientDirectoryServer(ClientV1RestServlet):
|
|||||||
)
|
)
|
||||||
|
|
||||||
defer.returnValue((200, {}))
|
defer.returnValue((200, {}))
|
||||||
|
|
||||||
|
|
||||||
|
class ClientDirectoryListServer(ClientV1RestServlet):
|
||||||
|
PATTERNS = client_path_patterns("/directory/list/room/(?P<room_id>[^/]*)$")
|
||||||
|
|
||||||
|
def __init__(self, hs):
|
||||||
|
super(ClientDirectoryListServer, self).__init__(hs)
|
||||||
|
self.store = hs.get_datastore()
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def on_GET(self, request, room_id):
|
||||||
|
room = yield self.store.get_room(room_id)
|
||||||
|
if room is None:
|
||||||
|
raise SynapseError(400, "Unknown room")
|
||||||
|
|
||||||
|
defer.returnValue((200, {
|
||||||
|
"visibility": "public" if room["is_public"] else "private"
|
||||||
|
}))
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def on_PUT(self, request, room_id):
|
||||||
|
requester = yield self.auth.get_user_by_req(request)
|
||||||
|
|
||||||
|
content = parse_json_object_from_request(request)
|
||||||
|
visibility = content.get("visibility", "public")
|
||||||
|
|
||||||
|
yield self.handlers.directory_handler.edit_published_room_list(
|
||||||
|
requester, room_id, visibility,
|
||||||
|
)
|
||||||
|
|
||||||
|
defer.returnValue((200, {}))
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def on_DELETE(self, request, room_id):
|
||||||
|
requester = yield self.auth.get_user_by_req(request)
|
||||||
|
|
||||||
|
yield self.handlers.directory_handler.edit_published_room_list(
|
||||||
|
requester, room_id, "private",
|
||||||
|
)
|
||||||
|
|
||||||
|
defer.returnValue((200, {}))
|
||||||
|
@ -29,6 +29,10 @@ logger = logging.getLogger(__name__)
|
|||||||
class PusherRestServlet(ClientV1RestServlet):
|
class PusherRestServlet(ClientV1RestServlet):
|
||||||
PATTERNS = client_path_patterns("/pushers/set$")
|
PATTERNS = client_path_patterns("/pushers/set$")
|
||||||
|
|
||||||
|
def __init__(self, hs):
|
||||||
|
super(PusherRestServlet, self).__init__(hs)
|
||||||
|
self.notifier = hs.get_notifier()
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def on_POST(self, request):
|
def on_POST(self, request):
|
||||||
requester = yield self.auth.get_user_by_req(request)
|
requester = yield self.auth.get_user_by_req(request)
|
||||||
@ -87,6 +91,8 @@ class PusherRestServlet(ClientV1RestServlet):
|
|||||||
raise SynapseError(400, "Config Error: " + pce.message,
|
raise SynapseError(400, "Config Error: " + pce.message,
|
||||||
errcode=Codes.MISSING_PARAM)
|
errcode=Codes.MISSING_PARAM)
|
||||||
|
|
||||||
|
self.notifier.on_new_replication_data()
|
||||||
|
|
||||||
defer.returnValue((200, {}))
|
defer.returnValue((200, {}))
|
||||||
|
|
||||||
def on_OPTIONS(self, _):
|
def on_OPTIONS(self, _):
|
||||||
|
@ -43,7 +43,7 @@ class PasswordRestServlet(RestServlet):
|
|||||||
|
|
||||||
body = parse_json_object_from_request(request)
|
body = parse_json_object_from_request(request)
|
||||||
|
|
||||||
authed, result, params = yield self.auth_handler.check_auth([
|
authed, result, params, _ = yield self.auth_handler.check_auth([
|
||||||
[LoginType.PASSWORD],
|
[LoginType.PASSWORD],
|
||||||
[LoginType.EMAIL_IDENTITY]
|
[LoginType.EMAIL_IDENTITY]
|
||||||
], body, self.hs.get_ip_from_request(request))
|
], body, self.hs.get_ip_from_request(request))
|
||||||
|
@ -122,10 +122,22 @@ class RegisterRestServlet(RestServlet):
|
|||||||
|
|
||||||
guest_access_token = body.get("guest_access_token", None)
|
guest_access_token = body.get("guest_access_token", None)
|
||||||
|
|
||||||
|
session_id = self.auth_handler.get_session_id(body)
|
||||||
|
registered_user_id = None
|
||||||
|
if session_id:
|
||||||
|
# if we get a registered user id out of here, it means we previously
|
||||||
|
# registered a user for this session, so we could just return the
|
||||||
|
# user here. We carry on and go through the auth checks though,
|
||||||
|
# for paranoia.
|
||||||
|
registered_user_id = self.auth_handler.get_session_data(
|
||||||
|
session_id, "registered_user_id", None
|
||||||
|
)
|
||||||
|
|
||||||
if desired_username is not None:
|
if desired_username is not None:
|
||||||
yield self.registration_handler.check_username(
|
yield self.registration_handler.check_username(
|
||||||
desired_username,
|
desired_username,
|
||||||
guest_access_token=guest_access_token
|
guest_access_token=guest_access_token,
|
||||||
|
assigned_user_id=registered_user_id,
|
||||||
)
|
)
|
||||||
|
|
||||||
if self.hs.config.enable_registration_captcha:
|
if self.hs.config.enable_registration_captcha:
|
||||||
@ -139,7 +151,7 @@ class RegisterRestServlet(RestServlet):
|
|||||||
[LoginType.EMAIL_IDENTITY]
|
[LoginType.EMAIL_IDENTITY]
|
||||||
]
|
]
|
||||||
|
|
||||||
authed, result, params = yield self.auth_handler.check_auth(
|
authed, result, params, session_id = yield self.auth_handler.check_auth(
|
||||||
flows, body, self.hs.get_ip_from_request(request)
|
flows, body, self.hs.get_ip_from_request(request)
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -147,6 +159,22 @@ class RegisterRestServlet(RestServlet):
|
|||||||
defer.returnValue((401, result))
|
defer.returnValue((401, result))
|
||||||
return
|
return
|
||||||
|
|
||||||
|
if registered_user_id is not None:
|
||||||
|
logger.info(
|
||||||
|
"Already registered user ID %r for this session",
|
||||||
|
registered_user_id
|
||||||
|
)
|
||||||
|
access_token = yield self.auth_handler.issue_access_token(registered_user_id)
|
||||||
|
refresh_token = yield self.auth_handler.issue_refresh_token(
|
||||||
|
registered_user_id
|
||||||
|
)
|
||||||
|
defer.returnValue((200, {
|
||||||
|
"user_id": registered_user_id,
|
||||||
|
"access_token": access_token,
|
||||||
|
"home_server": self.hs.hostname,
|
||||||
|
"refresh_token": refresh_token,
|
||||||
|
}))
|
||||||
|
|
||||||
# NB: This may be from the auth handler and NOT from the POST
|
# NB: This may be from the auth handler and NOT from the POST
|
||||||
if 'password' not in params:
|
if 'password' not in params:
|
||||||
raise SynapseError(400, "Missing password.", Codes.MISSING_PARAM)
|
raise SynapseError(400, "Missing password.", Codes.MISSING_PARAM)
|
||||||
@ -161,6 +189,12 @@ class RegisterRestServlet(RestServlet):
|
|||||||
guest_access_token=guest_access_token,
|
guest_access_token=guest_access_token,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# remember that we've now registered that user account, and with what
|
||||||
|
# user ID (since the user may not have specified)
|
||||||
|
self.auth_handler.set_session_data(
|
||||||
|
session_id, "registered_user_id", user_id
|
||||||
|
)
|
||||||
|
|
||||||
if result and LoginType.EMAIL_IDENTITY in result:
|
if result and LoginType.EMAIL_IDENTITY in result:
|
||||||
threepid = result[LoginType.EMAIL_IDENTITY]
|
threepid = result[LoginType.EMAIL_IDENTITY]
|
||||||
|
|
||||||
|
141
synapse/state.py
141
synapse/state.py
@ -18,6 +18,7 @@ from twisted.internet import defer
|
|||||||
|
|
||||||
from synapse.util.logutils import log_function
|
from synapse.util.logutils import log_function
|
||||||
from synapse.util.caches.expiringcache import ExpiringCache
|
from synapse.util.caches.expiringcache import ExpiringCache
|
||||||
|
from synapse.util.metrics import Measure
|
||||||
from synapse.api.constants import EventTypes
|
from synapse.api.constants import EventTypes
|
||||||
from synapse.api.errors import AuthError
|
from synapse.api.errors import AuthError
|
||||||
from synapse.api.auth import AuthEventTypes
|
from synapse.api.auth import AuthEventTypes
|
||||||
@ -27,6 +28,7 @@ from collections import namedtuple
|
|||||||
|
|
||||||
import logging
|
import logging
|
||||||
import hashlib
|
import hashlib
|
||||||
|
import os
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
@ -34,8 +36,11 @@ logger = logging.getLogger(__name__)
|
|||||||
KeyStateTuple = namedtuple("KeyStateTuple", ("context", "type", "state_key"))
|
KeyStateTuple = namedtuple("KeyStateTuple", ("context", "type", "state_key"))
|
||||||
|
|
||||||
|
|
||||||
SIZE_OF_CACHE = 1000
|
CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.1))
|
||||||
EVICTION_TIMEOUT_SECONDS = 20
|
|
||||||
|
|
||||||
|
SIZE_OF_CACHE = int(1000 * CACHE_SIZE_FACTOR)
|
||||||
|
EVICTION_TIMEOUT_SECONDS = 60 * 60
|
||||||
|
|
||||||
|
|
||||||
class _StateCacheEntry(object):
|
class _StateCacheEntry(object):
|
||||||
@ -85,16 +90,8 @@ class StateHandler(object):
|
|||||||
"""
|
"""
|
||||||
event_ids = yield self.store.get_latest_event_ids_in_room(room_id)
|
event_ids = yield self.store.get_latest_event_ids_in_room(room_id)
|
||||||
|
|
||||||
cache = None
|
res = yield self.resolve_state_groups(room_id, event_ids)
|
||||||
if self._state_cache is not None:
|
state = res[1]
|
||||||
cache = self._state_cache.get(frozenset(event_ids), None)
|
|
||||||
|
|
||||||
if cache:
|
|
||||||
cache.ts = self.clock.time_msec()
|
|
||||||
state = cache.state
|
|
||||||
else:
|
|
||||||
res = yield self.resolve_state_groups(room_id, event_ids)
|
|
||||||
state = res[1]
|
|
||||||
|
|
||||||
if event_type:
|
if event_type:
|
||||||
defer.returnValue(state.get((event_type, state_key)))
|
defer.returnValue(state.get((event_type, state_key)))
|
||||||
@ -186,20 +183,6 @@ class StateHandler(object):
|
|||||||
"""
|
"""
|
||||||
logger.debug("resolve_state_groups event_ids %s", event_ids)
|
logger.debug("resolve_state_groups event_ids %s", event_ids)
|
||||||
|
|
||||||
if self._state_cache is not None:
|
|
||||||
cache = self._state_cache.get(frozenset(event_ids), None)
|
|
||||||
if cache and cache.state_group:
|
|
||||||
cache.ts = self.clock.time_msec()
|
|
||||||
prev_state = cache.state.get((event_type, state_key), None)
|
|
||||||
if prev_state:
|
|
||||||
prev_state = prev_state.event_id
|
|
||||||
prev_states = [prev_state]
|
|
||||||
else:
|
|
||||||
prev_states = []
|
|
||||||
defer.returnValue(
|
|
||||||
(cache.state_group, cache.state, prev_states)
|
|
||||||
)
|
|
||||||
|
|
||||||
state_groups = yield self.store.get_state_groups(
|
state_groups = yield self.store.get_state_groups(
|
||||||
room_id, event_ids
|
room_id, event_ids
|
||||||
)
|
)
|
||||||
@ -209,7 +192,7 @@ class StateHandler(object):
|
|||||||
state_groups.keys()
|
state_groups.keys()
|
||||||
)
|
)
|
||||||
|
|
||||||
group_names = set(state_groups.keys())
|
group_names = frozenset(state_groups.keys())
|
||||||
if len(group_names) == 1:
|
if len(group_names) == 1:
|
||||||
name, state_list = state_groups.items().pop()
|
name, state_list = state_groups.items().pop()
|
||||||
state = {
|
state = {
|
||||||
@ -223,29 +206,38 @@ class StateHandler(object):
|
|||||||
else:
|
else:
|
||||||
prev_states = []
|
prev_states = []
|
||||||
|
|
||||||
if self._state_cache is not None:
|
|
||||||
cache = _StateCacheEntry(
|
|
||||||
state=state,
|
|
||||||
state_group=name,
|
|
||||||
ts=self.clock.time_msec()
|
|
||||||
)
|
|
||||||
|
|
||||||
self._state_cache[frozenset(event_ids)] = cache
|
|
||||||
|
|
||||||
defer.returnValue((name, state, prev_states))
|
defer.returnValue((name, state, prev_states))
|
||||||
|
|
||||||
|
if self._state_cache is not None:
|
||||||
|
cache = self._state_cache.get(group_names, None)
|
||||||
|
if cache and cache.state_group:
|
||||||
|
cache.ts = self.clock.time_msec()
|
||||||
|
|
||||||
|
event_dict = yield self.store.get_events(cache.state.values())
|
||||||
|
state = {(e.type, e.state_key): e for e in event_dict.values()}
|
||||||
|
|
||||||
|
prev_state = state.get((event_type, state_key), None)
|
||||||
|
if prev_state:
|
||||||
|
prev_state = prev_state.event_id
|
||||||
|
prev_states = [prev_state]
|
||||||
|
else:
|
||||||
|
prev_states = []
|
||||||
|
defer.returnValue(
|
||||||
|
(cache.state_group, state, prev_states)
|
||||||
|
)
|
||||||
|
|
||||||
new_state, prev_states = self._resolve_events(
|
new_state, prev_states = self._resolve_events(
|
||||||
state_groups.values(), event_type, state_key
|
state_groups.values(), event_type, state_key
|
||||||
)
|
)
|
||||||
|
|
||||||
if self._state_cache is not None:
|
if self._state_cache is not None:
|
||||||
cache = _StateCacheEntry(
|
cache = _StateCacheEntry(
|
||||||
state=new_state,
|
state={key: event.event_id for key, event in new_state.items()},
|
||||||
state_group=None,
|
state_group=None,
|
||||||
ts=self.clock.time_msec()
|
ts=self.clock.time_msec()
|
||||||
)
|
)
|
||||||
|
|
||||||
self._state_cache[frozenset(event_ids)] = cache
|
self._state_cache[group_names] = cache
|
||||||
|
|
||||||
defer.returnValue((None, new_state, prev_states))
|
defer.returnValue((None, new_state, prev_states))
|
||||||
|
|
||||||
@ -263,48 +255,49 @@ class StateHandler(object):
|
|||||||
from (type, state_key) to event. prev_states is a list of event_ids.
|
from (type, state_key) to event. prev_states is a list of event_ids.
|
||||||
:rtype: (dict[(str, str), synapse.events.FrozenEvent], list[str])
|
:rtype: (dict[(str, str), synapse.events.FrozenEvent], list[str])
|
||||||
"""
|
"""
|
||||||
state = {}
|
with Measure(self.clock, "state._resolve_events"):
|
||||||
for st in state_sets:
|
state = {}
|
||||||
for e in st:
|
for st in state_sets:
|
||||||
state.setdefault(
|
for e in st:
|
||||||
(e.type, e.state_key),
|
state.setdefault(
|
||||||
{}
|
(e.type, e.state_key),
|
||||||
)[e.event_id] = e
|
{}
|
||||||
|
)[e.event_id] = e
|
||||||
|
|
||||||
unconflicted_state = {
|
unconflicted_state = {
|
||||||
k: v.values()[0] for k, v in state.items()
|
k: v.values()[0] for k, v in state.items()
|
||||||
if len(v.values()) == 1
|
if len(v.values()) == 1
|
||||||
}
|
}
|
||||||
|
|
||||||
conflicted_state = {
|
conflicted_state = {
|
||||||
k: v.values()
|
k: v.values()
|
||||||
for k, v in state.items()
|
for k, v in state.items()
|
||||||
if len(v.values()) > 1
|
if len(v.values()) > 1
|
||||||
}
|
}
|
||||||
|
|
||||||
if event_type:
|
if event_type:
|
||||||
prev_states_events = conflicted_state.get(
|
prev_states_events = conflicted_state.get(
|
||||||
(event_type, state_key), []
|
(event_type, state_key), []
|
||||||
)
|
)
|
||||||
prev_states = [s.event_id for s in prev_states_events]
|
prev_states = [s.event_id for s in prev_states_events]
|
||||||
else:
|
else:
|
||||||
prev_states = []
|
prev_states = []
|
||||||
|
|
||||||
auth_events = {
|
auth_events = {
|
||||||
k: e for k, e in unconflicted_state.items()
|
k: e for k, e in unconflicted_state.items()
|
||||||
if k[0] in AuthEventTypes
|
if k[0] in AuthEventTypes
|
||||||
}
|
}
|
||||||
|
|
||||||
try:
|
try:
|
||||||
resolved_state = self._resolve_state_events(
|
resolved_state = self._resolve_state_events(
|
||||||
conflicted_state, auth_events
|
conflicted_state, auth_events
|
||||||
)
|
)
|
||||||
except:
|
except:
|
||||||
logger.exception("Failed to resolve state")
|
logger.exception("Failed to resolve state")
|
||||||
raise
|
raise
|
||||||
|
|
||||||
new_state = unconflicted_state
|
new_state = unconflicted_state
|
||||||
new_state.update(resolved_state)
|
new_state.update(resolved_state)
|
||||||
|
|
||||||
return new_state, prev_states
|
return new_state, prev_states
|
||||||
|
|
||||||
|
@ -119,12 +119,15 @@ class DataStore(RoomMemberStore, RoomStore,
|
|||||||
self._state_groups_id_gen = IdGenerator(db_conn, "state_groups", "id")
|
self._state_groups_id_gen = IdGenerator(db_conn, "state_groups", "id")
|
||||||
self._access_tokens_id_gen = IdGenerator(db_conn, "access_tokens", "id")
|
self._access_tokens_id_gen = IdGenerator(db_conn, "access_tokens", "id")
|
||||||
self._refresh_tokens_id_gen = IdGenerator(db_conn, "refresh_tokens", "id")
|
self._refresh_tokens_id_gen = IdGenerator(db_conn, "refresh_tokens", "id")
|
||||||
self._pushers_id_gen = IdGenerator(db_conn, "pushers", "id")
|
|
||||||
self._push_rule_id_gen = IdGenerator(db_conn, "push_rules", "id")
|
self._push_rule_id_gen = IdGenerator(db_conn, "push_rules", "id")
|
||||||
self._push_rules_enable_id_gen = IdGenerator(db_conn, "push_rules_enable", "id")
|
self._push_rules_enable_id_gen = IdGenerator(db_conn, "push_rules_enable", "id")
|
||||||
self._push_rules_stream_id_gen = ChainedIdGenerator(
|
self._push_rules_stream_id_gen = ChainedIdGenerator(
|
||||||
self._stream_id_gen, db_conn, "push_rules_stream", "stream_id"
|
self._stream_id_gen, db_conn, "push_rules_stream", "stream_id"
|
||||||
)
|
)
|
||||||
|
self._pushers_id_gen = StreamIdGenerator(
|
||||||
|
db_conn, "pushers", "id",
|
||||||
|
extra_tables=[("deleted_pushers", "stream_id")],
|
||||||
|
)
|
||||||
|
|
||||||
events_max = self._stream_id_gen.get_max_token()
|
events_max = self._stream_id_gen.get_max_token()
|
||||||
event_cache_prefill, min_event_val = self._get_cache_dict(
|
event_cache_prefill, min_event_val = self._get_cache_dict(
|
||||||
|
@ -18,6 +18,7 @@ from synapse.api.errors import StoreError
|
|||||||
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
|
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
|
||||||
from synapse.util.caches.dictionary_cache import DictionaryCache
|
from synapse.util.caches.dictionary_cache import DictionaryCache
|
||||||
from synapse.util.caches.descriptors import Cache
|
from synapse.util.caches.descriptors import Cache
|
||||||
|
from synapse.util.caches import intern_dict
|
||||||
import synapse.metrics
|
import synapse.metrics
|
||||||
|
|
||||||
|
|
||||||
@ -26,6 +27,10 @@ from twisted.internet import defer
|
|||||||
import sys
|
import sys
|
||||||
import time
|
import time
|
||||||
import threading
|
import threading
|
||||||
|
import os
|
||||||
|
|
||||||
|
|
||||||
|
CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.1))
|
||||||
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
@ -163,7 +168,9 @@ class SQLBaseStore(object):
|
|||||||
self._get_event_cache = Cache("*getEvent*", keylen=3, lru=True,
|
self._get_event_cache = Cache("*getEvent*", keylen=3, lru=True,
|
||||||
max_entries=hs.config.event_cache_size)
|
max_entries=hs.config.event_cache_size)
|
||||||
|
|
||||||
self._state_group_cache = DictionaryCache("*stateGroupCache*", 2000)
|
self._state_group_cache = DictionaryCache(
|
||||||
|
"*stateGroupCache*", 2000 * CACHE_SIZE_FACTOR
|
||||||
|
)
|
||||||
|
|
||||||
self._event_fetch_lock = threading.Condition()
|
self._event_fetch_lock = threading.Condition()
|
||||||
self._event_fetch_list = []
|
self._event_fetch_list = []
|
||||||
@ -344,7 +351,7 @@ class SQLBaseStore(object):
|
|||||||
"""
|
"""
|
||||||
col_headers = list(column[0] for column in cursor.description)
|
col_headers = list(column[0] for column in cursor.description)
|
||||||
results = list(
|
results = list(
|
||||||
dict(zip(col_headers, row)) for row in cursor.fetchall()
|
intern_dict(dict(zip(col_headers, row))) for row in cursor.fetchall()
|
||||||
)
|
)
|
||||||
return results
|
return results
|
||||||
|
|
||||||
|
@ -155,7 +155,7 @@ class DirectoryStore(SQLBaseStore):
|
|||||||
|
|
||||||
return room_id
|
return room_id
|
||||||
|
|
||||||
@cached()
|
@cached(max_entries=5000)
|
||||||
def get_aliases_for_room(self, room_id):
|
def get_aliases_for_room(self, room_id):
|
||||||
return self._simple_select_onecol(
|
return self._simple_select_onecol(
|
||||||
"room_aliases",
|
"room_aliases",
|
||||||
|
@ -49,7 +49,7 @@ class EventPushActionsStore(SQLBaseStore):
|
|||||||
)
|
)
|
||||||
self._simple_insert_many_txn(txn, "event_push_actions", values)
|
self._simple_insert_many_txn(txn, "event_push_actions", values)
|
||||||
|
|
||||||
@cachedInlineCallbacks(num_args=3, lru=True, tree=True)
|
@cachedInlineCallbacks(num_args=3, lru=True, tree=True, max_entries=5000)
|
||||||
def get_unread_event_push_actions_by_room_for_user(
|
def get_unread_event_push_actions_by_room_for_user(
|
||||||
self, room_id, user_id, last_read_event_id
|
self, room_id, user_id, last_read_event_id
|
||||||
):
|
):
|
||||||
|
@ -101,30 +101,16 @@ class EventsStore(SQLBaseStore):
|
|||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
@log_function
|
@log_function
|
||||||
def persist_event(self, event, context, backfilled=False,
|
def persist_event(self, event, context,
|
||||||
is_new_state=True, current_state=None):
|
is_new_state=True, current_state=None):
|
||||||
stream_ordering = None
|
|
||||||
if backfilled:
|
|
||||||
self.min_stream_token -= 1
|
|
||||||
stream_ordering = self.min_stream_token
|
|
||||||
|
|
||||||
if stream_ordering is None:
|
|
||||||
stream_ordering_manager = self._stream_id_gen.get_next()
|
|
||||||
else:
|
|
||||||
@contextmanager
|
|
||||||
def stream_ordering_manager():
|
|
||||||
yield stream_ordering
|
|
||||||
stream_ordering_manager = stream_ordering_manager()
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
with stream_ordering_manager as stream_ordering:
|
with self._stream_id_gen.get_next() as stream_ordering:
|
||||||
event.internal_metadata.stream_ordering = stream_ordering
|
event.internal_metadata.stream_ordering = stream_ordering
|
||||||
yield self.runInteraction(
|
yield self.runInteraction(
|
||||||
"persist_event",
|
"persist_event",
|
||||||
self._persist_event_txn,
|
self._persist_event_txn,
|
||||||
event=event,
|
event=event,
|
||||||
context=context,
|
context=context,
|
||||||
backfilled=backfilled,
|
|
||||||
is_new_state=is_new_state,
|
is_new_state=is_new_state,
|
||||||
current_state=current_state,
|
current_state=current_state,
|
||||||
)
|
)
|
||||||
@ -165,13 +151,38 @@ class EventsStore(SQLBaseStore):
|
|||||||
|
|
||||||
defer.returnValue(events[0] if events else None)
|
defer.returnValue(events[0] if events else None)
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def get_events(self, event_ids, check_redacted=True,
|
||||||
|
get_prev_content=False, allow_rejected=False):
|
||||||
|
"""Get events from the database
|
||||||
|
|
||||||
|
Args:
|
||||||
|
event_ids (list): The event_ids of the events to fetch
|
||||||
|
check_redacted (bool): If True, check if event has been redacted
|
||||||
|
and redact it.
|
||||||
|
get_prev_content (bool): If True and event is a state event,
|
||||||
|
include the previous states content in the unsigned field.
|
||||||
|
allow_rejected (bool): If True return rejected events.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Deferred : Dict from event_id to event.
|
||||||
|
"""
|
||||||
|
events = yield self._get_events(
|
||||||
|
event_ids,
|
||||||
|
check_redacted=check_redacted,
|
||||||
|
get_prev_content=get_prev_content,
|
||||||
|
allow_rejected=allow_rejected,
|
||||||
|
)
|
||||||
|
|
||||||
|
defer.returnValue({e.event_id: e for e in events})
|
||||||
|
|
||||||
@log_function
|
@log_function
|
||||||
def _persist_event_txn(self, txn, event, context, backfilled,
|
def _persist_event_txn(self, txn, event, context,
|
||||||
is_new_state=True, current_state=None):
|
is_new_state=True, current_state=None):
|
||||||
# We purposefully do this first since if we include a `current_state`
|
# We purposefully do this first since if we include a `current_state`
|
||||||
# key, we *want* to update the `current_state_events` table
|
# key, we *want* to update the `current_state_events` table
|
||||||
if current_state:
|
if current_state:
|
||||||
txn.call_after(self.get_current_state_for_key.invalidate_all)
|
txn.call_after(self._get_current_state_for_key.invalidate_all)
|
||||||
txn.call_after(self.get_rooms_for_user.invalidate_all)
|
txn.call_after(self.get_rooms_for_user.invalidate_all)
|
||||||
txn.call_after(self.get_users_in_room.invalidate, (event.room_id,))
|
txn.call_after(self.get_users_in_room.invalidate, (event.room_id,))
|
||||||
txn.call_after(self.get_joined_hosts_for_room.invalidate, (event.room_id,))
|
txn.call_after(self.get_joined_hosts_for_room.invalidate, (event.room_id,))
|
||||||
@ -198,7 +209,7 @@ class EventsStore(SQLBaseStore):
|
|||||||
return self._persist_events_txn(
|
return self._persist_events_txn(
|
||||||
txn,
|
txn,
|
||||||
[(event, context)],
|
[(event, context)],
|
||||||
backfilled=backfilled,
|
backfilled=False,
|
||||||
is_new_state=is_new_state,
|
is_new_state=is_new_state,
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -455,7 +466,7 @@ class EventsStore(SQLBaseStore):
|
|||||||
for event, _ in state_events_and_contexts:
|
for event, _ in state_events_and_contexts:
|
||||||
if not context.rejected:
|
if not context.rejected:
|
||||||
txn.call_after(
|
txn.call_after(
|
||||||
self.get_current_state_for_key.invalidate,
|
self._get_current_state_for_key.invalidate,
|
||||||
(event.room_id, event.type, event.state_key,)
|
(event.room_id, event.type, event.state_key,)
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -526,6 +537,9 @@ class EventsStore(SQLBaseStore):
|
|||||||
if not event_ids:
|
if not event_ids:
|
||||||
defer.returnValue([])
|
defer.returnValue([])
|
||||||
|
|
||||||
|
event_id_list = event_ids
|
||||||
|
event_ids = set(event_ids)
|
||||||
|
|
||||||
event_map = self._get_events_from_cache(
|
event_map = self._get_events_from_cache(
|
||||||
event_ids,
|
event_ids,
|
||||||
check_redacted=check_redacted,
|
check_redacted=check_redacted,
|
||||||
@ -535,23 +549,18 @@ class EventsStore(SQLBaseStore):
|
|||||||
|
|
||||||
missing_events_ids = [e for e in event_ids if e not in event_map]
|
missing_events_ids = [e for e in event_ids if e not in event_map]
|
||||||
|
|
||||||
if not missing_events_ids:
|
if missing_events_ids:
|
||||||
defer.returnValue([
|
missing_events = yield self._enqueue_events(
|
||||||
event_map[e_id] for e_id in event_ids
|
missing_events_ids,
|
||||||
if e_id in event_map and event_map[e_id]
|
check_redacted=check_redacted,
|
||||||
])
|
get_prev_content=get_prev_content,
|
||||||
|
allow_rejected=allow_rejected,
|
||||||
|
)
|
||||||
|
|
||||||
missing_events = yield self._enqueue_events(
|
event_map.update(missing_events)
|
||||||
missing_events_ids,
|
|
||||||
check_redacted=check_redacted,
|
|
||||||
get_prev_content=get_prev_content,
|
|
||||||
allow_rejected=allow_rejected,
|
|
||||||
)
|
|
||||||
|
|
||||||
event_map.update(missing_events)
|
|
||||||
|
|
||||||
defer.returnValue([
|
defer.returnValue([
|
||||||
event_map[e_id] for e_id in event_ids
|
event_map[e_id] for e_id in event_id_list
|
||||||
if e_id in event_map and event_map[e_id]
|
if e_id in event_map and event_map[e_id]
|
||||||
])
|
])
|
||||||
|
|
||||||
|
@ -16,8 +16,6 @@
|
|||||||
from ._base import SQLBaseStore
|
from ._base import SQLBaseStore
|
||||||
from twisted.internet import defer
|
from twisted.internet import defer
|
||||||
|
|
||||||
from synapse.api.errors import StoreError
|
|
||||||
|
|
||||||
from canonicaljson import encode_canonical_json
|
from canonicaljson import encode_canonical_json
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
@ -79,12 +77,41 @@ class PusherStore(SQLBaseStore):
|
|||||||
rows = yield self.runInteraction("get_all_pushers", get_pushers)
|
rows = yield self.runInteraction("get_all_pushers", get_pushers)
|
||||||
defer.returnValue(rows)
|
defer.returnValue(rows)
|
||||||
|
|
||||||
|
def get_pushers_stream_token(self):
|
||||||
|
return self._pushers_id_gen.get_max_token()
|
||||||
|
|
||||||
|
def get_all_updated_pushers(self, last_id, current_id, limit):
|
||||||
|
def get_all_updated_pushers_txn(txn):
|
||||||
|
sql = (
|
||||||
|
"SELECT id, user_name, access_token, profile_tag, kind,"
|
||||||
|
" app_id, app_display_name, device_display_name, pushkey, ts,"
|
||||||
|
" lang, data"
|
||||||
|
" FROM pushers"
|
||||||
|
" WHERE ? < id AND id <= ?"
|
||||||
|
" ORDER BY id ASC LIMIT ?"
|
||||||
|
)
|
||||||
|
txn.execute(sql, (last_id, current_id, limit))
|
||||||
|
updated = txn.fetchall()
|
||||||
|
|
||||||
|
sql = (
|
||||||
|
"SELECT stream_id, user_id, app_id, pushkey"
|
||||||
|
" FROM deleted_pushers"
|
||||||
|
" WHERE ? < stream_id AND stream_id <= ?"
|
||||||
|
" ORDER BY stream_id ASC LIMIT ?"
|
||||||
|
)
|
||||||
|
txn.execute(sql, (last_id, current_id, limit))
|
||||||
|
deleted = txn.fetchall()
|
||||||
|
|
||||||
|
return (updated, deleted)
|
||||||
|
return self.runInteraction(
|
||||||
|
"get_all_updated_pushers", get_all_updated_pushers_txn
|
||||||
|
)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def add_pusher(self, user_id, access_token, kind, app_id,
|
def add_pusher(self, user_id, access_token, kind, app_id,
|
||||||
app_display_name, device_display_name,
|
app_display_name, device_display_name,
|
||||||
pushkey, pushkey_ts, lang, data, profile_tag=""):
|
pushkey, pushkey_ts, lang, data, profile_tag=""):
|
||||||
try:
|
with self._pushers_id_gen.get_next() as stream_id:
|
||||||
next_id = self._pushers_id_gen.get_next()
|
|
||||||
yield self._simple_upsert(
|
yield self._simple_upsert(
|
||||||
"pushers",
|
"pushers",
|
||||||
dict(
|
dict(
|
||||||
@ -101,23 +128,29 @@ class PusherStore(SQLBaseStore):
|
|||||||
lang=lang,
|
lang=lang,
|
||||||
data=encode_canonical_json(data),
|
data=encode_canonical_json(data),
|
||||||
profile_tag=profile_tag,
|
profile_tag=profile_tag,
|
||||||
),
|
id=stream_id,
|
||||||
insertion_values=dict(
|
|
||||||
id=next_id,
|
|
||||||
),
|
),
|
||||||
desc="add_pusher",
|
desc="add_pusher",
|
||||||
)
|
)
|
||||||
except Exception as e:
|
|
||||||
logger.error("create_pusher with failed: %s", e)
|
|
||||||
raise StoreError(500, "Problem creating pusher.")
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def delete_pusher_by_app_id_pushkey_user_id(self, app_id, pushkey, user_id):
|
def delete_pusher_by_app_id_pushkey_user_id(self, app_id, pushkey, user_id):
|
||||||
yield self._simple_delete_one(
|
def delete_pusher_txn(txn, stream_id):
|
||||||
"pushers",
|
self._simple_delete_one_txn(
|
||||||
{"app_id": app_id, "pushkey": pushkey, 'user_name': user_id},
|
txn,
|
||||||
desc="delete_pusher_by_app_id_pushkey_user_id",
|
"pushers",
|
||||||
)
|
{"app_id": app_id, "pushkey": pushkey, "user_name": user_id}
|
||||||
|
)
|
||||||
|
self._simple_upsert_txn(
|
||||||
|
txn,
|
||||||
|
"deleted_pushers",
|
||||||
|
{"app_id": app_id, "pushkey": pushkey, "user_id": user_id},
|
||||||
|
{"stream_id": stream_id},
|
||||||
|
)
|
||||||
|
with self._pushers_id_gen.get_next() as stream_id:
|
||||||
|
yield self.runInteraction(
|
||||||
|
"delete_pusher", delete_pusher_txn, stream_id
|
||||||
|
)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def update_pusher_last_token(self, app_id, pushkey, user_id, last_token):
|
def update_pusher_last_token(self, app_id, pushkey, user_id, last_token):
|
||||||
|
@ -62,18 +62,17 @@ class ReceiptsStore(SQLBaseStore):
|
|||||||
|
|
||||||
@cachedInlineCallbacks(num_args=2)
|
@cachedInlineCallbacks(num_args=2)
|
||||||
def get_receipts_for_user(self, user_id, receipt_type):
|
def get_receipts_for_user(self, user_id, receipt_type):
|
||||||
def f(txn):
|
rows = yield self._simple_select_list(
|
||||||
sql = (
|
table="receipts_linearized",
|
||||||
"SELECT room_id,event_id "
|
keyvalues={
|
||||||
"FROM receipts_linearized "
|
"user_id": user_id,
|
||||||
"WHERE user_id = ? AND receipt_type = ? "
|
"receipt_type": receipt_type,
|
||||||
)
|
},
|
||||||
txn.execute(sql, (user_id, receipt_type))
|
retcols=("room_id", "event_id"),
|
||||||
return txn.fetchall()
|
desc="get_receipts_for_user",
|
||||||
|
)
|
||||||
|
|
||||||
defer.returnValue(dict(
|
defer.returnValue({row["room_id"]: row["event_id"] for row in rows})
|
||||||
(yield self.runInteraction("get_receipts_for_user", f))
|
|
||||||
))
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def get_linearized_receipts_for_rooms(self, room_ids, to_key, from_key=None):
|
def get_linearized_receipts_for_rooms(self, room_ids, to_key, from_key=None):
|
||||||
|
@ -77,6 +77,14 @@ class RoomStore(SQLBaseStore):
|
|||||||
allow_none=True,
|
allow_none=True,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def set_room_is_public(self, room_id, is_public):
|
||||||
|
return self._simple_update_one(
|
||||||
|
table="rooms",
|
||||||
|
keyvalues={"room_id": room_id},
|
||||||
|
updatevalues={"is_public": is_public},
|
||||||
|
desc="set_room_is_public",
|
||||||
|
)
|
||||||
|
|
||||||
def get_public_room_ids(self):
|
def get_public_room_ids(self):
|
||||||
return self._simple_select_onecol(
|
return self._simple_select_onecol(
|
||||||
table="rooms",
|
table="rooms",
|
||||||
|
@ -115,19 +115,17 @@ class RoomMemberStore(SQLBaseStore):
|
|||||||
).addCallback(self._get_events)
|
).addCallback(self._get_events)
|
||||||
|
|
||||||
@cached()
|
@cached()
|
||||||
def get_invites_for_user(self, user_id):
|
def get_invited_rooms_for_user(self, user_id):
|
||||||
""" Get all the invite events for a user
|
""" Get all the rooms the user is invited to
|
||||||
Args:
|
Args:
|
||||||
user_id (str): The user ID.
|
user_id (str): The user ID.
|
||||||
Returns:
|
Returns:
|
||||||
A deferred list of event objects.
|
A deferred list of RoomsForUser.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
return self.get_rooms_for_user_where_membership_is(
|
return self.get_rooms_for_user_where_membership_is(
|
||||||
user_id, [Membership.INVITE]
|
user_id, [Membership.INVITE]
|
||||||
).addCallback(lambda invites: self._get_events([
|
)
|
||||||
invite.event_id for invite in invites
|
|
||||||
]))
|
|
||||||
|
|
||||||
def get_leave_and_ban_events_for_user(self, user_id):
|
def get_leave_and_ban_events_for_user(self, user_id):
|
||||||
""" Get all the leave events for a user
|
""" Get all the leave events for a user
|
||||||
@ -251,30 +249,6 @@ class RoomMemberStore(SQLBaseStore):
|
|||||||
user_id, membership_list=[Membership.JOIN],
|
user_id, membership_list=[Membership.JOIN],
|
||||||
)
|
)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
|
||||||
def user_rooms_intersect(self, user_id_list):
|
|
||||||
""" Checks whether all the users whose IDs are given in a list share a
|
|
||||||
room.
|
|
||||||
|
|
||||||
This is a "hot path" function that's called a lot, e.g. by presence for
|
|
||||||
generating the event stream. As such, it is implemented locally by
|
|
||||||
wrapping logic around heavily-cached database queries.
|
|
||||||
"""
|
|
||||||
if len(user_id_list) < 2:
|
|
||||||
defer.returnValue(True)
|
|
||||||
|
|
||||||
deferreds = [self.get_rooms_for_user(u) for u in user_id_list]
|
|
||||||
|
|
||||||
results = yield defer.DeferredList(deferreds, consumeErrors=True)
|
|
||||||
|
|
||||||
# A list of sets of strings giving room IDs for each user
|
|
||||||
room_id_lists = [set([r.room_id for r in result[1]]) for result in results]
|
|
||||||
|
|
||||||
# There isn't a setintersection(*list_of_sets)
|
|
||||||
ret = len(room_id_lists.pop(0).intersection(*room_id_lists)) > 0
|
|
||||||
|
|
||||||
defer.returnValue(ret)
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def forget(self, user_id, room_id):
|
def forget(self, user_id, room_id):
|
||||||
"""Indicate that user_id wishes to discard history for room_id."""
|
"""Indicate that user_id wishes to discard history for room_id."""
|
||||||
|
25
synapse/storage/schema/delta/30/deleted_pushers.sql
Normal file
25
synapse/storage/schema/delta/30/deleted_pushers.sql
Normal file
@ -0,0 +1,25 @@
|
|||||||
|
/* Copyright 2016 OpenMarket Ltd
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
CREATE TABLE IF NOT EXISTS deleted_pushers(
|
||||||
|
stream_id BIGINT NOT NULL,
|
||||||
|
app_id TEXT NOT NULL,
|
||||||
|
pushkey TEXT NOT NULL,
|
||||||
|
user_id TEXT NOT NULL,
|
||||||
|
/* We only track the most recent delete for each app_id, pushkey and user_id. */
|
||||||
|
UNIQUE (app_id, pushkey, user_id)
|
||||||
|
);
|
||||||
|
|
||||||
|
CREATE INDEX deleted_pushers_stream_id ON deleted_pushers (stream_id);
|
23
synapse/storage/schema/delta/30/public_rooms.sql
Normal file
23
synapse/storage/schema/delta/30/public_rooms.sql
Normal file
@ -0,0 +1,23 @@
|
|||||||
|
/* Copyright 2016 OpenMarket Ltd
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
|
||||||
|
/* This release removes the restriction that published rooms must have an alias,
|
||||||
|
* so we go back and ensure the only 'public' rooms are ones with an alias.
|
||||||
|
* We use (1 = 0) and (1 = 1) so that it works in both postgres and sqlite
|
||||||
|
*/
|
||||||
|
UPDATE rooms SET is_public = (1 = 0) WHERE is_public = (1 = 1) AND room_id not in (
|
||||||
|
SELECT room_id FROM room_aliases
|
||||||
|
);
|
@ -14,9 +14,8 @@
|
|||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
from ._base import SQLBaseStore
|
from ._base import SQLBaseStore
|
||||||
from synapse.util.caches.descriptors import (
|
from synapse.util.caches.descriptors import cached, cachedList
|
||||||
cached, cachedInlineCallbacks, cachedList
|
from synapse.util.caches import intern_string
|
||||||
)
|
|
||||||
|
|
||||||
from twisted.internet import defer
|
from twisted.internet import defer
|
||||||
|
|
||||||
@ -155,8 +154,14 @@ class StateStore(SQLBaseStore):
|
|||||||
events = yield self._get_events(event_ids, get_prev_content=False)
|
events = yield self._get_events(event_ids, get_prev_content=False)
|
||||||
defer.returnValue(events)
|
defer.returnValue(events)
|
||||||
|
|
||||||
@cachedInlineCallbacks(num_args=3)
|
@defer.inlineCallbacks
|
||||||
def get_current_state_for_key(self, room_id, event_type, state_key):
|
def get_current_state_for_key(self, room_id, event_type, state_key):
|
||||||
|
event_ids = yield self._get_current_state_for_key(room_id, event_type, state_key)
|
||||||
|
events = yield self._get_events(event_ids, get_prev_content=False)
|
||||||
|
defer.returnValue(events)
|
||||||
|
|
||||||
|
@cached(num_args=3)
|
||||||
|
def _get_current_state_for_key(self, room_id, event_type, state_key):
|
||||||
def f(txn):
|
def f(txn):
|
||||||
sql = (
|
sql = (
|
||||||
"SELECT event_id FROM current_state_events"
|
"SELECT event_id FROM current_state_events"
|
||||||
@ -167,12 +172,10 @@ class StateStore(SQLBaseStore):
|
|||||||
txn.execute(sql, args)
|
txn.execute(sql, args)
|
||||||
results = txn.fetchall()
|
results = txn.fetchall()
|
||||||
return [r[0] for r in results]
|
return [r[0] for r in results]
|
||||||
event_ids = yield self.runInteraction("get_current_state_for_key", f)
|
return self.runInteraction("get_current_state_for_key", f)
|
||||||
events = yield self._get_events(event_ids, get_prev_content=False)
|
|
||||||
defer.returnValue(events)
|
|
||||||
|
|
||||||
def _get_state_groups_from_groups(self, groups, types):
|
def _get_state_groups_from_groups(self, groups, types):
|
||||||
"""Returns dictionary state_group -> state event ids
|
"""Returns dictionary state_group -> (dict of (type, state_key) -> event id)
|
||||||
"""
|
"""
|
||||||
def f(txn, groups):
|
def f(txn, groups):
|
||||||
if types is not None:
|
if types is not None:
|
||||||
@ -183,7 +186,8 @@ class StateStore(SQLBaseStore):
|
|||||||
where_clause = ""
|
where_clause = ""
|
||||||
|
|
||||||
sql = (
|
sql = (
|
||||||
"SELECT state_group, event_id FROM state_groups_state WHERE"
|
"SELECT state_group, event_id, type, state_key"
|
||||||
|
" FROM state_groups_state WHERE"
|
||||||
" state_group IN (%s) %s" % (
|
" state_group IN (%s) %s" % (
|
||||||
",".join("?" for _ in groups),
|
",".join("?" for _ in groups),
|
||||||
where_clause,
|
where_clause,
|
||||||
@ -199,7 +203,8 @@ class StateStore(SQLBaseStore):
|
|||||||
|
|
||||||
results = {}
|
results = {}
|
||||||
for row in rows:
|
for row in rows:
|
||||||
results.setdefault(row["state_group"], []).append(row["event_id"])
|
key = (row["type"], row["state_key"])
|
||||||
|
results.setdefault(row["state_group"], {})[key] = row["event_id"]
|
||||||
return results
|
return results
|
||||||
|
|
||||||
chunks = [groups[i:i + 100] for i in xrange(0, len(groups), 100)]
|
chunks = [groups[i:i + 100] for i in xrange(0, len(groups), 100)]
|
||||||
@ -296,7 +301,7 @@ class StateStore(SQLBaseStore):
|
|||||||
where a `state_key` of `None` matches all state_keys for the
|
where a `state_key` of `None` matches all state_keys for the
|
||||||
`type`.
|
`type`.
|
||||||
"""
|
"""
|
||||||
is_all, state_dict = self._state_group_cache.get(group)
|
is_all, state_dict_ids = self._state_group_cache.get(group)
|
||||||
|
|
||||||
type_to_key = {}
|
type_to_key = {}
|
||||||
missing_types = set()
|
missing_types = set()
|
||||||
@ -308,7 +313,7 @@ class StateStore(SQLBaseStore):
|
|||||||
if type_to_key.get(typ, object()) is not None:
|
if type_to_key.get(typ, object()) is not None:
|
||||||
type_to_key.setdefault(typ, set()).add(state_key)
|
type_to_key.setdefault(typ, set()).add(state_key)
|
||||||
|
|
||||||
if (typ, state_key) not in state_dict:
|
if (typ, state_key) not in state_dict_ids:
|
||||||
missing_types.add((typ, state_key))
|
missing_types.add((typ, state_key))
|
||||||
|
|
||||||
sentinel = object()
|
sentinel = object()
|
||||||
@ -326,7 +331,7 @@ class StateStore(SQLBaseStore):
|
|||||||
got_all = not (missing_types or types is None)
|
got_all = not (missing_types or types is None)
|
||||||
|
|
||||||
return {
|
return {
|
||||||
k: v for k, v in state_dict.items()
|
k: v for k, v in state_dict_ids.items()
|
||||||
if include(k[0], k[1])
|
if include(k[0], k[1])
|
||||||
}, missing_types, got_all
|
}, missing_types, got_all
|
||||||
|
|
||||||
@ -340,8 +345,9 @@ class StateStore(SQLBaseStore):
|
|||||||
Args:
|
Args:
|
||||||
group: The state group to lookup
|
group: The state group to lookup
|
||||||
"""
|
"""
|
||||||
is_all, state_dict = self._state_group_cache.get(group)
|
is_all, state_dict_ids = self._state_group_cache.get(group)
|
||||||
return state_dict, is_all
|
|
||||||
|
return state_dict_ids, is_all
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _get_state_for_groups(self, groups, types=None):
|
def _get_state_for_groups(self, groups, types=None):
|
||||||
@ -354,84 +360,72 @@ class StateStore(SQLBaseStore):
|
|||||||
missing_groups = []
|
missing_groups = []
|
||||||
if types is not None:
|
if types is not None:
|
||||||
for group in set(groups):
|
for group in set(groups):
|
||||||
state_dict, missing_types, got_all = self._get_some_state_from_cache(
|
state_dict_ids, missing_types, got_all = self._get_some_state_from_cache(
|
||||||
group, types
|
group, types
|
||||||
)
|
)
|
||||||
results[group] = state_dict
|
results[group] = state_dict_ids
|
||||||
|
|
||||||
if not got_all:
|
if not got_all:
|
||||||
missing_groups.append(group)
|
missing_groups.append(group)
|
||||||
else:
|
else:
|
||||||
for group in set(groups):
|
for group in set(groups):
|
||||||
state_dict, got_all = self._get_all_state_from_cache(
|
state_dict_ids, got_all = self._get_all_state_from_cache(
|
||||||
group
|
group
|
||||||
)
|
)
|
||||||
results[group] = state_dict
|
|
||||||
|
results[group] = state_dict_ids
|
||||||
|
|
||||||
if not got_all:
|
if not got_all:
|
||||||
missing_groups.append(group)
|
missing_groups.append(group)
|
||||||
|
|
||||||
if not missing_groups:
|
if missing_groups:
|
||||||
defer.returnValue({
|
# Okay, so we have some missing_types, lets fetch them.
|
||||||
group: {
|
cache_seq_num = self._state_group_cache.sequence
|
||||||
type_tuple: event
|
|
||||||
for type_tuple, event in state.items()
|
|
||||||
if event
|
|
||||||
}
|
|
||||||
for group, state in results.items()
|
|
||||||
})
|
|
||||||
|
|
||||||
# Okay, so we have some missing_types, lets fetch them.
|
group_to_state_dict = yield self._get_state_groups_from_groups(
|
||||||
cache_seq_num = self._state_group_cache.sequence
|
missing_groups, types
|
||||||
|
)
|
||||||
|
|
||||||
group_state_dict = yield self._get_state_groups_from_groups(
|
# Now we want to update the cache with all the things we fetched
|
||||||
missing_groups, types
|
# from the database.
|
||||||
)
|
for group, group_state_dict in group_to_state_dict.items():
|
||||||
|
if types:
|
||||||
|
# We delibrately put key -> None mappings into the cache to
|
||||||
|
# cache absence of the key, on the assumption that if we've
|
||||||
|
# explicitly asked for some types then we will probably ask
|
||||||
|
# for them again.
|
||||||
|
state_dict = {
|
||||||
|
(intern_string(etype), intern_string(state_key)): None
|
||||||
|
for (etype, state_key) in types
|
||||||
|
}
|
||||||
|
state_dict.update(results[group])
|
||||||
|
results[group] = state_dict
|
||||||
|
else:
|
||||||
|
state_dict = results[group]
|
||||||
|
|
||||||
|
state_dict.update(group_state_dict)
|
||||||
|
|
||||||
|
self._state_group_cache.update(
|
||||||
|
cache_seq_num,
|
||||||
|
key=group,
|
||||||
|
value=state_dict,
|
||||||
|
full=(types is None),
|
||||||
|
)
|
||||||
|
|
||||||
state_events = yield self._get_events(
|
state_events = yield self._get_events(
|
||||||
[e_id for l in group_state_dict.values() for e_id in l],
|
[ev_id for sd in results.values() for ev_id in sd.values()],
|
||||||
get_prev_content=False
|
get_prev_content=False
|
||||||
)
|
)
|
||||||
|
|
||||||
state_events = {e.event_id: e for e in state_events}
|
state_events = {e.event_id: e for e in state_events}
|
||||||
|
|
||||||
# Now we want to update the cache with all the things we fetched
|
|
||||||
# from the database.
|
|
||||||
for group, state_ids in group_state_dict.items():
|
|
||||||
if types:
|
|
||||||
# We delibrately put key -> None mappings into the cache to
|
|
||||||
# cache absence of the key, on the assumption that if we've
|
|
||||||
# explicitly asked for some types then we will probably ask
|
|
||||||
# for them again.
|
|
||||||
state_dict = {key: None for key in types}
|
|
||||||
state_dict.update(results[group])
|
|
||||||
results[group] = state_dict
|
|
||||||
else:
|
|
||||||
state_dict = results[group]
|
|
||||||
|
|
||||||
for event_id in state_ids:
|
|
||||||
try:
|
|
||||||
state_event = state_events[event_id]
|
|
||||||
state_dict[(state_event.type, state_event.state_key)] = state_event
|
|
||||||
except KeyError:
|
|
||||||
# Hmm. So we do don't have that state event? Interesting.
|
|
||||||
logger.warn(
|
|
||||||
"Can't find state event %r for state group %r",
|
|
||||||
event_id, group,
|
|
||||||
)
|
|
||||||
|
|
||||||
self._state_group_cache.update(
|
|
||||||
cache_seq_num,
|
|
||||||
key=group,
|
|
||||||
value=state_dict,
|
|
||||||
full=(types is None),
|
|
||||||
)
|
|
||||||
|
|
||||||
# Remove all the entries with None values. The None values were just
|
# Remove all the entries with None values. The None values were just
|
||||||
# used for bookkeeping in the cache.
|
# used for bookkeeping in the cache.
|
||||||
for group, state_dict in results.items():
|
for group, state_dict in results.items():
|
||||||
results[group] = {
|
results[group] = {
|
||||||
key: event for key, event in state_dict.items() if event
|
key: state_events[event_id]
|
||||||
|
for key, event_id in state_dict.items()
|
||||||
|
if event_id and event_id in state_events
|
||||||
}
|
}
|
||||||
|
|
||||||
defer.returnValue(results)
|
defer.returnValue(results)
|
||||||
|
@ -36,7 +36,7 @@ what sort order was used:
|
|||||||
from twisted.internet import defer
|
from twisted.internet import defer
|
||||||
|
|
||||||
from ._base import SQLBaseStore
|
from ._base import SQLBaseStore
|
||||||
from synapse.util.caches.descriptors import cachedInlineCallbacks
|
from synapse.util.caches.descriptors import cached
|
||||||
from synapse.api.constants import EventTypes
|
from synapse.api.constants import EventTypes
|
||||||
from synapse.types import RoomStreamToken
|
from synapse.types import RoomStreamToken
|
||||||
from synapse.util.logcontext import preserve_fn
|
from synapse.util.logcontext import preserve_fn
|
||||||
@ -465,9 +465,25 @@ class StreamStore(SQLBaseStore):
|
|||||||
|
|
||||||
defer.returnValue((events, token))
|
defer.returnValue((events, token))
|
||||||
|
|
||||||
@cachedInlineCallbacks(num_args=4)
|
@defer.inlineCallbacks
|
||||||
def get_recent_events_for_room(self, room_id, limit, end_token, from_token=None):
|
def get_recent_events_for_room(self, room_id, limit, end_token, from_token=None):
|
||||||
|
rows, token = yield self.get_recent_event_ids_for_room(
|
||||||
|
room_id, limit, end_token, from_token
|
||||||
|
)
|
||||||
|
|
||||||
|
logger.debug("stream before")
|
||||||
|
events = yield self._get_events(
|
||||||
|
[r["event_id"] for r in rows],
|
||||||
|
get_prev_content=True
|
||||||
|
)
|
||||||
|
logger.debug("stream after")
|
||||||
|
|
||||||
|
self._set_before_and_after(events, rows)
|
||||||
|
|
||||||
|
defer.returnValue((events, token))
|
||||||
|
|
||||||
|
@cached(num_args=4)
|
||||||
|
def get_recent_event_ids_for_room(self, room_id, limit, end_token, from_token=None):
|
||||||
end_token = RoomStreamToken.parse_stream_token(end_token)
|
end_token = RoomStreamToken.parse_stream_token(end_token)
|
||||||
|
|
||||||
if from_token is None:
|
if from_token is None:
|
||||||
@ -517,21 +533,10 @@ class StreamStore(SQLBaseStore):
|
|||||||
|
|
||||||
return rows, token
|
return rows, token
|
||||||
|
|
||||||
rows, token = yield self.runInteraction(
|
return self.runInteraction(
|
||||||
"get_recent_events_for_room", get_recent_events_for_room_txn
|
"get_recent_events_for_room", get_recent_events_for_room_txn
|
||||||
)
|
)
|
||||||
|
|
||||||
logger.debug("stream before")
|
|
||||||
events = yield self._get_events(
|
|
||||||
[r["event_id"] for r in rows],
|
|
||||||
get_prev_content=True
|
|
||||||
)
|
|
||||||
logger.debug("stream after")
|
|
||||||
|
|
||||||
self._set_before_and_after(events, rows)
|
|
||||||
|
|
||||||
defer.returnValue((events, token))
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def get_room_events_max_id(self, direction='f'):
|
def get_room_events_max_id(self, direction='f'):
|
||||||
token = yield self._stream_id_gen.get_max_token()
|
token = yield self._stream_id_gen.get_max_token()
|
||||||
|
@ -49,9 +49,14 @@ class StreamIdGenerator(object):
|
|||||||
with stream_id_gen.get_next() as stream_id:
|
with stream_id_gen.get_next() as stream_id:
|
||||||
# ... persist event ...
|
# ... persist event ...
|
||||||
"""
|
"""
|
||||||
def __init__(self, db_conn, table, column):
|
def __init__(self, db_conn, table, column, extra_tables=[]):
|
||||||
self._lock = threading.Lock()
|
self._lock = threading.Lock()
|
||||||
self._current_max = _load_max_id(db_conn, table, column)
|
self._current_max = _load_max_id(db_conn, table, column)
|
||||||
|
for table, column in extra_tables:
|
||||||
|
self._current_max = max(
|
||||||
|
self._current_max,
|
||||||
|
_load_max_id(db_conn, table, column)
|
||||||
|
)
|
||||||
self._unfinished_ids = deque()
|
self._unfinished_ids = deque()
|
||||||
|
|
||||||
def get_next(self):
|
def get_next(self):
|
||||||
|
@ -14,6 +14,10 @@
|
|||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
import synapse.metrics
|
import synapse.metrics
|
||||||
|
from lrucache import LruCache
|
||||||
|
import os
|
||||||
|
|
||||||
|
CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.1))
|
||||||
|
|
||||||
DEBUG_CACHES = False
|
DEBUG_CACHES = False
|
||||||
|
|
||||||
@ -25,3 +29,56 @@ cache_counter = metrics.register_cache(
|
|||||||
lambda: {(name,): len(caches_by_name[name]) for name in caches_by_name.keys()},
|
lambda: {(name,): len(caches_by_name[name]) for name in caches_by_name.keys()},
|
||||||
labels=["name"],
|
labels=["name"],
|
||||||
)
|
)
|
||||||
|
|
||||||
|
_string_cache = LruCache(int(5000 * CACHE_SIZE_FACTOR))
|
||||||
|
caches_by_name["string_cache"] = _string_cache
|
||||||
|
|
||||||
|
|
||||||
|
KNOWN_KEYS = {
|
||||||
|
key: key for key in
|
||||||
|
(
|
||||||
|
"auth_events",
|
||||||
|
"content",
|
||||||
|
"depth",
|
||||||
|
"event_id",
|
||||||
|
"hashes",
|
||||||
|
"origin",
|
||||||
|
"origin_server_ts",
|
||||||
|
"prev_events",
|
||||||
|
"room_id",
|
||||||
|
"sender",
|
||||||
|
"signatures",
|
||||||
|
"state_key",
|
||||||
|
"type",
|
||||||
|
"unsigned",
|
||||||
|
"user_id",
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def intern_string(string):
|
||||||
|
"""Takes a (potentially) unicode string and interns using custom cache
|
||||||
|
"""
|
||||||
|
return _string_cache.setdefault(string, string)
|
||||||
|
|
||||||
|
|
||||||
|
def intern_dict(dictionary):
|
||||||
|
"""Takes a dictionary and interns well known keys and their values
|
||||||
|
"""
|
||||||
|
return {
|
||||||
|
KNOWN_KEYS.get(key, key): _intern_known_values(key, value)
|
||||||
|
for key, value in dictionary.items()
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def _intern_known_values(key, value):
|
||||||
|
intern_str_keys = ("event_id", "room_id")
|
||||||
|
intern_unicode_keys = ("sender", "user_id", "type", "state_key")
|
||||||
|
|
||||||
|
if key in intern_str_keys:
|
||||||
|
return intern(value.encode('ascii'))
|
||||||
|
|
||||||
|
if key in intern_unicode_keys:
|
||||||
|
return intern_string(value)
|
||||||
|
|
||||||
|
return value
|
||||||
|
@ -29,6 +29,16 @@ def enumerate_leaves(node, depth):
|
|||||||
yield m
|
yield m
|
||||||
|
|
||||||
|
|
||||||
|
class _Node(object):
|
||||||
|
__slots__ = ["prev_node", "next_node", "key", "value"]
|
||||||
|
|
||||||
|
def __init__(self, prev_node, next_node, key, value):
|
||||||
|
self.prev_node = prev_node
|
||||||
|
self.next_node = next_node
|
||||||
|
self.key = key
|
||||||
|
self.value = value
|
||||||
|
|
||||||
|
|
||||||
class LruCache(object):
|
class LruCache(object):
|
||||||
"""
|
"""
|
||||||
Least-recently-used cache.
|
Least-recently-used cache.
|
||||||
@ -38,10 +48,9 @@ class LruCache(object):
|
|||||||
def __init__(self, max_size, keylen=1, cache_type=dict):
|
def __init__(self, max_size, keylen=1, cache_type=dict):
|
||||||
cache = cache_type()
|
cache = cache_type()
|
||||||
self.cache = cache # Used for introspection.
|
self.cache = cache # Used for introspection.
|
||||||
list_root = []
|
list_root = _Node(None, None, None, None)
|
||||||
list_root[:] = [list_root, list_root, None, None]
|
list_root.next_node = list_root
|
||||||
|
list_root.prev_node = list_root
|
||||||
PREV, NEXT, KEY, VALUE = 0, 1, 2, 3
|
|
||||||
|
|
||||||
lock = threading.Lock()
|
lock = threading.Lock()
|
||||||
|
|
||||||
@ -55,36 +64,36 @@ class LruCache(object):
|
|||||||
|
|
||||||
def add_node(key, value):
|
def add_node(key, value):
|
||||||
prev_node = list_root
|
prev_node = list_root
|
||||||
next_node = prev_node[NEXT]
|
next_node = prev_node.next_node
|
||||||
node = [prev_node, next_node, key, value]
|
node = _Node(prev_node, next_node, key, value)
|
||||||
prev_node[NEXT] = node
|
prev_node.next_node = node
|
||||||
next_node[PREV] = node
|
next_node.prev_node = node
|
||||||
cache[key] = node
|
cache[key] = node
|
||||||
|
|
||||||
def move_node_to_front(node):
|
def move_node_to_front(node):
|
||||||
prev_node = node[PREV]
|
prev_node = node.prev_node
|
||||||
next_node = node[NEXT]
|
next_node = node.next_node
|
||||||
prev_node[NEXT] = next_node
|
prev_node.next_node = next_node
|
||||||
next_node[PREV] = prev_node
|
next_node.prev_node = prev_node
|
||||||
prev_node = list_root
|
prev_node = list_root
|
||||||
next_node = prev_node[NEXT]
|
next_node = prev_node.next_node
|
||||||
node[PREV] = prev_node
|
node.prev_node = prev_node
|
||||||
node[NEXT] = next_node
|
node.next_node = next_node
|
||||||
prev_node[NEXT] = node
|
prev_node.next_node = node
|
||||||
next_node[PREV] = node
|
next_node.prev_node = node
|
||||||
|
|
||||||
def delete_node(node):
|
def delete_node(node):
|
||||||
prev_node = node[PREV]
|
prev_node = node.prev_node
|
||||||
next_node = node[NEXT]
|
next_node = node.next_node
|
||||||
prev_node[NEXT] = next_node
|
prev_node.next_node = next_node
|
||||||
next_node[PREV] = prev_node
|
next_node.prev_node = prev_node
|
||||||
|
|
||||||
@synchronized
|
@synchronized
|
||||||
def cache_get(key, default=None):
|
def cache_get(key, default=None):
|
||||||
node = cache.get(key, None)
|
node = cache.get(key, None)
|
||||||
if node is not None:
|
if node is not None:
|
||||||
move_node_to_front(node)
|
move_node_to_front(node)
|
||||||
return node[VALUE]
|
return node.value
|
||||||
else:
|
else:
|
||||||
return default
|
return default
|
||||||
|
|
||||||
@ -93,25 +102,25 @@ class LruCache(object):
|
|||||||
node = cache.get(key, None)
|
node = cache.get(key, None)
|
||||||
if node is not None:
|
if node is not None:
|
||||||
move_node_to_front(node)
|
move_node_to_front(node)
|
||||||
node[VALUE] = value
|
node.value = value
|
||||||
else:
|
else:
|
||||||
add_node(key, value)
|
add_node(key, value)
|
||||||
if len(cache) > max_size:
|
if len(cache) > max_size:
|
||||||
todelete = list_root[PREV]
|
todelete = list_root.prev_node
|
||||||
delete_node(todelete)
|
delete_node(todelete)
|
||||||
cache.pop(todelete[KEY], None)
|
cache.pop(todelete.key, None)
|
||||||
|
|
||||||
@synchronized
|
@synchronized
|
||||||
def cache_set_default(key, value):
|
def cache_set_default(key, value):
|
||||||
node = cache.get(key, None)
|
node = cache.get(key, None)
|
||||||
if node is not None:
|
if node is not None:
|
||||||
return node[VALUE]
|
return node.value
|
||||||
else:
|
else:
|
||||||
add_node(key, value)
|
add_node(key, value)
|
||||||
if len(cache) > max_size:
|
if len(cache) > max_size:
|
||||||
todelete = list_root[PREV]
|
todelete = list_root.prev_node
|
||||||
delete_node(todelete)
|
delete_node(todelete)
|
||||||
cache.pop(todelete[KEY], None)
|
cache.pop(todelete.key, None)
|
||||||
return value
|
return value
|
||||||
|
|
||||||
@synchronized
|
@synchronized
|
||||||
@ -119,8 +128,8 @@ class LruCache(object):
|
|||||||
node = cache.get(key, None)
|
node = cache.get(key, None)
|
||||||
if node:
|
if node:
|
||||||
delete_node(node)
|
delete_node(node)
|
||||||
cache.pop(node[KEY], None)
|
cache.pop(node.key, None)
|
||||||
return node[VALUE]
|
return node.value
|
||||||
else:
|
else:
|
||||||
return default
|
return default
|
||||||
|
|
||||||
@ -137,8 +146,8 @@ class LruCache(object):
|
|||||||
|
|
||||||
@synchronized
|
@synchronized
|
||||||
def cache_clear():
|
def cache_clear():
|
||||||
list_root[NEXT] = list_root
|
list_root.next_node = list_root
|
||||||
list_root[PREV] = list_root
|
list_root.prev_node = list_root
|
||||||
cache.clear()
|
cache.clear()
|
||||||
|
|
||||||
@synchronized
|
@synchronized
|
||||||
|
@ -131,6 +131,7 @@ class ReplicationResourceCase(unittest.TestCase):
|
|||||||
test_timeout_tag_account_data = _test_timeout("tag_account_data")
|
test_timeout_tag_account_data = _test_timeout("tag_account_data")
|
||||||
test_timeout_backfill = _test_timeout("backfill")
|
test_timeout_backfill = _test_timeout("backfill")
|
||||||
test_timeout_push_rules = _test_timeout("push_rules")
|
test_timeout_push_rules = _test_timeout("push_rules")
|
||||||
|
test_timeout_pushers = _test_timeout("pushers")
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def send_text_message(self, room_id, message):
|
def send_text_message(self, room_id, message):
|
||||||
|
@ -22,9 +22,10 @@ class RegisterRestServletTestCase(unittest.TestCase):
|
|||||||
side_effect=lambda x: defer.succeed(self.appservice))
|
side_effect=lambda x: defer.succeed(self.appservice))
|
||||||
)
|
)
|
||||||
|
|
||||||
self.auth_result = (False, None, None)
|
self.auth_result = (False, None, None, None)
|
||||||
self.auth_handler = Mock(
|
self.auth_handler = Mock(
|
||||||
check_auth=Mock(side_effect=lambda x, y, z: self.auth_result)
|
check_auth=Mock(side_effect=lambda x, y, z: self.auth_result),
|
||||||
|
get_session_data=Mock(return_value=None)
|
||||||
)
|
)
|
||||||
self.registration_handler = Mock()
|
self.registration_handler = Mock()
|
||||||
self.identity_handler = Mock()
|
self.identity_handler = Mock()
|
||||||
@ -112,7 +113,7 @@ class RegisterRestServletTestCase(unittest.TestCase):
|
|||||||
self.auth_result = (True, None, {
|
self.auth_result = (True, None, {
|
||||||
"username": "kermit",
|
"username": "kermit",
|
||||||
"password": "monkey"
|
"password": "monkey"
|
||||||
})
|
}, None)
|
||||||
self.registration_handler.register = Mock(return_value=(user_id, token))
|
self.registration_handler.register = Mock(return_value=(user_id, token))
|
||||||
|
|
||||||
(code, result) = yield self.servlet.on_POST(self.request)
|
(code, result) = yield self.servlet.on_POST(self.request)
|
||||||
@ -135,7 +136,7 @@ class RegisterRestServletTestCase(unittest.TestCase):
|
|||||||
self.auth_result = (True, None, {
|
self.auth_result = (True, None, {
|
||||||
"username": "kermit",
|
"username": "kermit",
|
||||||
"password": "monkey"
|
"password": "monkey"
|
||||||
})
|
}, None)
|
||||||
self.registration_handler.register = Mock(return_value=("@user:id", "t"))
|
self.registration_handler.register = Mock(return_value=("@user:id", "t"))
|
||||||
d = self.servlet.on_POST(self.request)
|
d = self.servlet.on_POST(self.request)
|
||||||
return self.assertFailure(d, SynapseError)
|
return self.assertFailure(d, SynapseError)
|
||||||
|
@ -91,11 +91,6 @@ class RoomMemberStoreTestCase(unittest.TestCase):
|
|||||||
)
|
)
|
||||||
)]
|
)]
|
||||||
)
|
)
|
||||||
self.assertFalse(
|
|
||||||
(yield self.store.user_rooms_intersect(
|
|
||||||
[self.u_alice.to_string(), self.u_bob.to_string()]
|
|
||||||
))
|
|
||||||
)
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def test_two_members(self):
|
def test_two_members(self):
|
||||||
@ -108,11 +103,6 @@ class RoomMemberStoreTestCase(unittest.TestCase):
|
|||||||
yield self.store.get_room_members(self.room.to_string())
|
yield self.store.get_room_members(self.room.to_string())
|
||||||
)}
|
)}
|
||||||
)
|
)
|
||||||
self.assertTrue((
|
|
||||||
yield self.store.user_rooms_intersect([
|
|
||||||
self.u_alice.to_string(), self.u_bob.to_string()
|
|
||||||
])
|
|
||||||
))
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def test_room_hosts(self):
|
def test_room_hosts(self):
|
||||||
|
Loading…
Reference in New Issue
Block a user