diff --git a/personal/combine/build_month.py b/personal/combine/build_month.py
index aad50b7..b5c2abd 100644
--- a/personal/combine/build_month.py
+++ b/personal/combine/build_month.py
@@ -142,6 +142,7 @@ if __name__ == "__main__":
 
 	log.info(f"Input folder: {args.input}")
 	log.info(f"Output folder: {args.output}")
+	log.info(f"Month: {args.month}")
 	log.info(f"Compression level: {level}")
 
 	prefix = None
diff --git a/personal/diagnostic/test_file.py b/personal/diagnostic/test_file.py
index 0a04ff6..923ce9e 100644
--- a/personal/diagnostic/test_file.py
+++ b/personal/diagnostic/test_file.py
@@ -8,7 +8,7 @@ log = discord_logging.init_logging()
 
 
 if __name__ == "__main__":
-	input_path = r"\\MYCLOUDPR4100\Public\ingest\combined\comments\RC_23-07-10.zst"
+	input_path = r"\\MYCLOUDPR4100\Public\reddit\comments\RC_2023-09.zst"
 
 	input_file_paths = []
 	if os.path.isdir(input_path):
@@ -32,7 +32,7 @@ if __name__ == "__main__":
 		for obj, line, file_bytes_processed in utils.read_obj_zst_meta(file_path):
 			new_timestamp = int(obj['created_utc'])
 			created = datetime.utcfromtimestamp(new_timestamp)
-			if previous_timestamp is not None and previous_timestamp - (60 * 60) > new_timestamp:
-				log.warning(f"Out of order timestamps {datetime.utcfromtimestamp(previous_timestamp).strftime('%Y-%m-%d %H:%M:%S')} - 4 hours > {created.strftime('%Y-%m-%d %H:%M:%S')}")
+			if previous_timestamp is not None and previous_timestamp - 2 > new_timestamp:
+				log.warning(f"Out of order timestamps {datetime.utcfromtimestamp(previous_timestamp).strftime('%Y-%m-%d %H:%M:%S')} - 2 seconds > {created.strftime('%Y-%m-%d %H:%M:%S')}")
 			previous_timestamp = new_timestamp
 			file_lines += 1
diff --git a/personal/transform/split_blocks_by_minutes.py b/personal/transform/split_blocks_by_minutes.py
new file mode 100644
index 0000000..852fb1a
--- /dev/null
+++ b/personal/transform/split_blocks_by_minutes.py
@@ -0,0 +1,46 @@
+import discord_logging
+import os
+import zstandard
+from datetime import datetime
+import json
+
+log = discord_logging.init_logging()
+
+import utils
+
+NEWLINE_ENCODED = "\n".encode('utf-8')
+
+
+if __name__ == "__main__":
+	input_file = r"\\MYCLOUDPR4100\Public\reddit\blocks\RS_2023-10.zst_blocks"
+	output_folder = r"\\MYCLOUDPR4100\Public\ingest\download"
+	file_type = "comments" if "RC" in input_file else "submissions"
+
+	log.info(f"Input: {input_file} - Output: {output_folder}")
+	previous_minute, output_handle, created_utc = None, None, None
+	count_objects, count_minute = 0, 0
+	for obj in utils.read_obj_zst_blocks(input_file):
+		created_utc = datetime.utcfromtimestamp(obj["created_utc"])
+		current_minute = created_utc.replace(second=0)
+
+		if previous_minute is None or current_minute > previous_minute:
+			log.info(f"{created_utc.strftime('%y-%m-%d_%H-%M')}: {count_objects:,} : {count_minute:,}")
+			previous_minute = current_minute
+			count_minute = 0
+			if output_handle is not None:
+				output_handle.close()
+
+			output_path = os.path.join(output_folder, file_type, created_utc.strftime('%y-%m-%d'))
+			if not os.path.exists(output_path):
+				os.makedirs(output_path)
+			output_path = os.path.join(output_path, f"{('RC' if file_type == 'comments' else 'RS')}_{created_utc.strftime('%y-%m-%d_%H-%M')}.zst")
+			output_handle = zstandard.ZstdCompressor().stream_writer(open(output_path, 'wb'))
+
+		count_objects += 1
+		count_minute += 1
+		output_handle.write(json.dumps(obj, sort_keys=True).encode('utf-8'))
+		output_handle.write(NEWLINE_ENCODED)
+
+	log.info(f"{created_utc.strftime('%y-%m-%d_%H-%M')}: {count_objects:,} : {count_minute:,}")
+	if output_handle is not None:
+		output_handle.close()
diff --git a/personal/transform/split_by_minutes.py b/personal/transform/split_by_minutes.py
index cefc5a4..5fd705b 100644
--- a/personal/transform/split_by_minutes.py
+++ b/personal/transform/split_by_minutes.py
@@ -12,7 +12,7 @@ NEWLINE_ENCODED = "\n".encode('utf-8')
 
 
 if __name__ == "__main__":
-	input_file = r"\\MYCLOUDPR4100\Public\RC_2023-08.zst"
+	input_file = r"\\MYCLOUDPR4100\Public\RS_2023-09.zst"
 	output_folder = r"\\MYCLOUDPR4100\Public\ingest\download"
 	file_type = "comments" if "RC" in input_file else "submissions"
 
diff --git a/personal/utils.py b/personal/utils.py
index f675c9e..080aa06 100644
--- a/personal/utils.py
+++ b/personal/utils.py
@@ -1,5 +1,7 @@
 import zstandard
 import json
+import os
+from zst_blocks import ZstBlocksFile
 
 
 def read_obj_zst(file_name):
@@ -75,6 +77,14 @@ class OutputZst:
 		return True
 
 
+# copied from https://github.com/ArthurHeitmann/zst_blocks_format
+def read_obj_zst_blocks(file_name):
+	with open(file_name, "rb") as file:
+		for row in ZstBlocksFile.streamRows(file):
+			line = row.decode()
+			yield json.loads(line.strip())
+
+
 def base36encode(integer: int) -> str:
 	chars = '0123456789abcdefghijklmnopqrstuvwxyz'
 	sign = '-' if integer < 0 else ''
diff --git a/personal/zst_blocks.py b/personal/zst_blocks.py
new file mode 100644
index 0000000..e911f42
--- /dev/null
+++ b/personal/zst_blocks.py
@@ -0,0 +1,254 @@
+# copied from https://github.com/ArthurHeitmann/zst_blocks_format
+
+from __future__ import annotations
+from dataclasses import dataclass
+import os
+import time
+import struct
+from typing import BinaryIO, Callable, Iterable, Literal
+from zstandard import ZstdDecompressor, ZstdCompressor
+
+_endian: Literal["little", "big"] = "little"
+
+_uint32Struct = struct.Struct("<I")
+_uint32X2Struct = struct.Struct("<II")
+
+_defaultCompressionLevel = 3
+
+
+class ZstBlocksFile:
+	blocks: list[ZstBlock]
+
+	def __init__(self, blocks: list[ZstBlock]):
+		self.blocks = blocks
+
+	@staticmethod
+	def readBlockRowAt(file: BinaryIO, rowPosition: RowPosition) -> bytes:
+		file.seek(rowPosition.blockOffset)
+		return ZstBlock.readRow(file, rowPosition.rowIndex)
+
+	@staticmethod
+	def readMultipleBlocks(file: BinaryIO, rowPositions: list[RowPosition]) -> list[bytes]:
+		blockGroupsDict: dict[int, RowPositionGroup] = {}
+		for i, rowPosition in enumerate(rowPositions):
+			if rowPosition.blockOffset not in blockGroupsDict:
+				blockGroupsDict[rowPosition.blockOffset] = RowPositionGroup(rowPosition.blockOffset, [])
+			blockGroupsDict[rowPosition.blockOffset].rowIndices.append(RowIndex(rowPosition.rowIndex, i))
+		blockGroups = list(blockGroupsDict.values())
+
+		rows: list = [None] * len(rowPositions)
+		for blockGroup in blockGroups:
+			file.seek(blockGroup.blockOffset)
+			blockRows = ZstBlock.readSpecificRows(file, map(lambda pair: pair.withinBlockIndex, blockGroup.rowIndices))
+			for originalPosition, row in zip(blockGroup.rowIndices, blockRows):
+				rows[originalPosition.originalRowIndex] = row
+
+		return rows
+
+	@staticmethod
+	def streamRows(file: BinaryIO, blockIndexProgressCallback: Callable[[int], None] | None = None) -> Iterable[bytes]:
+		fileSize = os.path.getsize(file.name)
+		blockIndex = 0
+		while file.tell() < fileSize:
+			yield from ZstBlock.streamRows(file)
+			blockIndex += 1
+			if blockIndexProgressCallback is not None:
+				blockIndexProgressCallback(blockIndex)
+
+	@staticmethod
+	def appendBlock(file: BinaryIO, rows: list[bytes], compressionLevel=_defaultCompressionLevel) -> None:
+		file.seek(file.tell())
+		ZstBlock(rows).write(file, compressionLevel=compressionLevel)
+
+	@staticmethod
+	def writeStream(file: BinaryIO, rowStream: Iterable[bytes], blockSize: int, rowPositions: list[RowPosition] | None = None, compressionLevel=_defaultCompressionLevel) -> None:
+		pendingRows = []
+		for row in rowStream:
+			pendingRows.append(row)
+			if len(pendingRows) >= blockSize:
+				ZstBlock(pendingRows).write(file, rowPositions, compressionLevel=compressionLevel)
+				pendingRows = []
+		if len(pendingRows) > 0:
+			ZstBlock(pendingRows).write(file, rowPositions, compressionLevel=compressionLevel)
+
+	@staticmethod
+	def writeBlocksStream(file: BinaryIO, blocksStream: Iterable[list[bytes]], rowPositions: list[RowPosition] | None = None, compressionLevel=_defaultCompressionLevel) -> None:
+		for rows in blocksStream:
+			ZstBlock(rows).write(file, rowPositions, compressionLevel=compressionLevel)
+
+	@staticmethod
+	def countBlocks(file: BinaryIO) -> int:
+		fileSize = os.path.getsize(file.name)
+		blockCount = 0
+		initialPos = file.tell()
+		pos = initialPos
+		while pos < fileSize:
+			blockCount += 1
+			blockSize = _uint32Struct.unpack(file.read(4))[0]
+			pos += 4 + blockSize
+			file.seek(pos)
+		file.seek(initialPos)
+		return blockCount
+
+	@staticmethod
+	def generateRowPositions(file: BinaryIO) -> Iterable[RowPosition]:
+		fileSize = os.path.getsize(file.name)
+		while file.tell() < fileSize:
+			yield from ZstBlock.generateRowPositions(file)
+
+
+class ZstBlock:
+	rows: list[bytes]
+
+	def __init__(self, rows: list[bytes]):
+		self.rows = rows
+
+	@classmethod
+	def streamRows(cls, file: BinaryIO) -> Iterable[bytes]:
+		compressedSize = _uint32Struct.unpack(file.read(4))[0]
+		compressedData = file.read(compressedSize)
+		decompressedData = ZstdDecompressor().decompress(compressedData)
+
+		memoryView = memoryview(decompressedData)
+		count = _uint32Struct.unpack(memoryView[0:4])[0]
+		rows: list[ZstRowInfo] = [None] * count
+		for i in range(count):
+			rows[i] = ZstRowInfo.read(memoryView, 4 + i * ZstRowInfo.structSize)
+
+		dataStart = 4 + count * ZstRowInfo.structSize
+		for row in rows:
+			yield decompressedData[dataStart + row.offset: dataStart + row.offset + row.size]
+
+	@classmethod
+	def readSpecificRows(cls, file: BinaryIO, rowIndices: Iterable[int]) -> list[bytes]:
+		compressedSize = _uint32Struct.unpack(file.read(4))[0]
+		compressedData = file.read(compressedSize)
+		decompressedData = ZstdDecompressor().decompress(compressedData)
+
+		memoryView = memoryview(decompressedData)
+		count = _uint32Struct.unpack(memoryView[0:4])[0]
+		rows: list[ZstRowInfo] = [None] * count
+		for i in range(count):
+			rows[i] = ZstRowInfo.read(memoryView, 4 + i * ZstRowInfo.structSize)
+
+		dataStart = 4 + count * ZstRowInfo.structSize
+		return [
+			decompressedData[dataStart + rows[rowIndex].offset: dataStart + rows[rowIndex].offset + rows[rowIndex].size]
+			for rowIndex in rowIndices
+		]
+
+	@classmethod
+	def readRow(cls, file: BinaryIO, rowIndex: int) -> bytes:
+		compressedSize = _uint32Struct.unpack(file.read(4))[0]
+		compressedData = file.read(compressedSize)
+		decompressedData = ZstdDecompressor().decompress(compressedData)
+
+		memoryView = memoryview(decompressedData)
+		count = _uint32Struct.unpack(memoryView[0:4])[0]
+		if rowIndex >= count:
+			raise Exception("Row index out of range")
+		row = ZstRowInfo.read(memoryView, 4 + rowIndex * ZstRowInfo.structSize)
+
+		dataStart = 4 + count * ZstRowInfo.structSize
+		return decompressedData[dataStart + row.offset: dataStart + row.offset + row.size]
+
+	def write(self, file: BinaryIO, rowPositions: list[RowPosition] | None = None, compressionLevel=_defaultCompressionLevel) -> None:
+		uncompressedSize = 4 + len(self.rows) * ZstRowInfo.structSize + sum(len(row) for row in self.rows)
+		uncompressedBytes = bytearray(uncompressedSize)
+		uncompressedBytes[0:4] = len(self.rows).to_bytes(4, _endian)
+
+		dataOffset = 4 + len(self.rows) * ZstRowInfo.structSize
+		blockOffset = file.tell()
+		currentDataLocalOffset = 0
+		for i in range(len(self.rows)):
+			row = self.rows[i]
+			rowInfo = ZstRowInfo(currentDataLocalOffset, len(row))
+			rowInfo.write(uncompressedBytes, 4 + i * ZstRowInfo.structSize)
+			uncompressedBytes[dataOffset + currentDataLocalOffset: dataOffset + currentDataLocalOffset + len(row)] = row
+			currentDataLocalOffset += len(row)
+			if rowPositions is not None:
+				rowPositions.append(RowPosition(blockOffset, i))
+		uncompressedData = bytes(uncompressedBytes)
+		compressedData = ZstdCompressor(compressionLevel).compress(uncompressedData)
+		compressedSize = len(compressedData)
+		blockBytes = bytearray(4 + compressedSize)
+		blockBytes[0:4] = compressedSize.to_bytes(4, _endian)
+		blockBytes[4:4 + compressedSize] = compressedData
+		file.write(blockBytes)
+
+	@staticmethod
+	def generateRowPositions(file: BinaryIO) -> Iterable[RowPosition]:
+		blockOffset = file.tell()
+		compressedSize = _uint32Struct.unpack(file.read(4))[0]
+		compressedData = file.read(compressedSize)
+		decompressedData = ZstdDecompressor().decompress(compressedData)
+
+		memoryView = memoryview(decompressedData)
+		count = _uint32Struct.unpack(memoryView[0:4])[0]
+		for i in range(count):
+			yield RowPosition(blockOffset, i)
+
+
+class ZstRowInfo:
+	structSize = 8
+	offset: int
+	size: int
+
+	def __init__(self, offset: int, size: int):
+		self.offset = offset
+		self.size = size
+
+	@staticmethod
+	def read(bytes: bytes, position: int) -> ZstRowInfo:
+		offset, size = _uint32X2Struct.unpack(bytes[position: position + ZstRowInfo.structSize])
+		return ZstRowInfo(offset, size)
+
+	def write(self, bytes: bytearray, position: int) -> None:
+		bytes[position + 0: position + 4] = self.offset.to_bytes(4, _endian)
+		bytes[position + 4: position + 8] = self.size.to_bytes(4, _endian)
+
+
+@dataclass
+class RowPosition:
+	blockOffset: int
+	rowIndex: int
+
+
+@dataclass
+class RowIndex:
+	withinBlockIndex: int
+	originalRowIndex: int
+
+
+@dataclass
+class RowPositionGroup:
+	blockOffset: int
+	rowIndices: list[RowIndex]
diff --git a/scripts/filter_file.py b/scripts/filter_file.py
index 0f5850e..6b85d1f 100644
--- a/scripts/filter_file.py
+++ b/scripts/filter_file.py
@@ -7,7 +7,7 @@ from datetime import datetime
 import logging.handlers
 
 # put the path to the input file, or a folder of files to process all of
-input_file = r"\\MYCLOUDPR4100\Public\reddit_test"
+input_file = r"\\MYCLOUDPR4100\Public\reddit\subreddits\CryptoCurrency_submissions.zst"
 # put the name or path to the output file. The file extension from below will be added automatically. If the input file is a folder, the output will be treated as a folder as well
 output_file = r"\\MYCLOUDPR4100\Public\output"
 # the format to output in, pick from the following options
@@ -28,8 +28,8 @@ single_field = None
 write_bad_lines = True
 
 # only output items between these two dates
-from_date = datetime.strptime("2005-01-01", "%Y-%m-%d")
-to_date = datetime.strptime("2025-01-01", "%Y-%m-%d")
+from_date = datetime.strptime("2022-01-01", "%Y-%m-%d")
+to_date = datetime.strptime("2022-12-31", "%Y-%m-%d")
 
 # the field to filter on, the values to filter with and whether it should be an exact match
 # some examples:
@@ -76,7 +76,7 @@ to_date = datetime.strptime("2025-01-01", "%Y-%m-%d")
 # if you want only top level comments instead of all comments, you can set field to "parent_id" instead of "link_id"
 field = "title"
-values = ['post race discussion']
+values = ['']
 # if you have a long list of values, you can put them in a file and put the filename here. If set, this overrides the values list above
 # if this list is very large, it could greatly slow down the process
 values_file = None
 
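
Note: the sketch below is not part of the patch; it is a minimal, hypothetical round trip over the zst_blocks format introduced above, assuming personal/zst_blocks.py and personal/utils.py are importable and that writing a throwaway file in the working directory is acceptable. Each block in a .zst_blocks file is a 4-byte little-endian compressed size followed by a zstd frame whose payload is a row count, a table of (offset, size) pairs, then the concatenated row bytes (see ZstBlock.write).

	import json

	import utils  # provides read_obj_zst_blocks, added in this change
	from zst_blocks import ZstBlocksFile

	# write 100 fake objects, packed 32 rows per block; smaller blocks make
	# random reads cheaper, larger blocks compress better
	rows = [json.dumps({"id": i, "created_utc": 1696118400 + i}).encode("utf-8") for i in range(100)]
	with open("example.zst_blocks", "wb") as file:
		ZstBlocksFile.writeStream(file, rows, blockSize=32)

	# stream the objects back as dicts with the new utils helper
	for obj in utils.read_obj_zst_blocks("example.zst_blocks"):
		assert "created_utc" in obj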