From 892e70ec8404865b4b1e6cf4eef7a3ada7114149 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 28 Oct 2015 16:06:57 +0000 Subject: [PATCH 01/14] Add APIs for adding and removing tags from rooms --- synapse/rest/client/v2_alpha/__init__.py | 2 + synapse/rest/client/v2_alpha/tags.py | 89 +++++++++++ synapse/storage/__init__.py | 2 + synapse/storage/schema/delta/25/tags.sql | 37 +++++ synapse/storage/tags.py | 190 +++++++++++++++++++++++ 5 files changed, 320 insertions(+) create mode 100644 synapse/rest/client/v2_alpha/tags.py create mode 100644 synapse/storage/schema/delta/25/tags.sql create mode 100644 synapse/storage/tags.py diff --git a/synapse/rest/client/v2_alpha/__init__.py b/synapse/rest/client/v2_alpha/__init__.py index 5831ff0e6..a10813234 100644 --- a/synapse/rest/client/v2_alpha/__init__.py +++ b/synapse/rest/client/v2_alpha/__init__.py @@ -22,6 +22,7 @@ from . import ( receipts, keys, tokenrefresh, + tags, ) from synapse.http.server import JsonResource @@ -44,3 +45,4 @@ class ClientV2AlphaRestResource(JsonResource): receipts.register_servlets(hs, client_resource) keys.register_servlets(hs, client_resource) tokenrefresh.register_servlets(hs, client_resource) + tags.register_servlets(hs, client_resource) diff --git a/synapse/rest/client/v2_alpha/tags.py b/synapse/rest/client/v2_alpha/tags.py new file mode 100644 index 000000000..c4a670c5a --- /dev/null +++ b/synapse/rest/client/v2_alpha/tags.py @@ -0,0 +1,89 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import client_v2_pattern + +from synapse.http.servlet import RestServlet + +from twisted.internet import defer + +import logging + +logger = logging.getLogger(__name__) + + +class TagListServlet(RestServlet): + """ + GET /user/{user_id}/rooms/{room_id}/tags HTTP/1.1 + """ + PATTERN = client_v2_pattern( + "/user/(?P[^/]*)/rooms/(?P[^/]*)/tags" + ) + + def __init__(self, hs): + super(TagListServlet, self).__init__() + self.auth = hs.get_auth() + self.store = hs.get_datastore() + + @defer.inlineCallbacks + def on_GET(self, request, user_id, room_id): + auth_user, _ = yield self.auth.get_user_by_req(request) + if user_id != auth_user.to_string(): + raise AuthError(403, "Cannot get tags for other users.") + + tags = yield self.store.get_tags_for_room(user_id, room_id) + + defer.returnValue((200, {"tags": tags})) + + +class TagServlet(RestServlet): + """ + PUT /user/{user_id}/rooms/{room_id}/tags/{tag} HTTP/1.1 + DELETE /user/{user_id}/rooms/{room_id}/tags/{tag} HTTP/1.1 + """ + PATTERN = client_v2_pattern( + "/user/(?P[^/]*)/rooms/(?P[^/]*)/tags/(?P[^/]*)" + ) + def __init__(self, hs): + super(TagServlet, self).__init__() + self.auth = hs.get_auth() + self.store = hs.get_datastore() + + @defer.inlineCallbacks + def on_PUT(self, request, user_id, room_id, tag): + auth_user, _ = yield self.auth.get_user_by_req(request) + if user_id != auth_user.to_string(): + raise AuthError(403, "Cannot add tags for other users.") + + yield self.store.add_tag_to_room(user_id, room_id, tag) + + # TODO: poke the notifier. + defer.returnValue((200, {})) + + @defer.inlineCallbacks + def on_DELETE(self, request, user_id, room_id, tag): + auth_user, _ = yield self.auth.get_user_by_req(request) + if user_id != auth_user.to_string(): + raise AuthError(403, "Cannot add tags for other users.") + + yield self.store.remove_tag_from_room(user_id, room_id, tag) + + # TODO: poke the notifier. + defer.returnValue((200, {})) + + +def register_servlets(hs, http_server): + TagListServlet(hs).register(http_server) + TagServlet(hs).register(http_server) diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index a1bd9c4ce..e7443f283 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -41,6 +41,7 @@ from .end_to_end_keys import EndToEndKeyStore from .receipts import ReceiptsStore from .search import SearchStore +from .tags import TagsStore import logging @@ -71,6 +72,7 @@ class DataStore(RoomMemberStore, RoomStore, ReceiptsStore, EndToEndKeyStore, SearchStore, + TagsStore, ): def __init__(self, hs): diff --git a/synapse/storage/schema/delta/25/tags.sql b/synapse/storage/schema/delta/25/tags.sql new file mode 100644 index 000000000..168766dcf --- /dev/null +++ b/synapse/storage/schema/delta/25/tags.sql @@ -0,0 +1,37 @@ +/* Copyright 2015 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +CREATE TABLE IF NOT EXISTS room_tags( + user_id TEXT NOT NULL, + room_id TEXT NOT NULL, + tag TEXT NOT NULL, -- The name of the tag. + CONSTRAINT room_tag_uniqueness UNIQUE (user_id, room_id, tag) +); + +CREATE TABLE IF NOT EXISTS room_tags_revisions ( + user_id TEXT NOT NULL, + room_id TEXT NOT NULL, + stream_id BIGINT NOT NULL, -- The current version of the room tags. + CONSTRAINT room_tag_revisions_uniqueness UNIQUE (user_id, room_id) +); + +CREATE TABLE IF NOT EXISTS private_user_data_max_stream_id( + Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE, -- Makes sure this table only has one row. + stream_id BIGINT NOT NULL, + CHECK (Lock='X') +); + +INSERT INTO private_user_data_max_stream_id (stream_id) VALUES (0); diff --git a/synapse/storage/tags.py b/synapse/storage/tags.py new file mode 100644 index 000000000..507e60596 --- /dev/null +++ b/synapse/storage/tags.py @@ -0,0 +1,190 @@ +# -*- coding: utf-8 -*- +# Copyright 2014, 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import SQLBaseStore +from synapse.util.caches.descriptors import cached +from twisted.internet import defer +from .util.id_generators import StreamIdGenerator + +import logging + +logger = logging.getLogger(__name__) + + +class TagsStore(SQLBaseStore): + def __init__(self, hs): + super(TagsStore, self).__init__(hs) + + self._private_user_data_id_gen = StreamIdGenerator( + "private_user_data_max_stream_id", "stream_id" + ) + + @cached() + def get_tags_for_user(self, user_id): + """Get all the tags for a user. + + + Args: + user_id(str): The user to get the tags for. + Returns: + A deferred dict mapping from room_id strings to lists of tag + strings. + """ + + deferred = self._simple_select_list( + "room_tags", {"user_id": user_id}, ["room_id", "tag"] + ) + + @deferred.addCallback + def tags_by_room(rows): + tags_by_room = {} + for row in rows: + tags_by_room.setdefault(row["room_id"], []).append(row["tag"]) + return tags_by_room + + return deferred + + @defer.inlineCallbacks + def get_updated_tags(self, user_id, stream_id): + """Get all the tags for the rooms where the tags have changed since the + given version + + Args: + user_id(str): The user to get the tags for. + stream_id(int): The earliest update to get for the user. + Returns: + A deferred dict mapping from room_id strings to lists of tag + strings for all the rooms that changed since the stream_id token. + """ + def get_updated_tags_txn(txn): + sql = ( + "SELECT room_id from room_tags_revisions" + " WHERE user_id = ? AND stream_id > ?" + ) + txn.execute(sql, (user_id, stream_id)) + room_ids = [row[0] for row in txn.fetchall()] + return room_ids + + room_ids = yield self.runInteraction( + "get_updated_tags", get_updated_tags_txn + ) + + results = {} + if room_ids: + tags_by_room = yield self.get_tags_for_user(self, user_id) + for room_id in rooms_ids: + results[room_id] = tags_by_room[room_id] + + defer.returnValue(results) + + def get_tags_for_room(self, user_id, room_id): + """Get all the tags for the given room + Args: + user_id(str): The user to get tags for + room_id(str): The room to get tags for + Returns: + A deferred list of string tags. + """ + return self._simple_select_onecol( + table="room_tags", + keyvalues={"user_id": user_id, "room_id": room_id}, + retcol="tag", + desc="get_tags_for_room", + ) + + @defer.inlineCallbacks + def add_tag_to_room(self, user_id, room_id, tag): + """Add a tag to a room for a user. + Returns: + A deferred that completes once the tag has been added. + """ + def add_tag_txn(txn, next_id): + sql = ( + "INSERT INTO room_tags (user_id, room_id, tag)" + " VALUES (?, ?, ?)" + ) + try: + txn.execute(sql, (user_id, room_id, tag)) + except database_engine.module.IntegrityError as e: + # Return early if the row is already in the table + # and we don't need to bump the revision number of the + # private_user_data. + return + self._update_revision_txn(txn, user_id, room_id, next_id) + + with (yield self._private_user_data_id_gen.get_next(self)) as next_id: + yield self.runInteraction("add_tag", add_tag_txn, next_id) + + self.get_tags_for_user.invalidate((user_id,)) + + @defer.inlineCallbacks + def remove_tag_from_room(self, user_id, room_id, tag): + """Remove a tag from a room for a user. + Returns: + A deffered that completes once the tag has been removed + """ + def remove_tag_txn(txn, next_id): + sql = ( + "DELETE FROM room_tags " + " WHERE user_id = ? AND room_id = ? AND tag = ?" + ) + txn.execute(sql, (user_id, room_id, tag)) + self._update_revision_txn(txn, user_id, room_id, next_id) + + with (yield self._private_user_data_id_gen.get_next(self)) as next_id: + yield self.runInteraction("remove_tag", remove_tag_txn, next_id) + + self.get_tags_for_user.invalidate((user_id,)) + + def _update_revision_txn(self, txn, user_id, room_id, next_id): + """Update the latest revision of the tags for the given user and room. + + Args: + txn: The database cursor + user_id(str): The ID of the user. + room_id(str): The ID of the room. + next_id(int): The the revision to advance to. + """ + + update_max_id_sql = ( + "UPDATE private_user_data_max_stream_id" + " SET stream_id = ?" + " WHERE stream_id < ?" + ) + txn.execute(update_max_id_sql, (next_id, next_id)) + + update_sql = ( + "UPDATE room_tags_revisions" + " SET stream_id = ?" + " WHERE user_id = ?" + " AND room_id = ?" + ) + txn.execute(update_sql, (next_id, user_id, room_id)) + + if txn.rowcount == 0: + insert_sql = ( + "INSERT INTO room_tags_revisions (user_id, room_id, stream_id)" + " VALUES (?, ?, ?)" + ) + try: + txn.execute(insert_sql, (user_id, room_id, next_id)) + except database_engine.module.IntegrityError as e: + # Ignore insertion errors. It doesn't matter if the row wasn't + # inserted because if two updates happend concurrently the one + # with the higher stream_id will not be reported to a client + # unless the previous update has completed. It doesn't matter + # which stream_id ends up in the table, as long as it is higher + # than the id that the client has. + pass From a89b86dc473f5dc41a17207a17165b27d8f599ea Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 28 Oct 2015 16:45:57 +0000 Subject: [PATCH 02/14] Fix pyflakes errors --- synapse/rest/client/v2_alpha/tags.py | 2 ++ synapse/storage/tags.py | 6 +++--- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/synapse/rest/client/v2_alpha/tags.py b/synapse/rest/client/v2_alpha/tags.py index c4a670c5a..15c9347fc 100644 --- a/synapse/rest/client/v2_alpha/tags.py +++ b/synapse/rest/client/v2_alpha/tags.py @@ -16,6 +16,7 @@ from ._base import client_v2_pattern from synapse.http.servlet import RestServlet +from synapse.api.errors import AuthError from twisted.internet import defer @@ -56,6 +57,7 @@ class TagServlet(RestServlet): PATTERN = client_v2_pattern( "/user/(?P[^/]*)/rooms/(?P[^/]*)/tags/(?P[^/]*)" ) + def __init__(self, hs): super(TagServlet, self).__init__() self.auth = hs.get_auth() diff --git a/synapse/storage/tags.py b/synapse/storage/tags.py index 507e60596..64c65fc32 100644 --- a/synapse/storage/tags.py +++ b/synapse/storage/tags.py @@ -84,7 +84,7 @@ class TagsStore(SQLBaseStore): results = {} if room_ids: tags_by_room = yield self.get_tags_for_user(self, user_id) - for room_id in rooms_ids: + for room_id in room_ids: results[room_id] = tags_by_room[room_id] defer.returnValue(results) @@ -117,7 +117,7 @@ class TagsStore(SQLBaseStore): ) try: txn.execute(sql, (user_id, room_id, tag)) - except database_engine.module.IntegrityError as e: + except self.database_engine.module.IntegrityError: # Return early if the row is already in the table # and we don't need to bump the revision number of the # private_user_data. @@ -180,7 +180,7 @@ class TagsStore(SQLBaseStore): ) try: txn.execute(insert_sql, (user_id, room_id, next_id)) - except database_engine.module.IntegrityError as e: + except self.database_engine.module.IntegrityError: # Ignore insertion errors. It doesn't matter if the row wasn't # inserted because if two updates happend concurrently the one # with the higher stream_id will not be reported to a client From f40b0ed5e190a78ed6633505c4f437b6fddc41ee Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 29 Oct 2015 15:20:52 +0000 Subject: [PATCH 03/14] Inform the client of new room tags using v1 /events --- synapse/handlers/private_user_data.py | 46 +++++++++++++++++++++++++++ synapse/notifier.py | 2 +- synapse/rest/client/v2_alpha/tags.py | 14 ++++++-- synapse/storage/tags.py | 16 +++++++++- synapse/streams/events.py | 5 +++ synapse/types.py | 22 +++++++------ 6 files changed, 91 insertions(+), 14 deletions(-) create mode 100644 synapse/handlers/private_user_data.py diff --git a/synapse/handlers/private_user_data.py b/synapse/handlers/private_user_data.py new file mode 100644 index 000000000..1778c7132 --- /dev/null +++ b/synapse/handlers/private_user_data.py @@ -0,0 +1,46 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer + + +class PrivateUserDataEventSource(object): + def __init__(self, hs): + self.store = hs.get_datastore() + + def get_current_key(self, direction='f'): + return self.store.get_max_private_user_data_stream_id() + + @defer.inlineCallbacks + def get_new_events_for_user(self, user, from_key, limit): + user_id = user.to_string() + last_stream_id = from_key + + current_stream_id = yield self.store.get_max_private_user_data_stream_id() + tags = yield self.store.get_updated_tags(user_id, last_stream_id) + + results = [] + for room_id, room_tags in tags.items(): + results.append({ + "type": "m.tag", + "content": {"tags": room_tags}, + "room_id": room_id, + }) + + defer.returnValue((results, current_stream_id)) + + @defer.inlineCallbacks + def get_pagination_rows(self, user, config, key): + defer.returnValue(([], config.to_id)) diff --git a/synapse/notifier.py b/synapse/notifier.py index f998fc83b..a78ee3c1e 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -270,7 +270,7 @@ class Notifier(object): @defer.inlineCallbacks def wait_for_events(self, user, rooms, timeout, callback, - from_token=StreamToken("s0", "0", "0", "0")): + from_token=StreamToken("s0", "0", "0", "0", "0")): """Wait until the callback returns a non empty response or the timeout fires. """ diff --git a/synapse/rest/client/v2_alpha/tags.py b/synapse/rest/client/v2_alpha/tags.py index 15c9347fc..486add990 100644 --- a/synapse/rest/client/v2_alpha/tags.py +++ b/synapse/rest/client/v2_alpha/tags.py @@ -62,6 +62,7 @@ class TagServlet(RestServlet): super(TagServlet, self).__init__() self.auth = hs.get_auth() self.store = hs.get_datastore() + self.notifier = hs.get_notifier() @defer.inlineCallbacks def on_PUT(self, request, user_id, room_id, tag): @@ -69,9 +70,12 @@ class TagServlet(RestServlet): if user_id != auth_user.to_string(): raise AuthError(403, "Cannot add tags for other users.") - yield self.store.add_tag_to_room(user_id, room_id, tag) + max_id = yield self.store.add_tag_to_room(user_id, room_id, tag) + + yield self.notifier.on_new_event( + "private_user_data_key", max_id, users=[user_id] + ) - # TODO: poke the notifier. defer.returnValue((200, {})) @defer.inlineCallbacks @@ -80,7 +84,11 @@ class TagServlet(RestServlet): if user_id != auth_user.to_string(): raise AuthError(403, "Cannot add tags for other users.") - yield self.store.remove_tag_from_room(user_id, room_id, tag) + max_id = yield self.store.remove_tag_from_room(user_id, room_id, tag) + + yield self.notifier.on_new_event( + "private_user_data_key", max_id, users=[user_id] + ) # TODO: poke the notifier. defer.returnValue((200, {})) diff --git a/synapse/storage/tags.py b/synapse/storage/tags.py index 64c65fc32..2d5c49144 100644 --- a/synapse/storage/tags.py +++ b/synapse/storage/tags.py @@ -31,6 +31,14 @@ class TagsStore(SQLBaseStore): "private_user_data_max_stream_id", "stream_id" ) + def get_max_private_user_data_stream_id(self): + """Get the current max stream id for the private user data stream + + Returns: + A deferred int. + """ + return self._private_user_data_id_gen.get_max_token(self) + @cached() def get_tags_for_user(self, user_id): """Get all the tags for a user. @@ -83,7 +91,7 @@ class TagsStore(SQLBaseStore): results = {} if room_ids: - tags_by_room = yield self.get_tags_for_user(self, user_id) + tags_by_room = yield self.get_tags_for_user(user_id) for room_id in room_ids: results[room_id] = tags_by_room[room_id] @@ -129,6 +137,9 @@ class TagsStore(SQLBaseStore): self.get_tags_for_user.invalidate((user_id,)) + result = yield self._private_user_data_id_gen.get_max_token(self) + defer.returnValue(result) + @defer.inlineCallbacks def remove_tag_from_room(self, user_id, room_id, tag): """Remove a tag from a room for a user. @@ -148,6 +159,9 @@ class TagsStore(SQLBaseStore): self.get_tags_for_user.invalidate((user_id,)) + result = yield self._private_user_data_id_gen.get_max_token(self) + defer.returnValue(result) + def _update_revision_txn(self, txn, user_id, room_id, next_id): """Update the latest revision of the tags for the given user and room. diff --git a/synapse/streams/events.py b/synapse/streams/events.py index 699083ae1..f0d68b5bf 100644 --- a/synapse/streams/events.py +++ b/synapse/streams/events.py @@ -21,6 +21,7 @@ from synapse.handlers.presence import PresenceEventSource from synapse.handlers.room import RoomEventSource from synapse.handlers.typing import TypingNotificationEventSource from synapse.handlers.receipts import ReceiptEventSource +from synapse.handlers.private_user_data import PrivateUserDataEventSource class EventSources(object): @@ -29,6 +30,7 @@ class EventSources(object): "presence": PresenceEventSource, "typing": TypingNotificationEventSource, "receipt": ReceiptEventSource, + "private_user_data": PrivateUserDataEventSource, } def __init__(self, hs): @@ -52,5 +54,8 @@ class EventSources(object): receipt_key=( yield self.sources["receipt"].get_current_key() ), + private_user_data_key=( + yield self.sources["private_user_data"].get_current_key() + ), ) defer.returnValue(token) diff --git a/synapse/types.py b/synapse/types.py index 9cffc33d2..8d3a8d88c 100644 --- a/synapse/types.py +++ b/synapse/types.py @@ -98,10 +98,13 @@ class EventID(DomainSpecificString): class StreamToken( - namedtuple( - "Token", - ("room_key", "presence_key", "typing_key", "receipt_key") - ) + namedtuple("Token", ( + "room_key", + "presence_key", + "typing_key", + "receipt_key", + "private_user_data_key", + )) ): _SEPARATOR = "_" @@ -128,13 +131,14 @@ class StreamToken( else: return int(self.room_key[1:].split("-")[-1]) - def is_after(self, other_token): + def is_after(self, other): """Does this token contain events that the other doesn't?""" return ( - (other_token.room_stream_id < self.room_stream_id) - or (int(other_token.presence_key) < int(self.presence_key)) - or (int(other_token.typing_key) < int(self.typing_key)) - or (int(other_token.receipt_key) < int(self.receipt_key)) + (other.room_stream_id < self.room_stream_id) + or (int(other.presence_key) < int(self.presence_key)) + or (int(other.typing_key) < int(self.typing_key)) + or (int(other.receipt_key) < int(self.receipt_key)) + or (int(other.private_user_data_key) < int(self.private_user_data_key)) ) def copy_and_advance(self, key, new_value): From fdf73c6855f2b043f1af451e77e2413049a21ab2 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 30 Oct 2015 16:22:32 +0000 Subject: [PATCH 04/14] Include room tags v1 /initialSync --- synapse/handlers/message.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 024474d5f..c5dce3008 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -322,6 +322,8 @@ class MessageHandler(BaseHandler): user, pagination_config.get_source_config("receipt"), None ) + tags_by_room = yield self.store.get_tags_for_user(user_id) + public_room_ids = yield self.store.get_public_room_ids() limit = pagin_config.limit @@ -398,6 +400,16 @@ class MessageHandler(BaseHandler): serialize_event(c, time_now, as_client_event) for c in current_state.values() ] + + private_user_data = [] + tags = tags_by_room.get(event.room_id) + if tags: + private_user_data.append({ + "room_id": event.room_id, + "type": "m.tag", + "content": {"tags": tags}, + }) + d["private_user_data"] = private_user_data except: logger.exception("Failed to get snapshot") From 79b65f387538d1386369fcec142770ea91fdf8a2 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 30 Oct 2015 16:28:19 +0000 Subject: [PATCH 05/14] Include tags in v1 room initial sync --- synapse/handlers/message.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index c5dce3008..8f156e5c8 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -459,6 +459,17 @@ class MessageHandler(BaseHandler): result = yield self._room_initial_sync_parted( user_id, room_id, pagin_config, member_event ) + + private_user_data = [] + tags = yield self.store.get_tags_for_room(user_id, room_id) + if tags: + private_user_data.append({ + "type": "m.tag", + "content": {"tags": tags}, + "room_id": room_id, + }) + result["private_user_data"] = private_user_data + defer.returnValue(result) @defer.inlineCallbacks From fb46937413cc0ccbf12063a5743ddf914cd8170a Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 30 Oct 2015 16:38:35 +0000 Subject: [PATCH 06/14] Support clients supplying older tokens, fix short poll test --- synapse/types.py | 2 +- tests/rest/client/v1/test_presence.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/synapse/types.py b/synapse/types.py index 8d3a8d88c..84631d177 100644 --- a/synapse/types.py +++ b/synapse/types.py @@ -112,7 +112,7 @@ class StreamToken( def from_string(cls, string): try: keys = string.split(cls._SEPARATOR) - if len(keys) == len(cls._fields) - 1: + while len(keys) < len(cls._fields): # i.e. old token from before receipt_key keys.append("0") return cls(*keys) diff --git a/tests/rest/client/v1/test_presence.py b/tests/rest/client/v1/test_presence.py index 29d9bbaad..0e3b92224 100644 --- a/tests/rest/client/v1/test_presence.py +++ b/tests/rest/client/v1/test_presence.py @@ -369,7 +369,7 @@ class PresenceEventStreamTestCase(unittest.TestCase): # all be ours # I'll already get my own presence state change - self.assertEquals({"start": "0_1_0_0", "end": "0_1_0_0", "chunk": []}, + self.assertEquals({"start": "0_1_0_0_0", "end": "0_1_0_0_0", "chunk": []}, response ) @@ -388,7 +388,7 @@ class PresenceEventStreamTestCase(unittest.TestCase): "/events?from=s0_1_0&timeout=0", None) self.assertEquals(200, code) - self.assertEquals({"start": "s0_1_0_0", "end": "s0_2_0_0", "chunk": [ + self.assertEquals({"start": "s0_1_0_0_0", "end": "s0_2_0_0_0", "chunk": [ {"type": "m.presence", "content": { "user_id": "@banana:test", From ddd8566f415ab3a6092aa4947e5d2aebf67109fc Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 2 Nov 2015 15:11:31 +0000 Subject: [PATCH 07/14] Store room tag content and return the content in the m.tag event --- synapse/handlers/message.py | 6 ++-- synapse/rest/client/v2_alpha/tags.py | 12 +++++-- synapse/storage/schema/delta/25/tags.sql | 1 + synapse/storage/tags.py | 44 +++++++++++++++--------- 4 files changed, 41 insertions(+), 22 deletions(-) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 8f156e5c8..0f947993d 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -405,7 +405,6 @@ class MessageHandler(BaseHandler): tags = tags_by_room.get(event.room_id) if tags: private_user_data.append({ - "room_id": event.room_id, "type": "m.tag", "content": {"tags": tags}, }) @@ -466,7 +465,6 @@ class MessageHandler(BaseHandler): private_user_data.append({ "type": "m.tag", "content": {"tags": tags}, - "room_id": room_id, }) result["private_user_data"] = private_user_data @@ -499,8 +497,8 @@ class MessageHandler(BaseHandler): user_id, messages ) - start_token = StreamToken(token[0], 0, 0, 0) - end_token = StreamToken(token[1], 0, 0, 0) + start_token = StreamToken(token[0], 0, 0, 0, 0) + end_token = StreamToken(token[1], 0, 0, 0, 0) time_now = self.clock.time_msec() diff --git a/synapse/rest/client/v2_alpha/tags.py b/synapse/rest/client/v2_alpha/tags.py index 486add990..4e3f917fc 100644 --- a/synapse/rest/client/v2_alpha/tags.py +++ b/synapse/rest/client/v2_alpha/tags.py @@ -16,12 +16,14 @@ from ._base import client_v2_pattern from synapse.http.servlet import RestServlet -from synapse.api.errors import AuthError +from synapse.api.errors import AuthError, SynapseError from twisted.internet import defer import logging +import simplejson as json + logger = logging.getLogger(__name__) @@ -70,7 +72,13 @@ class TagServlet(RestServlet): if user_id != auth_user.to_string(): raise AuthError(403, "Cannot add tags for other users.") - max_id = yield self.store.add_tag_to_room(user_id, room_id, tag) + try: + content_bytes = request.content.read() + body = json.loads(content_bytes) + except: + raise SynapseError(400, "Invalid tag JSON") + + max_id = yield self.store.add_tag_to_room(user_id, room_id, tag, body) yield self.notifier.on_new_event( "private_user_data_key", max_id, users=[user_id] diff --git a/synapse/storage/schema/delta/25/tags.sql b/synapse/storage/schema/delta/25/tags.sql index 168766dcf..527424c99 100644 --- a/synapse/storage/schema/delta/25/tags.sql +++ b/synapse/storage/schema/delta/25/tags.sql @@ -18,6 +18,7 @@ CREATE TABLE IF NOT EXISTS room_tags( user_id TEXT NOT NULL, room_id TEXT NOT NULL, tag TEXT NOT NULL, -- The name of the tag. + content TEXT NOT NULL, -- The JSON content of the tag. CONSTRAINT room_tag_uniqueness UNIQUE (user_id, room_id, tag) ); diff --git a/synapse/storage/tags.py b/synapse/storage/tags.py index 2d5c49144..34aa38c06 100644 --- a/synapse/storage/tags.py +++ b/synapse/storage/tags.py @@ -18,6 +18,7 @@ from synapse.util.caches.descriptors import cached from twisted.internet import defer from .util.id_generators import StreamIdGenerator +import ujson as json import logging logger = logging.getLogger(__name__) @@ -52,14 +53,15 @@ class TagsStore(SQLBaseStore): """ deferred = self._simple_select_list( - "room_tags", {"user_id": user_id}, ["room_id", "tag"] + "room_tags", {"user_id": user_id}, ["room_id", "tag", "content"] ) @deferred.addCallback def tags_by_room(rows): tags_by_room = {} for row in rows: - tags_by_room.setdefault(row["room_id"], []).append(row["tag"]) + room_tags = tags_by_room.setdefault(row["room_id"], {}) + room_tags[row["tag"]] = json.loads(row["content"]) return tags_by_room return deferred @@ -105,31 +107,41 @@ class TagsStore(SQLBaseStore): Returns: A deferred list of string tags. """ - return self._simple_select_onecol( + return self._simple_select_list( table="room_tags", keyvalues={"user_id": user_id, "room_id": room_id}, - retcol="tag", + retcols=("tag", "content"), desc="get_tags_for_room", - ) + ).addCallback(lambda rows: { + row["tag"]: json.loads(row["content"]) for row in rows + }) @defer.inlineCallbacks - def add_tag_to_room(self, user_id, room_id, tag): + def add_tag_to_room(self, user_id, room_id, tag, content): """Add a tag to a room for a user. + Args: + user_id(str): The user to add a tag for. + room_id(str): The room to add a tag for. + tag(str): The tag name to add. + content(dict): A json object to associate with the tag. Returns: A deferred that completes once the tag has been added. """ + content_json = json.dumps(content) + def add_tag_txn(txn, next_id): - sql = ( - "INSERT INTO room_tags (user_id, room_id, tag)" - " VALUES (?, ?, ?)" + self._simple_upsert_txn( + txn, + table="room_tags", + keyvalues={ + "user_id": user_id, + "room_id": room_id, + "tag": tag, + }, + values={ + "content": content_json, + } ) - try: - txn.execute(sql, (user_id, room_id, tag)) - except self.database_engine.module.IntegrityError: - # Return early if the row is already in the table - # and we don't need to bump the revision number of the - # private_user_data. - return self._update_revision_txn(txn, user_id, room_id, next_id) with (yield self._private_user_data_id_gen.get_next(self)) as next_id: From 57be722c461f7727153d9563f20620f5a0549f5b Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 2 Nov 2015 16:23:15 +0000 Subject: [PATCH 08/14] Include room tags in v2 /sync --- synapse/api/filtering.py | 7 +++ synapse/handlers/sync.py | 69 +++++++++++++++++++++++----- synapse/rest/client/v2_alpha/sync.py | 5 ++ 3 files changed, 70 insertions(+), 11 deletions(-) diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py index eb15d8c54..e4e3d1c59 100644 --- a/synapse/api/filtering.py +++ b/synapse/api/filtering.py @@ -147,6 +147,10 @@ class FilterCollection(object): self.filter_json.get("room", {}).get("ephemeral", {}) ) + self.room_private_user_data = Filter( + self.filter_json.get("room", {}).get("private_user_data", {}) + ) + self.presence_filter = Filter( self.filter_json.get("presence", {}) ) @@ -172,6 +176,9 @@ class FilterCollection(object): def filter_room_ephemeral(self, events): return self.room_ephemeral_filter.filter(events) + def filter_room_private_user_data(self, events): + return self.room_private_user_data.filter(events) + class Filter(object): def __init__(self, filter_json): diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 4c5a2353b..ea524fb67 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -51,6 +51,7 @@ class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [ "timeline", "state", "ephemeral", + "private_user_data", ])): __slots__ = [] @@ -58,13 +59,19 @@ class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [ """Make the result appear empty if there are no updates. This is used to tell if room needs to be part of the sync result. """ - return bool(self.timeline or self.state or self.ephemeral) + return bool( + self.timeline + or self.state + or self.ephemeral + or self.private_user_data + ) class ArchivedSyncResult(collections.namedtuple("JoinedSyncResult", [ "room_id", "timeline", "state", + "private_user_data", ])): __slots__ = [] @@ -72,7 +79,11 @@ class ArchivedSyncResult(collections.namedtuple("JoinedSyncResult", [ """Make the result appear empty if there are no updates. This is used to tell if room needs to be part of the sync result. """ - return bool(self.timeline or self.state) + return bool( + self.timeline + or self.state + or self.private_user_data + ) class InvitedSyncResult(collections.namedtuple("InvitedSyncResult", [ @@ -197,6 +208,10 @@ class SyncHandler(BaseHandler): ) ) + tags_by_room = yield self.store.get_tags_for_user( + sync_config.user.to_string() + ) + joined = [] invited = [] archived = [] @@ -207,7 +222,8 @@ class SyncHandler(BaseHandler): sync_config=sync_config, now_token=now_token, timeline_since_token=timeline_since_token, - typing_by_room=typing_by_room + typing_by_room=typing_by_room, + tags_by_room=tags_by_room, ) joined.append(room_sync) elif event.membership == Membership.INVITE: @@ -226,6 +242,7 @@ class SyncHandler(BaseHandler): leave_event_id=event.event_id, leave_token=leave_token, timeline_since_token=timeline_since_token, + tags_by_room=tags_by_room, ) archived.append(room_sync) @@ -240,7 +257,7 @@ class SyncHandler(BaseHandler): @defer.inlineCallbacks def full_state_sync_for_joined_room(self, room_id, sync_config, now_token, timeline_since_token, - typing_by_room): + typing_by_room, tags_by_room): """Sync a room for a client which is starting without any state Returns: A Deferred JoinedSyncResult. @@ -260,8 +277,21 @@ class SyncHandler(BaseHandler): timeline=batch, state=current_state_events, ephemeral=typing_by_room.get(room_id, []), + private_user_data=self.private_user_data_for_room( + room_id, tags_by_room + ), )) + def private_user_data_for_room(self, room_id, tags_by_room): + private_user_data = [] + tags = tags_by_room.get(room_id) + if tags: + private_user_data.append({ + "type": "m.tag", + "content": {"tags": tags}, + }) + return private_user_data + @defer.inlineCallbacks def typing_by_room(self, sync_config, now_token, since_token=None): """Get the typing events for each room the user is in @@ -296,7 +326,7 @@ class SyncHandler(BaseHandler): @defer.inlineCallbacks def full_state_sync_for_archived_room(self, room_id, sync_config, leave_event_id, leave_token, - timeline_since_token): + timeline_since_token, tags_by_room): """Sync a room for a client which is starting without any state Returns: A Deferred JoinedSyncResult. @@ -314,6 +344,9 @@ class SyncHandler(BaseHandler): room_id=room_id, timeline=batch, state=leave_state[leave_event_id].values(), + private_user_data=self.private_user_data_for_room( + room_id, tags_by_room + ), )) @defer.inlineCallbacks @@ -359,6 +392,11 @@ class SyncHandler(BaseHandler): limit=timeline_limit + 1, ) + tags_by_room = yield self.store.get_updated_tags( + sync_config.user.to_string(), + since_token.private_user_data_key, + ) + joined = [] archived = [] if len(room_events) <= timeline_limit: @@ -399,7 +437,10 @@ class SyncHandler(BaseHandler): limited=limited, ), state=state, - ephemeral=typing_by_room.get(room_id, []) + ephemeral=typing_by_room.get(room_id, []), + private_user_data=self.private_user_data_for_room( + room_id, tags_by_room + ), ) if room_sync: joined.append(room_sync) @@ -416,14 +457,14 @@ class SyncHandler(BaseHandler): for room_id in joined_room_ids: room_sync = yield self.incremental_sync_with_gap_for_room( room_id, sync_config, since_token, now_token, - typing_by_room + typing_by_room, tags_by_room ) if room_sync: joined.append(room_sync) for leave_event in leave_events: room_sync = yield self.incremental_sync_for_archived_room( - sync_config, leave_event, since_token + sync_config, leave_event, since_token, tags_by_room ) archived.append(room_sync) @@ -487,7 +528,7 @@ class SyncHandler(BaseHandler): @defer.inlineCallbacks def incremental_sync_with_gap_for_room(self, room_id, sync_config, since_token, now_token, - typing_by_room): + typing_by_room, tags_by_room): """ Get the incremental delta needed to bring the client up to date for the room. Gives the client the most recent events and the changes to state. @@ -528,7 +569,10 @@ class SyncHandler(BaseHandler): room_id=room_id, timeline=batch, state=state_events_delta, - ephemeral=typing_by_room.get(room_id, []) + ephemeral=typing_by_room.get(room_id, []), + private_user_data=self.private_user_data_for_room( + room_id, tags_by_room + ), ) logging.debug("Room sync: %r", room_sync) @@ -537,7 +581,7 @@ class SyncHandler(BaseHandler): @defer.inlineCallbacks def incremental_sync_for_archived_room(self, sync_config, leave_event, - since_token): + since_token, tags_by_room): """ Get the incremental delta needed to bring the client up to date for the archived room. Returns: @@ -578,6 +622,9 @@ class SyncHandler(BaseHandler): room_id=leave_event.room_id, timeline=batch, state=state_events_delta, + private_user_data=self.private_user_data_for_room( + leave_event.room_id, tags_by_room + ), ) logging.debug("Room sync: %r", room_sync) diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index 1840eef77..32a1087c9 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -220,6 +220,10 @@ class SyncRestServlet(RestServlet): ) timeline_event_ids.append(event.event_id) + private_user_data = filter.filter_room_private_user_data( + room.private_user_data + ) + result = { "event_map": event_map, "timeline": { @@ -228,6 +232,7 @@ class SyncRestServlet(RestServlet): "limited": room.timeline.limited, }, "state": {"events": state_event_ids}, + "private_user_data": {"events": private_user_data}, } if joined: From 2657140c5831bdb8aed0fe4644bb2fa8a056abf7 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 2 Nov 2015 17:54:04 +0000 Subject: [PATCH 09/14] Include read receipts in v2 sync --- synapse/handlers/sync.py | 47 ++++++++++++++++++++++++++-------------- 1 file changed, 31 insertions(+), 16 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 4c5a2353b..19c4d5163 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -174,7 +174,7 @@ class SyncHandler(BaseHandler): """ now_token = yield self.event_sources.get_current_token() - now_token, typing_by_room = yield self.typing_by_room( + now_token, ephemeral_by_room = yield self.ephemeral_by_room( sync_config, now_token ) @@ -207,7 +207,7 @@ class SyncHandler(BaseHandler): sync_config=sync_config, now_token=now_token, timeline_since_token=timeline_since_token, - typing_by_room=typing_by_room + ephemeral_by_room=ephemeral_by_room, ) joined.append(room_sync) elif event.membership == Membership.INVITE: @@ -240,7 +240,7 @@ class SyncHandler(BaseHandler): @defer.inlineCallbacks def full_state_sync_for_joined_room(self, room_id, sync_config, now_token, timeline_since_token, - typing_by_room): + ephemeral_by_room): """Sync a room for a client which is starting without any state Returns: A Deferred JoinedSyncResult. @@ -259,12 +259,12 @@ class SyncHandler(BaseHandler): room_id=room_id, timeline=batch, state=current_state_events, - ephemeral=typing_by_room.get(room_id, []), + ephemeral=ephemeral_by_room.get(room_id, []), )) @defer.inlineCallbacks - def typing_by_room(self, sync_config, now_token, since_token=None): - """Get the typing events for each room the user is in + def ephemeral_by_room(self, sync_config, now_token, since_token=None): + """Get the ephemeral events for each room the user is in Args: sync_config (SyncConfig): The flags, filters and user for the sync. now_token (StreamToken): Where the server is currently up to. @@ -286,12 +286,27 @@ class SyncHandler(BaseHandler): ) now_token = now_token.copy_and_replace("typing_key", typing_key) - typing_by_room = {event["room_id"]: [event] for event in typing} - for event in typing: - event.pop("room_id") - logger.debug("Typing %r", typing_by_room) + ephemeral_by_room = {} - defer.returnValue((now_token, typing_by_room)) + for event in typing: + room_id = event.pop("room_id") + ephemeral_by_room.setdefault(room_id, []).append(event) + + receipt_key = since_token.receipt_key if since_token else "0" + + receipt_source = self.event_sources.sources["receipt"] + receipts, receipt_key = yield receipt_source.get_new_events_for_user( + user=sync_config.user, + from_key=receipt_key, + limit=sync_config.filter.ephemeral_limit(), + ) + now_token = now_token.copy_and_replace("receipt_key", receipt_key) + + for event in receipts: + room_id = event.pop("room_id") + ephemeral_by_room.setdefault(room_id, []).append(event) + + defer.returnValue((now_token, ephemeral_by_room)) @defer.inlineCallbacks def full_state_sync_for_archived_room(self, room_id, sync_config, @@ -333,7 +348,7 @@ class SyncHandler(BaseHandler): ) now_token = now_token.copy_and_replace("presence_key", presence_key) - now_token, typing_by_room = yield self.typing_by_room( + now_token, ephemeral_by_room = yield self.ephemeral_by_room( sync_config, now_token, since_token ) @@ -399,7 +414,7 @@ class SyncHandler(BaseHandler): limited=limited, ), state=state, - ephemeral=typing_by_room.get(room_id, []) + ephemeral=ephemeral_by_room.get(room_id, []) ) if room_sync: joined.append(room_sync) @@ -416,7 +431,7 @@ class SyncHandler(BaseHandler): for room_id in joined_room_ids: room_sync = yield self.incremental_sync_with_gap_for_room( room_id, sync_config, since_token, now_token, - typing_by_room + ephemeral_by_room ) if room_sync: joined.append(room_sync) @@ -487,7 +502,7 @@ class SyncHandler(BaseHandler): @defer.inlineCallbacks def incremental_sync_with_gap_for_room(self, room_id, sync_config, since_token, now_token, - typing_by_room): + ephemeral_by_room): """ Get the incremental delta needed to bring the client up to date for the room. Gives the client the most recent events and the changes to state. @@ -528,7 +543,7 @@ class SyncHandler(BaseHandler): room_id=room_id, timeline=batch, state=state_events_delta, - ephemeral=typing_by_room.get(room_id, []) + ephemeral=ephemeral_by_room.get(room_id, []) ) logging.debug("Room sync: %r", room_sync) From 5897e773fd50733fa448b36275130e5441d36f31 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 3 Nov 2015 14:27:35 +0000 Subject: [PATCH 10/14] Spell "deferred" more correctly --- synapse/storage/tags.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/tags.py b/synapse/storage/tags.py index 34aa38c06..641ea250f 100644 --- a/synapse/storage/tags.py +++ b/synapse/storage/tags.py @@ -156,7 +156,7 @@ class TagsStore(SQLBaseStore): def remove_tag_from_room(self, user_id, room_id, tag): """Remove a tag from a room for a user. Returns: - A deffered that completes once the tag has been removed + A deferred that completes once the tag has been removed """ def remove_tag_txn(txn, next_id): sql = ( From 06986e46a3e5187d339aa64d061742cdac0e33a8 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 3 Nov 2015 14:28:51 +0000 Subject: [PATCH 11/14] That TODO was done --- synapse/rest/client/v2_alpha/tags.py | 1 - 1 file changed, 1 deletion(-) diff --git a/synapse/rest/client/v2_alpha/tags.py b/synapse/rest/client/v2_alpha/tags.py index 4e3f917fc..dcfe6bd20 100644 --- a/synapse/rest/client/v2_alpha/tags.py +++ b/synapse/rest/client/v2_alpha/tags.py @@ -98,7 +98,6 @@ class TagServlet(RestServlet): "private_user_data_key", max_id, users=[user_id] ) - # TODO: poke the notifier. defer.returnValue((200, {})) From 7ce264ce5f1b2409081446bd8e1a4adc63675e06 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 3 Nov 2015 16:23:35 +0000 Subject: [PATCH 12/14] Fix broken cache for getting retry times. This meant we retried remote destinations way more frequently than we should --- synapse/federation/transaction_queue.py | 47 ++++++++++++------------ synapse/storage/transactions.py | 48 +++++++++---------------- 2 files changed, 40 insertions(+), 55 deletions(-) diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 32fa5e8c1..2a7dd343f 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -202,19 +202,6 @@ class TransactionQueue(object): @defer.inlineCallbacks @log_function def _attempt_new_transaction(self, destination): - if destination in self.pending_transactions: - # XXX: pending_transactions can get stuck on by a never-ending - # request at which point pending_pdus_by_dest just keeps growing. - # we need application-layer timeouts of some flavour of these - # requests - logger.debug( - "TX [%s] Transaction already in progress", - destination - ) - return - - logger.debug("TX [%s] _attempt_new_transaction", destination) - # list of (pending_pdu, deferred, order) pending_pdus = self.pending_pdus_by_dest.pop(destination, []) pending_edus = self.pending_edus_by_dest.pop(destination, []) @@ -228,20 +215,34 @@ class TransactionQueue(object): logger.debug("TX [%s] Nothing to send", destination) return - # Sort based on the order field - pending_pdus.sort(key=lambda t: t[2]) - - pdus = [x[0] for x in pending_pdus] - edus = [x[0] for x in pending_edus] - failures = [x[0].get_dict() for x in pending_failures] - deferreds = [ - x[1] - for x in pending_pdus + pending_edus + pending_failures - ] + if destination in self.pending_transactions: + # XXX: pending_transactions can get stuck on by a never-ending + # request at which point pending_pdus_by_dest just keeps growing. + # we need application-layer timeouts of some flavour of these + # requests + logger.debug( + "TX [%s] Transaction already in progress", + destination + ) + return + # NOTE: Nothing should be between the above check and the insertion below try: self.pending_transactions[destination] = 1 + logger.debug("TX [%s] _attempt_new_transaction", destination) + + # Sort based on the order field + pending_pdus.sort(key=lambda t: t[2]) + + pdus = [x[0] for x in pending_pdus] + edus = [x[0] for x in pending_edus] + failures = [x[0].get_dict() for x in pending_failures] + deferreds = [ + x[1] + for x in pending_pdus + pending_edus + pending_failures + ] + txn_id = str(self._next_txn_id) limiter = yield get_retry_limiter( diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index 15695e983..4e0d7c977 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -253,16 +253,6 @@ class TransactionStore(SQLBaseStore): retry_interval (int) - how long until next retry in ms """ - # As this is the new value, we might as well prefill the cache - self.get_destination_retry_timings.prefill( - destination, - { - "destination": destination, - "retry_last_ts": retry_last_ts, - "retry_interval": retry_interval - }, - ) - # XXX: we could chose to not bother persisting this if our cache thinks # this is a NOOP return self.runInteraction( @@ -275,31 +265,25 @@ class TransactionStore(SQLBaseStore): def _set_destination_retry_timings(self, txn, destination, retry_last_ts, retry_interval): - query = ( - "UPDATE destinations" - " SET retry_last_ts = ?, retry_interval = ?" - " WHERE destination = ?" - ) + txn.call_after(self.get_destination_retry_timings.invalidate, (destination,)) - txn.execute( - query, - ( - retry_last_ts, retry_interval, destination, - ) + self._simple_upsert_txn( + txn, + "destinations", + keyvalues={ + "destination": destination, + }, + values={ + "retry_last_ts": retry_last_ts, + "retry_interval": retry_interval, + }, + insertion_values={ + "destination": destination, + "retry_last_ts": retry_last_ts, + "retry_interval": retry_interval, + } ) - if txn.rowcount == 0: - # destination wasn't already in table. Insert it. - self._simple_insert_txn( - txn, - table="destinations", - values={ - "destination": destination, - "retry_last_ts": retry_last_ts, - "retry_interval": retry_interval, - } - ) - def get_destinations_needing_retry(self): """Get all destinations which are due a retry for sending a transaction. From 97d792b28fd2b0a51ed5b491e3ca6968733df383 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 3 Nov 2015 16:31:08 +0000 Subject: [PATCH 13/14] Don't rearrange transaction_queue --- synapse/federation/transaction_queue.py | 23 +++++++++++------------ 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 2a7dd343f..aac6f1c16 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -203,6 +203,17 @@ class TransactionQueue(object): @log_function def _attempt_new_transaction(self, destination): # list of (pending_pdu, deferred, order) + if destination in self.pending_transactions: + # XXX: pending_transactions can get stuck on by a never-ending + # request at which point pending_pdus_by_dest just keeps growing. + # we need application-layer timeouts of some flavour of these + # requests + logger.debug( + "TX [%s] Transaction already in progress", + destination + ) + return + pending_pdus = self.pending_pdus_by_dest.pop(destination, []) pending_edus = self.pending_edus_by_dest.pop(destination, []) pending_failures = self.pending_failures_by_dest.pop(destination, []) @@ -215,18 +226,6 @@ class TransactionQueue(object): logger.debug("TX [%s] Nothing to send", destination) return - if destination in self.pending_transactions: - # XXX: pending_transactions can get stuck on by a never-ending - # request at which point pending_pdus_by_dest just keeps growing. - # we need application-layer timeouts of some flavour of these - # requests - logger.debug( - "TX [%s] Transaction already in progress", - destination - ) - return - - # NOTE: Nothing should be between the above check and the insertion below try: self.pending_transactions[destination] = 1 From c452dabc3d295998ed70dfa977866568dce9fa79 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 4 Nov 2015 14:08:15 +0000 Subject: [PATCH 14/14] Remove the LockManager class because it wasn't being used --- synapse/handlers/federation.py | 2 - synapse/server.py | 5 -- synapse/util/lockutils.py | 74 ---------------------- tests/util/test_lock.py | 108 --------------------------------- 4 files changed, 189 deletions(-) delete mode 100644 synapse/util/lockutils.py delete mode 100644 tests/util/test_lock.py diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index ae9d22758..b2395b28d 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -72,8 +72,6 @@ class FederationHandler(BaseHandler): self.server_name = hs.hostname self.keyring = hs.get_keyring() - self.lock_manager = hs.get_room_lock_manager() - self.replication_layer.set_handler(self) # When joining a room we need to queue any events for that room up diff --git a/synapse/server.py b/synapse/server.py index 8424798b1..f75d5358b 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -29,7 +29,6 @@ from synapse.state import StateHandler from synapse.storage import DataStore from synapse.util import Clock from synapse.util.distributor import Distributor -from synapse.util.lockutils import LockManager from synapse.streams.events import EventSources from synapse.api.ratelimiting import Ratelimiter from synapse.crypto.keyring import Keyring @@ -70,7 +69,6 @@ class BaseHomeServer(object): 'auth', 'rest_servlet_factory', 'state_handler', - 'room_lock_manager', 'notifier', 'distributor', 'resource_for_client', @@ -201,9 +199,6 @@ class HomeServer(BaseHomeServer): def build_state_handler(self): return StateHandler(self) - def build_room_lock_manager(self): - return LockManager() - def build_distributor(self): return Distributor() diff --git a/synapse/util/lockutils.py b/synapse/util/lockutils.py deleted file mode 100644 index 33edc5c20..000000000 --- a/synapse/util/lockutils.py +++ /dev/null @@ -1,74 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2014, 2015 OpenMarket Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -from twisted.internet import defer - -import logging - - -logger = logging.getLogger(__name__) - - -class Lock(object): - - def __init__(self, deferred, key): - self._deferred = deferred - self.released = False - self.key = key - - def release(self): - self.released = True - self._deferred.callback(None) - - def __del__(self): - if not self.released: - logger.critical("Lock was destructed but never released!") - self.release() - - def __enter__(self): - return self - - def __exit__(self, type, value, traceback): - logger.debug("Releasing lock for key=%r", self.key) - self.release() - - -class LockManager(object): - """ Utility class that allows us to lock based on a `key` """ - - def __init__(self): - self._lock_deferreds = {} - - @defer.inlineCallbacks - def lock(self, key): - """ Allows us to block until it is our turn. - Args: - key (str) - Returns: - Lock - """ - new_deferred = defer.Deferred() - old_deferred = self._lock_deferreds.get(key) - self._lock_deferreds[key] = new_deferred - - if old_deferred: - logger.debug("Queueing on lock for key=%r", key) - yield old_deferred - logger.debug("Obtained lock for key=%r", key) - else: - logger.debug("Entering uncontended lock for key=%r", key) - - defer.returnValue(Lock(new_deferred, key)) diff --git a/tests/util/test_lock.py b/tests/util/test_lock.py deleted file mode 100644 index 6a1e521b1..000000000 --- a/tests/util/test_lock.py +++ /dev/null @@ -1,108 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2014 OpenMarket Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -from twisted.internet import defer -from tests import unittest - -from synapse.util.lockutils import LockManager - - -class LockManagerTestCase(unittest.TestCase): - - def setUp(self): - self.lock_manager = LockManager() - - @defer.inlineCallbacks - def test_one_lock(self): - key = "test" - deferred_lock1 = self.lock_manager.lock(key) - - self.assertTrue(deferred_lock1.called) - - lock1 = yield deferred_lock1 - - self.assertFalse(lock1.released) - - lock1.release() - - self.assertTrue(lock1.released) - - @defer.inlineCallbacks - def test_concurrent_locks(self): - key = "test" - deferred_lock1 = self.lock_manager.lock(key) - deferred_lock2 = self.lock_manager.lock(key) - - self.assertTrue(deferred_lock1.called) - self.assertFalse(deferred_lock2.called) - - lock1 = yield deferred_lock1 - - self.assertFalse(lock1.released) - self.assertFalse(deferred_lock2.called) - - lock1.release() - - self.assertTrue(lock1.released) - self.assertTrue(deferred_lock2.called) - - lock2 = yield deferred_lock2 - - lock2.release() - - @defer.inlineCallbacks - def test_sequential_locks(self): - key = "test" - deferred_lock1 = self.lock_manager.lock(key) - - self.assertTrue(deferred_lock1.called) - - lock1 = yield deferred_lock1 - - self.assertFalse(lock1.released) - - lock1.release() - - self.assertTrue(lock1.released) - - deferred_lock2 = self.lock_manager.lock(key) - - self.assertTrue(deferred_lock2.called) - - lock2 = yield deferred_lock2 - - self.assertFalse(lock2.released) - - lock2.release() - - self.assertTrue(lock2.released) - - @defer.inlineCallbacks - def test_with_statement(self): - key = "test" - with (yield self.lock_manager.lock(key)) as lock: - self.assertFalse(lock.released) - - self.assertTrue(lock.released) - - @defer.inlineCallbacks - def test_two_with_statement(self): - key = "test" - with (yield self.lock_manager.lock(key)): - pass - - with (yield self.lock_manager.lock(key)): - pass