From 24839ee2373e09ab25e29f8f7a2f9500621db493 Mon Sep 17 00:00:00 2001 From: chrisparker126 Date: Sat, 23 Nov 2013 23:39:55 +0000 Subject: [PATCH] decided to use polling and a database trigger to maintain consistentency of last group post for database sync - db reset done, you will lose your current gxs data git-svn-id: http://svn.code.sf.net/p/retroshare/code/branches/v0.5-gxs_finale@6906 b45a01b8-16f6-495d-af2f-9b41ad6348cc --- libretroshare/src/gxs/rsdataservice.cc | 22 +++++- libretroshare/src/gxs/rsgenexchange.cc | 3 + libretroshare/src/gxs/rsgxsdata.cc | 2 + libretroshare/src/gxs/rsgxsdata.h | 6 +- libretroshare/src/gxs/rsgxsnetservice.cc | 94 +++++++++++++++--------- libretroshare/src/gxs/rsgxsnetservice.h | 2 + libretroshare/src/rsserver/rsinit.cc | 2 +- 7 files changed, 92 insertions(+), 39 deletions(-) diff --git a/libretroshare/src/gxs/rsdataservice.cc b/libretroshare/src/gxs/rsdataservice.cc index c0f7082a7..132e366bd 100644 --- a/libretroshare/src/gxs/rsdataservice.cc +++ b/libretroshare/src/gxs/rsdataservice.cc @@ -34,6 +34,8 @@ #define MSG_TABLE_NAME std::string("MESSAGES") #define GRP_TABLE_NAME std::string("GROUPS") +#define GRP_LAST_POST_UPDATE_TRIGGER std::string("LAST_POST_UPDATE") + // generic #define KEY_NXS_FILE std::string("nxsFile") @@ -48,6 +50,7 @@ #define KEY_NXS_META std::string("meta") #define KEY_NXS_SERV_STRING std::string("serv_str") #define KEY_NXS_HASH std::string("hash") +#define KEY_RECV_TS std::string("recv_time_stamp") // grp table columns @@ -111,6 +114,7 @@ #define COL_GRP_INTERN_CIRCLE 18 #define COL_GRP_ORIGINATOR 19 #define COL_GRP_AUTHEN_FLAGS 20 +#define COL_GRP_RECV_TS 21 // msg col numbers @@ -122,6 +126,7 @@ #define COL_THREAD_ID 11 #define COL_MSG_NAME 12 #define COL_MSG_SERV_STRING 13 +#define COL_MSG_RECV_TS 14 // generic meta shared col numbers #define COL_GRP_ID 0 @@ -154,7 +159,7 @@ RsDataService::RsDataService(const std::string &serviceDir, const std::string &d msgMetaColumns.push_back(KEY_SIGN_SET); msgMetaColumns.push_back(KEY_NXS_IDENTITY); msgMetaColumns.push_back(KEY_NXS_HASH); msgMetaColumns.push_back(KEY_MSG_ID); msgMetaColumns.push_back(KEY_ORIG_MSG_ID); msgMetaColumns.push_back(KEY_MSG_STATUS); msgMetaColumns.push_back(KEY_CHILD_TS); msgMetaColumns.push_back(KEY_MSG_PARENT_ID); msgMetaColumns.push_back(KEY_MSG_THREAD_ID); - msgMetaColumns.push_back(KEY_MSG_NAME); msgMetaColumns.push_back(KEY_NXS_SERV_STRING); + msgMetaColumns.push_back(KEY_MSG_NAME); msgMetaColumns.push_back(KEY_NXS_SERV_STRING); msgMetaColumns.push_back(KEY_RECV_TS); // for retrieving actual data msgColumns.push_back(KEY_GRP_ID); msgColumns.push_back(KEY_NXS_FILE); msgColumns.push_back(KEY_NXS_FILE_OFFSET); @@ -168,7 +173,7 @@ RsDataService::RsDataService(const std::string &serviceDir, const std::string &d grpMetaColumns.push_back(KEY_GRP_LAST_POST); grpMetaColumns.push_back(KEY_ORIG_GRP_ID); grpMetaColumns.push_back(KEY_NXS_SERV_STRING); grpMetaColumns.push_back(KEY_GRP_SIGN_FLAGS); grpMetaColumns.push_back(KEY_GRP_CIRCLE_ID); grpMetaColumns.push_back(KEY_GRP_CIRCLE_TYPE); grpMetaColumns.push_back(KEY_GRP_INTERNAL_CIRCLE); grpMetaColumns.push_back(KEY_GRP_ORIGINATOR); - grpMetaColumns.push_back(KEY_GRP_AUTHEN_FLAGS); + grpMetaColumns.push_back(KEY_GRP_AUTHEN_FLAGS); grpMetaColumns.push_back(KEY_RECV_TS); // for retrieving actual grp data grpColumns.push_back(KEY_GRP_ID); grpColumns.push_back(KEY_NXS_FILE); grpColumns.push_back(KEY_NXS_FILE_OFFSET); @@ -212,6 +217,7 @@ void RsDataService::initialise(){ KEY_MSG_NAME + " TEXT," + KEY_NXS_SERV_STRING + " TEXT," + KEY_NXS_HASH + " TEXT," + + KEY_RECV_TS + " INT," + KEY_NXS_FILE_LEN + " INT);"); // create table for grp data @@ -240,8 +246,15 @@ void RsDataService::initialise(){ KEY_GRP_INTERNAL_CIRCLE + " TEXT," + KEY_GRP_ORIGINATOR + " TEXT," + KEY_NXS_HASH + " TEXT," + + KEY_RECV_TS + " INT," + KEY_SIGN_SET + " BLOB);"); + mDb->execSQL("CREATE TRIGGER " + GRP_LAST_POST_UPDATE_TRIGGER + + "UPDATE OF " + KEY_RECV_TS + " ON " + MSG_TABLE_NAME + + std::string("BEGIN ") + + "UPDATE " + GRP_TABLE_NAME + "SET " + KEY_GRP_LAST_POST + "= new." + + KEY_RECV_TS + ";" + + std::string("END;")); } RsGxsGrpMetaData* RsDataService::locked_getGrpMeta(RetroCursor &c) @@ -293,6 +306,7 @@ RsGxsGrpMetaData* RsDataService::locked_getGrpMeta(RetroCursor &c) c.getString(COL_GRP_INTERN_CIRCLE, grpMeta->mInternalCircle); c.getString(COL_GRP_ORIGINATOR, grpMeta->mOriginator); grpMeta->mAuthenFlags = c.getInt32(COL_GRP_AUTHEN_FLAGS); + grpMeta->mRecvTS = c.getInt32(COL_GRP_RECV_TS); if(ok) @@ -379,6 +393,7 @@ RsGxsMsgMetaData* RsDataService::locked_getMsgMeta(RetroCursor &c) c.getString(COL_MSG_NAME, msgMeta->mMsgName); c.getString(COL_MSG_SERV_STRING, msgMeta->mServiceString); c.getString(COL_HASH, msgMeta->mHash); + msgMeta->recvTS = c.getInt32(COL_MSG_RECV_TS); offset = 0; data = (char*)c.getData(COL_SIGN_SET, data_len); @@ -491,6 +506,7 @@ int RsDataService::storeMessage(std::map &msg) cv.put(KEY_GRP_ID, msgMetaPtr->mGroupId); cv.put(KEY_NXS_SERV_STRING, msgMetaPtr->mServiceString); cv.put(KEY_NXS_HASH, msgMetaPtr->mHash); + cv.put(KEY_RECV_TS, (int32_t)msgMetaPtr->recvTS); char signSetData[msgMetaPtr->signSet.TlvSize()]; @@ -598,6 +614,7 @@ int RsDataService::storeGroup(std::map &grp) cv.put(KEY_GRP_ORIGINATOR, grpMetaPtr->mOriginator); cv.put(KEY_GRP_AUTHEN_FLAGS, (int32_t)grpMetaPtr->mAuthenFlags); cv.put(KEY_NXS_HASH, grpMetaPtr->mHash); + cv.put(KEY_RECV_TS, (int32_t)grpMetaPtr->mRecvTS); if(! (grpMetaPtr->mAuthorId.empty()) ){ cv.put(KEY_NXS_IDENTITY, grpMetaPtr->mAuthorId); @@ -1117,6 +1134,7 @@ int RsDataService::resetDataStore() mDb->execSQL("DROP TABLE " + MSG_TABLE_NAME); mDb->execSQL("DROP TABLE " + GRP_TABLE_NAME); + mDb->execSQL("DROP TRIGGER " + GRP_LAST_POST_UPDATE_TRIGGER); } // recreate database diff --git a/libretroshare/src/gxs/rsgenexchange.cc b/libretroshare/src/gxs/rsgenexchange.cc index ffa2a3000..316832b90 100644 --- a/libretroshare/src/gxs/rsgenexchange.cc +++ b/libretroshare/src/gxs/rsgenexchange.cc @@ -1789,6 +1789,7 @@ void RsGenExchange::publishMsgs() msg->metaData->mMsgStatus = GXS_SERV::GXS_MSG_STATUS_UNPROCESSED | GXS_SERV::GXS_MSG_STATUS_UNREAD; msgId = msg->msgId; grpId = msg->grpId; + msg->metaData->recvTS = time(NULL); computeHash(msg->msg, msg->metaData->mHash); mDataAccess->addMsgData(msg); msgChangeMap[grpId].push_back(msgId); @@ -2079,6 +2080,7 @@ void RsGenExchange::publishGrps() { grpId = grp->grpId; computeHash(grp->grp, grp->metaData->mHash); + grp->metaData->mRecvTS = time(NULL); if(ggps.mIsUpdate) mDataAccess->updateGroupData(grp); @@ -2407,6 +2409,7 @@ void RsGenExchange::processRecvdGroups() // now check if group already existss if(std::find(existingGrpIds.begin(), existingGrpIds.end(), grp->grpId) == existingGrpIds.end()) { + meta->mRecvTS = time(NULL); if(meta->mCircleType == GXS_CIRCLE_TYPE_YOUREYESONLY) meta->mOriginator = grp->PeerId(); diff --git a/libretroshare/src/gxs/rsgxsdata.cc b/libretroshare/src/gxs/rsgxsdata.cc index fbccd8af2..5de64b35d 100644 --- a/libretroshare/src/gxs/rsgxsdata.cc +++ b/libretroshare/src/gxs/rsgxsdata.cc @@ -74,6 +74,7 @@ void RsGxsGrpMetaData::clear(){ mOriginator.clear(); mCircleType = 0; mAuthenFlags = 0; + mRecvTS = 0; } @@ -196,6 +197,7 @@ void RsGxsMsgMetaData::clear() mMsgFlags = 0; mMsgStatus = 0; mChildTs = 0; + recvTS = 0; } bool RsGxsMsgMetaData::serialise(void *data, uint32_t *size) diff --git a/libretroshare/src/gxs/rsgxsdata.h b/libretroshare/src/gxs/rsgxsdata.h index 9a94cc8a0..5c17a5cd4 100644 --- a/libretroshare/src/gxs/rsgxsdata.h +++ b/libretroshare/src/gxs/rsgxsdata.h @@ -70,17 +70,16 @@ public: uint32_t mPop; // HOW DO WE DO THIS NOW. uint32_t mMsgCount; // ??? - time_t mLastPost; // ??? + uint32_t mLastPost; // ??? uint32_t mGroupStatus; + uint32_t mRecvTS; std::string mOriginator; std::string mInternalCircle; std::string mHash; }; - - class RsGxsMsgMetaData { public: @@ -114,6 +113,7 @@ public: uint32_t mMsgStatus; time_t mChildTs; + uint32_t recvTS; std::string mHash; bool validated; diff --git a/libretroshare/src/gxs/rsgxsnetservice.cc b/libretroshare/src/gxs/rsgxsnetservice.cc index 877d437b4..1ec276391 100644 --- a/libretroshare/src/gxs/rsgxsnetservice.cc +++ b/libretroshare/src/gxs/rsgxsnetservice.cc @@ -877,6 +877,8 @@ void RsGxsNetService::run(){ Sleep((int) (timeDelta * 1000)); #endif + updateServerSyncTS(); + // process active transactions processTransactions(); @@ -891,6 +893,49 @@ void RsGxsNetService::run(){ } } +void RsGxsNetService::updateServerSyncTS() +{ + RsStackMutex stack(mNxsMutex); + + std::map gxsMap; + + // retrieve all grps and update TS + mDataStore->retrieveGxsGrpMetaData(gxsMap); + std::map::iterator mit = gxsMap.begin(); + + // as a grp list server also note this is the latest item you have + if(mGrpServerUpdateItem == NULL) + { + mGrpServerUpdateItem = new RsGxsServerGrpUpdateItem(mServType); + } + + for(; mit != gxsMap.end(); mit++) + { + const RsGxsGroupId& grpId = mit->first; + RsGxsGrpMetaData* grpMeta = mit->second; + ServerMsgMap::iterator mapIT = mServerMsgUpdateMap.find(grpId); + RsGxsServerMsgUpdateItem* msui = NULL; + + if(mapIT == mServerMsgUpdateMap.end()) + { + msui = new RsGxsServerMsgUpdateItem(mServType); + msui->grpId = grpMeta->mGroupId; + mServerMsgUpdateMap.insert(std::make_pair(msui->grpId, msui)); + }else + { + msui = mapIT->second; + } + + msui->msgUpdateTS = grpMeta->mLastPost; + + // this might be very inefficient with time + if(grpMeta->mRecvTS > mGrpServerUpdateItem->grpUpdateTS) + mGrpServerUpdateItem->grpUpdateTS = grpMeta->mRecvTS; + } + + freeAndClearContainerResource, + RsGxsGrpMetaData*>(gxsMap); +} bool RsGxsNetService::locked_checkTransacTimedOut(NxsTransaction* tr) { return tr->mTimeOut < ((uint32_t) time(NULL)); @@ -1156,30 +1201,27 @@ void RsGxsNetService::locked_processCompletedIncomingTrans(NxsTransaction* tr) // notify listener of grps mObserver->notifyNewGroups(grps); - // now note this as the latest you've received from this peer - std::string peerFrom = tr->mTransaction->PeerId(); - uint32_t updateTS = tr->mTransaction->updateTS; + // now note this as the latest you've received from this peer + std::string peerFrom = tr->mTransaction->PeerId(); + uint32_t updateTS = tr->mTransaction->updateTS; - ClientGrpMap::iterator it = mClientGrpUpdateMap.find(peerFrom); + ClientGrpMap::iterator it = mClientGrpUpdateMap.find(peerFrom); - RsGxsGrpUpdateItem* item = NULL; + RsGxsGrpUpdateItem* item = NULL; - if(it != mClientGrpUpdateMap.end()) - { - item = it->second; - }else - { - item = new RsGxsGrpUpdateItem(mServType); - } + if(it != mClientGrpUpdateMap.end()) + { + item = it->second; + }else + { + item = new RsGxsGrpUpdateItem(mServType); + mClientGrpUpdateMap.insert( + std::make_pair(peerFrom, item)); + } - item->grpUpdateTS = updateTS; - item->peerId = peerFrom; + item->grpUpdateTS = updateTS; + item->peerId = peerFrom; - mClientGrpUpdateMap.insert( - std::make_pair(peerFrom, item)); - - // as a grp list server also note this is the latest item you have - mGrpServerUpdateItem->grpUpdateTS = updateTS; }else if(flag & RsNxsTransac::FLAG_TYPE_MSGS) { @@ -1261,20 +1303,6 @@ void RsGxsNetService::locked_doMsgUpdateWork(const RsNxsTransac *nxsTrans, const mui->msgUpdateTS[grpId] = nxsTrans->updateTS; mui->peerId = peerFrom; - ServerMsgMap::iterator mit = mServerMsgUpdateMap.find(grpId); - RsGxsServerMsgUpdateItem* msui = NULL; - if(mit != mServerMsgUpdateMap.end()) - { - msui = mit->second; - } - else - { - msui = new RsGxsServerMsgUpdateItem(mServType); - mServerMsgUpdateMap.insert(std::make_pair(grpId, msui)); - } - - msui->grpId = grpId; - msui->msgUpdateTS = nxsTrans->updateTS; } void RsGxsNetService::locked_processCompletedOutgoingTrans(NxsTransaction* tr) diff --git a/libretroshare/src/gxs/rsgxsnetservice.h b/libretroshare/src/gxs/rsgxsnetservice.h index 0f7f95774..e1c27df83 100644 --- a/libretroshare/src/gxs/rsgxsnetservice.h +++ b/libretroshare/src/gxs/rsgxsnetservice.h @@ -327,6 +327,8 @@ private: void locked_doMsgUpdateWork(const RsNxsTransac* nxsTrans, const std::string& grpId); + void updateServerSyncTS(); + private: typedef std::vector GrpFragments; diff --git a/libretroshare/src/rsserver/rsinit.cc b/libretroshare/src/rsserver/rsinit.cc index 3c8f54155..6122adc4a 100644 --- a/libretroshare/src/rsserver/rsinit.cc +++ b/libretroshare/src/rsserver/rsinit.cc @@ -2227,7 +2227,7 @@ int RsServer::StartupRetroShare() std::string currGxsDir = RsInitConfig::configDir + "/GXS_phase2"; #ifdef GXS_DEV_TESTNET // Different Directory for testing. - currGxsDir += "_TESTNET5"; + currGxsDir += "_TESTNET6"; #endif bool cleanUpGxsDir = false;