// Externals types import { MarkOptional, DeepRequired } from 'ts-essentials' // External modules import EventEmitter from 'events' import { randomBytes, sign } from 'crypto' import { TransactionRequest } from '@ethersproject/abstract-provider' import { EventFilter, BaseContract, BigNumber, ContractTransaction, providers, Signer, VoidSigner } from 'ethers' // Our local types import { ERC20Tornado, ERC20Tornado__factory, ETHTornado, ETHTornado__factory, TornadoInstance, TornadoInstance__factory, TornadoProxy__factory, TornadoProxy, ERC20, ERC20__factory, RelayerRegistry, RelayerRegistry__factory, Multicall3Contract__factory } from './deth' import { Multicall3 } from './deth/Multicall3Contract' // Local modules import { Onchain, Cache, Docs, Options as DataOptions } from '@tornado/sdk-data' import { ErrorUtils, HexUtils, AsyncUtils } from '@tornado/sdk-utils' // @ts-ignore import { parseIndexableString } from 'pouchdb-collate' // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ DECLARATIONS (MUST BE INLINED) ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ export type TornadoContracts = | TornadoInstance | TornadoProxy | ETHTornado | ERC20Tornado | ERC20 | RelayerRegistry export namespace Options { export interface Sync { startBlock?: number targetBlock?: number blockDelta?: number blockDivisor?: number concurrencyLimit?: number msTimeout?: number listenForEvents?: boolean } export type Cache = DataOptions.Cache } // We use a vanilla provider here, but in reality we will probably // add a censorship-checking custom derivative of it type Provider = providers.Provider // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ REST ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ /** * The Chain class stores Tornado-agnostic chain data and also * handles such interactions. */ export class Chain { public provider: Provider private _emptySigner: VoidSigner private _signer?: Signer private _chainId?: number private _symbol?: string private _fetched: boolean constructor(provider: Provider, signer?: Signer) { this.provider = provider this._signer = signer this._emptySigner = new VoidSigner('0x' + randomBytes(20).toString('hex'), provider) this._fetched = false } connectSigner(signer: Signer): void { this._signer = signer } async fetchChainData(): Promise { const network = await this.provider.getNetwork() this._chainId = network.chainId this._symbol = await Onchain.getNetworkSymbol(String(network.chainId)) this._fetched = true } private _signerConnected(parentCallName: string): void { if (!this._signer) throw ErrorUtils.getError(`Chain.${parentCallName}: signer must be connected!`) } private _propertiesFetched(parentCallName: string): void { if (!this._fetched) throw ErrorUtils.getError( `Chain.${parentCallName}: properties must be fetched first with \`fetchProperties\`.` ) } get signer(): Signer { this._signerConnected('signer') return this._signer! } get id(): number { this._propertiesFetched('id') return this._chainId! } get symbol(): string { this._propertiesFetched('symbol') return this._symbol! } latestBlockNum(): Promise { return this.provider.getBlockNumber() } getAccountBalance(account: string): Promise { return this.provider.getBalance(account) } getGasPrice(): Promise { return this.provider.getGasPrice() } getTokenContract(tokenAddress: string): ERC20 { return Contracts.getToken(tokenAddress, this._signer ?? this.provider) } async getTokenDecimals(token: string): Promise { let treq = { to: token, data: '0x313ce567' } return BigNumber.from(await this._emptySigner.call(treq)) } async getTokenBalance(account: string, token: string, normalized: boolean = false): Promise { let treq = { to: token, data: '0x70a08231000000000000000000000000' + HexUtils.prepareAddress(account) } let divisor = normalized ? BigNumber.from(10).pow(await this.getTokenDecimals(token)) : 1 return BigNumber.from(await this._emptySigner.call(treq)).div(divisor) } async populateBatchCall( callStruct: Array> ): Promise { if (callStruct[0].value) return await Multicall3Contract__factory.connect( await Onchain.getMulticall3Address(String(this.id)), this.provider ).populateTransaction.aggregate3Value(callStruct as Array) return await Multicall3Contract__factory.connect( await Onchain.getMulticall3Address(String(this.id)), this.provider ).populateTransaction.aggregate3(callStruct) } async batchCall( callStruct: Array> ): Promise { if (this._signer) if (callStruct[0].value) return await Multicall3Contract__factory.connect( await Onchain.getMulticall3Address(String(this.id)), this._signer ).aggregate3Value(callStruct as Array) else { return await Multicall3Contract__factory.connect( await Onchain.getMulticall3Address(String(this.id)), this.provider ).aggregate3(callStruct) } else throw ErrorUtils.getError('Chain.batchCall: no signer provided.') } } /** * This is Tornado-specific. */ export namespace Contracts { function _getContract( name: string, address: string, signerOrProvider: Signer | Provider ): C { if (name == 'TornadoInstance') { return TornadoInstance__factory.connect(address, signerOrProvider) as C } else if (name == 'TornadoProxy') { return TornadoProxy__factory.connect(address, signerOrProvider) as C } else if (name == 'ETHTornado') { return ETHTornado__factory.connect(address, signerOrProvider) as C } else if (name == 'ERC20Tornado') { return ERC20Tornado__factory.connect(address, signerOrProvider) as C } else if (name == 'RelayerRegistry') { return RelayerRegistry__factory.connect(address, signerOrProvider) as C } else { return ERC20__factory.connect(address, signerOrProvider) as C } } type Path = string const contractMap: Map = new Map() export function getProxy(network: string, signerOrProvider: Signer | Provider): TornadoProxy { const key = `TornadoProxy${network}` if (!contractMap.has(key)) { contractMap.set( key, _getContract('TornadoProxy', Onchain.getProxyAddressSync(network), signerOrProvider) ) } return contractMap.get(`TornadoProxy${network}`) as TornadoProxy } export function getRegistry(signerOrProvider: Signer | Provider, network?: string): RelayerRegistry { const key = `RelayerRegistry${network}` if (!contractMap.has(key)) contractMap.set( key, _getContract( 'RelayerRegistry', Onchain.getRegistryAddressSync(network), signerOrProvider ) ) return contractMap.get(key) as RelayerRegistry } export function getInstance( network: string, token: string, denomination: string, signerOrProvider: Signer | Provider ): TornadoInstance { const key = `TornadoInstance${network}${token}${denomination}` if (!contractMap.has(key)) { contractMap.set( key, _getContract( 'TornadoInstance', Onchain.getInstanceAddressSync(network, token, denomination), signerOrProvider ) ) } return contractMap.get(key) as TornadoInstance } export function getToken(tokenAddress: string, signerOrProvider: Signer | Provider): ERC20 { if (!contractMap.has(tokenAddress)) contractMap.set(tokenAddress, _getContract('ERC20', tokenAddress, signerOrProvider)) return contractMap.get(tokenAddress) as ERC20 } export function getTornToken(signerOrProvider: Signer | Provider): ERC20 { const key = '$TORN' if (!contractMap.has(key)) { contractMap.set( key, _getContract('ERC20', '0x77777feddddffc19ff86db637967013e6c6a116c', signerOrProvider) ) } return contractMap.get(key) as ERC20 } } export abstract class Synchronizer extends EventEmitter { caches: Map> constructor() { super() this.caches = new Map>() } async sync( eventName: string, filter: EventFilter, contract: BaseContract, cache: Cache.Syncable, options?: Options.Sync ): Promise { const _options = await this._populateSyncOptions(options ?? {}) // Assign pooler cache.initializePooler(cache.getCallbacks(contract), cache.getErrorHandlers(), _options.concurrencyLimit) // Decide whether we have a latest block const numEntries = (await cache.db.info()).doc_count // Check for synced blocks if (0 < numEntries) { const [lastSyncedBlock, ,] = parseIndexableString( (await cache.db.allDocs({ descending: true, limit: 1 })).rows[0].id ) _options.startBlock = lastSyncedBlock < _options.startBlock ? _options.startBlock : lastSyncedBlock _options.blockDelta = _options.blockDivisor ? Math.floor((_options.targetBlock - _options.startBlock) / _options.blockDivisor) : _options.blockDelta if (10_000 < _options.blockDelta) throw ErrorUtils.getError('Synchronizer.sync: blockDelta must not be above 10,000!') } // Start synchronizing let dbPromises = [] this.emit('debug', _options.startBlock, _options.targetBlock, _options.blockDelta) this.emit('sync', 'syncing') for ( let currentBlock = _options.startBlock, blockDelta = _options.blockDelta, targetBlock = _options.targetBlock, concurrencyLimit = _options.concurrencyLimit; currentBlock < targetBlock; currentBlock += blockDelta ) { if (cache.pooler!.pending < concurrencyLimit) { const sum = currentBlock + blockDelta await AsyncUtils.timeout(_options.msTimeout) if (currentBlock + blockDelta < targetBlock) { await cache.pooler!.pool(currentBlock, sum) } else { await cache.pooler!.pool(currentBlock, sum - (sum % targetBlock)) } this.emit('debug', currentBlock++, sum) } else { let res: Array = await cache.pooler!.race() if (res.length != 0) dbPromises.push( cache.db.bulkDocs(res.map((el) => cache.buildDoc(el))).catch((err) => { throw ErrorUtils.ensureError(err) }) ) currentBlock -= blockDelta } } this.emit('sync', 'synced') // Immediately start listening if we're doing this if (_options.listenForEvents) this.listenForEvents(eventName, contract, filter, cache) // Then wait for all pooler requests to resolve let results = await cache.pooler!.all() // Then transform them, we know the shape in forward results = results.reduce((res: any[], response: any[]) => { if (response[0]) response.forEach((el: any) => res.push(cache.buildDoc(el))) return res }, []) // Then wait for old dbPromises to resolve await Promise.all(dbPromises) // Add the last docs await cache.db.bulkDocs(results).catch((err) => { throw ErrorUtils.ensureError(err) }) // Add cache if not present if (!this.caches.has(cache.name)) this.caches.set(cache.name, cache) } loadCache = Cache.Syncable>(name: string): C { return this.caches.get(name) as C } listenForEvents( name: string, contract: BaseContract, filter: EventFilter, cache: Cache.Syncable ) { contract.on(filter, (...eventArgs) => { this.emit(name, cache.name, cache.db.put(cache.buildDoc(eventArgs[eventArgs.length - 1]))) }) } clearListenerByIndex(contract: BaseContract, event: EventFilter, listenerIndex: number = 0): void { const listeners = contract.listeners() contract.off(event, listeners[listenerIndex]) } protected async _populateSyncOptions(options: Options.Sync): Promise> { if (!options.targetBlock) throw ErrorUtils.getError('Synchronizer._populateSyncOptions: targetBlock not set.') if (!options.startBlock) throw ErrorUtils.getError('Synchronizer._populateSyncOptions: startBlock not set.') options.blockDelta = options.blockDelta ?? (options.blockDivisor ? Math.floor((options.targetBlock - options.startBlock) / options.blockDivisor) : 10000) if (10_000 < options.blockDelta) throw ErrorUtils.getError('Synchronizer._populateSyncOptions: blockDelta must not be above 10,000!') options.concurrencyLimit = options.concurrencyLimit ?? 8 options.msTimeout = options.msTimeout ?? 200 // 5 requests per second options.listenForEvents = options.listenForEvents ?? false return options as DeepRequired } } export function syncErrorHandler( err: Error, numResolvedPromises: number, callbackIndex: number, orderIndex: number, ...args: any[] ): void { err = ErrorUtils.ensureError(err) if (err.message.match('context deadline exceeded')) console.error( ErrorUtils.getError( `Context deadline exceeded, stop if more promises do not resolve. Resolved: ${numResolvedPromises}` ) ) else if (err.message.match('Invalid JSON RPC')) console.error( ErrorUtils.getError('Endpoint returned invalid value (we might be rate limited), retrying.') ) else if (err.message.match('logs.forEach')) console.error(ErrorUtils.getError("Unclear type mismatch error, retrying. If this doesn't stop, cancel.")) else { err.message += `\nCallback args supplied: [${args.join(', ')}]\n` throw err } }