mirror of
https://github.com/autistic-symposium/blockchain-data-engineering-toolkit.git
synced 2025-05-11 11:15:01 -04:00
💾
This commit is contained in:
parent
d0eb510794
commit
95da45b3da
4 changed files with 27 additions and 2 deletions
token-scanner-api
|
@ -5,6 +5,7 @@
|
||||||
RPC_PROVIDER_URL =
|
RPC_PROVIDER_URL =
|
||||||
TOKEN_CONTRACT =
|
TOKEN_CONTRACT =
|
||||||
TOKEN_CONTRACT_ABI =
|
TOKEN_CONTRACT_ABI =
|
||||||
|
TOKEN_DECIMALS =
|
||||||
|
|
||||||
#########################
|
#########################
|
||||||
##### indexing token data
|
##### indexing token data
|
||||||
|
|
File diff suppressed because one or more lines are too long
|
@ -6,6 +6,8 @@ import time
|
||||||
import datetime
|
import datetime
|
||||||
|
|
||||||
from web3 import Web3
|
from web3 import Web3
|
||||||
|
from decimal import Decimal
|
||||||
|
from collections import defaultdict
|
||||||
from web3.exceptions import BlockNotFound
|
from web3.exceptions import BlockNotFound
|
||||||
from web3.providers.rpc import HTTPProvider
|
from web3.providers.rpc import HTTPProvider
|
||||||
|
|
||||||
|
@ -30,6 +32,7 @@ class TokenIndexer:
|
||||||
self.provider_url = self.env_vars['RPC_PROVIDER_URL']
|
self.provider_url = self.env_vars['RPC_PROVIDER_URL']
|
||||||
self.max_retries = int(self.env_vars['MAX_RETRIES'])
|
self.max_retries = int(self.env_vars['MAX_RETRIES'])
|
||||||
self.size_chunks_next = int(self.env_vars['SIZE_CHUNK_NEXT'])
|
self.size_chunks_next = int(self.env_vars['SIZE_CHUNK_NEXT'])
|
||||||
|
self.decimal = self._set_decimal()
|
||||||
|
|
||||||
# results parameters
|
# results parameters
|
||||||
self.result_data = {}
|
self.result_data = {}
|
||||||
|
@ -47,6 +50,10 @@ class TokenIndexer:
|
||||||
rpc_provider.middlewares.clear()
|
rpc_provider.middlewares.clear()
|
||||||
return Web3(rpc_provider)
|
return Web3(rpc_provider)
|
||||||
|
|
||||||
|
def _set_decimal(self) -> None:
|
||||||
|
"""Set token contracts decimal."""
|
||||||
|
|
||||||
|
return Decimal('10') ** Decimal(f'-{self.env_vars["TOKEN_DECIMALS"]}')
|
||||||
|
|
||||||
def _is_connected(self) -> bool:
|
def _is_connected(self) -> bool:
|
||||||
"""Check if the node is connected to the network."""
|
"""Check if the node is connected to the network."""
|
||||||
|
@ -107,6 +114,23 @@ class TokenIndexer:
|
||||||
method = 'eth_blockNumber'
|
method = 'eth_blockNumber'
|
||||||
return convert_hex_to_int(send_rpc_request(self.provider_url, method))
|
return convert_hex_to_int(send_rpc_request(self.provider_url, method))
|
||||||
|
|
||||||
|
def _process_logs(self, logs: list) -> dict:
|
||||||
|
"""Process the logs and return a dictionary with the results."""
|
||||||
|
|
||||||
|
processed_logs = defaultdict()
|
||||||
|
|
||||||
|
try:
|
||||||
|
for log in logs:
|
||||||
|
processed_logs[log['transactionHash']] = {}
|
||||||
|
processed_logs[log['transactionHash']]['blockNumber'] = log['blockNumber']
|
||||||
|
processed_logs[log['transactionHash']]['from'] = '0x' + log['topics'][1][26:]
|
||||||
|
processed_logs[log['transactionHash']]['to'] = '0x' + log['topics'][2][26:]
|
||||||
|
processed_logs[log['transactionHash']]['amount'] = Decimal(convert_hex_to_int(log['data'])) * self.decimal
|
||||||
|
except KeyError as e:
|
||||||
|
print(f'Error processing logs: {e}')
|
||||||
|
|
||||||
|
return processed_logs
|
||||||
|
|
||||||
|
|
||||||
###########################
|
###########################
|
||||||
# Public methods #
|
# Public methods #
|
||||||
|
@ -134,7 +158,7 @@ class TokenIndexer:
|
||||||
except Exception:
|
except Exception:
|
||||||
attempt += 1
|
attempt += 1
|
||||||
|
|
||||||
self.result_data = logs
|
self.result_data = self._process_logs(logs)
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
"""Run the indexer."""
|
"""Run the indexer."""
|
||||||
|
|
|
@ -46,6 +46,7 @@ def load_config() -> dict:
|
||||||
env_vars['MAX_RETRIES'] = os.getenv("MAX_RETRIES")
|
env_vars['MAX_RETRIES'] = os.getenv("MAX_RETRIES")
|
||||||
env_vars['SIZE_CHUNK_NEXT'] = os.getenv("SIZE_CHUNK_NEXT")
|
env_vars['SIZE_CHUNK_NEXT'] = os.getenv("SIZE_CHUNK_NEXT")
|
||||||
env_vars['OUTPUT_DIR'] = os.getenv("OUTPUT_DIR")
|
env_vars['OUTPUT_DIR'] = os.getenv("OUTPUT_DIR")
|
||||||
|
env_vars['TOKEN_DECIMALS'] = os.getenv("TOKEN_DECIMALS")
|
||||||
set_logging(os.getenv("LOG_LEVEL"))
|
set_logging(os.getenv("LOG_LEVEL"))
|
||||||
return env_vars
|
return env_vars
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue