diff --git a/changelog.d/15354.misc b/changelog.d/15354.misc new file mode 100644 index 000000000..862444edf --- /dev/null +++ b/changelog.d/15354.misc @@ -0,0 +1 @@ +Add some clarification to the doc/comments regarding TCP replication. diff --git a/docs/tcp_replication.md b/docs/tcp_replication.md index 15df949de..083cda841 100644 --- a/docs/tcp_replication.md +++ b/docs/tcp_replication.md @@ -25,7 +25,7 @@ position of all streams. The server then periodically sends `RDATA` commands which have the format `RDATA `, where the format of `` is defined by the individual streams. The `` is the name of the Synapse process that generated the data -(usually "master"). +(usually "master"). We expect an RDATA for every row in the DB. Error reporting happens by either the client or server sending an ERROR command, and usually the connection will be closed. @@ -107,7 +107,7 @@ reconnect, following the steps above. If the server sends messages faster than the client can consume them the server will first buffer a (fairly large) number of commands and then disconnect the client. This ensures that we don't queue up an unbounded -number of commands in memory and gives us a potential oppurtunity to +number of commands in memory and gives us a potential opportunity to squawk loudly. When/if the client recovers it can reconnect to the server and ask for missed messages. @@ -122,7 +122,7 @@ since these include tokens which can be used to restart the stream on connection errors. The client should keep track of the token in the last RDATA command -received for each stream so that on reconneciton it can start streaming +received for each stream so that on reconnection it can start streaming from the correct place. Note: not all RDATA have valid tokens due to batching. See `RdataCommand` for more details. @@ -188,7 +188,8 @@ client (C): Two positions are included, the "new" position and the last position sent respectively. This allows servers to tell instances that the positions have advanced but no data has been written, without clients needlessly checking to see if they - have missed any updates. + have missed any updates. Instances will only fetch stuff if there is a gap between + their current position and the given last position. #### ERROR (S, C) diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py index 56a5c2191..a7248d7b2 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py @@ -14,36 +14,7 @@ """This module contains the implementation of both the client and server protocols. -The basic structure of the protocol is line based, where the initial word of -each line specifies the command. The rest of the line is parsed based on the -command. For example, the `RDATA` command is defined as:: - - RDATA - -(Note that `` may contains spaces, but cannot contain newlines.) - -Blank lines are ignored. - -# Example - -An example iteraction is shown below. Each line is prefixed with '>' or '<' to -indicate which side is sending, these are *not* included on the wire:: - - * connection established * - > SERVER localhost:8823 - > PING 1490197665618 - < NAME synapse.app.appservice - < PING 1490197665618 - < REPLICATE - > POSITION events 1 - > POSITION backfill 1 - > POSITION caches 1 - > RDATA caches 2 ["get_user_by_id",["@01register-user:localhost:8823"],1490197670513] - > RDATA events 14 ["ev", ["$149019767112vOHxz:localhost:8823", - "!AFDCvgApUmpdfVjIXm:localhost:8823","m.room.guest_access","",null]] - < PING 1490197675618 - > ERROR server stopping - * connection closed by server * +An explanation of this protocol is available in docs/tcp_replication.md """ import fcntl import logging diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py index a4bdb48c0..c6088a0f9 100644 --- a/synapse/replication/tcp/streams/_base.py +++ b/synapse/replication/tcp/streams/_base.py @@ -152,8 +152,8 @@ class Stream: Returns: A triplet `(updates, new_last_token, limited)`, where `updates` is a list of `(token, row)` entries, `new_last_token` is the new - position in stream, and `limited` is whether there are more updates - to fetch. + position in stream (ie the highest token returned in the updates), + and `limited` is whether there are more updates to fetch. """ current_token = self.current_token(self.local_instance_name) updates, current_token, limited = await self.get_updates_since(