From c7a96a3afe20a03714dc68fc2c8e75cda16eb026 Mon Sep 17 00:00:00 2001 From: David Teller Date: Tue, 25 Jan 2022 13:19:44 +0100 Subject: [PATCH] Retry requests in case of throttling (#178) * Retry requests in case of throttling Co-authored-by: gnuxie --- mx-tester.yml | 8 +- src/utils.ts | 69 ++++++++++ test/integration/abuseReportTest.ts | 10 +- test/integration/banListTest.ts | 6 +- test/integration/clientHelper.ts | 123 ++++++++++++++---- .../integration/commands/redactCommandTest.ts | 12 +- test/integration/helloTest.ts | 2 +- test/integration/mjolnirSetupUtils.ts | 3 +- test/integration/throttleTest.ts | 74 +++++++++++ test/integration/timelinePaginationTest.ts | 14 +- 10 files changed, 266 insertions(+), 55 deletions(-) create mode 100644 test/integration/throttleTest.ts diff --git a/mx-tester.yml b/mx-tester.yml index fb49066..0a3372c 100644 --- a/mx-tester.yml +++ b/mx-tester.yml @@ -21,11 +21,9 @@ homeserver: # Make manual testing easier enable_registration: true - # Getting rid of throttling. - rc_message: - per_second: 10000 - burst_count: 10000 - + # We remove rc_message so we can test rate limiting, + # but we keep the others because of https://github.com/matrix-org/synapse/issues/11785 + # and we don't want to slow integration tests down. rc_registration: per_second: 10000 burst_count: 10000 diff --git a/src/utils.ts b/src/utils.ts index 4e9dbc1..64ebcc3 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -313,6 +313,74 @@ function patchMatrixClientForConciseExceptions() { isMatrixClientPatchedForConciseExceptions = true; } +const MAX_REQUEST_ATTEMPTS = 15; +const REQUEST_RETRY_BASE_DURATION_MS = 100; + +const TRACE_CONCURRENT_REQUESTS = false; +let numberOfConcurrentRequests = 0; +let isMatrixClientPatchedForRetryWhenThrottled = false; +/** + * Patch instances of MatrixClient to make sure that it retries requests + * in case of throttling. + * + * Note: As of this writing, we do not re-attempt requests that timeout, + * only request that are throttled by the server. The rationale is that, + * in case of DoS, we do not wish to make the situation even worse. + */ +function patchMatrixClientForRetry() { + if (isMatrixClientPatchedForRetryWhenThrottled) { + return; + } + let originalRequestFn = getRequestFn(); + setRequestFn(async (params, cb) => { + let attempt = 1; + numberOfConcurrentRequests += 1; + if (TRACE_CONCURRENT_REQUESTS) { + console.trace("Current number of concurrent requests", numberOfConcurrentRequests); + } + try { + while (true) { + try { + let result: any[] = await new Promise((resolve, reject) => { + originalRequestFn(params, function requestFnWithRetry(err, response, resBody) { + // Note: There is no data race on `attempt` as we `await` before continuing + // to the next iteration of the loop. + if (attempt < MAX_REQUEST_ATTEMPTS && err?.body?.errcode === 'M_LIMIT_EXCEEDED') { + // We need to retry. + reject(err); + } else { + // No need-to-retry error? Lucky us! + // Note that this may very well be an error, just not + // one we need to retry. + resolve([err, response, resBody]); + } + }); + }); + // This is our final result. + // Pass result, whether success or error. + return cb(...result); + } catch (err) { + // Need to retry. + let retryAfterMs = attempt * attempt * REQUEST_RETRY_BASE_DURATION_MS; + if ("retry_after_ms" in err) { + try { + retryAfterMs = Number.parseInt(err.retry_after_ms, 10); + } catch (ex) { + // Use default value. + } + } + LogService.debug("Mjolnir.client", `Waiting ${retryAfterMs}ms before retrying ${params.method} ${params.uri}`); + await new Promise(resolve => setTimeout(resolve, retryAfterMs)); + attempt += 1; + } + } + } finally { + numberOfConcurrentRequests -= 1; + } + }); + isMatrixClientPatchedForRetryWhenThrottled = true; +} + /** * Perform any patching deemed necessary to MatrixClient. */ @@ -324,4 +392,5 @@ export function patchMatrixClient() { // - `patchMatrixClientForRetry` expects that all errors are returned as // errors. patchMatrixClientForConciseExceptions(); + patchMatrixClientForRetry(); } diff --git a/test/integration/abuseReportTest.ts b/test/integration/abuseReportTest.ts index b5b23c0..e8abbc4 100644 --- a/test/integration/abuseReportTest.ts +++ b/test/integration/abuseReportTest.ts @@ -32,8 +32,8 @@ describe("Test: Reporting abuse", async () => { }); // Create a few users and a room. - let goodUser = await newTestUser(false, "reporting-abuse-good-user"); - let badUser = await newTestUser(false, "reporting-abuse-bad-user"); + let goodUser = await newTestUser({ name: { contains: "reporting-abuse-good-user" }}); + let badUser = await newTestUser({ name: { contains: "reporting-abuse-bad-user" }}); let goodUserId = await goodUser.getUserId(); let badUserId = await badUser.getUserId(); @@ -227,13 +227,13 @@ describe("Test: Reporting abuse", async () => { }); // Create a moderator. - let moderatorUser = await newTestUser(false, "reacting-abuse-moderator-user"); + let moderatorUser = await newTestUser({ name: { contains: "reporting-abuse-moderator-user" }}); matrixClient().inviteUser(await moderatorUser.getUserId(), this.mjolnir.managementRoomId); await moderatorUser.joinRoom(this.mjolnir.managementRoomId); // Create a few users and a room. - let goodUser = await newTestUser(false, "reacting-abuse-good-user"); - let badUser = await newTestUser(false, "reacting-abuse-bad-user"); + let goodUser = await newTestUser({ name: { contains: "reacting-abuse-good-user" }}); + let badUser = await newTestUser({ name: { contains: "reacting-abuse-bad-user" }}); let goodUserId = await goodUser.getUserId(); let badUserId = await badUser.getUserId(); diff --git a/test/integration/banListTest.ts b/test/integration/banListTest.ts index 4eb6476..c505daa 100644 --- a/test/integration/banListTest.ts +++ b/test/integration/banListTest.ts @@ -26,7 +26,7 @@ describe("Test: Updating the BanList", function () { it("Calculates what has changed correctly.", async function () { this.timeout(10000); const mjolnir = config.RUNTIME.client! - const moderator = await newTestUser(false, "moderator"); + const moderator = await newTestUser({ name: { contains: "moderator" }}); const banListId = await mjolnir.createRoom({ invite: [await moderator.getUserId()]}); const banList = new BanList(banListId, banListId, mjolnir); mjolnir.setUserPowerLevel(await moderator.getUserId(), banListId, 100); @@ -117,7 +117,7 @@ describe("Test: Updating the BanList", function () { it("Will remove rules with old types when they are 'soft redacted' with a different but more recent event type.", async function () { this.timeout(3000); const mjolnir = config.RUNTIME.client! - const moderator = await newTestUser(false, "moderator"); + const moderator = await newTestUser({ name: { contains: "moderator" }}); const banListId = await mjolnir.createRoom({ invite: [await moderator.getUserId()]}); const banList = new BanList(banListId, banListId, mjolnir); mjolnir.setUserPowerLevel(await moderator.getUserId(), banListId, 100); @@ -139,7 +139,7 @@ describe("Test: Updating the BanList", function () { it("A rule of the most recent type won't be deleted when an old rule is deleted for the same entity.", async function () { this.timeout(3000); const mjolnir = config.RUNTIME.client! - const moderator = await newTestUser(false, "moderator"); + const moderator = await newTestUser({ name: { contains: "moderator" }}); const banListId = await mjolnir.createRoom({ invite: [await moderator.getUserId()]}); const banList = new BanList(banListId, banListId, mjolnir); mjolnir.setUserPowerLevel(await moderator.getUserId(), banListId, 100); diff --git a/test/integration/clientHelper.ts b/test/integration/clientHelper.ts index 8e7eb76..502397d 100644 --- a/test/integration/clientHelper.ts +++ b/test/integration/clientHelper.ts @@ -18,12 +18,11 @@ const REGISTRATION_RETRY_BASE_DELAY_MS = 100; * @returns The response from synapse. */ export async function registerUser(username: string, displayname: string, password: string, admin: boolean) { - let registerUrl = `${config.homeserverUrl}/_synapse/admin/v1/register` - let { data } = await axios.get(registerUrl); - let nonce = data.nonce!; - let mac = HmacSHA1(`${nonce}\0${username}\0${password}\0${admin ? 'admin' : 'notadmin'}`, 'REGISTRATION_SHARED_SECRET'); + const registerUrl = `${config.homeserverUrl}/_synapse/admin/v1/register`; for (let i = 1; i <= REGISTRATION_ATTEMPTS; ++i) { try { + const { data: { nonce } } = await axios.get(registerUrl); + const mac = HmacSHA1(`${nonce}\0${username}\0${password}\0${admin ? 'admin' : 'notadmin'}`, 'REGISTRATION_SHARED_SECRET'); return await axios.post(registerUrl, { nonce, username, @@ -45,47 +44,117 @@ export async function registerUser(username: string, displayname: string, passwo throw new Error(`Retried registration ${REGISTRATION_ATTEMPTS} times, is Mjolnir or Synapse misconfigured?`); } +export type RegistrationOptions = { + /** + * If specified and true, make the user an admin. + */ + isAdmin?: boolean, + /** + * If `exact`, use the account with this exact name, attempting to reuse + * an existing account if possible. + * + * If `contains` create a new account with a name that contains this + * specific string. + */ + name: { exact: string } | { contains: string }, + /** + * If specified and true, throttle this user. + */ + isThrottled?: boolean +} + /** - * Register a new test user with a unique username. + * Register a new test user. * - * @param isAdmin Whether to make the new user an admin. - * @param label If specified, a string to place somewhere within the username. - * @returns A string that is the username and password of a new user. + * @returns A string that is both the username and password of a new user. */ -export async function registerNewTestUser(isAdmin: boolean, label: string = "") { - let isUserValid = false; - let username; - if (label != "") { - label += "-"; - } +async function registerNewTestUser(options: RegistrationOptions) { do { - username = `mjolnir-test-user-${label}${Math.floor(Math.random() * 100000)}` - await registerUser(username, username, username, isAdmin).then(_ => isUserValid = true).catch(e => { + let username; + if ("exact" in options.name) { + username = options.name.exact; + } else { + username = `mjolnir-test-user-${options.name.contains}${Math.floor(Math.random() * 100000)}` + } + try { + await registerUser(username, username, username, options.isAdmin); + return username; + } catch (e) { if (e.isAxiosError && e?.response?.data?.errcode === 'M_USER_IN_USE') { - LogService.debug("test/clientHelper", `${username} already registered, trying another`); - false // continue and try again + if ("exact" in options.name) { + LogService.debug("test/clientHelper", `${username} already registered, reusing`); + return username; + } else { + LogService.debug("test/clientHelper", `${username} already registered, trying another`); + } } else { console.error(`failed to register user ${e}`); throw e; } - }) - } while (!isUserValid); - return username; + } + } while (true); } /** - * Registers a unique test user and returns a `MatrixClient` logged in and ready to use. + * Registers a test user and returns a `MatrixClient` logged in and ready to use. * - * @param isAdmin Whether to make the user an admin. - * @param label If specified, a string to place somewhere within the username. * @returns A new `MatrixClient` session for a unique test user. */ -export async function newTestUser(isAdmin: boolean = false, label: string = ""): Promise { - const username = await registerNewTestUser(isAdmin, label); +export async function newTestUser(options: RegistrationOptions): Promise { + const username = await registerNewTestUser(options); const pantalaimon = new PantalaimonClient(config.homeserverUrl, new MemoryStorageProvider()); - return await pantalaimon.createClientWithCredentials(username, username); + const client = await pantalaimon.createClientWithCredentials(username, username); + if (!options.isThrottled) { + let userId = await client.getUserId(); + await overrideRatelimitForUser(userId); + } + return client; } +let _globalAdminUser: MatrixClient; + +/** + * Get a client that can perform synapse admin API actions. + * @returns A client logged in with an admin user. + */ +async function getGlobalAdminUser(): Promise { + // Initialize global admin user if needed. + if (!_globalAdminUser) { + const USERNAME = "mjolnir-test-internal-admin-user"; + try { + await registerUser(USERNAME, USERNAME, USERNAME, true); + } catch (e) { + if (e.isAxiosError && e?.response?.data?.errcode === 'M_USER_IN_USE') { + // Then we've already registered the user in a previous run and that is ok. + } else { + throw e; + } + } + _globalAdminUser = await new PantalaimonClient(config.homeserverUrl, new MemoryStorageProvider()).createClientWithCredentials(USERNAME, USERNAME); + } + return _globalAdminUser; +} + +/** + * Disable ratelimiting for this user in Synapse. + * @param userId The user to disable ratelimiting for, has to include both the server part and local part. + */ +export async function overrideRatelimitForUser(userId: string) { + await (await getGlobalAdminUser()).doRequest("POST", `/_synapse/admin/v1/users/${userId}/override_ratelimit`, null, { + "messages_per_second": 0, + "burst_count": 0 + }); +} + +/** + * Put back the default ratelimiting for this user in Synapse. + * @param userId The user to use default ratelimiting for, has to include both the server part and local part. + */ +export async function resetRatelimitForUser(userId: string) { + await (await getGlobalAdminUser()).doRequest("DELETE", `/_synapse/admin/v1/users/${userId}/override_ratelimit`, null); +} + + /** * Utility to create an event listener for m.notice msgtype m.room.messages. * @param targetRoomdId The roomId to listen into. diff --git a/test/integration/commands/redactCommandTest.ts b/test/integration/commands/redactCommandTest.ts index 3971e0d..327e338 100644 --- a/test/integration/commands/redactCommandTest.ts +++ b/test/integration/commands/redactCommandTest.ts @@ -10,11 +10,11 @@ import { getFirstReaction } from "./commandUtils"; it('Mjölnir redacts all of the events sent by a spammer when instructed to by giving their id and a room id.', async function() { this.timeout(60000); // Create a few users and a room. - let badUser = await newTestUser(false, "spammer-needs-redacting"); + let badUser = await newTestUser({ name: { contains: "spammer-needs-redacting" } }); let badUserId = await badUser.getUserId(); const mjolnir = config.RUNTIME.client! let mjolnirUserId = await mjolnir.getUserId(); - let moderator = await newTestUser(false, "moderator"); + let moderator = await newTestUser({ name: { contains: "moderator" } }); this.moderator = moderator; await moderator.joinRoom(config.managementRoom); let targetRoom = await moderator.createRoom({ invite: [await badUser.getUserId(), mjolnirUserId]}); @@ -54,11 +54,11 @@ import { getFirstReaction } from "./commandUtils"; it('Mjölnir redacts all of the events sent by a spammer when instructed to by giving their id in multiple rooms.', async function() { this.timeout(60000); // Create a few users and a room. - let badUser = await newTestUser(false, "spammer-needs-redacting"); + let badUser = await newTestUser({ name: { contains: "spammer-needs-redacting" } }); let badUserId = await badUser.getUserId(); const mjolnir = config.RUNTIME.client! let mjolnirUserId = await mjolnir.getUserId(); - let moderator = await newTestUser(false, "moderator"); + let moderator = await newTestUser({ name: { contains: "moderator" } }); this.moderator = moderator; await moderator.joinRoom(config.managementRoom); let targetRooms: string[] = []; @@ -103,10 +103,10 @@ import { getFirstReaction } from "./commandUtils"; it("Redacts a single event when instructed to.", async function () { this.timeout(60000); // Create a few users and a room. - let badUser = await newTestUser(false, "spammer-needs-redacting"); + let badUser = await newTestUser({ name: { contains: "spammer-needs-redacting" } }); const mjolnir = config.RUNTIME.client! let mjolnirUserId = await mjolnir.getUserId(); - let moderator = await newTestUser(false, "moderator"); + let moderator = await newTestUser({ name: { contains: "moderator" } }); this.moderator = moderator; await moderator.joinRoom(config.managementRoom); let targetRoom = await moderator.createRoom({ invite: [await badUser.getUserId(), mjolnirUserId]}); diff --git a/test/integration/helloTest.ts b/test/integration/helloTest.ts index f82018a..4e3c055 100644 --- a/test/integration/helloTest.ts +++ b/test/integration/helloTest.ts @@ -4,7 +4,7 @@ import { newTestUser, noticeListener } from "./clientHelper" describe("Test: !help command", function() { let client; this.beforeEach(async function () { - client = await newTestUser(true); + client = await newTestUser({ name: { contains: "-" }});; await client.start(); }) this.afterEach(async function () { diff --git a/test/integration/mjolnirSetupUtils.ts b/test/integration/mjolnirSetupUtils.ts index eb408fe..f6d7849 100644 --- a/test/integration/mjolnirSetupUtils.ts +++ b/test/integration/mjolnirSetupUtils.ts @@ -23,7 +23,7 @@ import { } from "matrix-bot-sdk"; import { Mjolnir} from '../../src/Mjolnir'; import config from "../../src/config"; -import { registerUser } from "./clientHelper"; +import { overrideRatelimitForUser, registerUser } from "./clientHelper"; import { patchMatrixClient } from "../../src/utils"; /** @@ -82,6 +82,7 @@ export async function makeMjolnir(): Promise { LogService.info("test/mjolnirSetupUtils", "Starting bot..."); const pantalaimon = new PantalaimonClient(config.homeserverUrl, new MemoryStorageProvider()); const client = await pantalaimon.createClientWithCredentials(config.pantalaimon.username, config.pantalaimon.password); + await overrideRatelimitForUser(await client.getUserId()); patchMatrixClient(); await ensureAliasedRoomExists(client, config.managementRoom); let mjolnir = await Mjolnir.setupMjolnirFromConfig(client); diff --git a/test/integration/throttleTest.ts b/test/integration/throttleTest.ts new file mode 100644 index 0000000..a33267e --- /dev/null +++ b/test/integration/throttleTest.ts @@ -0,0 +1,74 @@ +import { strict as assert } from "assert"; +import { newTestUser, overrideRatelimitForUser, resetRatelimitForUser } from "./clientHelper"; +import { getMessagesByUserIn } from "../../src/utils"; +import { getFirstReaction } from "./commands/commandUtils"; + +describe("Test: throttled users can function with Mjolnir.", function () { + it('throttled users survive being throttled by synapse', async function() { + this.timeout(60000); + let throttledUser = await newTestUser({ name: { contains: "throttled" }, isThrottled: true }); + let throttledUserId = await throttledUser.getUserId(); + let targetRoom = await throttledUser.createRoom(); + // send enough messages to hit the rate limit. + await Promise.all([...Array(150).keys()].map((i) => throttledUser.sendMessage(targetRoom, {msgtype: 'm.text.', body: `Message #${i}`}))); + let messageCount = 0; + await getMessagesByUserIn(throttledUser, throttledUserId, targetRoom, 150, (events) => { + messageCount += events.length; + }); + assert.equal(messageCount, 150, "There should have been 150 messages in this room"); + }) +}) + +describe("Test: Mjolnir can still sync and respond to commands while throttled", function () { + beforeEach(async function() { + await resetRatelimitForUser(await this.mjolnir.client.getUserId()) + }) + afterEach(async function() { + await overrideRatelimitForUser(await this.mjolnir.client.getUserId()); + }) + + it('Can still perform and respond to a redaction command', async function () { + this.timeout(60000); + // Create a few users and a room. + let badUser = await newTestUser({ name: { contains: "spammer-needs-redacting" } }); + let badUserId = await badUser.getUserId(); + const mjolnir = this.mjolnir.client; + let mjolnirUserId = await mjolnir.getUserId(); + let moderator = await newTestUser({ name: { contains: "moderator" } }); + this.moderator = moderator; + await moderator.joinRoom(this.mjolnir.managementRoomId); + let targetRoom = await moderator.createRoom({ invite: [await badUser.getUserId(), mjolnirUserId]}); + await moderator.setUserPowerLevel(mjolnirUserId, targetRoom, 100); + await badUser.joinRoom(targetRoom); + + // Give Mjolnir some work to do and some messages to sync through. + await Promise.all([...Array(100).keys()].map((i) => moderator.sendMessage(this.mjolnir.managementRoomId, {msgtype: 'm.text.', body: `Irrelevant Message #${i}`}))); + await Promise.all([...Array(50).keys()].map(_ => moderator.sendMessage(this.mjolnir.managementRoomId, {msgtype: 'm.text', body: '!mjolnir status'}))); + + await moderator.sendMessage(this.mjolnir.managementRoomId, {msgtype: 'm.text', body: `!mjolnir rooms add ${targetRoom}`}); + + await Promise.all([...Array(50).keys()].map((i) => badUser.sendMessage(targetRoom, {msgtype: 'm.text.', body: `Bad Message #${i}`}))); + + try { + moderator.start(); + await getFirstReaction(moderator, this.mjolnir.managementRoomId, '✅', async () => { + return await moderator.sendMessage(this.mjolnir.managementRoomId, { msgtype: 'm.text', body: `!mjolnir redact ${badUserId} ${targetRoom}` }); + }); + } finally { + moderator.stop(); + } + + let count = 0; + await getMessagesByUserIn(moderator, badUserId, targetRoom, 1000, function(events) { + count += events.length + events.map(e => { + if (e.type === 'm.room.member') { + assert.equal(Object.keys(e.content).length, 1, "Only membership should be left on the membership event when it has been redacted.") + } else if (Object.keys(e.content).length !== 0) { + throw new Error(`This event should have been redacted: ${JSON.stringify(e, null, 2)}`) + } + }) + }); + assert.equal(count, 51, "There should be exactly 51 events from the spammer in this room."); + }) +}) diff --git a/test/integration/timelinePaginationTest.ts b/test/integration/timelinePaginationTest.ts index d746982..3081718 100644 --- a/test/integration/timelinePaginationTest.ts +++ b/test/integration/timelinePaginationTest.ts @@ -10,9 +10,9 @@ describe("Test: timeline pagination", function () { it('does not paginate across the entire room history while backfilling.', async function() { this.timeout(60000); // Create a few users and a room. - let badUser = await newTestUser(false, "spammer"); + let badUser = await newTestUser({ name: { contains: "spammer" }}); let badUserId = await badUser.getUserId(); - let moderator = await newTestUser(false, "moderator"); + let moderator = await newTestUser({ name: { contains: "moderator" }}); let targetRoom = await moderator.createRoom({ invite: [await badUser.getUserId()]}); await badUser.joinRoom(targetRoom); @@ -39,9 +39,9 @@ describe("Test: timeline pagination", function () { }) it('does not call the callback with an empty array when there are no relevant events', async function() { this.timeout(60000); - let badUser = await newTestUser(false, "spammer"); + let badUser = await newTestUser({ name: { contains: "spammer" }}); let badUserId = await badUser.getUserId(); - let moderator = await newTestUser(false, "moderator"); + let moderator = await newTestUser({ name: { contains: "moderator" }}); let targetRoom = await moderator.createRoom(); // send some irrelevant messages await Promise.all([...Array(200).keys()].map((i) => moderator.sendMessage(targetRoom, {msgtype: 'm.text.', body: `Irrelevant Message #${i}`}))); @@ -54,9 +54,9 @@ describe("Test: timeline pagination", function () { }) it("The limit provided is respected", async function() { this.timeout(60000); - let badUser = await newTestUser(false, "spammer"); + let badUser = await newTestUser({ name: { contains: "spammer" }}); let badUserId = await badUser.getUserId(); - let moderator = await newTestUser(false, "moderator"); + let moderator = await newTestUser({ name: { contains: "moderator" }}); let targetRoom = await moderator.createRoom({ invite: [await badUser.getUserId()]}); await badUser.joinRoom(targetRoom); // send some bad person messages @@ -83,7 +83,7 @@ describe("Test: timeline pagination", function () { }); it("Gives the events to the callback ordered by youngest first (even more important when the limit is reached halfway through a chunk).", async function() { this.timeout(60000); - let moderator = await newTestUser(false, "moderator"); + let moderator = await newTestUser({ name: { contains: "moderator" }}); let moderatorId = await moderator.getUserId(); let targetRoom = await moderator.createRoom(); for (let i = 0; i < 20; i++) {