From 1f7a3137f43fd0455579f2008471a39e5f065f70 Mon Sep 17 00:00:00 2001 From: Watchful1 Date: Mon, 6 Mar 2023 20:37:15 -0800 Subject: [PATCH] Update multiprocess to handle large numbers of output files --- scripts/combine_folder_multiprocess.py | 305 +++++++++++++++---------- 1 file changed, 189 insertions(+), 116 deletions(-) diff --git a/scripts/combine_folder_multiprocess.py b/scripts/combine_folder_multiprocess.py index 01d66ba..b6a2d17 100644 --- a/scripts/combine_folder_multiprocess.py +++ b/scripts/combine_folder_multiprocess.py @@ -24,9 +24,10 @@ import sys import time import argparse import re -from datetime import datetime +from collections import defaultdict import logging.handlers import multiprocessing +from enum import Enum # sets up logging to the console as well as a file @@ -34,9 +35,9 @@ log = logging.getLogger("bot") log.setLevel(logging.INFO) log_formatter = logging.Formatter('%(asctime)s - %(levelname)s: %(message)s') -log_stderr_handler = logging.StreamHandler() -log_stderr_handler.setFormatter(log_formatter) -log.addHandler(log_stderr_handler) +log_str_handler = logging.StreamHandler() +log_str_handler.setFormatter(log_formatter) +log.addHandler(log_str_handler) if not os.path.exists("logs"): os.makedirs("logs") log_file_handler = logging.handlers.RotatingFileHandler( @@ -45,6 +46,19 @@ log_file_handler.setFormatter(log_formatter) log.addHandler(log_file_handler) +class FileType(Enum): + COMMENT = 1 + SUBMISSION = 2 + + @staticmethod + def to_str(file_type): + if file_type == FileType.COMMENT: + return "comments" + elif file_type == FileType.SUBMISSION: + return "submissions" + return "other" + + # convenience object used to pass status information between processes class FileConfig: def __init__(self, input_path, output_path=None, complete=False, lines_processed=0, error_lines=0, lines_matched=0): @@ -57,6 +71,13 @@ class FileConfig: self.error_message = None self.error_lines = error_lines self.lines_matched = lines_matched + file_name = os.path.split(input_path)[1] + if file_name.startswith("RS"): + self.file_type = FileType.SUBMISSION + elif file_name.startswith("RC"): + self.file_type = FileType.COMMENT + else: + raise ValueError(f"Unknown working file type: {file_name}") def __str__(self): return f"{self.input_path} : {self.output_path} : {self.file_size} : {self.complete} : {self.bytes_processed} : {self.lines_processed}" @@ -64,18 +85,29 @@ class FileConfig: # another convenience object to read and write from both zst files and ndjson files class FileHandle: - def __init__(self, path): - self.path = path - if self.path.endswith(".zst"): - self.is_compressed = True - elif self.path.endswith(".ndjson"): - self.is_compressed = False - else: - raise TypeError(f"File type not supported for writing {self.path}") + newline_encoded = "\n".encode('utf-8') + ext_len = len(".zst") - self.write_handle = None - self.other_handle = None - self.newline_encoded = "\n".encode('utf-8') + def __init__(self, path, is_split=False): + self.path = path + self.is_split = is_split + self.handles = {} + + def get_paths(self, character_filter=None): + if self.is_split: + paths = [] + for file in os.listdir(self.path): + if not file.endswith(".zst"): + continue + if character_filter is not None and character_filter != file[-FileHandle.ext_len - 1:-FileHandle.ext_len]: + continue + paths.append(os.path.join(self.path, file)) + return paths + else: + return [self.path] + + def get_count_files(self): + return len(self.get_paths()) # recursively decompress and decode a chunk of bytes. If there's a decode error then read another chunk and try with that, up to a limit of max_window_size bytes @staticmethod @@ -93,9 +125,16 @@ class FileHandle: # open a zst compressed ndjson file, or a regular uncompressed ndjson file and yield lines one at a time # also passes back file progress - def yield_lines(self): - if self.is_compressed: - with open(self.path, 'rb') as file_handle: + def yield_lines(self, character_filter=None): + if self.is_split: + if character_filter is not None: + path = os.path.join(self.path, f"{character_filter}.zst") + else: + raise ValueError(f"{self.path} is split but no filter passed") + else: + path = self.path + if os.path.exists(path): + with open(path, 'rb') as file_handle: buffer = '' reader = zstandard.ZstdDecompressor(max_window_size=2**31).stream_reader(file_handle) while True: @@ -110,34 +149,38 @@ class FileHandle: buffer = lines[-1] reader.close() - else: - with open(self.path, 'r') as file_handle: - line = file_handle.readline() - while line: - yield line.rstrip('\n'), file_handle.tell() - line = file_handle.readline() + # get either the main write handle or the character filter one, opening a new handle as needed + def get_write_handle(self, character_filter=None): + if character_filter is None: + character_filter = 1 # use 1 as the default name since ints hash quickly + handle = self.handles.get(character_filter) + if handle is None: + if character_filter == 1: + path = self.path + else: + if not os.path.exists(self.path): + os.makedirs(self.path) + path = os.path.join(self.path, f"{character_filter}.zst") + handle = zstandard.ZstdCompressor().stream_writer(open(path, 'wb')) + self.handles[character_filter] = handle + return handle # write a line, opening the appropriate handle - def write_line(self, line): - if self.write_handle is None: - if self.is_compressed: - self.other_handle = open(self.path, 'wb') - self.write_handle = zstandard.ZstdCompressor().stream_writer(self.other_handle) - else: - self.write_handle = open(self.path, 'w', encoding="utf-8") - - if self.is_compressed: - self.write_handle.write(line.encode('utf-8')) - self.write_handle.write(self.newline_encoded) + def write_line(self, line, value=None): + if self.is_split: + if value is None: + raise ValueError(f"{self.path} is split but no value passed") + character_filter = value[:1] + handle = self.get_write_handle(character_filter) else: - self.write_handle.write(line) - self.write_handle.write("\n") + handle = self.get_write_handle() + + handle.write(line.encode('utf-8')) + handle.write(FileHandle.newline_encoded) def close(self): - if self.write_handle: - self.write_handle.close() - if self.other_handle: - self.other_handle.close() + for handle in self.handles.values(): + handle.close() # used for calculating running average of read speed @@ -157,16 +200,21 @@ class Queue: # save file information and progress to a json file # we don't want to save the whole FileConfig object, since some info resets if we restart -def save_file_list(input_files, working_folder, status_json, arg_string, script_type): +def save_file_list(input_files, working_folder, status_json, arg_string, script_type, completed_prefixes=None): if not os.path.exists(working_folder): os.makedirs(working_folder) simple_file_list = [] for file in input_files: simple_file_list.append([file.input_path, file.output_path, file.complete, file.lines_processed, file.error_lines, file.lines_matched]) + if completed_prefixes is None: + completed_prefixes = [] + else: + completed_prefixes = sorted([prefix for prefix in completed_prefixes]) with open(status_json, 'w') as status_json_file: output_dict = { "args": arg_string, "type": script_type, + "completed_prefixes": completed_prefixes, "files": simple_file_list, } status_json_file.write(json.dumps(output_dict, indent=4)) @@ -182,18 +230,21 @@ def load_file_list(status_json): input_files.append( FileConfig(simple_file[0], simple_file[1], simple_file[2], simple_file[3], simple_file[4], simple_file[5]) ) - return input_files, output_dict["args"], output_dict["type"] + completed_prefixes = set() + for prefix in output_dict["completed_prefixes"]: + completed_prefixes.add(prefix) + return input_files, output_dict["args"], output_dict["type"], completed_prefixes else: - return None, None, None + return None, None, None, set() # base of each separate process. Loads a file, iterates through lines and writes out # the ones where the `field` of the object matches `value`. Also passes status # information back to the parent via a queue -def process_file(file, queue, field, value, values): +def process_file(file, queue, field, value, values, split_intermediate): queue.put(file) input_handle = FileHandle(file.input_path) - output_handle = FileHandle(file.output_path) + output_handle = FileHandle(file.output_path, is_split=split_intermediate) try: for line, file_bytes_processed in input_handle.yield_lines(): try: @@ -207,7 +258,7 @@ def process_file(file, queue, field, value, values): matched = True if matched: - output_handle.write_line(line) + output_handle.write_line(line, observed) file.lines_matched += 1 except (KeyError, json.JSONDecodeError) as err: file.error_lines += 1 @@ -227,15 +278,14 @@ def process_file(file, queue, field, value, values): if __name__ == '__main__': parser = argparse.ArgumentParser(description="Use multiple processes to decompress and iterate over pushshift dump files") parser.add_argument("input", help="The input folder to recursively read files from") - parser.add_argument("--split", help="Split the output into separate files by the filter fields, only applies if there's multiple fields", action='store_const', const=True, default=True) parser.add_argument("--output", help="Put the output files in this folder", default="") parser.add_argument("--working", help="The folder to store temporary files in", default="pushshift_working") parser.add_argument("--field", help="When deciding what lines to keep, use this field for comparisons", default="subreddit") parser.add_argument("--value", help="When deciding what lines to keep, compare the field to this value. Supports a comma separated list. This is case sensitive", default="pushshift") parser.add_argument("--value_list", help="A file of newline separated values to use. Overrides the value param if it is set", default=None) parser.add_argument("--processes", help="Number of processes to use", default=10, type=int) - parser.add_argument("--file_filter", help="Regex filenames have to match to be processed", default="^rc_|rs_") - parser.add_argument("--compress_intermediate", help="Compress the intermediate files, use if the filter will result in a very large amount of data", action="store_true") + parser.add_argument("--file_filter", help="Regex filenames have to match to be processed", default="^RC_|RS_") + parser.add_argument("--split_intermediate", help="Split the intermediate files by the first letter of the matched field, use if the filter will result in a large number of separate files", action="store_true") parser.add_argument( "--error_rate", help= "Percentage as an integer from 0 to 100 of the lines where the field can be missing. For the subreddit field especially, " @@ -282,7 +332,7 @@ if __name__ == '__main__': multiprocessing.set_start_method('spawn') queue = multiprocessing.Manager().Queue() status_json = os.path.join(args.working, "status.json") - input_files, saved_arg_string, saved_type = load_file_list(status_json) + input_files, saved_arg_string, saved_type, completed_prefixes = load_file_list(status_json) if saved_arg_string and saved_arg_string != arg_string: log.warning(f"Args don't match args from json file. Delete working folder") sys.exit(0) @@ -297,20 +347,20 @@ if __name__ == '__main__': for subdir, dirs, files in os.walk(args.input): files.sort() for file_name in files: - if file_name.endswith(".zst") and re.search(args.file_filter, file_name, re.IGNORECASE) is not None: + if file_name.endswith(".zst") and re.search(args.file_filter, file_name) is not None: input_path = os.path.join(subdir, file_name) - output_path = os.path.join(args.working, f"{file_name[:-4]}.{('zst' if args.compress_intermediate else 'ndjson')}") + if args.split_intermediate: + output_extension = "" + else: + output_extension = ".zst" + output_path = os.path.join(args.working, f"{file_name[:-4]}{output_extension}") input_files.append(FileConfig(input_path, output_path=output_path)) save_file_list(input_files, args.working, status_json, arg_string, script_type) else: log.info(f"Existing input file was read, if this is not correct you should delete the {args.working} folder and run this script again") - files_processed = 0 - total_bytes = 0 - total_bytes_processed = 0 - total_lines_processed = 0 - total_lines_errored = 0 + files_processed, total_bytes, total_bytes_processed, total_lines_processed, total_lines_matched, total_lines_errored = 0, 0, 0, 0, 0, 0 files_to_process = [] # calculate the total file size for progress reports, build a list of incomplete files to process # do this largest to smallest by file size so that we aren't processing a few really big files with only a few threads at the end @@ -319,6 +369,7 @@ if __name__ == '__main__': if file.complete: files_processed += 1 total_lines_processed += file.lines_processed + total_lines_matched += file.lines_matched total_bytes_processed += file.file_size total_lines_errored += file.error_lines else: @@ -335,7 +386,7 @@ if __name__ == '__main__': log.info(f"Processing file: {file.input_path}") # start the workers with multiprocessing.Pool(processes=min(args.processes, len(files_to_process))) as pool: - workers = pool.starmap_async(process_file, [(file, queue, args.field, value, values) for file in files_to_process], chunksize=1, error_callback=log.info) + workers = pool.starmap_async(process_file, [(file, queue, args.field, value, values, args.split_intermediate) for file in files_to_process], chunksize=1, error_callback=log.info) while not workers.ready() or not queue.empty(): # loop until the workers are all done, pulling in status messages as they are sent file_update = queue.get() @@ -349,13 +400,7 @@ if __name__ == '__main__': # I'm going to assume that the list of files is short enough that it's no # big deal to just iterate each time since that saves a bunch of work - total_lines_processed = 0 - total_lines_matched = 0 - total_bytes_processed = 0 - total_lines_errored = 0 - files_processed = 0 - files_errored = 0 - i = 0 + total_lines_processed, total_lines_matched, total_bytes_processed, total_lines_errored, files_processed, files_errored, i = 0, 0, 0, 0, 0, 0, 0 for file in input_files: if file.input_path == file_update.input_path: input_files[i] = file_update @@ -389,8 +434,10 @@ if __name__ == '__main__': log.info(f"{total_lines_processed:,}, {total_lines_errored} errored : {(total_bytes_processed / (2**30)):.2f} gb, {(total_bytes_processed / total_bytes) * 100:.0f}% : {files_processed}/{len(input_files)}") - working_file_paths = [] + type_handles = defaultdict(list) + prefixes = set() count_incomplete = 0 + count_intermediate_files = 0 # build a list of output files to combine for file in sorted(input_files, key=lambda item: os.path.split(item.output_path)[1]): if not file.complete: @@ -405,68 +452,94 @@ if __name__ == '__main__': f"File {file.input_path} has {file.error_lines:,} errored lines out of {file.lines_processed:,}, " f"{(file.error_lines / file.lines_processed) * (args.error_rate * 0.01):.2f}% which is above the limit of {args.error_rate}%") count_incomplete += 1 - elif file.output_path is not None: - if os.path.exists(file.output_path): - working_file_paths.append(file.output_path) + elif file.output_path is not None and os.path.exists(file.output_path): + input_handle = FileHandle(file.output_path, is_split=args.split_intermediate) + for path in input_handle.get_paths(): + prefixes.add(path[-FileHandle.ext_len - 1:-FileHandle.ext_len]) + count_intermediate_files += 1 + type_handles[file.file_type].append(input_handle) if count_incomplete > 0: log.info(f"{count_incomplete} files were not completed, errored or don't exist, something went wrong. Aborting") sys.exit() - log.info(f"Processing complete, combining {len(working_file_paths)} result files") + log.info(f"Processing complete, combining {count_intermediate_files} result files") + + for completed_prefix in completed_prefixes: + if completed_prefix in prefixes: + prefixes.remove(completed_prefix) output_lines = 0 - all_handles = [] output_handles = {} files_combined = 0 - if args.split and values: + if values: split = True else: split = False - for working_file_path in working_file_paths: - files_combined += 1 - log.info(f"From {files_combined}/{len(working_file_paths)} files to {len(all_handles):,} output handles : {output_lines:,} lines : {os.path.split(working_file_path)[1]}") - working_file_name = os.path.split(working_file_path)[1] - if working_file_name.startswith("RS"): - file_type = "submissions" - elif working_file_name.startswith("RC"): - file_type = "comments" - else: - log.warning(f"Unknown working file type, skipping: {working_file_name}") - continue - input_handle = FileHandle(working_file_path) - if file_type not in output_handles: - output_handles[file_type] = {} - file_type_handles = output_handles[file_type] + if args.split_intermediate: + for prefix in sorted(prefixes): + log.info(f"From {files_combined}/{count_intermediate_files} files to {len(output_handles):,} output handles : {output_lines:,}/{total_lines_matched:,} lines") + for file_type, input_handles in type_handles.items(): + for input_handle in input_handles: + has_lines = False + for line, file_bytes_processed in input_handle.yield_lines(character_filter=prefix): + if not has_lines: + has_lines = True + files_combined += 1 + output_lines += 1 + obj = json.loads(line) + observed_case = obj[args.field] + observed = observed_case.lower() + if observed not in output_handles: + if args.output: + if not os.path.exists(args.output): + os.makedirs(args.output) + output_file_path = os.path.join(args.output, f"{observed_case}_{FileType.to_str(file_type)}.zst") + else: + output_file_path = f"{observed_case}_{FileType.to_str(file_type)}.zst" + log.debug(f"Writing to file {output_file_path}") + output_handle = FileHandle(output_file_path) + output_handles[observed] = output_handle + else: + output_handle = output_handles[observed] - for line, file_bytes_processed in input_handle.yield_lines(): - output_lines += 1 - if split: - obj = json.loads(line) - observed_case = obj[args.field] - else: - observed_case = value - observed = observed_case.lower() - if observed not in file_type_handles: - if args.output: - if not os.path.exists(args.output): - os.makedirs(args.output) - output_file_path = os.path.join(args.output, f"{observed_case}_{file_type}.zst") - else: - output_file_path = f"{observed_case}_{file_type}.zst" - log.debug(f"Writing to file {output_file_path}") - output_handle = FileHandle(output_file_path) - file_type_handles[observed] = output_handle - all_handles.append(output_handle) - else: - output_handle = file_type_handles[observed] + output_handle.write_line(line) + if output_lines % 1000000 == 0: + log.info(f"From {files_combined}/{count_intermediate_files} files to {len(output_handles):,} output handles : {output_lines:,}/{total_lines_matched:,} lines : {input_handle.path} / {prefix}") + for handle in output_handles.values(): + handle.close() + output_handles = {} + completed_prefixes.add(prefix) + save_file_list(input_files, args.working, status_json, arg_string, script_type, completed_prefixes) - output_handle.write_line(line) - if output_lines % 100000 == 0: - log.info(f"From {files_combined}/{len(working_file_paths)} files to {len(all_handles):,} output handles : {output_lines:,} lines : {os.path.split(working_file_path)[1]}") + else: + log.info(f"From {files_combined}/{count_intermediate_files} files to {len(output_handles):,} output handles : {output_lines:,}/{total_lines_matched:,} lines") + for file_type, input_handles in type_handles.items(): + for input_handle in input_handles: + files_combined += 1 + for line, file_bytes_processed in input_handle.yield_lines(): + output_lines += 1 + obj = json.loads(line) + observed_case = obj[args.field] + observed = observed_case.lower() + if observed not in output_handles: + if args.output: + if not os.path.exists(args.output): + os.makedirs(args.output) + output_file_path = os.path.join(args.output, f"{observed_case}_{FileType.to_str(file_type)}.zst") + else: + output_file_path = f"{observed_case}_{FileType.to_str(file_type)}.zst" + log.debug(f"Writing to file {output_file_path}") + output_handle = FileHandle(output_file_path) + output_handles[observed] = output_handle + else: + output_handle = output_handles[observed] - log.info(f"From {files_combined}/{len(working_file_paths)} files to {len(all_handles):,} output handles : {output_lines:,} lines") - for handle in all_handles: - handle.close() + output_handle.write_line(line) + if output_lines % 1000000 == 0: + log.info(f"From {files_combined}/{count_intermediate_files} files to {len(output_handles):,} output handles : {output_lines:,}/{total_lines_matched:,} lines : {input_handle.path}") + for handle in output_handles.values(): + handle.close() + output_handles = {} - log.info(f"Finished combining files, {output_lines:,} lines written") + log.info(f"From {files_combined}/{count_intermediate_files} files to {len(output_handles):,} output handles : {output_lines:,}/{total_lines_matched:,} lines")