added better debug info in RsGenexchange (allowing service-based debug) and fixed growing of mGrpNotify and mMsgNotify in DataAccess due to not acknowledging tokens for explicit operations in circles

This commit is contained in:
csoler 2021-01-24 21:42:45 +01:00
parent 53f0c396e7
commit 202ce3327d
4 changed files with 171 additions and 62 deletions

View File

@ -69,6 +69,37 @@ static const uint32_t INTEGRITY_CHECK_PERIOD = 60*31; // 31 minutes
*/
#define GEN_EXCH_DEBUG 1
#if defined(GEN_EXCH_DEBUG)
static const uint32_t service_to_print = RS_SERVICE_GXS_TYPE_GXSCIRCLE;// use this to allow to this service id only, or 0 for all services
// warning. Numbers should be SERVICE IDS (see serialiser/rsserviceids.h. E.g. 0x0215 for forums)
class nullstream: public std::ostream {};
// static std::string nice_time_stamp(rstime_t now,rstime_t TS)
// {
// if(TS == 0)
// return "Never" ;
// else
// {
// std::ostringstream s;
// s << now - TS << " secs ago" ;
// return s.str() ;
// }
// }
static std::ostream& gxsgenexchangedebug(uint32_t service_type)
{
static nullstream null ;
if (service_to_print==0 || service_type == 0 || (service_type == service_to_print))
return std::cerr << time(NULL) << ":GXSDATASERVICE service " << std::hex << service_type << std::dec << ": " ;
else
return null ;
}
#define GXSGENEXCHANGEDEBUG gxsgenexchangedebug(serviceType())
#endif
// Data flow in RsGenExchange
//
// publishGroup()
@ -250,6 +281,8 @@ void RsGenExchange::threadTick()
void RsGenExchange::tick()
{
GXSGENEXCHANGEDEBUG << "RsGenExchange::tick(): mGrpNotify size=" << mGrpNotify.size() << ", mMsgNotify size=" << mMsgNotify.size() << std::endl;
// Meta Changes should happen first.
// This is important, as services want to change Meta, then get results.
// Services shouldn't rely on this ordering - but some do.
@ -387,20 +420,19 @@ bool RsGenExchange::messagePublicationTest(const RsGxsMsgMetaData& meta)
return meta.mMsgStatus & GXS_SERV::GXS_MSG_STATUS_KEEP_FOREVER || st == 0 || storageTimeLimit >= time(NULL);
}
bool RsGenExchange::acknowledgeTokenMsg(const uint32_t& token,
RsGxsGrpMsgIdPair& msgId)
bool RsGenExchange::acknowledgeTokenMsg(const uint32_t& token, RsGxsGrpMsgIdPair& msgId)
{
RS_STACK_MUTEX(mGenMtx) ;
#ifdef GEN_EXCH_DEBUG
std::cerr << "RsGenExchange::acknowledgeTokenMsg(). token=" << token << std::endl;
GXSGENEXCHANGEDEBUG << "RsGenExchange::acknowledgeTokenMsg(). token=" << token << std::endl;
#endif
std::map<uint32_t, RsGxsGrpMsgIdPair >::iterator mit = mMsgNotify.find(token);
if(mit == mMsgNotify.end())
{
#ifdef GEN_EXCH_DEBUG
std::cerr << " no notification found for this token." << std::endl;
GXSGENEXCHANGEDEBUG << " no notification found for this token." << std::endl;
#endif
return false;
}
@ -413,8 +445,7 @@ bool RsGenExchange::acknowledgeTokenMsg(const uint32_t& token,
mMsgNotify.erase(mit);
#ifdef GEN_EXCH_DEBUG
std::cerr << " found grpId=" << msgId.first <<", msgId=" << msgId.second << std::endl;
std::cerr << " disposing token from mDataAccess" << std::endl;
GXSGENEXCHANGEDEBUG << " found grpId=" << msgId.first <<", msgId=" << msgId.second << " disposing token from mDataAccess" << std::endl;
#endif
return true;
}
@ -426,15 +457,14 @@ bool RsGenExchange::acknowledgeTokenGrp(const uint32_t& token, RsGxsGroupId& grp
RS_STACK_MUTEX(mGenMtx) ;
#ifdef GEN_EXCH_DEBUG
std::cerr << "RsGenExchange::acknowledgeTokenGrp(). token=" << token << std::endl;
GXSGENEXCHANGEDEBUG << "RsGenExchange::acknowledgeTokenGrp(). token=" << token << std::endl;
#endif
std::map<uint32_t, RsGxsGroupId >::iterator mit =
mGrpNotify.find(token);
std::map<uint32_t, RsGxsGroupId >::iterator mit = mGrpNotify.find(token);
if(mit == mGrpNotify.end())
{
#ifdef GEN_EXCH_DEBUG
std::cerr << " no notification found for this token." << std::endl;
GXSGENEXCHANGEDEBUG << " no notification found for this token." << std::endl;
#endif
return false;
}
@ -446,8 +476,7 @@ bool RsGenExchange::acknowledgeTokenGrp(const uint32_t& token, RsGxsGroupId& grp
mGrpNotify.erase(mit);
#ifdef GEN_EXCH_DEBUG
std::cerr << " found grpId=" << grpId << std::endl;
std::cerr << " disposing token from mDataAccess" << std::endl;
GXSGENEXCHANGEDEBUG << " found grpId=" << grpId << ". Disposing token from mDataAccess" << std::endl;
#endif
return true;
}
@ -487,8 +516,7 @@ void RsGenExchange::generateGroupKeys(RsTlvSecurityKeySet& keySet, bool genPubli
uint8_t RsGenExchange::createGroup(RsNxsGrp *grp, RsTlvSecurityKeySet& keySet)
{
#ifdef GEN_EXCH_DEBUG
std::cerr << "RsGenExchange::createGroup()";
std::cerr << std::endl;
GXSGENEXCHANGEDEBUG << "RsGenExchange::createGroup()"<< std::endl;
#endif
RsGxsGrpMetaData* meta = grp->metaData;
@ -513,8 +541,7 @@ uint8_t RsGenExchange::createGroup(RsNxsGrp *grp, RsTlvSecurityKeySet& keySet)
if(!privKeyFound)
{
std::cerr << "RsGenExchange::createGroup() Missing private ADMIN Key";
std::cerr << std::endl;
GXSGENEXCHANGEDEBUG << "RsGenExchange::createGroup() Missing private ADMIN Key" << std::endl;
return false;
}
@ -556,8 +583,7 @@ uint8_t RsGenExchange::createGroup(RsNxsGrp *grp, RsTlvSecurityKeySet& keySet)
if (!ok)
{
std::cerr << "RsGenExchange::createGroup() ERROR !okay (getSignature error)";
std::cerr << std::endl;
std::cerr << "RsGenExchange::createGroup() ERROR !okay (getSignature error)" << std::endl;
return CREATE_FAIL;
}
@ -590,8 +616,7 @@ int RsGenExchange::createGroupSignatures(RsTlvKeySignatureSet& signSet, RsTlvBin
{
needIdentitySign = true;
#ifdef GEN_EXCH_DEBUG
std::cerr << "Needs Identity sign! (Service Flags)";
std::cerr << std::endl;
GXSGENEXCHANGEDEBUG << "Needs Identity sign! (Service Flags)"<< std::endl;
#endif
}
@ -663,8 +688,7 @@ int RsGenExchange::createMsgSignatures(RsTlvKeySignatureSet& signSet, RsTlvBinar
bool publishSignSuccess = false;
#ifdef GEN_EXCH_DEBUG
std::cerr << "RsGenExchange::createMsgSignatures() for Msg.mMsgName: " << msgMeta.mMsgName;
std::cerr << std::endl;
GXSGENEXCHANGEDEBUG << "RsGenExchange::createMsgSignatures() for Msg.mMsgName: " << msgMeta.mMsgName<< std::endl;
#endif
@ -701,8 +725,7 @@ int RsGenExchange::createMsgSignatures(RsTlvKeySignatureSet& signSet, RsTlvBinar
{
needPublishSign = true;
#ifdef GEN_EXCH_DEBUG
std::cerr << "Needs Publish sign! (Service Flags)";
std::cerr << std::endl;
GXSGENEXCHANGEDEBUG << "Needs Publish sign! (Service Flags)"<< std::endl;
#endif
}
@ -711,8 +734,7 @@ int RsGenExchange::createMsgSignatures(RsTlvKeySignatureSet& signSet, RsTlvBinar
{
needIdentitySign = true;
#ifdef GEN_EXCH_DEBUG
std::cerr << "Needs Identity sign! (Service Flags)";
std::cerr << std::endl;
GXSGENEXCHANGEDEBUG << "Needs Identity sign! (Service Flags)"<< std::endl;
#endif
}
@ -720,8 +742,7 @@ int RsGenExchange::createMsgSignatures(RsTlvKeySignatureSet& signSet, RsTlvBinar
{
needIdentitySign = true;
#ifdef GEN_EXCH_DEBUG
std::cerr << "Needs Identity sign! (AuthorId Exists)";
std::cerr << std::endl;
GXSGENEXCHANGEDEBUG << "Needs Identity sign! (AuthorId Exists)"<< std::endl;
#endif
}
@ -2041,7 +2062,7 @@ void RsGenExchange::setGroupSubscribeFlags(uint32_t& token, const RsGxsGroupId&
void RsGenExchange::setGroupStatusFlags(uint32_t& token, const RsGxsGroupId& grpId, const uint32_t& status, const uint32_t& mask)
{
/* TODO APPLY MASK TO FLAGS */
RS_STACK_MUTEX(mGenMtx) ;
RS_STACK_MUTEX(mGenMtx) ;
token = mDataAccess->generatePublicToken();
GrpLocMetaData g;
@ -2161,8 +2182,7 @@ void RsGenExchange::processMsgMetaChanges()
{
RS_STACK_MUTEX(mGenMtx);
//mMsgNotify.insert(std::make_pair(token, m.msgId));// (csoler) Is that needed??
mDataAccess->disposeOfPublicToken(token);
mMsgNotify.insert(std::make_pair(token, m.msgId));
}
}
@ -2198,6 +2218,9 @@ void RsGenExchange::processGrpMetaChanges()
GrpLocMetaData& g = mit->second;
uint32_t token = mit->first;
#ifdef GEN_EXCH_DEBUG
RsDbg() << " Processing GrpMetaChange for token " << token << std::endl;
#endif
// process mask
bool ok = processGrpMask(g.grpId, g.val);
@ -2207,15 +2230,18 @@ void RsGenExchange::processGrpMetaChanges()
{
mDataAccess->updatePublicRequestStatus(token, RsTokenService::COMPLETE);
grpChanged.push_back(g.grpId);
}else
}
else
{
mDataAccess->updatePublicRequestStatus(token, RsTokenService::FAILED);
}
{
RS_STACK_MUTEX(mGenMtx);
mDataAccess->disposeOfPublicToken(token);
//mGrpNotify.insert(std::make_pair(token, g.grpId)); // (csoler) I'm not sure that is even useful
mGrpNotify.insert(std::make_pair(token, g.grpId));
#ifdef GEN_EXCH_DEBUG
RsDbg() << " Processing GrpMetaChange Adding token " << token << " to mGrpNotify" << std::endl;
#endif
}
}

View File

@ -1690,7 +1690,7 @@ bool RsGxsDataAccess::checkRequestStatus( uint32_t token, GxsRequestStatus& stat
GxsRequest* req = locked_retrieveCompletedRequest(token);
#ifdef DATA_DEBUG
GXSDATADEBUG << "CheckRequestStatus: token=" << token ;
GXSDATADEBUG << "CheckRequestStatus: token=" << token << std::endl ;
#endif
if(req != nullptr)
@ -1700,7 +1700,7 @@ bool RsGxsDataAccess::checkRequestStatus( uint32_t token, GxsRequestStatus& stat
status = COMPLETE;
ts = req->reqTime;
#ifdef DATA_DEBUG
GXSDATADEBUG << __PRETTY_FUNCTION__ << " Returning status = COMPLETE" << std::endl;
GXSDATADEBUG << " Returning status = COMPLETE" << std::endl;
#endif
return true;
}
@ -1711,7 +1711,7 @@ bool RsGxsDataAccess::checkRequestStatus( uint32_t token, GxsRequestStatus& stat
{
status = it->second;
#ifdef DATA_DEBUG
GXSDATADEBUG << __PRETTY_FUNCTION__ << " Returning status = " << status << std::endl;
GXSDATADEBUG << " Returning status = " << status << std::endl;
#endif
return true;
}

View File

@ -601,7 +601,7 @@ void p3GxsCircles::notifyChanges(std::vector<RsGxsNotify *> &changes)
#endif
RsGxsCircleId circle_id(msgChange->mGroupId);
if(rsEvents && (c->getType() == RsGxsNotify::TYPE_RECEIVED_NEW))
if(rsEvents && ((c->getType() == RsGxsNotify::TYPE_RECEIVED_NEW) || (c->getType() == RsGxsNotify::TYPE_PUBLISHED)))
{
const RsGxsCircleSubscriptionRequestItem *item = dynamic_cast<const RsGxsCircleSubscriptionRequestItem *>(msgChange->mNewMsgItem);
@ -807,7 +807,7 @@ bool p3GxsCircles::getCircleDetails(const RsGxsCircleId& id, RsGxsCircleDetails&
details.mRestrictedCircleId = data.mRestrictedCircleId;
details.mAllowedNodes = data.mAllowedNodes;
details.mSubscriptionFlags.clear();
details.mSubscriptionFlags.clear();
details.mAllowedGxsIds.clear();
details.mAmIAllowed = false ;
details.mAmIAdmin = bool(data.mGroupSubscribeFlags & GXS_SERV::GROUP_SUBSCRIBE_ADMIN);
@ -1501,6 +1501,72 @@ bool p3GxsCircles::checkCircleCache()
return true ;
}
bool p3GxsCircles::locked_setGroupUnprocessedStatus(RsGxsCircleCache& cache,bool unprocessed)
{
uint32_t token2;
if(unprocessed)
cache.mGroupStatus |= GXS_SERV::GXS_GRP_STATUS_UNPROCESSED;
else
cache.mGroupStatus &= ~GXS_SERV::GXS_GRP_STATUS_UNPROCESSED;
RsGenExchange::setGroupStatusFlags(token2, RsGxsGroupId(cache.mCircleId), unprocessed, GXS_SERV::GXS_GRP_STATUS_UNPROCESSED);
std::cerr << "********** new token for setGrpStatusFlags: " << token2 << std::endl;
// Now we need to async acknowledge the token when the job is finished. We cannot do this sync because it's the
// current thread that takes care of calling the handling of group processing.
RsThread::async([token2,this]()
{
std::chrono::milliseconds maxWait = std::chrono::milliseconds(10000);
std::chrono::milliseconds checkEvery = std::chrono::milliseconds(100);
auto timeout = std::chrono::steady_clock::now() + maxWait; // wait for 10 secs at most
auto st = requestStatus(token2);
while( !(st == RsTokenService::FAILED || st >= RsTokenService::COMPLETE) && std::chrono::steady_clock::now() < timeout )
{
std::this_thread::sleep_for(checkEvery);
st = requestStatus(token2);
}
RsGxsGroupId grpId;
acknowledgeGrp(token2,grpId);
});
return true;
}
bool p3GxsCircles::locked_subscribeToCircle(const RsGxsCircleId &grpId, bool subscribe)
{
uint32_t token;
if(!RsGenExchange::subscribeToGroup(token, RsGxsGroupId(grpId), subscribe))
return false;
// Now we need to async acknowledge the token when the job is finished. We cannot do this sync because it's the
// current thread that takes care of calling the handling of group processing.
RsThread::async([token,this]()
{
std::chrono::milliseconds maxWait = std::chrono::milliseconds(10000);
std::chrono::milliseconds checkEvery = std::chrono::milliseconds(100);
auto timeout = std::chrono::steady_clock::now() + maxWait; // wait for 10 secs at most
auto st = requestStatus(token);
while( !(st == RsTokenService::FAILED || st >= RsTokenService::COMPLETE) && std::chrono::steady_clock::now() < timeout )
{
std::this_thread::sleep_for(checkEvery);
st = requestStatus(token);
}
RsGxsGroupId grpId;
acknowledgeGrp(token,grpId);
});
return true;
}
bool p3GxsCircles::locked_checkCircleCacheForMembershipUpdate(RsGxsCircleCache& cache)
{
rstime_t now = time(NULL) ;
@ -1513,20 +1579,19 @@ bool p3GxsCircles::locked_checkCircleCacheForMembershipUpdate(RsGxsCircleCache&
#ifdef DEBUG_CIRCLES
std::cerr << "Cache entry for circle " << cache.mCircleId << " needs a swab over membership requests. Re-scheduling it." << std::endl;
#endif
cache.mGroupStatus |= GXS_SERV::GXS_GRP_STATUS_UNPROCESSED; // forces processing of cache entry
uint32_t token;
RsGenExchange::setGroupStatusFlags(token, RsGxsGroupId(cache.mCircleId.toStdString()), 0, GXS_SERV::GXS_GRP_STATUS_UNPROCESSED);
locked_setGroupUnprocessedStatus(cache,true); // forces the re-check of the group
// this should be called regularly
// this should be called regularly
RsTokReqOptions opts;
opts.mReqType = GXS_REQUEST_TYPE_MSG_DATA;
std::list<RsGxsGroupId> grpIds ;
uint32_t token2;
grpIds.push_back(RsGxsGroupId(cache.mCircleId)) ;
RsGenExchange::getTokenService()->requestMsgInfo(token, RS_TOKREQ_ANSTYPE_SUMMARY, opts, grpIds);
GxsTokenQueue::queueRequest(token, CIRCLEREQ_MESSAGE_DATA);
RsGenExchange::getTokenService()->requestMsgInfo(token2, RS_TOKREQ_ANSTYPE_SUMMARY, opts, grpIds);
GxsTokenQueue::queueRequest(token2, CIRCLEREQ_MESSAGE_DATA);
}
return true ;
}
@ -1599,8 +1664,7 @@ bool p3GxsCircles::locked_checkCircleCacheForAutoSubscribe(RsGxsCircleCache& cac
/* we are part of this group - subscribe, clear unprocessed flag */
std::cerr << " either admin or have posted a subscribe/unsubscribe message => AutoSubscribing!" << std::endl;
#endif
uint32_t token;
RsGenExchange::subscribeToGroup(token, RsGxsGroupId(cache.mCircleId), true);
locked_subscribeToCircle(cache.mCircleId,true);
mShouldSendCacheUpdateNotification = true;
}
#ifdef DEBUG_CIRCLES
@ -1617,8 +1681,7 @@ bool p3GxsCircles::locked_checkCircleCacheForAutoSubscribe(RsGxsCircleCache& cac
/* we know all the peers - we are not part - we can flag as PROCESSED. */
if(cache.mGroupSubscribeFlags & GXS_SERV::GROUP_SUBSCRIBE_SUBSCRIBED)
{
uint32_t token;
RsGenExchange::subscribeToGroup(token, RsGxsGroupId(cache.mCircleId), false);
locked_subscribeToCircle(cache.mCircleId,false);
mShouldSendCacheUpdateNotification = true;
#ifdef DEBUG_CIRCLES
std::cerr << " Neither admin nor subscription msg author! Let's unsubscribe this circle of unfriendly Napoleons!" << std::endl;
@ -1634,9 +1697,7 @@ bool p3GxsCircles::locked_checkCircleCacheForAutoSubscribe(RsGxsCircleCache& cac
#ifdef DEBUG_CIRCLES
std::cerr << " Marking the cache entry as processed." << std::endl;
#endif
uint32_t token2;
cache.mGroupStatus &= ~GXS_SERV::GXS_GRP_STATUS_UNPROCESSED;
RsGenExchange::setGroupStatusFlags(token2, RsGxsGroupId(cache.mCircleId), 0, GXS_SERV::GXS_GRP_STATUS_UNPROCESSED);
locked_setGroupUnprocessedStatus(cache,false);
return true;
}
@ -1830,9 +1891,7 @@ void p3GxsCircles::handle_event(uint32_t event_type, const std::string &elabel)
// | | Grp Subscribed: NO | Grp Subscribed: NO |
// +-------------+------------------------------+-----------------------------+
bool p3GxsCircles::pushCircleMembershipRequest(
const RsGxsId& own_gxsid, const RsGxsCircleId& circle_id,
RsGxsCircleSubscriptionType request_type )
bool p3GxsCircles::pushCircleMembershipRequest( const RsGxsId& own_gxsid, const RsGxsCircleId& circle_id, RsGxsCircleSubscriptionType request_type )
{
Dbg3() << __PRETTY_FUNCTION__ << "own_gxsid = " << own_gxsid
<< ", circle=" << circle_id << ", req type=" << request_type
@ -1859,10 +1918,7 @@ bool p3GxsCircles::pushCircleMembershipRequest(
// If the circle is not subscribed, then subscribe, whatever the subscription type. Indeed, if we publish a msg, even a msg for
// unsubscribing, we need to have a subscribed group first.
uint32_t token ;
RsGenExchange::subscribeToGroup(token, RsGxsGroupId(circle_id), true);
if(waitToken(token) != RsTokenService::COMPLETE)
if(!locked_subscribeToCircle(circle_id,true))
{
std::cerr << __PRETTY_FUNCTION__ << " Could not subscribe to Circle group." << std::endl;
return false;
@ -1897,7 +1953,29 @@ bool p3GxsCircles::pushCircleMembershipRequest(
std::cerr << " ThreadId : " << s->meta.mThreadId << std::endl;
#endif
uint32_t token;
RsGenExchange::publishMsg(token, s);
// This is manual handling of token. We need to clear it up from the notification when done, and that needs
// to be async-ed, since the processing of message publication is done in the same thread.
RsThread::async( [this,token]()
{
std::chrono::milliseconds maxWait = std::chrono::milliseconds(10000);
std::chrono::milliseconds checkEvery = std::chrono::milliseconds(100);
auto timeout = std::chrono::steady_clock::now() + maxWait; // wait for 10 secs at most
auto st = requestStatus(token);
while( !(st == RsTokenService::FAILED || st >= RsTokenService::COMPLETE) && std::chrono::steady_clock::now() < timeout )
{
std::this_thread::sleep_for(checkEvery);
st = requestStatus(token);
}
std::pair<RsGxsGroupId,RsGxsMessageId> grpmsgId;
acknowledgeMsg(token,grpmsgId);
});
// update the cache.
force_cache_reload(circle_id);
@ -2046,9 +2124,12 @@ bool p3GxsCircles::processMembershipRequests(uint32_t token)
locked_checkCircleCacheForAutoSubscribe(cache);
}
RsStackMutex stack(mCircleMtx); /********** STACK LOCKED MTX ******/
uint32_t token2;
RsGenExchange::deleteMsgs(token2,messages_to_delete);
if(!messages_to_delete.empty())
{
RsStackMutex stack(mCircleMtx); /********** STACK LOCKED MTX ******/
uint32_t token2;
RsGenExchange::deleteMsgs(token2,messages_to_delete);
}
return true ;
}

View File

@ -346,6 +346,8 @@ protected:
bool locked_processMembershipMessages(RsGxsCircleCache& cache,const std::vector<RsGxsMsgItem*>& items, GxsMsgReq& messages_to_delete,const std::set<RsGxsId>& own_ids);
bool locked_processLoadingCacheEntry(RsGxsCircleCache &cache);
bool locked_checkCircleCacheForMembershipUpdate(RsGxsCircleCache &cache);
bool locked_setGroupUnprocessedStatus(RsGxsCircleCache& cache,bool unprocessed);
bool locked_subscribeToCircle(const RsGxsCircleId &grpId, bool subscribe);
p3IdService *mIdentities; // Needed for constructing Circle Info,
PgpAuxUtils *mPgpUtils;