diff --git a/libretroshare/src/gxs/rsgxsnetservice.cc b/libretroshare/src/gxs/rsgxsnetservice.cc index 7962f9bf3..2a17b54d9 100644 --- a/libretroshare/src/gxs/rsgxsnetservice.cc +++ b/libretroshare/src/gxs/rsgxsnetservice.cc @@ -269,6 +269,7 @@ NXS_NET_DEBUG_5 summary of transactions (useful to just know what comes in/out) NXS_NET_DEBUG_6 group sync statistics (e.g. number of posts at nighbour nodes, etc) NXS_NET_DEBUG_7 encryption/decryption of transactions + NXS_NET_DEBUG_8 gxs distant sync ***/ //#define NXS_NET_DEBUG_0 1 @@ -279,6 +280,7 @@ //#define NXS_NET_DEBUG_5 1 //#define NXS_NET_DEBUG_6 1 //#define NXS_NET_DEBUG_7 1 +#define NXS_NET_DEBUG_8 1 //#define NXS_FRAG @@ -312,11 +314,12 @@ static const uint32_t RS_NXS_ITEM_ENCRYPTION_STATUS_GXS_KEY_MISSING = 0x05 ; // Debug system to allow to print only for some IDs (group, Peer, etc) #if defined(NXS_NET_DEBUG_0) || defined(NXS_NET_DEBUG_1) || defined(NXS_NET_DEBUG_2) || defined(NXS_NET_DEBUG_3) \ - || defined(NXS_NET_DEBUG_4) || defined(NXS_NET_DEBUG_5) || defined(NXS_NET_DEBUG_6) || defined(NXS_NET_DEBUG_7) + || defined(NXS_NET_DEBUG_4) || defined(NXS_NET_DEBUG_5) || defined(NXS_NET_DEBUG_6) || defined(NXS_NET_DEBUG_7) \ + || defined(NXS_NET_DEBUG_8) static const RsPeerId peer_to_print = RsPeerId(std::string("")) ; static const RsGxsGroupId group_id_to_print = RsGxsGroupId(std::string("")) ; // use this to allow to this group id only, or "" for all IDs -static const uint32_t service_to_print = RS_SERVICE_TYPE_GXS_TRANS ; // use this to allow to this service id only, or 0 for all services +static const uint32_t service_to_print = RS_SERVICE_GXS_TYPE_CHANNELS ; // use this to allow to this service id only, or 0 for all services // warning. Numbers should be SERVICE IDS (see serialiser/rsserviceids.h. E.g. 0x0215 for forums) class nullstream: public std::ostream {}; @@ -448,6 +451,7 @@ int RsGxsNetService::tick() { syncWithPeers(); syncGrpStatistics(); + checkDistantSyncState(); mSyncTs = now; } @@ -566,7 +570,6 @@ void RsGxsNetService::syncWithPeers() std::set peers; mNetMgr->getOnlineList(mServiceInfo.mServiceType, peers); -#ifdef TODO if(mAllowDistSync) { // Grab all online virtual peers of distant tunnels for the current service. @@ -575,9 +578,8 @@ void RsGxsNetService::syncWithPeers() mGxsNetTunnel->getVirtualPeers(mServType,vpids); for(auto it(vpids.begin());it!=vpids.end();++it) - peers.push_back(RsPeerId(*it)) ; + peers.insert(RsPeerId(*it)) ; } -#endif if (peers.empty()) { // nothing to do @@ -735,6 +737,62 @@ void RsGxsNetService::syncWithPeers() #endif } +void RsGxsNetService::checkDistantSyncState() +{ + if(!mAllowDistSync) + return ; + + RsGxsGrpMetaTemporaryMap grpMeta; + mDataStore->retrieveGxsGrpMetaData(grpMeta); + + // Go through group statistics and groups without information are re-requested to random peers selected + // among the ones who provided the group info. + +#ifdef NXS_NET_DEBUG_8 + GXSNETDEBUG___<< "Checking distant sync for all groups." << std::endl; +#endif + // get the list of online peers + + std::set online_peers; + mNetMgr->getOnlineList(mServiceInfo.mServiceType , online_peers); + + uint16_t service_id = ((mServiceInfo.mServiceType >> 8)& 0xffff); + + RS_STACK_MUTEX(mNxsMutex) ; + + for(auto it(grpMeta.begin());it!=grpMeta.end();++it) + if(it->second->mSubscribeFlags & GXS_SERV::GROUP_SUBSCRIBE_SUBSCRIBED) // we only consider subscribed groups here. + { +#warning (cyril) We might need to also remove peers for recently unsubscribed groups + const RsGxsGroupId& grpId(it->first); + const RsGxsGrpConfig& rec = locked_getGrpConfig(grpId) ; + +#ifdef NXS_NET_DEBUG_6 + GXSNETDEBUG__G(it->first) << " group " << grpId; +#endif + bool at_least_one_friend_is_supplier = false ; + + for(auto it2(rec.suppliers.ids.begin());it2!=rec.suppliers.ids.end() && !at_least_one_friend_is_supplier;++it2) + if(online_peers.find(*it2) != online_peers.end()) // check that the peer is online + at_least_one_friend_is_supplier = true ; + + if(at_least_one_friend_is_supplier) + { + mGxsNetTunnel->releasePeers(service_id,grpId); +#ifdef NXS_NET_DEBUG_8 + GXSNETDEBUG___<< " Group " << grpId << ": suppliers among friends. Releasing peers." << std::endl; +#endif + } + else + { + mGxsNetTunnel->requestPeers(service_id,grpId); +#ifdef NXS_NET_DEBUG_8 + GXSNETDEBUG___<< " Group " << grpId << ": no suppliers among friends. Requesting peers." << std::endl; +#endif + } + } +} + void RsGxsNetService::syncGrpStatistics() { RS_STACK_MUTEX(mNxsMutex) ; @@ -763,44 +821,44 @@ void RsGxsNetService::syncGrpStatistics() #endif if(rec.statistics_update_TS + GROUP_STATS_UPDATE_DELAY < now && rec.suppliers.ids.size() > 0) - { + { #ifdef NXS_NET_DEBUG_6 - GXSNETDEBUG__G(it->first) << " needs update. Randomly asking to some friends" << std::endl; + GXSNETDEBUG__G(it->first) << " needs update. Randomly asking to some friends" << std::endl; #endif - // randomly select GROUP_STATS_UPDATE_NB_PEERS friends among the suppliers of this group + // randomly select GROUP_STATS_UPDATE_NB_PEERS friends among the suppliers of this group - uint32_t n = RSRandom::random_u32() % rec.suppliers.ids.size() ; + uint32_t n = RSRandom::random_u32() % rec.suppliers.ids.size() ; - std::set::const_iterator rit = rec.suppliers.ids.begin(); - for(uint32_t i=0;i::const_iterator rit = rec.suppliers.ids.begin(); + for(uint32_t i=0;ifirst) << " asking friend " << peer_id << " for an update of stats for group " << it->first << std::endl; + GXSNETDEBUG_PG(peer_id,it->first) << " asking friend " << peer_id << " for an update of stats for group " << it->first << std::endl; #endif - RsNxsSyncGrpStatsItem *grs = new RsNxsSyncGrpStatsItem(mServType) ; + RsNxsSyncGrpStatsItem *grs = new RsNxsSyncGrpStatsItem(mServType) ; - grs->request_type = RsNxsSyncGrpStatsItem::GROUP_INFO_TYPE_REQUEST ; + grs->request_type = RsNxsSyncGrpStatsItem::GROUP_INFO_TYPE_REQUEST ; - grs->grpId = it->first ; - grs->PeerId(peer_id) ; + grs->grpId = it->first ; + grs->PeerId(peer_id) ; - sendItem(grs) ; + sendItem(grs) ; + } } - } - } + } #ifdef NXS_NET_DEBUG_6 else GXSNETDEBUG__G(it->first) << " up to date." << std::endl; diff --git a/libretroshare/src/gxs/rsgxsnetservice.h b/libretroshare/src/gxs/rsgxsnetservice.h index 49d44dab2..8142f2e74 100644 --- a/libretroshare/src/gxs/rsgxsnetservice.h +++ b/libretroshare/src/gxs/rsgxsnetservice.h @@ -35,6 +35,7 @@ #include "pqi/p3linkmgr.h" #include "rsitems/rsnxsitems.h" #include "rsitems/rsgxsupdateitems.h" +#include "rsgxsnettunnel.h" #include "rsgxsnetutils.h" #include "pqi/p3cfgmgr.h" #include "rsgixs.h" @@ -394,6 +395,7 @@ private: void locked_pushGrpRespFromList(std::list& respList, const RsPeerId& peer, const uint32_t& transN); void locked_pushMsgRespFromList(std::list& itemL, const RsPeerId& sslId, const RsGxsGroupId &grp_id, const uint32_t& transN); + void checkDistantSyncState(); void syncWithPeers(); void syncGrpStatistics(); void addGroupItemToList(NxsTransaction*& tr, @@ -582,6 +584,8 @@ private: uint32_t mDefaultMsgStorePeriod ; uint32_t mDefaultMsgSyncPeriod ; + + RsGxsNetTunnelService *mGxsNetTunnel; }; #endif // RSGXSNETSERVICE_H diff --git a/libretroshare/src/gxs/rsgxsnettunnel.cc b/libretroshare/src/gxs/rsgxsnettunnel.cc index d5628eed3..c502f2c6d 100644 --- a/libretroshare/src/gxs/rsgxsnettunnel.cc +++ b/libretroshare/src/gxs/rsgxsnettunnel.cc @@ -193,14 +193,14 @@ bool RsGxsNetTunnelService::getVirtualPeers(uint16_t service_id, std::listsecond.providing_set.find(service_id) != it->second.providing_set.end()) peers.push_back(it->first) ; -#ifdef DEBUG_GXS_TUNNEL +#ifdef DEBUG_RSGXSNETTUNNEL GXS_NET_TUNNEL_DEBUG() << " service " << std::hex << service_id << std::dec << " returning " << peers.size() << " peers." << std::endl; #endif return true ; } -bool RsGxsNetTunnelService::requestPeers(const RsGxsGroupId& group_id) +bool RsGxsNetTunnelService::requestPeers(uint16_t service_id,const RsGxsGroupId& group_id) { RS_STACK_MUTEX(mGxsNetTunnelMtx); @@ -211,11 +211,14 @@ bool RsGxsNetTunnelService::requestPeers(const RsGxsGroupId& group_id) ginfo.group_policy = RsGxsNetTunnelGroupInfo::RS_GXS_NET_TUNNEL_GRP_POLICY_ACTIVE; // we dont set the group policy here. It will only be set if no peers, or too few peers are available. +#ifdef DEBUG_RSGXSNETTUNNEL + GXS_NET_TUNNEL_DEBUG() << " service " << std::hex << service_id << std::dec << " requesting peers for group " << group_id << std::endl; +#endif return true; } -bool RsGxsNetTunnelService::releasePeers(const RsGxsGroupId& group_id) +bool RsGxsNetTunnelService::releasePeers(uint16_t service_id, const RsGxsGroupId& group_id) { RS_STACK_MUTEX(mGxsNetTunnelMtx); @@ -226,6 +229,9 @@ bool RsGxsNetTunnelService::releasePeers(const RsGxsGroupId& group_id) ginfo.group_policy = RsGxsNetTunnelGroupInfo::RS_GXS_NET_TUNNEL_GRP_POLICY_PASSIVE; mTurtle->stopMonitoringTunnels(ginfo.hash) ; +#ifdef DEBUG_RSGXSNETTUNNEL + GXS_NET_TUNNEL_DEBUG() << " service " << std::hex << service_id << std::dec << " releasing peers for group " << group_id << std::endl; +#endif return true; } @@ -329,14 +335,14 @@ void RsGxsNetTunnelService::receiveTurtleData(RsTurtleGenericTunnelItem *item,co } // find the group id - auto it = mHandledHashes.find(hash) ; + auto it4 = mHandledHashes.find(hash) ; - if(it == mHandledHashes.end()) + if(it4 == mHandledHashes.end()) { GXS_NET_TUNNEL_ERROR() << "Cannot find hash " << hash << " to be handled by GxsNetTunnel" << std::endl; return; } - RsGxsGroupId group_id = it->second; + RsGxsGroupId group_id = it4->second; // Now check if we got an item to advertise a virtual peer @@ -367,21 +373,33 @@ void RsGxsNetTunnelService::receiveTurtleData(RsTurtleGenericTunnelItem *item,co #ifdef DEBUG_RSGXSNETTUNNEL GXS_NET_TUNNEL_DEBUG() << " item is a virtual peer id item with vpid = "<< pid_item->virtual_peer_id << ". Setting virtual peer." << std::endl; #endif -#ifdef TODO - vp_info.net_service_virtual_peer = pid_item->virtual_peer_id; - vp_info.vpid_status = RsGxsNetTunnelVirtualPeerInfo::RS_GXS_NET_TUNNEL_VP_STATUS_ACTIVE ; -#endif + // we receive a virtual peer id, so we need to update the local information for this peer id + + mTurtle2GxsPeer[turtle_virtual_peer_id] = pid_item->virtual_peer_id ; + + RsGxsNetTunnelVirtualPeerInfo& vp_info(mVirtualPeers[pid_item->virtual_peer_id]) ; + + vp_info.vpid_status = RsGxsNetTunnelVirtualPeerInfo::RS_GXS_NET_TUNNEL_VP_STATUS_ACTIVE ; // status of the peer + vp_info.side = direction; // client/server + vp_info.last_contact = time(NULL); // last time some data was sent/recvd + + memcpy(vp_info.encryption_master_key,encryption_master_key,RS_GXS_TUNNEL_CONST_EKEY_SIZE); + + vp_info.turtle_virtual_peer_id = turtle_virtual_peer_id; // turtle peer to use when sending data to this vpid. free(data); return ; } + delete decrypted_item ; + + // item is a generic data item for the client. Let's store the data in the appropriate incoming data queue. -#ifdef TODO auto it = mTurtle2GxsPeer.find(turtle_virtual_peer_id) ; if(it == mTurtle2GxsPeer.end()) { GXS_NET_TUNNEL_ERROR() << "item received by GxsNetTunnel for vpid " << turtle_virtual_peer_id << " but this vpid is unknown!" << std::endl; + free(data); return; } @@ -391,28 +409,27 @@ void RsGxsNetTunnelService::receiveTurtleData(RsTurtleGenericTunnelItem *item,co if(it2 == mVirtualPeers.end()) { - GXS_NET_TUNNEL_ERROR() << "item received by GxsNetTunnel for GXS vpid " << gxs_vpid " but the virtual peer id is missing!" << std::endl; + GXS_NET_TUNNEL_ERROR() << "item received by GxsNetTunnel for GXS vpid " << gxs_vpid << " but the virtual peer id is missing!" << std::endl; + free(data); return; } RsGxsNetTunnelVirtualPeerInfo& vp_info(it2->second) ; + uint16_t service_id = getRsItemService(getRsItemId(data)) ; - else - { #ifdef DEBUG_RSGXSNETTUNNEL - GXS_NET_TUNNEL_DEBUG() << " item is GXS data. Storing into incoming list." << std::endl; + GXS_NET_TUNNEL_DEBUG() << "item contains generic data for service " << std::hex << service_id << std::dec << " for VPID " << gxs_vpid << ". Storing in incoming list" << std::endl; #endif - // push the data into the service incoming data list - RsTlvBinaryData *bind = new RsTlvBinaryData; - bind->tlvtype = 0; - bind->bin_len = data_size; - bind->bin_data = data; + // push the data into the service incoming data list - vp_info.incoming_data.push_back(bind) ; - } -#endif + RsTlvBinaryData *bind = new RsTlvBinaryData; + bind->tlvtype = 0; + bind->bin_len = data_size; + bind->bin_data = data; + + vp_info.providing_set[service_id].incoming_data.push_back(bind) ; } void RsGxsNetTunnelService::addVirtualPeer(const TurtleFileHash& hash, const TurtleVirtualPeerId& vpid,RsTurtleGenericTunnelItem::Direction dir) diff --git a/libretroshare/src/gxs/rsgxsnettunnel.h b/libretroshare/src/gxs/rsgxsnettunnel.h index a8b9e701b..451f5fb9c 100644 --- a/libretroshare/src/gxs/rsgxsnettunnel.h +++ b/libretroshare/src/gxs/rsgxsnettunnel.h @@ -87,6 +87,13 @@ // Therefore, virtual peers are stored separately from groups, because each one can sync multiple groups. // // * virtual peers are also shared among services. This reduces the required amount of tunnels and tunnel requests to send. +// +// +// How do we know that a group needs distant sync? +// * look into GrpConfigMap for suppliers. Suppliers is cleared at load. +// * last_update_TS in GrpConfigMap is randomised so it cannot be used +// * we need a way to know that there's no suppliers for good reasons (not that we just started) +// * typedef RsPeerId RsGxsNetTunnelVirtualPeerId ; @@ -105,16 +112,15 @@ struct RsGxsNetTunnelVirtualPeerInfo RS_GXS_NET_TUNNEL_VP_STATUS_ACTIVE = 0x02 // virtual peer id is known. Data can transfer. }; - RsGxsNetTunnelVirtualPeerInfo() : vpid_status(RS_GXS_NET_TUNNEL_VP_STATUS_UNKNOWN) { memset(encryption_master_key,0,32) ; } + RsGxsNetTunnelVirtualPeerInfo() : vpid_status(RS_GXS_NET_TUNNEL_VP_STATUS_UNKNOWN), last_contact(0),side(0) { memset(encryption_master_key,0,32) ; } ~RsGxsNetTunnelVirtualPeerInfo() ; uint8_t vpid_status ; // status of the peer - uint8_t side ; // client/server - uint8_t encryption_master_key[32] ; // key from which the encryption key is derived for each virtual peer (using H(master_key | random IV)) time_t last_contact ; // last time some data was sent/recvd + uint8_t side ; // client/server + uint8_t encryption_master_key[32]; TurtleVirtualPeerId turtle_virtual_peer_id ; // turtle peer to use when sending data to this vpid. - RsGxsGroupId group_id ; // group id std::map providing_set; // partial list of groups provided by this virtual peer id, based on tunnel results, for each service }; @@ -155,13 +161,13 @@ public: * \brief Manage tunnels for this group * @param group_id group for which tunnels should be released */ - bool requestPeers(const RsGxsGroupId&group_id) ; + bool requestPeers(uint16_t service_id, const RsGxsGroupId&group_id) ; /*! * \brief Stop managing tunnels for this group * @param group_id group for which tunnels should be released */ - bool releasePeers(const RsGxsGroupId&group_id) ; + bool releasePeers(uint16_t service_id,const RsGxsGroupId&group_id) ; /*! * \brief Get the list of active virtual peers for a given group. This implies that a tunnel is up and diff --git a/libretroshare/src/rsserver/rsinit.cc b/libretroshare/src/rsserver/rsinit.cc index 0d9300202..17314626e 100644 --- a/libretroshare/src/rsserver/rsinit.cc +++ b/libretroshare/src/rsserver/rsinit.cc @@ -1386,10 +1386,10 @@ int RsServer::StartupRetroShare() // create GXS photo service RsGxsNetService* gxschannels_ns = new RsGxsNetService( - RS_SERVICE_GXS_TYPE_CHANNELS, gxschannels_ds, nxsMgr, - mGxsChannels, mGxsChannels->getServiceInfo(), - mReputations, mGxsCircles,mGxsIdService, - pgpAuxUtils); + RS_SERVICE_GXS_TYPE_CHANNELS, gxschannels_ds, nxsMgr, + mGxsChannels, mGxsChannels->getServiceInfo(), + mReputations, mGxsCircles,mGxsIdService, + pgpAuxUtils,true,true,true); mGxsChannels->setNetworkExchangeService(gxschannels_ns) ;