added code to reject messages based on existing per-user statistics

This commit is contained in:
csoler 2017-06-15 19:32:31 +02:00
parent 09ff94c9ec
commit 26bb865d29
4 changed files with 60 additions and 20 deletions

View File

@ -2900,7 +2900,7 @@ void RsGenExchange::processRecvdMessages()
std::cerr << " deserialised info: grp id=" << meta->mGroupId << ", msg id=" << meta->mMsgId ; std::cerr << " deserialised info: grp id=" << meta->mGroupId << ", msg id=" << meta->mMsgId ;
#endif #endif
uint8_t validateReturn = VALIDATE_FAIL; uint8_t validateReturn = VALIDATE_FAIL;
bool accept_new_msg = acceptNewMessage(meta); bool accept_new_msg = acceptNewMessage(meta,msg->msg.bin_len);
if(!accept_new_msg && mNetService != NULL) if(!accept_new_msg && mNetService != NULL)
mNetService->rejectMessage(meta->mMsgId) ; // This prevents reloading the message again at next sync. mNetService->rejectMessage(meta->mMsgId) ; // This prevents reloading the message again at next sync.
@ -3047,7 +3047,7 @@ void RsGenExchange::processRecvdMessages()
} }
bool RsGenExchange::acceptNewGroup(const RsGxsGrpMetaData* /*grpMeta*/ ) { return true; } bool RsGenExchange::acceptNewGroup(const RsGxsGrpMetaData* /*grpMeta*/ ) { return true; }
bool RsGenExchange::acceptNewMessage(const RsGxsMsgMetaData* /*grpMeta*/ ) { return true; } bool RsGenExchange::acceptNewMessage(const RsGxsMsgMetaData* /*grpMeta*/,uint32_t size ) { return true; }
void RsGenExchange::processRecvdGroups() void RsGenExchange::processRecvdGroups()
{ {

View File

@ -269,7 +269,7 @@ public:
* \param grpMeta Group metadata to check * \param grpMeta Group metadata to check
* \return * \return
*/ */
virtual bool acceptNewMessage(const RsGxsMsgMetaData *msgMeta) ; virtual bool acceptNewMessage(const RsGxsMsgMetaData *msgMeta, uint32_t size) ;
bool subscribeToGroup(uint32_t& token, const RsGxsGroupId& grpId, bool subscribe); bool subscribeToGroup(uint32_t& token, const RsGxsGroupId& grpId, bool subscribe);

View File

@ -421,7 +421,6 @@ void p3GxsTrans::service_tick()
{ {
GxsMsgReq msgToDel ; GxsMsgReq msgToDel ;
mCleanupThread->getPerUserStatistics(per_user_statistics) ;
mCleanupThread->getMessagesToDelete(msgToDel) ; mCleanupThread->getMessagesToDelete(msgToDel) ;
if(!msgToDel.empty()) if(!msgToDel.empty())
@ -429,6 +428,9 @@ void p3GxsTrans::service_tick()
std::cerr << "p3GxsTrans::service_tick(): deleting messages." << std::endl; std::cerr << "p3GxsTrans::service_tick(): deleting messages." << std::endl;
getDataStore()->removeMsgs(msgToDel); getDataStore()->removeMsgs(msgToDel);
} }
RS_STACK_MUTEX(mPerUserStatsMutex);
mCleanupThread->getPerUserStatistics(per_user_statistics) ;
} }
{ {
@ -1063,9 +1065,8 @@ bool p3GxsTrans::loadList(std::list<RsItem *>&loadList)
// We should also include the message size! // We should also include the message size!
bool p3GxsTrans::acceptNewMessage(const RsGxsMsgMetaData *msgMeta) bool p3GxsTrans::acceptNewMessage(const RsGxsMsgMetaData *msgMeta,uint32_t msg_size)
{ {
#ifdef TODO
// 1 - check the total size of the msgs for the author of this particular msg. // 1 - check the total size of the msgs for the author of this particular msg.
// 2 - Reject depending on embedded limits. // 2 - Reject depending on embedded limits.
@ -1077,26 +1078,63 @@ bool p3GxsTrans::acceptNewMessage(const RsGxsMsgMetaData *msgMeta)
// Negative | 0 | 0 // This is already handled by the anti-spam // Negative | 0 | 0 // This is already handled by the anti-spam
// R-Negative | 10 | 10k // R-Negative | 10 | 10k
// Neutral | 100 | 500k // Neutral | 100 | 500k
// R-Positive | 500 | 2M // R-Positive | 400 | 1M
// Positive | 1000 | 2M
static const uint32_t GXSTRANS_MAX_COUNT_REMOTELY_NEGATIVE_DEFAULT = 10 ; // Ideally these values should be left as user-defined parameters, with the
static const uint32_t GXSTRANS_MAX_COUNT_NEUTRAL_DEFAULT = 100 ; // default values below used as backup.
static const uint32_t GXSTRANS_MAX_COUNT_NEUTRAL_DEFAULT = 100 ;
static const uint32_t GXSTRANS_MAX_COUNT_REMOTELY_NEGATIVE_DEFAULT = 10 ;
static const uint32_t GXSTRANS_MAX_COUNT_NEUTRAL_DEFAULT = 100 ;
static const uint32_t GXSTRANS_MAX_COUNT_REMOTELY_POSITIVE_DEFAULT = 400 ;
static const uint32_t GXSTRANS_MAX_COUNT_LOCALLY_POSITIVE_DEFAULT = 1000 ;
static const uint32_t GXSTRANS_MAX_SIZE_REMOTELY_NEGATIVE_DEFAULT = 10 * 1024 ;
static const uint32_t GXSTRANS_MAX_SIZE_NEUTRAL_DEFAULT = 512 * 1024 ;
static const uint32_t GXSTRANS_MAX_SIZE_REMOTELY_POSITIVE_DEFAULT = 1024 * 1024 ;
static const uint32_t GXSTRANS_MAX_SIZE_LOCALLY_POSITIVE_DEFAULT = 2 * 1024 * 1024 ;
uint32_t max_count = 0 ; uint32_t max_count = 0 ;
uint32_t max_size = 0 ; uint32_t max_size = 0 ;
RsReputations::ReputationLevel rep_lev = rsReputations->overallReputationLevel(msgMeta->mAuthorId);
switch(rsReputations->overallReputationLevel(msgMeta.mAuthorId)) switch(rep_lev)
{ {
case RsReputations::REPUTATION_REMOTELY_NEGATIVE: max_count = 10 ; case RsReputations::REPUTATION_REMOTELY_NEGATIVE: max_count = GXSTRANS_MAX_COUNT_REMOTELY_NEGATIVE_DEFAULT;
max_size = 10*1024 ; max_size = GXSTRANS_MAX_SIZE_REMOTELY_NEGATIVE_DEFAULT;
default: break ;
case RsReputations::REPUTATION_LOCALLY_NEGATIVE: max_count = case RsReputations::REPUTATION_NEUTRAL: max_count = GXSTRANS_MAX_COUNT_NEUTRAL_DEFAULT;
break ; max_size = GXSTRANS_MAX_SIZE_NEUTRAL_DEFAULT;
break ;
case RsReputations::REPUTATION_REMOTELY_POSITIVE: max_count = GXSTRANS_MAX_COUNT_REMOTELY_POSITIVE_DEFAULT;
max_size = GXSTRANS_MAX_SIZE_REMOTELY_POSITIVE_DEFAULT;
break ;
case RsReputations::REPUTATION_LOCALLY_POSITIVE: max_count = GXSTRANS_MAX_COUNT_LOCALLY_POSITIVE_DEFAULT;
max_size = GXSTRANS_MAX_SIZE_LOCALLY_POSITIVE_DEFAULT;
break ;
default:
case RsReputations::REPUTATION_LOCALLY_NEGATIVE: max_count = 0 ;
max_size = 0 ;
break ;
} }
#endif RS_STACK_MUTEX(mPerUserStatsMutex);
return true ;
MsgSizeCount& s(per_user_statistics[msgMeta->mAuthorId]) ;
std::cerr << "GxsTrans::acceptMessage(): size=" << msg_size << ", grp=" << msgMeta->mGroupId << ", gxs_id=" << msgMeta->mAuthorId << ", current (size,cnt)=("
<< s.size << "," << s.count << ") reputation=" << rep_lev << ", limits=(" << max_size << "," << max_count << ") " ;
if(s.size + msg_size > max_size || 1+s.count > max_count)
{
std::cerr << "=> rejected." << std::endl;
return false ;
}
else
{
std::cerr << "=> accepted." << std::endl;
return true ;
}
} }

View File

@ -98,7 +98,8 @@ public:
mIdService(identities), mIdService(identities),
mServClientsMutex("p3GxsTrans client services map mutex"), mServClientsMutex("p3GxsTrans client services map mutex"),
mOutgoingMutex("p3GxsTrans outgoing queue map mutex"), mOutgoingMutex("p3GxsTrans outgoing queue map mutex"),
mIngoingMutex("p3GxsTrans ingoing queue map mutex") mIngoingMutex("p3GxsTrans ingoing queue map mutex"),
mPerUserStatsMutex("p3GxsTrans user stats mutex")
{ {
mLastMsgCleanup = time(NULL) - 60; // to be changed into 0 mLastMsgCleanup = time(NULL) - 60; // to be changed into 0
mCleanupThread = NULL ; mCleanupThread = NULL ;
@ -310,12 +311,13 @@ private:
// Overloaded from RsGenExchange. // Overloaded from RsGenExchange.
bool acceptNewMessage(const RsGxsMsgMetaData *msgMeta) ; bool acceptNewMessage(const RsGxsMsgMetaData *msgMeta, uint32_t size) ;
GxsTransIntegrityCleanupThread *mCleanupThread ; GxsTransIntegrityCleanupThread *mCleanupThread ;
// statistics of the load across all groups, per user. // statistics of the load across all groups, per user.
RsMutex mPerUserStatsMutex;
std::map<RsGxsId,MsgSizeCount> per_user_statistics ; std::map<RsGxsId,MsgSizeCount> per_user_statistics ;
}; };