Merge branch 'develop' into application-services-txn-reliability

Conflicts:
	synapse/storage/__init__.py
This commit is contained in:
Kegan Dougal 2015-03-26 10:07:59 +00:00
commit 4edcbcee3b
45 changed files with 1251 additions and 731 deletions

View file

@ -14,15 +14,12 @@
# limitations under the License.
from twisted.internet import defer
from synapse.util.logutils import log_function
from synapse.api.constants import EventTypes
from .appservice import (
ApplicationServiceStore, ApplicationServiceTransactionStore
)
from ._base import Cache
from .directory import DirectoryStore
from .feedback import FeedbackStore
from .events import EventsStore
from .presence import PresenceStore
from .profile import ProfileStore
from .registration import RegistrationStore
@ -41,11 +38,6 @@ from .state import StateStore
from .signatures import SignatureStore
from .filtering import FilteringStore
from syutil.base64util import decode_base64
from syutil.jsonutil import encode_canonical_json
from synapse.crypto.event_signing import compute_event_reference_hash
import fnmatch
import imp
@ -63,16 +55,14 @@ SCHEMA_VERSION = 15
dir_path = os.path.abspath(os.path.dirname(__file__))
class _RollbackButIsFineException(Exception):
""" This exception is used to rollback a transaction without implying
something went wrong.
"""
pass
# Number of msec of granularity to store the user IP 'last seen' time. Smaller
# times give more inserts into the database even for readonly API hits
# 120 seconds == 2 minutes
LAST_SEEN_GRANULARITY = 120*1000
class DataStore(RoomMemberStore, RoomStore,
RegistrationStore, StreamStore, ProfileStore, FeedbackStore,
RegistrationStore, StreamStore, ProfileStore,
PresenceStore, TransactionStore,
DirectoryStore, KeyStore, StateStore, SignatureStore,
ApplicationServiceStore,
@ -83,6 +73,7 @@ class DataStore(RoomMemberStore, RoomStore,
PusherStore,
PushRuleStore,
ApplicationServiceTransactionStore,
EventsStore,
):
def __init__(self, hs):
@ -92,424 +83,28 @@ class DataStore(RoomMemberStore, RoomStore,
self.min_token_deferred = self._get_min_token()
self.min_token = None
@defer.inlineCallbacks
@log_function
def persist_event(self, event, context, backfilled=False,
is_new_state=True, current_state=None):
stream_ordering = None
if backfilled:
if not self.min_token_deferred.called:
yield self.min_token_deferred
self.min_token -= 1
stream_ordering = self.min_token
try:
yield self.runInteraction(
"persist_event",
self._persist_event_txn,
event=event,
context=context,
backfilled=backfilled,
stream_ordering=stream_ordering,
is_new_state=is_new_state,
current_state=current_state,
)
except _RollbackButIsFineException:
pass
@defer.inlineCallbacks
def get_event(self, event_id, check_redacted=True,
get_prev_content=False, allow_rejected=False,
allow_none=False):
"""Get an event from the database by event_id.
Args:
event_id (str): The event_id of the event to fetch
check_redacted (bool): If True, check if event has been redacted
and redact it.
get_prev_content (bool): If True and event is a state event,
include the previous states content in the unsigned field.
allow_rejected (bool): If True return rejected events.
allow_none (bool): If True, return None if no event found, if
False throw an exception.
Returns:
Deferred : A FrozenEvent.
"""
event = yield self.runInteraction(
"get_event", self._get_event_txn,
event_id,
check_redacted=check_redacted,
get_prev_content=get_prev_content,
allow_rejected=allow_rejected,
)
if not event and not allow_none:
raise RuntimeError("Could not find event %s" % (event_id,))
defer.returnValue(event)
@log_function
def _persist_event_txn(self, txn, event, context, backfilled,
stream_ordering=None, is_new_state=True,
current_state=None):
# Remove the any existing cache entries for the event_id
self._get_event_cache.pop(event.event_id)
# We purposefully do this first since if we include a `current_state`
# key, we *want* to update the `current_state_events` table
if current_state:
txn.execute(
"DELETE FROM current_state_events WHERE room_id = ?",
(event.room_id,)
)
for s in current_state:
self._simple_insert_txn(
txn,
"current_state_events",
{
"event_id": s.event_id,
"room_id": s.room_id,
"type": s.type,
"state_key": s.state_key,
},
or_replace=True,
)
if event.is_state() and is_new_state:
if not backfilled and not context.rejected:
self._simple_insert_txn(
txn,
table="state_forward_extremities",
values={
"event_id": event.event_id,
"room_id": event.room_id,
"type": event.type,
"state_key": event.state_key,
},
or_replace=True,
)
for prev_state_id, _ in event.prev_state:
self._simple_delete_txn(
txn,
table="state_forward_extremities",
keyvalues={
"event_id": prev_state_id,
}
)
outlier = event.internal_metadata.is_outlier()
if not outlier:
self._store_state_groups_txn(txn, event, context)
self._update_min_depth_for_room_txn(
txn,
event.room_id,
event.depth
)
self._handle_prev_events(
txn,
outlier=outlier,
event_id=event.event_id,
prev_events=event.prev_events,
room_id=event.room_id,
)
have_persisted = self._simple_select_one_onecol_txn(
txn,
table="event_json",
keyvalues={"event_id": event.event_id},
retcol="event_id",
allow_none=True,
)
metadata_json = encode_canonical_json(
event.internal_metadata.get_dict()
)
# If we have already persisted this event, we don't need to do any
# more processing.
# The processing above must be done on every call to persist event,
# since they might not have happened on previous calls. For example,
# if we are persisting an event that we had persisted as an outlier,
# but is no longer one.
if have_persisted:
if not outlier:
sql = (
"UPDATE event_json SET internal_metadata = ?"
" WHERE event_id = ?"
)
txn.execute(
sql,
(metadata_json.decode("UTF-8"), event.event_id,)
)
sql = (
"UPDATE events SET outlier = 0"
" WHERE event_id = ?"
)
txn.execute(
sql,
(event.event_id,)
)
return
if event.type == EventTypes.Member:
self._store_room_member_txn(txn, event)
elif event.type == EventTypes.Feedback:
self._store_feedback_txn(txn, event)
elif event.type == EventTypes.Name:
self._store_room_name_txn(txn, event)
elif event.type == EventTypes.Topic:
self._store_room_topic_txn(txn, event)
elif event.type == EventTypes.Redaction:
self._store_redaction(txn, event)
event_dict = {
k: v
for k, v in event.get_dict().items()
if k not in [
"redacted",
"redacted_because",
]
}
self._simple_insert_txn(
txn,
table="event_json",
values={
"event_id": event.event_id,
"room_id": event.room_id,
"internal_metadata": metadata_json.decode("UTF-8"),
"json": encode_canonical_json(event_dict).decode("UTF-8"),
},
or_replace=True,
)
content = encode_canonical_json(
event.content
).decode("UTF-8")
vals = {
"topological_ordering": event.depth,
"event_id": event.event_id,
"type": event.type,
"room_id": event.room_id,
"content": content,
"processed": True,
"outlier": outlier,
"depth": event.depth,
}
if stream_ordering is not None:
vals["stream_ordering"] = stream_ordering
unrec = {
k: v
for k, v in event.get_dict().items()
if k not in vals.keys() and k not in [
"redacted",
"redacted_because",
"signatures",
"hashes",
"prev_events",
]
}
vals["unrecognized_keys"] = encode_canonical_json(
unrec
).decode("UTF-8")
try:
self._simple_insert_txn(
txn,
"events",
vals,
or_replace=(not outlier),
or_ignore=bool(outlier),
)
except:
logger.warn(
"Failed to persist, probably duplicate: %s",
event.event_id,
exc_info=True,
)
raise _RollbackButIsFineException("_persist_event")
if context.rejected:
self._store_rejections_txn(txn, event.event_id, context.rejected)
if event.is_state():
vals = {
"event_id": event.event_id,
"room_id": event.room_id,
"type": event.type,
"state_key": event.state_key,
}
# TODO: How does this work with backfilling?
if hasattr(event, "replaces_state"):
vals["prev_state"] = event.replaces_state
self._simple_insert_txn(
txn,
"state_events",
vals,
or_replace=True,
)
if is_new_state and not context.rejected:
self._simple_insert_txn(
txn,
"current_state_events",
{
"event_id": event.event_id,
"room_id": event.room_id,
"type": event.type,
"state_key": event.state_key,
},
or_replace=True,
)
for e_id, h in event.prev_state:
self._simple_insert_txn(
txn,
table="event_edges",
values={
"event_id": event.event_id,
"prev_event_id": e_id,
"room_id": event.room_id,
"is_state": 1,
},
or_ignore=True,
)
for hash_alg, hash_base64 in event.hashes.items():
hash_bytes = decode_base64(hash_base64)
self._store_event_content_hash_txn(
txn, event.event_id, hash_alg, hash_bytes,
)
for prev_event_id, prev_hashes in event.prev_events:
for alg, hash_base64 in prev_hashes.items():
hash_bytes = decode_base64(hash_base64)
self._store_prev_event_hash_txn(
txn, event.event_id, prev_event_id, alg, hash_bytes
)
for auth_id, _ in event.auth_events:
self._simple_insert_txn(
txn,
table="event_auth",
values={
"event_id": event.event_id,
"room_id": event.room_id,
"auth_id": auth_id,
},
or_ignore=True,
)
(ref_alg, ref_hash_bytes) = compute_event_reference_hash(event)
self._store_event_reference_hash_txn(
txn, event.event_id, ref_alg, ref_hash_bytes
)
def _store_redaction(self, txn, event):
# invalidate the cache for the redacted event
self._get_event_cache.pop(event.redacts)
txn.execute(
"INSERT OR IGNORE INTO redactions "
"(event_id, redacts) VALUES (?,?)",
(event.event_id, event.redacts)
self.client_ip_last_seen = Cache(
name="client_ip_last_seen",
keylen=4,
)
@defer.inlineCallbacks
def get_current_state(self, room_id, event_type=None, state_key=""):
del_sql = (
"SELECT event_id FROM redactions WHERE redacts = e.event_id "
"LIMIT 1"
)
sql = (
"SELECT e.*, (%(redacted)s) AS redacted FROM events as e "
"INNER JOIN current_state_events as c ON e.event_id = c.event_id "
"INNER JOIN state_events as s ON e.event_id = s.event_id "
"WHERE c.room_id = ? "
) % {
"redacted": del_sql,
}
if event_type and state_key is not None:
sql += " AND s.type = ? AND s.state_key = ? "
args = (room_id, event_type, state_key)
elif event_type:
sql += " AND s.type = ?"
args = (room_id, event_type)
else:
args = (room_id, )
results = yield self._execute_and_decode("get_current_state", sql, *args)
events = yield self._parse_events(results)
defer.returnValue(events)
@defer.inlineCallbacks
def get_room_name_and_aliases(self, room_id):
del_sql = (
"SELECT event_id FROM redactions WHERE redacts = e.event_id "
"LIMIT 1"
)
sql = (
"SELECT e.*, (%(redacted)s) AS redacted FROM events as e "
"INNER JOIN current_state_events as c ON e.event_id = c.event_id "
"INNER JOIN state_events as s ON e.event_id = s.event_id "
"WHERE c.room_id = ? "
) % {
"redacted": del_sql,
}
sql += " AND ((s.type = 'm.room.name' AND s.state_key = '')"
sql += " OR s.type = 'm.room.aliases')"
args = (room_id,)
results = yield self._execute_and_decode("get_current_state", sql, *args)
events = yield self._parse_events(results)
name = None
aliases = []
for e in events:
if e.type == 'm.room.name':
if 'name' in e.content:
name = e.content['name']
elif e.type == 'm.room.aliases':
if 'aliases' in e.content:
aliases.extend(e.content['aliases'])
defer.returnValue((name, aliases))
@defer.inlineCallbacks
def _get_min_token(self):
row = yield self._execute(
"_get_min_token", None, "SELECT MIN(stream_ordering) FROM events"
)
self.min_token = row[0][0] if row and row[0] and row[0][0] else -1
self.min_token = min(self.min_token, -1)
logger.debug("min_token is: %s", self.min_token)
defer.returnValue(self.min_token)
def insert_client_ip(self, user, access_token, device_id, ip, user_agent):
return self._simple_insert(
now = int(self._clock.time_msec())
key = (user.to_string(), access_token, device_id, ip)
try:
last_seen = self.client_ip_last_seen.get(*key)
except KeyError:
last_seen = None
# Rate-limited inserts
if last_seen is not None and (now - last_seen) < LAST_SEEN_GRANULARITY:
defer.returnValue(None)
self.client_ip_last_seen.prefill(*key + (now,))
yield self._simple_insert(
"user_ips",
{
"user": user.to_string(),
@ -517,8 +112,9 @@ class DataStore(RoomMemberStore, RoomStore,
"device_id": device_id,
"ip": ip,
"user_agent": user_agent,
"last_seen": int(self._clock.time_msec()),
}
"last_seen": now,
},
desc="insert_client_ip",
)
def get_user_ip_and_agents(self, user):
@ -528,38 +124,7 @@ class DataStore(RoomMemberStore, RoomStore,
retcols=[
"device_id", "access_token", "ip", "user_agent", "last_seen"
],
)
def have_events(self, event_ids):
"""Given a list of event ids, check if we have already processed them.
Returns:
dict: Has an entry for each event id we already have seen. Maps to
the rejected reason string if we rejected the event, else maps to
None.
"""
if not event_ids:
return defer.succeed({})
def f(txn):
sql = (
"SELECT e.event_id, reason FROM events as e "
"LEFT JOIN rejections as r ON e.event_id = r.event_id "
"WHERE e.event_id = ?"
)
res = {}
for event_id in event_ids:
txn.execute(sql, (event_id,))
row = txn.fetchone()
if row:
_, rejected = row
res[event_id] = rejected
return res
return self.runInteraction(
"have_events", f,
desc="get_user_ip_and_agents",
)