Add onionshare CLI to cli folder, move GUI to desktop folder, and start refactoring it to work with briefcase

This commit is contained in:
Micah Lee 2020-10-12 22:40:55 -07:00
parent 93e90c89ae
commit a54f99adf6
583 changed files with 14871 additions and 474 deletions

0
cli/tests/__init__.py Normal file
View file

198
cli/tests/conftest.py Normal file
View file

@ -0,0 +1,198 @@
import sys
# Force tests to look for resources in the source code tree
sys.onionshare_dev_mode = True
# Let OnionShare know the tests are running, to avoid colliding with settings files
sys.onionshare_test_mode = True
import os
import shutil
import tempfile
import pytest
from onionshare_cli import common, web
# The temporary directory for CLI tests
test_temp_dir = None
def pytest_addoption(parser):
parser.addoption(
"--runtor", action="store_true", default=False, help="run tor tests"
)
def pytest_collection_modifyitems(config, items):
if not config.getoption("--runtor"):
# --runtor given in cli: do not skip tor tests
skip_tor = pytest.mark.skip(reason="need --runtor option to run")
for item in items:
if "tor" in item.keywords:
item.add_marker(skip_tor)
@pytest.fixture
def temp_dir():
"""Creates a persistent temporary directory for the CLI tests to use"""
global test_temp_dir
if not test_temp_dir:
test_temp_dir = tempfile.mkdtemp()
return test_temp_dir
@pytest.fixture
def temp_dir_1024(temp_dir):
""" Create a temporary directory that has a single file of a
particular size (1024 bytes).
"""
new_temp_dir = tempfile.mkdtemp(dir=temp_dir)
tmp_file, tmp_file_path = tempfile.mkstemp(dir=new_temp_dir)
with open(tmp_file, "wb") as f:
f.write(b"*" * 1024)
return new_temp_dir
# pytest > 2.9 only needs @pytest.fixture
@pytest.yield_fixture
def temp_dir_1024_delete(temp_dir):
""" Create a temporary directory that has a single file of a
particular size (1024 bytes). The temporary directory (including
the file inside) will be deleted after fixture usage.
"""
with tempfile.TemporaryDirectory(dir=temp_dir) as new_temp_dir:
tmp_file, tmp_file_path = tempfile.mkstemp(dir=new_temp_dir)
with open(tmp_file, "wb") as f:
f.write(b"*" * 1024)
yield new_temp_dir
@pytest.fixture
def temp_file_1024(temp_dir):
""" Create a temporary file of a particular size (1024 bytes). """
with tempfile.NamedTemporaryFile(delete=False, dir=temp_dir) as tmp_file:
tmp_file.write(b"*" * 1024)
return tmp_file.name
# pytest > 2.9 only needs @pytest.fixture
@pytest.yield_fixture
def temp_file_1024_delete(temp_dir):
"""
Create a temporary file of a particular size (1024 bytes).
The temporary file will be deleted after fixture usage.
"""
with tempfile.NamedTemporaryFile(dir=temp_dir, delete=False) as tmp_file:
tmp_file.write(b"*" * 1024)
tmp_file.flush()
tmp_file.close()
yield tmp_file.name
# pytest > 2.9 only needs @pytest.fixture
@pytest.yield_fixture(scope="session")
def custom_zw():
zw = web.share_mode.ZipWriter(
common.Common(),
zip_filename=common.Common.random_string(4, 6),
processed_size_callback=lambda _: "custom_callback",
)
yield zw
zw.close()
os.remove(zw.zip_filename)
# pytest > 2.9 only needs @pytest.fixture
@pytest.yield_fixture(scope="session")
def default_zw():
zw = web.share_mode.ZipWriter(common.Common())
yield zw
zw.close()
tmp_dir = os.path.dirname(zw.zip_filename)
try:
shutil.rmtree(tmp_dir, ignore_errors=True)
except:
pass
@pytest.fixture
def locale_en(monkeypatch):
monkeypatch.setattr("locale.getdefaultlocale", lambda: ("en_US", "UTF-8"))
@pytest.fixture
def locale_fr(monkeypatch):
monkeypatch.setattr("locale.getdefaultlocale", lambda: ("fr_FR", "UTF-8"))
@pytest.fixture
def locale_invalid(monkeypatch):
monkeypatch.setattr("locale.getdefaultlocale", lambda: ("xx_XX", "UTF-8"))
@pytest.fixture
def locale_ru(monkeypatch):
monkeypatch.setattr("locale.getdefaultlocale", lambda: ("ru_RU", "UTF-8"))
@pytest.fixture
def platform_darwin(monkeypatch):
monkeypatch.setattr("platform.system", lambda: "Darwin")
@pytest.fixture # (scope="session")
def platform_linux(monkeypatch):
monkeypatch.setattr("platform.system", lambda: "Linux")
@pytest.fixture
def platform_windows(monkeypatch):
monkeypatch.setattr("platform.system", lambda: "Windows")
@pytest.fixture
def sys_argv_sys_prefix(monkeypatch):
monkeypatch.setattr("sys.argv", [sys.prefix])
@pytest.fixture
def sys_frozen(monkeypatch):
monkeypatch.setattr("sys.frozen", True, raising=False)
@pytest.fixture
def sys_meipass(monkeypatch):
monkeypatch.setattr("sys._MEIPASS", os.path.expanduser("~"), raising=False)
@pytest.fixture # (scope="session")
def sys_onionshare_dev_mode(monkeypatch):
monkeypatch.setattr("sys.onionshare_dev_mode", True, raising=False)
@pytest.fixture
def time_time_100(monkeypatch):
monkeypatch.setattr("time.time", lambda: 100)
@pytest.fixture
def time_strftime(monkeypatch):
monkeypatch.setattr("time.strftime", lambda _: "Jun 06 2013 11:05:00")
@pytest.fixture
def common_obj():
return common.Common()
@pytest.fixture
def settings_obj(sys_onionshare_dev_mode, platform_linux):
_common = common.Common()
_common.version = "DUMMY_VERSION_1.2.3"
return settings.Settings(_common)

4
cli/tests/pytest.ini Normal file
View file

@ -0,0 +1,4 @@
[pytest]
markers =
gui: marks tests as a GUI test
tor: marks tests as a Tor GUI test

58
cli/tests/test_cli.py Normal file
View file

@ -0,0 +1,58 @@
import os
import pytest
from onionshare_cli import OnionShare
from onionshare_cli.common import Common
from onionshare_cli.mode_settings import ModeSettings
class MyOnion:
def __init__(self):
self.auth_string = "TestHidServAuth"
self.private_key = ""
self.scheduled_key = None
@staticmethod
def start_onion_service(
self, mode_settings_obj, await_publication=True, save_scheduled_key=False
):
return "test_service_id.onion"
@pytest.fixture
def onionshare_obj():
common = Common()
return OnionShare(common, MyOnion())
@pytest.fixture
def mode_settings_obj():
common = Common()
return ModeSettings(common)
class TestOnionShare:
def test_init(self, onionshare_obj):
assert onionshare_obj.hidserv_dir is None
assert onionshare_obj.onion_host is None
assert onionshare_obj.cleanup_filenames == []
assert onionshare_obj.local_only is False
def test_start_onion_service(self, onionshare_obj, mode_settings_obj):
onionshare_obj.start_onion_service(mode_settings_obj)
assert 17600 <= onionshare_obj.port <= 17650
assert onionshare_obj.onion_host == "test_service_id.onion"
def test_start_onion_service_local_only(self, onionshare_obj, mode_settings_obj):
onionshare_obj.local_only = True
onionshare_obj.start_onion_service(mode_settings_obj)
assert onionshare_obj.onion_host == "127.0.0.1:{}".format(onionshare_obj.port)
def test_cleanup(self, onionshare_obj, temp_dir_1024, temp_file_1024):
onionshare_obj.cleanup_filenames = [temp_dir_1024, temp_file_1024]
onionshare_obj.cleanup()
assert os.path.exists(temp_dir_1024) is False
assert os.path.exists(temp_dir_1024) is False
assert onionshare_obj.cleanup_filenames == []

View file

@ -0,0 +1,275 @@
import contextlib
import inspect
import io
import os
import random
import re
import socket
import sys
import zipfile
import pytest
PASSWORD_REGEX = re.compile(r"^([a-z]+)(-[a-z]+)?-([a-z]+)(-[a-z]+)?$")
# TODO: Improve the Common tests to test it all as a single class
class TestBuildPassword:
@pytest.mark.parametrize(
"test_input,expected",
(
# VALID, two lowercase words, separated by a hyphen
("syrup-enzyme", True),
("caution-friday", True),
# VALID, two lowercase words, with one hyphenated compound word
("drop-down-thimble", True),
("unmixed-yo-yo", True),
# VALID, two lowercase hyphenated compound words, separated by hyphen
("yo-yo-drop-down", True),
("felt-tip-t-shirt", True),
("hello-world", True),
# INVALID
("Upper-Case", False),
("digits-123", False),
("too-many-hyphens-", False),
("symbols-!@#$%", False),
),
)
def test_build_password_regex(self, test_input, expected):
""" Test that `PASSWORD_REGEX` accounts for the following patterns
There are a few hyphenated words in `wordlist.txt`:
* drop-down
* felt-tip
* t-shirt
* yo-yo
These words cause a few extra potential password patterns:
* word-word
* hyphenated-word-word
* word-hyphenated-word
* hyphenated-word-hyphenated-word
"""
assert bool(PASSWORD_REGEX.match(test_input)) == expected
def test_build_password_unique(self, common_obj, sys_onionshare_dev_mode):
assert common_obj.build_password() != common_obj.build_password()
class TestDirSize:
def test_temp_dir_size(self, common_obj, temp_dir_1024_delete):
""" dir_size() should return the total size (in bytes) of all files
in a particular directory.
"""
assert common_obj.dir_size(temp_dir_1024_delete) == 1024
class TestEstimatedTimeRemaining:
@pytest.mark.parametrize(
"test_input,expected",
(
((2, 676, 12), "8h14m16s"),
((14, 1049, 30), "1h26m15s"),
((21, 450, 1), "33m42s"),
((31, 1115, 80), "11m39s"),
((336, 989, 32), "2m12s"),
((603, 949, 38), "36s"),
((971, 1009, 83), "1s"),
),
)
def test_estimated_time_remaining(
self, common_obj, test_input, expected, time_time_100
):
assert common_obj.estimated_time_remaining(*test_input) == expected
@pytest.mark.parametrize(
"test_input",
(
(10, 20, 100), # if `time_elapsed == 0`
(0, 37, 99), # if `download_rate == 0`
),
)
def test_raises_zero_division_error(self, common_obj, test_input, time_time_100):
with pytest.raises(ZeroDivisionError):
common_obj.estimated_time_remaining(*test_input)
class TestFormatSeconds:
@pytest.mark.parametrize(
"test_input,expected",
(
(0, "0s"),
(26, "26s"),
(60, "1m"),
(947.35, "15m47s"),
(1847, "30m47s"),
(2193.94, "36m34s"),
(3600, "1h"),
(13426.83, "3h43m47s"),
(16293, "4h31m33s"),
(18392.14, "5h6m32s"),
(86400, "1d"),
(129674, "1d12h1m14s"),
(56404.12, "15h40m4s"),
),
)
def test_format_seconds(self, common_obj, test_input, expected):
assert common_obj.format_seconds(test_input) == expected
# TODO: test negative numbers?
@pytest.mark.parametrize("test_input", ("string", lambda: None, [], {}, set()))
def test_invalid_input_types(self, common_obj, test_input):
with pytest.raises(TypeError):
common_obj.format_seconds(test_input)
class TestGetAvailablePort:
@pytest.mark.parametrize(
"port_min,port_max",
((random.randint(1024, 1500), random.randint(1800, 2048)) for _ in range(50)),
)
def test_returns_an_open_port(self, common_obj, port_min, port_max):
""" get_available_port() should return an open port within the range """
port = common_obj.get_available_port(port_min, port_max)
assert port_min <= port <= port_max
with socket.socket() as tmpsock:
tmpsock.bind(("127.0.0.1", port))
class TestGetPlatform:
def test_darwin(self, platform_darwin, common_obj):
assert common_obj.platform == "Darwin"
def test_linux(self, platform_linux, common_obj):
assert common_obj.platform == "Linux"
def test_windows(self, platform_windows, common_obj):
assert common_obj.platform == "Windows"
# TODO: double-check these tests
class TestGetResourcePath:
def test_onionshare_dev_mode(self, common_obj, sys_onionshare_dev_mode):
prefix = os.path.join(
os.path.dirname(
os.path.dirname(
os.path.abspath(inspect.getfile(inspect.currentframe()))
)
),
"share",
)
assert common_obj.get_resource_path(
os.path.join(prefix, "test_filename")
) == os.path.join(prefix, "test_filename")
def test_linux(self, common_obj, platform_linux, sys_argv_sys_prefix):
prefix = os.path.join(sys.prefix, "share", "onionshare")
assert common_obj.get_resource_path(
os.path.join(prefix, "test_filename")
) == os.path.join(prefix, "test_filename")
def test_frozen_darwin(self, common_obj, platform_darwin, sys_frozen, sys_meipass):
prefix = os.path.join(sys._MEIPASS, "share")
assert common_obj.get_resource_path(
os.path.join(prefix, "test_filename")
) == os.path.join(prefix, "test_filename")
class TestGetTorPaths:
@pytest.mark.skipif(sys.platform != "Darwin", reason="requires MacOS")
def test_get_tor_paths_darwin(
self, platform_darwin, common_obj, sys_frozen, sys_meipass
):
base_path = os.path.dirname(
os.path.dirname(os.path.dirname(common_obj.get_resource_path("")))
)
tor_path = os.path.join(base_path, "Resources", "Tor", "tor")
tor_geo_ip_file_path = os.path.join(base_path, "Resources", "Tor", "geoip")
tor_geo_ipv6_file_path = os.path.join(base_path, "Resources", "Tor", "geoip6")
obfs4proxy_file_path = os.path.join(base_path, "Resources", "Tor", "obfs4proxy")
assert common_obj.get_tor_paths() == (
tor_path,
tor_geo_ip_file_path,
tor_geo_ipv6_file_path,
obfs4proxy_file_path,
)
@pytest.mark.skipif(sys.platform != "Linux", reason="requires Linux")
def test_get_tor_paths_linux(self, platform_linux, common_obj):
(
tor_path,
tor_geo_ip_file_path,
tor_geo_ipv6_file_path,
_, # obfs4proxy is optional
) = common_obj.get_tor_paths()
assert os.path.basename(tor_path) == "tor"
assert (
tor_geo_ip_file_path == "/usr/share/tor/geoip"
or tor_geo_ip_file_path == "/usr/local/share/tor/geoip"
)
assert (
tor_geo_ipv6_file_path == "/usr/share/tor/geoip6"
or tor_geo_ipv6_file_path == "/usr/local/share/tor/geoip6"
)
@pytest.mark.skipif(sys.platform != "win32", reason="requires Windows")
def test_get_tor_paths_windows(self, platform_windows, common_obj, sys_frozen):
base_path = os.path.join(
os.path.dirname(os.path.dirname(common_obj.get_resource_path(""))), "tor"
)
tor_path = os.path.join(os.path.join(base_path, "Tor"), "tor.exe")
obfs4proxy_file_path = os.path.join(
os.path.join(base_path, "Tor"), "obfs4proxy.exe"
)
tor_geo_ip_file_path = os.path.join(
os.path.join(os.path.join(base_path, "Data"), "Tor"), "geoip"
)
tor_geo_ipv6_file_path = os.path.join(
os.path.join(os.path.join(base_path, "Data"), "Tor"), "geoip6"
)
assert common_obj.get_tor_paths() == (
tor_path,
tor_geo_ip_file_path,
tor_geo_ipv6_file_path,
obfs4proxy_file_path,
)
class TestHumanReadableFilesize:
@pytest.mark.parametrize(
"test_input,expected",
(
(1024 ** 0, "1.0 B"),
(1024 ** 1, "1.0 KiB"),
(1024 ** 2, "1.0 MiB"),
(1024 ** 3, "1.0 GiB"),
(1024 ** 4, "1.0 TiB"),
(1024 ** 5, "1.0 PiB"),
(1024 ** 6, "1.0 EiB"),
(1024 ** 7, "1.0 ZiB"),
(1024 ** 8, "1.0 YiB"),
),
)
def test_human_readable_filesize(self, common_obj, test_input, expected):
assert common_obj.human_readable_filesize(test_input) == expected
class TestLog:
def test_output(self, common_obj, time_strftime):
common_obj.verbose = True
# From: https://stackoverflow.com/questions/1218933
with io.StringIO() as buf, contextlib.redirect_stdout(buf):
common_obj.log("TestModule", "dummy_func")
common_obj.log("TestModule", "dummy_func", "TEST_MSG")
output = buf.getvalue()
line_one, line_two, _ = output.split("\n")
assert line_one == "[Jun 06 2013 11:05:00] TestModule.dummy_func"
assert line_two == "[Jun 06 2013 11:05:00] TestModule.dummy_func: TEST_MSG"

View file

@ -0,0 +1,147 @@
import json
import os
import tempfile
import sys
import pytest
from onionshare_cli import common, settings
@pytest.fixture
def settings_obj(sys_onionshare_dev_mode, platform_linux):
_common = common.Common()
_common.version = "DUMMY_VERSION_1.2.3"
return settings.Settings(_common)
class TestSettings:
def test_init(self, settings_obj):
expected_settings = {
"version": "DUMMY_VERSION_1.2.3",
"connection_type": "bundled",
"control_port_address": "127.0.0.1",
"control_port_port": 9051,
"socks_address": "127.0.0.1",
"socks_port": 9050,
"socket_file_path": "/var/run/tor/control",
"auth_type": "no_auth",
"auth_password": "",
"use_autoupdate": True,
"autoupdate_timestamp": None,
"no_bridges": True,
"tor_bridges_use_obfs4": False,
"tor_bridges_use_meek_lite_azure": False,
"tor_bridges_use_custom_bridges": "",
"persistent_tabs": [],
}
for key in settings_obj._settings:
# Skip locale, it will not always default to the same thing
if key != "locale":
assert settings_obj._settings[key] == settings_obj.default_settings[key]
assert settings_obj._settings[key] == expected_settings[key]
def test_fill_in_defaults(self, settings_obj):
del settings_obj._settings["version"]
settings_obj.fill_in_defaults()
assert settings_obj._settings["version"] == "DUMMY_VERSION_1.2.3"
def test_load(self, temp_dir, settings_obj):
custom_settings = {
"version": "CUSTOM_VERSION",
"socks_port": 9999,
"use_stealth": True,
}
tmp_file, tmp_file_path = tempfile.mkstemp(dir=temp_dir)
with open(tmp_file, "w") as f:
json.dump(custom_settings, f)
settings_obj.filename = tmp_file_path
settings_obj.load()
assert settings_obj._settings["version"] == "CUSTOM_VERSION"
assert settings_obj._settings["socks_port"] == 9999
assert settings_obj._settings["use_stealth"] is True
os.remove(tmp_file_path)
assert os.path.exists(tmp_file_path) is False
def test_save(self, monkeypatch, temp_dir, settings_obj):
settings_filename = "default_settings.json"
new_temp_dir = tempfile.mkdtemp(dir=temp_dir)
settings_path = os.path.join(new_temp_dir, settings_filename)
settings_obj.filename = settings_path
settings_obj.save()
with open(settings_path, "r") as f:
settings = json.load(f)
assert settings_obj._settings == settings
os.remove(settings_path)
assert os.path.exists(settings_path) is False
def test_get(self, settings_obj):
assert settings_obj.get("version") == "DUMMY_VERSION_1.2.3"
assert settings_obj.get("connection_type") == "bundled"
assert settings_obj.get("control_port_address") == "127.0.0.1"
assert settings_obj.get("control_port_port") == 9051
assert settings_obj.get("socks_address") == "127.0.0.1"
assert settings_obj.get("socks_port") == 9050
assert settings_obj.get("socket_file_path") == "/var/run/tor/control"
assert settings_obj.get("auth_type") == "no_auth"
assert settings_obj.get("auth_password") == ""
assert settings_obj.get("use_autoupdate") is True
assert settings_obj.get("autoupdate_timestamp") is None
assert settings_obj.get("autoupdate_timestamp") is None
assert settings_obj.get("no_bridges") is True
assert settings_obj.get("tor_bridges_use_obfs4") is False
assert settings_obj.get("tor_bridges_use_meek_lite_azure") is False
assert settings_obj.get("tor_bridges_use_custom_bridges") == ""
def test_set_version(self, settings_obj):
settings_obj.set("version", "CUSTOM_VERSION")
assert settings_obj._settings["version"] == "CUSTOM_VERSION"
def test_set_control_port_port(self, settings_obj):
settings_obj.set("control_port_port", 999)
assert settings_obj._settings["control_port_port"] == 999
settings_obj.set("control_port_port", "NON_INTEGER")
assert settings_obj._settings["control_port_port"] == 9051
def test_set_socks_port(self, settings_obj):
settings_obj.set("socks_port", 888)
assert settings_obj._settings["socks_port"] == 888
settings_obj.set("socks_port", "NON_INTEGER")
assert settings_obj._settings["socks_port"] == 9050
@pytest.mark.skipif(sys.platform != "Darwin", reason="requires Darwin")
def test_filename_darwin(self, monkeypatch, platform_darwin):
obj = settings.Settings(common.Common())
assert obj.filename == os.path.expanduser(
"~/Library/Application Support/OnionShare-testdata/onionshare.json"
)
@pytest.mark.skipif(sys.platform != "Linux", reason="requires Linux")
def test_filename_linux(self, monkeypatch, platform_linux):
obj = settings.Settings(common.Common())
assert obj.filename == os.path.expanduser(
"~/.config/onionshare-testdata/onionshare.json"
)
@pytest.mark.skipif(sys.platform != "win32", reason="requires Windows")
def test_filename_windows(self, monkeypatch, platform_windows):
obj = settings.Settings(common.Common())
assert obj.filename == os.path.expanduser(
"~\\AppData\\Roaming\\OnionShare-testdata\\onionshare.json"
)
def test_set_custom_bridge(self, settings_obj):
settings_obj.set(
"tor_bridges_use_custom_bridges",
"Bridge 45.3.20.65:9050 21300AD88890A49C429A6CB9959CFD44490A8F6E",
)
assert (
settings_obj._settings["tor_bridges_use_custom_bridges"]
== "Bridge 45.3.20.65:9050 21300AD88890A49C429A6CB9959CFD44490A8F6E"
)

231
cli/tests/test_cli_web.py Normal file
View file

@ -0,0 +1,231 @@
import contextlib
import inspect
import io
import os
import random
import re
import socket
import sys
import zipfile
import tempfile
import base64
import pytest
from werkzeug.datastructures import Headers
from onionshare_cli.common import Common
from onionshare_cli.web import Web
from onionshare_cli.settings import Settings
from onionshare_cli.mode_settings import ModeSettings
DEFAULT_ZW_FILENAME_REGEX = re.compile(r"^onionshare_[a-z2-7]{6}.zip$")
RANDOM_STR_REGEX = re.compile(r"^[a-z2-7]+$")
def web_obj(temp_dir, common_obj, mode, num_files=0):
""" Creates a Web object, in either share mode or receive mode, ready for testing """
common_obj.settings = Settings(common_obj)
mode_settings = ModeSettings(common_obj)
web = Web(common_obj, False, mode_settings, mode)
web.generate_password()
web.running = True
web.app.testing = True
# Share mode
if mode == "share":
# Add files
files = []
for _ in range(num_files):
with tempfile.NamedTemporaryFile(delete=False, dir=temp_dir) as tmp_file:
tmp_file.write(b"*" * 1024)
files.append(tmp_file.name)
web.share_mode.set_file_info(files)
# Receive mode
else:
pass
return web
class TestWeb:
def test_share_mode(self, temp_dir, common_obj):
web = web_obj(temp_dir, common_obj, "share", 3)
assert web.mode == "share"
with web.app.test_client() as c:
# Load / without auth
res = c.get("/")
res.get_data()
assert res.status_code == 401
# Load / with invalid auth
res = c.get("/", headers=self._make_auth_headers("invalid"))
res.get_data()
assert res.status_code == 401
# Load / with valid auth
res = c.get("/", headers=self._make_auth_headers(web.password))
res.get_data()
assert res.status_code == 200
# Download
res = c.get("/download", headers=self._make_auth_headers(web.password))
res.get_data()
assert res.status_code == 200
assert (
res.mimetype == "application/zip"
or res.mimetype == "application/x-zip-compressed"
)
def test_share_mode_autostop_sharing_on(self, temp_dir, common_obj, temp_file_1024):
web = web_obj(temp_dir, common_obj, "share", 3)
web.settings.set("share", "autostop_sharing", True)
assert web.running == True
with web.app.test_client() as c:
# Download the first time
res = c.get("/download", headers=self._make_auth_headers(web.password))
res.get_data()
assert res.status_code == 200
assert (
res.mimetype == "application/zip"
or res.mimetype == "application/x-zip-compressed"
)
assert web.running == False
def test_share_mode_autostop_sharing_off(
self, temp_dir, common_obj, temp_file_1024
):
web = web_obj(temp_dir, common_obj, "share", 3)
web.settings.set("share", "autostop_sharing", False)
assert web.running == True
with web.app.test_client() as c:
# Download the first time
res = c.get("/download", headers=self._make_auth_headers(web.password))
res.get_data()
assert res.status_code == 200
assert (
res.mimetype == "application/zip"
or res.mimetype == "application/x-zip-compressed"
)
assert web.running == True
def test_receive_mode(self, temp_dir, common_obj):
web = web_obj(temp_dir, common_obj, "receive")
assert web.mode == "receive"
with web.app.test_client() as c:
# Load / without auth
res = c.get("/")
res.get_data()
assert res.status_code == 401
# Load / with invalid auth
res = c.get("/", headers=self._make_auth_headers("invalid"))
res.get_data()
assert res.status_code == 401
# Load / with valid auth
res = c.get("/", headers=self._make_auth_headers(web.password))
res.get_data()
assert res.status_code == 200
def test_public_mode_on(self, temp_dir, common_obj):
web = web_obj(temp_dir, common_obj, "receive")
web.settings.set("general", "public", True)
with web.app.test_client() as c:
# Loading / should work without auth
res = c.get("/")
data1 = res.get_data()
assert res.status_code == 200
def test_public_mode_off(self, temp_dir, common_obj):
web = web_obj(temp_dir, common_obj, "receive")
web.settings.set("general", "public", False)
with web.app.test_client() as c:
# Load / without auth
res = c.get("/")
res.get_data()
assert res.status_code == 401
# But static resources should work without auth
res = c.get(f"{web.static_url_path}/css/style.css")
res.get_data()
assert res.status_code == 200
# Load / with valid auth
res = c.get("/", headers=self._make_auth_headers(web.password))
res.get_data()
assert res.status_code == 200
def _make_auth_headers(self, password):
auth = base64.b64encode(b"onionshare:" + password.encode()).decode()
h = Headers()
h.add("Authorization", "Basic " + auth)
return h
class TestZipWriterDefault:
@pytest.mark.parametrize(
"test_input",
(
f"onionshare_{''.join(random.choice('abcdefghijklmnopqrstuvwxyz234567') for _ in range(6))}.zip"
for _ in range(50)
),
)
def test_default_zw_filename_regex(self, test_input):
assert bool(DEFAULT_ZW_FILENAME_REGEX.match(test_input))
def test_zw_filename(self, default_zw):
zw_filename = os.path.basename(default_zw.zip_filename)
assert bool(DEFAULT_ZW_FILENAME_REGEX.match(zw_filename))
def test_zipfile_filename_matches_zipwriter_filename(self, default_zw):
assert default_zw.z.filename == default_zw.zip_filename
def test_zipfile_allow_zip64(self, default_zw):
assert default_zw.z._allowZip64 is True
def test_zipfile_mode(self, default_zw):
assert default_zw.z.mode == "w"
def test_callback(self, default_zw):
assert default_zw.processed_size_callback(None) is None
def test_add_file(self, default_zw, temp_file_1024_delete):
default_zw.add_file(temp_file_1024_delete)
zipfile_info = default_zw.z.getinfo(os.path.basename(temp_file_1024_delete))
assert zipfile_info.compress_type == zipfile.ZIP_DEFLATED
assert zipfile_info.file_size == 1024
def test_add_directory(self, temp_dir_1024_delete, default_zw):
previous_size = default_zw._size # size before adding directory
default_zw.add_dir(temp_dir_1024_delete)
assert default_zw._size == previous_size + 1024
class TestZipWriterCustom:
@pytest.mark.parametrize(
"test_input",
(
Common.random_string(
random.randint(2, 50), random.choice((None, random.randint(2, 50)))
)
for _ in range(50)
),
)
def test_random_string_regex(self, test_input):
assert bool(RANDOM_STR_REGEX.match(test_input))
def test_custom_filename(self, custom_zw):
assert bool(RANDOM_STR_REGEX.match(custom_zw.zip_filename))
def test_custom_callback(self, custom_zw):
assert custom_zw.processed_size_callback(None) == "custom_callback"