From 4f395add0a7c28b3cac2b1c35e335d1947376033 Mon Sep 17 00:00:00 2001 From: Delirious Lettuce Date: Fri, 7 Jul 2017 21:33:45 -0600 Subject: [PATCH] * Use re.verbose on LOG_MSG_REGEX * Create new fixtures for creating temporary files and directories * Modify custom_zw fixture to include custom callback function to test * Delete parent directory of ZipWriter file from default_zw fixture * Modify monkeypatch arguments * Group tests into separate classes for each function * Parametrize ZeroDivisionError tests * Test all regular expressions * Use new fixtures to ensure proper deletion of files after testing --- test/onionshare_common_test.py | 282 +++++++++++++++++---------------- 1 file changed, 142 insertions(+), 140 deletions(-) diff --git a/test/onionshare_common_test.py b/test/onionshare_common_test.py index 00158959..482000ab 100644 --- a/test/onionshare_common_test.py +++ b/test/onionshare_common_test.py @@ -21,23 +21,23 @@ import contextlib import inspect import io import os -import platform import random import re -import shutil import socket import sys import tempfile -import time import zipfile import pytest +import shutil from onionshare import common DEFAULT_ZW_FILENAME_REGEX = re.compile(r'^onionshare_[a-z2-7]{6}.zip$') -# TODO: use re.VERBOSE on LOG_MSG_REGEX for readability? -LOG_MSG_REGEX = re.compile(r'^\[Jun 06 2013 11:05:00\] TestModule\.\.test_func at 0x[a-f0-9]+>(: TEST_MSG)?$') +LOG_MSG_REGEX = re.compile(r""" + ^\[Jun\ 06\ 2013\ 11:05:00\] + \ TestModule\.\.dummy_func + \ at\ 0x[a-f0-9]+>(:\ TEST_MSG)?$""", re.VERBOSE) RANDOM_STR_REGEX = re.compile(r'^[a-z2-7]+$') SLUG_REGEX = re.compile(r'^([a-z]+)(-[a-z]+)?-([a-z]+)(-[a-z]+)?$') @@ -46,14 +46,45 @@ SLUG_REGEX = re.compile(r'^([a-z]+)(-[a-z]+)?-([a-z]+)(-[a-z]+)?$') # FIXTURES # ################################################# -# TODO: separate fixtures into a separate file? +# TODO: separate fixtures into a separate file: conftest.py ? # TODO: comment fixtures properly +@pytest.yield_fixture() +def temp_dir_1024_delete(): + """ + Create a temporary directory that has a single file of a particular + size (1024 bytes). The temporary directory (and file inside) will + be deleted after fixture usage. + """ + + with tempfile.TemporaryDirectory() as tmp_dir: + tmp_file, tmp_file_path = tempfile.mkstemp(dir=tmp_dir) + with open(tmp_file, 'wb') as f: + f.write(b'*' * 1024) + yield tmp_dir + + +@pytest.yield_fixture() +def temp_file_1024_delete(): + """ + Create a temporary file of a particular size (1024 bytes). + The temporary file will be deleted after fixture usage. + """ + + with tempfile.NamedTemporaryFile() as tmp_file: + tmp_file.write(b'*' * 1024) + tmp_file.flush() + yield tmp_file.name + + # pytest > 2.9 only needs @pytest.fixture @pytest.yield_fixture(scope='session') def custom_zw(): - zw = common.ZipWriter(zip_filename=common.random_string(4, 6)) + zw = common.ZipWriter( + zip_filename=common.random_string(4, 6), + processed_size_callback=lambda _: 'custom_callback' + ) yield zw zw.close() os.remove(zw.zip_filename) @@ -65,7 +96,8 @@ def default_zw(): zw = common.ZipWriter() yield zw zw.close() - os.remove(zw.zip_filename) + tmp_dir = os.path.dirname(zw.zip_filename) + shutil.rmtree(tmp_dir) @pytest.fixture @@ -106,16 +138,16 @@ def sys_frozen(monkeypatch): @pytest.fixture def sys_meipass(monkeypatch): monkeypatch.setattr( - sys, '_MEIPASS', os.path.expanduser('~'), raising=False) + 'sys._MEIPASS', os.path.expanduser('~'), raising=False) @pytest.fixture def sys_onionshare_dev_mode(monkeypatch): - monkeypatch.setattr(sys, 'onionshare_dev_mode', True, raising=False) + monkeypatch.setattr('sys.onionshare_dev_mode', True, raising=False) @pytest.fixture -def time_100(monkeypatch): +def time_time_100(monkeypatch): monkeypatch.setattr('time.time', lambda: 100) @@ -169,28 +201,16 @@ class TestBuildSlug: assert bool(SLUG_REGEX.match(test_input)) == expected def test_build_slug_unique(self, sys_onionshare_dev_mode): - # fixture for common.get_resource??? assert common.build_slug() != common.build_slug() -@pytest.mark.parametrize('directory_size', (5, 500, 5000)) -def test_dir_size(directory_size): - """ dir_size() should return the total size (in bytes) of all files - in a particular directory. +class TestDirSize: + def test_temp_dir_size(self, temp_dir_1024_delete): + """ dir_size() should return the total size (in bytes) of all files + in a particular directory. + """ - This test creates a temporary directory with a single file of a - particular size. After the test is complete, it deletes the - temporary directory. - """ - - # TODO: use helper function to create temporary file? - tmp_dir = tempfile.mkdtemp() - with tempfile.NamedTemporaryFile(dir=tmp_dir, delete=False) as tmp_file: - tmp_file.write(b'*' * directory_size) - - # tempfile.TemporaryDirectory raised error when given to `dir_size` - assert common.dir_size(tmp_dir) == directory_size - shutil.rmtree(tmp_dir) + assert common.dir_size(temp_dir_1024_delete) == 1024 class TestEstimatedTimeRemaining: @@ -203,25 +223,16 @@ class TestEstimatedTimeRemaining: ((603, 949, 38), '36s'), ((971, 1009, 83), '1s') )) - def test_estimated_time_remaining(self, test_input, expected, time_100): + def test_estimated_time_remaining(self, test_input, expected, time_time_100): assert common.estimated_time_remaining(*test_input) == expected - # TODO: merge these two? parametrize? - def test_raises_zero_division_error(self, time_100): - """ estimated_time_remaining() raises a ZeroDivisionError if - `time_elapsed` == 0 - """ - + @pytest.mark.parametrize('test_input', ( + (10, 20, 100), # if `time_elapsed == 0` + (0, 37, 99) # if `download_rate == 0` + )) + def test_raises_zero_division_error(self, test_input, time_time_100): with pytest.raises(ZeroDivisionError): - common.estimated_time_remaining(10, 20, 100) - - def test_raises_zero_division_error_2(self, time_100): - """ estimated_time_remaining() raises a ZeroDivision error if - `download_rate` == 0 - """ - - with pytest.raises(ZeroDivisionError): - common.estimated_time_remaining(0, 37, 99) + common.estimated_time_remaining(*test_input) class TestFormatSeconds: @@ -252,20 +263,20 @@ class TestFormatSeconds: common.format_seconds(test_input) -@pytest.mark.parametrize('port_min,port_max', ( - (random.randint(1024, 1500), - random.randint(1800, 2048)) for _ in range(50) -)) -def test_get_available_port_returns_an_open_port(port_min, port_max): - """ get_available_port() should return an open port within the range """ +class TestGetAvailablePort: + @pytest.mark.parametrize('port_min,port_max', ( + (random.randint(1024, 1500), + random.randint(1800, 2048)) for _ in range(50) + )) + def test_returns_an_open_port(self, port_min, port_max): + """ get_available_port() should return an open port within the range """ - port = common.get_available_port(port_min, port_max) - assert port_min <= port <= port_max - with socket.socket() as tmpsock: - tmpsock.bind(('127.0.0.1', port)) + port = common.get_available_port(port_min, port_max) + assert port_min <= port <= port_max + with socket.socket() as tmpsock: + tmpsock.bind(('127.0.0.1', port)) -# TODO: is there a way to parametrize (fixture, expected)? class TestGetPlatform: def test_darwin(self, platform_darwin): assert common.get_platform() == 'Darwin' @@ -309,7 +320,6 @@ class TestGetResourcePath: os.path.join(prefix, 'test_filename')) -# @pytest.mark.usefixtures('platform_darwin', 'platform_linux', 'platform_windows') class TestGetTorPaths: # @pytest.mark.skipif(sys.platform != 'Darwin', reason='requires MacOS') ? def test_get_tor_paths_darwin(self, platform_darwin, sys_frozen, sys_meipass): @@ -341,50 +351,55 @@ class TestGetTorPaths: (tor_path, tor_geo_ip_file_path, tor_geo_ipv6_file_path)) -def test_get_version(sys_onionshare_dev_mode): - with open(common.get_resource_path('version.txt')) as f: - version = f.read().strip() +class TestGetVersion: + def test_get_version(self, sys_onionshare_dev_mode): + with open(common.get_resource_path('version.txt')) as f: + version = f.read().strip() - assert version == common.get_version() + assert version == common.get_version() -@pytest.mark.parametrize('test_input,expected', ( - (1024 ** 0, '1.0 B'), - (1024 ** 1, '1.0 KiB'), - (1024 ** 2, '1.0 MiB'), - (1024 ** 3, '1.0 GiB'), - (1024 ** 4, '1.0 TiB'), - (1024 ** 5, '1.0 PiB'), - (1024 ** 6, '1.0 EiB'), - (1024 ** 7, '1.0 ZiB'), - (1024 ** 8, '1.0 YiB') -)) -def test_human_readable_filesize(test_input, expected): - assert common.human_readable_filesize(test_input) == expected +class TestHumanReadableFilesize: + @pytest.mark.parametrize('test_input,expected', ( + (1024 ** 0, '1.0 B'), + (1024 ** 1, '1.0 KiB'), + (1024 ** 2, '1.0 MiB'), + (1024 ** 3, '1.0 GiB'), + (1024 ** 4, '1.0 TiB'), + (1024 ** 5, '1.0 PiB'), + (1024 ** 6, '1.0 EiB'), + (1024 ** 7, '1.0 ZiB'), + (1024 ** 8, '1.0 YiB') + )) + def test_human_readable_filesize(self, test_input, expected): + assert common.human_readable_filesize(test_input) == expected -def test_log(set_debug_true, time_strftime): - def test_func(): - pass +class TestLog: + @pytest.mark.parametrize('test_input', ( + ('[Jun 06 2013 11:05:00]' + ' TestModule..dummy_func' + ' at 0xdeadbeef>'), + ('[Jun 06 2013 11:05:00]' + ' TestModule..dummy_func' + ' at 0xdeadbeef>: TEST_MSG') + )) + def test_log_msg_regex(self, test_input): + assert bool(LOG_MSG_REGEX.match(test_input)) - # From: https://stackoverflow.com/questions/1218933 - with io.StringIO() as buf, contextlib.redirect_stdout(buf): - common.log('TestModule', test_func) - common.log('TestModule', test_func, 'TEST_MSG') - output = buf.getvalue() + def test_output(self, set_debug_true, time_strftime): + def dummy_func(): + pass - line_one, line_two, _ = output.split('\n') - assert LOG_MSG_REGEX.match(line_one) - assert LOG_MSG_REGEX.match(line_two) + # From: https://stackoverflow.com/questions/1218933 + with io.StringIO() as buf, contextlib.redirect_stdout(buf): + common.log('TestModule', dummy_func) + common.log('TestModule', dummy_func, 'TEST_MSG') + output = buf.getvalue() - -@pytest.mark.parametrize('test_input,expected', ( - (common.random_string(random.randint(2, 50), - random.choice((None, random.randint(2, 50))) - ), True) for _ in range(50) -)) -def test_random_string_regex(test_input, expected): - assert bool(RANDOM_STR_REGEX.match(test_input)) == expected + line_one, line_two, _ = output.split('\n') + assert LOG_MSG_REGEX.match(line_one) + assert LOG_MSG_REGEX.match(line_two) class TestSetDebug: @@ -397,8 +412,18 @@ class TestSetDebug: assert common.debug is False -# TODO: ZipWriter doesn't enforce the `.zip` extension with custom filename -class TestDefaultZipWriter: +class TestZipWriterDefault: + @pytest.mark.parametrize('test_input', ( + 'onionshare_{}.zip'.format(''.join( + random.choice('abcdefghijklmnopqrstuvwxyz234567') for _ in range(6)) + ) for _ in range(50) + )) + def test_default_zw_filename_regex(self, test_input): + assert bool(DEFAULT_ZW_FILENAME_REGEX.match(test_input)) + + def test_init(self, default_zw): + pass # TODO: + def test_zw_filename(self, default_zw): zw_filename = os.path.basename(default_zw.zip_filename) assert bool(DEFAULT_ZW_FILENAME_REGEX.match(zw_filename)) @@ -415,55 +440,32 @@ class TestDefaultZipWriter: def test_callback(self, default_zw): assert default_zw.processed_size_callback(None) is None - def test_add_file(self, default_zw): - tmp_file_size = 1000 - # TODO: use helper function to create temporary file? - with tempfile.NamedTemporaryFile(delete=False) as tmp_file: - tmp_file.write(b'*' * tmp_file_size) + def test_add_file(self, default_zw, temp_file_1024_delete): + default_zw.add_file(temp_file_1024_delete) + zipfile_info = default_zw.z.getinfo( + os.path.basename(temp_file_1024_delete)) - tmp_file_path = tmp_file.name - default_zw.add_file(tmp_file_path) - - zipfile_info = default_zw.z.getinfo(os.path.basename(tmp_file_path)) assert zipfile_info.compress_type == zipfile.ZIP_DEFLATED - assert zipfile_info.file_size == tmp_file_size + assert zipfile_info.file_size == 1024 - os.remove(tmp_file_path) - assert os.path.exists(tmp_file_path) is False - - def test_add_directory(self, default_zw): - directory_size = 1000 - tmp_dir = create_temporary_directory(directory_size) - current_size = default_zw._size - default_zw.add_dir(tmp_dir) - - assert default_zw._size == current_size + directory_size - shutil.rmtree(tmp_dir) - assert os.path.exists(tmp_dir) is False + def test_add_directory(self, temp_dir_1024_delete, default_zw): + previous_size = default_zw._size # size before adding directory + default_zw.add_dir(temp_dir_1024_delete) + assert default_zw._size == previous_size + 1024 -def test_zip_writer_custom_filename(custom_zw): - assert bool(RANDOM_STR_REGEX.match(custom_zw.zip_filename)) +class TestZipWriterCustom: + @pytest.mark.parametrize('test_input', ( + common.random_string( + random.randint(2, 50), + random.choice((None, random.randint(2, 50))) + ) for _ in range(50) + )) + def test_random_string_regex(self, test_input): + assert bool(RANDOM_STR_REGEX.match(test_input)) + def test_custom_filename(self, custom_zw): + assert bool(RANDOM_STR_REGEX.match(custom_zw.zip_filename)) -def create_temporary_directory(directory_size): - """ Create a temporary directory with a single file of a - particular size. Return directory path as a string - """ - - tmp_dir = tempfile.mkdtemp() - # create_temporary_file(directory=tmp_dir) - - with tempfile.NamedTemporaryFile(dir=tmp_dir, delete=False) as tmp_file: - tmp_file.write(b'*' * directory_size) - - return tmp_dir - - -# TODO: rewrite this helper function to DRY up tests that use temporary files -# def create_temporary_file(directory=None, delete=False, file_size=100): -# if file_size <= 0: -# file_size = 100 -# with tempfile.NamedTemporaryFile(dir=directory, delete=delete) as tmp_file: -# tmp_file.write(b'*' * file_size) -# return tmp_file.name + def test_custom_callback(self, custom_zw): + assert custom_zw.processed_size_callback(None) == 'custom_callback'