Mirror of https://github.com/Watchful1/PushshiftDumps.git, synced 2025-07-23 06:40:47 -04:00
Bit more cleanup for combine and add count
commit cae4434c33
parent 894961c3ee
2 changed files with 345 additions and 10 deletions
@@ -23,6 +23,7 @@ import json
 import sys
 import time
 import argparse
+import re
 from datetime import datetime
 import logging.handlers
 import multiprocessing
@@ -77,7 +78,7 @@ class Queue:
 
 # save file information and progress to a json file
 # we don't want to save the whole FileConfig object, since some info resets if we restart
-def save_file_list(input_files, working_folder, status_json, arg_string):
+def save_file_list(input_files, working_folder, status_json, arg_string, script_type):
 	if not os.path.exists(working_folder):
 		os.makedirs(working_folder)
 	simple_file_list = []
@@ -86,7 +87,8 @@ def save_file_list(input_files, working_folder, status_json, arg_string):
 	with open(status_json, 'w') as status_json_file:
 		output_dict = {
 			"args": arg_string,
-			"files": simple_file_list
+			"type": script_type,
+			"files": simple_file_list,
 		}
 		status_json_file.write(json.dumps(output_dict, indent=4))
 
@@ -101,9 +103,9 @@ def load_file_list(status_json):
 				input_files.append(
 					FileConfig(simple_file[0], simple_file[1], simple_file[2], simple_file[3], simple_file[4])
 				)
-			return input_files, output_dict["args"]
+			return input_files, output_dict["args"], output_dict["type"]
 	else:
-		return None, None
+		return None, None, None
 
 
 # recursively decompress and decode a chunk of bytes. If there's a decode error then read another chunk and try with that, up to a limit of max_window_size bytes
@@ -144,7 +146,7 @@ def read_lines_zst(file_name):
 # information back to the parent via a queue
 def process_file(file, queue, field, value, values, case_sensitive):
 	output_file = None
-	log.debug(f"Starting file: {file.input_path}")
+	log.debug(f"Starting file: {file.input_path} : {file.file_size:,}")
 	try:
 		for line, file_bytes_processed in read_lines_zst(file.input_path):
 			try:
@@ -174,7 +176,7 @@ def process_file(file, queue, field, value, values, case_sensitive):
 
 		file.complete = True
 		file.bytes_processed = file.file_size
-		log.debug(f"Finished file: {file.input_path}")
+		log.debug(f"Finished file: {file.input_path} : {file.file_size:,}")
 	except Exception as err:
 		file.error_message = str(err)
 	queue.put(file)
@@ -190,11 +192,13 @@ if __name__ == '__main__':
 	parser.add_argument("--value", help="When deciding what lines to keep, compare the field to this value. Supports a comma separated list. This is case sensitive", default="pushshift")
 	parser.add_argument("--processes", help="Number of processes to use", default=10, type=int)
 	parser.add_argument("--case-sensitive", help="Matching should be case sensitive", action="store_true")
+	parser.add_argument("--file_filter", help="Regex filenames have to match to be processed", default="^rc_|rs_")
 	parser.add_argument(
 		"--error_rate", help=
 		"Percentage as an integer from 0 to 100 of the lines where the field can be missing. For the subreddit field especially, "
 		"there are a number of posts that simply don't have a subreddit attached", default=1, type=int)
 	parser.add_argument("--debug", help="Enable debug logging", action='store_const', const=True, default=False)
+	script_type = "split"
 
 	args = parser.parse_args()
 	arg_string = f"{args.field}:{args.value}:{args.case_sensitive}"
@@ -229,23 +233,27 @@ if __name__ == '__main__':
 	multiprocessing.set_start_method('spawn')
 	queue = multiprocessing.Manager().Queue()
 	status_json = os.path.join(args.working, "status.json")
-	input_files, saved_arg_string = load_file_list(status_json)
+	input_files, saved_arg_string, saved_type = load_file_list(status_json)
 	if saved_arg_string and saved_arg_string != arg_string:
 		log.warning(f"Args don't match args from json file. Delete working folder")
 		sys.exit(0)
 
+	if saved_type and saved_type != script_type:
+		log.warning(f"Script type doesn't match type from json file. Delete working folder")
+		sys.exit(0)
+
 	# if the file list wasn't loaded from the json, this is the first run, find what files we need to process
 	if input_files is None:
 		input_files = []
 		for subdir, dirs, files in os.walk(args.input):
 			files.sort()
 			for file_name in files:
-				if file_name.endswith(".zst"):
+				if file_name.endswith(".zst") and re.search(args.file_filter, file_name, re.IGNORECASE) is not None:
 					input_path = os.path.join(subdir, file_name)
 					output_path = os.path.join(args.working, file_name[:-4])
 					input_files.append(FileConfig(input_path, output_path=output_path))
 
-		save_file_list(input_files, args.working, status_json, arg_string)
+		save_file_list(input_files, args.working, status_json, arg_string, script_type)
 	else:
 		log.info(f"Existing input file was read, if this is not correct you should delete the {args.working} folder and run this script again")
 
@@ -303,7 +311,7 @@ if __name__ == '__main__':
 					files_errored += 1 if file.error_message is not None else 0
 					i += 1
 				if file_update.complete or file_update.error_message is not None:
-					save_file_list(input_files, args.working, status_json, arg_string)
+					save_file_list(input_files, args.working, status_json, arg_string, script_type)
 				current_time = time.time()
 				progress_queue.put([current_time, total_lines_processed, total_bytes_processed])
 
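For context on the status-file change above: save_file_list now records the script type next to the arg string, and load_file_list returns it so a restart can detect a mismatch. The following is a minimal standalone sketch of that round-trip; the folder name, arg string, and empty file list are illustrative, not taken from the commit.

import json
import os

# illustrative values only; the real script builds these from its arguments
working_folder = "working"
status_json = os.path.join(working_folder, "status.json")
arg_string = "subreddit:pushshift:False"
script_type = "split"

os.makedirs(working_folder, exist_ok=True)
with open(status_json, 'w') as status_json_file:
	output_dict = {
		"args": arg_string,
		"type": script_type,
		"files": [],  # simplified per-file progress rows go here
	}
	status_json_file.write(json.dumps(output_dict, indent=4))

# on a later run: reload and bail out if the args or the type don't match
with open(status_json, 'r') as status_json_file:
	loaded = json.load(status_json_file)
if loaded["args"] != arg_string or loaded["type"] != script_type:
	raise SystemExit("Delete working folder")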
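A quick illustration of the new --file_filter check: only .zst files whose names match the regex, case-insensitively, are queued. The default pattern ^rc_|rs_ accepts the usual comment (RC_) and submission (RS_) dump names; the filenames below are made up.

import re

file_filter = r"^rc_|rs_"  # default pattern from the commit

for file_name in ["RC_2023-01.zst", "RS_2023-01.zst", "authors.zst"]:
	# same test the script applies while walking the input folder
	keep = file_name.endswith(".zst") and re.search(file_filter, file_name, re.IGNORECASE) is not None
	print(f"{file_name}: {'queued' if keep else 'skipped'}")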