test trades simultaneously using a thread pool instead of batches

This commit is contained in:
woodser 2023-02-15 10:59:57 -05:00
parent 8bc228b53d
commit 3ddde6e671

View file

@ -1866,23 +1866,17 @@ async function executeTrades(ctxs: TradeContext[], executionCtx?: TradeContext):
} }
await Promise.all(fundWalletPromises); await Promise.all(fundWalletPromises);
// execute trades in batches unless serial // execute trades in thread pool unless serial
if (executionCtx.concurrentTrades) { if (executionCtx.concurrentTrades) {
let batchNum = 0;
while (batchNum * executionCtx.maxConcurrency! < ctxs.length) {
if (ctxs.length > executionCtx.maxConcurrency!) HavenoUtils.log(0, "Executing trade batch " + (batchNum + 1));
const tradePromises: Promise<string>[] = []; const tradePromises: Promise<string>[] = [];
const ctxBatch = ctxs.slice(batchNum * executionCtx.maxConcurrency!, batchNum * executionCtx.maxConcurrency! + executionCtx.maxConcurrency!); const pool = new monerojs.ThreadPool(executionCtx.maxConcurrency!);
for (const ctx of ctxBatch) tradePromises.push(executeTrade(Object.assign(ctx, {concurrentTrades: executionCtx.concurrentTrades}))); for (const ctx of ctxs) tradePromises.push(pool.submit(() => executeTrade(Object.assign(ctx, {concurrentTrades: executionCtx!.concurrentTrades}))));
try { try {
const offerIdBatch = await Promise.all(tradePromises); offerIds = await Promise.all(tradePromises);
for (const offerId of offerIdBatch) offerIds.push(offerId);
} catch (e2) { } catch (e2) {
if (!executionCtx.stopOnFailure) await Promise.allSettled(tradePromises); // wait for other trades to complete before throwing error if (!executionCtx.stopOnFailure) await Promise.allSettled(tradePromises); // wait for other trades to complete before throwing error
throw e2; throw e2;
} }
batchNum++;
}
} else { } else {
for (const ctx of ctxs) { for (const ctx of ctxs) {
offerIds.push(await executeTrade(Object.assign(ctx, {concurrentTrades: executionCtx.concurrentTrades}))); offerIds.push(await executeTrade(Object.assign(ctx, {concurrentTrades: executionCtx.concurrentTrades})));
@ -2116,7 +2110,7 @@ async function executeTrade(ctx?: TradeContext): Promise<string> {
ctx.isPaymentReceived = true; ctx.isPaymentReceived = true;
fetchedTrade = await getSeller(ctx)!.getTrade(trade.getTradeId()); fetchedTrade = await getSeller(ctx)!.getTrade(trade.getTradeId());
expect(fetchedTrade.getPhase()).toEqual("PAYMENT_RECEIVED"); expect(fetchedTrade.getPhase()).toEqual("PAYMENT_RECEIVED");
await wait(ctx.walletSyncPeriodMs!); // buyer or arbitrator will sign and publish payout tx await wait(ctx.walletSyncPeriodMs! * 2); // buyer or arbitrator will sign and publish payout tx
await testTradeState(await getSeller(ctx)!.getTrade(trade.getTradeId()), {phase: "PAYMENT_RECEIVED", payoutState: ["PAYOUT_PUBLISHED", "PAYOUT_CONFIRMED", "PAYOUT_UNLOCKED"], isCompleted: false, isPayoutPublished: true}); await testTradeState(await getSeller(ctx)!.getTrade(trade.getTradeId()), {phase: "PAYMENT_RECEIVED", payoutState: ["PAYOUT_PUBLISHED", "PAYOUT_CONFIRMED", "PAYOUT_UNLOCKED"], isCompleted: false, isPayoutPublished: true});
} }