p3MsgService uses p3GxsMails as backend too

This commit is contained in:
Gioacchino Mazzurco 2017-02-21 23:08:02 +01:00
parent 1376b9f031
commit b9091c4ad8
4 changed files with 485 additions and 275 deletions

View File

@ -1513,7 +1513,7 @@ int RsServer::StartupRetroShare()
p3ServiceInfo *serviceInfo = new p3ServiceInfo(serviceCtrl); p3ServiceInfo *serviceInfo = new p3ServiceInfo(serviceCtrl);
mDisc = new p3discovery2(mPeerMgr, mLinkMgr, mNetMgr, serviceCtrl); mDisc = new p3discovery2(mPeerMgr, mLinkMgr, mNetMgr, serviceCtrl);
mHeart = new p3heartbeat(serviceCtrl, pqih); mHeart = new p3heartbeat(serviceCtrl, pqih);
msgSrv = new p3MsgService(serviceCtrl,mGxsIdService); msgSrv = new p3MsgService( serviceCtrl, mGxsIdService, *mGxsMails );
chatSrv = new p3ChatService(serviceCtrl,mGxsIdService, mLinkMgr, mHistoryMgr); chatSrv = new p3ChatService(serviceCtrl,mGxsIdService, mLinkMgr, mHistoryMgr);
mStatusSrv = new p3StatusService(serviceCtrl); mStatusSrv = new p3StatusService(serviceCtrl);

View File

@ -47,7 +47,7 @@ struct p3GxsMails;
struct GxsMailsClient struct GxsMailsClient
{ {
/// Subservices identifiers (like port for TCP) /// Subservices identifiers (like port for TCP)
enum GxsMailSubServices { TEST_SERVICE = 1 }; enum GxsMailSubServices : uint16_t { TEST_SERVICE = 1, P3_MSG_SERVICE = 2 };
/** /**
* This will be called by p3GxsMails to dispatch mails to the subservice * This will be called by p3GxsMails to dispatch mails to the subservice

View File

@ -83,23 +83,29 @@ static const uint32_t RS_MSG_DISTANT_MESSAGE_HASH_KEEP_TIME = 2*30*86400 ; // ke
* (3) from storage... * (3) from storage...
*/ */
p3MsgService::p3MsgService(p3ServiceControl *sc, p3IdService *id_serv) p3MsgService::p3MsgService( p3ServiceControl *sc, p3IdService *id_serv,
:p3Service(), p3Config(), mIdService(id_serv), mServiceCtrl(sc), mMsgMtx("p3MsgService"), mMsgUniqueId(0) p3GxsMails& gxsMS )
: p3Service(), p3Config(), mIdService(id_serv), mServiceCtrl(sc),
mMsgMtx("p3MsgService"), mMsgUniqueId(0), gxsMailService(gxsMS),
gxsOngoingMutex("p3MsgService Gxs Outgoing Mutex"),
recentlyReceivedMutex("p3MsgService recently received hash mutex")
{ {
_serialiser = new RsMsgSerialiser(); // this serialiser is used for services. It's not the same than the one returned by setupSerialiser(). We need both!! /* this serialiser is used for services. It's not the same than the one
* returned by setupSerialiser(). We need both!! */
_serialiser = new RsMsgSerialiser();
addSerialType(_serialiser); addSerialType(_serialiser);
mMsgUniqueId = 1 ; // MsgIds are not transmitted, but only used locally as a storage index. As such, thay do not need to be different /* MsgIds are not transmitted, but only used locally as a storage index.
// at friends nodes. * As such, thay do not need to be different at friends nodes. */
mMsgUniqueId = 1;
mShouldEnableDistantMessaging = true ;
mDistantMessagingEnabled = false ;
mDistantMessagePermissions = RS_DISTANT_MESSAGING_CONTACT_PERMISSION_FLAG_FILTER_NONE ;
/* Initialize standard tag types */ mShouldEnableDistantMessaging = true;
if(sc) mDistantMessagingEnabled = false;
initStandardTagTypes(); mDistantMessagePermissions = RS_DISTANT_MESSAGING_CONTACT_PERMISSION_FLAG_FILTER_NONE;
if(sc) initStandardTagTypes(); // Initialize standard tag types
gxsMailService.registerGxsMailsClient(GxsMailsClient::P3_MSG_SERVICE, this);
} }
const std::string MSG_APP_NAME = "msg"; const std::string MSG_APP_NAME = "msg";
@ -141,11 +147,11 @@ int p3MsgService::tick()
if(now > last_management_time + 5) if(now > last_management_time + 5)
{ {
manageDistantPeers() ; manageDistantPeers();
checkOutgoingMessages(); checkOutgoingMessages();
cleanListOfReceivedMessageHashes(); cleanListOfReceivedMessageHashes();
last_management_time = now ; last_management_time = now;
} }
return 0; return 0;
@ -153,21 +159,21 @@ int p3MsgService::tick()
void p3MsgService::cleanListOfReceivedMessageHashes() void p3MsgService::cleanListOfReceivedMessageHashes()
{ {
RS_STACK_MUTEX(mMsgMtx); /********** STACK LOCKED MTX ******/ RS_STACK_MUTEX(recentlyReceivedMutex);
time_t now = time(NULL) ; time_t now = time(NULL);
for(std::map<Sha1CheckSum,uint32_t>::iterator it(mRecentlyReceivedDistantMessageHashes.begin());it!=mRecentlyReceivedDistantMessageHashes.end();) for( auto it = mRecentlyReceivedMessageHashes.begin();
if(now > RS_MSG_DISTANT_MESSAGE_HASH_KEEP_TIME + it->second) it != mRecentlyReceivedMessageHashes.end(); )
{ if( now > RS_MSG_DISTANT_MESSAGE_HASH_KEEP_TIME + it->second )
std::cerr << "p3MsgService(): cleanListOfReceivedMessageHashes(). Removing old hash " << it->first << ", aged " << now - it->second << " secs ago" << std::endl; {
std::map<Sha1CheckSum,uint32_t>::iterator tmp(it) ; std::cerr << "p3MsgService(): cleanListOfReceivedMessageHashes(). "
++tmp ; << "Removing old hash " << it->first << ", aged "
mRecentlyReceivedDistantMessageHashes.erase(it) ; << now - it->second << " secs ago" << std::endl;
it=tmp ;
} it = mRecentlyReceivedMessageHashes.erase(it);
else }
++it ; else ++it;
} }
int p3MsgService::status() int p3MsgService::status()
@ -348,107 +354,102 @@ void p3MsgService::checkSizeAndSendMessage(RsMsgItem *msg)
sendItem(msg) ; sendItem(msg) ;
} }
int p3MsgService::checkOutgoingMessages() int p3MsgService::checkOutgoingMessages()
{ {
/* iterate through the outgoing queue bool changed = false;
* std::list<RsMsgItem*> output_queue;
* if online, send
*/
bool changed = false ; {
std::list<RsMsgItem*> output_queue ; RS_STACK_MUTEX(mMsgMtx); /********** STACK LOCKED MTX ******/
{ const RsPeerId& ownId = mServiceCtrl->getOwnId();
RS_STACK_MUTEX(mMsgMtx); /********** STACK LOCKED MTX ******/
const RsPeerId& ownId = mServiceCtrl->getOwnId();
std::list<uint32_t>::iterator it; std::list<uint32_t>::iterator it;
std::list<uint32_t> toErase; std::list<uint32_t> toErase;
std::map<uint32_t, RsMsgItem *>::iterator mit; std::map<uint32_t, RsMsgItem *>::iterator mit;
for(mit = msgOutgoing.begin(); mit != msgOutgoing.end(); ++mit) for( mit = msgOutgoing.begin(); mit != msgOutgoing.end(); ++mit )
{ {
if (mit->second->msgFlags & RS_MSG_FLAGS_TRASH) if (mit->second->msgFlags & RS_MSG_FLAGS_TRASH) continue;
continue;
/* find the certificate */ /* find the certificate */
RsPeerId pid = mit->second->PeerId(); RsPeerId pid = mit->second->PeerId();
bool should_send = false ; bool should_send = false;
if( pid == ownId) if( pid == ownId) should_send = true;
should_send = true ;
if( mServiceCtrl->isPeerConnected(getServiceInfo().mServiceType, pid) ) /* FEEDBACK Msg to Ourselves */ // FEEDBACK Msg to Ourselves
should_send = true ; if( mServiceCtrl->isPeerConnected(getServiceInfo().mServiceType,
pid) )
should_send = true;
if((mit->second->msgFlags & RS_MSG_FLAGS_DISTANT) && !(mit->second->msgFlags & RS_MSG_FLAGS_ROUTED)) if( (mit->second->msgFlags & RS_MSG_FLAGS_DISTANT) &&
should_send = true ; !(mit->second->msgFlags & RS_MSG_FLAGS_ROUTED))
should_send = true;
if(should_send) if(should_send)
{ {
/* send msg */ /* send msg */
pqioutput(PQL_DEBUG_BASIC, msgservicezone, pqioutput( PQL_DEBUG_BASIC, msgservicezone,
"p3MsgService::checkOutGoingMessages() Sending out message"); "p3MsgService::checkOutGoingMessages() Sending out message");
/* remove the pending flag */ /* remove the pending flag */
output_queue.push_back(mit->second) ; output_queue.push_back(mit->second) ;
// When the message is a distant msg, dont remove it yet from the list. Only mark it as being sent, so that we don't send it again. /* When the message is a distant msg, dont remove it yet from
// * the list. Only mark it as being sent, so that we don't send
if(!(mit->second->msgFlags & RS_MSG_FLAGS_DISTANT)) * it again. */
{ if(!(mit->second->msgFlags & RS_MSG_FLAGS_DISTANT))
(mit->second)->msgFlags &= ~RS_MSG_FLAGS_PENDING; {
toErase.push_back(mit->first); (mit->second)->msgFlags &= ~RS_MSG_FLAGS_PENDING;
changed = true ; toErase.push_back(mit->first);
} changed = true;
else }
{ else
{
#ifdef DEBUG_DISTANT_MSG #ifdef DEBUG_DISTANT_MSG
std::cerr << "Message id " << mit->first << " is distant: kept in outgoing, and marked as ROUTED" << std::endl; std::cerr << "Message id " << mit->first << " is distant: "
<< "kept in outgoing, and marked as ROUTED"
<< std::endl;
#endif #endif
mit->second->msgFlags |= RS_MSG_FLAGS_ROUTED ; mit->second->msgFlags |= RS_MSG_FLAGS_ROUTED;
} }
} }
else else
{ {
pqioutput(PQL_DEBUG_BASIC, msgservicezone, pqioutput( PQL_DEBUG_BASIC, msgservicezone,
"p3MsgService::checkOutGoingMessages() Delaying until available..."); "p3MsgService::checkOutGoingMessages() Delaying until available...");
} }
} }
/* clean up */ /* clean up */
for(it = toErase.begin(); it != toErase.end(); ++it) for(it = toErase.begin(); it != toErase.end(); ++it)
{ {
mit = msgOutgoing.find(*it); mit = msgOutgoing.find(*it);
if (mit != msgOutgoing.end()) if ( mit != msgOutgoing.end() ) msgOutgoing.erase(mit);
{
msgOutgoing.erase(mit);
}
std::map<uint32_t, RsMsgSrcId*>::iterator srcIt = mSrcIds.find(*it); std::map<uint32_t, RsMsgSrcId*>::iterator srcIt = mSrcIds.find(*it);
if (srcIt != mSrcIds.end()) { if (srcIt != mSrcIds.end())
delete (srcIt->second); {
mSrcIds.erase(srcIt); delete (srcIt->second);
} mSrcIds.erase(srcIt);
} }
}
if (toErase.size() > 0) if (toErase.size() > 0) IndicateConfigChanged();
{ }
IndicateConfigChanged(); /**** INDICATE MSG CONFIG CHANGED! *****/
}
}
for(std::list<RsMsgItem*>::const_iterator it(output_queue.begin());it!=output_queue.end();++it) for( std::list<RsMsgItem*>::const_iterator it(output_queue.begin());
if((*it)->msgFlags & RS_MSG_FLAGS_DISTANT) // don't split distant messages. The global router takes care of it. it != output_queue.end(); ++it )
sendDistantMsgItem(*it) ; if( (*it)->msgFlags & RS_MSG_FLAGS_DISTANT ) // don't split distant messages. The global router takes care of it.
else sendDistantMsgItem(*it);
checkSizeAndSendMessage(*it) ; else
checkSizeAndSendMessage(*it);
if(changed) if(changed)
RsServer::notify()->notifyListChange(NOTIFY_LIST_MESSAGELIST,NOTIFY_TYPE_MOD); RsServer::notify()->notifyListChange(NOTIFY_LIST_MESSAGELIST,NOTIFY_TYPE_MOD);
return 0; return 0;
} }
bool p3MsgService::saveList(bool& cleanup, std::list<RsItem*>& itemList) bool p3MsgService::saveList(bool& cleanup, std::list<RsItem*>& itemList)
@ -489,10 +490,13 @@ bool p3MsgService::saveList(bool& cleanup, std::list<RsItem*>& itemList)
grmap->ongoing_msgs = _ongoing_messages ; grmap->ongoing_msgs = _ongoing_messages ;
itemList.push_back(grmap) ; itemList.push_back(grmap) ;
RsMsgDistantMessagesHashMap *ghm = new RsMsgDistantMessagesHashMap ; RsMsgDistantMessagesHashMap *ghm = new RsMsgDistantMessagesHashMap;
ghm->hash_map = mRecentlyReceivedDistantMessageHashes ; {
itemList.push_back(ghm) ; RS_STACK_MUTEX(recentlyReceivedMutex);
ghm->hash_map = mRecentlyReceivedMessageHashes;
}
itemList.push_back(ghm);
RsConfigKeyValueSet *vitem = new RsConfigKeyValueSet ; RsConfigKeyValueSet *vitem = new RsConfigKeyValueSet ;
RsTlvKeyValue kv; RsTlvKeyValue kv;
@ -601,18 +605,21 @@ bool p3MsgService::loadList(std::list<RsItem*>& load)
// merge. // merge.
for(std::map<GRouterMsgPropagationId,uint32_t>::const_iterator it(grm->ongoing_msgs.begin());it!=grm->ongoing_msgs.end();++it) for(std::map<GRouterMsgPropagationId,uint32_t>::const_iterator it(grm->ongoing_msgs.begin());it!=grm->ongoing_msgs.end();++it)
_ongoing_messages.insert(*it) ; _ongoing_messages.insert(*it) ;
} }
else if(NULL != (ghm = dynamic_cast<RsMsgDistantMessagesHashMap*>(*it))) else if(NULL != (ghm = dynamic_cast<RsMsgDistantMessagesHashMap*>(*it)))
{ {
mRecentlyReceivedDistantMessageHashes = ghm->hash_map ; {
RS_STACK_MUTEX(recentlyReceivedMutex);
mRecentlyReceivedMessageHashes = ghm->hash_map;
}
#ifdef DEBUG_DISTANT_MSG #ifdef DEBUG_DISTANT_MSG
std::cerr << " loaded recently received message map: " << std::endl; std::cerr << " loaded recently received message map: " << std::endl;
for(std::map<Sha1CheckSum,uint32_t>::const_iterator it(mRecentlyReceivedDistantMessageHashes.begin());it!=mRecentlyReceivedDistantMessageHashes.end();++it) for(std::map<Sha1CheckSum,uint32_t>::const_iterator it(mRecentlyReceivedDistantMessageHashes.begin());it!=mRecentlyReceivedDistantMessageHashes.end();++it)
std::cerr << " " << it->first << " received " << time(NULL)-it->second << " secs ago." << std::endl; std::cerr << " " << it->first << " received " << time(NULL)-it->second << " secs ago." << std::endl;
#endif #endif
} }
else if(NULL != (mtt = dynamic_cast<RsMsgTagType *>(*it))) else if(NULL != (mtt = dynamic_cast<RsMsgTagType *>(*it)))
{ {
// delete standard tags as they are now save in config // delete standard tags as they are now save in config
@ -1107,16 +1114,17 @@ uint32_t p3MsgService::sendMessage(RsMsgItem *item) // no from field because
return item->msgId; return item->msgId;
} }
uint32_t p3MsgService::sendDistantMessage(RsMsgItem *item,const RsGxsId& from)
{
if(!item)
return 0 ;
item->msgId = getNewUniqueMsgId(); /* grabs Mtx as well */ uint32_t p3MsgService::sendDistantMessage(RsMsgItem *item, const RsGxsId& from)
item->msgFlags |= (RS_MSG_FLAGS_DISTANT | RS_MSG_FLAGS_OUTGOING | RS_MSG_FLAGS_PENDING); /* add pending flag */ {
if(!item) return 0;
item->msgId = getNewUniqueMsgId(); /* grabs Mtx as well */
item->msgFlags |= ( RS_MSG_FLAGS_DISTANT | RS_MSG_FLAGS_OUTGOING |
RS_MSG_FLAGS_PENDING ); /* add pending flag */
{ {
RS_STACK_MUTEX(mMsgMtx) ; RS_STACK_MUTEX(mMsgMtx);
/* STORE MsgID */ /* STORE MsgID */
msgOutgoing[item->msgId] = item; msgOutgoing[item->msgId] = item;
@ -1128,17 +1136,16 @@ uint32_t p3MsgService::sendDistantMessage(RsMsgItem *item,const RsGxsId& fro
RsMsgSrcId* msi = new RsMsgSrcId(); RsMsgSrcId* msi = new RsMsgSrcId();
msi->msgId = item->msgId; msi->msgId = item->msgId;
msi->srcId = RsPeerId(from) ; msi->srcId = RsPeerId(from);
mSrcIds.insert(std::pair<uint32_t, RsMsgSrcId*>(msi->msgId, msi)); mSrcIds.insert(std::pair<uint32_t, RsMsgSrcId*>(msi->msgId, msi));
} }
} }
IndicateConfigChanged(); /**** INDICATE MSG CONFIG CHANGED! *****/ IndicateConfigChanged(); /**** INDICATE MSG CONFIG CHANGED! *****/
RsServer::notify()->notifyListChange(NOTIFY_LIST_MESSAGELIST, NOTIFY_TYPE_ADD); RsServer::notify()->notifyListChange( NOTIFY_LIST_MESSAGELIST,
NOTIFY_TYPE_ADD );
return item->msgId; return item->msgId;
} }
bool p3MsgService::MessageSend(MessageInfo &info) bool p3MsgService::MessageSend(MessageInfo &info)
@ -1853,73 +1860,89 @@ void p3MsgService::manageDistantPeers()
} }
} }
void p3MsgService::notifyDataStatus(const GRouterMsgPropagationId& id, const RsGxsId &signer_id, uint32_t data_status) void p3MsgService::notifyDataStatus( const GRouterMsgPropagationId& id,
const RsGxsId &signer_id,
uint32_t data_status )
{ {
if(data_status == GROUTER_CLIENT_SERVICE_DATA_STATUS_FAILED) if(data_status == GROUTER_CLIENT_SERVICE_DATA_STATUS_FAILED)
{ {
RS_STACK_MUTEX(mMsgMtx); /********** STACK LOCKED MTX ******/ RS_STACK_MUTEX(mMsgMtx);
std::cerr << "(WW) p3MsgService::notifyDataStatus: Global router tells us that item ID " << id << " could not be delivered on time." ; std::cerr << "(WW) p3MsgService::notifyDataStatus: Global router tells "
std::map<GRouterMsgPropagationId,uint32_t>::iterator it = _ongoing_messages.find(id) ; << "us that item ID " << id
<< " could not be delivered on time.";
if(it == _ongoing_messages.end())
{
std::cerr << " (EE) cannot find pending message to acknowledge. Weird. grouter id = " << id << std::endl;
return ;
}
uint32_t msg_id = it->second ;
std::cerr << " message id = " << msg_id << std::endl;
mDistantOutgoingMsgSigners[msg_id] = signer_id ; // this is needed because it's not saved in config, but we should probably include it in _ongoing_messages
std::map<uint32_t,RsMsgItem*>::iterator mit = msgOutgoing.find(msg_id) ; auto it = _ongoing_messages.find(id);
if(it == _ongoing_messages.end())
{
std::cerr << " (EE) cannot find pending message to acknowledge. "
<< "Weird. grouter id = " << id << std::endl;
return;
}
if(mit == msgOutgoing.end()) uint32_t msg_id = it->second;
{ std::cerr << " message id = " << msg_id << std::endl;
std::cerr << " (EE) message has been notified as not delivered, but it not on outgoing list. Something's wrong!!" << std::endl;
return ;
}
std::cerr << " reseting the ROUTED flag so that the message is requested again" << std::endl;
mit->second->msgFlags &= ~RS_MSG_FLAGS_ROUTED ; // clear the routed flag so that the message is requested again /* this is needed because it's not saved in config, but we should
return ; * probably include it in _ongoing_messages */
} mDistantOutgoingMsgSigners[msg_id] = signer_id;
if(data_status == GROUTER_CLIENT_SERVICE_DATA_STATUS_RECEIVED) std::map<uint32_t,RsMsgItem*>::iterator mit = msgOutgoing.find(msg_id);
{
RS_STACK_MUTEX(mMsgMtx); /********** STACK LOCKED MTX ******/ if(mit == msgOutgoing.end())
{
std::cerr << " (EE) message has been notified as not delivered, "
<< "but it not on outgoing list. Something's wrong!!"
<< std::endl;
return;
}
std::cerr << " reseting the ROUTED flag so that the message is "
<< "requested again" << std::endl;
// clear the routed flag so that the message is requested again
mit->second->msgFlags &= ~RS_MSG_FLAGS_ROUTED;
return;
}
if(data_status == GROUTER_CLIENT_SERVICE_DATA_STATUS_RECEIVED)
{
RS_STACK_MUTEX(mMsgMtx);
#ifdef DEBUG_DISTANT_MSG #ifdef DEBUG_DISTANT_MSG
std::cerr << "p3MsgService::acknowledgeDataReceived(): acknowledging data received for msg propagation id " << id << std::endl; std::cerr << "p3MsgService::acknowledgeDataReceived(): acknowledging data received for msg propagation id " << id << std::endl;
#endif #endif
std::map<GRouterMsgPropagationId,uint32_t>::iterator it = _ongoing_messages.find(id) ; auto it = _ongoing_messages.find(id);
if(it == _ongoing_messages.end())
{
std::cerr << " (EE) cannot find pending message to acknowledge. "
<< "Weird. grouter id = " << id << std::endl;
return;
}
if(it == _ongoing_messages.end()) uint32_t msg_id = it->second ;
{
std::cerr << " (EE) cannot find pending message to acknowledge. Weird. grouter id = " << id << std::endl;
return ;
}
uint32_t msg_id = it->second ; // we should now remove the item from the msgOutgoing list.
// we should now remove the item from the msgOutgoing list. std::map<uint32_t,RsMsgItem*>::iterator it2 = msgOutgoing.find(msg_id);
if(it2 == msgOutgoing.end())
{
std::cerr << "(EE) message has been ACKed, but is not in outgoing "
<< "list. Something's wrong!!" << std::endl;
return;
}
std::map<uint32_t,RsMsgItem*>::iterator it2 = msgOutgoing.find(msg_id) ; delete it2->second;
msgOutgoing.erase(it2);
if(it2 == msgOutgoing.end()) RsServer::notify()->notifyListChange( NOTIFY_LIST_MESSAGELIST,
{ NOTIFY_TYPE_ADD );
std::cerr << "(EE) message has been ACKed, but is not in outgoing list. Something's wrong!!" << std::endl; IndicateConfigChanged();
return ;
}
delete it2->second ; return;
msgOutgoing.erase(it2) ; }
std::cerr << "p3MsgService: unhandled data status info from global router"
RsServer::notify()->notifyListChange(NOTIFY_LIST_MESSAGELIST,NOTIFY_TYPE_ADD); << " for msg ID " << id << ": this is a bug." << std::endl;
IndicateConfigChanged() ;
return ;
}
std::cerr << "p3MsgService: unhandled data status info from global router for msg ID " << id << ": this is a bug." << std::endl;
} }
bool p3MsgService::acceptDataFromPeer(const RsGxsId& to_gxs_id) bool p3MsgService::acceptDataFromPeer(const RsGxsId& to_gxs_id)
{ {
if(mDistantMessagePermissions & RS_DISTANT_MESSAGING_CONTACT_PERMISSION_FLAG_FILTER_NON_CONTACTS) if(mDistantMessagePermissions & RS_DISTANT_MESSAGING_CONTACT_PERMISSION_FLAG_FILTER_NON_CONTACTS)
@ -1945,104 +1968,274 @@ uint32_t p3MsgService::getDistantMessagingPermissionFlags()
{ {
return mDistantMessagePermissions ; return mDistantMessagePermissions ;
} }
void p3MsgService::receiveGRouterData(const RsGxsId &destination_key, const RsGxsId &signing_key, GRouterServiceId &/*client_id*/, uint8_t *data, uint32_t data_size) bool p3MsgService::receiveGxsMail( const RsGxsMailItem& originalMessage,
const uint8_t* data, uint32_t dataSize )
{ {
std::cerr << "p3MsgService::receiveGRouterData(): received message item of size " << data_size << ", for key " << destination_key << std::endl; Sha1CheckSum hash = RsDirUtil::sha1sum(data, dataSize);
// first make sure that we havn't already received the data. Since we allow to re-send messages, it's necessary to check. {
RS_STACK_MUTEX(recentlyReceivedMutex);
Sha1CheckSum hash = RsDirUtil::sha1sum(data,data_size) ; if( mRecentlyReceivedMessageHashes.find(hash) !=
mRecentlyReceivedMessageHashes.end() )
if(mRecentlyReceivedDistantMessageHashes.find(hash) != mRecentlyReceivedDistantMessageHashes.end()) {
{ std::cerr << "p3MsgService::receiveGxsMail(...) (WW) receiving "
std::cerr << "(WW) receiving distant message of hash " << hash << " more than once. This is not a bug, unless it happens very often." << std::endl; << "message of hash " << hash << " more than once. This "
<< "is not a bug, unless it happens very often."
<< std::endl;
return true;
}
mRecentlyReceivedMessageHashes[hash] = time(NULL);
}
IndicateConfigChanged();
RsItem *item = _serialiser->deserialise(const_cast<uint8_t*>(data), &dataSize);
RsMsgItem *msg_item = dynamic_cast<RsMsgItem*>(item);
if(msg_item)
{
std::cerr << "p3MsgService::receiveGxsMail(...) Encrypted item "
<< "correctly deserialised. Passing on to incoming list."
<< std::endl;
msg_item->msgFlags |= RS_MSG_FLAGS_DISTANT;
/* we expect complete msgs - remove partial flag just in case
* someone has funny ideas */
msg_item->msgFlags &= ~RS_MSG_FLAGS_PARTIAL;
// hack to pass on GXS id.
msg_item->PeerId(RsPeerId(originalMessage.meta.mAuthorId));
handleIncomingItem(msg_item);
}
else
std::cerr << "p3MsgService::receiveGxsMail(...) Item could not be "
<< "deserialised. Format error??" << std::endl;
}
bool p3MsgService::notifySendMailStatus( const RsGxsMailItem& originalMessage,
GxsMailStatus status )
{
if( status >= GxsMailStatus::FAILED_RECEIPT_SIGNATURE )
{
uint32_t msg_id;
{
RS_STACK_MUTEX(gxsOngoingMutex);
std::cerr << "p3MsgService::notifySendMailStatus(...) mail delivery"
<< "mailId: " << originalMessage.mailId
<< " failed with " << static_cast<uint>(status);
auto it = gxsOngoingMessages.find(originalMessage.mailId);
if(it == gxsOngoingMessages.end())
{
std::cerr << " cannot find pending message to notify"
<< std::endl;
return false;
}
msg_id = it->second;
}
std::cerr << " message id = " << msg_id << std::endl;
{
RS_STACK_MUTEX(mMsgMtx);
auto mit = msgOutgoing.find(msg_id);
if( mit == msgOutgoing.end() )
{
std::cerr << " message has been notified as not delivered, "
<< "but it not on outgoing list."
<< std::endl;
return true;
}
std::cerr << " reseting the ROUTED flag so that the message is "
<< "requested again" << std::endl;
// clear the routed flag so that the message is requested again
mit->second->msgFlags &= ~RS_MSG_FLAGS_ROUTED;
return true;
}
}
if( status == GxsMailStatus::RECEIPT_RECEIVED )
{
uint32_t msg_id;
{
RS_STACK_MUTEX(gxsOngoingMutex);
auto it = gxsOngoingMessages.find(originalMessage.mailId);
if(it == gxsOngoingMessages.end())
{
std::cerr << " (EE) cannot find pending message to acknowledge. "
<< "Weird.mailId = " << originalMessage.mailId
<< std::endl;
return false;
}
msg_id = it->second;
}
// we should now remove the item from the msgOutgoing list.
{
RS_STACK_MUTEX(mMsgMtx);
auto it2 = msgOutgoing.find(msg_id);
if(it2 == msgOutgoing.end())
{
std::cerr << "(EE) message has been ACKed, but is not in "
<< "outgoing list. Something's wrong!!" << std::endl;
return true;
}
delete it2->second;
msgOutgoing.erase(it2);
}
RsServer::notify()->notifyListChange( NOTIFY_LIST_MESSAGELIST,
NOTIFY_TYPE_ADD );
IndicateConfigChanged();
return true;
}
}
void p3MsgService::receiveGRouterData( const RsGxsId &destination_key,
const RsGxsId &signing_key,
GRouterServiceId &/*client_id*/,
uint8_t *data, uint32_t data_size )
{
std::cerr << "p3MsgService::receiveGRouterData(): received message item of"
<< " size " << data_size << ", for key " << destination_key
<< std::endl;
/* first make sure that we havn't already received the data. Since we allow
* to re-send messages, it's necessary to check. */
Sha1CheckSum hash = RsDirUtil::sha1sum(data, data_size);
{
RS_STACK_MUTEX(recentlyReceivedMutex);
if( mRecentlyReceivedMessageHashes.find(hash) !=
mRecentlyReceivedMessageHashes.end() )
{
std::cerr << "p3MsgService::receiveGRouterData(...) (WW) receiving"
<< "distant message of hash " << hash << " more than once"
<< ". This is not a bug, unless it happens very often."
<< std::endl;
free(data);
return;
}
mRecentlyReceivedMessageHashes[hash] = time(NULL);
}
IndicateConfigChanged() ;
RsItem *item = _serialiser->deserialise(data,&data_size) ;
free(data) ; free(data) ;
return ;
}
mRecentlyReceivedDistantMessageHashes[hash] = time(NULL) ;
IndicateConfigChanged() ;
RsItem *item = _serialiser->deserialise(data,&data_size) ;
free(data) ;
RsMsgItem *msg_item = dynamic_cast<RsMsgItem*>(item) ; RsMsgItem *msg_item = dynamic_cast<RsMsgItem*>(item) ;
if(msg_item != NULL) if(msg_item != NULL)
{ {
std::cerr << " Encrypted item correctly deserialised. Passing on to incoming list." << std::endl; std::cerr << " Encrypted item correctly deserialised. Passing on to incoming list." << std::endl;
msg_item->msgFlags |= RS_MSG_FLAGS_DISTANT ; msg_item->msgFlags |= RS_MSG_FLAGS_DISTANT ;
/* we expect complete msgs - remove partial flag just in case someone has funny ideas */ /* we expect complete msgs - remove partial flag just in case someone has funny ideas */
msg_item->msgFlags &= ~RS_MSG_FLAGS_PARTIAL; msg_item->msgFlags &= ~RS_MSG_FLAGS_PARTIAL;
msg_item->PeerId(RsPeerId(signing_key)) ; // hack to pass on GXS id. msg_item->PeerId(RsPeerId(signing_key)) ; // hack to pass on GXS id.
handleIncomingItem(msg_item) ; handleIncomingItem(msg_item) ;
} }
else else
std::cerr << " Item could not be deserialised. Format error??" << std::endl; std::cerr << " Item could not be deserialised. Format error??" << std::endl;
} }
void p3MsgService::sendDistantMsgItem(RsMsgItem *msgitem) void p3MsgService::sendDistantMsgItem(RsMsgItem *msgitem)
{ {
RsGxsId destination_key_id(msgitem->PeerId()) ; RsGxsId destination_key_id(msgitem->PeerId());
RsGxsId signing_key_id ; RsGxsId signing_key_id;
msgitem->msgFlags |= RS_MSG_FLAGS_DISTANT ;// just in case, but normally we should always have this flag set, when ending up here. /* just in case, but normally we should always have this flag set, when
* ending up here. */
{ msgitem->msgFlags |= RS_MSG_FLAGS_DISTANT;
RS_STACK_MUTEX(mMsgMtx) ;
std::map<uint32_t,RsGxsId>::const_iterator it = mDistantOutgoingMsgSigners.find(msgitem->msgId) ; {
RS_STACK_MUTEX(mMsgMtx);
if(it == mDistantOutgoingMsgSigners.end())
{
std::cerr << "(EE) no signer registered for distant message " << msgitem->msgId << ". Cannot send!" << std::endl;
return ;
}
signing_key_id = it->second ;
if(signing_key_id.isNull()) std::map<uint32_t,RsGxsId>::const_iterator it =
{ mDistantOutgoingMsgSigners.find(msgitem->msgId);
std::cerr << "ERROR: cannot find signing key id for msg id " << msgitem->msgId << std::endl;
std::cerr << " available keys are:" << std::endl; if(it == mDistantOutgoingMsgSigners.end())
for(std::map<uint32_t,RsGxsId>::const_iterator it(mDistantOutgoingMsgSigners.begin());it!=mDistantOutgoingMsgSigners.end();++it) {
std::cerr << " " << it->first << " " << it->second << std::endl; std::cerr << "(EE) no signer registered for distant message "
return ; << msgitem->msgId << ". Cannot send!" << std::endl;
} return;
} }
signing_key_id = it->second;
if(signing_key_id.isNull())
{
std::cerr << "ERROR: cannot find signing key id for msg id "
<< msgitem->msgId << " available keys are:" << std::endl;
typedef std::map<uint32_t,RsGxsId>::const_iterator itT;
for( itT it = mDistantOutgoingMsgSigners.begin();
it != mDistantOutgoingMsgSigners.end(); ++it )
std::cerr << "\t" << it->first << " " << it->second
<< std::endl;
return;
}
}
#ifdef DEBUG_DISTANT_MSG #ifdef DEBUG_DISTANT_MSG
std::cerr << "p3MsgService::sendDistanteMsgItem(): sending distant msg item" << std::endl; std::cerr << "p3MsgService::sendDistanteMsgItem(): sending distant msg item"
std::cerr << " msg ID : " << msgitem->msgId << std::endl; << " msg ID: " << msgitem->msgId << " to peer:"
std::cerr << " to peer : " << destination_key_id << std::endl; << destination_key_id << " signing: " << signing_key_id
std::cerr << " signing : " << signing_key_id << std::endl; << std::endl;
#endif #endif
// The item is serialized and turned into a generic turtle item. Use use the explicit serialiser to make sure that the msgId is not included /* The item is serialized and turned into a generic turtle item. Use use the
* explicit serialiser to make sure that the msgId is not included */
uint32_t msg_serialized_rssize = msgitem->serial_size(false) ; uint32_t msg_serialized_rssize = msgitem->serial_size(false);
RsTemporaryMemory msg_serialized_data(msg_serialized_rssize) ; RsTemporaryMemory msg_serialized_data(msg_serialized_rssize);
if(!msgitem->serialise(msg_serialized_data,msg_serialized_rssize,false)) if(!msgitem->serialise(msg_serialized_data,msg_serialized_rssize,false))
{ {
std::cerr << "(EE) p3MsgService::sendTurtleData(): Serialization error." << std::endl; std::cerr << "(EE) p3MsgService::sendTurtleData(): Serialization error."
return ; << std::endl;
} return;
}
#ifdef DEBUG_DISTANT_MSG #ifdef DEBUG_DISTANT_MSG
std::cerr << " serialised size : " << msg_serialized_rssize << std::endl; std::cerr << " serialised size : " << msg_serialized_rssize << std::endl;
#endif #endif
GRouterMsgPropagationId grouter_message_id ; GRouterMsgPropagationId grouter_message_id;
mGRouter->sendData(destination_key_id,GROUTER_CLIENT_ID_MESSAGES,msg_serialized_data,msg_serialized_rssize,signing_key_id,grouter_message_id) ; mGRouter->sendData( destination_key_id, GROUTER_CLIENT_ID_MESSAGES,
msg_serialized_data, msg_serialized_rssize,
signing_key_id, grouter_message_id );
RsGxsMailId gxsMailId;
gxsMailService.sendMail( gxsMailId, P3_MSG_SERVICE, signing_key_id,
destination_key_id, msg_serialized_data,
msg_serialized_rssize );
// now store the grouter id along with the message id, so that we can keep track of received messages /* now store the grouter id along with the message id, so that we can keep
* track of received messages */
{ {
RS_STACK_MUTEX(mMsgMtx) ; RS_STACK_MUTEX(mMsgMtx);
_ongoing_messages[grouter_message_id] = msgitem->msgId ; _ongoing_messages[grouter_message_id] = msgitem->msgId;
} }
IndicateConfigChanged(); // save _ongoing_messages
{
RS_STACK_MUTEX(gxsOngoingMutex);
gxsOngoingMessages[gxsMailId] = msgitem->msgId;
}
IndicateConfigChanged(); // save _ongoing_messages
} }

View File

@ -49,16 +49,19 @@
#include "grouter/grouterclientservice.h" #include "grouter/grouterclientservice.h"
#include "turtle/p3turtle.h" #include "turtle/p3turtle.h"
#include "turtle/turtleclientservice.h" #include "turtle/turtleclientservice.h"
#include "services/p3gxsmails.h"
class p3LinkMgr; class p3LinkMgr;
class p3IdService; class p3IdService;
// Temp tweak to test grouter // Temp tweak to test grouter
class p3MsgService: public p3Service, public p3Config, public pqiServiceMonitor, public GRouterClientService struct p3MsgService :
p3Service, p3Config, pqiServiceMonitor, GRouterClientService,
GxsMailsClient
{ {
public: p3MsgService(p3ServiceControl *sc, p3IdService *id_service, p3GxsMails& gxsMS);
p3MsgService(p3ServiceControl *sc, p3IdService *id_service);
virtual RsServiceInfo getServiceInfo(); virtual RsServiceInfo getServiceInfo();
/* External Interface */ /* External Interface */
bool getMessageSummaries(std::list<Rs::Msgs::MsgInfoSummary> &msgList); bool getMessageSummaries(std::list<Rs::Msgs::MsgInfoSummary> &msgList);
@ -106,7 +109,9 @@ public:
/*** Overloaded from pqiMonitor ***/ /*** Overloaded from pqiMonitor ***/
virtual void statusChange(const std::list<pqiServicePeer> &plist); virtual void statusChange(const std::list<pqiServicePeer> &plist);
int checkOutgoingMessages();
/// iterate through the outgoing queue if online, send
int checkOutgoingMessages();
/*** Overloaded from pqiMonitor ***/ /*** Overloaded from pqiMonitor ***/
/*** overloaded from p3turtle ***/ /*** overloaded from p3turtle ***/
@ -130,16 +135,25 @@ public:
void setDistantMessagingPermissionFlags(uint32_t flags) ; void setDistantMessagingPermissionFlags(uint32_t flags) ;
uint32_t getDistantMessagingPermissionFlags() ; uint32_t getDistantMessagingPermissionFlags() ;
private: /// @see GxsMailsClient::receiveGxsMail(...)
void sendDistantMsgItem(RsMsgItem *msgitem) ; virtual bool receiveGxsMail( const RsGxsMailItem& originalMessage,
const uint8_t* data, uint32_t dataSize );
// This contains the ongoing tunnel handling contacts. /// @see GxsMailsClient::notifySendMailStatus(...)
// The map is indexed by the hash virtual bool notifySendMailStatus( const RsGxsMailItem& originalMessage,
// GxsMailStatus status );
std::map<GRouterMsgPropagationId,uint32_t> _ongoing_messages ;
private:
void sendDistantMsgItem(RsMsgItem *msgitem);
/** This contains the ongoing tunnel handling contacts.
* The map is indexed by the hash */
std::map<GRouterMsgPropagationId, uint32_t> _ongoing_messages;
std::map<RsGxsMailId, uint32_t> gxsOngoingMessages;
RsMutex gxsOngoingMutex;
// Overloaded from GRouterClientService // Overloaded from GRouterClientService
virtual bool acceptDataFromPeer(const RsGxsId& gxs_id) ; virtual bool acceptDataFromPeer(const RsGxsId& gxs_id) ;
virtual void receiveGRouterData(const RsGxsId& destination_key,const RsGxsId& signing_key, GRouterServiceId &client_id, uint8_t *data, uint32_t data_size) ; virtual void receiveGRouterData(const RsGxsId& destination_key,const RsGxsId& signing_key, GRouterServiceId &client_id, uint8_t *data, uint32_t data_size) ;
virtual void notifyDataStatus(const GRouterMsgPropagationId& msg_id,const RsGxsId& signer_id,uint32_t data_status) ; virtual void notifyDataStatus(const GRouterMsgPropagationId& msg_id,const RsGxsId& signer_id,uint32_t data_status) ;
@ -194,8 +208,9 @@ private:
std::map<uint32_t, RsMsgTagType*> mTags; std::map<uint32_t, RsMsgTagType*> mTags;
std::map<uint32_t, RsMsgTags*> mMsgTags; std::map<uint32_t, RsMsgTags*> mMsgTags;
uint32_t mMsgUniqueId; uint32_t mMsgUniqueId;
std::map<Sha1CheckSum,uint32_t> mRecentlyReceivedDistantMessageHashes; std::map<Sha1CheckSum, uint32_t> mRecentlyReceivedMessageHashes;
RsMutex recentlyReceivedMutex;
// used delete msgSrcIds after config save // used delete msgSrcIds after config save
std::map<uint32_t, RsMsgSrcId*> mSrcIds; std::map<uint32_t, RsMsgSrcId*> mSrcIds;
@ -211,6 +226,8 @@ private:
bool mDistantMessagingEnabled ; bool mDistantMessagingEnabled ;
uint32_t mDistantMessagePermissions ; uint32_t mDistantMessagePermissions ;
bool mShouldEnableDistantMessaging ; bool mShouldEnableDistantMessaging ;
p3GxsMails& gxsMailService;
}; };
#endif // MESSAGE_SERVICE_HEADER #endif // MESSAGE_SERVICE_HEADER