From 881a8a906332c46f25ffc49698f080aea591aaf2 Mon Sep 17 00:00:00 2001 From: David Teller Date: Thu, 19 May 2022 15:12:31 +0200 Subject: [PATCH] WIP: Feedback --- src/commands/KickCommand.ts | 4 +- src/queues/ThrottlingQueue.ts | 46 +--------- test/integration/throttleQueueTest.ts | 119 +++++--------------------- 3 files changed, 23 insertions(+), 146 deletions(-) diff --git a/src/commands/KickCommand.ts b/src/commands/KickCommand.ts index fc98159..deec02c 100644 --- a/src/commands/KickCommand.ts +++ b/src/commands/KickCommand.ts @@ -48,7 +48,5 @@ export async function execKickCommand(roomId: string, event: any, mjolnir: Mjoln } } - await mjolnir.taskQueue.push(async () => { - return mjolnir.client.unstableApis.addReactionToEvent(roomId, event['event_id'], '✅'); - }) + return mjolnir.client.unstableApis.addReactionToEvent(roomId, event['event_id'], '✅'); } diff --git a/src/queues/ThrottlingQueue.ts b/src/queues/ThrottlingQueue.ts index 2108bd1..b9052c6 100644 --- a/src/queues/ThrottlingQueue.ts +++ b/src/queues/ThrottlingQueue.ts @@ -91,50 +91,6 @@ export class ThrottlingQueue { }); } - /** - * Push an iterator of tasks onto the queue. - * - * As long as the iterator isn't complete, new tasks from this generator - * will be automatically added to the queue. Task errors are logged but - * do not stop the iterator. Iterator errors do stop the iterator. - * - * @param source An asynchronous iterator of tasks. - * @return A promise resolved once `source` is exhausted or rejected once - * `source` throws. - */ - public pull(source: () => Promise<{done: boolean, value?: Task}>): Promise { - // Wrap `source` in a promise resolved once it is exhausted. - return new Promise((resolve, reject) => { - const step = async () => { - let item; - // In case of iterator failure, stop immediately and reject. - try { - item = await source(); - } catch (ex) { - return reject(ex); - } - // In case of task failure, ignore the failure (it has been - // logged by `push` already) and proceed. - try { - if (item.value) { - /* do not await, or we'll create a deadlock */ this.push(item.value); - } - } finally { - // If this was the last item, resolve and stop, otherwise - // rearm and wait for the next item. - if (item.done) { - return resolve(void(0)); - } else { - // Since entries are executed in order, `item.value` will be completed - // before we execute the next call to `step`. - this.push(step); - } - } - }; - this.push(step); - }); - } - /** * Block a queue for a number of milliseconds. * @@ -232,4 +188,4 @@ export class ThrottlingQueue { this.start(); } } -} \ No newline at end of file +} diff --git a/test/integration/throttleQueueTest.ts b/test/integration/throttleQueueTest.ts index 54bd95b..829e3f3 100644 --- a/test/integration/throttleQueueTest.ts +++ b/test/integration/throttleQueueTest.ts @@ -9,10 +9,10 @@ describe("Test: ThrottlingQueue", function() { const queue = new ThrottlingQueue(this.mjolnir, 10); let state = new Map(); - let promises = []; + let promises: Promise[] = []; for (let counter = 0; counter < 10; ++counter) { const i = counter; - promises.push(queue.push(async () => { + const promise = queue.push(async () => { if (state.get(i)) { throw new Error(`We shouldn't have set state[${i}] yet`); } @@ -22,55 +22,19 @@ describe("Test: ThrottlingQueue", function() { throw new Error(`We should have set state[${j}] already`); } } - })); + }); + promises.push(promise); } await Promise.all(promises); - // Give code a little bit more time to trip itself. + for (let i = 0; i < 10; ++i) { + if (!state.get(i)) { + throw new Error(`This is the end of the test, we should have set state[${i}]`); + } + } + + // Give code a little bit more time to trip itself, in case `promises` are accidentally + // resolved too early. await new Promise(resolve => setTimeout(resolve, 1000)); - for (let i = 0; i < 10; ++i) { - if (!state.get(i)) { - throw new Error(`This is the end of the test, we should have set state[${i}]`); - } - } - - queue.dispose(); - }); - - it("Tasks enqueued with `pull()` are executed exactly once and in the right order", async function() { - this.timeout(200000); - const queue = new ThrottlingQueue(this.mjolnir, 10); - let state = new Map(); - let promise; - - { - let counter = 0; - promise = queue.pull(async () => { - if (counter == 10) { - return { done: true }; - } - const i = counter++; - return { - done: false, - value: async () => { - if (state.get(i)) { - throw new Error(`We shouldn't have set state[${i}] yet`); - } - state.set(i, true); - for (let j = 0; j < i; ++j) { - if (!state.get(j)) { - throw new Error(`We should have set state[${j}] already`); - } - } - } - } - }); - } - await promise; - for (let i = 0; i < 10; ++i) { - if (!state.get(i)) { - throw new Error(`This is the end of the test, we should have set state[${i}]`); - } - } queue.dispose(); }); @@ -79,7 +43,7 @@ describe("Test: ThrottlingQueue", function() { this.timeout(20000); const queue = new ThrottlingQueue(this.mjolnir, 10); let state = new Map(); - let promises = []; + let promises: Promise[] = []; for (let counter = 0; counter < 10; ++counter) { const i = counter; promises.push(queue.push(async () => { @@ -103,56 +67,15 @@ describe("Test: ThrottlingQueue", function() { queue.block(100); await Promise.all(promises); - // Give code a little bit more time to trip itself. + for (let i = 0; i < 10; ++i) { + if (!state.get(i)) { + throw new Error(`This is the end of the test, we should have set state[${i}]`); + } + } + + // Give code a little bit more time to trip itself, in case `promises` are accidentally + // resolved too early. await new Promise(resolve => setTimeout(resolve, 1000)); - for (let i = 0; i < 10; ++i) { - if (!state.get(i)) { - throw new Error(`This is the end of the test, we should have set state[${i}]`); - } - } - - queue.dispose(); - }); - - it("Tasks enqueued with `pull()` are executed exactly once and in the right order, even if we call `block()` at some point", async function() { - this.timeout(20000); - const queue = new ThrottlingQueue(this.mjolnir, 10); - let state = new Map(); - let promise; - - { - let counter = 0; - promise = queue.pull(async () => { - if (counter == 10) { - return { done: true }; - } - const i = counter++; - return { - done: false, - value: async () => { - if (state.get(i)) { - throw new Error(`We shouldn't have set state[${i}] yet`); - } - state.set(i, true); - if (i % 3 === 0) { - // Arbitrary call to `delay()`. - queue.block(20); - } - for (let j = 0; j < i; ++j) { - if (!state.get(j)) { - throw new Error(`We should have set state[${j}] already`); - } - } - } - } - }); - } - await promise; - for (let i = 0; i < 10; ++i) { - if (!state.get(i)) { - throw new Error(`This is the end of the test, we should have set state[${i}]`); - } - } queue.dispose(); });