Mirror of https://github.com/Watchful1/PushshiftDumps.git, synced 2025-07-23 14:50:35 -04:00
Count matched lines

parent ba6da35b37
commit 8282a5e765

1 changed file with 9 additions and 5 deletions
@@ -47,7 +47,7 @@ log.addHandler(log_file_handler)
 
 # convenience object used to pass status information between processes
 class FileConfig:
-	def __init__(self, input_path, output_path=None, complete=False, lines_processed=0, error_lines=0):
+	def __init__(self, input_path, output_path=None, complete=False, lines_processed=0, error_lines=0, lines_matched=0):
 		self.input_path = input_path
 		self.output_path = output_path
 		self.file_size = os.stat(input_path).st_size
@@ -56,6 +56,7 @@ class FileConfig:
 		self.lines_processed = lines_processed if complete else 0
 		self.error_message = None
 		self.error_lines = error_lines
+		self.lines_matched = lines_matched
 
 	def __str__(self):
 		return f"{self.input_path} : {self.output_path} : {self.file_size} : {self.complete} : {self.bytes_processed} : {self.lines_processed}"
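The complete flag in this hunk carries the resume logic: per-file progress is only trusted when the file already finished, otherwise the counter resets and the file is reprocessed from scratch, while the new lines_matched value is stored unconditionally. A minimal sketch of that rule, using a stripped-down stand-in for FileConfig (the real __init__ also records file_size via os.stat, which this stub skips):

# Stripped-down stand-in for FileConfig's resume rule (illustration only)
class FileStatus:
	def __init__(self, complete=False, lines_processed=0, lines_matched=0):
		self.complete = complete
		# progress is only trusted for files that finished; partially
		# processed files start over, so their counter resets to zero
		self.lines_processed = lines_processed if complete else 0
		self.lines_matched = lines_matched

resumed_done = FileStatus(complete=True, lines_processed=500, lines_matched=42)
resumed_partial = FileStatus(complete=False, lines_processed=500, lines_matched=42)
print(resumed_done.lines_processed, resumed_partial.lines_processed)  # 500 0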
@@ -161,7 +162,7 @@ def save_file_list(input_files, working_folder, status_json, arg_string, script_
 		os.makedirs(working_folder)
 	simple_file_list = []
 	for file in input_files:
-		simple_file_list.append([file.input_path, file.output_path, file.complete, file.lines_processed, file.error_lines])
+		simple_file_list.append([file.input_path, file.output_path, file.complete, file.lines_processed, file.error_lines, file.lines_matched])
 	with open(status_json, 'w') as status_json_file:
 		output_dict = {
 			"args": arg_string,
@@ -179,7 +180,7 @@ def load_file_list(status_json):
 		input_files = []
 		for simple_file in output_dict["files"]:
 			input_files.append(
-				FileConfig(simple_file[0], simple_file[1], simple_file[2], simple_file[3], simple_file[4])
+				FileConfig(simple_file[0], simple_file[1], simple_file[2], simple_file[3], simple_file[4], simple_file[5])
 			)
 		return input_files, output_dict["args"], output_dict["type"]
 	else:
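save_file_list and load_file_list round-trip each FileConfig through a positional JSON list, so the new lines_matched value has to be appended at the same index on both sides; a status file written by an older version of the script would be one element short here. A minimal illustrative round-trip with made-up paths, simplified from the real format (which also stores args and type):

import json

# Illustrative positional record: [input_path, output_path, complete,
# lines_processed, error_lines, lines_matched] - index 5 is the new field
record = ["in/RC_2022-01.zst", "out/RC_2022-01.zst", False, 1000, 3, 250]
saved = json.dumps({"files": [record]})

for simple_file in json.loads(saved)["files"]:
	input_path, output_path, complete, lines_processed, error_lines, lines_matched = simple_file
	print(input_path, lines_matched)  # in/RC_2022-01.zst 250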
@@ -207,6 +208,7 @@ def process_file(file, queue, field, value, values):
 
 			if matched:
 				output_handle.write_line(line)
+				file.lines_matched += 1
 		except (KeyError, json.JSONDecodeError) as err:
 			file.error_lines += 1
 		file.lines_processed += 1
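The placement of the new counter pins down the accounting in process_file: lines_matched only moves when a line is actually written out, error lines are tallied separately, and lines_processed advances for every line either way. A rough standalone sketch of that bookkeeping, with the script's field matching reduced to a single hard-coded comparison:

import json

# Fake batch of ndjson lines: one miss, one match, one malformed
lines = [
	'{"subreddit": "askreddit", "body": "a"}',
	'{"subreddit": "science", "body": "b"}',
	'not json at all',
]

lines_processed = lines_matched = error_lines = 0
for line in lines:
	try:
		obj = json.loads(line)
		# simplified stand-in for the script's field/value matching
		if obj["subreddit"] == "science":
			lines_matched += 1  # counted only when the line is kept
	except (KeyError, json.JSONDecodeError):
		error_lines += 1
	lines_processed += 1  # every line counts, matched or not

print(lines_processed, lines_matched, error_lines)  # 3 1 1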
@@ -334,7 +336,7 @@ if __name__ == '__main__':
 	# start the workers
 	with multiprocessing.Pool(processes=min(args.processes, len(files_to_process))) as pool:
 		workers = pool.starmap_async(process_file, [(file, queue, args.field, value, values) for file in files_to_process], chunksize=1, error_callback=log.info)
-		while not workers.ready():
+		while not workers.ready() or not queue.empty():
			# loop until the workers are all done, pulling in status messages as they are sent
 			file_update = queue.get()
 			if file_update.error_message is not None:
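The loop-condition change is the functional fix here: workers.ready() can flip to true while status updates are still sitting in the queue, and the old condition would fall out of the loop without reading them, so the final tallies (including the new matched count) could come up short. A self-contained sketch of the drain pattern, assuming a Manager queue like the one the script appears to pass to its workers; unlike the blocking queue.get() above, the sketch adds a timeout so the parent cannot hang once everything is drained:

import multiprocessing
from queue import Empty

def worker(n, status_queue):
	# stand-in for process_file: report one status update per task
	status_queue.put(f"file {n} done")

if __name__ == '__main__':
	manager = multiprocessing.Manager()
	status_queue = manager.Queue()
	with multiprocessing.Pool(processes=2) as pool:
		workers = pool.starmap_async(worker, [(n, status_queue) for n in range(4)], chunksize=1)
		# the extra empty() check drains updates that arrive just before
		# the workers flip to ready, instead of dropping them
		while not workers.ready() or not status_queue.empty():
			try:
				print(status_queue.get(timeout=1))
			except Empty:
				continue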
@@ -348,6 +350,7 @@ if __name__ == '__main__':
 			# I'm going to assume that the list of files is short enough that it's no
 			# big deal to just iterate each time since that saves a bunch of work
 			total_lines_processed = 0
+			total_lines_matched = 0
 			total_bytes_processed = 0
 			total_lines_errored = 0
 			files_processed = 0
@@ -358,6 +361,7 @@ if __name__ == '__main__':
 					input_files[i] = file_update
 					file = file_update
 				total_lines_processed += file.lines_processed
+				total_lines_matched += file.lines_matched
 				total_bytes_processed += file.bytes_processed
 				total_lines_errored += file.error_lines
 				files_processed += 1 if file.complete or file.error_message is not None else 0
@@ -378,7 +382,7 @@ if __name__ == '__main__':
 			days_left = int(hours_left / 24)
 
 			log.info(
-				f"{total_lines_processed:,} lines at {(total_lines_processed - first_lines)/(current_time - first_time):,.0f}/s, {total_lines_errored:,} errored : "
+				f"{total_lines_processed:,} lines at {(total_lines_processed - first_lines)/(current_time - first_time):,.0f}/s, {total_lines_errored:,} errored, {total_lines_matched:,} matched : "
 				f"{(total_bytes_processed / (2**30)):.2f} gb at {(bytes_per_second / (2**20)):,.0f} mb/s, {(total_bytes_processed / total_bytes) * 100:.0f}% : "
 				f"{files_processed}({files_errored})/{len(input_files)} files : "
 				f"{(str(days_left) + 'd ' if days_left > 0 else '')}{hours_left - (days_left * 24)}:{minutes_left - (hours_left * 60):02}:{seconds_left - (minutes_left * 60):02} remaining")
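With the extra field in the first f-string, the periodic progress line now reports matched lines alongside errors. Rendered with purely made-up numbers, the format comes out like:

1,234,567 lines at 45,678/s, 12 errored, 89,012 matched : 1.23 gb at 25 mb/s, 42% : 3(0)/10 files : 1:05:09 remaining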