Add value_list argument to take a large list of values to filter on

commit 2358bf555b
parent 3415c7880e
Author: Watchful1
Date:   2023-01-24 20:51:10 -08:00


@@ -146,7 +146,7 @@ def read_lines_zst(file_name):
 # information back to the parent via a queue
 def process_file(file, queue, field, value, values, case_sensitive):
 	output_file = None
-	log.debug(f"Starting file: {file.input_path} : {file.file_size:,}")
+	queue.put(file)
 	try:
 		for line, file_bytes_processed in read_lines_zst(file.input_path):
 			try:
@@ -176,7 +176,6 @@ def process_file(file, queue, field, value, values, case_sensitive):
 		file.complete = True
 		file.bytes_processed = file.file_size
-		log.debug(f"Finished file: {file.input_path} : {file.file_size:,}")
 	except Exception as err:
 		file.error_message = str(err)
 	queue.put(file)
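Together, these first two hunks move lifecycle logging out of the worker: the worker now puts its file object on the queue once when it starts and once when it finishes or fails, and the parent decides what to log. A minimal runnable sketch of that handshake, with FileStatus and the inline "work" standing in for the script's real types (assumed names, not from the commit):

import multiprocessing

class FileStatus:
	def __init__(self, input_path, file_size):
		self.input_path = input_path
		self.file_size = file_size
		self.lines_processed = 0
		self.complete = False
		self.error_message = None

def process_file(file, queue):
	queue.put(file)  # immediate put: tells the parent this file has started
	try:
		file.lines_processed = 42  # stand-in for the real line-by-line work
		file.complete = True
	except Exception as err:
		file.error_message = str(err)
	queue.put(file)  # final put: complete, or carrying an error message

if __name__ == '__main__':
	multiprocessing.set_start_method('spawn')
	queue = multiprocessing.Manager().Queue()
	file = FileStatus("example.zst", 1024)
	worker = multiprocessing.Process(target=process_file, args=(file, queue))
	worker.start()
	started = queue.get()  # lines_processed == 0: the start notification
	print(f"Starting file: {started.input_path} : {started.file_size:,}")
	finished = queue.get()  # the final status
	print(f"Finished file: {finished.input_path}, complete={finished.complete}")
	worker.join()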
@@ -190,6 +189,7 @@ if __name__ == '__main__':
 	parser.add_argument("--working", help="The folder to store temporary files in", default="pushshift_working")
 	parser.add_argument("--field", help="When deciding what lines to keep, use this field for comparisons", default="subreddit")
 	parser.add_argument("--value", help="When deciding what lines to keep, compare the field to this value. Supports a comma separated list. This is case sensitive", default="pushshift")
+	parser.add_argument("--value_list", help="A file of newline separated values to use. Overrides the value param if it is set", default=None)
 	parser.add_argument("--processes", help="Number of processes to use", default=10, type=int)
 	parser.add_argument("--case-sensitive", help="Matching should be case sensitive", action="store_true")
 	parser.add_argument("--file_filter", help="Regex filenames have to match to be processed", default="^rc_|rs_")
@@ -215,20 +215,29 @@ if __name__ == '__main__':
 	if not args.case_sensitive:
 		args.value = args.value.lower()
-	value_strings = args.value.split(",")
 	value = None
 	values = None
-	if len(value_strings) > 1:
-		values = set()
-		for value_inner in value_strings:
-			values.add(value_inner)
-		log.info(f"Checking field {args.field} for values {(', '.join(value_strings))}")
-	elif len(value_strings) == 1:
-		value = value_strings[0]
-		log.info(f"Checking field {args.field} for value {value}")
+	if args.value_list:
+		log.info(f"Reading {args.value_list} for values to compare")
+		with open(args.value_list, 'r') as value_list_handle:
+			values = set()
+			for line in value_list_handle:
+				values.add(line.strip())
+		log.info(f"Comparing {args.field} against {len(values)} values")
 	else:
-		log.info(f"Invalid value specified, aborting: {args.value}")
-		sys.exit()
+		value_strings = args.value.split(",")
+		if len(value_strings) > 1:
+			values = set()
+			for value_inner in value_strings:
+				values.add(value_inner)
+			log.info(f"Checking field {args.field} for values {(', '.join(value_strings))}")
+		elif len(value_strings) == 1:
+			value = value_strings[0]
+			log.info(f"Checking field {args.field} for value {value}")
+		else:
+			log.info(f"Invalid value specified, aborting: {args.value}")
+			sys.exit()
 	multiprocessing.set_start_method('spawn')
 	queue = multiprocessing.Manager().Queue()
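The diff never shows how process_file consumes value versus values when it filters a line; a plausible sketch of that comparison, assuming each line is a JSON object (a hypothetical helper, not code from this commit). Note that the commit lower-cases only --value in the case-insensitive path, so a list file is assumed to already be lower case:

import json

def line_matches(line, field, value, values, case_sensitive):
	# value: single string from --value; values: set built from a
	# comma separated --value or a --value_list file
	observed = str(json.loads(line).get(field, ""))
	if not case_sensitive:
		observed = observed.lower()
	if values is not None:
		return observed in values  # set membership stays fast for large lists
	return observed == value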
@@ -292,6 +301,12 @@ if __name__ == '__main__':
 		file_update = queue.get()
 		if file_update.error_message is not None:
 			log.warning(f"File failed {file_update.input_path}: {file_update.error_message}")
+		# this is the workers telling us they are starting a new file, print the debug message but nothing else
+		if file_update.lines_processed == 0:
+			log.debug(f"Starting file: {file_update.input_path} : {file_update.file_size:,}")
+			continue
 		# I'm going to assume that the list of files is short enough that it's no
 		# big deal to just iterate each time since that saves a bunch of work
 		total_lines_processed = 0
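On the parent side, a status object whose lines_processed is still 0 is now read as a "file started" announcement rather than real progress. A minimal sketch of that dispatch (assumed names, mirroring the hunk above):

import logging

log = logging.getLogger("sketch")

def handle_update(file_update):
	# failures can be reported at any point in a file's lifecycle
	if file_update.error_message is not None:
		log.warning(f"File failed {file_update.input_path}: {file_update.error_message}")
	# lines_processed == 0 means a worker just picked up the file
	if file_update.lines_processed == 0:
		log.debug(f"Starting file: {file_update.input_path} : {file_update.file_size:,}")
		return
	# ... the real script's progress accounting continues here ...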
@@ -312,6 +327,7 @@ if __name__ == '__main__':
 				i += 1
 			if file_update.complete or file_update.error_message is not None:
 				save_file_list(input_files, args.working, status_json, arg_string, script_type)
+				log.debug(f"Finished file: {file_update.input_path} : {file_update.file_size:,}")
 			current_time = time.time()
 			progress_queue.put([current_time, total_lines_processed, total_bytes_processed])