Merge remote-tracking branch 'origin/develop' into test-sqlite-memory

Conflicts:
	synapse/storage/pdu.py
This commit is contained in:
Paul "LeoNerd" Evans 2014-09-12 17:20:06 +01:00
commit 1c51c8ab7d
32 changed files with 7358 additions and 3859 deletions

View file

@ -17,6 +17,7 @@ import logging
from twisted.internet import defer
from synapse.api.errors import StoreError
from synapse.util.logutils import log_function
import collections
import copy
@ -131,6 +132,7 @@ class SQLBaseStore(object):
self._simple_insert_txn, table, values, or_replace=or_replace
)
@log_function
def _simple_insert_txn(self, txn, table, values, or_replace=False):
sql = "%s INTO %s (%s) VALUES(%s)" % (
("INSERT OR REPLACE" if or_replace else "INSERT"),
@ -138,6 +140,12 @@ class SQLBaseStore(object):
", ".join(k for k in values),
", ".join("?" for k in values)
)
logger.debug(
"[SQL] %s Args=%s Func=%s",
sql, values.values(),
)
txn.execute(sql, values.values())
return txn.lastrowid

View file

@ -17,6 +17,7 @@ from twisted.internet import defer
from ._base import SQLBaseStore, Table, JoinHelper
from synapse.federation.units import Pdu
from synapse.util.logutils import log_function
from collections import namedtuple
@ -516,7 +517,7 @@ class StatePduStore(SQLBaseStore):
if not current:
logger.debug("get_unresolved_state_tree No current state.")
return return_value
return (return_value, None)
return_value.current_branch.append(current)
@ -625,53 +626,6 @@ class StatePduStore(SQLBaseStore):
return result
def get_next_missing_pdu(self, new_pdu):
"""When we get a new state pdu we need to check whether we need to do
any conflict resolution, if we do then we need to check if we need
to go back and request some more state pdus that we haven't seen yet.
Args:
txn
new_pdu
Returns:
PduIdTuple: A pdu that we are missing, or None if we have all the
pdus required to do the conflict resolution.
"""
return self.runInteraction(
self._get_next_missing_pdu, new_pdu
)
def _get_next_missing_pdu(self, txn, new_pdu):
logger.debug(
"get_next_missing_pdu %s %s",
new_pdu.pdu_id, new_pdu.origin
)
current = self._get_current_interaction(
txn,
new_pdu.context, new_pdu.pdu_type, new_pdu.state_key
)
if (not current or not current.prev_state_id
or not current.prev_state_origin):
return None
# Oh look, it's a straight clobber, so wooooo almost no-op.
if (new_pdu.prev_state_id == current.pdu_id
and new_pdu.prev_state_origin == current.origin):
return None
enum_branches = self._enumerate_state_branches(txn, new_pdu, current)
for branch, prev_state, state in enum_branches:
if not state:
return PduIdTuple(
prev_state.prev_state_id,
prev_state.prev_state_origin
)
return None
def handle_new_state(self, new_pdu):
"""Actually perform conflict resolution on the new_pdu on the
assumption we have all the pdus required to perform it.
@ -755,24 +709,11 @@ class StatePduStore(SQLBaseStore):
return is_current
@classmethod
@log_function
def _enumerate_state_branches(cls, txn, pdu_a, pdu_b):
def _enumerate_state_branches(self, txn, pdu_a, pdu_b):
branch_a = pdu_a
branch_b = pdu_b
get_query = (
"SELECT %(fields)s FROM %(pdus)s as p "
"LEFT JOIN %(state)s as s "
"ON p.pdu_id = s.pdu_id AND p.origin = s.origin "
"WHERE p.pdu_id = ? AND p.origin = ? "
) % {
"fields": _pdu_state_joiner.get_fields(
PdusTable="p", StatePdusTable="s"),
"pdus": PdusTable.table_name,
"state": StatePdusTable.table_name,
}
while True:
if (branch_a.pdu_id == branch_b.pdu_id
and branch_a.origin == branch_b.origin):
@ -804,13 +745,12 @@ class StatePduStore(SQLBaseStore):
branch_a.prev_state_origin
)
logger.debug("getting branch_a prev %s", pdu_tuple)
txn.execute(get_query, pdu_tuple)
prev_branch = branch_a
res = txn.fetchone()
branch_a = PduEntry(*res) if res else None
logger.debug("getting branch_a prev %s", pdu_tuple)
branch_a = self._get_pdu_tuple(txn, *pdu_tuple)
if branch_a:
branch_a = Pdu.from_pdu_tuple(branch_a)
logger.debug("branch_a=%s", branch_a)
@ -823,14 +763,13 @@ class StatePduStore(SQLBaseStore):
branch_b.prev_state_id,
branch_b.prev_state_origin
)
txn.execute(get_query, pdu_tuple)
logger.debug("getting branch_b prev %s", pdu_tuple)
prev_branch = branch_b
res = txn.fetchone()
branch_b = PduEntry(*res) if res else None
logger.debug("getting branch_b prev %s", pdu_tuple)
branch_b = self._get_pdu_tuple(txn, *pdu_tuple)
if branch_b:
branch_b = Pdu.from_pdu_tuple(branch_b)
logger.debug("branch_b=%s", branch_b)

View file

@ -18,6 +18,7 @@ from twisted.internet import defer
from ._base import SQLBaseStore
from synapse.api.constants import Membership
from synapse.util.logutils import log_function
import logging
@ -29,8 +30,18 @@ class RoomMemberStore(SQLBaseStore):
def _store_room_member_txn(self, txn, event):
"""Store a room member in the database.
"""
target_user_id = event.state_key
domain = self.hs.parse_userid(target_user_id).domain
try:
target_user_id = event.state_key
domain = self.hs.parse_userid(target_user_id).domain
except:
logger.exception("Failed to parse target_user_id=%s", target_user_id)
raise
logger.debug(
"_store_room_member_txn: target_user_id=%s, membership=%s",
target_user_id,
event.membership,
)
self._simple_insert_txn(
txn,
@ -51,12 +62,30 @@ class RoomMemberStore(SQLBaseStore):
"VALUES (?, ?)"
)
txn.execute(sql, (event.room_id, domain))
else:
sql = (
"DELETE FROM room_hosts WHERE room_id = ? AND host = ?"
elif event.membership != Membership.INVITE:
# Check if this was the last person to have left.
member_events = self._get_members_query_txn(
txn,
where_clause="c.room_id = ? AND m.membership = ?",
where_values=(event.room_id, Membership.JOIN,)
)
txn.execute(sql, (event.room_id, domain))
joined_domains = set()
for e in member_events:
try:
joined_domains.add(
self.hs.parse_userid(e.state_key).domain
)
except:
# FIXME: How do we deal with invalid user ids in the db?
logger.exception("Invalid user_id: %s", event.state_key)
if domain not in joined_domains:
sql = (
"DELETE FROM room_hosts WHERE room_id = ? AND host = ?"
)
txn.execute(sql, (event.room_id, domain))
@defer.inlineCallbacks
def get_room_member(self, user_id, room_id):
@ -146,8 +175,13 @@ class RoomMemberStore(SQLBaseStore):
vals = where_dict.values()
return self._get_members_query(clause, vals)
@defer.inlineCallbacks
def _get_members_query(self, where_clause, where_values):
return self._db_pool.runInteraction(
self._get_members_query_txn,
where_clause, where_values
)
def _get_members_query_txn(self, txn, where_clause, where_values):
sql = (
"SELECT e.* FROM events as e "
"INNER JOIN room_memberships as m "
@ -157,12 +191,11 @@ class RoomMemberStore(SQLBaseStore):
"WHERE %s "
) % (where_clause,)
rows = yield self._execute_and_decode(sql, *where_values)
txn.execute(sql, where_values)
rows = self.cursor_to_dict(txn)
# logger.debug("_get_members_query Got rows %s", rows)
results = yield self._parse_events(rows)
defer.returnValue(results)
results = self._parse_events_txn(txn, rows)
return results
@defer.inlineCallbacks
def user_rooms_intersect(self, user_id_list):