Other parts of the code (such as the StreamChangeCache) assume that there will
not be multiple changes with the same stream id.
This code was introduced in #7024, and I hope this fixes#7206.
The get_entities_changed function was changed to return all changed
entities since the given stream position, rather than only those changed
from a given list of entities. This resulted in the function incorrectly
returning large numbers of entities that, for example, caused large
increases in database usage.
The stream cache keeps track of all entities that have changed since
a particular stream position, so get_entities_changed does not need to
return unknown entites when given a larger stream position.
This makes it consistent with the behaviour of has_entity_changed.
This line shows up as about 5% of cpu time on a synchrotron:
not_known_entities = set(entities) - set(self._entity_to_key)
Presumably the problem here is that _entity_to_key can be largeish, and
building a set for its keys every time this function is called is slow.
Here we rewrite the logic to avoid building so many sets.
This reverts commit 9fbe70a7dc.
It turns out that sortedcontainers.SortedDict is not an exact match for
blist.sorteddict; in particular, `popitem()` removes things from the opposite
end of the dict.
This is trivial to fix, but I want to add some unit tests, and potentially some
more thought about it, before we do so.
Occaisonally has_any_entity_changed would throw the error: "Set changed
size during iteration" when taking the max of the `sorteddict`. While
its uncertain how that happens, its quite inefficient to iterate over
the entire dict anyway so we change to using the more traditional
`bisect_*` functions.
... and update some docstrings to correctly reflect the types being used.
get_new_device_msgs_for_remote can return a long under some circumstances,
which was being stored in last_device_list_stream_id_by_dest, and was then
upsetting things on the next loop.
We change it so that each cache has an individual CacheMetric, instead
of having one global CacheMetric. This means that when a cache tries to
increment a counter it does not need to go through so many indirections.