mirror of
https://github.com/Watchful1/PushshiftDumps.git
synced 2025-07-03 02:46:40 -04:00
More detailed logging
This commit is contained in:
parent
574fdc43b4
commit
c953c1e18a
3 changed files with 31 additions and 8 deletions
|
@ -15,7 +15,7 @@ import logging.handlers
|
|||
|
||||
sys.path.append('personal')
|
||||
|
||||
log = discord_logging.init_logging(debug=False)
|
||||
log = discord_logging.init_logging(debug=True)
|
||||
|
||||
import utils
|
||||
import classes
|
||||
|
@ -133,10 +133,10 @@ def build_day(day_to_process, input_folders, output_folder, object_type, reddit)
|
|||
working_highest_minute = last_minute_of_day
|
||||
else:
|
||||
working_highest_minute = minute_iterator - timedelta(minutes=1)
|
||||
missing_ids = objects.get_missing_ids_by_minutes(working_lowest_minute, working_highest_minute)
|
||||
missing_ids, start_id, end_id = objects.get_missing_ids_by_minutes(working_lowest_minute, working_highest_minute)
|
||||
log.debug(
|
||||
f"Backfilling from: {working_lowest_minute.strftime('%y-%m-%d_%H-%M')} to "
|
||||
f"{working_highest_minute.strftime('%y-%m-%d_%H-%M')} with {len(missing_ids)} ids")
|
||||
f"Backfilling from: {working_lowest_minute.strftime('%y-%m-%d_%H-%M')} ({utils.base36encode(start_id)}|{start_id}) to "
|
||||
f"{working_highest_minute.strftime('%y-%m-%d_%H-%M')} ({utils.base36encode(end_id)}|{end_id}) with {len(missing_ids)} ({end_id - start_id}) ids")
|
||||
|
||||
for chunk in utils.chunk_list(missing_ids, 50):
|
||||
pushshift_objects, pushshift_token = query_pushshift(chunk, pushshift_token, object_type)
|
||||
|
@ -150,6 +150,10 @@ def build_day(day_to_process, input_folders, output_folder, object_type, reddit)
|
|||
if objects.add_object(reddit_object['data'], IngestType.BACKFILL):
|
||||
unmatched_field = True
|
||||
|
||||
for missing_id in missing_ids:
|
||||
if missing_id not in objects.by_id:
|
||||
objects.add_missing_object(missing_id)
|
||||
|
||||
objects.delete_objects_below_minute(working_lowest_minute)
|
||||
while working_lowest_minute <= working_highest_minute:
|
||||
folder = os.path.join(output_folder, file_type, working_lowest_minute.strftime('%y-%m-%d'))
|
||||
|
@ -164,7 +168,7 @@ def build_day(day_to_process, input_folders, output_folder, object_type, reddit)
|
|||
objects.delete_object_id(obj['id'])
|
||||
log.info(
|
||||
f"Wrote up to {working_lowest_minute.strftime('%y-%m-%d_%H-%M')} : "
|
||||
f"{objects.get_counts_string_by_minute(working_lowest_minute, [IngestType.PUSHSHIFT, IngestType.BACKFILL])}")
|
||||
f"{objects.get_counts_string_by_minute(working_lowest_minute, [IngestType.PUSHSHIFT, IngestType.BACKFILL, IngestType.MISSING])}")
|
||||
output_handle.close()
|
||||
working_lowest_minute += timedelta(minutes=1)
|
||||
|
||||
|
|
|
@ -186,6 +186,7 @@ class IngestType(Enum):
|
|||
DOWNLOAD = 3
|
||||
PUSHSHIFT = 4
|
||||
BACKFILL = 5
|
||||
MISSING = 6
|
||||
|
||||
|
||||
class ObjectDict:
|
||||
|
@ -233,7 +234,13 @@ class ObjectDict:
|
|||
return "|".join(bldr)
|
||||
|
||||
def get_counts_string_by_minute(self, minute, ingest_types):
    """Return the counts string for one minute, plus an id-range suffix.

    The base string comes from ``ObjectDict.get_counts_string_from_dict``
    applied to ``self.counts[minute]`` for the requested ``ingest_types``.
    When ``self.by_minute`` holds an entry for ``minute`` with a known id
    range, a suffix of the form
    `` - <object count> (<max_id - min_id>) (<min base36>-<max base36>)``
    is appended; otherwise the base string is returned unchanged.
    """
    count_string = ObjectDict.get_counts_string_from_dict(self.counts[minute], ingest_types)

    minute_dict = self.by_minute.get(minute)
    # add_missing_object treats min_id as possibly None, so guard the same
    # way here instead of failing on the subtraction / base36 encoding.
    # NOTE(review): assumes min_id and max_id are set together - confirm
    # against ObjectMinuteList.
    if minute_dict is None or minute_dict.min_id is None or minute_dict.max_id is None:
        return count_string

    range_string = f" - {len(minute_dict.obj_list)} ({minute_dict.max_id - minute_dict.min_id}) ({utils.base36encode(minute_dict.min_id)}-{utils.base36encode(minute_dict.max_id)})"
    return count_string + range_string
|
||||
|
||||
def get_counts_string(self):
|
||||
sum_dict = defaultdict(lambda: defaultdict(int))
|
||||
|
@ -242,6 +249,7 @@ class ObjectDict:
|
|||
if ingest_type in counts_dict:
|
||||
sum_dict[ingest_type][True] += counts_dict[ingest_type][True]
|
||||
sum_dict[ingest_type][False] += counts_dict[ingest_type][False]
|
||||
|
||||
return ObjectDict.get_counts_string_from_dict(sum_dict, IngestType)
|
||||
|
||||
def get_missing_ids_by_minutes(self, start_minute, end_minute):
|
||||
|
@ -252,7 +260,7 @@ class ObjectDict:
|
|||
string_id = utils.base36encode(int_id)
|
||||
if not self.contains_id(string_id):
|
||||
missing_ids.append(string_id)
|
||||
return missing_ids
|
||||
return missing_ids, start_id, end_id
|
||||
|
||||
def add_object(self, obj, ingest_type):
|
||||
created_utc = datetime.utcfromtimestamp(obj["created_utc"])
|
||||
|
@ -271,6 +279,17 @@ class ObjectDict:
|
|||
self.min_id, self.max_id = utils.merge_lowest_highest_id(obj['id'], self.min_id, self.max_id)
|
||||
return unmatched_field
|
||||
|
||||
def add_missing_object(self, obj_id):
    """Count ``obj_id`` as MISSING in the minute whose id range contains it.

    Does nothing when ``obj_id`` is already a key of ``self.by_id``. Otherwise
    the id is decoded with ``utils.base36decode`` and the entries of
    ``self.by_minute`` are scanned in iteration order; the first minute whose
    ``min_id``/``max_id`` strictly bracket the decoded id gets its
    ``IngestType.MISSING`` counter incremented. If no minute brackets the id,
    nothing is recorded.
    """
    if obj_id in self.by_id:
        return

    decoded_id = utils.base36decode(obj_id)
    for bucket_minute, bucket in self.by_minute.items():
        # Buckets without a known lower bound cannot contain the id.
        if bucket.min_id is not None and bucket.min_id < decoded_id < bucket.max_id:
            self.counts[bucket_minute][IngestType.MISSING][True] += 1
            return
|
||||
|
||||
|
||||
class ObjectMinuteList:
|
||||
def __init__(self):
|
||||
|
|
|
@ -12,7 +12,7 @@ NEWLINE_ENCODED = "\n".encode('utf-8')
|
|||
|
||||
|
||||
if __name__ == "__main__":
|
||||
input_file = r"\\MYCLOUDPR4100\Public\RS_2023-05.zst"
|
||||
input_file = r"\\MYCLOUDPR4100\Public\RC_2023-05.zst"
|
||||
output_folder = r"\\MYCLOUDPR4100\Public\ingest\download"
|
||||
file_type = "comments" if "RC" in input_file else "submissions"
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue