first commit

This commit is contained in:
osiris account 2023-03-11 17:12:27 -08:00
commit bb17a2a56e
29 changed files with 1238 additions and 0 deletions

View file

@ -0,0 +1 @@
# -*- encoding: utf-8 -*-

View file

@ -0,0 +1 @@
# -*- encoding: utf-8 -*-

View file

@ -0,0 +1,190 @@
# -*- encoding: utf-8 -*-
# blockchains/ethereum.py
# This class implements a blockchain indexer for Ethereum.
import time
import datetime
from web3 import Web3
from web3.exceptions import BlockNotFound
from web3.providers.rpc import HTTPProvider
from web3._utils.filters import construct_event_filter_params
import src.utils.os_utils as os_utils
from src.utils.arithmetics import wei_to_eth, to_decimal
class TokenIndexer:
def __init__(self, indexing_type = "address"):
self.env_vars = os_utils.load_config()
self.web3 = self._set_web3_object()
if not self._is_connected():
os_utils.exit_with_error('Cannot connect to the node. Exiting.')
# contract parameters
self.contract_address = self.env_vars['TOKEN_CONTRACT']
self.contract_abi = self._set_contract_abi()
self.contract_object = self.web3.eth.contract(abi=self.contract_abi)
self.events = self.contract_object.events.Transfer
# indexing parameters
self.indexing_type = self._set_indexing_type(indexing_type)
self.max_retries = int(self.env_vars['MAX_RETRIES'])
self.retries_timeout = float(self.env_vars['RETRIES_TIMEOUT'])
self.size_chunks_next = float(self.env_vars['SIZE_CHUNK_NEXT'])
# results parameters
self.result_data = {}
self.result_filepath = self._set_result_destination()
###########################################
# Private methods: setters #
###########################################
def _is_connected(self) -> bool:
"""Check if the node is connected to the network."""
return self.web3.isConnected()
def _set_web3_object(self) -> None:
"""Set web3 object from RPC provider."""
rpc_provider = HTTPProvider(self.env_vars['RPC_PROVIDER_URL'])
rpc_provider.middlewares.clear()
return Web3(rpc_provider)
def _set_result_destination(self) -> None:
"""Set result destination."""
this_result_file = os_utils.create_result_file("raw_data")
return os_utils.set_output(this_result_file, self.env_vars)
def _set_contract_abi(self) -> None:
"""Set contract ABI."""
try:
return os_utils.open_json(self.env_vars['TOKEN_CONTRACT_ABI'])
except Exception as e:
os_utils.exit_with_error(f'Cannot parse contract ABI: {e}. Exiting.')
def _set_indexing_type(self, indexing_type: str) -> None:
"""Set filter for indexing."""
if indexing_type == "address":
return {indexing_type: self.contract_address}
else:
os_utils.exit_with_error(f'Indexing type {indexing_type} is not implemented yet. Exiting.')
###########################################
# Private methods: logic #
###########################################
def _get_end_block(self, start_block) -> int:
"""Get the last block to index."""
end_block = self.web3.eth.blockNumber - 1
if start_block > end_block:
os_utils.exit_with_error(f'Cannot start from block {start_block} and end at block {end_block}. Exiting.')
return end_block
def _get_block_timestamp(self, block_number) -> int:
"""Get the timestamp of a given block."""
try:
block_timestamp = self.web3.eth.getBlock(block_number)['timestamp']
return int(datetime.datetime.utcfromtimestamp(block_timestamp))
except (BlockNotFound, ValueError):
return None
def _fetch_events(self, start_block, end_block) -> dict:
"""Fetch events from a range of blocks."""
# https://github.com/ethereum/web3.py/blob/master/web3/_utils/filters.py
_, event_filter = construct_event_filter_params(self.contract_abi,
self.web3.codec,
address=self.contract_address,
argument_filters=self.indexing_type,
fromBlock=start_block,
toBlock=end_block)
filter_logs = self.web3.eth.get_logs(event_filter)
return [self._get_event_data(self.web3.codec, self.contract_abi, event) for event in filter_logs]
def _web3_retry_call(self, start_block, end_block) -> None:
"""Handle eth_getLogs multiple reuests by retrying."""
retry = 0
while retry < self.max_retries - 1:
try:
return end_block, self._fetch_events(start_block, end_block)
except Exception as e:
os_utils.log_error(f'Failed to index events for blocks range {start_block} to {end_block}: {e}')
end_block = start_block + ((end_block - start_block) // 2)
time.sleep(self.retries_timeout)
retry += 1
def _run_indexer_by_chunk(self, start_block, end_block_for_chunk) -> (int, dict):
"""Run the indexer for each chunk."""
this_results = []
this_end_block, events = self._web3_retry_call(start_block, end_block_for_chunk)
for events in events:
transfer = {
"from": events["args"]["from"],
"to": events["args"]["to"],
"value": str(to_decimal(wei_to_eth(events["args"]["to"],))),
}
this_results.append(transfer)
return this_end_block, this_results
def _run_indexer(self, start_block=None, end_block=None) -> None:
# set up the indexer
results = {}
start_block = start_block or 0
end_block = end_block or self._get_end_block(start_block)
# start the indexer loop
while start_block <= end_block:
end_block_for_chunk = int(start_block + self.size_chunks_next)
os_utils.log_info(f'Indexing transfers for blocks: {start_block} - {end_block_for_chunk}')
# scan chunk
this_block_end, this_results = self._run_indexer_by_chunk(start_block, end_block_for_chunk)
# update indexer parameters
results += this_results
start_block = this_block_end + 1
self.result_data = results
###########################
# Public methods #
###########################
def run(self):
"""Run the indexer."""
start_time = time.time()
self._run_indexer()
print(self.result_data)
import sys
sys.exit()
delta_time = time.time() - start_time
os_utils.log_info(f'{len(self.result_data)} transfer events were indexed on {delta_time} seconds.')
os_utils.save_output(self.result_filepath, self.result_data)
os_utils.log_info(f'Results were saved at {self.result_filepath}.')

View file

@ -0,0 +1,90 @@
#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
# src/main.py
# Entry point for ethereum-token-api.
import uvicorn
import argparse
from src.utils.db_utils import populate_db
from src.blockchains.ethereum import TokenIndexer
from src.utils.vercel_utils import upload_to_vercel
from src.utils.test_api import fetch_token_balance as f
from src.utils.data_processing import run_data_processing
def run_menu() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(description='🪙 Token indexer and API.')
parser.add_argument('-e', dest='indexer', action='store_true', \
help="Retrieve historical transfer events data on Ethereum. \
Example: indexer -e")
parser.add_argument('-p', dest='process', nargs=1,
help="Process historical transfer events data. \
Example: indexer -p <json data file>")
parser.add_argument('-d', dest='db', nargs=1,
help="Populate db with processed event data. \
Example: indexer -d <json data file>")
parser.add_argument('-a', dest='api', action='store_true',
help="Run the event scanner api locally. \
Example: indexer -a")
parser.add_argument('-c', dest='vercel', action='store_true',
help="Deploy event scanner to Vercel. \
Example: indexer -c")
parser.add_argument('-b', dest='balance', nargs=1,
help="Fetch token balance for a given wallet. \
Example: indexer -b <wallet address>")
parser.add_argument('-t', dest='top', nargs=1,
help="Fetch top token holders. \
Example: indexer -t <number of holders>")
parser.add_argument('-g', dest='change', nargs=1,
help="Fetch weekly balance change for a given wallet. \
Example: indexer -g <wallet address>")
return parser
def run() -> None:
"""Entry point for this module."""
parser = run_menu()
args = parser.parse_args()
#############################
# Run historical data indexer
#############################
if args.indexer:
indexer = TokenIndexer()
indexer.run()
elif args.process:
run_data_processing(args.process[0])
elif args.db:
populate_db(args.db[0])
#############################
# Run deployment tools
#############################
elif args.api:
uvicorn.run("src.server.api:app", host="0.0.0.0", port=8000, reload=True)
elif args.vercel:
upload_to_vercel()
#############################
# Run api tests
#############################
elif args.balance:
f.fetch_token_balance(args.balance[0])
elif args.top:
f.fetch_top_holders(args.top[0])
elif args.change:
f.fetch_change(args.change[0])
else:
parser.print_help()
if __name__ == "__main__":
run()

View file

@ -0,0 +1 @@
# -*- encoding: utf-8 -*-

View file

@ -0,0 +1,25 @@
from fastapi import FastAPI
from routes import router
app = FastAPI()
app.include_router(router)
from pymongo import MongoClient
url = "mongodb://localhost:27017/"
DB_NAME = "balances"
@app.on_event("startup")
def startup_db_client():
app.mongodb_client = MongoClient(url)
app.database = app.mongodb_client[DB_NAME]
print("Connected to the MongoDB database!")
@app.on_event("shutdown")
def shutdown_db_client():
app.mongodb_client.close()

View file

@ -0,0 +1,52 @@
import motor.motor_asyncio
import json
MONGO_DETAILS = "mongodb://localhost:27017"
#client = motor.motor_asyncio.AsyncIOMotorClient(MONGO_DETAILS)
from pymongo import MongoClient
client = MongoClient(MONGO_DETAILS)
database = client.balances
collection = database.get_collection("balances_fuckkk")
print(database.list_collection_names())
def wallet_helper(item) -> dict:
return {
"wallet": item["wallet"],
}
from bson.objectid import ObjectId
async def retrieve_students():
bals = collection.find()
res = []
counter = 0
for i in bals:
res.append(wallet_helper(i))
if counter > 2:
break
counter += 1
return res
def balancer_helper(item) -> dict:
return {
"wallet": item["wallet"],
"balance": item["balance"],
}
# Retrieve a student with a matching ID
async def retrieve_student(wallet: str) -> dict:
student = collection.find_one({"wallet": wallet})
if student:
return balancer_helper(student)

View file

@ -0,0 +1,27 @@
from typing import Optional
from pydantic import BaseModel, EmailStr, Field
class WalletsSchema(BaseModel):
wallet: float = Field(...)
class Config:
allow_population_by_field_name = True
schema_extra = {
"example": {
"wallet": "balance"
}
}
def ResponseModel(data, message):
return {
"data": [data],
"code": 200,
"message": message,
}
def ErrorResponseModel(error, code, message):
return {"error": error, "code": code, "message": message}

View file

@ -0,0 +1,61 @@
import asyncio
import ethereum as APIEth
from fastapi import APIRouter, Body
from fastapi.encoders import jsonable_encoder
from database import (
retrieve_students,
retrieve_student,
)
from models import (
WalletsSchema,
ResponseModel,
ErrorResponseModel,
)
router = APIRouter()
@router.get("/")
async def get_notes() -> dict:
return {
"message": "server is up and running!"
}
@router.get("/balance/{address}")
async def get_token_balance(address: str) -> dict:
"""Get a token balance for a given address."""
futures = [retrieve_student(address)]
result = await asyncio.gather(*futures)
return {"result": result}
@router.get("/top")
async def get_top_holders() -> dict:
"""Get top holders of a given token."""
futures = [retrieve_students()]
result = await asyncio.gather(*futures)
if result:
return {"top_holders": result}
else:
return {"error": "No holders found"}
@router.get("/weekly/{address}")
async def get_holder_weekly_change(address: str) -> dict:
"""Get weekly change of a given address."""
futures = [APIEth.fetch_weekly_balance_change_by_address(address)]
result = await asyncio.gather(*futures)
print(result)
return {"result": result}

View file

@ -0,0 +1 @@
# -*- encoding: utf-8 -*-

View file

@ -0,0 +1,30 @@
# -*- encoding: utf-8 -*-
# utils/arithmetics.py
# This class implements math methods used by the other classes.
from decimal import Decimal, getcontext
from src.utils.os_utils import log_error
def div(dividend, divisor) -> Decimal:
"""Return higher precision division."""
if divisor == 0:
log_error('Found a zero division error. Returning 0.')
return 0
return to_decimal(dividend) / to_decimal(divisor)
def to_decimal(value, precision=None) -> Decimal:
"""Return Decimal value for higher (defined) precision."""
precision = precision or 22
getcontext().prec = precision
return Decimal(value)
def wei_to_eth(num) -> float:
"""Convert wei to eth."""
return num / float(1000000000000000000)

View file

@ -0,0 +1,36 @@
# -*- encoding: utf-8 -*-
# utils/data_processing.py
# Data processing for token transfers.
import collections
from decimal import Decimal
import src.utils.os_utils as os_utils
def process_balances(filepath) -> list:
"""Return a list of balances for each address."""
data = os_utils.open_json(filepath)
balances = collections.defaultdict(Decimal)
for _, block_data in data.items():
for _, tx_data in block_data.items():
for _, event_data in tx_data.items():
balances[event_data["from"]] -= Decimal(event_data["value"])
balances[event_data["to"]] += Decimal(event_data["value"])
balances = {key: float(value) for key, value in balances.items() if value >= Decimal('0')}
return dict(sorted(balances.items(), key=lambda x: x[1]))
def run_data_processing(filepath) -> None:
"""Run data processing."""
balance_data = process_balances(filepath)
balance_output_file = os_utils.create_result_file("balances")
balance_output_filepath = os_utils.set_output(balance_output_file)
os_utils.log_info(f' Writing balances to {balance_output_filepath}')
os_utils.save_output(balance_output_filepath, balance_data)

View file

@ -0,0 +1,29 @@
# -*- encoding: utf-8 -*-
# utils/furnish_db.py
# Furnish the database with data.
import pymongo
import src.utils.os_utils as os_utils
def run():
url = "mongodb://localhost:27017/"
client = pymongo.MongoClient(url)
db = client.test
database_name = client["balances"]
collection_name = database_name["balances"]
filename = "./balance.json"
data = os_utils.open_json(filename)
result = []
for wallet, balance in data.items():
result.append({"wallet": wallet, "balance": balance})
collection_name.insert_many(result)
def populate_db():
pass

View file

@ -0,0 +1,138 @@
# -*- encoding: utf-8 -*-
# utils/os.py
# This class implements OS/file system util methods used by the other classes.
import os
import sys
import json
import logging
from pathlib import Path
from dotenv import load_dotenv
from datetime import datetime
def set_logging(log_level) -> None:
"""Set logging level according to .env config."""
if log_level == 'info':
logging.basicConfig(level=logging.INFO, format='%(message)s')
elif log_level == 'error':
logging.basicConfig(level=logging.ERROR, format='%(message)s')
elif log_level == 'debug':
logging.basicConfig(level=logging.DEBUG, format='%(message)s')
else:
print(f'Logging level {log_level} is not available. Setting to ERROR')
logging.basicConfig(level=logging.ERROR, format='%(message)s')
def load_config() -> dict:
"""Load and set environment variables."""
env_file = Path('.') / '.env'
if not os.path.isfile(env_file):
exit_with_error('Please create an .env file')
env_vars = {}
load_dotenv(env_file)
try:
env_vars['RPC_PROVIDER_URL'] = os.getenv("RPC_PROVIDER_URL")
env_vars['TOKEN_CONTRACT'] = os.getenv("TOKEN_CONTRACT")
env_vars['TOKEN_CONTRACT_ABI'] = os.getenv("TOKEN_CONTRACT_ABI")
env_vars['MAX_RETRIES'] = os.getenv("MAX_RETRIES")
env_vars['RETRIES_TIMEOUT'] = os.getenv("RETRIES_TIMEOUT")
env_vars['SIZE_CHUNK_NEXT'] = os.getenv("SIZE_CHUNK_NEXT")
env_vars['OUTPUT_DIR'] = os.getenv("OUTPUT_DIR")
set_logging(os.getenv("LOG_LEVEL"))
return env_vars
except KeyError as e:
exit_with_error(f'Cannot extract env variables: {e}. Exiting.')
def log_error(string) -> None:
"""Print STDOUT error using the logging library."""
logging.error('⛔️ %s', string)
def log_info(string) -> None:
"""Print STDOUT info using the logging library."""
logging.info(' %s', string)
def log_debug(string) -> None:
"""Print STDOUT debug using the logging library."""
logging.debug('⚠️ %s', string)
def open_json(filepath) -> dict:
"""Load and parse a file."""
try:
with open(filepath, 'r', encoding='utf-8') as infile:
return json.load(infile)
except (IOError, FileNotFoundError, TypeError) as e:
exit_with_error(f'Failed to parse: "{filepath}": {e}')
def format_path(dir_path, filename) -> str:
"""Format a OS full filepath."""
return os.path.join(dir_path, filename)
def save_output(destination, data, mode="w") -> None:
"""Save data from memory to a destination in disk."""
try:
with open(destination, mode, encoding='utf-8') as outfile:
json.dump(data, outfile, indent=4)
except (IOError, TypeError) as e:
log_error(f'Could not save {destination}: {e}')
def create_dir(result_dir) -> None:
"""Check whether a directory exists and create it if needed."""
try:
if not os.path.isdir(result_dir):
os.mkdir(result_dir)
except OSError as e:
log_error(f'Could not create {result_dir}: {e}')
def set_output(output_file, env_vars=None) -> str:
"""Create an output destination to save solutions."""
if env_vars is None:
env_vars = load_config()
try:
output_dir = env_vars['OUTPUT_DIR']
create_dir(output_dir)
return format_path(output_dir, output_file)
except (TypeError, KeyError) as e:
exit_with_error(f'Could not format output file: {e}')
def exit_with_error(message) -> None:
"""Log an error message and halt the program."""
log_error(message)
sys.exit(1)
def create_result_file(prefix) -> str:
"""Create an output file to save solutions."""
this_time = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
return f'{prefix}_{this_time}.json'

View file

@ -0,0 +1,8 @@
def fetch_token_balance():
pass
def fetch_top_token_holders():
pass
def fetch_change():
pass

View file

@ -0,0 +1,4 @@
# -*- encoding: utf-8 -*-
def upload_to_vercel():
pass