Merge pull request #961 from csoler/v0.6.3

V0.6.3
This commit is contained in:
csoler 2017-07-26 14:01:28 +02:00 committed by GitHub
commit 286e7e0b1a
38 changed files with 680 additions and 739 deletions

View file

@ -272,6 +272,8 @@ bool GxsSecurity::generateKeyPair(RsTlvPublicRSAKey& public_key,RsTlvPrivateRSAK
RSA_generate_key_ex(rsa, 2048, ebn, NULL);
RSA *rsa_pub = RSAPublicKey_dup(rsa);
BN_clear_free(ebn) ;
public_key.keyFlags = RSTLV_KEY_TYPE_PUBLIC_ONLY ;
private_key.keyFlags = RSTLV_KEY_TYPE_FULL ;
@ -580,6 +582,8 @@ bool GxsSecurity::encrypt(uint8_t *& out, uint32_t &outlen, const uint8_t *in, u
// intialize context and send store encrypted cipher in ek
if(!EVP_SealInit(ctx, EVP_aes_128_cbc(), &ek, &eklen, iv, &public_key, 1)) return false;
EVP_PKEY_free(public_key) ;
// now assign memory to out accounting for data, and cipher block size, key length, and key length val
out = (uint8_t*)rs_malloc(inlen + cipher_block_size + size_net_ekl + eklen + EVP_MAX_IV_LENGTH) ;
@ -857,6 +861,7 @@ bool GxsSecurity::decrypt(uint8_t *& out, uint32_t & outlen, const uint8_t *in,
std::cerr << "(EE) Cannot decrypt data. Most likely reason: private GXS key is missing." << std::endl;
return false;
}
EVP_PKEY_free(privateKey) ;
if(inlen < (uint32_t)in_offset)
{

View file

@ -702,20 +702,20 @@ RsNxsMsg* RsDataService::locked_getMessage(RetroCursor &c)
return NULL;
}
int RsDataService::storeMessage(std::map<RsNxsMsg *, RsGxsMsgMetaData *> &msg)
int RsDataService::storeMessage(const std::list<RsNxsMsg*>& msg)
{
RsStackMutex stack(mDbMutex);
std::map<RsNxsMsg*, RsGxsMsgMetaData* >::iterator mit = msg.begin();
// start a transaction
mDb->beginTransaction();
for(; mit != msg.end(); ++mit)
for(std::list<RsNxsMsg*>::const_iterator mit = msg.begin(); mit != msg.end(); ++mit)
{
RsNxsMsg* msgPtr = mit->first;
RsGxsMsgMetaData* msgMetaPtr = mit->second;
RsNxsMsg* msgPtr = *mit;
RsGxsMsgMetaData* msgMetaPtr = msgPtr->metaData;
assert(msgMetaPtr != NULL);
#ifdef RS_DATA_SERVICE_DEBUG
std::cerr << "RsDataService::storeMessage() ";
@ -790,16 +790,6 @@ int RsDataService::storeMessage(std::map<RsNxsMsg *, RsGxsMsgMetaData *> &msg)
// finish transaction
bool ret = mDb->commitTransaction();
for(mit = msg.begin(); mit != msg.end(); ++mit)
{
//TODO: API encourages aliasing, remove this abomination
if(mit->second != mit->first->metaData)
delete mit->second;
delete mit->first;
;
}
return ret;
}
@ -811,104 +801,94 @@ bool RsDataService::validSize(RsNxsMsg* msg) const
}
int RsDataService::storeGroup(std::map<RsNxsGrp *, RsGxsGrpMetaData *> &grp)
int RsDataService::storeGroup(const std::list<RsNxsGrp*>& grp)
{
RsStackMutex stack(mDbMutex);
std::map<RsNxsGrp*, RsGxsGrpMetaData* >::iterator sit = grp.begin();
// begin transaction
mDb->beginTransaction();
for(; sit != grp.end(); ++sit)
{
for(std::list<RsNxsGrp*>::const_iterator sit = grp.begin();sit != grp.end(); ++sit)
{
RsNxsGrp* grpPtr = *sit;
RsGxsGrpMetaData* grpMetaPtr = grpPtr->metaData;
RsNxsGrp* grpPtr = sit->first;
RsGxsGrpMetaData* grpMetaPtr = sit->second;
assert(grpMetaPtr != NULL);
// if data is larger than max item size do not add
if(!validSize(grpPtr)) continue;
// if data is larger than max item size do not add
if(!validSize(grpPtr)) continue;
#ifdef RS_DATA_SERVICE_DEBUG
std::cerr << "RsDataService::storeGroup() GrpId: " << grpPtr->grpId.toStdString();
std::cerr << " CircleType: " << (uint32_t) grpMetaPtr->mCircleType;
std::cerr << " CircleId: " << grpMetaPtr->mCircleId.toStdString();
std::cerr << std::endl;
std::cerr << "RsDataService::storeGroup() GrpId: " << grpPtr->grpId.toStdString();
std::cerr << " CircleType: " << (uint32_t) grpMetaPtr->mCircleType;
std::cerr << " CircleId: " << grpMetaPtr->mCircleId.toStdString();
std::cerr << std::endl;
#endif
/*!
* STORE data, data len,
* grpId, flags, publish time stamp, identity,
* id signature, admin signatue, key set, last posting ts
* and meta data
**/
ContentValue cv;
/*!
* STORE data, data len,
* grpId, flags, publish time stamp, identity,
* id signature, admin signatue, key set, last posting ts
* and meta data
**/
ContentValue cv;
uint32_t dataLen = grpPtr->grp.TlvSize();
char grpData[dataLen];
uint32_t offset = 0;
grpPtr->grp.SetTlv(grpData, dataLen, &offset);
cv.put(KEY_NXS_DATA, dataLen, grpData);
uint32_t dataLen = grpPtr->grp.TlvSize();
char grpData[dataLen];
uint32_t offset = 0;
grpPtr->grp.SetTlv(grpData, dataLen, &offset);
cv.put(KEY_NXS_DATA, dataLen, grpData);
cv.put(KEY_NXS_DATA_LEN, (int32_t) dataLen);
cv.put(KEY_GRP_ID, grpPtr->grpId.toStdString());
cv.put(KEY_GRP_NAME, grpMetaPtr->mGroupName);
cv.put(KEY_ORIG_GRP_ID, grpMetaPtr->mOrigGrpId.toStdString());
cv.put(KEY_NXS_SERV_STRING, grpMetaPtr->mServiceString);
cv.put(KEY_NXS_FLAGS, (int32_t)grpMetaPtr->mGroupFlags);
cv.put(KEY_TIME_STAMP, (int32_t)grpMetaPtr->mPublishTs);
cv.put(KEY_GRP_SIGN_FLAGS, (int32_t)grpMetaPtr->mSignFlags);
cv.put(KEY_GRP_CIRCLE_ID, grpMetaPtr->mCircleId.toStdString());
cv.put(KEY_GRP_CIRCLE_TYPE, (int32_t)grpMetaPtr->mCircleType);
cv.put(KEY_GRP_INTERNAL_CIRCLE, grpMetaPtr->mInternalCircle.toStdString());
cv.put(KEY_GRP_ORIGINATOR, grpMetaPtr->mOriginator.toStdString());
cv.put(KEY_GRP_AUTHEN_FLAGS, (int32_t)grpMetaPtr->mAuthenFlags);
cv.put(KEY_PARENT_GRP_ID, grpMetaPtr->mParentGrpId.toStdString());
cv.put(KEY_NXS_HASH, grpMetaPtr->mHash.toStdString());
cv.put(KEY_RECV_TS, (int32_t)grpMetaPtr->mRecvTS);
cv.put(KEY_GRP_REP_CUTOFF, (int32_t)grpMetaPtr->mReputationCutOff);
cv.put(KEY_NXS_IDENTITY, grpMetaPtr->mAuthorId.toStdString());
cv.put(KEY_NXS_DATA_LEN, (int32_t) dataLen);
cv.put(KEY_GRP_ID, grpPtr->grpId.toStdString());
cv.put(KEY_GRP_NAME, grpMetaPtr->mGroupName);
cv.put(KEY_ORIG_GRP_ID, grpMetaPtr->mOrigGrpId.toStdString());
cv.put(KEY_NXS_SERV_STRING, grpMetaPtr->mServiceString);
cv.put(KEY_NXS_FLAGS, (int32_t)grpMetaPtr->mGroupFlags);
cv.put(KEY_TIME_STAMP, (int32_t)grpMetaPtr->mPublishTs);
cv.put(KEY_GRP_SIGN_FLAGS, (int32_t)grpMetaPtr->mSignFlags);
cv.put(KEY_GRP_CIRCLE_ID, grpMetaPtr->mCircleId.toStdString());
cv.put(KEY_GRP_CIRCLE_TYPE, (int32_t)grpMetaPtr->mCircleType);
cv.put(KEY_GRP_INTERNAL_CIRCLE, grpMetaPtr->mInternalCircle.toStdString());
cv.put(KEY_GRP_ORIGINATOR, grpMetaPtr->mOriginator.toStdString());
cv.put(KEY_GRP_AUTHEN_FLAGS, (int32_t)grpMetaPtr->mAuthenFlags);
cv.put(KEY_PARENT_GRP_ID, grpMetaPtr->mParentGrpId.toStdString());
cv.put(KEY_NXS_HASH, grpMetaPtr->mHash.toStdString());
cv.put(KEY_RECV_TS, (int32_t)grpMetaPtr->mRecvTS);
cv.put(KEY_GRP_REP_CUTOFF, (int32_t)grpMetaPtr->mReputationCutOff);
cv.put(KEY_NXS_IDENTITY, grpMetaPtr->mAuthorId.toStdString());
offset = 0;
char keySetData[grpMetaPtr->keys.TlvSize()];
grpMetaPtr->keys.SetTlv(keySetData, grpMetaPtr->keys.TlvSize(), &offset);
cv.put(KEY_KEY_SET, grpMetaPtr->keys.TlvSize(), keySetData);
offset = 0;
char keySetData[grpMetaPtr->keys.TlvSize()];
grpMetaPtr->keys.SetTlv(keySetData, grpMetaPtr->keys.TlvSize(), &offset);
cv.put(KEY_KEY_SET, grpMetaPtr->keys.TlvSize(), keySetData);
offset = 0;
char metaData[grpPtr->meta.TlvSize()];
grpPtr->meta.SetTlv(metaData, grpPtr->meta.TlvSize(), &offset);
cv.put(KEY_NXS_META, grpPtr->meta.TlvSize(), metaData);
offset = 0;
char metaData[grpPtr->meta.TlvSize()];
grpPtr->meta.SetTlv(metaData, grpPtr->meta.TlvSize(), &offset);
cv.put(KEY_NXS_META, grpPtr->meta.TlvSize(), metaData);
// local meta data
cv.put(KEY_GRP_SUBCR_FLAG, (int32_t)grpMetaPtr->mSubscribeFlags);
cv.put(KEY_GRP_POP, (int32_t)grpMetaPtr->mPop);
cv.put(KEY_MSG_COUNT, (int32_t)grpMetaPtr->mVisibleMsgCount);
cv.put(KEY_GRP_STATUS, (int32_t)grpMetaPtr->mGroupStatus);
cv.put(KEY_GRP_LAST_POST, (int32_t)grpMetaPtr->mLastPost);
// local meta data
cv.put(KEY_GRP_SUBCR_FLAG, (int32_t)grpMetaPtr->mSubscribeFlags);
cv.put(KEY_GRP_POP, (int32_t)grpMetaPtr->mPop);
cv.put(KEY_MSG_COUNT, (int32_t)grpMetaPtr->mVisibleMsgCount);
cv.put(KEY_GRP_STATUS, (int32_t)grpMetaPtr->mGroupStatus);
cv.put(KEY_GRP_LAST_POST, (int32_t)grpMetaPtr->mLastPost);
locked_clearGrpMetaCache(grpMetaPtr->mGroupId);
locked_clearGrpMetaCache(grpMetaPtr->mGroupId);
if (!mDb->sqlInsert(GRP_TABLE_NAME, "", cv))
{
std::cerr << "RsDataService::storeGroup() sqlInsert Failed";
std::cerr << std::endl;
std::cerr << "\t For GroupId: " << grpMetaPtr->mGroupId.toStdString();
std::cerr << std::endl;
}
}
if (!mDb->sqlInsert(GRP_TABLE_NAME, "", cv))
{
std::cerr << "RsDataService::storeGroup() sqlInsert Failed";
std::cerr << std::endl;
std::cerr << "\t For GroupId: " << grpMetaPtr->mGroupId.toStdString();
std::cerr << std::endl;
}
}
// finish transaction
bool ret = mDb->commitTransaction();
for(sit = grp.begin(); sit != grp.end(); ++sit)
{
//TODO: API encourages aliasing, remove this abomination
if(sit->second != sit->first->metaData)
delete sit->second;
delete sit->first;
}
return ret;
}
@ -918,21 +898,21 @@ void RsDataService::locked_clearGrpMetaCache(const RsGxsGroupId& gid)
mGrpMetaDataCache_ContainsAllDatabase = false;
}
int RsDataService::updateGroup(std::map<RsNxsGrp *, RsGxsGrpMetaData *> &grp)
int RsDataService::updateGroup(const std::list<RsNxsGrp *> &grp)
{
RsStackMutex stack(mDbMutex);
std::map<RsNxsGrp*, RsGxsGrpMetaData* >::iterator sit = grp.begin();
// begin transaction
mDb->beginTransaction();
for(; sit != grp.end(); ++sit)
for( std::list<RsNxsGrp*>::const_iterator sit = grp.begin(); sit != grp.end(); ++sit)
{
RsNxsGrp* grpPtr = sit->first;
RsGxsGrpMetaData* grpMetaPtr = sit->second;
RsNxsGrp* grpPtr = *sit;
RsGxsGrpMetaData* grpMetaPtr = grpPtr->metaData;
assert(grpMetaPtr != NULL);
// if data is larger than max item size do not add
if(!validSize(grpPtr)) continue;
@ -991,15 +971,6 @@ int RsDataService::updateGroup(std::map<RsNxsGrp *, RsGxsGrpMetaData *> &grp)
// finish transaction
bool ret = mDb->commitTransaction();
for(sit = grp.begin(); sit != grp.end(); ++sit)
{
//TODO: API encourages aliasing, remove this abomination
if(sit->second != sit->first->metaData)
delete sit->second;
delete sit->first;
}
return ret;
}

View file

@ -127,21 +127,21 @@ public:
* @param msg map of message and decoded meta data information
* @return error code
*/
int storeMessage(std::map<RsNxsMsg*, RsGxsMsgMetaData*>& msg);
int storeMessage(const std::list<RsNxsMsg*>& msg);
/*!
* Stores a list of groups in data store
* @param grp map of group and decoded meta data
* @return error code
*/
int storeGroup(std::map<RsNxsGrp*, RsGxsGrpMetaData*>& grp);
int storeGroup(const std::list<RsNxsGrp*>& grp);
/*!
* Updates group entries in Db
* @param grp map of group and decoded meta data
* @return error code
*/
int updateGroup(std::map<RsNxsGrp*, RsGxsGrpMetaData*>& grsp);
int updateGroup(const std::list<RsNxsGrp*>& grsp);
/*!
* @param metaData The meta data item to update

View file

@ -223,14 +223,14 @@ public:
* @param msg map of message and decoded meta data information
* @return error code
*/
virtual int storeMessage(std::map<RsNxsMsg*, RsGxsMsgMetaData*>& msgs) = 0;
virtual int storeMessage(const std::list<RsNxsMsg*>& msgs) = 0;
/*!
* Stores a list of groups in data store
* @param grp map of group and decoded meta data
* @return error code
*/
virtual int storeGroup(std::map<RsNxsGrp*, RsGxsGrpMetaData*>& grsp) = 0;
virtual int storeGroup(const std::list<RsNxsGrp*>& grsp) = 0;
/*!
@ -238,7 +238,7 @@ public:
* @param grp map of group and decoded meta data
* @return error code
*/
virtual int updateGroup(std::map<RsNxsGrp*, RsGxsGrpMetaData*>& grsp) = 0;
virtual int updateGroup(const std::list<RsNxsGrp*>& grsp) = 0;
/*!
* @param metaData

View file

@ -117,6 +117,14 @@ RsGenExchange::~RsGenExchange()
delete mDataStore;
mDataStore = NULL;
for(uint32_t i=0;i<mNotifications.size();++i)
delete mNotifications[i] ;
for(uint32_t i=0;i<mGrpsToPublish.size();++i)
delete mGrpsToPublish[i].mItem ;
mNotifications.clear();
mGrpsToPublish.clear();
}
bool RsGenExchange::getGroupServerUpdateTS(const RsGxsGroupId& gid, time_t& grp_server_update_TS, time_t& msg_server_update_TS)
@ -1331,11 +1339,16 @@ bool RsGenExchange::deserializeGroupData(unsigned char *data, uint32_t size,
return false;
}
mReceivedGrps.push_back(
GxsPendingItem<RsNxsGrp*, RsGxsGroupId>(
nxs_grp, nxs_grp->grpId,time(NULL)) );
if(mGrpPendingValidate.find(nxs_grp->grpId) != mGrpPendingValidate.end())
{
std::cerr << "(WW) Group " << nxs_grp->grpId << " is already pending validation. Not adding again." << std::endl;
return true;
}
if(gId) *gId = nxs_grp->grpId;
if(gId)
*gId = nxs_grp->grpId;
mGrpPendingValidate.insert(std::make_pair(nxs_grp->grpId, GxsPendingItem<RsNxsGrp*, RsGxsGroupId>(nxs_grp, nxs_grp->grpId,time(NULL))));
return true;
}
@ -1568,20 +1581,18 @@ void RsGenExchange::notifyNewGroups(std::vector<RsNxsGrp *> &groups)
for(; vit != groups.end(); ++vit)
{
RsNxsGrp* grp = *vit;
NxsGrpPendValidVect::iterator received = std::find(mReceivedGrps.begin(),
mReceivedGrps.end(), grp->grpId);
NxsGrpPendValidVect::iterator received = mGrpPendingValidate.find(grp->grpId);
// drop group if you already have them
// TODO: move this to nxs layer to save bandwidth
if(received == mReceivedGrps.end())
if(received == mGrpPendingValidate.end())
{
#ifdef GEN_EXCH_DEBUG
std::cerr << "RsGenExchange::notifyNewGroups() Received GrpId: " << grp->grpId;
std::cerr << std::endl;
#endif
GxsPendingItem<RsNxsGrp*, RsGxsGroupId> gpsi(grp, grp->grpId,time(NULL));
mReceivedGrps.push_back(gpsi);
mGrpPendingValidate.insert(std::make_pair(grp->grpId, GxsPendingItem<RsNxsGrp*, RsGxsGroupId>(grp, grp->grpId,time(NULL))));
}
else
{
@ -1596,37 +1607,36 @@ void RsGenExchange::notifyNewMessages(std::vector<RsNxsMsg *>& messages)
{
RS_STACK_MUTEX(mGenMtx) ;
std::vector<RsNxsMsg*>::iterator vit = messages.begin();
// store these for tick() to pick them up
// store these for tick() to pick them up
for(; vit != messages.end(); ++vit)
{
RsNxsMsg* msg = *vit;
NxsMsgPendingVect::iterator it =
std::find(mMsgPendingValidate.begin(), mMsgPendingValidate.end(), getMsgIdPair(*msg));
// if we have msg already just delete it
if(it == mMsgPendingValidate.end())
for(uint32_t i=0;i<messages.size();++i)
{
#ifdef GEN_EXCH_DEBUG
std::cerr << "RsGenExchange::notifyNewMessages() Received Msg: ";
std::cerr << " GrpId: " << msg->grpId;
std::cerr << " MsgId: " << msg->msgId;
std::cerr << std::endl;
#endif
RsNxsMsg* msg = messages[i];
NxsMsgPendingVect::iterator it = mMsgPendingValidate.find(msg->msgId) ;
mReceivedMsgs.push_back(msg);
}
else
{
// if we have msg already just delete it
if(it == mMsgPendingValidate.end())
{
#ifdef GEN_EXCH_DEBUG
std::cerr << " message is already in pending validation list. dropping." << std::endl;
std::cerr << "RsGenExchange::notifyNewMessages() Received Msg: ";
std::cerr << " GrpId: " << msg->grpId;
std::cerr << " MsgId: " << msg->msgId;
std::cerr << std::endl;
#endif
delete msg;
}
}
RsGxsGrpMsgIdPair id;
id.first = msg->grpId;
id.second = msg->msgId;
mMsgPendingValidate.insert(std::make_pair(msg->msgId,GxsPendingItem<RsNxsMsg*, RsGxsGrpMsgIdPair>(msg, id,time(NULL))));
}
else
{
#ifdef GEN_EXCH_DEBUG
std::cerr << " message is already in pending validation list. dropping." << std::endl;
#endif
delete msg;
}
}
}
void RsGenExchange::notifyReceivePublishKey(const RsGxsGroupId &grpId)
@ -2264,6 +2274,8 @@ void RsGenExchange::publishMsgs()
computeHash(msg->msg, msg->metaData->mHash);
mDataAccess->addMsgData(msg);
delete msg ;
msgChangeMap[grpId].push_back(msgId);
delete[] metaDataBuff;
@ -2664,9 +2676,9 @@ void RsGenExchange::publishGrps()
mDataAccess->updateGroupData(grp);
else
mDataAccess->addGroupData(grp);
#warning csoler: this is bad: addGroupData/updateGroupData actially deletes grp. But it may be used below? grp should be a class object and not deleted manually!
groups_to_subscribe.push_back(grpId) ;
delete grp ;
groups_to_subscribe.push_back(grpId) ;
}
else
{
@ -2847,7 +2859,7 @@ void RsGenExchange::computeHash(const RsTlvBinaryData& data, RsFileHash& hash)
void RsGenExchange::processRecvdMessages()
{
std::list<RsGxsMessageId> messages_to_reject ;
{
RS_STACK_MUTEX(mGenMtx) ;
@ -2857,13 +2869,31 @@ void RsGenExchange::processRecvdMessages()
if(!mMsgPendingValidate.empty())
std::cerr << "processing received messages" << std::endl;
#endif
NxsMsgPendingVect::iterator pend_it = mMsgPendingValidate.begin();
// 1 - First, make sure items metadata is deserialised, clean old failed items, and collect the groups Ids we have to check
for(; pend_it != mMsgPendingValidate.end();)
RsGxsGrpMetaTemporaryMap grpMetas;
for(NxsMsgPendingVect::iterator pend_it = mMsgPendingValidate.begin();pend_it != mMsgPendingValidate.end();)
{
GxsPendingItem<RsNxsMsg*, RsGxsGrpMsgIdPair>& gpsi = *pend_it;
GxsPendingItem<RsNxsMsg*, RsGxsGrpMsgIdPair>& gpsi = pend_it->second;
RsNxsMsg *msg = gpsi.mItem ;
if(gpsi.mFirstTryTS + VALIDATE_MAX_WAITING_TIME < now)
if(msg->metaData == NULL)
{
RsGxsMsgMetaData* meta = new RsGxsMsgMetaData();
if(msg->meta.bin_len != 0 && meta->deserialise(msg->meta.bin_data, &(msg->meta.bin_len)))
msg->metaData = meta;
else
delete meta;
}
bool accept_new_msg = msg->metaData != NULL && acceptNewMessage(msg->metaData,msg->msg.bin_len);
if(!accept_new_msg)
messages_to_reject.push_back(msg->metaData->mMsgId); // This prevents reloading the message again at next sync.
if(!accept_new_msg || gpsi.mFirstTryTS + VALIDATE_MAX_WAITING_TIME < now)
{
std::cerr << "Pending validation grp=" << gpsi.mId.first << ", msg=" << gpsi.mId.second << ", has exceeded validation time limit. The author's key can probably not be obtained. This is unexpected." << std::endl;
@ -2872,46 +2902,27 @@ void RsGenExchange::processRecvdMessages()
}
else
{
#ifdef GEN_EXCH_DEBUG
std::cerr << " movign to recvd." << std::endl;
#endif
mReceivedMsgs.push_back(gpsi.mItem);
grpMetas.insert(std::make_pair(pend_it->second.mItem->grpId, (RsGxsGrpMetaData*)NULL));
++pend_it;
}
}
if(mReceivedMsgs.empty())
return;
std::vector<RsNxsMsg*>::iterator vit = mReceivedMsgs.begin();
GxsMsgReq msgIds;
std::map<RsNxsMsg*, RsGxsMsgMetaData*> msgs;
std::map<RsGxsGroupId, RsGxsGrpMetaData*> grpMetas;
// coalesce group meta retrieval for performance
for(; vit != mReceivedMsgs.end(); ++vit)
{
RsNxsMsg* msg = *vit;
grpMetas.insert(std::make_pair(msg->grpId, (RsGxsGrpMetaData*)NULL));
}
// 2 - Retrieve the metadata for the associated groups.
mDataStore->retrieveGxsGrpMetaData(grpMetas);
GxsMsgReq msgIds;
RsNxsMsgDataTemporaryList msgs_to_store;
#ifdef GEN_EXCH_DEBUG
std::cerr << " updating received messages:" << std::endl;
#endif
for(vit = mReceivedMsgs.begin(); vit != mReceivedMsgs.end(); ++vit)
// 3 - Validate each message
for(NxsMsgPendingVect::iterator pend_it = mMsgPendingValidate.begin();pend_it != mMsgPendingValidate.end();)
{
RsNxsMsg* msg = *vit;
RsGxsMsgMetaData* meta = new RsGxsMsgMetaData();
bool ok = false;
if(msg->meta.bin_len != 0)
ok = meta->deserialise(msg->meta.bin_data, &(msg->meta.bin_len));
msg->metaData = meta;
RsNxsMsg* msg = pend_it->second.mItem;
// (cyril) Normally we should discard posts that are older than the sync request. But that causes a problem because
// RsGxsNetService requests posts to sync by chunks of 20. So if the 20 are discarded, they will be re-synced next time, and the sync process
@ -2928,147 +2939,99 @@ void RsGenExchange::processRecvdMessages()
#ifdef GEN_EXCH_DEBUG
std::cerr << " deserialised info: grp id=" << meta->mGroupId << ", msg id=" << meta->mMsgId ;
#endif
uint8_t validateReturn = VALIDATE_FAIL;
bool accept_new_msg = acceptNewMessage(meta,msg->msg.bin_len);
if(!accept_new_msg && mNetService != NULL)
mNetService->rejectMessage(meta->mMsgId) ; // This prevents reloading the message again at next sync.
if(ok && accept_new_msg)
{
std::map<RsGxsGroupId, RsGxsGrpMetaData*>::iterator mit = grpMetas.find(msg->grpId);
std::map<RsGxsGroupId, RsGxsGrpMetaData*>::iterator mit = grpMetas.find(msg->grpId);
#ifdef GEN_EXCH_DEBUG
std::cerr << " msg info : grp id=" << msg->grpId << ", msg id=" << msg->msgId << std::endl;
#endif
RsGxsGrpMetaData* grpMeta = NULL ;
// validate msg
// validate msg
if(mit != grpMetas.end())
{
grpMeta = mit->second;
GxsSecurity::createPublicKeysFromPrivateKeys(grpMeta->keys); // make sure we have the public keys that correspond to the private ones, as it happens. Most of the time this call does nothing.
if(mit == grpMetas.end())
{
std::cerr << "RsGenExchange::processRecvdMessages(): impossible situation: grp meta " << msg->grpId << " not available." << std::endl;
++pend_it ;
continue ;
}
validateReturn = validateMsg(msg, grpMeta->mGroupFlags, grpMeta->mSignFlags, grpMeta->keys);
RsGxsGrpMetaData *grpMeta = mit->second;
GxsSecurity::createPublicKeysFromPrivateKeys(grpMeta->keys); // make sure we have the public keys that correspond to the private ones, as it happens. Most of the time this call does nothing.
int validateReturn = validateMsg(msg, grpMeta->mGroupFlags, grpMeta->mSignFlags, grpMeta->keys);
#ifdef GEN_EXCH_DEBUG
std::cerr << " grpMeta.mSignFlags: " << std::hex << grpMeta->mSignFlags << std::dec << std::endl;
std::cerr << " grpMeta.mAuthFlags: " << std::hex << grpMeta->mAuthenFlags << std::dec << std::endl;
std::cerr << " message validation result: " << (int)validateReturn << std::endl;
#endif
}
if(validateReturn == VALIDATE_SUCCESS)
{
meta->mMsgStatus = GXS_SERV::GXS_MSG_STATUS_UNPROCESSED | GXS_SERV::GXS_MSG_STATUS_GUI_NEW | GXS_SERV::GXS_MSG_STATUS_GUI_UNREAD;
msgs.insert(std::make_pair(msg, meta));
std::vector<RsGxsMessageId> &msgv = msgIds[msg->grpId];
if (std::find(msgv.begin(), msgv.end(), msg->msgId) == msgv.end())
{
msgv.push_back(msg->msgId);
}
NxsMsgPendingVect::iterator validated_entry = std::find(mMsgPendingValidate.begin(), mMsgPendingValidate.end(),
getMsgIdPair(*msg));
if(validated_entry != mMsgPendingValidate.end()) mMsgPendingValidate.erase(validated_entry);
computeHash(msg->msg, meta->mHash);
meta->recvTS = time(NULL);
#ifdef GEN_EXCH_DEBUG
std::cerr << " new status flags: " << meta->mMsgStatus << std::endl;
std::cerr << " computed hash: " << meta->mHash << std::endl;
std::cerr << "Message received. Identity=" << msg->metaData->mAuthorId << ", from peer " << msg->PeerId() << std::endl;
std::cerr << " grpMeta.mSignFlags: " << std::hex << grpMeta->mSignFlags << std::dec << std::endl;
std::cerr << " grpMeta.mAuthFlags: " << std::hex << grpMeta->mAuthenFlags << std::dec << std::endl;
std::cerr << " message validation result: " << (int)validateReturn << std::endl;
#endif
if(!msg->metaData->mAuthorId.isNull())
mRoutingClues[msg->metaData->mAuthorId].insert(msg->PeerId()) ;
}
if(validateReturn == VALIDATE_SUCCESS)
{
msg->metaData->mMsgStatus = GXS_SERV::GXS_MSG_STATUS_UNPROCESSED | GXS_SERV::GXS_MSG_STATUS_GUI_NEW | GXS_SERV::GXS_MSG_STATUS_GUI_UNREAD;
msgs_to_store.push_back(msg);
if(validateReturn == VALIDATE_FAIL)
{
// In this case, we notify the network exchange service not to DL the message again, at least not yet.
std::vector<RsGxsMessageId> &msgv = msgIds[msg->grpId];
if (std::find(msgv.begin(), msgv.end(), msg->msgId) == msgv.end())
msgv.push_back(msg->msgId);
computeHash(msg->msg, msg->metaData->mHash);
msg->metaData->recvTS = time(NULL);
#ifdef GEN_EXCH_DEBUG
std::cerr << "Notifying the network service to not download this message again." << std::endl;
std::cerr << " new status flags: " << meta->mMsgStatus << std::endl;
std::cerr << " computed hash: " << meta->mHash << std::endl;
std::cerr << "Message received. Identity=" << msg->metaData->mAuthorId << ", from peer " << msg->PeerId() << std::endl;
#endif
messages_to_reject.push_back(msg->msgId) ;
}
}
else
{
#ifdef GEN_EXCH_DEBUG
std::cerr << " deserialisation failed!" <<std::endl;
#endif
validateReturn = VALIDATE_FAIL;
}
if(validateReturn == VALIDATE_FAIL)
{
if(!msg->metaData->mAuthorId.isNull())
mRoutingClues[msg->metaData->mAuthorId].insert(msg->PeerId()) ;
}
else if(validateReturn == VALIDATE_FAIL)
{
// In this case, we notify the network exchange service not to DL the message again, at least not yet.
#ifdef GEN_EXCH_DEBUG
std::cerr << "Validation failed for message id "
<< "msg->grpId: " << msg->grpId << ", msgId: " << msg->msgId << std::endl;
#endif
messages_to_reject.push_back(msg->msgId) ;
delete msg ;
}
else if(validateReturn == VALIDATE_FAIL_TRY_LATER)
{
++pend_it ;
continue;
}
NxsMsgPendingVect::iterator failed_entry = std::find(mMsgPendingValidate.begin(), mMsgPendingValidate.end(),
getMsgIdPair(*msg));
// Remove the entry from mMsgPendingValidate, but do not delete msg since it's either pushed into msg_to_store or deleted in the FAIL case!
if(failed_entry != mMsgPendingValidate.end()) mMsgPendingValidate.erase(failed_entry);
delete msg;
}
else if(validateReturn == VALIDATE_FAIL_TRY_LATER)
{
#ifdef GEN_EXCH_DEBUG
std::cerr << "failed to validate msg, trying again: "
<< "msg->grpId: " << msg->grpId << ", msgId: " << msg->msgId << std::endl;
#endif
RsGxsGrpMsgIdPair id;
id.first = msg->grpId;
id.second = msg->msgId;
// first check you haven't made too many attempts
NxsMsgPendingVect::iterator vit = std::find(mMsgPendingValidate.begin(), mMsgPendingValidate.end(), id);
if(vit == mMsgPendingValidate.end())
mMsgPendingValidate.push_back(GxsPendingItem<RsNxsMsg*, RsGxsGrpMsgIdPair>(msg, id,time(NULL)));
// else
// delete msg ;
}
NxsMsgPendingVect::iterator tmp = pend_it ;
++tmp ;
mMsgPendingValidate.erase(pend_it) ;
pend_it = tmp ;
}
// clean up resources from group meta retrieval
freeAndClearContainerResource<std::map<RsGxsGroupId, RsGxsGrpMetaData*>,
RsGxsGrpMetaData*>(grpMetas);
if(!msgIds.empty())
{
#ifdef GEN_EXCH_DEBUG
std::cerr << " removing existing and old messages from incoming list." << std::endl;
#endif
removeDeleteExistingMessages(msgs, msgIds);
removeDeleteExistingMessages(msgs_to_store, msgIds);
#ifdef GEN_EXCH_DEBUG
std::cerr << " storing remaining messages" << std::endl;
#endif
mDataStore->storeMessage(msgs);
mDataStore->storeMessage(msgs_to_store);
RsGxsMsgChange* c = new RsGxsMsgChange(RsGxsNotify::TYPE_RECEIVE, false);
c->msgChangeMap = msgIds;
mNotifications.push_back(c);
}
mReceivedMsgs.clear();
}
// Done off-mutex to avoid cross deadlocks in the netservice that might call the RsGenExchange as an observer..
if(mNetService != NULL)
for(std::list<RsGxsMessageId>::const_iterator it(messages_to_reject.begin());it!=messages_to_reject.end();++it)
mNetService->rejectMessage(*it) ;
@ -3081,122 +3044,115 @@ void RsGenExchange::processRecvdGroups()
{
RS_STACK_MUTEX(mGenMtx) ;
if(mReceivedGrps.empty())
if(mGrpPendingValidate.empty())
return;
#ifdef GEN_EXCH_DEBUG
std::cerr << "RsGenExchange::Processing received groups" << std::endl;
#endif
NxsGrpPendValidVect::iterator vit = mReceivedGrps.begin();
std::vector<RsGxsGroupId> existingGrpIds;
std::list<RsGxsGroupId> grpIds;
RsNxsGrpDataTemporaryList grps_to_store;
std::map<RsNxsGrp*, RsGxsGrpMetaData*> grps;
// 1 - retrieve the existing groups so as to check what's not new
std::vector<RsGxsGroupId> existingGrpIds;
mDataStore->retrieveGroupIds(existingGrpIds);
while( vit != mReceivedGrps.end())
// 2 - go through each and every new group data and validate the signatures.
for(NxsGrpPendValidVect::iterator vit = mGrpPendingValidate.begin(); vit != mGrpPendingValidate.end();)
{
GxsPendingItem<RsNxsGrp*, RsGxsGroupId>& gpsi = *vit;
GxsPendingItem<RsNxsGrp*, RsGxsGroupId>& gpsi = vit->second;
RsNxsGrp* grp = gpsi.mItem;
RsGxsGrpMetaData* meta = new RsGxsGrpMetaData();
bool deserialOk = false;
if(grp->meta.bin_len != 0)
deserialOk = meta->deserialise(grp->meta.bin_data, grp->meta.bin_len);
bool erase = true;
if(deserialOk && acceptNewGroup(meta))
{
#ifdef GEN_EXCH_DEBUG
std::cerr << " processing validation for group " << meta->mGroupId << ", original attempt time: " << time(NULL) - gpsi.mFirstTryTS << " seconds ago" << std::endl;
#endif
grp->metaData = meta;
uint8_t ret = validateGrp(grp);
if(ret == VALIDATE_SUCCESS)
{
meta->mGroupStatus = GXS_SERV::GXS_GRP_STATUS_UNPROCESSED | GXS_SERV::GXS_GRP_STATUS_UNREAD;
computeHash(grp->grp, meta->mHash);
// group has been validated. Let's notify the global router for the clue
if(!meta->mAuthorId.isNull())
{
#ifdef GEN_EXCH_DEBUG
std::cerr << "Group routage info: Identity=" << meta->mAuthorId << " from " << grp->PeerId() << std::endl;
#endif
mRoutingClues[meta->mAuthorId].insert(grp->PeerId()) ;
}
// This has been moved here (as opposed to inside part for new groups below) because it is used to update the server TS when updates
// of grp metadata arrive.
meta->mRecvTS = time(NULL);
// now check if group already existss
if(std::find(existingGrpIds.begin(), existingGrpIds.end(), grp->grpId) == existingGrpIds.end())
{
//if(meta->mCircleType == GXS_CIRCLE_TYPE_YOUREYESONLY)
meta->mOriginator = grp->PeerId();
meta->mSubscribeFlags = GXS_SERV::GROUP_SUBSCRIBE_NOT_SUBSCRIBED;
grps.insert(std::make_pair(grp, meta));
grpIds.push_back(grp->grpId);
}
else
{
GroupUpdate update;
update.newGrp = grp;
mGroupUpdates.push_back(update);
}
erase = true;
}
else if(ret == VALIDATE_FAIL)
{
#ifdef GEN_EXCH_DEBUG
std::cerr << " failed to validate incoming meta, grpId: " << grp->grpId << ": wrong signature" << std::endl;
#endif
delete grp;
erase = true;
}
else if(ret == VALIDATE_FAIL_TRY_LATER)
{
#ifdef GEN_EXCH_DEBUG
std::cerr << " failed to validate incoming grp, trying again. grpId: " << grp->grpId << std::endl;
std::cerr << " processing validation for group " << meta->mGroupId << ", original attempt time: " << time(NULL) - gpsi.mFirstTryTS << " seconds ago" << std::endl;
#endif
if(grp->metaData == NULL)
{
RsGxsGrpMetaData* meta = new RsGxsGrpMetaData();
if(gpsi.mFirstTryTS + VALIDATE_MAX_WAITING_TIME < time(NULL))
{
#ifdef GEN_EXCH_DEBUG
std::cerr << " validation time got group " << grp->grpId << " exceeded maximum. Will delete group " << std::endl;
#endif
delete grp;
erase = true;
}
else
erase = false;
}
}
else
{
if(!deserialOk)
std::cerr << "(EE) deserialise error in group meta data" << std::endl;
delete grp;
delete meta;
erase = true;
if(grp->meta.bin_len != 0 && meta->deserialise(grp->meta.bin_data, grp->meta.bin_len))
grp->metaData = meta ;
else
delete meta ;
}
if(erase)
vit = mReceivedGrps.erase(vit);
else
++vit;
// early deletion of group from the pending list if it's malformed, not accepted, or has been tried unsuccessfully for too long
if(grp->metaData == NULL || !acceptNewGroup(grp->metaData) || gpsi.mFirstTryTS + VALIDATE_MAX_WAITING_TIME < time(NULL))
{
NxsGrpPendValidVect::iterator tmp(vit) ;
++tmp ;
delete grp ;
mGrpPendingValidate.erase(vit) ;
vit = tmp ;
continue;
}
// group signature validation
uint8_t ret = validateGrp(grp);
if(ret == VALIDATE_SUCCESS)
{
grp->metaData->mGroupStatus = GXS_SERV::GXS_GRP_STATUS_UNPROCESSED | GXS_SERV::GXS_GRP_STATUS_UNREAD;
computeHash(grp->grp, grp->metaData->mHash);
// group has been validated. Let's notify the global router for the clue
if(!grp->metaData->mAuthorId.isNull())
{
#ifdef GEN_EXCH_DEBUG
std::cerr << "Group routage info: Identity=" << meta->mAuthorId << " from " << grp->PeerId() << std::endl;
#endif
mRoutingClues[grp->metaData->mAuthorId].insert(grp->PeerId()) ;
}
// This has been moved here (as opposed to inside part for new groups below) because it is used to update the server TS when updates
// of grp metadata arrive.
grp->metaData->mRecvTS = time(NULL);
// now check if group already exists
if(std::find(existingGrpIds.begin(), existingGrpIds.end(), grp->grpId) == existingGrpIds.end())
{
grp->metaData->mOriginator = grp->PeerId();
grp->metaData->mSubscribeFlags = GXS_SERV::GROUP_SUBSCRIBE_NOT_SUBSCRIBED;
grps_to_store.push_back(grp);
grpIds.push_back(grp->grpId);
}
else
{
GroupUpdate update;
update.newGrp = grp;
mGroupUpdates.push_back(update);
}
}
else if(ret == VALIDATE_FAIL)
{
#ifdef GEN_EXCH_DEBUG
std::cerr << " failed to validate incoming meta, grpId: " << grp->grpId << ": wrong signature" << std::endl;
#endif
delete grp;
}
else if(ret == VALIDATE_FAIL_TRY_LATER)
{
#ifdef GEN_EXCH_DEBUG
std::cerr << " failed to validate incoming grp, trying again later. grpId: " << grp->grpId << std::endl;
#endif
++vit ;
continue;
}
// Erase entry from the list
NxsGrpPendValidVect::iterator tmp(vit) ;
++tmp ;
mGrpPendingValidate.erase(vit) ;
vit = tmp ;
}
if(!grpIds.empty())
@ -3204,7 +3160,7 @@ void RsGenExchange::processRecvdGroups()
RsGxsGroupChange* c = new RsGxsGroupChange(RsGxsNotify::TYPE_RECEIVE, false);
c->mGrpIdList = grpIds;
mNotifications.push_back(c);
mDataStore->storeGroup(grps);
mDataStore->storeGroup(grps_to_store);
#ifdef GEN_EXCH_DEBUG
std::cerr << " adding the following grp ids to notification: " << std::endl;
for(std::list<RsGxsGroupId>::const_iterator it(grpIds.begin());it!=grpIds.end();++it)
@ -3250,7 +3206,9 @@ void RsGenExchange::performUpdateValidation()
#endif
vit = mGroupUpdates.begin();
std::map<RsNxsGrp*, RsGxsGrpMetaData*> grps;
RsNxsGrpDataTemporaryList grps ;
for(; vit != mGroupUpdates.end(); ++vit)
{
GroupUpdate& gu = *vit;
@ -3265,7 +3223,7 @@ void RsGenExchange::performUpdateValidation()
gu.newGrp->metaData->mSubscribeFlags = gu.oldGrpMeta->mSubscribeFlags ;
grps.insert(std::make_pair(gu.newGrp, gu.newGrp->metaData));
grps.push_back(gu.newGrp);
}
else
{
@ -3353,14 +3311,14 @@ void RsGenExchange::setGroupReputationCutOff(uint32_t& token, const RsGxsGroupId
mGrpLocMetaMap.insert(std::make_pair(token, g));
}
void RsGenExchange::removeDeleteExistingMessages( RsGeneralDataService::MsgStoreMap& msgs, GxsMsgReq& msgIdsNotify)
void RsGenExchange::removeDeleteExistingMessages( std::list<RsNxsMsg*>& msgs, GxsMsgReq& msgIdsNotify)
{
// first get grp ids of messages to be stored
RsGxsGroupId::std_set mGrpIdsUnique;
for(RsGeneralDataService::MsgStoreMap::const_iterator cit = msgs.begin(); cit != msgs.end(); ++cit)
mGrpIdsUnique.insert(cit->second->mGroupId);
for(std::list<RsNxsMsg*>::const_iterator cit = msgs.begin(); cit != msgs.end(); ++cit)
mGrpIdsUnique.insert((*cit)->metaData->mGroupId);
//RsGxsGroupId::std_list grpIds(mGrpIdsUnique.begin(), mGrpIdsUnique.end());
//RsGxsGroupId::std_list::const_iterator it = grpIds.begin();
@ -3381,13 +3339,10 @@ void RsGenExchange::removeDeleteExistingMessages( RsGeneralDataService::MsgStore
#endif
}
//RsGeneralDataService::MsgStoreMap::iterator cit2 = msgs.begin();
RsGeneralDataService::MsgStoreMap filtered;
// now for each msg to be stored that exist in the retrieved msg/grp "index" delete and erase from map
for(RsGeneralDataService::MsgStoreMap::iterator cit2 = msgs.begin(); cit2 != msgs.end(); ++cit2)
for(std::list<RsNxsMsg*>::iterator cit2 = msgs.begin(); cit2 != msgs.end();)
{
const RsGxsMessageId::std_vector& msgIds = msgIdReq[cit2->second->mGroupId];
const RsGxsMessageId::std_vector& msgIds = msgIdReq[(*cit2)->metaData->mGroupId];
#ifdef GEN_EXCH_DEBUG
std::cerr << " grpid=" << cit2->second->mGroupId << ", msgid=" << cit2->second->mMsgId ;
@ -3395,38 +3350,29 @@ void RsGenExchange::removeDeleteExistingMessages( RsGeneralDataService::MsgStore
// Avoid storing messages that are already in the database, as well as messages that are too old (or generally do not pass the database storage test)
//
if(std::find(msgIds.begin(), msgIds.end(), cit2->second->mMsgId) == msgIds.end() && messagePublicationTest(*cit2->second))
if(std::find(msgIds.begin(), msgIds.end(), (*cit2)->metaData->mMsgId) != msgIds.end() || !messagePublicationTest( *(*cit2)->metaData))
{
// passes tests, so add to filtered list
//
filtered.insert(*cit2);
#ifdef GEN_EXCH_DEBUG
std::cerr << " keeping " << cit2->second->mMsgId << std::endl;
#endif
}
else // remove message from list
{
// msg exist in retrieved index
RsGxsMessageId::std_vector& notifyIds = msgIdsNotify[cit2->second->mGroupId];
RsGxsMessageId::std_vector::iterator it2 = std::find(notifyIds.begin(),
notifyIds.end(), cit2->second->mMsgId);
// msg exist in retrieved index. We should use a std::set here instead of a vector.
RsGxsMessageId::std_vector& notifyIds = msgIdsNotify[ (*cit2)->metaData->mGroupId];
RsGxsMessageId::std_vector::iterator it2 = std::find(notifyIds.begin(), notifyIds.end(), (*cit2)->metaData->mMsgId);
if(it2 != notifyIds.end())
{
notifyIds.erase(it2);
if (notifyIds.empty())
{
msgIdsNotify.erase(cit2->second->mGroupId);
msgIdsNotify.erase( (*cit2)->metaData->mGroupId);
}
}
#ifdef GEN_EXCH_DEBUG
std::cerr << " discarding " << cit2->second->mMsgId << std::endl;
#endif
delete cit2->first;
// cit2->second will be deleted too in the destructor of cit2->first (RsNxsMsg)
delete *cit2;
cit2 = msgs.erase(cit2);
}
else
++cit2;
}
msgs = filtered;
}

View file

@ -850,7 +850,7 @@ private:
* @param msgs messages to be filtered
* @param msgIdsNotify message notification map to be filtered
*/
void removeDeleteExistingMessages(RsGeneralDataService::MsgStoreMap& msgs, GxsMsgReq& msgIdsNotify);
void removeDeleteExistingMessages(std::list<RsNxsMsg*>& msgs, GxsMsgReq& msgIdsNotify);
RsMutex mGenMtx;
RsGxsDataAccess* mDataAccess;
@ -863,8 +863,8 @@ private:
std::vector<RsNxsMsg*> mReceivedMsgs;
typedef std::vector<GxsPendingItem<RsNxsGrp*, RsGxsGroupId> > NxsGrpPendValidVect;
NxsGrpPendValidVect mReceivedGrps;
typedef std::map<RsGxsGroupId,GxsPendingItem<RsNxsGrp*, RsGxsGroupId> > NxsGrpPendValidVect;
NxsGrpPendValidVect mGrpPendingValidate;
std::vector<GxsGrpPendingSign> mGrpsToPublish;
typedef std::vector<GxsGrpPendingSign> NxsGrpSignPendVect;
@ -885,11 +885,10 @@ private:
/// authentication policy
uint32_t mAuthenPolicy;
std::map<uint32_t, GxsPendingItem<RsGxsMsgItem*, uint32_t> >
mMsgPendingSign;
std::map<uint32_t, GxsPendingItem<RsGxsMsgItem*, uint32_t> > mMsgPendingSign;
std::vector<GxsPendingItem<RsNxsMsg*, RsGxsGrpMsgIdPair> > mMsgPendingValidate;
typedef std::vector<GxsPendingItem<RsNxsMsg*, RsGxsGrpMsgIdPair> > NxsMsgPendingVect;
typedef std::map<RsGxsMessageId,GxsPendingItem<RsNxsMsg*, RsGxsGrpMsgIdPair> > NxsMsgPendingVect;
NxsMsgPendingVect mMsgPendingValidate;
bool mCleaning;
time_t mLastClean;

View file

@ -45,6 +45,11 @@ RsGxsDataAccess::RsGxsDataAccess(RsGeneralDataService* ds) :
mDataStore(ds), mDataMutex("RsGxsDataAccess"), mNextToken(0) {}
RsGxsDataAccess::~RsGxsDataAccess()
{
for(std::map<uint32_t, GxsRequest*>::const_iterator it(mRequests.begin());it!=mRequests.end();++it)
delete it->second ;
}
bool RsGxsDataAccess::requestGroupInfo(uint32_t &token, uint32_t ansType, const RsTokReqOptions &opts,
const std::list<RsGxsGroupId> &groupIds)
{
@ -1803,8 +1808,8 @@ bool RsGxsDataAccess::addGroupData(RsNxsGrp* grp) {
RsStackMutex stack(mDataMutex);
std::map<RsNxsGrp*, RsGxsGrpMetaData*> grpM;
grpM.insert(std::make_pair(grp, grp->metaData));
std::list<RsNxsGrp*> grpM;
grpM.push_back(grp);
return mDataStore->storeGroup(grpM);
}
@ -1812,8 +1817,8 @@ bool RsGxsDataAccess::updateGroupData(RsNxsGrp* grp) {
RsStackMutex stack(mDataMutex);
std::map<RsNxsGrp*, RsGxsGrpMetaData*> grpM;
grpM.insert(std::make_pair(grp, grp->metaData));
std::list<RsNxsGrp*> grpM;
grpM.push_back(grp);
return mDataStore->updateGroup(grpM);
}
@ -1821,8 +1826,8 @@ bool RsGxsDataAccess::addMsgData(RsNxsMsg* msg) {
RsStackMutex stack(mDataMutex);
std::map<RsNxsMsg*, RsGxsMsgMetaData*> msgM;
msgM.insert(std::make_pair(msg, msg->metaData));
std::list<RsNxsMsg*> msgM;
msgM.push_back(msg);
return mDataStore->storeMessage(msgM);
}

View file

@ -38,7 +38,7 @@ class RsGxsDataAccess : public RsTokenService
{
public:
RsGxsDataAccess(RsGeneralDataService* ds);
virtual ~RsGxsDataAccess() { return ;}
virtual ~RsGxsDataAccess() ;
public:

View file

@ -551,14 +551,13 @@ void RsGxsNetService::syncWithPeers()
#ifndef GXS_DISABLE_SYNC_MSGS
typedef RsGxsMetaDataTemporaryMap<RsGxsGrpMetaData> GrpMetaMap;
GrpMetaMap grpMeta;
RsGxsGrpMetaTemporaryMap grpMeta;
mDataStore->retrieveGxsGrpMetaData(grpMeta);
GrpMetaMap toRequest;
RsGxsGrpMetaTemporaryMap toRequest;
for(GrpMetaMap::iterator mit = grpMeta.begin(); mit != grpMeta.end(); ++mit)
for(RsGxsGrpMetaTemporaryMap::iterator mit = grpMeta.begin(); mit != grpMeta.end(); ++mit)
{
RsGxsGrpMetaData* meta = mit->second;
@ -596,7 +595,7 @@ void RsGxsNetService::syncWithPeers()
GXSNETDEBUG_P_(peerId) << " syncing messages with peer " << peerId << std::endl;
#endif
GrpMetaMap::const_iterator mmit = toRequest.begin();
RsGxsGrpMetaTemporaryMap::const_iterator mmit = toRequest.begin();
for(; mmit != toRequest.end(); ++mmit)
{
const RsGxsGrpMetaData* meta = mmit->second;
@ -678,7 +677,7 @@ void RsGxsNetService::syncGrpStatistics()
#ifdef NXS_NET_DEBUG_6
GXSNETDEBUG___<< "Sync-ing group statistics." << std::endl;
#endif
RsGxsMetaDataTemporaryMap<RsGxsGrpMetaData> grpMeta;
RsGxsGrpMetaTemporaryMap grpMeta;
mDataStore->retrieveGxsGrpMetaData(grpMeta);
@ -751,7 +750,7 @@ void RsGxsNetService::handleRecvSyncGrpStatistics(RsNxsSyncGrpStatsItem *grs)
#ifdef NXS_NET_DEBUG_6
GXSNETDEBUG_PG(grs->PeerId(),grs->grpId) << "Received Grp update stats Request for group " << grs->grpId << " from friend " << grs->PeerId() << std::endl;
#endif
RsGxsMetaDataTemporaryMap<RsGxsGrpMetaData> grpMetas;
RsGxsGrpMetaTemporaryMap grpMetas;
grpMetas[grs->grpId] = NULL;
mDataStore->retrieveGxsGrpMetaData(grpMetas);
@ -1905,7 +1904,7 @@ void RsGxsNetService::updateClientSyncTS()
void RsGxsNetService::updateServerSyncTS()
{
RsGxsMetaDataTemporaryMap<RsGxsGrpMetaData> gxsMap;
RsGxsGrpMetaTemporaryMap gxsMap;
#ifdef NXS_NET_DEBUG_0
GXSNETDEBUG___<< "updateServerSyncTS(): updating last modification time stamp of local data." << std::endl;
@ -2719,7 +2718,7 @@ void RsGxsNetService::locked_genReqMsgTransaction(NxsTransaction* tr)
GXSNETDEBUG_PG(item->PeerId(),grpId) << " grpId = " << grpId << std::endl;
GXSNETDEBUG_PG(item->PeerId(),grpId) << " retrieving grp mesta data..." << std::endl;
#endif
RsGxsMetaDataTemporaryMap<RsGxsGrpMetaData> grpMetaMap;
RsGxsGrpMetaTemporaryMap grpMetaMap;
grpMetaMap[grpId] = NULL;
mDataStore->retrieveGxsGrpMetaData(grpMetaMap);
@ -2977,7 +2976,7 @@ void RsGxsNetService::locked_genReqGrpTransaction(NxsTransaction* tr)
#endif
std::list<RsNxsSyncGrpItem*> grpItemL;
RsGxsMetaDataTemporaryMap<RsGxsGrpMetaData> grpMetaMap;
RsGxsGrpMetaTemporaryMap grpMetaMap;
for(std::list<RsNxsItem*>::iterator lit = tr->mItems.begin(); lit != tr->mItems.end(); ++lit)
{
@ -3086,7 +3085,7 @@ void RsGxsNetService::locked_genSendGrpsTransaction(NxsTransaction* tr)
std::list<RsNxsItem*>::iterator lit = tr->mItems.begin();
RsGxsMetaDataTemporaryMap<RsNxsGrp> grps ;
t_RsGxsGenericDataTemporaryMap<RsGxsGroupId,RsNxsGrp> grps ;
for(;lit != tr->mItems.end(); ++lit)
{
@ -3803,7 +3802,7 @@ void RsGxsNetService::handleRecvSyncGroup(RsNxsSyncGrpReqItem *item)
return;
}
RsGxsMetaDataTemporaryMap<RsGxsGrpMetaData> grp;
RsGxsGrpMetaTemporaryMap grp;
mDataStore->retrieveGxsGrpMetaData(grp);
#ifdef NXS_NET_DEBUG_0
@ -4089,7 +4088,7 @@ void RsGxsNetService::handleRecvSyncMessage(RsNxsSyncMsgReqItem *item,bool item_
return;
}
RsGxsMetaDataTemporaryMap<RsGxsGrpMetaData> grpMetas;
RsGxsGrpMetaTemporaryMap grpMetas;
grpMetas[item->grpId] = NULL;
mDataStore->retrieveGxsGrpMetaData(grpMetas);
@ -4628,7 +4627,7 @@ void RsGxsNetService::sharePublishKeysPending()
// Get the meta data for this group Id
//
RsGxsMetaDataTemporaryMap<RsGxsGrpMetaData> grpMetaMap;
RsGxsGrpMetaTemporaryMap grpMetaMap;
grpMetaMap[mit->first] = NULL;
mDataStore->retrieveGxsGrpMetaData(grpMetaMap);
@ -4712,7 +4711,7 @@ void RsGxsNetService::handleRecvPublishKeys(RsNxsGroupPublishKeyItem *item)
// Get the meta data for this group Id
//
RsGxsMetaDataTemporaryMap<RsGxsGrpMetaData> grpMetaMap;
RsGxsGrpMetaTemporaryMap grpMetaMap;
grpMetaMap[item->grpId] = NULL;
mDataStore->retrieveGxsGrpMetaData(grpMetaMap);

View file

@ -71,13 +71,10 @@ class RsGroupNetworkStatsRecord
* Incoming transaction are in 3 different states
* 1. START 2. RECEIVING 3. END
*/
class RsGxsNetService : public RsNetworkExchangeService, public p3ThreadedService,
public p3Config
class RsGxsNetService : public RsNetworkExchangeService, public p3ThreadedService, public p3Config
{
public:
typedef RsSharedPtr<RsGxsNetService> pointer;
static const uint32_t FRAGMENT_SIZE;
/*!
* only one observer is allowed

View file

@ -33,55 +33,48 @@
class RsGixs ;
class RsGenExchange ;
/*!
* Handy function for cleaning out meta result containers
* @param container
*/
template <class Container, class Item>
void freeAndClearContainerResource(Container container)
{
typename Container::iterator meta_it = container.begin();
for(; meta_it != container.end(); ++meta_it)
if(meta_it->second != NULL)
delete meta_it->second;
container.clear();
}
// temporary holds a map of pointers to class T, and destroys all pointers on delete.
template<class T>
class RsGxsMetaDataTemporaryMap: public std::map<RsGxsGroupId,T*>
class non_copiable
{
public:
virtual ~RsGxsMetaDataTemporaryMap()
non_copiable() {}
private:
non_copiable& operator=(const non_copiable&) { return *this ;}
non_copiable(const non_copiable&) {}
};
template<class IdClass,class IdData>
class t_RsGxsGenericDataTemporaryMap: public std::map<IdClass,IdData *>, public non_copiable
{
public:
virtual ~t_RsGxsGenericDataTemporaryMap()
{
clear() ;
}
virtual void clear()
{
for(typename RsGxsMetaDataTemporaryMap<T>::iterator it = this->begin();it!=this->end();++it)
for(typename t_RsGxsGenericDataTemporaryMap<IdClass,IdData>::iterator it = this->begin();it!=this->end();++it)
if(it->second != NULL)
delete it->second ;
std::map<RsGxsGroupId,T*>::clear() ;
std::map<IdClass,IdData*>::clear() ;
}
};
template<class T>
class RsGxsMetaDataTemporaryMapVector: public std::map<RsGxsGroupId,std::vector<T*> >
class t_RsGxsGenericDataTemporaryMapVector: public std::map<RsGxsGroupId,std::vector<T*> >, public non_copiable
{
public:
virtual ~RsGxsMetaDataTemporaryMapVector()
virtual ~t_RsGxsGenericDataTemporaryMapVector()
{
clear() ;
}
virtual void clear()
{
for(typename RsGxsMetaDataTemporaryMapVector<T>::iterator it = this->begin();it!=this->end();++it)
for(typename t_RsGxsGenericDataTemporaryMapVector<T>::iterator it = this->begin();it!=this->end();++it)
{
for(uint32_t i=0;i<it->second.size();++i)
delete it->second[i] ;
@ -92,6 +85,34 @@ public:
std::map<RsGxsGroupId,std::vector<T*> >::clear() ;
}
};
template<class T>
class t_RsGxsGenericDataTemporaryList: public std::list<T*>, public non_copiable
{
public:
virtual ~t_RsGxsGenericDataTemporaryList()
{
clear() ;
}
virtual void clear()
{
for(typename t_RsGxsGenericDataTemporaryList<T>::iterator it = this->begin();it!=this->end();++it)
delete *it;
std::list<T*>::clear() ;
}
};
typedef t_RsGxsGenericDataTemporaryMap<RsGxsGroupId,RsGxsGrpMetaData> RsGxsGrpMetaTemporaryMap;
typedef t_RsGxsGenericDataTemporaryMap<RsGxsGroupId,RsNxsGrp> RsNxsGrpDataTemporaryMap;
typedef t_RsGxsGenericDataTemporaryMapVector<RsGxsMsgMetaData> RsGxsMsgMetaTemporaryMap ;
typedef t_RsGxsGenericDataTemporaryMapVector<RsNxsMsg> RsNxsMsgDataTemporaryMap ;
typedef t_RsGxsGenericDataTemporaryList<RsNxsGrp> RsNxsGrpDataTemporaryList ;
typedef t_RsGxsGenericDataTemporaryList<RsNxsMsg> RsNxsMsgDataTemporaryList ;
#ifdef UNUSED
template<class T>
class RsGxsMetaDataTemporaryMapVector: public std::vector<T*>

View file

@ -2320,6 +2320,7 @@ bool p3PeerMgrIMPL::loadList(std::list<RsItem *>& load)
std::cerr << "(II) Loaded group in new format. ID = " << info.id << std::endl;
groupList[info.id] = info ;
delete *it ;
continue;
}
RsPeerBandwidthLimitsItem *pblitem = dynamic_cast<RsPeerBandwidthLimitsItem*>(*it) ;

View file

@ -238,7 +238,7 @@ class RsMsgParentId : public RsMessageItem
class RsMsgSerialiser: public RsServiceSerializer
{
public:
RsMsgSerialiser(SerializationFlags flags)
RsMsgSerialiser(SerializationFlags flags = RsServiceSerializer::SERIALIZATION_FLAG_NONE)
:RsServiceSerializer(RS_SERVICE_TYPE_MSG,RsGenericSerializer::FORMAT_BINARY,flags){}
virtual ~RsMsgSerialiser() {}

View file

@ -2047,7 +2047,7 @@ bool p3GxsCircles::processMembershipRequests(uint32_t token)
#ifdef DEBUG_CIRCLES
std::cerr << "Processing circle membership requests." << std::endl;
#endif
RsGxsMetaDataTemporaryMapVector<RsGxsMsgItem> msgItems;
t_RsGxsGenericDataTemporaryMapVector<RsGxsMsgItem> msgItems;
if(!RsGenExchange::getMsgData(token, msgItems))
{

View file

@ -12,6 +12,7 @@
/*!
* Not thread safe!!
* And also has a memory leak. Do not use (csoler, 24 Jul 2017).
*/
template<class T>
class RsSharedPtr