mirror of
https://github.com/RetroShare/RetroShare.git
synced 2025-08-05 21:04:14 -04:00
Merge pull request #861 from RetroShare/v0.6-GxsTransport
V0.6 gxs transport
This commit is contained in:
commit
1ebcc6006b
58 changed files with 4372 additions and 1167 deletions
|
@ -1130,13 +1130,20 @@ bool p3IdService::validateData(const uint8_t *data,uint32_t data_size,const RsTl
|
|||
timeStampKey(signature.keyId,info);
|
||||
return true ;
|
||||
}
|
||||
bool p3IdService::encryptData(const uint8_t *decrypted_data,uint32_t decrypted_data_size,uint8_t *& encrypted_data,uint32_t& encrypted_data_size,const RsGxsId& encryption_key_id,bool force_load,uint32_t& error_status)
|
||||
|
||||
bool p3IdService::encryptData( const uint8_t *decrypted_data,
|
||||
uint32_t decrypted_data_size,
|
||||
uint8_t *& encrypted_data,
|
||||
uint32_t& encrypted_data_size,
|
||||
const RsGxsId& encryption_key_id,
|
||||
uint32_t& error_status,
|
||||
bool force_load )
|
||||
{
|
||||
RsTlvPublicRSAKey encryption_key ;
|
||||
|
||||
// get the key, and let the cache find it.
|
||||
for(int i=0;i<(force_load?6:1);++i)
|
||||
if(getKey(encryption_key_id,encryption_key))
|
||||
for(int i=0; i<(force_load?6:1);++i)
|
||||
if(getKey(encryption_key_id,encryption_key))
|
||||
break ;
|
||||
else
|
||||
usleep(500*1000) ; // sleep half a sec.
|
||||
|
@ -1160,18 +1167,110 @@ bool p3IdService::encryptData(const uint8_t *decrypted_data,uint32_t decrypted_d
|
|||
return true ;
|
||||
}
|
||||
|
||||
bool p3IdService::decryptData(const uint8_t *encrypted_data,uint32_t encrypted_data_size,uint8_t *& decrypted_data,uint32_t& decrypted_size,const RsGxsId& key_id,uint32_t& error_status)
|
||||
bool p3IdService::encryptData( const uint8_t* decrypted_data,
|
||||
uint32_t decrypted_data_size,
|
||||
uint8_t*& encrypted_data,
|
||||
uint32_t& encrypted_data_size,
|
||||
const std::set<RsGxsId>& encrypt_ids,
|
||||
uint32_t& error_status, bool force_load )
|
||||
{
|
||||
std::set<const RsGxsId*> keyNotYetFoundIds;
|
||||
|
||||
for( std::set<RsGxsId>::const_iterator it = encrypt_ids.begin();
|
||||
it != encrypt_ids.end(); ++it )
|
||||
{
|
||||
const RsGxsId& gId(*it);
|
||||
if(gId.isNull())
|
||||
{
|
||||
std::cerr << "p3IdService::encryptData(...) (EE) got null GXS id"
|
||||
<< std::endl;
|
||||
return false;
|
||||
}
|
||||
else keyNotYetFoundIds.insert(&gId);
|
||||
}
|
||||
|
||||
if(keyNotYetFoundIds.empty())
|
||||
{
|
||||
std::cerr << "p3IdService::encryptData(...) (EE) got empty GXS ids set"
|
||||
<< std::endl;
|
||||
print_stacktrace();
|
||||
return false;
|
||||
}
|
||||
|
||||
std::vector<RsTlvPublicRSAKey> encryption_keys;
|
||||
int maxRounds = force_load ? 6 : 1;
|
||||
for( int i=0; i < maxRounds; ++i )
|
||||
{
|
||||
for( std::set<const RsGxsId*>::iterator it = keyNotYetFoundIds.begin();
|
||||
it !=keyNotYetFoundIds.end(); ++it )
|
||||
{
|
||||
RsTlvPublicRSAKey encryption_key;
|
||||
if(getKey(**it, encryption_key) && !encryption_key.keyId.isNull())
|
||||
{
|
||||
encryption_keys.push_back(encryption_key);
|
||||
keyNotYetFoundIds.erase(it);
|
||||
}
|
||||
}
|
||||
|
||||
if(keyNotYetFoundIds.empty()) break;
|
||||
else usleep(500*1000);
|
||||
}
|
||||
|
||||
if(!keyNotYetFoundIds.empty())
|
||||
{
|
||||
std::cerr << "p3IdService::encryptData(...) (EE) Cannot get "
|
||||
<< "encryption key for: ";
|
||||
for( std::set<const RsGxsId*>::iterator it = keyNotYetFoundIds.begin();
|
||||
it !=keyNotYetFoundIds.end(); ++it )
|
||||
std::cerr << **it << " ";
|
||||
std::cerr << std::endl;
|
||||
print_stacktrace();
|
||||
|
||||
error_status = RS_GIXS_ERROR_KEY_NOT_AVAILABLE;
|
||||
return false;
|
||||
}
|
||||
|
||||
if(!GxsSecurity::encrypt( encrypted_data, encrypted_data_size,
|
||||
decrypted_data, decrypted_data_size,
|
||||
encryption_keys ))
|
||||
{
|
||||
std::cerr << "p3IdService::encryptData(...) (EE) Encryption failed."
|
||||
<< std::endl;
|
||||
print_stacktrace();
|
||||
|
||||
error_status = RS_GIXS_ERROR_UNKNOWN;
|
||||
return false ;
|
||||
}
|
||||
|
||||
for( std::set<RsGxsId>::const_iterator it = encrypt_ids.begin();
|
||||
it != encrypt_ids.end(); ++it )
|
||||
{
|
||||
timeStampKey( *it,
|
||||
RsIdentityUsage(
|
||||
serviceType(),
|
||||
RsIdentityUsage::IDENTITY_GENERIC_ENCRYPTION ) );
|
||||
}
|
||||
|
||||
error_status = RS_GIXS_ERROR_NO_ERROR;
|
||||
return true;
|
||||
}
|
||||
|
||||
bool p3IdService::decryptData( const uint8_t *encrypted_data,
|
||||
uint32_t encrypted_data_size,
|
||||
uint8_t *& decrypted_data,
|
||||
uint32_t& decrypted_size,
|
||||
const RsGxsId& key_id, uint32_t& error_status,
|
||||
bool force_load )
|
||||
{
|
||||
RsTlvPrivateRSAKey encryption_key ;
|
||||
|
||||
// Get the key, and let the cache find it. It's our own key, so we should be able to find it, even if it takes
|
||||
// some seconds.
|
||||
|
||||
for(int i=0;i<4;++i)
|
||||
if(getPrivateKey(key_id,encryption_key))
|
||||
break ;
|
||||
else
|
||||
usleep(500*1000) ; // sleep half a sec.
|
||||
int maxRounds = force_load ? 6 : 1;
|
||||
for(int i=0; i<maxRounds ;++i)
|
||||
if(getPrivateKey(key_id,encryption_key)) break;
|
||||
else usleep(500*1000) ; // sleep half a sec.
|
||||
|
||||
if(encryption_key.keyId.isNull())
|
||||
{
|
||||
|
@ -1185,13 +1284,106 @@ bool p3IdService::decryptData(const uint8_t *encrypted_data,uint32_t encrypted_d
|
|||
std::cerr << " (EE) Decryption failed." << std::endl;
|
||||
error_status = RS_GIXS_ERROR_UNKNOWN ;
|
||||
return false ;
|
||||
}
|
||||
error_status = RS_GIXS_ERROR_NO_ERROR ;
|
||||
timeStampKey(key_id,RsIdentityUsage(serviceType(),RsIdentityUsage::IDENTITY_GENERIC_DECRYPTION)) ;
|
||||
}
|
||||
error_status = RS_GIXS_ERROR_NO_ERROR;
|
||||
timeStampKey( key_id,
|
||||
RsIdentityUsage(
|
||||
serviceType(),
|
||||
RsIdentityUsage::IDENTITY_GENERIC_DECRYPTION) );
|
||||
|
||||
return true ;
|
||||
}
|
||||
|
||||
bool p3IdService::decryptData( const uint8_t* encrypted_data,
|
||||
uint32_t encrypted_data_size,
|
||||
uint8_t*& decrypted_data,
|
||||
uint32_t& decrypted_data_size,
|
||||
const std::set<RsGxsId>& decrypt_ids,
|
||||
uint32_t& error_status,
|
||||
bool force_load )
|
||||
{
|
||||
std::set<const RsGxsId*> keyNotYetFoundIds;
|
||||
|
||||
for( std::set<RsGxsId>::const_iterator it = decrypt_ids.begin();
|
||||
it != decrypt_ids.end(); ++it )
|
||||
{
|
||||
const RsGxsId& gId(*it);
|
||||
if(gId.isNull())
|
||||
{
|
||||
std::cerr << "p3IdService::decryptData(...) (EE) got null GXS id"
|
||||
<< std::endl;
|
||||
print_stacktrace();
|
||||
return false;
|
||||
}
|
||||
else keyNotYetFoundIds.insert(&gId);
|
||||
}
|
||||
|
||||
if(keyNotYetFoundIds.empty())
|
||||
{
|
||||
std::cerr << "p3IdService::decryptData(...) (EE) got empty GXS ids set"
|
||||
<< std::endl;
|
||||
print_stacktrace();
|
||||
return false;
|
||||
}
|
||||
|
||||
std::vector<RsTlvPrivateRSAKey> decryption_keys;
|
||||
int maxRounds = force_load ? 6 : 1;
|
||||
for( int i=0; i < maxRounds; ++i )
|
||||
{
|
||||
for( std::set<const RsGxsId*>::iterator it = keyNotYetFoundIds.begin();
|
||||
it !=keyNotYetFoundIds.end(); ++it )
|
||||
{
|
||||
RsTlvPrivateRSAKey decryption_key;
|
||||
if( getPrivateKey(**it, decryption_key)
|
||||
&& !decryption_key.keyId.isNull() )
|
||||
{
|
||||
decryption_keys.push_back(decryption_key);
|
||||
keyNotYetFoundIds.erase(it);
|
||||
}
|
||||
}
|
||||
|
||||
if(keyNotYetFoundIds.empty()) break;
|
||||
else usleep(500*1000);
|
||||
}
|
||||
|
||||
if(!keyNotYetFoundIds.empty())
|
||||
{
|
||||
std::cerr << "p3IdService::decryptData(...) (EE) Cannot get private key"
|
||||
<< " for: ";
|
||||
for( std::set<const RsGxsId*>::iterator it = keyNotYetFoundIds.begin();
|
||||
it !=keyNotYetFoundIds.end(); ++it )
|
||||
std::cerr << **it << " ";
|
||||
std::cerr << std::endl;
|
||||
print_stacktrace();
|
||||
|
||||
error_status = RS_GIXS_ERROR_KEY_NOT_AVAILABLE;
|
||||
return false;
|
||||
}
|
||||
|
||||
if(!GxsSecurity::decrypt( decrypted_data, decrypted_data_size,
|
||||
encrypted_data, encrypted_data_size,
|
||||
decryption_keys ))
|
||||
{
|
||||
std::cerr << "p3IdService::decryptData(...) (EE) Decryption failed."
|
||||
<< std::endl;
|
||||
print_stacktrace();
|
||||
|
||||
error_status = RS_GIXS_ERROR_UNKNOWN;
|
||||
return false ;
|
||||
}
|
||||
|
||||
for( std::set<RsGxsId>::const_iterator it = decrypt_ids.begin();
|
||||
it != decrypt_ids.end(); ++it )
|
||||
{
|
||||
timeStampKey( *it,
|
||||
RsIdentityUsage(
|
||||
serviceType(),
|
||||
RsIdentityUsage::IDENTITY_GENERIC_DECRYPTION ) );
|
||||
}
|
||||
|
||||
error_status = RS_GIXS_ERROR_NO_ERROR;
|
||||
return true;
|
||||
}
|
||||
|
||||
#ifdef TO_BE_REMOVED
|
||||
/********************************************************************************/
|
||||
|
|
|
@ -296,11 +296,48 @@ public:
|
|||
|
||||
virtual bool isOwnId(const RsGxsId& key_id) ;
|
||||
|
||||
virtual bool signData(const uint8_t *data,uint32_t data_size,const RsGxsId& signer_id,RsTlvKeySignature& signature,uint32_t& signing_error) ;
|
||||
virtual bool validateData(const uint8_t *data, uint32_t data_size, const RsTlvKeySignature& signature, bool force_load, const RsIdentityUsage &info, uint32_t& signing_error) ;
|
||||
virtual bool signData( const uint8_t* data,
|
||||
uint32_t data_size,
|
||||
const RsGxsId& signer_id,
|
||||
RsTlvKeySignature& signature,
|
||||
uint32_t& signing_error);
|
||||
|
||||
virtual bool validateData( const uint8_t *data, uint32_t data_size,
|
||||
const RsTlvKeySignature& signature,
|
||||
bool force_load, const RsIdentityUsage &info,
|
||||
uint32_t& signing_error );
|
||||
|
||||
virtual bool encryptData( const uint8_t* decrypted_data,
|
||||
uint32_t decrypted_data_size,
|
||||
uint8_t*& encrypted_data,
|
||||
uint32_t& encrypted_data_size,
|
||||
const RsGxsId& encryption_key_id,
|
||||
uint32_t& error_status,
|
||||
bool force_load = true );
|
||||
|
||||
bool encryptData( const uint8_t* decrypted_data,
|
||||
uint32_t decrypted_data_size,
|
||||
uint8_t*& encrypted_data,
|
||||
uint32_t& encrypted_data_size,
|
||||
const std::set<RsGxsId>& encrypt_ids,
|
||||
uint32_t& error_status, bool force_loa = true );
|
||||
|
||||
virtual bool decryptData( const uint8_t* encrypted_data,
|
||||
uint32_t encrypted_data_size,
|
||||
uint8_t*& decrypted_data,
|
||||
uint32_t& decrypted_data_size,
|
||||
const RsGxsId& decryption_key_id,
|
||||
uint32_t& error_status,
|
||||
bool force_load = true );
|
||||
|
||||
virtual bool decryptData(const uint8_t* encrypted_data,
|
||||
uint32_t encrypted_data_size,
|
||||
uint8_t*& decrypted_data,
|
||||
uint32_t& decrypted_data_size,
|
||||
const std::set<RsGxsId>& decrypt_ids,
|
||||
uint32_t& error_status,
|
||||
bool force_load = true );
|
||||
|
||||
virtual bool encryptData(const uint8_t *decrypted_data,uint32_t decrypted_data_size,uint8_t *& encrypted_data,uint32_t& encrypted_data_size,const RsGxsId& encryption_key_id,bool force_load,uint32_t& encryption_error) ;
|
||||
virtual bool decryptData(const uint8_t *encrypted_data,uint32_t encrypted_data_size,uint8_t *& decrypted_data,uint32_t& decrypted_data_size,const RsGxsId& encryption_key_id,uint32_t& encryption_error) ;
|
||||
|
||||
virtual bool haveKey(const RsGxsId &id);
|
||||
virtual bool havePrivateKey(const RsGxsId &id);
|
||||
|
@ -308,7 +345,9 @@ public:
|
|||
virtual bool getKey(const RsGxsId &id, RsTlvPublicRSAKey &key);
|
||||
virtual bool getPrivateKey(const RsGxsId &id, RsTlvPrivateRSAKey &key);
|
||||
|
||||
virtual bool requestKey(const RsGxsId &id, const std::list<RsPeerId> &peers, const RsIdentityUsage &use_info);
|
||||
virtual bool requestKey( const RsGxsId &id,
|
||||
const std::list<RsPeerId> &peers,
|
||||
const RsIdentityUsage &use_info );
|
||||
virtual bool requestPrivateKey(const RsGxsId &id);
|
||||
|
||||
virtual bool serialiseIdentityToMemory(const RsGxsId& id,std::string& radix_string);
|
||||
|
|
|
@ -66,6 +66,8 @@
|
|||
//#define DISABLE_DISTANT_MESSAGES
|
||||
//#define DEBUG_DISTANT_MSG
|
||||
|
||||
typedef unsigned int uint;
|
||||
|
||||
using namespace Rs::Msgs;
|
||||
|
||||
static struct RsLog::logInfo msgservicezoneInfo = {RsLog::Default, "msgservice"};
|
||||
|
@ -84,23 +86,29 @@ static const uint32_t RS_MSG_DISTANT_MESSAGE_HASH_KEEP_TIME = 2*30*86400 ; // ke
|
|||
* (3) from storage...
|
||||
*/
|
||||
|
||||
p3MsgService::p3MsgService(p3ServiceControl *sc, p3IdService *id_serv)
|
||||
:p3Service(), p3Config(), mIdService(id_serv), mServiceCtrl(sc), mMsgMtx("p3MsgService"), mMsgUniqueId(0)
|
||||
p3MsgService::p3MsgService( p3ServiceControl *sc, p3IdService *id_serv,
|
||||
p3GxsTrans& gxsMS )
|
||||
: p3Service(), p3Config(),
|
||||
gxsOngoingMutex("p3MsgService Gxs Outgoing Mutex"), mIdService(id_serv),
|
||||
mServiceCtrl(sc), mMsgMtx("p3MsgService"), mMsgUniqueId(0),
|
||||
recentlyReceivedMutex("p3MsgService recently received hash mutex"),
|
||||
mGxsTransServ(gxsMS)
|
||||
{
|
||||
_serialiser = new RsMsgSerialiser(RsServiceSerializer::SERIALIZATION_FLAG_NONE); // this serialiser is used for services. It's not the same than the one returned by setupSerialiser(). We need both!!
|
||||
addSerialType(_serialiser);
|
||||
|
||||
mMsgUniqueId = 1 ; // MsgIds are not transmitted, but only used locally as a storage index. As such, thay do not need to be different
|
||||
// at friends nodes.
|
||||
|
||||
mShouldEnableDistantMessaging = true ;
|
||||
mDistantMessagingEnabled = false ;
|
||||
mDistantMessagePermissions = RS_DISTANT_MESSAGING_CONTACT_PERMISSION_FLAG_FILTER_NONE ;
|
||||
/* MsgIds are not transmitted, but only used locally as a storage index.
|
||||
* As such, thay do not need to be different at friends nodes. */
|
||||
mMsgUniqueId = 1;
|
||||
|
||||
/* Initialize standard tag types */
|
||||
if(sc)
|
||||
initStandardTagTypes();
|
||||
mShouldEnableDistantMessaging = true;
|
||||
mDistantMessagingEnabled = false;
|
||||
mDistantMessagePermissions = RS_DISTANT_MESSAGING_CONTACT_PERMISSION_FLAG_FILTER_NONE;
|
||||
|
||||
if(sc) initStandardTagTypes(); // Initialize standard tag types
|
||||
|
||||
mGxsTransServ.registerGxsTransClient( GxsTransSubServices::P3_MSG_SERVICE,
|
||||
this );
|
||||
}
|
||||
|
||||
const std::string MSG_APP_NAME = "msg";
|
||||
|
@ -142,11 +150,11 @@ int p3MsgService::tick()
|
|||
|
||||
if(now > last_management_time + 5)
|
||||
{
|
||||
manageDistantPeers() ;
|
||||
checkOutgoingMessages();
|
||||
cleanListOfReceivedMessageHashes();
|
||||
manageDistantPeers();
|
||||
checkOutgoingMessages();
|
||||
cleanListOfReceivedMessageHashes();
|
||||
|
||||
last_management_time = now ;
|
||||
last_management_time = now;
|
||||
}
|
||||
|
||||
return 0;
|
||||
|
@ -154,21 +162,21 @@ int p3MsgService::tick()
|
|||
|
||||
void p3MsgService::cleanListOfReceivedMessageHashes()
|
||||
{
|
||||
RS_STACK_MUTEX(mMsgMtx); /********** STACK LOCKED MTX ******/
|
||||
RS_STACK_MUTEX(recentlyReceivedMutex);
|
||||
|
||||
time_t now = time(NULL) ;
|
||||
|
||||
for(std::map<Sha1CheckSum,uint32_t>::iterator it(mRecentlyReceivedDistantMessageHashes.begin());it!=mRecentlyReceivedDistantMessageHashes.end();)
|
||||
if(now > RS_MSG_DISTANT_MESSAGE_HASH_KEEP_TIME + it->second)
|
||||
{
|
||||
std::cerr << "p3MsgService(): cleanListOfReceivedMessageHashes(). Removing old hash " << it->first << ", aged " << now - it->second << " secs ago" << std::endl;
|
||||
std::map<Sha1CheckSum,uint32_t>::iterator tmp(it) ;
|
||||
++tmp ;
|
||||
mRecentlyReceivedDistantMessageHashes.erase(it) ;
|
||||
it=tmp ;
|
||||
}
|
||||
else
|
||||
++it ;
|
||||
time_t now = time(NULL);
|
||||
|
||||
for( auto it = mRecentlyReceivedMessageHashes.begin();
|
||||
it != mRecentlyReceivedMessageHashes.end(); )
|
||||
if( now > RS_MSG_DISTANT_MESSAGE_HASH_KEEP_TIME + it->second )
|
||||
{
|
||||
std::cerr << "p3MsgService(): cleanListOfReceivedMessageHashes(). "
|
||||
<< "Removing old hash " << it->first << ", aged "
|
||||
<< now - it->second << " secs ago" << std::endl;
|
||||
|
||||
it = mRecentlyReceivedMessageHashes.erase(it);
|
||||
}
|
||||
else ++it;
|
||||
}
|
||||
|
||||
int p3MsgService::status()
|
||||
|
@ -349,111 +357,112 @@ void p3MsgService::checkSizeAndSendMessage(RsMsgItem *msg)
|
|||
sendItem(msg) ;
|
||||
}
|
||||
|
||||
int p3MsgService::checkOutgoingMessages()
|
||||
int p3MsgService::checkOutgoingMessages()
|
||||
{
|
||||
/* iterate through the outgoing queue
|
||||
*
|
||||
* if online, send
|
||||
*/
|
||||
bool changed = false;
|
||||
std::list<RsMsgItem*> output_queue;
|
||||
|
||||
bool changed = false ;
|
||||
std::list<RsMsgItem*> output_queue ;
|
||||
{
|
||||
RS_STACK_MUTEX(mMsgMtx); /********** STACK LOCKED MTX ******/
|
||||
|
||||
{
|
||||
RS_STACK_MUTEX(mMsgMtx); /********** STACK LOCKED MTX ******/
|
||||
|
||||
const RsPeerId& ownId = mServiceCtrl->getOwnId();
|
||||
const RsPeerId& ownId = mServiceCtrl->getOwnId();
|
||||
|
||||
std::list<uint32_t>::iterator it;
|
||||
std::list<uint32_t> toErase;
|
||||
std::list<uint32_t>::iterator it;
|
||||
std::list<uint32_t> toErase;
|
||||
|
||||
std::map<uint32_t, RsMsgItem *>::iterator mit;
|
||||
for(mit = msgOutgoing.begin(); mit != msgOutgoing.end(); ++mit)
|
||||
{
|
||||
if (mit->second->msgFlags & RS_MSG_FLAGS_TRASH)
|
||||
continue;
|
||||
std::map<uint32_t, RsMsgItem *>::iterator mit;
|
||||
for( mit = msgOutgoing.begin(); mit != msgOutgoing.end(); ++mit )
|
||||
{
|
||||
if (mit->second->msgFlags & RS_MSG_FLAGS_TRASH) continue;
|
||||
|
||||
/* find the certificate */
|
||||
RsPeerId pid = mit->second->PeerId();
|
||||
bool should_send = false ;
|
||||
/* find the certificate */
|
||||
RsPeerId pid = mit->second->PeerId();
|
||||
bool should_send = false;
|
||||
|
||||
if( pid == ownId)
|
||||
should_send = true ;
|
||||
if( pid == ownId) should_send = true;
|
||||
|
||||
if( mServiceCtrl->isPeerConnected(getServiceInfo().mServiceType, pid) ) /* FEEDBACK Msg to Ourselves */
|
||||
should_send = true ;
|
||||
// FEEDBACK Msg to Ourselves
|
||||
if( mServiceCtrl->isPeerConnected(getServiceInfo().mServiceType,
|
||||
pid) )
|
||||
should_send = true;
|
||||
|
||||
if((mit->second->msgFlags & RS_MSG_FLAGS_DISTANT) && !(mit->second->msgFlags & RS_MSG_FLAGS_ROUTED))
|
||||
should_send = true ;
|
||||
if( (mit->second->msgFlags & RS_MSG_FLAGS_DISTANT) &&
|
||||
!(mit->second->msgFlags & RS_MSG_FLAGS_ROUTED))
|
||||
should_send = true;
|
||||
|
||||
if(should_send)
|
||||
{
|
||||
/* send msg */
|
||||
pqioutput(PQL_DEBUG_BASIC, msgservicezone,
|
||||
"p3MsgService::checkOutGoingMessages() Sending out message");
|
||||
/* remove the pending flag */
|
||||
if(should_send)
|
||||
{
|
||||
/* send msg */
|
||||
pqioutput( PQL_DEBUG_BASIC, msgservicezone,
|
||||
"p3MsgService::checkOutGoingMessages() Sending out message");
|
||||
/* remove the pending flag */
|
||||
|
||||
output_queue.push_back(mit->second) ;
|
||||
output_queue.push_back(mit->second) ;
|
||||
|
||||
// When the message is a distant msg, dont remove it yet from the list. Only mark it as being sent, so that we don't send it again.
|
||||
//
|
||||
if(!(mit->second->msgFlags & RS_MSG_FLAGS_DISTANT))
|
||||
{
|
||||
(mit->second)->msgFlags &= ~RS_MSG_FLAGS_PENDING;
|
||||
toErase.push_back(mit->first);
|
||||
changed = true ;
|
||||
}
|
||||
else
|
||||
{
|
||||
/* When the message is a distant msg, dont remove it yet from
|
||||
* the list. Only mark it as being sent, so that we don't send
|
||||
* it again. */
|
||||
if(!(mit->second->msgFlags & RS_MSG_FLAGS_DISTANT))
|
||||
{
|
||||
(mit->second)->msgFlags &= ~RS_MSG_FLAGS_PENDING;
|
||||
toErase.push_back(mit->first);
|
||||
changed = true;
|
||||
}
|
||||
else
|
||||
{
|
||||
#ifdef DEBUG_DISTANT_MSG
|
||||
std::cerr << "Message id " << mit->first << " is distant: kept in outgoing, and marked as ROUTED" << std::endl;
|
||||
std::cerr << "Message id " << mit->first << " is distant: "
|
||||
<< "kept in outgoing, and marked as ROUTED"
|
||||
<< std::endl;
|
||||
#endif
|
||||
mit->second->msgFlags |= RS_MSG_FLAGS_ROUTED ;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
pqioutput(PQL_DEBUG_BASIC, msgservicezone,
|
||||
"p3MsgService::checkOutGoingMessages() Delaying until available...");
|
||||
}
|
||||
}
|
||||
mit->second->msgFlags |= RS_MSG_FLAGS_ROUTED;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
pqioutput( PQL_DEBUG_BASIC, msgservicezone,
|
||||
"p3MsgService::checkOutGoingMessages() Delaying until available...");
|
||||
}
|
||||
}
|
||||
|
||||
/* clean up */
|
||||
for(it = toErase.begin(); it != toErase.end(); ++it)
|
||||
{
|
||||
mit = msgOutgoing.find(*it);
|
||||
if (mit != msgOutgoing.end())
|
||||
{
|
||||
msgOutgoing.erase(mit);
|
||||
}
|
||||
/* clean up */
|
||||
for(it = toErase.begin(); it != toErase.end(); ++it)
|
||||
{
|
||||
mit = msgOutgoing.find(*it);
|
||||
if ( mit != msgOutgoing.end() ) msgOutgoing.erase(mit);
|
||||
|
||||
std::map<uint32_t, RsMsgSrcId*>::iterator srcIt = mSrcIds.find(*it);
|
||||
if (srcIt != mSrcIds.end()) {
|
||||
delete (srcIt->second);
|
||||
mSrcIds.erase(srcIt);
|
||||
}
|
||||
}
|
||||
std::map<uint32_t, RsMsgSrcId*>::iterator srcIt = mSrcIds.find(*it);
|
||||
if (srcIt != mSrcIds.end())
|
||||
{
|
||||
delete (srcIt->second);
|
||||
mSrcIds.erase(srcIt);
|
||||
}
|
||||
}
|
||||
|
||||
if (toErase.size() > 0)
|
||||
{
|
||||
IndicateConfigChanged(); /**** INDICATE MSG CONFIG CHANGED! *****/
|
||||
}
|
||||
}
|
||||
if (toErase.size() > 0) IndicateConfigChanged();
|
||||
}
|
||||
|
||||
for(std::list<RsMsgItem*>::const_iterator it(output_queue.begin());it!=output_queue.end();++it)
|
||||
if((*it)->msgFlags & RS_MSG_FLAGS_DISTANT) // don't split distant messages. The global router takes care of it.
|
||||
sendDistantMsgItem(*it) ;
|
||||
else
|
||||
checkSizeAndSendMessage(*it) ;
|
||||
for( std::list<RsMsgItem*>::const_iterator it(output_queue.begin());
|
||||
it != output_queue.end(); ++it )
|
||||
if( (*it)->msgFlags & RS_MSG_FLAGS_DISTANT ) // don't split distant messages. The global router takes care of it.
|
||||
sendDistantMsgItem(*it);
|
||||
else
|
||||
checkSizeAndSendMessage(*it);
|
||||
|
||||
if(changed)
|
||||
RsServer::notify()->notifyListChange(NOTIFY_LIST_MESSAGELIST,NOTIFY_TYPE_MOD);
|
||||
if(changed)
|
||||
RsServer::notify()->notifyListChange(NOTIFY_LIST_MESSAGELIST,NOTIFY_TYPE_MOD);
|
||||
|
||||
return 0;
|
||||
return 0;
|
||||
}
|
||||
|
||||
bool p3MsgService::saveList(bool& cleanup, std::list<RsItem*>& itemList)
|
||||
bool p3MsgService::saveList(bool& cleanup, std::list<RsItem*>& itemList)
|
||||
{
|
||||
RsMsgGRouterMap* gxsmailmap = new RsMsgGRouterMap;
|
||||
{
|
||||
RS_STACK_MUTEX(gxsOngoingMutex);
|
||||
gxsmailmap->ongoing_msgs = gxsOngoingMessages;
|
||||
}
|
||||
itemList.push_front(gxsmailmap);
|
||||
|
||||
std::map<uint32_t, RsMsgItem *>::iterator mit;
|
||||
std::map<uint32_t, RsMsgTagType* >::iterator mit2;
|
||||
|
@ -461,9 +470,7 @@ bool p3MsgService::saveList(bool& cleanup, std::list<RsItem*>& itemList)
|
|||
std::map<uint32_t, RsMsgSrcId* >::iterator lit;
|
||||
std::map<uint32_t, RsMsgParentId* >::iterator mit4;
|
||||
|
||||
MsgTagType stdTags;
|
||||
|
||||
cleanup = true;
|
||||
cleanup = true;
|
||||
|
||||
mMsgMtx.lock();
|
||||
|
||||
|
@ -490,10 +497,13 @@ bool p3MsgService::saveList(bool& cleanup, std::list<RsItem*>& itemList)
|
|||
grmap->ongoing_msgs = _ongoing_messages ;
|
||||
|
||||
itemList.push_back(grmap) ;
|
||||
|
||||
RsMsgDistantMessagesHashMap *ghm = new RsMsgDistantMessagesHashMap ;
|
||||
ghm->hash_map = mRecentlyReceivedDistantMessageHashes ;
|
||||
itemList.push_back(ghm) ;
|
||||
|
||||
RsMsgDistantMessagesHashMap *ghm = new RsMsgDistantMessagesHashMap;
|
||||
{
|
||||
RS_STACK_MUTEX(recentlyReceivedMutex);
|
||||
ghm->hash_map = mRecentlyReceivedMessageHashes;
|
||||
}
|
||||
itemList.push_back(ghm);
|
||||
|
||||
RsConfigKeyValueSet *vitem = new RsConfigKeyValueSet ;
|
||||
RsTlvKeyValue kv;
|
||||
|
@ -505,7 +515,7 @@ bool p3MsgService::saveList(bool& cleanup, std::list<RsItem*>& itemList)
|
|||
kv.value = RsUtil::NumberToString(mDistantMessagePermissions) ;
|
||||
vitem->tlvkvs.pairs.push_back(kv) ;
|
||||
|
||||
itemList.push_back(vitem) ;
|
||||
itemList.push_back(vitem);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
@ -567,8 +577,19 @@ void p3MsgService::initStandardTagTypes()
|
|||
}
|
||||
}
|
||||
|
||||
bool p3MsgService::loadList(std::list<RsItem*>& load)
|
||||
bool p3MsgService::loadList(std::list<RsItem*>& load)
|
||||
{
|
||||
auto gxsmIt = load.begin();
|
||||
RsMsgGRouterMap* gxsmailmap = dynamic_cast<RsMsgGRouterMap*>(*gxsmIt);
|
||||
if(gxsmailmap)
|
||||
{
|
||||
{
|
||||
RS_STACK_MUTEX(gxsOngoingMutex);
|
||||
gxsOngoingMessages = gxsmailmap->ongoing_msgs;
|
||||
}
|
||||
delete *gxsmIt; load.erase(gxsmIt);
|
||||
}
|
||||
|
||||
RsMsgItem *mitem;
|
||||
RsMsgTagType* mtt;
|
||||
RsMsgTags* mti;
|
||||
|
@ -578,7 +599,7 @@ bool p3MsgService::loadList(std::list<RsItem*>& load)
|
|||
RsMsgDistantMessagesHashMap *ghm;
|
||||
|
||||
std::list<RsMsgItem*> items;
|
||||
std::list<RsItem*>::iterator it;
|
||||
std::list<RsItem*>::iterator it;
|
||||
std::map<uint32_t, RsMsgTagType*>::iterator tagIt;
|
||||
std::map<uint32_t, RsPeerId> srcIdMsgMap;
|
||||
std::map<uint32_t, RsPeerId>::iterator srcIt;
|
||||
|
@ -586,9 +607,9 @@ bool p3MsgService::loadList(std::list<RsItem*>& load)
|
|||
uint32_t max_msg_id = 0 ;
|
||||
|
||||
// load items and calculate next unique msgId
|
||||
for(it = load.begin(); it != load.end(); ++it)
|
||||
for(it = load.begin(); it != load.end(); ++it)
|
||||
{
|
||||
if (NULL != (mitem = dynamic_cast<RsMsgItem *>(*it)))
|
||||
if (NULL != (mitem = dynamic_cast<RsMsgItem *>(*it)))
|
||||
{
|
||||
/* STORE MsgID */
|
||||
if (mitem->msgId > max_msg_id)
|
||||
|
@ -596,18 +617,21 @@ bool p3MsgService::loadList(std::list<RsItem*>& load)
|
|||
|
||||
items.push_back(mitem);
|
||||
}
|
||||
else if (NULL != (grm = dynamic_cast<RsMsgGRouterMap *>(*it)))
|
||||
else if (NULL != (grm = dynamic_cast<RsMsgGRouterMap *>(*it)))
|
||||
{
|
||||
// merge.
|
||||
for(std::map<GRouterMsgPropagationId,uint32_t>::const_iterator bit(grm->ongoing_msgs.begin());bit!=grm->ongoing_msgs.end();++bit)
|
||||
_ongoing_messages.insert(*bit) ;
|
||||
|
||||
delete *it ;
|
||||
continue ;
|
||||
}
|
||||
else if(NULL != (ghm = dynamic_cast<RsMsgDistantMessagesHashMap*>(*it)))
|
||||
{
|
||||
mRecentlyReceivedDistantMessageHashes = ghm->hash_map ;
|
||||
typedef std::map<GRouterMsgPropagationId,uint32_t> tT;
|
||||
for( tT::const_iterator bit = grm->ongoing_msgs.begin();
|
||||
bit != grm->ongoing_msgs.end(); ++bit )
|
||||
_ongoing_messages.insert(*bit);
|
||||
delete *it;
|
||||
continue;
|
||||
}
|
||||
else if(NULL != (ghm = dynamic_cast<RsMsgDistantMessagesHashMap*>(*it)))
|
||||
{
|
||||
{
|
||||
RS_STACK_MUTEX(recentlyReceivedMutex);
|
||||
mRecentlyReceivedMessageHashes = ghm->hash_map;
|
||||
}
|
||||
#ifdef DEBUG_DISTANT_MSG
|
||||
std::cerr << " loaded recently received message map: " << std::endl;
|
||||
|
||||
|
@ -632,23 +656,23 @@ bool p3MsgService::loadList(std::list<RsItem*>& load)
|
|||
}
|
||||
|
||||
}
|
||||
else if(NULL != (mti = dynamic_cast<RsMsgTags *>(*it)))
|
||||
else if(NULL != (mti = dynamic_cast<RsMsgTags *>(*it)))
|
||||
{
|
||||
mMsgTags.insert(std::pair<uint32_t, RsMsgTags* >(mti->msgId, mti));
|
||||
}
|
||||
else if(NULL != (msi = dynamic_cast<RsMsgSrcId *>(*it)))
|
||||
else if(NULL != (msi = dynamic_cast<RsMsgSrcId *>(*it)))
|
||||
{
|
||||
srcIdMsgMap.insert(std::pair<uint32_t, RsPeerId>(msi->msgId, msi->srcId));
|
||||
mSrcIds.insert(std::pair<uint32_t, RsMsgSrcId*>(msi->msgId, msi)); // does not need to be kept
|
||||
}
|
||||
else if(NULL != (msp = dynamic_cast<RsMsgParentId *>(*it)))
|
||||
else if(NULL != (msp = dynamic_cast<RsMsgParentId *>(*it)))
|
||||
{
|
||||
mParentId.insert(std::pair<uint32_t, RsMsgParentId*>(msp->msgId, msp));
|
||||
}
|
||||
|
||||
RsConfigKeyValueSet *vitem = NULL ;
|
||||
|
||||
if(NULL != (vitem = dynamic_cast<RsConfigKeyValueSet*>(*it)))
|
||||
if(NULL != (vitem = dynamic_cast<RsConfigKeyValueSet*>(*it)))
|
||||
{
|
||||
for(std::list<RsTlvKeyValue>::const_iterator kit = vitem->tlvkvs.pairs.begin(); kit != vitem->tlvkvs.pairs.end(); ++kit)
|
||||
{
|
||||
|
@ -679,7 +703,7 @@ bool p3MsgService::loadList(std::list<RsItem*>& load)
|
|||
}
|
||||
}
|
||||
|
||||
delete *it ;
|
||||
delete *it ;
|
||||
continue ;
|
||||
}
|
||||
}
|
||||
|
@ -697,7 +721,7 @@ bool p3MsgService::loadList(std::list<RsItem*>& load)
|
|||
mitem->msgId = getNewUniqueMsgId();
|
||||
}
|
||||
|
||||
RsStackMutex stack(mMsgMtx); /********** STACK LOCKED MTX ******/
|
||||
RS_STACK_MUTEX(mMsgMtx);
|
||||
|
||||
srcIt = srcIdMsgMap.find(mitem->msgId);
|
||||
if(srcIt != srcIdMsgMap.end()) {
|
||||
|
@ -1111,16 +1135,17 @@ uint32_t p3MsgService::sendMessage(RsMsgItem *item) // no from field because
|
|||
|
||||
return item->msgId;
|
||||
}
|
||||
uint32_t p3MsgService::sendDistantMessage(RsMsgItem *item,const RsGxsId& from)
|
||||
{
|
||||
if(!item)
|
||||
return 0 ;
|
||||
|
||||
item->msgId = getNewUniqueMsgId(); /* grabs Mtx as well */
|
||||
item->msgFlags |= (RS_MSG_FLAGS_DISTANT | RS_MSG_FLAGS_OUTGOING | RS_MSG_FLAGS_PENDING); /* add pending flag */
|
||||
uint32_t p3MsgService::sendDistantMessage(RsMsgItem *item, const RsGxsId& from)
|
||||
{
|
||||
if(!item) return 0;
|
||||
|
||||
item->msgId = getNewUniqueMsgId(); /* grabs Mtx as well */
|
||||
item->msgFlags |= ( RS_MSG_FLAGS_DISTANT | RS_MSG_FLAGS_OUTGOING |
|
||||
RS_MSG_FLAGS_PENDING ); /* add pending flag */
|
||||
|
||||
{
|
||||
RS_STACK_MUTEX(mMsgMtx) ;
|
||||
RS_STACK_MUTEX(mMsgMtx);
|
||||
|
||||
/* STORE MsgID */
|
||||
msgOutgoing[item->msgId] = item;
|
||||
|
@ -1132,17 +1157,16 @@ uint32_t p3MsgService::sendDistantMessage(RsMsgItem *item,const RsGxsId& fro
|
|||
|
||||
RsMsgSrcId* msi = new RsMsgSrcId();
|
||||
msi->msgId = item->msgId;
|
||||
msi->srcId = RsPeerId(from) ;
|
||||
msi->srcId = RsPeerId(from);
|
||||
mSrcIds.insert(std::pair<uint32_t, RsMsgSrcId*>(msi->msgId, msi));
|
||||
}
|
||||
}
|
||||
|
||||
IndicateConfigChanged(); /**** INDICATE MSG CONFIG CHANGED! *****/
|
||||
|
||||
RsServer::notify()->notifyListChange(NOTIFY_LIST_MESSAGELIST, NOTIFY_TYPE_ADD);
|
||||
|
||||
RsServer::notify()->notifyListChange( NOTIFY_LIST_MESSAGELIST,
|
||||
NOTIFY_TYPE_ADD );
|
||||
return item->msgId;
|
||||
|
||||
}
|
||||
|
||||
bool p3MsgService::MessageSend(MessageInfo &info)
|
||||
|
@ -1857,73 +1881,91 @@ void p3MsgService::manageDistantPeers()
|
|||
}
|
||||
}
|
||||
|
||||
void p3MsgService::notifyDataStatus(const GRouterMsgPropagationId& id, const RsGxsId &signer_id, uint32_t data_status)
|
||||
void p3MsgService::notifyDataStatus( const GRouterMsgPropagationId& id,
|
||||
const RsGxsId &signer_id,
|
||||
uint32_t data_status )
|
||||
{
|
||||
if(data_status == GROUTER_CLIENT_SERVICE_DATA_STATUS_FAILED)
|
||||
{
|
||||
RS_STACK_MUTEX(mMsgMtx); /********** STACK LOCKED MTX ******/
|
||||
if(data_status == GROUTER_CLIENT_SERVICE_DATA_STATUS_FAILED)
|
||||
{
|
||||
RS_STACK_MUTEX(mMsgMtx);
|
||||
|
||||
std::cerr << "(WW) p3MsgService::notifyDataStatus: Global router tells us that item ID " << id << " could not be delivered on time." ;
|
||||
std::map<GRouterMsgPropagationId,uint32_t>::iterator it = _ongoing_messages.find(id) ;
|
||||
|
||||
if(it == _ongoing_messages.end())
|
||||
{
|
||||
std::cerr << " (EE) cannot find pending message to acknowledge. Weird. grouter id = " << id << std::endl;
|
||||
return ;
|
||||
}
|
||||
uint32_t msg_id = it->second ;
|
||||
std::cerr << " message id = " << msg_id << std::endl;
|
||||
mDistantOutgoingMsgSigners[msg_id] = signer_id ; // this is needed because it's not saved in config, but we should probably include it in _ongoing_messages
|
||||
std::cerr << "(WW) p3MsgService::notifyDataStatus: Global router tells "
|
||||
<< "us that item ID " << id
|
||||
<< " could not be delivered on time.";
|
||||
|
||||
std::map<uint32_t,RsMsgItem*>::iterator mit = msgOutgoing.find(msg_id) ;
|
||||
auto it = _ongoing_messages.find(id);
|
||||
if(it == _ongoing_messages.end())
|
||||
{
|
||||
std::cerr << " (EE) cannot find pending message to acknowledge. "
|
||||
<< "Weird. grouter id = " << id << std::endl;
|
||||
return;
|
||||
}
|
||||
|
||||
if(mit == msgOutgoing.end())
|
||||
{
|
||||
std::cerr << " (EE) message has been notified as not delivered, but it not on outgoing list. Something's wrong!!" << std::endl;
|
||||
return ;
|
||||
}
|
||||
std::cerr << " reseting the ROUTED flag so that the message is requested again" << std::endl;
|
||||
uint32_t msg_id = it->second;
|
||||
std::cerr << " message id = " << msg_id << std::endl;
|
||||
|
||||
mit->second->msgFlags &= ~RS_MSG_FLAGS_ROUTED ; // clear the routed flag so that the message is requested again
|
||||
return ;
|
||||
}
|
||||
|
||||
if(data_status == GROUTER_CLIENT_SERVICE_DATA_STATUS_RECEIVED)
|
||||
{
|
||||
RS_STACK_MUTEX(mMsgMtx); /********** STACK LOCKED MTX ******/
|
||||
/* this is needed because it's not saved in config, but we should
|
||||
* probably include it in _ongoing_messages */
|
||||
mDistantOutgoingMsgSigners[msg_id] = signer_id;
|
||||
|
||||
std::map<uint32_t,RsMsgItem*>::iterator mit = msgOutgoing.find(msg_id);
|
||||
if(mit == msgOutgoing.end())
|
||||
{
|
||||
std::cerr << " (II) message has been notified as not delivered, "
|
||||
<< "but it's not in outgoing list. Probably it has been "
|
||||
<< "delivered successfully by other means."
|
||||
<< std::endl;
|
||||
}
|
||||
else
|
||||
{
|
||||
std::cerr << " reseting the ROUTED flag so that the message is "
|
||||
<< "requested again" << std::endl;
|
||||
|
||||
// clear the routed flag so that the message is requested again
|
||||
mit->second->msgFlags &= ~RS_MSG_FLAGS_ROUTED;
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
if(data_status == GROUTER_CLIENT_SERVICE_DATA_STATUS_RECEIVED)
|
||||
{
|
||||
RS_STACK_MUTEX(mMsgMtx);
|
||||
#ifdef DEBUG_DISTANT_MSG
|
||||
std::cerr << "p3MsgService::acknowledgeDataReceived(): acknowledging data received for msg propagation id " << id << std::endl;
|
||||
std::cerr << "p3MsgService::acknowledgeDataReceived(): acknowledging data received for msg propagation id " << id << std::endl;
|
||||
#endif
|
||||
std::map<GRouterMsgPropagationId,uint32_t>::iterator it = _ongoing_messages.find(id) ;
|
||||
auto it = _ongoing_messages.find(id);
|
||||
if(it == _ongoing_messages.end())
|
||||
{
|
||||
std::cerr << " (EE) cannot find pending message to acknowledge. "
|
||||
<< "Weird. grouter id = " << id << std::endl;
|
||||
return;
|
||||
}
|
||||
|
||||
if(it == _ongoing_messages.end())
|
||||
{
|
||||
std::cerr << " (EE) cannot find pending message to acknowledge. Weird. grouter id = " << id << std::endl;
|
||||
return ;
|
||||
}
|
||||
uint32_t msg_id = it->second;
|
||||
|
||||
uint32_t msg_id = it->second ;
|
||||
// we should now remove the item from the msgOutgoing list.
|
||||
std::map<uint32_t,RsMsgItem*>::iterator it2 = msgOutgoing.find(msg_id);
|
||||
if(it2 == msgOutgoing.end())
|
||||
{
|
||||
std::cerr << "(II) message has been notified as delivered, but it's"
|
||||
<< " not in outgoing list. Probably it has been delivered"
|
||||
<< " successfully by other means." << std::endl;
|
||||
return;
|
||||
}
|
||||
|
||||
// we should now remove the item from the msgOutgoing list.
|
||||
delete it2->second;
|
||||
msgOutgoing.erase(it2);
|
||||
|
||||
std::map<uint32_t,RsMsgItem*>::iterator it2 = msgOutgoing.find(msg_id) ;
|
||||
RsServer::notify()->notifyListChange( NOTIFY_LIST_MESSAGELIST,
|
||||
NOTIFY_TYPE_ADD );
|
||||
IndicateConfigChanged();
|
||||
|
||||
if(it2 == msgOutgoing.end())
|
||||
{
|
||||
std::cerr << "(EE) message has been ACKed, but is not in outgoing list. Something's wrong!!" << std::endl;
|
||||
return ;
|
||||
}
|
||||
|
||||
delete it2->second ;
|
||||
msgOutgoing.erase(it2) ;
|
||||
|
||||
RsServer::notify()->notifyListChange(NOTIFY_LIST_MESSAGELIST,NOTIFY_TYPE_ADD);
|
||||
IndicateConfigChanged() ;
|
||||
|
||||
return ;
|
||||
}
|
||||
std::cerr << "p3MsgService: unhandled data status info from global router for msg ID " << id << ": this is a bug." << std::endl;
|
||||
return;
|
||||
}
|
||||
std::cerr << "p3MsgService: unhandled data status info from global router"
|
||||
<< " for msg ID " << id << ": this is a bug." << std::endl;
|
||||
}
|
||||
|
||||
bool p3MsgService::acceptDataFromPeer(const RsGxsId& to_gxs_id)
|
||||
{
|
||||
if(mDistantMessagePermissions & RS_DISTANT_MESSAGING_CONTACT_PERMISSION_FLAG_FILTER_NON_CONTACTS)
|
||||
|
@ -1949,81 +1991,256 @@ uint32_t p3MsgService::getDistantMessagingPermissionFlags()
|
|||
{
|
||||
return mDistantMessagePermissions ;
|
||||
}
|
||||
|
||||
void p3MsgService::receiveGRouterData(const RsGxsId &destination_key, const RsGxsId &signing_key, GRouterServiceId &/*client_id*/, uint8_t *data, uint32_t data_size)
|
||||
|
||||
bool p3MsgService::receiveGxsTransMail( const RsGxsId& authorId,
|
||||
const RsGxsId& recipientId,
|
||||
const uint8_t* data, uint32_t dataSize )
|
||||
{
|
||||
std::cerr << "p3MsgService::receiveGRouterData(): received message item of size " << data_size << ", for key " << destination_key << std::endl;
|
||||
std::cout << __PRETTY_FUNCTION__ << " " << authorId << ", " << recipientId
|
||||
<< ",, " << dataSize << std::endl;
|
||||
|
||||
// first make sure that we havn't already received the data. Since we allow to re-send messages, it's necessary to check.
|
||||
|
||||
Sha1CheckSum hash = RsDirUtil::sha1sum(data,data_size) ;
|
||||
|
||||
if(mRecentlyReceivedDistantMessageHashes.find(hash) != mRecentlyReceivedDistantMessageHashes.end())
|
||||
{
|
||||
std::cerr << "(WW) receiving distant message of hash " << hash << " more than once. This is not a bug, unless it happens very often." << std::endl;
|
||||
Sha1CheckSum hash = RsDirUtil::sha1sum(data, dataSize);
|
||||
|
||||
{
|
||||
RS_STACK_MUTEX(recentlyReceivedMutex);
|
||||
if( mRecentlyReceivedMessageHashes.find(hash) !=
|
||||
mRecentlyReceivedMessageHashes.end() )
|
||||
{
|
||||
std::cerr << __PRETTY_FUNCTION__ << " (II) receiving "
|
||||
<< "message of hash " << hash << " more than once. "
|
||||
<< "Probably it has arrived before by other means."
|
||||
<< std::endl;
|
||||
return true;
|
||||
}
|
||||
mRecentlyReceivedMessageHashes[hash] = time(NULL);
|
||||
}
|
||||
|
||||
IndicateConfigChanged();
|
||||
|
||||
RsItem *item = _serialiser->deserialise(const_cast<uint8_t*>(data), &dataSize);
|
||||
RsMsgItem *msg_item = dynamic_cast<RsMsgItem*>(item);
|
||||
|
||||
if(msg_item)
|
||||
{
|
||||
std::cerr << __PRETTY_FUNCTION__ << " Encrypted item correctly "
|
||||
<< "deserialised. Passing on to incoming list."
|
||||
<< std::endl;
|
||||
|
||||
msg_item->msgFlags |= RS_MSG_FLAGS_DISTANT;
|
||||
/* we expect complete msgs - remove partial flag just in case
|
||||
* someone has funny ideas */
|
||||
msg_item->msgFlags &= ~RS_MSG_FLAGS_PARTIAL;
|
||||
|
||||
// hack to pass on GXS id.
|
||||
msg_item->PeerId(RsPeerId(authorId));
|
||||
handleIncomingItem(msg_item);
|
||||
}
|
||||
else
|
||||
{
|
||||
std::cerr << __PRETTY_FUNCTION__ << " Item could not be "
|
||||
<< "deserialised. Format error??" << std::endl;
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool p3MsgService::notifyGxsTransSendStatus( RsGxsTransId mailId,
|
||||
GxsTransSendStatus status )
|
||||
{
|
||||
std::cout << __PRETTY_FUNCTION__ << " " << mailId << ", "
|
||||
<< static_cast<uint>(status) << std::endl;
|
||||
|
||||
if( status == GxsTransSendStatus::RECEIPT_RECEIVED )
|
||||
{
|
||||
uint32_t msg_id;
|
||||
|
||||
{
|
||||
RS_STACK_MUTEX(gxsOngoingMutex);
|
||||
|
||||
auto it = gxsOngoingMessages.find(mailId);
|
||||
if(it == gxsOngoingMessages.end())
|
||||
{
|
||||
std::cerr << __PRETTY_FUNCTION__<< " "
|
||||
<< mailId
|
||||
<< ", " << static_cast<uint>(status)
|
||||
<< " (EE) cannot find pending message to acknowledge!"
|
||||
<< std::endl;
|
||||
return false;
|
||||
}
|
||||
|
||||
msg_id = it->second;
|
||||
}
|
||||
|
||||
// we should now remove the item from the msgOutgoing list.
|
||||
|
||||
{
|
||||
RS_STACK_MUTEX(mMsgMtx);
|
||||
|
||||
auto it2 = msgOutgoing.find(msg_id);
|
||||
if(it2 == msgOutgoing.end())
|
||||
{
|
||||
std::cerr << __PRETTY_FUNCTION__ << " " << mailId
|
||||
<< ", " << static_cast<uint>(status) << " (II) "
|
||||
<< "received receipt for message that is not in "
|
||||
<< "outgoing list, probably it has been acknoweldged "
|
||||
<< "before by other means." << std::endl;
|
||||
return true;
|
||||
}
|
||||
|
||||
delete it2->second;
|
||||
msgOutgoing.erase(it2);
|
||||
}
|
||||
|
||||
RsServer::notify()->notifyListChange( NOTIFY_LIST_MESSAGELIST,
|
||||
NOTIFY_TYPE_ADD );
|
||||
IndicateConfigChanged();
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
if( status >= GxsTransSendStatus::FAILED_RECEIPT_SIGNATURE )
|
||||
{
|
||||
uint32_t msg_id;
|
||||
|
||||
{
|
||||
RS_STACK_MUTEX(gxsOngoingMutex);
|
||||
|
||||
std::cerr << __PRETTY_FUNCTION__ << " mail delivery "
|
||||
<< "mailId: " << mailId
|
||||
<< " failed with " << static_cast<uint32_t>(status);
|
||||
|
||||
auto it = gxsOngoingMessages.find(mailId);
|
||||
if(it == gxsOngoingMessages.end())
|
||||
{
|
||||
std::cerr << " cannot find pending message to notify"
|
||||
<< std::endl;
|
||||
return false;
|
||||
}
|
||||
|
||||
msg_id = it->second;
|
||||
}
|
||||
|
||||
std::cerr << " message id = " << msg_id << std::endl;
|
||||
|
||||
{
|
||||
RS_STACK_MUTEX(mMsgMtx);
|
||||
auto mit = msgOutgoing.find(msg_id);
|
||||
if( mit == msgOutgoing.end() )
|
||||
{
|
||||
std::cerr << " message has been notified as not delivered, "
|
||||
<< "but it not on outgoing list."
|
||||
<< std::endl;
|
||||
return true;
|
||||
}
|
||||
std::cerr << " reseting the ROUTED flag so that the message is "
|
||||
<< "requested again" << std::endl;
|
||||
|
||||
// clear the routed flag so that the message is requested again
|
||||
mit->second->msgFlags &= ~RS_MSG_FLAGS_ROUTED;
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
void p3MsgService::receiveGRouterData( const RsGxsId &destination_key,
|
||||
const RsGxsId &signing_key,
|
||||
GRouterServiceId &/*client_id*/,
|
||||
uint8_t *data, uint32_t data_size )
|
||||
{
|
||||
std::cerr << "p3MsgService::receiveGRouterData(): received message item of"
|
||||
<< " size " << data_size << ", for key " << destination_key
|
||||
<< std::endl;
|
||||
|
||||
/* first make sure that we havn't already received the data. Since we allow
|
||||
* to re-send messages, it's necessary to check. */
|
||||
|
||||
Sha1CheckSum hash = RsDirUtil::sha1sum(data, data_size);
|
||||
|
||||
{
|
||||
RS_STACK_MUTEX(recentlyReceivedMutex);
|
||||
if( mRecentlyReceivedMessageHashes.find(hash) !=
|
||||
mRecentlyReceivedMessageHashes.end() )
|
||||
{
|
||||
std::cerr << "p3MsgService::receiveGRouterData(...) (II) receiving"
|
||||
<< "distant message of hash " << hash << " more than once"
|
||||
<< ". Probably it has arrived before by other means."
|
||||
<< std::endl;
|
||||
free(data);
|
||||
return;
|
||||
}
|
||||
mRecentlyReceivedMessageHashes[hash] = time(NULL);
|
||||
}
|
||||
|
||||
IndicateConfigChanged() ;
|
||||
|
||||
RsItem *item = _serialiser->deserialise(data,&data_size) ;
|
||||
free(data) ;
|
||||
return ;
|
||||
}
|
||||
mRecentlyReceivedDistantMessageHashes[hash] = time(NULL) ;
|
||||
IndicateConfigChanged() ;
|
||||
|
||||
RsItem *item = _serialiser->deserialise(data,&data_size) ;
|
||||
free(data) ;
|
||||
|
||||
RsMsgItem *msg_item = dynamic_cast<RsMsgItem*>(item) ;
|
||||
RsMsgItem *msg_item = dynamic_cast<RsMsgItem*>(item) ;
|
||||
|
||||
if(msg_item != NULL)
|
||||
{
|
||||
std::cerr << " Encrypted item correctly deserialised. Passing on to incoming list." << std::endl;
|
||||
if(msg_item != NULL)
|
||||
{
|
||||
std::cerr << " Encrypted item correctly deserialised. Passing on to incoming list." << std::endl;
|
||||
|
||||
msg_item->msgFlags |= RS_MSG_FLAGS_DISTANT ;
|
||||
/* we expect complete msgs - remove partial flag just in case someone has funny ideas */
|
||||
msg_item->msgFlags &= ~RS_MSG_FLAGS_PARTIAL;
|
||||
msg_item->msgFlags |= RS_MSG_FLAGS_DISTANT ;
|
||||
/* we expect complete msgs - remove partial flag just in case someone has funny ideas */
|
||||
msg_item->msgFlags &= ~RS_MSG_FLAGS_PARTIAL;
|
||||
|
||||
msg_item->PeerId(RsPeerId(signing_key)) ; // hack to pass on GXS id.
|
||||
handleIncomingItem(msg_item) ;
|
||||
}
|
||||
else
|
||||
std::cerr << " Item could not be deserialised. Format error??" << std::endl;
|
||||
msg_item->PeerId(RsPeerId(signing_key)) ; // hack to pass on GXS id.
|
||||
handleIncomingItem(msg_item) ;
|
||||
}
|
||||
else
|
||||
std::cerr << " Item could not be deserialised. Format error??" << std::endl;
|
||||
}
|
||||
|
||||
void p3MsgService::sendDistantMsgItem(RsMsgItem *msgitem)
|
||||
{
|
||||
RsGxsId destination_key_id(msgitem->PeerId()) ;
|
||||
RsGxsId signing_key_id ;
|
||||
RsGxsId destination_key_id(msgitem->PeerId());
|
||||
RsGxsId signing_key_id;
|
||||
|
||||
msgitem->msgFlags |= RS_MSG_FLAGS_DISTANT ;// just in case, but normally we should always have this flag set, when ending up here.
|
||||
|
||||
{
|
||||
RS_STACK_MUTEX(mMsgMtx) ;
|
||||
/* just in case, but normally we should always have this flag set, when
|
||||
* ending up here. */
|
||||
msgitem->msgFlags |= RS_MSG_FLAGS_DISTANT;
|
||||
|
||||
std::map<uint32_t,RsGxsId>::const_iterator it = mDistantOutgoingMsgSigners.find(msgitem->msgId) ;
|
||||
|
||||
if(it == mDistantOutgoingMsgSigners.end())
|
||||
{
|
||||
std::cerr << "(EE) no signer registered for distant message " << msgitem->msgId << ". Cannot send!" << std::endl;
|
||||
return ;
|
||||
}
|
||||
|
||||
signing_key_id = it->second ;
|
||||
{
|
||||
RS_STACK_MUTEX(mMsgMtx);
|
||||
|
||||
if(signing_key_id.isNull())
|
||||
{
|
||||
std::cerr << "ERROR: cannot find signing key id for msg id " << msgitem->msgId << std::endl;
|
||||
std::cerr << " available keys are:" << std::endl;
|
||||
for(std::map<uint32_t,RsGxsId>::const_iterator it(mDistantOutgoingMsgSigners.begin());it!=mDistantOutgoingMsgSigners.end();++it)
|
||||
std::cerr << " " << it->first << " " << it->second << std::endl;
|
||||
return ;
|
||||
}
|
||||
}
|
||||
std::map<uint32_t,RsGxsId>::const_iterator it =
|
||||
mDistantOutgoingMsgSigners.find(msgitem->msgId);
|
||||
|
||||
if(it == mDistantOutgoingMsgSigners.end())
|
||||
{
|
||||
std::cerr << "(EE) no signer registered for distant message "
|
||||
<< msgitem->msgId << ". Cannot send!" << std::endl;
|
||||
return;
|
||||
}
|
||||
|
||||
signing_key_id = it->second;
|
||||
|
||||
if(signing_key_id.isNull())
|
||||
{
|
||||
std::cerr << "ERROR: cannot find signing key id for msg id "
|
||||
<< msgitem->msgId << " available keys are:" << std::endl;
|
||||
typedef std::map<uint32_t,RsGxsId>::const_iterator itT;
|
||||
for( itT it = mDistantOutgoingMsgSigners.begin();
|
||||
it != mDistantOutgoingMsgSigners.end(); ++it )
|
||||
std::cerr << "\t" << it->first << " " << it->second
|
||||
<< std::endl;
|
||||
return;
|
||||
}
|
||||
}
|
||||
#ifdef DEBUG_DISTANT_MSG
|
||||
std::cerr << "p3MsgService::sendDistanteMsgItem(): sending distant msg item" << std::endl;
|
||||
std::cerr << " msg ID : " << msgitem->msgId << std::endl;
|
||||
std::cerr << " to peer : " << destination_key_id << std::endl;
|
||||
std::cerr << " signing : " << signing_key_id << std::endl;
|
||||
std::cerr << "p3MsgService::sendDistanteMsgItem(): sending distant msg item"
|
||||
<< " msg ID: " << msgitem->msgId << " to peer:"
|
||||
<< destination_key_id << " signing: " << signing_key_id
|
||||
<< std::endl;
|
||||
#endif
|
||||
|
||||
// The item is serialized and turned into a generic turtle item. Use use the explicit serialiser to make sure that the msgId is not included
|
||||
/* The item is serialized and turned into a generic turtle item. Use use the
|
||||
* explicit serialiser to make sure that the msgId is not included */
|
||||
|
||||
uint32_t msg_serialized_rssize = RsMsgSerialiser(RsServiceSerializer::SERIALIZATION_FLAG_NONE).size(msgitem) ;
|
||||
RsTemporaryMemory msg_serialized_data(msg_serialized_rssize) ;
|
||||
|
@ -2034,19 +2251,32 @@ void p3MsgService::sendDistantMsgItem(RsMsgItem *msgitem)
|
|||
return ;
|
||||
}
|
||||
#ifdef DEBUG_DISTANT_MSG
|
||||
std::cerr << " serialised size : " << msg_serialized_rssize << std::endl;
|
||||
std::cerr << " serialised size : " << msg_serialized_rssize << std::endl;
|
||||
#endif
|
||||
|
||||
GRouterMsgPropagationId grouter_message_id ;
|
||||
mGRouter->sendData(destination_key_id,GROUTER_CLIENT_ID_MESSAGES,msg_serialized_data,msg_serialized_rssize,signing_key_id,grouter_message_id) ;
|
||||
GRouterMsgPropagationId grouter_message_id;
|
||||
mGRouter->sendData( destination_key_id, GROUTER_CLIENT_ID_MESSAGES,
|
||||
msg_serialized_data, msg_serialized_rssize,
|
||||
signing_key_id, grouter_message_id );
|
||||
RsGxsTransId gxsMailId;
|
||||
mGxsTransServ.sendData( gxsMailId, GxsTransSubServices::P3_MSG_SERVICE,
|
||||
signing_key_id, destination_key_id,
|
||||
msg_serialized_data, msg_serialized_rssize );
|
||||
|
||||
// now store the grouter id along with the message id, so that we can keep track of received messages
|
||||
/* now store the grouter id along with the message id, so that we can keep
|
||||
* track of received messages */
|
||||
|
||||
{
|
||||
RS_STACK_MUTEX(mMsgMtx) ;
|
||||
_ongoing_messages[grouter_message_id] = msgitem->msgId ;
|
||||
}
|
||||
IndicateConfigChanged(); // save _ongoing_messages
|
||||
{
|
||||
RS_STACK_MUTEX(mMsgMtx);
|
||||
_ongoing_messages[grouter_message_id] = msgitem->msgId;
|
||||
}
|
||||
|
||||
{
|
||||
RS_STACK_MUTEX(gxsOngoingMutex);
|
||||
gxsOngoingMessages[gxsMailId] = msgitem->msgId;
|
||||
}
|
||||
|
||||
IndicateConfigChanged(); // save _ongoing_messages
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -49,16 +49,19 @@
|
|||
#include "grouter/grouterclientservice.h"
|
||||
#include "turtle/p3turtle.h"
|
||||
#include "turtle/turtleclientservice.h"
|
||||
#include "gxstrans/p3gxstrans.h"
|
||||
|
||||
class p3LinkMgr;
|
||||
class p3IdService;
|
||||
|
||||
// Temp tweak to test grouter
|
||||
class p3MsgService: public p3Service, public p3Config, public pqiServiceMonitor, public GRouterClientService
|
||||
struct p3MsgService :
|
||||
p3Service, p3Config, pqiServiceMonitor, GRouterClientService,
|
||||
GxsTransClient
|
||||
{
|
||||
public:
|
||||
p3MsgService(p3ServiceControl *sc, p3IdService *id_service);
|
||||
virtual RsServiceInfo getServiceInfo();
|
||||
p3MsgService(p3ServiceControl *sc, p3IdService *id_service, p3GxsTrans& gxsMS);
|
||||
|
||||
virtual RsServiceInfo getServiceInfo();
|
||||
|
||||
/* External Interface */
|
||||
bool getMessageSummaries(std::list<Rs::Msgs::MsgInfoSummary> &msgList);
|
||||
|
@ -106,7 +109,9 @@ public:
|
|||
|
||||
/*** Overloaded from pqiMonitor ***/
|
||||
virtual void statusChange(const std::list<pqiServicePeer> &plist);
|
||||
int checkOutgoingMessages();
|
||||
|
||||
/// iterate through the outgoing queue if online, send
|
||||
int checkOutgoingMessages();
|
||||
/*** Overloaded from pqiMonitor ***/
|
||||
|
||||
/*** overloaded from p3turtle ***/
|
||||
|
@ -130,16 +135,27 @@ public:
|
|||
void setDistantMessagingPermissionFlags(uint32_t flags) ;
|
||||
uint32_t getDistantMessagingPermissionFlags() ;
|
||||
|
||||
private:
|
||||
void sendDistantMsgItem(RsMsgItem *msgitem) ;
|
||||
/// @see GxsTransClient::receiveGxsTransMail(...)
|
||||
virtual bool receiveGxsTransMail( const RsGxsId& authorId,
|
||||
const RsGxsId& recipientId,
|
||||
const uint8_t* data, uint32_t dataSize );
|
||||
|
||||
// This contains the ongoing tunnel handling contacts.
|
||||
// The map is indexed by the hash
|
||||
//
|
||||
std::map<GRouterMsgPropagationId,uint32_t> _ongoing_messages ;
|
||||
/// @see GxsTransClient::notifyGxsTransSendStatus(...)
|
||||
virtual bool notifyGxsTransSendStatus( RsGxsTransId mailId,
|
||||
GxsTransSendStatus status );
|
||||
|
||||
private:
|
||||
void sendDistantMsgItem(RsMsgItem *msgitem);
|
||||
|
||||
/** This contains the ongoing tunnel handling contacts.
|
||||
* The map is indexed by the hash */
|
||||
std::map<GRouterMsgPropagationId, uint32_t> _ongoing_messages;
|
||||
|
||||
/// Contains ongoing messages handed to gxs mail
|
||||
std::map<RsGxsTransId, uint32_t> gxsOngoingMessages;
|
||||
RsMutex gxsOngoingMutex;
|
||||
|
||||
// Overloaded from GRouterClientService
|
||||
|
||||
virtual bool acceptDataFromPeer(const RsGxsId& gxs_id) ;
|
||||
virtual void receiveGRouterData(const RsGxsId& destination_key,const RsGxsId& signing_key, GRouterServiceId &client_id, uint8_t *data, uint32_t data_size) ;
|
||||
virtual void notifyDataStatus(const GRouterMsgPropagationId& msg_id,const RsGxsId& signer_id,uint32_t data_status) ;
|
||||
|
@ -194,8 +210,9 @@ private:
|
|||
std::map<uint32_t, RsMsgTagType*> mTags;
|
||||
std::map<uint32_t, RsMsgTags*> mMsgTags;
|
||||
|
||||
uint32_t mMsgUniqueId;
|
||||
std::map<Sha1CheckSum,uint32_t> mRecentlyReceivedDistantMessageHashes;
|
||||
uint32_t mMsgUniqueId;
|
||||
std::map<Sha1CheckSum, uint32_t> mRecentlyReceivedMessageHashes;
|
||||
RsMutex recentlyReceivedMutex;
|
||||
|
||||
// used delete msgSrcIds after config save
|
||||
std::map<uint32_t, RsMsgSrcId*> mSrcIds;
|
||||
|
@ -211,6 +228,8 @@ private:
|
|||
bool mDistantMessagingEnabled ;
|
||||
uint32_t mDistantMessagePermissions ;
|
||||
bool mShouldEnableDistantMessaging ;
|
||||
|
||||
p3GxsTrans& mGxsTransServ;
|
||||
};
|
||||
|
||||
#endif // MESSAGE_SERVICE_HEADER
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue