diff --git a/libretroshare/src/gxstunnel/p3gxstunnel.cc b/libretroshare/src/gxstunnel/p3gxstunnel.cc index 43ff5b0b3..e7d85275a 100644 --- a/libretroshare/src/gxstunnel/p3gxstunnel.cc +++ b/libretroshare/src/gxstunnel/p3gxstunnel.cc @@ -53,7 +53,8 @@ static const uint32_t RS_GXS_TUNNEL_DH_STATUS_UNINITIALIZED = 0x0000 ; static const uint32_t RS_GXS_TUNNEL_DH_STATUS_HALF_KEY_DONE = 0x0001 ; static const uint32_t RS_GXS_TUNNEL_DH_STATUS_KEY_AVAILABLE = 0x0002 ; -static const uint32_t RS_GXS_TUNNEL_DELAY_BETWEEN_RESEND = 10 ; // re-send every 10 secs. +static const uint32_t RS_GXS_TUNNEL_DELAY_BETWEEN_RESEND = 10 ; // re-send every 10 secs. +static const uint32_t RS_GXS_TUNNEL_DATA_PRINT_STORAGE_DELAY = 600 ; // store old message ids for 10 minutes. static const uint32_t GXS_TUNNEL_ENCRYPTION_HMAC_SIZE = SHA_DIGEST_LENGTH ; static const uint32_t GXS_TUNNEL_ENCRYPTION_IV_SIZE = 8 ; @@ -73,9 +74,6 @@ p3GxsTunnelService::p3GxsTunnelService(RsGixs *pids) : mGixs(pids), mGxsTunnelMtx("GXS tunnel") { mTurtle = NULL ; - - // any value is fine here, even 0, since items in different RS sessions will use different AES keys. - global_item_counter = 0;//RSRandom::random_u64() ; } void p3GxsTunnelService::connectToTurtleRouter(p3turtle *tr) @@ -196,7 +194,9 @@ void p3GxsTunnelService::flush() for(std::map::iterator it(_gxs_tunnel_contacts.begin());it!=_gxs_tunnel_contacts.end();) { - // Remove any tunnel that was remotely closed, since we cannot use it anymore. + // All sorts of cleaning. We start with the ones that may remove stuff, for efficiency reasons. + + // 1 - Remove any tunnel that was remotely closed, since we cannot use it anymore. if(it->second.status == RS_GXS_TUNNEL_STATUS_REMOTELY_CLOSED && it->second.last_contact + 20 < now) { @@ -207,6 +207,8 @@ void p3GxsTunnelService::flush() continue ; } + // 2 - re-digg tunnels that have died out of inaction + if(it->second.last_contact+20+GXS_TUNNEL_KEEP_ALIVE_TIMEOUT < now && it->second.status == RS_GXS_TUNNEL_STATUS_CAN_TALK) { #ifdef DEBUG_GXS_TUNNEL @@ -228,6 +230,9 @@ void p3GxsTunnelService::flush() mTurtle->forceReDiggTunnels( randomHashFromDestinationGxsId(it->second.to_gxs_id) ); } } + + // send keep alive packets to active tunnels. + if(it->second.last_keep_alive_sent + GXS_TUNNEL_KEEP_ALIVE_TIMEOUT < now && it->second.status == RS_GXS_TUNNEL_STATUS_CAN_TALK) { RsGxsTunnelStatusItem *cs = new RsGxsTunnelStatusItem ; @@ -244,6 +249,23 @@ void p3GxsTunnelService::flush() std::cerr << "(II) GxsTunnelService:: Sending keep alive packet to gxs id " << it->first << std::endl; #endif } + + // clean old received data prints. + + for(std::map::iterator it2=it->second.received_data_prints.begin();it2!=it->second.received_data_prints.end();) + if(now > it2->second + RS_GXS_TUNNEL_DATA_PRINT_STORAGE_DELAY) + { +#ifdef DEBUG_GXS_TUNNEL + std::cerr << "(II) erasing old data print for message #" << it2->first << " in tunnel " << it->first << std::endl; +#endif + std::map::iterator tmp(it2) ; + ++tmp ; + it->second.received_data_prints.erase(it2) ; + it2 = tmp ; + } + else + ++it2 ; + ++it ; } } @@ -347,6 +369,16 @@ void p3GxsTunnelService::handleRecvTunnelDataItem(const RsGxsTunnelId& tunnel_id it2->second.client_services.insert(item->service_id) ; peer_from = it2->second.to_gxs_id ; } + + // Check if the item has already been received. This is necessary because we actually re-send items until an ACK is received. If the ACK gets lost (connection interrupted) the + // item may be received twice. This is conservative and ensure that no item is lost nor received twice. + + if(it2->second.received_data_prints.find(item->unique_item_counter) != it2->second.received_data_prints.end()) + { + std::cerr << "(WW) received the same data item #" << std::hex << item->unique_item_counter << std::dec << " twice in last 20 mins. Tunnel id=" << tunnel_id << ". Probably a replay. Item will be dropped." << std::endl; + return ; + } + it2->second.received_data_prints[item->unique_item_counter] = time(NULL) ; } if(service->acceptDataFromPeer(peer_from,tunnel_id)) @@ -1325,7 +1357,7 @@ bool p3GxsTunnelService::sendData(const RsGxsTunnelId &tunnel_id, uint32_t servi RsGxsTunnelDataItem *item = new RsGxsTunnelDataItem ; - item->unique_item_counter = global_item_counter++; // this allows to make the item unique + item->unique_item_counter = RSRandom::random_u64(); // this allows to make the item unique, except very rarely, we we don't care. item->flags = 0; // not used yet. item->service_id = service_id; item->data_size = size; // encrypted data size diff --git a/libretroshare/src/gxstunnel/p3gxstunnel.h b/libretroshare/src/gxstunnel/p3gxstunnel.h index 67589f8e5..9a2481623 100644 --- a/libretroshare/src/gxstunnel/p3gxstunnel.h +++ b/libretroshare/src/gxstunnel/p3gxstunnel.h @@ -171,6 +171,7 @@ private: RsTurtleGenericTunnelItem::Direction direction ; // specifiec wether we are client(managing the tunnel) or server. TurtleFileHash hash ; // hash that is last used. This is necessary for handling tunnel establishment std::set client_services ;// services that used this tunnel + std::map received_data_prints ; // list of recently received messages, to avoid duplicates. Kept for 20 mins at most. uint32_t total_sent ; uint32_t total_received ; }; @@ -252,8 +253,6 @@ private: RsGixs *mGixs ; RsMutex mGxsTunnelMtx ; - uint64_t global_item_counter ; - std::map mRegisteredServices ; void debug_dump();