merged upstream/master

This commit is contained in:
csoler 2017-06-15 23:58:29 +02:00
commit 25565a7ecd
19 changed files with 393 additions and 159 deletions

View file

@ -190,52 +190,54 @@ void RsGenExchange::tick()
now = time(NULL);
if(mChecking || (mLastCheck + INTEGRITY_CHECK_PERIOD < now))
{
if(mIntegrityCheck)
mLastCheck = time(NULL);
{
if(mIntegrityCheck->isDone())
RS_STACK_MUTEX(mGenMtx) ;
if(!mIntegrityCheck)
{
std::list<RsGxsGroupId> grpIds;
std::map<RsGxsGroupId, std::vector<RsGxsMessageId> > msgIds;
mIntegrityCheck->getDeletedIds(grpIds, msgIds);
if (!grpIds.empty())
{
RS_STACK_MUTEX(mGenMtx) ;
RsGxsGroupChange* gc = new RsGxsGroupChange(RsGxsNotify::TYPE_PROCESSED, false);
gc->mGrpIdList = grpIds;
#ifdef GEN_EXCH_DEBUG
std::cerr << " adding the following grp ids to notification: " << std::endl;
for(std::list<RsGxsGroupId>::const_iterator it(grpIds.begin());it!=grpIds.end();++it)
std::cerr << " " << *it << std::endl;
#endif
mNotifications.push_back(gc);
// also notify the network exchange service that these groups no longer exist.
if(mNetService)
mNetService->removeGroups(grpIds) ;
}
if (!msgIds.empty()) {
RS_STACK_MUTEX(mGenMtx) ;
RsGxsMsgChange* c = new RsGxsMsgChange(RsGxsNotify::TYPE_PROCESSED, false);
c->msgChangeMap = msgIds;
mNotifications.push_back(c);
}
delete mIntegrityCheck;
mIntegrityCheck = NULL;
mLastCheck = time(NULL);
mChecking = false;
mIntegrityCheck = new RsGxsIntegrityCheck(mDataStore,this,mGixs);
mIntegrityCheck->start("gxs integrity");
mChecking = true;
}
}
else
if(mIntegrityCheck->isDone())
{
mIntegrityCheck = new RsGxsIntegrityCheck(mDataStore,this,mGixs);
mIntegrityCheck->start("gxs integrity");
mChecking = true;
RS_STACK_MUTEX(mGenMtx) ;
std::list<RsGxsGroupId> grpIds;
std::map<RsGxsGroupId, std::vector<RsGxsMessageId> > msgIds;
mIntegrityCheck->getDeletedIds(grpIds, msgIds);
if (!grpIds.empty())
{
RsGxsGroupChange* gc = new RsGxsGroupChange(RsGxsNotify::TYPE_PROCESSED, false);
gc->mGrpIdList = grpIds;
#ifdef GEN_EXCH_DEBUG
std::cerr << " adding the following grp ids to notification: " << std::endl;
for(std::list<RsGxsGroupId>::const_iterator it(grpIds.begin());it!=grpIds.end();++it)
std::cerr << " " << *it << std::endl;
#endif
mNotifications.push_back(gc);
// also notify the network exchange service that these groups no longer exist.
if(mNetService)
mNetService->removeGroups(grpIds) ;
}
if (!msgIds.empty())
{
RsGxsMsgChange* c = new RsGxsMsgChange(RsGxsNotify::TYPE_PROCESSED, false);
c->msgChangeMap = msgIds;
mNotifications.push_back(c);
}
delete mIntegrityCheck;
mIntegrityCheck = NULL;
mChecking = false;
}
}
}
@ -1752,8 +1754,18 @@ void RsGenExchange::deleteGroup(uint32_t& token, const RsGxsGroupId& grpId)
}
void RsGenExchange::deleteMsgs(uint32_t& token, const GxsMsgReq& msgs)
{
RS_STACK_MUTEX(mGenMtx) ;
token = mDataAccess->generatePublicToken();
mMsgDeletePublish.push_back(MsgDeletePublish(msgs, token));
// This code below will suspend any requests of the deleted messages for 24 hrs. This of course only works
// if all friend nodes consistently delete the messages in the mean time.
if(mNetService != NULL)
for(GxsMsgReq::const_iterator it(msgs.begin());it!=msgs.end();++it)
for(uint32_t i=0;i<it->second.size();++i)
mNetService->rejectMessage(it->second[i]) ;
}
void RsGenExchange::publishMsg(uint32_t& token, RsGxsMsgItem *msgItem)

View file

@ -268,7 +268,7 @@ static const uint32_t RS_NXS_ITEM_ENCRYPTION_STATUS_GXS_KEY_MISSING = 0x05 ;
static const RsPeerId peer_to_print = RsPeerId(std::string("")) ;
static const RsGxsGroupId group_id_to_print = RsGxsGroupId(std::string("")) ; // use this to allow to this group id only, or "" for all IDs
static const uint32_t service_to_print = 0x215 ; // use this to allow to this service id only, or 0 for all services
static const uint32_t service_to_print = RS_SERVICE_TYPE_GXS_TRANS ; // use this to allow to this service id only, or 0 for all services
// warning. Numbers should be SERVICE IDS (see serialiser/rsserviceids.h. E.g. 0x0215 for forums)
class nullstream: public std::ostream {};
@ -447,6 +447,9 @@ void RsGxsNetService::rejectMessage(const RsGxsMessageId& msg_id)
{
RS_STACK_MUTEX(mNxsMutex) ;
#ifdef NXS_NET_DEBUG_0
GXSNETDEBUG___ << "adding message " << msg_id << " to rejection list for 24hrs." << std::endl;
#endif
mRejectedMessages[msg_id] = time(NULL) ;
}
void RsGxsNetService::cleanRejectedMessages()
@ -595,9 +598,9 @@ void RsGxsNetService::syncWithPeers()
#ifdef NXS_NET_DEBUG_0
GXSNETDEBUG_PG(peerId,grpId) << " peer can send messages for group " << grpId ;
if(!encrypt_to_this_circle_id.isNull())
std::cerr << " request should be encrypted for circle ID " << encrypt_to_this_circle_id << std::endl;
GXSNETDEBUG_PG(peerId,grpId) << " request should be encrypted for circle ID " << encrypt_to_this_circle_id << std::endl;
else
std::cerr << " request should be sent in clear." << std::endl;
GXSNETDEBUG_PG(peerId,grpId) << " request should be sent in clear." << std::endl;
#endif
// On default, the info has never been received so the TS is 0, meaning the peer has sent that it had no information.
@ -1839,7 +1842,7 @@ void RsGxsNetService::debugDump()
GXSNETDEBUG_PG(it->first,it2->first) << " group " << it2->first << " - last updated at peer (secs ago): " << nice_time_stamp(time(NULL),it2->second.time_stamp) << ". Message count=" << it2->second.message_count << std::endl;
}
GXSNETDEBUG___<< " List of rejected message ids: " << mRejectedMessages.size() << std::endl;
GXSNETDEBUG___<< " List of rejected message ids: " << std::dec << mRejectedMessages.size() << std::endl;
#endif
}

View file

@ -136,6 +136,9 @@ RsGxsIntegrityCheck::RsGxsIntegrityCheck(RsGeneralDataService* const dataService
void RsGxsIntegrityCheck::run()
{
check();
RsStackMutex stack(mIntegrityMutex);
mDone = true;
}
bool RsGxsIntegrityCheck::check()
@ -286,71 +289,72 @@ bool RsGxsIntegrityCheck::check()
mDs->removeMsgs(msgsToDel);
RsStackMutex stack(mIntegrityMutex);
mDone = true;
{
RsStackMutex stack(mIntegrityMutex);
std::vector<RsGxsGroupId>::iterator grpIt;
for(grpIt = grpsToDel.begin(); grpIt != grpsToDel.end(); ++grpIt)
{
mDeletedGrps.push_back(*grpIt);
}
mDeletedMsgs = msgsToDel;
std::vector<RsGxsGroupId>::iterator grpIt;
for(grpIt = grpsToDel.begin(); grpIt != grpsToDel.end(); ++grpIt)
{
mDeletedGrps.push_back(*grpIt);
}
mDeletedMsgs = msgsToDel;
#ifdef DEBUG_GXSUTIL
GXSUTIL_DEBUG() << "At end of pass, this is the list used GXS ids: " << std::endl;
GXSUTIL_DEBUG() << " requesting them to GXS identity service to enforce loading." << std::endl;
GXSUTIL_DEBUG() << "At end of pass, this is the list used GXS ids: " << std::endl;
GXSUTIL_DEBUG() << " requesting them to GXS identity service to enforce loading." << std::endl;
#endif
std::list<RsPeerId> connected_friends ;
rsPeers->getOnlineList(connected_friends) ;
std::list<RsPeerId> connected_friends ;
rsPeers->getOnlineList(connected_friends) ;
std::vector<std::pair<RsGxsId,RsIdentityUsage> > gxs_ids ;
std::vector<std::pair<RsGxsId,RsIdentityUsage> > gxs_ids ;
for(std::map<RsGxsId,RsIdentityUsage>::const_iterator it(used_gxs_ids.begin());it!=used_gxs_ids.end();++it)
{
gxs_ids.push_back(*it) ;
for(std::map<RsGxsId,RsIdentityUsage>::const_iterator it(used_gxs_ids.begin());it!=used_gxs_ids.end();++it)
{
gxs_ids.push_back(*it) ;
#ifdef DEBUG_GXSUTIL
GXSUTIL_DEBUG() << " " << *it << std::endl;
GXSUTIL_DEBUG() << " " << *it << std::endl;
#endif
}
uint32_t nb_requested_not_in_cache = 0;
}
uint32_t nb_requested_not_in_cache = 0;
#ifdef DEBUG_GXSUTIL
GXSUTIL_DEBUG() << " issuing random get on friends for non existing IDs" << std::endl;
GXSUTIL_DEBUG() << " issuing random get on friends for non existing IDs" << std::endl;
#endif
// now request a cache update for them, which triggers downloading from friends, if missing.
// now request a cache update for them, which triggers downloading from friends, if missing.
for(;nb_requested_not_in_cache<MAX_GXS_IDS_REQUESTS_NET && !gxs_ids.empty();)
{
uint32_t n = RSRandom::random_u32() % gxs_ids.size() ;
for(;nb_requested_not_in_cache<MAX_GXS_IDS_REQUESTS_NET && !gxs_ids.empty();)
{
uint32_t n = RSRandom::random_u32() % gxs_ids.size() ;
#ifdef DEBUG_GXSUTIL
GXSUTIL_DEBUG() << " requesting ID " << gxs_ids[n] ;
GXSUTIL_DEBUG() << " requesting ID " << gxs_ids[n] ;
#endif
if(!mGixs->haveKey(gxs_ids[n].first)) // checks if we have it already in the cache (conservative way to ensure that we atually have it)
{
mGixs->requestKey(gxs_ids[n].first,connected_friends,gxs_ids[n].second);
if(!mGixs->haveKey(gxs_ids[n].first)) // checks if we have it already in the cache (conservative way to ensure that we atually have it)
{
mGixs->requestKey(gxs_ids[n].first,connected_friends,gxs_ids[n].second);
++nb_requested_not_in_cache ;
++nb_requested_not_in_cache ;
#ifdef DEBUG_GXSUTIL
GXSUTIL_DEBUG() << " ... from cache/net" << std::endl;
GXSUTIL_DEBUG() << " ... from cache/net" << std::endl;
#endif
}
else
{
}
else
{
#ifdef DEBUG_GXSUTIL
GXSUTIL_DEBUG() << " ... already in cache" << std::endl;
GXSUTIL_DEBUG() << " ... already in cache" << std::endl;
#endif
}
mGixs->timeStampKey(gxs_ids[n].first,gxs_ids[n].second);
}
mGixs->timeStampKey(gxs_ids[n].first,gxs_ids[n].second);
gxs_ids[n] = gxs_ids[gxs_ids.size()-1] ;
gxs_ids.pop_back() ;
}
gxs_ids[n] = gxs_ids[gxs_ids.size()-1] ;
gxs_ids.pop_back() ;
}
#ifdef DEBUG_GXSUTIL
GXSUTIL_DEBUG() << " total actual cache requests: "<< nb_requested_not_in_cache << std::endl;
GXSUTIL_DEBUG() << " total actual cache requests: "<< nb_requested_not_in_cache << std::endl;
#endif
}
return true;
}