mirror of
https://github.com/RetroShare/RetroShare.git
synced 2025-01-25 23:06:10 -05:00
rewrote processRecvdMessages() and processRecvdGroups() in RsGenExchange. Removed mReceivedGrps and mReceivedMsgs, cleaned-up terrible branching and memory management
This commit is contained in:
parent
461ccf3b84
commit
041f989f1c
@ -1339,11 +1339,16 @@ bool RsGenExchange::deserializeGroupData(unsigned char *data, uint32_t size,
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
mReceivedGrps.push_back(
|
if(mGrpPendingValidate.find(nxs_grp->grpId) != mGrpPendingValidate.end())
|
||||||
GxsPendingItem<RsNxsGrp*, RsGxsGroupId>(
|
{
|
||||||
nxs_grp, nxs_grp->grpId,time(NULL)) );
|
std::cerr << "(WW) Group " << nxs_grp->grpId << " is already pending validation. Not adding again." << std::endl;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
if(gId) *gId = nxs_grp->grpId;
|
if(gId)
|
||||||
|
*gId = nxs_grp->grpId;
|
||||||
|
|
||||||
|
mGrpPendingValidate.insert(std::make_pair(nxs_grp->grpId, GxsPendingItem<RsNxsGrp*, RsGxsGroupId>(nxs_grp, nxs_grp->grpId,time(NULL))));
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
@ -1576,20 +1581,18 @@ void RsGenExchange::notifyNewGroups(std::vector<RsNxsGrp *> &groups)
|
|||||||
for(; vit != groups.end(); ++vit)
|
for(; vit != groups.end(); ++vit)
|
||||||
{
|
{
|
||||||
RsNxsGrp* grp = *vit;
|
RsNxsGrp* grp = *vit;
|
||||||
NxsGrpPendValidVect::iterator received = std::find(mReceivedGrps.begin(),
|
NxsGrpPendValidVect::iterator received = mGrpPendingValidate.find(grp->grpId);
|
||||||
mReceivedGrps.end(), grp->grpId);
|
|
||||||
|
|
||||||
// drop group if you already have them
|
// drop group if you already have them
|
||||||
// TODO: move this to nxs layer to save bandwidth
|
// TODO: move this to nxs layer to save bandwidth
|
||||||
if(received == mReceivedGrps.end())
|
if(received == mGrpPendingValidate.end())
|
||||||
{
|
{
|
||||||
#ifdef GEN_EXCH_DEBUG
|
#ifdef GEN_EXCH_DEBUG
|
||||||
std::cerr << "RsGenExchange::notifyNewGroups() Received GrpId: " << grp->grpId;
|
std::cerr << "RsGenExchange::notifyNewGroups() Received GrpId: " << grp->grpId;
|
||||||
std::cerr << std::endl;
|
std::cerr << std::endl;
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
GxsPendingItem<RsNxsGrp*, RsGxsGroupId> gpsi(grp, grp->grpId,time(NULL));
|
mGrpPendingValidate.insert(std::make_pair(grp->grpId, GxsPendingItem<RsNxsGrp*, RsGxsGroupId>(grp, grp->grpId,time(NULL))));
|
||||||
mReceivedGrps.push_back(gpsi);
|
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
@ -1604,37 +1607,36 @@ void RsGenExchange::notifyNewMessages(std::vector<RsNxsMsg *>& messages)
|
|||||||
{
|
{
|
||||||
RS_STACK_MUTEX(mGenMtx) ;
|
RS_STACK_MUTEX(mGenMtx) ;
|
||||||
|
|
||||||
std::vector<RsNxsMsg*>::iterator vit = messages.begin();
|
// store these for tick() to pick them up
|
||||||
|
|
||||||
// store these for tick() to pick them up
|
for(uint32_t i=0;i<messages.size();++i)
|
||||||
for(; vit != messages.end(); ++vit)
|
|
||||||
{
|
|
||||||
RsNxsMsg* msg = *vit;
|
|
||||||
|
|
||||||
NxsMsgPendingVect::iterator it =
|
|
||||||
std::find(mMsgPendingValidate.begin(), mMsgPendingValidate.end(), getMsgIdPair(*msg));
|
|
||||||
|
|
||||||
// if we have msg already just delete it
|
|
||||||
if(it == mMsgPendingValidate.end())
|
|
||||||
{
|
{
|
||||||
#ifdef GEN_EXCH_DEBUG
|
RsNxsMsg* msg = messages[i];
|
||||||
std::cerr << "RsGenExchange::notifyNewMessages() Received Msg: ";
|
NxsMsgPendingVect::iterator it = mMsgPendingValidate.find(msg->msgId) ;
|
||||||
std::cerr << " GrpId: " << msg->grpId;
|
|
||||||
std::cerr << " MsgId: " << msg->msgId;
|
|
||||||
std::cerr << std::endl;
|
|
||||||
#endif
|
|
||||||
|
|
||||||
mReceivedMsgs.push_back(msg);
|
// if we have msg already just delete it
|
||||||
}
|
if(it == mMsgPendingValidate.end())
|
||||||
else
|
{
|
||||||
{
|
|
||||||
#ifdef GEN_EXCH_DEBUG
|
#ifdef GEN_EXCH_DEBUG
|
||||||
std::cerr << " message is already in pending validation list. dropping." << std::endl;
|
std::cerr << "RsGenExchange::notifyNewMessages() Received Msg: ";
|
||||||
|
std::cerr << " GrpId: " << msg->grpId;
|
||||||
|
std::cerr << " MsgId: " << msg->msgId;
|
||||||
|
std::cerr << std::endl;
|
||||||
#endif
|
#endif
|
||||||
delete msg;
|
RsGxsGrpMsgIdPair id;
|
||||||
}
|
id.first = msg->grpId;
|
||||||
}
|
id.second = msg->msgId;
|
||||||
|
|
||||||
|
mMsgPendingValidate.insert(std::make_pair(msg->msgId,GxsPendingItem<RsNxsMsg*, RsGxsGrpMsgIdPair>(msg, id,time(NULL))));
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
#ifdef GEN_EXCH_DEBUG
|
||||||
|
std::cerr << " message is already in pending validation list. dropping." << std::endl;
|
||||||
|
#endif
|
||||||
|
delete msg;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void RsGenExchange::notifyReceivePublishKey(const RsGxsGroupId &grpId)
|
void RsGenExchange::notifyReceivePublishKey(const RsGxsGroupId &grpId)
|
||||||
@ -2854,6 +2856,188 @@ void RsGenExchange::computeHash(const RsTlvBinaryData& data, RsFileHash& hash)
|
|||||||
pHash.Complete(hash);
|
pHash.Complete(hash);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void RsGenExchange::processRecvdMessages()
|
||||||
|
{
|
||||||
|
std::list<RsGxsMessageId> messages_to_reject ;
|
||||||
|
|
||||||
|
{
|
||||||
|
RS_STACK_MUTEX(mGenMtx) ;
|
||||||
|
|
||||||
|
time_t now = time(NULL);
|
||||||
|
|
||||||
|
#ifdef GEN_EXCH_DEBUG
|
||||||
|
if(!mMsgPendingValidate.empty())
|
||||||
|
std::cerr << "processing received messages" << std::endl;
|
||||||
|
#endif
|
||||||
|
// 1 - First, make sure items metadata is deserialised, clean old failed items, and collect the groups Ids we have to check
|
||||||
|
|
||||||
|
RsGxsGrpMetaTemporaryMap grpMetas;
|
||||||
|
|
||||||
|
for(NxsMsgPendingVect::iterator pend_it = mMsgPendingValidate.begin();pend_it != mMsgPendingValidate.end();)
|
||||||
|
{
|
||||||
|
GxsPendingItem<RsNxsMsg*, RsGxsGrpMsgIdPair>& gpsi = pend_it->second;
|
||||||
|
RsNxsMsg *msg = gpsi.mItem ;
|
||||||
|
|
||||||
|
if(msg->metaData == NULL)
|
||||||
|
{
|
||||||
|
RsGxsMsgMetaData* meta = new RsGxsMsgMetaData();
|
||||||
|
|
||||||
|
if(msg->meta.bin_len != 0 && meta->deserialise(msg->meta.bin_data, &(msg->meta.bin_len)))
|
||||||
|
msg->metaData = meta;
|
||||||
|
else
|
||||||
|
delete meta;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool accept_new_msg = msg->metaData != NULL && acceptNewMessage(msg->metaData,msg->msg.bin_len);
|
||||||
|
|
||||||
|
if(!accept_new_msg && mNetService != NULL)
|
||||||
|
mNetService->rejectMessage(msg->metaData->mMsgId) ; // This prevents reloading the message again at next sync.
|
||||||
|
|
||||||
|
if(!accept_new_msg || gpsi.mFirstTryTS + VALIDATE_MAX_WAITING_TIME < now)
|
||||||
|
{
|
||||||
|
std::cerr << "Pending validation grp=" << gpsi.mId.first << ", msg=" << gpsi.mId.second << ", has exceeded validation time limit. The author's key can probably not be obtained. This is unexpected." << std::endl;
|
||||||
|
|
||||||
|
delete gpsi.mItem;
|
||||||
|
pend_it = mMsgPendingValidate.erase(pend_it);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
grpMetas.insert(std::make_pair(pend_it->second.mItem->grpId, (RsGxsGrpMetaData*)NULL));
|
||||||
|
++pend_it;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 2 - Retrieve the metadata for the associated groups.
|
||||||
|
|
||||||
|
mDataStore->retrieveGxsGrpMetaData(grpMetas);
|
||||||
|
|
||||||
|
GxsMsgReq msgIds;
|
||||||
|
RsNxsMsgDataTemporaryList msgs_to_store;
|
||||||
|
|
||||||
|
#ifdef GEN_EXCH_DEBUG
|
||||||
|
std::cerr << " updating received messages:" << std::endl;
|
||||||
|
#endif
|
||||||
|
|
||||||
|
// 3 - Validate each message
|
||||||
|
|
||||||
|
for(NxsMsgPendingVect::iterator pend_it = mMsgPendingValidate.begin();pend_it != mMsgPendingValidate.end();)
|
||||||
|
{
|
||||||
|
RsNxsMsg* msg = pend_it->second.mItem;
|
||||||
|
|
||||||
|
// (cyril) Normally we should discard posts that are older than the sync request. But that causes a problem because
|
||||||
|
// RsGxsNetService requests posts to sync by chunks of 20. So if the 20 are discarded, they will be re-synced next time, and the sync process
|
||||||
|
// will indefinitly loop on the same 20 posts. Since the posts are there already, keeping them is the least problematique way to fix this problem.
|
||||||
|
//
|
||||||
|
// uint32_t max_sync_age = ( mNetService != NULL)?( mNetService->getSyncAge(msg->metaData->mGroupId)):RS_GXS_DEFAULT_MSG_REQ_PERIOD;
|
||||||
|
//
|
||||||
|
// if(max_sync_age != 0 && msg->metaData->mPublishTs + max_sync_age < time(NULL))
|
||||||
|
// {
|
||||||
|
// std::cerr << "(WW) not validating message " << msg->metaData->mMsgId << " in group " << msg->metaData->mGroupId << " because it is older than synchronisation limit. This message was probably sent by a friend node that does not accept sync limits already." << std::endl;
|
||||||
|
// ok = false ;
|
||||||
|
// }
|
||||||
|
|
||||||
|
#ifdef GEN_EXCH_DEBUG
|
||||||
|
std::cerr << " deserialised info: grp id=" << meta->mGroupId << ", msg id=" << meta->mMsgId ;
|
||||||
|
#endif
|
||||||
|
std::map<RsGxsGroupId, RsGxsGrpMetaData*>::iterator mit = grpMetas.find(msg->grpId);
|
||||||
|
|
||||||
|
#ifdef GEN_EXCH_DEBUG
|
||||||
|
std::cerr << " msg info : grp id=" << msg->grpId << ", msg id=" << msg->msgId << std::endl;
|
||||||
|
#endif
|
||||||
|
// validate msg
|
||||||
|
|
||||||
|
if(mit == grpMetas.end())
|
||||||
|
{
|
||||||
|
std::cerr << "RsGenExchange::processRecvdMessages(): impossible situation: grp meta " << msg->grpId << " not available." << std::endl;
|
||||||
|
++pend_it ;
|
||||||
|
continue ;
|
||||||
|
}
|
||||||
|
|
||||||
|
RsGxsGrpMetaData *grpMeta = mit->second;
|
||||||
|
|
||||||
|
GxsSecurity::createPublicKeysFromPrivateKeys(grpMeta->keys); // make sure we have the public keys that correspond to the private ones, as it happens. Most of the time this call does nothing.
|
||||||
|
|
||||||
|
int validateReturn = validateMsg(msg, grpMeta->mGroupFlags, grpMeta->mSignFlags, grpMeta->keys);
|
||||||
|
|
||||||
|
#ifdef GEN_EXCH_DEBUG
|
||||||
|
std::cerr << " grpMeta.mSignFlags: " << std::hex << grpMeta->mSignFlags << std::dec << std::endl;
|
||||||
|
std::cerr << " grpMeta.mAuthFlags: " << std::hex << grpMeta->mAuthenFlags << std::dec << std::endl;
|
||||||
|
std::cerr << " message validation result: " << (int)validateReturn << std::endl;
|
||||||
|
#endif
|
||||||
|
|
||||||
|
if(validateReturn == VALIDATE_SUCCESS)
|
||||||
|
{
|
||||||
|
msg->metaData->mMsgStatus = GXS_SERV::GXS_MSG_STATUS_UNPROCESSED | GXS_SERV::GXS_MSG_STATUS_GUI_NEW | GXS_SERV::GXS_MSG_STATUS_GUI_UNREAD;
|
||||||
|
msgs_to_store.push_back(msg);
|
||||||
|
|
||||||
|
std::vector<RsGxsMessageId> &msgv = msgIds[msg->grpId];
|
||||||
|
|
||||||
|
if (std::find(msgv.begin(), msgv.end(), msg->msgId) == msgv.end())
|
||||||
|
msgv.push_back(msg->msgId);
|
||||||
|
|
||||||
|
computeHash(msg->msg, msg->metaData->mHash);
|
||||||
|
msg->metaData->recvTS = time(NULL);
|
||||||
|
|
||||||
|
#ifdef GEN_EXCH_DEBUG
|
||||||
|
std::cerr << " new status flags: " << meta->mMsgStatus << std::endl;
|
||||||
|
std::cerr << " computed hash: " << meta->mHash << std::endl;
|
||||||
|
std::cerr << "Message received. Identity=" << msg->metaData->mAuthorId << ", from peer " << msg->PeerId() << std::endl;
|
||||||
|
#endif
|
||||||
|
|
||||||
|
if(!msg->metaData->mAuthorId.isNull())
|
||||||
|
mRoutingClues[msg->metaData->mAuthorId].insert(msg->PeerId()) ;
|
||||||
|
}
|
||||||
|
else if(validateReturn == VALIDATE_FAIL)
|
||||||
|
{
|
||||||
|
// In this case, we notify the network exchange service not to DL the message again, at least not yet.
|
||||||
|
|
||||||
|
#ifdef GEN_EXCH_DEBUG
|
||||||
|
std::cerr << "Validation failed for message id "
|
||||||
|
<< "msg->grpId: " << msg->grpId << ", msgId: " << msg->msgId << std::endl;
|
||||||
|
#endif
|
||||||
|
messages_to_reject.push_back(msg->msgId) ;
|
||||||
|
delete msg ;
|
||||||
|
}
|
||||||
|
else if(validateReturn == VALIDATE_FAIL_TRY_LATER)
|
||||||
|
{
|
||||||
|
++pend_it ;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Remove the entry from mMsgPendingValidate, but do not delete msg since it's either pushed into msg_to_store or deleted in the FAIL case!
|
||||||
|
|
||||||
|
NxsMsgPendingVect::iterator tmp = pend_it ;
|
||||||
|
++tmp ;
|
||||||
|
mMsgPendingValidate.erase(pend_it) ;
|
||||||
|
pend_it = tmp ;
|
||||||
|
}
|
||||||
|
|
||||||
|
if(!msgIds.empty())
|
||||||
|
{
|
||||||
|
#ifdef GEN_EXCH_DEBUG
|
||||||
|
std::cerr << " removing existing and old messages from incoming list." << std::endl;
|
||||||
|
#endif
|
||||||
|
removeDeleteExistingMessages(msgs_to_store, msgIds);
|
||||||
|
|
||||||
|
#ifdef GEN_EXCH_DEBUG
|
||||||
|
std::cerr << " storing remaining messages" << std::endl;
|
||||||
|
#endif
|
||||||
|
mDataStore->storeMessage(msgs_to_store);
|
||||||
|
|
||||||
|
RsGxsMsgChange* c = new RsGxsMsgChange(RsGxsNotify::TYPE_RECEIVE, false);
|
||||||
|
c->msgChangeMap = msgIds;
|
||||||
|
mNotifications.push_back(c);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Done off-mutex to avoid cross deadlocks in the netservice that might call the RsGenExchange as an observer..
|
||||||
|
|
||||||
|
if(mNetService != NULL)
|
||||||
|
for(std::list<RsGxsMessageId>::const_iterator it(messages_to_reject.begin());it!=messages_to_reject.end();++it)
|
||||||
|
mNetService->rejectMessage(*it) ;
|
||||||
|
}
|
||||||
|
|
||||||
|
#ifdef OLD_VERSION
|
||||||
void RsGenExchange::processRecvdMessages()
|
void RsGenExchange::processRecvdMessages()
|
||||||
{
|
{
|
||||||
std::list<RsGxsMessageId> messages_to_reject ;
|
std::list<RsGxsMessageId> messages_to_reject ;
|
||||||
@ -2913,14 +3097,16 @@ void RsGenExchange::processRecvdMessages()
|
|||||||
for(vit = mReceivedMsgs.begin(); vit != mReceivedMsgs.end(); ++vit)
|
for(vit = mReceivedMsgs.begin(); vit != mReceivedMsgs.end(); ++vit)
|
||||||
{
|
{
|
||||||
RsNxsMsg* msg = *vit;
|
RsNxsMsg* msg = *vit;
|
||||||
RsGxsMsgMetaData* meta = new RsGxsMsgMetaData();
|
|
||||||
|
|
||||||
bool ok = false;
|
if(msg->metaData == NULL)
|
||||||
|
{
|
||||||
|
RsGxsMsgMetaData* meta = new RsGxsMsgMetaData();
|
||||||
|
|
||||||
if(msg->meta.bin_len != 0)
|
if(msg->meta.bin_len != 0 && meta->deserialise(msg->meta.bin_data, &(msg->meta.bin_len)))
|
||||||
ok = meta->deserialise(msg->meta.bin_data, &(msg->meta.bin_len));
|
msg->metaData = meta;
|
||||||
|
else
|
||||||
msg->metaData = meta;
|
delete meta;
|
||||||
|
}
|
||||||
|
|
||||||
// (cyril) Normally we should discard posts that are older than the sync request. But that causes a problem because
|
// (cyril) Normally we should discard posts that are older than the sync request. But that causes a problem because
|
||||||
// RsGxsNetService requests posts to sync by chunks of 20. So if the 20 are discarded, they will be re-synced next time, and the sync process
|
// RsGxsNetService requests posts to sync by chunks of 20. So if the 20 are discarded, they will be re-synced next time, and the sync process
|
||||||
@ -2938,10 +3124,11 @@ void RsGenExchange::processRecvdMessages()
|
|||||||
std::cerr << " deserialised info: grp id=" << meta->mGroupId << ", msg id=" << meta->mMsgId ;
|
std::cerr << " deserialised info: grp id=" << meta->mGroupId << ", msg id=" << meta->mMsgId ;
|
||||||
#endif
|
#endif
|
||||||
uint8_t validateReturn = VALIDATE_FAIL;
|
uint8_t validateReturn = VALIDATE_FAIL;
|
||||||
bool accept_new_msg = acceptNewMessage(meta,msg->msg.bin_len);
|
|
||||||
|
|
||||||
if(!accept_new_msg && mNetService != NULL)
|
bool accept_new_msg = acceptNewMessage(msg->metaData,msg->msg.bin_len);
|
||||||
mNetService->rejectMessage(meta->mMsgId) ; // This prevents reloading the message again at next sync.
|
|
||||||
|
if(ok && !accept_new_msg && mNetService != NULL)
|
||||||
|
mNetService->rejectMessage(msg->metaData->mMsgId) ; // This prevents reloading the message again at next sync.
|
||||||
|
|
||||||
if(ok && accept_new_msg)
|
if(ok && accept_new_msg)
|
||||||
{
|
{
|
||||||
@ -3076,10 +3263,141 @@ void RsGenExchange::processRecvdMessages()
|
|||||||
for(std::list<RsGxsMessageId>::const_iterator it(messages_to_reject.begin());it!=messages_to_reject.end();++it)
|
for(std::list<RsGxsMessageId>::const_iterator it(messages_to_reject.begin());it!=messages_to_reject.end();++it)
|
||||||
mNetService->rejectMessage(*it) ;
|
mNetService->rejectMessage(*it) ;
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
bool RsGenExchange::acceptNewGroup(const RsGxsGrpMetaData* /*grpMeta*/ ) { return true; }
|
bool RsGenExchange::acceptNewGroup(const RsGxsGrpMetaData* /*grpMeta*/ ) { return true; }
|
||||||
bool RsGenExchange::acceptNewMessage(const RsGxsMsgMetaData* /*grpMeta*/,uint32_t /*size*/ ) { return true; }
|
bool RsGenExchange::acceptNewMessage(const RsGxsMsgMetaData* /*grpMeta*/,uint32_t /*size*/ ) { return true; }
|
||||||
|
|
||||||
|
void RsGenExchange::processRecvdGroups()
|
||||||
|
{
|
||||||
|
RS_STACK_MUTEX(mGenMtx) ;
|
||||||
|
|
||||||
|
if(mGrpPendingValidate.empty())
|
||||||
|
return;
|
||||||
|
|
||||||
|
#ifdef GEN_EXCH_DEBUG
|
||||||
|
std::cerr << "RsGenExchange::Processing received groups" << std::endl;
|
||||||
|
#endif
|
||||||
|
std::list<RsGxsGroupId> grpIds;
|
||||||
|
RsNxsGrpDataTemporaryList grps_to_store;
|
||||||
|
|
||||||
|
// 1 - retrieve the existing groups so as to check what's not new
|
||||||
|
std::vector<RsGxsGroupId> existingGrpIds;
|
||||||
|
mDataStore->retrieveGroupIds(existingGrpIds);
|
||||||
|
|
||||||
|
// 2 - go through each and every new group data and validate the signatures.
|
||||||
|
|
||||||
|
for(NxsGrpPendValidVect::iterator vit = mGrpPendingValidate.begin(); vit != mGrpPendingValidate.end();)
|
||||||
|
{
|
||||||
|
GxsPendingItem<RsNxsGrp*, RsGxsGroupId>& gpsi = vit->second;
|
||||||
|
RsNxsGrp* grp = gpsi.mItem;
|
||||||
|
|
||||||
|
#ifdef GEN_EXCH_DEBUG
|
||||||
|
std::cerr << " processing validation for group " << meta->mGroupId << ", original attempt time: " << time(NULL) - gpsi.mFirstTryTS << " seconds ago" << std::endl;
|
||||||
|
#endif
|
||||||
|
if(grp->metaData == NULL)
|
||||||
|
{
|
||||||
|
RsGxsGrpMetaData* meta = new RsGxsGrpMetaData();
|
||||||
|
|
||||||
|
if(grp->meta.bin_len != 0 && meta->deserialise(grp->meta.bin_data, grp->meta.bin_len))
|
||||||
|
grp->metaData = meta ;
|
||||||
|
else
|
||||||
|
delete meta ;
|
||||||
|
}
|
||||||
|
|
||||||
|
// early deletion of group from the pending list if it's malformed, not accepted, or has been tried unsuccessfully for too long
|
||||||
|
|
||||||
|
if(grp->metaData == NULL || !acceptNewGroup(grp->metaData) || gpsi.mFirstTryTS + VALIDATE_MAX_WAITING_TIME < time(NULL))
|
||||||
|
{
|
||||||
|
NxsGrpPendValidVect::iterator tmp(vit) ;
|
||||||
|
++tmp ;
|
||||||
|
delete grp ;
|
||||||
|
mGrpPendingValidate.erase(vit) ;
|
||||||
|
vit = tmp ;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// group signature validation
|
||||||
|
|
||||||
|
uint8_t ret = validateGrp(grp);
|
||||||
|
|
||||||
|
if(ret == VALIDATE_SUCCESS)
|
||||||
|
{
|
||||||
|
grp->metaData->mGroupStatus = GXS_SERV::GXS_GRP_STATUS_UNPROCESSED | GXS_SERV::GXS_GRP_STATUS_UNREAD;
|
||||||
|
|
||||||
|
computeHash(grp->grp, grp->metaData->mHash);
|
||||||
|
|
||||||
|
// group has been validated. Let's notify the global router for the clue
|
||||||
|
|
||||||
|
if(!grp->metaData->mAuthorId.isNull())
|
||||||
|
{
|
||||||
|
#ifdef GEN_EXCH_DEBUG
|
||||||
|
std::cerr << "Group routage info: Identity=" << meta->mAuthorId << " from " << grp->PeerId() << std::endl;
|
||||||
|
#endif
|
||||||
|
mRoutingClues[grp->metaData->mAuthorId].insert(grp->PeerId()) ;
|
||||||
|
}
|
||||||
|
|
||||||
|
// This has been moved here (as opposed to inside part for new groups below) because it is used to update the server TS when updates
|
||||||
|
// of grp metadata arrive.
|
||||||
|
|
||||||
|
grp->metaData->mRecvTS = time(NULL);
|
||||||
|
|
||||||
|
// now check if group already exists
|
||||||
|
|
||||||
|
if(std::find(existingGrpIds.begin(), existingGrpIds.end(), grp->grpId) == existingGrpIds.end())
|
||||||
|
{
|
||||||
|
grp->metaData->mOriginator = grp->PeerId();
|
||||||
|
grp->metaData->mSubscribeFlags = GXS_SERV::GROUP_SUBSCRIBE_NOT_SUBSCRIBED;
|
||||||
|
|
||||||
|
grps_to_store.push_back(grp);
|
||||||
|
grpIds.push_back(grp->grpId);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
GroupUpdate update;
|
||||||
|
update.newGrp = grp;
|
||||||
|
mGroupUpdates.push_back(update);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else if(ret == VALIDATE_FAIL)
|
||||||
|
{
|
||||||
|
#ifdef GEN_EXCH_DEBUG
|
||||||
|
std::cerr << " failed to validate incoming meta, grpId: " << grp->grpId << ": wrong signature" << std::endl;
|
||||||
|
#endif
|
||||||
|
delete grp;
|
||||||
|
}
|
||||||
|
else if(ret == VALIDATE_FAIL_TRY_LATER)
|
||||||
|
{
|
||||||
|
#ifdef GEN_EXCH_DEBUG
|
||||||
|
std::cerr << " failed to validate incoming grp, trying again later. grpId: " << grp->grpId << std::endl;
|
||||||
|
#endif
|
||||||
|
++vit ;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Erase entry from the list
|
||||||
|
|
||||||
|
NxsGrpPendValidVect::iterator tmp(vit) ;
|
||||||
|
++tmp ;
|
||||||
|
mGrpPendingValidate.erase(vit) ;
|
||||||
|
vit = tmp ;
|
||||||
|
}
|
||||||
|
|
||||||
|
if(!grpIds.empty())
|
||||||
|
{
|
||||||
|
RsGxsGroupChange* c = new RsGxsGroupChange(RsGxsNotify::TYPE_RECEIVE, false);
|
||||||
|
c->mGrpIdList = grpIds;
|
||||||
|
mNotifications.push_back(c);
|
||||||
|
mDataStore->storeGroup(grps_to_store);
|
||||||
|
#ifdef GEN_EXCH_DEBUG
|
||||||
|
std::cerr << " adding the following grp ids to notification: " << std::endl;
|
||||||
|
for(std::list<RsGxsGroupId>::const_iterator it(grpIds.begin());it!=grpIds.end();++it)
|
||||||
|
std::cerr << " " << *it << std::endl;
|
||||||
|
#endif
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#ifdef OLD_VERSION
|
||||||
void RsGenExchange::processRecvdGroups()
|
void RsGenExchange::processRecvdGroups()
|
||||||
{
|
{
|
||||||
RS_STACK_MUTEX(mGenMtx) ;
|
RS_STACK_MUTEX(mGenMtx) ;
|
||||||
@ -3102,27 +3420,31 @@ void RsGenExchange::processRecvdGroups()
|
|||||||
{
|
{
|
||||||
GxsPendingItem<RsNxsGrp*, RsGxsGroupId>& gpsi = *vit;
|
GxsPendingItem<RsNxsGrp*, RsGxsGroupId>& gpsi = *vit;
|
||||||
RsNxsGrp* grp = gpsi.mItem;
|
RsNxsGrp* grp = gpsi.mItem;
|
||||||
RsGxsGrpMetaData* meta = new RsGxsGrpMetaData();
|
|
||||||
bool deserialOk = false;
|
|
||||||
|
|
||||||
if(grp->meta.bin_len != 0)
|
if(grp->metaData == NULL)
|
||||||
deserialOk = meta->deserialise(grp->meta.bin_data, grp->meta.bin_len);
|
{
|
||||||
|
RsGxsGrpMetaData* meta = new RsGxsGrpMetaData();
|
||||||
|
|
||||||
|
if(grp->meta.bin_len != 0 && meta->deserialise(grp->meta.bin_data, grp->meta.bin_len))
|
||||||
|
grp->metaData = meta ;
|
||||||
|
else
|
||||||
|
delete meta ;
|
||||||
|
}
|
||||||
|
|
||||||
bool erase = true;
|
bool erase = true;
|
||||||
|
|
||||||
if(deserialOk && acceptNewGroup(meta))
|
if(grp->metaData != NULL && acceptNewGroup(grp->metaData))
|
||||||
{
|
{
|
||||||
#ifdef GEN_EXCH_DEBUG
|
#ifdef GEN_EXCH_DEBUG
|
||||||
std::cerr << " processing validation for group " << meta->mGroupId << ", original attempt time: " << time(NULL) - gpsi.mFirstTryTS << " seconds ago" << std::endl;
|
std::cerr << " processing validation for group " << meta->mGroupId << ", original attempt time: " << time(NULL) - gpsi.mFirstTryTS << " seconds ago" << std::endl;
|
||||||
#endif
|
#endif
|
||||||
grp->metaData = meta;
|
|
||||||
uint8_t ret = validateGrp(grp);
|
uint8_t ret = validateGrp(grp);
|
||||||
|
|
||||||
if(ret == VALIDATE_SUCCESS)
|
if(ret == VALIDATE_SUCCESS)
|
||||||
{
|
{
|
||||||
meta->mGroupStatus = GXS_SERV::GXS_GRP_STATUS_UNPROCESSED | GXS_SERV::GXS_GRP_STATUS_UNREAD;
|
grp->metaData->mGroupStatus = GXS_SERV::GXS_GRP_STATUS_UNPROCESSED | GXS_SERV::GXS_GRP_STATUS_UNREAD;
|
||||||
|
|
||||||
computeHash(grp->grp, meta->mHash);
|
computeHash(grp->grp, grp->metaData->mHash);
|
||||||
|
|
||||||
// group has been validated. Let's notify the global router for the clue
|
// group has been validated. Let's notify the global router for the clue
|
||||||
|
|
||||||
@ -3215,6 +3537,7 @@ void RsGenExchange::processRecvdGroups()
|
|||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
void RsGenExchange::performUpdateValidation()
|
void RsGenExchange::performUpdateValidation()
|
||||||
{
|
{
|
||||||
|
@ -863,8 +863,8 @@ private:
|
|||||||
|
|
||||||
std::vector<RsNxsMsg*> mReceivedMsgs;
|
std::vector<RsNxsMsg*> mReceivedMsgs;
|
||||||
|
|
||||||
typedef std::vector<GxsPendingItem<RsNxsGrp*, RsGxsGroupId> > NxsGrpPendValidVect;
|
typedef std::map<RsGxsGroupId,GxsPendingItem<RsNxsGrp*, RsGxsGroupId> > NxsGrpPendValidVect;
|
||||||
NxsGrpPendValidVect mReceivedGrps;
|
NxsGrpPendValidVect mGrpPendingValidate;
|
||||||
|
|
||||||
std::vector<GxsGrpPendingSign> mGrpsToPublish;
|
std::vector<GxsGrpPendingSign> mGrpsToPublish;
|
||||||
typedef std::vector<GxsGrpPendingSign> NxsGrpSignPendVect;
|
typedef std::vector<GxsGrpPendingSign> NxsGrpSignPendVect;
|
||||||
@ -885,11 +885,10 @@ private:
|
|||||||
/// authentication policy
|
/// authentication policy
|
||||||
uint32_t mAuthenPolicy;
|
uint32_t mAuthenPolicy;
|
||||||
|
|
||||||
std::map<uint32_t, GxsPendingItem<RsGxsMsgItem*, uint32_t> >
|
std::map<uint32_t, GxsPendingItem<RsGxsMsgItem*, uint32_t> > mMsgPendingSign;
|
||||||
mMsgPendingSign;
|
|
||||||
|
|
||||||
std::vector<GxsPendingItem<RsNxsMsg*, RsGxsGrpMsgIdPair> > mMsgPendingValidate;
|
typedef std::map<RsGxsMessageId,GxsPendingItem<RsNxsMsg*, RsGxsGrpMsgIdPair> > NxsMsgPendingVect;
|
||||||
typedef std::vector<GxsPendingItem<RsNxsMsg*, RsGxsGrpMsgIdPair> > NxsMsgPendingVect;
|
NxsMsgPendingVect mMsgPendingValidate;
|
||||||
|
|
||||||
bool mCleaning;
|
bool mCleaning;
|
||||||
time_t mLastClean;
|
time_t mLastClean;
|
||||||
|
@ -42,8 +42,6 @@ RsSerialType* init_item(CompressedChunkMap& map)
|
|||||||
map._map.clear() ;
|
map._map.clear() ;
|
||||||
for(uint32_t i=0;i<15;++i)
|
for(uint32_t i=0;i<15;++i)
|
||||||
map._map.push_back(rand()) ;
|
map._map.push_back(rand()) ;
|
||||||
|
|
||||||
return new RsTurtleSerialiser();
|
|
||||||
}
|
}
|
||||||
bool operator==(const CompressedChunkMap& m1,const CompressedChunkMap& m2)
|
bool operator==(const CompressedChunkMap& m1,const CompressedChunkMap& m2)
|
||||||
{
|
{
|
||||||
@ -62,11 +60,10 @@ bool operator==(const RsTurtleFileMapRequestItem& it1,const RsTurtleFileMapReque
|
|||||||
|
|
||||||
return true ;
|
return true ;
|
||||||
}
|
}
|
||||||
RsSerialType* init_item(RsTurtleFileMapRequestItem& item)
|
void init_item(RsTurtleFileMapRequestItem& item)
|
||||||
{
|
{
|
||||||
item.direction = 1 ;
|
item.direction = 1 ;
|
||||||
item.tunnel_id = 0x4ff823e2 ;
|
item.tunnel_id = 0x4ff823e2 ;
|
||||||
return new RsTurtleSerialiser();
|
|
||||||
}
|
}
|
||||||
bool operator==(const RsTurtleFileMapItem& it1,const RsTurtleFileMapItem& it2)
|
bool operator==(const RsTurtleFileMapItem& it1,const RsTurtleFileMapItem& it2)
|
||||||
{
|
{
|
||||||
@ -76,14 +73,13 @@ bool operator==(const RsTurtleFileMapItem& it1,const RsTurtleFileMapItem& it2)
|
|||||||
|
|
||||||
return true ;
|
return true ;
|
||||||
}
|
}
|
||||||
RsSerialType* init_item(RsTurtleFileMapItem& item)
|
void init_item(RsTurtleFileMapItem& item)
|
||||||
{
|
{
|
||||||
item.direction = 1 ;
|
item.direction = 1 ;
|
||||||
item.tunnel_id = 0xf48fe232 ;
|
item.tunnel_id = 0xf48fe232 ;
|
||||||
init_item(item.compressed_map) ;
|
init_item(item.compressed_map) ;
|
||||||
return new RsTurtleSerialiser();
|
|
||||||
}
|
}
|
||||||
RsSerialType* init_item(RsTurtleFileDataItem& item)
|
void init_item(RsTurtleFileDataItem& item)
|
||||||
{
|
{
|
||||||
static const uint32_t S = 3456 ;
|
static const uint32_t S = 3456 ;
|
||||||
item.tunnel_id = 0x33eef982 ;
|
item.tunnel_id = 0x33eef982 ;
|
||||||
@ -92,7 +88,6 @@ RsSerialType* init_item(RsTurtleFileDataItem& item)
|
|||||||
item.chunk_data = new unsigned char[S] ;
|
item.chunk_data = new unsigned char[S] ;
|
||||||
for(uint32_t i=0;i<S;++i)
|
for(uint32_t i=0;i<S;++i)
|
||||||
((unsigned char *)item.chunk_data)[i] = rand()%256 ;
|
((unsigned char *)item.chunk_data)[i] = rand()%256 ;
|
||||||
return new RsTurtleSerialiser();
|
|
||||||
}
|
}
|
||||||
bool operator==(const RsTurtleFileDataItem& i1,const RsTurtleFileDataItem& i2)
|
bool operator==(const RsTurtleFileDataItem& i1,const RsTurtleFileDataItem& i2)
|
||||||
{
|
{
|
||||||
@ -104,12 +99,11 @@ bool operator==(const RsTurtleFileDataItem& i1,const RsTurtleFileDataItem& i2)
|
|||||||
return false ;
|
return false ;
|
||||||
return true ;
|
return true ;
|
||||||
}
|
}
|
||||||
RsSerialType* init_item(RsTurtleFileRequestItem& item)
|
void init_item(RsTurtleFileRequestItem& item)
|
||||||
{
|
{
|
||||||
item.tunnel_id = rand() ;
|
item.tunnel_id = rand() ;
|
||||||
item.chunk_offset = 0x25ea228437894379ull ;
|
item.chunk_offset = 0x25ea228437894379ull ;
|
||||||
item.chunk_size = rand() ;
|
item.chunk_size = rand() ;
|
||||||
return new RsTurtleSerialiser();
|
|
||||||
}
|
}
|
||||||
bool operator==(const RsTurtleFileRequestItem& it1,const RsTurtleFileRequestItem& it2)
|
bool operator==(const RsTurtleFileRequestItem& it1,const RsTurtleFileRequestItem& it2)
|
||||||
{
|
{
|
||||||
@ -119,11 +113,10 @@ bool operator==(const RsTurtleFileRequestItem& it1,const RsTurtleFileRequestItem
|
|||||||
|
|
||||||
return true ;
|
return true ;
|
||||||
}
|
}
|
||||||
RsSerialType* init_item(RsTurtleTunnelOkItem& item)
|
void init_item(RsTurtleTunnelOkItem& item)
|
||||||
{
|
{
|
||||||
item.tunnel_id = rand() ;
|
item.tunnel_id = rand() ;
|
||||||
item.request_id = rand() ;
|
item.request_id = rand() ;
|
||||||
return new RsTurtleSerialiser();
|
|
||||||
}
|
}
|
||||||
bool operator==(const RsTurtleTunnelOkItem& it1,const RsTurtleTunnelOkItem& it2)
|
bool operator==(const RsTurtleTunnelOkItem& it1,const RsTurtleTunnelOkItem& it2)
|
||||||
{
|
{
|
||||||
@ -131,13 +124,12 @@ bool operator==(const RsTurtleTunnelOkItem& it1,const RsTurtleTunnelOkItem& it2)
|
|||||||
if(it1.request_id != it2.request_id) return false ;
|
if(it1.request_id != it2.request_id) return false ;
|
||||||
return true ;
|
return true ;
|
||||||
}
|
}
|
||||||
RsSerialType* init_item(RsTurtleOpenTunnelItem& item)
|
void init_item(RsTurtleOpenTunnelItem& item)
|
||||||
{
|
{
|
||||||
item.depth = rand() ;
|
item.depth = rand() ;
|
||||||
item.request_id = rand() ;
|
item.request_id = rand() ;
|
||||||
item.partial_tunnel_id = rand() ;
|
item.partial_tunnel_id = rand() ;
|
||||||
item.file_hash = RsFileHash("c0edcfecc0844ef175d61dd589ab288d262b6bc8") ;
|
item.file_hash = RsFileHash("c0edcfecc0844ef175d61dd589ab288d262b6bc8") ;
|
||||||
return new RsTurtleSerialiser();
|
|
||||||
}
|
}
|
||||||
bool operator==(const RsTurtleOpenTunnelItem& it1,const RsTurtleOpenTunnelItem& it2)
|
bool operator==(const RsTurtleOpenTunnelItem& it1,const RsTurtleOpenTunnelItem& it2)
|
||||||
{
|
{
|
||||||
@ -147,7 +139,7 @@ bool operator==(const RsTurtleOpenTunnelItem& it1,const RsTurtleOpenTunnelItem&
|
|||||||
if(it1.file_hash != it2.file_hash) return false ;
|
if(it1.file_hash != it2.file_hash) return false ;
|
||||||
return true ;
|
return true ;
|
||||||
}
|
}
|
||||||
RsSerialType* init_item(RsTurtleRegExpSearchRequestItem& item)
|
void init_item(RsTurtleRegExpSearchRequestItem& item)
|
||||||
{
|
{
|
||||||
item.request_id = rand() ;
|
item.request_id = rand() ;
|
||||||
item.depth = rand() ;
|
item.depth = rand() ;
|
||||||
@ -158,7 +150,6 @@ RsSerialType* init_item(RsTurtleRegExpSearchRequestItem& item)
|
|||||||
for(uint32_t i=0;i<10u;++i) item.expr._tokens.push_back(rand()%8) ;
|
for(uint32_t i=0;i<10u;++i) item.expr._tokens.push_back(rand()%8) ;
|
||||||
for(uint32_t i=0;i<6u;++i) item.expr._ints.push_back(rand()) ;
|
for(uint32_t i=0;i<6u;++i) item.expr._ints.push_back(rand()) ;
|
||||||
for(uint32_t i=0;i<8u;++i) item.expr._strings.push_back("test string") ;
|
for(uint32_t i=0;i<8u;++i) item.expr._strings.push_back("test string") ;
|
||||||
return new RsTurtleSerialiser();
|
|
||||||
}
|
}
|
||||||
bool operator==(const RsTurtleRegExpSearchRequestItem& it1,const RsTurtleRegExpSearchRequestItem& it2)
|
bool operator==(const RsTurtleRegExpSearchRequestItem& it1,const RsTurtleRegExpSearchRequestItem& it2)
|
||||||
{
|
{
|
||||||
@ -172,12 +163,11 @@ bool operator==(const RsTurtleRegExpSearchRequestItem& it1,const RsTurtleRegExpS
|
|||||||
for(uint32_t i=0;i<it1.expr._strings.size();++i) if(it1.expr._strings[i] != it2.expr._strings[i]) return false ;
|
for(uint32_t i=0;i<it1.expr._strings.size();++i) if(it1.expr._strings[i] != it2.expr._strings[i]) return false ;
|
||||||
return true ;
|
return true ;
|
||||||
}
|
}
|
||||||
RsSerialType* init_item(RsTurtleStringSearchRequestItem& item)
|
void init_item(RsTurtleStringSearchRequestItem& item)
|
||||||
{
|
{
|
||||||
item.request_id = rand() ;
|
item.request_id = rand() ;
|
||||||
item.depth = rand() ;
|
item.depth = rand() ;
|
||||||
item.match_string = std::string("432hkjfdsjkhjk43r3fw") ;
|
item.match_string = std::string("432hkjfdsjkhjk43r3fw") ;
|
||||||
return new RsTurtleSerialiser();
|
|
||||||
}
|
}
|
||||||
bool operator==(const RsTurtleStringSearchRequestItem& it1,const RsTurtleStringSearchRequestItem& it2)
|
bool operator==(const RsTurtleStringSearchRequestItem& it1,const RsTurtleStringSearchRequestItem& it2)
|
||||||
{
|
{
|
||||||
@ -200,7 +190,7 @@ bool operator==(const TurtleFileInfo& it1,const TurtleFileInfo& it2)
|
|||||||
if(it1.size != it2.size) return false ;
|
if(it1.size != it2.size) return false ;
|
||||||
return true ;
|
return true ;
|
||||||
}
|
}
|
||||||
RsSerialType* init_item(RsTurtleSearchResultItem& item)
|
void init_item(RsTurtleSearchResultItem& item)
|
||||||
{
|
{
|
||||||
item.depth = rand() ;
|
item.depth = rand() ;
|
||||||
item.request_id = rand() ;
|
item.request_id = rand() ;
|
||||||
@ -212,7 +202,6 @@ RsSerialType* init_item(RsTurtleSearchResultItem& item)
|
|||||||
init_item(f) ;
|
init_item(f) ;
|
||||||
item.result.push_back(f) ;
|
item.result.push_back(f) ;
|
||||||
}
|
}
|
||||||
return new RsTurtleSerialiser();
|
|
||||||
}
|
}
|
||||||
bool operator==(const RsTurtleSearchResultItem& it1,const RsTurtleSearchResultItem& it2)
|
bool operator==(const RsTurtleSearchResultItem& it1,const RsTurtleSearchResultItem& it2)
|
||||||
{
|
{
|
||||||
|
Loading…
x
Reference in New Issue
Block a user