mirror of
https://github.com/onionshare/onionshare.git
synced 2025-01-03 19:51:05 -05:00
275 lines
9.0 KiB
Python
275 lines
9.0 KiB
Python
import os
|
|
import random
|
|
import re
|
|
import zipfile
|
|
import tempfile
|
|
import base64
|
|
from io import BytesIO
|
|
|
|
import pytest
|
|
from werkzeug.datastructures import Headers
|
|
|
|
from onionshare_cli.common import Common
|
|
from onionshare_cli.web import Web
|
|
from onionshare_cli.settings import Settings
|
|
from onionshare_cli.mode_settings import ModeSettings
|
|
import onionshare_cli.web.receive_mode
|
|
|
|
# Stub requests.post, for receive mode webhook tests
|
|
webhook_url = None
|
|
webhook_data = None
|
|
|
|
|
|
def requests_post_stub(url, data, timeout, proxies):
|
|
global webhook_url, webhook_data
|
|
webhook_url = url
|
|
webhook_data = data
|
|
|
|
|
|
onionshare_cli.web.receive_mode.requests.post = requests_post_stub
|
|
|
|
|
|
DEFAULT_ZW_FILENAME_REGEX = re.compile(r"^onionshare_[a-z2-7]{6}.zip$")
|
|
RANDOM_STR_REGEX = re.compile(r"^[a-z2-7]+$")
|
|
|
|
|
|
def web_obj(temp_dir, common_obj, mode, num_files=0):
|
|
""" Creates a Web object, in either share mode or receive mode, ready for testing """
|
|
common_obj.settings = Settings(common_obj)
|
|
mode_settings = ModeSettings(common_obj)
|
|
web = Web(common_obj, False, mode_settings, mode)
|
|
web.generate_password()
|
|
web.running = True
|
|
|
|
web.app.testing = True
|
|
|
|
# Share mode
|
|
if mode == "share":
|
|
# Add files
|
|
files = []
|
|
for _ in range(num_files):
|
|
with tempfile.NamedTemporaryFile(delete=False, dir=temp_dir) as tmp_file:
|
|
tmp_file.write(b"*" * 1024)
|
|
files.append(tmp_file.name)
|
|
web.share_mode.set_file_info(files)
|
|
# Receive mode
|
|
else:
|
|
pass
|
|
|
|
return web
|
|
|
|
|
|
class TestWeb:
|
|
def test_share_mode(self, temp_dir, common_obj):
|
|
web = web_obj(temp_dir, common_obj, "share", 3)
|
|
assert web.mode == "share"
|
|
with web.app.test_client() as c:
|
|
# Load / without auth
|
|
res = c.get("/")
|
|
res.get_data()
|
|
assert res.status_code == 401
|
|
|
|
# Load / with invalid auth
|
|
res = c.get("/", headers=self._make_auth_headers("invalid"))
|
|
res.get_data()
|
|
assert res.status_code == 401
|
|
|
|
# Load / with valid auth
|
|
res = c.get("/", headers=self._make_auth_headers(web.password))
|
|
res.get_data()
|
|
assert res.status_code == 200
|
|
|
|
# Download
|
|
res = c.get("/download", headers=self._make_auth_headers(web.password))
|
|
res.get_data()
|
|
assert res.status_code == 200
|
|
assert (
|
|
res.mimetype == "application/zip"
|
|
or res.mimetype == "application/x-zip-compressed"
|
|
)
|
|
|
|
def test_share_mode_autostop_sharing_on(self, temp_dir, common_obj, temp_file_1024):
|
|
web = web_obj(temp_dir, common_obj, "share", 3)
|
|
web.settings.set("share", "autostop_sharing", True)
|
|
|
|
assert web.running == True
|
|
|
|
with web.app.test_client() as c:
|
|
# Download the first time
|
|
res = c.get("/download", headers=self._make_auth_headers(web.password))
|
|
res.get_data()
|
|
assert res.status_code == 200
|
|
assert (
|
|
res.mimetype == "application/zip"
|
|
or res.mimetype == "application/x-zip-compressed"
|
|
)
|
|
|
|
assert web.running == False
|
|
|
|
def test_share_mode_autostop_sharing_off(
|
|
self, temp_dir, common_obj, temp_file_1024
|
|
):
|
|
web = web_obj(temp_dir, common_obj, "share", 3)
|
|
web.settings.set("share", "autostop_sharing", False)
|
|
|
|
assert web.running == True
|
|
|
|
with web.app.test_client() as c:
|
|
# Download the first time
|
|
res = c.get("/download", headers=self._make_auth_headers(web.password))
|
|
res.get_data()
|
|
assert res.status_code == 200
|
|
assert (
|
|
res.mimetype == "application/zip"
|
|
or res.mimetype == "application/x-zip-compressed"
|
|
)
|
|
assert web.running == True
|
|
|
|
def test_receive_mode(self, temp_dir, common_obj):
|
|
web = web_obj(temp_dir, common_obj, "receive")
|
|
assert web.mode == "receive"
|
|
|
|
with web.app.test_client() as c:
|
|
# Load / without auth
|
|
res = c.get("/")
|
|
res.get_data()
|
|
assert res.status_code == 401
|
|
|
|
# Load / with invalid auth
|
|
res = c.get("/", headers=self._make_auth_headers("invalid"))
|
|
res.get_data()
|
|
assert res.status_code == 401
|
|
|
|
# Load / with valid auth
|
|
res = c.get("/", headers=self._make_auth_headers(web.password))
|
|
res.get_data()
|
|
assert res.status_code == 200
|
|
|
|
def test_receive_mode_webhook(self, temp_dir, common_obj):
|
|
global webhook_url, webhook_data
|
|
webhook_url = None
|
|
webhook_data = None
|
|
|
|
web = web_obj(temp_dir, common_obj, "receive")
|
|
assert web.mode == "receive"
|
|
web.settings.set("receive", "webhook_url", "http://127.0.0.1:1337/example")
|
|
web.proxies = None
|
|
assert (
|
|
web.settings.get("receive", "webhook_url")
|
|
== "http://127.0.0.1:1337/example"
|
|
)
|
|
|
|
with web.app.test_client() as c:
|
|
res = c.get("/", headers=self._make_auth_headers(web.password))
|
|
res.get_data()
|
|
assert res.status_code == 200
|
|
|
|
res = c.post(
|
|
"/upload-ajax",
|
|
buffered=True,
|
|
content_type="multipart/form-data",
|
|
data={"file[]": (BytesIO(b"THIS IS A TEST FILE"), "new_york.jpg")},
|
|
headers=self._make_auth_headers(web.password),
|
|
)
|
|
res.get_data()
|
|
assert res.status_code == 200
|
|
|
|
assert webhook_url == "http://127.0.0.1:1337/example"
|
|
assert webhook_data == "1 file uploaded to OnionShare"
|
|
|
|
def test_public_mode_on(self, temp_dir, common_obj):
|
|
web = web_obj(temp_dir, common_obj, "receive")
|
|
web.settings.set("general", "public", True)
|
|
|
|
with web.app.test_client() as c:
|
|
# Loading / should work without auth
|
|
res = c.get("/")
|
|
data1 = res.get_data()
|
|
assert res.status_code == 200
|
|
|
|
def test_public_mode_off(self, temp_dir, common_obj):
|
|
web = web_obj(temp_dir, common_obj, "receive")
|
|
web.settings.set("general", "public", False)
|
|
|
|
with web.app.test_client() as c:
|
|
# Load / without auth
|
|
res = c.get("/")
|
|
res.get_data()
|
|
assert res.status_code == 401
|
|
|
|
# But static resources should work without auth
|
|
res = c.get(f"{web.static_url_path}/css/style.css")
|
|
res.get_data()
|
|
assert res.status_code == 200
|
|
|
|
# Load / with valid auth
|
|
res = c.get("/", headers=self._make_auth_headers(web.password))
|
|
res.get_data()
|
|
assert res.status_code == 200
|
|
|
|
def _make_auth_headers(self, password):
|
|
auth = base64.b64encode(b"onionshare:" + password.encode()).decode()
|
|
h = Headers()
|
|
h.add("Authorization", "Basic " + auth)
|
|
return h
|
|
|
|
|
|
class TestZipWriterDefault:
|
|
@pytest.mark.parametrize(
|
|
"test_input",
|
|
(
|
|
f"onionshare_{''.join(random.choice('abcdefghijklmnopqrstuvwxyz234567') for _ in range(6))}.zip"
|
|
for _ in range(50)
|
|
),
|
|
)
|
|
def test_default_zw_filename_regex(self, test_input):
|
|
assert bool(DEFAULT_ZW_FILENAME_REGEX.match(test_input))
|
|
|
|
def test_zw_filename(self, default_zw):
|
|
zw_filename = os.path.basename(default_zw.zip_filename)
|
|
assert bool(DEFAULT_ZW_FILENAME_REGEX.match(zw_filename))
|
|
|
|
def test_zipfile_filename_matches_zipwriter_filename(self, default_zw):
|
|
assert default_zw.z.filename == default_zw.zip_filename
|
|
|
|
def test_zipfile_allow_zip64(self, default_zw):
|
|
assert default_zw.z._allowZip64 is True
|
|
|
|
def test_zipfile_mode(self, default_zw):
|
|
assert default_zw.z.mode == "w"
|
|
|
|
def test_callback(self, default_zw):
|
|
assert default_zw.processed_size_callback(None) is None
|
|
|
|
def test_add_file(self, default_zw, temp_file_1024_delete):
|
|
default_zw.add_file(temp_file_1024_delete)
|
|
zipfile_info = default_zw.z.getinfo(os.path.basename(temp_file_1024_delete))
|
|
|
|
assert zipfile_info.compress_type == zipfile.ZIP_DEFLATED
|
|
assert zipfile_info.file_size == 1024
|
|
|
|
def test_add_directory(self, temp_dir_1024_delete, default_zw):
|
|
previous_size = default_zw._size # size before adding directory
|
|
default_zw.add_dir(temp_dir_1024_delete)
|
|
assert default_zw._size == previous_size + 1024
|
|
|
|
|
|
class TestZipWriterCustom:
|
|
@pytest.mark.parametrize(
|
|
"test_input",
|
|
(
|
|
Common.random_string(
|
|
random.randint(2, 50), random.choice((None, random.randint(2, 50)))
|
|
)
|
|
for _ in range(50)
|
|
),
|
|
)
|
|
def test_random_string_regex(self, test_input):
|
|
assert bool(RANDOM_STR_REGEX.match(test_input))
|
|
|
|
def test_custom_filename(self, custom_zw):
|
|
assert bool(RANDOM_STR_REGEX.match(custom_zw.zip_filename))
|
|
|
|
def test_custom_callback(self, custom_zw):
|
|
assert custom_zw.processed_size_callback(None) == "custom_callback"
|