fixed possible cross deadlocks between RsGxsGenExchange and RsGxsNetService

This commit is contained in:
csoler 2016-06-28 20:59:56 -04:00
parent 00bdc509c5
commit 9f7ef8b46b
2 changed files with 369 additions and 345 deletions

View File

@ -2285,231 +2285,240 @@ bool RsGenExchange::checkKeys(const RsTlvSecurityKeySet& keySet)
void RsGenExchange::publishGrps() void RsGenExchange::publishGrps()
{ {
RS_STACK_MUTEX(mGenMtx) ; std::list<RsGxsGroupId> groups_to_subscribe ;
NxsGrpSignPendVect::iterator vit = mGrpsToPublish.begin();
typedef std::pair<bool, RsGxsGroupId> GrpNote;
std::map<uint32_t, GrpNote> toNotify;
while( vit != mGrpsToPublish.end() )
{ {
GxsGrpPendingSign& ggps = *vit; RS_STACK_MUTEX(mGenMtx) ;
NxsGrpSignPendVect::iterator vit = mGrpsToPublish.begin();
/* do intial checks to see if this entry has expired */ typedef std::pair<bool, RsGxsGroupId> GrpNote;
time_t now = time(NULL) ; std::map<uint32_t, GrpNote> toNotify;
uint32_t token = ggps.mToken;
while( vit != mGrpsToPublish.end() )
{
GxsGrpPendingSign& ggps = *vit;
/* do intial checks to see if this entry has expired */
time_t now = time(NULL) ;
uint32_t token = ggps.mToken;
if(now > (ggps.mStartTS + PENDING_SIGN_TIMEOUT) ) if(now > (ggps.mStartTS + PENDING_SIGN_TIMEOUT) )
{ {
// timed out // timed out
toNotify.insert(std::make_pair( toNotify.insert(std::make_pair(
token, GrpNote(false, RsGxsGroupId()))); token, GrpNote(false, RsGxsGroupId())));
delete ggps.mItem; delete ggps.mItem;
vit = mGrpsToPublish.erase(vit); vit = mGrpsToPublish.erase(vit);
continue; continue;
} }
RsGxsGroupId grpId; RsGxsGroupId grpId;
RsNxsGrp* grp = new RsNxsGrp(mServType); RsNxsGrp* grp = new RsNxsGrp(mServType);
RsGxsGrpItem* grpItem = ggps.mItem; RsGxsGrpItem* grpItem = ggps.mItem;
RsTlvSecurityKeySet fullKeySet; RsTlvSecurityKeySet fullKeySet;
if(!(ggps.mHaveKeys)) if(!(ggps.mHaveKeys))
{ {
generateGroupKeys(fullKeySet, true); generateGroupKeys(fullKeySet, true);
ggps.mHaveKeys = true; ggps.mHaveKeys = true;
ggps.mKeys = fullKeySet; ggps.mKeys = fullKeySet;
} }
else else
fullKeySet = ggps.mKeys; fullKeySet = ggps.mKeys;
// find private admin key // find private admin key
RsTlvPrivateRSAKey privAdminKey; RsTlvPrivateRSAKey privAdminKey;
bool privKeyFound = false; bool privKeyFound = false;
for(std::map<RsGxsId, RsTlvPrivateRSAKey>::iterator mit_keys = fullKeySet.private_keys.begin(); mit_keys != fullKeySet.private_keys.end(); ++mit_keys) for(std::map<RsGxsId, RsTlvPrivateRSAKey>::iterator mit_keys = fullKeySet.private_keys.begin(); mit_keys != fullKeySet.private_keys.end(); ++mit_keys)
{ {
RsTlvPrivateRSAKey& key = mit_keys->second; RsTlvPrivateRSAKey& key = mit_keys->second;
if(key.keyFlags == (RSTLV_KEY_DISTRIB_ADMIN | RSTLV_KEY_TYPE_FULL)) if(key.keyFlags == (RSTLV_KEY_DISTRIB_ADMIN | RSTLV_KEY_TYPE_FULL))
{ {
privAdminKey = key; privAdminKey = key;
privKeyFound = true; privKeyFound = true;
} }
} }
uint8_t create = CREATE_FAIL; uint8_t create = CREATE_FAIL;
if(privKeyFound) if(privKeyFound)
{ {
// get group id from private admin key id // get group id from private admin key id
grpItem->meta.mGroupId = grp->grpId = RsGxsGroupId(privAdminKey.keyId); grpItem->meta.mGroupId = grp->grpId = RsGxsGroupId(privAdminKey.keyId);
ServiceCreate_Return ret = service_CreateGroup(grpItem, fullKeySet); ServiceCreate_Return ret = service_CreateGroup(grpItem, fullKeySet);
bool serialOk = false, servCreateOk; bool serialOk = false, servCreateOk;
if(ret == SERVICE_CREATE_SUCCESS) if(ret == SERVICE_CREATE_SUCCESS)
{ {
uint32_t size = mSerialiser->size(grpItem); uint32_t size = mSerialiser->size(grpItem);
char *gData = new char[size]; char *gData = new char[size];
serialOk = mSerialiser->serialise(grpItem, gData, &size); serialOk = mSerialiser->serialise(grpItem, gData, &size);
grp->grp.setBinData(gData, size); grp->grp.setBinData(gData, size);
delete[] gData; delete[] gData;
servCreateOk = true; servCreateOk = true;
}else }else
{ {
servCreateOk = false; servCreateOk = false;
} }
if(serialOk && servCreateOk) if(serialOk && servCreateOk)
{ {
grp->metaData = new RsGxsGrpMetaData(); grp->metaData = new RsGxsGrpMetaData();
grpItem->meta.mPublishTs = time(NULL); grpItem->meta.mPublishTs = time(NULL);
*(grp->metaData) = grpItem->meta; *(grp->metaData) = grpItem->meta;
// TODO: change when publish key optimisation added (public groups don't have publish key // TODO: change when publish key optimisation added (public groups don't have publish key
grp->metaData->mSubscribeFlags = GXS_SERV::GROUP_SUBSCRIBE_ADMIN | GXS_SERV::GROUP_SUBSCRIBE_SUBSCRIBED grp->metaData->mSubscribeFlags = GXS_SERV::GROUP_SUBSCRIBE_ADMIN | GXS_SERV::GROUP_SUBSCRIBE_SUBSCRIBED
| GXS_SERV::GROUP_SUBSCRIBE_PUBLISH; | GXS_SERV::GROUP_SUBSCRIBE_PUBLISH;
create = createGroup(grp, fullKeySet); create = createGroup(grp, fullKeySet);
#ifdef GEN_EXCH_DEBUG #ifdef GEN_EXCH_DEBUG
std::cerr << "RsGenExchange::publishGrps() "; std::cerr << "RsGenExchange::publishGrps() ";
std::cerr << " GrpId: " << grp->grpId; std::cerr << " GrpId: " << grp->grpId;
std::cerr << " CircleType: " << (uint32_t) grp->metaData->mCircleType; std::cerr << " CircleType: " << (uint32_t) grp->metaData->mCircleType;
std::cerr << " CircleId: " << grp->metaData->mCircleId.toStdString(); std::cerr << " CircleId: " << grp->metaData->mCircleId.toStdString();
std::cerr << std::endl; std::cerr << std::endl;
#endif #endif
if(create == CREATE_SUCCESS) if(create == CREATE_SUCCESS)
{ {
// Here we need to make sure that no private keys are included. This is very important since private keys // Here we need to make sure that no private keys are included. This is very important since private keys
// can be used to modify the group. Normally the private key set is whiped out by createGroup, but // can be used to modify the group. Normally the private key set is whiped out by createGroup, but
grp->metaData->keys.private_keys.clear() ; grp->metaData->keys.private_keys.clear() ;
uint32_t mdSize = grp->metaData->serial_size(RS_GXS_GRP_META_DATA_CURRENT_API_VERSION); uint32_t mdSize = grp->metaData->serial_size(RS_GXS_GRP_META_DATA_CURRENT_API_VERSION);
{ {
RsTemporaryMemory metaData(mdSize); RsTemporaryMemory metaData(mdSize);
serialOk = grp->metaData->serialise(metaData, mdSize,RS_GXS_GRP_META_DATA_CURRENT_API_VERSION); serialOk = grp->metaData->serialise(metaData, mdSize,RS_GXS_GRP_META_DATA_CURRENT_API_VERSION);
#warning TODO: grp->meta should be renamed grp->public_meta ! #warning TODO: grp->meta should be renamed grp->public_meta !
grp->meta.setBinData(metaData, mdSize); grp->meta.setBinData(metaData, mdSize);
} }
// Place back private keys for publisher and database storage // Place back private keys for publisher and database storage
grp->metaData->keys.private_keys = fullKeySet.private_keys; grp->metaData->keys.private_keys = fullKeySet.private_keys;
if(mDataStore->validSize(grp) && serialOk) if(mDataStore->validSize(grp) && serialOk)
{ {
grpId = grp->grpId; grpId = grp->grpId;
computeHash(grp->grp, grp->metaData->mHash); computeHash(grp->grp, grp->metaData->mHash);
grp->metaData->mRecvTS = time(NULL); grp->metaData->mRecvTS = time(NULL);
if(ggps.mIsUpdate) if(ggps.mIsUpdate)
mDataAccess->updateGroupData(grp); mDataAccess->updateGroupData(grp);
else else
mDataAccess->addGroupData(grp); mDataAccess->addGroupData(grp);
#warning this is bad: addGroupData/updateGroupData actially deletes grp. But it may be used below? grp should be a class object and not deleted manually! #warning this is bad: addGroupData/updateGroupData actially deletes grp. But it may be used below? grp should be a class object and not deleted manually!
if(mNetService!=NULL)
mNetService->subscribeStatusChanged(grpId,true) ;
}
else
{
create = CREATE_FAIL;
}
}
}
else if(ret == SERVICE_CREATE_FAIL_TRY_LATER)
{
// if the service is not ready yet, reset the start timestamp to give the service more time
// the service should have it's own timeout mechanism
// services should return SERVICE_CREATE_FAIL if the action timed out
// at the moment this is only important for the idservice:
// the idservice may ask the user for a password, and the user needs time
ggps.mStartTS = now;
create = CREATE_FAIL_TRY_LATER;
}
else if(ret == SERVICE_CREATE_FAIL)
create = CREATE_FAIL;
}
else
{
#ifdef GEN_EXCH_DEBUG
std::cerr << "RsGenExchange::publishGrps() Could not find private publish keys " << std::endl;
#endif
create = CREATE_FAIL;
}
if(create == CREATE_FAIL) groups_to_subscribe.push_back(grpId) ;
{ }
else
{
create = CREATE_FAIL;
}
}
}
else if(ret == SERVICE_CREATE_FAIL_TRY_LATER)
{
// if the service is not ready yet, reset the start timestamp to give the service more time
// the service should have it's own timeout mechanism
// services should return SERVICE_CREATE_FAIL if the action timed out
// at the moment this is only important for the idservice:
// the idservice may ask the user for a password, and the user needs time
ggps.mStartTS = now;
create = CREATE_FAIL_TRY_LATER;
}
else if(ret == SERVICE_CREATE_FAIL)
create = CREATE_FAIL;
}
else
{
#ifdef GEN_EXCH_DEBUG #ifdef GEN_EXCH_DEBUG
std::cerr << "RsGenExchange::publishGrps() failed to publish grp " << std::endl; std::cerr << "RsGenExchange::publishGrps() Could not find private publish keys " << std::endl;
#endif #endif
delete grp; create = CREATE_FAIL;
delete grpItem; }
vit = mGrpsToPublish.erase(vit);
toNotify.insert(std::make_pair(
token, GrpNote(false, grpId)));
} if(create == CREATE_FAIL)
else if(create == CREATE_FAIL_TRY_LATER) {
{
#ifdef GEN_EXCH_DEBUG #ifdef GEN_EXCH_DEBUG
std::cerr << "RsGenExchange::publishGrps() failed grp, trying again " << std::endl; std::cerr << "RsGenExchange::publishGrps() failed to publish grp " << std::endl;
#endif #endif
delete grp; delete grp;
ggps.mLastAttemptTS = time(NULL); delete grpItem;
++vit; vit = mGrpsToPublish.erase(vit);
} toNotify.insert(std::make_pair(
else if(create == CREATE_SUCCESS) token, GrpNote(false, grpId)));
{
delete grpItem; }
vit = mGrpsToPublish.erase(vit); else if(create == CREATE_FAIL_TRY_LATER)
{
#ifdef GEN_EXCH_DEBUG
std::cerr << "RsGenExchange::publishGrps() failed grp, trying again " << std::endl;
#endif
delete grp;
ggps.mLastAttemptTS = time(NULL);
++vit;
}
else if(create == CREATE_SUCCESS)
{
delete grpItem;
vit = mGrpsToPublish.erase(vit);
#ifdef GEN_EXCH_DEBUG #ifdef GEN_EXCH_DEBUG
std::cerr << "RsGenExchange::publishGrps() ok -> pushing to notifies" std::cerr << "RsGenExchange::publishGrps() ok -> pushing to notifies"
<< std::endl; << std::endl;
#endif #endif
// add to published to allow acknowledgement // add to published to allow acknowledgement
toNotify.insert(std::make_pair(token, toNotify.insert(std::make_pair(token,
GrpNote(true,grpId))); GrpNote(true,grpId)));
} }
}
std::map<uint32_t, GrpNote>::iterator mit = toNotify.begin();
std::list<RsGxsGroupId> grpChanged;
for(; mit != toNotify.end(); ++mit)
{
GrpNote& note = mit->second;
uint8_t status = note.first ? RsTokenService::GXS_REQUEST_V2_STATUS_COMPLETE
: RsTokenService::GXS_REQUEST_V2_STATUS_FAILED;
mGrpNotify.insert(std::make_pair(mit->first, note.second));
mDataAccess->updatePublicRequestStatus(mit->first, status);
if(note.first)
grpChanged.push_back(note.second);
}
if(!grpChanged.empty())
{
RsGxsGroupChange* gc = new RsGxsGroupChange(RsGxsNotify::TYPE_PUBLISH, false);
gc->mGrpIdList = grpChanged;
mNotifications.push_back(gc);
#ifdef GEN_EXCH_DEBUG
std::cerr << " adding the following grp ids to notification: " << std::endl;
for(std::list<RsGxsGroupId>::const_iterator it(grpChanged.begin());it!=grpChanged.end();++it)
std::cerr << " " << *it << std::endl;
#endif
}
} }
std::map<uint32_t, GrpNote>::iterator mit = toNotify.begin(); // This is done off-mutex to avoid possible cross deadlocks with the net service.
std::list<RsGxsGroupId> grpChanged;
for(; mit != toNotify.end(); ++mit)
{
GrpNote& note = mit->second;
uint8_t status = note.first ? RsTokenService::GXS_REQUEST_V2_STATUS_COMPLETE
: RsTokenService::GXS_REQUEST_V2_STATUS_FAILED;
mGrpNotify.insert(std::make_pair(mit->first, note.second));
mDataAccess->updatePublicRequestStatus(mit->first, status);
if(note.first)
grpChanged.push_back(note.second);
}
if(!grpChanged.empty())
{
RsGxsGroupChange* gc = new RsGxsGroupChange(RsGxsNotify::TYPE_PUBLISH, false);
gc->mGrpIdList = grpChanged;
mNotifications.push_back(gc);
#ifdef GEN_EXCH_DEBUG
std::cerr << " adding the following grp ids to notification: " << std::endl;
for(std::list<RsGxsGroupId>::const_iterator it(grpChanged.begin());it!=grpChanged.end();++it)
std::cerr << " " << *it << std::endl;
#endif
}
if(mNetService!=NULL)
for(std::list<RsGxsGroupId>::const_iterator it(groups_to_subscribe.begin());it!=groups_to_subscribe.end();++it)
mNetService->subscribeStatusChanged((*it),true) ;
} }
@ -2590,218 +2599,228 @@ void RsGenExchange::computeHash(const RsTlvBinaryData& data, RsFileHash& hash)
void RsGenExchange::processRecvdMessages() void RsGenExchange::processRecvdMessages()
{ {
RS_STACK_MUTEX(mGenMtx) ; std::list<RsGxsMessageId> messages_to_reject ;
{
RS_STACK_MUTEX(mGenMtx) ;
#ifdef GEN_EXCH_DEBUG #ifdef GEN_EXCH_DEBUG
if(!mMsgPendingValidate.empty()) if(!mMsgPendingValidate.empty())
std::cerr << "processing received messages" << std::endl; std::cerr << "processing received messages" << std::endl;
#endif #endif
NxsMsgPendingVect::iterator pend_it = mMsgPendingValidate.begin(); NxsMsgPendingVect::iterator pend_it = mMsgPendingValidate.begin();
#ifdef GEN_EXCH_DEBUG #ifdef GEN_EXCH_DEBUG
if(!mMsgPendingValidate.empty()) if(!mMsgPendingValidate.empty())
std::cerr << " pending validation" << std::endl; std::cerr << " pending validation" << std::endl;
#endif #endif
for(; pend_it != mMsgPendingValidate.end();) for(; pend_it != mMsgPendingValidate.end();)
{ {
GxsPendingItem<RsNxsMsg*, RsGxsGrpMsgIdPair>& gpsi = *pend_it; GxsPendingItem<RsNxsMsg*, RsGxsGrpMsgIdPair>& gpsi = *pend_it;
#ifdef GEN_EXCH_DEBUG #ifdef GEN_EXCH_DEBUG
std::cerr << " grp=" << gpsi.mId.first << ", msg=" << gpsi.mId.second << ", attempts=" << gpsi.mAttempts ; std::cerr << " grp=" << gpsi.mId.first << ", msg=" << gpsi.mId.second << ", attempts=" << gpsi.mAttempts ;
#endif #endif
if(gpsi.mAttempts == VALIDATE_MAX_ATTEMPTS) if(gpsi.mAttempts == VALIDATE_MAX_ATTEMPTS)
{ {
#ifdef GEN_EXCH_DEBUG #ifdef GEN_EXCH_DEBUG
std::cerr << " = max! deleting." << std::endl; std::cerr << " = max! deleting." << std::endl;
#endif #endif
delete gpsi.mItem; delete gpsi.mItem;
pend_it = mMsgPendingValidate.erase(pend_it); pend_it = mMsgPendingValidate.erase(pend_it);
} }
else else
{ {
#ifdef GEN_EXCH_DEBUG #ifdef GEN_EXCH_DEBUG
std::cerr << " movign to recvd." << std::endl; std::cerr << " movign to recvd." << std::endl;
#endif #endif
mReceivedMsgs.push_back(gpsi.mItem); mReceivedMsgs.push_back(gpsi.mItem);
++pend_it; ++pend_it;
} }
} }
if(mReceivedMsgs.empty()) if(mReceivedMsgs.empty())
return; return;
std::vector<RsNxsMsg*>::iterator vit = mReceivedMsgs.begin(); std::vector<RsNxsMsg*>::iterator vit = mReceivedMsgs.begin();
GxsMsgReq msgIds; GxsMsgReq msgIds;
std::map<RsNxsMsg*, RsGxsMsgMetaData*> msgs; std::map<RsNxsMsg*, RsGxsMsgMetaData*> msgs;
std::map<RsGxsGroupId, RsGxsGrpMetaData*> grpMetas; std::map<RsGxsGroupId, RsGxsGrpMetaData*> grpMetas;
// coalesce group meta retrieval for performance // coalesce group meta retrieval for performance
for(; vit != mReceivedMsgs.end(); ++vit) for(; vit != mReceivedMsgs.end(); ++vit)
{ {
RsNxsMsg* msg = *vit; RsNxsMsg* msg = *vit;
grpMetas.insert(std::make_pair(msg->grpId, (RsGxsGrpMetaData*)NULL)); grpMetas.insert(std::make_pair(msg->grpId, (RsGxsGrpMetaData*)NULL));
} }
mDataStore->retrieveGxsGrpMetaData(grpMetas); mDataStore->retrieveGxsGrpMetaData(grpMetas);
#ifdef GEN_EXCH_DEBUG #ifdef GEN_EXCH_DEBUG
std::cerr << " updating received messages:" << std::endl; std::cerr << " updating received messages:" << std::endl;
#endif #endif
for(vit = mReceivedMsgs.begin(); vit != mReceivedMsgs.end(); ++vit) for(vit = mReceivedMsgs.begin(); vit != mReceivedMsgs.end(); ++vit)
{ {
RsNxsMsg* msg = *vit; RsNxsMsg* msg = *vit;
RsGxsMsgMetaData* meta = new RsGxsMsgMetaData(); RsGxsMsgMetaData* meta = new RsGxsMsgMetaData();
bool ok = false; bool ok = false;
if(msg->meta.bin_len != 0) if(msg->meta.bin_len != 0)
ok = meta->deserialise(msg->meta.bin_data, &(msg->meta.bin_len)); ok = meta->deserialise(msg->meta.bin_data, &(msg->meta.bin_len));
msg->metaData = meta; msg->metaData = meta;
#ifdef GEN_EXCH_DEBUG #ifdef GEN_EXCH_DEBUG
std::cerr << " deserialised info: grp id=" << meta->mGroupId << ", msg id=" << meta->mMsgId ; std::cerr << " deserialised info: grp id=" << meta->mGroupId << ", msg id=" << meta->mMsgId ;
#endif #endif
uint8_t validateReturn = VALIDATE_FAIL; uint8_t validateReturn = VALIDATE_FAIL;
if(ok) if(ok)
{ {
std::map<RsGxsGroupId, RsGxsGrpMetaData*>::iterator mit = grpMetas.find(msg->grpId); std::map<RsGxsGroupId, RsGxsGrpMetaData*>::iterator mit = grpMetas.find(msg->grpId);
#ifdef GEN_EXCH_DEBUG #ifdef GEN_EXCH_DEBUG
std::cerr << " msg info : grp id=" << msg->grpId << ", msg id=" << msg->msgId << std::endl; std::cerr << " msg info : grp id=" << msg->grpId << ", msg id=" << msg->msgId << std::endl;
#endif #endif
RsGxsGrpMetaData* grpMeta = NULL ; RsGxsGrpMetaData* grpMeta = NULL ;
// validate msg // validate msg
if(mit != grpMetas.end()) if(mit != grpMetas.end())
{ {
grpMeta = mit->second; grpMeta = mit->second;
validateReturn = validateMsg(msg, grpMeta->mGroupFlags, grpMeta->mSignFlags, grpMeta->keys); validateReturn = validateMsg(msg, grpMeta->mGroupFlags, grpMeta->mSignFlags, grpMeta->keys);
#ifdef GEN_EXCH_DEBUG #ifdef GEN_EXCH_DEBUG
std::cerr << " grpMeta.mSignFlags: " << std::hex << grpMeta->mSignFlags << std::dec << std::endl; std::cerr << " grpMeta.mSignFlags: " << std::hex << grpMeta->mSignFlags << std::dec << std::endl;
std::cerr << " grpMeta.mAuthFlags: " << std::hex << grpMeta->mAuthenFlags << std::dec << std::endl; std::cerr << " grpMeta.mAuthFlags: " << std::hex << grpMeta->mAuthenFlags << std::dec << std::endl;
std::cerr << " message validation result: " << (int)validateReturn << std::endl; std::cerr << " message validation result: " << (int)validateReturn << std::endl;
#endif #endif
} }
if(validateReturn == VALIDATE_SUCCESS) if(validateReturn == VALIDATE_SUCCESS)
{ {
meta->mMsgStatus = GXS_SERV::GXS_MSG_STATUS_UNPROCESSED | GXS_SERV::GXS_MSG_STATUS_GUI_NEW | GXS_SERV::GXS_MSG_STATUS_GUI_UNREAD; meta->mMsgStatus = GXS_SERV::GXS_MSG_STATUS_UNPROCESSED | GXS_SERV::GXS_MSG_STATUS_GUI_NEW | GXS_SERV::GXS_MSG_STATUS_GUI_UNREAD;
msgs.insert(std::make_pair(msg, meta)); msgs.insert(std::make_pair(msg, meta));
std::vector<RsGxsMessageId> &msgv = msgIds[msg->grpId]; std::vector<RsGxsMessageId> &msgv = msgIds[msg->grpId];
if (std::find(msgv.begin(), msgv.end(), msg->msgId) == msgv.end()) if (std::find(msgv.begin(), msgv.end(), msg->msgId) == msgv.end())
{ {
msgv.push_back(msg->msgId); msgv.push_back(msg->msgId);
} }
NxsMsgPendingVect::iterator validated_entry = std::find(mMsgPendingValidate.begin(), mMsgPendingValidate.end(), NxsMsgPendingVect::iterator validated_entry = std::find(mMsgPendingValidate.begin(), mMsgPendingValidate.end(),
getMsgIdPair(*msg)); getMsgIdPair(*msg));
if(validated_entry != mMsgPendingValidate.end()) mMsgPendingValidate.erase(validated_entry); if(validated_entry != mMsgPendingValidate.end()) mMsgPendingValidate.erase(validated_entry);
computeHash(msg->msg, meta->mHash); computeHash(msg->msg, meta->mHash);
meta->recvTS = time(NULL); meta->recvTS = time(NULL);
#ifdef GEN_EXCH_DEBUG #ifdef GEN_EXCH_DEBUG
std::cerr << " new status flags: " << meta->mMsgStatus << std::endl; std::cerr << " new status flags: " << meta->mMsgStatus << std::endl;
std::cerr << " computed hash: " << meta->mHash << std::endl; std::cerr << " computed hash: " << meta->mHash << std::endl;
std::cerr << "Message received. Identity=" << msg->metaData->mAuthorId << ", from peer " << msg->PeerId() << std::endl; std::cerr << "Message received. Identity=" << msg->metaData->mAuthorId << ", from peer " << msg->PeerId() << std::endl;
#endif #endif
if(!msg->metaData->mAuthorId.isNull()) if(!msg->metaData->mAuthorId.isNull())
mRoutingClues[msg->metaData->mAuthorId].insert(msg->PeerId()) ; mRoutingClues[msg->metaData->mAuthorId].insert(msg->PeerId()) ;
if(grpMeta->mSignFlags & GXS_SERV::FLAG_AUTHOR_AUTHENTICATION_TRACK_MESSAGES) if(grpMeta->mSignFlags & GXS_SERV::FLAG_AUTHOR_AUTHENTICATION_TRACK_MESSAGES)
mTrackingClues.push_back(std::make_pair(msg->msgId,msg->PeerId())) ; mTrackingClues.push_back(std::make_pair(msg->msgId,msg->PeerId())) ;
} }
if(validateReturn == VALIDATE_FAIL) if(validateReturn == VALIDATE_FAIL)
{ {
// In this case, we notify the network exchange service not to DL the message again, at least not yet. // In this case, we notify the network exchange service not to DL the message again, at least not yet.
#ifdef GEN_EXCH_DEBUG #ifdef GEN_EXCH_DEBUG
std::cerr << "Notifying the network service to not download this message again." << std::endl; std::cerr << "Notifying the network service to not download this message again." << std::endl;
#endif #endif
mNetService->rejectMessage(msg->msgId) ; messages_to_reject.push_back(msg->msgId) ;
} }
} }
else else
{ {
#ifdef GEN_EXCH_DEBUG #ifdef GEN_EXCH_DEBUG
std::cerr << " deserialisation failed!" <<std::endl; std::cerr << " deserialisation failed!" <<std::endl;
#endif #endif
validateReturn = VALIDATE_FAIL; validateReturn = VALIDATE_FAIL;
} }
if(validateReturn == VALIDATE_FAIL) if(validateReturn == VALIDATE_FAIL)
{ {
#ifdef GEN_EXCH_DEBUG #ifdef GEN_EXCH_DEBUG
std::cerr << "Validation failed for message id " std::cerr << "Validation failed for message id "
<< "msg->grpId: " << msg->grpId << ", msgId: " << msg->msgId << std::endl; << "msg->grpId: " << msg->grpId << ", msgId: " << msg->msgId << std::endl;
#endif #endif
NxsMsgPendingVect::iterator failed_entry = std::find(mMsgPendingValidate.begin(), mMsgPendingValidate.end(), NxsMsgPendingVect::iterator failed_entry = std::find(mMsgPendingValidate.begin(), mMsgPendingValidate.end(),
getMsgIdPair(*msg)); getMsgIdPair(*msg));
if(failed_entry != mMsgPendingValidate.end()) mMsgPendingValidate.erase(failed_entry); if(failed_entry != mMsgPendingValidate.end()) mMsgPendingValidate.erase(failed_entry);
delete msg; delete msg;
} }
else if(validateReturn == VALIDATE_FAIL_TRY_LATER) else if(validateReturn == VALIDATE_FAIL_TRY_LATER)
{ {
#ifdef GEN_EXCH_DEBUG #ifdef GEN_EXCH_DEBUG
std::cerr << "failed to validate msg, trying again: " std::cerr << "failed to validate msg, trying again: "
<< "msg->grpId: " << msg->grpId << ", msgId: " << msg->msgId << std::endl; << "msg->grpId: " << msg->grpId << ", msgId: " << msg->msgId << std::endl;
#endif #endif
RsGxsGrpMsgIdPair id; RsGxsGrpMsgIdPair id;
id.first = msg->grpId; id.first = msg->grpId;
id.second = msg->msgId; id.second = msg->msgId;
// first check you haven't made too many attempts // first check you haven't made too many attempts
NxsMsgPendingVect::iterator vit = std::find( NxsMsgPendingVect::iterator vit = std::find(
mMsgPendingValidate.begin(), mMsgPendingValidate.end(), id); mMsgPendingValidate.begin(), mMsgPendingValidate.end(), id);
if(vit == mMsgPendingValidate.end()) if(vit == mMsgPendingValidate.end())
{ {
GxsPendingItem<RsNxsMsg*, RsGxsGrpMsgIdPair> item(msg, id); GxsPendingItem<RsNxsMsg*, RsGxsGrpMsgIdPair> item(msg, id);
mMsgPendingValidate.push_back(item); mMsgPendingValidate.push_back(item);
}else }else
{ {
vit->mAttempts++; vit->mAttempts++;
} }
} }
} }
// clean up resources from group meta retrieval // clean up resources from group meta retrieval
freeAndClearContainerResource<std::map<RsGxsGroupId, RsGxsGrpMetaData*>, freeAndClearContainerResource<std::map<RsGxsGroupId, RsGxsGrpMetaData*>,
RsGxsGrpMetaData*>(grpMetas); RsGxsGrpMetaData*>(grpMetas);
if(!msgIds.empty()) if(!msgIds.empty())
{ {
#ifdef GEN_EXCH_DEBUG #ifdef GEN_EXCH_DEBUG
std::cerr << " removing existing and old messages from incoming list." << std::endl; std::cerr << " removing existing and old messages from incoming list." << std::endl;
#endif #endif
removeDeleteExistingMessages(msgs, msgIds); removeDeleteExistingMessages(msgs, msgIds);
#ifdef GEN_EXCH_DEBUG #ifdef GEN_EXCH_DEBUG
std::cerr << " storing remaining messages" << std::endl; std::cerr << " storing remaining messages" << std::endl;
#endif #endif
mDataStore->storeMessage(msgs); mDataStore->storeMessage(msgs);
RsGxsMsgChange* c = new RsGxsMsgChange(RsGxsNotify::TYPE_RECEIVE, false); RsGxsMsgChange* c = new RsGxsMsgChange(RsGxsNotify::TYPE_RECEIVE, false);
c->msgChangeMap = msgIds; c->msgChangeMap = msgIds;
mNotifications.push_back(c); mNotifications.push_back(c);
} }
mReceivedMsgs.clear(); mReceivedMsgs.clear();
}
// Done off-mutex to avoid cross deadlocks in the netservice that might call the RsGenExchange as an observer..
if(mNetService != NULL)
for(std::list<RsGxsMessageId>::const_iterator it(messages_to_reject.begin());it!=messages_to_reject.end();++it)
mNetService->rejectMessage(*it) ;
} }
void RsGenExchange::processRecvdGroups() void RsGenExchange::processRecvdGroups()

View File

@ -928,17 +928,22 @@ void RsGxsNetService::handleRecvSyncGrpStatistics(RsNxsSyncGrpStatsItem *grs)
#ifdef NXS_NET_DEBUG_6 #ifdef NXS_NET_DEBUG_6
GXSNETDEBUG_PG(grs->PeerId(),grs->grpId) << "Received Grp update stats item from peer " << grs->PeerId() << " for group " << grs->grpId << ", reporting " << grs->number_of_posts << " posts." << std::endl; GXSNETDEBUG_PG(grs->PeerId(),grs->grpId) << "Received Grp update stats item from peer " << grs->PeerId() << " for group " << grs->grpId << ", reporting " << grs->number_of_posts << " posts." << std::endl;
#endif #endif
RS_STACK_MUTEX(mNxsMutex) ; bool should_notify = false ;
RsGroupNetworkStatsRecord& rec(mGroupNetworkStats[grs->grpId]) ; {
RS_STACK_MUTEX(mNxsMutex) ;
RsGroupNetworkStatsRecord& rec(mGroupNetworkStats[grs->grpId]) ;
uint32_t old_count = rec.max_visible_count ; uint32_t old_count = rec.max_visible_count ;
uint32_t old_suppliers_count = rec.suppliers.size() ; uint32_t old_suppliers_count = rec.suppliers.size() ;
rec.suppliers.insert(grs->PeerId()) ; rec.suppliers.insert(grs->PeerId()) ;
rec.max_visible_count = std::max(rec.max_visible_count,grs->number_of_posts) ; rec.max_visible_count = std::max(rec.max_visible_count,grs->number_of_posts) ;
rec.update_TS = time(NULL) ; rec.update_TS = time(NULL) ;
if (old_count != rec.max_visible_count || old_suppliers_count != rec.suppliers.size()) if (old_count != rec.max_visible_count || old_suppliers_count != rec.suppliers.size())
should_notify = true ;
}
if(should_notify)
mObserver->notifyChangedGroupStats(grs->grpId); mObserver->notifyChangedGroupStats(grs->grpId);
} }
else else