added cleanup thread for GxsTransport

This commit is contained in:
csoler 2017-05-30 20:45:39 +02:00
parent f4c167c256
commit 5410c51ab9
2 changed files with 119 additions and 2 deletions

View File

@ -255,10 +255,94 @@ void p3GxsTrans::handleResponse(uint32_t token, uint32_t req_type)
}
}
void p3GxsTrans::GxsTransIntegrityCleanupThread::run()
{
// first take out all the groups
std::map<RsGxsGroupId, RsNxsGrp*> grp;
mDs->retrieveNxsGrps(grp, true, true);
std::cerr << "GxsTransIntegrityCleanupThread::run()" << std::endl;
// compute hash and compare to stored value, if it fails then simply add it
// to list
GxsMsgReq grps;
for(std::map<RsGxsGroupId, RsNxsGrp*>::iterator git = grp.begin(); git != grp.end(); ++git)
{
RsNxsGrp* grp = git->second;
// store the group for retrieveNxsMsgs
grps[grp->grpId];
delete grp;
}
// now messages
GxsMsgReq msgsToDel;
GxsMsgResult msgs;
mDs->retrieveNxsMsgs(grps, msgs, false, true);
for(GxsMsgResult::iterator mit = msgs.begin();mit != msgs.end(); ++mit)
{
std::vector<RsNxsMsg*>& msgV = mit->second;
std::vector<RsNxsMsg*>::iterator vit = msgV.begin();
for(; vit != msgV.end(); ++vit)
{
RsNxsMsg* msg = *vit;
RsGxsTransSerializer s ;
uint32_t size = msg->msg.bin_len;
RsItem *item = s.deserialise(msg->msg.bin_data,&size);
RsGxsTransMailItem *mitem ;
RsGxsTransPresignedReceipt *pitem ;
if(item == NULL)
std::cerr << " Unrecocognised item type!" << std::endl;
else if(NULL != (mitem = dynamic_cast<RsGxsTransMailItem*>(item)))
{
std::cerr << " Mail data with ID " << std::hex << mitem->mailId << std::dec << " from " << msg->metaData->mAuthorId << std::endl;
}
else if(NULL != (pitem = dynamic_cast<RsGxsTransPresignedReceipt*>(item)))
{
std::cerr << " Signed rcpt of ID " << std::hex << pitem->mailId << std::dec << " from " << msg->metaData->mAuthorId << std::endl;
}
else
std::cerr << " Unknown item type!" << std::endl;
delete msg;
}
}
RS_STACK_MUTEX(mIntegrityMutex);
//mDs->removeMsgs(msgsToDel);
mDone = true;
}
void p3GxsTrans::service_tick()
{
GxsTokenQueue::checkRequests();
time_t now = time(NULL);
if(mLastMsgCleanup + MAX_DELAY_BETWEEN_CLEANUPS < now)
{
if(!mCleanupThread)
mCleanupThread = new GxsTransIntegrityCleanupThread(getDataStore());
if(mCleanupThread->isRunning())
std::cerr << "Cleanup thread is already running. Not running it again!" << std::endl;
else
{
std::cerr << "Starting GxsIntegrity cleanup thread." << std::endl;
mCleanupThread->start() ;
mLastMsgCleanup = now ;
}
}
{
RS_STACK_MUTEX(mOutgoingMutex);
for ( auto it = mOutgoingQueue.begin(); it != mOutgoingQueue.end(); )

View File

@ -90,7 +90,11 @@ public:
mIdService(identities),
mServClientsMutex("p3GxsTrans client services map mutex"),
mOutgoingMutex("p3GxsTrans outgoing queue map mutex"),
mIngoingMutex("p3GxsTrans ingoing queue map mutex") {}
mIngoingMutex("p3GxsTrans ingoing queue map mutex")
{
mLastMsgCleanup = time(NULL) - 60; // to be changed into 0
mCleanupThread = NULL ;
}
virtual ~p3GxsTrans();
@ -154,7 +158,10 @@ private:
* signed acknowledged is received for each of them.
* Two weeks seems fair ATM.
*/
const static uint32_t GXS_STORAGE_PERIOD = 0x127500;
static const uint32_t GXS_STORAGE_PERIOD = 0x127500;
static const uint32_t MAX_DELAY_BETWEEN_CLEANUPS = 120; // should be 3600
time_t mLastMsgCleanup ;
/// Define how the backend should handle authentication based on signatures
static uint32_t AuthenPolicy();
@ -266,5 +273,31 @@ private:
uint32_t decrypted_data_size );
void notifyClientService(const OutgoingRecord& pr);
/*!
* Checks the integrity message and groups
*/
class GxsTransIntegrityCleanupThread : public RsSingleJobThread
{
enum CheckState { CheckStart, CheckChecking };
public:
GxsTransIntegrityCleanupThread(RsGeneralDataService *const dataService): mDs(dataService),mIntegrityMutex("GxsTransIntegrityCheck") {}
bool isDone();
void run();
void getDeletedIds(std::list<RsGxsGroupId>& grpIds, std::map<RsGxsGroupId, std::vector<RsGxsMessageId> >& msgIds);
private:
RsGeneralDataService* const mDs;
bool mDone;
RsMutex mIntegrityMutex;
std::list<RsGxsGroupId> mDeletedGrps;
std::map<RsGxsGroupId, std::vector<RsGxsMessageId> > mDeletedMsgs;
};
GxsTransIntegrityCleanupThread *mCleanupThread ;
};