diff --git a/Pipfile b/Pipfile
index 2997958..63d0be0 100644
--- a/Pipfile
+++ b/Pipfile
@@ -9,6 +9,8 @@ discord-logging = {editable = true, git = "https://github.com/Watchful1/DiscordL
 requests = "*"
 pymongo = {extras = ["srv"], version = "*"}
 scipy = "*"
+sortedcontainers = "*"
+praw = "*"
 
 [dev-packages]
 
diff --git a/personal/combine/build_day.py b/personal/combine/build_day.py
new file mode 100644
index 0000000..991f2a1
--- /dev/null
+++ b/personal/combine/build_day.py
@@ -0,0 +1,165 @@
+import sys
+
+import discord_logging
+import argparse
+import os
+import re
+import zstandard
+from datetime import datetime, timedelta
+from collections import defaultdict
+import json
+import praw
+from praw import endpoints
+
+log = discord_logging.init_logging(debug=False)
+
+import utils
+import classes
+from classes import IngestType
+
+NEWLINE_ENCODED = "\n".encode('utf-8')
+reg = re.compile(r"\d\d-\d\d-\d\d_\d\d-\d\d")
+
+
+def build_day(day_to_process, input_folders, output_folder, object_type, reddit, pushshift_token):
+	file_type = "comments" if object_type == utils.ObjectType.COMMENT else "submissions"
+
+	file_minutes = {}
+	minute_iterator = day_to_process - timedelta(minutes=2)
+	end_time = day_to_process + timedelta(days=1, minutes=2)
+	while minute_iterator <= end_time:
+		file_minutes[minute_iterator] = []
+		minute_iterator += timedelta(minutes=1)
+
+	for merge_folder, ingest_type in input_folders:
+		merge_date_folder = os.path.join(merge_folder, file_type, day_to_process.strftime('%y-%m-%d'))
+		if os.path.exists(merge_date_folder):
+			for file in os.listdir(merge_date_folder):
+				match = reg.search(file)
+				if not match:
+					log.info(f"File doesn't match regex: {file}")
+					continue
+				file_date = datetime.strptime(match.group(), '%y-%m-%d_%H-%M')
+				file_minutes[file_date].append((os.path.join(merge_date_folder, file), ingest_type))
+
+	output_path = os.path.join(output_folder, file_type)
+	if not os.path.exists(output_path):
+		os.makedirs(output_path)
+	output_path = os.path.join(output_path, f"{('RC' if file_type == 'comments' else 'RS')}_{day_to_process.strftime('%y-%m-%d')}.zst")
+	output_handle = zstandard.ZstdCompressor().stream_writer(open(output_path, 'wb'))
+
+	objects = classes.ObjectDict(day_to_process, day_to_process + timedelta(days=1) - timedelta(seconds=1), object_type)
+	unmatched_field = False
+	minute_iterator = day_to_process - timedelta(minutes=2)
+	working_lowest_minute = day_to_process
+	last_minute_of_day = day_to_process + timedelta(days=1) - timedelta(minutes=1)
+	end_time = day_to_process + timedelta(days=1, minutes=2)
+	while minute_iterator <= end_time:
+		for ingest_file, ingest_type in file_minutes[minute_iterator]:
+			for obj in utils.read_obj_zst(ingest_file):
+				if objects.add_object(obj, ingest_type):
+					unmatched_field = True
+		log.info(f"Loaded {minute_iterator.strftime('%y-%m-%d_%H-%M')} : {objects.get_counts_string_by_minute(minute_iterator, [IngestType.INGEST, IngestType.RESCAN, IngestType.DOWNLOAD])}")
+
+		if minute_iterator >= end_time or objects.count_minutes() >= 11:
+			if minute_iterator > last_minute_of_day:
+				working_highest_minute = last_minute_of_day
+			else:
+				working_highest_minute = minute_iterator - timedelta(minutes=1)
+			missing_ids = objects.get_missing_ids_by_minutes(working_lowest_minute, working_highest_minute)
+			log.debug(
+				f"Backfilling from: {working_lowest_minute.strftime('%y-%m-%d_%H-%M')} to "
+				f"{working_highest_minute.strftime('%y-%m-%d_%H-%M')} with {len(missing_ids)} ids")
+
+			for chunk in utils.chunk_list(missing_ids, 50):
+				pushshift_objects = utils.query_pushshift(chunk, pushshift_token, object_type)
+				for pushshift_object in pushshift_objects:
+					if objects.add_object(pushshift_object, IngestType.PUSHSHIFT):
+						unmatched_field = True
+
+			id_prefix = 't1_' if file_type == 'comments' else 't3_'
+			for chunk in utils.chunk_list(missing_ids, 100):
+				id_string = f"{id_prefix}{(f',{id_prefix}'.join(chunk))}"
+				reddit_objects = reddit.request(method="GET", path=endpoints.API_PATH["info"], params={"id": id_string})
+				for reddit_object in reddit_objects['data']['children']:
+					if objects.add_object(reddit_object['data'], IngestType.BACKFILL):
+						unmatched_field = True
+
+			objects.delete_objects_below_minute(working_lowest_minute)
+			while working_lowest_minute <= working_highest_minute:
+				for obj in objects.by_minute[working_lowest_minute].obj_list:
+					output_handle.write(json.dumps(obj, sort_keys=True).encode('utf-8'))
+					output_handle.write(NEWLINE_ENCODED)
+					objects.delete_object_id(obj['id'])
+				log.info(
+					f"Wrote up to {working_lowest_minute.strftime('%y-%m-%d_%H-%M')} : "
+					f"{objects.get_counts_string_by_minute(working_lowest_minute, [IngestType.PUSHSHIFT, IngestType.BACKFILL])}")
+				working_lowest_minute += timedelta(minutes=1)
+
+			objects.rebuild_minute_dict()
+
+		if unmatched_field:
+			log.info("Unmatched field, aborting")
+			sys.exit(1)
+
+		minute_iterator += timedelta(minutes=1)
+
+	output_handle.close()
+	log.info(f"Finished day {day_to_process.strftime('%y-%m-%d')}: {objects.get_counts_string()}")
+
+
+if __name__ == "__main__":
+	parser = argparse.ArgumentParser(description="Combine the ingest and rescan files, clean and do pushshift lookups as needed")
+	parser.add_argument("--type", help="The object type, either comments or submissions", required=True)
+	parser.add_argument("--start_date", help="The start of the date range to process, format YY-MM-DD", required=True)
+	parser.add_argument("--end_date", help="The end of the date range to process, format YY-MM-DD. If not provided, the script processes only one day")
+	parser.add_argument('--input', help='Input folder', required=True)
+	parser.add_argument('--output', help='Output folder', required=True)
+	args = parser.parse_args()
+
+	input_folders = [
+		(os.path.join(args.input, "ingest"), IngestType.INGEST),
+		(os.path.join(args.input, "rescan"), IngestType.RESCAN),
+		(os.path.join(args.input, "download"), IngestType.DOWNLOAD),
+	]
+
+	if args.start_date is None:
+		log.error("No start date provided")
+		sys.exit(2)
+	start_date = datetime.strptime(args.start_date, '%y-%m-%d')
+	end_date = start_date
+	if args.end_date is not None:
+		end_date = datetime.strptime(args.end_date, '%y-%m-%d')
+
+	for input_folder, ingest_type in input_folders:
+		log.info(f"Input folder: {input_folder}")
+	log.info(f"Output folder: {args.output}")
+
+	object_type = None
+	if args.type == "comments":
+		object_type = utils.ObjectType.COMMENT
+	elif args.type == "submissions":
+		object_type = utils.ObjectType.SUBMISSION
+	else:
+		log.error(f"Invalid type: {args.type}")
+		sys.exit(2)
+
+	config = discord_logging.get_config()
+	user_name = "Watchful12"
+	reddit = praw.Reddit(
+		username=user_name,
+		password=discord_logging.get_config_var(config, user_name, "password"),
+		client_id=discord_logging.get_config_var(config, user_name, "client_id_1"),
+		client_secret=discord_logging.get_config_var(config, user_name, "client_secret_1"),
+		user_agent="Remindme ingest script")
+
+	pushshift_token = discord_logging.get_config_var(config, user_name, "pushshift_token")  # assumes a pushshift_token config entry; never commit the credential itself
+
+	while start_date <= end_date:
+		build_day(start_date, input_folders, args.output, object_type, reddit, pushshift_token)
+		start_date = start_date + timedelta(days=1)
+
+	#log.info(f"{len(file_minutes)} : {count_ingest_minutes} : {count_rescan_minutes} : {day_highest_id - day_lowest_id:,} - {count_objects:,} = {(day_highest_id - day_lowest_id) - count_objects:,}: {utils.base36encode(day_lowest_id)}-{utils.base36encode(day_highest_id)}")
+
+# minute_iterator = datetime.strptime("23-07-10 23:30", '%y-%m-%d %H:%M')
+# working_lowest_minute = datetime.strptime("23-07-10 23:30", '%y-%m-%d %H:%M')
\ No newline at end of file
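build_day.py buffers an eleven-minute window, backfills any ids missing from the ingest/rescan/download streams (Pushshift first, then Reddit's /api/info), and streams one sorted file per day. A minimal sketch of reading that output back with the read_obj_zst helper this patch adds in utils.py; the path is hypothetical:

import utils  # the helper module added below in this patch

# hypothetical output path: <output>/<comments|submissions>/RC_<yy-mm-dd>.zst
for obj in utils.read_obj_zst(r"\\MYCLOUDPR4100\Public\output\comments\RC_23-07-10.zst"):
	print(obj["id"], obj["created_utc"])
	break  # just peek at the first object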
diff --git a/personal/combine/classes.py b/personal/combine/classes.py
new file mode 100644
index 0000000..1451a34
--- /dev/null
+++ b/personal/combine/classes.py
@@ -0,0 +1,282 @@
+from datetime import datetime
+import os
+import discord_logging
+import sys
+import zstandard
+import json
+from enum import Enum
+from sortedcontainers import SortedList
+from collections import defaultdict
+
+log = discord_logging.get_logger()
+
+import utils
+
+NEWLINE_ENCODED = "\n".encode('utf-8')
+
+
+class ApiRequest:
+	def __init__(self, ids, is_submission, ingest_name, estimated_datetime=None, missing_expected=False):
+		self.ids = ids
+		self.is_submission = is_submission
+		self.ingest_name = ingest_name
+		self.estimated_datetime = estimated_datetime
+		self.missing_expected = missing_expected
+		self.results = None
+		self.complete = False
+		self.tries = 0
+		self.prev_lengths = []
+
+	def should_retry(self):
+		if self.complete:
+			return False  # the request is complete, no need to retry
+		if len(self.prev_lengths) <= 1:
+			return True  # we've made at most one attempt and it didn't work, do retry
+		if self.prev_lengths[-1] == 0:
+			if len(self.prev_lengths) < (10 if self.missing_expected else 100):
+				return True  # the most recent result was 0 objects, retry up to 100 times (10 if we expect the ids to be missing)
+			else:
+				log.info(f"Force finished request with retries: {self}")
+				self.complete = True
+				return False
+		if self.prev_lengths[-1] == self.prev_lengths[-2]:
+			if self.missing_expected:
+				self.complete = True
+				return False  # the latest two requests were the same and we're expecting missing objects, mark as complete
+			elif len(self.prev_lengths) >= 4 and \
+					self.prev_lengths[-1] == self.prev_lengths[-3] and \
+					self.prev_lengths[-1] == self.prev_lengths[-4]:
+				log.info(f"Force finished request with retries: {self}")
+				self.complete = True
+				return False  # the latest four requests were the same, go ahead and mark as complete
+		return True  # recent requests didn't match, and weren't 0, go ahead and retry
+
+	def get_body_key(self):
+		return "self_text" if self.is_submission else "body"
+
+	def get_string_type(self):
+		return "submission" if self.is_submission else "comment"
+
+	def get_prefix(self):
+		return "t3_" if self.is_submission else "t1_"
+
+	def set_results(self, results):
+		self.prev_lengths.append(len(results))
+		self.results = []
+		current_timestamp = int(datetime.utcnow().timestamp())
+		for result in results:
+			obj = result['data']
+			if 'body_html' in obj:
+				del obj['body_html']
+			if 'selftext_html' in obj:
+				del obj['selftext_html']
+			obj['retrieved_on'] = current_timestamp
+			self.results.append(obj)
+		log.debug(f"Set result: {self}")
+
+	def id_string(self):
+		return f"{self.get_prefix()}{(f',{self.get_prefix()}'.join(self.ids))}"
+
+	def __str__(self):
+		return \
+			f"{self.ingest_name}: {self.ids[0]}-{self.ids[-1]} {self.get_string_type()}: " \
+			f"{len(self.results) if self.results else self.results} : {self.tries} : " \
+			f"{self.complete} : {','.join([str(val) for val in self.prev_lengths])}"
+
+	def __gt__(self, other):
+		if isinstance(other, ApiRequest):
+			return False
+		return True
+
+	def __lt__(self, other):
+		if isinstance(other, ApiRequest):
+			return True
+		return False
+
+	def __eq__(self, other):
+		if isinstance(other, ApiRequest):
+			return True
+		return False
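ApiRequest tracks repeated attempts for one batch of ids: set_results records each attempt's size in prev_lengths, and should_retry stops once the results stabilize or stay empty long enough. An illustrative driver loop, with an empty list standing in for a real API response:

request = ApiRequest(ids=["abc123", "abc124"], is_submission=False, ingest_name="ingest")
while request.should_retry():
	request.tries += 1
	request.set_results([])  # a real caller passes the list of {'data': {...}} results
# after enough identical empty attempts (100 here, 10 when missing_expected=True),
# should_retry() marks the request complete and the loop exits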
+
+
+class Queue:
+	def __init__(self, max_size):
+		self.list = []
+		self.max_size = max_size
+
+	def put(self, item):
+		if len(self.list) >= self.max_size:
+			self.list.pop(0)
+		self.list.append(item)
+
+	def peek(self):
+		return self.list[0] if len(self.list) > 0 else None
+
+
+class OutputHandle:
+	def __init__(self, is_submission, dump_folder):
+		self.handle = None
+		self.current_path = None
+		self.current_minute = None
+		self.is_submission = is_submission
+		self.dump_folder = dump_folder
+
+		if not os.path.exists(dump_folder):
+			os.makedirs(dump_folder)
+
+	def matched_minute(self, new_date_time):
+		return self.current_minute is not None and new_date_time.minute == self.current_minute
+
+	def get_path(self, date_folder, export_filename, increment=None):
+		folder = f"{self.dump_folder}{os.path.sep}{date_folder}"
+		if not os.path.exists(folder):
+			os.makedirs(folder)
+
+		bldr = [folder]
+		bldr.append(os.path.sep)
+		if self.is_submission:
+			bldr.append("RS_")
+		else:
+			bldr.append("RC_")
+		bldr.append(export_filename)
+		if increment is not None:
+			bldr.append("_")
+			bldr.append(str(increment))
+		bldr.append(".zst")
+
+		return ''.join(bldr)
+
+	def rollover_to_minute(self, date_time):
+		if self.handle is not None:
+			self.handle.close()
+			os.rename(self.current_path + ".tmp", self.current_path)
+		date_folder = date_time.strftime('%y-%m-%d')
+		export_filename = date_time.strftime('%y-%m-%d_%H-%M')
+		export_path = self.get_path(date_folder, export_filename)
+		if os.path.exists(export_path + ".tmp"):
+			os.rename(export_path + ".tmp", export_path)
+		i = 0
+		while os.path.exists(export_path):
+			log.info(f"Dump exists, incrementing: {export_path}")
+			i += 1
+			export_path = self.get_path(date_folder, export_filename, i)
+			if i > 100:
+				log.warning("Something went wrong, more than 100 dumps for minute, aborting")
+				sys.exit(3)
+		self.current_path = export_path
+		self.handle = zstandard.ZstdCompressor().stream_writer(open(export_path + ".tmp", 'wb'))
+		self.current_minute = date_time.minute
+
+	def write_object(self, obj):
+		self.handle.write(json.dumps(obj, sort_keys=True).encode('utf-8'))
+		self.handle.write(NEWLINE_ENCODED)
+
+	def flush(self):
+		self.handle.flush()
+
+	def close(self):
+		if self.handle is not None:
+			self.handle.close()
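OutputHandle streams each minute into a .zst.tmp file and only renames it into place on the next rollover, so a crash leaves an obvious .tmp rather than a silently truncated dump. A sketch of the intended write path, with a hypothetical two-object source and folder:

from datetime import datetime

from classes import OutputHandle

def objects_from_somewhere():  # hypothetical time-ordered source
	yield {"id": "abc123", "created_utc": 1688947200}
	yield {"id": "abc124", "created_utc": 1688947260}

handle = OutputHandle(is_submission=False, dump_folder="ingest_dump")  # hypothetical folder
for obj in objects_from_somewhere():
	obj_time = datetime.utcfromtimestamp(obj["created_utc"])
	if not handle.matched_minute(obj_time):
		handle.rollover_to_minute(obj_time)  # closes and renames the previous .tmp
	handle.write_object(obj)
handle.close()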
log.info(f"Force finished request with retries: {self}") + self.complete = True + return False + if self.prev_lengths[-1] == self.prev_lengths[-2]: + if self.missing_expected: + self.complete = True + return False # the latest two requests were the same and we're expecting missing objects, mark as complete + elif len(self.prev_lengths) >= 4 and \ + self.prev_lengths[-1] == self.prev_lengths[-3] and \ + self.prev_lengths[-1] == self.prev_lengths[-4]: + log.info(f"Force finished request with retries: {self}") + self.complete = True + return False # the latest four requests were the same, go ahead and mark as complete + return True # recent requests didn't match, and weren't 0, go ahead and retry + + def get_body_key(self): + return "self_text" if self.is_submission else "body" + + def get_string_type(self): + return "submission" if self.is_submission else "comment" + + def get_prefix(self): + return "t3_" if self.is_submission else "t1_" + + def set_results(self, results): + self.prev_lengths.append(len(results)) + self.results = [] + current_timestamp = int(datetime.utcnow().timestamp()) + for result in results: + obj = result['data'] + if 'body_html' in obj: + del obj['body_html'] + if 'selftext_html' in obj: + del obj['selftext_html'] + obj['retrieved_on'] = current_timestamp + self.results.append(obj) + log.debug(f"Set result: {self}") + + def id_string(self): + return f"{self.get_prefix()}{(f',{self.get_prefix()}'.join(self.ids))}" + + def __str__(self): + return \ + f"{self.ingest_name}: {self.ids[0]}-{self.ids[-1]} {self.get_string_type()}: " \ + f"{len(self.results) if self.results else self.results} : {self.tries} : " \ + f"{self.complete} : {','.join([str(val) for val in self.prev_lengths])}" + + def __gt__(self, other): + if isinstance(other, ApiRequest): + return False + return True + + def __lt__(self, other): + if isinstance(other, ApiRequest): + return True + return False + + def __eq__(self, other): + if isinstance(other, ApiRequest): + return True + return False + + +class Queue: + def __init__(self, max_size): + self.list = [] + self.max_size = max_size + + def put(self, item): + if len(self.list) >= self.max_size: + self.list.pop(0) + self.list.append(item) + + def peek(self): + return self.list[0] if len(self.list) > 0 else None + + +class OutputHandle: + def __init__(self, is_submission, dump_folder): + self.handle = None + self.current_path = None + self.current_minute = None + self.is_submission = is_submission + self.dump_folder = dump_folder + + if not os.path.exists(dump_folder): + os.makedirs(dump_folder) + + def matched_minute(self, new_date_time): + return self.current_minute is not None and new_date_time.minute == self.current_minute + + def get_path(self, date_folder, export_filename, increment=None): + folder = f"{self.dump_folder}{os.path.sep}{date_folder}" + if not os.path.exists(folder): + os.makedirs(folder) + + bldr = [folder] + bldr.append(os.path.sep) + if self.is_submission: + bldr.append("RS_") + else: + bldr.append("RC_") + bldr.append(export_filename) + if increment is not None: + bldr.append("_") + bldr.append(str(increment)) + bldr.append(".zst") + + return ''.join(bldr) + + def rollover_to_minute(self, date_time): + if self.handle is not None: + self.handle.close() + os.rename(self.current_path + ".tmp", self.current_path) + date_folder = date_time.strftime('%y-%m-%d') + export_filename = date_time.strftime('%y-%m-%d_%H-%M') + export_path = self.get_path(date_folder, export_filename) + if os.path.exists(export_path + ".tmp"): + 
os.rename(export_path + ".tmp", export_path) + i = 0 + while os.path.exists(export_path): + log.info(f"Dump exists, incrementing: {export_path}") + i += 1 + export_path = self.get_path(date_folder, export_filename, i) + if i > 100: + log.warning(f"Something went wrong, more than 100 dumps for minute, aborting") + sys.exit(3) + self.current_path = export_path + self.handle = zstandard.ZstdCompressor().stream_writer(open(export_path + ".tmp", 'wb')) + self.current_minute = date_time.minute + + def write_object(self, obj): + self.handle.write(json.dumps(obj, sort_keys=True).encode('utf-8')) + self.handle.write(NEWLINE_ENCODED) + + def flush(self): + self.handle.flush() + + def close(self): + if self.handle is not None: + self.handle.close() + + +class IngestType(Enum): + INGEST = 1 + RESCAN = 2 + DOWNLOAD = 3 + PUSHSHIFT = 4 + BACKFILL = 5 + + +class ObjectDict: + def __init__(self, min_datetime, max_datetime, obj_type): + self.min_datetime = min_datetime + self.max_datetime = max_datetime + self.obj_type = obj_type + + self.counts = defaultdict(lambda: defaultdict(lambda: defaultdict(int))) + self.min_id = None + self.max_id = None + + self.by_id = {} + self.by_minute = defaultdict(ObjectMinuteList) + + def contains_id(self, str_id): + return str_id in self.by_id + + def delete_object_id(self, str_id): + del self.by_id[str_id] + + def delete_objects_below_minute(self, delete_below_minute): + for minute, minute_list in self.by_minute.items(): + if minute < delete_below_minute: + for obj in minute_list.obj_list: + self.delete_object_id(obj['id']) + + def rebuild_minute_dict(self): + self.by_minute = defaultdict(ObjectMinuteList) + for obj in self.by_id.values(): + created_minute = datetime.utcfromtimestamp(obj["created_utc"]).replace(second=0, microsecond=0) + self.by_minute[created_minute].add(obj) + + def count_minutes(self): + return len(self.by_minute) + + @staticmethod + def get_counts_string_from_dict(counts_dict, ingest_types): + bldr = [] + for ingest_type in ingest_types: + if ingest_type in counts_dict: + bldr.append(f"{counts_dict[ingest_type][True]}({counts_dict[ingest_type][False]})") + else: + bldr.append("0(0)") + return "|".join(bldr) + + def get_counts_string_by_minute(self, minute, ingest_types): + return ObjectDict.get_counts_string_from_dict(self.counts[minute], ingest_types) + + def get_counts_string(self): + sum_dict = defaultdict(lambda: defaultdict(int)) + for counts_dict in self.counts.values(): + for ingest_type in IngestType: + if ingest_type in counts_dict: + sum_dict[ingest_type][True] += counts_dict[ingest_type][True] + sum_dict[ingest_type][False] += counts_dict[ingest_type][False] + return ObjectDict.get_counts_string_from_dict(sum_dict, IngestType) + + def get_missing_ids_by_minutes(self, start_minute, end_minute): + start_id = self.by_minute[start_minute].min_id + end_id = self.by_minute[end_minute].max_id + missing_ids = [] + for int_id in range(start_id, end_id + 1): + string_id = utils.base36encode(int_id) + if not self.contains_id(string_id): + missing_ids.append(string_id) + return missing_ids + + def add_object(self, obj, ingest_type): + created_utc = datetime.utcfromtimestamp(obj["created_utc"]) + created_minute = created_utc.replace(second=0, microsecond=0) + if obj['id'] in self.by_id: + existing_obj = self.by_id[obj['id']] + unmatched_field = utils.merge_fields(existing_obj, obj, self.obj_type) + self.counts[created_minute][ingest_type][False] += 1 + return unmatched_field + if created_utc < self.min_datetime or created_utc > self.max_datetime: + 
+
+
+def base36encode(integer: int) -> str:
+	chars = '0123456789abcdefghijklmnopqrstuvwxyz'
+	sign = '-' if integer < 0 else ''
+	integer = abs(integer)
+	result = ''
+	while integer > 0:
+		integer, remainder = divmod(integer, 36)
+		result = chars[remainder] + result
+	return sign + result
+
+
+def base36decode(base36: str) -> int:
+	return int(base36, 36)
+
+
+def next_string_id(string_id):
+	return base36encode(base36decode(string_id) + 1)
+
+
+def get_next_hundred_ids(start_id):
+	start_num = base36decode(start_id)
+	ids = []
+	id_num = -1
+	for id_num in range(start_num, start_num + 100):
+		ids.append(base36encode(id_num))
+	return ids, base36encode(id_num)
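Reddit ids are base36, so a contiguous id range can be enumerated as integers and rendered back to strings for backfill. Quick checks:

assert base36encode(1234567890) == "kf12oi"
assert base36decode("kf12oi") == 1234567890
assert next_string_id("zz") == "100"

ids, last_id = get_next_hundred_ids("abc123")
assert len(ids) == 100 and ids[0] == "abc123" and last_id == ids[-1]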
+
+
+class FieldAction(Enum):
+	OVERWRITE = 1
+	OVERWRITE_NOT_NONE = 2
+	OVERWRITE_IF_NONE = 3
+	DONT_OVERWRITE = 4
+	DELETE = 5
+	SPECIAL = 6
+	SPECIAL_NO_OVERWRITE = 7
+	ALLOW = 8
+	ALLOW_EMPTY = 9
+
+
+class ObjectType(Enum):
+	COMMENT = 1
+	SUBMISSION = 2
+
+
+field_actions = {
+	ObjectType.COMMENT: {
+		"all_awardings": FieldAction.OVERWRITE_NOT_NONE,
+		"approved": FieldAction.DELETE,
+		"approved_at_utc": FieldAction.SPECIAL_NO_OVERWRITE,
+		"approved_by": FieldAction.SPECIAL_NO_OVERWRITE,
+		"archived": FieldAction.OVERWRITE,
+		"associated_award": FieldAction.ALLOW_EMPTY,
+		"author": FieldAction.OVERWRITE_IF_NONE,
+		"author_cakeday": FieldAction.DONT_OVERWRITE,
+		"author_flair_background_color": FieldAction.OVERWRITE_IF_NONE,
+		"author_flair_css_class": FieldAction.OVERWRITE_IF_NONE,
+		"author_flair_richtext": FieldAction.OVERWRITE_IF_NONE,
+		"author_flair_template_id": FieldAction.OVERWRITE_IF_NONE,
+		"author_flair_text": FieldAction.OVERWRITE_IF_NONE,
+		"author_flair_text_color": FieldAction.OVERWRITE_IF_NONE,
+		"author_flair_type": FieldAction.OVERWRITE_IF_NONE,
+		"author_fullname": FieldAction.OVERWRITE_IF_NONE,
+		"author_is_blocked": FieldAction.SPECIAL_NO_OVERWRITE,
+		"author_patreon_flair": FieldAction.OVERWRITE,
+		"author_premium": FieldAction.OVERWRITE,
+		"awarders": FieldAction.OVERWRITE_IF_NONE,
+		"ban_note": FieldAction.DELETE,
+		"banned_at_utc": FieldAction.SPECIAL_NO_OVERWRITE,
+		"banned_by": FieldAction.SPECIAL_NO_OVERWRITE,
+		"body": FieldAction.SPECIAL,
+		"body_html": FieldAction.DELETE,
+		"body_sha1": FieldAction.OVERWRITE_NOT_NONE,
+		"can_gild": FieldAction.OVERWRITE,
+		"can_mod_post": FieldAction.SPECIAL_NO_OVERWRITE,
+		"collapsed": FieldAction.OVERWRITE,
+		"collapsed_because_crowd_control": FieldAction.ALLOW_EMPTY,
+		"collapsed_reason": FieldAction.OVERWRITE,
+		"collapsed_reason_code": FieldAction.OVERWRITE,
+		"comment_type": FieldAction.ALLOW_EMPTY,
+		"controversiality": FieldAction.OVERWRITE,
+		"created": FieldAction.OVERWRITE_IF_NONE,
+		"created_utc": FieldAction.ALLOW,
+		"distinguished": FieldAction.OVERWRITE,
+		"downs": FieldAction.OVERWRITE_IF_NONE,
+		"editable": FieldAction.OVERWRITE,
+		"edited": FieldAction.OVERWRITE_NOT_NONE,
+		"gilded": FieldAction.OVERWRITE_NOT_NONE,
+		"gildings": FieldAction.OVERWRITE_NOT_NONE,
+		"id": FieldAction.ALLOW,
+		"ignore_reports": FieldAction.DELETE,
+		"is_submitter": FieldAction.DONT_OVERWRITE,
+		"likes": FieldAction.ALLOW_EMPTY,
+		"link_id": FieldAction.ALLOW,
+		"locked": FieldAction.OVERWRITE,
+		"media_metadata": FieldAction.OVERWRITE,
+		"mod_note": FieldAction.ALLOW_EMPTY,
+		"mod_reason_by": FieldAction.ALLOW_EMPTY,
+		"mod_reason_title": FieldAction.ALLOW_EMPTY,
+		"mod_reports": FieldAction.SPECIAL_NO_OVERWRITE,
+		"mod_reports_dismissed": FieldAction.SPECIAL_NO_OVERWRITE,
+		"name": FieldAction.OVERWRITE_IF_NONE,
+		"nest_level": FieldAction.OVERWRITE_NOT_NONE,
+		"no_follow": FieldAction.OVERWRITE,
+		"num_reports": FieldAction.SPECIAL_NO_OVERWRITE,
+		"parent_id": FieldAction.OVERWRITE_IF_NONE,
+		"permalink": FieldAction.DONT_OVERWRITE,
+		"removal_reason": FieldAction.SPECIAL,
+		"removed": FieldAction.DELETE,
+		"replies": FieldAction.OVERWRITE_IF_NONE,
+		"report_reasons": FieldAction.SPECIAL_NO_OVERWRITE,
+		"retrieved_on": FieldAction.SPECIAL,
+		"retrieved_utc": FieldAction.SPECIAL,
+		"saved": FieldAction.SPECIAL_NO_OVERWRITE,
+		"score": FieldAction.OVERWRITE_NOT_NONE,
+		"score_hidden": FieldAction.OVERWRITE,
+		"send_replies": FieldAction.OVERWRITE,
+		"spam": FieldAction.DELETE,
+		"stickied": FieldAction.OVERWRITE,
+		"subreddit": FieldAction.OVERWRITE_NOT_NONE,
+		"subreddit_id": FieldAction.ALLOW,
+		"subreddit_name_prefixed": FieldAction.OVERWRITE_NOT_NONE,
+		"subreddit_type": FieldAction.DONT_OVERWRITE,
+		"top_awarded_type": FieldAction.ALLOW_EMPTY,
+		"total_awards_received": FieldAction.OVERWRITE_NOT_NONE,
+		"treatment_tags": FieldAction.OVERWRITE_NOT_NONE,
+		"unrepliable_reason": FieldAction.ALLOW_EMPTY,
+		"ups": FieldAction.OVERWRITE_NOT_NONE,
+		"user_reports": FieldAction.SPECIAL_NO_OVERWRITE,
+		"user_reports_dismissed": FieldAction.SPECIAL_NO_OVERWRITE,
+		"updated_on": FieldAction.SPECIAL,
+		"updated_utc": FieldAction.SPECIAL,
+		"utc_datetime_str": FieldAction.DELETE,
+	},
+	ObjectType.SUBMISSION: {
+		"ad_promoted_user_posts": FieldAction.ALLOW_EMPTY,
+		"ad_supplementary_text_md": FieldAction.ALLOW,
+		"adserver_click_url": FieldAction.ALLOW_EMPTY,
+		"adserver_imp_pixel": FieldAction.ALLOW_EMPTY,
+		"all_awardings": FieldAction.OVERWRITE_NOT_NONE,
+		"allow_live_comments": FieldAction.OVERWRITE,
+		"app_store_data": FieldAction.ALLOW_EMPTY,
+		"approved": FieldAction.DELETE,
+		"approved_at_utc": FieldAction.SPECIAL_NO_OVERWRITE,
+		"approved_by": FieldAction.SPECIAL_NO_OVERWRITE,
+		"archived": FieldAction.ALLOW_EMPTY,
+		"author": FieldAction.OVERWRITE_IF_NONE,
+		"author_cakeday": FieldAction.DONT_OVERWRITE,
+		"author_flair_background_color": FieldAction.OVERWRITE_NOT_NONE,
+		"author_flair_css_class": FieldAction.OVERWRITE_NOT_NONE,
+		"author_flair_richtext": FieldAction.OVERWRITE_NOT_NONE,
+		"author_flair_template_id": FieldAction.OVERWRITE_NOT_NONE,
+		"author_flair_text": FieldAction.OVERWRITE_NOT_NONE,
+		"author_flair_text_color": FieldAction.OVERWRITE_NOT_NONE,
+		"author_flair_type": FieldAction.OVERWRITE_NOT_NONE,
+		"author_fullname": FieldAction.OVERWRITE_NOT_NONE,
+		"author_id": FieldAction.OVERWRITE_NOT_NONE,
+		"author_is_blocked": FieldAction.SPECIAL_NO_OVERWRITE,
+		"author_patreon_flair": FieldAction.OVERWRITE,
+		"author_premium": FieldAction.OVERWRITE,
+		"awarders": FieldAction.ALLOW_EMPTY,
+		"ban_note": FieldAction.DELETE,
+		"banned_at_utc": FieldAction.SPECIAL_NO_OVERWRITE,
+		"banned_by": FieldAction.SPECIAL_NO_OVERWRITE,
+		"call_to_action": FieldAction.OVERWRITE,
+		"campaign_id": FieldAction.ALLOW_EMPTY,
+		"can_gild": FieldAction.OVERWRITE,
+		"can_mod_post": FieldAction.SPECIAL_NO_OVERWRITE,
+		"category": FieldAction.OVERWRITE_NOT_NONE,
+		"clicked": FieldAction.SPECIAL_NO_OVERWRITE,
+		"collections": FieldAction.OVERWRITE_NOT_NONE,
+		"content_categories": FieldAction.ALLOW,
+		"contest_mode": FieldAction.OVERWRITE,
+		"created": FieldAction.OVERWRITE_IF_NONE,
+		"created_utc": FieldAction.ALLOW,
+		"crosspost_parent": FieldAction.ALLOW,
+		"crosspost_parent_list": FieldAction.OVERWRITE_NOT_NONE,
+		"discussion_type": FieldAction.ALLOW,
+		"distinguished": FieldAction.OVERWRITE,
+		"domain": FieldAction.OVERWRITE_NOT_NONE,
+		"domain_override": FieldAction.OVERWRITE_NOT_NONE,
+		"downs": FieldAction.SPECIAL_NO_OVERWRITE,
+		"edited": FieldAction.OVERWRITE,
+		"embed_type": FieldAction.ALLOW_EMPTY,
+		"embed_url": FieldAction.ALLOW_EMPTY,
+		"event_end": FieldAction.OVERWRITE_NOT_NONE,
+		"event_is_live": FieldAction.OVERWRITE_NOT_NONE,
+		"event_start": FieldAction.OVERWRITE_NOT_NONE,
+		"events": FieldAction.ALLOW_EMPTY,
+		"eventsOnRender": FieldAction.ALLOW_EMPTY,
+		"gallery_data": FieldAction.OVERWRITE_NOT_NONE,
+		"gilded": FieldAction.OVERWRITE_NOT_NONE,
+		"gildings": FieldAction.OVERWRITE_NOT_NONE,
+		"hidden": FieldAction.ALLOW_EMPTY,
+		"hide_score": FieldAction.OVERWRITE,
+		"href_url": FieldAction.DONT_OVERWRITE,
+		"id": FieldAction.ALLOW,
+		"ignore_reports": FieldAction.DELETE,
+		"impression_id": FieldAction.ALLOW_EMPTY,
+		"impression_id_str": FieldAction.ALLOW_EMPTY,
+		"is_blank": FieldAction.ALLOW_EMPTY,
+		"is_created_from_ads_ui": FieldAction.ALLOW,
+		"is_crosspostable": FieldAction.OVERWRITE,
+		"is_gallery": FieldAction.ALLOW,
+		"is_meta": FieldAction.OVERWRITE,
+		"is_original_content": FieldAction.OVERWRITE,
+		"is_reddit_media_domain": FieldAction.OVERWRITE,
+		"is_robot_indexable": FieldAction.OVERWRITE,
+		"is_self": FieldAction.DONT_OVERWRITE,
+		"is_survey_ad": FieldAction.ALLOW_EMPTY,
+		"is_video": FieldAction.ALLOW,
+		"likes": FieldAction.ALLOW_EMPTY,
+		"link_flair_background_color": FieldAction.OVERWRITE_NOT_NONE,
+		"link_flair_css_class": FieldAction.OVERWRITE_NOT_NONE,
+		"link_flair_richtext": FieldAction.OVERWRITE_NOT_NONE,
+		"link_flair_template_id": FieldAction.OVERWRITE_NOT_NONE,
+		"link_flair_text": FieldAction.OVERWRITE_NOT_NONE,
+		"link_flair_text_color": FieldAction.OVERWRITE_NOT_NONE,
+		"link_flair_type": FieldAction.OVERWRITE_NOT_NONE,
+		"locked": FieldAction.OVERWRITE,
+		"media": FieldAction.OVERWRITE_NOT_NONE,
+		"media_embed": FieldAction.OVERWRITE_NOT_NONE,
+		"media_metadata": FieldAction.OVERWRITE_NOT_NONE,
+		"media_only": FieldAction.OVERWRITE,
+		"mobile_ad_url": FieldAction.ALLOW,
+		"mod_note": FieldAction.ALLOW_EMPTY,
+		"mod_reason_by": FieldAction.ALLOW_EMPTY,
+		"mod_reason_title": FieldAction.ALLOW_EMPTY,
+		"mod_reports": FieldAction.SPECIAL_NO_OVERWRITE,
+		"name": FieldAction.OVERWRITE_IF_NONE,
+		"no_follow": FieldAction.OVERWRITE,
+		"num_comments": FieldAction.OVERWRITE_NOT_NONE,
+		"num_crossposts": FieldAction.OVERWRITE,
+		"num_reports": FieldAction.SPECIAL_NO_OVERWRITE,
+		"original_link": FieldAction.ALLOW_EMPTY,
+		"outbound_link": FieldAction.ALLOW_EMPTY,
+		"over_18": FieldAction.OVERWRITE,
+		"parent_whitelist_status": FieldAction.OVERWRITE,
+		"permalink": FieldAction.DONT_OVERWRITE,
+		"pinned": FieldAction.ALLOW_EMPTY,
+		"poll_data": FieldAction.OVERWRITE_NOT_NONE,
+		"post_hint": FieldAction.OVERWRITE,
+		"preview": FieldAction.OVERWRITE_NOT_NONE,
+		"priority_id": FieldAction.ALLOW_EMPTY,
+		"product_ids": FieldAction.ALLOW_EMPTY,
+		"promo_layout": FieldAction.OVERWRITE,
+		"promoted": FieldAction.ALLOW_EMPTY,
+		"promoted_by": FieldAction.ALLOW_EMPTY,
+		"promoted_display_name": FieldAction.ALLOW_EMPTY,
+		"promoted_url": FieldAction.ALLOW_EMPTY,
+		"pwls": FieldAction.OVERWRITE,
+		"quarantine": FieldAction.DONT_OVERWRITE,
+		"removal_reason": FieldAction.SPECIAL,
+		"removed": FieldAction.DELETE,
+		"removed_by": FieldAction.SPECIAL_NO_OVERWRITE,
+		"removed_by_category": FieldAction.OVERWRITE,
+		"report_reasons": FieldAction.SPECIAL_NO_OVERWRITE,
+		"retrieved_on": FieldAction.SPECIAL,
+		"retrieved_utc": FieldAction.SPECIAL,
+		"saved": FieldAction.SPECIAL_NO_OVERWRITE,
+		"score": FieldAction.OVERWRITE_NOT_NONE,
+		"secure_media": FieldAction.OVERWRITE_NOT_NONE,
+		"secure_media_embed": FieldAction.OVERWRITE_NOT_NONE,
+		"selftext": FieldAction.SPECIAL,
+		"selftext_html": FieldAction.DELETE,
+		"send_replies": FieldAction.OVERWRITE,
+		"show_media": FieldAction.ALLOW,
+		"sk_ad_network_data": FieldAction.ALLOW_EMPTY,
+		"spam": FieldAction.DELETE,
+		"spoiler": FieldAction.OVERWRITE,
+		"stickied": FieldAction.OVERWRITE,
+		"subcaption": FieldAction.OVERWRITE,
+		"subreddit": FieldAction.ALLOW,
+		"subreddit_id": FieldAction.ALLOW,
+		"subreddit_name_prefixed": FieldAction.ALLOW,
+		"subreddit_subscribers": FieldAction.OVERWRITE_IF_NONE,
+		"subreddit_type": FieldAction.DONT_OVERWRITE,
+		"suggested_sort": FieldAction.OVERWRITE,
+		"third_party_trackers": FieldAction.ALLOW_EMPTY,
+		"third_party_tracking": FieldAction.ALLOW_EMPTY,
+		"third_party_tracking_2": FieldAction.ALLOW_EMPTY,
+		"thumbnail": FieldAction.OVERWRITE_NOT_NONE,
+		"thumbnail_height": FieldAction.OVERWRITE_NOT_NONE,
+		"thumbnail_width": FieldAction.OVERWRITE_NOT_NONE,
+		"title": FieldAction.DONT_OVERWRITE,
+		"top_awarded_type": FieldAction.OVERWRITE,
+		"total_awards_received": FieldAction.OVERWRITE_NOT_NONE,
+		"treatment_tags": FieldAction.OVERWRITE_NOT_NONE,
+		"updated_on": FieldAction.SPECIAL,
+		"updated_utc": FieldAction.SPECIAL,
+		"ups": FieldAction.OVERWRITE_NOT_NONE,
+		"upvote_ratio": FieldAction.OVERWRITE,
+		"url": FieldAction.OVERWRITE_NOT_NONE,
+		"url_overridden_by_dest": FieldAction.OVERWRITE_NOT_NONE,
+		"user_reports": FieldAction.SPECIAL_NO_OVERWRITE,
+		"user_reports_dismissed": FieldAction.SPECIAL_NO_OVERWRITE,
+		"utc_datetime_str": FieldAction.DELETE,
+		"view_count": FieldAction.ALLOW_EMPTY,
+		"visited": FieldAction.SPECIAL_NO_OVERWRITE,
+		"whitelist_status": FieldAction.OVERWRITE,
+		"wls": FieldAction.OVERWRITE,
+	},
+}
"ad_supplementary_text_md": FieldAction.ALLOW, + "adserver_click_url": FieldAction.ALLOW_EMPTY, + "adserver_imp_pixel": FieldAction.ALLOW_EMPTY, + "all_awardings": FieldAction.OVERWRITE_NOT_NONE, + "allow_live_comments": FieldAction.OVERWRITE, + "app_store_data": FieldAction.ALLOW_EMPTY, + "approved": FieldAction.DELETE, + "approved_at_utc": FieldAction.SPECIAL_NO_OVERWRITE, + "approved_by": FieldAction.SPECIAL_NO_OVERWRITE, + "archived": FieldAction.ALLOW_EMPTY, + "author": FieldAction.OVERWRITE_IF_NONE, + "author_cakeday": FieldAction.DONT_OVERWRITE, + "author_flair_background_color": FieldAction.OVERWRITE_NOT_NONE, + "author_flair_css_class": FieldAction.OVERWRITE_NOT_NONE, + "author_flair_richtext": FieldAction.OVERWRITE_NOT_NONE, + "author_flair_template_id": FieldAction.OVERWRITE_NOT_NONE, + "author_flair_text": FieldAction.OVERWRITE_NOT_NONE, + "author_flair_text_color": FieldAction.OVERWRITE_NOT_NONE, + "author_flair_type": FieldAction.OVERWRITE_NOT_NONE, + "author_fullname": FieldAction.OVERWRITE_NOT_NONE, + "author_id": FieldAction.OVERWRITE_NOT_NONE, + "author_is_blocked": FieldAction.SPECIAL_NO_OVERWRITE, + "author_patreon_flair": FieldAction.OVERWRITE, + "author_premium": FieldAction.OVERWRITE, + "awarders": FieldAction.ALLOW_EMPTY, + "ban_note": FieldAction.DELETE, + "banned_at_utc": FieldAction.SPECIAL_NO_OVERWRITE, + "banned_by": FieldAction.SPECIAL_NO_OVERWRITE, + "call_to_action": FieldAction.OVERWRITE, + "campaign_id": FieldAction.ALLOW_EMPTY, + "can_gild": FieldAction.OVERWRITE, + "can_mod_post": FieldAction.SPECIAL_NO_OVERWRITE, + "category": FieldAction.OVERWRITE_NOT_NONE, + "clicked": FieldAction.SPECIAL_NO_OVERWRITE, + "collections": FieldAction.OVERWRITE_NOT_NONE, + "content_categories": FieldAction.ALLOW, + "contest_mode": FieldAction.OVERWRITE, + "created": FieldAction.OVERWRITE_IF_NONE, + "created_utc": FieldAction.ALLOW, + "crosspost_parent": FieldAction.ALLOW, + "crosspost_parent_list": FieldAction.OVERWRITE_NOT_NONE, + "discussion_type": FieldAction.ALLOW, + "distinguished": FieldAction.OVERWRITE, + "domain": FieldAction.OVERWRITE_NOT_NONE, + "domain_override": FieldAction.OVERWRITE_NOT_NONE, + "downs": FieldAction.SPECIAL_NO_OVERWRITE, + "edited": FieldAction.OVERWRITE, + "embed_type": FieldAction.ALLOW_EMPTY, + "embed_url": FieldAction.ALLOW_EMPTY, + "event_end": FieldAction.OVERWRITE_NOT_NONE, + "event_is_live": FieldAction.OVERWRITE_NOT_NONE, + "event_start": FieldAction.OVERWRITE_NOT_NONE, + "events": FieldAction.ALLOW_EMPTY, + "eventsOnRender": FieldAction.ALLOW_EMPTY, + "gallery_data": FieldAction.OVERWRITE_NOT_NONE, + "gilded": FieldAction.OVERWRITE_NOT_NONE, + "gildings": FieldAction.OVERWRITE_NOT_NONE, + "hidden": FieldAction.ALLOW_EMPTY, + "hide_score": FieldAction.OVERWRITE, + "href_url": FieldAction.DONT_OVERWRITE, + "id": FieldAction.ALLOW, + "ignore_reports": FieldAction.DELETE, + "impression_id": FieldAction.ALLOW_EMPTY, + "impression_id_str": FieldAction.ALLOW_EMPTY, + "is_blank": FieldAction.ALLOW_EMPTY, + "is_created_from_ads_ui": FieldAction.ALLOW, + "is_crosspostable": FieldAction.OVERWRITE, + "is_gallery": FieldAction.ALLOW, + "is_meta": FieldAction.OVERWRITE, + "is_original_content": FieldAction.OVERWRITE, + "is_reddit_media_domain": FieldAction.OVERWRITE, + "is_robot_indexable": FieldAction.OVERWRITE, + "is_self": FieldAction.DONT_OVERWRITE, + "is_survey_ad": FieldAction.ALLOW_EMPTY, + "is_video": FieldAction.ALLOW, + "likes": FieldAction.ALLOW_EMPTY, + "link_flair_background_color": FieldAction.OVERWRITE_NOT_NONE, + 
"link_flair_css_class": FieldAction.OVERWRITE_NOT_NONE, + "link_flair_richtext": FieldAction.OVERWRITE_NOT_NONE, + "link_flair_template_id": FieldAction.OVERWRITE_NOT_NONE, + "link_flair_text": FieldAction.OVERWRITE_NOT_NONE, + "link_flair_text_color": FieldAction.OVERWRITE_NOT_NONE, + "link_flair_type": FieldAction.OVERWRITE_NOT_NONE, + "locked": FieldAction.OVERWRITE, + "media": FieldAction.OVERWRITE_NOT_NONE, + "media_embed": FieldAction.OVERWRITE_NOT_NONE, + "media_metadata": FieldAction.OVERWRITE_NOT_NONE, + "media_only": FieldAction.OVERWRITE, + "mobile_ad_url": FieldAction.ALLOW, + "mod_note": FieldAction.ALLOW_EMPTY, + "mod_reason_by": FieldAction.ALLOW_EMPTY, + "mod_reason_title": FieldAction.ALLOW_EMPTY, + "mod_reports": FieldAction.SPECIAL_NO_OVERWRITE, + "name": FieldAction.OVERWRITE_IF_NONE, + "no_follow": FieldAction.OVERWRITE, + "num_comments": FieldAction.OVERWRITE_NOT_NONE, + "num_crossposts": FieldAction.OVERWRITE, + "num_reports": FieldAction.SPECIAL_NO_OVERWRITE, + "original_link": FieldAction.ALLOW_EMPTY, + "outbound_link": FieldAction.ALLOW_EMPTY, + "over_18": FieldAction.OVERWRITE, + "parent_whitelist_status": FieldAction.OVERWRITE, + "permalink": FieldAction.DONT_OVERWRITE, + "pinned": FieldAction.ALLOW_EMPTY, + "poll_data": FieldAction.OVERWRITE_NOT_NONE, + "post_hint": FieldAction.OVERWRITE, + "preview": FieldAction.OVERWRITE_NOT_NONE, + "priority_id": FieldAction.ALLOW_EMPTY, + "product_ids": FieldAction.ALLOW_EMPTY, + "promo_layout": FieldAction.OVERWRITE, + "promoted": FieldAction.ALLOW_EMPTY, + "promoted_by": FieldAction.ALLOW_EMPTY, + "promoted_display_name": FieldAction.ALLOW_EMPTY, + "promoted_url": FieldAction.ALLOW_EMPTY, + "pwls": FieldAction.OVERWRITE, + "quarantine": FieldAction.DONT_OVERWRITE, + "removal_reason": FieldAction.SPECIAL, + "removed": FieldAction.DELETE, + "removed_by": FieldAction.SPECIAL_NO_OVERWRITE, + "removed_by_category": FieldAction.OVERWRITE, + "report_reasons": FieldAction.SPECIAL_NO_OVERWRITE, + "retrieved_on": FieldAction.SPECIAL, + "retrieved_utc": FieldAction.SPECIAL, + "saved": FieldAction.SPECIAL_NO_OVERWRITE, + "score": FieldAction.OVERWRITE_NOT_NONE, + "secure_media": FieldAction.OVERWRITE_NOT_NONE, + "secure_media_embed": FieldAction.OVERWRITE_NOT_NONE, + "selftext": FieldAction.SPECIAL, + "selftext_html": FieldAction.DELETE, + "send_replies": FieldAction.OVERWRITE, + "show_media": FieldAction.ALLOW, + "sk_ad_network_data": FieldAction.ALLOW_EMPTY, + "spam": FieldAction.DELETE, + "spoiler": FieldAction.OVERWRITE, + "stickied": FieldAction.OVERWRITE, + "subcaption": FieldAction.OVERWRITE, + "subreddit": FieldAction.ALLOW, + "subreddit_id": FieldAction.ALLOW, + "subreddit_name_prefixed": FieldAction.ALLOW, + "subreddit_subscribers": FieldAction.OVERWRITE_IF_NONE, + "subreddit_type": FieldAction.DONT_OVERWRITE, + "suggested_sort": FieldAction.OVERWRITE, + "third_party_trackers": FieldAction.ALLOW_EMPTY, + "third_party_tracking": FieldAction.ALLOW_EMPTY, + "third_party_tracking_2": FieldAction.ALLOW_EMPTY, + "thumbnail": FieldAction.OVERWRITE_NOT_NONE, + "thumbnail_height": FieldAction.OVERWRITE_NOT_NONE, + "thumbnail_width": FieldAction.OVERWRITE_NOT_NONE, + "title": FieldAction.DONT_OVERWRITE, + "top_awarded_type": FieldAction.OVERWRITE, + "total_awards_received": FieldAction.OVERWRITE_NOT_NONE, + "treatment_tags": FieldAction.OVERWRITE_NOT_NONE, + "updated_on": FieldAction.SPECIAL, + "updated_utc": FieldAction.SPECIAL, + "ups": FieldAction.OVERWRITE_NOT_NONE, + "upvote_ratio": FieldAction.OVERWRITE, + "url": 
+
+
+def parse_fields(new_obj, obj_type):
+	keys_to_delete = []
+	keys_to_add = []
+	unmatched_field = False
+	type_actions = field_actions[obj_type]
+	for key, new_value in new_obj.items():
+		action = type_actions.get(key)
+		if action is not None:
+			if action == FieldAction.DELETE:
+				keys_to_delete.append(key)
+			elif action == FieldAction.ALLOW_EMPTY:
+				if not is_empty(new_value):
+					log.info(f"{new_obj['id']} not empty: {key}: {new_value}")
+					unmatched_field = True
+					keys_to_delete.append(key)
+			elif action == FieldAction.SPECIAL:
+				if key in ["retrieved_on", "body", "selftext", "updated_on"]:
+					pass
+				elif key == "removal_reason" and new_value in ["legal", None]:
+					pass
+				elif key == "retrieved_utc":
+					keys_to_add.append(("retrieved_on", new_value))
+					keys_to_delete.append(key)
+				elif key == "updated_utc":
+					keys_to_add.append(("updated_on", new_value))
+					keys_to_delete.append(key)
+				else:
+					log.info(f"{new_obj['id']} special no match: {key}: {new_value}")
+					unmatched_field = True
+					keys_to_delete.append(key)
+			elif action == FieldAction.SPECIAL_NO_OVERWRITE:
+				if key in ["can_mod_post", "saved", "clicked", "visited", "author_is_blocked"]:
+					new_obj[key] = False
+				elif key in ["banned_at_utc", "banned_by", "approved_at_utc", "approved_by", "user_reports_dismissed", "mod_reports_dismissed", "removed_by"]:
+					new_obj[key] = None
+				elif key in ["num_reports", "downs"]:
+					new_obj[key] = 0
+				elif key in ["report_reasons", "user_reports", "mod_reports"]:
+					new_obj[key] = []
+				else:
+					log.info(f"{new_obj['id']} special no overwrite no match: {key}: {new_value}")
+					unmatched_field = True
+					keys_to_delete.append(key)
+		else:
+			log.info(f"{new_obj['id']} no action: {key}: {new_value}")
+			unmatched_field = True
+
+	for key in keys_to_delete:
+		del new_obj[key]
+
+	for key, value in keys_to_add:
+		new_obj[key] = value
+
+	if 'retrieved_on' not in new_obj:
+		new_obj['retrieved_on'] = int(datetime.utcnow().timestamp())
+
+	return unmatched_field
+
+
+def merge_lowest_highest_id(str_id, lowest_id, highest_id):
+	int_id = base36decode(str_id)
+	if lowest_id is None or int_id < lowest_id:
+		lowest_id = int_id
+	if highest_id is None or int_id > highest_id:
+		highest_id = int_id
+	return lowest_id, highest_id
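parse_fields above normalizes a first sighting in place: DELETE fields are dropped, retrieved_utc/updated_utc are renamed to their _on forms, and SPECIAL_NO_OVERWRITE fields are reset to viewer-neutral defaults so account state doesn't leak into the dumps. A hypothetical object:

obj = {
	"id": "abc123", "created_utc": 1688947200, "body": "some text",
	"body_html": "<div>some text</div>", "retrieved_utc": 1688947260, "saved": True}
parse_fields(obj, ObjectType.COMMENT)
assert "body_html" not in obj             # DELETE
assert obj["retrieved_on"] == 1688947260  # retrieved_utc renamed
assert obj["saved"] is False              # SPECIAL_NO_OVERWRITE reset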
+
+
+async def record_rate_limits(reddit, client):
+	reddit_user = await reddit.user.me()
+	remaining = int(reddit._core._rate_limiter.remaining)
+	used = int(reddit._core._rate_limiter.used)
+	counters.rate_requests_remaining.labels(username=reddit_user.name, client=client).set(remaining)
+	counters.rate_requests_used.labels(username=reddit_user.name, client=client).set(used)
+
+	reset_timestamp = reddit._core._rate_limiter.reset_timestamp
+	seconds_to_reset = (datetime.utcfromtimestamp(reset_timestamp) - datetime.utcnow()).total_seconds()
+	counters.rate_seconds_remaining.labels(username=reddit_user.name, client=client).set(int(seconds_to_reset))
+	window_size = int(reddit._core._rate_limiter.window_size) if reddit._core._rate_limiter.window_size is not None else reddit._core._rate_limiter.window_size
+	time_to_next_request = max((datetime.utcnow() - datetime.utcfromtimestamp(reddit._core._rate_limiter.next_request_timestamp)).total_seconds(), 0)
+	#log.info(f"Rate: u/{reddit_user.name}: {window_size} : {remaining} : {used} : {seconds_to_reset:.2f} : {time_to_next_request:.3f} ")
+
+	return
+
+
+def chunk_list(items, chunk_size):
+	for i in range(0, len(items), chunk_size):
+		yield items[i:i + chunk_size]
+
+
+def query_pushshift(ids, bearer, object_type):
+	object_name = "comment" if object_type == ObjectType.COMMENT else "submission"
+	url = f"https://api.pushshift.io/reddit/{object_name}/search?limit=1000&ids={','.join(ids)}"
+	log.debug(f"pushshift query: {url}")
+	response = None
+	for i in range(4):
+		response = requests.get(url, headers={
+			'User-Agent': "In script by /u/Watchful1",
+			'Authorization': f"Bearer {bearer}"})
+		if response.status_code == 200:
+			break
+		if response.status_code == 403:
+			log.warning("Pushshift unauthorized, aborting")
+			sys.exit(2)
+		time.sleep(2)
+	if response.status_code != 200:
+		log.warning(f"4 requests failed with status code {response.status_code}")
+		sys.exit(4)
+	return response.json()['data']
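chunk_list is what lets the backfill in build_day.py use different batch sizes per API, 50 ids per Pushshift call and 100 per /api/info call. For example:

ids = [base36encode(i) for i in range(1000, 1120)]
print([len(chunk) for chunk in chunk_list(ids, 50)])  # [50, 50, 20]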
diff --git a/personal/transform/copy_listed_files.py b/personal/move/copy_listed_files.py
similarity index 100%
rename from personal/transform/copy_listed_files.py
rename to personal/move/copy_listed_files.py
diff --git a/personal/move/move_files.py b/personal/move/move_files.py
new file mode 100644
index 0000000..4b38856
--- /dev/null
+++ b/personal/move/move_files.py
@@ -0,0 +1,43 @@
+import os
+import discord_logging
+import re
+from datetime import datetime
+
+log = discord_logging.init_logging()
+
+
+if __name__ == "__main__":
+	parent_folder = r"\\MYCLOUDPR4100\Public\ingest"
+	folders = [r"ingest\comments", r"ingest\submissions", r"rescan\comments", r"rescan\submissions"]
+	reg = re.compile(r"\d\d-\d\d-\d\d_\d\d-\d\d")
+	for folder in folders:
+		files = []
+		created_date_folders = set()
+		folder_path = os.path.join(parent_folder, folder)
+		for file in os.listdir(folder_path):
+			file_path = os.path.join(folder_path, file)
+			if file.endswith(".zst"):
+				files.append(file)
+		log.info(f"{folder}: {len(files):,}")
+
+		count_moved = 0
+		for file in files:
+			match = reg.search(file)
+			if not match:
+				log.info(f"File doesn't match regex: {file}")
+				continue
+			file_date = datetime.strptime(match.group(), '%y-%m-%d_%H-%M')
+			date_folder_name = file_date.strftime('%y-%m-%d')
+			date_folder_path = os.path.join(folder_path, date_folder_name)
+			if date_folder_name not in created_date_folders:
+				log.info(f"Creating folder: {date_folder_path}")
+				if not os.path.exists(date_folder_path):
+					os.makedirs(date_folder_path)
+				created_date_folders.add(date_folder_name)
+			old_file_path = os.path.join(folder_path, file)
+			new_file_path = os.path.join(date_folder_path, file)
+			os.rename(old_file_path, new_file_path)
+			count_moved += 1
+			if count_moved % 100 == 0:
+				log.info(f"{count_moved:,}/{len(files):,}: {folder}")
+		log.info(f"{count_moved:,}/{len(files):,}: {folder}")
diff --git a/personal/opt_in_quarantined.py b/personal/opt_in_quarantined.py
new file mode 100644
index 0000000..384d01d
--- /dev/null
+++ b/personal/opt_in_quarantined.py
@@ -0,0 +1,26 @@
+import asyncpraw
+import requests
+import asyncio
+
+
+async def opt_in(reddit, subreddit_name):
+	subreddit = await reddit.subreddit(subreddit_name)
+	await subreddit.quaran.opt_in()
+
+
+async def main(subreddits):
+	reddit = asyncpraw.Reddit("Watchful12")
+	for subreddit_name in subreddits:
+		print(f"r/{subreddit_name}")
+		try:
+			subreddit = await reddit.subreddit(subreddit_name)
+			await subreddit.quaran.opt_in()
+		except Exception as err:
+			print(f"Error opting into r/{subreddit_name} : {err}")
+	await reddit.close()
+
+
+if __name__ == "__main__":
+	subreddits = requests.get("https://pastebin.com/raw/WKi36t1w").text.split("\r\n")
+	asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
+	asyncio.run(main(subreddits))
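One portability note: asyncio.WindowsSelectorEventLoopPolicy only exists on Windows, so opt_in_quarantined.py raises AttributeError elsewhere. A guarded variant of its entry point (sketch):

import asyncio
import sys

if sys.platform == "win32":
	asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
asyncio.run(main(subreddits))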
file_type = "comments" if "RC" in input_file else "submissions" + + previous_minute, output_handle, created_utc = None, None, None + count_objects, count_minute = 0, 0 + for obj in utils.read_obj_zst(input_file): + created_utc = datetime.utcfromtimestamp(obj["created_utc"]) + current_minute = created_utc.replace(second=0) + + if previous_minute is None or current_minute > previous_minute: + log.info(f"{created_utc.strftime('%y-%m-%d_%H-%M')}: {count_objects:,} : {count_minute: ,}") + previous_minute = current_minute + count_minute = 0 + if output_handle is not None: + output_handle.close() + + output_path = os.path.join(output_folder, file_type, created_utc.strftime('%y-%m-%d')) + if not os.path.exists(output_path): + os.makedirs(output_path) + output_path = os.path.join(output_path, f"{('RC' if file_type == 'comments' else 'RS')}_{created_utc.strftime('%y-%m-%d_%H-%M')}.zst") + output_handle = zstandard.ZstdCompressor().stream_writer(open(output_path, 'wb')) + + count_objects += 1 + count_minute += 1 + output_handle.write(json.dumps(obj, sort_keys=True).encode('utf-8')) + output_handle.write(NEWLINE_ENCODED) + + log.info(f"{created_utc.strftime('%y-%m-%d_%H-%M')}: {count_objects:,} : {count_minute: ,}") + if output_handle is not None: + output_handle.close()