From 50be918a1cc3a9be6025fa159fc841c877fc5df1 Mon Sep 17 00:00:00 2001 From: Watchful1 Date: Thu, 14 Oct 2021 19:33:25 -0700 Subject: [PATCH] Add support for multiple values --- .gitignore | 1 + Pipfile | 2 ++ Pipfile.lock | 46 +++++++++++++++++++++++++- scripts/combine_folder_multiprocess.py | 30 ++++++++++++++--- 4 files changed, 74 insertions(+), 5 deletions(-) diff --git a/.gitignore b/.gitignore index 4a71093..3b42084 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,7 @@ .idea/* logs/* __pycache__/* +personal/* *.db *.ini *.txt \ No newline at end of file diff --git a/Pipfile b/Pipfile index 03819f5..8b0fc33 100644 --- a/Pipfile +++ b/Pipfile @@ -5,6 +5,8 @@ name = "pypi" [packages] zstandard = "*" +discord-logging = {editable = true, git = "https://github.com/Watchful1/DiscordLogging.git"} +requests = "*" [dev-packages] diff --git a/Pipfile.lock b/Pipfile.lock index 2f92ed1..1f1039b 100644 --- a/Pipfile.lock +++ b/Pipfile.lock @@ -1,7 +1,7 @@ { "_meta": { "hash": { - "sha256": "03662c664bce0cd9fad7ea3062bb09bfe999f755a00e0f4de720c66955ccae62" + "sha256": "0bcdd1b8971e4dded4e9058aaf7849a5a7fe8fa127a4d37455e897d16aa7cae0" }, "pipfile-spec": 6, "requires": { @@ -16,6 +16,50 @@ ] }, "default": { + "certifi": { + "hashes": [ + "sha256:2bbf76fd432960138b3ef6dda3dde0544f27cbf8546c458e60baf371917ba9ee", + "sha256:50b1e4f8446b06f41be7dd6338db18e0990601dce795c2b1686458aa7e8fa7d8" + ], + "version": "==2021.5.30" + }, + "charset-normalizer": { + "hashes": [ + "sha256:0c8911edd15d19223366a194a513099a302055a962bca2cec0f54b8b63175d8b", + "sha256:f23667ebe1084be45f6ae0538e4a5a865206544097e4e8bbcacf42cd02a348f3" + ], + "markers": "python_version >= '3'", + "version": "==2.0.4" + }, + "discord-logging": { + "editable": true, + "git": "https://github.com/Watchful1/DiscordLogging.git", + "ref": "9e543194d3612dde92eae1203d0ea143a7963f6e" + }, + "idna": { + "hashes": [ + "sha256:14475042e284991034cb48e06f6851428fb14c4dc953acd9be9a5e95c7b6dd7a", + "sha256:467fbad99067910785144ce333826c71fb0e63a425657295239737f7ecd125f3" + ], + "markers": "python_version >= '3'", + "version": "==3.2" + }, + "requests": { + "hashes": [ + "sha256:6c1246513ecd5ecd4528a0906f910e8f0f9c6b8ec72030dc9fd154dc1a6efd24", + "sha256:b8aa58f8cf793ffd8782d3d8cb19e66ef36f7aba4353eec859e74678b01b07a7" + ], + "index": "pypi", + "version": "==2.26.0" + }, + "urllib3": { + "hashes": [ + "sha256:39fb8672126159acb139a7718dd10806104dec1e2f0f6c88aab05d17df10c8d4", + "sha256:f57b4c16c62fa2760b7e3d97c35b255512fb6b59a259730f36ba32ce9f8e342f" + ], + "markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4' and python_version < '4'", + "version": "==1.26.6" + }, "zstandard": { "hashes": [ "sha256:1c5ef399f81204fbd9f0df3debf80389fd8aa9660fe1746d37c80b0d45f809e9", diff --git a/scripts/combine_folder_multiprocess.py b/scripts/combine_folder_multiprocess.py index 3a49fb0..9e6d3a7 100644 --- a/scripts/combine_folder_multiprocess.py +++ b/scripts/combine_folder_multiprocess.py @@ -130,13 +130,20 @@ def read_lines_zst(file_name): # base of each separate process. Loads a file, iterates through lines and writes out # the ones where the `field` of the object matches `value`. Also passes status # information back to the parent via a queue -def process_file(file, working_folder, queue, field, value): +def process_file(file, working_folder, queue, field, value, values): output_file = None try: for line, file_bytes_processed in read_lines_zst(file.input_path): try: obj = json.loads(line) - if obj[field] == value: + matched = False + if value is not None: + if obj[field] == value: + matched = True + elif obj[field] in values: + matched = True + + if matched: if output_file is None: if file.output_path is None: created = datetime.utcfromtimestamp(int(obj['created_utc'])) @@ -167,7 +174,7 @@ if __name__ == '__main__': parser.add_argument("output", help="The output folder to store temporary files in and write the output to") parser.add_argument("--name", help="What to name the output file", default="pushshift") parser.add_argument("--field", help="When deciding what lines to keep, use this field for comparisons", default="subreddit") - parser.add_argument("--value", help="When deciding what lines to keep, compare the field to this value", default="pushshift") + parser.add_argument("--value", help="When deciding what lines to keep, compare the field to this value. Supports a comma separated list", default="pushshift") parser.add_argument("--processes", help="Number of processes to use", default=10, type=int) parser.add_argument( "--error_rate", help= @@ -182,6 +189,21 @@ if __name__ == '__main__': log.info(f"Loading files from: {args.input}") log.info(f"Writing output to: {(os.path.join(args.output, args.name + '.zst'))}") + value_strings = args.value.split(",") + value = None + values = None + if len(value_strings) > 1: + values = set() + for value_inner in value_strings: + values.add(value_inner) + log.info(f"Checking field {args.field} for values {(', '.join(value_strings))}") + elif len(value_strings) == 1: + value = value_strings[0] + log.info(f"Checking field {args.field} for value {value}") + else: + log.info(f"Invalid value specified, aborting: {args.value}") + sys.exit() + multiprocessing.set_start_method('spawn') queue = multiprocessing.Manager().Queue() input_files = load_file_list(args.output, args.name) @@ -226,7 +248,7 @@ if __name__ == '__main__': log.debug(f"Processing file: {file.input_path}") # start the workers with multiprocessing.Pool(processes=min(args.processes, len(files_to_process))) as pool: - workers = pool.starmap_async(process_file, [(file, working_folder, queue, args.field, args.value) for file in files_to_process], error_callback=log.info) + workers = pool.starmap_async(process_file, [(file, working_folder, queue, args.field, value, values) for file in files_to_process], error_callback=log.info) while not workers.ready(): # loop until the workers are all done, pulling in status messages as they are sent file_update = queue.get()