merged with upstream

This commit is contained in:
csoler 2016-02-10 00:00:21 -05:00
commit 3db3ccf636
185 changed files with 61045 additions and 37839 deletions

View file

@ -299,10 +299,10 @@ bool p3BitDht::loadList(std::list<RsItem *>& load)
return false;
}
std::cerr << "BITDHT Load Item:";
std::cerr << std::endl;
//std::cerr << "BITDHT Load Item:";
//std::cerr << std::endl;
config->print(std::cerr, 0);
//config->print(std::cerr, 0);
std::list<std::string> servers;
int peers[RSDHT_RELAY_NUM_CLASS] = {0};
@ -320,16 +320,16 @@ bool p3BitDht::loadList(std::list<RsItem *>& load)
{
/* add to RELAY_SERVER List */
servers.push_back(value);
std::cerr << "p3BitDht::loadList() Found Server: " << value;
std::cerr << std::endl;
//std::cerr << "p3BitDht::loadList() Found Server: " << value;
//std::cerr << std::endl;
}
else if (0 == strncmp(key.c_str(), "RELAY_MODE", 10))
{
mode = atoi(value.c_str());
haveMode = true;
std::cerr << "p3BitDht::loadList() Found Mode: " << mode;
std::cerr << std::endl;
//std::cerr << "p3BitDht::loadList() Found Mode: " << mode;
//std::cerr << std::endl;
}
else if (0 == strncmp(key.c_str(), "RELAY_CLASS", 11))
{
@ -358,24 +358,24 @@ bool p3BitDht::loadList(std::list<RsItem *>& load)
if (key[13] == 'C')
{
std::cerr << "p3BitDht::loadList() Found Count(" << idx << "): ";
std::cerr << val;
std::cerr << std::endl;
//std::cerr << "p3BitDht::loadList() Found Count(" << idx << "): ";
//std::cerr << val;
//std::cerr << std::endl;
peers[idx] = val;
}
else
{
std::cerr << "p3BitDht::loadList() Found Bandwidth(" << idx << "): ";
std::cerr << val;
std::cerr << std::endl;
//std::cerr << "p3BitDht::loadList() Found Bandwidth(" << idx << "): ";
//std::cerr << val;
//std::cerr << std::endl;
bandwidth[idx] = val;
}
}
else
{
std::cerr << "p3BitDht::loadList() Unknown Key:value: " << key;
std::cerr << ":" << value;
std::cerr << std::endl;
//std::cerr << "p3BitDht::loadList() Unknown Key:value: " << key;
//std::cerr << ":" << value;
//std::cerr << std::endl;
}
}

View file

@ -61,7 +61,7 @@ public:
// This method is called by the global router when a message has been received, or cannot be sent, etc.
//
virtual void notifyDataStatus(const GRouterMsgPropagationId& received_id,uint32_t data_status)
virtual void notifyDataStatus(const GRouterMsgPropagationId& received_id,const RsGxsId& signer_id,uint32_t data_status)
{
std::cerr << "!!!!!! Received Data status from global router, but the client service is not handling it !!!!!!!!!!" << std::endl ;
std::cerr << " message ID = " << received_id << std::endl;

View file

@ -142,25 +142,44 @@ RsGRouterGenericDataItem *RsGRouterSerialiser::deserialise_RsGRouterGenericDataI
ok &= getRawUInt32(data, pktsize, &offset, &item->service_id);
ok &= getRawUInt32(data, pktsize, &offset, &item->data_size);
if(item->data_size > rssize || offset > rssize - item->data_size) // better than if(item->data_size + offset > rssize)
if(item->data_size > 0) // This happens when the item data has been deleted from the cache
{
std::cerr << __PRETTY_FUNCTION__ << ": Cannot read beyond item size. Serialisation error!" << std::endl;
delete item;
return NULL ;
}
if(item->data_size > rssize || offset > rssize - item->data_size) // better than if(item->data_size + offset > rssize)
{
std::cerr << __PRETTY_FUNCTION__ << ": Cannot read beyond item size. Serialisation error!" << std::endl;
delete item;
return NULL ;
}
if( NULL == (item->data_bytes = (uint8_t*)rs_malloc(item->data_size)))
{
delete item;
return NULL ;
}
if( NULL == (item->data_bytes = (uint8_t*)rs_malloc(item->data_size)))
{
delete item;
return NULL ;
}
memcpy(item->data_bytes,&((uint8_t*)data)[offset],item->data_size) ;
offset += item->data_size ;
memcpy(item->data_bytes,&((uint8_t*)data)[offset],item->data_size) ;
offset += item->data_size ;
}
else
item->data_bytes = NULL ;
ok &= item->signature.GetTlv(data, pktsize, &offset) ;
ok &= getRawUInt32(data, pktsize, &offset, &item->randomized_distance);
ok &= getRawUInt32(data, pktsize, &offset, &item->duplication_factor);
// make sure the duplication factor is not altered by friends. In the worst case, the item will duplicate a bit more.
if(item->duplication_factor < 1)
{
item->duplication_factor = 1 ;
std::cerr << "(II) correcting GRouter item duplication factor from 0 to 1, to ensure backward compat." << std::endl;
}
if(item->duplication_factor > GROUTER_MAX_DUPLICATION_FACTOR)
{
std::cerr << "(WW) correcting GRouter item duplication factor of " << item->duplication_factor << ". This is very unexpected." << std::endl;
item->duplication_factor = GROUTER_MAX_DUPLICATION_FACTOR ;
}
ok &= getRawUInt32(data, pktsize, &offset, &item->flags);
if (offset != rssize || !ok)
@ -382,7 +401,7 @@ uint32_t RsGRouterGenericDataItem::serial_size() const
s += 4 ; // service id
s += data_size ; // data
s += signature.TlvSize() ; // signature
s += 4 ; // randomized distance
s += 4 ; // duplication_factor
s += 4 ; // flags
return s ;
@ -483,7 +502,7 @@ bool RsGRouterGenericDataItem::serialise(void *data,uint32_t& size) const
ok &= signature.SetTlv(data, tlvsize, &offset) ;
ok &= setRawUInt32(data, tlvsize, &offset, randomized_distance) ;
ok &= setRawUInt32(data, tlvsize, &offset, duplication_factor) ;
ok &= setRawUInt32(data, tlvsize, &offset, flags) ;
if (offset != tlvsize)
@ -796,7 +815,7 @@ std::ostream& RsGRouterGenericDataItem::print(std::ostream& o, uint16_t)
o << " Data size: " << data_size << std::endl ;
o << " Data hash: " << RsDirUtil::sha1sum(data_bytes,data_size) << std::endl ;
o << " signature key: " << signature.keyId << std::endl;
o << " randomized dist:" << randomized_distance << std::endl;
o << " duplication fac:" << duplication_factor << std::endl;
o << " flags: " << flags << std::endl;
return o ;

View file

@ -133,7 +133,7 @@ class RsGRouterGenericDataItem: public RsGRouterAbstractMsgItem, public RsGRoute
uint32_t data_size ;
uint8_t *data_bytes;
uint32_t randomized_distance ; // number of hops (tunnel wise. Does not preclude of the real distance)
uint32_t duplication_factor ; // number of duplicates allowed. Should be capped at each de-serialise operation!
// utility methods for signing data
virtual uint32_t signed_data_size() const ;

View file

@ -203,7 +203,7 @@ void GRouterMatrix::debugDump() const
std::cerr << " " << it->first << ": from " << it->second.friend_id << " " << now - it->second.time_stamp << " secs ago." << std::endl;
}
bool GRouterMatrix::computeRoutingProbabilities(const GRouterKeyId& key_id, const std::vector<RsPeerId>& friends, std::vector<float>& probas) const
bool GRouterMatrix::computeRoutingProbabilities(const GRouterKeyId& key_id, const std::vector<RsPeerId>& friends, std::vector<float>& probas, float& maximum) const
{
// Routing probabilities are computed according to routing clues
//
@ -239,6 +239,7 @@ bool GRouterMatrix::computeRoutingProbabilities(const GRouterKeyId& key_id, cons
return false ;
}
const std::vector<float>& w(it2->second) ;
maximum = 0.0f ;
for(uint32_t i=0;i<friends.size();++i)
{
@ -250,6 +251,9 @@ bool GRouterMatrix::computeRoutingProbabilities(const GRouterKeyId& key_id, cons
{
probas[i] = w[findex] ;
total += w[findex] ;
if(maximum < w[findex])
maximum = w[findex] ;
}
}

View file

@ -57,7 +57,7 @@ class GRouterMatrix
// the computation accounts for the time at which the info was received and the
// weight of each routing hit record.
//
bool computeRoutingProbabilities(const GRouterKeyId& id, const std::vector<RsPeerId>& friends, std::vector<float>& probas) const ;
bool computeRoutingProbabilities(const GRouterKeyId& id, const std::vector<RsPeerId>& friends, std::vector<float>& probas, float &maximum) const ;
// Update routing probabilities for each key, accounting for all received events, but without
// activity information

View file

@ -44,16 +44,21 @@ static const uint32_t RS_GROUTER_MAX_KEEP_TRACKING_CLUES = 86400*10 ; // m
static const float RS_GROUTER_BASE_WEIGHT_ROUTED_MSG = 1.0f ; // base contribution of routed message clue to routing matrix
static const float RS_GROUTER_BASE_WEIGHT_GXS_PACKET = 0.1f ; // base contribution of GXS message to routing matrix
static const float RS_GROUTER_PROBABILITY_THRESHOLD_FOR_RANDOM_ROUTING = 0.01f ; // routing probability under which the routage is performed randomly
static const float RS_GROUTER_PROBABILITY_THRESHOLD_BEST_PEERS_SELECT = 0.5f ; // min ratio of forward proba with respect to best peer.
static const uint32_t MAX_TUNNEL_WAIT_TIME = 60 ; // wait for 60 seconds at most for a tunnel response.
static const uint32_t MAX_TUNNEL_UNMANAGED_TIME = 600 ; // min time before retry tunnels for that msg.
static const uint32_t MAX_DELAY_FOR_RESEND = 2*86400+300 ; // re-send if held for more than 2 days (cache store period) plus security delay.
static const uint32_t MAX_DESTINATION_KEEP_TIME = 20*86400 ; // keep for 20 days in destination cache to avoid re-
static const uint32_t MAX_RECEIPT_KEEP_TIME = 20*86400 ; // keep for 20 days in destination cache to avoid re-
static const uint32_t TUNNEL_OK_WAIT_TIME = 2 ; // wait for 2 seconds after last tunnel ok, so that we have a complete set of tunnels.
static const uint32_t MAX_GROUTER_DATA_SIZE = 2*1024*1024 ; // 2MB size limit. This is of course arbitrary.
static const uint32_t MAX_TRANSACTION_ACK_WAITING_TIME = 60 ; // wait for at most 60 secs for a ACK. If not restart the transaction.
static const uint32_t DIRECT_FRIEND_TRY_DELAY = 20 ; // wait for 20 secs if no friends available, then try tunnels.
static const uint32_t MAX_INACTIVE_DATA_PIPE_DELAY = 300 ; // clean inactive data pipes for more than 5 mins
static const uint32_t GROUTER_MAX_DUPLICATION_FACTOR = 10 ; // max number of duplicates for a given message to keep in the network
static const uint32_t GROUTER_MAX_BRANCHING_FACTOR = 3 ; // max number of branches, for locally forwarding items
static const time_t RS_GROUTER_DEBUG_OUTPUT_PERIOD = 10 ; // Output everything
static const time_t RS_GROUTER_AUTOWASH_PERIOD = 10 ; // Autowash every minute. Not a costly operation.

View file

@ -190,6 +190,8 @@
#include "services/p3idservice.h"
#include "turtle/p3turtle.h"
#include "gxs/rsgixs.h"
#include "retroshare/rspeers.h"
#include "retroshare/rsreputations.h"
#include "p3grouter.h"
#include "grouteritems.h"
@ -855,6 +857,34 @@ void p3GRouter::handleTunnels()
}
}
void p3GRouter::locked_sendToPeers(RsGRouterGenericDataItem *data_item,const std::map<RsPeerId,uint32_t>& peers_and_duplication_factors)
{
// slice the data appropriately and send.
uint32_t saved_duplication_factor = data_item->duplication_factor ; // this little trick avoids copying the item for each peer before slicing it up
for(std::map<RsPeerId,uint32_t>::const_iterator itpid(peers_and_duplication_factors.begin());itpid!=peers_and_duplication_factors.end();++itpid)
{
std::list<RsGRouterTransactionChunkItem*> chunks ;
data_item->duplication_factor = itpid->second;
sliceDataItem(data_item,chunks) ;
for(std::list<RsGRouterTransactionChunkItem*>::const_iterator it2(chunks.begin());it2!=chunks.end();++it2)
locked_sendTransactionData(itpid->first,*(*it2) ) ;
#ifdef GROUTER_DEBUG
std::cerr << " sending " << chunks.size() << " slices to peer " << itpid->first << " with duplication factor = " << itpid->second << std::endl;
#endif
// delete temporary items
for(std::list<RsGRouterTransactionChunkItem*>::const_iterator cit=chunks.begin();cit!=chunks.end();++cit)
delete *cit;
}
data_item->duplication_factor = saved_duplication_factor ;
}
void p3GRouter::routePendingObjects()
{
// Go throught he list of pending messages. For those with a peer ready, send the message to that peer.
@ -885,17 +915,45 @@ void p3GRouter::routePendingObjects()
{
// Look for tunnels and friends where to send the data. Send to both.
std::list<RsPeerId> peers ;
std::map<RsPeerId,uint32_t> peers_and_duplication_factors ;
if(it->second.routing_flags & GRouterRoutingInfo::ROUTING_FLAGS_ALLOW_TUNNELS)
locked_collectAvailableTunnels(it->second.tunnel_hash,peers);
locked_collectAvailableTunnels(it->second.tunnel_hash,it->second.data_item->duplication_factor,peers_and_duplication_factors);
// For now, disable friends. We'll first check that the good old tunnel system works as before.
if(!peers_and_duplication_factors.empty())
{
#ifdef GROUTER_DEBUG
std::cerr << " tunnels available! sending!" << std::endl;
#endif
locked_sendToPeers(it->second.data_item,peers_and_duplication_factors) ;
// change item state in waiting list
it->second.data_status = RS_GROUTER_DATA_STATUS_ONGOING ;
it->second.data_transaction_TS = now ;
pending_messages_changed = true ;
continue ; // no need to seek for friend asynced routes since tunnels directly go to the final destination!
}
if(it->second.routing_flags & GRouterRoutingInfo::ROUTING_FLAGS_ALLOW_FRIENDS)
locked_collectAvailableFriends(it->second.data_item->destination_key,peers, it->second.incoming_routes.ids, it->second.routing_flags & GRouterRoutingInfo::ROUTING_FLAGS_IS_ORIGIN);
locked_collectAvailableFriends(it->second.data_item->destination_key,it->second.incoming_routes.ids,it->second.data_item->duplication_factor,peers_and_duplication_factors);
if(peers.empty())
if(!peers_and_duplication_factors.empty())
{
#ifdef GROUTER_DEBUG
std::cerr << " friends available! sending!" << std::endl;
#endif
locked_sendToPeers(it->second.data_item,peers_and_duplication_factors) ;
// change item state in waiting list
it->second.data_status = RS_GROUTER_DATA_STATUS_ONGOING ;
it->second.data_transaction_TS = now ;
pending_messages_changed = true ;
}
else
{
#ifdef GROUTER_DEBUG
std::cerr << " no direct friends available" << std::endl;
@ -908,33 +966,8 @@ void p3GRouter::routePendingObjects()
#endif
it->second.routing_flags |= GRouterRoutingInfo::ROUTING_FLAGS_ALLOW_TUNNELS ;
}
continue ;
}
// slice the data appropriately and send.
std::list<RsGRouterTransactionChunkItem*> chunks ;
sliceDataItem(it->second.data_item,chunks) ;
#ifdef GROUTER_DEBUG
if(!peers.empty())
std::cerr << " sending to peers:" << std::endl;
#endif
for(std::list<RsPeerId>::const_iterator itpid(peers.begin());itpid!=peers.end();++itpid)
for(std::list<RsGRouterTransactionChunkItem*>::const_iterator it2(chunks.begin());it2!=chunks.end();++it2)
locked_sendTransactionData(*itpid,*(*it2) ) ;
// delete temporary items
for(std::list<RsGRouterTransactionChunkItem*>::const_iterator cit=chunks.begin();cit!=chunks.end();++cit)
delete *cit;
// change item state in waiting list
it->second.data_status = RS_GROUTER_DATA_STATUS_ONGOING ;
it->second.data_transaction_TS = now ;
pending_messages_changed = true ;
}
else if(it->second.data_status == RS_GROUTER_DATA_STATUS_ONGOING && now > MAX_TRANSACTION_ACK_WAITING_TIME + it->second.data_transaction_TS)
{
@ -1014,44 +1047,127 @@ void p3GRouter::routePendingObjects()
IndicateConfigChanged() ;
}
void p3GRouter::locked_collectAvailableFriends(const GRouterKeyId& gxs_id,std::list<RsPeerId>& friend_peers,const std::set<RsPeerId>& incoming_routes,bool is_origin)
void p3GRouter::locked_collectAvailableFriends(const GRouterKeyId& gxs_id,const std::set<RsPeerId>& incoming_routes,uint32_t duplication_factor, std::map<RsPeerId,uint32_t>& friend_peers_and_duplication_factors)
{
// The strategy is the following:
// Old strategy was the following:
// if origin
// send to multiple neighbors : best and random
// else
// send to a single "best" neighbor (determined by threshold over routing probability),
std::set<RsPeerId> ids ;
mServiceControl->getPeersConnected(getServiceInfo().mServiceType,ids) ;
std::vector<float> probas;
std::vector<RsPeerId> tmp_peers;
// remove previous peers
for(std::set<RsPeerId>::const_iterator it(ids.begin());it!=ids.end();++it)
if(incoming_routes.find(*it) == incoming_routes.end())
tmp_peers.push_back(*it) ;
if(tmp_peers.empty())
return ;
_routing_matrix.computeRoutingProbabilities(gxs_id, tmp_peers, probas) ;
// New strategy is:
//
// Characteristics of the distribution to look at:
// * who's online, who's not
// * all values quite equal
// * single value well above others
// * largest value is small
// Algorithm:
//
// 0 - encode duplicate factor in routed item and allow at most N duplicates
// - when forwarding to N peers, split the duplication factor into N bins, each being proportional to the forwarding probability.
// Example for N=3 and D=10:
//
// p Calculation Final bin
//
// +-0.21--> 0.21*10=2.1 --> 2 0.1 below
// |
// 10 ----+-0.45--> 0.45*10=4.5 --> 4.6-> 5 0.4 above
// |
// +-0.34--> 0.34*10=3.4 --> 3.0-> 3 0
//
//
// 1 - get routing probabilities p_i for all peers as well as the maximum proba p before normalization.
//
// Set N = min(3,item->duplication_factor) // max number of friends to route to
//
// if p < threshold // That means the routage info is too old => Fallback to random routing.
// Select N random online friends and forward to them.
// else
// Let p_i be the probabilities of all peers
// Select all online peers for which p_i >= 0.5*p.
// if !empty
// Update duplication factors according to probabilities and number of peers
// Route to these peers
// else
// Keep the item
//
#ifdef GROUTER_DEBUG
std::cerr << "locked_getAvailableFriends()" << std::endl;
std::cerr << " getting connected friends, computing routing probabilities" << std::endl;
std::cerr << " looking for friends for item to ID " << gxs_id << " duplication factor = " << duplication_factor << std::endl;
std::cerr << " retrieving online friends and all friends lists." << std::endl;
#endif
std::set<RsPeerId> online_ids ;
std::list<RsPeerId> all_ids ;
rsPeers->getFriendList(all_ids) ;
mServiceControl->getPeersConnected(getServiceInfo().mServiceType,online_ids) ;
std::vector<RsPeerId> tmp_peers;
for(std::list<RsPeerId>::const_iterator it(all_ids.begin());it!=all_ids.end();++it)
tmp_peers.push_back(*it) ;
std::vector<float> probas;
float maximum = 1.0;
float max_probability = 0.0;
_routing_matrix.computeRoutingProbabilities(gxs_id, tmp_peers, probas, maximum) ;
#ifdef GROUTER_DEBUG
std::cerr << " initial routing probabilities (maximum=" << maximum << ")" << std::endl;
for(uint32_t i=0;i<tmp_peers.size();++i)
std::cerr << " " << tmp_peers[i] << " " << probas[i] << std::endl;
#endif
if(maximum < RS_GROUTER_PROBABILITY_THRESHOLD_FOR_RANDOM_ROUTING)
{
max_probability = 0.0f ;
#ifdef GROUTER_DEBUG
std::cerr << " max proba: " << maximum << " is below random threshold => using uniform random routing." << std::endl;
#endif
}
else
{
for(uint32_t i=0;i<tmp_peers.size();++i)
max_probability = std::max(max_probability,probas[i]) ;
}
#ifdef GROUTER_DEBUG
std::cerr << " maxi probability=" << max_probability << std::endl;
#endif
// remove incoming peers and peers for which the proba is less than half the maximum proba
uint32_t count=0;
for(uint32_t i=0;i<tmp_peers.size();++i)
if(incoming_routes.find(tmp_peers[i]) != incoming_routes.end())
std::cerr << " removing " << tmp_peers[i] << " which is an incoming route" << std::endl;
else if(probas[i] < RS_GROUTER_PROBABILITY_THRESHOLD_BEST_PEERS_SELECT*max_probability)
std::cerr << " removing " << tmp_peers[i] << " because probability is below best peers threshold" << std::endl;
else
{
tmp_peers[count] = tmp_peers[i] ;
probas[count] = (max_probability==0.0)? (0.5+0.001*RSRandom::random_f32()) : probas[i] ;
++count ;
}
tmp_peers.resize(count) ;
probas.resize(count) ;
if(tmp_peers.empty())
{
#ifdef GROUTER_DEBUG
std::cerr << " online - available peers empty => giving up." << std::endl;
#endif
return ;
}
// now select the N best peers
#ifdef GROUTER_DEBUG
std::cerr << " Remaining peers and routing probabilities:" << std::endl;
for(uint32_t i=0;i<tmp_peers.size();++i)
std::cerr << " " << tmp_peers[i] << ", probability: " << probas[i] << std::endl;
#endif
uint32_t max_count = is_origin?3:1 ;
float probability_threshold = is_origin?0.0:0.5 ;
#ifdef GROUTER_DEBUG
std::cerr << " position at origin: " << is_origin << " => mac_count=" << max_count << ", proba threshold=" << probability_threshold << std::endl;
#endif
std::vector<std::pair<float,RsPeerId> > mypairs ;
for(uint32_t i=0;i<tmp_peers.size();++i)
@ -1060,24 +1176,37 @@ void p3GRouter::locked_collectAvailableFriends(const GRouterKeyId& gxs_id,std::l
// now sort them up
std::sort(mypairs.begin(),mypairs.end(),item_comparator_001()) ;
// take the max_count peers that are still above min_probability
uint32_t n=0 ;
for(std::vector<std::pair<float,RsPeerId> >::const_reverse_iterator it = mypairs.rbegin();it!=mypairs.rend() && n<max_count;++it)
if( (*it).first >= probability_threshold )
{
friend_peers.push_back( (*it).second ), ++n ;
int max_count = std::min(std::min((uint32_t)mypairs.size(),(uint32_t)GROUTER_MAX_BRANCHING_FACTOR),duplication_factor);
// normalise the probabilities
#ifdef GROUTER_DEBUG
std::cerr << " keeping " << (*it).second << std::endl;
std::cerr << " Normalising probabilities..." << std::endl;
#endif
if(!is_origin) // only collect one peer if we're not at origin.
break ;
float total_probas = 0.0f ;
if(mypairs.size() > 0 && max_count > 0)
{
for(int i=mypairs.size()-1,n=0;i>=0 && n<max_count;--i,++n) total_probas += mypairs[i].first ;
for(int i=mypairs.size()-1,n=0;i>=0 && n<max_count;--i,++n) mypairs[i].first /= total_probas ;
}
float duplication_factor_delta =0.0;
for(int i=mypairs.size()-1,n=0;i>=0 && n<max_count;--i,++n)
{
float ideal_dupl = duplication_factor*mypairs[i].first - duplication_factor_delta ; // correct what was taken in advance
uint32_t real_dupl = std::min( duplication_factor - max_count+1,std::max(1u,uint32_t(rint(ideal_dupl)))) ;
duplication_factor_delta = real_dupl - ideal_dupl ;
std::cerr << " Peer " << mypairs[i].second << " prob=" << mypairs[i].first << ", ideal_dupl=" << ideal_dupl << ", real=" << real_dupl << ". Delta = " << duplication_factor_delta << std::endl;
friend_peers_and_duplication_factors[ mypairs[i].second ] = real_dupl ; // should be updated correctly
}
}
void p3GRouter::locked_collectAvailableTunnels(const TurtleFileHash& hash,std::list<RsPeerId>& tunnel_peers)
void p3GRouter::locked_collectAvailableTunnels(const TurtleFileHash& hash,uint32_t total_duplication,std::map<RsPeerId,uint32_t>& tunnel_peers_and_duplication_factors)
{
time_t now = time(NULL) ;
@ -1110,7 +1239,7 @@ void p3GRouter::locked_collectAvailableTunnels(const TurtleFileHash& hash,std::l
#endif
TurtleVirtualPeerId vpid = *(vpit->second.virtual_peers.begin()) ;
tunnel_peers.push_back(vpid) ;
tunnel_peers_and_duplication_factors[vpid] = total_duplication ;
}
bool p3GRouter::locked_sendTransactionData(const RsPeerId& pid,const RsGRouterTransactionItem& trans_item)
@ -1165,19 +1294,30 @@ void p3GRouter::autoWash()
bool items_deleted = false ;
time_t now = time(NULL) ;
std::map<GRouterMsgPropagationId,GRouterClientService *> failed_msgs ;
std::map<GRouterMsgPropagationId,std::pair<GRouterClientService *,RsGxsId> > failed_msgs ;
{
RS_STACK_MUTEX(grMtx) ;
for(std::map<GRouterMsgPropagationId, GRouterRoutingInfo>::iterator it=_pending_messages.begin();it!=_pending_messages.end();)
if( (it->second.data_status == RS_GROUTER_DATA_STATUS_DONE &&
(!(it->second.routing_flags & GRouterRoutingInfo::ROUTING_FLAGS_IS_DESTINATION)
|| it->second.received_time_TS + MAX_DESTINATION_KEEP_TIME < now))
|| ((it->second.received_time_TS + GROUTER_ITEM_MAX_CACHE_KEEP_TIME < now)
&& !(it->second.routing_flags & GRouterRoutingInfo::ROUTING_FLAGS_IS_ORIGIN)
&& !(it->second.routing_flags & GRouterRoutingInfo::ROUTING_FLAGS_IS_DESTINATION)
)) // is the item too old for cache
{
bool delete_entry = false ;
if(it->second.data_status == RS_GROUTER_DATA_STATUS_DONE )
{
if(!(it->second.routing_flags & GRouterRoutingInfo::ROUTING_FLAGS_IS_DESTINATION) || it->second.received_time_TS + MAX_DESTINATION_KEEP_TIME < now) // is the item too old for cache
delete_entry = true ;
}
else if(it->second.data_status == RS_GROUTER_DATA_STATUS_RECEIPT_OK )
{
if(it->second.received_time_TS + MAX_RECEIPT_KEEP_TIME < now) // is the item too old for cache
delete_entry = true ;
}
else
if(it->second.received_time_TS + GROUTER_ITEM_MAX_CACHE_KEEP_TIME < now)
delete_entry = true ;
if(delete_entry)
{
#ifdef GROUTER_DEBUG
grouter_debug() << " Removing cached item " << std::hex << it->first << std::dec << std::endl;
@ -1190,7 +1330,7 @@ void p3GRouter::autoWash()
GRouterClientService *client = NULL;
if(locked_getLocallyRegisteredClientFromServiceId(it->second.client_id,client))
failed_msgs[it->first] = client ;
failed_msgs[it->first] = std::make_pair(client,it->second.data_item->signature.keyId) ;
else
std::cerr << " ERROR: client id " << it->second.client_id << " not registered. Consistency error." << std::endl;
}
@ -1209,6 +1349,7 @@ void p3GRouter::autoWash()
}
else
++it ;
}
// also check all existing tunnels
@ -1248,12 +1389,12 @@ void p3GRouter::autoWash()
}
// Look into pending items.
for(std::map<GRouterMsgPropagationId,GRouterClientService*>::const_iterator it(failed_msgs.begin());it!=failed_msgs.end();++it)
for(std::map<GRouterMsgPropagationId,std::pair<GRouterClientService*,RsGxsId> >::const_iterator it(failed_msgs.begin());it!=failed_msgs.end();++it)
{
#ifdef GROUTER_DEBUG
std::cerr << " notifying client for message id " << std::hex << it->first << " state = FAILED" << std::endl;
#endif
it->second->notifyDataStatus(it->first ,GROUTER_CLIENT_SERVICE_DATA_STATUS_FAILED) ;
it->second.first->notifyDataStatus(it->first,it->second.second ,GROUTER_CLIENT_SERVICE_DATA_STATUS_FAILED) ;
}
if(items_deleted)
@ -1361,13 +1502,13 @@ void p3GRouter::handleIncomingReceiptItem(RsGRouterSignedReceiptItem *receipt_it
std::cerr << "Item content:" << std::endl;
receipt_item->print(std::cerr,2) ;
#endif
RsGxsId signer_id ;
// Because we don't do proxy-transmission yet, the client needs to be notified. Otherwise, we will need to
// first check if we're a proxy or not. We also remove the message from the global router sending list.
// in the proxy case, we should only store the receipt.
GRouterClientService *client_service = NULL;
GRouterServiceId service_id ;
GRouterMsgPropagationId mid = 0 ;
{
@ -1379,6 +1520,7 @@ void p3GRouter::handleIncomingReceiptItem(RsGRouterSignedReceiptItem *receipt_it
std::cerr << " ERROR: no routing ID corresponds to this message. Inconsistency!" << std::endl;
return ;
}
signer_id = it->second.data_item->signature.keyId ;
// check hash.
@ -1435,7 +1577,7 @@ void p3GRouter::handleIncomingReceiptItem(RsGRouterSignedReceiptItem *receipt_it
#ifdef GROUTER_DEBUG
std::cerr << " notifying client " << (void*)client_service << " that msg " << std::hex << mid << std::dec << " was received." << std::endl;
#endif
client_service->notifyDataStatus(mid, GROUTER_CLIENT_SERVICE_DATA_STATUS_RECEIVED) ;
client_service->notifyDataStatus(mid, signer_id, GROUTER_CLIENT_SERVICE_DATA_STATUS_RECEIVED) ;
}
// also note the incoming route in the routing matrix
@ -1830,7 +1972,11 @@ bool p3GRouter::verifySignedDataItem(RsGRouterAbstractMsgItem *item)
{
try
{
RsTlvSecurityKey signature_key ;
if(rsReputations->isIdentityBanned(item->signature.keyId))
{
std::cerr << "(WW) received global router message from banned identity " << item->signature.keyId << ". Rejecting the message." << std::endl;
return false ;
}
uint32_t data_size = item->signed_data_size() ;
RsTemporaryMemory data(data_size) ;
@ -1843,12 +1989,20 @@ bool p3GRouter::verifySignedDataItem(RsGRouterAbstractMsgItem *item)
uint32_t error_status ;
if(!mGixs->validateData(data,data_size,item->signature,true,error_status))
{
switch(error_status)
{
case RsGixs::RS_GIXS_ERROR_KEY_NOT_AVAILABLE: std::cerr << "(EE) Key for GXS Id " << item->signature.keyId << " is not available. Cannot verify." << std::endl;
case RsGixs::RS_GIXS_ERROR_KEY_NOT_AVAILABLE:
{
std::list<RsPeerId> peer_ids ;
peer_ids.push_back(item->PeerId()) ;
std::cerr << "(EE) Key for GXS Id " << item->signature.keyId << " is not available. Cannot verify. Asking key to peer " << item->PeerId() << std::endl;
mGixs->requestKey(item->signature.keyId,peer_ids) ; // request the key around
}
break ;
case RsGixs::RS_GIXS_ERROR_SIGNATURE_MISMATCH: std::cerr << "(EE) Signature mismatch. Spoofing/Corrupted/MITM?." << std::endl;
break ;
@ -1927,7 +2081,7 @@ bool p3GRouter::sendData(const RsGxsId& destination,const GRouterServiceId& clie
data_item->data_size = data_size ;
data_item->routing_id = propagation_id ;
data_item->randomized_distance = 0 ;
data_item->duplication_factor = GROUTER_MAX_DUPLICATION_FACTOR ;
data_item->service_id = client_id ;
data_item->destination_key = destination ;
data_item->flags = 0 ; // this is unused for now.
@ -1987,7 +2141,7 @@ bool p3GRouter::sendData(const RsGxsId& destination,const GRouterServiceId& clie
grouter_debug() << " data status = " << info.data_status << std::endl;
grouter_debug() << " tunnel status = " << info.tunnel_status << std::endl;
grouter_debug() << " sending attempt= " << info.sending_attempts << std::endl;
grouter_debug() << " distance = " << info.data_item->randomized_distance << std::endl;
grouter_debug() << " duplicate fact = " << info.data_item->duplication_factor << std::endl;
grouter_debug() << " recv time = " << info.received_time_TS << std::endl;
grouter_debug() << " client id = " << std::hex << data_item->service_id << std::dec << std::endl;
grouter_debug() << " tunnel hash = " << info.tunnel_hash << std::endl;
@ -2134,21 +2288,23 @@ bool p3GRouter::getRoutingMatrixInfo(RsGRouter::GRouterRoutingMatrixInfo& info)
info.friend_ids.clear() ;
info.published_keys.clear() ;
std::set<RsPeerId> ids ;
mServiceControl->getPeersConnected(getServiceInfo().mServiceType,ids) ;
std::list<RsPeerId> ids ;
//mServiceControl->getPeersConnected(getServiceInfo().mServiceType,ids) ;
rsPeers->getFriendList(ids) ;
info.published_keys = _owned_key_ids ;
for(std::set<RsPeerId>::const_iterator it(ids.begin());it!=ids.end();++it)
for(std::list<RsPeerId>::const_iterator it(ids.begin());it!=ids.end();++it)
info.friend_ids.push_back(*it) ;
std::vector<GRouterKeyId> known_keys ;
std::vector<float> probas ;
float maximum= 0.0f;
_routing_matrix.getListOfKnownKeys(known_keys) ;
for(uint32_t i=0;i<known_keys.size();++i)
{
_routing_matrix.computeRoutingProbabilities(known_keys[i],info.friend_ids,probas) ;
_routing_matrix.computeRoutingProbabilities(known_keys[i],info.friend_ids,probas,maximum) ;
info.per_friend_probabilities[known_keys[i]] = probas ;
}
@ -2167,6 +2323,7 @@ bool p3GRouter::getRoutingCacheInfo(std::vector<GRouterRoutingCacheInfo>& infos)
cinfo.mid = it->first ;
cinfo.local_origin = it->second.incoming_routes.ids ;
cinfo.destination = it->second.data_item->destination_key ;
cinfo.duplication_factor = it->second.data_item->duplication_factor ;
cinfo.routing_time = it->second.received_time_TS ;
cinfo.last_tunnel_attempt_time = it->second.last_tunnel_request_TS ;
cinfo.last_sent_time = it->second.last_sent_TS ;
@ -2220,15 +2377,16 @@ void p3GRouter::debugDump()
{
grouter_debug() << " Msg id: " << std::hex << it->first << std::dec ;
grouter_debug() << " data hash: " << it->second.item_hash ;
grouter_debug() << " client id: " << std::hex << it->second.client_id << std::dec;
grouter_debug() << " client: " << std::hex << it->second.client_id << std::dec;
grouter_debug() << " Flags: " << std::hex << it->second.routing_flags << std::dec;
grouter_debug() << " Destination: " << it->second.data_item->destination_key ;
grouter_debug() << " Received: " << now - it->second.received_time_TS << " secs ago.";
grouter_debug() << " Last sent: " << now - it->second.last_sent_TS << " secs ago.";
grouter_debug() << " Transaction TS: " << now - it->second.data_transaction_TS << " secs ago.";
grouter_debug() << " Data Status: " << statusString[it->second.data_status] << std::endl;
grouter_debug() << " Tunl Status: " << statusString[it->second.tunnel_status] << std::endl;
grouter_debug() << " Receipt ok: " << (it->second.receipt_item != NULL) << std::endl;
grouter_debug() << " Dest: " << it->second.data_item->destination_key ;
grouter_debug() << " Recd: " << now - it->second.received_time_TS << " secs ago.";
grouter_debug() << " Sent: " << now - it->second.last_sent_TS << " secs ago.";
grouter_debug() << " Trans. TS: " << now - it->second.data_transaction_TS << " secs ago." ;
grouter_debug() << " Data Status: " << statusString[it->second.data_status] ;
grouter_debug() << " Tunl Status: " << statusString[it->second.tunnel_status] ;
grouter_debug() << " Receipt ok: " << (it->second.receipt_item != NULL) ;
grouter_debug() << " Dup: " << it->second.data_item->duplication_factor << std::endl;
}
grouter_debug() << " Tunnels: " << std::endl;
@ -2251,8 +2409,8 @@ void p3GRouter::debugDump()
grouter_debug() << " Routing matrix: " << std::endl;
if(_debug_enabled)
_routing_matrix.debugDump() ;
// if(_debug_enabled)
// _routing_matrix.debugDump() ;
}

View file

@ -272,8 +272,9 @@ private:
//bool locked_getGxsIdAndClientId(const TurtleFileHash &sum,RsGxsId& gxs_id,GRouterServiceId& client_id);
bool locked_sendTransactionData(const RsPeerId& pid,const RsGRouterTransactionItem& item);
void locked_collectAvailableFriends(const GRouterKeyId &gxs_id,std::list<RsPeerId>& friend_peers, const std::set<RsPeerId>& incoming_routes,bool is_origin);
void locked_collectAvailableTunnels(const TurtleFileHash& hash,std::list<RsPeerId>& tunnel_peers);
void locked_collectAvailableFriends(const GRouterKeyId &gxs_id, const std::set<RsPeerId>& incoming_routes,uint32_t duplication_factor, std::map<RsPeerId, uint32_t> &friend_peers_and_duplication_factors);
void locked_collectAvailableTunnels(const TurtleFileHash& hash, uint32_t total_duplication, std::map<RsPeerId, uint32_t> &tunnel_peers_and_duplication_factors);
void locked_sendToPeers(RsGRouterGenericDataItem *data_item, const std::map<RsPeerId, uint32_t> &peers_and_duplication_factors);
//===================================================//
// p3Config methods //

View file

@ -2365,7 +2365,10 @@ void RsGenExchange::publishGrps()
if(ggps.mIsUpdate)
mDataAccess->updateGroupData(grp);
else
mDataAccess->addGroupData(grp);
mDataAccess->addGroupData(grp);
if(mNetService!=NULL)
mNetService->subscribeStatusChanged(grpId,true) ;
}
else
{

View file

@ -221,6 +221,7 @@
#define NXS_NET_DEBUG_7 1
#define GIXS_CUT_OFF 0
//#define NXS_FRAG
// The constant below have a direct influence on how fast forums/channels/posted/identity groups propagate and on the overloading of queues:
//
@ -235,8 +236,9 @@
#define TRANSAC_TIMEOUT 2000 // In seconds. Has been increased to avoid epidemic transaction cancelling due to overloaded outqueues.
#define SECURITY_DELAY_TO_FORCE_CLIENT_REUPDATE 3600 // force re-update if there happens to be a large delay between our server side TS and the client side TS of friends
#define REJECTED_MESSAGE_RETRY_DELAY 24*3600 // re-try rejected messages every 24hrs. Most of the time this is because the peer's reputation has changed.
#define GROUP_STATS_UPDATE_DELAY 1800 // update unsubscribed group statistics every 30 mins
#define GROUP_STATS_UPDATE_NB_PEERS 2 // update unsubscribed group statistics every 30 mins
#define GROUP_STATS_UPDATE_DELAY 240 // update unsubscribed group statistics every 3 mins
#define GROUP_STATS_UPDATE_NB_PEERS 2 // number of peers to which the group stats are asked
#define MAX_ALLOWED_GXS_MESSAGE_SIZE 199000 // 200,000 bytes including signature and headers
// Debug system to allow to print only for some IDs (group, Peer, etc)
@ -245,7 +247,7 @@
static const RsPeerId peer_to_print = RsPeerId(std::string("")) ;
static const RsGxsGroupId group_id_to_print = RsGxsGroupId(std::string("" )) ; // use this to allow to this group id only, or "" for all IDs
static const uint32_t service_to_print = 0 ; // use this to allow to this service id only, or 0 for all services
static const uint32_t service_to_print = 0x215 ; // use this to allow to this service id only, or 0 for all services
// warning. Numbers should be SERVICE IDS (see serialiser/rsserviceids.h. E.g. 0x0215 for forums)
class nullstream: public std::ostream {};
@ -597,7 +599,7 @@ void RsGxsNetService::syncWithPeers()
grp->PeerId(*sit);
grp->updateTS = updateTS;
NxsBandwidthRecorder::recordEvent(mServType,grp) ;
//NxsBandwidthRecorder::recordEvent(mServType,grp) ;
#ifdef NXS_NET_DEBUG_5
GXSNETDEBUG_P_(*sit) << "Service "<< std::hex << ((mServiceInfo.mServiceType >> 8)& 0xffff) << std::dec << " sending global group TS of peer id: " << *sit << " ts=" << nice_time_stamp(time(NULL),updateTS) << " (secs ago) to himself" << std::endl;
@ -632,7 +634,11 @@ void RsGxsNetService::syncWithPeers()
sit = peers.begin();
float sending_probability = NxsBandwidthRecorder::computeCurrentSendingProbability() ;
// Jan. 26, 2016. This has been disabled, since GXS has been fixed, groups will not re-ask for data. So even if outqueues are filled up by multiple
// attempts of the same request, the transfer will eventually end up. The code for NxsBandwidthRecorder should be kept for a while,
// just in case.
// float sending_probability = NxsBandwidthRecorder::computeCurrentSendingProbability() ;
#ifdef NXS_NET_DEBUG_2
std::cerr << " syncWithPeers(): Sending probability = " << sending_probability << std::endl;
#endif
@ -688,22 +694,24 @@ void RsGxsNetService::syncWithPeers()
msg->grpId = grpId;
msg->updateTS = updateTS;
NxsBandwidthRecorder::recordEvent(mServType,msg) ;
//NxsBandwidthRecorder::recordEvent(mServType,msg) ;
if(RSRandom::random_f32() < sending_probability)
{
sendItem(msg);
//if(RSRandom::random_f32() < sending_probability)
//{
sendItem(msg);
#ifdef NXS_NET_DEBUG_5
GXSNETDEBUG_PG(*sit,grpId) << "Service "<< std::hex << ((mServiceInfo.mServiceType >> 8)& 0xffff) << std::dec << " sending global message TS of peer id: " << *sit << " ts=" << nice_time_stamp(time(NULL),updateTS) << " (secs ago) for group " << grpId << " to himself" << std::endl;
#endif
}
else
{
delete msg ;
#ifdef NXS_NET_DEBUG_0
GXSNETDEBUG_PG(*sit,grpId) << " cancel RsNxsSyncMsg req (last local update TS for group+peer) for grpId=" << grpId << " to peer " << *sit << ": not enough bandwidth." << std::endl;
#endif
}
//}
//else
//{
// delete msg ;
//#ifdef NXS_NET_DEBUG_0
// GXSNETDEBUG_PG(*sit,grpId) << " cancel RsNxsSyncMsg req (last local update TS for group+peer) for grpId=" << grpId << " to peer " << *sit << ": not enough bandwidth." << std::endl;
//#endif
//}
}
}
@ -897,7 +905,13 @@ void RsGxsNetService::subscribeStatusChanged(const RsGxsGroupId& grpId,bool subs
else
it->second->msgUpdateTS = 0 ; // reset!
// no need to update mGrpServerUpdateItem since the ::updateServerSyncTS() call will do it.
// We also update mGrpServerUpdateItem so as to trigger a new grp list exchange with friends (friends will send their known ClientTS which
// will be lower than our own grpUpdateTS, triggering our sending of the new subscribed grp list.
if(mGrpServerUpdateItem == NULL)
mGrpServerUpdateItem = new RsGxsServerGrpUpdateItem(mServType);
mGrpServerUpdateItem->grpUpdateTS = time(NULL) ;
}
bool RsGxsNetService::fragmentMsg(RsNxsMsg& msg, MsgFragments& msgFragments) const
@ -906,7 +920,8 @@ bool RsGxsNetService::fragmentMsg(RsNxsMsg& msg, MsgFragments& msgFragments) con
uint32_t msgSize = msg.msg.TlvSize();
uint32_t dataLeft = msgSize;
uint8_t nFragments = ceil(float(msgSize)/FRAGMENT_SIZE);
char buffer[FRAGMENT_SIZE];
RsTemporaryMemory buffer(FRAGMENT_SIZE);
int currPos = 0;
@ -972,10 +987,25 @@ RsNxsMsg* RsGxsNetService::deFragmentMsg(MsgFragments& msgFragments) const
if(msgFragments.size() == 1)
{
RsNxsMsg* m = msgFragments.front();
if(m->count > 1)
if(m->count > 1) // normally mcount should be exactly 1, but if not initialised (old versions) it's going to be 0
{
// delete everything
std::cerr << "(WW) Cannot deFragment message set. m->count=" << m->count << ", but msgFragments.size()=" << msgFragments.size() << ". Incomplete? Dropping all." << std::endl;
for(uint32_t i=0;i<msgFragments.size();++i)
delete msgFragments[i] ;
msgFragments.clear();
return NULL;
}
else
{
// single piece. No need to say anything. Just return it.
msgFragments.clear();
return m;
}
}
// first determine total size for binary data
@ -985,9 +1015,21 @@ RsNxsMsg* RsGxsNetService::deFragmentMsg(MsgFragments& msgFragments) const
for(; mit != msgFragments.end(); ++mit)
datSize += (*mit)->msg.bin_len;
char* data = new char[datSize];
RsTemporaryMemory data(datSize) ;
if(!data)
{
for(uint32_t i=0;i<msgFragments.size();++i)
delete msgFragments[i] ;
msgFragments.clear();
return NULL ;
}
uint32_t currPos = 0;
std::cerr << "(II) deFragmenting long message of size " << datSize << ", from " << msgFragments.size() << " pieces." << std::endl;
for(mit = msgFragments.begin(); mit != msgFragments.end(); ++mit)
{
RsNxsMsg* msg = *mit;
@ -1003,10 +1045,17 @@ RsNxsMsg* RsGxsNetService::deFragmentMsg(MsgFragments& msgFragments) const
msg->transactionNumber = m.transactionNumber;
msg->meta = m.meta;
delete[] data;
// now clean!
for(uint32_t i=0;i<msgFragments.size();++i)
delete msgFragments[i] ;
msgFragments.clear();
return msg;
}
// This is unused apparently, since groups are never large. Anyway, we keep it in case we need it.
RsNxsGrp* RsGxsNetService::deFragmentGrp(GrpFragments& grpFragments) const
{
if(grpFragments.empty()) return NULL;
@ -1265,7 +1314,7 @@ struct MsgFragCollate
bool operator()(RsNxsMsg* msg) { return msg->msgId == mMsgId;}
};
void RsGxsNetService::collateMsgFragments(MsgFragments fragments, std::map<RsGxsMessageId, MsgFragments>& partFragments) const
void RsGxsNetService::collateMsgFragments(MsgFragments& fragments, std::map<RsGxsMessageId, MsgFragments>& partFragments) const
{
// get all unique message Ids;
MsgFragments::iterator vit = fragments.begin();
@ -1359,31 +1408,33 @@ private:
bool RsGxsNetService::loadList(std::list<RsItem *> &load)
{
RS_STACK_MUTEX(mNxsMutex) ;
RS_STACK_MUTEX(mNxsMutex) ;
// The delete is done in StoreHere, if necessary
// The delete is done in StoreHere, if necessary
std::for_each(load.begin(), load.end(), StoreHere(mClientGrpUpdateMap, mClientMsgUpdateMap, mServerMsgUpdateMap, mGrpServerUpdateItem));
// We reset group statistics here. This is the best place since we know at this point which are all unsubscribed groups.
std::for_each(load.begin(), load.end(), StoreHere(mClientGrpUpdateMap, mClientMsgUpdateMap, mServerMsgUpdateMap, mGrpServerUpdateItem));
time_t now = time(NULL);
for(ClientMsgMap::iterator it = mClientMsgUpdateMap.begin();it!=mClientMsgUpdateMap.end();++it)
for(std::map<RsGxsGroupId,RsGxsMsgUpdateItem::MsgUpdateInfo>::const_iterator it2(it->second->msgUpdateInfos.begin());it2!=it->second->msgUpdateInfos.end();++it2)
{
RsGroupNetworkStatsRecord& gnsr = mGroupNetworkStats[it2->first] ;
time_t now = time(NULL);
for(std::map<RsGxsGroupId,RsGroupNetworkStatsRecord>::iterator it(mGroupNetworkStats.begin());it!=mGroupNetworkStats.end();++it)
{
// At each reload, we reset the count of visible messages. It will be rapidely restored to its real value from friends.
// At each reload, divide the last count by 2. This gradually flushes old information away.
it->second.max_visible_count = 0; // std::max(it2->second.message_count,gnsr.max_visible_count) ;
// the update time stamp is randomised so as not to ask all friends at once about group statistics.
it->second.update_TS = now - GROUP_STATS_UPDATE_DELAY + (RSRandom::random_u32()%(GROUP_STATS_UPDATE_DELAY/10)) ;
gnsr.max_visible_count = std::max(it2->second.message_count,gnsr.max_visible_count/2) ;
gnsr.update_TS = now - GROUP_STATS_UPDATE_DELAY + (RSRandom::random_u32()%(GROUP_STATS_UPDATE_DELAY/10)) ;
// Similarly, we remove all suppliers.
// Actual suppliers will come back automatically.
// Similarly, we remove some of the suppliers randomly. If they are
// actual suppliers, they will come back automatically. If they are
// not, they will be forgotten.
it->second.suppliers.clear() ;
}
if(RSRandom::random_f32() > 0.2)
gnsr.suppliers.insert(it->first) ;
}
return true;
return true;
}
#include <algorithm>
@ -1675,7 +1726,9 @@ void RsGxsNetService::data_tick()
if(mUpdateCounter >= 120) // 60 seconds
{
updateServerSyncTS();
#ifdef TO_REMOVE
updateClientSyncTS();
#endif
mUpdateCounter = 1;
}
else
@ -1733,6 +1786,7 @@ void RsGxsNetService::debugDump()
#endif
}
#ifdef TO_REMOVE
// This method is normally not needed, but we use it to correct possible inconsistencies in the updte time stamps
// on the client side.
@ -1770,6 +1824,7 @@ void RsGxsNetService::updateClientSyncTS()
}
}
}
#endif
void RsGxsNetService::updateServerSyncTS()
{
@ -1784,6 +1839,10 @@ void RsGxsNetService::updateServerSyncTS()
// retrieve all grps and update TS
mDataStore->retrieveGxsGrpMetaData(gxsMap);
#ifdef TO_REMOVE
// (cyril) This code is removed because it is inconsistent: the list of grps does not need to be updated when
// new posts arrive. The two (grp list and msg list) are handled independently.
// as a grp list server also note this is the latest item you have
if(mGrpServerUpdateItem == NULL)
mGrpServerUpdateItem = new RsGxsServerGrpUpdateItem(mServType);
@ -1792,6 +1851,7 @@ void RsGxsNetService::updateServerSyncTS()
// we have unsubscribed a group.
mGrpServerUpdateItem->grpUpdateTS = 0 ;
#endif
bool change = false;
// then remove from mServerMsgUpdateMap, all items that are not in the group list!
@ -1829,6 +1889,7 @@ void RsGxsNetService::updateServerSyncTS()
ServerMsgMap::iterator mapIT = mServerMsgUpdateMap.find(grpId);
RsGxsServerMsgUpdateItem* msui = NULL;
#ifdef TO_REMOVE
// That accounts for modification of the meta data.
if(mGrpServerUpdateItem->grpUpdateTS < grpMeta->mPublishTs)
@ -1838,6 +1899,7 @@ void RsGxsNetService::updateServerSyncTS()
#endif
mGrpServerUpdateItem->grpUpdateTS = grpMeta->mPublishTs;
}
#endif
if(mapIT == mServerMsgUpdateMap.end())
{
@ -1861,6 +1923,7 @@ void RsGxsNetService::updateServerSyncTS()
#endif
}
#ifdef TO_REMOVE
// This might be very inefficient with time. This is needed because an old message might have been received, so the last modification time
// needs to account for this so that a friend who hasn't
@ -1872,6 +1935,7 @@ void RsGxsNetService::updateServerSyncTS()
mGrpServerUpdateItem->grpUpdateTS = grpMeta->mRecvTS;
change = true;
}
#endif
}
// actual change in config settings, then save configuration
@ -2261,7 +2325,6 @@ void RsGxsNetService::locked_processCompletedIncomingTrans(NxsTransaction* tr)
GXSNETDEBUG_P_(tr->mTransaction->PeerId()) << " and updating mClientGrpUpdateMap for peer " << peerFrom << " of new time stamp " << nice_time_stamp(time(NULL),updateTS) << std::endl;
#endif
#warning should not we conservatively use the most recent one, in case the peer has reset its mServerGrpUpdate time?? What happens if the peer unsubscribed a recent group?
item->grpUpdateTS = updateTS;
item->peerId = peerFrom;
@ -2295,13 +2358,15 @@ void RsGxsNetService::locked_processCompletedIncomingTrans(NxsTransaction* tr)
std::cerr << "RsGxsNetService::processCompletedTransactions(): item did not caste to msg" << std::endl;
}
#ifdef NSXS_FRAG
std::map<RsGxsGroupId, MsgFragments > collatedMsgs;
collateMsgFragments(msgs, collatedMsgs);
#ifdef NXS_FRAG
// (cyril) This code does not work. Since we do not really need message fragmenting, I won't fix it.
std::map<RsGxsMessageId, MsgFragments > collatedMsgs;
collateMsgFragments(msgs, collatedMsgs); // this destroys msgs whatsoever and recovers memory when needed
msgs.clear();
std::map<RsGxsGroupId, MsgFragments >::iterator mit = collatedMsgs.begin();
std::map<RsGxsMessageId, MsgFragments >::iterator mit = collatedMsgs.begin();
for(; mit != collatedMsgs.end(); ++mit)
{
MsgFragments& f = mit->second;
@ -2310,6 +2375,7 @@ void RsGxsNetService::locked_processCompletedIncomingTrans(NxsTransaction* tr)
if(msg)
msgs.push_back(msg);
}
collatedMsgs.clear();
#endif
#ifdef NXS_NET_DEBUG_0
GXSNETDEBUG_PG(tr->mTransaction->PeerId(),grpId) << " ...and notifying observer of " << msgs.size() << " new messages." << std::endl;
@ -2542,7 +2608,12 @@ void RsGxsNetService::locked_genReqMsgTransaction(NxsTransaction* tr)
mDataStore->retrieveGxsGrpMetaData(grpMetaMap);
RsGxsGrpMetaData* grpMeta = grpMetaMap[grpId];
#warning TODO: what if grpMeta is NULL?
if(grpMeta == NULL) // this should not happen, but just in case...
{
std::cerr << "(EE) grpMeta is NULL in " << __PRETTY_FUNCTION__ << " line " << __LINE__ << ". This is very unexpected." << std::endl;
return ;
}
if(! (grpMeta->mSubscribeFlags & GXS_SERV::GROUP_SUBSCRIBE_SUBSCRIBED ))
{
// For unsubscribed groups, we update the timestamp something more recent, so that the group content will not be asked to the same
@ -2945,6 +3016,24 @@ void RsGxsNetService::locked_genReqGrpTransaction(NxsTransaction* tr)
if(!reqList.empty())
locked_pushGrpTransactionFromList(reqList, tr->mTransaction->PeerId(), transN);
else
{
ClientGrpMap::iterator it = mClientGrpUpdateMap.find(tr->mTransaction->PeerId());
RsGxsGrpUpdateItem* item = NULL;
if(it != mClientGrpUpdateMap.end())
item = it->second;
else
{
item = new RsGxsGrpUpdateItem(mServType);
mClientGrpUpdateMap.insert(std::make_pair(tr->mTransaction->PeerId(), item));
}
#ifdef NXS_NET_DEBUG_0
GXSNETDEBUG_P_(tr->mTransaction->PeerId()) << " reqList is empty, updating anyway ClientGrpUpdate TS for peer " << tr->mTransaction->PeerId() << " to: " << tr->mTransaction->updateTS << std::endl;
#endif
item->grpUpdateTS = tr->mTransaction->updateTS;
item->peerId = tr->mTransaction->PeerId();
IndicateConfigChanged();
}
}
void RsGxsNetService::locked_genSendGrpsTransaction(NxsTransaction* tr)
@ -3162,37 +3251,52 @@ void RsGxsNetService::locked_genSendMsgsTransaction(NxsTransaction* tr)
uint32_t msgSize = 0;
for(;mit != msgs.end(); ++mit)
{
std::vector<RsNxsMsg*>& msgV = mit->second;
std::vector<RsNxsMsg*>::iterator vit = msgV.begin();
{
std::vector<RsNxsMsg*>& msgV = mit->second;
std::vector<RsNxsMsg*>::iterator vit = msgV.begin();
for(; vit != msgV.end(); ++vit)
{
RsNxsMsg* msg = *vit;
msg->PeerId(peerId);
msg->transactionNumber = transN;
for(; vit != msgV.end(); ++vit)
{
RsNxsMsg* msg = *vit;
msg->PeerId(peerId);
msg->transactionNumber = transN;
#ifndef NXS_FRAG
newTr->mItems.push_back(msg);
msgSize++;
// Quick trick to clamp messages with an exceptionnally large size. Signature will fail on client side, and the message
// will be rejected.
if(msg->msg.bin_len > MAX_ALLOWED_GXS_MESSAGE_SIZE)
{
std::cerr << "(WW) message with ID " << msg->msgId << " in group " << msg->grpId << " exceeds size limit of " << MAX_ALLOWED_GXS_MESSAGE_SIZE << " bytes. Actual size is " << msg->msg.bin_len << " bytes. Message will be truncated and rejected at client." << std::endl;
msg->msg.bin_len = 1 ; // arbitrary small size, but not 0. No need to send the data since it's going to be rejected.
}
#ifdef NXS_FRAG
MsgFragments fragments;
fragmentMsg(*msg, fragments);
delete msg ;
MsgFragments::iterator mit = fragments.begin();
for(; mit != fragments.end(); ++mit)
{
newTr->mItems.push_back(*mit);
msgSize++;
}
#else
MsgFragments fragments;
fragmentMsg(*msg, fragments);
msg->count = 1; // only one piece. This is to keep compatibility if we ever implement fragmenting in the future.
msg->pos = 0;
MsgFragments::iterator mit = fragments.begin();
for(; mit != fragments.end(); ++mit)
{
newTr->mItems.push_back(*mit);
msgSize++;
}
newTr->mItems.push_back(msg);
msgSize++;
#endif
}
}
if(newTr->mItems.empty()){
delete newTr;
return;
}
}
}
if(newTr->mItems.empty()){
delete newTr;
return;
}
// now if transaction is limited to an external group, encrypt it for members of the group.
@ -3553,6 +3657,10 @@ void RsGxsNetService::locked_pushGrpRespFromList(std::list<RsNxsItem*>& respList
trItem->timestamp = 0;
trItem->PeerId(peer);
trItem->transactionNumber = transN;
#ifdef NXS_NET_DEBUG_0
GXSNETDEBUG_P_ (peer) << "Setting tr->mTransaction->updateTS to " << mGrpServerUpdateItem->grpUpdateTS << std::endl;
#endif
trItem->updateTS = mGrpServerUpdateItem->grpUpdateTS;
// also make a copy for the resident transaction
tr->mTransaction = new RsNxsTransacItem(*trItem);
tr->mTransaction->PeerId(mOwnId);
@ -3921,13 +4029,20 @@ bool RsGxsNetService::locked_CanReceiveUpdate(const RsNxsSyncMsgReqItem *item)
}
void RsGxsNetService::handleRecvSyncMessage(RsNxsSyncMsgReqItem *item)
{
if (!item)
return;
if (!item)
return;
RS_STACK_MUTEX(mNxsMutex) ;
RS_STACK_MUTEX(mNxsMutex) ;
const RsPeerId& peer = item->PeerId();
// Insert the PeerId in suppliers list for this grpId
#ifdef NXS_NET_DEBUG_6
GXSNETDEBUG_PG(item->PeerId(),item->grpId) << "RsGxsNetService::handleRecvSyncMessage(): Inserting PeerId " << item->PeerId() << " in suppliers list for group " << item->grpId << std::endl;
#endif
RsGroupNetworkStatsRecord& rec(mGroupNetworkStats[item->grpId]) ; // this creates it if needed
rec.suppliers.insert(peer) ;
#ifdef NXS_NET_DEBUG_0
GXSNETDEBUG_PG(item->PeerId(),item->grpId) << "handleRecvSyncMsg(): Received last update TS of group " << item->grpId << ", for peer " << peer << ", TS = " << time(NULL) - item->updateTS << " secs ago." ;
#endif

View file

@ -378,7 +378,9 @@ private:
void locked_doMsgUpdateWork(const RsNxsTransacItem* nxsTrans, const RsGxsGroupId& grpId);
void updateServerSyncTS();
#ifdef TO_REMOVE
void updateClientSyncTS();
#endif
bool locked_CanReceiveUpdate(const RsNxsSyncGrpReqItem *item);
bool locked_CanReceiveUpdate(const RsNxsSyncMsgReqItem* item);
@ -433,7 +435,7 @@ private:
* @param fragments message fragments which are not necessarily from the same message
* @param partFragments the partitioned fragments (into message ids)
*/
void collateMsgFragments(MsgFragments fragments, std::map<RsGxsMessageId, MsgFragments>& partFragments) const;
void collateMsgFragments(MsgFragments &fragments, std::map<RsGxsMessageId, MsgFragments>& partFragments) const;
/*!
* Note that if all fragments for a group are not found then its fragments are dropped

View file

@ -28,6 +28,7 @@
#include "rsgxsutil.h"
#include "retroshare/rsgxsflags.h"
#include "retroshare/rspeers.h"
#include "retroshare/rsreputations.h"
#include "pqi/pqihash.h"
#include "gxs/rsgixs.h"
@ -161,7 +162,8 @@ bool RsGxsIntegrityCheck::check()
std::cerr << "TimeStamping group authors' key ID " << grp->metaData->mAuthorId << " in group ID " << grp->grpId << std::endl;
#endif
used_gxs_ids.insert(grp->metaData->mAuthorId) ;
if(rsReputations!=NULL && !rsReputations->isIdentityBanned(grp->metaData->mAuthorId))
used_gxs_ids.insert(grp->metaData->mAuthorId) ;
}
}
}
@ -242,7 +244,8 @@ bool RsGxsIntegrityCheck::check()
#ifdef GXSUTIL_DEBUG
std::cerr << "TimeStamping message authors' key ID " << msg->metaData->mAuthorId << " in message " << msg->msgId << ", group ID " << msg->grpId<< std::endl;
#endif
used_gxs_ids.insert(msg->metaData->mAuthorId) ;
if(rsReputations!=NULL && !rsReputations->isIdentityBanned(msg->metaData->mAuthorId))
used_gxs_ids.insert(msg->metaData->mAuthorId) ;
}
delete msg;

View file

@ -53,7 +53,8 @@ static const uint32_t RS_GXS_TUNNEL_DH_STATUS_UNINITIALIZED = 0x0000 ;
static const uint32_t RS_GXS_TUNNEL_DH_STATUS_HALF_KEY_DONE = 0x0001 ;
static const uint32_t RS_GXS_TUNNEL_DH_STATUS_KEY_AVAILABLE = 0x0002 ;
static const uint32_t RS_GXS_TUNNEL_DELAY_BETWEEN_RESEND = 10 ; // re-send every 10 secs.
static const uint32_t RS_GXS_TUNNEL_DELAY_BETWEEN_RESEND = 10 ; // re-send every 10 secs.
static const uint32_t RS_GXS_TUNNEL_DATA_PRINT_STORAGE_DELAY = 600 ; // store old message ids for 10 minutes.
static const uint32_t GXS_TUNNEL_ENCRYPTION_HMAC_SIZE = SHA_DIGEST_LENGTH ;
static const uint32_t GXS_TUNNEL_ENCRYPTION_IV_SIZE = 8 ;
@ -73,9 +74,6 @@ p3GxsTunnelService::p3GxsTunnelService(RsGixs *pids)
: mGixs(pids), mGxsTunnelMtx("GXS tunnel")
{
mTurtle = NULL ;
// any value is fine here, even 0, since items in different RS sessions will use different AES keys.
global_item_counter = 0;//RSRandom::random_u64() ;
}
void p3GxsTunnelService::connectToTurtleRouter(p3turtle *tr)
@ -103,7 +101,6 @@ bool p3GxsTunnelService::registerClientService(uint32_t service_id,RsGxsTunnelSe
int p3GxsTunnelService::tick()
{
static time_t last_dump = 0 ;
#ifdef DEBUG_GXS_TUNNEL
time_t now = time(NULL);
@ -196,7 +193,9 @@ void p3GxsTunnelService::flush()
for(std::map<RsGxsTunnelId,GxsTunnelPeerInfo>::iterator it(_gxs_tunnel_contacts.begin());it!=_gxs_tunnel_contacts.end();)
{
// Remove any tunnel that was remotely closed, since we cannot use it anymore.
// All sorts of cleaning. We start with the ones that may remove stuff, for efficiency reasons.
// 1 - Remove any tunnel that was remotely closed, since we cannot use it anymore.
if(it->second.status == RS_GXS_TUNNEL_STATUS_REMOTELY_CLOSED && it->second.last_contact + 20 < now)
{
@ -207,6 +206,8 @@ void p3GxsTunnelService::flush()
continue ;
}
// 2 - re-digg tunnels that have died out of inaction
if(it->second.last_contact+20+GXS_TUNNEL_KEEP_ALIVE_TIMEOUT < now && it->second.status == RS_GXS_TUNNEL_STATUS_CAN_TALK)
{
#ifdef DEBUG_GXS_TUNNEL
@ -228,6 +229,9 @@ void p3GxsTunnelService::flush()
mTurtle->forceReDiggTunnels( randomHashFromDestinationGxsId(it->second.to_gxs_id) );
}
}
// send keep alive packets to active tunnels.
if(it->second.last_keep_alive_sent + GXS_TUNNEL_KEEP_ALIVE_TIMEOUT < now && it->second.status == RS_GXS_TUNNEL_STATUS_CAN_TALK)
{
RsGxsTunnelStatusItem *cs = new RsGxsTunnelStatusItem ;
@ -244,6 +248,23 @@ void p3GxsTunnelService::flush()
std::cerr << "(II) GxsTunnelService:: Sending keep alive packet to gxs id " << it->first << std::endl;
#endif
}
// clean old received data prints.
for(std::map<uint64_t,time_t>::iterator it2=it->second.received_data_prints.begin();it2!=it->second.received_data_prints.end();)
if(now > it2->second + RS_GXS_TUNNEL_DATA_PRINT_STORAGE_DELAY)
{
#ifdef DEBUG_GXS_TUNNEL
std::cerr << "(II) erasing old data print for message #" << it2->first << " in tunnel " << it->first << std::endl;
#endif
std::map<uint64_t,time_t>::iterator tmp(it2) ;
++tmp ;
it->second.received_data_prints.erase(it2) ;
it2 = tmp ;
}
else
++it2 ;
++it ;
}
}
@ -347,6 +368,16 @@ void p3GxsTunnelService::handleRecvTunnelDataItem(const RsGxsTunnelId& tunnel_id
it2->second.client_services.insert(item->service_id) ;
peer_from = it2->second.to_gxs_id ;
}
// Check if the item has already been received. This is necessary because we actually re-send items until an ACK is received. If the ACK gets lost (connection interrupted) the
// item may be received twice. This is conservative and ensure that no item is lost nor received twice.
if(it2->second.received_data_prints.find(item->unique_item_counter) != it2->second.received_data_prints.end())
{
std::cerr << "(WW) received the same data item #" << std::hex << item->unique_item_counter << std::dec << " twice in last 20 mins. Tunnel id=" << tunnel_id << ". Probably a replay. Item will be dropped." << std::endl;
return ;
}
it2->second.received_data_prints[item->unique_item_counter] = time(NULL) ;
}
if(service->acceptDataFromPeer(peer_from,tunnel_id))
@ -1325,7 +1356,7 @@ bool p3GxsTunnelService::sendData(const RsGxsTunnelId &tunnel_id, uint32_t servi
RsGxsTunnelDataItem *item = new RsGxsTunnelDataItem ;
item->unique_item_counter = global_item_counter++; // this allows to make the item unique
item->unique_item_counter = RSRandom::random_u64(); // this allows to make the item unique, except very rarely, we we don't care.
item->flags = 0; // not used yet.
item->service_id = service_id;
item->data_size = size; // encrypted data size

View file

@ -171,6 +171,7 @@ private:
RsTurtleGenericTunnelItem::Direction direction ; // specifiec wether we are client(managing the tunnel) or server.
TurtleFileHash hash ; // hash that is last used. This is necessary for handling tunnel establishment
std::set<uint32_t> client_services ;// services that used this tunnel
std::map<uint64_t,time_t> received_data_prints ; // list of recently received messages, to avoid duplicates. Kept for 20 mins at most.
uint32_t total_sent ;
uint32_t total_received ;
};
@ -252,8 +253,6 @@ private:
RsGixs *mGixs ;
RsMutex mGxsTunnelMtx ;
uint64_t global_item_counter ;
std::map<uint32_t,RsGxsTunnelClientService*> mRegisteredServices ;
void debug_dump();

View file

@ -1212,7 +1212,7 @@ bool p3NetMgrIMPL::setExtAddress(const struct sockaddr_storage &addr)
bool changed = false;
{
RsStackMutex stack(mNetMtx); /****** STACK LOCK MUTEX *******/
if (sockaddr_storage_same(mExtAddr, addr))
if (!sockaddr_storage_same(mExtAddr, addr))
{
changed = true;
}

View file

@ -25,6 +25,7 @@
#ifdef WINDOWS_SYS
#include "util/rswin.h"
#include "util/rsmemory.h"
#include <ws2tcpip.h>
#endif // WINDOWS_SYS

View file

@ -434,9 +434,9 @@ int pqipersongrp::removePeer(const RsPeerId& id)
std::map<RsPeerId, SearchModule *>::iterator it;
#ifdef PGRP_DEBUG
#endif
std::cerr << "pqipersongrp::removePeer() id: " << id;
std::cerr << std::endl;
#endif
RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/

View file

@ -391,25 +391,26 @@ bool pqiSSLstore::encryptedSendItems(const std::list<RsItem*>& rsItemList)
std::list<RsItem*>::const_iterator it;
uint32_t sizeItems = 0, sizeItem = 0;
uint32_t offset = 0;
char* data = NULL;
for(it = rsItemList.begin(); it != rsItemList.end(); ++it)
sizeItems += rsSerialiser->size(*it);
if(*it != NULL)
sizeItems += rsSerialiser->size(*it);
data = new char[sizeItems];
RsTemporaryMemory data(sizeItems) ;
for(it = rsItemList.begin(); it != rsItemList.end(); ++it)
{
sizeItem = rsSerialiser->size(*it);
if(*it != NULL)
{
sizeItem = rsSerialiser->size(*it);
if(rsSerialiser->serialise(*it, (data+offset),&sizeItem))
offset += sizeItem;
else
std::cerr << "(EE) pqiSSLstore::encryptedSendItems(): One item did not serialize. The item is probably unknown from the serializer. Dropping the item. " << std::endl;
if(rsSerialiser->serialise(*it, &data[offset],&sizeItem))
offset += sizeItem;
else
std::cerr << "(EE) pqiSSLstore::encryptedSendItems(): One item did not serialize. The item is probably unknown from the serializer. Dropping the item. " << std::endl;
if (!(bio_flags & BIN_FLAGS_NO_DELETE))
delete *it;
}
if (!(bio_flags & BIN_FLAGS_NO_DELETE))
delete *it;
}
bool result = true;
@ -418,9 +419,6 @@ bool pqiSSLstore::encryptedSendItems(const std::list<RsItem*>& rsItemList)
else
result = false;
if(data != NULL)
delete[] data;
return result;
}

View file

@ -50,6 +50,7 @@ public:
time_t last_tunnel_attempt_time;
time_t last_sent_time;
bool receipt_available ;
uint32_t duplication_factor ;
uint32_t data_status ;
uint32_t tunnel_status ;
uint32_t data_size ;

View file

@ -1,7 +1,7 @@
#define RS_MAJOR_VERSION 0
#define RS_MINOR_VERSION 6
#define RS_BUILD_NUMBER 0
#define RS_BUILD_NUMBER_ADD "x" // <-- do we need this?
#define RS_BUILD_NUMBER_ADD ""
// The revision number should be the 4 first bytes of the git revision hash, which is obtained using:
// git log --pretty="%H" | head -1 | cut -c1-8

View file

@ -296,8 +296,17 @@ bool p3Peers::getPeerDetails(const RsPeerId& id, RsPeerDetails &d)
d.hiddenNodeAddress = ps.hiddenDomain;
d.hiddenNodePort = ps.hiddenPort;
d.hiddenType = ps.hiddenType;
d.localAddr = sockaddr_storage_iptostring(ps.localaddr);
d.localPort = sockaddr_storage_port(ps.localaddr);
if(sockaddr_storage_isnull(ps.localaddr)) // that happens if the address is not initialised.
{
d.localAddr = "INVALID_IP";
d.localPort = 0 ;
}
else
{
d.localAddr = sockaddr_storage_iptostring(ps.localaddr);
d.localPort = sockaddr_storage_port(ps.localaddr);
}
d.extAddr = "hidden";
d.extPort = 0;
d.dyndns = "";

View file

@ -1336,7 +1336,14 @@ int RsServer::StartupRetroShare()
false,false); // don't synchronise group automatic (need explicit group request)
// don't sync messages at all.
// Normally we wouldn't need this (we do in other service):
// mGxsIdService->setNetworkExchangeService(gxsid_ns) ;
// ...since GxsIds are propagated manually. But that requires the gen exchange of GXSids to
// constantly test that mNetService is not null. The call below is to make the service aware of the
// netService so that it can request the missing ids. We'll need to fix this.
mGxsIdService->setNes(gxsid_ns);
/**** GxsCircle service ****/
// create GXS Circle service

View file

@ -275,7 +275,7 @@ bool RsLoginHandler::tryAutoLogin(const RsPeerId& ssl_id,std::string& ssl_passwd
fseek(fp, 0, SEEK_SET);
dataptr = (char *) rs_malloc(datalen);
if(data_ptr == NULL)
if(dataptr == NULL)
{
fclose(fp);
return false;

View file

@ -354,7 +354,6 @@ int p3MsgService::checkOutgoingMessages()
* if online, send
*/
time_t now = time(NULL);
bool changed = false ;
std::list<RsMsgItem*> output_queue ;
@ -1853,7 +1852,7 @@ void p3MsgService::manageDistantPeers()
}
}
void p3MsgService::notifyDataStatus(const GRouterMsgPropagationId& id,uint32_t data_status)
void p3MsgService::notifyDataStatus(const GRouterMsgPropagationId& id, const RsGxsId &signer_id, uint32_t data_status)
{
if(data_status == GROUTER_CLIENT_SERVICE_DATA_STATUS_FAILED)
{
@ -1861,7 +1860,7 @@ void p3MsgService::notifyDataStatus(const GRouterMsgPropagationId& id,uint32_t d
std::cerr << "(WW) p3MsgService::notifyDataStatus: Global router tells us that item ID " << id << " could not be delivered on time." ;
std::map<GRouterMsgPropagationId,uint32_t>::iterator it = _ongoing_messages.find(id) ;
if(it == _ongoing_messages.end())
{
std::cerr << " (EE) cannot find pending message to acknowledge. Weird. grouter id = " << id << std::endl;
@ -1869,6 +1868,7 @@ void p3MsgService::notifyDataStatus(const GRouterMsgPropagationId& id,uint32_t d
}
uint32_t msg_id = it->second ;
std::cerr << " message id = " << msg_id << std::endl;
mDistantOutgoingMsgSigners[msg_id] = signer_id ; // this is needed because it's not saved in config, but we should probably include it in _ongoing_messages
std::map<uint32_t,RsMsgItem*>::iterator mit = msgOutgoing.find(msg_id) ;
@ -1992,7 +1992,15 @@ void p3MsgService::sendDistantMsgItem(RsMsgItem *msgitem)
{
RS_STACK_MUTEX(mMsgMtx) ;
signing_key_id = mDistantOutgoingMsgSigners[msgitem->msgId] ;
std::map<uint32_t,RsGxsId>::const_iterator it = mDistantOutgoingMsgSigners.find(msgitem->msgId) ;
if(it == mDistantOutgoingMsgSigners.end())
{
std::cerr << "(EE) no signer registered for distant message " << msgitem->msgId << ". Cannot send!" << std::endl;
return ;
}
signing_key_id = it->second ;
if(signing_key_id.isNull())
{
@ -2033,6 +2041,7 @@ void p3MsgService::sendDistantMsgItem(RsMsgItem *msgitem)
RS_STACK_MUTEX(mMsgMtx) ;
_ongoing_messages[grouter_message_id] = msgitem->msgId ;
}
IndicateConfigChanged(); // save _ongoing_messages
}

View file

@ -142,7 +142,7 @@ private:
virtual bool acceptDataFromPeer(const RsGxsId& gxs_id) ;
virtual void receiveGRouterData(const RsGxsId& destination_key,const RsGxsId& signing_key, GRouterServiceId &client_id, uint8_t *data, uint32_t data_size) ;
virtual void notifyDataStatus(const GRouterMsgPropagationId& msg_id,uint32_t data_status) ;
virtual void notifyDataStatus(const GRouterMsgPropagationId& msg_id,const RsGxsId& signer_id,uint32_t data_status) ;
// Utility functions

View file

@ -190,8 +190,6 @@ void p3rtt::sendPingMeasurements()
mServiceCtrl->getPeersConnected(getServiceInfo().mServiceType, idList);
double ts = getCurrentTS();
#ifdef DEBUG_RTT
std::cerr << "p3rtt::sendPingMeasurements() @ts: " << ts;
std::cerr << std::endl;
@ -205,7 +203,8 @@ void p3rtt::sendPingMeasurements()
std::cerr << "p3rtt::sendPingMeasurements() Pinging: " << *it;
std::cerr << std::endl;
#endif
double ts = getCurrentTS();
/* create the packet */
RsRttPingItem *pingPkt = new RsRttPingItem();
pingPkt->PeerId(*it);

View file

@ -1872,6 +1872,7 @@ void p3turtle::monitorTunnels(const RsFileHash& hash,RsTurtleClientService *clie
// No tunnels at start, but this triggers digging new tunnels.
//
_incoming_file_hashes[hash].tunnels.clear();
_incoming_file_hashes[hash].use_aggressive_mode = allow_multi_tunnels ;
// also should send associated request to the file transfer module.
_incoming_file_hashes[hash].last_digg_time = RSRandom::random_u32()%10 ;