Mirror of https://github.com/Watchful1/PushshiftDumps.git, synced 2025-07-29 01:18:34 -04:00
Build by minute so we don't have to redo a whole day if there's a problem

This commit is contained in:
parent 3d453dc6e5
commit 9ad5d9c06f

1 changed file with 21 additions and 17 deletions
@@ -45,12 +45,16 @@ def query_pushshift(ids, bearer, object_type):
 	return response.json()['data']


+def end_of_day(input_minute):
+	return input_minute.replace(hour=0, minute=0, second=0) + timedelta(days=1)
+
+
 def build_day(day_to_process, input_folders, output_folder, object_type, reddit, pushshift_token):
 	file_type = "comments" if object_type == ObjectType.COMMENT else "submissions"

 	file_minutes = {}
 	minute_iterator = day_to_process - timedelta(minutes=2)
-	end_time = day_to_process + timedelta(days=1, minutes=2)
+	end_time = end_of_day(day_to_process) + timedelta(minutes=2)
 	while minute_iterator <= end_time:
 		file_minutes[minute_iterator] = []
 		minute_iterator += timedelta(minutes=1)
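The new end_of_day helper rounds a timestamp forward to the next midnight; an input already at midnight still advances a full day, which is what lets the driver loop at the bottom of the script step day by day. A minimal standalone check (helper body copied from the hunk above, example dates hypothetical):

from datetime import datetime, timedelta

def end_of_day(input_minute):
	# zero out the time of day, then step to the following midnight
	return input_minute.replace(hour=0, minute=0, second=0) + timedelta(days=1)

assert end_of_day(datetime(2023, 5, 1, 13, 45)) == datetime(2023, 5, 2)
assert end_of_day(datetime(2023, 5, 1)) == datetime(2023, 5, 2)  # midnight input still advances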
@@ -64,20 +68,14 @@ def build_day(day_to_process, input_folders, output_folder, object_type, reddit, pushshift_token):
 				log.info(f"File doesn't match regex: {file}")
 				continue
 			file_date = datetime.strptime(match.group(), '%y-%m-%d_%H-%M')
-			file_minutes[file_date].append((os.path.join(merge_date_folder, file), ingest_type))
+			if file_date in file_minutes:
+				file_minutes[file_date].append((os.path.join(merge_date_folder, file), ingest_type))

-	output_path = os.path.join(output_folder, file_type)
-	if not os.path.exists(output_path):
-		os.makedirs(output_path)
-	output_path = os.path.join(output_path, f"{('RC' if file_type == 'comments' else 'RS')}_{day_to_process.strftime('%y-%m-%d')}.zst")
-	output_handle = zstandard.ZstdCompressor().stream_writer(open(output_path, 'wb'))
-
 	objects = classes.ObjectDict(day_to_process, day_to_process + timedelta(days=1) - timedelta(seconds=1), object_type)
 	unmatched_field = False
 	minute_iterator = day_to_process - timedelta(minutes=2)
 	working_lowest_minute = day_to_process
-	last_minute_of_day = day_to_process + timedelta(days=1) - timedelta(minutes=1)
-	end_time = day_to_process + timedelta(days=1, minutes=2)
+	last_minute_of_day = end_of_day(day_to_process) - timedelta(minutes=1)
 	while minute_iterator <= end_time:
 		for ingest_file, ingest_type in file_minutes[minute_iterator]:
 			for obj in utils.read_obj_zst(ingest_file):
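With the new membership guard, an ingest file whose minute stamp falls outside this day's padded window is skipped instead of raising a KeyError on the file_minutes dict. A small sketch of the pattern (dates and file names hypothetical):

from datetime import datetime, timedelta

day = datetime(2023, 5, 1)
file_minutes = {}
minute = day - timedelta(minutes=2)
while minute <= day + timedelta(days=1, minutes=2):
	file_minutes[minute] = []
	minute += timedelta(minutes=1)

stray = datetime(2023, 5, 3, 12, 0)  # stamp from a different day's folder
if stray in file_minutes:  # False, so the stray file is silently ignored
	file_minutes[stray].append(("23-05-03_12-00.zst", "ingest"))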
@@ -111,6 +109,12 @@ def build_day(day_to_process, input_folders, output_folder, object_type, reddit, pushshift_token):

 			objects.delete_objects_below_minute(working_lowest_minute)
 			while working_lowest_minute <= working_highest_minute:
+				folder = os.path.join(output_folder, file_type, working_lowest_minute.strftime('%y-%m-%d'))
+				if not os.path.exists(folder):
+					os.makedirs(folder)
+				output_path = os.path.join(folder, f"{('RC' if object_type == ObjectType.COMMENT else 'RS')}_{working_lowest_minute.strftime('%y-%m-%d_%H-%M')}.zst")
+				output_handle = zstandard.ZstdCompressor().stream_writer(open(output_path, 'wb'))
+
 				for obj in objects.by_minute[working_lowest_minute].obj_list:
 					output_handle.write(json.dumps(obj, sort_keys=True).encode('utf-8'))
 					output_handle.write(NEWLINE_ENCODED)
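Output now goes to one zstd-compressed ndjson file per minute inside a per-day folder, rather than a single file per day; the prefix stays RC for comments and RS for submissions, matching the removed per-day naming. A minimal sketch of that write path using the zstandard streaming API (paths and objects hypothetical):

import json
import os
import zstandard

NEWLINE_ENCODED = "\n".encode('utf-8')

folder = os.path.join("output", "comments", "23-05-01")
os.makedirs(folder, exist_ok=True)
output_path = os.path.join(folder, "RC_23-05-01_13-45.zst")

output_handle = zstandard.ZstdCompressor().stream_writer(open(output_path, 'wb'))
for obj in [{"id": "abc123", "body": "example"}]:  # stand-in for one minute's objects
	output_handle.write(json.dumps(obj, sort_keys=True).encode('utf-8'))
	output_handle.write(NEWLINE_ENCODED)
output_handle.close()  # flush the zstd frame; an unclosed writer leaves a truncated file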
@@ -118,6 +122,7 @@ def build_day(day_to_process, input_folders, output_folder, object_type, reddit, pushshift_token):
 				log.info(
 					f"Wrote up to {working_lowest_minute.strftime('%y-%m-%d_%H-%M')} : "
 					f"{objects.get_counts_string_by_minute(working_lowest_minute, [IngestType.PUSHSHIFT, IngestType.BACKFILL])}")
+				output_handle.close()
 				working_lowest_minute += timedelta(minutes=1)

 			objects.rebuild_minute_dict()
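Closing the handle as soon as its minute is written means each per-minute file is a complete, independently readable zstd frame, so a crash mid-run loses at most the minute in progress; the single end-of-day close becomes redundant and is removed in the next hunk.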
@@ -128,15 +133,14 @@ def build_day(day_to_process, input_folders, output_folder, object_type, reddit, pushshift_token):
 		minute_iterator += timedelta(minutes=1)

-	output_handle.close()

 	log.info(f"Finished day {day_to_process.strftime('%y-%m-%d')}: {objects.get_counts_string()}")


 if __name__ == "__main__":
 	parser = argparse.ArgumentParser(description="Combine the ingest and rescan files, clean and do pushshift lookups as needed")
 	parser.add_argument("--type", help="The object type, either comments or submissions", required=True)
-	parser.add_argument("--start_date", help="The start of the date range to process, format YY-MM-DD", required=True)
-	parser.add_argument("--end_date", help="The end of the date range to process, format YY-MM-DD. If not provided, the script processed only one day")
+	parser.add_argument("--start_date", help="The start of the date range to process, format YY-MM-DD_HH-MM", required=True)
+	parser.add_argument("--end_date", help="The end of the date range to process, format YY-MM-DD_HH-MM. If not provided, the script processes to the end of the day")
 	parser.add_argument('--input', help='Input folder', required=True)
 	parser.add_argument('--output', help='Output folder', required=True)
 	parser.add_argument('--pushshift', help='The pushshift token', required=True)
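Because the date arguments now carry minute precision, the parse format gains the _HH-MM suffix. A quick check of the new format string (example value hypothetical):

from datetime import datetime

start_date = datetime.strptime("23-05-01_13-45", '%y-%m-%d_%H-%M')
assert start_date == datetime(2023, 5, 1, 13, 45)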
@@ -151,10 +155,10 @@ if __name__ == "__main__":
 	if args.start_date is None:
 		log.error(f"No start date provided")
 		sys.exit(2)
-	start_date = datetime.strptime(args.start_date, '%y-%m-%d')
-	end_date = start_date
+	start_date = datetime.strptime(args.start_date, '%y-%m-%d_%H-%M')
+	end_date = end_of_day(start_date)
 	if args.end_date is not None:
-		end_date = datetime.strptime(args.end_date, '%y-%m-%d')
+		end_date = datetime.strptime(args.end_date, '%y-%m-%d_%H-%M')

 	for input_folder, ingest_type in input_folders:
 		log.info(f"Input folder: {input_folder}")
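When --end_date is omitted, the run now defaults to end_of_day(start_date), so a job started mid-day stops at the next midnight instead of always covering exactly one calendar day.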
@@ -174,6 +178,6 @@ if __name__ == "__main__":

 	while start_date <= end_date:
 		build_day(start_date, input_folders, args.output, object_type, reddit, args.pushshift)
-		start_date = start_date + timedelta(days=1)
+		start_date = end_of_day(start_date)

 	#log.info(f"{len(file_minutes)} : {count_ingest_minutes} : {count_rescan_minutes} : {day_highest_id - day_lowest_id:,} - {count_objects:,} = {(day_highest_id - day_lowest_id) - count_objects:,}: {utils.base36encode(day_lowest_id)}-{utils.base36encode(day_highest_id)}")
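Advancing with end_of_day instead of a fixed 24-hour step means a mid-day start_date snaps to midnight after the first pass, and every later pass covers a whole day. A sketch of the iteration (dates hypothetical, build_day replaced by a print):

from datetime import datetime, timedelta

def end_of_day(input_minute):
	return input_minute.replace(hour=0, minute=0, second=0) + timedelta(days=1)

start_date = datetime(2023, 5, 1, 13, 45)
end_date = datetime(2023, 5, 3)
while start_date <= end_date:
	print(f"building from {start_date}")  # 05-01 13:45, then 05-02 00:00, then 05-03 00:00
	start_date = end_of_day(start_date)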