diff --git a/src/queue/index.ts b/src/queue/index.ts index 3f56b80..eb120b5 100644 --- a/src/queue/index.ts +++ b/src/queue/index.ts @@ -12,9 +12,10 @@ type PriceJobData = Token[] type PriceJobReturn = number type HealthJobReturn = { balance: BigNumber, isEnought: boolean } -type RelayerJobData = WithdrawalData & { id: string, status: JobStatus, type: RelayerJobType } - -type RelayerJobReturn = any +export type RelayerJobData = + WithdrawalData + & { id: string, status: JobStatus, type: RelayerJobType, txHash?: string, confirmations?: number } +export type RelayerJobReturn = any export type RelayerProcessor = Processor export type PriceProcessor = Processor @@ -80,14 +81,20 @@ export class RelayerQueueHelper { get queue() { if (!this._queue) { - this._queue = new Queue(this.config.queueName, { connection: this.store.client }); + this._queue = new Queue(this.config.queueName, { + connection: this.store.client, + defaultJobOptions: { stackTraceLimit: 100 }, + }); } return this._queue; } get worker() { if (!this._worker) { - this._worker = new Worker(this.config.queueName, relayerProcessor, { connection: this.store.client }); + this._worker = new Worker(this.config.queueName, relayerProcessor, { + connection: this.store.client, + concurrency: 1, + }); } return this._worker; } diff --git a/src/queue/price.processor.ts b/src/queue/price.processor.ts index 4fcff02..fc944c7 100644 --- a/src/queue/price.processor.ts +++ b/src/queue/price.processor.ts @@ -4,6 +4,5 @@ import { PriceProcessor } from './index'; export const priceProcessor: PriceProcessor = async (job) => { const priceService = getPriceService(); const result = await priceService.fetchPrices(job.data); - console.log('priceProcessor', result); return await priceService.savePrices(result); }; diff --git a/src/queue/relayer.processor.ts b/src/queue/relayer.processor.ts index 2f09662..11350d4 100644 --- a/src/queue/relayer.processor.ts +++ b/src/queue/relayer.processor.ts @@ -3,14 +3,21 @@ import { getTxService } from '../services'; import { JobStatus } from '../types'; export const relayerProcessor: RelayerProcessor = async (job) => { - await job.update({ ...job.data, status: JobStatus.ACCEPTED }); - console.log(`Start processing a new ${job.data.type} job ${job.id}`); + try { + await job.update({ ...job.data, status: JobStatus.ACCEPTED }); + console.log(`Start processing a new ${job.data.type} job ${job.id}`); + + const txService = getTxService(); + txService.currentJob = job; + const withdrawalData = job.data; + await txService.checkTornadoFee(withdrawalData); + const txData = await txService.prepareTxData(withdrawalData); + const receipt = await txService.sendTx(txData); + return receipt; + } catch (e) { + console.log(e); + await job.update({ ...job.data, status: JobStatus.FAILED }); + throw new Error(e.message); + } - const txService = getTxService(); - const withdrawalData = job.data; - await txService.checkTornadoFee(withdrawalData); - const txData = await txService.prepareTxData(withdrawalData); - const receipt = await txService.sendTx(txData); - console.log(receipt); - return receipt; }; diff --git a/src/queue/worker.ts b/src/queue/worker.ts index bf0c9b1..64a85e3 100644 --- a/src/queue/worker.ts +++ b/src/queue/worker.ts @@ -17,8 +17,9 @@ export const schedulerWorker = async () => { export const relayerWorker = async () => { await configService.init(); const relayer = new RelayerQueueHelper(); + console.log(relayer.queue.name, 'worker started'); relayer.worker.on('completed', (job, result) => { - console.log(`Job ${job.id} completed with result: ${result}`); + console.log(`Job ${job.id} completed with result: `, result); }); relayer.worker.on('failed', (job, error) => console.log(error)); }; diff --git a/src/services/config.service.ts b/src/services/config.service.ts index 012edb6..4c95942 100644 --- a/src/services/config.service.ts +++ b/src/services/config.service.ts @@ -12,7 +12,6 @@ type relayerQueueName = `relayer_${availableIds}` @singleton() export class ConfigService { - static instance: ConfigService; netIdKey: netIds; queueName: relayerQueueName; tokens: Token[]; @@ -35,7 +34,6 @@ export class ConfigService { this.instances = instances[this.netIdKey]; this.provider = getProvider(false); this.wallet = new Wallet(this.privateKey, this.provider); - console.log(this.wallet.address); this._fillInstanceMap(); } @@ -43,10 +41,6 @@ export class ConfigService { return this._proxyContract; } - get proxyAddress(): string { - return this._proxyAddress; - } - private _fillInstanceMap() { if (!this.instances) throw new Error('config mismatch, check your environment variables'); // TODO diff --git a/src/worker.ts b/src/worker.ts index a16611b..a4d5a92 100644 --- a/src/worker.ts +++ b/src/worker.ts @@ -1,3 +1,4 @@ -import { schedulerWorker } from './queue/worker'; +import { schedulerWorker, relayerWorker } from './queue/worker'; schedulerWorker(); +relayerWorker();