diff --git a/personal/combine/build_month.py b/personal/combine/build_month.py
new file mode 100644
index 0000000..5057f11
--- /dev/null
+++ b/personal/combine/build_month.py
@@ -0,0 +1,178 @@
+import sys
+import requests
+import time
+import discord_logging
+import argparse
+import os
+import re
+import zstandard
+from datetime import datetime, timedelta
+import json
+import praw
+from praw import endpoints
+import prawcore
+import logging.handlers
+
+sys.path.append('personal')
+
+log = discord_logging.init_logging(debug=False)
+
+import utils
+import classes
+from classes import IngestType
+from merge import ObjectType
+
+
+NEWLINE_ENCODED = "\n".encode('utf-8')
+reg = re.compile(r"\d\d-\d\d-\d\d_\d\d-\d\d")
+
+
+def end_of_day(input_minute):
+	return input_minute.replace(hour=0, minute=0, second=0) + timedelta(days=1)
+
+
+def build_day(day_to_process, input_folders, output_folder, object_type, reddit, pushshift_token):
+	# query_pushshift and query_reddit are assumed to be the backfill helpers defined in merge_and_backfill.py (formerly build_day.py)
+	log.info(f"Using pushshift token: {pushshift_token}")
+
+	file_type = "comments" if object_type == ObjectType.COMMENT else "submissions"
+
+	file_minutes = {}
+	minute_iterator = day_to_process - timedelta(minutes=2)
+	end_time = end_of_day(day_to_process) + timedelta(minutes=2)
+	while minute_iterator <= end_time:
+		file_minutes[minute_iterator] = []
+		minute_iterator += timedelta(minutes=1)
+
+	for merge_folder, ingest_type in input_folders:
+		merge_date_folder = os.path.join(merge_folder, file_type, day_to_process.strftime('%y-%m-%d'))
+		if os.path.exists(merge_date_folder):
+			for file in os.listdir(merge_date_folder):
+				match = reg.search(file)
+				if not match:
+					log.info(f"File doesn't match regex: {file}")
+					continue
+				file_date = datetime.strptime(match.group(), '%y-%m-%d_%H-%M')
+				if file_date in file_minutes:
+					file_minutes[file_date].append((os.path.join(merge_date_folder, file), ingest_type))
+
+	objects = classes.ObjectDict(day_to_process, day_to_process + timedelta(days=1) - timedelta(seconds=1), object_type)
+	unmatched_field = False
+	minute_iterator = day_to_process - timedelta(minutes=2)
+	working_lowest_minute = day_to_process
+	last_minute_of_day = end_of_day(day_to_process) - timedelta(minutes=1)
+	while minute_iterator <= end_time:
+		for ingest_file, ingest_type in file_minutes[minute_iterator]:
+			for obj in utils.read_obj_zst(ingest_file):
+				if objects.add_object(obj, ingest_type):
+					unmatched_field = True
+		log.info(f"Loaded {minute_iterator.strftime('%y-%m-%d_%H-%M')} : {objects.get_counts_string_by_minute(minute_iterator, [IngestType.INGEST, IngestType.RESCAN, IngestType.DOWNLOAD])}")
+
+		if minute_iterator >= end_time or objects.count_minutes() >= 11:
+			if minute_iterator > last_minute_of_day:
+				working_highest_minute = last_minute_of_day
+			else:
+				working_highest_minute = minute_iterator - timedelta(minutes=1)
+			missing_ids, start_id, end_id = objects.get_missing_ids_by_minutes(working_lowest_minute, working_highest_minute)
+			log.debug(
+				f"Backfilling from: {working_lowest_minute.strftime('%y-%m-%d_%H-%M')} ({utils.base36encode(start_id)}|{start_id}) to "
+				f"{working_highest_minute.strftime('%y-%m-%d_%H-%M')} ({utils.base36encode(end_id)}|{end_id}) with {len(missing_ids)} ({end_id - start_id}) ids")
+
+			for chunk in utils.chunk_list(missing_ids, 50):
+				pushshift_objects, pushshift_token = query_pushshift(chunk, pushshift_token, object_type)
+				for pushshift_object in pushshift_objects:
+					if objects.add_object(pushshift_object, IngestType.PUSHSHIFT):
+						unmatched_field = True
+
+			for chunk in utils.chunk_list(missing_ids, 100):
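+				# ids still missing after pushshift are backfilled from the Reddit API, presumably via /api/info, which accepts up to 100 fullnames per request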
+				reddit_objects = query_reddit(chunk, reddit, object_type)
+				for reddit_object in reddit_objects:
+					if objects.add_object(reddit_object['data'], IngestType.BACKFILL):
+						unmatched_field = True
+
+			for missing_id in missing_ids:
+				if missing_id not in objects.by_id:
+					objects.add_missing_object(missing_id)
+
+			objects.delete_objects_below_minute(working_lowest_minute)
+			while working_lowest_minute <= working_highest_minute:
+				folder = os.path.join(output_folder, file_type, working_lowest_minute.strftime('%y-%m-%d'))
+				if not os.path.exists(folder):
+					os.makedirs(folder)
+				output_path = os.path.join(folder, f"{('RC' if object_type == ObjectType.COMMENT else 'RS')}_{working_lowest_minute.strftime('%y-%m-%d_%H-%M')}.zst")
+				output_handle = zstandard.ZstdCompressor().stream_writer(open(output_path, 'wb'))
+
+				for obj in objects.by_minute[working_lowest_minute].obj_list:
+					output_handle.write(json.dumps(obj, sort_keys=True).encode('utf-8'))
+					output_handle.write(NEWLINE_ENCODED)
+					objects.delete_object_id(obj['id'])
+				log.info(
+					f"Wrote up to {working_lowest_minute.strftime('%y-%m-%d_%H-%M')} : "
+					f"{objects.get_counts_string_by_minute(working_lowest_minute, [IngestType.PUSHSHIFT, IngestType.BACKFILL, IngestType.MISSING])}")
+				output_handle.close()
+				working_lowest_minute += timedelta(minutes=1)
+
+			objects.rebuild_minute_dict()
+
+		discord_logging.flush_discord()
+		if unmatched_field:
+			log.warning(f"Unmatched field, aborting")
+			discord_logging.flush_discord()
+			sys.exit(1)
+
+		minute_iterator += timedelta(minutes=1)
+
+	log.info(f"Finished day {day_to_process.strftime('%y-%m-%d')}: {objects.get_counts_string()}")
+
+
+if __name__ == "__main__":
+	parser = argparse.ArgumentParser(description="Combine the minute files into a single month")
+	parser.add_argument("--type", help="The object type, either comments or submissions", required=True)
+	parser.add_argument("--month", help="The month to process, format YY-MM", required=True)
+	parser.add_argument('--input', help='Input folder', required=True)
+	parser.add_argument('--output', help='Output folder', required=True)
+	parser.add_argument("--debug", help="Enable debug logging", action='store_const', const=True, default=False)
+	args = parser.parse_args()
+
+	if args.debug:
+		discord_logging.set_level(logging.DEBUG)
+
+	month = datetime.strptime(args.month, '%y-%m')
+
+	log.info(f"Input folder: {args.input}")
+	log.info(f"Output folder: {args.output}")
+
+	prefix = None
+	if args.type == "comments":
+		prefix = "RC"
+	elif args.type == "submissions":
+		prefix = "RS"
+	else:
+		log.error(f"Invalid type: {args.type}")
+		sys.exit(2)
+
+	output_path = os.path.join(args.output, args.type, f"{prefix}_{month.strftime('%Y-%m')}.zst")
+	output_handle = zstandard.ZstdCompressor().stream_writer(open(output_path, 'wb'))
+
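+	# stream each minute file of the month in order, appending its raw ndjson lines to the single monthly archive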
+	count_objects = 0
+	minute_iterator = month
+	if month.month == 12:
+		end_time = month.replace(year=month.year + 1, month=1)
+	else:
+		end_time = month.replace(month=month.month + 1)
+	while minute_iterator < end_time:
+		minute_file_path = os.path.join(args.input, args.type, minute_iterator.strftime('%y-%m-%d'), f"{prefix}_{minute_iterator.strftime('%y-%m-%d_%H-%M')}.zst")
+		for obj, line, _ in utils.read_obj_zst_meta(minute_file_path):
+			output_handle.write(line.encode('utf-8'))
+			output_handle.write(NEWLINE_ENCODED)
+
+			count_objects += 1
+			if count_objects % 100000 == 0:
+				log.info(f"{minute_iterator.strftime('%y-%m-%d_%H-%M')} : {count_objects}")
+
+		minute_iterator += timedelta(minutes=1)
+
+	log.info(f"{minute_iterator.strftime('%y-%m-%d_%H-%M')} : {count_objects}")
+	output_handle.close()
diff --git a/personal/combine/build_day.py b/personal/combine/merge_and_backfill.py
similarity index 98%
rename from personal/combine/build_day.py
rename to personal/combine/merge_and_backfill.py
index 690ed8f..e42c937 100644
--- a/personal/combine/build_day.py
+++ b/personal/combine/merge_and_backfill.py
@@ -171,7 +171,7 @@ def build_day(day_to_process, input_folders, output_folder, object_type, reddit)
 		folder = os.path.join(output_folder, file_type, working_lowest_minute.strftime('%y-%m-%d'))
 		if not os.path.exists(folder):
 			os.makedirs(folder)
-		output_path = os.path.join(folder, f"{('RS' if object_type == ObjectType.COMMENT else 'RC')}_{working_lowest_minute.strftime('%y-%m-%d_%H-%M')}.zst")
+		output_path = os.path.join(folder, f"{('RC' if object_type == ObjectType.COMMENT else 'RS')}_{working_lowest_minute.strftime('%y-%m-%d_%H-%M')}.zst")
 		output_handle = zstandard.ZstdCompressor().stream_writer(open(output_path, 'wb'))
 
 		for obj in objects.by_minute[working_lowest_minute].obj_list:
diff --git a/personal/combine/merge_minutes.py b/personal/combine/merge_minutes.py
new file mode 100644
index 0000000..cf0349a
--- /dev/null
+++ b/personal/combine/merge_minutes.py
@@ -0,0 +1,144 @@
+import sys
+import requests
+import time
+import discord_logging
+import argparse
+import os
+import re
+import zstandard
+from datetime import datetime, timedelta
+import json
+import praw
+from praw import endpoints
+import prawcore
+import logging.handlers
+
+sys.path.append('personal')
+
+log = discord_logging.init_logging(debug=False)
+
+import utils
+import classes
+from classes import IngestType
+from merge import ObjectType
+
+
+NEWLINE_ENCODED = "\n".encode('utf-8')
+reg = re.compile(r"\d\d-\d\d-\d\d_\d\d-\d\d")
+
+
+def end_of_day(input_minute):
+	return input_minute.replace(hour=0, minute=0, second=0) + timedelta(days=1)
+
+
+def build_day(day_to_process, input_folders, output_folder, object_type):
+	file_type = "comments" if object_type == ObjectType.COMMENT else "submissions"
+
+	file_minutes = {}
+	minute_iterator = day_to_process - timedelta(minutes=2)
+	end_time = end_of_day(day_to_process) + timedelta(minutes=2)
+	while minute_iterator <= end_time:
+		file_minutes[minute_iterator] = []
+		minute_iterator += timedelta(minutes=1)
+
+	for merge_folder, ingest_type in input_folders:
+		merge_date_folder = os.path.join(merge_folder, file_type, day_to_process.strftime('%y-%m-%d'))
+		if os.path.exists(merge_date_folder):
+			for file in os.listdir(merge_date_folder):
+				match = reg.search(file)
+				if not match:
+					log.info(f"File doesn't match regex: {file}")
+					continue
+				file_date = datetime.strptime(match.group(), '%y-%m-%d_%H-%M')
+				if file_date in file_minutes:
+					file_minutes[file_date].append((os.path.join(merge_date_folder, file), ingest_type))
+
+	objects = classes.ObjectDict(day_to_process, day_to_process + timedelta(days=1) - timedelta(seconds=1), object_type)
+	unmatched_field = False
+	minute_iterator = day_to_process - timedelta(minutes=2)
+	working_lowest_minute = day_to_process
+	last_minute_of_day = end_of_day(day_to_process) - timedelta(minutes=1)
+	while minute_iterator <= end_time:
+		for ingest_file, ingest_type in file_minutes[minute_iterator]:
+			for obj in utils.read_obj_zst(ingest_file):
+				if objects.add_object(obj, ingest_type):
+					unmatched_field = True
+		log.info(f"Loaded {minute_iterator.strftime('%y-%m-%d_%H-%M')} : {objects.get_counts_string_by_minute(minute_iterator, [IngestType.INGEST, IngestType.DOWNLOAD])}")
+
+		if minute_iterator >= end_time or objects.count_minutes() >= 11:
+			if minute_iterator > last_minute_of_day:
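+				# cap the write window at the day's final minute; the iterator deliberately runs 2 minutes past midnight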
+				working_highest_minute = last_minute_of_day
+			else:
+				working_highest_minute = minute_iterator - timedelta(minutes=1)
+
+			objects.delete_objects_below_minute(working_lowest_minute)
+			while working_lowest_minute <= working_highest_minute:
+				folder = os.path.join(output_folder, file_type, working_lowest_minute.strftime('%y-%m-%d'))
+				if not os.path.exists(folder):
+					os.makedirs(folder)
+				output_path = os.path.join(folder, f"{('RC' if object_type == ObjectType.COMMENT else 'RS')}_{working_lowest_minute.strftime('%y-%m-%d_%H-%M')}.zst")
+				output_handle = zstandard.ZstdCompressor().stream_writer(open(output_path, 'wb'))
+
+				for obj in objects.by_minute[working_lowest_minute].obj_list:
+					output_handle.write(json.dumps(obj, sort_keys=True).encode('utf-8'))
+					output_handle.write(NEWLINE_ENCODED)
+					objects.delete_object_id(obj['id'])
+				log.info(f"Wrote up to {working_lowest_minute.strftime('%y-%m-%d_%H-%M')}")
+				output_handle.close()
+				working_lowest_minute += timedelta(minutes=1)
+
+			objects.rebuild_minute_dict()
+
+		discord_logging.flush_discord()
+		if unmatched_field:
+			log.warning(f"Unmatched field, aborting")
+			sys.exit(1)
+
+		minute_iterator += timedelta(minutes=1)
+
+	log.info(f"Finished day {day_to_process.strftime('%y-%m-%d')}: {objects.get_counts_string()}")
+
+
+if __name__ == "__main__":
+	parser = argparse.ArgumentParser(description="Combine the minute files from two ingest folders")
+	parser.add_argument("--type", help="The object type, either comments or submissions", required=True)
+	parser.add_argument("--start_date", help="The start of the date range to process, format YY-MM-DD_HH-MM", required=True)
+	parser.add_argument("--end_date", help="The end of the date range to process, format YY-MM-DD. If not provided, the script processes to the end of the day")
+	parser.add_argument('--input', help='Input folder', required=True)
+	parser.add_argument('--output', help='Output folder', required=True)
+	parser.add_argument("--debug", help="Enable debug logging", action='store_const', const=True, default=False)
+	args = parser.parse_args()
+
+	if args.debug:
+		discord_logging.set_level(logging.DEBUG)
+
+	if args.start_date is None:
+		log.error(f"No start date provided")
+		sys.exit(2)
+	start_date = datetime.strptime(args.start_date, '%y-%m-%d_%H-%M')
+	end_date = end_of_day(start_date)
+	if args.end_date is not None:
+		end_date = datetime.strptime(args.end_date, '%y-%m-%d')
+
+	input_folders = [
+		(os.path.join(args.input, "combined"), IngestType.INGEST),
+		(os.path.join(args.input, "download"), IngestType.DOWNLOAD),
+	]
+
+	for input_folder, ingest_type in input_folders:
+		log.info(f"Input folder: {input_folder}")
+	log.info(f"Output folder: {args.output}")
+
+	object_type = None
+	if args.type == "comments":
+		object_type = ObjectType.COMMENT
+	elif args.type == "submissions":
+		object_type = ObjectType.SUBMISSION
+	else:
+		log.error(f"Invalid type: {args.type}")
+		sys.exit(2)
+
+	while start_date <= end_date:
+		build_day(start_date, input_folders, args.output, object_type)
+		start_date = end_of_day(start_date)
diff --git a/personal/move/rename_files.py b/personal/move/rename_files.py
new file mode 100644
index 0000000..7212c9d
--- /dev/null
+++ b/personal/move/rename_files.py
@@ -0,0 +1,30 @@
+import os
+import discord_logging
+import re
+from datetime import datetime
+
+log = discord_logging.init_logging()
+
+
+if __name__ == "__main__":
+	parent_folder = r"\\MYCLOUDPR4100\Public\ingest\combined\submissions"
+	files = []
+	for folder_name in os.listdir(parent_folder):
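+		# each dated subfolder (yy-mm-dd) holds the minute files for one day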
+		folder = os.path.join(parent_folder, folder_name)
+		for file in os.listdir(folder):
+			if file.endswith(".zst"):
+				files.append((folder, file))
+	log.info(f"{parent_folder}: {len(files):,}")
+
+	count_moved = 0
+	for folder, old_file in files:
+		old_path = os.path.join(folder, old_file)
+		new_file = old_file.replace("RC_", "RS_")
+		new_path = os.path.join(folder, new_file)
+
+		os.rename(old_path, new_path)
+		count_moved += 1
+		if count_moved % 100 == 0:
+			log.info(f"{count_moved:,}/{len(files):,}: {folder}")
+	log.info(f"{count_moved:,}/{len(files):,}")
diff --git a/personal/utils.py b/personal/utils.py
index 3912d55..f675c9e 100644
--- a/personal/utils.py
+++ b/personal/utils.py
@@ -44,6 +44,7 @@ def read_obj_zst_meta(file_name):
 			lines = (buffer + chunk).split("\n")
 
 			for line in lines[:-1]:
+				line = line.strip()
 				try:
 					json_object = json.loads(line)
 				except (KeyError, json.JSONDecodeError) as err: