fixed data incoming/outgoing in GxsNetTunnel

This commit is contained in:
csoler 2018-03-26 23:19:29 +02:00
parent 5566d90f32
commit b488760d7d
No known key found for this signature in database
GPG Key ID: 7BCA522266C0804C
2 changed files with 110 additions and 37 deletions

View File

@ -37,6 +37,19 @@
RsGxsNetTunnelService::RsGxsNetTunnelService(): mGxsNetTunnelMtx("GxsNetTunnel") {}
//===========================================================================================================================================//
// Internal structures //
//===========================================================================================================================================//
RsGxsNetTunnelVirtualPeerInfo::~RsGxsNetTunnelVirtualPeerInfo()
{
for(auto it(outgoing_items.begin());it!=outgoing_items.end();++it)
delete *it ;
for(auto it(incoming_data.begin());it!=incoming_data.end();++it)
delete *it ;
}
//===========================================================================================================================================//
// Transport Items //
//===========================================================================================================================================//
@ -267,7 +280,7 @@ void RsGxsNetTunnelService::dump() const
std::cerr << " turtle:" << it2->first << " status: " << vpid_status_str[it2->second.vpid_status] << " s: "
<< (int)it2->second.side << " last seen " << time(NULL)-it2->second.last_contact
<< " ekey: " << RsUtil::BinToHex(it2->second.encryption_master_key,RS_GXS_TUNNEL_CONST_EKEY_SIZE)
<< " pending (" << it2->second.incoming_items.size() << "," << it2->second.outgoing_items.size() << ")" << std::endl;
<< " pending (" << it2->second.incoming_data.size() << "," << it2->second.outgoing_items.size() << ")" << std::endl;
}
std::cerr << "Virtual peers: " << std::endl;
@ -303,7 +316,9 @@ bool RsGxsNetTunnelService::handleTunnelRequest(const RsFileHash &hash,const RsP
void RsGxsNetTunnelService::receiveTurtleData(RsTurtleGenericTunnelItem *item,const RsFileHash& hash,const RsPeerId& virtual_peer_id,RsTurtleGenericTunnelItem::Direction direction)
{
GXS_NET_TUNNEL_NOT_IMPLEMENTED();
#ifdef DEBUG_RSGXSNETTUNNEL
GXS_NET_TUNNEL_DEBUG() << " received turtle data for vpid " << virtual_peer_id << " for hash " << hash << " in direction " << (int)direction << std::endl;
#endif
if(item->PacketSubType() != RS_TURTLE_SUBTYPE_GENERIC_DATA)
{
@ -358,10 +373,42 @@ void RsGxsNetTunnelService::receiveTurtleData(RsTurtleGenericTunnelItem *item,co
return ;
}
RsGxsNetTunnelItem *decrypted_item = dynamic_cast<RsGxsNetTunnelItem*>(RsGxsNetTunnelSerializer().deserialise(data,&data_size)) ;
free(data);
// Now we might get 2 kinds of items: GxsNetTunnel items, to be handled here, and Gxs data items, to be handled by the client service
handleIncoming(decrypted_item);
RsItem *decrypted_item = RsGxsNetTunnelSerializer().deserialise(data,&data_size);
RsGxsNetTunnelVirtualPeerItem *pid_item = dynamic_cast<RsGxsNetTunnelVirtualPeerItem*>(decrypted_item) ;
if(pid_item)
{
if(direction == RsTurtleGenericTunnelItem::DIRECTION_SERVER)
{
#ifdef DEBUG_RSGXSNETTUNNEL
GXS_NET_TUNNEL_DEBUG() << " item is a virtual peer id item with vpid = "<< pid_item->virtual_peer_id << ". Setting virtual peer." << std::endl;
#endif
vp_info.net_service_virtual_peer = pid_item->virtual_peer_id;
vp_info.vpid_status = RsGxsNetTunnelVirtualPeerInfo::RS_GXS_NET_TUNNEL_VP_STATUS_ACTIVE ;
}
else
GXS_NET_TUNNEL_ERROR() << "Cannot decrypt data!" << std::endl;
free(data);
return ;
}
else
{
#ifdef DEBUG_RSGXSNETTUNNEL
GXS_NET_TUNNEL_DEBUG() << " item is GXS data. Storing into incoming list." << std::endl;
#endif
// push the data into the service incoming data list
RsTlvBinaryData *bind = new RsTlvBinaryData;
bind->tlvtype = 0;
bind->bin_len = data_size;
bind->bin_data = data;
vp_info.incoming_data.push_back(bind) ;
}
}
void RsGxsNetTunnelService::addVirtualPeer(const TurtleFileHash& hash, const TurtleVirtualPeerId& vpid,RsTurtleGenericTunnelItem::Direction dir)
@ -374,6 +421,9 @@ void RsGxsNetTunnelService::addVirtualPeer(const TurtleFileHash& hash, const Tur
return ;
}
#ifdef DEBUG_RSGXSNETTUNNEL
GXS_NET_TUNNEL_DEBUG() << " adding virtual peer " << vpid << " for hash " << hash << " in direction " << dir << std::endl;
#endif
const RsGxsGroupId group_id(it->second) ;
RsGxsNetTunnelGroupInfo& ginfo( mGroups[group_id] ) ;
@ -393,6 +443,9 @@ void RsGxsNetTunnelService::addVirtualPeer(const TurtleFileHash& hash, const Tur
{
vpinfo.net_service_virtual_peer = makeServerVirtualPeerIdForGroup(group_id);
#ifdef DEBUG_RSGXSNETTUNNEL
GXS_NET_TUNNEL_DEBUG() << " peer is server side: sending back virtual peer name " << vpinfo.net_service_virtual_peer << std::endl;
#endif
RsGxsNetTunnelVirtualPeerItem *pitem = new RsGxsNetTunnelVirtualPeerItem ;
pitem->virtual_peer_id = vpinfo.net_service_virtual_peer ;
@ -405,6 +458,9 @@ void RsGxsNetTunnelService::addVirtualPeer(const TurtleFileHash& hash, const Tur
void RsGxsNetTunnelService::removeVirtualPeer(const TurtleFileHash& hash, const TurtleVirtualPeerId& vpid)
{
#ifdef DEBUG_RSGXSNETTUNNEL
GXS_NET_TUNNEL_DEBUG() << " removing virtual peer " << vpid << " for hash " << hash << std::endl;
#endif
auto it = mHandledHashes.find(hash) ;
if(it == mHandledHashes.end())
@ -420,7 +476,13 @@ void RsGxsNetTunnelService::removeVirtualPeer(const TurtleFileHash& hash, const
ginfo.virtual_peers.erase(vpid);
if(ginfo.virtual_peers.empty())
{
ginfo.group_status = RsGxsNetTunnelGroupInfo::RS_GXS_NET_TUNNEL_GRP_STATUS_TUNNELS_REQUESTED ;
#ifdef DEBUG_RSGXSNETTUNNEL
GXS_NET_TUNNEL_DEBUG() << " no more virtual peers for group " << group_id << ": setting status to TUNNELS_REQUESTED" << std::endl;
#endif
}
}
RsFileHash RsGxsNetTunnelService::calculateGroupHash(const RsGxsGroupId& group_id) const
@ -454,41 +516,51 @@ void RsGxsNetTunnelService::data_tick()
// cleanup
autowash();
static time_t last_dump = time(NULL);
time_t now = time(NULL);
if(last_dump + 10 > now)
{
last_dump = now;
dump();
}
}
void RsGxsNetTunnelService::autowash()
{
}
#ifdef TODO
void RsGxsNetTunnelService::handleIncomingItem(const RsGxsTunnelId& tunnel_id,RsGxsTunnelItem *item)
{
if(item == NULL)
return ;
// We have 3 things to do:
//
// 1 - if it's a data item, send an ACK
// 2 - if it's an ack item, mark the item as properly received, and remove it from the queue
// 3 - if it's a status item, act accordingly.
switch(item->PacketSubType())
{
case RS_PKT_SUBTYPE_GXS_TUNNEL_DATA: handleRecvTunnelDataItem(tunnel_id,dynamic_cast<RsGxsTunnelDataItem*>(item)) ;
break ;
case RS_PKT_SUBTYPE_GXS_TUNNEL_DATA_ACK: handleRecvTunnelDataAckItem(tunnel_id,dynamic_cast<RsGxsTunnelDataAckItem*>(item)) ;
break ;
case RS_PKT_SUBTYPE_GXS_TUNNEL_STATUS: handleRecvStatusItem(tunnel_id,dynamic_cast<RsGxsTunnelStatusItem*>(item)) ;
break ;
default:
std::cerr << "(EE) impossible situation. DH items should be handled at the service level" << std::endl;
}
delete item ;
}
#endif
// void RsGxsNetTunnelService::handleIncoming(const RsGxsTunnelId& tunnel_id,RsGxsTunnelItem *item)
// {
// #ifdef DEBUG_RSGXSNETTUNNEL
// GXS_NET_TUNNEL_DEBUG() << " received turtle data for vpid " << virtual_peer_id << " for hash " << hash << " in direction " << dir << std::endl;
// #endif
// if(item == NULL)
// return ;
//
// // We have 3 things to do:
// //
// // 1 - if it's a data item, send an ACK
// // 2 - if it's an ack item, mark the item as properly received, and remove it from the queue
// // 3 - if it's a status item, act accordingly.
//
// switch(item->PacketSubType())
// {
//
// case RS_PKT_SUBTYPE_GXS_TUNNEL_DATA: handleRecvTunnelDataItem(tunnel_id,dynamic_cast<RsGxsTunnelDataItem*>(item)) ;
// break ;
//
// case RS_PKT_SUBTYPE_GXS_TUNNEL_DATA_ACK: handleRecvTunnelDataAckItem(tunnel_id,dynamic_cast<RsGxsTunnelDataAckItem*>(item)) ;
// break ;
//
// case RS_PKT_SUBTYPE_GXS_TUNNEL_STATUS: handleRecvStatusItem(tunnel_id,dynamic_cast<RsGxsTunnelStatusItem*>(item)) ;
// break ;
//
// default:
// std::cerr << "(EE) impossible situation. DH items should be handled at the service level" << std::endl;
// }
//
// delete item ;
// }

View File

@ -76,6 +76,7 @@ struct RsGxsNetTunnelVirtualPeerInfo
};
RsGxsNetTunnelVirtualPeerInfo() : vpid_status(RS_GXS_NET_TUNNEL_VP_STATUS_UNKNOWN) { memset(encryption_master_key,0,16) ; }
~RsGxsNetTunnelVirtualPeerInfo() ;
uint8_t vpid_status ; // status of the peer
uint8_t side ; // client/server
@ -85,7 +86,7 @@ struct RsGxsNetTunnelVirtualPeerInfo
RsGxsNetTunnelVirtualPeerId net_service_virtual_peer ; // anonymised peer that is used to communicate with client services
RsGxsGroupId group_id ; // group id
std::list<RsItem*> incoming_items ;
std::list<RsTlvBinaryData*> incoming_data ;
std::list<RsItem*> outgoing_items ;
};