From 2e6538b7f8ab9df013e0e1cb438a7c068e94d7bc Mon Sep 17 00:00:00 2001 From: Micah Lee Date: Thu, 8 Mar 2018 05:45:07 -0800 Subject: [PATCH] Move ZipWriter from common into web, because that's the only place it's used --- onionshare/common.py | 49 ------------------ onionshare/web.py | 51 ++++++++++++++++++- test/conftest.py | 6 +-- test/test_onionshare_common.py | 58 ---------------------- test/test_onionshare_web.py | 90 ++++++++++++++++++++++++++++++++++ 5 files changed, 143 insertions(+), 111 deletions(-) create mode 100644 test/test_onionshare_web.py diff --git a/onionshare/common.py b/onionshare/common.py index 0d00c7b1..218d5e6c 100644 --- a/onionshare/common.py +++ b/onionshare/common.py @@ -28,7 +28,6 @@ import sys import tempfile import threading import time -import zipfile debug = False @@ -226,54 +225,6 @@ def dir_size(start_path): return total_size -class ZipWriter(object): - """ - ZipWriter accepts files and directories and compresses them into a zip file - with. If a zip_filename is not passed in, it will use the default onionshare - filename. - """ - def __init__(self, zip_filename=None, processed_size_callback=None): - if zip_filename: - self.zip_filename = zip_filename - else: - self.zip_filename = '{0:s}/onionshare_{1:s}.zip'.format(tempfile.mkdtemp(), random_string(4, 6)) - - self.z = zipfile.ZipFile(self.zip_filename, 'w', allowZip64=True) - self.processed_size_callback = processed_size_callback - if self.processed_size_callback is None: - self.processed_size_callback = lambda _: None - self._size = 0 - self.processed_size_callback(self._size) - - def add_file(self, filename): - """ - Add a file to the zip archive. - """ - self.z.write(filename, os.path.basename(filename), zipfile.ZIP_DEFLATED) - self._size += os.path.getsize(filename) - self.processed_size_callback(self._size) - - def add_dir(self, filename): - """ - Add a directory, and all of its children, to the zip archive. - """ - dir_to_strip = os.path.dirname(filename.rstrip('/'))+'/' - for dirpath, dirnames, filenames in os.walk(filename): - for f in filenames: - full_filename = os.path.join(dirpath, f) - if not os.path.islink(full_filename): - arc_filename = full_filename[len(dir_to_strip):] - self.z.write(full_filename, arc_filename, zipfile.ZIP_DEFLATED) - self._size += os.path.getsize(full_filename) - self.processed_size_callback(self._size) - - def close(self): - """ - Close the zip archive. - """ - self.z.close() - - class close_after_seconds(threading.Thread): """ Background thread sleeps t hours and returns. diff --git a/onionshare/web.py b/onionshare/web.py index 68684651..c797a9e4 100644 --- a/onionshare/web.py +++ b/onionshare/web.py @@ -26,6 +26,7 @@ import queue import socket import sys import tempfile +import zipfile from distutils.version import LooseVersion as Version from urllib.request import urlopen @@ -324,7 +325,7 @@ class Web(object): self.file_info['dirs'] = sorted(self.file_info['dirs'], key=lambda k: k['basename']) # zip up the files and folders - z = common.ZipWriter(processed_size_callback=processed_size_callback) + z = ZipWriter(processed_size_callback=processed_size_callback) for info in self.file_info['files']: z.add_file(info['filename']) for info in self.file_info['dirs']: @@ -415,3 +416,51 @@ class Web(object): urlopen('http://127.0.0.1:{0:d}/{1:s}/shutdown'.format(port, shutdown_slug)).read() except: pass + + +class ZipWriter(object): + """ + ZipWriter accepts files and directories and compresses them into a zip file + with. If a zip_filename is not passed in, it will use the default onionshare + filename. + """ + def __init__(self, zip_filename=None, processed_size_callback=None): + if zip_filename: + self.zip_filename = zip_filename + else: + self.zip_filename = '{0:s}/onionshare_{1:s}.zip'.format(tempfile.mkdtemp(), common.random_string(4, 6)) + + self.z = zipfile.ZipFile(self.zip_filename, 'w', allowZip64=True) + self.processed_size_callback = processed_size_callback + if self.processed_size_callback is None: + self.processed_size_callback = lambda _: None + self._size = 0 + self.processed_size_callback(self._size) + + def add_file(self, filename): + """ + Add a file to the zip archive. + """ + self.z.write(filename, os.path.basename(filename), zipfile.ZIP_DEFLATED) + self._size += os.path.getsize(filename) + self.processed_size_callback(self._size) + + def add_dir(self, filename): + """ + Add a directory, and all of its children, to the zip archive. + """ + dir_to_strip = os.path.dirname(filename.rstrip('/'))+'/' + for dirpath, dirnames, filenames in os.walk(filename): + for f in filenames: + full_filename = os.path.join(dirpath, f) + if not os.path.islink(full_filename): + arc_filename = full_filename[len(dir_to_strip):] + self.z.write(full_filename, arc_filename, zipfile.ZIP_DEFLATED) + self._size += os.path.getsize(full_filename) + self.processed_size_callback(self._size) + + def close(self): + """ + Close the zip archive. + """ + self.z.close() diff --git a/test/conftest.py b/test/conftest.py index 8f10162b..88a7c054 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -8,7 +8,7 @@ import tempfile import pytest -from onionshare import common +from onionshare import common, web @pytest.fixture def temp_dir_1024(): @@ -64,7 +64,7 @@ def temp_file_1024_delete(): # pytest > 2.9 only needs @pytest.fixture @pytest.yield_fixture(scope='session') def custom_zw(): - zw = common.ZipWriter( + zw = web.ZipWriter( zip_filename=common.random_string(4, 6), processed_size_callback=lambda _: 'custom_callback' ) @@ -76,7 +76,7 @@ def custom_zw(): # pytest > 2.9 only needs @pytest.fixture @pytest.yield_fixture(scope='session') def default_zw(): - zw = common.ZipWriter() + zw = web.ZipWriter() yield zw zw.close() tmp_dir = os.path.dirname(zw.zip_filename) diff --git a/test/test_onionshare_common.py b/test/test_onionshare_common.py index cb864313..66e4a808 100644 --- a/test/test_onionshare_common.py +++ b/test/test_onionshare_common.py @@ -31,12 +31,10 @@ import pytest from onionshare import common -DEFAULT_ZW_FILENAME_REGEX = re.compile(r'^onionshare_[a-z2-7]{6}.zip$') LOG_MSG_REGEX = re.compile(r""" ^\[Jun\ 06\ 2013\ 11:05:00\] \ TestModule\.\.dummy_func \ at\ 0x[a-f0-9]+>(:\ TEST_MSG)?$""", re.VERBOSE) -RANDOM_STR_REGEX = re.compile(r'^[a-z2-7]+$') SLUG_REGEX = re.compile(r'^([a-z]+)(-[a-z]+)?-([a-z]+)(-[a-z]+)?$') @@ -296,59 +294,3 @@ class TestSetDebug: def test_debug_false(self, set_debug_true): common.set_debug(False) assert common.debug is False - - -class TestZipWriterDefault: - @pytest.mark.parametrize('test_input', ( - 'onionshare_{}.zip'.format(''.join( - random.choice('abcdefghijklmnopqrstuvwxyz234567') for _ in range(6) - )) for _ in range(50) - )) - def test_default_zw_filename_regex(self, test_input): - assert bool(DEFAULT_ZW_FILENAME_REGEX.match(test_input)) - - def test_zw_filename(self, default_zw): - zw_filename = os.path.basename(default_zw.zip_filename) - assert bool(DEFAULT_ZW_FILENAME_REGEX.match(zw_filename)) - - def test_zipfile_filename_matches_zipwriter_filename(self, default_zw): - assert default_zw.z.filename == default_zw.zip_filename - - def test_zipfile_allow_zip64(self, default_zw): - assert default_zw.z._allowZip64 is True - - def test_zipfile_mode(self, default_zw): - assert default_zw.z.mode == 'w' - - def test_callback(self, default_zw): - assert default_zw.processed_size_callback(None) is None - - def test_add_file(self, default_zw, temp_file_1024_delete): - default_zw.add_file(temp_file_1024_delete) - zipfile_info = default_zw.z.getinfo( - os.path.basename(temp_file_1024_delete)) - - assert zipfile_info.compress_type == zipfile.ZIP_DEFLATED - assert zipfile_info.file_size == 1024 - - def test_add_directory(self, temp_dir_1024_delete, default_zw): - previous_size = default_zw._size # size before adding directory - default_zw.add_dir(temp_dir_1024_delete) - assert default_zw._size == previous_size + 1024 - - -class TestZipWriterCustom: - @pytest.mark.parametrize('test_input', ( - common.random_string( - random.randint(2, 50), - random.choice((None, random.randint(2, 50))) - ) for _ in range(50) - )) - def test_random_string_regex(self, test_input): - assert bool(RANDOM_STR_REGEX.match(test_input)) - - def test_custom_filename(self, custom_zw): - assert bool(RANDOM_STR_REGEX.match(custom_zw.zip_filename)) - - def test_custom_callback(self, custom_zw): - assert custom_zw.processed_size_callback(None) == 'custom_callback' diff --git a/test/test_onionshare_web.py b/test/test_onionshare_web.py new file mode 100644 index 00000000..55ff1f85 --- /dev/null +++ b/test/test_onionshare_web.py @@ -0,0 +1,90 @@ +""" +OnionShare | https://onionshare.org/ + +Copyright (C) 2017 Micah Lee + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see . +""" + +import contextlib +import inspect +import io +import os +import random +import re +import socket +import sys +import zipfile + +import pytest + +from onionshare import common + +DEFAULT_ZW_FILENAME_REGEX = re.compile(r'^onionshare_[a-z2-7]{6}.zip$') +RANDOM_STR_REGEX = re.compile(r'^[a-z2-7]+$') + +class TestZipWriterDefault: + @pytest.mark.parametrize('test_input', ( + 'onionshare_{}.zip'.format(''.join( + random.choice('abcdefghijklmnopqrstuvwxyz234567') for _ in range(6) + )) for _ in range(50) + )) + def test_default_zw_filename_regex(self, test_input): + assert bool(DEFAULT_ZW_FILENAME_REGEX.match(test_input)) + + def test_zw_filename(self, default_zw): + zw_filename = os.path.basename(default_zw.zip_filename) + assert bool(DEFAULT_ZW_FILENAME_REGEX.match(zw_filename)) + + def test_zipfile_filename_matches_zipwriter_filename(self, default_zw): + assert default_zw.z.filename == default_zw.zip_filename + + def test_zipfile_allow_zip64(self, default_zw): + assert default_zw.z._allowZip64 is True + + def test_zipfile_mode(self, default_zw): + assert default_zw.z.mode == 'w' + + def test_callback(self, default_zw): + assert default_zw.processed_size_callback(None) is None + + def test_add_file(self, default_zw, temp_file_1024_delete): + default_zw.add_file(temp_file_1024_delete) + zipfile_info = default_zw.z.getinfo( + os.path.basename(temp_file_1024_delete)) + + assert zipfile_info.compress_type == zipfile.ZIP_DEFLATED + assert zipfile_info.file_size == 1024 + + def test_add_directory(self, temp_dir_1024_delete, default_zw): + previous_size = default_zw._size # size before adding directory + default_zw.add_dir(temp_dir_1024_delete) + assert default_zw._size == previous_size + 1024 + + +class TestZipWriterCustom: + @pytest.mark.parametrize('test_input', ( + common.random_string( + random.randint(2, 50), + random.choice((None, random.randint(2, 50))) + ) for _ in range(50) + )) + def test_random_string_regex(self, test_input): + assert bool(RANDOM_STR_REGEX.match(test_input)) + + def test_custom_filename(self, custom_zw): + assert bool(RANDOM_STR_REGEX.match(custom_zw.zip_filename)) + + def test_custom_callback(self, custom_zw): + assert custom_zw.processed_size_callback(None) == 'custom_callback'