From 4e140e218ba7b6756028271dfbaf369637fa8473 Mon Sep 17 00:00:00 2001
From: Watchful1
Date: Sun, 12 Nov 2023 16:18:04 -0800
Subject: [PATCH] Add recompress file test script

---
 personal/combine/build_month.py         |  6 +--
 personal/compression/recompress_file.py | 68 +++++++++++++++++++++++++++
 scripts/find_overlapping_users.py       |  6 ++-
 3 files changed, 75 insertions(+), 5 deletions(-)
 create mode 100644 personal/compression/recompress_file.py

diff --git a/personal/combine/build_month.py b/personal/combine/build_month.py
index 5186ca9..663183d 100644
--- a/personal/combine/build_month.py
+++ b/personal/combine/build_month.py
@@ -76,7 +76,7 @@ if __name__ == "__main__":
 				log.info(f"Counting: {minute_iterator.strftime('%y-%m-%d_%H-%M')} : {total_objects:,} : {total_bytes:,}")
 
 	output_path = os.path.join(args.output, args.type, f"{prefix}_{month.strftime('%Y-%m')}.zst")
-	output_handle = zstandard.ZstdCompressor(level=level, write_content_size=True, threads=-1).stream_writer(open(output_path, 'wb'), size=total_bytes)
+	output_handle = zstandard.ZstdCompressor(level=level, write_content_size=True, write_checksum=True, threads=-1).stream_writer(open(output_path, 'wb'), size=total_bytes)
 
 	count_objects = 0
 	count_bytes = 0
@@ -86,8 +86,8 @@ if __name__ == "__main__":
 			minute_file_path = os.path.join(args.input, args.type, minute_iterator.strftime('%y-%m-%d'), f"{prefix}_{minute_iterator.strftime('%y-%m-%d_%H-%M')}.zst")
 			for obj, line, _ in utils.read_obj_zst_meta(minute_file_path):
 				line_encoded = line.encode('utf-8')
-				total_bytes += len(line_encoded)
-				total_bytes += 1
+				count_bytes += len(line_encoded)
+				count_bytes += 1
 				output_handle.write(line_encoded)
 				output_handle.write(NEWLINE_ENCODED)
 
diff --git a/personal/compression/recompress_file.py b/personal/compression/recompress_file.py
new file mode 100644
index 0000000..b036e10
--- /dev/null
+++ b/personal/compression/recompress_file.py
@@ -0,0 +1,68 @@
+import argparse
+import zstandard
+import utils
+import discord_logging
+import time
+import os
+
+log = discord_logging.init_logging()
+
+if __name__ == '__main__':
+	parser = argparse.ArgumentParser(description="Take a zst file, decompress it and recompress it at the specified level once for each thread count, logging how long each pass takes")
+	parser.add_argument("input", help="The input file")
+	parser.add_argument("output", help="The output file")
+	parser.add_argument("--level", help="The compression level to output at", default="22")
+	args = parser.parse_args()
+
+	level = int(args.level)
+	log.info(f"Input file {args.input}")
+	log.info(f"Output file {args.output}")
+	log.info(f"Compression level {level}")
+
+	# files = []
+	# total_size = 0
+	# for file_name in os.listdir(args.input):
+	# 	file_path = os.path.join(args.input, file_name)
+	# 	if file_name.endswith(".zst") and os.path.isfile(file_path):
+	# 		file_size = os.stat(file_path).st_size
+	# 		total_size += file_size
+	# 		files.append((file_name, file_size))
+	# 		if len(files) % 1000 == 0:
+	# 			log.info(f"Loaded {len(files)} files")
+	# log.info(f"Loaded {len(files)} files of total size {total_size:,}")
+	#
+	# level = int(args.level)
+	# log.info(f"Writing files out to {args.output} at ratio {level}")
+	# if not os.path.exists(args.output):
+	# 	os.makedirs(args.output)
+
+	# count the objects and uncompressed bytes up front so the total size can be passed to the compressor
+	total_objects = 0
+	total_bytes = 0
+	for obj, line, _ in utils.read_obj_zst_meta(args.input):
+		total_bytes += len(line.encode('utf-8'))
+		total_bytes += 1
+
+		total_objects += 1
+		if total_objects % 1000000 == 0:
+			log.info(f"{total_objects:,} : {total_bytes:,}")
+
+	log.info(f"{total_objects:,} : {total_bytes:,}")
+
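+	# recompress the file once per thread count to compare zstd's multithreaded scaling;
+	# threads=-1 uses the machine's logical CPU count, threads=0 disables multithreading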
+	for threads in range(-1, 21):
+		decompressor = zstandard.ZstdDecompressor(max_window_size=2**31)
+		compressor = zstandard.ZstdCompressor(level=level, write_content_size=True, write_checksum=True, threads=threads)
+		start_time = time.time()
+		with open(args.input, 'rb') as input_handle, open(args.output, "wb") as output_handle:
+			compression_reader = decompressor.stream_reader(input_handle)
+			read_count, write_count = compressor.copy_stream(compression_reader, output_handle, size=total_bytes)
+		seconds = time.time() - start_time
+
+		log.info(f"{read_count:,} to {write_count:,} in {seconds:,.2f} with {threads} threads")
+
+		# compressed_bytes_read += file_size
+		# uncompressed_bytes_read += read_count
+		# bytes_written += write_count
+		# log.info(f"{files_read:,}/{len(files):,} : {(compressed_bytes_read / (2**30)):.2f} gb of {(total_size / (2**30)):.2f} gb compressed to {(bytes_written / (2**30)):.2f} gb : {bytes_written / compressed_bytes_read:.3f}")
diff --git a/scripts/find_overlapping_users.py b/scripts/find_overlapping_users.py
index acded30..5099557 100644
--- a/scripts/find_overlapping_users.py
+++ b/scripts/find_overlapping_users.py
@@ -7,8 +7,10 @@ import zstandard
 import json
 
 input_files = [
-	r"\\MYCLOUDPR4100\Public\reddit\subreddits\collapse_comments.zst",
-	r"\\MYCLOUDPR4100\Public\reddit\subreddits\Slovakia_comments.zst",
+	r"\\MYCLOUDPR4100\Public\reddit\subreddits\PersonalFinanceCanada_comments.zst",
+	r"\\MYCLOUDPR4100\Public\reddit\subreddits\hacking_comments.zst",
+	r"\\MYCLOUDPR4100\Public\reddit\subreddits\alberta_comments.zst",
+	r"\\MYCLOUDPR4100\Public\reddit\subreddits\GothGirls_comments.zst",
 ]
 ignored_users = ['[deleted]', 'automoderator']
 min_comments_per_sub = 1
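
A usage sketch for the new script (the file names are placeholders, and it assumes the repo's `utils` module and `discord_logging` are importable from the working directory):

	python recompress_file.py RS_2023-09.zst RS_2023-09_recompressed.zst --level 22

Since each pass of the thread sweep reopens the output with "wb", the output file is overwritten every iteration; the thread comparison lives in the timing logs, not in the final file.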