From 7d561bccebb56989842eb9bb923d939ea807c418 Mon Sep 17 00:00:00 2001 From: csoler Date: Wed, 4 Apr 2018 21:41:21 +0200 Subject: [PATCH] added distant data access in GxsNetService --- libretroshare/src/gxs/rsgxsnetservice.cc | 80 +++++++++++++++++------- libretroshare/src/gxs/rsgxsnetservice.h | 3 + libretroshare/src/gxs/rsgxsnettunnel.cc | 56 ++++++++++++----- libretroshare/src/gxs/rsgxsnettunnel.h | 24 +++++-- 4 files changed, 120 insertions(+), 43 deletions(-) diff --git a/libretroshare/src/gxs/rsgxsnetservice.cc b/libretroshare/src/gxs/rsgxsnetservice.cc index e86d325de..cd68ea6bf 100644 --- a/libretroshare/src/gxs/rsgxsnetservice.cc +++ b/libretroshare/src/gxs/rsgxsnetservice.cc @@ -336,7 +336,6 @@ static std::string nice_time_stamp(time_t now,time_t TS) } } - static std::ostream& gxsnetdebug(const RsPeerId& peer_id,const RsGxsGroupId& grp_id,uint32_t service_type) { static nullstream null ; @@ -611,7 +610,7 @@ void RsGxsNetService::syncWithPeers() #ifdef NXS_NET_DEBUG_5 GXSNETDEBUG_P_(*sit) << "Service "<< std::hex << ((mServiceInfo.mServiceType >> 8)& 0xffff) << std::dec << " sending global group TS of peer id: " << *sit << " ts=" << nice_time_stamp(time(NULL),updateTS) << " (secs ago) to himself" << std::endl; #endif - sendItem(grp); + generic_sendItem(grp); } if(!mAllowMsgSync) @@ -727,7 +726,7 @@ void RsGxsNetService::syncWithPeers() #ifdef NXS_NET_DEBUG_7 GXSNETDEBUG_PG(*sit,grpId) << " Service " << std::hex << ((mServiceInfo.mServiceType >> 8)& 0xffff) << std::dec << " sending message TS of peer id: " << *sit << " ts=" << nice_time_stamp(time(NULL),updateTS) << " (secs ago) for group " << grpId << " to himself - in clear " << std::endl; #endif - sendItem(msg); + generic_sendItem(msg); #ifdef NXS_NET_DEBUG_5 GXSNETDEBUG_PG(*sit,grpId) << "Service "<< std::hex << ((mServiceInfo.mServiceType >> 8)& 0xffff) << std::dec << " sending global message TS of peer id: " << *sit << " ts=" << nice_time_stamp(time(NULL),updateTS) << " (secs ago) for group " << grpId << " to himself" << std::endl; @@ -738,6 +737,28 @@ void RsGxsNetService::syncWithPeers() #endif } +void RsGxsNetService::generic_sendItem(RsNxsItem *si) +{ + // check if the item is to be sent to a distant peer or not + + if(mAllowDistSync && mGxsNetTunnel->isDistantPeer( static_cast(si->PeerId()))) + { + RsNxsSerialiser ser(mServType); + + uint32_t size = ser.size(si); + unsigned char *mem = (unsigned char *)rs_malloc(size) ; + + if(!mem) + return ; + + ser.serialise(si,mem,&size) ; + + mGxsNetTunnel->sendData(mem,size,static_cast(si->PeerId())); + } + else + sendItem(si) ; +} + void RsGxsNetService::checkDistantSyncState() { if(!mAllowDistSync) @@ -757,8 +778,6 @@ void RsGxsNetService::checkDistantSyncState() std::set online_peers; mNetMgr->getOnlineList(mServiceInfo.mServiceType , online_peers); - uint16_t service_id = ((mServiceInfo.mServiceType >> 8)& 0xffff); - RS_STACK_MUTEX(mNxsMutex) ; for(auto it(grpMeta.begin());it!=grpMeta.end();++it) @@ -779,14 +798,14 @@ void RsGxsNetService::checkDistantSyncState() if(at_least_one_friend_is_supplier) { - mGxsNetTunnel->releasePeers(service_id,grpId); + mGxsNetTunnel->releasePeers(mServType,grpId); #ifdef NXS_NET_DEBUG_8 GXSNETDEBUG___<< " Group " << grpId << ": suppliers among friends. Releasing peers." << std::endl; #endif } else { - mGxsNetTunnel->requestPeers(service_id,grpId); + mGxsNetTunnel->requestPeers(mServType,grpId); #ifdef NXS_NET_DEBUG_8 GXSNETDEBUG___<< " Group " << grpId << ": no suppliers among friends. Requesting peers." << std::endl; #endif @@ -856,7 +875,7 @@ void RsGxsNetService::syncGrpStatistics() grs->grpId = it->first ; grs->PeerId(peer_id) ; - sendItem(grs) ; + generic_sendItem(grs) ; } } } @@ -937,7 +956,7 @@ void RsGxsNetService::handleRecvSyncGrpStatistics(RsNxsSyncGrpStatsItem *grs) GXSNETDEBUG_PG(grs->PeerId(),grs->grpId) << " sending back statistics item with " << vec.size() << " elements." << std::endl; #endif - sendItem(grs_resp) ; + generic_sendItem(grs_resp) ; } else if(grs->request_type == RsNxsSyncGrpStatsItem::GROUP_INFO_TYPE_RESPONSE) { @@ -1638,11 +1657,30 @@ RsSerialiser *RsGxsNetService::setupSerialiser() return rss; } +RsItem *RsGxsNetService::generic_recvItem() +{ + { + RsItem *item ; + + if(NULL != (item=recvItem())) + return item ; + } + + unsigned char *data = NULL ; + uint32_t size = 0 ; + RsGxsNetTunnelVirtualPeerId virtual_peer_id ; + + if(mGxsNetTunnel->receiveData(mServType,data,size,virtual_peer_id)) + return dynamic_cast(RsNxsSerialiser(mServType).deserialise(data,&size)) ; + + return NULL ; +} + void RsGxsNetService::recvNxsItemQueue() { RsItem *item ; - while(NULL != (item=recvItem())) + while(NULL != (item=generic_recvItem())) { #ifdef NXS_NET_DEBUG_1 GXSNETDEBUG_P_(item->PeerId()) << "Received RsGxsNetService Item:" << (void*)item << " type=" << std::hex << item->PacketId() << std::dec << std::endl ; @@ -2243,7 +2281,7 @@ void RsGxsNetService::processTransactions() lit_end = tr->mItems.end(); for(; lit != lit_end; ++lit){ - sendItem(*lit); + generic_sendItem(*lit); } tr->mItems.clear(); // clear so they don't get deleted in trans cleaning @@ -2352,7 +2390,7 @@ void RsGxsNetService::processTransactions() trans->transactFlag = RsNxsTransacItem::FLAG_END_SUCCESS; trans->transactionNumber = transN; trans->PeerId(tr->mTransaction->PeerId()); - sendItem(trans); + generic_sendItem(trans); // move to completed transactions @@ -2395,7 +2433,7 @@ void RsGxsNetService::processTransactions() (tr->mTransaction->transactFlag & RsNxsTransacItem::FLAG_TYPE_MASK); trans->transactionNumber = transN; trans->PeerId(tr->mTransaction->PeerId()); - sendItem(trans); + generic_sendItem(trans); tr->mFlag = NxsTransaction::FLAG_STATE_RECEIVING; } @@ -2772,7 +2810,7 @@ void RsGxsNetService::locked_pushMsgTransactionFromList(std::list& r newTrans->mTransaction->PeerId(mOwnId); if (locked_addTransaction(newTrans)) - sendItem(transac); + generic_sendItem(transac); else { delete newTrans; @@ -3068,7 +3106,7 @@ void RsGxsNetService::locked_pushGrpTransactionFromList( std::list& newTrans->mTransaction->PeerId(mOwnId); if (locked_addTransaction(newTrans)) - sendItem(transac); + generic_sendItem(transac); else { delete newTrans; @@ -3272,8 +3310,8 @@ void RsGxsNetService::locked_genSendGrpsTransaction(NxsTransaction* tr) ntr->PeerId(tr->mTransaction->PeerId()); if(locked_addTransaction(newTr)) - sendItem(ntr); - else + generic_sendItem(ntr); + else { delete ntr ; delete newTr; @@ -3567,7 +3605,7 @@ void RsGxsNetService::locked_genSendMsgsTransaction(NxsTransaction* tr) ntr->PeerId(tr->mTransaction->PeerId()); if(locked_addTransaction(newTr)) - sendItem(ntr); + generic_sendItem(ntr); else { delete ntr ; @@ -3886,7 +3924,7 @@ void RsGxsNetService::locked_pushGrpRespFromList(std::list& respList << peer << " with " << respList.size() << " groups " << std::endl; #endif if(locked_addTransaction(tr)) - sendItem(trItem); + generic_sendItem(trItem); else { delete tr ; @@ -4411,7 +4449,7 @@ void RsGxsNetService::locked_pushMsgRespFromList(std::list& itemL, c #endif // signal peer to prepare for transaction if(locked_addTransaction(tr)) - sendItem(trItem); + generic_sendItem(trItem); else { delete tr ; @@ -4798,7 +4836,7 @@ void RsGxsNetService::sharePublishKeysPending() publishKeyItem->private_key = publishKey ; publishKeyItem->PeerId(*it); - sendItem(publishKeyItem); + generic_sendItem(publishKeyItem); #ifdef NXS_NET_DEBUG_3 GXSNETDEBUG_PG(*it,grpMeta->mGroupId) << " sent key item to " << *it << std::endl; #endif diff --git a/libretroshare/src/gxs/rsgxsnetservice.h b/libretroshare/src/gxs/rsgxsnetservice.h index e798f8717..aac6c474a 100644 --- a/libretroshare/src/gxs/rsgxsnetservice.h +++ b/libretroshare/src/gxs/rsgxsnetservice.h @@ -495,6 +495,9 @@ private: void cleanRejectedMessages(); void processObserverNotifications(); + void generic_sendItem(RsNxsItem *si); + RsItem *generic_recvItem(); + private: static void locked_checkDelay(uint32_t& time_in_secs); diff --git a/libretroshare/src/gxs/rsgxsnettunnel.cc b/libretroshare/src/gxs/rsgxsnettunnel.cc index 3d0f25a15..65ae72111 100644 --- a/libretroshare/src/gxs/rsgxsnettunnel.cc +++ b/libretroshare/src/gxs/rsgxsnettunnel.cc @@ -38,17 +38,6 @@ RsGxsNetTunnelService::RsGxsNetTunnelService(): mGxsNetTunnelMtx("GxsNetTunnel") {} -//===========================================================================================================================================// -// Internal structures // -//===========================================================================================================================================// - -RsGxsNetTunnelVirtualPeerInfo::~RsGxsNetTunnelVirtualPeerInfo() -{ - for(auto it(providing_set.begin());it!=providing_set.end();++it) - for(auto it2(it->second.incoming_data.begin());it2!=it->second.incoming_data.end();++it2) - delete *it2 ; -} - //===========================================================================================================================================// // Transport Items // //===========================================================================================================================================// @@ -140,6 +129,40 @@ RsGxsNetTunnelService::~RsGxsNetTunnelService() mGroups.clear(); mHandledHashes.clear(); mVirtualPeers.clear(); + mIncomingData.clear(); +} + +bool RsGxsNetTunnelService::isDistantPeer(const RsGxsNetTunnelVirtualPeerId& virtual_peer) +{ + RS_STACK_MUTEX(mGxsNetTunnelMtx); + + return mVirtualPeers.find(virtual_peer) != mVirtualPeers.end(); +} + +bool RsGxsNetTunnelService::receiveData(uint16_t service_id,unsigned char *& data,uint32_t& data_len,RsGxsNetTunnelVirtualPeerId& virtual_peer) +{ + RS_STACK_MUTEX(mGxsNetTunnelMtx); + + std::list >& lst(mIncomingData[service_id]) ; + + if(lst.empty()) + { + data = NULL; + data_len = 0; + return false ; + } + + data = (unsigned char*)lst.front().second->bin_data ; + data_len = lst.front().second->bin_len ; + virtual_peer = lst.front().first; + + lst.front().second->bin_data = NULL ; // avoids deletion + lst.front().second->bin_len = 0 ; // avoids deletion + + delete lst.front().second; + lst.pop_front(); + + return true; } bool RsGxsNetTunnelService::sendData(unsigned char *& data,uint32_t data_len,const RsGxsNetTunnelVirtualPeerId& virtual_peer) @@ -297,12 +320,12 @@ void RsGxsNetTunnelService::dump() const for(auto it(mVirtualPeers.begin());it!=mVirtualPeers.end();++it) { std::cerr << " GXS Peer:" << it->first << " Turtle:" << it->second.turtle_virtual_peer_id - << " status: " << vpid_status_str[it->second.vpid_status] << " s: " + << " status: " << vpid_status_str[it->second.vpid_status] << " direction: " << (int)it->second.side << " last seen " << time(NULL)-it->second.last_contact - << " ekey: " << RsUtil::BinToHex(it->second.encryption_master_key,RS_GXS_TUNNEL_CONST_EKEY_SIZE) << std::endl; + << " ekey: " << RsUtil::BinToHex(it->second.encryption_master_key,RS_GXS_TUNNEL_CONST_EKEY_SIZE,10) << std::endl; for(auto it2(it->second.providing_set.begin());it2!=it->second.providing_set.end();++it2) - std::cerr << " service " << std::hex << it2->first << std::dec << " " << it2->second.provided_groups.size() << " groups, " << it2->second.incoming_data.size() << " data" << std::endl; + std::cerr << " service " << std::hex << it2->first << std::dec << " " << it2->second.provided_groups.size() << " groups" << std::endl; } std::cerr << "Hashes: " << std::endl; @@ -422,8 +445,6 @@ void RsGxsNetTunnelService::receiveTurtleData(RsTurtleGenericTunnelItem *item,co return; } - RsGxsNetTunnelVirtualPeerInfo& vp_info(it2->second) ; - uint16_t service_id = getRsItemService(getRsItemId(data)) ; #ifdef DEBUG_RSGXSNETTUNNEL @@ -437,7 +458,7 @@ void RsGxsNetTunnelService::receiveTurtleData(RsTurtleGenericTunnelItem *item,co bind->bin_len = data_size; bind->bin_data = data; - vp_info.providing_set[service_id].incoming_data.push_back(bind) ; + mIncomingData[service_id].push_back(std::make_pair(gxs_vpid,bind)) ; } void RsGxsNetTunnelService::addVirtualPeer(const TurtleFileHash& hash, const TurtleVirtualPeerId& vpid,RsTurtleGenericTunnelItem::Direction dir) @@ -459,6 +480,7 @@ void RsGxsNetTunnelService::addVirtualPeer(const TurtleFileHash& hash, const Tur RsGxsNetTunnelGroupInfo& ginfo( mGroups[group_id] ) ; ginfo.group_status = RsGxsNetTunnelGroupInfo::RS_GXS_NET_TUNNEL_GRP_STATUS_VPIDS_AVAILABLE ; + ginfo.virtual_peers.insert(vpid); uint8_t encryption_master_key[RS_GXS_TUNNEL_CONST_EKEY_SIZE]; diff --git a/libretroshare/src/gxs/rsgxsnettunnel.h b/libretroshare/src/gxs/rsgxsnettunnel.h index 3d6910d14..7543689a2 100644 --- a/libretroshare/src/gxs/rsgxsnettunnel.h +++ b/libretroshare/src/gxs/rsgxsnettunnel.h @@ -113,7 +113,7 @@ struct RsGxsNetTunnelVirtualPeerInfo }; RsGxsNetTunnelVirtualPeerInfo() : vpid_status(RS_GXS_NET_TUNNEL_VP_STATUS_UNKNOWN), last_contact(0),side(0) { memset(encryption_master_key,0,32) ; } - ~RsGxsNetTunnelVirtualPeerInfo() ; + virtual ~RsGxsNetTunnelVirtualPeerInfo(){} uint8_t vpid_status ; // status of the peer time_t last_contact ; // last time some data was sent/recvd @@ -186,12 +186,24 @@ public: bool sendData(unsigned char *& data, uint32_t data_len, const RsGxsNetTunnelVirtualPeerId& virtual_peer) ; /*! - * \brief receivedItem - * returns the next received item from the given virtual peer. - * \param virtual_peer + * \brief receiveData + * returns the next piece of data received fro the given service, and the virtual GXS peer that sended it. + * \param service_id service that provide the data + * \param data memory check containing the data. Memory ownership belongs to the client. + * \param data_len length of memory chunk + * \param virtual_peer peer who sent the data * \return + * true if something is returned. If not, data is set to NULL, data_len to 0. */ - RsItem *receivedItem(const RsGxsNetTunnelVirtualPeerId& virtual_peer) ; + bool receiveData(uint16_t service_id,unsigned char *& data,uint32_t& data_len,RsGxsNetTunnelVirtualPeerId& virtual_peer) ; + + /*! + * \brief isDistantPeer + * returns wether the peer is in the list of available distant peers or not + * \return true if the peer is a distant GXS peer. + */ + + bool isDistantPeer(const RsGxsNetTunnelVirtualPeerId& virtual_peer) ; /*! * \brief dumps all information about monitored groups. @@ -234,6 +246,8 @@ private: std::list > mPendingTurtleItems ; // items that need to be sent off-turtle Mutex. + std::map > > mIncomingData; // list of incoming data items, per service. + /*! * \brief Generates the hash to request tunnels for this group. This hash is only used by turtle, and is used to * hide the real group id.