Merge branch 'develop' of github.com:matrix-org/synapse into erikj/stop_fed_not_in_room

This commit is contained in:
Erik Johnston 2019-03-04 11:54:58 +00:00
commit fbc047f2a5
101 changed files with 1458 additions and 531 deletions

View file

@ -27,4 +27,4 @@ try:
except ImportError:
pass
__version__ = "0.99.1.1"
__version__ = "0.99.2"

View file

@ -48,6 +48,7 @@ from synapse.rest.client.v1.room import (
RoomMemberListRestServlet,
RoomStateRestServlet,
)
from synapse.rest.client.v2_alpha.account import ThreepidRestServlet
from synapse.rest.client.v2_alpha.register import RegisterRestServlet
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
@ -96,6 +97,7 @@ class ClientReaderServer(HomeServer):
RoomEventContextServlet(self).register(resource)
RegisterRestServlet(self).register(resource)
LoginRestServlet(self).register(resource)
ThreepidRestServlet(self).register(resource)
resources.update({
"/_matrix/client/r0": resource,

View file

@ -21,7 +21,7 @@ from twisted.web.resource import NoResource
import synapse
from synapse import events
from synapse.api.urls import FEDERATION_PREFIX
from synapse.api.urls import FEDERATION_PREFIX, SERVER_KEY_V2_PREFIX
from synapse.app import _base
from synapse.config._base import ConfigError
from synapse.config.homeserver import HomeServerConfig
@ -44,6 +44,7 @@ from synapse.replication.slave.storage.registration import SlavedRegistrationSto
from synapse.replication.slave.storage.room import RoomStore
from synapse.replication.slave.storage.transactions import SlavedTransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.rest.key.v2 import KeyApiV2Resource
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
from synapse.util.httpresourcetree import create_resource_tree
@ -99,6 +100,9 @@ class FederationReaderServer(HomeServer):
),
})
if name in ["keys", "federation"]:
resources[SERVER_KEY_V2_PREFIX] = KeyApiV2Resource(self)
root_resource = create_resource_tree(resources, NoResource())
_base.listen_tcp(

View file

@ -21,7 +21,7 @@ from twisted.web.resource import NoResource
import synapse
from synapse import events
from synapse.api.errors import SynapseError
from synapse.api.errors import HttpResponseException, SynapseError
from synapse.app import _base
from synapse.config._base import ConfigError
from synapse.config.homeserver import HomeServerConfig
@ -66,10 +66,15 @@ class PresenceStatusStubServlet(ClientV1RestServlet):
headers = {
"Authorization": auth_headers,
}
result = yield self.http_client.get_json(
self.main_uri + request.uri.decode('ascii'),
headers=headers,
)
try:
result = yield self.http_client.get_json(
self.main_uri + request.uri.decode('ascii'),
headers=headers,
)
except HttpResponseException as e:
raise e.to_synapse_error()
defer.returnValue((200, result))
@defer.inlineCallbacks

View file

@ -555,6 +555,9 @@ def run(hs):
stats["memory_rss"] += process.memory_info().rss
stats["cpu_average"] += int(process.cpu_percent(interval=None))
stats["database_engine"] = hs.get_datastore().database_engine_name
stats["database_server_version"] = hs.get_datastore().get_server_version()
logger.info("Reporting stats to matrix.org: %s" % (stats,))
try:
yield hs.get_simple_http_client().put_json(

View file

@ -47,5 +47,5 @@ class CaptchaConfig(Config):
#captcha_bypass_secret: "YOUR_SECRET_HERE"
# The API endpoint to use for verifying m.login.recaptcha responses.
recaptcha_siteverify_api: "https://www.google.com/recaptcha/api/siteverify"
recaptcha_siteverify_api: "https://www.recaptcha.net/recaptcha/api/siteverify"
"""

View file

@ -19,6 +19,8 @@ import warnings
from datetime import datetime
from hashlib import sha256
import six
from unpaddedbase64 import encode_base64
from OpenSSL import crypto
@ -36,9 +38,11 @@ class TlsConfig(Config):
acme_config = {}
self.acme_enabled = acme_config.get("enabled", False)
self.acme_url = acme_config.get(
# hyperlink complains on py2 if this is not a Unicode
self.acme_url = six.text_type(acme_config.get(
"url", u"https://acme-v01.api.letsencrypt.org/directory"
)
))
self.acme_port = acme_config.get("port", 80)
self.acme_bind_addresses = acme_config.get("bind_addresses", ['::', '0.0.0.0'])
self.acme_reprovision_threshold = acme_config.get("reprovision_threshold", 30)
@ -55,7 +59,7 @@ class TlsConfig(Config):
)
if not self.tls_private_key_file:
raise ConfigError(
"tls_certificate_path must be specified if TLS-enabled listeners are "
"tls_private_key_path must be specified if TLS-enabled listeners are "
"configured."
)

View file

@ -17,6 +17,7 @@
import logging
from collections import namedtuple
from six import raise_from
from six.moves import urllib
from signedjson.key import (
@ -35,7 +36,12 @@ from unpaddedbase64 import decode_base64
from twisted.internet import defer
from synapse.api.errors import Codes, RequestSendFailed, SynapseError
from synapse.api.errors import (
Codes,
HttpResponseException,
RequestSendFailed,
SynapseError,
)
from synapse.util import logcontext, unwrapFirstError
from synapse.util.logcontext import (
LoggingContext,
@ -44,6 +50,7 @@ from synapse.util.logcontext import (
run_in_background,
)
from synapse.util.metrics import Measure
from synapse.util.retryutils import NotRetryingDestination
logger = logging.getLogger(__name__)
@ -367,13 +374,18 @@ class Keyring(object):
server_name_and_key_ids, perspective_name, perspective_keys
)
defer.returnValue(result)
except KeyLookupError as e:
logger.warning(
"Key lookup failed from %r: %s", perspective_name, e,
)
except Exception as e:
logger.exception(
"Unable to get key from %r: %s %s",
perspective_name,
type(e).__name__, str(e),
)
defer.returnValue({})
defer.returnValue({})
results = yield logcontext.make_deferred_yieldable(defer.gatherResults(
[
@ -421,21 +433,30 @@ class Keyring(object):
# TODO(mark): Set the minimum_valid_until_ts to that needed by
# the events being validated or the current time if validating
# an incoming request.
query_response = yield self.client.post_json(
destination=perspective_name,
path="/_matrix/key/v2/query",
data={
u"server_keys": {
server_name: {
key_id: {
u"minimum_valid_until_ts": 0
} for key_id in key_ids
try:
query_response = yield self.client.post_json(
destination=perspective_name,
path="/_matrix/key/v2/query",
data={
u"server_keys": {
server_name: {
key_id: {
u"minimum_valid_until_ts": 0
} for key_id in key_ids
}
for server_name, key_ids in server_names_and_key_ids
}
for server_name, key_ids in server_names_and_key_ids
}
},
long_retries=True,
)
},
long_retries=True,
)
except (NotRetryingDestination, RequestSendFailed) as e:
raise_from(
KeyLookupError("Failed to connect to remote server"), e,
)
except HttpResponseException as e:
raise_from(
KeyLookupError("Remote server returned an error"), e,
)
keys = {}
@ -502,11 +523,20 @@ class Keyring(object):
if requested_key_id in keys:
continue
response = yield self.client.get_json(
destination=server_name,
path="/_matrix/key/v2/server/" + urllib.parse.quote(requested_key_id),
ignore_backoff=True,
)
try:
response = yield self.client.get_json(
destination=server_name,
path="/_matrix/key/v2/server/" + urllib.parse.quote(requested_key_id),
ignore_backoff=True,
)
except (NotRetryingDestination, RequestSendFailed) as e:
raise_from(
KeyLookupError("Failed to connect to remote server"), e,
)
except HttpResponseException as e:
raise_from(
KeyLookupError("Remote server returned an error"), e,
)
if (u"signatures" not in response
or server_name not in response[u"signatures"]):

View file

@ -33,6 +33,7 @@ from synapse.api.constants import (
)
from synapse.api.errors import (
CodeMessageException,
Codes,
FederationDeniedError,
HttpResponseException,
SynapseError,
@ -792,10 +793,25 @@ class FederationClient(FederationBase):
defer.returnValue(content)
except HttpResponseException as e:
if e.code in [400, 404]:
err = e.to_synapse_error()
# If we receive an error response that isn't a generic error, we
# assume that the remote understands the v2 invite API and this
# is a legitimate error.
if err.errcode != Codes.UNKNOWN:
raise err
# Otherwise, we assume that the remote server doesn't understand
# the v2 invite API.
if room_version in (RoomVersions.V1, RoomVersions.V2):
pass # We'll fall through
else:
raise Exception("Remote server is too old")
raise SynapseError(
400,
"User's homeserver does not support this room version",
Codes.UNSUPPORTED_ROOM_VERSION,
)
elif e.code == 403:
raise e.to_synapse_error()
else:

View file

@ -25,9 +25,10 @@ from twisted.internet import defer
from twisted.internet.abstract import isIPAddress
from twisted.python import failure
from synapse.api.constants import EventTypes, Membership
from synapse.api.constants import KNOWN_ROOM_VERSIONS, EventTypes, Membership
from synapse.api.errors import (
AuthError,
Codes,
FederationError,
IncompatibleRoomVersionError,
NotFoundError,
@ -239,8 +240,9 @@ class FederationServer(FederationBase):
f = failure.Failure()
pdu_results[event_id] = {"error": str(e)}
logger.error(
"Failed to handle PDU %s: %s",
event_id, f.getTraceback().rstrip(),
"Failed to handle PDU %s",
event_id,
exc_info=(f.type, f.value, f.getTracebackObject()),
)
yield concurrently_execute(
@ -386,6 +388,13 @@ class FederationServer(FederationBase):
@defer.inlineCallbacks
def on_invite_request(self, origin, content, room_version):
if room_version not in KNOWN_ROOM_VERSIONS:
raise SynapseError(
400,
"Homeserver does not support this room version",
Codes.UNSUPPORTED_ROOM_VERSION,
)
format_ver = room_version_to_event_format(room_version)
pdu = event_from_pdu_json(content, format_ver)
@ -877,6 +886,9 @@ class ReplicationFederationHandlerRegistry(FederationHandlerRegistry):
def on_edu(self, edu_type, origin, content):
"""Overrides FederationHandlerRegistry
"""
if not self.config.use_presence and edu_type == "m.presence":
return
handler = self.edu_handlers.get(edu_type)
if handler:
return super(ReplicationFederationHandlerRegistry, self).on_edu(

View file

@ -393,7 +393,7 @@ class FederationStateServlet(BaseFederationServlet):
return self.handler.on_context_state_request(
origin,
context,
parse_string_from_args(query, "event_id", None),
parse_string_from_args(query, "event_id", None, required=True),
)
@ -404,7 +404,7 @@ class FederationStateIdsServlet(BaseFederationServlet):
return self.handler.on_state_ids_request(
origin,
room_id,
parse_string_from_args(query, "event_id", None),
parse_string_from_args(query, "event_id", None, required=True),
)
@ -736,7 +736,8 @@ class PublicRoomList(BaseFederationServlet):
data = yield self.handler.get_local_public_room_list(
limit, since_token,
network_tuple=network_tuple
network_tuple=network_tuple,
from_federation=True,
)
defer.returnValue((200, data))

View file

@ -113,8 +113,7 @@ class GroupsServerHandler(object):
room_id = room_entry["room_id"]
joined_users = yield self.store.get_users_in_room(room_id)
entry = yield self.room_list_handler.generate_room_entry(
room_id, len(joined_users),
with_alias=False, allow_private=True,
room_id, len(joined_users), with_alias=False, allow_private=True,
)
entry = dict(entry) # so we don't change whats cached
entry.pop("room_id", None)
@ -544,8 +543,7 @@ class GroupsServerHandler(object):
joined_users = yield self.store.get_users_in_room(room_id)
entry = yield self.room_list_handler.generate_room_entry(
room_id, len(joined_users),
with_alias=False, allow_private=True,
room_id, len(joined_users), with_alias=False, allow_private=True,
)
if not entry:

View file

@ -770,10 +770,26 @@ class FederationHandler(BaseHandler):
set(auth_events.keys()) | set(state_events.keys())
)
# We now have a chunk of events plus associated state and auth chain to
# persist. We do the persistence in two steps:
# 1. Auth events and state get persisted as outliers, plus the
# backward extremities get persisted (as non-outliers).
# 2. The rest of the events in the chunk get persisted one by one, as
# each one depends on the previous event for its state.
#
# The important thing is that events in the chunk get persisted as
# non-outliers, including when those events are also in the state or
# auth chain. Caution must therefore be taken to ensure that they are
# not accidentally marked as outliers.
# Step 1a: persist auth events that *don't* appear in the chunk
ev_infos = []
for a in auth_events.values():
if a.event_id in seen_events:
# We only want to persist auth events as outliers that we haven't
# seen and aren't about to persist as part of the backfilled chunk.
if a.event_id in seen_events or a.event_id in event_map:
continue
a.internal_metadata.outlier = True
ev_infos.append({
"event": a,
@ -785,14 +801,21 @@ class FederationHandler(BaseHandler):
}
})
# Step 1b: persist the events in the chunk we fetched state for (i.e.
# the backwards extremities) as non-outliers.
for e_id in events_to_state:
# For paranoia we ensure that these events are marked as
# non-outliers
ev = event_map[e_id]
assert(not ev.internal_metadata.is_outlier())
ev_infos.append({
"event": event_map[e_id],
"event": ev,
"state": events_to_state[e_id],
"auth_events": {
(auth_events[a_id].type, auth_events[a_id].state_key):
auth_events[a_id]
for a_id in event_map[e_id].auth_event_ids()
for a_id in ev.auth_event_ids()
if a_id in auth_events
}
})
@ -802,12 +825,17 @@ class FederationHandler(BaseHandler):
backfilled=True,
)
# Step 2: Persist the rest of the events in the chunk one by one
events.sort(key=lambda e: e.depth)
for event in events:
if event in events_to_state:
continue
# For paranoia we ensure that these events are marked as
# non-outliers
assert(not event.internal_metadata.is_outlier())
# We store these one at a time since each event depends on the
# previous to work out the state.
# TODO: We can probably do something more clever here.

View file

@ -436,10 +436,11 @@ class EventCreationHandler(object):
if event.is_state():
prev_state = yield self.deduplicate_state_event(event, context)
logger.info(
"Not bothering to persist duplicate state event %s", event.event_id,
)
if prev_state is not None:
logger.info(
"Not bothering to persist state event %s duplicated by %s",
event.event_id, prev_state.event_id,
)
defer.returnValue(prev_state)
yield self.handle_new_client_event(

View file

@ -136,7 +136,11 @@ class PaginationHandler(object):
logger.info("[purge] complete")
self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE
except Exception:
logger.error("[purge] failed: %s", Failure().getTraceback().rstrip())
f = Failure()
logger.error(
"[purge] failed",
exc_info=(f.type, f.value, f.getTracebackObject()),
)
self._purges_by_id[purge_id].status = PurgeStatus.STATUS_FAILED
finally:
self._purges_in_progress_by_room.discard(room_id)
@ -254,7 +258,7 @@ class PaginationHandler(object):
})
state = None
if event_filter and event_filter.lazy_load_members():
if event_filter and event_filter.lazy_load_members() and len(events) > 0:
# TODO: remove redundant members
# FIXME: we also care about invite targets etc.

View file

@ -16,8 +16,8 @@ import logging
from twisted.internet import defer
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import get_domain_from_id
from synapse.util import logcontext
from ._base import BaseHandler
@ -59,7 +59,9 @@ class ReceiptsHandler(BaseHandler):
if is_new:
# fire off a process in the background to send the receipt to
# remote servers
self._push_remotes([receipt])
run_as_background_process(
'push_receipts_to_remotes', self._push_remotes, receipt
)
@defer.inlineCallbacks
def _received_remote_receipt(self, origin, content):
@ -125,44 +127,42 @@ class ReceiptsHandler(BaseHandler):
defer.returnValue(True)
@logcontext.preserve_fn # caller should not yield on this
@defer.inlineCallbacks
def _push_remotes(self, receipts):
"""Given a list of receipts, works out which remote servers should be
def _push_remotes(self, receipt):
"""Given a receipt, works out which remote servers should be
poked and pokes them.
"""
try:
# TODO: Some of this stuff should be coallesced.
for receipt in receipts:
room_id = receipt["room_id"]
receipt_type = receipt["receipt_type"]
user_id = receipt["user_id"]
event_ids = receipt["event_ids"]
data = receipt["data"]
# TODO: optimise this to move some of the work to the workers.
room_id = receipt["room_id"]
receipt_type = receipt["receipt_type"]
user_id = receipt["user_id"]
event_ids = receipt["event_ids"]
data = receipt["data"]
users = yield self.state.get_current_user_in_room(room_id)
remotedomains = set(get_domain_from_id(u) for u in users)
remotedomains = remotedomains.copy()
remotedomains.discard(self.server_name)
users = yield self.state.get_current_user_in_room(room_id)
remotedomains = set(get_domain_from_id(u) for u in users)
remotedomains = remotedomains.copy()
remotedomains.discard(self.server_name)
logger.debug("Sending receipt to: %r", remotedomains)
logger.debug("Sending receipt to: %r", remotedomains)
for domain in remotedomains:
self.federation.send_edu(
destination=domain,
edu_type="m.receipt",
content={
room_id: {
receipt_type: {
user_id: {
"event_ids": event_ids,
"data": data,
}
for domain in remotedomains:
self.federation.send_edu(
destination=domain,
edu_type="m.receipt",
content={
room_id: {
receipt_type: {
user_id: {
"event_ids": event_ids,
"data": data,
}
},
}
},
key=(room_id, receipt_type, user_id),
)
},
key=(room_id, receipt_type, user_id),
)
except Exception:
logger.exception("Error pushing receipts to remote servers")

View file

@ -460,7 +460,7 @@ class RegistrationHandler(BaseHandler):
lines = response.split('\n')
json = {
"valid": lines[0] == 'true',
"error_url": "http://www.google.com/recaptcha/api/challenge?" +
"error_url": "http://www.recaptcha.net/recaptcha/api/challenge?" +
"error=%s" % lines[1]
}
defer.returnValue(json)
@ -471,7 +471,7 @@ class RegistrationHandler(BaseHandler):
Used only by c/s api v1
"""
data = yield self.captcha_client.post_urlencoded_get_raw(
"http://www.google.com:80/recaptcha/api/verify",
"http://www.recaptcha.net:80/recaptcha/api/verify",
args={
'privatekey': private_key,
'remoteip': ip_addr,

View file

@ -50,16 +50,17 @@ class RoomListHandler(BaseHandler):
def get_local_public_room_list(self, limit=None, since_token=None,
search_filter=None,
network_tuple=EMPTY_THIRD_PARTY_ID,):
network_tuple=EMPTY_THIRD_PARTY_ID,
from_federation=False):
"""Generate a local public room list.
There are multiple different lists: the main one plus one per third
party network. A client can ask for a specific list or to return all.
Args:
limit (int)
since_token (str)
search_filter (dict)
limit (int|None)
since_token (str|None)
search_filter (dict|None)
network_tuple (ThirdPartyInstanceID): Which public list to use.
This can be (None, None) to indicate the main list, or a particular
appservice and network id to use an appservice specific one.
@ -87,14 +88,30 @@ class RoomListHandler(BaseHandler):
return self.response_cache.wrap(
key,
self._get_public_room_list,
limit, since_token, network_tuple=network_tuple,
limit, since_token,
network_tuple=network_tuple, from_federation=from_federation,
)
@defer.inlineCallbacks
def _get_public_room_list(self, limit=None, since_token=None,
search_filter=None,
network_tuple=EMPTY_THIRD_PARTY_ID,
from_federation=False,
timeout=None,):
"""Generate a public room list.
Args:
limit (int|None): Maximum amount of rooms to return.
since_token (str|None)
search_filter (dict|None): Dictionary to filter rooms by.
network_tuple (ThirdPartyInstanceID): Which public list to use.
This can be (None, None) to indicate the main list, or a particular
appservice and network id to use an appservice specific one.
Setting to None returns all public rooms across all lists.
from_federation (bool): Whether this request originated from a
federating server or a client. Used for room filtering.
timeout (int|None): Amount of seconds to wait for a response before
timing out.
"""
if since_token and since_token != "END":
since_token = RoomListNextBatch.from_token(since_token)
else:
@ -217,7 +234,8 @@ class RoomListHandler(BaseHandler):
yield concurrently_execute(
lambda r: self._append_room_entry_to_chunk(
r, rooms_to_num_joined[r],
chunk, limit, search_filter
chunk, limit, search_filter,
from_federation=from_federation,
),
batch, 5,
)
@ -288,23 +306,51 @@ class RoomListHandler(BaseHandler):
@defer.inlineCallbacks
def _append_room_entry_to_chunk(self, room_id, num_joined_users, chunk, limit,
search_filter):
search_filter, from_federation=False):
"""Generate the entry for a room in the public room list and append it
to the `chunk` if it matches the search filter
Args:
room_id (str): The ID of the room.
num_joined_users (int): The number of joined users in the room.
chunk (list)
limit (int|None): Maximum amount of rooms to display. Function will
return if length of chunk is greater than limit + 1.
search_filter (dict|None)
from_federation (bool): Whether this request originated from a
federating server or a client. Used for room filtering.
"""
if limit and len(chunk) > limit + 1:
# We've already got enough, so lets just drop it.
return
result = yield self.generate_room_entry(room_id, num_joined_users)
if not result:
return
if result and _matches_room_entry(result, search_filter):
if from_federation and not result.get("m.federate", True):
# This is a room that other servers cannot join. Do not show them
# this room.
return
if _matches_room_entry(result, search_filter):
chunk.append(result)
@cachedInlineCallbacks(num_args=1, cache_context=True)
def generate_room_entry(self, room_id, num_joined_users, cache_context,
with_alias=True, allow_private=False):
"""Returns the entry for a room
Args:
room_id (str): The room's ID.
num_joined_users (int): Number of users in the room.
cache_context: Information for cached responses.
with_alias (bool): Whether to return the room's aliases in the result.
allow_private (bool): Whether invite-only rooms should be shown.
Returns:
Deferred[dict|None]: Returns a room entry as a dictionary, or None if this
room was determined not to be shown publicly.
"""
result = {
"room_id": room_id,
@ -318,6 +364,7 @@ class RoomListHandler(BaseHandler):
event_map = yield self.store.get_events([
event_id for key, event_id in iteritems(current_state_ids)
if key[0] in (
EventTypes.Create,
EventTypes.JoinRules,
EventTypes.Name,
EventTypes.Topic,
@ -334,12 +381,17 @@ class RoomListHandler(BaseHandler):
}
# Double check that this is actually a public room.
join_rules_event = current_state.get((EventTypes.JoinRules, ""))
if join_rules_event:
join_rule = join_rules_event.content.get("join_rule", None)
if not allow_private and join_rule and join_rule != JoinRules.PUBLIC:
defer.returnValue(None)
# Return whether this room is open to federation users or not
create_event = current_state.get((EventTypes.Create, ""))
result["m.federate"] = create_event.content.get("m.federate", True)
if with_alias:
aliases = yield self.store.get_aliases_for_room(
room_id, on_invalidate=cache_context.invalidate

View file

@ -68,9 +68,13 @@ class MatrixFederationAgent(object):
TLS policy to use for fetching .well-known files. None to use a default
(browser-like) implementation.
srv_resolver (SrvResolver|None):
_srv_resolver (SrvResolver|None):
SRVResolver impl to use for looking up SRV records. None to use a default
implementation.
_well_known_cache (TTLCache|None):
TTLCache impl for storing cached well-known lookups. None to use a default
implementation.
"""
def __init__(

View file

@ -169,18 +169,18 @@ def _return_html_error(f, request):
)
else:
logger.error(
"Failed handle request %r: %s",
"Failed handle request %r",
request,
f.getTraceback().rstrip(),
exc_info=(f.type, f.value, f.getTracebackObject()),
)
else:
code = http_client.INTERNAL_SERVER_ERROR
msg = "Internal server error"
logger.error(
"Failed handle request %r: %s",
"Failed handle request %r",
request,
f.getTraceback().rstrip(),
exc_info=(f.type, f.value, f.getTracebackObject()),
)
body = HTML_ERROR_TEMPLATE.format(

View file

@ -32,9 +32,25 @@ if six.PY3:
logger = logging.getLogger(__name__)
http_push_processed_counter = Counter("synapse_http_httppusher_http_pushes_processed", "")
http_push_processed_counter = Counter(
"synapse_http_httppusher_http_pushes_processed",
"Number of push notifications successfully sent",
)
http_push_failed_counter = Counter("synapse_http_httppusher_http_pushes_failed", "")
http_push_failed_counter = Counter(
"synapse_http_httppusher_http_pushes_failed",
"Number of push notifications which failed",
)
http_badges_processed_counter = Counter(
"synapse_http_httppusher_badge_updates_processed",
"Number of badge updates successfully sent",
)
http_badges_failed_counter = Counter(
"synapse_http_httppusher_badge_updates_failed",
"Number of badge updates which failed",
)
class HttpPusher(object):
@ -81,6 +97,11 @@ class HttpPusher(object):
pusherdict['pushkey'],
)
if self.data is None:
raise PusherConfigException(
"data can not be null for HTTP pusher"
)
if 'url' not in self.data:
raise PusherConfigException(
"'url' required in data for HTTP pusher"
@ -346,6 +367,10 @@ class HttpPusher(object):
@defer.inlineCallbacks
def _send_badge(self, badge):
"""
Args:
badge (int): number of unread messages
"""
logger.info("Sending updated badge count %d to %s", badge, self.name)
d = {
'notification': {
@ -366,14 +391,11 @@ class HttpPusher(object):
}
}
try:
resp = yield self.http_client.post_json_get_json(self.url, d)
yield self.http_client.post_json_get_json(self.url, d)
http_badges_processed_counter.inc()
except Exception as e:
logger.warning(
"Failed to send badge count to %s: %s %s",
self.name, type(e), e,
)
defer.returnValue(False)
rejected = []
if 'rejected' in resp:
rejected = resp['rejected']
defer.returnValue(rejected)
http_badges_failed_counter.inc()

View file

@ -56,7 +56,7 @@ class PusherFactory(object):
f = self.pusher_types.get(kind, None)
if not f:
return None
logger.info("creating %s pusher for %r", kind, pusherdict)
logger.debug("creating %s pusher for %r", kind, pusherdict)
return f(self.hs, pusherdict)
def _create_email_pusher(self, _hs, pusherdict):

View file

@ -19,6 +19,7 @@ import logging
from twisted.internet import defer
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.push import PusherConfigException
from synapse.push.pusher import PusherFactory
logger = logging.getLogger(__name__)
@ -140,6 +141,10 @@ class PusherPool:
@defer.inlineCallbacks
def on_new_notifications(self, min_stream_id, max_stream_id):
if not self.pushers:
# nothing to do here.
return
try:
users_affected = yield self.store.get_push_action_users_in_range(
min_stream_id, max_stream_id
@ -155,6 +160,10 @@ class PusherPool:
@defer.inlineCallbacks
def on_new_receipts(self, min_stream_id, max_stream_id, affected_room_ids):
if not self.pushers:
# nothing to do here.
return
try:
# Need to subtract 1 from the minimum because the lower bound here
# is not inclusive
@ -214,6 +223,15 @@ class PusherPool:
"""
try:
p = self.pusher_factory.create_pusher(pusherdict)
except PusherConfigException as e:
logger.warning(
"Pusher incorrectly configured user=%s, appid=%s, pushkey=%s: %s",
pusherdict.get('user_name'),
pusherdict.get('app_id'),
pusherdict.get('pushkey'),
e,
)
return
except Exception:
logger.exception("Couldn't start a pusher: caught Exception")
return

View file

@ -59,12 +59,7 @@ class BaseSlavedStore(SQLBaseStore):
members_changed = set(row.keys[1:])
self._invalidate_state_caches(room_id, members_changed)
else:
try:
getattr(self, row.cache_func).invalidate(tuple(row.keys))
except AttributeError:
# We probably haven't pulled in the cache in this worker,
# which is fine.
pass
self._attempt_to_invalidate_cache(row.cache_func, tuple(row.keys))
def _invalidate_cache_and_stream(self, txn, cache_func, keys):
txn.call_after(cache_func.invalidate, keys)

View file

@ -54,8 +54,11 @@ class SlavedPresenceStore(BaseSlavedStore):
def stream_positions(self):
result = super(SlavedPresenceStore, self).stream_positions()
position = self._presence_id_gen.get_current_token()
result["presence"] = position
if self.hs.config.use_presence:
position = self._presence_id_gen.get_current_token()
result["presence"] = position
return result
def process_replication_rows(self, stream_name, token, rows):

View file

@ -39,7 +39,7 @@ class ReplicationClientFactory(ReconnectingClientFactory):
Accepts a handler that will be called when new data is available or data
is required.
"""
maxDelay = 5 # Try at least once every N seconds
maxDelay = 30 # Try at least once every N seconds
def __init__(self, hs, client_name, handler):
self.client_name = client_name
@ -54,7 +54,6 @@ class ReplicationClientFactory(ReconnectingClientFactory):
def buildProtocol(self, addr):
logger.info("Connected to replication: %r", addr)
self.resetDelay()
return ClientReplicationStreamProtocol(
self.client_name, self.server_name, self._clock, self.handler
)
@ -90,15 +89,18 @@ class ReplicationClientHandler(object):
# Used for tests.
self.awaiting_syncs = {}
# The factory used to create connections.
self.factory = None
def start_replication(self, hs):
"""Helper method to start a replication connection to the remote server
using TCP.
"""
client_name = hs.config.worker_name
factory = ReplicationClientFactory(hs, client_name, self)
self.factory = ReplicationClientFactory(hs, client_name, self)
host = hs.config.worker_replication_host
port = hs.config.worker_replication_port
hs.get_reactor().connectTCP(host, port, factory)
hs.get_reactor().connectTCP(host, port, self.factory)
def on_rdata(self, stream_name, token, rows):
"""Called when we get new replication data. By default this just pokes
@ -140,6 +142,7 @@ class ReplicationClientHandler(object):
args["account_data"] = user_account_data
elif room_account_data:
args["account_data"] = room_account_data
return args
def get_currently_syncing_users(self):
@ -204,3 +207,14 @@ class ReplicationClientHandler(object):
for cmd in self.pending_commands:
connection.send_command(cmd)
self.pending_commands = []
def finished_connecting(self):
"""Called when we have successfully subscribed and caught up to all
streams we're interested in.
"""
logger.info("Finished connecting to server")
# We don't reset the delay any earlier as otherwise if there is a
# problem during start up we'll end up tight looping connecting to the
# server.
self.factory.resetDelay()

View file

@ -127,8 +127,11 @@ class RdataCommand(Command):
class PositionCommand(Command):
"""Sent by the client to tell the client the stream postition without
"""Sent by the server to tell the client the stream postition without
needing to send an RDATA.
Sent to the client after all missing updates for a stream have been sent
to the client and they're now up to date.
"""
NAME = "POSITION"

View file

@ -268,7 +268,17 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
if "\n" in string:
raise Exception("Unexpected newline in command: %r", string)
self.sendLine(string.encode("utf-8"))
encoded_string = string.encode("utf-8")
if len(encoded_string) > self.MAX_LENGTH:
raise Exception(
"Failed to send command %s as too long (%d > %d)" % (
cmd.NAME,
len(encoded_string), self.MAX_LENGTH,
)
)
self.sendLine(encoded_string)
self.last_sent_command = self.clock.time_msec()
@ -361,6 +371,11 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
def id(self):
return "%s-%s" % (self.name, self.conn_id)
def lineLengthExceeded(self, line):
"""Called when we receive a line that is above the maximum line length
"""
self.send_error("Line length exceeded")
class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
VALID_INBOUND_COMMANDS = VALID_CLIENT_COMMANDS
@ -511,6 +526,11 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
self.server_name = server_name
self.handler = handler
# Set of stream names that have been subscribe to, but haven't yet
# caught up with. This is used to track when the client has been fully
# connected to the remote.
self.streams_connecting = set()
# Map of stream to batched updates. See RdataCommand for info on how
# batching works.
self.pending_batches = {}
@ -533,6 +553,10 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
# We've now finished connecting to so inform the client handler
self.handler.update_connection(self)
# This will happen if we don't actually subscribe to any streams
if not self.streams_connecting:
self.handler.finished_connecting()
def on_SERVER(self, cmd):
if cmd.data != self.server_name:
logger.error("[%s] Connected to wrong remote: %r", self.id(), cmd.data)
@ -562,6 +586,12 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
return self.handler.on_rdata(stream_name, cmd.token, rows)
def on_POSITION(self, cmd):
# When we get a `POSITION` command it means we've finished getting
# missing updates for the given stream, and are now up to date.
self.streams_connecting.discard(cmd.stream_name)
if not self.streams_connecting:
self.handler.finished_connecting()
return self.handler.on_position(cmd.stream_name, cmd.token)
def on_SYNC(self, cmd):
@ -578,6 +608,8 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
self.id(), stream_name, token
)
self.streams_connecting.add(stream_name)
self.send_command(ReplicateCommand(stream_name, token))
def on_connection_closed(self):

View file

@ -33,7 +33,7 @@ RECAPTCHA_TEMPLATE = """
<title>Authentication</title>
<meta name='viewport' content='width=device-width, initial-scale=1,
user-scalable=no, minimum-scale=1.0, maximum-scale=1.0'>
<script src="https://www.google.com/recaptcha/api.js"
<script src="https://www.recaptcha.net/recaptcha/api.js"
async defer></script>
<script src="//code.jquery.com/jquery-1.11.2.min.js"></script>
<link rel="stylesheet" href="/_matrix/static/client/register/style.css">

View file

@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2019 New Vector Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -133,8 +134,15 @@ def respond_with_responder(request, responder, media_type, file_size, upload_nam
logger.debug("Responding to media request with responder %s")
add_file_headers(request, media_type, file_size, upload_name)
with responder:
yield responder.write_to_consumer(request)
try:
with responder:
yield responder.write_to_consumer(request)
except Exception as e:
# The majority of the time this will be due to the client having gone
# away. Unfortunately, Twisted simply throws a generic exception at us
# in that case.
logger.warning("Failed to write to consumer: %s %s", type(e), e)
finish_request(request)
@ -206,8 +214,7 @@ def get_filename_from_headers(headers):
Content-Disposition HTTP header.
Args:
headers (twisted.web.http_headers.Headers): The HTTP
request headers.
headers (dict[bytes, list[bytes]]): The HTTP request headers.
Returns:
A Unicode string of the filename, or None.
@ -218,23 +225,12 @@ def get_filename_from_headers(headers):
if not content_disposition[0]:
return
# dict of unicode: bytes, corresponding to the key value sections of the
# Content-Disposition header.
params = {}
parts = content_disposition[0].split(b";")
for i in parts:
# Split into key-value pairs, if able
# We don't care about things like `inline`, so throw it out
if b"=" not in i:
continue
key, value = i.strip().split(b"=")
params[key.decode('ascii')] = value
_, params = _parse_header(content_disposition[0])
upload_name = None
# First check if there is a valid UTF-8 filename
upload_name_utf8 = params.get("filename*", None)
upload_name_utf8 = params.get(b"filename*", None)
if upload_name_utf8:
if upload_name_utf8.lower().startswith(b"utf-8''"):
upload_name_utf8 = upload_name_utf8[7:]
@ -260,12 +256,68 @@ def get_filename_from_headers(headers):
# If there isn't check for an ascii name.
if not upload_name:
upload_name_ascii = params.get("filename", None)
upload_name_ascii = params.get(b"filename", None)
if upload_name_ascii and is_ascii(upload_name_ascii):
# Make sure there's no %-quoted bytes. If there is, reject it as
# non-valid ASCII.
if b"%" not in upload_name_ascii:
upload_name = upload_name_ascii.decode('ascii')
upload_name = upload_name_ascii.decode('ascii')
# This may be None here, indicating we did not find a matching name.
return upload_name
def _parse_header(line):
"""Parse a Content-type like header.
Cargo-culted from `cgi`, but works on bytes rather than strings.
Args:
line (bytes): header to be parsed
Returns:
Tuple[bytes, dict[bytes, bytes]]:
the main content-type, followed by the parameter dictionary
"""
parts = _parseparam(b';' + line)
key = next(parts)
pdict = {}
for p in parts:
i = p.find(b'=')
if i >= 0:
name = p[:i].strip().lower()
value = p[i + 1:].strip()
# strip double-quotes
if len(value) >= 2 and value[0:1] == value[-1:] == b'"':
value = value[1:-1]
value = value.replace(b'\\\\', b'\\').replace(b'\\"', b'"')
pdict[name] = value
return key, pdict
def _parseparam(s):
"""Generator which splits the input on ;, respecting double-quoted sequences
Cargo-culted from `cgi`, but works on bytes rather than strings.
Args:
s (bytes): header to be parsed
Returns:
Iterable[bytes]: the split input
"""
while s[:1] == b';':
s = s[1:]
# look for the next ;
end = s.find(b';')
# if there is an odd number of " marks between here and the next ;, skip to the
# next ; instead
while end > 0 and (s.count(b'"', 0, end) - s.count(b'\\"', 0, end)) % 2:
end = s.find(b';', end + 1)
if end < 0:
end = len(s)
f = s[:end]
yield f.strip()
s = s[end:]

View file

@ -7,9 +7,9 @@ import synapse.handlers.auth
import synapse.handlers.deactivate_account
import synapse.handlers.device
import synapse.handlers.e2e_keys
import synapse.handlers.message
import synapse.handlers.room
import synapse.handlers.room_member
import synapse.handlers.message
import synapse.handlers.set_password
import synapse.rest.media.v1.media_repository
import synapse.server_notices.server_notices_manager

View file

@ -4,7 +4,7 @@
<meta name='viewport' content='width=device-width, initial-scale=1, user-scalable=no, minimum-scale=1.0, maximum-scale=1.0'>
<link rel="stylesheet" href="style.css">
<script src="js/jquery-2.1.3.min.js"></script>
<script src="https://www.google.com/recaptcha/api/js/recaptcha_ajax.js"></script>
<script src="https://www.recaptcha.net/recaptcha/api/js/recaptcha_ajax.js"></script>
<script src="register_config.js"></script>
<script src="js/register.js"></script>
</head>

View file

@ -30,6 +30,7 @@ from synapse.api.errors import StoreError
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.engines import PostgresEngine, Sqlite3Engine
from synapse.types import get_domain_from_id
from synapse.util import batch_iter
from synapse.util.caches.descriptors import Cache
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
from synapse.util.stringutils import exception_to_unicode
@ -1327,10 +1328,16 @@ class SQLBaseStore(object):
"""
txn.call_after(self._invalidate_state_caches, room_id, members_changed)
keys = itertools.chain([room_id], members_changed)
self._send_invalidation_to_replication(
txn, _CURRENT_STATE_CACHE_NAME, keys,
)
# We need to be careful that the size of the `members_changed` list
# isn't so large that it causes problems sending over replication, so we
# send them in chunks.
# Max line length is 16K, and max user ID length is 255, so 50 should
# be safe.
for chunk in batch_iter(members_changed, 50):
keys = itertools.chain([room_id], chunk)
self._send_invalidation_to_replication(
txn, _CURRENT_STATE_CACHE_NAME, keys,
)
def _invalidate_state_caches(self, room_id, members_changed):
"""Invalidates caches that are based on the current state, but does
@ -1342,15 +1349,43 @@ class SQLBaseStore(object):
changed
"""
for member in members_changed:
self.get_rooms_for_user_with_stream_ordering.invalidate((member,))
self._attempt_to_invalidate_cache(
"get_rooms_for_user_with_stream_ordering", (member,),
)
for host in set(get_domain_from_id(u) for u in members_changed):
self.is_host_joined.invalidate((room_id, host))
self.was_host_joined.invalidate((room_id, host))
self._attempt_to_invalidate_cache(
"is_host_joined", (room_id, host,),
)
self._attempt_to_invalidate_cache(
"was_host_joined", (room_id, host,),
)
self.get_users_in_room.invalidate((room_id,))
self.get_room_summary.invalidate((room_id,))
self.get_current_state_ids.invalidate((room_id,))
self._attempt_to_invalidate_cache(
"get_users_in_room", (room_id,),
)
self._attempt_to_invalidate_cache(
"get_room_summary", (room_id,),
)
self._attempt_to_invalidate_cache(
"get_current_state_ids", (room_id,),
)
def _attempt_to_invalidate_cache(self, cache_name, key):
"""Attempts to invalidate the cache of the given name, ignoring if the
cache doesn't exist. Mainly used for invalidating caches on workers,
where they may not have the cache.
Args:
cache_name (str)
key (tuple)
"""
try:
getattr(self, cache_name).invalidate(key)
except AttributeError:
# We probably haven't pulled in the cache in this worker,
# which is fine.
pass
def _send_invalidation_to_replication(self, txn, cache_name, keys):
"""Notifies replication that given cache has been invalidated.
@ -1568,6 +1603,14 @@ class SQLBaseStore(object):
return cls.cursor_to_dict(txn)
@property
def database_engine_name(self):
return self.database_engine.module.__name__
def get_server_version(self):
"""Returns a string describing the server version number"""
return self.database_engine.server_version
class _RollbackButIsFineException(Exception):
""" This exception is used to rollback a transaction without implying

View file

@ -23,6 +23,7 @@ class PostgresEngine(object):
self.module = database_module
self.module.extensions.register_type(self.module.extensions.UNICODE)
self.synchronous_commit = database_config.get("synchronous_commit", True)
self._version = None # unknown as yet
def check_database(self, txn):
txn.execute("SHOW SERVER_ENCODING")
@ -87,3 +88,27 @@ class PostgresEngine(object):
"""
txn.execute("SELECT nextval('state_group_id_seq')")
return txn.fetchone()[0]
@property
def server_version(self):
"""Returns a string giving the server version. For example: '8.1.5'
Returns:
string
"""
# note that this is a bit of a hack because it relies on on_new_connection
# having been called at least once. Still, that should be a safe bet here.
numver = self._version
assert numver is not None
# https://www.postgresql.org/docs/current/libpq-status.html#LIBPQ-PQSERVERVERSION
if numver >= 100000:
return "%i.%i" % (
numver / 10000, numver % 10000,
)
else:
return "%i.%i.%i" % (
numver / 10000,
(numver % 10000) / 100,
numver % 100,
)

View file

@ -70,6 +70,15 @@ class Sqlite3Engine(object):
self._current_state_group_id += 1
return self._current_state_group_id
@property
def server_version(self):
"""Gets a string giving the server version. For example: '3.22.0'
Returns:
string
"""
return "%i.%i.%i" % self.module.sqlite_version_info
# Following functions taken from: https://github.com/coleifer/peewee

View file

@ -295,6 +295,39 @@ class RegistrationWorkerStore(SQLBaseStore):
return ret['user_id']
return None
@defer.inlineCallbacks
def user_add_threepid(self, user_id, medium, address, validated_at, added_at):
yield self._simple_upsert("user_threepids", {
"medium": medium,
"address": address,
}, {
"user_id": user_id,
"validated_at": validated_at,
"added_at": added_at,
})
@defer.inlineCallbacks
def user_get_threepids(self, user_id):
ret = yield self._simple_select_list(
"user_threepids", {
"user_id": user_id
},
['medium', 'address', 'validated_at', 'added_at'],
'user_get_threepids'
)
defer.returnValue(ret)
def user_delete_threepid(self, user_id, medium, address):
return self._simple_delete(
"user_threepids",
keyvalues={
"user_id": user_id,
"medium": medium,
"address": address,
},
desc="user_delete_threepids",
)
class RegistrationStore(RegistrationWorkerStore,
background_updates.BackgroundUpdateStore):
@ -632,39 +665,6 @@ class RegistrationStore(RegistrationWorkerStore,
defer.returnValue(res if res else False)
@defer.inlineCallbacks
def user_add_threepid(self, user_id, medium, address, validated_at, added_at):
yield self._simple_upsert("user_threepids", {
"medium": medium,
"address": address,
}, {
"user_id": user_id,
"validated_at": validated_at,
"added_at": added_at,
})
@defer.inlineCallbacks
def user_get_threepids(self, user_id):
ret = yield self._simple_select_list(
"user_threepids", {
"user_id": user_id
},
['medium', 'address', 'validated_at', 'added_at'],
'user_get_threepids'
)
defer.returnValue(ret)
def user_delete_threepid(self, user_id, medium, address):
return self._simple_delete(
"user_threepids",
keyvalues={
"user_id": user_id,
"medium": medium,
"address": address,
},
desc="user_delete_threepids",
)
@defer.inlineCallbacks
def save_or_get_3pid_guest_access_token(
self, medium, address, access_token, inviter_user_id