Add partial matching support to multiprocess script

Watchful1 2023-12-23 17:36:08 -08:00
parent 71c413d097
commit 82966bf7f6

@@ -241,21 +241,37 @@ def load_file_list(status_json):
 # base of each separate process. Loads a file, iterates through lines and writes out
 # the ones where the `field` of the object matches `value`. Also passes status
 # information back to the parent via a queue
-def process_file(file, queue, field, value, values, split_intermediate):
+def process_file(file, queue, field, values, partial, regex, split_intermediate):
 	queue.put(file)
 	input_handle = FileHandle(file.input_path)
 	output_handle = FileHandle(file.output_path, is_split=split_intermediate)
+	value = None
+	if len(values) == 1:
+		value = min(values)
 	try:
 		for line, file_bytes_processed in input_handle.yield_lines():
 			try:
 				obj = json.loads(line)
 				matched = False
 				observed = obj[field].lower()
-				if value is not None:
-					if observed == value:
+				if regex:
+					for reg in values:
+						if reg.search(observed):
+							matched = True
+							break
+				elif partial:
+					for val in values:
+						if val in observed:
+							matched = True
+							break
+				else:
+					if value is not None:
+						if observed == value:
+							matched = True
+					elif observed in values:
 						matched = True
-				elif observed in values:
-					matched = True
 				if matched:
 					output_handle.write_line(line, observed)
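
The matching logic above now has three modes: regex search, substring containment, and exact comparison (with a single-value fast path). Below is a minimal standalone sketch of the same branch order, runnable on its own; the matches helper name and the sample values are illustrative, not part of the commit.

import re

def matches(observed, values, partial=False, regex=False, value=None):
	if regex:
		# regex mode: values are pre-compiled patterns; any hit counts
		return any(reg.search(observed) for reg in values)
	if partial:
		# partial mode: any value contained as a substring counts
		return any(val in observed for val in values)
	if value is not None:
		# exact mode, single-value fast path
		return observed == value
	# exact mode, set membership
	return observed in values

print(matches("askreddit", {"askreddit", "pics"}))              # True: exact set hit
print(matches("r/askreddit", {"askreddit"}, partial=True))      # True: substring
print(matches("askreddit", [re.compile(r"^ask")], regex=True))  # True: regex search
print(matches("pics", {"askreddit"}, value="askreddit"))        # False: single-value path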
@@ -291,6 +307,14 @@ if __name__ == '__main__':
 		"Percentage as an integer from 0 to 100 of the lines where the field can be missing. For the subreddit field especially, "
 		"there are a number of posts that simply don't have a subreddit attached", default=1, type=int)
 	parser.add_argument("--debug", help="Enable debug logging", action='store_const', const=True, default=False)
+	parser.add_argument(
+		"--partial", help="The values only have to be contained in the field, not match exactly. If this is set, "
+		"the output files are not split by value. WARNING: This can severely slow down the script, especially if searching the "
+		"body.", action='store_const', const=True, default=False)
+	parser.add_argument(
+		"--regex", help="The values are treated as regular expressions. If this is set, "
+		"the output files are not split by value. WARNING: This can severely slow down the script, especially if searching the "
+		"body. If set, ignores the --partial flag", action='store_const', const=True, default=False)
 	script_type = "split"
 
 	args = parser.parse_args()
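
Hypothetical invocations with the new flags. The script filename and the positional input path are assumptions for illustration; --field, --value and --output appear elsewhere in this script, and --partial/--regex are what this commit adds.

# script name and input path are assumptions; the flags come from this script
python combine_folder_multiprocess.py reddit/comments --output filtered --field body --value "chatgpt,gpt-4" --partial
python combine_folder_multiprocess.py reddit/comments --output filtered --field title --value "^ask" --regex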
@@ -305,29 +329,44 @@ if __name__ == '__main__':
 	else:
 		log.info(f"Writing output to working folder")
 
-	value = None
-	values = None
+	if (args.partial or args.regex) and args.split_intermediate:
+		log.info("The partial and regex flags are not compatible with the split_intermediate flag")
+		sys.exit(1)
+
+	values = set()
 	if args.value_list:
 		log.info(f"Reading {args.value_list} for values to compare")
 		with open(args.value_list, 'r') as value_list_handle:
-			values = set()
 			for line in value_list_handle:
-				values.add(line.strip().lower())
-		log.info(f"Comparing {args.field} against {len(values)} values")
+				values.add(line)
 	else:
-		value_strings = args.value.split(",")
-		if len(value_strings) > 1:
-			values = set()
-			for value_inner in value_strings:
-				values.add(value_inner.lower())
-			log.info(f"Checking field {args.field} for values {(', '.join(value_strings))}")
-		elif len(value_strings) == 1:
-			value = value_strings[0].lower()
-			log.info(f"Checking field {args.field} for value {value}")
+		values = set(args.value.split(","))
+	if args.regex:
+		regexes = []
+		for reg in values:
+			regexes.append(re.compile(reg))
+		values = regexes
+		if len(values) > 1:
+			log.info(f"Checking field {args.field} against {len(values)} regexes")
 		else:
-			log.info(f"Invalid value specified, aborting: {args.value}")
-			sys.exit()
+			log.info(f"Checking field {args.field} against regex {values[0]}")
+	else:
+		lower_values = set()
+		for value_inner in values:
+			lower_values.add(value_inner.strip().lower())
+		values = lower_values
+		if len(values) > 5:
+			val_string = f"any of {len(values)} values"
+		elif len(values) == 1:
+			val_string = f"the value {(','.join(values))}"
+		else:
+			val_string = f"any of the values {(','.join(values))}"
+		if args.partial:
+			log.info(f"Checking if any of {val_string} are contained in field {args.field}")
+		else:
+			log.info(f"Checking if any of {val_string} exactly match field {args.field}")
 
 	multiprocessing.set_start_method('spawn')
 	queue = multiprocessing.Manager().Queue()
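
Value preprocessing now happens once, up front: in regex mode every entry is compiled to a pattern, otherwise entries are stripped and lowercased for exact/partial comparison. A condensed sketch of that step; the prepare_values wrapper is illustrative, not from the commit.

import re

def prepare_values(raw_values, use_regex):
	if use_regex:
		# compile each pattern once up front so the workers only call .search()
		return [re.compile(reg) for reg in raw_values]
	# otherwise normalize to stripped, lowercased strings for exact/partial matching
	return {val.strip().lower() for val in raw_values}

print(prepare_values(["AskReddit ", "Pics"], use_regex=False))            # {'askreddit', 'pics'}
print(prepare_values([r"^ask"], use_regex=True)[0].search("askreddit"))   # a match object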
@@ -386,7 +425,7 @@ if __name__ == '__main__':
 		log.info(f"Processing file: {file.input_path}")
 	# start the workers
 	with multiprocessing.Pool(processes=min(args.processes, len(files_to_process))) as pool:
-		workers = pool.starmap_async(process_file, [(file, queue, args.field, value, values, args.split_intermediate) for file in files_to_process], chunksize=1, error_callback=log.info)
+		workers = pool.starmap_async(process_file, [(file, queue, args.field, values, args.partial, args.regex, args.split_intermediate) for file in files_to_process], chunksize=1, error_callback=log.info)
 		while not workers.ready() or not queue.empty():
 			# loop until the workers are all done, pulling in status messages as they are sent
 			file_update = queue.get()
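
Only the worker argument tuple changes here; the surrounding pattern is a Pool driving starmap_async while the parent drains a Manager queue for status messages. A self-contained sketch of that pattern under illustrative names; the timeout guard is an addition to keep the final get() from blocking, not something this script does. A Manager queue is used because a plain multiprocessing.Queue cannot be passed to pool workers as a task argument.

import multiprocessing
from queue import Empty

def worker(name, status_queue):
	# report which item this process picked up, mirroring process_file's queue.put
	status_queue.put(name)
	return name.upper()

if __name__ == '__main__':
	multiprocessing.set_start_method('spawn')
	status_queue = multiprocessing.Manager().Queue()
	items = ["a", "b", "c"]
	with multiprocessing.Pool(processes=2) as pool:
		workers = pool.starmap_async(worker, [(item, status_queue) for item in items], chunksize=1)
		# drain status messages until all workers are done and the queue is empty
		while not workers.ready() or not status_queue.empty():
			try:
				print("started:", status_queue.get(timeout=1))
			except Empty:
				pass
		print("results:", workers.get())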
@@ -520,7 +559,10 @@ if __name__ == '__main__':
 		for line, file_bytes_processed in input_handle.yield_lines():
 			output_lines += 1
 			obj = json.loads(line)
-			observed_case = obj[args.field]
+			if args.partial or args.regex:
+				observed_case = "output"
+			else:
+				observed_case = obj[args.field]
 			observed = observed_case.lower()
 			if observed not in output_handles:
 				if args.output:
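
In this final merge stage, output handles are keyed by the observed field value. With --partial or --regex a matching line has no single canonical value, so everything is routed under one "output" key instead of per-value files. A small sketch of that routing idea; the route helper and the file naming are illustrative.

output_handles = {}

def route(obj, field, partial_or_regex):
	# with --partial/--regex a line can match for many different reasons,
	# so everything funnels into a single "output" bucket instead of per-value files
	observed = "output" if partial_or_regex else obj[field].lower()
	if observed not in output_handles:
		output_handles[observed] = open(f"{observed}.ndjson", "a")
	return output_handles[observed]

handle = route({"subreddit": "AskReddit"}, "subreddit", partial_or_regex=False)
handle.write('{"subreddit": "AskReddit"}\n')
handle.close()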