forked-synapse/tests/api/test_ratelimiting.py

40 lines
1.2 KiB
Python
Raw Normal View History

2014-09-02 10:06:20 -04:00
from synapse.api.ratelimiting import Ratelimiter
from tests import unittest
2014-09-02 10:06:20 -04:00
2016-02-19 10:34:38 -05:00
2014-09-02 10:06:20 -04:00
class TestRatelimiter(unittest.TestCase):
    """Tests for ``Ratelimiter.can_do_action`` and its per-key bookkeeping."""

    def test_allowed(self):
        """Check the allow/deny sequence for a single key.

        With ``rate_hz=0.1`` and ``burst_count=1`` the expected results are:

        * t=0:  allowed, next allowed time reported as 10
        * t=5:  denied, next allowed time still reported as 10
        * t=10: allowed again, next allowed time reported as 20
        """
        limiter = Ratelimiter()

        allowed, time_allowed = limiter.can_do_action(
            key="test_id", time_now_s=0, rate_hz=0.1, burst_count=1
        )
        self.assertTrue(allowed)
        self.assertEqual(10.0, time_allowed)

        # A second action before the reported time must be rejected.
        allowed, time_allowed = limiter.can_do_action(
            key="test_id", time_now_s=5, rate_hz=0.1, burst_count=1
        )
        self.assertFalse(allowed)
        self.assertEqual(10.0, time_allowed)

        # At the reported time the action is permitted again.
        allowed, time_allowed = limiter.can_do_action(
            key="test_id", time_now_s=10, rate_hz=0.1, burst_count=1
        )
        self.assertTrue(allowed)
        self.assertEqual(20.0, time_allowed)

    def test_pruning(self):
        """Check that an old key is dropped from ``message_counts``.

        An action for ``test_id_1`` at t=0 must create an entry in
        ``limiter.message_counts``; after a later action for ``test_id_2``
        at t=10 that entry must no longer be present.
        """
        limiter = Ratelimiter()

        limiter.can_do_action(
            key="test_id_1", time_now_s=0, rate_hz=0.1, burst_count=1
        )
        self.assertIn("test_id_1", limiter.message_counts)

        # Only the side effect on message_counts matters here, so the
        # (allowed, time_allowed) result is intentionally ignored.
        limiter.can_do_action(
            key="test_id_2", time_now_s=10, rate_hz=0.1, burst_count=1
        )
        self.assertNotIn("test_id_1", limiter.message_counts)