mirror of
https://github.com/autistic-symposium/blockchain-data-engineering-toolkit.git
synced 2025-04-25 10:19:13 -04:00
add db methods
This commit is contained in:
parent
817e5dfcec
commit
678f09fb33
@ -22,3 +22,13 @@ SIZE_CHUNK_NEXT = 5000
|
||||
LOG_LEVEL=info
|
||||
OUTPUT_DIR = ./output
|
||||
|
||||
|
||||
|
||||
#########################
|
||||
##### mongodb settings
|
||||
#########################
|
||||
MONGDB_URI = mongodb://localhost:27017
|
||||
MONGODB_DB_NAME = balances
|
||||
MONGODB_COLLECTION_NAME = balances
|
||||
|
||||
|
||||
|
@ -7,17 +7,21 @@ base58==2.1.1
|
||||
bitarray==2.7.3
|
||||
certifi==2022.12.7
|
||||
charset-normalizer==3.1.0
|
||||
click==8.1.3
|
||||
click==8.0.4
|
||||
colorama==0.4.6
|
||||
cytoolz==0.12.1
|
||||
dnspython==2.3.0
|
||||
eth-abi==2.2.0
|
||||
eth-account==0.5.9
|
||||
eth-hash==0.5.1
|
||||
eth-hash==0.3.3
|
||||
eth-keyfile==0.5.1
|
||||
eth-keys==0.3.4
|
||||
eth-rlp==0.2.1
|
||||
eth-typing==2.3.0
|
||||
eth-utils==1.9.5
|
||||
eth-utils==1.10.0
|
||||
ethereum-dasm==0.1.4
|
||||
ethereum-etl==2.1.2
|
||||
evmdasm==0.1.10
|
||||
fastapi==0.94.0
|
||||
frozenlist==1.3.3
|
||||
h11==0.14.0
|
||||
@ -47,6 +51,7 @@ simplejson==3.18.3
|
||||
six==1.16.0
|
||||
sniffio==1.3.0
|
||||
starlette==0.26.0.post1
|
||||
tabulate==0.9.0
|
||||
toolz==0.12.0
|
||||
tqdm==4.65.0
|
||||
typing_extensions==4.5.0
|
||||
|
@ -85,7 +85,7 @@ class TokenIndexer:
|
||||
|
||||
try:
|
||||
block_timestamp = self.web3.eth.getBlock(block_number)['timestamp']
|
||||
return int(datetime.datetime.utcfromtimestamp(block_timestamp))
|
||||
return datetime.datetime.utcfromtimestamp(block_timestamp)
|
||||
except (BlockNotFound, ValueError):
|
||||
return None
|
||||
|
||||
@ -122,11 +122,15 @@ class TokenIndexer:
|
||||
|
||||
try:
|
||||
for log in logs:
|
||||
print(log)
|
||||
import sys
|
||||
sys.exit()
|
||||
processed_logs[log['transactionHash']] = {}
|
||||
processed_logs[log['transactionHash']]['blockNumber'] = convert_hex_to_int(log['blockNumber'])
|
||||
processed_logs[log['transactionHash']]['from'] = '0x' + log['topics'][1][26:]
|
||||
processed_logs[log['transactionHash']]['to'] = '0x' + log['topics'][2][26:]
|
||||
processed_logs[log['transactionHash']]['amount'] = float(Decimal(convert_hex_to_int(log['data'])) * self.decimal)
|
||||
processed_logs[log['transactionHash']]['timestamp'] = self._get_block_timestamp(convert_hex_to_int(log['blockNumber']))
|
||||
except KeyError as e:
|
||||
print(f'Error processing logs: {e}')
|
||||
|
||||
|
@ -6,7 +6,8 @@
|
||||
import uvicorn
|
||||
import argparse
|
||||
|
||||
from src.utils.db_utils import populate_db
|
||||
from src.utils.os_utils import load_config
|
||||
from src.utils.db_processing import populate_db
|
||||
from src.blockchains.ethereum import TokenIndexer
|
||||
from src.utils.vercel_utils import upload_to_vercel
|
||||
from src.utils.test_api import fetch_token_balance as f
|
||||
@ -24,7 +25,7 @@ def run_menu() -> argparse.ArgumentParser:
|
||||
help="Process historical transfer events data. \
|
||||
Example: indexer -p <json data file>")
|
||||
parser.add_argument('-d', dest='db', nargs=1,
|
||||
help="Populate db with processed event data. \
|
||||
help="Populate local db with processed event data. \
|
||||
Example: indexer -d <json data file>")
|
||||
|
||||
parser.add_argument('-a', dest='api', action='store_true',
|
||||
@ -49,6 +50,7 @@ def run_menu() -> argparse.ArgumentParser:
|
||||
def run() -> None:
|
||||
"""Entry point for this module."""
|
||||
|
||||
load_config()
|
||||
parser = run_menu()
|
||||
args = parser.parse_args()
|
||||
|
||||
|
@ -1,25 +1,30 @@
|
||||
# -*- encoding: utf-8 -*-
|
||||
# server/api.py
|
||||
# This class implements the API server.
|
||||
|
||||
from fastapi import FastAPI
|
||||
from routes import router
|
||||
|
||||
|
||||
app = FastAPI()
|
||||
|
||||
app.include_router(router)
|
||||
|
||||
|
||||
from pymongo import MongoClient
|
||||
|
||||
from src.utils import os_utils
|
||||
from src.server.routes import router
|
||||
|
||||
url = "mongodb://localhost:27017/"
|
||||
DB_NAME = "balances"
|
||||
|
||||
@app.on_event("startup")
|
||||
def startup_db_client():
|
||||
app.mongodb_client = MongoClient(url)
|
||||
app.database = app.mongodb_client[DB_NAME]
|
||||
def create_app():
|
||||
"""Create the FastAPI app."""
|
||||
|
||||
print("Connected to the MongoDB database!")
|
||||
app = FastAPI()
|
||||
app.include_router(router)
|
||||
|
||||
@app.on_event("shutdown")
|
||||
def shutdown_db_client():
|
||||
app.mongodb_client.close()
|
||||
env_vars = os_utils.load_config()
|
||||
url = env_vars['MONGODB_URL']
|
||||
db_name = env_vars['MONGODB_DB_NAME']
|
||||
|
||||
@app.on_event("startup")
|
||||
def startup_db_client():
|
||||
app.mongodb_client = MongoClient(url)
|
||||
app.database = app.mongodb_client[db_name]
|
||||
os_utils.log_info("Connected to the MongoDB database!")
|
||||
|
||||
@app.on_event("shutdown")
|
||||
def shutdown_db_client():
|
||||
app.mongodb_client.close()
|
54
token-scanner-api/src/utils/db_processing.py
Normal file
54
token-scanner-api/src/utils/db_processing.py
Normal file
@ -0,0 +1,54 @@
|
||||
# -*- encoding: utf-8 -*-
|
||||
# utils/db_processing.py
|
||||
# Furnish the database with data. This should be run once.
|
||||
|
||||
import pymongo
|
||||
import src.utils.os_utils as os_utils
|
||||
|
||||
|
||||
def format_and_load_data(filepath):
|
||||
"""Load and parse data prior being ingested into the database."""
|
||||
|
||||
data = os_utils.open_json(filepath)
|
||||
result = []
|
||||
|
||||
for wallet, balance in data.items():
|
||||
result.append({"wallet": wallet, "balance": balance})
|
||||
|
||||
return result
|
||||
|
||||
|
||||
def populate_db(filepath):
|
||||
|
||||
#################################
|
||||
# Connect to database via client
|
||||
#################################
|
||||
env_vars = os_utils.load_config()
|
||||
url = env_vars['MONGODB_URL']
|
||||
db_name = env_vars['MONGODB_DB_NAME']
|
||||
collection_name = env_vars['MONGODB_COLLECTION_NAME']
|
||||
|
||||
client = pymongo.MongoClient(url)
|
||||
|
||||
with client:
|
||||
##############################
|
||||
# Create database for balances
|
||||
##############################
|
||||
database = client[db_name]
|
||||
|
||||
################################
|
||||
# Create collection for balances
|
||||
# (make sure it's empty first)
|
||||
################################
|
||||
wallet_collection = database[collection_name]
|
||||
wallet_collection.drop()
|
||||
|
||||
##############################
|
||||
# Load wallet balances into db
|
||||
##############################
|
||||
data = format_and_load_data(filepath)
|
||||
wallet_collection.insert_many(data)
|
||||
|
||||
os_utils.log_info(f'Inserted {len(data)} records into database.')
|
||||
os_utils.log_info(f'Number of records in database: {wallet_collection.count_documents({})}')
|
||||
os_utils.log_info(f'Example of record: {wallet_collection.find_one()}')
|
@ -1,29 +0,0 @@
|
||||
# -*- encoding: utf-8 -*-
|
||||
# utils/furnish_db.py
|
||||
# Furnish the database with data.
|
||||
|
||||
import pymongo
|
||||
import src.utils.os_utils as os_utils
|
||||
|
||||
def run():
|
||||
|
||||
url = "mongodb://localhost:27017/"
|
||||
|
||||
client = pymongo.MongoClient(url)
|
||||
db = client.test
|
||||
database_name = client["balances"]
|
||||
collection_name = database_name["balances"]
|
||||
|
||||
|
||||
filename = "./balance.json"
|
||||
data = os_utils.open_json(filename)
|
||||
|
||||
|
||||
result = []
|
||||
for wallet, balance in data.items():
|
||||
result.append({"wallet": wallet, "balance": balance})
|
||||
|
||||
collection_name.insert_many(result)
|
||||
|
||||
def populate_db():
|
||||
pass
|
@ -47,6 +47,9 @@ def load_config() -> dict:
|
||||
env_vars['SIZE_CHUNK_NEXT'] = os.getenv("SIZE_CHUNK_NEXT")
|
||||
env_vars['OUTPUT_DIR'] = os.getenv("OUTPUT_DIR")
|
||||
env_vars['TOKEN_DECIMALS'] = os.getenv("TOKEN_DECIMALS")
|
||||
env_vars['MONGODB_URI'] = os.getenv("MONGODB_URI")
|
||||
env_vars['MONGODB_DB_NAME'] = os.getenv("MONGODB_DB_NAME")
|
||||
env_vars['MONGODB_COLLECTION_NAME'] = os.getenv("MONGODB_COLLECTION_NAME")
|
||||
set_logging(os.getenv("LOG_LEVEL"))
|
||||
return env_vars
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user