replacing portions

This commit is contained in:
Amber Brown 2018-05-21 19:47:37 -05:00
parent c60e0d5e02
commit df9f72d9e5
23 changed files with 270 additions and 416 deletions

View file

@ -60,19 +60,19 @@ from .commands import (
)
from .streams import STREAMS_MAP
from synapse.metrics import LaterGauge
from synapse.util.stringutils import random_string
from synapse.metrics.metric import CounterMetric
from prometheus_client import Counter
from collections import defaultdict
import logging
import synapse.metrics
import struct
import fcntl
metrics = synapse.metrics.get_metrics_for(__name__)
connection_close_counter = metrics.register_counter(
"close_reason", labels=["reason_type"],
connection_close_counter = Counter(
"synapse_replication_tcp_protocol_close_reason", "", ["reason_type"],
)
@ -136,12 +136,8 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
# The LoopingCall for sending pings.
self._send_ping_loop = None
self.inbound_commands_counter = CounterMetric(
"inbound_commands", labels=["command"],
)
self.outbound_commands_counter = CounterMetric(
"outbound_commands", labels=["command"],
)
self.inbound_commands_counter = defaultdict(int)
self.outbound_commands_counter = defaultdict(int)
def connectionMade(self):
logger.info("[%s] Connection established", self.id())
@ -201,7 +197,7 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
self.last_received_command = self.clock.time_msec()
self.inbound_commands_counter.inc(cmd_name)
self.inbound_commands_counter[cmd_name] = self.inbound_commands_counter[cmd_name] + 1
cmd_cls = COMMAND_MAP[cmd_name]
try:
@ -251,8 +247,7 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
self._queue_command(cmd)
return
self.outbound_commands_counter.inc(cmd.NAME)
self.outbound_commands_counter[cmd.NAME] = self.outbound_commands_counter[cmd.NAME] + 1
string = "%s %s" % (cmd.NAME, cmd.to_line(),)
if "\n" in string:
raise Exception("Unexpected newline in command: %r", string)
@ -317,9 +312,9 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
def connectionLost(self, reason):
logger.info("[%s] Replication connection closed: %r", self.id(), reason)
if isinstance(reason, Failure):
connection_close_counter.inc(reason.type.__name__)
connection_close_counter.labels(reason.type.__name__).inc()
else:
connection_close_counter.inc(reason.__class__.__name__)
connection_close_counter.labels(reason.__class__.__name__).inc()
try:
# Remove us from list of connections to be monitored
@ -566,14 +561,12 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
# The following simply registers metrics for the replication connections
metrics.register_callback(
"pending_commands",
pending_commands = LaterGauge(
"pending_commands", "", ["name", "conn_id"],
lambda: {
(p.name, p.conn_id): len(p.pending_commands)
for p in connected_connections
},
labels=["name", "conn_id"],
)
})
def transport_buffer_size(protocol):
@ -583,14 +576,12 @@ def transport_buffer_size(protocol):
return 0
metrics.register_callback(
"transport_send_buffer",
transport_send_buffer = LaterGauge(
"synapse_replication_tcp_transport_send_buffer", "", ["name", "conn_id"],
lambda: {
(p.name, p.conn_id): transport_buffer_size(p)
for p in connected_connections
},
labels=["name", "conn_id"],
)
})
def transport_kernel_read_buffer_size(protocol, read=True):
@ -608,48 +599,37 @@ def transport_kernel_read_buffer_size(protocol, read=True):
return 0
metrics.register_callback(
"transport_kernel_send_buffer",
tcp_transport_kernel_send_buffer = LaterGauge(
"synapse_replication_tcp_transport_kernel_send_buffer", "", ["name", "conn_id"],
lambda: {
(p.name, p.conn_id): transport_kernel_read_buffer_size(p, False)
for p in connected_connections
},
labels=["name", "conn_id"],
)
})
metrics.register_callback(
"transport_kernel_read_buffer",
tcp_transport_kernel_read_buffer = LaterGauge(
"synapse_replication_tcp_transport_kernel_read_buffer", "", ["name", "conn_id"],
lambda: {
(p.name, p.conn_id): transport_kernel_read_buffer_size(p, True)
for p in connected_connections
},
labels=["name", "conn_id"],
)
})
metrics.register_callback(
"inbound_commands",
tcp_inbound_commands = LaterGauge(
"synapse_replication_tcp_inbound_commands", "", ["command", "name", "conn_id"],
lambda: {
(k[0], p.name, p.conn_id): count
for p in connected_connections
for k, count in p.inbound_commands_counter.counts.iteritems()
},
labels=["command", "name", "conn_id"],
)
for k, count in p.inbound_commands_counter.items()
})
metrics.register_callback(
"outbound_commands",
tcp_outbound_commands = LaterGauge(
"synapse_replication_tcp_outbound_commands", "", ["command", "name", "conn_id"],
lambda: {
(k[0], p.name, p.conn_id): count
for p in connected_connections
for k, count in p.outbound_commands_counter.counts.iteritems()
},
labels=["command", "name", "conn_id"],
)
for k, count in p.outbound_commands_counter.items()
})
# number of updates received for each RDATA stream
inbound_rdata_count = metrics.register_counter(
"inbound_rdata_count",
labels=["stream_name"],
)
inbound_rdata_count = Counter("synapse_replication_tcp_inbound_rdata_count", "", ["stream_name"])