merging rs_gxs-finale to v0.6 branch

git-svn-id: http://svn.code.sf.net/p/retroshare/code/branches/v0.6-initdev@6953 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
chrisparker126 2013-12-21 18:20:47 +00:00
commit 39909b705f
67 changed files with 5686 additions and 488 deletions

View file

@ -38,7 +38,7 @@ GxsSecurity::~GxsSecurity()
{
}
RSA *GxsSecurity::extractPublicKey(RsTlvSecurityKey& key)
RSA *GxsSecurity::extractPublicKey(const RsTlvSecurityKey& key)
{
const unsigned char *keyptr = (const unsigned char *) key.keyData.bin_data;
long keylen = key.keyData.bin_len;
@ -489,7 +489,7 @@ void GxsSecurity::setRSAPrivateKey(RsTlvSecurityKey & key, RSA *rsa_priv)
key.keyId = keyId;
}
RSA *GxsSecurity::extractPrivateKey(RsTlvSecurityKey & key)
RSA *GxsSecurity::extractPrivateKey(const RsTlvSecurityKey & key)
{
const unsigned char *keyptr = (const unsigned char *) key.keyData.bin_data;
long keylen = key.keyData.bin_len;

View file

@ -51,14 +51,14 @@ public:
* @param key RsTlvSecurityKey to extract public RSA key from
* @return pointer to the public RSA key if successful, null otherwise
*/
static RSA *extractPublicKey(RsTlvSecurityKey &key);
static RSA *extractPublicKey(const RsTlvSecurityKey &key);
/*!
* extracts the public key from an RsTlvSecurityKey
* @param key RsTlvSecurityKey to extract private RSA key from
* @return pointer to the private RSA key if successful, null otherwise
*/
static RSA *extractPrivateKey(RsTlvSecurityKey &key);
static RSA *extractPrivateKey(const RsTlvSecurityKey &key);
/*!
* stores the rsa public key in a RsTlvSecurityKey

View file

@ -34,6 +34,8 @@
#define MSG_TABLE_NAME std::string("MESSAGES")
#define GRP_TABLE_NAME std::string("GROUPS")
#define GRP_LAST_POST_UPDATE_TRIGGER std::string("LAST_POST_UPDATE")
// generic
#define KEY_NXS_FILE std::string("nxsFile")
@ -48,6 +50,7 @@
#define KEY_NXS_META std::string("meta")
#define KEY_NXS_SERV_STRING std::string("serv_str")
#define KEY_NXS_HASH std::string("hash")
#define KEY_RECV_TS std::string("recv_time_stamp")
// grp table columns
@ -111,6 +114,7 @@
#define COL_GRP_INTERN_CIRCLE 18
#define COL_GRP_ORIGINATOR 19
#define COL_GRP_AUTHEN_FLAGS 20
#define COL_GRP_RECV_TS 21
// msg col numbers
@ -122,6 +126,7 @@
#define COL_THREAD_ID 11
#define COL_MSG_NAME 12
#define COL_MSG_SERV_STRING 13
#define COL_MSG_RECV_TS 14
// generic meta shared col numbers
#define COL_GRP_ID 0
@ -154,7 +159,7 @@ RsDataService::RsDataService(const std::string &serviceDir, const std::string &d
msgMetaColumns.push_back(KEY_SIGN_SET); msgMetaColumns.push_back(KEY_NXS_IDENTITY); msgMetaColumns.push_back(KEY_NXS_HASH);
msgMetaColumns.push_back(KEY_MSG_ID); msgMetaColumns.push_back(KEY_ORIG_MSG_ID); msgMetaColumns.push_back(KEY_MSG_STATUS);
msgMetaColumns.push_back(KEY_CHILD_TS); msgMetaColumns.push_back(KEY_MSG_PARENT_ID); msgMetaColumns.push_back(KEY_MSG_THREAD_ID);
msgMetaColumns.push_back(KEY_MSG_NAME); msgMetaColumns.push_back(KEY_NXS_SERV_STRING);
msgMetaColumns.push_back(KEY_MSG_NAME); msgMetaColumns.push_back(KEY_NXS_SERV_STRING); msgMetaColumns.push_back(KEY_RECV_TS);
// for retrieving actual data
msgColumns.push_back(KEY_GRP_ID); msgColumns.push_back(KEY_NXS_FILE); msgColumns.push_back(KEY_NXS_FILE_OFFSET);
@ -168,7 +173,7 @@ RsDataService::RsDataService(const std::string &serviceDir, const std::string &d
grpMetaColumns.push_back(KEY_GRP_LAST_POST); grpMetaColumns.push_back(KEY_ORIG_GRP_ID); grpMetaColumns.push_back(KEY_NXS_SERV_STRING);
grpMetaColumns.push_back(KEY_GRP_SIGN_FLAGS); grpMetaColumns.push_back(KEY_GRP_CIRCLE_ID); grpMetaColumns.push_back(KEY_GRP_CIRCLE_TYPE);
grpMetaColumns.push_back(KEY_GRP_INTERNAL_CIRCLE); grpMetaColumns.push_back(KEY_GRP_ORIGINATOR);
grpMetaColumns.push_back(KEY_GRP_AUTHEN_FLAGS);
grpMetaColumns.push_back(KEY_GRP_AUTHEN_FLAGS); grpMetaColumns.push_back(KEY_RECV_TS);
// for retrieving actual grp data
grpColumns.push_back(KEY_GRP_ID); grpColumns.push_back(KEY_NXS_FILE); grpColumns.push_back(KEY_NXS_FILE_OFFSET);
@ -177,6 +182,8 @@ RsDataService::RsDataService(const std::string &serviceDir, const std::string &d
// for retrieving msg offsets
mMsgOffSetColumns.push_back(KEY_MSG_ID); mMsgOffSetColumns.push_back(KEY_NXS_FILE_OFFSET);
mMsgOffSetColumns.push_back(KEY_NXS_FILE_LEN);
grpIdColumn.push_back(KEY_GRP_ID);
}
RsDataService::~RsDataService(){
@ -210,6 +217,7 @@ void RsDataService::initialise(){
KEY_MSG_NAME + " TEXT," +
KEY_NXS_SERV_STRING + " TEXT," +
KEY_NXS_HASH + " TEXT," +
KEY_RECV_TS + " INT," +
KEY_NXS_FILE_LEN + " INT);");
// create table for grp data
@ -238,8 +246,15 @@ void RsDataService::initialise(){
KEY_GRP_INTERNAL_CIRCLE + " TEXT," +
KEY_GRP_ORIGINATOR + " TEXT," +
KEY_NXS_HASH + " TEXT," +
KEY_RECV_TS + " INT," +
KEY_SIGN_SET + " BLOB);");
mDb->execSQL("CREATE TRIGGER " + GRP_LAST_POST_UPDATE_TRIGGER +
" INSERT ON " + MSG_TABLE_NAME +
std::string(" BEGIN ") +
" UPDATE " + GRP_TABLE_NAME + " SET " + KEY_GRP_LAST_POST + "= new."
+ KEY_RECV_TS + " WHERE " + KEY_GRP_ID + "=new." + KEY_GRP_ID + ";"
+ std::string("END;"));
}
RsGxsGrpMetaData* RsDataService::locked_getGrpMeta(RetroCursor &c)
@ -291,6 +306,7 @@ RsGxsGrpMetaData* RsDataService::locked_getGrpMeta(RetroCursor &c)
c.getString(COL_GRP_INTERN_CIRCLE, grpMeta->mInternalCircle);
c.getString(COL_GRP_ORIGINATOR, grpMeta->mOriginator);
grpMeta->mAuthenFlags = c.getInt32(COL_GRP_AUTHEN_FLAGS);
grpMeta->mRecvTS = c.getInt32(COL_GRP_RECV_TS);
if(ok)
@ -377,6 +393,7 @@ RsGxsMsgMetaData* RsDataService::locked_getMsgMeta(RetroCursor &c)
c.getString(COL_MSG_NAME, msgMeta->mMsgName);
c.getString(COL_MSG_SERV_STRING, msgMeta->mServiceString);
c.getString(COL_HASH, msgMeta->mHash);
msgMeta->recvTS = c.getInt32(COL_MSG_RECV_TS);
offset = 0;
data = (char*)c.getData(COL_SIGN_SET, data_len);
@ -489,6 +506,7 @@ int RsDataService::storeMessage(std::map<RsNxsMsg *, RsGxsMsgMetaData *> &msg)
cv.put(KEY_GRP_ID, msgMetaPtr->mGroupId);
cv.put(KEY_NXS_SERV_STRING, msgMetaPtr->mServiceString);
cv.put(KEY_NXS_HASH, msgMetaPtr->mHash);
cv.put(KEY_RECV_TS, (int32_t)msgMetaPtr->recvTS);
char signSetData[msgMetaPtr->signSet.TlvSize()];
@ -596,6 +614,7 @@ int RsDataService::storeGroup(std::map<RsNxsGrp *, RsGxsGrpMetaData *> &grp)
cv.put(KEY_GRP_ORIGINATOR, grpMetaPtr->mOriginator);
cv.put(KEY_GRP_AUTHEN_FLAGS, (int32_t)grpMetaPtr->mAuthenFlags);
cv.put(KEY_NXS_HASH, grpMetaPtr->mHash);
cv.put(KEY_RECV_TS, (int32_t)grpMetaPtr->mRecvTS);
if(! (grpMetaPtr->mAuthorId.empty()) ){
cv.put(KEY_NXS_IDENTITY, grpMetaPtr->mAuthorId);
@ -641,6 +660,98 @@ int RsDataService::storeGroup(std::map<RsNxsGrp *, RsGxsGrpMetaData *> &grp)
return ret;
}
int RsDataService::updateGroup(std::map<RsNxsGrp *, RsGxsGrpMetaData *> &grp)
{
RsStackMutex stack(mDbMutex);
std::map<RsNxsGrp*, RsGxsGrpMetaData* >::iterator sit = grp.begin();
// begin transaction
mDb->execSQL("BEGIN;");
for(; sit != grp.end(); sit++)
{
RsNxsGrp* grpPtr = sit->first;
RsGxsGrpMetaData* grpMetaPtr = sit->second;
// if data is larger than max item size do not add
if(!validSize(grpPtr)) continue;
std::string grpFile = mServiceDir + "/" + grpPtr->grpId;
std::ofstream ostrm(grpFile.c_str(), std::ios::binary | std::ios::trunc);
uint32_t offset = 0; // get file offset
/*!
* STORE file offset, file length, file name,
* grpId, flags, publish time stamp, identity,
* id signature, admin signatue, key set, last posting ts
* and meta data
**/
ContentValue cv;
cv.put(KEY_NXS_FILE_OFFSET, (int32_t)offset);
cv.put(KEY_NXS_FILE_LEN, (int32_t)grpPtr->grp.TlvSize());
cv.put(KEY_NXS_FILE, grpFile);
cv.put(KEY_GRP_ID, grpPtr->grpId);
cv.put(KEY_GRP_NAME, grpMetaPtr->mGroupName);
cv.put(KEY_ORIG_GRP_ID, grpMetaPtr->mOrigGrpId);
cv.put(KEY_NXS_SERV_STRING, grpMetaPtr->mServiceString);
cv.put(KEY_NXS_FLAGS, (int32_t)grpMetaPtr->mGroupFlags);
cv.put(KEY_TIME_STAMP, (int32_t)grpMetaPtr->mPublishTs);
cv.put(KEY_GRP_SIGN_FLAGS, (int32_t)grpMetaPtr->mSignFlags);
cv.put(KEY_GRP_CIRCLE_ID, grpMetaPtr->mCircleId);
cv.put(KEY_GRP_CIRCLE_TYPE, (int32_t)grpMetaPtr->mCircleType);
cv.put(KEY_GRP_INTERNAL_CIRCLE, grpMetaPtr->mInternalCircle);
cv.put(KEY_GRP_ORIGINATOR, grpMetaPtr->mOriginator);
cv.put(KEY_GRP_AUTHEN_FLAGS, (int32_t)grpMetaPtr->mAuthenFlags);
cv.put(KEY_NXS_HASH, grpMetaPtr->mHash);
if(! (grpMetaPtr->mAuthorId.empty()) ){
cv.put(KEY_NXS_IDENTITY, grpMetaPtr->mAuthorId);
}
offset = 0;
char keySetData[grpMetaPtr->keys.TlvSize()];
grpMetaPtr->keys.SetTlv(keySetData, grpMetaPtr->keys.TlvSize(), &offset);
cv.put(KEY_KEY_SET, grpMetaPtr->keys.TlvSize(), keySetData);
offset = 0;
char metaData[grpPtr->meta.TlvSize()];
grpPtr->meta.SetTlv(metaData, grpPtr->meta.TlvSize(), &offset);
cv.put(KEY_NXS_META, grpPtr->meta.TlvSize(), metaData);
// local meta data
cv.put(KEY_GRP_SUBCR_FLAG, (int32_t)grpMetaPtr->mSubscribeFlags);
cv.put(KEY_GRP_POP, (int32_t)grpMetaPtr->mPop);
cv.put(KEY_MSG_COUNT, (int32_t)grpMetaPtr->mMsgCount);
cv.put(KEY_GRP_STATUS, (int32_t)grpMetaPtr->mGroupStatus);
cv.put(KEY_GRP_LAST_POST, (int32_t)grpMetaPtr->mLastPost);
offset = 0;
char grpData[grpPtr->grp.TlvSize()];
grpPtr->grp.SetTlv(grpData, grpPtr->grp.TlvSize(), &offset);
ostrm.write(grpData, grpPtr->grp.TlvSize());
ostrm.close();
mDb->sqlUpdate(GRP_TABLE_NAME, "grpId='" + grpPtr->grpId + "'", cv);
}
// finish transaction
bool ret = mDb->execSQL("COMMIT;");
for(sit = grp.begin(); sit != grp.end(); sit++)
{
//TODO: API encourages aliasing, remove this abomination
if(sit->second != sit->first->metaData)
delete sit->second;
delete sit->first;
}
return ret;
}
bool RsDataService::validSize(RsNxsGrp* grp) const
{
if((grp->grp.TlvSize() + grp->meta.TlvSize()) <= GXS_MAX_ITEM_SIZE) return true;
@ -964,34 +1075,33 @@ int RsDataService::retrieveGxsGrpMetaData(std::map<RsGxsGroupId, RsGxsGrpMetaDat
}else
{
std::map<RsGxsGroupId, RsGxsGrpMetaData *>::iterator mit = grp.begin();
for(; mit != grp.end(); mit++)
{
const RsGxsGroupId& grpId = mit->first;
RetroCursor* c = mDb->sqlQuery(GRP_TABLE_NAME, grpMetaColumns, "grpId='" + grpId + "'", "");
for(; mit != grp.end(); mit++)
{
const RsGxsGroupId& grpId = mit->first;
RetroCursor* c = mDb->sqlQuery(GRP_TABLE_NAME, grpMetaColumns, "grpId='" + grpId + "'", "");
if(c)
{
bool valid = c->moveToFirst();
if(c)
{
bool valid = c->moveToFirst();
while(valid)
{
RsGxsGrpMetaData* g = locked_getGrpMeta(*c);
while(valid)
{
RsGxsGrpMetaData* g = locked_getGrpMeta(*c);
if(g)
{
grp[g->mGroupId] = g;
}
valid = c->moveToNext();
}
delete c;
}
if(g)
{
grp[g->mGroupId] = g;
}
valid = c->moveToNext();
}
delete c;
}
}
}
}
}
return 1;
@ -1010,21 +1120,22 @@ int RsDataService::resetDataStore()
std::map<std::string, RsNxsGrp*>::iterator mit
= grps.begin();
// remove all grp msgs files from service dir
for(; mit != grps.end(); mit++){
std::string file = mServiceDir + "/" + mit->first;
std::string msgFile = file + "-msgs";
remove(file.c_str()); // remove group file
remove(msgFile.c_str()); // and remove messages file
delete mit->second;
}
{
RsStackMutex stack(mDbMutex);
mDb->closeDb();
}
remove(mDbName.c_str()); // remove db file
// remove all grp msgs files from service dir
for(; mit != grps.end(); mit++){
std::string file = mServiceDir + "/" + mit->first;
std::string msgFile = file + "-msgs";
remove(file.c_str()); // remove group file
remove(msgFile.c_str()); // and remove messages file
delete mit->second;
}
mDb->execSQL("DROP TABLE " + MSG_TABLE_NAME);
mDb->execSQL("DROP TABLE " + GRP_TABLE_NAME);
mDb->execSQL("DROP TRIGGER " + GRP_LAST_POST_UPDATE_TRIGGER);
}
// recreate database
initialise();
@ -1177,6 +1288,31 @@ int RsDataService::removeGroups(const std::vector<std::string> &grpIds)
return 1;
}
int RsDataService::retrieveGroupIds(std::vector<std::string> &grpIds)
{
RsStackMutex stack(mDbMutex);
RetroCursor* c = mDb->sqlQuery(GRP_TABLE_NAME, grpIdColumn, "", "");
if(c)
{
bool valid = c->moveToFirst();
while(valid)
{
std::string grpId;
c->getString(0, grpId);
grpIds.push_back(grpId);
valid = c->moveToNext();
}
delete c;
}else
{
return 0;
}
return 1;
}
bool RsDataService::locked_updateMessageEntries(const MsgUpdates& updates)
{

View file

@ -106,6 +106,13 @@ public:
*/
int removeGroups(const std::vector<RsGxsGroupId>& grpIds);
/*!
* Retrieves all group ids in store
* @param grpIds all grpids in store is inserted into this vector
* @return error code
*/
int retrieveGroupIds(std::vector<std::string> &grpIds);
/*!
* @return the cache size set for this RsGeneralDataService in bytes
*/
@ -130,6 +137,13 @@ public:
*/
int storeGroup(std::map<RsNxsGrp*, RsGxsGrpMetaData*>& grp);
/*!
* Updates group entries in Db
* @param grp map of group and decoded meta data
* @return error code
*/
int updateGroup(std::map<RsNxsGrp*, RsGxsGrpMetaData*>& grsp);
/*!
* @param metaData The meta data item to update
* @return error code
@ -238,6 +252,7 @@ private:
std::list<std::string> grpColumns;
std::list<std::string> grpMetaColumns;
std::list<std::string> grpIdColumn;
std::string mServiceDir, mDbName;
uint16_t mServType;

View file

@ -169,6 +169,13 @@ public:
*/
virtual int removeGroups(const std::vector<RsGxsGroupId>& grpIds) = 0;
/*!
* Retrieves all group ids in store
* @param grpIds all grpids in store is inserted into this vector
* @return error code
*/
virtual int retrieveGroupIds(std::vector<RsGxsGroupId>& grpIds) = 0;
/*!
* @return the cache size set for this RsGeneralDataService in bytes
*/
@ -194,6 +201,13 @@ public:
virtual int storeGroup(std::map<RsNxsGrp*, RsGxsGrpMetaData*>& grsp) = 0;
/*!
* Updates group entries in Db
* @param grp map of group and decoded meta data
* @return error code
*/
virtual int updateGroup(std::map<RsNxsGrp*, RsGxsGrpMetaData*>& grsp) = 0;
/*!
* @param metaData
*/

View file

@ -113,6 +113,8 @@ void RsGenExchange::tick()
publishMsgs();
processGroupUpdatePublish();
processRecvdData();
if(!mNotifications.empty())
@ -273,6 +275,56 @@ void RsGenExchange::generateGroupKeys(RsTlvSecurityKeySet& privatekeySet,
}
}
void RsGenExchange::generatePublicFromPrivateKeys(const RsTlvSecurityKeySet &privatekeySet,
RsTlvSecurityKeySet &publickeySet)
{
// actually just copy settings of one key except mark its key flags public
typedef std::map<std::string, RsTlvSecurityKey> keyMap;
const keyMap& allKeys = privatekeySet.keys;
keyMap::const_iterator cit = allKeys.begin();
bool adminFound = false, publishFound = false;
for(; cit != allKeys.end(); cit++)
{
const RsTlvSecurityKey& privKey = cit->second;
if(privKey.keyFlags & RSTLV_KEY_TYPE_FULL)
{
RsTlvSecurityKey pubKey;
pubKey = privKey;
RSA *rsaPrivKey = NULL, *rsaPubKey = NULL;
rsaPrivKey = GxsSecurity::extractPrivateKey(privKey);
if(rsaPrivKey)
rsaPubKey = RSAPublicKey_dup(rsaPrivKey);
if(rsaPrivKey && rsaPubKey)
{
GxsSecurity::setRSAPublicKey(pubKey, rsaPubKey);
if(pubKey.keyFlags & RSTLV_KEY_DISTRIB_ADMIN)
pubKey.keyFlags = RSTLV_KEY_DISTRIB_ADMIN | RSTLV_KEY_TYPE_PUBLIC_ONLY;
if(pubKey.keyFlags & RSTLV_KEY_DISTRIB_PRIVATE)
pubKey.keyFlags = RSTLV_KEY_DISTRIB_PRIVATE | RSTLV_KEY_TYPE_PUBLIC_ONLY;
pubKey.endTS = pubKey.startTS + 60 * 60 * 24 * 365 * 5; /* approx 5 years */
publickeySet.keys.insert(std::make_pair(pubKey.keyId, pubKey));
}
if(rsaPrivKey)
RSA_free(rsaPrivKey);
if(rsaPubKey)
RSA_free(rsaPubKey);
}
}
}
uint8_t RsGenExchange::createGroup(RsNxsGrp *grp, RsTlvSecurityKeySet& privateKeySet, RsTlvSecurityKeySet& publicKeySet)
{
std::cerr << "RsGenExchange::createGroup()";
@ -763,6 +815,7 @@ int RsGenExchange::validateMsg(RsNxsMsg *msg, const uint32_t& grpFlag, RsTlvSecu
}else
{
std::list<std::string> peers;
peers.push_back(msg->PeerId());
mGixs->requestKey(metaData.mAuthorId, peers);
return VALIDATE_FAIL_TRY_LATER;
}
@ -836,6 +889,7 @@ int RsGenExchange::validateGrp(RsNxsGrp* grp, RsTlvSecurityKeySet& grpKeySet)
}else
{
std::list<std::string> peers;
peers.push_back(grp->PeerId());
mGixs->requestKey(metaData.mAuthorId, peers);
return VALIDATE_FAIL_TRY_LATER;
}
@ -1364,6 +1418,20 @@ void RsGenExchange::publishGroup(uint32_t& token, RsGxsGrpItem *grpItem)
}
void RsGenExchange::updateGroup(uint32_t& token, RsGxsGroupUpdateMeta& updateMeta, RsGxsGrpItem* grpItem)
{
RsStackMutex stack(mGenMtx);
token = mDataAccess->generatePublicToken();
mGroupUpdatePublish.push_back(GroupUpdatePublish(grpItem, updateMeta, token));
#ifdef GEN_EXCH_DEBUG
std::cerr << "RsGenExchange::updateGroup() token: " << token;
std::cerr << std::endl;
#endif
}
void RsGenExchange::publishMsg(uint32_t& token, RsGxsMsgItem *msgItem)
{
RsStackMutex stack(mGenMtx);
@ -1716,6 +1784,7 @@ void RsGenExchange::publishMsgs()
msg->metaData->mMsgStatus = GXS_SERV::GXS_MSG_STATUS_UNPROCESSED | GXS_SERV::GXS_MSG_STATUS_UNREAD;
msgId = msg->msgId;
grpId = msg->grpId;
msg->metaData->recvTS = time(NULL);
computeHash(msg->msg, msg->metaData->mHash);
mDataAccess->addMsgData(msg);
msgChangeMap[grpId].push_back(msgId);
@ -1780,6 +1849,115 @@ RsGenExchange::ServiceCreate_Return RsGenExchange::service_CreateGroup(RsGxsGrpI
#define PENDING_SIGN_TIMEOUT 10 // 5 seconds
void RsGenExchange::processGroupUpdatePublish()
{
RsStackMutex stack(mGenMtx);
// get keys for group update publish
// first build meta request map for groups to be updated
std::map<std::string, RsGxsGrpMetaData*> grpMeta;
std::vector<GroupUpdatePublish>::iterator vit = mGroupUpdatePublish.begin();
for(; vit != mGroupUpdatePublish.end(); vit++)
{
GroupUpdatePublish& gup = *vit;
const RsGxsGroupId& groupId = gup.grpItem->meta.mGroupId;
grpMeta.insert(std::make_pair(groupId, (RsGxsGrpMetaData*)(NULL)));
}
if(grpMeta.empty())
return;
mDataStore->retrieveGxsGrpMetaData(grpMeta);
// now
vit = mGroupUpdatePublish.begin();
for(; vit != mGroupUpdatePublish.end(); vit++)
{
GroupUpdatePublish& gup = *vit;
const RsGxsGroupId& groupId = gup.grpItem->meta.mGroupId;
std::map<std::string, RsGxsGrpMetaData*>::iterator mit = grpMeta.find(groupId);
RsGxsGrpMetaData* meta = NULL;
if(mit == grpMeta.end())
{
std::cerr << "Error! could not find meta of old group to update!" << std::endl;
mDataAccess->updatePublicRequestStatus(gup.mToken, RsTokenService::GXS_REQUEST_V2_STATUS_FAILED);
delete gup.grpItem;
continue;
}else
{
meta = mit->second;
}
gup.grpItem->meta = *meta;
assignMetaUpdates(gup.grpItem->meta, gup.mUpdateMeta);
GxsGrpPendingSign ggps(gup.grpItem, ggps.mToken);
bool publishAndAdminPrivatePresent = checkKeys(meta->keys);
if(publishAndAdminPrivatePresent)
{
ggps.mPrivateKeys = meta->keys;
generatePublicFromPrivateKeys(ggps.mPrivateKeys, ggps.mPublicKeys);
ggps.mHaveKeys = true;
ggps.mStartTS = time(NULL);
ggps.mLastAttemptTS = 0;
ggps.mIsUpdate = true;
ggps.mToken = gup.mToken;
mGrpsToPublish.push_back(ggps);
}else
{
delete gup.grpItem;
mDataAccess->updatePublicRequestStatus(gup.mToken, RsTokenService::GXS_REQUEST_V2_STATUS_FAILED);
}
delete meta;
}
mGroupUpdatePublish.clear();
}
void RsGenExchange::assignMetaUpdates(RsGroupMetaData& meta, const RsGxsGroupUpdateMeta metaUpdate) const
{
const RsGxsGroupUpdateMeta::GxsMetaUpdate* updates = metaUpdate.getUpdates();
RsGxsGroupUpdateMeta::GxsMetaUpdate::const_iterator mit = updates->begin();
for(; mit != updates->end(); mit++)
{
if(mit->first == RsGxsGroupUpdateMeta::NAME)
meta.mGroupName = mit->second;
}
}
bool RsGenExchange::checkKeys(const RsTlvSecurityKeySet& keySet)
{
typedef std::map<std::string, RsTlvSecurityKey> keyMap;
const keyMap& allKeys = keySet.keys;
keyMap::const_iterator cit = allKeys.begin();
bool adminFound = false, publishFound = false;
for(; cit != allKeys.end(); cit++)
{
const RsTlvSecurityKey& key = cit->second;
if(key.keyFlags & RSTLV_KEY_TYPE_FULL)
{
if(key.keyFlags & RSTLV_KEY_DISTRIB_ADMIN)
adminFound = true;
if(key.keyFlags & RSTLV_KEY_DISTRIB_PRIVATE)
publishFound = true;
}
}
// user must have both private and public parts of publish and admin keys
return adminFound && publishFound;
}
void RsGenExchange::publishGrps()
{
RsStackMutex stack(mGenMtx);
@ -1850,10 +2028,6 @@ void RsGenExchange::publishGrps()
// get group id from private admin key id
grpItem->meta.mGroupId = grp->grpId = privAdminKey.keyId;
// what!? this will remove the private keys!
privatekeySet.keys.insert(publicKeySet.keys.begin(),
publicKeySet.keys.end());
ServiceCreate_Return ret = service_CreateGroup(grpItem, privatekeySet);
@ -1901,7 +2075,12 @@ void RsGenExchange::publishGrps()
{
grpId = grp->grpId;
computeHash(grp->grp, grp->metaData->mHash);
mDataAccess->addGroupData(grp);
grp->metaData->mRecvTS = time(NULL);
if(ggps.mIsUpdate)
mDataAccess->updateGroupData(grp);
else
mDataAccess->addGroupData(grp);
}
else
{
@ -2033,6 +2212,8 @@ void RsGenExchange::processRecvdData()
processRecvdGroups();
processRecvdMessages();
performUpdateValidation();
}
@ -2117,6 +2298,7 @@ void RsGenExchange::processRecvdMessages()
if(validated_entry != mMsgPendingValidate.end()) mMsgPendingValidate.erase(validated_entry);
computeHash(msg->msg, meta->mHash);
meta->recvTS = time(NULL);
}
}
else
@ -2188,9 +2370,12 @@ void RsGenExchange::processRecvdGroups()
RsStackMutex stack(mGenMtx);
NxsGrpPendValidVect::iterator vit = mReceivedGrps.begin();
std::vector<std::string> existingGrpIds;
std::list<std::string> grpIds;
std::map<RsNxsGrp*, RsGxsGrpMetaData*> grps;
std::list<RsGxsGroupId> grpIds;
mDataStore->retrieveGroupIds(existingGrpIds);
while( vit != mReceivedGrps.end())
{
@ -2215,13 +2400,24 @@ void RsGenExchange::processRecvdGroups()
meta->mGroupStatus = GXS_SERV::GXS_GRP_STATUS_UNPROCESSED | GXS_SERV::GXS_GRP_STATUS_UNREAD;
meta->mSubscribeFlags = GXS_SERV::GROUP_SUBSCRIBE_NOT_SUBSCRIBED;
if(meta->mCircleType == GXS_CIRCLE_TYPE_YOUREYESONLY)
meta->mOriginator = grp->PeerId();
computeHash(grp->grp, meta->mHash);
grps.insert(std::make_pair(grp, meta));
grpIds.push_back(grp->grpId);
// now check if group already existss
if(std::find(existingGrpIds.begin(), existingGrpIds.end(), grp->grpId) == existingGrpIds.end())
{
meta->mRecvTS = time(NULL);
if(meta->mCircleType == GXS_CIRCLE_TYPE_YOUREYESONLY)
meta->mOriginator = grp->PeerId();
grps.insert(std::make_pair(grp, meta));
grpIds.push_back(grp->grpId);
}
else
{
GroupUpdate update;
update.newGrp = grp;
mGroupUpdates.push_back(update);
}
erase = true;
}
else if(ret == VALIDATE_FAIL)
@ -2273,3 +2469,99 @@ void RsGenExchange::processRecvdGroups()
mDataStore->storeGroup(grps);
}
}
void RsGenExchange::performUpdateValidation()
{
RsStackMutex stack(mGenMtx);
#ifdef GXS_GENX_DEBUG
std::cerr << "RsGenExchange::performUpdateValidation() " << std::endl;
#endif
if(mGroupUpdates.empty())
return;
std::map<std::string, RsGxsGrpMetaData*> grpMetas;
std::vector<GroupUpdate>::iterator vit = mGroupUpdates.begin();
for(; vit != mGroupUpdates.end(); vit++)
grpMetas.insert(std::make_pair(vit->newGrp->grpId, (RsGxsGrpMetaData*)NULL));
if(!grpMetas.empty())
mDataStore->retrieveGxsGrpMetaData(grpMetas);
else
return;
vit = mGroupUpdates.begin();
for(; vit != mGroupUpdates.end(); vit++)
{
GroupUpdate& gu = *vit;
std::map<std::string, RsGxsGrpMetaData*>::iterator mit =
grpMetas.find(gu.newGrp->grpId);
gu.oldGrpMeta = mit->second;
gu.validUpdate = updateValid(*(gu.oldGrpMeta), *(gu.newGrp));
}
#ifdef GXS_GENX_DEBUG
std::cerr << "RsGenExchange::performUpdateValidation() " << std::endl;
#endif
vit = mGroupUpdates.begin();
std::map<RsNxsGrp*, RsGxsGrpMetaData*> grps;
for(; vit != mGroupUpdates.end(); vit++)
{
GroupUpdate& gu = *vit;
if(gu.validUpdate)
{
if(gu.newGrp->metaData->mCircleType == GXS_CIRCLE_TYPE_YOUREYESONLY)
gu.newGrp->metaData->mOriginator = gu.newGrp->PeerId();
grps.insert(std::make_pair(gu.newGrp, gu.newGrp->metaData));
}
else
{
delete gu.newGrp;
}
delete gu.oldGrpMeta;
}
mDataStore->updateGroup(grps);
mGroupUpdates.clear();
}
bool RsGenExchange::updateValid(RsGxsGrpMetaData& oldGrpMeta, RsNxsGrp& newGrp) const
{
std::map<SignType, RsTlvKeySignature>& signSet = newGrp.metaData->signSet.keySignSet;
std::map<SignType, RsTlvKeySignature>::iterator mit = signSet.find(GXS_SERV::FLAG_AUTHEN_ADMIN);
if(mit == signSet.end())
{
#ifdef GXS_GENX_DEBUG
std::cerr << "RsGenExchange::updateValid() new admin sign not found! " << std::endl;
std::cerr << "RsGenExchange::updateValid() grpId: " << oldGrp.grpId << std::endl;
#endif
return false;
}
RsTlvKeySignature adminSign = mit->second;
std::map<std::string, RsTlvSecurityKey>& keys = oldGrpMeta.keys.keys;
std::map<std::string, RsTlvSecurityKey>::iterator keyMit = keys.find(oldGrpMeta.mGroupId);
if(keyMit == keys.end())
{
#ifdef GXS_GENX_DEBUG
std::cerr << "RsGenExchange::updateValid() admin key not found! " << std::endl;
#endif
return false;
}
// also check this is the latest published group
bool latest = newGrp.metaData->mPublishTs > oldGrpMeta.mPublishTs;
return GxsSecurity::validateNxsGrp(newGrp, adminSign, keyMit->second) && latest;
}

View file

@ -71,13 +71,14 @@ class GxsGrpPendingSign
public:
GxsGrpPendingSign(RsGxsGrpItem* item, uint32_t token): mLastAttemptTS(0), mStartTS(time(NULL)), mToken(token),
mItem(item), mHaveKeys(false)
mItem(item), mHaveKeys(false), mIsUpdate(false)
{}
time_t mLastAttemptTS, mStartTS;
uint32_t mToken;
RsGxsGrpItem* mItem;
bool mHaveKeys; // mKeys->first == true if key present
bool mIsUpdate;
RsTlvSecurityKeySet mPrivateKeys;
RsTlvSecurityKeySet mPublicKeys;
};
@ -516,7 +517,6 @@ protected:
/*!
* Enables publication of a group item \n
* If the item exists already this is simply versioned \n
* This will induce a related change message \n
* Ownership of item passes to this rsgenexchange \n
* @param token
@ -524,6 +524,16 @@ protected:
*/
void publishGroup(uint32_t& token, RsGxsGrpItem* grpItem);
/*!
* Updates an existing group item \n
* This will induce a related change message \n
* Ownership of item passes to this rsgenexchange \n
* @param token
* @param grpItem
*/
void updateGroup(uint32_t& token, RsGxsGroupUpdateMeta& updateMeta, RsGxsGrpItem* grpItem);
public:
/*!
* Enables publication of a message item \n
@ -629,6 +639,8 @@ private:
void publishGrps();
void processGroupUpdatePublish();
void publishMsgs();
/*!
@ -700,11 +712,20 @@ private:
/*!
* Generate a set of keys that can define a GXS group
* @param privatekeySet contains private generated keys
* @param privatekeySet contains public generated keys (counterpart of private)
* @param publickeySet contains public generated keys (counterpart of private)
* @param genPublicKeys should publish key pair also be generated
*/
void generateGroupKeys(RsTlvSecurityKeySet& privatekeySet, RsTlvSecurityKeySet& publickeySet, bool genPublishKeys);
/*!
* Generate public set of keys from their private counterparts
* No keys will be generated if one fails
* @param privatekeySet contains private generated keys
* @param publickeySet contains public generated keys (counterpart of private)
* @return false if key gen failed for a key set
*/
void generatePublicFromPrivateKeys(const RsTlvSecurityKeySet& privatekeySet, RsTlvSecurityKeySet& publickeySet);
/*!
* Attempts to validate msg signatures
* @param msg message to be validated
@ -736,6 +757,32 @@ private:
static void computeHash(const RsTlvBinaryData& data, std::string& hash);
/*!
* Checks validation of recently received groups to be
* updated
*/
void performUpdateValidation();
/*!
* Checks if the update is valid (i.e. the new admin signature is by the old admin key)
* @param oldGrp the old group to be updated (must have meta data member initialised)
* @param newGrp the new group that updates the old group (must have meta data member initialised)
* @return
*/
bool updateValid(RsGxsGrpMetaData& oldGrp, RsNxsGrp& newGrp) const;
/*!
* convenience function for checking private publish and admin keys are present
* @param keySet The keys set to split into a private and public set
* @return false, if private admin and publish keys cannot be found, true otherwise
*/
bool checkKeys(const RsTlvSecurityKeySet& keySet);
/*!
* Convenience function for assigning the meta update items to the actual group meta
*/
void assignMetaUpdates(RsGroupMetaData& meta, const RsGxsGroupUpdateMeta metaUpdate) const;
private:
RsMutex mGenMtx;
@ -798,6 +845,13 @@ private:
const uint8_t CREATE_FAIL, CREATE_SUCCESS, CREATE_FAIL_TRY_LATER, SIGN_MAX_ATTEMPTS;
const uint8_t SIGN_FAIL, SIGN_SUCCESS, SIGN_FAIL_TRY_LATER;
const uint8_t VALIDATE_FAIL, VALIDATE_SUCCESS, VALIDATE_FAIL_TRY_LATER, VALIDATE_MAX_ATTEMPTS;
private:
std::vector<GroupUpdate> mGroupUpdates, mPeersGroupUpdate;
std::vector<GroupUpdatePublish> mGroupUpdatePublish;
};
#endif // RSGENEXCHANGE_H

View file

@ -176,7 +176,7 @@ class RsGixsReputation
public:
// get Reputation.
virtual bool haveReputation(const RsGxsId &id) = 0;
virtual bool loadReputation(const RsGxsId &id) = 0;
virtual bool loadReputation(const RsGxsId &id, const std::list<std::string>& peers) = 0;
virtual bool getReputation(const RsGxsId &id, GixsReputation &rep) = 0;
};

View file

@ -74,6 +74,7 @@ void RsGxsGrpMetaData::clear(){
mOriginator.clear();
mCircleType = 0;
mAuthenFlags = 0;
mRecvTS = 0;
}
@ -196,6 +197,7 @@ void RsGxsMsgMetaData::clear()
mMsgFlags = 0;
mMsgStatus = 0;
mChildTs = 0;
recvTS = 0;
}
bool RsGxsMsgMetaData::serialise(void *data, uint32_t *size)

View file

@ -70,17 +70,16 @@ public:
uint32_t mPop; // HOW DO WE DO THIS NOW.
uint32_t mMsgCount; // ???
time_t mLastPost; // ???
uint32_t mLastPost; // ???
uint32_t mGroupStatus;
uint32_t mRecvTS;
std::string mOriginator;
std::string mInternalCircle;
std::string mHash;
};
class RsGxsMsgMetaData
{
public:
@ -114,6 +113,7 @@ public:
uint32_t mMsgStatus;
time_t mChildTs;
uint32_t recvTS;
std::string mHash;
bool validated;

View file

@ -1487,7 +1487,14 @@ bool RsGxsDataAccess::addGroupData(RsNxsGrp* grp) {
return mDataStore->storeGroup(grpM);
}
bool RsGxsDataAccess::updateGroupData(RsNxsGrp* grp) {
RsStackMutex stack(mDataMutex);
std::map<RsNxsGrp*, RsGxsGrpMetaData*> grpM;
grpM.insert(std::make_pair(grp, grp->metaData));
return mDataStore->updateGroup(grpM);
}
bool RsGxsDataAccess::addMsgData(RsNxsMsg* msg) {

View file

@ -126,14 +126,21 @@ public:
public:
/*!
* This adds a groups to the gxs data base, this is a blocking call
* Responsibility for grp still lies with callee \n
* This adds a groups to the gxs data base, this is a blocking call \n
* If function returns successfully DataAccess can be queried for grp
* @param grp the group to add, responsibility grp passed lies with callee
* @return false if group cound not be added
*/
bool addGroupData(RsNxsGrp* grp);
/*!
* This updates a groups in the gxs data base, this is a blocking call \n
* If function returns successfully DataAccess can be queried for grp
* @param grp the group to add, responsibility grp passed lies with callee
* @return false if group cound not be added
*/
bool updateGroupData(RsNxsGrp* grp);
/*!
* This adds a group to the gxs data base, this is a blocking call \n
* Responsibility for msg still lies with callee \n

View file

@ -36,16 +36,16 @@
#define GIXS_CUT_OFF 0
#define SYNC_PERIOD 12 // in microseconds every 10 seconds (1 second for testing)
#define TRANSAC_TIMEOUT 5 // 5 seconds
#define TRANSAC_TIMEOUT 10 // 10 seconds
const uint32_t RsGxsNetService::FRAGMENT_SIZE = 150000;
RsGxsNetService::RsGxsNetService(uint16_t servType, RsGeneralDataService *gds,
RsNxsNetMgr *netMgr, RsNxsObserver *nxsObs, RsGixsReputation* reputations, RsGcxs* circles)
RsNxsNetMgr *netMgr, RsNxsObserver *nxsObs, RsGixsReputation* reputations, RsGcxs* circles, bool grpAutoSync)
: p3Config(servType), p3ThreadedService(servType),
mTransactionTimeOut(TRANSAC_TIMEOUT), mServType(servType), mDataStore(gds), mTransactionN(0),
mObserver(nxsObs), mNxsMutex("RsGxsNetService"), mNetMgr(netMgr), mSYNC_PERIOD(SYNC_PERIOD),
mSyncTs(0), mReputations(reputations), mCircles(circles)
mSyncTs(0), mReputations(reputations), mCircles(circles), mGrpAutoSync(grpAutoSync), mGrpServerUpdateItem(NULL)
{
addSerialType(new RsNxsSerialiser(mServType));
@ -85,13 +85,27 @@ void RsGxsNetService::syncWithPeers()
std::set<std::string>::iterator sit = peers.begin();
// for now just grps
for(; sit != peers.end(); sit++)
if(mGrpAutoSync)
{
RsNxsSyncGrp *grp = new RsNxsSyncGrp(mServType);
grp->clear();
grp->PeerId(*sit);
sendItem(grp);
// for now just grps
for(; sit != peers.end(); sit++)
{
const std::string peerId = *sit;
ClientGrpMap::const_iterator cit = mClientGrpUpdateMap.find(peerId);
uint32_t updateTS = 0;
if(cit != mClientGrpUpdateMap.end())
{
const RsGxsGrpUpdateItem *gui = cit->second;
updateTS = gui->grpUpdateTS;
}
RsNxsSyncGrp *grp = new RsNxsSyncGrp(mServType);
grp->clear();
grp->PeerId(*sit);
grp->updateTS = updateTS;
sendItem(grp);
}
}
#ifdef GXS_ENABLE_SYNC_MSGS
@ -108,7 +122,9 @@ void RsGxsNetService::syncWithPeers()
RsGxsGrpMetaData* meta = mit->second;
if(meta->mSubscribeFlags & GXS_SERV::GROUP_SUBSCRIBE_SUBSCRIBED )
{
grpIds.push_back(mit->first);
}
delete meta;
}
@ -122,13 +138,36 @@ void RsGxsNetService::syncWithPeers()
std::vector<RsGxsGroupId>::iterator vit = grpIds.begin();
// now see if you have an updateTS so optimise whether you need
// to get a new list of peer data
RsGxsMsgUpdateItem* mui = NULL;
ClientMsgMap::const_iterator cit = mClientMsgUpdateMap.find(*sit);
if(cit != mClientMsgUpdateMap.end())
{
mui = cit->second;
}
for(; vit != grpIds.end(); vit++)
{
RsNxsSyncMsg* msg = new RsNxsSyncMsg(mServType);
msg->clear();
msg->PeerId(*sit);
msg->grpId = *vit;
sendItem(msg);
uint32_t updateTS = 0;
if(mui)
{
std::map<std::string, uint32_t>::const_iterator cit2 = mui->msgUpdateTS.find(*vit);
if(cit2 != mui->msgUpdateTS.end())
{
updateTS = cit2->second;
}
}
RsNxsSyncMsg* msg = new RsNxsSyncMsg(mServType);
msg->clear();
msg->PeerId(*sit);
msg->grpId = *vit;
msg->updateTS = updateTS;
sendItem(msg);
}
}
#endif
@ -524,20 +563,105 @@ void RsGxsNetService::collateMsgFragments(MsgFragments fragments, std::map<RsGxs
fragments.clear();
}
bool RsGxsNetService::loadList(std::list<RsItem*>& load)
class StoreHere
{
return false;
public:
StoreHere(RsGxsNetService::ClientGrpMap& cgm, RsGxsNetService::ClientMsgMap& cmm,
RsGxsNetService::ServerMsgMap& smm,
RsGxsServerGrpUpdateItem*& sgm) : mClientGrpMap(cgm), mClientMsgMap(cmm),
mServerMsgMap(smm), mServerGrpUpdateItem(sgm)
{}
void operator() (RsItem* item)
{
RsGxsMsgUpdateItem* mui;
RsGxsGrpUpdateItem* gui;
RsGxsServerGrpUpdateItem* gsui;
RsGxsServerMsgUpdateItem* msui;
if((mui = dynamic_cast<RsGxsMsgUpdateItem*>(item)) != NULL)
mClientMsgMap.insert(std::make_pair(mui->peerId, mui));
else if((gui = dynamic_cast<RsGxsGrpUpdateItem*>(item)) != NULL)
mClientGrpMap.insert(std::make_pair(gui->peerId, gui));
else if((msui = dynamic_cast<RsGxsServerMsgUpdateItem*>(item)) != NULL)
mServerMsgMap.insert(std::make_pair(msui->grpId, msui));
else if((gsui = dynamic_cast<RsGxsServerGrpUpdateItem*>(item)) != NULL)
{
if(mServerGrpUpdateItem == NULL)
{
mServerGrpUpdateItem = gsui;
}
else
{
#ifdef NXS_NET_DEBUG
std::cerr << "Error! More than one server group update item exists!" << std::endl;
#endif
delete gsui;
}
}
else
{
std::cerr << "Type not expected!" << std::endl;
}
}
private:
RsGxsNetService::ClientGrpMap& mClientGrpMap;
RsGxsNetService::ClientMsgMap& mClientMsgMap;
RsGxsNetService::ServerMsgMap& mServerMsgMap;
RsGxsServerGrpUpdateItem*& mServerGrpUpdateItem;
};
bool RsGxsNetService::loadList(std::list<RsItem *> &load)
{
std::for_each(load.begin(), load.end(), StoreHere(mClientGrpUpdateMap, mClientMsgUpdateMap,
mServerMsgUpdateMap, mGrpServerUpdateItem));
return true;
}
#include <algorithm>
template <typename UpdateMap>
struct get_second : public std::unary_function<typename UpdateMap::value_type, RsItem*>
{
RsItem* operator()(const typename UpdateMap::value_type& value) const
{
return value.second;
}
};
bool RsGxsNetService::saveList(bool& cleanup, std::list<RsItem*>& save)
{
return false;
RsStackMutex stack(mNxsMutex);
// hardcore templates
std::transform(mClientGrpUpdateMap.begin(), mClientGrpUpdateMap.end(),
std::back_inserter(save), get_second<ClientGrpMap>());
std::transform(mClientMsgUpdateMap.begin(), mClientMsgUpdateMap.end(),
std::back_inserter(save), get_second<ClientMsgMap>());
std::transform(mServerMsgUpdateMap.begin(), mServerMsgUpdateMap.end(),
std::back_inserter(save), get_second<ServerMsgMap>());
save.push_back(mGrpServerUpdateItem);
cleanup = false;
return true;
}
RsSerialiser *RsGxsNetService::setupSerialiser()
{
return NULL;
RsSerialiser *rss = new RsSerialiser;
rss->addSerialType(new RsGxsUpdateSerialiser(mServType));
return rss;
}
void RsGxsNetService::recvNxsItemQueue(){
@ -749,6 +873,7 @@ void RsGxsNetService::run(){
double timeDelta = 0.2;
int updateCounter = 0;
while(isRunning()){
@ -758,6 +883,14 @@ void RsGxsNetService::run(){
Sleep((int) (timeDelta * 1000));
#endif
if(updateCounter == 3)
{
updateServerSyncTS();
updateCounter = 0;
}
else
updateCounter++;
// process active transactions
processTransactions();
@ -767,13 +900,71 @@ void RsGxsNetService::run(){
// vetting of id and circle info
runVetting();
processExplicitGroupRequests();
}
}
void RsGxsNetService::updateServerSyncTS()
{
RsStackMutex stack(mNxsMutex);
std::map<RsGxsGroupId, RsGxsGrpMetaData*> gxsMap;
// retrieve all grps and update TS
mDataStore->retrieveGxsGrpMetaData(gxsMap);
std::map<RsGxsGroupId, RsGxsGrpMetaData*>::iterator mit = gxsMap.begin();
// as a grp list server also note this is the latest item you have
if(mGrpServerUpdateItem == NULL)
{
mGrpServerUpdateItem = new RsGxsServerGrpUpdateItem(mServType);
}
bool change = false;
for(; mit != gxsMap.end(); mit++)
{
const RsGxsGroupId& grpId = mit->first;
RsGxsGrpMetaData* grpMeta = mit->second;
ServerMsgMap::iterator mapIT = mServerMsgUpdateMap.find(grpId);
RsGxsServerMsgUpdateItem* msui = NULL;
if(mapIT == mServerMsgUpdateMap.end())
{
msui = new RsGxsServerMsgUpdateItem(mServType);
msui->grpId = grpMeta->mGroupId;
mServerMsgUpdateMap.insert(std::make_pair(msui->grpId, msui));
}else
{
msui = mapIT->second;
}
if(grpMeta->mLastPost > msui->msgUpdateTS )
{
change = true;
msui->msgUpdateTS = grpMeta->mLastPost;
}
// this might be very inefficient with time
if(grpMeta->mRecvTS > mGrpServerUpdateItem->grpUpdateTS)
{
mGrpServerUpdateItem->grpUpdateTS = grpMeta->mRecvTS;
change = true;
}
}
// actual change in config settings, then save configuration
if(change)
IndicateConfigChanged();
freeAndClearContainerResource<std::map<RsGxsGroupId, RsGxsGrpMetaData*>,
RsGxsGrpMetaData*>(gxsMap);
}
bool RsGxsNetService::locked_checkTransacTimedOut(NxsTransaction* tr)
{
//return tr->mTimeOut < ((uint32_t) time(NULL));
return false;
return tr->mTimeOut < ((uint32_t) time(NULL));
}
void RsGxsNetService::processTransactions(){
@ -1036,25 +1227,52 @@ void RsGxsNetService::locked_processCompletedIncomingTrans(NxsTransaction* tr)
// notify listener of grps
mObserver->notifyNewGroups(grps);
// now note this as the latest you've received from this peer
std::string peerFrom = tr->mTransaction->PeerId();
uint32_t updateTS = tr->mTransaction->updateTS;
ClientGrpMap::iterator it = mClientGrpUpdateMap.find(peerFrom);
RsGxsGrpUpdateItem* item = NULL;
if(it != mClientGrpUpdateMap.end())
{
item = it->second;
}else
{
item = new RsGxsGrpUpdateItem(mServType);
mClientGrpUpdateMap.insert(
std::make_pair(peerFrom, item));
}
item->grpUpdateTS = updateTS;
item->peerId = peerFrom;
IndicateConfigChanged();
}else if(flag & RsNxsTransac::FLAG_TYPE_MSGS)
{
std::vector<RsNxsMsg*> msgs;
std::string grpId;
while(tr->mItems.size() > 0)
{
RsNxsMsg* msg = dynamic_cast<RsNxsMsg*>(tr->mItems.front());
if(msg)
{
tr->mItems.pop_front();
msgs.push_back(msg);
if(grpId.empty())
grpId = msg->grpId;
tr->mItems.pop_front();
msgs.push_back(msg);
}
else
{
#ifdef NXS_NET_DEBUG
std::cerr << "RsGxsNetService::processCompletedTransactions(): item did not caste to msg"
<< std::endl;
std::cerr << "RsGxsNetService::processCompletedTransactions(): item did not caste to msg"
<< std::endl;
#endif
}
}
@ -1078,6 +1296,10 @@ void RsGxsNetService::locked_processCompletedIncomingTrans(NxsTransaction* tr)
// notify listener of msgs
mObserver->notifyNewMessages(msgs);
// now note that this is the latest you've received from this peer
// for the grp id
locked_doMsgUpdateWork(tr->mTransaction, grpId);
}
}else if(tr->mFlag == NxsTransaction::FLAG_STATE_FAILED){
// don't do anything transaction will simply be cleaned
@ -1085,6 +1307,33 @@ void RsGxsNetService::locked_processCompletedIncomingTrans(NxsTransaction* tr)
return;
}
void RsGxsNetService::locked_doMsgUpdateWork(const RsNxsTransac *nxsTrans, const std::string &grpId)
{
// firts check if peer exists
const std::string& peerFrom = nxsTrans->PeerId();
ClientMsgMap::iterator it = mClientMsgUpdateMap.find(peerFrom);
RsGxsMsgUpdateItem* mui = NULL;
// now update the peer's entry for this grp id
if(it != mClientMsgUpdateMap.end())
{
mui = it->second;
}
else
{
mui = new RsGxsMsgUpdateItem(mServType);
mClientMsgUpdateMap.insert(std::make_pair(peerFrom, mui));
}
mui->msgUpdateTS[grpId] = nxsTrans->updateTS;
mui->peerId = peerFrom;
IndicateConfigChanged();
}
void RsGxsNetService::locked_processCompletedOutgoingTrans(NxsTransaction* tr)
{
uint16_t flag = tr->mTransaction->transactFlag;
@ -1125,6 +1374,7 @@ void RsGxsNetService::locked_processCompletedOutgoingTrans(NxsTransaction* tr)
std::cerr << "complete Sending Grp Data, transN: " <<
tr->mTransaction->transactionNumber << std::endl;
#endif
}else if(flag & RsNxsTransac::FLAG_TYPE_MSGS)
{
#ifdef NXS_NET_DEBUG
@ -1166,7 +1416,7 @@ void RsGxsNetService::locked_pushMsgTransactionFromList(
newTrans->mTimeOut = time(NULL) + mTransactionTimeOut;
// create transaction copy with your id to indicate
// its an outgoing transaction
newTrans->mTransaction = new RsNxsTransac(*transac);
newTrans->mTransaction = new RsNxsTransac(*transac);
newTrans->mTransaction->PeerId(mOwnId);
sendItem(transac);
{
@ -1248,7 +1498,7 @@ void RsGxsNetService::locked_genReqMsgTransaction(NxsTransaction* tr)
msgIdSet.insert((*vit)->mMsgId);
delete(*vit);
}
msgMetaV.clear();
msgMetaV.clear();
// get unique id for this transaction
uint32_t transN = locked_getTransactionId();
@ -1261,6 +1511,9 @@ void RsGxsNetService::locked_genReqMsgTransaction(NxsTransaction* tr)
MsgAuthorV toVet;
std::list<std::string> peers;
peers.push_back(tr->mTransaction->PeerId());
for(; llit != msgItemL.end(); llit++)
{
RsNxsSyncMsgItem*& syncItem = *llit;
@ -1294,7 +1547,7 @@ void RsGxsNetService::locked_genReqMsgTransaction(NxsTransaction* tr)
else
{
// preload for speed
mReputations->loadReputation(syncItem->authorId);
mReputations->loadReputation(syncItem->authorId, peers);
MsgAuthEntry entry;
entry.mAuthorId = syncItem->authorId;
entry.mGrpId = syncItem->grpId;
@ -1375,6 +1628,7 @@ void RsGxsNetService::locked_genReqGrpTransaction(NxsTransaction* tr)
}
std::map<std::string, RsGxsGrpMetaData*> grpMetaMap;
std::map<std::string, RsGxsGrpMetaData*>::const_iterator metaIter;
mDataStore->retrieveGxsGrpMetaData(grpMetaMap);
// now do compare and add loop
@ -1384,13 +1638,20 @@ void RsGxsNetService::locked_genReqGrpTransaction(NxsTransaction* tr)
uint32_t transN = locked_getTransactionId();
GrpAuthorV toVet;
std::list<std::string> peers;
peers.push_back(tr->mTransaction->PeerId());
for(; llit != grpItemL.end(); llit++)
{
RsNxsSyncGrpItem*& grpSyncItem = *llit;
const std::string& grpId = grpSyncItem->grpId;
metaIter = grpMetaMap.find(grpId);
bool haveItem = metaIter != grpMetaMap.end();
bool latestVersion = false;
if(grpMetaMap.find(grpId) == grpMetaMap.end()){
latestVersion = grpSyncItem->publishTs > metaIter->second->mPublishTs;
if(!haveItem || (haveItem && latestVersion) ){
// determine if you need to check reputation
bool checkRep = !grpSyncItem->authorId.empty();
@ -1412,7 +1673,7 @@ void RsGxsNetService::locked_genReqGrpTransaction(NxsTransaction* tr)
else
{
// preload reputation for later
mReputations->loadReputation(grpSyncItem->authorId);
mReputations->loadReputation(grpSyncItem->authorId, peers);
GrpAuthEntry entry;
entry.mAuthorId = grpSyncItem->authorId;
entry.mGrpId = grpSyncItem->grpId;
@ -1505,11 +1766,15 @@ void RsGxsNetService::locked_genSendGrpsTransaction(NxsTransaction* tr)
return;
}
uint32_t updateTS = 0;
if(mGrpServerUpdateItem)
updateTS = mGrpServerUpdateItem->grpUpdateTS;
RsNxsTransac* ntr = new RsNxsTransac(mServType);
ntr->transactionNumber = transN;
ntr->transactFlag = RsNxsTransac::FLAG_BEGIN_P1 |
RsNxsTransac::FLAG_TYPE_GRPS;
ntr->updateTS = updateTS;
ntr->nItems = grps.size();
ntr->PeerId(tr->mTransaction->PeerId());
@ -1625,12 +1890,17 @@ void RsGxsNetService::locked_genSendMsgsTransaction(NxsTransaction* tr)
return;
}
std::string grpId = "";
for(;lit != tr->mItems.end(); lit++)
{
RsNxsSyncMsgItem* item = dynamic_cast<RsNxsSyncMsgItem*>(*lit);
if (item)
{
msgIds[item->grpId].push_back(item->msgId);
if(grpId.empty())
grpId = item->grpId;
}
else
{
@ -1687,10 +1957,18 @@ void RsGxsNetService::locked_genSendMsgsTransaction(NxsTransaction* tr)
return;
}
uint32_t updateTS = 0;
ServerMsgMap::const_iterator cit = mServerMsgUpdateMap.find(grpId);
if(cit != mServerMsgUpdateMap.end())
updateTS = cit->second->msgUpdateTS;
RsNxsTransac* ntr = new RsNxsTransac(mServType);
ntr->transactionNumber = transN;
ntr->transactFlag = RsNxsTransac::FLAG_BEGIN_P1 |
RsNxsTransac::FLAG_TYPE_MSGS;
ntr->updateTS = updateTS;
ntr->nItems = msgSize;
ntr->PeerId(peerId);
@ -1766,13 +2044,32 @@ void RsGxsNetService::locked_pushGrpRespFromList(std::list<RsNxsItem*>& respList
locked_addTransaction(tr);
}
bool RsGxsNetService::locked_CanReceiveUpdate(const RsNxsSyncGrp *item)
{
// don't sync if you have no new updates for this peer
if(mGrpServerUpdateItem)
{
if(item->updateTS >= mGrpServerUpdateItem->grpUpdateTS && item->updateTS != 0)
{
return false;
}
}
return true;
}
void RsGxsNetService::handleRecvSyncGroup(RsNxsSyncGrp* item)
{
RsStackMutex stack(mNxsMutex);
if(!locked_CanReceiveUpdate(item))
return;
std::string peer = item->PeerId();
std::map<std::string, RsGxsGrpMetaData*> grp;
mDataStore->retrieveGxsGrpMetaData(grp);
@ -1826,6 +2123,8 @@ void RsGxsNetService::handleRecvSyncGroup(RsNxsSyncGrp* item)
return;
}
bool RsGxsNetService::canSendGrpId(const std::string& sslId, RsGxsGrpMetaData& grpMeta, std::vector<GrpIdCircleVet>& toVet)
{
// first do the simple checks
@ -1881,10 +2180,31 @@ bool RsGxsNetService::canSendGrpId(const std::string& sslId, RsGxsGrpMetaData& g
return true;
}
bool RsGxsNetService::locked_CanReceiveUpdate(const RsNxsSyncMsg *item)
{
ServerMsgMap::const_iterator cit = mServerMsgUpdateMap.find(item->grpId);
if(cit != mServerMsgUpdateMap.end())
{
const RsGxsServerMsgUpdateItem *msui = cit->second;
if(item->updateTS >= msui->msgUpdateTS && item->updateTS != 0)
{
#ifdef NXS_NET_DEBUG
std::cerr << "RsGxsNetService::locked_CanReceiveUpdate(): Msgs up to date" << std::endl;
#endif
return false;
}
}
return true;
}
void RsGxsNetService::handleRecvSyncMessage(RsNxsSyncMsg* item)
{
RsStackMutex stack(mNxsMutex);
if(!locked_CanReceiveUpdate(item))
return;
const std::string& peer = item->PeerId();
GxsMsgMetaResult metaResult;
@ -2066,3 +2386,40 @@ void RsGxsNetService::setSyncAge(uint32_t age)
}
int RsGxsNetService::requestGrp(const std::list<RsGxsGroupId>& grpId, const std::string& peerId)
{
RsStackMutex stack(mNxsMutex);
mExplicitRequest[peerId].assign(grpId.begin(), grpId.end());
return 1;
}
void RsGxsNetService::processExplicitGroupRequests()
{
RsStackMutex stack(mNxsMutex);
std::map<std::string, std::list<RsGxsGroupId> >::const_iterator cit = mExplicitRequest.begin();
for(; cit != mExplicitRequest.end(); cit++)
{
const std::string& peerId = cit->first;
const std::list<RsGxsGroupId>& groupIdList = cit->second;
std::list<RsNxsItem*> grpSyncItems;
std::list<RsGxsGroupId>::const_iterator git = groupIdList.begin();
uint32_t transN = locked_getTransactionId();
for(; git != groupIdList.end(); git++)
{
RsNxsSyncGrpItem* item = new RsNxsSyncGrpItem(mServType);
item->grpId = *git;
item->PeerId(peerId);
item->flag = RsNxsSyncGrpItem::FLAG_REQUEST;
item->transactionNumber = transN;
grpSyncItems.push_back(item);
}
if(!grpSyncItems.empty())
locked_pushGrpTransactionFromList(grpSyncItems, peerId, transN);
}
mExplicitRequest.clear();
}

View file

@ -34,6 +34,7 @@
#include "rsnxsobserver.h"
#include "pqi/p3linkmgr.h"
#include "serialiser/rsnxsitems.h"
#include "serialiser/rsgxsupdateitems.h"
#include "rsgxsnetutils.h"
#include "pqi/p3cfgmgr.h"
#include "rsgixs.h"
@ -73,7 +74,7 @@ public:
* arrive
*/
RsGxsNetService(uint16_t servType, RsGeneralDataService* gds, RsNxsNetMgr* netMgr,
RsNxsObserver* nxsObs = NULL, RsGixsReputation* repuations = NULL, RsGcxs* circles = NULL);
RsNxsObserver* nxsObs = NULL, RsGixsReputation* repuations = NULL, RsGcxs* circles = NULL, bool grpAutoSync = true);
virtual ~RsGxsNetService();
@ -115,7 +116,7 @@ public:
* @param msgId the messages to retrieve
* @return request token to be redeemed
*/
int requestMsg(const std::string& msgId, uint8_t hops){ return 0;}
int requestMsg(const RsGxsGrpMsgIdPair& msgId){ return 0;}
/*!
* Request for this group is sent through to peers on your network
@ -123,7 +124,7 @@ public:
* @param enabled set to false to disable pause, and true otherwise
* @return request token to be redeemed
*/
int requestGrp(const std::list<std::string>& grpId, uint8_t hops){ return 0;}
int requestGrp(const std::list<RsGxsGroupId>& grpId, const std::string& peerId);
/* p3Config methods */
@ -322,6 +323,15 @@ private:
bool locked_canReceive(const RsGxsGrpMetaData * const grpMeta, const std::string& peerId);
void processExplicitGroupRequests();
void locked_doMsgUpdateWork(const RsNxsTransac* nxsTrans, const std::string& grpId);
void updateServerSyncTS();
bool locked_CanReceiveUpdate(const RsNxsSyncGrp* item);
bool locked_CanReceiveUpdate(const RsNxsSyncMsg* item);
private:
typedef std::vector<RsNxsGrp*> GrpFragments;
@ -418,10 +428,30 @@ private:
RsGcxs* mCircles;
RsGixsReputation* mReputations;
bool mGrpAutoSync;
// need to be verfied
std::vector<AuthorPending*> mPendingResp;
std::vector<GrpCircleVetting*> mPendingCircleVets;
std::map<std::string, std::list<RsGxsGroupId> > mExplicitRequest;
// nxs sync optimisation
// can pull dynamically the latest timestamp for each message
public:
typedef std::map<std::string, RsGxsMsgUpdateItem*> ClientMsgMap;
typedef std::map<std::string, RsGxsServerMsgUpdateItem*> ServerMsgMap;
typedef std::map<std::string, RsGxsGrpUpdateItem*> ClientGrpMap;
private:
ClientMsgMap mClientMsgUpdateMap;
ServerMsgMap mServerMsgUpdateMap;
ClientGrpMap mClientGrpUpdateMap;
RsGxsServerGrpUpdateItem* mGrpServerUpdateItem;
};
#endif // RSGXSNETSERVICE_H

View file

@ -47,14 +47,16 @@ bool AuthorPending::expired() const
}
bool AuthorPending::getAuthorRep(GixsReputation& rep,
const std::string& authorId)
const std::string& authorId, const std::string& peerId)
{
if(mRep->haveReputation(authorId))
{
return mRep->getReputation(authorId, rep);
}
mRep->loadReputation(authorId);
std::list<std::string> peers;
peers.push_back(peerId);
mRep->loadReputation(authorId, peers);
return false;
}
@ -94,7 +96,7 @@ bool MsgRespPending::accepted()
if(!entry.mPassedVetting)
{
GixsReputation rep;
if(getAuthorRep(rep, entry.mAuthorId))
if(getAuthorRep(rep, entry.mAuthorId, mPeerId))
{
if(rep.score > mCutOff)
{
@ -129,7 +131,7 @@ bool GrpRespPending::accepted()
{
GixsReputation rep;
if(getAuthorRep(rep, entry.mAuthorId))
if(getAuthorRep(rep, entry.mAuthorId, mPeerId))
{
if(rep.score > mCutOff)
{

View file

@ -141,7 +141,7 @@ protected:
* @param authorId reputation to get
* @return true if successfully retrieve repution
*/
bool getAuthorRep(GixsReputation& rep, const std::string& authorId);
bool getAuthorRep(GixsReputation& rep, const std::string& authorId, const std::string& peerId);
private:

View file

@ -129,4 +129,24 @@ private:
};
class GroupUpdate
{
public:
GroupUpdate() : oldGrpMeta(NULL), newGrp(NULL), validUpdate(false)
{}
RsGxsGrpMetaData* oldGrpMeta;
RsNxsGrp* newGrp;
bool validUpdate;
};
class GroupUpdatePublish
{
public:
GroupUpdatePublish(RsGxsGrpItem* item, RsGxsGroupUpdateMeta updateMeta, uint32_t token)
: grpItem(item), mToken(token), mUpdateMeta(updateMeta) {}
RsGxsGrpItem* grpItem;
RsGxsGroupUpdateMeta mUpdateMeta;
uint32_t mToken;
};
#endif /* GXSUTIL_H_ */

View file

@ -68,21 +68,6 @@ public:
*/
virtual void setSyncAge(uint32_t age) = 0;
/*!
* Explicitly requests all the groups contained by a peer
* Circumvents polling of peers for message
* @param peerId id of peer
*/
virtual void requestGroupsOfPeer(const std::string& peerId) = 0;
/*!
* get messages of a peer for a given group id, this circumvents the normal
* polling of peers for messages of given group id
* @param peerId Id of peer
* @param grpId id of group to request messages for
*/
virtual void requestMessagesOfPeer(const std::string& peerId, const std::string& grpId) = 0;
/*!
* Initiates a search through the network
* This returns messages which contains the search terms set in RsGxsSearch
@ -116,7 +101,7 @@ public:
* @param msgId the messages to retrieve
* @return request token to be redeemed
*/
virtual int requestMsg(const std::string& msgId, uint8_t hops) = 0;
virtual int requestMsg(const RsGxsGrpMsgIdPair& msgId) = 0;
/*!
* Request for this group is sent through to peers on your network
@ -124,7 +109,7 @@ public:
* @param enabled set to false to disable pause, and true otherwise
* @return request token to be redeemed
*/
virtual int requestGrp(const std::list<std::string>& grpId, uint8_t hops) = 0;
virtual int requestGrp(const std::list<RsGxsGroupId>& grpId, const std::string& peerId) = 0;
};