Move ZipWriter from common into web, because that's the only place it's used

This commit is contained in:
Micah Lee 2018-03-08 05:45:07 -08:00
parent 2e31db8543
commit 2e6538b7f8
No known key found for this signature in database
GPG Key ID: 403C2657CD994F73
5 changed files with 143 additions and 111 deletions

View File

@ -28,7 +28,6 @@ import sys
import tempfile import tempfile
import threading import threading
import time import time
import zipfile
debug = False debug = False
@ -226,54 +225,6 @@ def dir_size(start_path):
return total_size return total_size
class ZipWriter(object):
"""
ZipWriter accepts files and directories and compresses them into a zip file
with. If a zip_filename is not passed in, it will use the default onionshare
filename.
"""
def __init__(self, zip_filename=None, processed_size_callback=None):
if zip_filename:
self.zip_filename = zip_filename
else:
self.zip_filename = '{0:s}/onionshare_{1:s}.zip'.format(tempfile.mkdtemp(), random_string(4, 6))
self.z = zipfile.ZipFile(self.zip_filename, 'w', allowZip64=True)
self.processed_size_callback = processed_size_callback
if self.processed_size_callback is None:
self.processed_size_callback = lambda _: None
self._size = 0
self.processed_size_callback(self._size)
def add_file(self, filename):
"""
Add a file to the zip archive.
"""
self.z.write(filename, os.path.basename(filename), zipfile.ZIP_DEFLATED)
self._size += os.path.getsize(filename)
self.processed_size_callback(self._size)
def add_dir(self, filename):
"""
Add a directory, and all of its children, to the zip archive.
"""
dir_to_strip = os.path.dirname(filename.rstrip('/'))+'/'
for dirpath, dirnames, filenames in os.walk(filename):
for f in filenames:
full_filename = os.path.join(dirpath, f)
if not os.path.islink(full_filename):
arc_filename = full_filename[len(dir_to_strip):]
self.z.write(full_filename, arc_filename, zipfile.ZIP_DEFLATED)
self._size += os.path.getsize(full_filename)
self.processed_size_callback(self._size)
def close(self):
"""
Close the zip archive.
"""
self.z.close()
class close_after_seconds(threading.Thread): class close_after_seconds(threading.Thread):
""" """
Background thread sleeps t hours and returns. Background thread sleeps t hours and returns.

View File

@ -26,6 +26,7 @@ import queue
import socket import socket
import sys import sys
import tempfile import tempfile
import zipfile
from distutils.version import LooseVersion as Version from distutils.version import LooseVersion as Version
from urllib.request import urlopen from urllib.request import urlopen
@ -324,7 +325,7 @@ class Web(object):
self.file_info['dirs'] = sorted(self.file_info['dirs'], key=lambda k: k['basename']) self.file_info['dirs'] = sorted(self.file_info['dirs'], key=lambda k: k['basename'])
# zip up the files and folders # zip up the files and folders
z = common.ZipWriter(processed_size_callback=processed_size_callback) z = ZipWriter(processed_size_callback=processed_size_callback)
for info in self.file_info['files']: for info in self.file_info['files']:
z.add_file(info['filename']) z.add_file(info['filename'])
for info in self.file_info['dirs']: for info in self.file_info['dirs']:
@ -415,3 +416,51 @@ class Web(object):
urlopen('http://127.0.0.1:{0:d}/{1:s}/shutdown'.format(port, shutdown_slug)).read() urlopen('http://127.0.0.1:{0:d}/{1:s}/shutdown'.format(port, shutdown_slug)).read()
except: except:
pass pass
class ZipWriter(object):
"""
ZipWriter accepts files and directories and compresses them into a zip file
with. If a zip_filename is not passed in, it will use the default onionshare
filename.
"""
def __init__(self, zip_filename=None, processed_size_callback=None):
if zip_filename:
self.zip_filename = zip_filename
else:
self.zip_filename = '{0:s}/onionshare_{1:s}.zip'.format(tempfile.mkdtemp(), common.random_string(4, 6))
self.z = zipfile.ZipFile(self.zip_filename, 'w', allowZip64=True)
self.processed_size_callback = processed_size_callback
if self.processed_size_callback is None:
self.processed_size_callback = lambda _: None
self._size = 0
self.processed_size_callback(self._size)
def add_file(self, filename):
"""
Add a file to the zip archive.
"""
self.z.write(filename, os.path.basename(filename), zipfile.ZIP_DEFLATED)
self._size += os.path.getsize(filename)
self.processed_size_callback(self._size)
def add_dir(self, filename):
"""
Add a directory, and all of its children, to the zip archive.
"""
dir_to_strip = os.path.dirname(filename.rstrip('/'))+'/'
for dirpath, dirnames, filenames in os.walk(filename):
for f in filenames:
full_filename = os.path.join(dirpath, f)
if not os.path.islink(full_filename):
arc_filename = full_filename[len(dir_to_strip):]
self.z.write(full_filename, arc_filename, zipfile.ZIP_DEFLATED)
self._size += os.path.getsize(full_filename)
self.processed_size_callback(self._size)
def close(self):
"""
Close the zip archive.
"""
self.z.close()

View File

@ -8,7 +8,7 @@ import tempfile
import pytest import pytest
from onionshare import common from onionshare import common, web
@pytest.fixture @pytest.fixture
def temp_dir_1024(): def temp_dir_1024():
@ -64,7 +64,7 @@ def temp_file_1024_delete():
# pytest > 2.9 only needs @pytest.fixture # pytest > 2.9 only needs @pytest.fixture
@pytest.yield_fixture(scope='session') @pytest.yield_fixture(scope='session')
def custom_zw(): def custom_zw():
zw = common.ZipWriter( zw = web.ZipWriter(
zip_filename=common.random_string(4, 6), zip_filename=common.random_string(4, 6),
processed_size_callback=lambda _: 'custom_callback' processed_size_callback=lambda _: 'custom_callback'
) )
@ -76,7 +76,7 @@ def custom_zw():
# pytest > 2.9 only needs @pytest.fixture # pytest > 2.9 only needs @pytest.fixture
@pytest.yield_fixture(scope='session') @pytest.yield_fixture(scope='session')
def default_zw(): def default_zw():
zw = common.ZipWriter() zw = web.ZipWriter()
yield zw yield zw
zw.close() zw.close()
tmp_dir = os.path.dirname(zw.zip_filename) tmp_dir = os.path.dirname(zw.zip_filename)

View File

@ -31,12 +31,10 @@ import pytest
from onionshare import common from onionshare import common
DEFAULT_ZW_FILENAME_REGEX = re.compile(r'^onionshare_[a-z2-7]{6}.zip$')
LOG_MSG_REGEX = re.compile(r""" LOG_MSG_REGEX = re.compile(r"""
^\[Jun\ 06\ 2013\ 11:05:00\] ^\[Jun\ 06\ 2013\ 11:05:00\]
\ TestModule\.<function\ TestLog\.test_output\.<locals>\.dummy_func \ TestModule\.<function\ TestLog\.test_output\.<locals>\.dummy_func
\ at\ 0x[a-f0-9]+>(:\ TEST_MSG)?$""", re.VERBOSE) \ at\ 0x[a-f0-9]+>(:\ TEST_MSG)?$""", re.VERBOSE)
RANDOM_STR_REGEX = re.compile(r'^[a-z2-7]+$')
SLUG_REGEX = re.compile(r'^([a-z]+)(-[a-z]+)?-([a-z]+)(-[a-z]+)?$') SLUG_REGEX = re.compile(r'^([a-z]+)(-[a-z]+)?-([a-z]+)(-[a-z]+)?$')
@ -296,59 +294,3 @@ class TestSetDebug:
def test_debug_false(self, set_debug_true): def test_debug_false(self, set_debug_true):
common.set_debug(False) common.set_debug(False)
assert common.debug is False assert common.debug is False
class TestZipWriterDefault:
@pytest.mark.parametrize('test_input', (
'onionshare_{}.zip'.format(''.join(
random.choice('abcdefghijklmnopqrstuvwxyz234567') for _ in range(6)
)) for _ in range(50)
))
def test_default_zw_filename_regex(self, test_input):
assert bool(DEFAULT_ZW_FILENAME_REGEX.match(test_input))
def test_zw_filename(self, default_zw):
zw_filename = os.path.basename(default_zw.zip_filename)
assert bool(DEFAULT_ZW_FILENAME_REGEX.match(zw_filename))
def test_zipfile_filename_matches_zipwriter_filename(self, default_zw):
assert default_zw.z.filename == default_zw.zip_filename
def test_zipfile_allow_zip64(self, default_zw):
assert default_zw.z._allowZip64 is True
def test_zipfile_mode(self, default_zw):
assert default_zw.z.mode == 'w'
def test_callback(self, default_zw):
assert default_zw.processed_size_callback(None) is None
def test_add_file(self, default_zw, temp_file_1024_delete):
default_zw.add_file(temp_file_1024_delete)
zipfile_info = default_zw.z.getinfo(
os.path.basename(temp_file_1024_delete))
assert zipfile_info.compress_type == zipfile.ZIP_DEFLATED
assert zipfile_info.file_size == 1024
def test_add_directory(self, temp_dir_1024_delete, default_zw):
previous_size = default_zw._size # size before adding directory
default_zw.add_dir(temp_dir_1024_delete)
assert default_zw._size == previous_size + 1024
class TestZipWriterCustom:
@pytest.mark.parametrize('test_input', (
common.random_string(
random.randint(2, 50),
random.choice((None, random.randint(2, 50)))
) for _ in range(50)
))
def test_random_string_regex(self, test_input):
assert bool(RANDOM_STR_REGEX.match(test_input))
def test_custom_filename(self, custom_zw):
assert bool(RANDOM_STR_REGEX.match(custom_zw.zip_filename))
def test_custom_callback(self, custom_zw):
assert custom_zw.processed_size_callback(None) == 'custom_callback'

View File

@ -0,0 +1,90 @@
"""
OnionShare | https://onionshare.org/
Copyright (C) 2017 Micah Lee <micah@micahflee.com>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
import contextlib
import inspect
import io
import os
import random
import re
import socket
import sys
import zipfile
import pytest
from onionshare import common
DEFAULT_ZW_FILENAME_REGEX = re.compile(r'^onionshare_[a-z2-7]{6}.zip$')
RANDOM_STR_REGEX = re.compile(r'^[a-z2-7]+$')
class TestZipWriterDefault:
@pytest.mark.parametrize('test_input', (
'onionshare_{}.zip'.format(''.join(
random.choice('abcdefghijklmnopqrstuvwxyz234567') for _ in range(6)
)) for _ in range(50)
))
def test_default_zw_filename_regex(self, test_input):
assert bool(DEFAULT_ZW_FILENAME_REGEX.match(test_input))
def test_zw_filename(self, default_zw):
zw_filename = os.path.basename(default_zw.zip_filename)
assert bool(DEFAULT_ZW_FILENAME_REGEX.match(zw_filename))
def test_zipfile_filename_matches_zipwriter_filename(self, default_zw):
assert default_zw.z.filename == default_zw.zip_filename
def test_zipfile_allow_zip64(self, default_zw):
assert default_zw.z._allowZip64 is True
def test_zipfile_mode(self, default_zw):
assert default_zw.z.mode == 'w'
def test_callback(self, default_zw):
assert default_zw.processed_size_callback(None) is None
def test_add_file(self, default_zw, temp_file_1024_delete):
default_zw.add_file(temp_file_1024_delete)
zipfile_info = default_zw.z.getinfo(
os.path.basename(temp_file_1024_delete))
assert zipfile_info.compress_type == zipfile.ZIP_DEFLATED
assert zipfile_info.file_size == 1024
def test_add_directory(self, temp_dir_1024_delete, default_zw):
previous_size = default_zw._size # size before adding directory
default_zw.add_dir(temp_dir_1024_delete)
assert default_zw._size == previous_size + 1024
class TestZipWriterCustom:
@pytest.mark.parametrize('test_input', (
common.random_string(
random.randint(2, 50),
random.choice((None, random.randint(2, 50)))
) for _ in range(50)
))
def test_random_string_regex(self, test_input):
assert bool(RANDOM_STR_REGEX.match(test_input))
def test_custom_filename(self, custom_zw):
assert bool(RANDOM_STR_REGEX.match(custom_zw.zip_filename))
def test_custom_callback(self, custom_zw):
assert custom_zw.processed_size_callback(None) == 'custom_callback'