Event streaming now happens on an app level, rather than a per-room level. Make eventStreamService manage it's own repolling provided no one calls stop() on it. Couple the stream with eventHandlerService so any controller can just blithely call eventStreamService.resume() and expect to 'get stuff' without having to handle promises (though resume() still returns a promise for that request and proxies it through $q). Kill and reset the stream if you logout.

This commit is contained in:
Kegan Dougal 2014-08-15 13:43:07 +01:00
parent c51cf4efca
commit 7ddb7a5cbb
5 changed files with 76 additions and 19 deletions

View file

@ -19,19 +19,21 @@ limitations under the License.
/*
This service manages where in the event stream the web client currently is and
provides methods to resume/pause/stop the event stream. This service is not
responsible for parsing event data. For that, see the eventDataHandler.
responsible for parsing event data. For that, see the eventHandlerService.
*/
angular.module('eventStreamService', [])
.factory('eventStreamService', ['matrixService', function(matrixService) {
.factory('eventStreamService', ['$q', '$timeout', 'matrixService', 'eventHandlerService', function($q, $timeout, matrixService, eventHandlerService) {
var END = "END";
var START = "START";
var TIMEOUT_MS = 5000;
var ERR_TIMEOUT_MS = 5000;
var settings = {
from: "END",
to: undefined,
limit: undefined,
shouldPoll: true
shouldPoll: true,
isActive: false
};
// interrupts the stream. Only valid if there is a stream conneciton
@ -39,19 +41,69 @@ angular.module('eventStreamService', [])
var interrupt = function(shouldPoll) {
console.log("p[EventStream] interrupt("+shouldPoll+") "+
JSON.stringify(settings));
settings.shouldPoll = shouldPoll;
settings.isActive = false;
};
var saveStreamSettings = function() {
localStorage.setItem("streamSettings", JSON.stringify(settings));
};
var startEventStream = function() {
settings.shouldPoll = true;
settings.isActive = true;
var deferred = $q.defer();
// run the stream from the latest token
matrixService.getEventStream(settings.from, TIMEOUT_MS).then(
function(response) {
if (!settings.isActive) {
console.log("[EventStream] Got response but now inactive. Dropping data.");
return;
}
settings.from = response.data.end;
console.log("[EventStream] Got response from "+settings.from+" to "+response.data.end);
eventHandlerService.handleEvents(response.data.chunk, true);
deferred.resolve(response);
if (settings.shouldPoll) {
$timeout(startEventStream, 0);
}
else {
console.log("[EventStream] Stopping poll.");
}
},
function(error) {
if (error.status == 403) {
settings.shouldPoll = false;
}
deferred.reject(error);
if (settings.shouldPoll) {
$timeout(startEventStream, ERR_TIMEOUT_MS);
}
else {
console.log("[EventStream] Stopping polling.");
}
}
);
return deferred.promise;
};
return {
// resume the stream from whereever it last got up to. Typically used
// when the page is opened.
resume: function() {
if (settings.isActive) {
console.log("[EventStream] Already active, ignoring resume()");
return;
}
console.log("[EventStream] resume "+JSON.stringify(settings));
// run the stream from the latest token
return matrixService.getEventStream(settings.from, TIMEOUT_MS);
return startEventStream();
},
// pause the stream. Resuming it will continue from the current position