From 09ff94c9ec39db7c8ebb444e2a1b46ab4eccf029 Mon Sep 17 00:00:00 2001 From: csoler Date: Wed, 14 Jun 2017 23:19:52 +0200 Subject: [PATCH 1/5] added new virtual method to refuse incoming msgs (similar to groups) in RsGenExchange, and used it in p3GxsTrans to refuse msgs from peers who send too much --- libretroshare/src/gxs/rsgenexchange.cc | 10 ++- libretroshare/src/gxs/rsgenexchange.h | 11 ++++ libretroshare/src/gxstrans/p3gxstrans.cc | 78 ++++++++++++++++++++++++ libretroshare/src/gxstrans/p3gxstrans.h | 20 +++++- 4 files changed, 115 insertions(+), 4 deletions(-) diff --git a/libretroshare/src/gxs/rsgenexchange.cc b/libretroshare/src/gxs/rsgenexchange.cc index 5845a2887..22b6dd069 100644 --- a/libretroshare/src/gxs/rsgenexchange.cc +++ b/libretroshare/src/gxs/rsgenexchange.cc @@ -2900,8 +2900,12 @@ void RsGenExchange::processRecvdMessages() std::cerr << " deserialised info: grp id=" << meta->mGroupId << ", msg id=" << meta->mMsgId ; #endif uint8_t validateReturn = VALIDATE_FAIL; + bool accept_new_msg = acceptNewMessage(meta); - if(ok) + if(!accept_new_msg && mNetService != NULL) + mNetService->rejectMessage(meta->mMsgId) ; // This prevents reloading the message again at next sync. + + if(ok && accept_new_msg) { std::map::iterator mit = grpMetas.find(msg->grpId); @@ -3042,8 +3046,8 @@ void RsGenExchange::processRecvdMessages() mNetService->rejectMessage(*it) ; } -bool RsGenExchange::acceptNewGroup(const RsGxsGrpMetaData* /*grpMeta*/ ) -{ return true; } +bool RsGenExchange::acceptNewGroup(const RsGxsGrpMetaData* /*grpMeta*/ ) { return true; } +bool RsGenExchange::acceptNewMessage(const RsGxsMsgMetaData* /*grpMeta*/ ) { return true; } void RsGenExchange::processRecvdGroups() { diff --git a/libretroshare/src/gxs/rsgenexchange.h b/libretroshare/src/gxs/rsgenexchange.h index 932177427..2bc80fd65 100644 --- a/libretroshare/src/gxs/rsgenexchange.h +++ b/libretroshare/src/gxs/rsgenexchange.h @@ -260,6 +260,17 @@ public: */ virtual bool acceptNewGroup(const RsGxsGrpMetaData *grpMeta) ; + /*! + * \brief acceptNewMessage + * Early checks if the message can be accepted. This is mainly used to check wether the group is for instance overloaded and the service wants + * to put limitations to it. + * Returns true unless derived in GXS services. + * + * \param grpMeta Group metadata to check + * \return + */ + virtual bool acceptNewMessage(const RsGxsMsgMetaData *msgMeta) ; + bool subscribeToGroup(uint32_t& token, const RsGxsGroupId& grpId, bool subscribe); /*! diff --git a/libretroshare/src/gxstrans/p3gxstrans.cc b/libretroshare/src/gxstrans/p3gxstrans.cc index 41a100e4c..bd335c9a6 100644 --- a/libretroshare/src/gxstrans/p3gxstrans.cc +++ b/libretroshare/src/gxstrans/p3gxstrans.cc @@ -24,6 +24,8 @@ typedef unsigned int uint; RsGxsTrans *rsGxsTrans = NULL ; +const uint32_t p3GxsTrans::MAX_DELAY_BETWEEN_CLEANUPS = 60; // every 20 mins. Could be less. + p3GxsTrans::~p3GxsTrans() { p3Config::saveConfiguration(); @@ -261,6 +263,12 @@ void p3GxsTrans::handleResponse(uint32_t token, uint32_t req_type) if(changed) IndicateConfigChanged(); } +void p3GxsTrans::GxsTransIntegrityCleanupThread::getPerUserStatistics(std::map& m) +{ + RS_STACK_MUTEX(mMtx) ; + + m = total_message_size_and_count ; +} void p3GxsTrans::GxsTransIntegrityCleanupThread::getMessagesToDelete(GxsMsgReq& m) { @@ -270,9 +278,27 @@ void p3GxsTrans::GxsTransIntegrityCleanupThread::getMessagesToDelete(GxsMsgReq& mMsgToDel.clear(); } +// This method does two things: +// 1 - cleaning up old messages and messages for which an ACK has been received. +// 2 - building per user statistics across groups. This is important because it allows to mitigate the excess of +// messages, which might be due to spam. +// +// Note: the anti-spam system is disabled the level of GXS, because we want to allow to send anonymous messages +// between identities that might not have a reputation yet. Still, messages from identities with a bad reputation +// are still deleted by GXS. +// +// The group limits are enforced according to the following rules: +// * a temporal sliding window is computed for each identity and the number of messages signed by this identity is counted +// * +// +// +// Deleted messages are notified to the RsGxsNetService part which keeps a list of delete messages so as not to request them again +// during the same session. This allows to safely delete messages while avoiding re-synchronisation from friend nodes. + void p3GxsTrans::GxsTransIntegrityCleanupThread::run() { // first take out all the groups + std::map grp; mDs->retrieveNxsGrps(grp, true, true); @@ -294,6 +320,8 @@ void p3GxsTrans::GxsTransIntegrityCleanupThread::run() // now messages + std::map totalMessageSizeAndCount; + std::map > stored_msgs ; std::list received_msgs ; @@ -332,14 +360,22 @@ void p3GxsTrans::GxsTransIntegrityCleanupThread::run() else std::cerr << " Unknown item type!" << std::endl; + totalMessageSizeAndCount[msg->metaData->mAuthorId].size += msg->msg.bin_len ; + totalMessageSizeAndCount[msg->metaData->mAuthorId].count++; delete msg; } } + // From the collected information, build a list of group messages to delete. + GxsMsgReq msgsToDel ; std::cerr << "Msg removal report:" << std::endl; + std::cerr << " Per user size and count: " << std::endl; + for(std::map::const_iterator it(totalMessageSizeAndCount.begin());it!=totalMessageSizeAndCount.end();++it) + std::cerr << " " << it->first << ": " << it->second.count << " messages, for a total size of " << it->second.size << " bytes." << std::endl; + for(std::list::const_iterator it(received_msgs.begin());it!=received_msgs.end();++it) { std::map >::const_iterator it2 = stored_msgs.find(*it) ; @@ -385,6 +421,7 @@ void p3GxsTrans::service_tick() { GxsMsgReq msgToDel ; + mCleanupThread->getPerUserStatistics(per_user_statistics) ; mCleanupThread->getMessagesToDelete(msgToDel) ; if(!msgToDel.empty()) @@ -1023,3 +1060,44 @@ bool p3GxsTrans::loadList(std::list&loadList) return true; } + +// We should also include the message size! + +bool p3GxsTrans::acceptNewMessage(const RsGxsMsgMetaData *msgMeta) +{ +#ifdef TODO + // 1 - check the total size of the msgs for the author of this particular msg. + + // 2 - Reject depending on embedded limits. + + // Depending on reputation, the messages will be rejected: + // + // Reputation | Maximum msg count | Maximum msg size + // ------------+----------------------+------------------ + // Negative | 0 | 0 // This is already handled by the anti-spam + // R-Negative | 10 | 10k + // Neutral | 100 | 500k + // R-Positive | 500 | 2M + + static const uint32_t GXSTRANS_MAX_COUNT_REMOTELY_NEGATIVE_DEFAULT = 10 ; + static const uint32_t GXSTRANS_MAX_COUNT_NEUTRAL_DEFAULT = 100 ; + static const uint32_t GXSTRANS_MAX_COUNT_NEUTRAL_DEFAULT = 100 ; + + uint32_t max_count = 0 ; + uint32_t max_size = 0 ; + + switch(rsReputations->overallReputationLevel(msgMeta.mAuthorId)) + { + case RsReputations::REPUTATION_REMOTELY_NEGATIVE: max_count = 10 ; + max_size = 10*1024 ; + default: + case RsReputations::REPUTATION_LOCALLY_NEGATIVE: max_count = + break ; + } + +#endif + return true ; +} + + + diff --git a/libretroshare/src/gxstrans/p3gxstrans.h b/libretroshare/src/gxstrans/p3gxstrans.h index fb205409d..8d5792343 100644 --- a/libretroshare/src/gxstrans/p3gxstrans.h +++ b/libretroshare/src/gxstrans/p3gxstrans.h @@ -57,6 +57,14 @@ struct GxsTransClient GxsTransSendStatus status ) = 0; }; +struct MsgSizeCount +{ + MsgSizeCount() : size(0),count(0) {} + + uint32_t size ; + uint32_t count ; +}; + /** * @brief p3GxsTrans is a mail delivery service based on GXS. * p3GxsTrans is capable of asynchronous mail delivery and acknowledgement. @@ -159,7 +167,7 @@ private: * Two weeks seems fair ATM. */ static const uint32_t GXS_STORAGE_PERIOD = 0x127500; - static const uint32_t MAX_DELAY_BETWEEN_CLEANUPS = 1203; // every 20 mins. Could be less. + static const uint32_t MAX_DELAY_BETWEEN_CLEANUPS ; // every 20 mins. Could be less. time_t mLastMsgCleanup ; @@ -290,14 +298,24 @@ private: void getDeletedIds(std::list& grpIds, std::map >& msgIds); void getMessagesToDelete(GxsMsgReq& req) ; + void getPerUserStatistics(std::map& m) ; private: RsGeneralDataService* const mDs; RsMutex mMtx ; GxsMsgReq mMsgToDel ; + std::map total_message_size_and_count; }; + // Overloaded from RsGenExchange. + + bool acceptNewMessage(const RsGxsMsgMetaData *msgMeta) ; + GxsTransIntegrityCleanupThread *mCleanupThread ; + + // statistics of the load across all groups, per user. + + std::map per_user_statistics ; }; From 26bb865d2969367f42ed99168ab805b84b40f3e3 Mon Sep 17 00:00:00 2001 From: csoler Date: Thu, 15 Jun 2017 19:32:31 +0200 Subject: [PATCH 2/5] added code to reject messages based on existing per-user statistics --- libretroshare/src/gxs/rsgenexchange.cc | 4 +- libretroshare/src/gxs/rsgenexchange.h | 2 +- libretroshare/src/gxstrans/p3gxstrans.cc | 68 ++++++++++++++++++------ libretroshare/src/gxstrans/p3gxstrans.h | 6 ++- 4 files changed, 60 insertions(+), 20 deletions(-) diff --git a/libretroshare/src/gxs/rsgenexchange.cc b/libretroshare/src/gxs/rsgenexchange.cc index 22b6dd069..55eea22a6 100644 --- a/libretroshare/src/gxs/rsgenexchange.cc +++ b/libretroshare/src/gxs/rsgenexchange.cc @@ -2900,7 +2900,7 @@ void RsGenExchange::processRecvdMessages() std::cerr << " deserialised info: grp id=" << meta->mGroupId << ", msg id=" << meta->mMsgId ; #endif uint8_t validateReturn = VALIDATE_FAIL; - bool accept_new_msg = acceptNewMessage(meta); + bool accept_new_msg = acceptNewMessage(meta,msg->msg.bin_len); if(!accept_new_msg && mNetService != NULL) mNetService->rejectMessage(meta->mMsgId) ; // This prevents reloading the message again at next sync. @@ -3047,7 +3047,7 @@ void RsGenExchange::processRecvdMessages() } bool RsGenExchange::acceptNewGroup(const RsGxsGrpMetaData* /*grpMeta*/ ) { return true; } -bool RsGenExchange::acceptNewMessage(const RsGxsMsgMetaData* /*grpMeta*/ ) { return true; } +bool RsGenExchange::acceptNewMessage(const RsGxsMsgMetaData* /*grpMeta*/,uint32_t size ) { return true; } void RsGenExchange::processRecvdGroups() { diff --git a/libretroshare/src/gxs/rsgenexchange.h b/libretroshare/src/gxs/rsgenexchange.h index 2bc80fd65..ee7449f2f 100644 --- a/libretroshare/src/gxs/rsgenexchange.h +++ b/libretroshare/src/gxs/rsgenexchange.h @@ -269,7 +269,7 @@ public: * \param grpMeta Group metadata to check * \return */ - virtual bool acceptNewMessage(const RsGxsMsgMetaData *msgMeta) ; + virtual bool acceptNewMessage(const RsGxsMsgMetaData *msgMeta, uint32_t size) ; bool subscribeToGroup(uint32_t& token, const RsGxsGroupId& grpId, bool subscribe); diff --git a/libretroshare/src/gxstrans/p3gxstrans.cc b/libretroshare/src/gxstrans/p3gxstrans.cc index bd335c9a6..a1d354fd2 100644 --- a/libretroshare/src/gxstrans/p3gxstrans.cc +++ b/libretroshare/src/gxstrans/p3gxstrans.cc @@ -421,7 +421,6 @@ void p3GxsTrans::service_tick() { GxsMsgReq msgToDel ; - mCleanupThread->getPerUserStatistics(per_user_statistics) ; mCleanupThread->getMessagesToDelete(msgToDel) ; if(!msgToDel.empty()) @@ -429,6 +428,9 @@ void p3GxsTrans::service_tick() std::cerr << "p3GxsTrans::service_tick(): deleting messages." << std::endl; getDataStore()->removeMsgs(msgToDel); } + + RS_STACK_MUTEX(mPerUserStatsMutex); + mCleanupThread->getPerUserStatistics(per_user_statistics) ; } { @@ -1063,9 +1065,8 @@ bool p3GxsTrans::loadList(std::list&loadList) // We should also include the message size! -bool p3GxsTrans::acceptNewMessage(const RsGxsMsgMetaData *msgMeta) +bool p3GxsTrans::acceptNewMessage(const RsGxsMsgMetaData *msgMeta,uint32_t msg_size) { -#ifdef TODO // 1 - check the total size of the msgs for the author of this particular msg. // 2 - Reject depending on embedded limits. @@ -1077,26 +1078,63 @@ bool p3GxsTrans::acceptNewMessage(const RsGxsMsgMetaData *msgMeta) // Negative | 0 | 0 // This is already handled by the anti-spam // R-Negative | 10 | 10k // Neutral | 100 | 500k - // R-Positive | 500 | 2M + // R-Positive | 400 | 1M + // Positive | 1000 | 2M - static const uint32_t GXSTRANS_MAX_COUNT_REMOTELY_NEGATIVE_DEFAULT = 10 ; - static const uint32_t GXSTRANS_MAX_COUNT_NEUTRAL_DEFAULT = 100 ; - static const uint32_t GXSTRANS_MAX_COUNT_NEUTRAL_DEFAULT = 100 ; + // Ideally these values should be left as user-defined parameters, with the + // default values below used as backup. + + static const uint32_t GXSTRANS_MAX_COUNT_REMOTELY_NEGATIVE_DEFAULT = 10 ; + static const uint32_t GXSTRANS_MAX_COUNT_NEUTRAL_DEFAULT = 100 ; + static const uint32_t GXSTRANS_MAX_COUNT_REMOTELY_POSITIVE_DEFAULT = 400 ; + static const uint32_t GXSTRANS_MAX_COUNT_LOCALLY_POSITIVE_DEFAULT = 1000 ; + + static const uint32_t GXSTRANS_MAX_SIZE_REMOTELY_NEGATIVE_DEFAULT = 10 * 1024 ; + static const uint32_t GXSTRANS_MAX_SIZE_NEUTRAL_DEFAULT = 512 * 1024 ; + static const uint32_t GXSTRANS_MAX_SIZE_REMOTELY_POSITIVE_DEFAULT = 1024 * 1024 ; + static const uint32_t GXSTRANS_MAX_SIZE_LOCALLY_POSITIVE_DEFAULT = 2 * 1024 * 1024 ; uint32_t max_count = 0 ; uint32_t max_size = 0 ; + RsReputations::ReputationLevel rep_lev = rsReputations->overallReputationLevel(msgMeta->mAuthorId); - switch(rsReputations->overallReputationLevel(msgMeta.mAuthorId)) + switch(rep_lev) { - case RsReputations::REPUTATION_REMOTELY_NEGATIVE: max_count = 10 ; - max_size = 10*1024 ; - default: - case RsReputations::REPUTATION_LOCALLY_NEGATIVE: max_count = - break ; + case RsReputations::REPUTATION_REMOTELY_NEGATIVE: max_count = GXSTRANS_MAX_COUNT_REMOTELY_NEGATIVE_DEFAULT; + max_size = GXSTRANS_MAX_SIZE_REMOTELY_NEGATIVE_DEFAULT; + break ; + case RsReputations::REPUTATION_NEUTRAL: max_count = GXSTRANS_MAX_COUNT_NEUTRAL_DEFAULT; + max_size = GXSTRANS_MAX_SIZE_NEUTRAL_DEFAULT; + break ; + case RsReputations::REPUTATION_REMOTELY_POSITIVE: max_count = GXSTRANS_MAX_COUNT_REMOTELY_POSITIVE_DEFAULT; + max_size = GXSTRANS_MAX_SIZE_REMOTELY_POSITIVE_DEFAULT; + break ; + case RsReputations::REPUTATION_LOCALLY_POSITIVE: max_count = GXSTRANS_MAX_COUNT_LOCALLY_POSITIVE_DEFAULT; + max_size = GXSTRANS_MAX_SIZE_LOCALLY_POSITIVE_DEFAULT; + break ; + default: + case RsReputations::REPUTATION_LOCALLY_NEGATIVE: max_count = 0 ; + max_size = 0 ; + break ; } -#endif - return true ; + RS_STACK_MUTEX(mPerUserStatsMutex); + + MsgSizeCount& s(per_user_statistics[msgMeta->mAuthorId]) ; + + std::cerr << "GxsTrans::acceptMessage(): size=" << msg_size << ", grp=" << msgMeta->mGroupId << ", gxs_id=" << msgMeta->mAuthorId << ", current (size,cnt)=(" + << s.size << "," << s.count << ") reputation=" << rep_lev << ", limits=(" << max_size << "," << max_count << ") " ; + + if(s.size + msg_size > max_size || 1+s.count > max_count) + { + std::cerr << "=> rejected." << std::endl; + return false ; + } + else + { + std::cerr << "=> accepted." << std::endl; + return true ; + } } diff --git a/libretroshare/src/gxstrans/p3gxstrans.h b/libretroshare/src/gxstrans/p3gxstrans.h index 8d5792343..28060ad0f 100644 --- a/libretroshare/src/gxstrans/p3gxstrans.h +++ b/libretroshare/src/gxstrans/p3gxstrans.h @@ -98,7 +98,8 @@ public: mIdService(identities), mServClientsMutex("p3GxsTrans client services map mutex"), mOutgoingMutex("p3GxsTrans outgoing queue map mutex"), - mIngoingMutex("p3GxsTrans ingoing queue map mutex") + mIngoingMutex("p3GxsTrans ingoing queue map mutex"), + mPerUserStatsMutex("p3GxsTrans user stats mutex") { mLastMsgCleanup = time(NULL) - 60; // to be changed into 0 mCleanupThread = NULL ; @@ -310,12 +311,13 @@ private: // Overloaded from RsGenExchange. - bool acceptNewMessage(const RsGxsMsgMetaData *msgMeta) ; + bool acceptNewMessage(const RsGxsMsgMetaData *msgMeta, uint32_t size) ; GxsTransIntegrityCleanupThread *mCleanupThread ; // statistics of the load across all groups, per user. + RsMutex mPerUserStatsMutex; std::map per_user_statistics ; }; From 6633b04a44c17d878a2038f1653f837b06c1d15c Mon Sep 17 00:00:00 2001 From: csoler Date: Thu, 15 Jun 2017 23:57:02 +0200 Subject: [PATCH 3/5] changed constants for GxsTransport anti-spam --- libretroshare/src/gxstrans/p3gxstrans.cc | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/libretroshare/src/gxstrans/p3gxstrans.cc b/libretroshare/src/gxstrans/p3gxstrans.cc index a1d354fd2..cdd011450 100644 --- a/libretroshare/src/gxstrans/p3gxstrans.cc +++ b/libretroshare/src/gxstrans/p3gxstrans.cc @@ -1063,8 +1063,6 @@ bool p3GxsTrans::loadList(std::list&loadList) return true; } -// We should also include the message size! - bool p3GxsTrans::acceptNewMessage(const RsGxsMsgMetaData *msgMeta,uint32_t msg_size) { // 1 - check the total size of the msgs for the author of this particular msg. @@ -1077,7 +1075,7 @@ bool p3GxsTrans::acceptNewMessage(const RsGxsMsgMetaData *msgMeta,uint32_t msg_s // ------------+----------------------+------------------ // Negative | 0 | 0 // This is already handled by the anti-spam // R-Negative | 10 | 10k - // Neutral | 100 | 500k + // Neutral | 20 | 20k // R-Positive | 400 | 1M // Positive | 1000 | 2M @@ -1085,12 +1083,12 @@ bool p3GxsTrans::acceptNewMessage(const RsGxsMsgMetaData *msgMeta,uint32_t msg_s // default values below used as backup. static const uint32_t GXSTRANS_MAX_COUNT_REMOTELY_NEGATIVE_DEFAULT = 10 ; - static const uint32_t GXSTRANS_MAX_COUNT_NEUTRAL_DEFAULT = 100 ; + static const uint32_t GXSTRANS_MAX_COUNT_NEUTRAL_DEFAULT = 20 ; static const uint32_t GXSTRANS_MAX_COUNT_REMOTELY_POSITIVE_DEFAULT = 400 ; static const uint32_t GXSTRANS_MAX_COUNT_LOCALLY_POSITIVE_DEFAULT = 1000 ; static const uint32_t GXSTRANS_MAX_SIZE_REMOTELY_NEGATIVE_DEFAULT = 10 * 1024 ; - static const uint32_t GXSTRANS_MAX_SIZE_NEUTRAL_DEFAULT = 512 * 1024 ; + static const uint32_t GXSTRANS_MAX_SIZE_NEUTRAL_DEFAULT = 20 * 1024 ; static const uint32_t GXSTRANS_MAX_SIZE_REMOTELY_POSITIVE_DEFAULT = 1024 * 1024 ; static const uint32_t GXSTRANS_MAX_SIZE_LOCALLY_POSITIVE_DEFAULT = 2 * 1024 * 1024 ; From eb4bb5be656a76ca167cbb0839fd137f836fa2a0 Mon Sep 17 00:00:00 2001 From: csoler Date: Sat, 17 Jun 2017 21:42:00 +0200 Subject: [PATCH 4/5] fixed bug in gxs trans stats counting --- libretroshare/src/gxstrans/p3gxstrans.cc | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/libretroshare/src/gxstrans/p3gxstrans.cc b/libretroshare/src/gxstrans/p3gxstrans.cc index 5804c91cc..ec1fbaaba 100644 --- a/libretroshare/src/gxstrans/p3gxstrans.cc +++ b/libretroshare/src/gxstrans/p3gxstrans.cc @@ -390,6 +390,7 @@ void p3GxsTrans::GxsTransIntegrityCleanupThread::run() RS_STACK_MUTEX(mMtx) ; mMsgToDel = msgsToDel ; + total_message_size_and_count = totalMessageSizeAndCount; mDone = true; } @@ -407,6 +408,7 @@ void p3GxsTrans::service_tick() if(mLastMsgCleanup + MAX_DELAY_BETWEEN_CLEANUPS < now) { + RS_STACK_MUTEX(mPerUserStatsMutex); if(!mCleanupThread) mCleanupThread = new GxsTransIntegrityCleanupThread(getDataStore()); @@ -425,6 +427,7 @@ void p3GxsTrans::service_tick() if(mCleanupThread != NULL && mCleanupThread->isDone()) { + RS_STACK_MUTEX(mPerUserStatsMutex); GxsMsgReq msgToDel ; mCleanupThread->getMessagesToDelete(msgToDel) ; @@ -436,8 +439,14 @@ void p3GxsTrans::service_tick() deleteMsgs(token,msgToDel); } - RS_STACK_MUTEX(mPerUserStatsMutex); mCleanupThread->getPerUserStatistics(per_user_statistics) ; + + std::cerr << "p3GxsTrans: Got new set of per user statistics:"<< std::endl; + for(std::map::const_iterator it(per_user_statistics.begin());it!=per_user_statistics.end();++it) + std::cerr << " " << it->first << ": " << it->second.count << " " << it->second.size << std::endl; + + delete mCleanupThread; + mCleanupThread=NULL ; } { From 12a45294a3a5b17e1fd01772ad2e8288f10f50c3 Mon Sep 17 00:00:00 2001 From: csoler Date: Sat, 17 Jun 2017 23:02:28 +0200 Subject: [PATCH 5/5] added ifdef for comments on GxsTrans --- libretroshare/src/gxstrans/p3gxstrans.cc | 98 ++++++++++++++++++++++-- libretroshare/src/gxstrans/p3gxstrans.h | 2 +- 2 files changed, 93 insertions(+), 7 deletions(-) diff --git a/libretroshare/src/gxstrans/p3gxstrans.cc b/libretroshare/src/gxstrans/p3gxstrans.cc index ec1fbaaba..48d60a1b8 100644 --- a/libretroshare/src/gxstrans/p3gxstrans.cc +++ b/libretroshare/src/gxstrans/p3gxstrans.cc @@ -20,11 +20,12 @@ #include "gxstrans/p3gxstrans.h" #include "util/stacktrace.h" +#define DEBUG_GXSTRANS 1 typedef unsigned int uint; RsGxsTrans *rsGxsTrans = NULL ; -const uint32_t p3GxsTrans::MAX_DELAY_BETWEEN_CLEANUPS = 60; // every 20 mins. Could be less. +const uint32_t p3GxsTrans::MAX_DELAY_BETWEEN_CLEANUPS = 900; // every 15 mins. Could be less. p3GxsTrans::~p3GxsTrans() { @@ -71,7 +72,9 @@ bool p3GxsTrans::sendData( RsGxsTransId& mailId, const uint8_t* data, uint32_t size, RsGxsTransEncryptionMode cm ) { +#ifdef DEBUG_GXSTRANS std::cout << "p3GxsTrans::sendEmail(...)" << std::endl; +#endif if(!mIdService.isOwnId(own_gxsid)) { @@ -127,7 +130,9 @@ void p3GxsTrans::registerGxsTransClient( void p3GxsTrans::handleResponse(uint32_t token, uint32_t req_type) { +#ifdef DEBUG_GXSTRANS std::cout << "p3GxsTrans::handleResponse(" << token << ", " << req_type << ")" << std::endl; +#endif bool changed = false ; switch (req_type) @@ -190,8 +195,10 @@ void p3GxsTrans::handleResponse(uint32_t token, uint32_t req_type) * avoid to create yet another never used mail distribution group. */ +#ifdef DEBUG_GXSTRANS std::cerr << "p3GxsTrans::handleResponse(...) preferredGroupId.isNu" << "ll() let's create a new group." << std::endl; +#endif uint32_t token; publishGroup(token, new RsGxsTransGroupItem()); queueRequest(token, GROUP_CREATE); @@ -201,7 +208,9 @@ void p3GxsTrans::handleResponse(uint32_t token, uint32_t req_type) } case GROUP_CREATE: { +#ifdef DEBUG_GXSTRANS std::cerr << "p3GxsTrans::handleResponse(...) GROUP_CREATE" << std::endl; +#endif RsGxsGroupId grpId; acknowledgeTokenGrp(token, grpId); supersedePreferredGroup(grpId); @@ -209,7 +218,9 @@ void p3GxsTrans::handleResponse(uint32_t token, uint32_t req_type) } case MAILS_UPDATE: { +#ifdef DEBUG_GXSTRANS std::cout << "p3GxsTrans::handleResponse(...) MAILS_UPDATE" << std::endl; +#endif typedef std::map > GxsMsgDataMap; GxsMsgDataMap gpMsgMap; getMsgData(token, gpMsgMap); @@ -302,7 +313,9 @@ void p3GxsTrans::GxsTransIntegrityCleanupThread::run() std::map grp; mDs->retrieveNxsGrps(grp, true, true); +#ifdef DEBUG_GXSTRANS std::cerr << "GxsTransIntegrityCleanupThread::run()" << std::endl; +#endif // compute hash and compare to stored value, if it fails then simply add it // to list @@ -347,13 +360,17 @@ void p3GxsTrans::GxsTransIntegrityCleanupThread::run() std::cerr << " Unrecocognised item type!" << std::endl; else if(NULL != (mitem = dynamic_cast(item))) { +#ifdef DEBUG_GXSTRANS std::cerr << " " << msg->metaData->mMsgId << ": Mail data with ID " << std::hex << std::setfill('0') << std::setw(16) << mitem->mailId << std::dec << " from " << msg->metaData->mAuthorId << " size: " << msg->msg.bin_len << std::endl; +#endif stored_msgs[mitem->mailId] = std::make_pair(msg->metaData->mGroupId,msg->metaData->mMsgId) ; } else if(NULL != (pitem = dynamic_cast(item))) { +#ifdef DEBUG_GXSTRANS std::cerr << " " << msg->metaData->mMsgId << ": Signed rcpt of ID " << std::hex << pitem->mailId << std::dec << " from " << msg->metaData->mAuthorId << " size: " << msg->msg.bin_len << std::endl; +#endif received_msgs.push_back(pitem->mailId) ; } @@ -370,11 +387,13 @@ void p3GxsTrans::GxsTransIntegrityCleanupThread::run() GxsMsgReq msgsToDel ; +#ifdef DEBUG_GXSTRANS std::cerr << "Msg removal report:" << std::endl; std::cerr << " Per user size and count: " << std::endl; for(std::map::const_iterator it(totalMessageSizeAndCount.begin());it!=totalMessageSizeAndCount.end();++it) std::cerr << " " << it->first << ": " << it->second.count << " messages, for a total size of " << it->second.size << " bytes." << std::endl; +#endif for(std::list::const_iterator it(received_msgs.begin());it!=received_msgs.end();++it) { @@ -384,7 +403,9 @@ void p3GxsTrans::GxsTransIntegrityCleanupThread::run() { msgsToDel[it2->second.first].push_back(it2->second.second); +#ifdef DEBUG_GXSTRANS std::cerr << " scheduling msg " << std::hex << it2->second.first << "," << it2->second.second << " for deletion." << std::endl; +#endif } } @@ -416,7 +437,9 @@ void p3GxsTrans::service_tick() std::cerr << "Cleanup thread is already running. Not running it again!" << std::endl; else { +#ifdef DEBUG_GXSTRANS std::cerr << "Starting GxsIntegrity cleanup thread." << std::endl; +#endif mCleanupThread->start() ; mLastMsgCleanup = now ; @@ -434,16 +457,20 @@ void p3GxsTrans::service_tick() if(!msgToDel.empty()) { +#ifdef DEBUG_GXSTRANS std::cerr << "p3GxsTrans::service_tick(): deleting messages." << std::endl; +#endif uint32_t token ; deleteMsgs(token,msgToDel); } mCleanupThread->getPerUserStatistics(per_user_statistics) ; +#ifdef DEBUG_GXSTRANS std::cerr << "p3GxsTrans: Got new set of per user statistics:"<< std::endl; for(std::map::const_iterator it(per_user_statistics.begin());it!=per_user_statistics.end();++it) std::cerr << " " << it->first << ": " << it->second.count << " " << it->second.size << std::endl; +#endif delete mCleanupThread; mCleanupThread=NULL ; @@ -488,6 +515,7 @@ void p3GxsTrans::service_tick() } else { +#ifdef DEBUG_GXSTRANS std::cout << "p3GxsTrans::service_tick() " << "GXS_MAIL_SUBTYPE_MAIL handling: " << msg->meta.mMsgId @@ -497,6 +525,7 @@ void p3GxsTrans::service_tick() << " mailId: "<< msg->mailId << " payload.size(): " << msg->payload.size() << std::endl; +#endif handleEncryptedMail(msg); } break; @@ -547,14 +576,18 @@ void p3GxsTrans::service_tick() RsGenExchange::ServiceCreate_Return p3GxsTrans::service_CreateGroup( RsGxsGrpItem* grpItem, RsTlvSecurityKeySet& /*keySet*/ ) { +#ifdef DEBUG_GXSTRANS std::cout << "p3GxsTrans::service_CreateGroup(...) " << grpItem->meta.mGroupId << std::endl; +#endif return SERVICE_CREATE_SUCCESS; } void p3GxsTrans::notifyChanges(std::vector& changes) { +#ifdef DEBUG_GXSTRANS std::cout << "p3GxsTrans::notifyChanges(...)" << std::endl; +#endif for( std::vector::const_iterator it = changes.begin(); it != changes.end(); ++it ) { @@ -563,12 +596,16 @@ void p3GxsTrans::notifyChanges(std::vector& changes) if (grpChange) { +#ifdef DEBUG_GXSTRANS std::cout << "p3GxsTrans::notifyChanges(...) grpChange" << std::endl; +#endif requestGroupsData(&(grpChange->mGrpIdList)); } else if(msgChange) { +#ifdef DEBUG_GXSTRANS std::cout << "p3GxsTrans::notifyChanges(...) msgChange" << std::endl; +#endif uint32_t token; RsTokReqOptions opts; opts.mReqType = GXS_REQUEST_TYPE_MSG_DATA; RsGenExchange::getTokenService()->requestMsgInfo( token, 0xcaca, @@ -584,9 +621,11 @@ void p3GxsTrans::notifyChanges(std::vector& changes) for(itT vit = msgsIds.begin(); vit != msgsIds.end(); ++vit) { const RsGxsMessageId& msgId = *vit; +#ifdef DEBUG_GXSTRANS std::cout << "p3GxsTrans::notifyChanges(...) got " << "notification for message " << msgId << " in group " << grpId << std::endl; +#endif } } } @@ -636,7 +675,9 @@ bool p3GxsTrans::requestGroupsData(const std::list* groupIds) bool p3GxsTrans::handleEncryptedMail(const RsGxsTransMailItem* mail) { +#ifdef DEBUG_GXSTRANS std::cout << "p3GxsTrans::handleEcryptedMail(...)" << std::endl; +#endif std::set decryptIds; std::list ownIds; @@ -659,8 +700,11 @@ bool p3GxsTrans::handleEncryptedMail(const RsGxsTransMailItem* mail) uint16_t csri = 0; uint32_t off = 0; getRawUInt16(&mail->payload[0], mail->payload.size(), &off, &csri); + +#ifdef DEBUG_GXSTRANS std::cerr << "service: " << csri << " got CLEAR_TEXT mail!" << std::endl; +#endif /* As we cannot verify recipient without encryption, just pass the hint * as recipient */ return dispatchDecryptedMail( mail->meta.mAuthorId, mail->recipientHint, @@ -699,8 +743,10 @@ bool p3GxsTrans::dispatchDecryptedMail( const RsGxsId& authorId, const uint8_t* decrypted_data, uint32_t decrypted_data_size ) { +#ifdef DEBUG_GXSTRANS std::cout << "p3GxsTrans::dispatchDecryptedMail(, , " << decrypted_data_size << ")" << std::endl; +#endif uint16_t csri = 0; uint32_t offset = 0; @@ -726,8 +772,10 @@ bool p3GxsTrans::dispatchDecryptedMail( const RsGxsId& authorId, << " wrong is happening!" << std::endl; return false; } +#ifdef DEBUG_GXSTRANS std::cout << "p3GxsTrans::dispatchDecryptedMail(...) dispatching receipt " << "with: msgId: " << receipt->msgId << std::endl; +#endif std::vector rcct; rcct.push_back(receipt); RsGenExchange::notifyNewMessages(rcct); @@ -872,6 +920,7 @@ void p3GxsTrans::locked_processOutgoingRecord(OutgoingRecord& pr) } case GxsTransSendStatus::PENDING_PUBLISH: { +#ifdef DEBUG_GXSTRANS std::cout << "p3GxsTrans::sendEmail(...) sending mail to: " << pr.recipient << " with cryptoType: " @@ -880,6 +929,7 @@ void p3GxsTrans::locked_processOutgoingRecord(OutgoingRecord& pr) << " receiptId: " << pr.mailItem.mailId << " payload size: " << pr.mailItem.payload.size() << std::endl; +#endif RsGxsTransMailItem *mail_item = new RsGxsTransMailItem(pr.mailItem); @@ -973,23 +1023,31 @@ RsSerialiser* p3GxsTrans::setupSerialiser() bool p3GxsTrans::saveList(bool &cleanup, std::list& saveList) { +#ifdef DEBUG_GXSTRANS std::cout << "p3GxsTrans::saveList(...)" << saveList.size() << " " << mIncomingQueue.size() << " " << mOutgoingQueue.size() << std::endl; +#endif mOutgoingMutex.lock(); mIngoingMutex.lock(); for ( auto& kv : mOutgoingQueue ) { +#ifdef DEBUG_GXSTRANS std::cerr << "Saving outgoing item, ID " << std::hex << std::setfill('0') << std::setw(16) << kv.first << std::dec << "Group id: " << kv.second.group_id << ", TS=" << kv.second.sent_ts << std::endl; +#endif saveList.push_back(&kv.second); } for ( auto& kv : mIncomingQueue ) { +#ifdef DEBUG_GXSTRANS std::cerr << "Saving incoming item, ID " << std::hex << std::setfill('0') << std::setw(16) << kv.first << std::endl; +#endif saveList.push_back(kv.second); } +#ifdef DEBUG_GXSTRANS std::cout << "p3GxsTrans::saveList(...)" << saveList.size() << " " << mIncomingQueue.size() << " " << mOutgoingQueue.size() << std::endl; +#endif cleanup = false; return true; @@ -1003,9 +1061,11 @@ void p3GxsTrans::saveDone() bool p3GxsTrans::loadList(std::list&loadList) { +#ifdef DEBUG_GXSTRANS std::cout << "p3GxsTrans::loadList(...) " << loadList.size() << " " << mIncomingQueue.size() << " " << mOutgoingQueue.size() << std::endl; +#endif for(auto& v : loadList) switch(static_cast(v->PacketSubType())) @@ -1043,7 +1103,9 @@ bool p3GxsTrans::loadList(std::list&loadList) RS_STACK_MUTEX(mOutgoingMutex); mOutgoingQueue.insert(prMap::value_type(ot.mailItem.mailId, ot)); +#ifdef DEBUG_GXSTRANS std::cerr << "Loaded outgoing item (converted), ID " << std::hex << std::setfill('0') << std::setw(16) << ot.mailItem.mailId<< std::dec << ", Group id: " << ot.group_id << ", TS=" << ot.sent_ts << std::endl; +#endif } delete v; break; @@ -1058,7 +1120,9 @@ bool p3GxsTrans::loadList(std::list&loadList) mOutgoingQueue.insert( prMap::value_type(ot->mailItem.mailId, *ot)); +#ifdef DEBUG_GXSTRANS std::cerr << "Loading outgoing item, ID " << std::hex << std::setfill('0') << std::setw(16) << ot->mailItem.mailId<< std::dec << "Group id: " << ot->group_id << ", TS=" << ot->sent_ts << std::endl; +#endif } delete v; break; @@ -1073,9 +1137,11 @@ bool p3GxsTrans::loadList(std::list&loadList) break; } +#ifdef DEBUG_GXSTRANS std::cout << "p3GxsTrans::loadList(...) " << loadList.size() << " " << mIncomingQueue.size() << " " << mOutgoingQueue.size() << std::endl; +#endif return true; } @@ -1092,7 +1158,7 @@ bool p3GxsTrans::acceptNewMessage(const RsGxsMsgMetaData *msgMeta,uint32_t msg_s // ------------+----------------------+------------------ // Negative | 0 | 0 // This is already handled by the anti-spam // R-Negative | 10 | 10k - // Neutral | 20 | 20k + // Neutral | 100 | 20k // R-Positive | 400 | 1M // Positive | 1000 | 2M @@ -1100,18 +1166,20 @@ bool p3GxsTrans::acceptNewMessage(const RsGxsMsgMetaData *msgMeta,uint32_t msg_s // default values below used as backup. static const uint32_t GXSTRANS_MAX_COUNT_REMOTELY_NEGATIVE_DEFAULT = 10 ; - static const uint32_t GXSTRANS_MAX_COUNT_NEUTRAL_DEFAULT = 20 ; + static const uint32_t GXSTRANS_MAX_COUNT_NEUTRAL_DEFAULT = 40 ; static const uint32_t GXSTRANS_MAX_COUNT_REMOTELY_POSITIVE_DEFAULT = 400 ; static const uint32_t GXSTRANS_MAX_COUNT_LOCALLY_POSITIVE_DEFAULT = 1000 ; static const uint32_t GXSTRANS_MAX_SIZE_REMOTELY_NEGATIVE_DEFAULT = 10 * 1024 ; - static const uint32_t GXSTRANS_MAX_SIZE_NEUTRAL_DEFAULT = 20 * 1024 ; + static const uint32_t GXSTRANS_MAX_SIZE_NEUTRAL_DEFAULT = 200 * 1024 ; static const uint32_t GXSTRANS_MAX_SIZE_REMOTELY_POSITIVE_DEFAULT = 1024 * 1024 ; static const uint32_t GXSTRANS_MAX_SIZE_LOCALLY_POSITIVE_DEFAULT = 2 * 1024 * 1024 ; uint32_t max_count = 0 ; uint32_t max_size = 0 ; - RsReputations::ReputationLevel rep_lev = rsReputations->overallReputationLevel(msgMeta->mAuthorId); + uint32_t identity_flags = 0 ; + + RsReputations::ReputationLevel rep_lev = rsReputations->overallReputationLevel(msgMeta->mAuthorId,&identity_flags); switch(rep_lev) { @@ -1133,21 +1201,39 @@ bool p3GxsTrans::acceptNewMessage(const RsGxsMsgMetaData *msgMeta,uint32_t msg_s break ; } + bool pgp_linked = identity_flags & RS_IDENTITY_FLAGS_PGP_LINKED ; + + if(rep_lev <= RsReputations::REPUTATION_NEUTRAL && !pgp_linked) + { + max_count /= 10 ; + max_size /= 10 ; + } + RS_STACK_MUTEX(mPerUserStatsMutex); MsgSizeCount& s(per_user_statistics[msgMeta->mAuthorId]) ; - std::cerr << "GxsTrans::acceptMessage(): size=" << msg_size << ", grp=" << msgMeta->mGroupId << ", gxs_id=" << msgMeta->mAuthorId << ", current (size,cnt)=(" +#ifdef DEBUG_GXSTRANS + std::cerr << "GxsTrans::acceptMessage(): size=" << msg_size << ", grp=" << msgMeta->mGroupId << ", gxs_id=" << msgMeta->mAuthorId << ", pgp_linked=" << pgp_linked << ", current (size,cnt)=(" << s.size << "," << s.count << ") reputation=" << rep_lev << ", limits=(" << max_size << "," << max_count << ") " ; +#endif if(s.size + msg_size > max_size || 1+s.count > max_count) { +#ifdef DEBUG_GXSTRANS std::cerr << "=> rejected." << std::endl; +#endif return false ; } else { +#ifdef DEBUG_GXSTRANS std::cerr << "=> accepted." << std::endl; +#endif + + s.count++ ; + s.size += msg_size ; // update the statistics, so that it's not possible to pass a bunch of msgs at once below the limits. + return true ; } } diff --git a/libretroshare/src/gxstrans/p3gxstrans.h b/libretroshare/src/gxstrans/p3gxstrans.h index 3dd0a7567..1b029444a 100644 --- a/libretroshare/src/gxstrans/p3gxstrans.h +++ b/libretroshare/src/gxstrans/p3gxstrans.h @@ -101,7 +101,7 @@ public: mIngoingMutex("p3GxsTrans ingoing queue map mutex"), mPerUserStatsMutex("p3GxsTrans user stats mutex") { - mLastMsgCleanup = time(NULL) - 60; // to be changed into 0 + mLastMsgCleanup = time(NULL) - MAX_DELAY_BETWEEN_CLEANUPS + 30; // always check 30 secs after start mCleanupThread = NULL ; }