From 461028b40116f146a5eece16855522c6309e294c Mon Sep 17 00:00:00 2001
From: Watchful1
Date: Mon, 14 Feb 2022 16:04:27 -0800
Subject: [PATCH] Add csv script

---
 personal/count_by_subreddit.py | 23 +++-------
 personal/export_mongo.py       | 44 +++++++++++--------
 personal/extract_file.py       | 30 +++++++++++++
 personal/split_by_subreddit.py |  6 +--
 personal/test_file.py          | 37 +++++++++++------
 scripts/to_csv.py              | 79 ++++++++++++++++++++++++++++++++++
 6 files changed, 170 insertions(+), 49 deletions(-)
 create mode 100644 personal/extract_file.py
 create mode 100644 scripts/to_csv.py

diff --git a/personal/count_by_subreddit.py b/personal/count_by_subreddit.py
index b5d4884..292ec74 100644
--- a/personal/count_by_subreddit.py
+++ b/personal/count_by_subreddit.py
@@ -1,33 +1,24 @@
 import utils
 import discord_logging
 import os
-from datetime import datetime
+from collections import defaultdict
 
 log = discord_logging.init_logging()
 
 
 if __name__ == "__main__":
-	subreddits = {}
-	object_type = "submissions"
-	folder = f"\\\\MYCLOUDPR4100\\Public\\reddit_final\\{object_type}"
-	if not os.path.exists(folder):
-		os.makedirs(folder)
-	input_file = f"\\\\MYCLOUDPR4100\\Public\\reddit_final\\relationships_{object_type}.zst"
+	subreddits = defaultdict(int)
+	input_file = r"\\MYCLOUDPR4100\Public\reddit\comments\RC_2021-06.zst"
 	input_file_size = os.stat(input_file).st_size
 	total_lines = 0
 	for comment, line, file_bytes_processed in utils.read_obj_zst_meta(input_file):
-		if comment['subreddit'] not in subreddits:
-			subreddits[comment['subreddit']] = {'writer': utils.OutputZst(os.path.join(folder, comment['subreddit'] + f"_{object_type}.zst")), 'lines': 0}
-		subreddit = subreddits[comment['subreddit']]
-		subreddit['writer'].write(line)
-		subreddit['writer'].write("\n")
-		subreddit['lines'] += 1
+		subreddits[comment['subreddit']] += 1
 		total_lines += 1
 		if total_lines % 100000 == 0:
 			log.info(f"{total_lines:,} lines, {(file_bytes_processed / input_file_size) * 100:.0f}%")
 
 	log.info(f"{total_lines:,} lines, 100%")
-	for name, subreddit in subreddits.items():
-		log.info(f"r/{name}: {subreddit['lines']:,} lines")
-		subreddit['writer'].close()
+	for subreddit, count in sorted(subreddits.items(), key=lambda item: item[1] * -1):
+		if count > 1000:
+			log.info(f"r/{subreddit}: {count:,}")
 
diff --git a/personal/export_mongo.py b/personal/export_mongo.py
index fc45054..0bfd5f2 100644
--- a/personal/export_mongo.py
+++ b/personal/export_mongo.py
@@ -5,6 +5,7 @@ import discord_logging
 import pymongo
 import time
 import sys
+from datetime import datetime
 
 log = discord_logging.init_logging()
 
@@ -14,26 +15,33 @@ if __name__ == "__main__":
 	client = pymongo.MongoClient(f"mongodb://{mongo_address}:27017", serverSelectionTimeoutMS=5000)
 	log.info(f"Database connected at {mongo_address} on {client.admin.command('serverStatus')['host']}")
 
-	count = 0
-	start_time = time.time()
-	cursor = client.reddit_database.comments.find(
-		filter={"subreddit": "RelationshipsOver35"},
-		projection={'_id': False},
-		sort=[('created_utc', pymongo.ASCENDING)]
-	)
-	log.info(f"Got cursor in {int(time.time() - start_time)} seconds")
+	subreddits = [
+		"PersonalFinanceCanada"
+	]
+	start_date = datetime(2020, 1, 1)
+	end_date = datetime(2021, 1, 1)
 
-	output_writer = utils.OutputZst(r"\\MYCLOUDPR4100\Public\reddit_final\RelationshipsOver35_comments.zst")
-	start_time = time.time()
-	for comment in cursor:
-		count += 1
-		output_writer.write(json.dumps(comment, separators=(',', ':')))
-		output_writer.write("\n")
-		if count % 100000 == 0:
-			log.info(f"{count,} in {int(time.time() - start_time)} seconds")
+	for subreddit in subreddits:
+		count = 0
+		start_time = time.time()
+		cursor = client.reddit_database.comments.find(
+			filter={"subreddit": subreddit, "created_utc": {"$gte": int(start_date.timestamp()), "$lt": int(end_date.timestamp())}},
+			projection={'_id': False},
+			sort=[('created_utc', pymongo.ASCENDING)]
+		)
+		log.info(f"Got cursor in {int(time.time() - start_time)} seconds")
 
-	output_writer.close()
-	log.info(f"{count,} in {int(time.time() - start_time)} seconds")
+		output_writer = utils.OutputZst(r"\\MYCLOUDPR4100\Public\reddit_final\{0}_comments.zst".format(subreddit))
+		start_time = time.time()
+		for comment in cursor:
+			count += 1
+			output_writer.write(json.dumps(comment, separators=(',', ':')))
+			output_writer.write("\n")
+			if count % 10000 == 0:
+				log.info(f"{count:,} through {datetime.utcfromtimestamp(int(comment['created_utc'])).strftime('%Y-%m-%d %H:%M:%S')} in {int(time.time() - start_time)} seconds r/{subreddit}")
+
+		output_writer.close()
+		log.info(f"{count:,} in {int(time.time() - start_time)} seconds r/{subreddit}")
 
 # db.comments.createIndex({subreddit:1}) // remove
diff --git a/personal/extract_file.py b/personal/extract_file.py
new file mode 100644
index 0000000..d549367
--- /dev/null
+++ b/personal/extract_file.py
@@ -0,0 +1,30 @@
+import utils
+import discord_logging
+import os
+import sys
+from datetime import datetime
+
+log = discord_logging.init_logging()
+
+
+if __name__ == "__main__":
+	input_file_path = r"\\MYCLOUDPR4100\Public\reddit_final\curiousdrive_submissions.zst"
+	output_file_path = r"\\MYCLOUDPR4100\Public\reddit_final\curiousdrive_submissions.txt"
+	file_size = os.stat(input_file_path).st_size
+
+	file_lines = 0
+	file_bytes_processed = 0
+	created = None
+	inserts = []
+	output_file = open(output_file_path, 'w')
+	for obj, line, file_bytes_processed in utils.read_obj_zst_meta(input_file_path):
+		created = datetime.utcfromtimestamp(int(obj['created_utc']))
+		file_lines += 1
+		output_file.write(line)
+		output_file.write("\n")
+		if file_lines % 100000 == 0:
+			log.info(f"{created.strftime('%Y-%m-%d %H:%M:%S')} : {file_lines:,} : {(file_bytes_processed / file_size) * 100:.0f}%")
+
+	log.info(f"{created.strftime('%Y-%m-%d %H:%M:%S')} : {file_lines:,} : 100%")
+	output_file.close()
+
diff --git a/personal/split_by_subreddit.py b/personal/split_by_subreddit.py
index b5d4884..4a5e784 100644
--- a/personal/split_by_subreddit.py
+++ b/personal/split_by_subreddit.py
@@ -8,11 +8,11 @@ log = discord_logging.init_logging()
 
 if __name__ == "__main__":
 	subreddits = {}
-	object_type = "submissions"
-	folder = f"\\\\MYCLOUDPR4100\\Public\\reddit_final\\{object_type}"
+	object_type = "comments"
+	folder = f"\\\\MYCLOUDPR4100\\Public\\reddit_final\\ratmanreturns265_{object_type}"
 	if not os.path.exists(folder):
 		os.makedirs(folder)
-	input_file = f"\\\\MYCLOUDPR4100\\Public\\reddit_final\\relationships_{object_type}.zst"
+	input_file = f"\\\\MYCLOUDPR4100\\Public\\reddit_final\\ratmanreturns265_{object_type}.zst"
 	input_file_size = os.stat(input_file).st_size
 	total_lines = 0
 	for comment, line, file_bytes_processed in utils.read_obj_zst_meta(input_file):
diff --git a/personal/test_file.py b/personal/test_file.py
index 50222ec..081b977 100644
--- a/personal/test_file.py
+++ b/personal/test_file.py
@@ -8,18 +8,31 @@
 
 
 if __name__ == "__main__":
-	file_path = r"\\MYCLOUDPR4100\Public\reddit\submissions\RS_2011-01.zst"
-	file_size = os.stat(file_path).st_size
+	input_path = r"\\MYCLOUDPR4100\Public\reddit\requests\jeanyp"
 
-	file_lines = 0
-	file_bytes_processed = 0
-	created = None
-	inserts = []
-	for obj, line, file_bytes_processed in utils.read_obj_zst_meta(file_path):
-		created = datetime.utcfromtimestamp(int(obj['created_utc']))
-		file_lines += 1
-		if file_lines % 100000 == 0:
-			log.info(f"{created.strftime('%Y-%m-%d %H:%M:%S')} : {file_lines:,} : {(file_bytes_processed / file_size) * 100:.0f}%")
+	input_file_paths = []
+	if os.path.isdir(input_path):
+		for subdir, dirs, files in os.walk(input_path):
+			files.sort()
+			for file_name in files:
+				if file_name.endswith(".zst"):
+					input_file_paths.append(os.path.join(subdir, file_name))
+	else:
+		input_file_paths.append(input_path)
 
-	log.info(f"{created.strftime('%Y-%m-%d %H:%M:%S')} : {file_lines:,} : 100%")
+	files_processed = 0
+	for file_path in input_file_paths:
+		file_name = os.path.basename(file_path)
+		file_size = os.stat(file_path).st_size
+		file_lines = 0
+		file_bytes_processed = 0
+		created = None
+		inserts = []
+		for obj, line, file_bytes_processed in utils.read_obj_zst_meta(file_path):
+			created = datetime.utcfromtimestamp(int(obj['created_utc']))
+			file_lines += 1
+			if file_lines % 100000 == 0:
+				log.info(f"{files_processed}/{len(input_file_paths)}: {file_name} : {created.strftime('%Y-%m-%d %H:%M:%S')} : {file_lines:,} : {(file_bytes_processed / file_size) * 100:.0f}%")
+		log.info(f"{files_processed}/{len(input_file_paths)}: {file_name} : {created.strftime('%Y-%m-%d %H:%M:%S')} : {file_lines:,} : 100%")
+		files_processed += 1
 
diff --git a/scripts/to_csv.py b/scripts/to_csv.py
new file mode 100644
index 0000000..d0d3bb8
--- /dev/null
+++ b/scripts/to_csv.py
@@ -0,0 +1,79 @@
+# this converts a zst file to csv
+#
+# it's important to note that the resulting file will likely be quite large
+# and you probably won't be able to open it in excel or another csv reader
+#
+# arguments are inputfile, outputfile, fields
+# call this like
+# python to_csv.py wallstreetbets_submissions.zst wallstreetbets_submissions.csv author,selftext,title
+
+import zstandard
+import os
+import json
+import sys
+import csv
+from datetime import datetime
+import logging.handlers
+
+
+log = logging.getLogger("bot")
+log.setLevel(logging.DEBUG)
+log.addHandler(logging.StreamHandler())
+
+
+def read_lines_zst(file_name):
+	with open(file_name, 'rb') as file_handle:
+		buffer = ''
+		reader = zstandard.ZstdDecompressor(max_window_size=2**31).stream_reader(file_handle)
+		while True:
+			chunk = reader.read(2**27).decode()
+			if not chunk:
+				break
+			lines = (buffer + chunk).split("\n")
+
+			for line in lines[:-1]:
+				yield line, file_handle.tell()
+
+			buffer = lines[-1]
+		reader.close()
+
+
+if __name__ == "__main__":
+	input_file_path = sys.argv[1]
+	output_file_path = sys.argv[2]
+	fields = sys.argv[3].split(",")
+
+	file_size = os.stat(input_file_path).st_size
+	file_lines = 0
+	file_bytes_processed = 0
+	line = None
+	created = None
+	bad_lines = 0
+	output_file = open(output_file_path, "w", encoding='utf-8', newline="")
+	writer = csv.writer(output_file)
+	writer.writerow(fields)
+	try:
+		for line, file_bytes_processed in read_lines_zst(input_file_path):
+			try:
+				obj = json.loads(line)
+				output_obj = []
+				for field in fields:
+					output_obj.append(str(obj[field]).encode("utf-8", errors='replace').decode())
+				writer.writerow(output_obj)
+
+				created = datetime.utcfromtimestamp(int(obj['created_utc']))
+			except json.JSONDecodeError as err:
+				bad_lines += 1
+			file_lines += 1
+			if file_lines % 100000 == 0:
+				log.info(f"{created.strftime('%Y-%m-%d %H:%M:%S')} : {file_lines:,} : {bad_lines:,} : {(file_bytes_processed / file_size) * 100:.0f}%")
+	except KeyError as err:
+		log.info(f"Object has no key: {err}")
+		log.info(line)
+	except Exception as err:
+		log.info(err)
+		log.info(line)
+
+	output_file.close()
+	log.info(f"Complete : {file_lines:,} : {bad_lines:,}")
+