#pragma once /* * GXS Mailing Service * Copyright (C) 2016-2017 Gioacchino Mazzurco * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License as * published by the Free Software Foundation, either version 3 of the * License, or (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ #include #include #include #include "retroshare/rsgxsifacetypes.h" // For RsGxsId, RsGxsCircleId #include "gxs/gxstokenqueue.h" // For GxsTokenQueue #include "serialiser/rsgxsmailitems.h" // For RS_SERVICE_TYPE_GXS_MAIL #include "services/p3idservice.h" // For p3IdService #include "util/rsthreads.h" enum class GxsMailStatus { PENDING_PROCESSING = 0, PENDING_PREFERRED_GROUP, PENDING_RECEIPT_CREATE, PENDING_RECEIPT_SIGNATURE, PENDING_SERIALIZATION, PENDING_PAYLOAD_CREATE, PENDING_PAYLOAD_ENCRYPT, PENDING_PUBLISH, //PENDING_TRANSFER, /// This will be useful so the user can know if the mail reached some friend node, in case of internet connection interruption PENDING_RECEIPT_RECEIVE, /// Records with status >= RECEIPT_RECEIVED get deleted RECEIPT_RECEIVED, FAILED_RECEIPT_SIGNATURE = 240, FAILED_ENCRYPTION }; struct p3GxsMails; struct GxsMailsClient { /// Subservices identifiers (like port for TCP) enum GxsMailSubServices : uint16_t { TEST_SERVICE = 1, P3_MSG_SERVICE = 2 }; /** * This will be called by p3GxsMails to dispatch mails to the subservice * @param originalMessage message as received from GXS backend (encrypted) * @param data buffer containing the decrypted data * @param dataSize size of the buffer * @return true if dispatching goes fine, false otherwise */ virtual bool receiveGxsMail( const RsGxsMailItem& originalMessage, const uint8_t* data, uint32_t dataSize ) = 0; virtual bool notifySendMailStatus( const RsGxsMailItem& originalMessage, GxsMailStatus status ) = 0; }; struct p3GxsMails : RsGenExchange, GxsTokenQueue // TODO: p3Config { p3GxsMails( RsGeneralDataService* gds, RsNetworkExchangeService* nes, p3IdService& identities ) : RsGenExchange( gds, nes, new RsGxsMailSerializer(), RS_SERVICE_TYPE_GXS_MAIL, &identities, AuthenPolicy(), GXS_STORAGE_PERIOD ), GxsTokenQueue(this), idService(identities), servClientsMutex("p3GxsMails client services map mutex"), outgoingMutex("p3GxsMails outgoing queue map mutex"), ingoingMutex("p3GxsMails ingoing queue map mutex") {} /** * Send an email to recipient, in the process author of the email is * disclosed to the network (because the sent GXS item is signed), while * recipient is not @see RsGxsMailBaseItem::recipientsHint for details on * recipient protection. * This method is part of the public interface of this service. * @return true if the mail will be sent, false if not */ bool sendMail( RsGxsMailId& mailId, GxsMailsClient::GxsMailSubServices service, const RsGxsId& own_gxsid, const RsGxsId& recipient, const uint8_t* data, uint32_t size, RsGxsMailEncryptionMode cm = RsGxsMailEncryptionMode::RSA ); /** * This method is part of the public interface of this service. * @return false if mail is not found in outgoing queue, true otherwise */ bool querySendMailStatus( RsGxsMailId mailId, GxsMailStatus& st ); /** * Register a client service to p3GxsMails to receive mails via * GxsMailsClient::receiveGxsMail(...) callback * This method is part of the public interface of this service. */ void registerGxsMailsClient( GxsMailsClient::GxsMailSubServices serviceType, GxsMailsClient* service ); /// @see RsGenExchange::getServiceInfo() virtual RsServiceInfo getServiceInfo() { return RsServiceInfo( RS_SERVICE_TYPE_GXS_MAIL, "GXS Mails", 0, 1, 0, 1 ); } private: /// @see GxsTokenQueue::handleResponse(uint32_t token, uint32_t req_type) virtual void handleResponse(uint32_t token, uint32_t req_type); /// @see RsGenExchange::service_tick() virtual void service_tick(); /// @see RsGenExchange::service_CreateGroup(...) RsGenExchange::ServiceCreate_Return service_CreateGroup(RsGxsGrpItem* grpItem, RsTlvSecurityKeySet&); /// @see RsGenExchange::notifyChanges(std::vector &changes) void notifyChanges(std::vector &changes); /** Time interval of inactivity before a distribution group is unsubscribed. * Approximatively 3 months seems ok ATM. */ const static int32_t UNUSED_GROUP_UNSUBSCRIBE_INTERVAL = 0x76A700; /** * This should be as little as possible as the size of the database can grow * very fast taking in account we are handling mails for the whole network. * We do prefer to resend a not acknowledged yet mail after * GXS_STORAGE_PERIOD has passed and keep it little. * Tought it can't be too little as this may cause signed receipts to * get lost thus causing resend and fastly grow perceived async latency, in * case two sporadically connected users sends mails each other. * TODO: While it is ok for signed acknowledged to stays in the DB for a * full GXS_STORAGE_PERIOD, mails should be removed as soon as a valid * signed acknowledged is received for each of them. * Two weeks seems fair ATM. */ const static uint32_t GXS_STORAGE_PERIOD = 0x127500; /// Define how the backend should handle authentication based on signatures static uint32_t AuthenPolicy(); /// Types to mark queries in tokens queue enum GxsReqResTypes { GROUPS_LIST = 1, GROUP_CREATE = 2, MAILS_UPDATE = 3 }; /// Store the id of the preferred GXS group to send emails RsGxsGroupId preferredGroupId; /// Used for items {de,en}cryption p3IdService& idService; /// Stores pointers to client services to notify them about new mails std::map servClients; RsMutex servClientsMutex; struct OutgoingRecord { OutgoingRecord( RsGxsId rec, GxsMailsClient::GxsMailSubServices cs, const uint8_t* data, uint32_t size ) : status(GxsMailStatus::PENDING_PROCESSING), recipient(rec), clientService(cs) { mailData.resize(size); memcpy(&mailData[0], data, size); } GxsMailStatus status; RsGxsId recipient; RsGxsMailItem mailItem; /// Don't use a pointer would be invalid after publish std::vector mailData; GxsMailsClient::GxsMailSubServices clientService; RsNxsMailPresignedReceipt presignedReceipt; }; /// Keep track of mails while being processed typedef std::map prMap; prMap outgoingQueue; RsMutex outgoingMutex; void processOutgoingRecord(OutgoingRecord& r); typedef std::unordered_multimap inMap; inMap ingoingQueue; RsMutex ingoingMutex; /// Request groups list to GXS backend. Async method. bool requestGroupsData(const std::list* groupIds = NULL); /** * Check if current preferredGroupId is the best against potentialGrId, if * the passed one is better update it. * Useful when GXS backend notifies groups changes, or when a reponse to an * async grop request (@see GXS_REQUEST_TYPE_GROUP_*) is received. * @return true if preferredGroupId has been supeseded by potentialGrId * false otherwise. */ bool inline supersedePreferredGroup(const RsGxsGroupId& potentialGrId) { if(preferredGroupId < potentialGrId) { std::cerr << "supersedePreferredGroup(...) " << potentialGrId << " supersed " << preferredGroupId << std::endl; preferredGroupId = potentialGrId; return true; } return false; } /** @return true if has passed more then interval seconds between timeStamp * and ref. @param ref by default now is taked as reference. */ bool static inline olderThen(time_t timeStamp, int32_t interval, time_t ref = time(NULL)) { return (timeStamp + interval) < ref; } /// Decrypt email content and pass it to dispatchDecryptedMail(...) bool handleEcryptedMail(const RsGxsMailItem* mail); /// Dispatch the message to the recipient service bool dispatchDecryptedMail( const RsGxsMailItem* received_msg, const uint8_t* decrypted_data, uint32_t decrypted_data_size ); void notifyClientService(const OutgoingRecord& pr); }; struct TestGxsMailClientService : GxsMailsClient, RsSingleJobThread { TestGxsMailClientService( p3GxsMails& gxsMailService, p3IdService& gxsIdService ) : mailService(gxsMailService), idService(gxsIdService) { mailService.registerGxsMailsClient( GxsMailSubServices::TEST_SERVICE, this ); } /// @see GxsMailsClient::receiveGxsMail(...) virtual bool receiveGxsMail( const RsGxsMailItem& originalMessage, const uint8_t* data, uint32_t dataSize ) { std::cout << "TestGxsMailClientService::receiveGxsMail(...) got message" << " from: " << originalMessage.meta.mAuthorId << std::endl << "\t>" << std::string((char*)data, dataSize) << "<" << std::endl; return true; } /// @see GxsMailsClient::notifyMailStatus(...) virtual bool notifySendMailStatus( const RsGxsMailItem& originalMessage, GxsMailStatus status ) { std::cout << "TestGxsMailClientService::notifyMailsStatus(...) for: " << originalMessage.mailId << " status: " << static_cast(status) << std::endl; if( status == GxsMailStatus::RECEIPT_RECEIVED ) std::cout << "\t It mean Receipt has been Received!" << std::endl; return true; } /// @see RsSingleJobThread::run() virtual void run() { usleep(10*1000*1000); RsGxsId gxsidA("d0df7474bdde0464679e6ef787890287"); RsGxsId gxsidB("d060bea09dfa14883b5e6e517eb580cd"); RsGxsMailId mailId = 0; if(idService.isOwnId(gxsidA)) { std::string ciao("CiAone!"); mailService.sendMail( mailId, GxsMailsClient::TEST_SERVICE, gxsidA, gxsidB, reinterpret_cast(ciao.data()), ciao.size() ); } else if(idService.isOwnId(gxsidB)) { std::string ciao("CiBuono!"); mailService.sendMail( mailId, GxsMailsClient::TEST_SERVICE, gxsidB, gxsidA, reinterpret_cast(ciao.data()), ciao.size() ); } } private: p3GxsMails& mailService; p3IdService& idService; };