diff --git a/libretroshare/src/rsserver/rsinit.cc b/libretroshare/src/rsserver/rsinit.cc index 48c3569bd..b33dcef1d 100644 --- a/libretroshare/src/rsserver/rsinit.cc +++ b/libretroshare/src/rsserver/rsinit.cc @@ -1513,7 +1513,7 @@ int RsServer::StartupRetroShare() p3ServiceInfo *serviceInfo = new p3ServiceInfo(serviceCtrl); mDisc = new p3discovery2(mPeerMgr, mLinkMgr, mNetMgr, serviceCtrl); mHeart = new p3heartbeat(serviceCtrl, pqih); - msgSrv = new p3MsgService(serviceCtrl,mGxsIdService); + msgSrv = new p3MsgService( serviceCtrl, mGxsIdService, *mGxsMails ); chatSrv = new p3ChatService(serviceCtrl,mGxsIdService, mLinkMgr, mHistoryMgr); mStatusSrv = new p3StatusService(serviceCtrl); diff --git a/libretroshare/src/services/p3gxsmails.h b/libretroshare/src/services/p3gxsmails.h index dcbed0937..9184f7074 100644 --- a/libretroshare/src/services/p3gxsmails.h +++ b/libretroshare/src/services/p3gxsmails.h @@ -47,7 +47,7 @@ struct p3GxsMails; struct GxsMailsClient { /// Subservices identifiers (like port for TCP) - enum GxsMailSubServices { TEST_SERVICE = 1 }; + enum GxsMailSubServices : uint16_t { TEST_SERVICE = 1, P3_MSG_SERVICE = 2 }; /** * This will be called by p3GxsMails to dispatch mails to the subservice diff --git a/libretroshare/src/services/p3msgservice.cc b/libretroshare/src/services/p3msgservice.cc index f5dbed520..e4b302443 100644 --- a/libretroshare/src/services/p3msgservice.cc +++ b/libretroshare/src/services/p3msgservice.cc @@ -83,23 +83,29 @@ static const uint32_t RS_MSG_DISTANT_MESSAGE_HASH_KEEP_TIME = 2*30*86400 ; // ke * (3) from storage... */ -p3MsgService::p3MsgService(p3ServiceControl *sc, p3IdService *id_serv) - :p3Service(), p3Config(), mIdService(id_serv), mServiceCtrl(sc), mMsgMtx("p3MsgService"), mMsgUniqueId(0) +p3MsgService::p3MsgService( p3ServiceControl *sc, p3IdService *id_serv, + p3GxsMails& gxsMS ) + : p3Service(), p3Config(), mIdService(id_serv), mServiceCtrl(sc), + mMsgMtx("p3MsgService"), mMsgUniqueId(0), gxsMailService(gxsMS), + gxsOngoingMutex("p3MsgService Gxs Outgoing Mutex"), + recentlyReceivedMutex("p3MsgService recently received hash mutex") { - _serialiser = new RsMsgSerialiser(); // this serialiser is used for services. It's not the same than the one returned by setupSerialiser(). We need both!! + /* this serialiser is used for services. It's not the same than the one + * returned by setupSerialiser(). We need both!! */ + _serialiser = new RsMsgSerialiser(); addSerialType(_serialiser); - mMsgUniqueId = 1 ; // MsgIds are not transmitted, but only used locally as a storage index. As such, thay do not need to be different - // at friends nodes. - - mShouldEnableDistantMessaging = true ; - mDistantMessagingEnabled = false ; - mDistantMessagePermissions = RS_DISTANT_MESSAGING_CONTACT_PERMISSION_FLAG_FILTER_NONE ; + /* MsgIds are not transmitted, but only used locally as a storage index. + * As such, thay do not need to be different at friends nodes. */ + mMsgUniqueId = 1; - /* Initialize standard tag types */ - if(sc) - initStandardTagTypes(); + mShouldEnableDistantMessaging = true; + mDistantMessagingEnabled = false; + mDistantMessagePermissions = RS_DISTANT_MESSAGING_CONTACT_PERMISSION_FLAG_FILTER_NONE; + if(sc) initStandardTagTypes(); // Initialize standard tag types + + gxsMailService.registerGxsMailsClient(GxsMailsClient::P3_MSG_SERVICE, this); } const std::string MSG_APP_NAME = "msg"; @@ -141,11 +147,11 @@ int p3MsgService::tick() if(now > last_management_time + 5) { - manageDistantPeers() ; - checkOutgoingMessages(); - cleanListOfReceivedMessageHashes(); + manageDistantPeers(); + checkOutgoingMessages(); + cleanListOfReceivedMessageHashes(); - last_management_time = now ; + last_management_time = now; } return 0; @@ -153,21 +159,21 @@ int p3MsgService::tick() void p3MsgService::cleanListOfReceivedMessageHashes() { - RS_STACK_MUTEX(mMsgMtx); /********** STACK LOCKED MTX ******/ + RS_STACK_MUTEX(recentlyReceivedMutex); - time_t now = time(NULL) ; - - for(std::map::iterator it(mRecentlyReceivedDistantMessageHashes.begin());it!=mRecentlyReceivedDistantMessageHashes.end();) - if(now > RS_MSG_DISTANT_MESSAGE_HASH_KEEP_TIME + it->second) - { - std::cerr << "p3MsgService(): cleanListOfReceivedMessageHashes(). Removing old hash " << it->first << ", aged " << now - it->second << " secs ago" << std::endl; - std::map::iterator tmp(it) ; - ++tmp ; - mRecentlyReceivedDistantMessageHashes.erase(it) ; - it=tmp ; - } - else - ++it ; + time_t now = time(NULL); + + for( auto it = mRecentlyReceivedMessageHashes.begin(); + it != mRecentlyReceivedMessageHashes.end(); ) + if( now > RS_MSG_DISTANT_MESSAGE_HASH_KEEP_TIME + it->second ) + { + std::cerr << "p3MsgService(): cleanListOfReceivedMessageHashes(). " + << "Removing old hash " << it->first << ", aged " + << now - it->second << " secs ago" << std::endl; + + it = mRecentlyReceivedMessageHashes.erase(it); + } + else ++it; } int p3MsgService::status() @@ -348,107 +354,102 @@ void p3MsgService::checkSizeAndSendMessage(RsMsgItem *msg) sendItem(msg) ; } -int p3MsgService::checkOutgoingMessages() +int p3MsgService::checkOutgoingMessages() { - /* iterate through the outgoing queue - * - * if online, send - */ + bool changed = false; + std::list output_queue; - bool changed = false ; - std::list output_queue ; + { + RS_STACK_MUTEX(mMsgMtx); /********** STACK LOCKED MTX ******/ - { - RS_STACK_MUTEX(mMsgMtx); /********** STACK LOCKED MTX ******/ - - const RsPeerId& ownId = mServiceCtrl->getOwnId(); + const RsPeerId& ownId = mServiceCtrl->getOwnId(); - std::list::iterator it; - std::list toErase; + std::list::iterator it; + std::list toErase; - std::map::iterator mit; - for(mit = msgOutgoing.begin(); mit != msgOutgoing.end(); ++mit) - { - if (mit->second->msgFlags & RS_MSG_FLAGS_TRASH) - continue; + std::map::iterator mit; + for( mit = msgOutgoing.begin(); mit != msgOutgoing.end(); ++mit ) + { + if (mit->second->msgFlags & RS_MSG_FLAGS_TRASH) continue; - /* find the certificate */ - RsPeerId pid = mit->second->PeerId(); - bool should_send = false ; + /* find the certificate */ + RsPeerId pid = mit->second->PeerId(); + bool should_send = false; - if( pid == ownId) - should_send = true ; + if( pid == ownId) should_send = true; - if( mServiceCtrl->isPeerConnected(getServiceInfo().mServiceType, pid) ) /* FEEDBACK Msg to Ourselves */ - should_send = true ; + // FEEDBACK Msg to Ourselves + if( mServiceCtrl->isPeerConnected(getServiceInfo().mServiceType, + pid) ) + should_send = true; - if((mit->second->msgFlags & RS_MSG_FLAGS_DISTANT) && !(mit->second->msgFlags & RS_MSG_FLAGS_ROUTED)) - should_send = true ; + if( (mit->second->msgFlags & RS_MSG_FLAGS_DISTANT) && + !(mit->second->msgFlags & RS_MSG_FLAGS_ROUTED)) + should_send = true; - if(should_send) - { - /* send msg */ - pqioutput(PQL_DEBUG_BASIC, msgservicezone, - "p3MsgService::checkOutGoingMessages() Sending out message"); - /* remove the pending flag */ + if(should_send) + { + /* send msg */ + pqioutput( PQL_DEBUG_BASIC, msgservicezone, + "p3MsgService::checkOutGoingMessages() Sending out message"); + /* remove the pending flag */ - output_queue.push_back(mit->second) ; + output_queue.push_back(mit->second) ; - // When the message is a distant msg, dont remove it yet from the list. Only mark it as being sent, so that we don't send it again. - // - if(!(mit->second->msgFlags & RS_MSG_FLAGS_DISTANT)) - { - (mit->second)->msgFlags &= ~RS_MSG_FLAGS_PENDING; - toErase.push_back(mit->first); - changed = true ; - } - else - { + /* When the message is a distant msg, dont remove it yet from + * the list. Only mark it as being sent, so that we don't send + * it again. */ + if(!(mit->second->msgFlags & RS_MSG_FLAGS_DISTANT)) + { + (mit->second)->msgFlags &= ~RS_MSG_FLAGS_PENDING; + toErase.push_back(mit->first); + changed = true; + } + else + { #ifdef DEBUG_DISTANT_MSG - std::cerr << "Message id " << mit->first << " is distant: kept in outgoing, and marked as ROUTED" << std::endl; + std::cerr << "Message id " << mit->first << " is distant: " + << "kept in outgoing, and marked as ROUTED" + << std::endl; #endif - mit->second->msgFlags |= RS_MSG_FLAGS_ROUTED ; - } - } - else - { - pqioutput(PQL_DEBUG_BASIC, msgservicezone, - "p3MsgService::checkOutGoingMessages() Delaying until available..."); - } - } + mit->second->msgFlags |= RS_MSG_FLAGS_ROUTED; + } + } + else + { + pqioutput( PQL_DEBUG_BASIC, msgservicezone, + "p3MsgService::checkOutGoingMessages() Delaying until available..."); + } + } - /* clean up */ - for(it = toErase.begin(); it != toErase.end(); ++it) - { - mit = msgOutgoing.find(*it); - if (mit != msgOutgoing.end()) - { - msgOutgoing.erase(mit); - } + /* clean up */ + for(it = toErase.begin(); it != toErase.end(); ++it) + { + mit = msgOutgoing.find(*it); + if ( mit != msgOutgoing.end() ) msgOutgoing.erase(mit); - std::map::iterator srcIt = mSrcIds.find(*it); - if (srcIt != mSrcIds.end()) { - delete (srcIt->second); - mSrcIds.erase(srcIt); - } - } + std::map::iterator srcIt = mSrcIds.find(*it); + if (srcIt != mSrcIds.end()) + { + delete (srcIt->second); + mSrcIds.erase(srcIt); + } + } - if (toErase.size() > 0) - { - IndicateConfigChanged(); /**** INDICATE MSG CONFIG CHANGED! *****/ - } - } + if (toErase.size() > 0) IndicateConfigChanged(); + } - for(std::list::const_iterator it(output_queue.begin());it!=output_queue.end();++it) - if((*it)->msgFlags & RS_MSG_FLAGS_DISTANT) // don't split distant messages. The global router takes care of it. - sendDistantMsgItem(*it) ; - else - checkSizeAndSendMessage(*it) ; + for( std::list::const_iterator it(output_queue.begin()); + it != output_queue.end(); ++it ) + if( (*it)->msgFlags & RS_MSG_FLAGS_DISTANT ) // don't split distant messages. The global router takes care of it. + sendDistantMsgItem(*it); + else + checkSizeAndSendMessage(*it); - if(changed) - RsServer::notify()->notifyListChange(NOTIFY_LIST_MESSAGELIST,NOTIFY_TYPE_MOD); + if(changed) + RsServer::notify()->notifyListChange(NOTIFY_LIST_MESSAGELIST,NOTIFY_TYPE_MOD); - return 0; + return 0; } bool p3MsgService::saveList(bool& cleanup, std::list& itemList) @@ -489,10 +490,13 @@ bool p3MsgService::saveList(bool& cleanup, std::list& itemList) grmap->ongoing_msgs = _ongoing_messages ; itemList.push_back(grmap) ; - - RsMsgDistantMessagesHashMap *ghm = new RsMsgDistantMessagesHashMap ; - ghm->hash_map = mRecentlyReceivedDistantMessageHashes ; - itemList.push_back(ghm) ; + + RsMsgDistantMessagesHashMap *ghm = new RsMsgDistantMessagesHashMap; + { + RS_STACK_MUTEX(recentlyReceivedMutex); + ghm->hash_map = mRecentlyReceivedMessageHashes; + } + itemList.push_back(ghm); RsConfigKeyValueSet *vitem = new RsConfigKeyValueSet ; RsTlvKeyValue kv; @@ -601,18 +605,21 @@ bool p3MsgService::loadList(std::list& load) // merge. for(std::map::const_iterator it(grm->ongoing_msgs.begin());it!=grm->ongoing_msgs.end();++it) _ongoing_messages.insert(*it) ; - } - else if(NULL != (ghm = dynamic_cast(*it))) - { - mRecentlyReceivedDistantMessageHashes = ghm->hash_map ; - + } + else if(NULL != (ghm = dynamic_cast(*it))) + { + { + RS_STACK_MUTEX(recentlyReceivedMutex); + mRecentlyReceivedMessageHashes = ghm->hash_map; + } + #ifdef DEBUG_DISTANT_MSG std::cerr << " loaded recently received message map: " << std::endl; for(std::map::const_iterator it(mRecentlyReceivedDistantMessageHashes.begin());it!=mRecentlyReceivedDistantMessageHashes.end();++it) std::cerr << " " << it->first << " received " << time(NULL)-it->second << " secs ago." << std::endl; #endif - } + } else if(NULL != (mtt = dynamic_cast(*it))) { // delete standard tags as they are now save in config @@ -1107,16 +1114,17 @@ uint32_t p3MsgService::sendMessage(RsMsgItem *item) // no from field because return item->msgId; } -uint32_t p3MsgService::sendDistantMessage(RsMsgItem *item,const RsGxsId& from) -{ - if(!item) - return 0 ; - item->msgId = getNewUniqueMsgId(); /* grabs Mtx as well */ - item->msgFlags |= (RS_MSG_FLAGS_DISTANT | RS_MSG_FLAGS_OUTGOING | RS_MSG_FLAGS_PENDING); /* add pending flag */ +uint32_t p3MsgService::sendDistantMessage(RsMsgItem *item, const RsGxsId& from) +{ + if(!item) return 0; + + item->msgId = getNewUniqueMsgId(); /* grabs Mtx as well */ + item->msgFlags |= ( RS_MSG_FLAGS_DISTANT | RS_MSG_FLAGS_OUTGOING | + RS_MSG_FLAGS_PENDING ); /* add pending flag */ { - RS_STACK_MUTEX(mMsgMtx) ; + RS_STACK_MUTEX(mMsgMtx); /* STORE MsgID */ msgOutgoing[item->msgId] = item; @@ -1128,17 +1136,16 @@ uint32_t p3MsgService::sendDistantMessage(RsMsgItem *item,const RsGxsId& fro RsMsgSrcId* msi = new RsMsgSrcId(); msi->msgId = item->msgId; - msi->srcId = RsPeerId(from) ; + msi->srcId = RsPeerId(from); mSrcIds.insert(std::pair(msi->msgId, msi)); } } IndicateConfigChanged(); /**** INDICATE MSG CONFIG CHANGED! *****/ - RsServer::notify()->notifyListChange(NOTIFY_LIST_MESSAGELIST, NOTIFY_TYPE_ADD); - + RsServer::notify()->notifyListChange( NOTIFY_LIST_MESSAGELIST, + NOTIFY_TYPE_ADD ); return item->msgId; - } bool p3MsgService::MessageSend(MessageInfo &info) @@ -1853,73 +1860,89 @@ void p3MsgService::manageDistantPeers() } } -void p3MsgService::notifyDataStatus(const GRouterMsgPropagationId& id, const RsGxsId &signer_id, uint32_t data_status) +void p3MsgService::notifyDataStatus( const GRouterMsgPropagationId& id, + const RsGxsId &signer_id, + uint32_t data_status ) { - if(data_status == GROUTER_CLIENT_SERVICE_DATA_STATUS_FAILED) - { - RS_STACK_MUTEX(mMsgMtx); /********** STACK LOCKED MTX ******/ + if(data_status == GROUTER_CLIENT_SERVICE_DATA_STATUS_FAILED) + { + RS_STACK_MUTEX(mMsgMtx); - std::cerr << "(WW) p3MsgService::notifyDataStatus: Global router tells us that item ID " << id << " could not be delivered on time." ; - std::map::iterator it = _ongoing_messages.find(id) ; - - if(it == _ongoing_messages.end()) - { - std::cerr << " (EE) cannot find pending message to acknowledge. Weird. grouter id = " << id << std::endl; - return ; - } - uint32_t msg_id = it->second ; - std::cerr << " message id = " << msg_id << std::endl; - mDistantOutgoingMsgSigners[msg_id] = signer_id ; // this is needed because it's not saved in config, but we should probably include it in _ongoing_messages + std::cerr << "(WW) p3MsgService::notifyDataStatus: Global router tells " + << "us that item ID " << id + << " could not be delivered on time."; - std::map::iterator mit = msgOutgoing.find(msg_id) ; + auto it = _ongoing_messages.find(id); + if(it == _ongoing_messages.end()) + { + std::cerr << " (EE) cannot find pending message to acknowledge. " + << "Weird. grouter id = " << id << std::endl; + return; + } - if(mit == msgOutgoing.end()) - { - std::cerr << " (EE) message has been notified as not delivered, but it not on outgoing list. Something's wrong!!" << std::endl; - return ; - } - std::cerr << " reseting the ROUTED flag so that the message is requested again" << std::endl; + uint32_t msg_id = it->second; + std::cerr << " message id = " << msg_id << std::endl; - mit->second->msgFlags &= ~RS_MSG_FLAGS_ROUTED ; // clear the routed flag so that the message is requested again - return ; - } - - if(data_status == GROUTER_CLIENT_SERVICE_DATA_STATUS_RECEIVED) - { - RS_STACK_MUTEX(mMsgMtx); /********** STACK LOCKED MTX ******/ + /* this is needed because it's not saved in config, but we should + * probably include it in _ongoing_messages */ + mDistantOutgoingMsgSigners[msg_id] = signer_id; + + std::map::iterator mit = msgOutgoing.find(msg_id); + + if(mit == msgOutgoing.end()) + { + std::cerr << " (EE) message has been notified as not delivered, " + << "but it not on outgoing list. Something's wrong!!" + << std::endl; + return; + } + std::cerr << " reseting the ROUTED flag so that the message is " + << "requested again" << std::endl; + + // clear the routed flag so that the message is requested again + mit->second->msgFlags &= ~RS_MSG_FLAGS_ROUTED; + return; + } + + if(data_status == GROUTER_CLIENT_SERVICE_DATA_STATUS_RECEIVED) + { + RS_STACK_MUTEX(mMsgMtx); #ifdef DEBUG_DISTANT_MSG - std::cerr << "p3MsgService::acknowledgeDataReceived(): acknowledging data received for msg propagation id " << id << std::endl; + std::cerr << "p3MsgService::acknowledgeDataReceived(): acknowledging data received for msg propagation id " << id << std::endl; #endif - std::map::iterator it = _ongoing_messages.find(id) ; + auto it = _ongoing_messages.find(id); + if(it == _ongoing_messages.end()) + { + std::cerr << " (EE) cannot find pending message to acknowledge. " + << "Weird. grouter id = " << id << std::endl; + return; + } - if(it == _ongoing_messages.end()) - { - std::cerr << " (EE) cannot find pending message to acknowledge. Weird. grouter id = " << id << std::endl; - return ; - } + uint32_t msg_id = it->second ; - uint32_t msg_id = it->second ; + // we should now remove the item from the msgOutgoing list. - // we should now remove the item from the msgOutgoing list. + std::map::iterator it2 = msgOutgoing.find(msg_id); + if(it2 == msgOutgoing.end()) + { + std::cerr << "(EE) message has been ACKed, but is not in outgoing " + << "list. Something's wrong!!" << std::endl; + return; + } - std::map::iterator it2 = msgOutgoing.find(msg_id) ; + delete it2->second; + msgOutgoing.erase(it2); - if(it2 == msgOutgoing.end()) - { - std::cerr << "(EE) message has been ACKed, but is not in outgoing list. Something's wrong!!" << std::endl; - return ; - } + RsServer::notify()->notifyListChange( NOTIFY_LIST_MESSAGELIST, + NOTIFY_TYPE_ADD ); + IndicateConfigChanged(); - delete it2->second ; - msgOutgoing.erase(it2) ; - - RsServer::notify()->notifyListChange(NOTIFY_LIST_MESSAGELIST,NOTIFY_TYPE_ADD); - IndicateConfigChanged() ; - - return ; - } - std::cerr << "p3MsgService: unhandled data status info from global router for msg ID " << id << ": this is a bug." << std::endl; + return; + } + std::cerr << "p3MsgService: unhandled data status info from global router" + << " for msg ID " << id << ": this is a bug." << std::endl; } + bool p3MsgService::acceptDataFromPeer(const RsGxsId& to_gxs_id) { if(mDistantMessagePermissions & RS_DISTANT_MESSAGING_CONTACT_PERMISSION_FLAG_FILTER_NON_CONTACTS) @@ -1945,104 +1968,274 @@ uint32_t p3MsgService::getDistantMessagingPermissionFlags() { return mDistantMessagePermissions ; } - -void p3MsgService::receiveGRouterData(const RsGxsId &destination_key, const RsGxsId &signing_key, GRouterServiceId &/*client_id*/, uint8_t *data, uint32_t data_size) + +bool p3MsgService::receiveGxsMail( const RsGxsMailItem& originalMessage, + const uint8_t* data, uint32_t dataSize ) { - std::cerr << "p3MsgService::receiveGRouterData(): received message item of size " << data_size << ", for key " << destination_key << std::endl; + Sha1CheckSum hash = RsDirUtil::sha1sum(data, dataSize); - // first make sure that we havn't already received the data. Since we allow to re-send messages, it's necessary to check. - - Sha1CheckSum hash = RsDirUtil::sha1sum(data,data_size) ; - - if(mRecentlyReceivedDistantMessageHashes.find(hash) != mRecentlyReceivedDistantMessageHashes.end()) - { - std::cerr << "(WW) receiving distant message of hash " << hash << " more than once. This is not a bug, unless it happens very often." << std::endl; + { + RS_STACK_MUTEX(recentlyReceivedMutex); + if( mRecentlyReceivedMessageHashes.find(hash) != + mRecentlyReceivedMessageHashes.end() ) + { + std::cerr << "p3MsgService::receiveGxsMail(...) (WW) receiving " + << "message of hash " << hash << " more than once. This " + << "is not a bug, unless it happens very often." + << std::endl; + return true; + } + mRecentlyReceivedMessageHashes[hash] = time(NULL); + } + + IndicateConfigChanged(); + + RsItem *item = _serialiser->deserialise(const_cast(data), &dataSize); + RsMsgItem *msg_item = dynamic_cast(item); + + if(msg_item) + { + std::cerr << "p3MsgService::receiveGxsMail(...) Encrypted item " + << "correctly deserialised. Passing on to incoming list." + << std::endl; + + msg_item->msgFlags |= RS_MSG_FLAGS_DISTANT; + /* we expect complete msgs - remove partial flag just in case + * someone has funny ideas */ + msg_item->msgFlags &= ~RS_MSG_FLAGS_PARTIAL; + + // hack to pass on GXS id. + msg_item->PeerId(RsPeerId(originalMessage.meta.mAuthorId)); + handleIncomingItem(msg_item); + } + else + std::cerr << "p3MsgService::receiveGxsMail(...) Item could not be " + << "deserialised. Format error??" << std::endl; +} + +bool p3MsgService::notifySendMailStatus( const RsGxsMailItem& originalMessage, + GxsMailStatus status ) +{ + if( status >= GxsMailStatus::FAILED_RECEIPT_SIGNATURE ) + { + uint32_t msg_id; + + { + RS_STACK_MUTEX(gxsOngoingMutex); + + std::cerr << "p3MsgService::notifySendMailStatus(...) mail delivery" + << "mailId: " << originalMessage.mailId + << " failed with " << static_cast(status); + + auto it = gxsOngoingMessages.find(originalMessage.mailId); + if(it == gxsOngoingMessages.end()) + { + std::cerr << " cannot find pending message to notify" + << std::endl; + return false; + } + + msg_id = it->second; + } + + std::cerr << " message id = " << msg_id << std::endl; + + { + RS_STACK_MUTEX(mMsgMtx); + auto mit = msgOutgoing.find(msg_id); + if( mit == msgOutgoing.end() ) + { + std::cerr << " message has been notified as not delivered, " + << "but it not on outgoing list." + << std::endl; + return true; + } + std::cerr << " reseting the ROUTED flag so that the message is " + << "requested again" << std::endl; + + // clear the routed flag so that the message is requested again + mit->second->msgFlags &= ~RS_MSG_FLAGS_ROUTED; + return true; + } + } + + if( status == GxsMailStatus::RECEIPT_RECEIVED ) + { + uint32_t msg_id; + + { + RS_STACK_MUTEX(gxsOngoingMutex); + + auto it = gxsOngoingMessages.find(originalMessage.mailId); + if(it == gxsOngoingMessages.end()) + { + std::cerr << " (EE) cannot find pending message to acknowledge. " + << "Weird.mailId = " << originalMessage.mailId + << std::endl; + return false; + } + + msg_id = it->second; + } + + // we should now remove the item from the msgOutgoing list. + + { + RS_STACK_MUTEX(mMsgMtx); + + auto it2 = msgOutgoing.find(msg_id); + if(it2 == msgOutgoing.end()) + { + std::cerr << "(EE) message has been ACKed, but is not in " + << "outgoing list. Something's wrong!!" << std::endl; + return true; + } + + delete it2->second; + msgOutgoing.erase(it2); + } + + RsServer::notify()->notifyListChange( NOTIFY_LIST_MESSAGELIST, + NOTIFY_TYPE_ADD ); + IndicateConfigChanged(); + + return true; + } +} + +void p3MsgService::receiveGRouterData( const RsGxsId &destination_key, + const RsGxsId &signing_key, + GRouterServiceId &/*client_id*/, + uint8_t *data, uint32_t data_size ) +{ + std::cerr << "p3MsgService::receiveGRouterData(): received message item of" + << " size " << data_size << ", for key " << destination_key + << std::endl; + + /* first make sure that we havn't already received the data. Since we allow + * to re-send messages, it's necessary to check. */ + + Sha1CheckSum hash = RsDirUtil::sha1sum(data, data_size); + + { + RS_STACK_MUTEX(recentlyReceivedMutex); + if( mRecentlyReceivedMessageHashes.find(hash) != + mRecentlyReceivedMessageHashes.end() ) + { + std::cerr << "p3MsgService::receiveGRouterData(...) (WW) receiving" + << "distant message of hash " << hash << " more than once" + << ". This is not a bug, unless it happens very often." + << std::endl; + free(data); + return; + } + mRecentlyReceivedMessageHashes[hash] = time(NULL); + } + + IndicateConfigChanged() ; + + RsItem *item = _serialiser->deserialise(data,&data_size) ; free(data) ; - return ; - } - mRecentlyReceivedDistantMessageHashes[hash] = time(NULL) ; - IndicateConfigChanged() ; - - RsItem *item = _serialiser->deserialise(data,&data_size) ; - free(data) ; - RsMsgItem *msg_item = dynamic_cast(item) ; + RsMsgItem *msg_item = dynamic_cast(item) ; - if(msg_item != NULL) - { - std::cerr << " Encrypted item correctly deserialised. Passing on to incoming list." << std::endl; + if(msg_item != NULL) + { + std::cerr << " Encrypted item correctly deserialised. Passing on to incoming list." << std::endl; - msg_item->msgFlags |= RS_MSG_FLAGS_DISTANT ; - /* we expect complete msgs - remove partial flag just in case someone has funny ideas */ - msg_item->msgFlags &= ~RS_MSG_FLAGS_PARTIAL; + msg_item->msgFlags |= RS_MSG_FLAGS_DISTANT ; + /* we expect complete msgs - remove partial flag just in case someone has funny ideas */ + msg_item->msgFlags &= ~RS_MSG_FLAGS_PARTIAL; - msg_item->PeerId(RsPeerId(signing_key)) ; // hack to pass on GXS id. - handleIncomingItem(msg_item) ; - } - else - std::cerr << " Item could not be deserialised. Format error??" << std::endl; + msg_item->PeerId(RsPeerId(signing_key)) ; // hack to pass on GXS id. + handleIncomingItem(msg_item) ; + } + else + std::cerr << " Item could not be deserialised. Format error??" << std::endl; } void p3MsgService::sendDistantMsgItem(RsMsgItem *msgitem) { - RsGxsId destination_key_id(msgitem->PeerId()) ; - RsGxsId signing_key_id ; + RsGxsId destination_key_id(msgitem->PeerId()); + RsGxsId signing_key_id; - msgitem->msgFlags |= RS_MSG_FLAGS_DISTANT ;// just in case, but normally we should always have this flag set, when ending up here. - - { - RS_STACK_MUTEX(mMsgMtx) ; + /* just in case, but normally we should always have this flag set, when + * ending up here. */ + msgitem->msgFlags |= RS_MSG_FLAGS_DISTANT; - std::map::const_iterator it = mDistantOutgoingMsgSigners.find(msgitem->msgId) ; - - if(it == mDistantOutgoingMsgSigners.end()) - { - std::cerr << "(EE) no signer registered for distant message " << msgitem->msgId << ". Cannot send!" << std::endl; - return ; - } - - signing_key_id = it->second ; + { + RS_STACK_MUTEX(mMsgMtx); - if(signing_key_id.isNull()) - { - std::cerr << "ERROR: cannot find signing key id for msg id " << msgitem->msgId << std::endl; - std::cerr << " available keys are:" << std::endl; - for(std::map::const_iterator it(mDistantOutgoingMsgSigners.begin());it!=mDistantOutgoingMsgSigners.end();++it) - std::cerr << " " << it->first << " " << it->second << std::endl; - return ; - } - } + std::map::const_iterator it = + mDistantOutgoingMsgSigners.find(msgitem->msgId); + + if(it == mDistantOutgoingMsgSigners.end()) + { + std::cerr << "(EE) no signer registered for distant message " + << msgitem->msgId << ". Cannot send!" << std::endl; + return; + } + + signing_key_id = it->second; + + if(signing_key_id.isNull()) + { + std::cerr << "ERROR: cannot find signing key id for msg id " + << msgitem->msgId << " available keys are:" << std::endl; + typedef std::map::const_iterator itT; + for( itT it = mDistantOutgoingMsgSigners.begin(); + it != mDistantOutgoingMsgSigners.end(); ++it ) + std::cerr << "\t" << it->first << " " << it->second + << std::endl; + return; + } + } #ifdef DEBUG_DISTANT_MSG - std::cerr << "p3MsgService::sendDistanteMsgItem(): sending distant msg item" << std::endl; - std::cerr << " msg ID : " << msgitem->msgId << std::endl; - std::cerr << " to peer : " << destination_key_id << std::endl; - std::cerr << " signing : " << signing_key_id << std::endl; + std::cerr << "p3MsgService::sendDistanteMsgItem(): sending distant msg item" + << " msg ID: " << msgitem->msgId << " to peer:" + << destination_key_id << " signing: " << signing_key_id + << std::endl; #endif - // The item is serialized and turned into a generic turtle item. Use use the explicit serialiser to make sure that the msgId is not included + /* The item is serialized and turned into a generic turtle item. Use use the + * explicit serialiser to make sure that the msgId is not included */ - uint32_t msg_serialized_rssize = msgitem->serial_size(false) ; - RsTemporaryMemory msg_serialized_data(msg_serialized_rssize) ; + uint32_t msg_serialized_rssize = msgitem->serial_size(false); + RsTemporaryMemory msg_serialized_data(msg_serialized_rssize); - if(!msgitem->serialise(msg_serialized_data,msg_serialized_rssize,false)) - { - std::cerr << "(EE) p3MsgService::sendTurtleData(): Serialization error." << std::endl; - return ; - } + if(!msgitem->serialise(msg_serialized_data,msg_serialized_rssize,false)) + { + std::cerr << "(EE) p3MsgService::sendTurtleData(): Serialization error." + << std::endl; + return; + } #ifdef DEBUG_DISTANT_MSG - std::cerr << " serialised size : " << msg_serialized_rssize << std::endl; + std::cerr << " serialised size : " << msg_serialized_rssize << std::endl; #endif - GRouterMsgPropagationId grouter_message_id ; - mGRouter->sendData(destination_key_id,GROUTER_CLIENT_ID_MESSAGES,msg_serialized_data,msg_serialized_rssize,signing_key_id,grouter_message_id) ; + GRouterMsgPropagationId grouter_message_id; + mGRouter->sendData( destination_key_id, GROUTER_CLIENT_ID_MESSAGES, + msg_serialized_data, msg_serialized_rssize, + signing_key_id, grouter_message_id ); + RsGxsMailId gxsMailId; + gxsMailService.sendMail( gxsMailId, P3_MSG_SERVICE, signing_key_id, + destination_key_id, msg_serialized_data, + msg_serialized_rssize ); - // now store the grouter id along with the message id, so that we can keep track of received messages + /* now store the grouter id along with the message id, so that we can keep + * track of received messages */ - { - RS_STACK_MUTEX(mMsgMtx) ; - _ongoing_messages[grouter_message_id] = msgitem->msgId ; - } - IndicateConfigChanged(); // save _ongoing_messages + { + RS_STACK_MUTEX(mMsgMtx); + _ongoing_messages[grouter_message_id] = msgitem->msgId; + } + + { + RS_STACK_MUTEX(gxsOngoingMutex); + gxsOngoingMessages[gxsMailId] = msgitem->msgId; + } + + IndicateConfigChanged(); // save _ongoing_messages } diff --git a/libretroshare/src/services/p3msgservice.h b/libretroshare/src/services/p3msgservice.h index a36486514..a40dfd704 100644 --- a/libretroshare/src/services/p3msgservice.h +++ b/libretroshare/src/services/p3msgservice.h @@ -49,16 +49,19 @@ #include "grouter/grouterclientservice.h" #include "turtle/p3turtle.h" #include "turtle/turtleclientservice.h" +#include "services/p3gxsmails.h" class p3LinkMgr; class p3IdService; // Temp tweak to test grouter -class p3MsgService: public p3Service, public p3Config, public pqiServiceMonitor, public GRouterClientService +struct p3MsgService : + p3Service, p3Config, pqiServiceMonitor, GRouterClientService, + GxsMailsClient { -public: - p3MsgService(p3ServiceControl *sc, p3IdService *id_service); - virtual RsServiceInfo getServiceInfo(); + p3MsgService(p3ServiceControl *sc, p3IdService *id_service, p3GxsMails& gxsMS); + + virtual RsServiceInfo getServiceInfo(); /* External Interface */ bool getMessageSummaries(std::list &msgList); @@ -106,7 +109,9 @@ public: /*** Overloaded from pqiMonitor ***/ virtual void statusChange(const std::list &plist); - int checkOutgoingMessages(); + + /// iterate through the outgoing queue if online, send + int checkOutgoingMessages(); /*** Overloaded from pqiMonitor ***/ /*** overloaded from p3turtle ***/ @@ -130,16 +135,25 @@ public: void setDistantMessagingPermissionFlags(uint32_t flags) ; uint32_t getDistantMessagingPermissionFlags() ; -private: - void sendDistantMsgItem(RsMsgItem *msgitem) ; + /// @see GxsMailsClient::receiveGxsMail(...) + virtual bool receiveGxsMail( const RsGxsMailItem& originalMessage, + const uint8_t* data, uint32_t dataSize ); - // This contains the ongoing tunnel handling contacts. - // The map is indexed by the hash - // - std::map _ongoing_messages ; + /// @see GxsMailsClient::notifySendMailStatus(...) + virtual bool notifySendMailStatus( const RsGxsMailItem& originalMessage, + GxsMailStatus status ); + +private: + void sendDistantMsgItem(RsMsgItem *msgitem); + + /** This contains the ongoing tunnel handling contacts. + * The map is indexed by the hash */ + std::map _ongoing_messages; + + std::map gxsOngoingMessages; + RsMutex gxsOngoingMutex; // Overloaded from GRouterClientService - virtual bool acceptDataFromPeer(const RsGxsId& gxs_id) ; virtual void receiveGRouterData(const RsGxsId& destination_key,const RsGxsId& signing_key, GRouterServiceId &client_id, uint8_t *data, uint32_t data_size) ; virtual void notifyDataStatus(const GRouterMsgPropagationId& msg_id,const RsGxsId& signer_id,uint32_t data_status) ; @@ -194,8 +208,9 @@ private: std::map mTags; std::map mMsgTags; - uint32_t mMsgUniqueId; - std::map mRecentlyReceivedDistantMessageHashes; + uint32_t mMsgUniqueId; + std::map mRecentlyReceivedMessageHashes; + RsMutex recentlyReceivedMutex; // used delete msgSrcIds after config save std::map mSrcIds; @@ -211,6 +226,8 @@ private: bool mDistantMessagingEnabled ; uint32_t mDistantMessagePermissions ; bool mShouldEnableDistantMessaging ; + + p3GxsMails& gxsMailService; }; #endif // MESSAGE_SERVICE_HEADER