seperated content value from retrodb

added local meta changing fucntionality to gxs 
also added msgrelated info id retrieval to tokeservice


git-svn-id: http://svn.code.sf.net/p/retroshare/code/branches/v0.5-gxs-b1@5452 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
chrisparker126 2012-08-21 21:32:07 +00:00
parent 8a7c011c5d
commit 6dd18eea46
13 changed files with 983 additions and 746 deletions

View file

@ -120,6 +120,14 @@
#define RS_DATA_SERVICE_DEBUG
const std::string RsGeneralDataService::GRP_META_SERV_STRING = KEY_NXS_SERV_STRING;
const std::string RsGeneralDataService::GRP_META_STATUS = KEY_GRP_STATUS;
const std::string RsGeneralDataService::GRP_META_SUBSCRIBE_FLAG = KEY_GRP_SUBCR_FLAG;
const std::string RsGeneralDataService::MSG_META_SERV_STRING = KEY_NXS_SERV_STRING;
const std::string RsGeneralDataService::MSG_META_STATUS = KEY_MSG_STATUS;
RsDataService::RsDataService(const std::string &serviceDir, const std::string &dbName, uint16_t serviceType,
RsGxsSearchModule *mod)
: RsGeneralDataService(), mServiceDir(serviceDir), mDbName(mServiceDir + "/" + dbName), mServType(serviceType){
@ -910,12 +918,16 @@ int RsDataService::removeGroups(const std::vector<std::string> &grpIds)
int RsDataService::updateGroupMetaData(GrpLocMetaData &meta)
{
return 0;
RsGxsGroupId& grpId = meta.grpId;
return mDb->sqlUpdate(GRP_TABLE_NAME, KEY_GRP_ID+ "='" + grpId + "'", meta.val) ? 1 : 0;
}
int RsDataService::updateMessageMetaData(MsgLocMetaData &metaData)
{
return 0;
RsGxsGroupId& grpId = metaData.msgId.first;
RsGxsMessageId& msgId = metaData.msgId.second;
return mDb->sqlUpdate(MSG_TABLE_NAME, KEY_GRP_ID+ "='" + grpId
+ "' AND " + KEY_MSG_ID + "='" + msgId + "'", metaData.val) ? 1 : 0;
}

View file

@ -55,6 +55,8 @@ public:
class MsgLocMetaData {
public:
MsgLocMetaData(const MsgLocMetaData& meta){ msgId = meta.msgId; val = meta.val;}
MsgLocMetaData() {}
RsGxsGrpMsgIdPair msgId;
ContentValue val;
};
@ -66,6 +68,8 @@ public:
class GrpLocMetaData {
public:
GrpLocMetaData(const GrpLocMetaData& meta){ grpId = meta.grpId; val = meta.val;}
GrpLocMetaData(){}
RsGxsGroupId grpId;
ContentValue val;
@ -98,6 +102,16 @@ typedef std::map<RsGxsGroupId, std::vector<RsNxsMsg*> > GxsMsgResult; // <grpId,
class RsGeneralDataService
{
public:
static const std::string MSG_META_SERV_STRING;
static const std::string MSG_META_STATUS;
static const std::string GRP_META_SUBSCRIBE_FLAG;
static const std::string GRP_META_STATUS;
static const std::string GRP_META_SERV_STRING;
public:
RsGeneralDataService(){}

View file

@ -31,6 +31,7 @@
#include "rsgenexchange.h"
#include "gxssecurity.h"
#include "util/contentvalue.h"
RsGenExchange::RsGenExchange(RsGeneralDataService *gds,
RsNetworkExchangeService *ns, RsSerialType *serviceSerialiser, uint16_t servType)
@ -63,26 +64,33 @@ void RsGenExchange::tick()
publishMsgs();
processGrpMetaChanges();
processMsgMetaChanges();
notifyChanges(mNotifications);
mNotifications.clear();
}
bool RsGenExchange::acknowledgeTokenMsg(const uint32_t& token,
std::pair<RsGxsGroupId, RsGxsMessageId>& msgId)
RsGxsGrpMsgIdPair& msgId)
{
RsStackMutex stack(mGenMtx);
std::map<uint32_t, std::pair<RsGxsGroupId, RsGxsMessageId> >::iterator mit =
mMsgPublished.find(token);
std::map<uint32_t, RsGxsGrpMsgIdPair >::iterator mit =
mMsgNotify.find(token);
if(mit == mMsgPublished.end())
return false;
if(mit == mMsgNotify.end())
{
return false;
}
msgId = mit->second;
// no dump token as client has ackowledged its completion
mDataAccess->disposeOfPublicToken(token);
msgId = mit->second;
// no dump token as client has ackowledged its completion
mDataAccess->disposeOfPublicToken(token);
return true;
}
@ -95,9 +103,9 @@ bool RsGenExchange::acknowledgeTokenGrp(const uint32_t& token,
RsStackMutex stack(mGenMtx);
std::map<uint32_t, RsGxsGroupId >::iterator mit =
mGrpPublished.find(token);
mGrpNotify.find(token);
if(mit == mGrpPublished.end())
if(mit == mGrpNotify.end())
return false;
grpId = mit->second;
@ -395,28 +403,131 @@ void RsGenExchange::notifyNewMessages(std::vector<RsNxsMsg *>& messages)
}
bool RsGenExchange::publishGroup(uint32_t& token, RsGxsGrpItem *grpItem)
void RsGenExchange::publishGroup(uint32_t& token, RsGxsGrpItem *grpItem)
{
RsStackMutex stack(mGenMtx);
token = mDataAccess->generatePublicToken();
mGrpsToPublish.insert(std::make_pair(token, grpItem));
return true;
}
bool RsGenExchange::publishMsg(uint32_t& token, RsGxsMsgItem *msgItem)
void RsGenExchange::publishMsg(uint32_t& token, RsGxsMsgItem *msgItem)
{
RsStackMutex stack(mGenMtx);
token = mDataAccess->generatePublicToken();
mMsgsToPublish.insert(std::make_pair(token, msgItem));
}
void RsGenExchange::setGroupSubscribeFlag(uint32_t& token, const RsGxsGroupId& grpId, const uint32_t& flag)
{
RsStackMutex stack(mGenMtx);
token = mDataAccess->generatePublicToken();
GrpLocMetaData g;
g.grpId = grpId;
g.val.put(RsGeneralDataService::GRP_META_SUBSCRIBE_FLAG, (int32_t)flag);
mGrpLocMetaMap.insert(std::make_pair(token, g));
}
void RsGenExchange::setGroupStatusFlag(uint32_t& token, const RsGxsGroupId& grpId, const uint32_t& status)
{
RsStackMutex stack(mGenMtx);
token = mDataAccess->generatePublicToken();
GrpLocMetaData g;
g.grpId = grpId;
g.val.put(RsGeneralDataService::GRP_META_STATUS, (int32_t)status);
mGrpLocMetaMap.insert(std::make_pair(token, g));
}
void RsGenExchange::setGroupServiceString(uint32_t& token, const RsGxsGroupId& grpId, const std::string& servString)
{
RsStackMutex stack(mGenMtx);
token = mDataAccess->generatePublicToken();
GrpLocMetaData g;
g.grpId = grpId;
g.val.put(RsGeneralDataService::GRP_META_SERV_STRING, servString);
mGrpLocMetaMap.insert(std::make_pair(token, g));
}
void RsGenExchange::setMsgStatusFlag(uint32_t& token, const RsGxsGrpMsgIdPair& msgId, const uint32_t& status)
{
RsStackMutex stack(mGenMtx);
token = mDataAccess->generatePublicToken();
MsgLocMetaData m;
m.val.put(RsGeneralDataService::MSG_META_STATUS, (int32_t)status);
m.msgId = msgId;
mMsgLocMetaMap.insert(std::make_pair(token, m));
}
void RsGenExchange::setMsgServiceString(uint32_t& token, const RsGxsGrpMsgIdPair& msgId, const std::string& servString )
{
RsStackMutex stack(mGenMtx);
token = mDataAccess->generatePublicToken();
MsgLocMetaData m;
m.val.put(RsGeneralDataService::MSG_META_SERV_STRING, servString);
m.msgId = msgId;
mMsgLocMetaMap.insert(std::make_pair(token, m));
}
void RsGenExchange::processMsgMetaChanges()
{
RsStackMutex stack(mGenMtx);
RsStackMutex stack(mGenMtx);
token = mDataAccess->generatePublicToken();
mMsgsToPublish.insert(std::make_pair(token, msgItem));
std::map<uint32_t, MsgLocMetaData>::iterator mit = mMsgLocMetaMap.begin(),
mit_end = mMsgLocMetaMap.end();
return true;
for(; mit != mit_end; mit++)
{
MsgLocMetaData& m = mit->second;
bool ok = mDataStore->updateMessageMetaData(m) == 1;
uint32_t token = mit->first;
if(ok)
{
mDataAccess->updatePublicRequestStatus(token, RsTokenServiceV2::GXS_REQUEST_STATUS_COMPLETE);
}else
{
mDataAccess->updatePublicRequestStatus(token, RsTokenServiceV2::GXS_REQUEST_STATUS_FAILED);
}
mMsgNotify.insert(std::make_pair(token, m.msgId));
}
mMsgLocMetaMap.clear();
}
void RsGenExchange::processGrpMetaChanges()
{
RsStackMutex stack(mGenMtx);
std::map<uint32_t, GrpLocMetaData>::iterator mit = mGrpLocMetaMap.begin(),
mit_end = mGrpLocMetaMap.end();
for(; mit != mit_end; mit++)
{
GrpLocMetaData& g = mit->second;
uint32_t token = mit->first;
bool ok = mDataStore->updateGroupMetaData(g) == 1;
if(ok)
{
mDataAccess->updatePublicRequestStatus(token, RsTokenServiceV2::GXS_REQUEST_STATUS_COMPLETE);
}else
{
mDataAccess->updatePublicRequestStatus(token, RsTokenServiceV2::GXS_REQUEST_STATUS_FAILED);
}
mGrpNotify.insert(std::make_pair(token, g.grpId));
}
mGrpLocMetaMap.clear();
}
void RsGenExchange::publishMsgs()
{
@ -453,7 +564,7 @@ void RsGenExchange::publishMsgs()
ok = mDataAccess->addMsgData(msg);
// add to published to allow acknowledgement
mMsgPublished.insert(std::make_pair(mit->first, std::make_pair(msg->grpId, msg->msgId)));
mMsgNotify.insert(std::make_pair(mit->first, std::make_pair(msg->grpId, msg->msgId)));
mDataAccess->updatePublicRequestStatus(mit->first, RsTokenServiceV2::GXS_REQUEST_STATUS_COMPLETE);
}
@ -463,7 +574,7 @@ void RsGenExchange::publishMsgs()
#ifdef GEN_EXCH_DEBUG
std::cerr << "RsGenExchange::publishMsgs() failed to publish msg " << std::endl;
#endif
mMsgPublished.insert(std::make_pair(mit->first, std::make_pair(RsGxsGroupId(""), RsGxsMessageId(""))));
mMsgNotify.insert(std::make_pair(mit->first, std::make_pair(RsGxsGroupId(""), RsGxsMessageId(""))));
delete msg;
continue;
@ -509,7 +620,7 @@ void RsGenExchange::publishGrps()
ok = mDataAccess->addGroupData(grp);
// add to published to allow acknowledgement
mGrpPublished.insert(std::make_pair(mit->first, grp->grpId));
mGrpNotify.insert(std::make_pair(mit->first, grp->grpId));
mDataAccess->updatePublicRequestStatus(mit->first, RsTokenServiceV2::GXS_REQUEST_STATUS_COMPLETE);
}
@ -522,7 +633,7 @@ void RsGenExchange::publishGrps()
delete grp;
// add to published to allow acknowledgement, grpid is empty as grp creation failed
mGrpPublished.insert(std::make_pair(mit->first, RsGxsGroupId("")));
mGrpNotify.insert(std::make_pair(mit->first, RsGxsGroupId("")));
mDataAccess->updatePublicRequestStatus(mit->first, RsTokenServiceV2::GXS_REQUEST_STATUS_FAILED);
continue;
}

View file

@ -161,7 +161,7 @@ public:
* @param msgIds map of grpid->msgIds of message created/modified
* @return true if token exists false otherwise
*/
bool acknowledgeTokenMsg(const uint32_t& token, std::pair<RsGxsGroupId, RsGxsMessageId>& msgId);
bool acknowledgeTokenMsg(const uint32_t& token, RsGxsGrpMsgIdPair& msgId);
/*!
* This allows the client service to acknowledge that their grps has
@ -184,7 +184,7 @@ protected:
* @param token
* @param grpItem
*/
bool publishGroup(uint32_t& token, RsGxsGrpItem* grpItem);
void publishGroup(uint32_t& token, RsGxsGrpItem* grpItem);
/*!
* Enables publication of a message item
@ -194,7 +194,23 @@ protected:
* @param token
* @param msgItem
*/
bool publishMsg(uint32_t& token, RsGxsMsgItem* msgItem);
void publishMsg(uint32_t& token, RsGxsMsgItem* msgItem);
/*!
* sets the group subscribe flag
* @param token this is set to token value associated to this request
* @param
*/
void setGroupSubscribeFlag(uint32_t& token, const RsGxsGroupId& grpId, const uint32_t& status);
void setGroupStatusFlag(uint32_t& token, const RsGxsGroupId& grpId, const uint32_t& status);
void setGroupServiceString(uint32_t& token, const RsGxsGroupId& grpId, const std::string& servString);
void setMsgStatusFlag(uint32_t& token, const RsGxsGrpMsgIdPair& msgId, const uint32_t& status);
void setMsgServiceString(uint32_t& token, const RsGxsGrpMsgIdPair& msgId, const std::string& servString );
protected:
@ -227,6 +243,16 @@ private:
void publishMsgs();
/*!
* processes msg local meta changes
*/
void processMsgMetaChanges();
/*!
* Processes group local meta changes
*/
void processGrpMetaChanges();
void createGroup(RsNxsGrp* grp);
bool createMessage(RsNxsMsg* msg);
@ -244,8 +270,12 @@ private:
std::map<uint32_t, RsGxsGrpItem*> mGrpsToPublish;
std::map<uint32_t, RsGxsMsgItem*> mMsgsToPublish;
std::map<uint32_t, std::pair<RsGxsGroupId, RsGxsMessageId> > mMsgPublished;
std::map<uint32_t, RsGxsGroupId> mGrpPublished;
std::map<uint32_t, RsGxsGrpMsgIdPair > mMsgNotify;
std::map<uint32_t, RsGxsGroupId> mGrpNotify;
// for loc meta changes
std::map<uint32_t, GrpLocMetaData > mGrpLocMetaMap;
std::map<uint32_t, MsgLocMetaData> mMsgLocMetaMap;
std::vector<RsGxsNotify*> mNotifications;

View file

@ -153,58 +153,22 @@ bool RsGxsDataAccess::requestMsgInfo(uint32_t &token, uint32_t ansType,
return true;
}
bool RsGxsDataAccess::requestSetGroupSubscribeFlags(uint32_t& token, const RsGxsGroupId &grpId, uint32_t subscribeFlags,
uint32_t subscribeMask)
bool RsGxsDataAccess::requestMsgRelatedInfo(uint32_t &token, uint32_t ansType, const RsTokReqOptionsV2 &opts, const GxsMsgReq& msgIds)
{
generateToken(token);
MsgRelatedInfoReq* req = new MsgRelatedInfoReq();
req->mMsgIds = msgIds;
GroupSetFlagReq* req = new GroupSetFlagReq();
generateToken(token);
req->flag = subscribeFlags;
req->flagMask = subscribeMask;
req->grpId = grpId;
setReq(req, token, ansType, opts);
storeRequest(req);
std::cerr << "RsGxsDataAccess::requestSetGroupSubscribeFlags() gets Token: " << token << std::endl;
storeRequest(req);
return false;
return true;
}
bool RsGxsDataAccess::requestSetGroupStatus(uint32_t& token, const RsGxsGroupId& grpId, uint32_t status, uint32_t statusMask)
{
generateToken(token);
GroupSetFlagReq* req = new GroupSetFlagReq();
req->flag = status;
req->flagMask = statusMask;
req->grpId = grpId;
std::cerr << "RsGxsDataAccess::requestSetGroupStatus() gets Token: " << token << std::endl;
storeRequest(req);
return true;
}
bool RsGxsDataAccess::requestSetMessageStatus(uint32_t& token, const RsGxsGrpMsgIdPair &msgId, uint32_t status,
uint32_t statusMask)
{
generateToken(token);
MessageSetFlagReq* req = new MessageSetFlagReq();
req->flag = status;
req->flagMask = statusMask;
req->msgId = msgId;
std::cerr << "RsGxsDataAccess::requestSetGroupStatus() gets Token: " << token << std::endl;
storeRequest(req);
return true;
}
void RsGxsDataAccess::setReq(GxsRequest* req, const uint32_t& token, const uint32_t& ansType, const RsTokReqOptionsV2& opts) const
{
@ -424,6 +388,37 @@ bool RsGxsDataAccess::getMsgList(const uint32_t& token, GxsMsgIdResult& msgIds)
return true;
}
bool RsGxsDataAccess::getMsgRelatedInfo(const uint32_t &token, GxsMsgIdResult &msgIds)
{
RsStackMutex stack(mDataMutex);
GxsRequest* req = locked_retrieveRequest(token);
if(req == NULL){
std::cerr << "RsGxsDataAccess::getMsgRelatedInfo() Unable to retrieve group data" << std::endl;
return false;
}else if(req->status == GXS_REQUEST_STATUS_COMPLETE){
MsgRelatedInfoReq* mrireq = dynamic_cast<MsgRelatedInfoReq*>(req);
if(mrireq)
{
msgIds = mrireq->mMsgIdResult;
locked_updateRequestStatus(token, GXS_REQUEST_STATUS_DONE);
}else{
std::cerr << "RsGxsDataAccess::::getMsgRelatedInfo() Req found, failed caste" << std::endl;
return false;
}
}else{
std::cerr << "RsGxsDataAccess::::getMsgRelatedInfo() Req not ready" << std::endl;
return false;
}
return true;
}
bool RsGxsDataAccess::getGroupList(const uint32_t& token, std::list<RsGxsGroupId>& groupIds)
{
RsStackMutex stack(mDataMutex);
@ -488,6 +483,7 @@ void RsGxsDataAccess::processRequests()
MsgMetaReq* mmr;
MsgDataReq* mdr;
MsgIdReq* mir;
MsgRelatedInfoReq* mri;
for(it = mRequests.begin(); it != mRequests.end(); it++)
{
@ -527,6 +523,10 @@ void RsGxsDataAccess::processRequests()
{
getMsgList(mir);
}
else if((mri = dynamic_cast<MsgRelatedInfoReq*>(req)) != NULL)
{
getMsgRelatedInfo(mri);
}
else
{
#ifdef GXSDATA_SERVE_DEBUG
@ -569,7 +569,6 @@ void RsGxsDataAccess::processRequests()
bool RsGxsDataAccess::getGroupData(GroupDataReq* req)
{
std::map<RsGxsGroupId, RsNxsGrp*> grpData;
std::list<RsGxsGroupId>::iterator lit = req->mGroupIds.begin(),
@ -650,170 +649,200 @@ bool RsGxsDataAccess::getMsgSummary(MsgMetaReq* req)
return true;
}
bool RsGxsDataAccess::getMsgRelatedInfo(MsgRelatedInfoReq* req)
{
GxsMsgMetaResult result;
const RsTokReqOptionsV2& opts = req->Options;
{
RsStackMutex stack(mDataMutex);
mDataStore->retrieveGxsMsgMetaData(req->mMsgIds, result);
}
/* CASEs this handles.
* Input is groupList + Flags.
* 1) No Flags => All Messages in those Groups.
*
*/
std::cerr << "RsGxsDataAccess::getMsgList()";
std::cerr << std::endl;
bool onlyOrigMsgs = false;
bool onlyLatestMsgs = false;
bool onlyThreadHeadMsgs = false;
// Can only choose one of these two.
if (opts.mOptions & RS_TOKREQOPT_MSG_ORIGMSG)
{
std::cerr << "RsGxsDataAccess::getMsgList() MSG_ORIGMSG";
std::cerr << std::endl;
onlyOrigMsgs = true;
}
else if (opts.mOptions & RS_TOKREQOPT_MSG_LATEST)
{
std::cerr << "RsGxsDataAccess::getMsgList() MSG_LATEST";
std::cerr << std::endl;
onlyLatestMsgs = true;
}
if (opts.mOptions & RS_TOKREQOPT_MSG_THREAD)
{
std::cerr << "RsGxsDataAccess::getMsgList() MSG_THREAD";
std::cerr << std::endl;
onlyThreadHeadMsgs = true;
}
GxsMsgMetaResult::iterator meta_it;
MsgMetaFilter metaFilter;
for(meta_it = result.begin(); meta_it != result.end(); meta_it++)
{
const RsGxsGroupId& grpId = meta_it->first;
metaFilter[grpId] = std::map<RsGxsMessageId, RsGxsMsgMetaData*>();
const std::vector<RsGxsMsgMetaData*>& metaV = meta_it->second;
if (onlyLatestMsgs) // THIS ONE IS HARD -> LOTS OF COMP.
{
std::vector<RsGxsMsgMetaData*>::const_iterator vit = metaV.begin();
// RUN THROUGH ALL MSGS... in map origId -> TS.
std::map<RsGxsGroupId, std::pair<RsGxsMessageId, time_t> > origMsgTs;
std::map<RsGxsGroupId, std::pair<RsGxsMessageId, time_t> >::iterator oit;
for(; vit != metaV.end(); vit++)
{
RsGxsMsgMetaData* msgMeta = *vit;
/* if we are grabbing thread Head... then parentId == empty. */
if (onlyThreadHeadMsgs)
{
if (!(msgMeta->mParentId.empty()))
{
continue;
}
}
oit = origMsgTs.find(msgMeta->mOrigMsgId);
bool addMsg = false;
if (oit == origMsgTs.end())
{
std::cerr << "RsGxsDataAccess::getMsgList() Found New OrigMsgId: ";
std::cerr << msgMeta->mOrigMsgId;
std::cerr << " MsgId: " << msgMeta->mMsgId;
std::cerr << " TS: " << msgMeta->mPublishTs;
std::cerr << std::endl;
addMsg = true;
}
// check timestamps.
else if (oit->second.second < msgMeta->mPublishTs)
{
std::cerr << "RsGxsDataAccess::getMsgList() Found Later Msg. OrigMsgId: ";
std::cerr << msgMeta->mOrigMsgId;
std::cerr << " MsgId: " << msgMeta->mMsgId;
std::cerr << " TS: " << msgMeta->mPublishTs;
addMsg = true;
}
if (addMsg)
{
// add as latest. (overwriting if necessary)
origMsgTs[msgMeta->mOrigMsgId] = std::make_pair(msgMeta->mMsgId, msgMeta->mPublishTs);
metaFilter[grpId].insert(std::make_pair(msgMeta->mMsgId, msgMeta));
}
}
// Add the discovered Latest Msgs.
for(oit = origMsgTs.begin(); oit != origMsgTs.end(); oit++)
{
req->mMsgIdResult[grpId].push_back(oit->second.first);
}
}
else // ALL OTHER CASES.
{
std::vector<RsGxsMsgMetaData*>::const_iterator vit = metaV.begin();
for(; vit != metaV.end(); vit++)
{
RsGxsMsgMetaData* msgMeta = *vit;
bool add = false;
/* if we are grabbing thread Head... then parentId == empty. */
if (onlyThreadHeadMsgs)
{
if (!(msgMeta->mParentId.empty()))
{
continue;
}
}
if (onlyOrigMsgs)
{
if (msgMeta->mMsgId == msgMeta->mOrigMsgId)
{
add = true;
}
}
else
{
add = true;
}
if (add)
{
req->mMsgIdResult[grpId].push_back(msgMeta->mMsgId);
}
}
}
}
filterMsgList(req->mMsgIdResult, opts, metaFilter);
// delete the data
cleanseMetaFilter(metaFilter);
return true;
}
bool RsGxsDataAccess::getMsgList(MsgIdReq* req)
{
GxsMsgMetaResult result;
const RsTokReqOptionsV2& opts = req->Options;
GxsMsgMetaResult result;
{
RsStackMutex stack(mDataMutex);
mDataStore->retrieveGxsMsgMetaData(req->mMsgIds, result);
}
{
RsStackMutex stack(mDataMutex);
mDataStore->retrieveGxsMsgMetaData(req->mMsgIds, result);
}
GxsMsgMetaResult::iterator mit = result.begin(), mit_end = result.end();
/* CASEs this handles.
* Input is groupList + Flags.
* 1) No Flags => All Messages in those Groups.
*
*/
std::cerr << "RsGxsDataAccess::getMsgList()";
std::cerr << std::endl;
for(; mit != mit_end; mit++)
{
const RsGxsGroupId grpId = mit->first;
std::vector<RsGxsMsgMetaData*>& metaV = mit->second;
std::vector<RsGxsMsgMetaData*>::iterator vit = metaV.begin(),
vit_end = metaV.end();
bool onlyOrigMsgs = false;
bool onlyLatestMsgs = false;
bool onlyThreadHeadMsgs = false;
// Can only choose one of these two.
if (opts.mOptions & RS_TOKREQOPT_MSG_ORIGMSG)
{
std::cerr << "RsGxsDataAccess::getMsgList() MSG_ORIGMSG";
std::cerr << std::endl;
onlyOrigMsgs = true;
}
else if (opts.mOptions & RS_TOKREQOPT_MSG_LATEST)
{
std::cerr << "RsGxsDataAccess::getMsgList() MSG_LATEST";
std::cerr << std::endl;
onlyLatestMsgs = true;
}
if (opts.mOptions & RS_TOKREQOPT_MSG_THREAD)
{
std::cerr << "RsGxsDataAccess::getMsgList() MSG_THREAD";
std::cerr << std::endl;
onlyThreadHeadMsgs = true;
}
GxsMsgMetaResult::iterator meta_it;
MsgMetaFilter metaFilter;
for(meta_it = result.begin(); meta_it != result.end(); meta_it++)
{
const RsGxsGroupId& grpId = meta_it->first;
metaFilter[grpId] = std::map<RsGxsMessageId, RsGxsMsgMetaData*>();
const std::vector<RsGxsMsgMetaData*>& metaV = meta_it->second;
if (onlyLatestMsgs) // THIS ONE IS HARD -> LOTS OF COMP.
{
std::vector<RsGxsMsgMetaData*>::const_iterator vit = metaV.begin();
// RUN THROUGH ALL MSGS... in map origId -> TS.
std::map<RsGxsGroupId, std::pair<RsGxsMessageId, time_t> > origMsgTs;
std::map<RsGxsGroupId, std::pair<RsGxsMessageId, time_t> >::iterator oit;
for(; vit != metaV.end(); vit++)
{
RsGxsMsgMetaData* msgMeta = *vit;
/* if we are grabbing thread Head... then parentId == empty. */
if (onlyThreadHeadMsgs)
{
if (!(msgMeta->mParentId.empty()))
{
continue;
}
}
oit = origMsgTs.find(msgMeta->mOrigMsgId);
bool addMsg = false;
if (oit == origMsgTs.end())
{
std::cerr << "RsGxsDataAccess::getMsgList() Found New OrigMsgId: ";
std::cerr << msgMeta->mOrigMsgId;
std::cerr << " MsgId: " << msgMeta->mMsgId;
std::cerr << " TS: " << msgMeta->mPublishTs;
std::cerr << std::endl;
addMsg = true;
}
// check timestamps.
else if (oit->second.second < msgMeta->mPublishTs)
{
std::cerr << "RsGxsDataAccess::getMsgList() Found Later Msg. OrigMsgId: ";
std::cerr << msgMeta->mOrigMsgId;
std::cerr << " MsgId: " << msgMeta->mMsgId;
std::cerr << " TS: " << msgMeta->mPublishTs;
addMsg = true;
}
if (addMsg)
{
// add as latest. (overwriting if necessary)
origMsgTs[msgMeta->mOrigMsgId] = std::make_pair(msgMeta->mMsgId, msgMeta->mPublishTs);
metaFilter[grpId].insert(std::make_pair(msgMeta->mMsgId, msgMeta));
}
}
// Add the discovered Latest Msgs.
for(oit = origMsgTs.begin(); oit != origMsgTs.end(); oit++)
{
req->mMsgIds[grpId].push_back(oit->second.first);
}
}
else // ALL OTHER CASES.
{
std::vector<RsGxsMsgMetaData*>::const_iterator vit = metaV.begin();
for(; vit != metaV.end(); vit++)
{
RsGxsMsgMetaData* msgMeta = *vit;
bool add = false;
/* if we are grabbing thread Head... then parentId == empty. */
if (onlyThreadHeadMsgs)
{
if (!(msgMeta->mParentId.empty()))
{
continue;
}
}
if (onlyOrigMsgs)
{
if (msgMeta->mMsgId == msgMeta->mOrigMsgId)
{
add = true;
}
}
else
{
add = true;
}
if (add)
{
req->mMsgIdResult[grpId].push_back(msgMeta->mMsgId);
}
}
}
}
filterMsgList(req->mMsgIdResult, opts, metaFilter);
// delete the data
cleanseMetaFilter(metaFilter);
return true;
for(; vit != vit_end; vit++)
{
RsGxsMsgMetaData* meta = *vit;
req->mMsgIdResult[grpId].push_back(meta->mMsgId);
delete meta; // discard meta data mem
}
}
return true;
}
void RsGxsDataAccess::cleanseMetaFilter(MsgMetaFilter& filter)

View file

@ -64,36 +64,14 @@ public:
bool requestMsgInfo(uint32_t &token, uint32_t ansType, const RsTokReqOptionsV2 &opts, const GxsMsgReq&);
/*!
* This sets the status of the message
* @param msgId the message id to set status for
* @param status status
* @param statusMask the mask for the settings targetted
* @return true if request made successfully, false otherwise
* For requesting msgs related to a given msg id within a group
* @param token The token returned for the request
* @param ansType The type of result wanted
* @param opts Additional option that affect outcome of request. Please see specific services, for valid values
* @param groupIds The ids of the groups to get, second entry of map empty to query for all msgs
* @return true if request successful false otherwise
*/
bool requestSetMessageStatus(uint32_t &token, const RsGxsGrpMsgIdPair &msgId,
const uint32_t status, const uint32_t statusMask);
/*!
*
* @param token
* @param grpId
* @param status
* @param statusMask
* @return true if request made successfully, false otherwise
*/
bool requestSetGroupStatus(uint32_t &token, const RsGxsGroupId &grpId, const uint32_t status,
const uint32_t statusMask);
/*!
* Use request status to find out if successfully set
* @param groupId
* @param subscribeFlags
* @param subscribeMask
* @return true if request made successfully, false otherwise
*/
bool requestSetGroupSubscribeFlags(uint32_t& token, const RsGxsGroupId &groupId, uint32_t subscribeFlags,
uint32_t subscribeMask);
bool requestMsgRelatedInfo(uint32_t &token, uint32_t ansType, const RsTokReqOptionsV2 &opts, const GxsMsgReq&);
/* Poll */
uint32_t requestStatus(const uint32_t token);
@ -103,8 +81,6 @@ public:
/** E: RsTokenService **/
public:
/*!
@ -177,6 +153,14 @@ public:
*/
bool getMsgData(const uint32_t &token, NxsMsgDataResult& msgData);
/*!
*
* @param token request token to be redeemed
* @param msgIds
* @return false if data cannot be found for token
*/
bool getMsgRelatedInfo(const uint32_t &token, GxsMsgIdResult &msgIds);
private:
/** helper functions to implement token service **/
@ -321,6 +305,15 @@ private:
*/
bool getMsgData(MsgDataReq* req);
/*!
* Attempts to retrieve messages related to msgIds of associated equest
* @param token request token to be redeemed
* @param msgIds
* @return false if data cannot be found for token
*/
bool getMsgRelatedInfo(MsgRelatedInfoReq* req);
/*!
* This filter msgs based of options supplied (at the moment just status masks)
* @param msgIds The msgsIds to filter

View file

@ -95,6 +95,14 @@ public:
NxsMsgDataResult mMsgData;
};
class MsgRelatedInfoReq : public GxsRequest
{
public:
GxsMsgReq mMsgIds;
GxsMsgIdResult mMsgIdResult;
};
class GroupSetFlagReq : public GxsRequest
{
public:

View file

@ -132,46 +132,20 @@ public:
* @param ansType The type of result wanted
* @param opts Additional option that affect outcome of request. Please see specific services, for valid values
* @param groupIds The ids of the groups to get, second entry of map empty to query for all msgs
* @return
* @return true if request successful false otherwise
*/
virtual bool requestMsgInfo(uint32_t &token, uint32_t ansType, const RsTokReqOptionsV2 &opts, const GxsMsgReq& msgIds) = 0;
/*!
* This sets the status of the message
* @param msgId the message id to set status for
* @param status status
* @param statusMask the mask for the settings targetted
* @return true if request made successfully, false otherwise
* For requesting msgs related to a given msg id within a group
* @param token The token returned for the request
* @param ansType The type of result wanted
* @param opts Additional option that affect outcome of request. Please see specific services, for valid values
* @param groupIds The ids of the groups to get, second entry of map empty to query for all msgs
* @return true if request successful false otherwise
*/
virtual bool requestSetMessageStatus(uint32_t &token, const RsGxsGrpMsgIdPair &msgId,
const uint32_t status, const uint32_t statusMask) = 0;
virtual bool requestMsgRelatedInfo(uint32_t &token, uint32_t ansType, const RsTokReqOptionsV2 &opts, const GxsMsgReq& msgIds) = 0;
/*!
* Set the status of a group given by group Id
* @param token The token returned for this request
* @param grpId The Id of the group to apply status change to
* @param status The status to apply
* @param statusMask The status mask (target particular type of status)
* @return true if request made successfully, false otherwise
*/
virtual bool requestSetGroupStatus(uint32_t &token, const RsGxsGroupId &grpId, const uint32_t status,
const uint32_t statusMask) = 0;
/*!
* Use request status to find out if successfully set
* @param groupId
* @param subscribeFlags
* @param subscribeMask
* @return true if request made successfully, false otherwise
*/
virtual bool requestSetGroupSubscribeFlags(uint32_t& token, const RsGxsGroupId &groupId, uint32_t subscribeFlags, uint32_t subscribeMask) = 0;
// (FUTURE WORK).
//virtual bool groupRestoreKeys(const std::string &groupId) = 0;
//virtual bool groupShareKeys(const std::string &groupId, std::list<std::string>& peers) = 0;
/* Poll */