Mirror of https://github.com/Watchful1/PushshiftDumps.git
Synced 2025-07-04 03:16:40 -04:00
Work on multiprocess, change up argument format, handle comments and submissions at the same time, split the output
This commit is contained in:
parent c4d652d0cf
commit 1a3789c298
2 changed files with 89 additions and 58 deletions
@@ -8,17 +8,18 @@ log = discord_logging.init_logging()
 
 if __name__ == "__main__":
 	subreddits = {}
-	object_type = "comments"
-	folder = f"\\\\MYCLOUDPR4100\\Public\\reddit_final\\ratmanreturns265_{object_type}"
+	field = 'subreddit'
+	object_type = "submissions"
+	folder = f"\\\\MYCLOUDPR4100\\Public\\reddit_final\\multisub_{object_type}"
 	if not os.path.exists(folder):
 		os.makedirs(folder)
-	input_file = f"\\\\MYCLOUDPR4100\\Public\\reddit_final\\ratmanreturns265_{object_type}.zst"
+	input_file = f"\\\\MYCLOUDPR4100\\Public\\reddit_final\\multisub_{object_type}.zst"
 	input_file_size = os.stat(input_file).st_size
 	total_lines = 0
 	for comment, line, file_bytes_processed in utils.read_obj_zst_meta(input_file):
-		if comment['subreddit'] not in subreddits:
-			subreddits[comment['subreddit']] = {'writer': utils.OutputZst(os.path.join(folder, comment['subreddit'] + f"_{object_type}.zst")), 'lines': 0}
-		subreddit = subreddits[comment['subreddit']]
+		if comment[field] not in subreddits:
+			subreddits[comment[field]] = {'writer': utils.OutputZst(os.path.join(folder, comment[field] + f"_{object_type}.zst")), 'lines': 0}
+		subreddit = subreddits[comment[field]]
 		subreddit['writer'].write(line)
 		subreddit['writer'].write("\n")
 		subreddit['lines'] += 1

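The script above leans on two helpers from the repo's utils module that this diff doesn't touch. A minimal sketch of what they plausibly look like, assuming the usual streaming-zstd ndjson pattern (not the repo's exact code):

import json
import zstandard

# sketch: stream json objects out of a zst-compressed ndjson dump, yielding
# the parsed object, the raw line, and compressed bytes consumed (approximate)
def read_obj_zst_meta(file_name, chunk_size=2**27):
	with open(file_name, 'rb') as file_handle:
		reader = zstandard.ZstdDecompressor(max_window_size=2**31).stream_reader(file_handle)
		buffer = ''
		while True:
			chunk = reader.read(chunk_size)
			if not chunk:
				break
			lines = (buffer + chunk.decode('utf-8', errors='ignore')).split("\n")
			for line in lines[:-1]:
				yield json.loads(line), line, file_handle.tell()
			buffer = lines[-1]
		reader.close()

# sketch: wrap a zstd stream_writer so callers can .write() plain strings
class OutputZst:
	def __init__(self, file_name):
		self.file_handle = open(file_name, 'wb')
		self.writer = zstandard.ZstdCompressor().stream_writer(self.file_handle)

	def write(self, value):
		self.writer.write(value.encode('utf-8'))

	def close(self):
		self.writer.close()
		self.file_handle.close()
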
@@ -3,16 +3,19 @@
 # that month. After all the ndjson files are processed, it iterates through the resulting files and combines
 # them into a final file.
 
+# this script assumes the files are named in chronological order and prefixed with RS_ or RC_, like the pushshift dumps
+
 # features:
 # - multiple processes in parallel to maximize drive read and decompression
 # - saves state as it completes each file and picks up where it stopped
 # - detailed progress indicators
 
 # examples:
-# - get all comments that have a subreddit field (subreddit is the default) of "wallstreetbets"
-# python3 combine_folder_multiprocess.py reddit/comments reddit_final --name wallstreetbets_comments --value wallstreetbets
-# - get all comments that have an author field of Watchful1
-# python3 combine_folder_multiprocess.py reddit/comments reddit_final --name watchful1_comments --field author --value Watchful1
+# - get all comments that have a subreddit field (subreddit is the default) of "wallstreetbets". This will create a single output file "wallstreetbets_comments.zst" in the folder the script is run in
+# python3 combine_folder_multiprocess.py reddit/comments --value wallstreetbets
+# - get all comments and submissions (assuming both types of dump files are under the reddit folder) that have an author field of Watchful1 or spez and output the results to a folder called pushshift.
+# This will result in four files, pushshift/Watchful1_comments, pushshift/Watchful1_submissions, pushshift/spez_comments, pushshift/spez_submissions
+# python3 combine_folder_multiprocess.py reddit --field author --value Watchful1,spez --output pushshift
 
 import zstandard
 import os

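The RS_/RC_ naming assumption called out above is load-bearing: the combine step later in this diff keys the output type on those prefixes. An illustrative pre-flight check for stray files (folder name assumed):

import os

# illustrative: flag dump files the RS_/RC_ convention won't recognize
input_folder = "reddit"  # assumed layout
for subdir, dirs, files in os.walk(input_folder):
	for file_name in files:
		if file_name.endswith(".zst") and not file_name.startswith(("RS_", "RC_")):
			print(f"unrecognized dump file: {os.path.join(subdir, file_name)}")
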
@@ -72,31 +75,22 @@ class Queue:
 		return self.list[0] if len(self.list) > 0 else None
 
 
-# builds file paths
-def folder_helper(output_folder, output_file_name):
-	working_folder = os.path.join(output_folder, "pushshift_working_dir_" + output_file_name)
-	status_file = os.path.join(working_folder, output_file_name + ".json")
-	return working_folder, status_file
-
-
 # save file information and progress to a json file
-# we don't want to save the whole FileConfig object, since some of the info resets if we restart
-def save_file_list(input_files, output_folder, output_file_name):
-	working_folder, status_json_file_name = folder_helper(output_folder, output_file_name)
+# we don't want to save the whole FileConfig object, since some info resets if we restart
+def save_file_list(input_files, working_folder, status_json):
 	if not os.path.exists(working_folder):
 		os.makedirs(working_folder)
 	simple_file_list = []
 	for file in input_files:
 		simple_file_list.append([file.input_path, file.output_path, file.complete, file.lines_processed, file.error_lines])
-	with open(status_json_file_name, 'w') as status_json_file:
+	with open(status_json, 'w') as status_json_file:
 		status_json_file.write(json.dumps(simple_file_list, indent=4))
 
 
 # load file information from the json file and recalculate file sizes
-def load_file_list(output_folder, output_file_name):
-	_, status_json_file_name = folder_helper(output_folder, output_file_name)
-	if os.path.exists(status_json_file_name):
-		with open(status_json_file_name, 'r') as status_json_file:
+def load_file_list(status_json):
+	if os.path.exists(status_json):
+		with open(status_json, 'r') as status_json_file:
 			simple_file_list = json.load(status_json_file)
 			input_files = []
 			for simple_file in simple_file_list:

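Given the fields save_file_list serializes, the status.json in the working folder ends up holding one entry per input file, shaped like this (paths illustrative):

[
    [
        "reddit/comments/RC_2021-06.zst",
        "pushshift_working/RC_2021-06",
        false,
        0,
        0
    ]
]

Dropping folder_helper means the resume state now lives wherever --working points, decoupled from the output name.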
@@ -144,7 +138,7 @@ def read_lines_zst(file_name):
 # base of each separate process. Loads a file, iterates through lines and writes out
 # the ones where the `field` of the object matches `value`. Also passes status
 # information back to the parent via a queue
-def process_file(file, working_folder, queue, field, value, values, case_sensitive):
+def process_file(file, queue, field, value, values, case_sensitive):
 	output_file = None
 	try:
 		for line, file_bytes_processed in read_lines_zst(file.input_path):

@@ -160,9 +154,6 @@ def process_file(file, working_folder, queue, field, value, values, case_sensitive):
 
 			if matched:
 				if output_file is None:
-					if file.output_path is None:
-						created = datetime.utcfromtimestamp(int(obj['created_utc']))
-						file.output_path = os.path.join(working_folder, created.strftime("%Y-%m"))
 					output_file = open(file.output_path, 'w')
 				output_file.write(line)
 				output_file.write("\n")

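The matched flag tested above is computed earlier in process_file, outside this hunk; given the arguments the function receives, the check is presumably along these lines (a sketch, not the diffed code):

import json

# sketch of the per-line filter implied by field/value/values/case_sensitive
def line_matches(line, field, value, values, case_sensitive):
	obj = json.loads(line)
	observed = obj[field] if case_sensitive else obj[field].lower()
	if values:
		return observed in values  # --value was a comma separated list
	return observed == value
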
@@ -185,9 +176,10 @@ def process_file(file, working_folder, queue, field, value, values, case_sensitive):
 
 if __name__ == '__main__':
 	parser = argparse.ArgumentParser(description="Use multiple processes to decompress and iterate over pushshift dump files")
-	parser.add_argument("input", help="The input folder to read files from")
-	parser.add_argument("output", help="The output folder to store temporary files in and write the output to")
-	parser.add_argument("--name", help="What to name the output file", default="pushshift")
+	parser.add_argument("input", help="The input folder to recursively read files from")
+	parser.add_argument("--split", help="Split the output into separate files by the filter fields, only applies if there's multiple fields", action='store_const', const=True, default=True)
+	parser.add_argument("--output", help="Put the output files in this folder", default="")
+	parser.add_argument("--working", help="The folder to store temporary files in", default="pushshift_working")
 	parser.add_argument("--field", help="When deciding what lines to keep, use this field for comparisons", default="subreddit")
 	parser.add_argument("--value", help="When deciding what lines to keep, compare the field to this value. Supports a comma separated list. This is case sensitive", default="pushshift")
 	parser.add_argument("--processes", help="Number of processes to use", default=10, type=int)

@@ -204,7 +196,7 @@ if __name__ == '__main__':
 		log.setLevel(logging.DEBUG)
 
 	log.info(f"Loading files from: {args.input}")
-	log.info(f"Writing output to: {(os.path.join(args.output, args.name + '.zst'))}")
+	log.info(f"Writing output to: {args.output}.zst")
 
 	if not args.case_sensitive:
 		args.value = args.value.lower()

@@ -226,8 +218,8 @@ if __name__ == '__main__':
 
 	multiprocessing.set_start_method('spawn')
 	queue = multiprocessing.Manager().Queue()
-	input_files = load_file_list(args.output, args.name)
-	working_folder, _ = folder_helper(args.output, args.name)
+	status_json = os.path.join(args.working, "status.json")
+	input_files = load_file_list(status_json)
 	# if the file list wasn't loaded from the json, this is the first run, find what files we need to process
 	if input_files is None:
 		input_files = []

@@ -236,9 +228,12 @@ if __name__ == '__main__':
 			for file_name in files:
 				if file_name.endswith(".zst"):
 					input_path = os.path.join(subdir, file_name)
-					input_files.append(FileConfig(input_path))
+					output_path = os.path.join(args.working, file_name[:-4])
+					input_files.append(FileConfig(input_path, output_path=output_path))
 
-		save_file_list(input_files, args.output, args.name)
+		save_file_list(input_files, args.working, status_json)
+	else:
+		log.info(f"Existing input file was read, if this is not correct you should delete the {args.working} folder and run this script again")
 
 	files_processed = 0
 	total_bytes = 0

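FileConfig itself is outside this diff; a minimal sketch of the shape the code above relies on (assumed, reconstructed from the fields referenced):

import os

class FileConfig:
	# sketch: only the attributes this diff reads and writes
	def __init__(self, input_path, output_path=None, complete=False, lines_processed=0, error_lines=0):
		self.input_path = input_path
		self.output_path = output_path
		self.file_size = os.stat(input_path).st_size
		self.complete = complete
		self.lines_processed = lines_processed
		self.error_lines = error_lines
		self.error_message = None

Assigning each input file a fixed output_path under --working up front is what lets process_file drop the working_folder parameter and the date-based naming.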
@@ -269,7 +264,7 @@ if __name__ == '__main__':
 		log.info(f"Processing file: {file.input_path}")
 	# start the workers
 	with multiprocessing.Pool(processes=min(args.processes, len(files_to_process))) as pool:
-		workers = pool.starmap_async(process_file, [(file, working_folder, queue, args.field, value, values, args.case_sensitive) for file in files_to_process], error_callback=log.info)
+		workers = pool.starmap_async(process_file, [(file, queue, args.field, value, values, args.case_sensitive) for file in files_to_process], error_callback=log.info)
 		while not workers.ready():
 			# loop until the workers are all done, pulling in status messages as they are sent
 			file_update = queue.get()

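The starmap_async call pairs with the Manager queue so workers can stream status updates back while the parent polls for completion. The pattern in isolation (illustrative names, not the script itself):

import multiprocessing
from queue import Empty

def worker(name, status_queue):
	# stand-in for process_file: do some work, then report back
	status_queue.put(f"{name} complete")

if __name__ == '__main__':
	multiprocessing.set_start_method('spawn')
	status_queue = multiprocessing.Manager().Queue()
	files = ["RC_2021-06.zst", "RS_2021-06.zst"]  # illustrative
	with multiprocessing.Pool(processes=2) as pool:
		workers = pool.starmap_async(worker, [(name, status_queue) for name in files], error_callback=print)
		while not workers.ready():
			try:
				print(status_queue.get(timeout=1))
			except Empty:
				pass

A Manager queue is used because a plain multiprocessing.Queue can't be passed to pool workers as an argument.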
@@ -294,7 +289,7 @@ if __name__ == '__main__':
 				files_errored += 1 if file.error_message is not None else 0
 				i += 1
 			if file_update.complete or file_update.error_message is not None:
-				save_file_list(input_files, args.output, args.name)
+				save_file_list(input_files, args.working, status_json)
 			current_time = time.time()
 			progress_queue.put([current_time, total_lines_processed, total_bytes_processed])
 

@@ -317,7 +312,7 @@ if __name__ == '__main__':
 	working_file_paths = []
 	count_incomplete = 0
 	# build a list of output files to combine
-	for file in input_files:
+	for file in sorted(input_files, key=lambda item: os.path.split(item.output_path)[1]):
 		if not file.complete:
 			if file.error_message is not None:
 				log.info(f"File {file.input_path} errored {file.error_message}")

@@ -331,10 +326,7 @@ if __name__ == '__main__':
 					f"{(file.error_lines / file.lines_processed) * (args.error_rate * 0.01):.2f}% which is above the limit of {args.error_rate}%")
 				count_incomplete += 1
-		elif file.output_path is not None:
-			if not os.path.exists(file.output_path):
-				log.info(f"Output file {file.output_path} doesn't exist")
-				count_incomplete += 1
-			else:
+		else:
+			if os.path.exists(file.output_path):
 				working_file_paths.append(file.output_path)
 
 	if count_incomplete > 0:

@@ -344,18 +336,56 @@ if __name__ == '__main__':
 	log.info(f"Processing complete, combining {len(working_file_paths)} result files")
 
 	output_lines = 0
-	output_file_path = os.path.join(args.output, args.name + ".zst")
-	# combine all the output files into the final results file
-	with open(output_file_path, 'wb') as output_file:
-		files_combined = 0
-		writer = zstandard.ZstdCompressor().stream_writer(output_file)
-		for working_file_path in working_file_paths:
-			files_combined += 1
-			log.info(f"Reading {files_combined}/{len(working_file_paths)}")
-			with open(working_file_path, 'r') as input_file:
-				for line in input_file:
-					output_lines += 1
-					encoded_line = line.encode('utf-8')
-					writer.write(encoded_line)
-
-	log.info(f"Finished combining files, {output_lines:,} lines written to {output_file_path}")
+	all_handles = []
+	output_handles = {}
+	files_combined = 0
+	if args.split and values:
+		split = True
+	else:
+		split = False
+	for working_file_path in working_file_paths:
+		files_combined += 1
+		log.info(f"Reading {files_combined}/{len(working_file_paths)} : {os.path.split(working_file_path)[1]}")
+		working_file_name = os.path.split(working_file_path)[1]
+		if working_file_name.startswith("RS"):
+			file_type = "submissions"
+		elif working_file_name.startswith("RC"):
+			file_type = "comments"
+		else:
+			log.warning(f"Unknown working file type, skipping: {working_file_name}")
+			continue
+		if file_type not in output_handles:
+			output_handles[file_type] = {}
+		file_type_handles = output_handles[file_type]
+		with open(working_file_path, 'r') as input_file:
+			for line in input_file:
+				output_lines += 1
+				if split:
+					obj = json.loads(line)
+					observed_case = obj[args.field]
+				else:
+					observed_case = value
+				observed = observed_case if args.case_sensitive else observed_case.lower()
+				if observed not in file_type_handles:
+					if args.output:
+						if not os.path.exists(args.output):
+							os.makedirs(args.output)
+						output_file_path = os.path.join(args.output, f"{observed_case}_{file_type}.zst")
+					else:
+						output_file_path = f"{observed_case}_{file_type}.zst"
+					log.info(f"Writing to file {output_file_path}")
+					file_handle = open(output_file_path, 'wb')
+					writer = zstandard.ZstdCompressor().stream_writer(file_handle)
+					file_type_handles[observed] = writer
+					all_handles.append(writer)
+					all_handles.append(file_handle)
+				else:
+					writer = file_type_handles[observed]
+				encoded_line = line.encode('utf-8')
+				writer.write(encoded_line)
+
+	for handle in all_handles:
+		handle.close()
+
+	log.info(f"Finished combining files, {output_lines:,} lines written")

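The rewritten combine stage fans each line out to one compressed writer per (file type, observed value) pair, so the Watchful1,spez example from the header comments yields four output files. The handle-caching pattern distilled (illustrative helper, not in the script):

import zstandard

output_handles = {}
all_handles = []

# sketch: fetch the writer for this (file_type, observed) pair, creating it
# lazily on first use and remembering every handle for final cleanup
def get_writer(file_type, observed):
	file_type_handles = output_handles.setdefault(file_type, {})
	if observed not in file_type_handles:
		file_handle = open(f"{observed}_{file_type}.zst", 'wb')
		writer = zstandard.ZstdCompressor().stream_writer(file_handle)
		file_type_handles[observed] = writer
		all_handles.append(writer)
		all_handles.append(file_handle)
	return file_type_handles[observed]

Appending each writer before its file handle means the final close loop flushes every zstd frame before the underlying file is closed.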