improved GXS dist sync item handling

This commit is contained in:
csoler 2018-04-06 15:26:54 +02:00
parent 7d561bcceb
commit 2b9139bf85
No known key found for this signature in database
GPG Key ID: 7BCA522266C0804C
3 changed files with 65 additions and 36 deletions

View File

@ -1670,8 +1670,15 @@ RsItem *RsGxsNetService::generic_recvItem()
uint32_t size = 0 ;
RsGxsNetTunnelVirtualPeerId virtual_peer_id ;
if(mGxsNetTunnel->receiveData(mServType,data,size,virtual_peer_id))
return dynamic_cast<RsNxsItem*>(RsNxsSerialiser(mServType).deserialise(data,&size)) ;
if(mAllowDistSync && mGxsNetTunnel->receiveData(mServType,data,size,virtual_peer_id))
{
RsItem *item = dynamic_cast<RsNxsItem*>(RsNxsSerialiser(mServType).deserialise(data,&size)) ;
item->PeerId(virtual_peer_id) ;
free(data) ;
return item ;
}
return NULL ;
}

View File

@ -129,6 +129,11 @@ RsGxsNetTunnelService::~RsGxsNetTunnelService()
mGroups.clear();
mHandledHashes.clear();
mVirtualPeers.clear();
for(auto it(mIncomingData.begin());it!=mIncomingData.end();++it)
for(auto it2((*it).second.begin());it2!=(*it).second.end();++it2)
delete (*it2).second;
mIncomingData.clear();
}
@ -234,6 +239,7 @@ bool RsGxsNetTunnelService::requestPeers(uint16_t service_id,const RsGxsGroupId&
ginfo.group_policy = RsGxsNetTunnelGroupInfo::RS_GXS_NET_TUNNEL_GRP_POLICY_ACTIVE;
ginfo.hash = calculateGroupHash(group_id) ;
ginfo.service_id = service_id;
mHandledHashes[ginfo.hash] = group_id ;
@ -255,6 +261,7 @@ bool RsGxsNetTunnelService::releasePeers(uint16_t service_id, const RsGxsGroupId
ginfo.group_policy = RsGxsNetTunnelGroupInfo::RS_GXS_NET_TUNNEL_GRP_POLICY_PASSIVE;
ginfo.hash = calculateGroupHash(group_id) ;
ginfo.service_id = service_id;
mHandledHashes[ginfo.hash] = group_id ; // yes, we do not remove, because we're supposed to answer tunnel requests from other peers.
@ -331,6 +338,11 @@ void RsGxsNetTunnelService::dump() const
std::cerr << "Hashes: " << std::endl;
for(auto it(mHandledHashes.begin());it!=mHandledHashes.end();++it)
std::cerr << " hash: " << it->first << " GroupId: " << it->second << std::endl;
std::cerr << "Incoming data: " << std::endl;
for(auto it(mIncomingData.begin());it!=mIncomingData.end();++it)
for(auto it2(it->second.begin());it2!=it->second.end();++it2)
std::cerr << " service " << std::hex << it->first << std::dec << " peer id " << it2->first << " " << (void*)it2->second << std::endl;
}
//===========================================================================================================================================//
@ -396,13 +408,23 @@ void RsGxsNetTunnelService::receiveTurtleData(RsTurtleGenericTunnelItem *item,co
return ;
}
RsItem *decrypted_item = RsGxsNetTunnelSerializer().deserialise(data,&data_size);
RsGxsNetTunnelVirtualPeerItem *pid_item = dynamic_cast<RsGxsNetTunnelVirtualPeerItem*>(decrypted_item) ;
if(pid_item)
if(getRsItemService(getRsItemId(data)) == RS_SERVICE_TYPE_GXS_NET_TUNNEL)
{
RsItem *decrypted_item = RsGxsNetTunnelSerializer().deserialise(data,&data_size);
RsGxsNetTunnelVirtualPeerItem *pid_item = dynamic_cast<RsGxsNetTunnelVirtualPeerItem*>(decrypted_item) ;
if(!pid_item)
{
delete decrypted_item ;
return ;
}
uint16_t service_id = mGroups[group_id].service_id ;
#ifdef DEBUG_RSGXSNETTUNNEL
GXS_NET_TUNNEL_DEBUG() << " item is a virtual peer id item with vpid = "<< pid_item->virtual_peer_id << ". Setting virtual peer." << std::endl;
GXS_NET_TUNNEL_DEBUG() << " item is a virtual peer id item with vpid = "<< pid_item->virtual_peer_id
<< " for group " << group_id << " in service " << std::hex << service_id << std::dec
<< ". Setting virtual peer." << std::endl;
#endif
// we receive a virtual peer id, so we need to update the local information for this peer id
@ -413,52 +435,53 @@ void RsGxsNetTunnelService::receiveTurtleData(RsTurtleGenericTunnelItem *item,co
vp_info.vpid_status = RsGxsNetTunnelVirtualPeerInfo::RS_GXS_NET_TUNNEL_VP_STATUS_ACTIVE ; // status of the peer
vp_info.side = direction; // client/server
vp_info.last_contact = time(NULL); // last time some data was sent/recvd
vp_info.providing_set[service_id].provided_groups.insert(group_id);
memcpy(vp_info.encryption_master_key,encryption_master_key,RS_GXS_TUNNEL_CONST_EKEY_SIZE);
vp_info.turtle_virtual_peer_id = turtle_virtual_peer_id; // turtle peer to use when sending data to this vpid.
free(data);
return ;
}
delete decrypted_item ;
// item is a generic data item for the client. Let's store the data in the appropriate incoming data queue.
auto it = mTurtle2GxsPeer.find(turtle_virtual_peer_id) ;
if(it == mTurtle2GxsPeer.end())
else
{
GXS_NET_TUNNEL_ERROR() << "item received by GxsNetTunnel for vpid " << turtle_virtual_peer_id << " but this vpid is unknown!" << std::endl;
free(data);
return;
}
// item is a generic data item for the client. Let's store the data in the appropriate incoming data queue.
RsGxsNetTunnelVirtualPeerId gxs_vpid = it->second ;
auto it = mTurtle2GxsPeer.find(turtle_virtual_peer_id) ;
auto it2 = mVirtualPeers.find(gxs_vpid) ;
if(it == mTurtle2GxsPeer.end())
{
GXS_NET_TUNNEL_ERROR() << "item received by GxsNetTunnel for vpid " << turtle_virtual_peer_id << " but this vpid is unknown!" << std::endl;
free(data);
return;
}
if(it2 == mVirtualPeers.end())
{
GXS_NET_TUNNEL_ERROR() << "item received by GxsNetTunnel for GXS vpid " << gxs_vpid << " but the virtual peer id is missing!" << std::endl;
free(data);
return;
}
RsGxsNetTunnelVirtualPeerId gxs_vpid = it->second ;
uint16_t service_id = getRsItemService(getRsItemId(data)) ;
auto it2 = mVirtualPeers.find(gxs_vpid) ;
if(it2 == mVirtualPeers.end())
{
GXS_NET_TUNNEL_ERROR() << "item received by GxsNetTunnel for GXS vpid " << gxs_vpid << " but the virtual peer id is missing!" << std::endl;
free(data);
return;
}
uint16_t service_id = getRsItemService(getRsItemId(data)) ;
#ifdef DEBUG_RSGXSNETTUNNEL
GXS_NET_TUNNEL_DEBUG() << "item contains generic data for service " << std::hex << service_id << std::dec << " for VPID " << gxs_vpid << ". Storing in incoming list" << std::endl;
GXS_NET_TUNNEL_DEBUG() << "item contains generic data for service " << std::hex << service_id << std::dec << " for VPID " << gxs_vpid << ". Storing in incoming list" << std::endl;
#endif
// push the data into the service incoming data list
// push the data into the service incoming data list
RsTlvBinaryData *bind = new RsTlvBinaryData;
bind->tlvtype = 0;
bind->bin_len = data_size;
bind->bin_data = data;
RsTlvBinaryData *bind = new RsTlvBinaryData;
bind->tlvtype = 0;
bind->bin_len = data_size;
bind->bin_data = data;
mIncomingData[service_id].push_back(std::make_pair(gxs_vpid,bind)) ;
mIncomingData[service_id].push_back(std::make_pair(gxs_vpid,bind)) ;
}
}
void RsGxsNetTunnelService::addVirtualPeer(const TurtleFileHash& hash, const TurtleVirtualPeerId& vpid,RsTurtleGenericTunnelItem::Direction dir)

View File

@ -102,7 +102,6 @@ class RsGxsNetTunnelItem ;
struct RsGxsNetTunnelVirtualPeerProvidingSet
{
std::set<RsGxsGroupId> provided_groups ;
std::list<RsTlvBinaryData*> incoming_data ;
};
struct RsGxsNetTunnelVirtualPeerInfo