mirror of
https://github.com/onionshare/onionshare.git
synced 2024-12-26 07:49:48 -05:00
594 lines
22 KiB
Python
594 lines
22 KiB
Python
|
# -*- coding: utf-8 -*-
|
||
|
"""
|
||
|
flask.testsuite.helpers
|
||
|
~~~~~~~~~~~~~~~~~~~~~~~
|
||
|
|
||
|
Various helpers.
|
||
|
|
||
|
:copyright: (c) 2011 by Armin Ronacher.
|
||
|
:license: BSD, see LICENSE for more details.
|
||
|
"""
|
||
|
|
||
|
import os
|
||
|
import flask
|
||
|
import unittest
|
||
|
from logging import StreamHandler
|
||
|
from flask.testsuite import FlaskTestCase, catch_warnings, catch_stderr
|
||
|
from werkzeug.http import parse_cache_control_header, parse_options_header
|
||
|
from flask._compat import StringIO, text_type
|
||
|
|
||
|
|
||
|
def has_encoding(name):
    """Tell whether the codec registry knows an encoding called *name*.

    Returns ``True`` if ``codecs.lookup(name)`` succeeds and ``False`` if it
    raises ``LookupError``.
    """
    import codecs

    try:
        codecs.lookup(name)
    except LookupError:
        return False
    return True
|
||
|
|
||
|
|
||
|
class JSONTestCase(FlaskTestCase):
    """Checks for Flask's JSON helpers and JSON request handling."""

    def test_json_bad_requests(self):
        # A request body that is not valid JSON is answered with 400.
        app = flask.Flask(__name__)

        @app.route('/json', methods=['POST'])
        def return_json():
            payload = flask.request.get_json()
            return flask.jsonify(foo=text_type(payload))

        client = app.test_client()
        response = client.post('/json', data='malformed',
                               content_type='application/json')
        self.assert_equal(response.status_code, 400)

    def test_json_body_encoding(self):
        # The charset given in the content type is used to decode the body.
        app = flask.Flask(__name__)
        app.testing = True

        @app.route('/')
        def index():
            return flask.request.get_json()

        client = app.test_client()
        encoded_body = u'"Hällo Wörld"'.encode('iso-8859-15')
        response = client.get(
            '/',
            data=encoded_body,
            content_type='application/json; charset=iso-8859-15')
        self.assert_equal(response.data, u'Hällo Wörld'.encode('utf-8'))

    def test_jsonify(self):
        # jsonify accepts either keyword arguments or a single dict.
        expected = dict(a=23, b=42, c=[1, 2, 3])
        app = flask.Flask(__name__)

        @app.route('/kw')
        def return_kwargs():
            return flask.jsonify(**expected)

        @app.route('/dict')
        def return_dict():
            return flask.jsonify(expected)

        client = app.test_client()
        for path in ('/kw', '/dict'):
            response = client.get(path)
            self.assert_equal(response.mimetype, 'application/json')
            self.assert_equal(flask.json.loads(response.data), expected)

    def test_json_as_unicode(self):
        # JSON_AS_ASCII switches between escaped and raw unicode output.
        app = flask.Flask(__name__)
        cases = (
            (True, '"\\u2603"'),
            (False, u'"\u2603"'),
        )
        for ascii_only, expected in cases:
            app.config['JSON_AS_ASCII'] = ascii_only
            with app.app_context():
                dumped = flask.json.dumps(u'\N{SNOWMAN}')
                self.assert_equal(dumped, expected)

    def test_json_attr(self):
        # Values parsed from the JSON body can be used directly in the view.
        app = flask.Flask(__name__)

        @app.route('/add', methods=['POST'])
        def add():
            operands = flask.request.get_json()
            return text_type(operands['a'] + operands['b'])

        client = app.test_client()
        body = flask.json.dumps({'a': 1, 'b': 2})
        response = client.post('/add', data=body,
                               content_type='application/json')
        self.assert_equal(response.data, b'3')

    def test_template_escaping(self):
        # htmlsafe_dumps and the ``tojson`` filter escape HTML-significant
        # characters as \uXXXX sequences.
        app = flask.Flask(__name__)
        render = flask.render_template_string
        with app.test_request_context():
            dumped = flask.json.htmlsafe_dumps('</script>')
            self.assert_equal(dumped, u'"\\u003c/script\\u003e"')
            self.assert_equal(type(dumped), text_type)

            filter_cases = (
                ('{{ "</script>"|tojson }}',
                 '"\\u003c/script\\u003e"'),
                ('{{ "<\0/script>"|tojson }}',
                 '"\\u003c\\u0000/script\\u003e"'),
                ('{{ "<!--<script>"|tojson }}',
                 '"\\u003c!--\\u003cscript\\u003e"'),
                ('{{ "&"|tojson }}',
                 '"\\u0026"'),
                ('{{ "\'"|tojson }}',
                 '"\\u0027"'),
            )
            for template, expected in filter_cases:
                self.assert_equal(render(template), expected)

            rendered = render("<a ng-data='{{ data|tojson }}'></a>",
                              data={'x': ["foo", "bar", "baz'"]})
            self.assert_equal(
                rendered,
                '<a ng-data=\'{"x": ["foo", "bar", "baz\\u0027"]}\'></a>')

    def test_json_customization(self):
        # Application-level encoder/decoder classes are used for both
        # parsing the request body and dumping the result.
        class X(object):
            def __init__(self, val):
                self.val = val

        class MyEncoder(flask.json.JSONEncoder):
            def default(self, o):
                if not isinstance(o, X):
                    return flask.json.JSONEncoder.default(self, o)
                return '<%d>' % o.val

        class MyDecoder(flask.json.JSONDecoder):
            def __init__(self, *args, **kwargs):
                kwargs.setdefault('object_hook', self.object_hook)
                flask.json.JSONDecoder.__init__(self, *args, **kwargs)

            def object_hook(self, obj):
                is_wrapped = len(obj) == 1 and '_foo' in obj
                if is_wrapped:
                    return X(obj['_foo'])
                return obj

        app = flask.Flask(__name__)
        app.testing = True
        app.json_encoder = MyEncoder
        app.json_decoder = MyDecoder

        @app.route('/', methods=['POST'])
        def index():
            parsed = flask.request.get_json()
            return flask.json.dumps(parsed['x'])

        client = app.test_client()
        body = flask.json.dumps({
            'x': {'_foo': 42}
        })
        response = client.post('/', data=body,
                               content_type='application/json')
        self.assertEqual(response.data, b'"<42>"')

    def test_modified_url_encoding(self):
        # A request class and URL map with a non-default charset decode the
        # query string with that charset.
        class ModifiedRequest(flask.Request):
            url_charset = 'euc-kr'

        app = flask.Flask(__name__)
        app.testing = True
        app.request_class = ModifiedRequest
        app.url_map.charset = 'euc-kr'

        @app.route('/')
        def index():
            return flask.request.args['foo']

        client = app.test_client()
        response = client.get(u'/?foo=정상처리'.encode('euc-kr'))
        self.assert_equal(response.status_code, 200)
        self.assert_equal(response.data, u'정상처리'.encode('utf-8'))

    # Drop the test entirely when the interpreter lacks the euc-kr codec.
    if not has_encoding('euc-kr'):
        test_modified_url_encoding = None

    def test_json_key_sorting(self):
        # With JSON_SORT_KEYS enabled (the default asserted below) the keys
        # of a jsonify response come out in sorted order.
        app = flask.Flask(__name__)
        app.testing = True
        self.assert_equal(app.config['JSON_SORT_KEYS'], True)
        values = dict.fromkeys(range(20), 'foo')

        @app.route('/')
        def index():
            return flask.jsonify(values=values)

        response = app.test_client().get('/')
        text = response.data.strip().decode('utf-8')
        lines = [line.strip() for line in text.splitlines()]
        expected_lines = [
            '{',
            '"values": {',
            '"0": "foo",',
            '"1": "foo",',
            '"2": "foo",',
            '"3": "foo",',
            '"4": "foo",',
            '"5": "foo",',
            '"6": "foo",',
            '"7": "foo",',
            '"8": "foo",',
            '"9": "foo",',
            '"10": "foo",',
            '"11": "foo",',
            '"12": "foo",',
            '"13": "foo",',
            '"14": "foo",',
            '"15": "foo",',
            '"16": "foo",',
            '"17": "foo",',
            '"18": "foo",',
            '"19": "foo"',
            '}',
            '}'
        ]
        self.assert_equal(lines, expected_lines)
|
||
|
|
||
|
|
||
|
class SendfileTestCase(FlaskTestCase):
    """Checks for ``flask.send_file`` and ``Flask.send_static_file``."""

    def test_send_file_regular(self):
        # Sending by filename uses passthrough and serves the file's bytes.
        app = flask.Flask(__name__)
        with app.test_request_context():
            response = flask.send_file('static/index.html')
            self.assert_true(response.direct_passthrough)
            self.assert_equal(response.mimetype, 'text/html')
            with app.open_resource('static/index.html') as resource:
                # Passthrough has to be off before ``.data`` may be read.
                response.direct_passthrough = False
                self.assert_equal(response.data, resource.read())
            response.close()

    def test_send_file_xsendfile(self):
        # With use_x_sendfile the absolute path is put in X-Sendfile.
        app = flask.Flask(__name__)
        app.use_x_sendfile = True
        with app.test_request_context():
            response = flask.send_file('static/index.html')
            self.assert_true(response.direct_passthrough)
            self.assert_in('x-sendfile', response.headers)
            expected_path = os.path.join(app.root_path, 'static/index.html')
            self.assert_equal(response.headers['x-sendfile'], expected_path)
            self.assert_equal(response.mimetype, 'text/html')
            response.close()

    def test_send_file_object(self):
        # Sending file objects works, and the number of warnings recorded
        # for each variant is checked.
        app = flask.Flask(__name__)
        index_path = os.path.join(app.root_path, 'static/index.html')

        with catch_warnings() as captured:
            with app.test_request_context():
                handle = open(index_path)
                response = flask.send_file(handle)
                response.direct_passthrough = False
                with app.open_resource('static/index.html') as resource:
                    self.assert_equal(response.data, resource.read())
                self.assert_equal(response.mimetype, 'text/html')
                response.close()
            # mimetypes + etag
            self.assert_equal(len(captured), 2)

        app.use_x_sendfile = True
        with catch_warnings() as captured:
            with app.test_request_context():
                handle = open(index_path)
                response = flask.send_file(handle)
                self.assert_equal(response.mimetype, 'text/html')
                self.assert_in('x-sendfile', response.headers)
                self.assert_equal(response.headers['x-sendfile'], index_path)
                response.close()
            # mimetypes + etag
            self.assert_equal(len(captured), 2)

        app.use_x_sendfile = False
        stringio_cases = (
            ({}, 'application/octet-stream'),
            ({'mimetype': 'text/plain'}, 'text/plain'),
        )
        with app.test_request_context():
            for extra_kwargs, expected_mimetype in stringio_cases:
                with catch_warnings() as captured:
                    stream = StringIO('Test')
                    response = flask.send_file(stream, **extra_kwargs)
                    response.direct_passthrough = False
                    self.assert_equal(response.data, b'Test')
                    self.assert_equal(response.mimetype, expected_mimetype)
                    response.close()
                # etags
                self.assert_equal(len(captured), 1)

        app.use_x_sendfile = True
        with catch_warnings() as captured:
            with app.test_request_context():
                stream = StringIO('Test')
                response = flask.send_file(stream)
                self.assert_not_in('x-sendfile', response.headers)
                response.close()
            # etags
            self.assert_equal(len(captured), 1)

    def test_attachment(self):
        # as_attachment sets a Content-Disposition header carrying the
        # filename.
        app = flask.Flask(__name__)
        with catch_warnings() as captured:
            with app.test_request_context():
                handle = open(os.path.join(app.root_path, 'static/index.html'))
                response = flask.send_file(handle, as_attachment=True)
                disposition = response.headers['Content-Disposition']
                value, options = parse_options_header(disposition)
                self.assert_equal(value, 'attachment')
                response.close()
            # mimetypes + etag
            self.assert_equal(len(captured), 2)

        with app.test_request_context():
            # ``options`` here still comes from the file-object response
            # parsed above.
            self.assert_equal(options['filename'], 'index.html')
            response = flask.send_file('static/index.html',
                                       as_attachment=True)
            disposition = response.headers['Content-Disposition']
            value, options = parse_options_header(disposition)
            self.assert_equal(value, 'attachment')
            self.assert_equal(options['filename'], 'index.html')
            response.close()

        with app.test_request_context():
            response = flask.send_file(StringIO('Test'),
                                       as_attachment=True,
                                       attachment_filename='index.txt',
                                       add_etags=False)
            self.assert_equal(response.mimetype, 'text/plain')
            disposition = response.headers['Content-Disposition']
            value, options = parse_options_header(disposition)
            self.assert_equal(value, 'attachment')
            self.assert_equal(options['filename'], 'index.txt')
            response.close()

    def test_static_file(self):
        # The Cache-Control max-age follows the default, the config value,
        # and an overridden get_send_file_max_age, in that order.
        def check_max_age(app, seconds):
            with app.test_request_context():
                # Test with static file handler.
                response = app.send_static_file('index.html')
                cache_control = parse_cache_control_header(
                    response.headers['Cache-Control'])
                self.assert_equal(cache_control.max_age, seconds)
                response.close()
                # Test again with direct use of send_file utility.
                response = flask.send_file('static/index.html')
                cache_control = parse_cache_control_header(
                    response.headers['Cache-Control'])
                self.assert_equal(cache_control.max_age, seconds)
                response.close()

        app = flask.Flask(__name__)
        # default cache timeout is 12 hours
        check_max_age(app, 12 * 60 * 60)

        app.config['SEND_FILE_MAX_AGE_DEFAULT'] = 3600
        check_max_age(app, 3600)

        class StaticFileApp(flask.Flask):
            def get_send_file_max_age(self, filename):
                return 10

        app = StaticFileApp(__name__)
        check_max_age(app, 10)
|
||
|
|
||
|
|
||
|
class LoggingTestCase(FlaskTestCase):
    """Checks for the application logger, error handling and ``url_for``."""

    def test_logger_cache(self):
        # The logger is reused until logger_name changes.
        app = flask.Flask(__name__)
        first_logger = app.logger
        self.assert_true(app.logger is first_logger)
        self.assert_equal(first_logger.name, __name__)
        app.logger_name = __name__ + '/test_logger_cache'
        self.assert_true(app.logger is not first_logger)

    def test_debug_log(self):
        # In debug mode log records go to stderr and exceptions raised by a
        # view propagate to the test client.
        app = flask.Flask(__name__)
        app.debug = True

        @app.route('/')
        def index():
            app.logger.warning('the standard library is dead')
            app.logger.debug('this is a debug statement')
            return ''

        @app.route('/exc')
        def exc():
            1 // 0

        with app.test_client() as client:
            with catch_stderr() as err:
                client.get('/')
                logged = err.getvalue()
                source_name = os.path.basename(
                    __file__.rsplit('.', 1)[0] + '.py')
                self.assert_in('WARNING in helpers [', logged)
                self.assert_in(source_name, logged)
                self.assert_in('the standard library is dead', logged)
                self.assert_in('this is a debug statement', logged)

            with catch_stderr() as err:
                propagated = False
                try:
                    client.get('/exc')
                except ZeroDivisionError:
                    propagated = True
                if not propagated:
                    self.assert_true(False, 'debug log ate the exception')

    def test_debug_log_override(self):
        # A level set on the logger is read back unchanged in debug mode.
        app = flask.Flask(__name__)
        app.debug = True
        app.logger_name = 'flask_tests/test_debug_log_override'
        app.logger.level = 10
        self.assert_equal(app.logger.level, 10)

    def test_exception_logging(self):
        # An unhandled view exception yields a 500 and a logged traceback.
        app = flask.Flask(__name__)
        app.logger_name = 'flask_tests/test_exception_logging'
        log_stream = StringIO()
        app.logger.addHandler(StreamHandler(log_stream))

        @app.route('/')
        def index():
            # The assertions below look for this exact source text in the
            # traceback, so the expression must stay as written.
            1 // 0

        response = app.test_client().get('/')
        self.assert_equal(response.status_code, 500)
        self.assert_in(b'Internal Server Error', response.data)

        logged = log_stream.getvalue()
        expected_fragments = (
            'Exception on / [GET]',
            'Traceback (most recent call last):',
            '1 // 0',
            'ZeroDivisionError:',
        )
        for fragment in expected_fragments:
            self.assert_in(fragment, logged)

    def test_processor_exceptions(self):
        # Errors raised in before/after request functions reach the 500
        # error handler.
        app = flask.Flask(__name__)

        @app.before_request
        def before_request():
            if failing_phase == 'before':
                1 // 0

        @app.after_request
        def after_request(response):
            if failing_phase == 'after':
                1 // 0
            return response

        @app.route('/')
        def index():
            return 'Foo'

        @app.errorhandler(500)
        def internal_server_error(e):
            return 'Hello Server Error', 500

        # The hooks above read ``failing_phase`` from this enclosing scope.
        for failing_phase in ('before', 'after'):
            result = app.test_client().get('/')
            self.assert_equal(result.status_code, 500)
            self.assert_equal(result.data, b'Hello Server Error')

    def test_url_for_with_anchor(self):
        # The anchor is appended after '#' with the space percent-encoded.
        app = flask.Flask(__name__)

        @app.route('/')
        def index():
            return '42'

        with app.test_request_context():
            url = flask.url_for('index', _anchor='x y')
            self.assert_equal(url, '/#x%20y')

    def test_url_for_with_scheme(self):
        # _scheme replaces the scheme of an external URL.
        app = flask.Flask(__name__)

        @app.route('/')
        def index():
            return '42'

        with app.test_request_context():
            url = flask.url_for('index', _external=True, _scheme='https')
            self.assert_equal(url, 'https://localhost/')

    def test_url_for_with_scheme_not_external(self):
        # _scheme without _external=True raises ValueError.
        app = flask.Flask(__name__)

        @app.route('/')
        def index():
            return '42'

        with app.test_request_context():
            self.assert_raises(ValueError, flask.url_for, 'index',
                               _scheme='https')

    def test_url_with_method(self):
        # _method selects among rules that share one endpoint.
        from flask.views import MethodView
        app = flask.Flask(__name__)

        class MyView(MethodView):
            def get(self, id=None):
                if id is not None:
                    return 'Get %d' % id
                return 'List'

            def post(self):
                return 'Create'

        myview = MyView.as_view('myview')
        rules = (
            ('/myview/', ['GET']),
            ('/myview/<int:id>', ['GET']),
            ('/myview/create', ['POST']),
        )
        for rule, methods in rules:
            app.add_url_rule(rule, methods=methods, view_func=myview)

        with app.test_request_context():
            self.assert_equal(flask.url_for('myview', _method='GET'),
                              '/myview/')
            self.assert_equal(flask.url_for('myview', id=42, _method='GET'),
                              '/myview/42')
            self.assert_equal(flask.url_for('myview', _method='POST'),
                              '/myview/create')
|
||
|
|
||
|
|
||
|
class NoImportsTestCase(FlaskTestCase):
    """Check that building a Flask object does not import its import name.

    Not calling ``__import__`` makes it possible to create Flask instances
    for modules that fail at import time. Such runtime errors will surface
    to the user soon enough anyway, while tools that construct Flask
    instances meta-programmatically benefit from a Flask that does not
    ``__import__``. File paths and metadata of a module or package should be
    looked up through the pkgutil and imp modules of the Python standard
    library rather than by importing.
    """

    def test_name_with_import_error(self):
        # The test fails only if constructing the app raises
        # NotImplementedError, which is taken as proof of an import.
        try:
            flask.Flask('importerror')
        except NotImplementedError:
            imported = True
        else:
            imported = False
        if imported:
            self.fail('Flask(import_name) is importing import_name.')
|
||
|
|
||
|
|
||
|
class StreamingTestCase(FlaskTestCase):
    """Checks for ``flask.stream_with_context``."""

    def test_streaming_with_context(self):
        # A generator wrapped by a direct call can still read the request
        # while the response is streamed.
        app = flask.Flask(__name__)
        app.testing = True

        @app.route('/')
        def index():
            def generate():
                yield 'Hello '
                yield flask.request.args['name']
                yield '!'
            stream = flask.stream_with_context(generate())
            return flask.Response(stream)

        client = app.test_client()
        response = client.get('/?name=World')
        self.assertEqual(response.data, b'Hello World!')

    def test_streaming_with_context_as_decorator(self):
        # The same behaviour when stream_with_context is used as a decorator.
        app = flask.Flask(__name__)
        app.testing = True

        @app.route('/')
        def index():
            @flask.stream_with_context
            def generate():
                yield 'Hello '
                yield flask.request.args['name']
                yield '!'
            stream = generate()
            return flask.Response(stream)

        client = app.test_client()
        response = client.get('/?name=World')
        self.assertEqual(response.data, b'Hello World!')

    def test_streaming_with_context_and_custom_close(self):
        # close() on the wrapped iterable is called exactly once.
        app = flask.Flask(__name__)
        app.testing = True
        close_calls = []

        class Wrapper(object):
            def __init__(self, gen):
                self._gen = gen

            def __iter__(self):
                return self

            def close(self):
                close_calls.append(42)

            def __next__(self):
                return next(self._gen)

            # Python 2 spelling of the iterator protocol method.
            next = __next__

        @app.route('/')
        def index():
            def generate():
                yield 'Hello '
                yield flask.request.args['name']
                yield '!'
            wrapped = Wrapper(generate())
            return flask.Response(flask.stream_with_context(wrapped))

        client = app.test_client()
        response = client.get('/?name=World')
        self.assertEqual(response.data, b'Hello World!')
        self.assertEqual(close_calls, [42])
|
||
|
|
||
|
|
||
|
def suite():
    """Build the test suite for this module.

    ``JSONTestCase`` is included only when ``flask.json_available`` is true;
    the other test cases are always added, in the same order as before.

    Tests are collected with ``unittest.defaultTestLoader`` rather than
    ``unittest.makeSuite``: ``makeSuite`` was deprecated in Python 3.11 and
    removed in Python 3.13, while ``loadTestsFromTestCase`` collects the same
    ``test*`` methods and is available on every supported interpreter.

    Returns:
        A ``unittest.TestSuite`` with one sub-suite per test case class.
    """
    test_cases = []
    if flask.json_available:
        test_cases.append(JSONTestCase)
    test_cases.extend([
        SendfileTestCase,
        LoggingTestCase,
        NoImportsTestCase,
        StreamingTestCase,
    ])

    load = unittest.defaultTestLoader.loadTestsFromTestCase
    # Named ``collected`` so the local does not shadow this function's name.
    collected = unittest.TestSuite()
    for test_case in test_cases:
        collected.addTest(load(test_case))
    return collected
|