Merge pull request #8420 from matrix-org/rav/state_res_stats

Report metrics on expensive rooms for state res
This commit is contained in:
Richard van der Hoff 2020-09-30 10:37:52 +01:00 committed by GitHub
commit c429dfc300
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 200 additions and 80 deletions

1
changelog.d/8420.feature Normal file
View File

@ -0,0 +1 @@
Add experimental reporting of metrics on expensive rooms for state-resolution.

View File

@ -21,7 +21,7 @@ import itertools
import logging import logging
from collections.abc import Container from collections.abc import Container
from http import HTTPStatus from http import HTTPStatus
from typing import Dict, Iterable, List, Optional, Sequence, Tuple, Union from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Sequence, Tuple, Union
import attr import attr
from signedjson.key import decode_verify_key_bytes from signedjson.key import decode_verify_key_bytes
@ -69,7 +69,7 @@ from synapse.replication.http.federation import (
ReplicationFederationSendEventsRestServlet, ReplicationFederationSendEventsRestServlet,
ReplicationStoreRoomOnInviteRestServlet, ReplicationStoreRoomOnInviteRestServlet,
) )
from synapse.state import StateResolutionStore, resolve_events_with_store from synapse.state import StateResolutionStore
from synapse.storage.databases.main.events_worker import EventRedactBehaviour from synapse.storage.databases.main.events_worker import EventRedactBehaviour
from synapse.types import ( from synapse.types import (
JsonDict, JsonDict,
@ -85,6 +85,9 @@ from synapse.util.retryutils import NotRetryingDestination
from synapse.util.stringutils import shortstr from synapse.util.stringutils import shortstr
from synapse.visibility import filter_events_for_server from synapse.visibility import filter_events_for_server
if TYPE_CHECKING:
from synapse.server import HomeServer
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -116,7 +119,7 @@ class FederationHandler(BaseHandler):
rooms. rooms.
""" """
def __init__(self, hs): def __init__(self, hs: "HomeServer"):
super().__init__(hs) super().__init__(hs)
self.hs = hs self.hs = hs
@ -126,6 +129,7 @@ class FederationHandler(BaseHandler):
self.state_store = self.storage.state self.state_store = self.storage.state
self.federation_client = hs.get_federation_client() self.federation_client = hs.get_federation_client()
self.state_handler = hs.get_state_handler() self.state_handler = hs.get_state_handler()
self._state_resolution_handler = hs.get_state_resolution_handler()
self.server_name = hs.hostname self.server_name = hs.hostname
self.keyring = hs.get_keyring() self.keyring = hs.get_keyring()
self.action_generator = hs.get_action_generator() self.action_generator = hs.get_action_generator()
@ -381,8 +385,7 @@ class FederationHandler(BaseHandler):
event_map[x.event_id] = x event_map[x.event_id] = x
room_version = await self.store.get_room_version_id(room_id) room_version = await self.store.get_room_version_id(room_id)
state_map = await resolve_events_with_store( state_map = await self._state_resolution_handler.resolve_events_with_store(
self.clock,
room_id, room_id,
room_version, room_version,
state_maps, state_maps,

View File

@ -13,42 +13,46 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import heapq
import logging import logging
from collections import namedtuple from collections import defaultdict, namedtuple
from typing import ( from typing import (
Any,
Awaitable, Awaitable,
Callable,
DefaultDict,
Dict, Dict,
Iterable, Iterable,
List, List,
Optional, Optional,
Sequence, Sequence,
Set, Set,
Tuple,
Union, Union,
overload, overload,
) )
import attr import attr
from frozendict import frozendict from frozendict import frozendict
from prometheus_client import Histogram from prometheus_client import Counter, Histogram
from typing_extensions import Literal from typing_extensions import Literal
from synapse.api.constants import EventTypes from synapse.api.constants import EventTypes
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, StateResolutionVersions from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, StateResolutionVersions
from synapse.events import EventBase from synapse.events import EventBase
from synapse.events.snapshot import EventContext from synapse.events.snapshot import EventContext
from synapse.logging.context import ContextResourceUsage
from synapse.logging.utils import log_function from synapse.logging.utils import log_function
from synapse.state import v1, v2 from synapse.state import v1, v2
from synapse.storage.databases.main.events_worker import EventRedactBehaviour from synapse.storage.databases.main.events_worker import EventRedactBehaviour
from synapse.storage.roommember import ProfileInfo from synapse.storage.roommember import ProfileInfo
from synapse.types import Collection, StateMap from synapse.types import Collection, StateMap
from synapse.util import Clock
from synapse.util.async_helpers import Linearizer from synapse.util.async_helpers import Linearizer
from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.metrics import Measure, measure_func from synapse.util.metrics import Measure, measure_func
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
metrics_logger = logging.getLogger("synapse.state.metrics")
# Metrics for number of state groups involved in a resolution. # Metrics for number of state groups involved in a resolution.
state_groups_histogram = Histogram( state_groups_histogram = Histogram(
@ -448,19 +452,44 @@ class StateHandler:
state_map = {ev.event_id: ev for st in state_sets for ev in st} state_map = {ev.event_id: ev for st in state_sets for ev in st}
with Measure(self.clock, "state._resolve_events"): new_state = await self._state_resolution_handler.resolve_events_with_store(
new_state = await resolve_events_with_store( event.room_id,
self.clock, room_version,
event.room_id, state_set_ids,
room_version, event_map=state_map,
state_set_ids, state_res_store=StateResolutionStore(self.store),
event_map=state_map, )
state_res_store=StateResolutionStore(self.store),
)
return {key: state_map[ev_id] for key, ev_id in new_state.items()} return {key: state_map[ev_id] for key, ev_id in new_state.items()}
@attr.s(slots=True)
class _StateResMetrics:
"""Keeps track of some usage metrics about state res."""
# System and User CPU time, in seconds
cpu_time = attr.ib(type=float, default=0.0)
# time spent on database transactions (excluding scheduling time). This roughly
# corresponds to the amount of work done on the db server, excluding event fetches.
db_time = attr.ib(type=float, default=0.0)
# number of events fetched from the db.
db_events = attr.ib(type=int, default=0)
_biggest_room_by_cpu_counter = Counter(
"synapse_state_res_cpu_for_biggest_room_seconds",
"CPU time spent performing state resolution for the single most expensive "
"room for state resolution",
)
_biggest_room_by_db_counter = Counter(
"synapse_state_res_db_for_biggest_room_seconds",
"Database time spent performing state resolution for the single most "
"expensive room for state resolution",
)
class StateResolutionHandler: class StateResolutionHandler:
"""Responsible for doing state conflict resolution. """Responsible for doing state conflict resolution.
@ -483,6 +512,17 @@ class StateResolutionHandler:
reset_expiry_on_get=True, reset_expiry_on_get=True,
) )
#
# stuff for tracking time spent on state-res by room
#
# tracks the amount of work done on state res per room
self._state_res_metrics = defaultdict(
_StateResMetrics
) # type: DefaultDict[str, _StateResMetrics]
self.clock.looping_call(self._report_metrics, 120 * 1000)
@log_function @log_function
async def resolve_state_groups( async def resolve_state_groups(
self, self,
@ -530,15 +570,13 @@ class StateResolutionHandler:
state_groups_histogram.observe(len(state_groups_ids)) state_groups_histogram.observe(len(state_groups_ids))
with Measure(self.clock, "state._resolve_events"): new_state = await self.resolve_events_with_store(
new_state = await resolve_events_with_store( room_id,
self.clock, room_version,
room_id, list(state_groups_ids.values()),
room_version, event_map=event_map,
list(state_groups_ids.values()), state_res_store=state_res_store,
event_map=event_map, )
state_res_store=state_res_store,
)
# if the new state matches any of the input state groups, we can # if the new state matches any of the input state groups, we can
# use that state group again. Otherwise we will generate a state_id # use that state group again. Otherwise we will generate a state_id
@ -552,6 +590,114 @@ class StateResolutionHandler:
return cache return cache
async def resolve_events_with_store(
self,
room_id: str,
room_version: str,
state_sets: Sequence[StateMap[str]],
event_map: Optional[Dict[str, EventBase]],
state_res_store: "StateResolutionStore",
) -> StateMap[str]:
"""
Args:
room_id: the room we are working in
room_version: Version of the room
state_sets: List of dicts of (type, state_key) -> event_id,
which are the different state groups to resolve.
event_map:
a dict from event_id to event, for any events that we happen to
have in flight (eg, those currently being persisted). This will be
used as a starting point fof finding the state we need; any missing
events will be requested via state_map_factory.
If None, all events will be fetched via state_res_store.
state_res_store: a place to fetch events from
Returns:
a map from (type, state_key) to event_id.
"""
try:
with Measure(self.clock, "state._resolve_events") as m:
v = KNOWN_ROOM_VERSIONS[room_version]
if v.state_res == StateResolutionVersions.V1:
return await v1.resolve_events_with_store(
room_id, state_sets, event_map, state_res_store.get_events
)
else:
return await v2.resolve_events_with_store(
self.clock,
room_id,
room_version,
state_sets,
event_map,
state_res_store,
)
finally:
self._record_state_res_metrics(room_id, m.get_resource_usage())
def _record_state_res_metrics(self, room_id: str, rusage: ContextResourceUsage):
room_metrics = self._state_res_metrics[room_id]
room_metrics.cpu_time += rusage.ru_utime + rusage.ru_stime
room_metrics.db_time += rusage.db_txn_duration_sec
room_metrics.db_events += rusage.evt_db_fetch_count
def _report_metrics(self):
if not self._state_res_metrics:
# no state res has happened since the last iteration: don't bother logging.
return
self._report_biggest(
lambda i: i.cpu_time, "CPU time", _biggest_room_by_cpu_counter,
)
self._report_biggest(
lambda i: i.db_time, "DB time", _biggest_room_by_db_counter,
)
self._state_res_metrics.clear()
def _report_biggest(
self,
extract_key: Callable[[_StateResMetrics], Any],
metric_name: str,
prometheus_counter_metric: Counter,
) -> None:
"""Report metrics on the biggest rooms for state res
Args:
extract_key: a callable which, given a _StateResMetrics, extracts a single
metric to sort by.
metric_name: the name of the metric we have extracted, for the log line
prometheus_counter_metric: a prometheus metric recording the sum of the
the extracted metric
"""
n_to_log = 10
if not metrics_logger.isEnabledFor(logging.DEBUG):
# only need the most expensive if we don't have debug logging, which
# allows nlargest() to degrade to max()
n_to_log = 1
items = self._state_res_metrics.items()
# log the N biggest rooms
biggest = heapq.nlargest(
n_to_log, items, key=lambda i: extract_key(i[1])
) # type: List[Tuple[str, _StateResMetrics]]
metrics_logger.debug(
"%i biggest rooms for state-res by %s: %s",
len(biggest),
metric_name,
["%s (%gs)" % (r, extract_key(m)) for (r, m) in biggest],
)
# report info on the single biggest to prometheus
_, biggest_metrics = biggest[0]
prometheus_counter_metric.inc(extract_key(biggest_metrics))
def _make_state_cache_entry( def _make_state_cache_entry(
new_state: StateMap[str], state_groups_ids: Dict[int, StateMap[str]] new_state: StateMap[str], state_groups_ids: Dict[int, StateMap[str]]
@ -605,47 +751,6 @@ def _make_state_cache_entry(
) )
def resolve_events_with_store(
clock: Clock,
room_id: str,
room_version: str,
state_sets: Sequence[StateMap[str]],
event_map: Optional[Dict[str, EventBase]],
state_res_store: "StateResolutionStore",
) -> Awaitable[StateMap[str]]:
"""
Args:
room_id: the room we are working in
room_version: Version of the room
state_sets: List of dicts of (type, state_key) -> event_id,
which are the different state groups to resolve.
event_map:
a dict from event_id to event, for any events that we happen to
have in flight (eg, those currently being persisted). This will be
used as a starting point fof finding the state we need; any missing
events will be requested via state_map_factory.
If None, all events will be fetched via state_res_store.
state_res_store: a place to fetch events from
Returns:
a map from (type, state_key) to event_id.
"""
v = KNOWN_ROOM_VERSIONS[room_version]
if v.state_res == StateResolutionVersions.V1:
return v1.resolve_events_with_store(
room_id, state_sets, event_map, state_res_store.get_events
)
else:
return v2.resolve_events_with_store(
clock, room_id, room_version, state_sets, event_map, state_res_store
)
@attr.s(slots=True) @attr.s(slots=True)
class StateResolutionStore: class StateResolutionStore:
"""Interface that allows state resolution algorithms to access the database """Interface that allows state resolution algorithms to access the database

View File

@ -19,7 +19,11 @@ from typing import Any, Callable, Optional, TypeVar, cast
from prometheus_client import Counter from prometheus_client import Counter
from synapse.logging.context import LoggingContext, current_context from synapse.logging.context import (
ContextResourceUsage,
LoggingContext,
current_context,
)
from synapse.metrics import InFlightGauge from synapse.metrics import InFlightGauge
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -104,27 +108,27 @@ class Measure:
def __init__(self, clock, name): def __init__(self, clock, name):
self.clock = clock self.clock = clock
self.name = name self.name = name
self._logging_context = None
self.start = None
def __enter__(self):
if self._logging_context:
raise RuntimeError("Measure() objects cannot be re-used")
self.start = self.clock.time()
parent_context = current_context() parent_context = current_context()
self._logging_context = LoggingContext( self._logging_context = LoggingContext(
"Measure[%s]" % (self.name,), parent_context "Measure[%s]" % (self.name,), parent_context
) )
self.start = None
def __enter__(self) -> "Measure":
if self.start is not None:
raise RuntimeError("Measure() objects cannot be re-used")
self.start = self.clock.time()
self._logging_context.__enter__() self._logging_context.__enter__()
in_flight.register((self.name,), self._update_in_flight) in_flight.register((self.name,), self._update_in_flight)
return self
def __exit__(self, exc_type, exc_val, exc_tb): def __exit__(self, exc_type, exc_val, exc_tb):
if not self._logging_context: if self.start is None:
raise RuntimeError("Measure() block exited without being entered") raise RuntimeError("Measure() block exited without being entered")
duration = self.clock.time() - self.start duration = self.clock.time() - self.start
usage = self._logging_context.get_resource_usage() usage = self.get_resource_usage()
in_flight.unregister((self.name,), self._update_in_flight) in_flight.unregister((self.name,), self._update_in_flight)
self._logging_context.__exit__(exc_type, exc_val, exc_tb) self._logging_context.__exit__(exc_type, exc_val, exc_tb)
@ -140,6 +144,13 @@ class Measure:
except ValueError: except ValueError:
logger.warning("Failed to save metrics! Usage: %s", usage) logger.warning("Failed to save metrics! Usage: %s", usage)
def get_resource_usage(self) -> ContextResourceUsage:
"""Get the resources used within this Measure block
If the Measure block is still active, returns the resource usage so far.
"""
return self._logging_context.get_resource_usage()
def _update_in_flight(self, metrics): def _update_in_flight(self, metrics):
"""Gets called when processing in flight metrics """Gets called when processing in flight metrics
""" """