added calls to indicateConfigChanged() to force saving GxsTrans outgoing records when updated

This commit is contained in:
csoler 2017-06-05 21:15:42 +02:00
parent 1967204821
commit b7c09bd6cf
2 changed files with 46 additions and 17 deletions

View File

@ -100,6 +100,8 @@ bool p3GxsTrans::sendData( RsGxsTransId& mailId,
} }
mailId = pr.mailItem.mailId; mailId = pr.mailItem.mailId;
IndicateConfigChanged(); // This causes the saving of the message after all data has been filled in.
return true; return true;
} }
@ -228,6 +230,7 @@ void p3GxsTrans::handleResponse(uint32_t token, uint32_t req_type)
{ {
RS_STACK_MUTEX(mIngoingMutex); RS_STACK_MUTEX(mIngoingMutex);
mIngoingQueue.insert(inMap::value_type(mb->mailId, mb)); mIngoingQueue.insert(inMap::value_type(mb->mailId, mb));
IndicateConfigChanged();
} }
else else
std::cerr << "p3GxsTrans::handleResponse(...) " std::cerr << "p3GxsTrans::handleResponse(...) "
@ -392,11 +395,20 @@ void p3GxsTrans::service_tick()
{ {
OutgoingRecord& pr(it->second); OutgoingRecord& pr(it->second);
GxsTransSendStatus oldStatus = pr.status; GxsTransSendStatus oldStatus = pr.status;
processOutgoingRecord(pr);
locked_processOutgoingRecord(pr);
bool changed = false ;
if (oldStatus != pr.status) notifyClientService(pr); if (oldStatus != pr.status) notifyClientService(pr);
if( pr.status >= GxsTransSendStatus::RECEIPT_RECEIVED ) if( pr.status >= GxsTransSendStatus::RECEIPT_RECEIVED )
{
it = mOutgoingQueue.erase(it); it = mOutgoingQueue.erase(it);
changed = true ;
}
else ++it; else ++it;
if(changed)
IndicateConfigChanged();
} }
} }
@ -410,8 +422,8 @@ void p3GxsTrans::service_tick()
{ {
case GxsTransItemsSubtypes::GXS_TRANS_SUBTYPE_MAIL: case GxsTransItemsSubtypes::GXS_TRANS_SUBTYPE_MAIL:
{ {
RsGxsTransMailItem* msg = RsGxsTransMailItem* msg = dynamic_cast<RsGxsTransMailItem*>(it->second);
dynamic_cast<RsGxsTransMailItem*>(it->second);
if(!msg) if(!msg)
{ {
std::cerr << "p3GxsTrans::service_tick() (EE) " std::cerr << "p3GxsTrans::service_tick() (EE) "
@ -436,8 +448,8 @@ void p3GxsTrans::service_tick()
} }
case GxsTransItemsSubtypes::GXS_TRANS_SUBTYPE_RECEIPT: case GxsTransItemsSubtypes::GXS_TRANS_SUBTYPE_RECEIPT:
{ {
RsGxsTransPresignedReceipt* rcpt = RsGxsTransPresignedReceipt* rcpt = dynamic_cast<RsGxsTransPresignedReceipt*>(it->second);
dynamic_cast<RsGxsTransPresignedReceipt*>(it->second);
if(!rcpt) if(!rcpt)
{ {
std::cerr << "p3GxsTrans::service_tick() (EE) " std::cerr << "p3GxsTrans::service_tick() (EE) "
@ -468,6 +480,8 @@ void p3GxsTrans::service_tick()
} }
delete it->second; it = mIngoingQueue.erase(it); delete it->second; it = mIngoingQueue.erase(it);
IndicateConfigChanged();
} }
} }
} }
@ -678,7 +692,7 @@ bool p3GxsTrans::dispatchDecryptedMail( const RsGxsId& authorId,
} }
} }
void p3GxsTrans::processOutgoingRecord(OutgoingRecord& pr) void p3GxsTrans::locked_processOutgoingRecord(OutgoingRecord& pr)
{ {
//std::cout << "p3GxsTrans::processRecord(...)" << std::endl; //std::cout << "p3GxsTrans::processRecord(...)" << std::endl;
@ -807,6 +821,8 @@ void p3GxsTrans::processOutgoingRecord(OutgoingRecord& pr)
uint32_t token; uint32_t token;
publishMsg(token, new RsGxsTransMailItem(pr.mailItem)); publishMsg(token, new RsGxsTransMailItem(pr.mailItem));
pr.status = GxsTransSendStatus::PENDING_RECEIPT_RECEIVE; pr.status = GxsTransSendStatus::PENDING_RECEIPT_RECEIVE;
IndicateConfigChanged(); // This causes the saving of the message after all data has been filled in.
break; break;
} }
//case GxsTransSendStatus::PENDING_TRANSFER: //case GxsTransSendStatus::PENDING_TRANSFER:
@ -814,17 +830,24 @@ void p3GxsTrans::processOutgoingRecord(OutgoingRecord& pr)
{ {
RS_STACK_MUTEX(mIngoingMutex); RS_STACK_MUTEX(mIngoingMutex);
auto range = mIngoingQueue.equal_range(pr.mailItem.mailId); auto range = mIngoingQueue.equal_range(pr.mailItem.mailId);
bool changed = false ;
for( auto it = range.first; it != range.second; ++it) for( auto it = range.first; it != range.second; ++it)
{ {
RsGxsTransPresignedReceipt* rt = RsGxsTransPresignedReceipt* rt = dynamic_cast<RsGxsTransPresignedReceipt*>(it->second);
dynamic_cast<RsGxsTransPresignedReceipt*>(it->second);
if(rt && mIdService.isOwnId(rt->meta.mAuthorId)) if(rt && mIdService.isOwnId(rt->meta.mAuthorId))
{ {
mIngoingQueue.erase(it); delete rt; mIngoingQueue.erase(it); delete rt;
pr.status = GxsTransSendStatus::RECEIPT_RECEIVED; pr.status = GxsTransSendStatus::RECEIPT_RECEIVED;
changed = true ;
break; break;
} }
} }
if(changed)
IndicateConfigChanged();
// TODO: Resend message if older then treshold // TODO: Resend message if older then treshold
break; break;
} }
@ -875,19 +898,23 @@ RsSerialiser* p3GxsTrans::setupSerialiser()
bool p3GxsTrans::saveList(bool &cleanup, std::list<RsItem *>& saveList) bool p3GxsTrans::saveList(bool &cleanup, std::list<RsItem *>& saveList)
{ {
std::cout << "p3GxsTrans::saveList(...)" << saveList.size() << " " std::cout << "p3GxsTrans::saveList(...)" << saveList.size() << " " << mIngoingQueue.size() << " " << mOutgoingQueue.size() << std::endl;
<< mIngoingQueue.size() << " " << mOutgoingQueue.size()
<< std::endl;
mOutgoingMutex.lock(); mOutgoingMutex.lock();
mIngoingMutex.lock(); mIngoingMutex.lock();
for ( auto& kv : mOutgoingQueue ) saveList.push_back(&kv.second); for ( auto& kv : mOutgoingQueue )
for ( auto& kv : mIngoingQueue ) saveList.push_back(kv.second); {
std::cerr << "Saving outgoing item, ID " << std::hex << std::setfill('0') << std::setw(16) << kv.first << std::dec << "Group id: " << kv.second.mailItem.meta.mGroupId << ", TS=" << kv.second.mailItem.meta.mPublishTs << std::endl;
saveList.push_back(&kv.second);
}
for ( auto& kv : mIngoingQueue )
{
std::cerr << "Saving incoming item, ID " << std::hex << std::setfill('0') << std::setw(16) << kv.first << std::endl;
saveList.push_back(kv.second);
}
std::cout << "p3GxsTrans::saveList(...)" << saveList.size() << " " std::cout << "p3GxsTrans::saveList(...)" << saveList.size() << " " << mIngoingQueue.size() << " " << mOutgoingQueue.size() << std::endl;
<< mIngoingQueue.size() << " " << mOutgoingQueue.size()
<< std::endl;
cleanup = false; cleanup = false;
return true; return true;
@ -927,6 +954,8 @@ bool p3GxsTrans::loadList(std::list<RsItem *>&loadList)
RS_STACK_MUTEX(mOutgoingMutex); RS_STACK_MUTEX(mOutgoingMutex);
mOutgoingQueue.insert( mOutgoingQueue.insert(
prMap::value_type(ot->mailItem.mailId, *ot)); prMap::value_type(ot->mailItem.mailId, *ot));
std::cerr << "Loading outgoing item, ID " << std::hex << std::setfill('0') << std::setw(16) << ot->mailItem.mailId<< std::dec << "Group id: " << ot->mailItem.meta.mGroupId << ", TS=" << ot->mailItem.meta.mPublishTs << std::endl;
} }
delete v; delete v;
break; break;

View File

@ -193,7 +193,7 @@ private:
typedef std::map<RsGxsTransId, OutgoingRecord> prMap; typedef std::map<RsGxsTransId, OutgoingRecord> prMap;
prMap mOutgoingQueue; prMap mOutgoingQueue;
RsMutex mOutgoingMutex; RsMutex mOutgoingMutex;
void processOutgoingRecord(OutgoingRecord& r); void locked_processOutgoingRecord(OutgoingRecord& r);
/** /**
* @brief Ingoing mail and receipt processing queue. * @brief Ingoing mail and receipt processing queue.