mirror of
https://github.com/matrix-org/mjolnir.git
synced 2024-10-01 01:36:06 -04:00
WIP: Feedback
This commit is contained in:
parent
3292cf199e
commit
881a8a9063
@ -48,7 +48,5 @@ export async function execKickCommand(roomId: string, event: any, mjolnir: Mjoln
|
||||
}
|
||||
}
|
||||
|
||||
await mjolnir.taskQueue.push(async () => {
|
||||
return mjolnir.client.unstableApis.addReactionToEvent(roomId, event['event_id'], '✅');
|
||||
})
|
||||
}
|
||||
|
@ -91,50 +91,6 @@ export class ThrottlingQueue {
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Push an iterator of tasks onto the queue.
|
||||
*
|
||||
* As long as the iterator isn't complete, new tasks from this generator
|
||||
* will be automatically added to the queue. Task errors are logged but
|
||||
* do not stop the iterator. Iterator errors do stop the iterator.
|
||||
*
|
||||
* @param source An asynchronous iterator of tasks.
|
||||
* @return A promise resolved once `source` is exhausted or rejected once
|
||||
* `source` throws.
|
||||
*/
|
||||
public pull<T>(source: () => Promise<{done: boolean, value?: Task<T>}>): Promise<void> {
|
||||
// Wrap `source` in a promise resolved once it is exhausted.
|
||||
return new Promise((resolve, reject) => {
|
||||
const step = async () => {
|
||||
let item;
|
||||
// In case of iterator failure, stop immediately and reject.
|
||||
try {
|
||||
item = await source();
|
||||
} catch (ex) {
|
||||
return reject(ex);
|
||||
}
|
||||
// In case of task failure, ignore the failure (it has been
|
||||
// logged by `push` already) and proceed.
|
||||
try {
|
||||
if (item.value) {
|
||||
/* do not await, or we'll create a deadlock */ this.push(item.value);
|
||||
}
|
||||
} finally {
|
||||
// If this was the last item, resolve and stop, otherwise
|
||||
// rearm and wait for the next item.
|
||||
if (item.done) {
|
||||
return resolve(void(0));
|
||||
} else {
|
||||
// Since entries are executed in order, `item.value` will be completed
|
||||
// before we execute the next call to `step`.
|
||||
this.push(step);
|
||||
}
|
||||
}
|
||||
};
|
||||
this.push(step);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Block a queue for a number of milliseconds.
|
||||
*
|
||||
|
@ -9,10 +9,10 @@ describe("Test: ThrottlingQueue", function() {
|
||||
|
||||
const queue = new ThrottlingQueue(this.mjolnir, 10);
|
||||
let state = new Map();
|
||||
let promises = [];
|
||||
let promises: Promise<void>[] = [];
|
||||
for (let counter = 0; counter < 10; ++counter) {
|
||||
const i = counter;
|
||||
promises.push(queue.push(async () => {
|
||||
const promise = queue.push(async () => {
|
||||
if (state.get(i)) {
|
||||
throw new Error(`We shouldn't have set state[${i}] yet`);
|
||||
}
|
||||
@ -22,55 +22,19 @@ describe("Test: ThrottlingQueue", function() {
|
||||
throw new Error(`We should have set state[${j}] already`);
|
||||
}
|
||||
}
|
||||
}));
|
||||
});
|
||||
promises.push(promise);
|
||||
}
|
||||
await Promise.all(promises);
|
||||
// Give code a little bit more time to trip itself.
|
||||
for (let i = 0; i < 10; ++i) {
|
||||
if (!state.get(i)) {
|
||||
throw new Error(`This is the end of the test, we should have set state[${i}]`);
|
||||
}
|
||||
}
|
||||
|
||||
// Give code a little bit more time to trip itself, in case `promises` are accidentally
|
||||
// resolved too early.
|
||||
await new Promise(resolve => setTimeout(resolve, 1000));
|
||||
for (let i = 0; i < 10; ++i) {
|
||||
if (!state.get(i)) {
|
||||
throw new Error(`This is the end of the test, we should have set state[${i}]`);
|
||||
}
|
||||
}
|
||||
|
||||
queue.dispose();
|
||||
});
|
||||
|
||||
it("Tasks enqueued with `pull()` are executed exactly once and in the right order", async function() {
|
||||
this.timeout(200000);
|
||||
const queue = new ThrottlingQueue(this.mjolnir, 10);
|
||||
let state = new Map();
|
||||
let promise;
|
||||
|
||||
{
|
||||
let counter = 0;
|
||||
promise = queue.pull(async () => {
|
||||
if (counter == 10) {
|
||||
return { done: true };
|
||||
}
|
||||
const i = counter++;
|
||||
return {
|
||||
done: false,
|
||||
value: async () => {
|
||||
if (state.get(i)) {
|
||||
throw new Error(`We shouldn't have set state[${i}] yet`);
|
||||
}
|
||||
state.set(i, true);
|
||||
for (let j = 0; j < i; ++j) {
|
||||
if (!state.get(j)) {
|
||||
throw new Error(`We should have set state[${j}] already`);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
await promise;
|
||||
for (let i = 0; i < 10; ++i) {
|
||||
if (!state.get(i)) {
|
||||
throw new Error(`This is the end of the test, we should have set state[${i}]`);
|
||||
}
|
||||
}
|
||||
|
||||
queue.dispose();
|
||||
});
|
||||
@ -79,7 +43,7 @@ describe("Test: ThrottlingQueue", function() {
|
||||
this.timeout(20000);
|
||||
const queue = new ThrottlingQueue(this.mjolnir, 10);
|
||||
let state = new Map();
|
||||
let promises = [];
|
||||
let promises: Promise<void>[] = [];
|
||||
for (let counter = 0; counter < 10; ++counter) {
|
||||
const i = counter;
|
||||
promises.push(queue.push(async () => {
|
||||
@ -103,56 +67,15 @@ describe("Test: ThrottlingQueue", function() {
|
||||
queue.block(100);
|
||||
|
||||
await Promise.all(promises);
|
||||
// Give code a little bit more time to trip itself.
|
||||
for (let i = 0; i < 10; ++i) {
|
||||
if (!state.get(i)) {
|
||||
throw new Error(`This is the end of the test, we should have set state[${i}]`);
|
||||
}
|
||||
}
|
||||
|
||||
// Give code a little bit more time to trip itself, in case `promises` are accidentally
|
||||
// resolved too early.
|
||||
await new Promise(resolve => setTimeout(resolve, 1000));
|
||||
for (let i = 0; i < 10; ++i) {
|
||||
if (!state.get(i)) {
|
||||
throw new Error(`This is the end of the test, we should have set state[${i}]`);
|
||||
}
|
||||
}
|
||||
|
||||
queue.dispose();
|
||||
});
|
||||
|
||||
it("Tasks enqueued with `pull()` are executed exactly once and in the right order, even if we call `block()` at some point", async function() {
|
||||
this.timeout(20000);
|
||||
const queue = new ThrottlingQueue(this.mjolnir, 10);
|
||||
let state = new Map();
|
||||
let promise;
|
||||
|
||||
{
|
||||
let counter = 0;
|
||||
promise = queue.pull(async () => {
|
||||
if (counter == 10) {
|
||||
return { done: true };
|
||||
}
|
||||
const i = counter++;
|
||||
return {
|
||||
done: false,
|
||||
value: async () => {
|
||||
if (state.get(i)) {
|
||||
throw new Error(`We shouldn't have set state[${i}] yet`);
|
||||
}
|
||||
state.set(i, true);
|
||||
if (i % 3 === 0) {
|
||||
// Arbitrary call to `delay()`.
|
||||
queue.block(20);
|
||||
}
|
||||
for (let j = 0; j < i; ++j) {
|
||||
if (!state.get(j)) {
|
||||
throw new Error(`We should have set state[${j}] already`);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
await promise;
|
||||
for (let i = 0; i < 10; ++i) {
|
||||
if (!state.get(i)) {
|
||||
throw new Error(`This is the end of the test, we should have set state[${i}]`);
|
||||
}
|
||||
}
|
||||
|
||||
queue.dispose();
|
||||
});
|
||||
|
Loading…
Reference in New Issue
Block a user