This commit is contained in:
Watchful1 2021-09-09 22:24:14 -07:00
parent bd7378ff91
commit dd12687141
4 changed files with 40 additions and 29 deletions

combine_folder_multiprocess.py

@@ -1,3 +1,24 @@
# this script iterates through zst compressed ndjson files, like the pushshift reddit dumps, loads each line
# and passes it into the save_obj function; if that function returns true for a line, the line is written out into a
# separate file for that month. After all the ndjson files are processed, it iterates through the resulting
# files and combines them into a final file (a sketch of the save_obj contract follows the examples below)
# once complete, the combined file is zst compressed, so it can no longer be read with a plain open();
# stream-decompress it and parse each line with json.loads, as sketched below after the imports
# features:
# - multiple processes in parallel to maximize drive read and decompression
# - saves state as it completes each file and picks up where it left off if restarted
# - detailed progress indicators
# examples:
# - get all comments where the subreddit field ("subreddit" is the default for --field) is "wallstreetbets"
# python3 combine_folder_multiprocess.py reddit/comments reddit_final --name wallstreetbets_comments --value wallstreetbets
# - get all comments that have an author field of Watchful1
# python3 combine_folder_multiprocess.py reddit/comments reddit_final --name watchful1_comments --field author --value Watchful1
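The save_obj filter described in the header isn't shown in this diff; a minimal sketch of what its contract amounts to, assuming a plain field comparison (the real body may normalize case or handle missing fields differently):

def save_obj(obj, field, value):
	# keep the line when the parsed object's field matches the requested value
	return obj.get(field) == value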
import zstandard
import os
import json
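Since this commit switches the combined output from plain text to zst, reading it back needs a decompression stream. A minimal sketch using python-zstandard's documented streaming reader (the path is just what the first example command above would produce):

import io
import json
import zstandard

file_path = "reddit_final/wallstreetbets_comments.zst"  # output of the first example command
with open(file_path, 'rb') as file:
	reader = io.TextIOWrapper(zstandard.ZstdDecompressor().stream_reader(file), encoding='utf-8')
	for line in reader:
		obj = json.loads(line)  # one reddit object per line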
@@ -25,22 +46,6 @@ log_file_handler.setFormatter(log_formatter)
log.addHandler(log_file_handler)
# this script iterates through zst compressed ndjson files, like the pushshift reddit dumps, loads each line
# and passes it into the save_obj function, if the function returns true for a line, it's written out into a
# separate file for that month. After all the ndjson files are processed, it iterates through the resulting
# files and combines them into a final file.
# once complete, the combined file can easily be processed like
# with open(file_path, 'r') as file:
# for line in file:
# obj = json.loads(line)
# features:
# - multiple processes in parallel to maximize drive read and decompression
# - saves state as it completes each file and picks up where it stopped
# - detailed progress indicators
# convenience object used to pass status information between processes
class FileConfig:
	def __init__(self, input_path, output_path=None, complete=False, lines_processed=0, error_lines=0):
@@ -179,7 +184,7 @@ if __name__ == '__main__':
		log.setLevel(logging.DEBUG)
	log.info(f"Loading files from: {args.input}")
	log.info(f"Writing output to: {(os.path.join(args.output, args.name + '.txt'))}")
	log.info(f"Writing output to: {(os.path.join(args.output, args.name + '.zst'))}")
	multiprocessing.set_start_method('spawn')
	queue = multiprocessing.Manager().Queue()
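These two lines are the cross-process plumbing: a Manager().Queue() proxy works across the 'spawn' start method, letting workers push status updates back to the parent. A stripped-down sketch of the pattern (the worker body and file names are illustrative, not the script's):

import multiprocessing

def count_lines(file_path, queue):
	# stand-in for the real per-file work: report a result through the shared queue
	with open(file_path, 'r') as file:
		queue.put((file_path, sum(1 for _ in file)))

if __name__ == '__main__':
	multiprocessing.set_start_method('spawn')
	queue = multiprocessing.Manager().Queue()
	paths = ["one.ndjson", "two.ndjson"]  # hypothetical inputs
	workers = [multiprocessing.Process(target=count_lines, args=(path, queue)) for path in paths]
	for worker in workers:
		worker.start()
	for worker in workers:
		worker.join()
	while not queue.empty():
		print(queue.get())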
@@ -294,21 +299,17 @@ if __name__ == '__main__':
log.info(f"Processing complete, combining {len(working_file_paths)} result files")
output_lines = 0
output_file_path = os.path.join(args.output, args.name + ".txt")
output_file_path = os.path.join(args.output, args.name + ".zst")
# combine all the output files into the final results file
with open(output_file_path, 'w') as output_file:
i = 0
files_combined = 0
writer = zstandard.ZstdCompressor().stream_writer(output_file)
for working_file_path in working_file_paths:
i += 1
log.info(f"Reading {i}/{len(working_file_paths)}")
files_combined += 1
log.info(f"Reading {files_combined}/{len(working_file_paths)}")
with open(working_file_path, 'r') as input_file:
for line in input_file.readlines():
output_lines += 1
output_file.write(line)
writer.write(line.encode('utf-8'))
log.info(f"Finished combining files, {output_lines:,} lines written to {output_file_path}")
# test file sorting
# compress results
# example command line call in comment