- added client notification to grouter

- now distant messages stay in outgoing box until notified to be received.
- fixed serialisation bug



git-svn-id: http://svn.code.sf.net/p/retroshare/code/trunk@7293 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
csoler 2014-04-21 12:39:30 +00:00
parent a3ddc0ac7d
commit 4d2907efed
6 changed files with 36 additions and 49 deletions

View File

@ -259,6 +259,7 @@ RsGRouterRoutingInfoItem *RsGRouterSerialiser::deserialise_RsGRouterRoutingInfoI
ok &= getRawUInt32(data, pktsize, &offset, &item->status_flags);
ok &= item->origin.deserialise(data, pktsize, offset) ;
ok &= getRawTimeT(data, pktsize, &offset, item->received_time);
ok &= getRawUInt32(data, pktsize, &offset, &item->client_id);
uint32_t s = 0 ;
ok &= getRawUInt32(data, pktsize, &offset, &s) ;
@ -276,6 +277,10 @@ RsGRouterRoutingInfoItem *RsGRouterSerialiser::deserialise_RsGRouterRoutingInfoI
}
item->data_item = deserialise_RsGRouterGenericDataItem(&((uint8_t*)data)[offset],pktsize - offset) ;
if(item->data_item != NULL)
offset += item->data_item->serial_size() ;
else
ok = false ;
if (offset != rssize || !ok)
{
@ -453,8 +458,9 @@ uint32_t RsGRouterRoutingInfoItem::serial_size() const
s += origin.serial_size() ; // origin
s += 8 ; // received_time
s += 4 ; // tried_friends.size() ;
s += sizeof(GRouterServiceId) ; // service_id
s += tried_friends.size() * ( RsPeerId::SIZE_IN_BYTES + 8 + 4 + 4 ) ; // FriendTrialRecord
s += data_item->serial_size(); // data_item
s += data_item->serial_size(); // data_item
return s ;
}
@ -538,6 +544,7 @@ bool RsGRouterRoutingInfoItem::serialise(void *data,uint32_t& size) const
ok &= setRawUInt32(data, tlvsize, &offset, status_flags) ;
ok &= origin.serialise(data, tlvsize, offset) ;
ok &= setRawTimeT(data, tlvsize, &offset, received_time) ;
ok &= setRawUInt32(data, tlvsize, &offset, client_id) ;
ok &= setRawUInt32(data, tlvsize, &offset, tried_friends.size()) ;
for(std::list<FriendTrialRecord>::const_iterator it(tried_friends.begin());it!=tried_friends.end();++it)
@ -602,6 +609,7 @@ std::ostream& RsGRouterRoutingInfoItem::print(std::ostream& o, uint16_t)
o << " flags: "<< std::hex << status_flags << std::dec << std::endl ;
o << " Key: "<< data_item->destination_key.toStdString() << std::endl ;
o << " Data size: "<< data_item->data_size << std::endl ;
o << " Client id: "<< client_id << std::endl ;
o << " Tried friends: "<< tried_friends.size() << std::endl;
return o ;

View File

@ -38,7 +38,7 @@ const uint8_t RS_PKT_SUBTYPE_GROUTER_DATA = 0x05 ; // used to send da
const uint8_t RS_PKT_SUBTYPE_GROUTER_MATRIX_CLUES = 0x80 ; // item to save matrix clues
const uint8_t RS_PKT_SUBTYPE_GROUTER_FRIENDS_LIST = 0x82 ; // item to save friend lists
const uint8_t RS_PKT_SUBTYPE_GROUTER_ROUTING_INFO = 0x85 ; // item to save routing info
const uint8_t RS_PKT_SUBTYPE_GROUTER_ROUTING_INFO = 0x86 ; // item to save routing info
const uint8_t QOS_PRIORITY_RS_GROUTER_PUBLISH_KEY = 3 ; // slow items. No need to congest the network with this.
const uint8_t QOS_PRIORITY_RS_GROUTER_ACK = 3 ;

View File

@ -85,6 +85,7 @@ class GRouterRoutingInfo
std::list<FriendTrialRecord> tried_friends ; // list of friends to which the item was sent ordered with time.
GRouterKeyId destination_key ; // ultimate destination for this key
GRouterServiceId client_id ; // service ID of the client. Only valid when origin==OwnId
RsGRouterGenericDataItem *data_item ;
};

View File

@ -208,6 +208,7 @@ p3GRouter::p3GRouter(p3ServiceControl *sc,p3LinkMgr *lm)
_last_debug_output_time = 0 ;
_last_config_changed = 0 ;
_last_matrix_update_time = 0 ;
_debug_enabled = true ;
_random_salt = RSRandom::random_u64() ;
@ -316,7 +317,7 @@ void p3GRouter::autoWash()
_pending_messages.erase(it) ;
it = tmp ;
}
else if(it->second.data_item != NULL && it->second.status_flags == RS_GROUTER_ROUTING_STATE_SENT && computeNextTimeDelay(it->second.last_sent - it->second.received_time) + it->second.last_sent < now)
else if(it->second.status_flags == RS_GROUTER_ROUTING_STATE_SENT && computeNextTimeDelay(it->second.last_sent - it->second.received_time) + it->second.last_sent < now)
{
it->second.status_flags = RS_GROUTER_ROUTING_STATE_PEND ;
#ifdef GROUTER_DEBUG
@ -388,8 +389,6 @@ void p3GRouter::routePendingObjects()
{
sendACK(it->second.origin,it->first,RS_GROUTER_ACK_STATE_GVNP) ;
it->second.status_flags = RS_GROUTER_ROUTING_STATE_DEAD ;
delete it->second.data_item ;
it->second.data_item = NULL ;
++it ;
continue ;
}
@ -635,25 +634,18 @@ void p3GRouter::handleIncoming()
}
}
void p3GRouter::locked_notifyClientAcknowledged(const GRouterKeyId& key,const GRouterMsgPropagationId& msg_id) const
void p3GRouter::locked_notifyClientAcknowledged(const GRouterMsgPropagationId& msg_id,const GRouterServiceId& service_id) const
{
#ifdef GROUTER_DEBUG
grouter_debug() << " Key is owned by us. Notifying service that item was ACKed." << std::endl;
grouter_debug() << " Key is owned by us. Notifying service that item was ACKed. msg_id=" << msg_id << ", service_id = " << service_id << "." << std::endl;
#endif
// notify the client
//
std::map<GRouterKeyId, GRouterPublishedKeyInfo>::const_iterator it = _owned_key_ids.find(key) ;
if(it == _owned_key_ids.end())
{
std::cerr << "(EE) key " << key << " is not owned by us. That is a weird situation. Probably a bug!" << std::endl;
return ;
}
std::map<GRouterServiceId,GRouterClientService*>::const_iterator its = _registered_services.find(it->second.service_id) ;
std::map<GRouterServiceId,GRouterClientService*>::const_iterator its = _registered_services.find(service_id) ;
if(its == _registered_services.end())
{
std::cerr << "(EE) key " << key << " is attached to service " << it->second.service_id << ", which is unknown!! That is a bug." << std::endl;
std::cerr << "(EE) message " << msg_id << " is attached to service " << service_id << ", which is unknown!! That is a bug." << std::endl;
return ;
}
its->second->acknowledgeDataReceived(msg_id) ;
@ -720,7 +712,6 @@ void p3GRouter::handleRecvACKItem(RsGRouterACKItem *item)
uint32_t forward_state = RS_GROUTER_ACK_STATE_UNKN ;
bool update_routing_matrix = false ;
bool should_remove = false ;
bool delete_data = false ;
time_t now = time(NULL) ;
@ -729,7 +720,7 @@ void p3GRouter::handleRecvACKItem(RsGRouterACKItem *item)
case RS_GROUTER_ACK_STATE_RCVD:
if(it->second.origin == mLinkMgr->getOwnId())
{
locked_notifyClientAcknowledged(it->second.destination_key,it->first) ;
locked_notifyClientAcknowledged(it->first,it->second.client_id) ;
should_remove = true ;
} // no break afterwards. That is on purpose!
@ -746,7 +737,6 @@ void p3GRouter::handleRecvACKItem(RsGRouterACKItem *item)
next_state = RS_GROUTER_ROUTING_STATE_ARVD ;
update_routing_matrix = true ;
delete_data = true ;
break ;
@ -803,7 +793,12 @@ void p3GRouter::handleRecvACKItem(RsGRouterACKItem *item)
#endif
// If no route was found, delete item, but keep the cache entry for a while in order to avoid bouncing.
//
if(it->second.status_flags != RS_GROUTER_ROUTING_STATE_ARVD && next_state != RS_GROUTER_ROUTING_STATE_ARVD)
if(it->second.origin == mLinkMgr->getOwnId())
{
next_state = RS_GROUTER_ROUTING_STATE_SENT ; // Keep it that way until the item gets sent again (turned into PEND)
forward_state = RS_GROUTER_ACK_STATE_UNKN ;
}
else if(it->second.status_flags != RS_GROUTER_ROUTING_STATE_ARVD && next_state != RS_GROUTER_ROUTING_STATE_ARVD)
{
next_state = RS_GROUTER_ROUTING_STATE_DEAD ;
forward_state = RS_GROUTER_ACK_STATE_GVNP ;
@ -828,11 +823,6 @@ void p3GRouter::handleRecvACKItem(RsGRouterACKItem *item)
}
it->second.status_flags = next_state ;
if(delete_data)
{
delete it->second.data_item ;
it->second.data_item = NULL ;
}
if(should_remove)
{
#ifdef GROUTER_DEBUG
@ -922,6 +912,7 @@ void p3GRouter::handleRecvDataItem(RsGRouterGenericDataItem *item)
info.last_sent = 0 ;
info.destination_key = item->destination_key ;
info.status_flags = RS_GROUTER_ROUTING_STATE_PEND ;
info.client_id = 0 ;
_pending_messages[item->routing_id] = info ;
itr = _pending_messages.find(item->routing_id) ;
@ -982,7 +973,7 @@ bool p3GRouter::registerClientService(const GRouterServiceId& id,GRouterClientSe
return true ;
}
void p3GRouter::sendData(const GRouterKeyId& destination, RsGRouterGenericDataItem *item,GRouterMsgPropagationId& propagation_id)
void p3GRouter::sendData(const GRouterKeyId& destination,const GRouterServiceId& client_id, RsGRouterGenericDataItem *item,GRouterMsgPropagationId& propagation_id)
{
RsStackMutex mtx(grMtx) ;
// push the item into pending messages.
@ -998,6 +989,7 @@ void p3GRouter::sendData(const GRouterKeyId& destination, RsGRouterGenericDataIt
info.last_sent = 0 ;
info.received_time = now ;
info.destination_key = destination ;
info.client_id = client_id ;
// Make sure we have a unique id (at least locally).
//
@ -1015,6 +1007,7 @@ void p3GRouter::sendData(const GRouterKeyId& destination, RsGRouterGenericDataIt
grouter_debug() << " distance = " << info.data_item->randomized_distance << std::endl;
grouter_debug() << " origin = " << info.origin.toStdString() << std::endl;
grouter_debug() << " Recv time = " << info.received_time << std::endl;
grouter_debug() << " Client id = " << info.client_id << std::endl;
#endif
_pending_messages[propagation_id] = info ;
@ -1047,6 +1040,8 @@ bool p3GRouter::loadList(std::list<RsItem*>& items)
grouter_debug() << " removing all existing items (" << _pending_messages.size() << " items to delete)." << std::endl;
#endif
// clear the existing list.
//
for(std::map<GRouterMsgPropagationId,GRouterRoutingInfo>::iterator it(_pending_messages.begin());it!=_pending_messages.end();++it)
delete it->second.data_item ;
_pending_messages.clear() ;
@ -1058,7 +1053,7 @@ bool p3GRouter::loadList(std::list<RsItem*>& items)
if(NULL != (itm1 = dynamic_cast<RsGRouterRoutingInfoItem*>(*it)))
{
_pending_messages[itm1->data_item->routing_id] = *itm1 ;
_pending_messages[itm1->data_item->routing_id].data_item = itm1->data_item ; // avoids duplication.
//_pending_messages[itm1->data_item->routing_id].data_item = itm1->data_item ; // avoids duplication.
itm1->data_item = NULL ; // prevents deletion.
}
@ -1093,7 +1088,6 @@ bool p3GRouter::saveList(bool& cleanup,std::list<RsItem*>& items)
*(GRouterRoutingInfo*)item = it->second ; // copy all members
item->data_item = it->second.data_item->duplicate() ; // deep copy, because we call delete on the object, and the item might be removed before we handle it in the client.
items.push_back(item) ;
}
@ -1143,7 +1137,7 @@ bool p3GRouter::getRoutingCacheInfo(std::vector<GRouterRoutingCacheInfo>& infos)
cinfo.destination = it->second.destination_key ;
cinfo.time_stamp = it->second.received_time ;
cinfo.status = it->second.status_flags ;
cinfo.data_size = (it->second.data_item==NULL)?0:(it->second.data_item->data_size) ;
cinfo.data_size = it->second.data_item->data_size ;
}
return true ;
}

View File

@ -87,8 +87,9 @@ class p3GRouter: public RsGRouter, public p3Service, public p3Config
// Sends an item to the given destination. The router takes ownership of
// the memory. That means item_data will be erase on return. The returned id should be
// remembered by the client, so that he knows when the data has been received.
// The client id is supplied so that the client can be notified when the data has been received.
//
void sendData(const GRouterKeyId& destination, RsGRouterGenericDataItem *item,GRouterMsgPropagationId& id) ;
void sendData(const GRouterKeyId& destination,const GRouterServiceId& client_id, RsGRouterGenericDataItem *item,GRouterMsgPropagationId& id) ;
// Sends an ACK to the origin of the msg. This is used to notify for
// unfound route, or message correctly received, depending on the particular situation.
@ -171,7 +172,7 @@ class p3GRouter: public RsGRouter, public p3Service, public p3Config
static float computeMatrixContribution(float base,uint32_t time_shift,float probability) ;
static time_t computeNextTimeDelay(time_t duration) ;
void locked_notifyClientAcknowledged(const GRouterKeyId& key,const GRouterMsgPropagationId& msg_id) const ;
void locked_notifyClientAcknowledged(const GRouterMsgPropagationId& msg_id,const GRouterServiceId& service_id) const ;
uint32_t computeRandomDistanceIncrement(const RsPeerId& pid,const GRouterKeyId& destination_id) ;
@ -215,23 +216,6 @@ class p3GRouter: public RsGRouter, public p3Service, public p3Config
//
std::map<GRouterKeyId, GRouterPublishedKeyInfo> _owned_key_ids ;
#ifdef TO_BE_REMOVED
// Key publish cache and buffers
// Handles key publish items routes and forwarding info.
//
// 1 - timestamps of diffused keys received stored by diffusion id.
std::map<GRouterKeyPropagationId,time_t> _key_diffusion_time_stamps ;
// 2 - list of key diffusion items to be routed. These are stored in a priority structure
// where the priority is based on key distance, so that:
// - long distance keys get propagated less easily
// - when the list exceeds the maximum allowed size, items with the largest distance get dropped.
//
std::priority_queue<RsGRouterPublishKeyItem *> _key_diffusion_items ;
void handleRecvPublishKeyItem(RsGRouterPublishKeyItem *item) ;
#endif
// Registered services. These are known to the different peers with a common id,
// so it's important to keep consistency here. This map is volatile, and re-created at each startup of
// the software, when newly created services register themselves.

View File

@ -85,7 +85,7 @@ class RsGRouter
// Communication to other services. //
//===================================================//
virtual void sendData(const GRouterKeyId& destination, RsGRouterGenericDataItem *item,GRouterMsgPropagationId& id) =0;
virtual void sendData(const GRouterKeyId& destination, const GRouterServiceId& client_id, RsGRouterGenericDataItem *item,GRouterMsgPropagationId& id) =0;
virtual bool registerKey(const GRouterKeyId& key,const GRouterServiceId& client_id,const std::string& description_string) =0;
};