diff --git a/libretroshare/src/gxstrans/p3gxstrans.cc b/libretroshare/src/gxstrans/p3gxstrans.cc index ec1fbaaba..48d60a1b8 100644 --- a/libretroshare/src/gxstrans/p3gxstrans.cc +++ b/libretroshare/src/gxstrans/p3gxstrans.cc @@ -20,11 +20,12 @@ #include "gxstrans/p3gxstrans.h" #include "util/stacktrace.h" +#define DEBUG_GXSTRANS 1 typedef unsigned int uint; RsGxsTrans *rsGxsTrans = NULL ; -const uint32_t p3GxsTrans::MAX_DELAY_BETWEEN_CLEANUPS = 60; // every 20 mins. Could be less. +const uint32_t p3GxsTrans::MAX_DELAY_BETWEEN_CLEANUPS = 900; // every 15 mins. Could be less. p3GxsTrans::~p3GxsTrans() { @@ -71,7 +72,9 @@ bool p3GxsTrans::sendData( RsGxsTransId& mailId, const uint8_t* data, uint32_t size, RsGxsTransEncryptionMode cm ) { +#ifdef DEBUG_GXSTRANS std::cout << "p3GxsTrans::sendEmail(...)" << std::endl; +#endif if(!mIdService.isOwnId(own_gxsid)) { @@ -127,7 +130,9 @@ void p3GxsTrans::registerGxsTransClient( void p3GxsTrans::handleResponse(uint32_t token, uint32_t req_type) { +#ifdef DEBUG_GXSTRANS std::cout << "p3GxsTrans::handleResponse(" << token << ", " << req_type << ")" << std::endl; +#endif bool changed = false ; switch (req_type) @@ -190,8 +195,10 @@ void p3GxsTrans::handleResponse(uint32_t token, uint32_t req_type) * avoid to create yet another never used mail distribution group. */ +#ifdef DEBUG_GXSTRANS std::cerr << "p3GxsTrans::handleResponse(...) preferredGroupId.isNu" << "ll() let's create a new group." << std::endl; +#endif uint32_t token; publishGroup(token, new RsGxsTransGroupItem()); queueRequest(token, GROUP_CREATE); @@ -201,7 +208,9 @@ void p3GxsTrans::handleResponse(uint32_t token, uint32_t req_type) } case GROUP_CREATE: { +#ifdef DEBUG_GXSTRANS std::cerr << "p3GxsTrans::handleResponse(...) GROUP_CREATE" << std::endl; +#endif RsGxsGroupId grpId; acknowledgeTokenGrp(token, grpId); supersedePreferredGroup(grpId); @@ -209,7 +218,9 @@ void p3GxsTrans::handleResponse(uint32_t token, uint32_t req_type) } case MAILS_UPDATE: { +#ifdef DEBUG_GXSTRANS std::cout << "p3GxsTrans::handleResponse(...) MAILS_UPDATE" << std::endl; +#endif typedef std::map > GxsMsgDataMap; GxsMsgDataMap gpMsgMap; getMsgData(token, gpMsgMap); @@ -302,7 +313,9 @@ void p3GxsTrans::GxsTransIntegrityCleanupThread::run() std::map grp; mDs->retrieveNxsGrps(grp, true, true); +#ifdef DEBUG_GXSTRANS std::cerr << "GxsTransIntegrityCleanupThread::run()" << std::endl; +#endif // compute hash and compare to stored value, if it fails then simply add it // to list @@ -347,13 +360,17 @@ void p3GxsTrans::GxsTransIntegrityCleanupThread::run() std::cerr << " Unrecocognised item type!" << std::endl; else if(NULL != (mitem = dynamic_cast(item))) { +#ifdef DEBUG_GXSTRANS std::cerr << " " << msg->metaData->mMsgId << ": Mail data with ID " << std::hex << std::setfill('0') << std::setw(16) << mitem->mailId << std::dec << " from " << msg->metaData->mAuthorId << " size: " << msg->msg.bin_len << std::endl; +#endif stored_msgs[mitem->mailId] = std::make_pair(msg->metaData->mGroupId,msg->metaData->mMsgId) ; } else if(NULL != (pitem = dynamic_cast(item))) { +#ifdef DEBUG_GXSTRANS std::cerr << " " << msg->metaData->mMsgId << ": Signed rcpt of ID " << std::hex << pitem->mailId << std::dec << " from " << msg->metaData->mAuthorId << " size: " << msg->msg.bin_len << std::endl; +#endif received_msgs.push_back(pitem->mailId) ; } @@ -370,11 +387,13 @@ void p3GxsTrans::GxsTransIntegrityCleanupThread::run() GxsMsgReq msgsToDel ; +#ifdef DEBUG_GXSTRANS std::cerr << "Msg removal report:" << std::endl; std::cerr << " Per user size and count: " << std::endl; for(std::map::const_iterator it(totalMessageSizeAndCount.begin());it!=totalMessageSizeAndCount.end();++it) std::cerr << " " << it->first << ": " << it->second.count << " messages, for a total size of " << it->second.size << " bytes." << std::endl; +#endif for(std::list::const_iterator it(received_msgs.begin());it!=received_msgs.end();++it) { @@ -384,7 +403,9 @@ void p3GxsTrans::GxsTransIntegrityCleanupThread::run() { msgsToDel[it2->second.first].push_back(it2->second.second); +#ifdef DEBUG_GXSTRANS std::cerr << " scheduling msg " << std::hex << it2->second.first << "," << it2->second.second << " for deletion." << std::endl; +#endif } } @@ -416,7 +437,9 @@ void p3GxsTrans::service_tick() std::cerr << "Cleanup thread is already running. Not running it again!" << std::endl; else { +#ifdef DEBUG_GXSTRANS std::cerr << "Starting GxsIntegrity cleanup thread." << std::endl; +#endif mCleanupThread->start() ; mLastMsgCleanup = now ; @@ -434,16 +457,20 @@ void p3GxsTrans::service_tick() if(!msgToDel.empty()) { +#ifdef DEBUG_GXSTRANS std::cerr << "p3GxsTrans::service_tick(): deleting messages." << std::endl; +#endif uint32_t token ; deleteMsgs(token,msgToDel); } mCleanupThread->getPerUserStatistics(per_user_statistics) ; +#ifdef DEBUG_GXSTRANS std::cerr << "p3GxsTrans: Got new set of per user statistics:"<< std::endl; for(std::map::const_iterator it(per_user_statistics.begin());it!=per_user_statistics.end();++it) std::cerr << " " << it->first << ": " << it->second.count << " " << it->second.size << std::endl; +#endif delete mCleanupThread; mCleanupThread=NULL ; @@ -488,6 +515,7 @@ void p3GxsTrans::service_tick() } else { +#ifdef DEBUG_GXSTRANS std::cout << "p3GxsTrans::service_tick() " << "GXS_MAIL_SUBTYPE_MAIL handling: " << msg->meta.mMsgId @@ -497,6 +525,7 @@ void p3GxsTrans::service_tick() << " mailId: "<< msg->mailId << " payload.size(): " << msg->payload.size() << std::endl; +#endif handleEncryptedMail(msg); } break; @@ -547,14 +576,18 @@ void p3GxsTrans::service_tick() RsGenExchange::ServiceCreate_Return p3GxsTrans::service_CreateGroup( RsGxsGrpItem* grpItem, RsTlvSecurityKeySet& /*keySet*/ ) { +#ifdef DEBUG_GXSTRANS std::cout << "p3GxsTrans::service_CreateGroup(...) " << grpItem->meta.mGroupId << std::endl; +#endif return SERVICE_CREATE_SUCCESS; } void p3GxsTrans::notifyChanges(std::vector& changes) { +#ifdef DEBUG_GXSTRANS std::cout << "p3GxsTrans::notifyChanges(...)" << std::endl; +#endif for( std::vector::const_iterator it = changes.begin(); it != changes.end(); ++it ) { @@ -563,12 +596,16 @@ void p3GxsTrans::notifyChanges(std::vector& changes) if (grpChange) { +#ifdef DEBUG_GXSTRANS std::cout << "p3GxsTrans::notifyChanges(...) grpChange" << std::endl; +#endif requestGroupsData(&(grpChange->mGrpIdList)); } else if(msgChange) { +#ifdef DEBUG_GXSTRANS std::cout << "p3GxsTrans::notifyChanges(...) msgChange" << std::endl; +#endif uint32_t token; RsTokReqOptions opts; opts.mReqType = GXS_REQUEST_TYPE_MSG_DATA; RsGenExchange::getTokenService()->requestMsgInfo( token, 0xcaca, @@ -584,9 +621,11 @@ void p3GxsTrans::notifyChanges(std::vector& changes) for(itT vit = msgsIds.begin(); vit != msgsIds.end(); ++vit) { const RsGxsMessageId& msgId = *vit; +#ifdef DEBUG_GXSTRANS std::cout << "p3GxsTrans::notifyChanges(...) got " << "notification for message " << msgId << " in group " << grpId << std::endl; +#endif } } } @@ -636,7 +675,9 @@ bool p3GxsTrans::requestGroupsData(const std::list* groupIds) bool p3GxsTrans::handleEncryptedMail(const RsGxsTransMailItem* mail) { +#ifdef DEBUG_GXSTRANS std::cout << "p3GxsTrans::handleEcryptedMail(...)" << std::endl; +#endif std::set decryptIds; std::list ownIds; @@ -659,8 +700,11 @@ bool p3GxsTrans::handleEncryptedMail(const RsGxsTransMailItem* mail) uint16_t csri = 0; uint32_t off = 0; getRawUInt16(&mail->payload[0], mail->payload.size(), &off, &csri); + +#ifdef DEBUG_GXSTRANS std::cerr << "service: " << csri << " got CLEAR_TEXT mail!" << std::endl; +#endif /* As we cannot verify recipient without encryption, just pass the hint * as recipient */ return dispatchDecryptedMail( mail->meta.mAuthorId, mail->recipientHint, @@ -699,8 +743,10 @@ bool p3GxsTrans::dispatchDecryptedMail( const RsGxsId& authorId, const uint8_t* decrypted_data, uint32_t decrypted_data_size ) { +#ifdef DEBUG_GXSTRANS std::cout << "p3GxsTrans::dispatchDecryptedMail(, , " << decrypted_data_size << ")" << std::endl; +#endif uint16_t csri = 0; uint32_t offset = 0; @@ -726,8 +772,10 @@ bool p3GxsTrans::dispatchDecryptedMail( const RsGxsId& authorId, << " wrong is happening!" << std::endl; return false; } +#ifdef DEBUG_GXSTRANS std::cout << "p3GxsTrans::dispatchDecryptedMail(...) dispatching receipt " << "with: msgId: " << receipt->msgId << std::endl; +#endif std::vector rcct; rcct.push_back(receipt); RsGenExchange::notifyNewMessages(rcct); @@ -872,6 +920,7 @@ void p3GxsTrans::locked_processOutgoingRecord(OutgoingRecord& pr) } case GxsTransSendStatus::PENDING_PUBLISH: { +#ifdef DEBUG_GXSTRANS std::cout << "p3GxsTrans::sendEmail(...) sending mail to: " << pr.recipient << " with cryptoType: " @@ -880,6 +929,7 @@ void p3GxsTrans::locked_processOutgoingRecord(OutgoingRecord& pr) << " receiptId: " << pr.mailItem.mailId << " payload size: " << pr.mailItem.payload.size() << std::endl; +#endif RsGxsTransMailItem *mail_item = new RsGxsTransMailItem(pr.mailItem); @@ -973,23 +1023,31 @@ RsSerialiser* p3GxsTrans::setupSerialiser() bool p3GxsTrans::saveList(bool &cleanup, std::list& saveList) { +#ifdef DEBUG_GXSTRANS std::cout << "p3GxsTrans::saveList(...)" << saveList.size() << " " << mIncomingQueue.size() << " " << mOutgoingQueue.size() << std::endl; +#endif mOutgoingMutex.lock(); mIngoingMutex.lock(); for ( auto& kv : mOutgoingQueue ) { +#ifdef DEBUG_GXSTRANS std::cerr << "Saving outgoing item, ID " << std::hex << std::setfill('0') << std::setw(16) << kv.first << std::dec << "Group id: " << kv.second.group_id << ", TS=" << kv.second.sent_ts << std::endl; +#endif saveList.push_back(&kv.second); } for ( auto& kv : mIncomingQueue ) { +#ifdef DEBUG_GXSTRANS std::cerr << "Saving incoming item, ID " << std::hex << std::setfill('0') << std::setw(16) << kv.first << std::endl; +#endif saveList.push_back(kv.second); } +#ifdef DEBUG_GXSTRANS std::cout << "p3GxsTrans::saveList(...)" << saveList.size() << " " << mIncomingQueue.size() << " " << mOutgoingQueue.size() << std::endl; +#endif cleanup = false; return true; @@ -1003,9 +1061,11 @@ void p3GxsTrans::saveDone() bool p3GxsTrans::loadList(std::list&loadList) { +#ifdef DEBUG_GXSTRANS std::cout << "p3GxsTrans::loadList(...) " << loadList.size() << " " << mIncomingQueue.size() << " " << mOutgoingQueue.size() << std::endl; +#endif for(auto& v : loadList) switch(static_cast(v->PacketSubType())) @@ -1043,7 +1103,9 @@ bool p3GxsTrans::loadList(std::list&loadList) RS_STACK_MUTEX(mOutgoingMutex); mOutgoingQueue.insert(prMap::value_type(ot.mailItem.mailId, ot)); +#ifdef DEBUG_GXSTRANS std::cerr << "Loaded outgoing item (converted), ID " << std::hex << std::setfill('0') << std::setw(16) << ot.mailItem.mailId<< std::dec << ", Group id: " << ot.group_id << ", TS=" << ot.sent_ts << std::endl; +#endif } delete v; break; @@ -1058,7 +1120,9 @@ bool p3GxsTrans::loadList(std::list&loadList) mOutgoingQueue.insert( prMap::value_type(ot->mailItem.mailId, *ot)); +#ifdef DEBUG_GXSTRANS std::cerr << "Loading outgoing item, ID " << std::hex << std::setfill('0') << std::setw(16) << ot->mailItem.mailId<< std::dec << "Group id: " << ot->group_id << ", TS=" << ot->sent_ts << std::endl; +#endif } delete v; break; @@ -1073,9 +1137,11 @@ bool p3GxsTrans::loadList(std::list&loadList) break; } +#ifdef DEBUG_GXSTRANS std::cout << "p3GxsTrans::loadList(...) " << loadList.size() << " " << mIncomingQueue.size() << " " << mOutgoingQueue.size() << std::endl; +#endif return true; } @@ -1092,7 +1158,7 @@ bool p3GxsTrans::acceptNewMessage(const RsGxsMsgMetaData *msgMeta,uint32_t msg_s // ------------+----------------------+------------------ // Negative | 0 | 0 // This is already handled by the anti-spam // R-Negative | 10 | 10k - // Neutral | 20 | 20k + // Neutral | 100 | 20k // R-Positive | 400 | 1M // Positive | 1000 | 2M @@ -1100,18 +1166,20 @@ bool p3GxsTrans::acceptNewMessage(const RsGxsMsgMetaData *msgMeta,uint32_t msg_s // default values below used as backup. static const uint32_t GXSTRANS_MAX_COUNT_REMOTELY_NEGATIVE_DEFAULT = 10 ; - static const uint32_t GXSTRANS_MAX_COUNT_NEUTRAL_DEFAULT = 20 ; + static const uint32_t GXSTRANS_MAX_COUNT_NEUTRAL_DEFAULT = 40 ; static const uint32_t GXSTRANS_MAX_COUNT_REMOTELY_POSITIVE_DEFAULT = 400 ; static const uint32_t GXSTRANS_MAX_COUNT_LOCALLY_POSITIVE_DEFAULT = 1000 ; static const uint32_t GXSTRANS_MAX_SIZE_REMOTELY_NEGATIVE_DEFAULT = 10 * 1024 ; - static const uint32_t GXSTRANS_MAX_SIZE_NEUTRAL_DEFAULT = 20 * 1024 ; + static const uint32_t GXSTRANS_MAX_SIZE_NEUTRAL_DEFAULT = 200 * 1024 ; static const uint32_t GXSTRANS_MAX_SIZE_REMOTELY_POSITIVE_DEFAULT = 1024 * 1024 ; static const uint32_t GXSTRANS_MAX_SIZE_LOCALLY_POSITIVE_DEFAULT = 2 * 1024 * 1024 ; uint32_t max_count = 0 ; uint32_t max_size = 0 ; - RsReputations::ReputationLevel rep_lev = rsReputations->overallReputationLevel(msgMeta->mAuthorId); + uint32_t identity_flags = 0 ; + + RsReputations::ReputationLevel rep_lev = rsReputations->overallReputationLevel(msgMeta->mAuthorId,&identity_flags); switch(rep_lev) { @@ -1133,21 +1201,39 @@ bool p3GxsTrans::acceptNewMessage(const RsGxsMsgMetaData *msgMeta,uint32_t msg_s break ; } + bool pgp_linked = identity_flags & RS_IDENTITY_FLAGS_PGP_LINKED ; + + if(rep_lev <= RsReputations::REPUTATION_NEUTRAL && !pgp_linked) + { + max_count /= 10 ; + max_size /= 10 ; + } + RS_STACK_MUTEX(mPerUserStatsMutex); MsgSizeCount& s(per_user_statistics[msgMeta->mAuthorId]) ; - std::cerr << "GxsTrans::acceptMessage(): size=" << msg_size << ", grp=" << msgMeta->mGroupId << ", gxs_id=" << msgMeta->mAuthorId << ", current (size,cnt)=(" +#ifdef DEBUG_GXSTRANS + std::cerr << "GxsTrans::acceptMessage(): size=" << msg_size << ", grp=" << msgMeta->mGroupId << ", gxs_id=" << msgMeta->mAuthorId << ", pgp_linked=" << pgp_linked << ", current (size,cnt)=(" << s.size << "," << s.count << ") reputation=" << rep_lev << ", limits=(" << max_size << "," << max_count << ") " ; +#endif if(s.size + msg_size > max_size || 1+s.count > max_count) { +#ifdef DEBUG_GXSTRANS std::cerr << "=> rejected." << std::endl; +#endif return false ; } else { +#ifdef DEBUG_GXSTRANS std::cerr << "=> accepted." << std::endl; +#endif + + s.count++ ; + s.size += msg_size ; // update the statistics, so that it's not possible to pass a bunch of msgs at once below the limits. + return true ; } } diff --git a/libretroshare/src/gxstrans/p3gxstrans.h b/libretroshare/src/gxstrans/p3gxstrans.h index 3dd0a7567..1b029444a 100644 --- a/libretroshare/src/gxstrans/p3gxstrans.h +++ b/libretroshare/src/gxstrans/p3gxstrans.h @@ -101,7 +101,7 @@ public: mIngoingMutex("p3GxsTrans ingoing queue map mutex"), mPerUserStatsMutex("p3GxsTrans user stats mutex") { - mLastMsgCleanup = time(NULL) - 60; // to be changed into 0 + mLastMsgCleanup = time(NULL) - MAX_DELAY_BETWEEN_CLEANUPS + 30; // always check 30 secs after start mCleanupThread = NULL ; }