Mirror of https://github.com/Watchful1/PushshiftDumps.git, synced 2025-07-26 16:15:37 -04:00

Try to redo logging
This commit is contained in:
parent 83228b7b0a
commit b312694821

1 changed file with 8 additions and 13 deletions

@@ -62,7 +62,7 @@ class Queue:
 # we don't want to save the whole FileConfig object, since some info resets if we restart
 def save_file_list(input_files, status_json, script_type):
 	simple_file_list = []
-	for file in input_files:
+	for file in input_files.values():
 		simple_file_list.append([file.input_path, file.output_path, file.complete, file.lines_processed, file.error_lines])
 	with open(status_json, 'w') as status_json_file:
 		output_dict = {

@@ -79,9 +79,7 @@ def load_file_list(status_json):
 		output_dict = json.load(status_json_file)
 		input_files = []
 		for simple_file in output_dict["files"]:
-			input_files.append(
-				FileConfig(simple_file[0], simple_file[1], simple_file[2], simple_file[3], simple_file[4])
-			)
+			input_files[simple_file[0]] = FileConfig(simple_file[0], simple_file[1], simple_file[2], simple_file[3], simple_file[4])
 		return input_files, output_dict["type"]
 	else:
 		return None, None
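
For context, a minimal self-contained sketch of the save/load round-trip these two functions implement after this change: the dict of FileConfig objects keyed by input_path is flattened to a list of lists for the status JSON and rebuilt on load. The FileConfig class below is a simplified stand-in for the script's real class, and the loader here initializes input_files as a dict, so it shows the intended end state rather than copying the hunk line for line.

import json
import os

# simplified stand-in for the script's FileConfig; the real class carries more state
class FileConfig:
	def __init__(self, input_path, output_path=None, complete=False, lines_processed=0, error_lines=0):
		self.input_path = input_path
		self.output_path = output_path
		self.complete = complete
		self.lines_processed = lines_processed
		self.error_lines = error_lines

def save_file_list(input_files, status_json, script_type):
	# flatten each FileConfig into a plain list so the status file stays valid JSON
	simple_file_list = [
		[f.input_path, f.output_path, f.complete, f.lines_processed, f.error_lines]
		for f in input_files.values()]
	with open(status_json, 'w') as status_json_file:
		json.dump({"files": simple_file_list, "type": script_type}, status_json_file)

def load_file_list(status_json):
	if os.path.exists(status_json):
		with open(status_json, 'r') as status_json_file:
			output_dict = json.load(status_json_file)
		# rebuild the dict keyed by input_path, matching the new in-memory layout
		input_files = {}
		for simple_file in output_dict["files"]:
			input_files[simple_file[0]] = FileConfig(
				simple_file[0], simple_file[1], simple_file[2], simple_file[3], simple_file[4])
		return input_files, output_dict["type"]
	else:
		return None, None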

@@ -166,13 +164,13 @@ if __name__ == '__main__':
 
 	# if the file list wasn't loaded from the json, this is the first run, find what files we need to process
 	if input_files is None:
-		input_files = []
+		input_files = {}
 		for subdir, dirs, files in os.walk(args.input):
 			files.sort()
 			for file_name in files:
 				if file_name.endswith(".zst"):
 					input_path = os.path.join(subdir, file_name)
-					input_files.append(FileConfig(input_path))
+					input_files[input_path] = FileConfig(input_path)
 
 		save_file_list(input_files, status_json, script_type)
 	else:
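
A minimal standalone sketch of this first-run discovery step under the new dict layout. The build_file_dict helper name is hypothetical and SimpleNamespace stands in for FileConfig: walk the input folder, keep only .zst files, and key each entry by its full path.

import os
from types import SimpleNamespace

def build_file_dict(input_folder):
	# hypothetical helper mirroring the first-run branch in the hunk above
	input_files = {}
	for subdir, dirs, files in os.walk(input_folder):
		files.sort()
		for file_name in files:
			if file_name.endswith(".zst"):
				input_path = os.path.join(subdir, file_name)
				# stand-in for FileConfig(input_path)
				input_files[input_path] = SimpleNamespace(input_path=input_path, complete=False)
	return input_files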

@@ -186,7 +184,7 @@ if __name__ == '__main__':
 	files_to_process = []
 	# calculate the total file size for progress reports, build a list of incomplete files to process
 	# do this largest to smallest by file size so that we aren't processing a few really big files with only a few threads at the end
-	for file in sorted(input_files, key=lambda item: item.file_size, reverse=True):
+	for file in sorted(input_files.values(), key=lambda item: item.file_size, reverse=True):
 		total_bytes += file.file_size
 		if file.complete:
 			files_processed += 1
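
A small self-contained illustration of what the dict buys here: direct lookup by path, while sorted(input_files.values(), key=..., reverse=True) still yields the largest files first, as the comment above describes. The paths and sizes are invented for the example, with SimpleNamespace standing in for FileConfig.

from types import SimpleNamespace

# hypothetical stand-ins for FileConfig objects; only file_size matters here
input_files = {
	"dump/a.zst": SimpleNamespace(input_path="dump/a.zst", file_size=500),
	"dump/b.zst": SimpleNamespace(input_path="dump/b.zst", file_size=9000),
	"dump/c.zst": SimpleNamespace(input_path="dump/c.zst", file_size=120),
}

# direct lookup by path, which the old list layout needed a linear scan for
print(input_files["dump/b.zst"].file_size)  # 9000

# largest-to-smallest iteration, unchanged behavior apart from .values()
for file in sorted(input_files.values(), key=lambda item: item.file_size, reverse=True):
	print(file.input_path, file.file_size)  # b.zst, then a.zst, then c.zst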

@@ -215,9 +213,9 @@ if __name__ == '__main__':
 			if file_update.error_message is not None:
 				log.warning(f"File failed {file_update.input_path}: {file_update.error_message}")
 			current_time = time.time()
 
+			input_files[file_update.input_path] = file_update
 			if last_log_time is None or (current_time - last_log_time) > 5 or queue.empty():
-				# I'm going to assume that the list of files is short enough that it's no
-				# big deal to just iterate each time since that saves a bunch of work
 				total_lines_processed = 0
 				total_bytes_processed = 0
 				total_lines_errored = 0

@@ -225,9 +223,6 @@ if __name__ == '__main__':
 				files_errored = 0
 				i = 0
 				for file in input_files:
-					if file.input_path == file_update.input_path:
-						input_files[i] = file_update
-						file = file_update
 					total_lines_processed += file.lines_processed
 					total_bytes_processed += file.bytes_processed
 					total_lines_errored += file.error_lines
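
The two hunks above are the core of the change: the worker's file_update used to be merged back by scanning the list for a matching path, and is now applied with one keyed assignment before the totals are recomputed. A rough self-contained sketch of both patterns follows; SimpleNamespace stands in for FileConfig, and the totals loop iterates .values(), since iterating a dict directly yields its keys.

from types import SimpleNamespace

# hypothetical stand-ins; the real objects are FileConfig instances from the script
input_files = {
	"dump/a.zst": SimpleNamespace(input_path="dump/a.zst", lines_processed=0),
	"dump/b.zst": SimpleNamespace(input_path="dump/b.zst", lines_processed=0),
}
file_update = SimpleNamespace(input_path="dump/a.zst", lines_processed=250_000)

# old approach (list-based): scan for the matching path and overwrite it by index
files_as_list = list(input_files.values())
for i, file in enumerate(files_as_list):
	if file.input_path == file_update.input_path:
		files_as_list[i] = file_update

# new approach (dict-based): one keyed assignment, done before totals are recomputed
input_files[file_update.input_path] = file_update

total_lines_processed = 0
for file in input_files.values():
	total_lines_processed += file.lines_processed
print(total_lines_processed)  # 250000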

@@ -251,7 +246,7 @@ if __name__ == '__main__':
 					f"{(total_bytes_processed / (2**30)):.2f} gb at {(bytes_per_second / (2**20)):,.0f} mb/s, {(total_bytes_processed / total_bytes) * 100:.0f}% : "
 					f"{files_processed}({files_errored})/{len(input_files)} files : "
 					f"{(str(days_left) + 'd ' if days_left > 0 else '')}{hours_left - (days_left * 24)}:{minutes_left - (hours_left * 60):02}:{seconds_left - (minutes_left * 60):02} remaining : "
-					f"{queue.qsize()} files in queue")
+					f"{queue.qsize()} files in queue : {current_time} : {last_log_time} : {current_time - last_log_time if last_log_time is not None else 0} : {queue.empty()}")
 				last_log_time = time.time()
 
 	log.info(f"{total_lines_processed:,}, {total_lines_errored} errored : {(total_bytes_processed / (2**30)):.2f} gb, {(total_bytes_processed / total_bytes) * 100:.0f}% : {files_processed}/{len(input_files)}")
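
For reference on the unchanged remaining-time fields in this log line: seconds_left, minutes_left, hours_left, and days_left read as cumulative totals in each unit rather than remainders, which is why the format string subtracts the larger unit back out. A minimal sketch of that arithmetic, assuming (as the surrounding code suggests) that seconds_left comes from the unprocessed bytes divided by current throughput; the numbers are invented.

total_bytes = 50 * 2**30            # hypothetical 50 gb of input
total_bytes_processed = 20 * 2**30  # hypothetical 20 gb already done
bytes_per_second = 25 * 2**20       # hypothetical 25 mb/s throughput

# each *_left value is a cumulative total in that unit
seconds_left = int((total_bytes - total_bytes_processed) / bytes_per_second)
minutes_left = int(seconds_left / 60)
hours_left = int(minutes_left / 60)
days_left = int(hours_left / 24)

# same layout as the progress log line: [Nd ]h:mm:ss remaining
print(
	f"{(str(days_left) + 'd ' if days_left > 0 else '')}"
	f"{hours_left - (days_left * 24)}:"
	f"{minutes_left - (hours_left * 60):02}:"
	f"{seconds_left - (minutes_left * 60):02} remaining")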