Merge branch 'develop' of github.com:matrix-org/synapse into erikj/replication_noop

This commit is contained in:
Erik Johnston 2016-10-11 14:08:29 +01:00
commit 3061dac53e
28 changed files with 605 additions and 337 deletions

View file

@ -37,7 +37,7 @@ class ApplicationServiceStore(SQLBaseStore):
)
def get_app_services(self):
return defer.succeed(self.services_cache)
return self.services_cache
def get_app_service_by_user_id(self, user_id):
"""Retrieve an application service from their user ID.
@ -54,8 +54,8 @@ class ApplicationServiceStore(SQLBaseStore):
"""
for service in self.services_cache:
if service.sender == user_id:
return defer.succeed(service)
return defer.succeed(None)
return service
return None
def get_app_service_by_token(self, token):
"""Get the application service with the given appservice token.
@ -67,8 +67,8 @@ class ApplicationServiceStore(SQLBaseStore):
"""
for service in self.services_cache:
if service.token == token:
return defer.succeed(service)
return defer.succeed(None)
return service
return None
def get_app_service_rooms(self, service):
"""Get a list of RoomsForUser for this application service.
@ -163,7 +163,7 @@ class ApplicationServiceTransactionStore(SQLBaseStore):
["as_id"]
)
# NB: This assumes this class is linked with ApplicationServiceStore
as_list = yield self.get_app_services()
as_list = self.get_app_services()
services = []
for res in results:

View file

@ -1355,39 +1355,53 @@ class EventsStore(SQLBaseStore):
min_stream_id = rows[-1][0]
event_ids = [row[1] for row in rows]
events = self._get_events_txn(txn, event_ids)
rows_to_update = []
rows = []
for event in events:
try:
event_id = event.event_id
origin_server_ts = event.origin_server_ts
except (KeyError, AttributeError):
# If the event is missing a necessary field then
# skip over it.
continue
chunks = [
event_ids[i:i + 100]
for i in xrange(0, len(event_ids), 100)
]
for chunk in chunks:
ev_rows = self._simple_select_many_txn(
txn,
table="event_json",
column="event_id",
iterable=chunk,
retcols=["event_id", "json"],
keyvalues={},
)
rows.append((origin_server_ts, event_id))
for row in ev_rows:
event_id = row["event_id"]
event_json = json.loads(row["json"])
try:
origin_server_ts = event_json["origin_server_ts"]
except (KeyError, AttributeError):
# If the event is missing a necessary field then
# skip over it.
continue
rows_to_update.append((origin_server_ts, event_id))
sql = (
"UPDATE events SET origin_server_ts = ? WHERE event_id = ?"
)
for index in range(0, len(rows), INSERT_CLUMP_SIZE):
clump = rows[index:index + INSERT_CLUMP_SIZE]
for index in range(0, len(rows_to_update), INSERT_CLUMP_SIZE):
clump = rows_to_update[index:index + INSERT_CLUMP_SIZE]
txn.executemany(sql, clump)
progress = {
"target_min_stream_id_inclusive": target_min_stream_id,
"max_stream_id_exclusive": min_stream_id,
"rows_inserted": rows_inserted + len(rows)
"rows_inserted": rows_inserted + len(rows_to_update)
}
self._background_update_progress_txn(
txn, self.EVENT_ORIGIN_SERVER_TS_NAME, progress
)
return len(rows)
return len(rows_to_update)
result = yield self.runInteraction(
self.EVENT_ORIGIN_SERVER_TS_NAME, reindex_search_txn

View file

@ -307,6 +307,9 @@ class StateStore(SQLBaseStore):
def _get_state_groups_from_groups_txn(self, txn, groups, types=None):
results = {group: {} for group in groups}
if types is not None:
types = list(set(types)) # deduplicate types list
if isinstance(self.database_engine, PostgresEngine):
# Temporarily disable sequential scans in this transaction. This is
# a temporary hack until we can add the right indices in
@ -375,10 +378,35 @@ class StateStore(SQLBaseStore):
# We don't use WITH RECURSIVE on sqlite3 as there are distributions
# that ship with an sqlite3 version that doesn't support it (e.g. wheezy)
for group in groups:
group_tree = [group]
next_group = group
while next_group:
# We did this before by getting the list of group ids, and
# then passing that list to sqlite to get latest event for
# each (type, state_key). However, that was terribly slow
# without the right indicies (which we can't add until
# after we finish deduping state, which requires this func)
args = [next_group]
if types:
args.extend(i for typ in types for i in typ)
txn.execute(
"SELECT type, state_key, event_id FROM state_groups_state"
" WHERE state_group = ? %s" % (where_clause,),
args
)
rows = txn.fetchall()
results[group].update({
(typ, state_key): event_id
for typ, state_key, event_id in rows
if (typ, state_key) not in results[group]
})
# If the lengths match then we must have all the types,
# so no need to go walk further down the tree.
if types is not None and len(results[group]) == len(types):
break
next_group = self._simple_select_one_onecol_txn(
txn,
table="state_group_edges",
@ -386,28 +414,6 @@ class StateStore(SQLBaseStore):
retcol="prev_state_group",
allow_none=True,
)
if next_group:
group_tree.append(next_group)
sql = ("""
SELECT type, state_key, event_id FROM state_groups_state
INNER JOIN (
SELECT type, state_key, max(state_group) as state_group
FROM state_groups_state
WHERE state_group IN (%s) %s
GROUP BY type, state_key
) USING (type, state_key, state_group);
""") % (",".join("?" for _ in group_tree), where_clause,)
args = list(group_tree)
if types is not None:
args.extend([i for typ in types for i in typ])
txn.execute(sql, args)
rows = self.cursor_to_dict(txn)
for row in rows:
key = (row["type"], row["state_key"])
results[group][key] = row["event_id"]
return results