diff --git a/personal/combine/build_day.py b/personal/combine/build_day.py index e4008e0..50f09d7 100644 --- a/personal/combine/build_day.py +++ b/personal/combine/build_day.py @@ -15,7 +15,7 @@ import logging.handlers sys.path.append('personal') -log = discord_logging.init_logging(debug=False) +log = discord_logging.init_logging(debug=True) import utils import classes @@ -133,10 +133,10 @@ def build_day(day_to_process, input_folders, output_folder, object_type, reddit) working_highest_minute = last_minute_of_day else: working_highest_minute = minute_iterator - timedelta(minutes=1) - missing_ids = objects.get_missing_ids_by_minutes(working_lowest_minute, working_highest_minute) + missing_ids, start_id, end_id = objects.get_missing_ids_by_minutes(working_lowest_minute, working_highest_minute) log.debug( - f"Backfilling from: {working_lowest_minute.strftime('%y-%m-%d_%H-%M')} to " - f"{working_highest_minute.strftime('%y-%m-%d_%H-%M')} with {len(missing_ids)} ids") + f"Backfilling from: {working_lowest_minute.strftime('%y-%m-%d_%H-%M')} ({utils.base36encode(start_id)}|{start_id}) to " + f"{working_highest_minute.strftime('%y-%m-%d_%H-%M')} ({utils.base36encode(end_id)}|{end_id}) with {len(missing_ids)} ({end_id - start_id}) ids") for chunk in utils.chunk_list(missing_ids, 50): pushshift_objects, pushshift_token = query_pushshift(chunk, pushshift_token, object_type) @@ -150,6 +150,10 @@ def build_day(day_to_process, input_folders, output_folder, object_type, reddit) if objects.add_object(reddit_object['data'], IngestType.BACKFILL): unmatched_field = True + for missing_id in missing_ids: + if missing_id not in objects.by_id: + objects.add_missing_object(missing_id) + objects.delete_objects_below_minute(working_lowest_minute) while working_lowest_minute <= working_highest_minute: folder = os.path.join(output_folder, file_type, working_lowest_minute.strftime('%y-%m-%d')) @@ -164,7 +168,7 @@ def build_day(day_to_process, input_folders, output_folder, object_type, reddit) objects.delete_object_id(obj['id']) log.info( f"Wrote up to {working_lowest_minute.strftime('%y-%m-%d_%H-%M')} : " - f"{objects.get_counts_string_by_minute(working_lowest_minute, [IngestType.PUSHSHIFT, IngestType.BACKFILL])}") + f"{objects.get_counts_string_by_minute(working_lowest_minute, [IngestType.PUSHSHIFT, IngestType.BACKFILL, IngestType.MISSING])}") output_handle.close() working_lowest_minute += timedelta(minutes=1) diff --git a/personal/combine/classes.py b/personal/combine/classes.py index 90a2dc6..fdcbc7b 100644 --- a/personal/combine/classes.py +++ b/personal/combine/classes.py @@ -186,6 +186,7 @@ class IngestType(Enum): DOWNLOAD = 3 PUSHSHIFT = 4 BACKFILL = 5 + MISSING = 6 class ObjectDict: @@ -233,7 +234,13 @@ class ObjectDict: return "|".join(bldr) def get_counts_string_by_minute(self, minute, ingest_types): - return ObjectDict.get_counts_string_from_dict(self.counts[minute], ingest_types) + count_string = ObjectDict.get_counts_string_from_dict(self.counts[minute], ingest_types) + minute_dict = self.by_minute.get(minute) + if minute_dict is None: + range_string = "" + else: + range_string = f" - {len(minute_dict.obj_list)} ({minute_dict.max_id - minute_dict.min_id}) ({utils.base36encode(minute_dict.min_id)}-{utils.base36encode(minute_dict.max_id)})" + return count_string + range_string def get_counts_string(self): sum_dict = defaultdict(lambda: defaultdict(int)) @@ -242,6 +249,7 @@ class ObjectDict: if ingest_type in counts_dict: sum_dict[ingest_type][True] += counts_dict[ingest_type][True] sum_dict[ingest_type][False] += counts_dict[ingest_type][False] + return ObjectDict.get_counts_string_from_dict(sum_dict, IngestType) def get_missing_ids_by_minutes(self, start_minute, end_minute): @@ -252,7 +260,7 @@ class ObjectDict: string_id = utils.base36encode(int_id) if not self.contains_id(string_id): missing_ids.append(string_id) - return missing_ids + return missing_ids, start_id, end_id def add_object(self, obj, ingest_type): created_utc = datetime.utcfromtimestamp(obj["created_utc"]) @@ -271,6 +279,17 @@ class ObjectDict: self.min_id, self.max_id = utils.merge_lowest_highest_id(obj['id'], self.min_id, self.max_id) return unmatched_field + def add_missing_object(self, obj_id): + if obj_id in self.by_id: + return + int_id = utils.base36decode(obj_id) + for minute, minute_dict in self.by_minute.items(): + if minute_dict.min_id is None: + continue + if minute_dict.min_id < int_id < minute_dict.max_id: + self.counts[minute][IngestType.MISSING][True] += 1 + return + class ObjectMinuteList: def __init__(self): diff --git a/personal/transform/split_by_minutes.py b/personal/transform/split_by_minutes.py index a3f4f65..87378e5 100644 --- a/personal/transform/split_by_minutes.py +++ b/personal/transform/split_by_minutes.py @@ -12,7 +12,7 @@ NEWLINE_ENCODED = "\n".encode('utf-8') if __name__ == "__main__": - input_file = r"\\MYCLOUDPR4100\Public\RS_2023-05.zst" + input_file = r"\\MYCLOUDPR4100\Public\RC_2023-05.zst" output_folder = r"\\MYCLOUDPR4100\Public\ingest\download" file_type = "comments" if "RC" in input_file else "submissions"