2019-09-27 20:26:57 +00:00
/*
Copyright 2019-2021 The Matrix.org Foundation C.I.C.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
2020-04-15 00:46:39 +00:00
import {
2021-07-01 21:11:27 +00:00
extractRequestError ,
2020-04-15 00:46:39 +00:00
LogLevel ,
LogService ,
MatrixClient ,
MatrixGlob ,
MessageType ,
Permalinks ,
TextualMessageEventContent ,
2022-01-05 09:43:43 +00:00
UserID ,
getRequestFn ,
setRequestFn ,
2020-04-15 00:46:39 +00:00
} from "matrix-bot-sdk" ;
2019-12-10 02:56:12 +00:00
import { logMessage } from "./LogProxy" ;
import config from "./config" ;
2020-04-15 00:46:39 +00:00
import * as htmlEscape from "escape-html" ;
2021-12-21 14:10:25 +00:00
import { ClientRequest , IncomingMessage } from "http" ;
2019-10-09 14:53:37 +00:00
2019-09-27 20:26:57 +00:00
/**
 * Copy the elements of a set (or any other iterable) into a new array,
 * preserving iteration order.
 *
 * @param set The collection to copy. Any `Iterable` is accepted, so callers
 *   passing a `Set` keep working unchanged.
 * @returns A fresh array; mutating it does not affect `set`.
 */
export function setToArray<T>(set: Iterable<T>): T[] {
    return Array.from(set);
}
2019-10-09 14:53:37 +00:00
2019-12-10 02:43:41 +00:00
/**
 * Decide whether a membership event represents a user actually joining the room,
 * rather than a profile change that is re-sent with membership "join".
 *
 * A missing `content.membership` is treated as "join". When there is no
 * `unsigned.prev_content`, or it has no `membership`, the previous membership
 * is treated as "leave".
 *
 * @param event A raw membership event (presumably `m.room.member` — confirm with callers).
 * @returns `true` iff the new membership is "join" and the previous one was not.
 */
export function isTrueJoinEvent(event: any): boolean {
    const current = event['content']['membership'] || 'join';
    const unsigned = event['unsigned'];
    const previous = (unsigned && unsigned['prev_content'])
        ? (unsigned['prev_content']['membership'] || 'leave')
        : "leave";
    if (current !== 'join') {
        return false;
    }
    // A "join" that follows a "join" is only a profile update, not a real join.
    return previous !== "join";
}
2020-04-14 22:44:31 +00:00
/**
 * Redact the messages sent by a user (or by every user matching a glob) in
 * each of the given rooms.
 *
 * Rooms are handled one after the other, and redactions are issued
 * sequentially. When `config.noop` is set, nothing is redacted; instead a
 * warning is logged for every event that would have been.
 *
 * @param client The client used to fetch and redact events.
 * @param userIdOrGlob A user ID, or a glob pattern (see `getMessagesByUserIn`).
 * @param targetRoomIds The rooms to clean up.
 * @param limit Maximum number of events to inspect per room (see `getMessagesByUserIn`).
 */
export async function redactUserMessagesIn(client: MatrixClient, userIdOrGlob: string, targetRoomIds: string[], limit = 1000) {
    const source = "utils#redactUserMessagesIn";
    for (const roomId of targetRoomIds) {
        await logMessage(LogLevel.DEBUG, source, `Fetching sent messages for ${userIdOrGlob} in ${roomId} to redact...`, roomId);

        const redactBatch = async (batch: any[]) => {
            for (const event of batch) {
                const eventId = event['event_id'];
                await logMessage(LogLevel.DEBUG, source, `Redacting ${eventId} in ${roomId}`, roomId);
                if (config.noop) {
                    await logMessage(LogLevel.WARN, source, `Tried to redact ${eventId} in ${roomId} but Mjolnir is running in no-op mode`, roomId);
                    continue;
                }
                await client.redactEvent(roomId, eventId);
            }
        };

        await getMessagesByUserIn(client, userIdOrGlob, roomId, limit, redactBatch);
    }
}
2019-12-10 02:43:41 +00:00
/**
 * Gets the events sent by a user (or by several users, when `sender` is a glob) in a given room,
 * paginating backwards from the most recent event.
 *
 * @param client The client to use.
 * @param sender The sender. A matrix user id or a wildcard to match multiple senders e.g. *.example.com.
 *               Can also be used to generically search the sender field e.g. *bob* for all events from senders with "bob" in them.
 *               See `MatrixGlob` in matrix-bot-sdk.
 * @param roomId The room ID to search in.
 * @param limit The maximum number of events to inspect, whether or not they match `sender`.
 *              When a glob is used, fewer events than this may be passed to `cb`, because not every
 *              paginated event matches the glob. The limit counts inspected events (rather than
 *              matches) so that a caller cannot accidentally traverse the entire room history.
 * @param cb Callback handling the matching events, one page at a time, as they are received.
 *           It is only called with non-empty batches. If it returns a promise, the promise is
 *           awaited before the next page is fetched.
 * @returns Resolves when either: the limit has been reached, the server returned the same
 *          pagination token it was given, or there is no more timeline to paginate.
 * @throws Error if `rooms/initialSync` does not return a `messages` pagination chunk.
 */
export async function getMessagesByUserIn(client: MatrixClient, sender: string, roomId: string, limit: number, cb: (events: any[]) => void | Promise<void>): Promise<void> {
    const isGlob = sender.includes("*");
    // A literal user ID can be filtered server-side. A glob cannot be expressed in a
    // RoomEventFilter, so in that case every event is fetched and filtered locally.
    const roomEventFilter = {
        rooms: [roomId],
        ...(isGlob ? {} : { senders: [sender] }),
    };

    const matcher = new MatrixGlob(sender);
    function testUser(userId: string): boolean {
        return isGlob ? matcher.test(userId) : userId === sender;
    }

    /**
     * Note: `rooms/initialSync` is deprecated. However, there is no replacement for this API for the time being.
     * While previous versions of this function used `/sync`, experience shows that it can grow extremely
     * slow (4-5 minutes long) when we need to sync many large rooms, which leads to timeouts and
     * breakage in Mjolnir, see https://github.com/matrix-org/synapse/issues/10842.
     */
    function roomInitialSync() {
        return client.doRequest("GET", `/_matrix/client/r0/rooms/${encodeURIComponent(roomId)}/initialSync`);
    }

    function backfill(from: string) {
        const qs = {
            filter: JSON.stringify(roomEventFilter),
            from: from,
            dir: "b",
        };
        LogService.info("utils", "Backfilling with token: " + from);
        return client.doRequest("GET", `/_matrix/client/r0/rooms/${encodeURIComponent(roomId)}/messages`, qs);
    }

    // Do an initial sync first to get the batch token
    const response = await roomInitialSync();

    // Number of events inspected so far (matching or not); compared against `limit`.
    let processed = 0;
    /**
     * Filter events from the timeline to events that are from a matching sender and under the limit that can be processed by the callback.
     * @param events Events from the room timeline.
     * @returns Events that can safely be processed by the callback.
     */
    function filterEvents(events: any[]) {
        const messages: any[] = [];
        for (const event of events) {
            if (processed >= limit) return messages; // we have provided enough events.
            processed++;
            if (testUser(event['sender'])) messages.push(event);
        }
        return messages;
    }

    // The recommended way to fetch events from a room is to use both rooms/initialSync and then /messages.
    // Unfortunately, this results in code that is rather hard to read, as these two APIs employ very different data structures.
    // We prefer discarding the events from rooms/initialSync and reading only from /messages,
    // even if it's a little slower, for the sake of code maintenance.
    const timeline = response['messages'];
    if (!timeline) {
        throw new Error(`Internal Error: rooms/initialSync did not return a pagination chunk for ${roomId}, this is not normal and if it is we need to stop using it. See roomInitialSync() for why we are using it.`);
    }

    // The end of the PaginationChunk has the most recent events from rooms/initialSync.
    // This token is required to be present in the PaginationChunk from rooms/initialSync.
    let token: string | undefined = timeline['end'];
    // We check that we have the token because rooms/messages is not required to provide one
    // and will not provide one when there is no more history to paginate.
    while (token && processed < limit) {
        const bfMessages = await backfill(token);
        const lastToken = token;
        token = bfMessages['end'];
        if (lastToken === token) {
            // NOTE(review): any events in this final chunk are not passed to `cb`.
            LogService.debug("utils", "Backfill returned same end token - returning early.");
            return;
        }
        const events = filterEvents(bfMessages['chunk'] || []);
        // If we are using a glob, there may be no relevant events in this chunk.
        if (events.length > 0) {
            await cb(events);
        }
    }
}
2020-04-15 00:46:39 +00:00
/**
 * Build message content in which each of the given room IDs is replaced by the room's
 * published alias (falling back to the ID itself), linked as a permalink in the HTML body.
 *
 * @param client Client used to resolve published aliases and to find our own homeserver
 *   domain, which is used as the `via` server of the permalinks.
 * @param text The plain-text message, possibly containing the room IDs.
 * @param roomIds The room ID(s) to replace.
 * @param msgtype Message type of the resulting content. Defaults to "m.text".
 * @returns Content whose `body` is `text` with IDs replaced by aliases, and whose
 *   `formatted_body` is the HTML-escaped `text` with IDs replaced by permalinks.
 */
export async function replaceRoomIdsWithPills(client: MatrixClient, text: string, roomIds: string[] | string, msgtype: MessageType = "m.text"): Promise<TextualMessageEventContent> {
    const ids = Array.isArray(roomIds) ? roomIds : [roomIds];
    const content: TextualMessageEventContent = {
        body: text,
        formatted_body: htmlEscape(text),
        msgtype: msgtype,
        format: "org.matrix.custom.html",
    };
    const escapeRegex = (v: string): string => {
        return v.replace(/[-\/\\^$*+?.()|[\]{}]/g, '\\$&');
    };
    const viaServers = [(new UserID(await client.getUserId())).domain];
    for (const roomId of ids) {
        let alias = roomId;
        try {
            alias = (await client.getPublishedAlias(roomId)) || roomId;
        } catch (e) {
            // This is a recursive call, so tell the function not to try and call us
            await logMessage(LogLevel.WARN, "utils", `Failed to resolve room alias for ${roomId} - see console for details`, null, true);
            LogService.warn("utils", extractRequestError(e));
        }
        const resolved = alias;
        // Replacer functions are used so that `$` sequences in an alias (`$&`, `$1`, ...)
        // are inserted literally instead of being interpreted as replacement patterns.
        const regexRoomId = new RegExp(escapeRegex(roomId), "g");
        content.body = content.body.replace(regexRoomId, () => resolved);
        if (content.formatted_body) {
            // `formatted_body` holds HTML-escaped text, so look for the escaped form of the ID.
            const regexEscapedRoomId = new RegExp(escapeRegex(htmlEscape(roomId)), "g");
            // The alias comes from the server: escape it before splicing it into HTML.
            const link = `<a href="${htmlEscape(Permalinks.forRoom(resolved, viaServers))}">${htmlEscape(resolved)}</a>`;
            content.formatted_body = content.formatted_body.replace(regexEscapedRoomId, () => link);
        }
    }
    return content;
}
2021-12-21 14:10:25 +00:00
2022-01-05 09:43:43 +00:00
let isMatrixClientPatchedForConciseExceptions = false;
/**
 * Patch `MatrixClient` into something that throws concise exceptions.
 *
 * By default, instances of `MatrixClient` throw instances of `IncomingMessage`
 * in case of many errors. Unfortunately, these instances are unusable:
 *
 * - they are logged as ~800 *lines of code*;
 * - there is no error message;
 * - they offer no stack.
 *
 * This method configures `MatrixClient` to ensure that methods that may throw
 * instead throw more reasonable instances of `Error`. Those errors carry two
 * non-enumerable properties (hidden from logs) for calling code to inspect:
 * - `body`: the response body, JSON-parsed when it is valid JSON;
 * - `statusCode`: the HTTP status code.
 *
 * Installing the patch twice is a no-op.
 */
function patchMatrixClientForConciseExceptions() {
    if (isMatrixClientPatchedForConciseExceptions) {
        return;
    }
    const originalRequestFn = getRequestFn();
    setRequestFn((params, cb) => {
        // Store an error early, to maintain *some* semblance of stack.
        // We'll only throw the error if there is one.
        const error = new Error("STACK CAPTURE");
        originalRequestFn(params, function conciseExceptionRequestFn(err, response, resBody) {
            if (!err && (response?.statusCode < 200 || response?.statusCode >= 300)) {
                // Normally, converting HTTP Errors into rejections is done by the caller
                // of `requestFn` within matrix-bot-sdk. However, this always ends up rejecting
                // with an `IncomingMessage` - exactly what we wish to avoid here.
                err = response;
                // Safety note: In the calling code within matrix-bot-sdk, if we return
                // an IncomingMessage as an error, we end up logging an unredacted response,
                // which may include tokens, passwords, etc. This could be a grave privacy
                // leak. The matrix-bot-sdk typically handles this by sanitizing the data
                // before logging it but, by converting the HTTP Error into a rejection
                // earlier than expected by the matrix-bot-sdk, we skip this step of
                // sanitization.
                //
                // However, since the error we're creating is an `IncomingMessage`, we
                // rewrite it into an `Error` ourselves in this function. Our `Error`
                // is even more sanitized (we only include the URL, HTTP method and
                // the error response) so we are NOT causing a privacy leak.
                if (!(err instanceof IncomingMessage)) {
                    // Safety check. Report the problem through the callback: an exception
                    // thrown from inside this callback would presumably escape as an
                    // uncaught exception instead of reaching the caller of `requestFn`.
                    return cb(new TypeError("Internal error: at this stage, the error should be an IncomingMessage"), response, resBody);
                }
            }
            if (!(err instanceof IncomingMessage)) {
                // In most cases, we're happy with the result.
                return cb(err, response, resBody);
            }
            // However, MatrixClient has a tendency of throwing
            // instances of `IncomingMessage` instead of instances
            // of `Error`. The former take ~800 lines of log and
            // provide no stack trace, which makes them typically
            // useless.
            let method: string | null = null;
            let path = '';
            let body: string | null = null;
            if (err.method) {
                method = err.method;
            }
            if (err.url) {
                path = err.url;
            }
            if ("req" in err && (err as any).req instanceof ClientRequest) {
                if (!method) {
                    method = (err as any).req.method;
                }
                if (!path) {
                    path = (err as any).req.path;
                }
            }
            if ("body" in err) {
                body = (err as any).body;
            }
            const message = `Error during MatrixClient request ${method} ${path}: ${err.statusCode} ${err.statusMessage} -- ${body}`;
            error.message = message;
            // `error` was created with a placeholder message. Depending on when the engine
            // formats `stack`, its header may still read "STACK CAPTURE"; rewrite it so that
            // logged stacks show the real message. (No-op if the placeholder is absent.)
            if (typeof error.stack === "string") {
                error.stack = error.stack.replace("STACK CAPTURE", () => message);
            }
            if (body) {
                // Calling code may use `body` to check for errors, so let's
                // make sure that we're providing it.
                try {
                    body = JSON.parse(body);
                } catch (ex) {
                    // Not JSON.
                }
                // Define the property but don't make it visible during logging.
                Object.defineProperty(error, "body", {
                    value: body,
                    enumerable: false,
                });
            }
            // Calling code may use `statusCode` to check for errors, so let's
            // make sure that we're providing it.
            if ("statusCode" in err) {
                // Define the property but don't make it visible during logging.
                Object.defineProperty(error, "statusCode", {
                    value: err.statusCode,
                    enumerable: false,
                });
            }
            return cb(error, response, resBody);
        });
    });
    isMatrixClientPatchedForConciseExceptions = true;
}
2022-01-25 12:19:44 +00:00
const MAX_REQUEST_ATTEMPTS = 15;
const REQUEST_RETRY_BASE_DURATION_MS = 100;
const TRACE_CONCURRENT_REQUESTS = false;
let numberOfConcurrentRequests = 0;
let isMatrixClientPatchedForRetryWhenThrottled = false;

/**
 * Compute how long to wait before re-attempting a throttled request.
 *
 * The server-provided `retry_after_ms` is honoured when it is a non-negative number
 * (or a numeric string). It is read from the JSON body attached to the error by
 * `patchMatrixClientForConciseExceptions`. Otherwise the delay falls back to a
 * quadratic backoff of `attempt * attempt * REQUEST_RETRY_BASE_DURATION_MS`.
 *
 * @param err The error returned for the throttled attempt.
 * @param attempt The 1-based number of the attempt that was just throttled.
 * @returns The delay, in milliseconds.
 */
function computeRetryDelayMs(err: any, attempt: number): number {
    const fallbackMs = attempt * attempt * REQUEST_RETRY_BASE_DURATION_MS;
    const raw = err?.body?.retry_after_ms;
    const parsed = typeof raw === "number" ? raw : Number.parseInt(raw, 10);
    return Number.isFinite(parsed) && parsed >= 0 ? parsed : fallbackMs;
}

/**
 * Patch instances of MatrixClient to make sure that it retries requests
 * in case of throttling.
 *
 * A request is retried only when its error body has errcode `M_LIMIT_EXCEEDED`,
 * up to `MAX_REQUEST_ATTEMPTS` attempts in total. Every other outcome (success
 * or error) is passed to the callback exactly once.
 *
 * Note: As of this writing, we do not re-attempt requests that timeout,
 * only requests that are throttled by the server. The rationale is that,
 * in case of DoS, we do not wish to make the situation even worse.
 */
function patchMatrixClientForRetry() {
    if (isMatrixClientPatchedForRetryWhenThrottled) {
        return;
    }
    const originalRequestFn = getRequestFn();
    setRequestFn(async (params, cb) => {
        let attempt = 1;
        numberOfConcurrentRequests += 1;
        if (TRACE_CONCURRENT_REQUESTS) {
            console.trace("Current number of concurrent requests", numberOfConcurrentRequests);
        }
        try {
            while (true) {
                let outcome: [any, any, any];
                try {
                    outcome = await new Promise<[any, any, any]>((resolve) => {
                        originalRequestFn(params, function requestFnWithRetry(err, response, resBody) {
                            resolve([err, response, resBody]);
                        });
                    });
                } catch (ex) {
                    // `originalRequestFn` threw synchronously. This is not throttling,
                    // so report it rather than retrying.
                    return cb(ex, undefined, undefined);
                }
                const [err, response, resBody] = outcome;
                if (attempt >= MAX_REQUEST_ATTEMPTS || err?.body?.errcode !== 'M_LIMIT_EXCEEDED') {
                    // This is our final result: success, or an error we must not retry.
                    // The callback is invoked outside of any retry handling, so an
                    // exception it throws can never trigger a new request.
                    return cb(err, response, resBody);
                }
                // Throttled: wait, then try again.
                const retryAfterMs = computeRetryDelayMs(err, attempt);
                LogService.debug("Mjolnir.client", `Waiting ${retryAfterMs} ms before retrying ${params.method} ${params.uri}`);
                await new Promise(resolve => setTimeout(resolve, retryAfterMs));
                attempt += 1;
            }
        } finally {
            numberOfConcurrentRequests -= 1;
        }
    });
    isMatrixClientPatchedForRetryWhenThrottled = true;
}
2022-01-06 11:19:19 +00:00
/**
 * Install every patch this module applies to `MatrixClient`.
 *
 * The patches are installed in the order below. Each one wraps the request
 * function installed by the previous one, so the order matters:
 *
 * 1. `patchMatrixClientForConciseExceptions` converts all `IncomingMessage`
 *    errors into instances of `Error` handled as errors;
 * 2. `patchMatrixClientForRetry` expects that all errors are returned as
 *    errors (it inspects `err.body.errcode`), so it must come second.
 *
 * Each patch refuses to install itself twice, so calling this repeatedly is harmless.
 */
export function patchMatrixClient() {
    const patchesInOrder = [
        patchMatrixClientForConciseExceptions,
        patchMatrixClientForRetry,
    ];
    for (const applyPatch of patchesInOrder) {
        applyPatch();
    }
}