Re-arrange a bit

Watchful1 2023-08-26 17:09:50 -07:00
parent de209b338a
commit 4f56c141fd
4 changed files with 378 additions and 340 deletions

View file

@@ -1,12 +1,12 @@
import sys
import requests
import time
import discord_logging
import argparse
import os
import re
import zstandard
from datetime import datetime, timedelta
from collections import defaultdict
import json
import praw
from praw import endpoints
@@ -16,13 +16,35 @@ log = discord_logging.init_logging(debug=False)
import utils
import classes
from classes import IngestType
from merge import ObjectType
NEWLINE_ENCODED = "\n".encode('utf-8')
reg = re.compile(r"\d\d-\d\d-\d\d_\d\d-\d\d")
def query_pushshift(ids, bearer, object_type):
    object_name = "comment" if object_type == ObjectType.COMMENT else "submission"
    url = f"https://api.pushshift.io/reddit/{object_name}/search?limit=1000&ids={','.join(ids)}"
    log.debug(f"pushshift query: {url}")
    response = None
    # retry up to 4 times, sleeping 2 seconds between attempts; a 403 means the
    # bearer token is invalid, so retrying is pointless
    for i in range(4):
        response = requests.get(url, headers={
            'User-Agent': "In script by /u/Watchful1",
            'Authorization': f"Bearer {bearer}"})
        if response.status_code == 200:
            break
        if response.status_code == 403:
            log.warning("Pushshift unauthorized, aborting")
            sys.exit(2)
        time.sleep(2)
    if response.status_code != 200:
        log.warning(f"4 requests failed with status code {response.status_code}")
    return response.json()['data']
def build_day(day_to_process, input_folders, output_folder, object_type, reddit, pushshift_token):
-    file_type = "comments" if object_type == utils.ObjectType.COMMENT else "submissions"
+    file_type = "comments" if object_type == ObjectType.COMMENT else "submissions"
    file_minutes = {}
    minute_iterator = day_to_process - timedelta(minutes=2)
@@ -72,7 +94,7 @@ def build_day(day_to_process, input_folders, output_folder, object_type, reddit,
f"{working_highest_minute.strftime('%y-%m-%d_%H-%M')} with {len(missing_ids)} ids")
for chunk in utils.chunk_list(missing_ids, 50):
pushshift_objects = utils.query_pushshift(chunk, pushshift_token, object_type)
pushshift_objects = query_pushshift(chunk, pushshift_token, object_type)
for pushshift_object in pushshift_objects:
if objects.add_object(pushshift_object, IngestType.PUSHSHIFT):
unmatched_field = True
@@ -137,9 +159,9 @@ if __name__ == "__main__":
    object_type = None
    if args.type == "comments":
-        object_type = utils.ObjectType.COMMENT
+        object_type = ObjectType.COMMENT
    elif args.type == "submissions":
-        object_type = utils.ObjectType.SUBMISSION
+        object_type = ObjectType.SUBMISSION
    else:
        log.error(f"Invalid type: {args.type}")
        sys.exit(2)
@@ -160,6 +182,3 @@ if __name__ == "__main__":
        start_date = start_date + timedelta(days=1)
-    #log.info(f"{len(file_minutes)} : {count_ingest_minutes} : {count_rescan_minutes} : {day_highest_id - day_lowest_id:,} - {count_objects:,} = {(day_highest_id - day_lowest_id) - count_objects:,}: {utils.base36encode(day_lowest_id)}-{utils.base36encode(day_highest_id)}")
-    # minute_iterator = datetime.strptime("23-07-10 23:30", '%y-%m-%d %H:%M')
-    # working_lowest_minute = datetime.strptime("23-07-10 23:30", '%y-%m-%d %H:%M')
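The retry flow above is easiest to see in use. A minimal driver sketch, assuming a valid Pushshift bearer token (BEARER is a placeholder) and the helpers imported at the top of this file:

# hypothetical ids; real ones come from the missing-id scan in build_day
missing_ids = ["c0x1a2b", "c0x1a2c", "c0x1a2d"]
for chunk in utils.chunk_list(missing_ids, 50):
    # one request per chunk; query_pushshift retries up to 4 times internally
    for pushshift_object in query_pushshift(chunk, BEARER, ObjectType.COMMENT):
        print(pushshift_object["id"], pushshift_object["subreddit"])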

View file

@@ -1,85 +1,10 @@
import re
import sys
from enum import Enum
import discord_logging
import zstandard
import json
from datetime import datetime
import requests
import time
import discord_logging
import counters
log = discord_logging.get_logger(init=True)
def parse_ingest_string(ingest_string):
    ingest_ids = []
    for char in ingest_string:
        ingest_ids.append(char)
    return ingest_ids
def read_obj_zst(file_name):
    with open(file_name, 'rb') as file_handle:
        buffer = ''
        reader = zstandard.ZstdDecompressor(max_window_size=2**31).stream_reader(file_handle)
        while True:
            chunk = read_and_decode(reader, 2**27, (2**29) * 2)
            if not chunk:
                break
            # the last element may be a partial line, so carry it into the next chunk
            lines = (buffer + chunk).split("\n")
            for line in lines[:-1]:
                if line == "":
                    continue
                yield json.loads(line.strip())
            buffer = lines[-1]
        reader.close()
def read_and_decode(reader, chunk_size, max_window_size, previous_chunk=None, bytes_read=0):
    chunk = reader.read(chunk_size)
    bytes_read += chunk_size
    if previous_chunk is not None:
        chunk = previous_chunk + chunk
    try:
        return chunk.decode()
    except UnicodeDecodeError:
        if bytes_read > max_window_size:
            raise UnicodeError(f"Unable to decode frame after reading {bytes_read:,} bytes")
        # a multi-byte utf-8 character was split across the chunk boundary;
        # keep the partial bytes and read more before decoding again
        return read_and_decode(reader, chunk_size, max_window_size, chunk, bytes_read)
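The recursion exists because a chunk boundary can cut a multi-byte UTF-8 character in half. A short illustration of the failure it recovers from:

data = "é".encode('utf-8')  # b'\xc3\xa9', two bytes
try:
    data[:1].decode()  # a chunk that ends mid-character raises UnicodeDecodeError
except UnicodeDecodeError:
    (data[:1] + data[1:]).decode()  # prepend the partial chunk, read more, decode again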
def base36encode(integer: int) -> str:
    chars = '0123456789abcdefghijklmnopqrstuvwxyz'
    sign = '-' if integer < 0 else ''
    integer = abs(integer)
    result = ''
    while integer > 0:
        integer, remainder = divmod(integer, 36)
        result = chars[remainder] + result
    return sign + result
def base36decode(base36: str) -> int:
    return int(base36, 36)
def next_string_id(string_id):
    return base36encode(base36decode(string_id) + 1)
def get_next_hundred_ids(start_id):
    start_num = base36decode(start_id)
    ids = []
    id_num = -1
    for id_num in range(start_num, start_num + 100):
        ids.append(base36encode(id_num))
    return ids, base36encode(id_num)
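Reddit ids are base36 strings, so these helpers round-trip between string and integer ids. A quick sketch, assuming the functions above:

print(base36decode("zzz"))      # 46655
print(next_string_id("zzz"))    # "1000", the id after "zzz"
ids, last = get_next_hundred_ids("1000")
print(ids[0], ids[-1] == last)  # "1000" True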
log = discord_logging.get_logger()
class FieldAction(Enum):
@@ -496,54 +421,3 @@ def parse_fields(new_obj, obj_type):
    new_obj['retrieved_on'] = int(datetime.utcnow().timestamp())
    return unmatched_field
def merge_lowest_highest_id(str_id, lowest_id, highest_id):
    int_id = base36decode(str_id)
    if lowest_id is None or int_id < lowest_id:
        lowest_id = int_id
    if highest_id is None or int_id > highest_id:
        highest_id = int_id
    return lowest_id, highest_id
async def record_rate_limits(reddit, client):
    reddit_user = await reddit.user.me()
    remaining = int(reddit._core._rate_limiter.remaining)
    used = int(reddit._core._rate_limiter.used)
    counters.rate_requests_remaining.labels(username=reddit_user.name, client=client).set(remaining)
    counters.rate_requests_used.labels(username=reddit_user.name, client=client).set(used)
    reset_timestamp = reddit._core._rate_limiter.reset_timestamp
    seconds_to_reset = (datetime.utcfromtimestamp(reset_timestamp) - datetime.utcnow()).total_seconds()
    counters.rate_seconds_remaining.labels(username=reddit_user.name, client=client).set(int(seconds_to_reset))
    window_size = int(reddit._core._rate_limiter.window_size) if reddit._core._rate_limiter.window_size is not None else None
    # seconds until the rate limiter will allow the next request, clamped at zero
    time_to_next_request = max((datetime.utcfromtimestamp(reddit._core._rate_limiter.next_request_timestamp) - datetime.utcnow()).total_seconds(), 0)
    #log.info(f"Rate: u/{reddit_user.name}: {window_size} : {remaining} : {used} : {seconds_to_reset:.2f} : {time_to_next_request:.3f} ")
    return
def chunk_list(items, chunk_size):
    for i in range(0, len(items), chunk_size):
        yield items[i:i + chunk_size]
def query_pushshift(ids, bearer, object_type):
    object_name = "comment" if object_type == ObjectType.COMMENT else "submission"
    url = f"https://api.pushshift.io/reddit/{object_name}/search?limit=1000&ids={','.join(ids)}"
    log.debug(f"pushshift query: {url}")
    response = None
    for i in range(4):
        response = requests.get(url, headers={
            'User-Agent': "In script by /u/Watchful1",
            'Authorization': f"Bearer {bearer}"})
        if response.status_code == 200:
            break
        if response.status_code == 403:
            log.warning("Pushshift unauthorized, aborting")
            sys.exit(2)
        time.sleep(2)
    if response.status_code != 200:
        log.warning(f"4 requests failed with status code {response.status_code}")
    return response.json()['data']
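merge_lowest_highest_id folds a stream of base36 string ids into a running integer min/max pair. A short sketch, assuming the helpers above:

lowest_id, highest_id = None, None
for str_id in ["1cd", "19a", "1zz"]:  # example base36 ids
    lowest_id, highest_id = merge_lowest_highest_id(str_id, lowest_id, highest_id)
print(base36encode(lowest_id), base36encode(highest_id))  # 19a 1zz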

View file

@@ -7,13 +7,14 @@ def read_obj_zst(file_name):
        buffer = ''
        reader = zstandard.ZstdDecompressor(max_window_size=2**31).stream_reader(file_handle)
        while True:
-            chunk = reader.read(2**27).decode()
+            chunk = read_and_decode(reader, 2**27, (2**29) * 2)
            if not chunk:
                break
            lines = (buffer + chunk).split("\n")
            for line in lines[:-1]:
-                yield json.loads(line)
+                if line == "":
+                    continue
+                yield json.loads(line.strip())
            buffer = lines[-1]
        reader.close()
@@ -71,3 +72,32 @@ class OutputZst:
    def __exit__(self, exc_type, exc_value, exc_traceback):
        self.close()
        # returning True suppresses any exception raised inside the with block
        return True
def base36encode(integer: int) -> str:
    chars = '0123456789abcdefghijklmnopqrstuvwxyz'
    sign = '-' if integer < 0 else ''
    integer = abs(integer)
    result = ''
    while integer > 0:
        integer, remainder = divmod(integer, 36)
        result = chars[remainder] + result
    return sign + result
def base36decode(base36: str) -> int:
    return int(base36, 36)
def merge_lowest_highest_id(str_id, lowest_id, highest_id):
    int_id = base36decode(str_id)
    if lowest_id is None or int_id < lowest_id:
        lowest_id = int_id
    if highest_id is None or int_id > highest_id:
        highest_id = int_id
    return lowest_id, highest_id
def chunk_list(items, chunk_size):
    for i in range(0, len(items), chunk_size):
        yield items[i:i + chunk_size]
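Together these helpers cover the common read loop: stream an ndjson .zst dump line by line, then batch the ids for later lookups. A minimal sketch, assuming a local dump file ("comments.zst" is a placeholder path):

from collections import defaultdict

count_by_subreddit = defaultdict(int)
ids = []
for obj in read_obj_zst("comments.zst"):
    count_by_subreddit[obj["subreddit"]] += 1
    ids.append(obj["id"])
for chunk in chunk_list(ids, 100):
    pass  # e.g. hand each batch of 100 ids to a backfill query
print(dict(count_by_subreddit))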