diff --git a/synapse/federation/persistence.py b/synapse/federation/persistence.py index 7043fcc50..a565375e6 100644 --- a/synapse/federation/persistence.py +++ b/synapse/federation/persistence.py @@ -32,76 +32,6 @@ import logging logger = logging.getLogger(__name__) -class PduActions(object): - """ Defines persistence actions that relate to handling PDUs. - """ - - def __init__(self, datastore): - self.store = datastore - - @log_function - def mark_as_processed(self, pdu): - """ Persist the fact that we have fully processed the given `Pdu` - - Returns: - Deferred - """ - return self.store.mark_pdu_as_processed(pdu.pdu_id, pdu.origin) - - @defer.inlineCallbacks - @log_function - def after_transaction(self, transaction_id, destination, origin): - """ Returns all `Pdu`s that we sent to the given remote home server - after a given transaction id. - - Returns: - Deferred: Results in a list of `Pdu`s - """ - results = yield self.store.get_pdus_after_transaction( - transaction_id, - destination - ) - - defer.returnValue([Pdu.from_pdu_tuple(p) for p in results]) - - @defer.inlineCallbacks - @log_function - def get_all_pdus_from_context(self, context): - results = yield self.store.get_all_pdus_from_context(context) - defer.returnValue([Pdu.from_pdu_tuple(p) for p in results]) - - @defer.inlineCallbacks - @log_function - def backfill(self, context, pdu_list, limit): - """ For a given list of PDU id and origins return the proceeding - `limit` `Pdu`s in the given `context`. - - Returns: - Deferred: Results in a list of `Pdu`s. - """ - results = yield self.store.get_backfill( - context, pdu_list, limit - ) - - defer.returnValue([Pdu.from_pdu_tuple(p) for p in results]) - - @log_function - def is_new(self, pdu): - """ When we receive a `Pdu` from a remote home server, we want to - figure out whether it is `new`, i.e. it is not some historic PDU that - we haven't seen simply because we haven't backfilled back that far. - - Returns: - Deferred: Results in a `bool` - """ - return self.store.is_pdu_new( - pdu_id=pdu.pdu_id, - origin=pdu.origin, - context=pdu.context, - depth=pdu.depth - ) - - class TransactionActions(object): """ Defines persistence actions that relate to handling Transactions. """ diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index a0bd2e057..159af4eed 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -21,7 +21,7 @@ from twisted.internet import defer from .units import Transaction, Pdu, Edu -from .persistence import PduActions, TransactionActions +from .persistence import TransactionActions from synapse.util.logutils import log_function diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index d75c36683..3faa571dd 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -37,7 +37,6 @@ from .registration import RegistrationStore from .room import RoomStore from .roommember import RoomMemberStore from .stream import StreamStore -from .pdu import StatePduStore, PduStore, PdusTable from .transactions import TransactionStore from .keys import KeyStore from .event_federation import EventFederationStore @@ -60,7 +59,6 @@ logger = logging.getLogger(__name__) SCHEMAS = [ "transactions", - "pdu", "users", "profiles", "presence", @@ -89,7 +87,7 @@ class _RollbackButIsFineException(Exception): class DataStore(RoomMemberStore, RoomStore, RegistrationStore, StreamStore, ProfileStore, FeedbackStore, - PresenceStore, PduStore, StatePduStore, TransactionStore, + PresenceStore, TransactionStore, DirectoryStore, KeyStore, StateStore, SignatureStore, EventFederationStore, ): @@ -150,68 +148,12 @@ class DataStore(RoomMemberStore, RoomStore, def _persist_pdu_event_txn(self, txn, pdu=None, event=None, backfilled=False, stream_ordering=None, is_new_state=True): - if pdu is not None: - self._persist_event_pdu_txn(txn, pdu) if event is not None: return self._persist_event_txn( txn, event, backfilled, stream_ordering, is_new_state=is_new_state, ) - def _persist_event_pdu_txn(self, txn, pdu): - cols = dict(pdu.__dict__) - unrec_keys = dict(pdu.unrecognized_keys) - del cols["hashes"] - del cols["signatures"] - del cols["content"] - del cols["prev_pdus"] - cols["content_json"] = json.dumps(pdu.content) - - unrec_keys.update({ - k: v for k, v in cols.items() - if k not in PdusTable.fields - }) - - cols["unrecognized_keys"] = json.dumps(unrec_keys) - - cols["ts"] = cols.pop("origin_server_ts") - - logger.debug("Persisting: %s", repr(cols)) - - for hash_alg, hash_base64 in pdu.hashes.items(): - hash_bytes = decode_base64(hash_base64) - self._store_pdu_content_hash_txn( - txn, pdu.pdu_id, pdu.origin, hash_alg, hash_bytes, - ) - - signatures = pdu.signatures.get(pdu.origin, {}) - - for key_id, signature_base64 in signatures.items(): - signature_bytes = decode_base64(signature_base64) - self._store_pdu_origin_signature_txn( - txn, pdu.pdu_id, pdu.origin, key_id, signature_bytes, - ) - - for prev_pdu_id, prev_origin, prev_hashes in pdu.prev_pdus: - for alg, hash_base64 in prev_hashes.items(): - hash_bytes = decode_base64(hash_base64) - self._store_prev_pdu_hash_txn( - txn, pdu.pdu_id, pdu.origin, prev_pdu_id, prev_origin, alg, - hash_bytes - ) - - (ref_alg, ref_hash_bytes) = compute_pdu_event_reference_hash(pdu) - self._store_pdu_reference_hash_txn( - txn, pdu.pdu_id, pdu.origin, ref_alg, ref_hash_bytes - ) - - if pdu.is_state: - self._persist_state_txn(txn, pdu.prev_pdus, cols) - else: - self._persist_pdu_txn(txn, pdu.prev_pdus, cols) - - self._update_min_depth_for_context_txn(txn, pdu.context, pdu.depth) - @log_function def _persist_event_txn(self, txn, event, backfilled, stream_ordering=None, is_new_state=True): diff --git a/synapse/storage/pdu.py b/synapse/storage/pdu.py deleted file mode 100644 index 4a4341907..000000000 --- a/synapse/storage/pdu.py +++ /dev/null @@ -1,949 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2014 OpenMarket Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from twisted.internet import defer - -from ._base import SQLBaseStore, Table, JoinHelper - -from synapse.federation.units import Pdu -from synapse.util.logutils import log_function - -from syutil.base64util import encode_base64 - -from collections import namedtuple - -import logging - - -logger = logging.getLogger(__name__) - - -class PduStore(SQLBaseStore): - """A collection of queries for handling PDUs. - """ - - def get_pdu(self, pdu_id, origin): - """Given a pdu_id and origin, get a PDU. - - Args: - txn - pdu_id (str) - origin (str) - - Returns: - PduTuple: If the pdu does not exist in the database, returns None - """ - - return self.runInteraction( - "get_pdu", self._get_pdu_tuple, pdu_id, origin - ) - - def _get_pdu_tuple(self, txn, pdu_id, origin): - res = self._get_pdu_tuples(txn, [(pdu_id, origin)]) - return res[0] if res else None - - def _get_pdu_tuples(self, txn, pdu_id_tuples): - results = [] - for pdu_id, origin in pdu_id_tuples: - txn.execute( - PduEdgesTable.select_statement("pdu_id = ? AND origin = ?"), - (pdu_id, origin) - ) - - edges = [ - (r.prev_pdu_id, r.prev_origin) - for r in PduEdgesTable.decode_results(txn.fetchall()) - ] - - edge_hashes = self._get_prev_pdu_hashes_txn(txn, pdu_id, origin) - - hashes = self._get_pdu_content_hashes_txn(txn, pdu_id, origin) - signatures = self._get_pdu_origin_signatures_txn( - txn, pdu_id, origin - ) - - query = ( - "SELECT %(fields)s FROM %(pdus)s as p " - "LEFT JOIN %(state)s as s " - "ON p.pdu_id = s.pdu_id AND p.origin = s.origin " - "WHERE p.pdu_id = ? AND p.origin = ? " - ) % { - "fields": _pdu_state_joiner.get_fields( - PdusTable="p", StatePdusTable="s"), - "pdus": PdusTable.table_name, - "state": StatePdusTable.table_name, - } - - txn.execute(query, (pdu_id, origin)) - - row = txn.fetchone() - if row: - results.append(PduTuple( - PduEntry(*row), edges, hashes, signatures, edge_hashes - )) - - return results - - def get_current_state_for_context(self, context): - """Get a list of PDUs that represent the current state for a given - context - - Args: - context (str) - - Returns: - list: A list of PduTuples - """ - - return self.runInteraction( - "get_current_state_for_context", - self._get_current_state_for_context, - context - ) - - def _get_current_state_for_context(self, txn, context): - query = ( - "SELECT pdu_id, origin FROM %s WHERE context = ?" - % CurrentStateTable.table_name - ) - - logger.debug("get_current_state %s, Args=%s", query, context) - txn.execute(query, (context,)) - - res = txn.fetchall() - - logger.debug("get_current_state %d results", len(res)) - - return self._get_pdu_tuples(txn, res) - - def _persist_pdu_txn(self, txn, prev_pdus, cols): - """Inserts a (non-state) PDU into the database. - - Args: - txn, - prev_pdus (list) - **cols: The columns to insert into the PdusTable. - """ - entry = PdusTable.EntryType( - **{k: cols.get(k, None) for k in PdusTable.fields} - ) - - txn.execute(PdusTable.insert_statement(), entry) - - self._handle_prev_pdus( - txn, entry.outlier, entry.pdu_id, entry.origin, - prev_pdus, entry.context - ) - - def mark_pdu_as_processed(self, pdu_id, pdu_origin): - """Mark a received PDU as processed. - - Args: - txn - pdu_id (str) - pdu_origin (str) - """ - - return self.runInteraction( - "mark_pdu_as_processed", - self._mark_as_processed, pdu_id, pdu_origin - ) - - def _mark_as_processed(self, txn, pdu_id, pdu_origin): - txn.execute("UPDATE %s SET have_processed = 1" % PdusTable.table_name) - - def get_all_pdus_from_context(self, context): - """Get a list of all PDUs for a given context.""" - return self.runInteraction( - "get_all_pdus_from_context", - self._get_all_pdus_from_context, context, - ) - - def _get_all_pdus_from_context(self, txn, context): - query = ( - "SELECT pdu_id, origin FROM %s " - "WHERE context = ?" - ) % PdusTable.table_name - - txn.execute(query, (context,)) - - return self._get_pdu_tuples(txn, txn.fetchall()) - - def get_backfill(self, context, pdu_list, limit): - """Get a list of Pdus for a given topic that occured before (and - including) the pdus in pdu_list. Return a list of max size `limit`. - - Args: - txn - context (str) - pdu_list (list) - limit (int) - - Return: - list: A list of PduTuples - """ - return self.runInteraction( - "get_backfill", - self._get_backfill, context, pdu_list, limit - ) - - def _get_backfill(self, txn, context, pdu_list, limit): - logger.debug( - "backfill: %s, %s, %s", - context, repr(pdu_list), limit - ) - - # We seed the pdu_results with the things from the pdu_list. - pdu_results = pdu_list - - front = pdu_list - - query = ( - "SELECT prev_pdu_id, prev_origin FROM %(edges_table)s " - "WHERE context = ? AND pdu_id = ? AND origin = ? " - "LIMIT ?" - ) % { - "edges_table": PduEdgesTable.table_name, - } - - # We iterate through all pdu_ids in `front` to select their previous - # pdus. These are dumped in `new_front`. We continue until we reach the - # limit *or* new_front is empty (i.e., we've run out of things to - # select - while front and len(pdu_results) < limit: - - new_front = [] - for pdu_id, origin in front: - logger.debug( - "_backfill_interaction: i=%s, o=%s", - pdu_id, origin - ) - - txn.execute( - query, - (context, pdu_id, origin, limit - len(pdu_results)) - ) - - for row in txn.fetchall(): - logger.debug( - "_backfill_interaction: got i=%s, o=%s", - *row - ) - new_front.append(row) - - front = new_front - pdu_results += new_front - - # We also want to update the `prev_pdus` attributes before returning. - return self._get_pdu_tuples(txn, pdu_results) - - def get_min_depth_for_context(self, context): - """Get the current minimum depth for a context - - Args: - txn - context (str) - """ - return self.runInteraction( - "get_min_depth_for_context", - self._get_min_depth_for_context, context - ) - - def _get_min_depth_for_context(self, txn, context): - return self._get_min_depth_interaction(txn, context) - - def _get_min_depth_interaction(self, txn, context): - txn.execute( - "SELECT min_depth FROM %s WHERE context = ?" - % ContextDepthTable.table_name, - (context,) - ) - - row = txn.fetchone() - - return row[0] if row else None - - def _update_min_depth_for_context_txn(self, txn, context, depth): - """Update the minimum `depth` of the given context, which is the line - on which we stop backfilling backwards. - - Args: - context (str) - depth (int) - """ - min_depth = self._get_min_depth_interaction(txn, context) - - do_insert = depth < min_depth if min_depth else True - - if do_insert: - txn.execute( - "INSERT OR REPLACE INTO %s (context, min_depth) " - "VALUES (?,?)" % ContextDepthTable.table_name, - (context, depth) - ) - - def get_latest_pdus_in_context(self, context): - return self.runInteraction( - "get_latest_pdus_in_context", - self._get_latest_pdus_in_context, - context - ) - - def _get_latest_pdus_in_context(self, txn, context): - """Get's a list of the most current pdus for a given context. This is - used when we are sending a Pdu and need to fill out the `prev_pdus` - key - - Args: - txn - context - """ - query = ( - "SELECT p.pdu_id, p.origin, p.depth FROM %(pdus)s as p " - "INNER JOIN %(forward)s as f ON p.pdu_id = f.pdu_id " - "AND f.origin = p.origin " - "WHERE f.context = ?" - ) % { - "pdus": PdusTable.table_name, - "forward": PduForwardExtremitiesTable.table_name, - } - - logger.debug("get_prev query: %s", query) - - txn.execute( - query, - (context, ) - ) - - results = [] - for pdu_id, origin, depth in txn.fetchall(): - hashes = self._get_pdu_reference_hashes_txn(txn, pdu_id, origin) - sha256_bytes = hashes["sha256"] - prev_hashes = {"sha256": encode_base64(sha256_bytes)} - results.append((pdu_id, origin, prev_hashes, depth)) - - return results - - @defer.inlineCallbacks - def get_oldest_pdus_in_context(self, context): - """Get a list of Pdus that we haven't backfilled beyond yet (and havent - seen). This list is used when we want to backfill backwards and is the - list we send to the remote server. - - Args: - txn - context (str) - - Returns: - list: A list of PduIdTuple. - """ - results = yield self._execute( - None, - "SELECT pdu_id, origin FROM %(back)s WHERE context = ?" - % {"back": PduBackwardExtremitiesTable.table_name, }, - context - ) - - defer.returnValue([PduIdTuple(i, o) for i, o in results]) - - def is_pdu_new(self, pdu_id, origin, context, depth): - """For a given Pdu, try and figure out if it's 'new', i.e., if it's - not something we got randomly from the past, for example when we - request the current state of the room that will probably return a bunch - of pdus from before we joined. - - Args: - txn - pdu_id (str) - origin (str) - context (str) - depth (int) - - Returns: - bool - """ - - return self.runInteraction( - "is_pdu_new", - self._is_pdu_new, - pdu_id=pdu_id, - origin=origin, - context=context, - depth=depth - ) - - def _is_pdu_new(self, txn, pdu_id, origin, context, depth): - # If depth > min depth in back table, then we classify it as new. - # OR if there is nothing in the back table, then it kinda needs to - # be a new thing. - query = ( - "SELECT min(p.depth) FROM %(edges)s as e " - "INNER JOIN %(back)s as b " - "ON e.prev_pdu_id = b.pdu_id AND e.prev_origin = b.origin " - "INNER JOIN %(pdus)s as p " - "ON e.pdu_id = p.pdu_id AND p.origin = e.origin " - "WHERE p.context = ?" - ) % { - "pdus": PdusTable.table_name, - "edges": PduEdgesTable.table_name, - "back": PduBackwardExtremitiesTable.table_name, - } - - txn.execute(query, (context,)) - - min_depth, = txn.fetchone() - - if not min_depth or depth > int(min_depth): - logger.debug( - "is_new true: id=%s, o=%s, d=%s min_depth=%s", - pdu_id, origin, depth, min_depth - ) - return True - - # If this pdu is in the forwards table, then it also is a new one - query = ( - "SELECT * FROM %(forward)s WHERE pdu_id = ? AND origin = ?" - ) % { - "forward": PduForwardExtremitiesTable.table_name, - } - - txn.execute(query, (pdu_id, origin)) - - # Did we get anything? - if txn.fetchall(): - logger.debug( - "is_new true: id=%s, o=%s, d=%s was forward", - pdu_id, origin, depth - ) - return True - - logger.debug( - "is_new false: id=%s, o=%s, d=%s", - pdu_id, origin, depth - ) - - # FINE THEN. It's probably old. - return False - - @staticmethod - @log_function - def _handle_prev_pdus(txn, outlier, pdu_id, origin, prev_pdus, - context): - txn.executemany( - PduEdgesTable.insert_statement(), - [(pdu_id, origin, p[0], p[1], context) for p in prev_pdus] - ) - - # Update the extremities table if this is not an outlier. - if not outlier: - - # First, we delete the new one from the forwards extremities table. - query = ( - "DELETE FROM %s WHERE pdu_id = ? AND origin = ?" - % PduForwardExtremitiesTable.table_name - ) - txn.executemany(query, list(p[:2] for p in prev_pdus)) - - # We only insert as a forward extremety the new pdu if there are no - # other pdus that reference it as a prev pdu - query = ( - "INSERT INTO %(table)s (pdu_id, origin, context) " - "SELECT ?, ?, ? WHERE NOT EXISTS (" - "SELECT 1 FROM %(pdu_edges)s WHERE " - "prev_pdu_id = ? AND prev_origin = ?" - ")" - ) % { - "table": PduForwardExtremitiesTable.table_name, - "pdu_edges": PduEdgesTable.table_name - } - - logger.debug("query: %s", query) - - txn.execute(query, (pdu_id, origin, context, pdu_id, origin)) - - # Insert all the prev_pdus as a backwards thing, they'll get - # deleted in a second if they're incorrect anyway. - txn.executemany( - PduBackwardExtremitiesTable.insert_statement(), - [(i, o, context) for i, o, _ in prev_pdus] - ) - - # Also delete from the backwards extremities table all ones that - # reference pdus that we have already seen - query = ( - "DELETE FROM %(pdu_back)s WHERE EXISTS (" - "SELECT 1 FROM %(pdus)s AS pdus " - "WHERE " - "%(pdu_back)s.pdu_id = pdus.pdu_id " - "AND %(pdu_back)s.origin = pdus.origin " - "AND not pdus.outlier " - ")" - ) % { - "pdu_back": PduBackwardExtremitiesTable.table_name, - "pdus": PdusTable.table_name, - } - txn.execute(query) - - -class StatePduStore(SQLBaseStore): - """A collection of queries for handling state PDUs. - """ - - def _persist_state_txn(self, txn, prev_pdus, cols): - """Inserts a state PDU into the database - - Args: - txn, - prev_pdus (list) - **cols: The columns to insert into the PdusTable and StatePdusTable - """ - pdu_entry = PdusTable.EntryType( - **{k: cols.get(k, None) for k in PdusTable.fields} - ) - state_entry = StatePdusTable.EntryType( - **{k: cols.get(k, None) for k in StatePdusTable.fields} - ) - - logger.debug("Inserting pdu: %s", repr(pdu_entry)) - logger.debug("Inserting state: %s", repr(state_entry)) - - txn.execute(PdusTable.insert_statement(), pdu_entry) - txn.execute(StatePdusTable.insert_statement(), state_entry) - - self._handle_prev_pdus( - txn, - pdu_entry.outlier, pdu_entry.pdu_id, pdu_entry.origin, prev_pdus, - pdu_entry.context - ) - - def get_unresolved_state_tree(self, new_state_pdu): - return self.runInteraction( - "get_unresolved_state_tree", - self._get_unresolved_state_tree, new_state_pdu - ) - - @log_function - def _get_unresolved_state_tree(self, txn, new_pdu): - current = self._get_current_interaction( - txn, - new_pdu.context, new_pdu.pdu_type, new_pdu.state_key - ) - - ReturnType = namedtuple( - "StateReturnType", ["new_branch", "current_branch"] - ) - return_value = ReturnType([new_pdu], []) - - if not current: - logger.debug("get_unresolved_state_tree No current state.") - return (return_value, None) - - return_value.current_branch.append(current) - - enum_branches = self._enumerate_state_branches( - txn, new_pdu, current - ) - - missing_branch = None - for branch, prev_state, state in enum_branches: - if state: - return_value[branch].append(state) - else: - # We don't have prev_state :( - missing_branch = branch - break - - return (return_value, missing_branch) - - def update_current_state(self, pdu_id, origin, context, pdu_type, - state_key): - return self.runInteraction( - "update_current_state", - self._update_current_state, - pdu_id, origin, context, pdu_type, state_key - ) - - def _update_current_state(self, txn, pdu_id, origin, context, pdu_type, - state_key): - query = ( - "INSERT OR REPLACE INTO %(curr)s (%(fields)s) VALUES (%(qs)s)" - ) % { - "curr": CurrentStateTable.table_name, - "fields": CurrentStateTable.get_fields_string(), - "qs": ", ".join(["?"] * len(CurrentStateTable.fields)) - } - - query_args = CurrentStateTable.EntryType( - pdu_id=pdu_id, - origin=origin, - context=context, - pdu_type=pdu_type, - state_key=state_key - ) - - txn.execute(query, query_args) - - def get_current_state_pdu(self, context, pdu_type, state_key): - """For a given context, pdu_type, state_key 3-tuple, return what is - currently considered the current state. - - Args: - txn - context (str) - pdu_type (str) - state_key (str) - - Returns: - PduEntry - """ - - return self.runInteraction( - "get_current_state_pdu", - self._get_current_state_pdu, context, pdu_type, state_key - ) - - def _get_current_state_pdu(self, txn, context, pdu_type, state_key): - return self._get_current_interaction(txn, context, pdu_type, state_key) - - def _get_current_interaction(self, txn, context, pdu_type, state_key): - logger.debug( - "_get_current_interaction %s %s %s", - context, pdu_type, state_key - ) - - fields = _pdu_state_joiner.get_fields( - PdusTable="p", StatePdusTable="s") - - current_query = ( - "SELECT %(fields)s FROM %(state)s as s " - "INNER JOIN %(pdus)s as p " - "ON s.pdu_id = p.pdu_id AND s.origin = p.origin " - "INNER JOIN %(curr)s as c " - "ON s.pdu_id = c.pdu_id AND s.origin = c.origin " - "WHERE s.context = ? AND s.pdu_type = ? AND s.state_key = ? " - ) % { - "fields": fields, - "curr": CurrentStateTable.table_name, - "state": StatePdusTable.table_name, - "pdus": PdusTable.table_name, - } - - txn.execute( - current_query, - (context, pdu_type, state_key) - ) - - row = txn.fetchone() - - result = PduEntry(*row) if row else None - - if not result: - logger.debug("_get_current_interaction not found") - else: - logger.debug( - "_get_current_interaction found %s %s", - result.pdu_id, result.origin - ) - - return result - - def handle_new_state(self, new_pdu): - """Actually perform conflict resolution on the new_pdu on the - assumption we have all the pdus required to perform it. - - Args: - new_pdu - - Returns: - bool: True if the new_pdu clobbered the current state, False if not - """ - return self.runInteraction( - "handle_new_state", - self._handle_new_state, new_pdu - ) - - def _handle_new_state(self, txn, new_pdu): - logger.debug( - "handle_new_state %s %s", - new_pdu.pdu_id, new_pdu.origin - ) - - current = self._get_current_interaction( - txn, - new_pdu.context, new_pdu.pdu_type, new_pdu.state_key - ) - - is_current = False - - if (not current or not current.prev_state_id - or not current.prev_state_origin): - # Oh, we don't have any state for this yet. - is_current = True - elif (current.pdu_id == new_pdu.prev_state_id - and current.origin == new_pdu.prev_state_origin): - # Oh! A direct clobber. Just do it. - is_current = True - else: - ## - # Ok, now loop through until we get to a common ancestor. - max_new = int(new_pdu.power_level) - max_current = int(current.power_level) - - enum_branches = self._enumerate_state_branches( - txn, new_pdu, current - ) - for branch, prev_state, state in enum_branches: - if not state: - raise RuntimeError( - "Could not find state_pdu %s %s" % - ( - prev_state.prev_state_id, - prev_state.prev_state_origin - ) - ) - - if branch == 0: - max_new = max(int(state.depth), max_new) - else: - max_current = max(int(state.depth), max_current) - - is_current = max_new > max_current - - if is_current: - logger.debug("handle_new_state make current") - - # Right, this is a new thing, so woo, just insert it. - txn.execute( - "INSERT OR REPLACE INTO %(curr)s (%(fields)s) VALUES (%(qs)s)" - % { - "curr": CurrentStateTable.table_name, - "fields": CurrentStateTable.get_fields_string(), - "qs": ", ".join(["?"] * len(CurrentStateTable.fields)) - }, - CurrentStateTable.EntryType( - *(new_pdu.__dict__[k] for k in CurrentStateTable.fields) - ) - ) - else: - logger.debug("handle_new_state not current") - - logger.debug("handle_new_state done") - - return is_current - - @log_function - def _enumerate_state_branches(self, txn, pdu_a, pdu_b): - branch_a = pdu_a - branch_b = pdu_b - - while True: - if (branch_a.pdu_id == branch_b.pdu_id - and branch_a.origin == branch_b.origin): - # Woo! We found a common ancestor - logger.debug("_enumerate_state_branches Found common ancestor") - break - - do_branch_a = ( - hasattr(branch_a, "prev_state_id") and - branch_a.prev_state_id - ) - - do_branch_b = ( - hasattr(branch_b, "prev_state_id") and - branch_b.prev_state_id - ) - - logger.debug( - "do_branch_a=%s, do_branch_b=%s", - do_branch_a, do_branch_b - ) - - if do_branch_a and do_branch_b: - do_branch_a = int(branch_a.depth) > int(branch_b.depth) - - if do_branch_a: - pdu_tuple = PduIdTuple( - branch_a.prev_state_id, - branch_a.prev_state_origin - ) - - prev_branch = branch_a - - logger.debug("getting branch_a prev %s", pdu_tuple) - branch_a = self._get_pdu_tuple(txn, *pdu_tuple) - if branch_a: - branch_a = Pdu.from_pdu_tuple(branch_a) - - logger.debug("branch_a=%s", branch_a) - - yield (0, prev_branch, branch_a) - - if not branch_a: - break - elif do_branch_b: - pdu_tuple = PduIdTuple( - branch_b.prev_state_id, - branch_b.prev_state_origin - ) - - prev_branch = branch_b - - logger.debug("getting branch_b prev %s", pdu_tuple) - branch_b = self._get_pdu_tuple(txn, *pdu_tuple) - if branch_b: - branch_b = Pdu.from_pdu_tuple(branch_b) - - logger.debug("branch_b=%s", branch_b) - - yield (1, prev_branch, branch_b) - - if not branch_b: - break - else: - break - - -class PdusTable(Table): - table_name = "pdus" - - fields = [ - "pdu_id", - "origin", - "context", - "pdu_type", - "ts", - "depth", - "is_state", - "content_json", - "unrecognized_keys", - "outlier", - "have_processed", - ] - - EntryType = namedtuple("PdusEntry", fields) - - -class PduDestinationsTable(Table): - table_name = "pdu_destinations" - - fields = [ - "pdu_id", - "origin", - "destination", - "delivered_ts", - ] - - EntryType = namedtuple("PduDestinationsEntry", fields) - - -class PduEdgesTable(Table): - table_name = "pdu_edges" - - fields = [ - "pdu_id", - "origin", - "prev_pdu_id", - "prev_origin", - "context" - ] - - EntryType = namedtuple("PduEdgesEntry", fields) - - -class PduForwardExtremitiesTable(Table): - table_name = "pdu_forward_extremities" - - fields = [ - "pdu_id", - "origin", - "context", - ] - - EntryType = namedtuple("PduForwardExtremitiesEntry", fields) - - -class PduBackwardExtremitiesTable(Table): - table_name = "pdu_backward_extremities" - - fields = [ - "pdu_id", - "origin", - "context", - ] - - EntryType = namedtuple("PduBackwardExtremitiesEntry", fields) - - -class ContextDepthTable(Table): - table_name = "context_depth" - - fields = [ - "context", - "min_depth", - ] - - EntryType = namedtuple("ContextDepthEntry", fields) - - -class StatePdusTable(Table): - table_name = "state_pdus" - - fields = [ - "pdu_id", - "origin", - "context", - "pdu_type", - "state_key", - "power_level", - "prev_state_id", - "prev_state_origin", - ] - - EntryType = namedtuple("StatePdusEntry", fields) - - -class CurrentStateTable(Table): - table_name = "current_state" - - fields = [ - "pdu_id", - "origin", - "context", - "pdu_type", - "state_key", - ] - - EntryType = namedtuple("CurrentStateEntry", fields) - -_pdu_state_joiner = JoinHelper(PdusTable, StatePdusTable) - - -# TODO: These should probably be put somewhere more sensible -PduIdTuple = namedtuple("PduIdTuple", ("pdu_id", "origin")) - -PduEntry = _pdu_state_joiner.EntryType -""" We are always interested in the join of the PdusTable and StatePdusTable, -rather than just the PdusTable. - -This does not include a prev_pdus key. -""" - -PduTuple = namedtuple( - "PduTuple", - ("pdu_entry", "prev_pdu_list", "hashes", "signatures", "edge_hashes") -) -""" This is a tuple of a `PduEntry` and a list of `PduIdTuple` that represent -the `prev_pdus` key of a PDU. -""" diff --git a/synapse/storage/schema/pdu.sql b/synapse/storage/schema/pdu.sql deleted file mode 100644 index 16e111a56..000000000 --- a/synapse/storage/schema/pdu.sql +++ /dev/null @@ -1,106 +0,0 @@ -/* Copyright 2014 OpenMarket Ltd - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ --- Stores pdus and their content -CREATE TABLE IF NOT EXISTS pdus( - pdu_id TEXT, - origin TEXT, - context TEXT, - pdu_type TEXT, - ts INTEGER, - depth INTEGER DEFAULT 0 NOT NULL, - is_state BOOL, - content_json TEXT, - unrecognized_keys TEXT, - outlier BOOL NOT NULL, - have_processed BOOL, - CONSTRAINT pdu_id_origin UNIQUE (pdu_id, origin) -); - --- Stores what the current state pdu is for a given (context, pdu_type, key) tuple -CREATE TABLE IF NOT EXISTS state_pdus( - pdu_id TEXT, - origin TEXT, - context TEXT, - pdu_type TEXT, - state_key TEXT, - power_level TEXT, - prev_state_id TEXT, - prev_state_origin TEXT, - CONSTRAINT pdu_id_origin UNIQUE (pdu_id, origin) - CONSTRAINT prev_pdu_id_origin UNIQUE (prev_state_id, prev_state_origin) -); - -CREATE TABLE IF NOT EXISTS current_state( - pdu_id TEXT, - origin TEXT, - context TEXT, - pdu_type TEXT, - state_key TEXT, - CONSTRAINT pdu_id_origin UNIQUE (pdu_id, origin) - CONSTRAINT uniqueness UNIQUE (context, pdu_type, state_key) ON CONFLICT REPLACE -); - --- Stores where each pdu we want to send should be sent and the delivery status. -create TABLE IF NOT EXISTS pdu_destinations( - pdu_id TEXT, - origin TEXT, - destination TEXT, - delivered_ts INTEGER DEFAULT 0, -- or 0 if not delivered - CONSTRAINT uniqueness UNIQUE (pdu_id, origin, destination) ON CONFLICT REPLACE -); - -CREATE TABLE IF NOT EXISTS pdu_forward_extremities( - pdu_id TEXT, - origin TEXT, - context TEXT, - CONSTRAINT uniqueness UNIQUE (pdu_id, origin, context) ON CONFLICT REPLACE -); - -CREATE TABLE IF NOT EXISTS pdu_backward_extremities( - pdu_id TEXT, - origin TEXT, - context TEXT, - CONSTRAINT uniqueness UNIQUE (pdu_id, origin, context) ON CONFLICT REPLACE -); - -CREATE TABLE IF NOT EXISTS pdu_edges( - pdu_id TEXT, - origin TEXT, - prev_pdu_id TEXT, - prev_origin TEXT, - context TEXT, - CONSTRAINT uniqueness UNIQUE (pdu_id, origin, prev_pdu_id, prev_origin, context) -); - -CREATE TABLE IF NOT EXISTS context_depth( - context TEXT, - min_depth INTEGER, - CONSTRAINT uniqueness UNIQUE (context) -); - -CREATE INDEX IF NOT EXISTS context_depth_context ON context_depth(context); - - -CREATE INDEX IF NOT EXISTS pdu_id ON pdus(pdu_id, origin); - -CREATE INDEX IF NOT EXISTS dests_id ON pdu_destinations (pdu_id, origin); --- CREATE INDEX IF NOT EXISTS dests ON pdu_destinations (destination); - -CREATE INDEX IF NOT EXISTS pdu_extrem_context ON pdu_forward_extremities(context); -CREATE INDEX IF NOT EXISTS pdu_extrem_id ON pdu_forward_extremities(pdu_id, origin); - -CREATE INDEX IF NOT EXISTS pdu_edges_id ON pdu_edges(pdu_id, origin); - -CREATE INDEX IF NOT EXISTS pdu_b_extrem_context ON pdu_backward_extremities(context); diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index 6624348fd..ea6790078 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -14,7 +14,6 @@ # limitations under the License. from ._base import SQLBaseStore, Table -from .pdu import PdusTable from collections import namedtuple @@ -207,50 +206,6 @@ class TransactionStore(SQLBaseStore): return ReceivedTransactionsTable.decode_results(txn.fetchall()) - def get_pdus_after_transaction(self, transaction_id, destination): - """For a given local transaction_id that we sent to a given destination - home server, return a list of PDUs that were sent to that destination - after it. - - Args: - txn - transaction_id (str) - destination (str) - - Returns - list: A list of PduTuple - """ - return self.runInteraction( - "get_pdus_after_transaction", - self._get_pdus_after_transaction, - transaction_id, destination - ) - - def _get_pdus_after_transaction(self, txn, transaction_id, destination): - - # Query that first get's all transaction_ids with an id greater than - # the one given from the `sent_transactions` table. Then JOIN on this - # from the `tx->pdu` table to get a list of (pdu_id, origin) that - # specify the pdus that were sent in those transactions. - query = ( - "SELECT pdu_id, pdu_origin FROM %(tx_pdu)s as tp " - "INNER JOIN %(sent_tx)s as st " - "ON tp.transaction_id = st.transaction_id " - "AND tp.destination = st.destination " - "WHERE st.id > (" - "SELECT id FROM %(sent_tx)s " - "WHERE transaction_id = ? AND destination = ?" - ) % { - "tx_pdu": TransactionsToPduTable.table_name, - "sent_tx": SentTransactions.table_name, - } - - txn.execute(query, (transaction_id, destination)) - - pdus = PdusTable.decode_results(txn.fetchall()) - - return self._get_pdu_tuples(txn, pdus) - class ReceivedTransactionsTable(Table): table_name = "received_transactions"