Implement restrictions for power levels

This commit is contained in:
Brendan Abolivier 2019-07-03 11:45:07 +01:00
parent ee0ee97447
commit 8636ec042b
No known key found for this signature in database
GPG Key ID: 1E015C145F1916CD

View File

@ -80,6 +80,7 @@ class RoomAccessRules(object):
"""
is_direct = config.get("is_direct")
rules_in_initial_state = False
rule = ""
# If there's a rules event in the initial state, check if it complies with the
# spec for im.vector.room.access_rules and deny the request if not.
@ -123,6 +124,22 @@ class RoomAccessRules(object):
}
})
rule = default_rule
# Check if the creator can override values for the power levels.
allowed = self._is_power_level_content_allowed(
config.get("power_level_content_override", {}), rule,
)
if not allowed:
raise SynapseError(400, "Invalid power levels content override")
# Second loop for events we need to know the current rule to process.
for event in config.get("initial_state", []):
if event["type"] == EventTypes.PowerLevels:
allowed = self._is_power_level_content_allowed(event["content"], rule)
if not allowed:
raise SynapseError(400, "Invalid power levels content")
@defer.inlineCallbacks
def check_threepid_can_be_invited(self, medium, address, state_events):
"""Implements synapse.events.ThirdPartyEventRules.check_threepid_can_be_invited
@ -178,6 +195,11 @@ class RoomAccessRules(object):
rule = self._get_rule_from_state(state_events)
# Special-case the power levels event because it makes more sense to check it
# here.
if event.type == EventTypes.PowerLevels:
return self._is_power_level_content_allowed(event.content, rule)
if rule == ACCESS_RULE_RESTRICTED:
ret = self._apply_restricted(event)
elif rule == ACCESS_RULE_UNRESTRICTED:
@ -319,6 +341,41 @@ class RoomAccessRules(object):
return True
def _is_power_level_content_allowed(self, content, access_rule):
"""Denies a power level events that sets 'users_default' to a non-0 value, and
sets the PL of a user that'd be blacklisted in restricted mode to a non-default
value.
Args:
content (dict[]): The content of the m.room.power_levels event to check.
access_rule (str): The access rule in place in this room.
Returns:
bool, True if the event can be allowed, False otherwise.
"""
# Blacklisted servers shouldn't have any restriction in "direct" mode, so always
# accept the event.
if access_rule == ACCESS_RULE_DIRECT:
return True
# If users_default is explicitly set to a non-0 value, deny the event.
users_default = content.get('users_default', 0)
if users_default:
return False
users = content.get('users', {})
for user_id, power_level in users.items():
server_name = get_domain_from_id(user_id)
# Check the domain against the blacklist. If found, and the PL isn't 0, deny
# the event.
if (
server_name in self.domains_forbidden_when_restricted
and power_level != 0
):
return False
return True
@staticmethod
def _get_rule_from_state(state_events):
"""Extract the rule to be applied from the given set of state events.