Fix for NXS group sync: need to set destination peer when constructing

payload. 
Generalised nxstest to simulate sync among arbitrary number of peers
added pass conditions to test scenerio with seperate notify observers
per peer
cleaned up test code and enabled auto cleaning of run dir (remove db
junk)


git-svn-id: http://svn.code.sf.net/p/retroshare/code/branches/v0.5-gxs-b1@5480 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
chrisparker126 2012-08-28 21:11:54 +00:00
parent e99d84403d
commit 60e1d5e68e
7 changed files with 279 additions and 187 deletions

View file

@ -182,14 +182,14 @@ public:
* @param msg map of message and decoded meta data information
* @return error code
*/
virtual int storeMessage(std::map<RsNxsMsg*, RsGxsMsgMetaData*>& msg) = 0;
virtual int storeMessage(std::map<RsNxsMsg*, RsGxsMsgMetaData*>& msgs) = 0;
/*!
* Stores a list of groups in data store
* @param grp map of group and decoded meta data
* @return error code
*/
virtual int storeGroup(std::map<RsNxsGrp*, RsGxsGrpMetaData*>& grp) = 0;
virtual int storeGroup(std::map<RsNxsGrp*, RsGxsGrpMetaData*>& grsp) = 0;
/*!

View file

@ -28,7 +28,7 @@
#define NXS_NET_DEBUG
#define SYNC_PERIOD 12 // in microseconds every 10 seconds (1 second for testing)
#define SYNC_PERIOD 10 // in microseconds every 10 seconds (1 second for testing)
#define TRANSAC_TIMEOUT 5 // 5 seconds
@ -36,7 +36,7 @@ RsGxsNetService::RsGxsNetService(uint16_t servType, RsGeneralDataService *gds,
RsNxsNetMgr *netMgr, RsNxsObserver *nxsObs)
: p3Config(servType), p3ThreadedService(servType),
mTransactionTimeOut(TRANSAC_TIMEOUT), mServType(servType), mDataStore(gds), mTransactionN(0),
mObserver(nxsObs), mNxsMutex("RsGxsNetService"), mNetMgr(netMgr), mSYNC_PERIOD(SYNC_PERIOD)
mObserver(nxsObs), mNxsMutex("RsGxsNetService"), mNetMgr(netMgr), mSYNC_PERIOD(SYNC_PERIOD), mSyncTs(0)
{
addSerialType(new RsNxsSerialiser(mServType));
@ -57,8 +57,9 @@ int RsGxsNetService::tick(){
recvNxsItemQueue();
uint32_t now = time(NULL);
uint32_t elapsed = mSYNC_PERIOD + mSyncTs;
if((mSYNC_PERIOD + mSyncTs) < now)
if((elapsed) < now)
{
syncWithPeers();
mSyncTs = now;
@ -84,7 +85,14 @@ void RsGxsNetService::syncWithPeers()
sendItem(grp);
}
// TODO msgs
// // TODO msgs
// for(; sit != peers.end(); sit++)
// {
// RsNxsSyncMsg* msg = new RsNxsSyncMsg(mServType);
// msg->clear();
// msg->PeerId(*sit);
// sendItem(msg);
// }
}
@ -326,7 +334,8 @@ void RsGxsNetService::run(){
bool RsGxsNetService::locked_checkTransacTimedOut(NxsTransaction* tr)
{
return tr->mTimeOut < ((uint32_t) time(NULL));
//return tr->mTimeOut < ((uint32_t) time(NULL));
return false;
}
void RsGxsNetService::processTransactions(){
@ -599,7 +608,6 @@ void RsGxsNetService::locked_processCompletedIncomingTrans(NxsTransaction* tr)
}else if(flag & RsNxsTransac::FLAG_TYPE_MSGS)
{
std::list<RsNxsItem*>::iterator lit = tr->mItems.begin();
std::vector<RsNxsMsg*> msgs;
while(tr->mItems.size() > 0)
@ -762,6 +770,8 @@ void RsGxsNetService::locked_genReqMsgTransaction(NxsTransaction* tr)
}
}
if(reqList.empty())
{
RsNxsTransac* transac = new RsNxsTransac(mServType);
transac->transactFlag = RsNxsTransac::FLAG_TYPE_MSG_LIST_REQ
@ -788,6 +798,7 @@ void RsGxsNetService::locked_genReqMsgTransaction(NxsTransaction* tr)
delete newTrans;
}
}
}
void RsGxsNetService::locked_genReqGrpTransaction(NxsTransaction* tr)
{
@ -832,7 +843,7 @@ void RsGxsNetService::locked_genReqGrpTransaction(NxsTransaction* tr)
if(grpMetaMap.find(grpId) == grpMetaMap.end()){
RsNxsSyncGrpItem* grpItem = new RsNxsSyncGrpItem(mServType);
grpItem->PeerId(tr->mTransaction->PeerId());
grpItem->grpId = grpId;
grpItem->flag = RsNxsSyncMsgItem::FLAG_REQUEST;
grpItem->transactionNumber = transN;
@ -897,7 +908,7 @@ void RsGxsNetService::locked_genSendGrpsTransaction(NxsTransaction* tr)
std::string peerId = tr->mTransaction->PeerId();
for(;mit != grps.end(); mit++)
{
mit->second->PeerId(peerId); // set so it gets send to right peer
mit->second->PeerId(peerId); // set so it gets sent to right peer
mit->second->transactionNumber = transN;
newTr->mItems.push_back(mit->second);
}
@ -913,6 +924,7 @@ void RsGxsNetService::locked_genSendGrpsTransaction(NxsTransaction* tr)
ntr->transactFlag = RsNxsTransac::FLAG_BEGIN_P1 |
RsNxsTransac::FLAG_TYPE_GRPS;
ntr->nItems = grps.size();
ntr->PeerId(tr->mTransaction->PeerId());
newTr->mTransaction = new RsNxsTransac(*ntr);
newTr->mTransaction->PeerId(mOwnId);
@ -982,7 +994,6 @@ void RsGxsNetService::handleRecvSyncGroup(RsNxsSyncGrp* item)
if(grp.empty())
return;
std::vector<RsNxsSyncGrpItem*> grpSyncItems;
std::map<std::string, RsGxsGrpMetaData*>::iterator mit =
grp.begin();

View file

@ -1,77 +1,125 @@
#include "nxstesthub.h"
NxsTestHub::NxsTestHub(NxsTestScenario* nts) : mTestScenario(nts)
NxsTestHub::NxsTestHub(NxsTestScenario * nts, std::set<std::string> &peers) : mTestScenario(nts)
{
netServicePairs.first = new RsGxsNetService(mTestScenario->getServiceType(),
mTestScenario->dummyDataService1(), &netMgr1, mTestScenario);
netServicePairs.second = new RsGxsNetService(mTestScenario->getServiceType(),
mTestScenario->dummyDataService2(), &netMgr2, mTestScenario);
std::set<std::string>::iterator sit = peers.begin();
mServicePairs.first = netServicePairs.first;
mServicePairs.second = netServicePairs.second;
for(; sit != peers.end(); sit++)
{
std::set<std::string> msgPeers = peers;
createThread(*(netServicePairs.first));
createThread(*(netServicePairs.second));
// add peers all peers except one iterator currently points to
msgPeers.erase(*sit);
NxsNetDummyMgr* dummyMgr = new NxsNetDummyMgr(*sit, msgPeers);
RsGeneralDataService* ds = mTestScenario->getDataService(*sit);
NxsMessageTestObserver* obs = new NxsMessageTestObserver(ds);
RsGxsNetService* netService =
new RsGxsNetService(mTestScenario->getServiceType(),
ds, dummyMgr, obs);
mNetServices.insert(std::make_pair(*sit, netService));
mObservers.insert(std::make_pair(*sit, obs));
}
sit = peers.begin();
// launch net services
for(; sit != peers.end(); sit++)
{
RsGxsNetService* n = mNetServices[*sit];
createThread(*n);
mServices.insert(std::make_pair(*sit, n));
}
}
NxsTestHub::~NxsTestHub()
{
delete netServicePairs.first;
delete netServicePairs.second;
std::map<std::string, RsGxsNetService*>::iterator mit = mNetServices.begin();
for(; mit != mNetServices.end(); mit++)
delete mit->second;
}
void NxsTestHub::run()
{
std::list<RsItem*> send_queue_s1, send_queue_s2;
double timeDelta = .2;
while(isRunning()){
// make thread sleep for a couple secs
usleep(3000);
// make thread sleep for a bit
#ifndef WINDOWS_SYS
usleep((int) (timeDelta * 1000000));
#else
Sleep((int) (timeDelta * 1000));
#endif
p3Service* s1 = mServicePairs.first;
p3Service* s2 = mServicePairs.second;
std::map<std::string, p3Service*>::iterator mit = mServices.begin();
for(; mit != mServices.end(); mit++)
{
p3Service* s = mit->second;
s->tick();
}
mit = mServices.begin();
// collect msgs to send to peers from peers
for(; mit != mServices.end(); mit++)
{
const std::string& peer = mit->first;
p3Service* s = mit->second;
// first store all the sends from all services
RsItem* item = NULL;
while((item = s1->send()) != NULL)
while((item = s->send()) != NULL){
const std::string peerToReceive = item->PeerId();
// set the peer this item comes from
item->PeerId(peer);
mPeerQueues[peerToReceive].push_back(item);
}
}
// now route items to peers
std::map<std::string, std::vector<RsItem*> >::iterator mit_queue = mPeerQueues.begin();
for(; mit_queue != mPeerQueues.end(); mit_queue++)
{
item->PeerId("PeerB");
send_queue_s1.push_back(item);
}
while((item = s2->send()) != NULL)
std::vector<RsItem*>& queueV = mit_queue->second;
std::vector<RsItem*>::iterator vit = queueV.begin();
const std::string peerToReceive = mit_queue->first;
for(; vit != queueV.end(); vit++)
{
item->PeerId("PeerA");
send_queue_s2.push_back(item);
}
while(!send_queue_s1.empty()){
item = send_queue_s1.front();
s2->receive(dynamic_cast<RsRawItem*>(item));
send_queue_s1.pop_front();
}
RsItem* item = *vit;
p3Service* service = mServices[peerToReceive];
while(!send_queue_s2.empty()){
item = send_queue_s2.front();
s1->receive(dynamic_cast<RsRawItem*>(item));
send_queue_s2.pop_front();
service->receive(dynamic_cast<RsRawItem*>(item));
}
queueV.clear();
}
// tick services so nxs net services process items
s1->tick();
s2->tick();
}
// also shut down this net service peers if this goes down
netServicePairs.first->join();
netServicePairs.second->join();
}
void NxsTestHub::cleanUp()
{
std::map<std::string, RsGxsNetService*>::iterator mit = mNetServices.begin();
for(; mit != mNetServices.end(); mit++)
{
RsGxsNetService* n = mit->second;
n->join();
}
// also shut down this net service peers if this goes down
mTestScenario->cleanUp();
}

View file

@ -11,14 +11,13 @@
// of peers
class NxsNetDummyMgr1 : public RsNxsNetMgr
class NxsNetDummyMgr : public RsNxsNetMgr
{
public:
NxsNetDummyMgr1() : mOwnId("peerA") {
NxsNetDummyMgr(std::string ownId, std::set<std::string> peers) : mOwnId(ownId), mPeers(peers) {
mPeers.insert("peerB");
}
std::string getOwnId() { return mOwnId; }
@ -31,26 +30,6 @@ private:
};
class NxsNetDummyMgr2 : public RsNxsNetMgr
{
public:
NxsNetDummyMgr2() : mOwnId("peerB") {
mPeers.insert("peerA");
}
std::string getOwnId() { return mOwnId; }
void getOnlineList(std::set<std::string>& ssl_peers) { ssl_peers = mPeers; }
private:
std::string mOwnId;
std::set<std::string> mPeers;
};
/*!
* Testing of nxs services occurs through use of two services
@ -73,7 +52,7 @@ public:
* This construct the test hub
* for a give scenario in mind
*/
NxsTestHub(NxsTestScenario*);
NxsTestHub(NxsTestScenario*, std::set<std::string>& peers);
/*!
@ -97,12 +76,13 @@ public:
void cleanUp();
private:
std::pair<p3Service*, p3Service*> mServicePairs;
std::pair<RsGxsNetService*, RsGxsNetService*> netServicePairs;
NxsTestScenario *mTestScenario;
std::map<std::string, p3Service*> mServices;
std::map<std::string, RsGxsNetService*> mNetServices;
std::map<std::string, NxsMessageTestObserver*> mObservers;
NxsNetDummyMgr1 netMgr1;
NxsNetDummyMgr2 netMgr2;
std::map<std::string, std::vector<RsItem*> > mPeerQueues;
NxsTestScenario *mTestScenario;
};

View file

@ -10,12 +10,9 @@
#include "data_support.h"
NxsMessageTest::NxsMessageTest(uint16_t servtype)
: mServType(servtype)
: mServType(servtype), mMsgTestMtx("mMsgTestMtx")
{
mStorePair.first = new RsDataService(".", "dStore1", mServType);
mStorePair.second = new RsDataService(".", "dStore2", mServType);
setUpDataBases();
}
std::string NxsMessageTest::getTestName()
@ -24,21 +21,33 @@ std::string NxsMessageTest::getTestName()
}
NxsMessageTest::~NxsMessageTest(){
delete mStorePair.first;
delete mStorePair.second;
}
void NxsMessageTest::setUpDataBases()
std::map<std::string, RsGeneralDataService*>::iterator mit = mPeerStoreMap.begin();
for(; mit != mPeerStoreMap.end(); mit++)
{
// create several groups and then messages of that
// group for both second and first of pair
RsDataService* dStore = dynamic_cast<RsDataService*>(mStorePair.first);
delete mit->second;
}
std::set<std::string>::iterator sit = mStoreNames.begin();
// remove db file
for(; sit != mStoreNames.end(); sit++)
{
const std::string& name = *sit;
remove(name.c_str());
}
}
RsGeneralDataService* NxsMessageTest::getDataService(const std::string& peer)
{
if(mPeerStoreMap.find(peer) != mPeerStoreMap.end()) return NULL;
RsDataService* dStore = new RsDataService("./", peer.c_str(), mServType);
mStoreNames.insert(peer);
mPeerStoreMap.insert(std::make_pair(peer, dStore));
populateStore(dStore);
dStore = dynamic_cast<RsDataService*>(mStorePair.second);
populateStore(dStore);
dStore = NULL;
return;
return dStore;
}
uint16_t NxsMessageTest::getServiceType()
@ -49,7 +58,7 @@ uint16_t NxsMessageTest::getServiceType()
void NxsMessageTest::populateStore(RsGeneralDataService* dStore)
{
int nGrp = rand()%7;
int nGrp = (rand()%2)+1;
std::vector<std::string> grpIdList;
std::map<RsNxsGrp*, RsGxsGrpMetaData*> grps;
RsNxsGrp* grp = NULL;
@ -122,40 +131,51 @@ void NxsMessageTest::populateStore(RsGeneralDataService* dStore)
return;
}
void NxsMessageTest::notifyNewMessages(std::vector<RsNxsMsg*>& messages)
{
std::vector<RsNxsMsg*>::iterator vit = messages.begin();
for(; vit != messages.end(); vit++)
{
mPeerMsgs[(*vit)->PeerId()].push_back(*vit);
}
}
void NxsMessageTest::notifyNewGroups(std::vector<RsNxsGrp*>& groups)
{
std::vector<RsNxsGrp*>::iterator vit = groups.begin();
for(; vit != groups.end(); vit++)
{
mPeerGrps[(*vit)->PeerId()].push_back(*vit);
}
}
RsGeneralDataService* NxsMessageTest::dummyDataService1()
{
return mStorePair.first;
}
RsGeneralDataService* NxsMessageTest::dummyDataService2()
{
return mStorePair.second;
}
void NxsMessageTest::cleanUp()
{
mStorePair.first->resetDataStore();
mStorePair.second->resetDataStore();
std::map<std::string, RsGeneralDataService*>::iterator mit = mPeerStoreMap.begin();
for(; mit != mPeerStoreMap.end(); mit++)
{
RsGeneralDataService* d = mit->second;
d->resetDataStore();
}
return;
}
bool NxsMessageTest::testPassed(){
return false;
}
/*******************************/
NxsMessageTestObserver::NxsMessageTestObserver(RsGeneralDataService *dStore)
:mStore(dStore)
{
}
void NxsMessageTestObserver::notifyNewGroups(std::vector<RsNxsGrp *> &groups)
{
std::vector<RsNxsGrp*>::iterator vit = groups.begin();
std::map<RsNxsGrp*, RsGxsGrpMetaData*> grps;
for(; vit != groups.end(); vit++)
{
RsNxsGrp* grp = *vit;
RsGxsGrpMetaData* meta = new RsGxsGrpMetaData();
meta->mGroupId = grp->grpId;
grps.insert(std::make_pair(grp, meta));
}
mStore->storeGroup(grps);
}
void NxsMessageTestObserver::notifyNewMessages(std::vector<RsNxsMsg *> &messages)
{
}

View file

@ -15,14 +15,26 @@
/*!
* This scenario module provides data resources
*/
class NxsTestScenario : public RsNxsObserver
class NxsTestScenario
{
public:
virtual std::string getTestName() = 0;
virtual RsGeneralDataService* dummyDataService1() = 0;
virtual RsGeneralDataService* dummyDataService2() = 0;
/*!
* @param peer
* @param namePath
* @return data service with populated with random grp/msg data, null if peer or pathname exists
*/
virtual RsGeneralDataService* getDataService(const std::string& peer) = 0;
virtual bool testPassed() = 0;
/*!
* Service type for this test
* should correspond to serialiser service type
*/
virtual uint16_t getServiceType() = 0;
/*!
@ -34,6 +46,28 @@ public:
};
class NxsMessageTestObserver : public RsNxsObserver
{
public:
NxsMessageTestObserver(RsGeneralDataService* dStore);
/*!
* @param messages messages are deleted after function returns
*/
void notifyNewMessages(std::vector<RsNxsMsg*>& messages);
/*!
* @param messages messages are deleted after function returns
*/
void notifyNewGroups(std::vector<RsNxsGrp*>& groups);
private:
RsGeneralDataService* mStore;
};
class NxsMessageTest : public NxsTestScenario
{
@ -43,8 +77,7 @@ public:
virtual ~NxsMessageTest();
std::string getTestName();
uint16_t getServiceType();
RsGeneralDataService* dummyDataService1();
RsGeneralDataService* dummyDataService2();
RsGeneralDataService* getDataService(const std::string& peer);
/*!
* Call to remove files created
@ -52,18 +85,7 @@ public:
*/
void cleanUp();
public:
/*!
* @param messages messages are deleted after function returns
*/
void notifyNewMessages(std::vector<RsNxsMsg*>& messages);
/*!
* @param messages messages are deleted after function returns
*/
void notifyNewGroups(std::vector<RsNxsGrp*>& groups);
bool testPassed();
private:
void setUpDataBases();
@ -72,11 +94,12 @@ private:
private:
std::string mTestName;
std::pair<RsGeneralDataService*, RsGeneralDataService*> mStorePair;
std::map<std::string, std::vector<RsNxsMsg*> > mPeerMsgs;
std::map<std::string, std::vector<RsNxsGrp*> > mPeerGrps;
std::map<std::string, RsGeneralDataService*> mPeerStoreMap;
std::set<std::string> mStoreNames;
uint16_t mServType;
RsMutex mMsgTestMtx;
};

View file

@ -17,13 +17,23 @@ int main()
// first setup
NxsMessageTest msgTest(RS_SERVICE_TYPE_PLUGIN_SIMPLE_FORUM);
NxsTestHub hub(&msgTest);
std::set<std::string> peers;
peers.insert("PeerA");
peers.insert("PeerB");
NxsTestHub hub(&msgTest, peers);
// now get things started
createThread(hub);
double timeDelta = 30;
// put this thread to sleep for 10 secs
sleep(10);
// make thread sleep for a bit
#ifndef WINDOWS_SYS
usleep((int) (timeDelta * 1000000));
#else
Sleep((int) (timeDelta * 1000));
#endif
hub.join();
CHECK(hub.testsPassed());