Watchful1 2024-01-26 21:42:34 -08:00
parent 7727934db0
commit 591c4a21f6

@@ -151,13 +151,16 @@ def process_file(file, queue, threads, level):
 	file.total_lines, file.uncompressed_size = count_lines_bytes(file.input_path)
 	queue.put(file)
-	decompressor = zstandard.ZstdDecompressor(max_window_size=2**31)
-	compressor = zstandard.ZstdCompressor(level=level, write_content_size=True, write_checksum=True, threads=threads)
-	with open(file.input_path, 'rb') as input_handle, open(file.output_path, "wb") as output_handle:
-		compression_reader = decompressor.stream_reader(input_handle)
-		read_count, file.new_compressed_size = compressor.copy_stream(compression_reader, output_handle, size=file.uncompressed_size)
+	try:
+		decompressor = zstandard.ZstdDecompressor(max_window_size=2**31)
+		compressor = zstandard.ZstdCompressor(level=level, write_content_size=True, write_checksum=True, threads=threads)
+		with open(file.input_path, 'rb') as input_handle, open(file.output_path, "wb") as output_handle:
+			compression_reader = decompressor.stream_reader(input_handle)
+			read_count, file.new_compressed_size = compressor.copy_stream(compression_reader, output_handle, size=file.uncompressed_size)
+		file.complete = True
+	except Exception as err:
+		file.error_message = str(err)
 	#log.info(f"{read_count:,} to {write_count:,} in {seconds:,.2f} with {threads} threads")
-	file.complete = True
 	queue.put(file)
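This hunk wraps the whole decompress-and-recompress pipeline in a try/except, so one corrupt file records an error_message instead of killing the worker process. A minimal standalone sketch of the same pattern (the recompress helper name is illustrative, not a function from this script):

	import zstandard

	def recompress(input_path, output_path, level=22, threads=0):
		# Returns (error_message, bytes_read, bytes_written); error_message is None on success.
		try:
			# pushshift dumps need a large decompression window
			decompressor = zstandard.ZstdDecompressor(max_window_size=2**31)
			compressor = zstandard.ZstdCompressor(
				level=level, write_content_size=True, write_checksum=True, threads=threads)
			with open(input_path, 'rb') as input_handle, open(output_path, 'wb') as output_handle:
				reader = decompressor.stream_reader(input_handle)
				# copy_stream re-compresses the decompressed stream without loading the file into memory
				read_count, write_count = compressor.copy_stream(reader, output_handle)
			return None, read_count, write_count
		except Exception as err:
			# report the failure to the caller instead of crashing the worker
			return str(err), 0, 0

The script additionally passes size=file.uncompressed_size to copy_stream; combined with write_content_size=True, that lets the output frame header record the decompressed size up front.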
@@ -168,7 +171,7 @@ if __name__ == '__main__':
 	parser.add_argument("--level", help="The compression ratio to output at. From 0 to 22", default=22, type=int)
 	parser.add_argument("--working", help="The folder to store temporary files in", default="pushshift_working")
 	parser.add_argument("--processes", help="Number of processes to use", default=4, type=int)
-	parser.add_argument("--threads", help="Number of threads per process", default=1, type=int)
+	parser.add_argument("--threads", help="Number of threads per process", default=0, type=int)
 	parser.add_argument("--debug", help="Enable debug logging", action='store_const', const=True, default=False)
 	script_type = "compress"
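Per python-zstandard's documented semantics, this changes the per-process compressor from multi-threaded mode with a single worker (threads=1) to plain single-threaded compression (threads=0), leaving parallelism to --processes. A short sketch of the options:

	import zstandard

	# threads=0 (the new default) disables zstd's multithreaded mode entirely,
	# threads=1 runs multithreaded mode with one background worker, and
	# negative values resolve to the number of detected logical CPUs.
	single = zstandard.ZstdCompressor(level=22, threads=0)    # compress on the calling thread
	per_core = zstandard.ZstdCompressor(level=22, threads=-1)  # one worker per logical CPU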
@@ -252,16 +255,17 @@ if __name__ == '__main__':
 					if file.input_path == file_update.input_path:
 						input_files[i] = file_update
 						file = file_update
-					processed_old_bytes += file.old_compressed_size
-					processed_uncompressed_bytes += file.uncompressed_size
-					processed_new_bytes += file.new_compressed_size
-					processed_lines += file.total_lines
+					if file.complete:
+						processed_old_bytes += file.old_compressed_size
+						processed_uncompressed_bytes += file.uncompressed_size if file.uncompressed_size is not None else 0
+						processed_new_bytes += file.new_compressed_size if file.new_compressed_size is not None else 0
+						processed_lines += file.total_lines if file.total_lines is not None else 0
 					files_processed += 1 if file.complete or file.error_message is not None else 0
 					files_errored += 1 if file.error_message is not None else 0
 					i += 1
 				if file_update.complete or file_update.error_message is not None:
 					save_file_list(input_files, args.working, status_json, arg_string, script_type)
-					log.debug(f"Finished file: {file_update.input_path} : {file_update.file_size:,}")
+					log.debug(f"Finished file: {file_update.input_path}")
 				current_time = time.time()
 				progress_queue.put([current_time, processed_old_bytes])
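The totals are now only accumulated for files that actually completed, with None guards for the size and line fields that a failed file never fills in. A self-contained sketch of the same tallying logic (FileStatus and tally are illustrative stand-ins, not names from this repo):

	from dataclasses import dataclass
	from typing import Optional

	@dataclass
	class FileStatus:
		# trimmed-down stand-in for the script's file objects (hypothetical)
		old_compressed_size: int = 0
		uncompressed_size: Optional[int] = None    # None until count_lines_bytes() has run
		new_compressed_size: Optional[int] = None  # None until recompression succeeded
		total_lines: Optional[int] = None
		complete: bool = False
		error_message: Optional[str] = None

	def tally(input_files):
		# Re-derive the progress totals from scratch on each queue update,
		# counting sizes only for files that actually finished.
		old_bytes = uncompressed = new_bytes = lines = processed = errored = 0
		for file in input_files:
			if file.complete:
				old_bytes += file.old_compressed_size
				uncompressed += file.uncompressed_size or 0
				new_bytes += file.new_compressed_size or 0
				lines += file.total_lines or 0
			processed += 1 if file.complete or file.error_message is not None else 0
			errored += 1 if file.error_message is not None else 0
		return old_bytes, uncompressed, new_bytes, lines, processed, errored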
@@ -274,10 +278,11 @@ if __name__ == '__main__':
 				days_left = int(hours_left / 24)
 				log.info(
-					f"{(processed_old_bytes / (2**30)):.2f} gb at {(bytes_per_second / (2**20)):,.0f} mb/s, {(processed_old_bytes / total_old_bytes) * 100:.0f}% : "
-					f"{(processed_uncompressed_bytes / (2**30)):.2f} gb uncompressed to {(processed_new_bytes / (2**30)):.2f} gb : "
-					f"{(processed_old_bytes / processed_uncompressed_bytes)} old ratio : {(processed_new_bytes / processed_uncompressed_bytes)} new ratio : {(processed_new_bytes / processed_old_bytes)} difference : "
+					f"{(processed_old_bytes / (2**30)):.3f} gb at {(bytes_per_second / (2**20)):,.2f} mb/s, {(processed_old_bytes / total_old_bytes) * 100:.0f}% : "
+					f"{(processed_uncompressed_bytes / (2**30)):.3f} gb uncompressed down to {(processed_new_bytes / (2**30)):.3f} gb compressed : "
+					f"{(processed_old_bytes / processed_uncompressed_bytes):.3f} old ratio : {(processed_new_bytes / processed_uncompressed_bytes):.3f} new ratio : {(processed_new_bytes / processed_old_bytes):.3f} difference : "
 					f"{files_processed}({files_errored})/{len(input_files)} files : "
-					f"{(str(days_left) + 'd ' if days_left > 0 else '')}{hours_left - (days_left * 24)}:{minutes_left - (hours_left * 60):02}:{seconds_left - (minutes_left * 60):02} remaining")
+					f"{(str(days_left) + 'd ' if days_left > 0 else '')}{hours_left - (days_left * 24)}:{minutes_left - (hours_left * 60):02}:{seconds_left - (minutes_left * 60):02} remaining : "
+					f"{first_time}:{first_bytes}:{current_time}:{processed_old_bytes}:{processed_uncompressed_bytes}:{processed_new_bytes}:{total_old_bytes}:{int(sum(speed_queue.list))}:{len(speed_queue.list)}")
 		log.info(f"{(processed_old_bytes / (2**30)):.2f} gb, {(processed_old_bytes / total_old_bytes) * 100:.0f}% : {files_processed}/{len(input_files)}")
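The progress line gains three-decimal ratios plus a trailing colon-separated dump of the raw counters for debugging throughput. The countdown at the end folds seconds_left into a d/h:mm:ss string; a small sketch of that arithmetic (format_eta is a hypothetical helper, and the max(..., 1) guard is my addition to avoid dividing by zero before any bytes are processed):

	def format_eta(total_old_bytes, processed_old_bytes, bytes_per_second):
		# Remaining time at the current throughput, rendered like the log line above.
		seconds_left = int((total_old_bytes - processed_old_bytes) / max(bytes_per_second, 1))
		minutes_left = seconds_left // 60
		hours_left = minutes_left // 60
		days_left = hours_left // 24
		return (
			f"{(str(days_left) + 'd ') if days_left > 0 else ''}"
			f"{hours_left - (days_left * 24)}:{minutes_left - (hours_left * 60):02}:{seconds_left - (minutes_left * 60):02}")

	# e.g. 300 GiB left at 40 MiB/s -> "2:08:00"
	print(format_eta(400 * 2**30, 100 * 2**30, 40 * 2**20))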