For in memory streams when fetching updates on workers we need to query the source of the stream, which currently is hard coded to be master. This PR threads through the source instance we received via `POSITION` through to the update function in each stream, which can then be passed to the replication client for in memory streams.
We move the processing of typing and federation replication traffic into their handlers so that `Stream.current_token()` points to a valid token. This allows us to remove `get_streams_to_replicate()` and `stream_positions()`.
This is primarily for allowing us to send those commands from workers, but for now simply allows us to ignore echoed RDATA/POSITION commands that we sent (we get echoes of sent commands when using redis). Currently we log a WARNING on the master process every time we receive an echoed RDATA.
For direct TCP connections we need the master to relay REMOTE_SERVER_UP
commands to the other connections so that all instances get notified
about it. The old implementation just relayed to all connections,
assuming that sending back to the original sender of the command was
safe. This is not true for redis, where commands sent get echoed back to
the sender, which was causing master to effectively infinite loop
sending and then re-receiving REMOTE_SERVER_UP commands that it sent.
The fix is to ensure that we only relay to *other* connections and not
to the connection we received the notification from.
Fixes#7334.
* Factor out functions for injecting events into database
I want to add some more flexibility to the tools for injecting events into the
database, and I don't want to clutter up HomeserverTestCase with them, so let's
factor them out to a new file.
* Rework TestReplicationDataHandler
This wasn't very easy to work with: the mock wrapping was largely superfluous,
and it's useful to be able to inspect the received rows, and clear out the
received list.
* Fix AssertionErrors being thrown by EventsStream
Part of the problem was that there was an off-by-one error in the assertion,
but also the limit logic was too simple. Fix it all up and add some tests.
Figuring out how to correctly limit updates from this stream without dropping
entries is far more complicated than just counting the number of rows being
returned. We need to consider each query separately and, if any one query hits
the limit, truncate the results from the others.
I think this also fixes some potentially long-standing bugs where events or
state changes could get missed if we hit the limit on either query.
Long story short: if we're handling presence on the current worker, we shouldn't be sending USER_SYNC commands over replication.
In an attempt to figure out what is going on here, I ended up refactoring some bits of the presencehandler code, so the first 4 commits here are non-functional refactors to move this code slightly closer to sanity. (There's still plenty to do here :/). Suggest reviewing individual commits.
Fixes (I hope) #7257.
Other parts of the code (such as the StreamChangeCache) assume that there will
not be multiple changes with the same stream id.
This code was introduced in #7024, and I hope this fixes#7206.
The general idea here is to get rid of the type: ignore annotations on all of the current_token and update_function assignments, which would have caught #7290.
After a bit of experimentation, it seems like the least-awful way to do this is to pass the offending functions in as parameters to the Stream constructor. Unfortunately that means that the concrete implementations no longer have the same constructor signature as Stream itself, which means that it gets hard to correctly annotate STREAMS_MAP.
I've also introduced a couple of new types, to take out some duplication.
Some of the query functions return generators rather than lists, so we can't
index into the result. Happily we already have a copy of the results.
(think this was introduced in #7024)
`REPLICATE` is now a valid command, and it's nice if you can issue it from the
console without remembering to call it `REPLICATE ` with a trailing space.
Separate `SimpleCommand` from `Command`, so that things which don't want to use
the `data` property don't have to, and thus fix the warnings PyCharm was giving
me about not calling `__init__` in the base class.
The aim here is to move the command handling out of the TCP protocol classes and to also merge the client and server command handling (so that we can reuse them for redis protocol). This PR simply moves the client paths to the new `ReplicationCommandHandler`, a future PR will move the server paths too.
This broke in a recent PR (#7024) and is no longer useful due to all
replication clients implicitly subscribing to all streams, so let's
just remove it.
* Remove `conn_id` usage for UserSyncCommand.
Each tcp replication connection is assigned a "conn_id", which is used
to give an ID to a remotely connected worker. In a redis world, there
will no longer be a one to one mapping between connection and instance,
so instead we need to replace such usages with an ID generated by the
remote instances and included in the replicaiton commands.
This really only effects UserSyncCommand.
* Add CLEAR_USER_SYNCS command that is sent on shutdown.
This should help with the case where a synchrotron gets restarted
gracefully, rather than rely on 5 minute timeout.
This changes the replication protocol so that the server does not send down `RDATA` for rows that happened before the client connected. Instead, the server will send a `POSITION` and clients then query the database (or master out of band) to get up to date.
* Add 'device_lists_outbound_pokes' as extra table.
This makes sure we check all the relevant tables to get the current max
stream ID.
Currently not doing so isn't problematic as the max stream ID in
`device_lists_outbound_pokes` is the same as in `device_lists_stream`,
however that will change.
* Change device lists stream to have one row per id.
This will make it possible to process the streams more incrementally,
avoiding having to process large chunks at once.
* Change device list replication to match new semantics.
Instead of sending down batches of user ID/host tuples, send down a row
per entity (user ID or host).
* Newsfile
* Remove handling of multiple rows per ID
* Fix worker handling
* Comments from review
This is a bit fiddly because it all has to be done on one fell swoop:
* Wherever we create a new event, pass in the room version (and check it matches the format version)
* When we prune an event, use the room version of the unpruned event to create the pruned version.
* When we pass an event over the replication protocol, pass the room version over alongside it, and use it when deserialising the event again.
This makes sure we check all the relevant tables to get the current max
stream ID.
Currently not doing so isn't problematic as the max stream ID in
`device_lists_outbound_pokes` is the same as in `device_lists_stream`,
however that will change.
When we get an invite over federation, store the room version in the rooms table.
The general idea here is that, when we pull the invite out again, we'll want to know what room_version it belongs to (so that we can later redact it if need be). So we need to store it somewhere...
* Port synapse.replication.tcp to async/await
* Newsfile
* Correctly document type of on_<FOO> functions as async
* Don't be overenthusiastic with the asyncing....
Currently we rely on `current_state_events` to figure out what rooms a
user was in and their last membership event in there. However, if the
server leaves the room then the table may be cleaned up and that
information is lost. So lets add a table that separately holds that
information.
When the `/keys/query` API is hit on client_reader worker Synapse may
decide that it needs to resync some remote deivces. Usually this happens
on master, and then gets cached. However, that fails on workers and so
it falls back to fetching devices from remotes directly, which may in
turn fail if the remote is down.
Python will return a tuple whether there are parentheses around the returned values or not.
I'm just sick of my editor complaining about this all over the place :)
Propagate opentracing contexts across workers
Also includes some Convenience modifications to opentracing for servlets, notably:
- Add boolean to skip the whitelisting check on inject
extract methods. - useful when injecting into carriers
locally. Otherwise we'd always have to include our
own servername and whitelist our servername
- start_active_span_from_request instead of header
- Add boolean to decide whether to extract context
from a request to a servlet
--------
- Fix a regression introduced in v1.2.0rc1 which led to incorrect labels on some prometheus metrics. ([\#5734](https://github.com/matrix-org/synapse/issues/5734))
-----BEGIN PGP SIGNATURE-----
iQJHBAABCgAxFiEEgQG31Z317NrSMt0QiISIDS7+X/QFAl04Ur0THGFuZHJld0Bh
bW9yZ2FuLnh5egAKCRCIhIgNLv5f9F4oD/0TY6S/SEd2uAmzor64ojmbX5BOwPzf
j/wzUTrfvuf40EvkNPDpnejNZSvy/ysbaGQaQusv0SQKlV3xrvdn4RuMvnOWVWck
kBsO+lvzOaUTR0KHDxN4y9F5eI2NdPbub4847PPVzyqSIHAd+kolxXS8kSBBhwpL
yfaICWV/AOy5L7xN+JZ9IQpnegVAvUj5DmgXzDHd6VdeiHDVJuARaBgrR5uCkwVS
ZoLRqZ95XV/qiguMAUvPOwyEqht2mwO64989MswP16YYm8oMkB5QA6I5nYnACsTP
qk9YcN/oNvEfQXUhttku6MxK1/4yUMPUhEoDBDH7ebc0440QDtWN+IHTdA6oPVZB
IuStL9YGY16m7Ltx37ZUA4URfNMiSeLHo3zKc/mCAcwxN4HyOjJewtxbG5zKQAOZ
SMs8UcDwGR4zL1hnt8ZDNYtWwfzJBQIdGjoHvjXJEY7/1csTv2lmAwewFTXiqSAr
30GW5ews94kotqBK53zZT6V0F5gHNqgGHniOz1ZpqLLxYLqO3LSAGe97CrqlWUdX
GkhA9tZyweknociD9fyyBmKdcFJ4mL4a+oGI5CMnSMph8UvCY8Y5XMb1T+iYEABI
tA9G3mBvgkLPj+5V+8QggNkBafSigW2Q4FX7enGsDmiiskZOtfeKrAcVkapD4ooi
3I7IW5aetZr2IQ==
=+JBn
-----END PGP SIGNATURE-----
Merge tag 'v1.2.0rc2' into develop
Bugfixes
--------
- Fix a regression introduced in v1.2.0rc1 which led to incorrect labels on some prometheus metrics. ([\#5734](https://github.com/matrix-org/synapse/issues/5734))
* Fix servlet metric names
Co-Authored-By: Richard van der Hoff <1389908+richvdh@users.noreply.github.com>
* Remove redundant check
* Cover all return paths
Nothing uses this now, so we can remove the dead code, and clean up the
API.
Since we're changing the shape of the return value anyway, we take the
opportunity to give the method a better name.
This has never been documented, and I'm not sure it's ever been used outside
sytest.
It's quite a lot of poorly-maintained code, so I'd like to get rid of it.
For now I haven't removed the database table; I suggest we leave that for a
future clearout.
Hopefully this time we really will fix#4422.
We need to make sure that the cache on
`get_rooms_for_user_with_stream_ordering` is invalidated *before* the
SyncHandler is notified for the new events, and we can now do so reliably via
the `events` stream.
`__str__` depended on `self.addr`, which was absent from
ClientReplicationStreamProtocol, so attempting to call str on such an object
would raise an exception.
We can calculate the peer addr from the transport, so there is no need for addr
anyway.
* Rate-limiting for registration
* Add unit test for registration rate limiting
* Add config parameters for rate limiting on auth endpoints
* Doc
* Fix doc of rate limiting function
Co-Authored-By: babolivier <contact@brendanabolivier.com>
* Incorporate review
* Fix config parsing
* Fix linting errors
* Set default config for auth rate limiting
* Fix tests
* Add changelog
* Advance reactor instead of mocked clock
* Move parameters to registration specific config and give them more sensible default values
* Remove unused config options
* Don't mock the rate limiter un MAU tests
* Rename _register_with_store into register_with_store
* Make CI happy
* Remove unused import
* Update sample config
* Fix ratelimiting test for py2
* Add non-guest test
If the client failed to process incoming commands during the initial set
up of the replication connection it would immediately disconnect and
reconnect, resulting in a tightloop.
This can happen, for example, when subscribing to a stream that has a
row that is too long in the backlog.
The fix here is to not consider the connection successfully set up until
the client has succesfully subscribed and caught up with the streams.
This ensures that the retry logic timers aren't reset until then,
meaning that if an error does happen during start up the client will
continue backing off before retrying again.
Currently whenever the current state changes in a room invalidate a lot
of caches, which cause *a lot* of traffic over replication. Instead,
lets batch up all those invalidations and send a single poke down
the replication streams.
Hopefully this will reduce load on the master process by substantially
reducing traffic.
This allows registration to be handled by a worker, though the actual
write to the database still happens on master.
Note: due to the in-memory session map all registration requests must be
handled by the same worker.
* Fix replication for room v3
We were not correctly quoting the path fragments over http replication,
which meant that it exploded when the event IDs had a slash in them
* Newsfile
Run the handlers for replication commands as background processes. This should
improve the visibility in our metrics, and reduce the number of "running db
transaction from sentinel context" warnings.
Ideally it means converting the things that fire off deferreds into the night
into things that actually return a Deferred when they are done. I've made a bit
of a stab at this, but it will probably be leaky.
This is the first tranche of support for room versioning. It includes:
* setting the default room version in the config file
* new room_version param on the createRoom API
* storing the version of newly-created rooms in the m.room.create event
* fishing the version of existing rooms out of the m.room.create event
This code brings the SimpleHttpClient into line with the
MatrixFederationHttpClient by having it raise HttpResponseExceptions when a
request fails (rather than trying to parse for matrix errors and maybe raising
MatrixCodeMessageException).
Then, whenever we were checking for MatrixCodeMessageException and turning them
into SynapseErrors, we now need to check for HttpResponseExceptions and call
to_synapse_error.
on_notifier_poke no longer runs synchonously, so we have to do a different hack
to make sure that the replication data has been sent. Let's actually listen for
its arrival.
Adds a `.wrap` method to ResponseCache which wraps up the boilerplate of a
(get, set) pair, and then use it throughout the codebase.
This will be largely non-functional, but does include the following functional
changes:
* federation_server.on_context_state_request: drops use of _server_linearizer
which looked redundant and could cause incorrect cache misses by yielding
between the get and the set.
* RoomListHandler.get_remote_public_room_list(): fixes logcontext leaks
* the wrap function includes some logging. I'm hoping this won't be too noisy
on production.
* Split state group persist into seperate storage func
* Add per database engine code for state group id gen
* Move store_state_group to StateReadStore
This allows other workers to use it, and so resolve state.
* Hook up store_state_group
* Fix tests
* Rename _store_mult_state_groups_txn
* Rename StateGroupReadStore
* Remove redundant _have_persisted_state_group_txn
* Update comments
* Comment compute_event_context
* Set start val for state_group_id_seq
... otherwise we try to recreate old state groups
* Update comments
* Don't store state for outliers
* Update comment
* Update docstring as state groups are ints
... so that we don't need to secretly gut-wrench it for use in the slaved
stores. I haven't done the other stores yet, but we should. I'm tired of the
workers breaking every time we tweak the stores because I forgot to gut-wrench
the right method.
fixes https://github.com/matrix-org/synapse/issues/2655.
Add db_conn parameters to the `__init__` methods of the *Store classes, so that
they are all consistent, which makes the multiple inheritance work correctly
(and so that we can later extract mixins which can be used in the slavedstores)
This is mainly done by moving the calculation of where to send presence
updates from the presence handler to the transaction queue, so we only
need to send the presence event (and not the destinations) across the
replication connection. Before we were duplicating by sending the full
state across once per destination.
Otherwise the streams don't advance and steadily fall behind, so when a
worker does connect either a) they'll be streamed lots of old updates or
b) the connection will fail as the streams are too far behind.