From 82966bf7f6cab55c0a3f9582686c97aaeb925bad Mon Sep 17 00:00:00 2001
From: Watchful1
Date: Sat, 23 Dec 2023 17:36:08 -0800
Subject: [PATCH] Add partial matching support to multiprocess script

---
 scripts/combine_folder_multiprocess.py | 88 +++++++++++++++++++-------
 1 file changed, 65 insertions(+), 23 deletions(-)

diff --git a/scripts/combine_folder_multiprocess.py b/scripts/combine_folder_multiprocess.py
index 10d4513..2297fab 100644
--- a/scripts/combine_folder_multiprocess.py
+++ b/scripts/combine_folder_multiprocess.py
@@ -241,21 +241,37 @@ def load_file_list(status_json):
 # base of each separate process. Loads a file, iterates through lines and writes out
 # the ones where the `field` of the object matches `value`. Also passes status
 # information back to the parent via a queue
-def process_file(file, queue, field, value, values, split_intermediate):
+def process_file(file, queue, field, values, partial, regex, split_intermediate):
 	queue.put(file)
 	input_handle = FileHandle(file.input_path)
 	output_handle = FileHandle(file.output_path, is_split=split_intermediate)
+
+	value = None
+	if len(values) == 1:
+		value = min(values)
+
 	try:
 		for line, file_bytes_processed in input_handle.yield_lines():
 			try:
 				obj = json.loads(line)
 				matched = False
 				observed = obj[field].lower()
-				if value is not None:
-					if observed == value:
+				if regex:
+					for reg in values:
+						if reg.search(observed):
+							matched = True
+							break
+				elif partial:
+					for val in values:
+						if val in observed:
+							matched = True
+							break
+				else:
+					if value is not None:
+						if observed == value:
+							matched = True
+					elif observed in values:
 						matched = True
-				elif observed in values:
-					matched = True
 
 				if matched:
 					output_handle.write_line(line, observed)
@@ -291,6 +307,14 @@ if __name__ == '__main__':
 		"Percentage as an integer from 0 to 100 of the lines where the field can be missing. For the subreddit field especially, "
 		"there are a number of posts that simply don't have a subreddit attached", default=1, type=int)
 	parser.add_argument("--debug", help="Enable debug logging", action='store_const', const=True, default=False)
+	parser.add_argument(
+		"--partial", help="The values only have to be contained in the field, not match exactly. If this is set, "
+		"the output files are not split by value. WARNING: This can severely slow down the script, especially if searching the "
+		"body.", action='store_const', const=True, default=False)
+	parser.add_argument(
+		"--regex", help="The values are treated as regular expressions. If this is set, "
+		"the output files are not split by value. WARNING: This can severely slow down the script, especially if searching the "
+		"body. If set, ignores the --partial flag", action='store_const', const=True, default=False)
 	script_type = "split"
 
 	args = parser.parse_args()
@@ -305,29 +329,44 @@ if __name__ == '__main__':
 	else:
 		log.info(f"Writing output to working folder")
 
-	value = None
-	values = None
+	if (args.partial or args.regex) and args.split_intermediate:
+		log.info("The partial and regex flags are not compatible with the split_intermediate flag")
+		sys.exit(1)
+
+	values = set()
 
 	if args.value_list:
 		log.info(f"Reading {args.value_list} for values to compare")
 		with open(args.value_list, 'r') as value_list_handle:
-			values = set()
 			for line in value_list_handle:
-				values.add(line.strip().lower())
-		log.info(f"Comparing {args.field} against {len(values)} values")
+				values.add(line)
 	else:
-		value_strings = args.value.split(",")
-		if len(value_strings) > 1:
-			values = set()
-			for value_inner in value_strings:
-				values.add(value_inner.lower())
-			log.info(f"Checking field {args.field} for values {(', '.join(value_strings))}")
-		elif len(value_strings) == 1:
-			value = value_strings[0].lower()
-			log.info(f"Checking field {args.field} for value {value}")
+		values = set(args.value.split(","))
+
+	if args.regex:
+		regexes = []
+		for reg in values:
+			regexes.append(re.compile(reg))
+		values = regexes
+		if len(values) > 1:
+			log.info(f"Checking field {args.field} against {len(values)} regexes")
 		else:
-			log.info(f"Invalid value specified, aborting: {args.value}")
-			sys.exit()
+			log.info(f"Checking field {args.field} against regex {values[0]}")
+	else:
+		lower_values = set()
+		for value_inner in values:
+			lower_values.add(value_inner.strip().lower())
+		values = lower_values
+		if len(values) > 5:
+			val_string = f"any of {len(values)} values"
+		elif len(values) == 1:
+			val_string = f"the value {(','.join(values))}"
+		else:
+			val_string = f"any of the values {(','.join(values))}"
+		if args.partial:
+			log.info(f"Checking if any of {val_string} are contained in field {args.field}")
+		else:
+			log.info(f"Checking if any of {val_string} exactly match field {args.field}")
 
 	multiprocessing.set_start_method('spawn')
 	queue = multiprocessing.Manager().Queue()
@@ -386,7 +425,7 @@ if __name__ == '__main__':
 			log.info(f"Processing file: {file.input_path}")
 	# start the workers
 	with multiprocessing.Pool(processes=min(args.processes, len(files_to_process))) as pool:
-		workers = pool.starmap_async(process_file, [(file, queue, args.field, value, values, args.split_intermediate) for file in files_to_process], chunksize=1, error_callback=log.info)
+		workers = pool.starmap_async(process_file, [(file, queue, args.field, values, args.partial, args.regex, args.split_intermediate) for file in files_to_process], chunksize=1, error_callback=log.info)
 		while not workers.ready() or not queue.empty():
 			# loop until the workers are all done, pulling in status messages as they are sent
 			file_update = queue.get()
@@ -520,7 +559,10 @@ if __name__ == '__main__':
 			for line, file_bytes_processed in input_handle.yield_lines():
 				output_lines += 1
 				obj = json.loads(line)
-				observed_case = obj[args.field]
+				if args.partial or args.regex:
+					observed_case = "output"
+				else:
+					observed_case = obj[args.field]
 				observed = observed_case.lower()
 				if observed not in output_handles:
 					if args.output:
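
For reference, a sketch of how the new flags might be invoked once the patch is applied. Only --partial and --regex are added by this patch; the positional input folder and the example --output, --field, and --value arguments below are assumptions about the script's existing interface, shown here only for illustration:

	# substring match: keep comments whose body contains either value
	python scripts/combine_folder_multiprocess.py reddit/comments --output pushshift --field body --value mango,pineapple --partial

	# regex match: each comma-separated value is compiled with re.compile()
	python scripts/combine_folder_multiprocess.py reddit/comments --output pushshift --field title --value "gpt-?[34]" --regex

Both modes write everything to a single output rather than splitting files by value, and the exact-match path keeps its single-value fast path (value = min(values)), so the default mode is unaffected.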