From 894961c3ee47af3a01a8c8d19e0bfb68c6ffe07d Mon Sep 17 00:00:00 2001
From: Watchful1
Date: Tue, 17 Jan 2023 22:37:25 -0800
Subject: [PATCH] Save the arguments in the status json so we don't
 accidentally reuse the same files for a different run

---
 scripts/combine_folder_multiprocess.py | 34 ++++++++++++++++++--------
 1 file changed, 24 insertions(+), 10 deletions(-)

diff --git a/scripts/combine_folder_multiprocess.py b/scripts/combine_folder_multiprocess.py
index 5424cba..c2f2fef 100644
--- a/scripts/combine_folder_multiprocess.py
+++ b/scripts/combine_folder_multiprocess.py
@@ -77,29 +77,33 @@ class Queue:
 
 # save file information and progress to a json file
 # we don't want to save the whole FileConfig object, since some info resets if we restart
-def save_file_list(input_files, working_folder, status_json):
+def save_file_list(input_files, working_folder, status_json, arg_string):
 	if not os.path.exists(working_folder):
 		os.makedirs(working_folder)
 	simple_file_list = []
 	for file in input_files:
 		simple_file_list.append([file.input_path, file.output_path, file.complete, file.lines_processed, file.error_lines])
 	with open(status_json, 'w') as status_json_file:
-		status_json_file.write(json.dumps(simple_file_list, indent=4))
+		output_dict = {
+			"args": arg_string,
+			"files": simple_file_list
+		}
+		status_json_file.write(json.dumps(output_dict, indent=4))
 
 
 # load file information from the json file and recalculate file sizes
 def load_file_list(status_json):
 	if os.path.exists(status_json):
 		with open(status_json, 'r') as status_json_file:
-			simple_file_list = json.load(status_json_file)
+			output_dict = json.load(status_json_file)
 			input_files = []
-			for simple_file in simple_file_list:
+			for simple_file in output_dict["files"]:
 				input_files.append(
 					FileConfig(simple_file[0], simple_file[1], simple_file[2], simple_file[3], simple_file[4])
 				)
-			return input_files
+			return input_files, output_dict["args"]
 	else:
-		return None
+		return None, None
 
 
 # recursively decompress and decode a chunk of bytes. If there's a decode error then read another chunk and try with that, up to a limit of max_window_size bytes
@@ -140,6 +144,7 @@ def read_lines_zst(file_name):
 # information back to the parent via a queue
 def process_file(file, queue, field, value, values, case_sensitive):
 	output_file = None
+	log.debug(f"Starting file: {file.input_path}")
 	try:
 		for line, file_bytes_processed in read_lines_zst(file.input_path):
 			try:
@@ -169,6 +174,7 @@ def process_file(file, queue, field, value, values, case_sensitive):
 
 		file.complete = True
 		file.bytes_processed = file.file_size
+		log.debug(f"Finished file: {file.input_path}")
 	except Exception as err:
 		file.error_message = str(err)
 	queue.put(file)
@@ -191,12 +197,16 @@ if __name__ == '__main__':
 	parser.add_argument("--debug", help="Enable debug logging", action='store_const', const=True, default=False)
 	args = parser.parse_args()
+	arg_string = f"{args.field}:{args.value}:{args.case_sensitive}"
 
 	if args.debug:
 		log.setLevel(logging.DEBUG)
 
 	log.info(f"Loading files from: {args.input}")
-	log.info(f"Writing output to: {args.output}.zst")
+	if args.output:
+		log.info(f"Writing output to: {args.output}")
+	else:
+		log.info(f"Writing output to working folder")
 
 	if not args.case_sensitive:
 		args.value = args.value.lower()
 
@@ -219,7 +229,11 @@ if __name__ == '__main__':
 	multiprocessing.set_start_method('spawn')
 	queue = multiprocessing.Manager().Queue()
 	status_json = os.path.join(args.working, "status.json")
-	input_files = load_file_list(status_json)
+	input_files, saved_arg_string = load_file_list(status_json)
+	if saved_arg_string and saved_arg_string != arg_string:
+		log.warning(f"Args don't match args from json file. Delete working folder")
+		sys.exit(0)
+
 	# if the file list wasn't loaded from the json, this is the first run, find what files we need to process
 	if input_files is None:
 		input_files = []
@@ -231,7 +245,7 @@ if __name__ == '__main__':
 				output_path = os.path.join(args.working, file_name[:-4])
 				input_files.append(FileConfig(input_path, output_path=output_path))
 
-		save_file_list(input_files, args.working, status_json)
+		save_file_list(input_files, args.working, status_json, arg_string)
 	else:
 		log.info(f"Existing input file was read, if this is not correct you should delete the {args.working} folder and run this script again")
 
@@ -289,7 +303,7 @@ if __name__ == '__main__':
 				files_errored += 1 if file.error_message is not None else 0
 				i += 1
 			if file_update.complete or file_update.error_message is not None:
-				save_file_list(input_files, args.working, status_json)
+				save_file_list(input_files, args.working, status_json, arg_string)
 			current_time = time.time()
 			progress_queue.put([current_time, total_lines_processed, total_bytes_processed])
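
For reference, a minimal standalone sketch of the status.json round trip this patch sets up. The names save_status, load_status, and the file paths below are hypothetical stand-ins, not the script's API: the real save_file_list/load_file_list additionally flatten and rebuild FileConfig objects and recalculate file sizes.

import json
import os

def save_status(status_json, simple_file_list, arg_string):
	# same shape save_file_list writes after this patch: the run's
	# arguments travel alongside the per-file progress entries
	with open(status_json, 'w') as status_json_file:
		output_dict = {
			"args": arg_string,
			"files": simple_file_list
		}
		status_json_file.write(json.dumps(output_dict, indent=4))

def load_status(status_json):
	# returns (files, args), or (None, None) on a first run,
	# mirroring load_file_list's new two-value return
	if not os.path.exists(status_json):
		return None, None
	with open(status_json, 'r') as status_json_file:
		output_dict = json.load(status_json_file)
	return output_dict["files"], output_dict["args"]

if __name__ == '__main__':
	status_json = "status.json"  # hypothetical path
	# arg_string uses the same f"{field}:{value}:{case_sensitive}" format as the script
	save_status(status_json, [["comments.zst", "comments", False, 0, 0]], "author:example:False")
	files, saved_arg_string = load_status(status_json)
	arg_string = "author:example:True"  # pretend the user changed --case_sensitive
	if saved_arg_string and saved_arg_string != arg_string:
		# same guard the patch adds before reusing a working folder
		print("Args don't match args from json file. Delete working folder")

With this guard in place, a stale working folder left over from a previous filter can no longer silently contribute its partially processed files to a run with different field/value/case_sensitive arguments; the script warns and exits so the user can delete the folder and start clean.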