mirror of
https://git.anonymousland.org/anonymousland/synapse.git
synced 2025-02-26 16:11:06 -05:00
Add stub functions and work out execution flow to implement AS event stream polling.
This commit is contained in:
parent
255f989c7b
commit
2d20466f9a
@ -69,9 +69,6 @@ class EventStreamHandler(BaseHandler):
|
|||||||
)
|
)
|
||||||
self._streams_per_user[auth_user] += 1
|
self._streams_per_user[auth_user] += 1
|
||||||
|
|
||||||
if pagin_config.from_token is None:
|
|
||||||
pagin_config.from_token = None
|
|
||||||
|
|
||||||
rm_handler = self.hs.get_handlers().room_member_handler
|
rm_handler = self.hs.get_handlers().room_member_handler
|
||||||
room_ids = yield rm_handler.get_rooms_for_user(auth_user)
|
room_ids = yield rm_handler.get_rooms_for_user(auth_user)
|
||||||
|
|
||||||
|
@ -510,9 +510,16 @@ class RoomMemberHandler(BaseHandler):
|
|||||||
def get_rooms_for_user(self, user, membership_list=[Membership.JOIN]):
|
def get_rooms_for_user(self, user, membership_list=[Membership.JOIN]):
|
||||||
"""Returns a list of roomids that the user has any of the given
|
"""Returns a list of roomids that the user has any of the given
|
||||||
membership states in."""
|
membership states in."""
|
||||||
rooms = yield self.store.get_rooms_for_user_where_membership_is(
|
|
||||||
user_id=user.to_string(), membership_list=membership_list
|
app_service = yield self.store.get_app_service_by_user_id(
|
||||||
|
user.to_string()
|
||||||
)
|
)
|
||||||
|
if app_service:
|
||||||
|
rooms = yield self.store.get_app_service_rooms(app_service)
|
||||||
|
else:
|
||||||
|
rooms = yield self.store.get_rooms_for_user_where_membership_is(
|
||||||
|
user_id=user.to_string(), membership_list=membership_list
|
||||||
|
)
|
||||||
|
|
||||||
# For some reason the list of events contains duplicates
|
# For some reason the list of events contains duplicates
|
||||||
# TODO(paul): work out why because I really don't think it should
|
# TODO(paul): work out why because I really don't think it should
|
||||||
@ -559,13 +566,22 @@ class RoomEventSource(object):
|
|||||||
|
|
||||||
to_key = yield self.get_current_key()
|
to_key = yield self.get_current_key()
|
||||||
|
|
||||||
events, end_key = yield self.store.get_room_events_stream(
|
app_service = self.store.get_app_service_by_user_id(user.to_string())
|
||||||
user_id=user.to_string(),
|
if app_service:
|
||||||
from_key=from_key,
|
events, end_key = yield self.store.get_appservice_room_stream(
|
||||||
to_key=to_key,
|
service=app_service,
|
||||||
room_id=None,
|
from_key=from_key,
|
||||||
limit=limit,
|
to_key=to_key,
|
||||||
)
|
limit=limit,
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
events, end_key = yield self.store.get_room_events_stream(
|
||||||
|
user_id=user.to_string(),
|
||||||
|
from_key=from_key,
|
||||||
|
to_key=to_key,
|
||||||
|
room_id=None,
|
||||||
|
limit=limit,
|
||||||
|
)
|
||||||
|
|
||||||
defer.returnValue((events, end_key))
|
defer.returnValue((events, end_key))
|
||||||
|
|
||||||
|
@ -17,6 +17,7 @@ from twisted.internet import defer
|
|||||||
|
|
||||||
from synapse.api.errors import StoreError
|
from synapse.api.errors import StoreError
|
||||||
from synapse.appservice import ApplicationService
|
from synapse.appservice import ApplicationService
|
||||||
|
from synapse.storage.roommember import RoomsForUser
|
||||||
from ._base import SQLBaseStore
|
from ._base import SQLBaseStore
|
||||||
|
|
||||||
|
|
||||||
@ -150,6 +151,16 @@ class ApplicationServiceStore(SQLBaseStore):
|
|||||||
yield self.cache_defer # make sure the cache is ready
|
yield self.cache_defer # make sure the cache is ready
|
||||||
defer.returnValue(self.services_cache)
|
defer.returnValue(self.services_cache)
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def get_app_service_by_user_id(self, user_id):
|
||||||
|
yield self.cache_defer # make sure the cache is ready
|
||||||
|
|
||||||
|
for service in self.services_cache:
|
||||||
|
if service.sender == user_id:
|
||||||
|
defer.returnValue(service)
|
||||||
|
return
|
||||||
|
defer.returnValue(None)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def get_app_service_by_token(self, token, from_cache=True):
|
def get_app_service_by_token(self, token, from_cache=True):
|
||||||
"""Get the application service with the given token.
|
"""Get the application service with the given token.
|
||||||
@ -173,6 +184,14 @@ class ApplicationServiceStore(SQLBaseStore):
|
|||||||
# TODO: The from_cache=False impl
|
# TODO: The from_cache=False impl
|
||||||
# TODO: This should be JOINed with the application_services_regex table.
|
# TODO: This should be JOINed with the application_services_regex table.
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def get_app_service_rooms(self, service):
|
||||||
|
logger.info("get_app_service_rooms -> %s", service)
|
||||||
|
|
||||||
|
# TODO stub
|
||||||
|
yield self.cache_defer
|
||||||
|
defer.returnValue([RoomsForUser("!foo:bar", service.sender, "join")])
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _populate_cache(self):
|
def _populate_cache(self):
|
||||||
"""Populates the ApplicationServiceCache from the database."""
|
"""Populates the ApplicationServiceCache from the database."""
|
||||||
|
@ -127,6 +127,27 @@ class _StreamToken(namedtuple("_StreamToken", "topological stream")):
|
|||||||
|
|
||||||
|
|
||||||
class StreamStore(SQLBaseStore):
|
class StreamStore(SQLBaseStore):
|
||||||
|
|
||||||
|
def get_appservice_room_stream(self, service, from_key, to_key, limit=0):
|
||||||
|
# NB this lives here instead of appservice.py so we can reuse the
|
||||||
|
# 'private' StreamToken class in this file.
|
||||||
|
logger.info("get_appservice_room_stream -> %s", service)
|
||||||
|
|
||||||
|
if limit:
|
||||||
|
limit = max(limit, MAX_STREAM_SIZE)
|
||||||
|
else:
|
||||||
|
limit = MAX_STREAM_SIZE
|
||||||
|
|
||||||
|
# From and to keys should be integers from ordering.
|
||||||
|
from_id = _StreamToken.parse_stream_token(from_key)
|
||||||
|
to_id = _StreamToken.parse_stream_token(to_key)
|
||||||
|
|
||||||
|
if from_key == to_key:
|
||||||
|
return defer.succeed(([], to_key))
|
||||||
|
|
||||||
|
# TODO stub
|
||||||
|
return defer.succeed(([], to_key))
|
||||||
|
|
||||||
@log_function
|
@log_function
|
||||||
def get_room_events_stream(self, user_id, from_key, to_key, room_id,
|
def get_room_events_stream(self, user_id, from_key, to_key, room_id,
|
||||||
limit=0, with_feedback=False):
|
limit=0, with_feedback=False):
|
||||||
|
Loading…
x
Reference in New Issue
Block a user