mirror of
https://git.anonymousland.org/anonymousland/synapse.git
synced 2025-05-21 16:11:05 -04:00
Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes
This commit is contained in:
commit
2effa32140
30 changed files with 2343 additions and 98 deletions
44
CHANGES.rst
44
CHANGES.rst
|
@ -1,3 +1,47 @@
|
|||
Changes in synapse v0.20.0-rc1 (2017-03-30)
|
||||
===========================================
|
||||
|
||||
Features:
|
||||
|
||||
* Add delete_devices API (PR #1993)
|
||||
* Add phone number registration/login support (PR #1994, #2055)
|
||||
|
||||
|
||||
Changes:
|
||||
|
||||
* Use JSONSchema for validation of filters. Thanks @pik! (PR #1783)
|
||||
* Reread log config on SIGHUP (PR #1982)
|
||||
* Speed up public room list (PR #1989)
|
||||
* Add helpful texts to logger config options (PR #1990)
|
||||
* Minor ``/sync`` performance improvements. (PR #2002, #2013, #2022)
|
||||
* Add some debug to help diagnose weird federation issue (PR #2035)
|
||||
* Correctly limit retries for all federation requests (PR #2050, #2061)
|
||||
* Don't lock table when persisting new one time keys (PR #2053)
|
||||
* Reduce some CPU work on DB threads (PR #2054)
|
||||
* Cache hosts in room (PR #2060)
|
||||
* Batch sending of device list pokes (PR #2063)
|
||||
* Speed up persist event path in certain edge cases (PR #2070)
|
||||
|
||||
|
||||
Bug fixes:
|
||||
|
||||
* Fix bug where current_state_events renamed to current_state_ids (PR #1849)
|
||||
* Fix routing loop when fetching remote media (PR #1992)
|
||||
* Fix current_state_events table to not lie (PR #1996)
|
||||
* Fix CAS login to handle PartialDownloadError (PR #1997)
|
||||
* Fix assertion to stop transaction queue getting wedged (PR #2010)
|
||||
* Fix presence to fallback to last_active_ts if it beats the last sync time.
|
||||
Thanks @Half-Shot! (PR #2014)
|
||||
* Fix bug when federation received a PDU while a room join is in progress (PR
|
||||
#2016)
|
||||
* Fix resetting state on rejected events (PR #2025)
|
||||
* Fix installation issues in readme. Thanks @ricco386 (PR #2037)
|
||||
* Fix caching of remote servers' signature keys (PR #2042)
|
||||
* Fix some leaking log context (PR #2048, #2049, #2057, #2058)
|
||||
* Fix rejection of invites not reaching sync (PR #2056)
|
||||
|
||||
|
||||
|
||||
Changes in synapse v0.19.3 (2017-03-20)
|
||||
=======================================
|
||||
|
||||
|
|
|
@ -108,10 +108,10 @@ Installing prerequisites on ArchLinux::
|
|||
sudo pacman -S base-devel python2 python-pip \
|
||||
python-setuptools python-virtualenv sqlite3
|
||||
|
||||
Installing prerequisites on CentOS 7::
|
||||
Installing prerequisites on CentOS 7 or Fedora 25::
|
||||
|
||||
sudo yum install libtiff-devel libjpeg-devel libzip-devel freetype-devel \
|
||||
lcms2-devel libwebp-devel tcl-devel tk-devel \
|
||||
lcms2-devel libwebp-devel tcl-devel tk-devel redhat-rpm-config \
|
||||
python-virtualenv libffi-devel openssl-devel
|
||||
sudo yum groupinstall "Development Tools"
|
||||
|
||||
|
|
|
@ -112,9 +112,9 @@ script one last time, e.g. if the SQLite database is at ``homeserver.db``
|
|||
run::
|
||||
|
||||
synapse_port_db --sqlite-database homeserver.db \
|
||||
--postgres-config database_config.yaml
|
||||
--postgres-config homeserver-postgres.yaml
|
||||
|
||||
Once that has completed, change the synapse config to point at the PostgreSQL
|
||||
database configuration file using the ``database_config`` parameter (see
|
||||
`Synapse Config`_) and restart synapse. Synapse should now be running against
|
||||
database configuration file ``homeserver-postgres.yaml`` (i.e. rename it to
|
||||
``homeserver.yaml``) and restart synapse. Synapse should now be running against
|
||||
PostgreSQL.
|
||||
|
|
223
docs/tcp_replication.rst
Normal file
223
docs/tcp_replication.rst
Normal file
|
@ -0,0 +1,223 @@
|
|||
TCP Replication
|
||||
===============
|
||||
|
||||
Motivation
|
||||
----------
|
||||
|
||||
Previously the workers used an HTTP long poll mechanism to get updates from the
|
||||
master, which had the problem of causing a lot of duplicate work on the server.
|
||||
This TCP protocol replaces those APIs with the aim of increased efficiency.
|
||||
|
||||
|
||||
|
||||
Overview
|
||||
--------
|
||||
|
||||
The protocol is based on fire and forget, line based commands. An example flow
|
||||
would be (where '>' indicates master to worker and '<' worker to master flows)::
|
||||
|
||||
> SERVER example.com
|
||||
< REPLICATE events 53
|
||||
> RDATA events 54 ["$foo1:bar.com", ...]
|
||||
> RDATA events 55 ["$foo4:bar.com", ...]
|
||||
|
||||
The example shows the server accepting a new connection and sending its identity
|
||||
with the ``SERVER`` command, followed by the client asking to subscribe to the
|
||||
``events`` stream from the token ``53``. The server then periodically sends ``RDATA``
|
||||
commands which have the format ``RDATA <stream_name> <token> <row>``, where the
|
||||
format of ``<row>`` is defined by the individual streams.
|
||||
|
||||
Error reporting happens by either the client or server sending an `ERROR`
|
||||
command, and usually the connection will be closed.
|
||||
|
||||
|
||||
Since the protocol is a simple line based, its possible to manually connect to
|
||||
the server using a tool like netcat. A few things should be noted when manually
|
||||
using the protocol:
|
||||
|
||||
* When subscribing to a stream using ``REPLICATE``, the special token ``NOW`` can
|
||||
be used to get all future updates. The special stream name ``ALL`` can be used
|
||||
with ``NOW`` to subscribe to all available streams.
|
||||
* The federation stream is only available if federation sending has been
|
||||
disabled on the main process.
|
||||
* The server will only time connections out that have sent a ``PING`` command.
|
||||
If a ping is sent then the connection will be closed if no further commands
|
||||
are receieved within 15s. Both the client and server protocol implementations
|
||||
will send an initial PING on connection and ensure at least one command every
|
||||
5s is sent (not necessarily ``PING``).
|
||||
* ``RDATA`` commands *usually* include a numeric token, however if the stream
|
||||
has multiple rows to replicate per token the server will send multiple
|
||||
``RDATA`` commands, with all but the last having a token of ``batch``. See
|
||||
the documentation on ``commands.RdataCommand`` for further details.
|
||||
|
||||
|
||||
Architecture
|
||||
------------
|
||||
|
||||
The basic structure of the protocol is line based, where the initial word of
|
||||
each line specifies the command. The rest of the line is parsed based on the
|
||||
command. For example, the `RDATA` command is defined as::
|
||||
|
||||
RDATA <stream_name> <token> <row_json>
|
||||
|
||||
(Note that `<row_json>` may contains spaces, but cannot contain newlines.)
|
||||
|
||||
Blank lines are ignored.
|
||||
|
||||
|
||||
Keep alives
|
||||
~~~~~~~~~~~
|
||||
|
||||
Both sides are expected to send at least one command every 5s or so, and
|
||||
should send a ``PING`` command if necessary. If either side do not receive a
|
||||
command within e.g. 15s then the connection should be closed.
|
||||
|
||||
Because the server may be connected to manually using e.g. netcat, the timeouts
|
||||
aren't enabled until an initial ``PING`` command is seen. Both the client and
|
||||
server implementations below send a ``PING`` command immediately on connection to
|
||||
ensure the timeouts are enabled.
|
||||
|
||||
This ensures that both sides can quickly realize if the tcp connection has gone
|
||||
and handle the situation appropriately.
|
||||
|
||||
|
||||
Start up
|
||||
~~~~~~~~
|
||||
|
||||
When a new connection is made, the server:
|
||||
|
||||
* Sends a ``SERVER`` command, which includes the identity of the server, allowing
|
||||
the client to detect if its connected to the expected server
|
||||
* Sends a ``PING`` command as above, to enable the client to time out connections
|
||||
promptly.
|
||||
|
||||
The client:
|
||||
|
||||
* Sends a ``NAME`` command, allowing the server to associate a human friendly
|
||||
name with the connection. This is optional.
|
||||
* Sends a ``PING`` as above
|
||||
* For each stream the client wishes to subscribe to it sends a ``REPLICATE``
|
||||
with the stream_name and token it wants to subscribe from.
|
||||
* On receipt of a ``SERVER`` command, checks that the server name matches the
|
||||
expected server name.
|
||||
|
||||
|
||||
Error handling
|
||||
~~~~~~~~~~~~~~
|
||||
|
||||
If either side detects an error it can send an ``ERROR`` command and close the
|
||||
connection.
|
||||
|
||||
If the client side loses the connection to the server it should reconnect,
|
||||
following the steps above.
|
||||
|
||||
|
||||
Congestion
|
||||
~~~~~~~~~~
|
||||
|
||||
If the server sends messages faster than the client can consume them the server
|
||||
will first buffer a (fairly large) number of commands and then disconnect the
|
||||
client. This ensures that we don't queue up an unbounded number of commands in
|
||||
memory and gives us a potential oppurtunity to squawk loudly. When/if the client
|
||||
recovers it can reconnect to the server and ask for missed messages.
|
||||
|
||||
|
||||
Reliability
|
||||
~~~~~~~~~~~
|
||||
|
||||
In general the replication stream should be considered an unreliable transport
|
||||
since e.g. commands are not resent if the connection disappears.
|
||||
|
||||
The exception to that are the replication streams, i.e. RDATA commands, since
|
||||
these include tokens which can be used to restart the stream on connection
|
||||
errors.
|
||||
|
||||
The client should keep track of the token in the last RDATA command received
|
||||
for each stream so that on reconneciton it can start streaming from the correct
|
||||
place. Note: not all RDATA have valid tokens due to batching. See
|
||||
``RdataCommand`` for more details.
|
||||
|
||||
|
||||
Example
|
||||
~~~~~~~
|
||||
|
||||
An example iteraction is shown below. Each line is prefixed with '>' or '<' to
|
||||
indicate which side is sending, these are *not* included on the wire::
|
||||
|
||||
* connection established *
|
||||
> SERVER localhost:8823
|
||||
> PING 1490197665618
|
||||
< NAME synapse.app.appservice
|
||||
< PING 1490197665618
|
||||
< REPLICATE events 1
|
||||
< REPLICATE backfill 1
|
||||
< REPLICATE caches 1
|
||||
> POSITION events 1
|
||||
> POSITION backfill 1
|
||||
> POSITION caches 1
|
||||
> RDATA caches 2 ["get_user_by_id",["@01register-user:localhost:8823"],1490197670513]
|
||||
> RDATA events 14 ["$149019767112vOHxz:localhost:8823",
|
||||
"!AFDCvgApUmpdfVjIXm:localhost:8823","m.room.guest_access","",null]
|
||||
< PING 1490197675618
|
||||
> ERROR server stopping
|
||||
* connection closed by server *
|
||||
|
||||
The ``POSITION`` command sent by the server is used to set the clients position
|
||||
without needing to send data with the ``RDATA`` command.
|
||||
|
||||
|
||||
An example of a batched set of ``RDATA`` is::
|
||||
|
||||
> RDATA caches batch ["get_user_by_id",["@test:localhost:8823"],1490197670513]
|
||||
> RDATA caches batch ["get_user_by_id",["@test2:localhost:8823"],1490197670513]
|
||||
> RDATA caches batch ["get_user_by_id",["@test3:localhost:8823"],1490197670513]
|
||||
> RDATA caches 54 ["get_user_by_id",["@test4:localhost:8823"],1490197670513]
|
||||
|
||||
In this case the client shouldn't advance their caches token until it sees the
|
||||
the last ``RDATA``.
|
||||
|
||||
|
||||
List of commands
|
||||
~~~~~~~~~~~~~~~~
|
||||
|
||||
The list of valid commands, with which side can send it: server (S) or client (C):
|
||||
|
||||
SERVER (S)
|
||||
Sent at the start to identify which server the client is talking to
|
||||
|
||||
RDATA (S)
|
||||
A single update in a stream
|
||||
|
||||
POSITION (S)
|
||||
The position of the stream has been updated
|
||||
|
||||
ERROR (S, C)
|
||||
There was an error
|
||||
|
||||
PING (S, C)
|
||||
Sent periodically to ensure the connection is still alive
|
||||
|
||||
NAME (C)
|
||||
Sent at the start by client to inform the server who they are
|
||||
|
||||
REPLICATE (C)
|
||||
Asks the server to replicate a given stream
|
||||
|
||||
USER_SYNC (C)
|
||||
A user has started or stopped syncing
|
||||
|
||||
FEDERATION_ACK (C)
|
||||
Acknowledge receipt of some federation data
|
||||
|
||||
REMOVE_PUSHER (C)
|
||||
Inform the server a pusher should be removed
|
||||
|
||||
INVALIDATE_CACHE (C)
|
||||
Inform the server a cache should be invalidated
|
||||
|
||||
SYNC (S, C)
|
||||
Used exclusively in tests
|
||||
|
||||
|
||||
See ``synapse/replication/tcp/commands.py`` for a detailed description and the
|
||||
format of each command.
|
|
@ -50,14 +50,37 @@ You may be able to setup coturn via your package manager, or set it up manually
|
|||
|
||||
pwgen -s 64 1
|
||||
|
||||
5. Ensure youe firewall allows traffic into the TURN server on
|
||||
the ports you've configured it to listen on (remember to allow
|
||||
both TCP and UDP if you've enabled both).
|
||||
5. Consider your security settings. TURN lets users request a relay
|
||||
which will connect to arbitrary IP addresses and ports. At the least
|
||||
we recommend:
|
||||
|
||||
6. If you've configured coturn to support TLS/DTLS, generate or
|
||||
# VoIP traffic is all UDP. There is no reason to let users connect to arbitrary TCP endpoints via the relay.
|
||||
no-tcp-relay
|
||||
|
||||
# don't let the relay ever try to connect to private IP address ranges within your network (if any)
|
||||
# given the turn server is likely behind your firewall, remember to include any privileged public IPs too.
|
||||
denied-peer-ip=10.0.0.0-10.255.255.255
|
||||
denied-peer-ip=192.168.0.0-192.168.255.255
|
||||
denied-peer-ip=172.16.0.0-172.31.255.255
|
||||
|
||||
# special case the turn server itself so that client->TURN->TURN->client flows work
|
||||
allowed-peer-ip=10.0.0.1
|
||||
|
||||
# consider whether you want to limit the quota of relayed streams per user (or total) to avoid risk of DoS.
|
||||
user-quota=12 # 4 streams per video call, so 12 streams = 3 simultaneous relayed calls per user.
|
||||
total-quota=1200
|
||||
|
||||
Ideally coturn should refuse to relay traffic which isn't SRTP;
|
||||
see https://github.com/matrix-org/synapse/issues/2009
|
||||
|
||||
6. Ensure your firewall allows traffic into the TURN server on
|
||||
the ports you've configured it to listen on (remember to allow
|
||||
both TCP and UDP TURN traffic)
|
||||
|
||||
7. If you've configured coturn to support TLS/DTLS, generate or
|
||||
import your private key and certificate.
|
||||
|
||||
7. Start the turn server::
|
||||
8. Start the turn server::
|
||||
|
||||
bin/turnserver -o
|
||||
|
||||
|
@ -83,12 +106,19 @@ Your home server configuration file needs the following extra keys:
|
|||
to refresh credentials. The TURN REST API specification recommends
|
||||
one day (86400000).
|
||||
|
||||
4. "turn_allow_guests": Whether to allow guest users to use the TURN
|
||||
server. This is enabled by default, as otherwise VoIP will not
|
||||
work reliably for guests. However, it does introduce a security risk
|
||||
as it lets guests connect to arbitrary endpoints without having gone
|
||||
through a CAPTCHA or similar to register a real account.
|
||||
|
||||
As an example, here is the relevant section of the config file for
|
||||
matrix.org::
|
||||
|
||||
turn_uris: [ "turn:turn.matrix.org:3478?transport=udp", "turn:turn.matrix.org:3478?transport=tcp" ]
|
||||
turn_shared_secret: n0t4ctuAllymatr1Xd0TorgSshar3d5ecret4obvIousreAsons
|
||||
turn_user_lifetime: 86400000
|
||||
turn_allow_guests: True
|
||||
|
||||
Now, restart synapse::
|
||||
|
||||
|
|
|
@ -9,16 +9,39 @@
|
|||
ROOMID="$1"
|
||||
|
||||
sqlite3 homeserver.db <<EOF
|
||||
DELETE FROM context_depth WHERE context = '$ROOMID';
|
||||
DELETE FROM current_state WHERE context = '$ROOMID';
|
||||
DELETE FROM feedback WHERE room_id = '$ROOMID';
|
||||
DELETE FROM messages WHERE room_id = '$ROOMID';
|
||||
DELETE FROM pdu_backward_extremities WHERE context = '$ROOMID';
|
||||
DELETE FROM pdu_edges WHERE context = '$ROOMID';
|
||||
DELETE FROM pdu_forward_extremities WHERE context = '$ROOMID';
|
||||
DELETE FROM pdus WHERE context = '$ROOMID';
|
||||
DELETE FROM room_data WHERE room_id = '$ROOMID';
|
||||
DELETE FROM event_forward_extremities WHERE room_id = '$ROOMID';
|
||||
DELETE FROM event_backward_extremities WHERE room_id = '$ROOMID';
|
||||
DELETE FROM event_edges WHERE room_id = '$ROOMID';
|
||||
DELETE FROM room_depth WHERE room_id = '$ROOMID';
|
||||
DELETE FROM state_forward_extremities WHERE room_id = '$ROOMID';
|
||||
DELETE FROM events WHERE room_id = '$ROOMID';
|
||||
DELETE FROM event_json WHERE room_id = '$ROOMID';
|
||||
DELETE FROM state_events WHERE room_id = '$ROOMID';
|
||||
DELETE FROM current_state_events WHERE room_id = '$ROOMID';
|
||||
DELETE FROM room_memberships WHERE room_id = '$ROOMID';
|
||||
DELETE FROM feedback WHERE room_id = '$ROOMID';
|
||||
DELETE FROM topics WHERE room_id = '$ROOMID';
|
||||
DELETE FROM room_names WHERE room_id = '$ROOMID';
|
||||
DELETE FROM rooms WHERE room_id = '$ROOMID';
|
||||
DELETE FROM state_pdus WHERE context = '$ROOMID';
|
||||
DELETE FROM room_hosts WHERE room_id = '$ROOMID';
|
||||
DELETE FROM room_aliases WHERE room_id = '$ROOMID';
|
||||
DELETE FROM state_groups WHERE room_id = '$ROOMID';
|
||||
DELETE FROM state_groups_state WHERE room_id = '$ROOMID';
|
||||
DELETE FROM receipts_graph WHERE room_id = '$ROOMID';
|
||||
DELETE FROM receipts_linearized WHERE room_id = '$ROOMID';
|
||||
DELETE FROM event_search_content WHERE c1room_id = '$ROOMID';
|
||||
DELETE FROM guest_access WHERE room_id = '$ROOMID';
|
||||
DELETE FROM history_visibility WHERE room_id = '$ROOMID';
|
||||
DELETE FROM room_tags WHERE room_id = '$ROOMID';
|
||||
DELETE FROM room_tags_revisions WHERE room_id = '$ROOMID';
|
||||
DELETE FROM room_account_data WHERE room_id = '$ROOMID';
|
||||
DELETE FROM event_push_actions WHERE room_id = '$ROOMID';
|
||||
DELETE FROM local_invites WHERE room_id = '$ROOMID';
|
||||
DELETE FROM pusher_throttle WHERE room_id = '$ROOMID';
|
||||
DELETE FROM event_reports WHERE room_id = '$ROOMID';
|
||||
DELETE FROM public_room_list_stream WHERE room_id = '$ROOMID';
|
||||
DELETE FROM stream_ordering_to_exterm WHERE room_id = '$ROOMID';
|
||||
DELETE FROM event_auth WHERE room_id = '$ROOMID';
|
||||
DELETE FROM appservice_room_list WHERE room_id = '$ROOMID';
|
||||
VACUUM;
|
||||
EOF
|
||||
|
|
|
@ -447,9 +447,7 @@ class Porter(object):
|
|||
|
||||
postgres_tables = yield self.postgres_store._simple_select_onecol(
|
||||
table="information_schema.tables",
|
||||
keyvalues={
|
||||
"table_schema": "public",
|
||||
},
|
||||
keyvalues={},
|
||||
retcol="distinct table_name",
|
||||
)
|
||||
|
||||
|
|
|
@ -16,4 +16,4 @@
|
|||
""" This is a reference implementation of a Matrix home server.
|
||||
"""
|
||||
|
||||
__version__ = "0.19.3"
|
||||
__version__ = "0.20.0-rc1"
|
||||
|
|
|
@ -56,6 +56,7 @@ from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
|
|||
from synapse.metrics import register_memory_metrics, get_metrics_for
|
||||
from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
|
||||
from synapse.replication.resource import ReplicationResource, REPLICATION_PREFIX
|
||||
from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory
|
||||
from synapse.federation.transport.server import TransportLayerServer
|
||||
|
||||
from synapse.util.rlimit import change_resource_limit
|
||||
|
@ -222,6 +223,16 @@ class SynapseHomeServer(HomeServer):
|
|||
),
|
||||
interface=address
|
||||
)
|
||||
elif listener["type"] == "replication":
|
||||
bind_addresses = listener["bind_addresses"]
|
||||
for address in bind_addresses:
|
||||
factory = ReplicationStreamProtocolFactory(self)
|
||||
server_listener = reactor.listenTCP(
|
||||
listener["port"], factory, interface=address
|
||||
)
|
||||
reactor.addSystemEventTrigger(
|
||||
"before", "shutdown", server_listener.stopListening,
|
||||
)
|
||||
else:
|
||||
logger.warn("Unrecognized listener type: %s", listener["type"])
|
||||
|
||||
|
|
|
@ -202,7 +202,8 @@ def main():
|
|||
worker_app = worker_config["worker_app"]
|
||||
worker_pidfile = worker_config["worker_pid_file"]
|
||||
worker_daemonize = worker_config["worker_daemonize"]
|
||||
assert worker_daemonize # TODO print something more user friendly
|
||||
assert worker_daemonize, "In config %r: expected '%s' to be True" % (
|
||||
worker_configfile, "worker_daemonize")
|
||||
worker_cache_factor = worker_config.get("synctl_cache_factor")
|
||||
workers.append(Worker(
|
||||
worker_app, worker_configfile, worker_pidfile, worker_cache_factor,
|
||||
|
|
|
@ -23,6 +23,7 @@ class VoipConfig(Config):
|
|||
self.turn_username = config.get("turn_username")
|
||||
self.turn_password = config.get("turn_password")
|
||||
self.turn_user_lifetime = self.parse_duration(config["turn_user_lifetime"])
|
||||
self.turn_allow_guests = config.get("turn_allow_guests", True)
|
||||
|
||||
def default_config(self, **kwargs):
|
||||
return """\
|
||||
|
@ -41,4 +42,11 @@ class VoipConfig(Config):
|
|||
|
||||
# How long generated TURN credentials last
|
||||
turn_user_lifetime: "1h"
|
||||
|
||||
# Whether guests should be allowed to use the TURN server.
|
||||
# This defaults to True, otherwise VoIP will be unreliable for guests.
|
||||
# However, it does introduce a slight security risk as it allows users to
|
||||
# connect to arbitrary endpoints without having first signed up for a
|
||||
# valid account (e.g. by passing a CAPTCHA).
|
||||
turn_allow_guests: True
|
||||
"""
|
||||
|
|
|
@ -146,11 +146,15 @@ class FederationServer(FederationBase):
|
|||
# check that it's actually being sent from a valid destination to
|
||||
# workaround bug #1753 in 0.18.5 and 0.18.6
|
||||
if transaction.origin != get_domain_from_id(pdu.event_id):
|
||||
# We continue to accept join events from any server; this is
|
||||
# necessary for the federation join dance to work correctly.
|
||||
# (When we join over federation, the "helper" server is
|
||||
# responsible for sending out the join event, rather than the
|
||||
# origin. See bug #1893).
|
||||
if not (
|
||||
pdu.type == 'm.room.member' and
|
||||
pdu.content and
|
||||
pdu.content.get("membership", None) == 'join' and
|
||||
self.hs.is_mine_id(pdu.state_key)
|
||||
pdu.content.get("membership", None) == 'join'
|
||||
):
|
||||
logger.info(
|
||||
"Discarding PDU %s from invalid origin %s",
|
||||
|
|
|
@ -220,10 +220,15 @@ class FederationRemoteSendQueue(object):
|
|||
def get_current_token(self):
|
||||
return self.pos - 1
|
||||
|
||||
def get_replication_rows(self, token, limit, federation_ack=None):
|
||||
"""
|
||||
def federation_ack(self, token):
|
||||
self._clear_queue_before_pos(token)
|
||||
|
||||
def get_replication_rows(self, from_token, to_token, limit, federation_ack=None):
|
||||
"""Get rows to be sent over federation between the two tokens
|
||||
|
||||
Args:
|
||||
token (int)
|
||||
from_token (int)
|
||||
to_token(int)
|
||||
limit (int)
|
||||
federation_ack (int): Optional. The position where the worker is
|
||||
explicitly acknowledged it has handled. Allows us to drop
|
||||
|
@ -232,8 +237,8 @@ class FederationRemoteSendQueue(object):
|
|||
# TODO: Handle limit.
|
||||
|
||||
# To handle restarts where we wrap around
|
||||
if token > self.pos:
|
||||
token = -1
|
||||
if from_token > self.pos:
|
||||
from_token = -1
|
||||
|
||||
rows = []
|
||||
|
||||
|
@ -244,10 +249,11 @@ class FederationRemoteSendQueue(object):
|
|||
|
||||
# Fetch changed presence
|
||||
keys = self.presence_changed.keys()
|
||||
i = keys.bisect_right(token)
|
||||
i = keys.bisect_right(from_token)
|
||||
j = keys.bisect_right(to_token) + 1
|
||||
dest_user_ids = set(
|
||||
(pos, dest_user_id)
|
||||
for pos in keys[i:]
|
||||
for pos in keys[i:j]
|
||||
for dest_user_id in self.presence_changed[pos]
|
||||
)
|
||||
|
||||
|
@ -259,8 +265,9 @@ class FederationRemoteSendQueue(object):
|
|||
|
||||
# Fetch changes keyed edus
|
||||
keys = self.keyed_edu_changed.keys()
|
||||
i = keys.bisect_right(token)
|
||||
keyed_edus = set((k, self.keyed_edu_changed[k]) for k in keys[i:])
|
||||
i = keys.bisect_right(from_token)
|
||||
j = keys.bisect_right(to_token) + 1
|
||||
keyed_edus = set((k, self.keyed_edu_changed[k]) for k in keys[i:j])
|
||||
|
||||
for (pos, (destination, edu_key)) in keyed_edus:
|
||||
rows.append(
|
||||
|
@ -272,16 +279,18 @@ class FederationRemoteSendQueue(object):
|
|||
|
||||
# Fetch changed edus
|
||||
keys = self.edus.keys()
|
||||
i = keys.bisect_right(token)
|
||||
edus = set((k, self.edus[k]) for k in keys[i:])
|
||||
i = keys.bisect_right(from_token)
|
||||
j = keys.bisect_right(to_token) + 1
|
||||
edus = set((k, self.edus[k]) for k in keys[i:j])
|
||||
|
||||
for (pos, edu) in edus:
|
||||
rows.append((pos, EDU_TYPE, ujson.dumps(edu.get_internal_dict())))
|
||||
|
||||
# Fetch changed failures
|
||||
keys = self.failures.keys()
|
||||
i = keys.bisect_right(token)
|
||||
failures = set((k, self.failures[k]) for k in keys[i:])
|
||||
i = keys.bisect_right(from_token)
|
||||
j = keys.bisect_right(to_token) + 1
|
||||
failures = set((k, self.failures[k]) for k in keys[i:j])
|
||||
|
||||
for (pos, (destination, failure)) in failures:
|
||||
rows.append((pos, FAILURE_TYPE, ujson.dumps({
|
||||
|
@ -291,8 +300,9 @@ class FederationRemoteSendQueue(object):
|
|||
|
||||
# Fetch changed device messages
|
||||
keys = self.device_messages.keys()
|
||||
i = keys.bisect_right(token)
|
||||
device_messages = set((k, self.device_messages[k]) for k in keys[i:])
|
||||
i = keys.bisect_right(from_token)
|
||||
j = keys.bisect_right(to_token) + 1
|
||||
device_messages = set((k, self.device_messages[k]) for k in keys[i:j])
|
||||
|
||||
for (pos, destination) in device_messages:
|
||||
rows.append((pos, DEVICE_MESSAGE_TYPE, ujson.dumps({
|
||||
|
|
|
@ -28,7 +28,7 @@ from synapse.api.constants import EventTypes, Membership, RejectedReason
|
|||
from synapse.events.validator import EventValidator
|
||||
from synapse.util import unwrapFirstError
|
||||
from synapse.util.logcontext import (
|
||||
PreserveLoggingContext, preserve_fn, preserve_context_over_deferred
|
||||
preserve_fn, preserve_context_over_deferred
|
||||
)
|
||||
from synapse.util.metrics import measure_func
|
||||
from synapse.util.logutils import log_function
|
||||
|
@ -394,11 +394,10 @@ class FederationHandler(BaseHandler):
|
|||
target_user = UserID.from_string(target_user_id)
|
||||
extra_users.append(target_user)
|
||||
|
||||
with PreserveLoggingContext():
|
||||
self.notifier.on_new_room_event(
|
||||
event, event_stream_id, max_stream_id,
|
||||
extra_users=extra_users
|
||||
)
|
||||
self.notifier.on_new_room_event(
|
||||
event, event_stream_id, max_stream_id,
|
||||
extra_users=extra_users
|
||||
)
|
||||
|
||||
if event.type == EventTypes.Member:
|
||||
if event.membership == Membership.JOIN:
|
||||
|
@ -916,11 +915,10 @@ class FederationHandler(BaseHandler):
|
|||
origin, auth_chain, state, event
|
||||
)
|
||||
|
||||
with PreserveLoggingContext():
|
||||
self.notifier.on_new_room_event(
|
||||
event, event_stream_id, max_stream_id,
|
||||
extra_users=[joinee]
|
||||
)
|
||||
self.notifier.on_new_room_event(
|
||||
event, event_stream_id, max_stream_id,
|
||||
extra_users=[joinee]
|
||||
)
|
||||
|
||||
logger.debug("Finished joining %s to %s", joinee, room_id)
|
||||
finally:
|
||||
|
@ -1004,9 +1002,19 @@ class FederationHandler(BaseHandler):
|
|||
)
|
||||
|
||||
event.internal_metadata.outlier = False
|
||||
# Send this event on behalf of the origin server since they may not
|
||||
# have an up to data view of the state of the room at this event so
|
||||
# will not know which servers to send the event to.
|
||||
# Send this event on behalf of the origin server.
|
||||
#
|
||||
# The reasons we have the destination server rather than the origin
|
||||
# server send it are slightly mysterious: the origin server should have
|
||||
# all the neccessary state once it gets the response to the send_join,
|
||||
# so it could send the event itself if it wanted to. It may be that
|
||||
# doing it this way reduces failure modes, or avoids certain attacks
|
||||
# where a new server selectively tells a subset of the federation that
|
||||
# it has joined.
|
||||
#
|
||||
# The fact is that, as of the current writing, Synapse doesn't send out
|
||||
# the join event over federation after joining, and changing it now
|
||||
# would introduce the danger of backwards-compatibility problems.
|
||||
event.internal_metadata.send_on_behalf_of = origin
|
||||
|
||||
context, event_stream_id, max_stream_id = yield self._handle_new_event(
|
||||
|
@ -1025,10 +1033,9 @@ class FederationHandler(BaseHandler):
|
|||
target_user = UserID.from_string(target_user_id)
|
||||
extra_users.append(target_user)
|
||||
|
||||
with PreserveLoggingContext():
|
||||
self.notifier.on_new_room_event(
|
||||
event, event_stream_id, max_stream_id, extra_users=extra_users
|
||||
)
|
||||
self.notifier.on_new_room_event(
|
||||
event, event_stream_id, max_stream_id, extra_users=extra_users
|
||||
)
|
||||
|
||||
if event.type == EventTypes.Member:
|
||||
if event.content["membership"] == Membership.JOIN:
|
||||
|
@ -1074,11 +1081,10 @@ class FederationHandler(BaseHandler):
|
|||
)
|
||||
|
||||
target_user = UserID.from_string(event.state_key)
|
||||
with PreserveLoggingContext():
|
||||
self.notifier.on_new_room_event(
|
||||
event, event_stream_id, max_stream_id,
|
||||
extra_users=[target_user],
|
||||
)
|
||||
self.notifier.on_new_room_event(
|
||||
event, event_stream_id, max_stream_id,
|
||||
extra_users=[target_user],
|
||||
)
|
||||
|
||||
defer.returnValue(event)
|
||||
|
||||
|
@ -1236,10 +1242,9 @@ class FederationHandler(BaseHandler):
|
|||
target_user = UserID.from_string(target_user_id)
|
||||
extra_users.append(target_user)
|
||||
|
||||
with PreserveLoggingContext():
|
||||
self.notifier.on_new_room_event(
|
||||
event, event_stream_id, max_stream_id, extra_users=extra_users
|
||||
)
|
||||
self.notifier.on_new_room_event(
|
||||
event, event_stream_id, max_stream_id, extra_users=extra_users
|
||||
)
|
||||
|
||||
defer.returnValue(None)
|
||||
|
||||
|
|
|
@ -612,7 +612,7 @@ class MessageHandler(BaseHandler):
|
|||
@defer.inlineCallbacks
|
||||
def _notify():
|
||||
yield run_on_reactor()
|
||||
yield self.notifier.on_new_room_event(
|
||||
self.notifier.on_new_room_event(
|
||||
event, event_stream_id, max_stream_id,
|
||||
extra_users=extra_users
|
||||
)
|
||||
|
|
|
@ -30,6 +30,7 @@ from synapse.api.constants import PresenceState
|
|||
from synapse.storage.presence import UserPresenceState
|
||||
|
||||
from synapse.util.caches.descriptors import cachedInlineCallbacks
|
||||
from synapse.util.async import Linearizer
|
||||
from synapse.util.logcontext import preserve_fn
|
||||
from synapse.util.logutils import log_function
|
||||
from synapse.util.metrics import Measure
|
||||
|
@ -187,6 +188,7 @@ class PresenceHandler(object):
|
|||
# process_id to millisecond timestamp last updated.
|
||||
self.external_process_to_current_syncs = {}
|
||||
self.external_process_last_updated_ms = {}
|
||||
self.external_sync_linearizer = Linearizer(name="external_sync_linearizer")
|
||||
|
||||
# Start a LoopingCall in 30s that fires every 5s.
|
||||
# The initial delay is to allow disconnected clients a chance to
|
||||
|
@ -511,6 +513,73 @@ class PresenceHandler(object):
|
|||
self.external_process_last_updated_ms[process_id] = self.clock.time_msec()
|
||||
self.external_process_to_current_syncs[process_id] = syncing_user_ids
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def update_external_syncs_row(self, process_id, user_id, is_syncing, sync_time_msec):
|
||||
"""Update the syncing users for an external process as a delta.
|
||||
|
||||
Args:
|
||||
process_id (str): An identifier for the process the users are
|
||||
syncing against. This allows synapse to process updates
|
||||
as user start and stop syncing against a given process.
|
||||
user_id (str): The user who has started or stopped syncing
|
||||
is_syncing (bool): Whether or not the user is now syncing
|
||||
sync_time_msec(int): Time in ms when the user was last syncing
|
||||
"""
|
||||
with (yield self.external_sync_linearizer.queue(process_id)):
|
||||
prev_state = yield self.current_state_for_user(user_id)
|
||||
|
||||
process_presence = self.external_process_to_current_syncs.setdefault(
|
||||
process_id, set()
|
||||
)
|
||||
|
||||
updates = []
|
||||
if is_syncing and user_id not in process_presence:
|
||||
if prev_state.state == PresenceState.OFFLINE:
|
||||
updates.append(prev_state.copy_and_replace(
|
||||
state=PresenceState.ONLINE,
|
||||
last_active_ts=sync_time_msec,
|
||||
last_user_sync_ts=sync_time_msec,
|
||||
))
|
||||
else:
|
||||
updates.append(prev_state.copy_and_replace(
|
||||
last_user_sync_ts=sync_time_msec,
|
||||
))
|
||||
process_presence.add(user_id)
|
||||
elif user_id in process_presence:
|
||||
updates.append(prev_state.copy_and_replace(
|
||||
last_user_sync_ts=sync_time_msec,
|
||||
))
|
||||
|
||||
if not is_syncing:
|
||||
process_presence.discard(user_id)
|
||||
|
||||
if updates:
|
||||
yield self._update_states(updates)
|
||||
|
||||
self.external_process_last_updated_ms[process_id] = self.clock.time_msec()
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def update_external_syncs_clear(self, process_id):
|
||||
"""Marks all users that had been marked as syncing by a given process
|
||||
as offline.
|
||||
|
||||
Used when the process has stopped/disappeared.
|
||||
"""
|
||||
with (yield self.external_sync_linearizer.queue(process_id)):
|
||||
process_presence = self.external_process_to_current_syncs.pop(
|
||||
process_id, set()
|
||||
)
|
||||
prev_states = yield self.current_state_for_users(process_presence)
|
||||
time_now_ms = self.clock.time_msec()
|
||||
|
||||
yield self._update_states([
|
||||
prev_state.copy_and_replace(
|
||||
last_user_sync_ts=time_now_ms,
|
||||
)
|
||||
for prev_state in prev_states.itervalues()
|
||||
])
|
||||
self.external_process_last_updated_ms.pop(process_id, None)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def current_state_for_user(self, user_id):
|
||||
"""Get the current presence state for a user.
|
||||
|
|
|
@ -293,6 +293,9 @@ class TypingHandler(object):
|
|||
rows.sort()
|
||||
return rows
|
||||
|
||||
def get_current_token(self):
|
||||
return self._latest_room_serial
|
||||
|
||||
|
||||
class TypingNotificationEventSource(object):
|
||||
def __init__(self, hs):
|
||||
|
|
|
@ -163,6 +163,8 @@ class Notifier(object):
|
|||
self.store = hs.get_datastore()
|
||||
self.pending_new_room_events = []
|
||||
|
||||
self.replication_callbacks = []
|
||||
|
||||
self.clock = hs.get_clock()
|
||||
self.appservice_handler = hs.get_application_service_handler()
|
||||
|
||||
|
@ -202,7 +204,12 @@ class Notifier(object):
|
|||
lambda: len(self.user_to_user_stream),
|
||||
)
|
||||
|
||||
@preserve_fn
|
||||
def add_replication_callback(self, cb):
|
||||
"""Add a callback that will be called when some new data is available.
|
||||
Callback is not given any arguments.
|
||||
"""
|
||||
self.replication_callbacks.append(cb)
|
||||
|
||||
def on_new_room_event(self, event, room_stream_id, max_room_stream_id,
|
||||
extra_users=[]):
|
||||
""" Used by handlers to inform the notifier something has happened
|
||||
|
@ -216,15 +223,13 @@ class Notifier(object):
|
|||
until all previous events have been persisted before notifying
|
||||
the client streams.
|
||||
"""
|
||||
with PreserveLoggingContext():
|
||||
self.pending_new_room_events.append((
|
||||
room_stream_id, event, extra_users
|
||||
))
|
||||
self._notify_pending_new_room_events(max_room_stream_id)
|
||||
self.pending_new_room_events.append((
|
||||
room_stream_id, event, extra_users
|
||||
))
|
||||
self._notify_pending_new_room_events(max_room_stream_id)
|
||||
|
||||
self.notify_replication()
|
||||
self.notify_replication()
|
||||
|
||||
@preserve_fn
|
||||
def _notify_pending_new_room_events(self, max_room_stream_id):
|
||||
"""Notify for the room events that were queued waiting for a previous
|
||||
event to be persisted.
|
||||
|
@ -242,14 +247,16 @@ class Notifier(object):
|
|||
else:
|
||||
self._on_new_room_event(event, room_stream_id, extra_users)
|
||||
|
||||
@preserve_fn
|
||||
def _on_new_room_event(self, event, room_stream_id, extra_users=[]):
|
||||
"""Notify any user streams that are interested in this room event"""
|
||||
# poke any interested application service.
|
||||
self.appservice_handler.notify_interested_services(room_stream_id)
|
||||
preserve_fn(self.appservice_handler.notify_interested_services)(
|
||||
room_stream_id)
|
||||
|
||||
if self.federation_sender:
|
||||
self.federation_sender.notify_new_events(room_stream_id)
|
||||
preserve_fn(self.federation_sender.notify_new_events)(
|
||||
room_stream_id
|
||||
)
|
||||
|
||||
if event.type == EventTypes.Member and event.membership == Membership.JOIN:
|
||||
self._user_joined_room(event.state_key, event.room_id)
|
||||
|
@ -260,7 +267,6 @@ class Notifier(object):
|
|||
rooms=[event.room_id],
|
||||
)
|
||||
|
||||
@preserve_fn
|
||||
def on_new_event(self, stream_key, new_token, users=[], rooms=[]):
|
||||
""" Used to inform listeners that something has happend event wise.
|
||||
|
||||
|
@ -287,7 +293,6 @@ class Notifier(object):
|
|||
|
||||
self.notify_replication()
|
||||
|
||||
@preserve_fn
|
||||
def on_new_replication_data(self):
|
||||
"""Used to inform replication listeners that something has happend
|
||||
without waking up any of the normal user event streams"""
|
||||
|
@ -510,6 +515,9 @@ class Notifier(object):
|
|||
self.replication_deferred = ObservableDeferred(defer.Deferred())
|
||||
deferred.callback(None)
|
||||
|
||||
for cb in self.replication_callbacks:
|
||||
preserve_fn(cb)()
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def wait_for_replication(self, callback, timeout):
|
||||
"""Wait for an event to happen.
|
||||
|
|
|
@ -489,7 +489,7 @@ class ReplicationResource(Resource):
|
|||
|
||||
if federation is not None and federation != current_position:
|
||||
federation_rows = self.federation_sender.get_replication_rows(
|
||||
federation, limit, federation_ack=federation_ack,
|
||||
federation, current_position, limit, federation_ack=federation_ack,
|
||||
)
|
||||
upto_token = _position_from_rows(federation_rows, current_position)
|
||||
writer.write_header_and_rows("federation", federation_rows, (
|
||||
|
@ -504,7 +504,7 @@ class ReplicationResource(Resource):
|
|||
|
||||
if device_lists is not None and device_lists != current_position:
|
||||
changes = yield self.store.get_all_device_list_changes_for_remotes(
|
||||
device_lists,
|
||||
device_lists, current_position,
|
||||
)
|
||||
writer.write_header_and_rows("device_lists", changes, (
|
||||
"position", "user_id", "destination",
|
||||
|
|
30
synapse/replication/tcp/__init__.py
Normal file
30
synapse/replication/tcp/__init__.py
Normal file
|
@ -0,0 +1,30 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2017 Vector Creations Ltd
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""This module implements the TCP replication protocol used by synapse to
|
||||
communicate between the master process and its workers (when they're enabled).
|
||||
|
||||
Further details can be found in docs/tcp_replication.rst
|
||||
|
||||
|
||||
Structure of the module:
|
||||
* client.py - the client classes used for workers to connect to master
|
||||
* command.py - the definitions of all the valid commands
|
||||
* protocol.py - contains bot the client and server protocol implementations,
|
||||
these should not be used directly
|
||||
* resource.py - the server classes that accepts and handle client connections
|
||||
* streams.py - the definitons of all the valid streams
|
||||
|
||||
"""
|
346
synapse/replication/tcp/commands.py
Normal file
346
synapse/replication/tcp/commands.py
Normal file
|
@ -0,0 +1,346 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2017 Vector Creations Ltd
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
"""Defines the various valid commands
|
||||
|
||||
The VALID_SERVER_COMMANDS and VALID_CLIENT_COMMANDS define which commands are
|
||||
allowed to be sent by which side.
|
||||
"""
|
||||
|
||||
import logging
|
||||
import ujson as json
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Command(object):
|
||||
"""The base command class.
|
||||
|
||||
All subclasses must set the NAME variable which equates to the name of the
|
||||
command on the wire.
|
||||
|
||||
A full command line on the wire is constructed from `NAME + " " + to_line()`
|
||||
|
||||
The default implementation creates a command of form `<NAME> <data>`
|
||||
"""
|
||||
NAME = None
|
||||
|
||||
def __init__(self, data):
|
||||
self.data = data
|
||||
|
||||
@classmethod
|
||||
def from_line(cls, line):
|
||||
"""Deserialises a line from the wire into this command. `line` does not
|
||||
include the command.
|
||||
"""
|
||||
return cls(line)
|
||||
|
||||
def to_line(self):
|
||||
"""Serialises the comamnd for the wire. Does not include the command
|
||||
prefix.
|
||||
"""
|
||||
return self.data
|
||||
|
||||
|
||||
class ServerCommand(Command):
|
||||
"""Sent by the server on new connection and includes the server_name.
|
||||
|
||||
Format::
|
||||
|
||||
SERVER <server_name>
|
||||
"""
|
||||
NAME = "SERVER"
|
||||
|
||||
|
||||
class RdataCommand(Command):
|
||||
"""Sent by server when a subscribed stream has an update.
|
||||
|
||||
Format::
|
||||
|
||||
RDATA <stream_name> <token> <row_json>
|
||||
|
||||
The `<token>` may either be a numeric stream id OR "batch". The latter case
|
||||
is used to support sending multiple updates with the same stream ID. This
|
||||
is done by sending an RDATA for each row, with all but the last RDATA having
|
||||
a token of "batch" and the last having the final stream ID.
|
||||
|
||||
The client should batch all incoming RDATA with a token of "batch" (per
|
||||
stream_name) until it sees an RDATA with a numeric stream ID.
|
||||
|
||||
`<token>` of "batch" maps to the instance variable `token` being None.
|
||||
|
||||
An example of a batched series of RDATA::
|
||||
|
||||
RDATA presence batch ["@foo:example.com", "online", ...]
|
||||
RDATA presence batch ["@bar:example.com", "online", ...]
|
||||
RDATA presence 59 ["@baz:example.com", "online", ...]
|
||||
"""
|
||||
NAME = "RDATA"
|
||||
|
||||
def __init__(self, stream_name, token, row):
|
||||
self.stream_name = stream_name
|
||||
self.token = token
|
||||
self.row = row
|
||||
|
||||
@classmethod
|
||||
def from_line(cls, line):
|
||||
stream_name, token, row_json = line.split(" ", 2)
|
||||
return cls(
|
||||
stream_name,
|
||||
None if token == "batch" else int(token),
|
||||
json.loads(row_json)
|
||||
)
|
||||
|
||||
def to_line(self):
|
||||
return " ".join((
|
||||
self.stream_name,
|
||||
str(self.token) if self.token is not None else "batch",
|
||||
json.dumps(self.row),
|
||||
))
|
||||
|
||||
|
||||
class PositionCommand(Command):
|
||||
"""Sent by the client to tell the client the stream postition without
|
||||
needing to send an RDATA.
|
||||
"""
|
||||
NAME = "POSITION"
|
||||
|
||||
def __init__(self, stream_name, token):
|
||||
self.stream_name = stream_name
|
||||
self.token = token
|
||||
|
||||
@classmethod
|
||||
def from_line(cls, line):
|
||||
stream_name, token = line.split(" ", 1)
|
||||
return cls(stream_name, int(token))
|
||||
|
||||
def to_line(self):
|
||||
return " ".join((self.stream_name, str(self.token),))
|
||||
|
||||
|
||||
class ErrorCommand(Command):
|
||||
"""Sent by either side if there was an ERROR. The data is a string describing
|
||||
the error.
|
||||
"""
|
||||
NAME = "ERROR"
|
||||
|
||||
|
||||
class PingCommand(Command):
|
||||
"""Sent by either side as a keep alive. The data is arbitary (often timestamp)
|
||||
"""
|
||||
NAME = "PING"
|
||||
|
||||
|
||||
class NameCommand(Command):
|
||||
"""Sent by client to inform the server of the client's identity. The data
|
||||
is the name
|
||||
"""
|
||||
NAME = "NAME"
|
||||
|
||||
|
||||
class ReplicateCommand(Command):
|
||||
"""Sent by the client to subscribe to the stream.
|
||||
|
||||
Format::
|
||||
|
||||
REPLICATE <stream_name> <token>
|
||||
|
||||
Where <token> may be either:
|
||||
* a numeric stream_id to stream updates from
|
||||
* "NOW" to stream all subsequent updates.
|
||||
|
||||
The <stream_name> can be "ALL" to subscribe to all known streams, in which
|
||||
case the <token> must be set to "NOW", i.e.::
|
||||
|
||||
REPLICATE ALL NOW
|
||||
"""
|
||||
NAME = "REPLICATE"
|
||||
|
||||
def __init__(self, stream_name, token):
|
||||
self.stream_name = stream_name
|
||||
self.token = token
|
||||
|
||||
@classmethod
|
||||
def from_line(cls, line):
|
||||
stream_name, token = line.split(" ", 1)
|
||||
if token in ("NOW", "now"):
|
||||
token = "NOW"
|
||||
else:
|
||||
token = int(token)
|
||||
return cls(stream_name, token)
|
||||
|
||||
def to_line(self):
|
||||
return " ".join((self.stream_name, str(self.token),))
|
||||
|
||||
|
||||
class UserSyncCommand(Command):
|
||||
"""Sent by the client to inform the server that a user has started or
|
||||
stopped syncing. Used to calculate presence on the master.
|
||||
|
||||
Includes a timestamp of when the last user sync was.
|
||||
|
||||
Format::
|
||||
|
||||
USER_SYNC <user_id> <state> <last_sync_ms>
|
||||
|
||||
Where <state> is either "start" or "stop"
|
||||
"""
|
||||
NAME = "USER_SYNC"
|
||||
|
||||
def __init__(self, user_id, is_syncing, last_sync_ms):
|
||||
self.user_id = user_id
|
||||
self.is_syncing = is_syncing
|
||||
self.last_sync_ms = last_sync_ms
|
||||
|
||||
@classmethod
|
||||
def from_line(cls, line):
|
||||
user_id, state, last_sync_ms = line.split(" ", 2)
|
||||
|
||||
if state not in ("start", "end"):
|
||||
raise Exception("Invalid USER_SYNC state %r" % (state,))
|
||||
|
||||
return cls(user_id, state == "start", int(last_sync_ms))
|
||||
|
||||
def to_line(self):
|
||||
return " ".join((
|
||||
self.user_id, "start" if self.is_syncing else "end", str(self.last_sync_ms),
|
||||
))
|
||||
|
||||
|
||||
class FederationAckCommand(Command):
|
||||
"""Sent by the client when it has processed up to a given point in the
|
||||
federation stream. This allows the master to drop in-memory caches of the
|
||||
federation stream.
|
||||
|
||||
This must only be sent from one worker (i.e. the one sending federation)
|
||||
|
||||
Format::
|
||||
|
||||
FEDERATION_ACK <token>
|
||||
"""
|
||||
NAME = "FEDERATION_ACK"
|
||||
|
||||
def __init__(self, token):
|
||||
self.token = token
|
||||
|
||||
@classmethod
|
||||
def from_line(cls, line):
|
||||
return cls(int(line))
|
||||
|
||||
def to_line(self):
|
||||
return str(self.token)
|
||||
|
||||
|
||||
class SyncCommand(Command):
|
||||
"""Used for testing. The client protocol implementation allows waiting
|
||||
on a SYNC command with a specified data.
|
||||
"""
|
||||
NAME = "SYNC"
|
||||
|
||||
|
||||
class RemovePusherCommand(Command):
|
||||
"""Sent by the client to request the master remove the given pusher.
|
||||
|
||||
Format::
|
||||
|
||||
REMOVE_PUSHER <app_id> <push_key> <user_id>
|
||||
"""
|
||||
NAME = "REMOVE_PUSHER"
|
||||
|
||||
def __init__(self, app_id, push_key, user_id):
|
||||
self.user_id = user_id
|
||||
self.app_id = app_id
|
||||
self.push_key = push_key
|
||||
|
||||
@classmethod
|
||||
def from_line(cls, line):
|
||||
app_id, push_key, user_id = line.split(" ", 2)
|
||||
|
||||
return cls(app_id, push_key, user_id)
|
||||
|
||||
def to_line(self):
|
||||
return " ".join((self.app_id, self.push_key, self.user_id))
|
||||
|
||||
|
||||
class InvalidateCacheCommand(Command):
|
||||
"""Sent by the client to invalidate an upstream cache.
|
||||
|
||||
THIS IS NOT RELIABLE, AND SHOULD *NOT* BE USED ACCEPT FOR THINGS THAT ARE
|
||||
NOT DISASTROUS IF WE DROP ON THE FLOOR.
|
||||
|
||||
Mainly used to invalidate destination retry timing caches.
|
||||
|
||||
Format::
|
||||
|
||||
INVALIDATE_CACHE <cache_func> <keys_json>
|
||||
|
||||
Where <keys_json> is a json list.
|
||||
"""
|
||||
NAME = "INVALIDATE_CACHE"
|
||||
|
||||
def __init__(self, cache_func, keys):
|
||||
self.cache_func = cache_func
|
||||
self.keys = keys
|
||||
|
||||
@classmethod
|
||||
def from_line(cls, line):
|
||||
cache_func, keys_json = line.split(" ", 1)
|
||||
|
||||
return cls(cache_func, json.loads(keys_json))
|
||||
|
||||
def to_line(self):
|
||||
return " ".join((self.cache_func, json.dumps(self.keys)))
|
||||
|
||||
|
||||
# Map of command name to command type.
|
||||
COMMAND_MAP = {
|
||||
cmd.NAME: cmd
|
||||
for cmd in (
|
||||
ServerCommand,
|
||||
RdataCommand,
|
||||
PositionCommand,
|
||||
ErrorCommand,
|
||||
PingCommand,
|
||||
NameCommand,
|
||||
ReplicateCommand,
|
||||
UserSyncCommand,
|
||||
FederationAckCommand,
|
||||
SyncCommand,
|
||||
RemovePusherCommand,
|
||||
InvalidateCacheCommand,
|
||||
)
|
||||
}
|
||||
|
||||
# The commands the server is allowed to send
|
||||
VALID_SERVER_COMMANDS = (
|
||||
ServerCommand.NAME,
|
||||
RdataCommand.NAME,
|
||||
PositionCommand.NAME,
|
||||
ErrorCommand.NAME,
|
||||
PingCommand.NAME,
|
||||
SyncCommand.NAME,
|
||||
)
|
||||
|
||||
# The commands the client is allowed to send
|
||||
VALID_CLIENT_COMMANDS = (
|
||||
NameCommand.NAME,
|
||||
ReplicateCommand.NAME,
|
||||
PingCommand.NAME,
|
||||
UserSyncCommand.NAME,
|
||||
FederationAckCommand.NAME,
|
||||
RemovePusherCommand.NAME,
|
||||
InvalidateCacheCommand.NAME,
|
||||
ErrorCommand.NAME,
|
||||
)
|
604
synapse/replication/tcp/protocol.py
Normal file
604
synapse/replication/tcp/protocol.py
Normal file
|
@ -0,0 +1,604 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2017 Vector Creations Ltd
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
"""This module contains the implementation of both the client and server
|
||||
protocols.
|
||||
|
||||
The basic structure of the protocol is line based, where the initial word of
|
||||
each line specifies the command. The rest of the line is parsed based on the
|
||||
command. For example, the `RDATA` command is defined as::
|
||||
|
||||
RDATA <stream_name> <token> <row_json>
|
||||
|
||||
(Note that `<row_json>` may contains spaces, but cannot contain newlines.)
|
||||
|
||||
Blank lines are ignored.
|
||||
|
||||
# Example
|
||||
|
||||
An example iteraction is shown below. Each line is prefixed with '>' or '<' to
|
||||
indicate which side is sending, these are *not* included on the wire::
|
||||
|
||||
* connection established *
|
||||
> SERVER localhost:8823
|
||||
> PING 1490197665618
|
||||
< NAME synapse.app.appservice
|
||||
< PING 1490197665618
|
||||
< REPLICATE events 1
|
||||
< REPLICATE backfill 1
|
||||
< REPLICATE caches 1
|
||||
> POSITION events 1
|
||||
> POSITION backfill 1
|
||||
> POSITION caches 1
|
||||
> RDATA caches 2 ["get_user_by_id",["@01register-user:localhost:8823"],1490197670513]
|
||||
> RDATA events 14 ["$149019767112vOHxz:localhost:8823",
|
||||
"!AFDCvgApUmpdfVjIXm:localhost:8823","m.room.guest_access","",null]
|
||||
< PING 1490197675618
|
||||
> ERROR server stopping
|
||||
* connection closed by server *
|
||||
"""
|
||||
|
||||
from twisted.internet import defer
|
||||
from twisted.protocols.basic import LineOnlyReceiver
|
||||
|
||||
from commands import (
|
||||
COMMAND_MAP, VALID_CLIENT_COMMANDS, VALID_SERVER_COMMANDS,
|
||||
ErrorCommand, ServerCommand, RdataCommand, PositionCommand, PingCommand,
|
||||
NameCommand, ReplicateCommand, UserSyncCommand, SyncCommand,
|
||||
)
|
||||
from streams import STREAMS_MAP
|
||||
|
||||
from synapse.util.stringutils import random_string
|
||||
|
||||
import logging
|
||||
import synapse.metrics
|
||||
import struct
|
||||
import fcntl
|
||||
|
||||
|
||||
metrics = synapse.metrics.get_metrics_for(__name__)
|
||||
|
||||
inbound_commands_counter = metrics.register_counter(
|
||||
"inbound_commands", labels=["command", "name", "conn_id"],
|
||||
)
|
||||
outbound_commands_counter = metrics.register_counter(
|
||||
"outbound_commands", labels=["command", "name", "conn_id"],
|
||||
)
|
||||
|
||||
|
||||
# A list of all connected protocols. This allows us to send metrics about the
|
||||
# connections.
|
||||
connected_connections = []
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
PING_TIME = 5000
|
||||
|
||||
|
||||
class ConnectionStates(object):
|
||||
CONNECTING = "connecting"
|
||||
ESTABLISHED = "established"
|
||||
PAUSED = "paused"
|
||||
CLOSED = "closed"
|
||||
|
||||
|
||||
class BaseReplicationStreamProtocol(LineOnlyReceiver):
|
||||
"""Base replication protocol shared between client and server.
|
||||
|
||||
Reads lines (ignoring blank ones) and parses them into command classes,
|
||||
asserting that they are valid for the given direction, i.e. server commands
|
||||
are only sent by the server.
|
||||
|
||||
On receiving a new command it calls `on_<COMMAND_NAME>` with the parsed
|
||||
command.
|
||||
|
||||
It also sends `PING` periodically, and correctly times out remote connections
|
||||
(if they send a `PING` command)
|
||||
"""
|
||||
delimiter = b'\n'
|
||||
|
||||
VALID_INBOUND_COMMANDS = [] # Valid commands we expect to receive
|
||||
VALID_OUTBOUND_COMMANDS = [] # Valid commans we can send
|
||||
|
||||
max_line_buffer = 10000
|
||||
|
||||
def __init__(self, clock):
|
||||
self.clock = clock
|
||||
|
||||
self.last_received_command = self.clock.time_msec()
|
||||
self.last_sent_command = 0
|
||||
self.time_we_closed = None # When we requested the connection be closed
|
||||
|
||||
self.received_ping = False # Have we reecived a ping from the other side
|
||||
|
||||
self.state = ConnectionStates.CONNECTING
|
||||
|
||||
self.name = "anon" # The name sent by a client.
|
||||
self.conn_id = random_string(5) # To dedupe in case of name clashes.
|
||||
|
||||
# List of pending commands to send once we've established the connection
|
||||
self.pending_commands = []
|
||||
|
||||
# The LoopingCall for sending pings.
|
||||
self._send_ping_loop = None
|
||||
|
||||
def connectionMade(self):
|
||||
logger.info("[%s] Connection established", self.id())
|
||||
|
||||
self.state = ConnectionStates.ESTABLISHED
|
||||
|
||||
connected_connections.append(self) # Register connection for metrics
|
||||
|
||||
self.transport.registerProducer(self, True) # For the *Producing callbacks
|
||||
|
||||
self._send_pending_commands()
|
||||
|
||||
# Starts sending pings
|
||||
self._send_ping_loop = self.clock.looping_call(self.send_ping, 5000)
|
||||
|
||||
# Always send the initial PING so that the other side knows that they
|
||||
# can time us out.
|
||||
self.send_command(PingCommand(self.clock.time_msec()))
|
||||
|
||||
def send_ping(self):
|
||||
"""Periodically sends a ping and checks if we should close the connection
|
||||
due to the other side timing out.
|
||||
"""
|
||||
now = self.clock.time_msec()
|
||||
|
||||
if self.time_we_closed:
|
||||
if now - self.time_we_closed > PING_TIME * 3:
|
||||
logger.info(
|
||||
"[%s] Failed to close connection gracefully, aborting", self.id()
|
||||
)
|
||||
self.transport.abortConnection()
|
||||
else:
|
||||
if now - self.last_sent_command >= PING_TIME:
|
||||
self.send_command(PingCommand(now))
|
||||
|
||||
if self.received_ping and now - self.last_received_command > PING_TIME * 3:
|
||||
logger.info(
|
||||
"[%s] Connection hasn't received command in %r ms. Closing.",
|
||||
self.id(), now - self.last_received_command
|
||||
)
|
||||
self.send_error("ping timeout")
|
||||
|
||||
def lineReceived(self, line):
|
||||
"""Called when we've received a line
|
||||
"""
|
||||
if line.strip() == "":
|
||||
# Ignore blank lines
|
||||
return
|
||||
|
||||
line = line.decode("utf-8")
|
||||
cmd_name, rest_of_line = line.split(" ", 1)
|
||||
|
||||
if cmd_name not in self.VALID_INBOUND_COMMANDS:
|
||||
logger.error("[%s] invalid command %s", self.id(), cmd_name)
|
||||
self.send_error("invalid command: %s", cmd_name)
|
||||
return
|
||||
|
||||
self.last_received_command = self.clock.time_msec()
|
||||
|
||||
inbound_commands_counter.inc(cmd_name, self.name, self.conn_id)
|
||||
|
||||
cmd_cls = COMMAND_MAP[cmd_name]
|
||||
try:
|
||||
cmd = cmd_cls.from_line(rest_of_line)
|
||||
except Exception as e:
|
||||
logger.exception(
|
||||
"[%s] failed to parse line %r: %r", self.id(), cmd_name, rest_of_line
|
||||
)
|
||||
self.send_error(
|
||||
"failed to parse line for %r: %r (%r):" % (cmd_name, e, rest_of_line)
|
||||
)
|
||||
return
|
||||
|
||||
# Now lets try and call on_<CMD_NAME> function
|
||||
try:
|
||||
getattr(self, "on_%s" % (cmd_name,))(cmd)
|
||||
except Exception:
|
||||
logger.exception("[%s] Failed to handle line: %r", self.id(), line)
|
||||
|
||||
def close(self):
|
||||
self.time_we_closed = self.clock.time_msec()
|
||||
self.transport.loseConnection()
|
||||
self.on_connection_closed()
|
||||
|
||||
def send_error(self, error_string, *args):
|
||||
"""Send an error to remote and close the connection.
|
||||
"""
|
||||
self.send_command(ErrorCommand(error_string % args))
|
||||
self.close()
|
||||
|
||||
def send_command(self, cmd, do_buffer=True):
|
||||
"""Send a command if connection has been established.
|
||||
|
||||
Args:
|
||||
cmd (Command)
|
||||
do_buffer (bool): Whether to buffer the message or always attempt
|
||||
to send the command. This is mostly used to send an error
|
||||
message if we're about to close the connection due our buffers
|
||||
becoming full.
|
||||
"""
|
||||
if self.state == ConnectionStates.CLOSED:
|
||||
logger.info("[%s] Not sending, connection closed", self.id())
|
||||
return
|
||||
|
||||
if do_buffer and self.state != ConnectionStates.ESTABLISHED:
|
||||
self._queue_command(cmd)
|
||||
return
|
||||
|
||||
outbound_commands_counter.inc(cmd.NAME, self.name, self.conn_id)
|
||||
|
||||
string = "%s %s" % (cmd.NAME, cmd.to_line(),)
|
||||
if "\n" in string:
|
||||
raise Exception("Unexpected newline in command: %r", string)
|
||||
|
||||
self.sendLine(string.encode("utf-8"))
|
||||
|
||||
self.last_sent_command = self.clock.time_msec()
|
||||
|
||||
def _queue_command(self, cmd):
|
||||
"""Queue the command until the connection is ready to write to again.
|
||||
"""
|
||||
logger.info("[%s] Queing as conn %r, cmd: %r", self.id(), self.state, cmd)
|
||||
self.pending_commands.append(cmd)
|
||||
|
||||
if len(self.pending_commands) > self.max_line_buffer:
|
||||
# The other side is failing to keep up and out buffers are becoming
|
||||
# full, so lets close the connection.
|
||||
# XXX: should we squawk more loudly?
|
||||
logger.error("[%s] Remote failed to keep up", self.id())
|
||||
self.send_command(ErrorCommand("Failed to keep up"), do_buffer=False)
|
||||
self.close()
|
||||
|
||||
def _send_pending_commands(self):
|
||||
"""Send any queued commandes
|
||||
"""
|
||||
pending = self.pending_commands
|
||||
self.pending_commands = []
|
||||
for cmd in pending:
|
||||
self.send_command(cmd)
|
||||
|
||||
def on_PING(self, line):
|
||||
self.received_ping = True
|
||||
|
||||
def on_ERROR(self, cmd):
|
||||
logger.error("[%s] Remote reported error: %r", self.id(), cmd.data)
|
||||
|
||||
def pauseProducing(self):
|
||||
"""This is called when both the kernel send buffer and the twisted
|
||||
tcp connection send buffers have become full.
|
||||
|
||||
We don't actually have any control over those sizes, so we buffer some
|
||||
commands ourselves before knifing the connection due to the remote
|
||||
failing to keep up.
|
||||
"""
|
||||
logger.info("[%s] Pause producing", self.id())
|
||||
self.state = ConnectionStates.PAUSED
|
||||
|
||||
def resumeProducing(self):
|
||||
"""The remote has caught up after we started buffering!
|
||||
"""
|
||||
logger.info("[%s] Resume producing", self.id())
|
||||
self.state = ConnectionStates.ESTABLISHED
|
||||
self._send_pending_commands()
|
||||
|
||||
def stopProducing(self):
|
||||
"""We're never going to send any more data (normally because either
|
||||
we or the remote has closed the connection)
|
||||
"""
|
||||
logger.info("[%s] Stop producing", self.id())
|
||||
self.on_connection_closed()
|
||||
|
||||
def connectionLost(self, reason):
|
||||
logger.info("[%s] Replication connection closed: %r", self.id(), reason)
|
||||
|
||||
try:
|
||||
# Remove us from list of connections to be monitored
|
||||
connected_connections.remove(self)
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
# Stop the looping call sending pings.
|
||||
if self._send_ping_loop and self._send_ping_loop.running:
|
||||
self._send_ping_loop.stop()
|
||||
|
||||
self.on_connection_closed()
|
||||
|
||||
def on_connection_closed(self):
|
||||
logger.info("[%s] Connection was closed", self.id())
|
||||
|
||||
self.state = ConnectionStates.CLOSED
|
||||
self.pending_commands = []
|
||||
|
||||
if self.transport:
|
||||
self.transport.unregisterProducer()
|
||||
|
||||
def __str__(self):
|
||||
return "ReplicationConnection<name=%s,conn_id=%s,addr=%s>" % (
|
||||
self.name, self.conn_id, self.addr,
|
||||
)
|
||||
|
||||
def id(self):
|
||||
return "%s-%s" % (self.name, self.conn_id)
|
||||
|
||||
|
||||
class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
|
||||
VALID_INBOUND_COMMANDS = VALID_CLIENT_COMMANDS
|
||||
VALID_OUTBOUND_COMMANDS = VALID_SERVER_COMMANDS
|
||||
|
||||
def __init__(self, server_name, clock, streamer, addr):
|
||||
BaseReplicationStreamProtocol.__init__(self, clock) # Old style class
|
||||
|
||||
self.server_name = server_name
|
||||
self.streamer = streamer
|
||||
self.addr = addr
|
||||
|
||||
# The streams the client has subscribed to and is up to date with
|
||||
self.replication_streams = set()
|
||||
|
||||
# The streams the client is currently subscribing to.
|
||||
self.connecting_streams = set()
|
||||
|
||||
# Map from stream name to list of updates to send once we've finished
|
||||
# subscribing the client to the stream.
|
||||
self.pending_rdata = {}
|
||||
|
||||
def connectionMade(self):
|
||||
self.send_command(ServerCommand(self.server_name))
|
||||
BaseReplicationStreamProtocol.connectionMade(self)
|
||||
self.streamer.new_connection(self)
|
||||
|
||||
def on_NAME(self, cmd):
|
||||
self.name = cmd.data
|
||||
|
||||
def on_USER_SYNC(self, cmd):
|
||||
self.streamer.on_user_sync(
|
||||
self.conn_id, cmd.user_id, cmd.is_syncing, cmd.last_sync_ms,
|
||||
)
|
||||
|
||||
def on_REPLICATE(self, cmd):
|
||||
stream_name = cmd.stream_name
|
||||
token = cmd.token
|
||||
|
||||
if stream_name == "ALL":
|
||||
# Subscribe to all streams we're publishing to.
|
||||
for stream in self.streamer.streams_by_name.iterkeys():
|
||||
self.subscribe_to_stream(stream, token)
|
||||
else:
|
||||
self.subscribe_to_stream(stream_name, token)
|
||||
|
||||
def on_FEDERATION_ACK(self, cmd):
|
||||
self.streamer.federation_ack(cmd.token)
|
||||
|
||||
def on_REMOVE_PUSHER(self, cmd):
|
||||
self.streamer.on_remove_pusher(cmd.app_id, cmd.push_key, cmd.user_id)
|
||||
|
||||
def onINVALIDATE_CACHE(self, cmd):
|
||||
self.streamer.on_invalidate_cache(cmd.cache_func, cmd.keys)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def subscribe_to_stream(self, stream_name, token):
|
||||
"""Subscribe the remote to a streams.
|
||||
|
||||
This invloves checking if they've missed anything and sending those
|
||||
updates down if they have. During that time new updates for the stream
|
||||
are queued and sent once we've sent down any missed updates.
|
||||
"""
|
||||
self.replication_streams.discard(stream_name)
|
||||
self.connecting_streams.add(stream_name)
|
||||
|
||||
try:
|
||||
# Get missing updates
|
||||
updates, current_token = yield self.streamer.get_stream_updates(
|
||||
stream_name, token,
|
||||
)
|
||||
|
||||
# Send all the missing updates
|
||||
for update in updates:
|
||||
token, row = update[0], update[1]
|
||||
self.send_command(RdataCommand(stream_name, token, row))
|
||||
|
||||
# Now we can send any updates that came in while we were subscribing
|
||||
pending_rdata = self.pending_rdata.pop(stream_name, [])
|
||||
for token, update in pending_rdata:
|
||||
self.send_command(RdataCommand(stream_name, token, update))
|
||||
|
||||
# We send a POSITION command to ensure that they have an up to
|
||||
# date token (especially useful if we didn't send any updates
|
||||
# above)
|
||||
self.send_command(PositionCommand(stream_name, current_token))
|
||||
|
||||
# They're now fully subscribed
|
||||
self.replication_streams.add(stream_name)
|
||||
except Exception as e:
|
||||
logger.exception("[%s] Failed to handle REPLICATE command", self.id())
|
||||
self.send_error("failed to handle replicate: %r", e)
|
||||
finally:
|
||||
self.connecting_streams.discard(stream_name)
|
||||
|
||||
def stream_update(self, stream_name, token, data):
|
||||
"""Called when a new update is available to stream to clients.
|
||||
|
||||
We need to check if the client is interested in the stream or not
|
||||
"""
|
||||
if stream_name in self.replication_streams:
|
||||
# The client is subscribed to the stream
|
||||
self.send_command(RdataCommand(stream_name, token, data))
|
||||
elif stream_name in self.connecting_streams:
|
||||
# The client is being subscribed to the stream
|
||||
logger.info("[%s] Queuing RDATA %r %r", self.id(), stream_name, token)
|
||||
self.pending_rdata.setdefault(stream_name, []).append((token, data))
|
||||
else:
|
||||
# The client isn't subscribed
|
||||
logger.debug("[%s] Dropping RDATA %r %r", self.id(), stream_name, token)
|
||||
|
||||
def send_sync(self, data):
|
||||
self.send_command(SyncCommand(data))
|
||||
|
||||
def on_connection_closed(self):
|
||||
BaseReplicationStreamProtocol.on_connection_closed(self)
|
||||
logger.info("[%s] Replication connection closed", self.id())
|
||||
self.streamer.lost_connection(self)
|
||||
|
||||
|
||||
class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
|
||||
VALID_INBOUND_COMMANDS = VALID_SERVER_COMMANDS
|
||||
VALID_OUTBOUND_COMMANDS = VALID_CLIENT_COMMANDS
|
||||
|
||||
def __init__(self, client_name, server_name, clock, handler):
|
||||
BaseReplicationStreamProtocol.__init__(self, clock)
|
||||
|
||||
self.client_name = client_name
|
||||
self.server_name = server_name
|
||||
self.handler = handler
|
||||
|
||||
# Map of stream to batched updates. See RdataCommand for info on how
|
||||
# batching works.
|
||||
self.pending_batches = {}
|
||||
|
||||
def connectionMade(self):
|
||||
self.send_command(NameCommand(self.client_name))
|
||||
BaseReplicationStreamProtocol.connectionMade(self)
|
||||
|
||||
# Once we've connected subscribe to the necessary streams
|
||||
for stream_name, token in self.handler.get_streams_to_replicate().iteritems():
|
||||
self.replicate(stream_name, token)
|
||||
|
||||
# Tell the server if we have any users currently syncing (should only
|
||||
# happen on synchrotrons)
|
||||
currently_syncing = self.handler.get_currently_syncing_users()
|
||||
now = self.clock.time_msec()
|
||||
for user_id in currently_syncing:
|
||||
self.send_command(UserSyncCommand(user_id, True, now))
|
||||
|
||||
# We've now finished connecting to so inform the client handler
|
||||
self.handler.update_connection(self)
|
||||
|
||||
def on_SERVER(self, cmd):
|
||||
if cmd.data != self.server_name:
|
||||
logger.error("[%s] Connected to wrong remote: %r", self.id(), cmd.data)
|
||||
self.transport.abortConnection()
|
||||
|
||||
def on_RDATA(self, cmd):
|
||||
try:
|
||||
row = STREAMS_MAP[cmd.stream_name].ROW_TYPE(*cmd.row)
|
||||
except Exception:
|
||||
logger.exception(
|
||||
"[%s] Failed to parse RDATA: %r %r",
|
||||
self.id(), cmd.stream_name, cmd.row
|
||||
)
|
||||
raise
|
||||
|
||||
if cmd.token is None:
|
||||
# I.e. this is part of a batch of updates for this stream. Batch
|
||||
# until we get an update for the stream with a non None token
|
||||
self.pending_batches.setdefault(cmd.stream_name, []).append(row)
|
||||
else:
|
||||
# Check if this is the last of a batch of updates
|
||||
rows = self.pending_batches.pop(cmd.stream_name, [])
|
||||
rows.append(row)
|
||||
|
||||
self.handler.on_rdata(cmd.stream_name, cmd.token, rows)
|
||||
|
||||
def on_POSITION(self, cmd):
|
||||
self.handler.on_position(cmd.stream_name, cmd.token)
|
||||
|
||||
def on_SYNC(self, cmd):
|
||||
self.handler.on_sync(cmd.data)
|
||||
|
||||
def replicate(self, stream_name, token):
|
||||
"""Send the subscription request to the server
|
||||
"""
|
||||
if stream_name not in STREAMS_MAP:
|
||||
raise Exception("Invalid stream name %r" % (stream_name,))
|
||||
|
||||
logger.info(
|
||||
"[%s] Subscribing to replication stream: %r from %r",
|
||||
self.id(), stream_name, token
|
||||
)
|
||||
|
||||
self.send_command(ReplicateCommand(stream_name, token))
|
||||
|
||||
def on_connection_closed(self):
|
||||
BaseReplicationStreamProtocol.on_connection_closed(self)
|
||||
self.handler.update_connection(None)
|
||||
|
||||
|
||||
# The following simply registers metrics for the replication connections
|
||||
|
||||
metrics.register_callback(
|
||||
"pending_commands",
|
||||
lambda: {
|
||||
(p.name, p.conn_id): len(p.pending_commands)
|
||||
for p in connected_connections
|
||||
},
|
||||
labels=["name", "conn_id"],
|
||||
)
|
||||
|
||||
|
||||
def transport_buffer_size(protocol):
|
||||
if protocol.transport:
|
||||
size = len(protocol.transport.dataBuffer) + protocol.transport._tempDataLen
|
||||
return size
|
||||
return 0
|
||||
|
||||
|
||||
metrics.register_callback(
|
||||
"transport_send_buffer",
|
||||
lambda: {
|
||||
(p.name, p.conn_id): transport_buffer_size(p)
|
||||
for p in connected_connections
|
||||
},
|
||||
labels=["name", "conn_id"],
|
||||
)
|
||||
|
||||
|
||||
def transport_kernel_read_buffer_size(protocol, read=True):
|
||||
SIOCINQ = 0x541B
|
||||
SIOCOUTQ = 0x5411
|
||||
|
||||
if protocol.transport:
|
||||
fileno = protocol.transport.getHandle().fileno()
|
||||
if read:
|
||||
op = SIOCINQ
|
||||
else:
|
||||
op = SIOCOUTQ
|
||||
size = struct.unpack("I", fcntl.ioctl(fileno, op, '\0\0\0\0'))[0]
|
||||
return size
|
||||
return 0
|
||||
|
||||
|
||||
metrics.register_callback(
|
||||
"transport_kernel_send_buffer",
|
||||
lambda: {
|
||||
(p.name, p.conn_id): transport_kernel_read_buffer_size(p, False)
|
||||
for p in connected_connections
|
||||
},
|
||||
labels=["name", "conn_id"],
|
||||
)
|
||||
|
||||
|
||||
metrics.register_callback(
|
||||
"transport_kernel_read_buffer",
|
||||
lambda: {
|
||||
(p.name, p.conn_id): transport_kernel_read_buffer_size(p, True)
|
||||
for p in connected_connections
|
||||
},
|
||||
labels=["name", "conn_id"],
|
||||
)
|
290
synapse/replication/tcp/resource.py
Normal file
290
synapse/replication/tcp/resource.py
Normal file
|
@ -0,0 +1,290 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2017 Vector Creations Ltd
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
"""The server side of the replication stream.
|
||||
"""
|
||||
|
||||
from twisted.internet import defer, reactor
|
||||
from twisted.internet.protocol import Factory
|
||||
|
||||
from streams import STREAMS_MAP, FederationStream
|
||||
from protocol import ServerReplicationStreamProtocol
|
||||
|
||||
from synapse.util.metrics import Measure, measure_func
|
||||
|
||||
import logging
|
||||
import synapse.metrics
|
||||
|
||||
|
||||
metrics = synapse.metrics.get_metrics_for(__name__)
|
||||
stream_updates_counter = metrics.register_counter(
|
||||
"stream_updates", labels=["stream_name"]
|
||||
)
|
||||
user_sync_counter = metrics.register_counter("user_sync")
|
||||
federation_ack_counter = metrics.register_counter("federation_ack")
|
||||
remove_pusher_counter = metrics.register_counter("remove_pusher")
|
||||
invalidate_cache_counter = metrics.register_counter("invalidate_cache")
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ReplicationStreamProtocolFactory(Factory):
|
||||
"""Factory for new replication connections.
|
||||
"""
|
||||
def __init__(self, hs):
|
||||
self.streamer = ReplicationStreamer(hs)
|
||||
self.clock = hs.get_clock()
|
||||
self.server_name = hs.config.server_name
|
||||
|
||||
def buildProtocol(self, addr):
|
||||
return ServerReplicationStreamProtocol(
|
||||
self.server_name,
|
||||
self.clock,
|
||||
self.streamer,
|
||||
addr
|
||||
)
|
||||
|
||||
|
||||
class ReplicationStreamer(object):
|
||||
"""Handles replication connections.
|
||||
|
||||
This needs to be poked when new replication data may be available. When new
|
||||
data is available it will propagate to all connected clients.
|
||||
"""
|
||||
|
||||
def __init__(self, hs):
|
||||
self.store = hs.get_datastore()
|
||||
self.presence_handler = hs.get_presence_handler()
|
||||
self.clock = hs.get_clock()
|
||||
|
||||
# Current connections.
|
||||
self.connections = []
|
||||
|
||||
metrics.register_callback("total_connections", lambda: len(self.connections))
|
||||
|
||||
# List of streams that clients can subscribe to.
|
||||
# We only support federation stream if federation sending hase been
|
||||
# disabled on the master.
|
||||
self.streams = [
|
||||
stream(hs) for stream in STREAMS_MAP.itervalues()
|
||||
if stream != FederationStream or not hs.config.send_federation
|
||||
]
|
||||
|
||||
self.streams_by_name = {stream.NAME: stream for stream in self.streams}
|
||||
|
||||
metrics.register_callback(
|
||||
"connections_per_stream",
|
||||
lambda: {
|
||||
(stream_name,): len([
|
||||
conn for conn in self.connections
|
||||
if stream_name in conn.replication_streams
|
||||
])
|
||||
for stream_name in self.streams_by_name
|
||||
},
|
||||
labels=["stream_name"],
|
||||
)
|
||||
|
||||
self.federation_sender = None
|
||||
if not hs.config.send_federation:
|
||||
self.federation_sender = hs.get_federation_sender()
|
||||
|
||||
hs.get_notifier().add_replication_callback(self.on_notifier_poke)
|
||||
|
||||
# Keeps track of whether we are currently checking for updates
|
||||
self.is_looping = False
|
||||
self.pending_updates = False
|
||||
|
||||
reactor.addSystemEventTrigger("before", "shutdown", self.on_shutdown)
|
||||
|
||||
def on_shutdown(self):
|
||||
# close all connections on shutdown
|
||||
for conn in self.connections:
|
||||
conn.send_error("server shutting down")
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def on_notifier_poke(self):
|
||||
"""Checks if there is actually any new data and sends it to the
|
||||
connections if there are.
|
||||
|
||||
This should get called each time new data is available, even if it
|
||||
is currently being executed, so that nothing gets missed
|
||||
"""
|
||||
if not self.connections:
|
||||
# Don't bother if nothing is listening. We still need to advance
|
||||
# the stream tokens otherwise they'll fall beihind forever
|
||||
for stream in self.streams:
|
||||
stream.advance_current_token()
|
||||
return
|
||||
|
||||
# If we're in the process of checking for new updates, mark that fact
|
||||
# and return
|
||||
if self.is_looping:
|
||||
logger.debug("Noitifier poke loop already running")
|
||||
self.pending_updates = True
|
||||
return
|
||||
|
||||
self.pending_updates = True
|
||||
self.is_looping = True
|
||||
|
||||
try:
|
||||
# Keep looping while there have been pokes about potential updates.
|
||||
# This protects against the race where a stream we already checked
|
||||
# gets an update while we're handling other streams.
|
||||
while self.pending_updates:
|
||||
self.pending_updates = False
|
||||
|
||||
with Measure(self.clock, "repl.stream.get_updates"):
|
||||
# First we tell the streams that they should update their
|
||||
# current tokens.
|
||||
for stream in self.streams:
|
||||
stream.advance_current_token()
|
||||
|
||||
for stream in self.streams:
|
||||
if stream.last_token == stream.upto_token:
|
||||
continue
|
||||
|
||||
logger.debug(
|
||||
"Getting stream: %s: %s -> %s",
|
||||
stream.NAME, stream.last_token, stream.upto_token
|
||||
)
|
||||
updates, current_token = yield stream.get_updates()
|
||||
|
||||
logger.debug(
|
||||
"Sending %d updates to %d connections",
|
||||
len(updates), len(self.connections),
|
||||
)
|
||||
|
||||
if updates:
|
||||
logger.info(
|
||||
"Streaming: %s -> %s", stream.NAME, updates[-1][0]
|
||||
)
|
||||
stream_updates_counter.inc_by(len(updates), stream.NAME)
|
||||
|
||||
# Some streams return multiple rows with the same stream IDs,
|
||||
# we need to make sure they get sent out in batches. We do
|
||||
# this by setting the current token to all but the last of
|
||||
# a series of updates with the same token to have a None
|
||||
# token. See RdataCommand for more details.
|
||||
batched_updates = _batch_updates(updates)
|
||||
|
||||
for conn in self.connections:
|
||||
for token, row in batched_updates:
|
||||
try:
|
||||
conn.stream_update(stream.NAME, token, row)
|
||||
except Exception:
|
||||
logger.exception("Failed to replicate")
|
||||
|
||||
logger.debug("No more pending updates, breaking poke loop")
|
||||
finally:
|
||||
self.pending_updates = False
|
||||
self.is_looping = False
|
||||
|
||||
@measure_func("repl.get_stream_updates")
|
||||
def get_stream_updates(self, stream_name, token):
|
||||
"""For a given stream get all updates since token. This is called when
|
||||
a client first subscribes to a stream.
|
||||
"""
|
||||
stream = self.streams_by_name.get(stream_name, None)
|
||||
if not stream:
|
||||
raise Exception("unknown stream %s", stream_name)
|
||||
|
||||
return stream.get_updates_since(token)
|
||||
|
||||
@measure_func("repl.federation_ack")
|
||||
def federation_ack(self, token):
|
||||
"""We've received an ack for federation stream from a client.
|
||||
"""
|
||||
federation_ack_counter.inc()
|
||||
if self.federation_sender:
|
||||
self.federation_sender.federation_ack(token)
|
||||
|
||||
@measure_func("repl.on_user_sync")
|
||||
def on_user_sync(self, conn_id, user_id, is_syncing, last_sync_ms):
|
||||
"""A client has started/stopped syncing on a worker.
|
||||
"""
|
||||
user_sync_counter.inc()
|
||||
self.presence_handler.update_external_syncs_row(
|
||||
conn_id, user_id, is_syncing, last_sync_ms,
|
||||
)
|
||||
|
||||
@measure_func("repl.on_remove_pusher")
|
||||
@defer.inlineCallbacks
|
||||
def on_remove_pusher(self, app_id, push_key, user_id):
|
||||
"""A client has asked us to remove a pusher
|
||||
"""
|
||||
remove_pusher_counter.inc()
|
||||
yield self.store.delete_pusher_by_app_id_pushkey_user_id(
|
||||
app_id=app_id, pushkey=push_key, user_id=user_id
|
||||
)
|
||||
|
||||
self.notifier.on_new_replication_data()
|
||||
|
||||
@measure_func("repl.on_invalidate_cache")
|
||||
def on_invalidate_cache(self, cache_func, keys):
|
||||
"""The client has asked us to invalidate a cache
|
||||
"""
|
||||
invalidate_cache_counter.inc()
|
||||
getattr(self.store, cache_func).invalidate(tuple(keys))
|
||||
|
||||
def send_sync_to_all_connections(self, data):
|
||||
"""Sends a SYNC command to all clients.
|
||||
|
||||
Used in tests.
|
||||
"""
|
||||
for conn in self.connections:
|
||||
conn.send_sync(data)
|
||||
|
||||
def new_connection(self, connection):
|
||||
"""A new client connection has been established
|
||||
"""
|
||||
self.connections.append(connection)
|
||||
|
||||
def lost_connection(self, connection):
|
||||
"""A client connection has been lost
|
||||
"""
|
||||
try:
|
||||
self.connections.remove(connection)
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
# We need to tell the presence handler that the connection has been
|
||||
# lost so that it can handle any ongoing syncs on that connection.
|
||||
self.presence_handler.update_external_syncs_clear(connection.conn_id)
|
||||
|
||||
|
||||
def _batch_updates(updates):
|
||||
"""Takes a list of updates of form [(token, row)] and sets the token to
|
||||
None for all rows where the next row has the same token. This is used to
|
||||
implement batching.
|
||||
|
||||
For example:
|
||||
|
||||
[(1, _), (1, _), (2, _), (3, _), (3, _)]
|
||||
|
||||
becomes:
|
||||
|
||||
[(None, _), (1, _), (2, _), (None, _), (3, _)]
|
||||
"""
|
||||
if not updates:
|
||||
return []
|
||||
|
||||
new_updates = []
|
||||
for i, update in enumerate(updates[:-1]):
|
||||
if update[0] == updates[i + 1][0]:
|
||||
new_updates.append((None, update[1]))
|
||||
else:
|
||||
new_updates.append(update)
|
||||
|
||||
new_updates.append(updates[-1])
|
||||
return new_updates
|
409
synapse/replication/tcp/streams.py
Normal file
409
synapse/replication/tcp/streams.py
Normal file
|
@ -0,0 +1,409 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2017 Vector Creations Ltd
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""Defines all the valid streams that clients can subscribe to, and the format
|
||||
of the rows returned by each stream.
|
||||
|
||||
Each stream is defined by the following information:
|
||||
|
||||
stream name: The name of the stream
|
||||
row type: The type that is used to serialise/deserialse the row
|
||||
current_token: The function that returns the current token for the stream
|
||||
update_function: The function that returns a list of updates between two tokens
|
||||
"""
|
||||
|
||||
from twisted.internet import defer
|
||||
from collections import namedtuple
|
||||
|
||||
import logging
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
MAX_EVENTS_BEHIND = 10000
|
||||
|
||||
|
||||
EventStreamRow = namedtuple("EventStreamRow",
|
||||
("event_id", "room_id", "type", "state_key", "redacts"))
|
||||
BackfillStreamRow = namedtuple("BackfillStreamRow",
|
||||
("event_id", "room_id", "type", "state_key", "redacts"))
|
||||
PresenceStreamRow = namedtuple("PresenceStreamRow",
|
||||
("user_id", "state", "last_active_ts",
|
||||
"last_federation_update_ts", "last_user_sync_ts",
|
||||
"status_msg", "currently_active"))
|
||||
TypingStreamRow = namedtuple("TypingStreamRow",
|
||||
("room_id", "user_ids"))
|
||||
ReceiptsStreamRow = namedtuple("ReceiptsStreamRow",
|
||||
("room_id", "receipt_type", "user_id", "event_id",
|
||||
"data"))
|
||||
PushRulesStreamRow = namedtuple("PushRulesStreamRow", ("user_id",))
|
||||
PushersStreamRow = namedtuple("PushersStreamRow",
|
||||
("user_id", "app_id", "pushkey", "deleted",))
|
||||
CachesStreamRow = namedtuple("CachesStreamRow",
|
||||
("cache_func", "keys", "invalidation_ts",))
|
||||
PublicRoomsStreamRow = namedtuple("PublicRoomsStreamRow",
|
||||
("room_id", "visibility", "appservice_id",
|
||||
"network_id",))
|
||||
DeviceListsStreamRow = namedtuple("DeviceListsStreamRow", ("user_id", "destination",))
|
||||
ToDeviceStreamRow = namedtuple("ToDeviceStreamRow", ("entity",))
|
||||
FederationStreamRow = namedtuple("FederationStreamRow", ("type", "data",))
|
||||
TagAccountDataStreamRow = namedtuple("TagAccountDataStreamRow",
|
||||
("user_id", "room_id", "data"))
|
||||
AccountDataStreamRow = namedtuple("AccountDataStream",
|
||||
("user_id", "room_id", "data_type", "data"))
|
||||
|
||||
|
||||
class Stream(object):
|
||||
"""Base class for the streams.
|
||||
|
||||
Provides a `get_updates()` function that returns new updates since the last
|
||||
time it was called up until the point `advance_current_token` was called.
|
||||
"""
|
||||
NAME = None # The name of the stream
|
||||
ROW_TYPE = None # The type of the row
|
||||
_LIMITED = True # Whether the update function takes a limit
|
||||
|
||||
def __init__(self, hs):
|
||||
# The token from which we last asked for updates
|
||||
self.last_token = self.current_token()
|
||||
|
||||
# The token that we will get updates up to
|
||||
self.upto_token = self.current_token()
|
||||
|
||||
def advance_current_token(self):
|
||||
"""Updates `upto_token` to "now", which updates up until which point
|
||||
get_updates[_since] will fetch rows till.
|
||||
"""
|
||||
self.upto_token = self.current_token()
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_updates(self):
|
||||
"""Gets all updates since the last time this function was called (or
|
||||
since the stream was constructed if it hadn't been called before),
|
||||
until the `upto_token`
|
||||
|
||||
Returns:
|
||||
(list(ROW_TYPE), int): list of updates plus the token used as an
|
||||
upper bound of the updates (i.e. the "current token")
|
||||
"""
|
||||
updates, current_token = yield self.get_updates_since(self.last_token)
|
||||
self.last_token = current_token
|
||||
|
||||
defer.returnValue((updates, current_token))
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_updates_since(self, from_token):
|
||||
"""Like get_updates except allows specifying from when we should
|
||||
stream updates
|
||||
|
||||
Returns:
|
||||
(list(ROW_TYPE), int): list of updates plus the token used as an
|
||||
upper bound of the updates (i.e. the "current token")
|
||||
"""
|
||||
if from_token in ("NOW", "now"):
|
||||
defer.returnValue(([], self.upto_token))
|
||||
|
||||
current_token = self.upto_token
|
||||
|
||||
from_token = int(from_token)
|
||||
|
||||
if from_token == current_token:
|
||||
defer.returnValue(([], current_token))
|
||||
|
||||
if self._LIMITED:
|
||||
rows = yield self.update_function(
|
||||
from_token, current_token,
|
||||
limit=MAX_EVENTS_BEHIND + 1,
|
||||
)
|
||||
|
||||
if len(rows) >= MAX_EVENTS_BEHIND:
|
||||
raise Exception("stream %s has fallen behined" % (self.NAME))
|
||||
else:
|
||||
rows = yield self.update_function(
|
||||
from_token, current_token,
|
||||
)
|
||||
|
||||
updates = [(row[0], self.ROW_TYPE(*row[1:])) for row in rows]
|
||||
|
||||
defer.returnValue((updates, current_token))
|
||||
|
||||
def current_token(self):
|
||||
"""Gets the current token of the underlying streams. Should be provided
|
||||
by the sub classes
|
||||
|
||||
Returns:
|
||||
int
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
def update_function(self, from_token, current_token, limit=None):
|
||||
"""Get updates between from_token and to_token. If Stream._LIMITED is
|
||||
True then limit is provided, otherwise it's not.
|
||||
|
||||
Returns:
|
||||
Deferred(list(tuple)): the first entry in the tuple is the token for
|
||||
that update, and the rest of the tuple gets used to construct
|
||||
a ``ROW_TYPE`` instance
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
|
||||
class EventsStream(Stream):
|
||||
"""We received a new event, or an event went from being an outlier to not
|
||||
"""
|
||||
NAME = "events"
|
||||
ROW_TYPE = EventStreamRow
|
||||
|
||||
def __init__(self, hs):
|
||||
store = hs.get_datastore()
|
||||
self.current_token = store.get_current_events_token
|
||||
self.update_function = store.get_all_new_forward_event_rows
|
||||
|
||||
super(EventsStream, self).__init__(hs)
|
||||
|
||||
|
||||
class BackfillStream(Stream):
|
||||
"""We fetched some old events and either we had never seen that event before
|
||||
or it went from being an outlier to not.
|
||||
"""
|
||||
NAME = "backfill"
|
||||
ROW_TYPE = BackfillStreamRow
|
||||
|
||||
def __init__(self, hs):
|
||||
store = hs.get_datastore()
|
||||
self.current_token = store.get_current_backfill_token
|
||||
self.update_function = store.get_all_new_backfill_event_rows
|
||||
|
||||
super(BackfillStream, self).__init__(hs)
|
||||
|
||||
|
||||
class PresenceStream(Stream):
|
||||
NAME = "presence"
|
||||
_LIMITED = False
|
||||
ROW_TYPE = PresenceStreamRow
|
||||
|
||||
def __init__(self, hs):
|
||||
store = hs.get_datastore()
|
||||
presence_handler = hs.get_presence_handler()
|
||||
|
||||
self.current_token = store.get_current_presence_token
|
||||
self.update_function = presence_handler.get_all_presence_updates
|
||||
|
||||
super(PresenceStream, self).__init__(hs)
|
||||
|
||||
|
||||
class TypingStream(Stream):
|
||||
NAME = "typing"
|
||||
_LIMITED = False
|
||||
ROW_TYPE = TypingStreamRow
|
||||
|
||||
def __init__(self, hs):
|
||||
typing_handler = hs.get_typing_handler()
|
||||
|
||||
self.current_token = typing_handler.get_current_token
|
||||
self.update_function = typing_handler.get_all_typing_updates
|
||||
|
||||
super(TypingStream, self).__init__(hs)
|
||||
|
||||
|
||||
class ReceiptsStream(Stream):
|
||||
NAME = "receipts"
|
||||
ROW_TYPE = ReceiptsStreamRow
|
||||
|
||||
def __init__(self, hs):
|
||||
store = hs.get_datastore()
|
||||
|
||||
self.current_token = store.get_max_receipt_stream_id
|
||||
self.update_function = store.get_all_updated_receipts
|
||||
|
||||
super(ReceiptsStream, self).__init__(hs)
|
||||
|
||||
|
||||
class PushRulesStream(Stream):
|
||||
"""A user has changed their push rules
|
||||
"""
|
||||
NAME = "push_rules"
|
||||
ROW_TYPE = PushRulesStreamRow
|
||||
|
||||
def __init__(self, hs):
|
||||
self.store = hs.get_datastore()
|
||||
super(PushRulesStream, self).__init__(hs)
|
||||
|
||||
def current_token(self):
|
||||
push_rules_token, _ = self.store.get_push_rules_stream_token()
|
||||
return push_rules_token
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def update_function(self, from_token, to_token, limit):
|
||||
rows = yield self.store.get_all_push_rule_updates(from_token, to_token, limit)
|
||||
defer.returnValue([(row[0], row[2]) for row in rows])
|
||||
|
||||
|
||||
class PushersStream(Stream):
|
||||
"""A user has added/changed/removed a pusher
|
||||
"""
|
||||
NAME = "pushers"
|
||||
ROW_TYPE = PushersStreamRow
|
||||
|
||||
def __init__(self, hs):
|
||||
store = hs.get_datastore()
|
||||
|
||||
self.current_token = store.get_pushers_stream_token
|
||||
self.update_function = store.get_all_updated_pushers_rows
|
||||
|
||||
super(PushersStream, self).__init__(hs)
|
||||
|
||||
|
||||
class CachesStream(Stream):
|
||||
"""A cache was invalidated on the master and no other stream would invalidate
|
||||
the cache on the workers
|
||||
"""
|
||||
NAME = "caches"
|
||||
ROW_TYPE = CachesStreamRow
|
||||
|
||||
def __init__(self, hs):
|
||||
store = hs.get_datastore()
|
||||
|
||||
self.current_token = store.get_cache_stream_token
|
||||
self.update_function = store.get_all_updated_caches
|
||||
|
||||
super(CachesStream, self).__init__(hs)
|
||||
|
||||
|
||||
class PublicRoomsStream(Stream):
|
||||
"""The public rooms list changed
|
||||
"""
|
||||
NAME = "public_rooms"
|
||||
ROW_TYPE = PublicRoomsStreamRow
|
||||
|
||||
def __init__(self, hs):
|
||||
store = hs.get_datastore()
|
||||
|
||||
self.current_token = store.get_current_public_room_stream_id
|
||||
self.update_function = store.get_all_new_public_rooms
|
||||
|
||||
super(PublicRoomsStream, self).__init__(hs)
|
||||
|
||||
|
||||
class DeviceListsStream(Stream):
|
||||
"""Someone added/changed/removed a device
|
||||
"""
|
||||
NAME = "device_lists"
|
||||
_LIMITED = False
|
||||
ROW_TYPE = DeviceListsStreamRow
|
||||
|
||||
def __init__(self, hs):
|
||||
store = hs.get_datastore()
|
||||
|
||||
self.current_token = store.get_device_stream_token
|
||||
self.update_function = store.get_all_device_list_changes_for_remotes
|
||||
|
||||
super(DeviceListsStream, self).__init__(hs)
|
||||
|
||||
|
||||
class ToDeviceStream(Stream):
|
||||
"""New to_device messages for a client
|
||||
"""
|
||||
NAME = "to_device"
|
||||
ROW_TYPE = ToDeviceStreamRow
|
||||
|
||||
def __init__(self, hs):
|
||||
store = hs.get_datastore()
|
||||
|
||||
self.current_token = store.get_to_device_stream_token
|
||||
self.update_function = store.get_all_new_device_messages
|
||||
|
||||
super(ToDeviceStream, self).__init__(hs)
|
||||
|
||||
|
||||
class FederationStream(Stream):
|
||||
"""Data to be sent over federation. Only available when master has federation
|
||||
sending disabled.
|
||||
"""
|
||||
NAME = "federation"
|
||||
ROW_TYPE = FederationStreamRow
|
||||
|
||||
def __init__(self, hs):
|
||||
federation_sender = hs.get_federation_sender()
|
||||
|
||||
self.current_token = federation_sender.get_current_token
|
||||
self.update_function = federation_sender.get_replication_rows
|
||||
|
||||
super(FederationStream, self).__init__(hs)
|
||||
|
||||
|
||||
class TagAccountDataStream(Stream):
|
||||
"""Someone added/removed a tag for a room
|
||||
"""
|
||||
NAME = "tag_account_data"
|
||||
ROW_TYPE = TagAccountDataStreamRow
|
||||
|
||||
def __init__(self, hs):
|
||||
store = hs.get_datastore()
|
||||
|
||||
self.current_token = store.get_max_account_data_stream_id
|
||||
self.update_function = store.get_all_updated_tags
|
||||
|
||||
super(TagAccountDataStream, self).__init__(hs)
|
||||
|
||||
|
||||
class AccountDataStream(Stream):
|
||||
"""Global or per room account data was changed
|
||||
"""
|
||||
NAME = "account_data"
|
||||
ROW_TYPE = AccountDataStreamRow
|
||||
|
||||
def __init__(self, hs):
|
||||
self.store = hs.get_datastore()
|
||||
|
||||
self.current_token = self.store.get_max_account_data_stream_id
|
||||
|
||||
super(AccountDataStream, self).__init__(hs)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def update_function(self, from_token, to_token, limit):
|
||||
global_results, room_results = yield self.store.get_all_updated_account_data(
|
||||
from_token, from_token, to_token, limit
|
||||
)
|
||||
|
||||
results = list(room_results)
|
||||
results.extend(
|
||||
(stream_id, user_id, None, account_data_type, content,)
|
||||
for stream_id, user_id, account_data_type, content in global_results
|
||||
)
|
||||
|
||||
defer.returnValue(results)
|
||||
|
||||
|
||||
STREAMS_MAP = {
|
||||
stream.NAME: stream
|
||||
for stream in (
|
||||
EventsStream,
|
||||
BackfillStream,
|
||||
PresenceStream,
|
||||
TypingStream,
|
||||
ReceiptsStream,
|
||||
PushRulesStream,
|
||||
PushersStream,
|
||||
CachesStream,
|
||||
PublicRoomsStream,
|
||||
DeviceListsStream,
|
||||
ToDeviceStream,
|
||||
FederationStream,
|
||||
TagAccountDataStream,
|
||||
AccountDataStream,
|
||||
)
|
||||
}
|
|
@ -28,7 +28,10 @@ class VoipRestServlet(ClientV1RestServlet):
|
|||
|
||||
@defer.inlineCallbacks
|
||||
def on_GET(self, request):
|
||||
requester = yield self.auth.get_user_by_req(request)
|
||||
requester = yield self.auth.get_user_by_req(
|
||||
request,
|
||||
self.hs.config.turn_allow_guests
|
||||
)
|
||||
|
||||
turnUris = self.hs.config.turn_uris
|
||||
turnSecret = self.hs.config.turn_shared_secret
|
||||
|
|
|
@ -36,7 +36,7 @@ class ThirdPartyProtocolsServlet(RestServlet):
|
|||
|
||||
@defer.inlineCallbacks
|
||||
def on_GET(self, request):
|
||||
yield self.auth.get_user_by_req(request)
|
||||
yield self.auth.get_user_by_req(request, allow_guest=True)
|
||||
|
||||
protocols = yield self.appservice_handler.get_3pe_protocols()
|
||||
defer.returnValue((200, protocols))
|
||||
|
@ -54,7 +54,7 @@ class ThirdPartyProtocolServlet(RestServlet):
|
|||
|
||||
@defer.inlineCallbacks
|
||||
def on_GET(self, request, protocol):
|
||||
yield self.auth.get_user_by_req(request)
|
||||
yield self.auth.get_user_by_req(request, allow_guest=True)
|
||||
|
||||
protocols = yield self.appservice_handler.get_3pe_protocols(
|
||||
only_protocol=protocol,
|
||||
|
@ -77,7 +77,7 @@ class ThirdPartyUserServlet(RestServlet):
|
|||
|
||||
@defer.inlineCallbacks
|
||||
def on_GET(self, request, protocol):
|
||||
yield self.auth.get_user_by_req(request)
|
||||
yield self.auth.get_user_by_req(request, allow_guest=True)
|
||||
|
||||
fields = request.args
|
||||
fields.pop("access_token", None)
|
||||
|
@ -101,7 +101,7 @@ class ThirdPartyLocationServlet(RestServlet):
|
|||
|
||||
@defer.inlineCallbacks
|
||||
def on_GET(self, request, protocol):
|
||||
yield self.auth.get_user_by_req(request)
|
||||
yield self.auth.get_user_by_req(request, allow_guest=True)
|
||||
|
||||
fields = request.args
|
||||
fields.pop("access_token", None)
|
||||
|
|
|
@ -533,7 +533,7 @@ class DeviceStore(SQLBaseStore):
|
|||
rows = yield self._execute("get_user_whose_devices_changed", None, sql, from_key)
|
||||
defer.returnValue(set(row[0] for row in rows))
|
||||
|
||||
def get_all_device_list_changes_for_remotes(self, from_key):
|
||||
def get_all_device_list_changes_for_remotes(self, from_key, to_key):
|
||||
"""Return a list of `(stream_id, user_id, destination)` which is the
|
||||
combined list of changes to devices, and which destinations need to be
|
||||
poked. `destination` may be None if no destinations need to be poked.
|
||||
|
@ -541,11 +541,11 @@ class DeviceStore(SQLBaseStore):
|
|||
sql = """
|
||||
SELECT stream_id, user_id, destination FROM device_lists_stream
|
||||
LEFT JOIN device_lists_outbound_pokes USING (stream_id, user_id, device_id)
|
||||
WHERE stream_id > ?
|
||||
WHERE ? < stream_id AND stream_id <= ?
|
||||
"""
|
||||
return self._execute(
|
||||
"get_all_device_list_changes_for_remotes", None,
|
||||
sql, from_key,
|
||||
sql, from_key, to_key
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
|
|
|
@ -1770,6 +1770,94 @@ class EventsStore(SQLBaseStore):
|
|||
"""The current minimum token that backfilled events have reached"""
|
||||
return -self._backfill_id_gen.get_current_token()
|
||||
|
||||
def get_current_events_token(self):
|
||||
"""The current maximum token that events have reached"""
|
||||
return self._stream_id_gen.get_current_token()
|
||||
|
||||
def get_all_new_forward_event_rows(self, last_id, current_id, limit):
|
||||
if last_id == current_id:
|
||||
return defer.succeed([])
|
||||
|
||||
def get_all_new_forward_event_rows(txn):
|
||||
sql = (
|
||||
"SELECT e.stream_ordering, e.event_id, e.room_id, e.type,"
|
||||
" state_key, redacts"
|
||||
" FROM events AS e"
|
||||
" LEFT JOIN redactions USING (event_id)"
|
||||
" LEFT JOIN state_events USING (event_id)"
|
||||
" WHERE ? < stream_ordering AND stream_ordering <= ?"
|
||||
" ORDER BY stream_ordering ASC"
|
||||
" LIMIT ?"
|
||||
)
|
||||
txn.execute(sql, (last_id, current_id, limit))
|
||||
new_event_updates = txn.fetchall()
|
||||
|
||||
if len(new_event_updates) == limit:
|
||||
upper_bound = new_event_updates[-1][0]
|
||||
else:
|
||||
upper_bound = current_id
|
||||
|
||||
sql = (
|
||||
"SELECT event_stream_ordering, e.event_id, e.room_id, e.type,"
|
||||
" state_key, redacts"
|
||||
" FROM events AS e"
|
||||
" INNER JOIN ex_outlier_stream USING (event_id)"
|
||||
" LEFT JOIN redactions USING (event_id)"
|
||||
" LEFT JOIN state_events USING (event_id)"
|
||||
" WHERE ? < event_stream_ordering"
|
||||
" AND event_stream_ordering <= ?"
|
||||
" ORDER BY event_stream_ordering DESC"
|
||||
)
|
||||
txn.execute(sql, (last_id, upper_bound))
|
||||
new_event_updates.extend(txn)
|
||||
|
||||
return new_event_updates
|
||||
return self.runInteraction(
|
||||
"get_all_new_forward_event_rows", get_all_new_forward_event_rows
|
||||
)
|
||||
|
||||
def get_all_new_backfill_event_rows(self, last_id, current_id, limit):
|
||||
if last_id == current_id:
|
||||
return defer.succeed([])
|
||||
|
||||
def get_all_new_backfill_event_rows(txn):
|
||||
sql = (
|
||||
"SELECT -e.stream_ordering, e.event_id, e.room_id, e.type,"
|
||||
" state_key, redacts"
|
||||
" FROM events AS e"
|
||||
" LEFT JOIN redactions USING (event_id)"
|
||||
" LEFT JOIN state_events USING (event_id)"
|
||||
" WHERE ? > stream_ordering AND stream_ordering >= ?"
|
||||
" ORDER BY stream_ordering ASC"
|
||||
" LIMIT ?"
|
||||
)
|
||||
txn.execute(sql, (-last_id, -current_id, limit))
|
||||
new_event_updates = txn.fetchall()
|
||||
|
||||
if len(new_event_updates) == limit:
|
||||
upper_bound = new_event_updates[-1][0]
|
||||
else:
|
||||
upper_bound = current_id
|
||||
|
||||
sql = (
|
||||
"SELECT -event_stream_ordering, e.event_id, e.room_id, e.type,"
|
||||
" state_key, redacts"
|
||||
" FROM events AS e"
|
||||
" INNER JOIN ex_outlier_stream USING (event_id)"
|
||||
" LEFT JOIN redactions USING (event_id)"
|
||||
" LEFT JOIN state_events USING (event_id)"
|
||||
" WHERE ? > event_stream_ordering"
|
||||
" AND event_stream_ordering >= ?"
|
||||
" ORDER BY event_stream_ordering DESC"
|
||||
)
|
||||
txn.execute(sql, (-last_id, -upper_bound))
|
||||
new_event_updates.extend(txn.fetchall())
|
||||
|
||||
return new_event_updates
|
||||
return self.runInteraction(
|
||||
"get_all_new_backfill_event_rows", get_all_new_backfill_event_rows
|
||||
)
|
||||
|
||||
@cached(num_args=5, max_entries=10)
|
||||
def get_all_new_events(self, last_backfill_id, last_forward_id,
|
||||
current_backfill_id, current_forward_id, limit):
|
||||
|
|
|
@ -135,6 +135,48 @@ class PusherStore(SQLBaseStore):
|
|||
"get_all_updated_pushers", get_all_updated_pushers_txn
|
||||
)
|
||||
|
||||
def get_all_updated_pushers_rows(self, last_id, current_id, limit):
|
||||
"""Get all the pushers that have changed between the given tokens.
|
||||
|
||||
Returns:
|
||||
Deferred(list(tuple)): each tuple consists of:
|
||||
stream_id (str)
|
||||
user_id (str)
|
||||
app_id (str)
|
||||
pushkey (str)
|
||||
was_deleted (bool): whether the pusher was added/updated (False)
|
||||
or deleted (True)
|
||||
"""
|
||||
|
||||
if last_id == current_id:
|
||||
return defer.succeed([])
|
||||
|
||||
def get_all_updated_pushers_rows_txn(txn):
|
||||
sql = (
|
||||
"SELECT id, user_name, app_id, pushkey"
|
||||
" FROM pushers"
|
||||
" WHERE ? < id AND id <= ?"
|
||||
" ORDER BY id ASC LIMIT ?"
|
||||
)
|
||||
txn.execute(sql, (last_id, current_id, limit))
|
||||
results = [list(row) + [False] for row in txn]
|
||||
|
||||
sql = (
|
||||
"SELECT stream_id, user_id, app_id, pushkey"
|
||||
" FROM deleted_pushers"
|
||||
" WHERE ? < stream_id AND stream_id <= ?"
|
||||
" ORDER BY stream_id ASC LIMIT ?"
|
||||
)
|
||||
txn.execute(sql, (last_id, current_id, limit))
|
||||
|
||||
results.extend(list(row) + [True] for row in txn)
|
||||
results.sort() # Sort so that they're ordered by stream id
|
||||
|
||||
return results
|
||||
return self.runInteraction(
|
||||
"get_all_updated_pushers_rows", get_all_updated_pushers_rows_txn
|
||||
)
|
||||
|
||||
@cachedInlineCallbacks(num_args=1, max_entries=15000)
|
||||
def get_if_user_has_pusher(self, user_id):
|
||||
# This only exists for the cachedList decorator
|
||||
|
|
|
@ -334,12 +334,8 @@ def preserve_fn(f):
|
|||
LoggingContext.set_current_context(LoggingContext.sentinel)
|
||||
return result
|
||||
|
||||
# XXX: why is this here rather than inside g? surely we want to preserve
|
||||
# the context from the time the function was called, not when it was
|
||||
# wrapped?
|
||||
current = LoggingContext.current_context()
|
||||
|
||||
def g(*args, **kwargs):
|
||||
current = LoggingContext.current_context()
|
||||
res = f(*args, **kwargs)
|
||||
if isinstance(res, defer.Deferred) and not res.called:
|
||||
# The function will have reset the context before returning, so
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue