From 1a3789c298788262cce3dda00a6fff8fc1e8219a Mon Sep 17 00:00:00 2001 From: Watchful1 Date: Thu, 12 Jan 2023 16:46:58 -0800 Subject: [PATCH] Work on multiprocess, change up argument format, handle comments and submissions at the same time, split the output --- personal/split_by_subreddit.py | 13 +-- scripts/combine_folder_multiprocess.py | 134 +++++++++++++++---------- 2 files changed, 89 insertions(+), 58 deletions(-) diff --git a/personal/split_by_subreddit.py b/personal/split_by_subreddit.py index 4a5e784..6c4bcb4 100644 --- a/personal/split_by_subreddit.py +++ b/personal/split_by_subreddit.py @@ -8,17 +8,18 @@ log = discord_logging.init_logging() if __name__ == "__main__": subreddits = {} - object_type = "comments" - folder = f"\\\\MYCLOUDPR4100\\Public\\reddit_final\\ratmanreturns265_{object_type}" + field = 'subreddit' + object_type = "submissions" + folder = f"\\\\MYCLOUDPR4100\\Public\\reddit_final\\multisub_{object_type}" if not os.path.exists(folder): os.makedirs(folder) - input_file = f"\\\\MYCLOUDPR4100\\Public\\reddit_final\\ratmanreturns265_{object_type}.zst" + input_file = f"\\\\MYCLOUDPR4100\\Public\\reddit_final\\multisub_{object_type}.zst" input_file_size = os.stat(input_file).st_size total_lines = 0 for comment, line, file_bytes_processed in utils.read_obj_zst_meta(input_file): - if comment['subreddit'] not in subreddits: - subreddits[comment['subreddit']] = {'writer': utils.OutputZst(os.path.join(folder, comment['subreddit'] + f"_{object_type}.zst")), 'lines': 0} - subreddit = subreddits[comment['subreddit']] + if comment[field] not in subreddits: + subreddits[comment[field]] = {'writer': utils.OutputZst(os.path.join(folder, comment[field] + f"_{object_type}.zst")), 'lines': 0} + subreddit = subreddits[comment[field]] subreddit['writer'].write(line) subreddit['writer'].write("\n") subreddit['lines'] += 1 diff --git a/scripts/combine_folder_multiprocess.py b/scripts/combine_folder_multiprocess.py index 453b5aa..c58120c 100644 --- a/scripts/combine_folder_multiprocess.py +++ b/scripts/combine_folder_multiprocess.py @@ -3,16 +3,19 @@ # that month. After all the ndjson files are processed, it iterates through the resulting files and combines # them into a final file. +# this script assumes the files are named in chronological order and prefixed with RS_ or RC_, like the pushshift dumps + # features: # - multiple processes in parallel to maximize drive read and decompression # - saves state as it completes each file and picks up where it stopped # - detailed progress indicators # examples: -# - get all comments that have a subreddit field (subreddit is the default) of "wallstreetbets" -# python3 combine_folder_multiprocess.py reddit/comments reddit_final --name wallstreetbets_comments --value wallstreetbets -# - get all comments that have an author field of Watchful1 -# python3 combine_folder_multiprocess.py reddit/comments reddit_final --name watchful1_comments --field author --value Watchful1 +# - get all comments that have a subreddit field (subreddit is the default) of "wallstreetbets". This will create a single output file "wallstreetbets_comments.zst" in the folder the script is run in +# python3 combine_folder_multiprocess.py reddit/comments --value wallstreetbets +# - get all comments and submissions (assuming both types of dump files are under the reddit folder) that have an author field of Watchful1 or spez and output the results to a folder called pushshift. +# This will result in four files, pushshift/Watchful1_comments, pushshift/Watchful1_submissions, pushshift/spez_comments, pushshift/spez_submissions +# python3 combine_folder_multiprocess.py reddit --field author --value Watchful1,spez --output pushshift import zstandard import os @@ -72,31 +75,22 @@ class Queue: return self.list[0] if len(self.list) > 0 else None -# builds file paths -def folder_helper(output_folder, output_file_name): - working_folder = os.path.join(output_folder, "pushshift_working_dir_" + output_file_name) - status_file = os.path.join(working_folder, output_file_name + ".json") - return working_folder, status_file - - # save file information and progress to a json file -# we don't want to save the whole FileConfig object, since some of the info resets if we restart -def save_file_list(input_files, output_folder, output_file_name): - working_folder, status_json_file_name = folder_helper(output_folder, output_file_name) +# we don't want to save the whole FileConfig object, since some info resets if we restart +def save_file_list(input_files, working_folder, status_json): if not os.path.exists(working_folder): os.makedirs(working_folder) simple_file_list = [] for file in input_files: simple_file_list.append([file.input_path, file.output_path, file.complete, file.lines_processed, file.error_lines]) - with open(status_json_file_name, 'w') as status_json_file: + with open(status_json, 'w') as status_json_file: status_json_file.write(json.dumps(simple_file_list, indent=4)) # load file information from the json file and recalculate file sizes -def load_file_list(output_folder, output_file_name): - _, status_json_file_name = folder_helper(output_folder, output_file_name) - if os.path.exists(status_json_file_name): - with open(status_json_file_name, 'r') as status_json_file: +def load_file_list(status_json): + if os.path.exists(status_json): + with open(status_json, 'r') as status_json_file: simple_file_list = json.load(status_json_file) input_files = [] for simple_file in simple_file_list: @@ -144,7 +138,7 @@ def read_lines_zst(file_name): # base of each separate process. Loads a file, iterates through lines and writes out # the ones where the `field` of the object matches `value`. Also passes status # information back to the parent via a queue -def process_file(file, working_folder, queue, field, value, values, case_sensitive): +def process_file(file, queue, field, value, values, case_sensitive): output_file = None try: for line, file_bytes_processed in read_lines_zst(file.input_path): @@ -160,9 +154,6 @@ def process_file(file, working_folder, queue, field, value, values, case_sensiti if matched: if output_file is None: - if file.output_path is None: - created = datetime.utcfromtimestamp(int(obj['created_utc'])) - file.output_path = os.path.join(working_folder, created.strftime("%Y-%m")) output_file = open(file.output_path, 'w') output_file.write(line) output_file.write("\n") @@ -185,9 +176,10 @@ def process_file(file, working_folder, queue, field, value, values, case_sensiti if __name__ == '__main__': parser = argparse.ArgumentParser(description="Use multiple processes to decompress and iterate over pushshift dump files") - parser.add_argument("input", help="The input folder to read files from") - parser.add_argument("output", help="The output folder to store temporary files in and write the output to") - parser.add_argument("--name", help="What to name the output file", default="pushshift") + parser.add_argument("input", help="The input folder to recursively read files from") + parser.add_argument("--split", help="Split the output into separate files by the filter fields, only applies if there's multiple fields", action='store_const', const=True, default=True) + parser.add_argument("--output", help="Put the output files in this folder", default="") + parser.add_argument("--working", help="The folder to store temporary files in", default="pushshift_working") parser.add_argument("--field", help="When deciding what lines to keep, use this field for comparisons", default="subreddit") parser.add_argument("--value", help="When deciding what lines to keep, compare the field to this value. Supports a comma separated list. This is case sensitive", default="pushshift") parser.add_argument("--processes", help="Number of processes to use", default=10, type=int) @@ -204,7 +196,7 @@ if __name__ == '__main__': log.setLevel(logging.DEBUG) log.info(f"Loading files from: {args.input}") - log.info(f"Writing output to: {(os.path.join(args.output, args.name + '.zst'))}") + log.info(f"Writing output to: {args.output}.zst") if not args.case_sensitive: args.value = args.value.lower() @@ -226,8 +218,8 @@ if __name__ == '__main__': multiprocessing.set_start_method('spawn') queue = multiprocessing.Manager().Queue() - input_files = load_file_list(args.output, args.name) - working_folder, _ = folder_helper(args.output, args.name) + status_json = os.path.join(args.working, "status.json") + input_files = load_file_list(status_json) # if the file list wasn't loaded from the json, this is the first run, find what files we need to process if input_files is None: input_files = [] @@ -236,9 +228,12 @@ if __name__ == '__main__': for file_name in files: if file_name.endswith(".zst"): input_path = os.path.join(subdir, file_name) - input_files.append(FileConfig(input_path)) + output_path = os.path.join(args.working, file_name[:-4]) + input_files.append(FileConfig(input_path, output_path=output_path)) - save_file_list(input_files, args.output, args.name) + save_file_list(input_files, args.working, status_json) + else: + log.info(f"Existing input file was read, if this is not correct you should delete the {args.working} folder and run this script again") files_processed = 0 total_bytes = 0 @@ -269,7 +264,7 @@ if __name__ == '__main__': log.info(f"Processing file: {file.input_path}") # start the workers with multiprocessing.Pool(processes=min(args.processes, len(files_to_process))) as pool: - workers = pool.starmap_async(process_file, [(file, working_folder, queue, args.field, value, values, args.case_sensitive) for file in files_to_process], error_callback=log.info) + workers = pool.starmap_async(process_file, [(file, queue, args.field, value, values, args.case_sensitive) for file in files_to_process], error_callback=log.info) while not workers.ready(): # loop until the workers are all done, pulling in status messages as they are sent file_update = queue.get() @@ -294,7 +289,7 @@ if __name__ == '__main__': files_errored += 1 if file.error_message is not None else 0 i += 1 if file_update.complete or file_update.error_message is not None: - save_file_list(input_files, args.output, args.name) + save_file_list(input_files, args.working, status_json) current_time = time.time() progress_queue.put([current_time, total_lines_processed, total_bytes_processed]) @@ -317,7 +312,7 @@ if __name__ == '__main__': working_file_paths = [] count_incomplete = 0 # build a list of output files to combine - for file in input_files: + for file in sorted(input_files, key=lambda item: os.path.split(item.output_path)[1]): if not file.complete: if file.error_message is not None: log.info(f"File {file.input_path} errored {file.error_message}") @@ -331,10 +326,7 @@ if __name__ == '__main__': f"{(file.error_lines / file.lines_processed) * (args.error_rate * 0.01):.2f}% which is above the limit of {args.error_rate}%") count_incomplete += 1 elif file.output_path is not None: - if not os.path.exists(file.output_path): - log.info(f"Output file {file.output_path} doesn't exist") - count_incomplete += 1 - else: + if os.path.exists(file.output_path): working_file_paths.append(file.output_path) if count_incomplete > 0: @@ -344,18 +336,56 @@ if __name__ == '__main__': log.info(f"Processing complete, combining {len(working_file_paths)} result files") output_lines = 0 - output_file_path = os.path.join(args.output, args.name + ".zst") - # combine all the output files into the final results file - with open(output_file_path, 'wb') as output_file: - files_combined = 0 - writer = zstandard.ZstdCompressor().stream_writer(output_file) - for working_file_path in working_file_paths: - files_combined += 1 - log.info(f"Reading {files_combined}/{len(working_file_paths)}") - with open(working_file_path, 'r') as input_file: - for line in input_file: - output_lines += 1 - encoded_line = line.encode('utf-8') - writer.write(encoded_line) + all_handles = [] + output_handles = {} + files_combined = 0 + if args.split and values: + split = True + else: + split = False + for working_file_path in working_file_paths: + files_combined += 1 + log.info(f"Reading {files_combined}/{len(working_file_paths)} : {os.path.split(working_file_path)[1]}") + working_file_name = os.path.split(working_file_path)[1] + if working_file_name.startswith("RS"): + file_type = "submissions" + elif working_file_name.startswith("RC"): + file_type = "comments" + else: + log.warning(f"Unknown working file type, skipping: {working_file_name}") + continue + if file_type not in output_handles: + output_handles[file_type] = {} + file_type_handles = output_handles[file_type] + with open(working_file_path, 'r') as input_file: + for line in input_file: + output_lines += 1 + if split: + obj = json.loads(line) + observed_case = obj[args.field] + else: + observed_case = value + observed = observed_case if args.case_sensitive else observed_case.lower() + if observed not in file_type_handles: + if args.output: + if not os.path.exists(args.output): + os.makedirs(args.output) + output_file_path = os.path.join(args.output, f"{observed_case}_{file_type}.zst") + else: + output_file_path = f"{observed_case}_{file_type}.zst" + log.info(f"Writing to file {output_file_path}") + file_handle = open(output_file_path, 'wb') + writer = zstandard.ZstdCompressor().stream_writer(file_handle) + file_type_handles[observed] = writer + all_handles.append(writer) + all_handles.append(file_handle) + else: + writer = file_type_handles[observed] - log.info(f"Finished combining files, {output_lines:,} lines written to {output_file_path}") + encoded_line = line.encode('utf-8') + writer.write(encoded_line) + + for handle in all_handles: + handle.close() + + log.info(f"Finished combining files, {output_lines:,} lines written")