moved notify calls to RsGenExchange out of RsGxsNetService lock zone, to avoid deadlocks

This commit is contained in:
csoler 2015-12-23 11:18:26 -05:00
parent 226948cab5
commit 98b27d6a2b
2 changed files with 42 additions and 2 deletions

View File

@ -320,6 +320,18 @@ int RsGxsNetService::tick()
if(receivedItems()) if(receivedItems())
recvNxsItemQueue(); recvNxsItemQueue();
bool should_notify = false;
{
RS_STACK_MUTEX(mNxsMutex) ;
should_notify = should_notify || !mNewGroupsToNotify.empty() ;
should_notify = should_notify || !mNewMessagesToNotify.empty() ;
}
if(should_notify)
processObserverNotifications() ;
time_t now = time(NULL); time_t now = time(NULL);
time_t elapsed = mSYNC_PERIOD + mSyncTs; time_t elapsed = mSYNC_PERIOD + mSyncTs;
@ -346,6 +358,29 @@ int RsGxsNetService::tick()
return 1; return 1;
} }
void RsGxsNetService::processObserverNotifications()
{
#ifdef NXS_NET_DEBUG_0
GXSNETDEBUG___ << "Processing observer notification." << std::endl;
#endif
std::vector<RsNxsGrp*> grps_copy ;
std::vector<RsNxsMsg*> msgs_copy ;
{
RS_STACK_MUTEX(mNxsMutex) ;
grps_copy = mNewGroupsToNotify ;
msgs_copy = mNewMessagesToNotify ;
mNewGroupsToNotify.clear() ;
mNewMessagesToNotify.clear() ;
}
mObserver->notifyNewGroups(grps_copy);
mObserver->notifyNewMessages(msgs_copy);
}
void RsGxsNetService::rejectMessage(const RsGxsMessageId& msg_id) void RsGxsNetService::rejectMessage(const RsGxsMessageId& msg_id)
{ {
RS_STACK_MUTEX(mNxsMutex) ; RS_STACK_MUTEX(mNxsMutex) ;
@ -2170,7 +2205,8 @@ void RsGxsNetService::locked_processCompletedIncomingTrans(NxsTransaction* tr)
GXSNETDEBUG_PG(tr->mTransaction->PeerId(),grps[i]->grpId) ; GXSNETDEBUG_PG(tr->mTransaction->PeerId(),grps[i]->grpId) ;
#endif #endif
// notify listener of grps // notify listener of grps
mObserver->notifyNewGroups(grps); for(uint32_t i=0;i<grps.size();++i)
mNewGroupsToNotify.push_back(grps[i]) ;
// now note this as the latest you've received from this peer // now note this as the latest you've received from this peer
RsPeerId peerFrom = tr->mTransaction->PeerId(); RsPeerId peerFrom = tr->mTransaction->PeerId();
@ -2250,7 +2286,8 @@ void RsGxsNetService::locked_processCompletedIncomingTrans(NxsTransaction* tr)
GXSNETDEBUG_PG(tr->mTransaction->PeerId(),grpId) << " " << msgs[i]->msgId << std::endl ; GXSNETDEBUG_PG(tr->mTransaction->PeerId(),grpId) << " " << msgs[i]->msgId << std::endl ;
#endif #endif
// notify listener of msgs // notify listener of msgs
mObserver->notifyNewMessages(msgs); for(uint32_t i=0;i<msgs.size();++i)
mNewMessagesToNotify.push_back(msgs[i]) ;
// now note that this is the latest you've received from this peer // now note that this is the latest you've received from this peer
// for the grp id // for the grp id

View File

@ -449,6 +449,7 @@ private:
void locked_stampPeerGroupUpdateTime(const RsPeerId& pid,const RsGxsGroupId& grpId,time_t tm,uint32_t n_messages) ; void locked_stampPeerGroupUpdateTime(const RsPeerId& pid,const RsGxsGroupId& grpId,time_t tm,uint32_t n_messages) ;
void cleanRejectedMessages(); void cleanRejectedMessages();
void processObserverNotifications();
private: private:
@ -526,6 +527,8 @@ private:
RsServiceInfo mServiceInfo; RsServiceInfo mServiceInfo;
std::map<RsGxsMessageId,time_t> mRejectedMessages; std::map<RsGxsMessageId,time_t> mRejectedMessages;
std::vector<RsNxsGrp*> mNewGroupsToNotify ;
std::vector<RsNxsMsg*> mNewMessagesToNotify ;
void debugDump(); void debugDump();
}; };