diff --git a/personal/diagnostic/test_files_multiprocess.py b/personal/diagnostic/test_files_multiprocess.py
index fef2e57..2d98073 100644
--- a/personal/diagnostic/test_files_multiprocess.py
+++ b/personal/diagnostic/test_files_multiprocess.py
@@ -62,7 +62,7 @@ class Queue:
 # we don't want to save the whole FileConfig object, since some info resets if we restart
 def save_file_list(input_files, status_json, script_type):
 	simple_file_list = []
-	for file in input_files:
+	for file in input_files.values():
 		simple_file_list.append([file.input_path, file.output_path, file.complete, file.lines_processed, file.error_lines])
 	with open(status_json, 'w') as status_json_file:
 		output_dict = {
@@ -79,9 +79,7 @@
 		output_dict = json.load(status_json_file)
-		input_files = []
+		input_files = {}
 		for simple_file in output_dict["files"]:
-			input_files.append(
-				FileConfig(simple_file[0], simple_file[1], simple_file[2], simple_file[3], simple_file[4])
-			)
+			input_files[simple_file[0]] = FileConfig(simple_file[0], simple_file[1], simple_file[2], simple_file[3], simple_file[4])
 		return input_files, output_dict["type"]
 	else:
 		return None, None
@@ -166,13 +164,13 @@ if __name__ == '__main__':
 
 	# if the file list wasn't loaded from the json, this is the first run, find what files we need to process
 	if input_files is None:
-		input_files = []
+		input_files = {}
 		for subdir, dirs, files in os.walk(args.input):
 			files.sort()
 			for file_name in files:
 				if file_name.endswith(".zst"):
 					input_path = os.path.join(subdir, file_name)
-					input_files.append(FileConfig(input_path))
+					input_files[input_path] = FileConfig(input_path)
 
 		save_file_list(input_files, status_json, script_type)
 	else:
@@ -186,7 +184,7 @@ if __name__ == '__main__':
 	files_to_process = []
 	# calculate the total file size for progress reports, build a list of incomplete files to process
 	# do this largest to smallest by file size so that we aren't processing a few really big files with only a few threads at the end
-	for file in sorted(input_files, key=lambda item: item.file_size, reverse=True):
+	for file in sorted(input_files.values(), key=lambda item: item.file_size, reverse=True):
 		total_bytes += file.file_size
 		if file.complete:
 			files_processed += 1
@@ -215,9 +213,9 @@ if __name__ == '__main__':
 			if file_update.error_message is not None:
 				log.warning(f"File failed {file_update.input_path}: {file_update.error_message}")
 			current_time = time.time()
+
+			input_files[file_update.input_path] = file_update
 			if last_log_time is None or (current_time - last_log_time) > 5 or queue.empty():
-				# I'm going to assume that the list of files is short enough that it's no
-				# big deal to just iterate each time since that saves a bunch of work
 				total_lines_processed = 0
 				total_bytes_processed = 0
 				total_lines_errored = 0
@@ -225,9 +223,6 @@ if __name__ == '__main__':
 				files_errored = 0
 				i = 0
-				for file in input_files:
-					if file.input_path == file_update.input_path:
-						input_files[i] = file_update
-						file = file_update
+				for file in input_files.values():
 					total_lines_processed += file.lines_processed
 					total_bytes_processed += file.bytes_processed
 					total_lines_errored += file.error_lines
@@ -251,7 +246,7 @@ if __name__ == '__main__':
 							f"{(total_bytes_processed / (2**30)):.2f} gb at {(bytes_per_second / (2**20)):,.0f} mb/s, {(total_bytes_processed / total_bytes) * 100:.0f}% : "
 							f"{files_processed}({files_errored})/{len(input_files)} files : "
 							f"{(str(days_left) + 'd ' if days_left > 0 else '')}{hours_left - (days_left * 24)}:{minutes_left - (hours_left * 60):02}:{seconds_left - (minutes_left * 60):02} remaining : "
-							f"{queue.qsize()} files in queue")
+							f"{queue.qsize()} files in queue : {current_time} : {last_log_time} : {current_time - last_log_time if last_log_time is not None else 0} : {queue.empty()}")
 						last_log_time = time.time()
 
 	log.info(f"{total_lines_processed:,}, {total_lines_errored} errored : {(total_bytes_processed / (2**30)):.2f} gb, {(total_bytes_processed / total_bytes) * 100:.0f}% : {files_processed}/{len(input_files)}")