diff --git a/personal/count_fields.py b/personal/count_fields.py new file mode 100644 index 0000000..f687b2c --- /dev/null +++ b/personal/count_fields.py @@ -0,0 +1,91 @@ +import zstandard +import os +import json +import sys +from datetime import datetime +import logging.handlers +from collections import defaultdict + + +log = logging.getLogger("bot") +log.setLevel(logging.DEBUG) +log.addHandler(logging.StreamHandler()) + + +def read_and_decode(reader, chunk_size, max_window_size, previous_chunk=None, bytes_read=0): + chunk = reader.read(chunk_size) + bytes_read += chunk_size + if previous_chunk is not None: + chunk = previous_chunk + chunk + try: + return chunk.decode() + except UnicodeDecodeError: + if bytes_read > max_window_size: + raise UnicodeError(f"Unable to decode frame after reading {bytes_read:,} bytes") + log.info(f"Decoding error with {bytes_read:,} bytes, reading another chunk") + return read_and_decode(reader, chunk_size, max_window_size, chunk, bytes_read) + + +def read_lines_zst(file_name): + with open(file_name, 'rb') as file_handle: + buffer = '' + reader = zstandard.ZstdDecompressor(max_window_size=2**31).stream_reader(file_handle) + while True: + chunk = read_and_decode(reader, 2**27, (2**29) * 2) + if not chunk: + break + lines = (buffer + chunk).split("\n") + for line in lines[:-1]: + yield json.loads(line) + buffer = lines[-1] + reader.close() + + +if __name__ == "__main__": + #input_folder = r"\\MYCLOUDPR4100\Public\ingest\ingest\comments\23-06-23" + input_folder = r"\\MYCLOUDPR4100\Public\reddit\comments" + input_files = [] + total_size = 0 + for subdir, dirs, files in os.walk(input_folder): + for filename in files: + input_path = os.path.join(subdir, filename) + if input_path.endswith(".zst"): + file_size = os.stat(input_path).st_size + total_size += file_size + input_files.append([input_path, file_size]) + + log.info(f"Processing {len(input_files)} files of {(total_size / (2**30)):.2f} gigabytes") + + total_lines = 0 + fields = defaultdict(lambda: defaultdict(int)) + for input_file in input_files: + file_lines = 0 + created = None + for obj in read_lines_zst(input_file[0]): + for key, value in obj.items(): + value = str(value)[:20] + fields[key][value] += 1 + + created = datetime.utcfromtimestamp(int(obj['created_utc'])) + file_lines += 1 + if file_lines % 100000 == 0: + log.info(f"{created.strftime('%Y-%m-%d %H:%M:%S')} : {file_lines + total_lines:,}") + if file_lines >= 1000: + break + total_lines += file_lines + log.info(f"{created.strftime('%Y-%m-%d %H:%M:%S')} : {file_lines + total_lines:,}") + + sorted_fields = [] + for key, values in fields.items(): + total_occurrences = 0 + unique_values = 0 + examples = [] + for value_name, count in values.items(): + unique_values += 1 + total_occurrences += count + if len(examples) < 3: + examples.append(value_name) + sorted_fields.append((total_occurrences, f"{key}: {(total_occurrences / total_lines) * 100:.2f} : {unique_values:,} : {','.join(examples)}")) + sorted_fields.sort(key=lambda x:x[0], reverse=True) + for count, string in sorted_fields: + log.info(string) diff --git a/personal/test_file.py b/personal/test_file.py index 45f3d18..0a04ff6 100644 --- a/personal/test_file.py +++ b/personal/test_file.py @@ -8,7 +8,7 @@ log = discord_logging.init_logging() if __name__ == "__main__": - input_path = r"\\MYCLOUDPR4100\Public\reddit\subreddits\NoStupidQuestions_comments.zst" + input_path = r"\\MYCLOUDPR4100\Public\ingest\combined\comments\RC_23-07-10.zst" input_file_paths = [] if os.path.isdir(input_path): @@ -32,11 +32,12 @@ if __name__ == "__main__": for obj, line, file_bytes_processed in utils.read_obj_zst_meta(file_path): new_timestamp = int(obj['created_utc']) created = datetime.utcfromtimestamp(new_timestamp) - if previous_timestamp is not None and previous_timestamp - (60 * 60 * 4) > new_timestamp: + if previous_timestamp is not None and previous_timestamp - (60 * 60) > new_timestamp: log.warning(f"Out of order timestamps {datetime.utcfromtimestamp(previous_timestamp).strftime('%Y-%m-%d %H:%M:%S')} - 4 hours > {created.strftime('%Y-%m-%d %H:%M:%S')}") previous_timestamp = new_timestamp file_lines += 1 - if file_lines % 10000 == 0: + if file_lines % 100000 == 0: log.info(f"{files_processed}/{len(input_file_paths)}: {file_name} : {created.strftime('%Y-%m-%d %H:%M:%S')} : {file_lines:,} : {(file_bytes_processed / file_size) * 100:.0f}%") + files_processed += 1 log.info(f"{files_processed}/{len(input_file_paths)}: {file_name} : {created.strftime('%Y-%m-%d %H:%M:%S')} : {file_lines:,} : 100%")