mirror of
https://github.com/Watchful1/PushshiftDumps.git
synced 2025-07-30 09:58:52 -04:00
Add personal scripts to git
parent e4e8ad480c
commit 3be517ef12
12 changed files with 531 additions and 113 deletions
21
personal/comments_per_day.py
Normal file
@@ -0,0 +1,21 @@
import utils
import discord_logging
from datetime import datetime

log = discord_logging.init_logging()


if __name__ == "__main__":
	day = None
	day_comments = 0
	for comment in utils.read_obj_zst(r"\\MYCLOUDPR4100\Public\reddit_final\wallstreetbets_comments.zst"):
		created_day = datetime.utcfromtimestamp(int(comment['created_utc'])).strftime("%m/%d/%y")
		if day is None:
			day = created_day
		if day != created_day:
			log.info(f"{day} {day_comments}")
			day_comments = 0
			day = created_day
		day_comments += 1

	log.info(f"{day} {day_comments}")
33
personal/compare_lines.py
Normal file
@@ -0,0 +1,33 @@
import utils
import discord_logging
import os
import sys
from datetime import datetime

log = discord_logging.init_logging()


if __name__ == "__main__":
	file_one = open(r"\\MYCLOUDPR4100\Public\reddit_final\RelationshipsOver35_comments_dump.txt", 'r')
	file_two = open(r"\\MYCLOUDPR4100\Public\reddit_final\RelationshipsOver35_comments_mongo.txt", 'r')

	file_lines = 0
	while True:
		file_lines += 1
		line_one = file_one.readline().rstrip()
		line_two = file_two.readline().rstrip()
		if line_one != line_two:
			log.info(f"lines not matching: {file_lines}")
			log.info(line_one)
			log.info(line_two)
			#break

		if file_lines % 100000 == 0:
			log.info(f"{file_lines:,}")

		if not line_one:
			break

	log.info(f"{file_lines:,}")
	file_one.close()
	file_two.close()
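Review note on compare_lines.py: the while loop exits as soon as file_one runs out of lines, so any trailing extra lines in file_two would never be flagged. A minimal sketch of a variant that pads the shorter file instead, using placeholder file names rather than the network paths above:

import itertools

with open("file_one.txt") as file_one, open("file_two.txt") as file_two:
	# zip_longest keeps yielding after the shorter file ends, filling with ""
	for line_number, (line_one, line_two) in enumerate(itertools.zip_longest(file_one, file_two, fillvalue=""), start=1):
		if line_one.rstrip() != line_two.rstrip():
			print(f"lines not matching: {line_number}")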
33
personal/count_by_subreddit.py
Normal file
@@ -0,0 +1,33 @@
import utils
import discord_logging
import os
from datetime import datetime

log = discord_logging.init_logging()


if __name__ == "__main__":
	subreddits = {}
	object_type = "submissions"
	folder = f"\\\\MYCLOUDPR4100\\Public\\reddit_final\\{object_type}"
	if not os.path.exists(folder):
		os.makedirs(folder)
	input_file = f"\\\\MYCLOUDPR4100\\Public\\reddit_final\\relationships_{object_type}.zst"
	input_file_size = os.stat(input_file).st_size
	total_lines = 0
	for comment, line, file_bytes_processed in utils.read_obj_zst_meta(input_file):
		if comment['subreddit'] not in subreddits:
			subreddits[comment['subreddit']] = {'writer': utils.OutputZst(os.path.join(folder, comment['subreddit'] + f"_{object_type}.zst")), 'lines': 0}
		subreddit = subreddits[comment['subreddit']]
		subreddit['writer'].write(line)
		subreddit['writer'].write("\n")
		subreddit['lines'] += 1
		total_lines += 1
		if total_lines % 100000 == 0:
			log.info(f"{total_lines:,} lines, {(file_bytes_processed / input_file_size) * 100:.0f}%")

	log.info(f"{total_lines:,} lines, 100%")

	for name, subreddit in subreddits.items():
		log.info(f"r/{name}: {subreddit['lines']:,} lines")
		subreddit['writer'].close()
47
personal/export_mongo.py
Normal file
@@ -0,0 +1,47 @@
import json

import utils
import discord_logging
import pymongo
import time
import sys

log = discord_logging.init_logging()


if __name__ == "__main__":
	mongo_address = sys.argv[1]  # 192.168.1.131
	client = pymongo.MongoClient(f"mongodb://{mongo_address}:27017", serverSelectionTimeoutMS=5000)
	log.info(f"Database connected at {mongo_address} on {client.admin.command('serverStatus')['host']}")

	count = 0
	start_time = time.time()
	cursor = client.reddit_database.comments.find(
		filter={"subreddit": "RelationshipsOver35"},
		projection={'_id': False},
		sort=[('created_utc', pymongo.ASCENDING)]
	)
	log.info(f"Got cursor in {int(time.time() - start_time)} seconds")

	output_writer = utils.OutputZst(r"\\MYCLOUDPR4100\Public\reddit_final\RelationshipsOver35_comments.zst")
	start_time = time.time()
	for comment in cursor:
		count += 1
		output_writer.write(json.dumps(comment, separators=(',', ':')))
		output_writer.write("\n")
		if count % 100000 == 0:
			log.info(f"{count:,} in {int(time.time() - start_time)} seconds")

	output_writer.close()
	log.info(f"{count:,} in {int(time.time() - start_time)} seconds")


# db.comments.createIndex({subreddit:1}) // remove
# db.comments.createIndex({subreddit:1, created_utc:1})
# db.comments.createIndex({author:1, created_utc:1})
# db.comments.createIndex({id:1})
# db.submissions.createIndex({subreddit:1, created_utc:1})
# db.submissions.createIndex({author:1, created_utc:1})
# db.submissions.createIndex({id:1})
# db.submissions.createIndex({created_utc:1})
# db.comments.createIndex({created_utc:1})
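Review note on export_mongo.py: the trailing createIndex lines are mongo shell syntax. If these indexes were ever built from Python instead, the pymongo equivalent would look roughly like this (database and field names taken from the comments above, not verified against the live database):

import pymongo

client = pymongo.MongoClient("mongodb://192.168.1.131:27017")
db = client.reddit_database
# compound indexes mirroring the shell commands in the comments
db.comments.create_index([("subreddit", pymongo.ASCENDING), ("created_utc", pymongo.ASCENDING)])
db.comments.create_index([("author", pymongo.ASCENDING), ("created_utc", pymongo.ASCENDING)])
db.comments.create_index([("id", pymongo.ASCENDING)])
db.submissions.create_index([("subreddit", pymongo.ASCENDING), ("created_utc", pymongo.ASCENDING)])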
57
personal/group_subs.py
Normal file
@@ -0,0 +1,57 @@
import json
from datetime import datetime
import utils
import discord_logging
import pymongo
import time
import sys

log = discord_logging.init_logging()


if __name__ == "__main__":
	mongo_address = sys.argv[1]  # 192.168.1.131
	client = pymongo.MongoClient(f"mongodb://{mongo_address}:27017", serverSelectionTimeoutMS=5000)
	log.info(f"Database connected at {mongo_address} on {client.admin.command('serverStatus')['host']}")

	count = 0
	start_time = time.time()
	start_date = int(datetime(2021, 6, 1).timestamp())
	cursor = client.reddit_database.submissions.aggregate(
		[
			{"$match": {"created_utc": {"$gt": start_date}}},
			{"$project": {"subreddit": 1, "over_18": {"$cond": ["$over_18", 1, 0]}}},
			{"$group": {"_id": "$subreddit", "countTotal": {"$count": {}}, "countNsfw": {"$sum": "$over_18"}}},
			{"$match": {"countTotal": {"$gt": 100}}},
		],
		allowDiskUse=True
	)
	log.info(f"Got cursor in {int(time.time() - start_time)} seconds")

	start_time = time.time()
	subreddits = []
	for subreddit in cursor:
		subreddit['percent'] = int((subreddit['countNsfw']/subreddit['countTotal'])*100)
		if subreddit['percent'] >= 10:
			subreddits.append(subreddit)
		count += 1
		if count % 100000 == 0:
			log.info(f"{count:,} in {int(time.time() - start_time)} seconds")

	log.info(f"{count:,} in {int(time.time() - start_time)} seconds")

	file_out = open(r"\\MYCLOUDPR4100\Public\reddit_final\subreddits.txt", 'w')
	for subreddit in sorted(subreddits, key=lambda item: (item['percent'], item['countTotal']), reverse=True):
		file_out.write(f"{subreddit['_id']: <22}{subreddit['countTotal']: <8}{subreddit['countNsfw']: <8}{subreddit['percent']}%\n")
	file_out.close()


# db.comments.createIndex({subreddit:1}) // remove
# db.comments.createIndex({subreddit:1, created_utc:1})
# db.comments.createIndex({author:1, created_utc:1})
# db.comments.createIndex({id:1})
# db.submissions.createIndex({subreddit:1, created_utc:1})
# db.submissions.createIndex({author:1, created_utc:1})
# db.submissions.createIndex({id:1})
# db.submissions.createIndex({created_utc:1})
# db.comments.createIndex({created_utc:1})
62
personal/insert_mongo.py
Normal file
@@ -0,0 +1,62 @@
import utils
import discord_logging
import os
import pymongo
import sys
from datetime import datetime

log = discord_logging.init_logging()


if __name__ == "__main__":
	mongo_address = sys.argv[1]  # 192.168.1.131
	client = pymongo.MongoClient(f"mongodb://{mongo_address}:27017", serverSelectionTimeoutMS=5000)

	log.info(f"Database connected at {mongo_address} on {client.admin.command('serverStatus')['host']}")

	object_type = sys.argv[2]
	input_folder = sys.argv[3]
	input_files = []
	total_size = 0
	for subdir, dirs, files in os.walk(input_folder + os.sep + object_type):
		files.sort()
		for filename in files:
			input_path = os.path.join(subdir, filename)
			if input_path.endswith(".zst"):
				file_size = os.stat(input_path).st_size
				total_size += file_size
				input_files.append([input_path, file_size])

	log.info(f"Processing {len(input_files)} files of {(total_size / (2 ** 30)):.2f} gigabytes")

	collection = client.reddit_database[object_type]

	log.info(f"Using collection {object_type} which has {collection.estimated_document_count()} objects already")

	total_lines = 0
	total_bytes_processed = 0
	for input_file in input_files:
		file_lines = 0
		file_bytes_processed = 0
		created = None
		inserts = []
		for obj, line, file_bytes_processed in utils.read_obj_zst_meta(input_file[0]):
			inserts.append(obj)
			if len(inserts) >= 10000:
				collection.insert_many(inserts)
				inserts = []

			created = datetime.utcfromtimestamp(int(obj['created_utc']))
			file_lines += 1
			if file_lines == 1:
				log.info(f"{created.strftime('%Y-%m-%d %H:%M:%S')} : {file_lines + total_lines:,} : 0% : {(total_bytes_processed / total_size) * 100:.0f}%")
			if file_lines % 100000 == 0:
				log.info(f"{created.strftime('%Y-%m-%d %H:%M:%S')} : {file_lines + total_lines:,} : {(file_bytes_processed / input_file[1]) * 100:.0f}% : {(total_bytes_processed / total_size) * 100:.0f}%")

		if len(inserts) > 0:
			collection.insert_many(inserts)
		total_lines += file_lines
		total_bytes_processed += input_file[1]
		log.info(f"{created.strftime('%Y-%m-%d %H:%M:%S')} : {total_lines:,} : 100% : {(total_bytes_processed / total_size) * 100:.0f}%")

	log.info(f"Total: {total_lines}")
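Review note on insert_mongo.py: insert_many performs an ordered insert by default, which halts the batch at the first error. For a bulk load like this one, an unordered insert is a common speedup; a sketch of that variant of the batch flush, not what the script currently does:

# ordered=False lets the server continue past individual document errors
# (e.g. duplicate keys) and apply the batch more aggressively
collection.insert_many(inserts, ordered=False)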
33
personal/split_by_subreddit.py
Normal file
@@ -0,0 +1,33 @@
import utils
import discord_logging
import os
from datetime import datetime

log = discord_logging.init_logging()


if __name__ == "__main__":
	subreddits = {}
	object_type = "submissions"
	folder = f"\\\\MYCLOUDPR4100\\Public\\reddit_final\\{object_type}"
	if not os.path.exists(folder):
		os.makedirs(folder)
	input_file = f"\\\\MYCLOUDPR4100\\Public\\reddit_final\\relationships_{object_type}.zst"
	input_file_size = os.stat(input_file).st_size
	total_lines = 0
	for comment, line, file_bytes_processed in utils.read_obj_zst_meta(input_file):
		if comment['subreddit'] not in subreddits:
			subreddits[comment['subreddit']] = {'writer': utils.OutputZst(os.path.join(folder, comment['subreddit'] + f"_{object_type}.zst")), 'lines': 0}
		subreddit = subreddits[comment['subreddit']]
		subreddit['writer'].write(line)
		subreddit['writer'].write("\n")
		subreddit['lines'] += 1
		total_lines += 1
		if total_lines % 100000 == 0:
			log.info(f"{total_lines:,} lines, {(file_bytes_processed / input_file_size) * 100:.0f}%")

	log.info(f"{total_lines:,} lines, 100%")

	for name, subreddit in subreddits.items():
		log.info(f"r/{name}: {subreddit['lines']:,} lines")
		subreddit['writer'].close()
25
personal/test_file.py
Normal file
@@ -0,0 +1,25 @@
import utils
import discord_logging
import os
import sys
from datetime import datetime

log = discord_logging.init_logging()


if __name__ == "__main__":
	file_path = r"\\MYCLOUDPR4100\Public\reddit\submissions\RS_2011-01.zst"
	file_size = os.stat(file_path).st_size

	file_lines = 0
	file_bytes_processed = 0
	created = None
	inserts = []
	for obj, line, file_bytes_processed in utils.read_obj_zst_meta(file_path):
		created = datetime.utcfromtimestamp(int(obj['created_utc']))
		file_lines += 1
		if file_lines % 100000 == 0:
			log.info(f"{created.strftime('%Y-%m-%d %H:%M:%S')} : {file_lines:,} : {(file_bytes_processed / file_size) * 100:.0f}%")

	log.info(f"{created.strftime('%Y-%m-%d %H:%M:%S')} : {file_lines:,} : 100%")
60
personal/utils.py
Normal file
@@ -0,0 +1,60 @@
import zstandard
import json


def read_obj_zst(file_name):
	with open(file_name, 'rb') as file_handle:
		buffer = ''
		reader = zstandard.ZstdDecompressor(max_window_size=2**31).stream_reader(file_handle)
		while True:
			chunk = reader.read(2**27).decode()
			if not chunk:
				break
			lines = (buffer + chunk).split("\n")

			for line in lines[:-1]:
				yield json.loads(line)

			buffer = lines[-1]
		reader.close()


def read_obj_zst_meta(file_name):
	with open(file_name, 'rb') as file_handle:
		buffer = ''
		reader = zstandard.ZstdDecompressor(max_window_size=2**31).stream_reader(file_handle)
		while True:
			chunk = reader.read(2**27).decode()
			if not chunk:
				break
			lines = (buffer + chunk).split("\n")

			for line in lines[:-1]:
				try:
					json_object = json.loads(line)
				except (KeyError, json.JSONDecodeError):
					continue
				yield json_object, line, file_handle.tell()

			buffer = lines[-1]
		reader.close()


class OutputZst:
	def __init__(self, file_name):
		output_file = open(file_name, 'wb')
		self.writer = zstandard.ZstdCompressor().stream_writer(output_file)

	def write(self, line):
		encoded_line = line.encode('utf-8')
		self.writer.write(encoded_line)

	def close(self):
		self.writer.close()

	def __enter__(self):
		return self

	def __exit__(self, exc_type, exc_value, exc_traceback):
		self.close()
		# returning True suppresses any exception raised inside the with block
		return True
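Review note on utils.py: a minimal sketch of how these helpers compose, filtering one dump into a new compressed dump. The paths and the author check are illustrative assumptions, not part of this commit:

import json
import utils

# OutputZst defines __enter__/__exit__, so it works as a context manager
with utils.OutputZst("filtered_comments.zst") as writer:
	for comment in utils.read_obj_zst("some_subreddit_comments.zst"):
		if comment.get("author") == "example_user":
			writer.write(json.dumps(comment, separators=(',', ':')) + "\n")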