mirror of
https://git.anonymousland.org/anonymousland/synapse-product.git
synced 2024-12-20 19:54:19 -05:00
Merge pull request #6089 from matrix-org/erikj/cleanup_user_ips
Move last seen info into devices table
This commit is contained in:
commit
9614d3c9d1
1
changelog.d/6089.misc
Normal file
1
changelog.d/6089.misc
Normal file
@ -0,0 +1 @@
|
|||||||
|
Move last seen info into devices table.
|
@ -85,6 +85,11 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
|
|||||||
"user_ips_drop_nonunique_index", self._remove_user_ip_nonunique
|
"user_ips_drop_nonunique_index", self._remove_user_ip_nonunique
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Update the last seen info in devices.
|
||||||
|
self.register_background_update_handler(
|
||||||
|
"devices_last_seen", self._devices_last_seen_update
|
||||||
|
)
|
||||||
|
|
||||||
# (user_id, access_token, ip,) -> (user_agent, device_id, last_seen)
|
# (user_id, access_token, ip,) -> (user_agent, device_id, last_seen)
|
||||||
self._batch_row_update = {}
|
self._batch_row_update = {}
|
||||||
|
|
||||||
@ -354,6 +359,21 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
|
|||||||
},
|
},
|
||||||
lock=False,
|
lock=False,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Technically an access token might not be associated with
|
||||||
|
# a device so we need to check.
|
||||||
|
if device_id:
|
||||||
|
self._simple_upsert_txn(
|
||||||
|
txn,
|
||||||
|
table="devices",
|
||||||
|
keyvalues={"user_id": user_id, "device_id": device_id},
|
||||||
|
values={
|
||||||
|
"user_agent": user_agent,
|
||||||
|
"last_seen": last_seen,
|
||||||
|
"ip": ip,
|
||||||
|
},
|
||||||
|
lock=False,
|
||||||
|
)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
# Failed to upsert, log and continue
|
# Failed to upsert, log and continue
|
||||||
logger.error("Failed to insert client IP %r: %r", entry, e)
|
logger.error("Failed to insert client IP %r: %r", entry, e)
|
||||||
@ -372,19 +392,14 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
|
|||||||
keys giving the column names
|
keys giving the column names
|
||||||
"""
|
"""
|
||||||
|
|
||||||
res = yield self.runInteraction(
|
keyvalues = {"user_id": user_id}
|
||||||
"get_last_client_ip_by_device",
|
if device_id is not None:
|
||||||
self._get_last_client_ip_by_device_txn,
|
keyvalues["device_id"] = device_id
|
||||||
user_id,
|
|
||||||
device_id,
|
res = yield self._simple_select_list(
|
||||||
retcols=(
|
table="devices",
|
||||||
"user_id",
|
keyvalues=keyvalues,
|
||||||
"access_token",
|
retcols=("user_id", "ip", "user_agent", "device_id", "last_seen"),
|
||||||
"ip",
|
|
||||||
"user_agent",
|
|
||||||
"device_id",
|
|
||||||
"last_seen",
|
|
||||||
),
|
|
||||||
)
|
)
|
||||||
|
|
||||||
ret = {(d["user_id"], d["device_id"]): d for d in res}
|
ret = {(d["user_id"], d["device_id"]): d for d in res}
|
||||||
@ -403,42 +418,6 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
|
|||||||
}
|
}
|
||||||
return ret
|
return ret
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def _get_last_client_ip_by_device_txn(cls, txn, user_id, device_id, retcols):
|
|
||||||
where_clauses = []
|
|
||||||
bindings = []
|
|
||||||
if device_id is None:
|
|
||||||
where_clauses.append("user_id = ?")
|
|
||||||
bindings.extend((user_id,))
|
|
||||||
else:
|
|
||||||
where_clauses.append("(user_id = ? AND device_id = ?)")
|
|
||||||
bindings.extend((user_id, device_id))
|
|
||||||
|
|
||||||
if not where_clauses:
|
|
||||||
return []
|
|
||||||
|
|
||||||
inner_select = (
|
|
||||||
"SELECT MAX(last_seen) mls, user_id, device_id FROM user_ips "
|
|
||||||
"WHERE %(where)s "
|
|
||||||
"GROUP BY user_id, device_id"
|
|
||||||
) % {"where": " OR ".join(where_clauses)}
|
|
||||||
|
|
||||||
sql = (
|
|
||||||
"SELECT %(retcols)s FROM user_ips "
|
|
||||||
"JOIN (%(inner_select)s) ips ON"
|
|
||||||
" user_ips.last_seen = ips.mls AND"
|
|
||||||
" user_ips.user_id = ips.user_id AND"
|
|
||||||
" (user_ips.device_id = ips.device_id OR"
|
|
||||||
" (user_ips.device_id IS NULL AND ips.device_id IS NULL)"
|
|
||||||
" )"
|
|
||||||
) % {
|
|
||||||
"retcols": ",".join("user_ips." + c for c in retcols),
|
|
||||||
"inner_select": inner_select,
|
|
||||||
}
|
|
||||||
|
|
||||||
txn.execute(sql, bindings)
|
|
||||||
return cls.cursor_to_dict(txn)
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def get_user_ip_and_agents(self, user):
|
def get_user_ip_and_agents(self, user):
|
||||||
user_id = user.to_string()
|
user_id = user.to_string()
|
||||||
@ -470,3 +449,50 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
|
|||||||
}
|
}
|
||||||
for (access_token, ip), (user_agent, last_seen) in iteritems(results)
|
for (access_token, ip), (user_agent, last_seen) in iteritems(results)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def _devices_last_seen_update(self, progress, batch_size):
|
||||||
|
"""Background update to insert last seen info into devices table
|
||||||
|
"""
|
||||||
|
|
||||||
|
last_user_id = progress.get("last_user_id", "")
|
||||||
|
last_device_id = progress.get("last_device_id", "")
|
||||||
|
|
||||||
|
def _devices_last_seen_update_txn(txn):
|
||||||
|
sql = """
|
||||||
|
SELECT u.last_seen, u.ip, u.user_agent, user_id, device_id FROM devices
|
||||||
|
INNER JOIN user_ips AS u USING (user_id, device_id)
|
||||||
|
WHERE user_id > ? OR (user_id = ? AND device_id > ?)
|
||||||
|
ORDER BY user_id ASC, device_id ASC
|
||||||
|
LIMIT ?
|
||||||
|
"""
|
||||||
|
txn.execute(sql, (last_user_id, last_user_id, last_device_id, batch_size))
|
||||||
|
|
||||||
|
rows = txn.fetchall()
|
||||||
|
if not rows:
|
||||||
|
return 0
|
||||||
|
|
||||||
|
sql = """
|
||||||
|
UPDATE devices
|
||||||
|
SET last_seen = ?, ip = ?, user_agent = ?
|
||||||
|
WHERE user_id = ? AND device_id = ?
|
||||||
|
"""
|
||||||
|
txn.execute_batch(sql, rows)
|
||||||
|
|
||||||
|
_, _, _, user_id, device_id = rows[-1]
|
||||||
|
self._background_update_progress_txn(
|
||||||
|
txn,
|
||||||
|
"devices_last_seen",
|
||||||
|
{"last_user_id": user_id, "last_device_id": device_id},
|
||||||
|
)
|
||||||
|
|
||||||
|
return len(rows)
|
||||||
|
|
||||||
|
updated = yield self.runInteraction(
|
||||||
|
"_devices_last_seen_update", _devices_last_seen_update_txn
|
||||||
|
)
|
||||||
|
|
||||||
|
if not updated:
|
||||||
|
yield self._end_background_update("devices_last_seen")
|
||||||
|
|
||||||
|
return updated
|
||||||
|
24
synapse/storage/schema/delta/56/devices_last_seen.sql
Normal file
24
synapse/storage/schema/delta/56/devices_last_seen.sql
Normal file
@ -0,0 +1,24 @@
|
|||||||
|
/* Copyright 2019 Matrix.org Foundation CIC
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
-- Track last seen information for a device in the devices table, rather
|
||||||
|
-- than relying on it being in the user_ips table (which we want to be able
|
||||||
|
-- to purge old entries from)
|
||||||
|
ALTER TABLE devices ADD COLUMN last_seen BIGINT;
|
||||||
|
ALTER TABLE devices ADD COLUMN ip TEXT;
|
||||||
|
ALTER TABLE devices ADD COLUMN user_agent TEXT;
|
||||||
|
|
||||||
|
INSERT INTO background_updates (update_name, progress_json) VALUES
|
||||||
|
('devices_last_seen', '{}');
|
@ -55,7 +55,6 @@ class ClientIpStoreTestCase(unittest.HomeserverTestCase):
|
|||||||
{
|
{
|
||||||
"user_id": user_id,
|
"user_id": user_id,
|
||||||
"device_id": "device_id",
|
"device_id": "device_id",
|
||||||
"access_token": "access_token",
|
|
||||||
"ip": "ip",
|
"ip": "ip",
|
||||||
"user_agent": "user_agent",
|
"user_agent": "user_agent",
|
||||||
"last_seen": 12345678000,
|
"last_seen": 12345678000,
|
||||||
@ -201,6 +200,85 @@ class ClientIpStoreTestCase(unittest.HomeserverTestCase):
|
|||||||
active = self.get_success(self.store.user_last_seen_monthly_active(user_id))
|
active = self.get_success(self.store.user_last_seen_monthly_active(user_id))
|
||||||
self.assertTrue(active)
|
self.assertTrue(active)
|
||||||
|
|
||||||
|
def test_devices_last_seen_bg_update(self):
|
||||||
|
# First make sure we have completed all updates.
|
||||||
|
while not self.get_success(self.store.has_completed_background_updates()):
|
||||||
|
self.get_success(self.store.do_next_background_update(100), by=0.1)
|
||||||
|
|
||||||
|
# Insert a user IP
|
||||||
|
user_id = "@user:id"
|
||||||
|
self.get_success(
|
||||||
|
self.store.insert_client_ip(
|
||||||
|
user_id, "access_token", "ip", "user_agent", "device_id"
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
# Force persisting to disk
|
||||||
|
self.reactor.advance(200)
|
||||||
|
|
||||||
|
# But clear the associated entry in devices table
|
||||||
|
self.get_success(
|
||||||
|
self.store._simple_update(
|
||||||
|
table="devices",
|
||||||
|
keyvalues={"user_id": user_id, "device_id": "device_id"},
|
||||||
|
updatevalues={"last_seen": None, "ip": None, "user_agent": None},
|
||||||
|
desc="test_devices_last_seen_bg_update",
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
# We should now get nulls when querying
|
||||||
|
result = self.get_success(
|
||||||
|
self.store.get_last_client_ip_by_device(user_id, "device_id")
|
||||||
|
)
|
||||||
|
|
||||||
|
r = result[(user_id, "device_id")]
|
||||||
|
self.assertDictContainsSubset(
|
||||||
|
{
|
||||||
|
"user_id": user_id,
|
||||||
|
"device_id": "device_id",
|
||||||
|
"ip": None,
|
||||||
|
"user_agent": None,
|
||||||
|
"last_seen": None,
|
||||||
|
},
|
||||||
|
r,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Register the background update to run again.
|
||||||
|
self.get_success(
|
||||||
|
self.store._simple_insert(
|
||||||
|
table="background_updates",
|
||||||
|
values={
|
||||||
|
"update_name": "devices_last_seen",
|
||||||
|
"progress_json": "{}",
|
||||||
|
"depends_on": None,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
# ... and tell the DataStore that it hasn't finished all updates yet
|
||||||
|
self.store._all_done = False
|
||||||
|
|
||||||
|
# Now let's actually drive the updates to completion
|
||||||
|
while not self.get_success(self.store.has_completed_background_updates()):
|
||||||
|
self.get_success(self.store.do_next_background_update(100), by=0.1)
|
||||||
|
|
||||||
|
# We should now get the correct result again
|
||||||
|
result = self.get_success(
|
||||||
|
self.store.get_last_client_ip_by_device(user_id, "device_id")
|
||||||
|
)
|
||||||
|
|
||||||
|
r = result[(user_id, "device_id")]
|
||||||
|
self.assertDictContainsSubset(
|
||||||
|
{
|
||||||
|
"user_id": user_id,
|
||||||
|
"device_id": "device_id",
|
||||||
|
"ip": "ip",
|
||||||
|
"user_agent": "user_agent",
|
||||||
|
"last_seen": 0,
|
||||||
|
},
|
||||||
|
r,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class ClientIpAuthTestCase(unittest.HomeserverTestCase):
|
class ClientIpAuthTestCase(unittest.HomeserverTestCase):
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user