From f8f040bde989f4e3cee4e4ae3ec2c2d3f6dafbc7 Mon Sep 17 00:00:00 2001 From: cyril soler Date: Fri, 16 Dec 2016 10:03:23 +0100 Subject: [PATCH] fixed deadlock due to cross-locking RsGxsNetService and RsGxsGenExchange (reported by sss) --- libretroshare/src/gxs/rsgxsnetservice.cc | 51 ++++++++++++++---------- libretroshare/src/gxs/rsgxsnetservice.h | 5 ++- 2 files changed, 33 insertions(+), 23 deletions(-) diff --git a/libretroshare/src/gxs/rsgxsnetservice.cc b/libretroshare/src/gxs/rsgxsnetservice.cc index 2e310e8fa..e19052257 100644 --- a/libretroshare/src/gxs/rsgxsnetservice.cc +++ b/libretroshare/src/gxs/rsgxsnetservice.cc @@ -397,21 +397,33 @@ void RsGxsNetService::processObserverNotifications() GXSNETDEBUG___ << "Processing observer notification." << std::endl; #endif + // Observer notifycation should never be done explicitly within a Mutex-protected region, because of the risk + // of causing a cross-deadlock between the observer (RsGxsGenExchange) and the network layer (RsGxsNetService). + std::vector grps_copy ; std::vector msgs_copy ; + std::set stat_copy ; + std::set keys_copy ; { RS_STACK_MUTEX(mNxsMutex) ; grps_copy = mNewGroupsToNotify ; msgs_copy = mNewMessagesToNotify ; + stat_copy = mNewStatsToNotify ; + keys_copy = mNewPublishKeysToNotify ; mNewGroupsToNotify.clear() ; mNewMessagesToNotify.clear() ; + mNewStatsToNotify.clear() ; + mNewPublishKeysToNotify.clear() ; } - mObserver->notifyNewGroups(grps_copy); - mObserver->notifyNewMessages(msgs_copy); + if(!grps_copy.empty()) mObserver->notifyNewGroups (grps_copy); + if(!msgs_copy.empty()) mObserver->notifyNewMessages(msgs_copy); + + for(std::set::const_iterator it(keys_copy.begin());it!=keys_copy.end();++it) mObserver->notifyReceivePublishKey(*it); + for(std::set::const_iterator it(stat_copy.begin());it!=stat_copy.end();++it) mObserver->notifyChangedGroupStats(*it); } void RsGxsNetService::rejectMessage(const RsGxsMessageId& msg_id) @@ -428,7 +440,7 @@ void RsGxsNetService::cleanRejectedMessages() #ifdef NXS_NET_DEBUG_0 GXSNETDEBUG___ << "Cleaning rejected messages." << std::endl; #endif - + for(std::map::iterator it(mRejectedMessages.begin());it!=mRejectedMessages.end();) if(it->second + REJECTED_MESSAGE_RETRY_DELAY < now) { @@ -809,28 +821,23 @@ void RsGxsNetService::handleRecvSyncGrpStatistics(RsNxsSyncGrpStatsItem *grs) sendItem(grs_resp) ; } else if(grs->request_type == RsNxsSyncGrpStatsItem::GROUP_INFO_TYPE_RESPONSE) - { + { #ifdef NXS_NET_DEBUG_6 - GXSNETDEBUG_PG(grs->PeerId(),grs->grpId) << "Received Grp update stats item from peer " << grs->PeerId() << " for group " << grs->grpId << ", reporting " << grs->number_of_posts << " posts." << std::endl; + GXSNETDEBUG_PG(grs->PeerId(),grs->grpId) << "Received Grp update stats item from peer " << grs->PeerId() << " for group " << grs->grpId << ", reporting " << grs->number_of_posts << " posts." << std::endl; #endif - bool should_notify = false ; - { - RS_STACK_MUTEX(mNxsMutex) ; - RsGxsGrpConfig& rec(mServerGrpConfigMap[grs->grpId]) ; + RS_STACK_MUTEX(mNxsMutex) ; + RsGxsGrpConfig& rec(mServerGrpConfigMap[grs->grpId]) ; - uint32_t old_count = rec.max_visible_count ; - uint32_t old_suppliers_count = rec.suppliers.ids.size() ; + uint32_t old_count = rec.max_visible_count ; + uint32_t old_suppliers_count = rec.suppliers.ids.size() ; - rec.suppliers.ids.insert(grs->PeerId()) ; - rec.max_visible_count = std::max(rec.max_visible_count,grs->number_of_posts) ; - rec.update_TS = time(NULL) ; + rec.suppliers.ids.insert(grs->PeerId()) ; + rec.max_visible_count = std::max(rec.max_visible_count,grs->number_of_posts) ; + rec.update_TS = time(NULL) ; - if (old_count != rec.max_visible_count || old_suppliers_count != rec.suppliers.ids.size()) - should_notify = true ; - } - if(should_notify) - mObserver->notifyChangedGroupStats(grs->grpId); - } + if (old_count != rec.max_visible_count || old_suppliers_count != rec.suppliers.ids.size()) + mNewStatsToNotify.insert(grs->grpId) ; + } else std::cerr << "(EE) RsGxsNetService::handleRecvSyncGrpStatistics(): unknown item type " << grs->request_type << " found. This is a bug." << std::endl; } @@ -2701,7 +2708,7 @@ void RsGxsNetService::locked_genReqMsgTransaction(NxsTransaction* tr) gnsr.max_visible_count = std::max(gnsr.max_visible_count, mcount) ; if (oldVisibleCount != gnsr.max_visible_count || oldSuppliersCount != gnsr.suppliers.ids.size()) - mObserver->notifyChangedGroupStats(grpId); + mNewStatsToNotify.insert(grpId) ; #ifdef NXS_NET_DEBUG_1 GXSNETDEBUG_PG(item->PeerId(),grpId) << " grpId = " << grpId << std::endl; @@ -4776,7 +4783,7 @@ void RsGxsNetService::handleRecvPublishKeys(RsNxsGroupPublishKeyItem *item) #ifdef NXS_NET_DEBUG GXSNETDEBUG_PG(item->PeerId(),item->grpId)<< " updated database with new publish keys." << std::endl; #endif - mObserver->notifyReceivePublishKey(item->grpId); + mNewPublishKeysToNotify.insert(item->grpId) ; } else { diff --git a/libretroshare/src/gxs/rsgxsnetservice.h b/libretroshare/src/gxs/rsgxsnetservice.h index 7a013017e..6aae36462 100644 --- a/libretroshare/src/gxs/rsgxsnetservice.h +++ b/libretroshare/src/gxs/rsgxsnetservice.h @@ -564,9 +564,12 @@ private: RsServiceInfo mServiceInfo; std::map mRejectedMessages; + std::vector mNewGroupsToNotify ; std::vector mNewMessagesToNotify ; - + std::set mNewStatsToNotify ; + std::set mNewPublishKeysToNotify ; + void debugDump(); };