added distant data access in GxsNetService

This commit is contained in:
csoler 2018-04-04 21:41:21 +02:00
parent 8fe3eb711d
commit 7d561bcceb
No known key found for this signature in database
GPG Key ID: 7BCA522266C0804C
4 changed files with 120 additions and 43 deletions

View File

@ -336,7 +336,6 @@ static std::string nice_time_stamp(time_t now,time_t TS)
}
}
static std::ostream& gxsnetdebug(const RsPeerId& peer_id,const RsGxsGroupId& grp_id,uint32_t service_type)
{
static nullstream null ;
@ -611,7 +610,7 @@ void RsGxsNetService::syncWithPeers()
#ifdef NXS_NET_DEBUG_5
GXSNETDEBUG_P_(*sit) << "Service "<< std::hex << ((mServiceInfo.mServiceType >> 8)& 0xffff) << std::dec << " sending global group TS of peer id: " << *sit << " ts=" << nice_time_stamp(time(NULL),updateTS) << " (secs ago) to himself" << std::endl;
#endif
sendItem(grp);
generic_sendItem(grp);
}
if(!mAllowMsgSync)
@ -727,7 +726,7 @@ void RsGxsNetService::syncWithPeers()
#ifdef NXS_NET_DEBUG_7
GXSNETDEBUG_PG(*sit,grpId) << " Service " << std::hex << ((mServiceInfo.mServiceType >> 8)& 0xffff) << std::dec << " sending message TS of peer id: " << *sit << " ts=" << nice_time_stamp(time(NULL),updateTS) << " (secs ago) for group " << grpId << " to himself - in clear " << std::endl;
#endif
sendItem(msg);
generic_sendItem(msg);
#ifdef NXS_NET_DEBUG_5
GXSNETDEBUG_PG(*sit,grpId) << "Service "<< std::hex << ((mServiceInfo.mServiceType >> 8)& 0xffff) << std::dec << " sending global message TS of peer id: " << *sit << " ts=" << nice_time_stamp(time(NULL),updateTS) << " (secs ago) for group " << grpId << " to himself" << std::endl;
@ -738,6 +737,28 @@ void RsGxsNetService::syncWithPeers()
#endif
}
void RsGxsNetService::generic_sendItem(RsNxsItem *si)
{
// check if the item is to be sent to a distant peer or not
if(mAllowDistSync && mGxsNetTunnel->isDistantPeer( static_cast<RsGxsNetTunnelVirtualPeerId>(si->PeerId())))
{
RsNxsSerialiser ser(mServType);
uint32_t size = ser.size(si);
unsigned char *mem = (unsigned char *)rs_malloc(size) ;
if(!mem)
return ;
ser.serialise(si,mem,&size) ;
mGxsNetTunnel->sendData(mem,size,static_cast<RsGxsNetTunnelVirtualPeerId>(si->PeerId()));
}
else
sendItem(si) ;
}
void RsGxsNetService::checkDistantSyncState()
{
if(!mAllowDistSync)
@ -757,8 +778,6 @@ void RsGxsNetService::checkDistantSyncState()
std::set<RsPeerId> online_peers;
mNetMgr->getOnlineList(mServiceInfo.mServiceType , online_peers);
uint16_t service_id = ((mServiceInfo.mServiceType >> 8)& 0xffff);
RS_STACK_MUTEX(mNxsMutex) ;
for(auto it(grpMeta.begin());it!=grpMeta.end();++it)
@ -779,14 +798,14 @@ void RsGxsNetService::checkDistantSyncState()
if(at_least_one_friend_is_supplier)
{
mGxsNetTunnel->releasePeers(service_id,grpId);
mGxsNetTunnel->releasePeers(mServType,grpId);
#ifdef NXS_NET_DEBUG_8
GXSNETDEBUG___<< " Group " << grpId << ": suppliers among friends. Releasing peers." << std::endl;
#endif
}
else
{
mGxsNetTunnel->requestPeers(service_id,grpId);
mGxsNetTunnel->requestPeers(mServType,grpId);
#ifdef NXS_NET_DEBUG_8
GXSNETDEBUG___<< " Group " << grpId << ": no suppliers among friends. Requesting peers." << std::endl;
#endif
@ -856,7 +875,7 @@ void RsGxsNetService::syncGrpStatistics()
grs->grpId = it->first ;
grs->PeerId(peer_id) ;
sendItem(grs) ;
generic_sendItem(grs) ;
}
}
}
@ -937,7 +956,7 @@ void RsGxsNetService::handleRecvSyncGrpStatistics(RsNxsSyncGrpStatsItem *grs)
GXSNETDEBUG_PG(grs->PeerId(),grs->grpId) << " sending back statistics item with " << vec.size() << " elements." << std::endl;
#endif
sendItem(grs_resp) ;
generic_sendItem(grs_resp) ;
}
else if(grs->request_type == RsNxsSyncGrpStatsItem::GROUP_INFO_TYPE_RESPONSE)
{
@ -1638,11 +1657,30 @@ RsSerialiser *RsGxsNetService::setupSerialiser()
return rss;
}
RsItem *RsGxsNetService::generic_recvItem()
{
{
RsItem *item ;
if(NULL != (item=recvItem()))
return item ;
}
unsigned char *data = NULL ;
uint32_t size = 0 ;
RsGxsNetTunnelVirtualPeerId virtual_peer_id ;
if(mGxsNetTunnel->receiveData(mServType,data,size,virtual_peer_id))
return dynamic_cast<RsNxsItem*>(RsNxsSerialiser(mServType).deserialise(data,&size)) ;
return NULL ;
}
void RsGxsNetService::recvNxsItemQueue()
{
RsItem *item ;
while(NULL != (item=recvItem()))
while(NULL != (item=generic_recvItem()))
{
#ifdef NXS_NET_DEBUG_1
GXSNETDEBUG_P_(item->PeerId()) << "Received RsGxsNetService Item:" << (void*)item << " type=" << std::hex << item->PacketId() << std::dec << std::endl ;
@ -2243,7 +2281,7 @@ void RsGxsNetService::processTransactions()
lit_end = tr->mItems.end();
for(; lit != lit_end; ++lit){
sendItem(*lit);
generic_sendItem(*lit);
}
tr->mItems.clear(); // clear so they don't get deleted in trans cleaning
@ -2352,7 +2390,7 @@ void RsGxsNetService::processTransactions()
trans->transactFlag = RsNxsTransacItem::FLAG_END_SUCCESS;
trans->transactionNumber = transN;
trans->PeerId(tr->mTransaction->PeerId());
sendItem(trans);
generic_sendItem(trans);
// move to completed transactions
@ -2395,7 +2433,7 @@ void RsGxsNetService::processTransactions()
(tr->mTransaction->transactFlag & RsNxsTransacItem::FLAG_TYPE_MASK);
trans->transactionNumber = transN;
trans->PeerId(tr->mTransaction->PeerId());
sendItem(trans);
generic_sendItem(trans);
tr->mFlag = NxsTransaction::FLAG_STATE_RECEIVING;
}
@ -2772,7 +2810,7 @@ void RsGxsNetService::locked_pushMsgTransactionFromList(std::list<RsNxsItem*>& r
newTrans->mTransaction->PeerId(mOwnId);
if (locked_addTransaction(newTrans))
sendItem(transac);
generic_sendItem(transac);
else
{
delete newTrans;
@ -3068,7 +3106,7 @@ void RsGxsNetService::locked_pushGrpTransactionFromList( std::list<RsNxsItem*>&
newTrans->mTransaction->PeerId(mOwnId);
if (locked_addTransaction(newTrans))
sendItem(transac);
generic_sendItem(transac);
else
{
delete newTrans;
@ -3272,8 +3310,8 @@ void RsGxsNetService::locked_genSendGrpsTransaction(NxsTransaction* tr)
ntr->PeerId(tr->mTransaction->PeerId());
if(locked_addTransaction(newTr))
sendItem(ntr);
else
generic_sendItem(ntr);
else
{
delete ntr ;
delete newTr;
@ -3567,7 +3605,7 @@ void RsGxsNetService::locked_genSendMsgsTransaction(NxsTransaction* tr)
ntr->PeerId(tr->mTransaction->PeerId());
if(locked_addTransaction(newTr))
sendItem(ntr);
generic_sendItem(ntr);
else
{
delete ntr ;
@ -3886,7 +3924,7 @@ void RsGxsNetService::locked_pushGrpRespFromList(std::list<RsNxsItem*>& respList
<< peer << " with " << respList.size() << " groups " << std::endl;
#endif
if(locked_addTransaction(tr))
sendItem(trItem);
generic_sendItem(trItem);
else
{
delete tr ;
@ -4411,7 +4449,7 @@ void RsGxsNetService::locked_pushMsgRespFromList(std::list<RsNxsItem*>& itemL, c
#endif
// signal peer to prepare for transaction
if(locked_addTransaction(tr))
sendItem(trItem);
generic_sendItem(trItem);
else
{
delete tr ;
@ -4798,7 +4836,7 @@ void RsGxsNetService::sharePublishKeysPending()
publishKeyItem->private_key = publishKey ;
publishKeyItem->PeerId(*it);
sendItem(publishKeyItem);
generic_sendItem(publishKeyItem);
#ifdef NXS_NET_DEBUG_3
GXSNETDEBUG_PG(*it,grpMeta->mGroupId) << " sent key item to " << *it << std::endl;
#endif

View File

@ -495,6 +495,9 @@ private:
void cleanRejectedMessages();
void processObserverNotifications();
void generic_sendItem(RsNxsItem *si);
RsItem *generic_recvItem();
private:
static void locked_checkDelay(uint32_t& time_in_secs);

View File

@ -38,17 +38,6 @@
RsGxsNetTunnelService::RsGxsNetTunnelService(): mGxsNetTunnelMtx("GxsNetTunnel") {}
//===========================================================================================================================================//
// Internal structures //
//===========================================================================================================================================//
RsGxsNetTunnelVirtualPeerInfo::~RsGxsNetTunnelVirtualPeerInfo()
{
for(auto it(providing_set.begin());it!=providing_set.end();++it)
for(auto it2(it->second.incoming_data.begin());it2!=it->second.incoming_data.end();++it2)
delete *it2 ;
}
//===========================================================================================================================================//
// Transport Items //
//===========================================================================================================================================//
@ -140,6 +129,40 @@ RsGxsNetTunnelService::~RsGxsNetTunnelService()
mGroups.clear();
mHandledHashes.clear();
mVirtualPeers.clear();
mIncomingData.clear();
}
bool RsGxsNetTunnelService::isDistantPeer(const RsGxsNetTunnelVirtualPeerId& virtual_peer)
{
RS_STACK_MUTEX(mGxsNetTunnelMtx);
return mVirtualPeers.find(virtual_peer) != mVirtualPeers.end();
}
bool RsGxsNetTunnelService::receiveData(uint16_t service_id,unsigned char *& data,uint32_t& data_len,RsGxsNetTunnelVirtualPeerId& virtual_peer)
{
RS_STACK_MUTEX(mGxsNetTunnelMtx);
std::list<std::pair<RsGxsNetTunnelVirtualPeerId,RsTlvBinaryData*> >& lst(mIncomingData[service_id]) ;
if(lst.empty())
{
data = NULL;
data_len = 0;
return false ;
}
data = (unsigned char*)lst.front().second->bin_data ;
data_len = lst.front().second->bin_len ;
virtual_peer = lst.front().first;
lst.front().second->bin_data = NULL ; // avoids deletion
lst.front().second->bin_len = 0 ; // avoids deletion
delete lst.front().second;
lst.pop_front();
return true;
}
bool RsGxsNetTunnelService::sendData(unsigned char *& data,uint32_t data_len,const RsGxsNetTunnelVirtualPeerId& virtual_peer)
@ -297,12 +320,12 @@ void RsGxsNetTunnelService::dump() const
for(auto it(mVirtualPeers.begin());it!=mVirtualPeers.end();++it)
{
std::cerr << " GXS Peer:" << it->first << " Turtle:" << it->second.turtle_virtual_peer_id
<< " status: " << vpid_status_str[it->second.vpid_status] << " s: "
<< " status: " << vpid_status_str[it->second.vpid_status] << " direction: "
<< (int)it->second.side << " last seen " << time(NULL)-it->second.last_contact
<< " ekey: " << RsUtil::BinToHex(it->second.encryption_master_key,RS_GXS_TUNNEL_CONST_EKEY_SIZE) << std::endl;
<< " ekey: " << RsUtil::BinToHex(it->second.encryption_master_key,RS_GXS_TUNNEL_CONST_EKEY_SIZE,10) << std::endl;
for(auto it2(it->second.providing_set.begin());it2!=it->second.providing_set.end();++it2)
std::cerr << " service " << std::hex << it2->first << std::dec << " " << it2->second.provided_groups.size() << " groups, " << it2->second.incoming_data.size() << " data" << std::endl;
std::cerr << " service " << std::hex << it2->first << std::dec << " " << it2->second.provided_groups.size() << " groups" << std::endl;
}
std::cerr << "Hashes: " << std::endl;
@ -422,8 +445,6 @@ void RsGxsNetTunnelService::receiveTurtleData(RsTurtleGenericTunnelItem *item,co
return;
}
RsGxsNetTunnelVirtualPeerInfo& vp_info(it2->second) ;
uint16_t service_id = getRsItemService(getRsItemId(data)) ;
#ifdef DEBUG_RSGXSNETTUNNEL
@ -437,7 +458,7 @@ void RsGxsNetTunnelService::receiveTurtleData(RsTurtleGenericTunnelItem *item,co
bind->bin_len = data_size;
bind->bin_data = data;
vp_info.providing_set[service_id].incoming_data.push_back(bind) ;
mIncomingData[service_id].push_back(std::make_pair(gxs_vpid,bind)) ;
}
void RsGxsNetTunnelService::addVirtualPeer(const TurtleFileHash& hash, const TurtleVirtualPeerId& vpid,RsTurtleGenericTunnelItem::Direction dir)
@ -459,6 +480,7 @@ void RsGxsNetTunnelService::addVirtualPeer(const TurtleFileHash& hash, const Tur
RsGxsNetTunnelGroupInfo& ginfo( mGroups[group_id] ) ;
ginfo.group_status = RsGxsNetTunnelGroupInfo::RS_GXS_NET_TUNNEL_GRP_STATUS_VPIDS_AVAILABLE ;
ginfo.virtual_peers.insert(vpid);
uint8_t encryption_master_key[RS_GXS_TUNNEL_CONST_EKEY_SIZE];

View File

@ -113,7 +113,7 @@ struct RsGxsNetTunnelVirtualPeerInfo
};
RsGxsNetTunnelVirtualPeerInfo() : vpid_status(RS_GXS_NET_TUNNEL_VP_STATUS_UNKNOWN), last_contact(0),side(0) { memset(encryption_master_key,0,32) ; }
~RsGxsNetTunnelVirtualPeerInfo() ;
virtual ~RsGxsNetTunnelVirtualPeerInfo(){}
uint8_t vpid_status ; // status of the peer
time_t last_contact ; // last time some data was sent/recvd
@ -186,12 +186,24 @@ public:
bool sendData(unsigned char *& data, uint32_t data_len, const RsGxsNetTunnelVirtualPeerId& virtual_peer) ;
/*!
* \brief receivedItem
* returns the next received item from the given virtual peer.
* \param virtual_peer
* \brief receiveData
* returns the next piece of data received fro the given service, and the virtual GXS peer that sended it.
* \param service_id service that provide the data
* \param data memory check containing the data. Memory ownership belongs to the client.
* \param data_len length of memory chunk
* \param virtual_peer peer who sent the data
* \return
* true if something is returned. If not, data is set to NULL, data_len to 0.
*/
RsItem *receivedItem(const RsGxsNetTunnelVirtualPeerId& virtual_peer) ;
bool receiveData(uint16_t service_id,unsigned char *& data,uint32_t& data_len,RsGxsNetTunnelVirtualPeerId& virtual_peer) ;
/*!
* \brief isDistantPeer
* returns wether the peer is in the list of available distant peers or not
* \return true if the peer is a distant GXS peer.
*/
bool isDistantPeer(const RsGxsNetTunnelVirtualPeerId& virtual_peer) ;
/*!
* \brief dumps all information about monitored groups.
@ -234,6 +246,8 @@ private:
std::list<std::pair<TurtleVirtualPeerId,RsTurtleGenericDataItem*> > mPendingTurtleItems ; // items that need to be sent off-turtle Mutex.
std::map<uint16_t,std::list<std::pair<RsGxsNetTunnelVirtualPeerId,RsTlvBinaryData *> > > mIncomingData; // list of incoming data items, per service.
/*!
* \brief Generates the hash to request tunnels for this group. This hash is only used by turtle, and is used to
* hide the real group id.