fixed but in re-sending of failed grouter items

This commit is contained in:
csoler 2016-02-01 00:33:11 -05:00
parent aa194caea3
commit 6d1a3937d6
5 changed files with 41 additions and 16 deletions

View File

@ -61,7 +61,7 @@ public:
// This method is called by the global router when a message has been received, or cannot be sent, etc.
//
virtual void notifyDataStatus(const GRouterMsgPropagationId& received_id,uint32_t data_status)
virtual void notifyDataStatus(const GRouterMsgPropagationId& received_id,const RsGxsId& signer_id,uint32_t data_status)
{
std::cerr << "!!!!!! Received Data status from global router, but the client service is not handling it !!!!!!!!!!" << std::endl ;
std::cerr << " message ID = " << received_id << std::endl;

View File

@ -51,6 +51,7 @@ static const uint32_t MAX_TUNNEL_WAIT_TIME = 60 ; // wa
static const uint32_t MAX_TUNNEL_UNMANAGED_TIME = 600 ; // min time before retry tunnels for that msg.
static const uint32_t MAX_DELAY_FOR_RESEND = 2*86400+300 ; // re-send if held for more than 2 days (cache store period) plus security delay.
static const uint32_t MAX_DESTINATION_KEEP_TIME = 20*86400 ; // keep for 20 days in destination cache to avoid re-
static const uint32_t MAX_RECEIPT_KEEP_TIME = 20*86400 ; // keep for 20 days in destination cache to avoid re-
static const uint32_t TUNNEL_OK_WAIT_TIME = 2 ; // wait for 2 seconds after last tunnel ok, so that we have a complete set of tunnels.
static const uint32_t MAX_GROUTER_DATA_SIZE = 2*1024*1024 ; // 2MB size limit. This is of course arbitrary.
static const uint32_t MAX_TRANSACTION_ACK_WAITING_TIME = 60 ; // wait for at most 60 secs for a ACK. If not restart the transaction.

View File

@ -1286,18 +1286,30 @@ void p3GRouter::autoWash()
bool items_deleted = false ;
time_t now = time(NULL) ;
std::map<GRouterMsgPropagationId,GRouterClientService *> failed_msgs ;
std::map<GRouterMsgPropagationId,std::pair<GRouterClientService *,RsGxsId> > failed_msgs ;
{
RS_STACK_MUTEX(grMtx) ;
for(std::map<GRouterMsgPropagationId, GRouterRoutingInfo>::iterator it=_pending_messages.begin();it!=_pending_messages.end();)
if( (it->second.data_status == RS_GROUTER_DATA_STATUS_DONE && (!(it->second.routing_flags & GRouterRoutingInfo::ROUTING_FLAGS_IS_DESTINATION)
|| it->second.received_time_TS + MAX_DESTINATION_KEEP_TIME < now))
|| ((it->second.received_time_TS + GROUTER_ITEM_MAX_CACHE_KEEP_TIME < now)
&& !(it->second.routing_flags & GRouterRoutingInfo::ROUTING_FLAGS_IS_ORIGIN)
&& !(it->second.routing_flags & GRouterRoutingInfo::ROUTING_FLAGS_IS_DESTINATION)
)) // is the item too old for cache
{
bool delete_entry = false ;
if(it->second.data_status == RS_GROUTER_DATA_STATUS_DONE )
{
if(!(it->second.routing_flags & GRouterRoutingInfo::ROUTING_FLAGS_IS_DESTINATION) || it->second.received_time_TS + MAX_DESTINATION_KEEP_TIME < now) // is the item too old for cache
delete_entry = true ;
}
else if(it->second.data_status == RS_GROUTER_DATA_STATUS_RECEIPT_OK )
{
if(it->second.received_time_TS + MAX_RECEIPT_KEEP_TIME < now) // is the item too old for cache
delete_entry = true ;
}
else
if(it->second.received_time_TS + GROUTER_ITEM_MAX_CACHE_KEEP_TIME < now)
delete_entry = true ;
if(delete_entry)
{
#ifdef GROUTER_DEBUG
grouter_debug() << " Removing cached item " << std::hex << it->first << std::dec << std::endl;
@ -1310,7 +1322,7 @@ void p3GRouter::autoWash()
GRouterClientService *client = NULL;
if(locked_getLocallyRegisteredClientFromServiceId(it->second.client_id,client))
failed_msgs[it->first] = client ;
failed_msgs[it->first] = std::make_pair(client,it->second.data_item->signature.keyId) ;
else
std::cerr << " ERROR: client id " << it->second.client_id << " not registered. Consistency error." << std::endl;
}
@ -1329,6 +1341,7 @@ void p3GRouter::autoWash()
}
else
++it ;
}
// also check all existing tunnels
@ -1368,12 +1381,12 @@ void p3GRouter::autoWash()
}
// Look into pending items.
for(std::map<GRouterMsgPropagationId,GRouterClientService*>::const_iterator it(failed_msgs.begin());it!=failed_msgs.end();++it)
for(std::map<GRouterMsgPropagationId,std::pair<GRouterClientService*,RsGxsId> >::const_iterator it(failed_msgs.begin());it!=failed_msgs.end();++it)
{
#ifdef GROUTER_DEBUG
std::cerr << " notifying client for message id " << std::hex << it->first << " state = FAILED" << std::endl;
#endif
it->second->notifyDataStatus(it->first ,GROUTER_CLIENT_SERVICE_DATA_STATUS_FAILED) ;
it->second.first->notifyDataStatus(it->first,it->second.second ,GROUTER_CLIENT_SERVICE_DATA_STATUS_FAILED) ;
}
if(items_deleted)
@ -1481,6 +1494,7 @@ void p3GRouter::handleIncomingReceiptItem(RsGRouterSignedReceiptItem *receipt_it
std::cerr << "Item content:" << std::endl;
receipt_item->print(std::cerr,2) ;
#endif
RsGxsId signer_id ;
// Because we don't do proxy-transmission yet, the client needs to be notified. Otherwise, we will need to
// first check if we're a proxy or not. We also remove the message from the global router sending list.
@ -1498,6 +1512,7 @@ void p3GRouter::handleIncomingReceiptItem(RsGRouterSignedReceiptItem *receipt_it
std::cerr << " ERROR: no routing ID corresponds to this message. Inconsistency!" << std::endl;
return ;
}
signer_id = it->second.data_item->signature.keyId ;
// check hash.
@ -1554,7 +1569,7 @@ void p3GRouter::handleIncomingReceiptItem(RsGRouterSignedReceiptItem *receipt_it
#ifdef GROUTER_DEBUG
std::cerr << " notifying client " << (void*)client_service << " that msg " << std::hex << mid << std::dec << " was received." << std::endl;
#endif
client_service->notifyDataStatus(mid, GROUTER_CLIENT_SERVICE_DATA_STATUS_RECEIVED) ;
client_service->notifyDataStatus(mid, signer_id, GROUTER_CLIENT_SERVICE_DATA_STATUS_RECEIVED) ;
}
// also note the incoming route in the routing matrix

View File

@ -1852,7 +1852,7 @@ void p3MsgService::manageDistantPeers()
}
}
void p3MsgService::notifyDataStatus(const GRouterMsgPropagationId& id,uint32_t data_status)
void p3MsgService::notifyDataStatus(const GRouterMsgPropagationId& id, const RsGxsId &signer_id, uint32_t data_status)
{
if(data_status == GROUTER_CLIENT_SERVICE_DATA_STATUS_FAILED)
{
@ -1868,6 +1868,7 @@ void p3MsgService::notifyDataStatus(const GRouterMsgPropagationId& id,uint32_t d
}
uint32_t msg_id = it->second ;
std::cerr << " message id = " << msg_id << std::endl;
mDistantOutgoingMsgSigners[msg_id] = signer_id ; // this is needed because it's not saved in config, but we should probably include it in _ongoing_messages
std::map<uint32_t,RsMsgItem*>::iterator mit = msgOutgoing.find(msg_id) ;
@ -1991,7 +1992,15 @@ void p3MsgService::sendDistantMsgItem(RsMsgItem *msgitem)
{
RS_STACK_MUTEX(mMsgMtx) ;
signing_key_id = mDistantOutgoingMsgSigners[msgitem->msgId] ;
std::map<uint32_t,RsGxsId>::const_iterator it = mDistantOutgoingMsgSigners.find(msgitem->msgId) ;
if(it == mDistantOutgoingMsgSigners.end())
{
std::cerr << "(EE) no signer registered for distant message " << msgitem->msgId << ". Cannot send!" << std::endl;
return ;
}
signing_key_id = it->second ;
if(signing_key_id.isNull())
{

View File

@ -142,7 +142,7 @@ private:
virtual bool acceptDataFromPeer(const RsGxsId& gxs_id) ;
virtual void receiveGRouterData(const RsGxsId& destination_key,const RsGxsId& signing_key, GRouterServiceId &client_id, uint8_t *data, uint32_t data_size) ;
virtual void notifyDataStatus(const GRouterMsgPropagationId& msg_id,uint32_t data_status) ;
virtual void notifyDataStatus(const GRouterMsgPropagationId& msg_id,const RsGxsId& signer_id,uint32_t data_status) ;
// Utility functions