Merge pull request #592 from csoler/v0.6-GXS-LimitedSync2

V0.6 gxs limited sync2
This commit is contained in:
csoler 2016-12-14 00:09:54 +01:00 committed by GitHub
commit 12023397ee
25 changed files with 1169 additions and 990 deletions

View File

@ -206,8 +206,8 @@ RsDataService::RsDataService(const std::string &serviceDir, const std::string &d
RsDataService::~RsDataService(){
#ifdef RS_DATA_SERVICE_DEBUG
std::cerr << "RsDataService::~RsDataService()";
std::cerr << std::endl;
std::cerr << "RsDataService::~RsDataService()";
std::cerr << std::endl;
#endif
mDb->closeDb();
@ -489,8 +489,8 @@ bool RsDataService::finishReleaseUpdate(int release, bool result)
RsGxsGrpMetaData* RsDataService::locked_getGrpMeta(RetroCursor &c, int colOffset)
{
#ifdef RS_DATA_SERVICE_DEBUG
std::cerr << "RsDataService::locked_getGrpMeta()";
std::cerr << std::endl;
std::cerr << "RsDataService::locked_getGrpMeta()";
std::cerr << std::endl;
#endif
RsGxsGrpMetaData* grpMeta = new RsGxsGrpMetaData();
@ -528,8 +528,8 @@ RsGxsGrpMetaData* RsDataService::locked_getGrpMeta(RetroCursor &c, int colOffset
if(data)
ok &= grpMeta->keys.GetTlv(data, data_len, &offset);
else
grpMeta->keys.TlvClear() ;
else
grpMeta->keys.TlvClear() ;
// local meta
grpMeta->mSubscribeFlags = c.getInt32(mColGrpMeta_SubscrFlag + colOffset);
@ -717,7 +717,7 @@ int RsDataService::storeMessage(std::map<RsNxsMsg *, RsGxsMsgMetaData *> &msg)
RsNxsMsg* msgPtr = mit->first;
RsGxsMsgMetaData* msgMetaPtr = mit->second;
#ifdef RS_DATA_SERVICE_DEBUG
#ifdef RS_DATA_SERVICE_DEBUG
std::cerr << "RsDataService::storeMessage() ";
std::cerr << " GroupId: " << msgMetaPtr->mGroupId.toStdString();
std::cerr << " MessageId: " << msgMetaPtr->mMsgId.toStdString();
@ -792,12 +792,12 @@ int RsDataService::storeMessage(std::map<RsNxsMsg *, RsGxsMsgMetaData *> &msg)
for(mit = msg.begin(); mit != msg.end(); ++mit)
{
//TODO: API encourages aliasing, remove this abomination
if(mit->second != mit->first->metaData)
delete mit->second;
//TODO: API encourages aliasing, remove this abomination
if(mit->second != mit->first->metaData)
delete mit->second;
delete mit->first;
;
delete mit->first;
;
}
return ret;
@ -805,9 +805,9 @@ int RsDataService::storeMessage(std::map<RsNxsMsg *, RsGxsMsgMetaData *> &msg)
bool RsDataService::validSize(RsNxsMsg* msg) const
{
if((msg->msg.TlvSize() + msg->meta.TlvSize()) <= GXS_MAX_ITEM_SIZE) return true;
if((msg->msg.TlvSize() + msg->meta.TlvSize()) <= GXS_MAX_ITEM_SIZE) return true;
return false;
return false;
}
@ -830,11 +830,11 @@ int RsDataService::storeGroup(std::map<RsNxsGrp *, RsGxsGrpMetaData *> &grp)
// if data is larger than max item size do not add
if(!validSize(grpPtr)) continue;
#ifdef RS_DATA_SERVICE_DEBUG
std::cerr << "RsDataService::storeGroup() GrpId: " << grpPtr->grpId.toStdString();
std::cerr << " CircleType: " << (uint32_t) grpMetaPtr->mCircleType;
std::cerr << " CircleId: " << grpMetaPtr->mCircleId.toStdString();
std::cerr << std::endl;
#ifdef RS_DATA_SERVICE_DEBUG
std::cerr << "RsDataService::storeGroup() GrpId: " << grpPtr->grpId.toStdString();
std::cerr << " CircleType: " << (uint32_t) grpMetaPtr->mCircleType;
std::cerr << " CircleId: " << grpMetaPtr->mCircleId.toStdString();
std::cerr << std::endl;
#endif
/*!
@ -890,22 +890,22 @@ int RsDataService::storeGroup(std::map<RsNxsGrp *, RsGxsGrpMetaData *> &grp)
locked_clearGrpMetaCache(grpMetaPtr->mGroupId);
if (!mDb->sqlInsert(GRP_TABLE_NAME, "", cv))
{
std::cerr << "RsDataService::storeGroup() sqlInsert Failed";
std::cerr << std::endl;
std::cerr << "\t For GroupId: " << grpMetaPtr->mGroupId.toStdString();
std::cerr << std::endl;
}
{
std::cerr << "RsDataService::storeGroup() sqlInsert Failed";
std::cerr << std::endl;
std::cerr << "\t For GroupId: " << grpMetaPtr->mGroupId.toStdString();
std::cerr << std::endl;
}
}
// finish transaction
bool ret = mDb->commitTransaction();
for(sit = grp.begin(); sit != grp.end(); ++sit)
{
//TODO: API encourages aliasing, remove this abomination
if(sit->second != sit->first->metaData)
delete sit->second;
delete sit->first;
//TODO: API encourages aliasing, remove this abomination
if(sit->second != sit->first->metaData)
delete sit->second;
delete sit->first;
}
@ -993,10 +993,10 @@ int RsDataService::updateGroup(std::map<RsNxsGrp *, RsGxsGrpMetaData *> &grp)
for(sit = grp.begin(); sit != grp.end(); ++sit)
{
//TODO: API encourages aliasing, remove this abomination
if(sit->second != sit->first->metaData)
delete sit->second;
delete sit->first;
//TODO: API encourages aliasing, remove this abomination
if(sit->second != sit->first->metaData)
delete sit->second;
delete sit->first;
}
@ -1031,8 +1031,8 @@ int RsDataService::updateGroupKeys(const RsGxsGroupId& grpId,const RsTlvSecurity
bool RsDataService::validSize(RsNxsGrp* grp) const
{
if((grp->grp.TlvSize() + grp->meta.TlvSize()) <= GXS_MAX_ITEM_SIZE) return true;
return false;
if((grp->grp.TlvSize() + grp->meta.TlvSize()) <= GXS_MAX_ITEM_SIZE) return true;
return false;
}
int RsDataService::retrieveNxsGrps(std::map<RsGxsGroupId, RsNxsGrp *> &grp, bool withMeta, bool /* cache */)
@ -1306,8 +1306,8 @@ void RsDataService::locked_retrieveMsgMeta(RetroCursor *c, std::vector<RsGxsMsgM
int RsDataService::retrieveGxsGrpMetaData(std::map<RsGxsGroupId, RsGxsGrpMetaData *>& grp)
{
#ifdef RS_DATA_SERVICE_DEBUG
std::cerr << "RsDataService::retrieveGxsGrpMetaData()";
std::cerr << std::endl;
std::cerr << "RsDataService::retrieveGxsGrpMetaData()";
std::cerr << std::endl;
#endif
RsStackMutex stack(mDbMutex);
@ -1478,7 +1478,7 @@ int RsDataService::updateMessageMetaData(MsgLocMetaData &metaData)
std::cerr << (void*)this << ": Updating Msg Meta data: grpId = " << metaData.msgId.first << " msgId = " << metaData.msgId.second << std::endl;
#endif
RsStackMutex stack(mDbMutex);
RsStackMutex stack(mDbMutex);
RsGxsGroupId& grpId = metaData.msgId.first;
RsGxsMessageId& msgId = metaData.msgId.second;
return mDb->sqlUpdate(MSG_TABLE_NAME, KEY_GRP_ID+ "='" + grpId.toStdString()
@ -1487,20 +1487,20 @@ int RsDataService::updateMessageMetaData(MsgLocMetaData &metaData)
int RsDataService::removeMsgs(const GxsMsgReq& msgIds)
{
RsStackMutex stack(mDbMutex);
RsStackMutex stack(mDbMutex);
GxsMsgReq::const_iterator mit = msgIds.begin();
GxsMsgReq::const_iterator mit = msgIds.begin();
for(; mit != msgIds.end(); ++mit)
{
const std::vector<RsGxsMessageId>& msgIdV = mit->second;
const RsGxsGroupId& grpId = mit->first;
for(; mit != msgIds.end(); ++mit)
{
const std::vector<RsGxsMessageId>& msgIdV = mit->second;
const RsGxsGroupId& grpId = mit->first;
// delete messages
GxsMsgReq msgsToDelete;
msgsToDelete[grpId] = msgIdV;
locked_removeMessageEntries(msgsToDelete);
}
// delete messages
GxsMsgReq msgsToDelete;
msgsToDelete[grpId] = msgIdV;
locked_removeMessageEntries(msgsToDelete);
}
return 1;
}
@ -1508,9 +1508,9 @@ int RsDataService::removeMsgs(const GxsMsgReq& msgIds)
int RsDataService::removeGroups(const std::vector<RsGxsGroupId> &grpIds)
{
RsStackMutex stack(mDbMutex);
RsStackMutex stack(mDbMutex);
locked_removeGroupEntries(grpIds);
locked_removeGroupEntries(grpIds);
return 1;
}
@ -1520,79 +1520,79 @@ int RsDataService::retrieveGroupIds(std::vector<RsGxsGroupId> &grpIds)
RsStackMutex stack(mDbMutex);
#ifdef RS_DATA_SERVICE_DEBUG_TIME
RsScopeTimer timer("");
int resultCount = 0;
RsScopeTimer timer("");
int resultCount = 0;
#endif
RetroCursor* c = mDb->sqlQuery(GRP_TABLE_NAME, mGrpIdColumn, "", "");
RetroCursor* c = mDb->sqlQuery(GRP_TABLE_NAME, mGrpIdColumn, "", "");
if(c)
{
bool valid = c->moveToFirst();
if(c)
{
bool valid = c->moveToFirst();
while(valid)
{
std::string grpId;
c->getString(mColGrpId_GrpId, grpId);
grpIds.push_back(RsGxsGroupId(grpId));
valid = c->moveToNext();
while(valid)
{
std::string grpId;
c->getString(mColGrpId_GrpId, grpId);
grpIds.push_back(RsGxsGroupId(grpId));
valid = c->moveToNext();
#ifdef RS_DATA_SERVICE_DEBUG_TIME
++resultCount;
++resultCount;
#endif
}
delete c;
}else
{
return 0;
}
}
delete c;
}else
{
return 0;
}
#ifdef RS_DATA_SERVICE_DEBUG_TIME
std::cerr << "RsDataService::retrieveGroupIds() " << mDbName << ", Results: " << resultCount << ", Time: " << timer.duration() << std::endl;
std::cerr << "RsDataService::retrieveGroupIds() " << mDbName << ", Results: " << resultCount << ", Time: " << timer.duration() << std::endl;
#endif
return 1;
return 1;
}
int RsDataService::retrieveMsgIds(const RsGxsGroupId& grpId, RsGxsMessageId::std_vector& msgIds)
{
#ifdef RS_DATA_SERVICE_DEBUG_TIME
RsScopeTimer timer("");
int resultCount = 0;
RsScopeTimer timer("");
int resultCount = 0;
#endif
RetroCursor* c = mDb->sqlQuery(MSG_TABLE_NAME, mMsgIdColumn, KEY_GRP_ID+ "='" + grpId.toStdString() + "'", "");
RetroCursor* c = mDb->sqlQuery(MSG_TABLE_NAME, mMsgIdColumn, KEY_GRP_ID+ "='" + grpId.toStdString() + "'", "");
if(c)
{
bool valid = c->moveToFirst();
if(c)
{
bool valid = c->moveToFirst();
while(valid)
{
std::string msgId;
while(valid)
{
std::string msgId;
c->getString(mColMsgId_MsgId, msgId);
if(c->columnCount() != 1)
std::cerr << "(EE) ********* not retrieving all columns!!" << std::endl;
msgIds.push_back(RsGxsMessageId(msgId));
valid = c->moveToNext();
msgIds.push_back(RsGxsMessageId(msgId));
valid = c->moveToNext();
#ifdef RS_DATA_SERVICE_DEBUG_TIME
++resultCount;
++resultCount;
#endif
}
delete c;
}else
{
return 0;
}
}
delete c;
}else
{
return 0;
}
#ifdef RS_DATA_SERVICE_DEBUG_TIME
std::cerr << "RsDataService::retrieveNxsGrps() " << mDbName << ", Results: " << resultCount << ", Time: " << timer.duration() << std::endl;
std::cerr << "RsDataService::retrieveNxsGrps() " << mDbName << ", Results: " << resultCount << ", Time: " << timer.duration() << std::endl;
#endif
return 1;
return 1;
}
@ -1605,16 +1605,16 @@ bool RsDataService::locked_removeMessageEntries(const GxsMsgReq& msgIds)
for(; mit != msgIds.end(); ++mit)
{
const RsGxsGroupId& grpId = mit->first;
const std::vector<RsGxsMessageId>& msgsV = mit->second;
std::vector<RsGxsMessageId>::const_iterator vit = msgsV.begin();
const RsGxsGroupId& grpId = mit->first;
const std::vector<RsGxsMessageId>& msgsV = mit->second;
std::vector<RsGxsMessageId>::const_iterator vit = msgsV.begin();
for(; vit != msgsV.end(); ++vit)
{
const RsGxsMessageId& msgId = *vit;
mDb->sqlDelete(MSG_TABLE_NAME, KEY_GRP_ID+ "='" + grpId.toStdString()
for(; vit != msgsV.end(); ++vit)
{
const RsGxsMessageId& msgId = *vit;
mDb->sqlDelete(MSG_TABLE_NAME, KEY_GRP_ID+ "='" + grpId.toStdString()
+ "' AND " + KEY_MSG_ID + "='" + msgId.toStdString() + "'", "");
}
}
}
ret &= mDb->commitTransaction();
@ -1632,8 +1632,8 @@ bool RsDataService::locked_removeGroupEntries(const std::vector<RsGxsGroupId>& g
for(; vit != grpIds.end(); ++vit)
{
const RsGxsGroupId& grpId = *vit;
mDb->sqlDelete(GRP_TABLE_NAME, KEY_GRP_ID+ "='" + grpId.toStdString() + "'", "");
const RsGxsGroupId& grpId = *vit;
mDb->sqlDelete(GRP_TABLE_NAME, KEY_GRP_ID+ "='" + grpId.toStdString() + "'", "");
}
ret &= mDb->commitTransaction();

View File

@ -128,11 +128,10 @@ bool RsGenExchange::getGroupServerUpdateTS(const RsGxsGroupId& gid, time_t& grp_
void RsGenExchange::data_tick()
{
static const double timeDelta = 0.1; // slow tick in sec
static const double timeDelta = 0.1; // slow tick in sec
tick();
usleep((int) (timeDelta * 1000 *1000)); // timeDelta sec
tick();
usleep((int) (timeDelta * 1000 *1000)); // timeDelta sec
}
void RsGenExchange::tick()
@ -181,9 +180,10 @@ void RsGenExchange::tick()
mLastClean = time(NULL);
}
}else
}
else
{
mMsgCleanUp = new RsGxsMessageCleanUp(mDataStore, MESSAGE_STORE_PERIOD, 1);
mMsgCleanUp = new RsGxsMessageCleanUp(mDataStore, this, 1);
mCleaning = true;
}
}
@ -229,7 +229,7 @@ void RsGenExchange::tick()
}
else
{
mIntegrityCheck = new RsGxsIntegrityCheck(mDataStore,mGixs);
mIntegrityCheck = new RsGxsIntegrityCheck(mDataStore,this,mGixs);
mIntegrityCheck->start("gxs integrity");
mChecking = true;
}
@ -1608,6 +1608,54 @@ void RsGenExchange::publishMsg(uint32_t& token, RsGxsMsgItem *msgItem)
}
uint32_t RsGenExchange::getDefaultSyncPeriod()
{
RS_STACK_MUTEX(mGenMtx) ;
if(mNetService != NULL)
return mNetService->getDefaultSyncAge();
else
{
std::cerr << "(EE) No network service available. Cannot get default sync period. " << std::endl;
return 0;
}
}
uint32_t RsGenExchange::getSyncPeriod(const RsGxsGroupId& grpId)
{
RS_STACK_MUTEX(mGenMtx) ;
if(mNetService != NULL)
return mNetService->getSyncAge(grpId);
else
return RS_GXS_DEFAULT_MSG_REQ_PERIOD;
}
void RsGenExchange::setSyncPeriod(const RsGxsGroupId& grpId,uint32_t age_in_secs)
{
if(mNetService != NULL)
return mNetService->setSyncAge(grpId,age_in_secs) ;
else
std::cerr << "(EE) No network service available. Cannot set storage period. " << std::endl;
}
uint32_t RsGenExchange::getStoragePeriod(const RsGxsGroupId& grpId)
{
RS_STACK_MUTEX(mGenMtx) ;
if(mNetService != NULL)
return mNetService->getKeepAge(grpId,MESSAGE_STORE_PERIOD) ;
else
return MESSAGE_STORE_PERIOD;
}
void RsGenExchange::setStoragePeriod(const RsGxsGroupId& grpId,uint32_t age_in_secs)
{
if(mNetService != NULL)
return mNetService->setKeepAge(grpId,age_in_secs) ;
else
std::cerr << "(EE) No network service available. Cannot set storage period. " << std::endl;
}
void RsGenExchange::setGroupSubscribeFlags(uint32_t& token, const RsGxsGroupId& grpId, const uint32_t& flag, const uint32_t& mask)
{
/* TODO APPLY MASK TO FLAGS */
@ -2674,6 +2722,18 @@ void RsGenExchange::processRecvdMessages()
msg->metaData = meta;
// (cyril) Normally we should discard posts that are older than the sync request. But that causes a problem because
// RsGxsNetService requests posts to sync by chunks of 20. So if the 20 are discarded, they will be re-synced next time, and the sync process
// will indefinitly loop on the same 20 posts. Since the posts are there already, keeping them is the least problematique way to fix this problem.
//
// uint32_t max_sync_age = ( mNetService != NULL)?( mNetService->getSyncAge(msg->metaData->mGroupId)):RS_GXS_DEFAULT_MSG_REQ_PERIOD;
//
// if(max_sync_age != 0 && msg->metaData->mPublishTs + max_sync_age < time(NULL))
// {
// std::cerr << "(WW) not validating message " << msg->metaData->mMsgId << " in group " << msg->metaData->mGroupId << " because it is older than synchronisation limit. This message was probably sent by a friend node that does not accept sync limits already." << std::endl;
// ok = false ;
// }
#ifdef GEN_EXCH_DEBUG
std::cerr << " deserialised info: grp id=" << meta->mGroupId << ", msg id=" << meta->mMsgId ;
#endif

View File

@ -39,8 +39,6 @@
#include "serialiser/rsnxsitems.h"
#include "rsgxsutil.h"
#define DEFAULT_MSG_STORE_PERIOD 60*60*24*31*4 // 4 months
template<class GxsItem, typename Identity = std::string>
class GxsPendingItem
{
@ -128,7 +126,7 @@ public:
*/
RsGenExchange(RsGeneralDataService* gds, RsNetworkExchangeService* ns,
RsSerialType* serviceSerialiser, uint16_t mServType, RsGixs* gixs, uint32_t authenPolicy,
uint32_t messageStorePeriod = DEFAULT_MSG_STORE_PERIOD);
uint32_t messageStorePeriod = RS_GXS_DEFAULT_MSG_STORE_PERIOD);
virtual ~RsGenExchange();
@ -613,11 +611,6 @@ public:
*/
void updateGroupLastMsgTimeStamp(uint32_t& token, const RsGxsGroupId& grpId);
/*!
* @return storage time of messages in months
*/
int getStoragePeriod(){ return MESSAGE_STORE_PERIOD/(60*60*24*31);}
/*!
* sets the msg status flag
* @param token this is set to token value associated to this request
@ -649,6 +642,20 @@ public:
*/
bool getGroupServerUpdateTS(const RsGxsGroupId& gid,time_t& grp_server_update_TS,time_t& msg_server_update_TS) ;
/*!
* \brief getDefaultStoragePeriod. All times in seconds.
* \return
*/
virtual uint32_t getDefaultStoragePeriod() { return MESSAGE_STORE_PERIOD; }
virtual uint32_t getStoragePeriod(const RsGxsGroupId& grpId) ;
virtual void setStoragePeriod(const RsGxsGroupId& grpId,uint32_t age_in_secs) ;
virtual uint32_t getDefaultSyncPeriod();
virtual uint32_t getSyncPeriod(const RsGxsGroupId& grpId) ;
virtual void setSyncPeriod(const RsGxsGroupId& grpId,uint32_t age_in_secs) ;
uint16_t serviceType() const { return mServType ; }
protected:
/** Notifications **/
@ -821,8 +828,6 @@ private:
*/
void removeDeleteExistingMessages(RsGeneralDataService::MsgStoreMap& msgs, GxsMsgReq& msgIdsNotify);
private:
RsMutex mGenMtx;
RsGxsDataAccess* mDataAccess;
RsGeneralDataService* mDataStore;
@ -862,7 +867,6 @@ private:
std::vector<GxsPendingItem<RsNxsMsg*, RsGxsGrpMsgIdPair> > mMsgPendingValidate;
typedef std::vector<GxsPendingItem<RsNxsMsg*, RsGxsGrpMsgIdPair> > NxsMsgPendingVect;
const uint32_t MESSAGE_STORE_PERIOD;
bool mCleaning;

View File

@ -32,8 +32,13 @@
/* data types used throughout Gxs from netservice to genexchange */
typedef std::map<RsGxsGroupId, std::vector<RsGxsMsgMetaData*> > GxsMsgMetaResult;
typedef std::map<RsGxsGroupId, std::vector<RsGxsMsgMetaData*> > GxsMsgMetaResult;
typedef std::map<RsGxsGrpMsgIdPair, std::vector<RsGxsMsgMetaData*> > MsgRelatedMetaResult;
// Default values that are used throughout GXS code
static const uint32_t RS_GXS_DEFAULT_MSG_STORE_PERIOD = 86400 * 30 * 6 ; // six months. Default time for which messages are keps in the database.
static const uint32_t RS_GXS_DEFAULT_MSG_SEND_PERIOD = 86400 * 30 * 1 ; // one month. Default delay after which we don't send messages
static const uint32_t RS_GXS_DEFAULT_MSG_REQ_PERIOD = 86400 * 30 * 1 ; // one month. Default Delay after which we don't request messages
#endif // RSGXS_H

File diff suppressed because it is too large Load Diff

View File

@ -103,11 +103,16 @@ public:
/*!
* Use this to set how far back synchronisation of messages should take place
* @param age the max age a sync item can to be allowed in a synchronisation
* Use this to set how far back synchronisation and storage of messages should take place
* @param age the max age a sync/storage item can to be allowed in a synchronisation
*/
// NOT IMPLEMENTED
virtual void setSyncAge(uint32_t age);
virtual void setSyncAge(const RsGxsGroupId& grpId,uint32_t age_in_secs);
virtual void setKeepAge(const RsGxsGroupId& grpId,uint32_t age_in_secs);
virtual uint32_t getSyncAge(const RsGxsGroupId& id);
virtual uint32_t getKeepAge(const RsGxsGroupId& id,uint32_t default_value);
virtual uint32_t getDefaultSyncAge() { return RS_GXS_DEFAULT_MSG_REQ_PERIOD ; }
/*!
* pauses synchronisation of subscribed groups and request for group id
@ -402,6 +407,7 @@ private:
bool locked_CanReceiveUpdate(const RsNxsSyncGrpReqItem *item);
bool locked_CanReceiveUpdate(RsNxsSyncMsgReqItem *item, bool &grp_is_known);
void locked_resetClientTS(const RsGxsGroupId& grpId);
static RsGxsGroupId hashGrpId(const RsGxsGroupId& gid,const RsPeerId& pid) ;
@ -482,6 +488,7 @@ private:
private:
static void locked_checkDelay(uint32_t& time_in_secs);
/*** transactions ***/
@ -542,18 +549,18 @@ private:
public:
typedef std::map<RsPeerId, RsGxsMsgUpdateItem*> ClientMsgMap;
typedef std::map<RsGxsGroupId, RsGxsServerMsgUpdateItem*> ServerMsgMap;
typedef std::map<RsPeerId, RsGxsGrpUpdateItem*> ClientGrpMap;
typedef std::map<RsPeerId, RsGxsMsgUpdate> ClientMsgMap;
typedef std::map<RsGxsGroupId, RsGxsServerMsgUpdate> ServerMsgMap;
typedef std::map<RsPeerId, RsGxsGrpUpdate> ClientGrpMap;
typedef std::map<RsGxsGroupId, RsGxsGrpConfig> GrpConfigMap;
private:
ClientMsgMap mClientMsgUpdateMap;
ServerMsgMap mServerMsgUpdateMap;
ClientGrpMap mClientGrpUpdateMap;
GrpConfigMap mServerGrpConfigMap;
std::map<RsGxsGroupId,RsGroupNetworkStatsRecord> mGroupNetworkStats ;
RsGxsServerGrpUpdateItem* mGrpServerUpdateItem;
RsGxsServerGrpUpdate mGrpServerUpdate;
RsServiceInfo mServiceInfo;
std::map<RsGxsMessageId,time_t> mRejectedMessages;

View File

@ -33,10 +33,12 @@
static const uint32_t MAX_GXS_IDS_REQUESTS_NET = 10 ; // max number of requests from cache/net (avoids killing the system!)
//#define GXSUTIL_DEBUG 1
//#define DEBUG_GXSUTIL 1
RsGxsMessageCleanUp::RsGxsMessageCleanUp(RsGeneralDataService* const dataService, uint32_t messageStorePeriod, uint32_t chunkSize)
: mDs(dataService), MESSAGE_STORE_PERIOD(messageStorePeriod), CHUNK_SIZE(chunkSize)
#define GXSUTIL_DEBUG() std::cerr << time(NULL) << " : GXS_UTIL : " << __FUNCTION__ << " : "
RsGxsMessageCleanUp::RsGxsMessageCleanUp(RsGeneralDataService* const dataService, RsGenExchange *genex, uint32_t chunkSize)
: mDs(dataService), mGenExchangeClient(genex), CHUNK_SIZE(chunkSize)
{
std::map<RsGxsGroupId, RsGxsGrpMetaData*> grpMeta;
@ -45,18 +47,18 @@ RsGxsMessageCleanUp::RsGxsMessageCleanUp(RsGeneralDataService* const dataService
std::map<RsGxsGroupId, RsGxsGrpMetaData*>::iterator cit = grpMeta.begin();
for(;cit != grpMeta.end(); ++cit)
{
mGrpMeta.push_back(cit->second);
}
}
bool RsGxsMessageCleanUp::clean()
{
uint32_t i = 1;
time_t now = time(NULL);
#ifdef DEBUG_GXSUTIL
GXSUTIL_DEBUG() << " Cleaning up groups in service" << std::hex << mGenExchangeClient->serviceType() << std::dec << std::endl;
#endif
while(!mGrpMeta.empty())
{
RsGxsGrpMetaData* grpMeta = mGrpMeta.back();
@ -70,8 +72,13 @@ bool RsGxsMessageCleanUp::clean()
GxsMsgMetaResult::iterator mit = result.begin();
#ifdef DEBUG_GXSUTIL
GXSUTIL_DEBUG() << " Cleaning up group message for group ID " << grpId << std::endl;
#endif
req.clear();
uint32_t store_period = mGenExchangeClient->getStoragePeriod(grpId) ;
for(; mit != result.end(); ++mit)
{
std::vector<RsGxsMsgMetaData*>& metaV = mit->second;
@ -82,7 +89,7 @@ bool RsGxsMessageCleanUp::clean()
RsGxsMsgMetaData* meta = *vit;
// check if expired
bool remove = (meta->mPublishTs + MESSAGE_STORE_PERIOD) < now;
bool remove = store_period > 0 && (meta->mPublishTs + store_period) < now;
// check client does not want the message kept regardless of age
remove &= !(meta->mMsgStatus & GXS_SERV::GXS_MSG_STATUS_KEEP);
@ -95,7 +102,7 @@ bool RsGxsMessageCleanUp::clean()
{
req[grpId].push_back(meta->mMsgId);
std::cerr << "Scheduling msg id " << meta->mMsgId << " in grp " << grpId << " for removal." << std::endl;
GXSUTIL_DEBUG() << " Scheduling msg id " << meta->mMsgId << " in grp " << grpId << " for removal." << std::endl;
}
delete meta;
@ -114,8 +121,8 @@ bool RsGxsMessageCleanUp::clean()
return mGrpMeta.empty();
}
RsGxsIntegrityCheck::RsGxsIntegrityCheck(RsGeneralDataService* const dataService, RsGixs *gixs) :
mDs(dataService), mDone(false), mIntegrityMutex("integrity"),mGixs(gixs)
RsGxsIntegrityCheck::RsGxsIntegrityCheck(RsGeneralDataService* const dataService, RsGenExchange *genex, RsGixs *gixs) :
mDs(dataService),mGenExchangeClient(genex), mDone(false), mIntegrityMutex("integrity"),mGixs(gixs)
{ }
void RsGxsIntegrityCheck::run()
@ -160,8 +167,8 @@ bool RsGxsIntegrityCheck::check()
if(!grp->metaData->mAuthorId.isNull())
{
#ifdef GXSUTIL_DEBUG
std::cerr << "TimeStamping group authors' key ID " << grp->metaData->mAuthorId << " in group ID " << grp->grpId << std::endl;
#ifdef DEBUG_GXSUTIL
GXSUTIL_DEBUG() << "TimeStamping group authors' key ID " << grp->metaData->mAuthorId << " in group ID " << grp->grpId << std::endl;
#endif
if(rsIdentity!=NULL && !rsIdentity->isBanned(grp->metaData->mAuthorId))
@ -180,6 +187,22 @@ bool RsGxsIntegrityCheck::check()
{
grpsToDel.push_back(grp->grpId);
}
#ifdef TODO
if(!(grp->metaData->mSubscribeFlags & GXS_SERV::GROUP_SUBSCRIBE_SUBSCRIBED))
{
RsGroupNetworkStats stats ;
mGenExchangeClient->getGroupNetworkStats(grp->grpId,stats);
if(stats.mSuppliers == 0 && stats.mMaxVisibleCount == 0)
{
GXSUTIL_DEBUG() << "Scheduling group \"" << grp->metaData->mGroupName << "\" ID=" << grp->grpId << " for deletion because it has no suppliers not any visible data at friends." << std::endl;
#warning Should we do that here? What happens for groups that are normally empty such as identities?
grpsToDel.push_back(grp->grpId);
}
}
#endif
delete grp;
}
@ -243,8 +266,8 @@ bool RsGxsIntegrityCheck::check()
}
else if(!msg->metaData->mAuthorId.isNull() && subscribed_groups.find(msg->metaData->mGroupId)!=subscribed_groups.end())
{
#ifdef GXSUTIL_DEBUG
std::cerr << "TimeStamping message authors' key ID " << msg->metaData->mAuthorId << " in message " << msg->msgId << ", group ID " << msg->grpId<< std::endl;
#ifdef DEBUG_GXSUTIL
GXSUTIL_DEBUG() << "TimeStamping message authors' key ID " << msg->metaData->mAuthorId << " in message " << msg->msgId << ", group ID " << msg->grpId<< std::endl;
#endif
if(rsIdentity!=NULL && !rsIdentity->isBanned(msg->metaData->mAuthorId))
used_gxs_ids.insert(msg->metaData->mAuthorId) ;
@ -266,9 +289,9 @@ bool RsGxsIntegrityCheck::check()
}
mDeletedMsgs = msgsToDel;
#ifdef GXSUTIL_DEBUG
std::cerr << "At end of pass, this is the list used GXS ids: " << std::endl;
std::cerr << " requesting them to GXS identity service to enforce loading." << std::endl;
#ifdef DEBUG_GXSUTIL
GXSUTIL_DEBUG() << "At end of pass, this is the list used GXS ids: " << std::endl;
GXSUTIL_DEBUG() << " requesting them to GXS identity service to enforce loading." << std::endl;
#endif
std::list<RsPeerId> connected_friends ;
@ -279,14 +302,14 @@ bool RsGxsIntegrityCheck::check()
for(std::set<RsGxsId>::const_iterator it(used_gxs_ids.begin());it!=used_gxs_ids.end();++it)
{
gxs_ids.push_back(*it) ;
#ifdef GXSUTIL_DEBUG
std::cerr << " " << *it << std::endl;
#ifdef DEBUG_GXSUTIL
GXSUTIL_DEBUG() << " " << *it << std::endl;
#endif
}
uint32_t nb_requested_not_in_cache = 0;
#ifdef GXSUTIL_DEBUG
std::cerr << " issuing random get on friends for non existing IDs" << std::endl;
#ifdef DEBUG_GXSUTIL
GXSUTIL_DEBUG() << " issuing random get on friends for non existing IDs" << std::endl;
#endif
// now request a cache update for them, which triggers downloading from friends, if missing.
@ -294,8 +317,8 @@ bool RsGxsIntegrityCheck::check()
for(;nb_requested_not_in_cache<MAX_GXS_IDS_REQUESTS_NET && !gxs_ids.empty();)
{
uint32_t n = RSRandom::random_u32() % gxs_ids.size() ;
#ifdef GXSUTIL_DEBUG
std::cerr << " requesting ID " << gxs_ids[n] ;
#ifdef DEBUG_GXSUTIL
GXSUTIL_DEBUG() << " requesting ID " << gxs_ids[n] ;
#endif
if(!mGixs->haveKey(gxs_ids[n])) // checks if we have it already in the cache (conservative way to ensure that we atually have it)
@ -303,14 +326,14 @@ bool RsGxsIntegrityCheck::check()
mGixs->requestKey(gxs_ids[n],connected_friends);
++nb_requested_not_in_cache ;
#ifdef GXSUTIL_DEBUG
std::cerr << " ... from cache/net" << std::endl;
#ifdef DEBUG_GXSUTIL
GXSUTIL_DEBUG() << " ... from cache/net" << std::endl;
#endif
}
else
{
#ifdef GXSUTIL_DEBUG
std::cerr << " ... already in cache" << std::endl;
#ifdef DEBUG_GXSUTIL
GXSUTIL_DEBUG() << " ... already in cache" << std::endl;
#endif
// Note: we could time_stamp even in the case where the id is not cached. Anyway, it's not really a problem here, since IDs have a high chance of
@ -322,8 +345,8 @@ bool RsGxsIntegrityCheck::check()
gxs_ids[n] = gxs_ids[gxs_ids.size()-1] ;
gxs_ids.pop_back() ;
}
#ifdef GXSUTIL_DEBUG
std::cerr << " total actual cache requests: "<< nb_requested_not_in_cache << std::endl;
#ifdef DEBUG_GXSUTIL
GXSUTIL_DEBUG() << " total actual cache requests: "<< nb_requested_not_in_cache << std::endl;
#endif
return true;

View File

@ -31,6 +31,7 @@
#include "rsgds.h"
class RsGixs ;
class RsGenExchange ;
/*!
* Handy function for cleaning out meta result containers
@ -73,7 +74,7 @@ public:
* @param chunkSize
* @param sleepPeriod
*/
RsGxsMessageCleanUp(RsGeneralDataService* const dataService, uint32_t messageStorePeriod, uint32_t chunkSize);
RsGxsMessageCleanUp(RsGeneralDataService* const dataService, RsGenExchange *genex, uint32_t chunkSize);
/*!
* On construction this should be called to progress deletions
@ -90,7 +91,8 @@ public:
private:
RsGeneralDataService* const mDs;
const uint32_t MESSAGE_STORE_PERIOD, CHUNK_SIZE;
RsGenExchange *mGenExchangeClient;
uint32_t CHUNK_SIZE;
std::vector<RsGxsGrpMetaData*> mGrpMeta;
};
@ -113,8 +115,7 @@ public:
* @param chunkSize
* @param sleepPeriod
*/
RsGxsIntegrityCheck(RsGeneralDataService* const dataService, RsGixs *gixs);
RsGxsIntegrityCheck(RsGeneralDataService* const dataService, RsGenExchange *genex, RsGixs *gixs);
bool check();
bool isDone();
@ -126,6 +127,7 @@ public:
private:
RsGeneralDataService* const mDs;
RsGenExchange *mGenExchangeClient;
bool mDone;
RsMutex mIntegrityMutex;
std::list<RsGxsGroupId> mDeletedGrps;

View File

@ -65,9 +65,15 @@ public:
/*!
* Use this to set how far back synchronisation of messages should take place
* @param age the max age a sync item can to be allowed in a synchronisation
* @param age in seconds the max age a sync/store item can to be allowed in a synchronisation
*/
virtual void setSyncAge(uint32_t age) = 0;
virtual void setSyncAge(const RsGxsGroupId& id,uint32_t age_in_secs) =0;
virtual void setKeepAge(const RsGxsGroupId& id,uint32_t age_in_secs) =0;
virtual uint32_t getSyncAge(const RsGxsGroupId& id) =0;
virtual uint32_t getKeepAge(const RsGxsGroupId& id,uint32_t default_value) =0;
virtual uint32_t getDefaultSyncAge() =0;
/*!
* Initiates a search through the network

View File

@ -172,9 +172,15 @@ public:
virtual void setGroupReputationCutOff(uint32_t& token, const RsGxsGroupId& grpId, int CutOff) = 0;
/*!
* @return storage time of messages in months
* @return storage/sync time of messages in secs
*/
virtual int getStoragePeriod() = 0;
virtual uint32_t getDefaultStoragePeriod() = 0;
virtual uint32_t getStoragePeriod(const RsGxsGroupId& grpId) = 0;
virtual void setStoragePeriod(const RsGxsGroupId& grpId,uint32_t age_in_secs) = 0;
virtual uint32_t getDefaultSyncPeriod() = 0;
virtual uint32_t getSyncPeriod(const RsGxsGroupId& grpId) = 0;
virtual void setSyncPeriod(const RsGxsGroupId& grpId,uint32_t age_in_secs) = 0;
};

View File

@ -209,11 +209,31 @@ public:
}
/*!
* @return storage time of messages in months
* @return storage/sync time of messages in secs
*/
int getStoragePeriod()
uint32_t getDefaultStoragePeriod()
{
return mGxs->getStoragePeriod();
return mGxs->getDefaultStoragePeriod();
}
uint32_t getStoragePeriod(const RsGxsGroupId& grpId)
{
return mGxs->getStoragePeriod(grpId);
}
void setStoragePeriod(const RsGxsGroupId& grpId,uint32_t age_in_secs)
{
mGxs->setStoragePeriod(grpId,age_in_secs);
}
uint32_t getDefaultSyncPeriod()
{
return mGxs->getDefaultSyncPeriod();
}
uint32_t getSyncPeriod(const RsGxsGroupId& grpId)
{
return mGxs->getSyncPeriod(grpId);
}
void setSyncPeriod(const RsGxsGroupId& grpId,uint32_t age_in_secs)
{
mGxs->setSyncPeriod(grpId,age_in_secs);
}
private:

View File

@ -26,30 +26,39 @@
#include "rsgxsupdateitems.h"
#include "rsbaseserial.h"
/**********************************************************************************************/
/* CLEAR */
/**********************************************************************************************/
void RsGxsGrpUpdateItem::clear()
{
grpUpdateTS = 0;
peerId.clear();
peerID.clear();
}
std::ostream& RsGxsGrpUpdateItem::print(std::ostream& out, uint16_t indent)
{
printRsItemBase(out, "RsGxsGrpUpdateItem", indent);
uint16_t int_Indent = indent + 2;
out << "peerId: " << peerId << std::endl;
printIndent(out, int_Indent);
out << "grpUpdateTS: " << grpUpdateTS << std::endl;
printIndent(out, int_Indent);
return out ;
}
void RsGxsMsgUpdateItem::clear()
{
msgUpdateInfos.clear();
peerId.clear();
peerID.clear();
}
void RsGxsServerMsgUpdateItem::clear()
{
msgUpdateTS = 0;
grpId.clear();
}
void RsGxsServerGrpUpdateItem::clear()
{
grpUpdateTS = 0;
}
/**********************************************************************************************/
/* PRINT */
/**********************************************************************************************/
std::ostream& RsGxsMsgUpdateItem::print(std::ostream& out, uint16_t indent)
{
RsPeerId peerId;
@ -75,13 +84,15 @@ std::ostream& RsGxsMsgUpdateItem::print(std::ostream& out, uint16_t indent)
return out;
}
void RsGxsServerMsgUpdateItem::clear()
std::ostream& RsGxsGrpUpdateItem::print(std::ostream& out, uint16_t indent)
{
msgUpdateTS = 0;
grpId.clear();
printRsItemBase(out, "RsGxsGrpUpdateItem", indent);
uint16_t int_Indent = indent + 2;
out << "peerId: " << peerID << std::endl;
printIndent(out, int_Indent);
out << "grpUpdateTS: " << grpUpdateTS << std::endl;
printIndent(out, int_Indent);
return out ;
}
std::ostream& RsGxsServerMsgUpdateItem::print(std::ostream& out, uint16_t indent)
@ -96,11 +107,6 @@ std::ostream& RsGxsServerMsgUpdateItem::print(std::ostream& out, uint16_t indent
}
void RsGxsServerGrpUpdateItem::clear()
{
grpUpdateTS = 0;
}
std::ostream& RsGxsServerGrpUpdateItem::print(std::ostream& out, uint16_t indent)
{
printRsItemBase(out, "RsGxsServerGrpUpdateItem", indent);
@ -111,147 +117,128 @@ std::ostream& RsGxsServerGrpUpdateItem::print(std::ostream& out, uint16_t indent
return out;
}
/**********************************************************************************************/
/* SERIALISER */
/**********************************************************************************************/
uint32_t RsGxsUpdateSerialiser::size(RsItem* item)
bool RsGxsNetServiceItem::serialise_header(void *data,uint32_t& pktsize,uint32_t& tlvsize, uint32_t& offset) const
{
RsGxsMsgUpdateItem* mui = NULL;
RsGxsGrpUpdateItem* gui = NULL;
RsGxsServerGrpUpdateItem* gsui = NULL;
RsGxsServerMsgUpdateItem* msui = NULL;
tlvsize = serial_size() ;
offset = 0;
if((mui = dynamic_cast<RsGxsMsgUpdateItem*>(item)) != NULL)
{
return sizeGxsMsgUpdate(mui);
}else if(( gui = dynamic_cast<RsGxsGrpUpdateItem*>(item)) != NULL){
return sizeGxsGrpUpdate(gui);
}else if((gsui = dynamic_cast<RsGxsServerGrpUpdateItem*>(item)) != NULL)
{
return sizeGxsServerGrpUpdate(gsui);
}else if((msui = dynamic_cast<RsGxsServerMsgUpdateItem*>(item)) != NULL)
{
return sizeGxsServerMsgUpdate(msui);
}else
{
if (pktsize < tlvsize)
return false; /* not enough space */
pktsize = tlvsize;
if(!setRsItemHeader(data, tlvsize, PacketId(), tlvsize))
{
std::cerr << "RsFileTransferItem::serialise_header(): ERROR. Not enough size!" << std::endl;
return false ;
}
#ifdef RSSERIAL_DEBUG
std::cerr << "RsGxsUpdateSerialiser::size(): Could not find appropriate size function"
<< std::endl;
std::cerr << "RsFileItemSerialiser::serialiseData() Header: " << ok << std::endl;
#endif
return 0;
}
offset += 8;
return true ;
}
bool RsGxsUpdateSerialiser::serialise(RsItem* item, void* data,
uint32_t* size)
{
RsGxsMsgUpdateItem* mui;
RsGxsGrpUpdateItem* gui;
RsGxsServerGrpUpdateItem* gsui;
RsGxsServerMsgUpdateItem* msui;
if((mui = dynamic_cast<RsGxsMsgUpdateItem*>(item)) != NULL)
return serialiseGxsMsgUpdate(mui, data, size);
else if((gui = dynamic_cast<RsGxsGrpUpdateItem*>(item)) != NULL)
return serialiseGxsGrpUpdate(gui, data, size);
else if((msui = dynamic_cast<RsGxsServerMsgUpdateItem*>(item)) != NULL)
return serialiseGxsServerMsgUpdate(msui, data, size);
else if((gsui = dynamic_cast<RsGxsServerGrpUpdateItem*>(item)) != NULL)
return serialiseGxsServerGrpUpdate(gsui, data, size);
else
{
#ifdef RSSERIAL_DEBUG
std::cerr << "RsGxsUpdateSerialiser::serialise() item does not caste to known type"
<< std::endl;
#endif
return false;
}
}
RsItem* RsGxsUpdateSerialiser::deserialise(void* data, uint32_t* size)
{
#ifdef RSSERIAL_DEBUG
std::cerr << "RsGxsUpdateSerialiser::deserialise()" << std::endl;
std::cerr << "RsGxsUpdateSerialiser::deserialise()" << std::endl;
#endif
/* get the type and size */
uint32_t rstype = getRsItemId(data);
if ((RS_PKT_VERSION_SERVICE != getRsItemVersion(rstype)) ||
(SERVICE_TYPE != getRsItemService(rstype)))
{
return NULL; /* wrong type */
}
if ((RS_PKT_VERSION_SERVICE != getRsItemVersion(rstype)) || (SERVICE_TYPE != getRsItemService(rstype)))
return NULL; /* wrong type */
switch(getRsItemSubType(rstype))
{
case RS_PKT_SUBTYPE_GXS_MSG_UPDATE: return deserialGxsMsgUpdate(data, size);
case RS_PKT_SUBTYPE_GXS_GRP_UPDATE: return deserialGxsGrpUpddate(data, size);
case RS_PKT_SUBTYPE_GXS_SERVER_GRP_UPDATE: return deserialGxsServerGrpUpddate(data, size);
case RS_PKT_SUBTYPE_GXS_SERVER_MSG_UPDATE: return deserialGxsServerMsgUpdate(data, size);
case RS_PKT_SUBTYPE_GXS_GRP_CONFIG: return deserialGxsGrpConfig(data, size);
case RS_PKT_SUBTYPE_GXS_MSG_UPDATE:
return deserialGxsMsgUpdate(data, size);
case RS_PKT_SUBTYPE_GXS_GRP_UPDATE:
return deserialGxsGrpUpddate(data, size);
case RS_PKT_SUBTYPE_GXS_SERVER_GRP_UPDATE:
return deserialGxsServerGrpUpddate(data, size);
case RS_PKT_SUBTYPE_GXS_SERVER_MSG_UPDATE:
return deserialGxsServerMsgUpdate(data, size);
default:
default:
{
#ifdef RSSERIAL_DEBUG
# ifdef RSSERIAL_DEBUG
std::cerr << "RsGxsUpdateSerialiser::deserialise() : data has no type"
<< std::endl;
#endif
<< std::endl;
# endif
return NULL;
}
}
}
uint32_t RsGxsUpdateSerialiser::sizeGxsGrpUpdate(RsGxsGrpUpdateItem* item)
/**********************************************************************************************/
/* SERIAL_SIZE() */
/**********************************************************************************************/
uint32_t RsGxsGrpUpdateItem::serial_size() const
{
uint32_t s = 8; // header size
s += item->peerId.serial_size();
s += peerID.serial_size();
s += 4; // mUpdateTS
return s;
}
uint32_t RsGxsUpdateSerialiser::sizeGxsServerGrpUpdate(RsGxsServerGrpUpdateItem* /* item */)
uint32_t RsGxsServerGrpUpdateItem::serial_size() const
{
uint32_t s = 8; // header size
s += 4; // time stamp
return s;
}
bool RsGxsUpdateSerialiser::serialiseGxsGrpUpdate(RsGxsGrpUpdateItem* item,
void* data, uint32_t* size)
uint32_t RsGxsMsgUpdateItem::serial_size() const
{
#ifdef RSSERIAL_DEBUG
std::cerr << "RsGxsUpdateSerialiser::serialiseGxsGrpUpdate()" << std::endl;
#endif
uint32_t s = 8; // header size
s += peerID.serial_size() ;//GetTlvStringSize(item->peerId);
uint32_t tlvsize = sizeGxsGrpUpdate(item);
uint32_t offset = 0;
s += msgUpdateInfos.size() * (4 + 4 + RsGxsGroupId::serial_size());
s += 4; // number of map items
if(*size < tlvsize){
#ifdef RSSERIAL_DEBUG
std::cerr << "RsGxsUpdateSerialiser::serialiseGxsGrpUpdate() size do not match" << std::endl;
#endif
return false;
}
return s;
}
*size = tlvsize;
uint32_t RsGxsServerMsgUpdateItem::serial_size() const
{
uint32_t s = 8; // header size
s += grpId.serial_size();
s += 4; // grp TS
return s;
}
uint32_t RsGxsGrpConfigItem::serial_size() const
{
uint32_t s = 8; // header size
s += grpId.serial_size();
s += 4; // msg_keep_delay
s += 4; // msg_send_delay
s += 4; // msg_req_delay
return s;
}
/**********************************************************************************************/
/* SERIALISE() */
/**********************************************************************************************/
bool RsGxsGrpUpdateItem::serialise(void* data, uint32_t& size) const
{
uint32_t tlvsize,offset=0;
bool ok = true;
ok &= setRsItemHeader(data, tlvsize, item->PacketId(), tlvsize);
if(!serialise_header(data,size,tlvsize,offset))
return false ;
/* skip the header */
offset += 8;
/* RsGxsGrpUpdateItem */
ok &= item->peerId.serialise(data, *size, offset) ;
ok &= setRawUInt32(data, *size, &offset, item->grpUpdateTS);
ok &= peerID.serialise(data, size, offset) ;
ok &= setRawUInt32(data, size, &offset, grpUpdateTS);
if(offset != tlvsize){
#ifdef RSSERIAL_DEBUG
@ -260,45 +247,20 @@ bool RsGxsUpdateSerialiser::serialiseGxsGrpUpdate(RsGxsGrpUpdateItem* item,
ok = false;
}
#ifdef RSSERIAL_DEBUG
if (!ok)
{
std::cerr << "RsGxsUpdateSerialiser::serialiseGxsGrpUpdate() NOK" << std::endl;
}
#endif
return ok;
}
bool RsGxsUpdateSerialiser::serialiseGxsServerGrpUpdate(RsGxsServerGrpUpdateItem* item,
void* data, uint32_t* size)
bool RsGxsServerGrpUpdateItem::serialise(void* data, uint32_t& size) const
{
#ifdef RSSERIAL_DEBUG
std::cerr << "RsGxsUpdateSerialiser::serialiseGxsServerGrpUpdate()" << std::endl;
#endif
uint32_t tlvsize = sizeGxsServerGrpUpdate(item);
uint32_t offset = 0;
if(*size < tlvsize){
#ifdef RSSERIAL_DEBUG
std::cerr << "RsGxsUpdateSerialiser::serialiseGxsServerGrpUpdate() size do not match" << std::endl;
#endif
return false;
}
*size = tlvsize;
uint32_t tlvsize,offset=0;
bool ok = true;
ok &= setRsItemHeader(data, tlvsize, item->PacketId(), tlvsize);
/* skip the header */
offset += 8;
if(!serialise_header(data,size,tlvsize,offset))
return false ;
/* RsGxsServerGrpUpdateItem */
ok &= setRawUInt32(data, *size, &offset, item->grpUpdateTS);
ok &= setRawUInt32(data, size, &offset, grpUpdateTS);
if(offset != tlvsize){
#ifdef RSSERIAL_DEBUG
@ -307,15 +269,153 @@ bool RsGxsUpdateSerialiser::serialiseGxsServerGrpUpdate(RsGxsServerGrpUpdateItem
ok = false;
}
#ifdef RSSERIAL_DEBUG
if (!ok)
return ok;
}
bool RsGxsMsgUpdateItem::serialise(void* data, uint32_t& size) const
{
uint32_t tlvsize,offset=0;
bool ok = true;
if(!serialise_header(data,size,tlvsize,offset))
return false ;
ok &= peerID.serialise(data, size, offset) ;
std::map<RsGxsGroupId, RsGxsMsgUpdateItem::MsgUpdateInfo>::const_iterator cit(msgUpdateInfos.begin());
uint32_t numItems = msgUpdateInfos.size();
ok &= setRawUInt32(data, size, &offset, numItems);
for(; cit != msgUpdateInfos.end(); ++cit)
{
std::cerr << "RsGxsUpdateSerialiser::serialiseGxsServerGrpUpdate() NOK" << std::endl;
ok &= cit->first.serialise(data, size, offset);
ok &= setRawUInt32(data, size, &offset, cit->second.time_stamp);
ok &= setRawUInt32(data, size, &offset, cit->second.message_count);
}
if(offset != tlvsize){
#ifdef RSSERIAL_DEBUG
std::cerr << "RsGxsUpdateSerialiser::serialiseGxsMsgUpdate() FAIL Size Error! " << std::endl;
#endif
ok = false;
}
return ok;
}
bool RsGxsServerMsgUpdateItem::serialise( void* data, uint32_t& size) const
{
uint32_t tlvsize,offset=0;
bool ok = true;
if(!serialise_header(data,size,tlvsize,offset))
return false ;
ok &= grpId.serialise(data, size, offset) ;
ok &= setRawUInt32(data, size, &offset, msgUpdateTS);
if(offset != tlvsize){
#ifdef RSSERIAL_DEBUG
std::cerr << "RsGxsUpdateSerialiser::serialiseGxsServerMsgUpdate() FAIL Size Error! " << std::endl;
#endif
ok = false;
}
return ok;
}
bool RsGxsGrpConfigItem::serialise( void* data, uint32_t& size) const
{
uint32_t tlvsize,offset=0;
bool ok = true;
if(!serialise_header(data,size,tlvsize,offset))
return false ;
ok &= grpId.serialise(data, size, offset) ;
ok &= setRawUInt32(data, size, &offset, msg_keep_delay);
ok &= setRawUInt32(data, size, &offset, msg_send_delay);
ok &= setRawUInt32(data, size, &offset, msg_req_delay);
if(offset != tlvsize){
#ifdef RSSERIAL_DEBUG
std::cerr << "RsGxsUpdateSerialiser::serialiseGxsServerMsgUpdate() FAIL Size Error! " << std::endl;
#endif
ok = false;
}
return ok;
}
/**********************************************************************************************/
/* DESERIALISE() */
/**********************************************************************************************/
RsGxsGrpConfigItem* RsGxsUpdateSerialiser::deserialGxsGrpConfig(void* data, uint32_t* size)
{
#ifdef RSSERIAL_DEBUG
std::cerr << "RsGxsUpdateSerialiser::deserialGxsServerGrpUpdate()" << std::endl;
#endif
/* get the type and size */
uint32_t rstype = getRsItemId(data);
uint32_t rssize = getRsItemSize(data);
uint32_t offset = 0;
if ((RS_PKT_VERSION_SERVICE != getRsItemVersion(rstype)) || (SERVICE_TYPE != getRsItemService(rstype)) || (RS_PKT_SUBTYPE_GXS_GRP_CONFIG != getRsItemSubType(rstype)))
{
#ifdef RSSERIAL_DEBUG
std::cerr << "RsGxsUpdateSerialiser::deserialGxsGrpUpdate() FAIL wrong type" << std::endl;
#endif
return NULL; /* wrong type */
}
if (*size < rssize) /* check size */
{
#ifdef RSSERIAL_DEBUG
std::cerr << "RsGxsUpdateSerialiser::deserialGxsGrpUpdate() FAIL wrong size" << std::endl;
#endif
return NULL; /* not enough data */
}
/* set the packet length */
*size = rssize;
bool ok = true;
RsGxsGrpConfigItem* item = new RsGxsGrpConfigItem(getRsItemService(rstype));
/* skip the header */
offset += 8;
ok &= item->grpId.deserialise(data, *size, offset) ;
ok &= getRawUInt32(data, *size, &offset, &(item->msg_keep_delay));
ok &= getRawUInt32(data, *size, &offset, &(item->msg_send_delay));
ok &= getRawUInt32(data, *size, &offset, &(item->msg_req_delay));
if (offset != rssize)
{
#ifdef RSSERIAL_DEBUG
std::cerr << "RsGxsUpdateSerialiser::deserialGxxGrpUpdate() FAIL size mismatch" << std::endl;
#endif
/* error */
delete item;
return NULL;
}
if (!ok)
{
#ifdef RSSERIAL_DEBUG
std::cerr << "RsGxsUpdateSerialiser::deserialGxsGrpUpdate() NOK" << std::endl;
#endif
delete item;
return NULL;
}
return item;
}
RsGxsGrpUpdateItem* RsGxsUpdateSerialiser::deserialGxsGrpUpddate(void* data, uint32_t* size)
{
#ifdef RSSERIAL_DEBUG
@ -356,7 +456,7 @@ RsGxsGrpUpdateItem* RsGxsUpdateSerialiser::deserialGxsGrpUpddate(void* data, uin
/* skip the header */
offset += 8;
ok &= item->peerId.deserialise(data, *size, offset) ;
ok &= item->peerID.deserialise(data, *size, offset) ;
ok &= getRawUInt32(data, *size, &offset, &(item->grpUpdateTS));
if (offset != rssize)
@ -381,8 +481,7 @@ RsGxsGrpUpdateItem* RsGxsUpdateSerialiser::deserialGxsGrpUpddate(void* data, uin
return item;
}
RsGxsServerGrpUpdateItem* RsGxsUpdateSerialiser::deserialGxsServerGrpUpddate(void* data,
uint32_t* size)
RsGxsServerGrpUpdateItem* RsGxsUpdateSerialiser::deserialGxsServerGrpUpddate(void* data, uint32_t* size)
{
#ifdef RSSERIAL_DEBUG
std::cerr << "RsGxsUpdateSerialiser::deserialGxsServerGrpUpdate()" << std::endl;
@ -394,9 +493,7 @@ RsGxsServerGrpUpdateItem* RsGxsUpdateSerialiser::deserialGxsServerGrpUpddate(voi
uint32_t offset = 0;
if ((RS_PKT_VERSION_SERVICE != getRsItemVersion(rstype)) ||
(SERVICE_TYPE != getRsItemService(rstype)) ||
(RS_PKT_SUBTYPE_GXS_SERVER_GRP_UPDATE != getRsItemSubType(rstype)))
if ((RS_PKT_VERSION_SERVICE != getRsItemVersion(rstype)) || (SERVICE_TYPE != getRsItemService(rstype)) || (RS_PKT_SUBTYPE_GXS_SERVER_GRP_UPDATE != getRsItemSubType(rstype)))
{
#ifdef RSSERIAL_DEBUG
std::cerr << "RsGxsUpdateSerialiser::deserialGxsServerGrpUpdate() FAIL wrong type" << std::endl;
@ -446,137 +543,9 @@ RsGxsServerGrpUpdateItem* RsGxsUpdateSerialiser::deserialGxsServerGrpUpddate(voi
return item;
}
uint32_t RsGxsUpdateSerialiser::sizeGxsMsgUpdate(RsGxsMsgUpdateItem* item)
{
uint32_t s = 8; // header size
s += item->peerId.serial_size() ;//GetTlvStringSize(item->peerId);
s += item->msgUpdateInfos.size() * (4 + 4 + RsGxsGroupId::serial_size());
s += 4; // number of map items
return s;
}
uint32_t RsGxsUpdateSerialiser::sizeGxsServerMsgUpdate(RsGxsServerMsgUpdateItem* item)
{
uint32_t s = 8; // header size
s += item->grpId.serial_size();
s += 4; // grp TS
return s;
}
bool RsGxsUpdateSerialiser::serialiseGxsMsgUpdate(RsGxsMsgUpdateItem* item,
void* data, uint32_t* size)
{
#ifdef RSSERIAL_DEBUG
std::cerr << "RsGxsUpdateSerialiser::serialiseGxsMsgUpdate()" << std::endl;
#endif
uint32_t tlvsize = sizeGxsMsgUpdate(item);
uint32_t offset = 0;
if(*size < tlvsize){
#ifdef RSSERIAL_DEBUG
std::cerr << "RsGxsUpdateSerialiser::serialiseGxsMsgUpdate() size do not match" << std::endl;
#endif
return false;
}
*size = tlvsize;
bool ok = true;
ok &= setRsItemHeader(data, tlvsize, item->PacketId(), tlvsize);
/* skip the header */
offset += 8;
/* RsGxsMsgUpdateItem */
ok &= item->peerId.serialise(data, *size, offset) ;
std::map<RsGxsGroupId, RsGxsMsgUpdateItem::MsgUpdateInfo>::const_iterator cit(item->msgUpdateInfos.begin());
uint32_t numItems = item->msgUpdateInfos.size();
ok &= setRawUInt32(data, *size, &offset, numItems);
for(; cit != item->msgUpdateInfos.end(); ++cit)
{
ok &= cit->first.serialise(data, *size, offset);
ok &= setRawUInt32(data, *size, &offset, cit->second.time_stamp);
ok &= setRawUInt32(data, *size, &offset, cit->second.message_count);
}
if(offset != tlvsize){
#ifdef RSSERIAL_DEBUG
std::cerr << "RsGxsUpdateSerialiser::serialiseGxsMsgUpdate() FAIL Size Error! " << std::endl;
#endif
ok = false;
}
#ifdef RSSERIAL_DEBUG
if (!ok)
{
std::cerr << "RsGxsUpdateSerialiser::serialiseGxsMsgUpdate() NOK" << std::endl;
}
#endif
return ok;
}
bool RsGxsUpdateSerialiser::serialiseGxsServerMsgUpdate(RsGxsServerMsgUpdateItem* item,
void* data, uint32_t* size)
{
#ifdef RSSERIAL_DEBUG
std::cerr << "RsGxsUpdateSerialiser::serialiseGxsServerMsgUpdate()" << std::endl;
#endif
uint32_t tlvsize = sizeGxsServerMsgUpdate(item);
uint32_t offset = 0;
if(*size < tlvsize){
#ifdef RSSERIAL_DEBUG
std::cerr << "RsGxsUpdateSerialiser::serialiseGxsServerMsgUpdate() size do not match" << std::endl;
#endif
return false;
}
*size = tlvsize;
bool ok = true;
ok &= setRsItemHeader(data, tlvsize, item->PacketId(), tlvsize);
/* skip the header */
offset += 8;
/* RsNxsSyncm */
ok &= item->grpId.serialise(data, *size, offset) ;
ok &= setRawUInt32(data, *size, &offset, item->msgUpdateTS);
if(offset != tlvsize){
#ifdef RSSERIAL_DEBUG
std::cerr << "RsGxsUpdateSerialiser::serialiseGxsServerMsgUpdate() FAIL Size Error! " << std::endl;
#endif
ok = false;
}
#ifdef RSSERIAL_DEBUG
if (!ok)
{
std::cerr << "RsGxsUpdateSerialiser::serialiseGxsServerMsgUpdate() NOK" << std::endl;
}
#endif
return ok;
}
RsGxsMsgUpdateItem* RsGxsUpdateSerialiser::deserialGxsMsgUpdate(void* data,
uint32_t* size)
RsGxsMsgUpdateItem* RsGxsUpdateSerialiser::deserialGxsMsgUpdate(void* data, uint32_t* size)
{
#ifdef RSSERIAL_DEBUG
std::cerr << "RsGxsUpdateSerialiser::deserialGxsMsgUpdate()" << std::endl;
@ -588,9 +557,7 @@ RsGxsMsgUpdateItem* RsGxsUpdateSerialiser::deserialGxsMsgUpdate(void* data,
uint32_t offset = 0;
if ((RS_PKT_VERSION_SERVICE != getRsItemVersion(rstype)) ||
(SERVICE_TYPE != getRsItemService(rstype)) ||
(RS_PKT_SUBTYPE_GXS_MSG_UPDATE != getRsItemSubType(rstype)))
if ((RS_PKT_VERSION_SERVICE != getRsItemVersion(rstype)) || (SERVICE_TYPE != getRsItemService(rstype)) || (RS_PKT_SUBTYPE_GXS_MSG_UPDATE != getRsItemSubType(rstype)))
{
#ifdef RSSERIAL_DEBUG
std::cerr << "RsGxsUpdateSerialiser::deserialGxsMsgUpdate() FAIL wrong type" << std::endl;
@ -616,7 +583,7 @@ RsGxsMsgUpdateItem* RsGxsUpdateSerialiser::deserialGxsMsgUpdate(void* data,
/* skip the header */
offset += 8;
ok &= item->peerId.deserialise(data, *size, offset) ;
ok &= item->peerID.deserialise(data, *size, offset) ;
uint32_t numUpdateItems;
ok &= getRawUInt32(data, *size, &offset, &(numUpdateItems));
std::map<RsGxsGroupId, RsGxsMsgUpdateItem::MsgUpdateInfo>& msgUpdateInfos = item->msgUpdateInfos;
@ -662,8 +629,7 @@ RsGxsMsgUpdateItem* RsGxsUpdateSerialiser::deserialGxsMsgUpdate(void* data,
return item;
}
RsGxsServerMsgUpdateItem* RsGxsUpdateSerialiser::deserialGxsServerMsgUpdate(void* data,
uint32_t* size)
RsGxsServerMsgUpdateItem* RsGxsUpdateSerialiser::deserialGxsServerMsgUpdate(void* data, uint32_t* size)
{
#ifdef RSSERIAL_DEBUG
std::cerr << "RsGxsUpdateSerialiser::deserialGxsServerMsgUpdate()" << std::endl;
@ -675,9 +641,7 @@ RsGxsServerMsgUpdateItem* RsGxsUpdateSerialiser::deserialGxsServerMsgUpdate(void
uint32_t offset = 0;
if ((RS_PKT_VERSION_SERVICE != getRsItemVersion(rstype)) ||
(SERVICE_TYPE != getRsItemService(rstype)) ||
(RS_PKT_SUBTYPE_GXS_SERVER_MSG_UPDATE != getRsItemSubType(rstype)))
if ((RS_PKT_VERSION_SERVICE != getRsItemVersion(rstype)) || (SERVICE_TYPE != getRsItemService(rstype)) || (RS_PKT_SUBTYPE_GXS_SERVER_MSG_UPDATE != getRsItemSubType(rstype)))
{
#ifdef RSSERIAL_DEBUG
std::cerr << "RsGxsUpdateSerialiser::deserialGxsServerMsgUpdate() FAIL wrong type" << std::endl;

View File

@ -37,77 +37,174 @@
#include "serialiser/rstlvkeys.h"
#endif
#include "gxs/rsgxs.h"
#include "gxs/rsgxsdata.h"
#include "serialiser/rstlvidset.h"
const uint8_t RS_PKT_SUBTYPE_GXS_GRP_UPDATE = 0x0001;
const uint8_t RS_PKT_SUBTYPE_GXS_MSG_UPDATE_deprecated = 0x0002;
const uint8_t RS_PKT_SUBTYPE_GXS_MSG_UPDATE = 0x0003;
const uint8_t RS_PKT_SUBTYPE_GXS_SERVER_GRP_UPDATE = 0x0004;
const uint8_t RS_PKT_SUBTYPE_GXS_SERVER_MSG_UPDATE = 0x0008;
const uint8_t RS_PKT_SUBTYPE_GXS_GRP_UPDATE = 0x01;
const uint8_t RS_PKT_SUBTYPE_GXS_MSG_UPDATE_deprecated = 0x02;
const uint8_t RS_PKT_SUBTYPE_GXS_MSG_UPDATE = 0x03;
const uint8_t RS_PKT_SUBTYPE_GXS_SERVER_GRP_UPDATE = 0x04;
const uint8_t RS_PKT_SUBTYPE_GXS_SERVER_MSG_UPDATE = 0x08;
const uint8_t RS_PKT_SUBTYPE_GXS_GRP_CONFIG = 0x09;
class RsGxsGrpUpdateItem : public RsItem {
class RsGxsNetServiceItem: public RsItem
{
public:
RsGxsGrpUpdateItem(uint16_t servType) : RsItem(RS_PKT_VERSION_SERVICE, servType,
RS_PKT_SUBTYPE_GXS_GRP_UPDATE)
{clear();}
RsGxsNetServiceItem(uint16_t serv_type,uint8_t subtype) : RsItem(RS_PKT_VERSION_SERVICE, serv_type, subtype) {}
virtual ~RsGxsNetServiceItem() {}
virtual bool serialise(void *data,uint32_t& size) const = 0 ;
virtual uint32_t serial_size() const = 0 ;
virtual void clear() = 0 ;
virtual std::ostream& print(std::ostream &out, uint16_t indent = 0) = 0;
protected:
bool serialise_header(void *data, uint32_t& pktsize, uint32_t& tlvsize, uint32_t& offset) const;
};
class RsGxsGrpConfig
{
public:
RsGxsGrpConfig()
{
msg_keep_delay = RS_GXS_DEFAULT_MSG_STORE_PERIOD ;
msg_send_delay = RS_GXS_DEFAULT_MSG_SEND_PERIOD ;
msg_req_delay = RS_GXS_DEFAULT_MSG_REQ_PERIOD ;
max_visible_count = 0 ;
update_TS = 0 ;
}
uint32_t msg_keep_delay ; // delay after which we discard the posts
uint32_t msg_send_delay ; // delay after which we dont send the posts anymore
uint32_t msg_req_delay ; // delay after which we dont get the posts from friends
RsTlvPeerIdSet suppliers; // list of friends who feed this group
uint32_t max_visible_count ; // max visible count reported by contributing friends
time_t update_TS ; // last time the max visible count was updated.
};
class RsGxsGrpConfigItem : public RsGxsNetServiceItem, public RsGxsGrpConfig
{
public:
RsGxsGrpConfigItem(uint16_t servType) : RsGxsNetServiceItem(servType, RS_PKT_SUBTYPE_GXS_GRP_CONFIG) {}
RsGxsGrpConfigItem(const RsGxsGrpConfig& m,uint16_t servType) : RsGxsNetServiceItem(servType, RS_PKT_SUBTYPE_GXS_GRP_CONFIG),RsGxsGrpConfig(m) {}
virtual ~RsGxsGrpConfigItem() {}
virtual void clear() {}
virtual std::ostream &print(std::ostream &out, uint16_t indent) { return out;}
virtual bool serialise(void *data,uint32_t& size) const ;
virtual uint32_t serial_size() const ;
RsGxsGroupId grpId ;
};
class RsGxsGrpUpdate
{
public:
RsGxsGrpUpdate() { grpUpdateTS=0;}
uint32_t grpUpdateTS;
};
class RsGxsGrpUpdateItem : public RsGxsNetServiceItem, public RsGxsGrpUpdate
{
public:
RsGxsGrpUpdateItem(uint16_t servType) : RsGxsNetServiceItem(servType, RS_PKT_SUBTYPE_GXS_GRP_UPDATE) {clear();}
RsGxsGrpUpdateItem(const RsGxsGrpUpdate& u,uint16_t serv_type) : RsGxsNetServiceItem(serv_type, RS_PKT_SUBTYPE_GXS_GRP_UPDATE), RsGxsGrpUpdate(u) {clear();}
virtual ~RsGxsGrpUpdateItem() {}
virtual void clear();
virtual std::ostream &print(std::ostream &out, uint16_t indent);
RsPeerId peerId;
uint32_t grpUpdateTS;
virtual bool serialise(void *data,uint32_t& size) const ;
virtual uint32_t serial_size() const ;
RsPeerId peerID;
};
class RsGxsServerGrpUpdateItem : public RsItem {
class RsGxsServerGrpUpdate
{
public:
RsGxsServerGrpUpdateItem(uint16_t servType) : RsItem(RS_PKT_VERSION_SERVICE, servType,
RS_PKT_SUBTYPE_GXS_SERVER_GRP_UPDATE)
{ clear();}
RsGxsServerGrpUpdate() { grpUpdateTS = 0 ; }
uint32_t grpUpdateTS;
};
class RsGxsServerGrpUpdateItem : public RsGxsNetServiceItem, public RsGxsServerGrpUpdate
{
public:
RsGxsServerGrpUpdateItem(uint16_t servType) : RsGxsNetServiceItem(servType, RS_PKT_SUBTYPE_GXS_SERVER_GRP_UPDATE) { clear();}
RsGxsServerGrpUpdateItem(const RsGxsServerGrpUpdate& u,uint16_t serv_type) : RsGxsNetServiceItem(serv_type, RS_PKT_SUBTYPE_GXS_SERVER_GRP_UPDATE), RsGxsServerGrpUpdate(u) {clear();}
virtual ~RsGxsServerGrpUpdateItem() {}
virtual void clear();
virtual std::ostream &print(std::ostream &out, uint16_t indent);
uint32_t grpUpdateTS;
virtual bool serialise(void *data,uint32_t& size) const ;
virtual uint32_t serial_size() const ;
};
class RsGxsMsgUpdateItem : public RsItem
class RsGxsMsgUpdate
{
public:
RsGxsMsgUpdateItem(uint16_t servType) : RsItem(RS_PKT_VERSION_SERVICE, servType, RS_PKT_SUBTYPE_GXS_MSG_UPDATE)
{ clear();}
struct MsgUpdateInfo
{
MsgUpdateInfo(): time_stamp(0), message_count(0) {}
uint32_t time_stamp ;
uint32_t message_count ;
};
std::map<RsGxsGroupId, MsgUpdateInfo> msgUpdateInfos;
};
class RsGxsMsgUpdateItem : public RsGxsNetServiceItem, public RsGxsMsgUpdate
{
public:
RsGxsMsgUpdateItem(uint16_t servType) : RsGxsNetServiceItem(servType, RS_PKT_SUBTYPE_GXS_MSG_UPDATE) { clear();}
RsGxsMsgUpdateItem(const RsGxsMsgUpdate& m,uint16_t servType) : RsGxsNetServiceItem(servType, RS_PKT_SUBTYPE_GXS_MSG_UPDATE), RsGxsMsgUpdate(m) { clear();}
virtual ~RsGxsMsgUpdateItem() {}
virtual void clear();
virtual std::ostream &print(std::ostream &out, uint16_t indent);
struct MsgUpdateInfo
{
MsgUpdateInfo(): time_stamp(0), message_count(0) {}
uint32_t time_stamp ;
uint32_t message_count ;
};
virtual bool serialise(void *data,uint32_t& size) const ;
virtual uint32_t serial_size() const ;
RsPeerId peerId;
std::map<RsGxsGroupId, MsgUpdateInfo> msgUpdateInfos;
RsPeerId peerID;
};
class RsGxsServerMsgUpdateItem : public RsItem
class RsGxsServerMsgUpdate
{
public:
RsGxsServerMsgUpdateItem(uint16_t servType) : RsItem(RS_PKT_VERSION_SERVICE,
servType, RS_PKT_SUBTYPE_GXS_SERVER_MSG_UPDATE)
{ clear();}
RsGxsServerMsgUpdate() { msgUpdateTS = 0 ;}
uint32_t msgUpdateTS; // local time stamp this group last received a new msg
};
class RsGxsServerMsgUpdateItem : public RsGxsNetServiceItem, public RsGxsServerMsgUpdate
{
public:
RsGxsServerMsgUpdateItem(uint16_t servType) : RsGxsNetServiceItem(servType, RS_PKT_SUBTYPE_GXS_SERVER_MSG_UPDATE) { clear();}
RsGxsServerMsgUpdateItem(const RsGxsServerMsgUpdate& m,uint16_t servType) : RsGxsNetServiceItem(servType, RS_PKT_SUBTYPE_GXS_SERVER_MSG_UPDATE),RsGxsServerMsgUpdate(m) { clear();}
virtual ~RsGxsServerMsgUpdateItem() {}
virtual void clear();
virtual std::ostream &print(std::ostream &out, uint16_t indent);
virtual void clear();
virtual std::ostream &print(std::ostream &out, uint16_t indent);
RsGxsGroupId grpId;
uint32_t msgUpdateTS; // local time stamp this group last received a new msg
virtual bool serialise(void *data,uint32_t& size) const ;
virtual uint32_t serial_size() const ;
RsGxsGroupId grpId;
};
@ -115,47 +212,50 @@ class RsGxsUpdateSerialiser : public RsSerialType
{
public:
RsGxsUpdateSerialiser(uint16_t servtype) :
RsSerialType(RS_PKT_VERSION_SERVICE, servtype), SERVICE_TYPE(servtype) { return; }
RsGxsUpdateSerialiser(uint16_t servtype) : RsSerialType(RS_PKT_VERSION_SERVICE, servtype), SERVICE_TYPE(servtype) {}
virtual ~RsGxsUpdateSerialiser() { return; }
virtual ~RsGxsUpdateSerialiser() {}
virtual uint32_t size(RsItem *item);
virtual bool serialise(RsItem *item, void *data, uint32_t *size);
virtual RsItem* deserialise(void *data, uint32_t *size);
virtual uint32_t size(RsItem *item)
{
RsGxsNetServiceItem *gitem = dynamic_cast<RsGxsNetServiceItem *>(item);
if (!gitem)
{
std::cerr << "(EE) trying to serialise/size an item that is not a RsGxsNetServiceItem!" << std::endl;
return 0;
}
return gitem->serial_size() ;
}
virtual bool serialise(RsItem *item, void *data, uint32_t *size)
{
RsGxsNetServiceItem *gitem = dynamic_cast<RsGxsNetServiceItem *>(item);
if (!gitem)
{
std::cerr << "(EE) trying to serialise an item that is not a RsGxsNetServiceItem!" << std::endl;
return false;
}
return gitem->serialise(data,*size) ;
}
virtual RsItem* deserialise(void *data, uint32_t *size);
private:
RsGxsGrpConfigItem *deserialGxsGrpConfig(void *data, uint32_t *size);
RsGxsServerMsgUpdateItem *deserialGxsServerMsgUpdate(void *data, uint32_t *size);
RsGxsMsgUpdateItem *deserialGxsMsgUpdate(void *data, uint32_t *size);
RsGxsServerGrpUpdateItem *deserialGxsServerGrpUpddate(void *data, uint32_t *size);
RsGxsGrpUpdateItem *deserialGxsGrpUpddate(void *data, uint32_t *size);
bool checkItemHeader(void *data, uint32_t *size, uint16_t service_type,uint8_t subservice_type);
/* for RS_PKT_SUBTYPE_GRP_UPDATE_ITEM */
virtual uint32_t sizeGxsGrpUpdate(RsGxsGrpUpdateItem* item);
virtual bool serialiseGxsGrpUpdate(RsGxsGrpUpdateItem *item, void *data, uint32_t *size);
virtual RsGxsGrpUpdateItem* deserialGxsGrpUpddate(void *data, uint32_t *size);
/* for RS_PKT_SUBTYPE_GRP_SERVER_UPDATE_ITEM */
virtual uint32_t sizeGxsServerGrpUpdate(RsGxsServerGrpUpdateItem* item);
virtual bool serialiseGxsServerGrpUpdate(RsGxsServerGrpUpdateItem *item, void *data, uint32_t *size);
virtual RsGxsServerGrpUpdateItem* deserialGxsServerGrpUpddate(void *data, uint32_t *size);
/* for RS_PKT_SUBTYPE_GXS_MSG_UPDATE_ITEM */
virtual uint32_t sizeGxsMsgUpdate(RsGxsMsgUpdateItem* item);
virtual bool serialiseGxsMsgUpdate(RsGxsMsgUpdateItem *item, void *data, uint32_t *size);
virtual RsGxsMsgUpdateItem* deserialGxsMsgUpdate(void *data, uint32_t *size);
/* for RS_PKT_SUBTYPE_GXS_SERVER_UPDATE_ITEM */
virtual uint32_t sizeGxsServerMsgUpdate(RsGxsServerMsgUpdateItem* item);
virtual bool serialiseGxsServerMsgUpdate(RsGxsServerMsgUpdateItem *item, void *data, uint32_t *size);
virtual RsGxsServerMsgUpdateItem* deserialGxsServerMsgUpdate(void *data, uint32_t *size);
private:
const uint16_t SERVICE_TYPE;
const uint16_t SERVICE_TYPE;
};
#endif /* RSGXSUPDATEITEMS_H_ */

View File

@ -371,7 +371,7 @@ bool RsNxsSyncMsgReqItem::serialise(void *data, uint32_t& size) const
ok &= setRawUInt32(data, size, &offset, transactionNumber);
ok &= setRawUInt8(data, size, &offset, flag);
ok &= setRawUInt32(data, size, &offset, createdSince);
ok &= setRawUInt32(data, size, &offset, createdSinceTS);
ok &= SetTlvString(data, size, &offset, TLV_TYPE_STR_HASH_SHA1, syncHash);
ok &= grpId.serialise(data, size, offset);
ok &= setRawUInt32(data, size, &offset, updateTS);
@ -779,7 +779,7 @@ RsNxsSyncMsgReqItem* RsNxsSerialiser::deserialNxsSyncMsgReqItem(void *data, uint
ok &= getRawUInt32(data, *size, &offset, &(item->transactionNumber));
ok &= getRawUInt8(data, *size, &offset, &(item->flag));
ok &= getRawUInt32(data, *size, &offset, &(item->createdSince));
ok &= getRawUInt32(data, *size, &offset, &(item->createdSinceTS));
ok &= GetTlvString(data, *size, &offset, TLV_TYPE_STR_HASH_SHA1, item->syncHash);
ok &= item->grpId.deserialise(data, *size, offset);
ok &= getRawUInt32(data, *size, &offset, &(item->updateTS));
@ -1099,7 +1099,7 @@ void RsNxsSyncMsgReqItem::clear()
{
grpId.clear();
flag = 0;
createdSince = 0;
createdSinceTS = 0;
syncHash.clear();
updateTS = 0;
}
@ -1181,7 +1181,7 @@ std::ostream& RsNxsSyncMsgReqItem::print(std::ostream &out, uint16_t indent)
printIndent(out , int_Indent);
out << "GrpId: " << grpId << std::endl;
printIndent(out , int_Indent);
out << "createdSince: " << createdSince << std::endl;
out << "createdSince: " << createdSinceTS << std::endl;
printIndent(out , int_Indent);
out << "syncHash: " << syncHash << std::endl;
printIndent(out , int_Indent);

View File

@ -363,7 +363,7 @@ public:
RsGxsGroupId grpId;
uint8_t flag;
uint32_t createdSince;
uint32_t createdSinceTS;
uint32_t updateTS; // time of last update
std::string syncHash;
};

View File

@ -64,8 +64,8 @@ QString PostedDialog::getHelpString() const
<p>Links can be commented by subscribed users. A promotion system also gives the opportunity to \
enlight important links.</p> \
<p>There is no restriction on which links are shared. Be careful when clicking on them.</p>\
<p>Posted links get deleted after %1 months.</p>\
").arg(QString::number(rsPosted->getStoragePeriod()));
<p>Posted links are kept for %1 days, and sync-ed over the last %2 days, unless you change this.</p>\
").arg(QString::number(rsPosted->getDefaultStoragePeriod()/86400)).arg(QString::number(rsPosted->getDefaultSyncPeriod()/86400));
return hlp_str ;
}

View File

@ -347,7 +347,6 @@ void GxsGroupDialog::setupVisibility()
ui.commentGroupBox->setVisible(mEnabledFlags & GXS_GROUP_FLAGS_COMMENTS);
ui.commentsLabel->setVisible(mEnabledFlags & GXS_GROUP_FLAGS_COMMENTS);
ui.commentsValueLabel->setVisible(mEnabledFlags & GXS_GROUP_FLAGS_COMMENTS);
//ui.commentslabel->setVisible(mEnabledFlags & GXS_GROUP_FLAGS_COMMENTS);
ui.extraFrame->setVisible(mEnabledFlags & GXS_GROUP_FLAGS_EXTRA);
}

View File

@ -285,15 +285,35 @@ void GxsGroupFrameDialog::groupTreeCustomPopupMenu(QPoint point)
action = contextMnu.addAction(QIcon(IMAGE_EDIT), tr("Edit Details"), this, SLOT(editGroupDetails()));
action->setEnabled (!mGroupId.isNull() && isAdmin);
uint32_t current_store_time = mInterface->getStoragePeriod(mGroupId)/86400 ;
uint32_t current_sync_time = mInterface->getSyncPeriod(mGroupId)/86400 ;
std::cerr << "Got sync=" << current_sync_time << ". store=" << current_store_time << std::endl;
QAction *actnn = NULL;
QMenu *ctxMenu2 = contextMnu.addMenu(tr("Synchronise posts of last...")) ;
actnn = ctxMenu2->addAction(tr(" 5 days" ),this,SLOT(setSyncPostsDelay())) ; actnn->setData(QVariant( 5)) ; if(current_sync_time == 5) { actnn->setEnabled(false);actnn->setIcon(QIcon(":/images/start.png"));}
actnn = ctxMenu2->addAction(tr(" 2 weeks" ),this,SLOT(setSyncPostsDelay())) ; actnn->setData(QVariant( 15)) ; if(current_sync_time == 15) { actnn->setEnabled(false);actnn->setIcon(QIcon(":/images/start.png"));}
actnn = ctxMenu2->addAction(tr(" 1 month" ),this,SLOT(setSyncPostsDelay())) ; actnn->setData(QVariant( 30)) ; if(current_sync_time == 30) { actnn->setEnabled(false);actnn->setIcon(QIcon(":/images/start.png"));}
actnn = ctxMenu2->addAction(tr(" 3 months" ),this,SLOT(setSyncPostsDelay())) ; actnn->setData(QVariant( 90)) ; if(current_sync_time == 90) { actnn->setEnabled(false);actnn->setIcon(QIcon(":/images/start.png"));}
actnn = ctxMenu2->addAction(tr(" 6 months" ),this,SLOT(setSyncPostsDelay())) ; actnn->setData(QVariant(180)) ; if(current_sync_time ==180) { actnn->setEnabled(false);actnn->setIcon(QIcon(":/images/start.png"));}
actnn = ctxMenu2->addAction(tr(" 1 year " ),this,SLOT(setSyncPostsDelay())) ; actnn->setData(QVariant(365)) ; if(current_sync_time ==365) { actnn->setEnabled(false);actnn->setIcon(QIcon(":/images/start.png"));}
actnn = ctxMenu2->addAction(tr(" Indefinitly"),this,SLOT(setSyncPostsDelay())) ; actnn->setData(QVariant( 0)) ; if(current_sync_time == 0) { actnn->setEnabled(false);actnn->setIcon(QIcon(":/images/start.png"));}
ctxMenu2 = contextMnu.addMenu(tr("Store posts for at most...")) ;
actnn = ctxMenu2->addAction(tr(" 5 days" ),this,SLOT(setStorePostsDelay())) ; actnn->setData(QVariant( 5)) ; if(current_store_time == 5) { actnn->setEnabled(false);actnn->setIcon(QIcon(":/images/start.png"));}
actnn = ctxMenu2->addAction(tr(" 2 weeks" ),this,SLOT(setStorePostsDelay())) ; actnn->setData(QVariant( 15)) ; if(current_store_time == 15) { actnn->setEnabled(false);actnn->setIcon(QIcon(":/images/start.png"));}
actnn = ctxMenu2->addAction(tr(" 1 month" ),this,SLOT(setStorePostsDelay())) ; actnn->setData(QVariant( 30)) ; if(current_store_time == 30) { actnn->setEnabled(false);actnn->setIcon(QIcon(":/images/start.png"));}
actnn = ctxMenu2->addAction(tr(" 3 months" ),this,SLOT(setStorePostsDelay())) ; actnn->setData(QVariant( 90)) ; if(current_store_time == 90) { actnn->setEnabled(false);actnn->setIcon(QIcon(":/images/start.png"));}
actnn = ctxMenu2->addAction(tr(" 6 months" ),this,SLOT(setStorePostsDelay())) ; actnn->setData(QVariant(180)) ; if(current_store_time ==180) { actnn->setEnabled(false);actnn->setIcon(QIcon(":/images/start.png"));}
actnn = ctxMenu2->addAction(tr(" 1 year " ),this,SLOT(setStorePostsDelay())) ; actnn->setData(QVariant(365)) ; if(current_store_time ==365) { actnn->setEnabled(false);actnn->setIcon(QIcon(":/images/start.png"));}
actnn = ctxMenu2->addAction(tr(" Indefinitly"),this,SLOT(setStorePostsDelay())) ; actnn->setData(QVariant( 0)) ; if(current_store_time == 0) { actnn->setEnabled(false);actnn->setIcon(QIcon(":/images/start.png"));}
if (shareKeyType()) {
action = contextMnu.addAction(QIcon(IMAGE_SHARE), tr("Share publish permissions"), this, SLOT(sharePublishKey()));
action->setEnabled(!mGroupId.isNull() && isPublisher);
}
//if (!mGroupId.isNull() && isPublisher && !isAdmin) {
// contextMnu.addAction(QIcon(":/images/settings16.png"), tr("Restore Publish Rights" ), this, SLOT(restoreGroupKeys()));
//}
if (getLinkType() != RetroShareLink::TYPE_UNKNOWN) {
action = contextMnu.addAction(QIcon(IMAGE_COPYLINK), tr("Copy RetroShare Link"), this, SLOT(copyGroupLink()));
action->setEnabled(!mGroupId.isNull());
@ -318,6 +338,70 @@ void GxsGroupFrameDialog::groupTreeCustomPopupMenu(QPoint point)
contextMnu.exec(QCursor::pos());
}
void GxsGroupFrameDialog::setStorePostsDelay()
{
QAction *action = dynamic_cast<QAction*>(sender()) ;
if(!action || mGroupId.isNull())
{
std::cerr << "(EE) Cannot find action/group that called me! Group is " << mGroupId << ", action is " << (void*)action << " " << __PRETTY_FUNCTION__ << std::endl;
return;
}
uint32_t duration = action->data().toUInt() ;
std::cerr << "Data is " << duration << std::endl;
mInterface->setStoragePeriod(mGroupId,duration * 86400) ;
// If the sync is larger, we reduce it. No need to sync more than we store. The machinery below also takes care of this.
//
uint32_t sync_period = mInterface->getSyncPeriod(mGroupId);
if(duration > 0) // the >0 test is to discard the indefinitly test. Basically, if we store for less than indefinitly, the sync is reduced accordingly.
{
if(sync_period == 0 || sync_period > duration*86400)
{
mInterface->setSyncPeriod(mGroupId,duration * 86400) ;
std::cerr << "(II) auto adjusting sync period to " << duration<< " days as well." << std::endl;
}
}
}
void GxsGroupFrameDialog::setSyncPostsDelay()
{
QAction *action = dynamic_cast<QAction*>(sender()) ;
if(!action || mGroupId.isNull())
{
std::cerr << "(EE) Cannot find action/group that called me! Group is " << mGroupId << ", action is " << (void*)action << " " << __PRETTY_FUNCTION__ << std::endl;
return;
}
uint32_t duration = action->data().toUInt() ;
std::cerr << "Data is " << duration << std::endl;
mInterface->setSyncPeriod(mGroupId,duration * 86400) ;
// If the store is smaller, we increase it accordingly. No need to sync more than we store. The machinery below also takes care of this.
//
uint32_t store_period = mInterface->getStoragePeriod(mGroupId);
if(duration == 0)
mInterface->setStoragePeriod(mGroupId,duration * 86400) ; // indefinite sync => indefinite storage
else
{
if(store_period != 0 && store_period < duration*86400)
{
mInterface->setStoragePeriod(mGroupId,duration * 86400) ; // indefinite sync => indefinite storage
std::cerr << "(II) auto adjusting storage period to " << duration<< " days as well." << std::endl;
}
}
}
void GxsGroupFrameDialog::restoreGroupKeys(void)
{
QMessageBox::warning(this, "RetroShare", "ToDo");

View File

@ -99,6 +99,8 @@ private slots:
/** Create the context popup menu and it's submenus */
void groupTreeCustomPopupMenu(QPoint point);
void settingsChanged();
void setSyncPostsDelay();
void setStorePostsDelay();
void restoreGroupKeys();
void newGroup();

View File

@ -65,8 +65,8 @@ QString GxsChannelDialog::getHelpString() const
the posting rights or the reading rights with friend Retroshare nodes.</p>\
<p>Channels can be made anonymous, or attached to a Retroshare identity so that readers can contact you if needed.\
Enable \"Allow Comments\" if you want to let users comment on your posts.</p>\
<p>Channel posts get deleted after %1 months.</p>\
").arg(QString::number(rsGxsChannels->getStoragePeriod()));
<p>Channel posts are kept for %1 days, and sync-ed over the last %2 days, unless you change this.</p>\
").arg(QString::number(rsGxsChannels->getDefaultStoragePeriod()/86400)).arg(QString::number(rsGxsChannels->getDefaultSyncPeriod()/86400));
return hlp_str ;
}

View File

@ -746,6 +746,22 @@ void GxsForumThreadWidget::insertGroupData()
calculateIconsAndFonts();
}
static QString getDurationString(uint32_t days)
{
switch(days)
{
case 0: return QObject::tr("Indefinitely") ;
case 5: return QObject::tr("5 days") ;
case 15: return QObject::tr("2 weeks") ;
case 30: return QObject::tr("1 month") ;
case 60: return QObject::tr("2 month") ;
case 180: return QObject::tr("6 month") ;
case 365: return QObject::tr("1 year") ;
default:
return QString::number(days)+" " + QObject::tr("days") ;
}
}
/*static*/ void GxsForumThreadWidget::loadAuthorIdCallback(GxsIdDetailsType type, const RsIdentityDetails &details, QObject *object, const QVariant &)
{
GxsForumThreadWidget *tw = dynamic_cast<GxsForumThreadWidget*>(object);
@ -784,7 +800,9 @@ void GxsForumThreadWidget::insertGroupData()
tw->mForumDescription = QString("<b>%1: \t</b>%2<br/>").arg(tr("Forum name"), QString::fromUtf8( group.mMeta.mGroupName.c_str()));
tw->mForumDescription += QString("<b>%1: \t</b>%2<br/>").arg(tr("Subscribers")).arg(group.mMeta.mPop);
tw->mForumDescription += QString("<b>%1: \t</b>%2<br/>").arg(tr("Posts (at neighbor nodes)")).arg(group.mMeta.mVisibleMsgCount);
tw->mForumDescription += QString("<b>%1: \t</b>%2<br/>").arg(tr("Synchronization")).arg(getDurationString( rsGxsForums->getSyncPeriod(group.mMeta.mGroupId)/86400 )) ;
tw->mForumDescription += QString("<b>%1: \t</b>%2<br/>").arg(tr("Storage")).arg(getDurationString( rsGxsForums->getStoragePeriod(group.mMeta.mGroupId)/86400));
QString distrib_string = tr("[unknown]");
switch(group.mMeta.mCircleType)
{

View File

@ -54,16 +54,8 @@ QString GxsForumsDialog::getHelpString() const
<p>Retroshare Forums look like internet forums, but they work in a decentralized way</p> \
<p>You see forums your friends are subscribed to, and you forward subscribed forums to \
your friends. This automatically promotes interesting forums in the network.</p> \
<p>Forum messages get deleted after %1 months.</p>\
").arg(QString::number(rsGxsForums->getStoragePeriod()));
// not true anymore in v0.6
/*
<p>Forums are either Authenticated (<img src=\":/images/konv_message2.png\" width=\"12\"/>) \
or anonymous (<img src=\":/images/konversation.png\" width=\"12\"/>). The former \
class is more resistant to spamming because posts are \
cryptographically signed using a Retroshare pseudo-identity.</p>") ;
*/
<p>Forum messages are kept for %1 days and sync-ed over the last %2 days, unless you configure it otherwise.</p>\
").arg(QString::number(rsGxsForums->getDefaultStoragePeriod()/86400)).arg(QString::number(rsGxsForums->getDefaultSyncPeriod()/86400));
return hlp_str ;
}

View File

@ -191,7 +191,7 @@ RsSerialType* init_item(RsNxsSyncMsgReqItem& rsgm)
rsgm.clear();
rsgm.flag = RsNxsSyncMsgItem::FLAG_USE_SYNC_HASH;
rsgm.createdSince = rand()%24232;
rsgm.createdSinceTS = rand()%24232;
rsgm.transactionNumber = rand()%23;
init_random(rsgm.grpId) ;
randString(SHORT_STR, rsgm.syncHash);
@ -251,7 +251,7 @@ bool operator==(const RsNxsSyncMsgReqItem& l, const RsNxsSyncMsgReqItem& r)
{
if(l.flag != r.flag) return false;
if(l.createdSince != r.createdSince) return false;
if(l.createdSinceTS != r.createdSinceTS) return false;
if(l.syncHash != r.syncHash) return false;
if(l.grpId != r.grpId) return false;
if(l.transactionNumber != r.transactionNumber) return false;

View File

@ -15,7 +15,8 @@ public:
RsDummyNetService(){ return;}
virtual ~RsDummyNetService() { }
void setSyncAge(uint32_t age){}
void setSyncAge(const RsGxsGroupId& id,uint32_t age_in_secs){}
void setKeepAge(const RsGxsGroupId& id,uint32_t age_in_secs){}
void requestGroupsOfPeer(const std::string& peerId){}

View File

@ -15,17 +15,17 @@ RsSerialType* init_item(RsGxsGrpUpdateItem& i)
{
i.clear();
i.grpUpdateTS = rand()%2424;
i.peerId = RsPeerId::random();
i.peerID = RsPeerId::random();
return new RsGxsUpdateSerialiser(RS_SERVICE_TYPE_PLUGIN_SIMPLE_FORUM);
}
RsSerialType* init_item(RsGxsMsgUpdateItem& i)
{
i.clear();
i.peerId = RsPeerId::random();
i.peerID = RsPeerId::random();
int numUpdates = rand()%123;
i.peerId = RsPeerId::random();
i.peerID = RsPeerId::random();
for(int j=0; j < numUpdates; j++)
{
struct RsGxsMsgUpdateItem::MsgUpdateInfo info;
@ -56,7 +56,7 @@ RsSerialType* init_item(RsGxsServerMsgUpdateItem& i)
bool operator ==(const RsGxsGrpUpdateItem& l, const RsGxsGrpUpdateItem& r)
{
bool ok = l.grpUpdateTS == r.grpUpdateTS;
ok &= l.peerId == r.peerId;
ok &= l.peerID == r.peerID;
return ok;
}
@ -68,7 +68,7 @@ bool operator ==(const RsGxsMsgUpdateItem::MsgUpdateInfo& l, const RsGxsMsgUpdat
bool operator ==(const RsGxsMsgUpdateItem& l, const RsGxsMsgUpdateItem& r)
{
bool ok = l.peerId == r.peerId;
bool ok = l.peerID == r.peerID;
const std::map<RsGxsGroupId, RsGxsMsgUpdateItem::MsgUpdateInfo>& lUp = l.msgUpdateInfos, rUp = r.msgUpdateInfos;