improved explicit requests of missing GXS ids, also fixing situations where empty peers lists would wipe out pending peer lists

This commit is contained in:
csoler 2016-07-15 16:59:57 -04:00
parent 8ccfee20bb
commit 90dfc6e14a
7 changed files with 134 additions and 63 deletions

View file

@ -168,7 +168,7 @@ bool DistributedChatService::handleRecvChatLobbyMsgItem(RsChatMsgItem *ci)
if(it == _chat_lobbys.end()) if(it == _chat_lobbys.end())
{ {
#ifdef DEBUG_CHAT_LOBBIES #ifdef DEBUG_CHAT_LOBBIES
std::cerr << "Chatlobby for id " << std::hex << item->lobby_id << " has no record. Dropping the msg." << std::dec << std::endl; std::cerr << "Chatlobby for id " << std::hex << cli->lobby_id << " has no record. Dropping the msg." << std::dec << std::endl;
#endif #endif
return false; return false;
} }

View file

@ -2832,12 +2832,15 @@ void RsGenExchange::processRecvdMessages()
void RsGenExchange::processRecvdGroups() void RsGenExchange::processRecvdGroups()
{ {
RS_STACK_MUTEX(mGenMtx) ; RS_STACK_MUTEX(mGenMtx) ;
if(mReceivedGrps.empty()) if(mReceivedGrps.empty())
return; return;
NxsGrpPendValidVect::iterator vit = mReceivedGrps.begin(); #ifdef GEN_EXCH_DEBUG
std::cerr << "RsGenExchange::Processing received groups" << std::endl;
#endif
NxsGrpPendValidVect::iterator vit = mReceivedGrps.begin();
std::vector<RsGxsGroupId> existingGrpIds; std::vector<RsGxsGroupId> existingGrpIds;
std::list<RsGxsGroupId> grpIds; std::list<RsGxsGroupId> grpIds;

View file

@ -96,7 +96,6 @@
* as these will be used very frequently. * as these will be used very frequently.
*****/ *****/
typedef RsPeerId PeerId; // SHOULD BE REMOVED => RsPeerId (SSLID)
typedef PGPIdType RsPgpId; typedef PGPIdType RsPgpId;
/* Identity Interface for GXS Message Verification. /* Identity Interface for GXS Message Verification.
@ -150,7 +149,7 @@ public:
* @param keyref the KeyRef of the key being requested * @param keyref the KeyRef of the key being requested
* @return will * @return will
*/ */
virtual bool requestKey(const RsGxsId &id, const std::list<PeerId> &peers) = 0; virtual bool requestKey(const RsGxsId &id, const std::list<RsPeerId> &peers) = 0;
virtual bool requestPrivateKey(const RsGxsId &id) = 0; virtual bool requestPrivateKey(const RsGxsId &id) = 0;

View file

@ -220,16 +220,16 @@
NXS_NET_DEBUG_3 publish key exchange NXS_NET_DEBUG_3 publish key exchange
NXS_NET_DEBUG_4 vetting NXS_NET_DEBUG_4 vetting
NXS_NET_DEBUG_5 summary of transactions (useful to just know what comes in/out) NXS_NET_DEBUG_5 summary of transactions (useful to just know what comes in/out)
NXS_NET_DEBUG_6 NXS_NET_DEBUG_6 group sync statistics (e.g. number of posts at nighbour nodes, etc)
NXS_NET_DEBUG_7 encryption/decryption of transactions NXS_NET_DEBUG_7 encryption/decryption of transactions
***/ ***/
//#define NXS_NET_DEBUG_0 1 #define NXS_NET_DEBUG_0 1
//#define NXS_NET_DEBUG_1 1 //#define NXS_NET_DEBUG_1 1
//#define NXS_NET_DEBUG_2 1 //#define NXS_NET_DEBUG_2 1
//#define NXS_NET_DEBUG_3 1 //#define NXS_NET_DEBUG_3 1
//#define NXS_NET_DEBUG_4 1 //#define NXS_NET_DEBUG_4 1
//#define NXS_NET_DEBUG_5 1 #define NXS_NET_DEBUG_5 1
//#define NXS_NET_DEBUG_6 1 //#define NXS_NET_DEBUG_6 1
//#define NXS_NET_DEBUG_7 1 //#define NXS_NET_DEBUG_7 1
@ -266,8 +266,8 @@ static const uint32_t RS_NXS_ITEM_ENCRYPTION_STATUS_GXS_KEY_MISSING = 0x05 ;
|| defined(NXS_NET_DEBUG_4) || defined(NXS_NET_DEBUG_5) || defined(NXS_NET_DEBUG_6) || defined(NXS_NET_DEBUG_7) || defined(NXS_NET_DEBUG_4) || defined(NXS_NET_DEBUG_5) || defined(NXS_NET_DEBUG_6) || defined(NXS_NET_DEBUG_7)
static const RsPeerId peer_to_print = RsPeerId(std::string("")) ; static const RsPeerId peer_to_print = RsPeerId(std::string("")) ;
static const RsGxsGroupId group_id_to_print = RsGxsGroupId(std::string("87c769d3ffeafdc4433f557d50cdf2e8" )) ; // use this to allow to this group id only, or "" for all IDs static const RsGxsGroupId group_id_to_print = RsGxsGroupId(std::string("30501fac090b65f5b9def169598b53ed" )) ; // use this to allow to this group id only, or "" for all IDs
static const uint32_t service_to_print = 0x215 ; // use this to allow to this service id only, or 0 for all services static const uint32_t service_to_print = 0x211 ; // use this to allow to this service id only, or 0 for all services
// warning. Numbers should be SERVICE IDS (see serialiser/rsserviceids.h. E.g. 0x0215 for forums) // warning. Numbers should be SERVICE IDS (see serialiser/rsserviceids.h. E.g. 0x0215 for forums)
class nullstream: public std::ostream {}; class nullstream: public std::ostream {};
@ -4610,7 +4610,13 @@ void RsGxsNetService::setSyncAge(uint32_t /* age */)
int RsGxsNetService::requestGrp(const std::list<RsGxsGroupId>& grpId, const RsPeerId& peerId) int RsGxsNetService::requestGrp(const std::list<RsGxsGroupId>& grpId, const RsPeerId& peerId)
{ {
RS_STACK_MUTEX(mNxsMutex) ; RS_STACK_MUTEX(mNxsMutex) ;
mExplicitRequest[peerId].assign(grpId.begin(), grpId.end()); #ifdef NXS_NET_DEBUG_0
GXSNETDEBUG_P_(peerId) << "RsGxsNetService::requestGrp(): adding explicit group requests to peer " << peerId << std::endl;
for(std::list<RsGxsGroupId>::const_iterator it(grpId.begin());it!=grpId.end();++it)
GXSNETDEBUG_PG(peerId,*it) << " Group ID: " << *it << std::endl;
#endif
mExplicitRequest[peerId].assign(grpId.begin(), grpId.end());
return 1; return 1;
} }
@ -4622,7 +4628,10 @@ void RsGxsNetService::processExplicitGroupRequests()
for(; cit != mExplicitRequest.end(); ++cit) for(; cit != mExplicitRequest.end(); ++cit)
{ {
const RsPeerId& peerId = cit->first; #ifdef NXS_NET_DEBUG_0
GXSNETDEBUG_P_(cit->first) << "RsGxsNetService::sending pending explicit group requests to peer " << cit->first << std::endl;
#endif
const RsPeerId& peerId = cit->first;
const std::list<RsGxsGroupId>& groupIdList = cit->second; const std::list<RsGxsGroupId>& groupIdList = cit->second;
std::list<RsNxsItem*> grpSyncItems; std::list<RsNxsItem*> grpSyncItems;
@ -4630,7 +4639,10 @@ void RsGxsNetService::processExplicitGroupRequests()
uint32_t transN = locked_getTransactionId(); uint32_t transN = locked_getTransactionId();
for(; git != groupIdList.end(); ++git) for(; git != groupIdList.end(); ++git)
{ {
RsNxsSyncGrpItem* item = new RsNxsSyncGrpItem(mServType); #ifdef NXS_NET_DEBUG_0
GXSNETDEBUG_PG(peerId,*git) << " group request for grp ID " << *git << " to peer " << peerId << std::endl;
#endif
RsNxsSyncGrpItem* item = new RsNxsSyncGrpItem(mServType);
item->grpId = *git; item->grpId = *git;
item->PeerId(peerId); item->PeerId(peerId);
item->flag = RsNxsSyncGrpItem::FLAG_REQUEST; item->flag = RsNxsSyncGrpItem::FLAG_REQUEST;

View file

@ -1073,7 +1073,7 @@ bool p3GxsCircles::locked_processLoadingCacheEntry(RsGxsCircleCache& cache)
} }
else else
{ {
std::list<PeerId> peers; std::list<RsPeerId> peers;
if(!cache.mOriginator.isNull()) if(!cache.mOriginator.isNull())
{ {

View file

@ -50,6 +50,7 @@
* #define DEBUG_OPINION 1 * #define DEBUG_OPINION 1
* #define GXSID_GEN_DUMMY_DATA 1 * #define GXSID_GEN_DUMMY_DATA 1
****/ ****/
#define DEBUG_IDS 1
#define ID_REQUEST_LIST 0x0001 #define ID_REQUEST_LIST 0x0001
#define ID_REQUEST_IDENTITY 0x0002 #define ID_REQUEST_IDENTITY 0x0002
@ -700,16 +701,25 @@ bool p3IdService::havePrivateKey(const RsGxsId &id)
return mKeyCache.is_cached(id) ; return mKeyCache.is_cached(id) ;
} }
bool p3IdService::requestKey(const RsGxsId &id, const std::list<PeerId> &peers) bool p3IdService::requestKey(const RsGxsId &id, const std::list<RsPeerId>& peers)
{ {
if (haveKey(id)) if (haveKey(id))
return true; return true;
else else
{ {
if(isPendingNetworkRequest(id)) RsStackMutex stack(mIdMtx); /********** STACK LOCKED MTX ******/
return true;
}
std::map<RsGxsId,std::list<RsPeerId> >::iterator rit = mIdsNotPresent.find(id) ;
if(rit != mIdsNotPresent.end())
{
locked_mergeIdsToRequestFromNet(id,peers) ;
return true ;
}
}
if(peers.empty())
std::cerr << "(EE) empty peer list in ID request from net. This is bound to fail!" << std::endl;
return cache_request_load(id, peers); return cache_request_load(id, peers);
} }
@ -725,6 +735,27 @@ bool p3IdService::isPendingNetworkRequest(const RsGxsId& gxsId)
return false; return false;
} }
void p3IdService::locked_mergeIdsToRequestFromNet(const RsGxsId& id,const std::list<RsPeerId>& peers)
{
// merge the two lists (I use a std::set to make it more efficient)
std::cerr << "p3IdService::requestKey(): merging list with existing pending request." << std::endl;
std::list<RsPeerId>& old_peers(mIdsNotPresent[id]) ; // create if necessary
std::set<RsPeerId> new_peers ;
for(std::list<RsPeerId>::const_iterator it(peers.begin());it!=peers.end();++it)
new_peers.insert(*it) ;
for(std::list<RsPeerId>::iterator it(old_peers.begin());it!=old_peers.end();++it)
new_peers.insert(*it) ;
old_peers.clear();
for(std::set<RsPeerId>::iterator it(new_peers.begin());it!=new_peers.end();++it)
old_peers.push_back(*it) ;
}
bool p3IdService::getKey(const RsGxsId &id, RsTlvPublicRSAKey &key) bool p3IdService::getKey(const RsGxsId &id, RsTlvPublicRSAKey &key)
{ {
{ {
@ -918,7 +949,7 @@ bool p3IdService::haveReputation(const RsGxsId &id)
return haveKey(id); return haveKey(id);
} }
bool p3IdService::loadReputation(const RsGxsId &id, const std::list<PeerId>& peers) bool p3IdService::loadReputation(const RsGxsId &id, const std::list<RsPeerId>& peers)
{ {
if (haveKey(id)) if (haveKey(id))
return true; return true;
@ -2010,11 +2041,10 @@ bool p3IdService::cache_store(const RsGxsIdGroupItem *item)
#define MIN_CYCLE_GAP 2 #define MIN_CYCLE_GAP 2
bool p3IdService::cache_request_load(const RsGxsId &id, const std::list<PeerId>& peers) bool p3IdService::cache_request_load(const RsGxsId &id, const std::list<RsPeerId> &peers)
{ {
#ifdef DEBUG_IDS #ifdef DEBUG_IDS
std::cerr << "p3IdService::cache_request_load(" << id << ")"; std::cerr << "p3IdService::cache_request_load(" << id << ")" << std::endl;
std::cerr << std::endl;
#endif // DEBUG_IDS #endif // DEBUG_IDS
{ {
@ -2130,11 +2160,14 @@ bool p3IdService::cache_load_for_token(uint32_t token)
// now store identities that aren't present // now store identities that aren't present
RsStackMutex stack(mIdMtx); RsStackMutex stack(mIdMtx);
mIdsNotPresent.insert(mPendingCache.begin(), mPendingCache.end());
for(std::map<RsGxsId,std::list<RsPeerId> >::const_iterator itt(mPendingCache.begin());itt!=mPendingCache.end();++itt)
locked_mergeIdsToRequestFromNet(itt->first,itt->second) ;
mPendingCache.clear(); mPendingCache.clear();
if(!mIdsNotPresent.empty()) if(!mIdsNotPresent.empty())
schedule_now(GXSID_EVENT_REQUEST_IDS); schedule_now(GXSID_EVENT_REQUEST_IDS);
} }
} }
@ -2150,46 +2183,69 @@ bool p3IdService::cache_load_for_token(uint32_t token)
void p3IdService::requestIdsFromNet() void p3IdService::requestIdsFromNet()
{ {
RsStackMutex stack(mIdMtx); RsStackMutex stack(mIdMtx);
std::map<RsGxsId, std::list<RsPeerId> >::const_iterator cit; if(!mNes)
std::map<RsPeerId, std::list<RsGxsId> > requests; {
std::cerr << "(WW) cannot request missing GXS IDs because network service is not present." << std::endl;
return ;
}
// transform to appropriate structure (<peer, std::list<RsGxsId> > map) to make request to nes std::map<RsGxsId, std::list<RsPeerId> >::iterator cit;
for(cit = mIdsNotPresent.begin(); cit != mIdsNotPresent.end(); ++cit) std::map<RsPeerId, std::list<RsGxsId> > requests;
{
{ // Transform to appropriate structure (<peer, std::list<RsGxsId> > map) to make request to nes per peer ID
// Only delete entries in mIdsNotPresent that can actually be performed.
for(cit = mIdsNotPresent.begin(); cit != mIdsNotPresent.end();)
{
#ifdef DEBUG_IDS #ifdef DEBUG_IDS
std::cerr << "p3IdService::requestIdsFromNet() Id not found, deferring for net request: "; std::cerr << "p3IdService::requestIdsFromNet() Id not found, deferring for net request: " << cit->first << std::endl;
std::cerr << cit->first; #endif
std::cerr << std::endl;
#endif // DEBUG_IDS
} const std::list<RsPeerId>& peers = cit->second;
std::list<RsPeerId>::const_iterator cit2;
const std::list<RsPeerId>& peers = cit->second; bool request_can_proceed = false ;
std::list<RsPeerId>::const_iterator cit2;
for(cit2 = peers.begin(); cit2 != peers.end(); ++cit2)
requests[*cit2].push_back(cit->first);
}
std::map<RsPeerId, std::list<RsGxsId> >::const_iterator cit2; for(cit2 = peers.begin(); cit2 != peers.end(); ++cit2)
if(rsPeers->isOnline(*cit2)) // make sure that the peer in online, so that we know that the request has some chance to succeed.
for(cit2 = requests.begin(); cit2 != requests.end(); ++cit2)
{
if(mNes)
{ {
std::list<RsGxsId>::const_iterator gxs_id_it = cit2->second.begin(); requests[*cit2].push_back(cit->first);
std::list<RsGxsGroupId> grpIds; request_can_proceed = true ;
for(; gxs_id_it != cit2->second.end(); ++gxs_id_it) #ifdef DEBUG_IDS
grpIds.push_back(RsGxsGroupId(gxs_id_it->toStdString())); std::cerr << " will ask ID " << cit->first << " to peer ID " << *cit2 << std::endl;
#endif
mNes->requestGrp(grpIds, cit2->first);
} }
if(request_can_proceed)
{
std::map<RsGxsId, std::list<RsPeerId> >::iterator tmp(cit);
++tmp ;
mIdsNotPresent.erase(cit) ;
cit = tmp ;
}
else
{
std::cerr << "(EE) empty online peers list in ID request for groupId " << cit->first << ". This is not going to work! Keeping it until peers show up."<< std::endl;
++cit ;
}
}
for(std::map<RsPeerId, std::list<RsGxsId> >::const_iterator cit2(requests.begin()); cit2 != requests.end(); ++cit2)
{
std::list<RsGxsId>::const_iterator gxs_id_it = cit2->second.begin();
std::list<RsGxsGroupId> grpIds;
for(; gxs_id_it != cit2->second.end(); ++gxs_id_it)
{
#ifdef DEBUG_IDS
std::cerr << " asking ID " << *gxs_id_it << " to peer ID " << cit2->first << std::endl;
#endif
grpIds.push_back(RsGxsGroupId(*gxs_id_it));
} }
mIdsNotPresent.clear(); mNes->requestGrp(grpIds, cit2->first);
}
} }
bool p3IdService::cache_update_if_cached(const RsGxsId &id, std::string serviceString) bool p3IdService::cache_update_if_cached(const RsGxsId &id, std::string serviceString)
@ -2373,7 +2429,7 @@ bool p3IdService::cachetest_handlerequest(uint32_t token)
/* try the cache! */ /* try the cache! */
if (!haveKey(*vit)) if (!haveKey(*vit))
{ {
std::list<PeerId> nullpeers; std::list<RsPeerId> nullpeers;
requestKey(*vit, nullpeers); requestKey(*vit, nullpeers);
#ifdef DEBUG_IDS #ifdef DEBUG_IDS
@ -2606,7 +2662,7 @@ RsGenExchange::ServiceCreate_Return p3IdService::service_CreateGroup(RsGxsGrpIte
// } // }
#ifdef DEBUG_IDS #ifdef DEBUG_IDS
std::cerr << "p3IdService::service_CreateGroup() for : " << item->mMeta.mGroupId; std::cerr << "p3IdService::service_CreateGroup() for : " << item->meta.mGroupId;
std::cerr << std::endl; std::cerr << std::endl;
std::cerr << "p3IdService::service_CreateGroup() Alt GroupId : " << item->meta.mGroupId; std::cerr << "p3IdService::service_CreateGroup() Alt GroupId : " << item->meta.mGroupId;
std::cerr << std::endl; std::cerr << std::endl;
@ -2653,7 +2709,7 @@ RsGenExchange::ServiceCreate_Return p3IdService::service_CreateGroup(RsGxsGrpIte
#ifdef DEBUG_IDS #ifdef DEBUG_IDS
std::cerr << "p3IdService::service_CreateGroup() Calculated PgpIdHash : " << item->group.mPgpIdHash; std::cerr << "p3IdService::service_CreateGroup() Calculated PgpIdHash : " << item->mPgpIdHash;
std::cerr << std::endl; std::cerr << std::endl;
#endif // DEBUG_IDS #endif // DEBUG_IDS
@ -3015,7 +3071,7 @@ bool p3IdService::checkId(const RsGxsIdGroup &grp, RsPgpId &pgpId,bool& error)
#ifdef DEBUG_IDS #ifdef DEBUG_IDS
std::string esign ; std::string esign ;
Radix64::encode((char *) grp.mPgpIdSign.c_str(), grp.mPgpIdSign.length(),esign) ; Radix64::encode((unsigned char *) grp.mPgpIdSign.c_str(), grp.mPgpIdSign.length(),esign) ;
std::cerr << "Checking group signature " << esign << std::endl; std::cerr << "Checking group signature " << esign << std::endl;
#endif #endif
RsPgpId issuer_id ; RsPgpId issuer_id ;

View file

@ -298,7 +298,7 @@ public:
virtual bool getKey(const RsGxsId &id, RsTlvPublicRSAKey &key); virtual bool getKey(const RsGxsId &id, RsTlvPublicRSAKey &key);
virtual bool getPrivateKey(const RsGxsId &id, RsTlvPrivateRSAKey &key); virtual bool getPrivateKey(const RsGxsId &id, RsTlvPrivateRSAKey &key);
virtual bool requestKey(const RsGxsId &id, const std::list<PeerId> &peers); virtual bool requestKey(const RsGxsId &id, const std::list<RsPeerId> &peers);
virtual bool requestPrivateKey(const RsGxsId &id); virtual bool requestPrivateKey(const RsGxsId &id);
@ -343,7 +343,7 @@ private:
*/ */
int cache_tick(); int cache_tick();
bool cache_request_load(const RsGxsId &id, const std::list<PeerId>& peers = std::list<PeerId>()); bool cache_request_load(const RsGxsId &id, const std::list<RsPeerId>& peers = std::list<RsPeerId>());
bool cache_start_load(); bool cache_start_load();
bool cache_load_for_token(uint32_t token); bool cache_load_for_token(uint32_t token);
@ -351,7 +351,8 @@ private:
bool cache_update_if_cached(const RsGxsId &id, std::string serviceString); bool cache_update_if_cached(const RsGxsId &id, std::string serviceString);
bool isPendingNetworkRequest(const RsGxsId& gxsId); bool isPendingNetworkRequest(const RsGxsId& gxsId);
void requestIdsFromNet(); void locked_mergeIdsToRequestFromNet(const RsGxsId& id,const std::list<RsPeerId>& peers);
void requestIdsFromNet();
// Mutex protected. // Mutex protected.