From e18d2c3ea278f9e6208423a3ba2295ca9178355f Mon Sep 17 00:00:00 2001 From: Micah Lee Date: Fri, 8 Nov 2019 16:44:21 +0800 Subject: [PATCH] Add CLI tests in --- onionshare/common.py | 33 ++-- tests2/gui_base_test.py | 2 +- tests2/test_cli.py | 75 +++++++++ tests2/test_cli_common.py | 312 ++++++++++++++++++++++++++++++++++++ tests2/test_cli_settings.py | 158 ++++++++++++++++++ tests2/test_cli_strings.py | 65 ++++++++ tests2/test_cli_web.py | 241 ++++++++++++++++++++++++++++ 7 files changed, 869 insertions(+), 17 deletions(-) create mode 100644 tests2/test_cli.py create mode 100644 tests2/test_cli_common.py create mode 100644 tests2/test_cli_settings.py create mode 100644 tests2/test_cli_strings.py create mode 100644 tests2/test_cli_web.py diff --git a/onionshare/common.py b/onionshare/common.py index cf713818..e85403eb 100644 --- a/onionshare/common.py +++ b/onionshare/common.py @@ -159,23 +159,24 @@ class Common: """ Returns the path of the OnionShare data directory. """ - if getattr(sys, "onionshare_test_mode", False): - onionshare_data_dir = os.path.expanduser("~/.config/onionshare-testdata") - else: - if self.platform == "Windows": - try: - appdata = os.environ["APPDATA"] - onionshare_data_dir = f"{appdata}\\OnionShare" - except: - # If for some reason we don't have the 'APPDATA' environment variable - # (like running tests in Linux while pretending to be in Windows) - onionshare_data_dir = os.path.expanduser("~/.config/onionshare") - elif self.platform == "Darwin": - onionshare_data_dir = os.path.expanduser( - "~/Library/Application Support/OnionShare" - ) - else: + if self.platform == "Windows": + try: + appdata = os.environ["APPDATA"] + onionshare_data_dir = f"{appdata}\\OnionShare" + except: + # If for some reason we don't have the 'APPDATA' environment variable + # (like running tests in Linux while pretending to be in Windows) onionshare_data_dir = os.path.expanduser("~/.config/onionshare") + elif self.platform == "Darwin": + onionshare_data_dir = os.path.expanduser( + "~/Library/Application Support/OnionShare" + ) + else: + onionshare_data_dir = os.path.expanduser("~/.config/onionshare") + + # Modify the data dir if running tests + if getattr(sys, "onionshare_test_mode", False): + onionshare_data_dir += "-testdata" os.makedirs(onionshare_data_dir, 0o700, True) return onionshare_data_dir diff --git a/tests2/gui_base_test.py b/tests2/gui_base_test.py index 66f5e19d..2cd9c7b8 100644 --- a/tests2/gui_base_test.py +++ b/tests2/gui_base_test.py @@ -26,7 +26,7 @@ from onionshare_gui.tab.mode.website_mode import WebsiteMode class GuiBaseTest(unittest.TestCase): @classmethod def setUpClass(cls): - common = Common() + common = Common(verbose=True) qtapp = Application(common) common.gui = GuiCommon(common, qtapp, local_only=True) cls.gui = MainWindow(common, filenames=None) diff --git a/tests2/test_cli.py b/tests2/test_cli.py new file mode 100644 index 00000000..0addf6d5 --- /dev/null +++ b/tests2/test_cli.py @@ -0,0 +1,75 @@ +""" +OnionShare | https://onionshare.org/ + +Copyright (C) 2014-2018 Micah Lee + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see . +""" + +import os + +import pytest + +from onionshare import OnionShare +from onionshare.common import Common +from onionshare.mode_settings import ModeSettings + + +class MyOnion: + def __init__(self): + self.auth_string = "TestHidServAuth" + self.private_key = "" + self.scheduled_key = None + + @staticmethod + def start_onion_service(self, await_publication=True, save_scheduled_key=False): + return "test_service_id.onion" + + +@pytest.fixture +def onionshare_obj(): + common = Common() + return OnionShare(common, MyOnion()) + + +@pytest.fixture +def mode_settings_obj(): + common = Common() + return ModeSettings(common) + + +class TestOnionShare: + def test_init(self, onionshare_obj): + assert onionshare_obj.hidserv_dir is None + assert onionshare_obj.onion_host is None + assert onionshare_obj.cleanup_filenames == [] + assert onionshare_obj.local_only is False + + def test_start_onion_service(self, onionshare_obj, mode_settings_obj): + onionshare_obj.start_onion_service(mode_settings_obj) + assert 17600 <= onionshare_obj.port <= 17650 + assert onionshare_obj.onion_host == "test_service_id.onion" + + def test_start_onion_service_local_only(self, onionshare_obj, mode_settings_obj): + onionshare_obj.local_only = True + onionshare_obj.start_onion_service(mode_settings_obj) + assert onionshare_obj.onion_host == "127.0.0.1:{}".format(onionshare_obj.port) + + def test_cleanup(self, onionshare_obj, temp_dir_1024, temp_file_1024): + onionshare_obj.cleanup_filenames = [temp_dir_1024, temp_file_1024] + onionshare_obj.cleanup() + + assert os.path.exists(temp_dir_1024) is False + assert os.path.exists(temp_dir_1024) is False + assert onionshare_obj.cleanup_filenames == [] diff --git a/tests2/test_cli_common.py b/tests2/test_cli_common.py new file mode 100644 index 00000000..1f230295 --- /dev/null +++ b/tests2/test_cli_common.py @@ -0,0 +1,312 @@ +""" +OnionShare | https://onionshare.org/ + +Copyright (C) 2014-2018 Micah Lee + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see . +""" + +import contextlib +import inspect +import io +import os +import random +import re +import socket +import sys +import zipfile + +import pytest + +LOG_MSG_REGEX = re.compile( + r""" + ^\[Jun\ 06\ 2013\ 11:05:00\] + \ TestModule\.\.dummy_func + \ at\ 0x[a-f0-9]+>(:\ TEST_MSG)?$""", + re.VERBOSE, +) +PASSWORD_REGEX = re.compile(r"^([a-z]+)(-[a-z]+)?-([a-z]+)(-[a-z]+)?$") + + +# TODO: Improve the Common tests to test it all as a single class + + +class TestBuildPassword: + @pytest.mark.parametrize( + "test_input,expected", + ( + # VALID, two lowercase words, separated by a hyphen + ("syrup-enzyme", True), + ("caution-friday", True), + # VALID, two lowercase words, with one hyphenated compound word + ("drop-down-thimble", True), + ("unmixed-yo-yo", True), + # VALID, two lowercase hyphenated compound words, separated by hyphen + ("yo-yo-drop-down", True), + ("felt-tip-t-shirt", True), + ("hello-world", True), + # INVALID + ("Upper-Case", False), + ("digits-123", False), + ("too-many-hyphens-", False), + ("symbols-!@#$%", False), + ), + ) + def test_build_password_regex(self, test_input, expected): + """ Test that `PASSWORD_REGEX` accounts for the following patterns + + There are a few hyphenated words in `wordlist.txt`: + * drop-down + * felt-tip + * t-shirt + * yo-yo + + These words cause a few extra potential password patterns: + * word-word + * hyphenated-word-word + * word-hyphenated-word + * hyphenated-word-hyphenated-word + """ + + assert bool(PASSWORD_REGEX.match(test_input)) == expected + + def test_build_password_unique(self, common_obj, sys_onionshare_dev_mode): + assert common_obj.build_password() != common_obj.build_password() + + +class TestDirSize: + def test_temp_dir_size(self, common_obj, temp_dir_1024_delete): + """ dir_size() should return the total size (in bytes) of all files + in a particular directory. + """ + + assert common_obj.dir_size(temp_dir_1024_delete) == 1024 + + +class TestEstimatedTimeRemaining: + @pytest.mark.parametrize( + "test_input,expected", + ( + ((2, 676, 12), "8h14m16s"), + ((14, 1049, 30), "1h26m15s"), + ((21, 450, 1), "33m42s"), + ((31, 1115, 80), "11m39s"), + ((336, 989, 32), "2m12s"), + ((603, 949, 38), "36s"), + ((971, 1009, 83), "1s"), + ), + ) + def test_estimated_time_remaining( + self, common_obj, test_input, expected, time_time_100 + ): + assert common_obj.estimated_time_remaining(*test_input) == expected + + @pytest.mark.parametrize( + "test_input", + ( + (10, 20, 100), # if `time_elapsed == 0` + (0, 37, 99), # if `download_rate == 0` + ), + ) + def test_raises_zero_division_error(self, common_obj, test_input, time_time_100): + with pytest.raises(ZeroDivisionError): + common_obj.estimated_time_remaining(*test_input) + + +class TestFormatSeconds: + @pytest.mark.parametrize( + "test_input,expected", + ( + (0, "0s"), + (26, "26s"), + (60, "1m"), + (947.35, "15m47s"), + (1847, "30m47s"), + (2193.94, "36m34s"), + (3600, "1h"), + (13426.83, "3h43m47s"), + (16293, "4h31m33s"), + (18392.14, "5h6m32s"), + (86400, "1d"), + (129674, "1d12h1m14s"), + (56404.12, "15h40m4s"), + ), + ) + def test_format_seconds(self, common_obj, test_input, expected): + assert common_obj.format_seconds(test_input) == expected + + # TODO: test negative numbers? + @pytest.mark.parametrize("test_input", ("string", lambda: None, [], {}, set())) + def test_invalid_input_types(self, common_obj, test_input): + with pytest.raises(TypeError): + common_obj.format_seconds(test_input) + + +class TestGetAvailablePort: + @pytest.mark.parametrize( + "port_min,port_max", + ((random.randint(1024, 1500), random.randint(1800, 2048)) for _ in range(50)), + ) + def test_returns_an_open_port(self, common_obj, port_min, port_max): + """ get_available_port() should return an open port within the range """ + + port = common_obj.get_available_port(port_min, port_max) + assert port_min <= port <= port_max + with socket.socket() as tmpsock: + tmpsock.bind(("127.0.0.1", port)) + + +class TestGetPlatform: + def test_darwin(self, platform_darwin, common_obj): + assert common_obj.platform == "Darwin" + + def test_linux(self, platform_linux, common_obj): + assert common_obj.platform == "Linux" + + def test_windows(self, platform_windows, common_obj): + assert common_obj.platform == "Windows" + + +# TODO: double-check these tests +class TestGetResourcePath: + def test_onionshare_dev_mode(self, common_obj, sys_onionshare_dev_mode): + prefix = os.path.join( + os.path.dirname( + os.path.dirname( + os.path.abspath(inspect.getfile(inspect.currentframe())) + ) + ), + "share", + ) + assert common_obj.get_resource_path( + os.path.join(prefix, "test_filename") + ) == os.path.join(prefix, "test_filename") + + def test_linux(self, common_obj, platform_linux, sys_argv_sys_prefix): + prefix = os.path.join(sys.prefix, "share/onionshare") + assert common_obj.get_resource_path( + os.path.join(prefix, "test_filename") + ) == os.path.join(prefix, "test_filename") + + def test_frozen_darwin(self, common_obj, platform_darwin, sys_frozen, sys_meipass): + prefix = os.path.join(sys._MEIPASS, "share") + assert common_obj.get_resource_path( + os.path.join(prefix, "test_filename") + ) == os.path.join(prefix, "test_filename") + + +class TestGetTorPaths: + # @pytest.mark.skipif(sys.platform != 'Darwin', reason='requires MacOS') ? + def test_get_tor_paths_darwin( + self, platform_darwin, common_obj, sys_frozen, sys_meipass + ): + base_path = os.path.dirname( + os.path.dirname(os.path.dirname(common_obj.get_resource_path(""))) + ) + tor_path = os.path.join(base_path, "Resources", "Tor", "tor") + tor_geo_ip_file_path = os.path.join(base_path, "Resources", "Tor", "geoip") + tor_geo_ipv6_file_path = os.path.join(base_path, "Resources", "Tor", "geoip6") + obfs4proxy_file_path = os.path.join(base_path, "Resources", "Tor", "obfs4proxy") + assert common_obj.get_tor_paths() == ( + tor_path, + tor_geo_ip_file_path, + tor_geo_ipv6_file_path, + obfs4proxy_file_path, + ) + + # @pytest.mark.skipif(sys.platform != 'Linux', reason='requires Linux') ? + def test_get_tor_paths_linux(self, platform_linux, common_obj): + assert common_obj.get_tor_paths() == ( + "/usr/bin/tor", + "/usr/share/tor/geoip", + "/usr/share/tor/geoip6", + "/usr/bin/obfs4proxy", + ) + + # @pytest.mark.skipif(sys.platform != 'Windows', reason='requires Windows') ? + def test_get_tor_paths_windows(self, platform_windows, common_obj, sys_frozen): + base_path = os.path.join( + os.path.dirname(os.path.dirname(common_obj.get_resource_path(""))), "tor" + ) + tor_path = os.path.join(os.path.join(base_path, "Tor"), "tor.exe") + obfs4proxy_file_path = os.path.join( + os.path.join(base_path, "Tor"), "obfs4proxy.exe" + ) + tor_geo_ip_file_path = os.path.join( + os.path.join(os.path.join(base_path, "Data"), "Tor"), "geoip" + ) + tor_geo_ipv6_file_path = os.path.join( + os.path.join(os.path.join(base_path, "Data"), "Tor"), "geoip6" + ) + assert common_obj.get_tor_paths() == ( + tor_path, + tor_geo_ip_file_path, + tor_geo_ipv6_file_path, + obfs4proxy_file_path, + ) + + +class TestHumanReadableFilesize: + @pytest.mark.parametrize( + "test_input,expected", + ( + (1024 ** 0, "1.0 B"), + (1024 ** 1, "1.0 KiB"), + (1024 ** 2, "1.0 MiB"), + (1024 ** 3, "1.0 GiB"), + (1024 ** 4, "1.0 TiB"), + (1024 ** 5, "1.0 PiB"), + (1024 ** 6, "1.0 EiB"), + (1024 ** 7, "1.0 ZiB"), + (1024 ** 8, "1.0 YiB"), + ), + ) + def test_human_readable_filesize(self, common_obj, test_input, expected): + assert common_obj.human_readable_filesize(test_input) == expected + + +class TestLog: + @pytest.mark.parametrize( + "test_input", + ( + ( + "[Jun 06 2013 11:05:00]" + " TestModule..dummy_func" + " at 0xdeadbeef>" + ), + ( + "[Jun 06 2013 11:05:00]" + " TestModule..dummy_func" + " at 0xdeadbeef>: TEST_MSG" + ), + ), + ) + def test_log_msg_regex(self, test_input): + assert bool(LOG_MSG_REGEX.match(test_input)) + + def test_output(self, common_obj, time_strftime): + def dummy_func(): + pass + + common_obj.verbose = True + + # From: https://stackoverflow.com/questions/1218933 + with io.StringIO() as buf, contextlib.redirect_stdout(buf): + common_obj.log("TestModule", dummy_func) + common_obj.log("TestModule", dummy_func, "TEST_MSG") + output = buf.getvalue() + + line_one, line_two, _ = output.split("\n") + assert LOG_MSG_REGEX.match(line_one) + assert LOG_MSG_REGEX.match(line_two) diff --git a/tests2/test_cli_settings.py b/tests2/test_cli_settings.py new file mode 100644 index 00000000..14269490 --- /dev/null +++ b/tests2/test_cli_settings.py @@ -0,0 +1,158 @@ +""" +OnionShare | https://onionshare.org/ + +Copyright (C) 2014-2018 Micah Lee + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see . +""" + +import json +import os +import tempfile + +import pytest + +from onionshare import common, settings, strings + + +@pytest.fixture +def settings_obj(sys_onionshare_dev_mode, platform_linux): + _common = common.Common() + _common.version = "DUMMY_VERSION_1.2.3" + return settings.Settings(_common) + + +class TestSettings: + def test_init(self, settings_obj): + expected_settings = { + "version": "DUMMY_VERSION_1.2.3", + "connection_type": "bundled", + "control_port_address": "127.0.0.1", + "control_port_port": 9051, + "socks_address": "127.0.0.1", + "socks_port": 9050, + "socket_file_path": "/var/run/tor/control", + "auth_type": "no_auth", + "auth_password": "", + "use_autoupdate": True, + "autoupdate_timestamp": None, + "no_bridges": True, + "tor_bridges_use_obfs4": False, + "tor_bridges_use_meek_lite_azure": False, + "tor_bridges_use_custom_bridges": "", + "persistent_tabs": [], + } + for key in settings_obj._settings: + # Skip locale, it will not always default to the same thing + if key != "locale": + assert settings_obj._settings[key] == settings_obj.default_settings[key] + assert settings_obj._settings[key] == expected_settings[key] + + def test_fill_in_defaults(self, settings_obj): + del settings_obj._settings["version"] + settings_obj.fill_in_defaults() + assert settings_obj._settings["version"] == "DUMMY_VERSION_1.2.3" + + def test_load(self, settings_obj): + custom_settings = { + "version": "CUSTOM_VERSION", + "socks_port": 9999, + "use_stealth": True, + } + tmp_file, tmp_file_path = tempfile.mkstemp() + with open(tmp_file, "w") as f: + json.dump(custom_settings, f) + settings_obj.filename = tmp_file_path + settings_obj.load() + + assert settings_obj._settings["version"] == "CUSTOM_VERSION" + assert settings_obj._settings["socks_port"] == 9999 + assert settings_obj._settings["use_stealth"] is True + + os.remove(tmp_file_path) + assert os.path.exists(tmp_file_path) is False + + def test_save(self, monkeypatch, settings_obj): + monkeypatch.setattr(strings, "_", lambda _: "") + + settings_filename = "default_settings.json" + tmp_dir = tempfile.gettempdir() + settings_path = os.path.join(tmp_dir, settings_filename) + settings_obj.filename = settings_path + settings_obj.save() + with open(settings_path, "r") as f: + settings = json.load(f) + + assert settings_obj._settings == settings + + os.remove(settings_path) + assert os.path.exists(settings_path) is False + + def test_get(self, settings_obj): + assert settings_obj.get("version") == "DUMMY_VERSION_1.2.3" + assert settings_obj.get("connection_type") == "bundled" + assert settings_obj.get("control_port_address") == "127.0.0.1" + assert settings_obj.get("control_port_port") == 9051 + assert settings_obj.get("socks_address") == "127.0.0.1" + assert settings_obj.get("socks_port") == 9050 + assert settings_obj.get("socket_file_path") == "/var/run/tor/control" + assert settings_obj.get("auth_type") == "no_auth" + assert settings_obj.get("auth_password") == "" + assert settings_obj.get("use_autoupdate") is True + assert settings_obj.get("autoupdate_timestamp") is None + assert settings_obj.get("autoupdate_timestamp") is None + assert settings_obj.get("no_bridges") is True + assert settings_obj.get("tor_bridges_use_obfs4") is False + assert settings_obj.get("tor_bridges_use_meek_lite_azure") is False + assert settings_obj.get("tor_bridges_use_custom_bridges") == "" + + def test_set_version(self, settings_obj): + settings_obj.set("version", "CUSTOM_VERSION") + assert settings_obj._settings["version"] == "CUSTOM_VERSION" + + def test_set_control_port_port(self, settings_obj): + settings_obj.set("control_port_port", 999) + assert settings_obj._settings["control_port_port"] == 999 + + settings_obj.set("control_port_port", "NON_INTEGER") + assert settings_obj._settings["control_port_port"] == 9051 + + def test_set_socks_port(self, settings_obj): + settings_obj.set("socks_port", 888) + assert settings_obj._settings["socks_port"] == 888 + + settings_obj.set("socks_port", "NON_INTEGER") + assert settings_obj._settings["socks_port"] == 9050 + + def test_filename_darwin(self, monkeypatch, platform_darwin): + obj = settings.Settings(common.Common()) + assert obj.filename == os.path.expanduser( + "~/Library/Application Support/OnionShare-testdata/onionshare.json" + ) + + def test_filename_linux(self, monkeypatch, platform_linux): + obj = settings.Settings(common.Common()) + assert obj.filename == os.path.expanduser( + "~/.config/onionshare-testdata/onionshare.json" + ) + + def test_set_custom_bridge(self, settings_obj): + settings_obj.set( + "tor_bridges_use_custom_bridges", + "Bridge 45.3.20.65:9050 21300AD88890A49C429A6CB9959CFD44490A8F6E", + ) + assert ( + settings_obj._settings["tor_bridges_use_custom_bridges"] + == "Bridge 45.3.20.65:9050 21300AD88890A49C429A6CB9959CFD44490A8F6E" + ) diff --git a/tests2/test_cli_strings.py b/tests2/test_cli_strings.py new file mode 100644 index 00000000..7ad65191 --- /dev/null +++ b/tests2/test_cli_strings.py @@ -0,0 +1,65 @@ +# -*- coding: utf-8 -*- +""" +OnionShare | https://onionshare.org/ + +Copyright (C) 2014-2018 Micah Lee + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see . +""" + +import types + +import pytest + +from onionshare import strings +from onionshare.settings import Settings + +# # Stub get_resource_path so it finds the correct path while running tests +# def get_resource_path(filename): +# resources_dir = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'share') +# path = os.path.join(resources_dir, filename) +# return path +# common.get_resource_path = get_resource_path + + +def test_underscore_is_function(): + assert callable(strings._) and isinstance(strings._, types.FunctionType) + + +class TestLoadStrings: + def test_load_strings_defaults_to_english( + self, common_obj, locale_en, sys_onionshare_dev_mode + ): + """ load_strings() loads English by default """ + common_obj.settings = Settings(common_obj) + strings.load_strings(common_obj) + assert strings._("preparing_files") == "Compressing files." + + def test_load_strings_loads_other_languages( + self, common_obj, locale_fr, sys_onionshare_dev_mode + ): + """ load_strings() loads other languages in different locales """ + common_obj.settings = Settings(common_obj) + common_obj.settings.set("locale", "fr") + strings.load_strings(common_obj) + assert strings._("preparing_files") == "Compression des fichiers." + + def test_load_invalid_locale( + self, common_obj, locale_invalid, sys_onionshare_dev_mode + ): + """ load_strings() raises a KeyError for an invalid locale """ + with pytest.raises(KeyError): + common_obj.settings = Settings(common_obj) + common_obj.settings.set("locale", "XX") + strings.load_strings(common_obj) diff --git a/tests2/test_cli_web.py b/tests2/test_cli_web.py new file mode 100644 index 00000000..2ce2f758 --- /dev/null +++ b/tests2/test_cli_web.py @@ -0,0 +1,241 @@ +""" +OnionShare | https://onionshare.org/ + +Copyright (C) 2014-2018 Micah Lee + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see . +""" + +import contextlib +import inspect +import io +import os +import random +import re +import socket +import sys +import zipfile +import tempfile +import base64 + +import pytest +from werkzeug.datastructures import Headers + +from onionshare.common import Common +from onionshare import strings +from onionshare.web import Web +from onionshare.settings import Settings +from onionshare.mode_settings import ModeSettings + +DEFAULT_ZW_FILENAME_REGEX = re.compile(r"^onionshare_[a-z2-7]{6}.zip$") +RANDOM_STR_REGEX = re.compile(r"^[a-z2-7]+$") + + +def web_obj(common_obj, mode, num_files=0): + """ Creates a Web object, in either share mode or receive mode, ready for testing """ + common_obj.settings = Settings(common_obj) + strings.load_strings(common_obj) + mode_settings = ModeSettings(common_obj) + web = Web(common_obj, False, mode_settings, mode) + web.generate_password() + web.running = True + + web.app.testing = True + + # Share mode + if mode == "share": + # Add files + files = [] + for _ in range(num_files): + with tempfile.NamedTemporaryFile(delete=False) as tmp_file: + tmp_file.write(b"*" * 1024) + files.append(tmp_file.name) + web.share_mode.set_file_info(files) + # Receive mode + else: + pass + + return web + + +class TestWeb: + def test_share_mode(self, common_obj): + web = web_obj(common_obj, "share", 3) + assert web.mode == "share" + with web.app.test_client() as c: + # Load / without auth + res = c.get("/") + res.get_data() + assert res.status_code == 401 + + # Load / with invalid auth + res = c.get("/", headers=self._make_auth_headers("invalid")) + res.get_data() + assert res.status_code == 401 + + # Load / with valid auth + res = c.get("/", headers=self._make_auth_headers(web.password)) + res.get_data() + assert res.status_code == 200 + + # Download + res = c.get("/download", headers=self._make_auth_headers(web.password)) + res.get_data() + assert res.status_code == 200 + assert res.mimetype == "application/zip" + + def test_share_mode_autostop_sharing_on(self, common_obj, temp_file_1024): + web = web_obj(common_obj, "share", 3) + web.settings.set("share", "autostop_sharing", True) + + assert web.running == True + + with web.app.test_client() as c: + # Download the first time + res = c.get("/download", headers=self._make_auth_headers(web.password)) + res.get_data() + assert res.status_code == 200 + assert res.mimetype == "application/zip" + + assert web.running == False + + def test_share_mode_autostop_sharing_off(self, common_obj, temp_file_1024): + web = web_obj(common_obj, "share", 3) + web.settings.set("share", "autostop_sharing", False) + + assert web.running == True + + with web.app.test_client() as c: + # Download the first time + res = c.get("/download", headers=self._make_auth_headers(web.password)) + res.get_data() + assert res.status_code == 200 + assert res.mimetype == "application/zip" + assert web.running == True + + def test_receive_mode(self, common_obj): + web = web_obj(common_obj, "receive") + assert web.mode == "receive" + + with web.app.test_client() as c: + # Load / without auth + res = c.get("/") + res.get_data() + assert res.status_code == 401 + + # Load / with invalid auth + res = c.get("/", headers=self._make_auth_headers("invalid")) + res.get_data() + assert res.status_code == 401 + + # Load / with valid auth + res = c.get("/", headers=self._make_auth_headers(web.password)) + res.get_data() + assert res.status_code == 200 + + def test_public_mode_on(self, common_obj): + web = web_obj(common_obj, "receive") + web.settings.set("general", "public", True) + + with web.app.test_client() as c: + # Loading / should work without auth + res = c.get("/") + data1 = res.get_data() + assert res.status_code == 200 + + def test_public_mode_off(self, common_obj): + web = web_obj(common_obj, "receive") + web.settings.set("general", "public", False) + + with web.app.test_client() as c: + # Load / without auth + res = c.get("/") + res.get_data() + assert res.status_code == 401 + + # But static resources should work without auth + res = c.get(f"{web.static_url_path}/css/style.css") + res.get_data() + assert res.status_code == 200 + + # Load / with valid auth + res = c.get("/", headers=self._make_auth_headers(web.password)) + res.get_data() + assert res.status_code == 200 + + def _make_auth_headers(self, password): + auth = base64.b64encode(b"onionshare:" + password.encode()).decode() + h = Headers() + h.add("Authorization", "Basic " + auth) + return h + + +class TestZipWriterDefault: + @pytest.mark.parametrize( + "test_input", + ( + f"onionshare_{''.join(random.choice('abcdefghijklmnopqrstuvwxyz234567') for _ in range(6))}.zip" + for _ in range(50) + ), + ) + def test_default_zw_filename_regex(self, test_input): + assert bool(DEFAULT_ZW_FILENAME_REGEX.match(test_input)) + + def test_zw_filename(self, default_zw): + zw_filename = os.path.basename(default_zw.zip_filename) + assert bool(DEFAULT_ZW_FILENAME_REGEX.match(zw_filename)) + + def test_zipfile_filename_matches_zipwriter_filename(self, default_zw): + assert default_zw.z.filename == default_zw.zip_filename + + def test_zipfile_allow_zip64(self, default_zw): + assert default_zw.z._allowZip64 is True + + def test_zipfile_mode(self, default_zw): + assert default_zw.z.mode == "w" + + def test_callback(self, default_zw): + assert default_zw.processed_size_callback(None) is None + + def test_add_file(self, default_zw, temp_file_1024_delete): + default_zw.add_file(temp_file_1024_delete) + zipfile_info = default_zw.z.getinfo(os.path.basename(temp_file_1024_delete)) + + assert zipfile_info.compress_type == zipfile.ZIP_DEFLATED + assert zipfile_info.file_size == 1024 + + def test_add_directory(self, temp_dir_1024_delete, default_zw): + previous_size = default_zw._size # size before adding directory + default_zw.add_dir(temp_dir_1024_delete) + assert default_zw._size == previous_size + 1024 + + +class TestZipWriterCustom: + @pytest.mark.parametrize( + "test_input", + ( + Common.random_string( + random.randint(2, 50), random.choice((None, random.randint(2, 50))) + ) + for _ in range(50) + ), + ) + def test_random_string_regex(self, test_input): + assert bool(RANDOM_STR_REGEX.match(test_input)) + + def test_custom_filename(self, custom_zw): + assert bool(RANDOM_STR_REGEX.match(custom_zw.zip_filename)) + + def test_custom_callback(self, custom_zw): + assert custom_zw.processed_size_callback(None) == "custom_callback"