mirror of
https://github.com/Watchful1/PushshiftDumps.git
synced 2025-07-26 16:15:37 -04:00
Some cleanup, optimize multiprocess
This commit is contained in:
parent
461028b401
commit
1a99630073
5 changed files with 246 additions and 237 deletions
|
@ -115,7 +115,12 @@ def read_lines_zst(file_name):
|
|||
buffer = ''
|
||||
reader = zstandard.ZstdDecompressor(max_window_size=2**31).stream_reader(file_handle)
|
||||
while True:
|
||||
chunk = reader.read(2**27).decode()
|
||||
data_chunk = reader.read(2**27)
|
||||
try:
|
||||
chunk = data_chunk.decode()
|
||||
except UnicodeDecodeError:
|
||||
data_chunk += reader.read(2**29)
|
||||
chunk = data_chunk.decode()
|
||||
if not chunk:
|
||||
break
|
||||
lines = (buffer + chunk).split("\n")
|
||||
|
@ -227,7 +232,8 @@ if __name__ == '__main__':
|
|||
total_lines_errored = 0
|
||||
files_to_process = []
|
||||
# calculate the total file size for progress reports, build a list of incomplete files to process
|
||||
for file in input_files:
|
||||
# do this largest to smallest by file size so that we aren't processing a few really big files with only a few threads at the end
|
||||
for file in sorted(input_files, key=lambda item: item.file_size, reverse=True):
|
||||
total_bytes += file.file_size
|
||||
if file.complete:
|
||||
files_processed += 1
|
||||
|
@ -254,13 +260,13 @@ if __name__ == '__main__':
|
|||
file_update = queue.get()
|
||||
if file_update.error_message is not None:
|
||||
log.warning(f"File failed {file_update.input_path}: {file_update.error_message}")
|
||||
continue
|
||||
# I'm going to assume that the list of files is short enough that it's no
|
||||
# big deal to just iterate each time since that saves a bunch of work
|
||||
total_lines_processed = 0
|
||||
total_bytes_processed = 0
|
||||
total_lines_errored = 0
|
||||
files_processed = 0
|
||||
files_errored = 0
|
||||
i = 0
|
||||
for file in input_files:
|
||||
if file.input_path == file_update.input_path:
|
||||
|
@ -269,9 +275,9 @@ if __name__ == '__main__':
|
|||
total_lines_processed += file.lines_processed
|
||||
total_bytes_processed += file.bytes_processed
|
||||
total_lines_errored += file.error_lines
|
||||
files_processed += 1 if file.complete else 0
|
||||
files_processed += 1 if file.complete or file.error_message is not None else 0
|
||||
i += 1
|
||||
if file_update.complete:
|
||||
if file_update.complete or file_update.error_message is not None:
|
||||
save_file_list(input_files, args.output, args.name)
|
||||
current_time = time.time()
|
||||
progress_queue.put([current_time, total_lines_processed, total_bytes_processed])
|
||||
|
@ -287,7 +293,7 @@ if __name__ == '__main__':
|
|||
log.info(
|
||||
f"{total_lines_processed:,} lines at {(total_lines_processed - first_lines)/(current_time - first_time):,.0f}/s, {total_lines_errored:,} errored : "
|
||||
f"{(total_bytes_processed / (2**30)):.2f} gb at {(bytes_per_second / (2**20)):,.0f} mb/s, {(total_bytes_processed / total_bytes) * 100:.0f}% : "
|
||||
f"{files_processed}/{len(input_files)} files : "
|
||||
f"{files_processed}({files_errored})/{len(input_files)} files : "
|
||||
f"{(str(days_left) + 'd ' if days_left > 0 else '')}{hours_left - (days_left * 24)}:{minutes_left - (hours_left * 60):02}:{seconds_left - (minutes_left * 60):02} remaining")
|
||||
|
||||
log.info(f"{total_lines_processed:,}, {total_lines_errored} errored : {(total_bytes_processed / (2**30)):.2f} gb, {(total_bytes_processed / total_bytes) * 100:.0f}% : {files_processed}/{len(input_files)}")
|
||||
|
@ -297,7 +303,10 @@ if __name__ == '__main__':
|
|||
# build a list of output files to combine
|
||||
for file in input_files:
|
||||
if not file.complete:
|
||||
log.info(f"File {file.input_path} is not marked as complete")
|
||||
if file.error_message is not None:
|
||||
log.info(f"File {file.input_path} errored {file.error_message}")
|
||||
else:
|
||||
log.info(f"File {file.input_path} is not marked as complete")
|
||||
count_incomplete += 1
|
||||
else:
|
||||
if file.error_lines > file.lines_processed * (args.error_rate * 0.01):
|
||||
|
|
|
@ -14,11 +14,21 @@ log.addHandler(logging.StreamHandler())
|
|||
|
||||
|
||||
def read_lines_zst(file_name):
|
||||
skip_to = 3566550750
|
||||
bytes_read = 0
|
||||
with open(file_name, 'rb') as file_handle:
|
||||
buffer = ''
|
||||
reader = zstandard.ZstdDecompressor(max_window_size=2**31).stream_reader(file_handle)
|
||||
#reader.read(40000000000)
|
||||
while True:
|
||||
chunk = reader.read(2**27).decode()
|
||||
data_chunk = reader.read(2**27)
|
||||
try:
|
||||
chunk = data_chunk.decode()
|
||||
except UnicodeDecodeError:
|
||||
log.info("Decoding error, reading a second chunk")
|
||||
data_chunk += reader.read(2**29)
|
||||
chunk = data_chunk.decode()
|
||||
|
||||
if not chunk:
|
||||
break
|
||||
lines = (buffer + chunk).split("\n")
|
||||
|
@ -27,6 +37,7 @@ def read_lines_zst(file_name):
|
|||
yield line, file_handle.tell()
|
||||
|
||||
buffer = lines[-1]
|
||||
|
||||
reader.close()
|
||||
|
||||
|
||||
|
@ -39,19 +50,20 @@ if __name__ == "__main__":
|
|||
field = "subreddit"
|
||||
value = "wallstreetbets"
|
||||
bad_lines = 0
|
||||
try:
|
||||
for line, file_bytes_processed in read_lines_zst(file_path):
|
||||
try:
|
||||
obj = json.loads(line)
|
||||
created = datetime.utcfromtimestamp(int(obj['created_utc']))
|
||||
temp = obj[field] == value
|
||||
except (KeyError, json.JSONDecodeError) as err:
|
||||
bad_lines += 1
|
||||
file_lines += 1
|
||||
if file_lines % 100000 == 0:
|
||||
log.info(f"{created.strftime('%Y-%m-%d %H:%M:%S')} : {file_lines:,} : {bad_lines:,} : {(file_bytes_processed / file_size) * 100:.0f}%")
|
||||
except Exception as err:
|
||||
log.info(err)
|
||||
# try:
|
||||
for line, file_bytes_processed in read_lines_zst(file_path):
|
||||
try:
|
||||
obj = json.loads(line)
|
||||
created = datetime.utcfromtimestamp(int(obj['created_utc']))
|
||||
temp = obj[field] == value
|
||||
except (KeyError, json.JSONDecodeError) as err:
|
||||
bad_lines += 1
|
||||
file_lines += 1
|
||||
if file_lines % 100000 == 0:
|
||||
log.info(f"{created.strftime('%Y-%m-%d %H:%M:%S')} : {file_lines:,} : {bad_lines:,} : {file_bytes_processed:,}:{(file_bytes_processed / file_size) * 100:.0f}%")
|
||||
|
||||
# except Exception as err:
|
||||
# log.info(err)
|
||||
|
||||
log.info(f"Complete : {file_lines:,} : {bad_lines:,}")
|
||||
|
||||
|
|
|
@ -56,10 +56,11 @@ if __name__ == "__main__":
|
|||
for line, file_bytes_processed in read_lines_zst(input_file_path):
|
||||
try:
|
||||
obj = json.loads(line)
|
||||
output_obj = []
|
||||
for field in fields:
|
||||
output_obj.append(obj[field].encode("utf-8", errors='replace').decode())
|
||||
writer.writerow(output_obj)
|
||||
if "social dilemma" in obj['body'].lower():
|
||||
output_obj = []
|
||||
for field in fields:
|
||||
output_obj.append(str(obj[field]).encode("utf-8", errors='replace').decode())
|
||||
writer.writerow(output_obj)
|
||||
|
||||
created = datetime.utcfromtimestamp(int(obj['created_utc']))
|
||||
except json.JSONDecodeError as err:
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue