fixed per-friend message passing and cleaning. Next steps: debug multi-step friend transmission

git-svn-id: http://svn.code.sf.net/p/retroshare/code/trunk@8136 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
csoler 2015-04-15 20:48:30 +00:00
parent 20deecd6d3
commit 5785ddca84
2 changed files with 78 additions and 46 deletions

View File

@ -265,7 +265,7 @@ class RsGRouterMatrixFriendListItem: public RsGRouterItem
class RsGRouterRoutingInfoItem: public RsGRouterItem, public GRouterRoutingInfo, public RsGRouterNonCopyableObject class RsGRouterRoutingInfoItem: public RsGRouterItem, public GRouterRoutingInfo, public RsGRouterNonCopyableObject
{ {
public: public:
RsGRouterRoutingInfoItem() : RsGRouterItem(RS_PKT_SUBTYPE_GROUTER_ROUTING_INFO) RsGRouterRoutingInfoItem() : RsGRouterItem(RS_PKT_SUBTYPE_GROUTER_ROUTING_INFO)
{ setPriorityLevel(0) ; } // this item is never sent through the network { setPriorityLevel(0) ; } // this item is never sent through the network
virtual ~RsGRouterRoutingInfoItem() { clear() ; } virtual ~RsGRouterRoutingInfoItem() { clear() ; }

View File

@ -254,7 +254,7 @@
#include "grouterclientservice.h" #include "grouterclientservice.h"
/**********************/ /**********************/
#define GROUTER_DEBUG //#define GROUTER_DEBUG
/**********************/ /**********************/
static const uint32_t MAX_TUNNEL_WAIT_TIME = 60 ; // wait for 60 seconds at most for a tunnel response. static const uint32_t MAX_TUNNEL_WAIT_TIME = 60 ; // wait for 60 seconds at most for a tunnel response.
@ -939,12 +939,12 @@ void p3GRouter::routePendingObjects()
// For now, disable tunnels. We'll first check that the good old tunnel system works as before. // For now, disable tunnels. We'll first check that the good old tunnel system works as before.
if(it->second.routing_flags & GRouterRoutingInfo::ROUTING_FLAGS_ALLOW_TUNNELS) if(it->second.routing_flags & GRouterRoutingInfo::ROUTING_FLAGS_ALLOW_TUNNELS)
locked_collectAvailableTunnels(it->second.tunnel_hash,peers); locked_collectAvailableTunnels(it->second.tunnel_hash,peers);
// For now, disable friends. We'll first check that the good old tunnel system works as before. // For now, disable friends. We'll first check that the good old tunnel system works as before.
// if(it->second.routing_flags & GRouterRoutingInfo::ROUTING_FLAGS_ALLOW_FRIENDS) //if(it->second.routing_flags & GRouterRoutingInfo::ROUTING_FLAGS_ALLOW_FRIENDS)
// locked_collectAvailableFriends(it->second.data_item->destination_key,peers, it->second.routing_flags & GRouterRoutingInfo::ROUTING_FLAGS_IS_ORIGIN); // locked_collectAvailableFriends(it->second.data_item->destination_key,peers, it->second.routing_flags & GRouterRoutingInfo::ROUTING_FLAGS_IS_ORIGIN);
if(peers.empty()) if(peers.empty())
{ {
@ -1153,32 +1153,32 @@ bool p3GRouter::locked_sendTransactionData(const RsPeerId& pid,const RsGRouterTr
void p3GRouter::autoWash() void p3GRouter::autoWash()
{ {
bool items_deleted = false ; bool items_deleted = false ;
std::map<GRouterMsgPropagationId,std::pair<GRouterClientService *,uint32_t> > notified_msgs ; std::map<GRouterMsgPropagationId,GRouterClientService *> failed_msgs ;
{ {
RS_STACK_MUTEX(grMtx) ; RS_STACK_MUTEX(grMtx) ;
time_t now = time(NULL) ; time_t now = time(NULL) ;
for(std::map<GRouterMsgPropagationId, GRouterRoutingInfo>::iterator it=_pending_messages.begin();it!=_pending_messages.end();) for(std::map<GRouterMsgPropagationId, GRouterRoutingInfo>::iterator it=_pending_messages.begin();it!=_pending_messages.end();)
if(it->second.data_status == RS_GROUTER_DATA_STATUS_RECEIPT_OK if( it->second.data_status == RS_GROUTER_DATA_STATUS_DONE
|| ((it->second.received_time_TS + GROUTER_ITEM_MAX_CACHE_KEEP_TIME < now) && !(it->second.routing_flags & GRouterRoutingInfo::ROUTING_FLAGS_IS_ORIGIN))) // is the item too old for cache || ((it->second.received_time_TS + GROUTER_ITEM_MAX_CACHE_KEEP_TIME < now)
{ && !(it->second.routing_flags & GRouterRoutingInfo::ROUTING_FLAGS_IS_ORIGIN))) // is the item too old for cache
{
#ifdef GROUTER_DEBUG #ifdef GROUTER_DEBUG
if(it->second.data_status == RS_GROUTER_DATA_STATUS_RECEIPT_OK) grouter_debug() << " Removing too-old cached item " << std::hex << it->first << std::dec << std::endl;
grouter_debug() << " Removing received cached item " << std::hex << it->first << std::dec << std::endl;
else
grouter_debug() << " Removing too-old cached item " << std::hex << it->first << std::dec << std::endl;
#endif #endif
GRouterClientService *client = NULL ; GRouterClientService *client = NULL ;
GRouterServiceId service_id = 0; GRouterServiceId service_id = 0;
uint32_t status = (it->second.data_status == RS_GROUTER_DATA_STATUS_RECEIPT_OK)?GROUTER_CLIENT_SERVICE_DATA_STATUS_RECEIVED:GROUTER_CLIENT_SERVICE_DATA_STATUS_FAILED ; if( it->second.data_status != RS_GROUTER_DATA_STATUS_DONE )
{
if(!locked_getClientAndServiceId(it->second.tunnel_hash,it->second.data_item->destination_key,client,service_id)) if(!locked_getClientAndServiceId(it->second.tunnel_hash,it->second.data_item->destination_key,client,service_id))
std::cerr << " ERROR: cannot find client for cancelled message " << it->first << std::endl; std::cerr << " ERROR: cannot find client for cancelled message " << it->first << std::endl;
else else
notified_msgs[it->first] = std::make_pair(client,status) ; failed_msgs[it->first] = client;
}
delete it->second.data_item ; delete it->second.data_item ;
if(it->second.receipt_item != NULL) if(it->second.receipt_item != NULL)
@ -1212,11 +1212,10 @@ void p3GRouter::autoWash()
} }
// look into pending items. // look into pending items.
#warning move the notification for received messages in the handlign function of signed receipts. Keep the notification for unsent here. for(std::map<GRouterMsgPropagationId,GRouterClientService*>::const_iterator it(failed_msgs.begin());it!=failed_msgs.end();++it)
for(std::map<GRouterMsgPropagationId,std::pair<GRouterClientService*,uint32_t> >::const_iterator it(notified_msgs.begin());it!=notified_msgs.end();++it)
{ {
std::cerr << " notifying client for message id " << std::hex << it->first << " state = " << it->second.second << std::endl; std::cerr << " notifying client for message id " << std::hex << it->first << " state = FAILED" << std::endl;
it->second.first->notifyDataStatus(it->first, it->second.second) ; it->second->notifyDataStatus(it->first ,GROUTER_CLIENT_SERVICE_DATA_STATUS_FAILED) ;
} }
#warning should we also clean incoming data pipes? #warning should we also clean incoming data pipes?
@ -1331,6 +1330,10 @@ void p3GRouter::handleIncomingReceiptItem(RsGRouterSignedReceiptItem *receipt_it
// first check if we're a proxy or not. We also remove the message from the global router sending list. // first check if we're a proxy or not. We also remove the message from the global router sending list.
// in the proxy case, we should only store the receipt. // in the proxy case, we should only store the receipt.
GRouterClientService *client_service = NULL;
GRouterServiceId service_id ;
GRouterMsgPropagationId mid = 0 ;
{ {
RS_STACK_MUTEX (grMtx) ; RS_STACK_MUTEX (grMtx) ;
@ -1362,14 +1365,40 @@ void p3GRouter::handleIncomingReceiptItem(RsGRouterSignedReceiptItem *receipt_it
std::cerr << " removing messsage from cache." << std::endl; std::cerr << " removing messsage from cache." << std::endl;
#endif #endif
if(it->second.routing_flags & GRouterRoutingInfo::ROUTING_FLAGS_IS_ORIGIN)
{
#ifdef GROUTER_DEBUG
std::cerr << " message is at origin. Setting message transmission to DONE" << std::endl;
#endif
it->second.data_status = RS_GROUTER_DATA_STATUS_DONE;
}
else
{
#ifdef GROUTER_DEBUG
std::cerr << " message is not at origin. Setting message transmission to RECEIPT_OK" << std::endl;
#endif
it->second.data_status = RS_GROUTER_DATA_STATUS_RECEIPT_OK; it->second.data_status = RS_GROUTER_DATA_STATUS_RECEIPT_OK;
it->second.receipt_item = receipt_item->duplicate() ; it->second.receipt_item = receipt_item->duplicate() ;
}
if(locked_getClientAndServiceId(it->second.tunnel_hash,it->second.data_item->destination_key,client_service,service_id))
mid = it->first ;
else
{
mid = 0 ;
std::cerr << " ERROR: cannot retrieve service ID for message " << std::hex << it->first << std::dec << std::endl;
}
changed = true ; changed = true ;
} }
if(mid != 0)
{
#ifdef GROUTER_DEBUG #ifdef GROUTER_DEBUG
std::cerr << " notifying client that the msg was received." << std::endl; std::cerr << " notifying client " << (void*)client_service << " that msg " << std::hex << mid << std::dec << " was received." << std::endl;
#endif #endif
client_service->notifyDataStatus(mid, GROUTER_CLIENT_SERVICE_DATA_STATUS_RECEIVED) ;
}
if(changed) if(changed)
IndicateConfigChanged() ; IndicateConfigChanged() ;
@ -1436,7 +1465,7 @@ void p3GRouter::handleIncomingDataItem(RsGRouterGenericDataItem *data_item)
std::cerr << " storing incoming route: from " << data_item->PeerId() << std::endl; std::cerr << " storing incoming route: from " << data_item->PeerId() << std::endl;
#warning we should make sure there's no duplicates. Possibly turn RsTlvIdSet.ids into a std::set! #warning we should make sure there is no duplicates. Possibly turn RsTlvIdSet.ids into a std::set!
if(!mTurtle->isTurtlePeer(data_item->PeerId())) if(!mTurtle->isTurtlePeer(data_item->PeerId()))
_pending_messages[data_item->routing_id].incoming_routes.ids.push_back(data_item->PeerId()) ; _pending_messages[data_item->routing_id].incoming_routes.ids.push_back(data_item->PeerId()) ;
@ -1868,44 +1897,47 @@ void p3GRouter::makeGxsIdAndClientId(const TurtleFileHash& sum,RsGxsId& gxs_id,G
} }
bool p3GRouter::loadList(std::list<RsItem*>& items) bool p3GRouter::loadList(std::list<RsItem*>& items)
{ {
{
RS_STACK_MUTEX(grMtx) ; RS_STACK_MUTEX(grMtx) ;
#ifdef GROUTER_DEBUG #ifdef GROUTER_DEBUG
grouter_debug() << "p3GRouter::loadList() : " << std::endl; grouter_debug() << "p3GRouter::loadList() : " << std::endl;
#endif #endif
_routing_matrix.loadList(items) ; _routing_matrix.loadList(items) ;
#ifdef GROUTER_DEBUG #ifdef GROUTER_DEBUG
// remove all existing objects. // remove all existing objects.
// //
grouter_debug() << " removing all existing items (" << _pending_messages.size() << " items to delete)." << std::endl; grouter_debug() << " removing all existing items (" << _pending_messages.size() << " items to delete)." << std::endl;
#endif #endif
if(!_pending_messages.empty()) if(!_pending_messages.empty())
std::cerr << " WARNING: pending msg list is not empty. List will be cleared." << std::endl; std::cerr << " WARNING: pending msg list is not empty. List will be cleared." << std::endl;
// clear the existing list. // clear the existing list.
// //
for(std::map<GRouterMsgPropagationId,GRouterRoutingInfo>::iterator it(_pending_messages.begin());it!=_pending_messages.end();++it) for(std::map<GRouterMsgPropagationId,GRouterRoutingInfo>::iterator it(_pending_messages.begin());it!=_pending_messages.end();++it)
delete it->second.data_item ; delete it->second.data_item ;
_pending_messages.clear() ; _pending_messages.clear() ;
for(std::list<RsItem*>::const_iterator it(items.begin());it!=items.end();++it) for(std::list<RsItem*>::const_iterator it(items.begin());it!=items.end();++it)
{
RsGRouterRoutingInfoItem *itm1 = NULL ;
if(NULL != (itm1 = dynamic_cast<RsGRouterRoutingInfoItem*>(*it)))
{ {
_pending_messages[itm1->data_item->routing_id] = *itm1 ; RsGRouterRoutingInfoItem *itm1 = NULL ;
//_pending_messages[itm1->data_item->routing_id].data_item = itm1->data_item ; // avoids duplication.
itm1->data_item = NULL ; // prevents deletion. if(NULL != (itm1 = dynamic_cast<RsGRouterRoutingInfoItem*>(*it)))
} {
_pending_messages[itm1->data_item->routing_id] = *itm1 ;
//_pending_messages[itm1->data_item->routing_id].data_item = itm1->data_item ; // avoids duplication.
delete *it ; itm1->data_item = NULL ; // prevents deletion.
}
delete *it ;
}
} }
debugDump();
return true ; return true ;
} }
bool p3GRouter::saveList(bool& cleanup,std::list<RsItem*>& items) bool p3GRouter::saveList(bool& cleanup,std::list<RsItem*>& items)
@ -2025,7 +2057,7 @@ void p3GRouter::debugDump()
grouter_debug() << " Data items: " << std::endl; grouter_debug() << " Data items: " << std::endl;
static const std::string statusString[5] = { "Unkn","Pend","Sent","Ackn","Dead" }; static const std::string statusString[6] = { "Unkn","Pend","Sent","Receipt OK","Ongoing","Done" };
for(std::map<GRouterMsgPropagationId, GRouterRoutingInfo>::iterator it(_pending_messages.begin());it!=_pending_messages.end();++it) for(std::map<GRouterMsgPropagationId, GRouterRoutingInfo>::iterator it(_pending_messages.begin());it!=_pending_messages.end();++it)
{ {