From 591c4a21f69b68730c9ac3c164fc64093ad62ad3 Mon Sep 17 00:00:00 2001
From: Watchful1
Date: Fri, 26 Jan 2024 21:42:34 -0800
Subject: [PATCH] Clean up

---
 .../recompress_folder_multiprocess.py        | 37 +++++++++++--------
 1 file changed, 21 insertions(+), 16 deletions(-)

diff --git a/personal/compression/recompress_folder_multiprocess.py b/personal/compression/recompress_folder_multiprocess.py
index c42616d..1e99db7 100644
--- a/personal/compression/recompress_folder_multiprocess.py
+++ b/personal/compression/recompress_folder_multiprocess.py
@@ -151,13 +151,16 @@ def process_file(file, queue, threads, level):
 	file.total_lines, file.uncompressed_size = count_lines_bytes(file.input_path)
 	queue.put(file)
 
-	decompressor = zstandard.ZstdDecompressor(max_window_size=2**31)
-	compressor = zstandard.ZstdCompressor(level=level, write_content_size=True, write_checksum=True, threads=threads)
-	with open(file.input_path, 'rb') as input_handle, open(file.output_path, "wb") as output_handle:
-		compression_reader = decompressor.stream_reader(input_handle)
-		read_count, file.new_compressed_size = compressor.copy_stream(compression_reader, output_handle, size=file.uncompressed_size)
+	try:
+		decompressor = zstandard.ZstdDecompressor(max_window_size=2**31)
+		compressor = zstandard.ZstdCompressor(level=level, write_content_size=True, write_checksum=True, threads=threads)
+		with open(file.input_path, 'rb') as input_handle, open(file.output_path, "wb") as output_handle:
+			compression_reader = decompressor.stream_reader(input_handle)
+			read_count, file.new_compressed_size = compressor.copy_stream(compression_reader, output_handle, size=file.uncompressed_size)
+		file.complete = True
+	except Exception as err:
+		file.error_message = str(err)
 	#log.info(f"{read_count:,} to {write_count:,} in {seconds:,.2f} with {threads} threads")
-	file.complete = True
 	queue.put(file)
 
 
@@ -168,7 +171,7 @@ if __name__ == '__main__':
 	parser.add_argument("--level", help="The compression ratio to output at. From 0 to 22", default=22, type=int)
 	parser.add_argument("--working", help="The folder to store temporary files in", default="pushshift_working")
 	parser.add_argument("--processes", help="Number of processes to use", default=4, type=int)
-	parser.add_argument("--threads", help="Number of threads per process", default=1, type=int)
+	parser.add_argument("--threads", help="Number of threads per process", default=0, type=int)
 	parser.add_argument("--debug", help="Enable debug logging", action='store_const', const=True, default=False)
 
 	script_type = "compress"
@@ -252,16 +255,17 @@ if __name__ == '__main__':
 				if file.input_path == file_update.input_path:
 					input_files[i] = file_update
 					file = file_update
-				processed_old_bytes += file.old_compressed_size
-				processed_uncompressed_bytes += file.uncompressed_size
-				processed_new_bytes += file.new_compressed_size
-				processed_lines += file.total_lines
+				if file.complete:
+					processed_old_bytes += file.old_compressed_size
+					processed_uncompressed_bytes += file.uncompressed_size if file.uncompressed_size is not None else 0
+					processed_new_bytes += file.new_compressed_size if file.new_compressed_size is not None else 0
+					processed_lines += file.total_lines if file.total_lines is not None else 0
 				files_processed += 1 if file.complete or file.error_message is not None else 0
 				files_errored += 1 if file.error_message is not None else 0
 				i += 1
 			if file_update.complete or file_update.error_message is not None:
 				save_file_list(input_files, args.working, status_json, arg_string, script_type)
-				log.debug(f"Finished file: {file_update.input_path} : {file_update.file_size:,}")
+				log.debug(f"Finished file: {file_update.input_path}")
 
 		current_time = time.time()
 		progress_queue.put([current_time, processed_old_bytes])
@@ -274,10 +278,11 @@ if __name__ == '__main__':
 			days_left = int(hours_left / 24)
 
 		log.info(
-			f"{(processed_old_bytes / (2**30)):.2f} gb at {(bytes_per_second / (2**20)):,.0f} mb/s, {(processed_old_bytes / total_old_bytes) * 100:.0f}% : "
-			f"{(processed_uncompressed_bytes / (2**30)):.2f} gb uncompressed to {(processed_new_bytes / (2**30)):.2f} gb : "
-			f"{(processed_old_bytes / processed_uncompressed_bytes)} old ratio : {(processed_new_bytes / processed_uncompressed_bytes)} new ratio : {(processed_new_bytes / processed_old_bytes)} difference : "
+			f"{(processed_old_bytes / (2**30)):.3f} gb at {(bytes_per_second / (2**20)):,.2f} mb/s, {(processed_old_bytes / total_old_bytes) * 100:.0f}% : "
+			f"{(processed_uncompressed_bytes / (2**30)):.3f} gb uncompressed down to {(processed_new_bytes / (2**30)):.3f} gb compressed : "
+			f"{(processed_old_bytes / processed_uncompressed_bytes):.3f} old ratio : {(processed_new_bytes / processed_uncompressed_bytes):.3f} new ratio : {(processed_new_bytes / processed_old_bytes):.3f} difference : "
 			f"{files_processed}({files_errored})/{len(input_files)} files : "
-			f"{(str(days_left) + 'd ' if days_left > 0 else '')}{hours_left - (days_left * 24)}:{minutes_left - (hours_left * 60):02}:{seconds_left - (minutes_left * 60):02} remaining")
+			f"{(str(days_left) + 'd ' if days_left > 0 else '')}{hours_left - (days_left * 24)}:{minutes_left - (hours_left * 60):02}:{seconds_left - (minutes_left * 60):02} remaining : "
+			f"{first_time}:{first_bytes}:{current_time}:{processed_old_bytes}:{processed_uncompressed_bytes}:{processed_new_bytes}:{total_old_bytes}:{int(sum(speed_queue.list))}:{len(speed_queue.list)}")
 
 	log.info(f"{(processed_old_bytes / (2**30)):.2f} gb, {(processed_old_bytes / total_old_bytes) * 100:.0f}% : {files_processed}/{len(input_files)}")
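
Note (not part of the patch): for reference, a minimal standalone sketch of the recompression pattern the first hunk wraps in a try/except — stream-decompress a zstd file and re-encode it, capturing the error message for the parent process instead of crashing the worker. The function name and arguments are hypothetical placeholders, not code from this repo.

import zstandard

def recompress(input_path, output_path, level=22, threads=0):
	# returns (new_compressed_size, error_message)
	try:
		# max_window_size=2**31 accepts frames written with very large windows
		decompressor = zstandard.ZstdDecompressor(max_window_size=2**31)
		compressor = zstandard.ZstdCompressor(level=level, write_content_size=True, write_checksum=True, threads=threads)
		with open(input_path, 'rb') as input_handle, open(output_path, 'wb') as output_handle:
			reader = decompressor.stream_reader(input_handle)
			read_count, write_count = compressor.copy_stream(reader, output_handle)
		return write_count, None
	except Exception as err:
		# mirror the patch: report the failure to the caller instead of raising
		return None, str(err)

Per the python-zstandard docs, threads=0 disables multi-threaded compression entirely, which matches the patch's new --threads default of 0.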