decided to continue propagating peer details in

- gxsnet now has a ctor option to disable auto group sync
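  A minimal usage sketch of the new option (names taken from the RsServer::StartupRetroShare() hunk below, not new API): a service constructed with grpAutoSync = false skips the periodic RsNxsSyncGrp broadcast in syncWithPeers() and only fetches groups when explicitly asked.

      // Sketch, mirroring the StartupRetroShare() hunk in this commit:
      // the trailing 'false' is the new grpAutoSync ctor argument.
      RsGxsNetService* gxsid_ns = new RsGxsNetService(
              RS_SERVICE_GXSV2_TYPE_GXSID, gxsid_ds, nxsMgr,
              mGxsIdService, mGxsIdService, mGxsCircles,
              false); // explicit group requests only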

git-svn-id: http://svn.code.sf.net/p/retroshare/code/branches/v0.5-gxs_finale@6888 b45a01b8-16f6-495d-af2f-9b41ad6348cc
chrisparker126 2013-11-03 23:46:34 +00:00
parent 812ddef40e
commit 67c55991d7
6 changed files with 96 additions and 35 deletions
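The peer-propagation change in brief, as a sketch assembled from the RsGenExchange::validateMsg() hunk below: when an author key is missing, the validator now records which peer delivered the item, so the deferred key request can be routed back to that peer instead of going out with an empty peer list.

    // From validateMsg(): defer validation and ask the sending peer
    // for the missing author key.
    std::list<std::string> peers;
    peers.push_back(msg->PeerId());               // peer that supplied the message
    mGixs->requestKey(metaData.mAuthorId, peers); // request routed to that peer
    return VALIDATE_FAIL_TRY_LATER;               // retried once the key arrives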

@@ -820,6 +820,7 @@ int RsGenExchange::validateMsg(RsNxsMsg *msg, const uint32_t& grpFlag, RsTlvSecu
 	}else
 	{
 		std::list<std::string> peers;
+		peers.push_back(msg->PeerId());
 		mGixs->requestKey(metaData.mAuthorId, peers);
 		return VALIDATE_FAIL_TRY_LATER;
 	}

@@ -893,6 +894,7 @@ int RsGenExchange::validateGrp(RsNxsGrp* grp, RsTlvSecurityKeySet& grpKeySet)
 	}else
 	{
 		std::list<std::string> peers;
+		peers.push_back(grp->PeerId());
 		mGixs->requestKey(metaData.mAuthorId, peers);
 		return VALIDATE_FAIL_TRY_LATER;
 	}

@@ -36,16 +36,16 @@
 #define GIXS_CUT_OFF 0
 #define SYNC_PERIOD 12 // in microseconds every 10 seconds (1 second for testing)
-#define TRANSAC_TIMEOUT 5 // 5 seconds
+#define TRANSAC_TIMEOUT 10 // 10 seconds

 const uint32_t RsGxsNetService::FRAGMENT_SIZE = 150000;

 RsGxsNetService::RsGxsNetService(uint16_t servType, RsGeneralDataService *gds,
-		RsNxsNetMgr *netMgr, RsNxsObserver *nxsObs, RsGixsReputation* reputations, RsGcxs* circles)
+		RsNxsNetMgr *netMgr, RsNxsObserver *nxsObs, RsGixsReputation* reputations, RsGcxs* circles, bool grpAutoSync)
 	: p3Config(servType), p3ThreadedService(servType),
 	  mTransactionTimeOut(TRANSAC_TIMEOUT), mServType(servType), mDataStore(gds), mTransactionN(0),
 	  mObserver(nxsObs), mNxsMutex("RsGxsNetService"), mNetMgr(netMgr), mSYNC_PERIOD(SYNC_PERIOD),
-	  mSyncTs(0), mReputations(reputations), mCircles(circles)
+	  mSyncTs(0), mReputations(reputations), mCircles(circles), mGrpAutoSync(grpAutoSync)
 {
 	addSerialType(new RsNxsSerialiser(mServType));
@@ -85,13 +85,16 @@ void RsGxsNetService::syncWithPeers()
 	std::set<std::string>::iterator sit = peers.begin();

-	// for now just grps
-	for(; sit != peers.end(); sit++)
+	if(mGrpAutoSync)
 	{
-		RsNxsSyncGrp *grp = new RsNxsSyncGrp(mServType);
-		grp->clear();
-		grp->PeerId(*sit);
-		sendItem(grp);
+		// for now just grps
+		for(; sit != peers.end(); sit++)
+		{
+			RsNxsSyncGrp *grp = new RsNxsSyncGrp(mServType);
+			grp->clear();
+			grp->PeerId(*sit);
+			sendItem(grp);
+		}
 	}

 #ifdef GXS_ENABLE_SYNC_MSGS
@@ -773,8 +776,7 @@ void RsGxsNetService::run(){

 bool RsGxsNetService::locked_checkTransacTimedOut(NxsTransaction* tr)
 {
-	//return tr->mTimeOut < ((uint32_t) time(NULL));
-	return false;
+	return tr->mTimeOut < ((uint32_t) time(NULL));
 }

 void RsGxsNetService::processTransactions(){

@@ -73,7 +73,7 @@
 	 * arrive
 	 */
 	RsGxsNetService(uint16_t servType, RsGeneralDataService* gds, RsNxsNetMgr* netMgr,
-			RsNxsObserver* nxsObs = NULL, RsGixsReputation* repuations = NULL, RsGcxs* circles = NULL);
+			RsNxsObserver* nxsObs = NULL, RsGixsReputation* repuations = NULL, RsGcxs* circles = NULL, bool grpAutoSync = true);

 	virtual ~RsGxsNetService();

@@ -420,6 +420,7 @@
 	RsGcxs* mCircles;
 	RsGixsReputation* mReputations;
+	bool mGrpAutoSync;

 	// need to be verfied
 	std::vector<AuthorPending*> mPendingResp;

@@ -2268,7 +2268,8 @@ int RsServer::StartupRetroShare()
 	// create GXS photo service
 	RsGxsNetService* gxsid_ns = new RsGxsNetService(
 			RS_SERVICE_GXSV2_TYPE_GXSID, gxsid_ds, nxsMgr,
-			mGxsIdService, mGxsIdService, mGxsCircles);
+			mGxsIdService, mGxsIdService, mGxsCircles,
+			false); // don't synchronise group automatic (need explicit group request)

 	/**** GxsCircle service ****/

@@ -2653,7 +2654,6 @@ int RsServer::StartupRetroShare()
 	createThread(*gxsforums_ns);
 	createThread(*gxschannels_ns);
 #endif // RS_ENABLE_GXS
-
 	ftserver->StartupThreads();

@@ -112,6 +112,7 @@ RsIdentity *rsIdentity = NULL;
 #define GXSID_EVENT_DUMMY_PGPID 0x2002
 #define GXSID_EVENT_DUMMY_UNKNOWN_PGPID 0x2003
 #define GXSID_EVENT_DUMMY_PSEUDOID 0x2004
+#define GXSID_EVENT_REQUEST_IDS 0x2005

 /* delays */
@@ -411,9 +412,26 @@ bool p3IdService::requestKey(const RsGxsId &id, const std::list<PeerId> &peers)
 {
 	if (haveKey(id))
 		return true;
+	else
+	{
+		if(isPendingNetworkRequest(id))
+			return true;
+	}

 	return cache_request_load(id);
 }

+bool p3IdService::isPendingNetworkRequest(const RsGxsId& gxsId) const
+{
+	// if ids has beens confirmed as not physically present return
+	// immediately, id will be removed from list if found by auto nxs net search
+	if(mIdsNotPresent.find(gxsId) != mIdsNotPresent.end())
+		return true;
+
+	return false;
+}
+
 int p3IdService::getKey(const RsGxsId &id, RsTlvSecurityKey &key)
 {
 	RsStackMutex stack(mIdMtx); /********** STACK LOCKED MTX ******/
@@ -1298,7 +1316,7 @@ bool p3IdService::cache_store(const RsGxsIdGroupItem *item)
 #define MIN_CYCLE_GAP 2

-bool p3IdService::cache_request_load(const RsGxsId &id)
+bool p3IdService::cache_request_load(const RsGxsId &id, const std::list<std::string>& peers)
 {
 #ifdef DEBUG_IDS
 	std::cerr << "p3IdService::cache_request_load(" << id << ")";
@@ -1307,7 +1325,7 @@ bool p3IdService::cache_request_load(const RsGxsId &id)
 	{
 		RsStackMutex stack(mIdMtx); /********** STACK LOCKED MTX ******/
-		mCacheLoad_ToCache.push_back(id);
+		mCacheLoad_ToCache.insert(std::make_pair(id, peers));
 	}

 	if (RsTickEvent::event_count(GXSID_EVENT_CACHELOAD) > 0)
@@ -1339,16 +1357,17 @@ bool p3IdService::cache_start_load()
 		RsStackMutex stack(mIdMtx); /********** STACK LOCKED MTX ******/

 		/* now we process the modGroupList -> a map so we can use it easily later, and create id list too */
-		std::list<RsGxsId>::iterator it;
+		std::map<RsGxsId, std::list<std::string> >::iterator it;
 		for(it = mCacheLoad_ToCache.begin(); it != mCacheLoad_ToCache.end(); it++)
 		{
 #ifdef DEBUG_IDS
-			std::cerr << "p3IdService::cache_start_load() GroupId: " << *it;
+			std::cerr << "p3IdService::cache_start_load() GroupId: " << it->first;
 			std::cerr << std::endl;
 #endif // DEBUG_IDS

-			groupIds.push_back(*it); // might need conversion?
+			groupIds.push_back(it->first); // might need conversion?
 		}

+		// mPendingCache.insert(mCacheLoad_ToCache.begin(), mCacheLoad_ToCache.end());
 		mCacheLoad_ToCache.clear();
 	}
@@ -1365,12 +1384,7 @@ bool p3IdService::cache_start_load()
 		uint32_t token = 0;
 		RsGenExchange::getTokenService()->requestGroupInfo(token, ansType, opts, groupIds);
 		GxsTokenQueue::queueRequest(token, GXSIDREQ_CACHELOAD);
-
-		std::set<RsGxsGroupId> groupIdSet;
-		groupIdSet.insert(groupIds.begin(), groupIds.end());
-
-		RsStackMutex stack(mIdMtx); /********** STACK LOCKED MTX ******/
-		mGroupsToCache.insert(std::make_pair(token, groupIdSet));
 	}
 	return 1;
 }
@@ -1410,7 +1424,7 @@ bool p3IdService::cache_load_for_token(uint32_t token)
 				{
 					// remove identities that are present
 					RsStackMutex stack(mIdMtx);
-					mGroupsToCache[token].erase(item->meta.mGroupId);
+					mPendingCache.erase(item->meta.mGroupId);
 				}

 				/* cache the data */
@@ -1420,13 +1434,13 @@ bool p3IdService::cache_load_for_token(uint32_t token)
 			{
 				// now store identities that aren't present
 				RsStackMutex stack(mIdMtx);
-				const std::set<RsGxsGroupId>& groupIdSet = mGroupsToCache[token];
+				mIdsNotPresent.insert(mPendingCache.begin(), mPendingCache.end());
+				mPendingCache.clear();

-				if(!groupIdSet.empty())
-					mGroupNotPresent[token].assign(groupIdSet.begin(), groupIdSet.end());
-
-				mGroupsToCache.erase(token);
+				if(!mIdsNotPresent.empty())
+					schedule_now(GXSID_EVENT_REQUEST_IDS);
 			}
@@ -1440,6 +1454,39 @@ bool p3IdService::cache_load_for_token(uint32_t token)
 	return true;
 }

+void p3IdService::requestIdsFromNet()
+{
+	RsStackMutex stack(mIdMtx);
+
+	std::map<RsGxsId, std::list<std::string> >::const_iterator cit;
+	std::map<std::string, std::list<RsGxsId> > requests;
+
+	// transform to appropriate structure (<peer, std::list<RsGxsId> > map) to make request to nes
+	for(cit = mIdsNotPresent.begin(); cit != mIdsNotPresent.end(); cit++)
+	{
+		{
+#ifdef DEBUG_IDS
+			std::cerr << "p3IdService::requestIdsFromNet() Id not found, deferring for net request: ";
+			std::cerr << cit->first;
+			std::cerr << std::endl;
+#endif // DEBUG_IDS
+		}
+
+		const std::list<std::string>& peers = cit->second;
+		std::list<std::string>::const_iterator cit2;
+		for(cit2 = peers.begin(); cit2 != peers.end(); cit2++)
+			requests[*cit2].push_back(cit->first);
+	}
+
+	std::map<std::string, std::list<RsGxsId> >::const_iterator cit2;
+	for(cit2 = requests.begin(); cit2 != requests.end(); cit2++)
+		mNes->requestGrp(cit2->second, cit2->first);
+
+	mIdsNotPresent.clear();
+}
+
 bool p3IdService::cache_update_if_cached(const RsGxsId &id, std::string serviceString)
 {
 	/* if these entries are cached - update with new info */
@@ -2483,7 +2530,7 @@ bool p3IdService::recogn_process()
 	bool isDone = false;
 	{
 		RsStackMutex stack(mIdMtx); /********** STACK LOCKED MTX ******/
-		if (!mRecognGroupsToProcess.empty())
+		if (!mRecognGroupsToProcess.empty() && !mGroupsToProcess.empty())
 		{
 			item = mRecognGroupsToProcess.front();
 			mGroupsToProcess.pop_front();
@@ -3735,6 +3782,10 @@ void p3IdService::handle_event(uint32_t event_type, const std::string &elabel)
 		case GXSID_EVENT_DUMMY_PSEUDOID:
 			generateDummy_UnknownPseudo();
 			break;

+		case GXSID_EVENT_REQUEST_IDS:
+			requestIdsFromNet();
+			break;
+
 		default:
 			/* error */

@@ -284,16 +284,20 @@ virtual void handle_event(uint32_t event_type, const std::string &elabel);
 	 */
 	int cache_tick();

-	bool cache_request_load(const RsGxsId &id);
+	bool cache_request_load(const RsGxsId &id, const std::list<std::string>& peers = std::list<std::string>());
 	bool cache_start_load();
 	bool cache_load_for_token(uint32_t token);
 	bool cache_store(const RsGxsIdGroupItem *item);
 	bool cache_update_if_cached(const RsGxsId &id, std::string serviceString);
+	bool isPendingNetworkRequest(const RsGxsId& gxsId) const;
+	void requestIdsFromNet();

 	// Mutex protected.

-	std::list<RsGxsId> mCacheLoad_ToCache;
+	//std::list<RsGxsId> mCacheLoad_ToCache;
+	std::map<RsGxsId, std::list<std::string> > mCacheLoad_ToCache, mPendingCache;

 	// Switching to RsMemCache for Key Caching.
 	RsMemCache<RsGxsId, RsGxsIdCache> mPublicKeyCache;
@@ -421,8 +425,9 @@ std::string genRandomId(int len = 20);
 private:

-	std::map<uint32_t, std::set<RsGxsGroupId> > mGroupsToCache;
+	std::map<uint32_t, std::set<RsGxsGroupId> > mIdsPendingCache;
 	std::map<uint32_t, std::list<RsGxsGroupId> > mGroupNotPresent;
+	std::map<RsGxsId, std::list<std::string> > mIdsNotPresent;

 	RsNetworkExchangeService* mNes;
 };