diff --git a/personal/test_file.py b/personal/test_file.py
index 081b977..6795e0a 100644
--- a/personal/test_file.py
+++ b/personal/test_file.py
@@ -8,7 +8,7 @@ log = discord_logging.init_logging()
 
 
 if __name__ == "__main__":
-	input_path = r"\\MYCLOUDPR4100\Public\reddit\requests\jeanyp"
+	input_path = r"\\MYCLOUDPR4100\Public\reddit\requests\wallstreetbets_comments.zst"
 
 	input_file_paths = []
 	if os.path.isdir(input_path):
@@ -27,11 +27,16 @@ if __name__ == "__main__":
 		file_lines = 0
 		file_bytes_processed = 0
 		created = None
+		previous_timestamp = None
 		inserts = []
 		for obj, line, file_bytes_processed in utils.read_obj_zst_meta(file_path):
-			created = datetime.utcfromtimestamp(int(obj['created_utc']))
+			new_timestamp = int(obj['created_utc'])
+			created = datetime.utcfromtimestamp(new_timestamp)
+			if previous_timestamp is not None and previous_timestamp - (60 * 60 * 4) > new_timestamp:
+				log.warning(f"Out of order timestamps {datetime.utcfromtimestamp(previous_timestamp).strftime('%Y-%m-%d %H:%M:%S')} - 4 hours > {created.strftime('%Y-%m-%d %H:%M:%S')}")
+			previous_timestamp = new_timestamp
 			file_lines += 1
-			if file_lines % 100000 == 0:
+			if file_lines % 10000 == 0:
 				log.info(f"{files_processed}/{len(input_file_paths)}: {file_name} : {created.strftime('%Y-%m-%d %H:%M:%S')} : {file_lines:,} : {(file_bytes_processed / file_size) * 100:.0f}%")
 
 		log.info(f"{files_processed}/{len(input_file_paths)}: {file_name} : {created.strftime('%Y-%m-%d %H:%M:%S')} : {file_lines:,} : 100%")
diff --git a/scripts/combine_folder_multiprocess.py b/scripts/combine_folder_multiprocess.py
index c604985..453b5aa 100644
--- a/scripts/combine_folder_multiprocess.py
+++ b/scripts/combine_folder_multiprocess.py
@@ -108,6 +108,20 @@ def load_file_list(output_folder, output_file_name):
 		return None
 
 
+# recursively decompress and decode a chunk of bytes. If there's a decode error then read another chunk and try with that, up to a limit of max_window_size bytes
+def read_and_decode(reader, chunk_size, max_window_size, previous_chunk=None, bytes_read=0):
+	chunk = reader.read(chunk_size)
+	bytes_read += chunk_size
+	if previous_chunk is not None:
+		chunk = previous_chunk + chunk
+	try:
+		return chunk.decode()
+	except UnicodeDecodeError:
+		if bytes_read > max_window_size:
+			raise UnicodeError(f"Unable to decode frame after reading {bytes_read:,} bytes")
+		return read_and_decode(reader, chunk_size, max_window_size, chunk, bytes_read)
+
+
 # open a zst compressed ndjson file and yield lines one at a time
 # also passes back file progress
 def read_lines_zst(file_name):
@@ -115,12 +129,7 @@ def read_lines_zst(file_name):
 		buffer = ''
 		reader = zstandard.ZstdDecompressor(max_window_size=2**31).stream_reader(file_handle)
 		while True:
-			data_chunk = reader.read(2**27)
-			try:
-				chunk = data_chunk.decode()
-			except UnicodeDecodeError:
-				data_chunk += reader.read(2**29)
-				chunk = data_chunk.decode()
+			chunk = read_and_decode(reader, 2**27, (2**29) * 2)
 			if not chunk:
 				break
 			lines = (buffer + chunk).split("\n")
@@ -257,7 +266,7 @@ if __name__ == '__main__':
 	progress_queue.put([start_time, total_lines_processed, total_bytes_processed])
 	speed_queue = Queue(40)
 	for file in files_to_process:
-		log.debug(f"Processing file: {file.input_path}")
+		log.info(f"Processing file: {file.input_path}")
 	# start the workers
 	with multiprocessing.Pool(processes=min(args.processes, len(files_to_process))) as pool:
 		workers = pool.starmap_async(process_file, [(file, working_folder, queue, args.field, value, values, args.case_sensitive) for file in files_to_process], error_callback=log.info)
@@ -282,6 +291,7 @@ if __name__ == '__main__':
 				total_bytes_processed += file.bytes_processed
 				total_lines_errored += file.error_lines
 				files_processed += 1 if file.complete or file.error_message is not None else 0
+				files_errored += 1 if file.error_message is not None else 0
 				i += 1
 			if file_update.complete or file_update.error_message is not None:
 				save_file_list(input_files, args.output, args.name)
diff --git a/scripts/single_file.py b/scripts/single_file.py
index 43c22d4..92a5d38 100644
--- a/scripts/single_file.py
+++ b/scripts/single_file.py
@@ -13,21 +13,27 @@
 log.setLevel(logging.DEBUG)
 log.addHandler(logging.StreamHandler())
 
 
+def read_and_decode(reader, chunk_size, max_window_size, previous_chunk=None, bytes_read=0):
+	chunk = reader.read(chunk_size)
+	bytes_read += chunk_size
+	if previous_chunk is not None:
+		chunk = previous_chunk + chunk
+	try:
+		return chunk.decode()
+	except UnicodeDecodeError:
+		if bytes_read > max_window_size:
+			raise UnicodeError(f"Unable to decode frame after reading {bytes_read:,} bytes")
+		log.info(f"Decoding error with {bytes_read:,} bytes, reading another chunk")
+		return read_and_decode(reader, chunk_size, max_window_size, chunk, bytes_read)
+
+
 def read_lines_zst(file_name):
-	skip_to = 3566550750
-	bytes_read = 0
 	with open(file_name, 'rb') as file_handle:
 		buffer = ''
 		reader = zstandard.ZstdDecompressor(max_window_size=2**31).stream_reader(file_handle)
 		#reader.read(40000000000)
 		while True:
-			data_chunk = reader.read(2**27)
-			try:
-				chunk = data_chunk.decode()
-			except UnicodeDecodeError:
-				log.info("Decoding error, reading a second chunk")
-				data_chunk += reader.read(2**29)
-				chunk = data_chunk.decode()
+			chunk = read_and_decode(reader, 2**27, (2**29) * 2)
 			if not chunk:
 				break
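
For reference, a minimal standalone sketch of the retry-on-decode-error pattern this patch factors into read_and_decode: the helper and the streaming loop are taken from the diff above, while the line-counting __main__ block and the reuse of the wallstreetbets_comments.zst path are illustrative additions, not part of the patch. It assumes only the zstandard package.

import json
import zstandard


def read_and_decode(reader, chunk_size, max_window_size, previous_chunk=None, bytes_read=0):
	# read a chunk and prepend any bytes left over from a failed decode, retrying
	# until the text decodes or max_window_size bytes have been read
	chunk = reader.read(chunk_size)
	bytes_read += chunk_size
	if previous_chunk is not None:
		chunk = previous_chunk + chunk
	try:
		return chunk.decode()
	except UnicodeDecodeError:
		if bytes_read > max_window_size:
			raise UnicodeError(f"Unable to decode frame after reading {bytes_read:,} bytes")
		return read_and_decode(reader, chunk_size, max_window_size, chunk, bytes_read)


def read_lines_zst(file_name):
	# stream-decompress a zst compressed ndjson file and yield complete lines,
	# carrying any partial trailing line over to the next chunk in the buffer
	with open(file_name, 'rb') as file_handle:
		buffer = ''
		reader = zstandard.ZstdDecompressor(max_window_size=2**31).stream_reader(file_handle)
		while True:
			chunk = read_and_decode(reader, 2**27, (2**29) * 2)
			if not chunk:
				break
			lines = (buffer + chunk).split("\n")
			for line in lines[:-1]:
				yield line
			buffer = lines[-1]
		reader.close()


if __name__ == "__main__":
	# illustrative usage: count the objects in one dump file
	count = 0
	for line in read_lines_zst(r"\\MYCLOUDPR4100\Public\reddit\requests\wallstreetbets_comments.zst"):
		obj = json.loads(line)
		count += 1
	print(f"{count:,} lines")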