mirror of
https://0xacab.org/jvoisin/mat2-web.git
synced 2025-02-24 00:59:59 -05:00
Resolve "Use a HMAC instead of a hash"
This commit is contained in:
parent
e1bac8b6a7
commit
c301e472bd
2
main.py
2
main.py
@ -36,7 +36,7 @@ def create_app(test_config=None):
|
|||||||
)
|
)
|
||||||
api.add_resource(
|
api.add_resource(
|
||||||
rest_api.APIDownload,
|
rest_api.APIDownload,
|
||||||
'/api/download/<string:key>/<string:filename>',
|
'/api/download/<string:key>/<string:secret>/<string:filename>',
|
||||||
resource_class_kwargs={'upload_folder': app.config['UPLOAD_FOLDER']}
|
resource_class_kwargs={'upload_folder': app.config['UPLOAD_FOLDER']}
|
||||||
)
|
)
|
||||||
api.add_resource(
|
api.add_resource(
|
||||||
|
@ -18,8 +18,8 @@ def info():
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@routes.route('/download/<string:key>/<string:filename>')
|
@routes.route('/download/<string:key>/<string:secret>/<string:filename>')
|
||||||
def download_file(key: str, filename: str):
|
def download_file(key: str, secret: str, filename: str):
|
||||||
if filename != secure_filename(filename):
|
if filename != secure_filename(filename):
|
||||||
return redirect(url_for('routes.upload_file'))
|
return redirect(url_for('routes.upload_file'))
|
||||||
|
|
||||||
@ -28,7 +28,7 @@ def download_file(key: str, filename: str):
|
|||||||
|
|
||||||
if not os.path.exists(complete_path):
|
if not os.path.exists(complete_path):
|
||||||
return redirect(url_for('routes.upload_file'))
|
return redirect(url_for('routes.upload_file'))
|
||||||
if hmac.compare_digest(utils.hash_file(complete_path), key) is False:
|
if hmac.compare_digest(utils.hash_file(complete_path, secret), key) is False:
|
||||||
return redirect(url_for('routes.upload_file'))
|
return redirect(url_for('routes.upload_file'))
|
||||||
|
|
||||||
@after_this_request
|
@after_this_request
|
||||||
@ -67,10 +67,14 @@ def upload_file():
|
|||||||
flash('Unable to clean %s' % mime)
|
flash('Unable to clean %s' % mime)
|
||||||
return redirect(url_for('routes.upload_file'))
|
return redirect(url_for('routes.upload_file'))
|
||||||
|
|
||||||
key, meta_after, output_filename = utils.cleanup(parser, filepath, current_app.config['UPLOAD_FOLDER'])
|
key, secret, meta_after, output_filename = utils.cleanup(parser, filepath, current_app.config['UPLOAD_FOLDER'])
|
||||||
|
|
||||||
return render_template(
|
return render_template(
|
||||||
'download.html', mimetypes=mime_types, meta=meta, filename=output_filename, meta_after=meta_after, key=key
|
'download.html',
|
||||||
|
mimetypes=mime_types,
|
||||||
|
meta=meta,
|
||||||
|
download_uri=url_for('routes.download_file', key=key, secret=secret, filename=output_filename),
|
||||||
|
meta_after=meta_after,
|
||||||
)
|
)
|
||||||
|
|
||||||
max_file_size = int(current_app.config['MAX_CONTENT_LENGTH'] / 1024 / 1024)
|
max_file_size = int(current_app.config['MAX_CONTENT_LENGTH'] / 1024 / 1024)
|
||||||
|
@ -42,14 +42,15 @@ class APIUpload(Resource):
|
|||||||
if not parser.remove_all():
|
if not parser.remove_all():
|
||||||
abort(500, message='Unable to clean %s' % mime)
|
abort(500, message='Unable to clean %s' % mime)
|
||||||
|
|
||||||
key, meta_after, output_filename = utils.cleanup(parser, filepath, self.upload_folder)
|
key, secret, meta_after, output_filename = utils.cleanup(parser, filepath, self.upload_folder)
|
||||||
return utils.return_file_created_response(
|
return utils.return_file_created_response(
|
||||||
output_filename,
|
output_filename,
|
||||||
mime,
|
mime,
|
||||||
key,
|
key,
|
||||||
|
secret,
|
||||||
meta,
|
meta,
|
||||||
meta_after,
|
meta_after,
|
||||||
urljoin(request.host_url, '%s/%s/%s/%s' % ('api', 'download', key, output_filename))
|
urljoin(request.host_url, '%s/%s/%s/%s/%s' % ('api', 'download', key, secret, output_filename))
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@ -58,8 +59,8 @@ class APIDownload(Resource):
|
|||||||
def __init__(self, **kwargs):
|
def __init__(self, **kwargs):
|
||||||
self.upload_folder = kwargs['upload_folder']
|
self.upload_folder = kwargs['upload_folder']
|
||||||
|
|
||||||
def get(self, key: str, filename: str):
|
def get(self, key: str, secret: str, filename: str):
|
||||||
complete_path, filepath = utils.is_valid_api_download_file(filename, key, self.upload_folder)
|
complete_path, filepath = utils.is_valid_api_download_file(filename, key, secret, self.upload_folder)
|
||||||
# Make sure the file is NOT deleted on HEAD requests
|
# Make sure the file is NOT deleted on HEAD requests
|
||||||
if request.method == 'GET':
|
if request.method == 'GET':
|
||||||
file_removal_scheduler.run_file_removal_job(self.upload_folder)
|
file_removal_scheduler.run_file_removal_job(self.upload_folder)
|
||||||
@ -87,6 +88,7 @@ class APIBulkDownloadCreator(Resource):
|
|||||||
'type': 'dict',
|
'type': 'dict',
|
||||||
'schema': {
|
'schema': {
|
||||||
'key': {'type': 'string', 'required': True},
|
'key': {'type': 'string', 'required': True},
|
||||||
|
'secret': {'type': 'string', 'required': True},
|
||||||
'file_name': {'type': 'string', 'required': True}
|
'file_name': {'type': 'string', 'required': True}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -108,6 +110,7 @@ class APIBulkDownloadCreator(Resource):
|
|||||||
complete_path, file_path = utils.is_valid_api_download_file(
|
complete_path, file_path = utils.is_valid_api_download_file(
|
||||||
file_candidate['file_name'],
|
file_candidate['file_name'],
|
||||||
file_candidate['key'],
|
file_candidate['key'],
|
||||||
|
file_candidate['secret'],
|
||||||
self.upload_folder
|
self.upload_folder
|
||||||
)
|
)
|
||||||
try:
|
try:
|
||||||
@ -124,13 +127,17 @@ class APIBulkDownloadCreator(Resource):
|
|||||||
parser, mime = utils.get_file_parser(zip_path)
|
parser, mime = utils.get_file_parser(zip_path)
|
||||||
if not parser.remove_all():
|
if not parser.remove_all():
|
||||||
abort(500, message='Unable to clean %s' % mime)
|
abort(500, message='Unable to clean %s' % mime)
|
||||||
key, meta_after, output_filename = utils.cleanup(parser, zip_path, self.upload_folder)
|
key, secret, meta_after, output_filename = utils.cleanup(parser, zip_path, self.upload_folder)
|
||||||
return {
|
return {
|
||||||
'output_filename': output_filename,
|
'output_filename': output_filename,
|
||||||
'mime': mime,
|
'mime': mime,
|
||||||
'key': key,
|
'key': key,
|
||||||
|
'secret': secret,
|
||||||
'meta_after': meta_after,
|
'meta_after': meta_after,
|
||||||
'download_link': urljoin(request.host_url, '%s/%s/%s/%s' % ('api', 'download', key, output_filename))
|
'download_link': urljoin(
|
||||||
|
request.host_url,
|
||||||
|
'%s/%s/%s/%s/%s' % ('api', 'download', key, secret, output_filename)
|
||||||
|
)
|
||||||
}, 201
|
}, 201
|
||||||
|
|
||||||
|
|
||||||
|
@ -12,15 +12,21 @@ def get_allow_origin_header_value():
|
|||||||
return os.environ.get('MAT2_ALLOW_ORIGIN_WHITELIST', '*').split(" ")
|
return os.environ.get('MAT2_ALLOW_ORIGIN_WHITELIST', '*').split(" ")
|
||||||
|
|
||||||
|
|
||||||
def hash_file(filepath: str) -> str:
|
def hash_file(filepath: str, secret: str) -> str:
|
||||||
sha256 = hashlib.sha256()
|
"""
|
||||||
|
The goal of the hmac is to ONLY make the hashes unpredictable
|
||||||
|
:param filepath: Path of the file
|
||||||
|
:param secret: a server side generated secret
|
||||||
|
:return: digest, secret
|
||||||
|
"""
|
||||||
|
mac = hmac.new(secret.encode(), None, hashlib.sha256)
|
||||||
with open(filepath, 'rb') as f:
|
with open(filepath, 'rb') as f:
|
||||||
while True:
|
while True:
|
||||||
data = f.read(65536) # read the file by chunk of 64k
|
data = f.read(65536) # read the file by chunk of 64k
|
||||||
if not data:
|
if not data:
|
||||||
break
|
break
|
||||||
sha256.update(data)
|
mac.update(data)
|
||||||
return sha256.hexdigest()
|
return mac.hexdigest()
|
||||||
|
|
||||||
|
|
||||||
def check_upload_folder(upload_folder):
|
def check_upload_folder(upload_folder):
|
||||||
@ -28,11 +34,20 @@ def check_upload_folder(upload_folder):
|
|||||||
os.mkdir(upload_folder)
|
os.mkdir(upload_folder)
|
||||||
|
|
||||||
|
|
||||||
def return_file_created_response(output_filename, mime, key, meta, meta_after, download_link):
|
def return_file_created_response(
|
||||||
|
output_filename: str,
|
||||||
|
mime: str,
|
||||||
|
key: str,
|
||||||
|
secret: str,
|
||||||
|
meta: list,
|
||||||
|
meta_after: list,
|
||||||
|
download_link: str
|
||||||
|
) -> dict:
|
||||||
return {
|
return {
|
||||||
'output_filename': output_filename,
|
'output_filename': output_filename,
|
||||||
'mime': mime,
|
'mime': mime,
|
||||||
'key': key,
|
'key': key,
|
||||||
|
'secret': secret,
|
||||||
'meta': meta,
|
'meta': meta,
|
||||||
'meta_after': meta_after,
|
'meta_after': meta_after,
|
||||||
'download_link': download_link
|
'download_link': download_link
|
||||||
@ -65,9 +80,9 @@ def cleanup(parser, filepath, upload_folder):
|
|||||||
parser, _ = parser_factory.get_parser(parser.output_filename)
|
parser, _ = parser_factory.get_parser(parser.output_filename)
|
||||||
meta_after = parser.get_meta()
|
meta_after = parser.get_meta()
|
||||||
os.remove(filepath)
|
os.remove(filepath)
|
||||||
|
secret = os.urandom(32).hex()
|
||||||
key = hash_file(os.path.join(upload_folder, output_filename))
|
key = hash_file(os.path.join(upload_folder, output_filename), secret)
|
||||||
return key, meta_after, output_filename
|
return key, secret, meta_after, output_filename
|
||||||
|
|
||||||
|
|
||||||
def get_file_paths(filename, upload_folder):
|
def get_file_paths(filename, upload_folder):
|
||||||
@ -77,7 +92,7 @@ def get_file_paths(filename, upload_folder):
|
|||||||
return complete_path, filepath
|
return complete_path, filepath
|
||||||
|
|
||||||
|
|
||||||
def is_valid_api_download_file(filename, key, upload_folder):
|
def is_valid_api_download_file(filename: str, key: str, secret: str, upload_folder: str) -> [str, str]:
|
||||||
if filename != secure_filename(filename):
|
if filename != secure_filename(filename):
|
||||||
abort(400, message='Insecure filename')
|
abort(400, message='Insecure filename')
|
||||||
|
|
||||||
@ -86,6 +101,6 @@ def is_valid_api_download_file(filename, key, upload_folder):
|
|||||||
if not os.path.exists(complete_path):
|
if not os.path.exists(complete_path):
|
||||||
abort(404, message='File not found')
|
abort(404, message='File not found')
|
||||||
|
|
||||||
if hmac.compare_digest(hash_file(complete_path), key) is False:
|
if hmac.compare_digest(hash_file(complete_path, secret), key) is False:
|
||||||
abort(400, message='The file hash does not match')
|
abort(400, message='The file hash does not match')
|
||||||
return complete_path, filepath
|
return complete_path, filepath
|
||||||
|
@ -6,3 +6,5 @@ flask==1.0.3
|
|||||||
Flask-RESTful==0.3.7
|
Flask-RESTful==0.3.7
|
||||||
Flask-Cors==3.0.8
|
Flask-Cors==3.0.8
|
||||||
Cerberus==1.3.1
|
Cerberus==1.3.1
|
||||||
|
Flask-Testing==0.8.0
|
||||||
|
blinker==1.4
|
@ -10,7 +10,7 @@
|
|||||||
{% endif %}
|
{% endif %}
|
||||||
<div class="uk-flex uk-flex-center">
|
<div class="uk-flex uk-flex-center">
|
||||||
<div>
|
<div>
|
||||||
<a class="uk-flex-1" href='{{ url_for('routes.download_file', key=key, filename=filename) }}'>
|
<a class="uk-flex-1" href='{{ download_uri }}'>
|
||||||
<button class="uk-button uk-button-primary">
|
<button class="uk-button uk-button-primary">
|
||||||
⇩ download cleaned file
|
⇩ download cleaned file
|
||||||
</button>
|
</button>
|
||||||
|
@ -3,7 +3,7 @@
|
|||||||
<div class="shadowed-box u-text-center u-center-block">
|
<div class="shadowed-box u-text-center u-center-block">
|
||||||
<h2 class="uk-text-center">Remove metadata</h2>
|
<h2 class="uk-text-center">Remove metadata</h2>
|
||||||
<p class="uk-text-center">
|
<p class="uk-text-center">
|
||||||
The file you see is just the tip of the iceberg. Remove the hidden meta
|
The file you see is just the tip of the iceberg. Remove the hidden metadata.
|
||||||
</p>
|
</p>
|
||||||
<div class="uk-flex uk-flex-center">
|
<div class="uk-flex uk-flex-center">
|
||||||
<div>
|
<div>
|
||||||
|
89
test/test.py
89
test/test.py
@ -6,12 +6,13 @@ import io
|
|||||||
import os
|
import os
|
||||||
|
|
||||||
from unittest.mock import patch
|
from unittest.mock import patch
|
||||||
|
from flask_testing import TestCase
|
||||||
|
|
||||||
import main
|
import main
|
||||||
|
|
||||||
|
|
||||||
class Mat2WebTestCase(unittest.TestCase):
|
class Mat2WebTestCase(TestCase):
|
||||||
def setUp(self):
|
def create_app(self):
|
||||||
os.environ.setdefault('MAT2_ALLOW_ORIGIN_WHITELIST', 'origin1.gnu origin2.gnu')
|
os.environ.setdefault('MAT2_ALLOW_ORIGIN_WHITELIST', 'origin1.gnu origin2.gnu')
|
||||||
self.upload_folder = tempfile.mkdtemp()
|
self.upload_folder = tempfile.mkdtemp()
|
||||||
app = main.create_app(
|
app = main.create_app(
|
||||||
@ -20,45 +21,45 @@ class Mat2WebTestCase(unittest.TestCase):
|
|||||||
'UPLOAD_FOLDER': self.upload_folder
|
'UPLOAD_FOLDER': self.upload_folder
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
self.app = app.test_client()
|
return app
|
||||||
|
|
||||||
def tearDown(self):
|
def tearDown(self):
|
||||||
shutil.rmtree(self.upload_folder)
|
shutil.rmtree(self.upload_folder)
|
||||||
|
|
||||||
def test_get_root(self):
|
def test_get_root(self):
|
||||||
rv = self.app.get('/')
|
rv = self.client.get('/')
|
||||||
self.assertIn(b'mat2-web', rv.data)
|
self.assertIn(b'mat2-web', rv.data)
|
||||||
|
|
||||||
def test_check_mimetypes(self):
|
def test_check_mimetypes(self):
|
||||||
rv = self.app.get('/')
|
rv = self.client.get('/')
|
||||||
self.assertIn(b'.torrent', rv.data)
|
self.assertIn(b'.torrent', rv.data)
|
||||||
self.assertIn(b'.ods', rv.data)
|
self.assertIn(b'.ods', rv.data)
|
||||||
|
|
||||||
def test_get_download_dangerous_file(self):
|
def test_get_download_dangerous_file(self):
|
||||||
rv = self.app.get('/download/1337/\..\filename')
|
rv = self.client.get('/download/1337/aabb/\..\filename')
|
||||||
self.assertEqual(rv.status_code, 302)
|
self.assertEqual(rv.status_code, 302)
|
||||||
|
|
||||||
def test_get_download_without_key_file(self):
|
def test_get_download_without_key_file(self):
|
||||||
rv = self.app.get('/download/non_existant')
|
rv = self.client.get('/download/non_existant')
|
||||||
self.assertEqual(rv.status_code, 404)
|
self.assertEqual(rv.status_code, 404)
|
||||||
|
|
||||||
def test_get_download_nonexistant_file(self):
|
def test_get_download_nonexistant_file(self):
|
||||||
rv = self.app.get('/download/1337/non_existant')
|
rv = self.client.get('/download/1337/aabb/non_existant')
|
||||||
self.assertEqual(rv.status_code, 302)
|
self.assertEqual(rv.status_code, 302)
|
||||||
|
|
||||||
def test_get_upload_without_file(self):
|
def test_get_upload_without_file(self):
|
||||||
rv = self.app.post('/')
|
rv = self.client.post('/')
|
||||||
self.assertEqual(rv.status_code, 302)
|
self.assertEqual(rv.status_code, 302)
|
||||||
|
|
||||||
def test_get_upload_empty_file(self):
|
def test_get_upload_empty_file(self):
|
||||||
rv = self.app.post('/',
|
rv = self.client.post('/',
|
||||||
data=dict(
|
data=dict(
|
||||||
file=(io.BytesIO(b""), 'test.pdf'),
|
file=(io.BytesIO(b""), 'test.pdf'),
|
||||||
), follow_redirects=False)
|
), follow_redirects=False)
|
||||||
self.assertEqual(rv.status_code, 302)
|
self.assertEqual(rv.status_code, 302)
|
||||||
|
|
||||||
def test_get_upload_empty_file_redir(self):
|
def test_get_upload_empty_file_redir(self):
|
||||||
rv = self.app.post('/',
|
rv = self.client.post('/',
|
||||||
data=dict(
|
data=dict(
|
||||||
file=(io.BytesIO(b""), 'test.pdf'),
|
file=(io.BytesIO(b""), 'test.pdf'),
|
||||||
), follow_redirects=True)
|
), follow_redirects=True)
|
||||||
@ -67,7 +68,7 @@ class Mat2WebTestCase(unittest.TestCase):
|
|||||||
self.assertEqual(rv.status_code, 200)
|
self.assertEqual(rv.status_code, 200)
|
||||||
|
|
||||||
def test_get_upload_no_selected_file(self):
|
def test_get_upload_no_selected_file(self):
|
||||||
rv = self.app.post('/',
|
rv = self.client.post('/',
|
||||||
data=dict(
|
data=dict(
|
||||||
file=(io.BytesIO(b""), ''),
|
file=(io.BytesIO(b""), ''),
|
||||||
), follow_redirects=True)
|
), follow_redirects=True)
|
||||||
@ -86,7 +87,7 @@ class Mat2WebTestCase(unittest.TestCase):
|
|||||||
'AAAAAAAAAAApIFnAAAAdGVzdC5qc29uVVQNAAfomo9d6JqPXeiaj111eAsAAQTpAwAABOkDAAB'
|
'AAAAAAAAAAApIFnAAAAdGVzdC5qc29uVVQNAAfomo9d6JqPXeiaj111eAsAAQTpAwAABOkDAAB'
|
||||||
'QSwUGAAAAAAIAAgC8AAAAwAAAAAAA'
|
'QSwUGAAAAAAIAAgC8AAAAwAAAAAAA'
|
||||||
)
|
)
|
||||||
rv = self.app.post('/',
|
rv = self.client.post('/',
|
||||||
data=dict(
|
data=dict(
|
||||||
file=(io.BytesIO(zip_file_bytes), 'test.zip'),
|
file=(io.BytesIO(zip_file_bytes), 'test.zip'),
|
||||||
), follow_redirects=True)
|
), follow_redirects=True)
|
||||||
@ -94,7 +95,7 @@ class Mat2WebTestCase(unittest.TestCase):
|
|||||||
self.assertEqual(rv.status_code, 200)
|
self.assertEqual(rv.status_code, 200)
|
||||||
|
|
||||||
def test_get_upload_no_file_name(self):
|
def test_get_upload_no_file_name(self):
|
||||||
rv = self.app.post('/',
|
rv = self.client.post('/',
|
||||||
data=dict(
|
data=dict(
|
||||||
file=(io.BytesIO(b"aaa")),
|
file=(io.BytesIO(b"aaa")),
|
||||||
), follow_redirects=True)
|
), follow_redirects=True)
|
||||||
@ -102,30 +103,51 @@ class Mat2WebTestCase(unittest.TestCase):
|
|||||||
self.assertEqual(rv.status_code, 200)
|
self.assertEqual(rv.status_code, 200)
|
||||||
|
|
||||||
def test_get_upload_harmless_file(self):
|
def test_get_upload_harmless_file(self):
|
||||||
rv = self.app.post('/',
|
rv = self.client.post(
|
||||||
data=dict(
|
'/',
|
||||||
file=(io.BytesIO(b"Some text"), 'test.txt'),
|
data=dict(
|
||||||
), follow_redirects=True)
|
file=(io.BytesIO(b"Some text"), 'test.txt'),
|
||||||
self.assertIn(b'/download/4c2e9e6da31a64c70623619c449a040968cdbea85945bf384fa30ed2d5d24fa3/test.cleaned.txt', rv.data)
|
),
|
||||||
|
follow_redirects=True
|
||||||
|
)
|
||||||
|
download_uri = self.get_context_variable('download_uri')
|
||||||
|
self.assertIn('/test.cleaned.txt', download_uri)
|
||||||
self.assertEqual(rv.status_code, 200)
|
self.assertEqual(rv.status_code, 200)
|
||||||
self.assertNotIn('Access-Control-Allow-Origin', rv.headers)
|
self.assertNotIn('Access-Control-Allow-Origin', rv.headers)
|
||||||
|
|
||||||
rv = self.app.get('/download/4c2e9e6da31a64c70623619c449a040968cdbea85945bf384fa30ed2d5d24fa3/test.cleaned.txt')
|
rv = self.client.get(download_uri)
|
||||||
self.assertEqual(rv.status_code, 200)
|
self.assertEqual(rv.status_code, 200)
|
||||||
|
|
||||||
rv = self.app.get('/download/4c2e9e6da31a64c70623619c449a040968cdbea85945bf384fa30ed2d5d24fa3/test.cleaned.txt')
|
rv = self.client.get(download_uri)
|
||||||
self.assertEqual(rv.status_code, 302)
|
self.assertEqual(rv.status_code, 302)
|
||||||
|
|
||||||
def test_upload_wrong_hash(self):
|
def test_upload_wrong_hash_or_secret(self):
|
||||||
rv = self.app.post('/',
|
rv = self.client.post(
|
||||||
data=dict(
|
'/',
|
||||||
file=(io.BytesIO(b"Some text"), 'test.txt'),
|
data=dict(
|
||||||
), follow_redirects=True)
|
file=(io.BytesIO(b"Some text"), 'test.txt'),
|
||||||
self.assertIn(b'/download/4c2e9e6da31a64c70623619c449a040968cdbea85945bf384fa30ed2d5d24fa3/test.cleaned.txt',
|
),
|
||||||
rv.data)
|
follow_redirects=True
|
||||||
|
)
|
||||||
|
|
||||||
|
download_uri = self.get_context_variable('download_uri')
|
||||||
|
|
||||||
|
self.assertIn('/test.cleaned.txt', download_uri)
|
||||||
|
self.assertIn('/download', download_uri)
|
||||||
self.assertEqual(rv.status_code, 200)
|
self.assertEqual(rv.status_code, 200)
|
||||||
|
|
||||||
rv = self.app.get('/download/70623619c449a040968cdbea85945bf384fa30ed2d5d24fa3/test.cleaned.txt')
|
uri_parts = download_uri.split("/")
|
||||||
|
self.assertEqual(len(uri_parts[2]), len(uri_parts[3]))
|
||||||
|
self.assertEqual(64, len(uri_parts[2]))
|
||||||
|
|
||||||
|
key_uri_parts = uri_parts
|
||||||
|
key_uri_parts[2] = '70623619c'
|
||||||
|
rv = self.client.get("/".join(key_uri_parts))
|
||||||
|
self.assertEqual(rv.status_code, 302)
|
||||||
|
|
||||||
|
key_uri_parts = uri_parts
|
||||||
|
key_uri_parts[3] = '70623619c'
|
||||||
|
rv = self.client.get("/".join(key_uri_parts))
|
||||||
self.assertEqual(rv.status_code, 302)
|
self.assertEqual(rv.status_code, 302)
|
||||||
|
|
||||||
@patch('matweb.file_removal_scheduler.random.randint')
|
@patch('matweb.file_removal_scheduler.random.randint')
|
||||||
@ -140,19 +162,18 @@ class Mat2WebTestCase(unittest.TestCase):
|
|||||||
)
|
)
|
||||||
app = app.test_client()
|
app = app.test_client()
|
||||||
|
|
||||||
request = self.app.post('/',
|
request = self.client.post('/',
|
||||||
data=dict(
|
data=dict(
|
||||||
file=(io.BytesIO(b"Some text"), 'test.txt'),
|
file=(io.BytesIO(b"Some text"), 'test.txt'),
|
||||||
), follow_redirects=True)
|
), follow_redirects=True)
|
||||||
self.assertEqual(request.status_code, 200)
|
self.assertEqual(request.status_code, 200)
|
||||||
request = app.get(
|
|
||||||
b'/download/4c2e9e6da31a64c70623619c449a040968cdbea85945bf384fa30ed2d5d24fa3/test.cleaned.txt'
|
request = app.get(self.get_context_variable('download_uri'))
|
||||||
)
|
|
||||||
self.assertEqual(302, request.status_code)
|
self.assertEqual(302, request.status_code)
|
||||||
os.environ['MAT2_MAX_FILE_AGE_FOR_REMOVAL'] = '9999'
|
os.environ['MAT2_MAX_FILE_AGE_FOR_REMOVAL'] = '9999'
|
||||||
|
|
||||||
def test_info_page(self):
|
def test_info_page(self):
|
||||||
rv = self.app.get('/info')
|
rv = self.client.get('/info')
|
||||||
self.assertIn(b'What are metadata?', rv.data)
|
self.assertIn(b'What are metadata?', rv.data)
|
||||||
self.assertIn(b'.asc', rv.data)
|
self.assertIn(b'.asc', rv.data)
|
||||||
self.assertIn(b'.mp2', rv.data)
|
self.assertIn(b'.mp2', rv.data)
|
||||||
|
@ -30,33 +30,26 @@ class Mat2APITestCase(unittest.TestCase):
|
|||||||
del os.environ['MAT2_ALLOW_ORIGIN_WHITELIST']
|
del os.environ['MAT2_ALLOW_ORIGIN_WHITELIST']
|
||||||
|
|
||||||
def test_api_upload_valid(self):
|
def test_api_upload_valid(self):
|
||||||
request = self.app.post('/api/upload',
|
request = self.app.post(
|
||||||
data='{"file_name": "test_name.jpg", '
|
'/api/upload',
|
||||||
'"file": "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAf'
|
data='{"file_name": "test_name.jpg", '
|
||||||
'FcSJAAAADUlEQVR42mNk+M9QDwADhgGAWjR9awAAAABJRU5ErkJggg=="}',
|
'"file": "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAf'
|
||||||
headers={'content-type': 'application/json'}
|
'FcSJAAAADUlEQVR42mNk+M9QDwADhgGAWjR9awAAAABJRU5ErkJggg=="}',
|
||||||
)
|
headers={'content-type': 'application/json'}
|
||||||
|
)
|
||||||
self.assertEqual(request.headers['Content-Type'], 'application/json')
|
self.assertEqual(request.headers['Content-Type'], 'application/json')
|
||||||
self.assertEqual(request.headers['Access-Control-Allow-Origin'], 'origin1.gnu')
|
self.assertEqual(request.headers['Access-Control-Allow-Origin'], 'origin1.gnu')
|
||||||
self.assertEqual(request.status_code, 200)
|
self.assertEqual(request.status_code, 200)
|
||||||
|
|
||||||
data = request.get_json()
|
data = request.get_json()
|
||||||
expected = {
|
self.assertEqual(data['output_filename'], 'test_name.cleaned.jpg')
|
||||||
'output_filename': 'test_name.cleaned.jpg',
|
self.assertEqual(data['output_filename'], 'test_name.cleaned.jpg')
|
||||||
'mime': 'image/jpeg',
|
self.assertEqual(data['mime'], 'image/jpeg')
|
||||||
'key': '81a541f9ebc0233d419d25ed39908b16f82be26a783f32d56c381559e84e6161',
|
self.assertEqual(len(data['secret']), 64)
|
||||||
'meta': {
|
self.assertEqual(len(data['key']), 64)
|
||||||
'BitDepth': 8,
|
self.assertNotEqual(data['key'], data['secret'])
|
||||||
'ColorType': 'RGB with Alpha',
|
self.assertTrue('http://localhost/api/download/' in data['download_link'])
|
||||||
'Compression': 'Deflate/Inflate',
|
self.assertTrue('test_name.cleaned.jpg' in data['download_link'])
|
||||||
'Filter': 'Adaptive',
|
|
||||||
'Interlace': 'Noninterlaced'
|
|
||||||
},
|
|
||||||
'meta_after': {},
|
|
||||||
'download_link': 'http://localhost/api/download/'
|
|
||||||
'81a541f9ebc0233d419d25ed39908b16f82be26a783f32d56c381559e84e6161/test_name.cleaned.jpg'
|
|
||||||
}
|
|
||||||
self.assertEqual(data, expected)
|
|
||||||
|
|
||||||
def test_api_upload_missing_params(self):
|
def test_api_upload_missing_params(self):
|
||||||
request = self.app.post('/api/upload',
|
request = self.app.post('/api/upload',
|
||||||
@ -141,7 +134,6 @@ class Mat2APITestCase(unittest.TestCase):
|
|||||||
error = request.get_json()['message']
|
error = request.get_json()['message']
|
||||||
self.assertEqual(error, 'Unable to clean application/zip')
|
self.assertEqual(error, 'Unable to clean application/zip')
|
||||||
|
|
||||||
|
|
||||||
def test_api_download(self):
|
def test_api_download(self):
|
||||||
request = self.app.post('/api/upload',
|
request = self.app.post('/api/upload',
|
||||||
data='{"file_name": "test_name.jpg", '
|
data='{"file_name": "test_name.jpg", '
|
||||||
@ -152,25 +144,36 @@ class Mat2APITestCase(unittest.TestCase):
|
|||||||
self.assertEqual(request.status_code, 200)
|
self.assertEqual(request.status_code, 200)
|
||||||
data = request.get_json()
|
data = request.get_json()
|
||||||
|
|
||||||
request = self.app.get('http://localhost/api/download/'
|
request = self.app.get('http://localhost/api/download/161/'
|
||||||
'81a541f9ebc0233d419d25ed39908b16f82be26a783f32d56c381559e84e6161/test name.cleaned.jpg')
|
'81a541f9ebc0233d419d25ed39908b16f82be26a783f32d56c381559e84e6161/test name.cleaned.jpg')
|
||||||
self.assertEqual(request.status_code, 400)
|
self.assertEqual(request.status_code, 400)
|
||||||
error = request.get_json()['message']
|
error = request.get_json()['message']
|
||||||
self.assertEqual(error, 'Insecure filename')
|
self.assertEqual(error, 'Insecure filename')
|
||||||
|
|
||||||
request = self.app.get('http://localhost/api/download/'
|
request = self.app.get(data['download_link'].replace('test_name', 'wrong_test'))
|
||||||
'81a541f9ebc0233d419d25ed39908b16f82be26a783f32d56c381559e84e6161/'
|
|
||||||
'wrong_file_name.jpg')
|
|
||||||
self.assertEqual(request.status_code, 404)
|
self.assertEqual(request.status_code, 404)
|
||||||
error = request.get_json()['message']
|
error = request.get_json()['message']
|
||||||
self.assertEqual(error, 'File not found')
|
self.assertEqual(error, 'File not found')
|
||||||
|
|
||||||
request = self.app.get('http://localhost/api/download/81a541f9e/test_name.cleaned.jpg')
|
uri_parts = data['download_link'].split("/")
|
||||||
|
self.assertEqual(len(uri_parts[5]), len(uri_parts[6]))
|
||||||
|
self.assertEqual(64, len(uri_parts[5]))
|
||||||
|
|
||||||
|
key_uri_parts = uri_parts
|
||||||
|
key_uri_parts[5] = '70623619c'
|
||||||
|
request = self.app.get("/".join(key_uri_parts))
|
||||||
self.assertEqual(request.status_code, 400)
|
self.assertEqual(request.status_code, 400)
|
||||||
|
|
||||||
error = request.get_json()['message']
|
error = request.get_json()['message']
|
||||||
self.assertEqual(error, 'The file hash does not match')
|
self.assertEqual(error, 'The file hash does not match')
|
||||||
|
|
||||||
|
key_uri_parts = uri_parts
|
||||||
|
key_uri_parts[6] = '70623619c'
|
||||||
|
request = self.app.get("/".join(key_uri_parts))
|
||||||
|
self.assertEqual(request.status_code, 400)
|
||||||
|
error = request.get_json()['message']
|
||||||
|
self.assertEqual(error, 'The file hash does not match')
|
||||||
|
|
||||||
request = self.app.head(data['download_link'])
|
request = self.app.head(data['download_link'])
|
||||||
self.assertEqual(request.status_code, 200)
|
self.assertEqual(request.status_code, 200)
|
||||||
self.assertEqual(request.headers['Content-Length'], '633')
|
self.assertEqual(request.headers['Content-Length'], '633')
|
||||||
@ -205,11 +208,13 @@ class Mat2APITestCase(unittest.TestCase):
|
|||||||
u'download_list': [
|
u'download_list': [
|
||||||
{
|
{
|
||||||
u'file_name': upload_one['output_filename'],
|
u'file_name': upload_one['output_filename'],
|
||||||
u'key': upload_one['key']
|
u'key': upload_one['key'],
|
||||||
|
u'secret': upload_one['secret']
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
u'file_name': upload_two['output_filename'],
|
u'file_name': upload_two['output_filename'],
|
||||||
u'key': upload_two['key']
|
u'key': upload_two['key'],
|
||||||
|
u'secret': upload_two['secret']
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
@ -261,7 +266,8 @@ class Mat2APITestCase(unittest.TestCase):
|
|||||||
u'download_list': [
|
u'download_list': [
|
||||||
{
|
{
|
||||||
u'file_name': 'invalid_file_name',
|
u'file_name': 'invalid_file_name',
|
||||||
u'key': 'invalid_key'
|
u'key': 'invalid_key',
|
||||||
|
u'secret': 'invalid_secret'
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
@ -348,11 +354,13 @@ class Mat2APITestCase(unittest.TestCase):
|
|||||||
u'download_list': [
|
u'download_list': [
|
||||||
{
|
{
|
||||||
u'file_name': 'invalid_file_name1',
|
u'file_name': 'invalid_file_name1',
|
||||||
u'key': 'invalid_key1'
|
u'key': 'invalid_key1',
|
||||||
|
u'secret': 'invalid_secret1'
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
u'file_name': 'invalid_file_name2',
|
u'file_name': 'invalid_file_name2',
|
||||||
u'key': 'invalid_key2'
|
u'key': 'invalid_key2',
|
||||||
|
u'secret': 'invalid_secret2'
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user