Improvements to Identity service.

* Added Id & Wiki serialisers to constructors.
 * Enabled cache_tick()... thought it doesn't process anything yet.
 * Added debug to Id Key cache system.
 * Fixed requests to match expected params.
 * Added cachetest basics - this is not finished.
 * Enabled generation of Lots of Dummy Ids - This demonstrates some queuing issues.
 * Added #define to easily enable GxsId as only new service - for testing.



git-svn-id: http://svn.code.sf.net/p/retroshare/code/branches/v0.5-gxs-b1@5756 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
drbob 2012-11-02 00:45:50 +00:00
parent c46acc29de
commit bd6b17a301
4 changed files with 221 additions and 30 deletions

View File

@ -95,13 +95,13 @@ bool RsGxsDataAccess::requestGroupInfo(uint32_t &token, uint32_t ansType, const
if(req == NULL)
{
std::cerr << "RsGxsDataAccess::requestMsgInfo() request type not recognised, type "
std::cerr << "RsGxsDataAccess::requestGroupInfo() request type not recognised, type "
<< reqType << std::endl;
return false;
}else
{
generateToken(token);
std::cerr << "RsGxsDataAccess::requestMsgInfo() gets Token: " << token << std::endl;
std::cerr << "RsGxsDataAccess::requestGroupInfo() gets Token: " << token << std::endl;
}
setReq(req, token, ansType, opts);
@ -134,13 +134,13 @@ bool RsGxsDataAccess::requestGroupInfo(uint32_t &token, uint32_t ansType, const
if(req == NULL)
{
std::cerr << "RsGxsDataAccess::requestMsgInfo() request type not recognised, type "
std::cerr << "RsGxsDataAccess::requestGroupInfo() request type not recognised, type "
<< reqType << std::endl;
return false;
}else
{
generateToken(token);
std::cerr << "RsGxsDataAccess::requestMsgInfo() gets Token: " << token << std::endl;
std::cerr << "RsGxsDataAccess::requestGroupInfo() gets Token: " << token << std::endl;
}
setReq(req, token, ansType, opts);

View File

@ -1814,6 +1814,7 @@ RsTurtle *rsTurtle = NULL ;
#define ENABLE_GXS_SERVICES 1
#define ENABLE_GXS_CORE 1
#define ENABLE_OTHER_GXS_SERVICES 1 // DISABLE TO LEAVE ONLY GXSID (for testing)
#ifdef ENABLE_GXS_CORE
#include "gxs/gxscoreserver.h"
@ -2314,7 +2315,7 @@ int RsServer::StartupRetroShare()
RS_SERVICE_GXSV1_TYPE_GXSID, gxsid_ds, nxsMgr, mGxsIdService);
#if ENABLE_OTHER_GXS_SERVICES
/**** Photo service ****/
p3PhotoServiceV2 *mPhotoV2 = NULL;
@ -2362,7 +2363,7 @@ int RsServer::StartupRetroShare()
// create GXS photo service
RsGxsNetService* wiki_ns = new RsGxsNetService(
RS_SERVICE_GXSV1_TYPE_WIKI, wiki_ds, nxsMgr, mWiki);
#endif
#endif // ENABLE_GXS_SERVICES
@ -2372,21 +2373,27 @@ int RsServer::StartupRetroShare()
GxsCoreServer* mGxsCore = new GxsCoreServer();
mGxsCore->addService(mGxsIdService);
#if ENABLE_OTHER_GXS_SERVICES
mGxsCore->addService(mPhotoV2);
mGxsCore->addService(mPosted);
mGxsCore->addService(mWiki);
#endif
// cores ready start up GXS net servers
createThread(*gxsid_ns);
#if ENABLE_OTHER_GXS_SERVICES
createThread(*photo_ns);
createThread(*posted_ns);
createThread(*wiki_ns);
#endif
// now add to p3service
pqih->addService(gxsid_ns);
#if ENABLE_OTHER_GXS_SERVICES
pqih->addService(photo_ns);
pqih->addService(posted_ns);
pqih->addService(wiki_ns);
#endif
// start up gxs core server
createThread(*mGxsCore);
@ -2653,9 +2660,11 @@ int RsServer::StartupRetroShare()
// Testing of new cache system interfaces.
rsIdentity = mGxsIdService;
#if ENABLE_OTHER_GXS_SERVICES
rsWiki = mWiki;
rsPosted = mPosted;
rsPhotoV2 = mPhotoV2;
#endif
rsWireVEG = mWire;
rsForumsVEG = mForumsV2;

View File

@ -51,7 +51,7 @@ RsIdentity *rsIdentity = NULL;
/********************************************************************************/
p3IdService::p3IdService(RsGeneralDataService *gds, RsNetworkExchangeService *nes)
: RsGxsIdExchange(gds, nes, NULL, RS_SERVICE_GXSV1_TYPE_GXSID), RsIdentity(this),
: RsGxsIdExchange(gds, nes, new RsGxsIdSerialiser(), RS_SERVICE_GXSV1_TYPE_GXSID), RsIdentity(this),
mIdMtx("p3IdService")
{
@ -65,13 +65,19 @@ void p3IdService::service_tick()
// Disable for now.
// background_tick();
//cache_tick();
cache_tick();
// internal testing - request keys. (NOT FINISHED YET)
//cachetest_tick();
return;
}
void p3IdService::notifyChanges(std::vector<RsGxsNotify *> &changes)
{
std::cerr << "p3IdService::notifyChanges()";
std::cerr << std::endl;
receiveChanges(changes);
}
@ -279,6 +285,9 @@ RsGxsIdCache::RsGxsIdCache(const RsGxsIdGroupItem *item)
id = item->meta.mGroupId;
name = item->meta.mGroupName;
std::cerr << "RsGxsIdCache::RsGxsIdCache() for: " << id;
std::cerr << std::endl;
/* extract key from keys */
bool key_ok = false;
@ -320,8 +329,14 @@ bool p3IdService::cache_is_loaded(const RsGxsId &id)
it = mCacheDataMap.find(id);
if (it == mCacheDataMap.end())
{
std::cerr << "p3IdService::cache_is_loaded(" << id << ") false";
std::cerr << std::endl;
return false;
}
std::cerr << "p3IdService::cache_is_loaded(" << id << ") false";
std::cerr << std::endl;
return true;
}
@ -334,9 +349,15 @@ bool p3IdService::cache_fetch(const RsGxsId &id, RsGxsIdCache &data)
it = mCacheDataMap.find(id);
if (it == mCacheDataMap.end())
{
std::cerr << "p3IdService::cache_fetch(" << id << ") false";
std::cerr << std::endl;
return false;
}
std::cerr << "p3IdService::cache_fetch(" << id << ") OK";
std::cerr << std::endl;
data = it->second;
/* update ts on data */
@ -351,6 +372,11 @@ bool p3IdService::cache_fetch(const RsGxsId &id, RsGxsIdCache &data)
bool p3IdService::cache_store(const RsGxsIdGroupItem *item)
{
std::cerr << "p3IdService::cache_store() Item: ";
std::cerr << std::endl;
//item->print(std::cerr, 0); NEEDS CONST!!!! TODO
std::cerr << std::endl;
RsStackMutex stack(mIdMtx); /********** STACK LOCKED MTX ******/
// Create Cache Data.
@ -362,6 +388,8 @@ bool p3IdService::cache_store(const RsGxsIdGroupItem *item)
if (it != mCacheDataMap.end())
{
// ERROR.
std::cerr << "p3IdService::cache_store() ERROR entry exists already";
std::cerr << std::endl;
return false;
}
@ -383,6 +411,9 @@ bool p3IdService::locked_cache_update_lrumap(const RsGxsId &key, time_t old_ts,
{
if (old_ts == 0)
{
std::cerr << "p3IdService::locked_cache_update_lrumap(" << key << ") just insert!";
std::cerr << std::endl;
LruData data;
data.key = key;
/* new insertion */
@ -401,20 +432,30 @@ bool p3IdService::locked_cache_update_lrumap(const RsGxsId &key, time_t old_ts,
{
LruData data = mit->second;
mCacheLruMap.erase(mit);
std::cerr << "p3IdService::locked_cache_update_lrumap(" << key << ") rm old";
std::cerr << std::endl;
if (new_ts != 0) // == 0, means remove.
{
std::cerr << "p3IdService::locked_cache_update_lrumap(" << key << ") added new_ts";
std::cerr << std::endl;
mCacheLruMap.insert(std::make_pair(new_ts, data));
}
return true;
}
}
std::cerr << "p3IdService::locked_cache_update_lrumap(" << key << ") ERROR";
std::cerr << std::endl;
return false;
}
bool p3IdService::cache_resize()
{
std::cerr << "p3IdService::cache_resize()";
std::cerr << std::endl;
int count_to_clear = 0;
{
RsStackMutex stack(mIdMtx); /********** STACK LOCKED MTX ******/
@ -424,12 +465,15 @@ bool p3IdService::cache_resize()
(mCacheLruMap.size() != mCacheDataCount))
{
// ERROR.
std::cerr << "p3IdService::cache_resize() CONSISTENCY ERROR";
std::cerr << std::endl;
}
if (mCacheDataCount > MAX_CACHE_SIZE)
{
count_to_clear = mCacheDataCount - MAX_CACHE_SIZE;
std::cerr << "p3IdService::cache_resize() to_clear: " << count_to_clear;
std::cerr << std::endl;
}
}
@ -466,6 +510,8 @@ bool p3IdService::cache_discard_LRU(int count_to_clear)
}
else
{
std::cerr << "p3IdService::cache_discard_LRU() removing: " << data.key;
std::cerr << std::endl;
mCacheDataMap.erase(it);
mCacheDataCount--;
}
@ -491,11 +537,8 @@ bool p3IdService::cache_discard_LRU(int count_to_clear)
int p3IdService::cache_tick()
{
std::cerr << "p3IdService::cache_tick()";
std::cerr << std::endl;
// Run Background Stuff.
background_checkTokenRequest();
//std::cerr << "p3IdService::cache_tick()";
//std::cerr << std::endl;
/* every minute - run a background check */
time_t now = time(NULL);
@ -522,6 +565,9 @@ int p3IdService::cache_tick()
bool p3IdService::cache_request_load(const RsGxsId &id)
{
std::cerr << "p3IdService::cache_request_load(" << id << ")";
std::cerr << std::endl;
bool start = false;
{
RsStackMutex stack(mIdMtx); /********** STACK LOCKED MTX ******/
@ -548,10 +594,6 @@ bool p3IdService::cache_start_load()
{
RsStackMutex stack(mIdMtx); /********** STACK LOCKED MTX ******/
mCacheLoad_LastCycle = time(NULL);
mCacheLoad_Status = 1;
/* now we process the modGroupList -> a map so we can use it easily later, and create id list too */
std::list<RsGxsId>::iterator it;
for(it = mCacheLoad_ToCache.begin(); it != mCacheLoad_ToCache.end(); it++)
@ -562,15 +604,25 @@ bool p3IdService::cache_start_load()
mCacheLoad_ToCache.clear();
}
uint32_t ansType = RS_TOKREQ_ANSTYPE_DATA;
RsTokReqOptions opts;
uint32_t token = 0;
RsGenExchange::getTokenService()->requestGroupInfo(token, ansType, opts, groupIds);
if (groupIds.size() > 0)
{
RsStackMutex stack(mIdMtx); /********** STACK LOCKED MTX ******/
mCacheLoad_Tokens.push_back(token);
std::cerr << "p3IdService::cache_start_load() #Groups: " << groupIds.size();
std::cerr << std::endl;
mCacheLoad_LastCycle = time(NULL);
mCacheLoad_Status = 1;
uint32_t ansType = RS_TOKREQ_ANSTYPE_DATA;
RsTokReqOptions opts;
opts.mReqType = GXS_REQUEST_TYPE_GROUP_DATA;
uint32_t token = 0;
RsGenExchange::getTokenService()->requestGroupInfo(token, ansType, opts, groupIds);
{
RsStackMutex stack(mIdMtx); /********** STACK LOCKED MTX ******/
mCacheLoad_Tokens.push_back(token);
}
}
return 1;
}
@ -584,6 +636,9 @@ bool p3IdService::cache_check_loading()
RsStackMutex stack(mIdMtx); /********** STACK LOCKED MTX ******/
for(it = mCacheLoad_Tokens.begin(); it != mCacheLoad_Tokens.end();)
{
std::cerr << "p3IdService::cache_check_loading() token: " << *it;
std::cerr << std::endl;
uint32_t token = *it;
uint32_t status = RsGenExchange::getTokenService()->requestStatus(token);
//checkRequestStatus(token, status, reqtype, anstype, ts);
@ -656,6 +711,136 @@ bool p3IdService::cache_check_consistency()
return false;
}
/************************************************************************************/
/************************************************************************************/
#if 0
bool p3IdService::cachetest_tick()
{
/* every minute - run a background check */
time_t now = time(NULL);
bool doCycle = false;
{
RsStackMutex stack(mIdMtx); /********** STACK LOCKED MTX ******/
if (now - mCacheTest_LastTs > TEST_PERIOD)
{
doTest = true;
mCacheTest_LastTs = now;
}
}
if (doCycle)
{
cachetest_getlist();
}
cachetest_request();
return true;
}
bool p3IdService::cachetest_getlist()
{
{
RsStackMutex stack(mIdMtx); /********** STACK LOCKED MTX ******/
if (mCacheTest_Active)
{
return false;
}
}
uint32_t ansType = RS_TOKREQ_ANSTYPE_LIST;
RsTokReqOptions opts;
opts.mReqType = GXS_REQUEST_TYPE_GROUP_LIST;
uint32_t token = 0;
RsGenExchange::getTokenService()->requestGroupInfo(token, ansType, opts);
{
RsStackMutex stack(mIdMtx); /********** STACK LOCKED MTX ******/
mCacheTest_Token = token;
mCacheTest_Active = true;
}
return true;
}
bool p3IdService::cachetest_request()
{
uint32_t token = 0;
{
RsStackMutex stack(mIdMtx); /********** STACK LOCKED MTX ******/
if (!mCacheTest_Active)
{
return false;
}
token = mCacheTest_Token;
}
uint32_t status = RsGenExchange::getTokenService()->requestStatus(token);
if (status == RsTokenService::GXS_REQUEST_V2_STATUS_COMPLETE)
{
std::cerr << "p3IdService::cache_load_for_token() : " << token;
std::cerr << std::endl;
std::vector<RsGxsId> grpIds;
bool ok = RsGenExchange::getGroupList(token, grpIds);
if(ok)
{
std::vector<RsGxsId>::iterator vit = grpIds.begin();
for(; vit != grpIds.end(); vit++)
{
/* 5% chance of checking it! */
if (RsRandom::f32() < 0.05)
{
/* try the cache! */
if (!haveKey(*vit))
{
std::list<PeerId> nullpeers;
requestKey(*vit, nullpeers);
}
else
{
RsTlvSecurityKey seckey;
if (getKey(*vit, seckey))
{
// success!
}
}
/* try private key too! */
if (!havePrivateKey(*vit))
{
requestPrivateKey(*vit);
}
else
{
RsTlvSecurityKey seckey;
if (getPrivateKey(*vit, seckey))
{
// success!
}
}
}
}
}
else
{
std::cerr << "p3IdService::cache_load_for_token() ERROR no data";
std::cerr << std::endl;
return false;
}
}
return true;
}
#endif
/************************************************************************************/
/************************************************************************************/
@ -678,9 +863,6 @@ void p3IdService::generateDummyData()
{
RsStackMutex stack(mIdMtx); /********** STACK LOCKED MTX ******/
// Temporarily disable - until system is properly tested.
return;
/* grab all the gpg ids... and make some ids */
std::list<std::string> gpgids;

View File

@ -34,7 +34,7 @@ RsWiki *rsWiki = NULL;
p3Wiki::p3Wiki(RsGeneralDataService* gds, RsNetworkExchangeService* nes)
:RsGenExchange(gds, nes, NULL, RS_SERVICE_GXSV1_TYPE_WIKI), RsWiki(this)
:RsGenExchange(gds, nes, new RsGxsWikiSerialiser(), RS_SERVICE_GXSV1_TYPE_WIKI), RsWiki(this)
{