mirror of
https://github.com/Watchful1/PushshiftDumps.git
synced 2025-08-01 10:56:04 -04:00
Initial implementation of process_month.py
parent b54a2483dc
commit fa5f6316fb
10 changed files with 703 additions and 369 deletions
@@ -15,7 +15,7 @@ import logging.handlers
 
 sys.path.append('personal')
 
-log = discord_logging.init_logging(debug=False)
+log = discord_logging.get_logger(init=True)
 
 import utils
 import classes
@@ -27,6 +27,57 @@ NEWLINE_ENCODED = "\n".encode('utf-8')
 reg = re.compile(r"\d\d-\d\d-\d\d_\d\d-\d\d")
 
 
+def build_month(month, input_folder, output_folder, file_type, compression_level):
+	total_objects = 0
+	total_bytes = 0
+	minute_iterator = month
+	if month.month == 12:
+		end_time = month.replace(year=month.year + 1, month=1)
+	else:
+		end_time = month.replace(month=month.month + 1)
+	while minute_iterator < end_time:
+		minute_file_path = os.path.join(input_folder, file_type, minute_iterator.strftime('%y-%m-%d'), f"{prefix}_{minute_iterator.strftime('%y-%m-%d_%H-%M')}.zst")
+		for obj, line, _ in utils.read_obj_zst_meta(minute_file_path):
+			total_bytes += len(line.encode('utf-8'))
+			total_bytes += 1
+
+			total_objects += 1
+			if total_objects % 1000000 == 0:
+				log.info(f"{file_type}: Counting: {minute_iterator.strftime('%y-%m-%d_%H-%M')} : {total_objects:,} : {total_bytes:,}")
+
+		minute_iterator += timedelta(minutes=1)
+
+	log.info(f"{file_type}: Counting: {minute_iterator.strftime('%y-%m-%d_%H-%M')} : {total_objects:,} : {total_bytes:,}")
+
+	output_path = os.path.join(output_folder, file_type, f"{prefix}_{month.strftime('%Y-%m')}.zst")
+	output_handle = zstandard.ZstdCompressor(level=compression_level, write_content_size=True, write_checksum=True, threads=-1).stream_writer(open(output_path, 'wb'), size=total_bytes)
+
+	count_objects = 0
+	count_bytes = 0
+	minute_iterator = month
+	if month.month == 12:
+		end_time = month.replace(year=month.year + 1, month=1)
+	else:
+		end_time = month.replace(month=month.month + 1)
+	while minute_iterator < end_time:
+		minute_file_path = os.path.join(input_folder, file_type, minute_iterator.strftime('%y-%m-%d'), f"{prefix}_{minute_iterator.strftime('%y-%m-%d_%H-%M')}.zst")
+		for obj, line, _ in utils.read_obj_zst_meta(minute_file_path):
+			line_encoded = line.encode('utf-8')
+			count_bytes += len(line_encoded)
+			count_bytes += 1
+			output_handle.write(line_encoded)
+			output_handle.write(NEWLINE_ENCODED)
+
+			count_objects += 1
+			if count_objects % 100000 == 0:
+				log.info(f"{file_type}: Writing: {minute_iterator.strftime('%y-%m-%d_%H-%M')} : {count_objects:,}/{total_objects:,} : {count_bytes:,}/{total_bytes:,}")
+
+		minute_iterator += timedelta(minutes=1)
+
+	log.info(f"{file_type}: Writing: {minute_iterator.strftime('%y-%m-%d_%H-%M')} : {count_objects:,}/{total_objects:,} : {count_bytes:,}/{total_bytes:,}")
+	output_handle.close()
+
+
 if __name__ == "__main__":
 	parser = argparse.ArgumentParser(description="Combine the minute files into a single month")
 	parser.add_argument("--type", help="The object type, either comments or submissions", required=True)
@@ -57,51 +108,10 @@ if __name__ == "__main__":
 		log.error(f"Invalid type: {args.type}")
 		sys.exit(2)
 
-	total_objects = 0
-	total_bytes = 0
-	minute_iterator = month
-	if month.month == 12:
-		end_time = month.replace(year=month.year + 1, month=1)
-	else:
-		end_time = month.replace(month=month.month + 1)
-	while minute_iterator < end_time:
-		minute_file_path = os.path.join(args.input, args.type, minute_iterator.strftime('%y-%m-%d'), f"{prefix}_{minute_iterator.strftime('%y-%m-%d_%H-%M')}.zst")
-		for obj, line, _ in utils.read_obj_zst_meta(minute_file_path):
-			total_bytes += len(line.encode('utf-8'))
-			total_bytes += 1
-
-			total_objects += 1
-			if total_objects % 1000000 == 0:
-				log.info(f"Counting: {minute_iterator.strftime('%y-%m-%d_%H-%M')} : {total_objects:,} : {total_bytes:,}")
-
-		minute_iterator += timedelta(minutes=1)
-
-	log.info(f"Counting: {minute_iterator.strftime('%y-%m-%d_%H-%M')} : {total_objects:,} : {total_bytes:,}")
-
-	output_path = os.path.join(args.output, args.type, f"{prefix}_{month.strftime('%Y-%m')}.zst")
-	output_handle = zstandard.ZstdCompressor(level=level, write_content_size=True, write_checksum=True, threads=-1).stream_writer(open(output_path, 'wb'), size=total_bytes)
-
-	count_objects = 0
-	count_bytes = 0
-	minute_iterator = month
-	if month.month == 12:
-		end_time = month.replace(year=month.year + 1, month=1)
-	else:
-		end_time = month.replace(month=month.month + 1)
-	while minute_iterator < end_time:
-		minute_file_path = os.path.join(args.input, args.type, minute_iterator.strftime('%y-%m-%d'), f"{prefix}_{minute_iterator.strftime('%y-%m-%d_%H-%M')}.zst")
-		for obj, line, _ in utils.read_obj_zst_meta(minute_file_path):
-			line_encoded = line.encode('utf-8')
-			count_bytes += len(line_encoded)
-			count_bytes += 1
-			output_handle.write(line_encoded)
-			output_handle.write(NEWLINE_ENCODED)
-
-			count_objects += 1
-			if count_objects % 100000 == 0:
-				log.info(f"Writing: {minute_iterator.strftime('%y-%m-%d_%H-%M')} : {count_objects:,}/{total_objects:,} : {count_bytes:,}/{total_bytes:,}")
-
-		minute_iterator += timedelta(minutes=1)
-
-	log.info(f"Writing: {minute_iterator.strftime('%y-%m-%d_%H-%M')} : {count_objects:,}/{total_objects:,} : {count_bytes:,}/{total_bytes:,}")
-	output_handle.close()
+	build_month(
+		month,
+		args.input,
+		args.output,
+		args.type,
+		level
+	)
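build_month makes two passes over the same minute files: the first only counts objects and bytes so the second can hand zstandard the exact uncompressed size up front (size=total_bytes with write_content_size=True), and the second pass streams the lines into the compressor. Both passes share the same month-boundary rollover; a minimal standalone sketch of that calculation, assuming the month argument is the first minute of the month as in the script (month_end is a hypothetical helper, not from the repo):

from datetime import datetime

def month_end(month_start):
	# First minute of the following month, handling the December -> January rollover
	if month_start.month == 12:
		return month_start.replace(year=month_start.year + 1, month=1)
	return month_start.replace(month=month_start.month + 1)

print(month_end(datetime(2023, 12, 1)))  # 2024-01-01 00:00:00
print(month_end(datetime(2024, 2, 1)))   # 2024-03-01 00:00:00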
@@ -15,7 +15,7 @@ import logging.handlers
 
 sys.path.append('personal')
 
-log = discord_logging.init_logging(debug=False)
+log = discord_logging.get_logger(init=True)
 
 import utils
 import classes
@@ -27,6 +27,18 @@ NEWLINE_ENCODED = "\n".encode('utf-8')
 reg = re.compile(r"\d\d-\d\d-\d\d_\d\d-\d\d")
 
 
+def get_pushshift_token(old_token):
+	saved_token = load_pushshift_token()
+	if saved_token is None or old_token == saved_token:
+		log.info(f"Requesting new token")
+		result_token = re_auth_pushshift(old_token)
+		save_pushshift_token(result_token)
+	else:
+		result_token = saved_token
+
+	return result_token
+
+
 def save_pushshift_token(token):
 	with open("pushshift.txt", 'w') as file:
 		file.write(token)
@@ -66,7 +78,7 @@ def re_auth_pushshift(old_token):
 		sys.exit(1)
 
 
-def query_pushshift(ids, bearer, object_type):
+def query_pushshift(ids, bearer, object_type, pushshift_token_function):
 	object_name = "comment" if object_type == ObjectType.COMMENT else "submission"
 	url = f"https://api.pushshift.io/reddit/{object_name}/search?limit=1000&ids={','.join(ids)}"
 	log.debug(f"pushshift query: {url}")
@@ -87,7 +99,7 @@ def query_pushshift(ids, bearer, object_type):
 			log.warning(f"Pushshift 403, trying reauth: {response.json()}")
 			log.warning(url)
 			log.warning(f"'Authorization': Bearer {bearer}")
-			bearer = re_auth_pushshift(bearer)
+			bearer = pushshift_token_function(bearer)
 		time.sleep(2)
 	if response.status_code != 200:
 		log.warning(f"4 requests failed with status code {response.status_code}")
@@ -112,12 +124,12 @@ def end_of_day(input_minute):
 	return input_minute.replace(hour=0, minute=0, second=0) + timedelta(days=1)
 
 
-def build_day(day_to_process, input_folders, output_folder, object_type, reddit, ignore_ids):
-	pushshift_token = load_pushshift_token()
-	log.info(f"Using pushshift token: {pushshift_token}")
+def build_day(day_to_process, input_folders, output_folder, object_type, reddit, ignore_ids, pushshift_token_function):
+	file_type = "comments" if object_type == ObjectType.COMMENT else "submissions"
+
+	pushshift_token = pushshift_token_function(None)
+	log.info(f"{file_type}: Using pushshift token: {pushshift_token}")
 
 	file_minutes = {}
 	minute_iterator = day_to_process - timedelta(minutes=2)
 	end_time = end_of_day(day_to_process) + timedelta(minutes=2)
@@ -131,7 +143,7 @@ def build_day(day_to_process, input_folders, output_folder, object_type, reddit,
 			for file in os.listdir(merge_date_folder):
 				match = reg.search(file)
 				if not match:
-					log.info(f"File doesn't match regex: {file}")
+					log.info(f"{file_type}: File doesn't match regex: {file}")
 					continue
 				file_date = datetime.strptime(match.group(), '%y-%m-%d_%H-%M')
 				if file_date in file_minutes:
@@ -147,7 +159,7 @@ def build_day(day_to_process, input_folders, output_folder, object_type, reddit,
 		for obj in utils.read_obj_zst(ingest_file):
 			if objects.add_object(obj, ingest_type):
 				unmatched_field = True
-		log.info(f"Loaded {minute_iterator.strftime('%y-%m-%d_%H-%M')} : {objects.get_counts_string_by_minute(minute_iterator, [IngestType.INGEST, IngestType.RESCAN, IngestType.DOWNLOAD])}")
+		log.info(f"{file_type}: Loaded {minute_iterator.strftime('%y-%m-%d_%H-%M')} : {objects.get_counts_string_by_minute(minute_iterator, [IngestType.INGEST, IngestType.RESCAN, IngestType.DOWNLOAD])}")
 
 		if minute_iterator >= end_time or objects.count_minutes() >= 11:
 			if minute_iterator > last_minute_of_day:
@@ -156,11 +168,11 @@ def build_day(day_to_process, input_folders, output_folder, object_type, reddit,
 			working_highest_minute = minute_iterator - timedelta(minutes=1)
 			missing_ids, start_id, end_id = objects.get_missing_ids_by_minutes(working_lowest_minute, working_highest_minute, ignore_ids)
 			log.debug(
-				f"Backfilling from: {working_lowest_minute.strftime('%y-%m-%d_%H-%M')} ({utils.base36encode(start_id)}|{start_id}) to "
+				f"{file_type}: Backfilling from: {working_lowest_minute.strftime('%y-%m-%d_%H-%M')} ({utils.base36encode(start_id)}|{start_id}) to "
 				f"{working_highest_minute.strftime('%y-%m-%d_%H-%M')} ({utils.base36encode(end_id)}|{end_id}) with {len(missing_ids)} ({end_id - start_id}) ids")
 
 			for chunk in utils.chunk_list(missing_ids, 50):
-				pushshift_objects, pushshift_token = query_pushshift(chunk, pushshift_token, object_type)
+				pushshift_objects, pushshift_token = query_pushshift(chunk, pushshift_token, object_type, pushshift_token_function)
 				for pushshift_object in pushshift_objects:
 					if objects.add_object(pushshift_object, IngestType.PUSHSHIFT):
 						unmatched_field = True
@@ -188,7 +200,7 @@ def build_day(day_to_process, input_folders, output_folder, object_type, reddit,
 					output_handle.write(NEWLINE_ENCODED)
 					objects.delete_object_id(obj['id'])
 				log.info(
-					f"Wrote up to {working_lowest_minute.strftime('%y-%m-%d_%H-%M')} : "
+					f"{file_type}: Wrote up to {working_lowest_minute.strftime('%y-%m-%d_%H-%M')} : "
 					f"{objects.get_counts_string_by_minute(working_lowest_minute, [IngestType.PUSHSHIFT, IngestType.BACKFILL, IngestType.MISSING])}")
 				output_handle.close()
 				working_lowest_minute += timedelta(minutes=1)
@@ -197,13 +209,20 @@ def build_day(day_to_process, input_folders, output_folder, object_type, reddit,
 
 		discord_logging.flush_discord()
 		if unmatched_field:
-			log.warning(f"Unmatched field, aborting")
+			log.warning(f"{file_type}: Unmatched field, aborting")
 			discord_logging.flush_discord()
 			sys.exit(1)
 
 		minute_iterator += timedelta(minutes=1)
 
-	log.info(f"Finished day {day_to_process.strftime('%y-%m-%d')}: {objects.get_counts_string()}")
+	log.info(f"{file_type}: Finished day {day_to_process.strftime('%y-%m-%d')}: {objects.get_counts_string()}")
 
 
+def merge_and_backfill(start_date, end_date, input_folders, output_folder, object_type, ignore_ids, reddit_username, pushshift_token_function):
+	reddit = praw.Reddit(reddit_username)
+	while start_date <= end_date:
+		build_day(start_date, input_folders, output_folder, object_type, reddit, ignore_ids, pushshift_token_function)
+		start_date = end_of_day(start_date)
+
+
 if __name__ == "__main__":
@@ -254,19 +273,22 @@ if __name__ == "__main__":
 			start_id, end_id = id_range.split("-")
 			ignore_ids.append((utils.base36decode(start_id), utils.base36decode(end_id)))
 
-	user_name = "Watchful12"
-	reddit = praw.Reddit(user_name)
-
 	discord_logging.init_discord_logging(
-		section_name=None,
-		log_level=logging.WARNING,
-		logging_webhook=reddit.config.custom["logging_webhook"]
+		section_name="Watchful12",
+		log_level=logging.WARNING
 	)
 
 	if args.pushshift is not None:
 		log.warning(f"Saving pushshift token: {args.pushshift}")
 		save_pushshift_token(args.pushshift)
 
-	while start_date <= end_date:
-		build_day(start_date, input_folders, args.output, object_type, reddit, ignore_ids)
-		start_date = end_of_day(start_date)
+	merge_and_backfill(
+		start_date,
+		end_date,
+		input_folders,
+		args.output,
+		object_type,
+		ignore_ids,
+		"Watchful12",
+		get_pushshift_token
+	)
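The signature changes above replace direct calls to re_auth_pushshift with a pushshift_token_function callback that is threaded through merge_and_backfill, build_day and query_pushshift. That lets the caller decide how tokens are refreshed: this script passes its own file-backed get_pushshift_token, while process_month.py (below) passes a lock-guarded variant that is safe to share between worker processes. A rough sketch of the callback idea with hypothetical names (refresh_token and fetch_ids are illustrations, not part of this repo):

def refresh_token(old_token):
	# Stand-in for get_pushshift_token: only "re-auth" when the caller hands back
	# the same token that just failed, otherwise reuse the saved one.
	return "fresh-token" if old_token in (None, "stale-token") else old_token

def fetch_ids(ids, bearer, token_function):
	# Mirrors the query_pushshift retry path: on an auth failure, ask the
	# injected callback for a replacement instead of re-authing directly.
	if bearer == "stale-token":  # pretend the API returned a 403
		bearer = token_function(bearer)
	return [f"fetched {obj_id}" for obj_id in ids], bearer

objects, bearer = fetch_ids(["abc123", "def456"], "stale-token", refresh_token)
print(objects, bearer)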
260	personal/process_month.py	Normal file
@@ -0,0 +1,260 @@
import sys
sys.path.append('personal')
sys.path.append('combine')

import os
import argparse
import json
import time
import logging.handlers
import requests
import praw
from datetime import datetime, timedelta
import multiprocessing_logging

import discord_logging
import multiprocessing

multiprocessing_logging.install_mp_handler()
log = discord_logging.init_logging()

import utils
from transform import split_blocks_by_minutes
from combine.merge_and_backfill import build_day, IngestType, ObjectType
from combine import build_month


def get_pushshift_token(old_token):
	global pushshift_lock
	pushshift_lock.acquire()
	saved_token = load_pushshift_token()
	if saved_token is None or saved_token == "" or old_token == saved_token:
		if old_token is None:
			log.warning("No saved or passed in token")
			save_pushshift_token("")
			raise ValueError("No saved or passed in token")

		log.info(f"Requesting new token")
		result_token = re_auth_pushshift(old_token)
		save_pushshift_token(result_token)
	else:
		result_token = saved_token

	pushshift_lock.release()
	return result_token


def save_pushshift_token(token):
	with open("pushshift.txt", 'w') as file:
		file.write(token)


def load_pushshift_token():
	if not os.path.exists("pushshift.txt"):
		return None
	with open("pushshift.txt", 'r') as file:
		token = file.read().strip()
	return token


def re_auth_pushshift(old_token):
	url = f"https://auth.pushshift.io/refresh?access_token={old_token}"
	log.warning(f"Reauth request: {url}")
	response = requests.post(url)
	result = response.json()
	log.warning(f"Reauth response: {str(result)}")
	discord_logging.flush_discord()
	if 'access_token' in result:
		new_token = result['access_token']
		log.warning(f"New pushshift token: {new_token}")
		save_pushshift_token(new_token)
		discord_logging.flush_discord()
		return new_token
	elif 'detail' in result:
		if result['detail'] == 'Access token is still active and can not be refreshed.':
			log.warning(f"Access token still active, trying request again")
			time.sleep(5)
			return old_token

		log.warning(f"Reauth failed: {result['detail']}")
		discord_logging.flush_discord()
		return old_token
	else:
		log.warning(f"Something went wrong re-authing")
		discord_logging.flush_discord()
		return old_token


def init(p_lock):
	global pushshift_lock
	pushshift_lock = p_lock


def save_status(status_json, stages, month):
	log.debug(f"Saving status: {stages}")
	output_dict = {
		"stages": stages,
		"month": month,
	}
	json_string = json.dumps(output_dict, indent=4, default=str)
	with open(status_json, 'w') as status_json_file:
		status_json_file.write(json_string)


def load_status(status_json):
	if os.path.exists(status_json):
		with open(status_json, 'r') as status_json_file:
			output_dict = json.load(status_json_file)
			for stage_type, stage in output_dict["stages"].items():
				if stage["merge"] is not None:
					stage["merge"] = datetime.strptime(stage["merge"], "%Y-%m-%d %H:%M:%S")
			return output_dict["stages"], output_dict["month"]
	else:
		stages = {
			"comment": {
				"split": False,
				"merge": None,  # 24-02-01
				"build": False,
			},
			"submission": {
				"split": False,
				"merge": None,  # 24-02-01
				"build": False,
			}
		}
		return stages, None


def end_of_day(input_minute):
	return input_minute.replace(hour=0, minute=0, second=0) + timedelta(days=1)


def process(queue, base_folder, month, file_type, type_stages, reddit_username, compression_level, ignore_ids):
	try:
		# for stage, status in type_stages.items():
		# 	log.info(f"{file_type} {stage}: {status}")
		file_prefix = "RC" if file_type == "comment" else "RS"
		if not type_stages["split"]:
			split_file = os.path.join(base_folder, "reddit", "blocks", f"{file_prefix}_20{month}.zst")
			if not os.path.exists(split_file):
				log.info(f"{file_type}: File {split_file} doesn't exist, checking for blocks")
				split_file = os.path.join(base_folder, "reddit", "blocks", f"{file_prefix}_20{month}.zst_blocks")
				if not os.path.exists(split_file):
					log.error(f"{file_type}: File {split_file} doesn't exist, aborting")
					return False

			split_folder = os.path.join(base_folder, "ingest", "download")

			log.info(f"{file_type}: Starting {file_type} split")
			log.info(f"{file_type}: Reading from: {split_file}")
			log.info(f"{file_type}: Writing to: {split_folder}")
			split_blocks_by_minutes.split_by_minutes(split_file, split_folder)

			log.info(f"{file_type}: {file_type} split complete")
			queue.put((file_type, "split", True))

		start_date = datetime.strptime(month, "%y-%m")
		if start_date.month == 12:
			end_date = start_date.replace(year=start_date.year + 1, month=1)
		else:
			end_date = start_date.replace(month=start_date.month + 1)
		if type_stages["merge"] is None or type_stages["merge"] < end_date:
			if type_stages["merge"] is not None:
				start_date = type_stages["merge"]

			log.info(f"{file_type}: Starting {file_type} merge from {start_date}")

			reddit = praw.Reddit(reddit_username)

			input_folders = [
				(os.path.join(base_folder, "ingest", "ingest"), IngestType.INGEST),
				(os.path.join(base_folder, "ingest", "rescan"), IngestType.RESCAN),
				(os.path.join(base_folder, "ingest", "download"), IngestType.DOWNLOAD),
			]
			for input_folder in input_folders:
				log.info(f"{file_type}: Reading from: {input_folder[0]} : {input_folder[1]}")
			combined_folder = os.path.join(base_folder, "ingest", "combined")
			log.info(f"{file_type}: Writing to: {combined_folder}")
			while start_date <= end_date:
				build_day(
					start_date,
					input_folders,
					combined_folder,
					ObjectType.COMMENT if file_type == "comment" else ObjectType.SUBMISSION,
					reddit,
					ignore_ids,
					get_pushshift_token
				)
				start_date = end_of_day(start_date)
				queue.put((file_type, "merge", start_date))
			log.info(f"{file_type}: {file_type} merge complete")

		if not type_stages["build"]:
			log.info(f"{file_type}: Starting {file_type} build")

			input_folder = os.path.join(base_folder, "ingest", "combined")
			log.info(f"{file_type}: Reading from: {input_folder}")
			log.info(f"{file_type}: Writing to: {base_folder}")
			build_month.build_month(
				month,
				input_folder,
				base_folder,
				file_type,
				compression_level
			)
			queue.put((file_type, "build", True))
			log.info(f"{file_type}: {file_type} build complete")

		log.info(f"{file_type}: {file_type} all steps complete")

		# for stage, status in type_stages.items():
		# 	log.info(f"{file_type} {stage}: {status}")
	except Exception as err:
		queue.put((file_type, "error", str(err)))
		# for stage, status in type_stages.items():
		# 	log.info(f"{file_type} {stage}: {status}")


if __name__ == "__main__":
	parser = argparse.ArgumentParser(description="")
	parser.add_argument('month', help='Month to process')
	parser.add_argument('folder', help='Folder under which all the files are stored')
	parser.add_argument("--ignore_ids", help="Ignore ids between the id ranges listed", default=None)
	parser.add_argument("--level", help="The compression ratio to output at", default="22")
	args = parser.parse_args()

	ignore_ids = []
	if args.ignore_ids is not None:
		for id_range in args.ignore_ids.split(","):
			start_id, end_id = id_range.split("-")
			ignore_ids.append((utils.base36decode(start_id), utils.base36decode(end_id)))

	discord_logging.init_discord_logging(
		section_name="Watchful12",
		log_level=logging.WARNING,
	)

	status_file = "process.json"
	stages, month = load_status(status_file)

	if month is not None and args.month != month:
		log.error(f"Month does not match saved month, aborting: {month} : {args.month}")
		sys.exit(0)
	month = args.month
	log.info(f"Processing {month}")

	multiprocessing.set_start_method('spawn')
	queue = multiprocessing.Manager().Queue()
	p_lock = multiprocessing.Lock()
	with multiprocessing.Pool(processes=2, initializer=init, initargs=(p_lock,)) as pool:
		arguments = []
		for file_type, type_stages in stages.items():
			arguments.append((queue, args.folder, month, file_type, type_stages, "Watchful12", args.level, ignore_ids))
		workers = pool.starmap_async(process, arguments, chunksize=1, error_callback=log.info)
		while not workers.ready() or not queue.empty():
			file_type, stage, status = queue.get()
			if stage == "error":
				log.error(f"Error in {file_type}: {status}")
			stages[file_type][stage] = status
			save_status(status_file, stages, month)
			#log.info(f"workers {workers.ready()} : queue {queue.empty()}")
@@ -8,39 +8,39 @@ from datetime import datetime
 import json
 import argparse
 
-log = discord_logging.init_logging()
+log = discord_logging.get_logger(init=True)
 
 import utils
 
 NEWLINE_ENCODED = "\n".encode('utf-8')
 
 
-if __name__ == "__main__":
-	parser = argparse.ArgumentParser(description="Take a zst_blocks file and split it by minute chunks")
-	parser.add_argument('--input', help='Input file', required=True)
-	parser.add_argument('--output', help='Output folder', required=True)
-	args = parser.parse_args()
-
-	# input_file = r"\\MYCLOUDPR4100\Public\reddit\blocks\RS_2023-10.zst_blocks"
-	# output_folder = r"\\MYCLOUDPR4100\Public\ingest\download"
-	file_type = "comments" if "RC" in args.input else "submissions"
+def split_by_minutes(input_file, output_file):
+	file_type = "comments" if "RC" in input_file else "submissions"
 
-	log.info(f"Input file: {args.input}")
-	log.info(f"Output folder: {args.output}")
+	log.info(f"{file_type}: Input file: {input_file}")
+	log.info(f"{file_type}: Output folder: {output_file}")
 	previous_minute, output_handle, created_utc = None, None, None
 	count_objects, count_minute = 0, 0
-	for obj in utils.read_obj_zst_blocks(args.input):
+	if input_file.endswith(".zst"):
+		reader = utils.read_obj_zst(input_file)
+	elif input_file.endswith(".zst_blocks"):
+		reader = utils.read_obj_zst_blocks(input_file)
+	else:
+		log.error(f"{file_type}: Unsupported file type: {input_file}")
+		return
+	for obj in reader:
 		created_utc = datetime.utcfromtimestamp(obj["created_utc"])
 		current_minute = created_utc.replace(second=0)
 
 		if previous_minute is None or current_minute > previous_minute:
-			log.info(f"{created_utc.strftime('%y-%m-%d_%H-%M')}: {count_objects:,} : {count_minute: ,}")
+			log.info(f"{file_type}: {created_utc.strftime('%y-%m-%d_%H-%M')}: {count_objects:,} : {count_minute: ,}")
 			previous_minute = current_minute
 			count_minute = 0
 			if output_handle is not None:
 				output_handle.close()
 
-			output_path = os.path.join(args.output, file_type, created_utc.strftime('%y-%m-%d'))
+			output_path = os.path.join(output_file, file_type, created_utc.strftime('%y-%m-%d'))
 			if not os.path.exists(output_path):
 				os.makedirs(output_path)
 			output_path = os.path.join(output_path, f"{('RC' if file_type == 'comments' else 'RS')}_{created_utc.strftime('%y-%m-%d_%H-%M')}.zst")
@@ -51,6 +51,18 @@ if __name__ == "__main__":
 		output_handle.write(json.dumps(obj, sort_keys=True).encode('utf-8'))
 		output_handle.write(NEWLINE_ENCODED)
 
-	log.info(f"{created_utc.strftime('%y-%m-%d_%H-%M')}: {count_objects:,} : {count_minute: ,}")
+	if created_utc is None:
+		log.error(f"{file_type}: {input_file} appears to be empty")
+		sys.exit(1)
+	log.info(f"{file_type}: {created_utc.strftime('%y-%m-%d_%H-%M')}: {count_objects:,} : {count_minute: ,}")
 	if output_handle is not None:
 		output_handle.close()
 
 
+if __name__ == "__main__":
+	parser = argparse.ArgumentParser(description="Take a zst_blocks file and split it by minute chunks")
+	parser.add_argument('--input', help='Input file', required=True)
+	parser.add_argument('--output', help='Output folder', required=True)
+	args = parser.parse_args()
+
+	split_by_minutes(args.input, args.output)
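With the body moved into split_by_minutes(input_file, output_file), the splitter now accepts plain .zst dumps as well as .zst_blocks files and can be driven from other scripts in addition to the command line. A hypothetical call mirroring how process_month.py invokes it (paths are illustrative, and 'personal' is assumed to be on sys.path as process_month.py arranges):

import sys
sys.path.append('personal')

from transform import split_blocks_by_minutes

# Split one monthly dump (.zst or .zst_blocks) into per-minute .zst files
# under the given folder, matching the layout the merge step expects.
split_blocks_by_minutes.split_by_minutes(
	"reddit/blocks/RC_2023-10.zst_blocks",
	"ingest/download",
)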