synapse-product/synapse/appservice/__init__.py

131 lines
4.7 KiB
Python
Raw Normal View History

# -*- coding: utf-8 -*-
# Copyright 2015 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from synapse.api.constants import EventTypes
import logging
import re
logger = logging.getLogger(__name__)
class ApplicationService(object):
"""Defines an application service. This definition is mostly what is
provided to the /register AS API.
Provides methods to check if this service is "interested" in events.
"""
NS_USERS = "users"
NS_ALIASES = "aliases"
NS_ROOMS = "rooms"
# The ordering here is important as it is used to map database values (which
# are stored as ints representing the position in this list) to namespace
# values.
NS_LIST = [NS_USERS, NS_ALIASES, NS_ROOMS]
def __init__(self, token, url=None, namespaces=None, txn_id=None):
self.token = token
self.url = url
self.namespaces = self._check_namespaces(namespaces)
self.txn_id = None
def _check_namespaces(self, namespaces):
# Sanity check that it is of the form:
# {
# users: ["regex",...],
# aliases: ["regex",...],
# rooms: ["regex",...],
# }
if not namespaces:
return None
for ns in ApplicationService.NS_LIST:
if type(namespaces[ns]) != list:
raise ValueError("Bad namespace value for '%s'", ns)
for regex in namespaces[ns]:
if not isinstance(regex, basestring):
raise ValueError("Expected string regex for ns '%s'", ns)
return namespaces
def _matches_regex(self, test_string, namespace_key):
if not isinstance(test_string, basestring):
logger.warning(
"Expected a string to test regex against, but got %s",
test_string
)
return False
for regex in self.namespaces[namespace_key]:
if re.match(regex, test_string):
return True
return False
def _matches_user(self, event):
if (hasattr(event, "sender") and
self._matches_regex(
event.sender, ApplicationService.NS_USERS)):
return True
# also check m.room.member state key
if (hasattr(event, "type") and event.type == EventTypes.Member
and hasattr(event, "state_key")
and self._matches_regex(
event.state_key, ApplicationService.NS_USERS)):
return True
return False
def _matches_room_id(self, event):
if hasattr(event, "room_id"):
return self._matches_regex(
event.room_id, ApplicationService.NS_ROOMS
)
return False
def _matches_aliases(self, event, alias_list):
for alias in alias_list:
if self._matches_regex(alias, ApplicationService.NS_ALIASES):
return True
return False
def is_interested(self, event, restrict_to=None, aliases_for_event=None):
"""Check if this service is interested in this event.
Args:
event(Event): The event to check.
restrict_to(str): The namespace to restrict regex tests to.
aliases_for_event(list): A list of all the known room aliases for
this event.
Returns:
bool: True if this service would like to know about this event.
"""
if aliases_for_event is None:
aliases_for_event = []
if restrict_to and restrict_to not in ApplicationService.NS_LIST:
# this is a programming error, so fail early and raise a general
# exception
raise Exception("Unexpected restrict_to value: %s". restrict_to)
if not restrict_to:
return (self._matches_user(event)
or self._matches_aliases(event, aliases_for_event)
or self._matches_room_id(event))
elif restrict_to == ApplicationService.NS_ALIASES:
return self._matches_aliases(event, aliases_for_event)
elif restrict_to == ApplicationService.NS_ROOMS:
return self._matches_room_id(event)
elif restrict_to == ApplicationService.NS_USERS:
return self._matches_user(event)
def __str__(self):
return "ApplicationService: %s" % (self.__dict__,)