Merge pull request #908 from csoler/v0.6-GxsTransport

V0.6 gxs transport
This commit is contained in:
csoler 2017-06-29 13:50:41 +02:00 committed by GitHub
commit a3e8b967a9
14 changed files with 229 additions and 96 deletions

View File

@ -92,6 +92,7 @@ public:
uint32_t mMaxVisibleCount ;
bool mGrpAutoSync ;
bool mAllowMsgSync;
time_t mLastGroupModificationTS ;
};
typedef std::map<RsGxsGroupId, std::vector<RsNxsMsg*> > NxsMsgDataResult;

View File

@ -70,7 +70,7 @@ static const uint32_t INTEGRITY_CHECK_PERIOD = 60*31; // 31 minutes
RsGenExchange::RsGenExchange(RsGeneralDataService *gds, RsNetworkExchangeService *ns,
RsSerialType *serviceSerialiser, uint16_t servType, RsGixs* gixs,
uint32_t authenPolicy, uint32_t messageStorePeriod)
uint32_t authenPolicy)
: mGenMtx("GenExchange"),
mDataStore(gds),
mNetService(ns),
@ -78,7 +78,6 @@ RsGenExchange::RsGenExchange(RsGeneralDataService *gds, RsNetworkExchangeService
mServType(servType),
mGixs(gixs),
mAuthenPolicy(authenPolicy),
MESSAGE_STORE_PERIOD(messageStorePeriod),
mCleaning(false),
mLastClean((int)time(NULL) - (int)(RSRandom::random_u32() % MSG_CLEANUP_PERIOD)), // this helps unsynchronising the checks for the different services
mMsgCleanUp(NULL),
@ -94,9 +93,7 @@ RsGenExchange::RsGenExchange(RsGeneralDataService *gds, RsNetworkExchangeService
VALIDATE_FAIL_TRY_LATER(2),
VALIDATE_MAX_WAITING_TIME(60)
{
mDataAccess = new RsGxsDataAccess(gds);
}
void RsGenExchange::setNetworkExchangeService(RsNetworkExchangeService *ns)
@ -104,7 +101,9 @@ void RsGenExchange::setNetworkExchangeService(RsNetworkExchangeService *ns)
if(mNetService != NULL)
std::cerr << "(EE) Cannot override existing network exchange service. Make sure it has been deleted otherwise." << std::endl;
else
{
mNetService = ns ;
}
}
RsGenExchange::~RsGenExchange()
@ -244,10 +243,13 @@ void RsGenExchange::tick()
bool RsGenExchange::messagePublicationTest(const RsGxsMsgMetaData& meta)
{
time_t st = MESSAGE_STORE_PERIOD;
if(!mNetService)
{
std::cerr << "(EE) No network service in service " << std::hex << serviceType() << std::dec << ": cannot read message storage time." << std::endl;
return false ;
}
if(mNetService)
st = mNetService->getKeepAge(meta.mGroupId, st);
uint32_t st = mNetService->getKeepAge(meta.mGroupId);
time_t storageTimeLimit = meta.mPublishTs + st;
@ -1374,6 +1376,13 @@ bool RsGenExchange::getGroupData(const uint32_t &token, std::vector<RsGxsGrpItem
gItem->meta.mVisibleMsgCount = 0;
}
// When the group is not subscribed, the last post value is not updated, because there's no message stored. As a consequence,
// we rely on network statistics to give this value, but it is not as accurate as if it was locally computed, because of blocked
// posts, friends not available, sync delays, etc.
if(!(IS_GROUP_SUBSCRIBED(gItem->meta.mSubscribeFlags)))
gItem->meta.mLastPost = sts.mLastGroupModificationTS ;
// Also check the group privacy flags. A while ago, it as possible to publish a group without privacy flags. Now it is not possible anymore.
// As a consequence, it's important to supply a correct value in this flag before the data can be edited/updated.
@ -1825,10 +1834,12 @@ uint32_t RsGenExchange::getStoragePeriod(const RsGxsGroupId& grpId)
{
RS_STACK_MUTEX(mGenMtx) ;
if(mNetService != NULL)
return mNetService->getKeepAge(grpId,MESSAGE_STORE_PERIOD) ;
else
return MESSAGE_STORE_PERIOD;
if(!mNetService)
{
std::cerr << "(EE) No network service in service " << std::hex << serviceType() << std::dec << ": cannot read message storage time. Returning infinity." << std::endl;
return false ;
}
return mNetService->getKeepAge(grpId) ;
}
void RsGenExchange::setStoragePeriod(const RsGxsGroupId& grpId,uint32_t age_in_secs)
{

View File

@ -117,9 +117,7 @@ public:
* @param gixs This is used for verification of msgs and groups received by Gen Exchange using identities.
* @param authenPolicy This determines the authentication used for verfying authorship of msgs and groups
*/
RsGenExchange(RsGeneralDataService* gds, RsNetworkExchangeService* ns,
RsSerialType* serviceSerialiser, uint16_t mServType, RsGixs* gixs, uint32_t authenPolicy,
uint32_t messageStorePeriod = RS_GXS_DEFAULT_MSG_STORE_PERIOD);
RsGenExchange(RsGeneralDataService* gds, RsNetworkExchangeService* ns, RsSerialType* serviceSerialiser, uint16_t mServType, RsGixs* gixs, uint32_t authenPolicy);
virtual ~RsGenExchange();
@ -665,7 +663,7 @@ public:
* \brief getDefaultStoragePeriod. All times in seconds.
* \return
*/
virtual uint32_t getDefaultStoragePeriod() { return MESSAGE_STORE_PERIOD; }
virtual uint32_t getDefaultStoragePeriod() { return mNetService->getDefaultKeepAge() ; }
virtual uint32_t getStoragePeriod(const RsGxsGroupId& grpId) ;
virtual void setStoragePeriod(const RsGxsGroupId& grpId,uint32_t age_in_secs) ;
@ -893,8 +891,6 @@ private:
std::vector<GxsPendingItem<RsNxsMsg*, RsGxsGrpMsgIdPair> > mMsgPendingValidate;
typedef std::vector<GxsPendingItem<RsNxsMsg*, RsGxsGrpMsgIdPair> > NxsMsgPendingVect;
const uint32_t MESSAGE_STORE_PERIOD;
bool mCleaning;
time_t mLastClean;
RsGxsMessageCleanUp* mMsgCleanUp;

View File

@ -311,7 +311,7 @@ RsGxsNetService::RsGxsNetService(uint16_t servType, RsGeneralDataService *gds,
RsNxsNetMgr *netMgr, RsNxsObserver *nxsObs,
const RsServiceInfo serviceInfo,
RsGixsReputation* reputations, RsGcxs* circles, RsGixs *gixs,
PgpAuxUtils *pgpUtils, bool grpAutoSync,bool msgAutoSync)
PgpAuxUtils *pgpUtils, bool grpAutoSync, bool msgAutoSync, uint32_t default_store_period, uint32_t default_sync_period)
: p3ThreadedService(), p3Config(), mTransactionN(0),
mObserver(nxsObs), mDataStore(gds),
mServType(servType), mTransactionTimeOut(TRANSAC_TIMEOUT),
@ -321,11 +321,20 @@ RsGxsNetService::RsGxsNetService(uint16_t servType, RsGeneralDataService *gds,
mCircles(circles), mGixs(gixs),
mReputations(reputations), mPgpUtils(pgpUtils),
mGrpAutoSync(grpAutoSync), mAllowMsgSync(msgAutoSync),
mServiceInfo(serviceInfo)
mServiceInfo(serviceInfo), mDefaultMsgStorePeriod(default_store_period),
mDefaultMsgSyncPeriod(default_sync_period)
{
addSerialType(new RsNxsSerialiser(mServType));
mOwnId = mNetMgr->getOwnId();
mUpdateCounter = 0;
// check the consistency
if(mDefaultMsgStorePeriod > 0 && mDefaultMsgSyncPeriod > mDefaultMsgStorePeriod)
{
std::cerr << "(WW) in GXS service \"" << getServiceInfo().mServiceName << "\": too large message sync period will be set to message store period." << std::endl;
mDefaultMsgSyncPeriod = mDefaultMsgStorePeriod ;
}
}
void RsGxsNetService::getItemNames(std::map<uint8_t,std::string>& names) const
@ -623,8 +632,8 @@ void RsGxsNetService::syncWithPeers()
msg->PeerId(peerId);
msg->updateTS = updateTS;
int req_delay = (int)mServerGrpConfigMap[grpId].msg_req_delay ;
int keep_delay = (int)mServerGrpConfigMap[grpId].msg_keep_delay ;
int req_delay = (int)locked_getGrpConfig(grpId).msg_req_delay ;
int keep_delay = (int)locked_getGrpConfig(grpId).msg_keep_delay ;
// If we store for less than we request, we request less, otherwise the posts will be deleted after being obtained.
@ -681,12 +690,13 @@ void RsGxsNetService::syncGrpStatistics()
for(std::map<RsGxsGroupId,RsGxsGrpMetaData*>::const_iterator it(grpMeta.begin());it!=grpMeta.end();++it)
{
const RsGxsGrpConfig& rec = mServerGrpConfigMap[it->first] ;
const RsGxsGrpConfig& rec = locked_getGrpConfig(it->first) ;
#ifdef NXS_NET_DEBUG_6
GXSNETDEBUG__G(it->first) << " group " << it->first ;
#endif
if(rec.update_TS + GROUP_STATS_UPDATE_DELAY < now && rec.suppliers.ids.size() > 0)
if(rec.statistics_update_TS + GROUP_STATS_UPDATE_DELAY < now && rec.suppliers.ids.size() > 0)
{
#ifdef NXS_NET_DEBUG_6
GXSNETDEBUG__G(it->first) << " needs update. Randomly asking to some friends" << std::endl;
@ -786,7 +796,10 @@ void RsGxsNetService::handleRecvSyncGrpStatistics(RsNxsSyncGrpStatsItem *grs)
grs_resp->grpId = grs->grpId;
grs_resp->PeerId(grs->PeerId()) ;
grs_resp->last_post_TS = 0 ;
grs_resp->last_post_TS = grpMeta->mPublishTs ; // This is not zero, and necessarily older than any message in the group up to clock precision.
// This allows us to use 0 as "uninitialized" proof. If the group meta has been changed, this time
// will be more recent than some messages. This shouldn't be a problem, since this value can only
// be used to discard groups that are not used.
for(uint32_t i=0;i<vec.size();++i)
{
@ -807,14 +820,16 @@ void RsGxsNetService::handleRecvSyncGrpStatistics(RsNxsSyncGrpStatsItem *grs)
GXSNETDEBUG_PG(grs->PeerId(),grs->grpId) << "Received Grp update stats item from peer " << grs->PeerId() << " for group " << grs->grpId << ", reporting " << grs->number_of_posts << " posts." << std::endl;
#endif
RS_STACK_MUTEX(mNxsMutex) ;
RsGxsGrpConfig& rec(mServerGrpConfigMap[grs->grpId]) ;
RsGxsGrpConfig& rec(locked_getGrpConfig(grs->grpId)) ;
uint32_t old_count = rec.max_visible_count ;
uint32_t old_suppliers_count = rec.suppliers.ids.size() ;
rec.suppliers.ids.insert(grs->PeerId()) ;
rec.max_visible_count = std::max(rec.max_visible_count,grs->number_of_posts) ;
rec.update_TS = time(NULL) ;
rec.statistics_update_TS = time(NULL) ;
rec.last_group_modification_TS = grs->last_post_TS;
if (old_count != rec.max_visible_count || old_suppliers_count != rec.suppliers.ids.size())
mNewStatsToNotify.insert(grs->grpId) ;
@ -1422,7 +1437,7 @@ bool RsGxsNetService::loadList(std::list<RsItem *> &load)
// the update time stamp is randomised so as not to ask all friends at once about group statistics.
it->second.update_TS = now - GROUP_STATS_UPDATE_DELAY + (RSRandom::random_u32()%(GROUP_STATS_UPDATE_DELAY/10)) ;
it->second.statistics_update_TS = now - GROUP_STATS_UPDATE_DELAY + (RSRandom::random_u32()%(GROUP_STATS_UPDATE_DELAY/10)) ;
// Similarly, we remove all suppliers.
// Actual suppliers will come back automatically.
@ -2296,6 +2311,7 @@ bool RsGxsNetService::getGroupNetworkStats(const RsGxsGroupId& gid,RsGroupNetwor
stats.mMaxVisibleCount = it->second.max_visible_count ;
stats.mAllowMsgSync = mAllowMsgSync ;
stats.mGrpAutoSync = mGrpAutoSync ;
stats.mLastGroupModificationTS = it->second.last_group_modification_TS ;
return true ;
}
@ -2684,7 +2700,7 @@ void RsGxsNetService::locked_genReqMsgTransaction(NxsTransaction* tr)
uint32_t mcount = msgItemL.size() ;
RsPeerId pid = msgItemL.front()->PeerId() ;
RsGxsGrpConfig& gnsr(mServerGrpConfigMap[grpId]) ;
RsGxsGrpConfig& gnsr(locked_getGrpConfig(grpId));
std::set<RsPeerId>::size_type oldSuppliersCount = gnsr.suppliers.ids.size();
uint32_t oldVisibleCount = gnsr.max_visible_count;
@ -4058,7 +4074,7 @@ void RsGxsNetService::handleRecvSyncMessage(RsNxsSyncMsgReqItem *item,bool item_
if(grp_is_known || mServerGrpConfigMap.find(item->grpId)!=mServerGrpConfigMap.end())
{
RsGxsGrpConfig & rec(mServerGrpConfigMap[item->grpId]) ; // this creates it if needed. When the grp is unknown (and hashed) this will would create a unused entry
RsGxsGrpConfig& rec(locked_getGrpConfig(item->grpId)); // this creates it if needed. When the grp is unknown (and hashed) this will would create a unused entry
rec.suppliers.ids.insert(peer) ;
}
if(!peer_can_receive_update)
@ -4128,7 +4144,7 @@ void RsGxsNetService::handleRecvSyncMessage(RsNxsSyncMsgReqItem *item,bool item_
time_t now = time(NULL) ;
uint32_t max_send_delay = mServerGrpConfigMap[item->grpId].msg_req_delay; // we should use "sync" but there's only one variable used in the GUI: the req one.
uint32_t max_send_delay = locked_getGrpConfig(item->grpId).msg_req_delay; // we should use "sync" but there's only one variable used in the GUI: the req one.
if(canSendMsgIds(msgMetas, *grpMeta, peer, should_encrypt_to_this_circle_id))
{
@ -4417,7 +4433,7 @@ bool RsGxsNetService::checkPermissionsForFriendGroup(const RsPeerId& sslId,const
void RsGxsNetService::pauseSynchronisation(bool /* enabled */)
{
std::cerr << "(EE) RsGxsNetService::pauseSynchronisation() called, but not implemented." << std::endl;
}
void RsGxsNetService::setSyncAge(const RsGxsGroupId &grpId, uint32_t age_in_secs)
@ -4426,7 +4442,7 @@ void RsGxsNetService::setSyncAge(const RsGxsGroupId &grpId, uint32_t age_in_secs
locked_checkDelay(age_in_secs) ;
RsGxsGrpConfig& conf(mServerGrpConfigMap[grpId]) ;
RsGxsGrpConfig& conf(locked_getGrpConfig(grpId));
if(conf.msg_req_delay != age_in_secs)
{
@ -4444,7 +4460,7 @@ void RsGxsNetService::setKeepAge(const RsGxsGroupId &grpId, uint32_t age_in_secs
locked_checkDelay(age_in_secs) ;
RsGxsGrpConfig& conf(mServerGrpConfigMap[grpId]) ;
RsGxsGrpConfig& conf(locked_getGrpConfig(grpId));
if(conf.msg_keep_delay != age_in_secs)
{
@ -4453,27 +4469,39 @@ void RsGxsNetService::setKeepAge(const RsGxsGroupId &grpId, uint32_t age_in_secs
}
}
RsGxsGrpConfig& RsGxsNetService::locked_getGrpConfig(const RsGxsGroupId& grp_id)
{
GrpConfigMap::iterator it = mServerGrpConfigMap.find(grp_id);
if(it == mServerGrpConfigMap.end())
{
RsGxsGrpConfig& conf(mServerGrpConfigMap[grp_id]) ;
conf.msg_keep_delay = mDefaultMsgStorePeriod;
conf.msg_send_delay = mDefaultMsgSyncPeriod;
conf.msg_req_delay = mDefaultMsgSyncPeriod;
conf.max_visible_count = 0 ;
conf.statistics_update_TS = 0 ;
conf.last_group_modification_TS = 0 ;
return conf ;
}
else
return it->second;
}
uint32_t RsGxsNetService::getSyncAge(const RsGxsGroupId& grpId)
{
RS_STACK_MUTEX(mNxsMutex) ;
GrpConfigMap::const_iterator it = mServerGrpConfigMap.find(grpId) ;
if(it == mServerGrpConfigMap.end())
return RS_GXS_DEFAULT_MSG_REQ_PERIOD ;
else
return it->second.msg_req_delay ;
return locked_getGrpConfig(grpId).msg_req_delay ;
}
uint32_t RsGxsNetService::getKeepAge(const RsGxsGroupId& grpId,uint32_t default_value)
uint32_t RsGxsNetService::getKeepAge(const RsGxsGroupId& grpId)
{
RS_STACK_MUTEX(mNxsMutex) ;
GrpConfigMap::const_iterator it = mServerGrpConfigMap.find(grpId) ;
if(it == mServerGrpConfigMap.end())
return default_value ;
else
return it->second.msg_keep_delay ;
return locked_getGrpConfig(grpId).msg_keep_delay ;
}
int RsGxsNetService::requestGrp(const std::list<RsGxsGroupId>& grpId, const RsPeerId& peerId)
@ -4776,6 +4804,8 @@ bool RsGxsNetService::removeGroups(const std::list<RsGxsGroupId>& groups)
GXSNETDEBUG__G(*git) << " deleting info for group " << *git << std::endl;
#endif
// Here we do not use locked_getGrpConfig() because we dont want the entry to be created if it doesnot already exist.
GrpConfigMap::iterator it = mServerGrpConfigMap.find(*git) ;
if(it != mServerGrpConfigMap.end())

View File

@ -93,13 +93,16 @@ public:
const RsServiceInfo serviceInfo,
RsGixsReputation* reputations = NULL, RsGcxs* circles = NULL, RsGixs *gixs=NULL,
PgpAuxUtils *pgpUtils = NULL,
bool grpAutoSync = true, bool msgAutoSync = true);
bool grpAutoSync = true, bool msgAutoSync = true,
uint32_t default_store_period = RS_GXS_DEFAULT_MSG_STORE_PERIOD,
uint32_t default_sync_period = RS_GXS_DEFAULT_MSG_REQ_PERIOD);
virtual ~RsGxsNetService();
virtual RsServiceInfo getServiceInfo() { return mServiceInfo; }
virtual void getItemNames(std::map<uint8_t,std::string>& names) const ;
public:
@ -111,9 +114,13 @@ public:
virtual void setKeepAge(const RsGxsGroupId& grpId,uint32_t age_in_secs);
virtual uint32_t getSyncAge(const RsGxsGroupId& id);
virtual uint32_t getKeepAge(const RsGxsGroupId& id,uint32_t default_value);
virtual uint32_t getKeepAge(const RsGxsGroupId& id);
virtual uint32_t getDefaultSyncAge() { return RS_GXS_DEFAULT_MSG_REQ_PERIOD ; }
virtual uint32_t getDefaultSyncAge() { return mDefaultMsgSyncPeriod ; }
virtual uint32_t getDefaultKeepAge() { return mDefaultMsgStorePeriod ; }
virtual void setDefaultKeepAge(uint32_t t) { mDefaultMsgStorePeriod = t ; }
virtual void setDefaultSyncAge(uint32_t t) { mDefaultMsgSyncPeriod = t ; }
/*!
* pauses synchronisation of subscribed groups and request for group id
@ -413,6 +420,7 @@ private:
static RsGxsGroupId hashGrpId(const RsGxsGroupId& gid,const RsPeerId& pid) ;
RsGxsGrpConfig& locked_getGrpConfig(const RsGxsGroupId& grp_id);
private:
typedef std::vector<RsNxsGrp*> GrpFragments;
@ -573,6 +581,9 @@ private:
std::set<RsGxsGroupId> mNewPublishKeysToNotify ;
void debugDump();
uint32_t mDefaultMsgStorePeriod ;
uint32_t mDefaultMsgSyncPeriod ;
};
#endif // RSGXSNETSERVICE_H

View File

@ -35,7 +35,7 @@ static const uint32_t MAX_GXS_IDS_REQUESTS_NET = 10 ; // max number of reques
//#define DEBUG_GXSUTIL 1
#define GXSUTIL_DEBUG() std::cerr << time(NULL) << " : GXS_UTIL : " << __FUNCTION__ << " : "
#define GXSUTIL_DEBUG() std::cerr << "[" << time(NULL) << "] : GXS_UTIL : " << __FUNCTION__ << " : "
RsGxsMessageCleanUp::RsGxsMessageCleanUp(RsGeneralDataService* const dataService, RsGenExchange *genex, uint32_t chunkSize)
: mDs(dataService), mGenExchangeClient(genex), CHUNK_SIZE(chunkSize)
@ -57,7 +57,8 @@ bool RsGxsMessageCleanUp::clean()
time_t now = time(NULL);
#ifdef DEBUG_GXSUTIL
GXSUTIL_DEBUG() << " Cleaning up groups in service" << std::hex << mGenExchangeClient->serviceType() << std::dec << std::endl;
uint16_t service_type = mGenExchangeClient->serviceType() ;
GXSUTIL_DEBUG() << " Cleaning up groups in service " << std::hex << service_type << std::dec << std::endl;
#endif
while(!mGrpMeta.empty())
{
@ -97,8 +98,10 @@ bool RsGxsMessageCleanUp::clean()
{
RsGxsMsgMetaData* meta = metaV[i];
bool have_kids = (messages_with_kids.find(meta->mMsgId)!=messages_with_kids.end());
// check if expired
bool remove = store_period > 0 && ((meta->mPublishTs + store_period) < now) && (messages_with_kids.find(meta->mMsgId)==messages_with_kids.end());
bool remove = store_period > 0 && ((meta->mPublishTs + store_period) < now) && !have_kids;
// check client does not want the message kept regardless of age
remove &= !(meta->mMsgStatus & GXS_SERV::GXS_MSG_STATUS_KEEP);
@ -107,12 +110,18 @@ bool RsGxsMessageCleanUp::clean()
remove = remove || (grpMeta->mSubscribeFlags & GXS_SERV::GROUP_SUBSCRIBE_NOT_SUBSCRIBED);
remove = remove || !(grpMeta->mSubscribeFlags & GXS_SERV::GROUP_SUBSCRIBE_SUBSCRIBED);
GXSUTIL_DEBUG() << " msg id " << meta->mMsgId << " in grp " << grpId << ": keep_flag=" << bool(meta->mMsgStatus & GXS_SERV::GXS_MSG_STATUS_KEEP)
<< " subscribed: " << bool(grpMeta->mSubscribeFlags & GXS_SERV::GROUP_SUBSCRIBE_SUBSCRIBED) << " store_period: " << store_period
<< " kids: " << have_kids << " now - meta->mPublishTs: " << now - meta->mPublishTs ;
if( remove )
{
req[grpId].push_back(meta->mMsgId);
GXSUTIL_DEBUG() << " Scheduling msg id " << meta->mMsgId << " in grp " << grpId << " for removal." << std::endl;
std::cerr << " Scheduling for removal." << std::endl;
}
else
std::cerr << std::endl;
delete meta;
}
@ -313,7 +322,7 @@ bool RsGxsIntegrityCheck::check()
{
gxs_ids.push_back(*it) ;
#ifdef DEBUG_GXSUTIL
GXSUTIL_DEBUG() << " " << *it << std::endl;
GXSUTIL_DEBUG() << " " << it->first << std::endl;
#endif
}
uint32_t nb_requested_not_in_cache = 0;
@ -328,7 +337,7 @@ bool RsGxsIntegrityCheck::check()
{
uint32_t n = RSRandom::random_u32() % gxs_ids.size() ;
#ifdef DEBUG_GXSUTIL
GXSUTIL_DEBUG() << " requesting ID " << gxs_ids[n] ;
GXSUTIL_DEBUG() << " requesting ID " << gxs_ids[n].first ;
#endif
if(!mGixs->haveKey(gxs_ids[n].first)) // checks if we have it already in the cache (conservative way to ensure that we atually have it)

View File

@ -73,9 +73,13 @@ public:
virtual void setKeepAge(const RsGxsGroupId& id,uint32_t age_in_secs) =0;
virtual uint32_t getSyncAge(const RsGxsGroupId& id) =0;
virtual uint32_t getKeepAge(const RsGxsGroupId& id,uint32_t default_value) =0;
virtual uint32_t getKeepAge(const RsGxsGroupId& id) =0;
virtual void setDefaultKeepAge(uint32_t t) =0;
virtual void setDefaultSyncAge(uint32_t t) =0;
virtual uint32_t getDefaultSyncAge() =0;
virtual uint32_t getDefaultKeepAge() =0;
/*!
* Initiates a search through the network

View File

@ -40,7 +40,10 @@ p3GxsTrans::~p3GxsTrans()
bool p3GxsTrans::getStatistics(GxsTransStatistics& stats)
{
{
RS_STACK_MUTEX(mDataMutex);
stats.prefered_group_id = mPreferredGroupId;
}
stats.outgoing_records.clear();
{
@ -140,9 +143,34 @@ void p3GxsTrans::handleResponse(uint32_t token, uint32_t req_type)
{
case GROUPS_LIST:
{
#ifdef DEBUG_GXSTRANS
std::cerr << " Reviewing available groups. " << std::endl;
#endif
std::vector<RsGxsGrpItem*> groups;
getGroupData(token, groups);
// First recompute the prefered group Id.
{
RS_STACK_MUTEX(mDataMutex);
mPreferredGroupId.clear();
for( auto grp : groups )
{
locked_supersedePreferredGroup(grp->meta.mGroupId);
if(RsGenExchange::getStoragePeriod(grp->meta.mGroupId) != GXS_STORAGE_PERIOD)
{
std::cerr << "(WW) forcing storage period in GxsTrans group " << grp->meta.mGroupId << " to " << GXS_STORAGE_PERIOD << " seconds. Value was " << RsGenExchange::getStoragePeriod(grp->meta.mGroupId) << std::endl;
RsGenExchange::setStoragePeriod(grp->meta.mGroupId,GXS_STORAGE_PERIOD) ;
}
}
}
#ifdef DEBUG_GXSTRANS
std::cerr << " computed preferred group id: " << mPreferredGroupId << std::endl;
#endif
for( auto grp : groups )
{
/* For each group check if it is better candidate then
@ -155,18 +183,24 @@ void p3GxsTrans::handleResponse(uint32_t token, uint32_t req_type)
const RsGroupMetaData& meta = grp->meta;
bool subscribed = IS_GROUP_SUBSCRIBED(meta.mSubscribeFlags);
bool old = olderThen( meta.mLastPost,
UNUSED_GROUP_UNSUBSCRIBE_INTERVAL );
bool supersede = supersedePreferredGroup(meta.mGroupId);
bool old = olderThen( meta.mLastPost, UNUSED_GROUP_UNSUBSCRIBE_INTERVAL );
uint32_t token;
bool shoudlSubscribe = !subscribed && ( !old || supersede );
bool shoudlUnSubscribe = subscribed && old
&& meta.mGroupId != mPreferredGroupId;
bool shouldSubscribe = false ;
bool shouldUnSubscribe = false ;
{
RS_STACK_MUTEX(mDataMutex);
bool shouldSubscribe = !subscribed && ( !old || meta.mGroupId == mPreferredGroupId );
bool shouldUnSubscribe = subscribed && old && meta.mGroupId != mPreferredGroupId;
}
if(shoudlSubscribe)
#ifdef DEBUG_GXSTRANS
std::cout << " group " << grp->meta.mGroupId << ", subscribed: " << subscribed << " last post: " << meta.mLastPost << " should subscribe: "<< shouldSubscribe
<< ", should unsubscribe: " << shouldUnSubscribe << std::endl;
#endif
if(shouldSubscribe)
RsGenExchange::subscribeToGroup(token, meta.mGroupId, true);
else if(shoudlUnSubscribe)
else if(shouldUnSubscribe)
RsGenExchange::subscribeToGroup(token, meta.mGroupId, false);
#ifdef GXS_MAIL_GRP_DEBUG
@ -188,7 +222,14 @@ void p3GxsTrans::handleResponse(uint32_t token, uint32_t req_type)
delete grp;
}
if(mPreferredGroupId.isNull())
bool have_preferred_group = false ;
{
RS_STACK_MUTEX(mDataMutex);
have_preferred_group = !mPreferredGroupId.isNull();
}
if(!have_preferred_group)
{
/* This is true only at first run when we haven't received mail
* distribuition groups from friends
@ -214,7 +255,9 @@ void p3GxsTrans::handleResponse(uint32_t token, uint32_t req_type)
#endif
RsGxsGroupId grpId;
acknowledgeTokenGrp(token, grpId);
supersedePreferredGroup(grpId);
RS_STACK_MUTEX(mDataMutex);
locked_supersedePreferredGroup(grpId);
break;
}
case MAILS_UPDATE:
@ -346,6 +389,9 @@ void p3GxsTrans::GxsTransIntegrityCleanupThread::run()
{
std::vector<RsNxsMsg*>& msgV = mit->second;
std::vector<RsNxsMsg*>::iterator vit = msgV.begin();
#ifdef DEBUG_GXSTRANS
std::cerr << "Group " << mit->first << ": " << std::endl;
#endif
for(; vit != msgV.end(); ++vit)
{
@ -448,6 +494,10 @@ void p3GxsTrans::service_tick()
mCleanupThread->start() ;
mLastMsgCleanup = now ;
}
// This forces to review all groups, and decide to subscribe or not to each of them.
requestGroupsData();
}
// now grab collected messages to delete
@ -817,6 +867,8 @@ void p3GxsTrans::locked_processOutgoingRecord(OutgoingRecord& pr)
}
case GxsTransSendStatus::PENDING_PREFERRED_GROUP:
{
RS_STACK_MUTEX(mDataMutex);
if(mPreferredGroupId.isNull())
{
requestGroupsData();
@ -841,7 +893,10 @@ void p3GxsTrans::locked_processOutgoingRecord(OutgoingRecord& pr)
grsrz.resize(grsz);
RsGxsTransSerializer().serialise(&grcpt, &grsrz[0], &grsz);
{
RS_STACK_MUTEX(mDataMutex);
pr.presignedReceipt.grpId = mPreferredGroupId;
}
pr.presignedReceipt.metaData = new RsGxsMsgMetaData();
*pr.presignedReceipt.metaData = grcpt.meta;
pr.presignedReceipt.msg.setBinData(&grsrz[0], grsz);

View File

@ -92,14 +92,15 @@ public:
p3IdService& identities ) :
RsGenExchange( gds, nes, new RsGxsTransSerializer(),
RS_SERVICE_TYPE_GXS_TRANS, &identities,
AuthenPolicy(), GXS_STORAGE_PERIOD ),
AuthenPolicy()),
GxsTokenQueue(this),
RsGxsTrans(this),
mIdService(identities),
mServClientsMutex("p3GxsTrans client services map mutex"),
mOutgoingMutex("p3GxsTrans outgoing queue map mutex"),
mIngoingMutex("p3GxsTrans ingoing queue map mutex"),
mPerUserStatsMutex("p3GxsTrans user stats mutex")
mPerUserStatsMutex("p3GxsTrans user stats mutex"),
mDataMutex("p3GxsTrans data mutex")
{
mLastMsgCleanup = time(NULL) - MAX_DELAY_BETWEEN_CLEANUPS + 30; // always check 30 secs after start
mCleanupThread = NULL ;
@ -149,6 +150,8 @@ public:
/// @see RsGenExchange::getServiceInfo()
virtual RsServiceInfo getServiceInfo() { return RsServiceInfo( RS_SERVICE_TYPE_GXS_TRANS, "GXS Mails", 0, 1, 0, 1 ); }
static const uint32_t GXS_STORAGE_PERIOD = 0x127500; // 14 days.
static const uint32_t GXS_SYNC_PERIOD = 0x127500;
private:
/** Time interval of inactivity before a distribution group is unsubscribed.
* Approximatively 3 months seems ok ATM. */
@ -167,7 +170,6 @@ private:
* signed acknowledged is received for each of them.
* Two weeks seems fair ATM.
*/
static const uint32_t GXS_STORAGE_PERIOD = 0x127500;
static const uint32_t MAX_DELAY_BETWEEN_CLEANUPS ; // every 20 mins. Could be less.
time_t mLastMsgCleanup ;
@ -253,7 +255,7 @@ private:
* @return true if preferredGroupId has been supeseded by potentialGrId
* false otherwise.
*/
bool inline supersedePreferredGroup(const RsGxsGroupId& potentialGrId)
bool inline locked_supersedePreferredGroup(const RsGxsGroupId& potentialGrId)
{
if(mPreferredGroupId < potentialGrId)
{
@ -320,5 +322,9 @@ private:
RsMutex mPerUserStatsMutex;
std::map<RsGxsId,MsgSizeCount> per_user_statistics ;
// Mutex to protect local data
RsMutex mDataMutex;
};

View File

@ -68,7 +68,8 @@ public:
msg_req_delay = RS_GXS_DEFAULT_MSG_REQ_PERIOD ;
max_visible_count = 0 ;
update_TS = 0 ;
statistics_update_TS = 0 ;
last_group_modification_TS = 0 ;
}
uint32_t msg_keep_delay ; // delay after which we discard the posts
@ -77,7 +78,8 @@ public:
RsTlvPeerIdSet suppliers; // list of friends who feed this group
uint32_t max_visible_count ; // max visible count reported by contributing friends
time_t update_TS ; // last time the max visible count was updated.
time_t statistics_update_TS ; // last time the max visible count was updated.
time_t last_group_modification_TS ; // last time the group was modified, either in meta data or in the list of messages posted in it.
};
class RsGxsGrpConfigItem : public RsGxsNetServiceItem, public RsGxsGrpConfig

View File

@ -1298,16 +1298,10 @@ int RsServer::StartupRetroShare()
//
mPluginsManager->setServiceControl(serviceCtrl) ;
// std::cerr << "rsinitconf (core 1) = " << (void*)rsInitConfig<<std::endl;
// std::cerr << "gxs_passwd (core 1) = " << (void*)&rsInitConfig->gxs_passwd<<" \"" << rsInitConfig->gxs_passwd << "\""<< std::endl;
// Now load the plugins. This parses the available SO/DLL files for known symbols.
//
mPluginsManager->loadPlugins(plugins_directories) ;
// std::cerr << "rsinitconf (core 1) = " << (void*)rsInitConfig<<std::endl;
// std::cerr << "gxs_passwd (core 2) = " << (void*)&rsInitConfig->gxs_passwd<< " \"" << rsInitConfig->gxs_passwd << "\""<< std::endl;
// Also load some plugins explicitly. This is helpful for
// - developping plugins
//
@ -1394,7 +1388,6 @@ int RsServer::StartupRetroShare()
mPosted->setNetworkExchangeService(posted_ns) ;
/**** Wiki GXS service ****/
#ifdef RS_USE_WIKI
@ -1493,10 +1486,12 @@ int RsServer::StartupRetroShare()
currGxsDir + "/", "gxstrans_db", RS_SERVICE_TYPE_GXS_TRANS,
NULL, rsInitConfig->gxs_passwd );
mGxsTrans = new p3GxsTrans(gxstrans_ds, NULL, *mGxsIdService);
RsGxsNetService* gxstrans_ns = new RsGxsNetService(
RS_SERVICE_TYPE_GXS_TRANS, gxstrans_ds, nxsMgr, mGxsTrans,
mGxsTrans->getServiceInfo(), mReputations, mGxsCircles,
mGxsIdService, pgpAuxUtils);
mGxsIdService, pgpAuxUtils,true,true,p3GxsTrans::GXS_STORAGE_PERIOD,p3GxsTrans::GXS_SYNC_PERIOD);
mGxsTrans->setNetworkExchangeService(gxstrans_ns);
pqih->addService(gxstrans_ns, true);
# endif // RS_GXS_TRANS

View File

@ -44,8 +44,6 @@
RsGxsForums *rsGxsForums = NULL;
const uint32_t GXSFORUMS_MSG_STORE_PERIOD = 60*60*24*31*12; // 12 months / 1 year
#define FORUM_TESTEVENT_DUMMYDATA 0x0001
#define DUMMYDATA_PERIOD 60 // long enough for some RsIdentities to be generated.
@ -56,8 +54,7 @@ const uint32_t GXSFORUMS_MSG_STORE_PERIOD = 60*60*24*31*12; // 12 months / 1 yea
p3GxsForums::p3GxsForums( RsGeneralDataService *gds,
RsNetworkExchangeService *nes, RsGixs* gixs ) :
RsGenExchange( gds, nes, new RsGxsForumSerialiser(),
RS_SERVICE_GXS_TYPE_FORUMS, gixs, forumsAuthenPolicy(),
GXSFORUMS_MSG_STORE_PERIOD),
RS_SERVICE_GXS_TYPE_FORUMS, gixs, forumsAuthenPolicy()),
RsGxsForums(this), mGenToken(0), mGenActive(false), mGenCount(0)
{
// Test Data disabled in Repo.

View File

@ -252,8 +252,10 @@ void GxsTransportStatistics::updateContent()
groupTreeWidget->addTopLevelItem(item);
groupTreeWidget->setItemExpanded(item,openned_groups.find(it->first) != openned_groups.end());
QString msg_time_string = (stat.last_publish_TS>0)?QString(" (Last msg: %1)").arg(QDateTime::fromTime_t(stat.last_publish_TS).toString()):"" ;
item->setData(COL_GROUP_NUM_MSGS, Qt::DisplayRole, QString::number(stat.mNumMsgs) + msg_time_string) ;
item->setData(COL_GROUP_GRP_ID, Qt::DisplayRole, QString::fromStdString(stat.mGrpId.toStdString())) ;
item->setData(COL_GROUP_NUM_MSGS, Qt::DisplayRole, QString::number(stat.mNumMsgs)) ;
item->setData(COL_GROUP_SIZE_MSGS, Qt::DisplayRole, QString::number(stat.mTotalSizeOfMsgs)) ;
item->setData(COL_GROUP_SUBSCRIBED,Qt::DisplayRole, stat.subscribed?tr("Yes"):tr("No")) ;
item->setData(COL_GROUP_POPULARITY,Qt::DisplayRole, QString::number(stat.popularity)) ;
@ -429,6 +431,7 @@ void GxsTransportStatistics::loadMsgMeta(const uint32_t& token)
return ;
for(GxsMsgMetaMap::const_iterator it(m.begin());it!=m.end();++it)
mGroupStats[it->first].messages_metas = it->second ;
for(uint32_t i=0;i<it->second.size();++i)
mGroupStats[it->first].addMessageMeta(it->second[i]) ;
}

View File

@ -38,11 +38,24 @@ class UIStateHelper;
class RsGxsTransGroupStatistics: public GxsGroupStatistic
{
public:
RsGxsTransGroupStatistics() {}
RsGxsTransGroupStatistics()
{
last_publish_TS = 0;
popularity = 0;
subscribed = false;
}
void addMessageMeta(const RsMsgMetaData& meta)
{
messages_metas.push_back(meta) ;
last_publish_TS = std::max(last_publish_TS,meta.mPublishTs) ;
}
bool subscribed ;
int popularity ;
time_t last_publish_TS;
std::vector<RsMsgMetaData> messages_metas ;
};