from datetime import datetime
import os
import discord_logging
import sys
import zstandard
import json
from enum import Enum
from sortedcontainers import SortedList
from collections import defaultdict

log = discord_logging.get_logger()

import utils
import merge

NEWLINE_ENCODED = "\n".encode('utf-8')


class ApiRequest:
	def __init__(self, ids, is_submission, ingest_name, estimated_datetime=None, missing_expected=False):
		self.ids = ids
		self.is_submission = is_submission
		self.ingest_name = ingest_name
		self.estimated_datetime = estimated_datetime
		self.missing_expected = missing_expected
		self.results = None
		self.complete = False
		self.tries = 0
		self.prev_lengths = []

	def should_retry(self):
		if self.complete:
			return False  # the request is complete, no need to retry
		if len(self.prev_lengths) <= 1:
			return True  # we've only made one attempt and it didn't work, do retry
		if self.prev_lengths[-1] == 0:
			if len(self.prev_lengths) < (10 if self.missing_expected else 100):
				return True  # the most recent result was 0 objects, retry up to 10 times if missing objects are expected, otherwise up to 100
			else:
				log.info(f"Force finished request with retries: {self}")
				self.complete = True
				return False
		if self.prev_lengths[-1] == self.prev_lengths[-2]:
			if self.missing_expected:
				self.complete = True
				return False  # the latest two requests were the same and we're expecting missing objects, mark as complete
			elif len(self.prev_lengths) >= 4 and \
					self.prev_lengths[-1] == self.prev_lengths[-3] and \
					self.prev_lengths[-1] == self.prev_lengths[-4]:
				log.info(f"Force finished request with retries: {self}")
				self.complete = True
				return False  # the latest four requests were the same, go ahead and mark as complete
		return True  # recent requests didn't match, and weren't 0, go ahead and retry

	def get_body_key(self):
		return "self_text" if self.is_submission else "body"

	def get_string_type(self):
		return "submission" if self.is_submission else "comment"

	def get_prefix(self):
		return "t3_" if self.is_submission else "t1_"

	def set_results(self, results):
		self.prev_lengths.append(len(results))
		self.results = []
		current_timestamp = int(datetime.utcnow().timestamp())
		for result in results:
			obj = result['data']
			# strip the rendered html variants before storing
			if 'body_html' in obj:
				del obj['body_html']
			if 'selftext_html' in obj:
				del obj['selftext_html']
			obj['retrieved_on'] = current_timestamp
			self.results.append(obj)
		log.debug(f"Set result: {self}")

	def id_string(self):
		return f"{self.get_prefix()}{(f',{self.get_prefix()}'.join(self.ids))}"

	def __str__(self):
		return \
			f"{self.ingest_name}: {self.ids[0]}-{self.ids[-1]} {self.get_string_type()}: " \
			f"{len(self.results) if self.results else self.results} : {self.tries} : " \
			f"{self.complete} : {','.join([str(val) for val in self.prev_lengths])}"

	# degenerate comparisons: any ApiRequest sorts as less than, and equal to,
	# any other, so instances can sit in sorted containers without raising
	def __gt__(self, other):
		if isinstance(other, ApiRequest):
			return False
		return True

	def __lt__(self, other):
		if isinstance(other, ApiRequest):
			return True
		return False

	def __eq__(self, other):
		if isinstance(other, ApiRequest):
			return True
		return False
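
# A minimal sketch (not part of the original module) of how the retry
# heuristic above behaves: each set_results() call appends the result count
# to prev_lengths, and should_retry() inspects that history. The ids,
# ingest name and payload below are made up for illustration.
def _demo_should_retry():
	request = ApiRequest(ids=["abc123", "abc124"], is_submission=False, ingest_name="ingest1")
	request.set_results([])
	assert request.should_retry()  # a single empty attempt: keep retrying
	comment = {'data': {'id': 'abc123', 'body': 'text', 'created_utc': 1600000000}}
	request.set_results([comment])
	request.set_results([comment])
	# two matching non-zero lengths only finish the request when
	# missing_expected is set, or after four matching attempts in a row
	assert request.should_retry()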

# fixed-size FIFO that discards the oldest item once max_size is reached
class Queue:
	def __init__(self, max_size):
		self.list = []
		self.max_size = max_size

	def put(self, item):
		if len(self.list) >= self.max_size:
			self.list.pop(0)
		self.list.append(item)

	def peek(self):
		return self.list[0] if len(self.list) > 0 else None


class OutputHandle:
	def __init__(self, is_submission, dump_folder):
		self.handle = None
		self.current_path = None
		self.current_minute = None
		self.is_submission = is_submission
		self.dump_folder = dump_folder

		if not os.path.exists(dump_folder):
			os.makedirs(dump_folder)

	def matched_minute(self, new_date_time):
		return self.current_minute is not None and new_date_time.minute == self.current_minute

	def get_path(self, date_folder, export_filename, increment=None):
		folder = f"{self.dump_folder}{os.path.sep}{date_folder}"
		if not os.path.exists(folder):
			os.makedirs(folder)
		bldr = [folder]
		bldr.append(os.path.sep)
		if self.is_submission:
			bldr.append("RS_")
		else:
			bldr.append("RC_")
		bldr.append(export_filename)
		if increment is not None:
			bldr.append("_")
			bldr.append(str(increment))
		bldr.append(".zst")
		return ''.join(bldr)

	def rollover_to_minute(self, date_time):
		# finish the previous minute's file: close it and strip the .tmp suffix
		if self.handle is not None:
			self.handle.close()
			os.rename(self.current_path + ".tmp", self.current_path)
		date_folder = date_time.strftime('%y-%m-%d')
		export_filename = date_time.strftime('%y-%m-%d_%H-%M')
		export_path = self.get_path(date_folder, export_filename)
		if os.path.exists(export_path + ".tmp"):
			os.rename(export_path + ".tmp", export_path)
		# if a finished dump for this minute already exists, append an
		# increment suffix rather than overwrite it
		i = 0
		while os.path.exists(export_path):
			log.info(f"Dump exists, incrementing: {export_path}")
			i += 1
			export_path = self.get_path(date_folder, export_filename, i)
			if i > 100:
				log.warning(f"Something went wrong, more than 100 dumps for minute, aborting")
				sys.exit(3)
		self.current_path = export_path
		self.handle = zstandard.ZstdCompressor().stream_writer(open(export_path + ".tmp", 'wb'))
		self.current_minute = date_time.minute

	def write_object(self, obj):
		self.handle.write(json.dumps(obj, sort_keys=True).encode('utf-8'))
		self.handle.write(NEWLINE_ENCODED)

	def flush(self):
		self.handle.flush()

	def close(self):
		if self.handle is not None:
			self.handle.close()
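
# A minimal usage sketch (assumed, not from the original module): callers
# roll the handle over when an object's minute changes, then write objects
# as zstd-compressed newline-delimited JSON. "demo_dump" and the sample
# object are hypothetical.
def _demo_output_handle():
	output = OutputHandle(is_submission=False, dump_folder="demo_dump")
	obj = {"id": "abc123", "body": "text", "created_utc": 1600000000}
	created = datetime.utcfromtimestamp(obj["created_utc"])
	if not output.matched_minute(created):
		output.rollover_to_minute(created.replace(second=0, microsecond=0))
	output.write_object(obj)
	output.close()  # the last file keeps its .tmp suffix until the next rollover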

class IngestType(Enum):
	INGEST = 1
	RESCAN = 2
	DOWNLOAD = 3
	PUSHSHIFT = 4
	BACKFILL = 5


class ObjectDict:
	def __init__(self, min_datetime, max_datetime, obj_type):
		self.min_datetime = min_datetime
		self.max_datetime = max_datetime
		self.obj_type = obj_type
		# counts[minute][ingest_type][is_new]: True counts newly added objects,
		# False counts duplicates merged into existing ones
		self.counts = defaultdict(lambda: defaultdict(lambda: defaultdict(int)))
		self.min_id = None
		self.max_id = None
		self.by_id = {}
		self.by_minute = defaultdict(ObjectMinuteList)

	def contains_id(self, str_id):
		return str_id in self.by_id

	def delete_object_id(self, str_id):
		del self.by_id[str_id]

	def delete_objects_below_minute(self, delete_below_minute):
		for minute, minute_list in self.by_minute.items():
			if minute < delete_below_minute:
				for obj in minute_list.obj_list:
					self.delete_object_id(obj['id'])
		# by_minute still references the deleted objects; rebuild_minute_dict()
		# refreshes it from by_id

	def rebuild_minute_dict(self):
		self.by_minute = defaultdict(ObjectMinuteList)
		for obj in self.by_id.values():
			created_minute = datetime.utcfromtimestamp(obj["created_utc"]).replace(second=0, microsecond=0)
			self.by_minute[created_minute].add(obj)

	def count_minutes(self):
		return len(self.by_minute)

	@staticmethod
	def get_counts_string_from_dict(counts_dict, ingest_types):
		bldr = []
		for ingest_type in ingest_types:
			if ingest_type in counts_dict:
				bldr.append(f"{counts_dict[ingest_type][True]}({counts_dict[ingest_type][False]})")
			else:
				bldr.append("0(0)")
		return "|".join(bldr)

	def get_counts_string_by_minute(self, minute, ingest_types):
		return ObjectDict.get_counts_string_from_dict(self.counts[minute], ingest_types)

	def get_counts_string(self):
		sum_dict = defaultdict(lambda: defaultdict(int))
		for counts_dict in self.counts.values():
			for ingest_type in IngestType:
				if ingest_type in counts_dict:
					sum_dict[ingest_type][True] += counts_dict[ingest_type][True]
					sum_dict[ingest_type][False] += counts_dict[ingest_type][False]
		return ObjectDict.get_counts_string_from_dict(sum_dict, IngestType)

	def get_missing_ids_by_minutes(self, start_minute, end_minute):
		# assumes ids are sequential integers under the base36 encoding, so any
		# id between the endpoints' min and max that was never stored is missing
		start_id = self.by_minute[start_minute].min_id
		end_id = self.by_minute[end_minute].max_id
		missing_ids = []
		for int_id in range(start_id, end_id + 1):
			string_id = utils.base36encode(int_id)
			if not self.contains_id(string_id):
				missing_ids.append(string_id)
		return missing_ids

	def add_object(self, obj, ingest_type):
		created_utc = datetime.utcfromtimestamp(obj["created_utc"])
		created_minute = created_utc.replace(second=0, microsecond=0)
		if obj['id'] in self.by_id:
			existing_obj = self.by_id[obj['id']]
			unmatched_field = merge.merge_fields(existing_obj, obj, self.obj_type)
			self.counts[created_minute][ingest_type][False] += 1
			return unmatched_field
		if created_utc < self.min_datetime or created_utc > self.max_datetime:
			return False
		unmatched_field = merge.parse_fields(obj, self.obj_type)
		self.by_id[obj['id']] = obj
		self.by_minute[created_minute].add(obj)
		self.counts[created_minute][ingest_type][True] += 1
		self.min_id, self.max_id = utils.merge_lowest_highest_id(obj['id'], self.min_id, self.max_id)
		return unmatched_field


class ObjectMinuteList:
	def __init__(self):
		self.obj_list = SortedList(key=lambda x: f"{x['created_utc']}:{x['id']}")
		self.min_id = None
		self.max_id = None

	def add(self, obj):
		self.min_id, self.max_id = utils.merge_lowest_highest_id(obj['id'], self.min_id, self.max_id)
		self.obj_list.add(obj)
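
# A small illustration (not in the original module) of the counts string
# format produced above: one "new(duplicate)" pair per requested ingest
# type, joined with pipes. The counts below are invented.
def _demo_counts_string():
	counts = defaultdict(lambda: defaultdict(int))
	counts[IngestType.INGEST][True] = 120  # objects added for the first time
	counts[IngestType.INGEST][False] = 3   # duplicates merged into existing objects
	return ObjectDict.get_counts_string_from_dict(counts, [IngestType.INGEST, IngestType.PUSHSHIFT])
	# -> "120(3)|0(0)"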