Merge pull request #860 from csoler/v0.6-GxsTransport

V0.6 gxs transport
This commit is contained in:
csoler 2017-05-30 21:40:41 +02:00 committed by GitHub
commit eacf23dcf4
7 changed files with 236 additions and 27 deletions

View File

@ -255,10 +255,113 @@ void p3GxsTrans::handleResponse(uint32_t token, uint32_t req_type)
} }
} }
void p3GxsTrans::GxsTransIntegrityCleanupThread::run()
{
// first take out all the groups
std::map<RsGxsGroupId, RsNxsGrp*> grp;
mDs->retrieveNxsGrps(grp, true, true);
std::cerr << "GxsTransIntegrityCleanupThread::run()" << std::endl;
// compute hash and compare to stored value, if it fails then simply add it
// to list
GxsMsgReq grps;
for(std::map<RsGxsGroupId, RsNxsGrp*>::iterator git = grp.begin(); git != grp.end(); ++git)
{
RsNxsGrp* grp = git->second;
// store the group for retrieveNxsMsgs
grps[grp->grpId];
delete grp;
}
// now messages
std::map<RsGxsTransId,std::pair<RsGxsGroupId,RsGxsMessageId> > stored_msgs ;
std::list<RsGxsTransId> received_msgs ;
GxsMsgResult msgs;
mDs->retrieveNxsMsgs(grps, msgs, false, true);
for(GxsMsgResult::iterator mit = msgs.begin();mit != msgs.end(); ++mit)
{
std::vector<RsNxsMsg*>& msgV = mit->second;
std::vector<RsNxsMsg*>::iterator vit = msgV.begin();
for(; vit != msgV.end(); ++vit)
{
RsNxsMsg* msg = *vit;
RsGxsTransSerializer s ;
uint32_t size = msg->msg.bin_len;
RsItem *item = s.deserialise(msg->msg.bin_data,&size);
RsGxsTransMailItem *mitem ;
RsGxsTransPresignedReceipt *pitem ;
if(item == NULL)
std::cerr << " Unrecocognised item type!" << std::endl;
else if(NULL != (mitem = dynamic_cast<RsGxsTransMailItem*>(item)))
{
std::cerr << " " << msg->metaData->mMsgId << ": Mail data with ID " << std::hex << mitem->mailId << std::dec << " from " << msg->metaData->mAuthorId << " size: " << msg->msg.bin_len << std::endl;
stored_msgs[mitem->mailId] = std::make_pair(msg->metaData->mGroupId,msg->metaData->mMsgId) ;
}
else if(NULL != (pitem = dynamic_cast<RsGxsTransPresignedReceipt*>(item)))
{
std::cerr << " " << msg->metaData->mMsgId << ": Signed rcpt of ID " << std::hex << pitem->mailId << std::dec << " from " << msg->metaData->mAuthorId << " size: " << msg->msg.bin_len << std::endl;
received_msgs.push_back(pitem->mailId) ;
}
else
std::cerr << " Unknown item type!" << std::endl;
delete msg;
}
}
GxsMsgReq msgsToDel ;
std::cerr << "Msg removal report:" << std::endl;
for(std::list<RsGxsTransId>::const_iterator it(received_msgs.begin());it!=received_msgs.end();++it)
{
std::map<RsGxsTransId,std::pair<RsGxsGroupId,RsGxsMessageId> >::const_iterator it2 = stored_msgs.find(*it) ;
if(stored_msgs.end() != it2)
{
msgsToDel[it2->second.first].push_back(it2->second.second);
std::cerr << " scheduling msg " << std::hex << it2->second.first << "," << it2->second.second << " for deletion." << std::endl;
}
}
mDs->removeMsgs(msgsToDel);
}
void p3GxsTrans::service_tick() void p3GxsTrans::service_tick()
{ {
GxsTokenQueue::checkRequests(); GxsTokenQueue::checkRequests();
time_t now = time(NULL);
if(mLastMsgCleanup + MAX_DELAY_BETWEEN_CLEANUPS < now)
{
if(!mCleanupThread)
mCleanupThread = new GxsTransIntegrityCleanupThread(getDataStore());
if(mCleanupThread->isRunning())
std::cerr << "Cleanup thread is already running. Not running it again!" << std::endl;
else
{
std::cerr << "Starting GxsIntegrity cleanup thread." << std::endl;
mCleanupThread->start() ;
mLastMsgCleanup = now ;
}
}
{ {
RS_STACK_MUTEX(mOutgoingMutex); RS_STACK_MUTEX(mOutgoingMutex);
for ( auto it = mOutgoingQueue.begin(); it != mOutgoingQueue.end(); ) for ( auto it = mOutgoingQueue.begin(); it != mOutgoingQueue.end(); )

View File

@ -90,7 +90,11 @@ public:
mIdService(identities), mIdService(identities),
mServClientsMutex("p3GxsTrans client services map mutex"), mServClientsMutex("p3GxsTrans client services map mutex"),
mOutgoingMutex("p3GxsTrans outgoing queue map mutex"), mOutgoingMutex("p3GxsTrans outgoing queue map mutex"),
mIngoingMutex("p3GxsTrans ingoing queue map mutex") {} mIngoingMutex("p3GxsTrans ingoing queue map mutex")
{
mLastMsgCleanup = time(NULL) - 60; // to be changed into 0
mCleanupThread = NULL ;
}
virtual ~p3GxsTrans(); virtual ~p3GxsTrans();
@ -154,7 +158,10 @@ private:
* signed acknowledged is received for each of them. * signed acknowledged is received for each of them.
* Two weeks seems fair ATM. * Two weeks seems fair ATM.
*/ */
const static uint32_t GXS_STORAGE_PERIOD = 0x127500; static const uint32_t GXS_STORAGE_PERIOD = 0x127500;
static const uint32_t MAX_DELAY_BETWEEN_CLEANUPS = 1203; // every 20 mins. Could be less.
time_t mLastMsgCleanup ;
/// Define how the backend should handle authentication based on signatures /// Define how the backend should handle authentication based on signatures
static uint32_t AuthenPolicy(); static uint32_t AuthenPolicy();
@ -266,5 +273,27 @@ private:
uint32_t decrypted_data_size ); uint32_t decrypted_data_size );
void notifyClientService(const OutgoingRecord& pr); void notifyClientService(const OutgoingRecord& pr);
/*!
* Checks the integrity message and groups
*/
class GxsTransIntegrityCleanupThread : public RsSingleJobThread
{
enum CheckState { CheckStart, CheckChecking };
public:
GxsTransIntegrityCleanupThread(RsGeneralDataService *const dataService): mDs(dataService) {}
bool isDone();
void run();
void getDeletedIds(std::list<RsGxsGroupId>& grpIds, std::map<RsGxsGroupId, std::vector<RsGxsMessageId> >& msgIds);
private:
RsGeneralDataService* const mDs;
};
GxsTransIntegrityCleanupThread *mCleanupThread ;
}; };

View File

@ -61,6 +61,16 @@ int RS_pthread_setname_np(pthread_t __target_thread, const char *__buf) {
#include <iostream> #include <iostream>
#endif #endif
void RsThread::go()
{
mShouldStopSemaphore.set(0) ;
mHasStoppedSemaphore.set(0) ;
runloop();
mHasStoppedSemaphore.set(1);
mShouldStopSemaphore.set(0);
}
void *RsThread::rsthread_init(void* p) void *RsThread::rsthread_init(void* p)
{ {
RsThread *thread = (RsThread *) p; RsThread *thread = (RsThread *) p;
@ -76,7 +86,7 @@ void *RsThread::rsthread_init(void* p)
std::cerr << "[Thread ID:" << std::hex << pthread_self() << std::dec << "] thread is started. Calling runloop()..." << std::endl; std::cerr << "[Thread ID:" << std::hex << pthread_self() << std::dec << "] thread is started. Calling runloop()..." << std::endl;
#endif #endif
thread -> runloop(); thread->go();
return NULL; return NULL;
} }
RsThread::RsThread() RsThread::RsThread()
@ -216,14 +226,11 @@ RsTickingThread::RsTickingThread()
void RsSingleJobThread::runloop() void RsSingleJobThread::runloop()
{ {
mHasStoppedSemaphore.set(0) ;
run() ; run() ;
} }
void RsTickingThread::runloop() void RsTickingThread::runloop()
{ {
mHasStoppedSemaphore.set(0) ; // first time we are 100% the thread is actually running.
#ifdef DEBUG_THREADS #ifdef DEBUG_THREADS
THREAD_DEBUG << "RsTickingThread::runloop(). Setting stopped=0" << std::endl; THREAD_DEBUG << "RsTickingThread::runloop(). Setting stopped=0" << std::endl;
#endif #endif
@ -235,7 +242,6 @@ void RsTickingThread::runloop()
#ifdef DEBUG_THREADS #ifdef DEBUG_THREADS
THREAD_DEBUG << "pqithreadstreamer::runloop(): asked to stop. setting hasStopped=1, and returning. Thread ends." << std::endl; THREAD_DEBUG << "pqithreadstreamer::runloop(): asked to stop. setting hasStopped=1, and returning. Thread ends." << std::endl;
#endif #endif
mHasStoppedSemaphore.set(1);
return ; return ;
} }

View File

@ -263,6 +263,7 @@ class RsThread
protected: protected:
virtual void runloop() =0; /* called once the thread is started. Should be overloaded by subclasses. */ virtual void runloop() =0; /* called once the thread is started. Should be overloaded by subclasses. */
void go() ; // this one calls runloop and also sets the flags correctly when the thread is finished running.
RsSemaphore mHasStoppedSemaphore; RsSemaphore mHasStoppedSemaphore;
RsSemaphore mShouldStopSemaphore; RsSemaphore mShouldStopSemaphore;

View File

@ -24,6 +24,7 @@
#include <QObject> #include <QObject>
#include <QFontMetrics> #include <QFontMetrics>
#include <QWheelEvent> #include <QWheelEvent>
#include <QDateTime>
#include <time.h> #include <time.h>
#include <QMenu> #include <QMenu>
@ -44,15 +45,15 @@
#include "util/QtVersion.h" #include "util/QtVersion.h"
#include "gui/common/UIStateHelper.h" #include "gui/common/UIStateHelper.h"
#include "util/misc.h" #include "util/misc.h"
#include "gui/gxs/GxsIdLabel.h"
#define COL_PENDING_ID 0 #define COL_PENDING_ID 0
#define COL_PENDING_DESTINATION 1 #define COL_PENDING_DESTINATION 1
#define COL_PENDING_NICKNAME 2 #define COL_PENDING_DATASTATUS 2
#define COL_PENDING_DATASTATUS 3 #define COL_PENDING_DATASIZE 3
#define COL_PENDING_DATASIZE 4 #define COL_PENDING_DATAHASH 4
#define COL_PENDING_DATAHASH 5 #define COL_PENDING_SEND 5
#define COL_PENDING_SEND 6 #define COL_PENDING_GROUP_ID 6
#define COL_PENDING_GROUP_ID 7
#define COL_GROUP_GRP_ID 0 #define COL_GROUP_GRP_ID 0
#define COL_GROUP_NUM_MSGS 1 #define COL_GROUP_NUM_MSGS 1
@ -67,6 +68,9 @@ static const int GXSTRANS_STATISTICS_DELAY_BETWEEN_GROUP_REQ = 30 ; // never req
#define GXSTRANS_GROUP_META 0x01 #define GXSTRANS_GROUP_META 0x01
#define GXSTRANS_GROUP_DATA 0x02 #define GXSTRANS_GROUP_DATA 0x02
#define GXSTRANS_GROUP_STAT 0x03 #define GXSTRANS_GROUP_STAT 0x03
#define GXSTRANS_MSG_META 0x04
#define DEBUG_GXSTRANS_STATS 1
GxsTransportStatistics::GxsTransportStatistics(QWidget *parent) GxsTransportStatistics::GxsTransportStatistics(QWidget *parent)
: RsAutoUpdatePage(2000,parent) : RsAutoUpdatePage(2000,parent)
@ -195,6 +199,9 @@ void GxsTransportStatistics::updateContent()
rsGxsTrans->getStatistics(transinfo) ; rsGxsTrans->getStatistics(transinfo) ;
// clear
treeWidget->clear(); treeWidget->clear();
time_t now = time(NULL) ; time_t now = time(NULL) ;
@ -217,17 +224,28 @@ void GxsTransportStatistics::updateContent()
nickname = tr("Unknown"); nickname = tr("Unknown");
item -> setData(COL_PENDING_ID, Qt::DisplayRole, QString::number(rec.trans_id,16).rightJustified(8,'0')); item -> setData(COL_PENDING_ID, Qt::DisplayRole, QString::number(rec.trans_id,16).rightJustified(8,'0'));
item -> setData(COL_PENDING_NICKNAME, Qt::DisplayRole, nickname ) ;
item -> setData(COL_PENDING_DESTINATION, Qt::DisplayRole, QString::fromStdString(rec.recipient.toStdString()));
item -> setData(COL_PENDING_DATASTATUS, Qt::DisplayRole, getStatusString(rec.status)); item -> setData(COL_PENDING_DATASTATUS, Qt::DisplayRole, getStatusString(rec.status));
item -> setData(COL_PENDING_DATASIZE, Qt::DisplayRole, misc::friendlyUnit(rec.data_size)); item -> setData(COL_PENDING_DATASIZE, Qt::DisplayRole, misc::friendlyUnit(rec.data_size));
item -> setData(COL_PENDING_DATAHASH, Qt::DisplayRole, QString::fromStdString(rec.data_hash.toStdString())); item -> setData(COL_PENDING_DATAHASH, Qt::DisplayRole, QString::fromStdString(rec.data_hash.toStdString()));
item -> setData(COL_PENDING_SEND, Qt::DisplayRole, QString::number(now - rec.send_TS)); item -> setData(COL_PENDING_SEND, Qt::DisplayRole, QDateTime::fromTime_t(rec.send_TS).toString());
item -> setData(COL_PENDING_GROUP_ID, Qt::DisplayRole, QString::fromStdString(rec.group_id.toStdString())); item -> setData(COL_PENDING_GROUP_ID, Qt::DisplayRole, QString::fromStdString(rec.group_id.toStdString()));
GxsIdLabel *label = new GxsIdLabel() ;
label->setId(rec.recipient) ;
treeWidget -> setItemWidget(item,COL_PENDING_DESTINATION, label) ;
} }
// 2 - fill the table for pending group data // 2 - fill the table for pending group data
// record openned groups
std::set<RsGxsGroupId> openned_groups ;
for(uint32_t i=0;i<groupTreeWidget->topLevelItemCount();++i)
if(groupTreeWidget->isItemExpanded(groupTreeWidget->topLevelItem(i)))
openned_groups.insert(RsGxsGroupId(groupTreeWidget->topLevelItem(i)->data(COL_GROUP_GRP_ID,Qt::DisplayRole).toString().toStdString())) ;
groupTreeWidget->clear(); groupTreeWidget->clear();
for(std::map<RsGxsGroupId,RsGxsTransGroupStatistics>::const_iterator it(mGroupStats.begin());it!=mGroupStats.end();++it) for(std::map<RsGxsGroupId,RsGxsTransGroupStatistics>::const_iterator it(mGroupStats.begin());it!=mGroupStats.end();++it)
@ -236,12 +254,26 @@ void GxsTransportStatistics::updateContent()
QTreeWidgetItem *item = new QTreeWidgetItem(); QTreeWidgetItem *item = new QTreeWidgetItem();
groupTreeWidget->addTopLevelItem(item); groupTreeWidget->addTopLevelItem(item);
groupTreeWidget->setItemExpanded(item,openned_groups.find(it->first) != openned_groups.end());
item->setData(COL_GROUP_GRP_ID, Qt::DisplayRole, QString::fromStdString(stat.mGrpId.toStdString())) ; item->setData(COL_GROUP_GRP_ID, Qt::DisplayRole, QString::fromStdString(stat.mGrpId.toStdString())) ;
item->setData(COL_GROUP_NUM_MSGS, Qt::DisplayRole, QString::number(stat.mNumMsgs)) ; item->setData(COL_GROUP_NUM_MSGS, Qt::DisplayRole, QString::number(stat.mNumMsgs)) ;
item->setData(COL_GROUP_SIZE_MSGS,Qt::DisplayRole, QString::number(stat.mTotalSizeOfMsgs)) ; item->setData(COL_GROUP_SIZE_MSGS, Qt::DisplayRole, QString::number(stat.mTotalSizeOfMsgs)) ;
item->setData(COL_GROUP_SUBSCRIBED,Qt::DisplayRole, stat.subscribed?tr("Yes"):tr("No")) ; item->setData(COL_GROUP_SUBSCRIBED,Qt::DisplayRole, stat.subscribed?tr("Yes"):tr("No")) ;
item->setData(COL_GROUP_POPULARITY,Qt::DisplayRole, QString::number(stat.popularity)) ; item->setData(COL_GROUP_POPULARITY,Qt::DisplayRole, QString::number(stat.popularity)) ;
for(uint32_t i=0;i<it->second.messages_metas.size();++i)
{
QTreeWidgetItem *sitem = new QTreeWidgetItem(item) ;
const RsMsgMetaData& meta(it->second.messages_metas[i]) ;
GxsIdLabel *label = new GxsIdLabel();
label->setId(meta.mAuthorId) ;
groupTreeWidget->setItemWidget(sitem,COL_GROUP_GRP_ID,label) ;
sitem->setData(COL_GROUP_NUM_MSGS,Qt::DisplayRole, QDateTime::fromTime_t(meta.mPublishTs).toString());
}
} }
} }
@ -279,6 +311,9 @@ void GxsTransportStatistics::loadRequest(const TokenQueue *queue, const TokenReq
case GXSTRANS_GROUP_STAT: loadGroupStat(req.mToken); case GXSTRANS_GROUP_STAT: loadGroupStat(req.mToken);
break; break;
case GXSTRANS_MSG_META: loadMsgMeta(req.mToken);
break;
default: default:
std::cerr << "GxsTransportStatistics::loadRequest() ERROR: INVALID TYPE"; std::cerr << "GxsTransportStatistics::loadRequest() ERROR: INVALID TYPE";
std::cerr << std::endl; std::cerr << std::endl;
@ -310,6 +345,27 @@ void GxsTransportStatistics::requestGroupStat(const RsGxsGroupId &groupId)
rsGxsTrans->getTokenService()->requestGroupStatistic(token, groupId); rsGxsTrans->getTokenService()->requestGroupStatistic(token, groupId);
mTransQueue->queueRequest(token, 0, RS_TOKREQ_ANSTYPE_ACK, GXSTRANS_GROUP_STAT); mTransQueue->queueRequest(token, 0, RS_TOKREQ_ANSTYPE_ACK, GXSTRANS_GROUP_STAT);
} }
void GxsTransportStatistics::requestMsgMeta(const RsGxsGroupId& grpId)
{
mStateHelper->setLoading(GXSTRANS_MSG_META, true);
#ifdef DEBUG_GXSTRANS_STATS
std::cerr << "GxsTransportStatisticsWidget::requestGroupMeta()";
std::cerr << std::endl;
#endif
mTransQueue->cancelActiveRequestTokens(GXSTRANS_MSG_META);
RsTokReqOptions opts;
opts.mReqType = GXS_REQUEST_TYPE_MSG_META;
std::list<RsGxsGroupId> grouplist ;
grouplist.push_back(grpId) ;
uint32_t token;
rsGxsTrans->getTokenService()->requestMsgInfo(token, RS_TOKREQ_ANSTYPE_SUMMARY, opts, grouplist);
mTransQueue->queueRequest(token, 0, RS_TOKREQ_ANSTYPE_ACK, GXSTRANS_MSG_META);
}
void GxsTransportStatistics::loadGroupStat(const uint32_t &token) void GxsTransportStatistics::loadGroupStat(const uint32_t &token)
{ {
@ -356,6 +412,7 @@ void GxsTransportStatistics::loadGroupMeta(const uint32_t& token)
#endif #endif
requestGroupStat(vit->mGroupId) ; requestGroupStat(vit->mGroupId) ;
requestMsgMeta(vit->mGroupId) ;
RsGxsTransGroupStatistics& s(mGroupStats[vit->mGroupId]); RsGxsTransGroupStatistics& s(mGroupStats[vit->mGroupId]);
s.popularity = vit->mPop ; s.popularity = vit->mPop ;
@ -370,3 +427,17 @@ void GxsTransportStatistics::loadGroupMeta(const uint32_t& token)
else else
++it; ++it;
} }
void GxsTransportStatistics::loadMsgMeta(const uint32_t& token)
{
mStateHelper->setLoading(GXSTRANS_MSG_META, false);
GxsMsgMetaMap m ;
if (!rsGxsTrans->getMsgSummary(token,m))
return ;
for(GxsMsgMetaMap::const_iterator it(m.begin());it!=m.end();++it)
mGroupStats[it->first].messages_metas = it->second ;
}

View File

@ -41,6 +41,8 @@ public:
bool subscribed ; bool subscribed ;
int popularity ; int popularity ;
std::vector<RsMsgMetaData> messages_metas ;
}; };
class GxsTransportStatistics: public RsAutoUpdatePage, public TokenResponse, public Ui::GxsTransportStatistics class GxsTransportStatistics: public RsAutoUpdatePage, public TokenResponse, public Ui::GxsTransportStatistics
@ -66,8 +68,10 @@ private slots:
private: private:
void loadGroupMeta(const uint32_t& token); void loadGroupMeta(const uint32_t& token);
void loadGroupStat(const uint32_t& token); void loadGroupStat(const uint32_t& token);
void loadMsgMeta(const uint32_t& token);
void requestGroupMeta(); void requestGroupMeta();
void requestMsgMeta(const RsGxsGroupId& groupId);
void requestGroupStat(const RsGxsGroupId &groupId); void requestGroupStat(const RsGxsGroupId &groupId);
void processSettings(bool bLoad); void processSettings(bool bLoad);

View File

@ -28,12 +28,12 @@
<widget class="QTreeWidget" name="groupTreeWidget"> <widget class="QTreeWidget" name="groupTreeWidget">
<column> <column>
<property name="text"> <property name="text">
<string>Group ID</string> <string>Group ID / Author</string>
</property> </property>
</column> </column>
<column> <column>
<property name="text"> <property name="text">
<string>Number of messages</string> <string>Number of messages / Publish TS</string>
</property> </property>
</column> </column>
<column> <column>
@ -81,12 +81,7 @@
</column> </column>
<column> <column>
<property name="text"> <property name="text">
<string>Destination ID</string> <string>Destination</string>
</property>
</column>
<column>
<property name="text">
<string>Destination Name</string>
</property> </property>
</column> </column>
<column> <column>