mirror of
https://github.com/sys-nyx/red-arch.git
synced 2025-05-06 08:45:31 -04:00
global commit to catch any untracked changes so far
This commit is contained in:
parent
32ea2550be
commit
5a1a78f682
3 changed files with 82 additions and 0 deletions
Binary file not shown.
Before Width: | Height: | Size: 107 KiB |
Binary file not shown.
Before Width: | Height: | Size: 163 KiB |
82
watchful.py
Normal file
82
watchful.py
Normal file
|
@ -0,0 +1,82 @@
|
||||||
|
import zstandard
|
||||||
|
import os
|
||||||
|
import json
|
||||||
|
import sys
|
||||||
|
from datetime import datetime
|
||||||
|
import logging.handlers
|
||||||
|
|
||||||
|
|
||||||
|
log = logging.getLogger("bot")
|
||||||
|
log.setLevel(logging.DEBUG)
|
||||||
|
log.addHandler(logging.StreamHandler())
|
||||||
|
|
||||||
|
|
||||||
|
def read_and_decode(reader, chunk_size, max_window_size, previous_chunk=None, bytes_read=0):
|
||||||
|
chunk = reader.read(chunk_size)
|
||||||
|
bytes_read += chunk_size
|
||||||
|
if previous_chunk is not None:
|
||||||
|
chunk = previous_chunk + chunk
|
||||||
|
try:
|
||||||
|
return chunk.decode()
|
||||||
|
except UnicodeDecodeError:
|
||||||
|
if bytes_read > max_window_size:
|
||||||
|
raise UnicodeError(f"Unable to decode frame after reading {bytes_read:,} bytes")
|
||||||
|
log.info(f"Decoding error with {bytes_read:,} bytes, reading another chunk")
|
||||||
|
return read_and_decode(reader, chunk_size, max_window_size, chunk, bytes_read)
|
||||||
|
|
||||||
|
|
||||||
|
def read_lines_zst(file_name):
|
||||||
|
with open(file_name, 'rb') as file_handle:
|
||||||
|
buffer = ''
|
||||||
|
reader = zstandard.ZstdDecompressor(max_window_size=2**31).stream_reader(file_handle)
|
||||||
|
while True:
|
||||||
|
chunk = read_and_decode(reader, 2**27, (2**29) * 2)
|
||||||
|
|
||||||
|
if not chunk:
|
||||||
|
break
|
||||||
|
lines = (buffer + chunk).split("\n")
|
||||||
|
|
||||||
|
for line in lines[:-1]:
|
||||||
|
yield line, file_handle.tell()
|
||||||
|
|
||||||
|
buffer = lines[-1]
|
||||||
|
|
||||||
|
reader.close()
|
||||||
|
|
||||||
|
def return_redd_objects(path: str) -> list[dict]:
|
||||||
|
file_path = path
|
||||||
|
file_size = os.stat(file_path).st_size
|
||||||
|
file_lines = 0
|
||||||
|
file_bytes_processed = 0
|
||||||
|
created = None
|
||||||
|
# field = "subreddit"
|
||||||
|
# value = "wallstreetbets"
|
||||||
|
bad_lines = 0
|
||||||
|
objects = []
|
||||||
|
authors = {}
|
||||||
|
|
||||||
|
# try:
|
||||||
|
for line, file_bytes_processed in read_lines_zst(file_path):
|
||||||
|
try:
|
||||||
|
obj = json.loads(line)
|
||||||
|
created = datetime.utcfromtimestamp(int(obj['created_utc']))
|
||||||
|
# temp = obj[field] == value
|
||||||
|
objects.append(obj)
|
||||||
|
|
||||||
|
except (KeyError, json.JSONDecodeError) as err:
|
||||||
|
bad_lines += 1
|
||||||
|
file_lines += 1
|
||||||
|
|
||||||
|
if file_lines % 100000 == 0:
|
||||||
|
log.info(f"{created.strftime('%Y-%m-%d %H:%M:%S')} : {file_lines:,} : {bad_lines:,} : {file_bytes_processed:,}:{(file_bytes_processed / file_size) * 100:.0f}%")
|
||||||
|
log.info(f"Complete : {file_lines:,} : {bad_lines:,}")
|
||||||
|
return objects
|
||||||
|
|
||||||
|
if __name__=="__main__":
|
||||||
|
inpath = sys.argv[1]
|
||||||
|
outpath = sys.argv[2]
|
||||||
|
|
||||||
|
objs = return_redd_objects(inpath)
|
||||||
|
|
||||||
|
with open(outpath, 'w') as f:
|
||||||
|
f.write(json.dumps(objs, indent=2))
|
Loading…
Add table
Add a link
Reference in a new issue