mirror of
https://github.com/RetroShare/RetroShare.git
synced 2025-06-14 01:23:13 -04:00
Fixes for Gxs msg retrieval:
msgId retrieval fixed, including incorrect stack mtx lock msgRelatedId retrieval fixed, mem leak removed (did not clean meta) msg data retrieval fixed added related tests RsThread::start now initialises mIsRunning to true, needed to unit tear down on gxs test system. git-svn-id: http://svn.code.sf.net/p/retroshare/code/branches/v0.5-gxs-b1@5471 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
parent
c8350ad011
commit
9d42715cad
9 changed files with 72 additions and 87 deletions
|
@ -48,10 +48,12 @@ void GxsCoreServer::run()
|
||||||
{
|
{
|
||||||
std::set<RsGxsService*>::iterator sit;
|
std::set<RsGxsService*>::iterator sit;
|
||||||
|
|
||||||
double timeDelta = 0.2;
|
double timeDelta = 0.02;
|
||||||
|
|
||||||
while(isRunning())
|
while(isRunning())
|
||||||
{
|
{
|
||||||
|
for(sit = mGxsServices.begin(); sit != mGxsServices.end(); sit++)
|
||||||
|
(*sit)->tick();
|
||||||
|
|
||||||
#ifndef WINDOWS_SYS
|
#ifndef WINDOWS_SYS
|
||||||
usleep((int) (timeDelta * 1000000));
|
usleep((int) (timeDelta * 1000000));
|
||||||
|
@ -59,9 +61,6 @@ void GxsCoreServer::run()
|
||||||
Sleep((int) (timeDelta * 1000));
|
Sleep((int) (timeDelta * 1000));
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
for(sit = mGxsServices.begin(); sit != mGxsServices.end(); sit++)
|
|
||||||
(*sit)->tick();
|
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -778,6 +778,7 @@ int RsDataService::retrieveNxsMsgs(const GxsMsgReq &reqIds, GxsMsgResult &msg, b
|
||||||
{
|
{
|
||||||
RsGxsMsgMetaData* meta = *meta_lit;
|
RsGxsMsgMetaData* meta = *meta_lit;
|
||||||
delete meta;
|
delete meta;
|
||||||
|
meta_lit = msgMetaV.erase(meta_lit);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -551,7 +551,7 @@ void RsGenExchange::publishMsgs()
|
||||||
{
|
{
|
||||||
msg->metaData = new RsGxsMsgMetaData();
|
msg->metaData = new RsGxsMsgMetaData();
|
||||||
msg->msg.setBinData(mData, size);
|
msg->msg.setBinData(mData, size);
|
||||||
*(msg->metaData) = msgItem->meta;
|
*(msg->metaData) = msgItem->meta;
|
||||||
size = msg->metaData->serial_size();
|
size = msg->metaData->serial_size();
|
||||||
char metaDataBuff[size];
|
char metaDataBuff[size];
|
||||||
|
|
||||||
|
@ -560,8 +560,24 @@ void RsGenExchange::publishMsgs()
|
||||||
|
|
||||||
ok = createMessage(msg);
|
ok = createMessage(msg);
|
||||||
|
|
||||||
if(ok)
|
if(ok)
|
||||||
|
{
|
||||||
|
msg->metaData->mPublishTs = time(NULL);
|
||||||
|
|
||||||
|
// empty orig msg id means this is the original
|
||||||
|
// msg
|
||||||
|
// TODO: a non empty msgid means one should at least
|
||||||
|
// have the msg on disk, after which this msg is signed
|
||||||
|
// based on the security settings
|
||||||
|
// public grp (sign by grp public pub key, private/id: signed by
|
||||||
|
// id
|
||||||
|
if(msg->metaData->mOrigMsgId.empty())
|
||||||
|
{
|
||||||
|
msg->metaData->mOrigMsgId = msg->metaData->mMsgId;
|
||||||
|
}
|
||||||
|
|
||||||
ok = mDataAccess->addMsgData(msg);
|
ok = mDataAccess->addMsgData(msg);
|
||||||
|
}
|
||||||
|
|
||||||
// add to published to allow acknowledgement
|
// add to published to allow acknowledgement
|
||||||
mMsgNotify.insert(std::make_pair(mit->first, std::make_pair(msg->grpId, msg->msgId)));
|
mMsgNotify.insert(std::make_pair(mit->first, std::make_pair(msg->grpId, msg->msgId)));
|
||||||
|
|
|
@ -27,6 +27,7 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include <queue>
|
#include <queue>
|
||||||
|
#include <ctime>
|
||||||
|
|
||||||
#include "rsgxs.h"
|
#include "rsgxs.h"
|
||||||
#include "rsgds.h"
|
#include "rsgds.h"
|
||||||
|
@ -40,6 +41,7 @@ typedef std::map<RsGxsGroupId, std::vector<RsGxsMsgItem*> > GxsMsgDataMap;
|
||||||
typedef std::map<RsGxsGroupId, RsGxsGrpItem*> GxsGroupDataMap;
|
typedef std::map<RsGxsGroupId, RsGxsGrpItem*> GxsGroupDataMap;
|
||||||
typedef std::map<RsGxsGroupId, std::vector<RsMsgMetaData> > GxsMsgMetaMap;
|
typedef std::map<RsGxsGroupId, std::vector<RsMsgMetaData> > GxsMsgMetaMap;
|
||||||
|
|
||||||
|
|
||||||
/*!
|
/*!
|
||||||
* This should form the parent class to \n
|
* This should form the parent class to \n
|
||||||
* all gxs services. This provides access to service's msg/grp data \n
|
* all gxs services. This provides access to service's msg/grp data \n
|
||||||
|
@ -75,7 +77,6 @@ public:
|
||||||
|
|
||||||
virtual ~RsGenExchange();
|
virtual ~RsGenExchange();
|
||||||
|
|
||||||
|
|
||||||
/** S: Observer implementation **/
|
/** S: Observer implementation **/
|
||||||
|
|
||||||
/*!
|
/*!
|
||||||
|
@ -155,7 +156,7 @@ protected:
|
||||||
public:
|
public:
|
||||||
|
|
||||||
/*!
|
/*!
|
||||||
* This allows the client service to acknowledge that their msgs has
|
* This allows the client service to acknowledge that their msgs has \n
|
||||||
* been created/modified and retrieve the create/modified msg ids
|
* been created/modified and retrieve the create/modified msg ids
|
||||||
* @param token the token related to modification/create request
|
* @param token the token related to modification/create request
|
||||||
* @param msgIds map of grpid->msgIds of message created/modified
|
* @param msgIds map of grpid->msgIds of message created/modified
|
||||||
|
@ -164,7 +165,7 @@ public:
|
||||||
bool acknowledgeTokenMsg(const uint32_t& token, RsGxsGrpMsgIdPair& msgId);
|
bool acknowledgeTokenMsg(const uint32_t& token, RsGxsGrpMsgIdPair& msgId);
|
||||||
|
|
||||||
/*!
|
/*!
|
||||||
* This allows the client service to acknowledge that their grps has
|
* This allows the client service to acknowledge that their grps has \n
|
||||||
* been created/modified and retrieve the create/modified grp ids
|
* been created/modified and retrieve the create/modified grp ids
|
||||||
* @param token the token related to modification/create request
|
* @param token the token related to modification/create request
|
||||||
* @param msgIds vector of ids of groups created/modified
|
* @param msgIds vector of ids of groups created/modified
|
||||||
|
@ -177,19 +178,21 @@ protected:
|
||||||
/** Modifications **/
|
/** Modifications **/
|
||||||
|
|
||||||
/*!
|
/*!
|
||||||
* Enables publication of a group item
|
* Enables publication of a group item \n
|
||||||
* If the item exists already this is simply versioned
|
* If the item exists already this is simply versioned \n
|
||||||
* This will induce a related change message
|
* This will induce a related change message \n
|
||||||
* Ownership of item passes to this rsgenexchange
|
* Ownership of item passes to this rsgenexchange \n
|
||||||
* @param token
|
* @param token
|
||||||
* @param grpItem
|
* @param grpItem
|
||||||
*/
|
*/
|
||||||
void publishGroup(uint32_t& token, RsGxsGrpItem* grpItem);
|
void publishGroup(uint32_t& token, RsGxsGrpItem* grpItem);
|
||||||
|
|
||||||
/*!
|
/*!
|
||||||
* Enables publication of a message item
|
* Enables publication of a message item \n
|
||||||
* If the item exists already this is simply versioned
|
* Setting mOrigMsgId meta member to blank \n
|
||||||
* This will induce a related a change message
|
* leads to this msg being an original msg \n
|
||||||
|
* if mOrigMsgId is not blank the msgId then this msg is \n
|
||||||
|
* considered a versioned msg \n
|
||||||
* Ownership of item passes to this rsgenexchange
|
* Ownership of item passes to this rsgenexchange
|
||||||
* @param token
|
* @param token
|
||||||
* @param msgItem
|
* @param msgItem
|
||||||
|
|
|
@ -134,6 +134,7 @@ bool RsGxsDataAccess::requestMsgInfo(uint32_t &token, uint32_t ansType,
|
||||||
}else if(reqType & GXS_REQUEST_TYPE_MSG_IDS)
|
}else if(reqType & GXS_REQUEST_TYPE_MSG_IDS)
|
||||||
{
|
{
|
||||||
MsgIdReq* mir = new MsgIdReq();
|
MsgIdReq* mir = new MsgIdReq();
|
||||||
|
mir->mMsgIds = msgIds;
|
||||||
req = mir;
|
req = mir;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -338,12 +339,12 @@ bool RsGxsDataAccess::getMsgSummary(const uint32_t& token, GxsMsgMetaResult& msg
|
||||||
return false;
|
return false;
|
||||||
}else if(req->status == GXS_REQUEST_STATUS_COMPLETE){
|
}else if(req->status == GXS_REQUEST_STATUS_COMPLETE){
|
||||||
|
|
||||||
MsgMetaReq* mmreq = dynamic_cast<MsgMetaReq*>(req);
|
MsgMetaReq* mmreq = dynamic_cast<MsgMetaReq*>(req);
|
||||||
|
|
||||||
if(mmreq)
|
if(mmreq)
|
||||||
{
|
{
|
||||||
msgInfo = mmreq->mMsgMetaData;
|
msgInfo = mmreq->mMsgMetaData;
|
||||||
locked_updateRequestStatus(token, GXS_REQUEST_STATUS_DONE);
|
locked_updateRequestStatus(token, GXS_REQUEST_STATUS_DONE);
|
||||||
|
|
||||||
}else{
|
}else{
|
||||||
std::cerr << "RsGxsDataAccess::getMsgSummary() Req found, failed caste" << std::endl;
|
std::cerr << "RsGxsDataAccess::getMsgSummary() Req found, failed caste" << std::endl;
|
||||||
|
@ -370,13 +371,19 @@ bool RsGxsDataAccess::getMsgList(const uint32_t& token, GxsMsgIdResult& msgIds)
|
||||||
}else if(req->status == GXS_REQUEST_STATUS_COMPLETE){
|
}else if(req->status == GXS_REQUEST_STATUS_COMPLETE){
|
||||||
|
|
||||||
MsgIdReq* mireq = dynamic_cast<MsgIdReq*>(req);
|
MsgIdReq* mireq = dynamic_cast<MsgIdReq*>(req);
|
||||||
|
MsgRelatedInfoReq* mrireq = dynamic_cast<MsgRelatedInfoReq*>(req);
|
||||||
|
|
||||||
if(mireq)
|
if(mireq)
|
||||||
{
|
{
|
||||||
msgIds = mireq->mMsgIdResult;
|
msgIds = mireq->mMsgIdResult;
|
||||||
locked_updateRequestStatus(token, GXS_REQUEST_STATUS_DONE);
|
locked_updateRequestStatus(token, GXS_REQUEST_STATUS_DONE);
|
||||||
|
}
|
||||||
}else{
|
else if(mrireq)
|
||||||
|
{
|
||||||
|
msgIds = mrireq->mMsgIdResult;
|
||||||
|
locked_updateRequestStatus(token, GXS_REQUEST_STATUS_DONE);
|
||||||
|
}
|
||||||
|
else{
|
||||||
std::cerr << "RsGxsDataAccess::getMsgList() Req found, failed caste" << std::endl;
|
std::cerr << "RsGxsDataAccess::getMsgList() Req found, failed caste" << std::endl;
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
@ -388,37 +395,6 @@ bool RsGxsDataAccess::getMsgList(const uint32_t& token, GxsMsgIdResult& msgIds)
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool RsGxsDataAccess::getMsgRelatedInfo(const uint32_t &token, GxsMsgIdResult &msgIds)
|
|
||||||
{
|
|
||||||
RsStackMutex stack(mDataMutex);
|
|
||||||
|
|
||||||
GxsRequest* req = locked_retrieveRequest(token);
|
|
||||||
|
|
||||||
if(req == NULL){
|
|
||||||
|
|
||||||
std::cerr << "RsGxsDataAccess::getMsgRelatedInfo() Unable to retrieve group data" << std::endl;
|
|
||||||
return false;
|
|
||||||
}else if(req->status == GXS_REQUEST_STATUS_COMPLETE){
|
|
||||||
|
|
||||||
MsgRelatedInfoReq* mrireq = dynamic_cast<MsgRelatedInfoReq*>(req);
|
|
||||||
|
|
||||||
if(mrireq)
|
|
||||||
{
|
|
||||||
msgIds = mrireq->mMsgIdResult;
|
|
||||||
locked_updateRequestStatus(token, GXS_REQUEST_STATUS_DONE);
|
|
||||||
|
|
||||||
}else{
|
|
||||||
std::cerr << "RsGxsDataAccess::::getMsgRelatedInfo() Req found, failed caste" << std::endl;
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}else{
|
|
||||||
std::cerr << "RsGxsDataAccess::::getMsgRelatedInfo() Req not ready" << std::endl;
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
bool RsGxsDataAccess::getGroupList(const uint32_t& token, std::list<RsGxsGroupId>& groupIds)
|
bool RsGxsDataAccess::getGroupList(const uint32_t& token, std::list<RsGxsGroupId>& groupIds)
|
||||||
{
|
{
|
||||||
RsStackMutex stack(mDataMutex);
|
RsStackMutex stack(mDataMutex);
|
||||||
|
@ -656,10 +632,8 @@ bool RsGxsDataAccess::getMsgRelatedInfo(MsgRelatedInfoReq* req)
|
||||||
|
|
||||||
const RsTokReqOptionsV2& opts = req->Options;
|
const RsTokReqOptionsV2& opts = req->Options;
|
||||||
|
|
||||||
{
|
mDataStore->retrieveGxsMsgMetaData(req->mMsgIds, result);
|
||||||
RsStackMutex stack(mDataMutex);
|
|
||||||
mDataStore->retrieveGxsMsgMetaData(req->mMsgIds, result);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/* CASEs this handles.
|
/* CASEs this handles.
|
||||||
|
@ -764,7 +738,6 @@ bool RsGxsDataAccess::getMsgRelatedInfo(MsgRelatedInfoReq* req)
|
||||||
for(oit = origMsgTs.begin(); oit != origMsgTs.end(); oit++)
|
for(oit = origMsgTs.begin(); oit != origMsgTs.end(); oit++)
|
||||||
{
|
{
|
||||||
req->mMsgIdResult[grpId].push_back(oit->second.first);
|
req->mMsgIdResult[grpId].push_back(oit->second.first);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -802,6 +775,7 @@ bool RsGxsDataAccess::getMsgRelatedInfo(MsgRelatedInfoReq* req)
|
||||||
if (add)
|
if (add)
|
||||||
{
|
{
|
||||||
req->mMsgIdResult[grpId].push_back(msgMeta->mMsgId);
|
req->mMsgIdResult[grpId].push_back(msgMeta->mMsgId);
|
||||||
|
metaFilter[grpId].insert(std::make_pair(msgMeta->mMsgId, msgMeta));
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -810,8 +784,10 @@ bool RsGxsDataAccess::getMsgRelatedInfo(MsgRelatedInfoReq* req)
|
||||||
|
|
||||||
filterMsgList(req->mMsgIdResult, opts, metaFilter);
|
filterMsgList(req->mMsgIdResult, opts, metaFilter);
|
||||||
|
|
||||||
// delete the data
|
metaFilter.clear();
|
||||||
cleanseMetaFilter(metaFilter);
|
|
||||||
|
// delete meta data
|
||||||
|
cleanseMsgMetaMap(result);
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
@ -821,10 +797,9 @@ bool RsGxsDataAccess::getMsgList(MsgIdReq* req)
|
||||||
|
|
||||||
GxsMsgMetaResult result;
|
GxsMsgMetaResult result;
|
||||||
|
|
||||||
{
|
|
||||||
RsStackMutex stack(mDataMutex);
|
mDataStore->retrieveGxsMsgMetaData(req->mMsgIds, result);
|
||||||
mDataStore->retrieveGxsMsgMetaData(req->mMsgIds, result);
|
|
||||||
}
|
|
||||||
|
|
||||||
GxsMsgMetaResult::iterator mit = result.begin(), mit_end = result.end();
|
GxsMsgMetaResult::iterator mit = result.begin(), mit_end = result.end();
|
||||||
|
|
||||||
|
@ -845,24 +820,22 @@ bool RsGxsDataAccess::getMsgList(MsgIdReq* req)
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
void RsGxsDataAccess::cleanseMetaFilter(MsgMetaFilter& filter)
|
void RsGxsDataAccess::cleanseMsgMetaMap(GxsMsgMetaResult& result)
|
||||||
{
|
{
|
||||||
MsgMetaFilter::iterator mit = filter.begin();
|
GxsMsgMetaResult::iterator mit = result.begin();
|
||||||
|
|
||||||
for(; mit !=filter.end(); mit++)
|
for(; mit !=result.end(); mit++)
|
||||||
{
|
{
|
||||||
std::map<RsGxsMessageId, RsGxsMsgMetaData*>& metaM =
|
|
||||||
mit->second;
|
|
||||||
std::map<RsGxsMessageId, RsGxsMsgMetaData*>::iterator mit2
|
|
||||||
= metaM.begin();
|
|
||||||
|
|
||||||
for(; mit2 != metaM.end(); mit2++)
|
std::vector<RsGxsMsgMetaData*>& msgMetaV = mit->second;
|
||||||
|
std::vector<RsGxsMsgMetaData*>::iterator vit = msgMetaV.begin();
|
||||||
|
for(; vit != msgMetaV.end(); vit++)
|
||||||
{
|
{
|
||||||
delete mit2->second;
|
delete *vit;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
filter.clear();
|
result.clear();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -153,14 +153,6 @@ public:
|
||||||
*/
|
*/
|
||||||
bool getMsgData(const uint32_t &token, NxsMsgDataResult& msgData);
|
bool getMsgData(const uint32_t &token, NxsMsgDataResult& msgData);
|
||||||
|
|
||||||
/*!
|
|
||||||
*
|
|
||||||
* @param token request token to be redeemed
|
|
||||||
* @param msgIds
|
|
||||||
* @return false if data cannot be found for token
|
|
||||||
*/
|
|
||||||
bool getMsgRelatedInfo(const uint32_t &token, GxsMsgIdResult &msgIds);
|
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
|
||||||
/** helper functions to implement token service **/
|
/** helper functions to implement token service **/
|
||||||
|
@ -231,7 +223,7 @@ private:
|
||||||
* Convenience function to delete the ids
|
* Convenience function to delete the ids
|
||||||
* @param filter the meta filter to clean
|
* @param filter the meta filter to clean
|
||||||
*/
|
*/
|
||||||
void cleanseMetaFilter(MsgMetaFilter& filter);
|
void cleanseMsgMetaMap(GxsMsgMetaResult& result);
|
||||||
|
|
||||||
public:
|
public:
|
||||||
|
|
||||||
|
|
|
@ -124,6 +124,7 @@ public:
|
||||||
: RsItem(RS_PKT_VERSION_SERVICE, service, subtype) { return; }
|
: RsItem(RS_PKT_VERSION_SERVICE, service, subtype) { return; }
|
||||||
virtual ~RsGxsGrpItem(){}
|
virtual ~RsGxsGrpItem(){}
|
||||||
|
|
||||||
|
|
||||||
RsGroupMetaData meta;
|
RsGroupMetaData meta;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
@ -30,7 +30,7 @@
|
||||||
|
|
||||||
#include "retrodb.h"
|
#include "retrodb.h"
|
||||||
|
|
||||||
#define RETRODB_DEBUG
|
//#define RETRODB_DEBUG
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -119,7 +119,7 @@ class RsThread
|
||||||
RsThread();
|
RsThread();
|
||||||
virtual ~RsThread() {}
|
virtual ~RsThread() {}
|
||||||
|
|
||||||
virtual void start() { createThread(*this); }
|
virtual void start() { mIsRunning = true; createThread(*this); }
|
||||||
virtual void run() = 0; /* called once the thread is started */
|
virtual void run() = 0; /* called once the thread is started */
|
||||||
virtual void join(); /* waits for the the mTid thread to stop */
|
virtual void join(); /* waits for the the mTid thread to stop */
|
||||||
virtual void stop(); /* calls pthread_exit() */
|
virtual void stop(); /* calls pthread_exit() */
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue