mirror of
https://github.com/RetroShare/RetroShare.git
synced 2024-12-29 01:16:20 -05:00
added regular timestamp-ing of GXS ids of group authors and group post authors for all subscribed groups
This commit is contained in:
parent
3f132f2c33
commit
8d886b8ecc
@ -77,10 +77,10 @@ RsGenExchange::RsGenExchange(RsGeneralDataService *gds, RsNetworkExchangeService
|
|||||||
mAuthenPolicy(authenPolicy),
|
mAuthenPolicy(authenPolicy),
|
||||||
MESSAGE_STORE_PERIOD(messageStorePeriod),
|
MESSAGE_STORE_PERIOD(messageStorePeriod),
|
||||||
mCleaning(false),
|
mCleaning(false),
|
||||||
mLastClean(time(NULL)),
|
mLastClean(RSRandom::random_u32() % INTEGRITY_CHECK_PERIOD), // this helps unsynchronising the checks for the different services
|
||||||
mMsgCleanUp(NULL),
|
mMsgCleanUp(NULL),
|
||||||
mChecking(false),
|
mChecking(false),
|
||||||
mLastCheck(time(NULL)),
|
mLastCheck(RSRandom::random_u32() % INTEGRITY_CHECK_PERIOD), // this helps unsynchronising the checks for the different services
|
||||||
mIntegrityCheck(NULL),
|
mIntegrityCheck(NULL),
|
||||||
CREATE_FAIL(0),
|
CREATE_FAIL(0),
|
||||||
CREATE_SUCCESS(1),
|
CREATE_SUCCESS(1),
|
||||||
@ -252,7 +252,7 @@ void RsGenExchange::tick()
|
|||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
mIntegrityCheck = new RsGxsIntegrityCheck(mDataStore);
|
mIntegrityCheck = new RsGxsIntegrityCheck(mDataStore,mGixs);
|
||||||
mIntegrityCheck->start();
|
mIntegrityCheck->start();
|
||||||
mChecking = true;
|
mChecking = true;
|
||||||
}
|
}
|
||||||
@ -499,13 +499,11 @@ int RsGenExchange::createGroupSignatures(RsTlvKeySignatureSet& signSet, RsTlvBin
|
|||||||
authorKey, sign))
|
authorKey, sign))
|
||||||
{
|
{
|
||||||
id_ret = SIGN_SUCCESS;
|
id_ret = SIGN_SUCCESS;
|
||||||
|
mGixs->timeStampKey(grpMeta.mAuthorId) ;
|
||||||
|
signSet.keySignSet[INDEX_AUTHEN_IDENTITY] = sign;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
|
||||||
id_ret = SIGN_FAIL;
|
id_ret = SIGN_FAIL;
|
||||||
}
|
|
||||||
|
|
||||||
signSet.keySignSet[INDEX_AUTHEN_IDENTITY] = sign;
|
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
@ -206,7 +206,7 @@
|
|||||||
NXS_NET_DEBUG_3 publish key exchange
|
NXS_NET_DEBUG_3 publish key exchange
|
||||||
NXS_NET_DEBUG_4 vetting
|
NXS_NET_DEBUG_4 vetting
|
||||||
***/
|
***/
|
||||||
#define NXS_NET_DEBUG_0 1
|
//#define NXS_NET_DEBUG_0 1
|
||||||
//#define NXS_NET_DEBUG_1 1
|
//#define NXS_NET_DEBUG_1 1
|
||||||
//#define NXS_NET_DEBUG_2 1
|
//#define NXS_NET_DEBUG_2 1
|
||||||
//#define NXS_NET_DEBUG_3 1
|
//#define NXS_NET_DEBUG_3 1
|
||||||
@ -232,8 +232,9 @@
|
|||||||
#if defined(NXS_NET_DEBUG_0) || defined(NXS_NET_DEBUG_1) || defined(NXS_NET_DEBUG_2) || defined(NXS_NET_DEBUG_3) || defined(NXS_NET_DEBUG_4)
|
#if defined(NXS_NET_DEBUG_0) || defined(NXS_NET_DEBUG_1) || defined(NXS_NET_DEBUG_2) || defined(NXS_NET_DEBUG_3) || defined(NXS_NET_DEBUG_4)
|
||||||
|
|
||||||
//static const RsPeerId peer_to_print = RsPeerId(std::string("6718ae182d97c23af203959e678b98ac")) ; // use this to limit print to this peer id only, or "" for all IDs
|
//static const RsPeerId peer_to_print = RsPeerId(std::string("6718ae182d97c23af203959e678b98ac")) ; // use this to limit print to this peer id only, or "" for all IDs
|
||||||
static const RsPeerId peer_to_print = RsPeerId(std::string("")) ; // use this to limit print to this peer id only, or "" for all IDs
|
//static const RsPeerId peer_to_print = RsPeerId(std::string("a97fef0e2dc82ddb19200fb30f9ac575")) ; // use this to limit print to this peer id only, or "" for all IDs
|
||||||
static const RsGxsGroupId group_id_to_print = RsGxsGroupId(std::string("8ad26b347697d7b443ef872ba3a8ea4b")) ; // use this to allow to this group id only, or "" for all IDs
|
static const RsPeerId peer_to_print ;
|
||||||
|
static const RsGxsGroupId group_id_to_print = RsGxsGroupId(std::string("78a7480e7af4ae12303ec7fef2736745" )) ; // use this to allow to this group id only, or "" for all IDs
|
||||||
static const uint32_t service_to_print = 0x0217 ; // use this to allow to this service id only, or 0 for all services
|
static const uint32_t service_to_print = 0x0217 ; // use this to allow to this service id only, or 0 for all services
|
||||||
// warning. Numbers should be SERVICE IDS (see serialiser/rsserviceids.h)
|
// warning. Numbers should be SERVICE IDS (see serialiser/rsserviceids.h)
|
||||||
|
|
||||||
@ -326,98 +327,98 @@ int RsGxsNetService::tick()
|
|||||||
class NxsBandwidthRecorder
|
class NxsBandwidthRecorder
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
static const int OUTQUEUE_CUTOFF_VALUE = 500 ;
|
static const int OUTQUEUE_CUTOFF_VALUE = 500 ;
|
||||||
static const int BANDWIDTH_ESTIMATE_DELAY = 20 ;
|
static const int BANDWIDTH_ESTIMATE_DELAY = 20 ;
|
||||||
|
|
||||||
static void recordEvent(uint16_t service_type, RsItem *item)
|
static void recordEvent(uint16_t service_type, RsItem *item)
|
||||||
{
|
{
|
||||||
RS_STACK_MUTEX(mtx) ;
|
RS_STACK_MUTEX(mtx) ;
|
||||||
|
|
||||||
uint32_t bw = RsNxsSerialiser(service_type).size(item) ; // this is used to estimate bandwidth.
|
uint32_t bw = RsNxsSerialiser(service_type).size(item) ; // this is used to estimate bandwidth.
|
||||||
timeval tv ;
|
timeval tv ;
|
||||||
gettimeofday(&tv,NULL) ;
|
gettimeofday(&tv,NULL) ;
|
||||||
|
|
||||||
// compute time(NULL) in msecs, for a more accurate bw estimate.
|
// compute time(NULL) in msecs, for a more accurate bw estimate.
|
||||||
|
|
||||||
uint64_t now = (uint64_t) tv.tv_sec * 1000 + tv.tv_usec/1000 ;
|
uint64_t now = (uint64_t) tv.tv_sec * 1000 + tv.tv_usec/1000 ;
|
||||||
|
|
||||||
total_record += bw ;
|
total_record += bw ;
|
||||||
++total_events ;
|
++total_events ;
|
||||||
|
|
||||||
#ifdef NXS_NET_DEBUG_2
|
#ifdef NXS_NET_DEBUG_2
|
||||||
std::cerr << "bandwidthRecorder::recordEvent() Recording event time=" << now << ". bw=" << bw << std::endl;
|
std::cerr << "bandwidthRecorder::recordEvent() Recording event time=" << now << ". bw=" << bw << std::endl;
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
// Every 20 seconds at min, compute a new estimate of the required bandwidth.
|
// Every 20 seconds at min, compute a new estimate of the required bandwidth.
|
||||||
|
|
||||||
if(now > last_event_record + BANDWIDTH_ESTIMATE_DELAY*1000)
|
if(now > last_event_record + BANDWIDTH_ESTIMATE_DELAY*1000)
|
||||||
{
|
{
|
||||||
// Compute the bandwidth using recorded times, in msecs
|
// Compute the bandwidth using recorded times, in msecs
|
||||||
float speed = total_record/1024.0f/(now - last_event_record)*1000.0f ;
|
float speed = total_record/1024.0f/(now - last_event_record)*1000.0f ;
|
||||||
|
|
||||||
// Apply a small temporal convolution.
|
// Apply a small temporal convolution.
|
||||||
estimated_required_bandwidth = 0.75*estimated_required_bandwidth + 0.25 * speed ;
|
estimated_required_bandwidth = 0.75*estimated_required_bandwidth + 0.25 * speed ;
|
||||||
|
|
||||||
#ifdef NXS_NET_DEBUG_2
|
#ifdef NXS_NET_DEBUG_2
|
||||||
std::cerr << std::dec << " " << total_record << " Bytes (" << total_events << " items)"
|
std::cerr << std::dec << " " << total_record << " Bytes (" << total_events << " items)"
|
||||||
<< " received in " << now - last_event_record << " seconds. Speed: " << speed << " KBytes/sec" << std::endl;
|
<< " received in " << now - last_event_record << " seconds. Speed: " << speed << " KBytes/sec" << std::endl;
|
||||||
std::cerr << " instantaneous speed = " << speed << " KB/s" << std::endl;
|
std::cerr << " instantaneous speed = " << speed << " KB/s" << std::endl;
|
||||||
std::cerr << " cumulated estimated = " << estimated_required_bandwidth << " KB/s" << std::endl;
|
std::cerr << " cumulated estimated = " << estimated_required_bandwidth << " KB/s" << std::endl;
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
last_event_record = now ;
|
last_event_record = now ;
|
||||||
total_record = 0 ;
|
total_record = 0 ;
|
||||||
total_events = 0 ;
|
total_events = 0 ;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Estimate the probability of sending an item so that the expected bandwidth matches the residual bandwidth
|
// Estimate the probability of sending an item so that the expected bandwidth matches the residual bandwidth
|
||||||
|
|
||||||
static float computeCurrentSendingProbability()
|
static float computeCurrentSendingProbability()
|
||||||
{
|
{
|
||||||
int maxIn=50,maxOut=50;
|
int maxIn=50,maxOut=50;
|
||||||
float currIn=0,currOut=0 ;
|
float currIn=0,currOut=0 ;
|
||||||
|
|
||||||
rsConfig->GetMaxDataRates(maxIn,maxOut) ;
|
rsConfig->GetMaxDataRates(maxIn,maxOut) ;
|
||||||
rsConfig->GetCurrentDataRates(currIn,currOut) ;
|
rsConfig->GetCurrentDataRates(currIn,currOut) ;
|
||||||
|
|
||||||
RsConfigDataRates rates ;
|
RsConfigDataRates rates ;
|
||||||
rsConfig->getTotalBandwidthRates(rates) ;
|
rsConfig->getTotalBandwidthRates(rates) ;
|
||||||
|
|
||||||
#ifdef NXS_NET_DEBUG_2
|
#ifdef NXS_NET_DEBUG_2
|
||||||
std::cerr << std::dec << std::endl;
|
std::cerr << std::dec << std::endl;
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
float outqueue_factor = 1.0f/pow( std::max(0.02f,rates.mQueueOut / (float)OUTQUEUE_CUTOFF_VALUE),5.0f) ;
|
float outqueue_factor = 1.0f/pow( std::max(0.02f,rates.mQueueOut / (float)OUTQUEUE_CUTOFF_VALUE),5.0f) ;
|
||||||
float accepted_bandwidth = std::max( 0.0f, maxOut - currOut) ;
|
float accepted_bandwidth = std::max( 0.0f, maxOut - currOut) ;
|
||||||
float max_bandwidth_factor = std::min( accepted_bandwidth / estimated_required_bandwidth,1.0f ) ;
|
float max_bandwidth_factor = std::min( accepted_bandwidth / estimated_required_bandwidth,1.0f ) ;
|
||||||
|
|
||||||
// We account for two things here:
|
// We account for two things here:
|
||||||
// 1 - the required max bandwidth
|
// 1 - the required max bandwidth
|
||||||
// 2 - the current network overload, measured from the size of the outqueues.
|
// 2 - the current network overload, measured from the size of the outqueues.
|
||||||
//
|
//
|
||||||
// Only the later can limit the traffic if the internet connexion speed is responsible for outqueue overloading.
|
// Only the later can limit the traffic if the internet connexion speed is responsible for outqueue overloading.
|
||||||
|
|
||||||
float sending_probability = std::min(outqueue_factor,max_bandwidth_factor) ;
|
float sending_probability = std::min(outqueue_factor,max_bandwidth_factor) ;
|
||||||
|
|
||||||
#ifdef NXS_NET_DEBUG_2
|
#ifdef NXS_NET_DEBUG_2
|
||||||
std::cerr << "bandwidthRecorder::computeCurrentSendingProbability()" << std::endl;
|
std::cerr << "bandwidthRecorder::computeCurrentSendingProbability()" << std::endl;
|
||||||
std::cerr << " current required bandwidth : " << estimated_required_bandwidth << " KB/s" << std::endl;
|
std::cerr << " current required bandwidth : " << estimated_required_bandwidth << " KB/s" << std::endl;
|
||||||
std::cerr << " max_bandwidth_factor : " << max_bandwidth_factor << std::endl;
|
std::cerr << " max_bandwidth_factor : " << max_bandwidth_factor << std::endl;
|
||||||
std::cerr << " outqueue size : " << rates.mQueueOut << ", factor=" << outqueue_factor << std::endl;
|
std::cerr << " outqueue size : " << rates.mQueueOut << ", factor=" << outqueue_factor << std::endl;
|
||||||
std::cerr << " max out : " << maxOut << ", currOut=" << currOut << std::endl;
|
std::cerr << " max out : " << maxOut << ", currOut=" << currOut << std::endl;
|
||||||
std::cerr << " computed probability : " << sending_probability << std::endl;
|
std::cerr << " computed probability : " << sending_probability << std::endl;
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
return sending_probability ;
|
return sending_probability ;
|
||||||
}
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
static RsMutex mtx;
|
static RsMutex mtx;
|
||||||
static uint64_t last_event_record ;
|
static uint64_t last_event_record ;
|
||||||
static float estimated_required_bandwidth ;
|
static float estimated_required_bandwidth ;
|
||||||
static uint32_t total_events ;
|
static uint32_t total_events ;
|
||||||
static uint64_t total_record ;
|
static uint64_t total_record ;
|
||||||
};
|
};
|
||||||
|
|
||||||
uint32_t NxsBandwidthRecorder::total_events =0 ; // total number of events. Not used.
|
uint32_t NxsBandwidthRecorder::total_events =0 ; // total number of events. Not used.
|
||||||
@ -1394,6 +1395,7 @@ void RsGxsNetService::data_tick()
|
|||||||
processExplicitGroupRequests();
|
processExplicitGroupRequests();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if defined(NXS_NET_DEBUG_0) || defined(NXS_NET_DEBUG_1) || defined(NXS_NET_DEBUG_2) || defined(NXS_NET_DEBUG_3) || defined(NXS_NET_DEBUG_4)
|
||||||
static std::string nice_time_stamp(time_t now,time_t TS)
|
static std::string nice_time_stamp(time_t now,time_t TS)
|
||||||
{
|
{
|
||||||
if(TS == 0)
|
if(TS == 0)
|
||||||
@ -1405,9 +1407,11 @@ static std::string nice_time_stamp(time_t now,time_t TS)
|
|||||||
return s.str() ;
|
return s.str() ;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
void RsGxsNetService::debugDump()
|
void RsGxsNetService::debugDump()
|
||||||
{
|
{
|
||||||
|
#ifdef NXS_NET_DEBUG_1
|
||||||
RS_STACK_MUTEX(mNxsMutex) ;
|
RS_STACK_MUTEX(mNxsMutex) ;
|
||||||
time_t now = time(NULL) ;
|
time_t now = time(NULL) ;
|
||||||
|
|
||||||
@ -1437,6 +1441,7 @@ void RsGxsNetService::debugDump()
|
|||||||
for(std::map<RsGxsGroupId, RsGxsMsgUpdateItem::MsgUpdateInfo>::const_iterator it2(it->second->msgUpdateInfos.begin());it2!=it->second->msgUpdateInfos.end();++it2)
|
for(std::map<RsGxsGroupId, RsGxsMsgUpdateItem::MsgUpdateInfo>::const_iterator it2(it->second->msgUpdateInfos.begin());it2!=it->second->msgUpdateInfos.end();++it2)
|
||||||
GXSNETDEBUG_PG(it->first,it2->first) << " group " << it2->first << " - last updated at peer (secs ago): " << nice_time_stamp(now,it2->second.time_stamp) << ". Message count=" << it2->second.message_count << std::endl;
|
GXSNETDEBUG_PG(it->first,it2->first) << " group " << it2->first << " - last updated at peer (secs ago): " << nice_time_stamp(now,it2->second.time_stamp) << ". Message count=" << it2->second.message_count << std::endl;
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
// This method is normally not needed, but we use it to correct possible inconsistencies in the updte time stamps
|
// This method is normally not needed, but we use it to correct possible inconsistencies in the updte time stamps
|
||||||
@ -1878,14 +1883,14 @@ void RsGxsNetService::locked_processCompletedIncomingTrans(NxsTransaction* tr)
|
|||||||
#endif
|
#endif
|
||||||
if(tr->mFlag & NxsTransaction::FLAG_STATE_COMPLETED)
|
if(tr->mFlag & NxsTransaction::FLAG_STATE_COMPLETED)
|
||||||
{
|
{
|
||||||
#ifdef NXS_NET_DEBUG_1
|
#ifdef NXS_NET_DEBUG_0
|
||||||
GXSNETDEBUG_P_(tr->mTransaction->PeerId()) << " transaction has completed." << std::endl;
|
GXSNETDEBUG_P_(tr->mTransaction->PeerId()) << " transaction has completed." << std::endl;
|
||||||
#endif
|
#endif
|
||||||
// for a completed list response transaction
|
// for a completed list response transaction
|
||||||
// one needs generate requests from this
|
// one needs generate requests from this
|
||||||
if(flag & RsNxsTransac::FLAG_TYPE_MSG_LIST_RESP)
|
if(flag & RsNxsTransac::FLAG_TYPE_MSG_LIST_RESP)
|
||||||
{
|
{
|
||||||
#ifdef NXS_NET_DEBUG_1
|
#ifdef NXS_NET_DEBUG_0
|
||||||
GXSNETDEBUG_P_(tr->mTransaction->PeerId()) << " type = msg list response." << std::endl;
|
GXSNETDEBUG_P_(tr->mTransaction->PeerId()) << " type = msg list response." << std::endl;
|
||||||
GXSNETDEBUG_P_(tr->mTransaction->PeerId()) << " => generate msg request based on it." << std::endl;
|
GXSNETDEBUG_P_(tr->mTransaction->PeerId()) << " => generate msg request based on it." << std::endl;
|
||||||
#endif
|
#endif
|
||||||
@ -1894,7 +1899,7 @@ void RsGxsNetService::locked_processCompletedIncomingTrans(NxsTransaction* tr)
|
|||||||
|
|
||||||
}else if(flag & RsNxsTransac::FLAG_TYPE_GRP_LIST_RESP)
|
}else if(flag & RsNxsTransac::FLAG_TYPE_GRP_LIST_RESP)
|
||||||
{
|
{
|
||||||
#ifdef NXS_NET_DEBUG_1
|
#ifdef NXS_NET_DEBUG_0
|
||||||
GXSNETDEBUG_P_(tr->mTransaction->PeerId()) << " type = grp list response." << std::endl;
|
GXSNETDEBUG_P_(tr->mTransaction->PeerId()) << " type = grp list response." << std::endl;
|
||||||
GXSNETDEBUG_P_(tr->mTransaction->PeerId()) << " => generate group transaction request based on it." << std::endl;
|
GXSNETDEBUG_P_(tr->mTransaction->PeerId()) << " => generate group transaction request based on it." << std::endl;
|
||||||
#endif
|
#endif
|
||||||
@ -1903,7 +1908,7 @@ void RsGxsNetService::locked_processCompletedIncomingTrans(NxsTransaction* tr)
|
|||||||
// you've finished receiving request information now gen
|
// you've finished receiving request information now gen
|
||||||
else if(flag & RsNxsTransac::FLAG_TYPE_MSG_LIST_REQ)
|
else if(flag & RsNxsTransac::FLAG_TYPE_MSG_LIST_REQ)
|
||||||
{
|
{
|
||||||
#ifdef NXS_NET_DEBUG_1
|
#ifdef NXS_NET_DEBUG_0
|
||||||
GXSNETDEBUG_P_(tr->mTransaction->PeerId()) << " type = msg list request." << std::endl;
|
GXSNETDEBUG_P_(tr->mTransaction->PeerId()) << " type = msg list request." << std::endl;
|
||||||
GXSNETDEBUG_P_(tr->mTransaction->PeerId()) << " => generate msg list based on it." << std::endl;
|
GXSNETDEBUG_P_(tr->mTransaction->PeerId()) << " => generate msg list based on it." << std::endl;
|
||||||
#endif
|
#endif
|
||||||
@ -1911,7 +1916,7 @@ void RsGxsNetService::locked_processCompletedIncomingTrans(NxsTransaction* tr)
|
|||||||
}
|
}
|
||||||
else if(flag & RsNxsTransac::FLAG_TYPE_GRP_LIST_REQ)
|
else if(flag & RsNxsTransac::FLAG_TYPE_GRP_LIST_REQ)
|
||||||
{
|
{
|
||||||
#ifdef NXS_NET_DEBUG_1
|
#ifdef NXS_NET_DEBUG_0
|
||||||
GXSNETDEBUG_P_(tr->mTransaction->PeerId()) << " type = grp list request." << std::endl;
|
GXSNETDEBUG_P_(tr->mTransaction->PeerId()) << " type = grp list request." << std::endl;
|
||||||
GXSNETDEBUG_P_(tr->mTransaction->PeerId()) << " => generate grp list based on it." << std::endl;
|
GXSNETDEBUG_P_(tr->mTransaction->PeerId()) << " => generate grp list based on it." << std::endl;
|
||||||
#endif
|
#endif
|
||||||
@ -1919,7 +1924,7 @@ void RsGxsNetService::locked_processCompletedIncomingTrans(NxsTransaction* tr)
|
|||||||
}
|
}
|
||||||
else if(flag & RsNxsTransac::FLAG_TYPE_GRPS)
|
else if(flag & RsNxsTransac::FLAG_TYPE_GRPS)
|
||||||
{
|
{
|
||||||
#ifdef NXS_NET_DEBUG_1
|
#ifdef NXS_NET_DEBUG_0
|
||||||
GXSNETDEBUG_P_(tr->mTransaction->PeerId()) << " type = groups." << std::endl;
|
GXSNETDEBUG_P_(tr->mTransaction->PeerId()) << " type = groups." << std::endl;
|
||||||
#endif
|
#endif
|
||||||
std::vector<RsNxsGrp*> grps;
|
std::vector<RsNxsGrp*> grps;
|
||||||
@ -1932,16 +1937,16 @@ void RsGxsNetService::locked_processCompletedIncomingTrans(NxsTransaction* tr)
|
|||||||
{
|
{
|
||||||
tr->mItems.pop_front();
|
tr->mItems.pop_front();
|
||||||
grps.push_back(grp);
|
grps.push_back(grp);
|
||||||
#ifdef NXS_NET_DEBUG_1
|
#ifdef NXS_NET_DEBUG_0
|
||||||
GXSNETDEBUG_PG(tr->mTransaction->PeerId(),grp->grpId) << " pushing new group " << grp->grpId << " to list." << std::endl;
|
GXSNETDEBUG_PG(tr->mTransaction->PeerId(),grp->grpId) << " adding new group " << grp->grpId << " to incoming list!" << std::endl;
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
std::cerr << " /!\\ item did not caste to grp" << std::endl;
|
std::cerr << " /!\\ item did not caste to grp" << std::endl;
|
||||||
}
|
}
|
||||||
|
|
||||||
#ifdef NXS_NET_DEBUG_1
|
#ifdef NXS_NET_DEBUG_0
|
||||||
GXSNETDEBUG_P_(tr->mTransaction->PeerId()) << " notifying observer " << std::endl;
|
GXSNETDEBUG_P_(tr->mTransaction->PeerId()) << " ...and notifying observer " << std::endl;
|
||||||
#endif
|
#endif
|
||||||
// notify listener of grps
|
// notify listener of grps
|
||||||
mObserver->notifyNewGroups(grps);
|
mObserver->notifyNewGroups(grps);
|
||||||
@ -1962,6 +1967,9 @@ void RsGxsNetService::locked_processCompletedIncomingTrans(NxsTransaction* tr)
|
|||||||
item = new RsGxsGrpUpdateItem(mServType);
|
item = new RsGxsGrpUpdateItem(mServType);
|
||||||
mClientGrpUpdateMap.insert(std::make_pair(peerFrom, item));
|
mClientGrpUpdateMap.insert(std::make_pair(peerFrom, item));
|
||||||
}
|
}
|
||||||
|
#ifdef NXS_NET_DEBUG_0
|
||||||
|
GXSNETDEBUG_P_(tr->mTransaction->PeerId()) << " and updating mClientGrpUpdateMap for peer " << peerFrom << " of new time stamp " << nice_time_stamp(time(NULL),updateTS) << std::endl;
|
||||||
|
#endif
|
||||||
|
|
||||||
#warning should not we conservatively use the most recent one, in case the peer has reset its mServerGrpUpdate time?? What happens if the peer unsubscribed a recent group?
|
#warning should not we conservatively use the most recent one, in case the peer has reset its mServerGrpUpdate time?? What happens if the peer unsubscribed a recent group?
|
||||||
item->grpUpdateTS = updateTS;
|
item->grpUpdateTS = updateTS;
|
||||||
@ -1974,7 +1982,7 @@ void RsGxsNetService::locked_processCompletedIncomingTrans(NxsTransaction* tr)
|
|||||||
{
|
{
|
||||||
|
|
||||||
std::vector<RsNxsMsg*> msgs;
|
std::vector<RsNxsMsg*> msgs;
|
||||||
#ifdef NXS_NET_DEBUG_1
|
#ifdef NXS_NET_DEBUG_0
|
||||||
GXSNETDEBUG_P_(tr->mTransaction->PeerId()) << " type = msgs." << std::endl;
|
GXSNETDEBUG_P_(tr->mTransaction->PeerId()) << " type = msgs." << std::endl;
|
||||||
#endif
|
#endif
|
||||||
RsGxsGroupId grpId;
|
RsGxsGroupId grpId;
|
||||||
@ -1988,8 +1996,8 @@ void RsGxsNetService::locked_processCompletedIncomingTrans(NxsTransaction* tr)
|
|||||||
|
|
||||||
tr->mItems.pop_front();
|
tr->mItems.pop_front();
|
||||||
msgs.push_back(msg);
|
msgs.push_back(msg);
|
||||||
#ifdef NXS_NET_DEBUG_1
|
#ifdef NXS_NET_DEBUG_0
|
||||||
GXSNETDEBUG_PG(tr->mTransaction->PeerId(),msg->grpId) << " pushing grpId="<< msg->grpId << ", msgsId=" << msg->msgId << std::endl;
|
GXSNETDEBUG_PG(tr->mTransaction->PeerId(),msg->grpId) << " pushing grpId="<< msg->grpId << ", msgsId=" << msg->msgId << " to list of incoming messages" << std::endl;
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
@ -2013,7 +2021,7 @@ void RsGxsNetService::locked_processCompletedIncomingTrans(NxsTransaction* tr)
|
|||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
#ifdef NXS_NET_DEBUG_0
|
#ifdef NXS_NET_DEBUG_0
|
||||||
GXSNETDEBUG_P_(tr->mTransaction->PeerId()) << " notifying observer of " << msgs.size() << " new messages." << std::endl;
|
GXSNETDEBUG_PG(tr->mTransaction->PeerId(),grpId) << " ...and notifying observer of " << msgs.size() << " new messages." << std::endl;
|
||||||
#endif
|
#endif
|
||||||
// notify listener of msgs
|
// notify listener of msgs
|
||||||
mObserver->notifyNewMessages(msgs);
|
mObserver->notifyNewMessages(msgs);
|
||||||
@ -2329,7 +2337,9 @@ void RsGxsNetService::locked_genReqMsgTransaction(NxsTransaction* tr)
|
|||||||
// grp meta must be present if author present
|
// grp meta must be present if author present
|
||||||
if(!noAuthor && grpMeta == NULL)
|
if(!noAuthor && grpMeta == NULL)
|
||||||
{
|
{
|
||||||
|
#ifdef NXS_NET_DEBUG_1
|
||||||
GXSNETDEBUG_PG(item->PeerId(),grpId) << ", no group meta found. Givign up." << std::endl;
|
GXSNETDEBUG_PG(item->PeerId(),grpId) << ", no group meta found. Givign up." << std::endl;
|
||||||
|
#endif
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -3208,9 +3218,11 @@ bool RsGxsNetService::checkCanRecvMsgFromPeer(const RsPeerId& sslId, const RsGxs
|
|||||||
const RsGxsCircleId& circleId = grpMeta.mCircleId;
|
const RsGxsCircleId& circleId = grpMeta.mCircleId;
|
||||||
if(circleId.isNull())
|
if(circleId.isNull())
|
||||||
{
|
{
|
||||||
|
#ifdef NXS_NET_DEBUG_0
|
||||||
GXSNETDEBUG_PG(sslId,grpMeta.mGroupId) << " ERROR; EXTERNAL_CIRCLE missing NULL CircleId";
|
GXSNETDEBUG_PG(sslId,grpMeta.mGroupId) << " ERROR; EXTERNAL_CIRCLE missing NULL CircleId";
|
||||||
GXSNETDEBUG_PG(sslId,grpMeta.mGroupId) << grpMeta.mGroupId;
|
GXSNETDEBUG_PG(sslId,grpMeta.mGroupId) << grpMeta.mGroupId;
|
||||||
GXSNETDEBUG_PG(sslId,grpMeta.mGroupId) << std::endl;
|
GXSNETDEBUG_PG(sslId,grpMeta.mGroupId) << std::endl;
|
||||||
|
#endif
|
||||||
|
|
||||||
// should just be shared. ? no - this happens for
|
// should just be shared. ? no - this happens for
|
||||||
// Circle Groups which lose their CircleIds.
|
// Circle Groups which lose their CircleIds.
|
||||||
@ -3650,7 +3662,9 @@ void RsGxsNetService::sharePublishKeysPending()
|
|||||||
|
|
||||||
if(recipients.empty())
|
if(recipients.empty())
|
||||||
{
|
{
|
||||||
|
#ifdef NXS_NET_DEBUG_3
|
||||||
GXSNETDEBUG___ << " No recipients online. Skipping." << std::endl;
|
GXSNETDEBUG___ << " No recipients online. Skipping." << std::endl;
|
||||||
|
#endif
|
||||||
continue ;
|
continue ;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -28,6 +28,7 @@
|
|||||||
#include "rsgxsutil.h"
|
#include "rsgxsutil.h"
|
||||||
#include "retroshare/rsgxsflags.h"
|
#include "retroshare/rsgxsflags.h"
|
||||||
#include "pqi/pqihash.h"
|
#include "pqi/pqihash.h"
|
||||||
|
#include "gxs/rsgixs.h"
|
||||||
|
|
||||||
|
|
||||||
RsGxsMessageCleanUp::RsGxsMessageCleanUp(RsGeneralDataService* const dataService, uint32_t messageStorePeriod, uint32_t chunkSize)
|
RsGxsMessageCleanUp::RsGxsMessageCleanUp(RsGeneralDataService* const dataService, uint32_t messageStorePeriod, uint32_t chunkSize)
|
||||||
@ -106,9 +107,8 @@ bool RsGxsMessageCleanUp::clean()
|
|||||||
return mGrpMeta.empty();
|
return mGrpMeta.empty();
|
||||||
}
|
}
|
||||||
|
|
||||||
RsGxsIntegrityCheck::RsGxsIntegrityCheck(
|
RsGxsIntegrityCheck::RsGxsIntegrityCheck(RsGeneralDataService* const dataService, RsGixs *gixs) :
|
||||||
RsGeneralDataService* const dataService) :
|
mDs(dataService), mDone(false), mIntegrityMutex("integrity"),mGixs(gixs)
|
||||||
mDs(dataService), mDone(false), mIntegrityMutex("integrity")
|
|
||||||
{ }
|
{ }
|
||||||
|
|
||||||
void RsGxsIntegrityCheck::run()
|
void RsGxsIntegrityCheck::run()
|
||||||
@ -126,13 +126,15 @@ bool RsGxsIntegrityCheck::check()
|
|||||||
GxsMsgReq msgIds;
|
GxsMsgReq msgIds;
|
||||||
GxsMsgReq grps;
|
GxsMsgReq grps;
|
||||||
|
|
||||||
|
std::set<RsGxsGroupId> subscribed_groups ;
|
||||||
|
|
||||||
// compute hash and compare to stored value, if it fails then simply add it
|
// compute hash and compare to stored value, if it fails then simply add it
|
||||||
// to list
|
// to list
|
||||||
std::map<RsGxsGroupId, RsNxsGrp*>::iterator git = grp.begin();
|
std::map<RsGxsGroupId, RsNxsGrp*>::iterator git = grp.begin();
|
||||||
for(; git != grp.end(); ++git)
|
for(; git != grp.end(); ++git)
|
||||||
{
|
{
|
||||||
RsNxsGrp* grp = git->second;
|
RsNxsGrp* grp = git->second;
|
||||||
RsFileHash currHash;
|
RsFileHash currHash;
|
||||||
pqihash pHash;
|
pqihash pHash;
|
||||||
pHash.addData(grp->grp.bin_data, grp->grp.bin_len);
|
pHash.addData(grp->grp.bin_data, grp->grp.bin_len);
|
||||||
pHash.Complete(currHash);
|
pHash.Complete(currHash);
|
||||||
@ -144,12 +146,24 @@ bool RsGxsIntegrityCheck::check()
|
|||||||
{
|
{
|
||||||
// store the group for retrieveNxsMsgs
|
// store the group for retrieveNxsMsgs
|
||||||
grps[grp->grpId];
|
grps[grp->grpId];
|
||||||
|
|
||||||
|
if(grp->metaData->mSubscribeFlags & GXS_SERV::GROUP_SUBSCRIBE_SUBSCRIBED)
|
||||||
|
{
|
||||||
|
subscribed_groups.insert(git->first) ;
|
||||||
|
|
||||||
|
if(!grp->metaData->mAuthorId.isNull())
|
||||||
|
{
|
||||||
|
std::cerr << "TimeStamping group authors' key ID " << grp->metaData->mAuthorId << " in group ID " << grp->metaData->mAuthorId << std::endl;
|
||||||
|
mGixs->timeStampKey(grp->metaData->mAuthorId) ;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
msgIds.erase(msgIds.find(grp->grpId));
|
msgIds.erase(msgIds.find(grp->grpId));
|
||||||
// grpsToDel.push_back(grp->grpId);
|
// grpsToDel.push_back(grp->grpId);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
@ -206,16 +220,21 @@ bool RsGxsIntegrityCheck::check()
|
|||||||
for(; vit != msgV.end(); ++vit)
|
for(; vit != msgV.end(); ++vit)
|
||||||
{
|
{
|
||||||
RsNxsMsg* msg = *vit;
|
RsNxsMsg* msg = *vit;
|
||||||
RsFileHash currHash;
|
RsFileHash currHash;
|
||||||
pqihash pHash;
|
pqihash pHash;
|
||||||
pHash.addData(msg->msg.bin_data, msg->msg.bin_len);
|
pHash.addData(msg->msg.bin_data, msg->msg.bin_len);
|
||||||
pHash.Complete(currHash);
|
pHash.Complete(currHash);
|
||||||
|
|
||||||
if(msg->metaData == NULL || currHash != msg->metaData->mHash)
|
if(msg->metaData == NULL || currHash != msg->metaData->mHash)
|
||||||
{
|
{
|
||||||
std::cerr << "(EE) deleting message data with wrong hash or null meta data. meta=" << (void*)msg->metaData << std::endl;
|
std::cerr << "(EE) deleting message data with wrong hash or null meta data. meta=" << (void*)msg->metaData << std::endl;
|
||||||
msgsToDel[msg->grpId].push_back(msg->msgId);
|
msgsToDel[msg->grpId].push_back(msg->msgId);
|
||||||
}
|
}
|
||||||
|
else if(!msg->metaData->mAuthorId.isNull() && subscribed_groups.find(msg->metaData->mGroupId)!=subscribed_groups.end())
|
||||||
|
{
|
||||||
|
std::cerr << "TimeStamping message authors' key ID " << msg->metaData->mAuthorId << " in message " << msg->msgId << ", group ID " << msg->grpId<< std::endl;
|
||||||
|
mGixs->timeStampKey(msg->metaData->mAuthorId) ;
|
||||||
|
}
|
||||||
|
|
||||||
delete msg;
|
delete msg;
|
||||||
}
|
}
|
||||||
|
@ -30,6 +30,8 @@
|
|||||||
#include "serialiser/rsnxsitems.h"
|
#include "serialiser/rsnxsitems.h"
|
||||||
#include "rsgds.h"
|
#include "rsgds.h"
|
||||||
|
|
||||||
|
class RsGixs ;
|
||||||
|
|
||||||
/*!
|
/*!
|
||||||
* Handy function for cleaning out meta result containers
|
* Handy function for cleaning out meta result containers
|
||||||
* @param container
|
* @param container
|
||||||
@ -112,7 +114,7 @@ public:
|
|||||||
* @param chunkSize
|
* @param chunkSize
|
||||||
* @param sleepPeriod
|
* @param sleepPeriod
|
||||||
*/
|
*/
|
||||||
RsGxsIntegrityCheck(RsGeneralDataService* const dataService);
|
RsGxsIntegrityCheck(RsGeneralDataService* const dataService, RsGixs *gixs);
|
||||||
|
|
||||||
|
|
||||||
bool check();
|
bool check();
|
||||||
@ -129,6 +131,8 @@ private:
|
|||||||
RsMutex mIntegrityMutex;
|
RsMutex mIntegrityMutex;
|
||||||
std::list<RsGxsGroupId> mDeletedGrps;
|
std::list<RsGxsGroupId> mDeletedGrps;
|
||||||
std::map<RsGxsGroupId, std::vector<RsGxsMessageId> > mDeletedMsgs;
|
std::map<RsGxsGroupId, std::vector<RsGxsMessageId> > mDeletedMsgs;
|
||||||
|
|
||||||
|
RsGixs *mGixs ;
|
||||||
};
|
};
|
||||||
|
|
||||||
class GroupUpdate
|
class GroupUpdate
|
||||||
|
Loading…
Reference in New Issue
Block a user