merging gxs_phase2 branch

git-svn-id: http://svn.code.sf.net/p/retroshare/code/trunk@6401 b45a01b8-16f6-495d-af2f-9b41ad6348cc
chrisparker126 2013-06-04 21:00:43 +00:00
parent 1150366913
commit 325fa4f222
116 changed files with 6050 additions and 3596 deletions

View file

@ -47,6 +47,7 @@
#define KEY_NXS_FLAGS std::string("flags")
#define KEY_NXS_META std::string("meta")
#define KEY_NXS_SERV_STRING std::string("serv_str")
#define KEY_NXS_HASH std::string("hash")
// grp table columns
@ -57,6 +58,7 @@
#define KEY_GRP_CIRCLE_TYPE std::string("circleType")
#define KEY_GRP_INTERNAL_CIRCLE std::string("internalCircle")
#define KEY_GRP_ORIGINATOR std::string("originator")
#define KEY_GRP_AUTHEN_FLAGS std::string("authenFlags")
// grp local
#define KEY_GRP_SUBCR_FLAG std::string("subscribeFlag")
@ -94,31 +96,32 @@
// grp col numbers
#define COL_KEY_SET 5
#define COL_GRP_SUBCR_FLAG 6
#define COL_GRP_POP 7
#define COL_MSG_COUNT 8
#define COL_GRP_STATUS 9
#define COL_GRP_NAME 10
#define COL_GRP_LAST_POST 11
#define COL_ORIG_GRP_ID 12
#define COL_GRP_SERV_STRING 13
#define COL_GRP_SIGN_FLAGS 14
#define COL_GRP_CIRCLE_ID 15
#define COL_GRP_CIRCL_TYPE 16
#define COL_GRP_INTERN_CIRCLE 17
#define COL_GRP_ORIGINATOR 18
#define COL_KEY_SET 6
#define COL_GRP_SUBCR_FLAG 7
#define COL_GRP_POP 8
#define COL_MSG_COUNT 9
#define COL_GRP_STATUS 10
#define COL_GRP_NAME 11
#define COL_GRP_LAST_POST 12
#define COL_ORIG_GRP_ID 13
#define COL_GRP_SERV_STRING 14
#define COL_GRP_SIGN_FLAGS 15
#define COL_GRP_CIRCLE_ID 16
#define COL_GRP_CIRCL_TYPE 17
#define COL_GRP_INTERN_CIRCLE 18
#define COL_GRP_ORIGINATOR 19
#define COL_GRP_AUTHEN_FLAGS 20
// msg col numbers
#define COL_MSG_ID 5
#define COL_ORIG_MSG_ID 6
#define COL_MSG_STATUS 7
#define COL_CHILD_TS 8
#define COL_PARENT_ID 9
#define COL_THREAD_ID 10
#define COL_MSG_NAME 11
#define COL_MSG_SERV_STRING 12
#define COL_MSG_ID 6
#define COL_ORIG_MSG_ID 7
#define COL_MSG_STATUS 8
#define COL_CHILD_TS 9
#define COL_PARENT_ID 10
#define COL_THREAD_ID 11
#define COL_MSG_NAME 12
#define COL_MSG_SERV_STRING 13
// generic meta shared col numbers
#define COL_GRP_ID 0
@ -126,6 +129,7 @@
#define COL_NXS_FLAGS 2
#define COL_SIGN_SET 3
#define COL_IDENTITY 4
#define COL_HASH 5
#define RS_DATA_SERVICE_DEBUG
@ -139,15 +143,15 @@ const std::string RsGeneralDataService::MSG_META_STATUS = KEY_MSG_STATUS;
const uint32_t RsGeneralDataService::GXS_MAX_ITEM_SIZE = 1572864; // 1.5 Mbytes
RsDataService::RsDataService(const std::string &serviceDir, const std::string &dbName, uint16_t serviceType,
RsGxsSearchModule *mod)
RsGxsSearchModule *mod, const std::string& key)
: RsGeneralDataService(), mServiceDir(serviceDir), mDbName(mServiceDir + "/" + dbName), mServType(serviceType),
mDbMutex("RsDataService"){
mDbMutex("RsDataService"), mDb( new RetroDb(mDbName, RetroDb::OPEN_READWRITE_CREATE, key)) {
initialise();
// for retrieving msg meta
msgMetaColumns.push_back(KEY_GRP_ID); msgMetaColumns.push_back(KEY_TIME_STAMP); msgMetaColumns.push_back(KEY_NXS_FLAGS);
msgMetaColumns.push_back(KEY_SIGN_SET); msgMetaColumns.push_back(KEY_NXS_IDENTITY);
msgMetaColumns.push_back(KEY_SIGN_SET); msgMetaColumns.push_back(KEY_NXS_IDENTITY); msgMetaColumns.push_back(KEY_NXS_HASH);
msgMetaColumns.push_back(KEY_MSG_ID); msgMetaColumns.push_back(KEY_ORIG_MSG_ID); msgMetaColumns.push_back(KEY_MSG_STATUS);
msgMetaColumns.push_back(KEY_CHILD_TS); msgMetaColumns.push_back(KEY_MSG_PARENT_ID); msgMetaColumns.push_back(KEY_MSG_THREAD_ID);
msgMetaColumns.push_back(KEY_MSG_NAME); msgMetaColumns.push_back(KEY_NXS_SERV_STRING);
@ -158,12 +162,13 @@ RsDataService::RsDataService(const std::string &serviceDir, const std::string &d
// for retrieving grp meta data
grpMetaColumns.push_back(KEY_GRP_ID); grpMetaColumns.push_back(KEY_TIME_STAMP); grpMetaColumns.push_back(KEY_NXS_FLAGS);
grpMetaColumns.push_back(KEY_SIGN_SET); grpMetaColumns.push_back(KEY_NXS_IDENTITY);
grpMetaColumns.push_back(KEY_SIGN_SET); grpMetaColumns.push_back(KEY_NXS_IDENTITY); grpMetaColumns.push_back(KEY_NXS_HASH);
grpMetaColumns.push_back(KEY_KEY_SET); grpMetaColumns.push_back(KEY_GRP_SUBCR_FLAG); grpMetaColumns.push_back(KEY_GRP_POP);
grpMetaColumns.push_back(KEY_MSG_COUNT); grpMetaColumns.push_back(KEY_GRP_STATUS); grpMetaColumns.push_back(KEY_GRP_NAME);
grpMetaColumns.push_back(KEY_GRP_LAST_POST); grpMetaColumns.push_back(KEY_ORIG_GRP_ID); grpMetaColumns.push_back(KEY_NXS_SERV_STRING);
grpMetaColumns.push_back(KEY_GRP_SIGN_FLAGS); grpMetaColumns.push_back(KEY_GRP_CIRCLE_ID); grpMetaColumns.push_back(KEY_GRP_CIRCLE_TYPE);
grpMetaColumns.push_back(KEY_GRP_INTERNAL_CIRCLE); grpMetaColumns.push_back(KEY_GRP_ORIGINATOR);
grpMetaColumns.push_back(KEY_GRP_AUTHEN_FLAGS);
// for retrieving actual grp data
grpColumns.push_back(KEY_GRP_ID); grpColumns.push_back(KEY_NXS_FILE); grpColumns.push_back(KEY_NXS_FILE_OFFSET);
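Note: the renumbered COL_* defines above are only valid because they mirror the push order of these column vectors; appending KEY_NXS_HASH after KEY_NXS_IDENTITY shifts every later offset by one. A small sketch of that dependency (illustrative only, using the RetroCursor accessors seen later in this file):

// push order of grpMetaColumns        ->  offset used when reading a row
// KEY_GRP_ID ... KEY_NXS_IDENTITY     ->  COL_GRP_ID (0) ... COL_IDENTITY (4)
// KEY_NXS_HASH (newly appended)       ->  COL_HASH (5)
// KEY_KEY_SET, KEY_GRP_SUBCR_FLAG ... ->  COL_KEY_SET (6), COL_GRP_SUBCR_FLAG (7), ...
std::string grpId, hash;
c.getString(COL_GRP_ID, grpId);
c.getString(COL_HASH, hash);                          // the new hash column
int32_t subscrFlags = c.getInt32(COL_GRP_SUBCR_FLAG); // now 7, previously 6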
@ -184,7 +189,7 @@ void RsDataService::initialise(){
RsStackMutex stack(mDbMutex);
// initialise database
mDb = new RetroDb(mDbName, RetroDb::OPEN_READWRITE_CREATE);
// create table for msg data
mDb->execSQL("CREATE TABLE " + MSG_TABLE_NAME + "(" +
@ -204,6 +209,7 @@ void RsDataService::initialise(){
KEY_MSG_PARENT_ID + " TEXT,"+
KEY_MSG_NAME + " TEXT," +
KEY_NXS_SERV_STRING + " TEXT," +
KEY_NXS_HASH + " TEXT," +
KEY_NXS_FILE_LEN + " INT);");
// create table for grp data
@ -225,11 +231,13 @@ void RsDataService::initialise(){
KEY_ORIG_GRP_ID + " TEXT," +
KEY_NXS_SERV_STRING + " TEXT," +
KEY_NXS_FLAGS + " INT," +
KEY_GRP_AUTHEN_FLAGS + " INT," +
KEY_GRP_SIGN_FLAGS + " INT," +
KEY_GRP_CIRCLE_ID + " TEXT," +
KEY_GRP_CIRCLE_TYPE + " INT," +
KEY_GRP_INTERNAL_CIRCLE + " TEXT," +
KEY_GRP_ORIGINATOR + " TEXT," +
KEY_NXS_HASH + " TEXT," +
KEY_SIGN_SET + " BLOB);");
}
@ -256,6 +264,7 @@ RsGxsGrpMetaData* RsDataService::locked_getGrpMeta(RetroCursor &c)
c.getString(COL_GRP_NAME, grpMeta->mGroupName);
c.getString(COL_ORIG_GRP_ID, grpMeta->mOrigGrpId);
c.getString(COL_GRP_SERV_STRING, grpMeta->mServiceString);
c.getString(COL_HASH, grpMeta->mHash);
grpMeta->mSignFlags = c.getInt32(COL_GRP_SIGN_FLAGS);
grpMeta->mPublishTs = c.getInt32(COL_TIME_STAMP);
@ -281,6 +290,7 @@ RsGxsGrpMetaData* RsDataService::locked_getGrpMeta(RetroCursor &c)
grpMeta->mCircleType = c.getInt32(COL_GRP_CIRCL_TYPE);
c.getString(COL_GRP_INTERN_CIRCLE, grpMeta->mInternalCircle);
c.getString(COL_GRP_ORIGINATOR, grpMeta->mOriginator);
grpMeta->mAuthenFlags = c.getInt32(COL_GRP_AUTHEN_FLAGS);
if(ok)
@ -366,6 +376,7 @@ RsGxsMsgMetaData* RsDataService::locked_getMsgMeta(RetroCursor &c)
c.getString(COL_IDENTITY, msgMeta->mAuthorId);
c.getString(COL_MSG_NAME, msgMeta->mMsgName);
c.getString(COL_MSG_SERV_STRING, msgMeta->mServiceString);
c.getString(COL_HASH, msgMeta->mHash);
offset = 0;
data = (char*)c.getData(COL_SIGN_SET, data_len);
@ -477,6 +488,7 @@ int RsDataService::storeMessage(std::map<RsNxsMsg *, RsGxsMsgMetaData *> &msg)
cv.put(KEY_MSG_ID, msgMetaPtr->mMsgId);
cv.put(KEY_GRP_ID, msgMetaPtr->mGroupId);
cv.put(KEY_NXS_SERV_STRING, msgMetaPtr->mServiceString);
cv.put(KEY_NXS_HASH, msgMetaPtr->mHash);
char signSetData[msgMetaPtr->signSet.TlvSize()];
@ -582,6 +594,8 @@ int RsDataService::storeGroup(std::map<RsNxsGrp *, RsGxsGrpMetaData *> &grp)
cv.put(KEY_GRP_CIRCLE_TYPE, (int32_t)grpMetaPtr->mCircleType);
cv.put(KEY_GRP_INTERNAL_CIRCLE, grpMetaPtr->mInternalCircle);
cv.put(KEY_GRP_ORIGINATOR, grpMetaPtr->mOriginator);
cv.put(KEY_GRP_AUTHEN_FLAGS, (int32_t)grpMetaPtr->mAuthenFlags);
cv.put(KEY_NXS_HASH, grpMetaPtr->mHash);
if(! (grpMetaPtr->mAuthorId.empty()) ){
cv.put(KEY_NXS_IDENTITY, grpMetaPtr->mAuthorId);
@ -1018,12 +1032,6 @@ int RsDataService::resetDataStore()
return 1;
}
int RsDataService::removeGroups(const std::vector<std::string> &grpIds)
{
return 0;
}
int RsDataService::updateGroupMetaData(GrpLocMetaData &meta)
{
RsStackMutex stack(mDbMutex);
@ -1055,7 +1063,7 @@ int RsDataService::removeMsgs(const GxsMsgReq& msgIds)
// get the offsets and lengths of all msgs;
// for messages not contained in the msg id vector,
// store their data file segments in a buffer
// then recalculate the retained messages
// then recalculate the retained messages'
// new offsets, update db with new offsets
// replace old msg file with new file
// remove messages that were not retained from
@ -1113,7 +1121,7 @@ int RsDataService::removeMsgs(const GxsMsgReq& msgIds)
up.msgId = m.msgId;
up.cv.put(KEY_NXS_FILE_OFFSET, (int32_t)newOffset);
newBuffer.insert(dataBuff.end(), dataBuff.begin()+m.msgOffset,
newBuffer.insert(newBuffer.end(), dataBuff.begin()+m.msgOffset,
dataBuff.begin()+m.msgOffset+m.msgLen);
newOffset += msgLen;
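The newBuffer.insert() change above fixes the compaction copy destination (segments were being appended relative to dataBuff instead of newBuffer). A minimal sketch of the full loop the preceding comments describe; names such as retainedMsgs are illustrative, the rest comes from the surrounding hunks:

uint32_t newOffset = 0;
std::vector<char> newBuffer;                     // contents of the compacted data file
for (const MsgOffset& m : retainedMsgs)          // messages NOT listed in msgIds
{
    MsgUpdate up;
    up.msgId = m.msgId;
    up.cv.put(KEY_NXS_FILE_OFFSET, (int32_t)newOffset);   // new position after compaction

    // append the retained segment to the new buffer (destination is newBuffer)
    newBuffer.insert(newBuffer.end(),
                     dataBuff.begin() + m.msgOffset,
                     dataBuff.begin() + m.msgOffset + m.msgLen);

    newOffset += m.msgLen;
    msgUpdates[grpId].push_back(up);
}
// then: overwrite the old msg file with newBuffer and apply msgUpdates to the db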
@ -1148,6 +1156,26 @@ int RsDataService::removeMsgs(const GxsMsgReq& msgIds)
return 1;
}
int RsDataService::removeGroups(const std::vector<std::string> &grpIds)
{
RsStackMutex stack(mDbMutex);
// the grp id is the group file name
// first remove the file, then remove the group
// from the db
std::vector<std::string>::const_iterator vit = grpIds.begin();
for(; vit != grpIds.end(); vit++)
{
const std::string grpFileName = mServiceDir + "/" + *vit;
remove(grpFileName.c_str());
}
locked_removeGroupEntries(grpIds);
return 1;
}
bool RsDataService::locked_updateMessageEntries(const MsgUpdates& updates)
@ -1203,6 +1231,24 @@ bool RsDataService::locked_removeMessageEntries(const GxsMsgReq& msgIds)
return ret;
}
bool RsDataService::locked_removeGroupEntries(const std::vector<std::string>& grpIds)
{
// start a transaction
bool ret = mDb->execSQL("BEGIN;");
std::vector<std::string>::const_iterator vit = grpIds.begin();
for(; vit != grpIds.end(); vit++)
{
const RsGxsGroupId& grpId = *vit;
mDb->sqlDelete(GRP_TABLE_NAME, KEY_GRP_ID+ "='" + grpId + "'", "");
}
ret &= mDb->execSQL("COMMIT;");
return ret;
}
void RsDataService::locked_getMessageOffsets(const RsGxsGroupId& grpId, std::vector<MsgOffset>& offsets)
{

View file

@ -52,7 +52,8 @@ class RsDataService : public RsGeneralDataService
{
public:
RsDataService(const std::string& serviceDir, const std::string& dbName, uint16_t serviceType, RsGxsSearchModule* mod = NULL);
RsDataService(const std::string& serviceDir, const std::string& dbName, uint16_t serviceType,
RsGxsSearchModule* mod = NULL, const std::string& key = "");
virtual ~RsDataService();
/*!
@ -211,6 +212,7 @@ private:
* @param msgIds
*/
bool locked_removeMessageEntries(const GxsMsgReq& msgIds);
bool locked_removeGroupEntries(const std::vector<std::string>& grpIds);
typedef std::map<RsGxsGroupId, std::vector<MsgUpdate> > MsgUpdates;
@ -228,9 +230,6 @@ private:
private:
RetroDb* mDb;
RsMutex mDbMutex;
std::list<std::string> msgColumns;
@ -242,6 +241,8 @@ private:
std::string mServiceDir, mDbName;
uint16_t mServType;
RetroDb* mDb;
};
#endif // RSDATASERVICE_H

View file

@ -36,6 +36,7 @@
#include "gxssecurity.h"
#include "util/contentvalue.h"
#include "retroshare/rsgxsflags.h"
#include "retroshare/rsgxscircles.h"
#include "rsgixs.h"
#include "rsgxsutil.h"
@ -55,6 +56,7 @@
#define GEN_EXCH_DEBUG 1
#define MSG_CLEANUP_PERIOD 60*5 // 5 minutes
#define INTEGRITY_CHECK_PERIOD 60*30 // 30 minutes
RsGenExchange::RsGenExchange(RsGeneralDataService *gds, RsNetworkExchangeService *ns,
RsSerialType *serviceSerialiser, uint16_t servType, RsGixs* gixs,
@ -64,7 +66,8 @@ RsGenExchange::RsGenExchange(RsGeneralDataService *gds, RsNetworkExchangeService
CREATE_FAIL(0), CREATE_SUCCESS(1), CREATE_FAIL_TRY_LATER(2), SIGN_MAX_ATTEMPTS(5),
SIGN_FAIL(0), SIGN_SUCCESS(1), SIGN_FAIL_TRY_LATER(2),
VALIDATE_FAIL(0), VALIDATE_SUCCESS(1), VALIDATE_FAIL_TRY_LATER(2), VALIDATE_MAX_ATTEMPTS(5),
mCleaning(false), mLastClean(time(NULL)), mMsgCleanUp(NULL)
mCleaning(false), mLastClean(time(NULL)), mMsgCleanUp(NULL), mChecking(false), mIntegrityCheck(NULL),
mLastCheck(time(NULL))
{
mDataAccess = new RsGxsDataAccess(gds);
@ -146,6 +149,28 @@ void RsGenExchange::tick()
mCleaning = true;
}
}
now = time(NULL);
if(mChecking || (mLastCheck + INTEGRITY_CHECK_PERIOD < now))
{
if(mIntegrityCheck)
{
if(mIntegrityCheck->isDone())
{
mIntegrityCheck->join();
delete mIntegrityCheck;
mIntegrityCheck = NULL;
mLastCheck = time(NULL);
mChecking = false;
}
}
else
{
mIntegrityCheck = new RsGxsIntegrityCheck(mDataStore);
mIntegrityCheck->start();
mChecking = true;
}
}
}
bool RsGenExchange::acknowledgeTokenMsg(const uint32_t& token,
@ -1585,6 +1610,7 @@ void RsGenExchange::publishMsgs()
mMsgsToPublish.insert(std::make_pair(sign_it->first, item.mItem));
}
std::map<RsGxsGroupId, std::vector<RsGxsMessageId> > msgChangeMap;
std::map<uint32_t, RsGxsMsgItem*>::iterator mit = mMsgsToPublish.begin();
for(; mit != mMsgsToPublish.end(); mit++)
@ -1700,13 +1726,16 @@ void RsGenExchange::publishMsgs()
msg->metaData->mMsgStatus = GXS_SERV::GXS_MSG_STATUS_UNPROCESSED | GXS_SERV::GXS_MSG_STATUS_UNREAD;
msgId = msg->msgId;
grpId = msg->grpId;
computeHash(msg->msg, msg->metaData->mHash);
mDataAccess->addMsgData(msg);
msgChangeMap[grpId].push_back(msgId);
delete[] metaDataBuff;
// add to published to allow acknowledgement
mMsgNotify.insert(std::make_pair(mit->first, std::make_pair(grpId, msgId)));
mDataAccess->updatePublicRequestStatus(mit->first, RsTokenService::GXS_REQUEST_V2_STATUS_COMPLETE);
}
else
{
@ -1738,6 +1767,14 @@ void RsGenExchange::publishMsgs()
// clear msg item map as we're done publishing them and all
// entries are invalid
mMsgsToPublish.clear();
if(!msgChangeMap.empty())
{
RsGxsMsgChange* ch = new RsGxsMsgChange(RsGxsNotify::TYPE_PUBLISH);
ch->msgChangeMap = msgChangeMap;
mNotifications.push_back(ch);
}
}
RsGenExchange::ServiceCreate_Return RsGenExchange::service_CreateGroup(RsGxsGrpItem* /* grpItem */,
@ -1873,6 +1910,7 @@ void RsGenExchange::publishGrps()
if(mDataStore->validSize(grp) && serialOk)
{
grpId = grp->grpId;
computeHash(grp->grp, grp->metaData->mHash);
mDataAccess->addGroupData(grp);
}
else
@ -1932,6 +1970,7 @@ void RsGenExchange::publishGrps()
std::map<uint32_t, GrpNote>::iterator mit = toNotify.begin();
std::list<RsGxsGroupId> grpChanged;
for(; mit != toNotify.end(); mit++)
{
GrpNote& note = mit->second;
@ -1940,6 +1979,16 @@ void RsGenExchange::publishGrps()
mGrpNotify.insert(std::make_pair(mit->first, note.second));
mDataAccess->updatePublicRequestStatus(mit->first, status);
if(note.first)
grpChanged.push_back(note.second);
}
if(!grpChanged.empty())
{
RsGxsGroupChange* gc = new RsGxsGroupChange(RsGxsNotify::TYPE_PUBLISH);
gc->mGrpIdList = grpChanged;
mNotifications.push_back(gc);
}
}
@ -1997,6 +2046,12 @@ void RsGenExchange::processRecvdData()
}
void RsGenExchange::computeHash(const RsTlvBinaryData& data, std::string& hash)
{
pqihash pHash;
pHash.addData(data.bin_data, data.bin_len);
pHash.Complete(hash);
}
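For context, the new hash is attached wherever a data blob enters the store (publishGrps(), publishMsgs(), processRecvdMessages(), processRecvdGroups(), as the hunks above and below show). A small usage sketch; the later comparison against the stored value is an assumption about what the periodic RsGxsIntegrityCheck does, since its implementation is not part of this diff:

// when storing: hash the serialised payload and keep it in the meta data
std::string hash;
computeHash(msg->msg, hash);                // pqihash over bin_data/bin_len
msg->metaData->mHash = hash;                // persisted via the new KEY_NXS_HASH column

// assumed check performed later by the integrity-check thread:
std::string recomputed;
computeHash(storedMsg->msg, recomputed);
bool intact = (recomputed == storedMsg->metaData->mHash);  // flag or drop the entry if not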
void RsGenExchange::processRecvdMessages()
{
@ -2070,6 +2125,8 @@ void RsGenExchange::processRecvdMessages()
getMsgIdPair(*msg));
if(validated_entry != mMsgPendingValidate.end()) mMsgPendingValidate.erase(validated_entry);
computeHash(msg->msg, meta->mHash);
}
}
else
@ -2128,7 +2185,7 @@ void RsGenExchange::processRecvdMessages()
if(!msgIds.empty())
{
mDataStore->storeMessage(msgs);
RsGxsMsgChange* c = new RsGxsMsgChange();
RsGxsMsgChange* c = new RsGxsMsgChange(RsGxsNotify::TYPE_RECEIVE);
c->msgChangeMap = msgIds;
mNotifications.push_back(c);
}
@ -2167,6 +2224,11 @@ void RsGenExchange::processRecvdGroups()
{
meta->mGroupStatus = GXS_SERV::GXS_GRP_STATUS_UNPROCESSED | GXS_SERV::GXS_GRP_STATUS_UNREAD;
meta->mSubscribeFlags = GXS_SERV::GROUP_SUBSCRIBE_NOT_SUBSCRIBED;
if(meta->mCircleType == GXS_CIRCLE_TYPE_YOUREYESONLY)
meta->mOriginator = grp->PeerId();
computeHash(grp->grp, meta->mHash);
grps.insert(std::make_pair(grp, meta));
grpIds.push_back(grp->grpId);
@ -2215,7 +2277,7 @@ void RsGenExchange::processRecvdGroups()
if(!grpIds.empty())
{
RsGxsGroupChange* c = new RsGxsGroupChange();
RsGxsGroupChange* c = new RsGxsGroupChange(RsGxsNotify::TYPE_RECEIVE);
c->mGrpIdList = grpIds;
mNotifications.push_back(c);
mDataStore->storeGroup(grps);

View file

@ -307,7 +307,37 @@ protected:
*/
bool getGroupData(const uint32_t &token, std::vector<RsGxsGrpItem*>& grpItem);
template<class GrpType>
bool getGroupDataT(const uint32_t &token, std::vector<GrpType*>& grpItem)
{
std::vector<RsGxsGrpItem*> items;
bool ok = getGroupData(token, items);
std::vector<RsGxsGrpItem*>::iterator vit = items.begin();
for(; vit != items.end(); vit++)
{
RsGxsGrpItem* gi = *vit;
GrpType* item = dynamic_cast<GrpType*>(gi);
if(item)
{
grpItem.push_back(item);
}
else
{
#ifdef GXS_DEBUG
std::cerr << "\nRsGenExchange::getGroupDataT(): Wrong type!\n";
#endif
delete gi;
}
}
return ok;
}
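A hedged usage sketch for the new template helper; RsGxsFooGroupItem is a placeholder for a concrete service's group item type, and ownership of the returned items is assumed to rest with the caller, as with getGroupData():

std::vector<RsGxsFooGroupItem*> groups;
if (getGroupDataT<RsGxsFooGroupItem>(token, groups))
{
    for (size_t i = 0; i < groups.size(); ++i)
    {
        // use groups[i]->meta and the service-specific fields
        delete groups[i];    // caller cleans up the retrieved items
    }
}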
public:
/*!
* retrieves message data associated to a request token
* @param token token to be redeemed for message item retrieval
@ -315,6 +345,40 @@ public:
*/
bool getMsgData(const uint32_t &token, GxsMsgDataMap& msgItems);
template <class MsgType>
bool getMsgDataT(const uint32_t &token, std::map<RsGxsGroupId,
std::vector<MsgType*> >& msgItems)
{
GxsMsgDataMap msgData;
bool ok = getMsgData(token, msgData);
GxsMsgDataMap::iterator mit = msgData.begin();
for(; mit != msgData.end(); mit++)
{
const RsGxsGroupId& grpId = mit->first;
std::vector<RsGxsMsgItem*>& mv = mit->second;
std::vector<RsGxsMsgItem*>::iterator vit = mv.begin();
for(; vit != mv.end(); vit++)
{
RsGxsMsgItem* mi = *vit;
MsgType* mt = dynamic_cast<MsgType*>(mi);
if(mt != NULL)
{
msgItems[grpId].push_back(mt);
}
else
{
std::cerr << "RsGenExchange::getMsgDataT(): bad cast to msg type" << std::endl;
delete mi;
}
}
}
return ok;
}
/*!
* retrieves message related data associated to a request token
* @param token token to be redeemed for message item retrieval
@ -670,6 +734,8 @@ private:
void groupShareKeys(std::list<std::string> peers);
static void computeHash(const RsTlvBinaryData& data, std::string& hash);
private:
RsMutex mGenMtx;
@ -718,6 +784,11 @@ private:
time_t mLastClean;
RsGxsMessageCleanUp* mMsgCleanUp;
bool mChecking, mCheckStarted;
time_t mLastCheck;
RsGxsIntegrityCheck* mIntegrityCheck;
private:
std::vector<RsGxsNotify*> mChanges;

View file

@ -165,6 +165,7 @@ public:
class GixsReputation
{
public:
GixsReputation() : score(0) {}
RsGxsId id;
int score;
};
@ -174,7 +175,9 @@ class RsGixsReputation
{
public:
// get Reputation.
virtual bool getReputation(const RsGxsId &id, const GixsReputation &rep) = 0;
virtual bool haveReputation(const RsGxsId &id) = 0;
virtual bool loadReputation(const RsGxsId &id) = 0;
virtual bool getReputation(const RsGxsId &id, GixsReputation &rep) = 0;
};
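The reworked RsGixsReputation interface above splits lookup into a cache probe plus an asynchronous load. A short sketch of the intended call pattern (this is how rsgxsnetservice.cc uses it later in this commit; GIXS_CUT_OFF is the score threshold defined there):

GixsReputation rep;                          // score now defaults to 0 via the new ctor
if (mReputations->haveReputation(authorId))
{
    mReputations->getReputation(authorId, rep);
    if (rep.score > GIXS_CUT_OFF)
        ; // safe to request this author's items straight away
}
else
{
    mReputations->loadReputation(authorId);  // kick off the load now...
    // ...and park the request in a holding pen, re-checking haveReputation() later
}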
@ -206,6 +209,7 @@ class RsGcxs
virtual bool loadCircle(const RsGxsCircleId &circleId) = 0;
virtual int canSend(const RsGxsCircleId &circleId, const RsPgpId &id) = 0;
virtual int canReceive(const RsGxsCircleId &circleId, const RsPgpId &id) = 0;
virtual bool recipients(const RsGxsCircleId &circleId, std::list<RsPgpId> &friendlist) = 0;
};
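Similarly for RsGcxs: circle data is loaded asynchronously, so callers probe isLoaded() and fall back to loadCircle() plus a later retry. A sketch of the pattern used by the vetting code added in rsgxsnetutils.cc; the wrapper name canSendTo is illustrative, and rsPeers->getGPGId() maps the SSL id to the PGP id the circle stores:

bool canSendTo(RsGcxs* circles, const RsGxsCircleId& circleId, const std::string& sslId)
{
    if (circles->isLoaded(circleId))
    {
        const RsPgpId& pgpId = rsPeers->getGPGId(sslId);  // circles are keyed by PGP id
        return circles->canSend(circleId, pgpId);
    }
    circles->loadCircle(circleId);   // request a load; the caller retries on a later pass
    return false;
}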

View file

@ -48,6 +48,7 @@ uint32_t RsGxsGrpMetaData::serial_size()
s += keys.TlvSize();
s += 4; // for mCircleType
s += GetTlvStringSize(mCircleId);
s += 4; // for mAuthenFlags
return s;
}
@ -72,6 +73,7 @@ void RsGxsGrpMetaData::clear(){
mInternalCircle.clear();
mOriginator.clear();
mCircleType = 0;
mAuthenFlags = 0;
}
@ -105,9 +107,11 @@ bool RsGxsGrpMetaData::serialise(void *data, uint32_t &pktsize)
ok &= setRawUInt32(data, tlvsize, &offset, mGroupFlags);
ok &= setRawUInt32(data, tlvsize, &offset, mPublishTs);
ok &= setRawUInt32(data, tlvsize, &offset, mCircleType);
ok &= setRawUInt32(data, tlvsize, &offset, mAuthenFlags);
ok &= SetTlvString(data, tlvsize, &offset, 0, mAuthorId);
ok &= SetTlvString(data, tlvsize, &offset, 0, mServiceString);
ok &= SetTlvString(data, tlvsize, &offset, 0, mCircleId);
ok &= signSet.SetTlv(data, tlvsize, &offset);
ok &= keys.SetTlv(data, tlvsize, &offset);
@ -133,6 +137,7 @@ bool RsGxsGrpMetaData::deserialise(void *data, uint32_t &pktsize)
ok &= getRawUInt32(data, pktsize, &offset, &mGroupFlags);
ok &= getRawUInt32(data, pktsize, &offset, &mPublishTs);
ok &= getRawUInt32(data, pktsize, &offset, &mCircleType);
ok &= getRawUInt32(data, pktsize, &offset, &mAuthenFlags);
ok &= GetTlvString(data, pktsize, &offset, 0, mAuthorId);
ok &= GetTlvString(data, pktsize, &offset, 0, mServiceString);
ok &= GetTlvString(data, pktsize, &offset, 0, mCircleId);
@ -278,6 +283,7 @@ void RsGxsGrpMetaData::operator =(const RsGroupMetaData& rMeta)
this->mCircleType = rMeta.mCircleType;
this->mInternalCircle = rMeta.mInternalCircle;
this->mOriginator = rMeta.mOriginator;
this->mAuthenFlags = rMeta.mAuthenFlags;
}
void RsGxsMsgMetaData::operator =(const RsMsgMetaData& rMeta)

View file

@ -65,6 +65,7 @@ public:
RsTlvSecurityKeySet keys;
std::string mServiceString;
uint32_t mAuthenFlags;
// BELOW HERE IS LOCAL DATA, THAT IS NOT FROM MSG.
@ -77,6 +78,7 @@ public:
uint32_t mGroupStatus;
std::string mOriginator;
std::string mInternalCircle;
std::string mHash;
};
@ -115,6 +117,7 @@ public:
uint32_t mMsgStatus;
time_t mChildTs;
std::string mHash;
bool validated;
};

View file

@ -3,7 +3,7 @@
*
* RetroShare C++ Interface.
*
* Copyright 2012-2012 by Robert Fernie, Christopher Evi-Parker
* Copyright 2012-2013 by Robert Fernie, Christopher Evi-Parker
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public

View file

@ -29,9 +29,11 @@
#include "rsgxsnetservice.h"
#include "retroshare/rsgxsflags.h"
#include "retroshare/rsgxscircles.h"
#include "retroshare/rspeers.h"
#define NXS_NET_DEBUG 1
#define GIXS_CUT_OFF 0
#define SYNC_PERIOD 12 // in microseconds every 10 seconds (1 second for testing)
#define TRANSAC_TIMEOUT 5 // 5 seconds
@ -39,10 +41,11 @@
const uint32_t RsGxsNetService::FRAGMENT_SIZE = 150000;
RsGxsNetService::RsGxsNetService(uint16_t servType, RsGeneralDataService *gds,
RsNxsNetMgr *netMgr, RsNxsObserver *nxsObs)
RsNxsNetMgr *netMgr, RsNxsObserver *nxsObs, RsGixsReputation* reputations, RsGcxs* circles)
: p3Config(servType), p3ThreadedService(servType),
mTransactionTimeOut(TRANSAC_TIMEOUT), mServType(servType), mDataStore(gds), mTransactionN(0),
mObserver(nxsObs), mNxsMutex("RsGxsNetService"), mNetMgr(netMgr), mSYNC_PERIOD(SYNC_PERIOD), mSyncTs(0)
mObserver(nxsObs), mNxsMutex("RsGxsNetService"), mNetMgr(netMgr), mSYNC_PERIOD(SYNC_PERIOD),
mSyncTs(0), mReputations(reputations), mCircles(circles)
{
addSerialType(new RsNxsSerialiser(mServType));
@ -279,6 +282,150 @@ struct GrpFragCollate
bool operator()(RsNxsGrp* grp) { return grp->grpId == mGrpId;}
};
void RsGxsNetService::locked_createTransactionFromPending(
MsgRespPending* msgPend)
{
MsgAuthorV::const_iterator cit = msgPend->mMsgAuthV.begin();
std::list<RsNxsItem*> reqList;
uint32_t transN = locked_getTransactionId();
for(; cit != msgPend->mMsgAuthV.end(); cit++)
{
const MsgAuthEntry& entry = *cit;
if(entry.mPassedVetting)
{
RsNxsSyncMsgItem* msgItem = new RsNxsSyncMsgItem(mServType);
msgItem->grpId = entry.mGrpId;
msgItem->msgId = entry.mMsgId;
msgItem->authorId = entry.mAuthorId;
msgItem->flag = RsNxsSyncMsgItem::FLAG_REQUEST;
msgItem->transactionNumber = transN;
msgItem->PeerId(msgPend->mPeerId);
reqList.push_back(msgItem);
}
}
if(!reqList.empty())
locked_pushMsgTransactionFromList(reqList, msgPend->mPeerId, transN);
}
void RsGxsNetService::locked_createTransactionFromPending(
GrpRespPending* grpPend)
{
GrpAuthorV::const_iterator cit = grpPend->mGrpAuthV.begin();
std::list<RsNxsItem*> reqList;
uint32_t transN = locked_getTransactionId();
for(; cit != grpPend->mGrpAuthV.end(); cit++)
{
const GrpAuthEntry& entry = *cit;
if(entry.mPassedVetting)
{
RsNxsSyncGrpItem* msgItem = new RsNxsSyncGrpItem(mServType);
msgItem->grpId = entry.mGrpId;
msgItem->authorId = entry.mAuthorId;
msgItem->flag = RsNxsSyncMsgItem::FLAG_REQUEST;
msgItem->transactionNumber = transN;
msgItem->PeerId(grpPend->mPeerId);
reqList.push_back(msgItem);
}
}
if(!reqList.empty())
locked_pushGrpTransactionFromList(reqList, grpPend->mPeerId, transN);
}
void RsGxsNetService::locked_createTransactionFromPending(GrpCircleIdRequestVetting* grpPend)
{
std::vector<GrpIdCircleVet>::iterator cit = grpPend->mGrpCircleV.begin();
uint32_t transN = locked_getTransactionId();
std::list<RsNxsItem*> itemL;
for(; cit != grpPend->mGrpCircleV.end(); cit++)
{
const GrpIdCircleVet& entry = *cit;
if(entry.mCleared)
{
RsNxsSyncGrpItem* gItem = new
RsNxsSyncGrpItem(mServType);
gItem->flag = RsNxsSyncGrpItem::FLAG_RESPONSE;
gItem->grpId = entry.mGroupId;
gItem->publishTs = 0;
gItem->PeerId(grpPend->mPeerId);
gItem->transactionNumber = transN;
itemL.push_back(gItem);
}
}
if(!itemL.empty())
locked_pushGrpRespFromList(itemL, grpPend->mPeerId, transN);
}
void RsGxsNetService::locked_createTransactionFromPending(MsgCircleIdsRequestVetting* msgPend)
{
std::vector<MsgIdCircleVet>::iterator vit = msgPend->mMsgs.begin();
std::list<RsNxsItem*> itemL;
uint32_t transN = locked_getTransactionId();
for(; vit != msgPend->mMsgs.end(); vit++)
{
MsgIdCircleVet& mic = *vit;
RsNxsSyncMsgItem* mItem = new
RsNxsSyncMsgItem(mServType);
mItem->flag = RsNxsSyncGrpItem::FLAG_RESPONSE;
mItem->grpId = msgPend->mGrpId;
mItem->msgId = mic.mMsgId;
mItem->authorId = mic.mAuthorId;
mItem->PeerId(msgPend->mPeerId);
mItem->transactionNumber = transN;
itemL.push_back(mItem);
}
if(!itemL.empty())
locked_pushMsgRespFromList(itemL, msgPend->mPeerId, transN);
}
bool RsGxsNetService::locked_canReceive(const RsGxsGrpMetaData * const grpMeta,
const std::string& peerId)
{
double timeDelta = 0.2;
if(grpMeta->mCircleType == GXS_CIRCLE_TYPE_EXTERNAL)
{
int i=0;
mCircles->loadCircle(grpMeta->mCircleId);
// check 5 times at most
// spin for 1 second at most
while(i < 5)
{
#ifndef WINDOWS_SYS
usleep((int) (timeDelta * 1000000));
#else
Sleep((int) (timeDelta * 1000));
#endif
if(mCircles->isLoaded(grpMeta->mCircleId))
{
const RsPgpId& pgpId = rsPeers->getGPGId(peerId);
return mCircles->canSend(grpMeta->mCircleId, pgpId);
}
i++;
}
}else
{
return true;
}
return false;
}
void RsGxsNetService::collateGrpFragments(GrpFragments fragments,
std::map<RsGxsGroupId, GrpFragments>& partFragments) const
{
@ -611,6 +758,9 @@ void RsGxsNetService::run(){
// process completed transactions
processCompletedTransactions();
// vetting of id and circle info
runVetting();
}
}
@ -881,14 +1031,12 @@ void RsGxsNetService::locked_processCompletedIncomingTrans(NxsTransaction* tr)
<< std::endl;
#endif
}
}
// notify listener of grps
mObserver->notifyNewGroups(grps);
}else if(flag & RsNxsTransac::FLAG_TYPE_MSGS)
{
@ -1002,6 +1150,31 @@ void RsGxsNetService::locked_processCompletedOutgoingTrans(NxsTransaction* tr)
}
void RsGxsNetService::locked_pushMsgTransactionFromList(
std::list<RsNxsItem*>& reqList, const std::string& peerId, const uint32_t& transN)
{
RsNxsTransac* transac = new RsNxsTransac(mServType);
transac->transactFlag = RsNxsTransac::FLAG_TYPE_MSG_LIST_REQ
| RsNxsTransac::FLAG_BEGIN_P1;
transac->timestamp = 0;
transac->nItems = reqList.size();
transac->PeerId(peerId);
transac->transactionNumber = transN;
NxsTransaction* newTrans = new NxsTransaction();
newTrans->mItems = reqList;
newTrans->mFlag = NxsTransaction::FLAG_STATE_WAITING_CONFIRM;
newTrans->mTimeOut = time(NULL) + mTransactionTimeOut;
// create transaction copy with your id to indicate
// it's an outgoing transaction
newTrans->mTransaction = new RsNxsTransac(*transac);
newTrans->mTransaction->PeerId(mOwnId);
sendItem(transac);
{
if (!locked_addTransaction(newTrans))
delete newTrans;
}
}
void RsGxsNetService::locked_genReqMsgTransaction(NxsTransaction* tr)
{
@ -1031,12 +1204,38 @@ void RsGxsNetService::locked_genReqMsgTransaction(NxsTransaction* tr)
}
}
if(msgItemL.empty())
return;
if(msgItemL.empty())
return;
// get grp id for this transaction
RsNxsSyncMsgItem* item = msgItemL.front();
const std::string& grpId = item->grpId;
std::map<std::string, RsGxsGrpMetaData*> grpMetaMap;
grpMetaMap[grpId] = NULL;
mDataStore->retrieveGxsGrpMetaData(grpMetaMap);
RsGxsGrpMetaData* grpMeta = grpMetaMap[grpId];
// you want to find out if you can receive it;
// the number of polls is essentially a multiple
// of the sleep interval
if(grpMeta)
{
bool can = locked_canReceive(grpMeta, tr->mTransaction->PeerId());
delete grpMeta;
if(!can)
return;
}else
{
return;
}
GxsMsgReq reqIds;
reqIds[grpId] = std::vector<RsGxsMessageId>();
GxsMsgMetaResult result;
@ -1060,49 +1259,87 @@ void RsGxsNetService::locked_genReqMsgTransaction(NxsTransaction* tr)
const std::string peerFrom = tr->mTransaction->PeerId();
MsgAuthorV toVet;
for(; llit != msgItemL.end(); llit++)
{
const std::string& msgId = (*llit)->msgId;
RsNxsSyncMsgItem*& syncItem = *llit;
const std::string& msgId = syncItem->msgId;
if(msgIdSet.find(msgId) == msgIdSet.end()){
RsNxsSyncMsgItem* msgItem = new RsNxsSyncMsgItem(mServType);
msgItem->grpId = grpId;
msgItem->msgId = msgId;
msgItem->flag = RsNxsSyncMsgItem::FLAG_REQUEST;
msgItem->transactionNumber = transN;
msgItem->PeerId(peerFrom);
reqList.push_back(msgItem);
if(mReputations->haveReputation(syncItem->authorId) || syncItem->authorId.empty())
{
GixsReputation rep;
mReputations->getReputation(syncItem->authorId, rep);
if(rep.score > GIXS_CUT_OFF)
{
RsNxsSyncMsgItem* msgItem = new RsNxsSyncMsgItem(mServType);
msgItem->grpId = grpId;
msgItem->msgId = msgId;
msgItem->flag = RsNxsSyncMsgItem::FLAG_REQUEST;
msgItem->transactionNumber = transN;
msgItem->PeerId(peerFrom);
reqList.push_back(msgItem);
}
}
else
{
// preload for speed
mReputations->loadReputation(syncItem->authorId);
MsgAuthEntry entry;
entry.mAuthorId = syncItem->authorId;
entry.mGrpId = syncItem->grpId;
entry.mMsgId = syncItem->msgId;
toVet.push_back(entry);
}
}
}
if(!reqList.empty())
{
if(!toVet.empty())
{
MsgRespPending* mrp = new MsgRespPending(mReputations, tr->mTransaction->PeerId(), toVet);
mPendingResp.push_back(mrp);
}
RsNxsTransac* transac = new RsNxsTransac(mServType);
transac->transactFlag = RsNxsTransac::FLAG_TYPE_MSG_LIST_REQ
| RsNxsTransac::FLAG_BEGIN_P1;
transac->timestamp = 0;
transac->nItems = reqList.size();
transac->PeerId(tr->mTransaction->PeerId());
transac->transactionNumber = transN;
if(!reqList.empty())
{
locked_pushMsgTransactionFromList(reqList, tr->mTransaction->PeerId(), transN);
}
}
NxsTransaction* newTrans = new NxsTransaction();
newTrans->mItems = reqList;
newTrans->mFlag = NxsTransaction::FLAG_STATE_WAITING_CONFIRM;
newTrans->mTimeOut = time(NULL) + mTransactionTimeOut;
// create transaction copy with your id to indicate
// it's an outgoing transaction
newTrans->mTransaction = new RsNxsTransac(*transac);
newTrans->mTransaction->PeerId(mOwnId);
sendItem(transac);
{
if(!locked_addTransaction(newTrans))
delete newTrans;
}
}
void RsGxsNetService::locked_pushGrpTransactionFromList(
std::list<RsNxsItem*>& reqList, const std::string& peerId, const uint32_t& transN)
{
RsNxsTransac* transac = new RsNxsTransac(mServType);
transac->transactFlag = RsNxsTransac::FLAG_TYPE_GRP_LIST_REQ
| RsNxsTransac::FLAG_BEGIN_P1;
transac->timestamp = 0;
transac->nItems = reqList.size();
transac->PeerId(peerId);
transac->transactionNumber = transN;
NxsTransaction* newTrans = new NxsTransaction();
newTrans->mItems = reqList;
newTrans->mFlag = NxsTransaction::FLAG_STATE_WAITING_CONFIRM;
newTrans->mTimeOut = time(NULL) + mTransactionTimeOut;
newTrans->mTransaction = new RsNxsTransac(*transac);
newTrans->mTransaction->PeerId(mOwnId);
sendItem(transac);
if (!locked_addTransaction(newTrans))
delete newTrans;
}
void RsGxsNetService::addGroupItemToList(NxsTransaction*& tr,
const std::string& grpId, uint32_t& transN,
std::list<RsNxsItem*>& reqList)
{
RsNxsSyncGrpItem* grpItem = new RsNxsSyncGrpItem(mServType);
grpItem->PeerId(tr->mTransaction->PeerId());
grpItem->grpId = grpId;
grpItem->flag = RsNxsSyncMsgItem::FLAG_REQUEST;
grpItem->transactionNumber = transN;
reqList.push_back(grpItem);
}
void RsGxsNetService::locked_genReqGrpTransaction(NxsTransaction* tr)
@ -1125,7 +1362,7 @@ void RsGxsNetService::locked_genReqGrpTransaction(NxsTransaction* tr)
}else
{
#ifdef NXS_NET_DEBUG
std::cerr << "RsGxsNetService::genReqMsgTransaction(): item failed to caste to RsNxsSyncMsgItem* "
std::cerr << "RsGxsNetService::genReqGrpTransaction(): item failed to caste to RsNxsSyncMsgItem* "
<< std::endl;
#endif
delete item;
@ -1142,43 +1379,60 @@ void RsGxsNetService::locked_genReqGrpTransaction(NxsTransaction* tr)
uint32_t transN = locked_getTransactionId();
GrpAuthorV toVet;
for(; llit != grpItemL.end(); llit++)
{
const std::string& grpId = (*llit)->grpId;
RsNxsSyncGrpItem*& grpSyncItem = *llit;
const std::string& grpId = grpSyncItem->grpId;
if(grpMetaMap.find(grpId) == grpMetaMap.end()){
RsNxsSyncGrpItem* grpItem = new RsNxsSyncGrpItem(mServType);
grpItem->PeerId(tr->mTransaction->PeerId());
grpItem->grpId = grpId;
grpItem->flag = RsNxsSyncMsgItem::FLAG_REQUEST;
grpItem->transactionNumber = transN;
reqList.push_back(grpItem);
// determine if you need to check reputation
bool checkRep = !grpSyncItem->authorId.empty();
// check if you have the author's reputation; if you don't,
// place the request in the holding pen
if(checkRep)
{
if(mReputations->haveReputation(grpSyncItem->authorId))
{
GixsReputation rep;
mReputations->getReputation(grpSyncItem->authorId, rep);
if(rep.score > GIXS_CUT_OFF)
{
addGroupItemToList(tr, grpId, transN, reqList);
}
}
else
{
// preload reputation for later
mReputations->loadReputation(grpSyncItem->authorId);
GrpAuthEntry entry;
entry.mAuthorId = grpSyncItem->authorId;
entry.mGrpId = grpSyncItem->grpId;
toVet.push_back(entry);
}
}
else
{
addGroupItemToList(tr, grpId, transN, reqList);
}
}
}
if(!toVet.empty())
{
std::string peerId = tr->mTransaction->PeerId();
GrpRespPending* grp = new GrpRespPending(mReputations, peerId, toVet);
mPendingResp.push_back(grp);
}
if(!reqList.empty())
{
RsNxsTransac* transac = new RsNxsTransac(mServType);
transac->transactFlag = RsNxsTransac::FLAG_TYPE_GRP_LIST_REQ
| RsNxsTransac::FLAG_BEGIN_P1;
transac->timestamp = 0;
transac->nItems = reqList.size();
transac->PeerId(tr->mTransaction->PeerId());
transac->transactionNumber = transN;
NxsTransaction* newTrans = new NxsTransaction();
newTrans->mItems = reqList;
newTrans->mFlag = NxsTransaction::FLAG_STATE_WAITING_CONFIRM;
newTrans->mTimeOut = time(NULL) + mTransactionTimeOut;
newTrans->mTransaction = new RsNxsTransac(*transac);
newTrans->mTransaction->PeerId(mOwnId);
sendItem(transac);
if(!locked_addTransaction(newTrans))
delete newTrans;
locked_pushGrpTransactionFromList(reqList, tr->mTransaction->PeerId(), transN);
}
@ -1258,6 +1512,86 @@ void RsGxsNetService::locked_genSendGrpsTransaction(NxsTransaction* tr)
return;
}
void RsGxsNetService::runVetting()
{
RsStackMutex stack(mNxsMutex);
std::vector<AuthorPending*>::iterator vit = mPendingResp.begin();
for(; vit != mPendingResp.end(); )
{
AuthorPending* ap = *vit;
if(ap->accepted() || ap->expired())
{
// add to transactions
if(AuthorPending::MSG_PEND == ap->getType())
{
MsgRespPending* mrp = static_cast<MsgRespPending*>(ap);
locked_createTransactionFromPending(mrp);
}
else if(AuthorPending::GRP_PEND == ap->getType())
{
GrpRespPending* grp = static_cast<GrpRespPending*>(ap);
locked_createTransactionFromPending(grp);
}else
{
#ifdef NXS_NET_DEBUG
std::cerr << "RsGxsNetService::runVetting(): Unknown pending type! Type: " << ap->getType()
<< std::endl;
#endif
}
delete ap;
vit = mPendingResp.erase(vit);
}
else
{
vit++;
}
}
// now let's do circle vetting
std::vector<GrpCircleVetting*>::iterator vit2 = mPendingCircleVets.begin();
for(; vit2 != mPendingCircleVets.end(); )
{
GrpCircleVetting*& gcv = *vit2;
if(gcv->cleared() || gcv->expired())
{
if(gcv->getType() == GrpCircleVetting::GRP_ID_PEND)
{
GrpCircleIdRequestVetting* gcirv =
static_cast<GrpCircleIdRequestVetting*>(gcv);
locked_createTransactionFromPending(gcirv);
}
else if(gcv->getType() == GrpCircleVetting::MSG_ID_SEND_PEND)
{
MsgCircleIdsRequestVetting* mcirv =
static_cast<MsgCircleIdsRequestVetting*>(gcv);
locked_createTransactionFromPending(mcirv);
}
else
{
#ifdef NXS_NET_DEBUG
std::cerr << "RsGxsNetService::runVetting(): Unknown Circle pending type! Type: " << gcv->getType()
<< std::endl;
#endif
}
delete gcv;
vit2 = mPendingCircleVets.erase(vit2);
}
else
{
vit2++;
}
}
}
void RsGxsNetService::locked_genSendMsgsTransaction(NxsTransaction* tr)
{
@ -1386,6 +1720,29 @@ void RsGxsNetService::cleanTransactionItems(NxsTransaction* tr) const
tr->mItems.clear();
}
void RsGxsNetService::locked_pushGrpRespFromList(std::list<RsNxsItem*>& respList,
const std::string& peer, const uint32_t& transN)
{
NxsTransaction* tr = new NxsTransaction();
tr->mItems = respList;
tr->mFlag = NxsTransaction::FLAG_STATE_WAITING_CONFIRM;
RsNxsTransac* trItem = new RsNxsTransac(mServType);
trItem->transactFlag = RsNxsTransac::FLAG_BEGIN_P1
| RsNxsTransac::FLAG_TYPE_GRP_LIST_RESP;
trItem->nItems = respList.size();
trItem->timestamp = 0;
trItem->PeerId(peer);
trItem->transactionNumber = transN;
// also make a copy for the resident transaction
tr->mTransaction = new RsNxsTransac(*trItem);
tr->mTransaction->PeerId(mOwnId);
tr->mTimeOut = time(NULL) + mTransactionTimeOut;
// signal peer to prepare for transaction
sendItem(trItem);
locked_addTransaction(tr);
}
void RsGxsNetService::handleRecvSyncGroup(RsNxsSyncGrp* item)
{
@ -1402,11 +1759,12 @@ void RsGxsNetService::handleRecvSyncGroup(RsNxsSyncGrp* item)
std::map<std::string, RsGxsGrpMetaData*>::iterator mit =
grp.begin();
NxsTransaction* tr = new NxsTransaction();
std::list<RsNxsItem*>& itemL = tr->mItems;
std::list<RsNxsItem*> itemL;
uint32_t transN = locked_getTransactionId();
std::vector<GrpIdCircleVet> toVet;
for(; mit != grp.end(); mit++)
{
RsGxsGrpMetaData* grpMeta = mit->second;
@ -1414,42 +1772,92 @@ void RsGxsNetService::handleRecvSyncGroup(RsNxsSyncGrp* item)
if(grpMeta->mSubscribeFlags &
GXS_SERV::GROUP_SUBSCRIBE_SUBSCRIBED)
{
RsNxsSyncGrpItem* gItem = new
RsNxsSyncGrpItem(mServType);
gItem->flag = RsNxsSyncGrpItem::FLAG_RESPONSE;
gItem->grpId = mit->first;
gItem->publishTs = mit->second->mPublishTs;
gItem->PeerId(peer);
gItem->transactionNumber = transN;
itemL.push_back(gItem);
// check if you can send this id to the peer
// or if it needs to go into the holding
// pen until the peer has been vetted
if(canSendGrpId(peer, *grpMeta, toVet))
{
RsNxsSyncGrpItem* gItem = new
RsNxsSyncGrpItem(mServType);
gItem->flag = RsNxsSyncGrpItem::FLAG_RESPONSE;
gItem->grpId = mit->first;
gItem->publishTs = mit->second->mPublishTs;
gItem->authorId = grpMeta->mAuthorId;
gItem->PeerId(peer);
gItem->transactionNumber = transN;
itemL.push_back(gItem);
}
}
delete grpMeta; // release resource
}
tr->mFlag = NxsTransaction::FLAG_STATE_WAITING_CONFIRM;
RsNxsTransac* trItem = new RsNxsTransac(mServType);
trItem->transactFlag = RsNxsTransac::FLAG_BEGIN_P1
| RsNxsTransac::FLAG_TYPE_GRP_LIST_RESP;
trItem->nItems = itemL.size();
if(!toVet.empty())
{
mPendingCircleVets.push_back(new GrpCircleIdRequestVetting(mCircles, toVet, peer));
}
trItem->timestamp = 0;
trItem->PeerId(peer);
trItem->transactionNumber = transN;
// also make a copy for the resident transaction
tr->mTransaction = new RsNxsTransac(*trItem);
tr->mTransaction->PeerId(mOwnId);
tr->mTimeOut = time(NULL) + mTransactionTimeOut;
// signal peer to prepare for transaction
sendItem(trItem);
locked_addTransaction(tr);
locked_pushGrpRespFromList(itemL, peer, transN);
return;
}
bool RsGxsNetService::canSendGrpId(const std::string& sslId, RsGxsGrpMetaData& grpMeta, std::vector<GrpIdCircleVet>& toVet)
{
// first do the simple checks
uint8_t circleType = grpMeta.mCircleType;
if(circleType == GXS_CIRCLE_TYPE_LOCAL)
return false;
if(circleType == GXS_CIRCLE_TYPE_PUBLIC)
return true;
const RsGxsCircleId& circleId = grpMeta.mCircleId;
if(circleType == GXS_CIRCLE_TYPE_EXTERNAL)
{
if(mCircles->isLoaded(circleId))
{
const RsPgpId& pgpId = rsPeers->getGPGId(sslId);
return mCircles->canSend(circleId, pgpId);
}
toVet.push_back(GrpIdCircleVet(grpMeta.mGroupId, circleId));
return false;
}
if(circleType == GXS_CIRCLE_TYPE_YOUREYESONLY)
{
// a non-empty internal circle id means this
// node is the personal circle owner
if(!grpMeta.mInternalCircle.empty())
{
const RsGxsCircleId& internalCircleId = grpMeta.mCircleId;
if(mCircles->isLoaded(internalCircleId))
{
const RsPgpId& pgpId = rsPeers->getGPGId(sslId);
return mCircles->canSend(internalCircleId, pgpId);
}
toVet.push_back(GrpIdCircleVet(grpMeta.mGroupId, internalCircleId));
return false;
}
else
{
// an empty internal circle id means this node can only
// send circle-related info back to the peer it received it from
if(grpMeta.mOriginator == sslId)
return true;
else
return false;
}
}
return true;
}
void RsGxsNetService::handleRecvSyncMessage(RsNxsSyncMsg* item)
{
RsStackMutex stack(mNxsMutex);
@ -1458,35 +1866,65 @@ void RsGxsNetService::handleRecvSyncMessage(RsNxsSyncMsg* item)
GxsMsgMetaResult metaResult;
GxsMsgReq req;
std::map<std::string, RsGxsGrpMetaData*> grpMetas;
grpMetas[item->grpId] = NULL;
mDataStore->retrieveGxsGrpMetaData(grpMetas);
RsGxsGrpMetaData* grpMeta = grpMetas[item->grpId];
if(grpMeta == NULL)
return;
req[item->grpId] = std::vector<std::string>();
mDataStore->retrieveGxsMsgMetaData(req, metaResult);
std::vector<RsGxsMsgMetaData*>& msgMetas = metaResult[item->grpId];
std::vector<RsGxsMsgMetaData*>& msgMeta = metaResult[item->grpId];
if(req.empty()){
if(req.empty())
{
return;
}
std::vector<RsGxsMsgMetaData*>::iterator vit = msgMeta.begin();
NxsTransaction* tr = new NxsTransaction();
std::list<RsNxsItem*>& itemL = tr->mItems;
std::list<RsNxsItem*> itemL;
uint32_t transN = locked_getTransactionId();
for(; vit != msgMeta.end(); vit++)
if(/*canSendMsgIds(msgMetas, *grpMeta, peer)*/ true)
{
RsGxsMsgMetaData* m = *vit;
RsNxsSyncMsgItem* mItem = new
RsNxsSyncMsgItem(mServType);
mItem->flag = RsNxsSyncGrpItem::FLAG_RESPONSE;
mItem->grpId = m->mGroupId;
mItem->msgId = m->mMsgId;
mItem->PeerId(peer);
mItem->transactionNumber = transN;
itemL.push_back(mItem);
std::vector<RsGxsMsgMetaData*>::iterator vit = msgMetas.begin();
for(; vit != msgMetas.end(); vit++)
{
RsGxsMsgMetaData* m = *vit;
RsNxsSyncMsgItem* mItem = new
RsNxsSyncMsgItem(mServType);
mItem->flag = RsNxsSyncGrpItem::FLAG_RESPONSE;
mItem->grpId = m->mGroupId;
mItem->msgId = m->mMsgId;
mItem->authorId = m->mAuthorId;
mItem->PeerId(peer);
mItem->transactionNumber = transN;
itemL.push_back(mItem);
}
if(!itemL.empty())
locked_pushMsgRespFromList(itemL, peer, transN);
}
std::vector<RsGxsMsgMetaData*>::iterator vit = msgMetas.begin();
// release meta resource
for(vit = msgMetas.begin(); vit != msgMetas.end(); vit++)
delete *vit;
return;
}
void RsGxsNetService::locked_pushMsgRespFromList(std::list<RsNxsItem*>& itemL, const std::string& sslId,
const uint32_t& transN)
{
NxsTransaction* tr = new NxsTransaction();
tr->mItems = itemL;
tr->mFlag = NxsTransaction::FLAG_STATE_WAITING_CONFIRM;
RsNxsTransac* trItem = new RsNxsTransac(mServType);
trItem->transactFlag = RsNxsTransac::FLAG_BEGIN_P1
@ -1495,7 +1933,7 @@ void RsGxsNetService::handleRecvSyncMessage(RsNxsSyncMsg* item)
trItem->nItems = itemL.size();
trItem->timestamp = 0;
trItem->PeerId(peer);
trItem->PeerId(sslId);
trItem->transactionNumber = transN;
// also make a copy for the resident transaction
@ -1507,10 +1945,91 @@ void RsGxsNetService::handleRecvSyncMessage(RsNxsSyncMsg* item)
sendItem(trItem);
locked_addTransaction(tr);
return;
}
bool RsGxsNetService::canSendMsgIds(const std::vector<RsGxsMsgMetaData*>& msgMetas,
const RsGxsGrpMetaData& grpMeta, const std::string& sslId)
{
// first do the simple checks
uint8_t circleType = grpMeta.mCircleType;
if(circleType == GXS_CIRCLE_TYPE_LOCAL)
return false;
if(circleType == GXS_CIRCLE_TYPE_PUBLIC)
return true;
const RsGxsCircleId& circleId = grpMeta.mCircleId;
if(circleType == GXS_CIRCLE_TYPE_EXTERNAL)
{
if(mCircles->isLoaded(circleId))
{
const RsPgpId& pgpId = rsPeers->getGPGId(sslId);
return mCircles->canSend(circleId, pgpId);
}
std::vector<MsgIdCircleVet> toVet;
std::vector<RsGxsMsgMetaData*>::const_iterator vit = msgMetas.begin();
for(; vit != msgMetas.end(); vit++)
{
const RsGxsMsgMetaData* const& meta = *vit;
MsgIdCircleVet mic(meta->mMsgId, meta->mAuthorId);
toVet.push_back(mic);
}
if(!toVet.empty())
mPendingCircleVets.push_back(new MsgCircleIdsRequestVetting(mCircles, toVet, grpMeta.mGroupId,
sslId, grpMeta.mCircleId));
return false;
}
if(circleType == GXS_CIRCLE_TYPE_YOUREYESONLY)
{
// a non-empty internal circle id means this
// node is the personal circle owner
if(!grpMeta.mInternalCircle.empty())
{
const RsGxsCircleId& internalCircleId = grpMeta.mCircleId;
if(mCircles->isLoaded(internalCircleId))
{
const RsPgpId& pgpId = rsPeers->getGPGId(sslId);
return mCircles->canSend(internalCircleId, pgpId);
}
std::vector<MsgIdCircleVet> toVet;
std::vector<RsGxsMsgMetaData*>::const_iterator vit = msgMetas.begin();
for(; vit != msgMetas.end(); vit++)
{
const RsGxsMsgMetaData* const& meta = *vit;
MsgIdCircleVet mic(meta->mMsgId, meta->mAuthorId);
toVet.push_back(mic);
}
if(!toVet.empty())
mPendingCircleVets.push_back(new MsgCircleIdsRequestVetting(mCircles, toVet, grpMeta.mGroupId,
sslId, grpMeta.mCircleId));
return false;
}
else
{
// an empty internal circle id means this node can only
// send circle-related info back to the peer it received it from
if(grpMeta.mOriginator == sslId)
return true;
else
return false;
}
}
return true;
}
/** inherited methods **/
@ -1524,68 +2043,3 @@ void RsGxsNetService::setSyncAge(uint32_t age)
}
/** NxsTransaction definition **/
const uint8_t NxsTransaction::FLAG_STATE_STARTING = 0x0001; // when
const uint8_t NxsTransaction::FLAG_STATE_RECEIVING = 0x0002; // begin receiving items for incoming trans
const uint8_t NxsTransaction::FLAG_STATE_SENDING = 0x0004; // begin sending items for outgoing trans
const uint8_t NxsTransaction::FLAG_STATE_COMPLETED = 0x008;
const uint8_t NxsTransaction::FLAG_STATE_FAILED = 0x0010;
const uint8_t NxsTransaction::FLAG_STATE_WAITING_CONFIRM = 0x0020;
NxsTransaction::NxsTransaction()
: mFlag(0), mTimeOut(0), mTransaction(NULL) {
}
NxsTransaction::~NxsTransaction(){
std::list<RsNxsItem*>::iterator lit = mItems.begin();
for(; lit != mItems.end(); lit++)
{
delete *lit;
*lit = NULL;
}
delete mTransaction;
mTransaction = NULL;
}
/* Net Manager */
RsNxsNetMgrImpl::RsNxsNetMgrImpl(p3LinkMgr *lMgr)
: mLinkMgr(lMgr), mNxsNetMgrMtx("RsNxsNetMgrImpl")
{
}
std::string RsNxsNetMgrImpl::getOwnId()
{
RsStackMutex stack(mNxsNetMgrMtx);
return mLinkMgr->getOwnId();
}
void RsNxsNetMgrImpl::getOnlineList(std::set<std::string> &ssl_peers)
{
ssl_peers.clear();
std::list<std::string> pList;
{
RsStackMutex stack(mNxsNetMgrMtx);
mLinkMgr->getOnlineList(pList);
}
std::list<std::string>::const_iterator lit = pList.begin();
for(; lit != pList.end(); lit++)
ssl_peers.insert(*lit);
}

View file

@ -34,79 +34,9 @@
#include "rsnxsobserver.h"
#include "pqi/p3linkmgr.h"
#include "serialiser/rsnxsitems.h"
#include "rsgxsnetutils.h"
#include "pqi/p3cfgmgr.h"
/*!
* This represents a transaction made
* with the NxsNetService in all states
* of operation until completion
*
*/
class NxsTransaction
{
public:
static const uint8_t FLAG_STATE_STARTING; // when
static const uint8_t FLAG_STATE_RECEIVING; // begin receiving items for incoming trans
static const uint8_t FLAG_STATE_SENDING; // begin sending items for outgoing trans
static const uint8_t FLAG_STATE_COMPLETED;
static const uint8_t FLAG_STATE_FAILED;
static const uint8_t FLAG_STATE_WAITING_CONFIRM;
NxsTransaction();
~NxsTransaction();
uint32_t mFlag; // current state of transaction
uint32_t mTimeOut;
/*!
* this contains:
*  - what peer this transaction involves
*  - the type of transaction
*  - the transaction id
*  - the timeout set for this transaction
*  - and the item count
*/
RsNxsTransac* mTransaction;
std::list<RsNxsItem*> mItems; // items received or sent
};
/*!
* An abstraction of the net manager
* for retrieving the Rs peers with whom you will be synchronising
* and also your own Id
* Useful for testing also (abstracts away Rs's p3NetMgr)
*/
class RsNxsNetMgr
{
public:
virtual std::string getOwnId() = 0;
virtual void getOnlineList(std::set<std::string>& ssl_peers) = 0;
};
class RsNxsNetMgrImpl : public RsNxsNetMgr
{
public:
RsNxsNetMgrImpl(p3LinkMgr* lMgr);
std::string getOwnId();
void getOnlineList(std::set<std::string>& ssl_peers);
private:
p3LinkMgr* mLinkMgr;
RsMutex mNxsNetMgrMtx;
};
#include "rsgixs.h"
/// keep track of transaction number
typedef std::map<uint32_t, NxsTransaction*> TransactionIdMap;
@ -139,9 +69,11 @@ public:
* @param servType service type
* @param gds The data service which allows read access to a service/store
* @param nxsObs observer will be notified whenever new messages/grps
* arrive
*/
RsGxsNetService(uint16_t servType, RsGeneralDataService* gds, RsNxsNetMgr* netMgr, RsNxsObserver* nxsObs = NULL);
RsGxsNetService(uint16_t servType, RsGeneralDataService* gds, RsNxsNetMgr* netMgr,
RsNxsObserver* nxsObs = NULL, RsGixsReputation* reputations = NULL, RsGcxs* circles = NULL);
virtual ~RsGxsNetService();
@ -361,7 +293,34 @@ private:
/** E: item handlers **/
void runVetting();
/*!
* @param sslId The peer to vet to see if they can receive this group id
* @param grpMeta this is the meta item to determine if it can be sent to given peer
* @param toVet groupid/peer to vet are stored here if their circle id is not cached
* @return false, if you cannot send to this peer, true otherwise
*/
bool canSendGrpId(const std::string& sslId, RsGxsGrpMetaData& grpMeta, std::vector<GrpIdCircleVet>& toVet);
bool canSendMsgIds(const std::vector<RsGxsMsgMetaData*>& msgMetas, const RsGxsGrpMetaData&, const std::string& sslId);
void locked_createTransactionFromPending(MsgRespPending* grpPend);
void locked_createTransactionFromPending(GrpRespPending* msgPend);
void locked_createTransactionFromPending(GrpCircleIdRequestVetting* grpPend);
void locked_createTransactionFromPending(MsgCircleIdsRequestVetting* grpPend);
void locked_pushMsgTransactionFromList(std::list<RsNxsItem*>& reqList, const std::string& peerId, const uint32_t& transN);
void locked_pushGrpTransactionFromList(std::list<RsNxsItem*>& reqList, const std::string& peerId, const uint32_t& transN);
void locked_pushGrpRespFromList(std::list<RsNxsItem*>& respList, const std::string& peer, const uint32_t& transN);
void locked_pushMsgRespFromList(std::list<RsNxsItem*>& itemL, const std::string& sslId, const uint32_t& transN);
void syncWithPeers();
void addGroupItemToList(NxsTransaction*& tr,
const std::string& grpId, uint32_t& transN,
std::list<RsNxsItem*>& reqList);
bool locked_canReceive(const RsGxsGrpMetaData * const grpMeta, const std::string& peerId);
private:
@ -457,6 +416,12 @@ private:
const uint32_t mSYNC_PERIOD;
RsGcxs* mCircles;
RsGixsReputation* mReputations;
// need to be verified
std::vector<AuthorPending*> mPendingResp;
std::vector<GrpCircleVetting*> mPendingCircleVets;
};
#endif // RSGXSNETSERVICE_H

View file

@ -0,0 +1,302 @@
/*
* libretroshare/src/gxs: rsgxsnetutils.cc
*
* Helper objects for the operation of rsgxsnetservice
*
* Copyright 2012-2013 by Christopher Evi-Parker
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License Version 2 as published by the Free Software Foundation.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
* USA.
*
* Please report all bugs and problems to "retroshare@lunamutt.com".
*
*/
#include "rsgxsnetutils.h"
#include "retroshare/rspeers.h"
const time_t AuthorPending::EXPIRY_PERIOD_OFFSET = 30; // 30 seconds
const int AuthorPending::MSG_PEND = 1;
const int AuthorPending::GRP_PEND = 2;
AuthorPending::AuthorPending(RsGixsReputation* rep, time_t timeStamp)
: mRep(rep), mTimeStamp(timeStamp) {
}
AuthorPending::~AuthorPending()
{
}
bool AuthorPending::expired() const
{
return time(NULL) > (mTimeStamp + EXPIRY_PERIOD_OFFSET);
}
bool AuthorPending::getAuthorRep(GixsReputation& rep,
const std::string& authorId)
{
if(mRep->haveReputation(authorId))
{
return mRep->getReputation(authorId, rep);
}
mRep->loadReputation(authorId);
return false;
}
MsgAuthEntry::MsgAuthEntry()
: mPassedVetting(false) {}
GrpAuthEntry::GrpAuthEntry()
: mPassedVetting(false) {}
MsgRespPending::MsgRespPending(RsGixsReputation* rep, const std::string& peerId, const MsgAuthorV& msgAuthV, int cutOff)
: AuthorPending(rep, time(NULL)), mPeerId(peerId), mMsgAuthV(msgAuthV), mCutOff(cutOff)
{
}
GrpRespPending::GrpRespPending(RsGixsReputation* rep, const std::string& peerId, const GrpAuthorV& grpAuthV, int cutOff)
: AuthorPending(rep, time(NULL)), mPeerId(peerId), mGrpAuthV(grpAuthV), mCutOff(cutOff)
{
}
int MsgRespPending::getType() const
{
return MSG_PEND;
}
bool MsgRespPending::accepted()
{
MsgAuthorV::iterator cit = mMsgAuthV.begin();
MsgAuthorV::size_type count = 0;
for(; cit != mMsgAuthV.end(); cit++)
{
MsgAuthEntry& entry = *cit;
if(!entry.mPassedVetting)
{
GixsReputation rep;
if(getAuthorRep(rep, entry.mAuthorId))
{
if(rep.score > mCutOff)
{
entry.mPassedVetting = true;
count++;
}
}
}else
{
count++;
}
}
return count == mMsgAuthV.size();
}
int GrpRespPending::getType() const
{
return GRP_PEND;
}
bool GrpRespPending::accepted()
{
GrpAuthorV::iterator cit = mGrpAuthV.begin();
GrpAuthorV::size_type count = 0;
for(; cit != mGrpAuthV.end(); cit++)
{
GrpAuthEntry& entry = *cit;
if(!entry.mPassedVetting)
{
GixsReputation rep;
if(getAuthorRep(rep, entry.mAuthorId))
{
if(rep.score > mCutOff)
{
entry.mPassedVetting = true;
count++;
}
}
}else
{
count++;
}
}
return count == mGrpAuthV.size();
}
/** NxsTransaction definition **/
const uint8_t NxsTransaction::FLAG_STATE_STARTING = 0x0001; // when
const uint8_t NxsTransaction::FLAG_STATE_RECEIVING = 0x0002; // begin receiving items for incoming trans
const uint8_t NxsTransaction::FLAG_STATE_SENDING = 0x0004; // begin sending items for outgoing trans
const uint8_t NxsTransaction::FLAG_STATE_COMPLETED = 0x008;
const uint8_t NxsTransaction::FLAG_STATE_FAILED = 0x0010;
const uint8_t NxsTransaction::FLAG_STATE_WAITING_CONFIRM = 0x0020;
NxsTransaction::NxsTransaction()
: mFlag(0), mTimeOut(0), mTransaction(NULL) {
}
NxsTransaction::~NxsTransaction(){
std::list<RsNxsItem*>::iterator lit = mItems.begin();
for(; lit != mItems.end(); lit++)
{
delete *lit;
*lit = NULL;
}
if(mTransaction)
delete mTransaction;
mTransaction = NULL;
}
/* Net Manager */
RsNxsNetMgrImpl::RsNxsNetMgrImpl(p3LinkMgr *lMgr)
: mLinkMgr(lMgr), mNxsNetMgrMtx("RsNxsNetMgrImpl")
{
}
std::string RsNxsNetMgrImpl::getOwnId()
{
RsStackMutex stack(mNxsNetMgrMtx);
return mLinkMgr->getOwnId();
}
void RsNxsNetMgrImpl::getOnlineList(std::set<std::string> &ssl_peers)
{
ssl_peers.clear();
std::list<std::string> pList;
{
RsStackMutex stack(mNxsNetMgrMtx);
mLinkMgr->getOnlineList(pList);
}
std::list<std::string>::const_iterator lit = pList.begin();
for(; lit != pList.end(); lit++)
ssl_peers.insert(*lit);
}
const time_t GrpCircleVetting::EXPIRY_PERIOD_OFFSET = 5; // 5 seconds
const int GrpCircleVetting::GRP_ID_PEND = 1;
const int GrpCircleVetting::GRP_ITEM_PEND = 2;
const int GrpCircleVetting::MSG_ID_SEND_PEND = 3;
const int GrpCircleVetting::MSG_ID_RECV_PEND = 4;
GrpIdCircleVet::GrpIdCircleVet(const RsGxsGroupId& grpId, const RsGxsCircleId& circleId)
: mGroupId(grpId), mCircleId(circleId), mCleared(false) {}
GrpCircleVetting::GrpCircleVetting(RsGcxs* const circles)
: mCircles(circles) {}
GrpCircleVetting::~GrpCircleVetting() {}
bool GrpCircleVetting::expired()
{
return time(NULL) > (mTimeStamp + EXPIRY_PERIOD_OFFSET);
}
bool GrpCircleVetting::canSend(const RsPgpId& peerId, const RsGxsCircleId& circleId)
{
if(mCircles->isLoaded(circleId))
{
const RsPgpId& pgpId = rsPeers->getGPGId(peerId);
return mCircles->canSend(circleId, pgpId);
}
mCircles->loadCircle(circleId);
return false;
}
GrpCircleIdRequestVetting::GrpCircleIdRequestVetting(
RsGcxs* const circles, std::vector<GrpIdCircleVet> grpCircleV, const std::string& peerId)
: GrpCircleVetting(circles), mGrpCircleV(grpCircleV), mPeerId(peerId) {}
bool GrpCircleIdRequestVetting::cleared()
{
std::vector<GrpIdCircleVet>::size_type i, count;
for(i = 0; i < mGrpCircleV.size(); i++)
{
GrpIdCircleVet& gic = mGrpCircleV[i];
if(!gic.mCleared)
{
if(canSend(mPeerId, gic.mCircleId))
{
gic.mCleared = true;
count++;
}
}
else
{
count++;
}
}
return count == mGrpCircleV.size();
}
int GrpCircleIdRequestVetting::getType() const
{
return GRP_ID_PEND;
}
MsgIdCircleVet::MsgIdCircleVet(const RsGxsMessageId& msgId,
const std::string& authorId)
: mMsgId(msgId), mAuthorId(authorId) {
}
MsgCircleIdsRequestVetting::MsgCircleIdsRequestVetting(RsGcxs* const circles,
std::vector<MsgIdCircleVet> msgs, const RsGxsGroupId& grpId,
const std::string& peerId, const RsGxsCircleId& circleId)
: GrpCircleVetting(circles), mMsgs(msgs), mGrpId(grpId), mPeerId(peerId), mCircleId(circleId) {}
bool MsgCircleIdsRequestVetting::cleared()
{
return canSend(mPeerId, mCircleId);
}
int MsgCircleIdsRequestVetting::getType() const
{
return MSG_ID_SEND_PEND;
}

View file

@ -0,0 +1,285 @@
/*
* libretroshare/src/gxs: rsgxsnetutils.h
*
* Helper objects for the operation of rsgxsnetservice
*
* Copyright 2012-2013 by Christopher Evi-Parker
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License Version 2 as published by the Free Software Foundation.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
* USA.
*
* Please report all bugs and problems to "retroshare@lunamutt.com".
*
*/
#ifndef RSGXSNETUTILS_H_
#define RSGXSNETUTILS_H_
#include <stdlib.h>
#include "retroshare/rsgxsifacetypes.h"
#include "pqi/p3linkmgr.h"
#include "serialiser/rsnxsitems.h"
#include "rsgixs.h"
/*!
* This represents a transaction made
* with the NxsNetService in all states
* of operation until completion
*
*/
class NxsTransaction
{
public:
static const uint8_t FLAG_STATE_STARTING; // transaction initiated, sending/receiving not yet begun
static const uint8_t FLAG_STATE_RECEIVING; // begin receiving items for incoming trans
static const uint8_t FLAG_STATE_SENDING; // begin sending items for outgoing trans
static const uint8_t FLAG_STATE_COMPLETED;
static const uint8_t FLAG_STATE_FAILED;
static const uint8_t FLAG_STATE_WAITING_CONFIRM;
NxsTransaction();
~NxsTransaction();
uint32_t mFlag; // current state of transaction
uint32_t mTimeOut;
/*!
* The transaction item itself; it records
* what peer this transaction involves,
* the type of transaction,
* the transaction id,
* the timeout set for this transaction,
* and the item count
*/
RsNxsTransac* mTransaction;
std::list<RsNxsItem*> mItems; // items received or sent
};
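/* Illustrative lifecycle (an interpretation of the flag names above; the calling
 * code is not part of this hunk, and TRANSAC_TIMEOUT is a placeholder name):
 *
 *   NxsTransaction* tr = new NxsTransaction();
 *   tr->mTransaction = transacItem;                    // RsNxsTransac describing the exchange
 *   tr->mFlag = NxsTransaction::FLAG_STATE_STARTING;
 *   tr->mTimeOut = time(NULL) + TRANSAC_TIMEOUT;
 *   // then FLAG_STATE_RECEIVING or FLAG_STATE_SENDING while tr->mItems is filled or drained,
 *   // and finally FLAG_STATE_COMPLETED, FLAG_STATE_WAITING_CONFIRM or FLAG_STATE_FAILED
 */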
/*!
* An abstraction of the net manager,
* used to retrieve the RetroShare peers you will be synchronising with
* and also your own Id.
* Also useful for testing (abstracts away RetroShare's p3LinkMgr)
*/
class RsNxsNetMgr
{
public:
virtual ~RsNxsNetMgr(){};
virtual std::string getOwnId() = 0;
virtual void getOnlineList(std::set<std::string>& ssl_peers) = 0;
};
class RsNxsNetMgrImpl : public RsNxsNetMgr
{
public:
RsNxsNetMgrImpl(p3LinkMgr* lMgr);
virtual ~RsNxsNetMgrImpl(){};
std::string getOwnId();
void getOnlineList(std::set<std::string>& ssl_peers);
private:
p3LinkMgr* mLinkMgr;
RsMutex mNxsNetMgrMtx;
};
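/* Since this interface exists partly to make testing possible, a minimal stub
 * implementation might look like the following (a sketch only, not part of these sources):
 *
 * class RsNxsNetDummyMgr : public RsNxsNetMgr
 * {
 * public:
 *     RsNxsNetDummyMgr(const std::string& ownId, const std::set<std::string>& peers)
 *         : mOwnId(ownId), mPeers(peers) {}
 *     std::string getOwnId() { return mOwnId; }
 *     void getOnlineList(std::set<std::string>& ssl_peers) { ssl_peers = mPeers; }
 * private:
 *     std::string mOwnId;
 *     std::set<std::string> mPeers;
 * };
 */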
/*!
* Abstract base class representing
* items waiting to be vetted
* on account of their author Id
*/
class AuthorPending
{
public:
static const int MSG_PEND;
static const int GRP_PEND;
static const time_t EXPIRY_PERIOD_OFFSET;
AuthorPending(RsGixsReputation* rep, time_t timeStamp);
virtual ~AuthorPending();
virtual int getType() const = 0;
/*!
* @return true if all authors pass vetting and their messages
* should be requested
*/
virtual bool accepted() = 0;
/*!
* @return true if this entry is past its set expiry date
*/
bool expired() const;
protected:
/*!
* Convenience function to get an author's reputation
* @param rep the retrieved reputation is written here
* @param authorId the author whose reputation to get
* @return true if the reputation was successfully retrieved
*/
bool getAuthorRep(GixsReputation& rep, const std::string& authorId);
private:
RsGixsReputation* mRep;
time_t mTimeStamp;
};
class MsgAuthEntry
{
public:
MsgAuthEntry();
RsGxsMessageId mMsgId;
RsGxsGroupId mGrpId;
std::string mAuthorId;
bool mPassedVetting;
};
class GrpAuthEntry
{
public:
GrpAuthEntry();
RsGxsGroupId mGrpId;
std::string mAuthorId;
bool mPassedVetting;
};
typedef std::vector<MsgAuthEntry> MsgAuthorV;
typedef std::vector<GrpAuthEntry> GrpAuthorV;
class MsgRespPending : public AuthorPending
{
public:
MsgRespPending(RsGixsReputation* rep, const std::string& peerId, const MsgAuthorV& msgAuthV, int cutOff = 0);
int getType() const;
bool accepted();
std::string mPeerId;
MsgAuthorV mMsgAuthV;
int mCutOff;
};
class GrpRespPending : public AuthorPending
{
public:
GrpRespPending(RsGixsReputation* rep, const std::string& peerId, const GrpAuthorV& grpAuthV, int cutOff = 0);
int getType() const;
bool accepted();
std::string mPeerId;
GrpAuthorV mGrpAuthV;
int mCutOff;
};
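/* Illustrative use of the response-vetting classes (a sketch; how the net service
 * actually queues these objects is not shown in this hunk):
 *
 *   GrpAuthorV authors;
 *   GrpAuthEntry entry;
 *   entry.mGrpId = grpId; entry.mAuthorId = authorId; entry.mPassedVetting = false;
 *   authors.push_back(entry);
 *   AuthorPending* pending = new GrpRespPending(reputations, peerId, authors);
 *   // from the service tick: request the group ids once pending->accepted() returns true,
 *   // or drop the entry once pending->expired()
 */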
///////////////
class GrpIdCircleVet
{
public:
GrpIdCircleVet(const RsGxsGroupId& grpId, const RsGxsCircleId& circleId);
RsGxsGroupId mGroupId;
RsGxsCircleId mCircleId;
bool mCleared;
};
class MsgIdCircleVet
{
public:
MsgIdCircleVet(const RsGxsMessageId& msgId, const std::string& authorId);
RsGxsMessageId mMsgId;
std::string mAuthorId;
};
class GrpItemCircleVet
{
public:
RsNxsGrp* grpItem;
RsGxsCircleId mCircleId;
bool mCleared;
};
class GrpCircleVetting
{
public:
static const time_t EXPIRY_PERIOD_OFFSET;
static const int GRP_ID_PEND;
static const int GRP_ITEM_PEND;
static const int MSG_ID_SEND_PEND;
static const int MSG_ID_RECV_PEND;
GrpCircleVetting(RsGcxs* const circles);
virtual ~GrpCircleVetting();
bool expired();
virtual int getType() const = 0;
virtual bool cleared() = 0;
protected:
bool canSend(const RsPgpId& peerId, const RsGxsCircleId& circleId);
private:
RsGcxs* const mCircles;
time_t mTimeStamp;
};
class GrpCircleIdRequestVetting : public GrpCircleVetting
{
public:
GrpCircleIdRequestVetting(RsGcxs* const circles, std::vector<GrpIdCircleVet> grpCircleV, const std::string& peerId);
bool cleared();
int getType() const;
std::vector<GrpIdCircleVet> mGrpCircleV;
std::string mPeerId;
};
class MsgCircleIdsRequestVetting : public GrpCircleVetting
{
public:
MsgCircleIdsRequestVetting(RsGcxs* const circles, std::vector<MsgIdCircleVet> msgs, const RsGxsGroupId& grpId,
const std::string& peerId, const RsGxsCircleId& circleId);
bool cleared();
int getType() const;
std::vector<MsgIdCircleVet> mMsgs;
RsGxsGroupId mGrpId;
std::string mPeerId;
RsGxsCircleId mCircleId;
};
#endif /* RSGXSNETUTILS_H_ */

View file

@ -25,6 +25,7 @@
#include "rsgxsutil.h"
#include "retroshare/rsgxsflags.h"
#include "pqi/pqihash.h"
RsGxsMessageCleanUp::RsGxsMessageCleanUp(RsGeneralDataService* const dataService, uint32_t messageStorePeriod, uint32_t chunkSize)
@ -102,3 +103,79 @@ bool RsGxsMessageCleanUp::clean()
return mGrpMeta.empty();
}
RsGxsIntegrityCheck::RsGxsIntegrityCheck(
RsGeneralDataService* const dataService) :
mDs(dataService), mDone(false), mIntegrityMutex("integrity")
{ }
void RsGxsIntegrityCheck::run()
{
check();
}
bool RsGxsIntegrityCheck::check()
{
// first take out all the groups
std::map<RsGxsGroupId, RsNxsGrp*> grp;
mDs->retrieveNxsGrps(grp, true, true);
std::vector<RsGxsGroupId> grpsToDel;
// compute hash and compare to stored value, if it fails then simply add it
// to list
std::map<RsGxsGroupId, RsNxsGrp*>::iterator git = grp.begin();
for(; git != grp.end(); git++)
{
RsNxsGrp* nxsGrp = git->second;
std::string currHash;
pqihash pHash;
pHash.addData(nxsGrp->grp.bin_data, nxsGrp->grp.bin_len);
pHash.Complete(currHash);
if(currHash != nxsGrp->metaData->mHash) grpsToDel.push_back(nxsGrp->grpId);
delete nxsGrp;
}
mDs->removeGroups(grpsToDel);
// now messages
GxsMsgReq msgsToDel;
GxsMsgResult msgs;
mDs->retrieveNxsMsgs(msgsToDel, msgs, false, true);
GxsMsgResult::iterator mit = msgs.begin();
for(; mit != msgs.end(); mit++)
{
std::vector<RsNxsMsg*>& msgV = mit->second;
std::vector<RsNxsMsg*>::iterator vit = msgV.begin();
for(; vit != msgV.end(); vit++)
{
RsNxsMsg* msg = *vit;
std::string currHash;
pqihash pHash;
pHash.addData(msg->msg.bin_data, msg->msg.bin_len);
pHash.Complete(currHash);
if(currHash != msg->metaData->mHash) msgsToDel[msg->grpId].push_back(msg->msgId);
delete msg;
}
}
mDs->removeMsgs(msgsToDel);
RsStackMutex stack(mIntegrityMutex);
mDone = true;
return true;
}
bool RsGxsIntegrityCheck::isDone()
{
RsStackMutex stack(mIntegrityMutex);
return mDone;
}
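The integrity check is designed to run on its own thread and be polled by its owner; a rough sketch of that usage follows (the owning service and the exact RsThread start call are not shown in this hunk, so the surrounding names are assumptions):

RsGxsIntegrityCheck* integrity = new RsGxsIntegrityCheck(dataService);
// launch via the usual RsThread start mechanism (not shown in this hunk);
// the thread entry point run() simply calls check()

// later, e.g. from the owning service's tick():
if(integrity && integrity->isDone())
{
	// check() has finished removing groups/messages whose computed hash no longer matches
	delete integrity;
	integrity = NULL;
}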

View file

@ -82,7 +82,7 @@ public:
bool clean();
/*!
* TODO: Rather manual progressions consider running through a thread
* TODO: Rather than manual progressions consider running through a thread
*/
void run(){}
@ -93,5 +93,40 @@ private:
std::vector<RsGxsGrpMetaData*> mGrpMeta;
};
/*!
* Checks the integrity of messages and groups
* held in RsDataService by comparing computed hashes
* against the stored hash meta data
*/
class RsGxsIntegrityCheck : public RsThread
{
enum CheckState { CheckStart, CheckChecking };
public:
/*!
* @param dataService the data service whose groups and messages will be checked
*/
RsGxsIntegrityCheck(RsGeneralDataService* const dataService);
bool check();
bool isDone();
void run();
private:
RsGeneralDataService* const mDs;
std::vector<RsNxsItem*> mItems;
bool mDone;
RsMutex mIntegrityMutex;
};
#endif /* GXSUTIL_H_ */