From 194238224614c0d01e0bbf9186398e15dafdf4ff Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 1 May 2015 16:11:55 +0100 Subject: [PATCH 01/25] Don't log enqueue_ --- synapse/federation/transaction_queue.py | 1 - 1 file changed, 1 deletion(-) diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 4dccd93d0..ca04822fb 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -104,7 +104,6 @@ class TransactionQueue(object): return not destination.startswith("localhost") @defer.inlineCallbacks - @log_function def enqueue_pdu(self, pdu, destinations, order): # We loop through all destinations to see whether we already have # a transaction in progress. If we do, stick it in the pending_pdus From b8940cd9022cc76c1699f6bdccd5d23faae7945b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 1 May 2015 16:14:06 +0100 Subject: [PATCH 02/25] Remove some unused indexes --- .../storage/schema/delta/17/drop_indexes.sql | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) create mode 100644 synapse/storage/schema/delta/17/drop_indexes.sql diff --git a/synapse/storage/schema/delta/17/drop_indexes.sql b/synapse/storage/schema/delta/17/drop_indexes.sql new file mode 100644 index 000000000..8eb3325a6 --- /dev/null +++ b/synapse/storage/schema/delta/17/drop_indexes.sql @@ -0,0 +1,18 @@ +/* Copyright 2015 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +DROP INDEX IF EXISTS sent_transaction_dest; +DROP INDEX IF EXISTS sent_transaction_sent; +DROP INDEX IF EXISTS user_ips_user; From d9cc5de9e580c8a0de92352ec50fa62fb32b0b95 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 5 May 2015 10:24:10 +0100 Subject: [PATCH 03/25] Correctly name transaction --- synapse/storage/event_federation.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index fbbcce754..68f39bd68 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -104,7 +104,7 @@ class EventFederationStore(SQLBaseStore): "room_id": room_id, }, retcol="event_id", - desc="get_latest_events_in_room", + desc="get_latest_event_ids_in_room", ) def _get_latest_events_in_room(self, txn, room_id): From 261d809a4779b03c81ada52ed3893b2ad8782a96 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 5 May 2015 14:08:03 +0100 Subject: [PATCH 04/25] Sequence the modifications to the cache so that selects don't race with inserts --- synapse/storage/_base.py | 26 +++++++++++++++++++++++--- 1 file changed, 23 insertions(+), 3 deletions(-) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index c328b5274..7f5477dee 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -31,6 +31,7 @@ import functools import simplejson as json import sys import time +import threading logger = logging.getLogger(__name__) @@ -68,9 +69,20 @@ class Cache(object): self.name = name self.keylen = keylen - + self.sequence = 0 + self.thread = None caches_by_name[name] = self.cache + def check_thread(self): + expected_thread = self.thread + if expected_thread is None: + self.thread = threading.current_thread() + else: + if expected_thread is not threading.current_thread(): + raise ValueError( + "Cache objects can only be accessed from the main thread" + ) + def get(self, *keyargs): if len(keyargs) != self.keylen: raise ValueError("Expected a key to have %d items", self.keylen) @@ -82,6 +94,11 @@ class Cache(object): cache_counter.inc_misses(self.name) raise KeyError() + def update(self, sequence, *args): + self.check_thread() + if self.sequence == sequence: + self.prefill(*args) + def prefill(self, *args): # because I can't *keyargs, value keyargs = args[:-1] value = args[-1] @@ -96,9 +113,10 @@ class Cache(object): self.cache[keyargs] = value def invalidate(self, *keyargs): + self.check_thread() if len(keyargs) != self.keylen: raise ValueError("Expected a key to have %d items", self.keylen) - + self.sequence += 1 self.cache.pop(keyargs, None) @@ -130,9 +148,11 @@ def cached(max_entries=1000, num_args=1, lru=False): try: defer.returnValue(cache.get(*keyargs)) except KeyError: + sequence = cache.sequence + ret = yield orig(self, *keyargs) - cache.prefill(*keyargs + (ret,)) + cache.update(sequence, *keyargs + (ret,)) defer.returnValue(ret) From a9aea68fd568182185e8d0ae478c56df8ac6be49 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 5 May 2015 14:57:08 +0100 Subject: [PATCH 05/25] Invalidate the caches from the correct thread --- synapse/storage/event_federation.py | 10 +++++--- synapse/storage/events.py | 39 +++++++++++++++++++---------- synapse/storage/room.py | 4 +-- synapse/storage/roommember.py | 8 +++--- synapse/storage/signatures.py | 12 ++++----- synapse/storage/state.py | 2 +- 6 files changed, 46 insertions(+), 29 deletions(-) diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 68f39bd68..3cd3fbdc9 100644 --- 
a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -241,7 +241,7 @@ class EventFederationStore(SQLBaseStore): return int(min_depth) if min_depth is not None else None - def _update_min_depth_for_room_txn(self, txn, room_id, depth): + def _update_min_depth_for_room_txn(self, txn, invalidates, room_id, depth): min_depth = self._get_min_depth_interaction(txn, room_id) do_insert = depth < min_depth if min_depth else True @@ -256,8 +256,8 @@ class EventFederationStore(SQLBaseStore): }, ) - def _handle_prev_events(self, txn, outlier, event_id, prev_events, - room_id): + def _handle_prev_events(self, txn, invalidates, outlier, event_id, + prev_events, room_id): """ For the given event, update the event edges table and forward and backward extremities tables. @@ -330,7 +330,9 @@ class EventFederationStore(SQLBaseStore): ) txn.execute(query) - self.get_latest_event_ids_in_room.invalidate(room_id) + invalidates.append(( + self.get_latest_event_ids_in_room.invalidate, room_id + )) def get_backfill_events(self, room_id, event_list, limit): """Get a list of Events for a given topic that occurred before (and diff --git a/synapse/storage/events.py b/synapse/storage/events.py index a3c260ddc..b2ab4b02f 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -42,7 +42,7 @@ class EventsStore(SQLBaseStore): stream_ordering = self.min_token try: - yield self.runInteraction( + invalidates = yield self.runInteraction( "persist_event", self._persist_event_txn, event=event, @@ -52,6 +52,11 @@ class EventsStore(SQLBaseStore): is_new_state=is_new_state, current_state=current_state, ) + for invalidated in invalidates: + invalidated_callback = invalidated[0] + invalidated_args = invalidated[1:] + invalidated_callback(*invalidated_args) + except _RollbackButIsFineException: pass @@ -91,9 +96,10 @@ class EventsStore(SQLBaseStore): def _persist_event_txn(self, txn, event, context, backfilled, stream_ordering=None, is_new_state=True, current_state=None): + invalidates = [] # Remove the any existing cache entries for the event_id - self._invalidate_get_event_cache(event.event_id) + invalidates.append((self._invalidate_get_event_cache, event.event_id)) if stream_ordering is None: with self._stream_id_gen.get_next_txn(txn) as stream_ordering: @@ -150,10 +156,11 @@ class EventsStore(SQLBaseStore): outlier = event.internal_metadata.is_outlier() if not outlier: - self._store_state_groups_txn(txn, event, context) + self._store_state_groups_txn(txn, invalidates, event, context) self._update_min_depth_for_room_txn( txn, + invalidates, event.room_id, event.depth ) @@ -199,6 +206,7 @@ class EventsStore(SQLBaseStore): self._handle_prev_events( txn, + invalidates, outlier=outlier, event_id=event.event_id, prev_events=event.prev_events, @@ -206,13 +214,13 @@ class EventsStore(SQLBaseStore): ) if event.type == EventTypes.Member: - self._store_room_member_txn(txn, event) + self._store_room_member_txn(txn, invalidates, event) elif event.type == EventTypes.Name: - self._store_room_name_txn(txn, event) + self._store_room_name_txn(txn, invalidates, event) elif event.type == EventTypes.Topic: - self._store_room_topic_txn(txn, event) + self._store_room_topic_txn(txn, invalidates, event) elif event.type == EventTypes.Redaction: - self._store_redaction(txn, event) + self._store_redaction(txn, invalidates, event) event_dict = { k: v @@ -281,19 +289,22 @@ class EventsStore(SQLBaseStore): ) if context.rejected: - self._store_rejections_txn(txn, event.event_id, context.rejected) + 
self._store_rejections_txn( + txn, invalidates, event.event_id, context.rejected + ) for hash_alg, hash_base64 in event.hashes.items(): hash_bytes = decode_base64(hash_base64) self._store_event_content_hash_txn( - txn, event.event_id, hash_alg, hash_bytes, + txn, invalidates, event.event_id, hash_alg, hash_bytes, ) for prev_event_id, prev_hashes in event.prev_events: for alg, hash_base64 in prev_hashes.items(): hash_bytes = decode_base64(hash_base64) self._store_prev_event_hash_txn( - txn, event.event_id, prev_event_id, alg, hash_bytes + txn, invalidates, event.event_id, prev_event_id, alg, + hash_bytes ) for auth_id, _ in event.auth_events: @@ -309,7 +320,7 @@ class EventsStore(SQLBaseStore): (ref_alg, ref_hash_bytes) = compute_event_reference_hash(event) self._store_event_reference_hash_txn( - txn, event.event_id, ref_alg, ref_hash_bytes + txn, invalidates, event.event_id, ref_alg, ref_hash_bytes ) if event.is_state(): @@ -356,9 +367,11 @@ class EventsStore(SQLBaseStore): } ) - def _store_redaction(self, txn, event): + return invalidates + + def _store_redaction(self, txn, invalidates, event): # invalidate the cache for the redacted event - self._invalidate_get_event_cache(event.redacts) + invalidates.append((self._invalidate_get_event_cache, event.redacts)) txn.execute( "INSERT INTO redactions (event_id, redacts) VALUES (?,?)", (event.event_id, event.redacts) diff --git a/synapse/storage/room.py b/synapse/storage/room.py index f95637763..d42d7ff0e 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -162,7 +162,7 @@ class RoomStore(SQLBaseStore): defer.returnValue(ret) - def _store_room_topic_txn(self, txn, event): + def _store_room_topic_txn(self, txn, invalidates, event): if hasattr(event, "content") and "topic" in event.content: self._simple_insert_txn( txn, @@ -174,7 +174,7 @@ class RoomStore(SQLBaseStore): }, ) - def _store_room_name_txn(self, txn, event): + def _store_room_name_txn(self, txn, invalidates, event): if hasattr(event, "content") and "name" in event.content: self._simple_insert_txn( txn, diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 09fb77a19..117da817b 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -35,7 +35,7 @@ RoomsForUser = namedtuple( class RoomMemberStore(SQLBaseStore): - def _store_room_member_txn(self, txn, event): + def _store_room_member_txn(self, txn, invalidates, event): """Store a room member in the database. """ try: @@ -64,8 +64,10 @@ class RoomMemberStore(SQLBaseStore): } ) - self.get_rooms_for_user.invalidate(target_user_id) - self.get_joined_hosts_for_room.invalidate(event.room_id) + invalidates.extend([ + (self.get_rooms_for_user.invalidate, target_user_id), + (self.get_joined_hosts_for_room.invalidate, event.room_id), + ]) def get_room_member(self, user_id, room_id): """Retrieve the current state of a room member. 
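A note on the pattern above: patch 05 threads an `invalidates` list through
every `_txn` helper so that cache invalidations recorded while a transaction
is in flight are replayed on the main thread only once the transaction has
finished (a later patch in this series moves the list onto the transaction
object itself as `call_after`). A minimal runnable sketch of the idea, using
sqlite3 and a plain dict as a stand-in cache; the names here are
illustrative, not Synapse's actual API:

    import sqlite3

    cache = {"$event1": "stale"}

    def persist_txn(txn, invalidates, event_id):
        txn.execute("INSERT INTO events (event_id) VALUES (?)", (event_id,))
        # Don't touch the cache here: this runs on the DB thread and the
        # transaction may still roll back.  Record the invalidation instead.
        invalidates.append((cache.pop, event_id))

    def persist(conn, event_id):
        invalidates = []
        with conn:  # transaction scope: commit on success
            persist_txn(conn.cursor(), invalidates, event_id)
        # Only after the commit succeeded, on the calling thread:
        for callback, arg in invalidates:
            callback(arg, None)

    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE events (event_id TEXT)")
    persist(conn, "$event1")
    assert "$event1" not in cache
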
diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py index f05182863..e3979846e 100644 --- a/synapse/storage/signatures.py +++ b/synapse/storage/signatures.py @@ -39,8 +39,8 @@ class SignatureStore(SQLBaseStore): txn.execute(query, (event_id, )) return dict(txn.fetchall()) - def _store_event_content_hash_txn(self, txn, event_id, algorithm, - hash_bytes): + def _store_event_content_hash_txn(self, txn, invalidates, event_id, + algorithm, hash_bytes): """Store a hash for a Event Args: txn (cursor): @@ -101,8 +101,8 @@ class SignatureStore(SQLBaseStore): txn.execute(query, (event_id, )) return {k: v for k, v in txn.fetchall()} - def _store_event_reference_hash_txn(self, txn, event_id, algorithm, - hash_bytes): + def _store_event_reference_hash_txn(self, txn, invalidates, event_id, + algorithm, hash_bytes): """Store a hash for a PDU Args: txn (cursor): @@ -184,8 +184,8 @@ class SignatureStore(SQLBaseStore): hashes[algorithm] = hash_bytes return results - def _store_prev_event_hash_txn(self, txn, event_id, prev_event_id, - algorithm, hash_bytes): + def _store_prev_event_hash_txn(self, txn, invalidates, event_id, + prev_event_id, algorithm, hash_bytes): self._simple_insert_txn( txn, "event_edge_hashes", diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 7e55e8bed..35d11c27c 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -82,7 +82,7 @@ class StateStore(SQLBaseStore): f, ) - def _store_state_groups_txn(self, txn, event, context): + def _store_state_groups_txn(self, txn, invalidates, event, context): if context.current_state is None: return From 1692dc019d803287047b16beda92fec4f1934622 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 5 May 2015 15:00:30 +0100 Subject: [PATCH 06/25] Don't call 'encode_parameter' no-op --- synapse/storage/_base.py | 4 ---- synapse/storage/engines/postgres.py | 3 --- synapse/storage/engines/sqlite3.py | 3 --- 3 files changed, 10 deletions(-) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index c328b5274..e01c61d08 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -167,10 +167,6 @@ class LoggingTransaction(object): sql = self.database_engine.convert_param_style(sql) if args and args[0]: - args = list(args) - args[0] = [ - self.database_engine.encode_parameter(a) for a in args[0] - ] try: sql_logger.debug( "[SQL values] {%s} " + ", ".join(("<%r>",) * len(args[0])), diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py index 64e34265f..a32302854 100644 --- a/synapse/storage/engines/postgres.py +++ b/synapse/storage/engines/postgres.py @@ -36,9 +36,6 @@ class PostgresEngine(object): def convert_param_style(self, sql): return sql.replace("?", "%s") - def encode_parameter(self, param): - return param - def on_new_connection(self, db_conn): db_conn.set_isolation_level( self.module.extensions.ISOLATION_LEVEL_REPEATABLE_READ diff --git a/synapse/storage/engines/sqlite3.py b/synapse/storage/engines/sqlite3.py index 7b49157cb..ff13d8006 100644 --- a/synapse/storage/engines/sqlite3.py +++ b/synapse/storage/engines/sqlite3.py @@ -26,9 +26,6 @@ class Sqlite3Engine(object): def convert_param_style(self, sql): return sql - def encode_parameter(self, param): - return param - def on_new_connection(self, db_conn): self.prepare_database(db_conn) From 43c2e8deae5f7e2b339ab5c131391231886cad09 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 5 May 2015 15:13:25 +0100 Subject: [PATCH 07/25] Add support for using executemany --- 
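Where a Python-level loop of execute() calls becomes a single executemany(),
the driver runs one prepared INSERT over the whole batch of parameter tuples.
A runnable sketch of the batched insert this patch introduces as
_simple_insert_many_txn, against sqlite3 with made-up rows; the
zip(*[zip(sorted(...))]) step is the same trick the patch uses to split a
list of row dicts into one tuple of column names plus one value tuple per
row:

    import sqlite3

    def simple_insert_many_txn(txn, table, values):
        if not values:
            return
        # Sort each dict's items so every row yields its columns in the same
        # order, then split into key tuples and per-row value tuples.
        keys, vals = zip(*[zip(*sorted(row.items())) for row in values])
        if any(k != keys[0] for k in keys):
            raise RuntimeError("All rows must have the same keys")
        sql = "INSERT INTO %s (%s) VALUES (%s)" % (
            table,
            ", ".join(keys[0]),
            ", ".join("?" for _ in keys[0]),
        )
        txn.executemany(sql, vals)

    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE event_edges (event_id TEXT, prev_event_id TEXT)")
    simple_insert_many_txn(conn.cursor(), "event_edges", [
        {"event_id": "$B", "prev_event_id": "$A"},
        {"event_id": "$C", "prev_event_id": "$B"},
    ])
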
synapse/storage/_base.py | 54 ++++++++++++++++++++++------- synapse/storage/event_federation.py | 40 +++++++++++---------- synapse/storage/events.py | 46 +++++++++++++----------- synapse/storage/state.py | 16 +++++---- tests/storage/test_base.py | 4 +-- 5 files changed, 99 insertions(+), 61 deletions(-) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index e01c61d08..b7c3cf03c 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -160,18 +160,23 @@ class LoggingTransaction(object): def __setattr__(self, name, value): setattr(self.txn, name, value) - def execute(self, sql, *args, **kwargs): + def execute(self, sql, *args): + self._do_execute(self.txn.execute, sql, *args) + + def executemany(self, sql, *args): + self._do_execute(self.txn.executemany, sql, *args) + + def _do_execute(self, func, sql, *args): # TODO(paul): Maybe use 'info' and 'debug' for values? sql_logger.debug("[SQL] {%s} %s", self.name, sql) sql = self.database_engine.convert_param_style(sql) - if args and args[0]: + if args: try: sql_logger.debug( - "[SQL values] {%s} " + ", ".join(("<%r>",) * len(args[0])), - self.name, - *args[0] + "[SQL values] {%s} %r", + self.name, args[0] ) except: # Don't let logging failures stop SQL from working @@ -180,8 +185,8 @@ class LoggingTransaction(object): start = time.time() * 1000 try: - return self.txn.execute( - sql, *args, **kwargs + return func( + sql, *args ) except Exception as e: logger.debug("[SQL FAIL] {%s} %s", self.name, e) @@ -434,18 +439,41 @@ class SQLBaseStore(object): @log_function def _simple_insert_txn(self, txn, table, values): + keys, vals = zip(*values.items()) + sql = "INSERT INTO %s (%s) VALUES(%s)" % ( table, - ", ".join(k for k in values), - ", ".join("?" for k in values) + ", ".join(k for k in keys), + ", ".join("?" for _ in keys) ) - logger.debug( - "[SQL] %s Args=%s", - sql, values.values(), + txn.execute(sql, vals) + + def _simple_insert_many_txn(self, txn, table, values): + if not values: + return + + keys, vals = zip(*[ + zip( + *(sorted(i.items(), key=lambda kv: kv[0])) + ) + for i in values + if i + ]) + + for k in keys: + if k != keys[0]: + raise RuntimeError( + "All items must have the same keys" + ) + + sql = "INSERT INTO %s (%s) VALUES(%s)" % ( + table, + ", ".join(k for k in keys[0]), + ", ".join("?" for _ in keys[0]) ) - txn.execute(sql, values.values()) + txn.executemany(sql, vals) def _simple_upsert(self, table, keyvalues, values, insertion_values={}, desc="_simple_upsert", lock=True): diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 68f39bd68..0aca4ba17 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -262,18 +262,19 @@ class EventFederationStore(SQLBaseStore): For the given event, update the event edges table and forward and backward extremities tables. """ - for e_id, _ in prev_events: - # TODO (erikj): This could be done as a bulk insert - self._simple_insert_txn( - txn, + self._simple_insert_many_txn( + txn, table="event_edges", - values={ - "event_id": event_id, - "prev_event_id": e_id, - "room_id": room_id, - "is_state": False, - }, - ) + values=[ + { + "event_id": event_id, + "prev_event_id": e_id, + "room_id": room_id, + "is_state": False, + } + for e_id, _ in prev_events + ], + ) # Update the extremities table if this is not an outlier. 
if not outlier: @@ -307,16 +308,17 @@ class EventFederationStore(SQLBaseStore): # Insert all the prev_events as a backwards thing, they'll get # deleted in a second if they're incorrect anyway. - for e_id, _ in prev_events: - # TODO (erikj): This could be done as a bulk insert - self._simple_insert_txn( - txn, - table="event_backward_extremities", - values={ + self._simple_insert_many_txn( + txn, + table="event_backward_extremities", + values=[ + { "event_id": e_id, "room_id": room_id, - }, - ) + } + for e_id, _ in prev_events + ], + ) # Also delete from the backwards extremities table all ones that # reference events that we have already seen diff --git a/synapse/storage/events.py b/synapse/storage/events.py index a3c260ddc..84e446a99 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -113,17 +113,19 @@ class EventsStore(SQLBaseStore): keyvalues={"room_id": event.room_id}, ) - for s in current_state: - self._simple_insert_txn( - txn, - "current_state_events", + self._simple_insert_many_txn( + txn, + "current_state_events", + [ { "event_id": s.event_id, "room_id": s.room_id, "type": s.type, "state_key": s.state_key, - }, - ) + } + for s in current_state + ], + ) if event.is_state() and is_new_state: if not backfilled and not context.rejected: @@ -296,16 +298,18 @@ class EventsStore(SQLBaseStore): txn, event.event_id, prev_event_id, alg, hash_bytes ) - for auth_id, _ in event.auth_events: - self._simple_insert_txn( - txn, - table="event_auth", - values={ + self._simple_insert_many_txn( + txn, + table="event_auth", + values=[ + { "event_id": event.event_id, "room_id": event.room_id, "auth_id": auth_id, - }, - ) + } + for auth_id, _ in event.auth_events + ], + ) (ref_alg, ref_hash_bytes) = compute_event_reference_hash(event) self._store_event_reference_hash_txn( @@ -330,17 +334,19 @@ class EventsStore(SQLBaseStore): vals, ) - for e_id, h in event.prev_state: - self._simple_insert_txn( - txn, - table="event_edges", - values={ + self._simple_insert_many_txn( + txn, + table="event_edges", + values=[ + { "event_id": event.event_id, "prev_event_id": e_id, "room_id": event.room_id, "is_state": True, - }, - ) + } + for e_id, h in event.prev_state + ], + ) if is_new_state and not context.rejected: self._simple_upsert_txn( diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 7e55e8bed..dbc0e49c1 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -104,18 +104,20 @@ class StateStore(SQLBaseStore): }, ) - for state in state_events.values(): - self._simple_insert_txn( - txn, - table="state_groups_state", - values={ + self._simple_insert_many_txn( + txn, + table="state_groups_state", + values=[ + { "state_group": state_group, "room_id": state.room_id, "type": state.type, "state_key": state.state_key, "event_id": state.event_id, - }, - ) + } + for state in state_events.values() + ], + ) self._simple_insert_txn( txn, diff --git a/tests/storage/test_base.py b/tests/storage/test_base.py index a64d2b821..8c348ecc9 100644 --- a/tests/storage/test_base.py +++ b/tests/storage/test_base.py @@ -67,7 +67,7 @@ class SQLBaseStoreTestCase(unittest.TestCase): self.mock_txn.execute.assert_called_with( "INSERT INTO tablename (columname) VALUES(?)", - ["Value"] + ("Value",) ) @defer.inlineCallbacks @@ -82,7 +82,7 @@ class SQLBaseStoreTestCase(unittest.TestCase): self.mock_txn.execute.assert_called_with( "INSERT INTO tablename (colA, colB, colC) VALUES(?, ?, ?)", - [1, 2, 3] + (1, 2, 3,) ) @defer.inlineCallbacks From bdcd7693c8b954c9a7895339d4727c17221d4d9d Mon 
Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 5 May 2015 15:14:48 +0100 Subject: [PATCH 08/25] Fix indentation --- synapse/storage/event_federation.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 0aca4ba17..36b1feac6 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -264,16 +264,16 @@ class EventFederationStore(SQLBaseStore): """ self._simple_insert_many_txn( txn, - table="event_edges", - values=[ - { - "event_id": event_id, - "prev_event_id": e_id, - "room_id": room_id, - "is_state": False, - } - for e_id, _ in prev_events - ], + table="event_edges", + values=[ + { + "event_id": event_id, + "prev_event_id": e_id, + "room_id": room_id, + "is_state": False, + } + for e_id, _ in prev_events + ], ) # Update the extremities table if this is not an outlier. From d0fece8d3c4e9db3652785e41176e2a4241eebe1 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 5 May 2015 15:39:09 +0100 Subject: [PATCH 09/25] Missing return for when the event was already persisted --- synapse/storage/events.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index b2ab4b02f..16359e876 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -202,7 +202,7 @@ class EventsStore(SQLBaseStore): sql, (False, event.event_id,) ) - return + return invalidates self._handle_prev_events( txn, From bfa4a7f8b023d91f93d4a5f0e8bd592400a2e166 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 5 May 2015 15:43:49 +0100 Subject: [PATCH 10/25] Invalidate the room_member cache if the current state events updates --- synapse/storage/events.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 16359e876..7dc49ceed 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -120,6 +120,11 @@ class EventsStore(SQLBaseStore): ) for s in current_state: + if s.type == EventTypes.Member: + invalidates.extend([ + (self.get_rooms_for_user.invalidate, s.state_key), + (self.get_joined_hosts_for_room.invalidate, s.room_id), + ]) self._simple_insert_txn( txn, "current_state_events", From 531d7955fd6265bc7e0a6424ec68cdc19ccef8da Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 5 May 2015 16:12:28 +0100 Subject: [PATCH 11/25] Don't insert without deduplication. 
In this case we never actually use this table, so simply remove the insert entirely --- synapse/storage/events.py | 22 ---------------------- 1 file changed, 22 deletions(-) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 84e446a99..34bd49cfe 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -127,28 +127,6 @@ class EventsStore(SQLBaseStore): ], ) - if event.is_state() and is_new_state: - if not backfilled and not context.rejected: - self._simple_insert_txn( - txn, - table="state_forward_extremities", - values={ - "event_id": event.event_id, - "room_id": event.room_id, - "type": event.type, - "state_key": event.state_key, - }, - ) - - for prev_state_id, _ in event.prev_state: - self._simple_delete_txn( - txn, - table="state_forward_extremities", - keyvalues={ - "event_id": prev_state_id, - } - ) - outlier = event.internal_metadata.is_outlier() if not outlier: From 63075118a528d1abf0b146a961ec5c571bf058b2 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 5 May 2015 16:24:04 +0100 Subject: [PATCH 12/25] Add debug flag in synapse/storage/_base.py for debugging the cache logic by comparing what is in the cache with what was in the database on every access --- synapse/storage/_base.py | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 7f5477dee..840a4994b 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -33,6 +33,7 @@ import sys import time import threading +DEBUG_CACHES = False logger = logging.getLogger(__name__) @@ -146,7 +147,17 @@ def cached(max_entries=1000, num_args=1, lru=False): @defer.inlineCallbacks def wrapped(self, *keyargs): try: - defer.returnValue(cache.get(*keyargs)) + cached_result = cache.get(*keyargs) + if DEBUG_CACHES: + actual_result = yield orig(self, *keyargs) + if actual_result != cached_result: + logger.error( + "Stale cache entry %s%r: cached: %r, actual %r", + orig.__name__, keyargs, + cached_result, actual_result, + ) + raise ValueError("Stale cache entry") + defer.returnValue(cached_result) except KeyError: sequence = cache.sequence From 041b6cba612f5640fe490859a54f0ef140e29d33 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 5 May 2015 16:32:44 +0100 Subject: [PATCH 13/25] SYN-369: Add comments to the sequence number logic in the cache --- synapse/storage/_base.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 840a4994b..579ed5637 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -98,6 +98,8 @@ class Cache(object): def update(self, sequence, *args): self.check_thread() if self.sequence == sequence: + # Only update the cache if the caches sequence number matches the + # number that the cache had before the SELECT was started (SYN-369) self.prefill(*args) def prefill(self, *args): # because I can't *keyargs, value @@ -117,6 +119,8 @@ class Cache(object): self.check_thread() if len(keyargs) != self.keylen: raise ValueError("Expected a key to have %d items", self.keylen) + # Increment the sequence number so that any SELECT statements that + # raced with the INSERT don't update the cache (SYN-369) self.sequence += 1 self.cache.pop(keyargs, None) @@ -159,6 +163,9 @@ def cached(max_entries=1000, num_args=1, lru=False): raise ValueError("Stale cache entry") defer.returnValue(cached_result) except KeyError: + # Get the sequence number of the cache before reading from the + # database so that we can tell if the cache is 
invalidated + # while the SELECT is executing (SYN-369) sequence = cache.sequence ret = yield orig(self, *keyargs) From 995154239358af589146ab4697e7cb4f100e2d84 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 5 May 2015 17:06:55 +0100 Subject: [PATCH 14/25] Add a comment about the zip(*[zip(sorted(...),...)]) --- synapse/storage/_base.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index b7c3cf03c..94946587f 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -453,6 +453,14 @@ class SQLBaseStore(object): if not values: return + # This is a *slight* abomination to get a list of tuples of key names + # and a list of tuples of value names. + # + # i.e. [{"a": 1, "b": 2}, {"c": 3, "d": 4}] + # => [("a", "b",), ("c", "d",)] and [(1, 2,), (3, 4,)] + # + # The sort is to ensure that we don't rely on dictionary iteration + # order. keys, vals = zip(*[ zip( *(sorted(i.items(), key=lambda kv: kv[0])) From d18f37e026a02b4e899bc96e600850007a613189 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 5 May 2015 17:32:21 +0100 Subject: [PATCH 15/25] Collect the invalidate callbacks on the transaction object rather than passing around a separate list --- synapse/storage/_base.py | 18 ++++++++--- synapse/storage/event_federation.py | 10 +++--- synapse/storage/events.py | 48 +++++++++++++---------------- synapse/storage/room.py | 4 +-- synapse/storage/roommember.py | 8 ++--- synapse/storage/signatures.py | 12 ++++---- synapse/storage/state.py | 2 +- 7 files changed, 51 insertions(+), 51 deletions(-) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 579ed5637..ccf9697fa 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -185,12 +185,16 @@ class LoggingTransaction(object): """An object that almost-transparently proxies for the 'txn' object passed to the constructor. Adds logging and metrics to the .execute() method.""" - __slots__ = ["txn", "name", "database_engine"] + __slots__ = ["txn", "name", "database_engine", "after_callbacks"] - def __init__(self, txn, name, database_engine): + def __init__(self, txn, name, database_engine, after_callbacks): object.__setattr__(self, "txn", txn) object.__setattr__(self, "name", name) object.__setattr__(self, "database_engine", database_engine) + object.__setattr__(self, "after_callbacks", after_callbacks) + + def call_after(self, callback, *args): + self.after_callbacks.append((callback, args)) def __getattr__(self, name): return getattr(self.txn, name) @@ -336,6 +340,8 @@ class SQLBaseStore(object): start_time = time.time() * 1000 + after_callbacks = [] + def inner_func(conn, *args, **kwargs): with LoggingContext("runInteraction") as context: if self.database_engine.is_connection_closed(conn): @@ -360,10 +366,10 @@ class SQLBaseStore(object): while True: try: txn = conn.cursor() - return func( - LoggingTransaction(txn, name, self.database_engine), - *args, **kwargs + txn = LoggingTransaction( + txn, name, self.database_engine, after_callbacks ) + return func(txn, *args, **kwargs) except self.database_engine.module.OperationalError as e: # This can happen if the database disappears mid # transaction. 
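The hunk below fires the collected after_callbacks once runWithConnection has
returned, i.e. back on the calling thread with the transaction complete.
Reduced to a runnable sketch, using sqlite3 and illustrative names; the real
LoggingTransaction proxies a full DB-API cursor, and the real runner goes
through Twisted's connection pool:

    import sqlite3

    class LoggingTransaction(object):
        def __init__(self, txn, after_callbacks):
            self.txn = txn
            self.after_callbacks = after_callbacks

        def call_after(self, callback, *args):
            # Queue work (typically a cache invalidation) to run on the
            # main thread once the transaction has finished.
            self.after_callbacks.append((callback, args))

        def execute(self, sql, *args):
            return self.txn.execute(sql, *args)

    def run_interaction(conn, func, *args):
        after_callbacks = []
        with conn:  # commit on success, roll back on exception
            func(LoggingTransaction(conn.cursor(), after_callbacks), *args)
        for callback, cb_args in after_callbacks:
            callback(*cb_args)

    cache = {"!room:hs": ["$old"]}

    def store_event_txn(txn, room_id, event_id):
        txn.execute("INSERT INTO events VALUES (?, ?)", (room_id, event_id))
        txn.call_after(cache.pop, room_id)  # deferred until after commit

    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE events (room_id TEXT, event_id TEXT)")
    run_interaction(conn, store_event_txn, "!room:hs", "$new")
    assert "!room:hs" not in cache
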
@@ -412,6 +418,8 @@ class SQLBaseStore(object): result = yield self._db_pool.runWithConnection( inner_func, *args, **kwargs ) + for after_callback, after_args in after_callbacks: + after_callback(*after_args) defer.returnValue(result) def cursor_to_dict(self, cursor): diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 3cd3fbdc9..893344eff 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -241,7 +241,7 @@ class EventFederationStore(SQLBaseStore): return int(min_depth) if min_depth is not None else None - def _update_min_depth_for_room_txn(self, txn, invalidates, room_id, depth): + def _update_min_depth_for_room_txn(self, txn, room_id, depth): min_depth = self._get_min_depth_interaction(txn, room_id) do_insert = depth < min_depth if min_depth else True @@ -256,8 +256,8 @@ class EventFederationStore(SQLBaseStore): }, ) - def _handle_prev_events(self, txn, invalidates, outlier, event_id, - prev_events, room_id): + def _handle_prev_events(self, txn, outlier, event_id, prev_events, + room_id): """ For the given event, update the event edges table and forward and backward extremities tables. @@ -330,9 +330,9 @@ class EventFederationStore(SQLBaseStore): ) txn.execute(query) - invalidates.append(( + txn.call_after( self.get_latest_event_ids_in_room.invalidate, room_id - )) + ) def get_backfill_events(self, room_id, event_list, limit): """Get a list of Events for a given topic that occurred before (and diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 7dc49ceed..17f9d2728 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -42,7 +42,7 @@ class EventsStore(SQLBaseStore): stream_ordering = self.min_token try: - invalidates = yield self.runInteraction( + yield self.runInteraction( "persist_event", self._persist_event_txn, event=event, @@ -52,11 +52,6 @@ class EventsStore(SQLBaseStore): is_new_state=is_new_state, current_state=current_state, ) - for invalidated in invalidates: - invalidated_callback = invalidated[0] - invalidated_args = invalidated[1:] - invalidated_callback(*invalidated_args) - except _RollbackButIsFineException: pass @@ -96,10 +91,9 @@ class EventsStore(SQLBaseStore): def _persist_event_txn(self, txn, event, context, backfilled, stream_ordering=None, is_new_state=True, current_state=None): - invalidates = [] # Remove the any existing cache entries for the event_id - invalidates.append((self._invalidate_get_event_cache, event.event_id)) + txn.call_after(self._invalidate_get_event_cache, event.event_id) if stream_ordering is None: with self._stream_id_gen.get_next_txn(txn) as stream_ordering: @@ -121,10 +115,12 @@ class EventsStore(SQLBaseStore): for s in current_state: if s.type == EventTypes.Member: - invalidates.extend([ - (self.get_rooms_for_user.invalidate, s.state_key), - (self.get_joined_hosts_for_room.invalidate, s.room_id), - ]) + txn.call_after( + self.get_rooms_for_user.invalidate, s.state_key + ) + txn.call_after( + self.get_joined_hosts_for_room.invalidate, s.room_id + ) self._simple_insert_txn( txn, "current_state_events", @@ -161,11 +157,10 @@ class EventsStore(SQLBaseStore): outlier = event.internal_metadata.is_outlier() if not outlier: - self._store_state_groups_txn(txn, invalidates, event, context) + self._store_state_groups_txn(txn, event, context) self._update_min_depth_for_room_txn( txn, - invalidates, event.room_id, event.depth ) @@ -207,11 +202,10 @@ class EventsStore(SQLBaseStore): sql, (False, event.event_id,) ) - return invalidates 
+ return self._handle_prev_events( txn, - invalidates, outlier=outlier, event_id=event.event_id, prev_events=event.prev_events, @@ -219,13 +213,13 @@ class EventsStore(SQLBaseStore): ) if event.type == EventTypes.Member: - self._store_room_member_txn(txn, invalidates, event) + self._store_room_member_txn(txn, event) elif event.type == EventTypes.Name: - self._store_room_name_txn(txn, invalidates, event) + self._store_room_name_txn(txn, event) elif event.type == EventTypes.Topic: - self._store_room_topic_txn(txn, invalidates, event) + self._store_room_topic_txn(txn, event) elif event.type == EventTypes.Redaction: - self._store_redaction(txn, invalidates, event) + self._store_redaction(txn, event) event_dict = { k: v @@ -295,20 +289,20 @@ class EventsStore(SQLBaseStore): if context.rejected: self._store_rejections_txn( - txn, invalidates, event.event_id, context.rejected + txn, event.event_id, context.rejected ) for hash_alg, hash_base64 in event.hashes.items(): hash_bytes = decode_base64(hash_base64) self._store_event_content_hash_txn( - txn, invalidates, event.event_id, hash_alg, hash_bytes, + txn, event.event_id, hash_alg, hash_bytes, ) for prev_event_id, prev_hashes in event.prev_events: for alg, hash_base64 in prev_hashes.items(): hash_bytes = decode_base64(hash_base64) self._store_prev_event_hash_txn( - txn, invalidates, event.event_id, prev_event_id, alg, + txn, event.event_id, prev_event_id, alg, hash_bytes ) @@ -325,7 +319,7 @@ class EventsStore(SQLBaseStore): (ref_alg, ref_hash_bytes) = compute_event_reference_hash(event) self._store_event_reference_hash_txn( - txn, invalidates, event.event_id, ref_alg, ref_hash_bytes + txn, event.event_id, ref_alg, ref_hash_bytes ) if event.is_state(): @@ -372,11 +366,11 @@ class EventsStore(SQLBaseStore): } ) - return invalidates + return - def _store_redaction(self, txn, invalidates, event): + def _store_redaction(self, txn, event): # invalidate the cache for the redacted event - invalidates.append((self._invalidate_get_event_cache, event.redacts)) + txn.call_after(self._invalidate_get_event_cache, event.redacts) txn.execute( "INSERT INTO redactions (event_id, redacts) VALUES (?,?)", (event.event_id, event.redacts) diff --git a/synapse/storage/room.py b/synapse/storage/room.py index d42d7ff0e..f95637763 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -162,7 +162,7 @@ class RoomStore(SQLBaseStore): defer.returnValue(ret) - def _store_room_topic_txn(self, txn, invalidates, event): + def _store_room_topic_txn(self, txn, event): if hasattr(event, "content") and "topic" in event.content: self._simple_insert_txn( txn, @@ -174,7 +174,7 @@ class RoomStore(SQLBaseStore): }, ) - def _store_room_name_txn(self, txn, invalidates, event): + def _store_room_name_txn(self, txn, event): if hasattr(event, "content") and "name" in event.content: self._simple_insert_txn( txn, diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 117da817b..839c74f63 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -35,7 +35,7 @@ RoomsForUser = namedtuple( class RoomMemberStore(SQLBaseStore): - def _store_room_member_txn(self, txn, invalidates, event): + def _store_room_member_txn(self, txn, event): """Store a room member in the database. 
""" try: @@ -64,10 +64,8 @@ class RoomMemberStore(SQLBaseStore): } ) - invalidates.extend([ - (self.get_rooms_for_user.invalidate, target_user_id), - (self.get_joined_hosts_for_room.invalidate, event.room_id), - ]) + txn.call_after(self.get_rooms_for_user.invalidate, target_user_id) + txn.call_after(self.get_joined_hosts_for_room.invalidate, event.room_id) def get_room_member(self, user_id, room_id): """Retrieve the current state of a room member. diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py index e3979846e..f05182863 100644 --- a/synapse/storage/signatures.py +++ b/synapse/storage/signatures.py @@ -39,8 +39,8 @@ class SignatureStore(SQLBaseStore): txn.execute(query, (event_id, )) return dict(txn.fetchall()) - def _store_event_content_hash_txn(self, txn, invalidates, event_id, - algorithm, hash_bytes): + def _store_event_content_hash_txn(self, txn, event_id, algorithm, + hash_bytes): """Store a hash for a Event Args: txn (cursor): @@ -101,8 +101,8 @@ class SignatureStore(SQLBaseStore): txn.execute(query, (event_id, )) return {k: v for k, v in txn.fetchall()} - def _store_event_reference_hash_txn(self, txn, invalidates, event_id, - algorithm, hash_bytes): + def _store_event_reference_hash_txn(self, txn, event_id, algorithm, + hash_bytes): """Store a hash for a PDU Args: txn (cursor): @@ -184,8 +184,8 @@ class SignatureStore(SQLBaseStore): hashes[algorithm] = hash_bytes return results - def _store_prev_event_hash_txn(self, txn, invalidates, event_id, - prev_event_id, algorithm, hash_bytes): + def _store_prev_event_hash_txn(self, txn, event_id, prev_event_id, + algorithm, hash_bytes): self._simple_insert_txn( txn, "event_edge_hashes", diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 35d11c27c..7e55e8bed 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -82,7 +82,7 @@ class StateStore(SQLBaseStore): f, ) - def _store_state_groups_txn(self, txn, invalidates, event, context): + def _store_state_groups_txn(self, txn, event, context): if context.current_state is None: return From 3d5a955e08c21c076c55806c3c1e78a19c09ad4f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 5 May 2015 17:36:57 +0100 Subject: [PATCH 16/25] Missed events are not outliers --- synapse/federation/federation_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 6811a0e3d..904c7c094 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -491,7 +491,7 @@ class FederationClient(FederationBase): ] signed_events = yield self._check_sigs_and_hash_and_fetch( - destination, events, outlier=True + destination, events, outlier=False ) have_gotten_all_from_destination = True From e45b05647e9242ba543562e3ad2bb4141e85ab8c Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 5 May 2015 17:38:10 +0100 Subject: [PATCH 17/25] Fix the --help option for synapse --- synapse/config/_base.py | 47 +++++++++++++++++++++++++---------------- 1 file changed, 29 insertions(+), 18 deletions(-) diff --git a/synapse/config/_base.py b/synapse/config/_base.py index cd4bd28e8..2807abbc9 100644 --- a/synapse/config/_base.py +++ b/synapse/config/_base.py @@ -144,16 +144,17 @@ class Config(object): ) config_args, remaining_args = config_parser.parse_known_args(argv) - if not config_args.config_path: - config_parser.error( - "Must supply a config file.\nA config file can be automatically" - " generated using \"--generate-config 
-h SERVER_NAME" - " -c CONFIG-FILE\"" - ) - - config_dir_path = os.path.dirname(config_args.config_path[0]) - config_dir_path = os.path.abspath(config_dir_path) if config_args.generate_config: + if not config_args.config_path: + config_parser.error( + "Must supply a config file.\nA config file can be automatically" + " generated using \"--generate-config -h SERVER_NAME" + " -c CONFIG-FILE\"" + ) + + config_dir_path = os.path.dirname(config_args.config_path[0]) + config_dir_path = os.path.abspath(config_dir_path) + server_name = config_args.server_name if not server_name: print "Most specify a server_name to a generate config for." @@ -196,6 +197,25 @@ class Config(object): ) sys.exit(0) + parser = argparse.ArgumentParser( + parents=[config_parser], + description=description, + formatter_class=argparse.RawDescriptionHelpFormatter, + ) + + obj.invoke_all("add_arguments", parser) + args = parser.parse_args(remaining_args) + + if not config_args.config_path: + config_parser.error( + "Must supply a config file.\nA config file can be automatically" + " generated using \"--generate-config -h SERVER_NAME" + " -c CONFIG-FILE\"" + ) + + config_dir_path = os.path.dirname(config_args.config_path[0]) + config_dir_path = os.path.abspath(config_dir_path) + specified_config = {} for config_path in config_args.config_path: yaml_config = cls.read_config_file(config_path) @@ -208,15 +228,6 @@ class Config(object): obj.invoke_all("read_config", config) - parser = argparse.ArgumentParser( - parents=[config_parser], - description=description, - formatter_class=argparse.RawDescriptionHelpFormatter, - ) - - obj.invoke_all("add_arguments", parser) - args = parser.parse_args(remaining_args) - obj.invoke_all("read_arguments", args) return obj From deb0237166afe280847b625260620d8fb675f7d7 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 5 May 2015 17:45:11 +0100 Subject: [PATCH 18/25] Add some doc-string --- synapse/storage/_base.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index ccf9697fa..dbef179b2 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -194,6 +194,10 @@ class LoggingTransaction(object): object.__setattr__(self, "after_callbacks", after_callbacks) def call_after(self, callback, *args): + """Call the given callback on the main twisted thread after the + transaction has finished. Used to invalidate the caches on the + correct thread. + """ self.after_callbacks.append((callback, args)) def __getattr__(self, name): From 977338a7afa5e95dba1ce230ba253daf2b239fb5 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 5 May 2015 18:12:44 +0100 Subject: [PATCH 19/25] Use buffer(...) 
when inserting into bytea column --- synapse/federation/persistence.py | 4 +--- synapse/storage/transactions.py | 3 ++- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/synapse/federation/persistence.py b/synapse/federation/persistence.py index 76a9dcd77..865766eb2 100644 --- a/synapse/federation/persistence.py +++ b/synapse/federation/persistence.py @@ -23,8 +23,6 @@ from twisted.internet import defer from synapse.util.logutils import log_function -from syutil.jsonutil import encode_canonical_json - import logging @@ -71,7 +69,7 @@ class TransactionActions(object): transaction.transaction_id, transaction.origin, code, - encode_canonical_json(response) + response, ) @defer.inlineCallbacks diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index 89dd7d894..b5b21a9b1 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -17,6 +17,7 @@ from ._base import SQLBaseStore, cached from collections import namedtuple +from syutil.jsonutil import encode_canonical_json import logging logger = logging.getLogger(__name__) @@ -82,7 +83,7 @@ class TransactionStore(SQLBaseStore): "transaction_id": transaction_id, "origin": origin, "response_code": code, - "response_json": response_dict, + "response_json": buffer(encode_canonical_json(response_dict)), }, or_ignore=True, desc="set_received_txn_response", From 0cf7e480b442f9f893b782ab1a437b556c1bbb54 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 5 May 2015 18:20:01 +0100 Subject: [PATCH 20/25] And use buffer(...) there as well --- synapse/federation/persistence.py | 2 +- synapse/storage/transactions.py | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/synapse/federation/persistence.py b/synapse/federation/persistence.py index 865766eb2..1a7cc02f9 100644 --- a/synapse/federation/persistence.py +++ b/synapse/federation/persistence.py @@ -99,5 +99,5 @@ class TransactionActions(object): transaction.transaction_id, transaction.destination, response_code, - encode_canonical_json(response_dict) + response_dict, ) diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index b5b21a9b1..624da4a9d 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -162,7 +162,8 @@ class TransactionStore(SQLBaseStore): return self.runInteraction( "delivered_txn", self._delivered_txn, - transaction_id, destination, code, response_dict + transaction_id, destination, code, + buffer(encode_canonical_json(response_dict)), ) def _delivered_txn(self, txn, transaction_id, destination, From 119e5d7702a1de0b196a374b53b646c06ee753e5 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 6 May 2015 11:41:19 +0100 Subject: [PATCH 21/25] Seperate scripts/ into scripts/ and scripts-dev/, where scripts/* are automatically added to the package --- register_new_matrix_user | 153 ------------------------ scripts/check_auth.py | 65 ---------- scripts/check_event_hash.py | 50 -------- scripts/check_signature.py | 73 ----------- scripts/copyrighter-sql.pl | 33 ----- scripts/copyrighter.pl | 33 ----- scripts/database-save.sh | 16 --- scripts/federation_client.py | 146 ---------------------- scripts/hash_history.py | 69 ----------- scripts/make_identicons.pl | 39 ------ scripts/nuke-room-from-db.sh | 24 ---- scripts/port_from_sqlite_to_postgres.py | 1 + scripts/sphinx_api_docs.sh | 1 - scripts/upgrade_db_to_v0.6.0.py | 2 +- setup.py | 3 +- 15 files changed, 4 insertions(+), 704 deletions(-) delete mode 100755 register_new_matrix_user delete mode 100644 
scripts/check_auth.py delete mode 100644 scripts/check_event_hash.py delete mode 100644 scripts/check_signature.py delete mode 100755 scripts/copyrighter-sql.pl delete mode 100755 scripts/copyrighter.pl delete mode 100755 scripts/database-save.sh delete mode 100644 scripts/federation_client.py delete mode 100644 scripts/hash_history.py delete mode 100755 scripts/make_identicons.pl delete mode 100755 scripts/nuke-room-from-db.sh mode change 100644 => 100755 scripts/port_from_sqlite_to_postgres.py delete mode 100644 scripts/sphinx_api_docs.sh mode change 100644 => 100755 scripts/upgrade_db_to_v0.6.0.py diff --git a/register_new_matrix_user b/register_new_matrix_user deleted file mode 100755 index 0ca83795a..000000000 --- a/register_new_matrix_user +++ /dev/null @@ -1,153 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- -# Copyright 2015 OpenMarket Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -import argparse -import getpass -import hashlib -import hmac -import json -import sys -import urllib2 -import yaml - - -def request_registration(user, password, server_location, shared_secret): - mac = hmac.new( - key=shared_secret, - msg=user, - digestmod=hashlib.sha1, - ).hexdigest() - - data = { - "username": user, - "password": password, - "mac": mac, - } - - server_location = server_location.rstrip("/") - - print "Sending registration request..." - - req = urllib2.Request( - "%s/_matrix/client/v2_alpha/register" % (server_location,), - data=json.dumps(data), - headers={'Content-Type': 'application/json'} - ) - try: - if sys.version_info[:3] >= (2, 7, 9): - # As of version 2.7.9, urllib2 now checks SSL certs - import ssl - f = urllib2.urlopen(req, context=ssl.SSLContext(ssl.PROTOCOL_SSLv23)) - else: - f = urllib2.urlopen(req) - f.read() - f.close() - print "Success." - except urllib2.HTTPError as e: - print "ERROR! Received %d %s" % (e.code, e.reason,) - if 400 <= e.code < 500: - if e.info().type == "application/json": - resp = json.load(e) - if "error" in resp: - print resp["error"] - sys.exit(1) - - -def register_new_user(user, password, server_location, shared_secret): - if not user: - try: - default_user = getpass.getuser() - except: - default_user = None - - if default_user: - user = raw_input("New user localpart [%s]: " % (default_user,)) - if not user: - user = default_user - else: - user = raw_input("New user localpart: ") - - if not user: - print "Invalid user name" - sys.exit(1) - - if not password: - password = getpass.getpass("Password: ") - - if not password: - print "Password cannot be blank." - sys.exit(1) - - confirm_password = getpass.getpass("Confirm password: ") - - if password != confirm_password: - print "Passwords do not match" - sys.exit(1) - - request_registration(user, password, server_location, shared_secret) - - -if __name__ == "__main__": - parser = argparse.ArgumentParser( - description="Used to register new users with a given home server when" - " registration has been disabled. 
The home server must be" - " configured with the 'registration_shared_secret' option" - " set.", - ) - parser.add_argument( - "-u", "--user", - default=None, - help="Local part of the new user. Will prompt if omitted.", - ) - parser.add_argument( - "-p", "--password", - default=None, - help="New password for user. Will prompt if omitted.", - ) - - group = parser.add_mutually_exclusive_group(required=True) - group.add_argument( - "-c", "--config", - type=argparse.FileType('r'), - help="Path to server config file. Used to read in shared secret.", - ) - - group.add_argument( - "-k", "--shared-secret", - help="Shared secret as defined in server config file.", - ) - - parser.add_argument( - "server_url", - default="https://localhost:8448", - nargs='?', - help="URL to use to talk to the home server. Defaults to " - " 'https://localhost:8448'.", - ) - - args = parser.parse_args() - - if "config" in args and args.config: - config = yaml.safe_load(args.config) - secret = config.get("registration_shared_secret", None) - if not secret: - print "No 'registration_shared_secret' defined in config." - sys.exit(1) - else: - secret = args.shared_secret - - register_new_user(args.user, args.password, args.server_url, secret) diff --git a/scripts/check_auth.py b/scripts/check_auth.py deleted file mode 100644 index b889ac7fa..000000000 --- a/scripts/check_auth.py +++ /dev/null @@ -1,65 +0,0 @@ -from synapse.events import FrozenEvent -from synapse.api.auth import Auth - -from mock import Mock - -import argparse -import itertools -import json -import sys - - -def check_auth(auth, auth_chain, events): - auth_chain.sort(key=lambda e: e.depth) - - auth_map = { - e.event_id: e - for e in auth_chain - } - - create_events = {} - for e in auth_chain: - if e.type == "m.room.create": - create_events[e.room_id] = e - - for e in itertools.chain(auth_chain, events): - auth_events_list = [auth_map[i] for i, _ in e.auth_events] - - auth_events = { - (e.type, e.state_key): e - for e in auth_events_list - } - - auth_events[("m.room.create", "")] = create_events[e.room_id] - - try: - auth.check(e, auth_events=auth_events) - except Exception as ex: - print "Failed:", e.event_id, e.type, e.state_key - print "Auth_events:", auth_events - print ex - print json.dumps(e.get_dict(), sort_keys=True, indent=4) - # raise - print "Success:", e.event_id, e.type, e.state_key - -if __name__ == '__main__': - parser = argparse.ArgumentParser() - - parser.add_argument( - 'json', - nargs='?', - type=argparse.FileType('r'), - default=sys.stdin, - ) - - args = parser.parse_args() - - js = json.load(args.json) - - - auth = Auth(Mock()) - check_auth( - auth, - [FrozenEvent(d) for d in js["auth_chain"]], - [FrozenEvent(d) for d in js["pdus"]], - ) diff --git a/scripts/check_event_hash.py b/scripts/check_event_hash.py deleted file mode 100644 index 679afbd26..000000000 --- a/scripts/check_event_hash.py +++ /dev/null @@ -1,50 +0,0 @@ -from synapse.crypto.event_signing import * -from syutil.base64util import encode_base64 - -import argparse -import hashlib -import sys -import json - - -class dictobj(dict): - def __init__(self, *args, **kargs): - dict.__init__(self, *args, **kargs) - self.__dict__ = self - - def get_dict(self): - return dict(self) - - def get_full_dict(self): - return dict(self) - - def get_pdu_json(self): - return dict(self) - - -def main(): - parser = argparse.ArgumentParser() - parser.add_argument("input_json", nargs="?", type=argparse.FileType('r'), - default=sys.stdin) - args = parser.parse_args() - logging.basicConfig() - - 
event_json = dictobj(json.load(args.input_json)) - - algorithms = { - "sha256": hashlib.sha256, - } - - for alg_name in event_json.hashes: - if check_event_content_hash(event_json, algorithms[alg_name]): - print "PASS content hash %s" % (alg_name,) - else: - print "FAIL content hash %s" % (alg_name,) - - for algorithm in algorithms.values(): - name, h_bytes = compute_event_reference_hash(event_json, algorithm) - print "Reference hash %s: %s" % (name, encode_base64(h_bytes)) - -if __name__=="__main__": - main() - diff --git a/scripts/check_signature.py b/scripts/check_signature.py deleted file mode 100644 index 59e3d603a..000000000 --- a/scripts/check_signature.py +++ /dev/null @@ -1,73 +0,0 @@ - -from syutil.crypto.jsonsign import verify_signed_json -from syutil.crypto.signing_key import ( - decode_verify_key_bytes, write_signing_keys -) -from syutil.base64util import decode_base64 - -import urllib2 -import json -import sys -import dns.resolver -import pprint -import argparse -import logging - -def get_targets(server_name): - if ":" in server_name: - target, port = server_name.split(":") - yield (target, int(port)) - return - try: - answers = dns.resolver.query("_matrix._tcp." + server_name, "SRV") - for srv in answers: - yield (srv.target, srv.port) - except dns.resolver.NXDOMAIN: - yield (server_name, 8448) - -def get_server_keys(server_name, target, port): - url = "https://%s:%i/_matrix/key/v1" % (target, port) - keys = json.load(urllib2.urlopen(url)) - verify_keys = {} - for key_id, key_base64 in keys["verify_keys"].items(): - verify_key = decode_verify_key_bytes(key_id, decode_base64(key_base64)) - verify_signed_json(keys, server_name, verify_key) - verify_keys[key_id] = verify_key - return verify_keys - -def main(): - - parser = argparse.ArgumentParser() - parser.add_argument("signature_name") - parser.add_argument("input_json", nargs="?", type=argparse.FileType('r'), - default=sys.stdin) - - args = parser.parse_args() - logging.basicConfig() - - server_name = args.signature_name - keys = {} - for target, port in get_targets(server_name): - try: - keys = get_server_keys(server_name, target, port) - print "Using keys from https://%s:%s/_matrix/key/v1" % (target, port) - write_signing_keys(sys.stdout, keys.values()) - break - except: - logging.exception("Error talking to %s:%s", target, port) - - json_to_check = json.load(args.input_json) - print "Checking JSON:" - for key_id in json_to_check["signatures"][args.signature_name]: - try: - key = keys[key_id] - verify_signed_json(json_to_check, args.signature_name, key) - print "PASS %s" % (key_id,) - except: - logging.exception("Check for key %s failed" % (key_id,)) - print "FAIL %s" % (key_id,) - - -if __name__ == '__main__': - main() - diff --git a/scripts/copyrighter-sql.pl b/scripts/copyrighter-sql.pl deleted file mode 100755 index 890e51e58..000000000 --- a/scripts/copyrighter-sql.pl +++ /dev/null @@ -1,33 +0,0 @@ -#!/usr/bin/perl -pi -# Copyright 2015 OpenMarket Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -$copyright = <table-save.sql -.dump users -.dump access_tokens -.dump presence -.dump profiles -EOF diff --git a/scripts/federation_client.py b/scripts/federation_client.py deleted file mode 100644 index ea62dceb3..000000000 --- a/scripts/federation_client.py +++ /dev/null @@ -1,146 +0,0 @@ -import nacl.signing -import json -import base64 -import requests -import sys -import srvlookup - - -def encode_base64(input_bytes): - """Encode bytes as a base64 string without any padding.""" - - input_len = len(input_bytes) - output_len = 4 * ((input_len + 2) // 3) + (input_len + 2) % 3 - 2 - output_bytes = base64.b64encode(input_bytes) - output_string = output_bytes[:output_len].decode("ascii") - return output_string - - -def decode_base64(input_string): - """Decode a base64 string to bytes inferring padding from the length of the - string.""" - - input_bytes = input_string.encode("ascii") - input_len = len(input_bytes) - padding = b"=" * (3 - ((input_len + 3) % 4)) - output_len = 3 * ((input_len + 2) // 4) + (input_len + 2) % 4 - 2 - output_bytes = base64.b64decode(input_bytes + padding) - return output_bytes[:output_len] - - -def encode_canonical_json(value): - return json.dumps( - value, - # Encode code-points outside of ASCII as UTF-8 rather than \u escapes - ensure_ascii=False, - # Remove unecessary white space. - separators=(',',':'), - # Sort the keys of dictionaries. - sort_keys=True, - # Encode the resulting unicode as UTF-8 bytes. - ).encode("UTF-8") - - -def sign_json(json_object, signing_key, signing_name): - signatures = json_object.pop("signatures", {}) - unsigned = json_object.pop("unsigned", None) - - signed = signing_key.sign(encode_canonical_json(json_object)) - signature_base64 = encode_base64(signed.signature) - - key_id = "%s:%s" % (signing_key.alg, signing_key.version) - signatures.setdefault(signing_name, {})[key_id] = signature_base64 - - json_object["signatures"] = signatures - if unsigned is not None: - json_object["unsigned"] = unsigned - - return json_object - - -NACL_ED25519 = "ed25519" - -def decode_signing_key_base64(algorithm, version, key_base64): - """Decode a base64 encoded signing key - Args: - algorithm (str): The algorithm the key is for (currently "ed25519"). - version (str): Identifies this key out of the keys for this entity. - key_base64 (str): Base64 encoded bytes of the key. - Returns: - A SigningKey object. - """ - if algorithm == NACL_ED25519: - key_bytes = decode_base64(key_base64) - key = nacl.signing.SigningKey(key_bytes) - key.version = version - key.alg = NACL_ED25519 - return key - else: - raise ValueError("Unsupported algorithm %s" % (algorithm,)) - - -def read_signing_keys(stream): - """Reads a list of keys from a stream - Args: - stream : A stream to iterate for keys. - Returns: - list of SigningKey objects. 
- """ - keys = [] - for line in stream: - algorithm, version, key_base64 = line.split() - keys.append(decode_signing_key_base64(algorithm, version, key_base64)) - return keys - - -def lookup(destination, path): - if ":" in destination: - return "https://%s%s" % (destination, path) - else: - try: - srv = srvlookup.lookup("matrix", "tcp", destination)[0] - return "https://%s:%d%s" % (srv.host, srv.port, path) - except: - return "https://%s:%d%s" % (destination, 8448, path) - -def get_json(origin_name, origin_key, destination, path): - request_json = { - "method": "GET", - "uri": path, - "origin": origin_name, - "destination": destination, - } - - signed_json = sign_json(request_json, origin_key, origin_name) - - authorization_headers = [] - - for key, sig in signed_json["signatures"][origin_name].items(): - authorization_headers.append(bytes( - "X-Matrix origin=%s,key=\"%s\",sig=\"%s\"" % ( - origin_name, key, sig, - ) - )) - - result = requests.get( - lookup(destination, path), - headers={"Authorization": authorization_headers[0]}, - verify=False, - ) - return result.json() - - -def main(): - origin_name, keyfile, destination, path = sys.argv[1:] - - with open(keyfile) as f: - key = read_signing_keys(f)[0] - - result = get_json( - origin_name, key, destination, "/_matrix/federation/v1/" + path - ) - - json.dump(result, sys.stdout) - -if __name__ == "__main__": - main() diff --git a/scripts/hash_history.py b/scripts/hash_history.py deleted file mode 100644 index bdad530af..000000000 --- a/scripts/hash_history.py +++ /dev/null @@ -1,69 +0,0 @@ -from synapse.storage.pdu import PduStore -from synapse.storage.signatures import SignatureStore -from synapse.storage._base import SQLBaseStore -from synapse.federation.units import Pdu -from synapse.crypto.event_signing import ( - add_event_pdu_content_hash, compute_pdu_event_reference_hash -) -from synapse.api.events.utils import prune_pdu -from syutil.base64util import encode_base64, decode_base64 -from syutil.jsonutil import encode_canonical_json -import sqlite3 -import sys - -class Store(object): - _get_pdu_tuples = PduStore.__dict__["_get_pdu_tuples"] - _get_pdu_content_hashes_txn = SignatureStore.__dict__["_get_pdu_content_hashes_txn"] - _get_prev_pdu_hashes_txn = SignatureStore.__dict__["_get_prev_pdu_hashes_txn"] - _get_pdu_origin_signatures_txn = SignatureStore.__dict__["_get_pdu_origin_signatures_txn"] - _store_pdu_content_hash_txn = SignatureStore.__dict__["_store_pdu_content_hash_txn"] - _store_pdu_reference_hash_txn = SignatureStore.__dict__["_store_pdu_reference_hash_txn"] - _store_prev_pdu_hash_txn = SignatureStore.__dict__["_store_prev_pdu_hash_txn"] - _simple_insert_txn = SQLBaseStore.__dict__["_simple_insert_txn"] - - -store = Store() - - -def select_pdus(cursor): - cursor.execute( - "SELECT pdu_id, origin FROM pdus ORDER BY depth ASC" - ) - - ids = cursor.fetchall() - - pdu_tuples = store._get_pdu_tuples(cursor, ids) - - pdus = [Pdu.from_pdu_tuple(p) for p in pdu_tuples] - - reference_hashes = {} - - for pdu in pdus: - try: - if pdu.prev_pdus: - print "PROCESS", pdu.pdu_id, pdu.origin, pdu.prev_pdus - for pdu_id, origin, hashes in pdu.prev_pdus: - ref_alg, ref_hsh = reference_hashes[(pdu_id, origin)] - hashes[ref_alg] = encode_base64(ref_hsh) - store._store_prev_pdu_hash_txn(cursor, pdu.pdu_id, pdu.origin, pdu_id, origin, ref_alg, ref_hsh) - print "SUCCESS", pdu.pdu_id, pdu.origin, pdu.prev_pdus - pdu = add_event_pdu_content_hash(pdu) - ref_alg, ref_hsh = compute_pdu_event_reference_hash(pdu) - reference_hashes[(pdu.pdu_id, 
pdu.origin)] = (ref_alg, ref_hsh) - store._store_pdu_reference_hash_txn(cursor, pdu.pdu_id, pdu.origin, ref_alg, ref_hsh) - - for alg, hsh_base64 in pdu.hashes.items(): - print alg, hsh_base64 - store._store_pdu_content_hash_txn(cursor, pdu.pdu_id, pdu.origin, alg, decode_base64(hsh_base64)) - - except: - print "FAILED_", pdu.pdu_id, pdu.origin, pdu.prev_pdus - -def main(): - conn = sqlite3.connect(sys.argv[1]) - cursor = conn.cursor() - select_pdus(cursor) - conn.commit() - -if __name__=='__main__': - main() diff --git a/scripts/make_identicons.pl b/scripts/make_identicons.pl deleted file mode 100755 index cbff63e29..000000000 --- a/scripts/make_identicons.pl +++ /dev/null @@ -1,39 +0,0 @@ -#!/usr/bin/env perl - -use strict; -use warnings; - -use DBI; -use DBD::SQLite; -use JSON; -use Getopt::Long; - -my $db; # = "homeserver.db"; -my $server = "http://localhost:8008"; -my $size = 320; - -GetOptions("db|d=s", \$db, - "server|s=s", \$server, - "width|w=i", \$size) or usage(); - -usage() unless $db; - -my $dbh = DBI->connect("dbi:SQLite:dbname=$db","","") || die $DBI::errstr; - -my $res = $dbh->selectall_arrayref("select token, name from access_tokens, users where access_tokens.user_id = users.id group by user_id") || die $DBI::errstr; - -foreach (@$res) { - my ($token, $mxid) = ($_->[0], $_->[1]); - my ($user_id) = ($mxid =~ m/@(.*):/); - my ($url) = $dbh->selectrow_array("select avatar_url from profiles where user_id=?", undef, $user_id); - if (!$url || $url =~ /#auto$/) { - `curl -s -o tmp.png "$server/_matrix/media/v1/identicon?name=${mxid}&width=$size&height=$size"`; - my $json = `curl -s -X POST -H "Content-Type: image/png" -T "tmp.png" $server/_matrix/media/v1/upload?access_token=$token`; - my $content_uri = from_json($json)->{content_uri}; - `curl -X PUT -H "Content-Type: application/json" --data '{ "avatar_url": "${content_uri}#auto"}' $server/_matrix/client/api/v1/profile/${mxid}/avatar_url?access_token=$token`; - } -} - -sub usage { - die "usage: ./make-identicons.pl\n\t-d database [e.g. homeserver.db]\n\t-s homeserver (default: http://localhost:8008)\n\t-w identicon size in pixels (default 320)"; -} \ No newline at end of file diff --git a/scripts/nuke-room-from-db.sh b/scripts/nuke-room-from-db.sh deleted file mode 100755 index 58c036c89..000000000 --- a/scripts/nuke-room-from-db.sh +++ /dev/null @@ -1,24 +0,0 @@ -#!/bin/bash - -## CAUTION: -## This script will remove (hopefully) all trace of the given room ID from -## your homeserver.db - -## Do not run it lightly. - -ROOMID="$1" - -sqlite3 homeserver.db < Date: Wed, 6 May 2015 11:45:18 +0100 Subject: [PATCH 22/25] Re-add scripts/register_new_matrix_user --- scripts/register_new_matrix_user | 153 +++++++++++++++++++++++++++++++ 1 file changed, 153 insertions(+) create mode 100755 scripts/register_new_matrix_user diff --git a/scripts/register_new_matrix_user b/scripts/register_new_matrix_user new file mode 100755 index 000000000..0ca83795a --- /dev/null +++ b/scripts/register_new_matrix_user @@ -0,0 +1,153 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + + +import argparse +import getpass +import hashlib +import hmac +import json +import sys +import urllib2 +import yaml + + +def request_registration(user, password, server_location, shared_secret): + mac = hmac.new( + key=shared_secret, + msg=user, + digestmod=hashlib.sha1, + ).hexdigest() + + data = { + "username": user, + "password": password, + "mac": mac, + } + + server_location = server_location.rstrip("/") + + print "Sending registration request..." + + req = urllib2.Request( + "%s/_matrix/client/v2_alpha/register" % (server_location,), + data=json.dumps(data), + headers={'Content-Type': 'application/json'} + ) + try: + if sys.version_info[:3] >= (2, 7, 9): + # As of version 2.7.9, urllib2 now checks SSL certs + import ssl + f = urllib2.urlopen(req, context=ssl.SSLContext(ssl.PROTOCOL_SSLv23)) + else: + f = urllib2.urlopen(req) + f.read() + f.close() + print "Success." + except urllib2.HTTPError as e: + print "ERROR! Received %d %s" % (e.code, e.reason,) + if 400 <= e.code < 500: + if e.info().type == "application/json": + resp = json.load(e) + if "error" in resp: + print resp["error"] + sys.exit(1) + + +def register_new_user(user, password, server_location, shared_secret): + if not user: + try: + default_user = getpass.getuser() + except: + default_user = None + + if default_user: + user = raw_input("New user localpart [%s]: " % (default_user,)) + if not user: + user = default_user + else: + user = raw_input("New user localpart: ") + + if not user: + print "Invalid user name" + sys.exit(1) + + if not password: + password = getpass.getpass("Password: ") + + if not password: + print "Password cannot be blank." + sys.exit(1) + + confirm_password = getpass.getpass("Confirm password: ") + + if password != confirm_password: + print "Passwords do not match" + sys.exit(1) + + request_registration(user, password, server_location, shared_secret) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="Used to register new users with a given home server when" + " registration has been disabled. The home server must be" + " configured with the 'registration_shared_secret' option" + " set.", + ) + parser.add_argument( + "-u", "--user", + default=None, + help="Local part of the new user. Will prompt if omitted.", + ) + parser.add_argument( + "-p", "--password", + default=None, + help="New password for user. Will prompt if omitted.", + ) + + group = parser.add_mutually_exclusive_group(required=True) + group.add_argument( + "-c", "--config", + type=argparse.FileType('r'), + help="Path to server config file. Used to read in shared secret.", + ) + + group.add_argument( + "-k", "--shared-secret", + help="Shared secret as defined in server config file.", + ) + + parser.add_argument( + "server_url", + default="https://localhost:8448", + nargs='?', + help="URL to use to talk to the home server. Defaults to " + " 'https://localhost:8448'.", + ) + + args = parser.parse_args() + + if "config" in args and args.config: + config = yaml.safe_load(args.config) + secret = config.get("registration_shared_secret", None) + if not secret: + print "No 'registration_shared_secret' defined in config." 
+ sys.exit(1)
+ else:
+ secret = args.shared_secret
+
+ register_new_user(args.user, args.password, args.server_url, secret)
From 673375fe2d0df303e47bcd818df580d59ffc4dfa Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Wed, 6 May 2015 11:46:02 +0100
Subject: [PATCH 23/25] Actually add scripts-dev/

---
 scripts-dev/check_auth.py | 65 ++++++++++++++
 scripts-dev/check_event_hash.py | 51 +++++
 scripts-dev/check_signature.py | 73 ++++++++++++++++
 scripts-dev/copyrighter-sql.pl | 33 +++++++
 scripts-dev/copyrighter.pl | 33 +++++++
 scripts-dev/database-save.sh | 16 ++++
 scripts-dev/federation_client.py | 146 +++++++++++++++++++++++++++++++
 scripts-dev/hash_history.py | 69 +++++++++++++++
 scripts-dev/make_identicons.pl | 39 +++++++++
 scripts-dev/nuke-room-from-db.sh | 24 +++++
 scripts-dev/sphinx_api_docs.sh | 1 +
 11 files changed, 550 insertions(+)
 create mode 100644 scripts-dev/check_auth.py
 create mode 100644 scripts-dev/check_event_hash.py
 create mode 100644 scripts-dev/check_signature.py
 create mode 100755 scripts-dev/copyrighter-sql.pl
 create mode 100755 scripts-dev/copyrighter.pl
 create mode 100755 scripts-dev/database-save.sh
 create mode 100644 scripts-dev/federation_client.py
 create mode 100644 scripts-dev/hash_history.py
 create mode 100755 scripts-dev/make_identicons.pl
 create mode 100755 scripts-dev/nuke-room-from-db.sh
 create mode 100644 scripts-dev/sphinx_api_docs.sh

diff --git a/scripts-dev/check_auth.py b/scripts-dev/check_auth.py
new file mode 100644
index 000000000..b889ac7fa
--- /dev/null
+++ b/scripts-dev/check_auth.py
@@ -0,0 +1,65 @@
+from synapse.events import FrozenEvent
+from synapse.api.auth import Auth
+
+from mock import Mock
+
+import argparse
+import itertools
+import json
+import sys
+
+
+def check_auth(auth, auth_chain, events):
+ auth_chain.sort(key=lambda e: e.depth)
+
+ auth_map = {
+ e.event_id: e
+ for e in auth_chain
+ }
+
+ create_events = {}
+ for e in auth_chain:
+ if e.type == "m.room.create":
+ create_events[e.room_id] = e
+
+ for e in itertools.chain(auth_chain, events):
+ auth_events_list = [auth_map[i] for i, _ in e.auth_events]
+
+ auth_events = {
+ (e.type, e.state_key): e
+ for e in auth_events_list
+ }
+
+ auth_events[("m.room.create", "")] = create_events[e.room_id]
+
+ try:
+ auth.check(e, auth_events=auth_events)
+ except Exception as ex:
+ print "Failed:", e.event_id, e.type, e.state_key
+ print "Auth_events:", auth_events
+ print ex
+ print json.dumps(e.get_dict(), sort_keys=True, indent=4)
+ # raise
+ print "Success:", e.event_id, e.type, e.state_key
+
+if __name__ == '__main__':
+ parser = argparse.ArgumentParser()
+
+ parser.add_argument(
+ 'json',
+ nargs='?',
+ type=argparse.FileType('r'),
+ default=sys.stdin,
+ )
+
+ args = parser.parse_args()
+
+ js = json.load(args.json)
+
+
+ auth = Auth(Mock())
+ check_auth(
+ auth,
+ [FrozenEvent(d) for d in js["auth_chain"]],
+ [FrozenEvent(d) for d in js["pdus"]],
+ )
diff --git a/scripts-dev/check_event_hash.py b/scripts-dev/check_event_hash.py
new file mode 100644
index 000000000..679afbd26
--- /dev/null
+++ b/scripts-dev/check_event_hash.py
@@ -0,0 +1,51 @@
+from synapse.crypto.event_signing import *
+from syutil.base64util import encode_base64
+
+import argparse
+import hashlib
+import sys
+import json
+import logging  # main() below calls logging.basicConfig()
+
+
+class dictobj(dict):
+ def __init__(self, *args, **kargs):
+ dict.__init__(self, *args, **kargs)
+ self.__dict__ = self
+
+ def get_dict(self):
+ return dict(self)
+
+ def get_full_dict(self):
+ return dict(self)
+
+ def get_pdu_json(self):
+ return dict(self)
+
+
+def main():
+ parser
= argparse.ArgumentParser() + parser.add_argument("input_json", nargs="?", type=argparse.FileType('r'), + default=sys.stdin) + args = parser.parse_args() + logging.basicConfig() + + event_json = dictobj(json.load(args.input_json)) + + algorithms = { + "sha256": hashlib.sha256, + } + + for alg_name in event_json.hashes: + if check_event_content_hash(event_json, algorithms[alg_name]): + print "PASS content hash %s" % (alg_name,) + else: + print "FAIL content hash %s" % (alg_name,) + + for algorithm in algorithms.values(): + name, h_bytes = compute_event_reference_hash(event_json, algorithm) + print "Reference hash %s: %s" % (name, encode_base64(h_bytes)) + +if __name__=="__main__": + main() + diff --git a/scripts-dev/check_signature.py b/scripts-dev/check_signature.py new file mode 100644 index 000000000..59e3d603a --- /dev/null +++ b/scripts-dev/check_signature.py @@ -0,0 +1,73 @@ + +from syutil.crypto.jsonsign import verify_signed_json +from syutil.crypto.signing_key import ( + decode_verify_key_bytes, write_signing_keys +) +from syutil.base64util import decode_base64 + +import urllib2 +import json +import sys +import dns.resolver +import pprint +import argparse +import logging + +def get_targets(server_name): + if ":" in server_name: + target, port = server_name.split(":") + yield (target, int(port)) + return + try: + answers = dns.resolver.query("_matrix._tcp." + server_name, "SRV") + for srv in answers: + yield (srv.target, srv.port) + except dns.resolver.NXDOMAIN: + yield (server_name, 8448) + +def get_server_keys(server_name, target, port): + url = "https://%s:%i/_matrix/key/v1" % (target, port) + keys = json.load(urllib2.urlopen(url)) + verify_keys = {} + for key_id, key_base64 in keys["verify_keys"].items(): + verify_key = decode_verify_key_bytes(key_id, decode_base64(key_base64)) + verify_signed_json(keys, server_name, verify_key) + verify_keys[key_id] = verify_key + return verify_keys + +def main(): + + parser = argparse.ArgumentParser() + parser.add_argument("signature_name") + parser.add_argument("input_json", nargs="?", type=argparse.FileType('r'), + default=sys.stdin) + + args = parser.parse_args() + logging.basicConfig() + + server_name = args.signature_name + keys = {} + for target, port in get_targets(server_name): + try: + keys = get_server_keys(server_name, target, port) + print "Using keys from https://%s:%s/_matrix/key/v1" % (target, port) + write_signing_keys(sys.stdout, keys.values()) + break + except: + logging.exception("Error talking to %s:%s", target, port) + + json_to_check = json.load(args.input_json) + print "Checking JSON:" + for key_id in json_to_check["signatures"][args.signature_name]: + try: + key = keys[key_id] + verify_signed_json(json_to_check, args.signature_name, key) + print "PASS %s" % (key_id,) + except: + logging.exception("Check for key %s failed" % (key_id,)) + print "FAIL %s" % (key_id,) + + +if __name__ == '__main__': + main() + diff --git a/scripts-dev/copyrighter-sql.pl b/scripts-dev/copyrighter-sql.pl new file mode 100755 index 000000000..890e51e58 --- /dev/null +++ b/scripts-dev/copyrighter-sql.pl @@ -0,0 +1,33 @@ +#!/usr/bin/perl -pi +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +$copyright = <table-save.sql +.dump users +.dump access_tokens +.dump presence +.dump profiles +EOF diff --git a/scripts-dev/federation_client.py b/scripts-dev/federation_client.py new file mode 100644 index 000000000..ea62dceb3 --- /dev/null +++ b/scripts-dev/federation_client.py @@ -0,0 +1,146 @@ +import nacl.signing +import json +import base64 +import requests +import sys +import srvlookup + + +def encode_base64(input_bytes): + """Encode bytes as a base64 string without any padding.""" + + input_len = len(input_bytes) + output_len = 4 * ((input_len + 2) // 3) + (input_len + 2) % 3 - 2 + output_bytes = base64.b64encode(input_bytes) + output_string = output_bytes[:output_len].decode("ascii") + return output_string + + +def decode_base64(input_string): + """Decode a base64 string to bytes inferring padding from the length of the + string.""" + + input_bytes = input_string.encode("ascii") + input_len = len(input_bytes) + padding = b"=" * (3 - ((input_len + 3) % 4)) + output_len = 3 * ((input_len + 2) // 4) + (input_len + 2) % 4 - 2 + output_bytes = base64.b64decode(input_bytes + padding) + return output_bytes[:output_len] + + +def encode_canonical_json(value): + return json.dumps( + value, + # Encode code-points outside of ASCII as UTF-8 rather than \u escapes + ensure_ascii=False, + # Remove unecessary white space. + separators=(',',':'), + # Sort the keys of dictionaries. + sort_keys=True, + # Encode the resulting unicode as UTF-8 bytes. + ).encode("UTF-8") + + +def sign_json(json_object, signing_key, signing_name): + signatures = json_object.pop("signatures", {}) + unsigned = json_object.pop("unsigned", None) + + signed = signing_key.sign(encode_canonical_json(json_object)) + signature_base64 = encode_base64(signed.signature) + + key_id = "%s:%s" % (signing_key.alg, signing_key.version) + signatures.setdefault(signing_name, {})[key_id] = signature_base64 + + json_object["signatures"] = signatures + if unsigned is not None: + json_object["unsigned"] = unsigned + + return json_object + + +NACL_ED25519 = "ed25519" + +def decode_signing_key_base64(algorithm, version, key_base64): + """Decode a base64 encoded signing key + Args: + algorithm (str): The algorithm the key is for (currently "ed25519"). + version (str): Identifies this key out of the keys for this entity. + key_base64 (str): Base64 encoded bytes of the key. + Returns: + A SigningKey object. + """ + if algorithm == NACL_ED25519: + key_bytes = decode_base64(key_base64) + key = nacl.signing.SigningKey(key_bytes) + key.version = version + key.alg = NACL_ED25519 + return key + else: + raise ValueError("Unsupported algorithm %s" % (algorithm,)) + + +def read_signing_keys(stream): + """Reads a list of keys from a stream + Args: + stream : A stream to iterate for keys. + Returns: + list of SigningKey objects. 
+ """ + keys = [] + for line in stream: + algorithm, version, key_base64 = line.split() + keys.append(decode_signing_key_base64(algorithm, version, key_base64)) + return keys + + +def lookup(destination, path): + if ":" in destination: + return "https://%s%s" % (destination, path) + else: + try: + srv = srvlookup.lookup("matrix", "tcp", destination)[0] + return "https://%s:%d%s" % (srv.host, srv.port, path) + except: + return "https://%s:%d%s" % (destination, 8448, path) + +def get_json(origin_name, origin_key, destination, path): + request_json = { + "method": "GET", + "uri": path, + "origin": origin_name, + "destination": destination, + } + + signed_json = sign_json(request_json, origin_key, origin_name) + + authorization_headers = [] + + for key, sig in signed_json["signatures"][origin_name].items(): + authorization_headers.append(bytes( + "X-Matrix origin=%s,key=\"%s\",sig=\"%s\"" % ( + origin_name, key, sig, + ) + )) + + result = requests.get( + lookup(destination, path), + headers={"Authorization": authorization_headers[0]}, + verify=False, + ) + return result.json() + + +def main(): + origin_name, keyfile, destination, path = sys.argv[1:] + + with open(keyfile) as f: + key = read_signing_keys(f)[0] + + result = get_json( + origin_name, key, destination, "/_matrix/federation/v1/" + path + ) + + json.dump(result, sys.stdout) + +if __name__ == "__main__": + main() diff --git a/scripts-dev/hash_history.py b/scripts-dev/hash_history.py new file mode 100644 index 000000000..bdad530af --- /dev/null +++ b/scripts-dev/hash_history.py @@ -0,0 +1,69 @@ +from synapse.storage.pdu import PduStore +from synapse.storage.signatures import SignatureStore +from synapse.storage._base import SQLBaseStore +from synapse.federation.units import Pdu +from synapse.crypto.event_signing import ( + add_event_pdu_content_hash, compute_pdu_event_reference_hash +) +from synapse.api.events.utils import prune_pdu +from syutil.base64util import encode_base64, decode_base64 +from syutil.jsonutil import encode_canonical_json +import sqlite3 +import sys + +class Store(object): + _get_pdu_tuples = PduStore.__dict__["_get_pdu_tuples"] + _get_pdu_content_hashes_txn = SignatureStore.__dict__["_get_pdu_content_hashes_txn"] + _get_prev_pdu_hashes_txn = SignatureStore.__dict__["_get_prev_pdu_hashes_txn"] + _get_pdu_origin_signatures_txn = SignatureStore.__dict__["_get_pdu_origin_signatures_txn"] + _store_pdu_content_hash_txn = SignatureStore.__dict__["_store_pdu_content_hash_txn"] + _store_pdu_reference_hash_txn = SignatureStore.__dict__["_store_pdu_reference_hash_txn"] + _store_prev_pdu_hash_txn = SignatureStore.__dict__["_store_prev_pdu_hash_txn"] + _simple_insert_txn = SQLBaseStore.__dict__["_simple_insert_txn"] + + +store = Store() + + +def select_pdus(cursor): + cursor.execute( + "SELECT pdu_id, origin FROM pdus ORDER BY depth ASC" + ) + + ids = cursor.fetchall() + + pdu_tuples = store._get_pdu_tuples(cursor, ids) + + pdus = [Pdu.from_pdu_tuple(p) for p in pdu_tuples] + + reference_hashes = {} + + for pdu in pdus: + try: + if pdu.prev_pdus: + print "PROCESS", pdu.pdu_id, pdu.origin, pdu.prev_pdus + for pdu_id, origin, hashes in pdu.prev_pdus: + ref_alg, ref_hsh = reference_hashes[(pdu_id, origin)] + hashes[ref_alg] = encode_base64(ref_hsh) + store._store_prev_pdu_hash_txn(cursor, pdu.pdu_id, pdu.origin, pdu_id, origin, ref_alg, ref_hsh) + print "SUCCESS", pdu.pdu_id, pdu.origin, pdu.prev_pdus + pdu = add_event_pdu_content_hash(pdu) + ref_alg, ref_hsh = compute_pdu_event_reference_hash(pdu) + reference_hashes[(pdu.pdu_id, 
pdu.origin)] = (ref_alg, ref_hsh) + store._store_pdu_reference_hash_txn(cursor, pdu.pdu_id, pdu.origin, ref_alg, ref_hsh) + + for alg, hsh_base64 in pdu.hashes.items(): + print alg, hsh_base64 + store._store_pdu_content_hash_txn(cursor, pdu.pdu_id, pdu.origin, alg, decode_base64(hsh_base64)) + + except: + print "FAILED_", pdu.pdu_id, pdu.origin, pdu.prev_pdus + +def main(): + conn = sqlite3.connect(sys.argv[1]) + cursor = conn.cursor() + select_pdus(cursor) + conn.commit() + +if __name__=='__main__': + main() diff --git a/scripts-dev/make_identicons.pl b/scripts-dev/make_identicons.pl new file mode 100755 index 000000000..cbff63e29 --- /dev/null +++ b/scripts-dev/make_identicons.pl @@ -0,0 +1,39 @@ +#!/usr/bin/env perl + +use strict; +use warnings; + +use DBI; +use DBD::SQLite; +use JSON; +use Getopt::Long; + +my $db; # = "homeserver.db"; +my $server = "http://localhost:8008"; +my $size = 320; + +GetOptions("db|d=s", \$db, + "server|s=s", \$server, + "width|w=i", \$size) or usage(); + +usage() unless $db; + +my $dbh = DBI->connect("dbi:SQLite:dbname=$db","","") || die $DBI::errstr; + +my $res = $dbh->selectall_arrayref("select token, name from access_tokens, users where access_tokens.user_id = users.id group by user_id") || die $DBI::errstr; + +foreach (@$res) { + my ($token, $mxid) = ($_->[0], $_->[1]); + my ($user_id) = ($mxid =~ m/@(.*):/); + my ($url) = $dbh->selectrow_array("select avatar_url from profiles where user_id=?", undef, $user_id); + if (!$url || $url =~ /#auto$/) { + `curl -s -o tmp.png "$server/_matrix/media/v1/identicon?name=${mxid}&width=$size&height=$size"`; + my $json = `curl -s -X POST -H "Content-Type: image/png" -T "tmp.png" $server/_matrix/media/v1/upload?access_token=$token`; + my $content_uri = from_json($json)->{content_uri}; + `curl -X PUT -H "Content-Type: application/json" --data '{ "avatar_url": "${content_uri}#auto"}' $server/_matrix/client/api/v1/profile/${mxid}/avatar_url?access_token=$token`; + } +} + +sub usage { + die "usage: ./make-identicons.pl\n\t-d database [e.g. homeserver.db]\n\t-s homeserver (default: http://localhost:8008)\n\t-w identicon size in pixels (default 320)"; +} \ No newline at end of file diff --git a/scripts-dev/nuke-room-from-db.sh b/scripts-dev/nuke-room-from-db.sh new file mode 100755 index 000000000..58c036c89 --- /dev/null +++ b/scripts-dev/nuke-room-from-db.sh @@ -0,0 +1,24 @@ +#!/bin/bash + +## CAUTION: +## This script will remove (hopefully) all trace of the given room ID from +## your homeserver.db + +## Do not run it lightly. 
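+##
+## Illustrative usage only (the room ID below is a made-up example; the
+## script reads the target room ID from its first argument):
+##
+##   ./nuke-room-from-db.sh '!somewhere:example.com'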
+ +ROOMID="$1" + +sqlite3 homeserver.db < Date: Wed, 6 May 2015 12:56:35 +0100 Subject: [PATCH 24/25] Don't read from the config file before checking it exists --- synapse/app/synctl.py | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/synapse/app/synctl.py b/synapse/app/synctl.py index 0a2b0d6fc..1f7d543c3 100755 --- a/synapse/app/synctl.py +++ b/synapse/app/synctl.py @@ -27,20 +27,21 @@ CONFIGFILE = "homeserver.yaml" GREEN = "\x1b[1;32m" NORMAL = "\x1b[m" +if not os.path.exists(CONFIGFILE): + sys.stderr.write( + "No config file found\n" + "To generate a config file, run '%s -c %s --generate-config" + " --server-name='\n" % ( + " ".join(SYNAPSE), CONFIGFILE + ) + ) + sys.exit(1) + CONFIG = yaml.load(open(CONFIGFILE)) PIDFILE = CONFIG["pid_file"] def start(): - if not os.path.exists(CONFIGFILE): - sys.stderr.write( - "No config file found\n" - "To generate a config file, run '%s -c %s --generate-config" - " --server-name='\n" % ( - " ".join(SYNAPSE), CONFIGFILE - ) - ) - sys.exit(1) print "Starting ...", args = SYNAPSE args.extend(["--daemonize", "-c", CONFIGFILE]) From 4a7a4a5b6cc6bf6201e996b5a6b1b82a4b874677 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 6 May 2015 17:08:00 +0100 Subject: [PATCH 25/25] Optional profiling using cProfile --- synapse/app/homeserver.py | 22 +++++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index d8d0df7e4..c22726519 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -496,11 +496,31 @@ class SynapseSite(Site): def run(hs): + PROFILE_SYNAPSE = False + if PROFILE_SYNAPSE: + def profile(func): + from cProfile import Profile + from threading import current_thread + + def profiled(*args, **kargs): + profile = Profile() + profile.enable() + func(*args, **kargs) + profile.disable() + ident = current_thread().ident + profile.dump_stats("/tmp/%s.%s.%i.pstat" % ( + hs.hostname, func.__name__, ident + )) + + return profiled + + from twisted.python.threadpool import ThreadPool + ThreadPool._worker = profile(ThreadPool._worker) + reactor.run = profile(reactor.run) def in_thread(): with LoggingContext("run"): change_resource_limit(hs.config.soft_file_limit) - reactor.run() if hs.config.daemonize:
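A note on the cProfile hook in PATCH 25/25: the wrap-and-dump pattern is independent of Twisted and is easiest to see in isolation. The sketch below is an illustrative reconstruction, not part of the patch; the workload function and the /tmp path are made-up stand-ins, and a try/finally is added (the patch has none) so the stats are written even if the wrapped callable raises. Only the stdlib cProfile, pstats and threading modules are used.

    from cProfile import Profile
    from threading import current_thread
    import pstats

    def profile(func):
        # Wrap func so each call is profiled and the stats are dumped to one
        # .pstat file per thread, mirroring what the patch does for
        # reactor.run and ThreadPool._worker.
        def profiled(*args, **kargs):
            p = Profile()
            p.enable()
            try:
                func(*args, **kargs)
            finally:
                p.disable()
                p.dump_stats("/tmp/%s.%i.pstat" % (
                    func.__name__, current_thread().ident,
                ))
        return profiled

    @profile
    def workload():
        sum(i * i for i in range(1000000))  # stand-in for reactor.run()

    workload()

    # Inspect a dump with the stdlib pstats module:
    stats = pstats.Stats("/tmp/workload.%i.pstat" % current_thread().ident)
    stats.sort_stats("cumulative").print_stats(5)

Keying the dump file on the thread ident is what makes wrapping ThreadPool._worker safe: each pool thread writes its own stats file rather than racing over a shared profiler.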
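Similarly, the shared-secret registration flow restored in PATCH 22/25 hinges on one detail: the client proves knowledge of the homeserver's registration_shared_secret by sending an HMAC-SHA1 of the new user's localpart alongside the username and password. A minimal sketch of just the MAC and request body follows; the secret, username and password are placeholder values, while the field names and the /_matrix/client/v2_alpha/register endpoint are taken from the script itself.

    import hashlib
    import hmac
    import json

    shared_secret = "placeholder-shared-secret"  # registration_shared_secret from homeserver.yaml
    user = "alice"
    password = "placeholder-password"

    # Same construction as request_registration() in the script above.
    mac = hmac.new(
        key=shared_secret,
        msg=user,
        digestmod=hashlib.sha1,
    ).hexdigest()

    body = json.dumps({
        "username": user,
        "password": password,
        "mac": mac,
    })

    # POST this body, with Content-Type: application/json, to
    # https://localhost:8448/_matrix/client/v2_alpha/register
    print body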