Merge pull request #6358 from matrix-org/babolivier/message_retention
Implement message retention policies (MSC1763)
This commit is contained in:
commit d31f69afa0

changelog.d/5815.feature (new file)
@@ -0,0 +1 @@
Implement per-room message retention policies.
@@ -328,6 +328,69 @@ listeners:
#
#user_ips_max_age: 14d

# Message retention policy at the server level.
#
# Room admins and mods can define a retention period for their rooms using the
# 'm.room.retention' state event, and server admins can cap this period by setting
# the 'allowed_lifetime_min' and 'allowed_lifetime_max' config options.
#
# If this feature is enabled, Synapse will regularly look for and purge events
# which are older than the room's maximum retention period. Synapse will also
# filter events received over federation so that events that should have been
# purged are ignored and not stored again.
#
retention:
  # The message retention policies feature is disabled by default. Uncomment the
  # following line to enable it.
  #
  #enabled: true

  # Default retention policy. If set, Synapse will apply it to rooms that lack the
  # 'm.room.retention' state event. Currently, the value of 'min_lifetime' doesn't
  # matter much because Synapse doesn't take it into account yet.
  #
  #default_policy:
  #  min_lifetime: 1d
  #  max_lifetime: 1y

  # Retention policy limits. If set, a user won't be able to send a
  # 'm.room.retention' event which features a 'min_lifetime' or a 'max_lifetime'
  # that's not within this range. This is especially useful in closed federations,
  # in which server admins can make sure every federating server applies the same
  # rules.
  #
  #allowed_lifetime_min: 1d
  #allowed_lifetime_max: 1y

  # Server admins can define the settings of the background jobs purging the
  # events whose lifetime has expired under the 'purge_jobs' section.
  #
  # If no configuration is provided, a single job will be set up to delete expired
  # events in every room daily.
  #
  # Each job's configuration defines which range of message lifetimes the job
  # takes care of. For example, if 'shortest_max_lifetime' is '2d' and
  # 'longest_max_lifetime' is '3d', the job will handle purging expired events in
  # rooms whose state defines a 'max_lifetime' that's both higher than 2 days, and
  # lower than or equal to 3 days. Both the minimum and the maximum value of a
  # range are optional, e.g. a job with no 'shortest_max_lifetime' and a
  # 'longest_max_lifetime' of '3d' will handle every room with a retention policy
  # whose 'max_lifetime' is lower than or equal to three days.
  #
  # The rationale for this per-job configuration is that some rooms might have a
  # retention policy with a low 'max_lifetime', where history needs to be purged
  # of outdated messages on a very frequent basis (e.g. every 5min), but not want
  # that purge to be performed by a job that's iterating over every room it knows,
  # which would be quite heavy on the server.
  #
  #purge_jobs:
  #  - shortest_max_lifetime: 1d
  #    longest_max_lifetime: 3d
  #    interval: 5m
  #  - shortest_max_lifetime: 3d
  #    longest_max_lifetime: 1y
  #    interval: 24h


## TLS ##

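To make the ranges above concrete: a job handles a room when the room's 'max_lifetime' is strictly greater than the job's 'shortest_max_lifetime' and lower than or equal to its 'longest_max_lifetime', with a missing bound meaning no limit on that side. The following is a small illustrative sketch in plain Python, not part of this changeset; the duration values are spelled out in milliseconds.

ONE_DAY_MS = 24 * 3600 * 1000
ONE_YEAR_MS = 365 * ONE_DAY_MS

# The two example jobs from the sample config, with durations in milliseconds.
purge_jobs = [
    {"shortest_max_lifetime": ONE_DAY_MS, "longest_max_lifetime": 3 * ONE_DAY_MS, "interval": "5m"},
    {"shortest_max_lifetime": 3 * ONE_DAY_MS, "longest_max_lifetime": ONE_YEAR_MS, "interval": "24h"},
]


def job_for_room(room_max_lifetime_ms):
    """Return the first job whose (exclusive, inclusive] range covers the room."""
    for job in purge_jobs:
        lower = job["shortest_max_lifetime"]
        upper = job["longest_max_lifetime"]
        if lower is not None and room_max_lifetime_ms <= lower:
            continue
        if upper is not None and room_max_lifetime_ms > upper:
            continue
        return job
    return None


# A room keeping messages for 2 days is handled by the frequent 5m job,
# while a room keeping them for a week falls into the daily job.
print(job_for_room(2 * ONE_DAY_MS)["interval"])  # 5m
print(job_for_room(7 * ONE_DAY_MS)["interval"])  # 24h
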
@@ -94,6 +94,8 @@ class EventTypes(object):
    ServerACL = "m.room.server_acl"
    Pinned = "m.room.pinned_events"

    Retention = "m.room.retention"


class RejectedReason(object):
    AUTH_ERROR = "auth_error"

@@ -19,7 +19,7 @@ import logging
import os.path
import re
from textwrap import indent
-from typing import List
+from typing import Dict, List, Optional

import attr
import yaml
@@ -246,6 +246,124 @@ class ServerConfig(Config):
        # events with profile information that differ from the target's global profile.
        self.allow_per_room_profiles = config.get("allow_per_room_profiles", True)

        retention_config = config.get("retention")
        if retention_config is None:
            retention_config = {}

        self.retention_enabled = retention_config.get("enabled", False)

        retention_default_policy = retention_config.get("default_policy")

        if retention_default_policy is not None:
            self.retention_default_min_lifetime = retention_default_policy.get(
                "min_lifetime"
            )
            if self.retention_default_min_lifetime is not None:
                self.retention_default_min_lifetime = self.parse_duration(
                    self.retention_default_min_lifetime
                )

            self.retention_default_max_lifetime = retention_default_policy.get(
                "max_lifetime"
            )
            if self.retention_default_max_lifetime is not None:
                self.retention_default_max_lifetime = self.parse_duration(
                    self.retention_default_max_lifetime
                )

            if (
                self.retention_default_min_lifetime is not None
                and self.retention_default_max_lifetime is not None
                and (
                    self.retention_default_min_lifetime
                    > self.retention_default_max_lifetime
                )
            ):
                raise ConfigError(
                    "The default retention policy's 'min_lifetime' can not be greater"
                    " than its 'max_lifetime'"
                )
        else:
            self.retention_default_min_lifetime = None
            self.retention_default_max_lifetime = None

        self.retention_allowed_lifetime_min = retention_config.get(
            "allowed_lifetime_min"
        )
        if self.retention_allowed_lifetime_min is not None:
            self.retention_allowed_lifetime_min = self.parse_duration(
                self.retention_allowed_lifetime_min
            )

        self.retention_allowed_lifetime_max = retention_config.get(
            "allowed_lifetime_max"
        )
        if self.retention_allowed_lifetime_max is not None:
            self.retention_allowed_lifetime_max = self.parse_duration(
                self.retention_allowed_lifetime_max
            )

        if (
            self.retention_allowed_lifetime_min is not None
            and self.retention_allowed_lifetime_max is not None
            and self.retention_allowed_lifetime_min
            > self.retention_allowed_lifetime_max
        ):
            raise ConfigError(
                "Invalid retention policy limits: 'allowed_lifetime_min' can not be"
                " greater than 'allowed_lifetime_max'"
            )

        self.retention_purge_jobs = []  # type: List[Dict[str, Optional[int]]]
        for purge_job_config in retention_config.get("purge_jobs", []):
            interval_config = purge_job_config.get("interval")

            if interval_config is None:
                raise ConfigError(
                    "A retention policy's purge jobs configuration must have the"
                    " 'interval' key set."
                )

            interval = self.parse_duration(interval_config)

            shortest_max_lifetime = purge_job_config.get("shortest_max_lifetime")

            if shortest_max_lifetime is not None:
                shortest_max_lifetime = self.parse_duration(shortest_max_lifetime)

            longest_max_lifetime = purge_job_config.get("longest_max_lifetime")

            if longest_max_lifetime is not None:
                longest_max_lifetime = self.parse_duration(longest_max_lifetime)

            if (
                shortest_max_lifetime is not None
                and longest_max_lifetime is not None
                and shortest_max_lifetime > longest_max_lifetime
            ):
                raise ConfigError(
                    "A retention policy's purge jobs configuration's"
                    " 'shortest_max_lifetime' value can not be greater than its"
                    " 'longest_max_lifetime' value."
                )

            self.retention_purge_jobs.append(
                {
                    "interval": interval,
                    "shortest_max_lifetime": shortest_max_lifetime,
                    "longest_max_lifetime": longest_max_lifetime,
                }
            )

        if not self.retention_purge_jobs:
            self.retention_purge_jobs = [
                {
                    "interval": self.parse_duration("1d"),
                    "shortest_max_lifetime": None,
                    "longest_max_lifetime": None,
                }
            ]

        self.listeners = []  # type: List[dict]
        for listener in config.get("listeners", []):
            if not isinstance(listener.get("port", None), int):
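The values read above are passed through the config's duration parser before being stored, so '1d' or '1y' in the YAML end up as integers in milliseconds on the config object. The standalone sketch below is only a rough approximation of that conversion, written for illustration; the real helper is Config.parse_duration and also accepts bare integers that are already expressed in milliseconds.

import re

# Approximate duration parsing for the retention options above (assumption:
# this only mirrors the real Config.parse_duration behaviour loosely).
_UNITS_MS = {
    "s": 1000,
    "m": 60 * 1000,
    "h": 60 * 60 * 1000,
    "d": 24 * 60 * 60 * 1000,
    "w": 7 * 24 * 60 * 60 * 1000,
    "y": 365 * 24 * 60 * 60 * 1000,
}


def parse_duration_ms(value):
    """Convert a duration such as '5m', '1d' or '1y' into milliseconds."""
    if isinstance(value, int):
        return value
    match = re.fullmatch(r"(\d+)([smhdwy])", value.strip())
    if match is None:
        raise ValueError("not a valid duration: %r" % (value,))
    return int(match.group(1)) * _UNITS_MS[match.group(2)]


print(parse_duration_ms("5m"))  # 300000
print(parse_duration_ms("1d"))  # 86400000
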
@@ -761,6 +879,69 @@ class ServerConfig(Config):
# Defaults to `28d`. Set to `null` to disable clearing out of old rows.
#
#user_ips_max_age: 14d

# Message retention policy at the server level.
#
# Room admins and mods can define a retention period for their rooms using the
# 'm.room.retention' state event, and server admins can cap this period by setting
# the 'allowed_lifetime_min' and 'allowed_lifetime_max' config options.
#
# If this feature is enabled, Synapse will regularly look for and purge events
# which are older than the room's maximum retention period. Synapse will also
# filter events received over federation so that events that should have been
# purged are ignored and not stored again.
#
retention:
  # The message retention policies feature is disabled by default. Uncomment the
  # following line to enable it.
  #
  #enabled: true

  # Default retention policy. If set, Synapse will apply it to rooms that lack the
  # 'm.room.retention' state event. Currently, the value of 'min_lifetime' doesn't
  # matter much because Synapse doesn't take it into account yet.
  #
  #default_policy:
  #  min_lifetime: 1d
  #  max_lifetime: 1y

  # Retention policy limits. If set, a user won't be able to send a
  # 'm.room.retention' event which features a 'min_lifetime' or a 'max_lifetime'
  # that's not within this range. This is especially useful in closed federations,
  # in which server admins can make sure every federating server applies the same
  # rules.
  #
  #allowed_lifetime_min: 1d
  #allowed_lifetime_max: 1y

  # Server admins can define the settings of the background jobs purging the
  # events whose lifetime has expired under the 'purge_jobs' section.
  #
  # If no configuration is provided, a single job will be set up to delete expired
  # events in every room daily.
  #
  # Each job's configuration defines which range of message lifetimes the job
  # takes care of. For example, if 'shortest_max_lifetime' is '2d' and
  # 'longest_max_lifetime' is '3d', the job will handle purging expired events in
  # rooms whose state defines a 'max_lifetime' that's both higher than 2 days, and
  # lower than or equal to 3 days. Both the minimum and the maximum value of a
  # range are optional, e.g. a job with no 'shortest_max_lifetime' and a
  # 'longest_max_lifetime' of '3d' will handle every room with a retention policy
  # whose 'max_lifetime' is lower than or equal to three days.
  #
  # The rationale for this per-job configuration is that some rooms might have a
  # retention policy with a low 'max_lifetime', where history needs to be purged
  # of outdated messages on a very frequent basis (e.g. every 5min), but not want
  # that purge to be performed by a job that's iterating over every room it knows,
  # which would be quite heavy on the server.
  #
  #purge_jobs:
  #  - shortest_max_lifetime: 1d
  #    longest_max_lifetime: 3d
  #    interval: 5m
  #  - shortest_max_lifetime: 3d
  #    longest_max_lifetime: 1y
  #    interval: 24h
"""
% locals()
)

@@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

-from six import string_types
+from six import integer_types, string_types

from synapse.api.constants import MAX_ALIAS_LENGTH, EventTypes, Membership
from synapse.api.errors import Codes, SynapseError
@@ -22,11 +22,12 @@ from synapse.types import EventID, RoomID, UserID


class EventValidator(object):
-    def validate_new(self, event):
+    def validate_new(self, event, config):
        """Validates the event has roughly the right format

        Args:
-            event (FrozenEvent)
+            event (FrozenEvent): The event to validate.
+            config (Config): The homeserver's configuration.
        """
        self.validate_builder(event)

@@ -67,6 +68,99 @@ class EventValidator(object):
                    Codes.INVALID_PARAM,
                )

        if event.type == EventTypes.Retention:
            self._validate_retention(event, config)

    def _validate_retention(self, event, config):
        """Checks that an event that defines the retention policy for a room respects the
        boundaries imposed by the server's administrator.

        Args:
            event (FrozenEvent): The event to validate.
            config (Config): The homeserver's configuration.
        """
        min_lifetime = event.content.get("min_lifetime")
        max_lifetime = event.content.get("max_lifetime")

        if min_lifetime is not None:
            if not isinstance(min_lifetime, integer_types):
                raise SynapseError(
                    code=400,
                    msg="'min_lifetime' must be an integer",
                    errcode=Codes.BAD_JSON,
                )

            if (
                config.retention_allowed_lifetime_min is not None
                and min_lifetime < config.retention_allowed_lifetime_min
            ):
                raise SynapseError(
                    code=400,
                    msg=(
                        "'min_lifetime' can't be lower than the minimum allowed"
                        " value enforced by the server's administrator"
                    ),
                    errcode=Codes.BAD_JSON,
                )

            if (
                config.retention_allowed_lifetime_max is not None
                and min_lifetime > config.retention_allowed_lifetime_max
            ):
                raise SynapseError(
                    code=400,
                    msg=(
                        "'min_lifetime' can't be greater than the maximum allowed"
                        " value enforced by the server's administrator"
                    ),
                    errcode=Codes.BAD_JSON,
                )

        if max_lifetime is not None:
            if not isinstance(max_lifetime, integer_types):
                raise SynapseError(
                    code=400,
                    msg="'max_lifetime' must be an integer",
                    errcode=Codes.BAD_JSON,
                )

            if (
                config.retention_allowed_lifetime_min is not None
                and max_lifetime < config.retention_allowed_lifetime_min
            ):
                raise SynapseError(
                    code=400,
                    msg=(
                        "'max_lifetime' can't be lower than the minimum allowed value"
                        " enforced by the server's administrator"
                    ),
                    errcode=Codes.BAD_JSON,
                )

            if (
                config.retention_allowed_lifetime_max is not None
                and max_lifetime > config.retention_allowed_lifetime_max
            ):
                raise SynapseError(
                    code=400,
                    msg=(
                        "'max_lifetime' can't be greater than the maximum allowed"
                        " value enforced by the server's administrator"
                    ),
                    errcode=Codes.BAD_JSON,
                )

        if (
            min_lifetime is not None
            and max_lifetime is not None
            and min_lifetime > max_lifetime
        ):
            raise SynapseError(
                code=400,
                msg="'min_lifetime' can't be greater than 'max_lifetime'",
                errcode=Codes.BAD_JSON,
            )

    def validate_builder(self, event):
        """Validates that the builder/event has roughly the right format. Only
        checks values that we expect a proto event to have, rather than all the

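As a concrete illustration of the limits enforced above (not part of this changeset): with the sample limits of one day for 'allowed_lifetime_min' and one year for 'allowed_lifetime_max', and lifetimes expressed in milliseconds as MSC1763 specifies, the validator would treat the following 'm.room.retention' contents as shown in the comments.

ONE_HOUR_MS = 3600 * 1000
ONE_DAY_MS = 24 * ONE_HOUR_MS
ONE_YEAR_MS = 365 * ONE_DAY_MS

# Hypothetical server limits, mirroring the sample config above.
allowed_lifetime_min = ONE_DAY_MS
allowed_lifetime_max = ONE_YEAR_MS

# Accepted: both bounds fall inside the allowed range.
ok_content = {"min_lifetime": ONE_DAY_MS, "max_lifetime": 7 * ONE_DAY_MS}

# Rejected with a 400 / M_BAD_JSON error: 'max_lifetime' is below 'allowed_lifetime_min'.
too_short = {"max_lifetime": ONE_HOUR_MS}

# Rejected with a 400 / M_BAD_JSON error: 'max_lifetime' exceeds 'allowed_lifetime_max'.
too_long = {"max_lifetime": 2 * ONE_YEAR_MS}
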
@@ -2466,7 +2466,7 @@ class FederationHandler(BaseHandler):
            room_version, event_dict, event, context
        )

-        EventValidator().validate_new(event)
+        EventValidator().validate_new(event, self.config)

        # We need to tell the transaction queue to send this out, even
        # though the sender isn't a local user.
@@ -2581,7 +2581,7 @@ class FederationHandler(BaseHandler):
        event, context = yield self.event_creation_handler.create_new_client_event(
            builder=builder
        )
-        EventValidator().validate_new(event)
+        EventValidator().validate_new(event, self.config)
        return (event, context)

    @defer.inlineCallbacks
@@ -417,7 +417,7 @@ class EventCreationHandler(object):
                    403, "You must be in the room to create an alias for it"
                )

-        self.validator.validate_new(event)
+        self.validator.validate_new(event, self.config)

        return (event, context)

@@ -634,7 +634,7 @@ class EventCreationHandler(object):
        if requester:
            context.app_service = requester.app_service

-        self.validator.validate_new(event)
+        self.validator.validate_new(event, self.config)

        # If this event is an annotation then we check that the sender
        # can't annotate the same way twice (e.g. stops users from liking an
|
|||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
|
from six import iteritems
|
||||||
|
|
||||||
from twisted.internet import defer
|
from twisted.internet import defer
|
||||||
from twisted.python.failure import Failure
|
from twisted.python.failure import Failure
|
||||||
|
|
||||||
from synapse.api.constants import EventTypes, Membership
|
from synapse.api.constants import EventTypes, Membership
|
||||||
from synapse.api.errors import SynapseError
|
from synapse.api.errors import SynapseError
|
||||||
from synapse.logging.context import run_in_background
|
from synapse.logging.context import run_in_background
|
||||||
|
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||||
from synapse.storage.state import StateFilter
|
from synapse.storage.state import StateFilter
|
||||||
from synapse.types import RoomStreamToken
|
from synapse.types import RoomStreamToken
|
||||||
from synapse.util.async_helpers import ReadWriteLock
|
from synapse.util.async_helpers import ReadWriteLock
|
||||||
@ -80,6 +83,109 @@ class PaginationHandler(object):
|
|||||||
self._purges_by_id = {}
|
self._purges_by_id = {}
|
||||||
self._event_serializer = hs.get_event_client_serializer()
|
self._event_serializer = hs.get_event_client_serializer()
|
||||||
|
|
||||||
|
self._retention_default_max_lifetime = hs.config.retention_default_max_lifetime
|
||||||
|
|
||||||
|
if hs.config.retention_enabled:
|
||||||
|
# Run the purge jobs described in the configuration file.
|
||||||
|
for job in hs.config.retention_purge_jobs:
|
||||||
|
self.clock.looping_call(
|
||||||
|
run_as_background_process,
|
||||||
|
job["interval"],
|
||||||
|
"purge_history_for_rooms_in_range",
|
||||||
|
self.purge_history_for_rooms_in_range,
|
||||||
|
job["shortest_max_lifetime"],
|
||||||
|
job["longest_max_lifetime"],
|
||||||
|
)
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def purge_history_for_rooms_in_range(self, min_ms, max_ms):
|
||||||
|
"""Purge outdated events from rooms within the given retention range.
|
||||||
|
|
||||||
|
If a default retention policy is defined in the server's configuration and its
|
||||||
|
'max_lifetime' is within this range, also targets rooms which don't have a
|
||||||
|
retention policy.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
min_ms (int|None): Duration in milliseconds that define the lower limit of
|
||||||
|
the range to handle (exclusive). If None, it means that the range has no
|
||||||
|
lower limit.
|
||||||
|
max_ms (int|None): Duration in milliseconds that define the upper limit of
|
||||||
|
the range to handle (inclusive). If None, it means that the range has no
|
||||||
|
upper limit.
|
||||||
|
"""
|
||||||
|
# We want the storage layer to to include rooms with no retention policy in its
|
||||||
|
# return value only if a default retention policy is defined in the server's
|
||||||
|
# configuration and that policy's 'max_lifetime' is either lower (or equal) than
|
||||||
|
# max_ms or higher than min_ms (or both).
|
||||||
|
if self._retention_default_max_lifetime is not None:
|
||||||
|
include_null = True
|
||||||
|
|
||||||
|
if min_ms is not None and min_ms >= self._retention_default_max_lifetime:
|
||||||
|
# The default max_lifetime is lower than (or equal to) min_ms.
|
||||||
|
include_null = False
|
||||||
|
|
||||||
|
if max_ms is not None and max_ms < self._retention_default_max_lifetime:
|
||||||
|
# The default max_lifetime is higher than max_ms.
|
||||||
|
include_null = False
|
||||||
|
else:
|
||||||
|
include_null = False
|
||||||
|
|
||||||
|
rooms = yield self.store.get_rooms_for_retention_period_in_range(
|
||||||
|
min_ms, max_ms, include_null
|
||||||
|
)
|
||||||
|
|
||||||
|
for room_id, retention_policy in iteritems(rooms):
|
||||||
|
if room_id in self._purges_in_progress_by_room:
|
||||||
|
logger.warning(
|
||||||
|
"[purge] not purging room %s as there's an ongoing purge running"
|
||||||
|
" for this room",
|
||||||
|
room_id,
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
|
||||||
|
max_lifetime = retention_policy["max_lifetime"]
|
||||||
|
|
||||||
|
if max_lifetime is None:
|
||||||
|
# If max_lifetime is None, it means that include_null equals True,
|
||||||
|
# therefore we can safely assume that there is a default policy defined
|
||||||
|
# in the server's configuration.
|
||||||
|
max_lifetime = self._retention_default_max_lifetime
|
||||||
|
|
||||||
|
# Figure out what token we should start purging at.
|
||||||
|
ts = self.clock.time_msec() - max_lifetime
|
||||||
|
|
||||||
|
stream_ordering = yield self.store.find_first_stream_ordering_after_ts(ts)
|
||||||
|
|
||||||
|
r = yield self.store.get_room_event_after_stream_ordering(
|
||||||
|
room_id, stream_ordering,
|
||||||
|
)
|
||||||
|
if not r:
|
||||||
|
logger.warning(
|
||||||
|
"[purge] purging events not possible: No event found "
|
||||||
|
"(ts %i => stream_ordering %i)",
|
||||||
|
ts,
|
||||||
|
stream_ordering,
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
|
||||||
|
(stream, topo, _event_id) = r
|
||||||
|
token = "t%d-%d" % (topo, stream)
|
||||||
|
|
||||||
|
purge_id = random_string(16)
|
||||||
|
|
||||||
|
self._purges_by_id[purge_id] = PurgeStatus()
|
||||||
|
|
||||||
|
logger.info(
|
||||||
|
"Starting purging events in room %s (purge_id %s)" % (room_id, purge_id)
|
||||||
|
)
|
||||||
|
|
||||||
|
# We want to purge everything, including local events, and to run the purge in
|
||||||
|
# the background so that it's not blocking any other operation apart from
|
||||||
|
# other purges in the same room.
|
||||||
|
run_as_background_process(
|
||||||
|
"_purge_history", self._purge_history, purge_id, room_id, token, True,
|
||||||
|
)
|
||||||
|
|
||||||
def start_purge_history(self, room_id, token, delete_local_events=False):
|
def start_purge_history(self, room_id, token, delete_local_events=False):
|
||||||
"""Start off a history purge on a room.
|
"""Start off a history purge on a room.
|
||||||
|
|
||||||
|
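The include_null decision above can be read as a small predicate: rooms with no 'm.room.retention' state event are only swept by the job whose (min_ms, max_ms] range contains the default policy's 'max_lifetime'. A hedged sketch, for illustration only and not part of this changeset:

def should_include_rooms_without_policy(min_ms, max_ms, default_max_lifetime):
    """Sketch of the include_null decision made in purge_history_for_rooms_in_range."""
    if default_max_lifetime is None:
        return False
    if min_ms is not None and min_ms >= default_max_lifetime:
        return False
    if max_ms is not None and max_ms < default_max_lifetime:
        return False
    return True


ONE_DAY_MS = 24 * 3600 * 1000

# With a default max_lifetime of 3 days, only the job covering (1d, 3d]
# picks up rooms that have no retention state event.
print(should_include_rooms_without_policy(ONE_DAY_MS, 3 * ONE_DAY_MS, 3 * ONE_DAY_MS))  # True
print(should_include_rooms_without_policy(3 * ONE_DAY_MS, 365 * ONE_DAY_MS, 3 * ONE_DAY_MS))  # False
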
@@ -927,6 +927,9 @@ class EventsStore(
        elif event.type == EventTypes.Redaction:
            # Insert into the redactions table.
            self._store_redaction(txn, event)
        elif event.type == EventTypes.Retention:
            # Update the room_retention table.
            self._store_retention_policy_for_room_txn(txn, event)

        self._handle_event_relations(txn, event)

@@ -19,10 +19,13 @@ import logging
import re
from typing import Optional, Tuple

from six import integer_types

from canonicaljson import json

from twisted.internet import defer

from synapse.api.constants import EventTypes
from synapse.api.errors import StoreError
from synapse.storage._base import SQLBaseStore
from synapse.storage.data_stores.main.search import SearchStore
@@ -300,8 +303,141 @@ class RoomWorkerStore(SQLBaseStore):
        else:
            return None

    @cachedInlineCallbacks()
    def get_retention_policy_for_room(self, room_id):
        """Get the retention policy for a given room.

        If no retention policy has been found for this room, returns a policy defined
        by the configured default policy (which has None as both the 'min_lifetime' and
        the 'max_lifetime' if no default policy has been defined in the server's
        configuration).

        Args:
            room_id (str): The ID of the room to get the retention policy of.

        Returns:
            dict[str, int|None]: "min_lifetime" and "max_lifetime" for this room.
        """

        def get_retention_policy_for_room_txn(txn):
            txn.execute(
                """
                SELECT min_lifetime, max_lifetime FROM room_retention
                INNER JOIN current_state_events USING (event_id, room_id)
                WHERE room_id = ?;
                """,
                (room_id,),
            )

            return self.cursor_to_dict(txn)

        ret = yield self.runInteraction(
            "get_retention_policy_for_room", get_retention_policy_for_room_txn,
        )

        # If we don't know this room ID, ret will be None, in this case return the default
        # policy.
        if not ret:
            defer.returnValue(
                {
                    "min_lifetime": self.config.retention_default_min_lifetime,
                    "max_lifetime": self.config.retention_default_max_lifetime,
                }
            )

        row = ret[0]

        # If one of the room's policy's attributes isn't defined, use the matching
        # attribute from the default policy.
        # The default values will be None if no default policy has been defined, or if one
        # of the attributes is missing from the default policy.
        if row["min_lifetime"] is None:
            row["min_lifetime"] = self.config.retention_default_min_lifetime

        if row["max_lifetime"] is None:
            row["max_lifetime"] = self.config.retention_default_max_lifetime

        defer.returnValue(row)


class RoomStore(RoomWorkerStore, SearchStore):
    def __init__(self, db_conn, hs):
        super(RoomStore, self).__init__(db_conn, hs)

        self.config = hs.config

        self.register_background_update_handler(
            "insert_room_retention", self._background_insert_retention,
        )

    @defer.inlineCallbacks
    def _background_insert_retention(self, progress, batch_size):
        """Retrieves a list of all rooms within a range and inserts an entry for each of
        them into the room_retention table.

        NULLs the property's columns if missing from the retention event in the room's
        state (or NULLs all of them if there's no retention event in the room's state),
        so that we fall back to the server's retention policy.
        """

        last_room = progress.get("room_id", "")

        def _background_insert_retention_txn(txn):
            txn.execute(
                """
                SELECT state.room_id, state.event_id, events.json
                FROM current_state_events as state
                LEFT JOIN event_json AS events ON (state.event_id = events.event_id)
                WHERE state.room_id > ? AND state.type = '%s'
                ORDER BY state.room_id ASC
                LIMIT ?;
                """
                % EventTypes.Retention,
                (last_room, batch_size),
            )

            rows = self.cursor_to_dict(txn)

            if not rows:
                return True

            for row in rows:
                if not row["json"]:
                    retention_policy = {}
                else:
                    ev = json.loads(row["json"])
                    retention_policy = ev["content"]

                self._simple_insert_txn(
                    txn=txn,
                    table="room_retention",
                    values={
                        "room_id": row["room_id"],
                        "event_id": row["event_id"],
                        "min_lifetime": retention_policy.get("min_lifetime"),
                        "max_lifetime": retention_policy.get("max_lifetime"),
                    },
                )

            logger.info("Inserted %d rows into room_retention", len(rows))

            self._background_update_progress_txn(
                txn, "insert_room_retention", {"room_id": rows[-1]["room_id"]}
            )

            if batch_size > len(rows):
                return True
            else:
                return False

        end = yield self.runInteraction(
            "insert_room_retention", _background_insert_retention_txn,
        )

        if end:
            yield self._end_background_update("insert_room_retention")

        defer.returnValue(batch_size)

    @defer.inlineCallbacks
    def store_room(self, room_id, room_creator_user_id, is_public):
        """Stores a room.
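As the docstring above notes, missing attributes fall back to the server's default policy field by field. A small illustration (not Synapse code): with a default policy of one day / three days, a room whose state only sets 'max_lifetime' to two days ends up with an effective policy that combines the default 'min_lifetime' with the room's own 'max_lifetime'.

ONE_DAY_MS = 24 * 3600 * 1000

default_policy = {"min_lifetime": ONE_DAY_MS, "max_lifetime": 3 * ONE_DAY_MS}
room_state_policy = {"min_lifetime": None, "max_lifetime": 2 * ONE_DAY_MS}

# Per-field fallback, as described in get_retention_policy_for_room's docstring.
effective = {
    key: room_state_policy[key] if room_state_policy[key] is not None else default_policy[key]
    for key in ("min_lifetime", "max_lifetime")
}
print(effective)  # {'min_lifetime': 86400000, 'max_lifetime': 172800000}
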
@@ -502,6 +638,35 @@ class RoomStore(RoomWorkerStore, SearchStore):
                txn, event, "content.body", event.content["body"]
            )

    def _store_retention_policy_for_room_txn(self, txn, event):
        if hasattr(event, "content") and (
            "min_lifetime" in event.content or "max_lifetime" in event.content
        ):
            if (
                "min_lifetime" in event.content
                and not isinstance(event.content.get("min_lifetime"), integer_types)
            ) or (
                "max_lifetime" in event.content
                and not isinstance(event.content.get("max_lifetime"), integer_types)
            ):
                # Ignore the event if one of the values isn't an integer.
                return

            self._simple_insert_txn(
                txn=txn,
                table="room_retention",
                values={
                    "room_id": event.room_id,
                    "event_id": event.event_id,
                    "min_lifetime": event.content.get("min_lifetime"),
                    "max_lifetime": event.content.get("max_lifetime"),
                },
            )

            self._invalidate_cache_and_stream(
                txn, self.get_retention_policy_for_room, (event.room_id,)
            )

    def add_event_report(
        self, room_id, event_id, user_id, reason, content, received_ts
    ):
@@ -683,3 +848,89 @@ class RoomStore(RoomWorkerStore, SearchStore):
                remote_media_mxcs.append((hostname, media_id))

        return local_media_mxcs, remote_media_mxcs

    @defer.inlineCallbacks
    def get_rooms_for_retention_period_in_range(
        self, min_ms, max_ms, include_null=False
    ):
        """Retrieves all of the rooms within the given retention range.

        Optionally includes the rooms which don't have a retention policy.

        Args:
            min_ms (int|None): Duration in milliseconds that defines the lower limit of
                the range to handle (exclusive). If None, doesn't set a lower limit.
            max_ms (int|None): Duration in milliseconds that defines the upper limit of
                the range to handle (inclusive). If None, doesn't set an upper limit.
            include_null (bool): Whether to include rooms whose retention policy is NULL
                in the returned set.

        Returns:
            dict[str, dict]: The rooms within this range, along with their retention
                policy. The key is "room_id", and maps to a dict describing the retention
                policy associated with this room ID. The keys for this nested dict are
                "min_lifetime" (int|None), and "max_lifetime" (int|None).
        """

        def get_rooms_for_retention_period_in_range_txn(txn):
            range_conditions = []
            args = []

            if min_ms is not None:
                range_conditions.append("max_lifetime > ?")
                args.append(min_ms)

            if max_ms is not None:
                range_conditions.append("max_lifetime <= ?")
                args.append(max_ms)

            # Do a first query which will retrieve the rooms that have a retention policy
            # in their current state.
            sql = """
                SELECT room_id, min_lifetime, max_lifetime FROM room_retention
                INNER JOIN current_state_events USING (event_id, room_id)
                """

            if len(range_conditions):
                sql += " WHERE (" + " AND ".join(range_conditions) + ")"

                if include_null:
                    sql += " OR max_lifetime IS NULL"

            txn.execute(sql, args)

            rows = self.cursor_to_dict(txn)
            rooms_dict = {}

            for row in rows:
                rooms_dict[row["room_id"]] = {
                    "min_lifetime": row["min_lifetime"],
                    "max_lifetime": row["max_lifetime"],
                }

            if include_null:
                # If required, do a second query that retrieves all of the rooms we know
                # of so we can handle rooms with no retention policy.
                sql = "SELECT DISTINCT room_id FROM current_state_events"

                txn.execute(sql)

                rows = self.cursor_to_dict(txn)

                # If a room isn't already in the dict (i.e. it doesn't have a retention
                # policy in its state), add it with a null policy.
                for row in rows:
                    if row["room_id"] not in rooms_dict:
                        rooms_dict[row["room_id"]] = {
                            "min_lifetime": None,
                            "max_lifetime": None,
                        }

            return rooms_dict

        rooms = yield self.runInteraction(
            "get_rooms_for_retention_period_in_range",
            get_rooms_for_retention_period_in_range_txn,
        )

        defer.returnValue(rooms)

@@ -0,0 +1,33 @@
/* Copyright 2019 New Vector Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

-- Tracks the retention policy of a room.
-- A NULL max_lifetime or min_lifetime means that the matching property is not defined in
-- the room's retention policy state event.
-- If a room doesn't have a retention policy state event in its state, both max_lifetime
-- and min_lifetime are NULL.
CREATE TABLE IF NOT EXISTS room_retention(
    room_id TEXT,
    event_id TEXT,
    min_lifetime BIGINT,
    max_lifetime BIGINT,

    PRIMARY KEY(room_id, event_id)
);

CREATE INDEX room_retention_max_lifetime_idx on room_retention(max_lifetime);

INSERT INTO background_updates (update_name, progress_json) VALUES
  ('insert_room_retention', '{}');
@@ -86,6 +86,14 @@ def filter_events_for_client(

    erased_senders = yield storage.main.are_users_erased((e.sender for e in events))

    room_ids = set(e.room_id for e in events)
    retention_policies = {}

    for room_id in room_ids:
        retention_policies[room_id] = yield storage.main.get_retention_policy_for_room(
            room_id
        )

    def allowed(event):
        """
        Args:
@@ -103,6 +111,18 @@ def filter_events_for_client(
        if not event.is_state() and event.sender in ignore_list:
            return None

        # Don't try to apply the room's retention policy if the event is a state event, as
        # MSC1763 states that retention is only considered for non-state events.
        if not event.is_state():
            retention_policy = retention_policies[event.room_id]
            max_lifetime = retention_policy.get("max_lifetime")

            if max_lifetime is not None:
                oldest_allowed_ts = storage.main.clock.time_msec() - max_lifetime

                if event.origin_server_ts < oldest_allowed_ts:
                    return None

        if event.event_id in always_include_ids:
            return event

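As a worked example of the visibility check above (a sketch, not part of this changeset): an event is hidden from clients once its origin_server_ts falls behind the current time minus the room's effective 'max_lifetime'.

ONE_DAY_MS = 24 * 3600 * 1000


def is_expired(origin_server_ts, now_ms, max_lifetime_ms):
    """Return True if the event should be filtered out for clients (sketch)."""
    if max_lifetime_ms is None:
        return False  # no policy applies, the event stays visible
    oldest_allowed_ts = now_ms - max_lifetime_ms
    return origin_server_ts < oldest_allowed_ts


# With a 3-day max_lifetime, an event sent 4 days ago is filtered out,
# while one sent a day ago is kept.
now = 1000 * ONE_DAY_MS
print(is_expired(now - 4 * ONE_DAY_MS, now, 3 * ONE_DAY_MS))  # True
print(is_expired(now - 1 * ONE_DAY_MS, now, 3 * ONE_DAY_MS))  # False
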
tests/rest/client/test_retention.py (new file, 293 lines)
@@ -0,0 +1,293 @@
# -*- coding: utf-8 -*-
# Copyright 2019 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from mock import Mock

from synapse.api.constants import EventTypes
from synapse.rest import admin
from synapse.rest.client.v1 import login, room
from synapse.visibility import filter_events_for_client

from tests import unittest

one_hour_ms = 3600000
one_day_ms = one_hour_ms * 24


class RetentionTestCase(unittest.HomeserverTestCase):
    servlets = [
        admin.register_servlets,
        login.register_servlets,
        room.register_servlets,
    ]

    def make_homeserver(self, reactor, clock):
        config = self.default_config()
        config["retention"] = {
            "enabled": True,
            "default_policy": {
                "min_lifetime": one_day_ms,
                "max_lifetime": one_day_ms * 3,
            },
            "allowed_lifetime_min": one_day_ms,
            "allowed_lifetime_max": one_day_ms * 3,
        }

        self.hs = self.setup_test_homeserver(config=config)
        return self.hs

    def prepare(self, reactor, clock, homeserver):
        self.user_id = self.register_user("user", "password")
        self.token = self.login("user", "password")

    def test_retention_state_event(self):
        """Tests that the server configuration can limit the values a user can set to the
        room's retention policy.
        """
        room_id = self.helper.create_room_as(self.user_id, tok=self.token)

        self.helper.send_state(
            room_id=room_id,
            event_type=EventTypes.Retention,
            body={"max_lifetime": one_day_ms * 4},
            tok=self.token,
            expect_code=400,
        )

        self.helper.send_state(
            room_id=room_id,
            event_type=EventTypes.Retention,
            body={"max_lifetime": one_hour_ms},
            tok=self.token,
            expect_code=400,
        )

    def test_retention_event_purged_with_state_event(self):
        """Tests that expired events are correctly purged when the room's retention policy
        is defined by a state event.
        """
        room_id = self.helper.create_room_as(self.user_id, tok=self.token)

        # Set the room's retention period to 2 days.
        lifetime = one_day_ms * 2
        self.helper.send_state(
            room_id=room_id,
            event_type=EventTypes.Retention,
            body={"max_lifetime": lifetime},
            tok=self.token,
        )

        self._test_retention_event_purged(room_id, one_day_ms * 1.5)

    def test_retention_event_purged_without_state_event(self):
        """Tests that expired events are correctly purged when the room's retention policy
        is defined by the server's configuration's default retention policy.
        """
        room_id = self.helper.create_room_as(self.user_id, tok=self.token)

        self._test_retention_event_purged(room_id, one_day_ms * 2)

    def test_visibility(self):
        """Tests that synapse.visibility.filter_events_for_client correctly filters out
        outdated events.
        """
        store = self.hs.get_datastore()
        storage = self.hs.get_storage()
        room_id = self.helper.create_room_as(self.user_id, tok=self.token)
        events = []

        # Send a first event, which should be filtered out at the end of the test.
        resp = self.helper.send(room_id=room_id, body="1", tok=self.token)

        # Get the event from the store so that we end up with a FrozenEvent that we can
        # give to filter_events_for_client. We need to do this now because the event won't
        # be in the database anymore after it has expired.
        events.append(self.get_success(store.get_event(resp.get("event_id"))))

        # Advance the time by 2 days. We're using the default retention policy, therefore
        # after this the first event will still be valid.
        self.reactor.advance(one_day_ms * 2 / 1000)

        # Send another event, which shouldn't get filtered out.
        resp = self.helper.send(room_id=room_id, body="2", tok=self.token)

        valid_event_id = resp.get("event_id")

        events.append(self.get_success(store.get_event(valid_event_id)))

        # Advance the time by another 2 days. After this, the first event should be
        # outdated but not the second one.
        self.reactor.advance(one_day_ms * 2 / 1000)

        # Run filter_events_for_client with our list of FrozenEvents.
        filtered_events = self.get_success(
            filter_events_for_client(storage, self.user_id, events)
        )

        # We should only get one event back.
        self.assertEqual(len(filtered_events), 1, filtered_events)
        # That event should be the second, not outdated event.
        self.assertEqual(filtered_events[0].event_id, valid_event_id, filtered_events)

    def _test_retention_event_purged(self, room_id, increment):
        # Get the create event to, later, check that we can still access it.
        message_handler = self.hs.get_message_handler()
        create_event = self.get_success(
            message_handler.get_room_data(self.user_id, room_id, EventTypes.Create)
        )

        # Send a first event to the room. This is the event we'll want to be purged at the
        # end of the test.
        resp = self.helper.send(room_id=room_id, body="1", tok=self.token)

        expired_event_id = resp.get("event_id")

        # Check that we can retrieve the event.
        expired_event = self.get_event(room_id, expired_event_id)
        self.assertEqual(
            expired_event.get("content", {}).get("body"), "1", expired_event
        )

        # Advance the time.
        self.reactor.advance(increment / 1000)

        # Send another event. We need this because the purge job won't purge the most
        # recent event in the room.
        resp = self.helper.send(room_id=room_id, body="2", tok=self.token)

        valid_event_id = resp.get("event_id")

        # Advance the time again. Now our first event should have expired but our second
        # one should still be kept.
        self.reactor.advance(increment / 1000)

        # Check that the event has been purged from the database.
        self.get_event(room_id, expired_event_id, expected_code=404)

        # Check that the event that hasn't been purged can still be retrieved.
        valid_event = self.get_event(room_id, valid_event_id)
        self.assertEqual(valid_event.get("content", {}).get("body"), "2", valid_event)

        # Check that we can still access state events that were sent before the event that
        # has been purged.
        self.get_event(room_id, create_event.event_id)

    def get_event(self, room_id, event_id, expected_code=200):
        url = "/_matrix/client/r0/rooms/%s/event/%s" % (room_id, event_id)

        request, channel = self.make_request("GET", url, access_token=self.token)
        self.render(request)

        self.assertEqual(channel.code, expected_code, channel.result)

        return channel.json_body


class RetentionNoDefaultPolicyTestCase(unittest.HomeserverTestCase):
    servlets = [
        admin.register_servlets,
        login.register_servlets,
        room.register_servlets,
    ]

    def make_homeserver(self, reactor, clock):
        config = self.default_config()
        config["retention"] = {
            "enabled": True,
        }

        mock_federation_client = Mock(spec=["backfill"])

        self.hs = self.setup_test_homeserver(
            config=config, federation_client=mock_federation_client,
        )
        return self.hs

    def prepare(self, reactor, clock, homeserver):
        self.user_id = self.register_user("user", "password")
        self.token = self.login("user", "password")

    def test_no_default_policy(self):
        """Tests that an event doesn't get expired if there is neither a default retention
        policy nor a policy specific to the room.
        """
        room_id = self.helper.create_room_as(self.user_id, tok=self.token)

        self._test_retention(room_id)

    def test_state_policy(self):
        """Tests that an event gets correctly expired if there is no default retention
        policy but there's a policy specific to the room.
        """
        room_id = self.helper.create_room_as(self.user_id, tok=self.token)

        # Set the maximum lifetime to 35 days so that the first event gets expired but not
        # the second one.
        self.helper.send_state(
            room_id=room_id,
            event_type=EventTypes.Retention,
            body={"max_lifetime": one_day_ms * 35},
            tok=self.token,
        )

        self._test_retention(room_id, expected_code_for_first_event=404)

    def _test_retention(self, room_id, expected_code_for_first_event=200):
        # Send a first event to the room. This is the event we'll want to be purged at the
        # end of the test.
        resp = self.helper.send(room_id=room_id, body="1", tok=self.token)

        first_event_id = resp.get("event_id")

        # Check that we can retrieve the event.
        expired_event = self.get_event(room_id, first_event_id)
        self.assertEqual(
            expired_event.get("content", {}).get("body"), "1", expired_event
        )

        # Advance the time by a month.
        self.reactor.advance(one_day_ms * 30 / 1000)

        # Send another event. We need this because the purge job won't purge the most
        # recent event in the room.
        resp = self.helper.send(room_id=room_id, body="2", tok=self.token)

        second_event_id = resp.get("event_id")

        # Advance the time by another month.
        self.reactor.advance(one_day_ms * 30 / 1000)

        # Check if the event has been purged from the database.
        first_event = self.get_event(
            room_id, first_event_id, expected_code=expected_code_for_first_event
        )

        if expected_code_for_first_event == 200:
            self.assertEqual(
                first_event.get("content", {}).get("body"), "1", first_event
            )

        # Check that the event that hasn't been purged can still be retrieved.
        second_event = self.get_event(room_id, second_event_id)
        self.assertEqual(second_event.get("content", {}).get("body"), "2", second_event)

    def get_event(self, room_id, event_id, expected_code=200):
        url = "/_matrix/client/r0/rooms/%s/event/%s" % (room_id, event_id)

        request, channel = self.make_request("GET", url, access_token=self.token)
        self.render(request)

        self.assertEqual(channel.code, expected_code, channel.result)

        return channel.json_body