/* * libretroshare/src/gxs: rsgenexchange.cc * * RetroShare Gxs exchange interface. * * Copyright 2012-2012 by Christopher Evi-Parker, Robert Fernie * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Library General Public * License Version 2 as published by the Free Software Foundation. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Library General Public License for more details. * * You should have received a copy of the GNU Library General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 * USA. * * Please report all bugs and problems to "retroshare@lunamutt.com". * */ #include #include "pqi/pqihash.h" #include "rsgenexchange.h" #include "gxssecurity.h" #include "util/contentvalue.h" #include "util/rsprint.h" #include "retroshare/rsgxsflags.h" #include "retroshare/rsgxscircles.h" #include "retroshare/rsgrouter.h" #include "retroshare/rsidentity.h" #include "retroshare/rspeers.h" #include "rsitems/rsnxsitems.h" #include "rsgixs.h" #include "rsgxsutil.h" #include "rsserver/p3face.h" #include #define PUB_GRP_MASK 0x000f #define RESTR_GRP_MASK 0x00f0 #define PRIV_GRP_MASK 0x0f00 #define GRP_OPTIONS_MASK 0xf000 #define PUB_GRP_OFFSET 0 #define RESTR_GRP_OFFSET 8 #define PRIV_GRP_OFFSET 16 #define GRP_OPTIONS_OFFSET 24 // Authentication key indices. Used to store them in a map // these where originally flags, but used as indexes. Still, we need // to keep their old values to ensure backward compatibility. static const uint32_t INDEX_AUTHEN_IDENTITY = 0x00000010; // identity static const uint32_t INDEX_AUTHEN_PUBLISH = 0x00000020; // publish key static const uint32_t INDEX_AUTHEN_ADMIN = 0x00000040; // admin key #define GXS_MASK "GXS_MASK_HACK" //#define GEN_EXCH_DEBUG 1 static const uint32_t MSG_CLEANUP_PERIOD = 60*59; // 59 minutes static const uint32_t INTEGRITY_CHECK_PERIOD = 60*31; // 31 minutes RsGenExchange::RsGenExchange(RsGeneralDataService *gds, RsNetworkExchangeService *ns, RsSerialType *serviceSerialiser, uint16_t servType, RsGixs* gixs, uint32_t authenPolicy) : mGenMtx("GenExchange"), mDataStore(gds), mNetService(ns), mSerialiser(serviceSerialiser), mServType(servType), mGixs(gixs), mAuthenPolicy(authenPolicy), mCleaning(false), mLastClean((int)time(NULL) - (int)(RSRandom::random_u32() % MSG_CLEANUP_PERIOD)), // this helps unsynchronising the checks for the different services mMsgCleanUp(NULL), mChecking(false), mLastCheck((int)time(NULL) - (int)(RSRandom::random_u32() % INTEGRITY_CHECK_PERIOD) + 120), // this helps unsynchronising the checks for the different services, with 2 min security to avoid checking right away before statistics come up. mIntegrityCheck(NULL), SIGN_MAX_WAITING_TIME(60), SIGN_FAIL(0), SIGN_SUCCESS(1), SIGN_FAIL_TRY_LATER(2), VALIDATE_FAIL(0), VALIDATE_SUCCESS(1), VALIDATE_FAIL_TRY_LATER(2), VALIDATE_MAX_WAITING_TIME(60) { mDataAccess = new RsGxsDataAccess(gds); } void RsGenExchange::setNetworkExchangeService(RsNetworkExchangeService *ns) { if(mNetService != NULL) std::cerr << "(EE) Cannot override existing network exchange service. Make sure it has been deleted otherwise." << std::endl; else { mNetService = ns ; } } RsGenExchange::~RsGenExchange() { // need to destruct in a certain order (bad thing, TODO: put down instance ownership rules!) delete mNetService; delete mDataAccess; mDataAccess = NULL; delete mDataStore; mDataStore = NULL; for(uint32_t i=0;igetGroupServerUpdateTS(gid,grp_server_update_TS,msg_server_update_TS) ; } void RsGenExchange::data_tick() { static const double timeDelta = 0.1; // slow tick in sec tick(); usleep((int) (timeDelta * 1000 *1000)); // timeDelta sec } void RsGenExchange::tick() { // Meta Changes should happen first. // This is important, as services want to change Meta, then get results. // Services shouldn't rely on this ordering - but some do. processGrpMetaChanges(); processMsgMetaChanges(); mDataAccess->processRequests(); publishGrps(); publishMsgs(); processGroupUpdatePublish(); processGroupDelete(); processMessageDelete(); processRecvdData(); processRoutingClues() ; if(!mNotifications.empty()) { notifyChanges(mNotifications); mNotifications.clear(); } // implemented service tick function service_tick(); time_t now = time(NULL); if((mLastClean + MSG_CLEANUP_PERIOD < now) || mCleaning) { if(mMsgCleanUp) { if(mMsgCleanUp->clean()) { mCleaning = false; delete mMsgCleanUp; mMsgCleanUp = NULL; mLastClean = time(NULL); } } else { mMsgCleanUp = new RsGxsMessageCleanUp(mDataStore, this, 1); mCleaning = true; } } now = time(NULL); if(mChecking || (mLastCheck + INTEGRITY_CHECK_PERIOD < now)) { mLastCheck = time(NULL); { RS_STACK_MUTEX(mGenMtx) ; if(!mIntegrityCheck) { mIntegrityCheck = new RsGxsIntegrityCheck(mDataStore,this,mGixs); mIntegrityCheck->start("gxs integrity"); mChecking = true; } } if(mIntegrityCheck->isDone()) { RS_STACK_MUTEX(mGenMtx) ; std::list grpIds; std::map > msgIds; mIntegrityCheck->getDeletedIds(grpIds, msgIds); if (!grpIds.empty()) { RsGxsGroupChange* gc = new RsGxsGroupChange(RsGxsNotify::TYPE_PROCESSED, false); gc->mGrpIdList = grpIds; #ifdef GEN_EXCH_DEBUG std::cerr << " adding the following grp ids to notification: " << std::endl; for(std::list::const_iterator it(grpIds.begin());it!=grpIds.end();++it) std::cerr << " " << *it << std::endl; #endif mNotifications.push_back(gc); // also notify the network exchange service that these groups no longer exist. if(mNetService) mNetService->removeGroups(grpIds) ; } if (!msgIds.empty()) { RsGxsMsgChange* c = new RsGxsMsgChange(RsGxsNotify::TYPE_PROCESSED, false); c->msgChangeMap = msgIds; mNotifications.push_back(c); } delete mIntegrityCheck; mIntegrityCheck = NULL; mChecking = false; } } } bool RsGenExchange::messagePublicationTest(const RsGxsMsgMetaData& meta) { if(!mNetService) { #ifdef GEN_EXCH_DEBUG std::cerr << "(EE) No network service in service " << std::hex << serviceType() << std::dec << ": cannot read message storage time." << std::endl; #endif return false ; } uint32_t st = mNetService->getKeepAge(meta.mGroupId); time_t storageTimeLimit = meta.mPublishTs + st; return meta.mMsgStatus & GXS_SERV::GXS_MSG_STATUS_KEEP || st == 0 || storageTimeLimit >= time(NULL); } bool RsGenExchange::acknowledgeTokenMsg(const uint32_t& token, RsGxsGrpMsgIdPair& msgId) { RS_STACK_MUTEX(mGenMtx) ; #ifdef GEN_EXCH_DEBUG std::cerr << "RsGenExchange::acknowledgeTokenMsg(). token=" << token << std::endl; #endif std::map::iterator mit = mMsgNotify.find(token); if(mit == mMsgNotify.end()) { #ifdef GEN_EXCH_DEBUG std::cerr << " no notification found for this token." << std::endl; #endif return false; } msgId = mit->second; // no dump token as client has ackowledged its completion mDataAccess->disposeOfPublicToken(token); #ifdef GEN_EXCH_DEBUG std::cerr << " found grpId=" << msgId.first <<", msgId=" << msgId.second << std::endl; std::cerr << " disposing token from mDataAccess" << std::endl; #endif return true; } bool RsGenExchange::acknowledgeTokenGrp(const uint32_t& token, RsGxsGroupId& grpId) { RS_STACK_MUTEX(mGenMtx) ; #ifdef GEN_EXCH_DEBUG std::cerr << "RsGenExchange::acknowledgeTokenGrp(). token=" << token << std::endl; #endif std::map::iterator mit = mGrpNotify.find(token); if(mit == mGrpNotify.end()) { #ifdef GEN_EXCH_DEBUG std::cerr << " no notification found for this token." << std::endl; #endif return false; } grpId = mit->second; // no dump token as client has ackowledged its completion mDataAccess->disposeOfPublicToken(token); #ifdef GEN_EXCH_DEBUG std::cerr << " found grpId=" << grpId << std::endl; std::cerr << " disposing token from mDataAccess" << std::endl; #endif return true; } void RsGenExchange::generateGroupKeys(RsTlvSecurityKeySet& keySet, bool genPublishKeys) { /* create Keys */ RsTlvPublicRSAKey pubAdminKey ; RsTlvPrivateRSAKey privAdminKey; GxsSecurity::generateKeyPair(pubAdminKey,privAdminKey) ; // for now all public pubAdminKey.keyFlags |= RSTLV_KEY_DISTRIB_ADMIN ; privAdminKey.keyFlags |= RSTLV_KEY_DISTRIB_ADMIN ; keySet.public_keys[pubAdminKey.keyId] = pubAdminKey; keySet.private_keys[privAdminKey.keyId] = privAdminKey; if(genPublishKeys) { /* set publish keys */ RsTlvPublicRSAKey pubPublishKey ; RsTlvPrivateRSAKey privPublishKey; GxsSecurity::generateKeyPair(pubPublishKey,privPublishKey) ; // for now all public pubPublishKey.keyFlags |= RSTLV_KEY_DISTRIB_PUBLISH ; privPublishKey.keyFlags |= RSTLV_KEY_DISTRIB_PUBLISH ; keySet.public_keys[pubPublishKey.keyId] = pubPublishKey; keySet.private_keys[privPublishKey.keyId] = privPublishKey; } } uint8_t RsGenExchange::createGroup(RsNxsGrp *grp, RsTlvSecurityKeySet& keySet) { #ifdef GEN_EXCH_DEBUG std::cerr << "RsGenExchange::createGroup()"; std::cerr << std::endl; #endif RsGxsGrpMetaData* meta = grp->metaData; /* add public admin and publish keys to grp */ // find private admin key RsTlvPrivateRSAKey privAdminKey; bool privKeyFound = false; // private admin key for( std::map::iterator mit = keySet.private_keys.begin(); mit != keySet.private_keys.end(); ++mit) { RsTlvPrivateRSAKey& key = mit->second; if((key.keyFlags & RSTLV_KEY_DISTRIB_ADMIN) && (key.keyFlags & RSTLV_KEY_TYPE_FULL)) { privAdminKey = key; privKeyFound = true; break ; } } if(!privKeyFound) { std::cerr << "RsGenExchange::createGroup() Missing private ADMIN Key"; std::cerr << std::endl; return false; } // only public keys are included to be transported. The 2nd line below is very important. meta->keys = keySet; meta->keys.private_keys.clear() ; // group is self signing // for the creation of group signature // only public admin and publish keys are present in meta uint32_t metaDataLen = meta->serial_size(RS_GXS_GRP_META_DATA_CURRENT_API_VERSION); uint32_t allGrpDataLen = metaDataLen + grp->grp.bin_len; char* metaData = new char[metaDataLen]; char* allGrpData = new char[allGrpDataLen]; // msgData + metaData meta->serialise(metaData, metaDataLen,RS_GXS_GRP_META_DATA_CURRENT_API_VERSION); // copy msg data and meta in allMsgData buffer memcpy(allGrpData, grp->grp.bin_data, grp->grp.bin_len); memcpy(allGrpData+(grp->grp.bin_len), metaData, metaDataLen); RsTlvKeySignature adminSign; bool ok = GxsSecurity::getSignature(allGrpData, allGrpDataLen, privAdminKey, adminSign); // add admin sign to grpMeta meta->signSet.keySignSet[INDEX_AUTHEN_ADMIN] = adminSign; RsTlvBinaryData grpData(mServType); grpData.setBinData(allGrpData, allGrpDataLen); uint8_t ret = createGroupSignatures(meta->signSet, grpData, *(grp->metaData)); // clean up delete[] allGrpData; delete[] metaData; if (!ok) { std::cerr << "RsGenExchange::createGroup() ERROR !okay (getSignature error)"; std::cerr << std::endl; return CREATE_FAIL; } if(ret == SIGN_FAIL) { return CREATE_FAIL; }else if(ret == SIGN_FAIL_TRY_LATER) { return CREATE_FAIL_TRY_LATER; }else if(ret == SIGN_SUCCESS) { return CREATE_SUCCESS; }else{ return CREATE_FAIL; } } int RsGenExchange::createGroupSignatures(RsTlvKeySignatureSet& signSet, RsTlvBinaryData& grpData, RsGxsGrpMetaData& grpMeta) { bool needIdentitySign = false; int id_ret; uint8_t author_flag = GXS_SERV::GRP_OPTION_AUTHEN_AUTHOR_SIGN; PrivacyBitPos pos = GRP_OPTION_BITS; // Check required permissions, and allow them to sign it - if they want too - as well! if ((!grpMeta.mAuthorId.isNull()) || checkAuthenFlag(pos, author_flag)) { needIdentitySign = true; #ifdef GEN_EXCH_DEBUG std::cerr << "Needs Identity sign! (Service Flags)"; std::cerr << std::endl; #endif } if (needIdentitySign) { if(grpMeta.mAuthorId.isNull()) { std::cerr << "RsGenExchange::createGroupSignatures() "; std::cerr << "Group signature is required by service, but the author id is null." << std::endl; id_ret = SIGN_FAIL; } else if(mGixs) { bool haveKey = mGixs->havePrivateKey(grpMeta.mAuthorId); if(haveKey) { RsTlvPrivateRSAKey authorKey; mGixs->getPrivateKey(grpMeta.mAuthorId, authorKey); RsTlvKeySignature sign; if(GxsSecurity::getSignature((char*)grpData.bin_data, grpData.bin_len, authorKey, sign)) { id_ret = SIGN_SUCCESS; mGixs->timeStampKey(grpMeta.mAuthorId,RsIdentityUsage(mServType,RsIdentityUsage::GROUP_AUTHOR_SIGNATURE_CREATION,grpMeta.mGroupId)) ; signSet.keySignSet[INDEX_AUTHEN_IDENTITY] = sign; } else id_ret = SIGN_FAIL; } else { mGixs->requestPrivateKey(grpMeta.mAuthorId); #ifdef GEN_EXCH_DEBUG std::cerr << "RsGenExchange::createGroupSignatures(): "; std::cerr << " ERROR AUTHOR KEY: " << grpMeta.mAuthorId << " is not Cached / available for Message Signing\n"; std::cerr << "RsGenExchange::createGroupSignatures(): Requestiong AUTHOR KEY"; std::cerr << std::endl; #endif id_ret = SIGN_FAIL_TRY_LATER; } } else { #ifdef GEN_EXCH_DEBUG std::cerr << "RsGenExchange::createGroupSignatures()"; std::cerr << "Gixs not enabled while request identity signature validation!" << std::endl; #endif id_ret = SIGN_FAIL; } } else { id_ret = SIGN_SUCCESS; } return id_ret; } int RsGenExchange::createMsgSignatures(RsTlvKeySignatureSet& signSet, RsTlvBinaryData& msgData, const RsGxsMsgMetaData& msgMeta, RsGxsGrpMetaData& grpMeta) { bool needPublishSign = false, needIdentitySign = false; uint32_t grpFlag = grpMeta.mGroupFlags; bool publishSignSuccess = false; #ifdef GEN_EXCH_DEBUG std::cerr << "RsGenExchange::createMsgSignatures() for Msg.mMsgName: " << msgMeta.mMsgName; std::cerr << std::endl; #endif // publish signature is determined by whether group is public or not // for private group signature is not needed as it needs decrypting with // the private publish key anyways // restricted is a special case which heeds whether publish sign needs to be checked or not // one may or may not want uint8_t author_flag = GXS_SERV::MSG_AUTHEN_ROOT_AUTHOR_SIGN; uint8_t publish_flag = GXS_SERV::MSG_AUTHEN_ROOT_PUBLISH_SIGN; if(!msgMeta.mParentId.isNull()) { // Child Message. author_flag = GXS_SERV::MSG_AUTHEN_CHILD_AUTHOR_SIGN; publish_flag = GXS_SERV::MSG_AUTHEN_CHILD_PUBLISH_SIGN; } PrivacyBitPos pos = PUBLIC_GRP_BITS; if (grpFlag & GXS_SERV::FLAG_PRIVACY_RESTRICTED) { pos = RESTRICTED_GRP_BITS; } else if (grpFlag & GXS_SERV::FLAG_PRIVACY_PRIVATE) { pos = PRIVATE_GRP_BITS; } needIdentitySign = false; needPublishSign = false; if (checkAuthenFlag(pos, publish_flag)) { needPublishSign = true; #ifdef GEN_EXCH_DEBUG std::cerr << "Needs Publish sign! (Service Flags)"; std::cerr << std::endl; #endif } // Check required permissions, and allow them to sign it - if they want too - as well! if (checkAuthenFlag(pos, author_flag)) { needIdentitySign = true; #ifdef GEN_EXCH_DEBUG std::cerr << "Needs Identity sign! (Service Flags)"; std::cerr << std::endl; #endif } if (!msgMeta.mAuthorId.isNull()) { needIdentitySign = true; #ifdef GEN_EXCH_DEBUG std::cerr << "Needs Identity sign! (AuthorId Exists)"; std::cerr << std::endl; #endif } if(needPublishSign) { // public and shared is publish key const RsTlvSecurityKeySet& keys = grpMeta.keys; const RsTlvPrivateRSAKey *publishKey; std::map::const_iterator mit = keys.private_keys.begin(), mit_end = keys.private_keys.end(); bool publish_key_found = false; for(; mit != mit_end; ++mit) { publish_key_found = mit->second.keyFlags == (RSTLV_KEY_DISTRIB_PUBLISH | RSTLV_KEY_TYPE_FULL); if(publish_key_found) break; } if (publish_key_found) { // private publish key publishKey = &(mit->second); RsTlvKeySignature publishSign = signSet.keySignSet[INDEX_AUTHEN_PUBLISH]; publishSignSuccess = GxsSecurity::getSignature((char*)msgData.bin_data, msgData.bin_len, *publishKey, publishSign); //place signature in msg meta signSet.keySignSet[INDEX_AUTHEN_PUBLISH] = publishSign; }else { std::cerr << "RsGenExchange::createMsgSignatures()"; std::cerr << " ERROR Cannot find PUBLISH KEY for Message Signing!"; std::cerr << " ERROR Publish Sign failed!"; std::cerr << std::endl; } } else // publish sign not needed so set as successful { publishSignSuccess = true; } int id_ret; if (needIdentitySign) { if(mGixs) { bool haveKey = mGixs->havePrivateKey(msgMeta.mAuthorId); if(haveKey) { RsTlvPrivateRSAKey authorKey; mGixs->getPrivateKey(msgMeta.mAuthorId, authorKey); RsTlvKeySignature sign; if(GxsSecurity::getSignature((char*)msgData.bin_data, msgData.bin_len, authorKey, sign)) { id_ret = SIGN_SUCCESS; mGixs->timeStampKey(msgMeta.mAuthorId,RsIdentityUsage(mServType,RsIdentityUsage::MESSAGE_AUTHOR_SIGNATURE_CREATION,msgMeta.mGroupId,msgMeta.mMsgId)) ; signSet.keySignSet[INDEX_AUTHEN_IDENTITY] = sign; } else id_ret = SIGN_FAIL; } else { mGixs->requestPrivateKey(msgMeta.mAuthorId); #ifdef GEN_EXCH_DEBUG std::cerr << "RsGenExchange::createMsgSignatures(): "; std::cerr << " ERROR AUTHOR KEY: " << msgMeta.mAuthorId << " is not Cached / available for Message Signing\n"; std::cerr << "RsGenExchange::createMsgSignatures(): Requestiong AUTHOR KEY"; std::cerr << std::endl; #endif id_ret = SIGN_FAIL_TRY_LATER; } } else { #ifdef GEN_EXCH_DEBUG std::cerr << "RsGenExchange::createMsgSignatures()"; std::cerr << "Gixs not enabled while request identity signature validation!" << std::endl; #endif id_ret = SIGN_FAIL; } } else { id_ret = SIGN_SUCCESS; } if(publishSignSuccess) { return id_ret; } else { return SIGN_FAIL; } } int RsGenExchange::createMessage(RsNxsMsg* msg) { const RsGxsGroupId& id = msg->grpId; #ifdef GEN_EXCH_DEBUG std::cerr << "RsGenExchange::createMessage() " << std::endl; #endif std::map metaMap; metaMap.insert(std::make_pair(id, (RsGxsGrpMetaData*)(NULL))); mDataStore->retrieveGxsGrpMetaData(metaMap); RsGxsMsgMetaData &meta = *(msg->metaData); int ret_val; if(!metaMap[id]) { return CREATE_FAIL; } else { // get publish key RsGxsGrpMetaData* grpMeta = metaMap[id]; uint32_t metaDataLen = meta.serial_size(); uint32_t allMsgDataLen = metaDataLen + msg->msg.bin_len; char* metaData = new char[metaDataLen]; char* allMsgData = new char[allMsgDataLen]; // msgData + metaData meta.serialise(metaData, &metaDataLen); // copy msg data and meta in allmsgData buffer memcpy(allMsgData, msg->msg.bin_data, msg->msg.bin_len); memcpy(allMsgData+(msg->msg.bin_len), metaData, metaDataLen); RsTlvBinaryData msgData(0); msgData.setBinData(allMsgData, allMsgDataLen); // create signatures ret_val = createMsgSignatures(meta.signSet, msgData, meta, *grpMeta); // get hash of msg data to create msg id pqihash hash; hash.addData(allMsgData, allMsgDataLen); RsFileHash hashId; hash.Complete(hashId); msg->msgId = hashId; // assign msg id to msg meta msg->metaData->mMsgId = msg->msgId; delete[] metaData; delete[] allMsgData; delete grpMeta; } if(ret_val == SIGN_FAIL) return CREATE_FAIL; else if(ret_val == SIGN_FAIL_TRY_LATER) return CREATE_FAIL_TRY_LATER; else if(ret_val == SIGN_SUCCESS) return CREATE_SUCCESS; else { std::cerr << "Unknown return value from signature attempt!"; return CREATE_FAIL; } } int RsGenExchange::validateMsg(RsNxsMsg *msg, const uint32_t& grpFlag, const uint32_t& /*signFlag*/, RsTlvSecurityKeySet& grpKeySet) { bool needIdentitySign = false; bool needPublishSign = false; bool publishValidate = true, idValidate = true; uint8_t author_flag = GXS_SERV::MSG_AUTHEN_ROOT_AUTHOR_SIGN; uint8_t publish_flag = GXS_SERV::MSG_AUTHEN_ROOT_PUBLISH_SIGN; if(!msg->metaData->mParentId.isNull()) { // Child Message. author_flag = GXS_SERV::MSG_AUTHEN_CHILD_AUTHOR_SIGN; publish_flag = GXS_SERV::MSG_AUTHEN_CHILD_PUBLISH_SIGN; } PrivacyBitPos pos = PUBLIC_GRP_BITS; if (grpFlag & GXS_SERV::FLAG_PRIVACY_RESTRICTED) { pos = RESTRICTED_GRP_BITS; } else if (grpFlag & GXS_SERV::FLAG_PRIVACY_PRIVATE) { pos = PRIVATE_GRP_BITS; } if (checkAuthenFlag(pos, publish_flag)) needPublishSign = true; // Check required permissions, if they have signed it anyway - we need to validate it. if ((checkAuthenFlag(pos, author_flag)) || (!msg->metaData->mAuthorId.isNull())) needIdentitySign = true; #ifdef GEN_EXCH_DEBUG std::cerr << "Validate message: msgId=" << msg->msgId << ", grpId=" << msg->grpId << " grpFlags=" << std::hex << grpFlag << std::dec << ". Need publish=" << needPublishSign << ", needIdentitySign=" << needIdentitySign ; #endif RsGxsMsgMetaData& metaData = *(msg->metaData); if(needPublishSign) { RsTlvKeySignature sign = metaData.signSet.keySignSet[INDEX_AUTHEN_PUBLISH]; std::map& keys = grpKeySet.public_keys; std::map::iterator mit = keys.begin(); RsGxsId keyId; for(; mit != keys.end() ; ++mit) { RsTlvPublicRSAKey& key = mit->second; if(key.keyFlags & RSTLV_KEY_DISTRIB_PUBLIC_deprecated) { keyId = key.keyId; std::cerr << "WARNING: old style publish key with flags " << key.keyFlags << std::endl; std::cerr << " this cannot be fixed, but RS will deal with it." << std::endl; break ; } if(key.keyFlags & RSTLV_KEY_DISTRIB_PUBLISH) // we might have the private key, but we still should be able to check the signature { keyId = key.keyId; break; } } if(!keyId.isNull()) { RsTlvPublicRSAKey& key = keys[keyId]; publishValidate &= GxsSecurity::validateNxsMsg(*msg, sign, key); } else { std::cerr << "(EE) public publish key not found in group that require publish key validation. This should not happen! msgId=" << metaData.mMsgId << ", grpId=" << metaData.mGroupId << std::endl; std::cerr << "(EE) public keys available for this group are: " << std::endl; for(std::map::const_iterator it(grpKeySet.public_keys.begin());it!=grpKeySet.public_keys.end();++it) std::cerr << "(EE) " << it->first << std::endl; std::cerr << "(EE) private keys available for this group are: " << std::endl; for(std::map::const_iterator it(grpKeySet.private_keys.begin());it!=grpKeySet.private_keys.end();++it) std::cerr << "(EE) " << it->first << std::endl; publishValidate = false; } } else { publishValidate = true; } if(needIdentitySign) { if(mGixs) { bool haveKey = mGixs->haveKey(metaData.mAuthorId); if(haveKey) { RsTlvPublicRSAKey authorKey; bool auth_key_fetched = mGixs->getKey(metaData.mAuthorId, authorKey) ; if (auth_key_fetched) { RsTlvKeySignature sign = metaData.signSet.keySignSet[INDEX_AUTHEN_IDENTITY]; idValidate &= GxsSecurity::validateNxsMsg(*msg, sign, authorKey); mGixs->timeStampKey(metaData.mAuthorId,RsIdentityUsage(mServType,RsIdentityUsage::MESSAGE_AUTHOR_SIGNATURE_VALIDATION,metaData.mGroupId,metaData.mMsgId)) ; } else { std::cerr << "RsGenExchange::validateMsg()"; std::cerr << " ERROR Cannot Retrieve AUTHOR KEY for Message Validation"; std::cerr << std::endl; idValidate = false; } if(idValidate) { // get key data and check that the key is actually PGP-linked. If not, reject the post. RsIdentityDetails details ; if(!mGixs->getIdDetails(metaData.mAuthorId,details)) { // the key cannot ke reached, although it's in cache. Weird situation. std::cerr << "RsGenExchange::validateMsg(): cannot get key data for ID=" << metaData.mAuthorId << ", although it's supposed to be already in cache. Cannot validate." << std::endl; idValidate = false ; } else { // now check reputation of the message author. The reputation will need to be at least as high as this value for the msg to validate. // At validation step, we accept all messages, except the ones signed by locally rejected identities. if(details.mReputation.mOverallReputationLevel == RsReputations::REPUTATION_LOCALLY_NEGATIVE) { #ifdef GEN_EXCH_DEBUG std::cerr << "RsGenExchange::validateMsg(): message from " << metaData.mAuthorId << ", rejected because reputation level (" << details.mReputation.mOverallReputationLevel <<") indicate that you banned this ID." << std::endl; #endif idValidate = false ; } } } } else { std::list peers; peers.push_back(msg->PeerId()); mGixs->requestKey(metaData.mAuthorId, peers, RsIdentityUsage(serviceType(),RsIdentityUsage::MESSAGE_AUTHOR_SIGNATURE_VALIDATION,metaData.mGroupId,metaData.mMsgId)); #ifdef GEN_EXCH_DEBUG std::cerr << ", Key missing. Retry later." << std::endl; #endif return VALIDATE_FAIL_TRY_LATER; } } else { #ifdef GEN_EXCH_DEBUG std::cerr << "Gixs not enabled while request identity signature validation!" << std::endl; #endif idValidate = false; } } else { idValidate = true; } #ifdef GEN_EXCH_DEBUG std::cerr << ", publish val=" << publishValidate << ", idValidate=" << idValidate << ". Result=" << (publishValidate && idValidate) << std::endl; #endif if(publishValidate && idValidate) return VALIDATE_SUCCESS; else return VALIDATE_FAIL; } int RsGenExchange::validateGrp(RsNxsGrp* grp) { bool needIdentitySign = false, idValidate = false; RsGxsGrpMetaData& metaData = *(grp->metaData); uint8_t author_flag = GXS_SERV::GRP_OPTION_AUTHEN_AUTHOR_SIGN; PrivacyBitPos pos = GRP_OPTION_BITS; #ifdef GEN_EXCH_DEBUG std::cerr << "Validating group " << grp->grpId << ", authorId = " << metaData.mAuthorId << std::endl; #endif // Check required permissions, and allow them to sign it - if they want too - as well! if ((!metaData.mAuthorId.isNull()) || checkAuthenFlag(pos, author_flag)) { needIdentitySign = true; #ifdef GEN_EXCH_DEBUG std::cerr << " Needs Identity sign! (Service Flags). Identity signing key is " << metaData.mAuthorId << std::endl; #endif } if(needIdentitySign) { if(mGixs) { bool haveKey = mGixs->haveKey(metaData.mAuthorId); if(haveKey) { #ifdef GEN_EXCH_DEBUG std::cerr << " have ID key in cache: yes" << std::endl; #endif RsTlvPublicRSAKey authorKey; bool auth_key_fetched = mGixs->getKey(metaData.mAuthorId, authorKey) ; if (auth_key_fetched) { RsTlvKeySignature sign = metaData.signSet.keySignSet[INDEX_AUTHEN_IDENTITY]; idValidate = GxsSecurity::validateNxsGrp(*grp, sign, authorKey); #ifdef GEN_EXCH_DEBUG std::cerr << " key ID validation result: " << idValidate << std::endl; #endif mGixs->timeStampKey(metaData.mAuthorId,RsIdentityUsage(mServType,RsIdentityUsage::GROUP_AUTHOR_SIGNATURE_VALIDATION,metaData.mGroupId)); } else { std::cerr << "RsGenExchange::validateGrp()"; std::cerr << " ERROR Cannot Retrieve AUTHOR KEY for Group Sign Validation"; std::cerr << std::endl; idValidate = false; } }else { #ifdef GEN_EXCH_DEBUG std::cerr << " have key in cache: no. Return VALIDATE_LATER" << std::endl; std::cerr << " requesting key " << metaData.mAuthorId << " to origin peer " << grp->PeerId() << std::endl; #endif std::list peers; peers.push_back(grp->PeerId()); mGixs->requestKey(metaData.mAuthorId, peers,RsIdentityUsage(mServType,RsIdentityUsage::GROUP_AUTHOR_SIGNATURE_VALIDATION,metaData.mGroupId)); return VALIDATE_FAIL_TRY_LATER; } } else { #ifdef GEN_EXCH_DEBUG std::cerr << " (EE) Gixs not enabled while request identity signature validation!" << std::endl; #endif idValidate = false; } } else { idValidate = true; } if(idValidate) return VALIDATE_SUCCESS; else return VALIDATE_FAIL; } bool RsGenExchange::checkAuthenFlag(const PrivacyBitPos& pos, const uint8_t& flag) const { #ifdef GEN_EXCH_DEBUG std::cerr << "RsGenExchange::checkMsgAuthenFlag(pos: " << pos << " flag: "; std::cerr << (int) flag << " mAuthenPolicy: " << mAuthenPolicy << ")"; std::cerr << std::endl; #endif switch(pos) { case PUBLIC_GRP_BITS: return mAuthenPolicy & flag; break; case RESTRICTED_GRP_BITS: return flag & (mAuthenPolicy >> RESTR_GRP_OFFSET); break; case PRIVATE_GRP_BITS: return flag & (mAuthenPolicy >> PRIV_GRP_OFFSET); break; case GRP_OPTION_BITS: return flag & (mAuthenPolicy >> GRP_OPTIONS_OFFSET); break; default: std::cerr << "pos option not recognised"; return false; } } static void addMessageChanged(std::map > &msgs, const std::map > &msgChanged) { if (msgs.empty()) { msgs = msgChanged; } else { std::map >::const_iterator mapIt; for (mapIt = msgChanged.begin(); mapIt != msgChanged.end(); ++mapIt) { const RsGxsGroupId &grpId = mapIt->first; const std::vector &srcMsgIds = mapIt->second; std::vector &destMsgIds = msgs[grpId]; std::vector::const_iterator msgIt; for (msgIt = srcMsgIds.begin(); msgIt != srcMsgIds.end(); ++msgIt) { if (std::find(destMsgIds.begin(), destMsgIds.end(), *msgIt) == destMsgIds.end()) { destMsgIds.push_back(*msgIt); } } } } } void RsGenExchange::receiveChanges(std::vector& changes) { #ifdef GEN_EXCH_DEBUG std::cerr << "RsGenExchange::receiveChanges()" << std::endl; #endif RsGxsChanges out; out.mService = getTokenService(); // collect all changes in one GxsChanges object std::vector::iterator vit = changes.begin(); for(; vit != changes.end(); ++vit) { RsGxsNotify* n = *vit; RsGxsGroupChange* gc; RsGxsMsgChange* mc; if((mc = dynamic_cast(n)) != NULL) { if (mc->metaChange()) { addMessageChanged(out.mMsgsMeta, mc->msgChangeMap); } else { addMessageChanged(out.mMsgs, mc->msgChangeMap); } } else if((gc = dynamic_cast(n)) != NULL) { if(gc->metaChange()) { out.mGrpsMeta.splice(out.mGrpsMeta.end(), gc->mGrpIdList); } else { out.mGrps.splice(out.mGrps.end(), gc->mGrpIdList); } } else std::cerr << "(EE) Unknown changes type!!" << std::endl; delete n; } changes.clear() ; RsServer::notify()->notifyGxsChange(out); } bool RsGenExchange::subscribeToGroup(uint32_t& token, const RsGxsGroupId& grpId, bool subscribe) { if(subscribe) setGroupSubscribeFlags(token, grpId, GXS_SERV::GROUP_SUBSCRIBE_SUBSCRIBED, (GXS_SERV::GROUP_SUBSCRIBE_SUBSCRIBED | GXS_SERV::GROUP_SUBSCRIBE_NOT_SUBSCRIBED)); else setGroupSubscribeFlags(token, grpId, GXS_SERV::GROUP_SUBSCRIBE_NOT_SUBSCRIBED, (GXS_SERV::GROUP_SUBSCRIBE_SUBSCRIBED | GXS_SERV::GROUP_SUBSCRIBE_NOT_SUBSCRIBED)); if(mNetService != NULL) mNetService->subscribeStatusChanged(grpId,subscribe) ; #ifdef GEN_EXCH_DEBUG else std::cerr << "(EE) No mNetService in RsGenExchange for service 0x" << std::hex << mServType << std::dec << std::endl; #endif return true; } bool RsGenExchange::getGroupStatistic(const uint32_t& token, GxsGroupStatistic& stats) { return mDataAccess->getGroupStatistic(token, stats); } bool RsGenExchange::getServiceStatistic(const uint32_t& token, GxsServiceStatistic& stats) { return mDataAccess->getServiceStatistic(token, stats); } bool RsGenExchange::getGroupList(const uint32_t &token, std::list &groupIds) { return mDataAccess->getGroupList(token, groupIds); } bool RsGenExchange::getMsgList(const uint32_t &token, GxsMsgIdResult &msgIds) { return mDataAccess->getMsgList(token, msgIds); } bool RsGenExchange::getMsgRelatedList(const uint32_t &token, MsgRelatedIdResult &msgIds) { return mDataAccess->getMsgRelatedList(token, msgIds); } bool RsGenExchange::getGroupMeta(const uint32_t &token, std::list &groupInfo) { std::list metaL; bool ok = mDataAccess->getGroupSummary(token, metaL); RsGroupMetaData m; for( std::list::iterator lit = metaL.begin(); lit != metaL.end(); ++lit) { RsGxsGrpMetaData& gMeta = *(*lit); m = gMeta; RsGroupNetworkStats sts ; if(mNetService != NULL && mNetService->getGroupNetworkStats(gMeta.mGroupId,sts)) { m.mPop = sts.mSuppliers ; m.mVisibleMsgCount = sts.mMaxVisibleCount ; if((!(IS_GROUP_SUBSCRIBED(gMeta.mSubscribeFlags))) || gMeta.mLastPost == 0) m.mLastPost = sts.mLastGroupModificationTS ; } else { m.mPop= 0 ; m.mVisibleMsgCount = 0 ; } groupInfo.push_back(m); delete (*lit); } return ok; } bool RsGenExchange::getMsgMeta(const uint32_t &token, GxsMsgMetaMap &msgInfo) { #ifdef GEN_EXCH_DEBUG std::cerr << "RsGenExchange::getMsgMeta(): retrieving meta data for token " << token << std::endl; #endif std::list metaL; GxsMsgMetaResult result; bool ok = mDataAccess->getMsgSummary(token, result); GxsMsgMetaResult::iterator mit = result.begin(); for(; mit != result.end(); ++mit) { std::vector& metaV = mit->second; std::vector& msgInfoV = msgInfo[mit->first]; std::vector::iterator vit = metaV.begin(); RsMsgMetaData meta; for(; vit != metaV.end(); ++vit) { RsGxsMsgMetaData& m = *(*vit); meta = m; msgInfoV.push_back(meta); delete *vit; } metaV.clear(); } return ok; } bool RsGenExchange::getMsgRelatedMeta(const uint32_t &token, GxsMsgRelatedMetaMap &msgMeta) { MsgRelatedMetaResult result; bool ok = mDataAccess->getMsgRelatedSummary(token, result); MsgRelatedMetaResult::iterator mit = result.begin(); for(; mit != result.end(); ++mit) { std::vector& metaV = mit->second; std::vector& msgInfoV = msgMeta[mit->first]; std::vector::iterator vit = metaV.begin(); RsMsgMetaData meta; for(; vit != metaV.end(); ++vit) { RsGxsMsgMetaData& m = *(*vit); meta = m; msgInfoV.push_back(meta); delete *vit; } metaV.clear(); } return ok; } bool RsGenExchange::getSerializedGroupData(uint32_t token, RsGxsGroupId& id, unsigned char *& data, uint32_t& size) { RS_STACK_MUTEX(mGenMtx) ; std::list nxsGrps; if(!mDataAccess->getGroupData(token, nxsGrps)) return false ; if(nxsGrps.size() != 1) { std::cerr << "(EE) getSerializedGroupData() got multiple groups in single request. This is unexpected." << std::endl; for(std::list::const_iterator it(nxsGrps.begin());it!=nxsGrps.end();++it) delete *it ; return false ; } RsNxsGrp *nxs_grp = *(nxsGrps.begin()); size = RsNxsSerialiser(mServType).size(nxs_grp); id = nxs_grp->metaData->mGroupId ; if(size > 1024*1024 || NULL==(data = (unsigned char *)rs_malloc(size))) { std::cerr << "(EE) getSerializedGroupData() cannot allocate mem chunk of size " << size << ". Too big, or no room." << std::endl; delete nxs_grp ; return false ; } return RsNxsSerialiser(mServType).serialise(nxs_grp,data,&size) ; } bool RsGenExchange::deserializeGroupData(unsigned char *data, uint32_t size, RsGxsGroupId* gId /*= nullptr*/) { RS_STACK_MUTEX(mGenMtx) ; RsItem *item = RsNxsSerialiser(mServType).deserialise(data, &size); RsNxsGrp *nxs_grp = dynamic_cast(item); if(item == NULL) { std::cerr << "(EE) RsGenExchange::deserializeGroupData(): cannot " << "deserialise this data. Something's wrong." << std::endl; delete item; return false; } if(mGrpPendingValidate.find(nxs_grp->grpId) != mGrpPendingValidate.end()) { std::cerr << "(WW) Group " << nxs_grp->grpId << " is already pending validation. Not adding again." << std::endl; return true; } if(gId) *gId = nxs_grp->grpId; mGrpPendingValidate.insert(std::make_pair(nxs_grp->grpId, GxsPendingItem(nxs_grp, nxs_grp->grpId,time(NULL)))); return true; } bool RsGenExchange::getGroupData(const uint32_t &token, std::vector& grpItem) { std::list nxsGrps; bool ok = mDataAccess->getGroupData(token, nxsGrps); std::list::iterator lit = nxsGrps.begin(); #ifdef GEN_EXCH_DEBUG std::cerr << "RsGenExchange::getGroupData() RsNxsGrp::len: " << nxsGrps.size(); std::cerr << std::endl; #endif if(ok) { for(; lit != nxsGrps.end(); ++lit) { RsTlvBinaryData& data = (*lit)->grp; RsItem* item = NULL; if(data.bin_len != 0) item = mSerialiser->deserialise(data.bin_data, &data.bin_len); if(item) { RsGxsGrpItem* gItem = dynamic_cast(item); if (gItem) { gItem->meta = *((*lit)->metaData); RsGroupNetworkStats sts ; if(mNetService && mNetService->getGroupNetworkStats(gItem->meta.mGroupId,sts)) { gItem->meta.mPop = sts.mSuppliers; gItem->meta.mVisibleMsgCount = sts.mMaxVisibleCount; // When the group is not subscribed, the last post value is not updated, because there's no message stored. As a consequence, // we rely on network statistics to give this value, but it is not as accurate as if it was locally computed, because of blocked // posts, friends not available, sync delays, etc. Similarly if the group has just been subscribed, the last post info is probably // uninitialised, so we will it too. if((!(IS_GROUP_SUBSCRIBED(gItem->meta.mSubscribeFlags))) || gItem->meta.mLastPost == 0) gItem->meta.mLastPost = sts.mLastGroupModificationTS ; } else { gItem->meta.mPop = 0; gItem->meta.mVisibleMsgCount = 0; } // Also check the group privacy flags. A while ago, it as possible to publish a group without privacy flags. Now it is not possible anymore. // As a consequence, it's important to supply a correct value in this flag before the data can be edited/updated. if((gItem->meta.mGroupFlags & GXS_SERV::FLAG_PRIVACY_MASK) == 0) { #ifdef GEN_EXCH_DEBUG std::cerr << "(WW) getGroupData(): mGroupFlags for group " << gItem->meta.mGroupId << " has incorrect value " << std::hex << gItem->meta.mGroupFlags << std::dec << ". Setting value to GXS_SERV::FLAG_PRIVACY_PUBLIC." << std::endl; #endif gItem->meta.mGroupFlags |= GXS_SERV::FLAG_PRIVACY_PUBLIC; } grpItem.push_back(gItem); } else { std::cerr << "RsGenExchange::getGroupData() deserialisation/dynamic_cast ERROR"; std::cerr << std::endl; delete item; } } else if(data.bin_len > 0) //std::cerr << "(EE) RsGenExchange::getGroupData() Item type is probably not handled. Data is: " << RsUtil::BinToHex((unsigned char*)data.bin_data,std::min(50u,data.bin_len)) << ((data.bin_len>50)?"...":"") << std::endl; std::cerr << "(EE) RsGenExchange::getGroupData() Item type is probably not handled. Data is: " << RsUtil::BinToHex((unsigned char*)data.bin_data,data.bin_len) << std::endl; delete *lit; } } return ok; } bool RsGenExchange::getMsgData(uint32_t token, GxsMsgDataMap &msgItems) { RS_STACK_MUTEX(mGenMtx) ; NxsMsgDataResult msgResult; bool ok = mDataAccess->getMsgData(token, msgResult); if(ok) { NxsMsgDataResult::iterator mit = msgResult.begin(); for(; mit != msgResult.end(); ++mit) { const RsGxsGroupId& grpId = mit->first; std::vector& gxsMsgItems = msgItems[grpId]; std::vector& nxsMsgsV = mit->second; std::vector::iterator vit = nxsMsgsV.begin(); for(; vit != nxsMsgsV.end(); ++vit) { RsNxsMsg*& msg = *vit; RsItem* item = NULL; if(msg->msg.bin_len != 0) item = mSerialiser->deserialise(msg->msg.bin_data, &msg->msg.bin_len); if (item) { RsGxsMsgItem* mItem = dynamic_cast(item); if (mItem) { mItem->meta = *((*vit)->metaData); // get meta info from nxs msg gxsMsgItems.push_back(mItem); } else { std::cerr << "RsGenExchange::getMsgData() deserialisation/dynamic_cast ERROR"; std::cerr << std::endl; delete item; } } else { std::cerr << "RsGenExchange::getMsgData() deserialisation ERROR"; std::cerr << std::endl; } delete msg; } } } return ok; } bool RsGenExchange::getMsgRelatedData( uint32_t token, GxsMsgRelatedDataMap &msgItems ) { RS_STACK_MUTEX(mGenMtx) ; NxsMsgRelatedDataResult msgResult; bool ok = mDataAccess->getMsgRelatedData(token, msgResult); if(ok) { NxsMsgRelatedDataResult::iterator mit = msgResult.begin(); for(; mit != msgResult.end(); ++mit) { const RsGxsGrpMsgIdPair& msgId = mit->first; std::vector &gxsMsgItems = msgItems[msgId]; std::vector& nxsMsgsV = mit->second; std::vector::iterator vit = nxsMsgsV.begin(); for(; vit != nxsMsgsV.end(); ++vit) { RsNxsMsg*& msg = *vit; RsItem* item = NULL; if(msg->msg.bin_len != 0) item = mSerialiser->deserialise(msg->msg.bin_data, &msg->msg.bin_len); if (item) { RsGxsMsgItem* mItem = dynamic_cast(item); if (mItem) { mItem->meta = *((*vit)->metaData); // get meta info from nxs msg gxsMsgItems.push_back(mItem); } else { std::cerr << "RsGenExchange::getMsgRelatedData() deserialisation/dynamic_cast ERROR"; std::cerr << std::endl; delete item; } } else { std::cerr << "RsGenExchange::getMsgRelatedData() deserialisation ERROR"; std::cerr << std::endl; } delete msg; } } } return ok; } RsTokenService* RsGenExchange::getTokenService() { return mDataAccess; } bool RsGenExchange::setAuthenPolicyFlag(const uint8_t &msgFlag, uint32_t& authenFlag, const PrivacyBitPos &pos) { uint32_t temp = 0; temp = msgFlag; switch(pos) { case PUBLIC_GRP_BITS: authenFlag &= ~PUB_GRP_MASK; authenFlag |= temp; break; case RESTRICTED_GRP_BITS: authenFlag &= ~RESTR_GRP_MASK; authenFlag |= (temp << RESTR_GRP_OFFSET); break; case PRIVATE_GRP_BITS: authenFlag &= ~PRIV_GRP_MASK; authenFlag |= (temp << PRIV_GRP_OFFSET); break; case GRP_OPTION_BITS: authenFlag &= ~GRP_OPTIONS_MASK; authenFlag |= (temp << GRP_OPTIONS_OFFSET); break; default: std::cerr << "pos option not recognised"; return false; } return true; } void RsGenExchange::notifyNewGroups(std::vector &groups) { RS_STACK_MUTEX(mGenMtx) ; std::vector::iterator vit = groups.begin(); // store these for tick() to pick them up for(; vit != groups.end(); ++vit) { RsNxsGrp* grp = *vit; NxsGrpPendValidVect::iterator received = mGrpPendingValidate.find(grp->grpId); // drop group if you already have them // TODO: move this to nxs layer to save bandwidth if(received == mGrpPendingValidate.end()) { #ifdef GEN_EXCH_DEBUG std::cerr << "RsGenExchange::notifyNewGroups() Received GrpId: " << grp->grpId; std::cerr << std::endl; #endif mGrpPendingValidate.insert(std::make_pair(grp->grpId, GxsPendingItem(grp, grp->grpId,time(NULL)))); } else { delete grp; } } } void RsGenExchange::notifyNewMessages(std::vector& messages) { RS_STACK_MUTEX(mGenMtx) ; // store these for tick() to pick them up for(uint32_t i=0;imsgId) ; // if we have msg already just delete it if(it == mMsgPendingValidate.end()) { #ifdef GEN_EXCH_DEBUG std::cerr << "RsGenExchange::notifyNewMessages() Received Msg: "; std::cerr << " GrpId: " << msg->grpId; std::cerr << " MsgId: " << msg->msgId; std::cerr << std::endl; #endif RsGxsGrpMsgIdPair id; id.first = msg->grpId; id.second = msg->msgId; mMsgPendingValidate.insert(std::make_pair(msg->msgId,GxsPendingItem(msg, id,time(NULL)))); } else { #ifdef GEN_EXCH_DEBUG std::cerr << " message is already in pending validation list. dropping." << std::endl; #endif delete msg; } } } void RsGenExchange::notifyReceivePublishKey(const RsGxsGroupId &grpId) { RS_STACK_MUTEX(mGenMtx); RsGxsGroupChange* gc = new RsGxsGroupChange(RsGxsNotify::TYPE_PUBLISHKEY, false); gc->mGrpIdList.push_back(grpId); mNotifications.push_back(gc); } void RsGenExchange::notifyChangedGroupStats(const RsGxsGroupId &grpId) { RS_STACK_MUTEX(mGenMtx); RsGxsGroupChange* gc = new RsGxsGroupChange(RsGxsNotify::TYPE_PROCESSED, false); gc->mGrpIdList.push_back(grpId); mNotifications.push_back(gc); } bool RsGenExchange::checkGroupMetaConsistency(const RsGroupMetaData& meta) { std::cerr << "Checking group consistency:" << std::endl; if(meta.mGroupName.empty()) { std::cerr << "(EE) cannot create a group with no name." << std::endl; return false; } uint32_t gf = meta.mGroupFlags & GXS_SERV::FLAG_PRIVACY_MASK ; if(gf != GXS_SERV::FLAG_PRIVACY_PUBLIC && gf != GXS_SERV::FLAG_PRIVACY_RESTRICTED && gf != GXS_SERV::FLAG_PRIVACY_PRIVATE) { std::cerr << "(EE) mGroupFlags has incorrect value " << std::hex << meta.mGroupFlags << std::dec << ". A value among GXS_SERV::FLAG_PRIVACY_{PUBLIC,RESTRICTED,PRIVATE} is expected." << std::endl; return false ; } if(meta.mCircleType < GXS_CIRCLE_TYPE_PUBLIC || meta.mCircleType > GXS_CIRCLE_TYPE_YOUR_EYES_ONLY) { std::cerr << "(EE) mCircleType has incorrect value " << std::hex << meta.mCircleType << std::dec << ". A single value among GXS_CIRCLE_TYPE_{PUBLIC,EXTERNAL,YOUR_FRIENDS_ONLY,LOCAL,EXT_SELF,YOUR_EYES_ONLY} is expected." << std::endl; return false ; } if(meta.mCircleType == GXS_CIRCLE_TYPE_EXTERNAL) { if(!meta.mInternalCircle.isNull()) { std::cerr << "(EE) Group circle type is EXTERNAL, but an internal circle ID " << meta.mInternalCircle << " was supplied. This is an error." << std::endl; return false ; } if(meta.mCircleId.isNull()) { std::cerr << "(EE) Group circle type is EXTERNAL, but no external circle ID was supplied. meta.mCircleId is indeed empty. This is an error." << std::endl; return false ; } } if(meta.mCircleType == GXS_CIRCLE_TYPE_YOUR_FRIENDS_ONLY) { if(!meta.mCircleId.isNull()) { std::cerr << "(EE) Group circle type is YOUR_FRIENDS_ONLY, but an external circle ID " << meta.mCircleId << " was supplied. This is an error." << std::endl; return false ; } if(meta.mInternalCircle.isNull()) { std::cerr << "(EE) Group circle type is YOUR_FRIENDS_ONLY, but no internal circle ID was supplied. meta.mInternalCircle is indeed empty. This is an error." << std::endl; return false ; } } if(meta.mCircleType == GXS_CIRCLE_TYPE_EXT_SELF) { if(!meta.mCircleId.isNull()) { std::cerr << "(EE) Group circle type is EXT_SELF, but an external circle ID " << meta.mCircleId << " was supplied. This is an error." << std::endl; return false ; } if(!meta.mInternalCircle.isNull()) { std::cerr << "(EE) Group circle type is EXT_SELF, but an internal circle ID " << meta.mInternalCircle << " was supplied. This is an error." << std::endl; return false ; } } std::cerr << "Group is clean." << std::endl; return true ; } void RsGenExchange::publishGroup(uint32_t& token, RsGxsGrpItem *grpItem) { if(!checkGroupMetaConsistency(grpItem->meta)) { std::cerr << "(EE) Cannot publish group. Some information was not supplied." << std::endl; return ; } RS_STACK_MUTEX(mGenMtx) ; token = mDataAccess->generatePublicToken(); GxsGrpPendingSign ggps(grpItem, token); mGrpsToPublish.push_back(ggps); #ifdef GEN_EXCH_DEBUG std::cerr << "RsGenExchange::publishGroup() token: " << token; std::cerr << std::endl; #endif } void RsGenExchange::updateGroup(uint32_t& token, RsGxsGrpItem* grpItem) { if(!checkGroupMetaConsistency(grpItem->meta)) { std::cerr << "(EE) Cannot update group. Some information was not supplied." << std::endl; return ; } RS_STACK_MUTEX(mGenMtx) ; token = mDataAccess->generatePublicToken(); mGroupUpdatePublish.push_back(GroupUpdatePublish(grpItem, token)); #ifdef GEN_EXCH_DEBUG std::cerr << "RsGenExchange::updateGroup() token: " << token; std::cerr << std::endl; #endif } void RsGenExchange::deleteGroup(uint32_t& token, const RsGxsGroupId& grpId) { RS_STACK_MUTEX(mGenMtx) ; token = mDataAccess->generatePublicToken(); mGroupDeletePublish.push_back(GroupDeletePublish(grpId, token)); #ifdef GEN_EXCH_DEBUG std::cerr << "RsGenExchange::deleteGroup() token: " << token; std::cerr << std::endl; #endif } void RsGenExchange::deleteMsgs(uint32_t& token, const GxsMsgReq& msgs) { RS_STACK_MUTEX(mGenMtx) ; token = mDataAccess->generatePublicToken(); mMsgDeletePublish.push_back(MsgDeletePublish(msgs, token)); // This code below will suspend any requests of the deleted messages for 24 hrs. This of course only works // if all friend nodes consistently delete the messages in the mean time. if(mNetService != NULL) for(GxsMsgReq::const_iterator it(msgs.begin());it!=msgs.end();++it) for(uint32_t i=0;isecond.size();++i) mNetService->rejectMessage(it->second[i]) ; } void RsGenExchange::publishMsg(uint32_t& token, RsGxsMsgItem *msgItem) { RS_STACK_MUTEX(mGenMtx) ; token = mDataAccess->generatePublicToken(); mMsgsToPublish.insert(std::make_pair(token, msgItem)); #ifdef GEN_EXCH_DEBUG std::cerr << "RsGenExchange::publishMsg() token: " << token; std::cerr << std::endl; #endif } uint32_t RsGenExchange::getDefaultSyncPeriod() { RS_STACK_MUTEX(mGenMtx) ; if(mNetService != NULL) return mNetService->getDefaultSyncAge(); else { std::cerr << "(EE) No network service available. Cannot get default sync period. " << std::endl; return 0; } } RsReputations::ReputationLevel RsGenExchange::minReputationForForwardingMessages(uint32_t group_sign_flags,uint32_t identity_sign_flags) { return RsNetworkExchangeService::minReputationForForwardingMessages(group_sign_flags,identity_sign_flags); } uint32_t RsGenExchange::getSyncPeriod(const RsGxsGroupId& grpId) { RS_STACK_MUTEX(mGenMtx) ; if(mNetService != NULL) return mNetService->getSyncAge(grpId); else return RS_GXS_DEFAULT_MSG_REQ_PERIOD; } bool RsGenExchange::getGroupNetworkStats(const RsGxsGroupId& grpId,RsGroupNetworkStats& stats) { return (!mNetService) || mNetService->getGroupNetworkStats(grpId,stats) ; } void RsGenExchange::setSyncPeriod(const RsGxsGroupId& grpId,uint32_t age_in_secs) { if(mNetService != NULL) return mNetService->setSyncAge(grpId,age_in_secs) ; else std::cerr << "(EE) No network service available. Cannot set storage period. " << std::endl; } uint32_t RsGenExchange::getStoragePeriod(const RsGxsGroupId& grpId) { RS_STACK_MUTEX(mGenMtx) ; if(!mNetService) { #ifdef GEN_EXCH_DEBUG std::cerr << "(EE) No network service in service " << std::hex << serviceType() << std::dec << ": cannot read message storage time. Returning infinity." << std::endl; #endif return false ; } return mNetService->getKeepAge(grpId) ; } void RsGenExchange::setStoragePeriod(const RsGxsGroupId& grpId,uint32_t age_in_secs) { if(mNetService != NULL) return mNetService->setKeepAge(grpId,age_in_secs) ; else std::cerr << "(EE) No network service available. Cannot set storage period. " << std::endl; } void RsGenExchange::setGroupSubscribeFlags(uint32_t& token, const RsGxsGroupId& grpId, const uint32_t& flag, const uint32_t& mask) { /* TODO APPLY MASK TO FLAGS */ RS_STACK_MUTEX(mGenMtx) ; token = mDataAccess->generatePublicToken(); GrpLocMetaData g; g.grpId = grpId; g.val.put(RsGeneralDataService::GRP_META_SUBSCRIBE_FLAG, (int32_t)flag); g.val.put(RsGeneralDataService::GRP_META_SUBSCRIBE_FLAG+GXS_MASK, (int32_t)mask); // HACK, need to perform mask operation in a non-blocking location mGrpLocMetaMap.insert(std::make_pair(token, g)); } void RsGenExchange::setGroupStatusFlags(uint32_t& token, const RsGxsGroupId& grpId, const uint32_t& status, const uint32_t& mask) { /* TODO APPLY MASK TO FLAGS */ RS_STACK_MUTEX(mGenMtx) ; token = mDataAccess->generatePublicToken(); GrpLocMetaData g; g.grpId = grpId; g.val.put(RsGeneralDataService::GRP_META_STATUS, (int32_t)status); g.val.put(RsGeneralDataService::GRP_META_STATUS+GXS_MASK, (int32_t)mask); // HACK, need to perform mask operation in a non-blocking location mGrpLocMetaMap.insert(std::make_pair(token, g)); } void RsGenExchange::setGroupServiceString(uint32_t& token, const RsGxsGroupId& grpId, const std::string& servString) { RS_STACK_MUTEX(mGenMtx) ; token = mDataAccess->generatePublicToken(); GrpLocMetaData g; g.grpId = grpId; g.val.put(RsGeneralDataService::GRP_META_SERV_STRING, servString); mGrpLocMetaMap.insert(std::make_pair(token, g)); } void RsGenExchange::setMsgStatusFlags(uint32_t& token, const RsGxsGrpMsgIdPair& msgId, const uint32_t& status, const uint32_t& mask) { /* TODO APPLY MASK TO FLAGS */ RS_STACK_MUTEX(mGenMtx) ; token = mDataAccess->generatePublicToken(); MsgLocMetaData m; m.val.put(RsGeneralDataService::MSG_META_STATUS, (int32_t)status); m.val.put(RsGeneralDataService::MSG_META_STATUS+GXS_MASK, (int32_t)mask); // HACK, need to perform mask operation in a non-blocking location m.msgId = msgId; mMsgLocMetaMap.insert(std::make_pair(token, m)); } void RsGenExchange::setMsgServiceString(uint32_t& token, const RsGxsGrpMsgIdPair& msgId, const std::string& servString ) { RS_STACK_MUTEX(mGenMtx) ; token = mDataAccess->generatePublicToken(); MsgLocMetaData m; m.val.put(RsGeneralDataService::MSG_META_SERV_STRING, servString); m.msgId = msgId; mMsgLocMetaMap.insert(std::make_pair(token, m)); } void RsGenExchange::processMsgMetaChanges() { std::map metaMap; { RS_STACK_MUTEX(mGenMtx); if (mMsgLocMetaMap.empty()) { return; } metaMap = mMsgLocMetaMap; mMsgLocMetaMap.clear(); } GxsMsgReq msgIds; std::map::iterator mit; for (mit = metaMap.begin(); mit != metaMap.end(); ++mit) { MsgLocMetaData& m = mit->second; int32_t value, mask; bool ok = true; bool changed = false; // for meta flag changes get flag to apply mask if(m.val.getAsInt32(RsGeneralDataService::MSG_META_STATUS, value)) { ok = false; if(m.val.getAsInt32(RsGeneralDataService::MSG_META_STATUS+GXS_MASK, mask)) { GxsMsgReq req; std::vector msgIdV; msgIdV.push_back(m.msgId.second); req.insert(std::make_pair(m.msgId.first, msgIdV)); GxsMsgMetaResult result; mDataStore->retrieveGxsMsgMetaData(req, result); GxsMsgMetaResult::iterator mit = result.find(m.msgId.first); if(mit != result.end()) { std::vector& msgMetaV = mit->second; if(!msgMetaV.empty()) { RsGxsMsgMetaData* meta = *(msgMetaV.begin()); value = (meta->mMsgStatus & ~mask) | (mask & value); changed = (static_cast(meta->mMsgStatus) != value); m.val.put(RsGeneralDataService::MSG_META_STATUS, value); delete meta; ok = true; } } m.val.removeKeyValue(RsGeneralDataService::MSG_META_STATUS+GXS_MASK); } } ok &= mDataStore->updateMessageMetaData(m) == 1; uint32_t token = mit->first; if(ok) { mDataAccess->updatePublicRequestStatus(token, RsTokenService::GXS_REQUEST_V2_STATUS_COMPLETE); if (changed) { msgIds[m.msgId.first].push_back(m.msgId.second); } } else { mDataAccess->updatePublicRequestStatus(token, RsTokenService::GXS_REQUEST_V2_STATUS_FAILED); } { RS_STACK_MUTEX(mGenMtx); mMsgNotify.insert(std::make_pair(token, m.msgId)); } } if (!msgIds.empty()) { RS_STACK_MUTEX(mGenMtx); RsGxsMsgChange* c = new RsGxsMsgChange(RsGxsNotify::TYPE_PROCESSED, true); c->msgChangeMap = msgIds; mNotifications.push_back(c); } } void RsGenExchange::processGrpMetaChanges() { std::map metaMap; { RS_STACK_MUTEX(mGenMtx); if (mGrpLocMetaMap.empty()) { return; } metaMap = mGrpLocMetaMap; mGrpLocMetaMap.clear(); } std::list grpChanged; std::map::iterator mit; for (mit = metaMap.begin(); mit != metaMap.end(); ++mit) { GrpLocMetaData& g = mit->second; uint32_t token = mit->first; // process mask bool ok = processGrpMask(g.grpId, g.val); ok = ok && (mDataStore->updateGroupMetaData(g) == 1); if(ok) { mDataAccess->updatePublicRequestStatus(token, RsTokenService::GXS_REQUEST_V2_STATUS_COMPLETE); grpChanged.push_back(g.grpId); }else { mDataAccess->updatePublicRequestStatus(token, RsTokenService::GXS_REQUEST_V2_STATUS_FAILED); } { RS_STACK_MUTEX(mGenMtx); mGrpNotify.insert(std::make_pair(token, g.grpId)); } } if(!grpChanged.empty()) { RS_STACK_MUTEX(mGenMtx); RsGxsGroupChange* gc = new RsGxsGroupChange(RsGxsNotify::TYPE_PROCESSED, true); gc->mGrpIdList = grpChanged; mNotifications.push_back(gc); #ifdef GEN_EXCH_DEBUG std::cerr << " adding the following grp ids to notification: " << std::endl; for(std::list::const_iterator it(grpChanged.begin());it!=grpChanged.end();++it) std::cerr << " " << *it << std::endl; #endif } } bool RsGenExchange::processGrpMask(const RsGxsGroupId& grpId, ContentValue &grpCv) { // first find out which mask is involved int32_t value, mask, currValue; std::string key; RsGxsGrpMetaData* grpMeta = NULL; bool ok = false; std::map grpMetaMap; std::map::iterator mit; grpMetaMap.insert(std::make_pair(grpId, (RsGxsGrpMetaData*)(NULL))); mDataStore->retrieveGxsGrpMetaData(grpMetaMap); if((mit = grpMetaMap.find(grpId)) != grpMetaMap.end()) { grpMeta = mit->second; if (!grpMeta) { #ifdef GEN_EXCH_DEBUG std::cerr << "RsGenExchange::processGrpMask() Ignore update for not existing grp id " << grpId.toStdString(); std::cerr << std::endl; #endif return false; } ok = true; } if(grpCv.getAsInt32(RsGeneralDataService::GRP_META_STATUS, value) && grpMeta) { key = RsGeneralDataService::GRP_META_STATUS; currValue = grpMeta->mGroupStatus; } else if(grpCv.getAsInt32(RsGeneralDataService::GRP_META_SUBSCRIBE_FLAG, value) && grpMeta) { key = RsGeneralDataService::GRP_META_SUBSCRIBE_FLAG; currValue = grpMeta->mSubscribeFlags; }else { if(grpMeta) delete grpMeta; return !(grpCv.empty()); } ok &= grpCv.getAsInt32(key+GXS_MASK, mask); // remove mask entry so it doesn't affect actual entry grpCv.removeKeyValue(key+GXS_MASK); // apply mask to current value value = (currValue & ~mask) | (value & mask); grpCv.put(key, value); if(grpMeta) delete grpMeta; return ok; } void RsGenExchange::publishMsgs() { RS_STACK_MUTEX(mGenMtx) ; time_t now = time(NULL); // stick back msgs pending signature typedef std::map > PendSignMap; PendSignMap::iterator sign_it = mMsgPendingSign.begin(); for(; sign_it != mMsgPendingSign.end(); ++sign_it) { GxsPendingItem& item = sign_it->second; mMsgsToPublish.insert(std::make_pair(sign_it->first, item.mItem)); } std::map > msgChangeMap; std::map::iterator mit = mMsgsToPublish.begin(); for(; mit != mMsgsToPublish.end(); ++mit) { #ifdef GEN_EXCH_DEBUG std::cerr << "RsGenExchange::publishMsgs() Publishing a Message"; std::cerr << std::endl; #endif RsGxsMsgItem* msgItem = mit->second; const uint32_t& token = mit->first; uint32_t size = mSerialiser->size(msgItem); char* mData = new char[size]; bool serialOk = false; // for fatal sign creation bool createOk = false; // if sign requests to try later bool tryLater = false; serialOk = mSerialiser->serialise(msgItem, mData, &size); if(serialOk) { RsNxsMsg* msg = new RsNxsMsg(mServType); msg->grpId = msgItem->meta.mGroupId; msg->msg.setBinData(mData, size); // now create meta msg->metaData = new RsGxsMsgMetaData(); *(msg->metaData) = msgItem->meta; // assign time stamp msg->metaData->mPublishTs = time(NULL); // now intialise msg (sign it) uint8_t createReturn = createMessage(msg); if(createReturn == CREATE_FAIL) { createOk = false; } else if(createReturn == CREATE_FAIL_TRY_LATER) { PendSignMap::iterator pit = mMsgPendingSign.find(token); tryLater = true; // add to queue of messages waiting for a successful // sign attempt if(pit == mMsgPendingSign.end()) { GxsPendingItem gsi(msgItem, token,time(NULL)); mMsgPendingSign.insert(std::make_pair(token, gsi)); } else { // remove from attempts queue if over sign // attempts limit if(pit->second.mFirstTryTS + SIGN_MAX_WAITING_TIME < now) { std::cerr << "Pending signature grp=" << pit->second.mItem->meta.mGroupId << ", msg=" << pit->second.mItem->meta.mMsgId << ", has exceeded validation time limit. The author's key can probably not be obtained. This is unexpected." << std::endl; mMsgPendingSign.erase(token); tryLater = false; } } createOk = false; } else if(createReturn == CREATE_SUCCESS) { createOk = true; // erase from queue if it exists mMsgPendingSign.erase(token); } else // unknown return, just fail createOk = false; RsGxsMessageId msgId; RsGxsGroupId grpId = msgItem->meta.mGroupId; bool validSize = false; // check message not over single msg storage limit if(createOk) { validSize = mDataStore->validSize(msg); } if(createOk && validSize) { // empty orig msg id means this is the original // msg. // (csoler) Why are we doing this??? if(msg->metaData->mOrigMsgId.isNull()) { msg->metaData->mOrigMsgId = msg->metaData->mMsgId; } // now serialise meta data size = msg->metaData->serial_size(); char* metaDataBuff = new char[size]; bool s = msg->metaData->serialise(metaDataBuff, &size); s &= msg->meta.setBinData(metaDataBuff, size); msg->metaData->mMsgStatus = GXS_SERV::GXS_MSG_STATUS_UNPROCESSED; msgId = msg->msgId; grpId = msg->grpId; msg->metaData->recvTS = time(NULL); // FIXTESTS global variable rsPeers not available in unittests! if(rsPeers) mRoutingClues[msg->metaData->mAuthorId].insert(rsPeers->getOwnId()) ; computeHash(msg->msg, msg->metaData->mHash); mDataAccess->addMsgData(msg); delete msg ; msgChangeMap[grpId].push_back(msgId); delete[] metaDataBuff; if(mNetService != NULL) mNetService->stampMsgServerUpdateTS(grpId) ; // add to published to allow acknowledgement mMsgNotify.insert(std::make_pair(mit->first, std::make_pair(grpId, msgId))); mDataAccess->updatePublicRequestStatus(mit->first, RsTokenService::GXS_REQUEST_V2_STATUS_COMPLETE); } else { // delete msg if create msg not ok delete msg; if(!tryLater) mDataAccess->updatePublicRequestStatus(mit->first, RsTokenService::GXS_REQUEST_V2_STATUS_FAILED); std::cerr << "RsGenExchange::publishMsgs() failed to publish msg " << std::endl; } } else { std::cerr << "RsGenExchange::publishMsgs() failed to serialise msg " << std::endl; } delete[] mData; if(!tryLater) delete msgItem; } // clear msg item map as we're done publishing them and all // entries are invalid mMsgsToPublish.clear(); if(!msgChangeMap.empty()) { RsGxsMsgChange* ch = new RsGxsMsgChange(RsGxsNotify::TYPE_PUBLISH, false); ch->msgChangeMap = msgChangeMap; mNotifications.push_back(ch); } } RsGenExchange::ServiceCreate_Return RsGenExchange::service_CreateGroup(RsGxsGrpItem* /* grpItem */, RsTlvSecurityKeySet& /* keySet */) { #ifdef GEN_EXCH_DEBUG std::cerr << "RsGenExchange::service_CreateGroup(): Does nothing" << std::endl; #endif return SERVICE_CREATE_SUCCESS; } #define PENDING_SIGN_TIMEOUT 10 // 5 seconds void RsGenExchange::processGroupUpdatePublish() { RS_STACK_MUTEX(mGenMtx) ; // get keys for group update publish // first build meta request map for groups to be updated std::map grpMeta; std::vector::iterator vit = mGroupUpdatePublish.begin(); for(; vit != mGroupUpdatePublish.end(); ++vit) { GroupUpdatePublish& gup = *vit; const RsGxsGroupId& groupId = gup.grpItem->meta.mGroupId; grpMeta.insert(std::make_pair(groupId, (RsGxsGrpMetaData*)(NULL))); } if(grpMeta.empty()) return; mDataStore->retrieveGxsGrpMetaData(grpMeta); // now vit = mGroupUpdatePublish.begin(); for(; vit != mGroupUpdatePublish.end(); ++vit) { GroupUpdatePublish& gup = *vit; const RsGxsGroupId& groupId = gup.grpItem->meta.mGroupId; std::map::iterator mit = grpMeta.find(groupId); RsGxsGrpMetaData* meta = NULL; if(mit == grpMeta.end() || mit->second == NULL) { std::cerr << "Error! could not find meta of old group to update!" << std::endl; mDataAccess->updatePublicRequestStatus(gup.mToken, RsTokenService::GXS_REQUEST_V2_STATUS_FAILED); delete gup.grpItem; continue; }else { meta = mit->second; } //gup.grpItem->meta = *meta; GxsGrpPendingSign ggps(gup.grpItem, gup.mToken); if(checkKeys(meta->keys)) { ggps.mKeys = meta->keys; GxsSecurity::createPublicKeysFromPrivateKeys(ggps.mKeys) ; ggps.mHaveKeys = true; ggps.mStartTS = time(NULL); ggps.mLastAttemptTS = 0; ggps.mIsUpdate = true; ggps.mToken = gup.mToken; mGrpsToPublish.push_back(ggps); } else { std::cerr << "(EE) publish group fails because RS cannot find the private publish and author keys" << std::endl; delete gup.grpItem; mDataAccess->updatePublicRequestStatus(gup.mToken, RsTokenService::GXS_REQUEST_V2_STATUS_FAILED); } delete meta; } mGroupUpdatePublish.clear(); } void RsGenExchange::processRoutingClues() { RS_STACK_MUTEX(mGenMtx) ; for(std::map >::const_iterator it = mRoutingClues.begin();it!=mRoutingClues.end();++it) for(std::set::const_iterator it2(it->second.begin());it2!=it->second.end();++it2) rsGRouter->addRoutingClue(GRouterKeyId(it->first),(*it2) ) ; mRoutingClues.clear() ; } void RsGenExchange::processGroupDelete() { RS_STACK_MUTEX(mGenMtx) ; // get keys for group delete publish typedef std::pair GrpNote; std::map toNotify; std::vector::iterator vit = mGroupDeletePublish.begin(); for(; vit != mGroupDeletePublish.end(); ++vit) { std::vector gprIds; gprIds.push_back(vit->mGroupId); mDataStore->removeGroups(gprIds); toNotify.insert(std::make_pair( vit->mToken, GrpNote(true, vit->mGroupId))); } std::list grpDeleted; std::map::iterator mit = toNotify.begin(); for(; mit != toNotify.end(); ++mit) { GrpNote& note = mit->second; uint8_t status = note.first ? RsTokenService::GXS_REQUEST_V2_STATUS_COMPLETE : RsTokenService::GXS_REQUEST_V2_STATUS_FAILED; mGrpNotify.insert(std::make_pair(mit->first, note.second)); mDataAccess->updatePublicRequestStatus(mit->first, status); if(note.first) grpDeleted.push_back(note.second); } if(!grpDeleted.empty()) { RsGxsGroupChange* gc = new RsGxsGroupChange(RsGxsNotify::TYPE_PUBLISH, false); gc->mGrpIdList = grpDeleted; mNotifications.push_back(gc); } mGroupDeletePublish.clear(); } void RsGenExchange::processMessageDelete() { RS_STACK_MUTEX(mGenMtx) ; #ifdef TODO typedef std::pair GrpNote; std::map toNotify; #endif for( std::vector::iterator vit = mMsgDeletePublish.begin(); vit != mMsgDeletePublish.end(); ++vit) { #ifdef TODO uint32_t token = (*vit).mToken; const RsGxsGroupId& groupId = gdp.grpItem->meta.mGroupId; toNotify.insert(std::make_pair( token, GrpNote(true, groupId))); #endif mDataStore->removeMsgs( (*vit).mMsgs ); } #warning csoler: TODO: notify for deleted messages #ifdef SUSPENDED std::list grpDeleted; std::map::iterator mit = toNotify.begin(); for(; mit != toNotify.end(); ++mit) { GrpNote& note = mit->second; uint8_t status = note.first ? RsTokenService::GXS_REQUEST_V2_STATUS_COMPLETE : RsTokenService::GXS_REQUEST_V2_STATUS_FAILED; mGrpNotify.insert(std::make_pair(mit->first, note.second)); mDataAccess->updatePublicRequestStatus(mit->first, status); if(note.first) grpDeleted.push_back(note.second); } if(!grpDeleted.empty()) { RsGxsGroupChange* gc = new RsGxsGroupChange(RsGxsNotify::TYPE_PUBLISH, false); gc->mGrpIdList = grpDeleted; mNotifications.push_back(gc); } #endif mMsgDeletePublish.clear(); } bool RsGenExchange::checkKeys(const RsTlvSecurityKeySet& keySet) { typedef std::map keyMap; const keyMap& allKeys = keySet.private_keys; keyMap::const_iterator cit = allKeys.begin(); bool adminFound = false, publishFound = false; for(; cit != allKeys.end(); ++cit) { const RsTlvPrivateRSAKey& key = cit->second; if(key.keyFlags & RSTLV_KEY_TYPE_FULL) // this one is not useful. Just a security. { if(key.keyFlags & RSTLV_KEY_DISTRIB_ADMIN) adminFound = true; if(key.keyFlags & RSTLV_KEY_DISTRIB_PUBLISH) publishFound = true; } else if(key.keyFlags & RSTLV_KEY_TYPE_PUBLIC_ONLY) // this one is not useful. Just a security. { std::cerr << "(EE) found a public only key in the private key list" << std::endl; return false ; } } // user must have both private and public parts of publish and admin keys return adminFound && publishFound; } void RsGenExchange::publishGrps() { std::list groups_to_subscribe ; { RS_STACK_MUTEX(mGenMtx) ; NxsGrpSignPendVect::iterator vit = mGrpsToPublish.begin(); typedef std::pair GrpNote; std::map toNotify; while( vit != mGrpsToPublish.end() ) { GxsGrpPendingSign& ggps = *vit; /* do intial checks to see if this entry has expired */ time_t now = time(NULL) ; uint32_t token = ggps.mToken; if(now > (ggps.mStartTS + PENDING_SIGN_TIMEOUT) ) { // timed out toNotify.insert(std::make_pair( token, GrpNote(false, RsGxsGroupId()))); delete ggps.mItem; vit = mGrpsToPublish.erase(vit); continue; } RsGxsGroupId grpId; RsNxsGrp* grp = new RsNxsGrp(mServType); RsGxsGrpItem* grpItem = ggps.mItem; RsTlvSecurityKeySet fullKeySet; if(!(ggps.mHaveKeys)) { generateGroupKeys(fullKeySet, true); ggps.mHaveKeys = true; ggps.mKeys = fullKeySet; } else fullKeySet = ggps.mKeys; // find private admin key RsTlvPrivateRSAKey privAdminKey; bool privKeyFound = false; for(std::map::iterator mit_keys = fullKeySet.private_keys.begin(); mit_keys != fullKeySet.private_keys.end(); ++mit_keys) { RsTlvPrivateRSAKey& key = mit_keys->second; if(key.keyFlags == (RSTLV_KEY_DISTRIB_ADMIN | RSTLV_KEY_TYPE_FULL)) { privAdminKey = key; privKeyFound = true; } } uint8_t create = CREATE_FAIL; if(privKeyFound) { // get group id from private admin key id grpItem->meta.mGroupId = grp->grpId = RsGxsGroupId(privAdminKey.keyId); ServiceCreate_Return ret = service_CreateGroup(grpItem, fullKeySet); bool serialOk = false, servCreateOk; if(ret == SERVICE_CREATE_SUCCESS) { uint32_t size = mSerialiser->size(grpItem); char *gData = new char[size]; serialOk = mSerialiser->serialise(grpItem, gData, &size); grp->grp.setBinData(gData, size); delete[] gData; servCreateOk = true; }else { servCreateOk = false; } if(serialOk && servCreateOk) { grp->metaData = new RsGxsGrpMetaData(); grpItem->meta.mPublishTs = time(NULL); *(grp->metaData) = grpItem->meta; // TODO: change when publish key optimisation added (public groups don't have publish key grp->metaData->mSubscribeFlags = GXS_SERV::GROUP_SUBSCRIBE_ADMIN | GXS_SERV::GROUP_SUBSCRIBE_SUBSCRIBED | GXS_SERV::GROUP_SUBSCRIBE_PUBLISH; create = createGroup(grp, fullKeySet); #ifdef GEN_EXCH_DEBUG std::cerr << "RsGenExchange::publishGrps() "; std::cerr << " GrpId: " << grp->grpId; std::cerr << " CircleType: " << (uint32_t) grp->metaData->mCircleType; std::cerr << " CircleId: " << grp->metaData->mCircleId.toStdString(); std::cerr << std::endl; #endif if(create == CREATE_SUCCESS) { // Here we need to make sure that no private keys are included. This is very important since private keys // can be used to modify the group. Normally the private key set is whiped out by createGroup, but grp->metaData->keys.private_keys.clear() ; uint32_t mdSize = grp->metaData->serial_size(RS_GXS_GRP_META_DATA_CURRENT_API_VERSION); { RsTemporaryMemory metaData(mdSize); serialOk = grp->metaData->serialise(metaData, mdSize,RS_GXS_GRP_META_DATA_CURRENT_API_VERSION); #warning csoler: TODO: grp->meta should be renamed grp->public_meta ! grp->meta.setBinData(metaData, mdSize); } // Place back private keys for publisher and database storage grp->metaData->keys.private_keys = fullKeySet.private_keys; if(mDataStore->validSize(grp) && serialOk) { grpId = grp->grpId; computeHash(grp->grp, grp->metaData->mHash); grp->metaData->mRecvTS = time(NULL); if(ggps.mIsUpdate) mDataAccess->updateGroupData(grp); else mDataAccess->addGroupData(grp); delete grp ; groups_to_subscribe.push_back(grpId) ; } else { create = CREATE_FAIL; } } } else if(ret == SERVICE_CREATE_FAIL_TRY_LATER) { // if the service is not ready yet, reset the start timestamp to give the service more time // the service should have it's own timeout mechanism // services should return SERVICE_CREATE_FAIL if the action timed out // at the moment this is only important for the idservice: // the idservice may ask the user for a password, and the user needs time ggps.mStartTS = now; create = CREATE_FAIL_TRY_LATER; } else if(ret == SERVICE_CREATE_FAIL) create = CREATE_FAIL; } else { #ifdef GEN_EXCH_DEBUG std::cerr << "RsGenExchange::publishGrps() Could not find private publish keys " << std::endl; #endif create = CREATE_FAIL; } if(create == CREATE_FAIL) { #ifdef GEN_EXCH_DEBUG std::cerr << "RsGenExchange::publishGrps() failed to publish grp " << std::endl; #endif delete grp; delete grpItem; vit = mGrpsToPublish.erase(vit); toNotify.insert(std::make_pair( token, GrpNote(false, grpId))); } else if(create == CREATE_FAIL_TRY_LATER) { #ifdef GEN_EXCH_DEBUG std::cerr << "RsGenExchange::publishGrps() failed grp, trying again " << std::endl; #endif delete grp; ggps.mLastAttemptTS = time(NULL); ++vit; } else if(create == CREATE_SUCCESS) { delete grpItem; vit = mGrpsToPublish.erase(vit); #ifdef GEN_EXCH_DEBUG std::cerr << "RsGenExchange::publishGrps() ok -> pushing to notifies" << std::endl; #endif // add to published to allow acknowledgement toNotify.insert(std::make_pair(token, GrpNote(true,grpId))); } } std::map::iterator mit = toNotify.begin(); std::list grpChanged; for(; mit != toNotify.end(); ++mit) { GrpNote& note = mit->second; uint8_t status = note.first ? RsTokenService::GXS_REQUEST_V2_STATUS_COMPLETE : RsTokenService::GXS_REQUEST_V2_STATUS_FAILED; mGrpNotify.insert(std::make_pair(mit->first, note.second)); mDataAccess->updatePublicRequestStatus(mit->first, status); if(note.first) grpChanged.push_back(note.second); } if(!grpChanged.empty()) { RsGxsGroupChange* gc = new RsGxsGroupChange(RsGxsNotify::TYPE_PUBLISH, false); gc->mGrpIdList = grpChanged; mNotifications.push_back(gc); #ifdef GEN_EXCH_DEBUG std::cerr << " adding the following grp ids to notification: " << std::endl; for(std::list::const_iterator it(grpChanged.begin());it!=grpChanged.end();++it) std::cerr << " " << *it << std::endl; #endif } } // This is done off-mutex to avoid possible cross deadlocks with the net service. if(mNetService!=NULL) for(std::list::const_iterator it(groups_to_subscribe.begin());it!=groups_to_subscribe.end();++it) mNetService->subscribeStatusChanged((*it),true) ; } uint32_t RsGenExchange::generatePublicToken() { return mDataAccess->generatePublicToken(); } bool RsGenExchange::updatePublicRequestStatus(const uint32_t &token, const uint32_t &status) { return mDataAccess->updatePublicRequestStatus(token, status); } bool RsGenExchange::disposeOfPublicToken(const uint32_t &token) { return mDataAccess->disposeOfPublicToken(token); } RsGeneralDataService* RsGenExchange::getDataStore() { return mDataStore; } bool RsGenExchange::getGroupKeys(const RsGxsGroupId &grpId, RsTlvSecurityKeySet &keySet) { if(grpId.isNull()) return false; RS_STACK_MUTEX(mGenMtx) ; std::map grpMeta; grpMeta[grpId] = NULL; mDataStore->retrieveGxsGrpMetaData(grpMeta); if(grpMeta.empty()) return false; RsGxsGrpMetaData* meta = grpMeta[grpId]; if(meta == NULL) return false; keySet = meta->keys; GxsSecurity::createPublicKeysFromPrivateKeys(keySet) ; for(std::map::iterator it=grpMeta.begin();it!=grpMeta.end();++it) delete it->second ; return true; } void RsGenExchange::shareGroupPublishKey(const RsGxsGroupId& grpId,const std::set& peers) { if(grpId.isNull()) return ; mNetService->sharePublishKey(grpId,peers) ; } void RsGenExchange::processRecvdData() { processRecvdGroups(); processRecvdMessages(); performUpdateValidation(); } void RsGenExchange::computeHash(const RsTlvBinaryData& data, RsFileHash& hash) { pqihash pHash; pHash.addData(data.bin_data, data.bin_len); pHash.Complete(hash); } void RsGenExchange::processRecvdMessages() { std::list messages_to_reject ; { RS_STACK_MUTEX(mGenMtx) ; time_t now = time(NULL); #ifdef GEN_EXCH_DEBUG if(!mMsgPendingValidate.empty()) std::cerr << "processing received messages" << std::endl; #endif // 1 - First, make sure items metadata is deserialised, clean old failed items, and collect the groups Ids we have to check RsGxsGrpMetaTemporaryMap grpMetas; for(NxsMsgPendingVect::iterator pend_it = mMsgPendingValidate.begin();pend_it != mMsgPendingValidate.end();) { GxsPendingItem& gpsi = pend_it->second; RsNxsMsg *msg = gpsi.mItem ; if(msg->metaData == NULL) { RsGxsMsgMetaData* meta = new RsGxsMsgMetaData(); if(msg->meta.bin_len != 0 && meta->deserialise(msg->meta.bin_data, &(msg->meta.bin_len))) msg->metaData = meta; else delete meta; } bool accept_new_msg = msg->metaData != NULL && acceptNewMessage(msg->metaData,msg->msg.bin_len); if(!accept_new_msg) messages_to_reject.push_back(msg->metaData->mMsgId); // This prevents reloading the message again at next sync. if(!accept_new_msg || gpsi.mFirstTryTS + VALIDATE_MAX_WAITING_TIME < now) { std::cerr << "Pending validation grp=" << gpsi.mId.first << ", msg=" << gpsi.mId.second << ", has exceeded validation time limit. The author's key can probably not be obtained. This is unexpected." << std::endl; delete gpsi.mItem; pend_it = mMsgPendingValidate.erase(pend_it); } else { grpMetas.insert(std::make_pair(pend_it->second.mItem->grpId, (RsGxsGrpMetaData*)NULL)); ++pend_it; } } // 2 - Retrieve the metadata for the associated groups. mDataStore->retrieveGxsGrpMetaData(grpMetas); GxsMsgReq msgIds; RsNxsMsgDataTemporaryList msgs_to_store; #ifdef GEN_EXCH_DEBUG std::cerr << " updating received messages:" << std::endl; #endif // 3 - Validate each message for(NxsMsgPendingVect::iterator pend_it = mMsgPendingValidate.begin();pend_it != mMsgPendingValidate.end();) { RsNxsMsg* msg = pend_it->second.mItem; // (cyril) Normally we should discard posts that are older than the sync request. But that causes a problem because // RsGxsNetService requests posts to sync by chunks of 20. So if the 20 are discarded, they will be re-synced next time, and the sync process // will indefinitly loop on the same 20 posts. Since the posts are there already, keeping them is the least problematique way to fix this problem. // // uint32_t max_sync_age = ( mNetService != NULL)?( mNetService->getSyncAge(msg->metaData->mGroupId)):RS_GXS_DEFAULT_MSG_REQ_PERIOD; // // if(max_sync_age != 0 && msg->metaData->mPublishTs + max_sync_age < time(NULL)) // { // std::cerr << "(WW) not validating message " << msg->metaData->mMsgId << " in group " << msg->metaData->mGroupId << " because it is older than synchronisation limit. This message was probably sent by a friend node that does not accept sync limits already." << std::endl; // ok = false ; // } #ifdef GEN_EXCH_DEBUG std::cerr << " deserialised info: grp id=" << meta->mGroupId << ", msg id=" << meta->mMsgId ; #endif std::map::iterator mit = grpMetas.find(msg->grpId); #ifdef GEN_EXCH_DEBUG std::cerr << " msg info : grp id=" << msg->grpId << ", msg id=" << msg->msgId << std::endl; #endif // validate msg if(mit == grpMetas.end()) { std::cerr << "RsGenExchange::processRecvdMessages(): impossible situation: grp meta " << msg->grpId << " not available." << std::endl; ++pend_it ; continue ; } RsGxsGrpMetaData *grpMeta = mit->second; GxsSecurity::createPublicKeysFromPrivateKeys(grpMeta->keys); // make sure we have the public keys that correspond to the private ones, as it happens. Most of the time this call does nothing. int validateReturn = validateMsg(msg, grpMeta->mGroupFlags, grpMeta->mSignFlags, grpMeta->keys); #ifdef GEN_EXCH_DEBUG std::cerr << " grpMeta.mSignFlags: " << std::hex << grpMeta->mSignFlags << std::dec << std::endl; std::cerr << " grpMeta.mAuthFlags: " << std::hex << grpMeta->mAuthenFlags << std::dec << std::endl; std::cerr << " message validation result: " << (int)validateReturn << std::endl; #endif if(validateReturn == VALIDATE_SUCCESS) { msg->metaData->mMsgStatus = GXS_SERV::GXS_MSG_STATUS_UNPROCESSED | GXS_SERV::GXS_MSG_STATUS_GUI_NEW | GXS_SERV::GXS_MSG_STATUS_GUI_UNREAD; msgs_to_store.push_back(msg); std::vector &msgv = msgIds[msg->grpId]; if (std::find(msgv.begin(), msgv.end(), msg->msgId) == msgv.end()) msgv.push_back(msg->msgId); computeHash(msg->msg, msg->metaData->mHash); msg->metaData->recvTS = time(NULL); #ifdef GEN_EXCH_DEBUG std::cerr << " new status flags: " << meta->mMsgStatus << std::endl; std::cerr << " computed hash: " << meta->mHash << std::endl; std::cerr << "Message received. Identity=" << msg->metaData->mAuthorId << ", from peer " << msg->PeerId() << std::endl; #endif if(!msg->metaData->mAuthorId.isNull()) mRoutingClues[msg->metaData->mAuthorId].insert(msg->PeerId()) ; } else if(validateReturn == VALIDATE_FAIL) { // In this case, we notify the network exchange service not to DL the message again, at least not yet. #ifdef GEN_EXCH_DEBUG std::cerr << "Validation failed for message id " << "msg->grpId: " << msg->grpId << ", msgId: " << msg->msgId << std::endl; #endif messages_to_reject.push_back(msg->msgId) ; delete msg ; } else if(validateReturn == VALIDATE_FAIL_TRY_LATER) { ++pend_it ; continue; } // Remove the entry from mMsgPendingValidate, but do not delete msg since it's either pushed into msg_to_store or deleted in the FAIL case! NxsMsgPendingVect::iterator tmp = pend_it ; ++tmp ; mMsgPendingValidate.erase(pend_it) ; pend_it = tmp ; } if(!msgIds.empty()) { #ifdef GEN_EXCH_DEBUG std::cerr << " removing existing and old messages from incoming list." << std::endl; #endif removeDeleteExistingMessages(msgs_to_store, msgIds); #ifdef GEN_EXCH_DEBUG std::cerr << " storing remaining messages" << std::endl; #endif mDataStore->storeMessage(msgs_to_store); RsGxsMsgChange* c = new RsGxsMsgChange(RsGxsNotify::TYPE_RECEIVE, false); c->msgChangeMap = msgIds; mNotifications.push_back(c); } } // Done off-mutex to avoid cross deadlocks in the netservice that might call the RsGenExchange as an observer.. if(mNetService != NULL) for(std::list::const_iterator it(messages_to_reject.begin());it!=messages_to_reject.end();++it) mNetService->rejectMessage(*it) ; } bool RsGenExchange::acceptNewGroup(const RsGxsGrpMetaData* /*grpMeta*/ ) { return true; } bool RsGenExchange::acceptNewMessage(const RsGxsMsgMetaData* /*grpMeta*/,uint32_t /*size*/ ) { return true; } void RsGenExchange::processRecvdGroups() { RS_STACK_MUTEX(mGenMtx) ; if(mGrpPendingValidate.empty()) return; #ifdef GEN_EXCH_DEBUG std::cerr << "RsGenExchange::Processing received groups" << std::endl; #endif std::list grpIds; RsNxsGrpDataTemporaryList grps_to_store; // 1 - retrieve the existing groups so as to check what's not new std::vector existingGrpIds; mDataStore->retrieveGroupIds(existingGrpIds); // 2 - go through each and every new group data and validate the signatures. for(NxsGrpPendValidVect::iterator vit = mGrpPendingValidate.begin(); vit != mGrpPendingValidate.end();) { GxsPendingItem& gpsi = vit->second; RsNxsGrp* grp = gpsi.mItem; #ifdef GEN_EXCH_DEBUG std::cerr << " processing validation for group " << meta->mGroupId << ", original attempt time: " << time(NULL) - gpsi.mFirstTryTS << " seconds ago" << std::endl; #endif if(grp->metaData == NULL) { RsGxsGrpMetaData* meta = new RsGxsGrpMetaData(); if(grp->meta.bin_len != 0 && meta->deserialise(grp->meta.bin_data, grp->meta.bin_len)) grp->metaData = meta ; else delete meta ; } // early deletion of group from the pending list if it's malformed, not accepted, or has been tried unsuccessfully for too long if(grp->metaData == NULL || !acceptNewGroup(grp->metaData) || gpsi.mFirstTryTS + VALIDATE_MAX_WAITING_TIME < time(NULL)) { NxsGrpPendValidVect::iterator tmp(vit) ; ++tmp ; delete grp ; mGrpPendingValidate.erase(vit) ; vit = tmp ; continue; } // group signature validation uint8_t ret = validateGrp(grp); if(ret == VALIDATE_SUCCESS) { grp->metaData->mGroupStatus = GXS_SERV::GXS_GRP_STATUS_UNPROCESSED | GXS_SERV::GXS_GRP_STATUS_UNREAD; computeHash(grp->grp, grp->metaData->mHash); // group has been validated. Let's notify the global router for the clue if(!grp->metaData->mAuthorId.isNull()) { #ifdef GEN_EXCH_DEBUG std::cerr << "Group routage info: Identity=" << meta->mAuthorId << " from " << grp->PeerId() << std::endl; #endif mRoutingClues[grp->metaData->mAuthorId].insert(grp->PeerId()) ; } // This has been moved here (as opposed to inside part for new groups below) because it is used to update the server TS when updates // of grp metadata arrive. grp->metaData->mRecvTS = time(NULL); // now check if group already exists if(std::find(existingGrpIds.begin(), existingGrpIds.end(), grp->grpId) == existingGrpIds.end()) { grp->metaData->mOriginator = grp->PeerId(); grp->metaData->mSubscribeFlags = GXS_SERV::GROUP_SUBSCRIBE_NOT_SUBSCRIBED; grps_to_store.push_back(grp); grpIds.push_back(grp->grpId); } else { GroupUpdate update; update.newGrp = grp; mGroupUpdates.push_back(update); } } else if(ret == VALIDATE_FAIL) { #ifdef GEN_EXCH_DEBUG std::cerr << " failed to validate incoming meta, grpId: " << grp->grpId << ": wrong signature" << std::endl; #endif delete grp; } else if(ret == VALIDATE_FAIL_TRY_LATER) { #ifdef GEN_EXCH_DEBUG std::cerr << " failed to validate incoming grp, trying again later. grpId: " << grp->grpId << std::endl; #endif ++vit ; continue; } // Erase entry from the list NxsGrpPendValidVect::iterator tmp(vit) ; ++tmp ; mGrpPendingValidate.erase(vit) ; vit = tmp ; } if(!grpIds.empty()) { RsGxsGroupChange* c = new RsGxsGroupChange(RsGxsNotify::TYPE_RECEIVE, false); c->mGrpIdList = grpIds; mNotifications.push_back(c); mDataStore->storeGroup(grps_to_store); #ifdef GEN_EXCH_DEBUG std::cerr << " adding the following grp ids to notification: " << std::endl; for(std::list::const_iterator it(grpIds.begin());it!=grpIds.end();++it) std::cerr << " " << *it << std::endl; #endif } } void RsGenExchange::performUpdateValidation() { RS_STACK_MUTEX(mGenMtx) ; if(mGroupUpdates.empty()) return; #ifdef GEN_EXCH_DEBUG std::cerr << "RsGenExchange::performUpdateValidation() " << std::endl; #endif std::map grpMetas; std::vector::iterator vit = mGroupUpdates.begin(); for(; vit != mGroupUpdates.end(); ++vit) grpMetas.insert(std::make_pair(vit->newGrp->grpId, (RsGxsGrpMetaData*)NULL)); if(!grpMetas.empty()) mDataStore->retrieveGxsGrpMetaData(grpMetas); else return; vit = mGroupUpdates.begin(); for(; vit != mGroupUpdates.end(); ++vit) { GroupUpdate& gu = *vit; std::map::iterator mit = grpMetas.find(gu.newGrp->grpId); gu.oldGrpMeta = mit->second; gu.validUpdate = updateValid(*(gu.oldGrpMeta), *(gu.newGrp)); } #ifdef GEN_EXCH_DEBUG std::cerr << "RsGenExchange::performUpdateValidation() " << std::endl; #endif vit = mGroupUpdates.begin(); RsNxsGrpDataTemporaryList grps ; for(; vit != mGroupUpdates.end(); ++vit) { GroupUpdate& gu = *vit; if(gu.validUpdate) { if(gu.newGrp->metaData->mCircleType == GXS_CIRCLE_TYPE_YOUR_FRIENDS_ONLY) gu.newGrp->metaData->mOriginator = gu.newGrp->PeerId(); // Keep subscriptionflag to what it was. This avoids clearing off the flag when updates to group meta information // is received. gu.newGrp->metaData->mSubscribeFlags = gu.oldGrpMeta->mSubscribeFlags ; grps.push_back(gu.newGrp); } else { delete gu.newGrp; gu.newGrp = NULL ; } delete gu.oldGrpMeta; gu.oldGrpMeta = NULL ; } // notify the client RsGxsGroupChange* c = new RsGxsGroupChange(RsGxsNotify::TYPE_RECEIVE, true); for(uint32_t i=0;imGrpIdList.push_back(mGroupUpdates[i].newGrp->grpId) ; #ifdef GEN_EXCH_DEBUG std::cerr << " " << mGroupUpdates[i].newGrp->grpId << std::endl; #endif } mNotifications.push_back(c); // Warning: updateGroup will destroy the objects in grps. Dont use it afterwards! mDataStore->updateGroup(grps); #ifdef GEN_EXCH_DEBUG std::cerr << " adding the following grp ids to notification: " << std::endl; #endif // cleanup mGroupUpdates.clear(); } bool RsGenExchange::updateValid(RsGxsGrpMetaData& oldGrpMeta, RsNxsGrp& newGrp) const { std::map& signSet = newGrp.metaData->signSet.keySignSet; std::map::iterator mit = signSet.find(INDEX_AUTHEN_ADMIN); if(mit == signSet.end()) { #ifdef GEN_EXCH_DEBUG std::cerr << "RsGenExchange::updateValid() new admin sign not found! " << std::endl; std::cerr << "RsGenExchange::updateValid() grpId: " << oldGrpMeta.mGroupId << std::endl; #endif return false; } RsTlvKeySignature adminSign = mit->second; GxsSecurity::createPublicKeysFromPrivateKeys(oldGrpMeta.keys); // make sure we have the public keys that correspond to the private ones, as it happens. Most of the time this call does nothing. std::map& keys = oldGrpMeta.keys.public_keys; std::map::iterator keyMit = keys.find(RsGxsId(oldGrpMeta.mGroupId)); if(keyMit == keys.end()) { #ifdef GEN_EXCH_DEBUG std::cerr << "RsGenExchange::updateValid() admin key not found! " << std::endl; #endif return false; } // also check this is the latest published group bool latest = newGrp.metaData->mPublishTs > oldGrpMeta.mPublishTs; mGixs->timeStampKey(newGrp.metaData->mAuthorId, RsIdentityUsage(mServType,RsIdentityUsage::GROUP_ADMIN_SIGNATURE_CREATION, oldGrpMeta.mGroupId)) ; return GxsSecurity::validateNxsGrp(newGrp, adminSign, keyMit->second) && latest; } void RsGenExchange::setGroupReputationCutOff(uint32_t& token, const RsGxsGroupId& grpId, int CutOff) { RS_STACK_MUTEX(mGenMtx) ; token = mDataAccess->generatePublicToken(); GrpLocMetaData g; g.grpId = grpId; g.val.put(RsGeneralDataService::GRP_META_CUTOFF_LEVEL, (int32_t)CutOff); mGrpLocMetaMap.insert(std::make_pair(token, g)); } void RsGenExchange::removeDeleteExistingMessages( std::list& msgs, GxsMsgReq& msgIdsNotify) { // first get grp ids of messages to be stored RsGxsGroupId::std_set mGrpIdsUnique; for(std::list::const_iterator cit = msgs.begin(); cit != msgs.end(); ++cit) mGrpIdsUnique.insert((*cit)->metaData->mGroupId); //RsGxsGroupId::std_list grpIds(mGrpIdsUnique.begin(), mGrpIdsUnique.end()); //RsGxsGroupId::std_list::const_iterator it = grpIds.begin(); typedef std::map MsgIdReq; MsgIdReq msgIdReq; // now get a list of all msgs ids for each group for(RsGxsGroupId::std_set::const_iterator it(mGrpIdsUnique.begin()); it != mGrpIdsUnique.end(); ++it) { mDataStore->retrieveMsgIds(*it, msgIdReq[*it]); #ifdef GEN_EXCH_DEBUG const std::vector& vec(msgIdReq[*it]) ; std::cerr << " retrieved " << vec.size() << " message ids for group " << *it << std::endl; for(uint32_t i=0;i::iterator cit2 = msgs.begin(); cit2 != msgs.end();) { const RsGxsMessageId::std_vector& msgIds = msgIdReq[(*cit2)->metaData->mGroupId]; #ifdef GEN_EXCH_DEBUG std::cerr << " grpid=" << cit2->second->mGroupId << ", msgid=" << cit2->second->mMsgId ; #endif // Avoid storing messages that are already in the database, as well as messages that are too old (or generally do not pass the database storage test) // if(std::find(msgIds.begin(), msgIds.end(), (*cit2)->metaData->mMsgId) != msgIds.end() || !messagePublicationTest( *(*cit2)->metaData)) { // msg exist in retrieved index. We should use a std::set here instead of a vector. RsGxsMessageId::std_vector& notifyIds = msgIdsNotify[ (*cit2)->metaData->mGroupId]; RsGxsMessageId::std_vector::iterator it2 = std::find(notifyIds.begin(), notifyIds.end(), (*cit2)->metaData->mMsgId); if(it2 != notifyIds.end()) { notifyIds.erase(it2); if (notifyIds.empty()) { msgIdsNotify.erase( (*cit2)->metaData->mGroupId); } } #ifdef GEN_EXCH_DEBUG std::cerr << " discarding " << cit2->second->mMsgId << std::endl; #endif delete *cit2; cit2 = msgs.erase(cit2); } else ++cit2; } }