# -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from blist import sorteddict class FederationRemoteSendQueue(object): def __init__(self, hs): self.clock = hs.get_clock() # TODO: Add metrics for size of lists below self.presence_map = {} self.presence_changed = sorteddict() self.keyed_edu = {} self.keyed_edu_changed = sorteddict() self.edus = sorteddict() self.failures = sorteddict() self.pos = 1 self.pos_time = sorteddict() self.clock.looping_call(self._clear_queue, 30 * 1000) def _next_pos(self): pos = self.pos self.pos += 1 self.pos_time[self.clock.time_msec()] = pos return pos def _clear_queue(self): # TODO measure this function time. FIVE_MINUTES_AGO = 5 * 60 * 1000 now = self.clock.time_msec() keys = self.pos_time.keys() time = keys.bisect_left(now - FIVE_MINUTES_AGO) if not keys[:time]: return position_to_delete = max(keys[:time]) for key in keys[:time]: del self.pos_time[key] self._clear_queue_before_pos(position_to_delete) def _clear_queue_before_pos(self, position_to_delete): # Delete things out of presence maps keys = self.presence_changed.keys() i = keys.bisect_left(position_to_delete) for key in keys[:i]: del self.presence_changed[key] user_ids = set() for _, states in self.presence_changed.values(): user_ids.update(s.user_id for s in user_ids) to_del = [user_id for user_id in self.presence_map if user_id not in user_ids] for user_id in self.to_del: del self.presence_map[user_id] # Delete things out of keyed edus keys = self.keyed_edu_changed.keys() i = keys.bisect_left(position_to_delete) for key in keys[:i]: del self.keyed_edu_changed[key] live_keys = set() for edu_key in self.keyed_edu_changed.values(): live_keys.add(edu_key) to_del = [edu_key for edu_key in self.keyed_edu if edu_key not in live_keys] for edu_key in to_del: del self.keyed_edu[edu_key] # Delete things out of edu map keys = self.edus.keys() i = keys.bisect_left(position_to_delete) for key in keys[:i]: del self.edus[key] # Delete things out of failure map keys = self.failures.keys() i = keys.bisect_left(position_to_delete) for key in keys[:i]: del self.failures[key] def send_edu(self, edu, key=None): pos = self._next_pos() if key: self.keyed_edu[(edu.destination, key)] = edu self.keyed_edu_changed[pos] = (edu.destination, key) else: self.edus[pos] = edu def send_presence(self, destination, states): pos = self._next_pos() self.presence_map.presence_map.update({ state.user_id: state for state in states }) self.presence_changed[pos] = (destination, [ state.user_id for state in states ]) def send_failure(self, failure, destination): pos = self._next_pos() self.failures[pos] = (destination, failure) def notify_new_device_message(self, destination): # TODO pass def get_replication_rows(self, token): rows = [] # Fetch changed presence keys = self.presence_changed.keys() i = keys.bisect_right(token) dest_user_ids = set((k, self.presence_changed[k]) for k in keys[i:]) for (key, (dest, user_ids)) in dest_user_ids: for user_id in user_ids: rows.append((key, dest, "p", self.presence_map[user_id])) # Fetch changes keyed edus keys = self.keyed_edu_changed.keys() i = keys.bisect_right(token) keyed_edus = set((k, self.keyed_edu_changed[k]) for k in keys[i:]) for (pos, edu_key) in keyed_edus: rows.append((pos, edu_key, "k", self.keyed_edu[edu_key])) # Fetch changed edus keys = self.edus.keys() i = keys.bisect_right(token) edus = set((k, self.edus[k]) for k in keys[i:]) for (pos, edu) in edus: rows.append((pos, edu.destination, "e", edu)) # Fetch changed failures keys = self.failures.keys() i = keys.bisect_right(token) failures = set((k, self.failures[k]) for k in keys[i:]) for (pos, (destination, failure)) in failures: rows.append((pos, destination, "f", failure)) # Sort rows based on pos rows.sort() return rows