Batch events from sync within BanList.

Fixes #203
This commit is contained in:
gnuxie 2022-02-10 11:35:25 +00:00
parent b7eb129978
commit f229716150
3 changed files with 91 additions and 64 deletions

View File

@ -44,7 +44,6 @@ import { htmlEscape } from "./utils";
import { ReportManager } from "./report/ReportManager";
import { WebAPIs } from "./webapis/WebAPIs";
import RuleServer from "./models/RuleServer";
import { batchedSyncWithBanList } from "./actions/policySyncBatcher";
export const STATE_NOT_STARTED = "not_started";
export const STATE_CHECKING_PERMISSIONS = "checking_permissions";
@ -534,6 +533,20 @@ export class Mjolnir {
this.protections.delete(protectionName);
}
/**
* Helper for constructing `BanList`s and making sure they have the right listeners set up.
* @param roomId The room id for the `BanList`.
* @param roomRef A reference (matrix.to URL) for the `BanList`.
*/
private async addBanList(roomId: string, roomRef: string): Promise<BanList> {
const list = new BanList(roomId, roomRef, this.client);
this.ruleServer?.watch(list);
list.on('BanList.batch', this.syncWithBanList.bind(this));
await list.updateList();
this.banLists.push(list);
return list;
}
public async watchList(roomRef: string): Promise<BanList | null> {
const joinedRooms = await this.client.getJoinedRooms();
const permalink = Permalinks.parseUrl(roomRef);
@ -546,10 +559,7 @@ export class Mjolnir {
if (this.banLists.find(b => b.roomId === roomId)) return null;
const list = new BanList(roomId, roomRef, this.client);
this.ruleServer?.watch(list);
await list.updateList();
this.banLists.push(list);
const list = await this.addBanList(roomId, roomRef);
await this.client.setAccountData(WATCHED_LISTS_EVENT_TYPE, {
references: this.banLists.map(b => b.roomRef),
@ -605,7 +615,7 @@ export class Mjolnir {
}
public async buildWatchedBanLists() {
const banLists: BanList[] = [];
this.banLists = [];
const joinedRooms = await this.client.getJoinedRooms();
let watchedListsEvent: { references?: string[] } | null = null;
@ -625,14 +635,8 @@ export class Mjolnir {
}
await this.warnAboutUnprotectedBanListRoom(roomId);
const list = new BanList(roomId, roomRef, this.client);
this.ruleServer?.watch(list);
await list.updateList();
banLists.push(list);
await this.addBanList(roomId, roomRef);
}
this.banLists = banLists;
}
public async verifyPermissions(verbose = true, printRegardless = false) {
@ -763,7 +767,7 @@ export class Mjolnir {
* @param policyRoomId The room with a policy list which we will check for changes and apply them to all protected rooms.
* @returns When all of the protected rooms have been updated.
*/
public async immediateSyncWithBanList(banList: BanList): Promise<void> {
public async syncWithBanList(banList: BanList): Promise<void> {
const changes = await banList.updateList();
let hadErrors = false;
@ -788,19 +792,6 @@ export class Mjolnir {
await this.printBanlistChanges(changes, banList, true);
}
/**
* Pulls any changes to the rules that are in a policy room and updates all protected rooms
* with those changes. Does not fail if there are errors updating the room, these are reported to the management room.
* @param policyRoomId The room with a policy list which we will check for changes and apply them to all protected rooms.
* @returns When all of the protected rooms have been updated.
*/
public syncWithBanList(policyRoomId: string, promptingChange: { event_id: string, origin_server_ts: number, type: string }): void {
const banList = this.banLists.find(list => list.roomId === policyRoomId);
if (banList === undefined) return;
batchedSyncWithBanList(this, banList, promptingChange.event_id);
}
private async handleEvent(roomId: string, event: any) {
// Check for UISI errors
if (roomId === this.managementRoomId) {
@ -816,9 +807,10 @@ export class Mjolnir {
// Check for updated ban lists before checking protected rooms - the ban lists might be protected
// themselves.
if (this.banLists.map(b => b.roomId).includes(roomId)) {
const banList = this.banLists.find(list => list.roomId === roomId);
if (banList !== undefined) {
if (ALL_RULE_TYPES.includes(event['type'])) {
await this.syncWithBanList(roomId, event);
banList.updateForEvent(event)
}
}

View File

@ -1,35 +0,0 @@
import { LogService } from "matrix-bot-sdk";
import { Mjolnir } from "../Mjolnir";
import BanList from "../models/BanList";
/**
*
*/
export const batchedSyncWithBanList: ((mjolnir: Mjolnir, banList: BanList, updateEventId: string) => Promise<void>) = (() => {
// A Map of ban list room ids and the event id of the policy event that was most recently sent there.
// When the batcher has finished waiting, it will remove the entry for a banlist BEFORE it syncs Mjolnir with it.
const queuedListUpdates: Map<string, string> = new Map();
const waitPeriod = 200; // 200ms seems good enough.
const maxWait = 3000; // 3s is long enough to wait while batching.
return async function(mjolnir: Mjolnir, banList: BanList, eventId: string) {
//
if (queuedListUpdates.has(banList.roomId)) {
queuedListUpdates.set(banList.roomId, eventId);
return;
}
queuedListUpdates.set(banList.roomId, eventId);
let start = Date.now();
do {
await new Promise(resolve => setTimeout(resolve, waitPeriod));
} while ((Date.now() - start) < maxWait && queuedListUpdates.get(banList.roomId) !== eventId)
queuedListUpdates.delete(banList.roomId);
try {
await mjolnir.immediateSyncWithBanList(banList)
} catch (e) {
LogService.error('Mjolnir.syncForUpdatedPolicyRoom', `Error syncing BanList ${banList.roomId}: `, e);
}
}
})()

View File

@ -74,6 +74,8 @@ export interface ListRuleChange {
declare interface BanList {
on(event: 'BanList.update', listener: (list: BanList, changes: ListRuleChange[]) => void): this
emit(event: 'BanList.update', list: BanList, changes: ListRuleChange[]): boolean
on(event: 'BanList.batch', listener: (list: BanList) => void): this
emit(event: 'BanList.batch', list: BanList): boolean
}
/**
@ -84,6 +86,8 @@ class BanList extends EventEmitter {
private shortcode: string|null = null;
// A map of state events indexed first by state type and then state keys.
private state: Map<string, Map<string, any>> = new Map();
// Batches new events from sync together before starting the process to update the list.
private readonly batcher: UpdateBatcher;
/**
* Construct a BanList, does not synchronize with the room.
@ -93,6 +97,7 @@ class BanList extends EventEmitter {
*/
constructor(public readonly roomId: string, public readonly roomRef: string, private client: MatrixClient) {
super();
this.batcher = new UpdateBatcher(this);
}
/**
@ -281,6 +286,71 @@ class BanList extends EventEmitter {
this.emit('BanList.update', this, changes);
return changes;
}
/**
* Inform the `BanList` about a new event from the room it is modelling.
* @param event An event from the room the `BanList` models to inform an instance about.
*/
public updateForEvent(event: { event_id: string }): void {
// We have to allow the batcher to emit BanList.batch because
// if we await in the updateForEvent method that is called by Mjolnir's sync
// event emitter, then by the time we start batching we will be far too late
// and unable to batch effectivly
// if you don't believe me you can test it for yourself, it is rubbish.
this.batcher.addToBatch(event.event_id)
}
}
export default BanList;
/**
* Helper class that emits a batch event on a `BanList` when it has made a batch
* out of the events given to `addToBatch`.
*/
class UpdateBatcher {
private isWaiting = false;
private previousEventId: string|null = null;
private readonly waitPeriod = 200; // 200ms seems good enough.
private readonly maxWait = 3000; // 3s is long enough to wait while batching.
constructor(private readonly banList: BanList) {
}
/**
* Reset the state for the next batch.
*/
private reset() {
this.previousEventId = null;
this.isWaiting = false;
}
/**
* Checks if any more events have been added to the current batch since
* the previous iteration, then keep waiting up to `this.maxWait`, otherwise stop
* and emit a batch.
* @param eventId The id of the first event for this batch.
*/
private async checkBatch(eventId: string): Promise<void> {
let start = Date.now();
do {
await new Promise(resolve => setTimeout(resolve, this.waitPeriod));
} while ((Date.now() - start) < this.maxWait && this.previousEventId !== eventId)
this.reset();
this.banList.emit('BanList.batch', this.banList);
}
/**
* Adds an event to the batch.
* @param eventId The event to inform the batcher about.
*/
public addToBatch(eventId: string): void {
if (this.isWaiting) {
this.previousEventId = eventId;
return;
}
this.previousEventId = eventId;
this.isWaiting = true;
this.checkBatch(eventId);
}
}