diff --git a/scripts/combine_folder_multiprocess.py b/scripts/combine_folder_multiprocess.py
index e68bfd1..e4e49c2 100644
--- a/scripts/combine_folder_multiprocess.py
+++ b/scripts/combine_folder_multiprocess.py
@@ -146,7 +146,7 @@ def read_lines_zst(file_name):
 # information back to the parent via a queue
 def process_file(file, queue, field, value, values, case_sensitive):
 	output_file = None
-	log.debug(f"Starting file: {file.input_path} : {file.file_size:,}")
+	queue.put(file)
 	try:
 		for line, file_bytes_processed in read_lines_zst(file.input_path):
 			try:
@@ -176,7 +176,6 @@ def process_file(file, queue, field, value, values, case_sensitive):
 		file.complete = True
 		file.bytes_processed = file.file_size
-		log.debug(f"Finished file: {file.input_path} : {file.file_size:,}")
 	except Exception as err:
 		file.error_message = str(err)
 	queue.put(file)
 
@@ -190,6 +189,7 @@ if __name__ == '__main__':
 	parser.add_argument("--working", help="The folder to store temporary files in", default="pushshift_working")
 	parser.add_argument("--field", help="When deciding what lines to keep, use this field for comparisons", default="subreddit")
 	parser.add_argument("--value", help="When deciding what lines to keep, compare the field to this value. Supports a comma separated list. This is case sensitive", default="pushshift")
+	parser.add_argument("--value_list", help="A file of newline separated values to use. Overrides the value param if it is set", default=None)
 	parser.add_argument("--processes", help="Number of processes to use", default=10, type=int)
 	parser.add_argument("--case-sensitive", help="Matching should be case sensitive", action="store_true")
 	parser.add_argument("--file_filter", help="Regex filenames have to match to be processed", default="^rc_|rs_")
@@ -215,20 +215,29 @@ if __name__ == '__main__':
 	if not args.case_sensitive:
 		args.value = args.value.lower()
 
-	value_strings = args.value.split(",")
 	value = None
 	values = None
-	if len(value_strings) > 1:
-		values = set()
-		for value_inner in value_strings:
-			values.add(value_inner)
-		log.info(f"Checking field {args.field} for values {(', '.join(value_strings))}")
-	elif len(value_strings) == 1:
-		value = value_strings[0]
-		log.info(f"Checking field {args.field} for value {value}")
+	if args.value_list:
+		log.info(f"Reading {args.value_list} for values to compare")
+		with open(args.value_list, 'r') as value_list_handle:
+			values = set()
+			for line in value_list_handle:
+				values.add(line.strip())
+		log.info(f"Comparing {args.field} against {len(values)} values")
+
 	else:
-		log.info(f"Invalid value specified, aborting: {args.value}")
-		sys.exit()
+		value_strings = args.value.split(",")
+		if len(value_strings) > 1:
+			values = set()
+			for value_inner in value_strings:
+				values.add(value_inner)
+			log.info(f"Checking field {args.field} for values {(', '.join(value_strings))}")
+		elif len(value_strings) == 1:
+			value = value_strings[0]
+			log.info(f"Checking field {args.field} for value {value}")
+		else:
+			log.info(f"Invalid value specified, aborting: {args.value}")
+			sys.exit()
 
 	multiprocessing.set_start_method('spawn')
 	queue = multiprocessing.Manager().Queue()
@@ -292,6 +301,12 @@ if __name__ == '__main__':
 			file_update = queue.get()
 			if file_update.error_message is not None:
 				log.warning(f"File failed {file_update.input_path}: {file_update.error_message}")
+
+			# this is the workers telling us they are starting a new file, print the debug message but nothing else
+			if file_update.lines_processed == 0:
+				log.debug(f"Starting file: {file_update.input_path} : {file_update.file_size:,}")
+				continue
+
 			# I'm going to assume that the list of files is short enough that it's no
 			# big deal to just iterate each time since that saves a bunch of work
 			total_lines_processed = 0
@@ -312,6 +327,7 @@ if __name__ == '__main__':
 				i += 1
 			if file_update.complete or file_update.error_message is not None:
 				save_file_list(input_files, args.working, status_json, arg_string, script_type)
+				log.debug(f"Finished file: {file_update.input_path} : {file_update.file_size:,}")
 
 		current_time = time.time()
 		progress_queue.put([current_time, total_lines_processed, total_bytes_processed])