mirror of
https://github.com/RetroShare/RetroShare.git
synced 2024-10-01 02:35:48 -04:00
cleanup code in RsGenExchange
This commit is contained in:
parent
181a824d11
commit
9881b616ac
@ -2890,8 +2890,8 @@ void RsGenExchange::processRecvdMessages()
|
|||||||
|
|
||||||
bool accept_new_msg = msg->metaData != NULL && acceptNewMessage(msg->metaData,msg->msg.bin_len);
|
bool accept_new_msg = msg->metaData != NULL && acceptNewMessage(msg->metaData,msg->msg.bin_len);
|
||||||
|
|
||||||
if(!accept_new_msg && mNetService != NULL)
|
if(!accept_new_msg)
|
||||||
mNetService->rejectMessage(msg->metaData->mMsgId) ; // This prevents reloading the message again at next sync.
|
messages_to_reject.push_back(msg->metaData->mMsgId); // This prevents reloading the message again at next sync.
|
||||||
|
|
||||||
if(!accept_new_msg || gpsi.mFirstTryTS + VALIDATE_MAX_WAITING_TIME < now)
|
if(!accept_new_msg || gpsi.mFirstTryTS + VALIDATE_MAX_WAITING_TIME < now)
|
||||||
{
|
{
|
||||||
@ -3037,234 +3037,6 @@ void RsGenExchange::processRecvdMessages()
|
|||||||
mNetService->rejectMessage(*it) ;
|
mNetService->rejectMessage(*it) ;
|
||||||
}
|
}
|
||||||
|
|
||||||
#ifdef OLD_VERSION
|
|
||||||
void RsGenExchange::processRecvdMessages()
|
|
||||||
{
|
|
||||||
std::list<RsGxsMessageId> messages_to_reject ;
|
|
||||||
|
|
||||||
{
|
|
||||||
RS_STACK_MUTEX(mGenMtx) ;
|
|
||||||
|
|
||||||
time_t now = time(NULL);
|
|
||||||
|
|
||||||
#ifdef GEN_EXCH_DEBUG
|
|
||||||
if(!mMsgPendingValidate.empty())
|
|
||||||
std::cerr << "processing received messages" << std::endl;
|
|
||||||
#endif
|
|
||||||
NxsMsgPendingVect::iterator pend_it = mMsgPendingValidate.begin();
|
|
||||||
|
|
||||||
for(; pend_it != mMsgPendingValidate.end();)
|
|
||||||
{
|
|
||||||
GxsPendingItem<RsNxsMsg*, RsGxsGrpMsgIdPair>& gpsi = *pend_it;
|
|
||||||
|
|
||||||
if(gpsi.mFirstTryTS + VALIDATE_MAX_WAITING_TIME < now)
|
|
||||||
{
|
|
||||||
std::cerr << "Pending validation grp=" << gpsi.mId.first << ", msg=" << gpsi.mId.second << ", has exceeded validation time limit. The author's key can probably not be obtained. This is unexpected." << std::endl;
|
|
||||||
|
|
||||||
delete gpsi.mItem;
|
|
||||||
pend_it = mMsgPendingValidate.erase(pend_it);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
#ifdef GEN_EXCH_DEBUG
|
|
||||||
std::cerr << " movign to recvd." << std::endl;
|
|
||||||
#endif
|
|
||||||
mReceivedMsgs.push_back(gpsi.mItem);
|
|
||||||
++pend_it;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if(mReceivedMsgs.empty())
|
|
||||||
return;
|
|
||||||
|
|
||||||
std::vector<RsNxsMsg*>::iterator vit = mReceivedMsgs.begin();
|
|
||||||
GxsMsgReq msgIds;
|
|
||||||
RsNxsMsgDataTemporaryList msgs;
|
|
||||||
RsGxsGrpMetaTemporaryMap grpMetas;
|
|
||||||
|
|
||||||
// coalesce group meta retrieval for performance
|
|
||||||
for(; vit != mReceivedMsgs.end(); ++vit)
|
|
||||||
{
|
|
||||||
RsNxsMsg* msg = *vit;
|
|
||||||
grpMetas.insert(std::make_pair(msg->grpId, (RsGxsGrpMetaData*)NULL));
|
|
||||||
}
|
|
||||||
|
|
||||||
mDataStore->retrieveGxsGrpMetaData(grpMetas);
|
|
||||||
|
|
||||||
#ifdef GEN_EXCH_DEBUG
|
|
||||||
std::cerr << " updating received messages:" << std::endl;
|
|
||||||
#endif
|
|
||||||
for(vit = mReceivedMsgs.begin(); vit != mReceivedMsgs.end(); ++vit)
|
|
||||||
{
|
|
||||||
RsNxsMsg* msg = *vit;
|
|
||||||
|
|
||||||
if(msg->metaData == NULL)
|
|
||||||
{
|
|
||||||
RsGxsMsgMetaData* meta = new RsGxsMsgMetaData();
|
|
||||||
|
|
||||||
if(msg->meta.bin_len != 0 && meta->deserialise(msg->meta.bin_data, &(msg->meta.bin_len)))
|
|
||||||
msg->metaData = meta;
|
|
||||||
else
|
|
||||||
delete meta;
|
|
||||||
}
|
|
||||||
|
|
||||||
// (cyril) Normally we should discard posts that are older than the sync request. But that causes a problem because
|
|
||||||
// RsGxsNetService requests posts to sync by chunks of 20. So if the 20 are discarded, they will be re-synced next time, and the sync process
|
|
||||||
// will indefinitly loop on the same 20 posts. Since the posts are there already, keeping them is the least problematique way to fix this problem.
|
|
||||||
//
|
|
||||||
// uint32_t max_sync_age = ( mNetService != NULL)?( mNetService->getSyncAge(msg->metaData->mGroupId)):RS_GXS_DEFAULT_MSG_REQ_PERIOD;
|
|
||||||
//
|
|
||||||
// if(max_sync_age != 0 && msg->metaData->mPublishTs + max_sync_age < time(NULL))
|
|
||||||
// {
|
|
||||||
// std::cerr << "(WW) not validating message " << msg->metaData->mMsgId << " in group " << msg->metaData->mGroupId << " because it is older than synchronisation limit. This message was probably sent by a friend node that does not accept sync limits already." << std::endl;
|
|
||||||
// ok = false ;
|
|
||||||
// }
|
|
||||||
|
|
||||||
#ifdef GEN_EXCH_DEBUG
|
|
||||||
std::cerr << " deserialised info: grp id=" << meta->mGroupId << ", msg id=" << meta->mMsgId ;
|
|
||||||
#endif
|
|
||||||
uint8_t validateReturn = VALIDATE_FAIL;
|
|
||||||
|
|
||||||
bool accept_new_msg = acceptNewMessage(msg->metaData,msg->msg.bin_len);
|
|
||||||
|
|
||||||
if(ok && !accept_new_msg && mNetService != NULL)
|
|
||||||
mNetService->rejectMessage(msg->metaData->mMsgId) ; // This prevents reloading the message again at next sync.
|
|
||||||
|
|
||||||
if(ok && accept_new_msg)
|
|
||||||
{
|
|
||||||
std::map<RsGxsGroupId, RsGxsGrpMetaData*>::iterator mit = grpMetas.find(msg->grpId);
|
|
||||||
|
|
||||||
#ifdef GEN_EXCH_DEBUG
|
|
||||||
std::cerr << " msg info : grp id=" << msg->grpId << ", msg id=" << msg->msgId << std::endl;
|
|
||||||
#endif
|
|
||||||
RsGxsGrpMetaData* grpMeta = NULL ;
|
|
||||||
|
|
||||||
// validate msg
|
|
||||||
if(mit != grpMetas.end())
|
|
||||||
{
|
|
||||||
grpMeta = mit->second;
|
|
||||||
GxsSecurity::createPublicKeysFromPrivateKeys(grpMeta->keys); // make sure we have the public keys that correspond to the private ones, as it happens. Most of the time this call does nothing.
|
|
||||||
|
|
||||||
validateReturn = validateMsg(msg, grpMeta->mGroupFlags, grpMeta->mSignFlags, grpMeta->keys);
|
|
||||||
|
|
||||||
#ifdef GEN_EXCH_DEBUG
|
|
||||||
std::cerr << " grpMeta.mSignFlags: " << std::hex << grpMeta->mSignFlags << std::dec << std::endl;
|
|
||||||
std::cerr << " grpMeta.mAuthFlags: " << std::hex << grpMeta->mAuthenFlags << std::dec << std::endl;
|
|
||||||
std::cerr << " message validation result: " << (int)validateReturn << std::endl;
|
|
||||||
#endif
|
|
||||||
}
|
|
||||||
|
|
||||||
if(validateReturn == VALIDATE_SUCCESS)
|
|
||||||
{
|
|
||||||
meta->mMsgStatus = GXS_SERV::GXS_MSG_STATUS_UNPROCESSED | GXS_SERV::GXS_MSG_STATUS_GUI_NEW | GXS_SERV::GXS_MSG_STATUS_GUI_UNREAD;
|
|
||||||
msgs.push_back(msg);
|
|
||||||
|
|
||||||
std::vector<RsGxsMessageId> &msgv = msgIds[msg->grpId];
|
|
||||||
if (std::find(msgv.begin(), msgv.end(), msg->msgId) == msgv.end())
|
|
||||||
{
|
|
||||||
msgv.push_back(msg->msgId);
|
|
||||||
}
|
|
||||||
|
|
||||||
NxsMsgPendingVect::iterator validated_entry = std::find(mMsgPendingValidate.begin(), mMsgPendingValidate.end(),
|
|
||||||
getMsgIdPair(*msg));
|
|
||||||
|
|
||||||
if(validated_entry != mMsgPendingValidate.end()) mMsgPendingValidate.erase(validated_entry);
|
|
||||||
|
|
||||||
computeHash(msg->msg, meta->mHash);
|
|
||||||
meta->recvTS = time(NULL);
|
|
||||||
#ifdef GEN_EXCH_DEBUG
|
|
||||||
std::cerr << " new status flags: " << meta->mMsgStatus << std::endl;
|
|
||||||
std::cerr << " computed hash: " << meta->mHash << std::endl;
|
|
||||||
std::cerr << "Message received. Identity=" << msg->metaData->mAuthorId << ", from peer " << msg->PeerId() << std::endl;
|
|
||||||
#endif
|
|
||||||
|
|
||||||
if(!msg->metaData->mAuthorId.isNull())
|
|
||||||
mRoutingClues[msg->metaData->mAuthorId].insert(msg->PeerId()) ;
|
|
||||||
}
|
|
||||||
|
|
||||||
if(validateReturn == VALIDATE_FAIL)
|
|
||||||
{
|
|
||||||
// In this case, we notify the network exchange service not to DL the message again, at least not yet.
|
|
||||||
|
|
||||||
#ifdef GEN_EXCH_DEBUG
|
|
||||||
std::cerr << "Notifying the network service to not download this message again." << std::endl;
|
|
||||||
#endif
|
|
||||||
messages_to_reject.push_back(msg->msgId) ;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
#ifdef GEN_EXCH_DEBUG
|
|
||||||
std::cerr << " deserialisation failed!" <<std::endl;
|
|
||||||
#endif
|
|
||||||
validateReturn = VALIDATE_FAIL;
|
|
||||||
}
|
|
||||||
|
|
||||||
if(validateReturn == VALIDATE_FAIL)
|
|
||||||
{
|
|
||||||
|
|
||||||
#ifdef GEN_EXCH_DEBUG
|
|
||||||
std::cerr << "Validation failed for message id "
|
|
||||||
<< "msg->grpId: " << msg->grpId << ", msgId: " << msg->msgId << std::endl;
|
|
||||||
#endif
|
|
||||||
|
|
||||||
NxsMsgPendingVect::iterator failed_entry = std::find(mMsgPendingValidate.begin(), mMsgPendingValidate.end(),
|
|
||||||
getMsgIdPair(*msg));
|
|
||||||
|
|
||||||
if(failed_entry != mMsgPendingValidate.end()) mMsgPendingValidate.erase(failed_entry);
|
|
||||||
delete msg;
|
|
||||||
|
|
||||||
|
|
||||||
}
|
|
||||||
else if(validateReturn == VALIDATE_FAIL_TRY_LATER)
|
|
||||||
{
|
|
||||||
|
|
||||||
#ifdef GEN_EXCH_DEBUG
|
|
||||||
std::cerr << "failed to validate msg, trying again: "
|
|
||||||
<< "msg->grpId: " << msg->grpId << ", msgId: " << msg->msgId << std::endl;
|
|
||||||
#endif
|
|
||||||
|
|
||||||
RsGxsGrpMsgIdPair id;
|
|
||||||
id.first = msg->grpId;
|
|
||||||
id.second = msg->msgId;
|
|
||||||
|
|
||||||
// first check you haven't made too many attempts
|
|
||||||
|
|
||||||
NxsMsgPendingVect::iterator vit = std::find(mMsgPendingValidate.begin(), mMsgPendingValidate.end(), id);
|
|
||||||
|
|
||||||
if(vit == mMsgPendingValidate.end())
|
|
||||||
mMsgPendingValidate.push_back(GxsPendingItem<RsNxsMsg*, RsGxsGrpMsgIdPair>(msg, id,time(NULL)));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if(!msgIds.empty())
|
|
||||||
{
|
|
||||||
#ifdef GEN_EXCH_DEBUG
|
|
||||||
std::cerr << " removing existing and old messages from incoming list." << std::endl;
|
|
||||||
#endif
|
|
||||||
removeDeleteExistingMessages(msgs, msgIds);
|
|
||||||
|
|
||||||
#ifdef GEN_EXCH_DEBUG
|
|
||||||
std::cerr << " storing remaining messages" << std::endl;
|
|
||||||
#endif
|
|
||||||
mDataStore->storeMessage(msgs);
|
|
||||||
|
|
||||||
RsGxsMsgChange* c = new RsGxsMsgChange(RsGxsNotify::TYPE_RECEIVE, false);
|
|
||||||
c->msgChangeMap = msgIds;
|
|
||||||
mNotifications.push_back(c);
|
|
||||||
}
|
|
||||||
|
|
||||||
mReceivedMsgs.clear();
|
|
||||||
}
|
|
||||||
|
|
||||||
// Done off-mutex to avoid cross deadlocks in the netservice that might call the RsGenExchange as an observer..
|
|
||||||
|
|
||||||
if(mNetService != NULL)
|
|
||||||
for(std::list<RsGxsMessageId>::const_iterator it(messages_to_reject.begin());it!=messages_to_reject.end();++it)
|
|
||||||
mNetService->rejectMessage(*it) ;
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
bool RsGenExchange::acceptNewGroup(const RsGxsGrpMetaData* /*grpMeta*/ ) { return true; }
|
bool RsGenExchange::acceptNewGroup(const RsGxsGrpMetaData* /*grpMeta*/ ) { return true; }
|
||||||
bool RsGenExchange::acceptNewMessage(const RsGxsMsgMetaData* /*grpMeta*/,uint32_t /*size*/ ) { return true; }
|
bool RsGenExchange::acceptNewMessage(const RsGxsMsgMetaData* /*grpMeta*/,uint32_t /*size*/ ) { return true; }
|
||||||
|
|
||||||
@ -3397,148 +3169,6 @@ void RsGenExchange::processRecvdGroups()
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#ifdef OLD_VERSION
|
|
||||||
void RsGenExchange::processRecvdGroups()
|
|
||||||
{
|
|
||||||
RS_STACK_MUTEX(mGenMtx) ;
|
|
||||||
|
|
||||||
if(mReceivedGrps.empty())
|
|
||||||
return;
|
|
||||||
|
|
||||||
#ifdef GEN_EXCH_DEBUG
|
|
||||||
std::cerr << "RsGenExchange::Processing received groups" << std::endl;
|
|
||||||
#endif
|
|
||||||
NxsGrpPendValidVect::iterator vit = mReceivedGrps.begin();
|
|
||||||
std::vector<RsGxsGroupId> existingGrpIds;
|
|
||||||
std::list<RsGxsGroupId> grpIds;
|
|
||||||
|
|
||||||
RsNxsGrpDataTemporaryList grps;
|
|
||||||
|
|
||||||
mDataStore->retrieveGroupIds(existingGrpIds);
|
|
||||||
|
|
||||||
while( vit != mReceivedGrps.end())
|
|
||||||
{
|
|
||||||
GxsPendingItem<RsNxsGrp*, RsGxsGroupId>& gpsi = *vit;
|
|
||||||
RsNxsGrp* grp = gpsi.mItem;
|
|
||||||
|
|
||||||
if(grp->metaData == NULL)
|
|
||||||
{
|
|
||||||
RsGxsGrpMetaData* meta = new RsGxsGrpMetaData();
|
|
||||||
|
|
||||||
if(grp->meta.bin_len != 0 && meta->deserialise(grp->meta.bin_data, grp->meta.bin_len))
|
|
||||||
grp->metaData = meta ;
|
|
||||||
else
|
|
||||||
delete meta ;
|
|
||||||
}
|
|
||||||
|
|
||||||
bool erase = true;
|
|
||||||
|
|
||||||
if(grp->metaData != NULL && acceptNewGroup(grp->metaData))
|
|
||||||
{
|
|
||||||
#ifdef GEN_EXCH_DEBUG
|
|
||||||
std::cerr << " processing validation for group " << meta->mGroupId << ", original attempt time: " << time(NULL) - gpsi.mFirstTryTS << " seconds ago" << std::endl;
|
|
||||||
#endif
|
|
||||||
uint8_t ret = validateGrp(grp);
|
|
||||||
|
|
||||||
if(ret == VALIDATE_SUCCESS)
|
|
||||||
{
|
|
||||||
grp->metaData->mGroupStatus = GXS_SERV::GXS_GRP_STATUS_UNPROCESSED | GXS_SERV::GXS_GRP_STATUS_UNREAD;
|
|
||||||
|
|
||||||
computeHash(grp->grp, grp->metaData->mHash);
|
|
||||||
|
|
||||||
// group has been validated. Let's notify the global router for the clue
|
|
||||||
|
|
||||||
if(!meta->mAuthorId.isNull())
|
|
||||||
{
|
|
||||||
#ifdef GEN_EXCH_DEBUG
|
|
||||||
std::cerr << "Group routage info: Identity=" << meta->mAuthorId << " from " << grp->PeerId() << std::endl;
|
|
||||||
#endif
|
|
||||||
|
|
||||||
mRoutingClues[meta->mAuthorId].insert(grp->PeerId()) ;
|
|
||||||
}
|
|
||||||
|
|
||||||
// This has been moved here (as opposed to inside part for new groups below) because it is used to update the server TS when updates
|
|
||||||
// of grp metadata arrive.
|
|
||||||
|
|
||||||
meta->mRecvTS = time(NULL);
|
|
||||||
|
|
||||||
// now check if group already existss
|
|
||||||
if(std::find(existingGrpIds.begin(), existingGrpIds.end(), grp->grpId) == existingGrpIds.end())
|
|
||||||
{
|
|
||||||
//if(meta->mCircleType == GXS_CIRCLE_TYPE_YOUREYESONLY)
|
|
||||||
meta->mOriginator = grp->PeerId();
|
|
||||||
|
|
||||||
meta->mSubscribeFlags = GXS_SERV::GROUP_SUBSCRIBE_NOT_SUBSCRIBED;
|
|
||||||
|
|
||||||
grps.push_back(grp);
|
|
||||||
grpIds.push_back(grp->grpId);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
GroupUpdate update;
|
|
||||||
update.newGrp = grp;
|
|
||||||
mGroupUpdates.push_back(update);
|
|
||||||
}
|
|
||||||
erase = true;
|
|
||||||
}
|
|
||||||
else if(ret == VALIDATE_FAIL)
|
|
||||||
{
|
|
||||||
#ifdef GEN_EXCH_DEBUG
|
|
||||||
std::cerr << " failed to validate incoming meta, grpId: " << grp->grpId << ": wrong signature" << std::endl;
|
|
||||||
#endif
|
|
||||||
delete grp;
|
|
||||||
erase = true;
|
|
||||||
}
|
|
||||||
else if(ret == VALIDATE_FAIL_TRY_LATER)
|
|
||||||
{
|
|
||||||
|
|
||||||
#ifdef GEN_EXCH_DEBUG
|
|
||||||
std::cerr << " failed to validate incoming grp, trying again. grpId: " << grp->grpId << std::endl;
|
|
||||||
#endif
|
|
||||||
|
|
||||||
if(gpsi.mFirstTryTS + VALIDATE_MAX_WAITING_TIME < time(NULL))
|
|
||||||
{
|
|
||||||
#ifdef GEN_EXCH_DEBUG
|
|
||||||
std::cerr << " validation time got group " << grp->grpId << " exceeded maximum. Will delete group " << std::endl;
|
|
||||||
#endif
|
|
||||||
delete grp;
|
|
||||||
erase = true;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
erase = false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
if(!deserialOk)
|
|
||||||
std::cerr << "(EE) deserialise error in group meta data" << std::endl;
|
|
||||||
|
|
||||||
delete grp;
|
|
||||||
delete meta;
|
|
||||||
erase = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
if(erase)
|
|
||||||
vit = mReceivedGrps.erase(vit);
|
|
||||||
else
|
|
||||||
++vit;
|
|
||||||
}
|
|
||||||
|
|
||||||
if(!grpIds.empty())
|
|
||||||
{
|
|
||||||
RsGxsGroupChange* c = new RsGxsGroupChange(RsGxsNotify::TYPE_RECEIVE, false);
|
|
||||||
c->mGrpIdList = grpIds;
|
|
||||||
mNotifications.push_back(c);
|
|
||||||
mDataStore->storeGroup(grps);
|
|
||||||
#ifdef GEN_EXCH_DEBUG
|
|
||||||
std::cerr << " adding the following grp ids to notification: " << std::endl;
|
|
||||||
for(std::list<RsGxsGroupId>::const_iterator it(grpIds.begin());it!=grpIds.end();++it)
|
|
||||||
std::cerr << " " << *it << std::endl;
|
|
||||||
#endif
|
|
||||||
}
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
void RsGenExchange::performUpdateValidation()
|
void RsGenExchange::performUpdateValidation()
|
||||||
{
|
{
|
||||||
RS_STACK_MUTEX(mGenMtx) ;
|
RS_STACK_MUTEX(mGenMtx) ;
|
||||||
|
Loading…
Reference in New Issue
Block a user