Merge develop in and fix upload/timer functionality so that it works as described. Still needs fixing to not throw a connection error to the lucky last uploader after their upload completes and server stops due to expiry having passed

This commit is contained in:
Miguel Jacq 2018-10-01 16:42:54 +10:00
commit d69bba4c9d
88 changed files with 5751 additions and 1668 deletions

0
tests/__init__.py Normal file
View file

160
tests/conftest.py Normal file
View file

@ -0,0 +1,160 @@
import sys
# Force tests to look for resources in the source code tree
sys.onionshare_dev_mode = True
import os
import shutil
import tempfile
import pytest
from onionshare import common, web, settings
@pytest.fixture
def temp_dir_1024():
""" Create a temporary directory that has a single file of a
particular size (1024 bytes).
"""
tmp_dir = tempfile.mkdtemp()
tmp_file, tmp_file_path = tempfile.mkstemp(dir=tmp_dir)
with open(tmp_file, 'wb') as f:
f.write(b'*' * 1024)
return tmp_dir
# pytest > 2.9 only needs @pytest.fixture
@pytest.yield_fixture
def temp_dir_1024_delete():
""" Create a temporary directory that has a single file of a
particular size (1024 bytes). The temporary directory (including
the file inside) will be deleted after fixture usage.
"""
with tempfile.TemporaryDirectory() as tmp_dir:
tmp_file, tmp_file_path = tempfile.mkstemp(dir=tmp_dir)
with open(tmp_file, 'wb') as f:
f.write(b'*' * 1024)
yield tmp_dir
@pytest.fixture
def temp_file_1024():
""" Create a temporary file of a particular size (1024 bytes). """
with tempfile.NamedTemporaryFile(delete=False) as tmp_file:
tmp_file.write(b'*' * 1024)
return tmp_file.name
# pytest > 2.9 only needs @pytest.fixture
@pytest.yield_fixture
def temp_file_1024_delete():
"""
Create a temporary file of a particular size (1024 bytes).
The temporary file will be deleted after fixture usage.
"""
with tempfile.NamedTemporaryFile() as tmp_file:
tmp_file.write(b'*' * 1024)
tmp_file.flush()
yield tmp_file.name
# pytest > 2.9 only needs @pytest.fixture
@pytest.yield_fixture(scope='session')
def custom_zw():
zw = web.share_mode.ZipWriter(
common.Common(),
zip_filename=common.Common.random_string(4, 6),
processed_size_callback=lambda _: 'custom_callback'
)
yield zw
zw.close()
os.remove(zw.zip_filename)
# pytest > 2.9 only needs @pytest.fixture
@pytest.yield_fixture(scope='session')
def default_zw():
zw = web.share_mode.ZipWriter(common.Common())
yield zw
zw.close()
tmp_dir = os.path.dirname(zw.zip_filename)
shutil.rmtree(tmp_dir)
@pytest.fixture
def locale_en(monkeypatch):
monkeypatch.setattr('locale.getdefaultlocale', lambda: ('en_US', 'UTF-8'))
@pytest.fixture
def locale_fr(monkeypatch):
monkeypatch.setattr('locale.getdefaultlocale', lambda: ('fr_FR', 'UTF-8'))
@pytest.fixture
def locale_invalid(monkeypatch):
monkeypatch.setattr('locale.getdefaultlocale', lambda: ('xx_XX', 'UTF-8'))
@pytest.fixture
def locale_ru(monkeypatch):
monkeypatch.setattr('locale.getdefaultlocale', lambda: ('ru_RU', 'UTF-8'))
@pytest.fixture
def platform_darwin(monkeypatch):
monkeypatch.setattr('platform.system', lambda: 'Darwin')
@pytest.fixture # (scope="session")
def platform_linux(monkeypatch):
monkeypatch.setattr('platform.system', lambda: 'Linux')
@pytest.fixture
def platform_windows(monkeypatch):
monkeypatch.setattr('platform.system', lambda: 'Windows')
@pytest.fixture
def sys_argv_sys_prefix(monkeypatch):
monkeypatch.setattr('sys.argv', [sys.prefix])
@pytest.fixture
def sys_frozen(monkeypatch):
monkeypatch.setattr('sys.frozen', True, raising=False)
@pytest.fixture
def sys_meipass(monkeypatch):
monkeypatch.setattr(
'sys._MEIPASS', os.path.expanduser('~'), raising=False)
@pytest.fixture # (scope="session")
def sys_onionshare_dev_mode(monkeypatch):
monkeypatch.setattr('sys.onionshare_dev_mode', True, raising=False)
@pytest.fixture
def time_time_100(monkeypatch):
monkeypatch.setattr('time.time', lambda: 100)
@pytest.fixture
def time_strftime(monkeypatch):
monkeypatch.setattr('time.strftime', lambda _: 'Jun 06 2013 11:05:00')
@pytest.fixture
def common_obj():
return common.Common()
@pytest.fixture
def settings_obj(sys_onionshare_dev_mode, platform_linux):
_common = common.Common()
_common.version = 'DUMMY_VERSION_1.2.3'
return settings.Settings(_common)

38
tests/test_helpers.py Normal file
View file

@ -0,0 +1,38 @@
"""
OnionShare | https://onionshare.org/
Copyright (C) 2014-2018 Micah Lee <micah@micahflee.com>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
import tempfile
import os
class MockSubprocess():
def __init__(self):
self.last_call = None
def call(self, args):
self.last_call = args
def last_call_args(self):
return self.last_call
def write_tempfile(text):
path = os.path.join(tempfile.mkdtemp(), "/test-file.txt")
with open(path, "w") as f:
f.write(text)
return path

86
tests/test_onionshare.py Normal file
View file

@ -0,0 +1,86 @@
"""
OnionShare | https://onionshare.org/
Copyright (C) 2014-2018 Micah Lee <micah@micahflee.com>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
import os
import pytest
from onionshare import OnionShare
from onionshare.common import Common
class MyOnion:
def __init__(self, stealth=False):
self.auth_string = 'TestHidServAuth'
self.private_key = ''
self.stealth = stealth
@staticmethod
def start_onion_service(_):
return 'test_service_id.onion'
@pytest.fixture
def onionshare_obj():
common = Common()
return OnionShare(common, MyOnion())
class TestOnionShare:
def test_init(self, onionshare_obj):
assert onionshare_obj.hidserv_dir is None
assert onionshare_obj.onion_host is None
assert onionshare_obj.stealth is None
assert onionshare_obj.cleanup_filenames == []
assert onionshare_obj.local_only is False
def test_set_stealth_true(self, onionshare_obj):
onionshare_obj.set_stealth(True)
assert onionshare_obj.stealth is True
assert onionshare_obj.onion.stealth is True
def test_set_stealth_false(self, onionshare_obj):
onionshare_obj.set_stealth(False)
assert onionshare_obj.stealth is False
assert onionshare_obj.onion.stealth is False
def test_start_onion_service(self, onionshare_obj):
onionshare_obj.set_stealth(False)
onionshare_obj.start_onion_service()
assert 17600 <= onionshare_obj.port <= 17650
assert onionshare_obj.onion_host == 'test_service_id.onion'
def test_start_onion_service_stealth(self, onionshare_obj):
onionshare_obj.set_stealth(True)
onionshare_obj.start_onion_service()
assert onionshare_obj.auth_string == 'TestHidServAuth'
def test_start_onion_service_local_only(self, onionshare_obj):
onionshare_obj.local_only = True
onionshare_obj.start_onion_service()
assert onionshare_obj.onion_host == '127.0.0.1:{}'.format(
onionshare_obj.port)
def test_cleanup(self, onionshare_obj, temp_dir_1024, temp_file_1024):
onionshare_obj.cleanup_filenames = [temp_dir_1024, temp_file_1024]
onionshare_obj.cleanup()
assert os.path.exists(temp_dir_1024) is False
assert os.path.exists(temp_dir_1024) is False
assert onionshare_obj.cleanup_filenames == []

View file

@ -0,0 +1,281 @@
"""
OnionShare | https://onionshare.org/
Copyright (C) 2014-2018 Micah Lee <micah@micahflee.com>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
import contextlib
import inspect
import io
import os
import random
import re
import socket
import sys
import zipfile
import pytest
LOG_MSG_REGEX = re.compile(r"""
^\[Jun\ 06\ 2013\ 11:05:00\]
\ TestModule\.<function\ TestLog\.test_output\.<locals>\.dummy_func
\ at\ 0x[a-f0-9]+>(:\ TEST_MSG)?$""", re.VERBOSE)
SLUG_REGEX = re.compile(r'^([a-z]+)(-[a-z]+)?-([a-z]+)(-[a-z]+)?$')
# TODO: Improve the Common tests to test it all as a single class
class TestBuildSlug:
@pytest.mark.parametrize('test_input,expected', (
# VALID, two lowercase words, separated by a hyphen
('syrup-enzyme', True),
('caution-friday', True),
# VALID, two lowercase words, with one hyphenated compound word
('drop-down-thimble', True),
('unmixed-yo-yo', True),
# VALID, two lowercase hyphenated compound words, separated by hyphen
('yo-yo-drop-down', True),
('felt-tip-t-shirt', True),
('hello-world', True),
# INVALID
('Upper-Case', False),
('digits-123', False),
('too-many-hyphens-', False),
('symbols-!@#$%', False)
))
def test_build_slug_regex(self, test_input, expected):
""" Test that `SLUG_REGEX` accounts for the following patterns
There are a few hyphenated words in `wordlist.txt`:
* drop-down
* felt-tip
* t-shirt
* yo-yo
These words cause a few extra potential slug patterns:
* word-word
* hyphenated-word-word
* word-hyphenated-word
* hyphenated-word-hyphenated-word
"""
assert bool(SLUG_REGEX.match(test_input)) == expected
def test_build_slug_unique(self, common_obj, sys_onionshare_dev_mode):
assert common_obj.build_slug() != common_obj.build_slug()
class TestDirSize:
def test_temp_dir_size(self, common_obj, temp_dir_1024_delete):
""" dir_size() should return the total size (in bytes) of all files
in a particular directory.
"""
assert common_obj.dir_size(temp_dir_1024_delete) == 1024
class TestEstimatedTimeRemaining:
@pytest.mark.parametrize('test_input,expected', (
((2, 676, 12), '8h14m16s'),
((14, 1049, 30), '1h26m15s'),
((21, 450, 1), '33m42s'),
((31, 1115, 80), '11m39s'),
((336, 989, 32), '2m12s'),
((603, 949, 38), '36s'),
((971, 1009, 83), '1s')
))
def test_estimated_time_remaining(
self, common_obj, test_input, expected, time_time_100):
assert common_obj.estimated_time_remaining(*test_input) == expected
@pytest.mark.parametrize('test_input', (
(10, 20, 100), # if `time_elapsed == 0`
(0, 37, 99) # if `download_rate == 0`
))
def test_raises_zero_division_error(self, common_obj, test_input, time_time_100):
with pytest.raises(ZeroDivisionError):
common_obj.estimated_time_remaining(*test_input)
class TestFormatSeconds:
@pytest.mark.parametrize('test_input,expected', (
(0, '0s'),
(26, '26s'),
(60, '1m'),
(947.35, '15m47s'),
(1847, '30m47s'),
(2193.94, '36m34s'),
(3600, '1h'),
(13426.83, '3h43m47s'),
(16293, '4h31m33s'),
(18392.14, '5h6m32s'),
(86400, '1d'),
(129674, '1d12h1m14s'),
(56404.12, '15h40m4s')
))
def test_format_seconds(self, common_obj, test_input, expected):
assert common_obj.format_seconds(test_input) == expected
# TODO: test negative numbers?
@pytest.mark.parametrize('test_input', (
'string', lambda: None, [], {}, set()
))
def test_invalid_input_types(self, common_obj, test_input):
with pytest.raises(TypeError):
common_obj.format_seconds(test_input)
class TestGetAvailablePort:
@pytest.mark.parametrize('port_min,port_max', (
(random.randint(1024, 1500),
random.randint(1800, 2048)) for _ in range(50)
))
def test_returns_an_open_port(self, common_obj, port_min, port_max):
""" get_available_port() should return an open port within the range """
port = common_obj.get_available_port(port_min, port_max)
assert port_min <= port <= port_max
with socket.socket() as tmpsock:
tmpsock.bind(('127.0.0.1', port))
class TestGetPlatform:
def test_darwin(self, platform_darwin, common_obj):
assert common_obj.platform == 'Darwin'
def test_linux(self, platform_linux, common_obj):
assert common_obj.platform == 'Linux'
def test_windows(self, platform_windows, common_obj):
assert common_obj.platform == 'Windows'
# TODO: double-check these tests
class TestGetResourcePath:
def test_onionshare_dev_mode(self, common_obj, sys_onionshare_dev_mode):
prefix = os.path.join(
os.path.dirname(
os.path.dirname(
os.path.abspath(
inspect.getfile(
inspect.currentframe())))), 'share')
assert (
common_obj.get_resource_path(os.path.join(prefix, 'test_filename')) ==
os.path.join(prefix, 'test_filename'))
def test_linux(self, common_obj, platform_linux, sys_argv_sys_prefix):
prefix = os.path.join(sys.prefix, 'share/onionshare')
assert (
common_obj.get_resource_path(os.path.join(prefix, 'test_filename')) ==
os.path.join(prefix, 'test_filename'))
def test_frozen_darwin(self, common_obj, platform_darwin, sys_frozen, sys_meipass):
prefix = os.path.join(sys._MEIPASS, 'share')
assert (
common_obj.get_resource_path(os.path.join(prefix, 'test_filename')) ==
os.path.join(prefix, 'test_filename'))
class TestGetTorPaths:
# @pytest.mark.skipif(sys.platform != 'Darwin', reason='requires MacOS') ?
def test_get_tor_paths_darwin(self, platform_darwin, common_obj, sys_frozen, sys_meipass):
base_path = os.path.dirname(
os.path.dirname(
os.path.dirname(
common_obj.get_resource_path(''))))
tor_path = os.path.join(
base_path, 'Resources', 'Tor', 'tor')
tor_geo_ip_file_path = os.path.join(
base_path, 'Resources', 'Tor', 'geoip')
tor_geo_ipv6_file_path = os.path.join(
base_path, 'Resources', 'Tor', 'geoip6')
obfs4proxy_file_path = os.path.join(
base_path, 'Resources', 'Tor', 'obfs4proxy')
assert (common_obj.get_tor_paths() ==
(tor_path, tor_geo_ip_file_path, tor_geo_ipv6_file_path, obfs4proxy_file_path))
# @pytest.mark.skipif(sys.platform != 'Linux', reason='requires Linux') ?
def test_get_tor_paths_linux(self, platform_linux, common_obj):
assert (common_obj.get_tor_paths() ==
('/usr/bin/tor', '/usr/share/tor/geoip', '/usr/share/tor/geoip6', '/usr/bin/obfs4proxy'))
# @pytest.mark.skipif(sys.platform != 'Windows', reason='requires Windows') ?
def test_get_tor_paths_windows(self, platform_windows, common_obj, sys_frozen):
base_path = os.path.join(
os.path.dirname(
os.path.dirname(
common_obj.get_resource_path(''))), 'tor')
tor_path = os.path.join(
os.path.join(base_path, 'Tor'), 'tor.exe')
obfs4proxy_file_path = os.path.join(
os.path.join(base_path, 'Tor'), 'obfs4proxy.exe')
tor_geo_ip_file_path = os.path.join(
os.path.join(
os.path.join(base_path, 'Data'), 'Tor'), 'geoip')
tor_geo_ipv6_file_path = os.path.join(
os.path.join(
os.path.join(base_path, 'Data'), 'Tor'), 'geoip6')
assert (common_obj.get_tor_paths() ==
(tor_path, tor_geo_ip_file_path, tor_geo_ipv6_file_path, obfs4proxy_file_path))
class TestHumanReadableFilesize:
@pytest.mark.parametrize('test_input,expected', (
(1024 ** 0, '1.0 B'),
(1024 ** 1, '1.0 KiB'),
(1024 ** 2, '1.0 MiB'),
(1024 ** 3, '1.0 GiB'),
(1024 ** 4, '1.0 TiB'),
(1024 ** 5, '1.0 PiB'),
(1024 ** 6, '1.0 EiB'),
(1024 ** 7, '1.0 ZiB'),
(1024 ** 8, '1.0 YiB')
))
def test_human_readable_filesize(self, common_obj, test_input, expected):
assert common_obj.human_readable_filesize(test_input) == expected
class TestLog:
@pytest.mark.parametrize('test_input', (
('[Jun 06 2013 11:05:00]'
' TestModule.<function TestLog.test_output.<locals>.dummy_func'
' at 0xdeadbeef>'),
('[Jun 06 2013 11:05:00]'
' TestModule.<function TestLog.test_output.<locals>.dummy_func'
' at 0xdeadbeef>: TEST_MSG')
))
def test_log_msg_regex(self, test_input):
assert bool(LOG_MSG_REGEX.match(test_input))
def test_output(self, common_obj, time_strftime):
def dummy_func():
pass
common_obj.debug = True
# From: https://stackoverflow.com/questions/1218933
with io.StringIO() as buf, contextlib.redirect_stdout(buf):
common_obj.log('TestModule', dummy_func)
common_obj.log('TestModule', dummy_func, 'TEST_MSG')
output = buf.getvalue()
line_one, line_two, _ = output.split('\n')
assert LOG_MSG_REGEX.match(line_one)
assert LOG_MSG_REGEX.match(line_two)

View file

@ -0,0 +1,177 @@
"""
OnionShare | https://onionshare.org/
Copyright (C) 2014-2018 Micah Lee <micah@micahflee.com>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
import json
import os
import tempfile
import pytest
from onionshare import common, settings, strings
@pytest.fixture
def os_path_expanduser(monkeypatch):
monkeypatch.setattr('os.path.expanduser', lambda path: path)
@pytest.fixture
def settings_obj(sys_onionshare_dev_mode, platform_linux):
_common = common.Common()
_common.version = 'DUMMY_VERSION_1.2.3'
return settings.Settings(_common)
class TestSettings:
def test_init(self, settings_obj):
assert settings_obj._settings == settings_obj.default_settings == {
'version': 'DUMMY_VERSION_1.2.3',
'connection_type': 'bundled',
'control_port_address': '127.0.0.1',
'control_port_port': 9051,
'socks_address': '127.0.0.1',
'socks_port': 9050,
'socket_file_path': '/var/run/tor/control',
'auth_type': 'no_auth',
'auth_password': '',
'close_after_first_download': True,
'shutdown_timeout': False,
'use_stealth': False,
'use_autoupdate': True,
'autoupdate_timestamp': None,
'no_bridges': True,
'tor_bridges_use_obfs4': False,
'tor_bridges_use_meek_lite_azure': False,
'tor_bridges_use_custom_bridges': '',
'use_legacy_v2_onions': False,
'save_private_key': False,
'private_key': '',
'slug': '',
'hidservauth_string': '',
'downloads_dir': os.path.expanduser('~/OnionShare'),
'public_mode': False
}
def test_fill_in_defaults(self, settings_obj):
del settings_obj._settings['version']
settings_obj.fill_in_defaults()
assert settings_obj._settings['version'] == 'DUMMY_VERSION_1.2.3'
def test_load(self, settings_obj):
custom_settings = {
'version': 'CUSTOM_VERSION',
'socks_port': 9999,
'use_stealth': True
}
tmp_file, tmp_file_path = tempfile.mkstemp()
with open(tmp_file, 'w') as f:
json.dump(custom_settings, f)
settings_obj.filename = tmp_file_path
settings_obj.load()
assert settings_obj._settings['version'] == 'CUSTOM_VERSION'
assert settings_obj._settings['socks_port'] == 9999
assert settings_obj._settings['use_stealth'] is True
os.remove(tmp_file_path)
assert os.path.exists(tmp_file_path) is False
def test_save(self, monkeypatch, settings_obj):
monkeypatch.setattr(strings, '_', lambda _: '')
settings_filename = 'default_settings.json'
tmp_dir = tempfile.gettempdir()
settings_path = os.path.join(tmp_dir, settings_filename)
settings_obj.filename = settings_path
settings_obj.save()
with open(settings_path, 'r') as f:
settings = json.load(f)
assert settings_obj._settings == settings
os.remove(settings_path)
assert os.path.exists(settings_path) is False
def test_get(self, settings_obj):
assert settings_obj.get('version') == 'DUMMY_VERSION_1.2.3'
assert settings_obj.get('connection_type') == 'bundled'
assert settings_obj.get('control_port_address') == '127.0.0.1'
assert settings_obj.get('control_port_port') == 9051
assert settings_obj.get('socks_address') == '127.0.0.1'
assert settings_obj.get('socks_port') == 9050
assert settings_obj.get('socket_file_path') == '/var/run/tor/control'
assert settings_obj.get('auth_type') == 'no_auth'
assert settings_obj.get('auth_password') == ''
assert settings_obj.get('close_after_first_download') is True
assert settings_obj.get('use_stealth') is False
assert settings_obj.get('use_autoupdate') is True
assert settings_obj.get('autoupdate_timestamp') is None
assert settings_obj.get('autoupdate_timestamp') is None
assert settings_obj.get('no_bridges') is True
assert settings_obj.get('tor_bridges_use_obfs4') is False
assert settings_obj.get('tor_bridges_use_meek_lite_azure') is False
assert settings_obj.get('tor_bridges_use_custom_bridges') == ''
def test_set_version(self, settings_obj):
settings_obj.set('version', 'CUSTOM_VERSION')
assert settings_obj._settings['version'] == 'CUSTOM_VERSION'
def test_set_control_port_port(self, settings_obj):
settings_obj.set('control_port_port', 999)
assert settings_obj._settings['control_port_port'] == 999
settings_obj.set('control_port_port', 'NON_INTEGER')
assert settings_obj._settings['control_port_port'] == 9051
def test_set_socks_port(self, settings_obj):
settings_obj.set('socks_port', 888)
assert settings_obj._settings['socks_port'] == 888
settings_obj.set('socks_port', 'NON_INTEGER')
assert settings_obj._settings['socks_port'] == 9050
def test_filename_darwin(
self,
monkeypatch,
os_path_expanduser,
platform_darwin):
obj = settings.Settings(common.Common())
assert (obj.filename ==
'~/Library/Application Support/OnionShare/onionshare.json')
def test_filename_linux(
self,
monkeypatch,
os_path_expanduser,
platform_linux):
obj = settings.Settings(common.Common())
assert obj.filename == '~/.config/onionshare/onionshare.json'
def test_filename_windows(
self,
monkeypatch,
platform_windows):
monkeypatch.setenv('APPDATA', 'C:')
obj = settings.Settings(common.Common())
assert obj.filename == 'C:\\OnionShare\\onionshare.json'
def test_set_custom_bridge(self, settings_obj):
settings_obj.set('tor_bridges_use_custom_bridges', 'Bridge 45.3.20.65:9050 21300AD88890A49C429A6CB9959CFD44490A8F6E')
assert settings_obj._settings['tor_bridges_use_custom_bridges'] == 'Bridge 45.3.20.65:9050 21300AD88890A49C429A6CB9959CFD44490A8F6E'

View file

@ -0,0 +1,63 @@
# -*- coding: utf-8 -*-
"""
OnionShare | https://onionshare.org/
Copyright (C) 2014-2018 Micah Lee <micah@micahflee.com>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
import types
import pytest
from onionshare import strings
# # Stub get_resource_path so it finds the correct path while running tests
# def get_resource_path(filename):
# resources_dir = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'share')
# path = os.path.join(resources_dir, filename)
# return path
# common.get_resource_path = get_resource_path
def test_starts_with_empty_strings():
""" Creates an empty strings dict by default """
assert strings.strings == {}
def test_underscore_is_function():
assert callable(strings._) and isinstance(strings._, types.FunctionType)
class TestLoadStrings:
def test_load_strings_defaults_to_english(
self, common_obj, locale_en, sys_onionshare_dev_mode):
""" load_strings() loads English by default """
strings.load_strings(common_obj)
assert strings._('preparing_files') == "Compressing files."
def test_load_strings_loads_other_languages(
self, common_obj, locale_fr, sys_onionshare_dev_mode):
""" load_strings() loads other languages in different locales """
strings.load_strings(common_obj, "fr")
assert strings._('preparing_files') == "Préparation des fichiers à partager."
def test_load_invalid_locale(
self, common_obj, locale_invalid, sys_onionshare_dev_mode):
""" load_strings() raises a KeyError for an invalid locale """
with pytest.raises(KeyError):
strings.load_strings(common_obj, 'XX')

View file

@ -0,0 +1,225 @@
"""
OnionShare | https://onionshare.org/
Copyright (C) 2014-2018 Micah Lee <micah@micahflee.com>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
import contextlib
import inspect
import io
import os
import random
import re
import socket
import sys
import zipfile
import tempfile
import pytest
from onionshare.common import Common
from onionshare.web import Web
from onionshare.settings import Settings
DEFAULT_ZW_FILENAME_REGEX = re.compile(r'^onionshare_[a-z2-7]{6}.zip$')
RANDOM_STR_REGEX = re.compile(r'^[a-z2-7]+$')
def web_obj(common_obj, mode, num_files=0):
""" Creates a Web object, in either share mode or receive mode, ready for testing """
common_obj.load_settings()
web = Web(common_obj, False, mode)
web.generate_slug()
web.stay_open = True
web.running = True
web.app.testing = True
# Share mode
if mode == 'share':
# Add files
files = []
for i in range(num_files):
with tempfile.NamedTemporaryFile(delete=False) as tmp_file:
tmp_file.write(b'*' * 1024)
files.append(tmp_file.name)
web.share_mode.set_file_info(files)
# Receive mode
else:
pass
return web
class TestWeb:
def test_share_mode(self, common_obj):
web = web_obj(common_obj, 'share', 3)
assert web.mode is 'share'
with web.app.test_client() as c:
# Load 404 pages
res = c.get('/')
res.get_data()
assert res.status_code == 404
res = c.get('/invalidslug'.format(web.slug))
res.get_data()
assert res.status_code == 404
# Load download page
res = c.get('/{}'.format(web.slug))
res.get_data()
assert res.status_code == 200
# Download
res = c.get('/{}/download'.format(web.slug))
res.get_data()
assert res.status_code == 200
assert res.mimetype == 'application/zip'
def test_share_mode_close_after_first_download_on(self, common_obj, temp_file_1024):
web = web_obj(common_obj, 'share', 3)
web.stay_open = False
assert web.running == True
with web.app.test_client() as c:
# Download the first time
res = c.get('/{}/download'.format(web.slug))
res.get_data()
assert res.status_code == 200
assert res.mimetype == 'application/zip'
assert web.running == False
def test_share_mode_close_after_first_download_off(self, common_obj, temp_file_1024):
web = web_obj(common_obj, 'share', 3)
web.stay_open = True
assert web.running == True
with web.app.test_client() as c:
# Download the first time
res = c.get('/{}/download'.format(web.slug))
res.get_data()
assert res.status_code == 200
assert res.mimetype == 'application/zip'
assert web.running == True
def test_receive_mode(self, common_obj):
web = web_obj(common_obj, 'receive')
assert web.mode is 'receive'
with web.app.test_client() as c:
# Load 404 pages
res = c.get('/')
res.get_data()
assert res.status_code == 404
res = c.get('/invalidslug'.format(web.slug))
res.get_data()
assert res.status_code == 404
# Load upload page
res = c.get('/{}'.format(web.slug))
res.get_data()
assert res.status_code == 200
def test_public_mode_on(self, common_obj):
web = web_obj(common_obj, 'receive')
common_obj.settings.set('public_mode', True)
with web.app.test_client() as c:
# Upload page should be accessible from /
res = c.get('/')
data1 = res.get_data()
assert res.status_code == 200
# /[slug] should be a 404
res = c.get('/{}'.format(web.slug))
data2 = res.get_data()
assert res.status_code == 404
def test_public_mode_off(self, common_obj):
web = web_obj(common_obj, 'receive')
common_obj.settings.set('public_mode', False)
with web.app.test_client() as c:
# / should be a 404
res = c.get('/')
data1 = res.get_data()
assert res.status_code == 404
# Upload page should be accessible from /[slug]
res = c.get('/{}'.format(web.slug))
data2 = res.get_data()
assert res.status_code == 200
class TestZipWriterDefault:
@pytest.mark.parametrize('test_input', (
'onionshare_{}.zip'.format(''.join(
random.choice('abcdefghijklmnopqrstuvwxyz234567') for _ in range(6)
)) for _ in range(50)
))
def test_default_zw_filename_regex(self, test_input):
assert bool(DEFAULT_ZW_FILENAME_REGEX.match(test_input))
def test_zw_filename(self, default_zw):
zw_filename = os.path.basename(default_zw.zip_filename)
assert bool(DEFAULT_ZW_FILENAME_REGEX.match(zw_filename))
def test_zipfile_filename_matches_zipwriter_filename(self, default_zw):
assert default_zw.z.filename == default_zw.zip_filename
def test_zipfile_allow_zip64(self, default_zw):
assert default_zw.z._allowZip64 is True
def test_zipfile_mode(self, default_zw):
assert default_zw.z.mode == 'w'
def test_callback(self, default_zw):
assert default_zw.processed_size_callback(None) is None
def test_add_file(self, default_zw, temp_file_1024_delete):
default_zw.add_file(temp_file_1024_delete)
zipfile_info = default_zw.z.getinfo(
os.path.basename(temp_file_1024_delete))
assert zipfile_info.compress_type == zipfile.ZIP_DEFLATED
assert zipfile_info.file_size == 1024
def test_add_directory(self, temp_dir_1024_delete, default_zw):
previous_size = default_zw._size # size before adding directory
default_zw.add_dir(temp_dir_1024_delete)
assert default_zw._size == previous_size + 1024
class TestZipWriterCustom:
@pytest.mark.parametrize('test_input', (
Common.random_string(
random.randint(2, 50),
random.choice((None, random.randint(2, 50)))
) for _ in range(50)
))
def test_random_string_regex(self, test_input):
assert bool(RANDOM_STR_REGEX.match(test_input))
def test_custom_filename(self, custom_zw):
assert bool(RANDOM_STR_REGEX.match(custom_zw.zip_filename))
def test_custom_callback(self, custom_zw):
assert custom_zw.processed_size_callback(None) == 'custom_callback'