mirror of https://github.com/Watchful1/PushshiftDumps.git
synced 2025-07-23 14:50:35 -04:00
Move streamer files in
This commit is contained in:
parent 4f1d70d34a
commit de209b338a
8 changed files with 1112 additions and 0 deletions
282 personal/combine/classes.py Normal file
@@ -0,0 +1,282 @@
from datetime import datetime
import os
import discord_logging
import sys
import zstandard
import json
from enum import Enum
from sortedcontainers import SortedList
from collections import defaultdict

log = discord_logging.get_logger()

import utils

NEWLINE_ENCODED = "\n".encode('utf-8')

class ApiRequest:
    def __init__(self, ids, is_submission, ingest_name, estimated_datetime=None, missing_expected=False):
        self.ids = ids
        self.is_submission = is_submission
        self.ingest_name = ingest_name
        self.estimated_datetime = estimated_datetime
        self.missing_expected = missing_expected
        self.results = None
        self.complete = False
        self.tries = 0
        self.prev_lengths = []

    def should_retry(self):
        if self.complete:
            return False  # the request is complete, no need to retry
        if len(self.prev_lengths) <= 1:
            return True  # we've only made one attempt and it didn't work, do retry
        if self.prev_lengths[-1] == 0:
            if len(self.prev_lengths) < (10 if self.missing_expected else 100):
                return True  # the most recent result was 0 objects, retry up to 10 times if missing objects are expected, otherwise up to 100
            else:
                log.info(f"Force finished request with retries: {self}")
                self.complete = True
                return False
        if self.prev_lengths[-1] == self.prev_lengths[-2]:
            if self.missing_expected:
                self.complete = True
                return False  # the latest two requests were the same and we're expecting missing objects, mark as complete
            elif len(self.prev_lengths) >= 4 and \
                    self.prev_lengths[-1] == self.prev_lengths[-3] and \
                    self.prev_lengths[-1] == self.prev_lengths[-4]:
                log.info(f"Force finished request with retries: {self}")
                self.complete = True
                return False  # the latest four requests were the same, go ahead and mark as complete
        return True  # recent requests didn't match, and weren't 0, go ahead and retry

    def get_body_key(self):
        return "self_text" if self.is_submission else "body"

    def get_string_type(self):
        return "submission" if self.is_submission else "comment"

    def get_prefix(self):
        return "t3_" if self.is_submission else "t1_"

    def set_results(self, results):
        self.prev_lengths.append(len(results))
        self.results = []
        current_timestamp = int(datetime.utcnow().timestamp())
        for result in results:
            obj = result['data']
            if 'body_html' in obj:
                del obj['body_html']
            if 'selftext_html' in obj:
                del obj['selftext_html']
            obj['retrieved_on'] = current_timestamp
            self.results.append(obj)
        log.debug(f"Set result: {self}")

    def id_string(self):
        return f"{self.get_prefix()}{(f',{self.get_prefix()}'.join(self.ids))}"

    def __str__(self):
        return \
            f"{self.ingest_name}: {self.ids[0]}-{self.ids[-1]} {self.get_string_type()}: " \
            f"{len(self.results) if self.results else self.results} : {self.tries} : " \
            f"{self.complete} : {','.join([str(val) for val in self.prev_lengths])}"

    def __gt__(self, other):
        if isinstance(other, ApiRequest):
            return False
        return True

    def __lt__(self, other):
        if isinstance(other, ApiRequest):
            return True
        return False

    def __eq__(self, other):
        if isinstance(other, ApiRequest):
            return True
        return False

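# Illustrative sketch, not part of the original file: one way a caller might drive the
# retry bookkeeping in ApiRequest above. The id values and the fetch_ids_from_api helper
# are hypothetical placeholders for whatever ingest client actually performs the request.
#
#   request = ApiRequest(ids=["abc123", "abc124"], is_submission=False, ingest_name="ingest1")
#   while request.should_retry():
#       request.tries += 1
#       request.set_results(fetch_ids_from_api(request.id_string()))
#   log.info(f"Finished: {request}")
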
class Queue:
    def __init__(self, max_size):
        self.list = []
        self.max_size = max_size

    def put(self, item):
        if len(self.list) >= self.max_size:
            self.list.pop(0)
        self.list.append(item)

    def peek(self):
        return self.list[0] if len(self.list) > 0 else None

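# Illustrative note, not part of the original file: Queue is a bounded buffer that drops
# its oldest entry once max_size is reached, so peek() always returns the oldest retained item.
#
#   q = Queue(max_size=2)
#   q.put("a"); q.put("b"); q.put("c")
#   assert q.peek() == "b"  # "a" was evicted when "c" was added
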
class OutputHandle:
    def __init__(self, is_submission, dump_folder):
        self.handle = None
        self.current_path = None
        self.current_minute = None
        self.is_submission = is_submission
        self.dump_folder = dump_folder

        if not os.path.exists(dump_folder):
            os.makedirs(dump_folder)

    def matched_minute(self, new_date_time):
        return self.current_minute is not None and new_date_time.minute == self.current_minute

    def get_path(self, date_folder, export_filename, increment=None):
        folder = f"{self.dump_folder}{os.path.sep}{date_folder}"
        if not os.path.exists(folder):
            os.makedirs(folder)

        bldr = [folder]
        bldr.append(os.path.sep)
        if self.is_submission:
            bldr.append("RS_")
        else:
            bldr.append("RC_")
        bldr.append(export_filename)
        if increment is not None:
            bldr.append("_")
            bldr.append(str(increment))
        bldr.append(".zst")

        return ''.join(bldr)

    def rollover_to_minute(self, date_time):
        if self.handle is not None:
            self.handle.close()
            os.rename(self.current_path + ".tmp", self.current_path)
        date_folder = date_time.strftime('%y-%m-%d')
        export_filename = date_time.strftime('%y-%m-%d_%H-%M')
        export_path = self.get_path(date_folder, export_filename)
        if os.path.exists(export_path + ".tmp"):
            os.rename(export_path + ".tmp", export_path)
        i = 0
        while os.path.exists(export_path):
            log.info(f"Dump exists, incrementing: {export_path}")
            i += 1
            export_path = self.get_path(date_folder, export_filename, i)
            if i > 100:
                log.warning(f"Something went wrong, more than 100 dumps for minute, aborting")
                sys.exit(3)
        self.current_path = export_path
        self.handle = zstandard.ZstdCompressor().stream_writer(open(export_path + ".tmp", 'wb'))
        self.current_minute = date_time.minute

    def write_object(self, obj):
        self.handle.write(json.dumps(obj, sort_keys=True).encode('utf-8'))
        self.handle.write(NEWLINE_ENCODED)

    def flush(self):
        self.handle.flush()

    def close(self):
        if self.handle is not None:
            self.handle.close()

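# Illustrative sketch, not part of the original file: the expected write cycle for
# OutputHandle. The dump_folder path and minutes_of_objects iterable are hypothetical;
# each object only needs to be a dict that json.dumps can serialize.
#
#   output = OutputHandle(is_submission=False, dump_folder="combined_dumps")
#   for minute, objs in minutes_of_objects:  # hypothetical iterable of (datetime, list[dict])
#       if not output.matched_minute(minute):
#           output.rollover_to_minute(minute)  # closes and renames the previous .tmp file
#       for obj in objs:
#           output.write_object(obj)
#   output.close()
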
class IngestType(Enum):
    INGEST = 1
    RESCAN = 2
    DOWNLOAD = 3
    PUSHSHIFT = 4
    BACKFILL = 5

class ObjectDict:
    def __init__(self, min_datetime, max_datetime, obj_type):
        self.min_datetime = min_datetime
        self.max_datetime = max_datetime
        self.obj_type = obj_type

        self.counts = defaultdict(lambda: defaultdict(lambda: defaultdict(int)))
        self.min_id = None
        self.max_id = None

        self.by_id = {}
        self.by_minute = defaultdict(ObjectMinuteList)

    def contains_id(self, str_id):
        return str_id in self.by_id

    def delete_object_id(self, str_id):
        del self.by_id[str_id]

    def delete_objects_below_minute(self, delete_below_minute):
        for minute, minute_list in self.by_minute.items():
            if minute < delete_below_minute:
                for obj in minute_list.obj_list:
                    self.delete_object_id(obj['id'])

    def rebuild_minute_dict(self):
        self.by_minute = defaultdict(ObjectMinuteList)
        for obj in self.by_id.values():
            created_minute = datetime.utcfromtimestamp(obj["created_utc"]).replace(second=0, microsecond=0)
            self.by_minute[created_minute].add(obj)

    def count_minutes(self):
        return len(self.by_minute)

    @staticmethod
    def get_counts_string_from_dict(counts_dict, ingest_types):
        bldr = []
        for ingest_type in ingest_types:
            if ingest_type in counts_dict:
                bldr.append(f"{counts_dict[ingest_type][True]}({counts_dict[ingest_type][False]})")
            else:
                bldr.append("0(0)")
        return "|".join(bldr)

    def get_counts_string_by_minute(self, minute, ingest_types):
        return ObjectDict.get_counts_string_from_dict(self.counts[minute], ingest_types)

    def get_counts_string(self):
        sum_dict = defaultdict(lambda: defaultdict(int))
        for counts_dict in self.counts.values():
            for ingest_type in IngestType:
                if ingest_type in counts_dict:
                    sum_dict[ingest_type][True] += counts_dict[ingest_type][True]
                    sum_dict[ingest_type][False] += counts_dict[ingest_type][False]
        return ObjectDict.get_counts_string_from_dict(sum_dict, IngestType)

    def get_missing_ids_by_minutes(self, start_minute, end_minute):
        start_id = self.by_minute[start_minute].min_id
        end_id = self.by_minute[end_minute].max_id
        missing_ids = []
        for int_id in range(start_id, end_id + 1):
            string_id = utils.base36encode(int_id)
            if not self.contains_id(string_id):
                missing_ids.append(string_id)
        return missing_ids

    def add_object(self, obj, ingest_type):
        created_utc = datetime.utcfromtimestamp(obj["created_utc"])
        created_minute = created_utc.replace(second=0, microsecond=0)
        if obj['id'] in self.by_id:
            existing_obj = self.by_id[obj['id']]
            unmatched_field = utils.merge_fields(existing_obj, obj, self.obj_type)
            self.counts[created_minute][ingest_type][False] += 1
            return unmatched_field
        if created_utc < self.min_datetime or created_utc > self.max_datetime:
            return False
        unmatched_field = utils.parse_fields(obj, self.obj_type)
        self.by_id[obj['id']] = obj
        self.by_minute[created_minute].add(obj)
        self.counts[created_minute][ingest_type][True] += 1
        self.min_id, self.max_id = utils.merge_lowest_highest_id(obj['id'], self.min_id, self.max_id)
        return unmatched_field

class ObjectMinuteList:
    def __init__(self):
        self.obj_list = SortedList(key=lambda x: f"{x['created_utc']}:{x['id']}")
        self.min_id = None
        self.max_id = None

    def add(self, obj):
        self.min_id, self.max_id = utils.merge_lowest_highest_id(obj['id'], self.min_id, self.max_id)
        self.obj_list.add(obj)
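
# Illustrative sketch, not part of the original file: how ObjectDict ties the pieces together.
# The window bounds, the incoming object, and the utils helpers it calls (parse_fields,
# merge_fields, merge_lowest_highest_id, base36encode) come from elsewhere in this repo;
# the sample dict below is a hypothetical comment.
#
#   objects = ObjectDict(
#       min_datetime=datetime(2025, 1, 1, 0, 0),
#       max_datetime=datetime(2025, 1, 1, 1, 0),
#       obj_type="comment")
#   obj = {"id": "abc123", "created_utc": 1735689660, "body": "example"}
#   objects.add_object(obj, IngestType.INGEST)  # counted as a new object for its minute
#   missing = objects.get_missing_ids_by_minutes(
#       datetime(2025, 1, 1, 0, 0), datetime(2025, 1, 1, 0, 5))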