Merge pull request #243 from csoler/v0.6-Messaging

V0.6 messaging
This commit is contained in:
Cyril Soler 2016-01-08 23:08:14 -05:00
commit 56a844b58d
7 changed files with 469 additions and 226 deletions

View File

@ -1184,16 +1184,18 @@ void p3GRouter::autoWash()
#ifdef GROUTER_DEBUG
grouter_debug() << " Removing cached item " << std::hex << it->first << std::dec << std::endl;
#endif
GRouterClientService *client = NULL ;
GRouterServiceId service_id = 0;
//GRouterClientService *client = NULL ;
//GRouterServiceId service_id = 0;
if( it->second.data_status != RS_GROUTER_DATA_STATUS_DONE )
{
if(!locked_getClientAndServiceId(it->second.tunnel_hash,it->second.data_item->destination_key,client,service_id))
std::cerr << " ERROR: cannot find client for cancelled message " << it->first << std::endl;
else
failed_msgs[it->first] = client;
}
{
GRouterClientService *client = NULL;
if(locked_getLocallyRegisteredClientFromServiceId(it->second.client_id,client))
failed_msgs[it->first] = client ;
else
std::cerr << " ERROR: client id " << it->second.client_id << " not registered. Consistency error." << std::endl;
}
delete it->second.data_item ;
@ -1411,7 +1413,7 @@ void p3GRouter::handleIncomingReceiptItem(RsGRouterSignedReceiptItem *receipt_it
#endif
it->second.data_status = RS_GROUTER_DATA_STATUS_DONE;
if(locked_getClientAndServiceId(it->second.tunnel_hash,it->second.data_item->destination_key,client_service,service_id))
if(locked_getLocallyRegisteredClientFromServiceId(it->second.client_id,client_service))
mid = it->first ;
else
{
@ -1604,6 +1606,7 @@ void p3GRouter::handleIncomingDataItem(RsGRouterGenericDataItem *data_item)
info.receipt_item = receipt_item ; // inited before, or NULL.
info.tunnel_status = RS_GROUTER_TUNNEL_STATUS_UNMANAGED ;
info.last_sent_TS = 0 ;
info.client_id = data_item->service_id ;
info.item_hash = item_hash ;
info.last_tunnel_request_TS = 0 ;
info.sending_attempts = 0 ;
@ -1680,26 +1683,10 @@ void p3GRouter::handleIncomingDataItem(RsGRouterGenericDataItem *data_item)
IndicateConfigChanged() ;
}
bool p3GRouter::locked_getClientAndServiceId(const TurtleFileHash& hash, const RsGxsId& destination_key, GRouterClientService *& client, GRouterServiceId& service_id)
bool p3GRouter::locked_getLocallyRegisteredClientFromServiceId(const GRouterServiceId& service_id,GRouterClientService *& client)
{
client = NULL ;
service_id = 0;
RsGxsId gxs_id ;
if(!locked_getGxsIdAndClientId(hash,gxs_id,service_id))
{
std::cerr << " p3GRouter::ERROR: locked_getGxsIdAndClientId(): no key registered for hash " << hash << std::endl;
return false ;
}
if(gxs_id != destination_key)
{
std::cerr << " ERROR: verification (destination) GXS key " << destination_key << " does not match key from hash " << gxs_id << std::endl;
return false;
}
// now find the client given its id.
std::map<GRouterServiceId,GRouterClientService*>::const_iterator its = _registered_services.find(service_id) ;
if(its == _registered_services.end())
@ -1981,6 +1968,7 @@ bool p3GRouter::sendData(const RsGxsId& destination,const GRouterServiceId& clie
info.data_status = RS_GROUTER_DATA_STATUS_PENDING ;
info.tunnel_status = RS_GROUTER_TUNNEL_STATUS_UNMANAGED ;
info.last_sent_TS = 0 ;
info.client_id = client_id ;
info.last_tunnel_request_TS = 0 ;
info.item_hash = computeDataItemHash(data_item) ;
info.sending_attempts = 0 ;
@ -2029,7 +2017,8 @@ Sha1CheckSum p3GRouter::makeTunnelHash(const RsGxsId& destination,const GRouterS
return RsDirUtil::sha1sum(bytes,20) ;
}
bool p3GRouter::locked_getGxsIdAndClientId(const TurtleFileHash& sum,RsGxsId& gxs_id,GRouterServiceId& client_id)
#ifdef TO_REMOVE
bool p3GRouter::locked_getGxsOwnIdAndClientIdFromHash(const TurtleFileHash& sum,RsGxsId& gxs_id,GRouterServiceId& client_id)
{
assert( gxs_id.SIZE_IN_BYTES == 16) ;
assert(Sha1CheckSum::SIZE_IN_BYTES == 20) ;
@ -2047,6 +2036,7 @@ bool p3GRouter::locked_getGxsIdAndClientId(const TurtleFileHash& sum,RsGxsId& gx
return true ;
}
#endif
bool p3GRouter::loadList(std::list<RsItem*>& items)
{
{
@ -2210,7 +2200,8 @@ void p3GRouter::debugDump()
for(std::map<Sha1CheckSum, GRouterPublishedKeyInfo>::const_iterator it(_owned_key_ids.begin());it!=_owned_key_ids.end();++it)
{
grouter_debug() << " Hash : " << it->first << std::endl;
grouter_debug() << " Hash : " << it->first << std::endl;
grouter_debug() << " Key : " << it->second.authentication_key << std::endl;
grouter_debug() << " Service id : " << std::hex << it->second.service_id << std::dec << std::endl;
grouter_debug() << " Description : " << it->second.description_string << std::endl;
}
@ -2226,15 +2217,17 @@ void p3GRouter::debugDump()
for(std::map<GRouterMsgPropagationId, GRouterRoutingInfo>::iterator it(_pending_messages.begin());it!=_pending_messages.end();++it)
{
grouter_debug() << " Msg id : " << std::hex << it->first << std::dec ;
grouter_debug() << " data hash : " << it->second.item_hash ;
grouter_debug() << " Destination : " << it->second.data_item->destination_key ;
grouter_debug() << " Received : " << now - it->second.received_time_TS << " secs ago.";
grouter_debug() << " Last sent : " << now - it->second.last_sent_TS << " secs ago.";
grouter_debug() << " Transaction TS : " << now - it->second.data_transaction_TS << " secs ago.";
grouter_debug() << " Data Status : " << statusString[it->second.data_status] << std::endl;
grouter_debug() << " Tunl Status : " << statusString[it->second.tunnel_status] << std::endl;
grouter_debug() << " Receipt ok : " << (it->second.receipt_item != NULL) << std::endl;
grouter_debug() << " Msg id: " << std::hex << it->first << std::dec ;
grouter_debug() << " data hash: " << it->second.item_hash ;
grouter_debug() << " client id: " << std::hex << it->second.client_id << std::dec;
grouter_debug() << " Flags: " << std::hex << it->second.routing_flags << std::dec;
grouter_debug() << " Destination: " << it->second.data_item->destination_key ;
grouter_debug() << " Received: " << now - it->second.received_time_TS << " secs ago.";
grouter_debug() << " Last sent: " << now - it->second.last_sent_TS << " secs ago.";
grouter_debug() << " Transaction TS: " << now - it->second.data_transaction_TS << " secs ago.";
grouter_debug() << " Data Status: " << statusString[it->second.data_status] << std::endl;
grouter_debug() << " Tunl Status: " << statusString[it->second.tunnel_status] << std::endl;
grouter_debug() << " Receipt ok: " << (it->second.receipt_item != NULL) << std::endl;
}
grouter_debug() << " Tunnels: " << std::endl;

View File

@ -252,8 +252,7 @@ private:
void handleIncomingReceiptItem(RsGRouterSignedReceiptItem *receipt_item) ;
void handleIncomingDataItem(RsGRouterGenericDataItem *data_item) ;
bool locked_getClientAndServiceId(const TurtleFileHash& hash, const RsGxsId& destination_key, GRouterClientService *& client, GRouterServiceId& service_id);
bool locked_getLocallyRegisteredClientFromServiceId(const GRouterServiceId& service_id,GRouterClientService *& client);
// utility functions
//
@ -270,7 +269,7 @@ private:
static Sha1CheckSum makeTunnelHash(const RsGxsId& destination,const GRouterServiceId& client);
bool locked_getGxsIdAndClientId(const TurtleFileHash &sum,RsGxsId& gxs_id,GRouterServiceId& client_id);
//bool locked_getGxsIdAndClientId(const TurtleFileHash &sum,RsGxsId& gxs_id,GRouterServiceId& client_id);
bool locked_sendTransactionData(const RsPeerId& pid,const RsGRouterTransactionItem& item);
void locked_collectAvailableFriends(const GRouterKeyId &gxs_id,std::list<RsPeerId>& friend_peers, const std::set<RsPeerId>& incoming_routes,bool is_origin);

View File

@ -899,7 +899,142 @@ RsMsgGRouterMap* RsMsgSerialiser::deserialiseMsgGRouterMap(void* data, uint32_t*
/************************* end of definition of msgGRouterMap serialisation functions ************************/
/************************* definition of msgDistantMessageMap serialisation functions ************************/
std::ostream& RsMsgDistantMessagesHashMap::print(std::ostream& out, uint16_t indent)
{
printRsItemBase(out, "RsMsgDistantMessagesHashMap", indent);
uint16_t int_Indent = indent + 2;
for(std::map<Sha1CheckSum,uint32_t>::const_iterator it(hash_map.begin());it!=hash_map.end();++it)
{
printIndent(out, int_Indent);
out << " " << std::hex << it->first << std::dec << " : " << it->second << std::endl;
}
printRsItemEnd(out, "RsMsgDistantMessagesHashMap", indent);
return out;
}
void RsMsgDistantMessagesHashMap::clear()
{
hash_map.clear() ;
return;
}
uint32_t RsMsgDistantMessagesHashMap::serial_size(bool)
{
uint32_t s = 8; /* header */
s += 4; // number of entries
s += (Sha1CheckSum::SIZE_IN_BYTES+4)*hash_map.size(); // entries
return s;
}
bool RsMsgDistantMessagesHashMap::serialise(void *data, uint32_t& pktsize,bool config)
{
uint32_t tlvsize = serial_size(config) ;
uint32_t offset = 0;
if (pktsize < tlvsize)
return false; /* not enough space */
pktsize = tlvsize;
bool ok = true;
ok &= setRsItemHeader(data, tlvsize, PacketId(), tlvsize);
#ifdef RSSERIAL_DEBUG
std::cerr << "RsMsgSerialiser::serialiseMsgDistantMessagesHashMap() Header: " << ok << std::endl;
std::cerr << "RsMsgSerialiser::serialiseMsgDistantMessagesHashMap() Size: " << tlvsize << std::endl;
#endif
/* skip the header */
offset += 8;
ok &= setRawUInt32(data, tlvsize, &offset, hash_map.size());
for(std::map<Sha1CheckSum,uint32_t>::const_iterator it=hash_map.begin();ok && it!=hash_map.end();++it)
{
ok &= it->first.serialise(data, tlvsize, offset) ;
ok &= setRawUInt32(data, tlvsize, &offset, it->second);
}
if (offset != tlvsize)
{
ok = false;
std::cerr << "RsMsgSerialiser::serialiseMsgDistantMessagesHashMap() Size Error! " << std::endl;
}
return ok;
}
RsMsgDistantMessagesHashMap* RsMsgSerialiser::deserialiseMsgDistantMessageHashMap(void* data, uint32_t* pktsize)
{
/* get the type and size */
uint32_t rstype = getRsItemId(data);
uint32_t rssize = getRsItemSize(data);
uint32_t offset = 0;
if ((RS_PKT_VERSION_SERVICE != getRsItemVersion(rstype)) ||
(RS_SERVICE_TYPE_MSG != getRsItemService(rstype)) ||
(RS_PKT_SUBTYPE_MSG_DISTANT_MSG_MAP != getRsItemSubType(rstype)))
{
return NULL; /* wrong type */
}
if (*pktsize < rssize) /* check size */
return NULL; /* not enough data */
/* set the packet length */
*pktsize = rssize;
bool ok = true;
/* ready to load */
RsMsgDistantMessagesHashMap *item = new RsMsgDistantMessagesHashMap();
item->clear();
/* skip the header */
offset += 8;
uint32_t s=0 ;
/* get mandatory parts first */
ok &= getRawUInt32(data, rssize, &offset, &s);
for(uint32_t i=0;i<s && ok;++i)
{
Sha1CheckSum s ;
uint32_t tm ;
ok &= s.deserialise(data, rssize, offset) ;
ok &= getRawUInt32(data, rssize, &offset, &tm);
item->hash_map.insert(std::make_pair(s,tm)) ;
}
if (offset != rssize)
{
/* error */
delete item;
return NULL;
}
if (!ok)
{
delete item;
return NULL;
}
return item;
}
/************************************** Message ParentId **********************/
std::ostream& RsMsgParentId::print(std::ostream& out, uint16_t indent)

View File

@ -45,12 +45,13 @@
/**************************************************************************/
// for defining tags themselves and msg tags
const uint8_t RS_PKT_SUBTYPE_MSG_TAG_TYPE = 0x03;
const uint8_t RS_PKT_SUBTYPE_MSG_TAGS = 0x04;
const uint8_t RS_PKT_SUBTYPE_MSG_SRC_TAG = 0x05;
const uint8_t RS_PKT_SUBTYPE_MSG_PARENT_TAG = 0x06;
const uint8_t RS_PKT_SUBTYPE_MSG_INVITE = 0x07;
const uint8_t RS_PKT_SUBTYPE_MSG_GROUTER_MAP = 0x08;
const uint8_t RS_PKT_SUBTYPE_MSG_TAG_TYPE = 0x03;
const uint8_t RS_PKT_SUBTYPE_MSG_TAGS = 0x04;
const uint8_t RS_PKT_SUBTYPE_MSG_SRC_TAG = 0x05;
const uint8_t RS_PKT_SUBTYPE_MSG_PARENT_TAG = 0x06;
const uint8_t RS_PKT_SUBTYPE_MSG_INVITE = 0x07;
const uint8_t RS_PKT_SUBTYPE_MSG_GROUTER_MAP = 0x08;
const uint8_t RS_PKT_SUBTYPE_MSG_DISTANT_MSG_MAP = 0x09;
/**************************************************************************/
@ -226,7 +227,23 @@ class RsMsgGRouterMap : public RsMessageItem
//
std::map<GRouterMsgPropagationId,uint32_t> ongoing_msgs ;
};
class RsMsgDistantMessagesHashMap : public RsMessageItem
{
public:
RsMsgDistantMessagesHashMap() : RsMessageItem(RS_PKT_SUBTYPE_MSG_DISTANT_MSG_MAP) {}
std::ostream &print(std::ostream &out, uint16_t indent = 0);
virtual bool serialise(void *data,uint32_t& size,bool config) ;
virtual uint32_t serial_size(bool config) ;
virtual ~RsMsgDistantMessagesHashMap() {}
virtual void clear();
// ----------- Specific fields ------------- //
//
std::map<Sha1CheckSum,uint32_t> hash_map ;
};
class RsMsgParentId : public RsMessageItem
{
public:
@ -275,7 +292,8 @@ class RsMsgSerialiser: public RsSerialType
virtual RsMsgSrcId *deserialiseMsgSrcIdItem(void *data, uint32_t *size);
virtual RsMsgParentId *deserialiseMsgParentIdItem(void *data, uint32_t *size);
virtual RsPublicMsgInviteConfigItem *deserialisePublicMsgInviteConfigItem(void *data, uint32_t *size);
virtual RsMsgGRouterMap *deserialiseMsgGRouterMap(void *data, uint32_t *size);
virtual RsMsgGRouterMap *deserialiseMsgGRouterMap(void *data, uint32_t *size);
virtual RsMsgDistantMessagesHashMap *deserialiseMsgDistantMessageHashMap(void *data, uint32_t *size);
bool m_bConfiguration; // is set to true for saving configuration (enables serialising msgId)
};

View File

@ -80,11 +80,14 @@ const int msgservicezone = 54319;
p3MsgService::p3MsgService(p3ServiceControl *sc, p3IdService *id_serv)
:p3Service(), p3Config(), mIdService(id_serv), mServiceCtrl(sc), mMsgMtx("p3MsgService"), mMsgUniqueId(time(NULL))
:p3Service(), p3Config(), mIdService(id_serv), mServiceCtrl(sc), mMsgMtx("p3MsgService"), mMsgUniqueId(0)
{
_serialiser = new RsMsgSerialiser();
_serialiser = new RsMsgSerialiser(); // this serialiser is used for services. It's not the same than the one returned by setupSerialiser(). We need both!!
addSerialType(_serialiser);
mMsgUniqueId = RSRandom::random_u32() ; // better than time(NULL). We don't need crypto-safe random here. Just something much likely
// different from what friends use.
mShouldEnableDistantMessaging = true ;
mDistantMessagingEnabled = false ;
mDistantMessagePermissions = RS_DISTANT_MESSAGING_CONTACT_PERMISSION_FLAG_FILTER_NONE ;
@ -152,7 +155,7 @@ int p3MsgService::status()
return 1;
}
void p3MsgService::processMsg(RsMsgItem *mi, bool incoming)
void p3MsgService::processIncomingMsg(RsMsgItem *mi)
{
mi -> recvTime = time(NULL);
mi -> msgId = getNewUniqueMsgId();
@ -160,26 +163,19 @@ void p3MsgService::processMsg(RsMsgItem *mi, bool incoming)
{
RsStackMutex stack(mMsgMtx); /*** STACK LOCKED MTX ***/
if (incoming)
/* from a peer */
mi->msgFlags &= (RS_MSG_FLAGS_DISTANT | RS_MSG_FLAGS_SYSTEM); // remove flags except those
mi->msgFlags |= RS_MSG_FLAGS_NEW;
p3Notify *notify = RsServer::notify();
if (notify)
{
/* from a peer */
notify->AddPopupMessage(RS_POPUP_MSG, mi->PeerId().toStdString(), mi->subject, mi->message);
mi->msgFlags &= (RS_MSG_FLAGS_DISTANT | RS_MSG_FLAGS_SYSTEM); // remove flags except those
mi->msgFlags |= RS_MSG_FLAGS_NEW;
p3Notify *notify = RsServer::notify();
if (notify)
{
notify->AddPopupMessage(RS_POPUP_MSG, mi->PeerId().toStdString(), mi->subject, mi->message);
std::string out;
rs_sprintf(out, "%lu", mi->msgId);
notify->AddFeedItem(RS_FEED_ITEM_MESSAGE, out, "", "");
}
}
else
{
mi->msgFlags |= RS_MSG_OUTGOING;
std::string out;
rs_sprintf(out, "%lu", mi->msgId);
notify->AddFeedItem(RS_FEED_ITEM_MESSAGE, out, "", "");
}
imsg[mi->msgId] = mi;
@ -187,13 +183,12 @@ void p3MsgService::processMsg(RsMsgItem *mi, bool incoming)
msi->msgId = mi->msgId;
msi->srcId = mi->PeerId();
mSrcIds.insert(std::pair<uint32_t, RsMsgSrcId*>(msi->msgId, msi));
IndicateConfigChanged(); /**** INDICATE MSG CONFIG CHANGED! *****/
/**** STACK UNLOCKED ***/
}
if (incoming)
{
// If the peer is allowed to push files, then auto-download the recommended files.
if(rsPeers->servicePermissionFlags(mi->PeerId()) & RS_NODE_PERM_ALLOW_PUSH)
{
@ -203,7 +198,6 @@ void p3MsgService::processMsg(RsMsgItem *mi, bool incoming)
for(std::list<RsTlvFileItem>::const_iterator it(mi->attachment.items.begin());it!=mi->attachment.items.end();++it)
rsFiles->FileRequest((*it).name,(*it).hash,(*it).filesize,std::string(),RS_FILE_REQ_ANONYMOUS_ROUTING,srcIds) ;
}
}
RsServer::notify()->notifyListChange(NOTIFY_LIST_MESSAGELIST,NOTIFY_TYPE_ADD);
}
@ -271,7 +265,7 @@ void p3MsgService::handleIncomingItem(RsMsgItem *mi)
if(checkAndRebuildPartialMessage(mi)) // only returns true when a msg is complete.
{
processMsg(mi, true);
processIncomingMsg(mi);
changed = true ;
}
if(changed)
@ -333,96 +327,118 @@ void p3MsgService::checkSizeAndSendMessage(RsMsgItem *msg)
int p3MsgService::checkOutgoingMessages()
{
/* iterate through the outgoing queue
/* iterate through the outgoing queue
*
* if online, send
*/
bool changed = false ;
std::list<RsMsgItem*> output_queue ;
static const uint32_t OLD_MESSAGE_FLUSHING_DELAY = 86400*7 ; // re-send old messages every week. This mainly ensures that
// messages that where never sent get sent at some point.
time_t now = time(NULL);
bool changed = false ;
std::list<RsMsgItem*> output_queue ;
{
const RsPeerId& ownId = mServiceCtrl->getOwnId();
{
RS_STACK_MUTEX(mMsgMtx); /********** STACK LOCKED MTX ******/
const RsPeerId& ownId = mServiceCtrl->getOwnId();
std::list<uint32_t>::iterator it;
std::list<uint32_t> toErase;
RsStackMutex stack(mMsgMtx); /********** STACK LOCKED MTX ******/
std::list<uint32_t>::iterator it;
std::list<uint32_t> toErase;
std::map<uint32_t, RsMsgItem *>::iterator mit;
for(mit = msgOutgoing.begin(); mit != msgOutgoing.end(); ++mit)
{
if (mit->second->msgFlags & RS_MSG_FLAGS_TRASH)
continue;
std::map<uint32_t, RsMsgItem *>::iterator mit;
for(mit = msgOutgoing.begin(); mit != msgOutgoing.end(); ++mit)
{
if (mit->second->msgFlags & RS_MSG_FLAGS_TRASH)
continue;
/* find the certificate */
RsPeerId pid = mit->second->PeerId();
/* find the certificate */
RsPeerId pid = mit->second->PeerId();
bool should_send = false ;
if( pid == ownId
|| ( (mit->second->msgFlags & RS_MSG_FLAGS_DISTANT) && (!(mit->second->msgFlags & RS_MSG_FLAGS_ROUTED)))
|| mServiceCtrl->isPeerConnected(getServiceInfo().mServiceType, pid) ) /* FEEDBACK Msg to Ourselves */
{
/* send msg */
pqioutput(PQL_DEBUG_BASIC, msgservicezone,
"p3MsgService::checkOutGoingMessages() Sending out message");
/* remove the pending flag */
if( pid == ownId)
should_send = true ;
output_queue.push_back(mit->second) ;
if( mServiceCtrl->isPeerConnected(getServiceInfo().mServiceType, pid) ) /* FEEDBACK Msg to Ourselves */
should_send = true ;
// When the message is a distant msg, dont remove it yet from the list. Only mark it as being sent, so that we don't send it again.
//
if(!(mit->second->msgFlags & RS_MSG_FLAGS_DISTANT))
{
(mit->second)->msgFlags &= ~RS_MSG_FLAGS_PENDING;
toErase.push_back(mit->first);
changed = true ;
}
else
if (mit->second->msgFlags & RS_MSG_FLAGS_DISTANT)
{
if(!(mit->second->msgFlags & RS_MSG_FLAGS_ROUTED))
should_send = true ;
if(mit->second->sendTime + OLD_MESSAGE_FLUSHING_DELAY < now)
{
should_send = true ;
mit->second->sendTime = now;
}
}
if(should_send)
{
/* send msg */
pqioutput(PQL_DEBUG_BASIC, msgservicezone,
"p3MsgService::checkOutGoingMessages() Sending out message");
/* remove the pending flag */
output_queue.push_back(mit->second) ;
// When the message is a distant msg, dont remove it yet from the list. Only mark it as being sent, so that we don't send it again.
//
if(!(mit->second->msgFlags & RS_MSG_FLAGS_DISTANT))
{
(mit->second)->msgFlags &= ~RS_MSG_FLAGS_PENDING;
toErase.push_back(mit->first);
changed = true ;
}
else
{
#ifdef DEBUG_DISTANT_MSG
std::cerr << "Message id " << mit->first << " is distant: kept in outgoing, and marked as ROUTED" << std::endl;
std::cerr << "Message id " << mit->first << " is distant: kept in outgoing, and marked as ROUTED" << std::endl;
#endif
mit->second->msgFlags |= RS_MSG_FLAGS_ROUTED ;
}
}
else
{
pqioutput(PQL_DEBUG_BASIC, msgservicezone,
"p3MsgService::checkOutGoingMessages() Delaying until available...");
}
}
mit->second->msgFlags |= RS_MSG_FLAGS_ROUTED ;
}
}
else
{
pqioutput(PQL_DEBUG_BASIC, msgservicezone,
"p3MsgService::checkOutGoingMessages() Delaying until available...");
}
}
/* clean up */
for(it = toErase.begin(); it != toErase.end(); ++it)
{
mit = msgOutgoing.find(*it);
if (mit != msgOutgoing.end())
{
msgOutgoing.erase(mit);
}
/* clean up */
for(it = toErase.begin(); it != toErase.end(); ++it)
{
mit = msgOutgoing.find(*it);
if (mit != msgOutgoing.end())
{
msgOutgoing.erase(mit);
}
std::map<uint32_t, RsMsgSrcId*>::iterator srcIt = mSrcIds.find(*it);
if (srcIt != mSrcIds.end()) {
delete (srcIt->second);
mSrcIds.erase(srcIt);
}
}
std::map<uint32_t, RsMsgSrcId*>::iterator srcIt = mSrcIds.find(*it);
if (srcIt != mSrcIds.end()) {
delete (srcIt->second);
mSrcIds.erase(srcIt);
}
}
if (toErase.size() > 0)
{
IndicateConfigChanged(); /**** INDICATE MSG CONFIG CHANGED! *****/
}
}
if (toErase.size() > 0)
{
IndicateConfigChanged(); /**** INDICATE MSG CONFIG CHANGED! *****/
}
}
for(std::list<RsMsgItem*>::const_iterator it(output_queue.begin());it!=output_queue.end();++it)
if((*it)->msgFlags & RS_MSG_FLAGS_DISTANT) // don't split distant messages. The global router takes care of it.
sendDistantMsgItem(*it) ;
else
checkSizeAndSendMessage(*it) ;
if((*it)->msgFlags & RS_MSG_FLAGS_DISTANT) // don't split distant messages. The global router takes care of it.
sendDistantMsgItem(*it) ;
else
checkSizeAndSendMessage(*it) ;
if(changed)
RsServer::notify()->notifyListChange(NOTIFY_LIST_MESSAGELIST,NOTIFY_TYPE_MOD);
if(changed)
RsServer::notify()->notifyListChange(NOTIFY_LIST_MESSAGELIST,NOTIFY_TYPE_MOD);
return 0;
return 0;
}
bool p3MsgService::saveList(bool& cleanup, std::list<RsItem*>& itemList)
@ -463,6 +479,10 @@ bool p3MsgService::saveList(bool& cleanup, std::list<RsItem*>& itemList)
grmap->ongoing_msgs = _ongoing_messages ;
itemList.push_back(grmap) ;
RsMsgDistantMessagesHashMap *ghm = new RsMsgDistantMessagesHashMap ;
ghm->hash_map = mRecentlyReceivedDistantMessageHashes ;
itemList.push_back(ghm) ;
RsConfigKeyValueSet *vitem = new RsConfigKeyValueSet ;
RsTlvKeyValue kv;
@ -485,7 +505,7 @@ void p3MsgService::saveDone()
mMsgMtx.unlock();
}
RsSerialiser* p3MsgService::setupSerialiser()
RsSerialiser* p3MsgService::setupSerialiser() // this serialiser is used for config. So it adds somemore info in the serialised items
{
RsSerialiser *rss = new RsSerialiser ;
@ -544,7 +564,7 @@ bool p3MsgService::loadList(std::list<RsItem*>& load)
RsMsgSrcId* msi;
RsMsgParentId* msp;
RsMsgGRouterMap* grm;
// RsPublicMsgInviteConfigItem* msv;
RsMsgDistantMessagesHashMap *ghm;
std::list<RsMsgItem*> items;
std::list<RsItem*>::iterator it;
@ -552,6 +572,8 @@ bool p3MsgService::loadList(std::list<RsItem*>& load)
std::map<uint32_t, RsPeerId> srcIdMsgMap;
std::map<uint32_t, RsPeerId>::iterator srcIt;
uint32_t max_msg_id = 0 ;
// load items and calculate next unique msgId
for(it = load.begin(); it != load.end(); ++it)
{
@ -559,9 +581,9 @@ bool p3MsgService::loadList(std::list<RsItem*>& load)
if (NULL != (mitem = dynamic_cast<RsMsgItem *>(*it)))
{
/* STORE MsgID */
if (mitem->msgId >= mMsgUniqueId) {
mMsgUniqueId = mitem->msgId + 1;
}
if (mitem->msgId > max_msg_id)
max_msg_id = mitem->msgId ;
items.push_back(mitem);
}
else if (NULL != (grm = dynamic_cast<RsMsgGRouterMap *>(*it)))
@ -570,6 +592,10 @@ bool p3MsgService::loadList(std::list<RsItem*>& load)
for(std::map<GRouterMsgPropagationId,uint32_t>::const_iterator it(grm->ongoing_msgs.begin());it!=grm->ongoing_msgs.end();++it)
_ongoing_messages.insert(*it) ;
}
else if(NULL != (ghm = dynamic_cast<RsMsgDistantMessagesHashMap*>(*it)))
{
mRecentlyReceivedDistantMessageHashes = ghm->hash_map ;
}
else if(NULL != (mtt = dynamic_cast<RsMsgTagType *>(*it)))
{
// delete standard tags as they are now save in config
@ -636,6 +662,7 @@ bool p3MsgService::loadList(std::list<RsItem*>& load)
continue ;
}
}
mMsgUniqueId = max_msg_id + 1; // make it unique with respect to what was loaded. Not totally safe, but works 99.9999% of the cases.
load.clear() ;
// sort items into lists
@ -1030,61 +1057,85 @@ bool p3MsgService::setMsgParentId(uint32_t msgId, uint32_t msgParentId)
/****************************************/
/****************************************/
/* Message Items */
uint32_t p3MsgService::sendMessage(RsMsgItem *item)
uint32_t p3MsgService::sendMessage(RsMsgItem *item) // no from field because it's implicitly our own PeerId
{
if(!item)
return 0 ;
return 0 ;
pqioutput(PQL_DEBUG_BASIC, msgservicezone,
"p3MsgService::sendMessage()");
pqioutput(PQL_DEBUG_BASIC, msgservicezone, "p3MsgService::sendMessage()");
item -> msgId = getNewUniqueMsgId(); /* grabs Mtx as well */
item->msgId = getNewUniqueMsgId(); /* grabs Mtx as well */
item->msgFlags |= (RS_MSG_FLAGS_OUTGOING | RS_MSG_FLAGS_PENDING); /* add pending flag */
{
RS_STACK_MUTEX(mMsgMtx) ;
/* STORE MsgID */
msgOutgoing[item->msgId] = item;
if (item->PeerId() != mServiceCtrl->getOwnId())
{
/* not to the loopback device */
RsMsgSrcId* msi = new RsMsgSrcId();
msi->msgId = item->msgId;
msi->srcId = mServiceCtrl->getOwnId();
mSrcIds.insert(std::pair<uint32_t, RsMsgSrcId*>(msi->msgId, msi));
}
}
IndicateConfigChanged(); /**** INDICATE MSG CONFIG CHANGED! *****/
RsServer::notify()->notifyListChange(NOTIFY_LIST_MESSAGELIST, NOTIFY_TYPE_ADD);
return item->msgId;
}
uint32_t p3MsgService::sendDistantMessage(RsMsgItem *item,const RsGxsId& from)
{
if(!item)
return 0 ;
item->msgId = getNewUniqueMsgId(); /* grabs Mtx as well */
item->msgFlags |= (RS_MSG_FLAGS_DISTANT | RS_MSG_FLAGS_OUTGOING | RS_MSG_FLAGS_PENDING); /* add pending flag */
{
RsStackMutex stack(mMsgMtx); /********** STACK LOCKED MTX ******/
RS_STACK_MUTEX(mMsgMtx) ;
/* add pending flag */
item->msgFlags |= (RS_MSG_FLAGS_OUTGOING | RS_MSG_FLAGS_PENDING);
/* STORE MsgID */
msgOutgoing[item->msgId] = item;
mDistantOutgoingMsgSigners[item->msgId] = from ;
if (item->PeerId() != mServiceCtrl->getOwnId()) {
if (item->PeerId() != mServiceCtrl->getOwnId())
{
/* not to the loopback device */
RsMsgSrcId* msi = new RsMsgSrcId();
msi->msgId = item->msgId;
msi->srcId = item->PeerId();
msi->srcId = RsPeerId(from) ;
mSrcIds.insert(std::pair<uint32_t, RsMsgSrcId*>(msi->msgId, msi));
}
}
}
IndicateConfigChanged(); /**** INDICATE MSG CONFIG CHANGED! *****/
RsServer::notify()->notifyListChange(NOTIFY_LIST_MESSAGELIST, NOTIFY_TYPE_ADD);
return item->msgId;
}
uint32_t p3MsgService::sendDistantMessage(RsMsgItem *item,const RsGxsId& from)
{
uint32_t msg_id = sendMessage(item) ;
return item->msgId;
RS_STACK_MUTEX(mMsgMtx) ;
mDistantOutgoingMsgSigners[msg_id] = from ;
return msg_id ;
}
bool p3MsgService::MessageSend(MessageInfo &info)
{
for(std::set<RsPeerId>::const_iterator pit = info.rspeerid_msgto.begin(); pit != info.rspeerid_msgto.end(); ++pit) sendMessage(initMIRsMsg(info, *pit));
for(std::set<RsPeerId>::const_iterator pit = info.rspeerid_msgcc.begin(); pit != info.rspeerid_msgcc.end(); ++pit) sendMessage(initMIRsMsg(info, *pit));
for(std::set<RsPeerId>::const_iterator pit = info.rspeerid_msgbcc.begin(); pit != info.rspeerid_msgbcc.end(); ++pit) sendMessage(initMIRsMsg(info, *pit));
for(std::set<RsPeerId>::const_iterator pit = info.rspeerid_msgto.begin(); pit != info.rspeerid_msgto.end(); ++pit) sendMessage(initMIRsMsg(info, *pit));
for(std::set<RsPeerId>::const_iterator pit = info.rspeerid_msgcc.begin(); pit != info.rspeerid_msgcc.end(); ++pit) sendMessage(initMIRsMsg(info, *pit));
for(std::set<RsPeerId>::const_iterator pit = info.rspeerid_msgbcc.begin(); pit != info.rspeerid_msgbcc.end(); ++pit) sendMessage(initMIRsMsg(info, *pit));
for(std::set<RsGxsId>::const_iterator pit = info.rsgxsid_msgto.begin(); pit != info.rsgxsid_msgto.end(); ++pit) sendDistantMessage(initMIRsMsg(info, *pit),info.rsgxsid_srcId);
for(std::set<RsGxsId>::const_iterator pit = info.rsgxsid_msgcc.begin(); pit != info.rsgxsid_msgcc.end(); ++pit) sendDistantMessage(initMIRsMsg(info, *pit),info.rsgxsid_srcId);
for(std::set<RsGxsId>::const_iterator pit = info.rsgxsid_msgbcc.begin(); pit != info.rsgxsid_msgbcc.end(); ++pit) sendDistantMessage(initMIRsMsg(info, *pit),info.rsgxsid_srcId);
for(std::set<RsGxsId>::const_iterator pit = info.rsgxsid_msgto.begin(); pit != info.rsgxsid_msgto.end(); ++pit) sendDistantMessage(initMIRsMsg(info, *pit),info.rsgxsid_srcId);
for(std::set<RsGxsId>::const_iterator pit = info.rsgxsid_msgcc.begin(); pit != info.rsgxsid_msgcc.end(); ++pit) sendDistantMessage(initMIRsMsg(info, *pit),info.rsgxsid_srcId);
for(std::set<RsGxsId>::const_iterator pit = info.rsgxsid_msgbcc.begin(); pit != info.rsgxsid_msgbcc.end(); ++pit) sendDistantMessage(initMIRsMsg(info, *pit),info.rsgxsid_srcId);
// store message in outgoing list. In order to appear as sent the message needs to have the OUTGOING flg, but no pending flag on.
/* send to ourselves as well */
RsMsgItem *msg = initMIRsMsg(info, mServiceCtrl->getOwnId());
if (msg)
@ -1095,10 +1146,17 @@ bool p3MsgService::MessageSend(MessageInfo &info)
msg->msgFlags |= RS_MSG_FLAGS_SIGNATURE_CHECKS; // this is always true, since we are sending the message
/* use processMsg to get the new msgId */
processMsg(msg, false);
msg->recvTime = time(NULL);
msg->msgId = getNewUniqueMsgId();
msg->msgFlags |= RS_MSG_OUTGOING;
// return new message id
rs_sprintf(info.msgId, "%lu", msg->msgId);
imsg[msg->msgId] = msg;
RsServer::notify()->notifyListChange(NOTIFY_LIST_MESSAGELIST,NOTIFY_TYPE_ADD);
//
// // return new message id
// rs_sprintf(info.msgId, "%lu", msg->msgId);
}
return true;
@ -1138,7 +1196,7 @@ bool p3MsgService::SystemMessage(const std::string &title, const std::string &me
msg->rspeerid_msgto.ids.insert(ownId);
processMsg(msg, true);
processIncomingMsg(msg);
return true;
}
@ -1782,44 +1840,67 @@ void p3MsgService::notifyDataStatus(const GRouterMsgPropagationId& id,uint32_t d
{
if(data_status == GROUTER_CLIENT_SERVICE_DATA_STATUS_FAILED)
{
std::cerr << __PRETTY_FUNCTION__ << ": Not fully implemented. The global router fails to send apacket, but we don't deal with it. Please remind the devs to do it" << std::endl;
return ;
RS_STACK_MUTEX(mMsgMtx); /********** STACK LOCKED MTX ******/
std::cerr << "(WW) p3MsgService::notifyDataStatus: Global router tells us that item ID " << id << " could not be delivered on time." ;
std::map<GRouterMsgPropagationId,uint32_t>::iterator it = _ongoing_messages.find(id) ;
if(it == _ongoing_messages.end())
{
std::cerr << " (EE) cannot find pending message to acknowledge. Weird. grouter id = " << id << std::endl;
return ;
}
uint32_t msg_id = it->second ;
std::cerr << " message id = " << msg_id << std::endl;
std::map<uint32_t,RsMsgItem*>::iterator mit = msgOutgoing.find(msg_id) ;
if(mit == msgOutgoing.end())
{
std::cerr << " (EE) message has been notified as not delivered, but it not on outgoing list. Something's wrong!!" << std::endl;
return ;
}
std::cerr << " reseting the ROUTED flag so that the message is requested again" << std::endl;
mit->second->msgFlags &= ~RS_MSG_FLAGS_ROUTED ; // clear the routed flag so that the message is requested again
return ;
}
if(data_status != GROUTER_CLIENT_SERVICE_DATA_STATUS_RECEIVED)
if(data_status == GROUTER_CLIENT_SERVICE_DATA_STATUS_RECEIVED)
{
std::cerr << "p3MsgService: unhandled data status info from global router for msg ID " << id << ": this is a bug." << std::endl;
return ;
}
RsStackMutex stack(mMsgMtx); /********** STACK LOCKED MTX ******/
RS_STACK_MUTEX(mMsgMtx); /********** STACK LOCKED MTX ******/
#ifdef DEBUG_DISTANT_MSG
std::cerr << "p3MsgService::acknowledgeDataReceived(): acknowledging data received for msg propagation id " << id << std::endl;
std::cerr << "p3MsgService::acknowledgeDataReceived(): acknowledging data received for msg propagation id " << id << std::endl;
#endif
std::map<GRouterMsgPropagationId,uint32_t>::iterator it = _ongoing_messages.find(id) ;
std::map<GRouterMsgPropagationId,uint32_t>::iterator it = _ongoing_messages.find(id) ;
if(it == _ongoing_messages.end())
{
std::cerr << " (EE) cannot find pending message to acknowledge. Weird. grouter id = " << id << std::endl;
return ;
}
if(it == _ongoing_messages.end())
{
std::cerr << " (EE) cannot find pending message to acknowledge. Weird. grouter id = " << id << std::endl;
return ;
}
uint32_t msg_id = it->second ;
uint32_t msg_id = it->second ;
// we should now remove the item from the msgOutgoing list.
std::map<uint32_t,RsMsgItem*>::iterator it2 = msgOutgoing.find(msg_id) ;
// we should now remove the item from the msgOutgoing list.
if(it2 == msgOutgoing.end())
{
std::cerr << "(EE) message has been ACKed, but is not in outgoing list. Something's wrong!!" << std::endl;
return ;
}
std::map<uint32_t,RsMsgItem*>::iterator it2 = msgOutgoing.find(msg_id) ;
delete it2->second ;
msgOutgoing.erase(it2) ;
if(it2 == msgOutgoing.end())
{
std::cerr << "(EE) message has been ACKed, but is not in outgoing list. Something's wrong!!" << std::endl;
return ;
}
RsServer::notify()->notifyListChange(NOTIFY_LIST_MESSAGELIST,NOTIFY_TYPE_ADD);
IndicateConfigChanged() ;
delete it2->second ;
msgOutgoing.erase(it2) ;
RsServer::notify()->notifyListChange(NOTIFY_LIST_MESSAGELIST,NOTIFY_TYPE_ADD);
IndicateConfigChanged() ;
return ;
}
std::cerr << "p3MsgService: unhandled data status info from global router for msg ID " << id << ": this is a bug." << std::endl;
}
bool p3MsgService::acceptDataFromPeer(const RsGxsId& to_gxs_id)
{
@ -1851,8 +1932,19 @@ void p3MsgService::receiveGRouterData(const RsGxsId& destination_key, const RsGx
{
std::cerr << "p3MsgService::receiveGRouterData(): received message item of size " << data_size << ", for key " << destination_key << std::endl;
// first make sure that we havn't already received the data. Since we allow to re-send messages, it's necessary to check.
Sha1CheckSum hash = RsDirUtil::sha1sum(data,data_size) ;
if(mRecentlyReceivedDistantMessageHashes.find(hash) != mRecentlyReceivedDistantMessageHashes.end())
{
std::cerr << "(WW) receiving distant message of hash " << hash << " more than once. This is not a bug, unless it happens very often." << std::endl;
free(data) ;
return ;
}
mRecentlyReceivedDistantMessageHashes[hash] = time(NULL) ;
RsItem *item = _serialiser->deserialise(data,&data_size) ;
free(data) ;
RsMsgItem *msg_item = dynamic_cast<RsMsgItem*>(item) ;
@ -1861,9 +1953,9 @@ void p3MsgService::receiveGRouterData(const RsGxsId& destination_key, const RsGx
{
std::cerr << " Encrypted item correctly deserialised. Passing on to incoming list." << std::endl;
msg_item->msgFlags |= RS_MSG_FLAGS_DISTANT ;
/* we expect complete msgs - remove partial flag just in case someone has funny ideas */
msg_item->msgFlags &= ~RS_MSG_FLAGS_PARTIAL;
msg_item->msgFlags |= RS_MSG_FLAGS_DISTANT ;
/* we expect complete msgs - remove partial flag just in case someone has funny ideas */
msg_item->msgFlags &= ~RS_MSG_FLAGS_PARTIAL;
msg_item->PeerId(RsPeerId(signing_key)) ; // hack to pass on GXS id.
handleIncomingItem(msg_item) ;
@ -1899,12 +1991,12 @@ void p3MsgService::sendDistantMsgItem(RsMsgItem *msgitem)
std::cerr << " signing : " << signing_key_id << std::endl;
#endif
// The item is serialized and turned into a generic turtle item.
// The item is serialized and turned into a generic turtle item. Use use the explicit serialiser to make sure that the msgId is not included
uint32_t msg_serialized_rssize = _serialiser->size(msgitem) ;
uint32_t msg_serialized_rssize = msgitem->serial_size(false) ;
unsigned char *msg_serialized_data = new unsigned char[msg_serialized_rssize] ;
if(!_serialiser->serialise(msgitem,msg_serialized_data,&msg_serialized_rssize))
if(!msgitem->serialise(msg_serialized_data,msg_serialized_rssize,false))
{
std::cerr << "(EE) p3MsgService::sendTurtleData(): Serialization error." << std::endl;
delete[] msg_serialized_data ;

View File

@ -160,7 +160,7 @@ class p3MsgService: public p3Service, public p3Config, public pqiServiceMonitor,
void checkSizeAndSendMessage(RsMsgItem *msg);
int incomingMsgs();
void processMsg(RsMsgItem *mi, bool incoming);
void processIncomingMsg(RsMsgItem *mi) ;
bool checkAndRebuildPartialMessage(RsMsgItem*) ;
void initRsMI(RsMsgItem *msg, Rs::Msgs::MessageInfo &mi);
@ -194,6 +194,7 @@ class p3MsgService: public p3Service, public p3Config, public pqiServiceMonitor,
std::map<uint32_t, RsMsgTags*> mMsgTags;
uint32_t mMsgUniqueId;
std::map<Sha1CheckSum,uint32_t> mRecentlyReceivedDistantMessageHashes;
// used delete msgSrcIds after config save
std::map<uint32_t, RsMsgSrcId*> mSrcIds;

View File

@ -852,8 +852,6 @@ void MessagesDialog::insertMessages()
rsMail -> getMessageSummaries(msgList);
std::cerr << "MessagesDialog::insertMessages()" << std::endl;
int filterColumn = ui.filterLineEdit->currentFilter();
/* check the mode we are in */
@ -1098,25 +1096,34 @@ void MessagesDialog::insertMessages()
// From ....
{
bool setText = true;
if (msgbox == RS_MSG_INBOX || msgbox == RS_MSG_OUTBOX) {
if (msgbox == RS_MSG_INBOX || msgbox == RS_MSG_OUTBOX)
{
if ((it->msgflags & RS_MSG_SYSTEM) && it->srcId == ownId) {
text = "RetroShare";
} else {
}
else
{
if (it->msgflags & RS_MSG_DISTANT)
{
// distant message
setText = false;
if (gotInfo || rsMail->getMessage(it->msgId, msgInfo)) {
gotInfo = true;
item->setId(RsGxsId(msgInfo.rsgxsid_srcId), COLUMN_FROM, false);
} else {
if(msgbox != RS_MSG_INBOX && !msgInfo.rsgxsid_msgto.empty())
item->setId(RsGxsId(*msgInfo.rsgxsid_msgto.begin()), COLUMN_FROM, false);
else
item->setId(RsGxsId(msgInfo.rsgxsid_srcId), COLUMN_FROM, false);
}
else
std::cerr << "MessagesDialog::insertMsgTxtAndFiles() Couldn't find Msg" << std::endl;
}
} else {
}
else
text = QString::fromUtf8(rsPeers->getPeerName(it->srcId).c_str());
}
}
} else {
}
else
{
if (gotInfo || rsMail->getMessage(it->msgId, msgInfo)) {
gotInfo = true;
@ -1459,8 +1466,6 @@ void MessagesDialog::setMsgStar(const QList<QTreeWidgetItem*> &items, bool star)
void MessagesDialog::insertMsgTxtAndFiles(QTreeWidgetItem *item, bool bSetToRead)
{
std::cerr << "MessagesDialog::insertMsgTxtAndFiles()" << std::endl;
/* get its Ids */
std::string cid;
std::string mid;