mirror of https://github.com/Watchful1/PushshiftDumps.git
synced 2025-07-23 06:40:47 -04:00

commit f35762e203 (parent 4ecf22aaee)
Add split blocks by minute

7 changed files with 318 additions and 7 deletions
@@ -142,6 +142,7 @@ if __name__ == "__main__":
     log.info(f"Input folder: {args.input}")
     log.info(f"Output folder: {args.output}")
+    log.info(f"Month: {args.month}")
     log.info(f"Compression level: {level}")

     prefix = None

@@ -8,7 +8,7 @@ log = discord_logging.init_logging()


 if __name__ == "__main__":
-    input_path = r"\\MYCLOUDPR4100\Public\ingest\combined\comments\RC_23-07-10.zst"
+    input_path = r"\\MYCLOUDPR4100\Public\reddit\comments\RC_2023-09.zst"

     input_file_paths = []
     if os.path.isdir(input_path):
@@ -32,7 +32,7 @@ if __name__ == "__main__":
         for obj, line, file_bytes_processed in utils.read_obj_zst_meta(file_path):
             new_timestamp = int(obj['created_utc'])
             created = datetime.utcfromtimestamp(new_timestamp)
-            if previous_timestamp is not None and previous_timestamp - (60 * 60) > new_timestamp:
+            if previous_timestamp is not None and previous_timestamp - (2) > new_timestamp:
                 log.warning(f"Out of order timestamps {datetime.utcfromtimestamp(previous_timestamp).strftime('%Y-%m-%d %H:%M:%S')} - 4 hours > {created.strftime('%Y-%m-%d %H:%M:%S')}")
             previous_timestamp = new_timestamp
             file_lines += 1
personal/transform/split_blocks_by_minutes.py (Normal file, 46 lines)

@@ -0,0 +1,46 @@
+import discord_logging
+import os
+import zstandard
+from datetime import datetime
+import json
+
+log = discord_logging.init_logging()
+
+import utils
+
+NEWLINE_ENCODED = "\n".encode('utf-8')
+
+
+if __name__ == "__main__":
+    input_file = r"\\MYCLOUDPR4100\Public\reddit\blocks\RS_2023-10.zst_blocks"
+    output_folder = r"\\MYCLOUDPR4100\Public\ingest\download"
+    file_type = "comments" if "RC" in input_file else "submissions"
+
+    log.info(f"Input: {input_file} - Output: {output_folder}")
+    previous_minute, output_handle, created_utc = None, None, None
+    count_objects, count_minute = 0, 0
+    for obj in utils.read_obj_zst_blocks(input_file):
+        created_utc = datetime.utcfromtimestamp(obj["created_utc"])
+        current_minute = created_utc.replace(second=0)
+
+        if previous_minute is None or current_minute > previous_minute:
+            log.info(f"{created_utc.strftime('%y-%m-%d_%H-%M')}: {count_objects:,} : {count_minute:,}")
+            previous_minute = current_minute
+            count_minute = 0
+            if output_handle is not None:
+                output_handle.close()
+
+            output_path = os.path.join(output_folder, file_type, created_utc.strftime('%y-%m-%d'))
+            if not os.path.exists(output_path):
+                os.makedirs(output_path)
+            output_path = os.path.join(output_path, f"{('RC' if file_type == 'comments' else 'RS')}_{created_utc.strftime('%y-%m-%d_%H-%M')}.zst")
+            output_handle = zstandard.ZstdCompressor().stream_writer(open(output_path, 'wb'))
+
+        count_objects += 1
+        count_minute += 1
+        output_handle.write(json.dumps(obj, sort_keys=True).encode('utf-8'))
+        output_handle.write(NEWLINE_ENCODED)
+
+    log.info(f"{created_utc.strftime('%y-%m-%d_%H-%M')}: {count_objects:,} : {count_minute:,}")
+    if output_handle is not None:
+        output_handle.close()
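The new script buckets rows from a .zst_blocks dump into one zstd-compressed newline-delimited-JSON file per minute. A minimal sketch of reading one of those per-minute outputs back (the path below is hypothetical, following the script's RC_/RS_ naming scheme; the read logic only assumes each file is zstd-compressed NDJSON):

import io
import json
import zstandard

# hypothetical per-minute output produced by the script above
path = r"\\MYCLOUDPR4100\Public\ingest\download\comments\23-10-01\RC_23-10-01_00-00.zst"

with open(path, "rb") as fh:
    reader = zstandard.ZstdDecompressor().stream_reader(fh)
    # wrap the binary zstd stream so we can iterate it line by line
    for line in io.TextIOWrapper(reader, encoding="utf-8"):
        obj = json.loads(line)
        # each obj is one comment/submission dict from that minute
        print(obj["created_utc"])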
@@ -12,7 +12,7 @@ NEWLINE_ENCODED = "\n".encode('utf-8')


 if __name__ == "__main__":
-    input_file = r"\\MYCLOUDPR4100\Public\RC_2023-08.zst"
+    input_file = r"\\MYCLOUDPR4100\Public\RS_2023-09.zst"
     output_folder = r"\\MYCLOUDPR4100\Public\ingest\download"
     file_type = "comments" if "RC" in input_file else "submissions"

@@ -1,5 +1,7 @@
 import zstandard
 import json
+import os
+from zst_blocks import ZstBlocksFile


 def read_obj_zst(file_name):
@@ -75,6 +77,14 @@ class OutputZst:
         return True


+# copied from https://github.com/ArthurHeitmann/zst_blocks_format
+def read_obj_zst_blocks(file_name):
+    with open(file_name, "rb") as file:
+        for row in ZstBlocksFile.streamRows(file):
+            line = row.decode()
+            yield json.loads(line.strip())
+
+
 def base36encode(integer: int) -> str:
     chars = '0123456789abcdefghijklmnopqrstuvwxyz'
     sign = '-' if integer < 0 else ''
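The new utils.read_obj_zst_blocks generator turns a .zst_blocks file into a stream of decoded JSON objects. A minimal usage sketch (run from the folder containing utils.py; the input path is the same dump the new split script uses):

import utils

# stream objects straight out of the block-compressed dump
for obj in utils.read_obj_zst_blocks(r"\\MYCLOUDPR4100\Public\reddit\blocks\RS_2023-10.zst_blocks"):
    print(obj["id"], obj["created_utc"])
    break  # just peek at the first row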
personal/zst_blocks.py (Normal file, 254 lines)

@@ -0,0 +1,254 @@
+# copied from https://github.com/ArthurHeitmann/zst_blocks_format
+
+from __future__ import annotations
+from dataclasses import dataclass
+import os
+import time
+import struct
+from typing import BinaryIO, Callable, Iterable, Literal
+from zstandard import ZstdDecompressor, ZstdCompressor
+
+_endian: Literal["little", "big"] = "little"
+
+_uint32Struct = struct.Struct("<I")
+_uint32X2Struct = struct.Struct("<II")
+
+_defaultCompressionLevel = 3
+
+
+class ZstBlocksFile:
+    blocks: list[ZstBlock]
+
+    def __init__(self, blocks: list[ZstBlock]):
+        self.blocks = blocks
+
+    @staticmethod
+    def readBlockRowAt(file: BinaryIO, rowPosition: RowPosition) -> bytes:
+        file.seek(rowPosition.blockOffset)
+        return ZstBlock.readRow(file, rowPosition.rowIndex)
+
+    @staticmethod
+    def readMultipleBlocks(file: BinaryIO, rowPositions: list[RowPosition]) -> list[bytes]:
+        blockGroupsDict: dict[int, RowPositionGroup] = {}
+        for i, rowPosition in enumerate(rowPositions):
+            if rowPosition.blockOffset not in blockGroupsDict:
+                blockGroupsDict[rowPosition.blockOffset] = RowPositionGroup(rowPosition.blockOffset, [])
+            blockGroupsDict[rowPosition.blockOffset].rowIndices.append(RowIndex(rowPosition.rowIndex, i))
+        blockGroups = list(blockGroupsDict.values())
+
+        rows: list = [None] * len(rowPositions)
+        for blockGroup in blockGroups:
+            file.seek(blockGroup.blockOffset)
+            blockRows = ZstBlock.readSpecificRows(file, map(lambda pair: pair.withinBlockIndex, blockGroup.rowIndices))
+            for originalPosition, row in zip(blockGroup.rowIndices, blockRows):
+                rows[originalPosition.originalRowIndex] = row
+
+        return rows
+
+    @staticmethod
+    def streamRows(file: BinaryIO, blockIndexProgressCallback: Callable[[int], None] | None = None) -> Iterable[bytes]:
+        fileSize = os.path.getsize(file.name)
+        blockIndex = 0
+        while file.tell() < fileSize:
+            yield from ZstBlock.streamRows(file)
+            blockIndex += 1
+            if blockIndexProgressCallback is not None:
+                blockIndexProgressCallback(blockIndex)
+
+    @staticmethod
+    def appendBlock(file: BinaryIO, rows: list[bytes], compressionLevel=_defaultCompressionLevel) -> None:
+        file.seek(file.tell())
+        ZstBlock(rows).write(file, compressionLevel=compressionLevel)
+
+    @staticmethod
+    def writeStream(file: BinaryIO, rowStream: Iterable[bytes], blockSize: int, rowPositions: list[RowPosition] | None = None, compressionLevel=_defaultCompressionLevel) -> None:
+        pendingRows = []
+        for row in rowStream:
+            pendingRows.append(row)
+            if len(pendingRows) >= blockSize:
+                ZstBlock(pendingRows).write(file, rowPositions, compressionLevel=compressionLevel)
+                pendingRows = []
+        if len(pendingRows) > 0:
+            ZstBlock(pendingRows).write(file, rowPositions, compressionLevel=compressionLevel)
+
+    @staticmethod
+    def writeBlocksStream(file: BinaryIO, blocksStream: Iterable[list[bytes]], rowPositions: list[RowPosition] | None = None, compressionLevel=_defaultCompressionLevel) -> None:
+        for rows in blocksStream:
+            ZstBlock(rows).write(file, rowPositions, compressionLevel=compressionLevel)
+
+    @staticmethod
+    def countBlocks(file: BinaryIO) -> int:
+        fileSize = os.path.getsize(file.name)
+        blockCount = 0
+        initialPos = file.tell()
+        pos = initialPos
+        while pos < fileSize:
+            blockCount += 1
+            blockSize = _uint32Struct.unpack(file.read(4))[0]
+            pos += 4 + blockSize
+            file.seek(pos)
+        file.seek(initialPos)
+        return blockCount
+
+    @staticmethod
+    def generateRowPositions(file: BinaryIO) -> Iterable[RowPosition]:
+        fileSize = os.path.getsize(file.name)
+        while file.tell() < fileSize:
+            yield from ZstBlock.generateRowPositions(file)
+
+
+class ZstBlock:
+    rows: list[bytes]
+
+    def __init__(self, rows: list[bytes]):
+        self.rows = rows
+
+    @classmethod
+    def streamRows(cls, file: BinaryIO) -> Iterable[bytes]:
+        compressedSize = _uint32Struct.unpack(file.read(4))[0]
+        compressedData = file.read(compressedSize)
+        decompressedData = ZstdDecompressor().decompress(compressedData)
+
+        memoryView = memoryview(decompressedData)
+        count = _uint32Struct.unpack(memoryView[0:4])[0]
+        rows: list[ZstRowInfo] = [None] * count
+        for i in range(count):
+            rows[i] = ZstRowInfo.read(memoryView, 4 + i * ZstRowInfo.structSize)
+
+        dataStart = 4 + count * ZstRowInfo.structSize
+        for row in rows:
+            yield decompressedData[dataStart + row.offset: dataStart + row.offset + row.size]
+
+    @classmethod
+    def readSpecificRows(cls, file: BinaryIO, rowIndices: Iterable[int]) -> list[bytes]:
+        compressedSize = _uint32Struct.unpack(file.read(4))[0]
+        compressedData = file.read(compressedSize)
+        decompressedData = ZstdDecompressor().decompress(compressedData)
+
+        memoryView = memoryview(decompressedData)
+        count = _uint32Struct.unpack(memoryView[0:4])[0]
+        rows: list[ZstRowInfo] = [None] * count
+        for i in range(count):
+            rows[i] = ZstRowInfo.read(memoryView, 4 + i * ZstRowInfo.structSize)
+
+        dataStart = 4 + count * ZstRowInfo.structSize
+        return [
+            decompressedData[dataStart + rows[rowIndex].offset: dataStart + rows[rowIndex].offset + rows[rowIndex].size]
+            for rowIndex in rowIndices
+        ]
+
+    @classmethod
+    def readRow(cls, file: BinaryIO, rowIndex: int) -> bytes:
+        compressedSize = _uint32Struct.unpack(file.read(4))[0]
+        compressedData = file.read(compressedSize)
+        decompressedData = ZstdDecompressor().decompress(compressedData)
+
+        memoryView = memoryview(decompressedData)
+        count = _uint32Struct.unpack(memoryView[0:4])[0]
+        if rowIndex >= count:
+            raise Exception("Row index out of range")
+        row = ZstRowInfo.read(memoryView, 4 + rowIndex * ZstRowInfo.structSize)
+
+        dataStart = 4 + count * ZstRowInfo.structSize
+        return decompressedData[dataStart + row.offset: dataStart + row.offset + row.size]
+
+    def write(self, file: BinaryIO, rowPositions: list[RowPosition] | None = None, compressionLevel=_defaultCompressionLevel) -> None:
+        uncompressedSize = 4 + len(self.rows) * ZstRowInfo.structSize + sum(len(row) for row in self.rows)
+        uncompressedBytes = bytearray(uncompressedSize)
+        uncompressedBytes[0:4] = len(self.rows).to_bytes(4, _endian)
+
+        dataOffset = 4 + len(self.rows) * ZstRowInfo.structSize
+        blockOffset = file.tell()
+        currentDataLocalOffset = 0
+        for i in range(len(self.rows)):
+            row = self.rows[i]
+            rowInfo = ZstRowInfo(currentDataLocalOffset, len(row))
+            rowInfo.write(uncompressedBytes, 4 + i * ZstRowInfo.structSize)
+            uncompressedBytes[dataOffset + currentDataLocalOffset: dataOffset + currentDataLocalOffset + len(row)] = row
+            currentDataLocalOffset += len(row)
+            if rowPositions is not None:
+                rowPositions.append(RowPosition(blockOffset, i))
+        uncompressedData = bytes(uncompressedBytes)
+        compressedData = ZstdCompressor(compressionLevel).compress(uncompressedData)
+        compressedSize = len(compressedData)
+        blockBytes = bytearray(4 + compressedSize)
+        blockBytes[0:4] = compressedSize.to_bytes(4, _endian)
+        blockBytes[4:4 + compressedSize] = compressedData
+        file.write(blockBytes)
+
+    @staticmethod
+    def generateRowPositions(file: BinaryIO) -> Iterable[RowPosition]:
+        blockOffset = file.tell()
+        compressedSize = _uint32Struct.unpack(file.read(4))[0]
+        compressedData = file.read(compressedSize)
+        decompressedData = ZstdDecompressor().decompress(compressedData)
+
+        memoryView = memoryview(decompressedData)
+        count = _uint32Struct.unpack(memoryView[0:4])[0]
+        for i in range(count):
+            yield RowPosition(blockOffset, i)
+
+
+class ZstRowInfo:
+    structSize = 8
+    offset: int
+    size: int
+
+    def __init__(self, offset: int, size: int):
+        self.offset = offset
+        self.size = size
+
+    @staticmethod
+    def read(bytes: bytes, position: int) -> ZstRowInfo:
+        offset, size = _uint32X2Struct.unpack(bytes[position: position + ZstRowInfo.structSize])
+        return ZstRowInfo(offset, size)
+
+    def write(self, bytes: bytearray, position: int) -> None:
+        bytes[position + 0: position + 4] = self.offset.to_bytes(4, _endian)
+        bytes[position + 4: position + 8] = self.size.to_bytes(4, _endian)
+
+
+@dataclass
+class RowPosition:
+    blockOffset: int
+    rowIndex: int
+
+
+@dataclass
+class RowIndex:
+    withinBlockIndex: int
+    originalRowIndex: int
+
+
+@dataclass
+class RowPositionGroup:
+    blockOffset: int
+    rowIndices: list[RowIndex]
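The vendored zst_blocks module implements a simple framed format: each block on disk is a little-endian uint32 compressed size followed by a zstd frame whose payload holds a row count, an (offset, size) table of ZstRowInfo entries, then the concatenated row bytes. A minimal round-trip sketch using only the classes above (the scratch file name is made up for illustration):

from zst_blocks import ZstBlocksFile

rows = [b'{"id": "a"}', b'{"id": "b"}', b'{"id": "c"}']

# write three rows, two rows per block (hypothetical scratch file)
with open("example.zst_blocks", "wb") as f:
    ZstBlocksFile.writeStream(f, iter(rows), blockSize=2)

# stream them back in the original order
with open("example.zst_blocks", "rb") as f:
    for row in ZstBlocksFile.streamRows(f):
        print(row.decode())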
@@ -7,7 +7,7 @@ from datetime import datetime
 import logging.handlers

 # put the path to the input file, or a folder of files to process all of
-input_file = r"\\MYCLOUDPR4100\Public\reddit_test"
+input_file = r"\\MYCLOUDPR4100\Public\reddit\subreddits/CryptoCurrency_submissions.zst"
 # put the name or path to the output file. The file extension from below will be added automatically. If the input file is a folder, the output will be treated as a folder as well
 output_file = r"\\MYCLOUDPR4100\Public\output"
 # the format to output in, pick from the following options
@@ -28,8 +28,8 @@ single_field = None
 write_bad_lines = True

 # only output items between these two dates
-from_date = datetime.strptime("2005-01-01", "%Y-%m-%d")
-to_date = datetime.strptime("2025-01-01", "%Y-%m-%d")
+from_date = datetime.strptime("2022-01-01", "%Y-%m-%d")
+to_date = datetime.strptime("2022-12-31", "%Y-%m-%d")

 # the field to filter on, the values to filter with and whether it should be an exact match
 # some examples:
@@ -76,7 +76,7 @@ to_date = datetime.strptime("2025-01-01", "%Y-%m-%d")
 # if you want only top level comments instead of all comments, you can set field to "parent_id" instead of "link_id"

 field = "title"
-values = ['post race discussion']
+values = ['']
 # if you have a long list of values, you can put them in a file and put the filename here. If set this overrides the value list above
 # if this list is very large, it could greatly slow down the process
 values_file = None
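The field/values/values_file settings above drive the script's filtering. A hedged sketch of how an exact-versus-substring match on a field could work; the helper name and case-insensitive behavior are illustrative assumptions, not the script's actual implementation:

# illustrative only: 'matches' is a hypothetical helper, not defined in this repo
def matches(obj: dict, field: str, values: list[str], exact_match: bool) -> bool:
    # assumes case-insensitive comparison, which may differ from the real script
    value = str(obj.get(field, "")).lower()
    if exact_match:
        return value in values
    return any(v in value for v in values)

# usage: keep a submission whose title contains "daily discussion"
keep = matches({"title": "Daily Discussion - October 1"}, "title", ["daily discussion"], exact_match=False)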