mirror of
https://github.com/Watchful1/PushshiftDumps.git
synced 2025-07-28 09:04:10 -04:00
Add count fields
This commit is contained in:
parent
8a0256285f
commit
3700b21b81
2 changed files with 95 additions and 3 deletions
91
personal/count_fields.py
Normal file
91
personal/count_fields.py
Normal file
|
@ -0,0 +1,91 @@
|
||||||
|
import zstandard
|
||||||
|
import os
|
||||||
|
import json
|
||||||
|
import sys
|
||||||
|
from datetime import datetime
|
||||||
|
import logging.handlers
|
||||||
|
from collections import defaultdict
|
||||||
|
|
||||||
|
|
||||||
|
log = logging.getLogger("bot")
|
||||||
|
log.setLevel(logging.DEBUG)
|
||||||
|
log.addHandler(logging.StreamHandler())
|
||||||
|
|
||||||
|
|
||||||
|
def read_and_decode(reader, chunk_size, max_window_size, previous_chunk=None, bytes_read=0):
|
||||||
|
chunk = reader.read(chunk_size)
|
||||||
|
bytes_read += chunk_size
|
||||||
|
if previous_chunk is not None:
|
||||||
|
chunk = previous_chunk + chunk
|
||||||
|
try:
|
||||||
|
return chunk.decode()
|
||||||
|
except UnicodeDecodeError:
|
||||||
|
if bytes_read > max_window_size:
|
||||||
|
raise UnicodeError(f"Unable to decode frame after reading {bytes_read:,} bytes")
|
||||||
|
log.info(f"Decoding error with {bytes_read:,} bytes, reading another chunk")
|
||||||
|
return read_and_decode(reader, chunk_size, max_window_size, chunk, bytes_read)
|
||||||
|
|
||||||
|
|
||||||
|
def read_lines_zst(file_name):
|
||||||
|
with open(file_name, 'rb') as file_handle:
|
||||||
|
buffer = ''
|
||||||
|
reader = zstandard.ZstdDecompressor(max_window_size=2**31).stream_reader(file_handle)
|
||||||
|
while True:
|
||||||
|
chunk = read_and_decode(reader, 2**27, (2**29) * 2)
|
||||||
|
if not chunk:
|
||||||
|
break
|
||||||
|
lines = (buffer + chunk).split("\n")
|
||||||
|
for line in lines[:-1]:
|
||||||
|
yield json.loads(line)
|
||||||
|
buffer = lines[-1]
|
||||||
|
reader.close()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
#input_folder = r"\\MYCLOUDPR4100\Public\ingest\ingest\comments\23-06-23"
|
||||||
|
input_folder = r"\\MYCLOUDPR4100\Public\reddit\comments"
|
||||||
|
input_files = []
|
||||||
|
total_size = 0
|
||||||
|
for subdir, dirs, files in os.walk(input_folder):
|
||||||
|
for filename in files:
|
||||||
|
input_path = os.path.join(subdir, filename)
|
||||||
|
if input_path.endswith(".zst"):
|
||||||
|
file_size = os.stat(input_path).st_size
|
||||||
|
total_size += file_size
|
||||||
|
input_files.append([input_path, file_size])
|
||||||
|
|
||||||
|
log.info(f"Processing {len(input_files)} files of {(total_size / (2**30)):.2f} gigabytes")
|
||||||
|
|
||||||
|
total_lines = 0
|
||||||
|
fields = defaultdict(lambda: defaultdict(int))
|
||||||
|
for input_file in input_files:
|
||||||
|
file_lines = 0
|
||||||
|
created = None
|
||||||
|
for obj in read_lines_zst(input_file[0]):
|
||||||
|
for key, value in obj.items():
|
||||||
|
value = str(value)[:20]
|
||||||
|
fields[key][value] += 1
|
||||||
|
|
||||||
|
created = datetime.utcfromtimestamp(int(obj['created_utc']))
|
||||||
|
file_lines += 1
|
||||||
|
if file_lines % 100000 == 0:
|
||||||
|
log.info(f"{created.strftime('%Y-%m-%d %H:%M:%S')} : {file_lines + total_lines:,}")
|
||||||
|
if file_lines >= 1000:
|
||||||
|
break
|
||||||
|
total_lines += file_lines
|
||||||
|
log.info(f"{created.strftime('%Y-%m-%d %H:%M:%S')} : {file_lines + total_lines:,}")
|
||||||
|
|
||||||
|
sorted_fields = []
|
||||||
|
for key, values in fields.items():
|
||||||
|
total_occurrences = 0
|
||||||
|
unique_values = 0
|
||||||
|
examples = []
|
||||||
|
for value_name, count in values.items():
|
||||||
|
unique_values += 1
|
||||||
|
total_occurrences += count
|
||||||
|
if len(examples) < 3:
|
||||||
|
examples.append(value_name)
|
||||||
|
sorted_fields.append((total_occurrences, f"{key}: {(total_occurrences / total_lines) * 100:.2f} : {unique_values:,} : {','.join(examples)}"))
|
||||||
|
sorted_fields.sort(key=lambda x:x[0], reverse=True)
|
||||||
|
for count, string in sorted_fields:
|
||||||
|
log.info(string)
|
|
@ -8,7 +8,7 @@ log = discord_logging.init_logging()
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
input_path = r"\\MYCLOUDPR4100\Public\reddit\subreddits\NoStupidQuestions_comments.zst"
|
input_path = r"\\MYCLOUDPR4100\Public\ingest\combined\comments\RC_23-07-10.zst"
|
||||||
|
|
||||||
input_file_paths = []
|
input_file_paths = []
|
||||||
if os.path.isdir(input_path):
|
if os.path.isdir(input_path):
|
||||||
|
@ -32,11 +32,12 @@ if __name__ == "__main__":
|
||||||
for obj, line, file_bytes_processed in utils.read_obj_zst_meta(file_path):
|
for obj, line, file_bytes_processed in utils.read_obj_zst_meta(file_path):
|
||||||
new_timestamp = int(obj['created_utc'])
|
new_timestamp = int(obj['created_utc'])
|
||||||
created = datetime.utcfromtimestamp(new_timestamp)
|
created = datetime.utcfromtimestamp(new_timestamp)
|
||||||
if previous_timestamp is not None and previous_timestamp - (60 * 60 * 4) > new_timestamp:
|
if previous_timestamp is not None and previous_timestamp - (60 * 60) > new_timestamp:
|
||||||
log.warning(f"Out of order timestamps {datetime.utcfromtimestamp(previous_timestamp).strftime('%Y-%m-%d %H:%M:%S')} - 4 hours > {created.strftime('%Y-%m-%d %H:%M:%S')}")
|
log.warning(f"Out of order timestamps {datetime.utcfromtimestamp(previous_timestamp).strftime('%Y-%m-%d %H:%M:%S')} - 4 hours > {created.strftime('%Y-%m-%d %H:%M:%S')}")
|
||||||
previous_timestamp = new_timestamp
|
previous_timestamp = new_timestamp
|
||||||
file_lines += 1
|
file_lines += 1
|
||||||
if file_lines % 10000 == 0:
|
if file_lines % 100000 == 0:
|
||||||
log.info(f"{files_processed}/{len(input_file_paths)}: {file_name} : {created.strftime('%Y-%m-%d %H:%M:%S')} : {file_lines:,} : {(file_bytes_processed / file_size) * 100:.0f}%")
|
log.info(f"{files_processed}/{len(input_file_paths)}: {file_name} : {created.strftime('%Y-%m-%d %H:%M:%S')} : {file_lines:,} : {(file_bytes_processed / file_size) * 100:.0f}%")
|
||||||
|
|
||||||
|
files_processed += 1
|
||||||
log.info(f"{files_processed}/{len(input_file_paths)}: {file_name} : {created.strftime('%Y-%m-%d %H:%M:%S')} : {file_lines:,} : 100%")
|
log.info(f"{files_processed}/{len(input_file_paths)}: {file_name} : {created.strftime('%Y-%m-%d %H:%M:%S')} : {file_lines:,} : 100%")
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue