diff --git a/libretroshare/src/rsserver/rsinit.cc b/libretroshare/src/rsserver/rsinit.cc index ea1b1b33f..48c3569bd 100644 --- a/libretroshare/src/rsserver/rsinit.cc +++ b/libretroshare/src/rsserver/rsinit.cc @@ -1500,7 +1500,8 @@ int RsServer::StartupRetroShare() pqih->addService(gxsmails_ns, true); mConfigMgr->addConfiguration("gxs_mail.cfg", gxsmails_ns); - new TestGxsMailClientService(*mGxsMails); + TestGxsMailClientService* tgms = + new TestGxsMailClientService(*mGxsMails, *mGxsIdService); # endif // RS_GXS_MAIL // remove pword from memory @@ -1827,6 +1828,7 @@ int RsServer::StartupRetroShare() # ifdef RS_GXS_MAIL startServiceThread(mGxsMails, "gxs mail"); startServiceThread(gxsmails_ns, "gxs mail ns"); + tgms->start("Gxs Mail Test Service"); # endif // RS_GXS_MAIL #endif // RS_ENABLE_GXS diff --git a/libretroshare/src/serialiser/rsgxsmailitems.cc b/libretroshare/src/serialiser/rsgxsmailitems.cc index 38cd90140..f1a659d43 100644 --- a/libretroshare/src/serialiser/rsgxsmailitems.cc +++ b/libretroshare/src/serialiser/rsgxsmailitems.cc @@ -18,7 +18,7 @@ #include "serialiser/rsgxsmailitems.h" -const RsGxsId RsGxsMailBaseItem::allRecipientsHint("FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF"); +const RsGxsId RsGxsMailItem::allRecipientsHint("FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF"); bool RsGxsMailBaseItem::serialize(uint8_t* data, uint32_t size, @@ -26,9 +26,7 @@ bool RsGxsMailBaseItem::serialize(uint8_t* data, uint32_t size, { bool ok = setRsItemHeader(data, size, PacketId(), size); ok = ok && (offset += 8); // Take header in account - ok = ok && setRawUInt8(data, size, &offset, cryptoType); - ok = ok && recipientsHint.serialise(data, size, offset); - ok = ok && setRawUInt64(data, size, &offset, receiptId); + ok = ok && setRawUInt64(data, size, &offset, mailId); return ok; } @@ -39,18 +37,14 @@ bool RsGxsMailBaseItem::deserialize(const uint8_t* data, uint32_t& size, uint32_t rssize = getRsItemSize(dataPtr); uint32_t roffset = offset + 8; // Take header in account bool ok = rssize <= size; - uint8_t crType; - ok = ok && getRawUInt8(dataPtr, rssize, &roffset, &crType); - cryptoType = static_cast(crType); - ok = ok && recipientsHint.deserialise(dataPtr, rssize, roffset); - ok = ok && getRawUInt64(dataPtr, rssize, &roffset, &receiptId); + ok = ok && getRawUInt64(dataPtr, rssize, &roffset, &mailId); if(ok) { size = rssize; offset = roffset; } else size = 0; return ok; } -std::ostream&RsGxsMailBaseItem::print(std::ostream& out, uint16_t) -{ return out; } +std::ostream& RsGxsMailBaseItem::print(std::ostream &out, uint16_t) +{ return out << " RsGxsMailBaseItem::mailId: " << mailId; } bool RsGxsMailSerializer::serialise(RsItem* item, void* data, uint32_t* size) { diff --git a/libretroshare/src/serialiser/rsgxsmailitems.h b/libretroshare/src/serialiser/rsgxsmailitems.h index f215d10a2..4ec0094df 100644 --- a/libretroshare/src/serialiser/rsgxsmailitems.h +++ b/libretroshare/src/serialiser/rsgxsmailitems.h @@ -34,65 +34,55 @@ enum GxsMailItemsSubtypes GXS_MAIL_SUBTYPE_GROUP = 3 }; +typedef uint64_t RsGxsMailId; + struct RsNxsMailPresignedReceipt : RsNxsMsg { RsNxsMailPresignedReceipt() : RsNxsMsg(RS_SERVICE_TYPE_GXS_MAIL) {} }; -struct RsGxsMailPresignedReceipt : RsGxsMsgItem -{ - RsGxsMailPresignedReceipt() : - RsGxsMsgItem( RS_SERVICE_TYPE_GXS_MAIL, - static_cast(GXS_MAIL_SUBTYPE_RECEIPT) ), - receiptId(0) {} - - uint64_t receiptId; - - static uint32_t inline size() - { - return 8 + // Header - 8; // receiptId - } - bool serialize(uint8_t* data, uint32_t size, uint32_t& offset) const - { - bool ok = setRsItemHeader(data, size, PacketId(), size); - ok = ok && (offset += 8); // Take header in account - ok = ok && setRawUInt64(data, size, &offset, receiptId); - return ok; - } - bool deserialize(const uint8_t* data, uint32_t& size, uint32_t& offset) - { - void* dataPtr = reinterpret_cast(const_cast(data)); - uint32_t rssize = getRsItemSize(dataPtr); - uint32_t roffset = offset + 8; // Take header in account - bool ok = rssize <= size; - ok = ok && getRawUInt64(dataPtr, rssize, &roffset, &receiptId); - if(ok) { size = rssize; offset = roffset; } - else size = 0; - return ok; - } - - void clear() { receiptId = 0; } - std::ostream &print(std::ostream &out, uint16_t /*indent = 0*/) - { return out << receiptId; } -}; - - struct RsGxsMailBaseItem : RsGxsMsgItem { RsGxsMailBaseItem(GxsMailItemsSubtypes subtype) : RsGxsMsgItem( RS_SERVICE_TYPE_GXS_MAIL, - static_cast(subtype) ), - cryptoType(UNDEFINED_ENCRYPTION), receiptId(0) {} + static_cast(subtype) ), mailId(0) {} - /// Values must fit into uint8_t - enum EncryptionMode + RsGxsMailId mailId; + + void inline clear() { - CLEAR_TEXT = 1, - RSA = 2, - UNDEFINED_ENCRYPTION = 250 - }; - EncryptionMode cryptoType; + mailId = 0; + meta = RsMsgMetaData(); + } + + static uint32_t inline size() + { + return 8 + // Header + 8; // mailId + } + bool serialize(uint8_t* data, uint32_t size, uint32_t& offset) const; + bool deserialize(const uint8_t* data, uint32_t& size, uint32_t& offset); + std::ostream &print(std::ostream &out, uint16_t /*indent = 0*/); +}; + +struct RsGxsMailPresignedReceipt : RsGxsMailBaseItem +{ + RsGxsMailPresignedReceipt() : RsGxsMailBaseItem(GXS_MAIL_SUBTYPE_RECEIPT) {} +}; + +enum class RsGxsMailEncryptionMode : uint8_t +{ + CLEAR_TEXT = 1, + RSA = 2, + UNDEFINED_ENCRYPTION = 250 +}; + +struct RsGxsMailItem : RsGxsMailBaseItem +{ + RsGxsMailItem() : RsGxsMailBaseItem(GXS_MAIL_SUBTYPE_MAIL), + cryptoType(RsGxsMailEncryptionMode::UNDEFINED_ENCRYPTION) {} + + RsGxsMailEncryptionMode cryptoType; /** * @brief recipientsHint used instead of plain recipient id, so sender can @@ -130,65 +120,37 @@ struct RsGxsMailBaseItem : RsGxsMsgItem * corresponding hint may be fruit of a "luky" salting of another id. */ RsGxsId recipientsHint; - - void static inline saltRecipientHint(RsGxsId& hint, const RsGxsId& salt) - { hint = hint | salt; } void inline saltRecipientHint(const RsGxsId& salt) - { saltRecipientHint(recipientsHint, salt); } + { recipientsHint = recipientsHint | salt; } /** * @brief maybeRecipient given an id and an hint check if they match * @see recipientHint - * @note this is not the final implementation as id and hint are not 32bit - * integers it is just to not forget how to verify the hint/id matching - * fastly with boolean ops * @return true if the id may be recipient of the hint, false otherwise */ - bool static inline maybeRecipient(const RsGxsId& hint, const RsGxsId& id) - { return (~id|hint) == allRecipientsHint; } bool inline maybeRecipient(const RsGxsId& id) const - { return maybeRecipient(recipientsHint, id); } + { return (~id|recipientsHint) == allRecipientsHint; } const static RsGxsId allRecipientsHint; - uint64_t receiptId; - - void inline clear() - { - cryptoType = UNDEFINED_ENCRYPTION; - recipientsHint.clear(); - receiptId = 0; - meta = RsMsgMetaData(); - } - - static uint32_t inline size() - { - return 8 + // Header - 1 + // cryptoType - RsGxsId::serial_size() + // recipientsHint - 8; // receiptId - } - bool serialize(uint8_t* data, uint32_t size, uint32_t& offset) const; - bool deserialize(const uint8_t* data, uint32_t& size, uint32_t& offset); - std::ostream &print(std::ostream &out, uint16_t /*indent = 0*/); -}; - -struct RsGxsMailItem : RsGxsMailBaseItem -{ - RsGxsMailItem(GxsMailItemsSubtypes subtype) : - RsGxsMailBaseItem(subtype) {} - RsGxsMailItem() : - RsGxsMailBaseItem(GXS_MAIL_SUBTYPE_MAIL) {} - /** This should travel encrypted, unless EncryptionMode::CLEAR_TEXT * is specified */ std::vector payload; - uint32_t size() const { return RsGxsMailBaseItem::size() + payload.size(); } + uint32_t size() const + { + return RsGxsMailBaseItem::size() + + 1 + // cryptoType + recipientsHint.serial_size() + + payload.size(); + } bool serialize(uint8_t* data, uint32_t size, uint32_t& offset) const { bool ok = size < MAX_SIZE; ok = ok && RsGxsMailBaseItem::serialize(data, size, offset); + ok = ok && setRawUInt8( data, size, &offset, + static_cast(cryptoType) ); + ok = ok && recipientsHint.serialise(data, size, offset); uint32_t psz = payload.size(); ok = ok && memcpy(data+offset, &payload[0], psz); offset += psz; @@ -196,13 +158,28 @@ struct RsGxsMailItem : RsGxsMailBaseItem } bool deserialize(const uint8_t* data, uint32_t& size, uint32_t& offset) { - uint32_t bsz = RsGxsMailBaseItem::size(); - uint32_t psz = size - bsz; - return size < MAX_SIZE && size >= bsz - && RsGxsMailBaseItem::deserialize(data, size, offset) - && (payload.resize(psz), memcpy(&payload[0], data+offset, psz)); + void* dataPtr = reinterpret_cast(const_cast(data)); + uint32_t rssize = getRsItemSize(dataPtr); + uint32_t roffset = offset; + bool ok = rssize <= size && size < MAX_SIZE; + ok = ok && RsGxsMailBaseItem::deserialize(data, rssize, roffset); + uint8_t crType; + ok = ok && getRawUInt8(dataPtr, rssize, &roffset, &crType); + cryptoType = static_cast(crType); + ok = ok && recipientsHint.deserialise(dataPtr, rssize, roffset); + uint32_t psz = rssize - roffset; + ok = ok && (payload.resize(psz), memcpy(&payload[0], data+roffset, psz)); + if(ok) { size = rssize; offset = roffset; } + else size = 0; + return ok; + } + void clear() + { + RsGxsMailBaseItem::clear(); + cryptoType = RsGxsMailEncryptionMode::UNDEFINED_ENCRYPTION; + recipientsHint.clear(); + payload.clear(); } - void clear() { RsGxsMailBaseItem::clear(); payload.clear(); } /// Maximum mail size in bytes 10 MiB is more than anything sane can need const static uint32_t MAX_SIZE = 10*8*1024*1024; diff --git a/libretroshare/src/services/p3gxsmails.cpp b/libretroshare/src/services/p3gxsmails.cpp index 6db2d2c6c..ae7f3755d 100644 --- a/libretroshare/src/services/p3gxsmails.cpp +++ b/libretroshare/src/services/p3gxsmails.cpp @@ -20,21 +20,14 @@ #include "util/stacktrace.h" -bool p3GxsMails::sendMail( GxsMailsClient::GxsMailSubServices service, +bool p3GxsMails::sendMail( RsGxsMailId& mailId, + GxsMailsClient::GxsMailSubServices service, const RsGxsId& own_gxsid, const RsGxsId& recipient, const uint8_t* data, uint32_t size, - RsGxsMailBaseItem::EncryptionMode cm) + RsGxsMailEncryptionMode cm ) { std::cout << "p3GxsMails::sendEmail(...)" << std::endl; - if(preferredGroupId.isNull()) - { - requestGroupsData(); - std::cerr << "p3GxsMails::sendEmail(...) preferredGroupId.isNull()!" - << std::endl; - return false; - } - if(!idService.isOwnId(own_gxsid)) { std::cerr << "p3GxsMails::sendEmail(...) isOwnId(own_gxsid) false!" @@ -50,78 +43,31 @@ bool p3GxsMails::sendMail( GxsMailsClient::GxsMailSubServices service, return false; } - RsGxsMailItem* item = new RsGxsMailItem(); + OutgoingRecord pr( recipient, service, data, size ); + pr.mailItem.meta.mAuthorId = own_gxsid; + pr.mailItem.cryptoType = cm; + pr.mailItem.mailId = RSRandom::random_u64(); - // Public metadata - item->meta.mAuthorId = own_gxsid; - item->meta.mGroupId = preferredGroupId; - item->cryptoType = cm; - item->saltRecipientHint(recipient); - item->saltRecipientHint(RsGxsId::random()); - item->receiptId = RSRandom::random_u64(); - - RsNxsMailPresignedReceipt nrcpt; - preparePresignedReceipt(*item, nrcpt); - - uint16_t serv = static_cast(service); - uint32_t rcptsize = nrcpt.serial_size(); - item->payload.resize(2 + rcptsize + size); - uint32_t offset = 0; - setRawUInt16(&item->payload[0], 2, &offset, serv); - nrcpt.serialise(&item->payload[offset], rcptsize); offset += rcptsize; - memcpy(&item->payload[offset], data, size); //offset += size; - - std::cout << "p3GxsMails::sendMail(...) receipt size: " << rcptsize << std::endl; - - switch (cm) { - case RsGxsMailBaseItem::CLEAR_TEXT: - { - std::cerr << "p3GxsMails::sendMail(...) you are sending a mail without" - << " encryption, everyone can read it!" << std::endl; - print_stacktrace(); - break; - } - case RsGxsMailBaseItem::RSA: - { - uint8_t* encryptedData = NULL; - uint32_t encryptedSize = 0; - uint32_t encryptError = 0; - if( idService.encryptData( &item->payload[0], item->payload.size(), - encryptedData, encryptedSize, - recipient, encryptError, true ) ) - { - item->payload.resize(encryptedSize); - memcpy(&item->payload[0], encryptedData, encryptedSize); - free(encryptedData); - break; - } - else - { - std::cerr << "p3GxsMails::sendMail(...) RSA encryption failed! " - << "error_status: " << encryptError << std::endl; - print_stacktrace(); - return false; - } - } - case RsGxsMailBaseItem::UNDEFINED_ENCRYPTION: - default: - std::cerr << "p3GxsMails::sendMail(...) attempt to send mail with wrong" - << " EncryptionMode " << cm << " dropping mail!" << std::endl; - print_stacktrace(); - return false; + RS_STACK_MUTEX(outgoingMutex); + outgoingQueue.insert(prMap::value_type(pr.mailItem.mailId, pr)); } - uint32_t token; - std::cout << "p3GxsMails::sendEmail(...) sending mail to: "<< recipient - << " with cryptoType: " << item->cryptoType - << " recipientHint: " << item->recipientsHint - << " receiptId: " << item->receiptId - << " payload size: " << item->payload.size() << std::endl; - publishMsg(token, item); + mailId = pr.mailItem.mailId; return true; } +bool p3GxsMails::querySendMailStatus(RsGxsMailId mailId, GxsMailStatus& st) +{ + auto it = outgoingQueue.find(mailId); + if( it != outgoingQueue.end() ) + { + st = it->second.status; + return true; + } + return false; +} + void p3GxsMails::registerGxsMailsClient( GxsMailsClient::GxsMailSubServices serviceType, GxsMailsClient* service) { @@ -132,7 +78,8 @@ void p3GxsMails::registerGxsMailsClient( void p3GxsMails::handleResponse(uint32_t token, uint32_t req_type) { - //std::cout << "p3GxsMails::handleResponse(" << token << ", " << req_type << ")" << std::endl; + std::cout << "p3GxsMails::handleResponse(" << token << ", " << req_type + << ")" << std::endl; switch (req_type) { case GROUPS_LIST: @@ -202,66 +149,34 @@ void p3GxsMails::handleResponse(uint32_t token, uint32_t req_type) vT& mv(gIt->second); for( vT::const_iterator mIt = mv.begin(); mIt != mv.end(); ++mIt ) { - RsGxsMsgItem* gItem = *mIt; - switch(gItem->PacketSubType()) + RsGxsMsgItem* gIt = *mIt; + switch(gIt->PacketSubType()) { case GXS_MAIL_SUBTYPE_MAIL: + case GXS_MAIL_SUBTYPE_RECEIPT: { - RsGxsMailItem* msg = dynamic_cast(gItem); - if(!msg) + RsGxsMailBaseItem* mb = + dynamic_cast(*mIt); + if(mb) { + RS_STACK_MUTEX(ingoingMutex); + ingoingQueue.insert(inMap::value_type(mb->mailId, mb)); + } + else std::cerr << "p3GxsMails::handleResponse(...) " << "GXS_MAIL_SUBTYPE_MAIL cast error, " << "something really wrong is happening" << std::endl; - break; - } - - std::cout << "p3GxsMails::handleResponse(...) MAILS_UPDATE " - << "GXS_MAIL_SUBTYPE_MAIL handling: " - << msg->meta.mMsgId - << " with cryptoType: "<< msg->cryptoType - << " recipientHint: " << msg->recipientsHint - << " receiptId: "<< msg->receiptId - << " payload.size(): " << msg->payload.size() - << std::endl; - - handleEcryptedMail(msg); - break; - } - case GXS_MAIL_SUBTYPE_RECEIPT: - { - RsGxsMailPresignedReceipt* msg = - dynamic_cast(gItem); - if(!msg) - { - std::cerr << "p3GxsMails::handleResponse(...) " - << "GXS_MAIL_SUBTYPE_RECEIPT cast error, " - << "something really wrong is happening" - << std::endl; - break; - } - - std::cout << "p3GxsMails::handleResponse(...) MAILS_UPDATE " - << "GXS_MAIL_SUBTYPE_RECEIPT handling: " - << msg->meta.mMsgId - << "with receiptId: "<< msg->receiptId - << std::endl; - - /* TODO: Notify client services if the original mail was - * sent from this node and mark for deletion, otherwise - * just mark original mail for deletion. */ - break; } default: std::cerr << "p3GxsMails::handleResponse(...) MAILS_UPDATE " << "Unknown mail subtype : " - << static_cast(gItem->PacketSubType()) + << static_cast(gIt->PacketSubType()) << std::endl; + delete gIt; break; } - delete gItem; } } break; @@ -275,32 +190,52 @@ void p3GxsMails::handleResponse(uint32_t token, uint32_t req_type) void p3GxsMails::service_tick() { - static int tc = 0; - ++tc; + GxsTokenQueue::checkRequests(); - if(((tc % 1000) == 0) || (tc == 50)) requestGroupsData(); - - if(tc == 500) { - RsGxsId gxsidA("d0df7474bdde0464679e6ef787890287"); - RsGxsId gxsidB("d060bea09dfa14883b5e6e517eb580cd"); - if(idService.isOwnId(gxsidA)) + RS_STACK_MUTEX(outgoingMutex); + for ( auto it = outgoingQueue.begin(); it != outgoingQueue.end(); ) { - std::string ciao("CiAone!"); - sendMail( GxsMailsClient::TEST_SERVICE, gxsidA, gxsidB, - reinterpret_cast(ciao.data()), - ciao.size(), RsGxsMailBaseItem::RSA ); + OutgoingRecord& pr(it->second); + GxsMailStatus oldStatus = pr.status; + processOutgoingRecord(pr); + if (oldStatus != pr.status) notifyClientService(pr); + if( pr.status >= GxsMailStatus::RECEIPT_RECEIVED ) + it = outgoingQueue.erase(it); + else ++it; } -// else if(idService.isOwnId(gxsidB)) -// { -// std::string ciao("CiBuono!"); -// sendMail( GxsMailsClient::TEST_SERVICE, gxsidB, gxsidA, -// reinterpret_cast(ciao.data()), -// ciao.size(), RsGxsMailBaseItem::RSA ); -// } } - GxsTokenQueue::checkRequests(); + + { + RS_STACK_MUTEX(ingoingMutex); + for( auto it = ingoingQueue.begin(); it != ingoingQueue.end(); ) + { + if( it->second->PacketSubType() != GXS_MAIL_SUBTYPE_MAIL ) + { ++it; continue; } + + RsGxsMailItem* msg = dynamic_cast(it->second); + if(!msg) + { + std::cout << "p3GxsMails::service_tick() GXS_MAIL_SUBTYPE_MAIL" + << "dynamic_cast failed, something really wrong is " + << "happening!" << std::endl; + ++it; continue; + } + + std::cout << "p3GxsMails::service_tick() GXS_MAIL_SUBTYPE_MAIL " + << "handling: " << msg->meta.mMsgId + << " with cryptoType: " + << static_cast(msg->cryptoType) + << " recipientHint: " << msg->recipientsHint + << " mailId: "<< msg->mailId + << " payload.size(): " << msg->payload.size() + << std::endl; + + handleEcryptedMail(msg); + it = ingoingQueue.erase(it); delete msg; + } + } } RsGenExchange::ServiceCreate_Return p3GxsMails::service_CreateGroup(RsGxsGrpItem* grpItem, RsTlvSecurityKeySet& /*keySet*/) @@ -409,7 +344,7 @@ bool p3GxsMails::handleEcryptedMail(const RsGxsMailItem* mail) switch (mail->cryptoType) { - case RsGxsMailBaseItem::CLEAR_TEXT: + case RsGxsMailEncryptionMode::CLEAR_TEXT: { uint16_t csri = 0; uint32_t off = 0; @@ -419,7 +354,7 @@ bool p3GxsMails::handleEcryptedMail(const RsGxsMailItem* mail) return dispatchDecryptedMail( mail, &mail->payload[0], mail->payload.size() ); } - case RsGxsMailBaseItem::RSA: + case RsGxsMailEncryptionMode::RSA: { bool ok = true; for( std::set::const_iterator it = decryptIds.begin(); @@ -440,7 +375,7 @@ bool p3GxsMails::handleEcryptedMail(const RsGxsMailItem* mail) } default: std::cout << "Unknown encryption type:" - << mail->cryptoType << std::endl; + << static_cast(mail->cryptoType) << std::endl; return false; } } @@ -487,7 +422,7 @@ bool p3GxsMails::dispatchDecryptedMail( const RsGxsMailItem* received_msg, } if(reecipientService) - return reecipientService->receiveGxsMail( received_msg, + return reecipientService->receiveGxsMail( *received_msg, &decrypted_data[offset], decrypted_data_size-offset ); else @@ -499,39 +434,189 @@ bool p3GxsMails::dispatchDecryptedMail( const RsGxsMailItem* received_msg, } } -bool p3GxsMails::preparePresignedReceipt(const RsGxsMailItem& mail, RsNxsMailPresignedReceipt& receipt) +void p3GxsMails::processOutgoingRecord(OutgoingRecord& pr) { - RsGxsMailPresignedReceipt grcpt; - grcpt.meta = mail.meta; - grcpt.meta.mPublishTs = time(NULL); - grcpt.receiptId = mail.receiptId; - uint32_t groff = 0, grsz = grcpt.size(); - std::vector grsrz; - grsrz.resize(grsz); - grcpt.serialize(&grsrz[0], grsz, groff); - receipt.msg.setBinData(&grsrz[0], grsz); + //std::cout << "p3GxsMails::processRecord(...)" << std::endl; - receipt.grpId = preferredGroupId; - receipt.metaData = new RsGxsMsgMetaData(); - *receipt.metaData = grcpt.meta; - - if(createMessage(&receipt) != CREATE_SUCCESS) + switch (pr.status) { - std::cout << "p3GxsMails::preparePresignedReceipt(...) receipt creation" - << " failed!" << std::endl; - return false; + case GxsMailStatus::PENDING_PROCESSING: + { + pr.mailItem.saltRecipientHint(pr.recipient); + pr.mailItem.saltRecipientHint(RsGxsId::random()); } + case GxsMailStatus::PENDING_PREFERRED_GROUP: + { + if(preferredGroupId.isNull()) + { + requestGroupsData(); + pr.status = GxsMailStatus::PENDING_PREFERRED_GROUP; + break; + } - uint32_t metaSize = receipt.metaData->serial_size(); - std::vector srx; srx.resize(metaSize); - receipt.metaData->serialise(&srx[0], &metaSize); - receipt.meta.setBinData(&srx[0], metaSize); + pr.mailItem.meta.mGroupId = preferredGroupId; + } + case GxsMailStatus::PENDING_RECEIPT_CREATE: + { + RsGxsMailPresignedReceipt grcpt; + grcpt.meta = pr.mailItem.meta; + grcpt.meta.mPublishTs = time(NULL); + grcpt.mailId = pr.mailItem.mailId; + uint32_t groff = 0, grsz = grcpt.size(); + std::vector grsrz; + grsrz.resize(grsz); + grcpt.serialize(&grsrz[0], grsz, groff); - std::cout << "p3GxsMails::preparePresignedReceipt(...) prepared receipt" - << "with: grcpt.meta.mMsgId: " << grcpt.meta.mMsgId - << " msgId: " << receipt.msgId - << " metaData.mMsgId: " << receipt.metaData->mMsgId - << std::endl; - return true; + pr.presignedReceipt.grpId = preferredGroupId; + pr.presignedReceipt.metaData = new RsGxsMsgMetaData(); + *pr.presignedReceipt.metaData = grcpt.meta; + pr.presignedReceipt.msg.setBinData(&grsrz[0], grsz); + } + case GxsMailStatus::PENDING_RECEIPT_SIGNATURE: + { + switch (RsGenExchange::createMessage(&pr.presignedReceipt)) + { + case CREATE_SUCCESS: break; + case CREATE_FAIL_TRY_LATER: + pr.status = GxsMailStatus::PENDING_RECEIPT_CREATE; + return; + default: + pr.status = GxsMailStatus::FAILED_RECEIPT_SIGNATURE; + goto processingFailed; + } + + uint32_t metaSize = pr.presignedReceipt.metaData->serial_size(); + std::vector srx; srx.resize(metaSize); + pr.presignedReceipt.metaData->serialise(&srx[0], &metaSize); + pr.presignedReceipt.meta.setBinData(&srx[0], metaSize); + } + case GxsMailStatus::PENDING_PAYLOAD_CREATE: + { + uint16_t serv = static_cast(pr.clientService); + uint32_t rcptsize = pr.presignedReceipt.serial_size(); + uint32_t datasize = pr.mailData.size(); + pr.mailItem.payload.resize(2 + rcptsize + datasize); + uint32_t offset = 0; + setRawUInt16(&pr.mailItem.payload[0], 2, &offset, serv); + pr.presignedReceipt.serialise( &pr.mailItem.payload[offset], + rcptsize ); + offset += rcptsize; + memcpy(&pr.mailItem.payload[offset], &pr.mailData[0], datasize); + } + case GxsMailStatus::PENDING_PAYLOAD_ENCRYPT: + { + switch (pr.mailItem.cryptoType) + { + case RsGxsMailEncryptionMode::CLEAR_TEXT: + { + std::cerr << "p3GxsMails::sendMail(...) you are sending a mail " + << "without encryption, everyone can read it!" + << std::endl; + break; + } + case RsGxsMailEncryptionMode::RSA: + { + uint8_t* encryptedData = NULL; + uint32_t encryptedSize = 0; + uint32_t encryptError = 0; + if( idService.encryptData( &pr.mailItem.payload[0], + pr.mailItem.payload.size(), + encryptedData, encryptedSize, + pr.recipient, encryptError, true ) ) + { + pr.mailItem.payload.resize(encryptedSize); + memcpy( &pr.mailItem.payload[0], encryptedData, + encryptedSize ); + free(encryptedData); + break; + } + else + { + std::cerr << "p3GxsMails::sendMail(...) RSA encryption failed! " + << "error_status: " << encryptError << std::endl; + pr.status = GxsMailStatus::FAILED_ENCRYPTION; + goto processingFailed; + } + } + case RsGxsMailEncryptionMode::UNDEFINED_ENCRYPTION: + default: + std::cerr << "p3GxsMails::sendMail(...) attempt to send mail with " + << "wrong EncryptionMode: " + << static_cast(pr.mailItem.cryptoType) + << " dropping mail!" << std::endl; + pr.status = GxsMailStatus::FAILED_ENCRYPTION; + goto processingFailed; + } + } + case GxsMailStatus::PENDING_PUBLISH: + { + std::cout << "p3GxsMails::sendEmail(...) sending mail to: " + << pr.recipient + << " with cryptoType: " + << static_cast(pr.mailItem.cryptoType) + << " recipientHint: " << pr.mailItem.recipientsHint + << " receiptId: " << pr.mailItem.mailId + << " payload size: " << pr.mailItem.payload.size() + << std::endl; + + uint32_t token; + publishMsg(token, new RsGxsMailItem(pr.mailItem)); + pr.status = GxsMailStatus::PENDING_RECEIPT_RECEIVE; + break; + } + //case GxsMailStatus::PENDING_TRANSFER: + case GxsMailStatus::PENDING_RECEIPT_RECEIVE: + { + RS_STACK_MUTEX(ingoingMutex); + auto it = ingoingQueue.find(pr.mailItem.mailId); + if (it == ingoingQueue.end()) break; + RsGxsMailPresignedReceipt* rt = + dynamic_cast(it->second); + if( !rt || !idService.isOwnId(rt->meta.mAuthorId) ) break; + + ingoingQueue.erase(it); delete rt; + pr.status = GxsMailStatus::RECEIPT_RECEIVED; + // TODO: Malicious adversary could forge messages with same mailId and + // could end up overriding the legit receipt in ingoingQueue, and + // causing also a memleak(using unordered_multimap for ingoingQueue + // may fix this?) + // TODO: Resend message if older then treshold + } + case GxsMailStatus::RECEIPT_RECEIVED: + break; + +processingFailed: + case GxsMailStatus::FAILED_RECEIPT_SIGNATURE: + case GxsMailStatus::FAILED_ENCRYPTION: + default: + { + std::cout << "p3GxsMails::processRecord(" << pr.mailItem.mailId + << ") failed with: " << static_cast(pr.status) + << std::endl; + break; + } + } +} + +void p3GxsMails::notifyClientService(const OutgoingRecord& pr) +{ + RS_STACK_MUTEX(servClientsMutex); + auto it = servClients.find(pr.clientService); + if( it != servClients.end()) + { + GxsMailsClient* serv(it->second); + if(serv) + { + serv->notifySendMailStatus(pr.mailItem, pr.status); + return; + } + } + + std::cerr << "p3GxsMails::processRecord(...) (EE) processed" + << " mail for unkown service: " + << static_cast(pr.clientService) + << " fatally failed with: " + << static_cast(pr.status) << std::endl; + print_stacktrace(); } diff --git a/libretroshare/src/services/p3gxsmails.h b/libretroshare/src/services/p3gxsmails.h index 0dac2c5a6..dcbed0937 100644 --- a/libretroshare/src/services/p3gxsmails.h +++ b/libretroshare/src/services/p3gxsmails.h @@ -23,9 +23,27 @@ #include "gxs/gxstokenqueue.h" // For GxsTokenQueue #include "serialiser/rsgxsmailitems.h" // For RS_SERVICE_TYPE_GXS_MAIL #include "services/p3idservice.h" // For p3IdService +#include "util/rsthreads.h" + +enum class GxsMailStatus +{ + PENDING_PROCESSING = 0, + PENDING_PREFERRED_GROUP, + PENDING_RECEIPT_CREATE, + PENDING_RECEIPT_SIGNATURE, + PENDING_SERIALIZATION, + PENDING_PAYLOAD_CREATE, + PENDING_PAYLOAD_ENCRYPT, + PENDING_PUBLISH, + //PENDING_TRANSFER, /// This will be useful so the user can know if the mail reached some friend node, in case of internet connection interruption + PENDING_RECEIPT_RECEIVE, + /// Records with status >= RECEIPT_RECEIVED get deleted + RECEIPT_RECEIVED, + FAILED_RECEIPT_SIGNATURE = 240, + FAILED_ENCRYPTION +}; struct p3GxsMails; - struct GxsMailsClient { /// Subservices identifiers (like port for TCP) @@ -38,10 +56,12 @@ struct GxsMailsClient * @param dataSize size of the buffer * @return true if dispatching goes fine, false otherwise */ - virtual bool receiveGxsMail( const RsGxsMailItem* originalMessage, + virtual bool receiveGxsMail( const RsGxsMailItem& originalMessage, const uint8_t* data, uint32_t dataSize ) = 0; -}; + virtual bool notifySendMailStatus( const RsGxsMailItem& originalMessage, + GxsMailStatus status ) = 0; +}; struct p3GxsMails : RsGenExchange, GxsTokenQueue { @@ -51,7 +71,9 @@ struct p3GxsMails : RsGenExchange, GxsTokenQueue RS_SERVICE_TYPE_GXS_MAIL, &identities, AuthenPolicy(), GXS_STORAGE_PERIOD ), GxsTokenQueue(this), idService(identities), - servClientsMutex("p3GxsMails client services map mutex") {} + servClientsMutex("p3GxsMails client services map mutex"), + outgoingMutex("p3GxsMails outgoing queue map mutex"), + ingoingMutex("p3GxsMails ingoing queue map mutex") {} /** * Send an email to recipient, in the process author of the email is @@ -61,37 +83,43 @@ struct p3GxsMails : RsGenExchange, GxsTokenQueue * This method is part of the public interface of this service. * @return true if the mail will be sent, false if not */ - bool sendMail( GxsMailsClient::GxsMailSubServices service, + bool sendMail( RsGxsMailId& mailId, + GxsMailsClient::GxsMailSubServices service, const RsGxsId& own_gxsid, const RsGxsId& recipient, const uint8_t* data, uint32_t size, - RsGxsMailBaseItem::EncryptionMode cm = RsGxsMailBaseItem::RSA + RsGxsMailEncryptionMode cm = RsGxsMailEncryptionMode::RSA ); + /** + * This method is part of the public interface of this service. + * @return false if mail is not found in outgoing queue, true otherwise + */ + bool querySendMailStatus( RsGxsMailId mailId, GxsMailStatus& st ); + /** * Register a client service to p3GxsMails to receive mails via * GxsMailsClient::receiveGxsMail(...) callback + * This method is part of the public interface of this service. */ void registerGxsMailsClient( GxsMailsClient::GxsMailSubServices serviceType, GxsMailsClient* service ); + /// @see RsGenExchange::getServiceInfo() + virtual RsServiceInfo getServiceInfo() { return RsServiceInfo( RS_SERVICE_TYPE_GXS_MAIL, "GXS Mails", 0, 1, 0, 1 ); } +private: /// @see GxsTokenQueue::handleResponse(uint32_t token, uint32_t req_type) virtual void handleResponse(uint32_t token, uint32_t req_type); /// @see RsGenExchange::service_tick() virtual void service_tick(); - /// @see RsGenExchange::getServiceInfo() - virtual RsServiceInfo getServiceInfo() { return RsServiceInfo( RS_SERVICE_TYPE_GXS_MAIL, "GXS Mails", 0, 1, 0, 1 ); } - /// @see RsGenExchange::service_CreateGroup(...) RsGenExchange::ServiceCreate_Return service_CreateGroup(RsGxsGrpItem* grpItem, RsTlvSecurityKeySet&); -protected: /// @see RsGenExchange::notifyChanges(std::vector &changes) void notifyChanges(std::vector &changes); -private: /** Time interval of inactivity before a distribution group is unsubscribed. * Approximatively 3 months seems ok ATM. */ const static int32_t UNUSED_GROUP_UNSUBSCRIBE_INTERVAL = 0x76A700; @@ -101,7 +129,7 @@ private: * very fast taking in account we are handling mails for the whole network. * We do prefer to resend a not acknowledged yet mail after * GXS_STORAGE_PERIOD has passed and keep it little. - * Tought it can't be too little as this may cause signed acknowledged to + * Tought it can't be too little as this may cause signed receipts to * get lost thus causing resend and fastly grow perceived async latency, in * case two sporadically connected users sends mails each other. * TODO: While it is ok for signed acknowledged to stays in the DB for a @@ -132,6 +160,33 @@ private: std::map servClients; RsMutex servClientsMutex; + struct OutgoingRecord + { + OutgoingRecord( RsGxsId rec, GxsMailsClient::GxsMailSubServices cs, + const uint8_t* data, uint32_t size ) : + status(GxsMailStatus::PENDING_PROCESSING), recipient(rec), + clientService(cs) + { + mailData.resize(size); + memcpy(&mailData[0], data, size); + } + + GxsMailStatus status; + RsGxsId recipient; + RsGxsMailItem mailItem; /// Don't use a pointer would be invalid after publish + std::vector mailData; + GxsMailsClient::GxsMailSubServices clientService; + RsNxsMailPresignedReceipt presignedReceipt; + }; + /// Keep track of mails while being processed + typedef std::map prMap; + prMap outgoingQueue; + RsMutex outgoingMutex; + void processOutgoingRecord(OutgoingRecord& r); + + typedef std::map inMap; + inMap ingoingQueue; + RsMutex ingoingMutex; /// Request groups list to GXS backend. Async method. bool requestGroupsData(const std::list* groupIds = NULL); @@ -171,27 +226,69 @@ private: const uint8_t* decrypted_data, uint32_t decrypted_data_size ); - bool preparePresignedReceipt( const RsGxsMailItem& mail, - RsNxsMailPresignedReceipt& receipt ); + void notifyClientService(const OutgoingRecord& pr); }; - -struct TestGxsMailClientService : GxsMailsClient +struct TestGxsMailClientService : GxsMailsClient, RsSingleJobThread { - TestGxsMailClientService(p3GxsMails& gmxMailService) + TestGxsMailClientService( p3GxsMails& gxsMailService, + p3IdService& gxsIdService ) : + mailService(gxsMailService), idService(gxsIdService) { - gmxMailService.registerGxsMailsClient( GxsMailSubServices::TEST_SERVICE, - this ); + mailService.registerGxsMailsClient( GxsMailSubServices::TEST_SERVICE, + this ); } /// @see GxsMailsClient::receiveGxsMail(...) - virtual bool receiveGxsMail( const RsGxsMailItem* originalMessage, + virtual bool receiveGxsMail( const RsGxsMailItem& originalMessage, const uint8_t* data, uint32_t dataSize ) { std::cout << "TestGxsMailClientService::receiveGxsMail(...) got message" - << " from: " << originalMessage->meta.mAuthorId << std::endl - << "\t" << std::string((char*)data, dataSize) << std::endl; + << " from: " << originalMessage.meta.mAuthorId << std::endl + << "\t>" << std::string((char*)data, dataSize) << "<" + << std::endl; return true; } + + /// @see GxsMailsClient::notifyMailStatus(...) + virtual bool notifySendMailStatus( const RsGxsMailItem& originalMessage, + GxsMailStatus status ) + { + std::cout << "TestGxsMailClientService::notifyMailsStatus(...) for: " + << originalMessage.mailId << " status: " + << static_cast(status) << std::endl; + if( status == GxsMailStatus::RECEIPT_RECEIVED ) + std::cout << "\t It mean Receipt has been Received!" << std::endl; + return true; + } + + /// @see RsSingleJobThread::run() + virtual void run() + { + usleep(10*1000*1000); + RsGxsId gxsidA("d0df7474bdde0464679e6ef787890287"); + RsGxsId gxsidB("d060bea09dfa14883b5e6e517eb580cd"); + RsGxsMailId mailId = 0; + if(idService.isOwnId(gxsidA)) + { + std::string ciao("CiAone!"); + mailService.sendMail( mailId, GxsMailsClient::TEST_SERVICE, gxsidA, + gxsidB, + reinterpret_cast(ciao.data()), + ciao.size() ); + } + else if(idService.isOwnId(gxsidB)) + { + std::string ciao("CiBuono!"); + mailService.sendMail( mailId, GxsMailsClient::TEST_SERVICE, gxsidB, + gxsidA, + reinterpret_cast(ciao.data()), + ciao.size() ); + } + } + +private: + p3GxsMails& mailService; + p3IdService& idService; };