# -*- coding: utf-8 -*-
# Copyright 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging

from six import iteritems

from canonicaljson import json

from twisted.internet import defer

from synapse.api.errors import StoreError
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage._base import Cache, SQLBaseStore, db_to_json
from synapse.storage.background_updates import BackgroundUpdateStore
from synapse.util import batch_iter
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList

logger = logging.getLogger(__name__)

DROP_DEVICE_LIST_STREAMS_NON_UNIQUE_INDEXES = (
    "drop_device_list_streams_non_unique_indexes"
)


class DeviceWorkerStore(SQLBaseStore):
    def get_device(self, user_id, device_id):
        """Retrieve a device.

        Args:
            user_id (str): The ID of the user which owns the device
            device_id (str): The ID of the device to retrieve
        Returns:
            defer.Deferred for a dict containing the device information
        Raises:
            StoreError: if the device is not found
        """
        return self._simple_select_one(
            table="devices",
            keyvalues={"user_id": user_id, "device_id": device_id},
            retcols=("user_id", "device_id", "display_name"),
            desc="get_device",
        )

    @defer.inlineCallbacks
    def get_devices_by_user(self, user_id):
        """Retrieve all of a user's registered devices.

        Args:
            user_id (str):
        Returns:
            defer.Deferred: resolves to a dict from device_id to a dict
            containing "device_id", "user_id" and "display_name" for each
            device.
        """
        devices = yield self._simple_select_list(
            table="devices",
            keyvalues={"user_id": user_id},
            retcols=("user_id", "device_id", "display_name"),
            desc="get_devices_by_user",
        )

        return {d["device_id"]: d for d in devices}

    @defer.inlineCallbacks
    def get_devices_by_remote(self, destination, from_stream_id, limit):
        """Get stream of updates to send to remote servers

        Returns:
            Deferred[tuple[int, list[dict]]]:
                current stream id (ie, the stream id of the last update included in the
                response), and the list of updates
        """
        now_stream_id = self._device_list_id_gen.get_current_token()

        has_changed = self._device_list_federation_stream_cache.has_entity_changed(
            destination, int(from_stream_id)
        )
        if not has_changed:
            return (now_stream_id, [])

        # We retrieve n+1 devices from the list of outbound pokes where n is
        # our outbound device update limit. We then check if the very last
        # device has the same stream_id as the second-to-last device. If so,
        # then we ignore all devices with that stream_id and only send the
        # devices with a lower stream_id.
        #
        # If when culling the list we end up with no devices afterwards, we
        # consider the device update to be too large, and simply skip the
        # stream_id; the rationale being that such a large device list update
        # is likely an error.
        updates = yield self.runInteraction(
            "get_devices_by_remote",
            self._get_devices_by_remote_txn,
            destination,
            from_stream_id,
            now_stream_id,
            limit + 1,
        )

        # Return an empty list if there are no updates
        if not updates:
            return (now_stream_id, [])

        # if we have exceeded the limit, we need to exclude any results with the
        # same stream_id as the last row.
        if len(updates) > limit:
            stream_id_cutoff = updates[-1][2]
            now_stream_id = stream_id_cutoff - 1
        else:
            stream_id_cutoff = None

        # Perform the equivalent of a GROUP BY
        #
        # Iterate through the updates list and copy non-duplicate
        # (user_id, device_id) entries into a map, with the value being
        # the max stream_id across each set of duplicate entries
        #
        # maps (user_id, device_id) -> stream_id
        # as long as their stream_id does not match that of the last row
        query_map = {}
        for update in updates:
            if stream_id_cutoff is not None and update[2] >= stream_id_cutoff:
                # Stop processing updates
                break

            key = (update[0], update[1])
            query_map[key] = max(query_map.get(key, 0), update[2])

        # If we didn't find any updates with a stream_id lower than the cutoff, it
        # means that there are more than limit updates all of which have the same
        # steam_id.

        # That should only happen if a client is spamming the server with new
        # devices, in which case E2E isn't going to work well anyway. We'll just
        # skip that stream_id and return an empty list, and continue with the next
        # stream_id next time.
        if not query_map:
            return (stream_id_cutoff, [])

        results = yield self._get_device_update_edus_by_remote(
            destination, from_stream_id, query_map
        )

        return (now_stream_id, results)

    def _get_devices_by_remote_txn(
        self, txn, destination, from_stream_id, now_stream_id, limit
    ):
        """Return device update information for a given remote destination

        Args:
            txn (LoggingTransaction): The transaction to execute
            destination (str): The host the device updates are intended for
            from_stream_id (int): The minimum stream_id to filter updates by, exclusive
            now_stream_id (int): The maximum stream_id to filter updates by, inclusive
            limit (int): Maximum number of device updates to return

        Returns:
            List: List of device updates
        """
        sql = """
            SELECT user_id, device_id, stream_id FROM device_lists_outbound_pokes
            WHERE destination = ? AND ? < stream_id AND stream_id <= ? AND sent = ?
            ORDER BY stream_id
            LIMIT ?
        """
        txn.execute(sql, (destination, from_stream_id, now_stream_id, False, limit))

        return list(txn)

    @defer.inlineCallbacks
    def _get_device_update_edus_by_remote(self, destination, from_stream_id, query_map):
        """Returns a list of device update EDUs as well as E2EE keys

        Args:
            destination (str): The host the device updates are intended for
            from_stream_id (int): The minimum stream_id to filter updates by, exclusive
            query_map (Dict[(str, str): int]): Dictionary mapping
                user_id/device_id to update stream_id

        Returns:
            List[Dict]: List of objects representing an device update EDU

        """
        devices = yield self.runInteraction(
            "_get_e2e_device_keys_txn",
            self._get_e2e_device_keys_txn,
            query_map.keys(),
            include_all_devices=True,
            include_deleted_devices=True,
        )

        results = []
        for user_id, user_devices in iteritems(devices):
            # The prev_id for the first row is always the last row before
            # `from_stream_id`
            prev_id = yield self._get_last_device_update_for_remote_user(
                destination, user_id, from_stream_id
            )
            for device_id, device in iteritems(user_devices):
                stream_id = query_map[(user_id, device_id)]
                result = {
                    "user_id": user_id,
                    "device_id": device_id,
                    "prev_id": [prev_id] if prev_id else [],
                    "stream_id": stream_id,
                }

                prev_id = stream_id

                if device is not None:
                    key_json = device.get("key_json", None)
                    if key_json:
                        result["keys"] = db_to_json(key_json)
                    device_display_name = device.get("device_display_name", None)
                    if device_display_name:
                        result["device_display_name"] = device_display_name
                else:
                    result["deleted"] = True

                results.append(result)

        return results

    def _get_last_device_update_for_remote_user(
        self, destination, user_id, from_stream_id
    ):
        def f(txn):
            prev_sent_id_sql = """
                SELECT coalesce(max(stream_id), 0) as stream_id
                FROM device_lists_outbound_last_success
                WHERE destination = ? AND user_id = ? AND stream_id <= ?
            """
            txn.execute(prev_sent_id_sql, (destination, user_id, from_stream_id))
            rows = txn.fetchall()
            return rows[0][0]

        return self.runInteraction("get_last_device_update_for_remote_user", f)

    def mark_as_sent_devices_by_remote(self, destination, stream_id):
        """Mark that updates have successfully been sent to the destination.
        """
        return self.runInteraction(
            "mark_as_sent_devices_by_remote",
            self._mark_as_sent_devices_by_remote_txn,
            destination,
            stream_id,
        )

    def _mark_as_sent_devices_by_remote_txn(self, txn, destination, stream_id):
        # We update the device_lists_outbound_last_success with the successfully
        # poked users. We do the join to see which users need to be inserted and
        # which updated.
        sql = """
            SELECT user_id, coalesce(max(o.stream_id), 0), (max(s.stream_id) IS NOT NULL)
            FROM device_lists_outbound_pokes as o
            LEFT JOIN device_lists_outbound_last_success as s
                USING (destination, user_id)
            WHERE destination = ? AND o.stream_id <= ?
            GROUP BY user_id
        """
        txn.execute(sql, (destination, stream_id))
        rows = txn.fetchall()

        sql = """
            UPDATE device_lists_outbound_last_success
            SET stream_id = ?
            WHERE destination = ? AND user_id = ?
        """
        txn.executemany(sql, ((row[1], destination, row[0]) for row in rows if row[2]))

        sql = """
            INSERT INTO device_lists_outbound_last_success
            (destination, user_id, stream_id) VALUES (?, ?, ?)
        """
        txn.executemany(
            sql, ((destination, row[0], row[1]) for row in rows if not row[2])
        )

        # Delete all sent outbound pokes
        sql = """
            DELETE FROM device_lists_outbound_pokes
            WHERE destination = ? AND stream_id <= ?
        """
        txn.execute(sql, (destination, stream_id))

    def get_device_stream_token(self):
        return self._device_list_id_gen.get_current_token()

    @defer.inlineCallbacks
    def get_user_devices_from_cache(self, query_list):
        """Get the devices (and keys if any) for remote users from the cache.

        Args:
            query_list(list): List of (user_id, device_ids), if device_ids is
                falsey then return all device ids for that user.

        Returns:
            (user_ids_not_in_cache, results_map), where user_ids_not_in_cache is
            a set of user_ids and results_map is a mapping of
            user_id -> device_id -> device_info
        """
        user_ids = set(user_id for user_id, _ in query_list)
        user_map = yield self.get_device_list_last_stream_id_for_remotes(list(user_ids))
        user_ids_in_cache = set(
            user_id for user_id, stream_id in user_map.items() if stream_id
        )
        user_ids_not_in_cache = user_ids - user_ids_in_cache

        results = {}
        for user_id, device_id in query_list:
            if user_id not in user_ids_in_cache:
                continue

            if device_id:
                device = yield self._get_cached_user_device(user_id, device_id)
                results.setdefault(user_id, {})[device_id] = device
            else:
                results[user_id] = yield self._get_cached_devices_for_user(user_id)

        return (user_ids_not_in_cache, results)

    @cachedInlineCallbacks(num_args=2, tree=True)
    def _get_cached_user_device(self, user_id, device_id):
        content = yield self._simple_select_one_onecol(
            table="device_lists_remote_cache",
            keyvalues={"user_id": user_id, "device_id": device_id},
            retcol="content",
            desc="_get_cached_user_device",
        )
        return db_to_json(content)

    @cachedInlineCallbacks()
    def _get_cached_devices_for_user(self, user_id):
        devices = yield self._simple_select_list(
            table="device_lists_remote_cache",
            keyvalues={"user_id": user_id},
            retcols=("device_id", "content"),
            desc="_get_cached_devices_for_user",
        )
        return {
            device["device_id"]: db_to_json(device["content"]) for device in devices
        }

    def get_devices_with_keys_by_user(self, user_id):
        """Get all devices (with any device keys) for a user

        Returns:
            (stream_id, devices)
        """
        return self.runInteraction(
            "get_devices_with_keys_by_user",
            self._get_devices_with_keys_by_user_txn,
            user_id,
        )

    def _get_devices_with_keys_by_user_txn(self, txn, user_id):
        now_stream_id = self._device_list_id_gen.get_current_token()

        devices = self._get_e2e_device_keys_txn(
            txn, [(user_id, None)], include_all_devices=True
        )

        if devices:
            user_devices = devices[user_id]
            results = []
            for device_id, device in iteritems(user_devices):
                result = {"device_id": device_id}

                key_json = device.get("key_json", None)
                if key_json:
                    result["keys"] = db_to_json(key_json)
                device_display_name = device.get("device_display_name", None)
                if device_display_name:
                    result["device_display_name"] = device_display_name

                results.append(result)

            return now_stream_id, results

        return now_stream_id, []

    def get_users_whose_devices_changed(self, from_key, user_ids):
        """Get set of users whose devices have changed since `from_key` that
        are in the given list of user_ids.

        Args:
            from_key (str): The device lists stream token
            user_ids (Iterable[str])

        Returns:
            Deferred[set[str]]: The set of user_ids whose devices have changed
            since `from_key`
        """
        from_key = int(from_key)

        # Get set of users who *may* have changed. Users not in the returned
        # list have definitely not changed.
        to_check = list(
            self._device_list_stream_cache.get_entities_changed(user_ids, from_key)
        )

        if not to_check:
            return defer.succeed(set())

        def _get_users_whose_devices_changed_txn(txn):
            changes = set()

            sql = """
                SELECT DISTINCT user_id FROM device_lists_stream
                WHERE stream_id > ?
                AND user_id IN (%s)
            """

            for chunk in batch_iter(to_check, 100):
                txn.execute(sql % (",".join("?" for _ in chunk),), (from_key,) + chunk)
                changes.update(user_id for user_id, in txn)

            return changes

        return self.runInteraction(
            "get_users_whose_devices_changed", _get_users_whose_devices_changed_txn
        )

    def get_all_device_list_changes_for_remotes(self, from_key, to_key):
        """Return a list of `(stream_id, user_id, destination)` which is the
        combined list of changes to devices, and which destinations need to be
        poked. `destination` may be None if no destinations need to be poked.
        """
        # We do a group by here as there can be a large number of duplicate
        # entries, since we throw away device IDs.
        sql = """
            SELECT MAX(stream_id) AS stream_id, user_id, destination
            FROM device_lists_stream
            LEFT JOIN device_lists_outbound_pokes USING (stream_id, user_id, device_id)
            WHERE ? < stream_id AND stream_id <= ?
            GROUP BY user_id, destination
        """
        return self._execute(
            "get_all_device_list_changes_for_remotes", None, sql, from_key, to_key
        )

    @cached(max_entries=10000)
    def get_device_list_last_stream_id_for_remote(self, user_id):
        """Get the last stream_id we got for a user. May be None if we haven't
        got any information for them.
        """
        return self._simple_select_one_onecol(
            table="device_lists_remote_extremeties",
            keyvalues={"user_id": user_id},
            retcol="stream_id",
            desc="get_device_list_last_stream_id_for_remote",
            allow_none=True,
        )

    @cachedList(
        cached_method_name="get_device_list_last_stream_id_for_remote",
        list_name="user_ids",
        inlineCallbacks=True,
    )
    def get_device_list_last_stream_id_for_remotes(self, user_ids):
        rows = yield self._simple_select_many_batch(
            table="device_lists_remote_extremeties",
            column="user_id",
            iterable=user_ids,
            retcols=("user_id", "stream_id"),
            desc="get_device_list_last_stream_id_for_remotes",
        )

        results = {user_id: None for user_id in user_ids}
        results.update({row["user_id"]: row["stream_id"] for row in rows})

        return results


class DeviceStore(DeviceWorkerStore, BackgroundUpdateStore):
    def __init__(self, db_conn, hs):
        super(DeviceStore, self).__init__(db_conn, hs)

        # Map of (user_id, device_id) -> bool. If there is an entry that implies
        # the device exists.
        self.device_id_exists_cache = Cache(
            name="device_id_exists", keylen=2, max_entries=10000
        )

        self._clock.looping_call(self._prune_old_outbound_device_pokes, 60 * 60 * 1000)

        self.register_background_index_update(
            "device_lists_stream_idx",
            index_name="device_lists_stream_user_id",
            table="device_lists_stream",
            columns=["user_id", "device_id"],
        )

        # create a unique index on device_lists_remote_cache
        self.register_background_index_update(
            "device_lists_remote_cache_unique_idx",
            index_name="device_lists_remote_cache_unique_id",
            table="device_lists_remote_cache",
            columns=["user_id", "device_id"],
            unique=True,
        )

        # And one on device_lists_remote_extremeties
        self.register_background_index_update(
            "device_lists_remote_extremeties_unique_idx",
            index_name="device_lists_remote_extremeties_unique_idx",
            table="device_lists_remote_extremeties",
            columns=["user_id"],
            unique=True,
        )

        # once they complete, we can remove the old non-unique indexes.
        self.register_background_update_handler(
            DROP_DEVICE_LIST_STREAMS_NON_UNIQUE_INDEXES,
            self._drop_device_list_streams_non_unique_indexes,
        )

    @defer.inlineCallbacks
    def store_device(self, user_id, device_id, initial_device_display_name):
        """Ensure the given device is known; add it to the store if not

        Args:
            user_id (str): id of user associated with the device
            device_id (str): id of device
            initial_device_display_name (str): initial displayname of the
               device. Ignored if device exists.
        Returns:
            defer.Deferred: boolean whether the device was inserted or an
                existing device existed with that ID.
        """
        key = (user_id, device_id)
        if self.device_id_exists_cache.get(key, None):
            return False

        try:
            inserted = yield self._simple_insert(
                "devices",
                values={
                    "user_id": user_id,
                    "device_id": device_id,
                    "display_name": initial_device_display_name,
                },
                desc="store_device",
                or_ignore=True,
            )
            self.device_id_exists_cache.prefill(key, True)
            return inserted
        except Exception as e:
            logger.error(
                "store_device with device_id=%s(%r) user_id=%s(%r)"
                " display_name=%s(%r) failed: %s",
                type(device_id).__name__,
                device_id,
                type(user_id).__name__,
                user_id,
                type(initial_device_display_name).__name__,
                initial_device_display_name,
                e,
            )
            raise StoreError(500, "Problem storing device.")

    @defer.inlineCallbacks
    def delete_device(self, user_id, device_id):
        """Delete a device.

        Args:
            user_id (str): The ID of the user which owns the device
            device_id (str): The ID of the device to delete
        Returns:
            defer.Deferred
        """
        yield self._simple_delete_one(
            table="devices",
            keyvalues={"user_id": user_id, "device_id": device_id},
            desc="delete_device",
        )

        self.device_id_exists_cache.invalidate((user_id, device_id))

    @defer.inlineCallbacks
    def delete_devices(self, user_id, device_ids):
        """Deletes several devices.

        Args:
            user_id (str): The ID of the user which owns the devices
            device_ids (list): The IDs of the devices to delete
        Returns:
            defer.Deferred
        """
        yield self._simple_delete_many(
            table="devices",
            column="device_id",
            iterable=device_ids,
            keyvalues={"user_id": user_id},
            desc="delete_devices",
        )
        for device_id in device_ids:
            self.device_id_exists_cache.invalidate((user_id, device_id))

    def update_device(self, user_id, device_id, new_display_name=None):
        """Update a device.

        Args:
            user_id (str): The ID of the user which owns the device
            device_id (str): The ID of the device to update
            new_display_name (str|None): new displayname for device; None
               to leave unchanged
        Raises:
            StoreError: if the device is not found
        Returns:
            defer.Deferred
        """
        updates = {}
        if new_display_name is not None:
            updates["display_name"] = new_display_name
        if not updates:
            return defer.succeed(None)
        return self._simple_update_one(
            table="devices",
            keyvalues={"user_id": user_id, "device_id": device_id},
            updatevalues=updates,
            desc="update_device",
        )

    @defer.inlineCallbacks
    def mark_remote_user_device_list_as_unsubscribed(self, user_id):
        """Mark that we no longer track device lists for remote user.
        """
        yield self._simple_delete(
            table="device_lists_remote_extremeties",
            keyvalues={"user_id": user_id},
            desc="mark_remote_user_device_list_as_unsubscribed",
        )
        self.get_device_list_last_stream_id_for_remote.invalidate((user_id,))

    def update_remote_device_list_cache_entry(
        self, user_id, device_id, content, stream_id
    ):
        """Updates a single device in the cache of a remote user's devicelist.

        Note: assumes that we are the only thread that can be updating this user's
        device list.

        Args:
            user_id (str): User to update device list for
            device_id (str): ID of decivice being updated
            content (dict): new data on this device
            stream_id (int): the version of the device list

        Returns:
            Deferred[None]
        """
        return self.runInteraction(
            "update_remote_device_list_cache_entry",
            self._update_remote_device_list_cache_entry_txn,
            user_id,
            device_id,
            content,
            stream_id,
        )

    def _update_remote_device_list_cache_entry_txn(
        self, txn, user_id, device_id, content, stream_id
    ):
        if content.get("deleted"):
            self._simple_delete_txn(
                txn,
                table="device_lists_remote_cache",
                keyvalues={"user_id": user_id, "device_id": device_id},
            )

            txn.call_after(self.device_id_exists_cache.invalidate, (user_id, device_id))
        else:
            self._simple_upsert_txn(
                txn,
                table="device_lists_remote_cache",
                keyvalues={"user_id": user_id, "device_id": device_id},
                values={"content": json.dumps(content)},
                # we don't need to lock, because we assume we are the only thread
                # updating this user's devices.
                lock=False,
            )

        txn.call_after(self._get_cached_user_device.invalidate, (user_id, device_id))
        txn.call_after(self._get_cached_devices_for_user.invalidate, (user_id,))
        txn.call_after(
            self.get_device_list_last_stream_id_for_remote.invalidate, (user_id,)
        )

        self._simple_upsert_txn(
            txn,
            table="device_lists_remote_extremeties",
            keyvalues={"user_id": user_id},
            values={"stream_id": stream_id},
            # again, we can assume we are the only thread updating this user's
            # extremity.
            lock=False,
        )

    def update_remote_device_list_cache(self, user_id, devices, stream_id):
        """Replace the entire cache of the remote user's devices.

        Note: assumes that we are the only thread that can be updating this user's
        device list.

        Args:
            user_id (str): User to update device list for
            devices (list[dict]): list of device objects supplied over federation
            stream_id (int): the version of the device list

        Returns:
            Deferred[None]
        """
        return self.runInteraction(
            "update_remote_device_list_cache",
            self._update_remote_device_list_cache_txn,
            user_id,
            devices,
            stream_id,
        )

    def _update_remote_device_list_cache_txn(self, txn, user_id, devices, stream_id):
        self._simple_delete_txn(
            txn, table="device_lists_remote_cache", keyvalues={"user_id": user_id}
        )

        self._simple_insert_many_txn(
            txn,
            table="device_lists_remote_cache",
            values=[
                {
                    "user_id": user_id,
                    "device_id": content["device_id"],
                    "content": json.dumps(content),
                }
                for content in devices
            ],
        )

        txn.call_after(self._get_cached_devices_for_user.invalidate, (user_id,))
        txn.call_after(self._get_cached_user_device.invalidate_many, (user_id,))
        txn.call_after(
            self.get_device_list_last_stream_id_for_remote.invalidate, (user_id,)
        )

        self._simple_upsert_txn(
            txn,
            table="device_lists_remote_extremeties",
            keyvalues={"user_id": user_id},
            values={"stream_id": stream_id},
            # we don't need to lock, because we can assume we are the only thread
            # updating this user's extremity.
            lock=False,
        )

    @defer.inlineCallbacks
    def add_device_change_to_streams(self, user_id, device_ids, hosts):
        """Persist that a user's devices have been updated, and which hosts
        (if any) should be poked.
        """
        with self._device_list_id_gen.get_next() as stream_id:
            yield self.runInteraction(
                "add_device_change_to_streams",
                self._add_device_change_txn,
                user_id,
                device_ids,
                hosts,
                stream_id,
            )
        return stream_id

    def _add_device_change_txn(self, txn, user_id, device_ids, hosts, stream_id):
        now = self._clock.time_msec()

        txn.call_after(
            self._device_list_stream_cache.entity_has_changed, user_id, stream_id
        )
        for host in hosts:
            txn.call_after(
                self._device_list_federation_stream_cache.entity_has_changed,
                host,
                stream_id,
            )

        # Delete older entries in the table, as we really only care about
        # when the latest change happened.
        txn.executemany(
            """
            DELETE FROM device_lists_stream
            WHERE user_id = ? AND device_id = ? AND stream_id < ?
            """,
            [(user_id, device_id, stream_id) for device_id in device_ids],
        )

        self._simple_insert_many_txn(
            txn,
            table="device_lists_stream",
            values=[
                {"stream_id": stream_id, "user_id": user_id, "device_id": device_id}
                for device_id in device_ids
            ],
        )

        self._simple_insert_many_txn(
            txn,
            table="device_lists_outbound_pokes",
            values=[
                {
                    "destination": destination,
                    "stream_id": stream_id,
                    "user_id": user_id,
                    "device_id": device_id,
                    "sent": False,
                    "ts": now,
                }
                for destination in hosts
                for device_id in device_ids
            ],
        )

    def _prune_old_outbound_device_pokes(self):
        """Delete old entries out of the device_lists_outbound_pokes to ensure
        that we don't fill up due to dead servers. We keep one entry per
        (destination, user_id) tuple to ensure that the prev_ids remain correct
        if the server does come back.
        """
        yesterday = self._clock.time_msec() - 24 * 60 * 60 * 1000

        def _prune_txn(txn):
            select_sql = """
                SELECT destination, user_id, max(stream_id) as stream_id
                FROM device_lists_outbound_pokes
                GROUP BY destination, user_id
                HAVING min(ts) < ? AND count(*) > 1
            """

            txn.execute(select_sql, (yesterday,))
            rows = txn.fetchall()

            if not rows:
                return

            delete_sql = """
                DELETE FROM device_lists_outbound_pokes
                WHERE ts < ? AND destination = ? AND user_id = ? AND stream_id < ?
            """

            txn.executemany(
                delete_sql, ((yesterday, row[0], row[1], row[2]) for row in rows)
            )

            # Since we've deleted unsent deltas, we need to remove the entry
            # of last successful sent so that the prev_ids are correctly set.
            sql = """
                DELETE FROM device_lists_outbound_last_success
                WHERE destination = ? AND user_id = ?
            """
            txn.executemany(sql, ((row[0], row[1]) for row in rows))

            logger.info("Pruned %d device list outbound pokes", txn.rowcount)

        return run_as_background_process(
            "prune_old_outbound_device_pokes",
            self.runInteraction,
            "_prune_old_outbound_device_pokes",
            _prune_txn,
        )

    @defer.inlineCallbacks
    def _drop_device_list_streams_non_unique_indexes(self, progress, batch_size):
        def f(conn):
            txn = conn.cursor()
            txn.execute("DROP INDEX IF EXISTS device_lists_remote_cache_id")
            txn.execute("DROP INDEX IF EXISTS device_lists_remote_extremeties_id")
            txn.close()

        yield self.runWithConnection(f)
        yield self._end_background_update(DROP_DEVICE_LIST_STREAMS_NON_UNIQUE_INDEXES)
        return 1