Finished Notifications in p3ServiceControl:

- added extra Mutex to make notifications work.
 - Added ServiceControl tick to ServiceServer.
 - switched p3discovery2 from pqiMonitor -> pqiServiceMonitor



git-svn-id: http://svn.code.sf.net/p/retroshare/code/trunk@7201 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
drbob 2014-03-23 03:30:59 +00:00
parent 708f6d6b43
commit a06064e167
6 changed files with 212 additions and 66 deletions

View file

@ -32,7 +32,7 @@ RsServiceControl *rsServiceControl = NULL;
p3ServiceControl::p3ServiceControl(uint32_t configId) p3ServiceControl::p3ServiceControl(uint32_t configId)
:RsServiceControl(), /* p3Config(configId), pqiMonitor(), */ :RsServiceControl(), /* p3Config(configId), pqiMonitor(), */
mCtrlMtx("p3ServiceControl") mCtrlMtx("p3ServiceControl"), mMonitorMtx("P3ServiceControl::Monitor")
{ {
} }
@ -90,7 +90,7 @@ bool p3ServiceControl::deregisterService(uint32_t serviceId)
/* Interface for Services */ /* Interface for Services */
bool p3ServiceControl::registerServiceMonitor(pqiServiceMonitor *monitor, uint32_t serviceId) bool p3ServiceControl::registerServiceMonitor(pqiServiceMonitor *monitor, uint32_t serviceId)
{ {
RsStackMutex stack(mCtrlMtx); /***** LOCK STACK MUTEX ****/ RsStackMutex stack(mMonitorMtx); /***** LOCK STACK MUTEX ****/
std::cerr << "p3ServiceControl::registerServiceMonitor() for ServiceId: "; std::cerr << "p3ServiceControl::registerServiceMonitor() for ServiceId: ";
std::cerr << serviceId; std::cerr << serviceId;
@ -103,7 +103,7 @@ bool p3ServiceControl::registerServiceMonitor(pqiServiceMonitor *monitor, uint32
bool p3ServiceControl::deregisterServiceMonitor(pqiServiceMonitor *monitor) bool p3ServiceControl::deregisterServiceMonitor(pqiServiceMonitor *monitor)
{ {
RsStackMutex stack(mCtrlMtx); /***** LOCK STACK MUTEX ****/ RsStackMutex stack(mMonitorMtx); /***** LOCK STACK MUTEX ****/
std::cerr << "p3ServiceControl::deregisterServiceMonitor()"; std::cerr << "p3ServiceControl::deregisterServiceMonitor()";
std::cerr << std::endl; std::cerr << std::endl;
@ -682,7 +682,7 @@ void p3ServiceControl::removePeer(const RsPeerId &peerId)
{ {
RsStackMutex stack(mCtrlMtx); /***** LOCK STACK MUTEX ****/ RsStackMutex stack(mCtrlMtx); /***** LOCK STACK MUTEX ****/
std::cerr << "p3ServiceControl::removePeer()"; std::cerr << "p3ServiceControl::removePeer() : " << peerId.toStdString();
std::cerr << std::endl; std::cerr << std::endl;
ServicePeerFilter originalFilter; ServicePeerFilter originalFilter;
@ -692,10 +692,18 @@ void p3ServiceControl::removePeer(const RsPeerId &peerId)
fit = mPeerFilterMap.find(peerId); fit = mPeerFilterMap.find(peerId);
if (fit != mPeerFilterMap.end()) if (fit != mPeerFilterMap.end())
{ {
std::cerr << "p3ServiceControl::removePeer() clearing mPeerFilterMap";
std::cerr << std::endl;
hadFilter = true; hadFilter = true;
originalFilter = fit->second; originalFilter = fit->second;
mPeerFilterMap.erase(fit); mPeerFilterMap.erase(fit);
} }
else
{
std::cerr << "p3ServiceControl::removePeer() Nothing in mPeerFilterMap";
std::cerr << std::endl;
}
} }
{ {
@ -703,8 +711,16 @@ void p3ServiceControl::removePeer(const RsPeerId &peerId)
sit = mServicesProvided.find(peerId); sit = mServicesProvided.find(peerId);
if (sit != mServicesProvided.end()) if (sit != mServicesProvided.end())
{ {
std::cerr << "p3ServiceControl::removePeer() clearing mServicesProvided";
std::cerr << std::endl;
mServicesProvided.erase(sit); mServicesProvided.erase(sit);
} }
else
{
std::cerr << "p3ServiceControl::removePeer() Nothing in mServicesProvided";
std::cerr << std::endl;
}
} }
if (hadFilter) if (hadFilter)
@ -720,6 +736,9 @@ void p3ServiceControl::removePeer(const RsPeerId &peerId)
void p3ServiceControl::tick() void p3ServiceControl::tick()
{ {
notifyAboutFriends();
notifyServices();
std::cerr << "p3ServiceControl::tick()"; std::cerr << "p3ServiceControl::tick()";
std::cerr << std::endl; std::cerr << std::endl;
} }
@ -754,8 +773,14 @@ void p3ServiceControl::statusChange(const std::list<pqipeer> &plist)
std::list<pqipeer>::const_iterator pit; std::list<pqipeer>::const_iterator pit;
for(pit = plist.begin(); pit != plist.end(); pit++) for(pit = plist.begin(); pit != plist.end(); pit++)
{ {
std::cerr << "p3ServiceControl::statusChange() for peer: ";
std::cerr << " peer: " << (pit->id).toStdString();
std::cerr << " state: " << pit->state;
std::cerr << " actions: " << pit->actions;
std::cerr << std::endl;
if (pit->state & RS_PEER_S_FRIEND) if (pit->state & RS_PEER_S_FRIEND)
{ {
// Connected / Disconnected. (interal actions).
if (pit->actions & RS_PEER_CONNECTED) if (pit->actions & RS_PEER_CONNECTED)
{ {
updatePeerConnect(pit->id); updatePeerConnect(pit->id);
@ -764,6 +789,19 @@ void p3ServiceControl::statusChange(const std::list<pqipeer> &plist)
{ {
updatePeerDisconnect(pit->id); updatePeerDisconnect(pit->id);
} }
// Added / Removed. (pass on notifications).
if (pit->actions & RS_PEER_NEW)
{
updatePeerNew(pit->id);
}
}
else
{
if (pit->actions & RS_PEER_MOVED)
{
updatePeerRemoved(pit->id);
}
} }
} }
return; return;
@ -786,53 +824,143 @@ void p3ServiceControl::updatePeerDisconnect(const RsPeerId &peerId)
return; return;
} }
// Update Peer status.
void p3ServiceControl::updatePeerNew(const RsPeerId &peerId)
{
std::cerr << "p3ServiceControl::updatePeerNew(): " << peerId.toStdString();
std::cerr << std::endl;
pqiServicePeer peer;
peer.id = peerId;
peer.actions = RS_SERVICE_PEER_NEW;
mFriendNotifications.push_back(peer);
return;
}
void p3ServiceControl::updatePeerRemoved(const RsPeerId &peerId)
{
std::cerr << "p3ServiceControl::updatePeerRemoved(): " << peerId.toStdString();
std::cerr << std::endl;
removePeer(peerId);
pqiServicePeer peer;
peer.id = peerId;
peer.actions = RS_SERVICE_PEER_REMOVED;
mFriendNotifications.push_back(peer);
return;
}
/****************************************************************************/ /****************************************************************************/
/****************************************************************************/ /****************************************************************************/
void p3ServiceControl::notifyAboutFriends()
{
std::list<pqiServicePeer> friendNotifications;
{
RsStackMutex stack(mCtrlMtx); /***** LOCK STACK MUTEX ****/
if (mFriendNotifications.empty())
{
return;
}
std::cerr << "p3ServiceControl::notifyAboutFriends(): Something has changed!";
std::cerr << std::endl;
mFriendNotifications.swap(friendNotifications);
}
{
RsStackMutex stack(mMonitorMtx); /***** LOCK STACK MUTEX ****/
std::multimap<uint32_t, pqiServiceMonitor *>::const_iterator sit;
for(sit = mMonitors.begin(); sit != mMonitors.end(); sit++)
{
sit->second->statusChange(friendNotifications);
}
}
}
void p3ServiceControl::notifyServices() void p3ServiceControl::notifyServices()
{ {
RsStackMutex stack(mCtrlMtx); /***** LOCK STACK MUTEX ****/ std::map<uint32_t, ServiceNotifications> notifications;
std::map<uint32_t, ServiceNotifications>::const_iterator it;
std::multimap<uint32_t, pqiServiceMonitor *>::const_iterator sit, eit;
for(it = mNotifications.begin(); it != mNotifications.end(); it++)
{ {
sit = mMonitors.lower_bound(it->first); RsStackMutex stack(mCtrlMtx); /***** LOCK STACK MUTEX ****/
eit = mMonitors.upper_bound(it->first);
if (sit == eit) if (mNotifications.empty())
{ {
/* nothing to notify - skip */ return;
continue;
} }
std::list<pqiServicePeer> peers; std::cerr << "p3ServiceControl::notifyServices()";
std::set<RsPeerId>::const_iterator pit; std::cerr << std::endl;
for(pit = it->second.mAdded.begin();
pit != it->second.mAdded.end(); pit++) mNotifications.swap(notifications);
}
{
RsStackMutex stack(mMonitorMtx); /***** LOCK STACK MUTEX ****/
std::map<uint32_t, ServiceNotifications>::const_iterator it;
std::multimap<uint32_t, pqiServiceMonitor *>::const_iterator sit, eit;
for(it = notifications.begin(); it != notifications.end(); it++)
{ {
pqiServicePeer peer; std::cerr << "p3ServiceControl::notifyServices(): Notifications for Service: " << it->first;
peer.id = *pit; std::cerr << std::endl;
peer.actions = RS_SERVICE_PEER_CONNECTED;
peers.push_back(peer); sit = mMonitors.lower_bound(it->first);
} eit = mMonitors.upper_bound(it->first);
if (sit == eit)
for(pit = it->second.mRemoved.begin(); {
pit != it->second.mRemoved.end(); pit++) /* nothing to notify - skip */
{ std::cerr << "p3ServiceControl::notifyServices(): Noone Monitoring ... skipping";
pqiServicePeer peer; std::cerr << std::endl;
peer.id = *pit;
peer.actions = RS_SERVICE_PEER_REMOVED; continue;
}
peers.push_back(peer);
} std::list<pqiServicePeer> peers;
std::set<RsPeerId>::const_iterator pit;
for(; sit != eit; sit++) for(pit = it->second.mAdded.begin();
{ pit != it->second.mAdded.end(); pit++)
sit->second->statusChange(peers); {
pqiServicePeer peer;
peer.id = *pit;
peer.actions = RS_SERVICE_PEER_CONNECTED;
peers.push_back(peer);
std::cerr << "p3ServiceControl::notifyServices(): Peer: " << *pit << " CONNECTED";
std::cerr << std::endl;
}
for(pit = it->second.mRemoved.begin();
pit != it->second.mRemoved.end(); pit++)
{
pqiServicePeer peer;
peer.id = *pit;
peer.actions = RS_SERVICE_PEER_DISCONNECTED;
peers.push_back(peer);
std::cerr << "p3ServiceControl::notifyServices(): Peer: " << *pit << " DISCONNECTED";
std::cerr << std::endl;
}
for(; sit != eit; sit++)
{
std::cerr << "p3ServiceControl::notifyServices(): Sending to Monitoring Service";
std::cerr << std::endl;
sit->second->statusChange(peers);
}
} }
} }
mNotifications.clear();
} }

View file

@ -55,7 +55,7 @@ class ServicePeerFilter
std::ostream &operator<<(std::ostream &out, const ServicePeerFilter &filter); std::ostream &operator<<(std::ostream &out, const ServicePeerFilter &filter);
class p3ServiceControl: public RsServiceControl /* , public p3Config, public pqiMonitor */ class p3ServiceControl: public RsServiceControl, public pqiMonitor /* , public p3Config */
{ {
public: public:
@ -124,6 +124,7 @@ virtual bool loadList(std::list<RsItem *>& load);
private: private:
void notifyServices(); void notifyServices();
void notifyAboutFriends();
bool createDefaultPermissions_locked(uint32_t serviceId, std::string serviceName, bool defaultOn); bool createDefaultPermissions_locked(uint32_t serviceId, std::string serviceName, bool defaultOn);
@ -138,6 +139,8 @@ void recordFilterChanges_locked(const RsPeerId &peerId,
void updatePeerConnect(const RsPeerId &peerId); void updatePeerConnect(const RsPeerId &peerId);
void updatePeerDisconnect(const RsPeerId &peerId); void updatePeerDisconnect(const RsPeerId &peerId);
void updatePeerNew(const RsPeerId &peerId);
void updatePeerRemoved(const RsPeerId &peerId);
bool peerHasPermissionForService_locked(const RsPeerId &peerId, uint32_t serviceId); bool peerHasPermissionForService_locked(const RsPeerId &peerId, uint32_t serviceId);
@ -157,8 +160,12 @@ bool peerHasPermissionForService_locked(const RsPeerId &peerId, uint32_t service
// Below here is saved in Configuration. // Below here is saved in Configuration.
std::map<uint32_t, RsServicePermissions> mServicePermissionMap; std::map<uint32_t, RsServicePermissions> mServicePermissionMap;
std::multimap<uint32_t, pqiServiceMonitor *> mMonitors;
std::map<uint32_t, ServiceNotifications> mNotifications; std::map<uint32_t, ServiceNotifications> mNotifications;
std::list<pqiServicePeer> mFriendNotifications;
// Separate mutex here - must not hold both at the same time!
RsMutex mMonitorMtx; /* below is protected */
std::multimap<uint32_t, pqiServiceMonitor *> mMonitors;
}; };

View file

@ -170,6 +170,8 @@ bool p3ServiceServer::sendItem(RsRawItem *item)
int p3ServiceServer::tick() int p3ServiceServer::tick()
{ {
mServiceControl->tick();
RsStackMutex stack(srvMtx); /********* LOCKED *********/ RsStackMutex stack(srvMtx); /********* LOCKED *********/
#ifdef SERVICE_DEBUG #ifdef SERVICE_DEBUG

View file

@ -1604,15 +1604,20 @@ int RsServer::StartupRetroShare()
/**************************************************************************/ /**************************************************************************/
/* need to Monitor too! */ /* need to Monitor too! */
mLinkMgr->addMonitor(serviceInfo);
mLinkMgr->addMonitor(pqih); mLinkMgr->addMonitor(pqih);
mLinkMgr->addMonitor(serviceCtrl);
mLinkMgr->addMonitor(serviceInfo);
// below should be changed to pqiServiceMonitors....
mLinkMgr->addMonitor(mCacheStrapper); mLinkMgr->addMonitor(mCacheStrapper);
mLinkMgr->addMonitor(mDisc);
mLinkMgr->addMonitor(msgSrv); mLinkMgr->addMonitor(msgSrv);
mLinkMgr->addMonitor(mStatusSrv); mLinkMgr->addMonitor(mStatusSrv);
mLinkMgr->addMonitor(chatSrv); mLinkMgr->addMonitor(chatSrv);
mLinkMgr->addMonitor(mBwCtrl); mLinkMgr->addMonitor(mBwCtrl);
// Services that have been changed to pqiServiceMonitor
serviceCtrl->registerServiceMonitor(mDisc, mDisc->getServiceInfo().mServiceType);
/* must also add the controller as a Monitor... /* must also add the controller as a Monitor...
* a little hack to get it to work. * a little hack to get it to work.
*/ */

View file

@ -1063,45 +1063,49 @@ void p3discovery2::recvPGPCertificate(const SSLID &fromId, RsDiscPgpCertItem *it
/************* from pqiMonitor *******************/ /************* from pqiServiceMonitor *******************/
void p3discovery2::statusChange(const std::list<pqipeer> &plist) void p3discovery2::statusChange(const std::list<pqiServicePeer> &plist)
{ {
#ifdef P3DISC_DEBUG //#ifdef P3DISC_DEBUG
std::cerr << "p3discovery2::statusChange()" << std::endl; std::cerr << "p3discovery2::statusChange()" << std::endl;
#endif //#endif
std::list<pqipeer>::const_iterator pit; std::list<pqiServicePeer>::const_iterator pit;
for(pit = plist.begin(); pit != plist.end(); pit++) for(pit = plist.begin(); pit != plist.end(); pit++)
{ {
if ((pit->state & RS_PEER_S_FRIEND) && (pit->actions & RS_PEER_CONNECTED)) if (pit->actions & RS_SERVICE_PEER_CONNECTED)
{ {
#ifdef P3DISC_DEBUG //#ifdef P3DISC_DEBUG
std::cerr << "p3discovery2::statusChange() Starting Disc with: " << pit->id << std::endl; std::cerr << "p3discovery2::statusChange() Starting Disc with: " << pit->id << std::endl;
#endif //#endif
sendOwnContactInfo(pit->id); sendOwnContactInfo(pit->id);
} }
else if (!(pit->state & RS_PEER_S_FRIEND) && (pit->actions & RS_PEER_MOVED)) else if (pit->actions & RS_SERVICE_PEER_DISCONNECTED)
{ {
#ifdef P3DISC_DEBUG std::cerr << "p3discovery2::statusChange() Disconnected: " << pit->id << std::endl;
std::cerr << "p3discovery2::statusChange() Removing Friend: " << pit->id << std::endl;
#endif
removeFriend(pit->id);
} }
else if ((pit->state & RS_PEER_S_FRIEND) && (pit->actions & RS_PEER_NEW))
if (pit->actions & RS_SERVICE_PEER_NEW)
{ {
#ifdef P3DISC_DEBUG //#ifdef P3DISC_DEBUG
std::cerr << "p3discovery2::statusChange() Adding Friend: " << pit->id << std::endl; std::cerr << "p3discovery2::statusChange() Adding Friend: " << pit->id << std::endl;
#endif //#endif
addFriend(pit->id); addFriend(pit->id);
} }
else if (pit->actions & RS_SERVICE_PEER_REMOVED)
{
//#ifdef P3DISC_DEBUG
std::cerr << "p3discovery2::statusChange() Removing Friend: " << pit->id << std::endl;
//#endif
removeFriend(pit->id);
}
} }
#ifdef P3DISC_DEBUG //#ifdef P3DISC_DEBUG
std::cerr << "p3discovery2::statusChange() finished." << std::endl; std::cerr << "p3discovery2::statusChange() finished." << std::endl;
#endif //#endif
return; return;
} }
/*************************************************************************************/ /*************************************************************************************/
/* Extracting Network Graph Details */ /* Extracting Network Graph Details */

View file

@ -34,7 +34,7 @@
#include "pqi/p3linkmgr.h" #include "pqi/p3linkmgr.h"
#include "pqi/p3netmgr.h" #include "pqi/p3netmgr.h"
#include "pqi/pqimonitor.h" #include "pqi/pqiservicemonitor.h"
#include "serialiser/rsdiscovery2items.h" #include "serialiser/rsdiscovery2items.h"
#include "services/p3service.h" #include "services/p3service.h"
#include "pqi/authgpg.h" #include "pqi/authgpg.h"
@ -74,7 +74,7 @@ void mergeFriendList(const std::list<PGPID> &friends);
class p3discovery2: public RsDisc, public p3Service, public pqiMonitor, public AuthGPGService class p3discovery2: public RsDisc, public p3Service, public pqiServiceMonitor, public AuthGPGService
{ {
public: public:
@ -83,9 +83,9 @@ virtual ~p3discovery2();
virtual RsServiceInfo getServiceInfo(); virtual RsServiceInfo getServiceInfo();
/************* from pqiMonitor *******************/ /************* from pqiServiceMonitor *******************/
virtual void statusChange(const std::list<pqipeer> &plist); virtual void statusChange(const std::list<pqiServicePeer> &plist);
/************* from pqiMonitor *******************/ /************* from pqiServiceMonitor *******************/
int tick(); int tick();