More work on pushers. Attempt to do HTTP pokes. Not sure if the actual HTTP pokes work or not yet but the retry semantics are pretty good.

This commit is contained in:
David Baker 2014-11-21 12:21:00 +00:00
parent 58f82e2e54
commit eb6aedf92c
7 changed files with 150 additions and 20 deletions

View File

@ -60,6 +60,25 @@ class SimpleHttpClient(object):
defer.returnValue(json.loads(body))
@defer.inlineCallbacks
def post_json_get_json(self, uri, post_json):
json_str = json.dumps(post_json)
logger.info("HTTP POST %s -> %s", json_str, uri)
response = yield self.agent.request(
"POST",
uri.encode("ascii"),
headers=Headers({
"Content-Type": ["application/json"]
}),
bodyProducer=FileBodyProducer(StringIO(json_str))
)
body = yield readBody(response)
defer.returnValue(json.loads(body))
@defer.inlineCallbacks
def get_json(self, uri, args={}):
""" Get's some json from the given host and path

View File

@ -26,12 +26,15 @@ logger = logging.getLogger(__name__)
class Pusher(object):
INITIAL_BACKOFF = 1000
MAX_BACKOFF = 10 * 60 * 1000
MAX_BACKOFF = 60 * 60 * 1000
GIVE_UP_AFTER = 24 * 60 * 60 * 1000
def __init__(self, _hs, user_name, app, app_display_name, device_display_name, pushkey, data, last_token):
def __init__(self, _hs, user_name, app, app_display_name, device_display_name, pushkey, data,
last_token, last_success, failing_since):
self.hs = _hs
self.evStreamHandler = self.hs.get_handlers().event_stream_handler
self.store = self.hs.get_datastore()
self.clock = self.hs.get_clock()
self.user_name = user_name
self.app = app
self.app_display_name = app_display_name
@ -40,6 +43,7 @@ class Pusher(object):
self.data = data
self.last_token = last_token
self.backoff_delay = Pusher.INITIAL_BACKOFF
self.failing_since = None
@defer.inlineCallbacks
def start(self):
@ -58,17 +62,51 @@ class Pusher(object):
config = PaginationConfig(from_token=from_tok, limit='1')
chunk = yield self.evStreamHandler.get_stream(self.user_name, config, timeout=100*365*24*60*60*1000)
if (self.dispatchPush(chunk['chunk'][0])):
# limiting to 1 may get 1 event plus 1 presence event, so pick out the actual event
singleEvent = None
for c in chunk['chunk']:
if 'event_id' in c: # Hmmm...
singleEvent = c
break
if not singleEvent:
continue
ret = yield self.dispatchPush(singleEvent)
if (ret):
self.backoff_delay = Pusher.INITIAL_BACKOFF
self.last_token = chunk['end']
self.store.update_pusher_last_token(self.user_name, self.pushkey, self.last_token)
self.store.update_pusher_last_token_and_success(self.user_name, self.pushkey,
self.last_token, self.clock.time_msec())
if self.failing_since:
self.failing_since = None
self.store.update_pusher_failing_since(self.user_name, self.pushkey, self.failing_since)
else:
logger.warn("Failed to dispatch push for user %s. Trying again in %dms",
self.user_name, self.backoff_delay)
yield synapse.util.async.sleep(self.backoff_delay / 1000.0)
self.backoff_delay *=2
if self.backoff_delay > Pusher.MAX_BACKOFF:
self.backoff_delay = Pusher.MAX_BACKOFF
if not self.failing_since:
self.failing_since = self.clock.time_msec()
self.store.update_pusher_failing_since(self.user_name, self.pushkey, self.failing_since)
if self.failing_since and self.failing_since < self.clock.time_msec() - Pusher.GIVE_UP_AFTER:
# we really only give up so that if the URL gets fixed, we don't suddenly deliver a load
# of old notifications.
logger.warn("Giving up on a notification to user %s, pushkey %s",
self.user_name, self.pushkey)
self.backoff_delay = Pusher.INITIAL_BACKOFF
self.last_token = chunk['end']
self.store.update_pusher_last_token(self.user_name, self.pushkey, self.last_token)
self.failing_since = None
self.store.update_pusher_failing_since(self.user_name, self.pushkey, self.failing_since)
else:
logger.warn("Failed to dispatch push for user %s (failing for %dms)."
"Trying again in %dms",
self.user_name,
self.clock.time_msec() - self.failing_since,
self.backoff_delay
)
yield synapse.util.async.sleep(self.backoff_delay / 1000.0)
self.backoff_delay *=2
if self.backoff_delay > Pusher.MAX_BACKOFF:
self.backoff_delay = Pusher.MAX_BACKOFF
class PusherConfigException(Exception):

View File

@ -14,13 +14,17 @@
# limitations under the License.
from synapse.push import Pusher, PusherConfigException
from synapse.http.client import SimpleHttpClient
from twisted.internet import defer
import logging
logger = logging.getLogger(__name__)
class HttpPusher(Pusher):
def __init__(self, _hs, user_name, app, app_display_name, device_display_name, pushkey, data, last_token):
def __init__(self, _hs, user_name, app, app_display_name, device_display_name, pushkey, data,
last_token, last_success, failing_since):
super(HttpPusher, self).__init__(_hs,
user_name,
app,
@ -28,12 +32,55 @@ class HttpPusher(Pusher):
device_display_name,
pushkey,
data,
last_token)
last_token,
last_success,
failing_since)
if 'url' not in data:
raise PusherConfigException("'url' required in data for HTTP pusher")
self.url = data['url']
self.httpCli = SimpleHttpClient(self.hs)
self.data_minus_url = {}
self.data_minus_url.update(self.data)
del self.data_minus_url['url']
def _build_notification_dict(self, event):
# we probably do not want to push for every presence update
# (we may want to be able to set up notifications when specific
# people sign in, but we'd want to only deliver the pertinent ones)
# Actually, presence events will not get this far now because we
# need to filter them out in the main Pusher code.
if 'event_id' not in event:
return None
return {
'notification': {
'transition' : 'new', # everything is new for now: we don't have read receipts
'id': event['event_id'],
'type': event['type'],
'from': event['user_id'],
# we may have to fetch this over federation and we can't trust it anyway: is it worth it?
#'fromDisplayName': 'Steve Stevington'
},
#'counts': { -- we don't mark messages as read yet so we have no way of knowing
# 'unread': 1,
# 'missedCalls': 2
# },
'devices': {
self.pushkey: {
'data' : self.data_minus_url
}
}
}
@defer.inlineCallbacks
def dispatchPush(self, event):
print event
return True
notificationDict = self._build_notification_dict(event)
if not notificationDict:
defer.returnValue(True)
try:
yield self.httpCli.post_json_get_json(self.url, notificationDict)
except:
logger.exception("Failed to push %s ", self.url)
defer.returnValue(False)
defer.returnValue(True)

View File

@ -45,7 +45,9 @@ class PusherPool:
"device_display_name": device_display_name,
"pushkey": pushkey,
"data": data,
"last_token": None
"last_token": None,
"last_success": None,
"failing_since": None
})
self._add_pusher_to_store(user_name, kind, app, app_display_name, device_display_name, pushkey, data)
@ -69,7 +71,9 @@ class PusherPool:
device_display_name=pusherdict['device_display_name'],
pushkey=pusherdict['pushkey'],
data=pusherdict['data'],
last_token=pusherdict['last_token']
last_token=pusherdict['last_token'],
last_success=pusherdict['last_success'],
failing_since=pusherdict['failing_since']
)
else:
raise PusherConfigException("Unknown pusher type '%s' for user %s" %

View File

@ -29,7 +29,8 @@ class PusherStore(SQLBaseStore):
@defer.inlineCallbacks
def get_all_pushers_after_id(self, min_id):
sql = (
"SELECT id, user_name, kind, app, app_display_name, device_display_name, pushkey, data, last_token "
"SELECT id, user_name, kind, app, app_display_name, device_display_name, pushkey, data, "
"last_token, last_success, failing_since "
"FROM pushers "
"WHERE id > ?"
)
@ -46,8 +47,9 @@ class PusherStore(SQLBaseStore):
"device_display_name": r[5],
"pushkey": r[6],
"data": r[7],
"last_token": r[8]
"last_token": r[8],
"last_success": r[9],
"failing_since": r[10]
}
for r in rows
]
@ -79,6 +81,20 @@ class PusherStore(SQLBaseStore):
{'last_token': last_token}
)
@defer.inlineCallbacks
def update_pusher_last_token_and_success(self, user_name, pushkey, last_token, last_success):
yield self._simple_update_one(PushersTable.table_name,
{'user_name': user_name, 'pushkey': pushkey},
{'last_token': last_token, 'last_success': last_success}
)
@defer.inlineCallbacks
def update_pusher_failing_since(self, user_name, pushkey, failing_since):
yield self._simple_update_one(PushersTable.table_name,
{'user_name': user_name, 'pushkey': pushkey},
{'failing_since': failing_since}
)
class PushersTable(Table):
table_name = "pushers"
@ -92,7 +108,9 @@ class PushersTable(Table):
"device_display_name",
"pushkey",
"data",
"last_token"
"last_token",
"last_success",
"failing_since"
]
EntryType = collections.namedtuple("PusherEntry", fields)

View File

@ -23,6 +23,8 @@ CREATE TABLE IF NOT EXISTS pushers (
pushkey blob NOT NULL,
data text,
last_token TEXT,
last_success BIGINT,
failing_since BIGINT,
FOREIGN KEY(user_name) REFERENCES users(name),
UNIQUE (user_name, pushkey)
);

View File

@ -23,6 +23,8 @@ CREATE TABLE IF NOT EXISTS pushers (
pushkey blob NOT NULL,
data text,
last_token TEXT,
last_success BIGINT,
failing_since BIGINT,
FOREIGN KEY(user_name) REFERENCES users(name),
UNIQUE (user_name, pushkey)
);