mirror of
https://github.com/RetroShare/RetroShare.git
synced 2025-01-14 00:49:41 -05:00
- added notification from global router to client services
- keep distant messages in outbox until they get notified to be received - cleanup dead code git-svn-id: http://svn.code.sf.net/p/retroshare/code/trunk@7284 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
parent
11e370c56d
commit
ed198af807
@ -55,6 +55,14 @@ class GRouterClientService
|
||||
std::cerr << " destination key_id = " << destination_key.toStdString() << std::endl;
|
||||
}
|
||||
|
||||
// This method is called by the global router when a message has been acknowledged, in order to notify the client.
|
||||
//
|
||||
virtual void acknowledgeDataReceived(const GRouterMsgPropagationId& received_id)
|
||||
{
|
||||
std::cerr << "!!!!!! Received Data acknowledge from global router, but the client service is not handling it !!!!!!!!!!" << std::endl ;
|
||||
std::cerr << " message ID = " << received_id << std::endl;
|
||||
}
|
||||
|
||||
// This function is mandatory. It should do two things:
|
||||
// 1 - keep a pointer to the global router, so as to be able to send data (e.g. copy pt into a local variable)
|
||||
// 2 - call pt->registerTunnelService(this), so that the TR knows that service and can send back information to it.
|
||||
|
@ -346,7 +346,7 @@ void p3GRouter::routePendingObjects()
|
||||
#endif
|
||||
|
||||
std::set<RsPeerId> lst ;
|
||||
mServiceControl->getPeersConnected(RS_SERVICE_TYPE_GROUTER,lst) ;
|
||||
mServiceControl->getPeersConnected(getServiceInfo().mServiceType,lst) ;
|
||||
RsPeerId own_id( mServiceControl->getOwnId() );
|
||||
|
||||
// The policy is the following:
|
||||
@ -635,6 +635,30 @@ void p3GRouter::handleIncoming()
|
||||
}
|
||||
}
|
||||
|
||||
void p3GRouter::locked_notifyClientAcknowledged(const GRouterKeyId& key,const GRouterMsgPropagationId& msg_id) const
|
||||
{
|
||||
#ifdef GROUTER_DEBUG
|
||||
grouter_debug() << " Key is owned by us. Notifying service that item was ACKed." << std::endl;
|
||||
#endif
|
||||
// notify the client
|
||||
//
|
||||
std::map<GRouterKeyId, GRouterPublishedKeyInfo>::const_iterator it = _owned_key_ids.find(key) ;
|
||||
|
||||
if(it == _owned_key_ids.end())
|
||||
{
|
||||
std::cerr << "(EE) key " << key << " is not owned by us. That is a weird situation. Probably a bug!" << std::endl;
|
||||
return ;
|
||||
}
|
||||
std::map<GRouterServiceId,GRouterClientService*>::const_iterator its = _registered_services.find(it->second.service_id) ;
|
||||
|
||||
if(its == _registered_services.end())
|
||||
{
|
||||
std::cerr << "(EE) key " << key << " is attached to service " << it->second.service_id << ", which is unknown!! That is a bug." << std::endl;
|
||||
return ;
|
||||
}
|
||||
its->second->acknowledgeDataReceived(msg_id) ;
|
||||
}
|
||||
|
||||
void p3GRouter::handleRecvACKItem(RsGRouterACKItem *item)
|
||||
{
|
||||
RsStackMutex mtx(grMtx) ;
|
||||
@ -695,13 +719,21 @@ void p3GRouter::handleRecvACKItem(RsGRouterACKItem *item)
|
||||
uint32_t next_state = it->second.status_flags;
|
||||
uint32_t forward_state = RS_GROUTER_ACK_STATE_UNKN ;
|
||||
bool update_routing_matrix = false ;
|
||||
bool should_remove = false ;
|
||||
bool delete_data = false ;
|
||||
|
||||
time_t now = time(NULL) ;
|
||||
|
||||
switch(item->state)
|
||||
{
|
||||
case RS_GROUTER_ACK_STATE_IRCV:
|
||||
case RS_GROUTER_ACK_STATE_RCVD:
|
||||
if(it->second.origin == mLinkMgr->getOwnId())
|
||||
{
|
||||
locked_notifyClientAcknowledged(it->second.destination_key,it->first) ;
|
||||
should_remove = true ;
|
||||
} // no break afterwards. That is on purpose!
|
||||
|
||||
case RS_GROUTER_ACK_STATE_IRCV:
|
||||
// Notify the origin. This is the main route and it was successful.
|
||||
|
||||
#ifdef GROUTER_DEBUG
|
||||
@ -714,6 +746,7 @@ void p3GRouter::handleRecvACKItem(RsGRouterACKItem *item)
|
||||
next_state = RS_GROUTER_ROUTING_STATE_ARVD ;
|
||||
|
||||
update_routing_matrix = true ;
|
||||
delete_data = true ;
|
||||
break ;
|
||||
|
||||
|
||||
@ -721,14 +754,6 @@ void p3GRouter::handleRecvACKItem(RsGRouterACKItem *item)
|
||||
break ;
|
||||
}
|
||||
|
||||
if(it->second.origin == mLinkMgr->getOwnId())
|
||||
{
|
||||
// find the client service and notify it.
|
||||
#ifdef GROUTER_DEBUG
|
||||
grouter_debug() << " We're owner: should notify client id" << std::endl;
|
||||
#endif
|
||||
}
|
||||
|
||||
// Just decrement the list of tried friends
|
||||
//
|
||||
bool found = false ;
|
||||
@ -773,14 +798,11 @@ void p3GRouter::handleRecvACKItem(RsGRouterACKItem *item)
|
||||
|
||||
if(it->second.tried_friends.empty())
|
||||
{
|
||||
delete it->second.data_item ;
|
||||
it->second.data_item = NULL ;
|
||||
|
||||
// delete item, but keep the cache entry for a while.
|
||||
|
||||
#ifdef GROUTER_DEBUG
|
||||
grouter_debug() << " No tries left. Removing item from pending list." << std::endl;
|
||||
grouter_debug() << " No tries left. Keeping item into pending list or a while." << std::endl;
|
||||
#endif
|
||||
// If no route was found, delete item, but keep the cache entry for a while in order to avoid bouncing.
|
||||
//
|
||||
if(it->second.status_flags != RS_GROUTER_ROUTING_STATE_ARVD && next_state != RS_GROUTER_ROUTING_STATE_ARVD)
|
||||
{
|
||||
next_state = RS_GROUTER_ROUTING_STATE_DEAD ;
|
||||
@ -805,6 +827,20 @@ void p3GRouter::handleRecvACKItem(RsGRouterACKItem *item)
|
||||
sendACK(it->second.origin,item->mid,item->state) ;
|
||||
}
|
||||
it->second.status_flags = next_state ;
|
||||
|
||||
if(delete_data)
|
||||
{
|
||||
delete it->second.data_item ;
|
||||
it->second.data_item = NULL ;
|
||||
}
|
||||
if(should_remove)
|
||||
{
|
||||
#ifdef GROUTER_DEBUG
|
||||
grouter_debug() << " Removing entry from pending messages. " << std::endl;
|
||||
#endif
|
||||
delete it->second.data_item ;
|
||||
_pending_messages.erase(it) ;
|
||||
}
|
||||
}
|
||||
|
||||
void p3GRouter::handleRecvDataItem(RsGRouterGenericDataItem *item)
|
||||
@ -930,7 +966,9 @@ void p3GRouter::handleRecvDataItem(RsGRouterGenericDataItem *item)
|
||||
|
||||
grouter_debug() << " after triage: status = " << new_status_flags << ", ack = " << returned_ack << std::endl;
|
||||
|
||||
if(new_status_flags != RS_GROUTER_ROUTING_STATE_UNKN) itr->second.status_flags = new_status_flags ;
|
||||
if(new_status_flags != RS_GROUTER_ROUTING_STATE_UNKN)
|
||||
itr->second.status_flags = new_status_flags ;
|
||||
|
||||
if(returned_ack != RS_GROUTER_ACK_STATE_UNKN)
|
||||
sendACK(item->PeerId(),item->routing_id,returned_ack) ;
|
||||
|
||||
@ -944,7 +982,7 @@ bool p3GRouter::registerClientService(const GRouterServiceId& id,GRouterClientSe
|
||||
return true ;
|
||||
}
|
||||
|
||||
void p3GRouter::sendData(const GRouterKeyId& destination, RsGRouterGenericDataItem *item)
|
||||
void p3GRouter::sendData(const GRouterKeyId& destination, RsGRouterGenericDataItem *item,GRouterMsgPropagationId& propagation_id)
|
||||
{
|
||||
RsStackMutex mtx(grMtx) ;
|
||||
// push the item into pending messages.
|
||||
@ -963,7 +1001,6 @@ void p3GRouter::sendData(const GRouterKeyId& destination, RsGRouterGenericDataIt
|
||||
|
||||
// Make sure we have a unique id (at least locally).
|
||||
//
|
||||
GRouterMsgPropagationId propagation_id ;
|
||||
do { propagation_id = RSRandom::random_u32(); } while(_pending_messages.find(propagation_id) != _pending_messages.end()) ;
|
||||
|
||||
item->destination_key = destination ;
|
||||
@ -1070,7 +1107,7 @@ bool p3GRouter::getRoutingMatrixInfo(RsGRouter::GRouterRoutingMatrixInfo& info)
|
||||
info.published_keys.clear() ;
|
||||
|
||||
std::set<RsPeerId> ids ;
|
||||
mServiceControl->getPeersConnected(RS_SERVICE_TYPE_GROUTER,ids) ;
|
||||
mServiceControl->getPeersConnected(getServiceInfo().mServiceType,ids) ;
|
||||
|
||||
RsStackMutex mtx(grMtx) ;
|
||||
|
||||
|
@ -85,9 +85,10 @@ class p3GRouter: public RsGRouter, public p3Service, public p3Config
|
||||
//===================================================//
|
||||
|
||||
// Sends an item to the given destination. The router takes ownership of
|
||||
// the memory. That means item_data will be erase on return.
|
||||
// the memory. That means item_data will be erase on return. The returned id should be
|
||||
// remembered by the client, so that he knows when the data has been received.
|
||||
//
|
||||
void sendData(const GRouterKeyId& destination, RsGRouterGenericDataItem *item) ;
|
||||
void sendData(const GRouterKeyId& destination, RsGRouterGenericDataItem *item,GRouterMsgPropagationId& id) ;
|
||||
|
||||
// Sends an ACK to the origin of the msg. This is used to notify for
|
||||
// unfound route, or message correctly received, depending on the particular situation.
|
||||
@ -170,6 +171,8 @@ class p3GRouter: public RsGRouter, public p3Service, public p3Config
|
||||
static float computeMatrixContribution(float base,uint32_t time_shift,float probability) ;
|
||||
static time_t computeNextTimeDelay(time_t duration) ;
|
||||
|
||||
void locked_notifyClientAcknowledged(const GRouterKeyId& key,const GRouterMsgPropagationId& msg_id) const ;
|
||||
|
||||
uint32_t computeRandomDistanceIncrement(const RsPeerId& pid,const GRouterKeyId& destination_id) ;
|
||||
|
||||
//===================================================//
|
||||
|
@ -85,7 +85,7 @@ class RsGRouter
|
||||
// Communication to other services. //
|
||||
//===================================================//
|
||||
|
||||
virtual void sendData(const GRouterKeyId& destination, RsGRouterGenericDataItem *item) =0;
|
||||
virtual void sendData(const GRouterKeyId& destination, RsGRouterGenericDataItem *item,GRouterMsgPropagationId& id) =0;
|
||||
virtual bool registerKey(const GRouterKeyId& key,const GRouterServiceId& client_id,const std::string& description_string) =0;
|
||||
};
|
||||
|
||||
|
@ -1567,10 +1567,8 @@ int RsServer::StartupRetroShare()
|
||||
// Services that have been changed to pqiServiceMonitor
|
||||
serviceCtrl->registerServiceMonitor(msgSrv, msgSrv->getServiceInfo().mServiceType);
|
||||
serviceCtrl->registerServiceMonitor(mDisc, mDisc->getServiceInfo().mServiceType);
|
||||
serviceCtrl->registerServiceMonitor(mStatusSrv,
|
||||
mStatusSrv->getServiceInfo().mServiceType);
|
||||
serviceCtrl->registerServiceMonitor(chatSrv,
|
||||
chatSrv->getServiceInfo().mServiceType);
|
||||
serviceCtrl->registerServiceMonitor(mStatusSrv, mStatusSrv->getServiceInfo().mServiceType);
|
||||
serviceCtrl->registerServiceMonitor(chatSrv, chatSrv->getServiceInfo().mServiceType);
|
||||
serviceCtrl->registerServiceMonitor(mBwCtrl, mDisc->getServiceInfo().mServiceType);
|
||||
|
||||
/**************************************************************************/
|
||||
|
@ -431,7 +431,6 @@ const uint32_t RS_MSG_FLAGS_REPLIED = 0x00000080;
|
||||
const uint32_t RS_MSG_FLAGS_FORWARDED = 0x00000100;
|
||||
const uint32_t RS_MSG_FLAGS_STAR = 0x00000200;
|
||||
const uint32_t RS_MSG_FLAGS_PARTIAL = 0x00000400;
|
||||
// system message
|
||||
const uint32_t RS_MSG_FLAGS_USER_REQUEST = 0x00000800;
|
||||
const uint32_t RS_MSG_FLAGS_FRIEND_RECOMMENDATION = 0x00001000;
|
||||
const uint32_t RS_MSG_FLAGS_SYSTEM = RS_MSG_FLAGS_USER_REQUEST | RS_MSG_FLAGS_FRIEND_RECOMMENDATION;
|
||||
@ -442,6 +441,7 @@ const uint32_t RS_MSG_FLAGS_SIGNATURE_CHECKS = 0x00010000;
|
||||
const uint32_t RS_MSG_FLAGS_SIGNED = 0x00020000;
|
||||
const uint32_t RS_MSG_FLAGS_LOAD_EMBEDDED_IMAGES = 0x00040000;
|
||||
const uint32_t RS_MSG_FLAGS_DECRYPTED = 0x00080000;
|
||||
const uint32_t RS_MSG_FLAGS_ROUTED = 0x00100000;
|
||||
|
||||
class RsMessageItem: public RsItem
|
||||
{
|
||||
|
@ -298,16 +298,6 @@ void p3MsgService::checkSizeAndSendMessage(RsMsgItem *msg)
|
||||
|
||||
std::cerr << "Msg is size " << msg->message.size() << std::endl;
|
||||
|
||||
if( msg->msgFlags & RS_MSG_FLAGS_DISTANT )
|
||||
{
|
||||
RsStackMutex stack(mMsgMtx); /********** STACK LOCKED MTX ******/
|
||||
|
||||
#ifdef DEBUG_DISTANT_MSG
|
||||
std::cerr << "checkOutgoingMessages(): removing pending message flag for peer id " << msg->PeerId() << "." << std::endl;
|
||||
#endif
|
||||
_messenging_contacts[GRouterKeyId(msg->PeerId())].pending_messages = false ;
|
||||
}
|
||||
|
||||
while(msg->message.size() > MAX_STRING_SIZE)
|
||||
{
|
||||
// chop off the first 15000 wchars
|
||||
@ -366,7 +356,9 @@ int p3MsgService::checkOutgoingMessages()
|
||||
/* find the certificate */
|
||||
RsPeerId pid = mit->second->PeerId();
|
||||
|
||||
if( pid == ownId || (mit->second->msgFlags & RS_MSG_FLAGS_DISTANT) || mServiceCtrl->isPeerConnected(getServiceInfo().mServiceType, pid) ) /* FEEDBACK Msg to Ourselves */
|
||||
if( pid == ownId
|
||||
|| ( (mit->second->msgFlags & RS_MSG_FLAGS_DISTANT) && (!(mit->second->msgFlags & RS_MSG_FLAGS_ROUTED)))
|
||||
|| mServiceCtrl->isPeerConnected(getServiceInfo().mServiceType, pid) ) /* FEEDBACK Msg to Ourselves */
|
||||
{
|
||||
/* send msg */
|
||||
pqioutput(PQL_DEBUG_BASIC, msgservicezone,
|
||||
@ -375,10 +367,17 @@ int p3MsgService::checkOutgoingMessages()
|
||||
(mit->second)->msgFlags &= ~RS_MSG_FLAGS_PENDING;
|
||||
|
||||
output_queue.push_back(mit->second) ;
|
||||
toErase.push_back(mit->first);
|
||||
|
||||
// When the message is a distant msg, dont remove it yet from the list. Only mark it as being sent, so that we don't send it again.
|
||||
//
|
||||
if(!(mit->second->msgFlags & RS_MSG_FLAGS_DISTANT))
|
||||
{
|
||||
toErase.push_back(mit->first);
|
||||
changed = true ;
|
||||
}
|
||||
else
|
||||
mit->second->msgFlags |= RS_MSG_FLAGS_ROUTED ;
|
||||
}
|
||||
else
|
||||
{
|
||||
pqioutput(PQL_DEBUG_BASIC, msgservicezone,
|
||||
@ -2151,7 +2150,45 @@ void p3MsgService::sendGRouterData(const GRouterKeyId& key_id,RsMsgItem *msgitem
|
||||
|
||||
delete[] msg_serialized_data ;
|
||||
|
||||
mGRouter->sendData(key_id,item) ;
|
||||
GRouterMsgPropagationId grouter_message_id ;
|
||||
|
||||
mGRouter->sendData(key_id,item,grouter_message_id) ;
|
||||
|
||||
// now store the grouter id along with the message id, so that we can keep track of received messages
|
||||
|
||||
_ongoing_messages[grouter_message_id] = msgitem->msgId ;
|
||||
}
|
||||
void p3MsgService::acknowledgeDataReceived(const GRouterMsgPropagationId& id)
|
||||
{
|
||||
RsStackMutex stack(mMsgMtx); /********** STACK LOCKED MTX ******/
|
||||
#ifdef DEBUG_DISTANT_MSG
|
||||
std::cerr << "p3MsgService::acknowledgeDataReceived(): acknowledging data received for msg propagation id " << id << std::endl;
|
||||
#endif
|
||||
std::map<GRouterMsgPropagationId,uint32_t>::iterator it = _ongoing_messages.find(id) ;
|
||||
|
||||
if(it == _ongoing_messages.end())
|
||||
{
|
||||
std::cerr << " (EE) cannot find pending message to acknowledge. Weird. grouter id = " << id << std::endl;
|
||||
return ;
|
||||
}
|
||||
|
||||
uint32_t msg_id = it->second ;
|
||||
|
||||
// we should now remove the item from the msgOutgoing list.
|
||||
|
||||
std::map<uint32_t,RsMsgItem*>::iterator it2 = msgOutgoing.find(msg_id) ;
|
||||
|
||||
if(it2 == msgOutgoing.end())
|
||||
{
|
||||
std::cerr << "(EE) message has been ACKed, but is not in outgoing list. Something's wrong!!" << std::endl;
|
||||
return ;
|
||||
}
|
||||
|
||||
delete it2->second ;
|
||||
msgOutgoing.erase(it2) ;
|
||||
|
||||
RsServer::notify()->notifyListChange(NOTIFY_LIST_MESSAGELIST,NOTIFY_TYPE_MOD);
|
||||
IndicateConfigChanged() ;
|
||||
}
|
||||
void p3MsgService::receiveGRouterData(const GRouterKeyId& key, const RsGRouterGenericDataItem *gitem)
|
||||
{
|
||||
@ -2179,22 +2216,6 @@ void p3MsgService::sendPrivateMsgItem(RsMsgItem *msgitem)
|
||||
std::cerr << "p3MsgService::sendDistanteMsgItem(): sending distant msg item to peer " << msgitem->PeerId() << std::endl;
|
||||
#endif
|
||||
GRouterKeyId key_id(msgitem->PeerId()) ;
|
||||
{
|
||||
RsStackMutex stack(mMsgMtx); /********** STACK LOCKED MTX ******/
|
||||
|
||||
// allocate a new contact. If it does not exist, set its tunnel state to DN
|
||||
//
|
||||
std::map<GRouterKeyId,DistantMessengingContact>::iterator it = _messenging_contacts.find(key_id) ;
|
||||
|
||||
if(it == _messenging_contacts.end())
|
||||
{
|
||||
std::cerr << "(EE) p3MsgService::sendPrivateMsgItem(): ERROR: no tunnel for message to send. This should not happen. " << std::endl;
|
||||
return ;
|
||||
}
|
||||
|
||||
if(!it->second.pending_messages)
|
||||
std::cerr << "(WW) p3MsgService::sendPrivateMsgItem(): WARNING: no pending message flag. This should not happen. " << std::endl;
|
||||
}
|
||||
|
||||
#ifdef DEBUG_DISTANT_MSG
|
||||
std::cerr << " Flushing msg " << msgitem->msgId << " for peer id " << msgitem->PeerId() << std::endl;
|
||||
|
@ -132,11 +132,12 @@ class p3MsgService: public p3Service, public p3Config, public pqiServiceMonitor,
|
||||
// This contains the ongoing tunnel handling contacts.
|
||||
// The map is indexed by the hash
|
||||
//
|
||||
std::map<GRouterKeyId,DistantMessengingContact> _messenging_contacts ;
|
||||
std::map<GRouterMsgPropagationId,uint32_t> _ongoing_messages ;
|
||||
|
||||
// Overloaded from GRouterClientService
|
||||
|
||||
virtual void receiveGRouterData(const GRouterKeyId& key,const RsGRouterGenericDataItem *item) ;
|
||||
virtual void acknowledgeDataReceived(const GRouterMsgPropagationId& msg_id) ;
|
||||
|
||||
// Utility functions
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user