Merge pull request #654 from csoler/v0.6-GXS-LimitedSync2

V0.6 gxs limited sync2
This commit is contained in:
csoler 2017-01-21 15:06:46 +01:00 committed by GitHub
commit e3240de8e0
8 changed files with 76 additions and 13 deletions

View File

@ -1634,10 +1634,14 @@ bool RsDataService::locked_removeGroupEntries(const std::vector<RsGxsGroupId>& g
const RsGxsGroupId& grpId = *vit; const RsGxsGroupId& grpId = *vit;
mDb->sqlDelete(GRP_TABLE_NAME, KEY_GRP_ID+ "='" + grpId.toStdString() + "'", ""); mDb->sqlDelete(GRP_TABLE_NAME, KEY_GRP_ID+ "='" + grpId.toStdString() + "'", "");
// also remove the group meta from cache.
mGrpMetaDataCache.erase(*vit) ;
} }
ret &= mDb->commitTransaction(); ret &= mDb->commitTransaction();
mGrpMetaDataCache_ContainsAllDatabase = false ;
return ret; return ret;
} }
uint32_t RsDataService::cacheSize() const { uint32_t RsDataService::cacheSize() const {

View File

@ -90,6 +90,8 @@ public:
uint32_t mSuppliers ; uint32_t mSuppliers ;
uint32_t mMaxVisibleCount ; uint32_t mMaxVisibleCount ;
bool mGrpAutoSync ;
bool mAllowMsgSync;
}; };
typedef std::map<RsGxsGroupId, std::vector<RsNxsMsg*> > NxsMsgDataResult; typedef std::map<RsGxsGroupId, std::vector<RsNxsMsg*> > NxsMsgDataResult;

View File

@ -63,8 +63,8 @@ static const uint32_t INDEX_AUTHEN_ADMIN = 0x00000040; // admin key
//#define GEN_EXCH_DEBUG 1 //#define GEN_EXCH_DEBUG 1
#define MSG_CLEANUP_PERIOD 60*59 // 59 minutes static const uint32_t MSG_CLEANUP_PERIOD = 60*59; // 59 minutes
#define INTEGRITY_CHECK_PERIOD 60*31 // 31 minutes static const uint32_t INTEGRITY_CHECK_PERIOD = 60*31; // 31 minutes
RsGenExchange::RsGenExchange(RsGeneralDataService *gds, RsNetworkExchangeService *ns, RsGenExchange::RsGenExchange(RsGeneralDataService *gds, RsNetworkExchangeService *ns,
RsSerialType *serviceSerialiser, uint16_t servType, RsGixs* gixs, RsSerialType *serviceSerialiser, uint16_t servType, RsGixs* gixs,
@ -81,7 +81,7 @@ RsGenExchange::RsGenExchange(RsGeneralDataService *gds, RsNetworkExchangeService
mLastClean((int)time(NULL) - (int)(RSRandom::random_u32() % MSG_CLEANUP_PERIOD)), // this helps unsynchronising the checks for the different services mLastClean((int)time(NULL) - (int)(RSRandom::random_u32() % MSG_CLEANUP_PERIOD)), // this helps unsynchronising the checks for the different services
mMsgCleanUp(NULL), mMsgCleanUp(NULL),
mChecking(false), mChecking(false),
mLastCheck((int)time(NULL) - (int)(RSRandom::random_u32() % INTEGRITY_CHECK_PERIOD)), // this helps unsynchronising the checks for the different services mLastCheck((int)time(NULL) - (int)(RSRandom::random_u32() % INTEGRITY_CHECK_PERIOD) + 120), // this helps unsynchronising the checks for the different services, with 2 min security to avoid checking right away before statistics come up.
mIntegrityCheck(NULL), mIntegrityCheck(NULL),
CREATE_FAIL(0), CREATE_FAIL(0),
CREATE_SUCCESS(1), CREATE_SUCCESS(1),
@ -211,6 +211,11 @@ void RsGenExchange::tick()
std::cerr << " " << *it << std::endl; std::cerr << " " << *it << std::endl;
#endif #endif
mNotifications.push_back(gc); mNotifications.push_back(gc);
// also notify the network exchange service that these groups no longer exist.
if(mNetService)
mNetService->removeGroups(grpIds) ;
} }
if (!msgIds.empty()) { if (!msgIds.empty()) {
@ -1718,6 +1723,11 @@ uint32_t RsGenExchange::getSyncPeriod(const RsGxsGroupId& grpId)
return RS_GXS_DEFAULT_MSG_REQ_PERIOD; return RS_GXS_DEFAULT_MSG_REQ_PERIOD;
} }
bool RsGenExchange::getGroupNetworkStats(const RsGxsGroupId& grpId,RsGroupNetworkStats& stats)
{
return (!mNetService) || mNetService->getGroupNetworkStats(grpId,stats) ;
}
void RsGenExchange::setSyncPeriod(const RsGxsGroupId& grpId,uint32_t age_in_secs) void RsGenExchange::setSyncPeriod(const RsGxsGroupId& grpId,uint32_t age_in_secs)
{ {
if(mNetService != NULL) if(mNetService != NULL)

View File

@ -646,6 +646,7 @@ public:
virtual uint32_t getDefaultSyncPeriod(); virtual uint32_t getDefaultSyncPeriod();
virtual uint32_t getSyncPeriod(const RsGxsGroupId& grpId) ; virtual uint32_t getSyncPeriod(const RsGxsGroupId& grpId) ;
virtual void setSyncPeriod(const RsGxsGroupId& grpId,uint32_t age_in_secs) ; virtual void setSyncPeriod(const RsGxsGroupId& grpId,uint32_t age_in_secs) ;
virtual bool getGroupNetworkStats(const RsGxsGroupId& grpId,RsGroupNetworkStats& stats);
uint16_t serviceType() const { return mServType ; } uint16_t serviceType() const { return mServType ; }
uint32_t serviceFullType() const { return ((uint32_t)mServType << 8) + (((uint32_t) RS_PKT_VERSION_SERVICE) << 24); } uint32_t serviceFullType() const { return ((uint32_t)mServType << 8) + (((uint32_t) RS_PKT_VERSION_SERVICE) << 24); }

View File

@ -225,12 +225,12 @@
NXS_NET_DEBUG_7 encryption/decryption of transactions NXS_NET_DEBUG_7 encryption/decryption of transactions
***/ ***/
//#define NXS_NET_DEBUG_0 1 #define NXS_NET_DEBUG_0 1
//#define NXS_NET_DEBUG_1 1 //#define NXS_NET_DEBUG_1 1
//#define NXS_NET_DEBUG_2 1 //#define NXS_NET_DEBUG_2 1
//#define NXS_NET_DEBUG_3 1 //#define NXS_NET_DEBUG_3 1
//#define NXS_NET_DEBUG_4 1 //#define NXS_NET_DEBUG_4 1
//#define NXS_NET_DEBUG_5 1 #define NXS_NET_DEBUG_5 1
//#define NXS_NET_DEBUG_6 1 //#define NXS_NET_DEBUG_6 1
//#define NXS_NET_DEBUG_7 1 //#define NXS_NET_DEBUG_7 1
@ -2311,6 +2311,8 @@ bool RsGxsNetService::getGroupNetworkStats(const RsGxsGroupId& gid,RsGroupNetwor
stats.mSuppliers = it->second.suppliers.ids.size(); stats.mSuppliers = it->second.suppliers.ids.size();
stats.mMaxVisibleCount = it->second.max_visible_count ; stats.mMaxVisibleCount = it->second.max_visible_count ;
stats.mAllowMsgSync = mAllowMsgSync ;
stats.mGrpAutoSync = mGrpAutoSync ;
return true ; return true ;
} }
@ -4046,7 +4048,12 @@ void RsGxsNetService::handleRecvSyncMessage(RsNxsSyncMsgReqItem *item,bool item_
GXSNETDEBUG_PG(item->PeerId(),item->grpId) << "handleRecvSyncMsg(): Received last update TS of group " << item->grpId << ", for peer " << peer << ", TS = " << time(NULL) - item->updateTS << " secs ago." ; GXSNETDEBUG_PG(item->PeerId(),item->grpId) << "handleRecvSyncMsg(): Received last update TS of group " << item->grpId << ", for peer " << peer << ", TS = " << time(NULL) - item->updateTS << " secs ago." ;
#endif #endif
if(grp_is_known) // We update suppliers in two cases:
// Case 1: the grp is known because it is the hash of an existing group, but it's not yet in the server config map
// Case 2: the gtp is not known, possibly because it was deleted, but there's an entry in mServerGrpConfigMap due to statistics gathering. Still, statistics are only
// gathered from known suppliers. So statistics never add new suppliers. These are only added here.
if(grp_is_known || mServerGrpConfigMap.find(item->grpId)!=mServerGrpConfigMap.end())
{ {
RsGxsGrpConfig & rec(mServerGrpConfigMap[item->grpId]) ; // this creates it if needed. When the grp is unknown (and hashed) this will would create a unused entry RsGxsGrpConfig & rec(mServerGrpConfigMap[item->grpId]) ; // this creates it if needed. When the grp is unknown (and hashed) this will would create a unused entry
rec.suppliers.ids.insert(peer) ; rec.suppliers.ids.insert(peer) ;
@ -4724,7 +4731,7 @@ void RsGxsNetService::handleRecvPublishKeys(RsNxsGroupPublishKeyItem *item)
if(ret) if(ret)
{ {
#ifdef NXS_NET_DEBUG #ifdef NXS_NET_DEBUG_3
GXSNETDEBUG_PG(item->PeerId(),item->grpId)<< " updated database with new publish keys." << std::endl; GXSNETDEBUG_PG(item->PeerId(),item->grpId)<< " updated database with new publish keys." << std::endl;
#endif #endif
mNewPublishKeysToNotify.insert(item->grpId) ; mNewPublishKeysToNotify.insert(item->grpId) ;
@ -4751,6 +4758,39 @@ bool RsGxsNetService::getGroupServerUpdateTS(const RsGxsGroupId& gid,time_t& gro
return true ; return true ;
} }
bool RsGxsNetService::removeGroups(const std::list<RsGxsGroupId>& groups)
{
RS_STACK_MUTEX(mNxsMutex) ;
GXSNETDEBUG___ << "Removing group information from deleted groups:" << std::endl;
for(std::list<RsGxsGroupId>::const_iterator git(groups.begin());git!=groups.end();++git)
{
GXSNETDEBUG__G(*git) << " deleting info for group " << *git << std::endl;
GrpConfigMap::iterator it = mServerGrpConfigMap.find(*git) ;
if(it != mServerGrpConfigMap.end())
{
it->second.suppliers.TlvClear(); // we dont erase the entry, because we want to keep the user-defined sync parameters.
it->second.max_visible_count = 0;
}
mServerMsgUpdateMap.erase(*git) ;
for(ClientMsgMap::iterator it(mClientMsgUpdateMap.begin());it!=mClientMsgUpdateMap.end();++it)
it->second.msgUpdateInfos.erase(*git) ;
// This last step is very important: it makes RS re-sync all groups after deleting, with every new peer. If may happen indeed that groups
// are deleted because there's no suppliers since the actual supplier friend is offline for too long. In this case, the group needs
// to re-appear when the friend who is a subscriber comes online again.
mClientGrpUpdateMap.clear();
}
IndicateConfigChanged();
}
bool RsGxsNetService::stampMsgServerUpdateTS(const RsGxsGroupId& gid) bool RsGxsNetService::stampMsgServerUpdateTS(const RsGxsGroupId& gid)
{ {
RS_STACK_MUTEX(mNxsMutex) ; RS_STACK_MUTEX(mNxsMutex) ;

View File

@ -161,6 +161,7 @@ public:
virtual bool getGroupServerUpdateTS(const RsGxsGroupId& gid,time_t& grp_server_update_TS,time_t& msg_server_update_TS) ; virtual bool getGroupServerUpdateTS(const RsGxsGroupId& gid,time_t& grp_server_update_TS,time_t& msg_server_update_TS) ;
virtual bool stampMsgServerUpdateTS(const RsGxsGroupId& gid) ; virtual bool stampMsgServerUpdateTS(const RsGxsGroupId& gid) ;
virtual bool removeGroups(const std::list<RsGxsGroupId>& groups);
/* p3Config methods */ /* p3Config methods */
public: public:

View File

@ -188,20 +188,17 @@ bool RsGxsIntegrityCheck::check()
grpsToDel.push_back(grp->grpId); grpsToDel.push_back(grp->grpId);
} }
#ifdef TODO if(!(grp->metaData->mSubscribeFlags & GXS_SERV::GROUP_SUBSCRIBE_SUBSCRIBED) && !(grp->metaData->mSubscribeFlags & GXS_SERV::GROUP_SUBSCRIBE_ADMIN) && !(grp->metaData->mSubscribeFlags & GXS_SERV::GROUP_SUBSCRIBE_PUBLISH))
if(!(grp->metaData->mSubscribeFlags & GXS_SERV::GROUP_SUBSCRIBE_SUBSCRIBED))
{ {
RsGroupNetworkStats stats ; RsGroupNetworkStats stats ;
mGenExchangeClient->getGroupNetworkStats(grp->grpId,stats); mGenExchangeClient->getGroupNetworkStats(grp->grpId,stats);
if(stats.mSuppliers == 0 && stats.mMaxVisibleCount == 0) if(stats.mSuppliers == 0 && stats.mMaxVisibleCount == 0 && stats.mGrpAutoSync)
{ {
GXSUTIL_DEBUG() << "Scheduling group \"" << grp->metaData->mGroupName << "\" ID=" << grp->grpId << " for deletion because it has no suppliers not any visible data at friends." << std::endl; GXSUTIL_DEBUG() << "Scheduling group \"" << grp->metaData->mGroupName << "\" ID=" << grp->grpId << " in service " << std::hex << mGenExchangeClient->serviceType() << std::dec << " for deletion because it has no suppliers not any visible data at friends." << std::endl;
#warning Should we do that here? What happens for groups that are normally empty such as identities?
grpsToDel.push_back(grp->grpId); grpsToDel.push_back(grp->grpId);
} }
} }
#endif
delete grp; delete grp;
} }

View File

@ -162,6 +162,14 @@ public:
*/ */
virtual bool stampMsgServerUpdateTS(const RsGxsGroupId& gid) =0; virtual bool stampMsgServerUpdateTS(const RsGxsGroupId& gid) =0;
/*!
* \brief removeGroups
* Removes time stamp information from the list of groups. This allows to re-sync them if suppliers are present.
* \param groups list of groups to remove from the update maps
* \return true if nothing bad happens.
*/
virtual bool removeGroups(const std::list<RsGxsGroupId>& groups)=0;
/*! /*!
* \brief minReputationForForwardingMessages * \brief minReputationForForwardingMessages
* Encodes the policy for sending/requesting messages depending on anti-spam settings. * Encodes the policy for sending/requesting messages depending on anti-spam settings.