mirror of
https://github.com/matrix-org/mjolnir.git
synced 2024-10-01 01:36:06 -04:00
Batch events from ban lists together during sync (#221)
* Test for batching ACL. * Batch events from sync within BanList. * Introduce the BanList.batch event to the BanList emitter to let Mjolnir sync after new events have been added from sync. Fixes #203
This commit is contained in:
parent
097829d75a
commit
e9dff8fd5a
@ -26,7 +26,7 @@ import {
|
||||
UserID
|
||||
} from "matrix-bot-sdk";
|
||||
|
||||
import BanList, { ALL_RULE_TYPES, ListRuleChange, RULE_ROOM, RULE_SERVER, RULE_USER } from "./models/BanList";
|
||||
import BanList, { ALL_RULE_TYPES as ALL_BAN_LIST_RULE_TYPES, ListRuleChange, RULE_ROOM, RULE_SERVER, RULE_USER } from "./models/BanList";
|
||||
import { applyServerAcls } from "./actions/ApplyAcl";
|
||||
import { RoomUpdateError } from "./models/RoomUpdateError";
|
||||
import { COMMAND_PREFIX, handleCommand } from "./commands/CommandHandler";
|
||||
@ -535,6 +535,20 @@ export class Mjolnir {
|
||||
this.protections.delete(protectionName);
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper for constructing `BanList`s and making sure they have the right listeners set up.
|
||||
* @param roomId The room id for the `BanList`.
|
||||
* @param roomRef A reference (matrix.to URL) for the `BanList`.
|
||||
*/
|
||||
private async addBanList(roomId: string, roomRef: string): Promise<BanList> {
|
||||
const list = new BanList(roomId, roomRef, this.client);
|
||||
this.ruleServer?.watch(list);
|
||||
list.on('BanList.batch', this.syncWithBanList.bind(this));
|
||||
await list.updateList();
|
||||
this.banLists.push(list);
|
||||
return list;
|
||||
}
|
||||
|
||||
public async watchList(roomRef: string): Promise<BanList | null> {
|
||||
const joinedRooms = await this.client.getJoinedRooms();
|
||||
const permalink = Permalinks.parseUrl(roomRef);
|
||||
@ -547,10 +561,7 @@ export class Mjolnir {
|
||||
|
||||
if (this.banLists.find(b => b.roomId === roomId)) return null;
|
||||
|
||||
const list = new BanList(roomId, roomRef, this.client);
|
||||
this.ruleServer?.watch(list);
|
||||
await list.updateList();
|
||||
this.banLists.push(list);
|
||||
const list = await this.addBanList(roomId, roomRef);
|
||||
|
||||
await this.client.setAccountData(WATCHED_LISTS_EVENT_TYPE, {
|
||||
references: this.banLists.map(b => b.roomRef),
|
||||
@ -605,8 +616,8 @@ export class Mjolnir {
|
||||
}
|
||||
}
|
||||
|
||||
public async buildWatchedBanLists() {
|
||||
const banLists: BanList[] = [];
|
||||
private async buildWatchedBanLists() {
|
||||
this.banLists = [];
|
||||
const joinedRooms = await this.client.getJoinedRooms();
|
||||
|
||||
let watchedListsEvent: { references?: string[] } | null = null;
|
||||
@ -626,14 +637,8 @@ export class Mjolnir {
|
||||
}
|
||||
|
||||
await this.warnAboutUnprotectedBanListRoom(roomId);
|
||||
|
||||
const list = new BanList(roomId, roomRef, this.client);
|
||||
this.ruleServer?.watch(list);
|
||||
await list.updateList();
|
||||
banLists.push(list);
|
||||
await this.addBanList(roomId, roomRef);
|
||||
}
|
||||
|
||||
this.banLists = banLists;
|
||||
}
|
||||
|
||||
public async verifyPermissions(verbose = true, printRegardless = false) {
|
||||
@ -761,14 +766,11 @@ export class Mjolnir {
|
||||
/**
|
||||
* Pulls any changes to the rules that are in a policy room and updates all protected rooms
|
||||
* with those changes. Does not fail if there are errors updating the room, these are reported to the management room.
|
||||
* @param policyRoomId The room with a policy list which we will check for changes and apply them to all protected rooms.
|
||||
* @param banList The `BanList` which we will check for changes and apply them to all protected rooms.
|
||||
* @returns When all of the protected rooms have been updated.
|
||||
*/
|
||||
public async syncWithPolicyRoom(policyRoomId: string): Promise<void> {
|
||||
const banList = this.banLists.find(list => list.roomId === policyRoomId);
|
||||
if (banList === undefined) return;
|
||||
private async syncWithBanList(banList: BanList): Promise<void> {
|
||||
const changes = await banList.updateList();
|
||||
await this.printBanlistChanges(changes, banList, true);
|
||||
|
||||
let hadErrors = false;
|
||||
const aclErrors = await applyServerAcls(this.banLists, Object.keys(this.protectedRooms), this);
|
||||
@ -788,6 +790,8 @@ export class Mjolnir {
|
||||
formatted_body: html,
|
||||
});
|
||||
}
|
||||
// This can fail if the change is very large and it is much less important than applying bans, so do it last.
|
||||
await this.printBanlistChanges(changes, banList, true);
|
||||
}
|
||||
|
||||
private async handleEvent(roomId: string, event: any) {
|
||||
@ -805,9 +809,10 @@ export class Mjolnir {
|
||||
|
||||
// Check for updated ban lists before checking protected rooms - the ban lists might be protected
|
||||
// themselves.
|
||||
if (this.banLists.map(b => b.roomId).includes(roomId)) {
|
||||
if (ALL_RULE_TYPES.includes(event['type'])) {
|
||||
await this.syncWithPolicyRoom(roomId);
|
||||
const banList = this.banLists.find(list => list.roomId === roomId);
|
||||
if (banList !== undefined) {
|
||||
if (ALL_BAN_LIST_RULE_TYPES.includes(event['type']) || event['type'] === 'm.room.redaction') {
|
||||
banList.updateForEvent(event)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -26,12 +26,13 @@ import { ERROR_KIND_FATAL, ERROR_KIND_PERMISSION } from "../ErrorCache";
|
||||
/**
|
||||
* Applies the server ACLs represented by the ban lists to the provided rooms, returning the
|
||||
* room IDs that could not be updated and their error.
|
||||
* Does not update the banLists before taking their rules to build the server ACL.
|
||||
* @param {BanList[]} lists The lists to construct ACLs from.
|
||||
* @param {string[]} roomIds The room IDs to apply the ACLs in.
|
||||
* @param {Mjolnir} mjolnir The Mjolnir client to apply the ACLs with.
|
||||
*/
|
||||
export async function applyServerAcls(lists: BanList[], roomIds: string[], mjolnir: Mjolnir): Promise<RoomUpdateError[]> {
|
||||
const serverName: string = new UserID(await config.RUNTIME.client!.getUserId()).domain;
|
||||
const serverName: string = new UserID(await mjolnir.client.getUserId()).domain;
|
||||
|
||||
// Construct a server ACL first
|
||||
const acl = new ServerAcl(serverName).denyIpAddresses().allowServer("*");
|
||||
|
@ -72,8 +72,12 @@ export interface ListRuleChange {
|
||||
}
|
||||
|
||||
declare interface BanList {
|
||||
// BanList.update is emitted when the BanList has pulled new rules from Matrix and informs listeners of any changes.
|
||||
on(event: 'BanList.update', listener: (list: BanList, changes: ListRuleChange[]) => void): this
|
||||
emit(event: 'BanList.update', list: BanList, changes: ListRuleChange[]): boolean
|
||||
// BanList.batch is emitted when the BanList has created a batch from the events provided by `updateForEvent`.
|
||||
on(event: 'BanList.batch', listener: (list: BanList) => void): this
|
||||
emit(event: 'BanList.batch', list: BanList): boolean
|
||||
}
|
||||
|
||||
/**
|
||||
@ -84,6 +88,8 @@ class BanList extends EventEmitter {
|
||||
private shortcode: string|null = null;
|
||||
// A map of state events indexed first by state type and then state keys.
|
||||
private state: Map<string, Map<string, any>> = new Map();
|
||||
// Batches new events from sync together before starting the process to update the list.
|
||||
private readonly batcher: UpdateBatcher;
|
||||
|
||||
/**
|
||||
* Construct a BanList, does not synchronize with the room.
|
||||
@ -93,6 +99,7 @@ class BanList extends EventEmitter {
|
||||
*/
|
||||
constructor(public readonly roomId: string, public readonly roomRef: string, private client: MatrixClient) {
|
||||
super();
|
||||
this.batcher = new UpdateBatcher(this);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -281,6 +288,75 @@ class BanList extends EventEmitter {
|
||||
this.emit('BanList.update', this, changes);
|
||||
return changes;
|
||||
}
|
||||
|
||||
/**
|
||||
* Inform the `BanList` about a new event from the room it is modelling.
|
||||
* @param event An event from the room the `BanList` models to inform an instance about.
|
||||
*/
|
||||
public updateForEvent(event: { event_id: string }): void {
|
||||
this.batcher.addToBatch(event.event_id)
|
||||
}
|
||||
}
|
||||
|
||||
export default BanList;
|
||||
|
||||
/**
|
||||
* Helper class that emits a batch event on a `BanList` when it has made a batch
|
||||
* out of the events given to `addToBatch`.
|
||||
*/
|
||||
class UpdateBatcher {
|
||||
// Whether we are waiting for more events to form a batch.
|
||||
private isWaiting = false;
|
||||
// The latest (or most recent) event we have received.
|
||||
private latestEventId: string|null = null;
|
||||
private readonly waitPeriodMS = 200; // 200ms seems good enough.
|
||||
private readonly maxWaitMS = 3000; // 3s is long enough to wait while batching.
|
||||
|
||||
constructor(private readonly banList: BanList) {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Reset the state for the next batch.
|
||||
*/
|
||||
private reset() {
|
||||
this.latestEventId = null;
|
||||
this.isWaiting = false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if any more events have been added to the current batch since
|
||||
* the previous iteration, then keep waiting up to `this.maxWait`, otherwise stop
|
||||
* and emit a batch.
|
||||
* @param eventId The id of the first event for this batch.
|
||||
*/
|
||||
private async checkBatch(eventId: string): Promise<void> {
|
||||
let start = Date.now();
|
||||
do {
|
||||
await new Promise(resolve => setTimeout(resolve, this.waitPeriodMS));
|
||||
} while ((Date.now() - start) < this.maxWaitMS && this.latestEventId !== eventId)
|
||||
this.reset();
|
||||
this.banList.emit('BanList.batch', this.banList);
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds an event to the batch.
|
||||
* @param eventId The event to inform the batcher about.
|
||||
*/
|
||||
public addToBatch(eventId: string): void {
|
||||
if (this.isWaiting) {
|
||||
this.latestEventId = eventId;
|
||||
return;
|
||||
}
|
||||
this.latestEventId = eventId;
|
||||
this.isWaiting = true;
|
||||
// We 'spawn' off here after performing the checks above
|
||||
// rather than before (ie if `addToBatch` was async) because
|
||||
// `banListTest` showed that there were 100~ ACL events per protected room
|
||||
// as compared to just 5~ by doing this. Not entirely sure why but it probably
|
||||
// has to do with queuing up `n event` tasks on the event loop that exaust scheduling
|
||||
// (so the latency between them is percieved as much higher by
|
||||
// the time they get checked in `this.checkBatch`, thus batching fails).
|
||||
this.checkBatch(eventId);
|
||||
}
|
||||
}
|
||||
|
@ -1,10 +1,12 @@
|
||||
import { strict as assert } from "assert";
|
||||
|
||||
import config from "../../src/config";
|
||||
import { newTestUser } from "./clientHelper";
|
||||
import { MatrixClient, UserID } from "matrix-bot-sdk";
|
||||
import { newTestUser, noticeListener } from "./clientHelper";
|
||||
import { LogService, MatrixClient, Permalinks, UserID } from "matrix-bot-sdk";
|
||||
import BanList, { ALL_RULE_TYPES, ChangeType, ListRuleChange, RULE_SERVER, RULE_USER } from "../../src/models/BanList";
|
||||
import { ServerAcl, ServerAclContent } from "../../src/models/ServerAcl";
|
||||
import { createBanList } from "./commands/commandUtils";
|
||||
import { getMessagesByUserIn } from "../../src/utils";
|
||||
|
||||
/**
|
||||
* Create a policy rule in a policy room.
|
||||
@ -225,3 +227,66 @@ describe('Test: We will not be able to ban ourselves via ACL.', function () {
|
||||
assert.equal(acl.literalAclContent().deny.length, 3);
|
||||
})
|
||||
})
|
||||
|
||||
|
||||
describe('Test: ACL updates will batch when rules are added in succession.', function () {
|
||||
it('Will batch ACL updates if we spam rules into a BanList', async function () {
|
||||
this.timeout(180000)
|
||||
const mjolnir = config.RUNTIME.client!
|
||||
const serverName: string = new UserID(await mjolnir.getUserId()).domain
|
||||
const moderator = await newTestUser({ name: { contains: "moderator" }});
|
||||
moderator.joinRoom(this.mjolnir.managementRoomId);
|
||||
const mjolnirId = await mjolnir.getUserId();
|
||||
|
||||
// Setup some protected rooms so we can check their ACL state later.
|
||||
const protectedRooms: string[] = [];
|
||||
for (let i = 0; i < 10; i++) {
|
||||
const room = await moderator.createRoom({ invite: [mjolnirId]});
|
||||
await mjolnir.joinRoom(room);
|
||||
await moderator.setUserPowerLevel(mjolnirId, room, 100);
|
||||
await this.mjolnir!.addProtectedRoom(room);
|
||||
protectedRooms.push(room);
|
||||
}
|
||||
|
||||
// If a previous test hasn't cleaned up properly, these rooms will be populated by bogus ACLs at this point.
|
||||
await this.mjolnir!.syncLists();
|
||||
await Promise.all(protectedRooms.map(async room => {
|
||||
// We're going to need timeline pagination I'm afraid.
|
||||
const roomAcl = await mjolnir.getRoomStateEvent(room, "m.room.server_acl", "");
|
||||
assert.equal(roomAcl?.deny?.length ?? 0, 0, 'There should be no entries in the deny ACL.');
|
||||
}));
|
||||
|
||||
// Flood the watched list with banned servers, which should prompt Mjolnir to update server ACL in protected rooms.
|
||||
const banListId = await moderator.createRoom({ invite: [mjolnirId] });
|
||||
mjolnir.joinRoom(banListId);
|
||||
this.mjolnir!.watchList(Permalinks.forRoom(banListId));
|
||||
const acl = new ServerAcl(serverName).denyIpAddresses().allowServer("*");
|
||||
for (let i = 0; i < 200; i++) {
|
||||
const badServer = `${i}.evil.com`;
|
||||
acl.denyServer(badServer);
|
||||
await createPolicyRule(moderator, banListId, RULE_SERVER, badServer, `Rule #${i}`);
|
||||
// Give them a bit of a spread over time.
|
||||
await new Promise(resolve => setTimeout(resolve, 5));
|
||||
}
|
||||
|
||||
// We do this because it should force us to wait until all the ACL events have been applied.
|
||||
// Even if that does mean the last few events will not go through batching...
|
||||
await this.mjolnir!.syncLists();
|
||||
|
||||
// Check each of the protected rooms for ACL events and make sure they were batched and are correct.
|
||||
await Promise.all(protectedRooms.map(async room => {
|
||||
const roomAcl = await mjolnir.getRoomStateEvent(room, "m.room.server_acl", "");
|
||||
if (!acl.matches(roomAcl)) {
|
||||
assert.fail(`Room ${room} doesn't have the correct ACL: ${JSON.stringify(roomAcl, null, 2)}`)
|
||||
}
|
||||
let aclEventCount = 0;
|
||||
await getMessagesByUserIn(mjolnir, mjolnirId, room, 100, events => {
|
||||
events.forEach(event => event.type === 'm.room.server_acl' ? aclEventCount += 1 : null);
|
||||
});
|
||||
LogService.debug('BanListTest', `aclEventCount: ${aclEventCount}`);
|
||||
// If there's less than two then it means the ACL was updated by this test calling `this.mjolnir!.syncLists()`
|
||||
// and not the listener that detects changes to ban lists (that we want to test!).
|
||||
assert.equal(aclEventCount < 10 && aclEventCount > 2, true, 'We should have sent less than 10 ACL events to each room because they should be batched')
|
||||
}));
|
||||
})
|
||||
})
|
||||
|
@ -11,17 +11,26 @@ export const mochaHooks = {
|
||||
async function() {
|
||||
console.log("mochaHooks.beforeEach");
|
||||
// Sometimes it takes a little longer to register users.
|
||||
this.timeout(3000)
|
||||
this.timeout(10000)
|
||||
this.managementRoomAlias = config.managementRoom;
|
||||
this.mjolnir = await makeMjolnir();
|
||||
config.RUNTIME.client = this.mjolnir.client;
|
||||
await Promise.all([
|
||||
this.mjolnir.client.setAccountData('org.matrix.mjolnir.protected_rooms', { rooms: [] }),
|
||||
this.mjolnir.client.setAccountData('org.matrix.mjolnir.watched_lists', { references: [] }),
|
||||
]);
|
||||
await this.mjolnir.start();
|
||||
console.log("mochaHooks.beforeEach DONE");
|
||||
}
|
||||
],
|
||||
afterEach: [
|
||||
async function() {
|
||||
this.timeout(10000)
|
||||
await this.mjolnir.stop();
|
||||
await Promise.all([
|
||||
this.mjolnir.client.setAccountData('org.matrix.mjolnir.protected_rooms', { rooms: [] }),
|
||||
this.mjolnir.client.setAccountData('org.matrix.mjolnir.watched_lists', { references: [] }),
|
||||
]);
|
||||
// remove alias from management room and leave it.
|
||||
await teardownManagementRoom(this.mjolnir.client, this.mjolnir.managementRoomId, config.managementRoom);
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user