From 71798f4e8ac20487f0267ea25e80c0d15908f636 Mon Sep 17 00:00:00 2001 From: drbob Date: Sun, 30 Jan 2011 01:37:59 +0000 Subject: [PATCH] moved p3distrib loading to background thread. * Added Cache Queues to p3distrib. * Created p3ThreadedService class. * added code to launch these threads. * debug code to see loading. Seems to be far too many called to gpg->verifySignature()... to look at. git-svn-id: http://svn.code.sf.net/p/retroshare/code/trunk@3998 b45a01b8-16f6-495d-af2f-9b41ad6348cc --- libretroshare/src/rsserver/rsinit.cc | 15 +++ libretroshare/src/services/p3distrib.cc | 158 ++++++++++++++++++------ libretroshare/src/services/p3distrib.h | 10 +- libretroshare/src/services/p3service.h | 8 +- 4 files changed, 146 insertions(+), 45 deletions(-) diff --git a/libretroshare/src/rsserver/rsinit.cc b/libretroshare/src/rsserver/rsinit.cc index d9646cb2e..a5a3582f3 100644 --- a/libretroshare/src/rsserver/rsinit.cc +++ b/libretroshare/src/rsserver/rsinit.cc @@ -2843,6 +2843,21 @@ int RsServer::StartupRetroShare() mBitDht->start(); #endif + // startup the p3distrib threads (for cache loading). +#ifndef MINIMAL_LIBRS + mForums->start(); + mChannels->start(); + +#ifdef RS_USE_BLOGS + mBlogs->start(); +#endif +#endif // MINIMAL_LIBRS + + /**************************************************************************/ + + + + // create loopback device, and add to pqisslgrp. SearchModule *mod = new SearchModule(); diff --git a/libretroshare/src/services/p3distrib.cc b/libretroshare/src/services/p3distrib.cc index 8e1a68ea1..e41fdfb0f 100644 --- a/libretroshare/src/services/p3distrib.cc +++ b/libretroshare/src/services/p3distrib.cc @@ -47,9 +47,11 @@ /***** * #define DISTRIB_DEBUG 1 + * #define DISTRIB_THREAD_DEBUG 1 ****/ //#define DISTRIB_DEBUG 1 +#define DISTRIB_THREAD_DEBUG 1 RSA *extractPublicKey(RsTlvSecurityKey &key); RSA *extractPrivateKey(RsTlvSecurityKey &key); @@ -66,7 +68,7 @@ p3GroupDistrib::p3GroupDistrib(uint16_t subtype, :CacheSource(subtype, true, cs, sourcedir), CacheStore(subtype, true, cs, cft, storedir), - p3Config(configId), p3Service(subtype), + p3Config(configId), p3ThreadedService(subtype), mStorePeriod(storePeriod), mPubPeriod(pubPeriod), mLastPublishTime(0), @@ -160,6 +162,123 @@ int p3GroupDistrib::tick() /***************************************************************************************/ /***************************************************************************************/ +int p3GroupDistrib::loadCache(const CacheData &data) +{ +#ifdef DISTRIB_DEBUG + std::cerr << "p3GroupDistrib::loadCache()"; + std::cerr << std::endl; +#endif + + { + RsStackMutex stack(distribMtx); + +#ifdef DISTRIB_THREAD_DEBUG + std::cerr << "p3GroupDistrib::loadCache() Storing PendingRemoteCache"; + std::cerr << std::endl; +#endif + /* store the cache file for later processing */ + mPendingRemoteCache.push_back(data); + } + + if (data.size > 0) + { + CacheStore::lockData(); /***** LOCK ****/ + locked_storeCacheEntry(data); + CacheStore::unlockData(); /***** UNLOCK ****/ + } + + return 1; +} + +bool p3GroupDistrib::loadLocalCache(const CacheData &data) +{ +#ifdef DISTRIB_DEBUG + std::cerr << "p3GroupDistrib::loadLocalCache()"; + std::cerr << std::endl; +#endif + + { + RsStackMutex stack(distribMtx); + +#ifdef DISTRIB_THREAD_DEBUG + std::cerr << "p3GroupDistrib::loadCache() Storing PendingLocalCache"; + std::cerr << std::endl; +#endif + + /* store the cache file for later processing */ + mPendingLocalCache.push_back(data); + } + + if (data.size > 0) + { + refreshCache(data); + } + + return true; +} + + + + /* From RsThread */ +void p3GroupDistrib::run() /* called once the thread is started */ +{ + +#ifdef DISTRIB_THREAD_DEBUG + std::cerr << "p3GroupDistrib::run()"; + std::cerr << std::endl; +#endif + + while(1) + { + /* */ + CacheData cache; + bool validCache = false; + bool isLocal = false; + { + RsStackMutex stack(distribMtx); + + if (mPendingLocalCache.size() > 0) + { + cache = mPendingLocalCache.front(); + mPendingLocalCache.pop_front(); + validCache = true; + isLocal = true; + +#ifdef DISTRIB_THREAD_DEBUG + std::cerr << "p3GroupDistrib::run() found pendingLocalCache"; + std::cerr << std::endl; +#endif + + } + else if (mPendingRemoteCache.size() > 0) + { + cache = mPendingRemoteCache.front(); + mPendingRemoteCache.pop_front(); + validCache = true; + isLocal = false; + +#ifdef DISTRIB_THREAD_DEBUG + std::cerr << "p3GroupDistrib::run() found pendingRemoteCache"; + std::cerr << std::endl; +#endif + + } + } + + if (validCache) + { + loadAnyCache(cache, isLocal); + usleep(1000); + } + else + { + sleep(1); + } + } +} + + + int p3GroupDistrib::loadAnyCache(const CacheData &data, bool local) { /* if subtype = 1 -> FileGroup, else -> FileMsgs */ @@ -185,43 +304,6 @@ int p3GroupDistrib::loadAnyCache(const CacheData &data, bool local) return true; } -int p3GroupDistrib::loadCache(const CacheData &data) -{ -#ifdef DISTRIB_DEBUG - std::cerr << "p3GroupDistrib::loadCache()"; - std::cerr << std::endl; -#endif - - loadAnyCache(data, false); - - if (data.size > 0) - { - CacheStore::lockData(); /***** LOCK ****/ - locked_storeCacheEntry(data); - CacheStore::unlockData(); /***** UNLOCK ****/ - } - - return 1; -} - -bool p3GroupDistrib::loadLocalCache(const CacheData &data) -{ -#ifdef DISTRIB_DEBUG - std::cerr << "p3GroupDistrib::loadLocalCache()"; - std::cerr << std::endl; -#endif - - loadAnyCache(data, true); - - if (data.size > 0) - { - refreshCache(data); - } - - return true; -} - - /***************************************************************************************/ /***************************************************************************************/ /********************** load Cache Files ***************************************/ diff --git a/libretroshare/src/services/p3distrib.h b/libretroshare/src/services/p3distrib.h index 13d8f511b..0528c060d 100644 --- a/libretroshare/src/services/p3distrib.h +++ b/libretroshare/src/services/p3distrib.h @@ -247,7 +247,7 @@ const uint32_t GRP_UNSUBSCRIBED = 0x0006; * Group id is the public admin keys id * */ -class p3GroupDistrib: public CacheSource, public CacheStore, public p3Config, public p3Service +class p3GroupDistrib: public CacheSource, public CacheStore, public p3Config, public p3ThreadedService { public: @@ -268,8 +268,16 @@ class p3GroupDistrib: public CacheSource, public CacheStore, public p3Config, pu virtual bool loadLocalCache(const CacheData &data); /// overloaded from Cache Source virtual int loadCache(const CacheData &data); /// overloaded from Cache Store + + /* From RsThread */ + virtual void run(); /* called once the thread is started */ + private: + /* these lists are filled by the overloaded fns... then cleared by the thread */ + std::list mPendingLocalCache; + std::list mPendingRemoteCache; + /* top level load */ int loadAnyCache(const CacheData &data, bool local); diff --git a/libretroshare/src/services/p3service.h b/libretroshare/src/services/p3service.h index 37357940c..8007ac378 100644 --- a/libretroshare/src/services/p3service.h +++ b/libretroshare/src/services/p3service.h @@ -125,14 +125,12 @@ virtual RsRawItem * send() }; -#if 0 - class p3ThreadedService: public p3Service, public RsThread { protected: - p3ThreadedService(RsSerialiser *rss, uint32_t type) - :p3Service(rss, type) { return; } + p3ThreadedService(uint16_t type) + :p3Service(type) { return; } public: @@ -142,8 +140,6 @@ virtual ~p3ThreadedService() { return; } }; -#endif - #endif // P3_GENERIC_SERVICE_HEADER