Mirror of https://github.com/Watchful1/PushshiftDumps.git, synced 2025-07-25 15:45:19 -04:00
Add from/to
This commit is contained in:
parent
3984f0ecfc
commit
15d574e198
3 changed files with 75 additions and 47 deletions
@@ -139,6 +139,51 @@ def process_file(file, queue):
 	queue.put(file)
 
 
+def process_update(input_files, queue, last_log_time, force_write):
+	file_update = queue.get()
+	if file_update.error_message is not None:
+		log.warning(f"File failed {file_update.input_path}: {file_update.error_message}")
+	current_time = time.time()
+
+	input_files[file_update.input_path] = file_update
+	if force_write or last_log_time is None or (current_time - last_log_time) > 5 or queue.empty():
+		total_lines_processed = 0
+		total_bytes_processed = 0
+		total_lines_errored = 0
+		files_processed = 0
+		files_errored = 0
+		i = 0
+		for file in input_files.values():
+			total_lines_processed += file.lines_processed
+			total_bytes_processed += file.bytes_processed
+			total_lines_errored += file.error_lines
+			files_processed += 1 if file.complete or file.error_message is not None else 0
+			files_errored += 1 if file.error_message is not None else 0
+			i += 1
+		if file_update.complete or file_update.error_message is not None:
+			save_file_list(input_files, status_json, script_type)
+		progress_queue.put([current_time, total_lines_processed, total_bytes_processed])
+
+		first_time, first_lines, first_bytes = progress_queue.peek()
+		bytes_per_second = int((total_bytes_processed - first_bytes)/(current_time - first_time))
+		speed_queue.put(bytes_per_second)
+		seconds_left = int((total_bytes - total_bytes_processed) / int(sum(speed_queue.list) / len(speed_queue.list)))
+		minutes_left = int(seconds_left / 60)
+		hours_left = int(minutes_left / 60)
+		days_left = int(hours_left / 24)
+
+		log.info(
+			f"{total_lines_processed:,} lines at {(total_lines_processed - first_lines)/(current_time - first_time):,.0f}/s, {total_lines_errored:,} errored : "
+			f"{(total_bytes_processed / (2**30)):.2f} gb at {(bytes_per_second / (2**20)):,.0f} mb/s, {(total_bytes_processed / total_bytes) * 100:.0f}% : "
+			f"{files_processed}({files_errored})/{len(input_files)} files : "
+			f"{(str(days_left) + 'd ' if days_left > 0 else '')}{hours_left - (days_left * 24)}:{minutes_left - (hours_left * 60):02}:{seconds_left - (minutes_left * 60):02} remaining : "
+			f"{queue.qsize()} files in queue : {current_time} : {last_log_time} : {current_time - last_log_time if last_log_time is not None else 0} : "
+			f"{(current_time - last_log_time) > 5 if last_log_time is not None else 0} : {int(current_time - last_log_time) > 5 if last_log_time is not None else 0} : "
+			f"{last_log_time is None or (current_time - last_log_time) > 5 or queue.empty()} : {queue.empty()}")
+		last_log_time = time.time()
+	return last_log_time
+
+
 if __name__ == '__main__':
 	parser = argparse.ArgumentParser(description="Use multiple processes to decompress and iterate over pushshift dump files")
 	parser.add_argument("input", help="The input folder to recursively read files from")
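Note: the "remaining" field in the new log line is plain modular arithmetic on the seconds_left estimate. A minimal standalone sketch of that breakdown (not part of the commit; format_remaining is a hypothetical helper):

def format_remaining(seconds_left):
	# split a seconds estimate into d/h:mm:ss, same arithmetic as the log line above
	minutes_left = seconds_left // 60
	hours_left = minutes_left // 60
	days_left = hours_left // 24
	return (
		(f"{days_left}d " if days_left > 0 else "")
		+ f"{hours_left - days_left * 24}:{minutes_left - hours_left * 60:02}:{seconds_left - minutes_left * 60:02}"
	)

print(format_remaining(93784))  # 93784 seconds -> "1d 2:03:04"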
@@ -209,47 +254,10 @@ if __name__ == '__main__':
 		workers = pool.starmap_async(process_file, [(file, queue) for file in files_to_process], chunksize=1, error_callback=log.info)
 		while not workers.ready():
 			# loop until the workers are all done, pulling in status messages as they are sent
-			file_update = queue.get()
-			if file_update.error_message is not None:
-				log.warning(f"File failed {file_update.input_path}: {file_update.error_message}")
-			current_time = time.time()
-
-			input_files[file_update.input_path] = file_update
-			if last_log_time is None or (current_time - last_log_time) > 5 or queue.empty():
-				total_lines_processed = 0
-				total_bytes_processed = 0
-				total_lines_errored = 0
-				files_processed = 0
-				files_errored = 0
-				i = 0
-				for file in input_files.values():
-					total_lines_processed += file.lines_processed
-					total_bytes_processed += file.bytes_processed
-					total_lines_errored += file.error_lines
-					files_processed += 1 if file.complete or file.error_message is not None else 0
-					files_errored += 1 if file.error_message is not None else 0
-					i += 1
-				if file_update.complete or file_update.error_message is not None:
-					save_file_list(input_files, status_json, script_type)
-				progress_queue.put([current_time, total_lines_processed, total_bytes_processed])
-
-				first_time, first_lines, first_bytes = progress_queue.peek()
-				bytes_per_second = int((total_bytes_processed - first_bytes)/(current_time - first_time))
-				speed_queue.put(bytes_per_second)
-				seconds_left = int((total_bytes - total_bytes_processed) / int(sum(speed_queue.list) / len(speed_queue.list)))
-				minutes_left = int(seconds_left / 60)
-				hours_left = int(minutes_left / 60)
-				days_left = int(hours_left / 24)
-
-				log.info(
-					f"{total_lines_processed:,} lines at {(total_lines_processed - first_lines)/(current_time - first_time):,.0f}/s, {total_lines_errored:,} errored : "
-					f"{(total_bytes_processed / (2**30)):.2f} gb at {(bytes_per_second / (2**20)):,.0f} mb/s, {(total_bytes_processed / total_bytes) * 100:.0f}% : "
-					f"{files_processed}({files_errored})/{len(input_files)} files : "
-					f"{(str(days_left) + 'd ' if days_left > 0 else '')}{hours_left - (days_left * 24)}:{minutes_left - (hours_left * 60):02}:{seconds_left - (minutes_left * 60):02} remaining : "
-					f"{queue.qsize()} files in queue : {current_time} : {last_log_time} : {current_time - last_log_time if last_log_time is not None else 0} : "
-					f"{(current_time - last_log_time) > 5 if last_log_time is not None else 0} : {int(current_time - last_log_time) > 5 if last_log_time is not None else 0} : "
-					f"{last_log_time is None or (current_time - last_log_time) > 5 or queue.empty()} : {queue.empty()}")
-				last_log_time = time.time()
+			last_log_time = process_update(input_files, queue, last_log_time, False)
+
+		while not queue.empty():
 
 		log.info(f"{total_lines_processed:,}, {total_lines_errored} errored : {(total_bytes_processed / (2**30)):.2f} gb, {(total_bytes_processed / total_bytes) * 100:.0f}% : {files_processed}/{len(input_files)}")
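With the bookkeeping factored out into process_update, the main loop collapses to calls that return the updated last_log_time. A rough sketch of the intended control flow, using the names from the diff above (not runnable on its own; the body of the final drain loop is an assumption and is not visible in the hunk):

while not workers.ready():
	# pull in status messages while the worker processes are still running
	last_log_time = process_update(input_files, queue, last_log_time, False)
while not queue.empty():
	# assumed: drain updates that arrived after the workers finished
	last_log_time = process_update(input_files, queue, last_log_time, False)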
@@ -12,11 +12,10 @@ import json
 # change the subreddits to the list of subreddits, one per line. The case must exactly match, ie, for r/AskReddit, put "AskReddit"
 # the files in the folder must match the format from the torrent, subreddit_type.zst, like AskReddit_comments.zst
 # the script will look for both comments and submissions files for each subreddit
-folder = r"\\MYCLOUDPR4100\Public\reddit\subreddits23"
+folder = r"\\MYCLOUDPR4100\Public\reddit\subreddits24"
 subreddits_string = """
-	TheSimpsons
-	Askmen
-	Seinfeld
+	navy
+	beetle
 """
 ignored_users = {'[deleted]', 'automoderator'}
 # this is a list of users to ignore when doing the comparison. Most popular bots post in many subreddits and aren't the person you're looking for
@@ -25,6 +24,8 @@ ignored_users_file = "ignored.txt"
 min_comments_per_sub = 1
 output_file_name = "users.txt"
 require_first_subreddit = False # if true, print users that occur in the first subreddit and any one of the following ones. Otherwise just find the most overlap between all subs
+from_date = datetime.strptime("2005-01-01", "%Y-%m-%d")
+to_date = datetime.strptime("2030-12-31", "%Y-%m-%d")
 
 
 # sets up logging to the console as well as a file
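Both bounds are parsed by datetime.strptime into naive datetime objects, which keeps them directly comparable with the naive values datetime.utcfromtimestamp returns later in the script. A quick standalone check, not from the commit:

from datetime import datetime

from_date = datetime.strptime("2005-01-01", "%Y-%m-%d")
print(from_date)         # 2005-01-01 00:00:00
print(from_date.tzinfo)  # None - naive, so it compares cleanly against utcfromtimestamp() results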
@@ -74,7 +75,7 @@ def read_lines_zst(file_name):
 	reader.close()
 
 
-def get_commenters_from_file(subreddit, subreddit_file, subreddit_commenters, total_lines, files_status):
+def get_commenters_from_file(subreddit, subreddit_file, subreddit_commenters, total_lines, files_status, from_date, to_date):
 	file_lines = 0
 	created = None
 	file_size = os.stat(subreddit_file).st_size
@@ -87,6 +88,8 @@ def get_commenters_from_file(subreddit, subreddit_file, subreddit_commenters, to
 		try:
 			obj = json.loads(line)
 			created = datetime.utcfromtimestamp(int(obj['created_utc']))
+			if created < from_date or created > to_date:
+				continue
 
 			if obj['author'].lower() not in ignored_users:
 				subreddit_commenters[obj['author']] += 1
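A minimal standalone sketch of the new filter, using a hypothetical created_utc value, to show which records get skipped:

from datetime import datetime

from_date = datetime.strptime("2005-01-01", "%Y-%m-%d")
to_date = datetime.strptime("2030-12-31", "%Y-%m-%d")

created = datetime.utcfromtimestamp(1136073600)  # 2006-01-01 00:00:00 UTC, a hypothetical created_utc
if created < from_date or created > to_date:
	print("skipped: outside the from/to window")
else:
	print("counted: inside the window, so the author is tallied")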
@@ -171,8 +174,10 @@ if __name__ == "__main__":
 				subreddit_stat[file_type],
 				commenters,
 				total_lines,
-				f"{files_processed}|{len(subreddit_stats)}")
+				f"{files_processed}|{len(subreddit_stats)}",
+				from_date,
+				to_date
+			)
 			for commenter in commenters:
 				if require_first_subreddit and not is_first and commenter not in commenterSubreddits:
 					continue
@@ -199,6 +204,7 @@ if __name__ == "__main__":
 
 	with open(output_file_name, 'w') as txt:
 		log.info(f"Writing output to {output_file_name}")
+		txt.write(f"Commenters in subreddits {(', '.join(subreddits))}\n")
 		for i in range(len(subreddits)):
 			commenters = len(sharedCommenters[len(subreddits) - i])
 			inner_str = f"but {i} " if i != 0 else ""
@@ -156,3 +156,17 @@ The-Worst-Bot
 theHelperdroid
 VredditDownloader
 YOUREABOT
+YTubeInfoBot
+URLfixerBot
+TweetsInCommentsBot
+SovietRussiaBot
+ShibeBot
+PressFBot
+LittleHelperRobot
+LinkFixerBot
+LinkFixerBotSnr
+Link_Demobilizer
+LazyLinkerBot
+Darnit_Bot
+checks_out_bot
+HippoBot9000