more GXS changes for phase1

clean up now on by default
- unsubscription from group also leads to message clean up
- check for keep flag

added functions to retrieve stats 
- not active yet

fixed forum authen flag 
- do not need author to create group
- also policy variable not initialised correctly

added KEEP and DELETE flag 

improved GXS resilience to peers sending NULL grp/msg binary data for deserialisation (peers could crash other peer by sending empty RsTlvBinaryData)

rstokenservice iface affected so full recompile recommended

git-svn-id: http://svn.code.sf.net/p/retroshare/code/trunk@6274 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
chrisparker126 2013-03-23 14:07:52 +00:00
parent 404e9362dc
commit d6328c8123
15 changed files with 299 additions and 209 deletions

View File

@ -234,7 +234,7 @@ void RsDataService::initialise(){
} }
RsGxsGrpMetaData* RsDataService::getGrpMeta(RetroCursor &c) RsGxsGrpMetaData* RsDataService::locked_getGrpMeta(RetroCursor &c)
{ {
RsGxsGrpMetaData* grpMeta = new RsGxsGrpMetaData(); RsGxsGrpMetaData* grpMeta = new RsGxsGrpMetaData();
@ -291,7 +291,7 @@ RsGxsGrpMetaData* RsDataService::getGrpMeta(RetroCursor &c)
return NULL; return NULL;
} }
RsNxsGrp* RsDataService::getGroup(RetroCursor &c) RsNxsGrp* RsDataService::locked_getGroup(RetroCursor &c)
{ {
/*! /*!
* grpId, pub admin and pub publish key * grpId, pub admin and pub publish key
@ -345,7 +345,7 @@ RsNxsGrp* RsDataService::getGroup(RetroCursor &c)
return NULL; return NULL;
} }
RsGxsMsgMetaData* RsDataService::getMsgMeta(RetroCursor &c) RsGxsMsgMetaData* RsDataService::locked_getMsgMeta(RetroCursor &c)
{ {
RsGxsMsgMetaData* msgMeta = new RsGxsMsgMetaData(); RsGxsMsgMetaData* msgMeta = new RsGxsMsgMetaData();
@ -395,7 +395,7 @@ RsGxsMsgMetaData* RsDataService::getMsgMeta(RetroCursor &c)
RsNxsMsg* RsDataService::getMessage(RetroCursor &c) RsNxsMsg* RsDataService::locked_getMessage(RetroCursor &c)
{ {
RsNxsMsg* msg = new RsNxsMsg(mServType); RsNxsMsg* msg = new RsNxsMsg(mServType);
@ -644,7 +644,7 @@ int RsDataService::retrieveNxsGrps(std::map<std::string, RsNxsGrp *> &grp, bool
{ {
std::vector<RsNxsGrp*> grps; std::vector<RsNxsGrp*> grps;
retrieveGroups(c, grps); locked_retrieveGroups(c, grps);
std::vector<RsNxsGrp*>::iterator vit = grps.begin(); std::vector<RsNxsGrp*>::iterator vit = grps.begin();
for(; vit != grps.end(); vit++) for(; vit != grps.end(); vit++)
@ -670,7 +670,7 @@ int RsDataService::retrieveNxsGrps(std::map<std::string, RsNxsGrp *> &grp, bool
if(c) if(c)
{ {
std::vector<RsNxsGrp*> grps; std::vector<RsNxsGrp*> grps;
retrieveGroups(c, grps); locked_retrieveGroups(c, grps);
if(!grps.empty()) if(!grps.empty())
{ {
@ -711,13 +711,13 @@ int RsDataService::retrieveNxsGrps(std::map<std::string, RsNxsGrp *> &grp, bool
return 1; return 1;
} }
void RsDataService::retrieveGroups(RetroCursor* c, std::vector<RsNxsGrp*>& grps){ void RsDataService::locked_retrieveGroups(RetroCursor* c, std::vector<RsNxsGrp*>& grps){
if(c){ if(c){
bool valid = c->moveToFirst(); bool valid = c->moveToFirst();
while(valid){ while(valid){
RsNxsGrp* g = getGroup(*c); RsNxsGrp* g = locked_getGroup(*c);
// only add the latest grp info // only add the latest grp info
if(g) if(g)
@ -752,7 +752,7 @@ int RsDataService::retrieveNxsMsgs(const GxsMsgReq &reqIds, GxsMsgResult &msg, b
RetroCursor* c = mDb->sqlQuery(MSG_TABLE_NAME, msgColumns, KEY_GRP_ID+ "='" + grpId + "'", ""); RetroCursor* c = mDb->sqlQuery(MSG_TABLE_NAME, msgColumns, KEY_GRP_ID+ "='" + grpId + "'", "");
if(c) if(c)
retrieveMessages(c, msgSet); locked_retrieveMessages(c, msgSet);
delete c; delete c;
}else{ }else{
@ -769,7 +769,7 @@ int RsDataService::retrieveNxsMsgs(const GxsMsgReq &reqIds, GxsMsgResult &msg, b
+ "' AND " + KEY_MSG_ID + "='" + msgId + "'", ""); + "' AND " + KEY_MSG_ID + "='" + msgId + "'", "");
if(c) if(c)
retrieveMessages(c, msgSet); locked_retrieveMessages(c, msgSet);
delete c; delete c;
} }
@ -848,11 +848,11 @@ int RsDataService::retrieveNxsMsgs(const GxsMsgReq &reqIds, GxsMsgResult &msg, b
return 1; return 1;
} }
void RsDataService::retrieveMessages(RetroCursor *c, std::vector<RsNxsMsg *> &msgs) void RsDataService::locked_retrieveMessages(RetroCursor *c, std::vector<RsNxsMsg *> &msgs)
{ {
bool valid = c->moveToFirst(); bool valid = c->moveToFirst();
while(valid){ while(valid){
RsNxsMsg* m = getMessage(*c); RsNxsMsg* m = locked_getMessage(*c);
if(m){ if(m){
msgs.push_back(m);; msgs.push_back(m);;
@ -882,7 +882,7 @@ int RsDataService::retrieveGxsMsgMetaData(const GxsMsgReq& reqIds, GxsMsgMetaRes
if(msgIdV.empty()){ if(msgIdV.empty()){
RetroCursor* c = mDb->sqlQuery(MSG_TABLE_NAME, msgMetaColumns, KEY_GRP_ID+ "='" + grpId + "'", ""); RetroCursor* c = mDb->sqlQuery(MSG_TABLE_NAME, msgMetaColumns, KEY_GRP_ID+ "='" + grpId + "'", "");
retrieveMsgMeta(c, metaSet); locked_retrieveMsgMeta(c, metaSet);
}else{ }else{
@ -894,7 +894,7 @@ int RsDataService::retrieveGxsMsgMetaData(const GxsMsgReq& reqIds, GxsMsgMetaRes
RetroCursor* c = mDb->sqlQuery(MSG_TABLE_NAME, msgMetaColumns, KEY_GRP_ID+ "='" + grpId RetroCursor* c = mDb->sqlQuery(MSG_TABLE_NAME, msgMetaColumns, KEY_GRP_ID+ "='" + grpId
+ "' AND " + KEY_MSG_ID + "='" + msgId + "'", ""); + "' AND " + KEY_MSG_ID + "='" + msgId + "'", "");
retrieveMsgMeta(c, metaSet); locked_retrieveMsgMeta(c, metaSet);
} }
} }
@ -904,14 +904,14 @@ int RsDataService::retrieveGxsMsgMetaData(const GxsMsgReq& reqIds, GxsMsgMetaRes
return 1; return 1;
} }
void RsDataService::retrieveMsgMeta(RetroCursor *c, std::vector<RsGxsMsgMetaData *> &msgMeta) void RsDataService::locked_retrieveMsgMeta(RetroCursor *c, std::vector<RsGxsMsgMetaData *> &msgMeta)
{ {
if(c) if(c)
{ {
bool valid = c->moveToFirst(); bool valid = c->moveToFirst();
while(valid){ while(valid){
RsGxsMsgMetaData* m = getMsgMeta(*c); RsGxsMsgMetaData* m = locked_getMsgMeta(*c);
if(m != NULL) if(m != NULL)
msgMeta.push_back(m); msgMeta.push_back(m);
@ -937,7 +937,7 @@ int RsDataService::retrieveGxsGrpMetaData(std::map<RsGxsGroupId, RsGxsGrpMetaDat
while(valid) while(valid)
{ {
RsGxsGrpMetaData* g = getGrpMeta(*c); RsGxsGrpMetaData* g = locked_getGrpMeta(*c);
if(g) if(g)
{ {
@ -964,7 +964,7 @@ int RsDataService::retrieveGxsGrpMetaData(std::map<RsGxsGroupId, RsGxsGrpMetaDat
while(valid) while(valid)
{ {
RsGxsGrpMetaData* g = getGrpMeta(*c); RsGxsGrpMetaData* g = locked_getGrpMeta(*c);
if(g) if(g)
{ {
@ -1075,7 +1075,7 @@ int RsDataService::removeMsgs(const GxsMsgReq& msgIds)
// can get offsets for each file // can get offsets for each file
std::vector<MsgOffset> msgOffsets; std::vector<MsgOffset> msgOffsets;
getMessageOffsets(grpId, msgOffsets); locked_getMessageOffsets(grpId, msgOffsets);
std::string oldFileName = mServiceDir + "/" + grpId + "-msgs"; std::string oldFileName = mServiceDir + "/" + grpId + "-msgs";
std::string newFileName = mServiceDir + "/" + grpId + "-msgs-temp"; std::string newFileName = mServiceDir + "/" + grpId + "-msgs-temp";
@ -1133,12 +1133,12 @@ int RsDataService::removeMsgs(const GxsMsgReq& msgIds)
out.close(); out.close();
// now update the new positions in db // now update the new positions in db
updateMessageEntries(updates); locked_updateMessageEntries(updates);
// then delete removed messages // then delete removed messages
GxsMsgReq msgsToDelete; GxsMsgReq msgsToDelete;
msgsToDelete[grpId] = msgIdV; msgsToDelete[grpId] = msgIdV;
removeMessageEntries(msgsToDelete); locked_removeMessageEntries(msgsToDelete);
// now replace old file location with new file // now replace old file location with new file
remove(oldFileName.c_str()); remove(oldFileName.c_str());
@ -1150,7 +1150,7 @@ int RsDataService::removeMsgs(const GxsMsgReq& msgIds)
bool RsDataService::updateMessageEntries(const MsgUpdates& updates) bool RsDataService::locked_updateMessageEntries(const MsgUpdates& updates)
{ {
// start a transaction // start a transaction
bool ret = mDb->execSQL("BEGIN;"); bool ret = mDb->execSQL("BEGIN;");
@ -1177,7 +1177,7 @@ bool RsDataService::updateMessageEntries(const MsgUpdates& updates)
return ret; return ret;
} }
bool RsDataService::removeMessageEntries(const GxsMsgReq& msgIds) bool RsDataService::locked_removeMessageEntries(const GxsMsgReq& msgIds)
{ {
// start a transaction // start a transaction
bool ret = mDb->execSQL("BEGIN;"); bool ret = mDb->execSQL("BEGIN;");
@ -1203,7 +1203,7 @@ bool RsDataService::removeMessageEntries(const GxsMsgReq& msgIds)
return ret; return ret;
} }
void RsDataService::getMessageOffsets(const RsGxsGroupId& grpId, std::vector<MsgOffset>& offsets) void RsDataService::locked_getMessageOffsets(const RsGxsGroupId& grpId, std::vector<MsgOffset>& offsets)
{ {
RetroCursor* c = mDb->sqlQuery(MSG_TABLE_NAME, mMsgOffSetColumns, KEY_GRP_ID+ "='" + grpId + "'", ""); RetroCursor* c = mDb->sqlQuery(MSG_TABLE_NAME, mMsgOffSetColumns, KEY_GRP_ID+ "='" + grpId + "'", "");
@ -1233,15 +1233,6 @@ void RsDataService::getMessageOffsets(const RsGxsGroupId& grpId, std::vector<Msg
} }
} }
void RsDataService::lockStore()
{
mDbMutex.lock();
}
void RsDataService::unlockStore()
{
mDbMutex.unlock();
}
uint32_t RsDataService::cacheSize() const { uint32_t RsDataService::cacheSize() const {
return 0; return 0;
} }

View File

@ -141,19 +141,6 @@ public:
*/ */
int updateGroupMetaData(GrpLocMetaData& meta); int updateGroupMetaData(GrpLocMetaData& meta);
/*!
* Use to lock store when needing to ensure Db contents has not change
* @warning ensure you call unlock or you could cause a deadlock
* @see RsDataService::unlockStore()
*/
void lockStore();
/*!
* Use to unlock store after locking
* @see RsDataService::lockStore()
*/
void unlockStore();
/*! /*!
* Completely clear out data stored in * Completely clear out data stored in
* and returns this to a state * and returns this to a state
@ -173,45 +160,45 @@ private:
* @param c cursor to result set * @param c cursor to result set
* @param msgs messages retrieved from cursor are stored here * @param msgs messages retrieved from cursor are stored here
*/ */
void retrieveMessages(RetroCursor* c, std::vector<RsNxsMsg*>& msgs); void locked_retrieveMessages(RetroCursor* c, std::vector<RsNxsMsg*>& msgs);
/*! /*!
* Retrieves all the grp results from a cursor * Retrieves all the grp results from a cursor
* @param c cursor to result set * @param c cursor to result set
* @param grps groups retrieved from cursor are stored here * @param grps groups retrieved from cursor are stored here
*/ */
void retrieveGroups(RetroCursor* c, std::vector<RsNxsGrp*>& grps); void locked_retrieveGroups(RetroCursor* c, std::vector<RsNxsGrp*>& grps);
/*! /*!
* Retrieves all the msg meta results from a cursor * Retrieves all the msg meta results from a cursor
* @param c cursor to result set * @param c cursor to result set
* @param metaSet message metadata retrieved from cursor are stored here * @param metaSet message metadata retrieved from cursor are stored here
*/ */
void retrieveMsgMeta(RetroCursor* c, std::vector<RsGxsMsgMetaData*>& msgMeta); void locked_retrieveMsgMeta(RetroCursor* c, std::vector<RsGxsMsgMetaData*>& msgMeta);
/*! /*!
* extracts a msg meta item from a cursor at its * extracts a msg meta item from a cursor at its
* current position * current position
*/ */
RsGxsMsgMetaData* getMsgMeta(RetroCursor& c); RsGxsMsgMetaData* locked_getMsgMeta(RetroCursor& c);
/*! /*!
* extracts a grp meta item from a cursor at its * extracts a grp meta item from a cursor at its
* current position * current position
*/ */
RsGxsGrpMetaData* getGrpMeta(RetroCursor& c); RsGxsGrpMetaData* locked_getGrpMeta(RetroCursor& c);
/*! /*!
* extracts a msg item from a cursor at its * extracts a msg item from a cursor at its
* current position * current position
*/ */
RsNxsMsg* getMessage(RetroCursor& c); RsNxsMsg* locked_getMessage(RetroCursor& c);
/*! /*!
* extracts a grp item from a cursor at its * extracts a grp item from a cursor at its
* current position * current position
*/ */
RsNxsGrp* getGroup(RetroCursor& c); RsNxsGrp* locked_getGroup(RetroCursor& c);
/*! /*!
* Creates an sql database and its associated file * Creates an sql database and its associated file
@ -223,7 +210,7 @@ private:
* Remove entries for data base * Remove entries for data base
* @param msgIds * @param msgIds
*/ */
bool removeMessageEntries(const GxsMsgReq& msgIds); bool locked_removeMessageEntries(const GxsMsgReq& msgIds);
typedef std::map<RsGxsGroupId, std::vector<MsgUpdate> > MsgUpdates; typedef std::map<RsGxsGroupId, std::vector<MsgUpdate> > MsgUpdates;
@ -232,12 +219,12 @@ private:
* @param msgIds * @param msgIds
* @param cv contains values to update message entries with * @param cv contains values to update message entries with
*/ */
bool updateMessageEntries(const MsgUpdates& updates); bool locked_updateMessageEntries(const MsgUpdates& updates);
private: private:
void getMessageOffsets(const RsGxsGroupId& grpId, std::vector<MsgOffset>& msgOffsets); void locked_getMessageOffsets(const RsGxsGroupId& grpId, std::vector<MsgOffset>& msgOffsets);
private: private:

View File

@ -54,7 +54,7 @@
#define GEN_EXCH_DEBUG 1 #define GEN_EXCH_DEBUG 1
#define MSG_CLEANUP_PERIOD 60*3 // 3 minutes #define MSG_CLEANUP_PERIOD 60*3 // 3 minute
RsGenExchange::RsGenExchange(RsGeneralDataService *gds, RsNetworkExchangeService *ns, RsGenExchange::RsGenExchange(RsGeneralDataService *gds, RsNetworkExchangeService *ns,
RsSerialType *serviceSerialiser, uint16_t servType, RsGixs* gixs, RsSerialType *serviceSerialiser, uint16_t servType, RsGixs* gixs,
@ -126,7 +126,6 @@ void RsGenExchange::tick()
// implemented service tick function // implemented service tick function
service_tick(); service_tick();
#ifdef GXS_CLEAN_UP
time_t now = time(NULL); time_t now = time(NULL);
if((mLastClean + MSG_CLEANUP_PERIOD < now) || mCleaning) if((mLastClean + MSG_CLEANUP_PERIOD < now) || mCleaning)
@ -147,8 +146,6 @@ void RsGenExchange::tick()
mCleaning = true; mCleaning = true;
} }
} }
#endif
} }
bool RsGenExchange::acknowledgeTokenMsg(const uint32_t& token, bool RsGenExchange::acknowledgeTokenMsg(const uint32_t& token,
@ -943,6 +940,17 @@ bool RsGenExchange::subscribeToGroup(uint32_t& token, const RsGxsGroupId& grpId,
return true; return true;
} }
bool RsGenExchange::getGroupStatistic(const uint32_t& token, GxsGroupStatistic& stats)
{
return false;
}
bool RsGenExchange::getServiceStatistic(const uint32_t& token, GxsServiceStatistic& stats)
{
return false;
}
bool RsGenExchange::updated(bool willCallGrpChanged, bool willCallMsgChanged) bool RsGenExchange::updated(bool willCallGrpChanged, bool willCallMsgChanged)
{ {
bool changed = false; bool changed = false;
@ -1090,7 +1098,10 @@ bool RsGenExchange::getGroupData(const uint32_t &token, std::vector<RsGxsGrpItem
for(; lit != nxsGrps.end(); lit++) for(; lit != nxsGrps.end(); lit++)
{ {
RsTlvBinaryData& data = (*lit)->grp; RsTlvBinaryData& data = (*lit)->grp;
RsItem* item = mSerialiser->deserialise(data.bin_data, &data.bin_len); RsItem* item = NULL;
if(data.bin_len != 0)
item = mSerialiser->deserialise(data.bin_data, &data.bin_len);
if(item) if(item)
{ {
@ -1139,9 +1150,12 @@ bool RsGenExchange::getMsgData(const uint32_t &token,
for(; vit != nxsMsgsV.end(); vit++) for(; vit != nxsMsgsV.end(); vit++)
{ {
RsNxsMsg*& msg = *vit; RsNxsMsg*& msg = *vit;
RsItem* item = NULL;
RsItem* item = mSerialiser->deserialise(msg->msg.bin_data, if(msg->msg.bin_len != 0)
item = mSerialiser->deserialise(msg->msg.bin_data,
&msg->msg.bin_len); &msg->msg.bin_len);
if (item) if (item)
{ {
RsGxsMsgItem* mItem = dynamic_cast<RsGxsMsgItem*>(item); RsGxsMsgItem* mItem = dynamic_cast<RsGxsMsgItem*>(item);
@ -1152,15 +1166,19 @@ bool RsGenExchange::getMsgData(const uint32_t &token,
} }
else else
{ {
#ifdef GEN_EXCH_DEBUG
std::cerr << "RsGenExchange::getMsgData() deserialisation/dynamic_cast ERROR"; std::cerr << "RsGenExchange::getMsgData() deserialisation/dynamic_cast ERROR";
std::cerr << std::endl; std::cerr << std::endl;
#endif
delete item; delete item;
} }
} }
else else
{ {
#ifdef GEN_EXCH_DEBUG
std::cerr << "RsGenExchange::getMsgData() deserialisation ERROR"; std::cerr << "RsGenExchange::getMsgData() deserialisation ERROR";
std::cerr << std::endl; std::cerr << std::endl;
#endif
} }
delete msg; delete msg;
} }
@ -1172,14 +1190,14 @@ bool RsGenExchange::getMsgData(const uint32_t &token,
bool RsGenExchange::getMsgRelatedData(const uint32_t &token, GxsMsgRelatedDataMap &msgItems) bool RsGenExchange::getMsgRelatedData(const uint32_t &token, GxsMsgRelatedDataMap &msgItems)
{ {
RsStackMutex stack(mGenMtx); RsStackMutex stack(mGenMtx);
NxsMsgRelatedDataResult msgResult; NxsMsgRelatedDataResult msgResult;
bool ok = mDataAccess->getMsgRelatedData(token, msgResult); bool ok = mDataAccess->getMsgRelatedData(token, msgResult);
NxsMsgRelatedDataResult::iterator mit = msgResult.begin();
if(ok) if(ok)
{ {
NxsMsgRelatedDataResult::iterator mit = msgResult.begin();
for(; mit != msgResult.end(); mit++) for(; mit != msgResult.end(); mit++)
{ {
std::vector<RsGxsMsgItem*> gxsMsgItems; std::vector<RsGxsMsgItem*> gxsMsgItems;
@ -1190,8 +1208,10 @@ bool RsGenExchange::getMsgRelatedData(const uint32_t &token, GxsMsgRelatedDataMa
for(; vit != nxsMsgsV.end(); vit++) for(; vit != nxsMsgsV.end(); vit++)
{ {
RsNxsMsg*& msg = *vit; RsNxsMsg*& msg = *vit;
RsItem* item = NULL;
RsItem* item = mSerialiser->deserialise(msg->msg.bin_data, if(msg->msg.bin_len != 0)
item = mSerialiser->deserialise(msg->msg.bin_data,
&msg->msg.bin_len); &msg->msg.bin_len);
if (item) if (item)
{ {
@ -1214,6 +1234,7 @@ bool RsGenExchange::getMsgRelatedData(const uint32_t &token, GxsMsgRelatedDataMa
std::cerr << "RsGenExchange::getMsgRelatedData() deserialisation ERROR"; std::cerr << "RsGenExchange::getMsgRelatedData() deserialisation ERROR";
std::cerr << std::endl; std::cerr << std::endl;
} }
delete msg; delete msg;
} }
msgItems[msgId] = gxsMsgItems; msgItems[msgId] = gxsMsgItems;
@ -1968,73 +1989,6 @@ bool RsGenExchange::getGroupKeys(const RsGxsGroupId &grpId, RsTlvSecurityKeySet
return true; return true;
} }
void RsGenExchange::createDummyGroup(RsGxsGrpItem *grpItem)
{
RsStackMutex stack(mGenMtx);
RsNxsGrp* grp = new RsNxsGrp(mServType);
uint32_t size = mSerialiser->size(grpItem);
char gData[size];
bool ok = mSerialiser->serialise(grpItem, gData, &size);
grp->grp.setBinData(gData, size);
RsTlvSecurityKeySet privateKeySet, publicKeySet;
generateGroupKeys(privateKeySet, publicKeySet,
!(grpItem->meta.mGroupFlags & GXS_SERV::FLAG_PRIVACY_PUBLIC));
// find private admin key
RsTlvSecurityKey privAdminKey;
std::map<std::string, RsTlvSecurityKey>::iterator mit_keys = privateKeySet.keys.begin();
bool privKeyFound = false;
for(; mit_keys != privateKeySet.keys.end(); mit_keys++)
{
RsTlvSecurityKey& key = mit_keys->second;
if(key.keyFlags == (RSTLV_KEY_DISTRIB_ADMIN | RSTLV_KEY_TYPE_FULL))
{
privAdminKey = key;
privKeyFound = true;
}
}
if(privKeyFound)
{
// get group id from private admin key id
grpItem->meta.mGroupId = grp->grpId = privAdminKey.keyId;
}
else
{
ok = false;
}
service_CreateGroup(grpItem, privateKeySet);
if(ok)
{
grp->metaData = new RsGxsGrpMetaData();
grpItem->meta.mPublishTs = time(NULL);
*(grp->metaData) = grpItem->meta;
grp->metaData->mSubscribeFlags = GXS_SERV::GROUP_SUBSCRIBE_ADMIN;
createGroup(grp, privateKeySet, publicKeySet);
mDataAccess->addGroupData(grp);
}
if(!ok)
{
#ifdef GEN_EXCH_DEBUG
#endif
std::cerr << "RsGenExchange::createDummyGroup(); failed to publish grp " << std::endl;
delete grp;
}
delete grpItem;
}
void RsGenExchange::processRecvdData() void RsGenExchange::processRecvdData()
{ {
processRecvdGroups(); processRecvdGroups();
@ -2048,7 +2002,6 @@ void RsGenExchange::processRecvdMessages()
{ {
RsStackMutex stack(mGenMtx); RsStackMutex stack(mGenMtx);
NxsMsgPendingVect::iterator pend_it = mMsgPendingValidate.begin(); NxsMsgPendingVect::iterator pend_it = mMsgPendingValidate.begin();
for(; pend_it != mMsgPendingValidate.end();) for(; pend_it != mMsgPendingValidate.end();)
@ -2086,7 +2039,12 @@ void RsGenExchange::processRecvdMessages()
{ {
RsNxsMsg* msg = *vit; RsNxsMsg* msg = *vit;
RsGxsMsgMetaData* meta = new RsGxsMsgMetaData(); RsGxsMsgMetaData* meta = new RsGxsMsgMetaData();
bool ok = meta->deserialise(msg->meta.bin_data, &(msg->meta.bin_len));
bool ok = false;
if(msg->meta.bin_len != 0)
ok = meta->deserialise(msg->meta.bin_data, &(msg->meta.bin_len));
msg->metaData = meta; msg->metaData = meta;
uint8_t validateReturn = VALIDATE_FAIL; uint8_t validateReturn = VALIDATE_FAIL;
@ -2123,8 +2081,8 @@ void RsGenExchange::processRecvdMessages()
{ {
#ifdef GXS_GENX_DEBUG #ifdef GXS_GENX_DEBUG
std::cerr << "failed to deserialise incoming meta, grpId: " std::cerr << "failed to deserialise incoming meta, msgId: "
<< msg->grpId << ", msgId: " << msg->msgId << std::endl; << "msg->grpId: " << msg->grpId << ", msgId: " << msg->msgId << std::endl;
#endif #endif
NxsMsgPendingVect::iterator failed_entry = std::find(mMsgPendingValidate.begin(), mMsgPendingValidate.end(), NxsMsgPendingVect::iterator failed_entry = std::find(mMsgPendingValidate.begin(), mMsgPendingValidate.end(),
@ -2138,6 +2096,11 @@ void RsGenExchange::processRecvdMessages()
else if(validateReturn == VALIDATE_FAIL_TRY_LATER) else if(validateReturn == VALIDATE_FAIL_TRY_LATER)
{ {
#ifdef GXS_GENX_DEBUG
std::cerr << "failed to validate msg, trying again: "
<< "msg->grpId: " << msg->grpId << ", msgId: " << msg->msgId << std::endl;
#endif
RsGxsGrpMsgIdPair id; RsGxsGrpMsgIdPair id;
id.first = msg->grpId; id.first = msg->grpId;
id.second = msg->msgId; id.second = msg->msgId;
@ -2187,7 +2150,11 @@ void RsGenExchange::processRecvdGroups()
GxsPendingItem<RsNxsGrp*, RsGxsGroupId>& gpsi = *vit; GxsPendingItem<RsNxsGrp*, RsGxsGroupId>& gpsi = *vit;
RsNxsGrp* grp = gpsi.mItem; RsNxsGrp* grp = gpsi.mItem;
RsGxsGrpMetaData* meta = new RsGxsGrpMetaData(); RsGxsGrpMetaData* meta = new RsGxsGrpMetaData();
bool deserialOk = meta->deserialise(grp->meta.bin_data, grp->meta.bin_len); bool deserialOk = false;
if(grp->meta.bin_len != 0)
deserialOk = meta->deserialise(grp->meta.bin_data, grp->meta.bin_len);
bool erase = true; bool erase = true;
if(deserialOk) if(deserialOk)
@ -2216,6 +2183,12 @@ void RsGenExchange::processRecvdGroups()
} }
else if(ret == VALIDATE_FAIL_TRY_LATER) else if(ret == VALIDATE_FAIL_TRY_LATER)
{ {
#ifdef GXS_GENX_DEBUG
std::cerr << "failed to validate incoming grp, trying again. grpId: "
<< grp->grpId << std::endl;
#endif
if(gpsi.mAttempts == VALIDATE_MAX_ATTEMPTS) if(gpsi.mAttempts == VALIDATE_MAX_ATTEMPTS)
{ {
delete grp; delete grp;

View File

@ -283,13 +283,23 @@ public:
bool subscribeToGroup(uint32_t& token, const RsGxsGroupId& grpId, bool subscribe); bool subscribeToGroup(uint32_t& token, const RsGxsGroupId& grpId, bool subscribe);
protected: /*!
* Gets service statistic for a given services
* @param token value to to retrieve requested stats
* @param stats the status
* @return true if token exists false otherwise
*/
bool getServiceStatistic(const uint32_t& token, GxsServiceStatistic& stats);
/*! /*!
* @param grpItem * Get group statistic
* @deprecated only here temporarily for testing * @param token to be redeemed
* @param stats the stats associated to token requ
* @return true if token is false otherwise
*/ */
void createDummyGroup(RsGxsGrpItem* grpItem); bool getGroupStatistic(const uint32_t& token, GxsGroupStatistic& stats);
protected:
/*! /*!
* retrieves group data associated to a request token * retrieves group data associated to a request token
@ -343,10 +353,16 @@ protected:
for(; vit != nxsMsgsV.end(); vit++) for(; vit != nxsMsgsV.end(); vit++)
{ {
RsNxsMsg*& msg = *vit; RsNxsMsg*& msg = *vit;
RsItem* item = NULL;
RsItem* item = mSerialiser->deserialise(msg->msg.bin_data, if(msg->msg.bin_len != 0)
item = mSerialiser->deserialise(msg->msg.bin_data,
&msg->msg.bin_len); &msg->msg.bin_len);
GxsMsgType* mItem = dynamic_cast<GxsMsgType*>(item);
GxsMsgType* mItem = NULL;
if(item)
mItem = dynamic_cast<GxsMsgType*>(item);
if(mItem == NULL) if(mItem == NULL)
{ {

View File

@ -269,6 +269,17 @@ bool RsGxsDataAccess::requestMsgInfo(uint32_t &token, uint32_t ansType,
return true; return true;
} }
void RsGxsDataAccess::requestServiceStatistic(const uint32_t& token)
{
}
void RsGxsDataAccess::requestGroupStatistic(const uint32_t& token, const RsGxsGroupId& grpId)
{
}
bool RsGxsDataAccess::requestMsgRelatedInfo(uint32_t &token, uint32_t ansType, const RsTokReqOptions &opts, bool RsGxsDataAccess::requestMsgRelatedInfo(uint32_t &token, uint32_t ansType, const RsTokReqOptions &opts,
const std::vector<RsGxsGrpMsgIdPair> &msgIds) const std::vector<RsGxsGrpMsgIdPair> &msgIds)
{ {

View File

@ -94,12 +94,33 @@ public:
*/ */
bool requestMsgRelatedInfo(uint32_t &token, uint32_t ansType, const RsTokReqOptions &opts, const std::vector<RsGxsGrpMsgIdPair> &msgIds); bool requestMsgRelatedInfo(uint32_t &token, uint32_t ansType, const RsTokReqOptions &opts, const std::vector<RsGxsGrpMsgIdPair> &msgIds);
/*!
* This request statistics on amount of data held
* number of groups
* number of groups subscribed
* number of messages
* size of db store
* total size of messages
* total size of groups
* @param token
*/
void requestServiceStatistic(const uint32_t& token);
/*!
* To request statistic on a group
* @param token set to value to be redeemed to get statistic
* @param grpId the id of the group
*/
void requestGroupStatistic(const uint32_t& token, const RsGxsGroupId& grpId);
/* Poll */ /* Poll */
uint32_t requestStatus(const uint32_t token); uint32_t requestStatus(const uint32_t token);
/* Cancel Request */ /* Cancel Request */
bool cancelRequest(const uint32_t &token); bool cancelRequest(const uint32_t &token);
/** E: RsTokenService **/ /** E: RsTokenService **/
public: public:

View File

@ -24,6 +24,7 @@
*/ */
#include "rsgxsutil.h" #include "rsgxsutil.h"
#include "retroshare/rsgxsflags.h"
RsGxsMessageCleanUp::RsGxsMessageCleanUp(RsGeneralDataService* const dataService, uint32_t messageStorePeriod, uint32_t chunkSize) RsGxsMessageCleanUp::RsGxsMessageCleanUp(RsGeneralDataService* const dataService, uint32_t messageStorePeriod, uint32_t chunkSize)
@ -37,22 +38,22 @@ RsGxsMessageCleanUp::RsGxsMessageCleanUp(RsGeneralDataService* const dataService
for(;cit != grpMeta.end(); cit++) for(;cit != grpMeta.end(); cit++)
{ {
mGrpIds.push_back(cit->first); mGrpMeta.push_back(cit->second);
delete cit->second;
} }
} }
bool RsGxsMessageCleanUp::clean() bool RsGxsMessageCleanUp::clean()
{ {
int i = 0; int i = 1;
time_t now = time(NULL); time_t now = time(NULL);
while(!mGrpIds.empty()) while(!mGrpMeta.empty())
{ {
RsGxsGroupId grpId = mGrpIds.back(); RsGxsGrpMetaData* grpMeta = mGrpMeta.back();
mGrpIds.pop_back(); const RsGxsGroupId& grpId = grpMeta->mGroupId;
mGrpMeta.pop_back();
GxsMsgReq req; GxsMsgReq req;
GxsMsgMetaResult result; GxsMsgMetaResult result;
@ -71,7 +72,17 @@ bool RsGxsMessageCleanUp::clean()
for(; vit != metaV.end(); ) for(; vit != metaV.end(); )
{ {
RsGxsMsgMetaData* meta = *vit; RsGxsMsgMetaData* meta = *vit;
if(meta->mPublishTs + MESSAGE_STORE_PERIOD < now)
// check if expired
bool remove = (meta->mPublishTs + MESSAGE_STORE_PERIOD) < now;
// check client does not want the message kept regardless of age
remove &= !(meta->mMsgStatus & GXS_SERV::GXS_MSG_STATUS_KEEP);
// if not subscribed remove messages (can optimise this really)
remove = remove || (grpMeta->mSubscribeFlags & GXS_SERV::GROUP_SUBSCRIBE_NOT_SUBSCRIBED);
if( remove )
{ {
req[grpId].push_back(meta->mMsgId); req[grpId].push_back(meta->mMsgId);
} }
@ -83,9 +94,11 @@ bool RsGxsMessageCleanUp::clean()
mDs->removeMsgs(req); mDs->removeMsgs(req);
delete grpMeta;
i++; i++;
if(i > CHUNK_SIZE) break; if(i > CHUNK_SIZE) break;
} }
return mGrpIds.empty(); return mGrpMeta.empty();
} }

View File

@ -90,7 +90,7 @@ private:
RsGeneralDataService* const mDs; RsGeneralDataService* const mDs;
const uint32_t MESSAGE_STORE_PERIOD, CHUNK_SIZE; const uint32_t MESSAGE_STORE_PERIOD, CHUNK_SIZE;
std::vector<RsGxsGroupId> mGrpIds; std::vector<RsGxsGrpMetaData*> mGrpMeta;
}; };

View File

@ -89,11 +89,15 @@ namespace GXS_SERV {
/** START GXS Msg status flags **/ /** START GXS Msg status flags **/
static const uint32_t GXS_MSG_STATUS_UNPROCESSED = 0x000000100; static const uint32_t GXS_MSG_STATUS_UNPROCESSED = 0x0000001;
static const uint32_t GXS_MSG_STATUS_UNREAD = 0x00000200; static const uint32_t GXS_MSG_STATUS_UNREAD = 0x000002;
static const uint32_t GXS_MSG_STATUS_READ = 0x00000400; static const uint32_t GXS_MSG_STATUS_READ = 0x000004;
static const uint32_t GXS_MSG_STATUS_KEEP = 0x000008;
static const uint32_t GXS_MSG_STATUS_DELETE = 0x0000020;
/** END GXS Msg status flags **/ /** END GXS Msg status flags **/
@ -104,7 +108,6 @@ namespace GXS_SERV {
static const uint32_t GXS_GRP_STATUS_UNREAD = 0x00000200; static const uint32_t GXS_GRP_STATUS_UNREAD = 0x00000200;
/** END GXS Grp status flags **/ /** END GXS Grp status flags **/
} }

View File

@ -169,6 +169,21 @@ public:
*/ */
virtual bool acknowledgeTokenGrp(const uint32_t& token, RsGxsGroupId& grpId) = 0; virtual bool acknowledgeTokenGrp(const uint32_t& token, RsGxsGroupId& grpId) = 0;
/*!
* Gets service statistic for a given services
* @param token value to to retrieve requested stats
* @param stats the status
* @return true if token exists false otherwise
*/
virtual bool getServiceStatistic(const uint32_t& token, GxsServiceStatistic& stats) = 0;
/*!
*
* @param token to be redeemed
* @param stats the stats associated to token request
* @return true if token is false otherwise
*/
virtual bool getGroupStatistic(const uint32_t& token, GxsGroupStatistic& stats) = 0;
}; };

View File

@ -216,6 +216,28 @@ public:
return mGxs->acknowledgeTokenGrp(token, grpId); return mGxs->acknowledgeTokenGrp(token, grpId);
} }
/*!
* Gets service statistic for a given services
* @param token value to to retrieve requested stats
* @param stats the status
* @return true if token exists false otherwise
*/
bool getServiceStatistic(const uint32_t& token, GxsServiceStatistic& stats)
{
return mGxs->getServiceStatistic(token, stats);
}
/*!
*
* @param token to be redeemed
* @param stats the stats associated to token request
* @return true if token is false otherwise
*/
bool getGroupStatistic(const uint32_t& token, GxsGroupStatistic& stats)
{
return mGxs->getGroupStatistic(token, stats);
}
private: private:
RsGxsIface* mGxs; RsGxsIface* mGxs;

View File

@ -121,5 +121,26 @@ public:
}; };
class GxsGroupStatistic
{
public:
/// number of message
RsGxsGroupId mGrpId;
uint32_t mNumMsgs;
uint32_t mTotalSizeOfMsgs;
};
class GxsServiceStatistic
{
public:
uint32_t mNumMsgs;
uint32_t mNumGrps;
uint32_t mSizeOfMsgs;
uint32_t mSizeOfGrps;
uint32_t mNumGrpsSubscribed;
uint32_t mSizeStore;
};
#endif /* RSGXSIFACETYPES_H_ */ #endif /* RSGXSIFACETYPES_H_ */

View File

@ -189,6 +189,26 @@ public:
*/ */
virtual uint32_t requestStatus(const uint32_t token) = 0; virtual uint32_t requestStatus(const uint32_t token) = 0;
/*!
* This request statistics on amount of data held
* number of groups
* number of groups subscribed
* number of messages
* size of db store
* total size of messages
* total size of groups
* @param token
*/
virtual void requestServiceStatistic(const uint32_t& token) = 0;
/*!
* To request statistic on a group
* @param token set to value to be redeemed to get statistic
* @param grpId the id of the group
*/
virtual void requestGroupStatistic(const uint32_t& token, const RsGxsGroupId& grpId) = 0;
/* Cancel Request */ /* Cancel Request */
/*! /*!

View File

@ -291,6 +291,7 @@ public:
std::string grpId; /// group id, forms part of version id std::string grpId; /// group id, forms part of version id
std::string msgId; /// msg id std::string msgId; /// msg id
static int refcount; static int refcount;
/*! /*!
* This should contains all the data * This should contains all the data
* which is not specific to the Gxs service data * which is not specific to the Gxs service data

View File

@ -63,7 +63,7 @@ p3GxsForums::p3GxsForums(RsGeneralDataService *gds, RsNetworkExchangeService *ne
uint32_t p3GxsForums::forumsAuthenPolicy() uint32_t p3GxsForums::forumsAuthenPolicy()
{ {
uint32_t policy; uint32_t policy = 0;
uint32_t flag = GXS_SERV::MSG_AUTHEN_ROOT_AUTHOR_SIGN | uint32_t flag = GXS_SERV::MSG_AUTHEN_ROOT_AUTHOR_SIGN |
GXS_SERV::MSG_AUTHEN_CHILD_AUTHOR_SIGN; GXS_SERV::MSG_AUTHEN_CHILD_AUTHOR_SIGN;
@ -71,10 +71,6 @@ uint32_t p3GxsForums::forumsAuthenPolicy()
RsGenExchange::setAuthenPolicyFlag(flag, policy, RsGenExchange::PUBLIC_GRP_BITS); RsGenExchange::setAuthenPolicyFlag(flag, policy, RsGenExchange::PUBLIC_GRP_BITS);
RsGenExchange::setAuthenPolicyFlag(flag, policy, RsGenExchange::PRIVATE_GRP_BITS); RsGenExchange::setAuthenPolicyFlag(flag, policy, RsGenExchange::PRIVATE_GRP_BITS);
flag = GXS_SERV::GRP_OPTION_AUTHEN_AUTHOR_SIGN;
RsGenExchange::setAuthenPolicyFlag(flag, policy, RsGenExchange::GRP_OPTION_BITS);
return policy; return policy;
} }