/* * libretroshare/src/gxs: rsgenexchange.cc * * RetroShare Gxs exchange interface. * * Copyright 2012-2012 by Christopher Evi-Parker, Robert Fernie * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Library General Public * License Version 2 as published by the Free Software Foundation. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Library General Public License for more details. * * You should have received a copy of the GNU Library General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 * USA. * * Please report all bugs and problems to "retroshare@lunamutt.com". * */ #include #include #include #include #include "pqi/pqihash.h" #include "rsgenexchange.h" #include "gxssecurity.h" #include "util/contentvalue.h" #include "rsgxsflags.h" RsGenExchange::RsGenExchange(RsGeneralDataService *gds, RsNetworkExchangeService *ns, RsSerialType *serviceSerialiser, uint16_t servType, RsGixs* gixs) : mGenMtx("GenExchange"), mDataStore(gds), mNetService(ns), mSerialiser(serviceSerialiser), mServType(servType), mGixs(gixs) { mDataAccess = new RsGxsDataAccess(gds); } RsGenExchange::~RsGenExchange() { // need to destruct in a certain order (prob a bad thing!) delete mNetService; delete mDataAccess; mDataAccess = NULL; delete mDataStore; mDataStore = NULL; } void RsGenExchange::run() { double timeDelta = 0.06; // slow tick while(true) { tick(); #ifndef WINDOWS_SYS usleep((int) (timeDelta * 1000000)); #else Sleep((int) (timeDelta * 1000)); #endif } } void RsGenExchange::tick() { mDataAccess->processRequests(); publishGrps(); publishMsgs(); processGrpMetaChanges(); processMsgMetaChanges(); processRecvdData(); if(!mNotifications.empty()) { notifyChanges(mNotifications); mNotifications.clear(); } // implemented service tick function service_tick(); } bool RsGenExchange::acknowledgeTokenMsg(const uint32_t& token, RsGxsGrpMsgIdPair& msgId) { RsStackMutex stack(mGenMtx); std::map::iterator mit = mMsgNotify.find(token); if(mit == mMsgNotify.end()) { return false; } msgId = mit->second; // no dump token as client has ackowledged its completion mDataAccess->disposeOfPublicToken(token); return true; } bool RsGenExchange::acknowledgeTokenGrp(const uint32_t& token, RsGxsGroupId& grpId) { RsStackMutex stack(mGenMtx); std::map::iterator mit = mGrpNotify.find(token); if(mit == mGrpNotify.end()) return false; grpId = mit->second; // no dump token as client has ackowledged its completion mDataAccess->disposeOfPublicToken(token); return true; } void RsGenExchange::generateGroupKeys(RsTlvSecurityKeySet& keySet) { /* create Keys */ // admin keys RSA *rsa_admin = RSA_generate_key(2048, 65537, NULL, NULL); RSA *rsa_admin_pub = RSAPublicKey_dup(rsa_admin); // publish keys RSA *rsa_publish = RSA_generate_key(2048, 65537, NULL, NULL); RSA *rsa_publish_pub = RSAPublicKey_dup(rsa_admin); /* set keys */ RsTlvSecurityKey adminKey, privAdminKey; /* set publish keys */ RsTlvSecurityKey pubKey, privPubKey; GxsSecurity::setRSAPublicKey(adminKey, rsa_admin_pub); GxsSecurity::setRSAPrivateKey(privAdminKey, rsa_admin); GxsSecurity::setRSAPublicKey(pubKey, rsa_publish_pub); GxsSecurity::setRSAPrivateKey(privPubKey, rsa_publish); adminKey.startTS = time(NULL); adminKey.endTS = 0; /* no end */ privAdminKey.startTS = time(NULL); privAdminKey.endTS = 0; /* no end */ pubKey.startTS = time(NULL); pubKey.endTS = 0; /* no end */ privPubKey.startTS = time(NULL); privPubKey.endTS = 0; /* no end */ // for now all public adminKey.keyFlags = RSTLV_KEY_DISTRIB_ADMIN | RSTLV_KEY_TYPE_PUBLIC_ONLY; privAdminKey.keyFlags = RSTLV_KEY_DISTRIB_ADMIN | RSTLV_KEY_TYPE_FULL; // for now all public pubKey.keyFlags = RSTLV_KEY_DISTRIB_PUBLIC | RSTLV_KEY_TYPE_PUBLIC_ONLY; privPubKey.keyFlags = RSTLV_KEY_DISTRIB_PRIVATE | RSTLV_KEY_TYPE_FULL; keySet.keys[adminKey.keyId] = adminKey; keySet.keys[pubKey.keyId] = pubKey; keySet.keys[privAdminKey.keyId] = privAdminKey; keySet.keys[privPubKey.keyId] = privPubKey; // clean up RSA_free(rsa_admin); RSA_free(rsa_admin_pub); RSA_free(rsa_publish); RSA_free(rsa_publish_pub); } bool RsGenExchange::createGroup(RsNxsGrp *grp, RsTlvSecurityKeySet& keySet) { RsGxsGrpMetaData* meta = grp->metaData; /* add keys to grp */ meta->keys = keySet; // find private admin key RsTlvSecurityKey privAdminKey; std::map::iterator mit = keySet.keys.begin(); for(; mit != keySet.keys.end(); mit++) { RsTlvSecurityKey& pk = mit->second; if(pk.keyFlags & (RSTLV_KEY_DISTRIB_ADMIN | RSTLV_KEY_TYPE_FULL)) { privAdminKey = pk; break; } } if(mit == keySet.keys.end()) return false; // group is self signing // for the creation of group signature // only public admin and publish keys are present // key set uint32_t metaDataLen = meta->serial_size(); uint32_t allGrpDataLen = metaDataLen + grp->grp.bin_len; char* metaData = new char[metaDataLen]; char* allGrpData = new char[allGrpDataLen]; // msgData + metaData meta->serialise(metaData, metaDataLen); // copy msg data and meta in allMsgData buffer memcpy(allGrpData, grp->grp.bin_data, grp->grp.bin_len); memcpy(allGrpData+(grp->grp.bin_len), metaData, metaDataLen); RsTlvKeySignature adminSign; bool ok = GxsSecurity::getSignature(allGrpData, allGrpDataLen, &privAdminKey, adminSign); // add admin sign to grpMeta meta->signSet.keySignSet[GXS_SERV::FLAG_AUTHEN_ADMIN] = adminSign; grp->grpId = meta->mGroupId = privAdminKey.keyId; // clean up delete[] allGrpData; delete[] metaData; return ok; } bool RsGenExchange::createMessage(RsNxsMsg* msg) { const RsGxsGroupId& id = msg->grpId; std::map metaMap; metaMap.insert(std::make_pair(id, (RsGxsGrpMetaData*)(NULL))); mDataStore->retrieveGxsGrpMetaData(metaMap); bool ok = true; RSA* rsa_pub = NULL; if(!metaMap[id]) { return false; } else { // get publish key RsGxsGrpMetaData* grpMeta = metaMap[id]; // public and shared is publish key RsTlvSecurityKeySet& keys = grpMeta->keys; RsTlvSecurityKey* pubKey; std::map::iterator mit = keys.keys.begin(), mit_end = keys.keys.end(); bool pub_key_found = false; for(; mit != mit_end; mit++) { pub_key_found = mit->second.keyFlags & (RSTLV_KEY_DISTRIB_PRIVATE | RSTLV_KEY_TYPE_FULL); if(pub_key_found) break; } if(pub_key_found) { RsGxsMsgMetaData &meta = *(msg->metaData); uint32_t metaDataLen = meta.serial_size(); uint32_t allMsgDataLen = metaDataLen + msg->msg.bin_len; char* metaData = new char[metaDataLen]; char* allMsgData = new char[allMsgDataLen]; // msgData + metaData meta.serialise(metaData, &metaDataLen); // copy msg data and meta in allmsgData buffer memcpy(allMsgData, msg->msg.bin_data, msg->msg.bin_len); memcpy(allMsgData+(msg->msg.bin_len), metaData, metaDataLen); // private publish key pubKey = &(mit->second); RsTlvKeySignatureSet& signSet = meta.signSet; RsTlvKeySignature pubSign = signSet.keySignSet[GXS_SERV::FLAG_AUTHEN_PUBLISH]; GxsSecurity::getSignature(allMsgData, allMsgDataLen, pubKey, pubSign); // get hash of msg data to create msg id pqihash hash; hash.addData(allMsgData, allMsgDataLen); hash.Complete(msg->msgId); // assign msg id to msg meta msg->metaData->mMsgId = msg->msgId; //place signature in msg meta signSet.keySignSet[GXS_SERV::FLAG_AUTHEN_PUBLISH] = pubSign; // clean up delete[] metaData; delete[] allMsgData; } else { ok = false; } delete grpMeta; } return ok; } bool RsGenExchange::getGroupList(const uint32_t &token, std::list &groupIds) { return mDataAccess->getGroupList(token, groupIds); } bool RsGenExchange::getMsgList(const uint32_t &token, GxsMsgIdResult &msgIds) { return mDataAccess->getMsgList(token, msgIds); } bool RsGenExchange::getGroupMeta(const uint32_t &token, std::list &groupInfo) { std::list metaL; bool ok = mDataAccess->getGroupSummary(token, metaL); std::list::iterator lit = metaL.begin(); RsGroupMetaData m; for(; lit != metaL.end(); lit++) { RsGxsGrpMetaData& gMeta = *(*lit); m = gMeta; groupInfo.push_back(m); delete (*lit); } return ok; } bool RsGenExchange::getMsgMeta(const uint32_t &token, GxsMsgMetaMap &msgInfo) { std::list metaL; GxsMsgMetaResult result; bool ok = mDataAccess->getMsgSummary(token, result); GxsMsgMetaResult::iterator mit = result.begin(); for(; mit != result.end(); mit++) { std::vector& metaV = mit->second; msgInfo[mit->first] = std::vector(); std::vector& msgInfoV = msgInfo[mit->first]; std::vector::iterator vit = metaV.begin(); RsMsgMetaData meta; for(; vit != metaV.end(); vit++) { RsGxsMsgMetaData& m = *(*vit); meta = m; msgInfoV.push_back(meta); delete *vit; } metaV.clear(); } return ok; } bool RsGenExchange::getGroupData(const uint32_t &token, std::vector& grpItem) { std::list nxsGrps; bool ok = mDataAccess->getGroupData(token, nxsGrps); std::list::iterator lit = nxsGrps.begin(); if(ok) { for(; lit != nxsGrps.end(); lit++) { RsTlvBinaryData& data = (*lit)->grp; RsItem* item = mSerialiser->deserialise(data.bin_data, &data.bin_len); if(item != NULL) { RsGxsGrpItem* gItem = dynamic_cast(item); gItem->meta = *((*lit)->metaData); grpItem.push_back(gItem); delete *lit; } } } return ok; } bool RsGenExchange::getMsgData(const uint32_t &token, GxsMsgDataMap &msgItems) { RsStackMutex stack(mGenMtx); NxsMsgDataResult msgResult; bool ok = mDataAccess->getMsgData(token, msgResult); NxsMsgDataResult::iterator mit = msgResult.begin(); if(ok) { for(; mit != msgResult.end(); mit++) { std::vector gxsMsgItems; const RsGxsGroupId& grpId = mit->first; std::vector& nxsMsgsV = mit->second; std::vector::iterator vit = nxsMsgsV.begin(); for(; vit != nxsMsgsV.end(); vit++) { RsNxsMsg*& msg = *vit; RsItem* item = mSerialiser->deserialise(msg->msg.bin_data, &msg->msg.bin_len); RsGxsMsgItem* mItem = dynamic_cast(item); mItem->meta = *((*vit)->metaData); // get meta info from nxs msg gxsMsgItems.push_back(mItem); delete msg; } msgItems[grpId] = gxsMsgItems; } } return ok; } RsTokenService* RsGenExchange::getTokenService() { return mDataAccess; } void RsGenExchange::notifyNewGroups(std::vector &groups) { std::vector::iterator vit = groups.begin(); // store these for tick() to pick them up for(; vit != groups.end(); vit++) mReceivedGrps.push_back(*vit); } void RsGenExchange::notifyNewMessages(std::vector& messages) { std::vector::iterator vit = messages.begin(); // store these for tick() to pick them up for(; vit != messages.end(); vit++) mReceivedMsgs.push_back(*vit); } void RsGenExchange::publishGroup(uint32_t& token, RsGxsGrpItem *grpItem) { RsStackMutex stack(mGenMtx); token = mDataAccess->generatePublicToken(); mGrpsToPublish.insert(std::make_pair(token, grpItem)); } void RsGenExchange::publishMsg(uint32_t& token, RsGxsMsgItem *msgItem) { RsStackMutex stack(mGenMtx); token = mDataAccess->generatePublicToken(); mMsgsToPublish.insert(std::make_pair(token, msgItem)); } void RsGenExchange::setGroupSubscribeFlags(uint32_t& token, const RsGxsGroupId& grpId, const uint32_t& flag, const uint32_t& mask) { /* TODO APPLY MASK TO FLAGS */ RsStackMutex stack(mGenMtx); token = mDataAccess->generatePublicToken(); GrpLocMetaData g; g.grpId = grpId; g.val.put(RsGeneralDataService::GRP_META_SUBSCRIBE_FLAG, (int32_t)flag); mGrpLocMetaMap.insert(std::make_pair(token, g)); } void RsGenExchange::setGroupStatusFlags(uint32_t& token, const RsGxsGroupId& grpId, const uint32_t& status, const uint32_t& mask) { /* TODO APPLY MASK TO FLAGS */ RsStackMutex stack(mGenMtx); token = mDataAccess->generatePublicToken(); GrpLocMetaData g; g.grpId = grpId; g.val.put(RsGeneralDataService::GRP_META_STATUS, (int32_t)status); mGrpLocMetaMap.insert(std::make_pair(token, g)); } void RsGenExchange::setGroupServiceString(uint32_t& token, const RsGxsGroupId& grpId, const std::string& servString) { RsStackMutex stack(mGenMtx); token = mDataAccess->generatePublicToken(); GrpLocMetaData g; g.grpId = grpId; g.val.put(RsGeneralDataService::GRP_META_SERV_STRING, servString); mGrpLocMetaMap.insert(std::make_pair(token, g)); } void RsGenExchange::setMsgStatusFlags(uint32_t& token, const RsGxsGrpMsgIdPair& msgId, const uint32_t& status, const uint32_t& mask) { /* TODO APPLY MASK TO FLAGS */ RsStackMutex stack(mGenMtx); token = mDataAccess->generatePublicToken(); MsgLocMetaData m; m.val.put(RsGeneralDataService::MSG_META_STATUS, (int32_t)status); m.msgId = msgId; mMsgLocMetaMap.insert(std::make_pair(token, m)); } void RsGenExchange::setMsgServiceString(uint32_t& token, const RsGxsGrpMsgIdPair& msgId, const std::string& servString ) { RsStackMutex stack(mGenMtx); token = mDataAccess->generatePublicToken(); MsgLocMetaData m; m.val.put(RsGeneralDataService::MSG_META_SERV_STRING, servString); m.msgId = msgId; mMsgLocMetaMap.insert(std::make_pair(token, m)); } void RsGenExchange::processMsgMetaChanges() { RsStackMutex stack(mGenMtx); std::map::iterator mit = mMsgLocMetaMap.begin(), mit_end = mMsgLocMetaMap.end(); for(; mit != mit_end; mit++) { MsgLocMetaData& m = mit->second; bool ok = mDataStore->updateMessageMetaData(m) == 1; uint32_t token = mit->first; if(ok) { mDataAccess->updatePublicRequestStatus(token, RsTokenService::GXS_REQUEST_V2_STATUS_COMPLETE); }else { mDataAccess->updatePublicRequestStatus(token, RsTokenService::GXS_REQUEST_V2_STATUS_FAILED); } mMsgNotify.insert(std::make_pair(token, m.msgId)); } mMsgLocMetaMap.clear(); } void RsGenExchange::processGrpMetaChanges() { RsStackMutex stack(mGenMtx); std::map::iterator mit = mGrpLocMetaMap.begin(), mit_end = mGrpLocMetaMap.end(); for(; mit != mit_end; mit++) { GrpLocMetaData& g = mit->second; uint32_t token = mit->first; bool ok = mDataStore->updateGroupMetaData(g) == 1; if(ok) { mDataAccess->updatePublicRequestStatus(token, RsTokenService::GXS_REQUEST_V2_STATUS_COMPLETE); }else { mDataAccess->updatePublicRequestStatus(token, RsTokenService::GXS_REQUEST_V2_STATUS_FAILED); } mGrpNotify.insert(std::make_pair(token, g.grpId)); } mGrpLocMetaMap.clear(); } void RsGenExchange::publishMsgs() { RsStackMutex stack(mGenMtx); std::map::iterator mit = mMsgsToPublish.begin(); for(; mit != mMsgsToPublish.end(); mit++) { RsNxsMsg* msg = new RsNxsMsg(mServType); RsGxsMsgItem* msgItem = mit->second; msg->grpId = msgItem->meta.mGroupId; uint32_t size = mSerialiser->size(msgItem); char* mData = new char[size]; bool serialOk = false; bool createOk = false; serialOk = mSerialiser->serialise(msgItem, mData, &size); if(serialOk) { msg->msg.setBinData(mData, size); // now create meta msg->metaData = new RsGxsMsgMetaData(); *(msg->metaData) = msgItem->meta; // assign time stamp msg->metaData->mPublishTs = time(NULL); // now intialise msg (sign it) createOk = createMessage(msg); RsGxsMessageId msgId; RsGxsGroupId grpId = msgItem->meta.mGroupId; bool msgDoesnExist = false; if(createOk) { GxsMsgReq req; std::vector msgV; msgV.push_back(msg->msgId); GxsMsgMetaResult res; req.insert(std::make_pair(msg->grpId, msgV)); mDataStore->retrieveGxsMsgMetaData(req, res); msgDoesnExist = res[grpId].empty(); } if(createOk && msgDoesnExist) { // empty orig msg id means this is the original // msg // TODO: a non empty msgid means one should at least // have the msg on disk, after which this msg is signed // based on the security settings // public grp (sign by grp public pub key, private/id: signed by // id if(msg->metaData->mOrigMsgId.empty()) { msg->metaData->mOrigMsgId = msg->metaData->mMsgId; } // now serialise meta data size = msg->metaData->serial_size(); char metaDataBuff[size]; msg->metaData->serialise(metaDataBuff, &size); msg->meta.setBinData(metaDataBuff, size); msgId = msg->msgId; grpId = msg->grpId; mDataAccess->addMsgData(msg); // add to published to allow acknowledgement mMsgNotify.insert(std::make_pair(mit->first, std::make_pair(grpId, msgId))); mDataAccess->updatePublicRequestStatus(mit->first, RsTokenService::GXS_REQUEST_V2_STATUS_COMPLETE); } else { // delete msg if created wasn't ok delete msg; mDataAccess->updatePublicRequestStatus(mit->first, RsTokenService::GXS_REQUEST_V2_STATUS_FAILED); #ifdef GEN_EXCH_DEBUG std::cerr << "RsGenExchange::publishMsgs() failed to publish msg " << std::endl; #endif } } delete[] mData; delete msgItem; } // clear msg list as we're done publishing them and entries // are invalid mMsgsToPublish.clear(); } void RsGenExchange::service_CreateGroup(RsGxsGrpItem* grpItem, RsTlvSecurityKeySet& keySet) { #ifdef GEN_EXCH_DEBUG std::cerr << "RsGenExchange::service_CreateGroup(): Does nothing" << std::endl; #endif return; } #define GEN_EXCH_GRP_CHUNK 3 void RsGenExchange::publishGrps() { RsStackMutex stack(mGenMtx); std::map::iterator mit = mGrpsToPublish.begin(); std::vector toRemove; int i = 0; for(; mit != mGrpsToPublish.end(); mit++) { toRemove.push_back(mit->first); i++; if(i > GEN_EXCH_GRP_CHUNK) break; RsNxsGrp* grp = new RsNxsGrp(mServType); RsGxsGrpItem* grpItem = mit->second; uint32_t size = mSerialiser->size(grpItem); RsTlvSecurityKeySet keySet; generateGroupKeys(keySet); service_CreateGroup(grpItem, keySet); char gData[size]; bool ok = mSerialiser->serialise(grpItem, gData, &size); grp->grp.setBinData(gData, size); if(ok) { grp->metaData = new RsGxsGrpMetaData(); grpItem->meta.mPublishTs = time(NULL); *(grp->metaData) = grpItem->meta; grp->metaData->mSubscribeFlags = GXS_SERV::GROUP_SUBSCRIBE_ADMIN; ok &= createGroup(grp, keySet); size = grp->metaData->serial_size(); char mData[size]; grp->metaData->mGroupId = grp->grpId; ok &= grp->metaData->serialise(mData, size); grp->meta.setBinData(mData, size); RsGxsGroupId grpId = grp->grpId; mDataAccess->addGroupData(grp); // add to published to allow acknowledgement mGrpNotify.insert(std::make_pair(mit->first, grpId)); mDataAccess->updatePublicRequestStatus(mit->first, RsTokenService::GXS_REQUEST_V2_STATUS_COMPLETE); } if(!ok) { #ifdef GEN_EXCH_DEBUG std::cerr << "RsGenExchange::publishGrps() failed to publish grp " << std::endl; #endif delete grp; // add to published to allow acknowledgement, grpid is empty as grp creation failed mGrpNotify.insert(std::make_pair(mit->first, RsGxsGroupId(""))); mDataAccess->updatePublicRequestStatus(mit->first, RsTokenService::GXS_REQUEST_V2_STATUS_FAILED); continue; } delete grpItem; } // clear grp list as we're done publishing them and entries // are invalid for(int i = 0; i < toRemove.size(); i++) mGrpsToPublish.erase(toRemove[i]); } uint32_t RsGenExchange::generatePublicToken() { return mDataAccess->generatePublicToken(); } bool RsGenExchange::updatePublicRequestStatus(const uint32_t &token, const uint32_t &status) { return mDataAccess->updatePublicRequestStatus(token, status); } bool RsGenExchange::disposeOfPublicToken(const uint32_t &token) { return mDataAccess->disposeOfPublicToken(token); } RsGeneralDataService* RsGenExchange::getDataStore() { return mDataStore; } bool RsGenExchange::getGroupKeys(const RsGxsGroupId &grpId, RsTlvSecurityKeySet &keySet) { if(grpId.empty()) return false; RsStackMutex stack(mGenMtx); std::map grpMeta; grpMeta[grpId] = NULL; mDataStore->retrieveGxsGrpMetaData(grpMeta); if(grpMeta.empty()) return false; RsGxsGrpMetaData* meta = grpMeta[grpId]; if(meta == NULL) return false; keySet = meta->keys; return true; } void RsGenExchange::createDummyGroup(RsGxsGrpItem *grpItem) { RsStackMutex stack(mGenMtx); RsNxsGrp* grp = new RsNxsGrp(mServType); uint32_t size = mSerialiser->size(grpItem); char gData[size]; bool ok = mSerialiser->serialise(grpItem, gData, &size); grp->grp.setBinData(gData, size); RsTlvSecurityKeySet keySet; generateGroupKeys(keySet); service_CreateGroup(grpItem, keySet); if(ok) { grp->metaData = new RsGxsGrpMetaData(); grpItem->meta.mPublishTs = time(NULL); *(grp->metaData) = grpItem->meta; grp->metaData->mSubscribeFlags = ~GXS_SERV::GROUP_SUBSCRIBE_MASK; createGroup(grp, keySet); size = grp->metaData->serial_size(); char mData[size]; grp->metaData->mGroupId = grp->grpId; ok = grp->metaData->serialise(mData, size); grp->meta.setBinData(mData, size); mDataAccess->addGroupData(grp); } if(!ok) { #ifdef GEN_EXCH_DEBUG std::cerr << "RsGenExchange::createDummyGroup(); failed to publish grp " << std::endl; #endif delete grp; } delete grpItem; } void RsGenExchange::processRecvdData() { processRecvdGroups(); processRecvdMessages(); } void RsGenExchange::processRecvdMessages() { RsStackMutex stack(mGenMtx); std::vector::iterator vit = mReceivedMsgs.begin(); GxsMsgReq msgIds; std::map msgs; for(; vit != mReceivedMsgs.end(); vit++) { RsNxsMsg* msg = *vit; RsGxsMsgMetaData* meta = new RsGxsMsgMetaData(); bool ok = meta->deserialise(msg->meta.bin_data, &(msg->meta.bin_len)); if(ok) { msgs.insert(std::make_pair(msg, meta)); msgIds[msg->grpId].push_back(msg->msgId); } else { #ifdef GXS_GENX_DEBUG std::cerr << "failed to deserialise incoming meta, grpId: " << msg->grpId << ", msgId: " << msg->msgId << std::endl; #endif delete msg; delete meta; } } if(!msgIds.empty()) { mDataStore->storeMessage(msgs); RsGxsMsgChange* c = new RsGxsMsgChange(); c->msgChangeMap = msgIds; mNotifications.push_back(c); } mReceivedMsgs.clear(); } void RsGenExchange::processRecvdGroups() { RsStackMutex stack(mGenMtx); std::vector::iterator vit = mReceivedGrps.begin(); std::map grps; std::list grpIds; for(; vit != mReceivedGrps.end(); vit++) { RsNxsGrp* grp = *vit; RsGxsGrpMetaData* meta = new RsGxsGrpMetaData(); bool ok = meta->deserialise(grp->meta.bin_data, grp->meta.bin_len); if(ok) { grps.insert(std::make_pair(grp, meta)); grpIds.push_back(grp->grpId); } else { #ifdef GXS_GENX_DEBUG std::cerr << "failed to deserialise incoming meta, grpId: " << grp->grpId << std::endl; #endif delete grp; delete meta; } } if(!grpIds.empty()) { RsGxsGroupChange* c = new RsGxsGroupChange(); c->grpIdList = grpIds; mNotifications.push_back(c); mDataStore->storeGroup(grps); } mReceivedGrps.clear(); }