diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index de4f66197..9f63f0708 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -163,8 +163,8 @@ class LoggingTransaction(object): return self.txn.execute( sql, *args, **kwargs ) - except: - logger.exception("[SQL FAIL] {%s}", self.name) + except Exception as e: + logger.debug("[SQL FAIL] {%s} %s", self.name, e) raise finally: msecs = (time.time() * 1000) - start @@ -209,6 +209,46 @@ class PerformanceCounters(object): return top_n_counters +class IdGenerator(object): + def __init__(self, table, column, store): + self.table = table + self.column = column + self.store = store + self._lock = threading.Lock() + self._next_id = None + + @defer.inlineCallbacks + def get_next(self): + with self._lock: + if not self._next_id: + res = yield self.store._execute_and_decode( + "IdGenerator_%s" % (self.table,), + "SELECT MAX(%s) as mx FROM %s" % (self.column, self.table,) + ) + + self._next_id = (res and res[0] and res[0]["mx"]) or 1 + + i = self._next_id + self._next_id += 1 + defer.returnValue(i) + + def get_next_txn(self, txn): + with self._lock: + if self._next_id: + i = self._next_id + self._next_id += 1 + return i + else: + txn.execute( + "SELECT MAX(%s) FROM %s" % (self.column, self.table,) + ) + + val, = txn.fetchone() + self._next_id = val or 2 + + return 1 + + class SQLBaseStore(object): _TXN_ID = 0 @@ -234,8 +274,10 @@ class SQLBaseStore(object): # Pretend the getEventCache is just another named cache caches_by_name["*getEvent*"] = self._get_event_cache - self._next_stream_id_lock = threading.Lock() - self._next_stream_id = int(hs.get_clock().time_msec()) * 1000 + self._stream_id_gen = IdGenerator("events", "stream_ordering", self) + self._transaction_id_gen = IdGenerator("sent_transactions", "id", self) + self._state_groups_id_gen = IdGenerator("state_groups", "id", self) + self._access_tokens_id_gen = IdGenerator("access_tokens", "id", self) def start_profiling(self): self._previous_loop_ts = self._clock.time_msec() @@ -292,8 +334,8 @@ class SQLBaseStore(object): LoggingTransaction(txn, name, self.database_engine), *args, **kwargs ) - except: - logger.exception("[TXN FAIL] {%s}", name) + except Exception as e: + logger.debug("[TXN FAIL] {%s}", name, e) raise finally: end = time.time() * 1000 diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 69f598967..514feebcb 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -96,12 +96,16 @@ class EventsStore(SQLBaseStore): # Remove the any existing cache entries for the event_id self._get_event_cache.pop(event.event_id) + if stream_ordering is None: + stream_ordering = self._stream_id_gen.get_next_txn(txn) + # We purposefully do this first since if we include a `current_state` # key, we *want* to update the `current_state_events` table if current_state: - txn.execute( - "DELETE FROM current_state_events WHERE room_id = ?", - (event.room_id,) + self._simple_delete_txn( + txn, + table="current_state_events", + keyvalues={"room_id": event.room_id}, ) for s in current_state: @@ -240,9 +244,6 @@ class EventsStore(SQLBaseStore): "depth": event.depth, } - if stream_ordering is None: - stream_ordering = self.get_next_stream_id() - unrec = { k: v for k, v in event.get_dict().items() diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index 0c785ec98..b62b4a341 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -39,10 +39,12 @@ class RegistrationStore(SQLBaseStore): Raises: StoreError if there was a problem adding this. """ - yield self._simple_insert( + next_id = yield self._access_tokens_id_gen.get_next() + + self._simple_insert( "access_tokens", { - "id": self.get_next_stream_id(), + "id": next_id, "user_id": user_id, "token": token }, @@ -68,6 +70,8 @@ class RegistrationStore(SQLBaseStore): def _register(self, txn, user_id, token, password_hash): now = int(self.clock.time()) + next_id = self._access_tokens_id_gen.get_next_txn(txn) + try: txn.execute("INSERT INTO users(name, password_hash, creation_ts) " "VALUES (?,?,?)", @@ -82,7 +86,7 @@ class RegistrationStore(SQLBaseStore): txn.execute( "INSERT INTO access_tokens(id, user_id, token)" " VALUES (?,?,?)", - (self.get_next_stream_id(), user_id, token,) + (next_id, user_id, token,) ) @defer.inlineCallbacks diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 65ea9c4d8..3e55cb81b 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -93,12 +93,12 @@ class StateStore(SQLBaseStore): state_group = context.state_group if not state_group: - group = _make_group_id(self._clock) - state_group = self._simple_insert_txn( + state_group = _make_group_id(self._clock) + self._simple_insert_txn( txn, table="state_groups", values={ - "id": group, + "id": state_group, "room_id": event.room_id, "event_id": event.event_id, }, diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index e3e484fb2..9594fe1f2 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -123,6 +123,8 @@ class TransactionStore(SQLBaseStore): def _prep_send_transaction(self, txn, transaction_id, destination, origin_server_ts): + next_id = self._transaction_id_gen.get_next_txn(txn) + # First we find out what the prev_txns should be. # Since we know that we are only sending one transaction at a time, # we can simply take the last one. @@ -143,7 +145,7 @@ class TransactionStore(SQLBaseStore): txn, table=SentTransactions.table_name, values={ - "id": self.get_next_stream_id(), + "id": next_id, "transaction_id": transaction_id, "destination": destination, "ts": origin_server_ts,