From 52a911329e6036cf484c607ebb883e57b97dfe80 Mon Sep 17 00:00:00 2001 From: chrisparker126 Date: Sun, 15 Jul 2012 12:38:20 +0000 Subject: [PATCH] group synchronisation now working (message syn not up yet, will do later, simple extension) updated hub test git-svn-id: http://svn.code.sf.net/p/retroshare/code/branches/v0.5-new_cache_system@5300 b45a01b8-16f6-495d-af2f-9b41ad6348cc --- libretroshare/src/gxs/rsdataservice.cc | 8 +- libretroshare/src/gxs/rsdataservice.h | 2 +- libretroshare/src/gxs/rsgxsnetservice.cc | 114 ++++++++++++------ libretroshare/src/gxs/rsgxsnetservice.h | 16 +-- libretroshare/src/tests/gxs/nxstesthub.cc | 12 +- .../src/tests/gxs/nxstestscenario.cc | 16 ++- libretroshare/src/tests/gxs/nxstestscenario.h | 5 +- .../src/tests/gxs/rsgxsnetservice_test.cc | 2 +- 8 files changed, 112 insertions(+), 63 deletions(-) diff --git a/libretroshare/src/gxs/rsdataservice.cc b/libretroshare/src/gxs/rsdataservice.cc index 201bce020..a14c4306d 100644 --- a/libretroshare/src/gxs/rsdataservice.cc +++ b/libretroshare/src/gxs/rsdataservice.cc @@ -577,11 +577,13 @@ int RsDataService::retrieveNxsGrps(std::map &grp, bool { std::vector grps; retrieveGroups(c, grps); - std::vector::iterator vit = grps.begin(); - for(; vit != grps.end(); vit++) + if(!grps.empty()) { - grp[(*vit)->grpId] = *vit; + RsNxsGrp* ng = grps.front(); + grp[ng->grpId] = ng; + }else{ + grp.erase(grpId); } delete c; diff --git a/libretroshare/src/gxs/rsdataservice.h b/libretroshare/src/gxs/rsdataservice.h index a6ab1f1d3..59e1e1bea 100644 --- a/libretroshare/src/gxs/rsdataservice.h +++ b/libretroshare/src/gxs/rsdataservice.h @@ -23,7 +23,7 @@ public: /*! * Retrieves groups, if empty, retrieves all grps, if map is not empty - * only retrieve entries + * only retrieve entries, if entry cannot be found, it is removed from map * @param grp retrieved groups * @param cache whether to store retrieval in mem for faster later retrieval * @return error code diff --git a/libretroshare/src/gxs/rsgxsnetservice.cc b/libretroshare/src/gxs/rsgxsnetservice.cc index 834fd92f2..2486c4a12 100644 --- a/libretroshare/src/gxs/rsgxsnetservice.cc +++ b/libretroshare/src/gxs/rsgxsnetservice.cc @@ -2,18 +2,19 @@ #define NXS_NET_DEBUG -#define SYNC_PERIOD 1 // in microseconds every 10 seconds (1 second for testing) +#define SYNC_PERIOD 12 // in microseconds every 10 seconds (1 second for testing) #define TRANSAC_TIMEOUT 5 // 5 seconds RsGxsNetService::RsGxsNetService(uint16_t servType, RsGeneralDataService *gds, RsNxsNetMgr *netMgr, RsNxsObserver *nxsObs) : p3Config(servType), p3ThreadedService(servType), - mTransactionTimeOut(TRANSAC_TIMEOUT), mServType(servType), mDataStore(gds), + mTransactionTimeOut(TRANSAC_TIMEOUT), mServType(servType), mDataStore(gds), mTransactionN(0), mObserver(nxsObs), mNxsMutex("RsGxsNetService"), mNetMgr(netMgr), mSYNC_PERIOD(SYNC_PERIOD) { addSerialType(new RsNxsSerialiser(mServType)); + mOwnId = mNetMgr->getOwnId(); } RsGxsNetService::~RsGxsNetService() @@ -190,13 +191,19 @@ bool RsGxsNetService::locked_processTransac(RsNxsTransac* item) * For ending a transaction the */ - const std::string& peer = item->PeerId(); + std::string peer; + + // for outgoing transaction use own id + if(item->transactFlag & (RsNxsTransac::FLAG_BEGIN_P2 | RsNxsTransac::FLAG_END_SUCCESS)) + peer = mOwnId; + else + peer = item->PeerId(); + uint32_t transN = item->transactionNumber; item->timestamp = time(NULL); // register time received NxsTransaction* tr = NULL; #ifdef NXS_NET_DEBUG - std::cerr << "locked_processTransac() " << std::endl; std::cerr << "locked_processTransac(), Received transaction item: " << transN << std::endl; std::cerr << "locked_processTransac(), With peer: " << item->PeerId() << std::endl; @@ -225,13 +232,13 @@ bool RsGxsNetService::locked_processTransac(RsNxsTransac* item) TransactionIdMap& transMap = mTransactions[peer]; if(transExists) - return false; + return false; // should not happen! // create new transaction tr = new NxsTransaction(); transMap[transN] = tr; tr->mTransaction = item; - tr->mTimeOut = item->timestamp; + tr->mTimeOut = item->timestamp + mTransactionTimeOut; // note state as receiving, commencement item // is sent on next run() loop @@ -240,7 +247,7 @@ bool RsGxsNetService::locked_processTransac(RsNxsTransac* item) // commencement item for outgoing transaction }else if(item->transactFlag & RsNxsTransac::FLAG_BEGIN_P2){ - // transaction does not exist + // transaction must exist if(!peerTrExists || !transExists) return false; @@ -291,13 +298,14 @@ void RsGxsNetService::run(){ } } -bool RsGxsNetService::checkTransacTimedOut(NxsTransaction* tr) +bool RsGxsNetService::locked_checkTransacTimedOut(NxsTransaction* tr) { return tr->mTimeOut < ((uint32_t) time(NULL)); } void RsGxsNetService::processTransactions(){ + RsStackMutex stack(mNxsMutex); TransactionsPeerMap::iterator mit = mTransactions.begin(); @@ -324,7 +332,7 @@ void RsGxsNetService::processTransactions(){ uint32_t transN = tr->mTransaction->transactionNumber; // first check transaction has not expired - if(checkTransacTimedOut(tr)) + if(locked_checkTransacTimedOut(tr)) { #ifdef NXS_NET_DEBUG @@ -405,7 +413,7 @@ void RsGxsNetService::processTransactions(){ uint32_t transN = tr->mTransaction->transactionNumber; // first check transaction has not expired - if(checkTransacTimedOut(tr)) + if(locked_checkTransacTimedOut(tr)) { #ifdef NXS_NET_DEBUG @@ -455,6 +463,7 @@ void RsGxsNetService::processTransactions(){ trans->transactionNumber = transN; trans->PeerId(tr->mTransaction->PeerId()); sendItem(trans); + tr->mFlag = NxsTransaction::FLAG_STATE_RECEIVING; } else{ @@ -481,6 +490,7 @@ void RsGxsNetService::processTransactions(){ void RsGxsNetService::processCompletedTransactions() { + RsStackMutex stack(mNxsMutex); /*! * Depending on transaction we may have to respond to peer * responsible for transaction @@ -495,9 +505,9 @@ void RsGxsNetService::processCompletedTransactions() bool outgoing = tr->mTransaction->PeerId() == mOwnId; if(outgoing){ - processCompletedOutgoingTrans(tr); + locked_processCompletedOutgoingTrans(tr); }else{ - + locked_processCompletedIncomingTrans(tr); } @@ -506,7 +516,7 @@ void RsGxsNetService::processCompletedTransactions() } } -void RsGxsNetService::processCompletedIncomingTrans(NxsTransaction* tr) +void RsGxsNetService::locked_processCompletedIncomingTrans(NxsTransaction* tr) { uint16_t flag = tr->mTransaction->transactFlag; @@ -517,20 +527,20 @@ void RsGxsNetService::processCompletedIncomingTrans(NxsTransaction* tr) if(flag & RsNxsTransac::FLAG_TYPE_MSG_LIST_RESP) { // generate request based on a peers response - genReqMsgTransaction(tr); + locked_genReqMsgTransaction(tr); }else if(flag & RsNxsTransac::FLAG_TYPE_GRP_LIST_RESP) { - genReqGrpTransaction(tr); + locked_genReqGrpTransaction(tr); } // you've finished receiving request information now gen else if(flag & RsNxsTransac::FLAG_TYPE_MSG_LIST_REQ) { - genSendMsgsTransaction(tr); + locked_genSendMsgsTransaction(tr); } else if(flag & RsNxsTransac::FLAG_TYPE_GRP_LIST_REQ) { - genSendGrpsTransaction(tr); + locked_genSendGrpsTransaction(tr); } else if(flag & RsNxsTransac::FLAG_TYPE_GRPS) { @@ -593,7 +603,7 @@ void RsGxsNetService::processCompletedIncomingTrans(NxsTransaction* tr) return; } -void RsGxsNetService::processCompletedOutgoingTrans(NxsTransaction* tr) +void RsGxsNetService::locked_processCompletedOutgoingTrans(NxsTransaction* tr) { uint16_t flag = tr->mTransaction->transactFlag; @@ -658,7 +668,7 @@ void RsGxsNetService::processCompletedOutgoingTrans(NxsTransaction* tr) } -void RsGxsNetService::genReqMsgTransaction(NxsTransaction* tr) +void RsGxsNetService::locked_genReqMsgTransaction(NxsTransaction* tr) { // to create a transaction you need to know who you are transacting with @@ -705,7 +715,7 @@ void RsGxsNetService::genReqMsgTransaction(NxsTransaction* tr) msgIdSet.insert((*vit)->mMsgId); // get unique id for this transaction - uint32_t transN = getTransactionId(); + uint32_t transN = locked_getTransactionId(); // add msgs that you don't have to request list @@ -733,11 +743,12 @@ void RsGxsNetService::genReqMsgTransaction(NxsTransaction* tr) transac->timestamp = 0; transac->nItems = reqList.size(); transac->PeerId(tr->mTransaction->PeerId()); + transac->transactionNumber = transN; NxsTransaction* newTrans = new NxsTransaction(); newTrans->mItems = reqList; newTrans->mFlag = NxsTransaction::FLAG_STATE_WAITING_CONFIRM; - newTrans->mTimeOut = 0; + newTrans->mTimeOut = time(NULL) + mTransactionTimeOut; // create transaction copy with your id to indicate // its an outgoing transaction @@ -747,13 +758,12 @@ void RsGxsNetService::genReqMsgTransaction(NxsTransaction* tr) sendItem(transac); { - RsStackMutex stack(mNxsMutex); if(!locked_addTransaction(newTrans)) delete newTrans; } } -void RsGxsNetService::genReqGrpTransaction(NxsTransaction* tr) +void RsGxsNetService::locked_genReqGrpTransaction(NxsTransaction* tr) { // to create a transaction you need to know who you are transacting with @@ -788,7 +798,7 @@ void RsGxsNetService::genReqGrpTransaction(NxsTransaction* tr) std::list::iterator llit = grpItemL.begin(); std::list reqList; - uint32_t transN = getTransactionId(); + uint32_t transN = locked_getTransactionId(); for(; llit != grpItemL.end(); llit++) { @@ -811,26 +821,31 @@ void RsGxsNetService::genReqGrpTransaction(NxsTransaction* tr) transac->timestamp = 0; transac->nItems = reqList.size(); transac->PeerId(tr->mTransaction->PeerId()); + transac->transactionNumber = transN; NxsTransaction* newTrans = new NxsTransaction(); newTrans->mItems = reqList; newTrans->mFlag = NxsTransaction::FLAG_STATE_WAITING_CONFIRM; - newTrans->mTimeOut = 0; + newTrans->mTimeOut = time(NULL) + mTransactionTimeOut; newTrans->mTransaction = new RsNxsTransac(*transac); newTrans->mTransaction->PeerId(mOwnId); sendItem(transac); - { - RsStackMutex stack(mNxsMutex); - if(!locked_addTransaction(newTrans)) - delete newTrans; - } + if(!locked_addTransaction(newTrans)) + delete newTrans; + } -void RsGxsNetService::genSendGrpsTransaction(NxsTransaction* tr) +void RsGxsNetService::locked_genSendGrpsTransaction(NxsTransaction* tr) { +#ifdef NXS_NET_DEBUG + std::cerr << "locked_genSendGrpsTransaction()" << std::endl; + std::cerr << "Generating Grp data send fron TransN: " << tr->mTransaction->transactionNumber + << std::endl; +#endif + // go groups requested in transaction tr std::list::iterator lit = tr->mItems.begin(); @@ -845,41 +860,54 @@ void RsGxsNetService::genSendGrpsTransaction(NxsTransaction* tr) mDataStore->retrieveNxsGrps(grps, false); + NxsTransaction* newTr = new NxsTransaction(); newTr->mFlag = NxsTransaction::FLAG_STATE_WAITING_CONFIRM; + uint32_t transN = locked_getTransactionId(); + // store grp items to send in transaction std::map::iterator mit = grps.begin(); + std::string peerId = tr->mTransaction->PeerId(); for(;mit != grps.end(); mit++) { + mit->second->PeerId(peerId); // set so it gets send to right peer + mit->second->transactionNumber = transN; newTr->mItems.push_back(mit->second); } + if(newTr->mItems.empty()){ + delete newTr; + return; + } + + RsNxsTransac* ntr = new RsNxsTransac(mServType); - ntr->transactionNumber = getTransactionId(); + ntr->transactionNumber = transN; ntr->transactFlag = RsNxsTransac::FLAG_BEGIN_P1 | RsNxsTransac::FLAG_TYPE_GRPS; ntr->nItems = grps.size(); newTr->mTransaction = new RsNxsTransac(*ntr); newTr->mTransaction->PeerId(mOwnId); + newTr->mTimeOut = time(NULL) + mTransactionTimeOut; ntr->PeerId(tr->mTransaction->PeerId()); sendItem(ntr); + locked_addTransaction(newTr); + return; } -void RsGxsNetService::genSendMsgsTransaction(NxsTransaction* tr) +void RsGxsNetService::locked_genSendMsgsTransaction(NxsTransaction* tr) { return; } -uint32_t RsGxsNetService::getTransactionId() +uint32_t RsGxsNetService::locked_getTransactionId() { - RsStackMutex stack(mNxsMutex); - - return mTransactionN++; + return ++mTransactionN; } bool RsGxsNetService::locked_addTransaction(NxsTransaction* tr) { @@ -918,6 +946,8 @@ void RsGxsNetService::cleanTransactionItems(NxsTransaction* tr) const void RsGxsNetService::handleRecvSyncGroup(RsNxsSyncGrp* item) { + RsStackMutex stack(mNxsMutex); + std::string peer = item->PeerId(); std::map grp; @@ -933,6 +963,8 @@ void RsGxsNetService::handleRecvSyncGroup(RsNxsSyncGrp* item) NxsTransaction* tr = new NxsTransaction(); std::list& itemL = tr->mItems; + uint32_t transN = locked_getTransactionId(); + for(; mit != grp.end(); mit++) { RsNxsSyncGrpItem* gItem = new @@ -941,6 +973,7 @@ void RsGxsNetService::handleRecvSyncGroup(RsNxsSyncGrp* item) gItem->grpId = mit->first; gItem->publishTs = mit->second->mPublishTs; gItem->PeerId(peer); + gItem->transactionNumber = transN; itemL.push_back(gItem); } @@ -950,18 +983,18 @@ void RsGxsNetService::handleRecvSyncGroup(RsNxsSyncGrp* item) | RsNxsTransac::FLAG_TYPE_GRP_LIST_RESP; trItem->nItems = itemL.size(); - trItem->timestamp = time(NULL); + trItem->timestamp = 0; trItem->PeerId(peer); - trItem->transactionNumber = getTransactionId(); + trItem->transactionNumber = transN; // also make a copy for the resident transaction tr->mTransaction = new RsNxsTransac(*trItem); tr->mTransaction->PeerId(mOwnId); + tr->mTimeOut = time(NULL) + mTransactionTimeOut; // signal peer to prepare for transaction sendItem(trItem); - RsStackMutex stack(mNxsMutex); locked_addTransaction(tr); return; @@ -969,6 +1002,7 @@ void RsGxsNetService::handleRecvSyncGroup(RsNxsSyncGrp* item) void RsGxsNetService::handleRecvSyncMessage(RsNxsSyncMsg* item) { + RsStackMutex stack(mNxsMutex); return; } diff --git a/libretroshare/src/gxs/rsgxsnetservice.h b/libretroshare/src/gxs/rsgxsnetservice.h index 597cec882..6d12d5119 100644 --- a/libretroshare/src/gxs/rsgxsnetservice.h +++ b/libretroshare/src/gxs/rsgxsnetservice.h @@ -212,13 +212,13 @@ private: * Process transaction owned/started by user * @param tr transaction to process, ownership stays with callee */ - void processCompletedOutgoingTrans(NxsTransaction* tr); + void locked_processCompletedOutgoingTrans(NxsTransaction* tr); /*! * Process transactions started/owned by other peers * @param tr transaction to process, ownership stays with callee */ - void processCompletedIncomingTrans(NxsTransaction* tr); + void locked_processCompletedIncomingTrans(NxsTransaction* tr); /*! @@ -240,7 +240,7 @@ private: * This retrieves a unique transaction id that * can be used in an outgoing transaction */ - uint32_t getTransactionId(); + uint32_t locked_getTransactionId(); /*! * This attempts to push the transaction id counter back if you have @@ -265,28 +265,28 @@ private: * of msgs received from peer stored in passed transaction * @param tr transaction responsible for generating msg request */ - void genReqMsgTransaction(NxsTransaction* tr); + void locked_genReqMsgTransaction(NxsTransaction* tr); /*! * Generates new transaction to send grp requests based on list * of grps received from peer stored in passed transaction * @param tr transaction responsible for generating grp request */ - void genReqGrpTransaction(NxsTransaction* tr); + void locked_genReqGrpTransaction(NxsTransaction* tr); /*! * Generates new transaction to send msg data based on list * of grpids received from peer stored in passed transaction * @param tr transaction responsible for generating grp request */ - void genSendMsgsTransaction(NxsTransaction* tr); + void locked_genSendMsgsTransaction(NxsTransaction* tr); /*! * Generates new transaction to send grp data based on list * of grps received from peer stored in passed transaction * @param tr transaction responsible for generating grp request */ - void genSendGrpsTransaction(NxsTransaction* tr); + void locked_genSendGrpsTransaction(NxsTransaction* tr); /*! * convenience function to add a transaction to list @@ -300,7 +300,7 @@ private: * @param tr the transaction to check for timeout * @return false if transaction has timed out, true otherwise */ - bool checkTransacTimedOut(NxsTransaction* tr); + bool locked_checkTransacTimedOut(NxsTransaction* tr); /** E: Transaction processing **/ diff --git a/libretroshare/src/tests/gxs/nxstesthub.cc b/libretroshare/src/tests/gxs/nxstesthub.cc index a9b9b542c..d6ca55f01 100644 --- a/libretroshare/src/tests/gxs/nxstesthub.cc +++ b/libretroshare/src/tests/gxs/nxstesthub.cc @@ -3,8 +3,10 @@ NxsTestHub::NxsTestHub(NxsTestScenario* nts) : mTestScenario(nts) { - netServicePairs.first = new RsGxsNetService(0, mTestScenario->dummyDataService1(), &netMgr1, mTestScenario); - netServicePairs.second = new RsGxsNetService(0, mTestScenario->dummyDataService2(), &netMgr2, mTestScenario); + netServicePairs.first = new RsGxsNetService(mTestScenario->getServiceType(), + mTestScenario->dummyDataService1(), &netMgr1, mTestScenario); + netServicePairs.second = new RsGxsNetService(mTestScenario->getServiceType(), + mTestScenario->dummyDataService2(), &netMgr2, mTestScenario); mServicePairs.first = netServicePairs.first; mServicePairs.second = netServicePairs.second; @@ -28,7 +30,7 @@ void NxsTestHub::run() while(isRunning()){ // make thread sleep for a couple secs - usleep(300); + usleep(3000); p3Service* s1 = mServicePairs.first; p3Service* s2 = mServicePairs.second; @@ -36,11 +38,13 @@ void NxsTestHub::run() RsItem* item = NULL; while((item = s1->send()) != NULL) { + item->PeerId("PeerB"); send_queue_s1.push_back(item); } while((item = s2->send()) != NULL) { + item->PeerId("PeerA"); send_queue_s2.push_back(item); } @@ -56,7 +60,7 @@ void NxsTestHub::run() send_queue_s2.pop_front(); } - // tick services so nxs net services processe items + // tick services so nxs net services process items s1->tick(); s2->tick(); } diff --git a/libretroshare/src/tests/gxs/nxstestscenario.cc b/libretroshare/src/tests/gxs/nxstestscenario.cc index 66b0359b2..901ed555a 100644 --- a/libretroshare/src/tests/gxs/nxstestscenario.cc +++ b/libretroshare/src/tests/gxs/nxstestscenario.cc @@ -9,10 +9,11 @@ #include "gxs/rsdataservice.h" #include "data_support.h" -NxsMessageTest::NxsMessageTest() +NxsMessageTest::NxsMessageTest(uint16_t servtype) +: mServType(servtype) { - mStorePair.first = new RsDataService(".", "dStore1", 0); - mStorePair.second = new RsDataService(".", "dStore2", 0); + mStorePair.first = new RsDataService(".", "dStore1", mServType); + mStorePair.second = new RsDataService(".", "dStore2", mServType); setUpDataBases(); } @@ -40,6 +41,11 @@ void NxsMessageTest::setUpDataBases() return; } +uint16_t NxsMessageTest::getServiceType() +{ + return mServType; +} + void NxsMessageTest::populateStore(RsGeneralDataService* dStore) { @@ -51,7 +57,7 @@ void NxsMessageTest::populateStore(RsGeneralDataService* dStore) for(int i = 0; i < nGrp; i++) { std::pair p; - grp = new RsNxsGrp(RS_SERVICE_TYPE_PLUGIN_SIMPLE_FORUM); + grp = new RsNxsGrp(mServType); grpMeta = new RsGxsGrpMetaData(); p.first = grp; p.second = grpMeta; @@ -83,7 +89,7 @@ void NxsMessageTest::populateStore(RsGeneralDataService* dStore) for(int i=0; i mStorePair; std::map > mPeerMsgs; std::map > mPeerGrps; + uint16_t mServType; }; diff --git a/libretroshare/src/tests/gxs/rsgxsnetservice_test.cc b/libretroshare/src/tests/gxs/rsgxsnetservice_test.cc index c0ff8157b..68ccbe10a 100644 --- a/libretroshare/src/tests/gxs/rsgxsnetservice_test.cc +++ b/libretroshare/src/tests/gxs/rsgxsnetservice_test.cc @@ -16,7 +16,7 @@ int main() { // first setup - NxsMessageTest msgTest; + NxsMessageTest msgTest(RS_SERVICE_TYPE_PLUGIN_SIMPLE_FORUM); NxsTestHub hub(&msgTest); // now get things started