distant outgoing messages stay in outbox until actually sent

git-svn-id: http://svn.code.sf.net/p/retroshare/code/trunk@6661 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
csoler 2013-08-31 21:10:04 +00:00
parent 9488a6a33d
commit 4c87eb0781
2 changed files with 70 additions and 37 deletions

View File

@ -103,9 +103,10 @@ int p3MsgService::tick()
if(now > last_management_time + 5) if(now > last_management_time + 5)
{ {
manageDistantPeers() ; manageDistantPeers() ;
checkOutgoingMessages();
last_management_time = now ; last_management_time = now ;
} }
//checkOutgoingMessages();
return 0; return 0;
} }
@ -306,8 +307,38 @@ int p3MsgService::checkOutgoingMessages()
/* find the certificate */ /* find the certificate */
std::string pid = mit->second->PeerId(); std::string pid = mit->second->PeerId();
bool tunnel_is_ok = false ;
if(mit->second->msgFlags & RS_MSG_FLAGS_DISTANT || mLinkMgr->isOnline(pid) || pid == ownId) /* FEEDBACK Msg to Ourselves */ if(mit->second->msgFlags & RS_MSG_FLAGS_DISTANT)
{
// Do we have a tunnel already?
//
const std::string& hash = mit->second->PeerId() ;
std::map<std::string,DistantMessengingContact>::iterator it = _messenging_contacts.find(hash) ;
if(it != _messenging_contacts.end())
{
tunnel_is_ok = (it->second.status == RS_DISTANT_MSG_STATUS_TUNNEL_OK) ;
#ifdef DEBUG_DISTANT_MSG
std::cerr << "checkOutGoingMessages(): distant contact found. tunnel_is_ok = " << tunnel_is_ok << std::endl;
#endif
}
else
{
#ifdef DEBUG_DISTANT_MSG
std::cerr << "checkOutGoingMessages(): distant contact not found. Asking for tunnels for hash " << hash << std::endl;
#endif
// no. Ask for monitoring tunnels.
rsTurtle->monitorTunnels(hash,this) ;
tunnel_is_ok = false ;
DistantMessengingContact& contact( _messenging_contacts[hash] ) ;
contact.status = RS_DISTANT_MSG_STATUS_TUNNEL_DN ;
contact.pending_messages = true ;
}
}
if(tunnel_is_ok || mLinkMgr->isOnline(pid) || pid == ownId) /* FEEDBACK Msg to Ourselves */
{ {
/* send msg */ /* send msg */
pqioutput(PQL_DEBUG_BASIC, msgservicezone, pqioutput(PQL_DEBUG_BASIC, msgservicezone,
@ -350,8 +381,20 @@ int p3MsgService::checkOutgoingMessages()
} }
for(std::list<RsMsgItem*>::const_iterator it(output_queue.begin());it!=output_queue.end();++it) for(std::list<RsMsgItem*>::const_iterator it(output_queue.begin());it!=output_queue.end();++it)
{
checkSizeAndSendMessage(*it) ; checkSizeAndSendMessage(*it) ;
if( (*it)->msgFlags & RS_MSG_FLAGS_DISTANT )
{
RsStackMutex stack(mMsgMtx); /********** STACK LOCKED MTX ******/
_messenging_contacts[(*it)->PeerId()].pending_messages = false ;
#ifdef DEBUG_DISTANT_MSG
std::cerr << "checkOutgoingMessages(): removing pending message flag for hash " << (*it)->PeerId() << "." << std::endl;
#endif
}
}
if(changed) if(changed)
rsicontrol->getNotify().notifyListChange(NOTIFY_LIST_MESSAGELIST,NOTIFY_TYPE_MOD); rsicontrol->getNotify().notifyListChange(NOTIFY_LIST_MESSAGELIST,NOTIFY_TYPE_MOD);
@ -2128,29 +2171,6 @@ void p3MsgService::manageDistantPeers()
#ifdef DEBUG_DISTANT_MSG #ifdef DEBUG_DISTANT_MSG
std::cerr << "p3MsgService::manageDistantPeers()" << std::endl; std::cerr << "p3MsgService::manageDistantPeers()" << std::endl;
#endif #endif
std::vector<std::pair<std::string,RsMsgItem*> > to_send ;
{
RsStackMutex stack(mMsgMtx); /********** STACK LOCKED MTX ******/
for(std::map<std::string,DistantMessengingContact>::iterator it(_messenging_contacts.begin());it!=_messenging_contacts.end();++it)
if(it->second.status == RS_DISTANT_MSG_STATUS_TUNNEL_OK)
{
for(uint32_t i=0;i<it->second.pending_messages.size();++i)
to_send.push_back(std::pair<std::string,RsMsgItem*>(it->first,it->second.pending_messages[i])) ;
it->second.pending_messages.clear() ;
}
}
for(uint32_t i=0;i<to_send.size();++i)
{
#ifdef DEBUG_DISTANT_MSG
std::cerr << " Flushing msg " << to_send[i].second->msgId << std::endl;
#endif
sendTurtleData(to_send[i].first,to_send[i].second) ;
}
time_t now = time(NULL) ; time_t now = time(NULL) ;
{ {
RsStackMutex stack(mMsgMtx); /********** STACK LOCKED MTX ******/ RsStackMutex stack(mMsgMtx); /********** STACK LOCKED MTX ******/
@ -2174,7 +2194,7 @@ void p3MsgService::manageDistantPeers()
// clean dead contacts. // clean dead contacts.
// //
for(std::map<std::string,DistantMessengingContact>::iterator it(_messenging_contacts.begin());it!=_messenging_contacts.end();) for(std::map<std::string,DistantMessengingContact>::iterator it(_messenging_contacts.begin());it!=_messenging_contacts.end();)
if(it->second.pending_messages.empty() && it->second.status == RS_DISTANT_MSG_STATUS_TUNNEL_DN) if((!it->second.pending_messages) && it->second.status == RS_DISTANT_MSG_STATUS_TUNNEL_DN)
{ {
#ifdef DEBUG_DISTANT_MSG #ifdef DEBUG_DISTANT_MSG
std::cerr << " Removing dead contact with no pending msgs and dead tunnel. hash=" << it->first << std::endl; std::cerr << " Removing dead contact with no pending msgs and dead tunnel. hash=" << it->first << std::endl;
@ -2221,7 +2241,7 @@ void p3MsgService::removeVirtualPeer(const TurtleFileHash& hash, const TurtleVir
contact.status = RS_DISTANT_MSG_STATUS_TUNNEL_DN ; contact.status = RS_DISTANT_MSG_STATUS_TUNNEL_DN ;
contact.virtual_peer_id.clear() ; contact.virtual_peer_id.clear() ;
if(contact.pending_messages.empty()) if(!contact.pending_messages)
remove_tunnel = true ; remove_tunnel = true ;
} }
@ -2364,26 +2384,39 @@ void p3MsgService::sendPrivateMsgItem(RsMsgItem *msgitem)
{ {
#ifdef DEBUG_DISTANT_MSG #ifdef DEBUG_DISTANT_MSG
std::cerr << "p3MsgService::sendDistanteMsgItem(): sending distant msg item to peer " << msgitem->PeerId() << std::endl; std::cerr << "p3MsgService::sendDistanteMsgItem(): sending distant msg item to peer " << msgitem->PeerId() << std::endl;
std::cerr << " asking for tunnels" << std::endl; // std::cerr << " asking for tunnels" << std::endl;
std::cerr << " recording msg info" << std::endl; // std::cerr << " recording msg info" << std::endl;
#endif #endif
const std::string& hash = msgitem->PeerId() ; // const std::string& hash = msgitem->PeerId() ;
rsTurtle->monitorTunnels(hash,this) ; // create a tunnel for it, and put the msg on the waiting list. // rsTurtle->monitorTunnels(hash,this) ; // create a tunnel for it, and put the msg on the waiting list.
{ {
RsStackMutex stack(mMsgMtx); /********** STACK LOCKED MTX ******/ RsStackMutex stack(mMsgMtx); /********** STACK LOCKED MTX ******/
// allocate a new contact. If it does not exist, set its tunnel state to DN // allocate a new contact. If it does not exist, set its tunnel state to DN
// //
std::map<std::string,DistantMessengingContact>::iterator it = _messenging_contacts.find(hash) ; std::map<std::string,DistantMessengingContact>::iterator it = _messenging_contacts.find(msgitem->PeerId()) ;
DistantMessengingContact& contact( _messenging_contacts[hash] ) ;
if(it == _messenging_contacts.end()) if(it == _messenging_contacts.end())
contact.status = RS_DISTANT_MSG_STATUS_TUNNEL_DN ; {
std::cerr << "(EE) p3MsgService::sendPrivateMsgItem(): ERROR: no tunnel for message to send. This should not happen. " << std::endl;
return ;
}
contact.pending_messages.push_back(msgitem) ; // record the msg to be sent. if(it->second.status != RS_DISTANT_MSG_STATUS_TUNNEL_OK)
{
std::cerr << "(WW) p3MsgService::sendPrivateMsgItem(): WARNING: no tunnel for message to send. This should not happen. " << std::endl;
return ;
}
if(!it->second.pending_messages)
std::cerr << "(WW) p3MsgService::sendPrivateMsgItem(): WARNING: no pending message flag. This should not happen. " << std::endl;
} }
#ifdef DEBUG_DISTANT_MSG
std::cerr << " Flushing msg " << msgitem->msgId << " for peer id " << msgitem->PeerId() << std::endl;
#endif
sendTurtleData(msgitem->PeerId(),msgitem) ;
} }

View File

@ -113,7 +113,7 @@ int checkOutgoingMessages();
time_t last_hit_time ; time_t last_hit_time ;
std::string virtual_peer_id ; std::string virtual_peer_id ;
uint32_t status ; uint32_t status ;
std::vector<RsMsgItem*> pending_messages ; bool pending_messages ;
}; };
bool createDistantOfflineMessengingInvite(time_t time_of_validity,TurtleFileHash& hash) ; bool createDistantOfflineMessengingInvite(time_t time_of_validity,TurtleFileHash& hash) ;