Merge branch 'develop' of github.com:matrix-org/synapse into erikj/prefill_state

This commit is contained in:
Erik Johnston 2017-05-03 09:46:40 +01:00
commit 8346e6e696
8 changed files with 35 additions and 6 deletions

View File

@ -50,6 +50,7 @@ class EventContext(object):
"prev_group", "prev_group",
"delta_ids", "delta_ids",
"prev_state_events", "prev_state_events",
"app_service",
] ]
def __init__(self): def __init__(self):
@ -68,3 +69,5 @@ class EventContext(object):
self.delta_ids = None self.delta_ids = None
self.prev_state_events = None self.prev_state_events = None
self.app_service = None

View File

@ -175,7 +175,8 @@ class MessageHandler(BaseHandler):
defer.returnValue(chunk) defer.returnValue(chunk)
@defer.inlineCallbacks @defer.inlineCallbacks
def create_event(self, event_dict, token_id=None, txn_id=None, prev_event_ids=None): def create_event(self, requester, event_dict, token_id=None, txn_id=None,
prev_event_ids=None):
""" """
Given a dict from a client, create a new event. Given a dict from a client, create a new event.
@ -185,6 +186,7 @@ class MessageHandler(BaseHandler):
Adds display names to Join membership events. Adds display names to Join membership events.
Args: Args:
requester
event_dict (dict): An entire event event_dict (dict): An entire event
token_id (str) token_id (str)
txn_id (str) txn_id (str)
@ -226,6 +228,7 @@ class MessageHandler(BaseHandler):
event, context = yield self._create_new_client_event( event, context = yield self._create_new_client_event(
builder=builder, builder=builder,
requester=requester,
prev_event_ids=prev_event_ids, prev_event_ids=prev_event_ids,
) )
@ -319,6 +322,7 @@ class MessageHandler(BaseHandler):
See self.create_event and self.send_nonmember_event. See self.create_event and self.send_nonmember_event.
""" """
event, context = yield self.create_event( event, context = yield self.create_event(
requester,
event_dict, event_dict,
token_id=requester.access_token_id, token_id=requester.access_token_id,
txn_id=txn_id txn_id=txn_id
@ -416,7 +420,7 @@ class MessageHandler(BaseHandler):
@measure_func("_create_new_client_event") @measure_func("_create_new_client_event")
@defer.inlineCallbacks @defer.inlineCallbacks
def _create_new_client_event(self, builder, prev_event_ids=None): def _create_new_client_event(self, builder, requester=None, prev_event_ids=None):
if prev_event_ids: if prev_event_ids:
prev_events = yield self.store.add_event_hashes(prev_event_ids) prev_events = yield self.store.add_event_hashes(prev_event_ids)
prev_max_depth = yield self.store.get_max_depth_of_events(prev_event_ids) prev_max_depth = yield self.store.get_max_depth_of_events(prev_event_ids)
@ -456,6 +460,8 @@ class MessageHandler(BaseHandler):
state_handler = self.state_handler state_handler = self.state_handler
context = yield state_handler.compute_event_context(builder) context = yield state_handler.compute_event_context(builder)
if requester:
context.app_service = requester.app_service
if builder.is_state(): if builder.is_state():
builder.prev_state = yield self.store.add_event_hashes( builder.prev_state = yield self.store.add_event_hashes(

View File

@ -70,6 +70,7 @@ class RoomMemberHandler(BaseHandler):
content["kind"] = "guest" content["kind"] = "guest"
event, context = yield msg_handler.create_event( event, context = yield msg_handler.create_event(
requester,
{ {
"type": EventTypes.Member, "type": EventTypes.Member,
"content": content, "content": content,

View File

@ -164,6 +164,7 @@ class RoomStateEventRestServlet(ClientV1RestServlet):
else: else:
msg_handler = self.handlers.message_handler msg_handler = self.handlers.message_handler
event, context = yield msg_handler.create_event( event, context = yield msg_handler.create_event(
requester,
event_dict, event_dict,
token_id=requester.access_token_id, token_id=requester.access_token_id,
txn_id=txn_id, txn_id=txn_id,

View File

@ -29,6 +29,7 @@ from synapse.api.constants import EventTypes
from synapse.api.errors import SynapseError from synapse.api.errors import SynapseError
from synapse.state import resolve_events from synapse.state import resolve_events
from synapse.util.caches.descriptors import cached from synapse.util.caches.descriptors import cached
from synapse.types import get_domain_from_id
from canonicaljson import encode_canonical_json from canonicaljson import encode_canonical_json
from collections import deque, namedtuple, OrderedDict from collections import deque, namedtuple, OrderedDict
@ -49,6 +50,9 @@ logger = logging.getLogger(__name__)
metrics = synapse.metrics.get_metrics_for(__name__) metrics = synapse.metrics.get_metrics_for(__name__)
persist_event_counter = metrics.register_counter("persisted_events") persist_event_counter = metrics.register_counter("persisted_events")
event_counter = metrics.register_counter(
"persisted_events_sep", labels=["type", "origin_type", "origin_entity"]
)
def encode_json(json_object): def encode_json(json_object):
@ -370,11 +374,25 @@ class EventsStore(SQLBaseStore):
new_forward_extremeties=new_forward_extremeties, new_forward_extremeties=new_forward_extremeties,
) )
persist_event_counter.inc_by(len(chunk)) persist_event_counter.inc_by(len(chunk))
for room_id, (_, _, new_state) in current_state_for_room.iteritems(): for room_id, (_, _, new_state) in current_state_for_room.iteritems():
self.get_current_state_ids.prefill( self.get_current_state_ids.prefill(
(room_id, ), new_state (room_id, ), new_state
) )
for event, context in chunk:
if context.app_service:
origin_type = "local"
origin_entity = context.app_service.id
elif self.hs.is_mine_id(event.sender):
origin_type = "local"
origin_entity = "*client*"
else:
origin_type = "remote"
origin_entity = get_domain_from_id(event.sender)
event_counter.inc(event.type, origin_type, origin_entity)
@defer.inlineCallbacks @defer.inlineCallbacks
def _calculate_new_extremeties(self, room_id, event_contexts, latest_event_ids): def _calculate_new_extremeties(self, room_id, event_contexts, latest_event_ids):
"""Calculates the new forward extremeties for a room given events to """Calculates the new forward extremeties for a room given events to

View File

@ -26,7 +26,7 @@ logger = logging.getLogger(__name__)
class DeferredTimedOutError(SynapseError): class DeferredTimedOutError(SynapseError):
def __init__(self): def __init__(self):
super(SynapseError, self).__init__(504, "Timed out") super(DeferredTimedOutError, self).__init__(504, "Timed out")
def unwrapFirstError(failure): def unwrapFirstError(failure):

View File

@ -27,10 +27,10 @@ class EventInjector:
self.event_builder_factory = hs.get_event_builder_factory() self.event_builder_factory = hs.get_event_builder_factory()
@defer.inlineCallbacks @defer.inlineCallbacks
def create_room(self, room): def create_room(self, room, user):
builder = self.event_builder_factory.new({ builder = self.event_builder_factory.new({
"type": EventTypes.Create, "type": EventTypes.Create,
"sender": "", "sender": user.to_string(),
"room_id": room.to_string(), "room_id": room.to_string(),
"content": {}, "content": {},
}) })

View File

@ -50,7 +50,7 @@ class EventsStoreTestCase(unittest.TestCase):
# Create something to report # Create something to report
room = RoomID.from_string("!abc123:test") room = RoomID.from_string("!abc123:test")
user = UserID.from_string("@raccoonlover:test") user = UserID.from_string("@raccoonlover:test")
yield self.event_injector.create_room(room) yield self.event_injector.create_room(room, user)
self.base_event = yield self._get_last_stream_token() self.base_event = yield self._get_last_stream_token()