mirror of
https://github.com/Watchful1/PushshiftDumps.git
synced 2025-07-05 03:46:54 -04:00
Add csv script
This commit is contained in:
parent
c08f5f212f
commit
461028b401
6 changed files with 169 additions and 49 deletions
|
@ -1,33 +1,24 @@
|
|||
import utils
|
||||
import discord_logging
|
||||
import os
|
||||
from datetime import datetime
|
||||
from collections import defaultdict
|
||||
|
||||
log = discord_logging.init_logging()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
subreddits = {}
|
||||
object_type = "submissions"
|
||||
folder = f"\\\\MYCLOUDPR4100\\Public\\reddit_final\\{object_type}"
|
||||
if not os.path.exists(folder):
|
||||
os.makedirs(folder)
|
||||
input_file = f"\\\\MYCLOUDPR4100\\Public\\reddit_final\\relationships_{object_type}.zst"
|
||||
subreddits = defaultdict(int)
|
||||
input_file = r"\\MYCLOUDPR4100\Public\reddit\comments\RC_2021-06.zst"
|
||||
input_file_size = os.stat(input_file).st_size
|
||||
total_lines = 0
|
||||
for comment, line, file_bytes_processed in utils.read_obj_zst_meta(input_file):
|
||||
if comment['subreddit'] not in subreddits:
|
||||
subreddits[comment['subreddit']] = {'writer': utils.OutputZst(os.path.join(folder, comment['subreddit'] + f"_{object_type}.zst")), 'lines': 0}
|
||||
subreddit = subreddits[comment['subreddit']]
|
||||
subreddit['writer'].write(line)
|
||||
subreddit['writer'].write("\n")
|
||||
subreddit['lines'] += 1
|
||||
subreddits[comment['subreddit']] += 1
|
||||
total_lines += 1
|
||||
if total_lines % 100000 == 0:
|
||||
log.info(f"{total_lines:,} lines, {(file_bytes_processed / input_file_size) * 100:.0f}%")
|
||||
|
||||
log.info(f"{total_lines:,} lines, 100%")
|
||||
|
||||
for name, subreddit in subreddits.items():
|
||||
log.info(f"r/{name}: {subreddit['lines']:,} lines")
|
||||
subreddit['writer'].close()
|
||||
for subreddit, count in sorted(subreddits.items(), key=lambda item: item[1] * -1):
|
||||
if count > 1000:
|
||||
log.info(f"r/{subreddit}: {count:,}")
|
||||
|
|
|
@ -5,6 +5,7 @@ import discord_logging
|
|||
import pymongo
|
||||
import time
|
||||
import sys
|
||||
from datetime import datetime
|
||||
|
||||
log = discord_logging.init_logging()
|
||||
|
||||
|
@ -14,26 +15,33 @@ if __name__ == "__main__":
|
|||
client = pymongo.MongoClient(f"mongodb://{mongo_address}:27017", serverSelectionTimeoutMS=5000)
|
||||
log.info(f"Database connected at {mongo_address} on {client.admin.command('serverStatus')['host']}")
|
||||
|
||||
count = 0
|
||||
start_time = time.time()
|
||||
cursor = client.reddit_database.comments.find(
|
||||
filter={"subreddit": "RelationshipsOver35"},
|
||||
projection={'_id': False},
|
||||
sort=[('created_utc', pymongo.ASCENDING)]
|
||||
)
|
||||
log.info(f"Got cursor in {int(time.time() - start_time)} seconds")
|
||||
subreddits = [
|
||||
"PersonalFinanceCanada"
|
||||
]
|
||||
start_date = datetime(2020, 1, 1)
|
||||
end_date = datetime(2021, 1, 1)
|
||||
|
||||
output_writer = utils.OutputZst(r"\\MYCLOUDPR4100\Public\reddit_final\RelationshipsOver35_comments.zst")
|
||||
start_time = time.time()
|
||||
for comment in cursor:
|
||||
count += 1
|
||||
output_writer.write(json.dumps(comment, separators=(',', ':')))
|
||||
output_writer.write("\n")
|
||||
if count % 100000 == 0:
|
||||
log.info(f"{count,} in {int(time.time() - start_time)} seconds")
|
||||
for subreddit in subreddits:
|
||||
count = 0
|
||||
start_time = time.time()
|
||||
cursor = client.reddit_database.comments.find(
|
||||
filter={"subreddit": subreddit, "created_utc": {"$gte": int(start_date.timestamp()), "$lt": int(end_date.timestamp())}},
|
||||
projection={'_id': False},
|
||||
sort=[('created_utc', pymongo.ASCENDING)]
|
||||
)
|
||||
log.info(f"Got cursor in {int(time.time() - start_time)} seconds")
|
||||
|
||||
output_writer.close()
|
||||
log.info(f"{count,} in {int(time.time() - start_time)} seconds")
|
||||
output_writer = utils.OutputZst(r"\\MYCLOUDPR4100\Public\reddit_final\{0}_comments.zst".format(subreddit))
|
||||
start_time = time.time()
|
||||
for comment in cursor:
|
||||
count += 1
|
||||
output_writer.write(json.dumps(comment, separators=(',', ':')))
|
||||
output_writer.write("\n")
|
||||
if count % 10000 == 0:
|
||||
log.info(f"{count:,} through {datetime.utcfromtimestamp(int(comment['created_utc'])).strftime('%Y-%m-%d %H:%M:%S')} in {int(time.time() - start_time)} seconds r/{subreddit}")
|
||||
|
||||
output_writer.close()
|
||||
log.info(f"{count:,} in {int(time.time() - start_time)} seconds r/{subreddit}")
|
||||
|
||||
|
||||
# db.comments.createIndex({subreddit:1}) // remove
|
||||
|
|
30
personal/extract_file.py
Normal file
30
personal/extract_file.py
Normal file
|
@ -0,0 +1,30 @@
|
|||
import utils
|
||||
import discord_logging
|
||||
import os
|
||||
import sys
|
||||
from datetime import datetime
|
||||
|
||||
log = discord_logging.init_logging()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
input_file_path = r"\\MYCLOUDPR4100\Public\reddit_final\curiousdrive_submissions.zst"
|
||||
output_file_path = r"\\MYCLOUDPR4100\Public\reddit_final\curiousdrive_submissions.txt"
|
||||
file_size = os.stat(input_file_path).st_size
|
||||
|
||||
file_lines = 0
|
||||
file_bytes_processed = 0
|
||||
created = None
|
||||
inserts = []
|
||||
output_file = open(output_file_path, 'w')
|
||||
for obj, line, file_bytes_processed in utils.read_obj_zst_meta(input_file_path):
|
||||
created = datetime.utcfromtimestamp(int(obj['created_utc']))
|
||||
file_lines += 1
|
||||
output_file.write(line)
|
||||
output_file.write("\n")
|
||||
if file_lines % 100000 == 0:
|
||||
log.info(f"{created.strftime('%Y-%m-%d %H:%M:%S')} : {file_lines:,} : {(file_bytes_processed / file_size) * 100:.0f}%")
|
||||
|
||||
log.info(f"{created.strftime('%Y-%m-%d %H:%M:%S')} : {file_lines:,} : 100%")
|
||||
output_file.close()
|
||||
|
|
@ -8,11 +8,11 @@ log = discord_logging.init_logging()
|
|||
|
||||
if __name__ == "__main__":
|
||||
subreddits = {}
|
||||
object_type = "submissions"
|
||||
folder = f"\\\\MYCLOUDPR4100\\Public\\reddit_final\\{object_type}"
|
||||
object_type = "comments"
|
||||
folder = f"\\\\MYCLOUDPR4100\\Public\\reddit_final\\ratmanreturns265_{object_type}"
|
||||
if not os.path.exists(folder):
|
||||
os.makedirs(folder)
|
||||
input_file = f"\\\\MYCLOUDPR4100\\Public\\reddit_final\\relationships_{object_type}.zst"
|
||||
input_file = f"\\\\MYCLOUDPR4100\\Public\\reddit_final\\ratmanreturns265_{object_type}.zst"
|
||||
input_file_size = os.stat(input_file).st_size
|
||||
total_lines = 0
|
||||
for comment, line, file_bytes_processed in utils.read_obj_zst_meta(input_file):
|
||||
|
|
|
@ -8,18 +8,30 @@ log = discord_logging.init_logging()
|
|||
|
||||
|
||||
if __name__ == "__main__":
|
||||
file_path = r"\\MYCLOUDPR4100\Public\reddit\submissions\RS_2011-01.zst"
|
||||
file_size = os.stat(file_path).st_size
|
||||
input_path = r"\\MYCLOUDPR4100\Public\reddit\requests\jeanyp"
|
||||
|
||||
file_lines = 0
|
||||
file_bytes_processed = 0
|
||||
created = None
|
||||
inserts = []
|
||||
for obj, line, file_bytes_processed in utils.read_obj_zst_meta(file_path):
|
||||
created = datetime.utcfromtimestamp(int(obj['created_utc']))
|
||||
file_lines += 1
|
||||
if file_lines % 100000 == 0:
|
||||
log.info(f"{created.strftime('%Y-%m-%d %H:%M:%S')} : {file_lines:,} : {(file_bytes_processed / file_size) * 100:.0f}%")
|
||||
input_file_paths = []
|
||||
if os.path.isdir(input_path):
|
||||
for subdir, dirs, files in os.walk(input_path):
|
||||
files.sort()
|
||||
for file_name in files:
|
||||
if file_name.endswith(".zst"):
|
||||
input_file_paths.append(os.path.join(subdir, file_name))
|
||||
else:
|
||||
input_file_paths.append(input_path)
|
||||
|
||||
log.info(f"{created.strftime('%Y-%m-%d %H:%M:%S')} : {file_lines:,} : 100%")
|
||||
files_processed = 0
|
||||
for file_path in input_file_paths:
|
||||
file_name = os.path.basename(file_path)
|
||||
file_size = os.stat(file_path).st_size
|
||||
file_lines = 0
|
||||
file_bytes_processed = 0
|
||||
created = None
|
||||
inserts = []
|
||||
for obj, line, file_bytes_processed in utils.read_obj_zst_meta(file_path):
|
||||
created = datetime.utcfromtimestamp(int(obj['created_utc']))
|
||||
file_lines += 1
|
||||
if file_lines % 100000 == 0:
|
||||
log.info(f"{files_processed}/{len(input_file_paths)}: {file_name} : {created.strftime('%Y-%m-%d %H:%M:%S')} : {file_lines:,} : {(file_bytes_processed / file_size) * 100:.0f}%")
|
||||
|
||||
log.info(f"{files_processed}/{len(input_file_paths)}: {file_name} : {created.strftime('%Y-%m-%d %H:%M:%S')} : {file_lines:,} : 100%")
|
||||
|
|
79
scripts/to_csv.py
Normal file
79
scripts/to_csv.py
Normal file
|
@ -0,0 +1,79 @@
|
|||
# this converts a zst file to csv
|
||||
#
|
||||
# it's important to note that the resulting file will likely be quite large
|
||||
# and you probably won't be able to open it in excel or another csv reader
|
||||
#
|
||||
# arguments are inputfile, outputfile, fields
|
||||
# call this like
|
||||
# python to_csv.py wallstreetbets_submissions.zst wallstreetbets_submissions.csv author,selftext,title
|
||||
|
||||
import zstandard
|
||||
import os
|
||||
import json
|
||||
import sys
|
||||
import csv
|
||||
from datetime import datetime
|
||||
import logging.handlers
|
||||
|
||||
|
||||
log = logging.getLogger("bot")
|
||||
log.setLevel(logging.DEBUG)
|
||||
log.addHandler(logging.StreamHandler())
|
||||
|
||||
|
||||
def read_lines_zst(file_name):
|
||||
with open(file_name, 'rb') as file_handle:
|
||||
buffer = ''
|
||||
reader = zstandard.ZstdDecompressor(max_window_size=2**31).stream_reader(file_handle)
|
||||
while True:
|
||||
chunk = reader.read(2**27).decode()
|
||||
if not chunk:
|
||||
break
|
||||
lines = (buffer + chunk).split("\n")
|
||||
|
||||
for line in lines[:-1]:
|
||||
yield line, file_handle.tell()
|
||||
|
||||
buffer = lines[-1]
|
||||
reader.close()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
input_file_path = sys.argv[1]
|
||||
output_file_path = sys.argv[2]
|
||||
fields = sys.argv[3].split(",")
|
||||
|
||||
file_size = os.stat(input_file_path).st_size
|
||||
file_lines = 0
|
||||
file_bytes_processed = 0
|
||||
line = None
|
||||
created = None
|
||||
bad_lines = 0
|
||||
output_file = open(output_file_path, "w", encoding='utf-8', newline="")
|
||||
writer = csv.writer(output_file)
|
||||
writer.writerow(fields)
|
||||
try:
|
||||
for line, file_bytes_processed in read_lines_zst(input_file_path):
|
||||
try:
|
||||
obj = json.loads(line)
|
||||
output_obj = []
|
||||
for field in fields:
|
||||
output_obj.append(obj[field].encode("utf-8", errors='replace').decode())
|
||||
writer.writerow(output_obj)
|
||||
|
||||
created = datetime.utcfromtimestamp(int(obj['created_utc']))
|
||||
except json.JSONDecodeError as err:
|
||||
bad_lines += 1
|
||||
file_lines += 1
|
||||
if file_lines % 100000 == 0:
|
||||
log.info(f"{created.strftime('%Y-%m-%d %H:%M:%S')} : {file_lines:,} : {bad_lines:,} : {(file_bytes_processed / file_size) * 100:.0f}%")
|
||||
except KeyError as err:
|
||||
log.info(f"Object has no key: {err}")
|
||||
log.info(line)
|
||||
except Exception as err:
|
||||
log.info(err)
|
||||
log.info(line)
|
||||
|
||||
output_file.close()
|
||||
log.info(f"Complete : {file_lines:,} : {bad_lines:,}")
|
||||
|
Loading…
Add table
Add a link
Reference in a new issue