wip tx queue worker

This commit is contained in:
smart_ex 2022-05-23 20:28:40 +10:00
parent 82d3cc63e4
commit 1828228a9d
6 changed files with 32 additions and 23 deletions

View File

@ -12,9 +12,10 @@ type PriceJobData = Token[]
type PriceJobReturn = number
type HealthJobReturn = { balance: BigNumber, isEnought: boolean }
type RelayerJobData = WithdrawalData & { id: string, status: JobStatus, type: RelayerJobType }
type RelayerJobReturn = any
export type RelayerJobData =
WithdrawalData
& { id: string, status: JobStatus, type: RelayerJobType, txHash?: string, confirmations?: number }
export type RelayerJobReturn = any
export type RelayerProcessor = Processor<RelayerJobData, RelayerJobReturn, RelayerJobType>
export type PriceProcessor = Processor<PriceJobData, PriceJobReturn, 'updatePrice'>
@ -80,14 +81,20 @@ export class RelayerQueueHelper {
get queue() {
if (!this._queue) {
this._queue = new Queue<RelayerJobData, RelayerJobReturn, RelayerJobType>(this.config.queueName, { connection: this.store.client });
this._queue = new Queue<RelayerJobData, RelayerJobReturn, RelayerJobType>(this.config.queueName, {
connection: this.store.client,
defaultJobOptions: { stackTraceLimit: 100 },
});
}
return this._queue;
}
get worker() {
if (!this._worker) {
this._worker = new Worker<RelayerJobData, RelayerJobReturn, RelayerJobType>(this.config.queueName, relayerProcessor, { connection: this.store.client });
this._worker = new Worker<RelayerJobData, RelayerJobReturn, RelayerJobType>(this.config.queueName, relayerProcessor, {
connection: this.store.client,
concurrency: 1,
});
}
return this._worker;
}

View File

@ -4,6 +4,5 @@ import { PriceProcessor } from './index';
export const priceProcessor: PriceProcessor = async (job) => {
const priceService = getPriceService();
const result = await priceService.fetchPrices(job.data);
console.log('priceProcessor', result);
return await priceService.savePrices(result);
};

View File

@ -3,14 +3,21 @@ import { getTxService } from '../services';
import { JobStatus } from '../types';
export const relayerProcessor: RelayerProcessor = async (job) => {
await job.update({ ...job.data, status: JobStatus.ACCEPTED });
console.log(`Start processing a new ${job.data.type} job ${job.id}`);
try {
await job.update({ ...job.data, status: JobStatus.ACCEPTED });
console.log(`Start processing a new ${job.data.type} job ${job.id}`);
const txService = getTxService();
txService.currentJob = job;
const withdrawalData = job.data;
await txService.checkTornadoFee(withdrawalData);
const txData = await txService.prepareTxData(withdrawalData);
const receipt = await txService.sendTx(txData);
return receipt;
} catch (e) {
console.log(e);
await job.update({ ...job.data, status: JobStatus.FAILED });
throw new Error(e.message);
}
const txService = getTxService();
const withdrawalData = job.data;
await txService.checkTornadoFee(withdrawalData);
const txData = await txService.prepareTxData(withdrawalData);
const receipt = await txService.sendTx(txData);
console.log(receipt);
return receipt;
};

View File

@ -17,8 +17,9 @@ export const schedulerWorker = async () => {
export const relayerWorker = async () => {
await configService.init();
const relayer = new RelayerQueueHelper();
console.log(relayer.queue.name, 'worker started');
relayer.worker.on('completed', (job, result) => {
console.log(`Job ${job.id} completed with result: ${result}`);
console.log(`Job ${job.id} completed with result: `, result);
});
relayer.worker.on('failed', (job, error) => console.log(error));
};

View File

@ -12,7 +12,6 @@ type relayerQueueName = `relayer_${availableIds}`
@singleton()
export class ConfigService {
static instance: ConfigService;
netIdKey: netIds;
queueName: relayerQueueName;
tokens: Token[];
@ -35,7 +34,6 @@ export class ConfigService {
this.instances = instances[this.netIdKey];
this.provider = getProvider(false);
this.wallet = new Wallet(this.privateKey, this.provider);
console.log(this.wallet.address);
this._fillInstanceMap();
}
@ -43,10 +41,6 @@ export class ConfigService {
return this._proxyContract;
}
get proxyAddress(): string {
return this._proxyAddress;
}
private _fillInstanceMap() {
if (!this.instances) throw new Error('config mismatch, check your environment variables');
// TODO

View File

@ -1,3 +1,4 @@
import { schedulerWorker } from './queue/worker';
import { schedulerWorker, relayerWorker } from './queue/worker';
schedulerWorker();
relayerWorker();