Retry requests in case of throttling (#178)

* Retry requests in case of throttling


Co-authored-by: gnuxie <gnuxie@element.io>
This commit is contained in:
David Teller 2022-01-25 13:19:44 +01:00 committed by GitHub
parent baa9129fa5
commit c7a96a3afe
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 266 additions and 55 deletions

View File

@ -21,11 +21,9 @@ homeserver:
# Make manual testing easier
enable_registration: true
# Getting rid of throttling.
rc_message:
per_second: 10000
burst_count: 10000
# We remove rc_message so we can test rate limiting,
# but we keep the others because of https://github.com/matrix-org/synapse/issues/11785
# and we don't want to slow integration tests down.
rc_registration:
per_second: 10000
burst_count: 10000

View File

@ -313,6 +313,74 @@ function patchMatrixClientForConciseExceptions() {
isMatrixClientPatchedForConciseExceptions = true;
}
const MAX_REQUEST_ATTEMPTS = 15;
const REQUEST_RETRY_BASE_DURATION_MS = 100;
const TRACE_CONCURRENT_REQUESTS = false;
let numberOfConcurrentRequests = 0;
let isMatrixClientPatchedForRetryWhenThrottled = false;
/**
* Patch instances of MatrixClient to make sure that it retries requests
* in case of throttling.
*
* Note: As of this writing, we do not re-attempt requests that timeout,
* only request that are throttled by the server. The rationale is that,
* in case of DoS, we do not wish to make the situation even worse.
*/
function patchMatrixClientForRetry() {
if (isMatrixClientPatchedForRetryWhenThrottled) {
return;
}
let originalRequestFn = getRequestFn();
setRequestFn(async (params, cb) => {
let attempt = 1;
numberOfConcurrentRequests += 1;
if (TRACE_CONCURRENT_REQUESTS) {
console.trace("Current number of concurrent requests", numberOfConcurrentRequests);
}
try {
while (true) {
try {
let result: any[] = await new Promise((resolve, reject) => {
originalRequestFn(params, function requestFnWithRetry(err, response, resBody) {
// Note: There is no data race on `attempt` as we `await` before continuing
// to the next iteration of the loop.
if (attempt < MAX_REQUEST_ATTEMPTS && err?.body?.errcode === 'M_LIMIT_EXCEEDED') {
// We need to retry.
reject(err);
} else {
// No need-to-retry error? Lucky us!
// Note that this may very well be an error, just not
// one we need to retry.
resolve([err, response, resBody]);
}
});
});
// This is our final result.
// Pass result, whether success or error.
return cb(...result);
} catch (err) {
// Need to retry.
let retryAfterMs = attempt * attempt * REQUEST_RETRY_BASE_DURATION_MS;
if ("retry_after_ms" in err) {
try {
retryAfterMs = Number.parseInt(err.retry_after_ms, 10);
} catch (ex) {
// Use default value.
}
}
LogService.debug("Mjolnir.client", `Waiting ${retryAfterMs}ms before retrying ${params.method} ${params.uri}`);
await new Promise(resolve => setTimeout(resolve, retryAfterMs));
attempt += 1;
}
}
} finally {
numberOfConcurrentRequests -= 1;
}
});
isMatrixClientPatchedForRetryWhenThrottled = true;
}
/**
* Perform any patching deemed necessary to MatrixClient.
*/
@ -324,4 +392,5 @@ export function patchMatrixClient() {
// - `patchMatrixClientForRetry` expects that all errors are returned as
// errors.
patchMatrixClientForConciseExceptions();
patchMatrixClientForRetry();
}

View File

@ -32,8 +32,8 @@ describe("Test: Reporting abuse", async () => {
});
// Create a few users and a room.
let goodUser = await newTestUser(false, "reporting-abuse-good-user");
let badUser = await newTestUser(false, "reporting-abuse-bad-user");
let goodUser = await newTestUser({ name: { contains: "reporting-abuse-good-user" }});
let badUser = await newTestUser({ name: { contains: "reporting-abuse-bad-user" }});
let goodUserId = await goodUser.getUserId();
let badUserId = await badUser.getUserId();
@ -227,13 +227,13 @@ describe("Test: Reporting abuse", async () => {
});
// Create a moderator.
let moderatorUser = await newTestUser(false, "reacting-abuse-moderator-user");
let moderatorUser = await newTestUser({ name: { contains: "reporting-abuse-moderator-user" }});
matrixClient().inviteUser(await moderatorUser.getUserId(), this.mjolnir.managementRoomId);
await moderatorUser.joinRoom(this.mjolnir.managementRoomId);
// Create a few users and a room.
let goodUser = await newTestUser(false, "reacting-abuse-good-user");
let badUser = await newTestUser(false, "reacting-abuse-bad-user");
let goodUser = await newTestUser({ name: { contains: "reacting-abuse-good-user" }});
let badUser = await newTestUser({ name: { contains: "reacting-abuse-bad-user" }});
let goodUserId = await goodUser.getUserId();
let badUserId = await badUser.getUserId();

View File

@ -26,7 +26,7 @@ describe("Test: Updating the BanList", function () {
it("Calculates what has changed correctly.", async function () {
this.timeout(10000);
const mjolnir = config.RUNTIME.client!
const moderator = await newTestUser(false, "moderator");
const moderator = await newTestUser({ name: { contains: "moderator" }});
const banListId = await mjolnir.createRoom({ invite: [await moderator.getUserId()]});
const banList = new BanList(banListId, banListId, mjolnir);
mjolnir.setUserPowerLevel(await moderator.getUserId(), banListId, 100);
@ -117,7 +117,7 @@ describe("Test: Updating the BanList", function () {
it("Will remove rules with old types when they are 'soft redacted' with a different but more recent event type.", async function () {
this.timeout(3000);
const mjolnir = config.RUNTIME.client!
const moderator = await newTestUser(false, "moderator");
const moderator = await newTestUser({ name: { contains: "moderator" }});
const banListId = await mjolnir.createRoom({ invite: [await moderator.getUserId()]});
const banList = new BanList(banListId, banListId, mjolnir);
mjolnir.setUserPowerLevel(await moderator.getUserId(), banListId, 100);
@ -139,7 +139,7 @@ describe("Test: Updating the BanList", function () {
it("A rule of the most recent type won't be deleted when an old rule is deleted for the same entity.", async function () {
this.timeout(3000);
const mjolnir = config.RUNTIME.client!
const moderator = await newTestUser(false, "moderator");
const moderator = await newTestUser({ name: { contains: "moderator" }});
const banListId = await mjolnir.createRoom({ invite: [await moderator.getUserId()]});
const banList = new BanList(banListId, banListId, mjolnir);
mjolnir.setUserPowerLevel(await moderator.getUserId(), banListId, 100);

View File

@ -18,12 +18,11 @@ const REGISTRATION_RETRY_BASE_DELAY_MS = 100;
* @returns The response from synapse.
*/
export async function registerUser(username: string, displayname: string, password: string, admin: boolean) {
let registerUrl = `${config.homeserverUrl}/_synapse/admin/v1/register`
let { data } = await axios.get(registerUrl);
let nonce = data.nonce!;
let mac = HmacSHA1(`${nonce}\0${username}\0${password}\0${admin ? 'admin' : 'notadmin'}`, 'REGISTRATION_SHARED_SECRET');
const registerUrl = `${config.homeserverUrl}/_synapse/admin/v1/register`;
for (let i = 1; i <= REGISTRATION_ATTEMPTS; ++i) {
try {
const { data: { nonce } } = await axios.get(registerUrl);
const mac = HmacSHA1(`${nonce}\0${username}\0${password}\0${admin ? 'admin' : 'notadmin'}`, 'REGISTRATION_SHARED_SECRET');
return await axios.post(registerUrl, {
nonce,
username,
@ -45,47 +44,117 @@ export async function registerUser(username: string, displayname: string, passwo
throw new Error(`Retried registration ${REGISTRATION_ATTEMPTS} times, is Mjolnir or Synapse misconfigured?`);
}
export type RegistrationOptions = {
/**
* If specified and true, make the user an admin.
*/
isAdmin?: boolean,
/**
* If `exact`, use the account with this exact name, attempting to reuse
* an existing account if possible.
*
* If `contains` create a new account with a name that contains this
* specific string.
*/
name: { exact: string } | { contains: string },
/**
* If specified and true, throttle this user.
*/
isThrottled?: boolean
}
/**
* Register a new test user with a unique username.
* Register a new test user.
*
* @param isAdmin Whether to make the new user an admin.
* @param label If specified, a string to place somewhere within the username.
* @returns A string that is the username and password of a new user.
* @returns A string that is both the username and password of a new user.
*/
export async function registerNewTestUser(isAdmin: boolean, label: string = "") {
let isUserValid = false;
let username;
if (label != "") {
label += "-";
}
async function registerNewTestUser(options: RegistrationOptions) {
do {
username = `mjolnir-test-user-${label}${Math.floor(Math.random() * 100000)}`
await registerUser(username, username, username, isAdmin).then(_ => isUserValid = true).catch(e => {
let username;
if ("exact" in options.name) {
username = options.name.exact;
} else {
username = `mjolnir-test-user-${options.name.contains}${Math.floor(Math.random() * 100000)}`
}
try {
await registerUser(username, username, username, options.isAdmin);
return username;
} catch (e) {
if (e.isAxiosError && e?.response?.data?.errcode === 'M_USER_IN_USE') {
LogService.debug("test/clientHelper", `${username} already registered, trying another`);
false // continue and try again
if ("exact" in options.name) {
LogService.debug("test/clientHelper", `${username} already registered, reusing`);
return username;
} else {
LogService.debug("test/clientHelper", `${username} already registered, trying another`);
}
} else {
console.error(`failed to register user ${e}`);
throw e;
}
})
} while (!isUserValid);
return username;
}
} while (true);
}
/**
* Registers a unique test user and returns a `MatrixClient` logged in and ready to use.
* Registers a test user and returns a `MatrixClient` logged in and ready to use.
*
* @param isAdmin Whether to make the user an admin.
* @param label If specified, a string to place somewhere within the username.
* @returns A new `MatrixClient` session for a unique test user.
*/
export async function newTestUser(isAdmin: boolean = false, label: string = ""): Promise<MatrixClient> {
const username = await registerNewTestUser(isAdmin, label);
export async function newTestUser(options: RegistrationOptions): Promise<MatrixClient> {
const username = await registerNewTestUser(options);
const pantalaimon = new PantalaimonClient(config.homeserverUrl, new MemoryStorageProvider());
return await pantalaimon.createClientWithCredentials(username, username);
const client = await pantalaimon.createClientWithCredentials(username, username);
if (!options.isThrottled) {
let userId = await client.getUserId();
await overrideRatelimitForUser(userId);
}
return client;
}
let _globalAdminUser: MatrixClient;
/**
* Get a client that can perform synapse admin API actions.
* @returns A client logged in with an admin user.
*/
async function getGlobalAdminUser(): Promise<MatrixClient> {
// Initialize global admin user if needed.
if (!_globalAdminUser) {
const USERNAME = "mjolnir-test-internal-admin-user";
try {
await registerUser(USERNAME, USERNAME, USERNAME, true);
} catch (e) {
if (e.isAxiosError && e?.response?.data?.errcode === 'M_USER_IN_USE') {
// Then we've already registered the user in a previous run and that is ok.
} else {
throw e;
}
}
_globalAdminUser = await new PantalaimonClient(config.homeserverUrl, new MemoryStorageProvider()).createClientWithCredentials(USERNAME, USERNAME);
}
return _globalAdminUser;
}
/**
* Disable ratelimiting for this user in Synapse.
* @param userId The user to disable ratelimiting for, has to include both the server part and local part.
*/
export async function overrideRatelimitForUser(userId: string) {
await (await getGlobalAdminUser()).doRequest("POST", `/_synapse/admin/v1/users/${userId}/override_ratelimit`, null, {
"messages_per_second": 0,
"burst_count": 0
});
}
/**
* Put back the default ratelimiting for this user in Synapse.
* @param userId The user to use default ratelimiting for, has to include both the server part and local part.
*/
export async function resetRatelimitForUser(userId: string) {
await (await getGlobalAdminUser()).doRequest("DELETE", `/_synapse/admin/v1/users/${userId}/override_ratelimit`, null);
}
/**
* Utility to create an event listener for m.notice msgtype m.room.messages.
* @param targetRoomdId The roomId to listen into.

View File

@ -10,11 +10,11 @@ import { getFirstReaction } from "./commandUtils";
it('Mjölnir redacts all of the events sent by a spammer when instructed to by giving their id and a room id.', async function() {
this.timeout(60000);
// Create a few users and a room.
let badUser = await newTestUser(false, "spammer-needs-redacting");
let badUser = await newTestUser({ name: { contains: "spammer-needs-redacting" } });
let badUserId = await badUser.getUserId();
const mjolnir = config.RUNTIME.client!
let mjolnirUserId = await mjolnir.getUserId();
let moderator = await newTestUser(false, "moderator");
let moderator = await newTestUser({ name: { contains: "moderator" } });
this.moderator = moderator;
await moderator.joinRoom(config.managementRoom);
let targetRoom = await moderator.createRoom({ invite: [await badUser.getUserId(), mjolnirUserId]});
@ -54,11 +54,11 @@ import { getFirstReaction } from "./commandUtils";
it('Mjölnir redacts all of the events sent by a spammer when instructed to by giving their id in multiple rooms.', async function() {
this.timeout(60000);
// Create a few users and a room.
let badUser = await newTestUser(false, "spammer-needs-redacting");
let badUser = await newTestUser({ name: { contains: "spammer-needs-redacting" } });
let badUserId = await badUser.getUserId();
const mjolnir = config.RUNTIME.client!
let mjolnirUserId = await mjolnir.getUserId();
let moderator = await newTestUser(false, "moderator");
let moderator = await newTestUser({ name: { contains: "moderator" } });
this.moderator = moderator;
await moderator.joinRoom(config.managementRoom);
let targetRooms: string[] = [];
@ -103,10 +103,10 @@ import { getFirstReaction } from "./commandUtils";
it("Redacts a single event when instructed to.", async function () {
this.timeout(60000);
// Create a few users and a room.
let badUser = await newTestUser(false, "spammer-needs-redacting");
let badUser = await newTestUser({ name: { contains: "spammer-needs-redacting" } });
const mjolnir = config.RUNTIME.client!
let mjolnirUserId = await mjolnir.getUserId();
let moderator = await newTestUser(false, "moderator");
let moderator = await newTestUser({ name: { contains: "moderator" } });
this.moderator = moderator;
await moderator.joinRoom(config.managementRoom);
let targetRoom = await moderator.createRoom({ invite: [await badUser.getUserId(), mjolnirUserId]});

View File

@ -4,7 +4,7 @@ import { newTestUser, noticeListener } from "./clientHelper"
describe("Test: !help command", function() {
let client;
this.beforeEach(async function () {
client = await newTestUser(true);
client = await newTestUser({ name: { contains: "-" }});;
await client.start();
})
this.afterEach(async function () {

View File

@ -23,7 +23,7 @@ import {
} from "matrix-bot-sdk";
import { Mjolnir} from '../../src/Mjolnir';
import config from "../../src/config";
import { registerUser } from "./clientHelper";
import { overrideRatelimitForUser, registerUser } from "./clientHelper";
import { patchMatrixClient } from "../../src/utils";
/**
@ -82,6 +82,7 @@ export async function makeMjolnir(): Promise<Mjolnir> {
LogService.info("test/mjolnirSetupUtils", "Starting bot...");
const pantalaimon = new PantalaimonClient(config.homeserverUrl, new MemoryStorageProvider());
const client = await pantalaimon.createClientWithCredentials(config.pantalaimon.username, config.pantalaimon.password);
await overrideRatelimitForUser(await client.getUserId());
patchMatrixClient();
await ensureAliasedRoomExists(client, config.managementRoom);
let mjolnir = await Mjolnir.setupMjolnirFromConfig(client);

View File

@ -0,0 +1,74 @@
import { strict as assert } from "assert";
import { newTestUser, overrideRatelimitForUser, resetRatelimitForUser } from "./clientHelper";
import { getMessagesByUserIn } from "../../src/utils";
import { getFirstReaction } from "./commands/commandUtils";
describe("Test: throttled users can function with Mjolnir.", function () {
it('throttled users survive being throttled by synapse', async function() {
this.timeout(60000);
let throttledUser = await newTestUser({ name: { contains: "throttled" }, isThrottled: true });
let throttledUserId = await throttledUser.getUserId();
let targetRoom = await throttledUser.createRoom();
// send enough messages to hit the rate limit.
await Promise.all([...Array(150).keys()].map((i) => throttledUser.sendMessage(targetRoom, {msgtype: 'm.text.', body: `Message #${i}`})));
let messageCount = 0;
await getMessagesByUserIn(throttledUser, throttledUserId, targetRoom, 150, (events) => {
messageCount += events.length;
});
assert.equal(messageCount, 150, "There should have been 150 messages in this room");
})
})
describe("Test: Mjolnir can still sync and respond to commands while throttled", function () {
beforeEach(async function() {
await resetRatelimitForUser(await this.mjolnir.client.getUserId())
})
afterEach(async function() {
await overrideRatelimitForUser(await this.mjolnir.client.getUserId());
})
it('Can still perform and respond to a redaction command', async function () {
this.timeout(60000);
// Create a few users and a room.
let badUser = await newTestUser({ name: { contains: "spammer-needs-redacting" } });
let badUserId = await badUser.getUserId();
const mjolnir = this.mjolnir.client;
let mjolnirUserId = await mjolnir.getUserId();
let moderator = await newTestUser({ name: { contains: "moderator" } });
this.moderator = moderator;
await moderator.joinRoom(this.mjolnir.managementRoomId);
let targetRoom = await moderator.createRoom({ invite: [await badUser.getUserId(), mjolnirUserId]});
await moderator.setUserPowerLevel(mjolnirUserId, targetRoom, 100);
await badUser.joinRoom(targetRoom);
// Give Mjolnir some work to do and some messages to sync through.
await Promise.all([...Array(100).keys()].map((i) => moderator.sendMessage(this.mjolnir.managementRoomId, {msgtype: 'm.text.', body: `Irrelevant Message #${i}`})));
await Promise.all([...Array(50).keys()].map(_ => moderator.sendMessage(this.mjolnir.managementRoomId, {msgtype: 'm.text', body: '!mjolnir status'})));
await moderator.sendMessage(this.mjolnir.managementRoomId, {msgtype: 'm.text', body: `!mjolnir rooms add ${targetRoom}`});
await Promise.all([...Array(50).keys()].map((i) => badUser.sendMessage(targetRoom, {msgtype: 'm.text.', body: `Bad Message #${i}`})));
try {
moderator.start();
await getFirstReaction(moderator, this.mjolnir.managementRoomId, '✅', async () => {
return await moderator.sendMessage(this.mjolnir.managementRoomId, { msgtype: 'm.text', body: `!mjolnir redact ${badUserId} ${targetRoom}` });
});
} finally {
moderator.stop();
}
let count = 0;
await getMessagesByUserIn(moderator, badUserId, targetRoom, 1000, function(events) {
count += events.length
events.map(e => {
if (e.type === 'm.room.member') {
assert.equal(Object.keys(e.content).length, 1, "Only membership should be left on the membership event when it has been redacted.")
} else if (Object.keys(e.content).length !== 0) {
throw new Error(`This event should have been redacted: ${JSON.stringify(e, null, 2)}`)
}
})
});
assert.equal(count, 51, "There should be exactly 51 events from the spammer in this room.");
})
})

View File

@ -10,9 +10,9 @@ describe("Test: timeline pagination", function () {
it('does not paginate across the entire room history while backfilling.', async function() {
this.timeout(60000);
// Create a few users and a room.
let badUser = await newTestUser(false, "spammer");
let badUser = await newTestUser({ name: { contains: "spammer" }});
let badUserId = await badUser.getUserId();
let moderator = await newTestUser(false, "moderator");
let moderator = await newTestUser({ name: { contains: "moderator" }});
let targetRoom = await moderator.createRoom({ invite: [await badUser.getUserId()]});
await badUser.joinRoom(targetRoom);
@ -39,9 +39,9 @@ describe("Test: timeline pagination", function () {
})
it('does not call the callback with an empty array when there are no relevant events', async function() {
this.timeout(60000);
let badUser = await newTestUser(false, "spammer");
let badUser = await newTestUser({ name: { contains: "spammer" }});
let badUserId = await badUser.getUserId();
let moderator = await newTestUser(false, "moderator");
let moderator = await newTestUser({ name: { contains: "moderator" }});
let targetRoom = await moderator.createRoom();
// send some irrelevant messages
await Promise.all([...Array(200).keys()].map((i) => moderator.sendMessage(targetRoom, {msgtype: 'm.text.', body: `Irrelevant Message #${i}`})));
@ -54,9 +54,9 @@ describe("Test: timeline pagination", function () {
})
it("The limit provided is respected", async function() {
this.timeout(60000);
let badUser = await newTestUser(false, "spammer");
let badUser = await newTestUser({ name: { contains: "spammer" }});
let badUserId = await badUser.getUserId();
let moderator = await newTestUser(false, "moderator");
let moderator = await newTestUser({ name: { contains: "moderator" }});
let targetRoom = await moderator.createRoom({ invite: [await badUser.getUserId()]});
await badUser.joinRoom(targetRoom);
// send some bad person messages
@ -83,7 +83,7 @@ describe("Test: timeline pagination", function () {
});
it("Gives the events to the callback ordered by youngest first (even more important when the limit is reached halfway through a chunk).", async function() {
this.timeout(60000);
let moderator = await newTestUser(false, "moderator");
let moderator = await newTestUser({ name: { contains: "moderator" }});
let moderatorId = await moderator.getUserId();
let targetRoom = await moderator.createRoom();
for (let i = 0; i < 20; i++) {