Merge pull request #887 from csoler/v0.6-GxsTransport

V0.6 gxs transport
This commit is contained in:
csoler 2017-06-17 23:07:10 +02:00 committed by GitHub
commit 0133be757a
4 changed files with 250 additions and 6 deletions

View file

@ -2912,8 +2912,12 @@ void RsGenExchange::processRecvdMessages()
std::cerr << " deserialised info: grp id=" << meta->mGroupId << ", msg id=" << meta->mMsgId ; std::cerr << " deserialised info: grp id=" << meta->mGroupId << ", msg id=" << meta->mMsgId ;
#endif #endif
uint8_t validateReturn = VALIDATE_FAIL; uint8_t validateReturn = VALIDATE_FAIL;
bool accept_new_msg = acceptNewMessage(meta,msg->msg.bin_len);
if(ok) if(!accept_new_msg && mNetService != NULL)
mNetService->rejectMessage(meta->mMsgId) ; // This prevents reloading the message again at next sync.
if(ok && accept_new_msg)
{ {
std::map<RsGxsGroupId, RsGxsGrpMetaData*>::iterator mit = grpMetas.find(msg->grpId); std::map<RsGxsGroupId, RsGxsGrpMetaData*>::iterator mit = grpMetas.find(msg->grpId);
@ -3054,8 +3058,8 @@ void RsGenExchange::processRecvdMessages()
mNetService->rejectMessage(*it) ; mNetService->rejectMessage(*it) ;
} }
bool RsGenExchange::acceptNewGroup(const RsGxsGrpMetaData* /*grpMeta*/ ) bool RsGenExchange::acceptNewGroup(const RsGxsGrpMetaData* /*grpMeta*/ ) { return true; }
{ return true; } bool RsGenExchange::acceptNewMessage(const RsGxsMsgMetaData* /*grpMeta*/,uint32_t size ) { return true; }
void RsGenExchange::processRecvdGroups() void RsGenExchange::processRecvdGroups()
{ {

View file

@ -260,6 +260,17 @@ public:
*/ */
virtual bool acceptNewGroup(const RsGxsGrpMetaData *grpMeta) ; virtual bool acceptNewGroup(const RsGxsGrpMetaData *grpMeta) ;
/*!
* \brief acceptNewMessage
* Early checks if the message can be accepted. This is mainly used to check wether the group is for instance overloaded and the service wants
* to put limitations to it.
* Returns true unless derived in GXS services.
*
* \param grpMeta Group metadata to check
* \return
*/
virtual bool acceptNewMessage(const RsGxsMsgMetaData *msgMeta, uint32_t size) ;
bool subscribeToGroup(uint32_t& token, const RsGxsGroupId& grpId, bool subscribe); bool subscribeToGroup(uint32_t& token, const RsGxsGroupId& grpId, bool subscribe);
/*! /*!

View file

@ -20,10 +20,13 @@
#include "gxstrans/p3gxstrans.h" #include "gxstrans/p3gxstrans.h"
#include "util/stacktrace.h" #include "util/stacktrace.h"
#define DEBUG_GXSTRANS 1
typedef unsigned int uint; typedef unsigned int uint;
RsGxsTrans *rsGxsTrans = NULL ; RsGxsTrans *rsGxsTrans = NULL ;
const uint32_t p3GxsTrans::MAX_DELAY_BETWEEN_CLEANUPS = 900; // every 15 mins. Could be less.
p3GxsTrans::~p3GxsTrans() p3GxsTrans::~p3GxsTrans()
{ {
p3Config::saveConfiguration(); p3Config::saveConfiguration();
@ -69,7 +72,9 @@ bool p3GxsTrans::sendData( RsGxsTransId& mailId,
const uint8_t* data, uint32_t size, const uint8_t* data, uint32_t size,
RsGxsTransEncryptionMode cm ) RsGxsTransEncryptionMode cm )
{ {
#ifdef DEBUG_GXSTRANS
std::cout << "p3GxsTrans::sendEmail(...)" << std::endl; std::cout << "p3GxsTrans::sendEmail(...)" << std::endl;
#endif
if(!mIdService.isOwnId(own_gxsid)) if(!mIdService.isOwnId(own_gxsid))
{ {
@ -125,7 +130,9 @@ void p3GxsTrans::registerGxsTransClient(
void p3GxsTrans::handleResponse(uint32_t token, uint32_t req_type) void p3GxsTrans::handleResponse(uint32_t token, uint32_t req_type)
{ {
#ifdef DEBUG_GXSTRANS
std::cout << "p3GxsTrans::handleResponse(" << token << ", " << req_type << ")" << std::endl; std::cout << "p3GxsTrans::handleResponse(" << token << ", " << req_type << ")" << std::endl;
#endif
bool changed = false ; bool changed = false ;
switch (req_type) switch (req_type)
@ -188,8 +195,10 @@ void p3GxsTrans::handleResponse(uint32_t token, uint32_t req_type)
* avoid to create yet another never used mail distribution group. * avoid to create yet another never used mail distribution group.
*/ */
#ifdef DEBUG_GXSTRANS
std::cerr << "p3GxsTrans::handleResponse(...) preferredGroupId.isNu" std::cerr << "p3GxsTrans::handleResponse(...) preferredGroupId.isNu"
<< "ll() let's create a new group." << std::endl; << "ll() let's create a new group." << std::endl;
#endif
uint32_t token; uint32_t token;
publishGroup(token, new RsGxsTransGroupItem()); publishGroup(token, new RsGxsTransGroupItem());
queueRequest(token, GROUP_CREATE); queueRequest(token, GROUP_CREATE);
@ -199,7 +208,9 @@ void p3GxsTrans::handleResponse(uint32_t token, uint32_t req_type)
} }
case GROUP_CREATE: case GROUP_CREATE:
{ {
#ifdef DEBUG_GXSTRANS
std::cerr << "p3GxsTrans::handleResponse(...) GROUP_CREATE" << std::endl; std::cerr << "p3GxsTrans::handleResponse(...) GROUP_CREATE" << std::endl;
#endif
RsGxsGroupId grpId; RsGxsGroupId grpId;
acknowledgeTokenGrp(token, grpId); acknowledgeTokenGrp(token, grpId);
supersedePreferredGroup(grpId); supersedePreferredGroup(grpId);
@ -207,7 +218,9 @@ void p3GxsTrans::handleResponse(uint32_t token, uint32_t req_type)
} }
case MAILS_UPDATE: case MAILS_UPDATE:
{ {
#ifdef DEBUG_GXSTRANS
std::cout << "p3GxsTrans::handleResponse(...) MAILS_UPDATE" << std::endl; std::cout << "p3GxsTrans::handleResponse(...) MAILS_UPDATE" << std::endl;
#endif
typedef std::map<RsGxsGroupId, std::vector<RsGxsMsgItem*> > GxsMsgDataMap; typedef std::map<RsGxsGroupId, std::vector<RsGxsMsgItem*> > GxsMsgDataMap;
GxsMsgDataMap gpMsgMap; GxsMsgDataMap gpMsgMap;
getMsgData(token, gpMsgMap); getMsgData(token, gpMsgMap);
@ -261,6 +274,12 @@ void p3GxsTrans::handleResponse(uint32_t token, uint32_t req_type)
if(changed) if(changed)
IndicateConfigChanged(); IndicateConfigChanged();
} }
void p3GxsTrans::GxsTransIntegrityCleanupThread::getPerUserStatistics(std::map<RsGxsId,MsgSizeCount>& m)
{
RS_STACK_MUTEX(mMtx) ;
m = total_message_size_and_count ;
}
void p3GxsTrans::GxsTransIntegrityCleanupThread::getMessagesToDelete(GxsMsgReq& m) void p3GxsTrans::GxsTransIntegrityCleanupThread::getMessagesToDelete(GxsMsgReq& m)
{ {
@ -270,13 +289,33 @@ void p3GxsTrans::GxsTransIntegrityCleanupThread::getMessagesToDelete(GxsMsgReq&
mMsgToDel.clear(); mMsgToDel.clear();
} }
// This method does two things:
// 1 - cleaning up old messages and messages for which an ACK has been received.
// 2 - building per user statistics across groups. This is important because it allows to mitigate the excess of
// messages, which might be due to spam.
//
// Note: the anti-spam system is disabled the level of GXS, because we want to allow to send anonymous messages
// between identities that might not have a reputation yet. Still, messages from identities with a bad reputation
// are still deleted by GXS.
//
// The group limits are enforced according to the following rules:
// * a temporal sliding window is computed for each identity and the number of messages signed by this identity is counted
// *
//
//
// Deleted messages are notified to the RsGxsNetService part which keeps a list of delete messages so as not to request them again
// during the same session. This allows to safely delete messages while avoiding re-synchronisation from friend nodes.
void p3GxsTrans::GxsTransIntegrityCleanupThread::run() void p3GxsTrans::GxsTransIntegrityCleanupThread::run()
{ {
// first take out all the groups // first take out all the groups
std::map<RsGxsGroupId, RsNxsGrp*> grp; std::map<RsGxsGroupId, RsNxsGrp*> grp;
mDs->retrieveNxsGrps(grp, true, true); mDs->retrieveNxsGrps(grp, true, true);
#ifdef DEBUG_GXSTRANS
std::cerr << "GxsTransIntegrityCleanupThread::run()" << std::endl; std::cerr << "GxsTransIntegrityCleanupThread::run()" << std::endl;
#endif
// compute hash and compare to stored value, if it fails then simply add it // compute hash and compare to stored value, if it fails then simply add it
// to list // to list
@ -294,6 +333,8 @@ void p3GxsTrans::GxsTransIntegrityCleanupThread::run()
// now messages // now messages
std::map<RsGxsId,MsgSizeCount> totalMessageSizeAndCount;
std::map<RsGxsTransId,std::pair<RsGxsGroupId,RsGxsMessageId> > stored_msgs ; std::map<RsGxsTransId,std::pair<RsGxsGroupId,RsGxsMessageId> > stored_msgs ;
std::list<RsGxsTransId> received_msgs ; std::list<RsGxsTransId> received_msgs ;
@ -319,27 +360,41 @@ void p3GxsTrans::GxsTransIntegrityCleanupThread::run()
std::cerr << " Unrecocognised item type!" << std::endl; std::cerr << " Unrecocognised item type!" << std::endl;
else if(NULL != (mitem = dynamic_cast<RsGxsTransMailItem*>(item))) else if(NULL != (mitem = dynamic_cast<RsGxsTransMailItem*>(item)))
{ {
#ifdef DEBUG_GXSTRANS
std::cerr << " " << msg->metaData->mMsgId << ": Mail data with ID " << std::hex << std::setfill('0') << std::setw(16) << mitem->mailId << std::dec << " from " << msg->metaData->mAuthorId << " size: " << msg->msg.bin_len << std::endl; std::cerr << " " << msg->metaData->mMsgId << ": Mail data with ID " << std::hex << std::setfill('0') << std::setw(16) << mitem->mailId << std::dec << " from " << msg->metaData->mAuthorId << " size: " << msg->msg.bin_len << std::endl;
#endif
stored_msgs[mitem->mailId] = std::make_pair(msg->metaData->mGroupId,msg->metaData->mMsgId) ; stored_msgs[mitem->mailId] = std::make_pair(msg->metaData->mGroupId,msg->metaData->mMsgId) ;
} }
else if(NULL != (pitem = dynamic_cast<RsGxsTransPresignedReceipt*>(item))) else if(NULL != (pitem = dynamic_cast<RsGxsTransPresignedReceipt*>(item)))
{ {
#ifdef DEBUG_GXSTRANS
std::cerr << " " << msg->metaData->mMsgId << ": Signed rcpt of ID " << std::hex << pitem->mailId << std::dec << " from " << msg->metaData->mAuthorId << " size: " << msg->msg.bin_len << std::endl; std::cerr << " " << msg->metaData->mMsgId << ": Signed rcpt of ID " << std::hex << pitem->mailId << std::dec << " from " << msg->metaData->mAuthorId << " size: " << msg->msg.bin_len << std::endl;
#endif
received_msgs.push_back(pitem->mailId) ; received_msgs.push_back(pitem->mailId) ;
} }
else else
std::cerr << " Unknown item type!" << std::endl; std::cerr << " Unknown item type!" << std::endl;
totalMessageSizeAndCount[msg->metaData->mAuthorId].size += msg->msg.bin_len ;
totalMessageSizeAndCount[msg->metaData->mAuthorId].count++;
delete msg; delete msg;
} }
} }
// From the collected information, build a list of group messages to delete.
GxsMsgReq msgsToDel ; GxsMsgReq msgsToDel ;
#ifdef DEBUG_GXSTRANS
std::cerr << "Msg removal report:" << std::endl; std::cerr << "Msg removal report:" << std::endl;
std::cerr << " Per user size and count: " << std::endl;
for(std::map<RsGxsId,MsgSizeCount>::const_iterator it(totalMessageSizeAndCount.begin());it!=totalMessageSizeAndCount.end();++it)
std::cerr << " " << it->first << ": " << it->second.count << " messages, for a total size of " << it->second.size << " bytes." << std::endl;
#endif
for(std::list<RsGxsTransId>::const_iterator it(received_msgs.begin());it!=received_msgs.end();++it) for(std::list<RsGxsTransId>::const_iterator it(received_msgs.begin());it!=received_msgs.end();++it)
{ {
std::map<RsGxsTransId,std::pair<RsGxsGroupId,RsGxsMessageId> >::const_iterator it2 = stored_msgs.find(*it) ; std::map<RsGxsTransId,std::pair<RsGxsGroupId,RsGxsMessageId> >::const_iterator it2 = stored_msgs.find(*it) ;
@ -348,12 +403,15 @@ void p3GxsTrans::GxsTransIntegrityCleanupThread::run()
{ {
msgsToDel[it2->second.first].push_back(it2->second.second); msgsToDel[it2->second.first].push_back(it2->second.second);
#ifdef DEBUG_GXSTRANS
std::cerr << " scheduling msg " << std::hex << it2->second.first << "," << it2->second.second << " for deletion." << std::endl; std::cerr << " scheduling msg " << std::hex << it2->second.first << "," << it2->second.second << " for deletion." << std::endl;
#endif
} }
} }
RS_STACK_MUTEX(mMtx) ; RS_STACK_MUTEX(mMtx) ;
mMsgToDel = msgsToDel ; mMsgToDel = msgsToDel ;
total_message_size_and_count = totalMessageSizeAndCount;
mDone = true; mDone = true;
} }
@ -371,6 +429,7 @@ void p3GxsTrans::service_tick()
if(mLastMsgCleanup + MAX_DELAY_BETWEEN_CLEANUPS < now) if(mLastMsgCleanup + MAX_DELAY_BETWEEN_CLEANUPS < now)
{ {
RS_STACK_MUTEX(mPerUserStatsMutex);
if(!mCleanupThread) if(!mCleanupThread)
mCleanupThread = new GxsTransIntegrityCleanupThread(getDataStore()); mCleanupThread = new GxsTransIntegrityCleanupThread(getDataStore());
@ -378,7 +437,9 @@ void p3GxsTrans::service_tick()
std::cerr << "Cleanup thread is already running. Not running it again!" << std::endl; std::cerr << "Cleanup thread is already running. Not running it again!" << std::endl;
else else
{ {
#ifdef DEBUG_GXSTRANS
std::cerr << "Starting GxsIntegrity cleanup thread." << std::endl; std::cerr << "Starting GxsIntegrity cleanup thread." << std::endl;
#endif
mCleanupThread->start() ; mCleanupThread->start() ;
mLastMsgCleanup = now ; mLastMsgCleanup = now ;
@ -389,16 +450,30 @@ void p3GxsTrans::service_tick()
if(mCleanupThread != NULL && mCleanupThread->isDone()) if(mCleanupThread != NULL && mCleanupThread->isDone())
{ {
RS_STACK_MUTEX(mPerUserStatsMutex);
GxsMsgReq msgToDel ; GxsMsgReq msgToDel ;
mCleanupThread->getMessagesToDelete(msgToDel) ; mCleanupThread->getMessagesToDelete(msgToDel) ;
if(!msgToDel.empty()) if(!msgToDel.empty())
{ {
#ifdef DEBUG_GXSTRANS
std::cerr << "p3GxsTrans::service_tick(): deleting messages." << std::endl; std::cerr << "p3GxsTrans::service_tick(): deleting messages." << std::endl;
#endif
uint32_t token ; uint32_t token ;
deleteMsgs(token,msgToDel); deleteMsgs(token,msgToDel);
} }
mCleanupThread->getPerUserStatistics(per_user_statistics) ;
#ifdef DEBUG_GXSTRANS
std::cerr << "p3GxsTrans: Got new set of per user statistics:"<< std::endl;
for(std::map<RsGxsId,MsgSizeCount>::const_iterator it(per_user_statistics.begin());it!=per_user_statistics.end();++it)
std::cerr << " " << it->first << ": " << it->second.count << " " << it->second.size << std::endl;
#endif
delete mCleanupThread;
mCleanupThread=NULL ;
} }
{ {
@ -440,6 +515,7 @@ void p3GxsTrans::service_tick()
} }
else else
{ {
#ifdef DEBUG_GXSTRANS
std::cout << "p3GxsTrans::service_tick() " std::cout << "p3GxsTrans::service_tick() "
<< "GXS_MAIL_SUBTYPE_MAIL handling: " << "GXS_MAIL_SUBTYPE_MAIL handling: "
<< msg->meta.mMsgId << msg->meta.mMsgId
@ -449,6 +525,7 @@ void p3GxsTrans::service_tick()
<< " mailId: "<< msg->mailId << " mailId: "<< msg->mailId
<< " payload.size(): " << msg->payload.size() << " payload.size(): " << msg->payload.size()
<< std::endl; << std::endl;
#endif
handleEncryptedMail(msg); handleEncryptedMail(msg);
} }
break; break;
@ -499,14 +576,18 @@ void p3GxsTrans::service_tick()
RsGenExchange::ServiceCreate_Return p3GxsTrans::service_CreateGroup( RsGenExchange::ServiceCreate_Return p3GxsTrans::service_CreateGroup(
RsGxsGrpItem* grpItem, RsTlvSecurityKeySet& /*keySet*/ ) RsGxsGrpItem* grpItem, RsTlvSecurityKeySet& /*keySet*/ )
{ {
#ifdef DEBUG_GXSTRANS
std::cout << "p3GxsTrans::service_CreateGroup(...) " std::cout << "p3GxsTrans::service_CreateGroup(...) "
<< grpItem->meta.mGroupId << std::endl; << grpItem->meta.mGroupId << std::endl;
#endif
return SERVICE_CREATE_SUCCESS; return SERVICE_CREATE_SUCCESS;
} }
void p3GxsTrans::notifyChanges(std::vector<RsGxsNotify*>& changes) void p3GxsTrans::notifyChanges(std::vector<RsGxsNotify*>& changes)
{ {
#ifdef DEBUG_GXSTRANS
std::cout << "p3GxsTrans::notifyChanges(...)" << std::endl; std::cout << "p3GxsTrans::notifyChanges(...)" << std::endl;
#endif
for( std::vector<RsGxsNotify*>::const_iterator it = changes.begin(); for( std::vector<RsGxsNotify*>::const_iterator it = changes.begin();
it != changes.end(); ++it ) it != changes.end(); ++it )
{ {
@ -515,12 +596,16 @@ void p3GxsTrans::notifyChanges(std::vector<RsGxsNotify*>& changes)
if (grpChange) if (grpChange)
{ {
#ifdef DEBUG_GXSTRANS
std::cout << "p3GxsTrans::notifyChanges(...) grpChange" << std::endl; std::cout << "p3GxsTrans::notifyChanges(...) grpChange" << std::endl;
#endif
requestGroupsData(&(grpChange->mGrpIdList)); requestGroupsData(&(grpChange->mGrpIdList));
} }
else if(msgChange) else if(msgChange)
{ {
#ifdef DEBUG_GXSTRANS
std::cout << "p3GxsTrans::notifyChanges(...) msgChange" << std::endl; std::cout << "p3GxsTrans::notifyChanges(...) msgChange" << std::endl;
#endif
uint32_t token; uint32_t token;
RsTokReqOptions opts; opts.mReqType = GXS_REQUEST_TYPE_MSG_DATA; RsTokReqOptions opts; opts.mReqType = GXS_REQUEST_TYPE_MSG_DATA;
RsGenExchange::getTokenService()->requestMsgInfo( token, 0xcaca, RsGenExchange::getTokenService()->requestMsgInfo( token, 0xcaca,
@ -536,9 +621,11 @@ void p3GxsTrans::notifyChanges(std::vector<RsGxsNotify*>& changes)
for(itT vit = msgsIds.begin(); vit != msgsIds.end(); ++vit) for(itT vit = msgsIds.begin(); vit != msgsIds.end(); ++vit)
{ {
const RsGxsMessageId& msgId = *vit; const RsGxsMessageId& msgId = *vit;
#ifdef DEBUG_GXSTRANS
std::cout << "p3GxsTrans::notifyChanges(...) got " std::cout << "p3GxsTrans::notifyChanges(...) got "
<< "notification for message " << msgId << "notification for message " << msgId
<< " in group " << grpId << std::endl; << " in group " << grpId << std::endl;
#endif
} }
} }
} }
@ -588,7 +675,9 @@ bool p3GxsTrans::requestGroupsData(const std::list<RsGxsGroupId>* groupIds)
bool p3GxsTrans::handleEncryptedMail(const RsGxsTransMailItem* mail) bool p3GxsTrans::handleEncryptedMail(const RsGxsTransMailItem* mail)
{ {
#ifdef DEBUG_GXSTRANS
std::cout << "p3GxsTrans::handleEcryptedMail(...)" << std::endl; std::cout << "p3GxsTrans::handleEcryptedMail(...)" << std::endl;
#endif
std::set<RsGxsId> decryptIds; std::set<RsGxsId> decryptIds;
std::list<RsGxsId> ownIds; std::list<RsGxsId> ownIds;
@ -611,8 +700,11 @@ bool p3GxsTrans::handleEncryptedMail(const RsGxsTransMailItem* mail)
uint16_t csri = 0; uint16_t csri = 0;
uint32_t off = 0; uint32_t off = 0;
getRawUInt16(&mail->payload[0], mail->payload.size(), &off, &csri); getRawUInt16(&mail->payload[0], mail->payload.size(), &off, &csri);
#ifdef DEBUG_GXSTRANS
std::cerr << "service: " << csri << " got CLEAR_TEXT mail!" std::cerr << "service: " << csri << " got CLEAR_TEXT mail!"
<< std::endl; << std::endl;
#endif
/* As we cannot verify recipient without encryption, just pass the hint /* As we cannot verify recipient without encryption, just pass the hint
* as recipient */ * as recipient */
return dispatchDecryptedMail( mail->meta.mAuthorId, mail->recipientHint, return dispatchDecryptedMail( mail->meta.mAuthorId, mail->recipientHint,
@ -651,8 +743,10 @@ bool p3GxsTrans::dispatchDecryptedMail( const RsGxsId& authorId,
const uint8_t* decrypted_data, const uint8_t* decrypted_data,
uint32_t decrypted_data_size ) uint32_t decrypted_data_size )
{ {
#ifdef DEBUG_GXSTRANS
std::cout << "p3GxsTrans::dispatchDecryptedMail(, , " << decrypted_data_size std::cout << "p3GxsTrans::dispatchDecryptedMail(, , " << decrypted_data_size
<< ")" << std::endl; << ")" << std::endl;
#endif
uint16_t csri = 0; uint16_t csri = 0;
uint32_t offset = 0; uint32_t offset = 0;
@ -678,8 +772,10 @@ bool p3GxsTrans::dispatchDecryptedMail( const RsGxsId& authorId,
<< " wrong is happening!" << std::endl; << " wrong is happening!" << std::endl;
return false; return false;
} }
#ifdef DEBUG_GXSTRANS
std::cout << "p3GxsTrans::dispatchDecryptedMail(...) dispatching receipt " std::cout << "p3GxsTrans::dispatchDecryptedMail(...) dispatching receipt "
<< "with: msgId: " << receipt->msgId << std::endl; << "with: msgId: " << receipt->msgId << std::endl;
#endif
std::vector<RsNxsMsg*> rcct; rcct.push_back(receipt); std::vector<RsNxsMsg*> rcct; rcct.push_back(receipt);
RsGenExchange::notifyNewMessages(rcct); RsGenExchange::notifyNewMessages(rcct);
@ -824,6 +920,7 @@ void p3GxsTrans::locked_processOutgoingRecord(OutgoingRecord& pr)
} }
case GxsTransSendStatus::PENDING_PUBLISH: case GxsTransSendStatus::PENDING_PUBLISH:
{ {
#ifdef DEBUG_GXSTRANS
std::cout << "p3GxsTrans::sendEmail(...) sending mail to: " std::cout << "p3GxsTrans::sendEmail(...) sending mail to: "
<< pr.recipient << pr.recipient
<< " with cryptoType: " << " with cryptoType: "
@ -832,6 +929,7 @@ void p3GxsTrans::locked_processOutgoingRecord(OutgoingRecord& pr)
<< " receiptId: " << pr.mailItem.mailId << " receiptId: " << pr.mailItem.mailId
<< " payload size: " << pr.mailItem.payload.size() << " payload size: " << pr.mailItem.payload.size()
<< std::endl; << std::endl;
#endif
RsGxsTransMailItem *mail_item = new RsGxsTransMailItem(pr.mailItem); RsGxsTransMailItem *mail_item = new RsGxsTransMailItem(pr.mailItem);
@ -925,23 +1023,31 @@ RsSerialiser* p3GxsTrans::setupSerialiser()
bool p3GxsTrans::saveList(bool &cleanup, std::list<RsItem *>& saveList) bool p3GxsTrans::saveList(bool &cleanup, std::list<RsItem *>& saveList)
{ {
#ifdef DEBUG_GXSTRANS
std::cout << "p3GxsTrans::saveList(...)" << saveList.size() << " " << mIncomingQueue.size() << " " << mOutgoingQueue.size() << std::endl; std::cout << "p3GxsTrans::saveList(...)" << saveList.size() << " " << mIncomingQueue.size() << " " << mOutgoingQueue.size() << std::endl;
#endif
mOutgoingMutex.lock(); mOutgoingMutex.lock();
mIngoingMutex.lock(); mIngoingMutex.lock();
for ( auto& kv : mOutgoingQueue ) for ( auto& kv : mOutgoingQueue )
{ {
#ifdef DEBUG_GXSTRANS
std::cerr << "Saving outgoing item, ID " << std::hex << std::setfill('0') << std::setw(16) << kv.first << std::dec << "Group id: " << kv.second.group_id << ", TS=" << kv.second.sent_ts << std::endl; std::cerr << "Saving outgoing item, ID " << std::hex << std::setfill('0') << std::setw(16) << kv.first << std::dec << "Group id: " << kv.second.group_id << ", TS=" << kv.second.sent_ts << std::endl;
#endif
saveList.push_back(&kv.second); saveList.push_back(&kv.second);
} }
for ( auto& kv : mIncomingQueue ) for ( auto& kv : mIncomingQueue )
{ {
#ifdef DEBUG_GXSTRANS
std::cerr << "Saving incoming item, ID " << std::hex << std::setfill('0') << std::setw(16) << kv.first << std::endl; std::cerr << "Saving incoming item, ID " << std::hex << std::setfill('0') << std::setw(16) << kv.first << std::endl;
#endif
saveList.push_back(kv.second); saveList.push_back(kv.second);
} }
#ifdef DEBUG_GXSTRANS
std::cout << "p3GxsTrans::saveList(...)" << saveList.size() << " " << mIncomingQueue.size() << " " << mOutgoingQueue.size() << std::endl; std::cout << "p3GxsTrans::saveList(...)" << saveList.size() << " " << mIncomingQueue.size() << " " << mOutgoingQueue.size() << std::endl;
#endif
cleanup = false; cleanup = false;
return true; return true;
@ -955,9 +1061,11 @@ void p3GxsTrans::saveDone()
bool p3GxsTrans::loadList(std::list<RsItem *>&loadList) bool p3GxsTrans::loadList(std::list<RsItem *>&loadList)
{ {
#ifdef DEBUG_GXSTRANS
std::cout << "p3GxsTrans::loadList(...) " << loadList.size() << " " std::cout << "p3GxsTrans::loadList(...) " << loadList.size() << " "
<< mIncomingQueue.size() << " " << mOutgoingQueue.size() << mIncomingQueue.size() << " " << mOutgoingQueue.size()
<< std::endl; << std::endl;
#endif
for(auto& v : loadList) for(auto& v : loadList)
switch(static_cast<GxsTransItemsSubtypes>(v->PacketSubType())) switch(static_cast<GxsTransItemsSubtypes>(v->PacketSubType()))
@ -995,7 +1103,9 @@ bool p3GxsTrans::loadList(std::list<RsItem *>&loadList)
RS_STACK_MUTEX(mOutgoingMutex); RS_STACK_MUTEX(mOutgoingMutex);
mOutgoingQueue.insert(prMap::value_type(ot.mailItem.mailId, ot)); mOutgoingQueue.insert(prMap::value_type(ot.mailItem.mailId, ot));
#ifdef DEBUG_GXSTRANS
std::cerr << "Loaded outgoing item (converted), ID " << std::hex << std::setfill('0') << std::setw(16) << ot.mailItem.mailId<< std::dec << ", Group id: " << ot.group_id << ", TS=" << ot.sent_ts << std::endl; std::cerr << "Loaded outgoing item (converted), ID " << std::hex << std::setfill('0') << std::setw(16) << ot.mailItem.mailId<< std::dec << ", Group id: " << ot.group_id << ", TS=" << ot.sent_ts << std::endl;
#endif
} }
delete v; delete v;
break; break;
@ -1010,7 +1120,9 @@ bool p3GxsTrans::loadList(std::list<RsItem *>&loadList)
mOutgoingQueue.insert( mOutgoingQueue.insert(
prMap::value_type(ot->mailItem.mailId, *ot)); prMap::value_type(ot->mailItem.mailId, *ot));
#ifdef DEBUG_GXSTRANS
std::cerr << "Loading outgoing item, ID " << std::hex << std::setfill('0') << std::setw(16) << ot->mailItem.mailId<< std::dec << "Group id: " << ot->group_id << ", TS=" << ot->sent_ts << std::endl; std::cerr << "Loading outgoing item, ID " << std::hex << std::setfill('0') << std::setw(16) << ot->mailItem.mailId<< std::dec << "Group id: " << ot->group_id << ", TS=" << ot->sent_ts << std::endl;
#endif
} }
delete v; delete v;
break; break;
@ -1025,9 +1137,106 @@ bool p3GxsTrans::loadList(std::list<RsItem *>&loadList)
break; break;
} }
#ifdef DEBUG_GXSTRANS
std::cout << "p3GxsTrans::loadList(...) " << loadList.size() << " " std::cout << "p3GxsTrans::loadList(...) " << loadList.size() << " "
<< mIncomingQueue.size() << " " << mOutgoingQueue.size() << mIncomingQueue.size() << " " << mOutgoingQueue.size()
<< std::endl; << std::endl;
#endif
return true; return true;
} }
bool p3GxsTrans::acceptNewMessage(const RsGxsMsgMetaData *msgMeta,uint32_t msg_size)
{
// 1 - check the total size of the msgs for the author of this particular msg.
// 2 - Reject depending on embedded limits.
// Depending on reputation, the messages will be rejected:
//
// Reputation | Maximum msg count | Maximum msg size
// ------------+----------------------+------------------
// Negative | 0 | 0 // This is already handled by the anti-spam
// R-Negative | 10 | 10k
// Neutral | 100 | 20k
// R-Positive | 400 | 1M
// Positive | 1000 | 2M
// Ideally these values should be left as user-defined parameters, with the
// default values below used as backup.
static const uint32_t GXSTRANS_MAX_COUNT_REMOTELY_NEGATIVE_DEFAULT = 10 ;
static const uint32_t GXSTRANS_MAX_COUNT_NEUTRAL_DEFAULT = 40 ;
static const uint32_t GXSTRANS_MAX_COUNT_REMOTELY_POSITIVE_DEFAULT = 400 ;
static const uint32_t GXSTRANS_MAX_COUNT_LOCALLY_POSITIVE_DEFAULT = 1000 ;
static const uint32_t GXSTRANS_MAX_SIZE_REMOTELY_NEGATIVE_DEFAULT = 10 * 1024 ;
static const uint32_t GXSTRANS_MAX_SIZE_NEUTRAL_DEFAULT = 200 * 1024 ;
static const uint32_t GXSTRANS_MAX_SIZE_REMOTELY_POSITIVE_DEFAULT = 1024 * 1024 ;
static const uint32_t GXSTRANS_MAX_SIZE_LOCALLY_POSITIVE_DEFAULT = 2 * 1024 * 1024 ;
uint32_t max_count = 0 ;
uint32_t max_size = 0 ;
uint32_t identity_flags = 0 ;
RsReputations::ReputationLevel rep_lev = rsReputations->overallReputationLevel(msgMeta->mAuthorId,&identity_flags);
switch(rep_lev)
{
case RsReputations::REPUTATION_REMOTELY_NEGATIVE: max_count = GXSTRANS_MAX_COUNT_REMOTELY_NEGATIVE_DEFAULT;
max_size = GXSTRANS_MAX_SIZE_REMOTELY_NEGATIVE_DEFAULT;
break ;
case RsReputations::REPUTATION_NEUTRAL: max_count = GXSTRANS_MAX_COUNT_NEUTRAL_DEFAULT;
max_size = GXSTRANS_MAX_SIZE_NEUTRAL_DEFAULT;
break ;
case RsReputations::REPUTATION_REMOTELY_POSITIVE: max_count = GXSTRANS_MAX_COUNT_REMOTELY_POSITIVE_DEFAULT;
max_size = GXSTRANS_MAX_SIZE_REMOTELY_POSITIVE_DEFAULT;
break ;
case RsReputations::REPUTATION_LOCALLY_POSITIVE: max_count = GXSTRANS_MAX_COUNT_LOCALLY_POSITIVE_DEFAULT;
max_size = GXSTRANS_MAX_SIZE_LOCALLY_POSITIVE_DEFAULT;
break ;
default:
case RsReputations::REPUTATION_LOCALLY_NEGATIVE: max_count = 0 ;
max_size = 0 ;
break ;
}
bool pgp_linked = identity_flags & RS_IDENTITY_FLAGS_PGP_LINKED ;
if(rep_lev <= RsReputations::REPUTATION_NEUTRAL && !pgp_linked)
{
max_count /= 10 ;
max_size /= 10 ;
}
RS_STACK_MUTEX(mPerUserStatsMutex);
MsgSizeCount& s(per_user_statistics[msgMeta->mAuthorId]) ;
#ifdef DEBUG_GXSTRANS
std::cerr << "GxsTrans::acceptMessage(): size=" << msg_size << ", grp=" << msgMeta->mGroupId << ", gxs_id=" << msgMeta->mAuthorId << ", pgp_linked=" << pgp_linked << ", current (size,cnt)=("
<< s.size << "," << s.count << ") reputation=" << rep_lev << ", limits=(" << max_size << "," << max_count << ") " ;
#endif
if(s.size + msg_size > max_size || 1+s.count > max_count)
{
#ifdef DEBUG_GXSTRANS
std::cerr << "=> rejected." << std::endl;
#endif
return false ;
}
else
{
#ifdef DEBUG_GXSTRANS
std::cerr << "=> accepted." << std::endl;
#endif
s.count++ ;
s.size += msg_size ; // update the statistics, so that it's not possible to pass a bunch of msgs at once below the limits.
return true ;
}
}

View file

@ -57,6 +57,14 @@ struct GxsTransClient
GxsTransSendStatus status ) = 0; GxsTransSendStatus status ) = 0;
}; };
struct MsgSizeCount
{
MsgSizeCount() : size(0),count(0) {}
uint32_t size ;
uint32_t count ;
};
/** /**
* @brief p3GxsTrans is a mail delivery service based on GXS. * @brief p3GxsTrans is a mail delivery service based on GXS.
* p3GxsTrans is capable of asynchronous mail delivery and acknowledgement. * p3GxsTrans is capable of asynchronous mail delivery and acknowledgement.
@ -90,9 +98,10 @@ public:
mIdService(identities), mIdService(identities),
mServClientsMutex("p3GxsTrans client services map mutex"), mServClientsMutex("p3GxsTrans client services map mutex"),
mOutgoingMutex("p3GxsTrans outgoing queue map mutex"), mOutgoingMutex("p3GxsTrans outgoing queue map mutex"),
mIngoingMutex("p3GxsTrans ingoing queue map mutex") mIngoingMutex("p3GxsTrans ingoing queue map mutex"),
mPerUserStatsMutex("p3GxsTrans user stats mutex")
{ {
mLastMsgCleanup = time(NULL) - 60; // to be changed into 0 mLastMsgCleanup = time(NULL) - MAX_DELAY_BETWEEN_CLEANUPS + 30; // always check 30 secs after start
mCleanupThread = NULL ; mCleanupThread = NULL ;
} }
@ -159,7 +168,7 @@ private:
* Two weeks seems fair ATM. * Two weeks seems fair ATM.
*/ */
static const uint32_t GXS_STORAGE_PERIOD = 0x127500; static const uint32_t GXS_STORAGE_PERIOD = 0x127500;
static const uint32_t MAX_DELAY_BETWEEN_CLEANUPS = 1203; // every 20 mins. Could be less. static const uint32_t MAX_DELAY_BETWEEN_CLEANUPS ; // every 20 mins. Could be less.
time_t mLastMsgCleanup ; time_t mLastMsgCleanup ;
@ -290,15 +299,26 @@ private:
void getDeletedIds(std::list<RsGxsGroupId>& grpIds, std::map<RsGxsGroupId, std::vector<RsGxsMessageId> >& msgIds); void getDeletedIds(std::list<RsGxsGroupId>& grpIds, std::map<RsGxsGroupId, std::vector<RsGxsMessageId> >& msgIds);
void getMessagesToDelete(GxsMsgReq& req) ; void getMessagesToDelete(GxsMsgReq& req) ;
void getPerUserStatistics(std::map<RsGxsId,MsgSizeCount>& m) ;
private: private:
RsGeneralDataService* const mDs; RsGeneralDataService* const mDs;
RsMutex mMtx ; RsMutex mMtx ;
GxsMsgReq mMsgToDel ; GxsMsgReq mMsgToDel ;
std::map<RsGxsId,MsgSizeCount> total_message_size_and_count;
bool mDone ; bool mDone ;
}; };
// Overloaded from RsGenExchange.
bool acceptNewMessage(const RsGxsMsgMetaData *msgMeta, uint32_t size) ;
GxsTransIntegrityCleanupThread *mCleanupThread ; GxsTransIntegrityCleanupThread *mCleanupThread ;
// statistics of the load across all groups, per user.
RsMutex mPerUserStatsMutex;
std::map<RsGxsId,MsgSizeCount> per_user_statistics ;
}; };