From 46c945de9674147578e4ecf797eaffef5a4945fc Mon Sep 17 00:00:00 2001 From: chrisparker126 Date: Sat, 1 Sep 2012 14:47:22 +0000 Subject: [PATCH] nxs msg transaction now working and test commited some clean up still needed (msgs to sync should be determined by grp flag in db) git-svn-id: http://svn.code.sf.net/p/retroshare/code/branches/v0.5-gxs-b1@5497 b45a01b8-16f6-495d-af2f-9b41ad6348cc --- libretroshare/src/gxs/rsgxsnetservice.cc | 229 +++++++++++++++--- libretroshare/src/gxs/rsgxsnetservice.h | 3 + .../src/tests/gxs/nxstestscenario.cc | 17 +- .../src/tests/gxs/rsgxsnetservice_test.cc | 2 +- libretroshare/src/util/contentvalue.cc | 1 + 5 files changed, 215 insertions(+), 37 deletions(-) diff --git a/libretroshare/src/gxs/rsgxsnetservice.cc b/libretroshare/src/gxs/rsgxsnetservice.cc index e619e1ebd..5becd34ec 100644 --- a/libretroshare/src/gxs/rsgxsnetservice.cc +++ b/libretroshare/src/gxs/rsgxsnetservice.cc @@ -28,7 +28,7 @@ #define NXS_NET_DEBUG -#define SYNC_PERIOD 10 // in microseconds every 10 seconds (1 second for testing) +#define SYNC_PERIOD 12 // in microseconds every 10 seconds (1 second for testing) #define TRANSAC_TIMEOUT 5 // 5 seconds @@ -85,14 +85,23 @@ void RsGxsNetService::syncWithPeers() sendItem(grp); } -// // TODO msgs -// for(; sit != peers.end(); sit++) -// { -// RsNxsSyncMsg* msg = new RsNxsSyncMsg(mServType); -// msg->clear(); -// msg->PeerId(*sit); -// sendItem(msg); -// } + sit = peers.begin(); + // TODO msgs + for(; sit != peers.end(); sit++) + { + RsStackMutex stack(mNxsMutex); + + std::set::iterator sit_grp = mGroupSubscribedTo.begin(); + + for(; sit_grp != mGroupSubscribedTo.end(); sit_grp++) + { + RsNxsSyncMsg* msg = new RsNxsSyncMsg(mServType); + msg->clear(); + msg->PeerId(*sit); + msg->grpId = *sit_grp; + sendItem(msg); + } + } } @@ -334,8 +343,8 @@ void RsGxsNetService::run(){ bool RsGxsNetService::locked_checkTransacTimedOut(NxsTransaction* tr) { - //return tr->mTimeOut < ((uint32_t) time(NULL)); - return false; + return tr->mTimeOut < ((uint32_t) time(NULL)); + // return false; } void RsGxsNetService::processTransactions(){ @@ -591,6 +600,10 @@ void RsGxsNetService::locked_processCompletedIncomingTrans(NxsTransaction* tr) { tr->mItems.pop_front(); grps.push_back(grp); + + //TODO: remove subscription should be handled + // outside netservice + mGroupSubscribedTo.insert(grp->grpId); } else { @@ -605,6 +618,8 @@ void RsGxsNetService::locked_processCompletedIncomingTrans(NxsTransaction* tr) // notify listener of grps mObserver->notifyNewGroups(grps); + + }else if(flag & RsNxsTransac::FLAG_TYPE_MSGS) { @@ -627,6 +642,8 @@ void RsGxsNetService::locked_processCompletedIncomingTrans(NxsTransaction* tr) } } + int nitems = msgs.size(); + // notify listener of msgs mObserver->notifyNewMessages(msgs); @@ -735,10 +752,10 @@ void RsGxsNetService::locked_genReqMsgTransaction(NxsTransaction* tr) // get grp id for this transaction RsNxsSyncMsgItem* item = msgItemL.front(); const std::string& grpId = item->grpId; - GxsMsgReq reqIds; - reqIds[grpId] = std::vector(); + GxsMsgReq reqIds; + reqIds[grpId] = std::vector(); GxsMsgMetaResult result; - mDataStore->retrieveGxsMsgMetaData(reqIds, result); + mDataStore->retrieveGxsMsgMetaData(reqIds, result); std::vector &msgMetaV = result[grpId]; std::vector::const_iterator vit = msgMetaV.begin(); @@ -756,6 +773,8 @@ void RsGxsNetService::locked_genReqMsgTransaction(NxsTransaction* tr) std::list::iterator llit = msgItemL.begin(); std::list reqList; + const std::string peerFrom = tr->mTransaction->PeerId(); + for(; llit != msgItemL.end(); llit++) { const std::string& msgId = (*llit)->msgId; @@ -766,11 +785,12 @@ void RsGxsNetService::locked_genReqMsgTransaction(NxsTransaction* tr) msgItem->msgId = msgId; msgItem->flag = RsNxsSyncMsgItem::FLAG_REQUEST; msgItem->transactionNumber = transN; + msgItem->PeerId(peerFrom); reqList.push_back(msgItem); } } - if(reqList.empty()) + if(!reqList.empty()) { RsNxsTransac* transac = new RsNxsTransac(mServType); @@ -852,26 +872,36 @@ void RsGxsNetService::locked_genReqGrpTransaction(NxsTransaction* tr) } - RsNxsTransac* transac = new RsNxsTransac(mServType); - transac->transactFlag = RsNxsTransac::FLAG_TYPE_GRP_LIST_REQ - | RsNxsTransac::FLAG_BEGIN_P1; - transac->timestamp = 0; - transac->nItems = reqList.size(); - transac->PeerId(tr->mTransaction->PeerId()); - transac->transactionNumber = transN; + if(!reqList.empty()) + { - NxsTransaction* newTrans = new NxsTransaction(); - newTrans->mItems = reqList; - newTrans->mFlag = NxsTransaction::FLAG_STATE_WAITING_CONFIRM; - newTrans->mTimeOut = time(NULL) + mTransactionTimeOut; - newTrans->mTransaction = new RsNxsTransac(*transac); - newTrans->mTransaction->PeerId(mOwnId); + RsNxsTransac* transac = new RsNxsTransac(mServType); + transac->transactFlag = RsNxsTransac::FLAG_TYPE_GRP_LIST_REQ + | RsNxsTransac::FLAG_BEGIN_P1; + transac->timestamp = 0; + transac->nItems = reqList.size(); + transac->PeerId(tr->mTransaction->PeerId()); + transac->transactionNumber = transN; - sendItem(transac); + NxsTransaction* newTrans = new NxsTransaction(); + newTrans->mItems = reqList; + newTrans->mFlag = NxsTransaction::FLAG_STATE_WAITING_CONFIRM; + newTrans->mTimeOut = time(NULL) + mTransactionTimeOut; + newTrans->mTransaction = new RsNxsTransac(*transac); + newTrans->mTransaction->PeerId(mOwnId); - if(!locked_addTransaction(newTrans)) - delete newTrans; + sendItem(transac); + if(!locked_addTransaction(newTrans)) + delete newTrans; + + } + + // clean up meta data + std::map::iterator mit = grpMetaMap.begin(); + + for(; mit != grpMetaMap.end(); mit++) + delete mit->second; } void RsGxsNetService::locked_genSendGrpsTransaction(NxsTransaction* tr) @@ -895,8 +925,13 @@ void RsGxsNetService::locked_genSendGrpsTransaction(NxsTransaction* tr) grps[item->grpId] = NULL; } - mDataStore->retrieveNxsGrps(grps, false, false); - + if(!grps.empty()) + { + mDataStore->retrieveNxsGrps(grps, false, false); + } + else{ + return; + } NxsTransaction* newTr = new NxsTransaction(); newTr->mFlag = NxsTransaction::FLAG_STATE_WAITING_CONFIRM; @@ -908,7 +943,7 @@ void RsGxsNetService::locked_genSendGrpsTransaction(NxsTransaction* tr) std::string peerId = tr->mTransaction->PeerId(); for(;mit != grps.end(); mit++) { - mit->second->PeerId(peerId); // set so it gets sent to right peer + mit->second->PeerId(peerId); // set so it gets sent to right peer mit->second->transactionNumber = transN; newTr->mItems.push_back(mit->second); } @@ -941,6 +976,77 @@ void RsGxsNetService::locked_genSendGrpsTransaction(NxsTransaction* tr) void RsGxsNetService::locked_genSendMsgsTransaction(NxsTransaction* tr) { +#ifdef NXS_NET_DEBUG + std::cerr << "locked_genSendMsgsTransaction()" << std::endl; + std::cerr << "Generating Msg data send fron TransN: " << tr->mTransaction->transactionNumber + << std::endl; +#endif + + // go groups requested in transaction tr + + std::list::iterator lit = tr->mItems.begin(); + + GxsMsgReq msgIds; + GxsMsgResult msgs; + + if(tr->mItems.empty()){ + return; + } + + for(;lit != tr->mItems.end(); lit++) + { + RsNxsSyncMsgItem* item = dynamic_cast(*lit); + msgIds[item->grpId].push_back(item->msgId); + } + + mDataStore->retrieveNxsMsgs(msgIds, msgs, false, false); + + NxsTransaction* newTr = new NxsTransaction(); + newTr->mFlag = NxsTransaction::FLAG_STATE_WAITING_CONFIRM; + + uint32_t transN = locked_getTransactionId(); + + // store grp items to send in transaction + GxsMsgResult::iterator mit = msgs.begin(); + std::string peerId = tr->mTransaction->PeerId(); + uint32_t msgSize = 0; + + for(;mit != msgs.end(); mit++) + { + std::vector& msgV = mit->second; + std::vector::iterator vit = msgV.begin(); + + for(; vit != msgV.end(); vit++) + { + RsNxsMsg* msg = *vit; + msg->PeerId(peerId); + msg->transactionNumber = transN; + newTr->mItems.push_back(msg); + msgSize++; + } + } + + if(newTr->mItems.empty()){ + delete newTr; + return; + } + + RsNxsTransac* ntr = new RsNxsTransac(mServType); + ntr->transactionNumber = transN; + ntr->transactFlag = RsNxsTransac::FLAG_BEGIN_P1 | + RsNxsTransac::FLAG_TYPE_MSGS; + ntr->nItems = msgSize; + ntr->PeerId(peerId); + + newTr->mTransaction = new RsNxsTransac(*ntr); + newTr->mTransaction->PeerId(mOwnId); + newTr->mTimeOut = time(NULL) + mTransactionTimeOut; + + ntr->PeerId(tr->mTransaction->PeerId()); + sendItem(ntr); + + locked_addTransaction(newTr); + return; } uint32_t RsGxsNetService::locked_getTransactionId() @@ -1040,6 +1146,61 @@ void RsGxsNetService::handleRecvSyncGroup(RsNxsSyncGrp* item) void RsGxsNetService::handleRecvSyncMessage(RsNxsSyncMsg* item) { RsStackMutex stack(mNxsMutex); + + const std::string& peer = item->PeerId(); + + GxsMsgMetaResult metaResult; + GxsMsgReq req; + req[item->grpId] = std::vector(); + mDataStore->retrieveGxsMsgMetaData(req, metaResult); + + std::vector& msgMeta = metaResult[item->grpId]; + + if(req.empty()){ + return; + } + + std::vector::iterator vit = msgMeta.begin(); + + NxsTransaction* tr = new NxsTransaction(); + std::list& itemL = tr->mItems; + + uint32_t transN = locked_getTransactionId(); + + for(; vit != msgMeta.end(); vit++) + { + RsGxsMsgMetaData* m = *vit; + RsNxsSyncMsgItem* mItem = new + RsNxsSyncMsgItem(mServType); + mItem->flag = RsNxsSyncGrpItem::FLAG_RESPONSE; + mItem->grpId = m->mGroupId; + mItem->msgId = m->mMsgId; + mItem->PeerId(peer); + mItem->transactionNumber = transN; + itemL.push_back(mItem); + } + + tr->mFlag = NxsTransaction::FLAG_STATE_WAITING_CONFIRM; + RsNxsTransac* trItem = new RsNxsTransac(mServType); + trItem->transactFlag = RsNxsTransac::FLAG_BEGIN_P1 + | RsNxsTransac::FLAG_TYPE_MSG_LIST_RESP; + + trItem->nItems = itemL.size(); + + trItem->timestamp = 0; + trItem->PeerId(peer); + trItem->transactionNumber = transN; + + // also make a copy for the resident transaction + tr->mTransaction = new RsNxsTransac(*trItem); + tr->mTransaction->PeerId(mOwnId); + tr->mTimeOut = time(NULL) + mTransactionTimeOut; + + // signal peer to prepare for transaction + sendItem(trItem); + + locked_addTransaction(tr); + return; } diff --git a/libretroshare/src/gxs/rsgxsnetservice.h b/libretroshare/src/gxs/rsgxsnetservice.h index c4c0b9b40..096e5c8c7 100644 --- a/libretroshare/src/gxs/rsgxsnetservice.h +++ b/libretroshare/src/gxs/rsgxsnetservice.h @@ -395,6 +395,9 @@ private: RsMutex mNxsMutex; uint32_t mSyncTs; + // TODO: remove, temp, for testing. + // subscription handled outside netservice + std::set mGroupSubscribedTo; const uint32_t mSYNC_PERIOD; }; diff --git a/libretroshare/src/tests/gxs/nxstestscenario.cc b/libretroshare/src/tests/gxs/nxstestscenario.cc index 61c0cac72..e5806a8a9 100644 --- a/libretroshare/src/tests/gxs/nxstestscenario.cc +++ b/libretroshare/src/tests/gxs/nxstestscenario.cc @@ -8,6 +8,7 @@ #include "nxstestscenario.h" #include "gxs/rsdataservice.h" #include "data_support.h" +#include NxsMessageTest::NxsMessageTest(uint16_t servtype) : mServType(servtype), mMsgTestMtx("mMsgTestMtx") @@ -42,7 +43,7 @@ RsGeneralDataService* NxsMessageTest::getDataService(const std::string& peer) { if(mPeerStoreMap.find(peer) != mPeerStoreMap.end()) return NULL; - RsDataService* dStore = new RsDataService("./", peer.c_str(), mServType); + RsDataService* dStore = new RsDataService("./", peer, mServType); mStoreNames.insert(peer); mPeerStoreMap.insert(std::make_pair(peer, dStore)); populateStore(dStore); @@ -134,7 +135,6 @@ void NxsMessageTest::populateStore(RsGeneralDataService* dStore) void NxsMessageTest::cleanUp() { - std::map::iterator mit = mPeerStoreMap.begin(); for(; mit != mPeerStoreMap.end(); mit++) @@ -177,5 +177,18 @@ void NxsMessageTestObserver::notifyNewGroups(std::vector &groups) void NxsMessageTestObserver::notifyNewMessages(std::vector &messages) { + std::vector::iterator vit = messages.begin(); + std::map msgs; + + for(; vit != messages.end(); vit++) + { + RsNxsMsg* msg = *vit; + RsGxsMsgMetaData* meta = new RsGxsMsgMetaData(); + meta->mGroupId = msg->grpId; + meta->mMsgId = msg->msgId; + msgs.insert(std::make_pair(msg, meta)); + } + + mStore->storeMessage(msgs); } diff --git a/libretroshare/src/tests/gxs/rsgxsnetservice_test.cc b/libretroshare/src/tests/gxs/rsgxsnetservice_test.cc index d187ad144..ddcc2f6c0 100644 --- a/libretroshare/src/tests/gxs/rsgxsnetservice_test.cc +++ b/libretroshare/src/tests/gxs/rsgxsnetservice_test.cc @@ -25,7 +25,7 @@ int main() // now get things started createThread(hub); - double timeDelta = 30; + double timeDelta = 50; // put this thread to sleep for 10 secs // make thread sleep for a bit diff --git a/libretroshare/src/util/contentvalue.cc b/libretroshare/src/util/contentvalue.cc index 58f237cf7..cce492df3 100644 --- a/libretroshare/src/util/contentvalue.cc +++ b/libretroshare/src/util/contentvalue.cc @@ -112,6 +112,7 @@ ContentValue::ContentValue(ContentValue &from){ default: std::cerr << "ContentValue::ContentValue(ContentValue &from):" << "Error! Unrecognised data type!" << std::endl; + break; } } }