group synchronisation now working (message syn not up yet, will do later, simple extension)

updated hub test


git-svn-id: http://svn.code.sf.net/p/retroshare/code/branches/v0.5-new_cache_system@5300 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
chrisparker126 2012-07-15 12:38:20 +00:00
parent d3e5ec2836
commit 52a911329e
8 changed files with 112 additions and 63 deletions

View File

@ -577,11 +577,13 @@ int RsDataService::retrieveNxsGrps(std::map<std::string, RsNxsGrp *> &grp, bool
{
std::vector<RsNxsGrp*> grps;
retrieveGroups(c, grps);
std::vector<RsNxsGrp*>::iterator vit = grps.begin();
for(; vit != grps.end(); vit++)
if(!grps.empty())
{
grp[(*vit)->grpId] = *vit;
RsNxsGrp* ng = grps.front();
grp[ng->grpId] = ng;
}else{
grp.erase(grpId);
}
delete c;

View File

@ -23,7 +23,7 @@ public:
/*!
* Retrieves groups, if empty, retrieves all grps, if map is not empty
* only retrieve entries
* only retrieve entries, if entry cannot be found, it is removed from map
* @param grp retrieved groups
* @param cache whether to store retrieval in mem for faster later retrieval
* @return error code

View File

@ -2,18 +2,19 @@
#define NXS_NET_DEBUG
#define SYNC_PERIOD 1 // in microseconds every 10 seconds (1 second for testing)
#define SYNC_PERIOD 12 // in microseconds every 10 seconds (1 second for testing)
#define TRANSAC_TIMEOUT 5 // 5 seconds
RsGxsNetService::RsGxsNetService(uint16_t servType, RsGeneralDataService *gds,
RsNxsNetMgr *netMgr, RsNxsObserver *nxsObs)
: p3Config(servType), p3ThreadedService(servType),
mTransactionTimeOut(TRANSAC_TIMEOUT), mServType(servType), mDataStore(gds),
mTransactionTimeOut(TRANSAC_TIMEOUT), mServType(servType), mDataStore(gds), mTransactionN(0),
mObserver(nxsObs), mNxsMutex("RsGxsNetService"), mNetMgr(netMgr), mSYNC_PERIOD(SYNC_PERIOD)
{
addSerialType(new RsNxsSerialiser(mServType));
mOwnId = mNetMgr->getOwnId();
}
RsGxsNetService::~RsGxsNetService()
@ -190,13 +191,19 @@ bool RsGxsNetService::locked_processTransac(RsNxsTransac* item)
* For ending a transaction the
*/
const std::string& peer = item->PeerId();
std::string peer;
// for outgoing transaction use own id
if(item->transactFlag & (RsNxsTransac::FLAG_BEGIN_P2 | RsNxsTransac::FLAG_END_SUCCESS))
peer = mOwnId;
else
peer = item->PeerId();
uint32_t transN = item->transactionNumber;
item->timestamp = time(NULL); // register time received
NxsTransaction* tr = NULL;
#ifdef NXS_NET_DEBUG
std::cerr << "locked_processTransac() " << std::endl;
std::cerr << "locked_processTransac(), Received transaction item: " << transN << std::endl;
std::cerr << "locked_processTransac(), With peer: " << item->PeerId() << std::endl;
@ -225,13 +232,13 @@ bool RsGxsNetService::locked_processTransac(RsNxsTransac* item)
TransactionIdMap& transMap = mTransactions[peer];
if(transExists)
return false;
return false; // should not happen!
// create new transaction
tr = new NxsTransaction();
transMap[transN] = tr;
tr->mTransaction = item;
tr->mTimeOut = item->timestamp;
tr->mTimeOut = item->timestamp + mTransactionTimeOut;
// note state as receiving, commencement item
// is sent on next run() loop
@ -240,7 +247,7 @@ bool RsGxsNetService::locked_processTransac(RsNxsTransac* item)
// commencement item for outgoing transaction
}else if(item->transactFlag & RsNxsTransac::FLAG_BEGIN_P2){
// transaction does not exist
// transaction must exist
if(!peerTrExists || !transExists)
return false;
@ -291,13 +298,14 @@ void RsGxsNetService::run(){
}
}
bool RsGxsNetService::checkTransacTimedOut(NxsTransaction* tr)
bool RsGxsNetService::locked_checkTransacTimedOut(NxsTransaction* tr)
{
return tr->mTimeOut < ((uint32_t) time(NULL));
}
void RsGxsNetService::processTransactions(){
RsStackMutex stack(mNxsMutex);
TransactionsPeerMap::iterator mit = mTransactions.begin();
@ -324,7 +332,7 @@ void RsGxsNetService::processTransactions(){
uint32_t transN = tr->mTransaction->transactionNumber;
// first check transaction has not expired
if(checkTransacTimedOut(tr))
if(locked_checkTransacTimedOut(tr))
{
#ifdef NXS_NET_DEBUG
@ -405,7 +413,7 @@ void RsGxsNetService::processTransactions(){
uint32_t transN = tr->mTransaction->transactionNumber;
// first check transaction has not expired
if(checkTransacTimedOut(tr))
if(locked_checkTransacTimedOut(tr))
{
#ifdef NXS_NET_DEBUG
@ -455,6 +463,7 @@ void RsGxsNetService::processTransactions(){
trans->transactionNumber = transN;
trans->PeerId(tr->mTransaction->PeerId());
sendItem(trans);
tr->mFlag = NxsTransaction::FLAG_STATE_RECEIVING;
}
else{
@ -481,6 +490,7 @@ void RsGxsNetService::processTransactions(){
void RsGxsNetService::processCompletedTransactions()
{
RsStackMutex stack(mNxsMutex);
/*!
* Depending on transaction we may have to respond to peer
* responsible for transaction
@ -495,9 +505,9 @@ void RsGxsNetService::processCompletedTransactions()
bool outgoing = tr->mTransaction->PeerId() == mOwnId;
if(outgoing){
processCompletedOutgoingTrans(tr);
locked_processCompletedOutgoingTrans(tr);
}else{
locked_processCompletedIncomingTrans(tr);
}
@ -506,7 +516,7 @@ void RsGxsNetService::processCompletedTransactions()
}
}
void RsGxsNetService::processCompletedIncomingTrans(NxsTransaction* tr)
void RsGxsNetService::locked_processCompletedIncomingTrans(NxsTransaction* tr)
{
uint16_t flag = tr->mTransaction->transactFlag;
@ -517,20 +527,20 @@ void RsGxsNetService::processCompletedIncomingTrans(NxsTransaction* tr)
if(flag & RsNxsTransac::FLAG_TYPE_MSG_LIST_RESP)
{
// generate request based on a peers response
genReqMsgTransaction(tr);
locked_genReqMsgTransaction(tr);
}else if(flag & RsNxsTransac::FLAG_TYPE_GRP_LIST_RESP)
{
genReqGrpTransaction(tr);
locked_genReqGrpTransaction(tr);
}
// you've finished receiving request information now gen
else if(flag & RsNxsTransac::FLAG_TYPE_MSG_LIST_REQ)
{
genSendMsgsTransaction(tr);
locked_genSendMsgsTransaction(tr);
}
else if(flag & RsNxsTransac::FLAG_TYPE_GRP_LIST_REQ)
{
genSendGrpsTransaction(tr);
locked_genSendGrpsTransaction(tr);
}
else if(flag & RsNxsTransac::FLAG_TYPE_GRPS)
{
@ -593,7 +603,7 @@ void RsGxsNetService::processCompletedIncomingTrans(NxsTransaction* tr)
return;
}
void RsGxsNetService::processCompletedOutgoingTrans(NxsTransaction* tr)
void RsGxsNetService::locked_processCompletedOutgoingTrans(NxsTransaction* tr)
{
uint16_t flag = tr->mTransaction->transactFlag;
@ -658,7 +668,7 @@ void RsGxsNetService::processCompletedOutgoingTrans(NxsTransaction* tr)
}
void RsGxsNetService::genReqMsgTransaction(NxsTransaction* tr)
void RsGxsNetService::locked_genReqMsgTransaction(NxsTransaction* tr)
{
// to create a transaction you need to know who you are transacting with
@ -705,7 +715,7 @@ void RsGxsNetService::genReqMsgTransaction(NxsTransaction* tr)
msgIdSet.insert((*vit)->mMsgId);
// get unique id for this transaction
uint32_t transN = getTransactionId();
uint32_t transN = locked_getTransactionId();
// add msgs that you don't have to request list
@ -733,11 +743,12 @@ void RsGxsNetService::genReqMsgTransaction(NxsTransaction* tr)
transac->timestamp = 0;
transac->nItems = reqList.size();
transac->PeerId(tr->mTransaction->PeerId());
transac->transactionNumber = transN;
NxsTransaction* newTrans = new NxsTransaction();
newTrans->mItems = reqList;
newTrans->mFlag = NxsTransaction::FLAG_STATE_WAITING_CONFIRM;
newTrans->mTimeOut = 0;
newTrans->mTimeOut = time(NULL) + mTransactionTimeOut;
// create transaction copy with your id to indicate
// its an outgoing transaction
@ -747,13 +758,12 @@ void RsGxsNetService::genReqMsgTransaction(NxsTransaction* tr)
sendItem(transac);
{
RsStackMutex stack(mNxsMutex);
if(!locked_addTransaction(newTrans))
delete newTrans;
}
}
void RsGxsNetService::genReqGrpTransaction(NxsTransaction* tr)
void RsGxsNetService::locked_genReqGrpTransaction(NxsTransaction* tr)
{
// to create a transaction you need to know who you are transacting with
@ -788,7 +798,7 @@ void RsGxsNetService::genReqGrpTransaction(NxsTransaction* tr)
std::list<RsNxsSyncGrpItem*>::iterator llit = grpItemL.begin();
std::list<RsNxsItem*> reqList;
uint32_t transN = getTransactionId();
uint32_t transN = locked_getTransactionId();
for(; llit != grpItemL.end(); llit++)
{
@ -811,26 +821,31 @@ void RsGxsNetService::genReqGrpTransaction(NxsTransaction* tr)
transac->timestamp = 0;
transac->nItems = reqList.size();
transac->PeerId(tr->mTransaction->PeerId());
transac->transactionNumber = transN;
NxsTransaction* newTrans = new NxsTransaction();
newTrans->mItems = reqList;
newTrans->mFlag = NxsTransaction::FLAG_STATE_WAITING_CONFIRM;
newTrans->mTimeOut = 0;
newTrans->mTimeOut = time(NULL) + mTransactionTimeOut;
newTrans->mTransaction = new RsNxsTransac(*transac);
newTrans->mTransaction->PeerId(mOwnId);
sendItem(transac);
{
RsStackMutex stack(mNxsMutex);
if(!locked_addTransaction(newTrans))
delete newTrans;
}
if(!locked_addTransaction(newTrans))
delete newTrans;
}
void RsGxsNetService::genSendGrpsTransaction(NxsTransaction* tr)
void RsGxsNetService::locked_genSendGrpsTransaction(NxsTransaction* tr)
{
#ifdef NXS_NET_DEBUG
std::cerr << "locked_genSendGrpsTransaction()" << std::endl;
std::cerr << "Generating Grp data send fron TransN: " << tr->mTransaction->transactionNumber
<< std::endl;
#endif
// go groups requested in transaction tr
std::list<RsNxsItem*>::iterator lit = tr->mItems.begin();
@ -845,41 +860,54 @@ void RsGxsNetService::genSendGrpsTransaction(NxsTransaction* tr)
mDataStore->retrieveNxsGrps(grps, false);
NxsTransaction* newTr = new NxsTransaction();
newTr->mFlag = NxsTransaction::FLAG_STATE_WAITING_CONFIRM;
uint32_t transN = locked_getTransactionId();
// store grp items to send in transaction
std::map<std::string, RsNxsGrp*>::iterator mit = grps.begin();
std::string peerId = tr->mTransaction->PeerId();
for(;mit != grps.end(); mit++)
{
mit->second->PeerId(peerId); // set so it gets send to right peer
mit->second->transactionNumber = transN;
newTr->mItems.push_back(mit->second);
}
if(newTr->mItems.empty()){
delete newTr;
return;
}
RsNxsTransac* ntr = new RsNxsTransac(mServType);
ntr->transactionNumber = getTransactionId();
ntr->transactionNumber = transN;
ntr->transactFlag = RsNxsTransac::FLAG_BEGIN_P1 |
RsNxsTransac::FLAG_TYPE_GRPS;
ntr->nItems = grps.size();
newTr->mTransaction = new RsNxsTransac(*ntr);
newTr->mTransaction->PeerId(mOwnId);
newTr->mTimeOut = time(NULL) + mTransactionTimeOut;
ntr->PeerId(tr->mTransaction->PeerId());
sendItem(ntr);
locked_addTransaction(newTr);
return;
}
void RsGxsNetService::genSendMsgsTransaction(NxsTransaction* tr)
void RsGxsNetService::locked_genSendMsgsTransaction(NxsTransaction* tr)
{
return;
}
uint32_t RsGxsNetService::getTransactionId()
uint32_t RsGxsNetService::locked_getTransactionId()
{
RsStackMutex stack(mNxsMutex);
return mTransactionN++;
return ++mTransactionN;
}
bool RsGxsNetService::locked_addTransaction(NxsTransaction* tr)
{
@ -918,6 +946,8 @@ void RsGxsNetService::cleanTransactionItems(NxsTransaction* tr) const
void RsGxsNetService::handleRecvSyncGroup(RsNxsSyncGrp* item)
{
RsStackMutex stack(mNxsMutex);
std::string peer = item->PeerId();
std::map<std::string, RsGxsGrpMetaData*> grp;
@ -933,6 +963,8 @@ void RsGxsNetService::handleRecvSyncGroup(RsNxsSyncGrp* item)
NxsTransaction* tr = new NxsTransaction();
std::list<RsNxsItem*>& itemL = tr->mItems;
uint32_t transN = locked_getTransactionId();
for(; mit != grp.end(); mit++)
{
RsNxsSyncGrpItem* gItem = new
@ -941,6 +973,7 @@ void RsGxsNetService::handleRecvSyncGroup(RsNxsSyncGrp* item)
gItem->grpId = mit->first;
gItem->publishTs = mit->second->mPublishTs;
gItem->PeerId(peer);
gItem->transactionNumber = transN;
itemL.push_back(gItem);
}
@ -950,18 +983,18 @@ void RsGxsNetService::handleRecvSyncGroup(RsNxsSyncGrp* item)
| RsNxsTransac::FLAG_TYPE_GRP_LIST_RESP;
trItem->nItems = itemL.size();
trItem->timestamp = time(NULL);
trItem->timestamp = 0;
trItem->PeerId(peer);
trItem->transactionNumber = getTransactionId();
trItem->transactionNumber = transN;
// also make a copy for the resident transaction
tr->mTransaction = new RsNxsTransac(*trItem);
tr->mTransaction->PeerId(mOwnId);
tr->mTimeOut = time(NULL) + mTransactionTimeOut;
// signal peer to prepare for transaction
sendItem(trItem);
RsStackMutex stack(mNxsMutex);
locked_addTransaction(tr);
return;
@ -969,6 +1002,7 @@ void RsGxsNetService::handleRecvSyncGroup(RsNxsSyncGrp* item)
void RsGxsNetService::handleRecvSyncMessage(RsNxsSyncMsg* item)
{
RsStackMutex stack(mNxsMutex);
return;
}

View File

@ -212,13 +212,13 @@ private:
* Process transaction owned/started by user
* @param tr transaction to process, ownership stays with callee
*/
void processCompletedOutgoingTrans(NxsTransaction* tr);
void locked_processCompletedOutgoingTrans(NxsTransaction* tr);
/*!
* Process transactions started/owned by other peers
* @param tr transaction to process, ownership stays with callee
*/
void processCompletedIncomingTrans(NxsTransaction* tr);
void locked_processCompletedIncomingTrans(NxsTransaction* tr);
/*!
@ -240,7 +240,7 @@ private:
* This retrieves a unique transaction id that
* can be used in an outgoing transaction
*/
uint32_t getTransactionId();
uint32_t locked_getTransactionId();
/*!
* This attempts to push the transaction id counter back if you have
@ -265,28 +265,28 @@ private:
* of msgs received from peer stored in passed transaction
* @param tr transaction responsible for generating msg request
*/
void genReqMsgTransaction(NxsTransaction* tr);
void locked_genReqMsgTransaction(NxsTransaction* tr);
/*!
* Generates new transaction to send grp requests based on list
* of grps received from peer stored in passed transaction
* @param tr transaction responsible for generating grp request
*/
void genReqGrpTransaction(NxsTransaction* tr);
void locked_genReqGrpTransaction(NxsTransaction* tr);
/*!
* Generates new transaction to send msg data based on list
* of grpids received from peer stored in passed transaction
* @param tr transaction responsible for generating grp request
*/
void genSendMsgsTransaction(NxsTransaction* tr);
void locked_genSendMsgsTransaction(NxsTransaction* tr);
/*!
* Generates new transaction to send grp data based on list
* of grps received from peer stored in passed transaction
* @param tr transaction responsible for generating grp request
*/
void genSendGrpsTransaction(NxsTransaction* tr);
void locked_genSendGrpsTransaction(NxsTransaction* tr);
/*!
* convenience function to add a transaction to list
@ -300,7 +300,7 @@ private:
* @param tr the transaction to check for timeout
* @return false if transaction has timed out, true otherwise
*/
bool checkTransacTimedOut(NxsTransaction* tr);
bool locked_checkTransacTimedOut(NxsTransaction* tr);
/** E: Transaction processing **/

View File

@ -3,8 +3,10 @@
NxsTestHub::NxsTestHub(NxsTestScenario* nts) : mTestScenario(nts)
{
netServicePairs.first = new RsGxsNetService(0, mTestScenario->dummyDataService1(), &netMgr1, mTestScenario);
netServicePairs.second = new RsGxsNetService(0, mTestScenario->dummyDataService2(), &netMgr2, mTestScenario);
netServicePairs.first = new RsGxsNetService(mTestScenario->getServiceType(),
mTestScenario->dummyDataService1(), &netMgr1, mTestScenario);
netServicePairs.second = new RsGxsNetService(mTestScenario->getServiceType(),
mTestScenario->dummyDataService2(), &netMgr2, mTestScenario);
mServicePairs.first = netServicePairs.first;
mServicePairs.second = netServicePairs.second;
@ -28,7 +30,7 @@ void NxsTestHub::run()
while(isRunning()){
// make thread sleep for a couple secs
usleep(300);
usleep(3000);
p3Service* s1 = mServicePairs.first;
p3Service* s2 = mServicePairs.second;
@ -36,11 +38,13 @@ void NxsTestHub::run()
RsItem* item = NULL;
while((item = s1->send()) != NULL)
{
item->PeerId("PeerB");
send_queue_s1.push_back(item);
}
while((item = s2->send()) != NULL)
{
item->PeerId("PeerA");
send_queue_s2.push_back(item);
}
@ -56,7 +60,7 @@ void NxsTestHub::run()
send_queue_s2.pop_front();
}
// tick services so nxs net services processe items
// tick services so nxs net services process items
s1->tick();
s2->tick();
}

View File

@ -9,10 +9,11 @@
#include "gxs/rsdataservice.h"
#include "data_support.h"
NxsMessageTest::NxsMessageTest()
NxsMessageTest::NxsMessageTest(uint16_t servtype)
: mServType(servtype)
{
mStorePair.first = new RsDataService(".", "dStore1", 0);
mStorePair.second = new RsDataService(".", "dStore2", 0);
mStorePair.first = new RsDataService(".", "dStore1", mServType);
mStorePair.second = new RsDataService(".", "dStore2", mServType);
setUpDataBases();
}
@ -40,6 +41,11 @@ void NxsMessageTest::setUpDataBases()
return;
}
uint16_t NxsMessageTest::getServiceType()
{
return mServType;
}
void NxsMessageTest::populateStore(RsGeneralDataService* dStore)
{
@ -51,7 +57,7 @@ void NxsMessageTest::populateStore(RsGeneralDataService* dStore)
for(int i = 0; i < nGrp; i++)
{
std::pair<RsNxsGrp*, RsGxsGrpMetaData*> p;
grp = new RsNxsGrp(RS_SERVICE_TYPE_PLUGIN_SIMPLE_FORUM);
grp = new RsNxsGrp(mServType);
grpMeta = new RsGxsGrpMetaData();
p.first = grp;
p.second = grpMeta;
@ -83,7 +89,7 @@ void NxsMessageTest::populateStore(RsGeneralDataService* dStore)
for(int i=0; i<nMsgs; i++)
{
msg = new RsNxsMsg(RS_SERVICE_TYPE_PLUGIN_SIMPLE_FORUM);
msg = new RsNxsMsg(mServType);
msgMeta = new RsGxsMsgMetaData();
init_item(*msg);
init_item(msgMeta);

View File

@ -23,6 +23,7 @@ public:
virtual std::string getTestName() = 0;
virtual RsGeneralDataService* dummyDataService1() = 0;
virtual RsGeneralDataService* dummyDataService2() = 0;
virtual uint16_t getServiceType() = 0;
/*!
* Call to remove files created
@ -38,9 +39,10 @@ class NxsMessageTest : public NxsTestScenario
public:
NxsMessageTest();
NxsMessageTest(uint16_t servtype);
virtual ~NxsMessageTest();
std::string getTestName();
uint16_t getServiceType();
RsGeneralDataService* dummyDataService1();
RsGeneralDataService* dummyDataService2();
@ -73,6 +75,7 @@ private:
std::pair<RsGeneralDataService*, RsGeneralDataService*> mStorePair;
std::map<std::string, std::vector<RsNxsMsg*> > mPeerMsgs;
std::map<std::string, std::vector<RsNxsGrp*> > mPeerGrps;
uint16_t mServType;
};

View File

@ -16,7 +16,7 @@ int main()
{
// first setup
NxsMessageTest msgTest;
NxsMessageTest msgTest(RS_SERVICE_TYPE_PLUGIN_SIMPLE_FORUM);
NxsTestHub hub(&msgTest);
// now get things started