RetroShare/libretroshare/src/gxstrans/p3gxstrans.cc
Gioacchino Mazzurco 329050a9c2
Use safer rstime_t instead of time_t
Avoid problems to serialization on different platforms, without breaking
nested STL containers serialization.

The conversion have been made with sed, and checked with grep, plus
kdiff3 visual ispection, plus rutime tests, so it should be fine.
2018-10-07 13:07:06 +02:00

1318 lines
42 KiB
C++

/*******************************************************************************
* libretroshare/src/gxstrans: p3gxstrans.cc *
* *
* libretroshare: retroshare core library *
* *
* Copyright (C) 2016-2017 Gioacchino Mazzurco <gio@eigenlab.org> *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU Lesser General Public License as *
* published by the Free Software Foundation, either version 3 of the *
* License, or (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU Lesser General Public License for more details. *
* *
* You should have received a copy of the GNU Lesser General Public License *
* along with this program. If not, see <https://www.gnu.org/licenses/>. *
* *
*******************************************************************************/
#include "util/rsdir.h"
#include "gxstrans/p3gxstrans.h"
#include "util/stacktrace.h"
//#define DEBUG_GXSTRANS 1
typedef unsigned int uint;
RsGxsTrans *rsGxsTrans = NULL ;
const uint32_t p3GxsTrans::MAX_DELAY_BETWEEN_CLEANUPS = 900; // every 15 mins. Could be less.
p3GxsTrans::~p3GxsTrans()
{
// (cyril) this cannot be called here! There's chances the thread that saves configs will be dead already!
//p3Config::saveConfiguration();
{
RS_STACK_MUTEX(mIngoingMutex);
for ( auto& kv : mIncomingQueue) delete kv.second;
}
}
bool p3GxsTrans::getStatistics(GxsTransStatistics& stats)
{
{
RS_STACK_MUTEX(mDataMutex);
stats.prefered_group_id = mPreferredGroupId;
}
stats.outgoing_records.clear();
{
RS_STACK_MUTEX(mOutgoingMutex);
for ( auto it = mOutgoingQueue.begin(); it != mOutgoingQueue.end(); ++it)
{
const OutgoingRecord& pr(it->second);
RsGxsTransOutgoingRecord rec ;
rec.status = pr.status ;
rec.send_TS = pr.sent_ts ;
rec.group_id = pr.group_id ;
rec.trans_id = pr.mailItem.mailId ;
rec.recipient = pr.recipient ;
rec.data_size = pr.mailData.size();
rec.data_hash = RsDirUtil::sha1sum(pr.mailData.data(),pr.mailData.size());
rec.client_service = pr.clientService ;
stats.outgoing_records.push_back(rec) ;
}
}
return true;
}
bool p3GxsTrans::sendData( RsGxsTransId& mailId,
GxsTransSubServices service,
const RsGxsId& own_gxsid, const RsGxsId& recipient,
const uint8_t* data, uint32_t size,
RsGxsTransEncryptionMode cm )
{
#ifdef DEBUG_GXSTRANS
std::cout << "p3GxsTrans::sendEmail(...)" << std::endl;
#endif
if(!mIdService.isOwnId(own_gxsid))
{
std::cerr << "p3GxsTrans::sendEmail(...) isOwnId(own_gxsid) false!"
<< std::endl;
return false;
}
if(recipient.isNull())
{
std::cerr << "p3GxsTrans::sendEmail(...) got invalid recipient"
<< std::endl;
print_stacktrace();
return false;
}
OutgoingRecord pr( recipient, service, data, size );
pr.mailItem.clear();
pr.author = own_gxsid; //pr.mailItem.meta.mAuthorId = own_gxsid;
pr.mailItem.cryptoType = cm;
pr.mailItem.mailId = RSRandom::random_u64();
{
RS_STACK_MUTEX(mOutgoingMutex);
mOutgoingQueue.insert(prMap::value_type(pr.mailItem.mailId, pr));
}
mailId = pr.mailItem.mailId;
IndicateConfigChanged(); // This causes the saving of the message after all data has been filled in.
return true;
}
bool p3GxsTrans::querySendStatus(RsGxsTransId mailId, GxsTransSendStatus& st)
{
RS_STACK_MUTEX(mOutgoingMutex);
auto it = mOutgoingQueue.find(mailId);
if( it != mOutgoingQueue.end() )
{
st = it->second.status;
return true;
}
return false;
}
void p3GxsTrans::registerGxsTransClient(
GxsTransSubServices serviceType, GxsTransClient* service)
{
RS_STACK_MUTEX(mServClientsMutex);
mServClients[serviceType] = service;
}
void p3GxsTrans::handleResponse(uint32_t token, uint32_t req_type)
{
#ifdef DEBUG_GXSTRANS
std::cout << "p3GxsTrans::handleResponse(" << token << ", " << req_type << ")" << std::endl;
#endif
bool changed = false ;
switch (req_type)
{
case GROUPS_LIST:
{
#ifdef DEBUG_GXSTRANS
std::cerr << " Reviewing available groups. " << std::endl;
#endif
std::vector<RsGxsGrpItem*> groups;
getGroupData(token, groups);
// First recompute the prefered group Id.
{
RS_STACK_MUTEX(mDataMutex);
for( auto grp : groups )
{
locked_supersedePreferredGroup(grp->meta.mGroupId);
if(RsGenExchange::getStoragePeriod(grp->meta.mGroupId) != GXS_STORAGE_PERIOD)
{
std::cerr << "(WW) forcing storage period in GxsTrans group " << grp->meta.mGroupId << " to " << GXS_STORAGE_PERIOD << " seconds. Value was " << RsGenExchange::getStoragePeriod(grp->meta.mGroupId) << std::endl;
RsGenExchange::setStoragePeriod(grp->meta.mGroupId,GXS_STORAGE_PERIOD) ;
}
}
}
#ifdef DEBUG_GXSTRANS
std::cerr << " computed preferred group id: " << mPreferredGroupId << std::endl;
#endif
for( auto grp : groups )
{
/* For each group check if it is better candidate then
* preferredGroupId, if it is supplant it and subscribe if it is not
* subscribed yet.
* Otherwise if it has recent messages subscribe.
* If the group was already subscribed has no recent messages
* unsubscribe.
*/
const RsGroupMetaData& meta = grp->meta;
bool subscribed = IS_GROUP_SUBSCRIBED(meta.mSubscribeFlags);
// if mLastPost is 0, then the group is not subscribed, so it only has impact on shouldSubscribe. In any case, a group
// with no information shouldn't be subscribed, so the olderThen() test is still valid in the case mLastPost=0.
bool old = olderThen( meta.mLastPost, UNUSED_GROUP_UNSUBSCRIBE_INTERVAL );
uint32_t token;
bool shouldSubscribe = false ;
bool shouldUnSubscribe = false ;
{
RS_STACK_MUTEX(mDataMutex);
shouldSubscribe = (!subscribed) && ((!old)|| meta.mGroupId == mPreferredGroupId );
shouldUnSubscribe = ( subscribed) && old && meta.mGroupId != mPreferredGroupId;
}
#ifdef DEBUG_GXSTRANS
std::cout << " group " << grp->meta.mGroupId << ", subscribed: " << subscribed << " last post: " << meta.mLastPost << " should subscribe: "<< shouldSubscribe
<< ", should unsubscribe: " << shouldUnSubscribe << std::endl;
#endif
if(shouldSubscribe)
RsGenExchange::subscribeToGroup(token, meta.mGroupId, true);
else if(shouldUnSubscribe)
RsGenExchange::subscribeToGroup(token, meta.mGroupId, false);
#ifdef GXS_MAIL_GRP_DEBUG
char buff[30];
struct tm* timeinfo;
timeinfo = localtime(&meta.mLastPost);
strftime(buff, sizeof(buff), "%Y %b %d %H:%M", timeinfo);
std::cout << "p3GxsTrans::handleResponse(...) GROUPS_LIST "
<< "meta.mGroupId: " << meta.mGroupId
<< " meta.mLastPost: " << buff
<< " subscribed: " << subscribed
<< " old: " << old
<< " shoudlSubscribe: " << shoudlSubscribe
<< " shoudlUnSubscribe: " << shoudlUnSubscribe
<< std::endl;
#endif // GXS_MAIL_GRP_DEBUG
delete grp;
}
bool have_preferred_group = false ;
{
RS_STACK_MUTEX(mDataMutex);
have_preferred_group = !mPreferredGroupId.isNull();
}
if(!have_preferred_group)
{
/* This is true only at first run when we haven't received mail
* distribuition groups from friends
* TODO: We should check if we have some connected friend too, to
* avoid to create yet another never used mail distribution group.
*/
#ifdef DEBUG_GXSTRANS
std::cerr << "p3GxsTrans::handleResponse(...) preferredGroupId.isNu"
<< "ll() let's create a new group." << std::endl;
#endif
uint32_t token;
publishGroup(token, new RsGxsTransGroupItem());
queueRequest(token, GROUP_CREATE);
}
break;
}
case GROUP_CREATE:
{
#ifdef DEBUG_GXSTRANS
std::cerr << "p3GxsTrans::handleResponse(...) GROUP_CREATE" << std::endl;
#endif
RsGxsGroupId grpId;
acknowledgeTokenGrp(token, grpId);
RS_STACK_MUTEX(mDataMutex);
locked_supersedePreferredGroup(grpId);
break;
}
case MAILS_UPDATE:
{
#ifdef DEBUG_GXSTRANS
std::cout << "p3GxsTrans::handleResponse(...) MAILS_UPDATE" << std::endl;
#endif
typedef std::map<RsGxsGroupId, std::vector<RsGxsMsgItem*> > GxsMsgDataMap;
GxsMsgDataMap gpMsgMap;
getMsgData(token, gpMsgMap);
for ( GxsMsgDataMap::iterator gIt = gpMsgMap.begin();
gIt != gpMsgMap.end(); ++gIt )
{
typedef std::vector<RsGxsMsgItem*> vT;
vT& mv(gIt->second);
for( vT::const_iterator mIt = mv.begin(); mIt != mv.end(); ++mIt )
{
RsGxsMsgItem* gIt = *mIt;
switch(static_cast<GxsTransItemsSubtypes>(gIt->PacketSubType()))
{
case GxsTransItemsSubtypes::GXS_TRANS_SUBTYPE_MAIL:
case GxsTransItemsSubtypes::GXS_TRANS_SUBTYPE_RECEIPT:
{
RsGxsTransBaseMsgItem* mb = dynamic_cast<RsGxsTransBaseMsgItem*>(*mIt);
if(mb)
{
RS_STACK_MUTEX(mIngoingMutex);
mIncomingQueue.insert(inMap::value_type(mb->mailId,mb));
changed = true ;
}
else
std::cerr << "p3GxsTrans::handleResponse(...) "
<< "GXS_MAIL_SUBTYPE_MAIL cast error, "
<< "something really wrong is happening"
<< std::endl;
break;
}
default:
std::cerr << "p3GxsTrans::handleResponse(...) MAILS_UPDATE "
<< "Unknown mail subtype : "
<< static_cast<uint>(gIt->PacketSubType())
<< std::endl;
delete gIt;
break;
}
}
}
break;
}
default:
std::cerr << "p3GxsTrans::handleResponse(...) Unknown req_type: "
<< req_type << std::endl;
break;
}
if(changed)
IndicateConfigChanged();
}
void p3GxsTrans::GxsTransIntegrityCleanupThread::getPerUserStatistics(std::map<RsGxsId,MsgSizeCount>& m)
{
RS_STACK_MUTEX(mMtx) ;
m = total_message_size_and_count ;
}
void p3GxsTrans::GxsTransIntegrityCleanupThread::getMessagesToDelete(GxsMsgReq& m)
{
RS_STACK_MUTEX(mMtx) ;
m = mMsgToDel ;
mMsgToDel.clear();
}
// This method does two things:
// 1 - cleaning up old messages and messages for which an ACK has been received.
// 2 - building per user statistics across groups. This is important because it allows to mitigate the excess of
// messages, which might be due to spam.
//
// Note: the anti-spam system is disabled the level of GXS, because we want to allow to send anonymous messages
// between identities that might not have a reputation yet. Still, messages from identities with a bad reputation
// are still deleted by GXS.
//
// The group limits are enforced according to the following rules:
// * a temporal sliding window is computed for each identity and the number of messages signed by this identity is counted
// *
//
//
// Deleted messages are notified to the RsGxsNetService part which keeps a list of delete messages so as not to request them again
// during the same session. This allows to safely delete messages while avoiding re-synchronisation from friend nodes.
void p3GxsTrans::GxsTransIntegrityCleanupThread::run()
{
// first take out all the groups
std::map<RsGxsGroupId, RsNxsGrp*> grp;
mDs->retrieveNxsGrps(grp, true, true);
#ifdef DEBUG_GXSTRANS
std::cerr << "GxsTransIntegrityCleanupThread::run()" << std::endl;
#endif
// compute hash and compare to stored value, if it fails then simply add it
// to list
GxsMsgReq grps;
for(std::map<RsGxsGroupId, RsNxsGrp*>::iterator git = grp.begin(); git != grp.end(); ++git)
{
RsNxsGrp* grp = git->second;
// store the group for retrieveNxsMsgs
grps[grp->grpId];
delete grp;
}
// now messages
std::map<RsGxsId,MsgSizeCount> totalMessageSizeAndCount;
std::map<RsGxsTransId,std::pair<RsGxsGroupId,RsGxsMessageId> > stored_msgs ;
std::list<RsGxsTransId> received_msgs ;
GxsMsgResult msgs;
mDs->retrieveNxsMsgs(grps, msgs, false, true);
for(GxsMsgResult::iterator mit = msgs.begin();mit != msgs.end(); ++mit)
{
std::vector<RsNxsMsg*>& msgV = mit->second;
std::vector<RsNxsMsg*>::iterator vit = msgV.begin();
#ifdef DEBUG_GXSTRANS
std::cerr << "Group " << mit->first << ": " << std::endl;
#endif
for(; vit != msgV.end(); ++vit)
{
RsNxsMsg* msg = *vit;
RsGxsTransSerializer s ;
uint32_t size = msg->msg.bin_len;
RsItem *item = s.deserialise(msg->msg.bin_data,&size);
RsGxsTransMailItem *mitem ;
RsGxsTransPresignedReceipt *pitem ;
if(item == NULL)
std::cerr << " Unrecocognised item type!" << std::endl;
else if(NULL != (mitem = dynamic_cast<RsGxsTransMailItem*>(item)))
{
#ifdef DEBUG_GXSTRANS
std::cerr << " " << msg->metaData->mMsgId << ": Mail data with ID " << std::hex << std::setfill('0') << std::setw(16) << mitem->mailId << std::dec << " from " << msg->metaData->mAuthorId << " size: " << msg->msg.bin_len << std::endl;
#endif
stored_msgs[mitem->mailId] = std::make_pair(msg->metaData->mGroupId,msg->metaData->mMsgId) ;
}
else if(NULL != (pitem = dynamic_cast<RsGxsTransPresignedReceipt*>(item)))
{
#ifdef DEBUG_GXSTRANS
std::cerr << " " << msg->metaData->mMsgId << ": Signed rcpt of ID " << std::hex << pitem->mailId << std::dec << " from " << msg->metaData->mAuthorId << " size: " << msg->msg.bin_len << std::endl;
#endif
received_msgs.push_back(pitem->mailId) ;
}
else
std::cerr << " Unknown item type!" << std::endl;
totalMessageSizeAndCount[msg->metaData->mAuthorId].size += msg->msg.bin_len ;
totalMessageSizeAndCount[msg->metaData->mAuthorId].count++;
delete msg;
if(item != NULL)
delete item ;
}
}
// From the collected information, build a list of group messages to delete.
GxsMsgReq msgsToDel ;
#ifdef DEBUG_GXSTRANS
std::cerr << "Msg removal report:" << std::endl;
std::cerr << " Per user size and count: " << std::endl;
for(std::map<RsGxsId,MsgSizeCount>::const_iterator it(totalMessageSizeAndCount.begin());it!=totalMessageSizeAndCount.end();++it)
std::cerr << " " << it->first << ": " << it->second.count << " messages, for a total size of " << it->second.size << " bytes." << std::endl;
#endif
for(std::list<RsGxsTransId>::const_iterator it(received_msgs.begin());it!=received_msgs.end();++it)
{
std::map<RsGxsTransId,std::pair<RsGxsGroupId,RsGxsMessageId> >::const_iterator it2 = stored_msgs.find(*it) ;
if(stored_msgs.end() != it2)
{
msgsToDel[it2->second.first].insert(it2->second.second);
#ifdef DEBUG_GXSTRANS
std::cerr << " scheduling msg " << std::hex << it2->second.first << "," << it2->second.second << " for deletion." << std::endl;
#endif
}
}
RS_STACK_MUTEX(mMtx) ;
mMsgToDel = msgsToDel ;
total_message_size_and_count = totalMessageSizeAndCount;
mDone = true;
}
bool p3GxsTrans::GxsTransIntegrityCleanupThread::isDone()
{
RS_STACK_MUTEX(mMtx) ;
return mDone ;
}
void p3GxsTrans::service_tick()
{
GxsTokenQueue::checkRequests();
rstime_t now = time(NULL);
bool changed = false ;
if(mLastMsgCleanup + MAX_DELAY_BETWEEN_CLEANUPS < now)
{
RS_STACK_MUTEX(mPerUserStatsMutex);
if(!mCleanupThread)
mCleanupThread = new GxsTransIntegrityCleanupThread(getDataStore());
if(mCleanupThread->isRunning())
std::cerr << "Cleanup thread is already running. Not running it again!" << std::endl;
else
{
#ifdef DEBUG_GXSTRANS
std::cerr << "Starting GxsIntegrity cleanup thread." << std::endl;
#endif
mCleanupThread->start() ;
mLastMsgCleanup = now ;
}
// This forces to review all groups, and decide to subscribe or not to each of them.
requestGroupsData();
}
// now grab collected messages to delete
if(mCleanupThread != NULL && mCleanupThread->isDone())
{
RS_STACK_MUTEX(mPerUserStatsMutex);
GxsMsgReq msgToDel ;
mCleanupThread->getMessagesToDelete(msgToDel) ;
if(!msgToDel.empty())
{
#ifdef DEBUG_GXSTRANS
std::cerr << "p3GxsTrans::service_tick(): deleting messages." << std::endl;
#endif
uint32_t token ;
deleteMsgs(token,msgToDel);
}
mCleanupThread->getPerUserStatistics(per_user_statistics) ;
#ifdef DEBUG_GXSTRANS
std::cerr << "p3GxsTrans: Got new set of per user statistics:"<< std::endl;
for(std::map<RsGxsId,MsgSizeCount>::const_iterator it(per_user_statistics.begin());it!=per_user_statistics.end();++it)
std::cerr << " " << it->first << ": " << it->second.count << " " << it->second.size << std::endl;
#endif
delete mCleanupThread;
mCleanupThread=NULL ;
}
{
RS_STACK_MUTEX(mOutgoingMutex);
for ( auto it = mOutgoingQueue.begin(); it != mOutgoingQueue.end(); )
{
OutgoingRecord& pr(it->second);
GxsTransSendStatus oldStatus = pr.status;
locked_processOutgoingRecord(pr);
if (oldStatus != pr.status) notifyClientService(pr);
if( pr.status >= GxsTransSendStatus::RECEIPT_RECEIVED )
{
it = mOutgoingQueue.erase(it);
changed = true ;
}
else ++it;
}
}
{
RS_STACK_MUTEX(mIngoingMutex);
for( auto it = mIncomingQueue.begin(); it != mIncomingQueue.end(); )
{
switch(static_cast<GxsTransItemsSubtypes>( it->second->PacketSubType()))
{
case GxsTransItemsSubtypes::GXS_TRANS_SUBTYPE_MAIL:
{
RsGxsTransMailItem* msg = dynamic_cast<RsGxsTransMailItem*>(it->second);
if(!msg)
{
std::cerr << "p3GxsTrans::service_tick() (EE) "
<< "GXS_MAIL_SUBTYPE_MAIL dynamic_cast failed, "
<< "something really wrong is happening!"
<< std::endl;
}
else
{
#ifdef DEBUG_GXSTRANS
std::cout << "p3GxsTrans::service_tick() "
<< "GXS_MAIL_SUBTYPE_MAIL handling: "
<< msg->meta.mMsgId
<< " with cryptoType: "
<< static_cast<uint32_t>(msg->cryptoType)
<< " recipientHint: " << msg->recipientHint
<< " mailId: "<< msg->mailId
<< " payload.size(): " << msg->payload.size()
<< std::endl;
#endif
handleEncryptedMail(msg);
}
break;
}
case GxsTransItemsSubtypes::GXS_TRANS_SUBTYPE_RECEIPT:
{
RsGxsTransPresignedReceipt* rcpt = dynamic_cast<RsGxsTransPresignedReceipt*>(it->second);
if(!rcpt)
{
std::cerr << "p3GxsTrans::service_tick() (EE) "
<< "GXS_MAIL_SUBTYPE_RECEIPT dynamic_cast failed,"
<< " something really wrong is happening!"
<< std::endl;
}
else if(mIdService.isOwnId(rcpt->meta.mAuthorId))
{
/* It is a receipt for a mail sent by this node live it in
* ingoingQueue so processOutgoingRecord(...) will take care
* of it at next tick */
++it;
continue;
}
else
{
/* TODO: It is a receipt for a message sent by someone else
* we can delete original mail from our GXS DB without
* waiting for GXS_STORAGE_PERIOD */
}
break;
}
default:
std::cerr << "p3GxsTrans::service_tick() (EE) got something "
<< "really unknown into ingoingQueue!!" << std::endl;
break;
}
delete it->second ;
it = mIncomingQueue.erase(it);
changed = true ;
}
}
if(changed)
IndicateConfigChanged();
}
RsGenExchange::ServiceCreate_Return p3GxsTrans::service_CreateGroup(
RsGxsGrpItem* /*grpItem*/, RsTlvSecurityKeySet& /*keySet*/ )
{
#ifdef DEBUG_GXSTRANS
std::cout << "p3GxsTrans::service_CreateGroup(...) "
<< grpItem->meta.mGroupId << std::endl;
#endif
return SERVICE_CREATE_SUCCESS;
}
void p3GxsTrans::notifyChanges(std::vector<RsGxsNotify*>& changes)
{
#ifdef DEBUG_GXSTRANS
std::cout << "p3GxsTrans::notifyChanges(...)" << std::endl;
#endif
for( std::vector<RsGxsNotify*>::const_iterator it = changes.begin();
it != changes.end(); ++it )
{
RsGxsGroupChange* grpChange = dynamic_cast<RsGxsGroupChange *>(*it);
RsGxsMsgChange* msgChange = dynamic_cast<RsGxsMsgChange *>(*it);
if (grpChange)
{
#ifdef DEBUG_GXSTRANS
std::cout << "p3GxsTrans::notifyChanges(...) grpChange" << std::endl;
#endif
requestGroupsData(&(grpChange->mGrpIdList));
}
else if(msgChange)
{
#ifdef DEBUG_GXSTRANS
std::cout << "p3GxsTrans::notifyChanges(...) msgChange" << std::endl;
#endif
uint32_t token;
RsTokReqOptions opts; opts.mReqType = GXS_REQUEST_TYPE_MSG_DATA;
RsGenExchange::getTokenService()->requestMsgInfo( token, 0xcaca,
opts, msgChange->msgChangeMap );
GxsTokenQueue::queueRequest(token, MAILS_UPDATE);
#ifdef DEBUG_GXSTRANS
for( GxsMsgReq::const_iterator it = msgChange->msgChangeMap.begin();
it != msgChange->msgChangeMap.end(); ++it )
{
const RsGxsGroupId& grpId = it->first;
const std::vector<RsGxsMessageId>& msgsIds = it->second;
typedef std::vector<RsGxsMessageId>::const_iterator itT;
for(itT vit = msgsIds.begin(); vit != msgsIds.end(); ++vit)
{
const RsGxsMessageId& msgId = *vit;
std::cout << "p3GxsTrans::notifyChanges(...) got "
<< "notification for message " << msgId
<< " in group " << grpId << std::endl;
}
}
#endif
}
}
RsGxsIfaceHelper::receiveChanges(changes);
}
uint32_t p3GxsTrans::AuthenPolicy()
{
uint32_t policy = 0;
uint32_t flag = 0;
// This ensure propagated message have valid author signature
flag = GXS_SERV::MSG_AUTHEN_ROOT_AUTHOR_SIGN |
GXS_SERV::MSG_AUTHEN_CHILD_AUTHOR_SIGN;
RsGenExchange::setAuthenPolicyFlag( flag, policy,
RsGenExchange::PUBLIC_GRP_BITS );
/* This ensure that in for restricted and private groups only authorized
* authors get the messages. Really not used ATM but don't hurts. */
flag |= GXS_SERV::MSG_AUTHEN_ROOT_PUBLISH_SIGN |
GXS_SERV::MSG_AUTHEN_CHILD_PUBLISH_SIGN;
RsGenExchange::setAuthenPolicyFlag( flag, policy,
RsGenExchange::RESTRICTED_GRP_BITS );
RsGenExchange::setAuthenPolicyFlag( flag, policy,
RsGenExchange::PRIVATE_GRP_BITS );
/* This seems never used RetroShare wide but we should investigate it
* more before considering this conclusive */
flag = 0;
RsGenExchange::setAuthenPolicyFlag( flag, policy,
RsGenExchange::GRP_OPTION_BITS );
return policy;
}
bool p3GxsTrans::requestGroupsData(const std::list<RsGxsGroupId>* groupIds)
{
// std::cout << "p3GxsTrans::requestGroupsList()" << std::endl;
uint32_t token=0;
RsTokReqOptions opts; opts.mReqType = GXS_REQUEST_TYPE_GROUP_DATA;
if(!groupIds)
RsGenExchange::getTokenService()->requestGroupInfo(token, 0xcaca, opts);
else
RsGenExchange::getTokenService()->requestGroupInfo(token, 0xcaca, opts, *groupIds);
GxsTokenQueue::queueRequest(token, GROUPS_LIST);
return true;
}
bool p3GxsTrans::handleEncryptedMail(const RsGxsTransMailItem* mail)
{
#ifdef DEBUG_GXSTRANS
std::cout << "p3GxsTrans::handleEcryptedMail(...)" << std::endl;
#endif
std::set<RsGxsId> decryptIds;
std::list<RsGxsId> ownIds;
mIdService.getOwnIds(ownIds);
for(auto it = ownIds.begin(); it != ownIds.end(); ++it)
if(mail->maybeRecipient(*it)) decryptIds.insert(*it);
// Hint match none of our own ids
if(decryptIds.empty())
{
std::cout << "p3GxsTrans::handleEcryptedMail(...) hint doesn't match"
<< std::endl;
return true;
}
switch (mail->cryptoType)
{
case RsGxsTransEncryptionMode::CLEAR_TEXT:
{
uint16_t csri = 0;
uint32_t off = 0;
getRawUInt16(&mail->payload[0], mail->payload.size(), &off, &csri);
#ifdef DEBUG_GXSTRANS
std::cerr << "service: " << csri << " got CLEAR_TEXT mail!"
<< std::endl;
#endif
/* As we cannot verify recipient without encryption, just pass the hint
* as recipient */
return dispatchDecryptedMail( mail->meta.mAuthorId, mail->recipientHint,
&mail->payload[0], mail->payload.size() );
}
case RsGxsTransEncryptionMode::RSA:
{
bool ok = true;
for( std::set<RsGxsId>::const_iterator it = decryptIds.begin();
it != decryptIds.end(); ++it )
{
const RsGxsId& decryptId(*it);
uint8_t* decrypted_data = NULL;
uint32_t decrypted_data_size = 0;
uint32_t decryption_error;
if( mIdService.decryptData( &mail->payload[0],
mail->payload.size(), decrypted_data,
decrypted_data_size, decryptId,
decryption_error ) )
ok = ok && dispatchDecryptedMail( mail->meta.mAuthorId,
decryptId, decrypted_data,
decrypted_data_size );
free(decrypted_data);
}
return ok;
}
default:
std::cout << "Unknown encryption type:"
<< static_cast<uint32_t>(mail->cryptoType) << std::endl;
return false;
}
}
bool p3GxsTrans::dispatchDecryptedMail( const RsGxsId& authorId,
const RsGxsId& decryptId,
const uint8_t* decrypted_data,
uint32_t decrypted_data_size )
{
#ifdef DEBUG_GXSTRANS
std::cout << "p3GxsTrans::dispatchDecryptedMail(, , " << decrypted_data_size
<< ")" << std::endl;
#endif
uint16_t csri = 0;
uint32_t offset = 0;
if(!getRawUInt16( decrypted_data, decrypted_data_size, &offset, &csri))
{
std::cerr << "p3GxsTrans::dispatchDecryptedMail(...) (EE) fatal error "
<< "deserializing service type, something really wrong is "
<< "happening!" << std::endl;
return false;
}
GxsTransSubServices rsrvc = static_cast<GxsTransSubServices>(csri);
uint32_t rcptsize = decrypted_data_size - offset;
RsNxsTransPresignedReceipt* receipt =
static_cast<RsNxsTransPresignedReceipt*>(
RsNxsSerialiser(RS_SERVICE_TYPE_GXS_TRANS).deserialise(
const_cast<uint8_t*>(&decrypted_data[offset]), &rcptsize ));
offset += rcptsize;
if(!receipt)
{
std::cerr << "p3GxsTrans::dispatchDecryptedMail(...) (EE) fatal error "
<< "deserializing presigned return receipt , something really"
<< " wrong is happening!" << std::endl;
return false;
}
#ifdef DEBUG_GXSTRANS
std::cout << "p3GxsTrans::dispatchDecryptedMail(...) dispatching receipt "
<< "with: msgId: " << receipt->msgId << std::endl;
#endif
std::vector<RsNxsMsg*> rcct; rcct.push_back(receipt);
RsGenExchange::receiveNewMessages(rcct);
GxsTransClient* recipientService = NULL;
{
RS_STACK_MUTEX(mServClientsMutex);
recipientService = mServClients[rsrvc];
}
if(recipientService)
return recipientService->receiveGxsTransMail(
authorId, decryptId, &decrypted_data[offset],
decrypted_data_size-offset );
else
{
std::cerr << "p3GxsTrans::dispatchReceivedMail(...) "
<< "got message for unknown service: "
<< csri << std::endl;
return false;
}
}
void p3GxsTrans::locked_processOutgoingRecord(OutgoingRecord& pr)
{
//std::cout << "p3GxsTrans::processRecord(...)" << std::endl;
switch (pr.status)
{
case GxsTransSendStatus::PENDING_PROCESSING:
{
pr.mailItem.saltRecipientHint(pr.recipient);
pr.mailItem.saltRecipientHint(RsGxsId::random());
pr.sent_ts = time(NULL) ; //pr.mailItem.meta.mPublishTs = time(NULL);
}
/* fallthrough */
case GxsTransSendStatus::PENDING_PREFERRED_GROUP:
{
RS_STACK_MUTEX(mDataMutex);
if(mPreferredGroupId.isNull())
{
requestGroupsData();
pr.status = GxsTransSendStatus::PENDING_PREFERRED_GROUP;
break;
}
pr.group_id = mPreferredGroupId ; //pr.mailItem.meta.mGroupId = mPreferredGroupId;
}
/* fallthrough */
case GxsTransSendStatus::PENDING_RECEIPT_CREATE:
{
RsGxsTransPresignedReceipt grcpt;
grcpt.meta.mAuthorId = pr.author ; //grcpt.meta = pr.mailItem.meta;
grcpt.meta.mGroupId = pr.group_id ; //grcpt.meta = pr.mailItem.meta;
grcpt.meta.mMsgId.clear() ;
grcpt.meta.mParentId.clear() ;
grcpt.meta.mOrigMsgId.clear() ;
grcpt.meta.mPublishTs = time(NULL);
grcpt.mailId = pr.mailItem.mailId;
uint32_t grsz = RsGxsTransSerializer().size(&grcpt);
std::vector<uint8_t> grsrz;
grsrz.resize(grsz);
RsGxsTransSerializer().serialise(&grcpt, &grsrz[0], &grsz);
{
RS_STACK_MUTEX(mDataMutex);
pr.presignedReceipt.grpId = mPreferredGroupId;
}
pr.presignedReceipt.metaData = new RsGxsMsgMetaData();
*pr.presignedReceipt.metaData = grcpt.meta;
pr.presignedReceipt.msg.setBinData(&grsrz[0], grsz);
}
/* fallthrough */
case GxsTransSendStatus::PENDING_RECEIPT_SIGNATURE: // (cyril) This step is never actually used.
{
switch (RsGenExchange::createMessage(&pr.presignedReceipt))
{
case CREATE_SUCCESS: break;
case CREATE_FAIL_TRY_LATER:
pr.status = GxsTransSendStatus::PENDING_RECEIPT_CREATE;
return;
default:
pr.status = GxsTransSendStatus::FAILED_RECEIPT_SIGNATURE;
goto processingFailed;
}
uint32_t metaSize = pr.presignedReceipt.metaData->serial_size();
std::vector<uint8_t> srx; srx.resize(metaSize);
pr.presignedReceipt.metaData->serialise(&srx[0], &metaSize);
pr.presignedReceipt.meta.setBinData(&srx[0], metaSize);
}
/* fallthrough */
case GxsTransSendStatus::PENDING_PAYLOAD_CREATE:
{
uint16_t serv = static_cast<uint16_t>(pr.clientService);
uint32_t rcptsize = RsGxsTransSerializer().size(&pr.presignedReceipt);
uint32_t datasize = pr.mailData.size();
pr.mailItem.payload.resize(2 + rcptsize + datasize);
uint32_t offset = 0;
setRawUInt16(&pr.mailItem.payload[0], 2, &offset, serv);
RsGxsTransSerializer().serialise(&pr.presignedReceipt,
&pr.mailItem.payload[offset],
&rcptsize);
offset += rcptsize;
memcpy(&pr.mailItem.payload[offset], &pr.mailData[0], datasize);
}
/* fallthrough */
case GxsTransSendStatus::PENDING_PAYLOAD_ENCRYPT:
{
switch (pr.mailItem.cryptoType)
{
case RsGxsTransEncryptionMode::CLEAR_TEXT:
{
std::cerr << "p3GxsTrans::sendMail(...) you are sending a mail "
<< "without encryption, everyone can read it!"
<< std::endl;
break;
}
case RsGxsTransEncryptionMode::RSA:
{
uint8_t* encryptedData = NULL;
uint32_t encryptedSize = 0;
uint32_t encryptError = 0;
if( mIdService.encryptData( &pr.mailItem.payload[0],
pr.mailItem.payload.size(),
encryptedData, encryptedSize,
pr.recipient, encryptError, true ) )
{
pr.mailItem.payload.resize(encryptedSize);
memcpy( &pr.mailItem.payload[0], encryptedData, encryptedSize );
free(encryptedData);
break;
}
else
{
std::cerr << "p3GxsTrans::sendMail(...) RSA encryption failed! "
<< "error_status: " << encryptError << std::endl;
pr.status = GxsTransSendStatus::FAILED_ENCRYPTION;
goto processingFailed;
}
}
case RsGxsTransEncryptionMode::UNDEFINED_ENCRYPTION:
default:
std::cerr << "p3GxsTrans::sendMail(...) attempt to send mail with "
<< "wrong EncryptionMode: "
<< static_cast<uint>(pr.mailItem.cryptoType)
<< " dropping mail!" << std::endl;
pr.status = GxsTransSendStatus::FAILED_ENCRYPTION;
goto processingFailed;
}
}
/* fallthrough */
case GxsTransSendStatus::PENDING_PUBLISH:
{
#ifdef DEBUG_GXSTRANS
std::cout << "p3GxsTrans::sendEmail(...) sending mail to: "
<< pr.recipient
<< " with cryptoType: "
<< static_cast<uint>(pr.mailItem.cryptoType)
<< " recipientHint: " << pr.mailItem.recipientHint
<< " receiptId: " << pr.mailItem.mailId
<< " payload size: " << pr.mailItem.payload.size()
<< std::endl;
#endif
RsGxsTransMailItem *mail_item = new RsGxsTransMailItem(pr.mailItem);
// pr.mailItem.meta is *not* serialised. So it is important to not rely on what's in it!
mail_item->meta.mGroupId = pr.group_id ;
mail_item->meta.mAuthorId = pr.author ;
mail_item->meta.mMsgId.clear();
mail_item->meta.mParentId.clear();
mail_item->meta.mOrigMsgId.clear();
uint32_t token;
publishMsg(token, mail_item) ;
pr.status = GxsTransSendStatus::PENDING_RECEIPT_RECEIVE;
IndicateConfigChanged(); // This causes the saving of the message after pr.status has changed.
break;
}
//case GxsTransSendStatus::PENDING_TRANSFER:
case GxsTransSendStatus::PENDING_RECEIPT_RECEIVE:
{
RS_STACK_MUTEX(mIngoingMutex);
auto range = mIncomingQueue.equal_range(pr.mailItem.mailId);
bool changed = false ;
for( auto it = range.first; it != range.second; ++it)
{
RsGxsTransPresignedReceipt* rt = dynamic_cast<RsGxsTransPresignedReceipt*>(it->second);
if(rt && mIdService.isOwnId(rt->meta.mAuthorId))
{
mIncomingQueue.erase(it); delete rt;
pr.status = GxsTransSendStatus::RECEIPT_RECEIVED;
changed = true ;
break;
}
}
if(changed)
IndicateConfigChanged();
// TODO: Resend message if older then treshold
break;
}
case GxsTransSendStatus::RECEIPT_RECEIVED:
break;
processingFailed:
case GxsTransSendStatus::FAILED_RECEIPT_SIGNATURE:
case GxsTransSendStatus::FAILED_ENCRYPTION:
default:
{
std::cout << "p3GxsTrans::processRecord(" << pr.mailItem.mailId
<< ") failed with: " << static_cast<uint>(pr.status)
<< std::endl;
break;
}
}
}
void p3GxsTrans::notifyClientService(const OutgoingRecord& pr)
{
RS_STACK_MUTEX(mServClientsMutex);
auto it = mServClients.find(pr.clientService);
if( it != mServClients.end())
{
GxsTransClient* serv(it->second);
if(serv)
{
serv->notifyGxsTransSendStatus(pr.mailItem.mailId, pr.status);
return;
}
}
std::cerr << "p3GxsTrans::processRecord(...) (EE) processed"
<< " mail for unkown service: "
<< static_cast<uint32_t>(pr.clientService)
<< " fatally failed with: "
<< static_cast<uint32_t>(pr.status) << std::endl;
print_stacktrace();
}
RsSerialiser* p3GxsTrans::setupSerialiser()
{
RsSerialiser* rss = new RsSerialiser;
rss->addSerialType(new RsGxsTransSerializer);
return rss;
}
bool p3GxsTrans::saveList(bool &cleanup, std::list<RsItem *>& saveList)
{
#ifdef DEBUG_GXSTRANS
std::cout << "p3GxsTrans::saveList(...)" << saveList.size() << " " << mIncomingQueue.size() << " " << mOutgoingQueue.size() << std::endl;
#endif
mOutgoingMutex.lock();
mIngoingMutex.lock();
for ( auto& kv : mOutgoingQueue )
{
#ifdef DEBUG_GXSTRANS
std::cerr << "Saving outgoing item, ID " << std::hex << std::setfill('0') << std::setw(16) << kv.first << std::dec << "Group id: " << kv.second.group_id << ", TS=" << kv.second.sent_ts << std::endl;
#endif
saveList.push_back(&kv.second);
}
for ( auto& kv : mIncomingQueue )
{
#ifdef DEBUG_GXSTRANS
std::cerr << "Saving incoming item, ID " << std::hex << std::setfill('0') << std::setw(16) << kv.first << std::endl;
#endif
saveList.push_back(kv.second);
}
#ifdef DEBUG_GXSTRANS
std::cout << "p3GxsTrans::saveList(...)" << saveList.size() << " " << mIncomingQueue.size() << " " << mOutgoingQueue.size() << std::endl;
#endif
cleanup = false;
return true;
}
void p3GxsTrans::saveDone()
{
mOutgoingMutex.unlock();
mIngoingMutex.unlock();
}
bool p3GxsTrans::loadList(std::list<RsItem *>&loadList)
{
#ifdef DEBUG_GXSTRANS
std::cout << "p3GxsTrans::loadList(...) " << loadList.size() << " "
<< mIncomingQueue.size() << " " << mOutgoingQueue.size()
<< std::endl;
#endif
for(auto& v : loadList)
switch(static_cast<GxsTransItemsSubtypes>(v->PacketSubType()))
{
case GxsTransItemsSubtypes::GXS_TRANS_SUBTYPE_MAIL:
case GxsTransItemsSubtypes::GXS_TRANS_SUBTYPE_RECEIPT:
{
RsGxsTransBaseMsgItem* mi = dynamic_cast<RsGxsTransBaseMsgItem*>(v);
if(mi)
{
RS_STACK_MUTEX(mIngoingMutex);
mIncomingQueue.insert(inMap::value_type(mi->mailId, mi));
}
break;
}
case GxsTransItemsSubtypes::OUTGOING_RECORD_ITEM_deprecated:
{
OutgoingRecord_deprecated* dot = dynamic_cast<OutgoingRecord_deprecated*>(v);
if(dot)
{
std::cerr << "(EE) Read a deprecated GxsTrans outgoing item. Converting to new format..." << std::endl;
OutgoingRecord ot(dot->recipient,dot->clientService,&dot->mailData[0],dot->mailData.size()) ;
ot.status = dot->status ;
ot.author.clear(); // These 3 fields cannot be stored in mailItem.meta, which is not serialised, so they are lost.
ot.group_id.clear() ;
ot.sent_ts = 0;
ot.mailItem = dot->mailItem ;
ot.presignedReceipt = dot->presignedReceipt;
RS_STACK_MUTEX(mOutgoingMutex);
mOutgoingQueue.insert(prMap::value_type(ot.mailItem.mailId, ot));
#ifdef DEBUG_GXSTRANS
std::cerr << "Loaded outgoing item (converted), ID " << std::hex << std::setfill('0') << std::setw(16) << ot.mailItem.mailId<< std::dec << ", Group id: " << ot.group_id << ", TS=" << ot.sent_ts << std::endl;
#endif
}
delete v;
break;
}
case GxsTransItemsSubtypes::OUTGOING_RECORD_ITEM:
{
OutgoingRecord* ot = dynamic_cast<OutgoingRecord*>(v);
if(ot)
{
RS_STACK_MUTEX(mOutgoingMutex);
mOutgoingQueue.insert(
prMap::value_type(ot->mailItem.mailId, *ot));
#ifdef DEBUG_GXSTRANS
std::cerr << "Loading outgoing item, ID " << std::hex << std::setfill('0') << std::setw(16) << ot->mailItem.mailId<< std::dec << "Group id: " << ot->group_id << ", TS=" << ot->sent_ts << std::endl;
#endif
}
delete v;
break;
}
case GxsTransItemsSubtypes::GXS_TRANS_SUBTYPE_GROUP:
default:
std::cerr << "p3GxsTrans::loadList(...) (EE) got item with "
<< "unhandled type: "
<< static_cast<uint>(v->PacketSubType())
<< std::endl;
delete v;
break;
}
#ifdef DEBUG_GXSTRANS
std::cout << "p3GxsTrans::loadList(...) " << loadList.size() << " "
<< mIncomingQueue.size() << " " << mOutgoingQueue.size()
<< std::endl;
#endif
return true;
}
bool p3GxsTrans::acceptNewMessage(const RsGxsMsgMetaData *msgMeta,uint32_t msg_size)
{
// 1 - check the total size of the msgs for the author of this particular msg.
// 2 - Reject depending on embedded limits.
// Depending on reputation, the messages will be rejected:
//
// Reputation | Maximum msg count | Maximum msg size
// ------------+----------------------+------------------
// Negative | 0 | 0 // This is already handled by the anti-spam
// R-Negative | 10 | 10k
// Neutral | 100 | 20k
// R-Positive | 400 | 1M
// Positive | 1000 | 2M
// Ideally these values should be left as user-defined parameters, with the
// default values below used as backup.
static const uint32_t GXSTRANS_MAX_COUNT_REMOTELY_NEGATIVE_DEFAULT = 10 ;
static const uint32_t GXSTRANS_MAX_COUNT_NEUTRAL_DEFAULT = 40 ;
static const uint32_t GXSTRANS_MAX_COUNT_REMOTELY_POSITIVE_DEFAULT = 400 ;
static const uint32_t GXSTRANS_MAX_COUNT_LOCALLY_POSITIVE_DEFAULT = 1000 ;
static const uint32_t GXSTRANS_MAX_SIZE_REMOTELY_NEGATIVE_DEFAULT = 10 * 1024 ;
static const uint32_t GXSTRANS_MAX_SIZE_NEUTRAL_DEFAULT = 200 * 1024 ;
static const uint32_t GXSTRANS_MAX_SIZE_REMOTELY_POSITIVE_DEFAULT = 1024 * 1024 ;
static const uint32_t GXSTRANS_MAX_SIZE_LOCALLY_POSITIVE_DEFAULT = 2 * 1024 * 1024 ;
uint32_t max_count = 0 ;
uint32_t max_size = 0 ;
uint32_t identity_flags = 0 ;
RsReputations::ReputationLevel rep_lev = rsReputations->overallReputationLevel(msgMeta->mAuthorId,&identity_flags);
switch(rep_lev)
{
case RsReputations::REPUTATION_REMOTELY_NEGATIVE: max_count = GXSTRANS_MAX_COUNT_REMOTELY_NEGATIVE_DEFAULT;
max_size = GXSTRANS_MAX_SIZE_REMOTELY_NEGATIVE_DEFAULT;
break ;
case RsReputations::REPUTATION_NEUTRAL: max_count = GXSTRANS_MAX_COUNT_NEUTRAL_DEFAULT;
max_size = GXSTRANS_MAX_SIZE_NEUTRAL_DEFAULT;
break ;
case RsReputations::REPUTATION_REMOTELY_POSITIVE: max_count = GXSTRANS_MAX_COUNT_REMOTELY_POSITIVE_DEFAULT;
max_size = GXSTRANS_MAX_SIZE_REMOTELY_POSITIVE_DEFAULT;
break ;
case RsReputations::REPUTATION_LOCALLY_POSITIVE: max_count = GXSTRANS_MAX_COUNT_LOCALLY_POSITIVE_DEFAULT;
max_size = GXSTRANS_MAX_SIZE_LOCALLY_POSITIVE_DEFAULT;
break ;
default:
case RsReputations::REPUTATION_LOCALLY_NEGATIVE: max_count = 0 ;
max_size = 0 ;
break ;
}
bool pgp_linked = identity_flags & RS_IDENTITY_FLAGS_PGP_LINKED ;
if(rep_lev <= RsReputations::REPUTATION_NEUTRAL && !pgp_linked)
{
max_count /= 10 ;
max_size /= 10 ;
}
RS_STACK_MUTEX(mPerUserStatsMutex);
MsgSizeCount& s(per_user_statistics[msgMeta->mAuthorId]) ;
#ifdef DEBUG_GXSTRANS
std::cerr << "GxsTrans::acceptMessage(): size=" << msg_size << ", grp=" << msgMeta->mGroupId << ", gxs_id=" << msgMeta->mAuthorId << ", pgp_linked=" << pgp_linked << ", current (size,cnt)=("
<< s.size << "," << s.count << ") reputation=" << rep_lev << ", limits=(" << max_size << "," << max_count << ") " ;
#endif
if(s.size + msg_size > max_size || 1+s.count > max_count)
{
#ifdef DEBUG_GXSTRANS
std::cerr << "=> rejected." << std::endl;
#endif
return false ;
}
else
{
#ifdef DEBUG_GXSTRANS
std::cerr << "=> accepted." << std::endl;
#endif
s.count++ ;
s.size += msg_size ; // update the statistics, so that it's not possible to pass a bunch of msgs at once below the limits.
return true ;
}
}