p3GxsMails expose proper async API

Moved testing code to TestGxsMailClientService
RsGxsMailPresignedReceipt and RsGxsMailItem inherit RsGxsMailBaseItem
p3GxsMails::sendMail(...) check paramenters and return immediately
Added GxsMailsClient::notifySendMailStatus(...) to notify sent mails status
Added p3GxsMails::querySendMailStatus(...) so clients can query status
This commit is contained in:
Gioacchino Mazzurco 2017-02-21 12:02:27 +01:00
parent fcdb3d6c88
commit 1376b9f031
5 changed files with 455 additions and 300 deletions

View File

@ -1500,7 +1500,8 @@ int RsServer::StartupRetroShare()
pqih->addService(gxsmails_ns, true);
mConfigMgr->addConfiguration("gxs_mail.cfg", gxsmails_ns);
new TestGxsMailClientService(*mGxsMails);
TestGxsMailClientService* tgms =
new TestGxsMailClientService(*mGxsMails, *mGxsIdService);
# endif // RS_GXS_MAIL
// remove pword from memory
@ -1827,6 +1828,7 @@ int RsServer::StartupRetroShare()
# ifdef RS_GXS_MAIL
startServiceThread(mGxsMails, "gxs mail");
startServiceThread(gxsmails_ns, "gxs mail ns");
tgms->start("Gxs Mail Test Service");
# endif // RS_GXS_MAIL
#endif // RS_ENABLE_GXS

View File

@ -18,7 +18,7 @@
#include "serialiser/rsgxsmailitems.h"
const RsGxsId RsGxsMailBaseItem::allRecipientsHint("FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF");
const RsGxsId RsGxsMailItem::allRecipientsHint("FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF");
bool RsGxsMailBaseItem::serialize(uint8_t* data, uint32_t size,
@ -26,9 +26,7 @@ bool RsGxsMailBaseItem::serialize(uint8_t* data, uint32_t size,
{
bool ok = setRsItemHeader(data, size, PacketId(), size);
ok = ok && (offset += 8); // Take header in account
ok = ok && setRawUInt8(data, size, &offset, cryptoType);
ok = ok && recipientsHint.serialise(data, size, offset);
ok = ok && setRawUInt64(data, size, &offset, receiptId);
ok = ok && setRawUInt64(data, size, &offset, mailId);
return ok;
}
@ -39,18 +37,14 @@ bool RsGxsMailBaseItem::deserialize(const uint8_t* data, uint32_t& size,
uint32_t rssize = getRsItemSize(dataPtr);
uint32_t roffset = offset + 8; // Take header in account
bool ok = rssize <= size;
uint8_t crType;
ok = ok && getRawUInt8(dataPtr, rssize, &roffset, &crType);
cryptoType = static_cast<EncryptionMode>(crType);
ok = ok && recipientsHint.deserialise(dataPtr, rssize, roffset);
ok = ok && getRawUInt64(dataPtr, rssize, &roffset, &receiptId);
ok = ok && getRawUInt64(dataPtr, rssize, &roffset, &mailId);
if(ok) { size = rssize; offset = roffset; }
else size = 0;
return ok;
}
std::ostream&RsGxsMailBaseItem::print(std::ostream& out, uint16_t)
{ return out; }
std::ostream& RsGxsMailBaseItem::print(std::ostream &out, uint16_t)
{ return out << " RsGxsMailBaseItem::mailId: " << mailId; }
bool RsGxsMailSerializer::serialise(RsItem* item, void* data, uint32_t* size)
{

View File

@ -34,65 +34,55 @@ enum GxsMailItemsSubtypes
GXS_MAIL_SUBTYPE_GROUP = 3
};
typedef uint64_t RsGxsMailId;
struct RsNxsMailPresignedReceipt : RsNxsMsg
{
RsNxsMailPresignedReceipt() : RsNxsMsg(RS_SERVICE_TYPE_GXS_MAIL) {}
};
struct RsGxsMailPresignedReceipt : RsGxsMsgItem
{
RsGxsMailPresignedReceipt() :
RsGxsMsgItem( RS_SERVICE_TYPE_GXS_MAIL,
static_cast<uint8_t>(GXS_MAIL_SUBTYPE_RECEIPT) ),
receiptId(0) {}
uint64_t receiptId;
static uint32_t inline size()
{
return 8 + // Header
8; // receiptId
}
bool serialize(uint8_t* data, uint32_t size, uint32_t& offset) const
{
bool ok = setRsItemHeader(data, size, PacketId(), size);
ok = ok && (offset += 8); // Take header in account
ok = ok && setRawUInt64(data, size, &offset, receiptId);
return ok;
}
bool deserialize(const uint8_t* data, uint32_t& size, uint32_t& offset)
{
void* dataPtr = reinterpret_cast<void*>(const_cast<uint8_t*>(data));
uint32_t rssize = getRsItemSize(dataPtr);
uint32_t roffset = offset + 8; // Take header in account
bool ok = rssize <= size;
ok = ok && getRawUInt64(dataPtr, rssize, &roffset, &receiptId);
if(ok) { size = rssize; offset = roffset; }
else size = 0;
return ok;
}
void clear() { receiptId = 0; }
std::ostream &print(std::ostream &out, uint16_t /*indent = 0*/)
{ return out << receiptId; }
};
struct RsGxsMailBaseItem : RsGxsMsgItem
{
RsGxsMailBaseItem(GxsMailItemsSubtypes subtype) :
RsGxsMsgItem( RS_SERVICE_TYPE_GXS_MAIL,
static_cast<uint8_t>(subtype) ),
cryptoType(UNDEFINED_ENCRYPTION), receiptId(0) {}
static_cast<uint8_t>(subtype) ), mailId(0) {}
/// Values must fit into uint8_t
enum EncryptionMode
RsGxsMailId mailId;
void inline clear()
{
CLEAR_TEXT = 1,
RSA = 2,
UNDEFINED_ENCRYPTION = 250
};
EncryptionMode cryptoType;
mailId = 0;
meta = RsMsgMetaData();
}
static uint32_t inline size()
{
return 8 + // Header
8; // mailId
}
bool serialize(uint8_t* data, uint32_t size, uint32_t& offset) const;
bool deserialize(const uint8_t* data, uint32_t& size, uint32_t& offset);
std::ostream &print(std::ostream &out, uint16_t /*indent = 0*/);
};
struct RsGxsMailPresignedReceipt : RsGxsMailBaseItem
{
RsGxsMailPresignedReceipt() : RsGxsMailBaseItem(GXS_MAIL_SUBTYPE_RECEIPT) {}
};
enum class RsGxsMailEncryptionMode : uint8_t
{
CLEAR_TEXT = 1,
RSA = 2,
UNDEFINED_ENCRYPTION = 250
};
struct RsGxsMailItem : RsGxsMailBaseItem
{
RsGxsMailItem() : RsGxsMailBaseItem(GXS_MAIL_SUBTYPE_MAIL),
cryptoType(RsGxsMailEncryptionMode::UNDEFINED_ENCRYPTION) {}
RsGxsMailEncryptionMode cryptoType;
/**
* @brief recipientsHint used instead of plain recipient id, so sender can
@ -130,65 +120,37 @@ struct RsGxsMailBaseItem : RsGxsMsgItem
* corresponding hint may be fruit of a "luky" salting of another id.
*/
RsGxsId recipientsHint;
void static inline saltRecipientHint(RsGxsId& hint, const RsGxsId& salt)
{ hint = hint | salt; }
void inline saltRecipientHint(const RsGxsId& salt)
{ saltRecipientHint(recipientsHint, salt); }
{ recipientsHint = recipientsHint | salt; }
/**
* @brief maybeRecipient given an id and an hint check if they match
* @see recipientHint
* @note this is not the final implementation as id and hint are not 32bit
* integers it is just to not forget how to verify the hint/id matching
* fastly with boolean ops
* @return true if the id may be recipient of the hint, false otherwise
*/
bool static inline maybeRecipient(const RsGxsId& hint, const RsGxsId& id)
{ return (~id|hint) == allRecipientsHint; }
bool inline maybeRecipient(const RsGxsId& id) const
{ return maybeRecipient(recipientsHint, id); }
{ return (~id|recipientsHint) == allRecipientsHint; }
const static RsGxsId allRecipientsHint;
uint64_t receiptId;
void inline clear()
{
cryptoType = UNDEFINED_ENCRYPTION;
recipientsHint.clear();
receiptId = 0;
meta = RsMsgMetaData();
}
static uint32_t inline size()
{
return 8 + // Header
1 + // cryptoType
RsGxsId::serial_size() + // recipientsHint
8; // receiptId
}
bool serialize(uint8_t* data, uint32_t size, uint32_t& offset) const;
bool deserialize(const uint8_t* data, uint32_t& size, uint32_t& offset);
std::ostream &print(std::ostream &out, uint16_t /*indent = 0*/);
};
struct RsGxsMailItem : RsGxsMailBaseItem
{
RsGxsMailItem(GxsMailItemsSubtypes subtype) :
RsGxsMailBaseItem(subtype) {}
RsGxsMailItem() :
RsGxsMailBaseItem(GXS_MAIL_SUBTYPE_MAIL) {}
/** This should travel encrypted, unless EncryptionMode::CLEAR_TEXT
* is specified */
std::vector<uint8_t> payload;
uint32_t size() const { return RsGxsMailBaseItem::size() + payload.size(); }
uint32_t size() const
{
return RsGxsMailBaseItem::size() +
1 + // cryptoType
recipientsHint.serial_size() +
payload.size();
}
bool serialize(uint8_t* data, uint32_t size, uint32_t& offset) const
{
bool ok = size < MAX_SIZE;
ok = ok && RsGxsMailBaseItem::serialize(data, size, offset);
ok = ok && setRawUInt8( data, size, &offset,
static_cast<uint8_t>(cryptoType) );
ok = ok && recipientsHint.serialise(data, size, offset);
uint32_t psz = payload.size();
ok = ok && memcpy(data+offset, &payload[0], psz);
offset += psz;
@ -196,13 +158,28 @@ struct RsGxsMailItem : RsGxsMailBaseItem
}
bool deserialize(const uint8_t* data, uint32_t& size, uint32_t& offset)
{
uint32_t bsz = RsGxsMailBaseItem::size();
uint32_t psz = size - bsz;
return size < MAX_SIZE && size >= bsz
&& RsGxsMailBaseItem::deserialize(data, size, offset)
&& (payload.resize(psz), memcpy(&payload[0], data+offset, psz));
void* dataPtr = reinterpret_cast<void*>(const_cast<uint8_t*>(data));
uint32_t rssize = getRsItemSize(dataPtr);
uint32_t roffset = offset;
bool ok = rssize <= size && size < MAX_SIZE;
ok = ok && RsGxsMailBaseItem::deserialize(data, rssize, roffset);
uint8_t crType;
ok = ok && getRawUInt8(dataPtr, rssize, &roffset, &crType);
cryptoType = static_cast<RsGxsMailEncryptionMode>(crType);
ok = ok && recipientsHint.deserialise(dataPtr, rssize, roffset);
uint32_t psz = rssize - roffset;
ok = ok && (payload.resize(psz), memcpy(&payload[0], data+roffset, psz));
if(ok) { size = rssize; offset = roffset; }
else size = 0;
return ok;
}
void clear()
{
RsGxsMailBaseItem::clear();
cryptoType = RsGxsMailEncryptionMode::UNDEFINED_ENCRYPTION;
recipientsHint.clear();
payload.clear();
}
void clear() { RsGxsMailBaseItem::clear(); payload.clear(); }
/// Maximum mail size in bytes 10 MiB is more than anything sane can need
const static uint32_t MAX_SIZE = 10*8*1024*1024;

View File

@ -20,21 +20,14 @@
#include "util/stacktrace.h"
bool p3GxsMails::sendMail( GxsMailsClient::GxsMailSubServices service,
bool p3GxsMails::sendMail( RsGxsMailId& mailId,
GxsMailsClient::GxsMailSubServices service,
const RsGxsId& own_gxsid, const RsGxsId& recipient,
const uint8_t* data, uint32_t size,
RsGxsMailBaseItem::EncryptionMode cm)
RsGxsMailEncryptionMode cm )
{
std::cout << "p3GxsMails::sendEmail(...)" << std::endl;
if(preferredGroupId.isNull())
{
requestGroupsData();
std::cerr << "p3GxsMails::sendEmail(...) preferredGroupId.isNull()!"
<< std::endl;
return false;
}
if(!idService.isOwnId(own_gxsid))
{
std::cerr << "p3GxsMails::sendEmail(...) isOwnId(own_gxsid) false!"
@ -50,78 +43,31 @@ bool p3GxsMails::sendMail( GxsMailsClient::GxsMailSubServices service,
return false;
}
RsGxsMailItem* item = new RsGxsMailItem();
OutgoingRecord pr( recipient, service, data, size );
pr.mailItem.meta.mAuthorId = own_gxsid;
pr.mailItem.cryptoType = cm;
pr.mailItem.mailId = RSRandom::random_u64();
// Public metadata
item->meta.mAuthorId = own_gxsid;
item->meta.mGroupId = preferredGroupId;
item->cryptoType = cm;
item->saltRecipientHint(recipient);
item->saltRecipientHint(RsGxsId::random());
item->receiptId = RSRandom::random_u64();
RsNxsMailPresignedReceipt nrcpt;
preparePresignedReceipt(*item, nrcpt);
uint16_t serv = static_cast<uint16_t>(service);
uint32_t rcptsize = nrcpt.serial_size();
item->payload.resize(2 + rcptsize + size);
uint32_t offset = 0;
setRawUInt16(&item->payload[0], 2, &offset, serv);
nrcpt.serialise(&item->payload[offset], rcptsize); offset += rcptsize;
memcpy(&item->payload[offset], data, size); //offset += size;
std::cout << "p3GxsMails::sendMail(...) receipt size: " << rcptsize << std::endl;
switch (cm)
{
case RsGxsMailBaseItem::CLEAR_TEXT:
{
std::cerr << "p3GxsMails::sendMail(...) you are sending a mail without"
<< " encryption, everyone can read it!" << std::endl;
print_stacktrace();
break;
}
case RsGxsMailBaseItem::RSA:
{
uint8_t* encryptedData = NULL;
uint32_t encryptedSize = 0;
uint32_t encryptError = 0;
if( idService.encryptData( &item->payload[0], item->payload.size(),
encryptedData, encryptedSize,
recipient, encryptError, true ) )
{
item->payload.resize(encryptedSize);
memcpy(&item->payload[0], encryptedData, encryptedSize);
free(encryptedData);
break;
}
else
{
std::cerr << "p3GxsMails::sendMail(...) RSA encryption failed! "
<< "error_status: " << encryptError << std::endl;
print_stacktrace();
return false;
}
}
case RsGxsMailBaseItem::UNDEFINED_ENCRYPTION:
default:
std::cerr << "p3GxsMails::sendMail(...) attempt to send mail with wrong"
<< " EncryptionMode " << cm << " dropping mail!" << std::endl;
print_stacktrace();
return false;
RS_STACK_MUTEX(outgoingMutex);
outgoingQueue.insert(prMap::value_type(pr.mailItem.mailId, pr));
}
uint32_t token;
std::cout << "p3GxsMails::sendEmail(...) sending mail to: "<< recipient
<< " with cryptoType: " << item->cryptoType
<< " recipientHint: " << item->recipientsHint
<< " receiptId: " << item->receiptId
<< " payload size: " << item->payload.size() << std::endl;
publishMsg(token, item);
mailId = pr.mailItem.mailId;
return true;
}
bool p3GxsMails::querySendMailStatus(RsGxsMailId mailId, GxsMailStatus& st)
{
auto it = outgoingQueue.find(mailId);
if( it != outgoingQueue.end() )
{
st = it->second.status;
return true;
}
return false;
}
void p3GxsMails::registerGxsMailsClient(
GxsMailsClient::GxsMailSubServices serviceType, GxsMailsClient* service)
{
@ -132,7 +78,8 @@ void p3GxsMails::registerGxsMailsClient(
void p3GxsMails::handleResponse(uint32_t token, uint32_t req_type)
{
//std::cout << "p3GxsMails::handleResponse(" << token << ", " << req_type << ")" << std::endl;
std::cout << "p3GxsMails::handleResponse(" << token << ", " << req_type
<< ")" << std::endl;
switch (req_type)
{
case GROUPS_LIST:
@ -202,66 +149,34 @@ void p3GxsMails::handleResponse(uint32_t token, uint32_t req_type)
vT& mv(gIt->second);
for( vT::const_iterator mIt = mv.begin(); mIt != mv.end(); ++mIt )
{
RsGxsMsgItem* gItem = *mIt;
switch(gItem->PacketSubType())
RsGxsMsgItem* gIt = *mIt;
switch(gIt->PacketSubType())
{
case GXS_MAIL_SUBTYPE_MAIL:
case GXS_MAIL_SUBTYPE_RECEIPT:
{
RsGxsMailItem* msg = dynamic_cast<RsGxsMailItem*>(gItem);
if(!msg)
RsGxsMailBaseItem* mb =
dynamic_cast<RsGxsMailBaseItem*>(*mIt);
if(mb)
{
RS_STACK_MUTEX(ingoingMutex);
ingoingQueue.insert(inMap::value_type(mb->mailId, mb));
}
else
std::cerr << "p3GxsMails::handleResponse(...) "
<< "GXS_MAIL_SUBTYPE_MAIL cast error, "
<< "something really wrong is happening"
<< std::endl;
break;
}
std::cout << "p3GxsMails::handleResponse(...) MAILS_UPDATE "
<< "GXS_MAIL_SUBTYPE_MAIL handling: "
<< msg->meta.mMsgId
<< " with cryptoType: "<< msg->cryptoType
<< " recipientHint: " << msg->recipientsHint
<< " receiptId: "<< msg->receiptId
<< " payload.size(): " << msg->payload.size()
<< std::endl;
handleEcryptedMail(msg);
break;
}
case GXS_MAIL_SUBTYPE_RECEIPT:
{
RsGxsMailPresignedReceipt* msg =
dynamic_cast<RsGxsMailPresignedReceipt*>(gItem);
if(!msg)
{
std::cerr << "p3GxsMails::handleResponse(...) "
<< "GXS_MAIL_SUBTYPE_RECEIPT cast error, "
<< "something really wrong is happening"
<< std::endl;
break;
}
std::cout << "p3GxsMails::handleResponse(...) MAILS_UPDATE "
<< "GXS_MAIL_SUBTYPE_RECEIPT handling: "
<< msg->meta.mMsgId
<< "with receiptId: "<< msg->receiptId
<< std::endl;
/* TODO: Notify client services if the original mail was
* sent from this node and mark for deletion, otherwise
* just mark original mail for deletion. */
break;
}
default:
std::cerr << "p3GxsMails::handleResponse(...) MAILS_UPDATE "
<< "Unknown mail subtype : "
<< static_cast<uint32_t>(gItem->PacketSubType())
<< static_cast<uint>(gIt->PacketSubType())
<< std::endl;
delete gIt;
break;
}
delete gItem;
}
}
break;
@ -275,32 +190,52 @@ void p3GxsMails::handleResponse(uint32_t token, uint32_t req_type)
void p3GxsMails::service_tick()
{
static int tc = 0;
++tc;
GxsTokenQueue::checkRequests();
if(((tc % 1000) == 0) || (tc == 50)) requestGroupsData();
if(tc == 500)
{
RsGxsId gxsidA("d0df7474bdde0464679e6ef787890287");
RsGxsId gxsidB("d060bea09dfa14883b5e6e517eb580cd");
if(idService.isOwnId(gxsidA))
RS_STACK_MUTEX(outgoingMutex);
for ( auto it = outgoingQueue.begin(); it != outgoingQueue.end(); )
{
std::string ciao("CiAone!");
sendMail( GxsMailsClient::TEST_SERVICE, gxsidA, gxsidB,
reinterpret_cast<const uint8_t*>(ciao.data()),
ciao.size(), RsGxsMailBaseItem::RSA );
OutgoingRecord& pr(it->second);
GxsMailStatus oldStatus = pr.status;
processOutgoingRecord(pr);
if (oldStatus != pr.status) notifyClientService(pr);
if( pr.status >= GxsMailStatus::RECEIPT_RECEIVED )
it = outgoingQueue.erase(it);
else ++it;
}
// else if(idService.isOwnId(gxsidB))
// {
// std::string ciao("CiBuono!");
// sendMail( GxsMailsClient::TEST_SERVICE, gxsidB, gxsidA,
// reinterpret_cast<const uint8_t*>(ciao.data()),
// ciao.size(), RsGxsMailBaseItem::RSA );
// }
}
GxsTokenQueue::checkRequests();
{
RS_STACK_MUTEX(ingoingMutex);
for( auto it = ingoingQueue.begin(); it != ingoingQueue.end(); )
{
if( it->second->PacketSubType() != GXS_MAIL_SUBTYPE_MAIL )
{ ++it; continue; }
RsGxsMailItem* msg = dynamic_cast<RsGxsMailItem*>(it->second);
if(!msg)
{
std::cout << "p3GxsMails::service_tick() GXS_MAIL_SUBTYPE_MAIL"
<< "dynamic_cast failed, something really wrong is "
<< "happening!" << std::endl;
++it; continue;
}
std::cout << "p3GxsMails::service_tick() GXS_MAIL_SUBTYPE_MAIL "
<< "handling: " << msg->meta.mMsgId
<< " with cryptoType: "
<< static_cast<uint32_t>(msg->cryptoType)
<< " recipientHint: " << msg->recipientsHint
<< " mailId: "<< msg->mailId
<< " payload.size(): " << msg->payload.size()
<< std::endl;
handleEcryptedMail(msg);
it = ingoingQueue.erase(it); delete msg;
}
}
}
RsGenExchange::ServiceCreate_Return p3GxsMails::service_CreateGroup(RsGxsGrpItem* grpItem, RsTlvSecurityKeySet& /*keySet*/)
@ -409,7 +344,7 @@ bool p3GxsMails::handleEcryptedMail(const RsGxsMailItem* mail)
switch (mail->cryptoType)
{
case RsGxsMailBaseItem::CLEAR_TEXT:
case RsGxsMailEncryptionMode::CLEAR_TEXT:
{
uint16_t csri = 0;
uint32_t off = 0;
@ -419,7 +354,7 @@ bool p3GxsMails::handleEcryptedMail(const RsGxsMailItem* mail)
return dispatchDecryptedMail( mail, &mail->payload[0],
mail->payload.size() );
}
case RsGxsMailBaseItem::RSA:
case RsGxsMailEncryptionMode::RSA:
{
bool ok = true;
for( std::set<RsGxsId>::const_iterator it = decryptIds.begin();
@ -440,7 +375,7 @@ bool p3GxsMails::handleEcryptedMail(const RsGxsMailItem* mail)
}
default:
std::cout << "Unknown encryption type:"
<< mail->cryptoType << std::endl;
<< static_cast<uint32_t>(mail->cryptoType) << std::endl;
return false;
}
}
@ -487,7 +422,7 @@ bool p3GxsMails::dispatchDecryptedMail( const RsGxsMailItem* received_msg,
}
if(reecipientService)
return reecipientService->receiveGxsMail( received_msg,
return reecipientService->receiveGxsMail( *received_msg,
&decrypted_data[offset],
decrypted_data_size-offset );
else
@ -499,39 +434,189 @@ bool p3GxsMails::dispatchDecryptedMail( const RsGxsMailItem* received_msg,
}
}
bool p3GxsMails::preparePresignedReceipt(const RsGxsMailItem& mail, RsNxsMailPresignedReceipt& receipt)
void p3GxsMails::processOutgoingRecord(OutgoingRecord& pr)
{
RsGxsMailPresignedReceipt grcpt;
grcpt.meta = mail.meta;
grcpt.meta.mPublishTs = time(NULL);
grcpt.receiptId = mail.receiptId;
uint32_t groff = 0, grsz = grcpt.size();
std::vector<uint8_t> grsrz;
grsrz.resize(grsz);
grcpt.serialize(&grsrz[0], grsz, groff);
receipt.msg.setBinData(&grsrz[0], grsz);
//std::cout << "p3GxsMails::processRecord(...)" << std::endl;
receipt.grpId = preferredGroupId;
receipt.metaData = new RsGxsMsgMetaData();
*receipt.metaData = grcpt.meta;
if(createMessage(&receipt) != CREATE_SUCCESS)
switch (pr.status)
{
std::cout << "p3GxsMails::preparePresignedReceipt(...) receipt creation"
<< " failed!" << std::endl;
return false;
case GxsMailStatus::PENDING_PROCESSING:
{
pr.mailItem.saltRecipientHint(pr.recipient);
pr.mailItem.saltRecipientHint(RsGxsId::random());
}
case GxsMailStatus::PENDING_PREFERRED_GROUP:
{
if(preferredGroupId.isNull())
{
requestGroupsData();
pr.status = GxsMailStatus::PENDING_PREFERRED_GROUP;
break;
}
uint32_t metaSize = receipt.metaData->serial_size();
std::vector<uint8_t> srx; srx.resize(metaSize);
receipt.metaData->serialise(&srx[0], &metaSize);
receipt.meta.setBinData(&srx[0], metaSize);
pr.mailItem.meta.mGroupId = preferredGroupId;
}
case GxsMailStatus::PENDING_RECEIPT_CREATE:
{
RsGxsMailPresignedReceipt grcpt;
grcpt.meta = pr.mailItem.meta;
grcpt.meta.mPublishTs = time(NULL);
grcpt.mailId = pr.mailItem.mailId;
uint32_t groff = 0, grsz = grcpt.size();
std::vector<uint8_t> grsrz;
grsrz.resize(grsz);
grcpt.serialize(&grsrz[0], grsz, groff);
std::cout << "p3GxsMails::preparePresignedReceipt(...) prepared receipt"
<< "with: grcpt.meta.mMsgId: " << grcpt.meta.mMsgId
<< " msgId: " << receipt.msgId
<< " metaData.mMsgId: " << receipt.metaData->mMsgId
<< std::endl;
return true;
pr.presignedReceipt.grpId = preferredGroupId;
pr.presignedReceipt.metaData = new RsGxsMsgMetaData();
*pr.presignedReceipt.metaData = grcpt.meta;
pr.presignedReceipt.msg.setBinData(&grsrz[0], grsz);
}
case GxsMailStatus::PENDING_RECEIPT_SIGNATURE:
{
switch (RsGenExchange::createMessage(&pr.presignedReceipt))
{
case CREATE_SUCCESS: break;
case CREATE_FAIL_TRY_LATER:
pr.status = GxsMailStatus::PENDING_RECEIPT_CREATE;
return;
default:
pr.status = GxsMailStatus::FAILED_RECEIPT_SIGNATURE;
goto processingFailed;
}
uint32_t metaSize = pr.presignedReceipt.metaData->serial_size();
std::vector<uint8_t> srx; srx.resize(metaSize);
pr.presignedReceipt.metaData->serialise(&srx[0], &metaSize);
pr.presignedReceipt.meta.setBinData(&srx[0], metaSize);
}
case GxsMailStatus::PENDING_PAYLOAD_CREATE:
{
uint16_t serv = static_cast<uint16_t>(pr.clientService);
uint32_t rcptsize = pr.presignedReceipt.serial_size();
uint32_t datasize = pr.mailData.size();
pr.mailItem.payload.resize(2 + rcptsize + datasize);
uint32_t offset = 0;
setRawUInt16(&pr.mailItem.payload[0], 2, &offset, serv);
pr.presignedReceipt.serialise( &pr.mailItem.payload[offset],
rcptsize );
offset += rcptsize;
memcpy(&pr.mailItem.payload[offset], &pr.mailData[0], datasize);
}
case GxsMailStatus::PENDING_PAYLOAD_ENCRYPT:
{
switch (pr.mailItem.cryptoType)
{
case RsGxsMailEncryptionMode::CLEAR_TEXT:
{
std::cerr << "p3GxsMails::sendMail(...) you are sending a mail "
<< "without encryption, everyone can read it!"
<< std::endl;
break;
}
case RsGxsMailEncryptionMode::RSA:
{
uint8_t* encryptedData = NULL;
uint32_t encryptedSize = 0;
uint32_t encryptError = 0;
if( idService.encryptData( &pr.mailItem.payload[0],
pr.mailItem.payload.size(),
encryptedData, encryptedSize,
pr.recipient, encryptError, true ) )
{
pr.mailItem.payload.resize(encryptedSize);
memcpy( &pr.mailItem.payload[0], encryptedData,
encryptedSize );
free(encryptedData);
break;
}
else
{
std::cerr << "p3GxsMails::sendMail(...) RSA encryption failed! "
<< "error_status: " << encryptError << std::endl;
pr.status = GxsMailStatus::FAILED_ENCRYPTION;
goto processingFailed;
}
}
case RsGxsMailEncryptionMode::UNDEFINED_ENCRYPTION:
default:
std::cerr << "p3GxsMails::sendMail(...) attempt to send mail with "
<< "wrong EncryptionMode: "
<< static_cast<uint>(pr.mailItem.cryptoType)
<< " dropping mail!" << std::endl;
pr.status = GxsMailStatus::FAILED_ENCRYPTION;
goto processingFailed;
}
}
case GxsMailStatus::PENDING_PUBLISH:
{
std::cout << "p3GxsMails::sendEmail(...) sending mail to: "
<< pr.recipient
<< " with cryptoType: "
<< static_cast<uint>(pr.mailItem.cryptoType)
<< " recipientHint: " << pr.mailItem.recipientsHint
<< " receiptId: " << pr.mailItem.mailId
<< " payload size: " << pr.mailItem.payload.size()
<< std::endl;
uint32_t token;
publishMsg(token, new RsGxsMailItem(pr.mailItem));
pr.status = GxsMailStatus::PENDING_RECEIPT_RECEIVE;
break;
}
//case GxsMailStatus::PENDING_TRANSFER:
case GxsMailStatus::PENDING_RECEIPT_RECEIVE:
{
RS_STACK_MUTEX(ingoingMutex);
auto it = ingoingQueue.find(pr.mailItem.mailId);
if (it == ingoingQueue.end()) break;
RsGxsMailPresignedReceipt* rt =
dynamic_cast<RsGxsMailPresignedReceipt*>(it->second);
if( !rt || !idService.isOwnId(rt->meta.mAuthorId) ) break;
ingoingQueue.erase(it); delete rt;
pr.status = GxsMailStatus::RECEIPT_RECEIVED;
// TODO: Malicious adversary could forge messages with same mailId and
// could end up overriding the legit receipt in ingoingQueue, and
// causing also a memleak(using unordered_multimap for ingoingQueue
// may fix this?)
// TODO: Resend message if older then treshold
}
case GxsMailStatus::RECEIPT_RECEIVED:
break;
processingFailed:
case GxsMailStatus::FAILED_RECEIPT_SIGNATURE:
case GxsMailStatus::FAILED_ENCRYPTION:
default:
{
std::cout << "p3GxsMails::processRecord(" << pr.mailItem.mailId
<< ") failed with: " << static_cast<uint>(pr.status)
<< std::endl;
break;
}
}
}
void p3GxsMails::notifyClientService(const OutgoingRecord& pr)
{
RS_STACK_MUTEX(servClientsMutex);
auto it = servClients.find(pr.clientService);
if( it != servClients.end())
{
GxsMailsClient* serv(it->second);
if(serv)
{
serv->notifySendMailStatus(pr.mailItem, pr.status);
return;
}
}
std::cerr << "p3GxsMails::processRecord(...) (EE) processed"
<< " mail for unkown service: "
<< static_cast<uint32_t>(pr.clientService)
<< " fatally failed with: "
<< static_cast<uint32_t>(pr.status) << std::endl;
print_stacktrace();
}

View File

@ -23,9 +23,27 @@
#include "gxs/gxstokenqueue.h" // For GxsTokenQueue
#include "serialiser/rsgxsmailitems.h" // For RS_SERVICE_TYPE_GXS_MAIL
#include "services/p3idservice.h" // For p3IdService
#include "util/rsthreads.h"
enum class GxsMailStatus
{
PENDING_PROCESSING = 0,
PENDING_PREFERRED_GROUP,
PENDING_RECEIPT_CREATE,
PENDING_RECEIPT_SIGNATURE,
PENDING_SERIALIZATION,
PENDING_PAYLOAD_CREATE,
PENDING_PAYLOAD_ENCRYPT,
PENDING_PUBLISH,
//PENDING_TRANSFER, /// This will be useful so the user can know if the mail reached some friend node, in case of internet connection interruption
PENDING_RECEIPT_RECEIVE,
/// Records with status >= RECEIPT_RECEIVED get deleted
RECEIPT_RECEIVED,
FAILED_RECEIPT_SIGNATURE = 240,
FAILED_ENCRYPTION
};
struct p3GxsMails;
struct GxsMailsClient
{
/// Subservices identifiers (like port for TCP)
@ -38,10 +56,12 @@ struct GxsMailsClient
* @param dataSize size of the buffer
* @return true if dispatching goes fine, false otherwise
*/
virtual bool receiveGxsMail( const RsGxsMailItem* originalMessage,
virtual bool receiveGxsMail( const RsGxsMailItem& originalMessage,
const uint8_t* data, uint32_t dataSize ) = 0;
};
virtual bool notifySendMailStatus( const RsGxsMailItem& originalMessage,
GxsMailStatus status ) = 0;
};
struct p3GxsMails : RsGenExchange, GxsTokenQueue
{
@ -51,7 +71,9 @@ struct p3GxsMails : RsGenExchange, GxsTokenQueue
RS_SERVICE_TYPE_GXS_MAIL, &identities,
AuthenPolicy(), GXS_STORAGE_PERIOD ),
GxsTokenQueue(this), idService(identities),
servClientsMutex("p3GxsMails client services map mutex") {}
servClientsMutex("p3GxsMails client services map mutex"),
outgoingMutex("p3GxsMails outgoing queue map mutex"),
ingoingMutex("p3GxsMails ingoing queue map mutex") {}
/**
* Send an email to recipient, in the process author of the email is
@ -61,37 +83,43 @@ struct p3GxsMails : RsGenExchange, GxsTokenQueue
* This method is part of the public interface of this service.
* @return true if the mail will be sent, false if not
*/
bool sendMail( GxsMailsClient::GxsMailSubServices service,
bool sendMail( RsGxsMailId& mailId,
GxsMailsClient::GxsMailSubServices service,
const RsGxsId& own_gxsid, const RsGxsId& recipient,
const uint8_t* data, uint32_t size,
RsGxsMailBaseItem::EncryptionMode cm = RsGxsMailBaseItem::RSA
RsGxsMailEncryptionMode cm = RsGxsMailEncryptionMode::RSA
);
/**
* This method is part of the public interface of this service.
* @return false if mail is not found in outgoing queue, true otherwise
*/
bool querySendMailStatus( RsGxsMailId mailId, GxsMailStatus& st );
/**
* Register a client service to p3GxsMails to receive mails via
* GxsMailsClient::receiveGxsMail(...) callback
* This method is part of the public interface of this service.
*/
void registerGxsMailsClient( GxsMailsClient::GxsMailSubServices serviceType,
GxsMailsClient* service );
/// @see RsGenExchange::getServiceInfo()
virtual RsServiceInfo getServiceInfo() { return RsServiceInfo( RS_SERVICE_TYPE_GXS_MAIL, "GXS Mails", 0, 1, 0, 1 ); }
private:
/// @see GxsTokenQueue::handleResponse(uint32_t token, uint32_t req_type)
virtual void handleResponse(uint32_t token, uint32_t req_type);
/// @see RsGenExchange::service_tick()
virtual void service_tick();
/// @see RsGenExchange::getServiceInfo()
virtual RsServiceInfo getServiceInfo() { return RsServiceInfo( RS_SERVICE_TYPE_GXS_MAIL, "GXS Mails", 0, 1, 0, 1 ); }
/// @see RsGenExchange::service_CreateGroup(...)
RsGenExchange::ServiceCreate_Return service_CreateGroup(RsGxsGrpItem* grpItem, RsTlvSecurityKeySet&);
protected:
/// @see RsGenExchange::notifyChanges(std::vector<RsGxsNotify *> &changes)
void notifyChanges(std::vector<RsGxsNotify *> &changes);
private:
/** Time interval of inactivity before a distribution group is unsubscribed.
* Approximatively 3 months seems ok ATM. */
const static int32_t UNUSED_GROUP_UNSUBSCRIBE_INTERVAL = 0x76A700;
@ -101,7 +129,7 @@ private:
* very fast taking in account we are handling mails for the whole network.
* We do prefer to resend a not acknowledged yet mail after
* GXS_STORAGE_PERIOD has passed and keep it little.
* Tought it can't be too little as this may cause signed acknowledged to
* Tought it can't be too little as this may cause signed receipts to
* get lost thus causing resend and fastly grow perceived async latency, in
* case two sporadically connected users sends mails each other.
* TODO: While it is ok for signed acknowledged to stays in the DB for a
@ -132,6 +160,33 @@ private:
std::map<GxsMailsClient::GxsMailSubServices, GxsMailsClient*> servClients;
RsMutex servClientsMutex;
struct OutgoingRecord
{
OutgoingRecord( RsGxsId rec, GxsMailsClient::GxsMailSubServices cs,
const uint8_t* data, uint32_t size ) :
status(GxsMailStatus::PENDING_PROCESSING), recipient(rec),
clientService(cs)
{
mailData.resize(size);
memcpy(&mailData[0], data, size);
}
GxsMailStatus status;
RsGxsId recipient;
RsGxsMailItem mailItem; /// Don't use a pointer would be invalid after publish
std::vector<uint8_t> mailData;
GxsMailsClient::GxsMailSubServices clientService;
RsNxsMailPresignedReceipt presignedReceipt;
};
/// Keep track of mails while being processed
typedef std::map<RsGxsMailId, OutgoingRecord> prMap;
prMap outgoingQueue;
RsMutex outgoingMutex;
void processOutgoingRecord(OutgoingRecord& r);
typedef std::map<RsGxsMailId, RsGxsMailBaseItem*> inMap;
inMap ingoingQueue;
RsMutex ingoingMutex;
/// Request groups list to GXS backend. Async method.
bool requestGroupsData(const std::list<RsGxsGroupId>* groupIds = NULL);
@ -171,27 +226,69 @@ private:
const uint8_t* decrypted_data,
uint32_t decrypted_data_size );
bool preparePresignedReceipt( const RsGxsMailItem& mail,
RsNxsMailPresignedReceipt& receipt );
void notifyClientService(const OutgoingRecord& pr);
};
struct TestGxsMailClientService : GxsMailsClient
struct TestGxsMailClientService : GxsMailsClient, RsSingleJobThread
{
TestGxsMailClientService(p3GxsMails& gmxMailService)
TestGxsMailClientService( p3GxsMails& gxsMailService,
p3IdService& gxsIdService ) :
mailService(gxsMailService), idService(gxsIdService)
{
gmxMailService.registerGxsMailsClient( GxsMailSubServices::TEST_SERVICE,
this );
mailService.registerGxsMailsClient( GxsMailSubServices::TEST_SERVICE,
this );
}
/// @see GxsMailsClient::receiveGxsMail(...)
virtual bool receiveGxsMail( const RsGxsMailItem* originalMessage,
virtual bool receiveGxsMail( const RsGxsMailItem& originalMessage,
const uint8_t* data, uint32_t dataSize )
{
std::cout << "TestGxsMailClientService::receiveGxsMail(...) got message"
<< " from: " << originalMessage->meta.mAuthorId << std::endl
<< "\t" << std::string((char*)data, dataSize) << std::endl;
<< " from: " << originalMessage.meta.mAuthorId << std::endl
<< "\t>" << std::string((char*)data, dataSize) << "<"
<< std::endl;
return true;
}
/// @see GxsMailsClient::notifyMailStatus(...)
virtual bool notifySendMailStatus( const RsGxsMailItem& originalMessage,
GxsMailStatus status )
{
std::cout << "TestGxsMailClientService::notifyMailsStatus(...) for: "
<< originalMessage.mailId << " status: "
<< static_cast<uint>(status) << std::endl;
if( status == GxsMailStatus::RECEIPT_RECEIVED )
std::cout << "\t It mean Receipt has been Received!" << std::endl;
return true;
}
/// @see RsSingleJobThread::run()
virtual void run()
{
usleep(10*1000*1000);
RsGxsId gxsidA("d0df7474bdde0464679e6ef787890287");
RsGxsId gxsidB("d060bea09dfa14883b5e6e517eb580cd");
RsGxsMailId mailId = 0;
if(idService.isOwnId(gxsidA))
{
std::string ciao("CiAone!");
mailService.sendMail( mailId, GxsMailsClient::TEST_SERVICE, gxsidA,
gxsidB,
reinterpret_cast<const uint8_t*>(ciao.data()),
ciao.size() );
}
else if(idService.isOwnId(gxsidB))
{
std::string ciao("CiBuono!");
mailService.sendMail( mailId, GxsMailsClient::TEST_SERVICE, gxsidB,
gxsidA,
reinterpret_cast<const uint8_t*>(ciao.data()),
ciao.size() );
}
}
private:
p3GxsMails& mailService;
p3IdService& idService;
};