mirror of
https://github.com/RetroShare/RetroShare.git
synced 2025-08-13 00:25:48 -04:00
Fixed the FriendFeed backup of messages:
* switched to one universal PendingCache list. * added mHistoricalCaches variable to p3distrib - to indicate when old caches have been loaded. * added calls to p3GroupDistrib::HistoricalCachesDone() in rsinit.cc * added "historical" parameter to lots of p3distrib functions. * updated child classes to only add FeedItems if (historical == false). * Switched Validate / Duplicate Msg checks to speed up historical data load. * corrected rsrandom function for OSX. * bugfix to rsloginhandler function. (compile error). git-svn-id: http://svn.code.sf.net/p/retroshare/code/trunk@4008 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
parent
8c35ecdc67
commit
0dcef10ec2
11 changed files with 198 additions and 112 deletions
|
@ -69,6 +69,7 @@ p3GroupDistrib::p3GroupDistrib(uint16_t subtype,
|
|||
:CacheSource(subtype, true, cs, sourcedir),
|
||||
CacheStore(subtype, true, cs, cft, storedir),
|
||||
p3Config(configId), p3ThreadedService(subtype),
|
||||
mHistoricalCaches(true),
|
||||
mStorePeriod(storePeriod),
|
||||
mPubPeriod(pubPeriod),
|
||||
mLastPublishTime(0),
|
||||
|
@ -177,7 +178,7 @@ int p3GroupDistrib::loadCache(const CacheData &data)
|
|||
std::cerr << std::endl;
|
||||
#endif
|
||||
/* store the cache file for later processing */
|
||||
mPendingRemoteCache.push_back(data);
|
||||
mPendingCaches.push_back(CacheDataPending(data, false, mHistoricalCaches));
|
||||
}
|
||||
|
||||
if (data.size > 0)
|
||||
|
@ -206,7 +207,7 @@ bool p3GroupDistrib::loadLocalCache(const CacheData &data)
|
|||
#endif
|
||||
|
||||
/* store the cache file for later processing */
|
||||
mPendingLocalCache.push_back(data);
|
||||
mPendingCaches.push_back(CacheDataPending(data, true, mHistoricalCaches));
|
||||
}
|
||||
|
||||
if (data.size > 0)
|
||||
|
@ -218,6 +219,18 @@ bool p3GroupDistrib::loadLocalCache(const CacheData &data)
|
|||
}
|
||||
|
||||
|
||||
/* Handle the Cache Pending Setup */
|
||||
CacheDataPending::CacheDataPending(const CacheData &data, bool local, bool historical)
|
||||
:mData(data), mLocal(local), mHistorical(historical)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
void p3GroupDistrib::HistoricalCachesDone()
|
||||
{
|
||||
RsStackMutex stack(distribMtx);
|
||||
mHistoricalCaches = false; // called when Stored Caches have been added to Pending List.
|
||||
}
|
||||
|
||||
/* From RsThread */
|
||||
void p3GroupDistrib::run() /* called once the thread is started */
|
||||
|
@ -234,31 +247,22 @@ void p3GroupDistrib::run() /* called once the thread is started */
|
|||
CacheData cache;
|
||||
bool validCache = false;
|
||||
bool isLocal = false;
|
||||
bool isHistorical = false;
|
||||
{
|
||||
RsStackMutex stack(distribMtx);
|
||||
|
||||
if (mPendingLocalCache.size() > 0)
|
||||
if (mPendingCaches.size() > 0)
|
||||
{
|
||||
cache = mPendingLocalCache.front();
|
||||
mPendingLocalCache.pop_front();
|
||||
CacheDataPending &pendingCache = mPendingCaches.front();
|
||||
cache = pendingCache.mData;
|
||||
isLocal = pendingCache.mLocal;
|
||||
isHistorical = pendingCache.mHistorical;
|
||||
|
||||
validCache = true;
|
||||
isLocal = true;
|
||||
mPendingCaches.pop_front();
|
||||
|
||||
#ifdef DISTRIB_THREAD_DEBUG
|
||||
std::cerr << "p3GroupDistrib::run() found pendingLocalCache";
|
||||
std::cerr << std::endl;
|
||||
#endif
|
||||
|
||||
}
|
||||
else if (mPendingRemoteCache.size() > 0)
|
||||
{
|
||||
cache = mPendingRemoteCache.front();
|
||||
mPendingRemoteCache.pop_front();
|
||||
validCache = true;
|
||||
isLocal = false;
|
||||
|
||||
#ifdef DISTRIB_THREAD_DEBUG
|
||||
std::cerr << "p3GroupDistrib::run() found pendingRemoteCache";
|
||||
std::cerr << "p3GroupDistrib::run() found pendingCache";
|
||||
std::cerr << std::endl;
|
||||
#endif
|
||||
|
||||
|
@ -267,7 +271,7 @@ void p3GroupDistrib::run() /* called once the thread is started */
|
|||
|
||||
if (validCache)
|
||||
{
|
||||
loadAnyCache(cache, isLocal);
|
||||
loadAnyCache(cache, isLocal, isHistorical);
|
||||
|
||||
#ifndef WINDOWS_SYS
|
||||
usleep(1000);
|
||||
|
@ -288,7 +292,7 @@ void p3GroupDistrib::run() /* called once the thread is started */
|
|||
|
||||
|
||||
|
||||
int p3GroupDistrib::loadAnyCache(const CacheData &data, bool local)
|
||||
int p3GroupDistrib::loadAnyCache(const CacheData &data, bool local, bool historical)
|
||||
{
|
||||
/* if subtype = 1 -> FileGroup, else -> FileMsgs */
|
||||
|
||||
|
@ -304,11 +308,11 @@ int p3GroupDistrib::loadAnyCache(const CacheData &data, bool local)
|
|||
|
||||
if (data.cid.subid == 1)
|
||||
{
|
||||
loadFileGroups(file, data.pid, local);
|
||||
loadFileGroups(file, data.pid, local, historical);
|
||||
}
|
||||
else
|
||||
{
|
||||
loadFileMsgs(file, data.cid.subid, data.pid, data.recvd, local);
|
||||
loadFileMsgs(file, data.cid.subid, data.pid, data.recvd, local, historical);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
@ -323,7 +327,7 @@ int p3GroupDistrib::loadAnyCache(const CacheData &data, bool local)
|
|||
/* No need for special treatment for 'own' groups.
|
||||
* configuration should be loaded before cache files.
|
||||
*/
|
||||
void p3GroupDistrib::loadFileGroups(const std::string &filename, const std::string &src, bool local)
|
||||
void p3GroupDistrib::loadFileGroups(const std::string &filename, const std::string &src, bool local, bool historical)
|
||||
{
|
||||
#ifdef DISTRIB_DEBUG
|
||||
std::cerr << "p3GroupDistrib::loadFileGroups()";
|
||||
|
@ -354,11 +358,11 @@ void p3GroupDistrib::loadFileGroups(const std::string &filename, const std::stri
|
|||
newKey = dynamic_cast<RsDistribGrpKey *>(item);
|
||||
if ((newGrp = dynamic_cast<RsDistribGrp *>(item)))
|
||||
{
|
||||
loadGroup(newGrp);
|
||||
loadGroup(newGrp, historical);
|
||||
}
|
||||
else if ((newKey = dynamic_cast<RsDistribGrpKey *>(item)))
|
||||
{
|
||||
loadGroupKey(newKey);
|
||||
loadGroupKey(newKey, historical);
|
||||
}
|
||||
else
|
||||
{
|
||||
|
@ -384,7 +388,7 @@ void p3GroupDistrib::loadFileGroups(const std::string &filename, const std::stri
|
|||
}
|
||||
|
||||
|
||||
void p3GroupDistrib::loadFileMsgs(const std::string &filename, uint16_t cacheSubId, const std::string &src, uint32_t ts, bool local)
|
||||
void p3GroupDistrib::loadFileMsgs(const std::string &filename, uint16_t cacheSubId, const std::string &src, uint32_t ts, bool local, bool historical)
|
||||
{
|
||||
|
||||
#ifdef DISTRIB_DEBUG
|
||||
|
@ -418,7 +422,7 @@ void p3GroupDistrib::loadFileMsgs(const std::string &filename, uint16_t cacheSub
|
|||
|
||||
if ((newMsg = dynamic_cast<RsDistribSignedMsg *>(item)))
|
||||
{
|
||||
loadMsg(newMsg, src, local);
|
||||
loadMsg(newMsg, src, local, historical);
|
||||
}
|
||||
else
|
||||
{
|
||||
|
@ -475,7 +479,7 @@ void p3GroupDistrib::loadFileMsgs(const std::string &filename, uint16_t cacheSub
|
|||
/***************************************************************************************/
|
||||
|
||||
|
||||
void p3GroupDistrib::loadGroup(RsDistribGrp *newGrp)
|
||||
void p3GroupDistrib::loadGroup(RsDistribGrp *newGrp, bool historical)
|
||||
{
|
||||
/* load groupInfo */
|
||||
const std::string &gid = newGrp -> grpId;
|
||||
|
@ -574,9 +578,9 @@ void p3GroupDistrib::loadGroup(RsDistribGrp *newGrp)
|
|||
/* Callback for any derived classes */
|
||||
|
||||
if (isNew)
|
||||
locked_notifyGroupChanged(it->second, GRP_NEW_UPDATE);
|
||||
locked_notifyGroupChanged(it->second, GRP_NEW_UPDATE, historical);
|
||||
else
|
||||
locked_notifyGroupChanged(it->second, GRP_UPDATE);
|
||||
locked_notifyGroupChanged(it->second, GRP_UPDATE, historical);
|
||||
}
|
||||
|
||||
#ifdef DISTRIB_DEBUG
|
||||
|
@ -586,7 +590,7 @@ void p3GroupDistrib::loadGroup(RsDistribGrp *newGrp)
|
|||
}
|
||||
|
||||
|
||||
bool p3GroupDistrib::loadGroupKey(RsDistribGrpKey *newKey)
|
||||
bool p3GroupDistrib::loadGroupKey(RsDistribGrpKey *newKey, bool historical)
|
||||
{
|
||||
/* load Key */
|
||||
const std::string &gid = newKey -> grpId;
|
||||
|
@ -659,7 +663,7 @@ bool p3GroupDistrib::loadGroupKey(RsDistribGrpKey *newKey)
|
|||
}
|
||||
|
||||
if (updateOk)
|
||||
locked_notifyGroupChanged(it->second, GRP_LOAD_KEY);
|
||||
locked_notifyGroupChanged(it->second, GRP_LOAD_KEY, historical);
|
||||
|
||||
|
||||
|
||||
|
@ -676,7 +680,7 @@ bool p3GroupDistrib::loadGroupKey(RsDistribGrpKey *newKey)
|
|||
}
|
||||
|
||||
|
||||
void p3GroupDistrib::loadMsg(RsDistribSignedMsg *newMsg, const std::string &src, bool local)
|
||||
void p3GroupDistrib::loadMsg(RsDistribSignedMsg *newMsg, const std::string &src, bool local, bool historical)
|
||||
{
|
||||
/****************** check the msg ******************/
|
||||
/* Do the most likely checks to fail first....
|
||||
|
@ -713,22 +717,12 @@ void p3GroupDistrib::loadMsg(RsDistribSignedMsg *newMsg, const std::string &src,
|
|||
return;
|
||||
}
|
||||
|
||||
/****************** check the msg ******************/
|
||||
if (!locked_validateDistribSignedMsg(git->second, newMsg))
|
||||
{
|
||||
#ifdef DISTRIB_DEBUG
|
||||
std::cerr << "p3GroupDistrib::loadMsg() validate failed" << std::endl;
|
||||
std::cerr << std::endl;
|
||||
#endif
|
||||
delete newMsg;
|
||||
return;
|
||||
}
|
||||
|
||||
/* check for duplicate message
|
||||
*
|
||||
* do this after validate - because we are calling
|
||||
* duplicateMsg... only want to do if is good.
|
||||
/****************** check the msg ******************/
|
||||
/* check for duplicate message, do this first to ensure minimal signature validations.
|
||||
* therefore, duplicateMsg... could potentially be called on a dodgey msg (not a big problem!)
|
||||
*/
|
||||
|
||||
std::map<std::string, RsDistribMsg *>::iterator mit;
|
||||
mit = (git->second).msgs.find(newMsg->msgId);
|
||||
if (mit != (git->second).msgs.end())
|
||||
|
@ -738,7 +732,18 @@ void p3GroupDistrib::loadMsg(RsDistribSignedMsg *newMsg, const std::string &src,
|
|||
std::cerr << std::endl;
|
||||
#endif
|
||||
/* if already there -> remove */
|
||||
locked_eventDuplicateMsg(&(git->second), mit->second, src);
|
||||
locked_eventDuplicateMsg(&(git->second), mit->second, src, historical);
|
||||
delete newMsg;
|
||||
return;
|
||||
}
|
||||
|
||||
/* if unique (new) msg - do validation */
|
||||
if (!locked_validateDistribSignedMsg(git->second, newMsg))
|
||||
{
|
||||
#ifdef DISTRIB_DEBUG
|
||||
std::cerr << "p3GroupDistrib::loadMsg() validate failed" << std::endl;
|
||||
std::cerr << std::endl;
|
||||
#endif
|
||||
delete newMsg;
|
||||
return;
|
||||
}
|
||||
|
@ -805,7 +810,7 @@ void p3GroupDistrib::loadMsg(RsDistribSignedMsg *newMsg, const std::string &src,
|
|||
#endif
|
||||
|
||||
/* Callback for any derived classes to play with */
|
||||
locked_eventNewMsg(&(git->second), msg, src);
|
||||
locked_eventNewMsg(&(git->second), msg, src, historical);
|
||||
|
||||
/* else if group = subscribed | listener -> publish */
|
||||
/* if it has come from us... then it has been published already */
|
||||
|
@ -836,7 +841,7 @@ void p3GroupDistrib::loadMsg(RsDistribSignedMsg *newMsg, const std::string &src,
|
|||
#endif
|
||||
delete newMsg;
|
||||
}
|
||||
locked_notifyGroupChanged(git->second, GRP_NEW_MSG);
|
||||
locked_notifyGroupChanged(git->second, GRP_NEW_MSG, historical);
|
||||
}
|
||||
|
||||
|
||||
|
@ -1330,7 +1335,7 @@ bool p3GroupDistrib::subscribeToGroup(const std::string &grpId, bool subscrib
|
|||
{
|
||||
git->second.flags |= RS_DISTRIB_SUBSCRIBED;
|
||||
|
||||
locked_notifyGroupChanged(git->second, GRP_SUBSCRIBED);
|
||||
locked_notifyGroupChanged(git->second, GRP_SUBSCRIBED, false);
|
||||
mGroupsRepublish = true;
|
||||
|
||||
/* reprocess groups messages .... so actions can be taken (by inherited)
|
||||
|
@ -1350,7 +1355,7 @@ bool p3GroupDistrib::subscribeToGroup(const std::string &grpId, bool subscrib
|
|||
pit != git->second.sources.end(); pit++)
|
||||
{
|
||||
if(*pit != mOwnId)
|
||||
locked_eventDuplicateMsg(&(git->second), mit->second, *pit);
|
||||
locked_eventDuplicateMsg(&(git->second), mit->second, *pit, false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1361,7 +1366,7 @@ bool p3GroupDistrib::subscribeToGroup(const std::string &grpId, bool subscrib
|
|||
{
|
||||
git->second.flags &= (~RS_DISTRIB_SUBSCRIBED);
|
||||
|
||||
locked_notifyGroupChanged(git->second, GRP_UNSUBSCRIBED);
|
||||
locked_notifyGroupChanged(git->second, GRP_UNSUBSCRIBED, false);
|
||||
mGroupsRepublish = true;
|
||||
}
|
||||
}
|
||||
|
@ -1402,7 +1407,7 @@ bool p3GroupDistrib::attemptPublishKeysRecvd()
|
|||
|
||||
if(locked_updateGroupPublishKey(it->second, mit->second)){
|
||||
|
||||
locked_notifyGroupChanged(it->second, GRP_LOAD_KEY);
|
||||
locked_notifyGroupChanged(it->second, GRP_LOAD_KEY, false);
|
||||
}
|
||||
|
||||
if(it->second.flags & RS_DISTRIB_SUBSCRIBED){
|
||||
|
@ -1646,7 +1651,7 @@ bool p3GroupDistrib::loadList(std::list<RsItem *>& load)
|
|||
if ((newGrp = dynamic_cast<RsDistribGrp *>(*lit)))
|
||||
{
|
||||
const std::string &gid = newGrp -> grpId;
|
||||
loadGroup(newGrp);
|
||||
loadGroup(newGrp, false);
|
||||
|
||||
subscribeToGroup(gid, true);
|
||||
}
|
||||
|
@ -1662,14 +1667,14 @@ bool p3GroupDistrib::loadList(std::list<RsItem *>& load)
|
|||
continue;
|
||||
}
|
||||
|
||||
loadGroupKey(newKey);
|
||||
loadGroupKey(newKey, false);
|
||||
|
||||
|
||||
}
|
||||
else if ((newMsg = dynamic_cast<RsDistribSignedMsg *>(*lit)))
|
||||
{
|
||||
newMsg->PeerId(mOwnId);
|
||||
loadMsg(newMsg, mOwnId, false); /* false so it'll pushed to PendingPublish list */
|
||||
loadMsg(newMsg, mOwnId, false, false); /* false so it'll pushed to PendingPublish list */
|
||||
}
|
||||
else if ((newChildConfig = dynamic_cast<RsDistribConfigData *>(*lit)))
|
||||
{
|
||||
|
@ -1971,7 +1976,7 @@ std::string p3GroupDistrib::createGroup(std::wstring name, std::wstring desc, ui
|
|||
EVP_PKEY_free(key_admin);
|
||||
|
||||
/******************* load up new Group *********************/
|
||||
loadGroup(newGrp);
|
||||
loadGroup(newGrp, false);
|
||||
|
||||
/* add Keys to GroupInfo */
|
||||
RsStackMutex stack(distribMtx); /************* STACK MUTEX ************/
|
||||
|
@ -2094,7 +2099,7 @@ bool p3GroupDistrib::restoreGrpKeys(std::string grpId){
|
|||
|
||||
if(ok){
|
||||
gi->flags |= RS_DISTRIB_SUBSCRIBED;
|
||||
locked_notifyGroupChanged(*gi, GRP_SUBSCRIBED);
|
||||
locked_notifyGroupChanged(*gi, GRP_SUBSCRIBED, false);
|
||||
IndicateConfigChanged();
|
||||
mGroupsRepublish = true;
|
||||
}
|
||||
|
@ -2274,7 +2279,7 @@ void p3GroupDistrib::receivePubKeys(){
|
|||
if(locked_updateGroupPublishKey(it->second, key_item)){
|
||||
|
||||
mPubKeyAvailableGrpId.insert(key_item->grpId);
|
||||
locked_notifyGroupChanged(it->second, GRP_LOAD_KEY);
|
||||
locked_notifyGroupChanged(it->second, GRP_LOAD_KEY, false);
|
||||
|
||||
// keep key if user not subscribed
|
||||
if(it->second.flags & RS_DISTRIB_SUBSCRIBED){
|
||||
|
@ -2483,7 +2488,7 @@ std::string p3GroupDistrib::publishMsg(RsDistribMsg *msg, bool personalSign)
|
|||
*/
|
||||
|
||||
signedMsg->PeerId(mOwnId);
|
||||
loadMsg(signedMsg, mOwnId, false);
|
||||
loadMsg(signedMsg, mOwnId, false, false);
|
||||
|
||||
/* done */
|
||||
return msgId;
|
||||
|
@ -3683,7 +3688,7 @@ std::ostream &operator<<(std::ostream &out, const GroupInfo &info)
|
|||
return out;
|
||||
}
|
||||
|
||||
void p3GroupDistrib::locked_notifyGroupChanged(GroupInfo &info, uint32_t flags)
|
||||
void p3GroupDistrib::locked_notifyGroupChanged(GroupInfo &info, uint32_t flags, bool historical)
|
||||
{
|
||||
mGroupsChanged = true;
|
||||
info.grpChanged = true;
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue