Added Identity caching to Identity Service.

- various p3IdService::cache_... fns.
 - moved duplicated print functions to GXS code.
 - restored compilation of VEG services.



git-svn-id: http://svn.code.sf.net/p/retroshare/code/branches/v0.5-gxs-b1@5705 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
drbob 2012-10-21 19:15:46 +00:00
parent 85cd85fec6
commit dcb64f6631
6 changed files with 431 additions and 4 deletions

View File

@ -10,7 +10,7 @@ CONFIG += test_voip
# GXS Stuff.
CONFIG += newcache
#CONFIG += newservices
CONFIG += newservices
# Beware: All data of the stripped services are lost
DEFINES *= PQI_DISABLE_TUNNEL

View File

@ -1808,7 +1808,7 @@ RsTurtle *rsTurtle = NULL ;
#include "services/p3blogs.h"
#include "turtle/p3turtle.h"
//#define ENABLE_GXS_SERVICES 1
#define ENABLE_GXS_SERVICES 1
#define ENABLE_GXS_CORE 1
#ifdef ENABLE_GXS_CORE

View File

@ -47,13 +47,19 @@
std::ostream &operator<<(std::ostream &out, const RsGroupMetaData &meta)
{
out << "RsGroupMetaData(TODO PRINT)";
out << "[ GroupId: " << meta.mGroupId << " Name: " << meta.mGroupName << " ]";
return out;
}
std::ostream &operator<<(std::ostream &out, const RsMsgMetaData &meta)
{
out << "RsMsgMetaData(TODO PRINT)";
out << "[ GroupId: " << meta.mGroupId << " MsgId: " << meta.mMsgId;
out << " Name: " << meta.mMsgName;
out << " OrigMsgId: " << meta.mOrigMsgId;
out << " ThreadId: " << meta.mThreadId;
out << " ParentId: " << meta.mParentId;
out << " AuthorId: " << meta.mAuthorId;
out << " Name: " << meta.mMsgName << " ]";
return out;
}

View File

@ -1443,6 +1443,7 @@ bool p3GxsDataServiceVEG::fakeprocessrequests()
return true;
}
#if 0 // DISABLED AND MOVED TO GXS CODE.
std::ostream &operator<<(std::ostream &out, const RsGroupMetaData &meta)
{
@ -1462,4 +1463,5 @@ std::ostream &operator<<(std::ostream &out, const RsMsgMetaData &meta)
return out;
}
#endif

View File

@ -64,6 +64,8 @@ int p3IdService::internal_tick()
// Disable for now.
// background_tick();
cache_tick();
return 0;
}
@ -226,6 +228,365 @@ bool p3IdService::createMsg(uint32_t& token, RsGxsIdOpinion &opinion)
return true;
}
/************************************************************************************/
/************************************************************************************/
/************************************************************************************/
/* Cache of recently used keys
*
* It is expensive to fetch the keys, so we want to keep them around if possible.
* It only stores the immutable stuff.
*
* This is probably crude and crap to start with.
* Want Least Recently Used (LRU) discard policy, without having to search whole cache.
* Use two maps:
* - CacheMap[key] => data.
* - LRUMultiMap[AccessTS] => key
*
* NOTE: This could be moved to a seperate class and templated to make generic
* as it might be generally useful.
*
*/
bool p3IdService::cache_is_loaded(const RsGxsId &id)
{
RsStackMutex stack(mIdMtx); /********** STACK LOCKED MTX ******/
std::map<RsGxsId, RsGxsIdCache>::iterator it;
it = mCacheDataMap.find(id);
if (it == mCacheDataMap.end())
{
return false;
}
return true;
}
bool p3IdService::cache_fetch(const RsGxsId &id, RsGxsIdCache &data)
{
RsStackMutex stack(mIdMtx); /********** STACK LOCKED MTX ******/
std::map<RsGxsId, RsGxsIdCache>::iterator it;
it = mCacheDataMap.find(id);
if (it == mCacheDataMap.end())
{
return false;
}
data = it->second;
/* update ts on data */
time_t old_ts = it->second.lastUsedTs;
time_t new_ts = time(NULL);
it->second.lastUsedTs = new_ts;
locked_cache_update_lrumap(id, old_ts, new_ts);
return true;
}
bool p3IdService::cache_store(const RsGxsIdGroup &group)
{
RsStackMutex stack(mIdMtx); /********** STACK LOCKED MTX ******/
// Create Cache Data.
RsGxsIdCache cache(group);
// For consistency
std::map<RsGxsId, RsGxsIdCache>::iterator it;
it = mCacheDataMap.find(cache.id);
if (it != mCacheDataMap.end())
{
// ERROR.
return false;
}
mCacheDataMap[cache.id] = cache;
mCacheDataCount++;
/* add new lrumap entry */
time_t old_ts = 0;
time_t new_ts = time(NULL);
it->second.lastUsedTs = new_ts;
locked_cache_update_lrumap(cache.id, old_ts, new_ts);
return true;
}
bool p3IdService::locked_cache_update_lrumap(const RsGxsId &key, time_t old_ts, time_t new_ts)
{
if (old_ts == 0)
{
LruData data;
data.key = key;
/* new insertion */
mCacheLruMap.insert(std::make_pair(new_ts, data));
return true;
}
/* find old entry */
std::multimap<time_t, LruData>::iterator mit;
std::multimap<time_t, LruData>::iterator sit = mCacheLruMap.lower_bound(old_ts);
std::multimap<time_t, LruData>::iterator eit = mCacheLruMap.upper_bound(old_ts);
for(mit = sit; mit != eit; mit++)
{
if (mit->second.key == key)
{
LruData data = mit->second;
mCacheLruMap.erase(mit);
if (new_ts != 0) // == 0, means remove.
{
mCacheLruMap.insert(std::make_pair(new_ts, data));
}
return true;
}
}
return false;
}
bool p3IdService::cache_resize()
{
int count_to_clear = 0;
{
RsStackMutex stack(mIdMtx); /********** STACK LOCKED MTX ******/
// consistency check.
if ((mCacheDataMap.size() != mCacheDataCount) ||
(mCacheLruMap.size() != mCacheDataCount))
{
// ERROR.
}
if (mCacheDataCount > MAX_CACHE_SIZE)
{
count_to_clear = mCacheDataCount - MAX_CACHE_SIZE;
}
}
if (count_to_clear > 0)
{
cache_discard_LRU(count_to_clear);
}
return true;
}
bool p3IdService::cache_discard_LRU(int count_to_clear)
{
RsStackMutex stack(mIdMtx); /********** STACK LOCKED MTX ******/
while(count_to_clear > 0)
{
std::multimap<time_t, LruData>::iterator mit = mCacheLruMap.begin();
if (mit != mCacheLruMap.end())
{
LruData data = mit->second;
mCacheLruMap.erase(mit);
/* now clear from real cache */
std::map<RsGxsId, RsGxsIdCache>::iterator it;
it = mCacheDataMap.find(data.key);
if (it == mCacheDataMap.end())
{
// ERROR
std::cerr << "p3IdService::cache_discard_LRU(): ERROR Missing key: " << data.key;
std::cerr << std::endl;
return false;
}
else
{
mCacheDataMap.erase(it);
mCacheDataCount--;
}
}
else
{
// No More Data, ERROR.
std::cerr << "p3IdService::cache_discard_LRU(): INFO more more cache data";
std::cerr << std::endl;
return true;
}
count_to_clear--;
}
return true;
}
/***** BELOW LOADS THE CACHE FROM GXS DATASTORE *****/
#define MIN_CYCLE_GAP 2
int p3IdService::cache_tick()
{
std::cerr << "p3IdService::cache_tick()";
std::cerr << std::endl;
// Run Background Stuff.
background_checkTokenRequest();
/* every minute - run a background check */
time_t now = time(NULL);
bool doCycle = false;
{
RsStackMutex stack(mIdMtx); /********** STACK LOCKED MTX ******/
if (now - mCacheLoad_LastCycle > MIN_CYCLE_GAP)
{
doCycle = true;
mCacheLoad_LastCycle = now;
}
}
if (doCycle)
{
cache_start_load();
}
cache_check_loading();
return 0;
}
bool p3IdService::cache_request_load(const RsGxsId &id)
{
bool start = false;
{
RsStackMutex stack(mIdMtx); /********** STACK LOCKED MTX ******/
mCacheLoad_ToCache.push_back(id);
if (time(NULL) - mCacheLoad_LastCycle > MIN_CYCLE_GAP)
{
start = true;
}
}
if (start)
cache_start_load();
return true;
}
bool p3IdService::cache_start_load()
{
/* trigger request to load missing ids into cache */
std::list<RsGxsGroupId> groupIds;
{
RsStackMutex stack(mIdMtx); /********** STACK LOCKED MTX ******/
mCacheLoad_LastCycle = time(NULL);
mCacheLoad_Status = 1;
/* now we process the modGroupList -> a map so we can use it easily later, and create id list too */
std::list<RsGxsId>::iterator it;
for(it = mCacheLoad_ToCache.begin(); it != mCacheLoad_ToCache.end(); it++)
{
groupIds.push_back(*it); // might need conversion?
}
mCacheLoad_ToCache.clear();
}
uint32_t ansType = RS_TOKREQ_ANSTYPE_DATA;
RsTokReqOptionsV2 opts;
uint32_t token = 0;
RsGenExchange::getTokenService()->requestGroupInfo(token, ansType, opts, groupIds);
{
RsStackMutex stack(mIdMtx); /********** STACK LOCKED MTX ******/
mCacheLoad_Tokens.push_back(token);
}
return 1;
}
bool p3IdService::cache_check_loading()
{
/* check the status of all active tokens */
std::list<uint32_t> toload;
std::list<uint32_t>::iterator it;
{
RsStackMutex stack(mIdMtx); /********** STACK LOCKED MTX ******/
for(it = mCacheLoad_Tokens.begin(); it != mCacheLoad_Tokens.end();)
{
uint32_t token = *it;
uint32_t status = RsGenExchange::getTokenService()->requestStatus(token);
//checkRequestStatus(token, status, reqtype, anstype, ts);
if (status == RsTokenService::GXS_REQUEST_V2_STATUS_COMPLETE)
{
it = mCacheLoad_Tokens.erase(it);
toload.push_back(token);
}
else
{
it++;
}
}
}
for(it = toload.begin(); it != toload.end(); it++)
{
cache_load_for_token(*it);
}
return 1;
}
bool p3IdService::cache_load_for_token(uint32_t token)
{
std::cerr << "p3IdService::cache_load_for_token() : " << token;
std::cerr << std::endl;
std::vector<RsGxsIdGroup> groups;
std::vector<RsGxsIdGroup>::iterator vit;
if (!getGroupData(token, groups))
{
std::cerr << "p3IdService::cache_load_for_token() ERROR no data";
std::cerr << std::endl;
return false;
}
for(vit = groups.begin(); vit != groups.end(); vit++)
{
RsGxsIdGroup &group = *vit;
std::cerr << "p3IdService::cache_load_for_token() Loaded Id with Meta: ";
std::cerr << group.mMeta;
std::cerr << std::endl;
/* cache the data */
cache_store(group);
}
/* drop old entries */
cache_resize();
return true;
}
bool p3IdService::cache_check_consistency()
{
return false;
}
/************************************************************************************/
/************************************************************************************/
/************************************************************************************/

View File

@ -71,6 +71,32 @@ public:
#define MAX_CACHE_SIZE 100 // Small for testing..
//#define MAX_CACHE_SIZE 10000 // More useful size
class RsGxsIdCache
{
public:
RsGxsIdCache();
RsGxsIdCache(const RsGxsIdGroup &group);
RsGxsId id;
std::string name;
RsTlvSecurityKey pubkey;
double reputation;
time_t lastUsedTs;
};
class LruData
{
public:
RsGxsId key;
};
// Not sure exactly what should be inherited here?
// Chris - please correct as necessary.
@ -129,6 +155,38 @@ virtual bool getReputation(const RsGxsId &id, const GixsReputation &rep);
private:
/************************************************************************
* This is the Cache for minimising calls to the DataStore.
*
*/
int cache_tick();
bool cache_is_loaded(const RsGxsId &id);
bool cache_fetch(const RsGxsId &key, RsGxsIdCache &data);
bool cache_store(const RsGxsIdGroup &group);
bool cache_resize();
bool cache_discard_LRU(int count_to_clear);
bool cache_request_load(const RsGxsId &id);
bool cache_start_load();
bool cache_check_loading();
bool cache_load_for_token(uint32_t token);
bool cache_check_consistency();
/* MUTEX PROTECTED DATA (mIdMtx - maybe should use a 2nd?) */
bool locked_cache_update_lrumap(const RsGxsId &key, time_t old_ts, time_t new_ts);
std::map<RsGxsId, RsGxsIdCache> mCacheDataMap;
std::multimap<time_t, LruData> mCacheLruMap;
uint32_t mCacheDataCount;
time_t mCacheLoad_LastCycle;
int mCacheLoad_Status;
std::list<RsGxsId> mCacheLoad_ToCache;
std::list<uint32_t> mCacheLoad_Tokens;
/************************************************************************
* Below is the background task for processing opinions => reputations
*