From c301e472bd7fd79d675c5df089db0b16fd1e2cfe Mon Sep 17 00:00:00 2001 From: jfriedli Date: Sun, 26 Apr 2020 09:50:14 -0700 Subject: [PATCH] Resolve "Use a HMAC instead of a hash" --- main.py | 2 +- matweb/frontend.py | 14 ++++--- matweb/rest_api.py | 19 ++++++--- matweb/utils.py | 35 +++++++++++----- requirements.txt | 2 + templates/download.html | 2 +- templates/index.html | 2 +- test/test.py | 89 +++++++++++++++++++++++++---------------- test/test_api.py | 74 +++++++++++++++++++--------------- 9 files changed, 148 insertions(+), 91 deletions(-) diff --git a/main.py b/main.py index c21c1a1..db186d1 100644 --- a/main.py +++ b/main.py @@ -36,7 +36,7 @@ def create_app(test_config=None): ) api.add_resource( rest_api.APIDownload, - '/api/download//', + '/api/download///', resource_class_kwargs={'upload_folder': app.config['UPLOAD_FOLDER']} ) api.add_resource( diff --git a/matweb/frontend.py b/matweb/frontend.py index 93432b4..2e25467 100644 --- a/matweb/frontend.py +++ b/matweb/frontend.py @@ -18,8 +18,8 @@ def info(): ) -@routes.route('/download//') -def download_file(key: str, filename: str): +@routes.route('/download///') +def download_file(key: str, secret: str, filename: str): if filename != secure_filename(filename): return redirect(url_for('routes.upload_file')) @@ -28,7 +28,7 @@ def download_file(key: str, filename: str): if not os.path.exists(complete_path): return redirect(url_for('routes.upload_file')) - if hmac.compare_digest(utils.hash_file(complete_path), key) is False: + if hmac.compare_digest(utils.hash_file(complete_path, secret), key) is False: return redirect(url_for('routes.upload_file')) @after_this_request @@ -67,10 +67,14 @@ def upload_file(): flash('Unable to clean %s' % mime) return redirect(url_for('routes.upload_file')) - key, meta_after, output_filename = utils.cleanup(parser, filepath, current_app.config['UPLOAD_FOLDER']) + key, secret, meta_after, output_filename = utils.cleanup(parser, filepath, current_app.config['UPLOAD_FOLDER']) return render_template( - 'download.html', mimetypes=mime_types, meta=meta, filename=output_filename, meta_after=meta_after, key=key + 'download.html', + mimetypes=mime_types, + meta=meta, + download_uri=url_for('routes.download_file', key=key, secret=secret, filename=output_filename), + meta_after=meta_after, ) max_file_size = int(current_app.config['MAX_CONTENT_LENGTH'] / 1024 / 1024) diff --git a/matweb/rest_api.py b/matweb/rest_api.py index 60d834f..4098050 100644 --- a/matweb/rest_api.py +++ b/matweb/rest_api.py @@ -42,14 +42,15 @@ class APIUpload(Resource): if not parser.remove_all(): abort(500, message='Unable to clean %s' % mime) - key, meta_after, output_filename = utils.cleanup(parser, filepath, self.upload_folder) + key, secret, meta_after, output_filename = utils.cleanup(parser, filepath, self.upload_folder) return utils.return_file_created_response( output_filename, mime, key, + secret, meta, meta_after, - urljoin(request.host_url, '%s/%s/%s/%s' % ('api', 'download', key, output_filename)) + urljoin(request.host_url, '%s/%s/%s/%s/%s' % ('api', 'download', key, secret, output_filename)) ) @@ -58,8 +59,8 @@ class APIDownload(Resource): def __init__(self, **kwargs): self.upload_folder = kwargs['upload_folder'] - def get(self, key: str, filename: str): - complete_path, filepath = utils.is_valid_api_download_file(filename, key, self.upload_folder) + def get(self, key: str, secret: str, filename: str): + complete_path, filepath = utils.is_valid_api_download_file(filename, key, secret, self.upload_folder) # Make sure the file is NOT deleted on HEAD requests if request.method == 'GET': file_removal_scheduler.run_file_removal_job(self.upload_folder) @@ -87,6 +88,7 @@ class APIBulkDownloadCreator(Resource): 'type': 'dict', 'schema': { 'key': {'type': 'string', 'required': True}, + 'secret': {'type': 'string', 'required': True}, 'file_name': {'type': 'string', 'required': True} } } @@ -108,6 +110,7 @@ class APIBulkDownloadCreator(Resource): complete_path, file_path = utils.is_valid_api_download_file( file_candidate['file_name'], file_candidate['key'], + file_candidate['secret'], self.upload_folder ) try: @@ -124,13 +127,17 @@ class APIBulkDownloadCreator(Resource): parser, mime = utils.get_file_parser(zip_path) if not parser.remove_all(): abort(500, message='Unable to clean %s' % mime) - key, meta_after, output_filename = utils.cleanup(parser, zip_path, self.upload_folder) + key, secret, meta_after, output_filename = utils.cleanup(parser, zip_path, self.upload_folder) return { 'output_filename': output_filename, 'mime': mime, 'key': key, + 'secret': secret, 'meta_after': meta_after, - 'download_link': urljoin(request.host_url, '%s/%s/%s/%s' % ('api', 'download', key, output_filename)) + 'download_link': urljoin( + request.host_url, + '%s/%s/%s/%s/%s' % ('api', 'download', key, secret, output_filename) + ) }, 201 diff --git a/matweb/utils.py b/matweb/utils.py index 8dfff45..ec9b99c 100644 --- a/matweb/utils.py +++ b/matweb/utils.py @@ -12,15 +12,21 @@ def get_allow_origin_header_value(): return os.environ.get('MAT2_ALLOW_ORIGIN_WHITELIST', '*').split(" ") -def hash_file(filepath: str) -> str: - sha256 = hashlib.sha256() +def hash_file(filepath: str, secret: str) -> str: + """ + The goal of the hmac is to ONLY make the hashes unpredictable + :param filepath: Path of the file + :param secret: a server side generated secret + :return: digest, secret + """ + mac = hmac.new(secret.encode(), None, hashlib.sha256) with open(filepath, 'rb') as f: while True: data = f.read(65536) # read the file by chunk of 64k if not data: break - sha256.update(data) - return sha256.hexdigest() + mac.update(data) + return mac.hexdigest() def check_upload_folder(upload_folder): @@ -28,11 +34,20 @@ def check_upload_folder(upload_folder): os.mkdir(upload_folder) -def return_file_created_response(output_filename, mime, key, meta, meta_after, download_link): +def return_file_created_response( + output_filename: str, + mime: str, + key: str, + secret: str, + meta: list, + meta_after: list, + download_link: str +) -> dict: return { 'output_filename': output_filename, 'mime': mime, 'key': key, + 'secret': secret, 'meta': meta, 'meta_after': meta_after, 'download_link': download_link @@ -65,9 +80,9 @@ def cleanup(parser, filepath, upload_folder): parser, _ = parser_factory.get_parser(parser.output_filename) meta_after = parser.get_meta() os.remove(filepath) - - key = hash_file(os.path.join(upload_folder, output_filename)) - return key, meta_after, output_filename + secret = os.urandom(32).hex() + key = hash_file(os.path.join(upload_folder, output_filename), secret) + return key, secret, meta_after, output_filename def get_file_paths(filename, upload_folder): @@ -77,7 +92,7 @@ def get_file_paths(filename, upload_folder): return complete_path, filepath -def is_valid_api_download_file(filename, key, upload_folder): +def is_valid_api_download_file(filename: str, key: str, secret: str, upload_folder: str) -> [str, str]: if filename != secure_filename(filename): abort(400, message='Insecure filename') @@ -86,6 +101,6 @@ def is_valid_api_download_file(filename, key, upload_folder): if not os.path.exists(complete_path): abort(404, message='File not found') - if hmac.compare_digest(hash_file(complete_path), key) is False: + if hmac.compare_digest(hash_file(complete_path, secret), key) is False: abort(400, message='The file hash does not match') return complete_path, filepath diff --git a/requirements.txt b/requirements.txt index 61f9711..4afa377 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,3 +6,5 @@ flask==1.0.3 Flask-RESTful==0.3.7 Flask-Cors==3.0.8 Cerberus==1.3.1 +Flask-Testing==0.8.0 +blinker==1.4 \ No newline at end of file diff --git a/templates/download.html b/templates/download.html index 736c9f5..8100623 100644 --- a/templates/download.html +++ b/templates/download.html @@ -10,7 +10,7 @@ {% endif %}
- + diff --git a/templates/index.html b/templates/index.html index ed583d2..f92ef69 100644 --- a/templates/index.html +++ b/templates/index.html @@ -3,7 +3,7 @@

Remove metadata

- The file you see is just the tip of the iceberg. Remove the hidden meta + The file you see is just the tip of the iceberg. Remove the hidden metadata.

diff --git a/test/test.py b/test/test.py index 02216ac..2d09662 100644 --- a/test/test.py +++ b/test/test.py @@ -6,12 +6,13 @@ import io import os from unittest.mock import patch +from flask_testing import TestCase import main -class Mat2WebTestCase(unittest.TestCase): - def setUp(self): +class Mat2WebTestCase(TestCase): + def create_app(self): os.environ.setdefault('MAT2_ALLOW_ORIGIN_WHITELIST', 'origin1.gnu origin2.gnu') self.upload_folder = tempfile.mkdtemp() app = main.create_app( @@ -20,45 +21,45 @@ class Mat2WebTestCase(unittest.TestCase): 'UPLOAD_FOLDER': self.upload_folder } ) - self.app = app.test_client() + return app def tearDown(self): shutil.rmtree(self.upload_folder) def test_get_root(self): - rv = self.app.get('/') + rv = self.client.get('/') self.assertIn(b'mat2-web', rv.data) def test_check_mimetypes(self): - rv = self.app.get('/') + rv = self.client.get('/') self.assertIn(b'.torrent', rv.data) self.assertIn(b'.ods', rv.data) def test_get_download_dangerous_file(self): - rv = self.app.get('/download/1337/\..\filename') + rv = self.client.get('/download/1337/aabb/\..\filename') self.assertEqual(rv.status_code, 302) def test_get_download_without_key_file(self): - rv = self.app.get('/download/non_existant') + rv = self.client.get('/download/non_existant') self.assertEqual(rv.status_code, 404) def test_get_download_nonexistant_file(self): - rv = self.app.get('/download/1337/non_existant') + rv = self.client.get('/download/1337/aabb/non_existant') self.assertEqual(rv.status_code, 302) def test_get_upload_without_file(self): - rv = self.app.post('/') + rv = self.client.post('/') self.assertEqual(rv.status_code, 302) def test_get_upload_empty_file(self): - rv = self.app.post('/', + rv = self.client.post('/', data=dict( file=(io.BytesIO(b""), 'test.pdf'), ), follow_redirects=False) self.assertEqual(rv.status_code, 302) def test_get_upload_empty_file_redir(self): - rv = self.app.post('/', + rv = self.client.post('/', data=dict( file=(io.BytesIO(b""), 'test.pdf'), ), follow_redirects=True) @@ -67,7 +68,7 @@ class Mat2WebTestCase(unittest.TestCase): self.assertEqual(rv.status_code, 200) def test_get_upload_no_selected_file(self): - rv = self.app.post('/', + rv = self.client.post('/', data=dict( file=(io.BytesIO(b""), ''), ), follow_redirects=True) @@ -86,7 +87,7 @@ class Mat2WebTestCase(unittest.TestCase): 'AAAAAAAAAAApIFnAAAAdGVzdC5qc29uVVQNAAfomo9d6JqPXeiaj111eAsAAQTpAwAABOkDAAB' 'QSwUGAAAAAAIAAgC8AAAAwAAAAAAA' ) - rv = self.app.post('/', + rv = self.client.post('/', data=dict( file=(io.BytesIO(zip_file_bytes), 'test.zip'), ), follow_redirects=True) @@ -94,7 +95,7 @@ class Mat2WebTestCase(unittest.TestCase): self.assertEqual(rv.status_code, 200) def test_get_upload_no_file_name(self): - rv = self.app.post('/', + rv = self.client.post('/', data=dict( file=(io.BytesIO(b"aaa")), ), follow_redirects=True) @@ -102,30 +103,51 @@ class Mat2WebTestCase(unittest.TestCase): self.assertEqual(rv.status_code, 200) def test_get_upload_harmless_file(self): - rv = self.app.post('/', - data=dict( - file=(io.BytesIO(b"Some text"), 'test.txt'), - ), follow_redirects=True) - self.assertIn(b'/download/4c2e9e6da31a64c70623619c449a040968cdbea85945bf384fa30ed2d5d24fa3/test.cleaned.txt', rv.data) + rv = self.client.post( + '/', + data=dict( + file=(io.BytesIO(b"Some text"), 'test.txt'), + ), + follow_redirects=True + ) + download_uri = self.get_context_variable('download_uri') + self.assertIn('/test.cleaned.txt', download_uri) self.assertEqual(rv.status_code, 200) self.assertNotIn('Access-Control-Allow-Origin', rv.headers) - rv = self.app.get('/download/4c2e9e6da31a64c70623619c449a040968cdbea85945bf384fa30ed2d5d24fa3/test.cleaned.txt') + rv = self.client.get(download_uri) self.assertEqual(rv.status_code, 200) - rv = self.app.get('/download/4c2e9e6da31a64c70623619c449a040968cdbea85945bf384fa30ed2d5d24fa3/test.cleaned.txt') + rv = self.client.get(download_uri) self.assertEqual(rv.status_code, 302) - def test_upload_wrong_hash(self): - rv = self.app.post('/', - data=dict( - file=(io.BytesIO(b"Some text"), 'test.txt'), - ), follow_redirects=True) - self.assertIn(b'/download/4c2e9e6da31a64c70623619c449a040968cdbea85945bf384fa30ed2d5d24fa3/test.cleaned.txt', - rv.data) + def test_upload_wrong_hash_or_secret(self): + rv = self.client.post( + '/', + data=dict( + file=(io.BytesIO(b"Some text"), 'test.txt'), + ), + follow_redirects=True + ) + + download_uri = self.get_context_variable('download_uri') + + self.assertIn('/test.cleaned.txt', download_uri) + self.assertIn('/download', download_uri) self.assertEqual(rv.status_code, 200) - rv = self.app.get('/download/70623619c449a040968cdbea85945bf384fa30ed2d5d24fa3/test.cleaned.txt') + uri_parts = download_uri.split("/") + self.assertEqual(len(uri_parts[2]), len(uri_parts[3])) + self.assertEqual(64, len(uri_parts[2])) + + key_uri_parts = uri_parts + key_uri_parts[2] = '70623619c' + rv = self.client.get("/".join(key_uri_parts)) + self.assertEqual(rv.status_code, 302) + + key_uri_parts = uri_parts + key_uri_parts[3] = '70623619c' + rv = self.client.get("/".join(key_uri_parts)) self.assertEqual(rv.status_code, 302) @patch('matweb.file_removal_scheduler.random.randint') @@ -140,19 +162,18 @@ class Mat2WebTestCase(unittest.TestCase): ) app = app.test_client() - request = self.app.post('/', + request = self.client.post('/', data=dict( file=(io.BytesIO(b"Some text"), 'test.txt'), ), follow_redirects=True) self.assertEqual(request.status_code, 200) - request = app.get( - b'/download/4c2e9e6da31a64c70623619c449a040968cdbea85945bf384fa30ed2d5d24fa3/test.cleaned.txt' - ) + + request = app.get(self.get_context_variable('download_uri')) self.assertEqual(302, request.status_code) os.environ['MAT2_MAX_FILE_AGE_FOR_REMOVAL'] = '9999' def test_info_page(self): - rv = self.app.get('/info') + rv = self.client.get('/info') self.assertIn(b'What are metadata?', rv.data) self.assertIn(b'.asc', rv.data) self.assertIn(b'.mp2', rv.data) diff --git a/test/test_api.py b/test/test_api.py index 36aae9d..4925d9e 100644 --- a/test/test_api.py +++ b/test/test_api.py @@ -30,33 +30,26 @@ class Mat2APITestCase(unittest.TestCase): del os.environ['MAT2_ALLOW_ORIGIN_WHITELIST'] def test_api_upload_valid(self): - request = self.app.post('/api/upload', - data='{"file_name": "test_name.jpg", ' - '"file": "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAf' - 'FcSJAAAADUlEQVR42mNk+M9QDwADhgGAWjR9awAAAABJRU5ErkJggg=="}', - headers={'content-type': 'application/json'} - ) + request = self.app.post( + '/api/upload', + data='{"file_name": "test_name.jpg", ' + '"file": "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAf' + 'FcSJAAAADUlEQVR42mNk+M9QDwADhgGAWjR9awAAAABJRU5ErkJggg=="}', + headers={'content-type': 'application/json'} + ) self.assertEqual(request.headers['Content-Type'], 'application/json') self.assertEqual(request.headers['Access-Control-Allow-Origin'], 'origin1.gnu') self.assertEqual(request.status_code, 200) data = request.get_json() - expected = { - 'output_filename': 'test_name.cleaned.jpg', - 'mime': 'image/jpeg', - 'key': '81a541f9ebc0233d419d25ed39908b16f82be26a783f32d56c381559e84e6161', - 'meta': { - 'BitDepth': 8, - 'ColorType': 'RGB with Alpha', - 'Compression': 'Deflate/Inflate', - 'Filter': 'Adaptive', - 'Interlace': 'Noninterlaced' - }, - 'meta_after': {}, - 'download_link': 'http://localhost/api/download/' - '81a541f9ebc0233d419d25ed39908b16f82be26a783f32d56c381559e84e6161/test_name.cleaned.jpg' - } - self.assertEqual(data, expected) + self.assertEqual(data['output_filename'], 'test_name.cleaned.jpg') + self.assertEqual(data['output_filename'], 'test_name.cleaned.jpg') + self.assertEqual(data['mime'], 'image/jpeg') + self.assertEqual(len(data['secret']), 64) + self.assertEqual(len(data['key']), 64) + self.assertNotEqual(data['key'], data['secret']) + self.assertTrue('http://localhost/api/download/' in data['download_link']) + self.assertTrue('test_name.cleaned.jpg' in data['download_link']) def test_api_upload_missing_params(self): request = self.app.post('/api/upload', @@ -141,7 +134,6 @@ class Mat2APITestCase(unittest.TestCase): error = request.get_json()['message'] self.assertEqual(error, 'Unable to clean application/zip') - def test_api_download(self): request = self.app.post('/api/upload', data='{"file_name": "test_name.jpg", ' @@ -152,25 +144,36 @@ class Mat2APITestCase(unittest.TestCase): self.assertEqual(request.status_code, 200) data = request.get_json() - request = self.app.get('http://localhost/api/download/' + request = self.app.get('http://localhost/api/download/161/' '81a541f9ebc0233d419d25ed39908b16f82be26a783f32d56c381559e84e6161/test name.cleaned.jpg') self.assertEqual(request.status_code, 400) error = request.get_json()['message'] self.assertEqual(error, 'Insecure filename') - request = self.app.get('http://localhost/api/download/' - '81a541f9ebc0233d419d25ed39908b16f82be26a783f32d56c381559e84e6161/' - 'wrong_file_name.jpg') + request = self.app.get(data['download_link'].replace('test_name', 'wrong_test')) self.assertEqual(request.status_code, 404) error = request.get_json()['message'] self.assertEqual(error, 'File not found') - request = self.app.get('http://localhost/api/download/81a541f9e/test_name.cleaned.jpg') + uri_parts = data['download_link'].split("/") + self.assertEqual(len(uri_parts[5]), len(uri_parts[6])) + self.assertEqual(64, len(uri_parts[5])) + + key_uri_parts = uri_parts + key_uri_parts[5] = '70623619c' + request = self.app.get("/".join(key_uri_parts)) self.assertEqual(request.status_code, 400) error = request.get_json()['message'] self.assertEqual(error, 'The file hash does not match') + key_uri_parts = uri_parts + key_uri_parts[6] = '70623619c' + request = self.app.get("/".join(key_uri_parts)) + self.assertEqual(request.status_code, 400) + error = request.get_json()['message'] + self.assertEqual(error, 'The file hash does not match') + request = self.app.head(data['download_link']) self.assertEqual(request.status_code, 200) self.assertEqual(request.headers['Content-Length'], '633') @@ -205,11 +208,13 @@ class Mat2APITestCase(unittest.TestCase): u'download_list': [ { u'file_name': upload_one['output_filename'], - u'key': upload_one['key'] + u'key': upload_one['key'], + u'secret': upload_one['secret'] }, { u'file_name': upload_two['output_filename'], - u'key': upload_two['key'] + u'key': upload_two['key'], + u'secret': upload_two['secret'] } ] } @@ -261,7 +266,8 @@ class Mat2APITestCase(unittest.TestCase): u'download_list': [ { u'file_name': 'invalid_file_name', - u'key': 'invalid_key' + u'key': 'invalid_key', + u'secret': 'invalid_secret' } ] } @@ -348,11 +354,13 @@ class Mat2APITestCase(unittest.TestCase): u'download_list': [ { u'file_name': 'invalid_file_name1', - u'key': 'invalid_key1' + u'key': 'invalid_key1', + u'secret': 'invalid_secret1' }, { u'file_name': 'invalid_file_name2', - u'key': 'invalid_key2' + u'key': 'invalid_key2', + u'secret': 'invalid_secret2' } ] }