From ba6da35b3789388657ec2c7db6e391709ebc818a Mon Sep 17 00:00:00 2001 From: Watchful1 Date: Mon, 30 Jan 2023 17:05:22 -0800 Subject: [PATCH] Fix other chunk sizing --- personal/count_by_subreddit.py | 4 ++-- personal/utils.py | 15 ++++++++++++++- 2 files changed, 16 insertions(+), 3 deletions(-) diff --git a/personal/count_by_subreddit.py b/personal/count_by_subreddit.py index 292ec74..064fce2 100644 --- a/personal/count_by_subreddit.py +++ b/personal/count_by_subreddit.py @@ -8,7 +8,7 @@ log = discord_logging.init_logging() if __name__ == "__main__": subreddits = defaultdict(int) - input_file = r"\\MYCLOUDPR4100\Public\reddit\comments\RC_2021-06.zst" + input_file = r"\\MYCLOUDPR4100\Public\pushshift_working\RC_2022-12.zst" input_file_size = os.stat(input_file).st_size total_lines = 0 for comment, line, file_bytes_processed in utils.read_obj_zst_meta(input_file): @@ -20,5 +20,5 @@ if __name__ == "__main__": log.info(f"{total_lines:,} lines, 100%") for subreddit, count in sorted(subreddits.items(), key=lambda item: item[1] * -1): - if count > 1000: + if count >= 1: log.info(f"r/{subreddit}: {count:,}") diff --git a/personal/utils.py b/personal/utils.py index 746669f..041de3e 100644 --- a/personal/utils.py +++ b/personal/utils.py @@ -19,12 +19,25 @@ def read_obj_zst(file_name): reader.close() +def read_and_decode(reader, chunk_size, max_window_size, previous_chunk=None, bytes_read=0): + chunk = reader.read(chunk_size) + bytes_read += chunk_size + if previous_chunk is not None: + chunk = previous_chunk + chunk + try: + return chunk.decode() + except UnicodeDecodeError: + if bytes_read > max_window_size: + raise UnicodeError(f"Unable to decode frame after reading {bytes_read:,} bytes") + return read_and_decode(reader, chunk_size, max_window_size, chunk, bytes_read) + + def read_obj_zst_meta(file_name): with open(file_name, 'rb') as file_handle: buffer = '' reader = zstandard.ZstdDecompressor(max_window_size=2**31).stream_reader(file_handle) while True: - chunk = reader.read(2**27).decode() + chunk = read_and_decode(reader, 2**27, (2**29) * 2) if not chunk: break lines = (buffer + chunk).split("\n")