mirror of
https://github.com/Watchful1/PushshiftDumps.git
synced 2025-07-25 15:45:19 -04:00
Update frame sizes
This commit is contained in:
parent
3fa63048e3
commit
c4d652d0cf
3 changed files with 40 additions and 19 deletions
|
@ -8,7 +8,7 @@ log = discord_logging.init_logging()
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
input_path = r"\\MYCLOUDPR4100\Public\reddit\requests\jeanyp"
|
input_path = r"\\MYCLOUDPR4100\Public\reddit\requests\wallstreetbets_comments.zst"
|
||||||
|
|
||||||
input_file_paths = []
|
input_file_paths = []
|
||||||
if os.path.isdir(input_path):
|
if os.path.isdir(input_path):
|
||||||
|
@ -27,11 +27,16 @@ if __name__ == "__main__":
|
||||||
file_lines = 0
|
file_lines = 0
|
||||||
file_bytes_processed = 0
|
file_bytes_processed = 0
|
||||||
created = None
|
created = None
|
||||||
|
previous_timestamp = None
|
||||||
inserts = []
|
inserts = []
|
||||||
for obj, line, file_bytes_processed in utils.read_obj_zst_meta(file_path):
|
for obj, line, file_bytes_processed in utils.read_obj_zst_meta(file_path):
|
||||||
created = datetime.utcfromtimestamp(int(obj['created_utc']))
|
new_timestamp = int(obj['created_utc'])
|
||||||
|
created = datetime.utcfromtimestamp(new_timestamp)
|
||||||
|
if previous_timestamp is not None and previous_timestamp - (60 * 60 * 4) > new_timestamp:
|
||||||
|
log.warning(f"Out of order timestamps {datetime.utcfromtimestamp(previous_timestamp).strftime('%Y-%m-%d %H:%M:%S')} - 4 hours > {created.strftime('%Y-%m-%d %H:%M:%S')}")
|
||||||
|
previous_timestamp = new_timestamp
|
||||||
file_lines += 1
|
file_lines += 1
|
||||||
if file_lines % 100000 == 0:
|
if file_lines % 10000 == 0:
|
||||||
log.info(f"{files_processed}/{len(input_file_paths)}: {file_name} : {created.strftime('%Y-%m-%d %H:%M:%S')} : {file_lines:,} : {(file_bytes_processed / file_size) * 100:.0f}%")
|
log.info(f"{files_processed}/{len(input_file_paths)}: {file_name} : {created.strftime('%Y-%m-%d %H:%M:%S')} : {file_lines:,} : {(file_bytes_processed / file_size) * 100:.0f}%")
|
||||||
|
|
||||||
log.info(f"{files_processed}/{len(input_file_paths)}: {file_name} : {created.strftime('%Y-%m-%d %H:%M:%S')} : {file_lines:,} : 100%")
|
log.info(f"{files_processed}/{len(input_file_paths)}: {file_name} : {created.strftime('%Y-%m-%d %H:%M:%S')} : {file_lines:,} : 100%")
|
||||||
|
|
|
@ -108,6 +108,20 @@ def load_file_list(output_folder, output_file_name):
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
# recursively decompress and decode a chunk of bytes. If there's a decode error then read another chunk and try with that, up to a limit of max_window_size bytes
|
||||||
|
def read_and_decode(reader, chunk_size, max_window_size, previous_chunk=None, bytes_read=0):
|
||||||
|
chunk = reader.read(chunk_size)
|
||||||
|
bytes_read += chunk_size
|
||||||
|
if previous_chunk is not None:
|
||||||
|
chunk = previous_chunk + chunk
|
||||||
|
try:
|
||||||
|
return chunk.decode()
|
||||||
|
except UnicodeDecodeError:
|
||||||
|
if bytes_read > max_window_size:
|
||||||
|
raise UnicodeError(f"Unable to decode frame after reading {bytes_read:,} bytes")
|
||||||
|
return read_and_decode(reader, chunk_size, max_window_size, chunk, bytes_read)
|
||||||
|
|
||||||
|
|
||||||
# open a zst compressed ndjson file and yield lines one at a time
|
# open a zst compressed ndjson file and yield lines one at a time
|
||||||
# also passes back file progress
|
# also passes back file progress
|
||||||
def read_lines_zst(file_name):
|
def read_lines_zst(file_name):
|
||||||
|
@ -115,12 +129,7 @@ def read_lines_zst(file_name):
|
||||||
buffer = ''
|
buffer = ''
|
||||||
reader = zstandard.ZstdDecompressor(max_window_size=2**31).stream_reader(file_handle)
|
reader = zstandard.ZstdDecompressor(max_window_size=2**31).stream_reader(file_handle)
|
||||||
while True:
|
while True:
|
||||||
data_chunk = reader.read(2**27)
|
chunk = read_and_decode(reader, 2**27, (2**29) * 2)
|
||||||
try:
|
|
||||||
chunk = data_chunk.decode()
|
|
||||||
except UnicodeDecodeError:
|
|
||||||
data_chunk += reader.read(2**29)
|
|
||||||
chunk = data_chunk.decode()
|
|
||||||
if not chunk:
|
if not chunk:
|
||||||
break
|
break
|
||||||
lines = (buffer + chunk).split("\n")
|
lines = (buffer + chunk).split("\n")
|
||||||
|
@ -257,7 +266,7 @@ if __name__ == '__main__':
|
||||||
progress_queue.put([start_time, total_lines_processed, total_bytes_processed])
|
progress_queue.put([start_time, total_lines_processed, total_bytes_processed])
|
||||||
speed_queue = Queue(40)
|
speed_queue = Queue(40)
|
||||||
for file in files_to_process:
|
for file in files_to_process:
|
||||||
log.debug(f"Processing file: {file.input_path}")
|
log.info(f"Processing file: {file.input_path}")
|
||||||
# start the workers
|
# start the workers
|
||||||
with multiprocessing.Pool(processes=min(args.processes, len(files_to_process))) as pool:
|
with multiprocessing.Pool(processes=min(args.processes, len(files_to_process))) as pool:
|
||||||
workers = pool.starmap_async(process_file, [(file, working_folder, queue, args.field, value, values, args.case_sensitive) for file in files_to_process], error_callback=log.info)
|
workers = pool.starmap_async(process_file, [(file, working_folder, queue, args.field, value, values, args.case_sensitive) for file in files_to_process], error_callback=log.info)
|
||||||
|
@ -282,6 +291,7 @@ if __name__ == '__main__':
|
||||||
total_bytes_processed += file.bytes_processed
|
total_bytes_processed += file.bytes_processed
|
||||||
total_lines_errored += file.error_lines
|
total_lines_errored += file.error_lines
|
||||||
files_processed += 1 if file.complete or file.error_message is not None else 0
|
files_processed += 1 if file.complete or file.error_message is not None else 0
|
||||||
|
files_errored += 1 if file.error_message is not None else 0
|
||||||
i += 1
|
i += 1
|
||||||
if file_update.complete or file_update.error_message is not None:
|
if file_update.complete or file_update.error_message is not None:
|
||||||
save_file_list(input_files, args.output, args.name)
|
save_file_list(input_files, args.output, args.name)
|
||||||
|
|
|
@ -13,21 +13,27 @@ log.setLevel(logging.DEBUG)
|
||||||
log.addHandler(logging.StreamHandler())
|
log.addHandler(logging.StreamHandler())
|
||||||
|
|
||||||
|
|
||||||
|
def read_and_decode(reader, chunk_size, max_window_size, previous_chunk=None, bytes_read=0):
|
||||||
|
chunk = reader.read(chunk_size)
|
||||||
|
bytes_read += chunk_size
|
||||||
|
if previous_chunk is not None:
|
||||||
|
chunk = previous_chunk + chunk
|
||||||
|
try:
|
||||||
|
return chunk.decode()
|
||||||
|
except UnicodeDecodeError:
|
||||||
|
if bytes_read > max_window_size:
|
||||||
|
raise UnicodeError(f"Unable to decode frame after reading {bytes_read:,} bytes")
|
||||||
|
log.info(f"Decoding error with {bytes_read:,} bytes, reading another chunk")
|
||||||
|
return read_and_decode(reader, chunk_size, max_window_size, chunk, bytes_read)
|
||||||
|
|
||||||
|
|
||||||
def read_lines_zst(file_name):
|
def read_lines_zst(file_name):
|
||||||
skip_to = 3566550750
|
|
||||||
bytes_read = 0
|
|
||||||
with open(file_name, 'rb') as file_handle:
|
with open(file_name, 'rb') as file_handle:
|
||||||
buffer = ''
|
buffer = ''
|
||||||
reader = zstandard.ZstdDecompressor(max_window_size=2**31).stream_reader(file_handle)
|
reader = zstandard.ZstdDecompressor(max_window_size=2**31).stream_reader(file_handle)
|
||||||
#reader.read(40000000000)
|
#reader.read(40000000000)
|
||||||
while True:
|
while True:
|
||||||
data_chunk = reader.read(2**27)
|
chunk = read_and_decode(reader, 2**27, (2**29) * 2)
|
||||||
try:
|
|
||||||
chunk = data_chunk.decode()
|
|
||||||
except UnicodeDecodeError:
|
|
||||||
log.info("Decoding error, reading a second chunk")
|
|
||||||
data_chunk += reader.read(2**29)
|
|
||||||
chunk = data_chunk.decode()
|
|
||||||
|
|
||||||
if not chunk:
|
if not chunk:
|
||||||
break
|
break
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue