mirror of
https://github.com/Watchful1/PushshiftDumps.git
synced 2025-07-04 03:16:40 -04:00
Update iterate folder with the new decode method
This commit is contained in:
parent
f7286b7572
commit
e103298be3
1 changed files with 19 additions and 3 deletions
|
@ -15,20 +15,36 @@ log.setLevel(logging.DEBUG)
|
|||
log.addHandler(logging.StreamHandler())
|
||||
|
||||
|
||||
def read_and_decode(reader, chunk_size, max_window_size, previous_chunk=None, bytes_read=0):
|
||||
chunk = reader.read(chunk_size)
|
||||
bytes_read += chunk_size
|
||||
if previous_chunk is not None:
|
||||
chunk = previous_chunk + chunk
|
||||
try:
|
||||
return chunk.decode()
|
||||
except UnicodeDecodeError:
|
||||
if bytes_read > max_window_size:
|
||||
raise UnicodeError(f"Unable to decode frame after reading {bytes_read:,} bytes")
|
||||
log.info(f"Decoding error with {bytes_read:,} bytes, reading another chunk")
|
||||
return read_and_decode(reader, chunk_size, max_window_size, chunk, bytes_read)
|
||||
|
||||
|
||||
def read_lines_zst(file_name):
|
||||
with open(file_name, 'rb') as file_handle:
|
||||
buffer = ''
|
||||
reader = zstandard.ZstdDecompressor(max_window_size=2**28).stream_reader(file_handle)
|
||||
reader = zstandard.ZstdDecompressor(max_window_size=2**31).stream_reader(file_handle)
|
||||
while True:
|
||||
chunk = reader.read(2**24).decode() # read a 16mb chunk at a time. There are some really big comments
|
||||
chunk = read_and_decode(reader, 2**27, (2**29) * 2)
|
||||
|
||||
if not chunk:
|
||||
break
|
||||
lines = (buffer + chunk).split("\n")
|
||||
|
||||
for line in lines[:-1]:
|
||||
yield line, file_handle.tell()
|
||||
yield line.strip(), file_handle.tell()
|
||||
|
||||
buffer = lines[-1]
|
||||
|
||||
reader.close()
|
||||
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue