mat2-web/test/test_api.py
2025-01-12 12:11:06 +00:00

553 lines
22 KiB
Python

import json
import os
import shutil
import tempfile
import unittest
import zipfile
import io
from unittest.mock import patch
from openapi_spec_validator import validate_spec
from six import BytesIO
import main
class Mat2APITestCase(unittest.TestCase):
def setUp(self):
os.environ.setdefault('MAT2_ALLOW_ORIGIN_WHITELIST', 'origin1.gnu origin2.gnu')
self.upload_folder = tempfile.mkdtemp()
app = main.create_app(
test_config={
'TESTING': True,
'UPLOAD_FOLDER': self.upload_folder
}
)
self.app = app.test_client()
def tearDown(self):
shutil.rmtree(self.upload_folder)
if os.environ.get('MAT2_ALLOW_ORIGIN_WHITELIST'):
del os.environ['MAT2_ALLOW_ORIGIN_WHITELIST']
def test_api_upload_valid(self):
request = self.app.post(
'/api/upload',
data='{"file_name": "test_name.jpg", '
'"file": "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAf'
'FcSJAAAADUlEQVR42mNk+M9QDwADhgGAWjR9awAAAABJRU5ErkJggg=="}',
headers={'content-type': 'application/json'}
)
self.assertEqual(request.headers['Content-Type'], 'application/json')
self.assertEqual(request.headers['Access-Control-Allow-Origin'], 'origin1.gnu')
self.assertEqual(request.status_code, 201)
data = request.get_json()
self.assertEqual(data['inactive_after_sec'], 15 * 60)
self.assertEqual(data['output_filename'], 'test_name.cleaned.jpg')
self.assertEqual(data['mime'], 'image/jpeg')
self.assertEqual(len(data['secret']), 64)
self.assertEqual(len(data['key']), 64)
self.assertNotEqual(data['key'], data['secret'])
self.assertTrue('http://localhost/api/download/' in data['download_link'])
self.assertTrue('test_name.cleaned.jpg' in data['download_link'])
def test_api_upload_missing_params(self):
request = self.app.post('/api/upload',
data='{"file_name": "test_name.jpg"}',
headers={'content-type': 'application/json'}
)
self.assertEqual(request.headers['Content-Type'], 'application/json')
self.assertEqual(request.status_code, 400)
error = request.get_json()['message']
self.assertEqual(error['file'], 'Post parameter is not specified: file')
request = self.app.post('/api/upload',
data='{"file_name": "test_name.jpg", "file": "invalid base46 string"}',
headers={'content-type': 'application/json'}
)
self.assertEqual(request.headers['Content-Type'], 'application/json')
self.assertEqual(request.status_code, 400)
error = request.get_json()['message']
self.assertEqual(error, 'Failed decoding file')
def test_api_not_supported(self):
request = self.app.post('/api/upload',
data='{"file_name": "test_name.pdf", '
'"file": "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAf'
'FcSJAAAADUlEQVR42mNk+M9QDwADhgGAWjR9awAAAABJRU5ErkJggg=="}',
headers={'content-type': 'application/json'}
)
self.assertEqual(request.headers['Content-Type'], 'application/json')
self.assertEqual(request.status_code, 415)
error = request.get_json()['message']
self.assertEqual(error, 'The filetype is not supported')
def test_api_not_supported_extension(self):
request = self.app.post('/api/upload',
data='{"file_name": "test_name.csv", '
'"file": "MSwyLDMKNCw1LDY="}',
headers={'content-type': 'application/json'}
)
self.assertEqual(request.headers['Content-Type'], 'application/json')
self.assertEqual(request.status_code, 415)
error = request.get_json()['message']
self.assertEqual(error, 'The filetype is not supported')
def test_api_supported_extensions(self):
rv = self.app.get('/api/extension')
self.assertEqual(rv.status_code, 200)
self.assertEqual(rv.headers['Content-Type'], 'application/json')
self.assertEqual(rv.headers['Access-Control-Allow-Origin'], 'origin1.gnu')
extensions = json.loads(rv.data.decode('utf-8'))
self.assertIn('.pot', extensions)
self.assertIn('.jpg', extensions)
self.assertIn('.png', extensions)
self.assertIn('.zip', extensions)
def test_api_cors_not_set(self):
del os.environ['MAT2_ALLOW_ORIGIN_WHITELIST']
app = main.create_app()
app.config.update(
TESTING=True
)
app = app.test_client()
rv = app.get('/api/extension')
self.assertEqual(rv.headers['Access-Control-Allow-Origin'], '*')
def test_api_cors(self):
rv = self.app.get('/api/extension')
self.assertEqual(rv.headers['Access-Control-Allow-Origin'], 'origin1.gnu')
rv = self.app.get('/api/extension', headers={'Origin': 'origin2.gnu'})
self.assertEqual(rv.headers['Access-Control-Allow-Origin'], 'origin2.gnu')
rv = self.app.get('/api/extension', headers={'Origin': 'origin1.gnu'})
self.assertEqual(rv.headers['Access-Control-Allow-Origin'], 'origin1.gnu')
def test_api_cleaning_failed(self):
request = self.app.post('/api/upload',
data='{"file_name": "test_name.zip", '
'"file": "UEsDBBQACAAIAPicPE8AAAAAAAAAAAAAAAAXACAAZmFpbGluZy5ub3Qt'
'd29ya2luZy1leHRVVA0AB+Saj13kmo9d5JqPXXV4CwABBOkDAAAE6QMAAAMAUEsHCAAA'
'AAACAAAAAAAAAFBLAwQUAAgACAD6nDxPAAAAAAAAAAAAAAAACQAgAHRlc3QuanNvblVUD'
'QAH6JqPXeiaj13omo9ddXgLAAEE6QMAAATpAwAAAwBQSwcIAAAAAAIAAAAAAAAAUEsBAhQD'
'FAAIAAgA+Jw8TwAAAAACAAAAAAAAABcAIAAAAAAAAAAAAKSBAAAAAGZhaWxpbmcubm90LXd'
'vcmtpbmctZXh0VVQNAAfkmo9d5JqPXeSaj111eAsAAQTpAwAABOkDAABQSwECFAMUAAgACAD6'
'nDxPAAAAAAIAAAAAAAAACQAgAAAAAAAAAAAApIFnAAAAdGVzdC5qc29uVVQNAAfomo9d6JqPXe'
'iaj111eAsAAQTpAwAABOkDAABQSwUGAAAAAAIAAgC8AAAAwAAAAAAA"}',
headers={'content-type': 'application/json'}
)
error = request.get_json()['message']
self.assertEqual(request.status_code, 400)
self.assertEqual(error, 'Unable to clean application/zip')
def test_api_download(self):
request = self.app.post('/api/upload',
data='{"file_name": "test_name.jpg", '
'"file": "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAf'
'FcSJAAAADUlEQVR42mNk+M9QDwADhgGAWjR9awAAAABJRU5ErkJggg=="}',
headers={'content-type': 'application/json'}
)
print(request.get_json())
self.assertEqual(request.status_code, 201)
data = request.get_json()
request = self.app.get('http://localhost/api/download/161/'
'81a541f9ebc0233d419d25ed39908b16f82be26a783f32d56c381559e84e6161/test name.cleaned.jpg')
self.assertEqual(request.status_code, 400)
error = request.get_json()['message']
self.assertEqual(error, 'Insecure filename')
request = self.app.get(data['download_link'].replace('test_name', 'wrong_test'))
self.assertEqual(request.status_code, 404)
error = request.get_json()['message']
self.assertEqual(error, 'File not found')
uri_parts = data['download_link'].split("/")
self.assertEqual(len(uri_parts[5]), len(uri_parts[6]))
self.assertEqual(64, len(uri_parts[5]))
key_uri_parts = uri_parts
key_uri_parts[5] = '70623619c'
request = self.app.get("/".join(key_uri_parts))
self.assertEqual(request.status_code, 400)
error = request.get_json()['message']
self.assertEqual(error, 'The file hash does not match')
key_uri_parts = uri_parts
key_uri_parts[6] = '70623619c'
request = self.app.get("/".join(key_uri_parts))
self.assertEqual(request.status_code, 400)
error = request.get_json()['message']
self.assertEqual(error, 'The file hash does not match')
request = self.app.head(data['download_link'])
self.assertEqual(request.status_code, 200)
self.assertEqual(request.headers['Content-Length'], '633')
request = self.app.get(data['download_link'])
self.assertEqual(request.status_code, 200)
self.assertIn('attachment; filename=test_name.cleaned.jpg', request.headers['Content-Disposition'])
request = self.app.get(data['download_link'])
self.assertEqual(request.status_code, 404)
def test_api_bulk_download(self):
request = self.app.post('/api/upload',
data='{"file_name": "test_name.jpg", '
'"file": "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAf'
'FcSJAAAADUlEQVR42mNk+M9QDwADhgGAWjR9awAAAABJRU5ErkJggg=="}',
headers={'content-type': 'application/json'}
)
upload_one = request.get_json()
self.assertEqual(request.status_code, 201)
request = self.app.post('/api/upload',
data='{"file_name": "test_name_two.jpg", '
'"file": "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR42'
'mO0vqpQDwAENAGxOnU0jQAAAABJRU5ErkJggg=="}',
headers={'content-type': 'application/json'}
)
self.assertEqual(request.status_code, 201)
upload_two = request.get_json()
post_body = {
u'download_list': [
{
u'file_name': upload_one['output_filename'],
u'key': upload_one['key'],
u'secret': upload_one['secret']
},
{
u'file_name': upload_two['output_filename'],
u'key': upload_two['key'],
u'secret': upload_two['secret']
}
]
}
request = self.app.post('/api/download/bulk',
data=json.dumps(post_body),
headers={'content-type': 'application/json'}
)
response = request.get_json()
self.assertEqual(request.status_code, 201)
self.assertIn(
"http://localhost/api/download/",
response['download_link']
)
self.assertIn(
".cleaned.zip",
response['download_link']
)
self.assertIn('files.', response['output_filename'])
self.assertIn('cleaned.zip', response['output_filename'])
self.assertEqual(15 * 60, response['inactive_after_sec'])
self.assertIn(response['mime'], 'application/zip')
self.assertEqual(response['meta_after'], {})
request = self.app.head(response['download_link'])
self.assertEqual(request.status_code, 200)
self.assertTrue(request.headers['Content-Length'])
request = self.app.get(response['download_link'])
self.assertIn('attachment; filename=files.', request.headers['Content-Disposition'])
zip_response = zipfile.ZipFile(BytesIO(request.data))
self.assertEqual(2, len(zip_response.namelist()))
for name in zip_response.namelist():
self.assertIn('.cleaned.jpg', name)
self.assertEqual(request.status_code, 200)
request = self.app.get(response['download_link'])
self.assertEqual(request.status_code, 404)
request = self.app.get(upload_one['download_link'])
self.assertEqual(request.status_code, 404)
request = self.app.get(upload_two['download_link'])
self.assertEqual(request.status_code, 404)
def test_api_bulk_empty_body(self):
request = self.app.post(
'/api/download/bulk',
)
self.assertEqual(415, request.status_code)
error_message = request.get_json()['message']
self.assertEqual("Did not attempt to load JSON data because the request Content-Type was not 'application/json'.", error_message)
def test_api_bulk_download_validation(self):
post_body = {
u'download_list': [
{
u'file_name': 'invalid_file_name',
u'key': 'invalid_key',
u'secret': 'invalid_secret'
}
]
}
request = self.app.post('/api/download/bulk',
data=json.dumps(post_body),
headers={'content-type': 'application/json'}
)
response = request.get_json()
self.assertEqual(response['message']['download_list'][0], 'min length is 2')
self.assertEqual(request.status_code, 400)
post_body = {
u'download_list': [{}, {}]
}
request = self.app.post('/api/download/bulk',
data=json.dumps(post_body),
headers={'content-type': 'application/json'}
)
response = request.get_json()
self.assertEqual(response['message']['download_list'][0]['0'][0]['file_name'][0], 'required field')
self.assertEqual(response['message']['download_list'][0]['0'][0]['key'][0], 'required field')
self.assertEqual(request.status_code, 400)
post_body = {
u'download_list': [
{
u'file_name': 'test.jpg',
u'key': 'key'
},
{
u'file_name': 'test.jpg',
u'key': 'key'
},
{
u'file_name': 'test.jpg',
u'key': 'key'
},
{
u'file_name': 'test.jpg',
u'key': 'key'
},
{
u'file_name': 'test.jpg',
u'key': 'key'
},
{
u'file_name': 'test.jpg',
u'key': 'key'
},
{
u'file_name': 'test.jpg',
u'key': 'key'
},
{
u'file_name': 'test.jpg',
u'key': 'key'
},
{
u'file_name': 'test.jpg',
u'key': 'key'
},
{
u'file_name': 'test.jpg',
u'key': 'key'
},
{
u'file_name': 'test.jpg',
u'key': 'key'
}
]
}
request = self.app.post('/api/download/bulk',
data=json.dumps(post_body),
headers={'content-type': 'application/json'}
)
response = request.get_json()
self.assertEqual(response['message']['download_list'][0], 'max length is 10')
self.assertEqual(request.status_code, 400)
post_body = {
u'download_list': [
{
u'file_name': 'invalid_file_name1',
u'key': 'invalid_key1',
u'secret': 'invalid_secret1'
},
{
u'file_name': 'invalid_file_name2',
u'key': 'invalid_key2',
u'secret': 'invalid_secret2'
}
]
}
request = self.app.post('/api/download/bulk',
data=json.dumps(post_body),
headers={'content-type': 'application/json'}
)
response = request.get_json()
self.assertEqual('File not found', response['message'])
@patch('matweb.file_removal_scheduler.random.randint')
def test_api_upload_leftover(self, randint_mock):
os.environ['MAT2_MAX_FILE_AGE_FOR_REMOVAL'] = '0'
self.upload_folder = tempfile.mkdtemp()
app = main.create_app(
test_config={
'TESTING': True,
'UPLOAD_FOLDER': self.upload_folder
}
)
app = app.test_client()
randint_mock.return_value = 1
self.upload_download_test_jpg_and_assert_response_code(app, 200)
randint_mock.return_value = 0
self.upload_download_test_jpg_and_assert_response_code(app, 404)
os.environ['MAT2_MAX_FILE_AGE_FOR_REMOVAL'] = str(15 * 60)
def upload_download_test_jpg_and_assert_response_code(self, app, code):
request = app.post('/api/upload',
data='{"file_name": "test_name.jpg", '
'"file": "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAf'
'FcSJAAAADUlEQVR42mNk+M9QDwADhgGAWjR9awAAAABJRU5ErkJggg=="}',
headers={'content-type': 'application/json'}
)
download_link = request.get_json()['download_link']
request = app.get(download_link)
self.assertEqual(code, request.status_code)
def test_download_naughty_input(self):
request = self.app.get(
'/api/download/%F2%8C%BF%BD%F1%AE%98%A3%E4%B7%B8%F2%9B%94%BE%F2%A7%8B%83%F1%B1%80%9F%F3%AA%89%A6/1p/str'
)
error_message = request.get_json()['message']
self.assertEqual(404, request.status_code)
self.assertEqual("File not found", error_message)
def test_download_bulk_naughty_input(self):
request = self.app.post(
'/api/download/bulk',
data='\"\'\'\'&type %SYSTEMROOT%\\\\win.ini\"',
headers={'content-type': 'application/json'}
)
error_message = request.get_json()['message']
self.assertEqual(400, request.status_code)
self.assertEqual("Invalid Post Body", error_message)
def test_upload_naughty_input(self):
request = self.app.post('/api/upload',
data='{"file_name": "\\\\", '
'"file": "\\\\"}',
headers={'content-type': 'application/json'}
)
error_message = request.get_json()['message']
self.assertEqual(415, request.status_code)
self.assertEqual("The filetype is not supported", error_message)
request = self.app.post('/api/upload',
data='{"file_name": "", '
'"file": ""}',
headers={'content-type': 'application/json'}
)
error_message = request.get_json()['message']
self.assertEqual(400, request.status_code)
self.assertEqual("Failed decoding file", error_message)
request = self.app.post('/api/upload',
data="\"\'\'\'&&cat$z $z/etc$z/passwdu0000\"",
headers={'content-type': 'application/json'}
)
error_message = request.get_json()['message']
self.assertEqual(400, request.status_code)
self.assertEqual("Failed parsing body", error_message)
def test_valid_opena_api_spec(self):
spec = self.app.get('apispec_1.json').get_json()
# Test workaround due to https://github.com/flasgger/flasgger/issues/374
validate_spec(spec)
def test_remove_metadata(self):
r = self.app.post(
'/api/remove_metadata',
data=dict(
file=(io.BytesIO(b""), 'test.txt'),
),
follow_redirects=False
)
self.assertEqual(r.status_code, 200)
self.assertEqual(r.headers['Content-Disposition'], 'attachment; filename=test.cleaned.txt')
self.assertEqual(r.headers['Content-Type'], 'text/plain; charset=utf-8')
self.assertEqual(r.data, b'')
def test_remove_metadata_not_supported_extension(self):
r = self.app.post(
'/api/remove_metadata',
data=dict(
file=(io.BytesIO(b"1,2,3 \n 4,5,6"), 'test.csv'),
),
follow_redirects=False
)
self.assertEqual(
r.get_json()['message'],
'The filetype is not supported'
)
self.assertEqual(r.status_code, 415)
def test_remove_metdata_validation(self):
r = self.app.post(
'/api/remove_metadata',
data=dict(
fileNotExisting=(io.BytesIO(b""), 'test.random'),
),
follow_redirects=False
)
self.assertEqual(
r.get_json()['message'],
'File part missing: Multipart filename and non-chunked-transfer-encoding required'
)
self.assertEqual(r.status_code, 400)
r = self.app.post(
'/api/remove_metadata',
data=dict(
file=(io.BytesIO(b""), ''),
),
follow_redirects=False
)
self.assertEqual(r.get_json()['message'], 'No selected `file`')
self.assertEqual(r.status_code, 400)
r = self.app.post(
'/api/remove_metadata',
data=dict(
file=(io.BytesIO(b""), '../../'),
),
follow_redirects=False
)
self.assertEqual(r.get_json()['message'], 'The filetype is not supported')
self.assertEqual(r.status_code, 415)
r = self.app.post(
'/api/remove_metadata',
data=dict(
file=(io.BytesIO(b""), 'test.random'),
),
follow_redirects=False
)
self.assertEqual(r.get_json()['message'], 'The filetype is not supported')
self.assertEqual(r.status_code, 415)
if __name__ == '__main__':
unittest.main()