mirror of
https://github.com/Watchful1/PushshiftDumps.git
synced 2025-07-03 19:06:39 -04:00
Try building monthly count files
This commit is contained in:
parent
ee10be10b9
commit
1fa0f7a7a7
1 changed files with 158 additions and 132 deletions
|
@ -29,9 +29,10 @@ log.addHandler(log_file_handler)
|
|||
|
||||
# convenience object used to pass status information between processes
|
||||
class FileConfig:
|
||||
def __init__(self, input_path, output_path=None, complete=False, lines_processed=0, error_lines=0):
|
||||
def __init__(self, input_path, output_path=None, complete=False, lines_processed=0, error_lines=0, count_file_path=None):
|
||||
self.input_path = input_path
|
||||
self.output_path = output_path
|
||||
self.count_file_path = count_file_path
|
||||
self.file_size = os.stat(input_path).st_size
|
||||
self.complete = complete
|
||||
self.bytes_processed = self.file_size if complete else 0
|
||||
|
@ -60,16 +61,17 @@ class Queue:
|
|||
|
||||
# save file information and progress to a json file
|
||||
# we don't want to save the whole FileConfig object, since some info resets if we restart
|
||||
def save_file_list(input_files, working_folder, status_json, script_type):
|
||||
def save_file_list(input_files, working_folder, status_json, script_type, stage):
|
||||
if not os.path.exists(working_folder):
|
||||
os.makedirs(working_folder)
|
||||
simple_file_list = []
|
||||
for file in input_files:
|
||||
simple_file_list.append([file.input_path, file.output_path, file.complete, file.lines_processed, file.error_lines])
|
||||
simple_file_list.append([file.input_path, file.output_path, file.complete, file.lines_processed, file.error_lines, file.monthly_count_file])
|
||||
with open(status_json, 'w') as status_json_file:
|
||||
output_dict = {
|
||||
"files": simple_file_list,
|
||||
"type": script_type,
|
||||
"stage": stage,
|
||||
}
|
||||
status_json_file.write(json.dumps(output_dict, indent=4))
|
||||
|
||||
|
@ -82,11 +84,11 @@ def load_file_list(status_json):
|
|||
input_files = []
|
||||
for simple_file in output_dict["files"]:
|
||||
input_files.append(
|
||||
FileConfig(simple_file[0], simple_file[1], simple_file[2], simple_file[3], simple_file[4])
|
||||
FileConfig(simple_file[0], simple_file[1], simple_file[2], simple_file[3], simple_file[4], simple_file[5] if len(simple_file) > 5 else None)
|
||||
)
|
||||
return input_files, output_dict["type"]
|
||||
return input_files, output_dict["type"], output_dict["stage"]
|
||||
else:
|
||||
return None, None
|
||||
return None, None, "count"
|
||||
|
||||
|
||||
# recursively decompress and decode a chunk of bytes. If there's a decode error then read another chunk and try with that, up to a limit of max_window_size bytes
|
||||
|
@ -158,6 +160,7 @@ if __name__ == '__main__':
|
|||
parser.add_argument("input", help="The input folder to recursively read files from")
|
||||
parser.add_argument("--output", help="Name of the output file", default="field_counts")
|
||||
parser.add_argument("--working", help="The folder to store temporary files in", default="pushshift_working")
|
||||
parser.add_argument("--monthly_count_folder", help="The folder to store monthly count files in", default="pushshift_counts")
|
||||
parser.add_argument("--field", help="Which field to count", default="subreddit")
|
||||
parser.add_argument("--min_count", help="Dont write any counts below this number", default=1000, type=int)
|
||||
parser.add_argument("--processes", help="Number of processes to use", default=10, type=int)
|
||||
|
@ -183,145 +186,168 @@ if __name__ == '__main__':
|
|||
multiprocessing.set_start_method('spawn')
|
||||
queue = multiprocessing.Manager().Queue()
|
||||
status_json = os.path.join(args.working, "status.json")
|
||||
input_files, saved_type = load_file_list(status_json)
|
||||
input_files, saved_type, stage = load_file_list(status_json)
|
||||
|
||||
if saved_type and saved_type != script_type:
|
||||
log.warning(f"Script type doesn't match type from json file. Delete working folder")
|
||||
sys.exit(0)
|
||||
|
||||
# if the file list wasn't loaded from the json, this is the first run, find what files we need to process
|
||||
if input_files is None:
|
||||
input_files = []
|
||||
for subdir, dirs, files in os.walk(args.input):
|
||||
files.sort()
|
||||
for file_name in files:
|
||||
if file_name.endswith(".zst") and re.search(args.file_filter, file_name, re.IGNORECASE) is not None:
|
||||
input_path = os.path.join(subdir, file_name)
|
||||
output_path = os.path.join(args.working, file_name[:-4])
|
||||
input_files.append(FileConfig(input_path, output_path=output_path))
|
||||
if stage == "count":
|
||||
# if the file list wasn't loaded from the json, this is the first run, find what files we need to process
|
||||
if input_files is None:
|
||||
input_files = []
|
||||
for subdir, dirs, files in os.walk(args.input):
|
||||
files.sort()
|
||||
for file_name in files:
|
||||
if file_name.endswith(".zst") and re.search(args.file_filter, file_name, re.IGNORECASE) is not None:
|
||||
input_path = os.path.join(subdir, file_name)
|
||||
output_path = os.path.join(args.working, file_name[:-4])
|
||||
input_files.append(FileConfig(input_path, output_path=output_path))
|
||||
|
||||
save_file_list(input_files, args.working, status_json, script_type)
|
||||
else:
|
||||
log.info(f"Existing input file was read, if this is not correct you should delete the {args.working} folder and run this script again")
|
||||
|
||||
files_processed = 0
|
||||
total_bytes = 0
|
||||
total_bytes_processed = 0
|
||||
total_lines_processed = 0
|
||||
total_lines_errored = 0
|
||||
files_to_process = []
|
||||
# calculate the total file size for progress reports, build a list of incomplete files to process
|
||||
# do this largest to smallest by file size so that we aren't processing a few really big files with only a few threads at the end
|
||||
for file in sorted(input_files, key=lambda item: item.file_size, reverse=True):
|
||||
total_bytes += file.file_size
|
||||
if file.complete:
|
||||
files_processed += 1
|
||||
total_lines_processed += file.lines_processed
|
||||
total_bytes_processed += file.file_size
|
||||
total_lines_errored += file.error_lines
|
||||
save_file_list(input_files, args.working, status_json, script_type, "count")
|
||||
else:
|
||||
files_to_process.append(file)
|
||||
log.info(f"Existing input file was read, if this is not correct you should delete the {args.working} folder and run this script again")
|
||||
|
||||
log.info(f"Processed {files_processed} of {len(input_files)} files with {(total_bytes_processed / (2**30)):.2f} of {(total_bytes / (2**30)):.2f} gigabytes")
|
||||
|
||||
start_time = time.time()
|
||||
if len(files_to_process):
|
||||
progress_queue = Queue(40)
|
||||
progress_queue.put([start_time, total_lines_processed, total_bytes_processed])
|
||||
speed_queue = Queue(40)
|
||||
for file in files_to_process:
|
||||
log.info(f"Processing file: {file.input_path}")
|
||||
# start the workers
|
||||
with multiprocessing.Pool(processes=min(args.processes, len(files_to_process))) as pool:
|
||||
workers = pool.starmap_async(process_file, [(file, queue, args.field) for file in files_to_process], chunksize=1, error_callback=log.info)
|
||||
while not workers.ready():
|
||||
# loop until the workers are all done, pulling in status messages as they are sent
|
||||
file_update = queue.get()
|
||||
if file_update.error_message is not None:
|
||||
log.warning(f"File failed {file_update.input_path}: {file_update.error_message}")
|
||||
# I'm going to assume that the list of files is short enough that it's no
|
||||
# big deal to just iterate each time since that saves a bunch of work
|
||||
total_lines_processed = 0
|
||||
total_bytes_processed = 0
|
||||
total_lines_errored = 0
|
||||
files_processed = 0
|
||||
files_errored = 0
|
||||
i = 0
|
||||
for file in input_files:
|
||||
if file.input_path == file_update.input_path:
|
||||
input_files[i] = file_update
|
||||
file = file_update
|
||||
total_lines_processed += file.lines_processed
|
||||
total_bytes_processed += file.bytes_processed
|
||||
total_lines_errored += file.error_lines
|
||||
files_processed += 1 if file.complete or file.error_message is not None else 0
|
||||
files_errored += 1 if file.error_message is not None else 0
|
||||
i += 1
|
||||
if file_update.complete or file_update.error_message is not None:
|
||||
save_file_list(input_files, args.working, status_json, script_type)
|
||||
current_time = time.time()
|
||||
progress_queue.put([current_time, total_lines_processed, total_bytes_processed])
|
||||
|
||||
first_time, first_lines, first_bytes = progress_queue.peek()
|
||||
bytes_per_second = int((total_bytes_processed - first_bytes)/(current_time - first_time))
|
||||
speed_queue.put(bytes_per_second)
|
||||
seconds_left = int((total_bytes - total_bytes_processed) / int(sum(speed_queue.list) / len(speed_queue.list)))
|
||||
minutes_left = int(seconds_left / 60)
|
||||
hours_left = int(minutes_left / 60)
|
||||
days_left = int(hours_left / 24)
|
||||
|
||||
log.info(
|
||||
f"{total_lines_processed:,} lines at {(total_lines_processed - first_lines)/(current_time - first_time):,.0f}/s, {total_lines_errored:,} errored : "
|
||||
f"{(total_bytes_processed / (2**30)):.2f} gb at {(bytes_per_second / (2**20)):,.0f} mb/s, {(total_bytes_processed / total_bytes) * 100:.0f}% : "
|
||||
f"{files_processed}({files_errored})/{len(input_files)} files : "
|
||||
f"{(str(days_left) + 'd ' if days_left > 0 else '')}{hours_left - (days_left * 24)}:{minutes_left - (hours_left * 60):02}:{seconds_left - (minutes_left * 60):02} remaining")
|
||||
|
||||
log.info(f"{total_lines_processed:,}, {total_lines_errored} errored : {(total_bytes_processed / (2**30)):.2f} gb, {(total_bytes_processed / total_bytes) * 100:.0f}% : {files_processed}/{len(input_files)}")
|
||||
|
||||
working_file_paths = []
|
||||
count_incomplete = 0
|
||||
# build a list of output files to combine
|
||||
for file in sorted(input_files, key=lambda item: os.path.split(item.output_path)[1]):
|
||||
if not file.complete:
|
||||
if file.error_message is not None:
|
||||
log.info(f"File {file.input_path} errored {file.error_message}")
|
||||
files_processed = 0
|
||||
total_bytes = 0
|
||||
total_bytes_processed = 0
|
||||
total_lines_processed = 0
|
||||
total_lines_errored = 0
|
||||
files_to_process = []
|
||||
# calculate the total file size for progress reports, build a list of incomplete files to process
|
||||
# do this largest to smallest by file size so that we aren't processing a few really big files with only a few threads at the end
|
||||
for file in sorted(input_files, key=lambda item: item.file_size, reverse=True):
|
||||
total_bytes += file.file_size
|
||||
if file.complete:
|
||||
files_processed += 1
|
||||
total_lines_processed += file.lines_processed
|
||||
total_bytes_processed += file.file_size
|
||||
total_lines_errored += file.error_lines
|
||||
else:
|
||||
log.info(f"File {file.input_path} is not marked as complete")
|
||||
count_incomplete += 1
|
||||
else:
|
||||
if file.error_lines > file.lines_processed * (args.error_rate * 0.01):
|
||||
log.info(
|
||||
f"File {file.input_path} has {file.error_lines:,} errored lines out of {file.lines_processed:,}, "
|
||||
f"{(file.error_lines / file.lines_processed) * (args.error_rate * 0.01):.2f}% which is above the limit of {args.error_rate}%")
|
||||
files_to_process.append(file)
|
||||
|
||||
log.info(f"Processed {files_processed} of {len(input_files)} files with {(total_bytes_processed / (2**30)):.2f} of {(total_bytes / (2**30)):.2f} gigabytes")
|
||||
|
||||
start_time = time.time()
|
||||
if len(files_to_process):
|
||||
progress_queue = Queue(40)
|
||||
progress_queue.put([start_time, total_lines_processed, total_bytes_processed])
|
||||
speed_queue = Queue(40)
|
||||
for file in files_to_process:
|
||||
log.info(f"Processing file: {file.input_path}")
|
||||
# start the workers
|
||||
with multiprocessing.Pool(processes=min(args.processes, len(files_to_process))) as pool:
|
||||
workers = pool.starmap_async(process_file, [(file, queue, args.field) for file in files_to_process], chunksize=1, error_callback=log.info)
|
||||
while not workers.ready():
|
||||
# loop until the workers are all done, pulling in status messages as they are sent
|
||||
file_update = queue.get()
|
||||
if file_update.error_message is not None:
|
||||
log.warning(f"File failed {file_update.input_path}: {file_update.error_message}")
|
||||
# I'm going to assume that the list of files is short enough that it's no
|
||||
# big deal to just iterate each time since that saves a bunch of work
|
||||
total_lines_processed = 0
|
||||
total_bytes_processed = 0
|
||||
total_lines_errored = 0
|
||||
files_processed = 0
|
||||
files_errored = 0
|
||||
i = 0
|
||||
for file in input_files:
|
||||
if file.input_path == file_update.input_path:
|
||||
input_files[i] = file_update
|
||||
file = file_update
|
||||
total_lines_processed += file.lines_processed
|
||||
total_bytes_processed += file.bytes_processed
|
||||
total_lines_errored += file.error_lines
|
||||
files_processed += 1 if file.complete or file.error_message is not None else 0
|
||||
files_errored += 1 if file.error_message is not None else 0
|
||||
i += 1
|
||||
if file_update.complete or file_update.error_message is not None:
|
||||
save_file_list(input_files, args.working, status_json, script_type)
|
||||
current_time = time.time()
|
||||
progress_queue.put([current_time, total_lines_processed, total_bytes_processed])
|
||||
|
||||
first_time, first_lines, first_bytes = progress_queue.peek()
|
||||
bytes_per_second = int((total_bytes_processed - first_bytes)/(current_time - first_time))
|
||||
speed_queue.put(bytes_per_second)
|
||||
seconds_left = int((total_bytes - total_bytes_processed) / int(sum(speed_queue.list) / len(speed_queue.list)))
|
||||
minutes_left = int(seconds_left / 60)
|
||||
hours_left = int(minutes_left / 60)
|
||||
days_left = int(hours_left / 24)
|
||||
|
||||
log.info(
|
||||
f"{total_lines_processed:,} lines at {(total_lines_processed - first_lines)/(current_time - first_time):,.0f}/s, {total_lines_errored:,} errored : "
|
||||
f"{(total_bytes_processed / (2**30)):.2f} gb at {(bytes_per_second / (2**20)):,.0f} mb/s, {(total_bytes_processed / total_bytes) * 100:.0f}% : "
|
||||
f"{files_processed}({files_errored})/{len(input_files)} files : "
|
||||
f"{(str(days_left) + 'd ' if days_left > 0 else '')}{hours_left - (days_left * 24)}:{minutes_left - (hours_left * 60):02}:{seconds_left - (minutes_left * 60):02} remaining")
|
||||
|
||||
log.info(f"{total_lines_processed:,}, {total_lines_errored} errored : {(total_bytes_processed / (2**30)):.2f} gb, {(total_bytes_processed / total_bytes) * 100:.0f}% : {files_processed}/{len(input_files)}")
|
||||
stage = "sum"
|
||||
save_file_list(input_files, args.working, status_json, script_type, stage)
|
||||
|
||||
if stage == "sum":
|
||||
#working_file_paths = []
|
||||
count_incomplete = 0
|
||||
# build a list of output files to combine
|
||||
input_files = sorted(input_files, key=lambda item: os.path.split(item.output_path)[1])
|
||||
for file in input_files:
|
||||
if not file.complete:
|
||||
if file.error_message is not None:
|
||||
log.info(f"File {file.input_path} errored {file.error_message}")
|
||||
else:
|
||||
log.info(f"File {file.input_path} is not marked as complete")
|
||||
count_incomplete += 1
|
||||
elif file.output_path is not None:
|
||||
if os.path.exists(file.output_path):
|
||||
working_file_paths.append(file.output_path)
|
||||
else:
|
||||
if file.error_lines > file.lines_processed * (args.error_rate * 0.01):
|
||||
log.info(
|
||||
f"File {file.input_path} has {file.error_lines:,} errored lines out of {file.lines_processed:,}, "
|
||||
f"{(file.error_lines / file.lines_processed) * (args.error_rate * 0.01):.2f}% which is above the limit of {args.error_rate}%")
|
||||
count_incomplete += 1
|
||||
|
||||
if count_incomplete > 0:
|
||||
log.info(f"{count_incomplete} files were not completed, errored or don't exist, something went wrong. Aborting")
|
||||
sys.exit()
|
||||
if count_incomplete > 0:
|
||||
log.info(f"{count_incomplete} files were not completed, errored or don't exist, something went wrong. Aborting")
|
||||
sys.exit()
|
||||
|
||||
log.info(f"Processing complete, combining {len(working_file_paths)} result files")
|
||||
log.info(f"Processing complete, combining {len(input_files)} result files")
|
||||
|
||||
input_lines = 0
|
||||
files_combined = 0
|
||||
field_counts = defaultdict(int)
|
||||
for working_file_path in working_file_paths:
|
||||
files_combined += 1
|
||||
log.info(f"Reading {files_combined}/{len(working_file_paths)} : {input_lines:,} : {len(field_counts):,} : {os.path.split(working_file_path)[1]}")
|
||||
with open(working_file_path, 'r') as input_file:
|
||||
for line in input_file:
|
||||
input_lines += 1
|
||||
field_counts[line.strip()] += 1
|
||||
input_lines = 0
|
||||
files_counted = 0
|
||||
monthly_count_folder_paths = []
|
||||
for file in input_files:
|
||||
files_counted += 1
|
||||
if not os.path.exists(file.output_path):
|
||||
log.info(f"Output file {file.output_path} does not exist, skipping")
|
||||
continue
|
||||
monthly_counts = defaultdict(int)
|
||||
log.info(f"Reading {files_counted}/{len(input_files)} : {input_lines:,} : {os.path.split(file.output_path)[1]}")
|
||||
with open(file.output_path, 'r') as input_file:
|
||||
for line in input_file:
|
||||
input_lines += 1
|
||||
monthly_counts[line.strip()] += 1
|
||||
|
||||
sorted_counts = sorted(field_counts.items(), key=lambda item: item[1], reverse=True)
|
||||
file.monthly_count_file = os.path.join(args.monthly_count_folder, os.path.basename(file.output_path))
|
||||
with open(file.monthly_count_file, 'w') as output_handle:
|
||||
for field, count in sorted(monthly_counts.items(), key=lambda item: item[1], reverse=True):
|
||||
output_handle.write(f"{field} {count}\n")
|
||||
|
||||
output_counts = 0
|
||||
with open(f"{args.output}.txt", 'w') as output_handle:
|
||||
for field, count in sorted_counts:
|
||||
if count >= args.min_count:
|
||||
output_counts += 1
|
||||
output_handle.write(f"{field} {count}\n")
|
||||
log.info(f"Finished combining files into monthlies, {input_lines:,} lines read. Combining into result output")
|
||||
stage = "agg"
|
||||
save_file_list(input_files, args.working, status_json, script_type, stage)
|
||||
|
||||
log.info(f"Finished combining files, {input_lines:,} lines read, {output_counts:,} field counts written")
|
||||
if stage == "agg":
|
||||
field_counts = defaultdict(int)
|
||||
for file in input_files:
|
||||
with open(file.monthly_count_file, 'r') as input_handle:
|
||||
for line in input_handle:
|
||||
field, count = line.strip().split("\t")
|
||||
field_counts[field] = count
|
||||
|
||||
sorted_counts = sorted(field_counts.items(), key=lambda item: item[1], reverse=True)
|
||||
|
||||
output_counts = 0
|
||||
with open(f"{args.output}.txt", 'w') as output_handle:
|
||||
for field, count in sorted_counts:
|
||||
if count >= args.min_count:
|
||||
output_counts += 1
|
||||
output_handle.write(f"{field} {count}\n")
|
||||
|
||||
log.info(f"Finished combining files, {output_counts:,} field counts written")
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue