Mirror of https://github.com/Watchful1/PushshiftDumps.git, synced 2025-07-05 03:46:54 -04:00

Initial commit
Commit: bd7378ff91
7 changed files with 525 additions and 0 deletions
.gitignore  (vendored, new file, 6 lines)
@@ -0,0 +1,6 @@
.idea/*
logs/*
__pycache__/*
*.db
*.ini
*.txt
Pipfile  (new file, 12 lines)
@@ -0,0 +1,12 @@
[[source]]
url = "https://pypi.org/simple"
verify_ssl = true
name = "pypi"

[packages]
zstandard = "*"

[dev-packages]

[requires]
python_version = "3.8"
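The only runtime dependency declared above is python-zstandard, which the scripts in this commit use to stream-decompress the .zst dump files. As a small illustration (a sketch, not part of the commit), the round-trip below shows the newline-delimited JSON format the dumps use; the field names mirror the ones the scripts read, and the record values are made up:

import json
import zstandard

# two made-up ndjson records using fields the scripts below actually read
lines = [
	json.dumps({"subreddit": "pushshift", "created_utc": 1614556800}),
	json.dumps({"subreddit": "wallstreetbets", "created_utc": 1614556801}),
]
compressed = zstandard.ZstdCompressor().compress(("\n".join(lines) + "\n").encode())

# decompress and parse each line back out
for line in zstandard.ZstdDecompressor().decompress(compressed).decode().splitlines():
	print(json.loads(line)["subreddit"])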
Pipfile.lock  (generated, new file, 75 lines)
@@ -0,0 +1,75 @@
{
    "_meta": {
        "hash": {
            "sha256": "03662c664bce0cd9fad7ea3062bb09bfe999f755a00e0f4de720c66955ccae62"
        },
        "pipfile-spec": 6,
        "requires": {
            "python_version": "3.8"
        },
        "sources": [
            {
                "name": "pypi",
                "url": "https://pypi.org/simple",
                "verify_ssl": true
            }
        ]
    },
    "default": {
        "zstandard": {
            "hashes": [
                "sha256:1c5ef399f81204fbd9f0df3debf80389fd8aa9660fe1746d37c80b0d45f809e9",
                "sha256:1faefe33e3d6870a4dce637bcb41f7abb46a1872a595ecc7b034016081c37543",
                "sha256:1fb23b1754ce834a3a1a1e148cc2faad76eeadf9d889efe5e8199d3fb839d3c6",
                "sha256:22f127ff5da052ffba73af146d7d61db874f5edb468b36c9cb0b857316a21b3d",
                "sha256:2353b61f249a5fc243aae3caa1207c80c7e6919a58b1f9992758fa496f61f839",
                "sha256:24cdcc6f297f7c978a40fb7706877ad33d8e28acc1786992a52199502d6da2a4",
                "sha256:31e35790434da54c106f05fa93ab4d0fab2798a6350e8a73928ec602e8505836",
                "sha256:3547ff4eee7175d944a865bbdf5529b0969c253e8a148c287f0668fe4eb9c935",
                "sha256:378ac053c0cfc74d115cbb6ee181540f3e793c7cca8ed8cd3893e338af9e942c",
                "sha256:3e1cd2db25117c5b7c7e86a17cde6104a93719a9df7cb099d7498e4c1d13ee5c",
                "sha256:3fe469a887f6142cc108e44c7f42c036e43620ebaf500747be2317c9f4615d4f",
                "sha256:4800ab8ec94cbf1ed09c2b4686288750cab0642cb4d6fba2a56db66b923aeb92",
                "sha256:52de08355fd5cfb3ef4533891092bb96229d43c2069703d4aff04fdbedf9c92f",
                "sha256:5752f44795b943c99be367fee5edf3122a1690b0d1ecd1bd5ec94c7fd2c39c94",
                "sha256:5d53f02aeb8fdd48b88bc80bece82542d084fb1a7ba03bf241fd53b63aee4f22",
                "sha256:69b7a5720b8dfab9005a43c7ddb2e3ccacbb9a2442908ae4ed49dd51ab19698a",
                "sha256:6cc162b5b6e3c40b223163a9ea86cd332bd352ddadb5fd142fc0706e5e4eaaff",
                "sha256:6f5d0330bc992b1e267a1b69fbdbb5ebe8c3a6af107d67e14c7a5b1ede2c5945",
                "sha256:6ffadd48e6fe85f27ca3ca10cfd3ef3d0f933bef7316870285ffeb58d791ca9c",
                "sha256:72a011678c654df8323aa7b687e3147749034fdbe994d346f139ab9702b59cea",
                "sha256:77d26452676f471223571efd73131fd4a626622c7960458aab2763e025836fc5",
                "sha256:7a88cc773ffe55992ff7259a8df5fb3570168d7138c69aadba40142d0e5ce39a",
                "sha256:7b16bd74ae7bfbaca407a127e11058b287a4267caad13bd41305a5e630472549",
                "sha256:855d95ec78b6f0ff66e076d5461bf12d09d8e8f7e2b3fc9de7236d1464fd730e",
                "sha256:8baf7991547441458325ca8fafeae79ef1501cb4354022724f3edd62279c5b2b",
                "sha256:8fb77dd152054c6685639d855693579a92f276b38b8003be5942de31d241ebfb",
                "sha256:92d49cc3b49372cfea2d42f43a2c16a98a32a6bc2f42abcde121132dbfc2f023",
                "sha256:94d0de65e37f5677165725f1fc7fb1616b9542d42a9832a9a0bdcba0ed68b63b",
                "sha256:9867206093d7283d7de01bd2bf60389eb4d19b67306a0a763d1a8a4dbe2fb7c3",
                "sha256:9ee3c992b93e26c2ae827404a626138588e30bdabaaf7aa3aa25082a4e718790",
                "sha256:a4f8af277bb527fa3d56b216bda4da931b36b2d3fe416b6fc1744072b2c1dbd9",
                "sha256:ab9f19460dfa4c5dd25431b75bee28b5f018bf43476858d64b1aa1046196a2a0",
                "sha256:ac43c1821ba81e9344d818c5feed574a17f51fca27976ff7d022645c378fbbf5",
                "sha256:af5a011609206e390b44847da32463437505bf55fd8985e7a91c52d9da338d4b",
                "sha256:b0975748bb6ec55b6d0f6665313c2cf7af6f536221dccd5879b967d76f6e7899",
                "sha256:b4963dad6cf28bfe0b61c3265d1c74a26a7605df3445bfcd3ba25de012330b2d",
                "sha256:b7d3a484ace91ed827aa2ef3b44895e2ec106031012f14d28bd11a55f24fa734",
                "sha256:bd3c478a4a574f412efc58ba7e09ab4cd83484c545746a01601636e87e3dbf23",
                "sha256:c9e2dcb7f851f020232b991c226c5678dc07090256e929e45a89538d82f71d2e",
                "sha256:d25c8eeb4720da41e7afbc404891e3a945b8bb6d5230e4c53d23ac4f4f9fc52c",
                "sha256:dc8c03d0c5c10c200441ffb4cce46d869d9e5c4ef007f55856751dc288a2dffd",
                "sha256:ec58e84d625553d191a23d5988a19c3ebfed519fff2a8b844223e3f074152163",
                "sha256:eda0719b29792f0fea04a853377cfff934660cb6cd72a0a0eeba7a1f0df4a16e",
                "sha256:edde82ce3007a64e8434ccaf1b53271da4f255224d77b880b59e7d6d73df90c8",
                "sha256:f36722144bc0a5068934e51dca5a38a5b4daac1be84f4423244277e4baf24e7a",
                "sha256:f8bb00ced04a8feff05989996db47906673ed45b11d86ad5ce892b5741e5f9dd",
                "sha256:f98fc5750aac2d63d482909184aac72a979bfd123b112ec53fd365104ea15b1c",
                "sha256:ff5b75f94101beaa373f1511319580a010f6e03458ee51b1a386d7de5331440a"
            ],
            "index": "pypi",
            "version": "==0.15.2"
        }
    },
    "develop": {}
}
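The lock file pins zstandard to 0.15.2. A quick sanity check (a sketch, not in the commit) to confirm the installed build matches the pin before running the scripts:

import zstandard

# expect the version pinned in Pipfile.lock ("==0.15.2")
print(zstandard.__version__)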
README.md  (new file, 1 line)
@@ -0,0 +1 @@
coming soon
scripts/combine_folder_multiprocess.py  (new file, 314 lines)
@@ -0,0 +1,314 @@
import zstandard
import os
import json
import sys
import time
import argparse
from datetime import datetime
import logging.handlers
import multiprocessing


# sets up logging to the console as well as a file
log = logging.getLogger("bot")
log.setLevel(logging.INFO)
log_formatter = logging.Formatter('%(asctime)s - %(levelname)s: %(message)s')

log_stderr_handler = logging.StreamHandler()
log_stderr_handler.setFormatter(log_formatter)
log.addHandler(log_stderr_handler)
if not os.path.exists("logs"):
	os.makedirs("logs")
log_file_handler = logging.handlers.RotatingFileHandler(
	os.path.join("logs", "bot.log"), maxBytes=1024*1024*16, backupCount=5)
log_file_handler.setFormatter(log_formatter)
log.addHandler(log_file_handler)


# this script iterates through zst compressed ndjson files, like the pushshift reddit dumps, loads each line
# and compares the configured field against the given value; if it matches for a line, it's written out into a
# separate file for that month. After all the ndjson files are processed, it iterates through the resulting
# files and combines them into a final file.

# once complete, the combined file can easily be processed like
# with open(file_path, 'r') as file:
# 	for line in file:
# 		obj = json.loads(line)

# features:
# - multiple processes in parallel to maximize drive read and decompression
# - saves state as it completes each file and picks up where it stopped
# - detailed progress indicators


# convenience object used to pass status information between processes
class FileConfig:
	def __init__(self, input_path, output_path=None, complete=False, lines_processed=0, error_lines=0):
		self.input_path = input_path
		self.output_path = output_path
		self.file_size = os.stat(input_path).st_size
		self.complete = complete
		self.bytes_processed = self.file_size if complete else 0
		self.lines_processed = lines_processed if complete else 0
		self.error_message = None
		self.error_lines = error_lines

	def __str__(self):
		return f"{self.input_path} : {self.output_path} : {self.file_size} : {self.complete} : {self.bytes_processed} : {self.lines_processed}"


# used for calculating running average of read speed
class Queue:
	def __init__(self, max_size):
		self.list = []
		self.max_size = max_size

	def put(self, item):
		if len(self.list) >= self.max_size:
			self.list.pop(0)
		self.list.append(item)

	def peek(self):
		return self.list[0] if len(self.list) > 0 else None


# builds file paths
def folder_helper(output_folder, output_file_name):
	working_folder = os.path.join(output_folder, "pushshift_working_dir_" + output_file_name)
	status_file = os.path.join(working_folder, output_file_name + ".json")
	return working_folder, status_file


# save file information and progress to a json file
# we don't want to save the whole FileConfig object, since some of the info resets if we restart
def save_file_list(input_files, output_folder, output_file_name):
	working_folder, status_json_file_name = folder_helper(output_folder, output_file_name)
	if not os.path.exists(working_folder):
		os.makedirs(working_folder)
	simple_file_list = []
	for file in input_files:
		simple_file_list.append([file.input_path, file.output_path, file.complete, file.lines_processed, file.error_lines])
	with open(status_json_file_name, 'w') as status_json_file:
		status_json_file.write(json.dumps(simple_file_list, indent=4))


# load file information from the json file and recalculate file sizes
def load_file_list(output_folder, output_file_name):
	_, status_json_file_name = folder_helper(output_folder, output_file_name)
	if os.path.exists(status_json_file_name):
		with open(status_json_file_name, 'r') as status_json_file:
			simple_file_list = json.load(status_json_file)
		input_files = []
		for simple_file in simple_file_list:
			input_files.append(
				FileConfig(simple_file[0], simple_file[1], simple_file[2], simple_file[3], simple_file[4])
			)
		return input_files
	else:
		return None


# open a zst compressed ndjson file and yield lines one at a time
# also passes back file progress
def read_lines_zst(file_name):
	with open(file_name, 'rb') as file_handle:
		buffer = ''
		reader = zstandard.ZstdDecompressor(max_window_size=2**31).stream_reader(file_handle)
		while True:
			chunk = reader.read(2**27).decode()
			if not chunk:
				break
			lines = (buffer + chunk).split("\n")

			for line in lines[:-1]:
				yield line, file_handle.tell()

			buffer = lines[-1]
		reader.close()


# base of each separate process. Loads a file, iterates through lines and writes out
# the ones where the field matches the value. Also passes status information back to the parent via a queue
def process_file(file, working_folder, queue, field, value):
	output_file = None
	try:
		for line, file_bytes_processed in read_lines_zst(file.input_path):
			try:
				obj = json.loads(line)
				if obj[field] == value:
					if output_file is None:
						if file.output_path is None:
							created = datetime.utcfromtimestamp(int(obj['created_utc']))
							file.output_path = os.path.join(working_folder, created.strftime("%Y-%m"))
						output_file = open(file.output_path, 'w')
					output_file.write(line)
					output_file.write("\n")
			except (KeyError, json.JSONDecodeError) as err:
				file.error_lines += 1
			file.lines_processed += 1
			if file.lines_processed % 1000000 == 0:
				file.bytes_processed = file_bytes_processed
				queue.put(file)

		if output_file is not None:
			output_file.close()

		file.complete = True
		file.bytes_processed = file.file_size
	except Exception as err:
		file.error_message = str(err)
	queue.put(file)


if __name__ == '__main__':
	parser = argparse.ArgumentParser(description="Use multiple processes to decompress and iterate over pushshift dump files")
	parser.add_argument("input", help="The input folder to read files from")
	parser.add_argument("output", help="The output folder to store temporary files in and write the output to")
	parser.add_argument("--name", help="What to name the output file", default="pushshift")
	parser.add_argument("--field", help="When deciding what lines to keep, use this field for comparisons", default="subreddit")
	parser.add_argument("--value", help="When deciding what lines to keep, compare the field to this value", default="pushshift")
	parser.add_argument("--processes", help="Number of processes to use", default=10, type=int)
	parser.add_argument(
		"--error_rate", help=
		"Percentage as an integer from 0 to 100 of the lines where the field can be missing. For the subreddit field especially, "
		"there are a number of posts that simply don't have a subreddit attached", default=1, type=int)
	parser.add_argument("--debug", help="Enable debug logging", action='store_const', const=True, default=False)
	args = parser.parse_args()

	if args.debug:
		log.setLevel(logging.DEBUG)

	log.info(f"Loading files from: {args.input}")
	log.info(f"Writing output to: {(os.path.join(args.output, args.name + '.txt'))}")

	multiprocessing.set_start_method('spawn')
	queue = multiprocessing.Manager().Queue()
	input_files = load_file_list(args.output, args.name)
	working_folder, _ = folder_helper(args.output, args.name)
	# if the file list wasn't loaded from the json, this is the first run, find what files we need to process
	if input_files is None:
		input_files = []
		for subdir, dirs, files in os.walk(args.input):
			files.sort()
			for file_name in files:
				if file_name.endswith(".zst"):
					input_path = os.path.join(subdir, file_name)
					input_files.append(FileConfig(input_path))

		save_file_list(input_files, args.output, args.name)

	files_processed = 0
	total_bytes = 0
	total_bytes_processed = 0
	total_lines_processed = 0
	files_to_process = []
	# calculate the total file size for progress reports, build a list of incomplete files to process
	for file in input_files:
		total_bytes += file.file_size
		if file.complete:
			files_processed += 1
			total_lines_processed += file.lines_processed
			total_bytes_processed += file.file_size
		else:
			files_to_process.append(file)

	log.info(f"Processed {files_processed} of {len(input_files)} files with {(total_bytes_processed / (2**30)):.2f} of {(total_bytes / (2**30)):.2f} gigabytes")

	start_time = time.time()
	if len(files_to_process):
		progress_queue = Queue(40)
		progress_queue.put([start_time, total_lines_processed, total_bytes_processed])
		speed_queue = Queue(40)
		for file in files_to_process:
			log.debug(f"Processing file: {file.input_path}")
		# start the workers
		with multiprocessing.Pool(processes=min(args.processes, len(files_to_process))) as pool:
			workers = pool.starmap_async(process_file, [(file, working_folder, queue, args.field, args.value) for file in files_to_process], error_callback=log.info)
			while not workers.ready():
				# loop until the workers are all done, pulling in status messages as they are sent
				file_update = queue.get()
				if file_update.error_message is not None:
					log.warning(f"File failed {file_update.input_path}: {file_update.error_message}")
					continue
				# I'm going to assume that the list of files is short enough that it's no
				# big deal to just iterate each time since that saves a bunch of work
				total_lines_processed = 0
				total_bytes_processed = 0
				total_lines_errored = 0
				files_processed = 0
				i = 0
				for file in input_files:
					if file.input_path == file_update.input_path:
						input_files[i] = file_update
						file = file_update
					total_lines_processed += file.lines_processed
					total_bytes_processed += file.bytes_processed
					total_lines_errored += file.error_lines
					files_processed += 1 if file.complete else 0
					i += 1
				if file_update.complete:
					save_file_list(input_files, args.output, args.name)
				current_time = time.time()
				progress_queue.put([current_time, total_lines_processed, total_bytes_processed])

				first_time, first_lines, first_bytes = progress_queue.peek()
				bytes_per_second = int((total_bytes_processed - first_bytes)/(current_time - first_time))
				speed_queue.put(bytes_per_second)
				seconds_left = int((total_bytes - total_bytes_processed) / int(sum(speed_queue.list) / len(speed_queue.list)))
				minutes_left = int(seconds_left / 60)
				hours_left = int(minutes_left / 60)
				days_left = int(hours_left / 24)

				log.info(
					f"{total_lines_processed:,} lines at {(total_lines_processed - first_lines)/(current_time - first_time):,.0f}/s, {total_lines_errored:,} errored : "
					f"{(total_bytes_processed / (2**30)):.2f} gb at {(bytes_per_second / (2**20)):,.0f} mb/s, {(total_bytes_processed / total_bytes) * 100:.0f}% : "
					f"{files_processed}/{len(input_files)} files : "
					f"{(str(days_left) + 'd ' if days_left > 0 else '')}{hours_left - (days_left * 24)}:{minutes_left - (hours_left * 60):02}:{seconds_left - (minutes_left * 60):02} remaining")

	log.info(f"{total_lines_processed:,}, {total_lines_errored} errored : {(total_bytes_processed / (2**30)):.2f} gb, {(total_bytes_processed / total_bytes) * 100:.0f}% : {files_processed}/{len(input_files)}")

	working_file_paths = []
	count_incomplete = 0
	# build a list of output files to combine
	for file in input_files:
		if not file.complete:
			log.info(f"File {file.input_path} is not marked as complete")
			count_incomplete += 1
		else:
			if file.error_lines > file.lines_processed * (args.error_rate * 0.01):
				log.info(
					f"File {file.input_path} has {file.error_lines:,} errored lines out of {file.lines_processed:,}, "
					f"{(file.error_lines / file.lines_processed) * (args.error_rate * 0.01):.2f}% which is above the limit of {args.error_rate}%")
				count_incomplete += 1
			elif file.output_path is not None:
				if not os.path.exists(file.output_path):
					log.info(f"Output file {file.output_path} doesn't exist")
					count_incomplete += 1
				else:
					working_file_paths.append(file.output_path)

	if count_incomplete > 0:
		log.info(f"{count_incomplete} files were not completed, errored or don't exist, something went wrong. Aborting")
		sys.exit()

	log.info(f"Processing complete, combining {len(working_file_paths)} result files")

	output_lines = 0
	output_file_path = os.path.join(args.output, args.name + ".txt")
	# combine all the output files into the final results file
	with open(output_file_path, 'w') as output_file:
		i = 0
		for working_file_path in working_file_paths:
			i += 1
			log.info(f"Reading {i}/{len(working_file_paths)}")
			with open(working_file_path, 'r') as input_file:
				for line in input_file.readlines():
					output_lines += 1
					output_file.write(line)

	log.info(f"Finished combining files, {output_lines:,} lines written to {output_file_path}")


# test file sorting
# compress results
# example command line call in comment
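The header comment above notes that the combined output can be read back line by line, and the trailing TODO asks for an example command line. As a hedged sketch (the paths in angle brackets are placeholders, and the flags are the ones defined in the argparse block above), an invocation might look like `python scripts/combine_folder_multiprocess.py <dump_folder> <output_folder> --name wallstreetbets --field subreddit --value wallstreetbets --processes 8`, after which the resulting `<output_folder>/wallstreetbets.txt` could be consumed like this:

import json
from collections import defaultdict
from datetime import datetime

# count lines per month in the combined ndjson output; the file name is a placeholder
counts = defaultdict(int)
with open("wallstreetbets.txt", 'r') as file:
	for line in file:
		obj = json.loads(line)
		month = datetime.utcfromtimestamp(int(obj['created_utc'])).strftime("%Y-%m")
		counts[month] += 1

for month, count in sorted(counts.items()):
	print(f"{month}: {count:,}")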
scripts/iterate_folder.py  (new file, 62 lines)
@@ -0,0 +1,62 @@
import zstandard
import os
import json
import sys
from datetime import datetime
import logging.handlers


log = logging.getLogger("bot")
log.setLevel(logging.DEBUG)
log.addHandler(logging.StreamHandler())


def read_lines_zst(file_name):
	with open(file_name, 'rb') as file_handle:
		buffer = ''
		reader = zstandard.ZstdDecompressor(max_window_size=2**28).stream_reader(file_handle)
		while True:
			chunk = reader.read(2**24).decode()  # read a 16mb chunk at a time. There are some really big comments
			if not chunk:
				break
			lines = (buffer + chunk).split("\n")

			for line in lines[:-1]:
				yield line, file_handle.tell()

			buffer = lines[-1]
		reader.close()


input_folder = sys.argv[1]
input_files = []
total_size = 0
for subdir, dirs, files in os.walk(input_folder):
	for filename in files:
		input_path = os.path.join(subdir, filename)
		if input_path.endswith(".zst"):
			file_size = os.stat(input_path).st_size
			total_size += file_size
			input_files.append([input_path, file_size])

log.info(f"Processing {len(input_files)} files of {(total_size / (2**30)):.2f} gigabytes")

total_lines = 0
total_bytes_processed = 0
for input_file in input_files:
	file_lines = 0
	file_bytes_processed = 0
	created = None
	for line, file_bytes_processed in read_lines_zst(input_file[0]):
		obj = json.loads(line)
		created = datetime.utcfromtimestamp(int(obj['created_utc']))
		file_lines += 1
		if file_lines == 1:
			log.info(f"{created.strftime('%Y-%m-%d %H:%M:%S')} : {file_lines + total_lines:,} : 0% : {(total_bytes_processed / total_size) * 100:.0f}%")
		if file_lines % 100000 == 0:
			log.info(f"{created.strftime('%Y-%m-%d %H:%M:%S')} : {file_lines + total_lines:,} : {(file_bytes_processed / input_file[1]) * 100:.0f}% : {(total_bytes_processed / total_size) * 100:.0f}%")
	total_lines += file_lines
	total_bytes_processed += input_file[1]
	log.info(f"{created.strftime('%Y-%m-%d %H:%M:%S')} : {total_lines:,} : 100% : {(total_bytes_processed / total_size) * 100:.0f}%")

log.info(f"Total: {total_lines}")
scripts/single_file.py  (new file, 55 lines)
@@ -0,0 +1,55 @@
import zstandard
import os
import json
import sys
from datetime import datetime
import logging.handlers


log = logging.getLogger("bot")
log.setLevel(logging.DEBUG)
log.addHandler(logging.StreamHandler())


def read_lines_zst(file_name):
	with open(file_name, 'rb') as file_handle:
		buffer = ''
		reader = zstandard.ZstdDecompressor(max_window_size=2**31).stream_reader(file_handle)
		while True:
			chunk = reader.read(2**27).decode()
			if not chunk:
				break
			lines = (buffer + chunk).split("\n")

			for line in lines[:-1]:
				yield line, file_handle.tell()

			buffer = lines[-1]
		reader.close()


if __name__ == "__main__":
	file_path = r"\\MYCLOUDPR4100\Public\reddit\submissions\RS_2013-03.zst"
	file_size = os.stat(file_path).st_size
	file_lines = 0
	file_bytes_processed = 0
	created = None
	field = "subreddit"
	value = "wallstreetbets"
	bad_lines = 0
	try:
		for line, file_bytes_processed in read_lines_zst(file_path):
			try:
				obj = json.loads(line)
				created = datetime.utcfromtimestamp(int(obj['created_utc']))
				temp = obj[field] == value
			except (KeyError, json.JSONDecodeError) as err:
				bad_lines += 1
			file_lines += 1
			if file_lines % 100000 == 0:
				log.info(f"{created.strftime('%Y-%m-%d %H:%M:%S')} : {file_lines:,} : {bad_lines:,} : {(file_bytes_processed / file_size) * 100:.0f}%")
	except Exception as err:
		log.info(err)

	log.info(f"Complete : {file_lines:,} : {bad_lines:,}")
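single_file.py computes `obj[field] == value` but discards the result, so as committed it measures read speed and error rate rather than filtering anything. A small self-contained variation (a sketch, not part of the commit) that actually counts matching lines in one dump file, using the same streaming pattern as the script above:

import json
import zstandard

def count_matches(file_path, field="subreddit", value="wallstreetbets"):
	# stream-decompress the .zst file and count lines whose field equals value
	matched = bad_lines = total = 0
	with open(file_path, 'rb') as file_handle:
		reader = zstandard.ZstdDecompressor(max_window_size=2**31).stream_reader(file_handle)
		buffer = ''
		while True:
			chunk = reader.read(2**27).decode()
			if not chunk:
				break
			lines = (buffer + chunk).split("\n")
			for line in lines[:-1]:
				total += 1
				try:
					if json.loads(line)[field] == value:
						matched += 1
				except (KeyError, json.JSONDecodeError):
					bad_lines += 1
			buffer = lines[-1]
		reader.close()
	return matched, bad_lines, total

# the path below is the same hardcoded example used in single_file.py above
print(count_matches(r"\\MYCLOUDPR4100\Public\reddit\submissions\RS_2013-03.zst"))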