Mirror of https://github.com/Watchful1/PushshiftDumps.git, synced 2025-07-25 15:45:19 -04:00
Support multiple files in filter_file

parent 0827eee152
commit d7beff9a08

1 changed file with 44 additions and 35 deletions
scripts/filter_file.py
@@ -6,8 +6,8 @@ import csv
 from datetime import datetime
 import logging.handlers
 
-# put the path to the input file
-input_file = r"\\MYCLOUDPR4100\Public\reddit\subreddits\formula1_submissions.zst"
+# put the path to the input file, or a folder of files to process all of
+input_file = r"\\MYCLOUDPR4100\Public\reddit_test"
 # put the name or path to the output file. The file extension from below will be added automatically
 output_file = r"\\MYCLOUDPR4100\Public\output"
 # the format to output in, pick from the following options
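With this change, `input_file` accepts either a single dump file or a directory. Both forms below are taken straight from the two sides of the hunk above; the UNC paths are the author's examples, not requirements:

```python
# A single zst-compressed dump file (the old behavior):
input_file = r"\\MYCLOUDPR4100\Public\reddit\subreddits\formula1_submissions.zst"

# ...or a folder: every .zst file inside is processed, and one output
# file per input is written under output_file (see the final hunk).
input_file = r"\\MYCLOUDPR4100\Public\reddit_test"
```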
@@ -24,8 +24,6 @@ output_format = "csv"
 # parent_id: only for comments, the fullname of the parent of the comment. Either another comment or the submission if it's top level
 single_field = None
 # the fields in the file are different depending on whether it has comments or submissions. If we're writing a csv, we need to know which fields to write.
-# The filename from the torrent has which type it is, but you'll need to change this if you removed that from the filename
-is_submission = "submission" in input_file
 # set this to true to write out to the log every time there's a bad line, set to false if you're expecting only some of the lines to match the key
 write_bad_lines = True
 
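The module-level `is_submission` flag removed here is re-derived per file inside `process_file` (next hunk), since a folder can now mix comment and submission dumps. A minimal sketch of the detection, assuming torrent-style filenames like `formula1_submissions.zst`; the helper name is hypothetical, the script simply inlines the test:

```python
def is_submission_dump(path: str) -> bool:
	# Filenames from the torrents carry the type, e.g.
	# "formula1_submissions.zst" vs "formula1_comments.zst".
	return "submission" in path
```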
@@ -171,50 +169,23 @@ def read_lines_zst(file_name):
 		reader.close()
 
 
-if __name__ == "__main__":
-	if single_field is not None:
-		log.info("Single field output mode, changing output file format to txt")
-		output_format = "txt"
+def process_file(input_file, output_file, output_format, field, values, from_date, to_date, single_field, exact_match):
 	output_path = f"{output_file}.{output_format}"
+	is_submission = "submission" in input_file
+	log.info(f"Input: {input_file} : Output: {output_path} : Is submission {is_submission}")
 	writer = None
 	if output_format == "zst":
-		log.info("Output format set to zst")
 		handle = zstandard.ZstdCompressor().stream_writer(open(output_path, 'wb'))
 	elif output_format == "txt":
-		log.info("Output format set to txt")
 		handle = open(output_path, 'w', encoding='UTF-8')
 	elif output_format == "csv":
-		log.info("Output format set to csv")
 		handle = open(output_path, 'w', encoding='UTF-8', newline='')
 		writer = csv.writer(handle)
 	else:
 		log.error(f"Unsupported output format {output_format}")
 		sys.exit()
-	log.info(f"Input file is: {input_file}")
-	log.info(f"Output file is: {output_path}")
-
-	if values_file is not None:
-		values = []
-		with open(values_file, 'r') as values_handle:
-			for value in values_handle:
-				values.append(value.strip().lower())
-		log.info(f"Loaded {len(values)} from values file {values_file}")
-	else:
-		values = [value.lower() for value in values]  # convert to lowercase
-
-	log.info(f"Filtering field: {field}")
-	if len(values) <= 20:
-		log.info(f"On values: {','.join(values)}")
-	else:
-		log.info(f"On values:")
-		for value in values:
-			log.info(value)
-	log.info(f"Exact match {('on' if exact_match else 'off')}. Single field {single_field}. Is submission {is_submission}")
-	log.info(f"From date {from_date.strftime('%Y-%m-%d')} to date {to_date.strftime('%Y-%m-%d')}")
-
 	file_size = os.stat(input_file).st_size
-	file_bytes_processed = 0
 	created = None
 	matched_lines = 0
 	bad_lines = 0
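For context, `read_lines_zst` (whose tail appears as context above) stream-decompresses a .zst dump and yields one line at a time together with a byte offset used for progress reporting. A simplified sketch of such a reader, not the repo's exact implementation; in particular, the real script takes more care with multi-byte UTF-8 characters that straddle chunk boundaries:

```python
import zstandard

def read_lines_zst_sketch(file_name):
	with open(file_name, 'rb') as file_handle:
		# Reddit dumps need a large decompression window.
		reader = zstandard.ZstdDecompressor(max_window_size=2**31).stream_reader(file_handle)
		buffer = ''
		while True:
			chunk = reader.read(2**27)
			if not chunk:
				break
			# Simplified: a multi-byte character split across chunks is dropped here.
			lines = (buffer + chunk.decode('utf-8', errors='ignore')).split("\n")
			for line in lines[:-1]:
				yield line, file_handle.tell()
			buffer = lines[-1]
		reader.close()
```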
@@ -269,7 +240,45 @@ if __name__ == "__main__":
 					log.warning(f"Line decoding failed: {err}")
 					log.warning(line)
 
 
 	handle.close()
 	log.info(f"Complete : {total_lines:,} : {matched_lines:,} : {bad_lines:,}")
+
+
+if __name__ == "__main__":
+	if single_field is not None:
+		log.info("Single field output mode, changing output file format to txt")
+		output_format = "txt"
+
+	if values_file is not None:
+		values = []
+		with open(values_file, 'r') as values_handle:
+			for value in values_handle:
+				values.append(value.strip().lower())
+		log.info(f"Loaded {len(values)} from values file {values_file}")
+	else:
+		values = [value.lower() for value in values]  # convert to lowercase
+
+	log.info(f"Filtering field: {field}")
+	if len(values) <= 20:
+		log.info(f"On values: {','.join(values)}")
+	else:
+		log.info(f"On values:")
+		for value in values:
+			log.info(value)
+	log.info(f"Exact match {('on' if exact_match else 'off')}. Single field {single_field}.")
+	log.info(f"From date {from_date.strftime('%Y-%m-%d')} to date {to_date.strftime('%Y-%m-%d')}")
+	log.info(f"Output format set to {output_format}")
+
+	input_files = []
+	if os.path.isdir(input_file):
+		if not os.path.exists(output_file):
+			os.makedirs(output_file)
+		for file in os.listdir(input_file):
+			if not os.path.isdir(file) and file.endswith(".zst"):
+				input_name = os.path.splitext(os.path.splitext(os.path.basename(file))[0])[0]
+				input_files.append((os.path.join(input_file, file), os.path.join(output_file, input_name)))
+	else:
+		input_files.append((input_file, output_file))
+	log.info(f"Processing {len(input_files)} files")
+	for file_in, file_out in input_files:
+		process_file(file_in, file_out, output_format, field, values, from_date, to_date, single_field, exact_match)
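Two details in the new dispatch block are easy to miss. `input_name` applies `os.path.splitext` twice, so a compound extension is stripped before the per-file output path is built; and `os.path.isdir(file)` tests the bare name returned by `os.listdir`, i.e. relative to the current working directory rather than inside `input_file` (joining with `input_file` first would test the actual directory entry). A quick illustration of the name derivation; the second filename is hypothetical:

```python
import os

for name in ("formula1_submissions.zst", "formula1_submissions.ndjson.zst"):
	# splitext strips one extension per call, so two calls
	# handle both ".zst" and compound extensions like ".ndjson.zst".
	print(os.path.splitext(os.path.splitext(os.path.basename(name))[0])[0])
# Both lines print "formula1_submissions".
```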