mirror of
https://git.anonymousland.org/anonymousland/synapse.git
synced 2025-12-16 03:23:51 -05:00
Merge branch 'develop' of github.com:matrix-org/synapse into rejections
Conflicts: synapse/storage/__init__.py synapse/storage/schema/delta/v12.sql
This commit is contained in:
commit
2ebf795c0a
51 changed files with 2374 additions and 308 deletions
|
|
@ -18,6 +18,7 @@ from twisted.internet import defer
|
|||
from synapse.util.logcontext import PreserveLoggingContext
|
||||
from synapse.util.logutils import log_function
|
||||
from synapse.types import UserID
|
||||
from synapse.events.utils import serialize_event
|
||||
|
||||
from ._base import BaseHandler
|
||||
|
||||
|
|
@ -48,24 +49,25 @@ class EventStreamHandler(BaseHandler):
|
|||
@defer.inlineCallbacks
|
||||
@log_function
|
||||
def get_stream(self, auth_user_id, pagin_config, timeout=0,
|
||||
as_client_event=True):
|
||||
as_client_event=True, affect_presence=True):
|
||||
auth_user = UserID.from_string(auth_user_id)
|
||||
|
||||
try:
|
||||
if auth_user not in self._streams_per_user:
|
||||
self._streams_per_user[auth_user] = 0
|
||||
if auth_user in self._stop_timer_per_user:
|
||||
try:
|
||||
self.clock.cancel_call_later(
|
||||
self._stop_timer_per_user.pop(auth_user)
|
||||
if affect_presence:
|
||||
if auth_user not in self._streams_per_user:
|
||||
self._streams_per_user[auth_user] = 0
|
||||
if auth_user in self._stop_timer_per_user:
|
||||
try:
|
||||
self.clock.cancel_call_later(
|
||||
self._stop_timer_per_user.pop(auth_user)
|
||||
)
|
||||
except:
|
||||
logger.exception("Failed to cancel event timer")
|
||||
else:
|
||||
yield self.distributor.fire(
|
||||
"started_user_eventstream", auth_user
|
||||
)
|
||||
except:
|
||||
logger.exception("Failed to cancel event timer")
|
||||
else:
|
||||
yield self.distributor.fire(
|
||||
"started_user_eventstream", auth_user
|
||||
)
|
||||
self._streams_per_user[auth_user] += 1
|
||||
self._streams_per_user[auth_user] += 1
|
||||
|
||||
if pagin_config.from_token is None:
|
||||
pagin_config.from_token = None
|
||||
|
|
@ -78,8 +80,10 @@ class EventStreamHandler(BaseHandler):
|
|||
auth_user, room_ids, pagin_config, timeout
|
||||
)
|
||||
|
||||
time_now = self.clock.time_msec()
|
||||
|
||||
chunks = [
|
||||
self.hs.serialize_event(e, as_client_event) for e in events
|
||||
serialize_event(e, time_now, as_client_event) for e in events
|
||||
]
|
||||
|
||||
chunk = {
|
||||
|
|
@ -91,28 +95,29 @@ class EventStreamHandler(BaseHandler):
|
|||
defer.returnValue(chunk)
|
||||
|
||||
finally:
|
||||
self._streams_per_user[auth_user] -= 1
|
||||
if not self._streams_per_user[auth_user]:
|
||||
del self._streams_per_user[auth_user]
|
||||
if affect_presence:
|
||||
self._streams_per_user[auth_user] -= 1
|
||||
if not self._streams_per_user[auth_user]:
|
||||
del self._streams_per_user[auth_user]
|
||||
|
||||
# 10 seconds of grace to allow the client to reconnect again
|
||||
# before we think they're gone
|
||||
def _later():
|
||||
logger.debug(
|
||||
"_later stopped_user_eventstream %s", auth_user
|
||||
# 10 seconds of grace to allow the client to reconnect again
|
||||
# before we think they're gone
|
||||
def _later():
|
||||
logger.debug(
|
||||
"_later stopped_user_eventstream %s", auth_user
|
||||
)
|
||||
|
||||
self._stop_timer_per_user.pop(auth_user, None)
|
||||
|
||||
return self.distributor.fire(
|
||||
"stopped_user_eventstream", auth_user
|
||||
)
|
||||
|
||||
logger.debug("Scheduling _later: for %s", auth_user)
|
||||
self._stop_timer_per_user[auth_user] = (
|
||||
self.clock.call_later(30, _later)
|
||||
)
|
||||
|
||||
self._stop_timer_per_user.pop(auth_user, None)
|
||||
|
||||
yield self.distributor.fire(
|
||||
"stopped_user_eventstream", auth_user
|
||||
)
|
||||
|
||||
logger.debug("Scheduling _later: for %s", auth_user)
|
||||
self._stop_timer_per_user[auth_user] = (
|
||||
self.clock.call_later(30, _later)
|
||||
)
|
||||
|
||||
|
||||
class EventHandler(BaseHandler):
|
||||
|
||||
|
|
|
|||
|
|
@ -18,6 +18,7 @@ from twisted.internet import defer
|
|||
from synapse.api.constants import EventTypes, Membership
|
||||
from synapse.api.errors import RoomError
|
||||
from synapse.streams.config import PaginationConfig
|
||||
from synapse.events.utils import serialize_event
|
||||
from synapse.events.validator import EventValidator
|
||||
from synapse.util.logcontext import PreserveLoggingContext
|
||||
from synapse.types import UserID
|
||||
|
|
@ -100,9 +101,11 @@ class MessageHandler(BaseHandler):
|
|||
"room_key", next_key
|
||||
)
|
||||
|
||||
time_now = self.clock.time_msec()
|
||||
|
||||
chunk = {
|
||||
"chunk": [
|
||||
self.hs.serialize_event(e, as_client_event) for e in events
|
||||
serialize_event(e, time_now, as_client_event) for e in events
|
||||
],
|
||||
"start": pagin_config.from_token.to_string(),
|
||||
"end": next_token.to_string(),
|
||||
|
|
@ -111,7 +114,8 @@ class MessageHandler(BaseHandler):
|
|||
defer.returnValue(chunk)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def create_and_send_event(self, event_dict, ratelimit=True):
|
||||
def create_and_send_event(self, event_dict, ratelimit=True,
|
||||
client=None, txn_id=None):
|
||||
""" Given a dict from a client, create and handle a new event.
|
||||
|
||||
Creates an FrozenEvent object, filling out auth_events, prev_events,
|
||||
|
|
@ -145,6 +149,15 @@ class MessageHandler(BaseHandler):
|
|||
builder.content
|
||||
)
|
||||
|
||||
if client is not None:
|
||||
if client.token_id is not None:
|
||||
builder.internal_metadata.token_id = client.token_id
|
||||
if client.device_id is not None:
|
||||
builder.internal_metadata.device_id = client.device_id
|
||||
|
||||
if txn_id is not None:
|
||||
builder.internal_metadata.txn_id = txn_id
|
||||
|
||||
event, context = yield self._create_new_client_event(
|
||||
builder=builder,
|
||||
)
|
||||
|
|
@ -211,7 +224,8 @@ class MessageHandler(BaseHandler):
|
|||
|
||||
# TODO: This is duplicating logic from snapshot_all_rooms
|
||||
current_state = yield self.state_handler.get_current_state(room_id)
|
||||
defer.returnValue([self.hs.serialize_event(c) for c in current_state])
|
||||
now = self.clock.time_msec()
|
||||
defer.returnValue([serialize_event(c, now) for c in current_state])
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def snapshot_all_rooms(self, user_id=None, pagin_config=None,
|
||||
|
|
@ -283,10 +297,11 @@ class MessageHandler(BaseHandler):
|
|||
|
||||
start_token = now_token.copy_and_replace("room_key", token[0])
|
||||
end_token = now_token.copy_and_replace("room_key", token[1])
|
||||
time_now = self.clock.time_msec()
|
||||
|
||||
d["messages"] = {
|
||||
"chunk": [
|
||||
self.hs.serialize_event(m, as_client_event)
|
||||
serialize_event(m, time_now, as_client_event)
|
||||
for m in messages
|
||||
],
|
||||
"start": start_token.to_string(),
|
||||
|
|
@ -297,7 +312,8 @@ class MessageHandler(BaseHandler):
|
|||
event.room_id
|
||||
)
|
||||
d["state"] = [
|
||||
self.hs.serialize_event(c) for c in current_state
|
||||
serialize_event(c, time_now, as_client_event)
|
||||
for c in current_state
|
||||
]
|
||||
except:
|
||||
logger.exception("Failed to get snapshot")
|
||||
|
|
@ -320,8 +336,9 @@ class MessageHandler(BaseHandler):
|
|||
auth_user = UserID.from_string(user_id)
|
||||
|
||||
# TODO: These concurrently
|
||||
time_now = self.clock.time_msec()
|
||||
state_tuples = yield self.state_handler.get_current_state(room_id)
|
||||
state = [self.hs.serialize_event(x) for x in state_tuples]
|
||||
state = [serialize_event(x, time_now) for x in state_tuples]
|
||||
|
||||
member_event = (yield self.store.get_room_member(
|
||||
user_id=user_id,
|
||||
|
|
@ -360,11 +377,13 @@ class MessageHandler(BaseHandler):
|
|||
"Failed to get member presence of %r", m.user_id
|
||||
)
|
||||
|
||||
time_now = self.clock.time_msec()
|
||||
|
||||
defer.returnValue({
|
||||
"membership": member_event.membership,
|
||||
"room_id": room_id,
|
||||
"messages": {
|
||||
"chunk": [self.hs.serialize_event(m) for m in messages],
|
||||
"chunk": [serialize_event(m, time_now) for m in messages],
|
||||
"start": start_token.to_string(),
|
||||
"end": end_token.to_string(),
|
||||
},
|
||||
|
|
|
|||
|
|
@ -87,6 +87,10 @@ class PresenceHandler(BaseHandler):
|
|||
"changed_presencelike_data", self.changed_presencelike_data
|
||||
)
|
||||
|
||||
# outbound signal from the presence module to advertise when a user's
|
||||
# presence has changed
|
||||
distributor.declare("user_presence_changed")
|
||||
|
||||
self.distributor = distributor
|
||||
|
||||
self.federation = hs.get_replication_layer()
|
||||
|
|
@ -604,6 +608,7 @@ class PresenceHandler(BaseHandler):
|
|||
room_ids=room_ids,
|
||||
statuscache=statuscache,
|
||||
)
|
||||
yield self.distributor.fire("user_presence_changed", user, statuscache)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _push_presence_remote(self, user, destination, state=None):
|
||||
|
|
|
|||
|
|
@ -163,7 +163,7 @@ class RegistrationHandler(BaseHandler):
|
|||
# each request
|
||||
httpCli = SimpleHttpClient(self.hs)
|
||||
# XXX: make this configurable!
|
||||
trustedIdServers = ['matrix.org:8090']
|
||||
trustedIdServers = ['matrix.org:8090', 'matrix.org']
|
||||
if not creds['idServer'] in trustedIdServers:
|
||||
logger.warn('%s is not a trusted ID server: rejecting 3pid ' +
|
||||
'credentials', creds['idServer'])
|
||||
|
|
|
|||
|
|
@ -16,12 +16,14 @@
|
|||
"""Contains functions for performing events on rooms."""
|
||||
from twisted.internet import defer
|
||||
|
||||
from ._base import BaseHandler
|
||||
|
||||
from synapse.types import UserID, RoomAlias, RoomID
|
||||
from synapse.api.constants import EventTypes, Membership, JoinRules
|
||||
from synapse.api.errors import StoreError, SynapseError
|
||||
from synapse.util import stringutils
|
||||
from synapse.util.async import run_on_reactor
|
||||
from ._base import BaseHandler
|
||||
from synapse.events.utils import serialize_event
|
||||
|
||||
import logging
|
||||
|
||||
|
|
@ -293,8 +295,9 @@ class RoomMemberHandler(BaseHandler):
|
|||
yield self.auth.check_joined_room(room_id, user_id)
|
||||
|
||||
member_list = yield self.store.get_room_members(room_id=room_id)
|
||||
time_now = self.clock.time_msec()
|
||||
event_list = [
|
||||
self.hs.serialize_event(entry)
|
||||
serialize_event(entry, time_now)
|
||||
for entry in member_list
|
||||
]
|
||||
chunk_data = {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue