# PushshiftDumps/personal/ingest/download_images.py

import zstandard
import os
import json
import logging
import logging.handlers
import time
from datetime import datetime

import praw
from praw import endpoints
import prawcore
# path to the input file
input_file = r"\\MYCLOUDPR4100\Public\wallstreetbets_gainloss_rehydrate.zst"
# path to the folder the downloaded images will be written to
output_folder = r"\\MYCLOUDPR4100\Public\wallstreetbets_gainloss_images"
# sets up logging to the console as well as a rotating file
log = logging.getLogger("bot")
log.setLevel(logging.INFO)
log_formatter = logging.Formatter('%(asctime)s - %(levelname)s: %(message)s')
log_str_handler = logging.StreamHandler()
log_str_handler.setFormatter(log_formatter)
log.addHandler(log_str_handler)
if not os.path.exists("../combine/logs"):
	os.makedirs("../combine/logs")
log_file_handler = logging.handlers.RotatingFileHandler(
	os.path.join("../combine/logs", "bot.log"), maxBytes=1024 * 1024 * 16, backupCount=5)
log_file_handler.setFormatter(log_formatter)
log.addHandler(log_file_handler)

def query_reddit(ids, reddit, is_submission):
	# hydrate a batch of ids against reddit's /api/info endpoint, retrying with a growing
	# backoff when the api returns a server error or the request fails outright
	id_prefix = 't3_' if is_submission else 't1_'
	id_string = f"{id_prefix}{(f',{id_prefix}'.join(ids))}"
	response = None
	for i in range(20):
		try:
			response = reddit.request(method="GET", path=endpoints.API_PATH["info"], params={"id": id_string})
			break
		except (prawcore.exceptions.ServerError, prawcore.exceptions.RequestException) as err:
			log.info(f"No response from reddit api for {is_submission}, sleeping {i * 5} seconds: {err} : {id_string}")
			time.sleep(i * 5)
	if response is None:
		log.warning("Reddit api failed, aborting")
		return []
	return response['data']['children']

def write_line_zst(handle, line):
	handle.write(line.encode('utf-8'))
	handle.write("\n".encode('utf-8'))

def read_and_decode(reader, chunk_size, max_window_size, previous_chunk=None, bytes_read=0):
	chunk = reader.read(chunk_size)
	bytes_read += chunk_size
	if previous_chunk is not None:
		chunk = previous_chunk + chunk
	try:
		return chunk.decode()
	except UnicodeDecodeError:
		if bytes_read > max_window_size:
			raise UnicodeError(f"Unable to decode frame after reading {bytes_read:,} bytes")
		log.info(f"Decoding error with {bytes_read:,} bytes, reading another chunk")
		return read_and_decode(reader, chunk_size, max_window_size, chunk, bytes_read)

def read_lines_zst(file_name):
	with open(file_name, 'rb') as file_handle:
		buffer = ''
		reader = zstandard.ZstdDecompressor(max_window_size=2**31).stream_reader(file_handle)
		while True:
			chunk = read_and_decode(reader, 2**27, (2**29) * 2)
			if not chunk:
				break
			lines = (buffer + chunk).split("\n")
			for line in lines[:-1]:
				yield line.strip(), file_handle.tell()
			buffer = lines[-1]
		reader.close()
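
# the script is named download_images.py but, as written, the main loop below only logs the
# image urls it finds and output_folder is never used. this is a minimal sketch of a helper
# that would fetch one url into output_folder, assuming the third-party requests package is
# installed; the function name and error handling are illustrative, not part of the original
import requests

def download_image(url, folder):
	# derive a local file name from the last path segment, dropping any query string
	file_name = url.split("/")[-1].split("?")[0]
	output_path = os.path.join(folder, file_name)
	if os.path.exists(output_path):
		return output_path  # already downloaded on a previous run
	response = requests.get(url, timeout=30)
	response.raise_for_status()
	with open(output_path, 'wb') as file_handle:
		file_handle.write(response.content)
	return output_path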

if __name__ == "__main__":
	log.info(f"Input: {input_file} : Output: {output_folder}")
	reddit = praw.Reddit("Watchful1BotTest")
	file_size = os.stat(input_file).st_size
	created = None
	found_lines = 0
	total_lines = 0
	for line, file_bytes_processed in read_lines_zst(input_file):
		total_lines += 1
		# debug sampling window: skip ahead and only examine lines 100,000 through 100,100
		if total_lines < 100000:
			continue
		obj = json.loads(line)
		created = datetime.utcfromtimestamp(int(obj['created_utc']))
		url = obj["url"]
		if "i.redd.it" in url:
			# direct image submission
			found_lines += 1
			log.info(url)
		elif "reddit.com/gallery" in url and "media_metadata" in obj and obj["media_metadata"] is not None:
			# gallery submission: media_metadata is keyed by media id and "s" is the source
			# rendition. animated items can carry "gif"/"mp4" instead of "u", so guard the lookup
			found_lines += 1
			for media in obj["media_metadata"]:
				media_url = obj["media_metadata"][media]["s"].get("u")
				if media_url is not None:
					log.info(media_url)
		if total_lines > 100100:
			break
		# if total_lines % 10000 == 0:
		# 	log.info(f"{created.strftime('%Y-%m-%d %H:%M:%S')} : {total_lines:,} : {found_lines:,} : {file_bytes_processed:,} : {(file_bytes_processed / file_size) * 100:.0f}%")
	# log.info(f"Complete : {total_lines:,} : {found_lines:,}")