added new virtual method to refuse incoming msgs (similar to groups) in RsGenExchange, and used it in p3GxsTrans to refuse msgs from peers who send too much

This commit is contained in:
csoler 2017-06-14 23:19:52 +02:00
parent f62028b2c1
commit 09ff94c9ec
4 changed files with 115 additions and 4 deletions

View File

@ -2900,8 +2900,12 @@ void RsGenExchange::processRecvdMessages()
std::cerr << " deserialised info: grp id=" << meta->mGroupId << ", msg id=" << meta->mMsgId ;
#endif
uint8_t validateReturn = VALIDATE_FAIL;
bool accept_new_msg = acceptNewMessage(meta);
if(ok)
if(!accept_new_msg && mNetService != NULL)
mNetService->rejectMessage(meta->mMsgId) ; // This prevents reloading the message again at next sync.
if(ok && accept_new_msg)
{
std::map<RsGxsGroupId, RsGxsGrpMetaData*>::iterator mit = grpMetas.find(msg->grpId);
@ -3042,8 +3046,8 @@ void RsGenExchange::processRecvdMessages()
mNetService->rejectMessage(*it) ;
}
bool RsGenExchange::acceptNewGroup(const RsGxsGrpMetaData* /*grpMeta*/ )
{ return true; }
bool RsGenExchange::acceptNewGroup(const RsGxsGrpMetaData* /*grpMeta*/ ) { return true; }
bool RsGenExchange::acceptNewMessage(const RsGxsMsgMetaData* /*grpMeta*/ ) { return true; }
void RsGenExchange::processRecvdGroups()
{

View File

@ -260,6 +260,17 @@ public:
*/
virtual bool acceptNewGroup(const RsGxsGrpMetaData *grpMeta) ;
/*!
* \brief acceptNewMessage
* Early checks if the message can be accepted. This is mainly used to check wether the group is for instance overloaded and the service wants
* to put limitations to it.
* Returns true unless derived in GXS services.
*
* \param grpMeta Group metadata to check
* \return
*/
virtual bool acceptNewMessage(const RsGxsMsgMetaData *msgMeta) ;
bool subscribeToGroup(uint32_t& token, const RsGxsGroupId& grpId, bool subscribe);
/*!

View File

@ -24,6 +24,8 @@ typedef unsigned int uint;
RsGxsTrans *rsGxsTrans = NULL ;
const uint32_t p3GxsTrans::MAX_DELAY_BETWEEN_CLEANUPS = 60; // every 20 mins. Could be less.
p3GxsTrans::~p3GxsTrans()
{
p3Config::saveConfiguration();
@ -261,6 +263,12 @@ void p3GxsTrans::handleResponse(uint32_t token, uint32_t req_type)
if(changed)
IndicateConfigChanged();
}
void p3GxsTrans::GxsTransIntegrityCleanupThread::getPerUserStatistics(std::map<RsGxsId,MsgSizeCount>& m)
{
RS_STACK_MUTEX(mMtx) ;
m = total_message_size_and_count ;
}
void p3GxsTrans::GxsTransIntegrityCleanupThread::getMessagesToDelete(GxsMsgReq& m)
{
@ -270,9 +278,27 @@ void p3GxsTrans::GxsTransIntegrityCleanupThread::getMessagesToDelete(GxsMsgReq&
mMsgToDel.clear();
}
// This method does two things:
// 1 - cleaning up old messages and messages for which an ACK has been received.
// 2 - building per user statistics across groups. This is important because it allows to mitigate the excess of
// messages, which might be due to spam.
//
// Note: the anti-spam system is disabled the level of GXS, because we want to allow to send anonymous messages
// between identities that might not have a reputation yet. Still, messages from identities with a bad reputation
// are still deleted by GXS.
//
// The group limits are enforced according to the following rules:
// * a temporal sliding window is computed for each identity and the number of messages signed by this identity is counted
// *
//
//
// Deleted messages are notified to the RsGxsNetService part which keeps a list of delete messages so as not to request them again
// during the same session. This allows to safely delete messages while avoiding re-synchronisation from friend nodes.
void p3GxsTrans::GxsTransIntegrityCleanupThread::run()
{
// first take out all the groups
std::map<RsGxsGroupId, RsNxsGrp*> grp;
mDs->retrieveNxsGrps(grp, true, true);
@ -294,6 +320,8 @@ void p3GxsTrans::GxsTransIntegrityCleanupThread::run()
// now messages
std::map<RsGxsId,MsgSizeCount> totalMessageSizeAndCount;
std::map<RsGxsTransId,std::pair<RsGxsGroupId,RsGxsMessageId> > stored_msgs ;
std::list<RsGxsTransId> received_msgs ;
@ -332,14 +360,22 @@ void p3GxsTrans::GxsTransIntegrityCleanupThread::run()
else
std::cerr << " Unknown item type!" << std::endl;
totalMessageSizeAndCount[msg->metaData->mAuthorId].size += msg->msg.bin_len ;
totalMessageSizeAndCount[msg->metaData->mAuthorId].count++;
delete msg;
}
}
// From the collected information, build a list of group messages to delete.
GxsMsgReq msgsToDel ;
std::cerr << "Msg removal report:" << std::endl;
std::cerr << " Per user size and count: " << std::endl;
for(std::map<RsGxsId,MsgSizeCount>::const_iterator it(totalMessageSizeAndCount.begin());it!=totalMessageSizeAndCount.end();++it)
std::cerr << " " << it->first << ": " << it->second.count << " messages, for a total size of " << it->second.size << " bytes." << std::endl;
for(std::list<RsGxsTransId>::const_iterator it(received_msgs.begin());it!=received_msgs.end();++it)
{
std::map<RsGxsTransId,std::pair<RsGxsGroupId,RsGxsMessageId> >::const_iterator it2 = stored_msgs.find(*it) ;
@ -385,6 +421,7 @@ void p3GxsTrans::service_tick()
{
GxsMsgReq msgToDel ;
mCleanupThread->getPerUserStatistics(per_user_statistics) ;
mCleanupThread->getMessagesToDelete(msgToDel) ;
if(!msgToDel.empty())
@ -1023,3 +1060,44 @@ bool p3GxsTrans::loadList(std::list<RsItem *>&loadList)
return true;
}
// We should also include the message size!
bool p3GxsTrans::acceptNewMessage(const RsGxsMsgMetaData *msgMeta)
{
#ifdef TODO
// 1 - check the total size of the msgs for the author of this particular msg.
// 2 - Reject depending on embedded limits.
// Depending on reputation, the messages will be rejected:
//
// Reputation | Maximum msg count | Maximum msg size
// ------------+----------------------+------------------
// Negative | 0 | 0 // This is already handled by the anti-spam
// R-Negative | 10 | 10k
// Neutral | 100 | 500k
// R-Positive | 500 | 2M
static const uint32_t GXSTRANS_MAX_COUNT_REMOTELY_NEGATIVE_DEFAULT = 10 ;
static const uint32_t GXSTRANS_MAX_COUNT_NEUTRAL_DEFAULT = 100 ;
static const uint32_t GXSTRANS_MAX_COUNT_NEUTRAL_DEFAULT = 100 ;
uint32_t max_count = 0 ;
uint32_t max_size = 0 ;
switch(rsReputations->overallReputationLevel(msgMeta.mAuthorId))
{
case RsReputations::REPUTATION_REMOTELY_NEGATIVE: max_count = 10 ;
max_size = 10*1024 ;
default:
case RsReputations::REPUTATION_LOCALLY_NEGATIVE: max_count =
break ;
}
#endif
return true ;
}

View File

@ -57,6 +57,14 @@ struct GxsTransClient
GxsTransSendStatus status ) = 0;
};
struct MsgSizeCount
{
MsgSizeCount() : size(0),count(0) {}
uint32_t size ;
uint32_t count ;
};
/**
* @brief p3GxsTrans is a mail delivery service based on GXS.
* p3GxsTrans is capable of asynchronous mail delivery and acknowledgement.
@ -159,7 +167,7 @@ private:
* Two weeks seems fair ATM.
*/
static const uint32_t GXS_STORAGE_PERIOD = 0x127500;
static const uint32_t MAX_DELAY_BETWEEN_CLEANUPS = 1203; // every 20 mins. Could be less.
static const uint32_t MAX_DELAY_BETWEEN_CLEANUPS ; // every 20 mins. Could be less.
time_t mLastMsgCleanup ;
@ -290,14 +298,24 @@ private:
void getDeletedIds(std::list<RsGxsGroupId>& grpIds, std::map<RsGxsGroupId, std::vector<RsGxsMessageId> >& msgIds);
void getMessagesToDelete(GxsMsgReq& req) ;
void getPerUserStatistics(std::map<RsGxsId,MsgSizeCount>& m) ;
private:
RsGeneralDataService* const mDs;
RsMutex mMtx ;
GxsMsgReq mMsgToDel ;
std::map<RsGxsId,MsgSizeCount> total_message_size_and_count;
};
// Overloaded from RsGenExchange.
bool acceptNewMessage(const RsGxsMsgMetaData *msgMeta) ;
GxsTransIntegrityCleanupThread *mCleanupThread ;
// statistics of the load across all groups, per user.
std::map<RsGxsId,MsgSizeCount> per_user_statistics ;
};