* Added isOnline to rspeers.h interface

Various improvements to get p3distrib Messages working:
 * Defensive programming in cachestrapper (forcing ownId on save).
 * Changed CONFIG_CACHE_ID to last -> to force correct loading order at startup
   This should be Config Files, Local Cache Files, then Remote Caches.
 * Hack to maintain existing Cache Files.
 * Improvments to forum_test.

Lots of work on p3distrib:
 * fixed nextCacheSubId.
 * Added new Msgs to Config Save.
 * Fixed Cache reload issues.
 * Overall: Enabled Forum Msg publication.



git-svn-id: http://svn.code.sf.net/p/retroshare/code/trunk@611 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
drbob 2008-06-16 20:37:48 +00:00
parent 0d6471433c
commit 098851592c
10 changed files with 298 additions and 54 deletions

View file

@ -51,17 +51,15 @@ p3GroupDistrib::p3GroupDistrib(uint16_t subtype,
CacheStore(subtype, true, cs, cft, storedir),
p3Config(configId), nullService(subtype),
mStorePeriod(storePeriod),
mPubPeriod(pubPeriod)
mPubPeriod(pubPeriod),
mLastPublishTime(0),
mMaxCacheSubId(1)
{
/* not much yet */
time_t now = time(NULL);
/* set this a little in the future -> so we can
* adjust the publish point if needed
*/
mNextPublishTime = now + mPubPeriod / 4;
mGroupsRepublish = false;
/* force publication of groups (cleared if local cache file found) */
mGroupsRepublish = true;
return;
}
@ -75,23 +73,20 @@ int p3GroupDistrib::tick()
std::cerr << std::endl;
#endif
#if 0
time_t now = time(NULL);
bool toPublish;
{
RsStackMutex stack(distribMtx); /**** STACK LOCKED MUTEX ****/
toPublish = (now > mNextPublishTime);
toPublish = (mPendingPublish.size() > 0) && (now > mPubPeriod + mLastPublishTime);
}
if (toPublish)
{
RsStackMutex stack(distribMtx); /**** STACK LOCKED MUTEX ****/
locked_publishPendingMsgs();
mNextPublishTime = now + mPubPeriod;
locked_publishPendingMsgs(); /* flags taken care of in here */
}
#endif
bool toPublishGroups;
{
@ -121,7 +116,7 @@ int p3GroupDistrib::tick()
int p3GroupDistrib::loadAnyCache(const CacheData &data, bool local)
{
/* if subtype = 0 -> FileGroup, else -> FileMsgs */
/* if subtype = 1 -> FileGroup, else -> FileMsgs */
std::string file = data.path;
file += "/";
@ -133,13 +128,13 @@ int p3GroupDistrib::loadAnyCache(const CacheData &data, bool local)
std::cerr << "Cid: " << data.cid.type << ":" << data.cid.subid << std::endl;
#endif
if (data.cid.subid == 0)
if (data.cid.subid == 1)
{
loadFileGroups(file, data.pid, local);
}
else
{
loadFileMsgs(file, data.cid.subid, data.pid, local);
loadFileMsgs(file, data.cid.subid, data.pid, data.recvd, local);
}
return true;
}
@ -151,7 +146,16 @@ int p3GroupDistrib::loadCache(const CacheData &data)
std::cerr << std::endl;
#endif
return loadAnyCache(data, false);
loadAnyCache(data, false);
if (data.size > 0)
{
CacheStore::lockData(); /***** LOCK ****/
locked_storeCacheEntry(data);
CacheStore::unlockData(); /***** UNLOCK ****/
}
return 1;
}
bool p3GroupDistrib::loadLocalCache(const CacheData &data)
@ -161,7 +165,14 @@ bool p3GroupDistrib::loadLocalCache(const CacheData &data)
std::cerr << std::endl;
#endif
return loadAnyCache(data, true);
loadAnyCache(data, true);
if (data.size > 0)
{
refreshCache(data);
}
return true;
}
@ -222,11 +233,20 @@ void p3GroupDistrib::loadFileGroups(std::string filename, std::string src, bool
delete streamer;
/* clear publication of groups if local cache file found */
RsStackMutex stack(distribMtx); /******* STACK LOCKED MUTEX ***********/
if (local)
{
mGroupsRepublish = false;
}
return;
}
void p3GroupDistrib::loadFileMsgs(std::string filename, uint16_t cacheSubId, std::string src, bool local)
void p3GroupDistrib::loadFileMsgs(std::string filename, uint16_t cacheSubId, std::string src, uint32_t ts, bool local)
{
#ifdef DISTRIB_DEBUG
@ -271,6 +291,39 @@ void p3GroupDistrib::loadFileMsgs(std::string filename, uint16_t cacheSubId, std
streamer->tick();
}
if (local)
{
/* now we create a map of time -> subid
* This is used to determine the newest and the oldest items
*/
#ifdef DISTRIB_DEBUG
std::cerr << "p3GroupDistrib::loadFileMsgs() Updating Local TimeStamps";
std::cerr << std::endl;
std::cerr << "p3GroupDistrib::loadFileMsgs() CacheSubId: " << cacheSubId << " recvd: " << ts;
std::cerr << std::endl;
#endif
mLocalCacheTs[ts] = cacheSubId;
if (cacheSubId > mMaxCacheSubId)
{
#ifdef DISTRIB_DEBUG
std::cerr << "p3GroupDistrib::loadFileMsgs() New Max CacheSubId";
std::cerr << std::endl;
#endif
mMaxCacheSubId = cacheSubId;
}
if ((ts < now) && (ts > mLastPublishTime))
{
#ifdef DISTRIB_DEBUG
std::cerr << "p3GroupDistrib::loadFileMsgs() New LastPublishTime";
std::cerr << std::endl;
#endif
mLastPublishTime = ts;
}
}
delete streamer;
return;
}
@ -532,6 +585,7 @@ void p3GroupDistrib::loadMsg(RsDistribSignedMsg *newMsg, std::string src, bool l
std::cerr << "p3GroupDistrib::loadMsg() unpack failed" << std::endl;
std::cerr << std::endl;
#endif
delete newMsg;
return;
}
@ -541,6 +595,7 @@ void p3GroupDistrib::loadMsg(RsDistribSignedMsg *newMsg, std::string src, bool l
std::cerr << "p3GroupDistrib::loadMsg() check failed" << std::endl;
std::cerr << std::endl;
#endif
delete newMsg;
delete msg;
return;
}
@ -554,9 +609,22 @@ void p3GroupDistrib::loadMsg(RsDistribSignedMsg *newMsg, std::string src, bool l
#endif
/* else if group = subscribed | listener -> publish */
if (git->second.flags & (RS_DISTRIB_SUBSCRIBED))
/* if it has come from us... then it has been published already */
if ((!local) && (git->second.flags & (RS_DISTRIB_SUBSCRIBED)))
{
locked_toPublishMsg(msg);
#ifdef DISTRIB_DEBUG
std::cerr << "p3GroupDistrib::loadMsg() To be Published!";
std::cerr << std::endl;
#endif
locked_toPublishMsg(newMsg);
}
else
{
#ifdef DISTRIB_DEBUG
std::cerr << "p3GroupDistrib::loadMsg() Deleted Original Msg (No Publish)";
std::cerr << std::endl;
#endif
delete newMsg;
}
locked_notifyGroupChanged(git->second);
@ -568,24 +636,76 @@ void p3GroupDistrib::loadMsg(RsDistribSignedMsg *newMsg, std::string src, bool l
/****************** create/mod Cache Content **********************************/
/***************************************************************************************/
/***************************************************************************************/
uint16_t p3GroupDistrib::determineCacheSubId()
void p3GroupDistrib::locked_toPublishMsg(RsDistribSignedMsg *msg)
{
return 1;
#ifdef DISTRIB_DEBUG
std::cerr << "p3GroupDistrib::locked_toPublishMsg() Adding to PendingPublish List";
std::cerr << std::endl;
#endif
mPendingPublish.push_back(msg);
if (msg->PeerId() == mOwnId)
{
#ifdef DISTRIB_DEBUG
std::cerr << "p3GroupDistrib::locked_toPublishMsg() Local -> ConfigSave Requested";
std::cerr << std::endl;
#endif
/* we need to trigger Configuration save */
IndicateConfigChanged(); /**** INDICATE CONFIG CHANGED! *****/
}
}
void p3GroupDistrib::locked_toPublishMsg(RsDistribMsg *msg)
uint16_t p3GroupDistrib::locked_determineCacheSubId()
{
mPendingPublish.push_back(msg);
/* if oldest cache is previous to StorePeriod - use that */
time_t now = time(NULL);
uint16_t id = 1;
uint32_t oldest = now;
if (mLocalCacheTs.size() > 0)
{
oldest = mLocalCacheTs.begin()->first;
}
if (oldest < now - mStorePeriod)
{
/* clear it out, return id */
id = mLocalCacheTs.begin()->second;
mLocalCacheTs.erase(mLocalCacheTs.begin());
#ifdef DISTRIB_DEBUG
std::cerr << "p3GroupDistrib::locked_determineCacheSubId() Replacing Old CacheId: " << id;
std::cerr << std::endl;
#endif
return id;
}
mMaxCacheSubId++;
#ifdef DISTRIB_DEBUG
std::cerr << "p3GroupDistrib::locked_determineCacheSubId() Returning new Id: " << mMaxCacheSubId;
std::cerr << std::endl;
#endif
/* else return maximum */
return mMaxCacheSubId;
}
void p3GroupDistrib::locked_publishPendingMsgs()
{
#ifdef DISTRIB_DEBUG
std::cerr << "p3GroupDistrib::locked_publishPendingMsgs()";
std::cerr << std::endl;
#endif
/* get the next message id */
CacheData newCache;
time_t now = time(NULL);
newCache.pid = mOwnId;
newCache.cid.type = CacheSource::getCacheType();
newCache.cid.subid = determineCacheSubId(); // NOT fixed - should rotate.
newCache.cid.subid = locked_determineCacheSubId();
/* create filename */
std::string path = CacheSource::getCacheDir();
@ -597,20 +717,34 @@ void p3GroupDistrib::locked_publishPendingMsgs()
BinInterface *bio = new BinFileInterface(filename.c_str(),
BIN_FLAGS_WRITEABLE | BIN_FLAGS_HASH_DATA);
pqistreamer *streamer = createStreamer(bio, mOwnId,
BIN_FLAGS_NO_DELETE);
pqistreamer *streamer = createStreamer(bio, mOwnId, 0); /* messages are deleted! */
std::list<RsDistribMsg *>::iterator it;
bool resave = false;
std::list<RsDistribSignedMsg *>::iterator it;
for(it = mPendingPublish.begin(); it != mPendingPublish.end(); it++)
{
streamer->SendItem(*it); /* doesnt delete it */
#ifdef DISTRIB_DEBUG
std::cerr << "p3GroupDistrib::locked_publishPendingMsgs() Publishing:";
std::cerr << std::endl;
(*it)->print(std::cerr, 10);
std::cerr << std::endl;
#endif
if ((*it)->PeerId() == mOwnId)
{
#ifdef DISTRIB_DEBUG
std::cerr << "p3GroupDistrib::locked_publishPendingMsgs() Own Publish";
std::cerr << std::endl;
#endif
resave = true;
}
streamer->SendItem(*it); /* deletes it */
streamer->tick();
}
/* cleanup */
mPendingPublish.clear();
delete streamer;
streamer->tick(); /* once more for good luck! */
/* Extract File Information from pqistreamer */
newCache.path = path;
@ -618,13 +752,27 @@ void p3GroupDistrib::locked_publishPendingMsgs()
newCache.hash = bio->gethash();
newCache.size = bio->bytecount();
newCache.recvd = time(NULL);
newCache.recvd = now;
/* cleanup */
mPendingPublish.clear();
delete streamer;
/* indicate not to save for a while */
mLastPublishTime = now;
/* push file to CacheSource */
refreshCache(newCache);
/* flag to store config (saying we've published messages) */
IndicateConfigChanged(); /**** INDICATE CONFIG CHANGED! *****/
if (resave)
{
#ifdef DISTRIB_DEBUG
std::cerr << "p3GroupDistrib::locked_publishPendingMsgs() Indicate Save Data Changed";
std::cerr << std::endl;
#endif
/* flag to store config (saying we've published messages) */
IndicateConfigChanged(); /**** INDICATE CONFIG CHANGED! *****/
}
}
@ -635,12 +783,12 @@ void p3GroupDistrib::publishDistribGroups()
std::cerr << std::endl;
#endif
/* set subid = 0 */
/* set subid = 1 */
CacheData newCache;
newCache.pid = mOwnId;
newCache.cid.type = CacheSource::getCacheType();
newCache.cid.subid = 0;
newCache.cid.subid = 1;
/* create filename */
std::string path = CacheSource::getCacheDir();
@ -1086,6 +1234,15 @@ std::list<RsItem *> p3GroupDistrib::saveList(bool &cleanup)
}
std::list<RsDistribSignedMsg *>::iterator mit;
for(mit = mPendingPublish.begin(); mit != mPendingPublish.end(); mit++)
{
if ((*mit)->PeerId() == mOwnId)
{
saveData.push_back(*mit);
}
}
return saveData;
}
@ -1113,6 +1270,7 @@ bool p3GroupDistrib::loadList(std::list<RsItem *> load)
RsDistribGrp *newGrp = NULL;
RsDistribGrpKey *newKey = NULL;
RsDistribSignedMsg *newMsg = NULL;
if ((newGrp = dynamic_cast<RsDistribGrp *>(*lit)))
{
@ -1134,6 +1292,11 @@ bool p3GroupDistrib::loadList(std::list<RsItem *> load)
{
loadGroupKey(newKey);
}
else if ((newMsg = dynamic_cast<RsDistribSignedMsg *>(*lit)))
{
newMsg->PeerId(mOwnId);
loadMsg(newMsg, mOwnId, false); /* false so it'll pushed to PendingPublish list */
}
}
/* no need to republish until something new comes in */
@ -1545,8 +1708,12 @@ std::string p3GroupDistrib::publishMsg(RsDistribMsg *msg, bool personalSign)
std::cerr << std::endl;
#endif
/* load proper */
loadMsg(signedMsg, "ownId", true);
/* load proper -
* If we pretend it is coming from an alternative source
* it'll automatically get published with other msgs
*/
signedMsg->PeerId(mOwnId);
loadMsg(signedMsg, mOwnId, false);
/* done */
return msgId;
@ -2262,14 +2429,20 @@ RsDistribMsg *p3GroupDistrib::unpackDistribSignedMsg(RsDistribSignedMsg *newMsg)
/* transfer data that is not in the serialiser */
distribMsg->msgId = newMsg->msgId;
distribMsg->publishSignature = newMsg->publishSignature;
distribMsg->personalSignature = newMsg->personalSignature;
newMsg->publishSignature.ShallowClear();
newMsg->personalSignature.ShallowClear();
/* Full copies required ? */
distribMsg->publishSignature.keyId = newMsg->publishSignature.keyId;
distribMsg->publishSignature.signData.setBinData(
newMsg->publishSignature.signData.bin_data,
newMsg->publishSignature.signData.bin_len);
distribMsg->personalSignature.keyId = newMsg->personalSignature.keyId;
distribMsg->personalSignature.signData.setBinData(
newMsg->personalSignature.signData.bin_data,
newMsg->personalSignature.signData.bin_len);
}
delete newMsg;
delete serialType;
return distribMsg;