mirror of https://github.com/Watchful1/PushshiftDumps.git
synced 2025-07-25 23:55:18 -04:00
Save the arguments in the status json so we don't accidentally reuse the same files for a different run
This commit is contained in:
parent edf82d3d90
commit 894961c3ee

1 changed file with 24 additions and 10 deletions
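For context, the commit message above describes the new on-disk format: save_file_list now writes a dict containing the run's arguments alongside the file list, instead of the bare file list. A rough sketch of what the resulting status.json holds (field, value, paths and counts are invented for illustration, not taken from the repo):

import json

# "args" is the arg_string built as f"{args.field}:{args.value}:{args.case_sensitive}";
# each "files" entry is [input_path, output_path, complete, lines_processed, error_lines].
example_status = {
	"args": "body:bitcoin:False",
	"files": [
		["reddit/comments/RC_2023-01.zst", "working/RC_2023-01", False, 0, 0]
	]
}
print(json.dumps(example_status, indent=4))

On a later run, load_file_list reads this back and the script compares the saved "args" string against the current one before reusing the working folder.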
@@ -77,29 +77,33 @@ class Queue:

 # save file information and progress to a json file
 # we don't want to save the whole FileConfig object, since some info resets if we restart
-def save_file_list(input_files, working_folder, status_json):
+def save_file_list(input_files, working_folder, status_json, arg_string):
 	if not os.path.exists(working_folder):
 		os.makedirs(working_folder)
 	simple_file_list = []
 	for file in input_files:
 		simple_file_list.append([file.input_path, file.output_path, file.complete, file.lines_processed, file.error_lines])
 	with open(status_json, 'w') as status_json_file:
-		status_json_file.write(json.dumps(simple_file_list, indent=4))
+		output_dict = {
+			"args": arg_string,
+			"files": simple_file_list
+		}
+		status_json_file.write(json.dumps(output_dict, indent=4))


 # load file information from the json file and recalculate file sizes
 def load_file_list(status_json):
 	if os.path.exists(status_json):
 		with open(status_json, 'r') as status_json_file:
-			simple_file_list = json.load(status_json_file)
+			output_dict = json.load(status_json_file)
 			input_files = []
-			for simple_file in simple_file_list:
+			for simple_file in output_dict["files"]:
 				input_files.append(
 					FileConfig(simple_file[0], simple_file[1], simple_file[2], simple_file[3], simple_file[4])
 				)
-			return input_files
+			return input_files, output_dict["args"]
 	else:
-		return None
+		return None, None


 # recursively decompress and decode a chunk of bytes. If there's a decode error then read another chunk and try with that, up to a limit of max_window_size bytes
@@ -140,6 +144,7 @@ def read_lines_zst(file_name):
 # information back to the parent via a queue
 def process_file(file, queue, field, value, values, case_sensitive):
 	output_file = None
+	log.debug(f"Starting file: {file.input_path}")
 	try:
 		for line, file_bytes_processed in read_lines_zst(file.input_path):
 			try:
@@ -169,6 +174,7 @@ def process_file(file, queue, field, value, values, case_sensitive):

 		file.complete = True
 		file.bytes_processed = file.file_size
+		log.debug(f"Finished file: {file.input_path}")
 	except Exception as err:
 		file.error_message = str(err)
 	queue.put(file)
@@ -191,12 +197,16 @@ if __name__ == '__main__':
 	parser.add_argument("--debug", help="Enable debug logging", action='store_const', const=True, default=False)

 	args = parser.parse_args()
+	arg_string = f"{args.field}:{args.value}:{args.case_sensitive}"

 	if args.debug:
 		log.setLevel(logging.DEBUG)

 	log.info(f"Loading files from: {args.input}")
-	log.info(f"Writing output to: {args.output}.zst")
+	if args.output:
+		log.info(f"Writing output to: {args.output}")
+	else:
+		log.info(f"Writing output to working folder")

 	if not args.case_sensitive:
 		args.value = args.value.lower()
@@ -219,7 +229,11 @@ if __name__ == '__main__':
 	multiprocessing.set_start_method('spawn')
 	queue = multiprocessing.Manager().Queue()
 	status_json = os.path.join(args.working, "status.json")
-	input_files = load_file_list(status_json)
+	input_files, saved_arg_string = load_file_list(status_json)
+	if saved_arg_string and saved_arg_string != arg_string:
+		log.warning(f"Args don't match args from json file. Delete working folder")
+		sys.exit(0)
+
 	# if the file list wasn't loaded from the json, this is the first run, find what files we need to process
 	if input_files is None:
 		input_files = []
@@ -231,7 +245,7 @@ if __name__ == '__main__':
 					output_path = os.path.join(args.working, file_name[:-4])
 					input_files.append(FileConfig(input_path, output_path=output_path))

-		save_file_list(input_files, args.working, status_json)
+		save_file_list(input_files, args.working, status_json, arg_string)
 	else:
 		log.info(f"Existing input file was read, if this is not correct you should delete the {args.working} folder and run this script again")

@@ -289,7 +303,7 @@ if __name__ == '__main__':
 				files_errored += 1 if file.error_message is not None else 0
 				i += 1
 			if file_update.complete or file_update.error_message is not None:
-				save_file_list(input_files, args.working, status_json)
+				save_file_list(input_files, args.working, status_json, arg_string)
 			current_time = time.time()
 			progress_queue.put([current_time, total_lines_processed, total_bytes_processed])

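A small, illustrative way to check which run an existing working folder belongs to after this change; the folder name here is an assumption, not something defined by the repo:

import json
import os

working_folder = "working"  # assumed path to the working folder used for the run
status_json = os.path.join(working_folder, "status.json")
if os.path.exists(status_json):
	with open(status_json) as status_json_file:
		print(json.load(status_json_file).get("args"))

If the printed string does not match the arguments you are about to run with, delete the working folder first, which is exactly the situation the new saved_arg_string check warns about.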