Mirror of https://github.com/Watchful1/PushshiftDumps.git, synced 2025-07-28 00:55:17 -04:00
Bit of other work
This commit is contained in:
parent 2bae2a38d2
commit c7aa694631
7 changed files with 483 additions and 0 deletions
50 personal/copy_listed_files.py Normal file
@@ -0,0 +1,50 @@
import shutil
import os
import logging.handlers
import re

log = logging.getLogger("bot")
log.setLevel(logging.DEBUG)
log.addHandler(logging.StreamHandler())

if __name__ == '__main__':
    input_folder = r"\\MYCLOUDPR4100\Public\pushshift_output"
    output_folder = r"\\MYCLOUDPR4100\Public\request"
    subs = ['PoliticalDiscussion', 'worldnews', 'science']
    overwrite = False

    lower_subs = set()
    for sub in subs:
        lower_subs.add(sub.lower())

    matched_subs = set()
    total_size = 0
    for file_name in os.listdir(input_folder):
        file_path = os.path.join(input_folder, file_name)
        if file_name.endswith(".zst") and os.path.isfile(file_path):
            match = re.match(r"(\w+)(?:_(?:comments|submissions).zst)", file_name)
            if match:
                sub_cased = match.group(1)
                if sub_cased.lower() in lower_subs:
                    matched_subs.add(sub_cased)
                    file_size = os.stat(file_path).st_size
                    total_size += file_size
                    log.info(f"Copying {file_name} : {(file_size / (2**20)):,.0f} mb : {(total_size / (2**20)):,.0f} mb")
                    output_path = os.path.join(output_folder, file_name)
                    if overwrite or not os.path.exists(output_path):
                        shutil.copy(file_path, output_path)

    log.info(f"Copied {len(matched_subs)}/{len(subs)} subs of total size {(total_size / (2**20)):,.0f} mb")
    if len(matched_subs) != len(lower_subs):
        lower_matched_subs = [sub.lower() for sub in matched_subs]
        for sub in lower_subs:
            if sub not in lower_matched_subs:
                log.info(f"Missing r/{sub}")

    sorted_case_subs = sorted(matched_subs)
    bldr = ['torrenttools create -a "https://academictorrents.com/announce.php" -c "Comments and submissions from r/']
    bldr.append(', r/'.join(sorted_case_subs))
    bldr.append(' through the end of 2022" --include ".*')
    bldr.append('.*zst" --include ".*'.join(sorted_case_subs))
    bldr.append('.*zst" -o username.torrent reddit')
    log.info(''.join(bldr))
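With the three subreddits configured above (and assuming the dump filenames use the same casing as the subs list), the torrenttools command the script logs at the end would come out as:

torrenttools create -a "https://academictorrents.com/announce.php" -c "Comments and submissions from r/PoliticalDiscussion, r/science, r/worldnews through the end of 2022" --include ".*PoliticalDiscussion.*zst" --include ".*science.*zst" --include ".*worldnews.*zst" -o username.torrent reddit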
20 personal/download_pictures.py Normal file
@@ -0,0 +1,20 @@
import utils
import discord_logging
from datetime import datetime
from collections import defaultdict
from urllib.parse import urlparse

log = discord_logging.init_logging()

if __name__ == "__main__":
    domains = defaultdict(list)
    lines = 0
    for submission in utils.read_obj_zst(r"\\MYCLOUDPR4100\Public\guessmybf_submissions.zst"):
        if submission['is_self']:
            continue

        domain = urlparse(submission['url']).netloc
        domains[domain].append(submission['url'])
        lines += 1

    log.info(f"{lines}")
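The loop above groups every non-self submission URL by domain but only logs the total line count. A minimal follow-up sketch, not part of the commit and using made-up sample data, of how the domains dict could then be inspected:

from collections import defaultdict

# stand-in for the dict built by the loop above, with hypothetical entries
domains = defaultdict(list)
domains["i.imgur.com"].extend(["https://i.imgur.com/a.jpg", "https://i.imgur.com/b.jpg"])
domains["i.redd.it"].append("https://i.redd.it/c.png")

# domains with the most linked submissions first
for domain, urls in sorted(domains.items(), key=lambda item: len(item[1]), reverse=True):
    print(f"{domain}: {len(urls):,} submissions")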
15 personal/objects_per_month.py Normal file
@@ -0,0 +1,15 @@
import os
from collections import defaultdict


if __name__ == "__main__":
    input_folder = r"\\MYCLOUDPR4100\Public\pushshift_counts_summed"
    for subdir, dirs, files in os.walk(input_folder):
        for file_name in files:
            items = 0
            input_path = os.path.join(subdir, file_name)
            with open(input_path, 'r') as input_handle:
                for line in input_handle:
                    subreddit, count = line.strip().split("\t")
                    items += int(count)
            print(f"{file_name} {items}")
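The split("\t") means each file in pushshift_counts_summed is expected to hold one tab-separated subreddit/count pair per line, for example (made-up numbers):

AskReddit	345123
funny	210987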
62 personal/recompress_folder.py Normal file
@@ -0,0 +1,62 @@
import argparse
import zstandard
import os
import logging.handlers

log = logging.getLogger("bot")
log.setLevel(logging.INFO)
log_formatter = logging.Formatter('%(asctime)s - %(levelname)s: %(message)s')
log_str_handler = logging.StreamHandler()
log_str_handler.setFormatter(log_formatter)
log.addHandler(log_str_handler)
if not os.path.exists("logs"):
    os.makedirs("logs")
log_file_handler = logging.handlers.RotatingFileHandler(os.path.join("logs", "bot.log"), maxBytes=1024*1024*16, backupCount=5)
log_file_handler.setFormatter(log_formatter)
log.addHandler(log_file_handler)

if __name__ == '__main__':
    parser = argparse.ArgumentParser(description="Take all the zst files in the input folder, extract them and compress them again at the ratio specified")
    parser.add_argument("input", help="The input folder to read files from")
    parser.add_argument("output", help="The output folder to write files to")
    parser.add_argument("--level", help="The compression ratio to output at", default="3")
    args = parser.parse_args()

    log.info(f"Reading all files from {args.input}")

    files = []
    total_size = 0
    for file_name in os.listdir(args.input):
        file_path = os.path.join(args.input, file_name)
        if file_name.endswith(".zst") and os.path.isfile(file_path):
            file_size = os.stat(file_path).st_size
            total_size += file_size
            files.append((file_name, file_size))
            if len(files) % 1000 == 0:
                log.info(f"Loaded {len(files)} files")
    log.info(f"Loaded {len(files)} files of total size {total_size:,}")

    level = int(args.level)
    log.info(f"Writing files out to {args.output} at ratio {level}")
    if not os.path.exists(args.output):
        os.makedirs(args.output)

    compressed_bytes_read = 0
    uncompressed_bytes_read = 0
    bytes_written = 0
    files_read = 0

    decompressor = zstandard.ZstdDecompressor(max_window_size=2**31)
    compressor = zstandard.ZstdCompressor(level=level, threads=-1)
    for file_name, file_size in files:
        input_path = os.path.join(args.input, file_name)
        output_path = os.path.join(args.output, file_name)
        with open(input_path, 'rb') as input_handle, open(output_path, "wb") as output_handle:
            compression_reader = decompressor.stream_reader(input_handle)
            read_count, write_count = compressor.copy_stream(compression_reader, output_handle)

        compressed_bytes_read += file_size
        uncompressed_bytes_read += read_count
        bytes_written += write_count
        files_read += 1
        log.info(f"{files_read:,}/{len(files):,} : {(compressed_bytes_read / (2**30)):.2f} gb of {(total_size / (2**30)):.2f} gb compressed to {(bytes_written / (2**30)):.2f} gb : {bytes_written / compressed_bytes_read:.3f}")
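A typical invocation, assuming a folder of dump files (zstandard accepts compression levels 1 through 22, so for example --level 22 trades speed for maximum compression):

python personal/recompress_folder.py <input_folder> <output_folder> --level 22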
24 personal/subreddits_per_month.py Normal file
@@ -0,0 +1,24 @@
import os
from collections import defaultdict


if __name__ == "__main__":
    input_folder = r"\\MYCLOUDPR4100\Public\pushshift_counts"
    output_folder = r"\\MYCLOUDPR4100\Public\pushshift_counts_summed"
    lines = 0
    for subdir, dirs, files in os.walk(input_folder):
        for file_name in files:
            subreddits = defaultdict(int)
            input_path = os.path.join(subdir, file_name)
            output_path = os.path.join(output_folder, f"{file_name}.txt")
            print(f"{lines} : {input_path}")
            with open(input_path, 'r') as input_handle:
                for line in input_handle:
                    lines += 1
                    subreddits[line.strip()] += 1
                    if lines % 1000000 == 0:
                        print(f"{lines} : {input_path}")

            with open(output_path, 'w') as output_handle:
                for subreddit, count in sorted(subreddits.items(), key=lambda item: item[1], reverse=True):
                    output_handle.write(f"{subreddit}\t{count}\n")  # tab separated so downstream split("\t") works
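This is the middle step of the counting pipeline in this commit: it writes the per-month summed files in pushshift_counts_summed that objects_per_month.py and sum_subreddit_counts.py read back in.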
37 personal/sum_subreddit_counts.py Normal file
@@ -0,0 +1,37 @@
import os
import logging.handlers
from collections import defaultdict


log = logging.getLogger("bot")
log.setLevel(logging.DEBUG)
log.addHandler(logging.StreamHandler())

if __name__ == '__main__':
    input_folder = r"\\MYCLOUDPR4100\Public\pushshift_counts_summed"
    output_file = r"\\MYCLOUDPR4100\Public\subreddit_counts_total.txt"
    subreddits = defaultdict(int)

    for subdir, dirs, files in os.walk(input_folder):
        for filename in files:
            log.info(f"Processing file: {filename}")
            input_path = os.path.join(subdir, filename)
            with open(input_path, 'r') as input_handle:
                line_count = 0
                for line in input_handle:
                    subreddit, count_string = line.strip().split("\t")
                    count = int(count_string)
                    subreddits[subreddit] += count
                    line_count += 1

    log.info(f"Total subreddits: {len(subreddits):,}")

    count_written = 0
    with open(output_file, 'w') as output_handle:
        for subreddit, count in sorted(subreddits.items(), key=lambda item: item[1], reverse=True):
            output_handle.write(f"{subreddit} {count}\n")
            count_written += 1
            if count_written % 1000000 == 0:
                log.info(f"Written: {count_written:,}/{len(subreddits):,}")

    log.info(f"Written: {count_written:,}/{len(subreddits):,}")
275 personal/test_files_multiprocess.py Normal file
@@ -0,0 +1,275 @@
import zstandard
import os
import json
import sys
import time
import argparse
import re
from collections import defaultdict
from datetime import datetime
import logging.handlers
import multiprocessing


# sets up logging to the console as well as a file
log = logging.getLogger("bot")
log.setLevel(logging.INFO)
log_formatter = logging.Formatter('%(asctime)s - %(levelname)s: %(message)s')

log_stderr_handler = logging.StreamHandler()
log_stderr_handler.setFormatter(log_formatter)
log.addHandler(log_stderr_handler)
if not os.path.exists("logs"):
    os.makedirs("logs")
log_file_handler = logging.handlers.RotatingFileHandler(
    os.path.join("logs", "bot.log"), maxBytes=1024*1024*16, backupCount=5)
log_file_handler.setFormatter(log_formatter)
log.addHandler(log_file_handler)


# convenience object used to pass status information between processes
class FileConfig:
    def __init__(self, input_path, output_path=None, complete=False, lines_processed=0, error_lines=0):
        self.input_path = input_path
        self.output_path = output_path
        self.file_size = os.stat(input_path).st_size
        self.complete = complete
        self.bytes_processed = self.file_size if complete else 0
        self.lines_processed = lines_processed if complete else 0
        self.error_message = None
        self.error_lines = error_lines

    def __str__(self):
        return f"{self.input_path} : {self.output_path} : {self.file_size} : {self.complete} : {self.bytes_processed} : {self.lines_processed}"


# used for calculating running average of read speed
class Queue:
    def __init__(self, max_size):
        self.list = []
        self.max_size = max_size

    def put(self, item):
        if len(self.list) >= self.max_size:
            self.list.pop(0)
        self.list.append(item)

    def peek(self):
        return self.list[0] if len(self.list) > 0 else None


# save file information and progress to a json file
# we don't want to save the whole FileConfig object, since some info resets if we restart
def save_file_list(input_files, status_json, script_type):
    simple_file_list = []
    for file in input_files:
        simple_file_list.append([file.input_path, file.output_path, file.complete, file.lines_processed, file.error_lines])
    with open(status_json, 'w') as status_json_file:
        output_dict = {
            "files": simple_file_list,
            "type": script_type,
        }
        status_json_file.write(json.dumps(output_dict, indent=4))


# load file information from the json file and recalculate file sizes
def load_file_list(status_json):
    if os.path.exists(status_json):
        with open(status_json, 'r') as status_json_file:
            output_dict = json.load(status_json_file)
            input_files = []
            for simple_file in output_dict["files"]:
                input_files.append(
                    FileConfig(simple_file[0], simple_file[1], simple_file[2], simple_file[3], simple_file[4])
                )
            return input_files, output_dict["type"]
    else:
        return None, None


# recursively decompress and decode a chunk of bytes. If there's a decode error then read another chunk and try with that, up to a limit of max_window_size bytes
def read_and_decode(reader, chunk_size, max_window_size, previous_chunk=None, bytes_read=0):
    chunk = reader.read(chunk_size)
    bytes_read += chunk_size
    if previous_chunk is not None:
        chunk = previous_chunk + chunk
    try:
        return chunk.decode()
    except UnicodeDecodeError:
        if bytes_read > max_window_size:
            raise UnicodeError(f"Unable to decode frame after reading {bytes_read:,} bytes")
        return read_and_decode(reader, chunk_size, max_window_size, chunk, bytes_read)


# open a zst compressed ndjson file and yield lines one at a time
# also passes back file progress
def read_lines_zst(file_name):
    with open(file_name, 'rb') as file_handle:
        buffer = ''
        reader = zstandard.ZstdDecompressor(max_window_size=2**31).stream_reader(file_handle)
        while True:
            chunk = read_and_decode(reader, 2**27, (2**29) * 2)
            if not chunk:
                break
            lines = (buffer + chunk).split("\n")

            for line in lines[:-1]:
                yield line, file_handle.tell()

            buffer = lines[-1]
        reader.close()


def process_file(file, queue):
    try:
        for line, file_bytes_processed in read_lines_zst(file.input_path):
            try:
                obj = json.loads(line)
                observed = obj["created_utc"]
                # just load the json and try to access a field to make sure it works
            except (KeyError, json.JSONDecodeError) as err:
                file.error_lines += 1
            file.lines_processed += 1
            if file.lines_processed % 1000000 == 0:
                file.bytes_processed = file_bytes_processed
                queue.put(file)

        file.complete = True
        file.bytes_processed = file.file_size
    except Exception as err:
        file.error_message = str(err)
    queue.put(file)


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description="Use multiple processes to decompress and iterate over pushshift dump files")
    parser.add_argument("input", help="The input folder to recursively read files from")
    parser.add_argument("--processes", help="Number of processes to use", default=10, type=int)
    parser.add_argument("--debug", help="Enable debug logging", action='store_const', const=True, default=False)
    script_type = "test"

    args = parser.parse_args()

    if args.debug:
        log.setLevel(logging.DEBUG)

    log.info(f"Loading files from: {args.input}")

    multiprocessing.set_start_method('spawn')
    queue = multiprocessing.Manager().Queue()
    status_json = "status.json"
    input_files, saved_type = load_file_list(status_json)

    if saved_type and saved_type != script_type:
        log.warning(f"Script type doesn't match type from json file. Delete {status_json}")
        sys.exit(0)

    # if the file list wasn't loaded from the json, this is the first run, find what files we need to process
    if input_files is None:
        input_files = []
        for subdir, dirs, files in os.walk(args.input):
            files.sort()
            for file_name in files:
                if file_name.endswith(".zst"):
                    input_path = os.path.join(subdir, file_name)
                    input_files.append(FileConfig(input_path))

        save_file_list(input_files, status_json, script_type)
    else:
        log.info(f"Existing input file was read, if this is not correct you should delete the {status_json} file and run this script again")

    files_processed = 0
    total_bytes = 0
    total_bytes_processed = 0
    total_lines_processed = 0
    total_lines_errored = 0
    files_to_process = []
    # calculate the total file size for progress reports, build a list of incomplete files to process
    # do this largest to smallest by file size so that we aren't processing a few really big files with only a few threads at the end
    for file in sorted(input_files, key=lambda item: item.file_size, reverse=True):
        total_bytes += file.file_size
        if file.complete:
            files_processed += 1
            total_lines_processed += file.lines_processed
            total_bytes_processed += file.file_size
            total_lines_errored += file.error_lines
        else:
            files_to_process.append(file)

    log.info(f"Processed {files_processed} of {len(input_files)} files with {(total_bytes_processed / (2**30)):.2f} of {(total_bytes / (2**30)):.2f} gigabytes")

    start_time = time.time()
    if len(files_to_process):
        progress_queue = Queue(40)
        progress_queue.put([start_time, total_lines_processed, total_bytes_processed])
        speed_queue = Queue(40)
        for file in files_to_process:
            log.debug(f"Processing file: {file.input_path}")
        # start the workers
        with multiprocessing.Pool(processes=min(args.processes, len(files_to_process))) as pool:
            workers = pool.starmap_async(process_file, [(file, queue) for file in files_to_process], chunksize=1, error_callback=log.info)
            while not workers.ready():
                # loop until the workers are all done, pulling in status messages as they are sent
                file_update = queue.get()
                if file_update.error_message is not None:
                    log.warning(f"File failed {file_update.input_path}: {file_update.error_message}")
                # I'm going to assume that the list of files is short enough that it's no
                # big deal to just iterate each time since that saves a bunch of work
                total_lines_processed = 0
                total_bytes_processed = 0
                total_lines_errored = 0
                files_processed = 0
                files_errored = 0
                i = 0
                for file in input_files:
                    if file.input_path == file_update.input_path:
                        input_files[i] = file_update
                        file = file_update
                    total_lines_processed += file.lines_processed
                    total_bytes_processed += file.bytes_processed
                    total_lines_errored += file.error_lines
                    files_processed += 1 if file.complete or file.error_message is not None else 0
                    files_errored += 1 if file.error_message is not None else 0
                    i += 1
                if file_update.complete or file_update.error_message is not None:
                    save_file_list(input_files, status_json, script_type)
                current_time = time.time()
                progress_queue.put([current_time, total_lines_processed, total_bytes_processed])

                first_time, first_lines, first_bytes = progress_queue.peek()
                bytes_per_second = int((total_bytes_processed - first_bytes)/(current_time - first_time))
                speed_queue.put(bytes_per_second)
                seconds_left = int((total_bytes - total_bytes_processed) / int(sum(speed_queue.list) / len(speed_queue.list)))
                minutes_left = int(seconds_left / 60)
                hours_left = int(minutes_left / 60)
                days_left = int(hours_left / 24)

                log.info(
                    f"{total_lines_processed:,} lines at {(total_lines_processed - first_lines)/(current_time - first_time):,.0f}/s, {total_lines_errored:,} errored : "
                    f"{(total_bytes_processed / (2**30)):.2f} gb at {(bytes_per_second / (2**20)):,.0f} mb/s, {(total_bytes_processed / total_bytes) * 100:.0f}% : "
                    f"{files_processed}({files_errored})/{len(input_files)} files : "
                    f"{(str(days_left) + 'd ' if days_left > 0 else '')}{hours_left - (days_left * 24)}:{minutes_left - (hours_left * 60):02}:{seconds_left - (minutes_left * 60):02} remaining")

    log.info(f"{total_lines_processed:,}, {total_lines_errored} errored : {(total_bytes_processed / (2**30)):.2f} gb, {(total_bytes_processed / total_bytes) * 100:.0f}% : {files_processed}/{len(input_files)}")

    count_complete = 0
    count_incomplete = 0
    # check how many files completed successfully
    for file in input_files:
        if not file.complete:
            if file.error_message is not None:
                log.info(f"File {file.input_path} errored {file.error_message}")
            else:
                log.info(f"File {file.input_path} is not marked as complete")
            count_incomplete += 1
        else:
            if file.error_lines > 0:
                log.info(f"File {file.input_path} has {file.error_lines:,} errored lines out of {file.lines_processed:,}")
                count_incomplete += 1
            else:
                count_complete += 1

    if count_incomplete > 0:
        log.info(f"{count_incomplete} files were not completed, errored or don't exist, something went wrong. Aborting")
    else:
        log.info(f"Processing complete, {count_complete} successful files")
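Assuming a folder of .zst dump files, a typical run (the path is a placeholder; the flags come from the argparse setup above) would be:

python personal/test_files_multiprocess.py <dump_folder> --processes 16

Progress is checkpointed to status.json in the working directory, so an interrupted run can be restarted and files already marked complete are skipped.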