diff --git a/libretroshare/src/gxs/rsdataservice.h b/libretroshare/src/gxs/rsdataservice.h index e19167a43..3fe8b831e 100644 --- a/libretroshare/src/gxs/rsdataservice.h +++ b/libretroshare/src/gxs/rsdataservice.h @@ -1,7 +1,7 @@ #ifndef RSDATASERVICE_H #define RSDATASERVICE_H -#include "rsgds.h" +#include "gxs/rsgds.h" #include "util/retrodb.h" diff --git a/libretroshare/src/gxs/rsgxsnetservice.cc b/libretroshare/src/gxs/rsgxsnetservice.cc index 97c003822..706f533b8 100644 --- a/libretroshare/src/gxs/rsgxsnetservice.cc +++ b/libretroshare/src/gxs/rsgxsnetservice.cc @@ -1,22 +1,73 @@ #include "rsgxsnetservice.h" +#define SYNC_PERIOD 1000 // every 10 seconds +#define TRANSAC_TIMEOUT 10 // 10 seconds + RsGxsNetService::RsGxsNetService(uint16_t servType, RsGeneralDataService *gds, RsNxsNetMgr *netMgr, RsNxsObserver *nxsObs) : p3Config(servType), p3ThreadedService(servType), mServType(servType), mDataStore(gds), - mObserver(nxsObs), mNxsMutex("RsGxsNetService"), mNetMgr(netMgr) + mObserver(nxsObs), mNxsMutex("RsGxsNetService"), mNetMgr(netMgr), mSYNC_PERIOD(SYNC_PERIOD) { + addSerialType(new RsNxsSerialiser(mServType)); } +RsGxsNetService::~RsGxsNetService() +{ + +} int RsGxsNetService::tick(){ - + // always check for new items arriving + // from peers if(receivedItems()) recvNxsItemQueue(); + uint32_t now = time(NULL); + if((mSYNC_PERIOD + mSyncTs) < now) + { + syncWithPeers(); + } + + return 1; +} + +void RsGxsNetService::syncWithPeers() +{ + + std::set peers; + mNetMgr->getOnlineList(peers); + + std::set::iterator sit = peers.begin(); + + // for now just grps + for(; sit != peers.end(); sit++) + { + RsNxsSyncGrp *grp = new RsNxsSyncGrp(mServType); + grp->PeerId(*sit); + sendItem(grp); + } + + // TODO msgs + +} + +bool RsGxsNetService::loadList(std::list& load) +{ + return false; +} + +bool RsGxsNetService::saveList(bool& cleanup, std::list& save) +{ + return false; +} + +RsSerialiser *RsGxsNetService::setupSerialiser() +{ + return NULL; } void RsGxsNetService::recvNxsItemQueue(){ @@ -33,9 +84,10 @@ void RsGxsNetService::recvNxsItemQueue(){ RsNxsItem *ni = dynamic_cast(item) ; if(ni != NULL) { + + // a live transaction has a non zero value if(ni->transactionNumber != 0){ - // accumulate if(handleTransaction(ni)) continue ; } @@ -59,7 +111,7 @@ bool RsGxsNetService::handleTransaction(RsNxsItem* item){ /*! * This attempts to handle a transaction * It first checks if this transaction id already exists - * If it does then create check this not a initiating transactions + * If it does then check this not a initiating transactions */ RsStackMutex stack(mNxsMutex); @@ -68,14 +120,14 @@ bool RsGxsNetService::handleTransaction(RsNxsItem* item){ RsNxsTransac* transItem = dynamic_cast(item); - // if this is an RsNxsTransac item process + // if this is a RsNxsTransac item process if(transItem){ return locked_processTransac(transItem); } // then this must be transaction content to be consumed // first check peer exist for transaction - bool peerTransExists = mTransactions.find(peer) == mTransactions.end(); + bool peerTransExists = mTransactions.find(peer) != mTransactions.end(); // then check transaction exists @@ -104,6 +156,22 @@ bool RsGxsNetService::handleTransaction(RsNxsItem* item){ bool RsGxsNetService::locked_processTransac(RsNxsTransac* item) { + /*! + * To process the transaction item + * It can either be initiating a transaction + * or ending one that already exists + * + * For initiating an incoming transaction the peer + * and transaction item need not exists + * as the peer will be added and transaction number + * added thereafter + * + * For commencing/starting an outgoing transaction + * the transaction must exist already + * + * For ending a transaction the + */ + const std::string& peer = item->PeerId(); uint32_t transN = item->transactionNumber; NxsTransaction* tr = NULL; @@ -114,11 +182,12 @@ bool RsGxsNetService::locked_processTransac(RsNxsTransac* item) if(peerTrExists){ TransactionIdMap& transMap = mTransactions[peer]; - // remove current transaction if it does exist + // record whether transaction exists already transExists = transMap.find(transN) != transMap.end(); } + // initiating an incoming transaction if(item->transactFlag & RsNxsTransac::FLAG_BEGIN_P1){ // create a transaction if the peer does not exist @@ -137,9 +206,11 @@ bool RsGxsNetService::locked_processTransac(RsNxsTransac* item) tr->mTransaction = item; tr->mTimestamp = time(NULL); - // note state as receiving + // note state as receiving, commencement item + // is sent on next run() loop tr->mFlag = NxsTransaction::FLAG_STATE_STARTING; + // commencement item for outgoing transaction }else if(item->transactFlag & RsNxsTransac::FLAG_BEGIN_P2){ // transaction does not exist @@ -147,11 +218,13 @@ bool RsGxsNetService::locked_processTransac(RsNxsTransac* item) return false; - // this means you need to start a transaction + // alter state so transaction content is sent on + // next run() loop TransactionIdMap& transMap = mTransactions[mOwnId]; NxsTransaction* tr = transMap[transN]; tr->mFlag = NxsTransaction::FLAG_STATE_SENDING; + // end transac item for outgoing transaction }else if(item->transactFlag & RsNxsTransac::FLAG_END_SUCCESS){ // transaction does not exist @@ -159,14 +232,14 @@ bool RsGxsNetService::locked_processTransac(RsNxsTransac* item) return false; } - // this means you need to start a transaction + // alter state so that transaction is removed + // on next run() loop TransactionIdMap& transMap = mTransactions[mOwnId]; NxsTransaction* tr = transMap[transN]; tr->mFlag = NxsTransaction::FLAG_STATE_COMPLETED; - } - return false; + return true; } void RsGxsNetService::run(){ @@ -182,11 +255,12 @@ void RsGxsNetService::run(){ Sleep((int) (timeDelta * 1000)); #endif + // process active transactions processTransactions(); + // process completed transactions processCompletedTransactions(); - processSyncRequests(); } } @@ -203,8 +277,12 @@ void RsGxsNetService::processTransactions(){ mmit_end = transMap.end(); + /*! + * Transactions owned by peer + */ if(mit->first == mOwnId){ + // transaction to be removed std::list toRemove; for(; mmit != mmit_end; mmit++){ @@ -267,6 +345,10 @@ void RsGxsNetService::processTransactions(){ uint32_t transN = tr->mTransaction->transactionNumber; if(flag & NxsTransaction::FLAG_STATE_RECEIVING){ + + // if the number it item received equal that indicated + // then transaction is marked as completed + // to be moved to complete transations // check if done if(tr->mItems.size() == tr->mTransaction->nItems) tr->mFlag = NxsTransaction::FLAG_STATE_COMPLETED; @@ -300,7 +382,7 @@ void RsGxsNetService::processTransactions(){ } else{ - // if no state + // unrecognised state std::cerr << "RsGxsNetService::processTransactions() Unrecognised statem, deleting " << std::endl; std::cerr << "RsGxsNetService::processTransactions() Id: " << transN << std::endl; @@ -334,6 +416,8 @@ void RsGxsNetService::processCompletedTransactions() uint16_t flag = tr->mTransaction->transactFlag; + // for a completed list response transaction + // one needs generate requests from this if(flag & RsNxsTransac::FLAG_TYPE_MSG_LIST_RESP) { // generate request based on a peers response @@ -346,20 +430,23 @@ void RsGxsNetService::processCompletedTransactions() else if( (flag & RsNxsTransac::FLAG_TYPE_MSG_LIST_REQ) || (flag & RsNxsTransac::FLAG_TYPE_GRP_LIST_REQ) ) { - // don't do anything + // don't do anything, this should simply be removed }else if(flag & RsNxsTransac::FLAG_TYPE_GRPS) { std::list::iterator lit = tr->mItems.begin(); - std::list grps; + std::vector grps; while(tr->mItems.size() != 0) { RsNxsGrp* grp = dynamic_cast(tr->mItems.front()); if(grp) + { tr->mItems.pop_front(); + grps.push_back(grp); + } else { #ifdef NXS_NET_DEBUG @@ -371,13 +458,13 @@ void RsGxsNetService::processCompletedTransactions() } // notify listener of grps - notifyListenerGrps(grps); + mObserver->notifyNewGroups(grps); }else if(flag & RsNxsTransac::FLAG_TYPE_MSGS) { std::list::iterator lit = tr->mItems.begin(); - std::list msgs; + std::vector msgs; while(tr->mItems.size() > 0) { @@ -385,7 +472,9 @@ void RsGxsNetService::processCompletedTransactions() if(msg) { tr->mItems.pop_front(); - }else + msgs.push_back(msg); + } + else { #ifdef NXS_NET_DEBUG std::cerr << "RsGxsNetService::processCompletedTransactions(): item did not caste to msg" @@ -395,15 +484,16 @@ void RsGxsNetService::processCompletedTransactions() } // notify listener of msgs - notifyListenerMsgs(msgs); - } + mObserver->notifyNewMessages(msgs); + } delete tr; mComplTransactions.pop_front(); } } + void RsGxsNetService::genReqMsgTransaction(NxsTransaction* tr) { @@ -564,6 +654,12 @@ void RsGxsNetService::genReqGrpTransaction(NxsTransaction* tr) } } +uint32_t RsGxsNetService::getTransactionId() +{ + RsStackMutex stack(mNxsMutex); + + return mTransactionN++; +} bool RsGxsNetService::locked_addTransaction(NxsTransaction* tr) { const std::string& peer = tr->mTransaction->PeerId(); @@ -583,7 +679,6 @@ bool RsGxsNetService::locked_addTransaction(NxsTransaction* tr) void RsGxsNetService::cleanTransactionItems(NxsTransaction* tr) const { - std::list::iterator lit = tr->mItems.begin(); for(; lit != tr->mItems.end(); lit++) @@ -594,6 +689,65 @@ void RsGxsNetService::cleanTransactionItems(NxsTransaction* tr) const tr->mItems.clear(); } +void RsGxsNetService::handleRecvSyncGroup(RsNxsSyncGrp* item) +{ + + std::string peer = item->PeerId(); + delete item; + + std::map grp; + mDataStore->retrieveGxsGrpMetaData(grp); + + if(grp.empty()) + return; + + std::vector grpSyncItems; + std::map::iterator mit = + grp.begin(); + + NxsTransaction* tr = new NxsTransaction(); + std::list& itemL = tr->mItems; + + for(; mit != grp.end(); mit++) + { + RsNxsSyncGrpItem* gItem = new + RsNxsSyncGrpItem(mServType); + gItem->flag = RsNxsSyncGrpItem::FLAG_RESPONSE; + gItem->grpId = mit->first; + gItem->publishTs = mit->second->mPublishTs; + gItem->PeerId(peer); + itemL.push_back(gItem); + } + + tr->mFlag = NxsTransaction::FLAG_STATE_WAITING_CONFIRM; + RsNxsTransac* trItem = new RsNxsTransac(mServType); + trItem->transactFlag = RsNxsTransac::FLAG_BEGIN_P1 + | RsNxsTransac::FLAG_TYPE_GRP_LIST_RESP; + trItem->nItems = itemL.size(); + time_t now; + gmtime(&now); + trItem->timeout = now + TRANSAC_TIMEOUT; + trItem->PeerId(peer); + trItem->transactionNumber = getTransactionId(); + + // also make a copy for the resident transaction + tr->mTransaction = new RsNxsTransac(*trItem); + + // signal peer to prepare for transaction + sendItem(trItem); + + RsStackMutex stack(mNxsMutex); + locked_addTransaction(tr); + + return; +} + +void RsGxsNetService::handleRecvSyncMessage(RsNxsSyncMsg* item) +{ + return; +} + + /** inherited methods **/ void RsGxsNetService::pauseSynchronisation(bool enabled) @@ -608,6 +762,14 @@ void RsGxsNetService::setSyncAge(uint32_t age) /** NxsTransaction definition **/ +const uint8_t NxsTransaction::FLAG_STATE_STARTING = 0x0001; // when +const uint8_t NxsTransaction::FLAG_STATE_RECEIVING = 0x0002; // begin receiving items for incoming trans +const uint8_t NxsTransaction::FLAG_STATE_SENDING = 0x0004; // begin sending items for outgoing trans +const uint8_t NxsTransaction::FLAG_STATE_COMPLETED = 0x008; +const uint8_t NxsTransaction::FLAG_STATE_FAILED = 0x0010; +const uint8_t NxsTransaction::FLAG_STATE_WAITING_CONFIRM = 0x0020; + + NxsTransaction::NxsTransaction() : mFlag(0), mTimestamp(0), mTransaction(NULL) { diff --git a/libretroshare/src/gxs/rsgxsnetservice.h b/libretroshare/src/gxs/rsgxsnetservice.h index 7b7ab22ff..f14438014 100644 --- a/libretroshare/src/gxs/rsgxsnetservice.h +++ b/libretroshare/src/gxs/rsgxsnetservice.h @@ -83,13 +83,24 @@ typedef std::map TransactionsPeerMap; /*! - * Resource use + * This class implements the RsNetWorkExchangeService + * using transactions to handle synchrnisation of Nxs items between + * peers in a network + * Transactions requires the maintenance of several states between peers, and whether + * + * Thus a data structure maintains: peers, and their active transactions + * Then for each transaction it needs to be noted if this is an outgoing or incoming transaction + * Outgoing transaction are in 3 different states: + * 1. START 2. INITIATED 3. SENDING 4. END + * Incoming transaction are also in 3 different states + * 1. START 2. RECEIVING 3. END */ class RsGxsNetService : public RsNetworkExchangeService, public p3ThreadedService, public p3Config { public: + /*! * only one observer is allowed * @param servType service type @@ -99,6 +110,8 @@ public: */ RsGxsNetService(uint16_t servType, RsGeneralDataService* gds, RsNxsNetMgr* netMgr, RsNxsObserver* nxsObs = NULL); + virtual ~RsGxsNetService(); + public: @@ -147,7 +160,13 @@ public: */ int requestGrp(const std::list& grpId, uint8_t hops){ return 0;} + /* p3Config methods */ +public: + + bool loadList(std::list& load); + bool saveList(bool &cleanup, std::list&); + RsSerialiser *setupSerialiser(); public: @@ -169,12 +188,6 @@ private: */ void recvNxsItemQueue(); - /*! - * Processes synchronisation requests. If request is valid this generates - * msg/grp response transaction with sending peer - */ - void processSyncRequests(); - /** S: Transaction processing **/ @@ -224,13 +237,13 @@ private: * The cb listener is the owner of the grps * @param grps */ - void notifyListenerGrps(std::list& grps); + //void notifyListenerGrps(std::list& grps); /*! * The cb listener is the owner of the msgs * @param msgs */ - void notifyListenerMsgs(std::list& msgs); + //void notifyListenerMsgs(std::list& msgs); /*! * @param tr transaction responsible for generating msg request @@ -263,7 +276,9 @@ private: /*! * Handles an nxs item for group synchronisation - * @param item contaims grp sync info + * by startin a transaction and sending a list + * of groups held by user + * @param item contains grp sync info */ void handleRecvSyncGroup(RsNxsSyncGrp* item); @@ -276,6 +291,8 @@ private: /** E: item handlers **/ + void syncWithPeers(); + private: /*** transactions ***/ @@ -286,7 +303,7 @@ private: /// completed transactions std::list mComplTransactions; - /// transaction id + /// transaction id counter uint32_t mTransactionN; /*** transactions ***/ @@ -308,6 +325,9 @@ private: /// for other members save transactions RsMutex mNxsMutex; + uint32_t mSyncTs; + const uint32_t mSYNC_PERIOD; + }; #endif // RSGXSNETSERVICE_H diff --git a/libretroshare/src/gxs/rsnxs.h b/libretroshare/src/gxs/rsnxs.h index 2950bc266..67a3f4279 100644 --- a/libretroshare/src/gxs/rsnxs.h +++ b/libretroshare/src/gxs/rsnxs.h @@ -60,7 +60,7 @@ class RsNetworkExchangeService { public: - RsNetworkExchangeService(); + RsNetworkExchangeService(){ return;} /*! * Use this to set how far back synchronisation of messages should take place @@ -107,7 +107,7 @@ public: * from peers * @param enabled set to false to disable pause, and true otherwise */ - virtual void pauseSynchronisation(bool enabled); + virtual void pauseSynchronisation(bool enabled) = 0; /*! diff --git a/libretroshare/src/gxs/rsnxsobserver.h b/libretroshare/src/gxs/rsnxsobserver.h index bb50081a4..67ff2f799 100644 --- a/libretroshare/src/gxs/rsnxsobserver.h +++ b/libretroshare/src/gxs/rsnxsobserver.h @@ -42,7 +42,7 @@ public: /*! * @param messages messages are deleted after function returns */ - virtual void notifyNewMessages(std::vector messages) = 0; + virtual void notifyNewMessages(std::vector& messages) = 0; /*! * @param messages messages are deleted after function returns diff --git a/libretroshare/src/serialiser/rsnxsitems.cc b/libretroshare/src/serialiser/rsnxsitems.cc index 06150bef5..43a01396e 100644 --- a/libretroshare/src/serialiser/rsnxsitems.cc +++ b/libretroshare/src/serialiser/rsnxsitems.cc @@ -9,9 +9,26 @@ const uint8_t RsNxsSyncMsgItem::FLAG_REQUEST = 0x001; const uint8_t RsNxsSyncMsgItem::FLAG_RESPONSE = 0x002; const uint8_t RsNxsSyncGrp::FLAG_USE_SYNC_HASH = 0x001; - const uint8_t RsNxsSyncMsg::FLAG_USE_SYNC_HASH = 0x001; +/** transaction state **/ +const uint16_t RsNxsTransac::FLAG_BEGIN_P1 = 0x0001; +const uint16_t RsNxsTransac::FLAG_BEGIN_P2 = 0x0002; +const uint16_t RsNxsTransac::FLAG_END_SUCCESS = 0x0004; +const uint16_t RsNxsTransac::FLAG_CANCEL = 0x0008; +const uint16_t RsNxsTransac::FLAG_END_FAIL_NUM = 0x0010; +const uint16_t RsNxsTransac::FLAG_END_FAIL_TIMEOUT = 0x0020; +const uint16_t RsNxsTransac::FLAG_END_FAIL_FULL = 0x0040; + + +/** transaction type **/ +const uint16_t RsNxsTransac::FLAG_TYPE_GRP_LIST_RESP = 0x0100; +const uint16_t RsNxsTransac::FLAG_TYPE_MSG_LIST_RESP = 0x0200; +const uint16_t RsNxsTransac::FLAG_TYPE_GRP_LIST_REQ = 0x0400; +const uint16_t RsNxsTransac::FLAG_TYPE_MSG_LIST_REQ = 0x0800; +const uint16_t RsNxsTransac::FLAG_TYPE_GRPS = 0x1000; +const uint16_t RsNxsTransac::FLAG_TYPE_MSGS = 0x2000; + uint32_t RsNxsSerialiser::size(RsItem *item) { diff --git a/libretroshare/src/serialiser/rsnxsitems.h b/libretroshare/src/serialiser/rsnxsitems.h index b4fec30d8..e1ebae61d 100644 --- a/libretroshare/src/serialiser/rsnxsitems.h +++ b/libretroshare/src/serialiser/rsnxsitems.h @@ -114,7 +114,7 @@ public: static const uint16_t FLAG_TRANS_MASK = 0xf; static const uint16_t FLAG_TYPE_MASK = 0xff; - /** transaction **/ + /** transaction state **/ static const uint16_t FLAG_BEGIN_P1; static const uint16_t FLAG_BEGIN_P2; static const uint16_t FLAG_END_SUCCESS; diff --git a/libretroshare/src/tests/gxs/nxstesthub.cc b/libretroshare/src/tests/gxs/nxstesthub.cc index 60076b1fd..3eb5fd985 100644 --- a/libretroshare/src/tests/gxs/nxstesthub.cc +++ b/libretroshare/src/tests/gxs/nxstesthub.cc @@ -1,5 +1,74 @@ #include "nxstesthub.h" -NxsTestHub::NxsTestHub() +NxsTestHub::NxsTestHub(NxsTestScenario* nts) : mTestScenario(nts) { + + netServicePairs.first = new RsGxsNetService(0, mTestScenario->dummyDataService1(), &netMgr1, mTestScenario); + netServicePairs.second = new RsGxsNetService(0, mTestScenario->dummyDataService2(), &netMgr2, mTestScenario); + + mServicePairs.first = netServicePairs.first; + mServicePairs.second = netServicePairs.second; + + createThread(*(netServicePairs.first)); + createThread(*(netServicePairs.second)); +} + +NxsTestHub::~NxsTestHub() +{ + delete netServicePairs.first; + delete netServicePairs.second; +} + + +void NxsTestHub::run() +{ + + std::list send_queue_s1, send_queue_s2; + + while(isRunning()){ + + // make thread sleep for a couple secs + usleep(300); + + p3Service* s1 = mServicePairs.first; + p3Service* s2 = mServicePairs.second; + + RsItem* item = NULL; + while((item = s1->send()) != NULL) + { + send_queue_s1.push_back(item); + } + + while((item = s2->send()) != NULL) + { + send_queue_s2.push_back(item); + } + + while(!send_queue_s1.empty()){ + item = send_queue_s1.front(); + s2->receive(dynamic_cast(item)); + send_queue_s1.pop_front(); + } + + while(!send_queue_s2.empty()){ + item = send_queue_s2.front(); + s1->receive(dynamic_cast(item)); + send_queue_s2.pop_front(); + } + + // tick services so nxs net services processe items + s1->tick(); + s2->tick(); + } + + // also shut down this net service peers if this goes down + netServicePairs.first->join(); + netServicePairs.second->join(); +} + +bool NxsTestHub::testsPassed() +{ + + + return false; } diff --git a/libretroshare/src/tests/gxs/nxstesthub.h b/libretroshare/src/tests/gxs/nxstesthub.h index 1e2bc7f72..24391589e 100644 --- a/libretroshare/src/tests/gxs/nxstesthub.h +++ b/libretroshare/src/tests/gxs/nxstesthub.h @@ -3,35 +3,83 @@ #include "util/rsthreads.h" #include "gxs/rsgxsnetservice.h" +#include "nxstestscenario.h" -/*! - * This scenario module allows you to model - * simply back and forth conversation between nxs and a virtual peer - * (this module being the virtual peer) - */ -class NxsScenario +// it would probably be useful if the test scenario +// provided the net dummy managers +// hence one could envision synchronising between an arbitrary number +// of peers + + +class NxsNetDummyMgr1 : public RsNxsNetMgr { - static int SCENARIO_OUTGOING; - static int SCENARIO_INCOMING; - public: - virtual int scenarioType() = 0; + NxsNetDummyMgr1() : mOwnId("peerA") { - virtual void receive(RsNxsItem* ) = 0; - virtual RsNxsItem* send() = 0; + mPeers.insert("peerB"); + } + + std::string getOwnId() { return mOwnId; } + void getOnlineList(std::set& ssl_peers) { ssl_peers = mPeers; } + +private: + + std::string mOwnId; + std::set mPeers; }; +class NxsNetDummyMgr2 : public RsNxsNetMgr +{ + +public: + + NxsNetDummyMgr2() : mOwnId("peerB") { + + mPeers.insert("peerA"); + + } + + std::string getOwnId() { return mOwnId; } + void getOnlineList(std::set& ssl_peers) { ssl_peers = mPeers; } + +private: + + std::string mOwnId; + std::set mPeers; +}; +/*! + * Testing of nxs services occurs through use of two services + * When a service sends this class can interrogate the send and the receives of + * + * NxsScenario stores the type of synchronisation to be tested + * Operation: + * First NxsTestHub needs to be instantiated with a test scenario + * * The scenario contains two databases to be used on the communicating pair of RsGxsNetService instances (net instances) + * The Test hub has a ticker service for the p3Services which allows the netservices to search what groups and messages they have + * and synchronise according to their subscriptions. The default is to subscribe to all groups held by other peer + * The threads for both net instances are started which begins their processing of transactions + */ class NxsTestHub : public RsThread { public: - NxsTestHub(NxsScenario* , std::pair servicePair); + /*! + * This construct the test hub + * for a give scenario in mind + */ + NxsTestHub(NxsTestScenario*); + + + /*! + * + */ + virtual ~NxsTestHub(); /*! * To be called only after this thread has @@ -39,7 +87,22 @@ public: */ bool testsPassed(); + /*! + * This simulates the p3Service ticker and calls both gxs net services tick methods + * Also enables transport of messages between both services + */ void run(); + + +private: + + std::pair mServicePairs; + std::pair netServicePairs; + NxsTestScenario *mTestScenario; + + NxsNetDummyMgr1 netMgr1; + NxsNetDummyMgr2 netMgr2; + }; #endif // NXSTESTHUB_H diff --git a/libretroshare/src/tests/gxs/nxstestscenario.cc b/libretroshare/src/tests/gxs/nxstestscenario.cc new file mode 100644 index 000000000..0427c29f4 --- /dev/null +++ b/libretroshare/src/tests/gxs/nxstestscenario.cc @@ -0,0 +1,147 @@ +/* + * nxstestscenario.cc + * + * Created on: 10 Jul 2012 + * Author: crispy + */ + +#include "nxstestscenario.h" +#include "gxs/rsdataservice.h" +#include "data_support.h" + +NxsMessageTest::NxsMessageTest() +{ + mStorePair.first = new RsDataService(".", "dStore1", 0); + mStorePair.second = new RsDataService(".", "dStore2", 0); + + setUpDataBases(); +} + +std::string NxsMessageTest::getTestName() +{ + return std::string("Nxs Message Test!"); +} + +NxsMessageTest::~NxsMessageTest(){ + delete mStorePair.first; + delete mStorePair.second; +} +void NxsMessageTest::setUpDataBases() +{ + // create several groups and then messages of that + // group for both second and first of pair + RsDataService* dStore = dynamic_cast(mStorePair.first); + populateStore(dStore); + + dStore = dynamic_cast(mStorePair.second); + populateStore(dStore); + + dStore = NULL; + return; +} + +void NxsMessageTest::populateStore(RsGeneralDataService* dStore) +{ + + int nGrp = rand()%7; + std::vector grpIdList; + std::map grps; + RsNxsGrp* grp = NULL; + RsGxsGrpMetaData* grpMeta =NULL; + for(int i = 0; i < nGrp; i++) + { + std::pair p; + grp = new RsNxsGrp(RS_SERVICE_TYPE_PLUGIN_SIMPLE_FORUM); + grpMeta = new RsGxsGrpMetaData(); + p.first = grp; + p.second = grpMeta; + init_item(*grp); + init_item(grpMeta); + grpMeta->mGroupId = grp->grpId; + grps.insert(p); + grpIdList.push_back(grp->grpId); + grpMeta = NULL; + grp = NULL; + } + + dStore->storeGroup(grps); + + + std::map::iterator grp_it + = grps.begin(); + for(; grp_it != grps.end(); grp_it++) + { + delete grp_it->first; + delete grp_it->second; + } + + + int nMsgs = rand()%23; + std::map msgs; + RsNxsMsg* msg = NULL; + RsGxsMsgMetaData* msgMeta = NULL; + + for(int i=0; i p(msg, msgMeta); + int chosen = 0; + + const std::string& grpId = grpIdList[rand()%nGrp]; + msgMeta->mMsgId = msg->msgId; + msgMeta->mGroupId = msg->grpId = grpId; + + msg = NULL; + msgMeta = NULL; + + msgs.insert(p); + } + + + dStore->storeMessage(msgs); + + // clean up + std::map::iterator msg_it + = msgs.begin(); + + for(; msg_it != msgs.end(); msg_it++) + { + delete msg_it->first; + delete msg_it->second; + } + + return; +} + +void NxsMessageTest::notifyNewMessages(std::vector& messages) +{ + std::vector::iterator vit = messages.begin(); + + for(; vit != messages.end(); vit++) + { + mPeerMsgs[(*vit)->PeerId()].push_back(*vit); + } +} + +void NxsMessageTest::notifyNewGroups(std::vector& groups) +{ + std::vector::iterator vit = groups.begin(); + + for(; vit != groups.end(); vit++) + { + mPeerGrps[(*vit)->PeerId()].push_back(*vit); + } +} + +RsGeneralDataService* NxsMessageTest::dummyDataService1() +{ + return mStorePair.first; +} + +RsGeneralDataService* NxsMessageTest::dummyDataService2() +{ + return mStorePair.second; +} diff --git a/libretroshare/src/tests/gxs/nxstestscenario.h b/libretroshare/src/tests/gxs/nxstestscenario.h new file mode 100644 index 000000000..15b469737 --- /dev/null +++ b/libretroshare/src/tests/gxs/nxstestscenario.h @@ -0,0 +1,68 @@ +/* + * nxstestscenario.h + * + * Created on: 10 Jul 2012 + * Author: crispy + */ + +#ifndef NXSTESTSCENARIO_H_ +#define NXSTESTSCENARIO_H_ + +#include +#include "gxs/rsdataservice.h" +#include "gxs/rsnxsobserver.h" + +/*! + * This scenario module provides data resources + */ +class NxsTestScenario : public RsNxsObserver +{ + +public: + + virtual std::string getTestName() = 0; + virtual RsGeneralDataService* dummyDataService1() = 0; + virtual RsGeneralDataService* dummyDataService2() = 0; + + +}; + +class NxsMessageTest : public NxsTestScenario +{ + +public: + + NxsMessageTest(); + virtual ~NxsMessageTest(); + std::string getTestName(); + RsGeneralDataService* dummyDataService1(); + RsGeneralDataService* dummyDataService2(); + +public: + + /*! + * @param messages messages are deleted after function returns + */ + void notifyNewMessages(std::vector& messages); + + /*! + * @param messages messages are deleted after function returns + */ + void notifyNewGroups(std::vector& groups); + + +private: + void setUpDataBases(); + void populateStore(RsGeneralDataService* dStore); + +private: + + std::string mTestName; + std::pair mStorePair; + std::map > mPeerMsgs; + std::map > mPeerGrps; + +}; + + +#endif /* NXSTESTSCENARIO_H_ */ diff --git a/libretroshare/src/tests/gxs/rsgxsnetservice_test.cc b/libretroshare/src/tests/gxs/rsgxsnetservice_test.cc new file mode 100644 index 000000000..6ab66b6e1 --- /dev/null +++ b/libretroshare/src/tests/gxs/rsgxsnetservice_test.cc @@ -0,0 +1,34 @@ +/* + * rsgxsnetservice_test.cc + * + * Created on: 11 Jul 2012 + * Author: crispy + */ + +#include "util/utest.h" +#include "nxstesthub.h" +#include "nxstestscenario.h" + +INITTEST(); + + +int main() +{ + + // first setup + NxsMessageTest msgTest; + NxsTestHub hub(&msgTest); + + // now get things started + createThread(hub); + + // put this thread to sleep for 10 secs + usleep(10000); + + hub.join(); + CHECK(hub.testsPassed()); + + FINALREPORT("RsGxsNetService Tests"); + + return TESTRESULT(); +} diff --git a/libretroshare/src/util/retrodb.cc b/libretroshare/src/util/retrodb.cc index 1773c7913..3af1e67aa 100644 --- a/libretroshare/src/util/retrodb.cc +++ b/libretroshare/src/util/retrodb.cc @@ -30,7 +30,7 @@ #include "retrodb.h" -#define RETRODB_DEBUG +//#define RETRODB_DEBUG void free_blob(void* dat){