mirror of
https://github.com/RetroShare/RetroShare.git
synced 2025-05-22 07:41:20 -04:00
remaining code for nxs sync optimisation
- still needs testing git-svn-id: http://svn.code.sf.net/p/retroshare/code/branches/v0.5-gxs_finale@6903 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
parent
2e886bf443
commit
f74aacd759
7 changed files with 678 additions and 59 deletions
|
@ -90,10 +90,21 @@ void RsGxsNetService::syncWithPeers()
|
|||
// for now just grps
|
||||
for(; sit != peers.end(); sit++)
|
||||
{
|
||||
RsNxsSyncGrp *grp = new RsNxsSyncGrp(mServType);
|
||||
grp->clear();
|
||||
grp->PeerId(*sit);
|
||||
sendItem(grp);
|
||||
|
||||
const std::string peerId = *sit;
|
||||
|
||||
ClientGrpMap::const_iterator cit = mClientGrpUpdateMap.find(peerId);
|
||||
uint32_t updateTS = 0;
|
||||
if(cit != mClientGrpUpdateMap.end())
|
||||
{
|
||||
const RsGxsGrpUpdateItem *gui = cit->second;
|
||||
updateTS = gui->grpUpdateTS;
|
||||
}
|
||||
RsNxsSyncGrp *grp = new RsNxsSyncGrp(mServType);
|
||||
grp->clear();
|
||||
grp->PeerId(*sit);
|
||||
grp->updateTS = updateTS;
|
||||
sendItem(grp);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -111,7 +122,9 @@ void RsGxsNetService::syncWithPeers()
|
|||
RsGxsGrpMetaData* meta = mit->second;
|
||||
|
||||
if(meta->mSubscribeFlags & GXS_SERV::GROUP_SUBSCRIBE_SUBSCRIBED )
|
||||
{
|
||||
grpIds.push_back(mit->first);
|
||||
}
|
||||
|
||||
delete meta;
|
||||
}
|
||||
|
@ -125,13 +138,36 @@ void RsGxsNetService::syncWithPeers()
|
|||
|
||||
std::vector<RsGxsGroupId>::iterator vit = grpIds.begin();
|
||||
|
||||
// now see if you have an updateTS so optimise whether you need
|
||||
// to get a new list of peer data
|
||||
RsGxsMsgUpdateItem* mui = NULL;
|
||||
|
||||
ClientMsgMap::const_iterator cit = mClientMsgUpdateMap.find(*sit);
|
||||
|
||||
if(cit != mClientMsgUpdateMap.end())
|
||||
{
|
||||
mui = cit->second;
|
||||
}
|
||||
|
||||
for(; vit != grpIds.end(); vit++)
|
||||
{
|
||||
RsNxsSyncMsg* msg = new RsNxsSyncMsg(mServType);
|
||||
msg->clear();
|
||||
msg->PeerId(*sit);
|
||||
msg->grpId = *vit;
|
||||
sendItem(msg);
|
||||
uint32_t updateTS = 0;
|
||||
if(mui)
|
||||
{
|
||||
std::map<std::string, uint32_t>::const_iterator cit2 = mui->msgUpdateTS.find(*vit);
|
||||
|
||||
if(cit2 != mui->msgUpdateTS.end())
|
||||
{
|
||||
updateTS = cit2->second;
|
||||
}
|
||||
}
|
||||
|
||||
RsNxsSyncMsg* msg = new RsNxsSyncMsg(mServType);
|
||||
msg->clear();
|
||||
msg->PeerId(*sit);
|
||||
msg->grpId = *vit;
|
||||
msg->updateTS = updateTS;
|
||||
sendItem(msg);
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
@ -526,20 +562,101 @@ void RsGxsNetService::collateMsgFragments(MsgFragments fragments, std::map<RsGxs
|
|||
fragments.clear();
|
||||
}
|
||||
|
||||
|
||||
bool RsGxsNetService::loadList(std::list<RsItem*>& load)
|
||||
class StoreHere
|
||||
{
|
||||
return false;
|
||||
public:
|
||||
|
||||
StoreHere(RsGxsNetService::ClientGrpMap& cgm, RsGxsNetService::ClientMsgMap cmm,
|
||||
RsGxsNetService::ServerMsgMap& smm,
|
||||
RsGxsServerGrpUpdateItem*& sgm) : mClientGrpMap(cgm), mClientMsgMap(cmm),
|
||||
mServerMsgMap(smm), mServerGrpUpdateItem(sgm)
|
||||
{}
|
||||
|
||||
void operator() (RsItem* item)
|
||||
{
|
||||
RsGxsMsgUpdateItem* mui;
|
||||
RsGxsGrpUpdateItem* gui;
|
||||
RsGxsServerGrpUpdateItem* gsui;
|
||||
RsGxsServerMsgUpdateItem* msui;
|
||||
|
||||
if((mui = dynamic_cast<RsGxsMsgUpdateItem*>(item)) != NULL)
|
||||
mClientMsgMap.insert(std::make_pair(mui->peerId, mui));
|
||||
else if((gui = dynamic_cast<RsGxsGrpUpdateItem*>(item)) != NULL)
|
||||
mClientGrpMap.insert(std::make_pair(gui->peerId, gui));
|
||||
else if((msui = dynamic_cast<RsGxsServerMsgUpdateItem*>(item)) != NULL)
|
||||
mServerMsgMap.insert(std::make_pair(msui->grpId, msui));
|
||||
else if((gsui = dynamic_cast<RsGxsServerGrpUpdateItem*>(item)) != NULL)
|
||||
{
|
||||
if(mServerGrpUpdateItem)
|
||||
{
|
||||
mServerGrpUpdateItem = gsui;
|
||||
}
|
||||
else
|
||||
{
|
||||
#ifdef NXS_NET_DEBUG
|
||||
std::cerr << "Error! More than one server group update item exists!" << std::endl;
|
||||
#endif
|
||||
delete gsui;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
std::cerr << "Type not expected!" << std::endl;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private:
|
||||
|
||||
RsGxsNetService::ClientGrpMap& mClientGrpMap;
|
||||
RsGxsNetService::ClientMsgMap& mClientMsgMap;
|
||||
RsGxsNetService::ServerMsgMap& mServerMsgMap;
|
||||
RsGxsServerGrpUpdateItem*& mServerGrpUpdateItem;
|
||||
|
||||
};
|
||||
|
||||
bool RsGxsNetService::loadList(std::list<RsItem *> &load)
|
||||
{
|
||||
std::for_each(load.begin(), load.end(), StoreHere(mClientGrpUpdateMap, mClientMsgUpdateMap,
|
||||
mServerMsgUpdateMap, mGrpServerUpdateItem));
|
||||
return true;
|
||||
}
|
||||
|
||||
#include <algorithm>
|
||||
|
||||
template <typename UpdateMap>
|
||||
struct get_second : public std::unary_function<typename UpdateMap::value_type, RsItem*>
|
||||
{
|
||||
RsItem* operator()(const typename UpdateMap::value_type& value) const
|
||||
{
|
||||
return value.second;
|
||||
}
|
||||
};
|
||||
|
||||
bool RsGxsNetService::saveList(bool& cleanup, std::list<RsItem*>& save)
|
||||
{
|
||||
return false;
|
||||
RsStackMutex stack(mNxsMutex);
|
||||
|
||||
// hardcore templates
|
||||
std::transform(mClientGrpUpdateMap.begin(), mClientGrpUpdateMap.end(),
|
||||
std::back_inserter(save), get_second<ClientGrpMap>());
|
||||
|
||||
std::transform(mClientMsgUpdateMap.begin(), mClientMsgUpdateMap.end(),
|
||||
std::back_inserter(save), get_second<ClientMsgMap>());
|
||||
|
||||
std::transform(mServerMsgUpdateMap.begin(), mServerMsgUpdateMap.end(),
|
||||
std::back_inserter(save), get_second<ServerMsgMap>());
|
||||
|
||||
save.push_back(mGrpServerUpdateItem);
|
||||
}
|
||||
|
||||
RsSerialiser *RsGxsNetService::setupSerialiser()
|
||||
{
|
||||
return NULL;
|
||||
|
||||
RsSerialiser *rss = new RsSerialiser;
|
||||
rss->addSerialType(new RsGxsUpdateSerialiser(mServType));
|
||||
|
||||
return rss;
|
||||
}
|
||||
|
||||
void RsGxsNetService::recvNxsItemQueue(){
|
||||
|
@ -1039,25 +1156,53 @@ void RsGxsNetService::locked_processCompletedIncomingTrans(NxsTransaction* tr)
|
|||
// notify listener of grps
|
||||
mObserver->notifyNewGroups(grps);
|
||||
|
||||
// now note this as the latest you've received from this peer
|
||||
std::string peerFrom = tr->mTransaction->PeerId();
|
||||
uint32_t updateTS = tr->mTransaction->updateTS;
|
||||
|
||||
ClientGrpMap::iterator it = mClientGrpUpdateMap.find(peerFrom);
|
||||
|
||||
RsGxsGrpUpdateItem* item = NULL;
|
||||
|
||||
if(it != mClientGrpUpdateMap.end())
|
||||
{
|
||||
item = it->second;
|
||||
}else
|
||||
{
|
||||
item = new RsGxsGrpUpdateItem(mServType);
|
||||
}
|
||||
|
||||
item->grpUpdateTS = updateTS;
|
||||
item->peerId = peerFrom;
|
||||
|
||||
mClientGrpUpdateMap.insert(
|
||||
std::make_pair(peerFrom, item));
|
||||
|
||||
// as a grp list server also note this is the latest item you have
|
||||
mGrpServerUpdateItem->grpUpdateTS = updateTS;
|
||||
|
||||
}else if(flag & RsNxsTransac::FLAG_TYPE_MSGS)
|
||||
{
|
||||
|
||||
std::vector<RsNxsMsg*> msgs;
|
||||
|
||||
std::string grpId;
|
||||
while(tr->mItems.size() > 0)
|
||||
{
|
||||
RsNxsMsg* msg = dynamic_cast<RsNxsMsg*>(tr->mItems.front());
|
||||
if(msg)
|
||||
{
|
||||
tr->mItems.pop_front();
|
||||
msgs.push_back(msg);
|
||||
if(grpId.empty())
|
||||
grpId = msg->grpId;
|
||||
|
||||
tr->mItems.pop_front();
|
||||
msgs.push_back(msg);
|
||||
}
|
||||
else
|
||||
{
|
||||
#ifdef NXS_NET_DEBUG
|
||||
std::cerr << "RsGxsNetService::processCompletedTransactions(): item did not caste to msg"
|
||||
<< std::endl;
|
||||
std::cerr << "RsGxsNetService::processCompletedTransactions(): item did not caste to msg"
|
||||
<< std::endl;
|
||||
#endif
|
||||
}
|
||||
}
|
||||
|
@ -1081,6 +1226,10 @@ void RsGxsNetService::locked_processCompletedIncomingTrans(NxsTransaction* tr)
|
|||
// notify listener of msgs
|
||||
mObserver->notifyNewMessages(msgs);
|
||||
|
||||
// now note that this is the latest you've received from this peer
|
||||
// for the grp id
|
||||
locked_doMsgUpdateWork(tr->mTransaction, grpId);
|
||||
|
||||
}
|
||||
}else if(tr->mFlag == NxsTransaction::FLAG_STATE_FAILED){
|
||||
// don't do anything transaction will simply be cleaned
|
||||
|
@ -1088,6 +1237,46 @@ void RsGxsNetService::locked_processCompletedIncomingTrans(NxsTransaction* tr)
|
|||
return;
|
||||
}
|
||||
|
||||
void RsGxsNetService::locked_doMsgUpdateWork(const RsNxsTransac *nxsTrans, const std::string &grpId)
|
||||
{
|
||||
|
||||
// firts check if peer exists
|
||||
const std::string& peerFrom = nxsTrans->PeerId();
|
||||
|
||||
ClientMsgMap::iterator it = mClientMsgUpdateMap.find(peerFrom);
|
||||
|
||||
RsGxsMsgUpdateItem* mui = NULL;
|
||||
|
||||
// now update the peer's entry for this grp id
|
||||
if(it != mClientMsgUpdateMap.end())
|
||||
{
|
||||
mui = it->second;
|
||||
}
|
||||
else
|
||||
{
|
||||
mui = new RsGxsMsgUpdateItem(mServType);
|
||||
mClientMsgUpdateMap.insert(std::make_pair(peerFrom, mui));
|
||||
}
|
||||
|
||||
mui->msgUpdateTS[grpId] = nxsTrans->updateTS;
|
||||
mui->peerId = peerFrom;
|
||||
|
||||
ServerMsgMap::iterator mit = mServerMsgUpdateMap.find(grpId);
|
||||
RsGxsServerMsgUpdateItem* msui = NULL;
|
||||
if(mit != mServerMsgUpdateMap.end())
|
||||
{
|
||||
msui = mit->second;
|
||||
}
|
||||
else
|
||||
{
|
||||
msui = new RsGxsServerMsgUpdateItem(mServType);
|
||||
mServerMsgUpdateMap.insert(std::make_pair(grpId, msui));
|
||||
}
|
||||
|
||||
msui->grpId = grpId;
|
||||
msui->msgUpdateTS = nxsTrans->updateTS;
|
||||
}
|
||||
|
||||
void RsGxsNetService::locked_processCompletedOutgoingTrans(NxsTransaction* tr)
|
||||
{
|
||||
uint16_t flag = tr->mTransaction->transactFlag;
|
||||
|
@ -1128,6 +1317,7 @@ void RsGxsNetService::locked_processCompletedOutgoingTrans(NxsTransaction* tr)
|
|||
std::cerr << "complete Sending Grp Data, transN: " <<
|
||||
tr->mTransaction->transactionNumber << std::endl;
|
||||
#endif
|
||||
|
||||
}else if(flag & RsNxsTransac::FLAG_TYPE_MSGS)
|
||||
{
|
||||
#ifdef NXS_NET_DEBUG
|
||||
|
@ -1169,7 +1359,7 @@ void RsGxsNetService::locked_pushMsgTransactionFromList(
|
|||
newTrans->mTimeOut = time(NULL) + mTransactionTimeOut;
|
||||
// create transaction copy with your id to indicate
|
||||
// its an outgoing transaction
|
||||
newTrans->mTransaction = new RsNxsTransac(*transac);
|
||||
newTrans->mTransaction = new RsNxsTransac(*transac);
|
||||
newTrans->mTransaction->PeerId(mOwnId);
|
||||
sendItem(transac);
|
||||
{
|
||||
|
@ -1250,7 +1440,7 @@ void RsGxsNetService::locked_genReqMsgTransaction(NxsTransaction* tr)
|
|||
msgIdSet.insert((*vit)->mMsgId);
|
||||
delete(*vit);
|
||||
}
|
||||
msgMetaV.clear();
|
||||
msgMetaV.clear();
|
||||
|
||||
// get unique id for this transaction
|
||||
uint32_t transN = locked_getTransactionId();
|
||||
|
@ -1516,6 +1706,7 @@ void RsGxsNetService::locked_genSendGrpsTransaction(NxsTransaction* tr)
|
|||
ntr->transactionNumber = transN;
|
||||
ntr->transactFlag = RsNxsTransac::FLAG_BEGIN_P1 |
|
||||
RsNxsTransac::FLAG_TYPE_GRPS;
|
||||
ntr->updateTS = time(NULL);
|
||||
ntr->nItems = grps.size();
|
||||
ntr->PeerId(tr->mTransaction->PeerId());
|
||||
|
||||
|
@ -1697,6 +1888,7 @@ void RsGxsNetService::locked_genSendMsgsTransaction(NxsTransaction* tr)
|
|||
ntr->transactionNumber = transN;
|
||||
ntr->transactFlag = RsNxsTransac::FLAG_BEGIN_P1 |
|
||||
RsNxsTransac::FLAG_TYPE_MSGS;
|
||||
ntr->updateTS = time(NULL);
|
||||
ntr->nItems = msgSize;
|
||||
ntr->PeerId(peerId);
|
||||
|
||||
|
@ -1779,6 +1971,10 @@ void RsGxsNetService::handleRecvSyncGroup(RsNxsSyncGrp* item)
|
|||
|
||||
std::string peer = item->PeerId();
|
||||
|
||||
// don't sync if you have no new updates for this peer
|
||||
if(item->updateTS >= mGrpServerUpdateItem->grpUpdateTS && item->updateTS != 0)
|
||||
return;
|
||||
|
||||
std::map<std::string, RsGxsGrpMetaData*> grp;
|
||||
mDataStore->retrieveGxsGrpMetaData(grp);
|
||||
|
||||
|
@ -1893,6 +2089,16 @@ void RsGxsNetService::handleRecvSyncMessage(RsNxsSyncMsg* item)
|
|||
|
||||
const std::string& peer = item->PeerId();
|
||||
|
||||
ServerMsgMap::const_iterator cit = mServerMsgUpdateMap.find(item->grpId);
|
||||
|
||||
if(cit != mServerMsgUpdateMap.end())
|
||||
{
|
||||
const RsGxsServerMsgUpdateItem *msui = cit->second;
|
||||
|
||||
if(item->updateTS > msui->msgUpdateTS && item->updateTS != 0)
|
||||
return;
|
||||
}
|
||||
|
||||
GxsMsgMetaResult metaResult;
|
||||
GxsMsgReq req;
|
||||
|
||||
|
|
|
@ -34,6 +34,7 @@
|
|||
#include "rsnxsobserver.h"
|
||||
#include "pqi/p3linkmgr.h"
|
||||
#include "serialiser/rsnxsitems.h"
|
||||
#include "serialiser/rsgxsupdateitems.h"
|
||||
#include "rsgxsnetutils.h"
|
||||
#include "pqi/p3cfgmgr.h"
|
||||
#include "rsgixs.h"
|
||||
|
@ -323,6 +324,8 @@ private:
|
|||
bool locked_canReceive(const RsGxsGrpMetaData * const grpMeta, const std::string& peerId);
|
||||
|
||||
void processExplicitGroupRequests();
|
||||
|
||||
void locked_doMsgUpdateWork(const RsNxsTransac* nxsTrans, const std::string& grpId);
|
||||
|
||||
private:
|
||||
|
||||
|
@ -427,6 +430,23 @@ private:
|
|||
std::vector<GrpCircleVetting*> mPendingCircleVets;
|
||||
|
||||
std::map<std::string, std::list<RsGxsGroupId> > mExplicitRequest;
|
||||
|
||||
// nxs sync optimisation
|
||||
// can pull dynamically the latest timestamp for each message
|
||||
|
||||
public:
|
||||
|
||||
typedef std::map<std::string, RsGxsMsgUpdateItem*> ClientMsgMap;
|
||||
typedef std::map<std::string, RsGxsServerMsgUpdateItem*> ServerMsgMap;
|
||||
typedef std::map<std::string, RsGxsGrpUpdateItem*> ClientGrpMap;
|
||||
|
||||
private:
|
||||
|
||||
ClientMsgMap mClientMsgUpdateMap;
|
||||
ServerMsgMap mServerMsgUpdateMap;
|
||||
ClientGrpMap mClientGrpUpdateMap;
|
||||
|
||||
RsGxsServerGrpUpdateItem* mGrpServerUpdateItem;
|
||||
};
|
||||
|
||||
#endif // RSGXSNETSERVICE_H
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue