From ed198af80707559c5d40262a9ca659ae2c4ed337 Mon Sep 17 00:00:00 2001 From: csoler Date: Sat, 19 Apr 2014 16:02:11 +0000 Subject: [PATCH] - added notification from global router to client services - keep distant messages in outbox until they get notified to be received - cleanup dead code git-svn-id: http://svn.code.sf.net/p/retroshare/code/trunk@7284 b45a01b8-16f6-495d-af2f-9b41ad6348cc --- .../src/grouter/grouterclientservice.h | 8 ++ libretroshare/src/grouter/p3grouter.cc | 79 +++++++++++++----- libretroshare/src/grouter/p3grouter.h | 7 +- libretroshare/src/retroshare/rsgrouter.h | 2 +- libretroshare/src/rsserver/rsinit.cc | 6 +- libretroshare/src/serialiser/rsmsgitems.h | 2 +- libretroshare/src/services/p3msgservice.cc | 81 ++++++++++++------- libretroshare/src/services/p3msgservice.h | 3 +- 8 files changed, 128 insertions(+), 60 deletions(-) diff --git a/libretroshare/src/grouter/grouterclientservice.h b/libretroshare/src/grouter/grouterclientservice.h index bca1560e8..2b48cbd1c 100644 --- a/libretroshare/src/grouter/grouterclientservice.h +++ b/libretroshare/src/grouter/grouterclientservice.h @@ -55,6 +55,14 @@ class GRouterClientService std::cerr << " destination key_id = " << destination_key.toStdString() << std::endl; } + // This method is called by the global router when a message has been acknowledged, in order to notify the client. + // + virtual void acknowledgeDataReceived(const GRouterMsgPropagationId& received_id) + { + std::cerr << "!!!!!! Received Data acknowledge from global router, but the client service is not handling it !!!!!!!!!!" << std::endl ; + std::cerr << " message ID = " << received_id << std::endl; + } + // This function is mandatory. It should do two things: // 1 - keep a pointer to the global router, so as to be able to send data (e.g. copy pt into a local variable) // 2 - call pt->registerTunnelService(this), so that the TR knows that service and can send back information to it. diff --git a/libretroshare/src/grouter/p3grouter.cc b/libretroshare/src/grouter/p3grouter.cc index f53915a3f..30fd3c38a 100644 --- a/libretroshare/src/grouter/p3grouter.cc +++ b/libretroshare/src/grouter/p3grouter.cc @@ -346,7 +346,7 @@ void p3GRouter::routePendingObjects() #endif std::set lst ; - mServiceControl->getPeersConnected(RS_SERVICE_TYPE_GROUTER,lst) ; + mServiceControl->getPeersConnected(getServiceInfo().mServiceType,lst) ; RsPeerId own_id( mServiceControl->getOwnId() ); // The policy is the following: @@ -635,6 +635,30 @@ void p3GRouter::handleIncoming() } } +void p3GRouter::locked_notifyClientAcknowledged(const GRouterKeyId& key,const GRouterMsgPropagationId& msg_id) const +{ +#ifdef GROUTER_DEBUG + grouter_debug() << " Key is owned by us. Notifying service that item was ACKed." << std::endl; +#endif + // notify the client + // + std::map::const_iterator it = _owned_key_ids.find(key) ; + + if(it == _owned_key_ids.end()) + { + std::cerr << "(EE) key " << key << " is not owned by us. That is a weird situation. Probably a bug!" << std::endl; + return ; + } + std::map::const_iterator its = _registered_services.find(it->second.service_id) ; + + if(its == _registered_services.end()) + { + std::cerr << "(EE) key " << key << " is attached to service " << it->second.service_id << ", which is unknown!! That is a bug." << std::endl; + return ; + } + its->second->acknowledgeDataReceived(msg_id) ; +} + void p3GRouter::handleRecvACKItem(RsGRouterACKItem *item) { RsStackMutex mtx(grMtx) ; @@ -695,13 +719,21 @@ void p3GRouter::handleRecvACKItem(RsGRouterACKItem *item) uint32_t next_state = it->second.status_flags; uint32_t forward_state = RS_GROUTER_ACK_STATE_UNKN ; bool update_routing_matrix = false ; + bool should_remove = false ; + bool delete_data = false ; time_t now = time(NULL) ; switch(item->state) { + case RS_GROUTER_ACK_STATE_RCVD: + if(it->second.origin == mLinkMgr->getOwnId()) + { + locked_notifyClientAcknowledged(it->second.destination_key,it->first) ; + should_remove = true ; + } // no break afterwards. That is on purpose! + case RS_GROUTER_ACK_STATE_IRCV: - case RS_GROUTER_ACK_STATE_RCVD: // Notify the origin. This is the main route and it was successful. #ifdef GROUTER_DEBUG @@ -714,6 +746,7 @@ void p3GRouter::handleRecvACKItem(RsGRouterACKItem *item) next_state = RS_GROUTER_ROUTING_STATE_ARVD ; update_routing_matrix = true ; + delete_data = true ; break ; @@ -721,14 +754,6 @@ void p3GRouter::handleRecvACKItem(RsGRouterACKItem *item) break ; } - if(it->second.origin == mLinkMgr->getOwnId()) - { - // find the client service and notify it. -#ifdef GROUTER_DEBUG - grouter_debug() << " We're owner: should notify client id" << std::endl; -#endif - } - // Just decrement the list of tried friends // bool found = false ; @@ -773,19 +798,16 @@ void p3GRouter::handleRecvACKItem(RsGRouterACKItem *item) if(it->second.tried_friends.empty()) { - delete it->second.data_item ; - it->second.data_item = NULL ; - - // delete item, but keep the cache entry for a while. - #ifdef GROUTER_DEBUG - grouter_debug() << " No tries left. Removing item from pending list." << std::endl; + grouter_debug() << " No tries left. Keeping item into pending list or a while." << std::endl; #endif + // If no route was found, delete item, but keep the cache entry for a while in order to avoid bouncing. + // if(it->second.status_flags != RS_GROUTER_ROUTING_STATE_ARVD && next_state != RS_GROUTER_ROUTING_STATE_ARVD) { next_state = RS_GROUTER_ROUTING_STATE_DEAD ; forward_state = RS_GROUTER_ACK_STATE_GVNP ; - } + } } // Now send an ACK if necessary. @@ -805,6 +827,20 @@ void p3GRouter::handleRecvACKItem(RsGRouterACKItem *item) sendACK(it->second.origin,item->mid,item->state) ; } it->second.status_flags = next_state ; + + if(delete_data) + { + delete it->second.data_item ; + it->second.data_item = NULL ; + } + if(should_remove) + { +#ifdef GROUTER_DEBUG + grouter_debug() << " Removing entry from pending messages. " << std::endl; +#endif + delete it->second.data_item ; + _pending_messages.erase(it) ; + } } void p3GRouter::handleRecvDataItem(RsGRouterGenericDataItem *item) @@ -930,7 +966,9 @@ void p3GRouter::handleRecvDataItem(RsGRouterGenericDataItem *item) grouter_debug() << " after triage: status = " << new_status_flags << ", ack = " << returned_ack << std::endl; - if(new_status_flags != RS_GROUTER_ROUTING_STATE_UNKN) itr->second.status_flags = new_status_flags ; + if(new_status_flags != RS_GROUTER_ROUTING_STATE_UNKN) + itr->second.status_flags = new_status_flags ; + if(returned_ack != RS_GROUTER_ACK_STATE_UNKN) sendACK(item->PeerId(),item->routing_id,returned_ack) ; @@ -944,7 +982,7 @@ bool p3GRouter::registerClientService(const GRouterServiceId& id,GRouterClientSe return true ; } -void p3GRouter::sendData(const GRouterKeyId& destination, RsGRouterGenericDataItem *item) +void p3GRouter::sendData(const GRouterKeyId& destination, RsGRouterGenericDataItem *item,GRouterMsgPropagationId& propagation_id) { RsStackMutex mtx(grMtx) ; // push the item into pending messages. @@ -963,7 +1001,6 @@ void p3GRouter::sendData(const GRouterKeyId& destination, RsGRouterGenericDataIt // Make sure we have a unique id (at least locally). // - GRouterMsgPropagationId propagation_id ; do { propagation_id = RSRandom::random_u32(); } while(_pending_messages.find(propagation_id) != _pending_messages.end()) ; item->destination_key = destination ; @@ -1070,7 +1107,7 @@ bool p3GRouter::getRoutingMatrixInfo(RsGRouter::GRouterRoutingMatrixInfo& info) info.published_keys.clear() ; std::set ids ; - mServiceControl->getPeersConnected(RS_SERVICE_TYPE_GROUTER,ids) ; + mServiceControl->getPeersConnected(getServiceInfo().mServiceType,ids) ; RsStackMutex mtx(grMtx) ; diff --git a/libretroshare/src/grouter/p3grouter.h b/libretroshare/src/grouter/p3grouter.h index 558211b27..bf3ec8fd3 100644 --- a/libretroshare/src/grouter/p3grouter.h +++ b/libretroshare/src/grouter/p3grouter.h @@ -85,9 +85,10 @@ class p3GRouter: public RsGRouter, public p3Service, public p3Config //===================================================// // Sends an item to the given destination. The router takes ownership of - // the memory. That means item_data will be erase on return. + // the memory. That means item_data will be erase on return. The returned id should be + // remembered by the client, so that he knows when the data has been received. // - void sendData(const GRouterKeyId& destination, RsGRouterGenericDataItem *item) ; + void sendData(const GRouterKeyId& destination, RsGRouterGenericDataItem *item,GRouterMsgPropagationId& id) ; // Sends an ACK to the origin of the msg. This is used to notify for // unfound route, or message correctly received, depending on the particular situation. @@ -170,6 +171,8 @@ class p3GRouter: public RsGRouter, public p3Service, public p3Config static float computeMatrixContribution(float base,uint32_t time_shift,float probability) ; static time_t computeNextTimeDelay(time_t duration) ; + void locked_notifyClientAcknowledged(const GRouterKeyId& key,const GRouterMsgPropagationId& msg_id) const ; + uint32_t computeRandomDistanceIncrement(const RsPeerId& pid,const GRouterKeyId& destination_id) ; //===================================================// diff --git a/libretroshare/src/retroshare/rsgrouter.h b/libretroshare/src/retroshare/rsgrouter.h index fb349a55e..5202f1d57 100644 --- a/libretroshare/src/retroshare/rsgrouter.h +++ b/libretroshare/src/retroshare/rsgrouter.h @@ -85,7 +85,7 @@ class RsGRouter // Communication to other services. // //===================================================// - virtual void sendData(const GRouterKeyId& destination, RsGRouterGenericDataItem *item) =0; + virtual void sendData(const GRouterKeyId& destination, RsGRouterGenericDataItem *item,GRouterMsgPropagationId& id) =0; virtual bool registerKey(const GRouterKeyId& key,const GRouterServiceId& client_id,const std::string& description_string) =0; }; diff --git a/libretroshare/src/rsserver/rsinit.cc b/libretroshare/src/rsserver/rsinit.cc index 2eeedaf5e..dc27d5e73 100644 --- a/libretroshare/src/rsserver/rsinit.cc +++ b/libretroshare/src/rsserver/rsinit.cc @@ -1567,10 +1567,8 @@ int RsServer::StartupRetroShare() // Services that have been changed to pqiServiceMonitor serviceCtrl->registerServiceMonitor(msgSrv, msgSrv->getServiceInfo().mServiceType); serviceCtrl->registerServiceMonitor(mDisc, mDisc->getServiceInfo().mServiceType); - serviceCtrl->registerServiceMonitor(mStatusSrv, - mStatusSrv->getServiceInfo().mServiceType); - serviceCtrl->registerServiceMonitor(chatSrv, - chatSrv->getServiceInfo().mServiceType); + serviceCtrl->registerServiceMonitor(mStatusSrv, mStatusSrv->getServiceInfo().mServiceType); + serviceCtrl->registerServiceMonitor(chatSrv, chatSrv->getServiceInfo().mServiceType); serviceCtrl->registerServiceMonitor(mBwCtrl, mDisc->getServiceInfo().mServiceType); /**************************************************************************/ diff --git a/libretroshare/src/serialiser/rsmsgitems.h b/libretroshare/src/serialiser/rsmsgitems.h index 66221637f..e893f9bbb 100644 --- a/libretroshare/src/serialiser/rsmsgitems.h +++ b/libretroshare/src/serialiser/rsmsgitems.h @@ -431,7 +431,6 @@ const uint32_t RS_MSG_FLAGS_REPLIED = 0x00000080; const uint32_t RS_MSG_FLAGS_FORWARDED = 0x00000100; const uint32_t RS_MSG_FLAGS_STAR = 0x00000200; const uint32_t RS_MSG_FLAGS_PARTIAL = 0x00000400; -// system message const uint32_t RS_MSG_FLAGS_USER_REQUEST = 0x00000800; const uint32_t RS_MSG_FLAGS_FRIEND_RECOMMENDATION = 0x00001000; const uint32_t RS_MSG_FLAGS_SYSTEM = RS_MSG_FLAGS_USER_REQUEST | RS_MSG_FLAGS_FRIEND_RECOMMENDATION; @@ -442,6 +441,7 @@ const uint32_t RS_MSG_FLAGS_SIGNATURE_CHECKS = 0x00010000; const uint32_t RS_MSG_FLAGS_SIGNED = 0x00020000; const uint32_t RS_MSG_FLAGS_LOAD_EMBEDDED_IMAGES = 0x00040000; const uint32_t RS_MSG_FLAGS_DECRYPTED = 0x00080000; +const uint32_t RS_MSG_FLAGS_ROUTED = 0x00100000; class RsMessageItem: public RsItem { diff --git a/libretroshare/src/services/p3msgservice.cc b/libretroshare/src/services/p3msgservice.cc index e77ed1960..58abac2ea 100644 --- a/libretroshare/src/services/p3msgservice.cc +++ b/libretroshare/src/services/p3msgservice.cc @@ -298,16 +298,6 @@ void p3MsgService::checkSizeAndSendMessage(RsMsgItem *msg) std::cerr << "Msg is size " << msg->message.size() << std::endl; - if( msg->msgFlags & RS_MSG_FLAGS_DISTANT ) - { - RsStackMutex stack(mMsgMtx); /********** STACK LOCKED MTX ******/ - -#ifdef DEBUG_DISTANT_MSG - std::cerr << "checkOutgoingMessages(): removing pending message flag for peer id " << msg->PeerId() << "." << std::endl; -#endif - _messenging_contacts[GRouterKeyId(msg->PeerId())].pending_messages = false ; - } - while(msg->message.size() > MAX_STRING_SIZE) { // chop off the first 15000 wchars @@ -366,7 +356,9 @@ int p3MsgService::checkOutgoingMessages() /* find the certificate */ RsPeerId pid = mit->second->PeerId(); - if( pid == ownId || (mit->second->msgFlags & RS_MSG_FLAGS_DISTANT) || mServiceCtrl->isPeerConnected(getServiceInfo().mServiceType, pid) ) /* FEEDBACK Msg to Ourselves */ + if( pid == ownId + || ( (mit->second->msgFlags & RS_MSG_FLAGS_DISTANT) && (!(mit->second->msgFlags & RS_MSG_FLAGS_ROUTED))) + || mServiceCtrl->isPeerConnected(getServiceInfo().mServiceType, pid) ) /* FEEDBACK Msg to Ourselves */ { /* send msg */ pqioutput(PQL_DEBUG_BASIC, msgservicezone, @@ -375,9 +367,16 @@ int p3MsgService::checkOutgoingMessages() (mit->second)->msgFlags &= ~RS_MSG_FLAGS_PENDING; output_queue.push_back(mit->second) ; - toErase.push_back(mit->first); - changed = true ; + // When the message is a distant msg, dont remove it yet from the list. Only mark it as being sent, so that we don't send it again. + // + if(!(mit->second->msgFlags & RS_MSG_FLAGS_DISTANT)) + { + toErase.push_back(mit->first); + changed = true ; + } + else + mit->second->msgFlags |= RS_MSG_FLAGS_ROUTED ; } else { @@ -2151,7 +2150,45 @@ void p3MsgService::sendGRouterData(const GRouterKeyId& key_id,RsMsgItem *msgitem delete[] msg_serialized_data ; - mGRouter->sendData(key_id,item) ; + GRouterMsgPropagationId grouter_message_id ; + + mGRouter->sendData(key_id,item,grouter_message_id) ; + + // now store the grouter id along with the message id, so that we can keep track of received messages + + _ongoing_messages[grouter_message_id] = msgitem->msgId ; +} +void p3MsgService::acknowledgeDataReceived(const GRouterMsgPropagationId& id) +{ + RsStackMutex stack(mMsgMtx); /********** STACK LOCKED MTX ******/ +#ifdef DEBUG_DISTANT_MSG + std::cerr << "p3MsgService::acknowledgeDataReceived(): acknowledging data received for msg propagation id " << id << std::endl; +#endif + std::map::iterator it = _ongoing_messages.find(id) ; + + if(it == _ongoing_messages.end()) + { + std::cerr << " (EE) cannot find pending message to acknowledge. Weird. grouter id = " << id << std::endl; + return ; + } + + uint32_t msg_id = it->second ; + + // we should now remove the item from the msgOutgoing list. + + std::map::iterator it2 = msgOutgoing.find(msg_id) ; + + if(it2 == msgOutgoing.end()) + { + std::cerr << "(EE) message has been ACKed, but is not in outgoing list. Something's wrong!!" << std::endl; + return ; + } + + delete it2->second ; + msgOutgoing.erase(it2) ; + + RsServer::notify()->notifyListChange(NOTIFY_LIST_MESSAGELIST,NOTIFY_TYPE_MOD); + IndicateConfigChanged() ; } void p3MsgService::receiveGRouterData(const GRouterKeyId& key, const RsGRouterGenericDataItem *gitem) { @@ -2179,22 +2216,6 @@ void p3MsgService::sendPrivateMsgItem(RsMsgItem *msgitem) std::cerr << "p3MsgService::sendDistanteMsgItem(): sending distant msg item to peer " << msgitem->PeerId() << std::endl; #endif GRouterKeyId key_id(msgitem->PeerId()) ; - { - RsStackMutex stack(mMsgMtx); /********** STACK LOCKED MTX ******/ - - // allocate a new contact. If it does not exist, set its tunnel state to DN - // - std::map::iterator it = _messenging_contacts.find(key_id) ; - - if(it == _messenging_contacts.end()) - { - std::cerr << "(EE) p3MsgService::sendPrivateMsgItem(): ERROR: no tunnel for message to send. This should not happen. " << std::endl; - return ; - } - - if(!it->second.pending_messages) - std::cerr << "(WW) p3MsgService::sendPrivateMsgItem(): WARNING: no pending message flag. This should not happen. " << std::endl; - } #ifdef DEBUG_DISTANT_MSG std::cerr << " Flushing msg " << msgitem->msgId << " for peer id " << msgitem->PeerId() << std::endl; diff --git a/libretroshare/src/services/p3msgservice.h b/libretroshare/src/services/p3msgservice.h index 289ca4399..413840e0f 100644 --- a/libretroshare/src/services/p3msgservice.h +++ b/libretroshare/src/services/p3msgservice.h @@ -132,11 +132,12 @@ class p3MsgService: public p3Service, public p3Config, public pqiServiceMonitor, // This contains the ongoing tunnel handling contacts. // The map is indexed by the hash // - std::map _messenging_contacts ; + std::map _ongoing_messages ; // Overloaded from GRouterClientService virtual void receiveGRouterData(const GRouterKeyId& key,const RsGRouterGenericDataItem *item) ; + virtual void acknowledgeDataReceived(const GRouterMsgPropagationId& msg_id) ; // Utility functions