* Use re.verbose on LOG_MSG_REGEX

* Create new fixtures for creating temporary files and directories
* Modify custom_zw fixture to include custom callback function to test
* Delete parent directory of ZipWriter file from default_zw fixture
* Modify monkeypatch arguments
* Group tests into separate classes for each function
* Parametrize ZeroDivisionError tests
* Test all regular expressions
* Use new fixtures to ensure proper deletion of files after testing
This commit is contained in:
Delirious Lettuce 2017-07-07 21:33:45 -06:00
parent 6558d2f7ce
commit 7724df923f

View file

@ -21,23 +21,23 @@ import contextlib
import inspect import inspect
import io import io
import os import os
import platform
import random import random
import re import re
import shutil
import socket import socket
import sys import sys
import tempfile import tempfile
import time
import zipfile import zipfile
import pytest import pytest
import shutil
from onionshare import common from onionshare import common
DEFAULT_ZW_FILENAME_REGEX = re.compile(r'^onionshare_[a-z2-7]{6}.zip$') DEFAULT_ZW_FILENAME_REGEX = re.compile(r'^onionshare_[a-z2-7]{6}.zip$')
# TODO: use re.VERBOSE on LOG_MSG_REGEX for readability? LOG_MSG_REGEX = re.compile(r"""
LOG_MSG_REGEX = re.compile(r'^\[Jun 06 2013 11:05:00\] TestModule\.<function test_log\.<locals>\.test_func at 0x[a-f0-9]+>(: TEST_MSG)?$') ^\[Jun\ 06\ 2013\ 11:05:00\]
\ TestModule\.<function\ TestLog\.test_output\.<locals>\.dummy_func
\ at\ 0x[a-f0-9]+>(:\ TEST_MSG)?$""", re.VERBOSE)
RANDOM_STR_REGEX = re.compile(r'^[a-z2-7]+$') RANDOM_STR_REGEX = re.compile(r'^[a-z2-7]+$')
SLUG_REGEX = re.compile(r'^([a-z]+)(-[a-z]+)?-([a-z]+)(-[a-z]+)?$') SLUG_REGEX = re.compile(r'^([a-z]+)(-[a-z]+)?-([a-z]+)(-[a-z]+)?$')
@ -46,14 +46,45 @@ SLUG_REGEX = re.compile(r'^([a-z]+)(-[a-z]+)?-([a-z]+)(-[a-z]+)?$')
# FIXTURES # FIXTURES
# ################################################# # #################################################
# TODO: separate fixtures into a separate file? # TODO: separate fixtures into a separate file: conftest.py ?
# TODO: comment fixtures properly # TODO: comment fixtures properly
@pytest.yield_fixture()
def temp_dir_1024_delete():
"""
Create a temporary directory that has a single file of a particular
size (1024 bytes). The temporary directory (and file inside) will
be deleted after fixture usage.
"""
with tempfile.TemporaryDirectory() as tmp_dir:
tmp_file, tmp_file_path = tempfile.mkstemp(dir=tmp_dir)
with open(tmp_file, 'wb') as f:
f.write(b'*' * 1024)
yield tmp_dir
@pytest.yield_fixture()
def temp_file_1024_delete():
"""
Create a temporary file of a particular size (1024 bytes).
The temporary file will be deleted after fixture usage.
"""
with tempfile.NamedTemporaryFile() as tmp_file:
tmp_file.write(b'*' * 1024)
tmp_file.flush()
yield tmp_file.name
# pytest > 2.9 only needs @pytest.fixture # pytest > 2.9 only needs @pytest.fixture
@pytest.yield_fixture(scope='session') @pytest.yield_fixture(scope='session')
def custom_zw(): def custom_zw():
zw = common.ZipWriter(zip_filename=common.random_string(4, 6)) zw = common.ZipWriter(
zip_filename=common.random_string(4, 6),
processed_size_callback=lambda _: 'custom_callback'
)
yield zw yield zw
zw.close() zw.close()
os.remove(zw.zip_filename) os.remove(zw.zip_filename)
@ -65,7 +96,8 @@ def default_zw():
zw = common.ZipWriter() zw = common.ZipWriter()
yield zw yield zw
zw.close() zw.close()
os.remove(zw.zip_filename) tmp_dir = os.path.dirname(zw.zip_filename)
shutil.rmtree(tmp_dir)
@pytest.fixture @pytest.fixture
@ -106,16 +138,16 @@ def sys_frozen(monkeypatch):
@pytest.fixture @pytest.fixture
def sys_meipass(monkeypatch): def sys_meipass(monkeypatch):
monkeypatch.setattr( monkeypatch.setattr(
sys, '_MEIPASS', os.path.expanduser('~'), raising=False) 'sys._MEIPASS', os.path.expanduser('~'), raising=False)
@pytest.fixture @pytest.fixture
def sys_onionshare_dev_mode(monkeypatch): def sys_onionshare_dev_mode(monkeypatch):
monkeypatch.setattr(sys, 'onionshare_dev_mode', True, raising=False) monkeypatch.setattr('sys.onionshare_dev_mode', True, raising=False)
@pytest.fixture @pytest.fixture
def time_100(monkeypatch): def time_time_100(monkeypatch):
monkeypatch.setattr('time.time', lambda: 100) monkeypatch.setattr('time.time', lambda: 100)
@ -169,28 +201,16 @@ class TestBuildSlug:
assert bool(SLUG_REGEX.match(test_input)) == expected assert bool(SLUG_REGEX.match(test_input)) == expected
def test_build_slug_unique(self, sys_onionshare_dev_mode): def test_build_slug_unique(self, sys_onionshare_dev_mode):
# fixture for common.get_resource???
assert common.build_slug() != common.build_slug() assert common.build_slug() != common.build_slug()
@pytest.mark.parametrize('directory_size', (5, 500, 5000)) class TestDirSize:
def test_dir_size(directory_size): def test_temp_dir_size(self, temp_dir_1024_delete):
""" dir_size() should return the total size (in bytes) of all files """ dir_size() should return the total size (in bytes) of all files
in a particular directory. in a particular directory.
"""
This test creates a temporary directory with a single file of a assert common.dir_size(temp_dir_1024_delete) == 1024
particular size. After the test is complete, it deletes the
temporary directory.
"""
# TODO: use helper function to create temporary file?
tmp_dir = tempfile.mkdtemp()
with tempfile.NamedTemporaryFile(dir=tmp_dir, delete=False) as tmp_file:
tmp_file.write(b'*' * directory_size)
# tempfile.TemporaryDirectory raised error when given to `dir_size`
assert common.dir_size(tmp_dir) == directory_size
shutil.rmtree(tmp_dir)
class TestEstimatedTimeRemaining: class TestEstimatedTimeRemaining:
@ -203,25 +223,16 @@ class TestEstimatedTimeRemaining:
((603, 949, 38), '36s'), ((603, 949, 38), '36s'),
((971, 1009, 83), '1s') ((971, 1009, 83), '1s')
)) ))
def test_estimated_time_remaining(self, test_input, expected, time_100): def test_estimated_time_remaining(self, test_input, expected, time_time_100):
assert common.estimated_time_remaining(*test_input) == expected assert common.estimated_time_remaining(*test_input) == expected
# TODO: merge these two? parametrize? @pytest.mark.parametrize('test_input', (
def test_raises_zero_division_error(self, time_100): (10, 20, 100), # if `time_elapsed == 0`
""" estimated_time_remaining() raises a ZeroDivisionError if (0, 37, 99) # if `download_rate == 0`
`time_elapsed` == 0 ))
""" def test_raises_zero_division_error(self, test_input, time_time_100):
with pytest.raises(ZeroDivisionError): with pytest.raises(ZeroDivisionError):
common.estimated_time_remaining(10, 20, 100) common.estimated_time_remaining(*test_input)
def test_raises_zero_division_error_2(self, time_100):
""" estimated_time_remaining() raises a ZeroDivision error if
`download_rate` == 0
"""
with pytest.raises(ZeroDivisionError):
common.estimated_time_remaining(0, 37, 99)
class TestFormatSeconds: class TestFormatSeconds:
@ -252,20 +263,20 @@ class TestFormatSeconds:
common.format_seconds(test_input) common.format_seconds(test_input)
@pytest.mark.parametrize('port_min,port_max', ( class TestGetAvailablePort:
(random.randint(1024, 1500), @pytest.mark.parametrize('port_min,port_max', (
random.randint(1800, 2048)) for _ in range(50) (random.randint(1024, 1500),
)) random.randint(1800, 2048)) for _ in range(50)
def test_get_available_port_returns_an_open_port(port_min, port_max): ))
""" get_available_port() should return an open port within the range """ def test_returns_an_open_port(self, port_min, port_max):
""" get_available_port() should return an open port within the range """
port = common.get_available_port(port_min, port_max) port = common.get_available_port(port_min, port_max)
assert port_min <= port <= port_max assert port_min <= port <= port_max
with socket.socket() as tmpsock: with socket.socket() as tmpsock:
tmpsock.bind(('127.0.0.1', port)) tmpsock.bind(('127.0.0.1', port))
# TODO: is there a way to parametrize (fixture, expected)?
class TestGetPlatform: class TestGetPlatform:
def test_darwin(self, platform_darwin): def test_darwin(self, platform_darwin):
assert common.get_platform() == 'Darwin' assert common.get_platform() == 'Darwin'
@ -309,7 +320,6 @@ class TestGetResourcePath:
os.path.join(prefix, 'test_filename')) os.path.join(prefix, 'test_filename'))
# @pytest.mark.usefixtures('platform_darwin', 'platform_linux', 'platform_windows')
class TestGetTorPaths: class TestGetTorPaths:
# @pytest.mark.skipif(sys.platform != 'Darwin', reason='requires MacOS') ? # @pytest.mark.skipif(sys.platform != 'Darwin', reason='requires MacOS') ?
def test_get_tor_paths_darwin(self, platform_darwin, sys_frozen, sys_meipass): def test_get_tor_paths_darwin(self, platform_darwin, sys_frozen, sys_meipass):
@ -341,50 +351,55 @@ class TestGetTorPaths:
(tor_path, tor_geo_ip_file_path, tor_geo_ipv6_file_path)) (tor_path, tor_geo_ip_file_path, tor_geo_ipv6_file_path))
def test_get_version(sys_onionshare_dev_mode): class TestGetVersion:
with open(common.get_resource_path('version.txt')) as f: def test_get_version(self, sys_onionshare_dev_mode):
version = f.read().strip() with open(common.get_resource_path('version.txt')) as f:
version = f.read().strip()
assert version == common.get_version() assert version == common.get_version()
@pytest.mark.parametrize('test_input,expected', ( class TestHumanReadableFilesize:
(1024 ** 0, '1.0 B'), @pytest.mark.parametrize('test_input,expected', (
(1024 ** 1, '1.0 KiB'), (1024 ** 0, '1.0 B'),
(1024 ** 2, '1.0 MiB'), (1024 ** 1, '1.0 KiB'),
(1024 ** 3, '1.0 GiB'), (1024 ** 2, '1.0 MiB'),
(1024 ** 4, '1.0 TiB'), (1024 ** 3, '1.0 GiB'),
(1024 ** 5, '1.0 PiB'), (1024 ** 4, '1.0 TiB'),
(1024 ** 6, '1.0 EiB'), (1024 ** 5, '1.0 PiB'),
(1024 ** 7, '1.0 ZiB'), (1024 ** 6, '1.0 EiB'),
(1024 ** 8, '1.0 YiB') (1024 ** 7, '1.0 ZiB'),
)) (1024 ** 8, '1.0 YiB')
def test_human_readable_filesize(test_input, expected): ))
assert common.human_readable_filesize(test_input) == expected def test_human_readable_filesize(self, test_input, expected):
assert common.human_readable_filesize(test_input) == expected
def test_log(set_debug_true, time_strftime): class TestLog:
def test_func(): @pytest.mark.parametrize('test_input', (
pass ('[Jun 06 2013 11:05:00]'
' TestModule.<function TestLog.test_output.<locals>.dummy_func'
' at 0xdeadbeef>'),
('[Jun 06 2013 11:05:00]'
' TestModule.<function TestLog.test_output.<locals>.dummy_func'
' at 0xdeadbeef>: TEST_MSG')
))
def test_log_msg_regex(self, test_input):
assert bool(LOG_MSG_REGEX.match(test_input))
# From: https://stackoverflow.com/questions/1218933 def test_output(self, set_debug_true, time_strftime):
with io.StringIO() as buf, contextlib.redirect_stdout(buf): def dummy_func():
common.log('TestModule', test_func) pass
common.log('TestModule', test_func, 'TEST_MSG')
output = buf.getvalue()
line_one, line_two, _ = output.split('\n') # From: https://stackoverflow.com/questions/1218933
assert LOG_MSG_REGEX.match(line_one) with io.StringIO() as buf, contextlib.redirect_stdout(buf):
assert LOG_MSG_REGEX.match(line_two) common.log('TestModule', dummy_func)
common.log('TestModule', dummy_func, 'TEST_MSG')
output = buf.getvalue()
line_one, line_two, _ = output.split('\n')
@pytest.mark.parametrize('test_input,expected', ( assert LOG_MSG_REGEX.match(line_one)
(common.random_string(random.randint(2, 50), assert LOG_MSG_REGEX.match(line_two)
random.choice((None, random.randint(2, 50)))
), True) for _ in range(50)
))
def test_random_string_regex(test_input, expected):
assert bool(RANDOM_STR_REGEX.match(test_input)) == expected
class TestSetDebug: class TestSetDebug:
@ -397,8 +412,18 @@ class TestSetDebug:
assert common.debug is False assert common.debug is False
# TODO: ZipWriter doesn't enforce the `.zip` extension with custom filename class TestZipWriterDefault:
class TestDefaultZipWriter: @pytest.mark.parametrize('test_input', (
'onionshare_{}.zip'.format(''.join(
random.choice('abcdefghijklmnopqrstuvwxyz234567') for _ in range(6))
) for _ in range(50)
))
def test_default_zw_filename_regex(self, test_input):
assert bool(DEFAULT_ZW_FILENAME_REGEX.match(test_input))
def test_init(self, default_zw):
pass # TODO:
def test_zw_filename(self, default_zw): def test_zw_filename(self, default_zw):
zw_filename = os.path.basename(default_zw.zip_filename) zw_filename = os.path.basename(default_zw.zip_filename)
assert bool(DEFAULT_ZW_FILENAME_REGEX.match(zw_filename)) assert bool(DEFAULT_ZW_FILENAME_REGEX.match(zw_filename))
@ -415,55 +440,32 @@ class TestDefaultZipWriter:
def test_callback(self, default_zw): def test_callback(self, default_zw):
assert default_zw.processed_size_callback(None) is None assert default_zw.processed_size_callback(None) is None
def test_add_file(self, default_zw): def test_add_file(self, default_zw, temp_file_1024_delete):
tmp_file_size = 1000 default_zw.add_file(temp_file_1024_delete)
# TODO: use helper function to create temporary file? zipfile_info = default_zw.z.getinfo(
with tempfile.NamedTemporaryFile(delete=False) as tmp_file: os.path.basename(temp_file_1024_delete))
tmp_file.write(b'*' * tmp_file_size)
tmp_file_path = tmp_file.name
default_zw.add_file(tmp_file_path)
zipfile_info = default_zw.z.getinfo(os.path.basename(tmp_file_path))
assert zipfile_info.compress_type == zipfile.ZIP_DEFLATED assert zipfile_info.compress_type == zipfile.ZIP_DEFLATED
assert zipfile_info.file_size == tmp_file_size assert zipfile_info.file_size == 1024
os.remove(tmp_file_path) def test_add_directory(self, temp_dir_1024_delete, default_zw):
assert os.path.exists(tmp_file_path) is False previous_size = default_zw._size # size before adding directory
default_zw.add_dir(temp_dir_1024_delete)
def test_add_directory(self, default_zw): assert default_zw._size == previous_size + 1024
directory_size = 1000
tmp_dir = create_temporary_directory(directory_size)
current_size = default_zw._size
default_zw.add_dir(tmp_dir)
assert default_zw._size == current_size + directory_size
shutil.rmtree(tmp_dir)
assert os.path.exists(tmp_dir) is False
def test_zip_writer_custom_filename(custom_zw): class TestZipWriterCustom:
assert bool(RANDOM_STR_REGEX.match(custom_zw.zip_filename)) @pytest.mark.parametrize('test_input', (
common.random_string(
random.randint(2, 50),
random.choice((None, random.randint(2, 50)))
) for _ in range(50)
))
def test_random_string_regex(self, test_input):
assert bool(RANDOM_STR_REGEX.match(test_input))
def test_custom_filename(self, custom_zw):
assert bool(RANDOM_STR_REGEX.match(custom_zw.zip_filename))
def create_temporary_directory(directory_size): def test_custom_callback(self, custom_zw):
""" Create a temporary directory with a single file of a assert custom_zw.processed_size_callback(None) == 'custom_callback'
particular size. Return directory path as a string
"""
tmp_dir = tempfile.mkdtemp()
# create_temporary_file(directory=tmp_dir)
with tempfile.NamedTemporaryFile(dir=tmp_dir, delete=False) as tmp_file:
tmp_file.write(b'*' * directory_size)
return tmp_dir
# TODO: rewrite this helper function to DRY up tests that use temporary files
# def create_temporary_file(directory=None, delete=False, file_size=100):
# if file_size <= 0:
# file_size = 100
# with tempfile.NamedTemporaryFile(dir=directory, delete=delete) as tmp_file:
# tmp_file.write(b'*' * file_size)
# return tmp_file.name