Merge pull request #6645 from matrix-org/rav/fix_synchrotron_error

Fix exceptions in the synchrotron worker log when events are rejected.
This commit is contained in:
Richard van der Hoff 2020-01-07 14:02:14 +00:00 committed by GitHub
commit 1ff5491117
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 48 additions and 15 deletions

1
changelog.d/6645.bugfix Normal file
View File

@ -0,0 +1 @@
Fix exceptions in the synchrotron worker log when events are rejected.

View File

@ -48,7 +48,7 @@ from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.slave.storage.room import RoomStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.replication.tcp.streams.events import EventsStreamEventRow
from synapse.replication.tcp.streams.events import EventsStreamEventRow, EventsStreamRow
from synapse.rest.client.v1 import events
from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet
from synapse.rest.client.v1.room import RoomInitialSyncRestServlet
@ -371,8 +371,7 @@ class SyncReplicationHandler(ReplicationClientHandler):
def get_currently_syncing_users(self):
return self.presence_handler.get_currently_syncing_users()
@defer.inlineCallbacks
def process_and_notify(self, stream_name, token, rows):
async def process_and_notify(self, stream_name, token, rows):
try:
if stream_name == "events":
# We shouldn't get multiple rows per token for events stream, so
@ -380,7 +379,14 @@ class SyncReplicationHandler(ReplicationClientHandler):
for row in rows:
if row.type != EventsStreamEventRow.TypeId:
continue
event = yield self.store.get_event(row.data.event_id)
assert isinstance(row, EventsStreamRow)
event = await self.store.get_event(
row.data.event_id, allow_rejected=True
)
if event.rejected_reason:
continue
extra_users = ()
if event.type == EventTypes.Member:
extra_users = (event.state_key,)
@ -412,11 +418,11 @@ class SyncReplicationHandler(ReplicationClientHandler):
elif stream_name == "device_lists":
all_room_ids = set()
for row in rows:
room_ids = yield self.store.get_rooms_for_user(row.user_id)
room_ids = await self.store.get_rooms_for_user(row.user_id)
all_room_ids.update(room_ids)
self.notifier.on_new_event("device_list_key", token, rooms=all_room_ids)
elif stream_name == "presence":
yield self.presence_handler.process_replication_rows(token, rows)
await self.presence_handler.process_replication_rows(token, rows)
elif stream_name == "receipts":
self.notifier.on_new_event(
"groups_key", token, users=[row.user_id for row in rows]

View File

@ -137,7 +137,7 @@ class EventsWorkerStore(SQLBaseStore):
@defer.inlineCallbacks
def get_event(
self,
event_id: List[str],
event_id: str,
redact_behaviour: EventRedactBehaviour = EventRedactBehaviour.REDACT,
get_prev_content: bool = False,
allow_rejected: bool = False,
@ -148,15 +148,22 @@ class EventsWorkerStore(SQLBaseStore):
Args:
event_id: The event_id of the event to fetch
redact_behaviour: Determine what to do with a redacted event. Possible values:
* AS_IS - Return the full event body with no redacted content
* REDACT - Return the event but with a redacted body
* DISALLOW - Do not return redacted events
* DISALLOW - Do not return redacted events (behave as per allow_none
if the event is redacted)
get_prev_content: If True and event is a state event,
include the previous states content in the unsigned field.
allow_rejected: If True return rejected events.
allow_rejected: If True, return rejected events. Otherwise,
behave as per allow_none.
allow_none: If True, return None if no event found, if
False throw a NotFoundError
check_room_id: if not None, check the room of the found event.
If there is a mismatch, behave as per allow_none.
@ -196,14 +203,18 @@ class EventsWorkerStore(SQLBaseStore):
Args:
event_ids: The event_ids of the events to fetch
redact_behaviour: Determine what to do with a redacted event. Possible
values:
* AS_IS - Return the full event body with no redacted content
* REDACT - Return the event but with a redacted body
* DISALLOW - Do not return redacted events
* DISALLOW - Do not return redacted events (omit them from the response)
get_prev_content: If True and event is a state event,
include the previous states content in the unsigned field.
allow_rejected: If True return rejected events.
allow_rejected: If True, return rejected events. Otherwise,
omits rejeted events from the response.
Returns:
Deferred : Dict from event_id to event.
@ -228,15 +239,21 @@ class EventsWorkerStore(SQLBaseStore):
"""Get events from the database and return in a list in the same order
as given by `event_ids` arg.
Unknown events will be omitted from the response.
Args:
event_ids: The event_ids of the events to fetch
redact_behaviour: Determine what to do with a redacted event. Possible values:
* AS_IS - Return the full event body with no redacted content
* REDACT - Return the event but with a redacted body
* DISALLOW - Do not return redacted events
* DISALLOW - Do not return redacted events (omit them from the response)
get_prev_content: If True and event is a state event,
include the previous states content in the unsigned field.
allow_rejected: If True, return rejected events.
allow_rejected: If True, return rejected events. Otherwise,
omits rejected events from the response.
Returns:
Deferred[list[EventBase]]: List of events fetched from the database. The
@ -369,9 +386,14 @@ class EventsWorkerStore(SQLBaseStore):
If events are pulled from the database, they will be cached for future lookups.
Unknown events are omitted from the response.
Args:
event_ids (Iterable[str]): The event_ids of the events to fetch
allow_rejected (bool): Whether to include rejected events
allow_rejected (bool): Whether to include rejected events. If False,
rejected events are omitted from the response.
Returns:
Deferred[Dict[str, _EventCacheEntry]]:
@ -506,9 +528,13 @@ class EventsWorkerStore(SQLBaseStore):
Returned events will be added to the cache for future lookups.
Unknown events are omitted from the response.
Args:
event_ids (Iterable[str]): The event_ids of the events to fetch
allow_rejected (bool): Whether to include rejected events
allow_rejected (bool): Whether to include rejected events. If False,
rejected events are omitted from the response.
Returns:
Deferred[Dict[str, _EventCacheEntry]]: