From 4d2907efedac3f24cff100f0da76fe49e721df0f Mon Sep 17 00:00:00 2001 From: csoler Date: Mon, 21 Apr 2014 12:39:30 +0000 Subject: [PATCH] - added client notification to grouter - now distant messages stay in outgoing box until notified to be received. - fixed serialisation bug git-svn-id: http://svn.code.sf.net/p/retroshare/code/trunk@7293 b45a01b8-16f6-495d-af2f-9b41ad6348cc --- libretroshare/src/grouter/grouteritems.cc | 10 ++++- libretroshare/src/grouter/grouteritems.h | 2 +- libretroshare/src/grouter/groutertypes.h | 1 + libretroshare/src/grouter/p3grouter.cc | 48 ++++++++++------------- libretroshare/src/grouter/p3grouter.h | 22 ++--------- libretroshare/src/retroshare/rsgrouter.h | 2 +- 6 files changed, 36 insertions(+), 49 deletions(-) diff --git a/libretroshare/src/grouter/grouteritems.cc b/libretroshare/src/grouter/grouteritems.cc index 5430ae4bb..27ebf0d2e 100644 --- a/libretroshare/src/grouter/grouteritems.cc +++ b/libretroshare/src/grouter/grouteritems.cc @@ -259,6 +259,7 @@ RsGRouterRoutingInfoItem *RsGRouterSerialiser::deserialise_RsGRouterRoutingInfoI ok &= getRawUInt32(data, pktsize, &offset, &item->status_flags); ok &= item->origin.deserialise(data, pktsize, offset) ; ok &= getRawTimeT(data, pktsize, &offset, item->received_time); + ok &= getRawUInt32(data, pktsize, &offset, &item->client_id); uint32_t s = 0 ; ok &= getRawUInt32(data, pktsize, &offset, &s) ; @@ -276,6 +277,10 @@ RsGRouterRoutingInfoItem *RsGRouterSerialiser::deserialise_RsGRouterRoutingInfoI } item->data_item = deserialise_RsGRouterGenericDataItem(&((uint8_t*)data)[offset],pktsize - offset) ; + if(item->data_item != NULL) + offset += item->data_item->serial_size() ; + else + ok = false ; if (offset != rssize || !ok) { @@ -453,8 +458,9 @@ uint32_t RsGRouterRoutingInfoItem::serial_size() const s += origin.serial_size() ; // origin s += 8 ; // received_time s += 4 ; // tried_friends.size() ; + s += sizeof(GRouterServiceId) ; // service_id s += tried_friends.size() * ( RsPeerId::SIZE_IN_BYTES + 8 + 4 + 4 ) ; // FriendTrialRecord - s += data_item->serial_size(); // data_item + s += data_item->serial_size(); // data_item return s ; } @@ -538,6 +544,7 @@ bool RsGRouterRoutingInfoItem::serialise(void *data,uint32_t& size) const ok &= setRawUInt32(data, tlvsize, &offset, status_flags) ; ok &= origin.serialise(data, tlvsize, offset) ; ok &= setRawTimeT(data, tlvsize, &offset, received_time) ; + ok &= setRawUInt32(data, tlvsize, &offset, client_id) ; ok &= setRawUInt32(data, tlvsize, &offset, tried_friends.size()) ; for(std::list::const_iterator it(tried_friends.begin());it!=tried_friends.end();++it) @@ -602,6 +609,7 @@ std::ostream& RsGRouterRoutingInfoItem::print(std::ostream& o, uint16_t) o << " flags: "<< std::hex << status_flags << std::dec << std::endl ; o << " Key: "<< data_item->destination_key.toStdString() << std::endl ; o << " Data size: "<< data_item->data_size << std::endl ; + o << " Client id: "<< client_id << std::endl ; o << " Tried friends: "<< tried_friends.size() << std::endl; return o ; diff --git a/libretroshare/src/grouter/grouteritems.h b/libretroshare/src/grouter/grouteritems.h index eea14f12f..c50211610 100644 --- a/libretroshare/src/grouter/grouteritems.h +++ b/libretroshare/src/grouter/grouteritems.h @@ -38,7 +38,7 @@ const uint8_t RS_PKT_SUBTYPE_GROUTER_DATA = 0x05 ; // used to send da const uint8_t RS_PKT_SUBTYPE_GROUTER_MATRIX_CLUES = 0x80 ; // item to save matrix clues const uint8_t RS_PKT_SUBTYPE_GROUTER_FRIENDS_LIST = 0x82 ; // item to save friend lists -const uint8_t RS_PKT_SUBTYPE_GROUTER_ROUTING_INFO = 0x85 ; // item to save routing info +const uint8_t RS_PKT_SUBTYPE_GROUTER_ROUTING_INFO = 0x86 ; // item to save routing info const uint8_t QOS_PRIORITY_RS_GROUTER_PUBLISH_KEY = 3 ; // slow items. No need to congest the network with this. const uint8_t QOS_PRIORITY_RS_GROUTER_ACK = 3 ; diff --git a/libretroshare/src/grouter/groutertypes.h b/libretroshare/src/grouter/groutertypes.h index 2f78fa2cd..7394b7c22 100644 --- a/libretroshare/src/grouter/groutertypes.h +++ b/libretroshare/src/grouter/groutertypes.h @@ -85,6 +85,7 @@ class GRouterRoutingInfo std::list tried_friends ; // list of friends to which the item was sent ordered with time. GRouterKeyId destination_key ; // ultimate destination for this key + GRouterServiceId client_id ; // service ID of the client. Only valid when origin==OwnId RsGRouterGenericDataItem *data_item ; }; diff --git a/libretroshare/src/grouter/p3grouter.cc b/libretroshare/src/grouter/p3grouter.cc index 30fd3c38a..c1e519217 100644 --- a/libretroshare/src/grouter/p3grouter.cc +++ b/libretroshare/src/grouter/p3grouter.cc @@ -208,6 +208,7 @@ p3GRouter::p3GRouter(p3ServiceControl *sc,p3LinkMgr *lm) _last_debug_output_time = 0 ; _last_config_changed = 0 ; _last_matrix_update_time = 0 ; + _debug_enabled = true ; _random_salt = RSRandom::random_u64() ; @@ -316,7 +317,7 @@ void p3GRouter::autoWash() _pending_messages.erase(it) ; it = tmp ; } - else if(it->second.data_item != NULL && it->second.status_flags == RS_GROUTER_ROUTING_STATE_SENT && computeNextTimeDelay(it->second.last_sent - it->second.received_time) + it->second.last_sent < now) + else if(it->second.status_flags == RS_GROUTER_ROUTING_STATE_SENT && computeNextTimeDelay(it->second.last_sent - it->second.received_time) + it->second.last_sent < now) { it->second.status_flags = RS_GROUTER_ROUTING_STATE_PEND ; #ifdef GROUTER_DEBUG @@ -388,8 +389,6 @@ void p3GRouter::routePendingObjects() { sendACK(it->second.origin,it->first,RS_GROUTER_ACK_STATE_GVNP) ; it->second.status_flags = RS_GROUTER_ROUTING_STATE_DEAD ; - delete it->second.data_item ; - it->second.data_item = NULL ; ++it ; continue ; } @@ -635,25 +634,18 @@ void p3GRouter::handleIncoming() } } -void p3GRouter::locked_notifyClientAcknowledged(const GRouterKeyId& key,const GRouterMsgPropagationId& msg_id) const +void p3GRouter::locked_notifyClientAcknowledged(const GRouterMsgPropagationId& msg_id,const GRouterServiceId& service_id) const { #ifdef GROUTER_DEBUG - grouter_debug() << " Key is owned by us. Notifying service that item was ACKed." << std::endl; + grouter_debug() << " Key is owned by us. Notifying service that item was ACKed. msg_id=" << msg_id << ", service_id = " << service_id << "." << std::endl; #endif // notify the client // - std::map::const_iterator it = _owned_key_ids.find(key) ; - - if(it == _owned_key_ids.end()) - { - std::cerr << "(EE) key " << key << " is not owned by us. That is a weird situation. Probably a bug!" << std::endl; - return ; - } - std::map::const_iterator its = _registered_services.find(it->second.service_id) ; + std::map::const_iterator its = _registered_services.find(service_id) ; if(its == _registered_services.end()) { - std::cerr << "(EE) key " << key << " is attached to service " << it->second.service_id << ", which is unknown!! That is a bug." << std::endl; + std::cerr << "(EE) message " << msg_id << " is attached to service " << service_id << ", which is unknown!! That is a bug." << std::endl; return ; } its->second->acknowledgeDataReceived(msg_id) ; @@ -720,7 +712,6 @@ void p3GRouter::handleRecvACKItem(RsGRouterACKItem *item) uint32_t forward_state = RS_GROUTER_ACK_STATE_UNKN ; bool update_routing_matrix = false ; bool should_remove = false ; - bool delete_data = false ; time_t now = time(NULL) ; @@ -729,7 +720,7 @@ void p3GRouter::handleRecvACKItem(RsGRouterACKItem *item) case RS_GROUTER_ACK_STATE_RCVD: if(it->second.origin == mLinkMgr->getOwnId()) { - locked_notifyClientAcknowledged(it->second.destination_key,it->first) ; + locked_notifyClientAcknowledged(it->first,it->second.client_id) ; should_remove = true ; } // no break afterwards. That is on purpose! @@ -746,7 +737,6 @@ void p3GRouter::handleRecvACKItem(RsGRouterACKItem *item) next_state = RS_GROUTER_ROUTING_STATE_ARVD ; update_routing_matrix = true ; - delete_data = true ; break ; @@ -803,7 +793,12 @@ void p3GRouter::handleRecvACKItem(RsGRouterACKItem *item) #endif // If no route was found, delete item, but keep the cache entry for a while in order to avoid bouncing. // - if(it->second.status_flags != RS_GROUTER_ROUTING_STATE_ARVD && next_state != RS_GROUTER_ROUTING_STATE_ARVD) + if(it->second.origin == mLinkMgr->getOwnId()) + { + next_state = RS_GROUTER_ROUTING_STATE_SENT ; // Keep it that way until the item gets sent again (turned into PEND) + forward_state = RS_GROUTER_ACK_STATE_UNKN ; + } + else if(it->second.status_flags != RS_GROUTER_ROUTING_STATE_ARVD && next_state != RS_GROUTER_ROUTING_STATE_ARVD) { next_state = RS_GROUTER_ROUTING_STATE_DEAD ; forward_state = RS_GROUTER_ACK_STATE_GVNP ; @@ -828,11 +823,6 @@ void p3GRouter::handleRecvACKItem(RsGRouterACKItem *item) } it->second.status_flags = next_state ; - if(delete_data) - { - delete it->second.data_item ; - it->second.data_item = NULL ; - } if(should_remove) { #ifdef GROUTER_DEBUG @@ -922,6 +912,7 @@ void p3GRouter::handleRecvDataItem(RsGRouterGenericDataItem *item) info.last_sent = 0 ; info.destination_key = item->destination_key ; info.status_flags = RS_GROUTER_ROUTING_STATE_PEND ; + info.client_id = 0 ; _pending_messages[item->routing_id] = info ; itr = _pending_messages.find(item->routing_id) ; @@ -982,7 +973,7 @@ bool p3GRouter::registerClientService(const GRouterServiceId& id,GRouterClientSe return true ; } -void p3GRouter::sendData(const GRouterKeyId& destination, RsGRouterGenericDataItem *item,GRouterMsgPropagationId& propagation_id) +void p3GRouter::sendData(const GRouterKeyId& destination,const GRouterServiceId& client_id, RsGRouterGenericDataItem *item,GRouterMsgPropagationId& propagation_id) { RsStackMutex mtx(grMtx) ; // push the item into pending messages. @@ -998,6 +989,7 @@ void p3GRouter::sendData(const GRouterKeyId& destination, RsGRouterGenericDataIt info.last_sent = 0 ; info.received_time = now ; info.destination_key = destination ; + info.client_id = client_id ; // Make sure we have a unique id (at least locally). // @@ -1015,6 +1007,7 @@ void p3GRouter::sendData(const GRouterKeyId& destination, RsGRouterGenericDataIt grouter_debug() << " distance = " << info.data_item->randomized_distance << std::endl; grouter_debug() << " origin = " << info.origin.toStdString() << std::endl; grouter_debug() << " Recv time = " << info.received_time << std::endl; + grouter_debug() << " Client id = " << info.client_id << std::endl; #endif _pending_messages[propagation_id] = info ; @@ -1047,6 +1040,8 @@ bool p3GRouter::loadList(std::list& items) grouter_debug() << " removing all existing items (" << _pending_messages.size() << " items to delete)." << std::endl; #endif + // clear the existing list. + // for(std::map::iterator it(_pending_messages.begin());it!=_pending_messages.end();++it) delete it->second.data_item ; _pending_messages.clear() ; @@ -1058,7 +1053,7 @@ bool p3GRouter::loadList(std::list& items) if(NULL != (itm1 = dynamic_cast(*it))) { _pending_messages[itm1->data_item->routing_id] = *itm1 ; - _pending_messages[itm1->data_item->routing_id].data_item = itm1->data_item ; // avoids duplication. + //_pending_messages[itm1->data_item->routing_id].data_item = itm1->data_item ; // avoids duplication. itm1->data_item = NULL ; // prevents deletion. } @@ -1093,7 +1088,6 @@ bool p3GRouter::saveList(bool& cleanup,std::list& items) *(GRouterRoutingInfo*)item = it->second ; // copy all members item->data_item = it->second.data_item->duplicate() ; // deep copy, because we call delete on the object, and the item might be removed before we handle it in the client. - items.push_back(item) ; } @@ -1143,7 +1137,7 @@ bool p3GRouter::getRoutingCacheInfo(std::vector& infos) cinfo.destination = it->second.destination_key ; cinfo.time_stamp = it->second.received_time ; cinfo.status = it->second.status_flags ; - cinfo.data_size = (it->second.data_item==NULL)?0:(it->second.data_item->data_size) ; + cinfo.data_size = it->second.data_item->data_size ; } return true ; } diff --git a/libretroshare/src/grouter/p3grouter.h b/libretroshare/src/grouter/p3grouter.h index bf3ec8fd3..8653ff427 100644 --- a/libretroshare/src/grouter/p3grouter.h +++ b/libretroshare/src/grouter/p3grouter.h @@ -87,8 +87,9 @@ class p3GRouter: public RsGRouter, public p3Service, public p3Config // Sends an item to the given destination. The router takes ownership of // the memory. That means item_data will be erase on return. The returned id should be // remembered by the client, so that he knows when the data has been received. + // The client id is supplied so that the client can be notified when the data has been received. // - void sendData(const GRouterKeyId& destination, RsGRouterGenericDataItem *item,GRouterMsgPropagationId& id) ; + void sendData(const GRouterKeyId& destination,const GRouterServiceId& client_id, RsGRouterGenericDataItem *item,GRouterMsgPropagationId& id) ; // Sends an ACK to the origin of the msg. This is used to notify for // unfound route, or message correctly received, depending on the particular situation. @@ -171,7 +172,7 @@ class p3GRouter: public RsGRouter, public p3Service, public p3Config static float computeMatrixContribution(float base,uint32_t time_shift,float probability) ; static time_t computeNextTimeDelay(time_t duration) ; - void locked_notifyClientAcknowledged(const GRouterKeyId& key,const GRouterMsgPropagationId& msg_id) const ; + void locked_notifyClientAcknowledged(const GRouterMsgPropagationId& msg_id,const GRouterServiceId& service_id) const ; uint32_t computeRandomDistanceIncrement(const RsPeerId& pid,const GRouterKeyId& destination_id) ; @@ -215,23 +216,6 @@ class p3GRouter: public RsGRouter, public p3Service, public p3Config // std::map _owned_key_ids ; -#ifdef TO_BE_REMOVED - // Key publish cache and buffers - // Handles key publish items routes and forwarding info. - // - // 1 - timestamps of diffused keys received stored by diffusion id. - std::map _key_diffusion_time_stamps ; - - // 2 - list of key diffusion items to be routed. These are stored in a priority structure - // where the priority is based on key distance, so that: - // - long distance keys get propagated less easily - // - when the list exceeds the maximum allowed size, items with the largest distance get dropped. - // - std::priority_queue _key_diffusion_items ; - - void handleRecvPublishKeyItem(RsGRouterPublishKeyItem *item) ; -#endif - // Registered services. These are known to the different peers with a common id, // so it's important to keep consistency here. This map is volatile, and re-created at each startup of // the software, when newly created services register themselves. diff --git a/libretroshare/src/retroshare/rsgrouter.h b/libretroshare/src/retroshare/rsgrouter.h index 5202f1d57..f217bede1 100644 --- a/libretroshare/src/retroshare/rsgrouter.h +++ b/libretroshare/src/retroshare/rsgrouter.h @@ -85,7 +85,7 @@ class RsGRouter // Communication to other services. // //===================================================// - virtual void sendData(const GRouterKeyId& destination, RsGRouterGenericDataItem *item,GRouterMsgPropagationId& id) =0; + virtual void sendData(const GRouterKeyId& destination, const GRouterServiceId& client_id, RsGRouterGenericDataItem *item,GRouterMsgPropagationId& id) =0; virtual bool registerKey(const GRouterKeyId& key,const GRouterServiceId& client_id,const std::string& description_string) =0; };