added cleanup function to test hub

refined group retrieval for RsDataService
also added fix for RsNxsTrasac flag masks
added group data send transaction generation, and fix for request generation

git-svn-id: http://svn.code.sf.net/p/retroshare/code/branches/v0.5-new_cache_system@5298 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
chrisparker126 2012-07-14 17:59:54 +00:00
parent 24f6f874f2
commit d3e5ec2836
12 changed files with 489 additions and 147 deletions

View File

@ -545,10 +545,54 @@ int RsDataService::storeGroup(std::map<RsNxsGrp *, RsGxsGrpMetaData *> &grp)
int RsDataService::retrieveNxsGrps(std::map<std::string, RsNxsGrp *> &grp, bool cache){
RetroCursor* c = mDb->sqlQuery(GRP_TABLE_NAME, grpColumns, "", "");
if(grp.empty()){
RetroCursor* c = mDb->sqlQuery(GRP_TABLE_NAME, grpColumns, "", "");
if(c)
{
std::vector<RsNxsGrp*> grps;
retrieveGroups(c, grps);
std::vector<RsNxsGrp*>::iterator vit = grps.begin();
for(; vit != grps.end(); vit++)
{
grp[(*vit)->grpId] = *vit;
}
delete c;
}
}else{
std::map<std::string, RsNxsGrp *>::iterator mit = grp.begin();
for(; mit != grp.end(); mit++)
{
const std::string& grpId = mit->first;
RetroCursor* c = mDb->sqlQuery(GRP_TABLE_NAME, grpColumns, "grpId='" + grpId + "'", "");
if(c)
{
std::vector<RsNxsGrp*> grps;
retrieveGroups(c, grps);
std::vector<RsNxsGrp*>::iterator vit = grps.begin();
for(; vit != grps.end(); vit++)
{
grp[(*vit)->grpId] = *vit;
}
delete c;
}
}
}
}
void RsDataService::retrieveGroups(RetroCursor* c, std::vector<RsNxsGrp*>& grps){
if(c){
bool valid = c->moveToFirst();
while(valid){
@ -557,15 +601,10 @@ int RsDataService::retrieveNxsGrps(std::map<std::string, RsNxsGrp *> &grp, bool
// only add the latest grp info
if(g)
{
grp[g->grpId] = g;
grps.push_back(g);
}
valid = c->moveToNext();
}
delete c;
return 1;
}else{
return 0;
}
}
@ -704,8 +743,8 @@ int RsDataService::resetDataStore()
for(; mit != grps.end(); mit++){
std::string file = mServiceDir + "/" + mit->first;
std::string msgFile = file + "-msgs";
remove(file.c_str());
remove(msgFile.c_str());
remove(file.c_str()); // remove group file
remove(msgFile.c_str()); // and remove messages file
}
mDb->closeDb();

View File

@ -22,7 +22,8 @@ public:
int retrieveNxsMsgs(const GxsMsgReq& reqIds, GxsMsgResult& msg, bool cache);
/*!
* Retrieves all groups stored (most current versions only)
* Retrieves groups, if empty, retrieves all grps, if map is not empty
* only retrieve entries
* @param grp retrieved groups
* @param cache whether to store retrieval in mem for faster later retrieval
* @return error code
@ -114,6 +115,13 @@ private:
*/
void retrieveMessages(RetroCursor* c, std::vector<RsNxsMsg*>& msgs);
/*!
* Retrieves all the msg results from a cursor
* @param c cursor to result set
* @param msgs messages retrieved from cursor are stored here
*/
void retrieveGroups(RetroCursor* c, std::vector<RsNxsGrp*>& msgs);
/*!
* extracts a msg meta item from a cursor at its
* current position

View File

@ -1,11 +1,15 @@
#include "rsgxsnetservice.h"
#define SYNC_PERIOD 1000 // every 10 seconds
#define TRANSAC_TIMEOUT 10 // 10 seconds
#define NXS_NET_DEBUG
#define SYNC_PERIOD 1 // in microseconds every 10 seconds (1 second for testing)
#define TRANSAC_TIMEOUT 5 // 5 seconds
RsGxsNetService::RsGxsNetService(uint16_t servType, RsGeneralDataService *gds,
RsNxsNetMgr *netMgr, RsNxsObserver *nxsObs)
: p3Config(servType), p3ThreadedService(servType), mServType(servType), mDataStore(gds),
: p3Config(servType), p3ThreadedService(servType),
mTransactionTimeOut(TRANSAC_TIMEOUT), mServType(servType), mDataStore(gds),
mObserver(nxsObs), mNxsMutex("RsGxsNetService"), mNetMgr(netMgr), mSYNC_PERIOD(SYNC_PERIOD)
{
@ -30,6 +34,7 @@ int RsGxsNetService::tick(){
if((mSYNC_PERIOD + mSyncTs) < now)
{
syncWithPeers();
mSyncTs = now;
}
return 1;
@ -47,6 +52,7 @@ void RsGxsNetService::syncWithPeers()
for(; sit != peers.end(); sit++)
{
RsNxsSyncGrp *grp = new RsNxsSyncGrp(mServType);
grp->clear();
grp->PeerId(*sit);
sendItem(grp);
}
@ -88,6 +94,11 @@ void RsGxsNetService::recvNxsItemQueue(){
// a live transaction has a non zero value
if(ni->transactionNumber != 0){
#ifdef NXS_NET_DEBUG
std::cerr << "recvNxsItemQueue()" << std::endl;
std::cerr << "handlingTransaction, transN" << ni->transactionNumber << std::endl;
#endif
if(handleTransaction(ni))
continue ;
}
@ -142,6 +153,13 @@ bool RsGxsNetService::handleTransaction(RsNxsItem* item){
transExists = transMap.find(transN) != transMap.end();
if(transExists){
#ifdef NXS_NET_DEBUG
std::cerr << "handleTransaction() " << std::endl;
std::cerr << "Consuming Transaction content, transN: " << item->transactionNumber << std::endl;
std::cerr << "Consuming Transaction content, from Peer: " << item->PeerId() << std::endl;
#endif
tr = transMap[transN];
tr->mItems.push_back(item);
}
@ -174,8 +192,17 @@ bool RsGxsNetService::locked_processTransac(RsNxsTransac* item)
const std::string& peer = item->PeerId();
uint32_t transN = item->transactionNumber;
item->timestamp = time(NULL); // register time received
NxsTransaction* tr = NULL;
#ifdef NXS_NET_DEBUG
std::cerr << "locked_processTransac() " << std::endl;
std::cerr << "locked_processTransac(), Received transaction item: " << transN << std::endl;
std::cerr << "locked_processTransac(), With peer: " << item->PeerId() << std::endl;
std::cerr << "locked_processTransac(), trans type: " << item->transactFlag << std::endl;
#endif
bool peerTrExists = mTransactions.find(peer) != mTransactions.end();
bool transExists = false;
@ -204,7 +231,7 @@ bool RsGxsNetService::locked_processTransac(RsNxsTransac* item)
tr = new NxsTransaction();
transMap[transN] = tr;
tr->mTransaction = item;
tr->mTimestamp = time(NULL);
tr->mTimeOut = item->timestamp;
// note state as receiving, commencement item
// is sent on next run() loop
@ -264,6 +291,10 @@ void RsGxsNetService::run(){
}
}
bool RsGxsNetService::checkTransacTimedOut(NxsTransaction* tr)
{
return tr->mTimeOut < ((uint32_t) time(NULL));
}
void RsGxsNetService::processTransactions(){
@ -292,9 +323,29 @@ void RsGxsNetService::processTransactions(){
std::list<RsNxsItem*>::iterator lit, lit_end;
uint32_t transN = tr->mTransaction->transactionNumber;
// first check transaction has not expired
if(checkTransacTimedOut(tr))
{
#ifdef NXS_NET_DEBUG
std::cerr << "processTransactions() " << std::endl;
std::cerr << "Transaction has failed, tranN: " << transN << std::endl;
std::cerr << "Transaction has failed, Peer: " << mit->first << std::endl;
#endif
tr->mFlag = NxsTransaction::FLAG_STATE_FAILED;
toRemove.push_back(transN);
continue;
}
// send items requested
if(flag & NxsTransaction::FLAG_STATE_SENDING){
#ifdef NXS_NET_DEBUG
std::cerr << "processTransactions() " << std::endl;
std::cerr << "Sending Transaction content, transN: " << transN << std::endl;
std::cerr << "with peer: " << tr->mTransaction->PeerId();
#endif
lit = tr->mItems.begin();
lit_end = tr->mItems.end();
@ -302,7 +353,7 @@ void RsGxsNetService::processTransactions(){
sendItem(*lit);
}
tr->mItems.clear();
tr->mItems.clear(); // clear so they don't get deleted in trans cleaning
tr->mFlag = NxsTransaction::FLAG_STATE_WAITING_CONFIRM;
}else if(flag & NxsTransaction::FLAG_STATE_WAITING_CONFIRM){
@ -313,6 +364,15 @@ void RsGxsNetService::processTransactions(){
// move to completed transactions
toRemove.push_back(transN);
mComplTransactions.push_back(tr);
}else{
std::cerr << "processTransactions() " << std::endl;
std::cerr << "processTransactions(), Unknown flag for active transaction, transN: " << transN
<< std::endl;
std::cerr << "processTransactions(), Unknown flag, Peer: " << mit->first;
toRemove.push_back(transN);
tr->mFlag = NxsTransaction::FLAG_STATE_FAILED;
mComplTransactions.push_back(tr);
}
}
@ -344,6 +404,21 @@ void RsGxsNetService::processTransactions(){
uint16_t flag = tr->mFlag;
uint32_t transN = tr->mTransaction->transactionNumber;
// first check transaction has not expired
if(checkTransacTimedOut(tr))
{
#ifdef NXS_NET_DEBUG
std::cerr << "processTransactions() " << std::endl;
std::cerr << "Transaction has failed, tranN: " << transN << std::endl;
std::cerr << "Transaction has failed, Peer: " << mit->first << std::endl;
#endif
tr->mFlag = NxsTransaction::FLAG_STATE_FAILED;
toRemove.push_back(transN);
continue;
}
if(flag & NxsTransaction::FLAG_STATE_RECEIVING){
// if the number it item received equal that indicated
@ -375,19 +450,22 @@ void RsGxsNetService::processTransactions(){
// send item to tell peer your are ready to start
RsNxsTransac* trans = new RsNxsTransac(mServType);
trans->clear();
trans->transactFlag = RsNxsTransac::FLAG_BEGIN_P2;
trans->transactFlag = RsNxsTransac::FLAG_BEGIN_P2 |
(tr->mTransaction->transactFlag & RsNxsTransac::FLAG_TYPE_MASK);
trans->transactionNumber = transN;
trans->PeerId(tr->mTransaction->PeerId());
sendItem(trans);
}
else{
// unrecognised state
std::cerr << "RsGxsNetService::processTransactions() Unrecognised statem, deleting " << std::endl;
std::cerr << "RsGxsNetService::processTransactions() Id: "
<< transN << std::endl;
toRemove.push_back(transN);
std::cerr << "processTransactions() " << std::endl;
std::cerr << "processTransactions(), Unknown flag for active transaction, transN: " << transN
<< std::endl;
std::cerr << "processTransactions(), Unknown flag, Peer: " << mit->first;
toRemove.push_back(mmit->first);
mComplTransactions.push_back(tr);
tr->mFlag = NxsTransaction::FLAG_STATE_FAILED; // flag as a failed transaction
}
}
@ -414,84 +492,170 @@ void RsGxsNetService::processCompletedTransactions()
NxsTransaction* tr = mComplTransactions.front();
uint16_t flag = tr->mTransaction->transactFlag;
bool outgoing = tr->mTransaction->PeerId() == mOwnId;
// for a completed list response transaction
// one needs generate requests from this
if(flag & RsNxsTransac::FLAG_TYPE_MSG_LIST_RESP)
{
// generate request based on a peers response
genReqMsgTransaction(tr);
}else if(flag & RsNxsTransac::FLAG_TYPE_GRP_LIST_RESP)
{
genReqGrpTransaction(tr);
}
else if( (flag & RsNxsTransac::FLAG_TYPE_MSG_LIST_REQ) ||
(flag & RsNxsTransac::FLAG_TYPE_GRP_LIST_REQ) )
{
// don't do anything, this should simply be removed
}else if(flag & RsNxsTransac::FLAG_TYPE_GRPS)
{
std::list<RsNxsItem*>::iterator lit = tr->mItems.begin();
std::vector<RsNxsGrp*> grps;
while(tr->mItems.size() != 0)
{
RsNxsGrp* grp = dynamic_cast<RsNxsGrp*>(tr->mItems.front());
if(grp)
{
tr->mItems.pop_front();
grps.push_back(grp);
}
else
{
#ifdef NXS_NET_DEBUG
std::cerr << "RsGxsNetService::processCompletedTransactions(): item did not caste to grp"
<< std::endl;
#endif
}
}
// notify listener of grps
mObserver->notifyNewGroups(grps);
}else if(flag & RsNxsTransac::FLAG_TYPE_MSGS)
{
std::list<RsNxsItem*>::iterator lit = tr->mItems.begin();
std::vector<RsNxsMsg*> msgs;
while(tr->mItems.size() > 0)
{
RsNxsMsg* msg = dynamic_cast<RsNxsMsg*>(tr->mItems.front());
if(msg)
{
tr->mItems.pop_front();
msgs.push_back(msg);
}
else
{
#ifdef NXS_NET_DEBUG
std::cerr << "RsGxsNetService::processCompletedTransactions(): item did not caste to msg"
<< std::endl;
#endif
}
}
// notify listener of msgs
mObserver->notifyNewMessages(msgs);
if(outgoing){
processCompletedOutgoingTrans(tr);
}else{
}
delete tr;
mComplTransactions.pop_front();
}
}
void RsGxsNetService::processCompletedIncomingTrans(NxsTransaction* tr)
{
uint16_t flag = tr->mTransaction->transactFlag;
if(tr->mFlag & NxsTransaction::FLAG_STATE_COMPLETED){
// for a completed list response transaction
// one needs generate requests from this
if(flag & RsNxsTransac::FLAG_TYPE_MSG_LIST_RESP)
{
// generate request based on a peers response
genReqMsgTransaction(tr);
}else if(flag & RsNxsTransac::FLAG_TYPE_GRP_LIST_RESP)
{
genReqGrpTransaction(tr);
}
// you've finished receiving request information now gen
else if(flag & RsNxsTransac::FLAG_TYPE_MSG_LIST_REQ)
{
genSendMsgsTransaction(tr);
}
else if(flag & RsNxsTransac::FLAG_TYPE_GRP_LIST_REQ)
{
genSendGrpsTransaction(tr);
}
else if(flag & RsNxsTransac::FLAG_TYPE_GRPS)
{
std::list<RsNxsItem*>::iterator lit = tr->mItems.begin();
std::vector<RsNxsGrp*> grps;
while(tr->mItems.size() != 0)
{
RsNxsGrp* grp = dynamic_cast<RsNxsGrp*>(tr->mItems.front());
if(grp)
{
tr->mItems.pop_front();
grps.push_back(grp);
}
else
{
#ifdef NXS_NET_DEBUG
std::cerr << "RsGxsNetService::processCompletedTransactions(): item did not caste to grp"
<< std::endl;
#endif
}
}
// notify listener of grps
mObserver->notifyNewGroups(grps);
}else if(flag & RsNxsTransac::FLAG_TYPE_MSGS)
{
std::list<RsNxsItem*>::iterator lit = tr->mItems.begin();
std::vector<RsNxsMsg*> msgs;
while(tr->mItems.size() > 0)
{
RsNxsMsg* msg = dynamic_cast<RsNxsMsg*>(tr->mItems.front());
if(msg)
{
tr->mItems.pop_front();
msgs.push_back(msg);
}
else
{
#ifdef NXS_NET_DEBUG
std::cerr << "RsGxsNetService::processCompletedTransactions(): item did not caste to msg"
<< std::endl;
#endif
}
}
// notify listener of msgs
mObserver->notifyNewMessages(msgs);
}
}else if(tr->mFlag == NxsTransaction::FLAG_STATE_FAILED){
// don't do anything transaction will simply be cleaned
}
return;
}
void RsGxsNetService::processCompletedOutgoingTrans(NxsTransaction* tr)
{
uint16_t flag = tr->mTransaction->transactFlag;
if(tr->mFlag & NxsTransaction::FLAG_STATE_COMPLETED){
// for a completed list response transaction
// one needs generate requests from this
if(flag & RsNxsTransac::FLAG_TYPE_MSG_LIST_RESP)
{
#ifdef NXS_NET_DEBUG
std::cerr << "processCompletedOutgoingTrans()" << std::endl;
std::cerr << "complete Sending Msg List Response, transN: " <<
tr->mTransaction->transactionNumber << std::endl;
#endif
}else if(flag & RsNxsTransac::FLAG_TYPE_GRP_LIST_RESP)
{
#ifdef NXS_NET_DEBUG
std::cerr << "processCompletedOutgoingTrans()" << std::endl;
std::cerr << "complete Sending Grp Response, transN: " <<
tr->mTransaction->transactionNumber << std::endl;
#endif
}
// you've finished sending a request so don't do anything
else if( (flag & RsNxsTransac::FLAG_TYPE_MSG_LIST_REQ) ||
(flag & RsNxsTransac::FLAG_TYPE_GRP_LIST_REQ) )
{
#ifdef NXS_NET_DEBUG
std::cerr << "processCompletedOutgoingTrans()" << std::endl;
std::cerr << "complete Sending Msg/Grp Request, transN: " <<
tr->mTransaction->transactionNumber << std::endl;
#endif
}else if(flag & RsNxsTransac::FLAG_TYPE_GRPS)
{
#ifdef NXS_NET_DEBUG
std::cerr << "processCompletedOutgoingTrans()" << std::endl;
std::cerr << "complete Sending Grp Data, transN: " <<
tr->mTransaction->transactionNumber << std::endl;
#endif
}else if(flag & RsNxsTransac::FLAG_TYPE_MSGS)
{
#ifdef NXS_NET_DEBUG
std::cerr << "processCompletedOutgoingTrans()" << std::endl;
std::cerr << "complete Sending Msg Data, transN: " <<
tr->mTransaction->transactionNumber << std::endl;
#endif
}
}else if(tr->mFlag == NxsTransaction::FLAG_STATE_FAILED){
#ifdef NXS_NET_DEBUG
std::cerr << "processCompletedOutgoingTrans()" << std::endl;
std::cerr << "Failed transaction! transN: " <<
tr->mTransaction->transactionNumber << std::endl;
#endif
}else{
#ifdef NXS_NET_DEBUG
std::cerr << "processCompletedOutgoingTrans()" << std::endl;
std::cerr << "Serious error unrecognised trans Flag! transN: " <<
tr->mTransaction->transactionNumber << std::endl;
#endif
}
}
void RsGxsNetService::genReqMsgTransaction(NxsTransaction* tr)
@ -524,35 +688,35 @@ void RsGxsNetService::genReqMsgTransaction(NxsTransaction* tr)
}
// get grp id for this transaction
RsNxsSyncMsgItem* item = msgItemL.front();
const std::string& grpId = item->grpId;
std::vector<std::string> grpIdV;
grpIdV.push_back(grpId);
GxsMsgMetaResult result;
mDataStore->retrieveGxsMsgMetaData(grpIdV, result);
std::vector<RsGxsMsgMetaData*> &msgMetaV = result[grpId];
// get grp id for this transaction
RsNxsSyncMsgItem* item = msgItemL.front();
const std::string& grpId = item->grpId;
std::vector<std::string> grpIdV;
grpIdV.push_back(grpId);
GxsMsgMetaResult result;
mDataStore->retrieveGxsMsgMetaData(grpIdV, result);
std::vector<RsGxsMsgMetaData*> &msgMetaV = result[grpId];
std::vector<RsGxsMsgMetaData*>::const_iterator vit = msgMetaV.begin();
std::set<std::string> msgIdSet;
std::vector<RsGxsMsgMetaData*>::const_iterator vit = msgMetaV.begin();
std::set<std::string> msgIdSet;
// put ids in set for each searching
for(; vit != msgMetaV.end(); vit++)
msgIdSet.insert((*vit)->mMsgId);
// put ids in set for each searching
for(; vit != msgMetaV.end(); vit++)
msgIdSet.insert((*vit)->mMsgId);
// get unique id for this transaction
// get unique id for this transaction
uint32_t transN = getTransactionId();
// now do compare and add loop
std::list<RsNxsSyncMsgItem*>::iterator llit = msgItemL.begin();
std::list<RsNxsItem*> reqList;
// add msgs that you don't have to request list
std::list<RsNxsSyncMsgItem*>::iterator llit = msgItemL.begin();
std::list<RsNxsItem*> reqList;
for(; llit != msgItemL.end(); llit++)
{
const std::string& msgId = (*llit)->msgId;
if(msgIdSet.find(msgId) == msgIdSet.end()){
if(msgIdSet.find(msgId) == msgIdSet.end()){
RsNxsSyncMsgItem* msgItem = new RsNxsSyncMsgItem(mServType);
msgItem->grpId = grpId;
msgItem->msgId = msgId;
@ -566,14 +730,22 @@ void RsGxsNetService::genReqMsgTransaction(NxsTransaction* tr)
RsNxsTransac* transac = new RsNxsTransac(mServType);
transac->transactFlag = RsNxsTransac::FLAG_TYPE_MSG_LIST_REQ
| RsNxsTransac::FLAG_BEGIN_P1;
transac->timeout = mTransactionTimeOut;
transac->timestamp = 0;
transac->nItems = reqList.size();
transac->PeerId(tr->mTransaction->PeerId());
NxsTransaction* newTrans = new NxsTransaction();
newTrans->mItems = reqList;
newTrans->mFlag = NxsTransaction::FLAG_STATE_STARTING;
newTrans->mTimestamp = 0;
newTrans->mTransaction = transac;
newTrans->mFlag = NxsTransaction::FLAG_STATE_WAITING_CONFIRM;
newTrans->mTimeOut = 0;
// create transaction copy with your id to indicate
// its an outgoing transaction
newTrans->mTransaction = new RsNxsTransac(*transac);
newTrans->mTransaction->PeerId(mOwnId);
sendItem(transac);
{
RsStackMutex stack(mNxsMutex);
if(!locked_addTransaction(newTrans))
@ -609,10 +781,8 @@ void RsGxsNetService::genReqGrpTransaction(NxsTransaction* tr)
}
}
RsNxsSyncGrpItem* item = grpItemL.front();
const std::string& grpId = item->grpId;
std::map<std::string, RsGxsGrpMetaData*> grpMetaMap;
mDataStore->retrieveGxsGrpMetaData(grpMetaMap);
std::map<std::string, RsGxsGrpMetaData*> grpMetaMap;
mDataStore->retrieveGxsGrpMetaData(grpMetaMap);
// now do compare and add loop
std::list<RsNxsSyncGrpItem*>::iterator llit = grpItemL.begin();
@ -624,7 +794,7 @@ void RsGxsNetService::genReqGrpTransaction(NxsTransaction* tr)
{
const std::string& grpId = (*llit)->grpId;
if(grpMetaMap.find(grpId) == grpMetaMap.end()){
if(grpMetaMap.find(grpId) == grpMetaMap.end()){
RsNxsSyncGrpItem* grpItem = new RsNxsSyncGrpItem(mServType);
grpItem->grpId = grpId;
@ -636,16 +806,20 @@ void RsGxsNetService::genReqGrpTransaction(NxsTransaction* tr)
RsNxsTransac* transac = new RsNxsTransac(mServType);
transac->transactFlag = RsNxsTransac::FLAG_TYPE_MSG_LIST_REQ
transac->transactFlag = RsNxsTransac::FLAG_TYPE_GRP_LIST_REQ
| RsNxsTransac::FLAG_BEGIN_P1;
transac->timeout = mTransactionTimeOut;
transac->timestamp = 0;
transac->nItems = reqList.size();
transac->PeerId(tr->mTransaction->PeerId());
NxsTransaction* newTrans = new NxsTransaction();
newTrans->mItems = reqList;
newTrans->mFlag = NxsTransaction::FLAG_STATE_STARTING;
newTrans->mTimestamp = 0;
newTrans->mTransaction = transac;
newTrans->mFlag = NxsTransaction::FLAG_STATE_WAITING_CONFIRM;
newTrans->mTimeOut = 0;
newTrans->mTransaction = new RsNxsTransac(*transac);
newTrans->mTransaction->PeerId(mOwnId);
sendItem(transac);
{
RsStackMutex stack(mNxsMutex);
@ -654,6 +828,53 @@ void RsGxsNetService::genReqGrpTransaction(NxsTransaction* tr)
}
}
void RsGxsNetService::genSendGrpsTransaction(NxsTransaction* tr)
{
// go groups requested in transaction tr
std::list<RsNxsItem*>::iterator lit = tr->mItems.begin();
std::map<std::string, RsNxsGrp*> grps;
for(;lit != tr->mItems.end(); lit++)
{
RsNxsSyncGrpItem* item = dynamic_cast<RsNxsSyncGrpItem*>(*lit);
grps[item->grpId] = NULL;
}
mDataStore->retrieveNxsGrps(grps, false);
NxsTransaction* newTr = new NxsTransaction();
newTr->mFlag = NxsTransaction::FLAG_STATE_WAITING_CONFIRM;
// store grp items to send in transaction
std::map<std::string, RsNxsGrp*>::iterator mit = grps.begin();
for(;mit != grps.end(); mit++)
{
newTr->mItems.push_back(mit->second);
}
RsNxsTransac* ntr = new RsNxsTransac(mServType);
ntr->transactionNumber = getTransactionId();
ntr->transactFlag = RsNxsTransac::FLAG_BEGIN_P1 |
RsNxsTransac::FLAG_TYPE_GRPS;
ntr->nItems = grps.size();
newTr->mTransaction = new RsNxsTransac(*ntr);
newTr->mTransaction->PeerId(mOwnId);
ntr->PeerId(tr->mTransaction->PeerId());
sendItem(ntr);
return;
}
void RsGxsNetService::genSendMsgsTransaction(NxsTransaction* tr)
{
return;
}
uint32_t RsGxsNetService::getTransactionId()
{
RsStackMutex stack(mNxsMutex);
@ -670,6 +891,11 @@ bool RsGxsNetService::locked_addTransaction(NxsTransaction* tr)
if(transNumExist){
#ifdef NXS_NET_DEBUG
std::cerr << "locked_addTransaction() " << std::endl;
std::cerr << "Transaction number exist already, transN: " << transN
<< std::endl;
#endif
return false;
}else{
transMap[transN] = tr;
@ -693,7 +919,6 @@ void RsGxsNetService::handleRecvSyncGroup(RsNxsSyncGrp* item)
{
std::string peer = item->PeerId();
delete item;
std::map<std::string, RsGxsGrpMetaData*> grp;
mDataStore->retrieveGxsGrpMetaData(grp);
@ -724,14 +949,14 @@ void RsGxsNetService::handleRecvSyncGroup(RsNxsSyncGrp* item)
trItem->transactFlag = RsNxsTransac::FLAG_BEGIN_P1
| RsNxsTransac::FLAG_TYPE_GRP_LIST_RESP;
trItem->nItems = itemL.size();
time_t now;
gmtime(&now);
trItem->timeout = now + TRANSAC_TIMEOUT;
trItem->timestamp = time(NULL);
trItem->PeerId(peer);
trItem->transactionNumber = getTransactionId();
// also make a copy for the resident transaction
tr->mTransaction = new RsNxsTransac(*trItem);
tr->mTransaction->PeerId(mOwnId);
// signal peer to prepare for transaction
sendItem(trItem);
@ -771,7 +996,7 @@ const uint8_t NxsTransaction::FLAG_STATE_WAITING_CONFIRM = 0x0020;
NxsTransaction::NxsTransaction()
: mFlag(0), mTimestamp(0), mTransaction(NULL) {
: mFlag(0), mTimeOut(0), mTransaction(NULL) {
}

View File

@ -31,12 +31,11 @@ public:
static const uint8_t FLAG_STATE_FAILED;
static const uint8_t FLAG_STATE_WAITING_CONFIRM;
NxsTransaction();
~NxsTransaction();
uint32_t mFlag; // current state of transaction
uint32_t mTimestamp;
uint32_t mTimeOut;
/*!
* this contains who we
@ -50,11 +49,15 @@ public:
std::list<RsNxsItem*> mItems; // items received or sent
};
/*!
* An abstraction of the net manager
* for retrieving Rs peers whom you will be synchronising
* and also you own Id
* Useful for testing also (abstracts away Rs's p3NetMgr)
*/
class RsNxsNetMgr
{
public:
virtual std::string getOwnId() = 0;
@ -205,6 +208,18 @@ private:
*/
void processCompletedTransactions();
/*!
* Process transaction owned/started by user
* @param tr transaction to process, ownership stays with callee
*/
void processCompletedOutgoingTrans(NxsTransaction* tr);
/*!
* Process transactions started/owned by other peers
* @param tr transaction to process, ownership stays with callee
*/
void processCompletedIncomingTrans(NxsTransaction* tr);
/*!
* Process a transaction item, assumes a general lock
@ -246,22 +261,47 @@ private:
//void notifyListenerMsgs(std::list<RsNxsMsg*>& msgs);
/*!
* Generates new transaction to send msg requests based on list
* of msgs received from peer stored in passed transaction
* @param tr transaction responsible for generating msg request
*/
void genReqMsgTransaction(NxsTransaction* tr);
/*!
* Generates new transaction to send grp requests based on list
* of grps received from peer stored in passed transaction
* @param tr transaction responsible for generating grp request
*/
void genReqGrpTransaction(NxsTransaction* tr);
/*!
* Generates new transaction to send msg data based on list
* of grpids received from peer stored in passed transaction
* @param tr transaction responsible for generating grp request
*/
void genSendMsgsTransaction(NxsTransaction* tr);
/*!
* Generates new transaction to send grp data based on list
* of grps received from peer stored in passed transaction
* @param tr transaction responsible for generating grp request
*/
void genSendGrpsTransaction(NxsTransaction* tr);
/*!
* convenience function to add a transaction to list
* @param tr transaction to add
*/
bool locked_addTransaction(NxsTransaction* tr);
void cleanTransactionItems(NxsTransaction* tr) const;
/*!
* @param tr the transaction to check for timeout
* @return false if transaction has timed out, true otherwise
*/
bool checkTransacTimedOut(NxsTransaction* tr);
/** E: Transaction processing **/
/** S: item handlers **/
@ -316,8 +356,12 @@ private:
RsNxsObserver* mObserver;
RsGeneralDataService* mDataStore;
uint16_t mServType;
// how much time must elapse before a timeout failure
// for an active transaction
uint32_t mTransactionTimeOut;
std::string mOwnId;
RsNxsNetMgr* mNetMgr;

View File

@ -397,7 +397,7 @@ bool RsNxsSerialiser::serialiseNxsTrans(RsNxsTransac *item, void *data, uint32_t
ok &= setRawUInt32(data, *size, &offset, item->transactionNumber);
ok &= setRawUInt16(data, *size, &offset, item->transactFlag);
ok &= setRawUInt32(data, *size, &offset, item->nItems);
ok &= setRawUInt32(data, *size, &offset, item->timeout);
ok &= setRawUInt32(data, *size, &offset, item->timestamp);
@ -836,7 +836,7 @@ RsNxsTransac* RsNxsSerialiser::deserialNxsTrans(void *data, uint32_t *size){
ok &= getRawUInt32(data, *size, &offset, &(item->transactionNumber));
ok &= getRawUInt16(data, *size, &offset, &(item->transactFlag));
ok &= getRawUInt32(data, *size, &offset, &(item->nItems));
ok &= getRawUInt32(data, *size, &offset, &(item->timeout));
ok &= getRawUInt32(data, *size, &offset, &(item->timestamp));
if (offset != rssize)
{
@ -1155,7 +1155,7 @@ void RsNxsSyncMsgItem::clear()
void RsNxsTransac::clear(){
transactFlag = 0;
nItems = 0;
timeout = 0;
timestamp = 0;
transactionNumber = 0;
}
@ -1289,7 +1289,7 @@ std::ostream& RsNxsTransac::print(std::ostream &out, uint16_t indent){
printIndent(out , int_Indent);
out << "nItems: " << nItems << std::endl;
printIndent(out , int_Indent);
out << "timeout: " << timeout << std::endl;
out << "timeout: " << timestamp << std::endl;
printIndent(out , int_Indent);
out << "transactionNumber: " << transactionNumber << std::endl;
printIndent(out , int_Indent);

View File

@ -111,8 +111,8 @@ class RsNxsTransac : public RsNxsItem {
public:
static const uint16_t FLAG_TRANS_MASK = 0xf;
static const uint16_t FLAG_TYPE_MASK = 0xff;
static const uint16_t FLAG_STATE_MASK = 0xff;
static const uint16_t FLAG_TYPE_MASK = 0xff00;
/** transaction state **/
static const uint16_t FLAG_BEGIN_P1;
@ -140,7 +140,7 @@ public:
uint16_t transactFlag;
uint32_t nItems;
uint32_t timeout;
uint32_t timestamp;
};
/*!

View File

@ -66,9 +66,12 @@ void NxsTestHub::run()
netServicePairs.second->join();
}
void NxsTestHub::cleanUp()
{
mTestScenario->cleanUp();
}
bool NxsTestHub::testsPassed()
{
return false;
}

View File

@ -94,6 +94,7 @@ public:
void run();
void cleanUp();
private:
std::pair<p3Service*, p3Service*> mServicePairs;

View File

@ -88,8 +88,8 @@ void NxsMessageTest::populateStore(RsGeneralDataService* dStore)
init_item(*msg);
init_item(msgMeta);
std::pair<RsNxsMsg*, RsGxsMsgMetaData*> p(msg, msgMeta);
int chosen = 0;
// pick a grp at random to associate the msg to
const std::string& grpId = grpIdList[rand()%nGrp];
msgMeta->mMsgId = msg->msgId;
msgMeta->mGroupId = msg->grpId = grpId;
@ -145,3 +145,11 @@ RsGeneralDataService* NxsMessageTest::dummyDataService2()
{
return mStorePair.second;
}
void NxsMessageTest::cleanUp()
{
mStorePair.first->resetDataStore();
mStorePair.second->resetDataStore();
return;
}

View File

@ -24,6 +24,12 @@ public:
virtual RsGeneralDataService* dummyDataService1() = 0;
virtual RsGeneralDataService* dummyDataService2() = 0;
/*!
* Call to remove files created
* in the test directory
*/
virtual void cleanUp() = 0;
};
@ -38,6 +44,12 @@ public:
RsGeneralDataService* dummyDataService1();
RsGeneralDataService* dummyDataService2();
/*!
* Call to remove files created
* in the test directory
*/
void cleanUp();
public:
/*!

View File

@ -23,11 +23,13 @@ int main()
createThread(hub);
// put this thread to sleep for 10 secs
usleep(10000);
sleep(10);
hub.join();
CHECK(hub.testsPassed());
hub.cleanUp();
FINALREPORT("RsGxsNetService Tests");
return TESTRESULT();

View File

@ -86,7 +86,7 @@ RsSerialType* init_item(RsNxsTransac& rstx){
rstx.clear();
rstx.timeout = rand()%14141;
rstx.timestamp = rand()%14141;
rstx.transactFlag = rand()%2424;
rstx.nItems = rand()%33132;
rstx.transactionNumber = rand()%242112;
@ -162,7 +162,7 @@ bool operator==(const RsNxsTransac& l, const RsNxsTransac& r){
if(l.transactFlag != r.transactFlag) return false;
if(l.transactionNumber != r.transactionNumber) return false;
if(l.timeout != r.timeout) return false;
if(l.timestamp != r.timestamp) return false;
if(l.nItems != r.nItems) return false;