mirror of
https://github.com/onionshare/onionshare.git
synced 2025-02-10 03:38:55 -05:00
Fix tests
This commit is contained in:
parent
9785be0375
commit
15d66c1a6f
@ -2,8 +2,6 @@ import json
|
|||||||
import os
|
import os
|
||||||
import requests
|
import requests
|
||||||
import shutil
|
import shutil
|
||||||
import socket
|
|
||||||
import socks
|
|
||||||
import base64
|
import base64
|
||||||
|
|
||||||
from PyQt5 import QtCore, QtTest
|
from PyQt5 import QtCore, QtTest
|
||||||
@ -129,9 +127,9 @@ class GuiBaseTest(object):
|
|||||||
files = {'file[]': open('/tmp/test.txt', 'rb')}
|
files = {'file[]': open('/tmp/test.txt', 'rb')}
|
||||||
url = 'http://127.0.0.1:{}/upload'.format(self.gui.app.port)
|
url = 'http://127.0.0.1:{}/upload'.format(self.gui.app.port)
|
||||||
if public_mode:
|
if public_mode:
|
||||||
response = requests.post(url, files=files)
|
r = requests.post(url, files=files)
|
||||||
else:
|
else:
|
||||||
response = requests.post(url, files=files, auth=requests.auth.HTTPBasicAuth('onionshare', mode.web.password))
|
r = requests.post(url, files=files, auth=requests.auth.HTTPBasicAuth('onionshare', mode.web.password))
|
||||||
QtTest.QTest.qWait(2000)
|
QtTest.QTest.qWait(2000)
|
||||||
|
|
||||||
if type(mode) == ShareMode:
|
if type(mode) == ShareMode:
|
||||||
@ -186,9 +184,11 @@ class GuiBaseTest(object):
|
|||||||
|
|
||||||
def web_server_is_running(self):
|
def web_server_is_running(self):
|
||||||
'''Test that the web server has started'''
|
'''Test that the web server has started'''
|
||||||
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
try:
|
||||||
|
r = requests.get('http://127.0.0.1:{}/'.format(self.gui.app.port))
|
||||||
self.assertEqual(sock.connect_ex(('127.0.0.1',self.gui.app.port)), 0)
|
self.assertTrue(True)
|
||||||
|
except requests.exceptions.ConnectionError:
|
||||||
|
self.assertTrue(False)
|
||||||
|
|
||||||
|
|
||||||
def have_a_password(self, mode, public_mode):
|
def have_a_password(self, mode, public_mode):
|
||||||
@ -226,34 +226,14 @@ class GuiBaseTest(object):
|
|||||||
|
|
||||||
def web_page(self, mode, string, public_mode):
|
def web_page(self, mode, string, public_mode):
|
||||||
'''Test that the web page contains a string'''
|
'''Test that the web page contains a string'''
|
||||||
s = socks.socksocket()
|
|
||||||
s.settimeout(60)
|
|
||||||
s.connect(('127.0.0.1', self.gui.app.port))
|
|
||||||
|
|
||||||
if not public_mode:
|
url = "http://127.0.0.1:{}/".format(self.gui.app.port)
|
||||||
path = '/{}'.format(mode.server_status.web.password)
|
if public_mode:
|
||||||
|
r = requests.get(url)
|
||||||
else:
|
else:
|
||||||
path = '/'
|
r = requests.get(url, auth=requests.auth.HTTPBasicAuth('onionshare', mode.web.password))
|
||||||
|
|
||||||
http_request = 'GET / HTTP/1.0\r\n'
|
self.assertTrue(string in r.text)
|
||||||
http_request += 'Host: 127.0.0.1\r\n'
|
|
||||||
if not public_mode:
|
|
||||||
auth = base64.b64encode(b'onionshare:'+password.encode()).decode()
|
|
||||||
http_request += 'Authorization: Basic {}'.format(auth)
|
|
||||||
http_request += '\r\n'
|
|
||||||
s.sendall(http_request.encode('utf-8'))
|
|
||||||
|
|
||||||
with open('/tmp/webpage', 'wb') as file_to_write:
|
|
||||||
while True:
|
|
||||||
data = s.recv(1024)
|
|
||||||
if not data:
|
|
||||||
break
|
|
||||||
file_to_write.write(data)
|
|
||||||
file_to_write.close()
|
|
||||||
|
|
||||||
f = open('/tmp/webpage')
|
|
||||||
self.assertTrue(string in f.read())
|
|
||||||
f.close()
|
|
||||||
|
|
||||||
|
|
||||||
def history_widgets_present(self, mode):
|
def history_widgets_present(self, mode):
|
||||||
@ -277,10 +257,12 @@ class GuiBaseTest(object):
|
|||||||
def web_server_is_stopped(self):
|
def web_server_is_stopped(self):
|
||||||
'''Test that the web server also stopped'''
|
'''Test that the web server also stopped'''
|
||||||
QtTest.QTest.qWait(2000)
|
QtTest.QTest.qWait(2000)
|
||||||
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
||||||
|
|
||||||
# We should be closed by now. Fail if not!
|
try:
|
||||||
self.assertNotEqual(sock.connect_ex(('127.0.0.1',self.gui.app.port)), 0)
|
r = requests.get('http://127.0.0.1:{}/'.format(self.gui.app.port))
|
||||||
|
self.assertTrue(False)
|
||||||
|
except requests.exceptions.ConnectionError:
|
||||||
|
self.assertTrue(True)
|
||||||
|
|
||||||
|
|
||||||
def server_status_indicator_says_closed(self, mode, stay_open):
|
def server_status_indicator_says_closed(self, mode, stay_open):
|
||||||
|
@ -7,18 +7,24 @@ from .GuiBaseTest import GuiBaseTest
|
|||||||
class GuiReceiveTest(GuiBaseTest):
|
class GuiReceiveTest(GuiBaseTest):
|
||||||
def upload_file(self, public_mode, file_to_upload, expected_basename, identical_files_at_once=False):
|
def upload_file(self, public_mode, file_to_upload, expected_basename, identical_files_at_once=False):
|
||||||
'''Test that we can upload the file'''
|
'''Test that we can upload the file'''
|
||||||
files = {'file[]': open(file_to_upload, 'rb')}
|
|
||||||
url = 'http://127.0.0.1:{}/upload'.format(self.gui.app.port)
|
# Wait 2 seconds to make sure the filename, based on timestamp, isn't accidentally reused
|
||||||
if not public_mode:
|
|
||||||
r = requests.post(url, files=files)
|
|
||||||
else:
|
|
||||||
r = requests.post(url, files=files, auth=requests.auth.HTTPBasicAuth('onionshare', mode.web.password))
|
|
||||||
if identical_files_at_once:
|
|
||||||
# Send a duplicate upload to test for collisions
|
|
||||||
r = requests.post(path, files=files)
|
|
||||||
QtTest.QTest.qWait(2000)
|
QtTest.QTest.qWait(2000)
|
||||||
|
|
||||||
# Make sure the file is within the last 10 seconds worth of filenames
|
files = {'file[]': open(file_to_upload, 'rb')}
|
||||||
|
url = 'http://127.0.0.1:{}/upload'.format(self.gui.app.port)
|
||||||
|
if public_mode:
|
||||||
|
r = requests.post(url, files=files)
|
||||||
|
else:
|
||||||
|
r = requests.post(url, files=files, auth=requests.auth.HTTPBasicAuth('onionshare', self.gui.receive_mode.web.password))
|
||||||
|
|
||||||
|
if identical_files_at_once:
|
||||||
|
# Send a duplicate upload to test for collisions
|
||||||
|
r = requests.post(url, files=files)
|
||||||
|
|
||||||
|
QtTest.QTest.qWait(2000)
|
||||||
|
|
||||||
|
# Make sure the file is within the last 10 seconds worth of fileames
|
||||||
exists = False
|
exists = False
|
||||||
now = datetime.now()
|
now = datetime.now()
|
||||||
for i in range(10):
|
for i in range(10):
|
||||||
@ -40,23 +46,23 @@ class GuiReceiveTest(GuiBaseTest):
|
|||||||
'''Test that we can't upload the file when permissions are wrong, and expected content is shown'''
|
'''Test that we can't upload the file when permissions are wrong, and expected content is shown'''
|
||||||
files = {'file[]': open('/tmp/test.txt', 'rb')}
|
files = {'file[]': open('/tmp/test.txt', 'rb')}
|
||||||
url = 'http://127.0.0.1:{}/upload'.format(self.gui.app.port)
|
url = 'http://127.0.0.1:{}/upload'.format(self.gui.app.port)
|
||||||
if not public_mode:
|
if public_mode:
|
||||||
r = requests.post(url, files=files)
|
r = requests.post(url, files=files)
|
||||||
else:
|
else:
|
||||||
r = requests.post(url, files=files, auth=requests.auth.HTTPBasicAuth('onionshare', mode.web.password))
|
r = requests.post(url, files=files, auth=requests.auth.HTTPBasicAuth('onionshare', self.gui.receive_mode.web.password))
|
||||||
|
|
||||||
QtCore.QTimer.singleShot(1000, self.accept_dialog)
|
QtCore.QTimer.singleShot(1000, self.accept_dialog)
|
||||||
self.assertTrue('Error uploading, please inform the OnionShare user' in response.text)
|
self.assertTrue('Error uploading, please inform the OnionShare user' in r.text)
|
||||||
|
|
||||||
def upload_dir_permissions(self, mode=0o755):
|
def upload_dir_permissions(self, mode=0o755):
|
||||||
'''Manipulate the permissions on the upload dir in between tests'''
|
'''Manipulate the permissions on the upload dir in between tests'''
|
||||||
os.chmod('/tmp/OnionShare', mode)
|
os.chmod('/tmp/OnionShare', mode)
|
||||||
|
|
||||||
def try_public_paths_in_non_public_mode(self):
|
def try_without_auth_in_non_public_mode(self):
|
||||||
r = requests.post('http://127.0.0.1:{}/upload'.format(self.gui.app.port))
|
r = requests.post('http://127.0.0.1:{}/upload'.format(self.gui.app.port))
|
||||||
self.assertEqual(response.status_code, 404)
|
self.assertEqual(r.status_code, 401)
|
||||||
r = requests.get('http://127.0.0.1:{}/close'.format(self.gui.app.port))
|
r = requests.get('http://127.0.0.1:{}/close'.format(self.gui.app.port))
|
||||||
self.assertEqual(response.status_code, 404)
|
self.assertEqual(r.status_code, 401)
|
||||||
|
|
||||||
def uploading_zero_files_shouldnt_change_ui(self, mode, public_mode):
|
def uploading_zero_files_shouldnt_change_ui(self, mode, public_mode):
|
||||||
'''If you submit the receive mode form without selecting any files, the UI shouldn't get updated'''
|
'''If you submit the receive mode form without selecting any files, the UI shouldn't get updated'''
|
||||||
@ -68,7 +74,7 @@ class GuiReceiveTest(GuiBaseTest):
|
|||||||
before_number_of_history_items = len(mode.history.item_list.items)
|
before_number_of_history_items = len(mode.history.item_list.items)
|
||||||
|
|
||||||
# Click submit without including any files a few times
|
# Click submit without including any files a few times
|
||||||
if not public_mode:
|
if public_mode:
|
||||||
r = requests.post(url, files={})
|
r = requests.post(url, files={})
|
||||||
r = requests.post(url, files={})
|
r = requests.post(url, files={})
|
||||||
r = requests.post(url, files={})
|
r = requests.post(url, files={})
|
||||||
@ -102,11 +108,11 @@ class GuiReceiveTest(GuiBaseTest):
|
|||||||
self.server_status_indicator_says_started(self.gui.receive_mode)
|
self.server_status_indicator_says_started(self.gui.receive_mode)
|
||||||
self.web_page(self.gui.receive_mode, 'Select the files you want to send, then click', public_mode)
|
self.web_page(self.gui.receive_mode, 'Select the files you want to send, then click', public_mode)
|
||||||
|
|
||||||
def run_all_receive_mode_tests(self, public_mode, receive_allow_receiver_shutdown):
|
def run_all_receive_mode_tests(self, public_mode):
|
||||||
'''Upload files in receive mode and stop the share'''
|
'''Upload files in receive mode and stop the share'''
|
||||||
self.run_all_receive_mode_setup_tests(public_mode)
|
self.run_all_receive_mode_setup_tests(public_mode)
|
||||||
if not public_mode:
|
if not public_mode:
|
||||||
self.try_public_paths_in_non_public_mode()
|
self.try_without_auth_in_non_public_mode()
|
||||||
self.upload_file(public_mode, '/tmp/test.txt', 'test.txt')
|
self.upload_file(public_mode, '/tmp/test.txt', 'test.txt')
|
||||||
self.history_widgets_present(self.gui.receive_mode)
|
self.history_widgets_present(self.gui.receive_mode)
|
||||||
self.counter_incremented(self.gui.receive_mode, 1)
|
self.counter_incremented(self.gui.receive_mode, 1)
|
||||||
@ -128,7 +134,7 @@ class GuiReceiveTest(GuiBaseTest):
|
|||||||
self.server_is_started(self.gui.receive_mode)
|
self.server_is_started(self.gui.receive_mode)
|
||||||
self.history_indicator(self.gui.receive_mode, public_mode)
|
self.history_indicator(self.gui.receive_mode, public_mode)
|
||||||
|
|
||||||
def run_all_receive_mode_unwritable_dir_tests(self, public_mode, receive_allow_receiver_shutdown):
|
def run_all_receive_mode_unwritable_dir_tests(self, public_mode):
|
||||||
'''Attempt to upload (unwritable) files in receive mode and stop the share'''
|
'''Attempt to upload (unwritable) files in receive mode and stop the share'''
|
||||||
self.run_all_receive_mode_setup_tests(public_mode)
|
self.run_all_receive_mode_setup_tests(public_mode)
|
||||||
self.upload_dir_permissions(0o400)
|
self.upload_dir_permissions(0o400)
|
||||||
|
@ -2,6 +2,7 @@ import os
|
|||||||
import requests
|
import requests
|
||||||
import socks
|
import socks
|
||||||
import zipfile
|
import zipfile
|
||||||
|
import tempfile
|
||||||
from PyQt5 import QtCore, QtTest
|
from PyQt5 import QtCore, QtTest
|
||||||
from .GuiBaseTest import GuiBaseTest
|
from .GuiBaseTest import GuiBaseTest
|
||||||
|
|
||||||
@ -66,29 +67,17 @@ class GuiShareTest(GuiBaseTest):
|
|||||||
|
|
||||||
def download_share(self, public_mode):
|
def download_share(self, public_mode):
|
||||||
'''Test that we can download the share'''
|
'''Test that we can download the share'''
|
||||||
s = socks.socksocket()
|
url = "http://127.0.0.1:{}/download".format(self.gui.app.port)
|
||||||
s.settimeout(60)
|
|
||||||
s.connect(('127.0.0.1', self.gui.app.port))
|
|
||||||
|
|
||||||
if public_mode:
|
if public_mode:
|
||||||
path = '/download'
|
r = requests.get(url)
|
||||||
else:
|
else:
|
||||||
path = '{}/download'.format(self.gui.share_mode.web.password)
|
r = requests.get(url, auth=requests.auth.HTTPBasicAuth('onionshare', self.gui.share_mode.server_status.web.password))
|
||||||
|
|
||||||
http_request = 'GET {} HTTP/1.0\r\n'.format(path)
|
tmp_file = tempfile.NamedTemporaryFile()
|
||||||
http_request += 'Host: 127.0.0.1\r\n'
|
with open(tmp_file.name, 'wb') as f:
|
||||||
http_request += '\r\n'
|
f.write(r.content)
|
||||||
s.sendall(http_request.encode('utf-8'))
|
|
||||||
|
|
||||||
with open('/tmp/download.zip', 'wb') as file_to_write:
|
zip = zipfile.ZipFile(tmp_file.name)
|
||||||
while True:
|
|
||||||
data = s.recv(1024)
|
|
||||||
if not data:
|
|
||||||
break
|
|
||||||
file_to_write.write(data)
|
|
||||||
file_to_write.close()
|
|
||||||
|
|
||||||
zip = zipfile.ZipFile('/tmp/download.zip')
|
|
||||||
QtTest.QTest.qWait(2000)
|
QtTest.QTest.qWait(2000)
|
||||||
self.assertEqual('onionshare', zip.read('test.txt').decode('utf-8'))
|
self.assertEqual('onionshare', zip.read('test.txt').decode('utf-8'))
|
||||||
|
|
||||||
@ -98,7 +87,7 @@ class GuiShareTest(GuiBaseTest):
|
|||||||
|
|
||||||
for _ in range(20):
|
for _ in range(20):
|
||||||
password_guess = self.gui.common.build_password()
|
password_guess = self.gui.common.build_password()
|
||||||
r = requests.get(url, auth=requests.auth.HTTPBasicAuth('onionshare', password))
|
r = requests.get(url, auth=requests.auth.HTTPBasicAuth('onionshare', password_guess))
|
||||||
|
|
||||||
# A nasty hack to avoid the Alert dialog that blocks the rest of the test
|
# A nasty hack to avoid the Alert dialog that blocks the rest of the test
|
||||||
if not public_mode:
|
if not public_mode:
|
||||||
|
@ -20,7 +20,7 @@ class LocalReceiveModeUnwritableTest(unittest.TestCase, GuiReceiveTest):
|
|||||||
@pytest.mark.skipif(pytest.__version__ < '2.9', reason="requires newer pytest")
|
@pytest.mark.skipif(pytest.__version__ < '2.9', reason="requires newer pytest")
|
||||||
def test_gui(self):
|
def test_gui(self):
|
||||||
self.run_all_common_setup_tests()
|
self.run_all_common_setup_tests()
|
||||||
self.run_all_receive_mode_unwritable_dir_tests(False, True)
|
self.run_all_receive_mode_unwritable_dir_tests(False)
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
unittest.main()
|
unittest.main()
|
||||||
|
@ -21,7 +21,7 @@ class LocalReceivePublicModeUnwritableTest(unittest.TestCase, GuiReceiveTest):
|
|||||||
@pytest.mark.skipif(pytest.__version__ < '2.9', reason="requires newer pytest")
|
@pytest.mark.skipif(pytest.__version__ < '2.9', reason="requires newer pytest")
|
||||||
def test_gui(self):
|
def test_gui(self):
|
||||||
self.run_all_common_setup_tests()
|
self.run_all_common_setup_tests()
|
||||||
self.run_all_receive_mode_unwritable_dir_tests(True, True)
|
self.run_all_receive_mode_unwritable_dir_tests(True)
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
unittest.main()
|
unittest.main()
|
||||||
|
@ -21,7 +21,7 @@ class LocalReceiveModePublicModeTest(unittest.TestCase, GuiReceiveTest):
|
|||||||
@pytest.mark.skipif(pytest.__version__ < '2.9', reason="requires newer pytest")
|
@pytest.mark.skipif(pytest.__version__ < '2.9', reason="requires newer pytest")
|
||||||
def test_gui(self):
|
def test_gui(self):
|
||||||
self.run_all_common_setup_tests()
|
self.run_all_common_setup_tests()
|
||||||
self.run_all_receive_mode_tests(True, True)
|
self.run_all_receive_mode_tests(True)
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
unittest.main()
|
unittest.main()
|
||||||
|
@ -20,7 +20,7 @@ class LocalReceiveModeTest(unittest.TestCase, GuiReceiveTest):
|
|||||||
@pytest.mark.skipif(pytest.__version__ < '2.9', reason="requires newer pytest")
|
@pytest.mark.skipif(pytest.__version__ < '2.9', reason="requires newer pytest")
|
||||||
def test_gui(self):
|
def test_gui(self):
|
||||||
self.run_all_common_setup_tests()
|
self.run_all_common_setup_tests()
|
||||||
self.run_all_receive_mode_tests(False, True)
|
self.run_all_receive_mode_tests(False)
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
unittest.main()
|
unittest.main()
|
||||||
|
Loading…
x
Reference in New Issue
Block a user