Update multiprocess to handle large numbers of output files

Watchful1 2023-03-06 20:37:15 -08:00
parent 8dcc65abf7
commit 1f7a3137f4


@@ -24,9 +24,10 @@ import sys
 import time
 import argparse
 import re
-from datetime import datetime
+from collections import defaultdict
 import logging.handlers
 import multiprocessing
+from enum import Enum


 # sets up logging to the console as well as a file
@ -34,9 +35,9 @@ log = logging.getLogger("bot")
log.setLevel(logging.INFO) log.setLevel(logging.INFO)
log_formatter = logging.Formatter('%(asctime)s - %(levelname)s: %(message)s') log_formatter = logging.Formatter('%(asctime)s - %(levelname)s: %(message)s')
log_stderr_handler = logging.StreamHandler() log_str_handler = logging.StreamHandler()
log_stderr_handler.setFormatter(log_formatter) log_str_handler.setFormatter(log_formatter)
log.addHandler(log_stderr_handler) log.addHandler(log_str_handler)
if not os.path.exists("logs"): if not os.path.exists("logs"):
os.makedirs("logs") os.makedirs("logs")
log_file_handler = logging.handlers.RotatingFileHandler( log_file_handler = logging.handlers.RotatingFileHandler(
@@ -45,6 +46,19 @@ log_file_handler.setFormatter(log_formatter)
 log.addHandler(log_file_handler)


+class FileType(Enum):
+	COMMENT = 1
+	SUBMISSION = 2
+
+	@staticmethod
+	def to_str(file_type):
+		if file_type == FileType.COMMENT:
+			return "comments"
+		elif file_type == FileType.SUBMISSION:
+			return "submissions"
+		return "other"
+
+
 # convenience object used to pass status information between processes
 class FileConfig:
 	def __init__(self, input_path, output_path=None, complete=False, lines_processed=0, error_lines=0, lines_matched=0):
@@ -57,6 +71,13 @@ class FileConfig:
 		self.error_message = None
 		self.error_lines = error_lines
 		self.lines_matched = lines_matched
+		file_name = os.path.split(input_path)[1]
+		if file_name.startswith("RS"):
+			self.file_type = FileType.SUBMISSION
+		elif file_name.startswith("RC"):
+			self.file_type = FileType.COMMENT
+		else:
+			raise ValueError(f"Unknown working file type: {file_name}")

 	def __str__(self):
 		return f"{self.input_path} : {self.output_path} : {self.file_size} : {self.complete} : {self.bytes_processed} : {self.lines_processed}"
@@ -64,18 +85,29 @@ class FileConfig:

 # another convenience object to read and write from both zst files and ndjson files
 class FileHandle:
-	def __init__(self, path):
-		self.path = path
-		if self.path.endswith(".zst"):
-			self.is_compressed = True
-		elif self.path.endswith(".ndjson"):
-			self.is_compressed = False
-		else:
-			raise TypeError(f"File type not supported for writing {self.path}")
-
-		self.write_handle = None
-		self.other_handle = None
-		self.newline_encoded = "\n".encode('utf-8')
+	newline_encoded = "\n".encode('utf-8')
+	ext_len = len(".zst")
+
+	def __init__(self, path, is_split=False):
+		self.path = path
+		self.is_split = is_split
+		self.handles = {}
+
+	def get_paths(self, character_filter=None):
+		if self.is_split:
+			paths = []
+			for file in os.listdir(self.path):
+				if not file.endswith(".zst"):
+					continue
+				if character_filter is not None and character_filter != file[-FileHandle.ext_len - 1:-FileHandle.ext_len]:
+					continue
+				paths.append(os.path.join(self.path, file))
+			return paths
+		else:
+			return [self.path]
+
+	def get_count_files(self):
+		return len(self.get_paths())

 	# recursively decompress and decode a chunk of bytes. If there's a decode error then read another chunk and try with that, up to a limit of max_window_size bytes
 	@staticmethod
@@ -93,9 +125,16 @@ class FileHandle:

 	# open a zst compressed ndjson file, or a regular uncompressed ndjson file and yield lines one at a time
 	# also passes back file progress
-	def yield_lines(self):
-		if self.is_compressed:
-			with open(self.path, 'rb') as file_handle:
+	def yield_lines(self, character_filter=None):
+		if self.is_split:
+			if character_filter is not None:
+				path = os.path.join(self.path, f"{character_filter}.zst")
+			else:
+				raise ValueError(f"{self.path} is split but no filter passed")
+		else:
+			path = self.path
+		if os.path.exists(path):
+			with open(path, 'rb') as file_handle:
 				buffer = ''
 				reader = zstandard.ZstdDecompressor(max_window_size=2**31).stream_reader(file_handle)
 				while True:
@@ -110,34 +149,38 @@ class FileHandle:
 					buffer = lines[-1]
 				reader.close()
-		else:
-			with open(self.path, 'r') as file_handle:
-				line = file_handle.readline()
-				while line:
-					yield line.rstrip('\n'), file_handle.tell()
-					line = file_handle.readline()
+
+	# get either the main write handle or the character filter one, opening a new handle as needed
+	def get_write_handle(self, character_filter=None):
+		if character_filter is None:
+			character_filter = 1  # use 1 as the default name since ints hash quickly
+		handle = self.handles.get(character_filter)
+		if handle is None:
+			if character_filter == 1:
+				path = self.path
+			else:
+				if not os.path.exists(self.path):
+					os.makedirs(self.path)
+				path = os.path.join(self.path, f"{character_filter}.zst")
+			handle = zstandard.ZstdCompressor().stream_writer(open(path, 'wb'))
+			self.handles[character_filter] = handle
+		return handle

 	# write a line, opening the appropriate handle
-	def write_line(self, line):
-		if self.write_handle is None:
-			if self.is_compressed:
-				self.other_handle = open(self.path, 'wb')
-				self.write_handle = zstandard.ZstdCompressor().stream_writer(self.other_handle)
-			else:
-				self.write_handle = open(self.path, 'w', encoding="utf-8")
-		if self.is_compressed:
-			self.write_handle.write(line.encode('utf-8'))
-			self.write_handle.write(self.newline_encoded)
-		else:
-			self.write_handle.write(line)
-			self.write_handle.write("\n")
+	def write_line(self, line, value=None):
+		if self.is_split:
+			if value is None:
+				raise ValueError(f"{self.path} is split but no value passed")
+			character_filter = value[:1]
+			handle = self.get_write_handle(character_filter)
+		else:
+			handle = self.get_write_handle()
+		handle.write(line.encode('utf-8'))
+		handle.write(FileHandle.newline_encoded)

 	def close(self):
-		if self.write_handle:
-			self.write_handle.close()
-		if self.other_handle:
-			self.other_handle.close()
+		for handle in self.handles.values():
+			handle.close()


 # used for calculating running average of read speed
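
To make the new split mode concrete, here is a minimal round-trip sketch using the FileHandle API above (not part of the commit; the folder name and subreddit values are invented). On write, lines are routed into per-first-character .zst files inside the handle's folder; on read, one prefix is pulled back at a time.

# hypothetical intermediate folder used in split mode
working = FileHandle("pushshift_working/RC_2023-01", is_split=True)

# write_line() now takes the matched value; its first character picks the handle,
# so "askreddit" and "aww" share a.zst while "pics" goes to p.zst
working.write_line('{"subreddit": "AskReddit"}', "askreddit")
working.write_line('{"subreddit": "aww"}', "aww")
working.write_line('{"subreddit": "pics"}', "pics")
working.close()

print(working.get_paths())        # ['.../a.zst', '.../p.zst'] (order depends on os.listdir)
print(working.get_count_files())  # 2

# reading a split handle requires a character_filter; each prefix is combined separately
reader = FileHandle("pushshift_working/RC_2023-01", is_split=True)
for line, bytes_read in reader.yield_lines(character_filter="a"):
	print(line)
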
@@ -157,16 +200,21 @@ class Queue:

 # save file information and progress to a json file
 # we don't want to save the whole FileConfig object, since some info resets if we restart
-def save_file_list(input_files, working_folder, status_json, arg_string, script_type):
+def save_file_list(input_files, working_folder, status_json, arg_string, script_type, completed_prefixes=None):
 	if not os.path.exists(working_folder):
 		os.makedirs(working_folder)
 	simple_file_list = []
 	for file in input_files:
 		simple_file_list.append([file.input_path, file.output_path, file.complete, file.lines_processed, file.error_lines, file.lines_matched])
+	if completed_prefixes is None:
+		completed_prefixes = []
+	else:
+		completed_prefixes = sorted([prefix for prefix in completed_prefixes])
 	with open(status_json, 'w') as status_json_file:
 		output_dict = {
 			"args": arg_string,
 			"type": script_type,
+			"completed_prefixes": completed_prefixes,
 			"files": simple_file_list,
 		}
 		status_json_file.write(json.dumps(output_dict, indent=4))
@@ -182,18 +230,21 @@ def load_file_list(status_json):
 			input_files.append(
 				FileConfig(simple_file[0], simple_file[1], simple_file[2], simple_file[3], simple_file[4], simple_file[5])
 			)
-		return input_files, output_dict["args"], output_dict["type"]
+		completed_prefixes = set()
+		for prefix in output_dict["completed_prefixes"]:
+			completed_prefixes.add(prefix)
+		return input_files, output_dict["args"], output_dict["type"], completed_prefixes
 	else:
-		return None, None, None
+		return None, None, None, set()


 # base of each separate process. Loads a file, iterates through lines and writes out
 # the ones where the `field` of the object matches `value`. Also passes status
 # information back to the parent via a queue
-def process_file(file, queue, field, value, values):
+def process_file(file, queue, field, value, values, split_intermediate):
 	queue.put(file)
 	input_handle = FileHandle(file.input_path)
-	output_handle = FileHandle(file.output_path)
+	output_handle = FileHandle(file.output_path, is_split=split_intermediate)
 	try:
 		for line, file_bytes_processed in input_handle.yield_lines():
 			try:
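
For orientation, the status.json written by save_file_list and read back by load_file_list now carries combine progress as well. Its shape is roughly the following; this is a hand-written illustration with made-up values, shown as the Python dict the script serializes.

example_status = {
	"args": "<encoded argument string>",  # arg_string, compared against the current args on restart
	"type": "<script type tag>",          # script_type constant from the script, not shown in this diff
	"completed_prefixes": ["a", "b"],     # new: prefixes whose combine pass already finished
	"files": [
		# [input_path, output_path, complete, lines_processed, error_lines, lines_matched]
		["reddit/RC_2023-01.zst", "pushshift_working/RC_2023-01", True, 1000000, 12, 4521],
	],
}
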
@@ -207,7 +258,7 @@ def process_file(file, queue, field, value, values):
 					matched = True
 				if matched:
-					output_handle.write_line(line)
+					output_handle.write_line(line, observed)
 					file.lines_matched += 1
 			except (KeyError, json.JSONDecodeError) as err:
 				file.error_lines += 1
@@ -227,15 +278,14 @@
 if __name__ == '__main__':
 	parser = argparse.ArgumentParser(description="Use multiple processes to decompress and iterate over pushshift dump files")
 	parser.add_argument("input", help="The input folder to recursively read files from")
-	parser.add_argument("--split", help="Split the output into separate files by the filter fields, only applies if there's multiple fields", action='store_const', const=True, default=True)
 	parser.add_argument("--output", help="Put the output files in this folder", default="")
 	parser.add_argument("--working", help="The folder to store temporary files in", default="pushshift_working")
 	parser.add_argument("--field", help="When deciding what lines to keep, use this field for comparisons", default="subreddit")
 	parser.add_argument("--value", help="When deciding what lines to keep, compare the field to this value. Supports a comma separated list. This is case sensitive", default="pushshift")
 	parser.add_argument("--value_list", help="A file of newline separated values to use. Overrides the value param if it is set", default=None)
 	parser.add_argument("--processes", help="Number of processes to use", default=10, type=int)
-	parser.add_argument("--file_filter", help="Regex filenames have to match to be processed", default="^rc_|rs_")
-	parser.add_argument("--compress_intermediate", help="Compress the intermediate files, use if the filter will result in a very large amount of data", action="store_true")
+	parser.add_argument("--file_filter", help="Regex filenames have to match to be processed", default="^RC_|RS_")
+	parser.add_argument("--split_intermediate", help="Split the intermediate files by the first letter of the matched field, use if the filter will result in a large number of separate files", action="store_true")
 	parser.add_argument(
 		"--error_rate", help=
 		"Percentage as an integer from 0 to 100 of the lines where the field can be missing. For the subreddit field especially, "
@@ -282,7 +332,7 @@ if __name__ == '__main__':
 	multiprocessing.set_start_method('spawn')
 	queue = multiprocessing.Manager().Queue()
 	status_json = os.path.join(args.working, "status.json")
-	input_files, saved_arg_string, saved_type = load_file_list(status_json)
+	input_files, saved_arg_string, saved_type, completed_prefixes = load_file_list(status_json)
 	if saved_arg_string and saved_arg_string != arg_string:
 		log.warning(f"Args don't match args from json file. Delete working folder")
 		sys.exit(0)
@@ -297,20 +347,20 @@ if __name__ == '__main__':
 		for subdir, dirs, files in os.walk(args.input):
 			files.sort()
 			for file_name in files:
-				if file_name.endswith(".zst") and re.search(args.file_filter, file_name, re.IGNORECASE) is not None:
+				if file_name.endswith(".zst") and re.search(args.file_filter, file_name) is not None:
 					input_path = os.path.join(subdir, file_name)
-					output_path = os.path.join(args.working, f"{file_name[:-4]}.{('zst' if args.compress_intermediate else 'ndjson')}")
+					if args.split_intermediate:
+						output_extension = ""
+					else:
+						output_extension = ".zst"
+					output_path = os.path.join(args.working, f"{file_name[:-4]}{output_extension}")
 					input_files.append(FileConfig(input_path, output_path=output_path))
 		save_file_list(input_files, args.working, status_json, arg_string, script_type)
 	else:
 		log.info(f"Existing input file was read, if this is not correct you should delete the {args.working} folder and run this script again")

-	files_processed = 0
-	total_bytes = 0
-	total_bytes_processed = 0
-	total_lines_processed = 0
-	total_lines_errored = 0
+	files_processed, total_bytes, total_bytes_processed, total_lines_processed, total_lines_matched, total_lines_errored = 0, 0, 0, 0, 0, 0
 	files_to_process = []
 	# calculate the total file size for progress reports, build a list of incomplete files to process
 	# do this largest to smallest by file size so that we aren't processing a few really big files with only a few threads at the end
@@ -319,6 +369,7 @@ if __name__ == '__main__':
 		if file.complete:
 			files_processed += 1
 			total_lines_processed += file.lines_processed
+			total_lines_matched += file.lines_matched
 			total_bytes_processed += file.file_size
 			total_lines_errored += file.error_lines
 		else:
@ -335,7 +386,7 @@ if __name__ == '__main__':
log.info(f"Processing file: {file.input_path}") log.info(f"Processing file: {file.input_path}")
# start the workers # start the workers
with multiprocessing.Pool(processes=min(args.processes, len(files_to_process))) as pool: with multiprocessing.Pool(processes=min(args.processes, len(files_to_process))) as pool:
workers = pool.starmap_async(process_file, [(file, queue, args.field, value, values) for file in files_to_process], chunksize=1, error_callback=log.info) workers = pool.starmap_async(process_file, [(file, queue, args.field, value, values, args.split_intermediate) for file in files_to_process], chunksize=1, error_callback=log.info)
while not workers.ready() or not queue.empty(): while not workers.ready() or not queue.empty():
# loop until the workers are all done, pulling in status messages as they are sent # loop until the workers are all done, pulling in status messages as they are sent
file_update = queue.get() file_update = queue.get()
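
The worker pattern above is unchanged by this commit apart from the extra split_intermediate argument: each process handles one file and reports progress back through a shared Manager queue. A stripped-down sketch of the same pattern follows, with simplified names and a get() timeout added so the toy version cannot block forever; the script itself blocks on queue.get() because its workers send frequent progress updates.

import multiprocessing
import queue as queue_lib

def worker(item, status_queue):
	# stand-in for process_file: do the work, then report back through the queue
	status_queue.put(f"finished {item}")

if __name__ == "__main__":
	multiprocessing.set_start_method("spawn")
	status_queue = multiprocessing.Manager().Queue()
	items = ["RC_2023-01.zst", "RS_2023-01.zst"]
	with multiprocessing.Pool(processes=2) as pool:
		workers = pool.starmap_async(worker, [(item, status_queue) for item in items], chunksize=1)
		while not workers.ready() or not status_queue.empty():
			try:
				print(status_queue.get(timeout=1))  # pull status messages as they arrive
			except queue_lib.Empty:
				pass
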
@@ -349,13 +400,7 @@ if __name__ == '__main__':

 			# I'm going to assume that the list of files is short enough that it's no
 			# big deal to just iterate each time since that saves a bunch of work
-			total_lines_processed = 0
-			total_lines_matched = 0
-			total_bytes_processed = 0
-			total_lines_errored = 0
-			files_processed = 0
-			files_errored = 0
-			i = 0
+			total_lines_processed, total_lines_matched, total_bytes_processed, total_lines_errored, files_processed, files_errored, i = 0, 0, 0, 0, 0, 0, 0
 			for file in input_files:
 				if file.input_path == file_update.input_path:
 					input_files[i] = file_update
@@ -389,8 +434,10 @@ if __name__ == '__main__':
 			log.info(f"{total_lines_processed:,}, {total_lines_errored} errored : {(total_bytes_processed / (2**30)):.2f} gb, {(total_bytes_processed / total_bytes) * 100:.0f}% : {files_processed}/{len(input_files)}")

-	working_file_paths = []
+	type_handles = defaultdict(list)
+	prefixes = set()
 	count_incomplete = 0
+	count_intermediate_files = 0
 	# build a list of output files to combine
 	for file in sorted(input_files, key=lambda item: os.path.split(item.output_path)[1]):
 		if not file.complete:
@@ -405,68 +452,94 @@ if __name__ == '__main__':
 				f"File {file.input_path} has {file.error_lines:,} errored lines out of {file.lines_processed:,}, "
 				f"{(file.error_lines / file.lines_processed) * (args.error_rate * 0.01):.2f}% which is above the limit of {args.error_rate}%")
 			count_incomplete += 1
-		elif file.output_path is not None:
-			if os.path.exists(file.output_path):
-				working_file_paths.append(file.output_path)
+		elif file.output_path is not None and os.path.exists(file.output_path):
+			input_handle = FileHandle(file.output_path, is_split=args.split_intermediate)
+			for path in input_handle.get_paths():
+				prefixes.add(path[-FileHandle.ext_len - 1:-FileHandle.ext_len])
+				count_intermediate_files += 1
+			type_handles[file.file_type].append(input_handle)

 	if count_incomplete > 0:
 		log.info(f"{count_incomplete} files were not completed, errored or don't exist, something went wrong. Aborting")
 		sys.exit()

-	log.info(f"Processing complete, combining {len(working_file_paths)} result files")
+	log.info(f"Processing complete, combining {count_intermediate_files} result files")
+
+	for completed_prefix in completed_prefixes:
+		if completed_prefix in prefixes:
+			prefixes.remove(completed_prefix)

 	output_lines = 0
-	all_handles = []
 	output_handles = {}
 	files_combined = 0
-	if args.split and values:
+	if values:
 		split = True
 	else:
 		split = False
-	for working_file_path in working_file_paths:
-		files_combined += 1
-		log.info(f"From {files_combined}/{len(working_file_paths)} files to {len(all_handles):,} output handles : {output_lines:,} lines : {os.path.split(working_file_path)[1]}")
-		working_file_name = os.path.split(working_file_path)[1]
-		if working_file_name.startswith("RS"):
-			file_type = "submissions"
-		elif working_file_name.startswith("RC"):
-			file_type = "comments"
-		else:
-			log.warning(f"Unknown working file type, skipping: {working_file_name}")
-			continue
-		input_handle = FileHandle(working_file_path)
-		if file_type not in output_handles:
-			output_handles[file_type] = {}
-		file_type_handles = output_handles[file_type]
-		for line, file_bytes_processed in input_handle.yield_lines():
-			output_lines += 1
-			if split:
-				obj = json.loads(line)
-				observed_case = obj[args.field]
-			else:
-				observed_case = value
-			observed = observed_case.lower()
-			if observed not in file_type_handles:
-				if args.output:
-					if not os.path.exists(args.output):
-						os.makedirs(args.output)
-					output_file_path = os.path.join(args.output, f"{observed_case}_{file_type}.zst")
-				else:
-					output_file_path = f"{observed_case}_{file_type}.zst"
-				log.debug(f"Writing to file {output_file_path}")
-				output_handle = FileHandle(output_file_path)
-				file_type_handles[observed] = output_handle
-				all_handles.append(output_handle)
-			else:
-				output_handle = file_type_handles[observed]
-			output_handle.write_line(line)
-			if output_lines % 100000 == 0:
-				log.info(f"From {files_combined}/{len(working_file_paths)} files to {len(all_handles):,} output handles : {output_lines:,} lines : {os.path.split(working_file_path)[1]}")
-
-		log.info(f"From {files_combined}/{len(working_file_paths)} files to {len(all_handles):,} output handles : {output_lines:,} lines")
-
-	for handle in all_handles:
-		handle.close()
-
-	log.info(f"Finished combining files, {output_lines:,} lines written")
+	if args.split_intermediate:
+		for prefix in sorted(prefixes):
+			log.info(f"From {files_combined}/{count_intermediate_files} files to {len(output_handles):,} output handles : {output_lines:,}/{total_lines_matched:,} lines")
+			for file_type, input_handles in type_handles.items():
+				for input_handle in input_handles:
+					has_lines = False
+					for line, file_bytes_processed in input_handle.yield_lines(character_filter=prefix):
+						if not has_lines:
+							has_lines = True
+							files_combined += 1
+						output_lines += 1
+						obj = json.loads(line)
+						observed_case = obj[args.field]
+						observed = observed_case.lower()
+						if observed not in output_handles:
+							if args.output:
+								if not os.path.exists(args.output):
+									os.makedirs(args.output)
+								output_file_path = os.path.join(args.output, f"{observed_case}_{FileType.to_str(file_type)}.zst")
+							else:
+								output_file_path = f"{observed_case}_{FileType.to_str(file_type)}.zst"
+							log.debug(f"Writing to file {output_file_path}")
+							output_handle = FileHandle(output_file_path)
+							output_handles[observed] = output_handle
+						else:
+							output_handle = output_handles[observed]
+						output_handle.write_line(line)
+						if output_lines % 1000000 == 0:
+							log.info(f"From {files_combined}/{count_intermediate_files} files to {len(output_handles):,} output handles : {output_lines:,}/{total_lines_matched:,} lines : {input_handle.path} / {prefix}")
+
+			for handle in output_handles.values():
+				handle.close()
+			output_handles = {}
+			completed_prefixes.add(prefix)
+			save_file_list(input_files, args.working, status_json, arg_string, script_type, completed_prefixes)
+	else:
+		log.info(f"From {files_combined}/{count_intermediate_files} files to {len(output_handles):,} output handles : {output_lines:,}/{total_lines_matched:,} lines")
+		for file_type, input_handles in type_handles.items():
+			for input_handle in input_handles:
+				files_combined += 1
+				for line, file_bytes_processed in input_handle.yield_lines():
+					output_lines += 1
+					obj = json.loads(line)
+					observed_case = obj[args.field]
+					observed = observed_case.lower()
+					if observed not in output_handles:
+						if args.output:
+							if not os.path.exists(args.output):
+								os.makedirs(args.output)
+							output_file_path = os.path.join(args.output, f"{observed_case}_{FileType.to_str(file_type)}.zst")
+						else:
+							output_file_path = f"{observed_case}_{FileType.to_str(file_type)}.zst"
+						log.debug(f"Writing to file {output_file_path}")
+						output_handle = FileHandle(output_file_path)
+						output_handles[observed] = output_handle
+					else:
+						output_handle = output_handles[observed]
+					output_handle.write_line(line)
+					if output_lines % 1000000 == 0:
+						log.info(f"From {files_combined}/{count_intermediate_files} files to {len(output_handles):,} output handles : {output_lines:,}/{total_lines_matched:,} lines : {input_handle.path}")

+		for handle in output_handles.values():
+			handle.close()
+		output_handles = {}
+
+	log.info(f"From {files_combined}/{count_intermediate_files} files to {len(output_handles):,} output handles : {output_lines:,}/{total_lines_matched:,} lines")