mirror of
https://github.com/autistic-symposium/blockchain-data-engineering-toolkit.git
synced 2025-05-02 14:56:17 -04:00
fix token class
This commit is contained in:
parent
3a73ee5c0f
commit
715d6c95e5
5 changed files with 851933 additions and 114 deletions
|
@ -10,10 +10,8 @@ TOKEN_CONTRACT_ABI =
|
||||||
##### indexing token data
|
##### indexing token data
|
||||||
#########################
|
#########################
|
||||||
|
|
||||||
|
MAX_RETRIES = 4
|
||||||
MAX_RETRIES =
|
SIZE_CHUNK_NEXT = 50000
|
||||||
RETRIES_TIMEOUT =
|
|
||||||
SIZE_CHUNK_NEXT =
|
|
||||||
|
|
||||||
|
|
||||||
#########################
|
#########################
|
||||||
|
|
851837
token-scanner-api/output/raw_data_2023-03-11_18-58-37.json
Normal file
851837
token-scanner-api/output/raw_data_2023-03-11_18-58-37.json
Normal file
File diff suppressed because it is too large
Load diff
|
@ -8,33 +8,28 @@ import datetime
|
||||||
from web3 import Web3
|
from web3 import Web3
|
||||||
from web3.exceptions import BlockNotFound
|
from web3.exceptions import BlockNotFound
|
||||||
from web3.providers.rpc import HTTPProvider
|
from web3.providers.rpc import HTTPProvider
|
||||||
from web3._utils.filters import construct_event_filter_params
|
|
||||||
|
|
||||||
import src.utils.os_utils as os_utils
|
from src.utils.arithmetics import convert_hex_to_int
|
||||||
from src.utils.arithmetics import wei_to_eth, to_decimal
|
from src.utils.os_utils import send_rpc_request, exit_with_error, create_result_file, \
|
||||||
|
set_output, open_json, load_config, save_output, log_info
|
||||||
|
|
||||||
|
|
||||||
class TokenIndexer:
|
class TokenIndexer:
|
||||||
|
|
||||||
def __init__(self, indexing_type = "address"):
|
def __init__(self):
|
||||||
|
|
||||||
self.env_vars = os_utils.load_config()
|
self.env_vars = load_config()
|
||||||
self.web3 = self._set_web3_object()
|
self.web3 = self._set_web3_object()
|
||||||
|
|
||||||
if not self._is_connected():
|
|
||||||
os_utils.exit_with_error('Cannot connect to the node. Exiting.')
|
|
||||||
|
|
||||||
# contract parameters
|
# contract parameters
|
||||||
self.contract_address = self.env_vars['TOKEN_CONTRACT']
|
self.contract_address = self.env_vars['TOKEN_CONTRACT']
|
||||||
self.contract_abi = self._set_contract_abi()
|
self.contract_abi = self._set_contract_abi()
|
||||||
self.contract_object = self.web3.eth.contract(abi=self.contract_abi)
|
self.contract_object = self.web3.eth.contract(abi=self.contract_abi)
|
||||||
self.events = self.contract_object.events.Transfer
|
|
||||||
|
|
||||||
# indexing parameters
|
# indexing parameters
|
||||||
self.indexing_type = self._set_indexing_type(indexing_type)
|
self.provider_url = self.env_vars['RPC_PROVIDER_URL']
|
||||||
self.max_retries = int(self.env_vars['MAX_RETRIES'])
|
self.max_retries = int(self.env_vars['MAX_RETRIES'])
|
||||||
self.retries_timeout = float(self.env_vars['RETRIES_TIMEOUT'])
|
self.size_chunks_next = int(self.env_vars['SIZE_CHUNK_NEXT'])
|
||||||
self.size_chunks_next = float(self.env_vars['SIZE_CHUNK_NEXT'])
|
|
||||||
|
|
||||||
# results parameters
|
# results parameters
|
||||||
self.result_data = {}
|
self.result_data = {}
|
||||||
|
@ -45,56 +40,39 @@ class TokenIndexer:
|
||||||
# Private methods: setters #
|
# Private methods: setters #
|
||||||
###########################################
|
###########################################
|
||||||
|
|
||||||
def _is_connected(self) -> bool:
|
|
||||||
"""Check if the node is connected to the network."""
|
|
||||||
|
|
||||||
return self.web3.isConnected()
|
|
||||||
|
|
||||||
def _set_web3_object(self) -> None:
|
def _set_web3_object(self) -> None:
|
||||||
"""Set web3 object from RPC provider."""
|
"""Set web3 object from RPC provider."""
|
||||||
|
|
||||||
rpc_provider = HTTPProvider(self.env_vars['RPC_PROVIDER_URL'])
|
rpc_provider = HTTPProvider(self.env_vars['RPC_PROVIDER_URL'])
|
||||||
rpc_provider.middlewares.clear()
|
rpc_provider.middlewares.clear()
|
||||||
return Web3(rpc_provider)
|
return Web3(rpc_provider)
|
||||||
|
|
||||||
def _set_result_destination(self) -> None:
|
|
||||||
"""Set result destination."""
|
|
||||||
|
|
||||||
this_result_file = os_utils.create_result_file("raw_data")
|
|
||||||
return os_utils.set_output(this_result_file, self.env_vars)
|
|
||||||
|
|
||||||
|
|
||||||
|
def _is_connected(self) -> bool:
    """Check if the node is connected to the network.

    Asks the web3 object whether its provider is reachable and reports a
    fatal error through ``exit_with_error`` when it is not.

    Returns:
        The connection status reported by ``self.web3.isConnected()``.
    """

    connected = self.web3.isConnected()
    if not connected:
        # NOTE(review): exit_with_error presumably terminates the process;
        # the status is still returned in case it does not.
        exit_with_error('Cannot connect to the node. Exiting.')

    # The signature promises a bool; the original fell off the end and
    # returned None.
    return connected
|
||||||
|
|
||||||
def _set_contract_abi(self) -> None:
|
def _set_contract_abi(self) -> None:
|
||||||
"""Set contract ABI."""
|
"""Set contract ABI."""
|
||||||
|
|
||||||
try:
|
try:
|
||||||
return os_utils.open_json(self.env_vars['TOKEN_CONTRACT_ABI'])
|
return open_json(self.env_vars['TOKEN_CONTRACT_ABI'])
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
os_utils.exit_with_error(f'Cannot parse contract ABI: {e}. Exiting.')
|
exit_with_error(f'Cannot parse contract ABI: {e}. Exiting.')
|
||||||
|
|
||||||
def _set_indexing_type(self, indexing_type: str) -> None:
|
def _set_result_destination(self) -> None:
|
||||||
"""Set filter for indexing."""
|
"""Set result destination."""
|
||||||
|
|
||||||
if indexing_type == "address":
|
this_result_file = create_result_file("raw_data")
|
||||||
return {indexing_type: self.contract_address}
|
return set_output(this_result_file, self.env_vars)
|
||||||
|
|
||||||
else:
|
|
||||||
os_utils.exit_with_error(f'Indexing type {indexing_type} is not implemented yet. Exiting.')
|
|
||||||
|
|
||||||
|
|
||||||
###########################################
|
###########################################
|
||||||
# Private methods: logic #
|
# Private methods: logic #
|
||||||
###########################################
|
###########################################
|
||||||
|
|
||||||
def _get_end_block(self, start_block) -> int:
|
|
||||||
"""Get the last block to index."""
|
|
||||||
|
|
||||||
end_block = self.web3.eth.blockNumber - 1
|
|
||||||
|
|
||||||
if start_block > end_block:
|
|
||||||
os_utils.exit_with_error(f'Cannot start from block {start_block} and end at block {end_block}. Exiting.')
|
|
||||||
|
|
||||||
return end_block
|
|
||||||
|
|
||||||
def _get_block_timestamp(self, block_number) -> int:
|
def _get_block_timestamp(self, block_number) -> int:
|
||||||
"""Get the timestamp of a given block."""
|
"""Get the timestamp of a given block."""
|
||||||
|
|
||||||
|
@ -104,87 +82,68 @@ class TokenIndexer:
|
||||||
except (BlockNotFound, ValueError):
|
except (BlockNotFound, ValueError):
|
||||||
return None
|
return None
|
||||||
|
|
||||||
def _fetch_events(self, start_block, end_block) -> dict:
|
def _get_logs(self, from_block: int, to_block: int) -> list:
|
||||||
"""Fetch events from a range of blocks."""
|
"""Get logs from a given address between two blocks,"""
|
||||||
|
|
||||||
# https://github.com/ethereum/web3.py/blob/master/web3/_utils/filters.py
|
# keccak256('Transfer(address,address,uint256)')
|
||||||
_, event_filter = construct_event_filter_params(self.contract_abi,
|
topic = '0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef'
|
||||||
self.web3.codec,
|
|
||||||
address=self.contract_address,
|
|
||||||
argument_filters=self.indexing_type,
|
|
||||||
fromBlock=start_block,
|
|
||||||
toBlock=end_block)
|
|
||||||
filter_logs = self.web3.eth.get_logs(event_filter)
|
|
||||||
return [self._get_event_data(self.web3.codec, self.contract_abi, event) for event in filter_logs]
|
|
||||||
|
|
||||||
def _web3_retry_call(self, start_block, end_block) -> None:
|
# https://docs.infura.io/infura/networks/ethereum/json-rpc-methods/eth_getlogs
|
||||||
"""Handle eth_getLogs multiple reuests by retrying."""
|
method = 'eth_getLogs'
|
||||||
|
|
||||||
retry = 0
|
|
||||||
while retry < self.max_retries - 1:
|
|
||||||
try:
|
|
||||||
return end_block, self._fetch_events(start_block, end_block)
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
os_utils.log_error(f'Failed to index events for blocks range {start_block} to {end_block}: {e}')
|
|
||||||
end_block = start_block + ((end_block - start_block) // 2)
|
|
||||||
time.sleep(self.retries_timeout)
|
|
||||||
retry += 1
|
|
||||||
|
|
||||||
def _run_indexer_by_chunk(self, start_block, end_block_for_chunk) -> (int, dict):
|
|
||||||
"""Run the indexer for each chunk."""
|
|
||||||
|
|
||||||
this_results = []
|
|
||||||
this_end_block, events = self._web3_retry_call(start_block, end_block_for_chunk)
|
|
||||||
|
|
||||||
for events in events:
|
|
||||||
transfer = {
|
|
||||||
"from": events["args"]["from"],
|
|
||||||
"to": events["args"]["to"],
|
|
||||||
"value": str(to_decimal(wei_to_eth(events["args"]["to"],))),
|
|
||||||
}
|
|
||||||
this_results.append(transfer)
|
|
||||||
|
|
||||||
return this_end_block, this_results
|
return send_rpc_request(self.provider_url, method,
|
||||||
|
[{'address': self.contract_address,
|
||||||
def _run_indexer(self, start_block=None, end_block=None) -> None:
|
'fromBlock': from_block,
|
||||||
|
'toBlock': to_block,
|
||||||
|
'topics': [topic]
|
||||||
|
}])
|
||||||
|
|
||||||
# set up the indexer
|
def _get_last_block_number(self) -> int:
|
||||||
results = {}
|
"""
|
||||||
start_block = start_block or 0
|
Get the last block number with the eth_blockNumber method.
|
||||||
end_block = end_block or self._get_end_block(start_block)
|
https://docs.infura.io/infura/networks/ethereum/json-rpc-methods/eth_blocknumber
|
||||||
|
"""
|
||||||
# start the indexer loop
|
|
||||||
while start_block <= end_block:
|
method = 'eth_blockNumber'
|
||||||
|
return convert_hex_to_int(send_rpc_request(self.provider_url, method))
|
||||||
end_block_for_chunk = int(start_block + self.size_chunks_next)
|
|
||||||
os_utils.log_info(f'Indexing transfers for blocks: {start_block} - {end_block_for_chunk}')
|
|
||||||
|
|
||||||
# scan chunk
|
|
||||||
this_block_end, this_results = self._run_indexer_by_chunk(start_block, end_block_for_chunk)
|
|
||||||
|
|
||||||
# update indexer parameters
|
|
||||||
results += this_results
|
|
||||||
start_block = this_block_end + 1
|
|
||||||
|
|
||||||
self.result_data = results
|
|
||||||
|
|
||||||
|
|
||||||
###########################
|
###########################
|
||||||
# Public methods #
|
# Public methods #
|
||||||
###########################
|
###########################
|
||||||
|
|
||||||
|
def get_transfer_logs_chunks(self, from_block=None, to_block=None) -> list:
    """Get transfer logs from a given address between two blocks by small chunks.

    The block range is split into consecutive, non-overlapping chunks of
    ``self.size_chunks_next`` blocks. Each chunk is requested through
    ``self._get_logs`` and retried up to ``self.max_retries`` times when the
    request raises.

    Args:
        from_block: first block to scan; falls back to 1 when not given.
        to_block: last block to scan (inclusive); falls back to the value
            returned by ``self._get_last_block_number()`` when not given.

    Returns:
        The list of collected logs, which is also stored in
        ``self.result_data``.
    """

    logs = []
    from_block = from_block or 1
    to_block = to_block or self._get_last_block_number()
    chunk_size = self.size_chunks_next

    log_info(f'Indexing transfer events between blocks {from_block} and {to_block}...')

    # `to_block + 1` so that the final block is scanned even when it falls
    # exactly on a chunk start.
    for block in range(from_block, to_block + 1, chunk_size):
        # eth_getLogs treats both bounds as inclusive: end one block before
        # the next chunk starts (otherwise boundary blocks are fetched twice)
        # and never run past the requested range.
        last_block = min(block + chunk_size - 1, to_block)

        for attempt in range(1, self.max_retries + 1):
            print(f'loading blocks {block} to {last_block}')
            try:
                this_logs = self._get_logs(hex(block), hex(last_block))
            except Exception as e:
                # Best-effort retry, but leave a trace of what went wrong.
                log_info(f'Attempt {attempt} of {self.max_retries} failed for '
                         f'blocks {block} to {last_block}: {e}')
                continue

            if this_logs:
                log_info(f'Found {len(this_logs)} transfer events between blocks {block} and {last_block}.')
                logs += this_logs
            break
        else:
            # Every attempt raised: make the gap in the results visible.
            log_info(f'Skipping blocks {block} to {last_block} after '
                     f'{self.max_retries} failed attempts.')

    self.result_data = logs
    return logs
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
"""Run the indexer."""
|
"""Run the indexer."""
|
||||||
|
|
||||||
start_time = time.time()
|
start_time = time.time()
|
||||||
self._run_indexer()
|
self.get_transfer_logs_chunks()
|
||||||
|
|
||||||
print(self.result_data)
|
|
||||||
import sys
|
|
||||||
sys.exit()
|
|
||||||
delta_time = time.time() - start_time
|
delta_time = time.time() - start_time
|
||||||
os_utils.log_info(f'{len(self.result_data)} transfer events were indexed on {delta_time} seconds.')
|
log_info(f'{len(self.result_data)} transfer events were indexed on {delta_time} seconds.')
|
||||||
|
|
||||||
os_utils.save_output(self.result_filepath, self.result_data)
|
save_output(self.result_filepath, self.result_data)
|
||||||
os_utils.log_info(f'Results were saved at {self.result_filepath}.')
|
log_info(f'Results were saved at {self.result_filepath}.')
|
||||||
|
|
|
@ -28,3 +28,9 @@ def wei_to_eth(num) -> float:
|
||||||
"""Convert wei to eth."""
|
"""Convert wei to eth."""
|
||||||
|
|
||||||
return num / float(1000000000000000000)
|
return num / float(1000000000000000000)
|
||||||
|
|
||||||
|
|
||||||
|
def convert_hex_to_int(hex_string: str) -> int:
    """Convert a hex string to an integer.

    Args:
        hex_string: hexadecimal digits, with or without a ``0x`` prefix.

    Returns:
        The integer value of ``hex_string`` interpreted in base 16.

    Raises:
        ValueError: if ``hex_string`` is not valid hexadecimal.
        TypeError: if ``hex_string`` is not a string (or bytes).
    """

    return int(hex_string, 16)
|
||||||
|
|
|
@ -6,6 +6,7 @@ import os
|
||||||
import sys
|
import sys
|
||||||
import json
|
import json
|
||||||
import logging
|
import logging
|
||||||
|
import requests
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from dotenv import load_dotenv
|
from dotenv import load_dotenv
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
|
@ -43,7 +44,6 @@ def load_config() -> dict:
|
||||||
env_vars['TOKEN_CONTRACT'] = os.getenv("TOKEN_CONTRACT")
|
env_vars['TOKEN_CONTRACT'] = os.getenv("TOKEN_CONTRACT")
|
||||||
env_vars['TOKEN_CONTRACT_ABI'] = os.getenv("TOKEN_CONTRACT_ABI")
|
env_vars['TOKEN_CONTRACT_ABI'] = os.getenv("TOKEN_CONTRACT_ABI")
|
||||||
env_vars['MAX_RETRIES'] = os.getenv("MAX_RETRIES")
|
env_vars['MAX_RETRIES'] = os.getenv("MAX_RETRIES")
|
||||||
env_vars['RETRIES_TIMEOUT'] = os.getenv("RETRIES_TIMEOUT")
|
|
||||||
env_vars['SIZE_CHUNK_NEXT'] = os.getenv("SIZE_CHUNK_NEXT")
|
env_vars['SIZE_CHUNK_NEXT'] = os.getenv("SIZE_CHUNK_NEXT")
|
||||||
env_vars['OUTPUT_DIR'] = os.getenv("OUTPUT_DIR")
|
env_vars['OUTPUT_DIR'] = os.getenv("OUTPUT_DIR")
|
||||||
set_logging(os.getenv("LOG_LEVEL"))
|
set_logging(os.getenv("LOG_LEVEL"))
|
||||||
|
@ -136,3 +136,22 @@ def create_result_file(prefix) -> str:
|
||||||
|
|
||||||
this_time = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
|
this_time = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
|
||||||
return f'{prefix}_{this_time}.json'
|
return f'{prefix}_{this_time}.json'
|
||||||
|
|
||||||
|
|
||||||
|
def send_rpc_request(url, method, params=None, timeout=30) -> dict:
    """Send a JSON-RPC request to a given URL.

    Args:
        url: HTTP(S) endpoint the request is posted to.
        method: JSON-RPC method name (e.g. ``eth_getLogs``).
        params: list of parameters for the method; an empty list is sent
            when omitted.
        timeout: seconds to wait for the server before giving up, so a
            stalled connection cannot hang the caller forever.

    Returns:
        The ``result`` member of the JSON-RPC response, or an empty dict
        when the request fails or the response carries no result. Failures
        are reported through ``log_error`` rather than raised.
    """

    data = {'jsonrpc': '2.0', 'method': method, 'params': params or [], 'id': 1}

    try:
        response = requests.post(url,
                                 headers={'Content-Type': 'application/json'},
                                 json=data,
                                 timeout=timeout)
        # requests only raises HTTPError for 4xx/5xx when asked to.
        response.raise_for_status()
        # Parse the body once instead of on every access.
        body = response.json()

    except requests.exceptions.HTTPError as e:
        log_error('Error querying to {0}: {1}'.format(url, e.response.text))
        return {}

    except (requests.exceptions.RequestException, ValueError) as e:
        # Connection problems, timeouts and bodies that are not valid JSON.
        log_error('Error querying to {0}: {1}'.format(url, e))
        return {}

    if isinstance(body, dict) and 'result' in body:
        return body['result']

    # Report whatever error information is available without assuming the
    # response has an 'error' member.
    error = body.get('error') if isinstance(body, dict) else body
    log_error('Query failed: {}.'.format(error))
    return {}
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue