mirror of
https://github.com/RetroShare/RetroShare.git
synced 2025-01-15 09:27:09 -05:00
re-enabled load/save pending items.
git-svn-id: http://svn.code.sf.net/p/retroshare/code/branches/v0.6-NewGRouterModel@7853 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
parent
f1990276c3
commit
15fd4d787a
@ -186,10 +186,18 @@ RsGRouterRoutingInfoItem *RsGRouterSerialiser::deserialise_RsGRouterRoutingInfoI
|
||||
|
||||
RsGRouterRoutingInfoItem *item = new RsGRouterRoutingInfoItem() ;
|
||||
|
||||
RsPeerId peer_id ;
|
||||
ok &= peer_id.deserialise(data, pktsize, offset) ;
|
||||
item->PeerId(peer_id) ;
|
||||
|
||||
ok &= getRawUInt32(data, pktsize, &offset, &item->data_status);
|
||||
ok &= getRawUInt32(data, pktsize, &offset, &item->tunnel_status);
|
||||
ok &= getRawTimeT(data, pktsize, &offset, item->received_time_TS);
|
||||
ok &= getRawTimeT(data, pktsize, &offset, item->last_sent_TS);
|
||||
ok &= getRawUInt32(data, pktsize, &offset, &item->sending_attempts);
|
||||
|
||||
ok &= getRawUInt32(data, pktsize, &offset, &item->client_id);
|
||||
ok &= item->tunnel_hash.deserialise(data, pktsize, offset) ;
|
||||
|
||||
item->data_item = deserialise_RsGRouterGenericDataItem(&((uint8_t*)data)[offset],pktsize - offset) ;
|
||||
if(item->data_item != NULL)
|
||||
@ -197,7 +205,12 @@ RsGRouterRoutingInfoItem *RsGRouterSerialiser::deserialise_RsGRouterRoutingInfoI
|
||||
else
|
||||
ok = false ;
|
||||
|
||||
item->destination_key = item->data_item->destination_key ;
|
||||
item->receipt_item = deserialise_RsGRouterSignedReceiptItem(&((uint8_t*)data)[offset],pktsize - offset) ;
|
||||
if(item->receipt_item != NULL)
|
||||
offset += item->receipt_item->serial_size() ;
|
||||
else
|
||||
ok = false ;
|
||||
|
||||
|
||||
if (offset != rssize || !ok)
|
||||
{
|
||||
@ -510,15 +523,22 @@ uint32_t RsGRouterMatrixFriendListItem::serial_size() const
|
||||
}
|
||||
uint32_t RsGRouterRoutingInfoItem::serial_size() const
|
||||
{
|
||||
uint32_t s = 8 ; // header
|
||||
s += 4 ; // status_flags
|
||||
s += 8 ; // received_time
|
||||
s += 8 ; // last_sent
|
||||
s += 4 ; // tried_friends.size() ;
|
||||
s += sizeof(GRouterServiceId) ; // service_id
|
||||
s += data_item->serial_size(); // data_item
|
||||
uint32_t s = 8 ; // header
|
||||
s += PeerId().serial_size() ;
|
||||
s += 4 ; // data status_flags
|
||||
s += 4 ; // tunnel status_flags
|
||||
s += 8 ; // received_time
|
||||
s += 8 ; // last_sent
|
||||
s += 8 ; // last_TR_TS
|
||||
s += 4 ; // sending attempts
|
||||
|
||||
return s ;
|
||||
s += sizeof(GRouterServiceId) ; // service_id
|
||||
s += tunnel_hash.serial_size() ;
|
||||
|
||||
s += data_item->serial_size(); // data_item
|
||||
s += receipt_item->serial_size(); // receipt_item
|
||||
|
||||
return s ;
|
||||
}
|
||||
|
||||
bool RsGRouterMatrixFriendListItem::serialise(void *data,uint32_t& size) const
|
||||
@ -597,15 +617,27 @@ bool RsGRouterRoutingInfoItem::serialise(void *data,uint32_t& size) const
|
||||
if(!serialise_header(data,size,tlvsize,offset))
|
||||
return false ;
|
||||
|
||||
ok &= PeerId().serialise(data, tlvsize, offset) ; // we keep this.
|
||||
ok &= setRawUInt32(data, tlvsize, &offset, data_status) ;
|
||||
ok &= setRawUInt32(data, tlvsize, &offset, tunnel_status) ;
|
||||
ok &= setRawTimeT(data, tlvsize, &offset, received_time_TS) ;
|
||||
ok &= setRawTimeT(data, tlvsize, &offset, last_sent_TS) ;
|
||||
ok &= setRawTimeT(data, tlvsize, &offset, last_tunnel_request_TS) ;
|
||||
ok &= setRawUInt32(data, tlvsize, &offset, sending_attempts) ;
|
||||
|
||||
ok &= setRawUInt32(data, tlvsize, &offset, client_id) ;
|
||||
ok &= tunnel_hash.serialise(data, tlvsize, offset) ;
|
||||
|
||||
uint32_t ns = size - offset ;
|
||||
ok &= data_item->serialise( &((uint8_t*)data)[offset], ns) ;
|
||||
offset += ns ;
|
||||
offset += data_item->serial_size() ;
|
||||
|
||||
if(receipt_item != NULL)
|
||||
{
|
||||
uint32_t ns = size - offset ;
|
||||
ok &= receipt_item->serialise( &((uint8_t*)data)[offset], ns) ;
|
||||
offset += receipt_item->serial_size() ;
|
||||
}
|
||||
if (offset != tlvsize)
|
||||
{
|
||||
ok = false;
|
||||
@ -623,7 +655,7 @@ std::ostream& RsGRouterSignedReceiptItem::print(std::ostream& o, uint16_t)
|
||||
{
|
||||
o << "RsGRouterReceiptItem:" << std::endl ;
|
||||
o << " direct origin: \""<< PeerId() << "\"" << std::endl ;
|
||||
o << " Mid: " << routing_id << std::endl ;
|
||||
o << " Mid: " << std::hex << routing_id << std::dec << std::endl ;
|
||||
o << " State: " << flags << std::endl ;
|
||||
o << " Dest: " << destination_key << std::endl ;
|
||||
o << " Sign: " << signature.keyId << std::endl ;
|
||||
@ -647,15 +679,18 @@ std::ostream& RsGRouterGenericDataItem::print(std::ostream& o, uint16_t)
|
||||
|
||||
std::ostream& RsGRouterRoutingInfoItem::print(std::ostream& o, uint16_t)
|
||||
{
|
||||
o << "RsGRouterRoutingInfoItem:" << std::endl ;
|
||||
o << " direct origin: \""<< PeerId() << "\"" << std::endl ;
|
||||
o << " recv time: "<< received_time_TS << std::endl ;
|
||||
o << " Last sent: "<< last_sent_TS << std::endl ;
|
||||
o << " flags: "<< std::hex << data_status << std::dec << std::endl ;
|
||||
o << " Key 1: "<< destination_key << std::endl ;
|
||||
o << " Key 2: "<< data_item->destination_key << std::endl ;
|
||||
o << " Data size: "<< data_item->data_size << std::endl ;
|
||||
o << " Client id: "<< client_id << std::endl ;
|
||||
o << "RsGRouterRoutingInfoItem:" << std::endl ;
|
||||
o << " direct origin: "<< PeerId() << std::endl ;
|
||||
o << " data status: "<< std::hex<< data_status << std::dec << std::endl ;
|
||||
o << " tunnel status: "<< tunnel_status << std::endl ;
|
||||
o << " recv time: "<< received_time_TS << std::endl ;
|
||||
o << " Last sent: "<< last_sent_TS << std::endl ;
|
||||
o << " Sending attempts:"<< sending_attempts << std::endl ;
|
||||
o << " destination key: "<< data_item->destination_key << std::endl ;
|
||||
o << " Client id: "<< client_id << std::endl ;
|
||||
o << " tunnel hash: "<< tunnel_hash << std::endl ;
|
||||
o << " Data size: "<< data_item->data_size << std::endl ;
|
||||
o << " Signed receipt: "<< (void*)receipt_item << std::endl ;
|
||||
|
||||
return o ;
|
||||
}
|
||||
|
@ -88,7 +88,6 @@ public:
|
||||
time_t last_tunnel_request_TS ; // last time tunnels have been asked for this item.
|
||||
uint32_t sending_attempts ; // number of times tunnels have been asked for this peer without success
|
||||
|
||||
RsGxsId destination_key ; // ultimate destination for this key
|
||||
GRouterServiceId client_id ; // service ID of the client. Only valid when origin==OwnId
|
||||
TurtleFileHash tunnel_hash ; // tunnel hash to be used for this item
|
||||
|
||||
|
@ -205,6 +205,7 @@
|
||||
static const uint32_t MAX_TUNNEL_WAIT_TIME = 60 ; // wait for 60 seconds at most for a tunnel response.
|
||||
static const uint32_t MAX_DELAY_BETWEEN_TWO_SEND = 120 ; // wait for 120 seconds before re-sending.
|
||||
static const uint32_t TUNNEL_OK_WAIT_TIME = 10 ; // wait for 10 seconds after last tunnel ok, so that we have a complete set of tunnels.
|
||||
static const uint32_t MAX_GROUTER_DATA_SIZE = 2*1024*1024 ; // 2MB size limit. This is of course arbitrary.
|
||||
|
||||
const std::string p3GRouter::SERVICE_INFO_APP_NAME = "Global Router" ;
|
||||
|
||||
@ -401,20 +402,26 @@ bool p3GRouter::handleTunnelRequest(const RsFileHash& hash,const RsPeerId& /*pee
|
||||
// - we know the destination and have a route (according to matrix) => accept with high probability
|
||||
// - we don't know the destination => accept with very low probability
|
||||
|
||||
#ifdef GROUTER_DEBUG
|
||||
std::cerr << "p3GRouter::handleTunnelRequest(). Got req for hash " << hash << ", responding OK" << std::endl;
|
||||
#endif
|
||||
|
||||
if(_owned_key_ids.find(hash) == _owned_key_ids.end())
|
||||
return false ;
|
||||
|
||||
#ifdef GROUTER_DEBUG
|
||||
std::cerr << " responding ok." << std::endl;
|
||||
#endif
|
||||
return true ;
|
||||
}
|
||||
void p3GRouter::receiveTurtleData(RsTurtleGenericTunnelItem *gitem,const RsFileHash& hash,const RsPeerId& virtual_peer_id,RsTurtleGenericTunnelItem::Direction direction)
|
||||
{
|
||||
#ifdef GROUTER_DEBUG
|
||||
std::cerr << "p3GRouter::receiveTurtleData() " << std::endl;
|
||||
std::cerr << " Received data for hash : " << hash << std::endl;
|
||||
std::cerr << " Virtual peer id : " << virtual_peer_id << std::endl;
|
||||
std::cerr << " Direction : " << direction << std::endl;
|
||||
#endif
|
||||
|
||||
// turtle data is received.
|
||||
// This function
|
||||
@ -428,8 +435,10 @@ void p3GRouter::receiveTurtleData(RsTurtleGenericTunnelItem *gitem,const RsFileH
|
||||
std::cerr << " ERROR: item is not a data item. That is an error." << std::endl;
|
||||
return ;
|
||||
}
|
||||
#ifdef GROUTER_DEBUG
|
||||
std::cerr << " data size : " << item->data_size << std::endl;
|
||||
std::cerr << " data bytes : " << RsDirUtil::sha1sum((unsigned char*)item->data_bytes,item->data_size) << std::endl;
|
||||
#endif
|
||||
|
||||
RsGRouterAbstractMsgItem *generic_item = NULL ;
|
||||
|
||||
@ -455,19 +464,25 @@ void p3GRouter::receiveTurtleData(RsTurtleGenericTunnelItem *gitem,const RsFileH
|
||||
|
||||
if(chunk_item != NULL)
|
||||
{
|
||||
#ifdef GROUTER_DEBUG
|
||||
std::cerr << " item is a transaction item." << std::endl;
|
||||
#endif
|
||||
generic_item = it->second.addDataChunk(virtual_peer_id,chunk_item) ; // addDataChunk takes ownership over chunk_item
|
||||
}
|
||||
else if(NULL != (trans_ack_item = dynamic_cast<RsGRouterTransactionAcknItem*>(itm)))
|
||||
{
|
||||
#ifdef GROUTER_DEBUG
|
||||
std::cerr << " item is a transaction ACK." << std::endl;
|
||||
#endif
|
||||
|
||||
std::map<GRouterMsgPropagationId, GRouterRoutingInfo>::iterator it=_pending_messages.find(trans_ack_item->propagation_id) ;
|
||||
|
||||
if(it != _pending_messages.end())
|
||||
{
|
||||
it->second.data_status = RS_GROUTER_DATA_STATUS_SENT;
|
||||
#ifdef GROUTER_DEBUG
|
||||
std::cerr << " setting new status as sent/awaiting receipt." << std::endl;
|
||||
#endif
|
||||
}
|
||||
else
|
||||
std::cerr << " ERROR: no routing ID corresponds to this ACK item. Inconsistency!" << std::endl;
|
||||
@ -484,8 +499,10 @@ void p3GRouter::receiveTurtleData(RsTurtleGenericTunnelItem *gitem,const RsFileH
|
||||
|
||||
if(generic_item != NULL)
|
||||
{
|
||||
#ifdef GROUTER_DEBUG
|
||||
std::cerr << " transaction is finished. Passing newly created item to client." << std::endl;
|
||||
std::cerr << " sending a ACK item" << std::endl;
|
||||
#endif
|
||||
|
||||
RsGRouterTransactionAcknItem ackn_item ;
|
||||
ackn_item.propagation_id = generic_item->routing_id ;
|
||||
@ -603,8 +620,10 @@ void p3GRouter::addVirtualPeer(const TurtleFileHash& hash,const TurtleVirtualPee
|
||||
|
||||
// Server side tunnels. This is incoming data. Nothing to do.
|
||||
|
||||
#ifdef GROUTER_DEBUG
|
||||
std::cerr << "p3GRouter::addVirtualPeer(). Received vpid " << virtual_peer_id << " for hash " << hash << ", direction=" << dir << std::endl;
|
||||
std::cerr << " direction = " << dir << std::endl;
|
||||
#endif
|
||||
|
||||
// client side. We set the tunnel flags to READY.
|
||||
|
||||
@ -616,7 +635,9 @@ void p3GRouter::addVirtualPeer(const TurtleFileHash& hash,const TurtleVirtualPee
|
||||
for(std::map<GRouterMsgPropagationId,GRouterRoutingInfo>::iterator it(_pending_messages.begin());it!=_pending_messages.end();++it)
|
||||
if(it->second.tunnel_hash == hash)
|
||||
{
|
||||
#ifdef GROUTER_DEBUG
|
||||
std::cerr << " setting tunnel state to READY." << std::endl;
|
||||
#endif
|
||||
it->second.tunnel_status = RS_GROUTER_TUNNEL_STATUS_READY ;
|
||||
found = true ;
|
||||
break ;
|
||||
@ -632,8 +653,9 @@ void p3GRouter::addVirtualPeer(const TurtleFileHash& hash,const TurtleVirtualPee
|
||||
{
|
||||
}
|
||||
|
||||
|
||||
#ifdef GROUTER_DEBUG
|
||||
std::cerr << " adding VPID." << std::endl;
|
||||
#endif
|
||||
|
||||
_virtual_peers[hash].addVirtualPeer(virtual_peer_id) ;
|
||||
|
||||
@ -642,8 +664,10 @@ void p3GRouter::removeVirtualPeer(const TurtleFileHash& hash,const TurtleVirtual
|
||||
{
|
||||
RS_STACK_MUTEX(grMtx) ;
|
||||
|
||||
#ifdef GROUTER_DEBUG
|
||||
std::cerr << "p3GRouter::addVirtualPeer(). Received vpid " << virtual_peer_id << " for hash " << hash << std::endl;
|
||||
std::cerr << " removing VPID." << std::endl;
|
||||
#endif
|
||||
|
||||
// make sure the VPID exists.
|
||||
|
||||
@ -651,14 +675,16 @@ void p3GRouter::removeVirtualPeer(const TurtleFileHash& hash,const TurtleVirtual
|
||||
|
||||
if(it == _virtual_peers.end())
|
||||
{
|
||||
std::cerr << " no virtual peers at all for this hash! This is a consistency error." << std::endl;
|
||||
std::cerr << " no virtual peers at all for this hash: " << hash << "! This is a consistency error." << std::endl;
|
||||
return ;
|
||||
}
|
||||
it->second.removeVirtualPeer(virtual_peer_id) ;
|
||||
|
||||
if(it->second.virtual_peers.empty())
|
||||
{
|
||||
#ifdef GROUTER_DEBUG
|
||||
std::cerr << " last virtual peer removed. Also deleting hash entry." << std::endl;
|
||||
#endif
|
||||
_virtual_peers.erase(it) ;
|
||||
}
|
||||
}
|
||||
@ -709,11 +735,13 @@ void p3GRouter::handleTunnels()
|
||||
|
||||
RS_STACK_MUTEX(grMtx) ;
|
||||
|
||||
#ifdef GROUTER_DEBUG
|
||||
if(!_pending_messages.empty())
|
||||
{
|
||||
grouter_debug() << "p3GRouter::handleTunnels()" << std::endl;
|
||||
grouter_debug() << " building priority list of items to send..." << std::endl;
|
||||
}
|
||||
#endif
|
||||
|
||||
static uint32_t send_retry_time_delays[6] = { 0, 1800, 3600, 5*3600, 12*3600, 24*2600 } ;
|
||||
time_t now = time(NULL) ;
|
||||
@ -721,7 +749,9 @@ if(!_pending_messages.empty())
|
||||
|
||||
for(std::map<GRouterMsgPropagationId, GRouterRoutingInfo>::iterator it=_pending_messages.begin();it!=_pending_messages.end();++it)
|
||||
{
|
||||
#ifdef GROUTER_DEBUG
|
||||
grouter_debug() << " " << std::hex << it->first << std::dec << " data_status=" << it->second.data_status << ", tunnel_status=" << it->second.tunnel_status;
|
||||
#endif
|
||||
|
||||
if(it->second.data_status == RS_GROUTER_DATA_STATUS_PENDING)
|
||||
{
|
||||
@ -730,7 +760,9 @@ if(!_pending_messages.empty())
|
||||
uint32_t item_delay = now - it->second.last_tunnel_request_TS ;
|
||||
int item_priority = item_delay - send_retry_time_delays[std::min(5u,it->second.sending_attempts)] ;
|
||||
|
||||
#ifdef GROUTER_DEBUG
|
||||
grouter_debug() << " delay=" << item_delay << " attempts=" << it->second.sending_attempts << ", priority=" << item_priority << std::endl;
|
||||
#endif
|
||||
|
||||
if(item_priority > 0)
|
||||
priority_list.push_back(std::make_pair(item_priority,&it->second)) ;
|
||||
@ -741,20 +773,28 @@ if(!_pending_messages.empty())
|
||||
|
||||
it->second.tunnel_status = RS_GROUTER_TUNNEL_STATUS_UNMANAGED ;
|
||||
|
||||
#ifdef GROUTER_DEBUG
|
||||
grouter_debug() << " stopping tunnels for this message." << std::endl; ;
|
||||
#endif
|
||||
}
|
||||
#ifdef GROUTER_DEBUG
|
||||
else if(it->second.tunnel_status == RS_GROUTER_TUNNEL_STATUS_READY)
|
||||
grouter_debug() << " tunnel is available. " << std::endl;
|
||||
else
|
||||
grouter_debug() << " doing nothing." << std::endl;
|
||||
|
||||
grouter_debug() << std::endl;
|
||||
#endif
|
||||
}
|
||||
#ifdef GROUTER_DEBUG
|
||||
else
|
||||
std::cerr << " doing nothing." << std::endl;
|
||||
#endif
|
||||
}
|
||||
#ifdef GROUTER_DEBUG
|
||||
if(!priority_list.empty())
|
||||
grouter_debug() << " sorting..." << std::endl;
|
||||
#endif
|
||||
|
||||
std::sort(priority_list.begin(),priority_list.end()) ;
|
||||
|
||||
@ -762,7 +802,9 @@ if(!_pending_messages.empty())
|
||||
|
||||
for(uint32_t i=0;i<priority_list.size();++i)
|
||||
{
|
||||
#ifdef GROUTER_DEBUG
|
||||
grouter_debug() << " askign tunnel management for msg=" << priority_list[i].first << " hash=" << priority_list[i].second->tunnel_hash << std::endl;
|
||||
#endif
|
||||
|
||||
mTurtle->monitorTunnels(priority_list[i].second->tunnel_hash,this) ;
|
||||
|
||||
@ -780,45 +822,61 @@ void p3GRouter::routePendingObjects()
|
||||
|
||||
time_t now = time(NULL) ;
|
||||
|
||||
#ifdef GROUTER_DEBUG
|
||||
std::cerr << "p3GRouter::routePendingObjects()" << std::endl;
|
||||
#endif
|
||||
|
||||
for(std::map<GRouterMsgPropagationId, GRouterRoutingInfo>::iterator it=_pending_messages.begin();it!=_pending_messages.end();++it)
|
||||
if(it->second.data_status == RS_GROUTER_DATA_STATUS_PENDING && it->second.tunnel_status == RS_GROUTER_TUNNEL_STATUS_READY
|
||||
&& now > it->second.last_sent_TS + MAX_DELAY_BETWEEN_TWO_SEND)
|
||||
{
|
||||
#ifdef GROUTER_DEBUG
|
||||
std::cerr << " routing id: " << std::hex << it->first << std::dec ;
|
||||
#endif
|
||||
|
||||
const TurtleFileHash& hash(it->second.tunnel_hash) ;
|
||||
std::map<TurtleFileHash,GRouterTunnelInfo>::const_iterator vpit ;
|
||||
|
||||
if( (vpit = _virtual_peers.find(hash)) == _virtual_peers.end())
|
||||
{
|
||||
#ifdef GROUTER_DEBUG
|
||||
std::cerr << ". No virtual peers. Skipping now." << std::endl;
|
||||
#endif
|
||||
continue ;
|
||||
}
|
||||
|
||||
if(vpit->second.last_tunnel_ok_TS + TUNNEL_OK_WAIT_TIME > now)
|
||||
{
|
||||
#ifdef GROUTER_DEBUG
|
||||
std::cerr << ". Still waiting delay (stabilisation)." << std::endl;
|
||||
#endif
|
||||
continue ;
|
||||
}
|
||||
|
||||
// for now, just take one. But in the future, we will need some policy to temporarily store objects at proxy peers, etc.
|
||||
|
||||
#ifdef GROUTER_DEBUG
|
||||
std::cerr << " " << vpit->second.virtual_peers.size() << " virtual peers available. " << std::endl;
|
||||
#endif
|
||||
|
||||
if(vpit->second.virtual_peers.empty())
|
||||
{
|
||||
#ifdef GROUTER_DEBUG
|
||||
std::cerr << " no peers available. Cannot send!!" << std::endl;
|
||||
#endif
|
||||
continue ;
|
||||
}
|
||||
TurtleVirtualPeerId vpid = (vpit->second.virtual_peers.begin())->first ;
|
||||
|
||||
#ifdef GROUTER_DEBUG
|
||||
std::cerr << " sending to " << vpid << std::endl;
|
||||
#endif
|
||||
|
||||
sendDataInTunnel(vpid,it->second.data_item) ;
|
||||
|
||||
#ifdef GROUTER_DEBUG
|
||||
std::cerr << " setting last sent time to now" << std::endl;
|
||||
#endif
|
||||
|
||||
it->second.last_sent_TS = now ;
|
||||
}
|
||||
@ -831,9 +889,11 @@ bool p3GRouter::sendDataInTunnel(const TurtleVirtualPeerId& vpid,RsGRouterAbstra
|
||||
{
|
||||
// split into chunks and send them all into the tunnel.
|
||||
|
||||
#ifdef GROUTER_DEBUG
|
||||
std::cerr << "p3GRouter::sendDataInTunnel()" << std::endl;
|
||||
std::cerr << "item dump before send:" << std::endl;
|
||||
item->print(std::cerr, 2) ;
|
||||
#endif
|
||||
|
||||
uint32_t size = item->serial_size();
|
||||
uint8_t *data = (uint8_t*)malloc(size) ;
|
||||
@ -865,7 +925,9 @@ bool p3GRouter::sendDataInTunnel(const TurtleVirtualPeerId& vpid,RsGRouterAbstra
|
||||
chunk_item->chunk_size = chunk_size ;
|
||||
chunk_item->chunk_data = (uint8_t*)malloc(chunk_size) ;
|
||||
|
||||
#ifdef GROUTER_DEBUG
|
||||
std::cerr << " preparing to send a chunk [" << offset << " -> " << offset + chunk_size << " / " << size << "]" << std::endl;
|
||||
#endif
|
||||
|
||||
if(chunk_item->chunk_data == NULL)
|
||||
{
|
||||
@ -898,7 +960,9 @@ bool p3GRouter::sendDataInTunnel(const TurtleVirtualPeerId& vpid,RsGRouterAbstra
|
||||
turtle_item->data_size = turtle_data_size ;
|
||||
turtle_item->data_bytes = turtle_data ;
|
||||
|
||||
#ifdef GROUTER_DEBUG
|
||||
std::cerr << " sending to vpid " << vpid << std::endl;
|
||||
#endif
|
||||
mTurtle->sendTurtleData(vpid,turtle_item) ;
|
||||
}
|
||||
|
||||
@ -923,10 +987,11 @@ void p3GRouter::handleIncoming(const TurtleFileHash& hash,RsGRouterAbstractMsgIt
|
||||
|
||||
void p3GRouter::handleIncomingReceiptItem(const TurtleFileHash& hash,RsGRouterSignedReceiptItem *receipt_item)
|
||||
{
|
||||
#ifdef GROUTER_DEBUG
|
||||
std::cerr << "Handling incoming signed receipt item." << std::endl;
|
||||
std::cerr << "Item content:" << std::endl;
|
||||
receipt_item->print(std::cerr,2) ;
|
||||
|
||||
#endif
|
||||
|
||||
// Because we don't do proxy-transmission yet, the client needs to be notified. Otherwise, we will need to
|
||||
// first check if we're a proxy or not. We also remove the message from the global router sending list.
|
||||
@ -948,17 +1013,20 @@ void p3GRouter::handleIncomingReceiptItem(const TurtleFileHash& hash,RsGRouterSi
|
||||
std::cerr << " checking receipt hash : FAILED. Receipt is dropped." << std::endl;
|
||||
return ;
|
||||
}
|
||||
#ifdef GROUTER_DEBUG
|
||||
else
|
||||
std::cerr << " checking receipt hash : OK" << std::endl;
|
||||
#endif
|
||||
|
||||
if(! verifySignedDataItem(receipt_item))
|
||||
{
|
||||
std::cerr << " checking receipt signature : FAILED. Receipt is dropped." << std::endl;
|
||||
return ;
|
||||
}
|
||||
#ifdef GROUTER_DEBUG
|
||||
std::cerr << " checking receipt signature : OK. " << std::endl;
|
||||
|
||||
std::cerr << " removing messsage from cache." << std::endl;
|
||||
#endif
|
||||
|
||||
delete it->second.data_item ;
|
||||
//delete it->second.receipt_item ;
|
||||
@ -967,7 +1035,9 @@ void p3GRouter::handleIncomingReceiptItem(const TurtleFileHash& hash,RsGRouterSi
|
||||
//it->second.data_status = RS_GROUTER_DATA_STATUS_RECEIPT_OK;
|
||||
//it->second.receipt_item = signed_receipt_item ;
|
||||
}
|
||||
#ifdef GROUTER_DEBUG
|
||||
std::cerr << " notifying client that the msg was received." << std::endl;
|
||||
#endif
|
||||
|
||||
GRouterClientService *client = NULL ;
|
||||
GRouterServiceId service_id = 0;
|
||||
@ -977,17 +1047,21 @@ void p3GRouter::handleIncomingReceiptItem(const TurtleFileHash& hash,RsGRouterSi
|
||||
std::cerr << " ERROR: cannot find client service for this hash/key combination." << std::endl;
|
||||
return ;
|
||||
}
|
||||
std::cerr << " retrieved client " << (void*)client << ", service_id=" << service_id << std::endl;
|
||||
#ifdef GROUTER_DEBUG
|
||||
std::cerr << " retrieved client " << (void*)client << ", service_id=" << std::hex << service_id << std::dec << std::endl;
|
||||
std::cerr << " acknowledging client for data received" << std::endl;
|
||||
#endif
|
||||
|
||||
client->acknowledgeDataReceived(receipt_item->routing_id) ;
|
||||
}
|
||||
|
||||
void p3GRouter::handleIncomingDataItem(const TurtleFileHash& hash,RsGRouterGenericDataItem *generic_item)
|
||||
{
|
||||
#ifdef GROUTER_DEBUG
|
||||
std::cerr << "Handling incoming data item. Passing to client." << std::endl;
|
||||
std::cerr << "Item content:" << std::endl;
|
||||
generic_item->print(std::cerr,2) ;
|
||||
#endif
|
||||
|
||||
GRouterClientService *client = NULL ;
|
||||
GRouterServiceId service_id = 0;
|
||||
@ -1002,9 +1076,15 @@ void p3GRouter::handleIncomingDataItem(const TurtleFileHash& hash,RsGRouterGener
|
||||
// The item's signature must be checked, and the item needs to be decrypted.
|
||||
|
||||
if(verifySignedDataItem(generic_item)) // we should get proper flags out of this
|
||||
{
|
||||
#ifdef GROUTER_DEBUG
|
||||
std::cerr << " verifying item signature: CHECKED!" ;
|
||||
#endif
|
||||
}
|
||||
#ifdef GROUTER_DEBUG
|
||||
else
|
||||
std::cerr << " verifying item signature: FAILED!" ;
|
||||
#endif
|
||||
|
||||
// compute the hash before decryption.
|
||||
|
||||
@ -1015,8 +1095,10 @@ void p3GRouter::handleIncomingDataItem(const TurtleFileHash& hash,RsGRouterGener
|
||||
std::cerr << " decrypting item : FAILED! Item will be dropped." << std::endl;
|
||||
return ;
|
||||
}
|
||||
#ifdef GROUTER_DEBUG
|
||||
else
|
||||
std::cerr << " decrypting item : OK!" << std::endl;
|
||||
#endif
|
||||
|
||||
// make a copy of the data, since the item will be deleted.
|
||||
|
||||
@ -1033,14 +1115,18 @@ void p3GRouter::handleIncomingDataItem(const TurtleFileHash& hash,RsGRouterGener
|
||||
receipt_item->destination_key = generic_item->signature.keyId ;
|
||||
receipt_item->flags = 0 ;
|
||||
|
||||
#ifdef GROUTER_DEBUG
|
||||
std::cerr << " preparing signed receipt." << std::endl;
|
||||
#endif
|
||||
|
||||
if(!signDataItem(receipt_item,generic_item->destination_key))
|
||||
{
|
||||
std::cerr << " signing: FAILED. Receipt dropped. ERROR." << std::endl;
|
||||
return ;
|
||||
}
|
||||
#ifdef GROUTER_DEBUG
|
||||
std::cerr << " signing: OK." << std::endl;
|
||||
#endif
|
||||
|
||||
// Normally (proxy mode) we should store the signed receipt so that it can be sent back, and handle it
|
||||
// in the routePendingObjects() method.
|
||||
@ -1052,7 +1138,9 @@ void p3GRouter::handleIncomingDataItem(const TurtleFileHash& hash,RsGRouterGener
|
||||
return ;
|
||||
}
|
||||
|
||||
#ifdef GROUTER_DEBUG
|
||||
std::cerr << " sent signed receipt in tunnel " << generic_item->PeerId() << std::endl;
|
||||
#endif
|
||||
}
|
||||
|
||||
bool p3GRouter::getClientAndServiceId(const TurtleFileHash& hash, const RsGxsId& destination_key, GRouterClientService *& client, GRouterServiceId& service_id)
|
||||
@ -1069,19 +1157,21 @@ bool p3GRouter::getClientAndServiceId(const TurtleFileHash& hash, const RsGxsId&
|
||||
return false;
|
||||
}
|
||||
|
||||
RS_STACK_MUTEX (grMtx) ;
|
||||
|
||||
// now find the client given its id.
|
||||
|
||||
std::map<GRouterServiceId,GRouterClientService*>::const_iterator its = _registered_services.find(service_id) ;
|
||||
|
||||
if(its == _registered_services.end())
|
||||
{
|
||||
std::cerr << " ERROR: client id " << service_id << " not registered. Consistency error." << std::endl;
|
||||
return false;
|
||||
}
|
||||
RS_STACK_MUTEX (grMtx) ;
|
||||
|
||||
client = its->second ;
|
||||
// now find the client given its id.
|
||||
|
||||
std::map<GRouterServiceId,GRouterClientService*>::const_iterator its = _registered_services.find(service_id) ;
|
||||
|
||||
if(its == _registered_services.end())
|
||||
{
|
||||
std::cerr << " ERROR: client id " << service_id << " not registered. Consistency error." << std::endl;
|
||||
return false;
|
||||
}
|
||||
|
||||
client = its->second ;
|
||||
}
|
||||
|
||||
return true ;
|
||||
}
|
||||
@ -1141,7 +1231,7 @@ bool p3GRouter::encryptDataItem(RsGRouterGenericDataItem *item,const RsGxsId& de
|
||||
|
||||
#ifdef GROUTER_DEBUG
|
||||
std::cerr << " Encrypted size = " << encrypted_size << std::endl;
|
||||
std::cerr << " First bytes of encrypted data: " << RsUtil::BinToHex((const char *)encrypted_data,std::min(encrypted_size,50)) << "..."<< std::endl;
|
||||
std::cerr << " First bytes of encrypted data: " << RsUtil::BinToHex((const char *)encrypted_data,std::min(encrypted_size,30)) << "..."<< std::endl;
|
||||
std::cerr << " Encrypted data hash = " << RsDirUtil::sha1sum((const uint8_t *)encrypted_data,encrypted_size) << std::endl;
|
||||
#endif
|
||||
return true ;
|
||||
@ -1213,13 +1303,15 @@ bool p3GRouter::signDataItem(RsGRouterAbstractMsgItem *item,const RsGxsId& signi
|
||||
|
||||
#ifdef GROUTER_DEBUG
|
||||
std::cerr << " Signing..." << std::endl;
|
||||
std::cerr << "First bytes of signed data: " << RsUtil::BinToHex((const char *)data,std::min(data_size,30u)) << "..."<< std::endl;
|
||||
#endif
|
||||
std::cerr << "First bytes of signed data: " << RsUtil::BinToHex((const char *)data,std::min(data_size,50u)) << "..."<< std::endl;
|
||||
|
||||
if(!GxsSecurity::getSignature((char *)data,data_size,signature_key,item->signature))
|
||||
throw std::runtime_error("Cannot sign for id " + signing_id.toStdString() + ". Signature call failed.") ;
|
||||
|
||||
std::cerr << "Created signature for data hash: " << RsDirUtil::sha1sum(data,data_size) << " and key id=" << signing_id << std::endl;
|
||||
#ifdef GROUTER_DEBUG
|
||||
std::cerr << "Created signature for data hash: " << RsDirUtil::sha1sum(data,data_size) << " and key id=" << signing_id << std::endl;
|
||||
#endif
|
||||
free(data) ;
|
||||
return true ;
|
||||
}
|
||||
@ -1258,8 +1350,10 @@ bool p3GRouter::verifySignedDataItem(RsGRouterAbstractMsgItem *item)
|
||||
if(signature_key.keyData.bin_data == NULL)
|
||||
throw std::runtime_error("No key for checking signature from " + item->signature.keyId.toStdString());
|
||||
|
||||
#ifdef GROUTER_DEBUG
|
||||
std::cerr << " Validating signature for data hash: " << RsDirUtil::sha1sum(data,data_size) << " and key_id = " << item->signature.keyId << std::endl;
|
||||
std::cerr << " First bytes of encrypted data: " << RsUtil::BinToHex((const char *)data,std::min(data_size,50u)) << "..."<< std::endl;
|
||||
std::cerr << " First bytes of encrypted data: " << RsUtil::BinToHex((const char *)data,std::min(data_size,30u)) << "..."<< std::endl;
|
||||
#endif
|
||||
|
||||
if(!GxsSecurity::validateSignature((char*)data,data_size,signature_key,item->signature))
|
||||
throw std::runtime_error("Signature was verified and it doesn't check! This is a security issue!") ;
|
||||
@ -1278,6 +1372,11 @@ bool p3GRouter::verifySignedDataItem(RsGRouterAbstractMsgItem *item)
|
||||
|
||||
bool p3GRouter::sendData(const RsGxsId& destination,const GRouterServiceId& client_id,uint8_t *data, uint32_t data_size,const RsGxsId& signing_id, GRouterMsgPropagationId &propagation_id)
|
||||
{
|
||||
if(data_size > MAX_GROUTER_DATA_SIZE)
|
||||
{
|
||||
std::cerr << "GRouter max size limit exceeded (size=" << data_size << ", max=" << MAX_GROUTER_DATA_SIZE << "). Please send a smaller object!" << std::endl;
|
||||
}
|
||||
|
||||
// Make sure we have a unique id (at least locally).
|
||||
//
|
||||
{
|
||||
@ -1332,7 +1431,6 @@ bool p3GRouter::sendData(const RsGxsId& destination,const GRouterServiceId& clie
|
||||
info.last_tunnel_request_TS = 0 ;
|
||||
info.sending_attempts = 0 ;
|
||||
info.received_time_TS = now ;
|
||||
info.destination_key = destination ;
|
||||
info.tunnel_hash = makeTunnelHash(destination,client_id) ;
|
||||
info.client_id = client_id ;
|
||||
|
||||
@ -1341,7 +1439,7 @@ bool p3GRouter::sendData(const RsGxsId& destination,const GRouterServiceId& clie
|
||||
grouter_debug() << " routing id = " << propagation_id << std::endl;
|
||||
grouter_debug() << " data_item.size = " << info.data_item->data_size << std::endl;
|
||||
grouter_debug() << " data_item.byte = " << RsDirUtil::sha1sum(info.data_item->data_bytes,info.data_item->data_size) << std::endl;
|
||||
grouter_debug() << " destination = " << info.destination_key << std::endl;
|
||||
grouter_debug() << " destination = " << data_item->destination_key << std::endl;
|
||||
grouter_debug() << " signed by key = " << data_item->signature.keyId << std::endl;
|
||||
grouter_debug() << " data status = " << info.data_status << std::endl;
|
||||
grouter_debug() << " tunnel status = " << info.tunnel_status << std::endl;
|
||||
@ -1397,7 +1495,9 @@ bool p3GRouter::loadList(std::list<RsItem*>& items)
|
||||
grouter_debug() << " removing all existing items (" << _pending_messages.size() << " items to delete)." << std::endl;
|
||||
#endif
|
||||
|
||||
#ifdef SUSPENDED
|
||||
if(!_pending_messages.empty())
|
||||
std::cerr << " WARNING: pending msg list is not empty. List will be cleared." << std::endl;
|
||||
|
||||
// clear the existing list.
|
||||
//
|
||||
for(std::map<GRouterMsgPropagationId,GRouterRoutingInfo>::iterator it(_pending_messages.begin());it!=_pending_messages.end();++it)
|
||||
@ -1410,7 +1510,7 @@ bool p3GRouter::loadList(std::list<RsItem*>& items)
|
||||
RsGRouterRoutingInfoItem *itm1 = NULL ;
|
||||
|
||||
if(NULL != (itm1 = dynamic_cast<RsGRouterRoutingInfoItem*>(*it)))
|
||||
{
|
||||
{
|
||||
_pending_messages[itm1->data_item->routing_id] = *itm1 ;
|
||||
//_pending_messages[itm1->data_item->routing_id].data_item = itm1->data_item ; // avoids duplication.
|
||||
|
||||
@ -1419,23 +1519,22 @@ bool p3GRouter::loadList(std::list<RsItem*>& items)
|
||||
|
||||
delete *it ;
|
||||
}
|
||||
#endif
|
||||
return true ;
|
||||
return true ;
|
||||
}
|
||||
bool p3GRouter::saveList(bool& cleanup,std::list<RsItem*>& items)
|
||||
{
|
||||
// We save
|
||||
// - the routing clues
|
||||
// - the pending items
|
||||
// We save
|
||||
// - the routing clues
|
||||
// - the pending items
|
||||
|
||||
cleanup = true ; // the client should delete the items.
|
||||
cleanup = true ; // the client should delete the items.
|
||||
|
||||
#ifdef GROUTER_DEBUG
|
||||
grouter_debug() << "p3GRouter::saveList()..." << std::endl;
|
||||
grouter_debug() << " saving routing clues." << std::endl;
|
||||
grouter_debug() << "p3GRouter::saveList()..." << std::endl;
|
||||
grouter_debug() << " saving routing clues." << std::endl;
|
||||
#endif
|
||||
|
||||
RS_STACK_MUTEX(grMtx) ;
|
||||
RS_STACK_MUTEX(grMtx) ;
|
||||
|
||||
_routing_matrix.saveList(items) ;
|
||||
|
||||
@ -1443,19 +1542,17 @@ bool p3GRouter::saveList(bool& cleanup,std::list<RsItem*>& items)
|
||||
grouter_debug() << " saving pending items." << std::endl;
|
||||
#endif
|
||||
|
||||
#ifdef SUSPENDED
|
||||
for(std::map<GRouterMsgPropagationId,GRouterRoutingInfo>::const_iterator it(_pending_messages.begin());it!=_pending_messages.end();++it)
|
||||
{
|
||||
RsGRouterRoutingInfoItem *item = new RsGRouterRoutingInfoItem ;
|
||||
|
||||
*(GRouterRoutingInfo*)item = it->second ; // copy all members
|
||||
for(std::map<GRouterMsgPropagationId,GRouterRoutingInfo>::const_iterator it(_pending_messages.begin());it!=_pending_messages.end();++it)
|
||||
{
|
||||
RsGRouterRoutingInfoItem *item = new RsGRouterRoutingInfoItem ;
|
||||
|
||||
item->data_item = it->second.data_item->duplicate() ; // deep copy, because we call delete on the object, and the item might be removed before we handle it in the client.
|
||||
items.push_back(item) ;
|
||||
*(GRouterRoutingInfo*)item = it->second ; // copy all members
|
||||
|
||||
item->data_item = it->second.data_item->duplicate() ; // deep copy, because we call delete on the object, and the item might be removed before we handle it in the client.
|
||||
items.push_back(item) ;
|
||||
}
|
||||
#endif
|
||||
|
||||
return true ;
|
||||
return true ;
|
||||
}
|
||||
|
||||
bool p3GRouter::getRoutingMatrixInfo(RsGRouter::GRouterRoutingMatrixInfo& info)
|
||||
@ -1498,7 +1595,7 @@ bool p3GRouter::getRoutingCacheInfo(std::vector<GRouterRoutingCacheInfo>& infos)
|
||||
GRouterRoutingCacheInfo& cinfo(infos.back()) ;
|
||||
|
||||
cinfo.mid = it->first ;
|
||||
cinfo.destination = it->second.destination_key ;
|
||||
cinfo.destination = it->second.data_item->destination_key ;
|
||||
cinfo.time_stamp = it->second.received_time_TS ;
|
||||
cinfo.status = it->second.data_status;
|
||||
cinfo.data_size = it->second.data_item->data_size ;
|
||||
@ -1536,7 +1633,7 @@ void p3GRouter::debugDump()
|
||||
for(std::map<GRouterMsgPropagationId, GRouterRoutingInfo>::iterator it(_pending_messages.begin());it!=_pending_messages.end();++it)
|
||||
{
|
||||
grouter_debug() << " Msg id : " << std::hex << it->first << std::dec ;
|
||||
grouter_debug() << " Destination: " << it->second.destination_key ;
|
||||
grouter_debug() << " Destination: " << it->second.data_item->destination_key ;
|
||||
grouter_debug() << " Received : " << now - it->second.received_time_TS << " secs ago.";
|
||||
grouter_debug() << " Last sent : " << now - it->second.last_sent_TS << " secs ago.";
|
||||
grouter_debug() << " Data Status: " << statusString[it->second.data_status] << std::endl;
|
||||
@ -1556,8 +1653,8 @@ void p3GRouter::debugDump()
|
||||
|
||||
grouter_debug() << " Routing matrix: " << std::endl;
|
||||
|
||||
// if(_debug_enabled)
|
||||
// _routing_matrix.debugDump() ;
|
||||
// if(_debug_enabled)
|
||||
// _routing_matrix.debugDump() ;
|
||||
}
|
||||
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user