From 26bb865d2969367f42ed99168ab805b84b40f3e3 Mon Sep 17 00:00:00 2001 From: csoler Date: Thu, 15 Jun 2017 19:32:31 +0200 Subject: [PATCH] added code to reject messages based on existing per-user statistics --- libretroshare/src/gxs/rsgenexchange.cc | 4 +- libretroshare/src/gxs/rsgenexchange.h | 2 +- libretroshare/src/gxstrans/p3gxstrans.cc | 68 ++++++++++++++++++------ libretroshare/src/gxstrans/p3gxstrans.h | 6 ++- 4 files changed, 60 insertions(+), 20 deletions(-) diff --git a/libretroshare/src/gxs/rsgenexchange.cc b/libretroshare/src/gxs/rsgenexchange.cc index 22b6dd069..55eea22a6 100644 --- a/libretroshare/src/gxs/rsgenexchange.cc +++ b/libretroshare/src/gxs/rsgenexchange.cc @@ -2900,7 +2900,7 @@ void RsGenExchange::processRecvdMessages() std::cerr << " deserialised info: grp id=" << meta->mGroupId << ", msg id=" << meta->mMsgId ; #endif uint8_t validateReturn = VALIDATE_FAIL; - bool accept_new_msg = acceptNewMessage(meta); + bool accept_new_msg = acceptNewMessage(meta,msg->msg.bin_len); if(!accept_new_msg && mNetService != NULL) mNetService->rejectMessage(meta->mMsgId) ; // This prevents reloading the message again at next sync. @@ -3047,7 +3047,7 @@ void RsGenExchange::processRecvdMessages() } bool RsGenExchange::acceptNewGroup(const RsGxsGrpMetaData* /*grpMeta*/ ) { return true; } -bool RsGenExchange::acceptNewMessage(const RsGxsMsgMetaData* /*grpMeta*/ ) { return true; } +bool RsGenExchange::acceptNewMessage(const RsGxsMsgMetaData* /*grpMeta*/,uint32_t size ) { return true; } void RsGenExchange::processRecvdGroups() { diff --git a/libretroshare/src/gxs/rsgenexchange.h b/libretroshare/src/gxs/rsgenexchange.h index 2bc80fd65..ee7449f2f 100644 --- a/libretroshare/src/gxs/rsgenexchange.h +++ b/libretroshare/src/gxs/rsgenexchange.h @@ -269,7 +269,7 @@ public: * \param grpMeta Group metadata to check * \return */ - virtual bool acceptNewMessage(const RsGxsMsgMetaData *msgMeta) ; + virtual bool acceptNewMessage(const RsGxsMsgMetaData *msgMeta, uint32_t size) ; bool subscribeToGroup(uint32_t& token, const RsGxsGroupId& grpId, bool subscribe); diff --git a/libretroshare/src/gxstrans/p3gxstrans.cc b/libretroshare/src/gxstrans/p3gxstrans.cc index bd335c9a6..a1d354fd2 100644 --- a/libretroshare/src/gxstrans/p3gxstrans.cc +++ b/libretroshare/src/gxstrans/p3gxstrans.cc @@ -421,7 +421,6 @@ void p3GxsTrans::service_tick() { GxsMsgReq msgToDel ; - mCleanupThread->getPerUserStatistics(per_user_statistics) ; mCleanupThread->getMessagesToDelete(msgToDel) ; if(!msgToDel.empty()) @@ -429,6 +428,9 @@ void p3GxsTrans::service_tick() std::cerr << "p3GxsTrans::service_tick(): deleting messages." << std::endl; getDataStore()->removeMsgs(msgToDel); } + + RS_STACK_MUTEX(mPerUserStatsMutex); + mCleanupThread->getPerUserStatistics(per_user_statistics) ; } { @@ -1063,9 +1065,8 @@ bool p3GxsTrans::loadList(std::list&loadList) // We should also include the message size! -bool p3GxsTrans::acceptNewMessage(const RsGxsMsgMetaData *msgMeta) +bool p3GxsTrans::acceptNewMessage(const RsGxsMsgMetaData *msgMeta,uint32_t msg_size) { -#ifdef TODO // 1 - check the total size of the msgs for the author of this particular msg. // 2 - Reject depending on embedded limits. @@ -1077,26 +1078,63 @@ bool p3GxsTrans::acceptNewMessage(const RsGxsMsgMetaData *msgMeta) // Negative | 0 | 0 // This is already handled by the anti-spam // R-Negative | 10 | 10k // Neutral | 100 | 500k - // R-Positive | 500 | 2M + // R-Positive | 400 | 1M + // Positive | 1000 | 2M - static const uint32_t GXSTRANS_MAX_COUNT_REMOTELY_NEGATIVE_DEFAULT = 10 ; - static const uint32_t GXSTRANS_MAX_COUNT_NEUTRAL_DEFAULT = 100 ; - static const uint32_t GXSTRANS_MAX_COUNT_NEUTRAL_DEFAULT = 100 ; + // Ideally these values should be left as user-defined parameters, with the + // default values below used as backup. + + static const uint32_t GXSTRANS_MAX_COUNT_REMOTELY_NEGATIVE_DEFAULT = 10 ; + static const uint32_t GXSTRANS_MAX_COUNT_NEUTRAL_DEFAULT = 100 ; + static const uint32_t GXSTRANS_MAX_COUNT_REMOTELY_POSITIVE_DEFAULT = 400 ; + static const uint32_t GXSTRANS_MAX_COUNT_LOCALLY_POSITIVE_DEFAULT = 1000 ; + + static const uint32_t GXSTRANS_MAX_SIZE_REMOTELY_NEGATIVE_DEFAULT = 10 * 1024 ; + static const uint32_t GXSTRANS_MAX_SIZE_NEUTRAL_DEFAULT = 512 * 1024 ; + static const uint32_t GXSTRANS_MAX_SIZE_REMOTELY_POSITIVE_DEFAULT = 1024 * 1024 ; + static const uint32_t GXSTRANS_MAX_SIZE_LOCALLY_POSITIVE_DEFAULT = 2 * 1024 * 1024 ; uint32_t max_count = 0 ; uint32_t max_size = 0 ; + RsReputations::ReputationLevel rep_lev = rsReputations->overallReputationLevel(msgMeta->mAuthorId); - switch(rsReputations->overallReputationLevel(msgMeta.mAuthorId)) + switch(rep_lev) { - case RsReputations::REPUTATION_REMOTELY_NEGATIVE: max_count = 10 ; - max_size = 10*1024 ; - default: - case RsReputations::REPUTATION_LOCALLY_NEGATIVE: max_count = - break ; + case RsReputations::REPUTATION_REMOTELY_NEGATIVE: max_count = GXSTRANS_MAX_COUNT_REMOTELY_NEGATIVE_DEFAULT; + max_size = GXSTRANS_MAX_SIZE_REMOTELY_NEGATIVE_DEFAULT; + break ; + case RsReputations::REPUTATION_NEUTRAL: max_count = GXSTRANS_MAX_COUNT_NEUTRAL_DEFAULT; + max_size = GXSTRANS_MAX_SIZE_NEUTRAL_DEFAULT; + break ; + case RsReputations::REPUTATION_REMOTELY_POSITIVE: max_count = GXSTRANS_MAX_COUNT_REMOTELY_POSITIVE_DEFAULT; + max_size = GXSTRANS_MAX_SIZE_REMOTELY_POSITIVE_DEFAULT; + break ; + case RsReputations::REPUTATION_LOCALLY_POSITIVE: max_count = GXSTRANS_MAX_COUNT_LOCALLY_POSITIVE_DEFAULT; + max_size = GXSTRANS_MAX_SIZE_LOCALLY_POSITIVE_DEFAULT; + break ; + default: + case RsReputations::REPUTATION_LOCALLY_NEGATIVE: max_count = 0 ; + max_size = 0 ; + break ; } -#endif - return true ; + RS_STACK_MUTEX(mPerUserStatsMutex); + + MsgSizeCount& s(per_user_statistics[msgMeta->mAuthorId]) ; + + std::cerr << "GxsTrans::acceptMessage(): size=" << msg_size << ", grp=" << msgMeta->mGroupId << ", gxs_id=" << msgMeta->mAuthorId << ", current (size,cnt)=(" + << s.size << "," << s.count << ") reputation=" << rep_lev << ", limits=(" << max_size << "," << max_count << ") " ; + + if(s.size + msg_size > max_size || 1+s.count > max_count) + { + std::cerr << "=> rejected." << std::endl; + return false ; + } + else + { + std::cerr << "=> accepted." << std::endl; + return true ; + } } diff --git a/libretroshare/src/gxstrans/p3gxstrans.h b/libretroshare/src/gxstrans/p3gxstrans.h index 8d5792343..28060ad0f 100644 --- a/libretroshare/src/gxstrans/p3gxstrans.h +++ b/libretroshare/src/gxstrans/p3gxstrans.h @@ -98,7 +98,8 @@ public: mIdService(identities), mServClientsMutex("p3GxsTrans client services map mutex"), mOutgoingMutex("p3GxsTrans outgoing queue map mutex"), - mIngoingMutex("p3GxsTrans ingoing queue map mutex") + mIngoingMutex("p3GxsTrans ingoing queue map mutex"), + mPerUserStatsMutex("p3GxsTrans user stats mutex") { mLastMsgCleanup = time(NULL) - 60; // to be changed into 0 mCleanupThread = NULL ; @@ -310,12 +311,13 @@ private: // Overloaded from RsGenExchange. - bool acceptNewMessage(const RsGxsMsgMetaData *msgMeta) ; + bool acceptNewMessage(const RsGxsMsgMetaData *msgMeta, uint32_t size) ; GxsTransIntegrityCleanupThread *mCleanupThread ; // statistics of the load across all groups, per user. + RsMutex mPerUserStatsMutex; std::map per_user_statistics ; };