added new flag for msg tracking

This commit is contained in:
csoler 2015-10-25 18:27:15 -04:00
parent 3de29c589c
commit 207e84d719
2 changed files with 149 additions and 146 deletions

View file

@ -2517,206 +2517,208 @@ void RsGenExchange::computeHash(const RsTlvBinaryData& data, RsFileHash& hash)
void RsGenExchange::processRecvdMessages() void RsGenExchange::processRecvdMessages()
{ {
RS_STACK_MUTEX(mGenMtx) ; RS_STACK_MUTEX(mGenMtx) ;
#ifdef GEN_EXCH_DEBUG #ifdef GEN_EXCH_DEBUG
if(!mMsgPendingValidate.empty()) if(!mMsgPendingValidate.empty())
std::cerr << "processing received messages" << std::endl; std::cerr << "processing received messages" << std::endl;
#endif #endif
NxsMsgPendingVect::iterator pend_it = mMsgPendingValidate.begin(); NxsMsgPendingVect::iterator pend_it = mMsgPendingValidate.begin();
#ifdef GEN_EXCH_DEBUG #ifdef GEN_EXCH_DEBUG
if(!mMsgPendingValidate.empty()) if(!mMsgPendingValidate.empty())
std::cerr << " pending validation" << std::endl; std::cerr << " pending validation" << std::endl;
#endif #endif
for(; pend_it != mMsgPendingValidate.end();) for(; pend_it != mMsgPendingValidate.end();)
{ {
GxsPendingItem<RsNxsMsg*, RsGxsGrpMsgIdPair>& gpsi = *pend_it; GxsPendingItem<RsNxsMsg*, RsGxsGrpMsgIdPair>& gpsi = *pend_it;
#ifdef GEN_EXCH_DEBUG #ifdef GEN_EXCH_DEBUG
std::cerr << " grp=" << gpsi.mId.first << ", msg=" << gpsi.mId.second << ", attempts=" << gpsi.mAttempts ; std::cerr << " grp=" << gpsi.mId.first << ", msg=" << gpsi.mId.second << ", attempts=" << gpsi.mAttempts ;
#endif #endif
if(gpsi.mAttempts == VALIDATE_MAX_ATTEMPTS) if(gpsi.mAttempts == VALIDATE_MAX_ATTEMPTS)
{ {
#ifdef GEN_EXCH_DEBUG #ifdef GEN_EXCH_DEBUG
std::cerr << " = max! deleting." << std::endl; std::cerr << " = max! deleting." << std::endl;
#endif #endif
delete gpsi.mItem; delete gpsi.mItem;
pend_it = mMsgPendingValidate.erase(pend_it); pend_it = mMsgPendingValidate.erase(pend_it);
} }
else else
{ {
#ifdef GEN_EXCH_DEBUG #ifdef GEN_EXCH_DEBUG
std::cerr << " movign to recvd." << std::endl; std::cerr << " movign to recvd." << std::endl;
#endif #endif
mReceivedMsgs.push_back(gpsi.mItem); mReceivedMsgs.push_back(gpsi.mItem);
++pend_it; ++pend_it;
} }
} }
if(mReceivedMsgs.empty()) if(mReceivedMsgs.empty())
return; return;
std::vector<RsNxsMsg*>::iterator vit = mReceivedMsgs.begin(); std::vector<RsNxsMsg*>::iterator vit = mReceivedMsgs.begin();
GxsMsgReq msgIds; GxsMsgReq msgIds;
std::map<RsNxsMsg*, RsGxsMsgMetaData*> msgs; std::map<RsNxsMsg*, RsGxsMsgMetaData*> msgs;
std::map<RsGxsGroupId, RsGxsGrpMetaData*> grpMetas; std::map<RsGxsGroupId, RsGxsGrpMetaData*> grpMetas;
// coalesce group meta retrieval for performance // coalesce group meta retrieval for performance
for(; vit != mReceivedMsgs.end(); ++vit) for(; vit != mReceivedMsgs.end(); ++vit)
{ {
RsNxsMsg* msg = *vit; RsNxsMsg* msg = *vit;
grpMetas.insert(std::make_pair(msg->grpId, (RsGxsGrpMetaData*)NULL)); grpMetas.insert(std::make_pair(msg->grpId, (RsGxsGrpMetaData*)NULL));
} }
mDataStore->retrieveGxsGrpMetaData(grpMetas); mDataStore->retrieveGxsGrpMetaData(grpMetas);
#ifdef GEN_EXCH_DEBUG #ifdef GEN_EXCH_DEBUG
std::cerr << " updating received messages:" << std::endl; std::cerr << " updating received messages:" << std::endl;
#endif #endif
for(vit = mReceivedMsgs.begin(); vit != mReceivedMsgs.end(); ++vit) for(vit = mReceivedMsgs.begin(); vit != mReceivedMsgs.end(); ++vit)
{ {
RsNxsMsg* msg = *vit; RsNxsMsg* msg = *vit;
RsGxsMsgMetaData* meta = new RsGxsMsgMetaData(); RsGxsMsgMetaData* meta = new RsGxsMsgMetaData();
bool ok = false; bool ok = false;
if(msg->meta.bin_len != 0) if(msg->meta.bin_len != 0)
ok = meta->deserialise(msg->meta.bin_data, &(msg->meta.bin_len)); ok = meta->deserialise(msg->meta.bin_data, &(msg->meta.bin_len));
msg->metaData = meta; msg->metaData = meta;
#ifdef GEN_EXCH_DEBUG #ifdef GEN_EXCH_DEBUG
std::cerr << " deserialised info: grp id=" << meta->mGroupId << ", msg id=" << meta->mMsgId ; std::cerr << " deserialised info: grp id=" << meta->mGroupId << ", msg id=" << meta->mMsgId ;
#endif #endif
uint8_t validateReturn = VALIDATE_FAIL; uint8_t validateReturn = VALIDATE_FAIL;
if(ok) if(ok)
{ {
std::map<RsGxsGroupId, RsGxsGrpMetaData*>::iterator mit = grpMetas.find(msg->grpId); std::map<RsGxsGroupId, RsGxsGrpMetaData*>::iterator mit = grpMetas.find(msg->grpId);
#ifdef GEN_EXCH_DEBUG #ifdef GEN_EXCH_DEBUG
std::cerr << " msg info : grp id=" << msg->grpId << ", msg id=" << msg->msgId << std::endl; std::cerr << " msg info : grp id=" << msg->grpId << ", msg id=" << msg->msgId << std::endl;
#endif #endif
RsGxsGrpMetaData* grpMeta = NULL ;
// validate msg // validate msg
if(mit != grpMetas.end()) if(mit != grpMetas.end())
{ {
RsGxsGrpMetaData* grpMeta = mit->second; grpMeta = mit->second;
validateReturn = validateMsg(msg, grpMeta->mGroupFlags, grpMeta->mSignFlags, grpMeta->keys); validateReturn = validateMsg(msg, grpMeta->mGroupFlags, grpMeta->mSignFlags, grpMeta->keys);
#ifdef GEN_EXCH_DEBUG
std::cerr << " grpMeta.mSignFlags: " << std::hex << grpMeta->mSignFlags << std::dec << std::endl;
std::cerr << " grpMeta.mAuthFlags: " << std::hex << grpMeta->mAuthenFlags << std::dec << std::endl;
std::cerr << " message validation result: " << (int)validateReturn << std::endl;
#endif
}
if(validateReturn == VALIDATE_SUCCESS)
{
meta->mMsgStatus = GXS_SERV::GXS_MSG_STATUS_UNPROCESSED | GXS_SERV::GXS_MSG_STATUS_GUI_NEW | GXS_SERV::GXS_MSG_STATUS_GUI_UNREAD;
msgs.insert(std::make_pair(msg, meta));
std::vector<RsGxsMessageId> &msgv = msgIds[msg->grpId];
if (std::find(msgv.begin(), msgv.end(), msg->msgId) == msgv.end())
{
msgv.push_back(msg->msgId);
}
NxsMsgPendingVect::iterator validated_entry = std::find(mMsgPendingValidate.begin(), mMsgPendingValidate.end(),
getMsgIdPair(*msg));
if(validated_entry != mMsgPendingValidate.end()) mMsgPendingValidate.erase(validated_entry);
computeHash(msg->msg, meta->mHash);
meta->recvTS = time(NULL);
#ifdef GEN_EXCH_DEBUG
std::cerr << " new status flags: " << meta->mMsgStatus << std::endl;
std::cerr << " computed hash: " << meta->mHash << std::endl;
std::cerr << "Message received. Identity=" << msg->metaData->mAuthorId << ", from peer " << msg->PeerId() << std::endl;
#endif
if(!msg->metaData->mAuthorId.isNull())
mRoutingClues[msg->metaData->mAuthorId].insert(msg->PeerId()) ;
mTrackingClues.push_back(std::make_pair(msg->msgId,msg->PeerId())) ;
}
}
else
{
#ifdef GEN_EXCH_DEBUG
std::cerr << " deserialisation failed!" <<std::endl;
#endif
validateReturn = VALIDATE_FAIL;
}
if(validateReturn == VALIDATE_FAIL)
{
#ifdef GEN_EXCH_DEBUG #ifdef GEN_EXCH_DEBUG
std::cerr << "failed to deserialise incoming meta, msgId: " std::cerr << " grpMeta.mSignFlags: " << std::hex << grpMeta->mSignFlags << std::dec << std::endl;
<< "msg->grpId: " << msg->grpId << ", msgId: " << msg->msgId << std::endl; std::cerr << " grpMeta.mAuthFlags: " << std::hex << grpMeta->mAuthenFlags << std::dec << std::endl;
std::cerr << " message validation result: " << (int)validateReturn << std::endl;
#endif
}
if(validateReturn == VALIDATE_SUCCESS)
{
meta->mMsgStatus = GXS_SERV::GXS_MSG_STATUS_UNPROCESSED | GXS_SERV::GXS_MSG_STATUS_GUI_NEW | GXS_SERV::GXS_MSG_STATUS_GUI_UNREAD;
msgs.insert(std::make_pair(msg, meta));
std::vector<RsGxsMessageId> &msgv = msgIds[msg->grpId];
if (std::find(msgv.begin(), msgv.end(), msg->msgId) == msgv.end())
{
msgv.push_back(msg->msgId);
}
NxsMsgPendingVect::iterator validated_entry = std::find(mMsgPendingValidate.begin(), mMsgPendingValidate.end(),
getMsgIdPair(*msg));
if(validated_entry != mMsgPendingValidate.end()) mMsgPendingValidate.erase(validated_entry);
computeHash(msg->msg, meta->mHash);
meta->recvTS = time(NULL);
#ifdef GEN_EXCH_DEBUG
std::cerr << " new status flags: " << meta->mMsgStatus << std::endl;
std::cerr << " computed hash: " << meta->mHash << std::endl;
std::cerr << "Message received. Identity=" << msg->metaData->mAuthorId << ", from peer " << msg->PeerId() << std::endl;
#endif #endif
NxsMsgPendingVect::iterator failed_entry = std::find(mMsgPendingValidate.begin(), mMsgPendingValidate.end(), if(!msg->metaData->mAuthorId.isNull())
getMsgIdPair(*msg)); mRoutingClues[msg->metaData->mAuthorId].insert(msg->PeerId()) ;
if(failed_entry != mMsgPendingValidate.end()) mMsgPendingValidate.erase(failed_entry); if(grpMeta->mSignFlags & GXS_SERV::FLAG_AUTHOR_AUTHENTICATION_TRACK_MESSAGES)
mTrackingClues.push_back(std::make_pair(msg->msgId,msg->PeerId())) ;
}
}
else
{
#ifdef GEN_EXCH_DEBUG
std::cerr << " deserialisation failed!" <<std::endl;
#endif
validateReturn = VALIDATE_FAIL;
}
if(validateReturn == VALIDATE_FAIL)
{
#ifdef GEN_EXCH_DEBUG
std::cerr << "failed to deserialise incoming meta, msgId: "
<< "msg->grpId: " << msg->grpId << ", msgId: " << msg->msgId << std::endl;
#endif
NxsMsgPendingVect::iterator failed_entry = std::find(mMsgPendingValidate.begin(), mMsgPendingValidate.end(),
getMsgIdPair(*msg));
if(failed_entry != mMsgPendingValidate.end()) mMsgPendingValidate.erase(failed_entry);
delete msg; delete msg;
} }
else if(validateReturn == VALIDATE_FAIL_TRY_LATER) else if(validateReturn == VALIDATE_FAIL_TRY_LATER)
{ {
#ifdef GEN_EXCH_DEBUG #ifdef GEN_EXCH_DEBUG
std::cerr << "failed to validate msg, trying again: " std::cerr << "failed to validate msg, trying again: "
<< "msg->grpId: " << msg->grpId << ", msgId: " << msg->msgId << std::endl; << "msg->grpId: " << msg->grpId << ", msgId: " << msg->msgId << std::endl;
#endif #endif
RsGxsGrpMsgIdPair id; RsGxsGrpMsgIdPair id;
id.first = msg->grpId; id.first = msg->grpId;
id.second = msg->msgId; id.second = msg->msgId;
// first check you haven't made too many attempts // first check you haven't made too many attempts
NxsMsgPendingVect::iterator vit = std::find( NxsMsgPendingVect::iterator vit = std::find(
mMsgPendingValidate.begin(), mMsgPendingValidate.end(), id); mMsgPendingValidate.begin(), mMsgPendingValidate.end(), id);
if(vit == mMsgPendingValidate.end()) if(vit == mMsgPendingValidate.end())
{ {
GxsPendingItem<RsNxsMsg*, RsGxsGrpMsgIdPair> item(msg, id); GxsPendingItem<RsNxsMsg*, RsGxsGrpMsgIdPair> item(msg, id);
mMsgPendingValidate.push_back(item); mMsgPendingValidate.push_back(item);
}else }else
{ {
vit->mAttempts++; vit->mAttempts++;
} }
} }
} }
// clean up resources from group meta retrieval // clean up resources from group meta retrieval
freeAndClearContainerResource<std::map<RsGxsGroupId, RsGxsGrpMetaData*>, freeAndClearContainerResource<std::map<RsGxsGroupId, RsGxsGrpMetaData*>,
RsGxsGrpMetaData*>(grpMetas); RsGxsGrpMetaData*>(grpMetas);
if(!msgIds.empty()) if(!msgIds.empty())
{ {
#ifdef GEN_EXCH_DEBUG #ifdef GEN_EXCH_DEBUG
std::cerr << " removing existing and old messages from incoming list." << std::endl; std::cerr << " removing existing and old messages from incoming list." << std::endl;
#endif #endif
removeDeleteExistingMessages(msgs, msgIds); removeDeleteExistingMessages(msgs, msgIds);
#ifdef GEN_EXCH_DEBUG #ifdef GEN_EXCH_DEBUG
std::cerr << " storing remaining messages" << std::endl; std::cerr << " storing remaining messages" << std::endl;
#endif #endif
mDataStore->storeMessage(msgs); mDataStore->storeMessage(msgs);
RsGxsMsgChange* c = new RsGxsMsgChange(RsGxsNotify::TYPE_RECEIVE, false); RsGxsMsgChange* c = new RsGxsMsgChange(RsGxsNotify::TYPE_RECEIVE, false);
c->msgChangeMap = msgIds; c->msgChangeMap = msgIds;
mNotifications.push_back(c); mNotifications.push_back(c);
} }
mReceivedMsgs.clear(); mReceivedMsgs.clear();
} }
void RsGenExchange::processRecvdGroups() void RsGenExchange::processRecvdGroups()

View file

@ -28,11 +28,12 @@ namespace GXS_SERV {
/** END authentication **/ /** END authentication **/
/** START author authentication flags **/ /** START author authentication flags **/
static const uint32_t FLAG_AUTHOR_AUTHENTICATION_MASK = 0x0000ff00; static const uint32_t FLAG_AUTHOR_AUTHENTICATION_MASK = 0x0000ff00;
static const uint32_t FLAG_AUTHOR_AUTHENTICATION_NONE = 0x00000000; static const uint32_t FLAG_AUTHOR_AUTHENTICATION_NONE = 0x00000000;
static const uint32_t FLAG_AUTHOR_AUTHENTICATION_GPG = 0x00000100; static const uint32_t FLAG_AUTHOR_AUTHENTICATION_GPG = 0x00000100;
static const uint32_t FLAG_AUTHOR_AUTHENTICATION_REQUIRED = 0x00000200; static const uint32_t FLAG_AUTHOR_AUTHENTICATION_REQUIRED = 0x00000200;
static const uint32_t FLAG_AUTHOR_AUTHENTICATION_IFNOPUBSIGN = 0x00000400; static const uint32_t FLAG_AUTHOR_AUTHENTICATION_IFNOPUBSIGN = 0x00000400;
static const uint32_t FLAG_AUTHOR_AUTHENTICATION_TRACK_MESSAGES = 0x00000800;
static const uint32_t FLAG_GROUP_SIGN_PUBLISH_MASK = 0x000000ff; static const uint32_t FLAG_GROUP_SIGN_PUBLISH_MASK = 0x000000ff;
static const uint32_t FLAG_GROUP_SIGN_PUBLISH_ENCRYPTED = 0x00000001; static const uint32_t FLAG_GROUP_SIGN_PUBLISH_ENCRYPTED = 0x00000001;