From bd7378ff916dcaa16be2847f492cf5c84ff116b3 Mon Sep 17 00:00:00 2001 From: Watchful1 Date: Sat, 4 Sep 2021 23:17:53 -0700 Subject: [PATCH] Initial commit --- .gitignore | 6 + Pipfile | 12 + Pipfile.lock | 75 ++++++ README.md | 1 + scripts/combine_folder_multiprocess.py | 314 +++++++++++++++++++++++++ scripts/iterate_folder.py | 62 +++++ scripts/single_file.py | 55 +++++ 7 files changed, 525 insertions(+) create mode 100644 .gitignore create mode 100644 Pipfile create mode 100644 Pipfile.lock create mode 100644 README.md create mode 100644 scripts/combine_folder_multiprocess.py create mode 100644 scripts/iterate_folder.py create mode 100644 scripts/single_file.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..4a71093 --- /dev/null +++ b/.gitignore @@ -0,0 +1,6 @@ +.idea/* +logs/* +__pycache__/* +*.db +*.ini +*.txt \ No newline at end of file diff --git a/Pipfile b/Pipfile new file mode 100644 index 0000000..03819f5 --- /dev/null +++ b/Pipfile @@ -0,0 +1,12 @@ +[[source]] +url = "https://pypi.org/simple" +verify_ssl = true +name = "pypi" + +[packages] +zstandard = "*" + +[dev-packages] + +[requires] +python_version = "3.8" diff --git a/Pipfile.lock b/Pipfile.lock new file mode 100644 index 0000000..2f92ed1 --- /dev/null +++ b/Pipfile.lock @@ -0,0 +1,75 @@ +{ + "_meta": { + "hash": { + "sha256": "03662c664bce0cd9fad7ea3062bb09bfe999f755a00e0f4de720c66955ccae62" + }, + "pipfile-spec": 6, + "requires": { + "python_version": "3.8" + }, + "sources": [ + { + "name": "pypi", + "url": "https://pypi.org/simple", + "verify_ssl": true + } + ] + }, + "default": { + "zstandard": { + "hashes": [ + "sha256:1c5ef399f81204fbd9f0df3debf80389fd8aa9660fe1746d37c80b0d45f809e9", + "sha256:1faefe33e3d6870a4dce637bcb41f7abb46a1872a595ecc7b034016081c37543", + "sha256:1fb23b1754ce834a3a1a1e148cc2faad76eeadf9d889efe5e8199d3fb839d3c6", + "sha256:22f127ff5da052ffba73af146d7d61db874f5edb468b36c9cb0b857316a21b3d", + "sha256:2353b61f249a5fc243aae3caa1207c80c7e6919a58b1f9992758fa496f61f839", + "sha256:24cdcc6f297f7c978a40fb7706877ad33d8e28acc1786992a52199502d6da2a4", + "sha256:31e35790434da54c106f05fa93ab4d0fab2798a6350e8a73928ec602e8505836", + "sha256:3547ff4eee7175d944a865bbdf5529b0969c253e8a148c287f0668fe4eb9c935", + "sha256:378ac053c0cfc74d115cbb6ee181540f3e793c7cca8ed8cd3893e338af9e942c", + "sha256:3e1cd2db25117c5b7c7e86a17cde6104a93719a9df7cb099d7498e4c1d13ee5c", + "sha256:3fe469a887f6142cc108e44c7f42c036e43620ebaf500747be2317c9f4615d4f", + "sha256:4800ab8ec94cbf1ed09c2b4686288750cab0642cb4d6fba2a56db66b923aeb92", + "sha256:52de08355fd5cfb3ef4533891092bb96229d43c2069703d4aff04fdbedf9c92f", + "sha256:5752f44795b943c99be367fee5edf3122a1690b0d1ecd1bd5ec94c7fd2c39c94", + "sha256:5d53f02aeb8fdd48b88bc80bece82542d084fb1a7ba03bf241fd53b63aee4f22", + "sha256:69b7a5720b8dfab9005a43c7ddb2e3ccacbb9a2442908ae4ed49dd51ab19698a", + "sha256:6cc162b5b6e3c40b223163a9ea86cd332bd352ddadb5fd142fc0706e5e4eaaff", + "sha256:6f5d0330bc992b1e267a1b69fbdbb5ebe8c3a6af107d67e14c7a5b1ede2c5945", + "sha256:6ffadd48e6fe85f27ca3ca10cfd3ef3d0f933bef7316870285ffeb58d791ca9c", + "sha256:72a011678c654df8323aa7b687e3147749034fdbe994d346f139ab9702b59cea", + "sha256:77d26452676f471223571efd73131fd4a626622c7960458aab2763e025836fc5", + "sha256:7a88cc773ffe55992ff7259a8df5fb3570168d7138c69aadba40142d0e5ce39a", + "sha256:7b16bd74ae7bfbaca407a127e11058b287a4267caad13bd41305a5e630472549", + "sha256:855d95ec78b6f0ff66e076d5461bf12d09d8e8f7e2b3fc9de7236d1464fd730e", + "sha256:8baf7991547441458325ca8fafeae79ef1501cb4354022724f3edd62279c5b2b", + "sha256:8fb77dd152054c6685639d855693579a92f276b38b8003be5942de31d241ebfb", + "sha256:92d49cc3b49372cfea2d42f43a2c16a98a32a6bc2f42abcde121132dbfc2f023", + "sha256:94d0de65e37f5677165725f1fc7fb1616b9542d42a9832a9a0bdcba0ed68b63b", + "sha256:9867206093d7283d7de01bd2bf60389eb4d19b67306a0a763d1a8a4dbe2fb7c3", + "sha256:9ee3c992b93e26c2ae827404a626138588e30bdabaaf7aa3aa25082a4e718790", + "sha256:a4f8af277bb527fa3d56b216bda4da931b36b2d3fe416b6fc1744072b2c1dbd9", + "sha256:ab9f19460dfa4c5dd25431b75bee28b5f018bf43476858d64b1aa1046196a2a0", + "sha256:ac43c1821ba81e9344d818c5feed574a17f51fca27976ff7d022645c378fbbf5", + "sha256:af5a011609206e390b44847da32463437505bf55fd8985e7a91c52d9da338d4b", + "sha256:b0975748bb6ec55b6d0f6665313c2cf7af6f536221dccd5879b967d76f6e7899", + "sha256:b4963dad6cf28bfe0b61c3265d1c74a26a7605df3445bfcd3ba25de012330b2d", + "sha256:b7d3a484ace91ed827aa2ef3b44895e2ec106031012f14d28bd11a55f24fa734", + "sha256:bd3c478a4a574f412efc58ba7e09ab4cd83484c545746a01601636e87e3dbf23", + "sha256:c9e2dcb7f851f020232b991c226c5678dc07090256e929e45a89538d82f71d2e", + "sha256:d25c8eeb4720da41e7afbc404891e3a945b8bb6d5230e4c53d23ac4f4f9fc52c", + "sha256:dc8c03d0c5c10c200441ffb4cce46d869d9e5c4ef007f55856751dc288a2dffd", + "sha256:ec58e84d625553d191a23d5988a19c3ebfed519fff2a8b844223e3f074152163", + "sha256:eda0719b29792f0fea04a853377cfff934660cb6cd72a0a0eeba7a1f0df4a16e", + "sha256:edde82ce3007a64e8434ccaf1b53271da4f255224d77b880b59e7d6d73df90c8", + "sha256:f36722144bc0a5068934e51dca5a38a5b4daac1be84f4423244277e4baf24e7a", + "sha256:f8bb00ced04a8feff05989996db47906673ed45b11d86ad5ce892b5741e5f9dd", + "sha256:f98fc5750aac2d63d482909184aac72a979bfd123b112ec53fd365104ea15b1c", + "sha256:ff5b75f94101beaa373f1511319580a010f6e03458ee51b1a386d7de5331440a" + ], + "index": "pypi", + "version": "==0.15.2" + } + }, + "develop": {} +} diff --git a/README.md b/README.md new file mode 100644 index 0000000..8739277 --- /dev/null +++ b/README.md @@ -0,0 +1 @@ +coming soon \ No newline at end of file diff --git a/scripts/combine_folder_multiprocess.py b/scripts/combine_folder_multiprocess.py new file mode 100644 index 0000000..7270c4b --- /dev/null +++ b/scripts/combine_folder_multiprocess.py @@ -0,0 +1,314 @@ +import zstandard +import os +import json +import sys +import time +import argparse +from datetime import datetime +import logging.handlers +import multiprocessing + + +# sets up logging to the console as well as a file +log = logging.getLogger("bot") +log.setLevel(logging.INFO) +log_formatter = logging.Formatter('%(asctime)s - %(levelname)s: %(message)s') + +log_stderr_handler = logging.StreamHandler() +log_stderr_handler.setFormatter(log_formatter) +log.addHandler(log_stderr_handler) +if not os.path.exists("logs"): + os.makedirs("logs") +log_file_handler = logging.handlers.RotatingFileHandler( + os.path.join("logs", "bot.log"), maxBytes=1024*1024*16, backupCount=5) +log_file_handler.setFormatter(log_formatter) +log.addHandler(log_file_handler) + + +# this script iterates through zst compressed ndjson files, like the pushshift reddit dumps, loads each line +# and passes it into the save_obj function, if it function returns true for a line, it's written out into a +# separate file for that month. After all the ndjson files are processed, it iterates through the resulting +# files and combines them into a final file. + +# once complete, the combined file can easily be processed like +# with open(file_path, 'r') as file: +# for line in file: +# obj = json.loads(line) + +# features: +# - multiple processes in parallel to maximize drive read and decompression +# - saves state as it completes each file and picks up where it stopped +# - detailed progress indicators + + +# convenience object used to pass status information between processes +class FileConfig: + def __init__(self, input_path, output_path=None, complete=False, lines_processed=0, error_lines=0): + self.input_path = input_path + self.output_path = output_path + self.file_size = os.stat(input_path).st_size + self.complete = complete + self.bytes_processed = self.file_size if complete else 0 + self.lines_processed = lines_processed if complete else 0 + self.error_message = None + self.error_lines = error_lines + + def __str__(self): + return f"{self.input_path} : {self.output_path} : {self.file_size} : {self.complete} : {self.bytes_processed} : {self.lines_processed}" + + +# used for calculating running average of read speed +class Queue: + def __init__(self, max_size): + self.list = [] + self.max_size = max_size + + def put(self, item): + if len(self.list) >= self.max_size: + self.list.pop(0) + self.list.append(item) + + def peek(self): + return self.list[0] if len(self.list) > 0 else None + + +# builds file paths +def folder_helper(output_folder, output_file_name): + working_folder = os.path.join(output_folder, "pushshift_working_dir_" + output_file_name) + status_file = os.path.join(working_folder, output_file_name + ".json") + return working_folder, status_file + + +# save file information and progress to a json file +# we don't want to save the whole FileConfig object, since some of the info resets if we restart +def save_file_list(input_files, output_folder, output_file_name): + working_folder, status_json_file_name = folder_helper(output_folder, output_file_name) + if not os.path.exists(working_folder): + os.makedirs(working_folder) + simple_file_list = [] + for file in input_files: + simple_file_list.append([file.input_path, file.output_path, file.complete, file.lines_processed, file.error_lines]) + with open(status_json_file_name, 'w') as status_json_file: + status_json_file.write(json.dumps(simple_file_list, indent=4)) + + +# load file information from the json file and recalculate file sizes +def load_file_list(output_folder, output_file_name): + _, status_json_file_name = folder_helper(output_folder, output_file_name) + if os.path.exists(status_json_file_name): + with open(status_json_file_name, 'r') as status_json_file: + simple_file_list = json.load(status_json_file) + input_files = [] + for simple_file in simple_file_list: + input_files.append( + FileConfig(simple_file[0], simple_file[1], simple_file[2], simple_file[3], simple_file[4]) + ) + return input_files + else: + return None + + +# open a zst compressed ndjson file and yield lines one at a time +# also passes back file progress +def read_lines_zst(file_name): + with open(file_name, 'rb') as file_handle: + buffer = '' + reader = zstandard.ZstdDecompressor(max_window_size=2**31).stream_reader(file_handle) + while True: + chunk = reader.read(2**27).decode() + if not chunk: + break + lines = (buffer + chunk).split("\n") + + for line in lines[:-1]: + yield line, file_handle.tell() + + buffer = lines[-1] + reader.close() + + +# base of each separate process. Loads a file, iterates through lines and writes out +# the ones where save_obj() returns true. Also passes status information back to the parent via a queue +def process_file(file, working_folder, queue, field, value): + output_file = None + try: + for line, file_bytes_processed in read_lines_zst(file.input_path): + try: + obj = json.loads(line) + if obj[field] == value: + if output_file is None: + if file.output_path is None: + created = datetime.utcfromtimestamp(int(obj['created_utc'])) + file.output_path = os.path.join(working_folder, created.strftime("%Y-%m")) + output_file = open(file.output_path, 'w') + output_file.write(line) + output_file.write("\n") + except (KeyError, json.JSONDecodeError) as err: + file.error_lines += 1 + file.lines_processed += 1 + if file.lines_processed % 1000000 == 0: + file.bytes_processed = file_bytes_processed + queue.put(file) + + if output_file is not None: + output_file.close() + + file.complete = True + file.bytes_processed = file.file_size + except Exception as err: + file.error_message = str(err) + queue.put(file) + + +if __name__ == '__main__': + parser = argparse.ArgumentParser(description="Use multiple processes to decompress and iterate over pushshift dump files") + parser.add_argument("input", help="The input folder to read files from") + parser.add_argument("output", help="The output folder to store temporary files in and write the output to") + parser.add_argument("--name", help="What to name the output file", default="pushshift") + parser.add_argument("--field", help="When deciding what lines to keep, use this field for comparisons", default="subreddit") + parser.add_argument("--value", help="When deciding what lines to keep, compare the field to this value", default="pushshift") + parser.add_argument("--processes", help="Number of processes to use", default=10, type=int) + parser.add_argument( + "--error_rate", help= + "Percentage as an integer from 0 to 100 of the lines where the field can be missing. For the subreddit field especially, " + "there are a number of posts that simply don't have a subreddit attached", default=1, type=int) + parser.add_argument("--debug", help="Enable debug logging", action='store_const', const=True, default=False) + args = parser.parse_args() + + if args.debug: + log.setLevel(logging.DEBUG) + + log.info(f"Loading files from: {args.input}") + log.info(f"Writing output to: {(os.path.join(args.output, args.name + '.txt'))}") + + multiprocessing.set_start_method('spawn') + queue = multiprocessing.Manager().Queue() + input_files = load_file_list(args.output, args.name) + working_folder, _ = folder_helper(args.output, args.name) + # if the file list wasn't loaded from the json, this is the first run, find what files we need to process + if input_files is None: + input_files = [] + for subdir, dirs, files in os.walk(args.input): + files.sort() + for file_name in files: + if file_name.endswith(".zst"): + input_path = os.path.join(subdir, file_name) + input_files.append(FileConfig(input_path)) + + save_file_list(input_files, args.output, args.name) + + files_processed = 0 + total_bytes = 0 + total_bytes_processed = 0 + total_lines_processed = 0 + files_to_process = [] + # calculate the total file size for progress reports, build a list of incomplete files to process + for file in input_files: + total_bytes += file.file_size + if file.complete: + files_processed += 1 + total_lines_processed += file.lines_processed + total_bytes_processed += file.file_size + else: + files_to_process.append(file) + + log.info(f"Processed {files_processed} of {len(input_files)} files with {(total_bytes_processed / (2**30)):.2f} of {(total_bytes / (2**30)):.2f} gigabytes") + + start_time = time.time() + if len(files_to_process): + progress_queue = Queue(40) + progress_queue.put([start_time, total_lines_processed, total_bytes_processed]) + speed_queue = Queue(40) + for file in files_to_process: + log.debug(f"Processing file: {file.input_path}") + # start the workers + with multiprocessing.Pool(processes=min(args.processes, len(files_to_process))) as pool: + workers = pool.starmap_async(process_file, [(file, working_folder, queue, args.field, args.value) for file in files_to_process], error_callback=log.info) + while not workers.ready(): + # loop until the workers are all done, pulling in status messages as they are sent + file_update = queue.get() + if file_update.error_message is not None: + log.warning(f"File failed {file_update.input_path}: {file_update.error_message}") + continue + # I'm going to assume that the list of files is short enough that it's no + # big deal to just iterate each time since that saves a bunch of work + total_lines_processed = 0 + total_bytes_processed = 0 + total_lines_errored = 0 + files_processed = 0 + i = 0 + for file in input_files: + if file.input_path == file_update.input_path: + input_files[i] = file_update + file = file_update + total_lines_processed += file.lines_processed + total_bytes_processed += file.bytes_processed + total_lines_errored += file.error_lines + files_processed += 1 if file.complete else 0 + i += 1 + if file_update.complete: + save_file_list(input_files, args.output, args.name) + current_time = time.time() + progress_queue.put([current_time, total_lines_processed, total_bytes_processed]) + + first_time, first_lines, first_bytes = progress_queue.peek() + bytes_per_second = int((total_bytes_processed - first_bytes)/(current_time - first_time)) + speed_queue.put(bytes_per_second) + seconds_left = int((total_bytes - total_bytes_processed) / int(sum(speed_queue.list) / len(speed_queue.list))) + minutes_left = int(seconds_left / 60) + hours_left = int(minutes_left / 60) + days_left = int(hours_left / 24) + + log.info( + f"{total_lines_processed:,} lines at {(total_lines_processed - first_lines)/(current_time - first_time):,.0f}/s, {total_lines_errored:,} errored : " + f"{(total_bytes_processed / (2**30)):.2f} gb at {(bytes_per_second / (2**20)):,.0f} mb/s, {(total_bytes_processed / total_bytes) * 100:.0f}% : " + f"{files_processed}/{len(input_files)} files : " + f"{(str(days_left) + 'd ' if days_left > 0 else '')}{hours_left - (days_left * 24)}:{minutes_left - (hours_left * 60):02}:{seconds_left - (minutes_left * 60):02} remaining") + + log.info(f"{total_lines_processed:,}, {total_lines_errored} errored : {(total_bytes_processed / (2**30)):.2f} gb, {(total_bytes_processed / total_bytes) * 100:.0f}% : {files_processed}/{len(input_files)}") + + working_file_paths = [] + count_incomplete = 0 + # build a list of output files to combine + for file in input_files: + if not file.complete: + log.info(f"File {file.input_path} is not marked as complete") + count_incomplete += 1 + else: + if file.error_lines > file.lines_processed * (args.error_rate * 0.01): + log.info( + f"File {file.input_path} has {file.error_lines:,} errored lines out of {file.lines_processed:,}, " + f"{(file.error_lines / file.lines_processed) * (args.error_rate * 0.01):.2f}% which is above the limit of {args.error_rate}%") + count_incomplete += 1 + elif file.output_path is not None: + if not os.path.exists(file.output_path): + log.info(f"Output file {file.output_path} doesn't exist") + count_incomplete += 1 + else: + working_file_paths.append(file.output_path) + + if count_incomplete > 0: + log.info(f"{count_incomplete} files were not completed, errored or don't exist, something went wrong. Aborting") + sys.exit() + + log.info(f"Processing complete, combining {len(working_file_paths)} result files") + + output_lines = 0 + output_file_path = os.path.join(args.output, args.name + ".txt") + # combine all the output files into the final results file + with open(output_file_path, 'w') as output_file: + i = 0 + for working_file_path in working_file_paths: + i += 1 + log.info(f"Reading {i}/{len(working_file_paths)}") + with open(working_file_path, 'r') as input_file: + for line in input_file.readlines(): + output_lines += 1 + output_file.write(line) + + log.info(f"Finished combining files, {output_lines:,} lines written to {output_file_path}") + + +# test file sorting +# compress results +# example command line call in comment diff --git a/scripts/iterate_folder.py b/scripts/iterate_folder.py new file mode 100644 index 0000000..317b01a --- /dev/null +++ b/scripts/iterate_folder.py @@ -0,0 +1,62 @@ +import zstandard +import os +import json +import sys +from datetime import datetime +import logging.handlers + + +log = logging.getLogger("bot") +log.setLevel(logging.DEBUG) +log.addHandler(logging.StreamHandler()) + + +def read_lines_zst(file_name): + with open(file_name, 'rb') as file_handle: + buffer = '' + reader = zstandard.ZstdDecompressor(max_window_size=2**28).stream_reader(file_handle) + while True: + chunk = reader.read(2**24).decode() # read a 16mb chunk at a time. There are some really big comments + if not chunk: + break + lines = (buffer + chunk).split("\n") + + for line in lines[:-1]: + yield line, file_handle.tell() + + buffer = lines[-1] + reader.close() + + +input_folder = sys.argv[1] +input_files = [] +total_size = 0 +for subdir, dirs, files in os.walk(input_folder): + for filename in files: + input_path = os.path.join(subdir, filename) + if input_path.endswith(".zst"): + file_size = os.stat(input_path).st_size + total_size += file_size + input_files.append([input_path, file_size]) + +log.info(f"Processing {len(input_files)} files of {(total_size / (2**30)):.2f} gigabytes") + +total_lines = 0 +total_bytes_processed = 0 +for input_file in input_files: + file_lines = 0 + file_bytes_processed = 0 + created = None + for line, file_bytes_processed in read_lines_zst(input_file[0]): + obj = json.loads(line) + created = datetime.utcfromtimestamp(int(obj['created_utc'])) + file_lines += 1 + if file_lines == 1: + log.info(f"{created.strftime('%Y-%m-%d %H:%M:%S')} : {file_lines + total_lines:,} : 0% : {(total_bytes_processed / total_size) * 100:.0f}%") + if file_lines % 100000 == 0: + log.info(f"{created.strftime('%Y-%m-%d %H:%M:%S')} : {file_lines + total_lines:,} : {(file_bytes_processed / input_file[1]) * 100:.0f}% : {(total_bytes_processed / total_size) * 100:.0f}%") + total_lines += file_lines + total_bytes_processed += input_file[1] + log.info(f"{created.strftime('%Y-%m-%d %H:%M:%S')} : {total_lines:,} : 100% : {(total_bytes_processed / total_size) * 100:.0f}%") + +log.info(f"Total: {total_lines}") diff --git a/scripts/single_file.py b/scripts/single_file.py new file mode 100644 index 0000000..23e9dea --- /dev/null +++ b/scripts/single_file.py @@ -0,0 +1,55 @@ +import zstandard +import os +import json +import sys +from datetime import datetime +import logging.handlers + + +log = logging.getLogger("bot") +log.setLevel(logging.DEBUG) +log.addHandler(logging.StreamHandler()) + + +def read_lines_zst(file_name): + with open(file_name, 'rb') as file_handle: + buffer = '' + reader = zstandard.ZstdDecompressor(max_window_size=2**31).stream_reader(file_handle) + while True: + chunk = reader.read(2**27).decode() + if not chunk: + break + lines = (buffer + chunk).split("\n") + + for line in lines[:-1]: + yield line, file_handle.tell() + + buffer = lines[-1] + reader.close() + + +if __name__ == "__main__": + file_path = r"\\MYCLOUDPR4100\Public\reddit\submissions\RS_2013-03.zst" + file_size = os.stat(file_path).st_size + file_lines = 0 + file_bytes_processed = 0 + created = None + field = "subreddit" + value = "wallstreetbets" + bad_lines = 0 + try: + for line, file_bytes_processed in read_lines_zst(file_path): + try: + obj = json.loads(line) + created = datetime.utcfromtimestamp(int(obj['created_utc'])) + temp = obj[field] == value + except (KeyError, json.JSONDecodeError) as err: + bad_lines += 1 + file_lines += 1 + if file_lines % 100000 == 0: + log.info(f"{created.strftime('%Y-%m-%d %H:%M:%S')} : {file_lines:,} : {bad_lines:,} : {(file_bytes_processed / file_size) * 100:.0f}%") + except Exception as err: + log.info(err) + + log.info(f"Complete : {file_lines:,} : {bad_lines:,}") +