fixed deadlock due to cross-locking RsGxsNetService and RsGxsGenExchange (reported by sss)

This commit is contained in:
cyril soler 2016-12-16 10:03:23 +01:00
parent 1ef11a27fd
commit f8f040bde9
2 changed files with 33 additions and 23 deletions

View File

@ -397,21 +397,33 @@ void RsGxsNetService::processObserverNotifications()
GXSNETDEBUG___ << "Processing observer notification." << std::endl;
#endif
// Observer notifycation should never be done explicitly within a Mutex-protected region, because of the risk
// of causing a cross-deadlock between the observer (RsGxsGenExchange) and the network layer (RsGxsNetService).
std::vector<RsNxsGrp*> grps_copy ;
std::vector<RsNxsMsg*> msgs_copy ;
std::set<RsGxsGroupId> stat_copy ;
std::set<RsGxsGroupId> keys_copy ;
{
RS_STACK_MUTEX(mNxsMutex) ;
grps_copy = mNewGroupsToNotify ;
msgs_copy = mNewMessagesToNotify ;
stat_copy = mNewStatsToNotify ;
keys_copy = mNewPublishKeysToNotify ;
mNewGroupsToNotify.clear() ;
mNewMessagesToNotify.clear() ;
mNewStatsToNotify.clear() ;
mNewPublishKeysToNotify.clear() ;
}
mObserver->notifyNewGroups(grps_copy);
mObserver->notifyNewMessages(msgs_copy);
if(!grps_copy.empty()) mObserver->notifyNewGroups (grps_copy);
if(!msgs_copy.empty()) mObserver->notifyNewMessages(msgs_copy);
for(std::set<RsGxsGroupId>::const_iterator it(keys_copy.begin());it!=keys_copy.end();++it) mObserver->notifyReceivePublishKey(*it);
for(std::set<RsGxsGroupId>::const_iterator it(stat_copy.begin());it!=stat_copy.end();++it) mObserver->notifyChangedGroupStats(*it);
}
void RsGxsNetService::rejectMessage(const RsGxsMessageId& msg_id)
@ -428,7 +440,7 @@ void RsGxsNetService::cleanRejectedMessages()
#ifdef NXS_NET_DEBUG_0
GXSNETDEBUG___ << "Cleaning rejected messages." << std::endl;
#endif
for(std::map<RsGxsMessageId,time_t>::iterator it(mRejectedMessages.begin());it!=mRejectedMessages.end();)
if(it->second + REJECTED_MESSAGE_RETRY_DELAY < now)
{
@ -809,28 +821,23 @@ void RsGxsNetService::handleRecvSyncGrpStatistics(RsNxsSyncGrpStatsItem *grs)
sendItem(grs_resp) ;
}
else if(grs->request_type == RsNxsSyncGrpStatsItem::GROUP_INFO_TYPE_RESPONSE)
{
{
#ifdef NXS_NET_DEBUG_6
GXSNETDEBUG_PG(grs->PeerId(),grs->grpId) << "Received Grp update stats item from peer " << grs->PeerId() << " for group " << grs->grpId << ", reporting " << grs->number_of_posts << " posts." << std::endl;
GXSNETDEBUG_PG(grs->PeerId(),grs->grpId) << "Received Grp update stats item from peer " << grs->PeerId() << " for group " << grs->grpId << ", reporting " << grs->number_of_posts << " posts." << std::endl;
#endif
bool should_notify = false ;
{
RS_STACK_MUTEX(mNxsMutex) ;
RsGxsGrpConfig& rec(mServerGrpConfigMap[grs->grpId]) ;
RS_STACK_MUTEX(mNxsMutex) ;
RsGxsGrpConfig& rec(mServerGrpConfigMap[grs->grpId]) ;
uint32_t old_count = rec.max_visible_count ;
uint32_t old_suppliers_count = rec.suppliers.ids.size() ;
uint32_t old_count = rec.max_visible_count ;
uint32_t old_suppliers_count = rec.suppliers.ids.size() ;
rec.suppliers.ids.insert(grs->PeerId()) ;
rec.max_visible_count = std::max(rec.max_visible_count,grs->number_of_posts) ;
rec.update_TS = time(NULL) ;
rec.suppliers.ids.insert(grs->PeerId()) ;
rec.max_visible_count = std::max(rec.max_visible_count,grs->number_of_posts) ;
rec.update_TS = time(NULL) ;
if (old_count != rec.max_visible_count || old_suppliers_count != rec.suppliers.ids.size())
should_notify = true ;
}
if(should_notify)
mObserver->notifyChangedGroupStats(grs->grpId);
}
if (old_count != rec.max_visible_count || old_suppliers_count != rec.suppliers.ids.size())
mNewStatsToNotify.insert(grs->grpId) ;
}
else
std::cerr << "(EE) RsGxsNetService::handleRecvSyncGrpStatistics(): unknown item type " << grs->request_type << " found. This is a bug." << std::endl;
}
@ -2701,7 +2708,7 @@ void RsGxsNetService::locked_genReqMsgTransaction(NxsTransaction* tr)
gnsr.max_visible_count = std::max(gnsr.max_visible_count, mcount) ;
if (oldVisibleCount != gnsr.max_visible_count || oldSuppliersCount != gnsr.suppliers.ids.size())
mObserver->notifyChangedGroupStats(grpId);
mNewStatsToNotify.insert(grpId) ;
#ifdef NXS_NET_DEBUG_1
GXSNETDEBUG_PG(item->PeerId(),grpId) << " grpId = " << grpId << std::endl;
@ -4776,7 +4783,7 @@ void RsGxsNetService::handleRecvPublishKeys(RsNxsGroupPublishKeyItem *item)
#ifdef NXS_NET_DEBUG
GXSNETDEBUG_PG(item->PeerId(),item->grpId)<< " updated database with new publish keys." << std::endl;
#endif
mObserver->notifyReceivePublishKey(item->grpId);
mNewPublishKeysToNotify.insert(item->grpId) ;
}
else
{

View File

@ -564,9 +564,12 @@ private:
RsServiceInfo mServiceInfo;
std::map<RsGxsMessageId,time_t> mRejectedMessages;
std::vector<RsNxsGrp*> mNewGroupsToNotify ;
std::vector<RsNxsMsg*> mNewMessagesToNotify ;
std::set<RsGxsGroupId> mNewStatsToNotify ;
std::set<RsGxsGroupId> mNewPublishKeysToNotify ;
void debugDump();
};