moved p3distrib loading to background thread.

* Added Cache Queues to p3distrib.
 * Created p3ThreadedService class.
 * added code to launch these threads.
 * debug code to see loading.

Seems to be far too many called to gpg->verifySignature()... to look at.



git-svn-id: http://svn.code.sf.net/p/retroshare/code/trunk@3998 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
drbob 2011-01-30 01:37:59 +00:00
parent 1f9395058c
commit 71798f4e8a
4 changed files with 146 additions and 45 deletions

View File

@ -2843,6 +2843,21 @@ int RsServer::StartupRetroShare()
mBitDht->start();
#endif
// startup the p3distrib threads (for cache loading).
#ifndef MINIMAL_LIBRS
mForums->start();
mChannels->start();
#ifdef RS_USE_BLOGS
mBlogs->start();
#endif
#endif // MINIMAL_LIBRS
/**************************************************************************/
// create loopback device, and add to pqisslgrp.
SearchModule *mod = new SearchModule();

View File

@ -47,9 +47,11 @@
/*****
* #define DISTRIB_DEBUG 1
* #define DISTRIB_THREAD_DEBUG 1
****/
//#define DISTRIB_DEBUG 1
#define DISTRIB_THREAD_DEBUG 1
RSA *extractPublicKey(RsTlvSecurityKey &key);
RSA *extractPrivateKey(RsTlvSecurityKey &key);
@ -66,7 +68,7 @@ p3GroupDistrib::p3GroupDistrib(uint16_t subtype,
:CacheSource(subtype, true, cs, sourcedir),
CacheStore(subtype, true, cs, cft, storedir),
p3Config(configId), p3Service(subtype),
p3Config(configId), p3ThreadedService(subtype),
mStorePeriod(storePeriod),
mPubPeriod(pubPeriod),
mLastPublishTime(0),
@ -160,6 +162,123 @@ int p3GroupDistrib::tick()
/***************************************************************************************/
/***************************************************************************************/
int p3GroupDistrib::loadCache(const CacheData &data)
{
#ifdef DISTRIB_DEBUG
std::cerr << "p3GroupDistrib::loadCache()";
std::cerr << std::endl;
#endif
{
RsStackMutex stack(distribMtx);
#ifdef DISTRIB_THREAD_DEBUG
std::cerr << "p3GroupDistrib::loadCache() Storing PendingRemoteCache";
std::cerr << std::endl;
#endif
/* store the cache file for later processing */
mPendingRemoteCache.push_back(data);
}
if (data.size > 0)
{
CacheStore::lockData(); /***** LOCK ****/
locked_storeCacheEntry(data);
CacheStore::unlockData(); /***** UNLOCK ****/
}
return 1;
}
bool p3GroupDistrib::loadLocalCache(const CacheData &data)
{
#ifdef DISTRIB_DEBUG
std::cerr << "p3GroupDistrib::loadLocalCache()";
std::cerr << std::endl;
#endif
{
RsStackMutex stack(distribMtx);
#ifdef DISTRIB_THREAD_DEBUG
std::cerr << "p3GroupDistrib::loadCache() Storing PendingLocalCache";
std::cerr << std::endl;
#endif
/* store the cache file for later processing */
mPendingLocalCache.push_back(data);
}
if (data.size > 0)
{
refreshCache(data);
}
return true;
}
/* From RsThread */
void p3GroupDistrib::run() /* called once the thread is started */
{
#ifdef DISTRIB_THREAD_DEBUG
std::cerr << "p3GroupDistrib::run()";
std::cerr << std::endl;
#endif
while(1)
{
/* */
CacheData cache;
bool validCache = false;
bool isLocal = false;
{
RsStackMutex stack(distribMtx);
if (mPendingLocalCache.size() > 0)
{
cache = mPendingLocalCache.front();
mPendingLocalCache.pop_front();
validCache = true;
isLocal = true;
#ifdef DISTRIB_THREAD_DEBUG
std::cerr << "p3GroupDistrib::run() found pendingLocalCache";
std::cerr << std::endl;
#endif
}
else if (mPendingRemoteCache.size() > 0)
{
cache = mPendingRemoteCache.front();
mPendingRemoteCache.pop_front();
validCache = true;
isLocal = false;
#ifdef DISTRIB_THREAD_DEBUG
std::cerr << "p3GroupDistrib::run() found pendingRemoteCache";
std::cerr << std::endl;
#endif
}
}
if (validCache)
{
loadAnyCache(cache, isLocal);
usleep(1000);
}
else
{
sleep(1);
}
}
}
int p3GroupDistrib::loadAnyCache(const CacheData &data, bool local)
{
/* if subtype = 1 -> FileGroup, else -> FileMsgs */
@ -185,43 +304,6 @@ int p3GroupDistrib::loadAnyCache(const CacheData &data, bool local)
return true;
}
int p3GroupDistrib::loadCache(const CacheData &data)
{
#ifdef DISTRIB_DEBUG
std::cerr << "p3GroupDistrib::loadCache()";
std::cerr << std::endl;
#endif
loadAnyCache(data, false);
if (data.size > 0)
{
CacheStore::lockData(); /***** LOCK ****/
locked_storeCacheEntry(data);
CacheStore::unlockData(); /***** UNLOCK ****/
}
return 1;
}
bool p3GroupDistrib::loadLocalCache(const CacheData &data)
{
#ifdef DISTRIB_DEBUG
std::cerr << "p3GroupDistrib::loadLocalCache()";
std::cerr << std::endl;
#endif
loadAnyCache(data, true);
if (data.size > 0)
{
refreshCache(data);
}
return true;
}
/***************************************************************************************/
/***************************************************************************************/
/********************** load Cache Files ***************************************/

View File

@ -247,7 +247,7 @@ const uint32_t GRP_UNSUBSCRIBED = 0x0006;
* Group id is the public admin keys id
*
*/
class p3GroupDistrib: public CacheSource, public CacheStore, public p3Config, public p3Service
class p3GroupDistrib: public CacheSource, public CacheStore, public p3Config, public p3ThreadedService
{
public:
@ -268,8 +268,16 @@ class p3GroupDistrib: public CacheSource, public CacheStore, public p3Config, pu
virtual bool loadLocalCache(const CacheData &data); /// overloaded from Cache Source
virtual int loadCache(const CacheData &data); /// overloaded from Cache Store
/* From RsThread */
virtual void run(); /* called once the thread is started */
private:
/* these lists are filled by the overloaded fns... then cleared by the thread */
std::list<CacheData> mPendingLocalCache;
std::list<CacheData> mPendingRemoteCache;
/* top level load */
int loadAnyCache(const CacheData &data, bool local);

View File

@ -125,14 +125,12 @@ virtual RsRawItem * send()
};
#if 0
class p3ThreadedService: public p3Service, public RsThread
{
protected:
p3ThreadedService(RsSerialiser *rss, uint32_t type)
:p3Service(rss, type) { return; }
p3ThreadedService(uint16_t type)
:p3Service(type) { return; }
public:
@ -142,8 +140,6 @@ virtual ~p3ThreadedService() { return; }
};
#endif
#endif // P3_GENERIC_SERVICE_HEADER