moved msg keep period settings to netService and made the initialisation obei the default settings of the service

This commit is contained in:
csoler 2017-06-28 23:12:33 +02:00
parent 12b562893d
commit f37ba83c07
9 changed files with 102 additions and 69 deletions

View File

@ -65,12 +65,12 @@ static const uint32_t INDEX_AUTHEN_ADMIN = 0x00000040; // admin key
//#define GEN_EXCH_DEBUG 1
static const uint32_t MSG_CLEANUP_PERIOD = 60*59; // 59 minutes
static const uint32_t MSG_CLEANUP_PERIOD = 6 *59; // 59 minutes
static const uint32_t INTEGRITY_CHECK_PERIOD = 60*31; // 31 minutes
RsGenExchange::RsGenExchange(RsGeneralDataService *gds, RsNetworkExchangeService *ns,
RsSerialType *serviceSerialiser, uint16_t servType, RsGixs* gixs,
uint32_t authenPolicy, uint32_t messageStorePeriod)
uint32_t authenPolicy)
: mGenMtx("GenExchange"),
mDataStore(gds),
mNetService(ns),
@ -78,7 +78,6 @@ RsGenExchange::RsGenExchange(RsGeneralDataService *gds, RsNetworkExchangeService
mServType(servType),
mGixs(gixs),
mAuthenPolicy(authenPolicy),
MESSAGE_STORE_PERIOD(messageStorePeriod),
mCleaning(false),
mLastClean((int)time(NULL) - (int)(RSRandom::random_u32() % MSG_CLEANUP_PERIOD)), // this helps unsynchronising the checks for the different services
mMsgCleanUp(NULL),
@ -94,9 +93,7 @@ RsGenExchange::RsGenExchange(RsGeneralDataService *gds, RsNetworkExchangeService
VALIDATE_FAIL_TRY_LATER(2),
VALIDATE_MAX_WAITING_TIME(60)
{
mDataAccess = new RsGxsDataAccess(gds);
}
void RsGenExchange::setNetworkExchangeService(RsNetworkExchangeService *ns)
@ -104,7 +101,9 @@ void RsGenExchange::setNetworkExchangeService(RsNetworkExchangeService *ns)
if(mNetService != NULL)
std::cerr << "(EE) Cannot override existing network exchange service. Make sure it has been deleted otherwise." << std::endl;
else
{
mNetService = ns ;
}
}
RsGenExchange::~RsGenExchange()
@ -244,10 +243,13 @@ void RsGenExchange::tick()
bool RsGenExchange::messagePublicationTest(const RsGxsMsgMetaData& meta)
{
time_t st = MESSAGE_STORE_PERIOD;
if(!mNetService)
{
std::cerr << "(EE) No network service in service " << std::hex << serviceType() << std::dec << ": cannot read message storage time." << std::endl;
return false ;
}
if(mNetService)
st = mNetService->getKeepAge(meta.mGroupId, st);
uint32_t st = mNetService->getKeepAge(meta.mGroupId);
time_t storageTimeLimit = meta.mPublishTs + st;
@ -1825,10 +1827,12 @@ uint32_t RsGenExchange::getStoragePeriod(const RsGxsGroupId& grpId)
{
RS_STACK_MUTEX(mGenMtx) ;
if(mNetService != NULL)
return mNetService->getKeepAge(grpId,MESSAGE_STORE_PERIOD) ;
else
return MESSAGE_STORE_PERIOD;
if(!mNetService)
{
std::cerr << "(EE) No network service in service " << std::hex << serviceType() << std::dec << ": cannot read message storage time. Returning infinity." << std::endl;
return false ;
}
return mNetService->getKeepAge(grpId) ;
}
void RsGenExchange::setStoragePeriod(const RsGxsGroupId& grpId,uint32_t age_in_secs)
{

View File

@ -117,9 +117,7 @@ public:
* @param gixs This is used for verification of msgs and groups received by Gen Exchange using identities.
* @param authenPolicy This determines the authentication used for verfying authorship of msgs and groups
*/
RsGenExchange(RsGeneralDataService* gds, RsNetworkExchangeService* ns,
RsSerialType* serviceSerialiser, uint16_t mServType, RsGixs* gixs, uint32_t authenPolicy,
uint32_t messageStorePeriod = RS_GXS_DEFAULT_MSG_STORE_PERIOD);
RsGenExchange(RsGeneralDataService* gds, RsNetworkExchangeService* ns, RsSerialType* serviceSerialiser, uint16_t mServType, RsGixs* gixs, uint32_t authenPolicy);
virtual ~RsGenExchange();
@ -665,7 +663,7 @@ public:
* \brief getDefaultStoragePeriod. All times in seconds.
* \return
*/
virtual uint32_t getDefaultStoragePeriod() { return MESSAGE_STORE_PERIOD; }
virtual uint32_t getDefaultStoragePeriod() { return mNetService->getDefaultKeepAge() ; }
virtual uint32_t getStoragePeriod(const RsGxsGroupId& grpId) ;
virtual void setStoragePeriod(const RsGxsGroupId& grpId,uint32_t age_in_secs) ;
@ -893,8 +891,6 @@ private:
std::vector<GxsPendingItem<RsNxsMsg*, RsGxsGrpMsgIdPair> > mMsgPendingValidate;
typedef std::vector<GxsPendingItem<RsNxsMsg*, RsGxsGrpMsgIdPair> > NxsMsgPendingVect;
const uint32_t MESSAGE_STORE_PERIOD;
bool mCleaning;
time_t mLastClean;
RsGxsMessageCleanUp* mMsgCleanUp;

View File

@ -311,7 +311,7 @@ RsGxsNetService::RsGxsNetService(uint16_t servType, RsGeneralDataService *gds,
RsNxsNetMgr *netMgr, RsNxsObserver *nxsObs,
const RsServiceInfo serviceInfo,
RsGixsReputation* reputations, RsGcxs* circles, RsGixs *gixs,
PgpAuxUtils *pgpUtils, bool grpAutoSync,bool msgAutoSync)
PgpAuxUtils *pgpUtils, bool grpAutoSync, bool msgAutoSync, uint32_t default_store_period, uint32_t default_sync_period)
: p3ThreadedService(), p3Config(), mTransactionN(0),
mObserver(nxsObs), mDataStore(gds),
mServType(servType), mTransactionTimeOut(TRANSAC_TIMEOUT),
@ -321,11 +321,20 @@ RsGxsNetService::RsGxsNetService(uint16_t servType, RsGeneralDataService *gds,
mCircles(circles), mGixs(gixs),
mReputations(reputations), mPgpUtils(pgpUtils),
mGrpAutoSync(grpAutoSync), mAllowMsgSync(msgAutoSync),
mServiceInfo(serviceInfo)
mServiceInfo(serviceInfo), mDefaultMsgStorePeriod(default_store_period),
mDefaultMsgSyncPeriod(default_sync_period)
{
addSerialType(new RsNxsSerialiser(mServType));
mOwnId = mNetMgr->getOwnId();
mUpdateCounter = 0;
// check the consistency
if(mDefaultMsgStorePeriod > 0 && mDefaultMsgSyncPeriod > mDefaultMsgStorePeriod)
{
std::cerr << "(WW) in GXS service \"" << getServiceInfo().mServiceName << "\": too large message sync period will be set to message store period." << std::endl;
mDefaultMsgSyncPeriod == mDefaultMsgStorePeriod ;
}
}
void RsGxsNetService::getItemNames(std::map<uint8_t,std::string>& names) const
@ -623,8 +632,8 @@ void RsGxsNetService::syncWithPeers()
msg->PeerId(peerId);
msg->updateTS = updateTS;
int req_delay = (int)mServerGrpConfigMap[grpId].msg_req_delay ;
int keep_delay = (int)mServerGrpConfigMap[grpId].msg_keep_delay ;
int req_delay = (int)locked_getGrpConfig(grpId).msg_req_delay ;
int keep_delay = (int)locked_getGrpConfig(grpId).msg_keep_delay ;
// If we store for less than we request, we request less, otherwise the posts will be deleted after being obtained.
@ -681,7 +690,8 @@ void RsGxsNetService::syncGrpStatistics()
for(std::map<RsGxsGroupId,RsGxsGrpMetaData*>::const_iterator it(grpMeta.begin());it!=grpMeta.end();++it)
{
const RsGxsGrpConfig& rec = mServerGrpConfigMap[it->first] ;
const RsGxsGrpConfig& rec = locked_getGrpConfig(it->first) ;
#ifdef NXS_NET_DEBUG_6
GXSNETDEBUG__G(it->first) << " group " << it->first ;
#endif
@ -807,7 +817,8 @@ void RsGxsNetService::handleRecvSyncGrpStatistics(RsNxsSyncGrpStatsItem *grs)
GXSNETDEBUG_PG(grs->PeerId(),grs->grpId) << "Received Grp update stats item from peer " << grs->PeerId() << " for group " << grs->grpId << ", reporting " << grs->number_of_posts << " posts." << std::endl;
#endif
RS_STACK_MUTEX(mNxsMutex) ;
RsGxsGrpConfig& rec(mServerGrpConfigMap[grs->grpId]) ;
RsGxsGrpConfig& rec(locked_getGrpConfig(grs->grpId)) ;
uint32_t old_count = rec.max_visible_count ;
uint32_t old_suppliers_count = rec.suppliers.ids.size() ;
@ -2684,7 +2695,7 @@ void RsGxsNetService::locked_genReqMsgTransaction(NxsTransaction* tr)
uint32_t mcount = msgItemL.size() ;
RsPeerId pid = msgItemL.front()->PeerId() ;
RsGxsGrpConfig& gnsr(mServerGrpConfigMap[grpId]) ;
RsGxsGrpConfig& gnsr(locked_getGrpConfig(grpId));
std::set<RsPeerId>::size_type oldSuppliersCount = gnsr.suppliers.ids.size();
uint32_t oldVisibleCount = gnsr.max_visible_count;
@ -4058,7 +4069,7 @@ void RsGxsNetService::handleRecvSyncMessage(RsNxsSyncMsgReqItem *item,bool item_
if(grp_is_known || mServerGrpConfigMap.find(item->grpId)!=mServerGrpConfigMap.end())
{
RsGxsGrpConfig & rec(mServerGrpConfigMap[item->grpId]) ; // this creates it if needed. When the grp is unknown (and hashed) this will would create a unused entry
RsGxsGrpConfig& rec(locked_getGrpConfig(item->grpId)); // this creates it if needed. When the grp is unknown (and hashed) this will would create a unused entry
rec.suppliers.ids.insert(peer) ;
}
if(!peer_can_receive_update)
@ -4128,7 +4139,7 @@ void RsGxsNetService::handleRecvSyncMessage(RsNxsSyncMsgReqItem *item,bool item_
time_t now = time(NULL) ;
uint32_t max_send_delay = mServerGrpConfigMap[item->grpId].msg_req_delay; // we should use "sync" but there's only one variable used in the GUI: the req one.
uint32_t max_send_delay = locked_getGrpConfig(item->grpId).msg_req_delay; // we should use "sync" but there's only one variable used in the GUI: the req one.
if(canSendMsgIds(msgMetas, *grpMeta, peer, should_encrypt_to_this_circle_id))
{
@ -4417,7 +4428,7 @@ bool RsGxsNetService::checkPermissionsForFriendGroup(const RsPeerId& sslId,const
void RsGxsNetService::pauseSynchronisation(bool /* enabled */)
{
std::cerr << "(EE) RsGxsNetService::pauseSynchronisation() called, but not implemented." << std::endl;
}
void RsGxsNetService::setSyncAge(const RsGxsGroupId &grpId, uint32_t age_in_secs)
@ -4426,7 +4437,7 @@ void RsGxsNetService::setSyncAge(const RsGxsGroupId &grpId, uint32_t age_in_secs
locked_checkDelay(age_in_secs) ;
RsGxsGrpConfig& conf(mServerGrpConfigMap[grpId]) ;
RsGxsGrpConfig& conf(locked_getGrpConfig(grpId));
if(conf.msg_req_delay != age_in_secs)
{
@ -4444,7 +4455,7 @@ void RsGxsNetService::setKeepAge(const RsGxsGroupId &grpId, uint32_t age_in_secs
locked_checkDelay(age_in_secs) ;
RsGxsGrpConfig& conf(mServerGrpConfigMap[grpId]) ;
RsGxsGrpConfig& conf(locked_getGrpConfig(grpId));
if(conf.msg_keep_delay != age_in_secs)
{
@ -4453,27 +4464,38 @@ void RsGxsNetService::setKeepAge(const RsGxsGroupId &grpId, uint32_t age_in_secs
}
}
RsGxsGrpConfig& RsGxsNetService::locked_getGrpConfig(const RsGxsGroupId& grp_id)
{
GrpConfigMap::iterator it = mServerGrpConfigMap.find(grp_id);
if(it == mServerGrpConfigMap.end())
{
RsGxsGrpConfig& conf(mServerGrpConfigMap[grp_id]) ;
conf.msg_keep_delay = mDefaultMsgStorePeriod;
conf.msg_send_delay = mDefaultMsgSyncPeriod;
conf.msg_req_delay = mDefaultMsgSyncPeriod;
conf.max_visible_count = 0 ;
conf.update_TS = 0 ;
return conf ;
}
else
return it->second;
}
uint32_t RsGxsNetService::getSyncAge(const RsGxsGroupId& grpId)
{
RS_STACK_MUTEX(mNxsMutex) ;
GrpConfigMap::const_iterator it = mServerGrpConfigMap.find(grpId) ;
if(it == mServerGrpConfigMap.end())
return RS_GXS_DEFAULT_MSG_REQ_PERIOD ;
else
return it->second.msg_req_delay ;
return locked_getGrpConfig(grpId).msg_req_delay ;
}
uint32_t RsGxsNetService::getKeepAge(const RsGxsGroupId& grpId,uint32_t default_value)
uint32_t RsGxsNetService::getKeepAge(const RsGxsGroupId& grpId)
{
RS_STACK_MUTEX(mNxsMutex) ;
GrpConfigMap::const_iterator it = mServerGrpConfigMap.find(grpId) ;
if(it == mServerGrpConfigMap.end())
return default_value ;
else
return it->second.msg_keep_delay ;
return locked_getGrpConfig(grpId).msg_keep_delay ;
}
int RsGxsNetService::requestGrp(const std::list<RsGxsGroupId>& grpId, const RsPeerId& peerId)
@ -4776,6 +4798,8 @@ bool RsGxsNetService::removeGroups(const std::list<RsGxsGroupId>& groups)
GXSNETDEBUG__G(*git) << " deleting info for group " << *git << std::endl;
#endif
// Here we do not use locked_getGrpConfig() because we dont want the entry to be created if it doesnot already exist.
GrpConfigMap::iterator it = mServerGrpConfigMap.find(*git) ;
if(it != mServerGrpConfigMap.end())

View File

@ -88,18 +88,21 @@ public:
* arrive
*/
RsGxsNetService(uint16_t servType, RsGeneralDataService *gds,
RsNxsNetMgr *netMgr,
RsNxsObserver *nxsObs, // used to be = NULL.
const RsServiceInfo serviceInfo,
RsGixsReputation* reputations = NULL, RsGcxs* circles = NULL, RsGixs *gixs=NULL,
PgpAuxUtils *pgpUtils = NULL,
bool grpAutoSync = true, bool msgAutoSync = true);
RsNxsNetMgr *netMgr,
RsNxsObserver *nxsObs, // used to be = NULL.
const RsServiceInfo serviceInfo,
RsGixsReputation* reputations = NULL, RsGcxs* circles = NULL, RsGixs *gixs=NULL,
PgpAuxUtils *pgpUtils = NULL,
bool grpAutoSync = true, bool msgAutoSync = true,
uint32_t default_store_period = RS_GXS_DEFAULT_MSG_STORE_PERIOD,
uint32_t default_sync_period = RS_GXS_DEFAULT_MSG_REQ_PERIOD);
virtual ~RsGxsNetService();
virtual RsServiceInfo getServiceInfo() { return mServiceInfo; }
virtual void getItemNames(std::map<uint8_t,std::string>& names) const ;
public:
@ -111,9 +114,13 @@ public:
virtual void setKeepAge(const RsGxsGroupId& grpId,uint32_t age_in_secs);
virtual uint32_t getSyncAge(const RsGxsGroupId& id);
virtual uint32_t getKeepAge(const RsGxsGroupId& id,uint32_t default_value);
virtual uint32_t getKeepAge(const RsGxsGroupId& id);
virtual uint32_t getDefaultSyncAge() { return RS_GXS_DEFAULT_MSG_REQ_PERIOD ; }
virtual uint32_t getDefaultSyncAge() { return mDefaultMsgSyncPeriod ; }
virtual uint32_t getDefaultKeepAge() { return mDefaultMsgStorePeriod ; }
virtual void setDefaultKeepAge(uint32_t t) { mDefaultMsgStorePeriod = t ; }
virtual void setDefaultSyncAge(uint32_t t) { mDefaultMsgSyncPeriod = t ; }
/*!
* pauses synchronisation of subscribed groups and request for group id
@ -413,6 +420,7 @@ private:
static RsGxsGroupId hashGrpId(const RsGxsGroupId& gid,const RsPeerId& pid) ;
RsGxsGrpConfig& locked_getGrpConfig(const RsGxsGroupId& grp_id);
private:
typedef std::vector<RsNxsGrp*> GrpFragments;
@ -573,6 +581,9 @@ private:
std::set<RsGxsGroupId> mNewPublishKeysToNotify ;
void debugDump();
uint32_t mDefaultMsgStorePeriod ;
uint32_t mDefaultMsgSyncPeriod ;
};
#endif // RSGXSNETSERVICE_H

View File

@ -57,7 +57,8 @@ bool RsGxsMessageCleanUp::clean()
time_t now = time(NULL);
#ifdef DEBUG_GXSUTIL
GXSUTIL_DEBUG() << " Cleaning up groups in service" << std::hex << mGenExchangeClient->serviceType() << std::dec << std::endl;
uint16_t service_type = mGenExchangeClient->serviceType() ;
GXSUTIL_DEBUG() << " Cleaning up groups in service " << std::hex << service_type << std::dec << std::endl;
#endif
while(!mGrpMeta.empty())
{

View File

@ -73,9 +73,13 @@ public:
virtual void setKeepAge(const RsGxsGroupId& id,uint32_t age_in_secs) =0;
virtual uint32_t getSyncAge(const RsGxsGroupId& id) =0;
virtual uint32_t getKeepAge(const RsGxsGroupId& id,uint32_t default_value) =0;
virtual uint32_t getKeepAge(const RsGxsGroupId& id) =0;
virtual void setDefaultKeepAge(uint32_t t) =0;
virtual void setDefaultSyncAge(uint32_t t) =0;
virtual uint32_t getDefaultSyncAge() =0;
virtual uint32_t getDefaultKeepAge() =0;
/*!
* Initiates a search through the network

View File

@ -92,7 +92,7 @@ public:
p3IdService& identities ) :
RsGenExchange( gds, nes, new RsGxsTransSerializer(),
RS_SERVICE_TYPE_GXS_TRANS, &identities,
AuthenPolicy(), GXS_STORAGE_PERIOD ),
AuthenPolicy()),
GxsTokenQueue(this),
RsGxsTrans(this),
mIdService(identities),
@ -150,6 +150,8 @@ public:
/// @see RsGenExchange::getServiceInfo()
virtual RsServiceInfo getServiceInfo() { return RsServiceInfo( RS_SERVICE_TYPE_GXS_TRANS, "GXS Mails", 0, 1, 0, 1 ); }
static const uint32_t GXS_STORAGE_PERIOD = 0x127500; // 14 days.
static const uint32_t GXS_SYNC_PERIOD = 0x127500;
private:
/** Time interval of inactivity before a distribution group is unsubscribed.
* Approximatively 3 months seems ok ATM. */
@ -168,7 +170,6 @@ private:
* signed acknowledged is received for each of them.
* Two weeks seems fair ATM.
*/
static const uint32_t GXS_STORAGE_PERIOD = 0x127500;
static const uint32_t MAX_DELAY_BETWEEN_CLEANUPS ; // every 20 mins. Could be less.
time_t mLastMsgCleanup ;

View File

@ -1298,16 +1298,10 @@ int RsServer::StartupRetroShare()
//
mPluginsManager->setServiceControl(serviceCtrl) ;
// std::cerr << "rsinitconf (core 1) = " << (void*)rsInitConfig<<std::endl;
// std::cerr << "gxs_passwd (core 1) = " << (void*)&rsInitConfig->gxs_passwd<<" \"" << rsInitConfig->gxs_passwd << "\""<< std::endl;
// Now load the plugins. This parses the available SO/DLL files for known symbols.
//
mPluginsManager->loadPlugins(plugins_directories) ;
// std::cerr << "rsinitconf (core 1) = " << (void*)rsInitConfig<<std::endl;
// std::cerr << "gxs_passwd (core 2) = " << (void*)&rsInitConfig->gxs_passwd<< " \"" << rsInitConfig->gxs_passwd << "\""<< std::endl;
// Also load some plugins explicitly. This is helpful for
// - developping plugins
//
@ -1375,7 +1369,7 @@ int RsServer::StartupRetroShare()
true, // synchronise group automatic
true); // sync messages automatic, since they contain subscription requests.
mGxsCircles->setNetworkExchangeService(gxscircles_ns) ;
mGxsCircles->setNetworkExchangeService(gxscircles_ns) ;
/**** Posted GXS service ****/
@ -1392,8 +1386,7 @@ int RsServer::StartupRetroShare()
mReputations, mGxsCircles,mGxsIdService,
pgpAuxUtils);
mPosted->setNetworkExchangeService(posted_ns) ;
mPosted->setNetworkExchangeService(posted_ns) ;
/**** Wiki GXS service ****/
@ -1493,10 +1486,12 @@ int RsServer::StartupRetroShare()
currGxsDir + "/", "gxstrans_db", RS_SERVICE_TYPE_GXS_TRANS,
NULL, rsInitConfig->gxs_passwd );
mGxsTrans = new p3GxsTrans(gxstrans_ds, NULL, *mGxsIdService);
RsGxsNetService* gxstrans_ns = new RsGxsNetService(
RS_SERVICE_TYPE_GXS_TRANS, gxstrans_ds, nxsMgr, mGxsTrans,
mGxsTrans->getServiceInfo(), mReputations, mGxsCircles,
mGxsIdService, pgpAuxUtils);
mGxsIdService, pgpAuxUtils,true,true,p3GxsTrans::GXS_STORAGE_PERIOD,p3GxsTrans::GXS_SYNC_PERIOD);
mGxsTrans->setNetworkExchangeService(gxstrans_ns);
pqih->addService(gxstrans_ns, true);
# endif // RS_GXS_TRANS

View File

@ -44,8 +44,6 @@
RsGxsForums *rsGxsForums = NULL;
const uint32_t GXSFORUMS_MSG_STORE_PERIOD = 60*60*24*31*12; // 12 months / 1 year
#define FORUM_TESTEVENT_DUMMYDATA 0x0001
#define DUMMYDATA_PERIOD 60 // long enough for some RsIdentities to be generated.
@ -56,8 +54,7 @@ const uint32_t GXSFORUMS_MSG_STORE_PERIOD = 60*60*24*31*12; // 12 months / 1 yea
p3GxsForums::p3GxsForums( RsGeneralDataService *gds,
RsNetworkExchangeService *nes, RsGixs* gixs ) :
RsGenExchange( gds, nes, new RsGxsForumSerialiser(),
RS_SERVICE_GXS_TYPE_FORUMS, gixs, forumsAuthenPolicy(),
GXSFORUMS_MSG_STORE_PERIOD),
RS_SERVICE_GXS_TYPE_FORUMS, gixs, forumsAuthenPolicy()),
RsGxsForums(this), mGenToken(0), mGenActive(false), mGenCount(0)
{
// Test Data disabled in Repo.