diff --git a/README.md b/README.md
index 8739277..490ffa1 100644
--- a/README.md
+++ b/README.md
@@ -1 +1,5 @@
-coming soon
\ No newline at end of file
+This repo contains example Python scripts for processing the reddit dump files created by pushshift. The files can be downloaded from [here](https://files.pushshift.io/reddit/) or torrented from [here](https://academictorrents.com/details/f37bb9c0abe350f0f1cbd4577d0fe413ed07724e).
+
+* `single_file.py` decompresses and iterates over a single zst compressed file
+* `iterate_folder.py` does the same, but for all files in a folder
+* `combine_folder_multiprocess.py` uses separate processes to iterate over multiple files in parallel, writing lines that match the passed in criteria to text files, then combining them into a final zst compressed file
\ No newline at end of file
diff --git a/scripts/combine_folder_multiprocess.py b/scripts/combine_folder_multiprocess.py
index 7270c4b..e4017df 100644
--- a/scripts/combine_folder_multiprocess.py
+++ b/scripts/combine_folder_multiprocess.py
@@ -1,3 +1,24 @@
+# this script iterates through zst compressed ndjson files, like the pushshift reddit dumps, loads each line
+# and passes it into the save_obj function; if the function returns true for a line, it's written out into a
+# separate file for that month. After all the ndjson files are processed, it iterates through the resulting
+# files and combines them into a final file.
+
+# once complete, the combined zst file can be read back line by line in the
+# same way as the dump files themselves, for example with the read_lines_zst
+# helper from single_file.py, calling json.loads on each line to get the
+# original objects
+
+# features:
+# - multiple processes in parallel to maximize drive read and decompression
+# - saves state as it completes each file and picks up where it stopped
+# - detailed progress indicators
+
+# examples:
+# - get all comments that have a subreddit field (subreddit is the default) of "wallstreetbets"
+#   python3 combine_folder_multiprocess.py reddit/comments reddit_final --name wallstreetbets_comments --value wallstreetbets
+# - get all comments that have an author field of Watchful1
+#   python3 combine_folder_multiprocess.py reddit/comments reddit_final --name watchful1_comments --field author --value Watchful1
+
 import zstandard
 import os
 import json
@@ -25,22 +46,6 @@
 log_file_handler.setFormatter(log_formatter)
 log.addHandler(log_file_handler)
 
-# this script iterates through zst compressed ndjson files, like the pushshift reddit dumps, loads each line
-# and passes it into the save_obj function, if it function returns true for a line, it's written out into a
-# separate file for that month. After all the ndjson files are processed, it iterates through the resulting
-# files and combines them into a final file.
-
-# once complete, the combined file can easily be processed like
-# with open(file_path, 'r') as file:
-# 	for line in file:
-# 		obj = json.loads(line)
-
-# features:
-# - multiple processes in parallel to maximize drive read and decompression
-# - saves state as it completes each file and picks up where it stopped
-# - detailed progress indicators
-
-
 # convenience object used to pass status information between processes
 class FileConfig:
 	def __init__(self, input_path, output_path=None, complete=False, lines_processed=0, error_lines=0):
@@ -179,7 +184,7 @@ if __name__ == '__main__':
 		log.setLevel(logging.DEBUG)
 
 	log.info(f"Loading files from: {args.input}")
-	log.info(f"Writing output to: {(os.path.join(args.output, args.name + '.txt'))}")
+	log.info(f"Writing output to: {(os.path.join(args.output, args.name + '.zst'))}")
 
 	multiprocessing.set_start_method('spawn')
 	queue = multiprocessing.Manager().Queue()
@@ -294,21 +299,18 @@ if __name__ == '__main__':
 	log.info(f"Processing complete, combining {len(working_file_paths)} result files")
 
 	output_lines = 0
-	output_file_path = os.path.join(args.output, args.name + ".txt")
+	output_file_path = os.path.join(args.output, args.name + ".zst")
 	# combine all the output files into the final results file
-	with open(output_file_path, 'w') as output_file:
-		i = 0
+	with open(output_file_path, 'wb') as output_file:
+		files_combined = 0
+		writer = zstandard.ZstdCompressor().stream_writer(output_file)
 		for working_file_path in working_file_paths:
-			i += 1
-			log.info(f"Reading {i}/{len(working_file_paths)}")
+			files_combined += 1
+			log.info(f"Reading {files_combined}/{len(working_file_paths)}")
 			with open(working_file_path, 'r') as input_file:
 				for line in input_file.readlines():
 					output_lines += 1
-					output_file.write(line)
+					writer.write(line.encode('utf-8'))
+		writer.close()
 
 	log.info(f"Finished combining files, {output_lines:,} lines written to {output_file_path}")
-
-
-# test file sorting
-# compress results
-# example command line call in comment
diff --git a/scripts/iterate_folder.py b/scripts/iterate_folder.py
index 317b01a..99981c2 100644
--- a/scripts/iterate_folder.py
+++ b/scripts/iterate_folder.py
@@ -1,3 +1,7 @@
+# this is an example of iterating over all zst files in a single folder,
+# decompressing them and reading the created_utc field to make sure the files
+# are intact. It has no output other than the number of lines
+
 import zstandard
 import os
 import json
diff --git a/scripts/single_file.py b/scripts/single_file.py
index 23e9dea..ed8d810 100644
--- a/scripts/single_file.py
+++ b/scripts/single_file.py
@@ -1,3 +1,6 @@
+# this is an example of loading and iterating over a single file
+
 import zstandard
 import os
 import json
+import sys
@@ -29,7 +32,7 @@ def read_lines_zst(file_name):
 
 
 if __name__ == "__main__":
-	file_path = r"\\MYCLOUDPR4100\Public\reddit\submissions\RS_2013-03.zst"
+	file_path = sys.argv[1]
 	file_size = os.stat(file_path).st_size
 	file_lines = 0
 	file_bytes_processed = 0
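
Since the combined output is now a zst compressed file rather than plain text, it can no longer be read back with a bare open() call; it has to be stream decompressed the same way the scripts read the dump files. Below is a minimal sketch (not part of the repo) that reads the combined output and parses each line as JSON: the file path is just what the wallstreetbets example command above would produce, and wrapping the decompression stream in io.TextIOWrapper is one convenient way to split it into lines.

import io
import json
import zstandard

# hypothetical path, matching what the wallstreetbets example command above would write
file_path = "reddit_final/wallstreetbets_comments.zst"

line_count = 0
with open(file_path, 'rb') as file_handle:
	# stream_reader decompresses on the fly; TextIOWrapper turns the byte stream into text lines
	reader = zstandard.ZstdDecompressor().stream_reader(file_handle)
	for line in io.TextIOWrapper(reader, encoding='utf-8'):
		obj = json.loads(line)
		line_count += 1
print(f"{line_count:,} lines read from {file_path}")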