added new GrpServerConfigMap to store additional data on groups for net service. Removed pointers to simplify the code

This commit is contained in:
csoler 2016-11-11 00:10:01 +01:00
parent 143829c881
commit 5612647672
4 changed files with 209 additions and 280 deletions

View File

@ -321,7 +321,7 @@ RsGxsNetService::RsGxsNetService(uint16_t servType, RsGeneralDataService *gds,
mCircles(circles), mGixs(gixs),
mReputations(reputations), mPgpUtils(pgpUtils),
mGrpAutoSync(grpAutoSync), mAllowMsgSync(msgAutoSync),
mGrpServerUpdateItem(NULL), mServiceInfo(serviceInfo)
mServiceInfo(serviceInfo)
{
addSerialType(new RsNxsSerialiser(mServType));
mOwnId = mNetMgr->getOwnId();
@ -341,16 +341,7 @@ RsGxsNetService::~RsGxsNetService()
}
mTransactions.clear() ;
delete mGrpServerUpdateItem ;
for(ClientGrpMap::iterator it = mClientGrpUpdateMap.begin();it!=mClientGrpUpdateMap.end();++it)
delete it->second ;
mClientGrpUpdateMap.clear() ;
for(std::map<RsGxsGroupId, RsGxsServerMsgUpdateItem*>::iterator it(mServerMsgUpdateMap.begin());it!=mServerMsgUpdateMap.end();)
delete it->second ;
mServerMsgUpdateMap.clear() ;
}
@ -534,9 +525,10 @@ void RsGxsNetService::syncWithPeers()
ClientGrpMap::const_iterator cit = mClientGrpUpdateMap.find(peerId);
uint32_t updateTS = 0;
if(cit != mClientGrpUpdateMap.end())
{
const RsGxsGrpUpdateItem *gui = cit->second;
const RsGxsGrpUpdate *gui = &cit->second;
updateTS = gui->grpUpdateTS;
}
RsNxsSyncGrpReqItem *grp = new RsNxsSyncGrpReqItem(mServType);
@ -589,12 +581,12 @@ void RsGxsNetService::syncWithPeers()
// now see if you have an updateTS so optimise whether you need
// to get a new list of peer data
RsGxsMsgUpdateItem* mui = NULL;
const RsGxsMsgUpdate *mui = NULL;
ClientMsgMap::const_iterator cit = mClientMsgUpdateMap.find(peerId);
if(cit != mClientMsgUpdateMap.end())
mui = cit->second;
mui = &cit->second;
#ifdef NXS_NET_DEBUG_0
GXSNETDEBUG_P_(peerId) << " syncing messages with peer " << peerId << std::endl;
@ -679,29 +671,29 @@ void RsGxsNetService::syncGrpStatistics()
for(std::map<RsGxsGroupId,RsGxsGrpMetaData*>::const_iterator it(grpMeta.begin());it!=grpMeta.end();++it)
{
RsGroupNetworkStatsRecord& rec(GrpConfigMap[it->first]) ;
const RsGxsGrpConfig& rec = mServerGrpConfigMap[it->first] ;
#ifdef NXS_NET_DEBUG_6
GXSNETDEBUG__G(it->first) << " group " << it->first ;
#endif
if(rec.update_TS + GROUP_STATS_UPDATE_DELAY < now && rec.suppliers.size() > 0)
if(rec.update_TS + GROUP_STATS_UPDATE_DELAY < now && rec.suppliers.ids.size() > 0)
{
#ifdef NXS_NET_DEBUG_6
GXSNETDEBUG__G(it->first) << " needs update. Randomly asking to some friends" << std::endl;
#endif
// randomly select GROUP_STATS_UPDATE_NB_PEERS friends among the suppliers of this group
uint32_t n = RSRandom::random_u32() % rec.suppliers.size() ;
uint32_t n = RSRandom::random_u32() % rec.suppliers.ids.size() ;
std::set<RsPeerId>::const_iterator rit = rec.suppliers.begin();
std::set<RsPeerId>::const_iterator rit = rec.suppliers.ids.begin();
for(uint32_t i=0;i<n;++i)
++rit ;
for(uint32_t i=0;i<std::min(rec.suppliers.size(),(size_t)GROUP_STATS_UPDATE_NB_PEERS);++i)
for(uint32_t i=0;i<std::min(rec.suppliers.ids.size(),(size_t)GROUP_STATS_UPDATE_NB_PEERS);++i)
{
// we started at a random position in the set, wrap around if the end is reached
if(rit == rec.suppliers.end())
rit = rec.suppliers.begin() ;
if(rit == rec.suppliers.ids.end())
rit = rec.suppliers.ids.begin() ;
RsPeerId peer_id = *rit ;
++rit ;
@ -807,16 +799,16 @@ void RsGxsNetService::handleRecvSyncGrpStatistics(RsNxsSyncGrpStatsItem *grs)
bool should_notify = false ;
{
RS_STACK_MUTEX(mNxsMutex) ;
RsGroupNetworkStatsRecord& rec(mGroupNetworkStats[grs->grpId]) ;
RsGxsGrpConfig& rec(mServerGrpConfigMap[grs->grpId]) ;
uint32_t old_count = rec.max_visible_count ;
uint32_t old_suppliers_count = rec.suppliers.size() ;
uint32_t old_suppliers_count = rec.suppliers.ids.size() ;
rec.suppliers.insert(grs->PeerId()) ;
rec.suppliers.ids.insert(grs->PeerId()) ;
rec.max_visible_count = std::max(rec.max_visible_count,grs->number_of_posts) ;
rec.update_TS = time(NULL) ;
if (old_count != rec.max_visible_count || old_suppliers_count != rec.suppliers.size())
if (old_count != rec.max_visible_count || old_suppliers_count != rec.suppliers.ids.size())
should_notify = true ;
}
if(should_notify)
@ -839,24 +831,17 @@ void RsGxsNetService::subscribeStatusChanged(const RsGxsGroupId& grpId,bool subs
#ifdef NXS_NET_DEBUG_0
GXSNETDEBUG__G(grpId) << "Changing subscribe status for grp " << grpId << " to " << subscribed << ": reseting all server msg time stamps for this group, and server global TS." << std::endl;
#endif
std::map<RsGxsGroupId,RsGxsServerMsgUpdateItem*>::iterator it = mServerMsgUpdateMap.find(grpId) ;
std::map<RsGxsGroupId,RsGxsServerMsgUpdate>::iterator it = mServerMsgUpdateMap.find(grpId) ;
if(mServerMsgUpdateMap.end() == it)
{
RsGxsServerMsgUpdateItem *item = new RsGxsServerMsgUpdateItem(mServType) ;
item->grpId = grpId ;
item->msgUpdateTS = time(NULL) ;
}
else
it->second->msgUpdateTS = time(NULL) ; // reset!
RsGxsServerMsgUpdate& item(mServerMsgUpdateMap[grpId]) ;
item.grpId = grpId ; // just in case
item.msgUpdateTS = time(NULL) ;
// We also update mGrpServerUpdateItem so as to trigger a new grp list exchange with friends (friends will send their known ClientTS which
// will be lower than our own grpUpdateTS, triggering our sending of the new subscribed grp list.
if(mGrpServerUpdateItem == NULL)
mGrpServerUpdateItem = new RsGxsServerGrpUpdateItem(mServType);
mGrpServerUpdateItem->grpUpdateTS = time(NULL) ;
mGrpServerUpdate.grpUpdateTS = time(NULL) ;
}
bool RsGxsNetService::fragmentMsg(RsNxsMsg& msg, MsgFragments& msgFragments) const
@ -1350,8 +1335,8 @@ class StoreHere
{
public:
StoreHere(RsGxsNetService::ClientGrpMap& cgm, RsGxsNetService::ClientMsgMap& cmm, RsGxsNetService::ServerMsgMap& smm,RsGxsNetService::GrpConfigMap& gcm, RsGxsServerGrpUpdateItem*& sgm)
: mClientGrpMap(cgm), mClientMsgMap(cmm), mServerMsgMap(smm), mServerGrpUpdateItem(sgm), m
StoreHere(RsGxsNetService::ClientGrpMap& cgm, RsGxsNetService::ClientMsgMap& cmm, RsGxsNetService::ServerMsgMap& smm,RsGxsNetService::GrpConfigMap& gcm, RsGxsServerGrpUpdate& sgm)
: mClientGrpMap(cgm), mClientMsgMap(cmm), mServerMsgMap(smm), mGrpConfigMap(gcm), mServerGrpUpdate(sgm)
{}
void operator() (RsItem* item)
@ -1363,28 +1348,19 @@ public:
RsGxsGrpConfigItem* mgci;
if((mui = dynamic_cast<RsGxsMsgUpdateItem*>(item)) != NULL)
mClientMsgMap.insert(std::make_pair(mui->peerId, mui));
mClientMsgMap.insert(std::make_pair(mui->peerID, *mui));
else if((mgci = dynamic_cast<RsGxsGrpConfigItem*>(item)) != NULL)
mGrpConfigMap.insert(std::make_pair(mgci->grpId, mgci));
mGrpConfigMap.insert(std::make_pair(mgci->grpId, *mgci));
else if((gui = dynamic_cast<RsGxsGrpUpdateItem*>(item)) != NULL)
mClientGrpMap.insert(std::make_pair(gui->peerId, gui));
mClientGrpMap.insert(std::make_pair(gui->peerID, *gui));
else if((msui = dynamic_cast<RsGxsServerMsgUpdateItem*>(item)) != NULL)
mServerMsgMap.insert(std::make_pair(msui->grpId, msui));
mServerMsgMap.insert(std::make_pair(msui->grpId, *msui));
else if((gsui = dynamic_cast<RsGxsServerGrpUpdateItem*>(item)) != NULL)
{
if(mServerGrpUpdateItem == NULL)
mServerGrpUpdateItem = gsui;
else
{
std::cerr << "Error! More than one server group update item exists!" << std::endl;
delete gsui;
}
}
mServerGrpUpdate = *gsui;
else
{
std::cerr << "Type not expected!" << std::endl;
delete item ;
}
delete item ;
}
private:
@ -1394,7 +1370,7 @@ private:
RsGxsNetService::ServerMsgMap& mServerMsgMap;
RsGxsNetService::GrpConfigMap& mGrpConfigMap;
RsGxsServerGrpUpdateItem*& mServerGrpUpdateItem;
RsGxsServerGrpUpdate& mServerGrpUpdate;
};
@ -1404,13 +1380,14 @@ bool RsGxsNetService::loadList(std::list<RsItem *> &load)
// The delete is done in StoreHere, if necessary
std::for_each(load.begin(), load.end(), StoreHere(mClientGrpUpdateMap, mClientMsgUpdateMap, mServerMsgUpdateMap, mGrpServerUpdateItem));
std::for_each(load.begin(), load.end(), StoreHere(mClientGrpUpdateMap, mClientMsgUpdateMap, mServerMsgUpdateMap, mServerGrpConfigMap, mGrpServerUpdate));
// We reset group statistics here. This is the best place since we know at this point which are all unsubscribed groups.
time_t now = time(NULL);
for(std::map<RsGxsGroupId,RsGroupNetworkStatsRecord>::iterator it(mGroupNetworkStats.begin());it!=mGroupNetworkStats.end();++it)
#warning Do we keep that?
for(GrpConfigMap::iterator it(mServerGrpConfigMap.begin());it!=mServerGrpConfigMap.end();++it)
{
// At each reload, we reset the count of visible messages. It will be rapidely restored to its real value from friends.
@ -1423,7 +1400,7 @@ bool RsGxsNetService::loadList(std::list<RsItem *> &load)
// Similarly, we remove all suppliers.
// Actual suppliers will come back automatically.
it->second.suppliers.clear() ;
it->second.suppliers.ids.clear() ;
}
return true;
@ -1431,13 +1408,17 @@ bool RsGxsNetService::loadList(std::list<RsItem *> &load)
#include <algorithm>
template <typename UpdateMap>
template <typename UpdateMap,class ItemClass>
struct get_second : public std::unary_function<typename UpdateMap::value_type, RsItem*>
{
get_second(uint16_t serv_type): mServType(serv_type) {}
RsItem* operator()(const typename UpdateMap::value_type& value) const
{
return value.second;
return new ItemClass(value.second,mServType);
}
uint16_t mServType ;
};
bool RsGxsNetService::saveList(bool& cleanup, std::list<RsItem*>& save)
@ -1445,14 +1426,16 @@ bool RsGxsNetService::saveList(bool& cleanup, std::list<RsItem*>& save)
RS_STACK_MUTEX(mNxsMutex) ;
// hardcore templates
std::transform(mClientGrpUpdateMap.begin(), mClientGrpUpdateMap.end(), std::back_inserter(save), get_second<ClientGrpMap>());
std::transform(mClientMsgUpdateMap.begin(), mClientMsgUpdateMap.end(), std::back_inserter(save), get_second<ClientMsgMap>());
std::transform(mServerMsgUpdateMap.begin(), mServerMsgUpdateMap.end(), std::back_inserter(save), get_second<ServerMsgMap>());
std::transform(mServerGrpConfigMap.begin(), mServerGrpConfigMap.end(), std::back_inserter(save), get_second<GrpConfigMap>());
std::transform(mClientGrpUpdateMap.begin(), mClientGrpUpdateMap.end(), std::back_inserter(save), get_second<ClientGrpMap,RsGxsGrpUpdateItem>(mServType));
std::transform(mClientMsgUpdateMap.begin(), mClientMsgUpdateMap.end(), std::back_inserter(save), get_second<ClientMsgMap,RsGxsMsgUpdateItem>(mServType));
std::transform(mServerMsgUpdateMap.begin(), mServerMsgUpdateMap.end(), std::back_inserter(save), get_second<ServerMsgMap,RsGxsServerMsgUpdateItem>(mServType));
std::transform(mServerGrpConfigMap.begin(), mServerGrpConfigMap.end(), std::back_inserter(save), get_second<GrpConfigMap,RsGxsGrpConfigItem>(mServType));
save.push_back(mGrpServerUpdateItem);
RsGxsServerGrpUpdateItem *it = new RsGxsServerGrpUpdateItem(mGrpServerUpdate,mServType) ;
cleanup = false;
save.push_back(it);
cleanup = true;
return true;
}
@ -1875,16 +1858,13 @@ void RsGxsNetService::updateServerSyncTS()
// if needed.
// as a grp list server also note this is the latest item you have
if(mGrpServerUpdateItem == NULL)
mGrpServerUpdateItem = new RsGxsServerGrpUpdateItem(mServType);
// then remove from mServerMsgUpdateMap, all items that are not in the group list!
#ifdef NXS_NET_DEBUG_0
GXSNETDEBUG___ << " cleaning server map of groups with no data:" << std::endl;
#endif
for(std::map<RsGxsGroupId, RsGxsServerMsgUpdateItem*>::iterator it(mServerMsgUpdateMap.begin());it!=mServerMsgUpdateMap.end();)
for(ServerMsgMap::iterator it(mServerMsgUpdateMap.begin());it!=mServerMsgUpdateMap.end();)
if(gxsMap.find(it->first) == gxsMap.end())
{
// not found! Removing server update info for this group
@ -1892,7 +1872,7 @@ void RsGxsNetService::updateServerSyncTS()
#ifdef NXS_NET_DEBUG_0
GXSNETDEBUG__G(it->first) << " removing server update info for group " << it->first << std::endl;
#endif
std::map<RsGxsGroupId, RsGxsServerMsgUpdateItem*>::iterator tmp(it) ;
ServerMsgMap::iterator tmp(it) ;
++tmp ;
mServerMsgUpdateMap.erase(it) ;
it = tmp ;
@ -1931,15 +1911,15 @@ void RsGxsNetService::updateServerSyncTS()
GXSNETDEBUG__G(mit->first) << " Group " << mit->first << " is conditionned to circle " << mit->second->mCircleId << ". local Grp TS=" << time(NULL) - mGrpServerUpdateItem->grpUpdateTS << " secs ago, circle grp server update TS=" << time(NULL) - circle_group_server_ts << " secs ago";
#endif
if(circle_group_server_ts > mGrpServerUpdateItem->grpUpdateTS)
{
if(circle_group_server_ts > mGrpServerUpdate.grpUpdateTS)
{
#ifdef NXS_NET_DEBUG_0
GXSNETDEBUG__G(mit->first) << " - Updating local Grp Server update TS to follow changes in circles." << std::endl;
GXSNETDEBUG__G(mit->first) << " - Updating local Grp Server update TS to follow changes in circles." << std::endl;
#endif
RS_STACK_MUTEX(mNxsMutex) ;
mGrpServerUpdateItem->grpUpdateTS = circle_group_server_ts ;
}
RS_STACK_MUTEX(mNxsMutex) ;
mGrpServerUpdate.grpUpdateTS = circle_group_server_ts ;
}
#ifdef NXS_NET_DEBUG_0
else
GXSNETDEBUG__G(mit->first) << " - Nothing to do." << std::endl;
@ -1952,7 +1932,6 @@ void RsGxsNetService::updateServerSyncTS()
RS_STACK_MUTEX(mNxsMutex) ;
const RsGxsGrpMetaData* grpMeta = mit->second;
RsGxsServerMsgUpdateItem* msui = NULL;
#ifdef TO_REMOVE
// That accounts for modification of the meta data.
@ -1965,20 +1944,10 @@ void RsGxsNetService::updateServerSyncTS()
}
#endif
ServerMsgMap::iterator mapIT = mServerMsgUpdateMap.find(grpId);
// I keep the creation, but the data is not used yet.
if(mapIT == mServerMsgUpdateMap.end())
{
msui = new RsGxsServerMsgUpdateItem(mServType);
msui->grpId = grpMeta->mGroupId;
mServerMsgUpdateMap.insert(std::make_pair(msui->grpId, msui));
#ifdef NXS_NET_DEBUG_0
GXSNETDEBUG__G(grpId) << " created new entry for group " << grpId << std::endl;
#endif
}
else
msui = mapIT->second;
RsGxsServerMsgUpdate& msui(mServerMsgUpdateMap[grpId]) ;
msui.grpId = grpMeta->mGroupId;
// (cyril) I'm removing this, because the msgUpdateTS is updated when new messages are received by calling locked_stampMsgServerUpdateTS().
// mLastPost is actually updated somewhere when loading group meta data. It's not clear yet whether it is set to the latest publish time (wrong)
@ -1998,12 +1967,12 @@ void RsGxsNetService::updateServerSyncTS()
// will then be compared and pssibly trigger a MetaData transmission. mRecvTS is upated when creating, receiving for the first time, or receiving
// an update, all in rsgenexchange.cc, after group/update validation. It is therefore a local TS, that can be compared to grpUpdateTS (same machine).
if(mGrpServerUpdateItem->grpUpdateTS < grpMeta->mRecvTS)
if(mGrpServerUpdate.grpUpdateTS < grpMeta->mRecvTS)
{
#ifdef NXS_NET_DEBUG_0
GXSNETDEBUG__G(grpId) << " updated msgUpdateTS to last RecvTS = " << time(NULL) - grpMeta->mRecvTS << " secs ago for group "<< grpId << ". This is probably because an update has been received." << std::endl;
#endif
mGrpServerUpdateItem->grpUpdateTS = grpMeta->mRecvTS;
mGrpServerUpdate.grpUpdateTS = grpMeta->mRecvTS;
change = true;
}
}
@ -2272,12 +2241,12 @@ bool RsGxsNetService::getGroupNetworkStats(const RsGxsGroupId& gid,RsGroupNetwor
{
RS_STACK_MUTEX(mNxsMutex) ;
std::map<RsGxsGroupId,RsGroupNetworkStatsRecord>::const_iterator it = mGroupNetworkStats.find(gid) ;
GrpConfigMap::const_iterator it ( mServerGrpConfigMap.find(gid) );
if(it == mGroupNetworkStats.end())
if(it == mServerGrpConfigMap.end())
return false ;
stats.mSuppliers = it->second.suppliers.size();
stats.mSuppliers = it->second.suppliers.ids.size();
stats.mMaxVisibleCount = it->second.max_visible_count ;
return true ;
@ -2400,22 +2369,14 @@ void RsGxsNetService::locked_processCompletedIncomingTrans(NxsTransaction* tr)
ClientGrpMap::iterator it = mClientGrpUpdateMap.find(peerFrom);
RsGxsGrpUpdateItem* item = NULL;
RsGxsGrpUpdate& item(mClientGrpUpdateMap[peerFrom]) ;
if(it != mClientGrpUpdateMap.end())
{
item = it->second;
}else
{
item = new RsGxsGrpUpdateItem(mServType);
mClientGrpUpdateMap.insert(std::make_pair(peerFrom, item));
}
#ifdef NXS_NET_DEBUG_0
GXSNETDEBUG_P_(tr->mTransaction->PeerId()) << " and updating mClientGrpUpdateMap for peer " << peerFrom << " of new time stamp " << nice_time_stamp(time(NULL),updateTS) << std::endl;
#endif
item->grpUpdateTS = updateTS;
item->peerId = peerFrom;
item.grpUpdateTS = updateTS;
item.peerID = peerFrom;
IndicateConfigChanged();
@ -2507,32 +2468,17 @@ void RsGxsNetService::locked_doMsgUpdateWork(const RsNxsTransacItem *nxsTrans, c
// firts check if peer exists
const RsPeerId& peerFrom = nxsTrans->PeerId();
ClientMsgMap::iterator it = mClientMsgUpdateMap.find(peerFrom);
if(peerFrom.isNull())
{
std::cerr << "(EE) update from null peer!" << std::endl;
print_stacktrace() ;
}
RsGxsMsgUpdateItem* mui = NULL;
RsGxsMsgUpdate& mui(mClientMsgUpdateMap[peerFrom]) ;
// now update the peer's entry for this grp id
if(it != mClientMsgUpdateMap.end())
{
mui = it->second;
}
else
{
#ifdef NXS_NET_DEBUG_0
GXSNETDEBUG_PG(nxsTrans->PeerId(),grpId) << " created new entry." << std::endl;
#endif
mui = new RsGxsMsgUpdateItem(mServType);
mClientMsgUpdateMap.insert(std::make_pair(peerFrom, mui));
}
mui->peerId = peerFrom;
mui.peerID = peerFrom;
if(mPartialMsgUpdates[peerFrom].find(grpId) != mPartialMsgUpdates[peerFrom].end())
{
@ -2545,7 +2491,7 @@ void RsGxsNetService::locked_doMsgUpdateWork(const RsNxsTransacItem *nxsTrans, c
#ifdef NXS_NET_DEBUG_0
GXSNETDEBUG_PG(nxsTrans->PeerId(),grpId) << " this is a full update. Updating time stamp." << std::endl;
#endif
mui->msgUpdateInfos[grpId].time_stamp = nxsTrans->updateTS;
mui.msgUpdateInfos[grpId].time_stamp = nxsTrans->updateTS;
IndicateConfigChanged();
}
}
@ -2692,15 +2638,15 @@ void RsGxsNetService::locked_genReqMsgTransaction(NxsTransaction* tr)
uint32_t mcount = msgItemL.size() ;
RsPeerId pid = msgItemL.front()->PeerId() ;
RsGroupNetworkStatsRecord& gnsr = mGroupNetworkStats[grpId];
RsGxsGrpConfig& gnsr(mServerGrpConfigMap[grpId]) ;
std::set<RsPeerId>::size_type oldSuppliersCount = gnsr.suppliers.size();
std::set<RsPeerId>::size_type oldSuppliersCount = gnsr.suppliers.ids.size();
uint32_t oldVisibleCount = gnsr.max_visible_count;
gnsr.suppliers.insert(pid) ;
gnsr.suppliers.ids.insert(pid) ;
gnsr.max_visible_count = std::max(gnsr.max_visible_count, mcount) ;
if (oldVisibleCount != gnsr.max_visible_count || oldSuppliersCount != gnsr.suppliers.size())
if (oldVisibleCount != gnsr.max_visible_count || oldSuppliersCount != gnsr.suppliers.ids.size())
mObserver->notifyChangedGroupStats(grpId);
#ifdef NXS_NET_DEBUG_1
@ -2938,22 +2884,11 @@ void RsGxsNetService::locked_genReqMsgTransaction(NxsTransaction* tr)
void RsGxsNetService::locked_stampPeerGroupUpdateTime(const RsPeerId& pid,const RsGxsGroupId& grpId,time_t tm,uint32_t n_messages)
{
std::map<RsPeerId,RsGxsMsgUpdateItem*>::iterator it = mClientMsgUpdateMap.find(pid) ;
RsGxsMsgUpdate& up(mClientMsgUpdateMap[pid]);
RsGxsMsgUpdateItem *pitem;
if(it == mClientMsgUpdateMap.end())
{
pitem = new RsGxsMsgUpdateItem(mServType) ;
pitem->peerId = pid ;
mClientMsgUpdateMap[pid] = pitem ;
}
else
pitem = it->second ;
pitem->msgUpdateInfos[grpId].time_stamp = tm;
pitem->msgUpdateInfos[grpId].message_count = std::max(n_messages, pitem->msgUpdateInfos[grpId].message_count) ;
up.peerID = pid ;
up.msgUpdateInfos[grpId].time_stamp = tm;
up.msgUpdateInfos[grpId].message_count = std::max(n_messages, up.msgUpdateInfos[grpId].message_count) ;
IndicateConfigChanged();
}
@ -3129,20 +3064,14 @@ void RsGxsNetService::locked_genReqGrpTransaction(NxsTransaction* tr)
locked_pushGrpTransactionFromList(reqList, tr->mTransaction->PeerId(), transN);
else
{
ClientGrpMap::iterator it = mClientGrpUpdateMap.find(tr->mTransaction->PeerId());
RsGxsGrpUpdateItem* item = NULL;
if(it != mClientGrpUpdateMap.end())
item = it->second;
else
{
item = new RsGxsGrpUpdateItem(mServType);
mClientGrpUpdateMap.insert(std::make_pair(tr->mTransaction->PeerId(), item));
}
RsGxsGrpUpdate& item (mClientGrpUpdateMap[tr->mTransaction->PeerId()]);
#ifdef NXS_NET_DEBUG_0
GXSNETDEBUG_P_(tr->mTransaction->PeerId()) << " reqList is empty, updating anyway ClientGrpUpdate TS for peer " << tr->mTransaction->PeerId() << " to: " << tr->mTransaction->updateTS << std::endl;
#endif
item->grpUpdateTS = tr->mTransaction->updateTS;
item->peerId = tr->mTransaction->PeerId();
item.grpUpdateTS = tr->mTransaction->updateTS;
item.peerID = tr->mTransaction->PeerId();
IndicateConfigChanged();
}
}
@ -3201,8 +3130,7 @@ void RsGxsNetService::locked_genSendGrpsTransaction(NxsTransaction* tr)
}
uint32_t updateTS = 0;
if(mGrpServerUpdateItem)
updateTS = mGrpServerUpdateItem->grpUpdateTS;
updateTS = mGrpServerUpdate.grpUpdateTS;
#ifdef NXS_NET_DEBUG_5
GXSNETDEBUG_P_ (tr->mTransaction->PeerId()) << "Service " << std::hex << ((mServiceInfo.mServiceType >> 8)& 0xffff) << std::dec << " - sending global group TS "
@ -3492,11 +3420,7 @@ void RsGxsNetService::locked_genSendMsgsTransaction(NxsTransaction* tr)
// now send a transaction item and store the transaction data
uint32_t updateTS = 0;
ServerMsgMap::const_iterator cit = mServerMsgUpdateMap.find(grpId);
if(cit != mServerMsgUpdateMap.end())
updateTS = cit->second->msgUpdateTS;
uint32_t updateTS = mServerMsgUpdateMap[grpId].msgUpdateTS;
RsNxsTransacItem* ntr = new RsNxsTransacItem(mServType);
ntr->transactionNumber = transN;
@ -3824,7 +3748,8 @@ void RsGxsNetService::locked_pushGrpRespFromList(std::list<RsNxsItem*>& respList
#ifdef NXS_NET_DEBUG_0
GXSNETDEBUG_P_ (peer) << "Setting tr->mTransaction->updateTS to " << mGrpServerUpdateItem->grpUpdateTS << std::endl;
#endif
trItem->updateTS = mGrpServerUpdateItem->grpUpdateTS;
trItem->updateTS = mGrpServerUpdate.grpUpdateTS;
// also make a copy for the resident transaction
tr->mTransaction = new RsNxsTransacItem(*trItem);
tr->mTransaction->PeerId(mOwnId);
@ -3851,19 +3776,11 @@ bool RsGxsNetService::locked_CanReceiveUpdate(const RsNxsSyncGrpReqItem *item)
// Because this is the global modification time for groups, async-ed computers will eventually figure out that their data needs
// to be synced.
if(mGrpServerUpdateItem)
{
#ifdef NXS_NET_DEBUG_0
GXSNETDEBUG_P_(item->PeerId()) << " local modification time stamp: " << std::dec<< time(NULL) - mGrpServerUpdateItem->grpUpdateTS << " secs ago. Update sent: " <<
((item->updateTS < mGrpServerUpdateItem->grpUpdateTS)?"YES":"NO") << std::endl;
GXSNETDEBUG_P_(item->PeerId()) << " local modification time stamp: " << std::dec<< time(NULL) - mGrpServerUpdate.grpUpdateTS << " secs ago. Update sent: " <<
((item->updateTS < mGrpServerUpdate.grpUpdateTS)?"YES":"NO") << std::endl;
#endif
return item->updateTS < mGrpServerUpdateItem->grpUpdateTS;
}
#ifdef NXS_NET_DEBUG_0
GXSNETDEBUG_P_(item->PeerId()) << " no local time stamp. This will be fixed after updateServerSyncTS(). Not sending for now. " << std::endl;
#endif
return false;
return item->updateTS < mGrpServerUpdate.grpUpdateTS;
}
void RsGxsNetService::handleRecvSyncGroup(RsNxsSyncGrpReqItem *item)
@ -4109,7 +4026,7 @@ bool RsGxsNetService::locked_CanReceiveUpdate(RsNxsSyncMsgReqItem *item,bool& gr
#endif
grp_is_known = true ;
return item->updateTS < it->second->msgUpdateTS ;
return item->updateTS < it->second.msgUpdateTS ;
}
return false ;
@ -4118,13 +4035,11 @@ bool RsGxsNetService::locked_CanReceiveUpdate(RsNxsSyncMsgReqItem *item,bool& gr
ServerMsgMap::const_iterator cit = mServerMsgUpdateMap.find(item->grpId);
if(cit != mServerMsgUpdateMap.end())
{
const RsGxsServerMsgUpdateItem *msui = cit->second;
#ifdef NXS_NET_DEBUG_0
GXSNETDEBUG_PG(item->PeerId(),item->grpId) << " local time stamp: " << std::dec<< time(NULL) - msui->msgUpdateTS << " secs ago. Update sent: " << (item->updateTS < msui->msgUpdateTS) << std::endl;
GXSNETDEBUG_PG(item->PeerId(),item->grpId) << " local time stamp: " << std::dec<< time(NULL) - cit->second.msgUpdateTS << " secs ago. Update sent: " << (item->updateTS < cit->second.msgUpdateTS) << std::endl;
#endif
grp_is_known = true ;
return item->updateTS < msui->msgUpdateTS ;
return item->updateTS < cit->second.msgUpdateTS ;
}
#ifdef NXS_NET_DEBUG_0
@ -4160,8 +4075,8 @@ void RsGxsNetService::handleRecvSyncMessage(RsNxsSyncMsgReqItem *item,bool item_
if(grp_is_known)
{
RsGroupNetworkStatsRecord& rec(mGroupNetworkStats[item->grpId]) ; // this creates it if needed. When the grp is unknown (and hashed) this will would create a unused entry
rec.suppliers.insert(peer) ;
RsGxsGrpConfig & rec(mServerGrpConfigMap[item->grpId]) ; // this creates it if needed. When the grp is unknown (and hashed) this will would create a unused entry
rec.suppliers.ids.insert(peer) ;
}
if(!peer_can_receive_update)
{
@ -4329,31 +4244,21 @@ void RsGxsNetService::locked_pushMsgRespFromList(std::list<RsNxsItem*>& itemL, c
tr->mTransaction->PeerId(mOwnId);
tr->mTimeOut = time(NULL) + mTransactionTimeOut;
ServerMsgMap::const_iterator cit = mServerMsgUpdateMap.find(grp_id);
// This time stamp is not supposed to be used on the other side. We just set it to avoid sending an uninitialiszed value.
if(cit != mServerMsgUpdateMap.end())
trItem->updateTS = cit->second->msgUpdateTS;
else
{
std::cerr << "(EE) cannot find a server TS for message of group " << grp_id << " in locked_pushMsgRespFromList. This is weird." << std::endl;
trItem->updateTS = 0 ;
}
// This time stamp is not supposed to be used on the other side. We just set it to avoid sending an uninitialiszed value.
trItem->updateTS = mServerMsgUpdateMap[grp_id].msgUpdateTS;
#ifdef NXS_NET_DEBUG_5
GXSNETDEBUG_P_ (sslId) << "Service " << std::hex << ((mServiceInfo.mServiceType >> 8)& 0xffff) << std::dec << " - sending messages response to peer "
<< sslId << " with " << itemL.size() << " messages " << std::endl;
GXSNETDEBUG_P_ (sslId) << "Service " << std::hex << ((mServiceInfo.mServiceType >> 8)& 0xffff) << std::dec << " - sending messages response to peer "
<< sslId << " with " << itemL.size() << " messages " << std::endl;
#endif
// signal peer to prepare for transaction
if(locked_addTransaction(tr))
sendItem(trItem);
else
{
delete tr ;
delete trItem ;
}
else
{
delete tr ;
delete trItem ;
}
}
bool RsGxsNetService::canSendMsgIds(std::vector<RsGxsMsgMetaData*>& msgMetas, const RsGxsGrpMetaData& grpMeta, const RsPeerId& sslId,RsGxsCircleId& should_encrypt_id)
@ -4780,17 +4685,14 @@ bool RsGxsNetService::getGroupServerUpdateTS(const RsGxsGroupId& gid,time_t& gro
{
RS_STACK_MUTEX(mNxsMutex) ;
if(mGrpServerUpdateItem == NULL)
return false ;
group_server_update_TS = mGrpServerUpdate.grpUpdateTS ;
group_server_update_TS = mGrpServerUpdateItem->grpUpdateTS ;
std::map<RsGxsGroupId,RsGxsServerMsgUpdateItem*>::iterator it = mServerMsgUpdateMap.find(gid) ;
ServerMsgMap::iterator it = mServerMsgUpdateMap.find(gid) ;
if(mServerMsgUpdateMap.end() == it)
msg_server_update_TS = 0 ;
else
msg_server_update_TS = it->second->msgUpdateTS ;
msg_server_update_TS = it->second.msgUpdateTS ;
return true ;
}
@ -4804,16 +4706,10 @@ bool RsGxsNetService::stampMsgServerUpdateTS(const RsGxsGroupId& gid)
bool RsGxsNetService::locked_stampMsgServerUpdateTS(const RsGxsGroupId& gid)
{
std::map<RsGxsGroupId,RsGxsServerMsgUpdateItem*>::iterator it = mServerMsgUpdateMap.find(gid) ;
RsGxsServerMsgUpdate& m(mServerMsgUpdateMap[gid]);
if(mServerMsgUpdateMap.end() == it)
{
RsGxsServerMsgUpdateItem *item = new RsGxsServerMsgUpdateItem(mServType);
item->grpId = gid ;
item->msgUpdateTS = time(NULL) ;
}
else
it->second->msgUpdateTS = time(NULL) ;
m.grpId = gid ;
m.msgUpdateTS = time(NULL) ;
return true;
}

View File

@ -542,10 +542,10 @@ private:
public:
typedef std::map<RsPeerId, RsGxsMsgUpdateItem*> ClientMsgMap;
typedef std::map<RsGxsGroupId, RsGxsServerMsgUpdateItem*> ServerMsgMap;
typedef std::map<RsPeerId, RsGxsGrpUpdateItem*> ClientGrpMap;
typedef std::map<RsGxsGroupId, RsGxsGrpConfigItem*> GrpConfigMap;
typedef std::map<RsPeerId, RsGxsMsgUpdate> ClientMsgMap;
typedef std::map<RsGxsGroupId, RsGxsServerMsgUpdate> ServerMsgMap;
typedef std::map<RsPeerId, RsGxsGrpUpdate> ClientGrpMap;
typedef std::map<RsGxsGroupId, RsGxsGrpConfig> GrpConfigMap;
private:
ClientMsgMap mClientMsgUpdateMap;
@ -553,7 +553,7 @@ private:
ClientGrpMap mClientGrpUpdateMap;
GrpConfigMap mServerGrpConfigMap;
RsGxsServerGrpUpdateItem* mGrpServerUpdateItem;
RsGxsServerGrpUpdate mGrpServerUpdate;
RsServiceInfo mServiceInfo;
std::map<RsGxsMessageId,time_t> mRejectedMessages;

View File

@ -29,13 +29,13 @@
void RsGxsGrpUpdateItem::clear()
{
grpUpdateTS = 0;
peerId.clear();
peerID.clear();
}
std::ostream& RsGxsGrpUpdateItem::print(std::ostream& out, uint16_t indent)
{
printRsItemBase(out, "RsGxsGrpUpdateItem", indent);
uint16_t int_Indent = indent + 2;
out << "peerId: " << peerId << std::endl;
out << "peerId: " << peerID << std::endl;
printIndent(out, int_Indent);
out << "grpUpdateTS: " << grpUpdateTS << std::endl;
printIndent(out, int_Indent);
@ -47,7 +47,7 @@ std::ostream& RsGxsGrpUpdateItem::print(std::ostream& out, uint16_t indent)
void RsGxsMsgUpdateItem::clear()
{
msgUpdateInfos.clear();
peerId.clear();
peerID.clear();
}
std::ostream& RsGxsMsgUpdateItem::print(std::ostream& out, uint16_t indent)
@ -209,7 +209,7 @@ RsItem* RsGxsUpdateSerialiser::deserialise(void* data, uint32_t* size)
uint32_t RsGxsUpdateSerialiser::sizeGxsGrpUpdate(RsGxsGrpUpdateItem* item)
{
uint32_t s = 8; // header size
s += item->peerId.serial_size();
s += item->peerID.serial_size();
s += 4; // mUpdateTS
return s;
}
@ -250,7 +250,7 @@ bool RsGxsUpdateSerialiser::serialiseGxsGrpUpdate(RsGxsGrpUpdateItem* item,
/* RsGxsGrpUpdateItem */
ok &= item->peerId.serialise(data, *size, offset) ;
ok &= item->peerID.serialise(data, *size, offset) ;
ok &= setRawUInt32(data, *size, &offset, item->grpUpdateTS);
if(offset != tlvsize){
@ -356,7 +356,7 @@ RsGxsGrpUpdateItem* RsGxsUpdateSerialiser::deserialGxsGrpUpddate(void* data, uin
/* skip the header */
offset += 8;
ok &= item->peerId.deserialise(data, *size, offset) ;
ok &= item->peerID.deserialise(data, *size, offset) ;
ok &= getRawUInt32(data, *size, &offset, &(item->grpUpdateTS));
if (offset != rssize)
@ -449,7 +449,7 @@ RsGxsServerGrpUpdateItem* RsGxsUpdateSerialiser::deserialGxsServerGrpUpddate(voi
uint32_t RsGxsUpdateSerialiser::sizeGxsMsgUpdate(RsGxsMsgUpdateItem* item)
{
uint32_t s = 8; // header size
s += item->peerId.serial_size() ;//GetTlvStringSize(item->peerId);
s += item->peerID.serial_size() ;//GetTlvStringSize(item->peerId);
s += item->msgUpdateInfos.size() * (4 + 4 + RsGxsGroupId::serial_size());
s += 4; // number of map items
@ -495,7 +495,7 @@ bool RsGxsUpdateSerialiser::serialiseGxsMsgUpdate(RsGxsMsgUpdateItem* item,
/* RsGxsMsgUpdateItem */
ok &= item->peerId.serialise(data, *size, offset) ;
ok &= item->peerID.serialise(data, *size, offset) ;
std::map<RsGxsGroupId, RsGxsMsgUpdateItem::MsgUpdateInfo>::const_iterator cit(item->msgUpdateInfos.begin());
@ -616,7 +616,7 @@ RsGxsMsgUpdateItem* RsGxsUpdateSerialiser::deserialGxsMsgUpdate(void* data,
/* skip the header */
offset += 8;
ok &= item->peerId.deserialise(data, *size, offset) ;
ok &= item->peerID.deserialise(data, *size, offset) ;
uint32_t numUpdateItems;
ok &= getRawUInt32(data, *size, &offset, &(numUpdateItems));
std::map<RsGxsGroupId, RsGxsMsgUpdateItem::MsgUpdateInfo>& msgUpdateInfos = item->msgUpdateInfos;

View File

@ -49,69 +49,85 @@ const uint8_t RS_PKT_SUBTYPE_GXS_SERVER_GRP_UPDATE = 0x04;
const uint8_t RS_PKT_SUBTYPE_GXS_SERVER_MSG_UPDATE = 0x08;
const uint8_t RS_PKT_SUBTYPE_GXS_GRP_CONFIG = 0x09;
class RsGxsGrpConfigItem : public RsItem {
class RsGxsGrpConfig
{
public:
RsGxsGrpConfigItem(uint16_t servType) : RsItem(RS_PKT_VERSION_SERVICE, servType, RS_PKT_SUBTYPE_GXS_GRP_CONFIG)
{
msg_keep_delay = RS_GXS_DEFAULT_MSG_STORE_PERIOD ;
msg_send_delay = RS_GXS_DEFAULT_MSG_SEND_PERIOD ;
msg_req_delay = RS_GXS_DEFAULT_MSG_REQ_PERIOD ;
RsGxsGrpConfig()
{
msg_keep_delay = RS_GXS_DEFAULT_MSG_STORE_PERIOD ;
msg_send_delay = RS_GXS_DEFAULT_MSG_SEND_PERIOD ;
msg_req_delay = RS_GXS_DEFAULT_MSG_REQ_PERIOD ;
max_visible_count = 0 ;
update_TS = 0 ;
}
max_visible_count = 0 ;
update_TS = 0 ;
}
RsGxsGroupId grpId ;
uint32_t msg_keep_delay ; // delay after which we discard the posts
uint32_t msg_send_delay ; // delay after which we dont send the posts anymore
uint32_t msg_req_delay ; // delay after which we dont get the posts from friends
RsTlvPeerIdSet suppliers; // list of friends who feed this group
uint32_t max_visible_count ; // max visible count reported by contributing friends
time_t update_TS ; // last time the max visible count was updated.
};
class RsGxsGrpConfigItem : public RsItem, public RsGxsGrpConfig
{
public:
RsGxsGrpConfigItem(uint16_t servType) : RsItem(RS_PKT_VERSION_SERVICE, servType, RS_PKT_SUBTYPE_GXS_GRP_CONFIG) {}
RsGxsGrpConfigItem(const RsGxsGrpConfig& m,uint16_t servType) : RsItem(RS_PKT_VERSION_SERVICE, servType, RS_PKT_SUBTYPE_GXS_GRP_CONFIG),RsGxsGrpConfig(m) {}
virtual ~RsGxsGrpConfigItem() {}
virtual void clear() {}
virtual std::ostream &print(std::ostream &out, uint16_t indent) { return out;}
RsGxsGroupId grpId ;
uint32_t msg_keep_delay ; // delay after which we discard the posts
uint32_t msg_send_delay ; // delay after which we dont send the posts anymore
uint32_t msg_req_delay ; // delay after which we dont get the posts from friends
RsTlvPeerIdSet suppliers; // list of friends who feed this group
uint32_t max_visible_count ; // max visible count reported by contributing friends
time_t update_TS ; // last time the max visible count was updated.
};
class RsGxsGrpUpdateItem : public RsItem {
class RsGxsGrpUpdate
{
public:
RsGxsGrpUpdateItem(uint16_t servType) : RsItem(RS_PKT_VERSION_SERVICE, servType,
RS_PKT_SUBTYPE_GXS_GRP_UPDATE)
{clear();}
RsGxsGrpUpdate() { grpUpdateTS=0;}
RsPeerId peerID;
uint32_t grpUpdateTS;
};
class RsGxsGrpUpdateItem : public RsItem, public RsGxsGrpUpdate
{
public:
RsGxsGrpUpdateItem(uint16_t servType) : RsItem(RS_PKT_VERSION_SERVICE, servType, RS_PKT_SUBTYPE_GXS_GRP_UPDATE) {clear();}
RsGxsGrpUpdateItem(const RsGxsGrpUpdate& u,uint16_t serv_type) : RsGxsGrpUpdate(u),RsItem(RS_PKT_VERSION_SERVICE, serv_type, RS_PKT_SUBTYPE_GXS_GRP_UPDATE) {clear();}
virtual ~RsGxsGrpUpdateItem() {}
virtual void clear();
virtual std::ostream &print(std::ostream &out, uint16_t indent);
RsPeerId peerId;
uint32_t grpUpdateTS;
};
class RsGxsServerGrpUpdateItem : public RsItem {
class RsGxsServerGrpUpdate
{
public:
RsGxsServerGrpUpdateItem(uint16_t servType) : RsItem(RS_PKT_VERSION_SERVICE, servType,
RS_PKT_SUBTYPE_GXS_SERVER_GRP_UPDATE)
{ clear();}
RsGxsServerGrpUpdate() { grpUpdateTS = 0 ; }
uint32_t grpUpdateTS;
};
class RsGxsServerGrpUpdateItem : public RsItem, public RsGxsServerGrpUpdate
{
public:
RsGxsServerGrpUpdateItem(uint16_t servType) : RsItem(RS_PKT_VERSION_SERVICE, servType, RS_PKT_SUBTYPE_GXS_SERVER_GRP_UPDATE) { clear();}
RsGxsServerGrpUpdateItem(const RsGxsServerGrpUpdate& u,uint16_t serv_type) : RsGxsServerGrpUpdate(u),RsItem(RS_PKT_VERSION_SERVICE, serv_type, RS_PKT_SUBTYPE_GXS_SERVER_GRP_UPDATE) {clear();}
virtual ~RsGxsServerGrpUpdateItem() {}
virtual void clear();
virtual std::ostream &print(std::ostream &out, uint16_t indent);
uint32_t grpUpdateTS;
};
class RsGxsMsgUpdateItem : public RsItem
class RsGxsMsgUpdate
{
public:
RsGxsMsgUpdateItem(uint16_t servType) : RsItem(RS_PKT_VERSION_SERVICE, servType, RS_PKT_SUBTYPE_GXS_MSG_UPDATE)
{ clear();}
virtual ~RsGxsMsgUpdateItem() {}
virtual void clear();
virtual std::ostream &print(std::ostream &out, uint16_t indent);
struct MsgUpdateInfo
{
MsgUpdateInfo(): time_stamp(0), message_count(0) {}
@ -120,23 +136,40 @@ public:
uint32_t message_count ;
};
RsPeerId peerId;
RsPeerId peerID;
std::map<RsGxsGroupId, MsgUpdateInfo> msgUpdateInfos;
};
class RsGxsServerMsgUpdateItem : public RsItem
class RsGxsMsgUpdateItem : public RsItem, public RsGxsMsgUpdate
{
public:
RsGxsServerMsgUpdateItem(uint16_t servType) : RsItem(RS_PKT_VERSION_SERVICE,
servType, RS_PKT_SUBTYPE_GXS_SERVER_MSG_UPDATE)
{ clear();}
RsGxsMsgUpdateItem(uint16_t servType) : RsItem(RS_PKT_VERSION_SERVICE, servType, RS_PKT_SUBTYPE_GXS_MSG_UPDATE) { clear();}
RsGxsMsgUpdateItem(const RsGxsMsgUpdate& m,uint16_t servType) : RsItem(RS_PKT_VERSION_SERVICE, servType, RS_PKT_SUBTYPE_GXS_MSG_UPDATE), RsGxsMsgUpdate(m) { clear();}
virtual ~RsGxsMsgUpdateItem() {}
virtual void clear();
virtual std::ostream &print(std::ostream &out, uint16_t indent);
};
class RsGxsServerMsgUpdate
{
public:
RsGxsServerMsgUpdate() { msgUpdateTS = 0 ;}
RsGxsGroupId grpId;
uint32_t msgUpdateTS; // local time stamp this group last received a new msg
};
class RsGxsServerMsgUpdateItem : public RsItem, public RsGxsServerMsgUpdate
{
public:
RsGxsServerMsgUpdateItem(uint16_t servType) : RsItem(RS_PKT_VERSION_SERVICE, servType, RS_PKT_SUBTYPE_GXS_SERVER_MSG_UPDATE) { clear();}
RsGxsServerMsgUpdateItem(const RsGxsServerMsgUpdate& m,uint16_t servType) : RsItem(RS_PKT_VERSION_SERVICE, servType, RS_PKT_SUBTYPE_GXS_SERVER_MSG_UPDATE),RsGxsServerMsgUpdate(m) { clear();}
virtual ~RsGxsServerMsgUpdateItem() {}
virtual void clear();
virtual std::ostream &print(std::ostream &out, uint16_t indent);
RsGxsGroupId grpId;
uint32_t msgUpdateTS; // local time stamp this group last received a new msg
virtual void clear();
virtual std::ostream &print(std::ostream &out, uint16_t indent);
};