event retry & err conditioning
This commit is contained in:
parent
90ef7362bf
commit
064c481c02
@ -292,6 +292,8 @@ class EventService {
|
||||
i !== 5,
|
||||
i
|
||||
)
|
||||
} else {
|
||||
throw new Error(`Failed to fetch block ${toBlock}`)
|
||||
}
|
||||
}
|
||||
|
||||
@ -310,23 +312,16 @@ class EventService {
|
||||
}
|
||||
}
|
||||
|
||||
createBatchRequest({ batchIndex, batchSize, batchBlocks, blockDenom, type }) {
|
||||
return new Array(batchSize).fill('').map(
|
||||
(_, i) =>
|
||||
createBatchRequest(batchArray) {
|
||||
return batchArray.map(
|
||||
(e, i) =>
|
||||
new Promise(async (resolve) => {
|
||||
const toBlock = batchBlocks[batchIndex * batchSize + i]
|
||||
const fromBlock = toBlock - blockDenom
|
||||
|
||||
const batchEvents = await this.getEventsPartFromRpc(
|
||||
{
|
||||
fromBlock,
|
||||
toBlock,
|
||||
type
|
||||
},
|
||||
true
|
||||
)
|
||||
|
||||
resolve(batchEvents.events)
|
||||
try {
|
||||
const { events } = await this.getEventsPartFromRpc({ ...e }, true)
|
||||
resolve(events)
|
||||
} catch (e) {
|
||||
resolve({ isFailedBatch: true, ...e })
|
||||
}
|
||||
})
|
||||
)
|
||||
}
|
||||
@ -335,30 +330,47 @@ class EventService {
|
||||
try {
|
||||
const batchSize = 10
|
||||
const blockRange = 10000
|
||||
const { blockDifference, currentBlockNumber } = await this.getBlocksDiff({ fromBlock })
|
||||
|
||||
let [events, failed] = [[], []]
|
||||
let lastBlock = fromBlock
|
||||
|
||||
const { blockDifference, currentBlockNumber } = await this.getBlocksDiff({ fromBlock })
|
||||
const batchDigest = blockDifference === 0 ? 1 : Math.ceil(blockDifference / blockRange)
|
||||
|
||||
const blockDenom = Math.ceil(blockDifference / batchDigest)
|
||||
const batchCount = Math.ceil(batchDigest / batchSize)
|
||||
|
||||
const blocks = new Array(batchCount * batchSize).fill('')
|
||||
const batchBlocks = blocks.map((_, i) => (i + 1) * blockDenom + fromBlock)
|
||||
|
||||
let events = []
|
||||
|
||||
if (fromBlock < currentBlockNumber) {
|
||||
this.updateEventProgress(0, type)
|
||||
await this.updateEventProgress(0, type)
|
||||
|
||||
for (let batchIndex = 0; batchIndex < batchCount; batchIndex++) {
|
||||
const batch = await Promise.all(
|
||||
this.createBatchRequest({ batchIndex, batchBlocks, blockDenom, batchSize, type })
|
||||
)
|
||||
const isLastBatch = batchIndex === batchCount - 1
|
||||
const params = new Array(batchSize).fill('').map((_, i) => {
|
||||
const toBlock = (i + 1) * blockDenom + lastBlock
|
||||
const fromBlock = toBlock - blockDenom
|
||||
return { fromBlock, toBlock, type }
|
||||
})
|
||||
const batch = await Promise.all(this.createBatchRequest(params))
|
||||
|
||||
this.updateEventProgress(batchIndex / batchCount, type)
|
||||
events = events.concat(batch)
|
||||
lastBlock = params[batchSize - 1].toBlock
|
||||
events = events.concat(batch.filter((e) => !e.isFailedBatch))
|
||||
failed = failed.concat(batch.filter((e) => e.isFailedBatch))
|
||||
|
||||
const progressIndex = batchIndex - failed.length / batchSize
|
||||
|
||||
if (isLastBatch && failed.length !== 0) {
|
||||
const fbatch = await Promise.all(this.createBatchRequest(failed))
|
||||
const isFailedBatch = fbatch.filter((e) => e.isFailedBatch).length !== 0
|
||||
|
||||
if (isFailedBatch) {
|
||||
throw new Error('Failed to batch events')
|
||||
} else {
|
||||
events = events.concat(fbatch)
|
||||
}
|
||||
}
|
||||
await this.updateEventProgress(progressIndex / batchCount, type)
|
||||
await sleep(200)
|
||||
}
|
||||
|
||||
events = flattenNArray(events)
|
||||
|
||||
return {
|
||||
|
Loading…
x
Reference in New Issue
Block a user