import os
import sys
import csv
import json
import argparse
import traceback
import zstandard
import logging.handlers
from datetime import datetime

# put the path to the input file, or a folder of files to process all of them
input_file = r"\\MYCLOUDPR4100\Public\reddit\subreddits23\wallstreetbets_submissions.zst"
# put the name or path of the output file. The extension for the chosen output format is added automatically. If the input is a folder, the output is treated as a folder as well
output_file = r"\\MYCLOUDPR4100\Public\output"
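# for example, to process every dump file in a folder rather than a single file, both settings can
# point at folders instead (the output folder name below is only an illustration, not a value from
# the original setup):
# input_file = r"\\MYCLOUDPR4100\Public\reddit\subreddits23"
# output_file = r"\\MYCLOUDPR4100\Public\filtered"
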
# the format to output in, pick from the following options
# zst: same as the input, a zstandard compressed ndjson file. Can be read by the other scripts in the repo
# txt: an ndjson file, which is a text file with a separate json object on each line. Can be opened by any text editor
# csv: a comma separated value file. Can be opened by a text editor or Excel
# WARNING: if you use txt or csv output on a large input file without filtering out most of the rows, the resulting file will be extremely large, usually about 7 times as large as the compressed input file
output_format = "csv"

# override the above format and output only this field into a text file, one value per line. Useful if you want to make a list of authors or ids. See the examples below
# any field that's in the dump is supported, but useful ones are
# author: the username of the author
# id: the id of the submission or comment
# link_id: only for comments, the fullname of the submission the comment is associated with
# parent_id: only for comments, the fullname of the parent of the comment. Either another comment, or the submission if it's top level
single_field = None

# the fields in the file differ depending on whether it contains comments or submissions. When writing a csv, the script works out which set of fields to use from the input file name
# set this to true to log every bad line, set to false if you're expecting only some of the lines to match the key
write_bad_lines = True

# only output items between these two dates
from_date = datetime.strptime("2005-01-01", "%Y-%m-%d")
to_date = datetime.strptime("2030-12-31", "%Y-%m-%d")

# the field to filter on, the values to filter with and whether it should be an exact match
# some examples:
#
# return only objects where the author is u/watchful1 or u/spez
# field = "author"
# values = ["watchful1","spez"]
# exact_match = True
#
# return only objects where the title contains either "stonk" or "moon"
# field = "title"
# values = ["stonk","moon"]
# exact_match = False
#
# return only objects where the body contains either "stonk" or "moon". For submissions the body is in the "selftext" field, for comments it's in the "body" field
# field = "selftext"
# values = ["stonk","moon"]
# exact_match = False
#
#
# filter a submission file and then get a file with all the comments only in those submissions. This is a multi step process
# add your submission filters and set the output file name to something unique
# input_file = "redditdev_submissions.zst"
# output_file = "filtered_submissions"
# output_format = "csv"
# field = "author"
# values = ["watchful1"]
#
# run the script, this will result in a file called "filtered_submissions.csv" that contains only submissions by u/watchful1
# now run the script again with the same input and filters, but set the output to a single field. Be sure to change the output file to a new name, but don't change any of the other inputs
# output_file = "submission_ids"
# single_field = "id"
#
# run the script again, this will result in a file called "submission_ids.txt" that has an id on each line
# now remove all the other filters, update the script to read from the comments file, and use the submission ids list created before. Change the output name again so nothing gets overwritten
# input_file = "redditdev_comments.zst"
# output_file = "filtered_comments"
# single_field = None  # resetting this back so it's not used
# field = "link_id"  # in the comment object, this is the field that contains the submission id
# values_file = "submission_ids.txt"
# exact_match = False  # the link_id field has a prefix on it, so we can't do an exact match
#
# run the script one last time and now you have a file called "filtered_comments.csv" that only has comments from the submissions above
# if you want only top level comments instead of all comments, set field to "parent_id" instead of "link_id"

# change this to field = None if you don't want to filter by anything
field = "body"
values = ['']
# if you have a long list of values, you can put them in a file and put the filename here. If set, this overrides the values list above
# if this list is very large, it could greatly slow down the process
values_file = None
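# a values file is just a plain text file with one value per line; each line is lowercased and
# whitespace-stripped when it is loaded. A hypothetical example for filtering by author:
# watchful1
# spez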
exact_match = False


# sets up logging to the console as well as a file
log = logging.getLogger("bot")
log.setLevel(logging.INFO)
log_formatter = logging.Formatter('%(asctime)s - %(levelname)s: %(message)s')

log_str_handler = logging.StreamHandler()
log_str_handler.setFormatter(log_formatter)
log.addHandler(log_str_handler)
if not os.path.exists("logs"):
    os.makedirs("logs")
log_file_handler = logging.handlers.RotatingFileHandler(os.path.join("logs", "bot.log"), maxBytes=1024*1024*16, backupCount=5)
log_file_handler.setFormatter(log_formatter)
log.addHandler(log_file_handler)


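# writes a raw ndjson line, followed by a newline, to the zstandard-compressed output stream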
def write_line_zst(handle, line):
    handle.write(line.encode('utf-8'))
    handle.write("\n".encode('utf-8'))


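# writes an object back out as a single json line for the txt output format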
def write_line_json(handle, obj):
    handle.write(json.dumps(obj))
    handle.write("\n")


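# writes only the requested field, one value per line, logging any object that is missing the field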
def write_line_single(handle, obj, field):
    if field in obj:
        handle.write(obj[field])
    else:
        log.info(f"{field} not in object {obj['id']}")
    handle.write("\n")


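# writes one csv row: score, date, title (submissions only), author, link, and then either the
# selftext/url for submissions or the body for comments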
def write_line_csv(writer, obj, is_submission):
    output_list = []
    output_list.append(str(obj['score']))
    output_list.append(datetime.fromtimestamp(int(obj['created_utc'])).strftime("%Y-%m-%d"))
    if is_submission:
        output_list.append(obj['title'])
    output_list.append(f"u/{obj['author']}")
    if 'permalink' in obj:
        output_list.append(f"https://www.reddit.com{obj['permalink']}")
    else:
        output_list.append(f"https://www.reddit.com/r/{obj['subreddit']}/comments/{obj['link_id'][3:]}/_/{obj['id']}")
    if is_submission:
        if obj['is_self']:
            if 'selftext' in obj:
                output_list.append(obj['selftext'])
            else:
                output_list.append("")
        else:
            output_list.append(obj['url'])
    else:
        output_list.append(obj['body'])
    writer.writerow(output_list)


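# reads a chunk from the zstandard stream and tries to decode it as utf-8. If the chunk ends in the
# middle of a multi-byte character, another chunk is read and appended, up to max_window_size bytes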
def read_and_decode(reader, chunk_size, max_window_size, previous_chunk=None, bytes_read=0):
    chunk = reader.read(chunk_size)
    bytes_read += chunk_size
    if previous_chunk is not None:
        chunk = previous_chunk + chunk
    try:
        return chunk.decode()
    except UnicodeDecodeError:
        if bytes_read > max_window_size:
            raise UnicodeError(f"Unable to decode frame after reading {bytes_read:,} bytes")
        log.info(f"Decoding error with {bytes_read:,} bytes, reading another chunk")
        return read_and_decode(reader, chunk_size, max_window_size, chunk, bytes_read)


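# streams a zstandard compressed ndjson file, yielding each line together with the number of
# compressed bytes read so far, which the caller uses to report progress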
def read_lines_zst(file_name):
    with open(file_name, 'rb') as file_handle:
        buffer = ''
        reader = zstandard.ZstdDecompressor(max_window_size=2**31).stream_reader(file_handle)
        while True:
            chunk = read_and_decode(reader, 2**27, (2**29) * 2)

            if not chunk:
                break
            lines = (buffer + chunk).split("\n")

            for line in lines[:-1]:
                yield line.strip(), file_handle.tell()

            buffer = lines[-1]

        reader.close()


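# filters a single input file into a single output file, applying the date range, field/value and
# single_field settings, and logs progress every 100,000 lines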
def process_file(input_file, output_file, output_format, field, values, from_date, to_date, single_field, exact_match):
    output_path = f"{output_file}.{output_format}"
    is_submission = "submission" in input_file
    log.info(f"Input: {input_file} : Output: {output_path} : Is submission {is_submission}")
    writer = None
    if output_format == "zst":
        handle = zstandard.ZstdCompressor().stream_writer(open(output_path, 'wb'))
    elif output_format == "txt":
        handle = open(output_path, 'w', encoding='UTF-8')
    elif output_format == "csv":
        handle = open(output_path, 'w', encoding='UTF-8', newline='')
        writer = csv.writer(handle)
    else:
        log.error(f"Unsupported output format {output_format}")
        sys.exit()

    file_size = os.stat(input_file).st_size
    created = None
    matched_lines = 0
    bad_lines = 0
    total_lines = 0
    for line, file_bytes_processed in read_lines_zst(input_file):
        total_lines += 1
        if total_lines % 100000 == 0:
            log.info(f"{created.strftime('%Y-%m-%d %H:%M:%S')} : {total_lines:,} : {matched_lines:,} : {bad_lines:,} : {file_bytes_processed:,}:{(file_bytes_processed / file_size) * 100:.0f}%")

        try:
            obj = json.loads(line)
            created = datetime.utcfromtimestamp(int(obj['created_utc']))

            if created < from_date:
                continue
            if created > to_date:
                continue

            if field is not None:
                field_value = obj[field].lower()
                matched = False
                for value in values:
                    if exact_match:
                        if value == field_value:
                            matched = True
                            break
                    else:
                        if value in field_value:
                            matched = True
                            break
                if not matched:
                    continue

            matched_lines += 1
            if output_format == "zst":
                write_line_zst(handle, line)
            elif output_format == "csv":
                write_line_csv(writer, obj, is_submission)
            elif output_format == "txt":
                if single_field is not None:
                    write_line_single(handle, obj, single_field)
                else:
                    write_line_json(handle, obj)
            else:
                log.info(f"Something went wrong, invalid output format {output_format}")
        except (KeyError, json.JSONDecodeError) as err:
            bad_lines += 1
            if write_bad_lines:
                if isinstance(err, KeyError):
                    log.warning(f"Key {field} is not in the object: {err}")
                elif isinstance(err, json.JSONDecodeError):
                    log.warning(f"Line decoding failed: {err}")
                log.warning(line)

    handle.close()
    log.info(f"Complete : {total_lines:,} : {matched_lines:,} : {bad_lines:,}")


if __name__ == "__main__":
    if single_field is not None:
        log.info("Single field output mode, changing output file format to txt")
        output_format = "txt"

    if values_file is not None:
        values = []
        with open(values_file, 'r') as values_handle:
            for value in values_handle:
                values.append(value.strip().lower())
        log.info(f"Loaded {len(values)} from values file {values_file}")
    else:
        values = [value.lower() for value in values]  # convert to lowercase

    log.info(f"Filtering field: {field}")
    if len(values) <= 20:
        log.info(f"On values: {','.join(values)}")
    else:
        log.info(f"On values:")
        for value in values:
            log.info(value)
    log.info(f"Exact match {('on' if exact_match else 'off')}. Single field {single_field}.")
    log.info(f"From date {from_date.strftime('%Y-%m-%d')} to date {to_date.strftime('%Y-%m-%d')}")
    log.info(f"Output format set to {output_format}")

    input_files = []
    if os.path.isdir(input_file):
        if not os.path.exists(output_file):
            os.makedirs(output_file)
        for file in os.listdir(input_file):
            if not os.path.isdir(file) and file.endswith(".zst"):
                input_name = os.path.splitext(os.path.splitext(os.path.basename(file))[0])[0]
                input_files.append((os.path.join(input_file, file), os.path.join(output_file, input_name)))
    else:
        input_files.append((input_file, output_file))
    log.info(f"Processing {len(input_files)} files")
    for file_in, file_out in input_files:
        try:
            process_file(file_in, file_out, output_format, field, values, from_date, to_date, single_field, exact_match)
        except Exception as err:
            log.warning(f"Error processing {file_in}: {err}")
            log.warning(traceback.format_exc())
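
# note: this script takes no command line arguments; settings are edited at the top of the file and
# the script is run directly (argparse is imported but not currently used)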