mirror of
https://git.anonymousland.org/anonymousland/synapse-product.git
synced 2025-01-18 13:27:05 -05:00
Merge pull request #2178 from matrix-org/erikj/message_metrics
Add more granular event send metrics
This commit is contained in:
commit
e4c15fcb5c
@ -50,6 +50,7 @@ class EventContext(object):
|
|||||||
"prev_group",
|
"prev_group",
|
||||||
"delta_ids",
|
"delta_ids",
|
||||||
"prev_state_events",
|
"prev_state_events",
|
||||||
|
"app_service",
|
||||||
]
|
]
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
@ -68,3 +69,5 @@ class EventContext(object):
|
|||||||
self.delta_ids = None
|
self.delta_ids = None
|
||||||
|
|
||||||
self.prev_state_events = None
|
self.prev_state_events = None
|
||||||
|
|
||||||
|
self.app_service = None
|
||||||
|
@ -175,7 +175,8 @@ class MessageHandler(BaseHandler):
|
|||||||
defer.returnValue(chunk)
|
defer.returnValue(chunk)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def create_event(self, event_dict, token_id=None, txn_id=None, prev_event_ids=None):
|
def create_event(self, requester, event_dict, token_id=None, txn_id=None,
|
||||||
|
prev_event_ids=None):
|
||||||
"""
|
"""
|
||||||
Given a dict from a client, create a new event.
|
Given a dict from a client, create a new event.
|
||||||
|
|
||||||
@ -185,6 +186,7 @@ class MessageHandler(BaseHandler):
|
|||||||
Adds display names to Join membership events.
|
Adds display names to Join membership events.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
|
requester
|
||||||
event_dict (dict): An entire event
|
event_dict (dict): An entire event
|
||||||
token_id (str)
|
token_id (str)
|
||||||
txn_id (str)
|
txn_id (str)
|
||||||
@ -226,6 +228,7 @@ class MessageHandler(BaseHandler):
|
|||||||
|
|
||||||
event, context = yield self._create_new_client_event(
|
event, context = yield self._create_new_client_event(
|
||||||
builder=builder,
|
builder=builder,
|
||||||
|
requester=requester,
|
||||||
prev_event_ids=prev_event_ids,
|
prev_event_ids=prev_event_ids,
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -319,6 +322,7 @@ class MessageHandler(BaseHandler):
|
|||||||
See self.create_event and self.send_nonmember_event.
|
See self.create_event and self.send_nonmember_event.
|
||||||
"""
|
"""
|
||||||
event, context = yield self.create_event(
|
event, context = yield self.create_event(
|
||||||
|
requester,
|
||||||
event_dict,
|
event_dict,
|
||||||
token_id=requester.access_token_id,
|
token_id=requester.access_token_id,
|
||||||
txn_id=txn_id
|
txn_id=txn_id
|
||||||
@ -416,7 +420,7 @@ class MessageHandler(BaseHandler):
|
|||||||
|
|
||||||
@measure_func("_create_new_client_event")
|
@measure_func("_create_new_client_event")
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _create_new_client_event(self, builder, prev_event_ids=None):
|
def _create_new_client_event(self, builder, requester=None, prev_event_ids=None):
|
||||||
if prev_event_ids:
|
if prev_event_ids:
|
||||||
prev_events = yield self.store.add_event_hashes(prev_event_ids)
|
prev_events = yield self.store.add_event_hashes(prev_event_ids)
|
||||||
prev_max_depth = yield self.store.get_max_depth_of_events(prev_event_ids)
|
prev_max_depth = yield self.store.get_max_depth_of_events(prev_event_ids)
|
||||||
@ -456,6 +460,8 @@ class MessageHandler(BaseHandler):
|
|||||||
state_handler = self.state_handler
|
state_handler = self.state_handler
|
||||||
|
|
||||||
context = yield state_handler.compute_event_context(builder)
|
context = yield state_handler.compute_event_context(builder)
|
||||||
|
if requester:
|
||||||
|
context.app_service = requester.app_service
|
||||||
|
|
||||||
if builder.is_state():
|
if builder.is_state():
|
||||||
builder.prev_state = yield self.store.add_event_hashes(
|
builder.prev_state = yield self.store.add_event_hashes(
|
||||||
|
@ -70,6 +70,7 @@ class RoomMemberHandler(BaseHandler):
|
|||||||
content["kind"] = "guest"
|
content["kind"] = "guest"
|
||||||
|
|
||||||
event, context = yield msg_handler.create_event(
|
event, context = yield msg_handler.create_event(
|
||||||
|
requester,
|
||||||
{
|
{
|
||||||
"type": EventTypes.Member,
|
"type": EventTypes.Member,
|
||||||
"content": content,
|
"content": content,
|
||||||
|
@ -164,6 +164,7 @@ class RoomStateEventRestServlet(ClientV1RestServlet):
|
|||||||
else:
|
else:
|
||||||
msg_handler = self.handlers.message_handler
|
msg_handler = self.handlers.message_handler
|
||||||
event, context = yield msg_handler.create_event(
|
event, context = yield msg_handler.create_event(
|
||||||
|
requester,
|
||||||
event_dict,
|
event_dict,
|
||||||
token_id=requester.access_token_id,
|
token_id=requester.access_token_id,
|
||||||
txn_id=txn_id,
|
txn_id=txn_id,
|
||||||
|
@ -29,6 +29,7 @@ from synapse.api.constants import EventTypes
|
|||||||
from synapse.api.errors import SynapseError
|
from synapse.api.errors import SynapseError
|
||||||
from synapse.state import resolve_events
|
from synapse.state import resolve_events
|
||||||
from synapse.util.caches.descriptors import cached
|
from synapse.util.caches.descriptors import cached
|
||||||
|
from synapse.types import get_domain_from_id
|
||||||
|
|
||||||
from canonicaljson import encode_canonical_json
|
from canonicaljson import encode_canonical_json
|
||||||
from collections import deque, namedtuple, OrderedDict
|
from collections import deque, namedtuple, OrderedDict
|
||||||
@ -49,6 +50,9 @@ logger = logging.getLogger(__name__)
|
|||||||
|
|
||||||
metrics = synapse.metrics.get_metrics_for(__name__)
|
metrics = synapse.metrics.get_metrics_for(__name__)
|
||||||
persist_event_counter = metrics.register_counter("persisted_events")
|
persist_event_counter = metrics.register_counter("persisted_events")
|
||||||
|
event_counter = metrics.register_counter(
|
||||||
|
"persisted_events_sep", labels=["type", "origin_type", "origin_entity"]
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def encode_json(json_object):
|
def encode_json(json_object):
|
||||||
@ -370,6 +374,18 @@ class EventsStore(SQLBaseStore):
|
|||||||
new_forward_extremeties=new_forward_extremeties,
|
new_forward_extremeties=new_forward_extremeties,
|
||||||
)
|
)
|
||||||
persist_event_counter.inc_by(len(chunk))
|
persist_event_counter.inc_by(len(chunk))
|
||||||
|
for event, context in chunk:
|
||||||
|
if context.app_service:
|
||||||
|
origin_type = "local"
|
||||||
|
origin_entity = context.app_service.id
|
||||||
|
elif self.hs.is_mine_id(event.sender):
|
||||||
|
origin_type = "local"
|
||||||
|
origin_entity = "*client*"
|
||||||
|
else:
|
||||||
|
origin_type = "remote"
|
||||||
|
origin_entity = get_domain_from_id(event.sender)
|
||||||
|
|
||||||
|
event_counter.inc(event.type, origin_type, origin_entity)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _calculate_new_extremeties(self, room_id, event_contexts, latest_event_ids):
|
def _calculate_new_extremeties(self, room_id, event_contexts, latest_event_ids):
|
||||||
|
@ -27,10 +27,10 @@ class EventInjector:
|
|||||||
self.event_builder_factory = hs.get_event_builder_factory()
|
self.event_builder_factory = hs.get_event_builder_factory()
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def create_room(self, room):
|
def create_room(self, room, user):
|
||||||
builder = self.event_builder_factory.new({
|
builder = self.event_builder_factory.new({
|
||||||
"type": EventTypes.Create,
|
"type": EventTypes.Create,
|
||||||
"sender": "",
|
"sender": user.to_string(),
|
||||||
"room_id": room.to_string(),
|
"room_id": room.to_string(),
|
||||||
"content": {},
|
"content": {},
|
||||||
})
|
})
|
||||||
|
@ -50,7 +50,7 @@ class EventsStoreTestCase(unittest.TestCase):
|
|||||||
# Create something to report
|
# Create something to report
|
||||||
room = RoomID.from_string("!abc123:test")
|
room = RoomID.from_string("!abc123:test")
|
||||||
user = UserID.from_string("@raccoonlover:test")
|
user = UserID.from_string("@raccoonlover:test")
|
||||||
yield self.event_injector.create_room(room)
|
yield self.event_injector.create_room(room, user)
|
||||||
|
|
||||||
self.base_event = yield self._get_last_stream_token()
|
self.base_event = yield self._get_last_stream_token()
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user