Fix replication metrics when using redis (#7325)

This commit is contained in:
Erik Johnston 2020-04-22 16:26:19 +01:00 committed by GitHub
parent f16beaa969
commit 841c581c40
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 30 additions and 37 deletions

1
changelog.d/7325.feature Normal file
View File

@ -0,0 +1 @@
Add support for running replication over Redis when using workers.

View File

@ -50,10 +50,7 @@ import abc
import fcntl import fcntl
import logging import logging
import struct import struct
from collections import defaultdict from typing import TYPE_CHECKING, List
from typing import TYPE_CHECKING, DefaultDict, List
from six import iteritems
from prometheus_client import Counter from prometheus_client import Counter
@ -86,6 +83,18 @@ connection_close_counter = Counter(
"synapse_replication_tcp_protocol_close_reason", "", ["reason_type"] "synapse_replication_tcp_protocol_close_reason", "", ["reason_type"]
) )
tcp_inbound_commands_counter = Counter(
"synapse_replication_tcp_protocol_inbound_commands",
"Number of commands received from replication, by command and name of process connected to",
["command", "name"],
)
tcp_outbound_commands_counter = Counter(
"synapse_replication_tcp_protocol_outbound_commands",
"Number of commands sent to replication, by command and name of process connected to",
["command", "name"],
)
# A list of all connected protocols. This allows us to send metrics about the # A list of all connected protocols. This allows us to send metrics about the
# connections. # connections.
connected_connections = [] connected_connections = []
@ -151,9 +160,6 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
# The LoopingCall for sending pings. # The LoopingCall for sending pings.
self._send_ping_loop = None self._send_ping_loop = None
self.inbound_commands_counter = defaultdict(int) # type: DefaultDict[str, int]
self.outbound_commands_counter = defaultdict(int) # type: DefaultDict[str, int]
def connectionMade(self): def connectionMade(self):
logger.info("[%s] Connection established", self.id()) logger.info("[%s] Connection established", self.id())
@ -224,9 +230,7 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
self.last_received_command = self.clock.time_msec() self.last_received_command = self.clock.time_msec()
self.inbound_commands_counter[cmd.NAME] = ( tcp_inbound_commands_counter.labels(cmd.NAME, self.name).inc()
self.inbound_commands_counter[cmd.NAME] + 1
)
# Now let's try and call on_<CMD_NAME> function # Now let's try and call on_<CMD_NAME> function
run_as_background_process( run_as_background_process(
@ -292,9 +296,8 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
self._queue_command(cmd) self._queue_command(cmd)
return return
self.outbound_commands_counter[cmd.NAME] = ( tcp_outbound_commands_counter.labels(cmd.NAME, self.name).inc()
self.outbound_commands_counter[cmd.NAME] + 1
)
string = "%s %s" % (cmd.NAME, cmd.to_line()) string = "%s %s" % (cmd.NAME, cmd.to_line())
if "\n" in string: if "\n" in string:
raise Exception("Unexpected newline in command: %r", string) raise Exception("Unexpected newline in command: %r", string)
@ -546,26 +549,3 @@ tcp_transport_kernel_read_buffer = LaterGauge(
for p in connected_connections for p in connected_connections
}, },
) )
tcp_inbound_commands = LaterGauge(
"synapse_replication_tcp_protocol_inbound_commands",
"",
["command", "name"],
lambda: {
(k, p.name): count
for p in connected_connections
for k, count in iteritems(p.inbound_commands_counter)
},
)
tcp_outbound_commands = LaterGauge(
"synapse_replication_tcp_protocol_outbound_commands",
"",
["command", "name"],
lambda: {
(k, p.name): count
for p in connected_connections
for k, count in iteritems(p.outbound_commands_counter)
},
)

View File

@ -25,7 +25,11 @@ from synapse.replication.tcp.commands import (
ReplicateCommand, ReplicateCommand,
parse_command_from_line, parse_command_from_line,
) )
from synapse.replication.tcp.protocol import AbstractConnection from synapse.replication.tcp.protocol import (
AbstractConnection,
tcp_inbound_commands_counter,
tcp_outbound_commands_counter,
)
if TYPE_CHECKING: if TYPE_CHECKING:
from synapse.replication.tcp.handler import ReplicationCommandHandler from synapse.replication.tcp.handler import ReplicationCommandHandler
@ -79,6 +83,10 @@ class RedisSubscriber(txredisapi.SubscriberProtocol, AbstractConnection):
) )
return return
# We use "redis" as the name here as we don't have 1:1 connections to
# remote instances.
tcp_inbound_commands_counter.labels(cmd.NAME, "redis").inc()
# Now let's try and call on_<CMD_NAME> function # Now let's try and call on_<CMD_NAME> function
run_as_background_process( run_as_background_process(
"replication-" + cmd.get_logcontext_id(), self.handle_command, cmd "replication-" + cmd.get_logcontext_id(), self.handle_command, cmd
@ -126,6 +134,10 @@ class RedisSubscriber(txredisapi.SubscriberProtocol, AbstractConnection):
encoded_string = string.encode("utf-8") encoded_string = string.encode("utf-8")
# We use "redis" as the name here as we don't have 1:1 connections to
# remote instances.
tcp_outbound_commands_counter.labels(cmd.NAME, "redis").inc()
async def _send(): async def _send():
with PreserveLoggingContext(): with PreserveLoggingContext():
# Note that we use the other connection as we can't send # Note that we use the other connection as we can't send