diff --git a/.env.example b/.env.example index 2a07bf1..b8a3dda 100644 --- a/.env.example +++ b/.env.example @@ -22,3 +22,6 @@ CONFIRMATIONS=4 MAX_GAS_PRICE=1000 BASE_FEE_RESERVE_PERCENTAGE=25 AGGREGATOR=0x8cb1436F64a3c33aD17bb42F94e255c4c0E871b2 +# Telegram bot alerts +TELEGRAM_NOTIFIER_BOT_TOKEN= +TELEGRAM_NOTIFIER_CHAT_ID= diff --git a/docker-compose.yml b/docker-compose.yml index f79faf2..046e810 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -2,50 +2,23 @@ version: '2' services: server: - image: tornadocash/relayer:mining + image: tornadocash/relayer restart: always command: server env_file: .env environment: REDIS_URL: redis://redis/0 nginx_proxy_read_timeout: 600 - depends_on: [redis] - - treeWatcher: - image: tornadocash/relayer:mining - restart: always - command: treeWatcher - env_file: .env - environment: - REDIS_URL: redis://redis/0 - depends_on: [redis] - - priceWatcher: - image: tornadocash/relayer:mining - restart: always - command: priceWatcher - env_file: .env - environment: - REDIS_URL: redis://redis/0 - depends_on: [redis] - - healthWatcher: - image: tornadocash/relayer:mining - restart: always - command: healthWatcher - env_file: .env - environment: - REDIS_URL: redis://redis/0 - depends_on: [redis] + depends_on: [ redis ] worker1: - image: tornadocash/relayer:mining + image: tornadocash/relayer restart: always command: worker env_file: .env environment: REDIS_URL: redis://redis/0 - depends_on: [redis] + depends_on: [ redis ] # worker2: # image: tornadocash/relayer:mining @@ -92,28 +65,21 @@ services: # TELEGRAM_NOTIFIER_BOT_TOKEN: ... # TELEGRAM_NOTIFIER_CHAT_ID: ... - # # this container will send Telegram notifications if specified address doesn't have enough funds - # monitor_mainnet: - # image: peppersec/monitor_eth - # restart: always - # environment: - # TELEGRAM_NOTIFIER_BOT_TOKEN: ... - # TELEGRAM_NOTIFIER_CHAT_ID: ... - # ADDRESS: '0x0000000000000000000000000000000000000000' - # THRESHOLD: 0.5 # ETH - # RPC_URL: https://mainnet.infura.io - # BLOCK_EXPLORER: etherscan.io + redis: image: redis restart: always - command: [redis-server, --appendonly, 'yes'] + command: [ redis-server, '/usr/local/etc/redis/redis.conf', --appendonly, 'yes', ] + ports: + - '6379:6379' volumes: + - ./redis.conf:/usr/local/etc/redis/redis.conf - redis:/data nginx: image: nginx:alpine - container_name: nginx + # container_name: nginx restart: always ports: - 80:80 diff --git a/redis.conf b/redis.conf new file mode 100644 index 0000000..ba96fee --- /dev/null +++ b/redis.conf @@ -0,0 +1,5 @@ +timeout 0 +tcp-keepalive 0 +databases 1 +save 60 100000 +notify-keyspace-events KAE diff --git a/src/app/index.ts b/src/app/index.ts index 0feb50d..b49a1fd 100644 --- a/src/app/index.ts +++ b/src/app/index.ts @@ -3,7 +3,7 @@ import createServer from './server'; import { utils } from 'ethers'; import { port, rewardAccount } from '../config'; import { version } from '../../package.json'; -import { configService, getJobService } from '../services'; +import { configService, getJobService, getNotifierService } from '../services'; if (!utils.isAddress(rewardAccount)) { @@ -14,6 +14,8 @@ server.listen(port, '0.0.0.0', async (err, address) => { if (err) throw err; await configService.init(); await getJobService().setupRepeatableJobs(); + await getNotifierService().subscribe(); + console.log(`Relayer ${version} started on port ${address}`); }); diff --git a/src/app/routes.ts b/src/app/routes.ts index 1b36dcb..1d5f07a 100644 --- a/src/app/routes.ts +++ b/src/app/routes.ts @@ -2,14 +2,14 @@ import { FastifyInstance } from 'fastify'; import { jobsSchema, statusSchema, withdrawBodySchema, withdrawSchema } from './schema'; import { FromSchema } from 'json-schema-to-ts'; import { rewardAccount, tornadoServiceFee } from '../config'; -import { version } from '../../package.json'; -import { configService, getJobService, getPriceService } from '../services'; +import { configService, getHealthService, getJobService, getPriceService } from '../services'; import { RelayerJobType } from '../types'; export function mainHandler(server: FastifyInstance, options, next) { const jobService = getJobService(); const priceService = getPriceService(); + const healthService = getHealthService(); server.get('/', async (req, res) => { @@ -23,6 +23,7 @@ export function mainHandler(server: FastifyInstance, options, next) { async (req, res) => { const ethPrices = await priceService.getPrices(); const currentQueue = await jobService.getQueueCount(); + const errorsLog = await healthService.getErrors(); console.log(currentQueue); res.send({ rewardAccount, @@ -31,10 +32,11 @@ export function mainHandler(server: FastifyInstance, options, next) { ethPrices, tornadoServiceFee, miningServiceFee: 0, - version, + version: '4.5.0', health: { - status: true, + status: 'true', error: '', + errorsLog }, currentQueue, }); diff --git a/src/config.ts b/src/config.ts index 6416ba1..e1ad66d 100644 --- a/src/config.ts +++ b/src/config.ts @@ -25,7 +25,7 @@ export const gasLimits = { [RelayerJobType.MINING_WITHDRAW]: 400000, }; export const minimumBalance = '1000000000000000000'; -export const minimumTornBalance = '50000000000000000000'; +export const minimumTornBalance = '30000000000000000000'; export const baseFeeReserve = Number(process.env.BASE_FEE_RESERVE_PERCENTAGE); export const tornToken = { tokenAddress: '0x77777FeDdddFfC19Ff86DB637967013e6C6A116C', diff --git a/src/modules/redis.ts b/src/modules/redis.ts index 76d6ebe..9c7c0d0 100644 --- a/src/modules/redis.ts +++ b/src/modules/redis.ts @@ -6,6 +6,13 @@ const getNewInstance: () => Redis = () => new IORedis(redisUrl, { maxRetriesPerR @singleton() export class RedisStore { + get publisher(): Redis { + if (!this._publisher) { + this._publisher = getNewInstance(); + } + return this._publisher; + } + get client() { if (!this._client) { this._client = getNewInstance(); @@ -20,8 +27,9 @@ export class RedisStore { return this._subscriber; } - _subscriber: Redis; - _client: Redis; + private _subscriber: Redis; + private _publisher: Redis; + private _client: Redis; } diff --git a/src/queue/health.processor.ts b/src/queue/health.processor.ts new file mode 100644 index 0000000..49d5864 --- /dev/null +++ b/src/queue/health.processor.ts @@ -0,0 +1,8 @@ +import { getHealthService } from '../services'; +import { Processor } from 'bullmq'; + +export const healthProcessor: Processor = async () => { + const healthService = getHealthService(); + await healthService.check(); + } +; diff --git a/src/queue/index.ts b/src/queue/index.ts index eb120b5..3cde24b 100644 --- a/src/queue/index.ts +++ b/src/queue/index.ts @@ -1,16 +1,18 @@ import { Processor, Queue, QueueScheduler, Worker } from 'bullmq'; import { JobStatus, RelayerJobType, Token } from '../types'; import { WithdrawalData } from '../services/tx.service'; -import { BigNumber } from 'ethers'; import { priceProcessor } from './price.processor'; import { autoInjectable } from 'tsyringe'; import { RedisStore } from '../modules/redis'; import { ConfigService } from '../services/config.service'; import { relayerProcessor } from './relayer.processor'; +import { healthProcessor } from './health.processor'; type PriceJobData = Token[] type PriceJobReturn = number -type HealthJobReturn = { balance: BigNumber, isEnought: boolean } + +type HealthJobReturn = void +type HealthJobData = null export type RelayerJobData = WithdrawalData @@ -109,4 +111,51 @@ export class RelayerQueueHelper { } +@autoInjectable() +export class HealthQueueHelper { + + private _queue: Queue; + private _worker: Worker; + private _scheduler: QueueScheduler; + + constructor(private store?: RedisStore, private config?: ConfigService) { + } + + get scheduler(): QueueScheduler { + if (!this._scheduler) { + this._scheduler = new QueueScheduler('health', { connection: this.store.client }); + } + return this._scheduler; + } + + get worker() { + if (!this._worker) { + this._worker = new Worker('health', healthProcessor, { + connection: this.store.client, + concurrency: 1, + }); + } + return this._worker; + } + + get queue() { + if (!this._queue) { + this._queue = new Queue('health', { + connection: this.store.client, + defaultJobOptions: { stackTraceLimit: 100 }, + }); + } + return this._queue; + } + + async addRepeatable() { + await this.queue.add('checkHealth', null, { + repeat: { + every: 30000, + immediately: true, + }, + }); + } + +} diff --git a/src/queue/price.processor.ts b/src/queue/price.processor.ts index fc944c7..924ee74 100644 --- a/src/queue/price.processor.ts +++ b/src/queue/price.processor.ts @@ -4,5 +4,7 @@ import { PriceProcessor } from './index'; export const priceProcessor: PriceProcessor = async (job) => { const priceService = getPriceService(); const result = await priceService.fetchPrices(job.data); - return await priceService.savePrices(result); + if (result) return await priceService.savePrices(result); + return null; }; + diff --git a/src/queue/scheduler.processor.ts b/src/queue/scheduler.processor.ts deleted file mode 100644 index 6b53b55..0000000 --- a/src/queue/scheduler.processor.ts +++ /dev/null @@ -1,6 +0,0 @@ -import { configService } from '../services'; -import { Processor } from 'bullmq'; - -export const checkBalance: Processor = async (job) => { - return await configService.getBalance(); -}; diff --git a/src/queue/worker.ts b/src/queue/worker.ts index 64a85e3..8e91202 100644 --- a/src/queue/worker.ts +++ b/src/queue/worker.ts @@ -1,11 +1,12 @@ import 'reflect-metadata'; -import { PriceQueueHelper, RelayerQueueHelper } from './'; -import { configService } from '../services'; +import { HealthQueueHelper, PriceQueueHelper, RelayerQueueHelper } from './'; +import { configService, getHealthService } from '../services'; -export const schedulerWorker = async () => { +export const priceWorker = async () => { await configService.init(); const price = new PriceQueueHelper(); + price.scheduler.on('stalled', (jobId, prev) => console.log({ jobId, prev })); console.log('price worker', price.queue.name); price.worker.on('active', () => console.log('worker active')); price.worker.on('completed', async (job, result) => { @@ -17,9 +18,24 @@ export const schedulerWorker = async () => { export const relayerWorker = async () => { await configService.init(); const relayer = new RelayerQueueHelper(); + const healthService = getHealthService(); console.log(relayer.queue.name, 'worker started'); relayer.worker.on('completed', (job, result) => { console.log(`Job ${job.id} completed with result: `, result); }); - relayer.worker.on('failed', (job, error) => console.log(error)); + relayer.worker.on('failed', (job, error) => { + healthService.saveError(error); + console.log(error); + }); +}; + +export const healthWorker = async () => { + await configService.init(); + const health = new HealthQueueHelper(); + health.scheduler.on('stalled', (jobId, prev) => console.log({ jobId, prev })); + console.log(health.queue.name, 'worker started'); + health.worker.on('completed', (job, result) => { + console.log(`Job ${job.id} completed with result: `, result); + }); + health.worker.on('failed', (job, error) => console.log(error)); }; diff --git a/src/sandbox.ts b/src/sandbox.ts new file mode 100644 index 0000000..0370d15 --- /dev/null +++ b/src/sandbox.ts @@ -0,0 +1,15 @@ +import 'reflect-metadata'; +import { configService, getHealthService } from './services'; + +(async () => { + + try { + await configService.init(); + const healthService = getHealthService(); + console.log(healthService); + } catch (e) { + console.error('Top level catch', e); + } + +})(); + diff --git a/src/services/config.service.ts b/src/services/config.service.ts index e1ca71f..9454ab9 100644 --- a/src/services/config.service.ts +++ b/src/services/config.service.ts @@ -20,11 +20,10 @@ import { import { resolve } from '../modules'; import { ERC20Abi, ProxyLightABI, TornadoProxyABI } from '../../contracts'; import { availableIds, netIds, NetInstances } from '../../../torn-token'; -import { formatEther, getAddress } from 'ethers/lib/utils'; -import { providers, Wallet } from 'ethers'; +import { getAddress } from 'ethers/lib/utils'; +import { BigNumber, providers, Wallet } from 'ethers'; import { container, singleton } from 'tsyringe'; import { GasPrice } from 'gas-price-oracle/lib/types'; -import { configService } from './index'; type relayerQueueName = `relayer_${availableIds}` @@ -48,6 +47,7 @@ export class ConfigService { fallbackGasPrices: GasPrice; private _tokenAddress: string; private _tokenContract: ERC20Abi; + balances: { MAIN: { warn: string; critical: string; }; TORN: { warn: string; critical: string; }; }; constructor() { @@ -57,6 +57,10 @@ export class ConfigService { this.instances = instances[this.netIdKey]; this.provider = getProvider(false); this.wallet = new Wallet(this.privateKey, this.provider); + this.balances = { + MAIN: { warn: BigNumber.from(minimumBalance).mul(150).div(100).toString(), critical: minimumBalance }, + TORN: { warn: BigNumber.from(minimumTornBalance).mul(2).toString(), critical: minimumTornBalance }, + }; this._fillInstanceMap(); } @@ -64,21 +68,19 @@ export class ConfigService { return this._proxyContract; } + get tokenContract(): ERC20Abi { + return this._tokenContract; + } + private _fillInstanceMap() { if (!this.instances) throw new Error('config mismatch, check your environment variables'); // TODO for (const [currency, { instanceAddress, symbol, decimals }] of Object.entries(this.instances)) { - Object.entries(instanceAddress).forEach(([amount, address]) => { - if (address) { - this.addressMap.set(getAddress(address), { - currency, - amount, - symbol, - decimals, - }); - } - }, - ); + for (const [amount, address] of Object.entries(instanceAddress)) { + if (address) this.addressMap.set(getAddress(address), { + currency, amount, symbol, decimals, + }); + } } } @@ -104,6 +106,7 @@ export class ConfigService { this.fallbackGasPrices = gasPrices; } else { this._proxyAddress = tornadoGoerliProxy; + this.nativeCurrency = 'eth'; if (this.netId === 1) { this._proxyAddress = await resolve(torn.tornadoRouter.address); } @@ -117,18 +120,12 @@ export class ConfigService { decimals: el.decimals, symbol: el.symbol, })).filter(Boolean); - const { balance } = await configService.getBalance(); - const { balance: tornBalance } = await configService.getTornBalance(); console.log( 'Configuration completed\n', `-- netId: ${this.netId}\n`, `-- rpcUrl: ${this.rpcUrl}\n`, `-- relayer Address: ${this.wallet.address}\n`, - `-- relayer Balance: ${formatEther(balance)}\n`, - `-- relayer Torn balance: ${formatEther(tornBalance)}\n`, ); - - this.isInit = true; } catch (e) { console.error(`${this.constructor.name} Error:`, e.message); @@ -139,18 +136,6 @@ export class ConfigService { return this.addressMap.get(getAddress(address)); } - async getBalance() { - const balance = await this.wallet.getBalance(); - const isEnougth = balance.gt(minimumBalance); - return { balance, isEnougth }; - } - - async getTornBalance() { - const balance = await this._tokenContract.balanceOf(this.wallet.address); - const isEnougth = balance.gt(minimumTornBalance); - return { balance, isEnougth }; - } - } type InstanceProps = { diff --git a/src/services/health.service.ts b/src/services/health.service.ts new file mode 100644 index 0000000..1dabbea --- /dev/null +++ b/src/services/health.service.ts @@ -0,0 +1,70 @@ +import { autoInjectable, container } from 'tsyringe'; +import { ConfigService } from './config.service'; +import { RedisStore } from '../modules/redis'; +import { formatEther } from 'ethers/lib/utils'; + +@autoInjectable() +export class HealthService { + constructor(private config: ConfigService, private store: RedisStore) { + } + + async clearErrors() { + await this.store.client.del('errors'); + } + + async getErrors(): Promise<{ message: string, score: number }[]> { + const set = await this.store.client.zrevrange('errors', 0, -1, 'WITHSCORES'); + const errors = []; + while (set.length) { + const [message, score] = set.splice(0, 2); + errors.push({ message, score }); + } + return errors; + } + + async saveError(e) { + await this.store.client.zadd('errors', 'INCR', 1, e.message); + } + + private async _checkBalance(value, currency: 'MAIN' | 'TORN') { + let level = 'OK'; + const type = 'BALANCE'; + const key = 'alerts'; + const time = new Date().getTime(); + if (value.lt(this.config.balances[currency].critical)) { + level = 'CRITICAL'; + } else if (value.lt(this.config.balances[currency].warn)) { + level = 'WARN'; + } + + const isSent = await this.store.client.sismember(`${key}:sent`, `${type}_${currency}_${level}`); + if (!isSent) { + const alert = { + type: `${type}_${currency}_${level}`, + message: `Insufficient balance ${formatEther(value)} ${currency === 'MAIN' ? this.config.nativeCurrency : 'torn'}`, + level, + time, + }; + await this.store.client.rpush(key, JSON.stringify(alert)); + } + + } + + async check() { + const mainBalance = await this.config.wallet.getBalance(); + const tornBalance = await this.config.tokenContract.balanceOf(this.config.wallet.address); + // const mainBalance = BigNumber.from(`${2e18}`).add(1); + // const tornBalance = BigNumber.from(`${50e18}`); + await this._checkBalance(mainBalance, 'MAIN'); + await this._checkBalance(tornBalance, 'TORN'); + } + +} + +type HealthData = { + status: boolean, + error: string, + errorsLog: { message: string, score: number }[] +} + +export default () => container.resolve(HealthService); diff --git a/src/services/index.ts b/src/services/index.ts index e1f1064..8c1a200 100644 --- a/src/services/index.ts +++ b/src/services/index.ts @@ -2,3 +2,6 @@ export { default as configService } from './config.service'; export { default as getPriceService } from './price.service'; export { default as getJobService } from './job.service'; export { default as getTxService } from './tx.service'; +export { default as getNotifierService } from './notifier.service'; +export { default as getHealthService } from './health.service'; + diff --git a/src/services/job.service.ts b/src/services/job.service.ts index 441bdf0..d09eac1 100644 --- a/src/services/job.service.ts +++ b/src/services/job.service.ts @@ -1,13 +1,16 @@ import { v4 } from 'uuid'; import { JobStatus, RelayerJobType } from '../types'; -import { PriceQueueHelper, RelayerQueueHelper } from '../queue'; +import { HealthQueueHelper, PriceQueueHelper, RelayerQueueHelper } from '../queue'; import { WithdrawalData } from './tx.service'; import { container, injectable } from 'tsyringe'; import { ConfigService } from './config.service'; @injectable() export class JobService { - constructor(private price?: PriceQueueHelper, private relayer?: RelayerQueueHelper, public config?: ConfigService) { + constructor(private price?: PriceQueueHelper, + private relayer?: RelayerQueueHelper, + private health?: HealthQueueHelper, + public config?: ConfigService) { } async postJob(type: RelayerJobType, data: WithdrawalData) { @@ -35,13 +38,20 @@ export class JobService { } private async _clearSchedulerJobs() { - const jobs = await this.price.queue.getJobs(); - await Promise.all(jobs.map(job => job.remove())); + try { + + + const jobs = await Promise.all([this.price.queue.getJobs(), this.health.queue.getJobs()]); + await Promise.all(jobs.flat().map(job => job?.remove())); + } catch (e) { + console.log(e); + } } async setupRepeatableJobs() { await this._clearSchedulerJobs(); await this.price.addRepeatable(this.config.tokens); + await this.health.addRepeatable(); // await this.schedulerQ.add('checkBalance', null, { // repeat: { // every: 30000, diff --git a/src/services/notifier.service.ts b/src/services/notifier.service.ts index 37e2885..43ed455 100644 --- a/src/services/notifier.service.ts +++ b/src/services/notifier.service.ts @@ -1,5 +1,23 @@ import { Telegram } from 'telegraf'; -import { autoInjectable } from 'tsyringe'; +import { autoInjectable, container } from 'tsyringe'; +import { RedisStore } from '../modules/redis'; + +export type Levels = keyof typeof AlertLevel + +export enum AlertLevel { + 'INFO' = 'ℹ️️', + 'WARN' = '⚠️', + 'CRITICAL' = '‼️', + 'ERROR' = '💩', + 'RECOVERED' = '✅' +} + +export enum AlertType { + 'INSUFFICIENT_BALANCE', + 'INSUFFICIENT_TORN_BALANCE', + 'RPC' + +} @autoInjectable() export class NotifierService { @@ -7,16 +25,44 @@ export class NotifierService { private readonly token: string; private readonly chatId: string; - constructor() { + constructor(private store: RedisStore) { this.token = process.env.TELEGRAM_NOTIFIER_BOT_TOKEN || ''; this.chatId = process.env.TELEGRAM_NOTIFIER_CHAT_ID || ''; this.telegram = new Telegram(this.token); + } - send(message: string) { + async processAlert(message: string) { + const alert = JSON.parse(message); + const [a, b] = alert.type.split('_'); + if (alert.level === 'OK') { + this.store.client.srem('alerts:sent', ...['WARN', 'CRITICAL'].map(l => `${a}_${b}_${l}`)); + } else { + await this.send(alert.message, alert.level); + this.store.client.sadd('alerts:sent', alert.type); + } + } + + async subscribe() { + const sub = await this.store.subscriber; + sub.subscribe('__keyspace@0__:alerts', 'rpush'); + sub.on('message', async (channel, event) => { + if (event === 'rpush') { + const messages = await this.store.client.brpop('alerts', 10); + while (messages.length) { + const [, message] = messages.splice(0, 2); + await this.processAlert(message); + } + } + }); + } + + send(message: string, level: Levels) { + const text = `${AlertLevel[level]} ${message}`; + console.log('sending message: ', text); return this.telegram.sendMessage( this.chatId, - message, + text, { parse_mode: 'HTML' }, ); } @@ -32,3 +78,5 @@ export class NotifierService { return this.telegram.getMe(); } } + +export default () => container.resolve(NotifierService); diff --git a/src/services/price.service.ts b/src/services/price.service.ts index 5aebdf6..fb7738b 100644 --- a/src/services/price.service.ts +++ b/src/services/price.service.ts @@ -27,24 +27,28 @@ export class PriceService { } async fetchPrices(tokens: Token[]) { - const names = tokens.reduce((p, c) => { - p[c.address] = c.symbol.toLowerCase(); - return p; - }, {}); - const callData = this.prepareCallData(tokens); - const { results, success } = await this.multiCall.multicall(callData); - const prices: Record = {}; - for (let i = 0; i < results.length; i++) { - if (!success[i]) { - continue; + try { + const names = tokens.reduce((p, c) => { + p[c.address] = c.symbol.toLowerCase(); + return p; + }, {}); + const callData = this.prepareCallData(tokens); + const { results, success } = await this.multiCall.multicall(callData); + const prices: Record = {}; + for (let i = 0; i < results.length; i++) { + if (!success[i]) { + continue; + } + const decodedRate = defaultAbiCoder.decode(['uint256'], results[i]).toString(); + const numerator = BigNumber.from(10).pow(tokens[i].decimals); + const denominator = BigNumber.from(10).pow(18); // eth decimals + const price = BigNumber.from(decodedRate).mul(numerator).div(denominator); + prices[names[tokens[i].address]] = price.toString(); } - const decodedRate = defaultAbiCoder.decode(['uint256'], results[i]).toString(); - const numerator = BigNumber.from(10).pow(tokens[i].decimals); - const denominator = BigNumber.from(10).pow(18); // eth decimals - const price = BigNumber.from(decodedRate).mul(numerator).div(denominator); - prices[names[tokens[i].address]] = price.toString(); + return prices; + } catch (e) { + console.log(e); } - return prices; } async getPrice(currency: string) { diff --git a/src/worker.ts b/src/worker.ts index a4d5a92..63fb800 100644 --- a/src/worker.ts +++ b/src/worker.ts @@ -1,4 +1,5 @@ -import { schedulerWorker, relayerWorker } from './queue/worker'; +import { priceWorker, relayerWorker, healthWorker } from './queue/worker'; -schedulerWorker(); +priceWorker(); relayerWorker(); +healthWorker();