nxs msg transaction now working and test commited

some clean up still needed (msgs to sync should be determined by grp flag in db)

 

git-svn-id: http://svn.code.sf.net/p/retroshare/code/branches/v0.5-gxs-b1@5497 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
chrisparker126 2012-09-01 14:47:22 +00:00
parent bd1435c72b
commit 46c945de96
5 changed files with 215 additions and 37 deletions

View File

@ -28,7 +28,7 @@
#define NXS_NET_DEBUG #define NXS_NET_DEBUG
#define SYNC_PERIOD 10 // in microseconds every 10 seconds (1 second for testing) #define SYNC_PERIOD 12 // in microseconds every 10 seconds (1 second for testing)
#define TRANSAC_TIMEOUT 5 // 5 seconds #define TRANSAC_TIMEOUT 5 // 5 seconds
@ -85,14 +85,23 @@ void RsGxsNetService::syncWithPeers()
sendItem(grp); sendItem(grp);
} }
// // TODO msgs sit = peers.begin();
// for(; sit != peers.end(); sit++) // TODO msgs
// { for(; sit != peers.end(); sit++)
// RsNxsSyncMsg* msg = new RsNxsSyncMsg(mServType); {
// msg->clear(); RsStackMutex stack(mNxsMutex);
// msg->PeerId(*sit);
// sendItem(msg); std::set<std::string>::iterator sit_grp = mGroupSubscribedTo.begin();
// }
for(; sit_grp != mGroupSubscribedTo.end(); sit_grp++)
{
RsNxsSyncMsg* msg = new RsNxsSyncMsg(mServType);
msg->clear();
msg->PeerId(*sit);
msg->grpId = *sit_grp;
sendItem(msg);
}
}
} }
@ -334,8 +343,8 @@ void RsGxsNetService::run(){
bool RsGxsNetService::locked_checkTransacTimedOut(NxsTransaction* tr) bool RsGxsNetService::locked_checkTransacTimedOut(NxsTransaction* tr)
{ {
//return tr->mTimeOut < ((uint32_t) time(NULL)); return tr->mTimeOut < ((uint32_t) time(NULL));
return false; // return false;
} }
void RsGxsNetService::processTransactions(){ void RsGxsNetService::processTransactions(){
@ -591,6 +600,10 @@ void RsGxsNetService::locked_processCompletedIncomingTrans(NxsTransaction* tr)
{ {
tr->mItems.pop_front(); tr->mItems.pop_front();
grps.push_back(grp); grps.push_back(grp);
//TODO: remove subscription should be handled
// outside netservice
mGroupSubscribedTo.insert(grp->grpId);
} }
else else
{ {
@ -605,6 +618,8 @@ void RsGxsNetService::locked_processCompletedIncomingTrans(NxsTransaction* tr)
// notify listener of grps // notify listener of grps
mObserver->notifyNewGroups(grps); mObserver->notifyNewGroups(grps);
}else if(flag & RsNxsTransac::FLAG_TYPE_MSGS) }else if(flag & RsNxsTransac::FLAG_TYPE_MSGS)
{ {
@ -627,6 +642,8 @@ void RsGxsNetService::locked_processCompletedIncomingTrans(NxsTransaction* tr)
} }
} }
int nitems = msgs.size();
// notify listener of msgs // notify listener of msgs
mObserver->notifyNewMessages(msgs); mObserver->notifyNewMessages(msgs);
@ -735,10 +752,10 @@ void RsGxsNetService::locked_genReqMsgTransaction(NxsTransaction* tr)
// get grp id for this transaction // get grp id for this transaction
RsNxsSyncMsgItem* item = msgItemL.front(); RsNxsSyncMsgItem* item = msgItemL.front();
const std::string& grpId = item->grpId; const std::string& grpId = item->grpId;
GxsMsgReq reqIds; GxsMsgReq reqIds;
reqIds[grpId] = std::vector<RsGxsMessageId>(); reqIds[grpId] = std::vector<RsGxsMessageId>();
GxsMsgMetaResult result; GxsMsgMetaResult result;
mDataStore->retrieveGxsMsgMetaData(reqIds, result); mDataStore->retrieveGxsMsgMetaData(reqIds, result);
std::vector<RsGxsMsgMetaData*> &msgMetaV = result[grpId]; std::vector<RsGxsMsgMetaData*> &msgMetaV = result[grpId];
std::vector<RsGxsMsgMetaData*>::const_iterator vit = msgMetaV.begin(); std::vector<RsGxsMsgMetaData*>::const_iterator vit = msgMetaV.begin();
@ -756,6 +773,8 @@ void RsGxsNetService::locked_genReqMsgTransaction(NxsTransaction* tr)
std::list<RsNxsSyncMsgItem*>::iterator llit = msgItemL.begin(); std::list<RsNxsSyncMsgItem*>::iterator llit = msgItemL.begin();
std::list<RsNxsItem*> reqList; std::list<RsNxsItem*> reqList;
const std::string peerFrom = tr->mTransaction->PeerId();
for(; llit != msgItemL.end(); llit++) for(; llit != msgItemL.end(); llit++)
{ {
const std::string& msgId = (*llit)->msgId; const std::string& msgId = (*llit)->msgId;
@ -766,11 +785,12 @@ void RsGxsNetService::locked_genReqMsgTransaction(NxsTransaction* tr)
msgItem->msgId = msgId; msgItem->msgId = msgId;
msgItem->flag = RsNxsSyncMsgItem::FLAG_REQUEST; msgItem->flag = RsNxsSyncMsgItem::FLAG_REQUEST;
msgItem->transactionNumber = transN; msgItem->transactionNumber = transN;
msgItem->PeerId(peerFrom);
reqList.push_back(msgItem); reqList.push_back(msgItem);
} }
} }
if(reqList.empty()) if(!reqList.empty())
{ {
RsNxsTransac* transac = new RsNxsTransac(mServType); RsNxsTransac* transac = new RsNxsTransac(mServType);
@ -852,26 +872,36 @@ void RsGxsNetService::locked_genReqGrpTransaction(NxsTransaction* tr)
} }
RsNxsTransac* transac = new RsNxsTransac(mServType); if(!reqList.empty())
transac->transactFlag = RsNxsTransac::FLAG_TYPE_GRP_LIST_REQ {
| RsNxsTransac::FLAG_BEGIN_P1;
transac->timestamp = 0;
transac->nItems = reqList.size();
transac->PeerId(tr->mTransaction->PeerId());
transac->transactionNumber = transN;
NxsTransaction* newTrans = new NxsTransaction(); RsNxsTransac* transac = new RsNxsTransac(mServType);
newTrans->mItems = reqList; transac->transactFlag = RsNxsTransac::FLAG_TYPE_GRP_LIST_REQ
newTrans->mFlag = NxsTransaction::FLAG_STATE_WAITING_CONFIRM; | RsNxsTransac::FLAG_BEGIN_P1;
newTrans->mTimeOut = time(NULL) + mTransactionTimeOut; transac->timestamp = 0;
newTrans->mTransaction = new RsNxsTransac(*transac); transac->nItems = reqList.size();
newTrans->mTransaction->PeerId(mOwnId); transac->PeerId(tr->mTransaction->PeerId());
transac->transactionNumber = transN;
sendItem(transac); NxsTransaction* newTrans = new NxsTransaction();
newTrans->mItems = reqList;
newTrans->mFlag = NxsTransaction::FLAG_STATE_WAITING_CONFIRM;
newTrans->mTimeOut = time(NULL) + mTransactionTimeOut;
newTrans->mTransaction = new RsNxsTransac(*transac);
newTrans->mTransaction->PeerId(mOwnId);
if(!locked_addTransaction(newTrans)) sendItem(transac);
delete newTrans;
if(!locked_addTransaction(newTrans))
delete newTrans;
}
// clean up meta data
std::map<std::string, RsGxsGrpMetaData*>::iterator mit = grpMetaMap.begin();
for(; mit != grpMetaMap.end(); mit++)
delete mit->second;
} }
void RsGxsNetService::locked_genSendGrpsTransaction(NxsTransaction* tr) void RsGxsNetService::locked_genSendGrpsTransaction(NxsTransaction* tr)
@ -895,8 +925,13 @@ void RsGxsNetService::locked_genSendGrpsTransaction(NxsTransaction* tr)
grps[item->grpId] = NULL; grps[item->grpId] = NULL;
} }
mDataStore->retrieveNxsGrps(grps, false, false); if(!grps.empty())
{
mDataStore->retrieveNxsGrps(grps, false, false);
}
else{
return;
}
NxsTransaction* newTr = new NxsTransaction(); NxsTransaction* newTr = new NxsTransaction();
newTr->mFlag = NxsTransaction::FLAG_STATE_WAITING_CONFIRM; newTr->mFlag = NxsTransaction::FLAG_STATE_WAITING_CONFIRM;
@ -908,7 +943,7 @@ void RsGxsNetService::locked_genSendGrpsTransaction(NxsTransaction* tr)
std::string peerId = tr->mTransaction->PeerId(); std::string peerId = tr->mTransaction->PeerId();
for(;mit != grps.end(); mit++) for(;mit != grps.end(); mit++)
{ {
mit->second->PeerId(peerId); // set so it gets sent to right peer mit->second->PeerId(peerId); // set so it gets sent to right peer
mit->second->transactionNumber = transN; mit->second->transactionNumber = transN;
newTr->mItems.push_back(mit->second); newTr->mItems.push_back(mit->second);
} }
@ -941,6 +976,77 @@ void RsGxsNetService::locked_genSendGrpsTransaction(NxsTransaction* tr)
void RsGxsNetService::locked_genSendMsgsTransaction(NxsTransaction* tr) void RsGxsNetService::locked_genSendMsgsTransaction(NxsTransaction* tr)
{ {
#ifdef NXS_NET_DEBUG
std::cerr << "locked_genSendMsgsTransaction()" << std::endl;
std::cerr << "Generating Msg data send fron TransN: " << tr->mTransaction->transactionNumber
<< std::endl;
#endif
// go groups requested in transaction tr
std::list<RsNxsItem*>::iterator lit = tr->mItems.begin();
GxsMsgReq msgIds;
GxsMsgResult msgs;
if(tr->mItems.empty()){
return;
}
for(;lit != tr->mItems.end(); lit++)
{
RsNxsSyncMsgItem* item = dynamic_cast<RsNxsSyncMsgItem*>(*lit);
msgIds[item->grpId].push_back(item->msgId);
}
mDataStore->retrieveNxsMsgs(msgIds, msgs, false, false);
NxsTransaction* newTr = new NxsTransaction();
newTr->mFlag = NxsTransaction::FLAG_STATE_WAITING_CONFIRM;
uint32_t transN = locked_getTransactionId();
// store grp items to send in transaction
GxsMsgResult::iterator mit = msgs.begin();
std::string peerId = tr->mTransaction->PeerId();
uint32_t msgSize = 0;
for(;mit != msgs.end(); mit++)
{
std::vector<RsNxsMsg*>& msgV = mit->second;
std::vector<RsNxsMsg*>::iterator vit = msgV.begin();
for(; vit != msgV.end(); vit++)
{
RsNxsMsg* msg = *vit;
msg->PeerId(peerId);
msg->transactionNumber = transN;
newTr->mItems.push_back(msg);
msgSize++;
}
}
if(newTr->mItems.empty()){
delete newTr;
return;
}
RsNxsTransac* ntr = new RsNxsTransac(mServType);
ntr->transactionNumber = transN;
ntr->transactFlag = RsNxsTransac::FLAG_BEGIN_P1 |
RsNxsTransac::FLAG_TYPE_MSGS;
ntr->nItems = msgSize;
ntr->PeerId(peerId);
newTr->mTransaction = new RsNxsTransac(*ntr);
newTr->mTransaction->PeerId(mOwnId);
newTr->mTimeOut = time(NULL) + mTransactionTimeOut;
ntr->PeerId(tr->mTransaction->PeerId());
sendItem(ntr);
locked_addTransaction(newTr);
return; return;
} }
uint32_t RsGxsNetService::locked_getTransactionId() uint32_t RsGxsNetService::locked_getTransactionId()
@ -1040,6 +1146,61 @@ void RsGxsNetService::handleRecvSyncGroup(RsNxsSyncGrp* item)
void RsGxsNetService::handleRecvSyncMessage(RsNxsSyncMsg* item) void RsGxsNetService::handleRecvSyncMessage(RsNxsSyncMsg* item)
{ {
RsStackMutex stack(mNxsMutex); RsStackMutex stack(mNxsMutex);
const std::string& peer = item->PeerId();
GxsMsgMetaResult metaResult;
GxsMsgReq req;
req[item->grpId] = std::vector<std::string>();
mDataStore->retrieveGxsMsgMetaData(req, metaResult);
std::vector<RsGxsMsgMetaData*>& msgMeta = metaResult[item->grpId];
if(req.empty()){
return;
}
std::vector<RsGxsMsgMetaData*>::iterator vit = msgMeta.begin();
NxsTransaction* tr = new NxsTransaction();
std::list<RsNxsItem*>& itemL = tr->mItems;
uint32_t transN = locked_getTransactionId();
for(; vit != msgMeta.end(); vit++)
{
RsGxsMsgMetaData* m = *vit;
RsNxsSyncMsgItem* mItem = new
RsNxsSyncMsgItem(mServType);
mItem->flag = RsNxsSyncGrpItem::FLAG_RESPONSE;
mItem->grpId = m->mGroupId;
mItem->msgId = m->mMsgId;
mItem->PeerId(peer);
mItem->transactionNumber = transN;
itemL.push_back(mItem);
}
tr->mFlag = NxsTransaction::FLAG_STATE_WAITING_CONFIRM;
RsNxsTransac* trItem = new RsNxsTransac(mServType);
trItem->transactFlag = RsNxsTransac::FLAG_BEGIN_P1
| RsNxsTransac::FLAG_TYPE_MSG_LIST_RESP;
trItem->nItems = itemL.size();
trItem->timestamp = 0;
trItem->PeerId(peer);
trItem->transactionNumber = transN;
// also make a copy for the resident transaction
tr->mTransaction = new RsNxsTransac(*trItem);
tr->mTransaction->PeerId(mOwnId);
tr->mTimeOut = time(NULL) + mTransactionTimeOut;
// signal peer to prepare for transaction
sendItem(trItem);
locked_addTransaction(tr);
return; return;
} }

View File

@ -395,6 +395,9 @@ private:
RsMutex mNxsMutex; RsMutex mNxsMutex;
uint32_t mSyncTs; uint32_t mSyncTs;
// TODO: remove, temp, for testing.
// subscription handled outside netservice
std::set<std::string> mGroupSubscribedTo;
const uint32_t mSYNC_PERIOD; const uint32_t mSYNC_PERIOD;
}; };

View File

@ -8,6 +8,7 @@
#include "nxstestscenario.h" #include "nxstestscenario.h"
#include "gxs/rsdataservice.h" #include "gxs/rsdataservice.h"
#include "data_support.h" #include "data_support.h"
#include <stdio.h>
NxsMessageTest::NxsMessageTest(uint16_t servtype) NxsMessageTest::NxsMessageTest(uint16_t servtype)
: mServType(servtype), mMsgTestMtx("mMsgTestMtx") : mServType(servtype), mMsgTestMtx("mMsgTestMtx")
@ -42,7 +43,7 @@ RsGeneralDataService* NxsMessageTest::getDataService(const std::string& peer)
{ {
if(mPeerStoreMap.find(peer) != mPeerStoreMap.end()) return NULL; if(mPeerStoreMap.find(peer) != mPeerStoreMap.end()) return NULL;
RsDataService* dStore = new RsDataService("./", peer.c_str(), mServType); RsDataService* dStore = new RsDataService("./", peer, mServType);
mStoreNames.insert(peer); mStoreNames.insert(peer);
mPeerStoreMap.insert(std::make_pair(peer, dStore)); mPeerStoreMap.insert(std::make_pair(peer, dStore));
populateStore(dStore); populateStore(dStore);
@ -134,7 +135,6 @@ void NxsMessageTest::populateStore(RsGeneralDataService* dStore)
void NxsMessageTest::cleanUp() void NxsMessageTest::cleanUp()
{ {
std::map<std::string, RsGeneralDataService*>::iterator mit = mPeerStoreMap.begin(); std::map<std::string, RsGeneralDataService*>::iterator mit = mPeerStoreMap.begin();
for(; mit != mPeerStoreMap.end(); mit++) for(; mit != mPeerStoreMap.end(); mit++)
@ -177,5 +177,18 @@ void NxsMessageTestObserver::notifyNewGroups(std::vector<RsNxsGrp *> &groups)
void NxsMessageTestObserver::notifyNewMessages(std::vector<RsNxsMsg *> &messages) void NxsMessageTestObserver::notifyNewMessages(std::vector<RsNxsMsg *> &messages)
{ {
std::vector<RsNxsMsg*>::iterator vit = messages.begin();
std::map<RsNxsMsg*, RsGxsMsgMetaData*> msgs;
for(; vit != messages.end(); vit++)
{
RsNxsMsg* msg = *vit;
RsGxsMsgMetaData* meta = new RsGxsMsgMetaData();
meta->mGroupId = msg->grpId;
meta->mMsgId = msg->msgId;
msgs.insert(std::make_pair(msg, meta));
}
mStore->storeMessage(msgs);
} }

View File

@ -25,7 +25,7 @@ int main()
// now get things started // now get things started
createThread(hub); createThread(hub);
double timeDelta = 30; double timeDelta = 50;
// put this thread to sleep for 10 secs // put this thread to sleep for 10 secs
// make thread sleep for a bit // make thread sleep for a bit

View File

@ -112,6 +112,7 @@ ContentValue::ContentValue(ContentValue &from){
default: default:
std::cerr << "ContentValue::ContentValue(ContentValue &from):" std::cerr << "ContentValue::ContentValue(ContentValue &from):"
<< "Error! Unrecognised data type!" << std::endl; << "Error! Unrecognised data type!" << std::endl;
break;
} }
} }
} }