Initial Ruleserver prototype.

This is an experimental ruleserver that will serve the combined rules from
the active policy lists to a Synapse module over a web api.
This makes it easier to communicate changes in policy lists to Synapse workers
that do not have an immediate view over all of the policy rooms at
the same time.
This also allows us to express moderation actions to the homeserver
beyond what is currently expressible via MSC2313 policy
lists.
This commit is contained in:
gnuxie 2021-10-22 09:47:05 +01:00
parent 47e3d8ae2c
commit 1a65122b0d
6 changed files with 387 additions and 9 deletions

View File

@ -178,7 +178,8 @@ web:
# The address to listen for requests on. Defaults to all addresses.
# Be careful with this setting, as opening to the wide web will increase
# your security perimeter.
address: localhost
# We listen on all in harness because we might be getting requests through the docker gateway.
address: "0.0.0.0"
# A web API designed to intercept Matrix API
# POST /_matrix/client/r0/rooms/{roomId}/report/{eventId}
@ -186,3 +187,7 @@ web:
abuseReporting:
# Whether to enable this feature.
enabled: true
# A web API for a description of all the combined rules from watched banlists.
# GET /api/1/ruleserver/updates
ruleServer:
enabled: true

View File

@ -42,6 +42,7 @@ import { EventRedactionQueue, RedactUserInRoom } from "./queues/EventRedactionQu
import * as htmlEscape from "escape-html";
import { ReportManager } from "./report/ReportManager";
import { WebAPIs } from "./webapis/WebAPIs";
import RuleServer from "./models/RuleServer";
export const STATE_NOT_STARTED = "not_started";
export const STATE_CHECKING_PERMISSIONS = "checking_permissions";
@ -144,7 +145,8 @@ export class Mjolnir {
}
await logMessage(LogLevel.INFO, "index", "Mjolnir is starting up. Use !mjolnir to query status.");
const mjolnir = new Mjolnir(client, managementRoomId, protectedRooms, banLists);
const ruleServer = config.web.ruleServer ? new RuleServer() : null;
const mjolnir = new Mjolnir(client, managementRoomId, protectedRooms, banLists, ruleServer);
Mjolnir.addJoinOnInviteListener(mjolnir, client, config);
return mjolnir;
}
@ -154,6 +156,8 @@ export class Mjolnir {
public readonly managementRoomId: string,
public readonly protectedRooms: { [roomId: string]: string },
private banLists: BanList[],
// Combines the rules from ban lists so they can be served to a homeserver module or another consumer.
public readonly ruleServer: RuleServer|null,
) {
this.explicitlyProtectedRoomIds = Object.keys(this.protectedRooms);
@ -220,7 +224,7 @@ export class Mjolnir {
// Setup Web APIs
console.log("Creating Web APIs");
this.webapis = new WebAPIs(new ReportManager(this));
this.webapis = new WebAPIs(new ReportManager(this), this.ruleServer);
}
public get lists(): BanList[] {
@ -431,6 +435,7 @@ export class Mjolnir {
if (this.banLists.find(b => b.roomId === roomId)) return null;
const list = new BanList(roomId, roomRef, this.client);
this.ruleServer?.watch(list);
await list.updateList();
this.banLists.push(list);
@ -449,12 +454,14 @@ export class Mjolnir {
const roomId = await this.client.resolveRoom(permalink.roomIdOrAlias);
const list = this.banLists.find(b => b.roomId === roomId) || null;
if (list) this.banLists.splice(this.banLists.indexOf(list), 1);
if (list) {
this.banLists.splice(this.banLists.indexOf(list), 1);
this.ruleServer?.unwatch(list);
}
await this.client.setAccountData(WATCHED_LISTS_EVENT_TYPE, {
references: this.banLists.map(b => b.roomRef),
});
return list;
}
@ -508,6 +515,7 @@ export class Mjolnir {
await this.warnAboutUnprotectedBanListRoom(roomId);
const list = new BanList(roomId, roomRef, this.client);
this.ruleServer?.watch(list);
await list.updateList();
banLists.push(list);
}

View File

@ -76,6 +76,9 @@ interface IConfig {
abuseReporting: {
enabled: boolean;
}
ruleServer: {
enabled: boolean;
}
}
/**
@ -137,7 +140,10 @@ const defaultConfig: IConfig = {
address: "localhost",
abuseReporting: {
enabled: false,
}
},
ruleServer: {
enabled: false,
},
},
// Needed to make the interface happy.

View File

@ -15,6 +15,7 @@ limitations under the License.
*/
import { extractRequestError, LogService, MatrixClient } from "matrix-bot-sdk";
import { EventEmitter } from "events";
import { ListRule } from "./ListRule";
export const RULE_USER = "m.policy.rule.user";
@ -70,11 +71,16 @@ export interface ListRuleChange {
readonly previousState?: any,
}
declare interface BanList {
on(event: 'BanList.update', listener: (list: BanList, changes: ListRuleChange[]) => void): this
emit(event: 'BanList.update', list: BanList, changes: ListRuleChange[]): boolean
}
/**
* The BanList caches all of the rules that are active in a policy room so Mjolnir can refer to when applying bans etc.
* This cannot be used to update events in the modeled room, it is a readonly model of the policy room.
*/
export default class BanList {
class BanList extends EventEmitter {
private shortcode: string|null = null;
// A map of state events indexed first by state type and then state keys.
private state: Map<string, Map<string, any>> = new Map();
@ -86,6 +92,7 @@ export default class BanList {
* @param client A matrix client that is used to read the state of the room when `updateList` is called.
*/
constructor(public readonly roomId: string, public readonly roomRef, private client: MatrixClient) {
super();
}
/**
@ -268,6 +275,9 @@ export default class BanList {
changes.push({rule, changeType, event, sender: event.sender, ... previousState ? {previousState} : {} });
}
}
this.emit('BanList.update', this, changes);
return changes;
}
}
export default BanList;

319
src/models/RuleServer.ts Normal file
View File

@ -0,0 +1,319 @@
/*
Copyright 2019-2021 The Matrix.org Foundation C.I.C.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
import BanList, { ChangeType, ListRuleChange, RULE_ROOM, RULE_SERVER, RULE_USER } from "./BanList"
import * as crypto from "crypto";
import { LogService } from "matrix-bot-sdk";
import { ListRule } from "./ListRule";
export const USER_MAY_INVITE = 'user_may_invite';
export const CHECK_EVENT_FOR_SPAM = 'check_event_for_spam';
/**
* Rules in the RuleServer format that have been produced from a single event.
*/
class EventRules {
constructor (
readonly eventId: string,
readonly roomId: string,
readonly ruleServerRules: RuleServerRule[],
// The token associated with when the event rules were created.
readonly token: number
) {
}
}
/**
* A description of a property that should be checked as part of a RuleServerRule.
*/
interface Checks {
property: string;
}
/**
* A Rule served by the rule server.
*/
interface RuleServerRule {
// A unique identifier for this rule.
readonly id: string
// A description of a property that should be checked.
readonly checks: Checks
}
/**
* The RuleServer is an experimental server that is used to propogate the rules of the watched policy rooms (BanLists) to
* homeservers (or e.g. synapse modules).
* This is done using an experimental format that is heavily based on the "Spam Checker Callbacks" made available to
* synapse modules https://matrix-org.github.io/synapse/latest/modules/spam_checker_callbacks.html.
*
*/
export default class RuleServer {
// Each token is an index for a row of this two dimensional array.
// Each row represents the rules that were added during the lifetime of that token.
private ruleStartsByToken: EventRules[][] = [[]];
// Each row, indexed by a token, represents the rules that were stopped during the lifetime of that token.
private ruleStopsByToken: string[][] = [[]];
// We use this to quickly lookup if we have stored a policy without scanning rulesByToken.
// First key is the room id and the second is the event id.
private rulesByEvent: Map<string, Map<string, EventRules>> = new Map();
// A unique identifier for this server instance that is given to each response so we can tell if the token
// was issued by this server or not. This is important for when Mjolnir has been restarted
// but the client consuming the rules hasn't been
// and we need to tell the client we have rebuilt all of the rules (via `reset` in the response).
private readonly serverId: string = crypto.randomUUID();
// Represents the current instant in which rules can started and/or stopped.
// Should always be incremented before adding rules. See `nextToken`.
private currentToken = 0;
private readonly banListUpdateListener = this.update.bind(this);
/**
* The token is used to separate EventRules from each other based on when they were added.
* The lower the token, the longer a rule has been tracked for (relative to other rules in this RuleServer).
* The token is incremented before adding new rules to be served.
*/
private nextToken(): void {
this.currentToken += 1;
this.ruleStartsByToken.push([]);
this.ruleStopsByToken.push([]);
}
/**
* Get a combination of the serverId and currentToken to give to the client.
*/
private get since(): string {
return `${this.serverId}::${this.currentToken}`;
}
/**
* Get the `EventRules` object for a Matrix event.
* @param roomId The room the event came from.
* @param eventId The id of the event.
* @returns The `EventRules` object describing which rules have been created based on the policy the event represents
* or `undefined` if there are no `EventRules` associated with the event.
*/
private getEventRules(roomId: string, eventId: string): EventRules|undefined {
return this.rulesByEvent.get(roomId)?.get(eventId);
}
/**
* Add the EventRule to be served by the rule server at the current token.
* @param eventRules Add rules for an associated policy room event. (e.g. m.policy.rule.user).
* @throws If there are already rules associated with the event specified in `eventRules.eventId`.
*/
private addEventRules(eventRules: EventRules): void {
const {roomId, eventId, token} = eventRules;
if (this.rulesByEvent.get(roomId)?.has(eventId)) {
throw new TypeError(`There is already an entry in the RuleServer for rules created from the event ${eventId}.`);
}
const roomTable = this.rulesByEvent.get(roomId);
if (roomTable) {
roomTable.set(eventId, eventRules);
} else {
this.rulesByEvent.set(roomId, new Map().set(eventId, eventRules));
}
this.ruleStartsByToken[token].push(eventRules);
}
/**
* Stop serving the rules from this policy rule.
* @param eventRules The EventRules to stop serving from the rule server.
*/
private stopEventRules(eventRules: EventRules): void {
const {eventId, roomId, token} = eventRules;
this.rulesByEvent.get(roomId)?.delete(eventId);
// We expect that each row of `rulesByEvent` list of eventRules (represented by 1 row in `rulesByEvent`) to be relatively small (1-5)
// as it can only contain eventRules added during the instant of time represented by one token.
const index = this.ruleStartsByToken[token].indexOf(eventRules);
if (index > -1) {
this.ruleStartsByToken[token].splice(index, 1);
}
eventRules.ruleServerRules.map(rule => this.ruleStopsByToken[this.currentToken].push(rule.id));
}
/**
* Update the rule server to reflect a ListRule change.
* @param change A ListRuleChange sourced from a BanList.
*/
private applyRuleChange(change: ListRuleChange): void {
if (change.changeType === ChangeType.Added) {
const eventRules = new EventRules(change.event.event_id, change.event.room_id, toRuleServerFormat(change.rule), this.currentToken);
this.addEventRules(eventRules);
} else if (change.changeType === ChangeType.Modified) {
const entry: EventRules|undefined = this.getEventRules(change.event.roomId, change.previousState.event_id);
if (entry === undefined) {
LogService.error('RuleServer', `Could not find the rules for the previous modified state ${change.event['state_type']} ${change.event['state_key']} ${change.previousState?.event_id}`);
return;
}
this.stopEventRules(entry);
const eventRules = new EventRules(change.event.event_id, change.event.room_id, toRuleServerFormat(change.rule), this.currentToken);
this.addEventRules(eventRules);
} else if (change.changeType === ChangeType.Removed) {
// 1) When the change is a redaction, the original version of the event will be available to us in `change.previousState`.
// 2) When an event has been "soft redacted" (ie we have a new event with the same state type and state_key with no content),
// the events in the `previousState` and `event` slots of `change` will be distinct events.
// In either case (of redaction or "soft redaction") we can use `previousState` to get the right event id to stop.
const entry: EventRules|undefined = this.getEventRules(change.event.room_id, change.previousState.event_id);
if (entry === undefined) {
LogService.error('RuleServer', `Could not find the rules for the previous modified state ${change.event['state_type']} ${change.event['state_key']} ${change.previousState?.event_id}`);
return;
}
this.stopEventRules(entry);
}
}
/**
* Watch the ban list for changes and serve its policies as rules.
* You will almost always want to call this before calling `updateList` on the BanList for the first time,
* as we won't be able to serve rules that have already been interned in the BanList.
* @param banList a BanList to watch for rule changes with.
*/
public watch(banList: BanList): void {
banList.on('BanList.update', this.banListUpdateListener);
}
/**
* Remove all of the rules that have been created from the policies in this banList.
* @param banList The BanList to unwatch.
*/
public unwatch(banList: BanList): void {
banList.removeListener('BanList.update', this.banListUpdateListener);
const listRules = this.rulesByEvent.get(banList.roomId);
this.nextToken();
if (listRules) {
for (const rule of listRules.values()) {
this.stopEventRules(rule);
}
}
}
/**
* Process the changes that have been made to a BanList.
* This will ususally be called as a callback from `BanList.onChange`.
* @param banList The BanList that the changes happened in.
* @param changes An array of ListRuleChanges.
*/
private update(banList: BanList, changes: ListRuleChange[]) {
if (changes.length > 0) {
this.nextToken();
changes.forEach(this.applyRuleChange, this);
}
}
/**
* Get all of the new rules since the token.
* @param sinceToken A token that has previously been issued by this server.
* @returns An object with the rules that have been started and stopped since the token and a new token to poll for more rules with.
*/
public getUpdates(sinceToken: string | null): {start: RuleServerRule[], stop: string[], reset?: boolean, since: string} {
const updatesSince = <T = EventRules|string>(token: number | null, policyStore: T[][]): T[] => {
if (token === null) {
// The client is requesting for the first time, we will give them everything.
return policyStore.flat();
} else if (token === this.currentToken) {
// There will be no new rules to give this client, they're up to date.
return [];
} else {
return policyStore.slice(token).flat();
}
}
const [serverId, since] = sinceToken ? sinceToken.split('::') : [null, null];
const parsedSince: number | null = since ? parseInt(since, 10) : null;
if (serverId && serverId !== this.serverId) {
// The server has restarted, but the client has not and still has rules we can no longer account for.
// So we have to resend them everything.
return {
start: updatesSince(null, this.ruleStartsByToken).map((e: EventRules) => e.ruleServerRules).flat(),
stop: updatesSince(null, this.ruleStopsByToken),
since: this.since,
reset: true
}
} else {
// We will bring the client up to date on the rules.
return {
start: updatesSince(parsedSince, this.ruleStartsByToken).map((e: EventRules) => e.ruleServerRules).flat(),
stop: updatesSince(parsedSince, this.ruleStopsByToken),
since: this.since,
}
}
}
}
/**
* Convert a ListRule into the format that can be served by the rule server.
* @param policyRule A ListRule to convert.
* @returns An array of rules that can be served from the rule server.
*/
function toRuleServerFormat(policyRule: ListRule): RuleServerRule[] {
function makeLiteral(literal: string) {
return {literal}
}
function makeGlob(glob: string) {
return {glob}
}
function makeServerGlob(server: string) {
return {glob: `:${server}`}
}
function makeRule(checks: Checks) {
return {
id: crypto.randomUUID(),
checks: checks
}
}
if (policyRule.kind === RULE_USER) {
// Block any messages or invites from being sent by a matching local user
// Block any messages or invitations from being received that were sent by a matching remote user.
return [{
property: USER_MAY_INVITE,
user_id: [makeGlob(policyRule.entity)]
},
{
property: CHECK_EVENT_FOR_SPAM,
sender: [makeGlob(policyRule.entity)]
}].map(makeRule)
} else if (policyRule.kind === RULE_ROOM) {
// Block any messages being sent or received in the room, stop invitations being sent to the room and
// stop anyone receiving invitations from the room.
return [{
property: USER_MAY_INVITE,
'room_id': [makeLiteral(policyRule.entity)]
},
{
property: CHECK_EVENT_FOR_SPAM,
'room_id': [makeLiteral(policyRule.entity)]
}].map(makeRule)
} else if (policyRule.kind === RULE_SERVER) {
// Block any invitations from the server or any new messages from the server.
return [{
property: USER_MAY_INVITE,
user_id: [makeServerGlob(policyRule.entity)]
},
{
property: CHECK_EVENT_FOR_SPAM,
sender: [makeServerGlob(policyRule.entity)]
}].map(makeRule)
} else {
LogService.info('RuleServer', `Ignoring unsupported policy rule type ${policyRule.kind}`);
return []
}
}

View File

@ -17,11 +17,13 @@ limitations under the License.
import { Server } from "http";
import * as express from "express";
import { MatrixClient } from "matrix-bot-sdk";
import { LogService, MatrixClient } from "matrix-bot-sdk";
import config from "../config";
import RuleServer from "../models/RuleServer";
import { ReportManager } from "../report/ReportManager";
/**
* A common prefix for all web-exposed APIs.
*/
@ -33,7 +35,7 @@ export class WebAPIs {
private webController: express.Express = express();
private httpServer?: Server;
constructor(private reportManager: ReportManager) {
constructor(private reportManager: ReportManager, private readonly ruleServer: RuleServer|null) {
// Setup JSON parsing.
this.webController.use(express.json());
}
@ -56,6 +58,22 @@ export class WebAPIs {
});
console.log(`Configuring ${API_PREFIX}/report/:room_id/:event_id... DONE`);
}
// Configure ruleServer API.
// FIXME: Doesn't this need some kind of access control?
// See https://github.com/matrix-org/mjolnir/issues/139#issuecomment-1012221479.
if (config.web.ruleServer.enabled) {
const updatesUrl = `${API_PREFIX}/ruleserver/updates`;
LogService.info("WebAPIs", `Configuring ${updatesUrl}...`);
if (!this.ruleServer) {
throw new Error("The rule server to use has not been configured for the WebAPIs.");
}
const ruleServer: RuleServer = this.ruleServer;
this.webController.get(updatesUrl, async (request, response) => {
await this.handleRuleServerUpdate(ruleServer, { request, response, since: request.query.since as string});
});
LogService.info("WebAPIs", `Configuring ${updatesUrl}... DONE`);
}
}
public stop() {
@ -163,4 +181,16 @@ export class WebAPIs {
response.status(503);
}
}
async handleRuleServerUpdate(ruleServer: RuleServer, { since, request, response }: { since: string, request: express.Request, response: express.Response }) {
// FIXME Have to do this because express sends keep alive by default and during tests.
// The server will never be able to close because express never closes the sockets, only stops accepting new connections.
// See https://github.com/matrix-org/mjolnir/issues/139#issuecomment-1012221479.
response.set("Connection", "close");
try {
response.json(ruleServer.getUpdates(since)).status(200);
} catch (ex) {
LogService.error("WebAPIs", `Error responding to a rule server updates request`, since, ex);
}
}
}