added a docker dev environment

Signed-off-by: Jan Friedli <jan.friedli@immerda.ch>
This commit is contained in:
JF 2019-07-09 14:56:21 -07:00 committed by jvoisin
parent 9d155d171e
commit 06346e1946
8 changed files with 431 additions and 86 deletions

View file

@ -23,6 +23,8 @@ tests:debian:
stage: test stage: test
script: script:
- apt-get -qqy update - apt-get -qqy update
- apt-get -qqy install --no-install-recommends mat2 python3-flask python3-coverage - apt-get -qqy install --no-install-recommends mat2 python3-flask python3-coverage python3-pip python3-setuptools
- python3-coverage run --branch --include main.py -m unittest discover - pip3 install wheel
- pip3 install -r requirements.txt
- python3-coverage run --branch --include main.py -m unittest discover -s test
- python3-coverage report -m - python3-coverage report -m

View file

@ -52,6 +52,11 @@ Nginx is the recommended web engine, but you can also use Apache if you prefer,
by copying [this file](https://0xacab.org/jvoisin/mat2-web/tree/master/config/apache2.config) by copying [this file](https://0xacab.org/jvoisin/mat2-web/tree/master/config/apache2.config)
to your `/etc/apache2/sites-enabled/mat2-web` file. to your `/etc/apache2/sites-enabled/mat2-web` file.
Then configure the environment variable: `MAT2_ALLOW_ORIGIN_WHITELIST=https://myhost1.org https://myhost2.org`
Note that you can add multiple hosts from which you want to accept API requests. These need to be separated by
a space.
**IMPORTANT:** If the variable is not set, the default is `Access-Control-Allow-Origin: *`, which accepts API requests from any origin.
Finally, restart uWSGI and your web server: Finally, restart uWSGI and your web server:
``` ```
@ -85,6 +90,63 @@ the docker dev environment. Mat2-web is now accessible on your host machine at `
Every code change triggers a restart of the app. Every code change triggers a restart of the app.
If you want to add/remove dependencies you have to rebuild the container. If you want to add/remove dependencies you have to rebuild the container.
# RESTful API
## Upload Endpoint
**Endpoint:** `/api/upload`
**HTTP Verbs:** POST
**Body:**
```json
{
"file_name": "my-filename.jpg",
"file": "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR42mNk+M9QDwADhgGAWjR9awAAAABJRU5ErkJggg=="
}
```
The `file_name` parameter takes the file name.
The `file` parameter is the base64 encoded file which will be cleaned.
**Example Response:**
```json
{
"output_filename": "my-filename.cleaned.jpg",
"key": "81a541f9ebc0233d419d25ed39908b16f82be26a783f32d56c381559e84e6161",
"meta": {
"BitDepth": 8,
"ColorType": "RGB with Alpha",
"Compression": "Deflate/Inflate",
"Filter": "Adaptive",
"Interlace": "Noninterlaced"
},
"meta_after": {},
"download_link": "http://localhost:5000/api/download/81a541f9ebc0233d419d25ed39908b16f82be26a783f32d56c381559e84e6161/my-filename.cleaned.jpg"
}
```
## Supported Extensions Endpoint
**Endpoint:** `/api/extension`
**HTTP Verbs:** GET
**Example Response (shortened):**
```json
[
".asc",
".avi",
".bat",
".bmp",
".brf",
".c",
".css",
".docx",
".epub"
]
```
# Custom templates # Custom templates
You can override the default templates from `templates/` by putting replacements You can override the default templates from `templates/` by putting replacements

View file

@ -5,6 +5,7 @@ services:
environment: environment:
- FLASK_APP=main.py - FLASK_APP=main.py
- FLASK_ENV=development - FLASK_ENV=development
- MAT2_ALLOW_ORIGIN_WHITELIST=*
ports: ports:
- "5000:5000" - "5000:5000"
volumes: volumes:

234
main.py
View file

@ -1,107 +1,191 @@
import os import os
import hashlib
import hmac import hmac
import mimetypes as mtype import mimetypes as mtype
import jinja2
import base64
import io
import binascii
import utils
from libmat2 import parser_factory from libmat2 import parser_factory
from flask import Flask, flash, request, redirect, url_for, render_template, send_from_directory, after_this_request
from flask import Flask, flash, request, redirect, url_for, render_template from flask_restful import Resource, Api, reqparse, abort
from flask import send_from_directory, after_this_request
import jinja2
from werkzeug.utils import secure_filename from werkzeug.utils import secure_filename
from werkzeug.datastructures import FileStorage
from flask_cors import CORS
from urllib.parse import urljoin
app = Flask(__name__) def create_app(test_config=None):
app.config['SECRET_KEY'] = os.urandom(32) app = Flask(__name__)
app.config['UPLOAD_FOLDER'] = './uploads/' app.config['SECRET_KEY'] = os.urandom(32)
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024 # 16MB app.config['UPLOAD_FOLDER'] = './uploads/'
app.config['CUSTOM_TEMPLATES_DIR'] = 'custom_templates' app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024 # 16MB
app.config['CUSTOM_TEMPLATES_DIR'] = 'custom_templates'
app.jinja_loader = jinja2.ChoiceLoader([ # type: ignore app.jinja_loader = jinja2.ChoiceLoader([ # type: ignore
jinja2.FileSystemLoader(app.config['CUSTOM_TEMPLATES_DIR']), jinja2.FileSystemLoader(app.config['CUSTOM_TEMPLATES_DIR']),
app.jinja_loader, app.jinja_loader,
]) ])
def __hash_file(filepath: str) -> str: api = Api(app)
sha256 = hashlib.sha256() CORS(app, resources={r"/api/*": {"origins": utils.get_allow_origin_header_value()}})
with open(filepath, 'rb') as f:
while True:
data = f.read(65536) # read the file by chunk of 64k
if not data:
break
sha256.update(data)
return sha256.hexdigest()
@app.route('/download/<string:key>/<string:filename>')
def download_file(key:str, filename:str):
if filename != secure_filename(filename):
return redirect(url_for('upload_file'))
@app.route('/download/<string:key>/<string:filename>') complete_path, filepath = get_file_paths(filename)
def download_file(key:str, filename:str):
if filename != secure_filename(filename):
return redirect(url_for('upload_file'))
filepath = secure_filename(filename) if not os.path.exists(complete_path):
return redirect(url_for('upload_file'))
if hmac.compare_digest(utils.hash_file(complete_path), key) is False:
return redirect(url_for('upload_file'))
complete_path = os.path.join(app.config['UPLOAD_FOLDER'], filepath) @after_this_request
if not os.path.exists(complete_path): def remove_file(response):
return redirect(url_for('upload_file')) os.remove(complete_path)
if hmac.compare_digest(__hash_file(complete_path), key) is False: return response
print('hash: %s, key: %s' % (__hash_file(complete_path), key)) return send_from_directory(app.config['UPLOAD_FOLDER'], filepath)
return redirect(url_for('upload_file'))
@after_this_request @app.route('/', methods=['GET', 'POST'])
def remove_file(response): def upload_file():
os.remove(complete_path) utils.check_upload_folder(app.config['UPLOAD_FOLDER'])
return response mimetypes = get_supported_extensions()
return send_from_directory(app.config['UPLOAD_FOLDER'], filepath)
@app.route('/', methods=['GET', 'POST']) if request.method == 'POST':
def upload_file(): if 'file' not in request.files: # check if the post request has the file part
if not os.path.exists(app.config['UPLOAD_FOLDER']): flash('No file part')
os.mkdir(app.config['UPLOAD_FOLDER']) return redirect(request.url)
mimetypes = set() uploaded_file = request.files['file']
for parser in parser_factory._get_parsers(): if not uploaded_file.filename:
for m in parser.mimetypes: flash('No selected file')
mimetypes |= set(mtype.guess_all_extensions(m, strict=False)) return redirect(request.url)
# since `guess_extension` might return `None`, we need to filter it out
mimetypes = sorted(filter(None, mimetypes))
if request.method == 'POST': filename, filepath = save_file(uploaded_file)
if 'file' not in request.files: # check if the post request has the file part parser, mime = get_file_parser(filepath)
flash('No file part')
return redirect(request.url) if parser is None:
uploaded_file = request.files['file'] flash('The type %s is not supported' % mime)
if not uploaded_file.filename: return redirect(url_for('upload_file'))
flash('No selected file')
return redirect(request.url) meta = parser.get_meta()
filename = secure_filename(uploaded_file.filename)
if parser.remove_all() is not True:
flash('Unable to clean %s' % mime)
return redirect(url_for('upload_file'))
key, meta_after, output_filename = cleanup(parser, filepath)
return render_template(
'download.html', mimetypes=mimetypes, meta=meta, filename=output_filename, meta_after=meta_after, key=key
)
max_file_size = int(app.config['MAX_CONTENT_LENGTH'] / 1024 / 1024)
return render_template('index.html', max_file_size=max_file_size, mimetypes=mimetypes)
def get_supported_extensions():
extensions = set()
for parser in parser_factory._get_parsers():
for m in parser.mimetypes:
extensions |= set(mtype.guess_all_extensions(m, strict=False))
# since `guess_extension` might return `None`, we need to filter it out
return sorted(filter(None, extensions))
def save_file(file):
filename = secure_filename(file.filename)
filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename) filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename)
uploaded_file.save(os.path.join(filepath)) file.save(os.path.join(filepath))
return filename, filepath
def get_file_parser(filepath: str):
parser, mime = parser_factory.get_parser(filepath) parser, mime = parser_factory.get_parser(filepath)
if parser is None: return parser, mime
flash('The type %s is not supported' % mime)
return redirect(url_for('upload_file'))
meta = parser.get_meta() def cleanup(parser, filepath):
if parser.remove_all() is not True:
flash('Unable to clean %s' % mime)
return redirect(url_for('upload_file'))
output_filename = os.path.basename(parser.output_filename) output_filename = os.path.basename(parser.output_filename)
# Get metadata after cleanup
parser, _ = parser_factory.get_parser(parser.output_filename) parser, _ = parser_factory.get_parser(parser.output_filename)
meta_after = parser.get_meta() meta_after = parser.get_meta()
os.remove(filepath) os.remove(filepath)
key = __hash_file(os.path.join(app.config['UPLOAD_FOLDER'], output_filename)) key = utils.hash_file(os.path.join(app.config['UPLOAD_FOLDER'], output_filename))
return key, meta_after, output_filename
return render_template('download.html', mimetypes=mimetypes, meta=meta, filename=output_filename, meta_after=meta_after, key=key) def get_file_paths(filename):
filepath = secure_filename(filename)
max_file_size = int(app.config['MAX_CONTENT_LENGTH'] / 1024 / 1024) complete_path = os.path.join(app.config['UPLOAD_FOLDER'], filepath)
return render_template('index.html', max_file_size=max_file_size, mimetypes=mimetypes) return complete_path, filepath
class APIUpload(Resource):
    """REST resource that accepts a base64-encoded file and cleans it."""

    def post(self):
        """Clean an uploaded file and describe the result.

        Reads the request parameters ``file_name`` (name of the file) and
        ``file`` (base64-encoded file content).

        Returns a dict with the cleaned file's name, its hash (``key``), the
        metadata before and after cleaning, and a download link.

        Aborts with 400 if a parameter is missing or ``file`` cannot be
        decoded, with 415 if no parser is found for the file, and with 500
        if the parser fails to clean it.
        """
        utils.check_upload_folder(app.config['UPLOAD_FOLDER'])
        req_parser = reqparse.RequestParser()
        req_parser.add_argument('file_name', type=str, required=True, help='Post parameter is not specified: file_name')
        req_parser.add_argument('file', type=str, required=True, help='Post parameter is not specified: file')

        args = req_parser.parse_args()
        try:
            file_data = base64.b64decode(args['file'])
        except binascii.Error as err:
            abort(400, message='Failed decoding file: ' + str(err))

        file = FileStorage(stream=io.BytesIO(file_data), filename=args['file_name'])
        _, filepath = save_file(file)
        parser, mime = get_file_parser(filepath)

        if parser is None:
            # Do not leave the uncleaned upload behind when we give up on it.
            os.remove(filepath)
            abort(415, message='The type %s is not supported' % mime)

        meta = parser.get_meta()
        if not parser.remove_all():
            # Same as above: the original file still carries its metadata.
            os.remove(filepath)
            abort(500, message='Unable to clean %s' % mime)

        key, meta_after, output_filename = cleanup(parser, filepath)
        return {
            'output_filename': output_filename,
            'key': key,
            'meta': meta,
            'meta_after': meta_after,
            'download_link': urljoin(request.host_url, '%s/%s/%s/%s' % ('api', 'download', key, output_filename))
        }
class APIDownload(Resource):
    """REST resource that serves a previously cleaned file."""

    def get(self, key: str, filename: str):
        """Send the cleaned file *filename* if *key* matches its hash.

        Aborts with 400 for an insecure filename or a hash mismatch, and
        with 404 if the file does not exist. A hook registered with
        ``after_this_request`` removes the file from the upload folder.
        """
        if filename != secure_filename(filename):
            abort(400, message='Insecure filename')

        complete_path, filepath = get_file_paths(filename)

        # abort() raises, so no explicit return is needed after it.
        if not os.path.exists(complete_path):
            abort(404, message='File not found')

        # Compare as bytes: compare_digest() raises TypeError when given a
        # str containing non-ASCII characters, and the key comes from the URL.
        file_hash = utils.hash_file(complete_path)
        if not hmac.compare_digest(file_hash.encode(), key.encode()):
            abort(400, message='The file hash does not match')

        @after_this_request
        def remove_file(response):
            os.remove(complete_path)
            return response

        return send_from_directory(app.config['UPLOAD_FOLDER'], filepath)
class APIMSupportedExtensions(Resource):
    """REST resource exposing the supported file extensions."""

    def get(self):
        """Return the result of ``get_supported_extensions()``."""
        extensions = get_supported_extensions()
        return extensions
api.add_resource(APIUpload, '/api/upload')
api.add_resource(APIDownload, '/api/download/<string:key>/<string:filename>')
api.add_resource(APIMSupportedExtensions, '/api/extension')
return app
if __name__ == '__main__': # pragma: no cover if __name__ == '__main__': # pragma: no cover
app.run() create_app().run()

View file

@ -3,3 +3,5 @@ ffmpeg==1.4
bubblewrap==1.2.0 bubblewrap==1.2.0
mat2==0.9.0 mat2==0.9.0
flask==1.0.3 flask==1.0.3
Flask-RESTful==0.3.7
Flask-Cors==3.0.8

View file

@ -2,18 +2,24 @@ import unittest
import tempfile import tempfile
import shutil import shutil
import io import io
import os
import main import main
class FlaskrTestCase(unittest.TestCase): class Mat2WebTestCase(unittest.TestCase):
def setUp(self): def setUp(self):
main.app.testing = True os.environ.setdefault('MAT2_ALLOW_ORIGIN_WHITELIST', 'origin1.gnu origin2.gnu')
main.app.config['UPLOAD_FOLDER'] = tempfile.mkdtemp() app = main.create_app()
self.app = main.app.test_client() self.upload_folder = tempfile.mkdtemp()
app.config.update(
TESTING=True,
UPLOAD_FOLDER=self.upload_folder
)
self.app = app.test_client()
def tearDown(self): def tearDown(self):
shutil.rmtree(main.app.config['UPLOAD_FOLDER']) shutil.rmtree(self.upload_folder)
def test_get_root(self): def test_get_root(self):
rv = self.app.get('/') rv = self.app.get('/')
@ -36,7 +42,6 @@ class FlaskrTestCase(unittest.TestCase):
rv = self.app.get('/download/1337/non_existant') rv = self.app.get('/download/1337/non_existant')
self.assertEqual(rv.status_code, 302) self.assertEqual(rv.status_code, 302)
def test_get_upload_without_file(self): def test_get_upload_without_file(self):
rv = self.app.post('/') rv = self.app.post('/')
self.assertEqual(rv.status_code, 302) self.assertEqual(rv.status_code, 302)
@ -60,12 +65,11 @@ class FlaskrTestCase(unittest.TestCase):
def test_get_upload_no_file_name(self): def test_get_upload_no_file_name(self):
rv = self.app.post('/', rv = self.app.post('/',
data=dict( data=dict(
file=(io.BytesIO(b"aaa"), ''), file=(io.BytesIO(b"aaa")),
), follow_redirects=True) ), follow_redirects=True)
self.assertIn(b'No file part', rv.data) self.assertIn(b'No file part', rv.data)
self.assertEqual(rv.status_code, 200) self.assertEqual(rv.status_code, 200)
def test_get_upload_harmless_file(self): def test_get_upload_harmless_file(self):
rv = self.app.post('/', rv = self.app.post('/',
data=dict( data=dict(
@ -73,6 +77,7 @@ class FlaskrTestCase(unittest.TestCase):
), follow_redirects=True) ), follow_redirects=True)
self.assertIn(b'/download/4c2e9e6da31a64c70623619c449a040968cdbea85945bf384fa30ed2d5d24fa3/test.cleaned.txt', rv.data) self.assertIn(b'/download/4c2e9e6da31a64c70623619c449a040968cdbea85945bf384fa30ed2d5d24fa3/test.cleaned.txt', rv.data)
self.assertEqual(rv.status_code, 200) self.assertEqual(rv.status_code, 200)
self.assertNotIn('Access-Control-Allow-Origin', rv.headers)
rv = self.app.get('/download/4c2e9e6da31a64c70623619c449a040968cdbea85945bf384fa30ed2d5d24fa3/test.cleaned.txt') rv = self.app.get('/download/4c2e9e6da31a64c70623619c449a040968cdbea85945bf384fa30ed2d5d24fa3/test.cleaned.txt')
self.assertEqual(rv.status_code, 200) self.assertEqual(rv.status_code, 200)
@ -80,6 +85,18 @@ class FlaskrTestCase(unittest.TestCase):
rv = self.app.get('/download/4c2e9e6da31a64c70623619c449a040968cdbea85945bf384fa30ed2d5d24fa3/test.cleaned.txt') rv = self.app.get('/download/4c2e9e6da31a64c70623619c449a040968cdbea85945bf384fa30ed2d5d24fa3/test.cleaned.txt')
self.assertEqual(rv.status_code, 302) self.assertEqual(rv.status_code, 302)
def test_upload_wrong_hash(self):
rv = self.app.post('/',
data=dict(
file=(io.BytesIO(b"Some text"), 'test.txt'),
), follow_redirects=True)
self.assertIn(b'/download/4c2e9e6da31a64c70623619c449a040968cdbea85945bf384fa30ed2d5d24fa3/test.cleaned.txt',
rv.data)
self.assertEqual(rv.status_code, 200)
rv = self.app.get('/download/70623619c449a040968cdbea85945bf384fa30ed2d5d24fa3/test.cleaned.txt')
self.assertEqual(rv.status_code, 302)
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()

155
test/test_api.py Normal file
View file

@ -0,0 +1,155 @@
import unittest
import tempfile
import shutil
import json
import os
import main
class Mat2APITestCase(unittest.TestCase):
    """Tests for the endpoints registered under ``/api/``."""

    def setUp(self):
        """Create the app with a two-origin whitelist and a temporary upload folder."""
        os.environ.setdefault('MAT2_ALLOW_ORIGIN_WHITELIST', 'origin1.gnu origin2.gnu')
        app = main.create_app()
        self.upload_folder = tempfile.mkdtemp()
        app.config.update(
            TESTING=True,
            UPLOAD_FOLDER=self.upload_folder
        )
        self.app = app.test_client()

    def tearDown(self):
        """Remove the temporary upload folder and unset the whitelist variable."""
        shutil.rmtree(self.upload_folder)
        if os.environ.get('MAT2_ALLOW_ORIGIN_WHITELIST'):
            del os.environ['MAT2_ALLOW_ORIGIN_WHITELIST']

    def test_api_upload_valid(self):
        """Upload a valid base64-encoded file and check the full JSON response."""
        request = self.app.post('/api/upload',
                                data='{"file_name": "test_name.jpg", '
                                     '"file": "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAf'
                                     'FcSJAAAADUlEQVR42mNk+M9QDwADhgGAWjR9awAAAABJRU5ErkJggg=="}',
                                headers={'content-type': 'application/json'}
                                )
        self.assertEqual(request.headers['Content-Type'], 'application/json')
        self.assertEqual(request.headers['Access-Control-Allow-Origin'], 'origin1.gnu')
        self.assertEqual(request.status_code, 200)

        data = json.loads(request.data.decode('utf-8'))
        expected = {
            'output_filename': 'test_name.cleaned.jpg',
            'key': '81a541f9ebc0233d419d25ed39908b16f82be26a783f32d56c381559e84e6161',
            'meta': {
                'BitDepth': 8,
                'ColorType': 'RGB with Alpha',
                'Compression': 'Deflate/Inflate',
                'Filter': 'Adaptive',
                'Interlace': 'Noninterlaced'
            },
            'meta_after': {},
            'download_link': 'http://localhost/api/download/'
                             '81a541f9ebc0233d419d25ed39908b16f82be26a783f32d56c381559e84e6161/test_name.cleaned.jpg'
        }
        self.assertEqual(data, expected)

    def test_api_upload_missing_params(self):
        """Expect 400 when ``file`` is missing and when it is not decodable."""
        # First request: the required ``file`` parameter is absent.
        request = self.app.post('/api/upload',
                                data='{"file_name": "test_name.jpg"}',
                                headers={'content-type': 'application/json'}
                                )
        self.assertEqual(request.headers['Content-Type'], 'application/json')

        self.assertEqual(request.status_code, 400)
        error = json.loads(request.data.decode('utf-8'))['message']
        self.assertEqual(error['file'], 'Post parameter is not specified: file')

        # Second request: ``file`` is present but cannot be base64-decoded.
        request = self.app.post('/api/upload',
                                data='{"file_name": "test_name.jpg", "file": "invalid base46 string"}',
                                headers={'content-type': 'application/json'}
                                )
        self.assertEqual(request.headers['Content-Type'], 'application/json')

        self.assertEqual(request.status_code, 400)
        error = json.loads(request.data.decode('utf-8'))['message']
        self.assertEqual(error, 'Failed decoding file: Incorrect padding')

    def test_api_not_supported(self):
        """Expect 415 when the uploaded file is named ``test_name.pdf``."""
        request = self.app.post('/api/upload',
                                data='{"file_name": "test_name.pdf", '
                                     '"file": "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAf'
                                     'FcSJAAAADUlEQVR42mNk+M9QDwADhgGAWjR9awAAAABJRU5ErkJggg=="}',
                                headers={'content-type': 'application/json'}
                                )
        self.assertEqual(request.headers['Content-Type'], 'application/json')
        self.assertEqual(request.status_code, 415)

        error = json.loads(request.data.decode('utf-8'))['message']
        self.assertEqual(error, 'The type application/pdf is not supported')

    def test_api_supported_extensions(self):
        """Check that ``/api/extension`` returns JSON containing known extensions."""
        rv = self.app.get('/api/extension')
        self.assertEqual(rv.status_code, 200)
        self.assertEqual(rv.headers['Content-Type'], 'application/json')
        self.assertEqual(rv.headers['Access-Control-Allow-Origin'], 'origin1.gnu')

        extensions = json.loads(rv.data.decode('utf-8'))
        self.assertIn('.pot', extensions)
        self.assertIn('.asc', extensions)
        self.assertIn('.png', extensions)
        self.assertIn('.zip', extensions)

    def test_api_cors_not_set(self):
        """Expect ``Access-Control-Allow-Origin: *`` when the whitelist is unset."""
        # The variable was set in setUp(); remove it before building a new app.
        del os.environ['MAT2_ALLOW_ORIGIN_WHITELIST']
        app = main.create_app()
        app.config.update(
            TESTING=True
        )
        app = app.test_client()

        rv = app.get('/api/extension')
        self.assertEqual(rv.headers['Access-Control-Allow-Origin'], '*')

    def test_api_cors(self):
        """Check the allowed-origin header with and without an ``Origin`` header."""
        rv = self.app.get('/api/extension')
        self.assertEqual(rv.headers['Access-Control-Allow-Origin'], 'origin1.gnu')

        rv = self.app.get('/api/extension', headers={'Origin': 'origin2.gnu'})
        self.assertEqual(rv.headers['Access-Control-Allow-Origin'], 'origin2.gnu')

        rv = self.app.get('/api/extension', headers={'Origin': 'origin1.gnu'})
        self.assertEqual(rv.headers['Access-Control-Allow-Origin'], 'origin1.gnu')

    def test_api_download(self):
        """Upload a file, then exercise the download error cases and the success case."""
        request = self.app.post('/api/upload',
                                data='{"file_name": "test_name.jpg", '
                                     '"file": "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAf'
                                     'FcSJAAAADUlEQVR42mNk+M9QDwADhgGAWjR9awAAAABJRU5ErkJggg=="}',
                                headers={'content-type': 'application/json'}
                                )
        self.assertEqual(request.status_code, 200)
        data = json.loads(request.data.decode('utf-8'))

        # A filename containing a space is rejected as insecure.
        request = self.app.get('http://localhost/api/download/'
                               '81a541f9ebc0233d419d25ed39908b16f82be26a783f32d56c381559e84e6161/test name.cleaned.jpg')
        self.assertEqual(request.status_code, 400)
        error = json.loads(request.data.decode('utf-8'))['message']
        self.assertEqual(error, 'Insecure filename')

        # A filename that was never uploaded is reported as not found.
        request = self.app.get('http://localhost/api/download/'
                               '81a541f9ebc0233d419d25ed39908b16f82be26a783f32d56c381559e84e6161/'
                               'wrong_file_name.jpg')
        self.assertEqual(request.status_code, 404)
        error = json.loads(request.data.decode('utf-8'))['message']
        self.assertEqual(error, 'File not found')

        # An existing file with a wrong key is rejected.
        request = self.app.get('http://localhost/api/download/81a541f9e/test_name.cleaned.jpg')
        self.assertEqual(request.status_code, 400)
        error = json.loads(request.data.decode('utf-8'))['message']
        self.assertEqual(error, 'The file hash does not match')

        # The link returned by the upload works.
        request = self.app.get(data['download_link'])
        self.assertEqual(request.status_code, 200)
if __name__ == '__main__':
unittest.main()

22
utils.py Normal file
View file

@ -0,0 +1,22 @@
import os
import hashlib
def get_allow_origin_header_value():
    """Return the list of origins allowed to call the API.

    The origins are read from the whitespace-separated environment variable
    ``MAT2_ALLOW_ORIGIN_WHITELIST``; when it is unset, ``['*']`` is returned.
    """
    # split() without an argument ignores repeated, leading and trailing
    # whitespace, so stray spaces never produce an empty origin entry.
    return os.environ.get('MAT2_ALLOW_ORIGIN_WHITELIST', '*').split()
def hash_file(filepath: str) -> str:
    """Return the hexadecimal SHA-256 digest of the file at *filepath*."""
    digest = hashlib.sha256()
    with open(filepath, 'rb') as stream:
        # Feed the file in 64k chunks; read() returns b'' at end of file.
        for chunk in iter(lambda: stream.read(65536), b''):
            digest.update(chunk)
    return digest.hexdigest()
def check_upload_folder(upload_folder):
    """Make sure the directory *upload_folder* exists, creating it if needed.

    Missing parent directories are created as well, and a directory that
    already exists (including one created concurrently between check and
    creation) is not an error.
    """
    os.makedirs(upload_folder, exist_ok=True)