mirror of
https://github.com/RetroShare/RetroShare.git
synced 2025-02-05 01:25:39 -05:00
Merge pull request #875 from csoler/v0.6-GxsTransport
V0.6 gxs transport
This commit is contained in:
commit
39f003f9f9
@ -30,7 +30,7 @@ p3GxsTrans::~p3GxsTrans()
|
|||||||
|
|
||||||
{
|
{
|
||||||
RS_STACK_MUTEX(mIngoingMutex);
|
RS_STACK_MUTEX(mIngoingMutex);
|
||||||
for ( auto& kv : mIngoingQueue ) delete kv.second;
|
for ( auto& kv : mIncomingQueue) delete kv.second;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -48,8 +48,8 @@ bool p3GxsTrans::getStatistics(GxsTransStatistics& stats)
|
|||||||
|
|
||||||
RsGxsTransOutgoingRecord rec ;
|
RsGxsTransOutgoingRecord rec ;
|
||||||
rec.status = pr.status ;
|
rec.status = pr.status ;
|
||||||
rec.send_TS = pr.mailItem.meta.mPublishTs ;
|
rec.send_TS = pr.sent_ts ;
|
||||||
rec.group_id = pr.mailItem.meta.mGroupId ;
|
rec.group_id = pr.group_id ;
|
||||||
rec.trans_id = pr.mailItem.mailId ;
|
rec.trans_id = pr.mailItem.mailId ;
|
||||||
rec.recipient = pr.recipient ;
|
rec.recipient = pr.recipient ;
|
||||||
rec.data_size = pr.mailData.size();
|
rec.data_size = pr.mailData.size();
|
||||||
@ -89,8 +89,7 @@ bool p3GxsTrans::sendData( RsGxsTransId& mailId,
|
|||||||
OutgoingRecord pr( recipient, service, data, size );
|
OutgoingRecord pr( recipient, service, data, size );
|
||||||
|
|
||||||
pr.mailItem.clear();
|
pr.mailItem.clear();
|
||||||
pr.mailItem.meta.mAuthorId = own_gxsid;
|
pr.author = own_gxsid; //pr.mailItem.meta.mAuthorId = own_gxsid;
|
||||||
pr.mailItem.meta.mMsgId.clear();
|
|
||||||
pr.mailItem.cryptoType = cm;
|
pr.mailItem.cryptoType = cm;
|
||||||
pr.mailItem.mailId = RSRandom::random_u64();
|
pr.mailItem.mailId = RSRandom::random_u64();
|
||||||
|
|
||||||
@ -100,6 +99,8 @@ bool p3GxsTrans::sendData( RsGxsTransId& mailId,
|
|||||||
}
|
}
|
||||||
|
|
||||||
mailId = pr.mailItem.mailId;
|
mailId = pr.mailItem.mailId;
|
||||||
|
|
||||||
|
IndicateConfigChanged(); // This causes the saving of the message after all data has been filled in.
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -124,8 +125,9 @@ void p3GxsTrans::registerGxsTransClient(
|
|||||||
|
|
||||||
void p3GxsTrans::handleResponse(uint32_t token, uint32_t req_type)
|
void p3GxsTrans::handleResponse(uint32_t token, uint32_t req_type)
|
||||||
{
|
{
|
||||||
std::cout << "p3GxsTrans::handleResponse(" << token << ", " << req_type
|
std::cout << "p3GxsTrans::handleResponse(" << token << ", " << req_type << ")" << std::endl;
|
||||||
<< ")" << std::endl;
|
bool changed = false ;
|
||||||
|
|
||||||
switch (req_type)
|
switch (req_type)
|
||||||
{
|
{
|
||||||
case GROUPS_LIST:
|
case GROUPS_LIST:
|
||||||
@ -222,12 +224,14 @@ void p3GxsTrans::handleResponse(uint32_t token, uint32_t req_type)
|
|||||||
case GxsTransItemsSubtypes::GXS_TRANS_SUBTYPE_MAIL:
|
case GxsTransItemsSubtypes::GXS_TRANS_SUBTYPE_MAIL:
|
||||||
case GxsTransItemsSubtypes::GXS_TRANS_SUBTYPE_RECEIPT:
|
case GxsTransItemsSubtypes::GXS_TRANS_SUBTYPE_RECEIPT:
|
||||||
{
|
{
|
||||||
RsGxsTransBaseItem* mb =
|
RsGxsTransBaseMsgItem* mb = dynamic_cast<RsGxsTransBaseMsgItem*>(*mIt);
|
||||||
dynamic_cast<RsGxsTransBaseItem*>(*mIt);
|
|
||||||
if(mb)
|
if(mb)
|
||||||
{
|
{
|
||||||
RS_STACK_MUTEX(mIngoingMutex);
|
RS_STACK_MUTEX(mIngoingMutex);
|
||||||
mIngoingQueue.insert(inMap::value_type(mb->mailId, mb));
|
mIncomingQueue.insert(inMap::value_type(mb->mailId,mb));
|
||||||
|
|
||||||
|
changed = true ;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
std::cerr << "p3GxsTrans::handleResponse(...) "
|
std::cerr << "p3GxsTrans::handleResponse(...) "
|
||||||
@ -253,6 +257,9 @@ void p3GxsTrans::handleResponse(uint32_t token, uint32_t req_type)
|
|||||||
<< req_type << std::endl;
|
<< req_type << std::endl;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if(changed)
|
||||||
|
IndicateConfigChanged();
|
||||||
}
|
}
|
||||||
|
|
||||||
void p3GxsTrans::GxsTransIntegrityCleanupThread::getMessagesToDelete(GxsMsgReq& m)
|
void p3GxsTrans::GxsTransIntegrityCleanupThread::getMessagesToDelete(GxsMsgReq& m)
|
||||||
@ -354,6 +361,7 @@ void p3GxsTrans::service_tick()
|
|||||||
GxsTokenQueue::checkRequests();
|
GxsTokenQueue::checkRequests();
|
||||||
|
|
||||||
time_t now = time(NULL);
|
time_t now = time(NULL);
|
||||||
|
bool changed = false ;
|
||||||
|
|
||||||
if(mLastMsgCleanup + MAX_DELAY_BETWEEN_CLEANUPS < now)
|
if(mLastMsgCleanup + MAX_DELAY_BETWEEN_CLEANUPS < now)
|
||||||
{
|
{
|
||||||
@ -392,10 +400,15 @@ void p3GxsTrans::service_tick()
|
|||||||
{
|
{
|
||||||
OutgoingRecord& pr(it->second);
|
OutgoingRecord& pr(it->second);
|
||||||
GxsTransSendStatus oldStatus = pr.status;
|
GxsTransSendStatus oldStatus = pr.status;
|
||||||
processOutgoingRecord(pr);
|
|
||||||
|
locked_processOutgoingRecord(pr);
|
||||||
|
|
||||||
if (oldStatus != pr.status) notifyClientService(pr);
|
if (oldStatus != pr.status) notifyClientService(pr);
|
||||||
if( pr.status >= GxsTransSendStatus::RECEIPT_RECEIVED )
|
if( pr.status >= GxsTransSendStatus::RECEIPT_RECEIVED )
|
||||||
|
{
|
||||||
it = mOutgoingQueue.erase(it);
|
it = mOutgoingQueue.erase(it);
|
||||||
|
changed = true ;
|
||||||
|
}
|
||||||
else ++it;
|
else ++it;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -403,15 +416,14 @@ void p3GxsTrans::service_tick()
|
|||||||
|
|
||||||
{
|
{
|
||||||
RS_STACK_MUTEX(mIngoingMutex);
|
RS_STACK_MUTEX(mIngoingMutex);
|
||||||
for( auto it = mIngoingQueue.begin(); it != mIngoingQueue.end(); )
|
for( auto it = mIncomingQueue.begin(); it != mIncomingQueue.end(); )
|
||||||
{
|
{
|
||||||
switch(static_cast<GxsTransItemsSubtypes>(
|
switch(static_cast<GxsTransItemsSubtypes>( it->second->PacketSubType()))
|
||||||
it->second->PacketSubType()))
|
|
||||||
{
|
{
|
||||||
case GxsTransItemsSubtypes::GXS_TRANS_SUBTYPE_MAIL:
|
case GxsTransItemsSubtypes::GXS_TRANS_SUBTYPE_MAIL:
|
||||||
{
|
{
|
||||||
RsGxsTransMailItem* msg =
|
RsGxsTransMailItem* msg = dynamic_cast<RsGxsTransMailItem*>(it->second);
|
||||||
dynamic_cast<RsGxsTransMailItem*>(it->second);
|
|
||||||
if(!msg)
|
if(!msg)
|
||||||
{
|
{
|
||||||
std::cerr << "p3GxsTrans::service_tick() (EE) "
|
std::cerr << "p3GxsTrans::service_tick() (EE) "
|
||||||
@ -436,8 +448,8 @@ void p3GxsTrans::service_tick()
|
|||||||
}
|
}
|
||||||
case GxsTransItemsSubtypes::GXS_TRANS_SUBTYPE_RECEIPT:
|
case GxsTransItemsSubtypes::GXS_TRANS_SUBTYPE_RECEIPT:
|
||||||
{
|
{
|
||||||
RsGxsTransPresignedReceipt* rcpt =
|
RsGxsTransPresignedReceipt* rcpt = dynamic_cast<RsGxsTransPresignedReceipt*>(it->second);
|
||||||
dynamic_cast<RsGxsTransPresignedReceipt*>(it->second);
|
|
||||||
if(!rcpt)
|
if(!rcpt)
|
||||||
{
|
{
|
||||||
std::cerr << "p3GxsTrans::service_tick() (EE) "
|
std::cerr << "p3GxsTrans::service_tick() (EE) "
|
||||||
@ -467,9 +479,14 @@ void p3GxsTrans::service_tick()
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
delete it->second; it = mIngoingQueue.erase(it);
|
delete it->second ;
|
||||||
|
it = mIncomingQueue.erase(it);
|
||||||
|
changed = true ;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if(changed)
|
||||||
|
IndicateConfigChanged();
|
||||||
}
|
}
|
||||||
|
|
||||||
RsGenExchange::ServiceCreate_Return p3GxsTrans::service_CreateGroup(
|
RsGenExchange::ServiceCreate_Return p3GxsTrans::service_CreateGroup(
|
||||||
@ -678,7 +695,7 @@ bool p3GxsTrans::dispatchDecryptedMail( const RsGxsId& authorId,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void p3GxsTrans::processOutgoingRecord(OutgoingRecord& pr)
|
void p3GxsTrans::locked_processOutgoingRecord(OutgoingRecord& pr)
|
||||||
{
|
{
|
||||||
//std::cout << "p3GxsTrans::processRecord(...)" << std::endl;
|
//std::cout << "p3GxsTrans::processRecord(...)" << std::endl;
|
||||||
|
|
||||||
@ -688,7 +705,7 @@ void p3GxsTrans::processOutgoingRecord(OutgoingRecord& pr)
|
|||||||
{
|
{
|
||||||
pr.mailItem.saltRecipientHint(pr.recipient);
|
pr.mailItem.saltRecipientHint(pr.recipient);
|
||||||
pr.mailItem.saltRecipientHint(RsGxsId::random());
|
pr.mailItem.saltRecipientHint(RsGxsId::random());
|
||||||
pr.mailItem.meta.mPublishTs = time(NULL);
|
pr.sent_ts = time(NULL) ; //pr.mailItem.meta.mPublishTs = time(NULL);
|
||||||
}
|
}
|
||||||
case GxsTransSendStatus::PENDING_PREFERRED_GROUP:
|
case GxsTransSendStatus::PENDING_PREFERRED_GROUP:
|
||||||
{
|
{
|
||||||
@ -699,12 +716,16 @@ void p3GxsTrans::processOutgoingRecord(OutgoingRecord& pr)
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
pr.mailItem.meta.mGroupId = mPreferredGroupId;
|
pr.group_id = mPreferredGroupId ; //pr.mailItem.meta.mGroupId = mPreferredGroupId;
|
||||||
}
|
}
|
||||||
case GxsTransSendStatus::PENDING_RECEIPT_CREATE:
|
case GxsTransSendStatus::PENDING_RECEIPT_CREATE:
|
||||||
{
|
{
|
||||||
RsGxsTransPresignedReceipt grcpt;
|
RsGxsTransPresignedReceipt grcpt;
|
||||||
grcpt.meta = pr.mailItem.meta;
|
grcpt.meta.mAuthorId = pr.author ; //grcpt.meta = pr.mailItem.meta;
|
||||||
|
grcpt.meta.mGroupId = pr.group_id ; //grcpt.meta = pr.mailItem.meta;
|
||||||
|
grcpt.meta.mMsgId.clear() ;
|
||||||
|
grcpt.meta.mParentId.clear() ;
|
||||||
|
grcpt.meta.mOrigMsgId.clear() ;
|
||||||
grcpt.meta.mPublishTs = time(NULL);
|
grcpt.meta.mPublishTs = time(NULL);
|
||||||
grcpt.mailId = pr.mailItem.mailId;
|
grcpt.mailId = pr.mailItem.mailId;
|
||||||
uint32_t grsz = RsGxsTransSerializer().size(&grcpt);
|
uint32_t grsz = RsGxsTransSerializer().size(&grcpt);
|
||||||
@ -717,7 +738,7 @@ void p3GxsTrans::processOutgoingRecord(OutgoingRecord& pr)
|
|||||||
*pr.presignedReceipt.metaData = grcpt.meta;
|
*pr.presignedReceipt.metaData = grcpt.meta;
|
||||||
pr.presignedReceipt.msg.setBinData(&grsrz[0], grsz);
|
pr.presignedReceipt.msg.setBinData(&grsrz[0], grsz);
|
||||||
}
|
}
|
||||||
case GxsTransSendStatus::PENDING_RECEIPT_SIGNATURE:
|
case GxsTransSendStatus::PENDING_RECEIPT_SIGNATURE: // (cyril) This step is never actually used.
|
||||||
{
|
{
|
||||||
switch (RsGenExchange::createMessage(&pr.presignedReceipt))
|
switch (RsGenExchange::createMessage(&pr.presignedReceipt))
|
||||||
{
|
{
|
||||||
@ -804,27 +825,48 @@ void p3GxsTrans::processOutgoingRecord(OutgoingRecord& pr)
|
|||||||
<< " payload size: " << pr.mailItem.payload.size()
|
<< " payload size: " << pr.mailItem.payload.size()
|
||||||
<< std::endl;
|
<< std::endl;
|
||||||
|
|
||||||
|
RsGxsTransMailItem *mail_item = new RsGxsTransMailItem(pr.mailItem);
|
||||||
|
|
||||||
|
// pr.mailItem.meta is *not* serialised. So it is important to not rely on what's in it!
|
||||||
|
|
||||||
|
mail_item->meta.mGroupId = pr.group_id ;
|
||||||
|
mail_item->meta.mAuthorId = pr.author ;
|
||||||
|
|
||||||
|
mail_item->meta.mMsgId.clear();
|
||||||
|
mail_item->meta.mParentId.clear();
|
||||||
|
mail_item->meta.mOrigMsgId.clear();
|
||||||
|
|
||||||
uint32_t token;
|
uint32_t token;
|
||||||
publishMsg(token, new RsGxsTransMailItem(pr.mailItem));
|
publishMsg(token, mail_item) ;
|
||||||
|
|
||||||
pr.status = GxsTransSendStatus::PENDING_RECEIPT_RECEIVE;
|
pr.status = GxsTransSendStatus::PENDING_RECEIPT_RECEIVE;
|
||||||
|
|
||||||
|
IndicateConfigChanged(); // This causes the saving of the message after pr.status has changed.
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
//case GxsTransSendStatus::PENDING_TRANSFER:
|
//case GxsTransSendStatus::PENDING_TRANSFER:
|
||||||
case GxsTransSendStatus::PENDING_RECEIPT_RECEIVE:
|
case GxsTransSendStatus::PENDING_RECEIPT_RECEIVE:
|
||||||
{
|
{
|
||||||
RS_STACK_MUTEX(mIngoingMutex);
|
RS_STACK_MUTEX(mIngoingMutex);
|
||||||
auto range = mIngoingQueue.equal_range(pr.mailItem.mailId);
|
auto range = mIncomingQueue.equal_range(pr.mailItem.mailId);
|
||||||
|
bool changed = false ;
|
||||||
|
|
||||||
for( auto it = range.first; it != range.second; ++it)
|
for( auto it = range.first; it != range.second; ++it)
|
||||||
{
|
{
|
||||||
RsGxsTransPresignedReceipt* rt =
|
RsGxsTransPresignedReceipt* rt = dynamic_cast<RsGxsTransPresignedReceipt*>(it->second);
|
||||||
dynamic_cast<RsGxsTransPresignedReceipt*>(it->second);
|
|
||||||
if(rt && mIdService.isOwnId(rt->meta.mAuthorId))
|
if(rt && mIdService.isOwnId(rt->meta.mAuthorId))
|
||||||
{
|
{
|
||||||
mIngoingQueue.erase(it); delete rt;
|
mIncomingQueue.erase(it); delete rt;
|
||||||
pr.status = GxsTransSendStatus::RECEIPT_RECEIVED;
|
pr.status = GxsTransSendStatus::RECEIPT_RECEIVED;
|
||||||
|
|
||||||
|
changed = true ;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if(changed)
|
||||||
|
IndicateConfigChanged();
|
||||||
|
|
||||||
// TODO: Resend message if older then treshold
|
// TODO: Resend message if older then treshold
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@ -875,19 +917,23 @@ RsSerialiser* p3GxsTrans::setupSerialiser()
|
|||||||
|
|
||||||
bool p3GxsTrans::saveList(bool &cleanup, std::list<RsItem *>& saveList)
|
bool p3GxsTrans::saveList(bool &cleanup, std::list<RsItem *>& saveList)
|
||||||
{
|
{
|
||||||
std::cout << "p3GxsTrans::saveList(...)" << saveList.size() << " "
|
std::cout << "p3GxsTrans::saveList(...)" << saveList.size() << " " << mIncomingQueue.size() << " " << mOutgoingQueue.size() << std::endl;
|
||||||
<< mIngoingQueue.size() << " " << mOutgoingQueue.size()
|
|
||||||
<< std::endl;
|
|
||||||
|
|
||||||
mOutgoingMutex.lock();
|
mOutgoingMutex.lock();
|
||||||
mIngoingMutex.lock();
|
mIngoingMutex.lock();
|
||||||
|
|
||||||
for ( auto& kv : mOutgoingQueue ) saveList.push_back(&kv.second);
|
for ( auto& kv : mOutgoingQueue )
|
||||||
for ( auto& kv : mIngoingQueue ) saveList.push_back(kv.second);
|
{
|
||||||
|
std::cerr << "Saving outgoing item, ID " << std::hex << std::setfill('0') << std::setw(16) << kv.first << std::dec << "Group id: " << kv.second.group_id << ", TS=" << kv.second.sent_ts << std::endl;
|
||||||
|
saveList.push_back(&kv.second);
|
||||||
|
}
|
||||||
|
for ( auto& kv : mIncomingQueue )
|
||||||
|
{
|
||||||
|
std::cerr << "Saving incoming item, ID " << std::hex << std::setfill('0') << std::setw(16) << kv.first << std::endl;
|
||||||
|
saveList.push_back(kv.second);
|
||||||
|
}
|
||||||
|
|
||||||
std::cout << "p3GxsTrans::saveList(...)" << saveList.size() << " "
|
std::cout << "p3GxsTrans::saveList(...)" << saveList.size() << " " << mIncomingQueue.size() << " " << mOutgoingQueue.size() << std::endl;
|
||||||
<< mIngoingQueue.size() << " " << mOutgoingQueue.size()
|
|
||||||
<< std::endl;
|
|
||||||
|
|
||||||
cleanup = false;
|
cleanup = false;
|
||||||
return true;
|
return true;
|
||||||
@ -902,7 +948,7 @@ void p3GxsTrans::saveDone()
|
|||||||
bool p3GxsTrans::loadList(std::list<RsItem *>&loadList)
|
bool p3GxsTrans::loadList(std::list<RsItem *>&loadList)
|
||||||
{
|
{
|
||||||
std::cout << "p3GxsTrans::loadList(...) " << loadList.size() << " "
|
std::cout << "p3GxsTrans::loadList(...) " << loadList.size() << " "
|
||||||
<< mIngoingQueue.size() << " " << mOutgoingQueue.size()
|
<< mIncomingQueue.size() << " " << mOutgoingQueue.size()
|
||||||
<< std::endl;
|
<< std::endl;
|
||||||
|
|
||||||
for(auto& v : loadList)
|
for(auto& v : loadList)
|
||||||
@ -911,14 +957,42 @@ bool p3GxsTrans::loadList(std::list<RsItem *>&loadList)
|
|||||||
case GxsTransItemsSubtypes::GXS_TRANS_SUBTYPE_MAIL:
|
case GxsTransItemsSubtypes::GXS_TRANS_SUBTYPE_MAIL:
|
||||||
case GxsTransItemsSubtypes::GXS_TRANS_SUBTYPE_RECEIPT:
|
case GxsTransItemsSubtypes::GXS_TRANS_SUBTYPE_RECEIPT:
|
||||||
{
|
{
|
||||||
RsGxsTransBaseItem* mi = dynamic_cast<RsGxsTransBaseItem*>(v);
|
RsGxsTransBaseMsgItem* mi = dynamic_cast<RsGxsTransBaseMsgItem*>(v);
|
||||||
if(mi)
|
if(mi)
|
||||||
{
|
{
|
||||||
RS_STACK_MUTEX(mIngoingMutex);
|
RS_STACK_MUTEX(mIngoingMutex);
|
||||||
mIngoingQueue.insert(inMap::value_type(mi->mailId, mi));
|
mIncomingQueue.insert(inMap::value_type(mi->mailId, mi));
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
case GxsTransItemsSubtypes::OUTGOING_RECORD_ITEM_deprecated:
|
||||||
|
{
|
||||||
|
OutgoingRecord_deprecated* dot = dynamic_cast<OutgoingRecord_deprecated*>(v);
|
||||||
|
|
||||||
|
if(dot)
|
||||||
|
{
|
||||||
|
std::cerr << "(EE) Read a deprecated GxsTrans outgoing item. Converting to new format..." << std::endl;
|
||||||
|
|
||||||
|
OutgoingRecord ot(dot->recipient,dot->clientService,&dot->mailData[0],dot->mailData.size()) ;
|
||||||
|
|
||||||
|
ot.status = dot->status ;
|
||||||
|
|
||||||
|
ot.author.clear(); // These 3 fields cannot be stored in mailItem.meta, which is not serialised, so they are lost.
|
||||||
|
ot.group_id.clear() ;
|
||||||
|
ot.sent_ts = 0;
|
||||||
|
|
||||||
|
ot.mailItem = dot->mailItem ;
|
||||||
|
ot.presignedReceipt = dot->presignedReceipt;
|
||||||
|
|
||||||
|
RS_STACK_MUTEX(mOutgoingMutex);
|
||||||
|
mOutgoingQueue.insert(prMap::value_type(ot.mailItem.mailId, ot));
|
||||||
|
|
||||||
|
std::cerr << "Loaded outgoing item (converted), ID " << std::hex << std::setfill('0') << std::setw(16) << ot.mailItem.mailId<< std::dec << ", Group id: " << ot.group_id << ", TS=" << ot.sent_ts << std::endl;
|
||||||
|
}
|
||||||
|
delete v;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
case GxsTransItemsSubtypes::OUTGOING_RECORD_ITEM:
|
case GxsTransItemsSubtypes::OUTGOING_RECORD_ITEM:
|
||||||
{
|
{
|
||||||
OutgoingRecord* ot = dynamic_cast<OutgoingRecord*>(v);
|
OutgoingRecord* ot = dynamic_cast<OutgoingRecord*>(v);
|
||||||
@ -927,6 +1001,8 @@ bool p3GxsTrans::loadList(std::list<RsItem *>&loadList)
|
|||||||
RS_STACK_MUTEX(mOutgoingMutex);
|
RS_STACK_MUTEX(mOutgoingMutex);
|
||||||
mOutgoingQueue.insert(
|
mOutgoingQueue.insert(
|
||||||
prMap::value_type(ot->mailItem.mailId, *ot));
|
prMap::value_type(ot->mailItem.mailId, *ot));
|
||||||
|
|
||||||
|
std::cerr << "Loading outgoing item, ID " << std::hex << std::setfill('0') << std::setw(16) << ot->mailItem.mailId<< std::dec << "Group id: " << ot->group_id << ", TS=" << ot->sent_ts << std::endl;
|
||||||
}
|
}
|
||||||
delete v;
|
delete v;
|
||||||
break;
|
break;
|
||||||
@ -942,7 +1018,7 @@ bool p3GxsTrans::loadList(std::list<RsItem *>&loadList)
|
|||||||
}
|
}
|
||||||
|
|
||||||
std::cout << "p3GxsTrans::loadList(...) " << loadList.size() << " "
|
std::cout << "p3GxsTrans::loadList(...) " << loadList.size() << " "
|
||||||
<< mIngoingQueue.size() << " " << mOutgoingQueue.size()
|
<< mIncomingQueue.size() << " " << mOutgoingQueue.size()
|
||||||
<< std::endl;
|
<< std::endl;
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
|
@ -193,7 +193,7 @@ private:
|
|||||||
typedef std::map<RsGxsTransId, OutgoingRecord> prMap;
|
typedef std::map<RsGxsTransId, OutgoingRecord> prMap;
|
||||||
prMap mOutgoingQueue;
|
prMap mOutgoingQueue;
|
||||||
RsMutex mOutgoingMutex;
|
RsMutex mOutgoingMutex;
|
||||||
void processOutgoingRecord(OutgoingRecord& r);
|
void locked_processOutgoingRecord(OutgoingRecord& r);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief Ingoing mail and receipt processing queue.
|
* @brief Ingoing mail and receipt processing queue.
|
||||||
@ -204,8 +204,8 @@ private:
|
|||||||
* item to not being processed and memleaked multimap is used instead of map
|
* item to not being processed and memleaked multimap is used instead of map
|
||||||
* for incoming queue.
|
* for incoming queue.
|
||||||
*/
|
*/
|
||||||
typedef std::unordered_multimap<RsGxsTransId, RsGxsTransBaseItem*> inMap;
|
typedef std::unordered_multimap<RsGxsTransId, RsGxsTransBaseMsgItem*> inMap;
|
||||||
inMap mIngoingQueue;
|
inMap mIncomingQueue;
|
||||||
RsMutex mIngoingMutex;
|
RsMutex mIngoingMutex;
|
||||||
|
|
||||||
/// @see GxsTokenQueue::handleResponse(uint32_t token, uint32_t req_type)
|
/// @see GxsTokenQueue::handleResponse(uint32_t token, uint32_t req_type)
|
||||||
|
@ -21,10 +21,11 @@
|
|||||||
|
|
||||||
const RsGxsId RsGxsTransMailItem::allRecipientsHint("FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF");
|
const RsGxsId RsGxsTransMailItem::allRecipientsHint("FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF");
|
||||||
|
|
||||||
OutgoingRecord::OutgoingRecord() :
|
OutgoingRecord_deprecated::OutgoingRecord_deprecated()
|
||||||
RsItem( RS_PKT_VERSION_SERVICE, RS_SERVICE_TYPE_GXS_TRANS,
|
: RsItem( RS_PKT_VERSION_SERVICE, RS_SERVICE_TYPE_GXS_TRANS, static_cast<uint8_t>(GxsTransItemsSubtypes::OUTGOING_RECORD_ITEM_deprecated) ) { clear();}
|
||||||
static_cast<uint8_t>(GxsTransItemsSubtypes::OUTGOING_RECORD_ITEM) )
|
|
||||||
{ clear();}
|
OutgoingRecord::OutgoingRecord()
|
||||||
|
: RsItem( RS_PKT_VERSION_SERVICE, RS_SERVICE_TYPE_GXS_TRANS, static_cast<uint8_t>(GxsTransItemsSubtypes::OUTGOING_RECORD_ITEM) ) { clear();}
|
||||||
|
|
||||||
OutgoingRecord::OutgoingRecord( RsGxsId rec, GxsTransSubServices cs,
|
OutgoingRecord::OutgoingRecord( RsGxsId rec, GxsTransSubServices cs,
|
||||||
const uint8_t* data, uint32_t size ) :
|
const uint8_t* data, uint32_t size ) :
|
||||||
@ -41,8 +42,7 @@ OutgoingRecord::OutgoingRecord( RsGxsId rec, GxsTransSubServices cs,
|
|||||||
RS_REGISTER_ITEM_TYPE(RsGxsTransMailItem) // for mailItem
|
RS_REGISTER_ITEM_TYPE(RsGxsTransMailItem) // for mailItem
|
||||||
RS_REGISTER_ITEM_TYPE(RsNxsTransPresignedReceipt) // for presignedReceipt
|
RS_REGISTER_ITEM_TYPE(RsNxsTransPresignedReceipt) // for presignedReceipt
|
||||||
|
|
||||||
void OutgoingRecord::serial_process(RsGenericSerializer::SerializeJob j,
|
void OutgoingRecord_deprecated::serial_process(RsGenericSerializer::SerializeJob j, RsGenericSerializer::SerializeContext& ctx)
|
||||||
RsGenericSerializer::SerializeContext& ctx)
|
|
||||||
{
|
{
|
||||||
RS_REGISTER_SERIAL_MEMBER_TYPED(status, uint8_t);
|
RS_REGISTER_SERIAL_MEMBER_TYPED(status, uint8_t);
|
||||||
RS_REGISTER_SERIAL_MEMBER(recipient);
|
RS_REGISTER_SERIAL_MEMBER(recipient);
|
||||||
@ -51,3 +51,17 @@ void OutgoingRecord::serial_process(RsGenericSerializer::SerializeJob j,
|
|||||||
RS_REGISTER_SERIAL_MEMBER_TYPED(clientService, uint16_t);
|
RS_REGISTER_SERIAL_MEMBER_TYPED(clientService, uint16_t);
|
||||||
RS_REGISTER_SERIAL_MEMBER(presignedReceipt);
|
RS_REGISTER_SERIAL_MEMBER(presignedReceipt);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void OutgoingRecord::serial_process(RsGenericSerializer::SerializeJob j,
|
||||||
|
RsGenericSerializer::SerializeContext& ctx)
|
||||||
|
{
|
||||||
|
RS_REGISTER_SERIAL_MEMBER_TYPED(status, uint8_t);
|
||||||
|
RS_REGISTER_SERIAL_MEMBER(recipient);
|
||||||
|
RS_REGISTER_SERIAL_MEMBER(author);
|
||||||
|
RS_REGISTER_SERIAL_MEMBER(group_id);
|
||||||
|
RS_REGISTER_SERIAL_MEMBER(sent_ts);
|
||||||
|
RS_REGISTER_SERIAL_MEMBER(mailItem);
|
||||||
|
RS_REGISTER_SERIAL_MEMBER(mailData);
|
||||||
|
RS_REGISTER_SERIAL_MEMBER_TYPED(clientService, uint16_t);
|
||||||
|
RS_REGISTER_SERIAL_MEMBER(presignedReceipt);
|
||||||
|
}
|
||||||
|
@ -36,14 +36,14 @@ public:
|
|||||||
virtual ~RsNxsTransPresignedReceipt() {}
|
virtual ~RsNxsTransPresignedReceipt() {}
|
||||||
};
|
};
|
||||||
|
|
||||||
class RsGxsTransBaseItem : public RsGxsMsgItem
|
class RsGxsTransBaseMsgItem : public RsGxsMsgItem
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
RsGxsTransBaseItem(GxsTransItemsSubtypes subtype) :
|
RsGxsTransBaseMsgItem(GxsTransItemsSubtypes subtype) :
|
||||||
RsGxsMsgItem( RS_SERVICE_TYPE_GXS_TRANS,
|
RsGxsMsgItem( RS_SERVICE_TYPE_GXS_TRANS,
|
||||||
static_cast<uint8_t>(subtype) ), mailId(0) {}
|
static_cast<uint8_t>(subtype) ), mailId(0) {}
|
||||||
|
|
||||||
virtual ~RsGxsTransBaseItem() {}
|
virtual ~RsGxsTransBaseMsgItem() {}
|
||||||
|
|
||||||
RsGxsTransId mailId;
|
RsGxsTransId mailId;
|
||||||
|
|
||||||
@ -58,10 +58,10 @@ public:
|
|||||||
{ RS_REGISTER_SERIAL_MEMBER_TYPED(mailId, uint64_t); }
|
{ RS_REGISTER_SERIAL_MEMBER_TYPED(mailId, uint64_t); }
|
||||||
};
|
};
|
||||||
|
|
||||||
class RsGxsTransPresignedReceipt : public RsGxsTransBaseItem
|
class RsGxsTransPresignedReceipt : public RsGxsTransBaseMsgItem
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
RsGxsTransPresignedReceipt() : RsGxsTransBaseItem(GxsTransItemsSubtypes::GXS_TRANS_SUBTYPE_RECEIPT) {}
|
RsGxsTransPresignedReceipt() : RsGxsTransBaseMsgItem(GxsTransItemsSubtypes::GXS_TRANS_SUBTYPE_RECEIPT) {}
|
||||||
virtual ~RsGxsTransPresignedReceipt() {}
|
virtual ~RsGxsTransPresignedReceipt() {}
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -72,11 +72,11 @@ enum class RsGxsTransEncryptionMode : uint8_t
|
|||||||
UNDEFINED_ENCRYPTION = 250
|
UNDEFINED_ENCRYPTION = 250
|
||||||
};
|
};
|
||||||
|
|
||||||
class RsGxsTransMailItem : public RsGxsTransBaseItem
|
class RsGxsTransMailItem : public RsGxsTransBaseMsgItem
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
RsGxsTransMailItem() :
|
RsGxsTransMailItem() :
|
||||||
RsGxsTransBaseItem(GxsTransItemsSubtypes::GXS_TRANS_SUBTYPE_MAIL),
|
RsGxsTransBaseMsgItem(GxsTransItemsSubtypes::GXS_TRANS_SUBTYPE_MAIL),
|
||||||
cryptoType(RsGxsTransEncryptionMode::UNDEFINED_ENCRYPTION) {}
|
cryptoType(RsGxsTransEncryptionMode::UNDEFINED_ENCRYPTION) {}
|
||||||
|
|
||||||
virtual ~RsGxsTransMailItem() {}
|
virtual ~RsGxsTransMailItem() {}
|
||||||
@ -139,7 +139,7 @@ public:
|
|||||||
void serial_process( RsGenericSerializer::SerializeJob j,
|
void serial_process( RsGenericSerializer::SerializeJob j,
|
||||||
RsGenericSerializer::SerializeContext& ctx )
|
RsGenericSerializer::SerializeContext& ctx )
|
||||||
{
|
{
|
||||||
RsGxsTransBaseItem::serial_process(j, ctx);
|
RsGxsTransBaseMsgItem::serial_process(j, ctx);
|
||||||
RS_REGISTER_SERIAL_MEMBER_TYPED(cryptoType, uint8_t);
|
RS_REGISTER_SERIAL_MEMBER_TYPED(cryptoType, uint8_t);
|
||||||
RS_REGISTER_SERIAL_MEMBER(recipientHint);
|
RS_REGISTER_SERIAL_MEMBER(recipientHint);
|
||||||
RS_REGISTER_SERIAL_MEMBER(payload);
|
RS_REGISTER_SERIAL_MEMBER(payload);
|
||||||
@ -147,7 +147,7 @@ public:
|
|||||||
|
|
||||||
void clear()
|
void clear()
|
||||||
{
|
{
|
||||||
RsGxsTransBaseItem::clear();
|
RsGxsTransBaseMsgItem::clear();
|
||||||
cryptoType = RsGxsTransEncryptionMode::UNDEFINED_ENCRYPTION;
|
cryptoType = RsGxsTransEncryptionMode::UNDEFINED_ENCRYPTION;
|
||||||
recipientHint.clear();
|
recipientHint.clear();
|
||||||
payload.clear();
|
payload.clear();
|
||||||
@ -183,6 +183,33 @@ public:
|
|||||||
|
|
||||||
class RsGxsTransSerializer;
|
class RsGxsTransSerializer;
|
||||||
|
|
||||||
|
class OutgoingRecord_deprecated : public RsItem
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
OutgoingRecord_deprecated( RsGxsId rec, GxsTransSubServices cs, const uint8_t* data, uint32_t size );
|
||||||
|
|
||||||
|
virtual ~OutgoingRecord_deprecated() {}
|
||||||
|
|
||||||
|
GxsTransSendStatus status;
|
||||||
|
RsGxsId recipient;
|
||||||
|
/// Don't use a pointer would be invalid after publish
|
||||||
|
RsGxsTransMailItem mailItem;
|
||||||
|
|
||||||
|
std::vector<uint8_t> mailData;
|
||||||
|
GxsTransSubServices clientService;
|
||||||
|
|
||||||
|
RsNxsTransPresignedReceipt presignedReceipt;
|
||||||
|
|
||||||
|
void serial_process( RsGenericSerializer::SerializeJob j,
|
||||||
|
RsGenericSerializer::SerializeContext& ctx );
|
||||||
|
|
||||||
|
void clear() {}
|
||||||
|
|
||||||
|
private:
|
||||||
|
friend class RsGxsTransSerializer;
|
||||||
|
OutgoingRecord_deprecated();
|
||||||
|
};
|
||||||
|
|
||||||
class OutgoingRecord : public RsItem
|
class OutgoingRecord : public RsItem
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
@ -193,10 +220,17 @@ public:
|
|||||||
|
|
||||||
GxsTransSendStatus status;
|
GxsTransSendStatus status;
|
||||||
RsGxsId recipient;
|
RsGxsId recipient;
|
||||||
|
|
||||||
|
RsGxsId author; // These 3 fields cannot be stored in mailItem.meta, which is not serialised.
|
||||||
|
RsGxsGroupId group_id ;
|
||||||
|
uint32_t sent_ts ;
|
||||||
|
|
||||||
/// Don't use a pointer would be invalid after publish
|
/// Don't use a pointer would be invalid after publish
|
||||||
RsGxsTransMailItem mailItem;
|
RsGxsTransMailItem mailItem;
|
||||||
|
|
||||||
std::vector<uint8_t> mailData;
|
std::vector<uint8_t> mailData;
|
||||||
GxsTransSubServices clientService;
|
GxsTransSubServices clientService;
|
||||||
|
|
||||||
RsNxsTransPresignedReceipt presignedReceipt;
|
RsNxsTransPresignedReceipt presignedReceipt;
|
||||||
|
|
||||||
void serial_process( RsGenericSerializer::SerializeJob j,
|
void serial_process( RsGenericSerializer::SerializeJob j,
|
||||||
@ -225,6 +259,7 @@ public:
|
|||||||
case GxsTransItemsSubtypes::GXS_TRANS_SUBTYPE_MAIL: return new RsGxsTransMailItem();
|
case GxsTransItemsSubtypes::GXS_TRANS_SUBTYPE_MAIL: return new RsGxsTransMailItem();
|
||||||
case GxsTransItemsSubtypes::GXS_TRANS_SUBTYPE_RECEIPT: return new RsGxsTransPresignedReceipt();
|
case GxsTransItemsSubtypes::GXS_TRANS_SUBTYPE_RECEIPT: return new RsGxsTransPresignedReceipt();
|
||||||
case GxsTransItemsSubtypes::GXS_TRANS_SUBTYPE_GROUP: return new RsGxsTransGroupItem();
|
case GxsTransItemsSubtypes::GXS_TRANS_SUBTYPE_GROUP: return new RsGxsTransGroupItem();
|
||||||
|
case GxsTransItemsSubtypes::OUTGOING_RECORD_ITEM_deprecated: return new OutgoingRecord_deprecated();
|
||||||
case GxsTransItemsSubtypes::OUTGOING_RECORD_ITEM: return new OutgoingRecord();
|
case GxsTransItemsSubtypes::OUTGOING_RECORD_ITEM: return new OutgoingRecord();
|
||||||
default: return NULL;
|
default: return NULL;
|
||||||
}
|
}
|
||||||
|
@ -19,7 +19,8 @@ enum class GxsTransItemsSubtypes : uint8_t
|
|||||||
GXS_TRANS_SUBTYPE_MAIL = 0x01,
|
GXS_TRANS_SUBTYPE_MAIL = 0x01,
|
||||||
GXS_TRANS_SUBTYPE_RECEIPT = 0x02,
|
GXS_TRANS_SUBTYPE_RECEIPT = 0x02,
|
||||||
GXS_TRANS_SUBTYPE_GROUP = 0x03,
|
GXS_TRANS_SUBTYPE_GROUP = 0x03,
|
||||||
OUTGOING_RECORD_ITEM = 0x04
|
OUTGOING_RECORD_ITEM_deprecated = 0x04,
|
||||||
|
OUTGOING_RECORD_ITEM = 0x05
|
||||||
};
|
};
|
||||||
|
|
||||||
enum class GxsTransSendStatus : uint8_t
|
enum class GxsTransSendStatus : uint8_t
|
||||||
|
Loading…
x
Reference in New Issue
Block a user