p3GxsMails queues persistents accross RS sessions

RsServer properly handle deletion (childs are not yet)
p3GxsMails properly handle deletion
RsControl::instance() use proper static initialization
p3GxsMails register configuration files at right time
RsGxsMailBaseItem take in account offset in header pointer
RsGxsMailSerializer use C++11 safe enum class for items types
RsGxsMailItem take in account offset in header pointer
RsGxsMailItem::deserialize(...) properly calculate final offset
p3GxsMails::handleResponse(...) delete group items after usage
move ex inner struct OutgoingRecord to gxs mail items header
p3MsgService::saveList removed unused variable
p3MsgService::notifyDataStatus(...) take in account multiple backends
p3MsgService::receiveGxsMail(...) take in account multiple backends
p3MsgService::receiveGRouterData(...) take in account multiple backends
Added prersistence to p3MsgService::gxsOngoingMessages
This commit is contained in:
Gioacchino Mazzurco 2017-02-26 00:46:02 +01:00
parent 748e75d3e1
commit 379fb97062
10 changed files with 535 additions and 201 deletions

View File

@ -53,9 +53,10 @@ int InitRetroShare(int argc, char **argv, RsInit *config);
class RsControl /* The Main Interface Class - for controlling the server */ class RsControl /* The Main Interface Class - for controlling the server */
{ {
public: public:
static RsControl *instance() ; /// TODO: This should return a reference instead of a pointer!
static void earlyInitNotificationSystem() { instance() ; } static RsControl *instance();
static void earlyInitNotificationSystem() { instance(); }
/* Real Startup Fn */ /* Real Startup Fn */
virtual int StartupRetroShare() = 0; virtual int StartupRetroShare() = 0;

View File

@ -122,7 +122,7 @@ RsServer::RsServer()
RsServer::~RsServer() RsServer::~RsServer()
{ {
return; delete mGxsMails;
} }
/* General Internal Helper Functions /* General Internal Helper Functions

View File

@ -179,6 +179,7 @@ class RsServer: public RsControl, public RsTickingThread
// p3GxsForums *mGxsForums; // p3GxsForums *mGxsForums;
// p3GxsChannels *mGxsChannels; // p3GxsChannels *mGxsChannels;
// p3Wire *mWire; // p3Wire *mWire;
p3GxsMails* mGxsMails;
/* Config */ /* Config */
p3ConfigMgr *mConfigMgr; p3ConfigMgr *mConfigMgr;

View File

@ -944,14 +944,11 @@ RsGRouter *rsGRouter = NULL ;
RsControl *RsControl::instance() RsControl *RsControl::instance()
{ {
static RsServer *rsicontrol = NULL ; static RsServer rsicontrol;
return &rsicontrol;
if(rsicontrol == NULL)
rsicontrol = new RsServer();
return rsicontrol;
} }
/* /*
* The Real RetroShare Startup Function. * The Real RetroShare Startup Function.
*/ */
@ -1491,14 +1488,13 @@ int RsServer::StartupRetroShare()
RsGeneralDataService* gxsmail_ds = new RsDataService( RsGeneralDataService* gxsmail_ds = new RsDataService(
currGxsDir + "/", "gxsmails_db", RS_SERVICE_TYPE_GXS_MAIL, currGxsDir + "/", "gxsmails_db", RS_SERVICE_TYPE_GXS_MAIL,
NULL, rsInitConfig->gxs_passwd ); NULL, rsInitConfig->gxs_passwd );
p3GxsMails* mGxsMails = new p3GxsMails(gxsmail_ds, NULL, *mGxsIdService); mGxsMails = new p3GxsMails(gxsmail_ds, NULL, *mGxsIdService);
RsGxsNetService* gxsmails_ns = new RsGxsNetService( RsGxsNetService* gxsmails_ns = new RsGxsNetService(
RS_SERVICE_TYPE_GXS_MAIL, gxsmail_ds, nxsMgr, mGxsMails, RS_SERVICE_TYPE_GXS_MAIL, gxsmail_ds, nxsMgr, mGxsMails,
mGxsMails->getServiceInfo(), mReputations, mGxsCircles, mGxsMails->getServiceInfo(), mReputations, mGxsCircles,
mGxsIdService, pgpAuxUtils); mGxsIdService, pgpAuxUtils);
mGxsMails->setNetworkExchangeService(gxsmails_ns); mGxsMails->setNetworkExchangeService(gxsmails_ns);
pqih->addService(gxsmails_ns, true); pqih->addService(gxsmails_ns, true);
mConfigMgr->addConfiguration("gxs_mail.cfg", gxsmails_ns);
TestGxsMailClientService* tgms = TestGxsMailClientService* tgms =
new TestGxsMailClientService(*mGxsMails, *mGxsIdService); new TestGxsMailClientService(*mGxsMails, *mGxsIdService);
@ -1673,13 +1669,20 @@ int RsServer::StartupRetroShare()
#ifdef ENABLE_GROUTER #ifdef ENABLE_GROUTER
mConfigMgr->addConfiguration("grouter.cfg", gr); mConfigMgr->addConfiguration("grouter.cfg", gr);
#endif #endif
mConfigMgr->addConfiguration("p3identity.cfg", mGxsIdService);
#ifdef RS_USE_BITDHT #ifdef RS_USE_BITDHT
mConfigMgr->addConfiguration("bitdht.cfg", mBitDht); mConfigMgr->addConfiguration("bitdht.cfg", mBitDht);
#endif #endif
#ifdef RS_ENABLE_GXS #ifdef RS_ENABLE_GXS
# ifdef RS_GXS_MAIL
mConfigMgr->addConfiguration("gxs_mail_ns.cfg", gxsmails_ns);
mConfigMgr->addConfiguration("gxs_mail.cfg", mGxsMails);
# endif
mConfigMgr->addConfiguration("p3identity.cfg", mGxsIdService);
mConfigMgr->addConfiguration("identity.cfg", gxsid_ns); mConfigMgr->addConfiguration("identity.cfg", gxsid_ns);
mConfigMgr->addConfiguration("gxsforums.cfg", gxsforums_ns); mConfigMgr->addConfiguration("gxsforums.cfg", gxsforums_ns);
mConfigMgr->addConfiguration("gxschannels.cfg", gxschannels_ns); mConfigMgr->addConfiguration("gxschannels.cfg", gxschannels_ns);

View File

@ -24,7 +24,7 @@ const RsGxsId RsGxsMailItem::allRecipientsHint("FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF
bool RsGxsMailBaseItem::serialize(uint8_t* data, uint32_t size, bool RsGxsMailBaseItem::serialize(uint8_t* data, uint32_t size,
uint32_t& offset) const uint32_t& offset) const
{ {
bool ok = setRsItemHeader(data, size, PacketId(), size); bool ok = setRsItemHeader(data+offset, size, PacketId(), size);
ok = ok && (offset += 8); // Take header in account ok = ok && (offset += 8); // Take header in account
ok = ok && setRawUInt64(data, size, &offset, mailId); ok = ok && setRawUInt64(data, size, &offset, mailId);
return ok; return ok;
@ -33,9 +33,11 @@ bool RsGxsMailBaseItem::serialize(uint8_t* data, uint32_t size,
bool RsGxsMailBaseItem::deserialize(const uint8_t* data, uint32_t& size, bool RsGxsMailBaseItem::deserialize(const uint8_t* data, uint32_t& size,
uint32_t& offset) uint32_t& offset)
{ {
void* dataPtr = reinterpret_cast<void*>(const_cast<uint8_t*>(data)); void* hdrPtr = const_cast<uint8_t*>(data+offset);
uint32_t rssize = getRsItemSize(dataPtr); uint32_t rssize = getRsItemSize(hdrPtr);
uint32_t roffset = offset + 8; // Take header in account uint32_t roffset = offset + 8; // Take header in account
void* dataPtr = const_cast<uint8_t*>(data);
bool ok = rssize <= size; bool ok = rssize <= size;
ok = ok && getRawUInt64(dataPtr, rssize, &roffset, &mailId); ok = ok && getRawUInt64(dataPtr, rssize, &roffset, &mailId);
if(ok) { size = rssize; offset = roffset; } if(ok) { size = rssize; offset = roffset; }
@ -57,27 +59,34 @@ bool RsGxsMailSerializer::serialise(RsItem* item, void* data, uint32_t* size)
} }
uint8_t* dataPtr = reinterpret_cast<uint8_t*>(data); uint8_t* dataPtr = reinterpret_cast<uint8_t*>(data);
bool ok = true; bool ok = false;
switch(item->PacketSubType()) switch(static_cast<GxsMailItemsSubtypes>(item->PacketSubType()))
{ {
case GXS_MAIL_SUBTYPE_MAIL: case GxsMailItemsSubtypes::GXS_MAIL_SUBTYPE_MAIL:
{ {
uint32_t offset = 0; uint32_t offset = 0;
RsGxsMailItem* i = dynamic_cast<RsGxsMailItem*>(item); RsGxsMailItem* i = dynamic_cast<RsGxsMailItem*>(item);
ok = i->serialize(dataPtr, itemSize, offset); ok = i && i->serialize(dataPtr, itemSize, offset);
break; break;
} }
case GXS_MAIL_SUBTYPE_RECEIPT: case GxsMailItemsSubtypes::GXS_MAIL_SUBTYPE_RECEIPT:
{ {
RsGxsMailPresignedReceipt* i = RsGxsMailPresignedReceipt* i =
dynamic_cast<RsGxsMailPresignedReceipt*>(item); dynamic_cast<RsGxsMailPresignedReceipt*>(item);
uint32_t offset = 0; uint32_t offset = 0;
ok = i->serialize(dataPtr, itemSize, offset); ok = i && i->serialize(dataPtr, itemSize, offset);
break; break;
} }
case GXS_MAIL_SUBTYPE_GROUP: case GxsMailItemsSubtypes::GXS_MAIL_SUBTYPE_GROUP:
ok = setRsItemHeader(data, itemSize, item->PacketId(), itemSize); ok = setRsItemHeader(data, itemSize, item->PacketId(), itemSize);
break; break;
case GxsMailItemsSubtypes::OUTGOING_RECORD_ITEM:
{
uint32_t offset = 0;
OutgoingRecord* i = dynamic_cast<OutgoingRecord*>(item);
ok = i && i->serialize(dataPtr, itemSize, offset);
break;
}
default: ok = false; break; default: ok = false; break;
} }
@ -91,3 +100,113 @@ bool RsGxsMailSerializer::serialise(RsItem* item, void* data, uint32_t* size)
return false; return false;
} }
OutgoingRecord::OutgoingRecord( RsGxsId rec, GxsMailSubServices cs,
const uint8_t* data, uint32_t size ) :
RsItem( RS_PKT_VERSION_SERVICE, RS_SERVICE_TYPE_GXS_MAIL,
static_cast<uint8_t>(GxsMailItemsSubtypes::OUTGOING_RECORD_ITEM) ),
status(GxsMailStatus::PENDING_PROCESSING), recipient(rec),
clientService(cs)
{
mailData.resize(size);
memcpy(&mailData[0], data, size);
}
void OutgoingRecord::clear()
{
status = GxsMailStatus::UNKNOWN;
recipient.clear();
mailItem.clear();
mailData.clear();
clientService = GxsMailSubServices::UNKNOWN;
presignedReceipt.clear();
}
std::ostream& OutgoingRecord::print(std::ostream& out, uint16_t)
{ return out << "TODO: OutgoingRecordItem::print(...)"; }
OutgoingRecord::OutgoingRecord() :
RsItem( RS_PKT_VERSION_SERVICE, RS_SERVICE_TYPE_GXS_MAIL,
static_cast<uint8_t>(GxsMailItemsSubtypes::OUTGOING_RECORD_ITEM) )
{ clear();}
uint32_t OutgoingRecord::size() const
{
return 8 + // Header
1 + // status
recipient.serial_size() +
mailItem.size() +
4 + // sizeof(mailData.size())
mailData.size() +
2 + // clientService
presignedReceipt.serial_size();
}
bool OutgoingRecord::serialize( uint8_t* data, uint32_t size,
uint32_t& offset) const
{
bool ok = true;
ok = ok && setRsItemHeader(data+offset, size, PacketId(), size)
&& (offset += 8); // Take header in account
ok = ok && setRawUInt8(data, size, &offset, static_cast<uint8_t>(status));
ok = ok && recipient.serialise(data, size, offset);
uint32_t tmpOffset = 0;
uint32_t tmpSize = mailItem.size();
ok = ok && mailItem.serialize(data+offset, tmpSize, tmpOffset)
&& (offset += tmpOffset);
uint32_t dSize = mailData.size();
ok = ok && setRawUInt32(data, size, &offset, dSize)
&& memcpy(data+offset, &mailData[0], dSize) && (offset += dSize);
ok = ok && setRawUInt16( data, size, &offset,
static_cast<uint16_t>(clientService) );
dSize = presignedReceipt.serial_size();
ok = ok && presignedReceipt.serialise(data+offset, dSize)
&& (offset += dSize);
return ok;
}
bool OutgoingRecord::deserialize(
const uint8_t* data, uint32_t& size, uint32_t& offset)
{
bool ok = true;
void* dataPtr = const_cast<uint8_t*>(data);
offset += 8; // Header
uint8_t tmpStatus = 0;
ok = ok && getRawUInt8(dataPtr, size, &offset, &tmpStatus);
status = static_cast<GxsMailStatus>(tmpStatus);
uint32_t tmpSize = size;
ok = ok && recipient.deserialise(dataPtr, tmpSize, offset);
void* hdrPtr = const_cast<uint8_t*>(data+offset);
tmpSize = getRsItemSize(hdrPtr);
uint32_t tmpOffset = 0;
ok = ok && mailItem.deserialize(static_cast<uint8_t*>(hdrPtr), tmpSize, tmpOffset);
ok = ok && (offset += tmpOffset);
tmpSize = size;
ok = getRawUInt32(dataPtr, tmpSize, &offset, &tmpSize);
ok = ok && (tmpSize+offset < size);
ok = ok && (mailData.resize(tmpSize), memcpy(&mailData[0], data, tmpSize));
ok = ok && (offset += tmpSize);
uint16_t cs = 0;
ok = ok && getRawUInt16(dataPtr, offset+2, &offset, &cs);
clientService = static_cast<GxsMailSubServices>(cs);
tmpSize = size;
ok = ok && presignedReceipt.deserialize(data, tmpSize, offset);
return ok;
}

View File

@ -26,12 +26,21 @@
#include "retroshare/rsgxscircles.h" // For: GXS_CIRCLE_TYPE_PUBLIC #include "retroshare/rsgxscircles.h" // For: GXS_CIRCLE_TYPE_PUBLIC
#include "services/p3idservice.h" #include "services/p3idservice.h"
/// Subservices identifiers (like port for TCP)
enum class GxsMailSubServices : uint16_t
{
UNKNOWN = 0,
TEST_SERVICE = 1,
P3_MSG_SERVICE = 2
};
/// Values must fit into uint8_t /// Values must fit into uint8_t
enum GxsMailItemsSubtypes enum class GxsMailItemsSubtypes : uint8_t
{ {
GXS_MAIL_SUBTYPE_MAIL = 1, GXS_MAIL_SUBTYPE_MAIL = 1,
GXS_MAIL_SUBTYPE_RECEIPT = 2, GXS_MAIL_SUBTYPE_RECEIPT = 2,
GXS_MAIL_SUBTYPE_GROUP = 3 GXS_MAIL_SUBTYPE_GROUP = 3,
OUTGOING_RECORD_ITEM = 4
}; };
typedef uint64_t RsGxsMailId; typedef uint64_t RsGxsMailId;
@ -67,7 +76,8 @@ struct RsGxsMailBaseItem : RsGxsMsgItem
struct RsGxsMailPresignedReceipt : RsGxsMailBaseItem struct RsGxsMailPresignedReceipt : RsGxsMailBaseItem
{ {
RsGxsMailPresignedReceipt() : RsGxsMailBaseItem(GXS_MAIL_SUBTYPE_RECEIPT) {} RsGxsMailPresignedReceipt() :
RsGxsMailBaseItem(GxsMailItemsSubtypes::GXS_MAIL_SUBTYPE_RECEIPT) {}
}; };
enum class RsGxsMailEncryptionMode : uint8_t enum class RsGxsMailEncryptionMode : uint8_t
@ -79,7 +89,8 @@ enum class RsGxsMailEncryptionMode : uint8_t
struct RsGxsMailItem : RsGxsMailBaseItem struct RsGxsMailItem : RsGxsMailBaseItem
{ {
RsGxsMailItem() : RsGxsMailBaseItem(GXS_MAIL_SUBTYPE_MAIL), RsGxsMailItem() :
RsGxsMailBaseItem(GxsMailItemsSubtypes::GXS_MAIL_SUBTYPE_MAIL),
cryptoType(RsGxsMailEncryptionMode::UNDEFINED_ENCRYPTION) {} cryptoType(RsGxsMailEncryptionMode::UNDEFINED_ENCRYPTION) {}
RsGxsMailEncryptionMode cryptoType; RsGxsMailEncryptionMode cryptoType;
@ -158,17 +169,21 @@ struct RsGxsMailItem : RsGxsMailBaseItem
} }
bool deserialize(const uint8_t* data, uint32_t& size, uint32_t& offset) bool deserialize(const uint8_t* data, uint32_t& size, uint32_t& offset)
{ {
void* dataPtr = reinterpret_cast<void*>(const_cast<uint8_t*>(data)); void* sizePtr = const_cast<uint8_t*>(data+offset);
uint32_t rssize = getRsItemSize(dataPtr); uint32_t rssize = getRsItemSize(sizePtr);
uint32_t roffset = offset; uint32_t roffset = offset;
bool ok = rssize <= size && size < MAX_SIZE; bool ok = rssize <= size && size < MAX_SIZE;
ok = ok && RsGxsMailBaseItem::deserialize(data, rssize, roffset); ok = ok && RsGxsMailBaseItem::deserialize(data, rssize, roffset);
void* dataPtr = const_cast<uint8_t*>(data);
uint8_t crType; uint8_t crType;
ok = ok && getRawUInt8(dataPtr, rssize, &roffset, &crType); ok = ok && getRawUInt8(dataPtr, rssize, &roffset, &crType);
cryptoType = static_cast<RsGxsMailEncryptionMode>(crType); cryptoType = static_cast<RsGxsMailEncryptionMode>(crType);
ok = ok && recipientsHint.deserialise(dataPtr, rssize, roffset); ok = ok && recipientsHint.deserialise(dataPtr, rssize, roffset);
uint32_t psz = rssize - roffset; uint32_t psz = rssize - roffset;
ok = ok && (payload.resize(psz), memcpy(&payload[0], data+roffset, psz)); ok = ok && (payload.resize(psz), memcpy(&payload[0], data+roffset, psz));
ok = ok && (roffset += psz);
if(ok) { size = rssize; offset = roffset; } if(ok) { size = rssize; offset = roffset; }
else size = 0; else size = 0;
return ok; return ok;
@ -188,7 +203,9 @@ struct RsGxsMailItem : RsGxsMailBaseItem
struct RsGxsMailGroupItem : RsGxsGrpItem struct RsGxsMailGroupItem : RsGxsGrpItem
{ {
RsGxsMailGroupItem() : RsGxsMailGroupItem() :
RsGxsGrpItem(RS_SERVICE_TYPE_GXS_MAIL, GXS_MAIL_SUBTYPE_GROUP) RsGxsGrpItem( RS_SERVICE_TYPE_GXS_MAIL,
static_cast<uint8_t>(
GxsMailItemsSubtypes::GXS_MAIL_SUBTYPE_GROUP) )
{ {
meta.mGroupFlags = GXS_SERV::FLAG_PRIVACY_PUBLIC; meta.mGroupFlags = GXS_SERV::FLAG_PRIVACY_PUBLIC;
meta.mGroupName = "Mail"; meta.mGroupName = "Mail";
@ -200,6 +217,54 @@ struct RsGxsMailGroupItem : RsGxsGrpItem
{ return out; } { return out; }
}; };
enum class GxsMailStatus : uint8_t
{
UNKNOWN = 0,
PENDING_PROCESSING,
PENDING_PREFERRED_GROUP,
PENDING_RECEIPT_CREATE,
PENDING_RECEIPT_SIGNATURE,
PENDING_SERIALIZATION,
PENDING_PAYLOAD_CREATE,
PENDING_PAYLOAD_ENCRYPT,
PENDING_PUBLISH,
/** This will be useful so the user can know if the mail reached at least
* some friend node, in case of internet connection interruption */
//PENDING_TRANSFER,
PENDING_RECEIPT_RECEIVE,
/// Records with status >= RECEIPT_RECEIVED get deleted
RECEIPT_RECEIVED,
FAILED_RECEIPT_SIGNATURE = 240,
FAILED_ENCRYPTION
};
class RsGxsMailSerializer;
struct OutgoingRecord : RsItem
{
OutgoingRecord( RsGxsId rec, GxsMailSubServices cs,
const uint8_t* data, uint32_t size );
GxsMailStatus status;
RsGxsId recipient;
/// Don't use a pointer would be invalid after publish
RsGxsMailItem mailItem;
std::vector<uint8_t> mailData;
GxsMailSubServices clientService;
RsNxsMailPresignedReceipt presignedReceipt;
uint32_t size() const;
bool serialize(uint8_t* data, uint32_t size, uint32_t& offset) const;
bool deserialize(const uint8_t* data, uint32_t& size, uint32_t& offset);
virtual void clear();
virtual std::ostream &print(std::ostream &out, uint16_t indent = 0);
private:
friend class RsGxsMailSerializer;
OutgoingRecord();
};
struct RsGxsMailSerializer : RsSerialType struct RsGxsMailSerializer : RsSerialType
{ {
RsGxsMailSerializer() : RsSerialType( RS_PKT_VERSION_SERVICE, RsGxsMailSerializer() : RsSerialType( RS_PKT_VERSION_SERVICE,
@ -209,17 +274,23 @@ struct RsGxsMailSerializer : RsSerialType
uint32_t size(RsItem* item) uint32_t size(RsItem* item)
{ {
uint32_t sz = 0; uint32_t sz = 0;
switch(item->PacketSubType()) switch(static_cast<GxsMailItemsSubtypes>(item->PacketSubType()))
{ {
case GXS_MAIL_SUBTYPE_MAIL: case GxsMailItemsSubtypes::GXS_MAIL_SUBTYPE_MAIL:
{ {
RsGxsMailItem* i = dynamic_cast<RsGxsMailItem*>(item); RsGxsMailItem* i = dynamic_cast<RsGxsMailItem*>(item);
if(i) sz = i->size(); if(i) sz = i->size();
break; break;
} }
case GXS_MAIL_SUBTYPE_RECEIPT: case GxsMailItemsSubtypes::GXS_MAIL_SUBTYPE_RECEIPT:
sz = RsGxsMailPresignedReceipt::size(); break; sz = RsGxsMailPresignedReceipt::size(); break;
case GXS_MAIL_SUBTYPE_GROUP: sz = 8; break; case GxsMailItemsSubtypes::GXS_MAIL_SUBTYPE_GROUP: sz = 8; break;
case GxsMailItemsSubtypes::OUTGOING_RECORD_ITEM:
{
OutgoingRecord* ci = dynamic_cast<OutgoingRecord*>(item);
if(ci) sz = ci->size();
break;
}
default: break; default: break;
} }
@ -248,9 +319,9 @@ struct RsGxsMailSerializer : RsSerialType
bool ok = true; bool ok = true;
RsItem* ret = NULL; RsItem* ret = NULL;
switch (getRsItemSubType(rstype)) switch (static_cast<GxsMailItemsSubtypes>(getRsItemSubType(rstype)))
{ {
case GXS_MAIL_SUBTYPE_MAIL: case GxsMailItemsSubtypes::GXS_MAIL_SUBTYPE_MAIL:
{ {
RsGxsMailItem* i = new RsGxsMailItem(); RsGxsMailItem* i = new RsGxsMailItem();
uint32_t offset = 0; uint32_t offset = 0;
@ -258,7 +329,7 @@ struct RsGxsMailSerializer : RsSerialType
ret = i; ret = i;
break; break;
} }
case GXS_MAIL_SUBTYPE_RECEIPT: case GxsMailItemsSubtypes::GXS_MAIL_SUBTYPE_RECEIPT:
{ {
RsGxsMailPresignedReceipt* i = new RsGxsMailPresignedReceipt(); RsGxsMailPresignedReceipt* i = new RsGxsMailPresignedReceipt();
uint32_t offset = 0; uint32_t offset = 0;
@ -266,11 +337,19 @@ struct RsGxsMailSerializer : RsSerialType
ret = i; ret = i;
break; break;
} }
case GXS_MAIL_SUBTYPE_GROUP: case GxsMailItemsSubtypes::GXS_MAIL_SUBTYPE_GROUP:
{ {
ret = new RsGxsMailGroupItem(); ret = new RsGxsMailGroupItem();
break; break;
} }
case GxsMailItemsSubtypes::OUTGOING_RECORD_ITEM:
{
OutgoingRecord* i = new OutgoingRecord();
uint32_t offset = 0;
ok = ok && i->deserialize(dataPtr, *size, offset);
ret = i;
break;
}
default: default:
ok = false; ok = false;
break; break;

View File

@ -20,8 +20,18 @@
#include "util/stacktrace.h" #include "util/stacktrace.h"
p3GxsMails::~p3GxsMails()
{
p3Config::saveConfiguration();
{
RS_STACK_MUTEX(ingoingMutex);
for ( auto& kv : ingoingQueue ) delete kv.second;
}
}
bool p3GxsMails::sendMail( RsGxsMailId& mailId, bool p3GxsMails::sendMail( RsGxsMailId& mailId,
GxsMailsClient::GxsMailSubServices service, GxsMailSubServices service,
const RsGxsId& own_gxsid, const RsGxsId& recipient, const RsGxsId& own_gxsid, const RsGxsId& recipient,
const uint8_t* data, uint32_t size, const uint8_t* data, uint32_t size,
RsGxsMailEncryptionMode cm ) RsGxsMailEncryptionMode cm )
@ -69,13 +79,12 @@ bool p3GxsMails::querySendMailStatus(RsGxsMailId mailId, GxsMailStatus& st)
} }
void p3GxsMails::registerGxsMailsClient( void p3GxsMails::registerGxsMailsClient(
GxsMailsClient::GxsMailSubServices serviceType, GxsMailsClient* service) GxsMailSubServices serviceType, GxsMailsClient* service)
{ {
RS_STACK_MUTEX(servClientsMutex); RS_STACK_MUTEX(servClientsMutex);
servClients[serviceType] = service; servClients[serviceType] = service;
} }
void p3GxsMails::handleResponse(uint32_t token, uint32_t req_type) void p3GxsMails::handleResponse(uint32_t token, uint32_t req_type)
{ {
std::cout << "p3GxsMails::handleResponse(" << token << ", " << req_type std::cout << "p3GxsMails::handleResponse(" << token << ", " << req_type
@ -87,8 +96,7 @@ void p3GxsMails::handleResponse(uint32_t token, uint32_t req_type)
std::vector<RsGxsGrpItem*> groups; std::vector<RsGxsGrpItem*> groups;
getGroupData(token, groups); getGroupData(token, groups);
for( std::vector<RsGxsGrpItem *>::iterator it = groups.begin(); for( auto grp : groups )
it != groups.end(); ++it )
{ {
/* For each group check if it is better candidate then /* For each group check if it is better candidate then
* preferredGroupId, if it is supplant it and subscribe if it is not * preferredGroupId, if it is supplant it and subscribe if it is not
@ -98,17 +106,39 @@ void p3GxsMails::handleResponse(uint32_t token, uint32_t req_type)
* unsubscribe. * unsubscribe.
*/ */
const RsGroupMetaData& meta = (*it)->meta; const RsGroupMetaData& meta = grp->meta;
bool subscribed = IS_GROUP_SUBSCRIBED(meta.mSubscribeFlags); bool subscribed = IS_GROUP_SUBSCRIBED(meta.mSubscribeFlags);
bool old = olderThen( meta.mLastPost, bool old = olderThen( meta.mLastPost,
UNUSED_GROUP_UNSUBSCRIBE_INTERVAL ); UNUSED_GROUP_UNSUBSCRIBE_INTERVAL );
bool supersede = supersedePreferredGroup(meta.mGroupId); bool supersede = supersedePreferredGroup(meta.mGroupId);
uint32_t token; uint32_t token;
if( !subscribed && ( !old || supersede )) bool shoudlSubscribe = !subscribed && ( !old || supersede );
bool shoudlUnSubscribe = subscribed && old
&& meta.mGroupId != preferredGroupId;
if(shoudlSubscribe)
subscribeToGroup(token, meta.mGroupId, true); subscribeToGroup(token, meta.mGroupId, true);
else if( subscribed && old ) else if(shoudlUnSubscribe)
subscribeToGroup(token, meta.mGroupId, false); subscribeToGroup(token, meta.mGroupId, false);
#ifdef GXS_MAIL_GRP_DEBUG
char buff[30];
struct tm* timeinfo;
timeinfo = localtime(&meta.mLastPost);
strftime(buff, sizeof(buff), "%Y %b %d %H:%M", timeinfo);
std::cout << "p3GxsMails::handleResponse(...) GROUPS_LIST "
<< "meta.mGroupId: " << meta.mGroupId
<< " meta.mLastPost: " << buff
<< " subscribed: " << subscribed
<< " old: " << old
<< " shoudlSubscribe: " << shoudlSubscribe
<< " shoudlUnSubscribe: " << shoudlUnSubscribe
<< std::endl;
#endif // GXS_MAIL_GRP_DEBUG
delete grp;
} }
if(preferredGroupId.isNull()) if(preferredGroupId.isNull())
@ -150,10 +180,10 @@ void p3GxsMails::handleResponse(uint32_t token, uint32_t req_type)
for( vT::const_iterator mIt = mv.begin(); mIt != mv.end(); ++mIt ) for( vT::const_iterator mIt = mv.begin(); mIt != mv.end(); ++mIt )
{ {
RsGxsMsgItem* gIt = *mIt; RsGxsMsgItem* gIt = *mIt;
switch(gIt->PacketSubType()) switch(static_cast<GxsMailItemsSubtypes>(gIt->PacketSubType()))
{ {
case GXS_MAIL_SUBTYPE_MAIL: case GxsMailItemsSubtypes::GXS_MAIL_SUBTYPE_MAIL:
case GXS_MAIL_SUBTYPE_RECEIPT: case GxsMailItemsSubtypes::GXS_MAIL_SUBTYPE_RECEIPT:
{ {
RsGxsMailBaseItem* mb = RsGxsMailBaseItem* mb =
dynamic_cast<RsGxsMailBaseItem*>(*mIt); dynamic_cast<RsGxsMailBaseItem*>(*mIt);
@ -211,9 +241,10 @@ void p3GxsMails::service_tick()
RS_STACK_MUTEX(ingoingMutex); RS_STACK_MUTEX(ingoingMutex);
for( auto it = ingoingQueue.begin(); it != ingoingQueue.end(); ) for( auto it = ingoingQueue.begin(); it != ingoingQueue.end(); )
{ {
switch (it->second->PacketSubType()) switch(static_cast<GxsMailItemsSubtypes>(
it->second->PacketSubType()))
{ {
case GXS_MAIL_SUBTYPE_MAIL: case GxsMailItemsSubtypes::GXS_MAIL_SUBTYPE_MAIL:
{ {
RsGxsMailItem* msg = dynamic_cast<RsGxsMailItem*>(it->second); RsGxsMailItem* msg = dynamic_cast<RsGxsMailItem*>(it->second);
if(!msg) if(!msg)
@ -238,7 +269,7 @@ void p3GxsMails::service_tick()
} }
break; break;
} }
case GXS_MAIL_SUBTYPE_RECEIPT: case GxsMailItemsSubtypes::GXS_MAIL_SUBTYPE_RECEIPT:
{ {
RsGxsMailPresignedReceipt* rcpt = RsGxsMailPresignedReceipt* rcpt =
dynamic_cast<RsGxsMailPresignedReceipt*>(it->second); dynamic_cast<RsGxsMailPresignedReceipt*>(it->second);
@ -276,9 +307,11 @@ void p3GxsMails::service_tick()
} }
} }
RsGenExchange::ServiceCreate_Return p3GxsMails::service_CreateGroup(RsGxsGrpItem* grpItem, RsTlvSecurityKeySet& /*keySet*/) RsGenExchange::ServiceCreate_Return p3GxsMails::service_CreateGroup(
RsGxsGrpItem* grpItem, RsTlvSecurityKeySet& /*keySet*/ )
{ {
std::cout << "p3GxsMails::service_CreateGroup(...) " << grpItem->meta.mGroupId << std::endl; std::cout << "p3GxsMails::service_CreateGroup(...) "
<< grpItem->meta.mGroupId << std::endl;
return SERVICE_CREATE_SUCCESS; return SERVICE_CREATE_SUCCESS;
} }
@ -434,8 +467,7 @@ bool p3GxsMails::dispatchDecryptedMail( const RsGxsMailItem* received_msg,
<< "happening!" << std::endl; << "happening!" << std::endl;
return false; return false;
} }
GxsMailsClient::GxsMailSubServices rsrvc; GxsMailSubServices rsrvc = static_cast<GxsMailSubServices>(csri);
rsrvc = static_cast<GxsMailsClient::GxsMailSubServices>(csri);
RsNxsMailPresignedReceipt* receipt = new RsNxsMailPresignedReceipt(); RsNxsMailPresignedReceipt* receipt = new RsNxsMailPresignedReceipt();
uint32_t rcptsize = decrypted_data_size; uint32_t rcptsize = decrypted_data_size;
@ -659,3 +691,85 @@ void p3GxsMails::notifyClientService(const OutgoingRecord& pr)
print_stacktrace(); print_stacktrace();
} }
RsSerialiser* p3GxsMails::setupSerialiser()
{
RsSerialiser* rss = new RsSerialiser;
rss->addSerialType(new RsGxsMailSerializer);
return rss;
}
bool p3GxsMails::saveList(bool &cleanup, std::list<RsItem *>& saveList)
{
std::cout << "p3GxsMails::saveList(...)" << saveList.size() << " "
<< ingoingQueue.size() << " " << outgoingQueue.size()
<< std::endl;
outgoingMutex.lock();
ingoingMutex.lock();
for ( auto& kv : outgoingQueue ) saveList.push_back(&kv.second);
for ( auto& kv : ingoingQueue ) saveList.push_back(kv.second);
std::cout << "p3GxsMails::saveList(...)" << saveList.size() << " "
<< ingoingQueue.size() << " " << outgoingQueue.size()
<< std::endl;
cleanup = false;
return true;
}
void p3GxsMails::saveDone()
{
outgoingMutex.unlock();
ingoingMutex.unlock();
}
bool p3GxsMails::loadList(std::list<RsItem *>&loadList)
{
std::cout << "p3GxsMails::loadList(...) " << loadList.size() << " "
<< ingoingQueue.size() << " " << outgoingQueue.size()
<< std::endl;
for(auto& v : loadList)
switch(static_cast<GxsMailItemsSubtypes>(v->PacketSubType()))
{
case GxsMailItemsSubtypes::GXS_MAIL_SUBTYPE_MAIL:
case GxsMailItemsSubtypes::GXS_MAIL_SUBTYPE_RECEIPT:
{
RsGxsMailBaseItem* mi = dynamic_cast<RsGxsMailBaseItem*>(v);
if(mi)
{
RS_STACK_MUTEX(ingoingMutex);
ingoingQueue.insert(inMap::value_type(mi->mailId, mi));
}
break;
}
case GxsMailItemsSubtypes::OUTGOING_RECORD_ITEM:
{
OutgoingRecord* ot = dynamic_cast<OutgoingRecord*>(v);
if(ot)
{
RS_STACK_MUTEX(outgoingMutex);
outgoingQueue.insert(
prMap::value_type(ot->mailItem.mailId, *ot));
}
delete v;
break;
}
case GxsMailItemsSubtypes::GXS_MAIL_SUBTYPE_GROUP:
default:
std::cerr << "p3GxsMails::loadList(...) (EE) got item with "
<< "unhandled type: "
<< static_cast<uint>(v->PacketSubType())
<< std::endl;
delete v;
break;
}
std::cout << "p3GxsMails::loadList(...) " << loadList.size() << " "
<< ingoingQueue.size() << " " << outgoingQueue.size()
<< std::endl;
return true;
}

View File

@ -27,30 +27,9 @@
#include "services/p3idservice.h" // For p3IdService #include "services/p3idservice.h" // For p3IdService
#include "util/rsthreads.h" #include "util/rsthreads.h"
enum class GxsMailStatus
{
PENDING_PROCESSING = 0,
PENDING_PREFERRED_GROUP,
PENDING_RECEIPT_CREATE,
PENDING_RECEIPT_SIGNATURE,
PENDING_SERIALIZATION,
PENDING_PAYLOAD_CREATE,
PENDING_PAYLOAD_ENCRYPT,
PENDING_PUBLISH,
//PENDING_TRANSFER, /// This will be useful so the user can know if the mail reached some friend node, in case of internet connection interruption
PENDING_RECEIPT_RECEIVE,
/// Records with status >= RECEIPT_RECEIVED get deleted
RECEIPT_RECEIVED,
FAILED_RECEIPT_SIGNATURE = 240,
FAILED_ENCRYPTION
};
struct p3GxsMails; struct p3GxsMails;
struct GxsMailsClient struct GxsMailsClient
{ {
/// Subservices identifiers (like port for TCP)
enum GxsMailSubServices : uint16_t { TEST_SERVICE = 1, P3_MSG_SERVICE = 2 };
/** /**
* This will be called by p3GxsMails to dispatch mails to the subservice * This will be called by p3GxsMails to dispatch mails to the subservice
* @param originalMessage message as received from GXS backend (encrypted) * @param originalMessage message as received from GXS backend (encrypted)
@ -65,7 +44,7 @@ struct GxsMailsClient
GxsMailStatus status ) = 0; GxsMailStatus status ) = 0;
}; };
struct p3GxsMails : RsGenExchange, GxsTokenQueue // TODO: p3Config struct p3GxsMails : RsGenExchange, GxsTokenQueue, p3Config
{ {
p3GxsMails( RsGeneralDataService* gds, RsNetworkExchangeService* nes, p3GxsMails( RsGeneralDataService* gds, RsNetworkExchangeService* nes,
p3IdService& identities ) : p3IdService& identities ) :
@ -76,6 +55,7 @@ struct p3GxsMails : RsGenExchange, GxsTokenQueue // TODO: p3Config
servClientsMutex("p3GxsMails client services map mutex"), servClientsMutex("p3GxsMails client services map mutex"),
outgoingMutex("p3GxsMails outgoing queue map mutex"), outgoingMutex("p3GxsMails outgoing queue map mutex"),
ingoingMutex("p3GxsMails ingoing queue map mutex") {} ingoingMutex("p3GxsMails ingoing queue map mutex") {}
~p3GxsMails();
/** /**
* Send an email to recipient, in the process author of the email is * Send an email to recipient, in the process author of the email is
@ -86,7 +66,7 @@ struct p3GxsMails : RsGenExchange, GxsTokenQueue // TODO: p3Config
* @return true if the mail will be sent, false if not * @return true if the mail will be sent, false if not
*/ */
bool sendMail( RsGxsMailId& mailId, bool sendMail( RsGxsMailId& mailId,
GxsMailsClient::GxsMailSubServices service, GxsMailSubServices service,
const RsGxsId& own_gxsid, const RsGxsId& recipient, const RsGxsId& own_gxsid, const RsGxsId& recipient,
const uint8_t* data, uint32_t size, const uint8_t* data, uint32_t size,
RsGxsMailEncryptionMode cm = RsGxsMailEncryptionMode::RSA RsGxsMailEncryptionMode cm = RsGxsMailEncryptionMode::RSA
@ -103,25 +83,13 @@ struct p3GxsMails : RsGenExchange, GxsTokenQueue // TODO: p3Config
* GxsMailsClient::receiveGxsMail(...) callback * GxsMailsClient::receiveGxsMail(...) callback
* This method is part of the public interface of this service. * This method is part of the public interface of this service.
*/ */
void registerGxsMailsClient( GxsMailsClient::GxsMailSubServices serviceType, void registerGxsMailsClient( GxsMailSubServices serviceType,
GxsMailsClient* service ); GxsMailsClient* service );
/// @see RsGenExchange::getServiceInfo() /// @see RsGenExchange::getServiceInfo()
virtual RsServiceInfo getServiceInfo() { return RsServiceInfo( RS_SERVICE_TYPE_GXS_MAIL, "GXS Mails", 0, 1, 0, 1 ); } virtual RsServiceInfo getServiceInfo() { return RsServiceInfo( RS_SERVICE_TYPE_GXS_MAIL, "GXS Mails", 0, 1, 0, 1 ); }
private: private:
/// @see GxsTokenQueue::handleResponse(uint32_t token, uint32_t req_type)
virtual void handleResponse(uint32_t token, uint32_t req_type);
/// @see RsGenExchange::service_tick()
virtual void service_tick();
/// @see RsGenExchange::service_CreateGroup(...)
RsGenExchange::ServiceCreate_Return service_CreateGroup(RsGxsGrpItem* grpItem, RsTlvSecurityKeySet&);
/// @see RsGenExchange::notifyChanges(std::vector<RsGxsNotify *> &changes)
void notifyChanges(std::vector<RsGxsNotify *> &changes);
/** Time interval of inactivity before a distribution group is unsubscribed. /** Time interval of inactivity before a distribution group is unsubscribed.
* Approximatively 3 months seems ok ATM. */ * Approximatively 3 months seems ok ATM. */
const static int32_t UNUSED_GROUP_UNSUBSCRIBE_INTERVAL = 0x76A700; const static int32_t UNUSED_GROUP_UNSUBSCRIBE_INTERVAL = 0x76A700;
@ -134,7 +102,7 @@ private:
* Tought it can't be too little as this may cause signed receipts to * Tought it can't be too little as this may cause signed receipts to
* get lost thus causing resend and fastly grow perceived async latency, in * get lost thus causing resend and fastly grow perceived async latency, in
* case two sporadically connected users sends mails each other. * case two sporadically connected users sends mails each other.
* TODO: While it is ok for signed acknowledged to stays in the DB for a * While it is ok for signed acknowledged to stays in the DB for a
* full GXS_STORAGE_PERIOD, mails should be removed as soon as a valid * full GXS_STORAGE_PERIOD, mails should be removed as soon as a valid
* signed acknowledged is received for each of them. * signed acknowledged is received for each of them.
* Two weeks seems fair ATM. * Two weeks seems fair ATM.
@ -159,37 +127,54 @@ private:
p3IdService& idService; p3IdService& idService;
/// Stores pointers to client services to notify them about new mails /// Stores pointers to client services to notify them about new mails
std::map<GxsMailsClient::GxsMailSubServices, GxsMailsClient*> servClients; std::map<GxsMailSubServices, GxsMailsClient*> servClients;
RsMutex servClientsMutex; RsMutex servClientsMutex;
struct OutgoingRecord /**
{ * @brief Keep track of outgoing mails.
OutgoingRecord( RsGxsId rec, GxsMailsClient::GxsMailSubServices cs, * Records enter the queue when a mail is sent, and are removed when a
const uint8_t* data, uint32_t size ) : * receipt has been received or sending is considered definetly failed.
status(GxsMailStatus::PENDING_PROCESSING), recipient(rec), * Items are saved in config for consistence accross RetroShare shutdowns.
clientService(cs) */
{
mailData.resize(size);
memcpy(&mailData[0], data, size);
}
GxsMailStatus status;
RsGxsId recipient;
RsGxsMailItem mailItem; /// Don't use a pointer would be invalid after publish
std::vector<uint8_t> mailData;
GxsMailsClient::GxsMailSubServices clientService;
RsNxsMailPresignedReceipt presignedReceipt;
};
/// Keep track of mails while being processed
typedef std::map<RsGxsMailId, OutgoingRecord> prMap; typedef std::map<RsGxsMailId, OutgoingRecord> prMap;
prMap outgoingQueue; prMap outgoingQueue;
RsMutex outgoingMutex; RsMutex outgoingMutex;
void processOutgoingRecord(OutgoingRecord& r); void processOutgoingRecord(OutgoingRecord& r);
/**
* @brief Ingoing mail and receipt processing queue.
* Items are saved in config and then deleted in destructor for consistence
* accross RetroShare shutdowns.
*/
typedef std::unordered_multimap<RsGxsMailId, RsGxsMailBaseItem*> inMap; typedef std::unordered_multimap<RsGxsMailId, RsGxsMailBaseItem*> inMap;
inMap ingoingQueue; inMap ingoingQueue;
RsMutex ingoingMutex; RsMutex ingoingMutex;
/// @see GxsTokenQueue::handleResponse(uint32_t token, uint32_t req_type)
virtual void handleResponse(uint32_t token, uint32_t req_type);
/// @see RsGenExchange::service_tick()
virtual void service_tick();
/// @see RsGenExchange::service_CreateGroup(...)
RsGenExchange::ServiceCreate_Return service_CreateGroup(
RsGxsGrpItem* grpItem, RsTlvSecurityKeySet& );
/// @see RsGenExchange::notifyChanges(std::vector<RsGxsNotify *> &changes)
void notifyChanges(std::vector<RsGxsNotify *> &changes);
/// @see p3Config::setupSerialiser()
virtual RsSerialiser* setupSerialiser();
/// @see p3Config::saveList(bool &cleanup, std::list<RsItem *>&)
virtual bool saveList(bool &cleanup, std::list<RsItem *>&saveList);
/// @see p3Config::saveDone()
void saveDone();
/// @see p3Config::loadList(std::list<RsItem *>&)
virtual bool loadList(std::list<RsItem *>& loadList);
/// Request groups list to GXS backend. Async method. /// Request groups list to GXS backend. Async method.
bool requestGroupsData(const std::list<RsGxsGroupId>* groupIds = NULL); bool requestGroupsData(const std::list<RsGxsGroupId>* groupIds = NULL);
@ -267,6 +252,7 @@ struct TestGxsMailClientService : GxsMailsClient, RsSingleJobThread
/// @see RsSingleJobThread::run() /// @see RsSingleJobThread::run()
virtual void run() virtual void run()
{ {
#if 0
usleep(10*1000*1000); usleep(10*1000*1000);
RsGxsId gxsidA("d0df7474bdde0464679e6ef787890287"); RsGxsId gxsidA("d0df7474bdde0464679e6ef787890287");
RsGxsId gxsidB("d060bea09dfa14883b5e6e517eb580cd"); RsGxsId gxsidB("d060bea09dfa14883b5e6e517eb580cd");
@ -274,23 +260,23 @@ struct TestGxsMailClientService : GxsMailsClient, RsSingleJobThread
if(idService.isOwnId(gxsidA)) if(idService.isOwnId(gxsidA))
{ {
std::string ciao("CiAone!"); std::string ciao("CiAone!");
mailService.sendMail( mailId, GxsMailsClient::TEST_SERVICE, gxsidA, mailService.sendMail( mailId, GxsMailSubServices::TEST_SERVICE,
gxsidB, gxsidA, gxsidB,
reinterpret_cast<const uint8_t*>(ciao.data()), reinterpret_cast<const uint8_t*>(ciao.data()),
ciao.size() ); ciao.size() );
} }
else if(idService.isOwnId(gxsidB)) else if(idService.isOwnId(gxsidB))
{ {
std::string ciao("CiBuono!"); std::string ciao("CiBuono!");
mailService.sendMail( mailId, GxsMailsClient::TEST_SERVICE, gxsidB, mailService.sendMail( mailId, GxsMailSubServices::TEST_SERVICE,
gxsidA, gxsidB, gxsidA,
reinterpret_cast<const uint8_t*>(ciao.data()), reinterpret_cast<const uint8_t*>(ciao.data()),
ciao.size() ); ciao.size() );
} }
#endif
} }
private: private:
p3GxsMails& mailService; p3GxsMails& mailService;
p3IdService& idService; p3IdService& idService;
}; };

View File

@ -105,7 +105,8 @@ p3MsgService::p3MsgService( p3ServiceControl *sc, p3IdService *id_serv,
if(sc) initStandardTagTypes(); // Initialize standard tag types if(sc) initStandardTagTypes(); // Initialize standard tag types
gxsMailService.registerGxsMailsClient(GxsMailsClient::P3_MSG_SERVICE, this); gxsMailService.registerGxsMailsClient( GxsMailSubServices::P3_MSG_SERVICE,
this );
} }
const std::string MSG_APP_NAME = "msg"; const std::string MSG_APP_NAME = "msg";
@ -452,8 +453,14 @@ int p3MsgService::checkOutgoingMessages()
return 0; return 0;
} }
bool p3MsgService::saveList(bool& cleanup, std::list<RsItem*>& itemList) bool p3MsgService::saveList(bool& cleanup, std::list<RsItem*>& itemList)
{ {
RsMsgGRouterMap* gxsmailmap = new RsMsgGRouterMap;
{
RS_STACK_MUTEX(gxsOngoingMutex);
gxsmailmap->ongoing_msgs = gxsOngoingMessages;
}
itemList.push_front(gxsmailmap);
std::map<uint32_t, RsMsgItem *>::iterator mit; std::map<uint32_t, RsMsgItem *>::iterator mit;
std::map<uint32_t, RsMsgTagType* >::iterator mit2; std::map<uint32_t, RsMsgTagType* >::iterator mit2;
@ -461,9 +468,7 @@ bool p3MsgService::saveList(bool& cleanup, std::list<RsItem*>& itemList)
std::map<uint32_t, RsMsgSrcId* >::iterator lit; std::map<uint32_t, RsMsgSrcId* >::iterator lit;
std::map<uint32_t, RsMsgParentId* >::iterator mit4; std::map<uint32_t, RsMsgParentId* >::iterator mit4;
MsgTagType stdTags; cleanup = true;
cleanup = true;
mMsgMtx.lock(); mMsgMtx.lock();
@ -508,7 +513,7 @@ bool p3MsgService::saveList(bool& cleanup, std::list<RsItem*>& itemList)
kv.value = RsUtil::NumberToString(mDistantMessagePermissions) ; kv.value = RsUtil::NumberToString(mDistantMessagePermissions) ;
vitem->tlvkvs.pairs.push_back(kv) ; vitem->tlvkvs.pairs.push_back(kv) ;
itemList.push_back(vitem) ; itemList.push_back(vitem);
return true; return true;
} }
@ -570,8 +575,19 @@ void p3MsgService::initStandardTagTypes()
} }
} }
bool p3MsgService::loadList(std::list<RsItem*>& load) bool p3MsgService::loadList(std::list<RsItem*>& load)
{ {
auto gxsmIt = load.begin();
RsMsgGRouterMap* gxsmailmap = dynamic_cast<RsMsgGRouterMap*>(*gxsmIt);
if(gxsmailmap)
{
{
RS_STACK_MUTEX(gxsOngoingMutex);
gxsOngoingMessages = gxsmailmap->ongoing_msgs;
}
delete *gxsmIt; load.erase(gxsmIt);
}
RsMsgItem *mitem; RsMsgItem *mitem;
RsMsgTagType* mtt; RsMsgTagType* mtt;
RsMsgTags* mti; RsMsgTags* mti;
@ -581,7 +597,7 @@ bool p3MsgService::loadList(std::list<RsItem*>& load)
RsMsgDistantMessagesHashMap *ghm; RsMsgDistantMessagesHashMap *ghm;
std::list<RsMsgItem*> items; std::list<RsMsgItem*> items;
std::list<RsItem*>::iterator it; std::list<RsItem*>::iterator it;
std::map<uint32_t, RsMsgTagType*>::iterator tagIt; std::map<uint32_t, RsMsgTagType*>::iterator tagIt;
std::map<uint32_t, RsPeerId> srcIdMsgMap; std::map<uint32_t, RsPeerId> srcIdMsgMap;
std::map<uint32_t, RsPeerId>::iterator srcIt; std::map<uint32_t, RsPeerId>::iterator srcIt;
@ -589,10 +605,10 @@ bool p3MsgService::loadList(std::list<RsItem*>& load)
uint32_t max_msg_id = 0 ; uint32_t max_msg_id = 0 ;
// load items and calculate next unique msgId // load items and calculate next unique msgId
for(it = load.begin(); it != load.end(); ++it) for(it = load.begin(); it != load.end(); ++it)
{ {
if (NULL != (mitem = dynamic_cast<RsMsgItem *>(*it))) if (NULL != (mitem = dynamic_cast<RsMsgItem *>(*it)))
{ {
/* STORE MsgID */ /* STORE MsgID */
if (mitem->msgId > max_msg_id) if (mitem->msgId > max_msg_id)
@ -600,7 +616,7 @@ bool p3MsgService::loadList(std::list<RsItem*>& load)
items.push_back(mitem); items.push_back(mitem);
} }
else if (NULL != (grm = dynamic_cast<RsMsgGRouterMap *>(*it))) else if (NULL != (grm = dynamic_cast<RsMsgGRouterMap *>(*it)))
{ {
// merge. // merge.
for(std::map<GRouterMsgPropagationId,uint32_t>::const_iterator it(grm->ongoing_msgs.begin());it!=grm->ongoing_msgs.end();++it) for(std::map<GRouterMsgPropagationId,uint32_t>::const_iterator it(grm->ongoing_msgs.begin());it!=grm->ongoing_msgs.end();++it)
@ -620,7 +636,7 @@ bool p3MsgService::loadList(std::list<RsItem*>& load)
std::cerr << " " << it->first << " received " << time(NULL)-it->second << " secs ago." << std::endl; std::cerr << " " << it->first << " received " << time(NULL)-it->second << " secs ago." << std::endl;
#endif #endif
} }
else if(NULL != (mtt = dynamic_cast<RsMsgTagType *>(*it))) else if(NULL != (mtt = dynamic_cast<RsMsgTagType *>(*it)))
{ {
// delete standard tags as they are now save in config // delete standard tags as they are now save in config
if(mTags.end() == (tagIt = mTags.find(mtt->tagId))) if(mTags.end() == (tagIt = mTags.find(mtt->tagId)))
@ -635,23 +651,23 @@ bool p3MsgService::loadList(std::list<RsItem*>& load)
} }
} }
else if(NULL != (mti = dynamic_cast<RsMsgTags *>(*it))) else if(NULL != (mti = dynamic_cast<RsMsgTags *>(*it)))
{ {
mMsgTags.insert(std::pair<uint32_t, RsMsgTags* >(mti->msgId, mti)); mMsgTags.insert(std::pair<uint32_t, RsMsgTags* >(mti->msgId, mti));
} }
else if(NULL != (msi = dynamic_cast<RsMsgSrcId *>(*it))) else if(NULL != (msi = dynamic_cast<RsMsgSrcId *>(*it)))
{ {
srcIdMsgMap.insert(std::pair<uint32_t, RsPeerId>(msi->msgId, msi->srcId)); srcIdMsgMap.insert(std::pair<uint32_t, RsPeerId>(msi->msgId, msi->srcId));
mSrcIds.insert(std::pair<uint32_t, RsMsgSrcId*>(msi->msgId, msi)); // does not need to be kept mSrcIds.insert(std::pair<uint32_t, RsMsgSrcId*>(msi->msgId, msi)); // does not need to be kept
} }
else if(NULL != (msp = dynamic_cast<RsMsgParentId *>(*it))) else if(NULL != (msp = dynamic_cast<RsMsgParentId *>(*it)))
{ {
mParentId.insert(std::pair<uint32_t, RsMsgParentId*>(msp->msgId, msp)); mParentId.insert(std::pair<uint32_t, RsMsgParentId*>(msp->msgId, msp));
} }
RsConfigKeyValueSet *vitem = NULL ; RsConfigKeyValueSet *vitem = NULL ;
if(NULL != (vitem = dynamic_cast<RsConfigKeyValueSet*>(*it))) if(NULL != (vitem = dynamic_cast<RsConfigKeyValueSet*>(*it)))
{ {
for(std::list<RsTlvKeyValue>::const_iterator kit = vitem->tlvkvs.pairs.begin(); kit != vitem->tlvkvs.pairs.end(); ++kit) for(std::list<RsTlvKeyValue>::const_iterator kit = vitem->tlvkvs.pairs.begin(); kit != vitem->tlvkvs.pairs.end(); ++kit)
{ {
@ -682,7 +698,7 @@ bool p3MsgService::loadList(std::list<RsItem*>& load)
} }
} }
delete *it ; delete *it ;
continue ; continue ;
} }
} }
@ -700,7 +716,7 @@ bool p3MsgService::loadList(std::list<RsItem*>& load)
mitem->msgId = getNewUniqueMsgId(); mitem->msgId = getNewUniqueMsgId();
} }
RsStackMutex stack(mMsgMtx); /********** STACK LOCKED MTX ******/ RS_STACK_MUTEX(mMsgMtx);
srcIt = srcIdMsgMap.find(mitem->msgId); srcIt = srcIdMsgMap.find(mitem->msgId);
if(srcIt != srcIdMsgMap.end()) { if(srcIt != srcIdMsgMap.end()) {
@ -1888,19 +1904,21 @@ void p3MsgService::notifyDataStatus( const GRouterMsgPropagationId& id,
mDistantOutgoingMsgSigners[msg_id] = signer_id; mDistantOutgoingMsgSigners[msg_id] = signer_id;
std::map<uint32_t,RsMsgItem*>::iterator mit = msgOutgoing.find(msg_id); std::map<uint32_t,RsMsgItem*>::iterator mit = msgOutgoing.find(msg_id);
if(mit == msgOutgoing.end()) if(mit == msgOutgoing.end())
{ {
std::cerr << " (EE) message has been notified as not delivered, " std::cerr << " (II) message has been notified as not delivered, "
<< "but it not on outgoing list. Something's wrong!!" << "but it's not in outgoing list. Probably it has been "
<< "delivered successfully by other means."
<< std::endl; << std::endl;
return;
} }
std::cerr << " reseting the ROUTED flag so that the message is " else
<< "requested again" << std::endl; {
std::cerr << " reseting the ROUTED flag so that the message is "
<< "requested again" << std::endl;
// clear the routed flag so that the message is requested again // clear the routed flag so that the message is requested again
mit->second->msgFlags &= ~RS_MSG_FLAGS_ROUTED; mit->second->msgFlags &= ~RS_MSG_FLAGS_ROUTED;
}
return; return;
} }
@ -1918,15 +1936,15 @@ void p3MsgService::notifyDataStatus( const GRouterMsgPropagationId& id,
return; return;
} }
uint32_t msg_id = it->second ; uint32_t msg_id = it->second;
// we should now remove the item from the msgOutgoing list. // we should now remove the item from the msgOutgoing list.
std::map<uint32_t,RsMsgItem*>::iterator it2 = msgOutgoing.find(msg_id); std::map<uint32_t,RsMsgItem*>::iterator it2 = msgOutgoing.find(msg_id);
if(it2 == msgOutgoing.end()) if(it2 == msgOutgoing.end())
{ {
std::cerr << "(EE) message has been ACKed, but is not in outgoing " std::cerr << "(II) message has been notified as delivered, but it's"
<< "list. Something's wrong!!" << std::endl; << " not in outgoing list. Probably it has been delivered"
<< " successfully by other means." << std::endl;
return; return;
} }
@ -1972,6 +1990,9 @@ uint32_t p3MsgService::getDistantMessagingPermissionFlags()
bool p3MsgService::receiveGxsMail( const RsGxsMailItem& originalMessage, bool p3MsgService::receiveGxsMail( const RsGxsMailItem& originalMessage,
const uint8_t* data, uint32_t dataSize ) const uint8_t* data, uint32_t dataSize )
{ {
std::cout << "p3MsgService::receiveGxsMail(" << originalMessage.mailId
<< ",, " << dataSize << ")" << std::endl;
Sha1CheckSum hash = RsDirUtil::sha1sum(data, dataSize); Sha1CheckSum hash = RsDirUtil::sha1sum(data, dataSize);
{ {
@ -1979,9 +2000,9 @@ bool p3MsgService::receiveGxsMail( const RsGxsMailItem& originalMessage,
if( mRecentlyReceivedMessageHashes.find(hash) != if( mRecentlyReceivedMessageHashes.find(hash) !=
mRecentlyReceivedMessageHashes.end() ) mRecentlyReceivedMessageHashes.end() )
{ {
std::cerr << "p3MsgService::receiveGxsMail(...) (WW) receiving " std::cerr << "p3MsgService::receiveGxsMail(...) (II) receiving "
<< "message of hash " << hash << " more than once. This " << "message of hash " << hash << " more than once. "
<< "is not a bug, unless it happens very often." << "Probably it has arrived before by other means."
<< std::endl; << std::endl;
return true; return true;
} }
@ -2016,6 +2037,58 @@ bool p3MsgService::receiveGxsMail( const RsGxsMailItem& originalMessage,
bool p3MsgService::notifySendMailStatus( const RsGxsMailItem& originalMessage, bool p3MsgService::notifySendMailStatus( const RsGxsMailItem& originalMessage,
GxsMailStatus status ) GxsMailStatus status )
{ {
std::cout << "p3MsgService::notifySendMailStatus(" << originalMessage.mailId
<< ", " << static_cast<uint>(status) << ")" << std::endl;
if( status == GxsMailStatus::RECEIPT_RECEIVED )
{
uint32_t msg_id;
{
RS_STACK_MUTEX(gxsOngoingMutex);
auto it = gxsOngoingMessages.find(originalMessage.mailId);
if(it == gxsOngoingMessages.end())
{
std::cerr << "p3MsgService::notifySendMailStatus("
<< originalMessage.mailId
<< ", " << static_cast<uint>(status) << ") "
<< "(EE) cannot find pending message to acknowledge!"
<< std::endl;
return false;
}
msg_id = it->second;
}
// we should now remove the item from the msgOutgoing list.
{
RS_STACK_MUTEX(mMsgMtx);
auto it2 = msgOutgoing.find(msg_id);
if(it2 == msgOutgoing.end())
{
std::cerr << "p3MsgService::notifySendMailStatus("
<< originalMessage.mailId
<< ", " << static_cast<uint>(status) << ") (II) "
<< "received receipt for message that is not in "
<< "outgoing list, probably it has been acknoweldged "
<< "before by other means." << std::endl;
return true;
}
delete it2->second;
msgOutgoing.erase(it2);
}
RsServer::notify()->notifyListChange( NOTIFY_LIST_MESSAGELIST,
NOTIFY_TYPE_ADD );
IndicateConfigChanged();
return true;
}
if( status >= GxsMailStatus::FAILED_RECEIPT_SIGNATURE ) if( status >= GxsMailStatus::FAILED_RECEIPT_SIGNATURE )
{ {
uint32_t msg_id; uint32_t msg_id;
@ -2025,7 +2098,7 @@ bool p3MsgService::notifySendMailStatus( const RsGxsMailItem& originalMessage,
std::cerr << "p3MsgService::notifySendMailStatus(...) mail delivery" std::cerr << "p3MsgService::notifySendMailStatus(...) mail delivery"
<< "mailId: " << originalMessage.mailId << "mailId: " << originalMessage.mailId
<< " failed with " << static_cast<uint>(status); << " failed with " << static_cast<uint32_t>(status);
auto it = gxsOngoingMessages.find(originalMessage.mailId); auto it = gxsOngoingMessages.find(originalMessage.mailId);
if(it == gxsOngoingMessages.end()) if(it == gxsOngoingMessages.end())
@ -2058,49 +2131,6 @@ bool p3MsgService::notifySendMailStatus( const RsGxsMailItem& originalMessage,
return true; return true;
} }
} }
if( status == GxsMailStatus::RECEIPT_RECEIVED )
{
uint32_t msg_id;
{
RS_STACK_MUTEX(gxsOngoingMutex);
auto it = gxsOngoingMessages.find(originalMessage.mailId);
if(it == gxsOngoingMessages.end())
{
std::cerr << " (EE) cannot find pending message to acknowledge. "
<< "Weird.mailId = " << originalMessage.mailId
<< std::endl;
return false;
}
msg_id = it->second;
}
// we should now remove the item from the msgOutgoing list.
{
RS_STACK_MUTEX(mMsgMtx);
auto it2 = msgOutgoing.find(msg_id);
if(it2 == msgOutgoing.end())
{
std::cerr << "(EE) message has been ACKed, but is not in "
<< "outgoing list. Something's wrong!!" << std::endl;
return true;
}
delete it2->second;
msgOutgoing.erase(it2);
}
RsServer::notify()->notifyListChange( NOTIFY_LIST_MESSAGELIST,
NOTIFY_TYPE_ADD );
IndicateConfigChanged();
return true;
}
} }
void p3MsgService::receiveGRouterData( const RsGxsId &destination_key, void p3MsgService::receiveGRouterData( const RsGxsId &destination_key,
@ -2122,9 +2152,9 @@ void p3MsgService::receiveGRouterData( const RsGxsId &destination_key,
if( mRecentlyReceivedMessageHashes.find(hash) != if( mRecentlyReceivedMessageHashes.find(hash) !=
mRecentlyReceivedMessageHashes.end() ) mRecentlyReceivedMessageHashes.end() )
{ {
std::cerr << "p3MsgService::receiveGRouterData(...) (WW) receiving" std::cerr << "p3MsgService::receiveGRouterData(...) (II) receiving"
<< "distant message of hash " << hash << " more than once" << "distant message of hash " << hash << " more than once"
<< ". This is not a bug, unless it happens very often." << ". Probably it has arrived before by other means."
<< std::endl; << std::endl;
free(data); free(data);
return; return;
@ -2218,9 +2248,9 @@ void p3MsgService::sendDistantMsgItem(RsMsgItem *msgitem)
msg_serialized_data, msg_serialized_rssize, msg_serialized_data, msg_serialized_rssize,
signing_key_id, grouter_message_id ); signing_key_id, grouter_message_id );
RsGxsMailId gxsMailId; RsGxsMailId gxsMailId;
gxsMailService.sendMail( gxsMailId, P3_MSG_SERVICE, signing_key_id, gxsMailService.sendMail( gxsMailId, GxsMailSubServices::P3_MSG_SERVICE,
destination_key_id, msg_serialized_data, signing_key_id, destination_key_id,
msg_serialized_rssize ); msg_serialized_data, msg_serialized_rssize );
/* now store the grouter id along with the message id, so that we can keep /* now store the grouter id along with the message id, so that we can keep
* track of received messages */ * track of received messages */

View File

@ -150,6 +150,7 @@ private:
* The map is indexed by the hash */ * The map is indexed by the hash */
std::map<GRouterMsgPropagationId, uint32_t> _ongoing_messages; std::map<GRouterMsgPropagationId, uint32_t> _ongoing_messages;
/// Contains ongoing messages handed to gxs mail
std::map<RsGxsMailId, uint32_t> gxsOngoingMessages; std::map<RsGxsMailId, uint32_t> gxsOngoingMessages;
RsMutex gxsOngoingMutex; RsMutex gxsOngoingMutex;