mirror of
https://github.com/onionshare/onionshare.git
synced 2025-01-24 05:31:27 -05:00
258 lines
7.0 KiB
Python
258 lines
7.0 KiB
Python
# -*- coding: utf-8 -*-
|
|
"""
|
|
werkzeug.testsuite.cache
|
|
~~~~~~~~~~~~~~~~~~~~~~~~
|
|
|
|
Tests the cache system
|
|
|
|
:copyright: (c) 2013 by Armin Ronacher.
|
|
:license: BSD, see LICENSE for more details.
|
|
"""
|
|
import os
|
|
import time
|
|
import unittest
|
|
import tempfile
|
|
import shutil
|
|
|
|
from werkzeug.testsuite import WerkzeugTestCase
|
|
from werkzeug.contrib import cache
|
|
|
|
try:
|
|
import redis
|
|
try:
|
|
from redis.exceptions import ConnectionError as RedisConnectionError
|
|
cache.RedisCache(key_prefix='werkzeug-test-case:')._client.set('test','connection')
|
|
except RedisConnectionError:
|
|
redis = None
|
|
except ImportError:
|
|
redis = None
|
|
try:
|
|
import pylibmc as memcache
|
|
except ImportError:
|
|
try:
|
|
from google.appengine.api import memcache
|
|
except ImportError:
|
|
try:
|
|
import memcache
|
|
except ImportError:
|
|
memcache = None
|
|
|
|
|
|
class SimpleCacheTestCase(WerkzeugTestCase):
|
|
|
|
def test_get_dict(self):
|
|
c = cache.SimpleCache()
|
|
c.set('a', 'a')
|
|
c.set('b', 'b')
|
|
d = c.get_dict('a', 'b')
|
|
assert 'a' in d
|
|
assert 'a' == d['a']
|
|
assert 'b' in d
|
|
assert 'b' == d['b']
|
|
|
|
def test_set_many(self):
|
|
c = cache.SimpleCache()
|
|
c.set_many({0: 0, 1: 1, 2: 4})
|
|
assert c.get(2) == 4
|
|
c.set_many((i, i*i) for i in range(3))
|
|
assert c.get(2) == 4
|
|
|
|
|
|
class FileSystemCacheTestCase(WerkzeugTestCase):
|
|
|
|
def test_set_get(self):
|
|
tmp_dir = tempfile.mkdtemp()
|
|
try:
|
|
c = cache.FileSystemCache(cache_dir=tmp_dir)
|
|
for i in range(3):
|
|
c.set(str(i), i * i)
|
|
for i in range(3):
|
|
result = c.get(str(i))
|
|
assert result == i * i
|
|
finally:
|
|
shutil.rmtree(tmp_dir)
|
|
|
|
def test_filesystemcache_prune(self):
|
|
THRESHOLD = 13
|
|
tmp_dir = tempfile.mkdtemp()
|
|
c = cache.FileSystemCache(cache_dir=tmp_dir, threshold=THRESHOLD)
|
|
for i in range(2 * THRESHOLD):
|
|
c.set(str(i), i)
|
|
cache_files = os.listdir(tmp_dir)
|
|
shutil.rmtree(tmp_dir)
|
|
assert len(cache_files) <= THRESHOLD
|
|
|
|
|
|
def test_filesystemcache_clear(self):
|
|
tmp_dir = tempfile.mkdtemp()
|
|
c = cache.FileSystemCache(cache_dir=tmp_dir)
|
|
c.set('foo', 'bar')
|
|
cache_files = os.listdir(tmp_dir)
|
|
assert len(cache_files) == 1
|
|
c.clear()
|
|
cache_files = os.listdir(tmp_dir)
|
|
assert len(cache_files) == 0
|
|
shutil.rmtree(tmp_dir)
|
|
|
|
|
|
class RedisCacheTestCase(WerkzeugTestCase):
|
|
|
|
def make_cache(self):
|
|
return cache.RedisCache(key_prefix='werkzeug-test-case:')
|
|
|
|
def teardown(self):
|
|
self.make_cache().clear()
|
|
|
|
def test_compat(self):
|
|
c = self.make_cache()
|
|
c._client.set(c.key_prefix + 'foo', b'Awesome')
|
|
self.assert_equal(c.get('foo'), b'Awesome')
|
|
c._client.set(c.key_prefix + 'foo', b'42')
|
|
self.assert_equal(c.get('foo'), 42)
|
|
|
|
def test_get_set(self):
|
|
c = self.make_cache()
|
|
c.set('foo', ['bar'])
|
|
assert c.get('foo') == ['bar']
|
|
|
|
def test_get_many(self):
|
|
c = self.make_cache()
|
|
c.set('foo', ['bar'])
|
|
c.set('spam', 'eggs')
|
|
assert c.get_many('foo', 'spam') == [['bar'], 'eggs']
|
|
|
|
def test_set_many(self):
|
|
c = self.make_cache()
|
|
c.set_many({'foo': 'bar', 'spam': ['eggs']})
|
|
assert c.get('foo') == 'bar'
|
|
assert c.get('spam') == ['eggs']
|
|
|
|
def test_expire(self):
|
|
c = self.make_cache()
|
|
c.set('foo', 'bar', 1)
|
|
time.sleep(2)
|
|
assert c.get('foo') is None
|
|
|
|
def test_add(self):
|
|
c = self.make_cache()
|
|
# sanity check that add() works like set()
|
|
c.add('foo', 'bar')
|
|
assert c.get('foo') == 'bar'
|
|
c.add('foo', 'qux')
|
|
assert c.get('foo') == 'bar'
|
|
|
|
def test_delete(self):
|
|
c = self.make_cache()
|
|
c.add('foo', 'bar')
|
|
assert c.get('foo') == 'bar'
|
|
c.delete('foo')
|
|
assert c.get('foo') is None
|
|
|
|
def test_delete_many(self):
|
|
c = self.make_cache()
|
|
c.add('foo', 'bar')
|
|
c.add('spam', 'eggs')
|
|
c.delete_many('foo', 'spam')
|
|
assert c.get('foo') is None
|
|
assert c.get('spam') is None
|
|
|
|
def test_inc_dec(self):
|
|
c = self.make_cache()
|
|
c.set('foo', 1)
|
|
self.assert_equal(c.inc('foo'), 2)
|
|
self.assert_equal(c.dec('foo'), 1)
|
|
c.delete('foo')
|
|
|
|
def test_true_false(self):
|
|
c = self.make_cache()
|
|
c.set('foo', True)
|
|
assert c.get('foo') == True
|
|
c.set('bar', False)
|
|
assert c.get('bar') == False
|
|
|
|
|
|
class MemcachedCacheTestCase(WerkzeugTestCase):
|
|
|
|
def make_cache(self):
|
|
return cache.MemcachedCache(key_prefix='werkzeug-test-case:')
|
|
|
|
def teardown(self):
|
|
self.make_cache().clear()
|
|
|
|
def test_compat(self):
|
|
c = self.make_cache()
|
|
c._client.set(c.key_prefix + b'foo', 'bar')
|
|
self.assert_equal(c.get('foo'), 'bar')
|
|
|
|
def test_get_set(self):
|
|
c = self.make_cache()
|
|
c.set('foo', 'bar')
|
|
self.assert_equal(c.get('foo'), 'bar')
|
|
|
|
def test_get_many(self):
|
|
c = self.make_cache()
|
|
c.set('foo', 'bar')
|
|
c.set('spam', 'eggs')
|
|
self.assert_equal(c.get_many('foo', 'spam'), ['bar', 'eggs'])
|
|
|
|
def test_set_many(self):
|
|
c = self.make_cache()
|
|
c.set_many({'foo': 'bar', 'spam': 'eggs'})
|
|
self.assert_equal(c.get('foo'), 'bar')
|
|
self.assert_equal(c.get('spam'), 'eggs')
|
|
|
|
def test_expire(self):
|
|
c = self.make_cache()
|
|
c.set('foo', 'bar', 1)
|
|
time.sleep(2)
|
|
self.assert_is_none(c.get('foo'))
|
|
|
|
def test_add(self):
|
|
c = self.make_cache()
|
|
c.add('foo', 'bar')
|
|
self.assert_equal(c.get('foo'), 'bar')
|
|
c.add('foo', 'baz')
|
|
self.assert_equal(c.get('foo'), 'bar')
|
|
|
|
def test_delete(self):
|
|
c = self.make_cache()
|
|
c.add('foo', 'bar')
|
|
self.assert_equal(c.get('foo'), 'bar')
|
|
c.delete('foo')
|
|
self.assert_is_none(c.get('foo'))
|
|
|
|
def test_delete_many(self):
|
|
c = self.make_cache()
|
|
c.add('foo', 'bar')
|
|
c.add('spam', 'eggs')
|
|
c.delete_many('foo', 'spam')
|
|
self.assert_is_none(c.get('foo'))
|
|
self.assert_is_none(c.get('spam'))
|
|
|
|
def test_inc_dec(self):
|
|
c = self.make_cache()
|
|
c.set('foo', 1)
|
|
# XXX: Is this an intended difference?
|
|
c.inc('foo')
|
|
self.assert_equal(c.get('foo'), 2)
|
|
c.dec('foo')
|
|
self.assert_equal(c.get('foo'), 1)
|
|
|
|
def test_true_false(self):
|
|
c = self.make_cache()
|
|
c.set('foo', True)
|
|
self.assert_equal(c.get('foo'), True)
|
|
c.set('bar', False)
|
|
self.assert_equal(c.get('bar'), False)
|
|
|
|
|
|
def suite():
|
|
suite = unittest.TestSuite()
|
|
suite.addTest(unittest.makeSuite(SimpleCacheTestCase))
|
|
suite.addTest(unittest.makeSuite(FileSystemCacheTestCase))
|
|
if redis is not None:
|
|
suite.addTest(unittest.makeSuite(RedisCacheTestCase))
|
|
if memcache is not None:
|
|
suite.addTest(unittest.makeSuite(MemcachedCacheTestCase))
|
|
return suite
|