mirror of
https://github.com/RetroShare/RetroShare.git
synced 2024-10-01 02:35:48 -04:00
Merge pull request #2245 from csoler/v0.6-BugFixing_7
Fixing the token queues
This commit is contained in:
commit
7613fe1a44
@ -647,7 +647,8 @@ uint64_t InternalFileHierarchyStorage::recursUpdateCumulatedSize(const Directory
|
||||
uint64_t local_cumulative_size = 0;
|
||||
|
||||
for(uint32_t i=0;i<d.subfiles.size();++i)
|
||||
local_cumulative_size += static_cast<FileEntry*>(mNodes[d.subfiles[i]])->file_size;
|
||||
if(mNodes[d.subfiles[i]]) // normally not needed, but an extra-security
|
||||
local_cumulative_size += static_cast<FileEntry*>(mNodes[d.subfiles[i]])->file_size;
|
||||
|
||||
for(uint32_t i=0;i<d.subdirs.size();++i)
|
||||
local_cumulative_size += recursUpdateCumulatedSize(d.subdirs[i]);
|
||||
|
@ -1308,8 +1308,13 @@ int RsDataService::retrieveGxsMsgMetaData(const GxsMsgReq& reqIds, GxsMsgMetaRes
|
||||
auto meta = locked_getMsgMeta(*c, 0);
|
||||
|
||||
if(meta)
|
||||
{
|
||||
metaSet.push_back(meta);
|
||||
|
||||
if(mUseCache)
|
||||
mMsgMetaDataCache[grpId].updateMeta(msgId,meta);
|
||||
}
|
||||
|
||||
delete c;
|
||||
}
|
||||
}
|
||||
@ -1438,8 +1443,13 @@ int RsDataService::retrieveGxsGrpMetaData(std::map<RsGxsGroupId,std::shared_ptr<
|
||||
auto meta = locked_getGrpMeta(*c, 0);
|
||||
|
||||
if(meta)
|
||||
{
|
||||
mit->second = meta;
|
||||
|
||||
if(mUseCache)
|
||||
mGrpMetaDataCache.updateMeta(grpId,meta);
|
||||
}
|
||||
|
||||
#ifdef RS_DATA_SERVICE_DEBUG_TIME
|
||||
++resultCount;
|
||||
#endif
|
||||
@ -1503,9 +1513,30 @@ int RsDataService::updateGroupMetaData(const GrpLocMetaData& meta)
|
||||
std::cerr << (void*)this << ": erasing old entry from cache." << std::endl;
|
||||
#endif
|
||||
|
||||
mGrpMetaDataCache.clear(meta.grpId);
|
||||
if( mDb->sqlUpdate(GRP_TABLE_NAME, KEY_GRP_ID+ "='" + grpId.toStdString() + "'", meta.val))
|
||||
{
|
||||
// If we use the cache, update the meta data immediately.
|
||||
|
||||
return mDb->sqlUpdate(GRP_TABLE_NAME, KEY_GRP_ID+ "='" + grpId.toStdString() + "'", meta.val) ? 1 : 0;
|
||||
if(mUseCache)
|
||||
{
|
||||
RetroCursor* c = mDb->sqlQuery(GRP_TABLE_NAME, mGrpMetaColumns, "grpId='" + grpId.toStdString() + "'", "");
|
||||
|
||||
c->moveToFirst();
|
||||
|
||||
// temporarily disable the cache so that we get the value from the DB itself.
|
||||
mUseCache=false;
|
||||
auto meta = locked_getGrpMeta(*c, 0);
|
||||
mUseCache=true;
|
||||
|
||||
if(meta)
|
||||
mGrpMetaDataCache.updateMeta(grpId,meta);
|
||||
|
||||
delete c;
|
||||
}
|
||||
|
||||
return 1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int RsDataService::updateMessageMetaData(const MsgLocMetaData& metaData)
|
||||
@ -1518,9 +1549,30 @@ int RsDataService::updateMessageMetaData(const MsgLocMetaData& metaData)
|
||||
const RsGxsGroupId& grpId = metaData.msgId.first;
|
||||
const RsGxsMessageId& msgId = metaData.msgId.second;
|
||||
|
||||
mMsgMetaDataCache[grpId].clear(msgId);
|
||||
if(mDb->sqlUpdate(MSG_TABLE_NAME, KEY_GRP_ID+ "='" + grpId.toStdString() + "' AND " + KEY_MSG_ID + "='" + msgId.toStdString() + "'", metaData.val) )
|
||||
{
|
||||
// If we use the cache, update the meta data immediately.
|
||||
|
||||
return mDb->sqlUpdate(MSG_TABLE_NAME, KEY_GRP_ID+ "='" + grpId.toStdString() + "' AND " + KEY_MSG_ID + "='" + msgId.toStdString() + "'", metaData.val) ? 1 : 0;
|
||||
if(mUseCache)
|
||||
{
|
||||
RetroCursor* c = mDb->sqlQuery(MSG_TABLE_NAME, mMsgMetaColumns, KEY_GRP_ID+ "='" + grpId.toStdString() + "' AND " + KEY_MSG_ID + "='" + msgId.toStdString() + "'", "");
|
||||
|
||||
c->moveToFirst();
|
||||
|
||||
// temporarily disable the cache so that we get the value from the DB itself.
|
||||
mUseCache=false;
|
||||
auto meta = locked_getMsgMeta(*c, 0);
|
||||
mUseCache=true;
|
||||
|
||||
if(meta)
|
||||
mMsgMetaDataCache[grpId].updateMeta(msgId,meta);
|
||||
|
||||
delete c;
|
||||
}
|
||||
|
||||
return 1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int RsDataService::removeMsgs(const GxsMsgReq& msgIds)
|
||||
|
@ -79,14 +79,18 @@ public:
|
||||
}
|
||||
}
|
||||
|
||||
void updateMeta(const ID& id,const MetaDataClass& meta)
|
||||
{
|
||||
void updateMeta(const ID& id,const MetaDataClass& meta)
|
||||
{
|
||||
mMetas[id] = std::make_shared<MetaDataClass>(meta); // create a new shared_ptr to possibly replace the previous one
|
||||
}
|
||||
|
||||
void updateMeta(const ID& id,const std::shared_ptr<MetaDataClass>& meta)
|
||||
{
|
||||
mMetas[id] = meta; // create a new shared_ptr to possibly replace the previous one
|
||||
}
|
||||
|
||||
void clear(const ID& id)
|
||||
{
|
||||
rstime_t now = time(NULL) ;
|
||||
auto it = mMetas.find(id) ;
|
||||
|
||||
// We dont actually delete the item, because it might be used by a calling client.
|
||||
@ -101,8 +105,11 @@ public:
|
||||
#endif
|
||||
|
||||
mMetas.erase(it) ;
|
||||
mCache_ContainsAllMetas = false;
|
||||
}
|
||||
|
||||
// No need to modify mCache_ContainsAllMetas since, assuming that the cache always contains
|
||||
// all possible elements from the DB, clearing one from the cache means that it is also deleted from the db, so
|
||||
// the property is preserved.
|
||||
}
|
||||
}
|
||||
|
||||
void debug_computeSize(uint32_t& nb_items, uint64_t& total_size) const
|
||||
|
@ -68,6 +68,37 @@ static const uint32_t INTEGRITY_CHECK_PERIOD = 60*31; // 31 minutes
|
||||
* #define GEN_EXCH_DEBUG 1
|
||||
*/
|
||||
|
||||
#if defined(GEN_EXCH_DEBUG)
|
||||
static const uint32_t service_to_print = RS_SERVICE_GXS_TYPE_FORUMS;// use this to allow to this service id only, or 0 for all services
|
||||
// warning. Numbers should be SERVICE IDS (see serialiser/rsserviceids.h. E.g. 0x0215 for forums)
|
||||
class nullstream: public std::ostream {};
|
||||
|
||||
// static std::string nice_time_stamp(rstime_t now,rstime_t TS)
|
||||
// {
|
||||
// if(TS == 0)
|
||||
// return "Never" ;
|
||||
// else
|
||||
// {
|
||||
// std::ostringstream s;
|
||||
// s << now - TS << " secs ago" ;
|
||||
// return s.str() ;
|
||||
// }
|
||||
// }
|
||||
|
||||
static std::ostream& gxsgenexchangedebug(uint32_t service_type)
|
||||
{
|
||||
static nullstream null ;
|
||||
|
||||
if (service_to_print==0 || service_type == 0 || (service_type == service_to_print))
|
||||
return std::cerr << time(NULL) << ":RSGENEXCHANGE service " << std::hex << service_type << std::dec << ": " ;
|
||||
else
|
||||
return null ;
|
||||
}
|
||||
|
||||
#define GXSGENEXCHANGEDEBUG gxsgenexchangedebug(serviceType())
|
||||
|
||||
#endif
|
||||
|
||||
// Data flow in RsGenExchange
|
||||
//
|
||||
// publishGroup()
|
||||
@ -386,20 +417,19 @@ bool RsGenExchange::messagePublicationTest(const RsGxsMsgMetaData& meta)
|
||||
return meta.mMsgStatus & GXS_SERV::GXS_MSG_STATUS_KEEP_FOREVER || st == 0 || storageTimeLimit >= time(NULL);
|
||||
}
|
||||
|
||||
bool RsGenExchange::acknowledgeTokenMsg(const uint32_t& token,
|
||||
RsGxsGrpMsgIdPair& msgId)
|
||||
bool RsGenExchange::acknowledgeTokenMsg(const uint32_t& token, RsGxsGrpMsgIdPair& msgId)
|
||||
{
|
||||
RS_STACK_MUTEX(mGenMtx) ;
|
||||
|
||||
#ifdef GEN_EXCH_DEBUG
|
||||
std::cerr << "RsGenExchange::acknowledgeTokenMsg(). token=" << token << std::endl;
|
||||
GXSGENEXCHANGEDEBUG << "RsGenExchange::acknowledgeTokenMsg(). token=" << token << std::endl;
|
||||
#endif
|
||||
std::map<uint32_t, RsGxsGrpMsgIdPair >::iterator mit = mMsgNotify.find(token);
|
||||
|
||||
if(mit == mMsgNotify.end())
|
||||
{
|
||||
#ifdef GEN_EXCH_DEBUG
|
||||
std::cerr << " no notification found for this token." << std::endl;
|
||||
GXSGENEXCHANGEDEBUG << " no notification found for this token." << std::endl;
|
||||
#endif
|
||||
return false;
|
||||
}
|
||||
@ -409,10 +439,10 @@ bool RsGenExchange::acknowledgeTokenMsg(const uint32_t& token,
|
||||
|
||||
// no dump token as client has ackowledged its completion
|
||||
mDataAccess->disposeOfPublicToken(token);
|
||||
mMsgNotify.erase(mit);
|
||||
|
||||
#ifdef GEN_EXCH_DEBUG
|
||||
std::cerr << " found grpId=" << msgId.first <<", msgId=" << msgId.second << std::endl;
|
||||
std::cerr << " disposing token from mDataAccess" << std::endl;
|
||||
GXSGENEXCHANGEDEBUG << " found grpId=" << msgId.first <<", msgId=" << msgId.second << " disposing token from mDataAccess" << std::endl;
|
||||
#endif
|
||||
return true;
|
||||
}
|
||||
@ -424,15 +454,14 @@ bool RsGenExchange::acknowledgeTokenGrp(const uint32_t& token, RsGxsGroupId& grp
|
||||
RS_STACK_MUTEX(mGenMtx) ;
|
||||
|
||||
#ifdef GEN_EXCH_DEBUG
|
||||
std::cerr << "RsGenExchange::acknowledgeTokenGrp(). token=" << token << std::endl;
|
||||
GXSGENEXCHANGEDEBUG << "RsGenExchange::acknowledgeTokenGrp(). token=" << token << std::endl;
|
||||
#endif
|
||||
std::map<uint32_t, RsGxsGroupId >::iterator mit =
|
||||
mGrpNotify.find(token);
|
||||
std::map<uint32_t, RsGxsGroupId >::iterator mit = mGrpNotify.find(token);
|
||||
|
||||
if(mit == mGrpNotify.end())
|
||||
{
|
||||
#ifdef GEN_EXCH_DEBUG
|
||||
std::cerr << " no notification found for this token." << std::endl;
|
||||
GXSGENEXCHANGEDEBUG << " no notification found for this token." << std::endl;
|
||||
#endif
|
||||
return false;
|
||||
}
|
||||
@ -441,10 +470,10 @@ bool RsGenExchange::acknowledgeTokenGrp(const uint32_t& token, RsGxsGroupId& grp
|
||||
|
||||
// no dump token as client has ackowledged its completion
|
||||
mDataAccess->disposeOfPublicToken(token);
|
||||
mGrpNotify.erase(mit);
|
||||
|
||||
#ifdef GEN_EXCH_DEBUG
|
||||
std::cerr << " found grpId=" << grpId << std::endl;
|
||||
std::cerr << " disposing token from mDataAccess" << std::endl;
|
||||
GXSGENEXCHANGEDEBUG << " found grpId=" << grpId << ". Disposing token from mDataAccess" << std::endl;
|
||||
#endif
|
||||
return true;
|
||||
}
|
||||
@ -484,8 +513,7 @@ void RsGenExchange::generateGroupKeys(RsTlvSecurityKeySet& keySet, bool genPubli
|
||||
uint8_t RsGenExchange::createGroup(RsNxsGrp *grp, RsTlvSecurityKeySet& keySet)
|
||||
{
|
||||
#ifdef GEN_EXCH_DEBUG
|
||||
std::cerr << "RsGenExchange::createGroup()";
|
||||
std::cerr << std::endl;
|
||||
GXSGENEXCHANGEDEBUG << "RsGenExchange::createGroup()"<< std::endl;
|
||||
#endif
|
||||
|
||||
RsGxsGrpMetaData* meta = grp->metaData;
|
||||
@ -510,8 +538,7 @@ uint8_t RsGenExchange::createGroup(RsNxsGrp *grp, RsTlvSecurityKeySet& keySet)
|
||||
|
||||
if(!privKeyFound)
|
||||
{
|
||||
std::cerr << "RsGenExchange::createGroup() Missing private ADMIN Key";
|
||||
std::cerr << std::endl;
|
||||
std::cerr << "RsGenExchange::createGroup() Missing private ADMIN Key" << std::endl;
|
||||
|
||||
return false;
|
||||
}
|
||||
@ -553,8 +580,7 @@ uint8_t RsGenExchange::createGroup(RsNxsGrp *grp, RsTlvSecurityKeySet& keySet)
|
||||
|
||||
if (!ok)
|
||||
{
|
||||
std::cerr << "RsGenExchange::createGroup() ERROR !okay (getSignature error)";
|
||||
std::cerr << std::endl;
|
||||
std::cerr << "RsGenExchange::createGroup() ERROR !okay (getSignature error)" << std::endl;
|
||||
return CREATE_FAIL;
|
||||
}
|
||||
|
||||
@ -587,8 +613,7 @@ int RsGenExchange::createGroupSignatures(RsTlvKeySignatureSet& signSet, RsTlvBin
|
||||
{
|
||||
needIdentitySign = true;
|
||||
#ifdef GEN_EXCH_DEBUG
|
||||
std::cerr << "Needs Identity sign! (Service Flags)";
|
||||
std::cerr << std::endl;
|
||||
GXSGENEXCHANGEDEBUG << "Needs Identity sign! (Service Flags)"<< std::endl;
|
||||
#endif
|
||||
}
|
||||
|
||||
@ -660,8 +685,7 @@ int RsGenExchange::createMsgSignatures(RsTlvKeySignatureSet& signSet, RsTlvBinar
|
||||
bool publishSignSuccess = false;
|
||||
|
||||
#ifdef GEN_EXCH_DEBUG
|
||||
std::cerr << "RsGenExchange::createMsgSignatures() for Msg.mMsgName: " << msgMeta.mMsgName;
|
||||
std::cerr << std::endl;
|
||||
GXSGENEXCHANGEDEBUG << "RsGenExchange::createMsgSignatures() for Msg.mMsgName: " << msgMeta.mMsgName<< std::endl;
|
||||
#endif
|
||||
|
||||
|
||||
@ -698,8 +722,7 @@ int RsGenExchange::createMsgSignatures(RsTlvKeySignatureSet& signSet, RsTlvBinar
|
||||
{
|
||||
needPublishSign = true;
|
||||
#ifdef GEN_EXCH_DEBUG
|
||||
std::cerr << "Needs Publish sign! (Service Flags)";
|
||||
std::cerr << std::endl;
|
||||
GXSGENEXCHANGEDEBUG << "Needs Publish sign! (Service Flags)"<< std::endl;
|
||||
#endif
|
||||
}
|
||||
|
||||
@ -708,8 +731,7 @@ int RsGenExchange::createMsgSignatures(RsTlvKeySignatureSet& signSet, RsTlvBinar
|
||||
{
|
||||
needIdentitySign = true;
|
||||
#ifdef GEN_EXCH_DEBUG
|
||||
std::cerr << "Needs Identity sign! (Service Flags)";
|
||||
std::cerr << std::endl;
|
||||
GXSGENEXCHANGEDEBUG << "Needs Identity sign! (Service Flags)"<< std::endl;
|
||||
#endif
|
||||
}
|
||||
|
||||
@ -717,8 +739,7 @@ int RsGenExchange::createMsgSignatures(RsTlvKeySignatureSet& signSet, RsTlvBinar
|
||||
{
|
||||
needIdentitySign = true;
|
||||
#ifdef GEN_EXCH_DEBUG
|
||||
std::cerr << "Needs Identity sign! (AuthorId Exists)";
|
||||
std::cerr << std::endl;
|
||||
GXSGENEXCHANGEDEBUG << "Needs Identity sign! (AuthorId Exists)"<< std::endl;
|
||||
#endif
|
||||
}
|
||||
|
||||
@ -2038,7 +2059,7 @@ void RsGenExchange::setGroupSubscribeFlags(uint32_t& token, const RsGxsGroupId&
|
||||
void RsGenExchange::setGroupStatusFlags(uint32_t& token, const RsGxsGroupId& grpId, const uint32_t& status, const uint32_t& mask)
|
||||
{
|
||||
/* TODO APPLY MASK TO FLAGS */
|
||||
RS_STACK_MUTEX(mGenMtx) ;
|
||||
RS_STACK_MUTEX(mGenMtx) ;
|
||||
token = mDataAccess->generatePublicToken();
|
||||
|
||||
GrpLocMetaData g;
|
||||
@ -2194,6 +2215,9 @@ void RsGenExchange::processGrpMetaChanges()
|
||||
GrpLocMetaData& g = mit->second;
|
||||
uint32_t token = mit->first;
|
||||
|
||||
#ifdef GEN_EXCH_DEBUG
|
||||
RsDbg() << " Processing GrpMetaChange for token " << token << std::endl;
|
||||
#endif
|
||||
// process mask
|
||||
bool ok = processGrpMask(g.grpId, g.val);
|
||||
|
||||
@ -2203,7 +2227,8 @@ void RsGenExchange::processGrpMetaChanges()
|
||||
{
|
||||
mDataAccess->updatePublicRequestStatus(token, RsTokenService::COMPLETE);
|
||||
grpChanged.push_back(g.grpId);
|
||||
}else
|
||||
}
|
||||
else
|
||||
{
|
||||
mDataAccess->updatePublicRequestStatus(token, RsTokenService::FAILED);
|
||||
}
|
||||
@ -2211,6 +2236,9 @@ void RsGenExchange::processGrpMetaChanges()
|
||||
{
|
||||
RS_STACK_MUTEX(mGenMtx);
|
||||
mGrpNotify.insert(std::make_pair(token, g.grpId));
|
||||
#ifdef GEN_EXCH_DEBUG
|
||||
RsDbg() << " Processing GrpMetaChange Adding token " << token << " to mGrpNotify" << std::endl;
|
||||
#endif
|
||||
}
|
||||
}
|
||||
|
||||
@ -2594,8 +2622,7 @@ void RsGenExchange::processGroupDelete()
|
||||
typedef std::pair<bool, RsGxsGroupId> GrpNote;
|
||||
std::map<uint32_t, GrpNote> toNotify;
|
||||
|
||||
std::vector<GroupDeletePublish>::iterator vit = mGroupDeletePublish.begin();
|
||||
for(; vit != mGroupDeletePublish.end(); ++vit)
|
||||
for( auto vit = mGroupDeletePublish.begin();vit != mGroupDeletePublish.end(); ++vit)
|
||||
{
|
||||
std::vector<RsGxsGroupId> gprIds;
|
||||
gprIds.push_back(vit->mGroupId);
|
||||
@ -2603,18 +2630,16 @@ void RsGenExchange::processGroupDelete()
|
||||
toNotify.insert(std::make_pair( vit->mToken, GrpNote(true, vit->mGroupId)));
|
||||
}
|
||||
|
||||
|
||||
std::list<RsGxsGroupId> grpDeleted;
|
||||
std::map<uint32_t, GrpNote>::iterator mit = toNotify.begin();
|
||||
for(; mit != toNotify.end(); ++mit)
|
||||
for(auto mit=toNotify.begin(); mit != toNotify.end(); ++mit)
|
||||
{
|
||||
GrpNote& note = mit->second;
|
||||
RsTokenService::GxsRequestStatus status =
|
||||
note.first ? RsTokenService::COMPLETE
|
||||
: RsTokenService::FAILED;
|
||||
|
||||
mGrpNotify.insert(std::make_pair(mit->first, note.second));
|
||||
mDataAccess->updatePublicRequestStatus(mit->first, status);
|
||||
mDataAccess->disposeOfPublicToken(mit->first);
|
||||
|
||||
if(note.first)
|
||||
grpDeleted.push_back(note.second);
|
||||
@ -2632,39 +2657,52 @@ void RsGenExchange::processGroupDelete()
|
||||
void RsGenExchange::processMessageDelete()
|
||||
{
|
||||
RS_STACK_MUTEX(mGenMtx) ;
|
||||
#ifdef TODO
|
||||
typedef std::pair<bool, RsGxsGroupId> GrpNote;
|
||||
std::map<uint32_t, GrpNote> toNotify;
|
||||
|
||||
struct MsgNote {
|
||||
MsgNote(bool s,const GxsMsgReq& mid) : state(s),msgIds(mid){}
|
||||
|
||||
bool state;
|
||||
GxsMsgReq msgIds;
|
||||
};
|
||||
std::map<uint32_t, MsgNote> toNotify;
|
||||
|
||||
for( auto vit = mMsgDeletePublish.begin(); vit != mMsgDeletePublish.end(); ++vit)
|
||||
{
|
||||
uint32_t token = (*vit).mToken;
|
||||
bool res = mDataStore->removeMsgs( (*vit).mMsgs );
|
||||
|
||||
#ifdef GEN_EXCH_DEBUG
|
||||
for(auto mit: (*vit).mMsgs)
|
||||
{
|
||||
std::cerr << "Attempt to delete messages: token=" << token << std::endl;
|
||||
for(const auto& msg:mit.second)
|
||||
std::cerr << " grpId=" << mit.first << ", msgId=" << msg << std::endl;
|
||||
std::cerr << " Result: " << res << std::endl;
|
||||
}
|
||||
#endif
|
||||
toNotify.insert(std::make_pair(token, MsgNote(res,(*vit).mMsgs)));
|
||||
}
|
||||
|
||||
for( std::vector<MsgDeletePublish>::iterator vit = mMsgDeletePublish.begin(); vit != mMsgDeletePublish.end(); ++vit)
|
||||
{
|
||||
#ifdef TODO
|
||||
uint32_t token = (*vit).mToken;
|
||||
const RsGxsGroupId& groupId = gdp.grpItem->meta.mGroupId;
|
||||
toNotify.insert(std::make_pair( token, GrpNote(true, groupId)));
|
||||
#endif
|
||||
mDataStore->removeMsgs( (*vit).mMsgs );
|
||||
}
|
||||
std::list<GxsMsgReq> msgDeleted;
|
||||
|
||||
// std::list<RsGxsGroupId> grpDeleted;
|
||||
// std::map<uint32_t, GrpNote>::iterator mit = toNotify.begin();
|
||||
// for(; mit != toNotify.end(); ++mit)
|
||||
// {
|
||||
// GrpNote& note = mit->second;
|
||||
// uint8_t status = note.first ? RsTokenService::GXS_REQUEST_V2_STATUS_COMPLETE
|
||||
// : RsTokenService::GXS_REQUEST_V2_STATUS_FAILED;
|
||||
//
|
||||
// mGrpNotify.insert(std::make_pair(mit->first, note.second));
|
||||
// mDataAccess->updatePublicRequestStatus(mit->first, status);
|
||||
//
|
||||
// if(note.first)
|
||||
// grpDeleted.push_back(note.second);
|
||||
// }
|
||||
for(auto mit = toNotify.begin(); mit != toNotify.end(); ++mit)
|
||||
{
|
||||
MsgNote& note = mit->second;
|
||||
RsTokenService::GxsRequestStatus status =
|
||||
note.state ? RsTokenService::COMPLETE
|
||||
: RsTokenService::FAILED;
|
||||
|
||||
for(uint32_t i=0;i<mMsgDeletePublish.size();++i)
|
||||
for(auto it(mMsgDeletePublish[i].mMsgs.begin());it!=mMsgDeletePublish[i].mMsgs.end();++it)
|
||||
mNotifications.push_back(new RsGxsGroupChange(RsGxsNotify::TYPE_MESSAGE_DELETED,it->first, false));
|
||||
mDataAccess->updatePublicRequestStatus(mit->first, status);
|
||||
mDataAccess->disposeOfPublicToken(mit->first);
|
||||
|
||||
if(note.state)
|
||||
msgDeleted.push_back(note.msgIds);
|
||||
}
|
||||
|
||||
for(const auto& msgreq:msgDeleted)
|
||||
for(const auto& msgit:msgreq)
|
||||
for(const auto& msg:msgit.second)
|
||||
mNotifications.push_back(new RsGxsMsgChange(RsGxsNotify::TYPE_MESSAGE_DELETED,msgit.first,msg, false));
|
||||
|
||||
mMsgDeletePublish.clear();
|
||||
}
|
||||
@ -2958,7 +2996,11 @@ void RsGenExchange::publishGrps()
|
||||
|
||||
uint32_t RsGenExchange::generatePublicToken()
|
||||
{
|
||||
return mDataAccess->generatePublicToken();
|
||||
uint32_t token = mDataAccess->generatePublicToken();
|
||||
#ifdef GEN_EXCH_DEBUG
|
||||
std::cerr << "New token generated: " << token << " in RsGenExchange::generatePublicToken()" << std::endl;
|
||||
#endif
|
||||
return token;
|
||||
}
|
||||
|
||||
bool RsGenExchange::updatePublicRequestStatus(
|
||||
|
@ -30,6 +30,49 @@
|
||||
* #define DATA_DEBUG 1
|
||||
**********/
|
||||
|
||||
// Debug system to allow to print only for some services (group, Peer, etc)
|
||||
|
||||
#if defined(DATA_DEBUG)
|
||||
static const uint32_t service_to_print = RS_SERVICE_GXS_TYPE_FORUMS;// use this to allow to this service id only, or 0 for all services
|
||||
// warning. Numbers should be SERVICE IDS (see serialiser/rsserviceids.h. E.g. 0x0215 for forums)
|
||||
|
||||
class nullstream: public std::ostream {};
|
||||
|
||||
static std::string nice_time_stamp(rstime_t now,rstime_t TS)
|
||||
{
|
||||
if(TS == 0)
|
||||
return "Never" ;
|
||||
else
|
||||
{
|
||||
std::ostringstream s;
|
||||
s << now - TS << " secs ago" ;
|
||||
return s.str() ;
|
||||
}
|
||||
}
|
||||
|
||||
static std::ostream& gxsdatadebug(uint32_t service_type)
|
||||
{
|
||||
static nullstream null ;
|
||||
|
||||
if (service_to_print==0 || service_type == 0 || (service_type == service_to_print))
|
||||
return std::cerr << time(NULL) << ":GXSDATASERVICE: " ;
|
||||
else
|
||||
return null ;
|
||||
}
|
||||
|
||||
#define GXSDATADEBUG gxsdatadebug(mDataStore->serviceType())
|
||||
|
||||
static const std::vector<std::string> tokenStatusString( {
|
||||
std::string("FAILED"),
|
||||
std::string("PENDING"),
|
||||
std::string("PARTIAL"),
|
||||
std::string("COMPLETE"),
|
||||
std::string("DONE"),
|
||||
std::string("CANCELLED"),
|
||||
});
|
||||
|
||||
#endif
|
||||
|
||||
bool operator<(const std::pair<uint32_t,GxsRequest*>& p1,const std::pair<uint32_t,GxsRequest*>& p2)
|
||||
{
|
||||
return p1.second->Options.mPriority <= p2.second->Options.mPriority ; // <= so that new elements with same priority are inserted before
|
||||
@ -48,7 +91,7 @@ bool RsGxsDataAccess::requestGroupInfo( uint32_t &token, uint32_t ansType, const
|
||||
{
|
||||
if(groupIds.empty())
|
||||
{
|
||||
RsErr() << __PRETTY_FUNCTION__ << " (WW) Group Id list is empty!" << std::endl;
|
||||
std::cerr << __PRETTY_FUNCTION__ << " (WW) Group Id list is empty!" << std::endl;
|
||||
return false;
|
||||
}
|
||||
|
||||
@ -89,7 +132,7 @@ bool RsGxsDataAccess::requestGroupInfo( uint32_t &token, uint32_t ansType, const
|
||||
generateToken(token);
|
||||
|
||||
#ifdef DATA_DEBUG
|
||||
RsErr() << "RsGxsDataAccess::requestGroupInfo() gets token: " << token << std::endl;
|
||||
GXSDATADEBUG << "RsGxsDataAccess::requestGroupInfo() gets token: " << token << std::endl;
|
||||
#endif
|
||||
|
||||
setReq(req, token, ansType, opts);
|
||||
@ -120,7 +163,7 @@ bool RsGxsDataAccess::requestGroupInfo(uint32_t &token, uint32_t ansType, const
|
||||
|
||||
generateToken(token);
|
||||
#ifdef DATA_DEBUG
|
||||
RsErr() << "RsGxsDataAccess::requestGroupInfo() gets Token: " << token << std::endl;
|
||||
GXSDATADEBUG << "RsGxsDataAccess::requestGroupInfo() gets Token: " << token << std::endl;
|
||||
#endif
|
||||
|
||||
setReq(req, token, ansType, opts);
|
||||
@ -187,7 +230,7 @@ bool RsGxsDataAccess::requestMsgInfo(uint32_t &token, uint32_t ansType, const Rs
|
||||
{
|
||||
generateToken(token);
|
||||
#ifdef DATA_DEBUG
|
||||
RsErr() << "RsGxsDataAccess::requestMsgInfo() gets Token: " << token << std::endl;
|
||||
GXSDATADEBUG << "RsGxsDataAccess::requestMsgInfo() gets Token: " << token << std::endl;
|
||||
#endif
|
||||
}
|
||||
|
||||
@ -239,7 +282,7 @@ bool RsGxsDataAccess::requestMsgInfo(uint32_t &token, uint32_t ansType, const Rs
|
||||
{
|
||||
generateToken(token);
|
||||
#ifdef DATA_DEBUG
|
||||
RsErr() << __PRETTY_FUNCTION__ << " gets Token: " << token << std::endl;
|
||||
GXSDATADEBUG << __PRETTY_FUNCTION__ << " gets Token: " << token << std::endl;
|
||||
#endif
|
||||
}
|
||||
|
||||
@ -314,11 +357,11 @@ void RsGxsDataAccess::storeRequest(GxsRequest* req)
|
||||
mPublicToken[req->token] = PENDING;
|
||||
|
||||
#ifdef DATA_DEBUG
|
||||
RsErr() << "Stored request token=" << req->token << " priority = " << static_cast<int>(req->Options.mPriority) << " Current request Queue is:" ;
|
||||
GXSDATADEBUG << "Stored request token=" << req->token << " priority = " << static_cast<int>(req->Options.mPriority) << " Current request Queue is:" ;
|
||||
for(auto it(mRequestQueue.begin());it!=mRequestQueue.end();++it)
|
||||
RsErr() << it->first << " (p=" << static_cast<int>(req->Options.mPriority) << ") ";
|
||||
std::cerr << std::endl;
|
||||
RsErr() << "Completed requests waiting for client: " << mCompletedRequests.size() << std::endl;
|
||||
GXSDATADEBUG << it->first << " (p=" << static_cast<int>(req->Options.mPriority) << ") ";
|
||||
GXSDATADEBUG << std::endl;
|
||||
GXSDATADEBUG << "PublicToken size: " << mPublicToken.size() << " Completed requests waiting for client: " << mCompletedRequests.size() << std::endl;
|
||||
#endif
|
||||
}
|
||||
|
||||
@ -370,6 +413,9 @@ bool RsGxsDataAccess::locked_clearRequest(const uint32_t& token)
|
||||
if(it2 != mPublicToken.end())
|
||||
mPublicToken.erase(it2);
|
||||
|
||||
#ifdef DATA_DEBUG
|
||||
GXSDATADEBUG << "Service " << std::hex << mDataStore->serviceType() << std::dec << ": Removing public token " << token << ". Completed tokens: " << mCompletedRequests.size() << " Size of mPublicToken: " << mPublicToken.size() << std::endl;
|
||||
#endif
|
||||
return true;
|
||||
}
|
||||
|
||||
@ -709,7 +755,10 @@ void RsGxsDataAccess::processRequests()
|
||||
|
||||
while (!mRequestQueue.empty())
|
||||
{
|
||||
// Extract the first elements from the request queue. cleanup all other elements marked at terminated.
|
||||
#ifdef DATA_DEBUG
|
||||
dumpTokenQueues();
|
||||
#endif
|
||||
// Extract the first elements from the request queue. cleanup all other elements marked at terminated.
|
||||
|
||||
GxsRequest* req = nullptr;
|
||||
{
|
||||
@ -735,7 +784,7 @@ void RsGxsDataAccess::processRequests()
|
||||
case FAILED:
|
||||
case CANCELLED:
|
||||
#ifdef DATA_DEBUG
|
||||
RsDbg() << " request " << mRequestQueue.begin()->second->token << ": status = " << mRequestQueue.begin()->second->status << ": removing from the RequestQueue" << std::endl;
|
||||
GXSDATADEBUG << " Service " << std::hex << mDataStore->serviceType() << std::dec << ": request " << mRequestQueue.begin()->second->token << ": status = " << mRequestQueue.begin()->second->status << ": removing from the RequestQueue" << std::endl;
|
||||
#endif
|
||||
delete mRequestQueue.begin()->second;
|
||||
mRequestQueue.erase(mRequestQueue.begin());
|
||||
@ -767,7 +816,7 @@ void RsGxsDataAccess::processRequests()
|
||||
ServiceStatisticRequest* ssr;
|
||||
|
||||
#ifdef DATA_DEBUG
|
||||
RsDbg() << "Processing request: " << req->token << " Status: " << req->status << " ReqType: " << req->reqType << " Age: " << time(nullptr) - req->reqTime << std::endl;
|
||||
GXSDATADEBUG << "Service " << std::hex << mDataStore->serviceType() << std::dec << ": Processing request: " << req->token << " Status: " << req->status << " ReqType: " << req->reqType << " Age: " << time(nullptr) - req->reqTime << std::endl;
|
||||
#endif
|
||||
|
||||
/* PROCESS REQUEST! */
|
||||
@ -826,7 +875,7 @@ void RsGxsDataAccess::processRequests()
|
||||
// When the request is complete, we move it to the complete list, so that the caller can easily retrieve the request data
|
||||
|
||||
#ifdef DATA_DEBUG
|
||||
RsDbg() << " Request completed successfully. Marking as COMPLETE." << std::endl;
|
||||
GXSDATADEBUG << " Service " << std::hex << mDataStore->serviceType() << std::dec << ": Request completed successfully. Marking as COMPLETE." << std::endl;
|
||||
#endif
|
||||
req->status = COMPLETE ;
|
||||
mCompletedRequests[req->token] = req;
|
||||
@ -837,7 +886,7 @@ void RsGxsDataAccess::processRequests()
|
||||
mPublicToken[req->token] = FAILED;
|
||||
delete req;//req belongs to no one now
|
||||
#ifdef DATA_DEBUG
|
||||
RsDbg() << " Request failed. Marking as FAILED." << std::endl;
|
||||
GXSDATADEBUG << " Service " << std::hex << mDataStore->serviceType() << std::dec << ": Request failed. Marking as FAILED." << std::endl;
|
||||
#endif
|
||||
}
|
||||
} // END OF MUTEX.
|
||||
@ -990,7 +1039,7 @@ bool RsGxsDataAccess::getMsgMetaDataList( const GxsMsgReq& msgIds, const RsTokRe
|
||||
*
|
||||
*/
|
||||
#ifdef DATA_DEBUG
|
||||
RsDbg() << "RsGxsDataAccess::getMsgList()" << std::endl;
|
||||
GXSDATADEBUG << "Service " << std::hex << mDataStore->serviceType() << std::dec << ": RsGxsDataAccess::getMsgList()" << std::endl;
|
||||
#endif
|
||||
|
||||
bool onlyOrigMsgs = false;
|
||||
@ -1001,14 +1050,14 @@ bool RsGxsDataAccess::getMsgMetaDataList( const GxsMsgReq& msgIds, const RsTokRe
|
||||
if (opts.mOptions & RS_TOKREQOPT_MSG_ORIGMSG)
|
||||
{
|
||||
#ifdef DATA_DEBUG
|
||||
RsDbg() << "RsGxsDataAccess::getMsgList() MSG_ORIGMSG" << std::endl;
|
||||
GXSDATADEBUG << "Service " << std::hex << mDataStore->serviceType() << std::dec << ": RsGxsDataAccess::getMsgList() MSG_ORIGMSG" << std::endl;
|
||||
#endif
|
||||
onlyOrigMsgs = true;
|
||||
}
|
||||
else if (opts.mOptions & RS_TOKREQOPT_MSG_LATEST)
|
||||
{
|
||||
#ifdef DATA_DEBUG
|
||||
RsDbg() << "RsGxsDataAccess::getMsgList() MSG_LATEST" << std::endl;
|
||||
GXSDATADEBUG << "Service " << std::hex << mDataStore->serviceType() << std::dec << ": RsGxsDataAccess::getMsgList() MSG_LATEST" << std::endl;
|
||||
#endif
|
||||
onlyLatestMsgs = true;
|
||||
}
|
||||
@ -1016,7 +1065,7 @@ bool RsGxsDataAccess::getMsgMetaDataList( const GxsMsgReq& msgIds, const RsTokRe
|
||||
if (opts.mOptions & RS_TOKREQOPT_MSG_THREAD)
|
||||
{
|
||||
#ifdef DATA_DEBUG
|
||||
RsDbg() << "RsGxsDataAccess::getMsgList() MSG_THREAD" << std::endl;
|
||||
GXSDATADEBUG << "Service " << std::hex << mDataStore->serviceType() << std::dec << ": RsGxsDataAccess::getMsgList() MSG_THREAD" << std::endl;
|
||||
#endif
|
||||
onlyThreadHeadMsgs = true;
|
||||
}
|
||||
@ -1199,7 +1248,7 @@ bool RsGxsDataAccess::getMsgRelatedInfo(MsgRelatedInfoReq *req)
|
||||
* 1) No Flags => return nothing
|
||||
*/
|
||||
#ifdef DATA_DEBUG
|
||||
RsDbg() << "RsGxsDataAccess::getMsgRelatedList()" << std::endl;
|
||||
GXSDATADEBUG << "RsGxsDataAccess::getMsgRelatedList()" << std::endl;
|
||||
#endif
|
||||
|
||||
const RsTokReqOptions& opts = req->Options;
|
||||
@ -1212,14 +1261,14 @@ bool RsGxsDataAccess::getMsgRelatedInfo(MsgRelatedInfoReq *req)
|
||||
if (opts.mOptions & RS_TOKREQOPT_MSG_LATEST)
|
||||
{
|
||||
#ifdef DATA_DEBUG
|
||||
RsDbg() << "RsGxsDataAccess::getMsgRelatedList() MSG_LATEST" << std::endl;
|
||||
GXSDATADEBUG << "Service " << std::hex << mDataStore->serviceType() << std::dec << ": RsGxsDataAccess::getMsgRelatedList() MSG_LATEST" << std::endl;
|
||||
#endif
|
||||
onlyLatestMsgs = true;
|
||||
}
|
||||
else if (opts.mOptions & RS_TOKREQOPT_MSG_VERSIONS)
|
||||
{
|
||||
#ifdef DATA_DEBUG
|
||||
RsDbg() << "RsGxsDataAccess::getMsgRelatedList() MSG_VERSIONS" << std::endl;
|
||||
GXSDATADEBUG << "Service " << std::hex << mDataStore->serviceType() << std::dec << ": RsGxsDataAccess::getMsgRelatedList() MSG_VERSIONS" << std::endl;
|
||||
#endif
|
||||
onlyAllVersions = true;
|
||||
}
|
||||
@ -1227,7 +1276,7 @@ bool RsGxsDataAccess::getMsgRelatedInfo(MsgRelatedInfoReq *req)
|
||||
if (opts.mOptions & RS_TOKREQOPT_MSG_PARENT)
|
||||
{
|
||||
#ifdef DATA_DEBUG
|
||||
RsDbg() << "RsGxsDataAccess::getMsgRelatedList() MSG_PARENTS" << std::endl;
|
||||
GXSDATADEBUG << "Service " << std::hex << mDataStore->serviceType() << std::dec << ": RsGxsDataAccess::getMsgRelatedList() MSG_PARENTS" << std::endl;
|
||||
#endif
|
||||
onlyChildMsgs = true;
|
||||
}
|
||||
@ -1235,7 +1284,7 @@ bool RsGxsDataAccess::getMsgRelatedInfo(MsgRelatedInfoReq *req)
|
||||
if (opts.mOptions & RS_TOKREQOPT_MSG_THREAD)
|
||||
{
|
||||
#ifdef DATA_DEBUG
|
||||
RsDbg() << "RsGxsDataAccess::getMsgRelatedList() MSG_THREAD" << std::endl;
|
||||
GXSDATADEBUG << "Service " << std::hex << mDataStore->serviceType() << std::dec << ": RsGxsDataAccess::getMsgRelatedList() MSG_THREAD" << std::endl;
|
||||
#endif
|
||||
onlyThreadMsgs = true;
|
||||
}
|
||||
@ -1350,7 +1399,7 @@ bool RsGxsDataAccess::getMsgRelatedInfo(MsgRelatedInfoReq *req)
|
||||
if (oit == origMsgTs.end())
|
||||
{
|
||||
#ifdef DATA_DEBUG
|
||||
RsDbg() << "RsGxsDataAccess::getMsgRelatedList() Found New OrigMsgId: "
|
||||
GXSDATADEBUG << "RsGxsDataAccess::getMsgRelatedList() Found New OrigMsgId: "
|
||||
<< meta->mOrigMsgId
|
||||
<< " MsgId: " << meta->mMsgId
|
||||
<< " TS: " << meta->mPublishTs
|
||||
@ -1363,7 +1412,7 @@ bool RsGxsDataAccess::getMsgRelatedInfo(MsgRelatedInfoReq *req)
|
||||
else if (oit->second.second < meta->mPublishTs)
|
||||
{
|
||||
#ifdef DATA_DEBUG
|
||||
RsDbg() << "RsGxsDataAccess::getMsgRelatedList() Found Later Msg. OrigMsgId: "
|
||||
GXSDATADEBUG << "Service " << std::hex << mDataStore->serviceType() << std::dec << ": RsGxsDataAccess::getMsgRelatedList() Found Later Msg. OrigMsgId: "
|
||||
<< meta->mOrigMsgId
|
||||
<< " MsgId: " << meta->mMsgId
|
||||
<< " TS: " << meta->mPublishTs
|
||||
@ -1584,7 +1633,7 @@ void RsGxsDataAccess::filterMsgIdList( GxsMsgIdResult& resultsMap, const RsTokRe
|
||||
MsgMetaFilter::const_iterator cit = msgMetas.find(groupId);
|
||||
if(cit == msgMetas.end()) continue;
|
||||
#ifdef DATA_DEBUG
|
||||
RsDbg() << __PRETTY_FUNCTION__ << " " << msgsIdSet.size()
|
||||
GXSDATADEBUG << __PRETTY_FUNCTION__ << " " << msgsIdSet.size()
|
||||
<< " for group: " << groupId << " before filtering"
|
||||
<< std::endl;
|
||||
#endif
|
||||
@ -1607,7 +1656,7 @@ void RsGxsDataAccess::filterMsgIdList( GxsMsgIdResult& resultsMap, const RsTokRe
|
||||
}
|
||||
|
||||
#ifdef DATA_DEBUG
|
||||
RsDbg() << __PRETTY_FUNCTION__ << " " << msgsIdSet.size()
|
||||
GXSDATADEBUG << __PRETTY_FUNCTION__ << " " << msgsIdSet.size()
|
||||
<< " for group: " << groupId << " after filtering"
|
||||
<< std::endl;
|
||||
#endif
|
||||
@ -1640,7 +1689,7 @@ bool RsGxsDataAccess::checkRequestStatus( uint32_t token, GxsRequestStatus& stat
|
||||
GxsRequest* req = locked_retrieveCompletedRequest(token);
|
||||
|
||||
#ifdef DATA_DEBUG
|
||||
RsDbg() << "CheckRequestStatus: token=" << token ;
|
||||
GXSDATADEBUG << "CheckRequestStatus: token=" << token << std::endl ;
|
||||
#endif
|
||||
|
||||
if(req != nullptr)
|
||||
@ -1650,7 +1699,7 @@ bool RsGxsDataAccess::checkRequestStatus( uint32_t token, GxsRequestStatus& stat
|
||||
status = COMPLETE;
|
||||
ts = req->reqTime;
|
||||
#ifdef DATA_DEBUG
|
||||
RsDbg() << __PRETTY_FUNCTION__ << " Returning status = COMPLETE" << std::endl;
|
||||
GXSDATADEBUG << " Returning status = COMPLETE" << std::endl;
|
||||
#endif
|
||||
return true;
|
||||
}
|
||||
@ -1661,14 +1710,14 @@ bool RsGxsDataAccess::checkRequestStatus( uint32_t token, GxsRequestStatus& stat
|
||||
{
|
||||
status = it->second;
|
||||
#ifdef DATA_DEBUG
|
||||
RsDbg() << __PRETTY_FUNCTION__ << " Returning status = " << status << std::endl;
|
||||
GXSDATADEBUG << " Returning status = " << status << std::endl;
|
||||
#endif
|
||||
return true;
|
||||
}
|
||||
|
||||
status = FAILED;
|
||||
#ifdef DATA_DEBUG
|
||||
RsDbg() << " Token not found. Returning FAILED" << std::endl;
|
||||
GXSDATADEBUG << " Token not found. Returning FAILED" << std::endl;
|
||||
#endif
|
||||
return false;
|
||||
}
|
||||
@ -1749,7 +1798,12 @@ uint32_t RsGxsDataAccess::generatePublicToken()
|
||||
{
|
||||
RS_STACK_MUTEX(mDataMutex);
|
||||
mPublicToken[token] = PENDING ;
|
||||
}
|
||||
#ifdef DATA_DEBUG
|
||||
GXSDATADEBUG << "Service " << std::hex << mDataStore->serviceType() << std::dec << ": Adding new public token " << token << " in PENDING state. Completed tokens: " << mCompletedRequests.size() << " Size of mPublicToken: " << mPublicToken.size() << std::endl;
|
||||
if(mDataStore->serviceType() == 0x218 && token==19)
|
||||
print_stacktrace();
|
||||
#endif
|
||||
}
|
||||
|
||||
return token;
|
||||
}
|
||||
@ -1765,7 +1819,10 @@ bool RsGxsDataAccess::updatePublicRequestStatus( uint32_t token, RsTokenService:
|
||||
if(mit != mPublicToken.end())
|
||||
{
|
||||
mit->second = status;
|
||||
return true;
|
||||
#ifdef DATA_DEBUG
|
||||
GXSDATADEBUG << "Service " << std::hex << mDataStore->serviceType() << std::dec << ": updating public token " << token << " to state " << tokenStatusString[status] << std::endl;
|
||||
#endif
|
||||
return true;
|
||||
}
|
||||
else
|
||||
return false;
|
||||
@ -1780,12 +1837,35 @@ bool RsGxsDataAccess::disposeOfPublicToken(uint32_t token)
|
||||
if(mit != mPublicToken.end())
|
||||
{
|
||||
mPublicToken.erase(mit);
|
||||
return true;
|
||||
#ifdef DATA_DEBUG
|
||||
GXSDATADEBUG << "Service " << std::hex << mDataStore->serviceType() << std::dec << ": Deleting public token " << token << ". Completed tokens: " << mCompletedRequests.size() << " Size of mPublicToken: " << mPublicToken.size() << std::endl;
|
||||
#endif
|
||||
return true;
|
||||
}
|
||||
else
|
||||
return false;
|
||||
}
|
||||
|
||||
#ifdef DATA_DEBUG
|
||||
void RsGxsDataAccess::dumpTokenQueues()
|
||||
{
|
||||
RS_STACK_MUTEX(mDataMutex);
|
||||
|
||||
GXSDATADEBUG << "Service " << std::hex << mDataStore->serviceType() << std::dec << ": dumping token list."<< std::endl;
|
||||
|
||||
for(auto tokenpair:mPublicToken)
|
||||
GXSDATADEBUG << " Public Token " << tokenpair.first << " : " << tokenStatusString[tokenpair.second] << std::endl;
|
||||
|
||||
for(auto tokenpair:mCompletedRequests)
|
||||
GXSDATADEBUG << " Completed Tokens: " << tokenpair.first << std::endl;
|
||||
|
||||
for(auto tokenpair:mRequestQueue)
|
||||
GXSDATADEBUG << " RequestQueue: " << tokenpair.first << " status " << tokenStatusString[tokenpair.second->status] << std::endl;
|
||||
|
||||
GXSDATADEBUG << std::endl;
|
||||
}
|
||||
#endif
|
||||
|
||||
bool RsGxsDataAccess::checkGrpFilter(const RsTokReqOptions &opts, const std::shared_ptr<RsGxsGrpMetaData>& meta) const
|
||||
{
|
||||
|
||||
@ -1815,7 +1895,7 @@ bool RsGxsDataAccess::checkMsgFilter(const RsTokReqOptions& opts, const std::sha
|
||||
(opts.mStatusMask & meta->mMsgStatus) )
|
||||
{
|
||||
#ifdef DATA_DEBUG
|
||||
RsDbg() << __PRETTY_FUNCTION__
|
||||
GXSDATADEBUG << __PRETTY_FUNCTION__
|
||||
<< " Continue checking Msg as StatusMatches: "
|
||||
<< " Mask: " << opts.mStatusMask
|
||||
<< " StatusFilter: " << opts.mStatusFilter
|
||||
@ -1826,7 +1906,7 @@ bool RsGxsDataAccess::checkMsgFilter(const RsTokReqOptions& opts, const std::sha
|
||||
else
|
||||
{
|
||||
#ifdef DATA_DEBUG
|
||||
RsDbg() << __PRETTY_FUNCTION__
|
||||
GXSDATADEBUG << __PRETTY_FUNCTION__
|
||||
<< " Dropping Msg due to !StatusMatch "
|
||||
<< " Mask: " << opts.mStatusMask
|
||||
<< " StatusFilter: " << opts.mStatusFilter
|
||||
@ -1840,7 +1920,7 @@ bool RsGxsDataAccess::checkMsgFilter(const RsTokReqOptions& opts, const std::sha
|
||||
else
|
||||
{
|
||||
#ifdef DATA_DEBUG
|
||||
RsDbg() << __PRETTY_FUNCTION__
|
||||
GXSDATADEBUG << __PRETTY_FUNCTION__
|
||||
<< " Status check not requested"
|
||||
<< " mStatusMask: " << opts.mStatusMask
|
||||
<< " MsgId: " << meta->mMsgId << std::endl;
|
||||
@ -1854,7 +1934,7 @@ bool RsGxsDataAccess::checkMsgFilter(const RsTokReqOptions& opts, const std::sha
|
||||
(opts.mMsgFlagMask & meta->mMsgFlags) )
|
||||
{
|
||||
#ifdef DATA_DEBUG
|
||||
RsDbg() << __PRETTY_FUNCTION__
|
||||
GXSDATADEBUG << __PRETTY_FUNCTION__
|
||||
<< " Accepting Msg as FlagMatches: "
|
||||
<< " Mask: " << opts.mMsgFlagMask
|
||||
<< " FlagFilter: " << opts.mMsgFlagFilter
|
||||
@ -1865,7 +1945,7 @@ bool RsGxsDataAccess::checkMsgFilter(const RsTokReqOptions& opts, const std::sha
|
||||
else
|
||||
{
|
||||
#ifdef DATA_DEBUG
|
||||
RsDbg() << __PRETTY_FUNCTION__
|
||||
GXSDATADEBUG << __PRETTY_FUNCTION__
|
||||
<< " Dropping Msg due to !FlagMatch "
|
||||
<< " Mask: " << opts.mMsgFlagMask
|
||||
<< " FlagFilter: " << opts.mMsgFlagFilter
|
||||
@ -1879,7 +1959,7 @@ bool RsGxsDataAccess::checkMsgFilter(const RsTokReqOptions& opts, const std::sha
|
||||
else
|
||||
{
|
||||
#ifdef DATA_DEBUG
|
||||
RsDbg() << __PRETTY_FUNCTION__
|
||||
GXSDATADEBUG << __PRETTY_FUNCTION__
|
||||
<< " Flags check not requested"
|
||||
<< " mMsgFlagMask: " << opts.mMsgFlagMask
|
||||
<< " MsgId: " << meta->mMsgId << std::endl;
|
||||
|
@ -499,6 +499,7 @@ private:
|
||||
*/
|
||||
bool getMsgIdList(const GxsMsgReq& msgIds, const RsTokReqOptions& opts, GxsMsgReq& msgIdsOut);
|
||||
|
||||
void dumpTokenQueues();
|
||||
private:
|
||||
bool locked_clearRequest(const uint32_t &token);
|
||||
|
||||
|
@ -1777,12 +1777,13 @@ bool p3GxsChannels::createComment(RsGxsComment& comment) // deprecated
|
||||
return true;
|
||||
}
|
||||
|
||||
bool p3GxsChannels::subscribeToChannel(
|
||||
const RsGxsGroupId& groupId, bool subscribe )
|
||||
bool p3GxsChannels::subscribeToChannel( const RsGxsGroupId& groupId, bool subscribe )
|
||||
{
|
||||
uint32_t token;
|
||||
if( !subscribeToGroup(token, groupId, subscribe)
|
||||
|| waitToken(token) != RsTokenService::COMPLETE ) return false;
|
||||
if( !subscribeToGroup(token, groupId, subscribe) || waitToken(token) != RsTokenService::COMPLETE ) return false;
|
||||
|
||||
RsGxsGroupId grpId;
|
||||
acknowledgeGrp(token,grpId);
|
||||
return true;
|
||||
}
|
||||
|
||||
@ -1791,6 +1792,10 @@ bool p3GxsChannels::markRead(const RsGxsGrpMsgIdPair& msgId, bool read)
|
||||
uint32_t token;
|
||||
setMessageReadStatus(token, msgId, read);
|
||||
if(waitToken(token) != RsTokenService::COMPLETE ) return false;
|
||||
|
||||
RsGxsGrpMsgIdPair p;
|
||||
acknowledgeMsg(token,p);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -601,7 +601,7 @@ void p3GxsCircles::notifyChanges(std::vector<RsGxsNotify *> &changes)
|
||||
#endif
|
||||
RsGxsCircleId circle_id(msgChange->mGroupId);
|
||||
|
||||
if(rsEvents && (c->getType() == RsGxsNotify::TYPE_RECEIVED_NEW))
|
||||
if(rsEvents && ((c->getType() == RsGxsNotify::TYPE_RECEIVED_NEW) || (c->getType() == RsGxsNotify::TYPE_PUBLISHED)))
|
||||
{
|
||||
const RsGxsCircleSubscriptionRequestItem *item = dynamic_cast<const RsGxsCircleSubscriptionRequestItem *>(msgChange->mNewMsgItem);
|
||||
|
||||
@ -807,7 +807,7 @@ bool p3GxsCircles::getCircleDetails(const RsGxsCircleId& id, RsGxsCircleDetails&
|
||||
details.mRestrictedCircleId = data.mRestrictedCircleId;
|
||||
|
||||
details.mAllowedNodes = data.mAllowedNodes;
|
||||
details.mSubscriptionFlags.clear();
|
||||
details.mSubscriptionFlags.clear();
|
||||
details.mAllowedGxsIds.clear();
|
||||
details.mAmIAllowed = false ;
|
||||
details.mAmIAdmin = bool(data.mGroupSubscribeFlags & GXS_SERV::GROUP_SUBSCRIBE_ADMIN);
|
||||
@ -1501,6 +1501,70 @@ bool p3GxsCircles::checkCircleCache()
|
||||
return true ;
|
||||
}
|
||||
|
||||
bool p3GxsCircles::locked_setGroupUnprocessedStatus(RsGxsCircleCache& cache,bool unprocessed)
|
||||
{
|
||||
uint32_t token2;
|
||||
|
||||
if(unprocessed)
|
||||
cache.mGroupStatus |= GXS_SERV::GXS_GRP_STATUS_UNPROCESSED;
|
||||
else
|
||||
cache.mGroupStatus &= ~GXS_SERV::GXS_GRP_STATUS_UNPROCESSED;
|
||||
|
||||
RsGenExchange::setGroupStatusFlags(token2, RsGxsGroupId(cache.mCircleId), unprocessed, GXS_SERV::GXS_GRP_STATUS_UNPROCESSED);
|
||||
|
||||
// Now we need to async acknowledge the token when the job is finished. We cannot do this sync because it's the
|
||||
// current thread that takes care of calling the handling of group processing.
|
||||
|
||||
RsThread::async([token2,this]()
|
||||
{
|
||||
std::chrono::milliseconds maxWait = std::chrono::milliseconds(10000);
|
||||
std::chrono::milliseconds checkEvery = std::chrono::milliseconds(100);
|
||||
|
||||
auto timeout = std::chrono::steady_clock::now() + maxWait; // wait for 10 secs at most
|
||||
auto st = requestStatus(token2);
|
||||
|
||||
while( !(st == RsTokenService::FAILED || st >= RsTokenService::COMPLETE) && std::chrono::steady_clock::now() < timeout )
|
||||
{
|
||||
std::this_thread::sleep_for(checkEvery);
|
||||
st = requestStatus(token2);
|
||||
}
|
||||
|
||||
RsGxsGroupId grpId;
|
||||
acknowledgeGrp(token2,grpId);
|
||||
});
|
||||
return true;
|
||||
}
|
||||
|
||||
bool p3GxsCircles::locked_subscribeToCircle(const RsGxsCircleId &grpId, bool subscribe)
|
||||
{
|
||||
uint32_t token;
|
||||
if(!RsGenExchange::subscribeToGroup(token, RsGxsGroupId(grpId), subscribe))
|
||||
return false;
|
||||
|
||||
// Now we need to async acknowledge the token when the job is finished. We cannot do this sync because it's the
|
||||
// current thread that takes care of calling the handling of group processing.
|
||||
|
||||
RsThread::async([token,this]()
|
||||
{
|
||||
std::chrono::milliseconds maxWait = std::chrono::milliseconds(10000);
|
||||
std::chrono::milliseconds checkEvery = std::chrono::milliseconds(100);
|
||||
|
||||
auto timeout = std::chrono::steady_clock::now() + maxWait; // wait for 10 secs at most
|
||||
auto st = requestStatus(token);
|
||||
|
||||
while( !(st == RsTokenService::FAILED || st >= RsTokenService::COMPLETE) && std::chrono::steady_clock::now() < timeout )
|
||||
{
|
||||
std::this_thread::sleep_for(checkEvery);
|
||||
st = requestStatus(token);
|
||||
}
|
||||
|
||||
RsGxsGroupId grpId;
|
||||
acknowledgeGrp(token,grpId);
|
||||
});
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool p3GxsCircles::locked_checkCircleCacheForMembershipUpdate(RsGxsCircleCache& cache)
|
||||
{
|
||||
rstime_t now = time(NULL) ;
|
||||
@ -1513,20 +1577,19 @@ bool p3GxsCircles::locked_checkCircleCacheForMembershipUpdate(RsGxsCircleCache&
|
||||
#ifdef DEBUG_CIRCLES
|
||||
std::cerr << "Cache entry for circle " << cache.mCircleId << " needs a swab over membership requests. Re-scheduling it." << std::endl;
|
||||
#endif
|
||||
cache.mGroupStatus |= GXS_SERV::GXS_GRP_STATUS_UNPROCESSED; // forces processing of cache entry
|
||||
uint32_t token;
|
||||
RsGenExchange::setGroupStatusFlags(token, RsGxsGroupId(cache.mCircleId.toStdString()), 0, GXS_SERV::GXS_GRP_STATUS_UNPROCESSED);
|
||||
locked_setGroupUnprocessedStatus(cache,true); // forces the re-check of the group
|
||||
|
||||
// this should be called regularly
|
||||
// this should be called regularly
|
||||
|
||||
RsTokReqOptions opts;
|
||||
opts.mReqType = GXS_REQUEST_TYPE_MSG_DATA;
|
||||
std::list<RsGxsGroupId> grpIds ;
|
||||
uint32_t token2;
|
||||
|
||||
grpIds.push_back(RsGxsGroupId(cache.mCircleId)) ;
|
||||
|
||||
RsGenExchange::getTokenService()->requestMsgInfo(token, RS_TOKREQ_ANSTYPE_SUMMARY, opts, grpIds);
|
||||
GxsTokenQueue::queueRequest(token, CIRCLEREQ_MESSAGE_DATA);
|
||||
RsGenExchange::getTokenService()->requestMsgInfo(token2, RS_TOKREQ_ANSTYPE_SUMMARY, opts, grpIds);
|
||||
GxsTokenQueue::queueRequest(token2, CIRCLEREQ_MESSAGE_DATA);
|
||||
}
|
||||
return true ;
|
||||
}
|
||||
@ -1599,8 +1662,7 @@ bool p3GxsCircles::locked_checkCircleCacheForAutoSubscribe(RsGxsCircleCache& cac
|
||||
/* we are part of this group - subscribe, clear unprocessed flag */
|
||||
std::cerr << " either admin or have posted a subscribe/unsubscribe message => AutoSubscribing!" << std::endl;
|
||||
#endif
|
||||
uint32_t token;
|
||||
RsGenExchange::subscribeToGroup(token, RsGxsGroupId(cache.mCircleId), true);
|
||||
locked_subscribeToCircle(cache.mCircleId,true);
|
||||
mShouldSendCacheUpdateNotification = true;
|
||||
}
|
||||
#ifdef DEBUG_CIRCLES
|
||||
@ -1617,8 +1679,7 @@ bool p3GxsCircles::locked_checkCircleCacheForAutoSubscribe(RsGxsCircleCache& cac
|
||||
/* we know all the peers - we are not part - we can flag as PROCESSED. */
|
||||
if(cache.mGroupSubscribeFlags & GXS_SERV::GROUP_SUBSCRIBE_SUBSCRIBED)
|
||||
{
|
||||
uint32_t token;
|
||||
RsGenExchange::subscribeToGroup(token, RsGxsGroupId(cache.mCircleId), false);
|
||||
locked_subscribeToCircle(cache.mCircleId,false);
|
||||
mShouldSendCacheUpdateNotification = true;
|
||||
#ifdef DEBUG_CIRCLES
|
||||
std::cerr << " Neither admin nor subscription msg author! Let's unsubscribe this circle of unfriendly Napoleons!" << std::endl;
|
||||
@ -1634,9 +1695,7 @@ bool p3GxsCircles::locked_checkCircleCacheForAutoSubscribe(RsGxsCircleCache& cac
|
||||
#ifdef DEBUG_CIRCLES
|
||||
std::cerr << " Marking the cache entry as processed." << std::endl;
|
||||
#endif
|
||||
uint32_t token2;
|
||||
cache.mGroupStatus &= ~GXS_SERV::GXS_GRP_STATUS_UNPROCESSED;
|
||||
RsGenExchange::setGroupStatusFlags(token2, RsGxsGroupId(cache.mCircleId), 0, GXS_SERV::GXS_GRP_STATUS_UNPROCESSED);
|
||||
locked_setGroupUnprocessedStatus(cache,false);
|
||||
|
||||
return true;
|
||||
}
|
||||
@ -1830,9 +1889,7 @@ void p3GxsCircles::handle_event(uint32_t event_type, const std::string &elabel)
|
||||
// | | Grp Subscribed: NO | Grp Subscribed: NO |
|
||||
// +-------------+------------------------------+-----------------------------+
|
||||
|
||||
bool p3GxsCircles::pushCircleMembershipRequest(
|
||||
const RsGxsId& own_gxsid, const RsGxsCircleId& circle_id,
|
||||
RsGxsCircleSubscriptionType request_type )
|
||||
bool p3GxsCircles::pushCircleMembershipRequest( const RsGxsId& own_gxsid, const RsGxsCircleId& circle_id, RsGxsCircleSubscriptionType request_type )
|
||||
{
|
||||
Dbg3() << __PRETTY_FUNCTION__ << "own_gxsid = " << own_gxsid
|
||||
<< ", circle=" << circle_id << ", req type=" << request_type
|
||||
@ -1859,10 +1916,7 @@ bool p3GxsCircles::pushCircleMembershipRequest(
|
||||
// If the circle is not subscribed, then subscribe, whatever the subscription type. Indeed, if we publish a msg, even a msg for
|
||||
// unsubscribing, we need to have a subscribed group first.
|
||||
|
||||
uint32_t token ;
|
||||
RsGenExchange::subscribeToGroup(token, RsGxsGroupId(circle_id), true);
|
||||
|
||||
if(waitToken(token) != RsTokenService::COMPLETE)
|
||||
if(!locked_subscribeToCircle(circle_id,true))
|
||||
{
|
||||
std::cerr << __PRETTY_FUNCTION__ << " Could not subscribe to Circle group." << std::endl;
|
||||
return false;
|
||||
@ -1897,7 +1951,29 @@ bool p3GxsCircles::pushCircleMembershipRequest(
|
||||
std::cerr << " ThreadId : " << s->meta.mThreadId << std::endl;
|
||||
#endif
|
||||
|
||||
uint32_t token;
|
||||
RsGenExchange::publishMsg(token, s);
|
||||
|
||||
// This is manual handling of token. We need to clear it up from the notification when done, and that needs
|
||||
// to be async-ed, since the processing of message publication is done in the same thread.
|
||||
|
||||
RsThread::async( [this,token]()
|
||||
{
|
||||
std::chrono::milliseconds maxWait = std::chrono::milliseconds(10000);
|
||||
std::chrono::milliseconds checkEvery = std::chrono::milliseconds(100);
|
||||
|
||||
auto timeout = std::chrono::steady_clock::now() + maxWait; // wait for 10 secs at most
|
||||
auto st = requestStatus(token);
|
||||
|
||||
while( !(st == RsTokenService::FAILED || st >= RsTokenService::COMPLETE) && std::chrono::steady_clock::now() < timeout )
|
||||
{
|
||||
std::this_thread::sleep_for(checkEvery);
|
||||
st = requestStatus(token);
|
||||
}
|
||||
|
||||
std::pair<RsGxsGroupId,RsGxsMessageId> grpmsgId;
|
||||
acknowledgeMsg(token,grpmsgId);
|
||||
});
|
||||
|
||||
// update the cache.
|
||||
force_cache_reload(circle_id);
|
||||
@ -2046,9 +2122,12 @@ bool p3GxsCircles::processMembershipRequests(uint32_t token)
|
||||
locked_checkCircleCacheForAutoSubscribe(cache);
|
||||
}
|
||||
|
||||
RsStackMutex stack(mCircleMtx); /********** STACK LOCKED MTX ******/
|
||||
uint32_t token2;
|
||||
RsGenExchange::deleteMsgs(token2,messages_to_delete);
|
||||
if(!messages_to_delete.empty())
|
||||
{
|
||||
RsStackMutex stack(mCircleMtx); /********** STACK LOCKED MTX ******/
|
||||
uint32_t token2;
|
||||
RsGenExchange::deleteMsgs(token2,messages_to_delete);
|
||||
}
|
||||
return true ;
|
||||
}
|
||||
|
||||
|
@ -346,6 +346,8 @@ protected:
|
||||
bool locked_processMembershipMessages(RsGxsCircleCache& cache,const std::vector<RsGxsMsgItem*>& items, GxsMsgReq& messages_to_delete,const std::set<RsGxsId>& own_ids);
|
||||
bool locked_processLoadingCacheEntry(RsGxsCircleCache &cache);
|
||||
bool locked_checkCircleCacheForMembershipUpdate(RsGxsCircleCache &cache);
|
||||
bool locked_setGroupUnprocessedStatus(RsGxsCircleCache& cache,bool unprocessed);
|
||||
bool locked_subscribeToCircle(const RsGxsCircleId &grpId, bool subscribe);
|
||||
|
||||
p3IdService *mIdentities; // Needed for constructing Circle Info,
|
||||
PgpAuxUtils *mPgpUtils;
|
||||
|
@ -804,8 +804,7 @@ bool p3GxsForums::getForumContent(
|
||||
GxsMsgReq msgIds;
|
||||
msgIds[forumId] = msgs_to_request;
|
||||
|
||||
if( !requestMsgInfo(token, opts, msgIds) ||
|
||||
waitToken(token,std::chrono::seconds(5)) != RsTokenService::COMPLETE )
|
||||
if( !requestMsgInfo(token, opts, msgIds) || waitToken(token,std::chrono::seconds(5)) != RsTokenService::COMPLETE )
|
||||
return false;
|
||||
|
||||
return getMsgData(token, msgs);
|
||||
@ -836,15 +835,21 @@ bool p3GxsForums::markRead(const RsGxsGrpMsgIdPair& msgId, bool read)
|
||||
uint32_t token;
|
||||
setMessageReadStatus(token, msgId, read);
|
||||
if(waitToken(token,std::chrono::milliseconds(5000)) != RsTokenService::COMPLETE ) return false;
|
||||
|
||||
RsGxsGrpMsgIdPair p;
|
||||
acknowledgeMsg(token,p);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool p3GxsForums::subscribeToForum(
|
||||
const RsGxsGroupId& groupId, bool subscribe )
|
||||
bool p3GxsForums::subscribeToForum(const RsGxsGroupId& groupId, bool subscribe )
|
||||
{
|
||||
uint32_t token;
|
||||
if( !RsGenExchange::subscribeToGroup(token, groupId, subscribe)
|
||||
|| waitToken(token) != RsTokenService::COMPLETE ) return false;
|
||||
if( !RsGenExchange::subscribeToGroup(token, groupId, subscribe) || waitToken(token) != RsTokenService::COMPLETE ) return false;
|
||||
|
||||
RsGxsGroupId grp;
|
||||
acknowledgeGrp(token,grp);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -680,6 +680,26 @@ void p3IdService::notifyChanges(std::vector<RsGxsNotify *> &changes)
|
||||
{
|
||||
uint32_t token;
|
||||
RsGenExchange::subscribeToGroup(token, gid, true);
|
||||
|
||||
// we need to acknowledge the token in a async process
|
||||
|
||||
RsThread::async( [this,token]()
|
||||
{
|
||||
std::chrono::milliseconds maxWait = std::chrono::milliseconds(10000);
|
||||
std::chrono::milliseconds checkEvery = std::chrono::milliseconds(100);
|
||||
|
||||
auto timeout = std::chrono::steady_clock::now() + maxWait; // wait for 10 secs at most
|
||||
auto st = requestStatus(token);
|
||||
|
||||
while( !(st == RsTokenService::FAILED || st >= RsTokenService::COMPLETE) && std::chrono::steady_clock::now() < timeout )
|
||||
{
|
||||
std::this_thread::sleep_for(checkEvery);
|
||||
st = requestStatus(token);
|
||||
}
|
||||
|
||||
RsGxsGroupId grpId;
|
||||
acknowledgeGrp(token,grpId);
|
||||
});
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -494,6 +494,8 @@ bool p3Posted::setPostReadStatus(const RsGxsGrpMsgIdPair& msgId, bool read)
|
||||
std::cerr << __PRETTY_FUNCTION__ << " Error! GXS operation failed." << std::endl;
|
||||
return false;
|
||||
}
|
||||
RsGxsGrpMsgIdPair p;
|
||||
acknowledgeMsg(token,p);
|
||||
return true;
|
||||
}
|
||||
bool p3Posted::editBoard(RsPostedGroup& board)
|
||||
|
@ -252,9 +252,9 @@ void BoardPostDisplayWidgetBase::setup()
|
||||
|
||||
scoreLabel()->setText(score);
|
||||
|
||||
// FIX THIS UP LATER.
|
||||
notes()->setText(RsHtml().formatText(NULL, QString::fromUtf8(mPost.mNotes.c_str()), RSHTML_FORMATTEXT_EMBED_SMILEYS | RSHTML_FORMATTEXT_EMBED_LINKS));
|
||||
pictureLabel()->setText(RsHtml().formatText(NULL, QString::fromUtf8(mPost.mNotes.c_str()), RSHTML_FORMATTEXT_EMBED_SMILEYS | RSHTML_FORMATTEXT_EMBED_LINKS));
|
||||
// FIX THIS UP LATER. Smileys are extra costly, so we only use them where really necessary
|
||||
notes()->setText(RsHtml().formatText(NULL, QString::fromUtf8(mPost.mNotes.c_str()), /* RSHTML_FORMATTEXT_EMBED_SMILEYS |*/ RSHTML_FORMATTEXT_EMBED_LINKS));
|
||||
pictureLabel()->setText(RsHtml().formatText(NULL, QString::fromUtf8(mPost.mNotes.c_str()), /* RSHTML_FORMATTEXT_EMBED_SMILEYS |*/ RSHTML_FORMATTEXT_EMBED_LINKS));
|
||||
|
||||
// feed.
|
||||
//frame_comment->show();
|
||||
|
@ -286,7 +286,7 @@ void PostedCardView::fill()
|
||||
ui->scoreLabel->setText(score);
|
||||
|
||||
// FIX THIS UP LATER.
|
||||
ui->notes->setText(RsHtml().formatText(NULL, QString::fromUtf8(mPost.mNotes.c_str()), RSHTML_FORMATTEXT_EMBED_SMILEYS | RSHTML_FORMATTEXT_EMBED_LINKS));
|
||||
ui->notes->setText(RsHtml().formatText(NULL, QString::fromUtf8(mPost.mNotes.c_str()), /* RSHTML_FORMATTEXT_EMBED_SMILEYS |*/ RSHTML_FORMATTEXT_EMBED_LINKS));
|
||||
|
||||
QTextDocument doc;
|
||||
doc.setHtml(ui->notes->text());
|
||||
|
@ -603,7 +603,7 @@ void PostedItem::fill()
|
||||
ui->scoreLabel->setText(score);
|
||||
|
||||
// FIX THIS UP LATER.
|
||||
ui->notes->setText(RsHtml().formatText(NULL, QString::fromUtf8(mPost.mNotes.c_str()), RSHTML_FORMATTEXT_EMBED_SMILEYS | RSHTML_FORMATTEXT_EMBED_LINKS));
|
||||
ui->notes->setText(RsHtml().formatText(NULL, QString::fromUtf8(mPost.mNotes.c_str()), /* RSHTML_FORMATTEXT_EMBED_SMILEYS |*/ RSHTML_FORMATTEXT_EMBED_LINKS));
|
||||
|
||||
QTextDocument doc;
|
||||
doc.setHtml(ui->notes->text());
|
||||
|
@ -548,7 +548,7 @@ void PostedListWidgetWithModel::showPostDetails()
|
||||
|
||||
std::cerr << "Showing details about selected index : "<< index.row() << "," << index.column() << std::endl;
|
||||
|
||||
ui->postDetails_TE->setText(RsHtml().formatText(NULL, QString::fromUtf8(post.mMsg.c_str()), RSHTML_FORMATTEXT_EMBED_SMILEYS | RSHTML_FORMATTEXT_EMBED_LINKS));
|
||||
ui->postDetails_TE->setText(RsHtml().formatText(NULL, QString::fromUtf8(post.mMsg.c_str()), /* RSHTML_FORMATTEXT_EMBED_SMILEYS |*/ RSHTML_FORMATTEXT_EMBED_LINKS));
|
||||
|
||||
QPixmap postImage;
|
||||
|
||||
|
@ -732,13 +732,13 @@ void RsPostedPostsModel::setAllMsgReadStatus(bool read)
|
||||
for(uint32_t i=0;i<mPosts.size();++i)
|
||||
pairs.push_back(RsGxsGrpMsgIdPair(mPosts[i].mMeta.mGroupId,mPosts[i].mMeta.mMsgId));
|
||||
|
||||
RsThread::async([read,pairs]()
|
||||
{
|
||||
// Call blocking API
|
||||
|
||||
for(auto& p:pairs)
|
||||
for(auto& p:pairs)
|
||||
RsThread::async([read,p]()
|
||||
{
|
||||
rsPosted->setPostReadStatus(p,read);
|
||||
} );
|
||||
} );
|
||||
}
|
||||
void RsPostedPostsModel::setMsgReadStatus(const QModelIndex& i,bool read_status)
|
||||
{
|
||||
|
@ -616,7 +616,7 @@ void GxsChannelPostItem::fill()
|
||||
|
||||
void GxsChannelPostItem::fillExpandFrame()
|
||||
{
|
||||
ui->msgLabel->setText(RsHtml().formatText(NULL, QString::fromUtf8(mPost.mMsg.c_str()), RSHTML_FORMATTEXT_EMBED_SMILEYS | RSHTML_FORMATTEXT_EMBED_LINKS));
|
||||
ui->msgLabel->setText(RsHtml().formatText(NULL, QString::fromUtf8(mPost.mMsg.c_str()), /* RSHTML_FORMATTEXT_EMBED_SMILEYS |*/ RSHTML_FORMATTEXT_EMBED_LINKS));
|
||||
|
||||
}
|
||||
|
||||
|
@ -795,12 +795,12 @@ void RsGxsChannelPostsModel::setAllMsgReadStatus(bool read_status)
|
||||
|
||||
// 2 - then call the async methods
|
||||
|
||||
RsThread::async([pairs, read_status]()
|
||||
{
|
||||
for(uint32_t i=0;i<pairs.size();++i)
|
||||
if(!rsGxsChannels->markRead(pairs[i],read_status))
|
||||
RsErr() << "setAllMsgReadStatus: failed to change status of msg " << pairs[i].first << " in group " << pairs[i].second << " to status " << read_status << std::endl;
|
||||
});
|
||||
for(uint32_t i=0;i<pairs.size();++i)
|
||||
RsThread::async([p=pairs[i], read_status]() // use async because each markRead() waits for the token to complete in order to properly acknowledge it.
|
||||
{
|
||||
if(!rsGxsChannels->markRead(p,read_status))
|
||||
RsErr() << "setAllMsgReadStatus: failed to change status of msg " << p.first << " in group " << p.second << " to status " << read_status << std::endl;
|
||||
});
|
||||
}
|
||||
|
||||
void RsGxsChannelPostsModel::setMsgReadStatus(const QModelIndex& i,bool read_status)
|
||||
|
@ -818,7 +818,7 @@ void GxsChannelPostsWidgetWithModel::showPostDetails()
|
||||
std::cerr << "Showing details about selected index : "<< index.row() << "," << index.column() << std::endl;
|
||||
#endif
|
||||
|
||||
ui->postDetails_TE->setText(RsHtml().formatText(NULL, QString::fromUtf8(post.mMsg.c_str()), RSHTML_FORMATTEXT_EMBED_SMILEYS | RSHTML_FORMATTEXT_EMBED_LINKS));
|
||||
ui->postDetails_TE->setText(RsHtml().formatText(NULL, QString::fromUtf8(post.mMsg.c_str()), /* RSHTML_FORMATTEXT_EMBED_SMILEYS |*/ RSHTML_FORMATTEXT_EMBED_LINKS));
|
||||
|
||||
QPixmap postImage;
|
||||
|
||||
|
@ -1256,17 +1256,22 @@ void RsGxsForumModel::recursSetMsgReadStatus(ForumModelIndex i,bool read_status,
|
||||
if (bChanged)
|
||||
{
|
||||
//Don't recurs post versions as this should be done before, if no change.
|
||||
uint32_t token;
|
||||
auto s = getPostVersions(mPosts[i].mMsgId) ;
|
||||
|
||||
if(!s.empty())
|
||||
for(auto it(s.begin());it!=s.end();++it)
|
||||
{
|
||||
rsGxsForums->setMessageReadStatus(token,std::make_pair( mForumGroup.mMeta.mGroupId, it->second ), read_status);
|
||||
std::cerr << "Setting version " << it->second << " of post " << mPosts[i].mMsgId << " as read." << std::endl;
|
||||
RsThread::async( [grpId=mForumGroup.mMeta.mGroupId,msgId=it->second,original_msg_id=mPosts[i].mMsgId,read_status]()
|
||||
{
|
||||
rsGxsForums->markRead(std::make_pair( grpId, msgId ), read_status);
|
||||
std::cerr << "Setting version " << msgId << " of post " << original_msg_id << " as read." << std::endl;
|
||||
});
|
||||
}
|
||||
else
|
||||
rsGxsForums->setMessageReadStatus(token,std::make_pair( mForumGroup.mMeta.mGroupId, mPosts[i].mMsgId ), read_status);
|
||||
RsThread::async( [grpId=mForumGroup.mMeta.mGroupId,original_msg_id=mPosts[i].mMsgId,read_status]()
|
||||
{
|
||||
rsGxsForums->markRead(std::make_pair( grpId, original_msg_id), read_status);
|
||||
});
|
||||
|
||||
void *ref ;
|
||||
convertTabEntryToRefPointer(i,ref); // we dont use i+1 here because i is not a row, but an index in the mPosts tab
|
||||
|
Loading…
Reference in New Issue
Block a user