mirror of
https://github.com/RetroShare/RetroShare.git
synced 2024-12-28 17:09:34 -05:00
added auto re-send of distant messages after global router notified the message cannot be sent. Added a map n p3msgService to avoid receiving multiple times the same message.
This commit is contained in:
parent
9651f430d5
commit
0c591f08ae
@ -899,7 +899,142 @@ RsMsgGRouterMap* RsMsgSerialiser::deserialiseMsgGRouterMap(void* data, uint32_t*
|
|||||||
|
|
||||||
/************************* end of definition of msgGRouterMap serialisation functions ************************/
|
/************************* end of definition of msgGRouterMap serialisation functions ************************/
|
||||||
|
|
||||||
|
/************************* definition of msgDistantMessageMap serialisation functions ************************/
|
||||||
|
|
||||||
|
std::ostream& RsMsgDistantMessagesHashMap::print(std::ostream& out, uint16_t indent)
|
||||||
|
{
|
||||||
|
printRsItemBase(out, "RsMsgDistantMessagesHashMap", indent);
|
||||||
|
uint16_t int_Indent = indent + 2;
|
||||||
|
|
||||||
|
for(std::map<Sha1CheckSum,uint32_t>::const_iterator it(hash_map.begin());it!=hash_map.end();++it)
|
||||||
|
{
|
||||||
|
printIndent(out, int_Indent);
|
||||||
|
out << " " << std::hex << it->first << std::dec << " : " << it->second << std::endl;
|
||||||
|
}
|
||||||
|
|
||||||
|
printRsItemEnd(out, "RsMsgDistantMessagesHashMap", indent);
|
||||||
|
|
||||||
|
return out;
|
||||||
|
}
|
||||||
|
|
||||||
|
void RsMsgDistantMessagesHashMap::clear()
|
||||||
|
{
|
||||||
|
hash_map.clear() ;
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
uint32_t RsMsgDistantMessagesHashMap::serial_size(bool)
|
||||||
|
{
|
||||||
|
uint32_t s = 8; /* header */
|
||||||
|
|
||||||
|
s += 4; // number of entries
|
||||||
|
s += (Sha1CheckSum::SIZE_IN_BYTES+4)*hash_map.size(); // entries
|
||||||
|
|
||||||
|
return s;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool RsMsgDistantMessagesHashMap::serialise(void *data, uint32_t& pktsize,bool config)
|
||||||
|
{
|
||||||
|
uint32_t tlvsize = serial_size(config) ;
|
||||||
|
uint32_t offset = 0;
|
||||||
|
|
||||||
|
if (pktsize < tlvsize)
|
||||||
|
return false; /* not enough space */
|
||||||
|
|
||||||
|
pktsize = tlvsize;
|
||||||
|
|
||||||
|
bool ok = true;
|
||||||
|
|
||||||
|
ok &= setRsItemHeader(data, tlvsize, PacketId(), tlvsize);
|
||||||
|
|
||||||
|
#ifdef RSSERIAL_DEBUG
|
||||||
|
std::cerr << "RsMsgSerialiser::serialiseMsgDistantMessagesHashMap() Header: " << ok << std::endl;
|
||||||
|
std::cerr << "RsMsgSerialiser::serialiseMsgDistantMessagesHashMap() Size: " << tlvsize << std::endl;
|
||||||
|
#endif
|
||||||
|
|
||||||
|
/* skip the header */
|
||||||
|
offset += 8;
|
||||||
|
|
||||||
|
ok &= setRawUInt32(data, tlvsize, &offset, hash_map.size());
|
||||||
|
|
||||||
|
for(std::map<Sha1CheckSum,uint32_t>::const_iterator it=hash_map.begin();ok && it!=hash_map.end();++it)
|
||||||
|
{
|
||||||
|
ok &= it->first.serialise(data, tlvsize, offset) ;
|
||||||
|
ok &= setRawUInt32(data, tlvsize, &offset, it->second);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (offset != tlvsize)
|
||||||
|
{
|
||||||
|
ok = false;
|
||||||
|
std::cerr << "RsMsgSerialiser::serialiseMsgDistantMessagesHashMap() Size Error! " << std::endl;
|
||||||
|
}
|
||||||
|
|
||||||
|
return ok;
|
||||||
|
}
|
||||||
|
|
||||||
|
RsMsgDistantMessagesHashMap* RsMsgSerialiser::deserialiseMsgDistantMessageHashMap(void* data, uint32_t* pktsize)
|
||||||
|
{
|
||||||
|
/* get the type and size */
|
||||||
|
uint32_t rstype = getRsItemId(data);
|
||||||
|
uint32_t rssize = getRsItemSize(data);
|
||||||
|
|
||||||
|
uint32_t offset = 0;
|
||||||
|
|
||||||
|
|
||||||
|
if ((RS_PKT_VERSION_SERVICE != getRsItemVersion(rstype)) ||
|
||||||
|
(RS_SERVICE_TYPE_MSG != getRsItemService(rstype)) ||
|
||||||
|
(RS_PKT_SUBTYPE_MSG_DISTANT_MSG_MAP != getRsItemSubType(rstype)))
|
||||||
|
{
|
||||||
|
return NULL; /* wrong type */
|
||||||
|
}
|
||||||
|
|
||||||
|
if (*pktsize < rssize) /* check size */
|
||||||
|
return NULL; /* not enough data */
|
||||||
|
|
||||||
|
/* set the packet length */
|
||||||
|
*pktsize = rssize;
|
||||||
|
|
||||||
|
bool ok = true;
|
||||||
|
|
||||||
|
/* ready to load */
|
||||||
|
RsMsgDistantMessagesHashMap *item = new RsMsgDistantMessagesHashMap();
|
||||||
|
item->clear();
|
||||||
|
|
||||||
|
/* skip the header */
|
||||||
|
offset += 8;
|
||||||
|
|
||||||
|
uint32_t s=0 ;
|
||||||
|
|
||||||
|
/* get mandatory parts first */
|
||||||
|
ok &= getRawUInt32(data, rssize, &offset, &s);
|
||||||
|
|
||||||
|
for(uint32_t i=0;i<s && ok;++i)
|
||||||
|
{
|
||||||
|
Sha1CheckSum s ;
|
||||||
|
uint32_t tm ;
|
||||||
|
|
||||||
|
ok &= s.deserialise(data, rssize, offset) ;
|
||||||
|
ok &= getRawUInt32(data, rssize, &offset, &tm);
|
||||||
|
|
||||||
|
item->hash_map.insert(std::make_pair(s,tm)) ;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (offset != rssize)
|
||||||
|
{
|
||||||
|
/* error */
|
||||||
|
delete item;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!ok)
|
||||||
|
{
|
||||||
|
delete item;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
return item;
|
||||||
|
}
|
||||||
/************************************** Message ParentId **********************/
|
/************************************** Message ParentId **********************/
|
||||||
|
|
||||||
std::ostream& RsMsgParentId::print(std::ostream& out, uint16_t indent)
|
std::ostream& RsMsgParentId::print(std::ostream& out, uint16_t indent)
|
||||||
|
@ -45,12 +45,13 @@
|
|||||||
/**************************************************************************/
|
/**************************************************************************/
|
||||||
|
|
||||||
// for defining tags themselves and msg tags
|
// for defining tags themselves and msg tags
|
||||||
const uint8_t RS_PKT_SUBTYPE_MSG_TAG_TYPE = 0x03;
|
const uint8_t RS_PKT_SUBTYPE_MSG_TAG_TYPE = 0x03;
|
||||||
const uint8_t RS_PKT_SUBTYPE_MSG_TAGS = 0x04;
|
const uint8_t RS_PKT_SUBTYPE_MSG_TAGS = 0x04;
|
||||||
const uint8_t RS_PKT_SUBTYPE_MSG_SRC_TAG = 0x05;
|
const uint8_t RS_PKT_SUBTYPE_MSG_SRC_TAG = 0x05;
|
||||||
const uint8_t RS_PKT_SUBTYPE_MSG_PARENT_TAG = 0x06;
|
const uint8_t RS_PKT_SUBTYPE_MSG_PARENT_TAG = 0x06;
|
||||||
const uint8_t RS_PKT_SUBTYPE_MSG_INVITE = 0x07;
|
const uint8_t RS_PKT_SUBTYPE_MSG_INVITE = 0x07;
|
||||||
const uint8_t RS_PKT_SUBTYPE_MSG_GROUTER_MAP = 0x08;
|
const uint8_t RS_PKT_SUBTYPE_MSG_GROUTER_MAP = 0x08;
|
||||||
|
const uint8_t RS_PKT_SUBTYPE_MSG_DISTANT_MSG_MAP = 0x09;
|
||||||
|
|
||||||
|
|
||||||
/**************************************************************************/
|
/**************************************************************************/
|
||||||
@ -226,7 +227,23 @@ class RsMsgGRouterMap : public RsMessageItem
|
|||||||
//
|
//
|
||||||
std::map<GRouterMsgPropagationId,uint32_t> ongoing_msgs ;
|
std::map<GRouterMsgPropagationId,uint32_t> ongoing_msgs ;
|
||||||
};
|
};
|
||||||
|
class RsMsgDistantMessagesHashMap : public RsMessageItem
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
RsMsgDistantMessagesHashMap() : RsMessageItem(RS_PKT_SUBTYPE_MSG_DISTANT_MSG_MAP) {}
|
||||||
|
|
||||||
|
std::ostream &print(std::ostream &out, uint16_t indent = 0);
|
||||||
|
|
||||||
|
virtual bool serialise(void *data,uint32_t& size,bool config) ;
|
||||||
|
virtual uint32_t serial_size(bool config) ;
|
||||||
|
|
||||||
|
virtual ~RsMsgDistantMessagesHashMap() {}
|
||||||
|
virtual void clear();
|
||||||
|
|
||||||
|
// ----------- Specific fields ------------- //
|
||||||
|
//
|
||||||
|
std::map<Sha1CheckSum,uint32_t> hash_map ;
|
||||||
|
};
|
||||||
class RsMsgParentId : public RsMessageItem
|
class RsMsgParentId : public RsMessageItem
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
@ -275,7 +292,8 @@ class RsMsgSerialiser: public RsSerialType
|
|||||||
virtual RsMsgSrcId *deserialiseMsgSrcIdItem(void *data, uint32_t *size);
|
virtual RsMsgSrcId *deserialiseMsgSrcIdItem(void *data, uint32_t *size);
|
||||||
virtual RsMsgParentId *deserialiseMsgParentIdItem(void *data, uint32_t *size);
|
virtual RsMsgParentId *deserialiseMsgParentIdItem(void *data, uint32_t *size);
|
||||||
virtual RsPublicMsgInviteConfigItem *deserialisePublicMsgInviteConfigItem(void *data, uint32_t *size);
|
virtual RsPublicMsgInviteConfigItem *deserialisePublicMsgInviteConfigItem(void *data, uint32_t *size);
|
||||||
virtual RsMsgGRouterMap *deserialiseMsgGRouterMap(void *data, uint32_t *size);
|
virtual RsMsgGRouterMap *deserialiseMsgGRouterMap(void *data, uint32_t *size);
|
||||||
|
virtual RsMsgDistantMessagesHashMap *deserialiseMsgDistantMessageHashMap(void *data, uint32_t *size);
|
||||||
|
|
||||||
bool m_bConfiguration; // is set to true for saving configuration (enables serialising msgId)
|
bool m_bConfiguration; // is set to true for saving configuration (enables serialising msgId)
|
||||||
};
|
};
|
||||||
|
@ -80,11 +80,14 @@ const int msgservicezone = 54319;
|
|||||||
|
|
||||||
|
|
||||||
p3MsgService::p3MsgService(p3ServiceControl *sc, p3IdService *id_serv)
|
p3MsgService::p3MsgService(p3ServiceControl *sc, p3IdService *id_serv)
|
||||||
:p3Service(), p3Config(), mIdService(id_serv), mServiceCtrl(sc), mMsgMtx("p3MsgService"), mMsgUniqueId(time(NULL))
|
:p3Service(), p3Config(), mIdService(id_serv), mServiceCtrl(sc), mMsgMtx("p3MsgService"), mMsgUniqueId(0)
|
||||||
{
|
{
|
||||||
_serialiser = new RsMsgSerialiser();
|
_serialiser = new RsMsgSerialiser(); // this serialiser is used for services. It's not the same than the one returned by setupSerialiser(). We need both!!
|
||||||
addSerialType(_serialiser);
|
addSerialType(_serialiser);
|
||||||
|
|
||||||
|
mMsgUniqueId = RSRandom::random_u32() ; // better than time(NULL). We don't need crypto-safe random here. Just something much likely
|
||||||
|
// different from what friends use.
|
||||||
|
|
||||||
mShouldEnableDistantMessaging = true ;
|
mShouldEnableDistantMessaging = true ;
|
||||||
mDistantMessagingEnabled = false ;
|
mDistantMessagingEnabled = false ;
|
||||||
mDistantMessagePermissions = RS_DISTANT_MESSAGING_CONTACT_PERMISSION_FLAG_FILTER_NONE ;
|
mDistantMessagePermissions = RS_DISTANT_MESSAGING_CONTACT_PERMISSION_FLAG_FILTER_NONE ;
|
||||||
@ -454,6 +457,10 @@ bool p3MsgService::saveList(bool& cleanup, std::list<RsItem*>& itemList)
|
|||||||
grmap->ongoing_msgs = _ongoing_messages ;
|
grmap->ongoing_msgs = _ongoing_messages ;
|
||||||
|
|
||||||
itemList.push_back(grmap) ;
|
itemList.push_back(grmap) ;
|
||||||
|
|
||||||
|
RsMsgDistantMessagesHashMap *ghm = new RsMsgDistantMessagesHashMap ;
|
||||||
|
ghm->hash_map = mRecentlyReceivedDistantMessageHashes ;
|
||||||
|
itemList.push_back(ghm) ;
|
||||||
|
|
||||||
RsConfigKeyValueSet *vitem = new RsConfigKeyValueSet ;
|
RsConfigKeyValueSet *vitem = new RsConfigKeyValueSet ;
|
||||||
RsTlvKeyValue kv;
|
RsTlvKeyValue kv;
|
||||||
@ -476,7 +483,7 @@ void p3MsgService::saveDone()
|
|||||||
mMsgMtx.unlock();
|
mMsgMtx.unlock();
|
||||||
}
|
}
|
||||||
|
|
||||||
RsSerialiser* p3MsgService::setupSerialiser()
|
RsSerialiser* p3MsgService::setupSerialiser() // this serialiser is used for config. So it adds somemore info in the serialised items
|
||||||
{
|
{
|
||||||
RsSerialiser *rss = new RsSerialiser ;
|
RsSerialiser *rss = new RsSerialiser ;
|
||||||
|
|
||||||
@ -535,7 +542,7 @@ bool p3MsgService::loadList(std::list<RsItem*>& load)
|
|||||||
RsMsgSrcId* msi;
|
RsMsgSrcId* msi;
|
||||||
RsMsgParentId* msp;
|
RsMsgParentId* msp;
|
||||||
RsMsgGRouterMap* grm;
|
RsMsgGRouterMap* grm;
|
||||||
// RsPublicMsgInviteConfigItem* msv;
|
RsMsgDistantMessagesHashMap *ghm;
|
||||||
|
|
||||||
std::list<RsMsgItem*> items;
|
std::list<RsMsgItem*> items;
|
||||||
std::list<RsItem*>::iterator it;
|
std::list<RsItem*>::iterator it;
|
||||||
@ -543,6 +550,8 @@ bool p3MsgService::loadList(std::list<RsItem*>& load)
|
|||||||
std::map<uint32_t, RsPeerId> srcIdMsgMap;
|
std::map<uint32_t, RsPeerId> srcIdMsgMap;
|
||||||
std::map<uint32_t, RsPeerId>::iterator srcIt;
|
std::map<uint32_t, RsPeerId>::iterator srcIt;
|
||||||
|
|
||||||
|
uint32_t max_msg_id = 0 ;
|
||||||
|
|
||||||
// load items and calculate next unique msgId
|
// load items and calculate next unique msgId
|
||||||
for(it = load.begin(); it != load.end(); ++it)
|
for(it = load.begin(); it != load.end(); ++it)
|
||||||
{
|
{
|
||||||
@ -550,9 +559,9 @@ bool p3MsgService::loadList(std::list<RsItem*>& load)
|
|||||||
if (NULL != (mitem = dynamic_cast<RsMsgItem *>(*it)))
|
if (NULL != (mitem = dynamic_cast<RsMsgItem *>(*it)))
|
||||||
{
|
{
|
||||||
/* STORE MsgID */
|
/* STORE MsgID */
|
||||||
if (mitem->msgId >= mMsgUniqueId) {
|
if (mitem->msgId > max_msg_id)
|
||||||
mMsgUniqueId = mitem->msgId + 1;
|
max_msg_id = mitem->msgId ;
|
||||||
}
|
|
||||||
items.push_back(mitem);
|
items.push_back(mitem);
|
||||||
}
|
}
|
||||||
else if (NULL != (grm = dynamic_cast<RsMsgGRouterMap *>(*it)))
|
else if (NULL != (grm = dynamic_cast<RsMsgGRouterMap *>(*it)))
|
||||||
@ -561,6 +570,10 @@ bool p3MsgService::loadList(std::list<RsItem*>& load)
|
|||||||
for(std::map<GRouterMsgPropagationId,uint32_t>::const_iterator it(grm->ongoing_msgs.begin());it!=grm->ongoing_msgs.end();++it)
|
for(std::map<GRouterMsgPropagationId,uint32_t>::const_iterator it(grm->ongoing_msgs.begin());it!=grm->ongoing_msgs.end();++it)
|
||||||
_ongoing_messages.insert(*it) ;
|
_ongoing_messages.insert(*it) ;
|
||||||
}
|
}
|
||||||
|
else if(NULL != (ghm = dynamic_cast<RsMsgDistantMessagesHashMap*>(*it)))
|
||||||
|
{
|
||||||
|
mRecentlyReceivedDistantMessageHashes = ghm->hash_map ;
|
||||||
|
}
|
||||||
else if(NULL != (mtt = dynamic_cast<RsMsgTagType *>(*it)))
|
else if(NULL != (mtt = dynamic_cast<RsMsgTagType *>(*it)))
|
||||||
{
|
{
|
||||||
// delete standard tags as they are now save in config
|
// delete standard tags as they are now save in config
|
||||||
@ -627,6 +640,7 @@ bool p3MsgService::loadList(std::list<RsItem*>& load)
|
|||||||
continue ;
|
continue ;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
mMsgUniqueId = max_msg_id + 1; // make it unique with respect to what was loaded. Not totally safe, but works 99.9999% of the cases.
|
||||||
load.clear() ;
|
load.clear() ;
|
||||||
|
|
||||||
// sort items into lists
|
// sort items into lists
|
||||||
@ -1804,44 +1818,67 @@ void p3MsgService::notifyDataStatus(const GRouterMsgPropagationId& id,uint32_t d
|
|||||||
{
|
{
|
||||||
if(data_status == GROUTER_CLIENT_SERVICE_DATA_STATUS_FAILED)
|
if(data_status == GROUTER_CLIENT_SERVICE_DATA_STATUS_FAILED)
|
||||||
{
|
{
|
||||||
std::cerr << __PRETTY_FUNCTION__ << ": Not fully implemented. The global router fails to send apacket, but we don't deal with it. Please remind the devs to do it" << std::endl;
|
RS_STACK_MUTEX(mMsgMtx); /********** STACK LOCKED MTX ******/
|
||||||
return ;
|
|
||||||
|
std::cerr << "(WW) p3MsgService::notifyDataStatus: Global router tells us that item ID " << id << " could not be delivered on time." ;
|
||||||
|
std::map<GRouterMsgPropagationId,uint32_t>::iterator it = _ongoing_messages.find(id) ;
|
||||||
|
|
||||||
|
if(it == _ongoing_messages.end())
|
||||||
|
{
|
||||||
|
std::cerr << " (EE) cannot find pending message to acknowledge. Weird. grouter id = " << id << std::endl;
|
||||||
|
return ;
|
||||||
|
}
|
||||||
|
uint32_t msg_id = it->second ;
|
||||||
|
std::cerr << " message id = " << msg_id << std::endl;
|
||||||
|
|
||||||
|
std::map<uint32_t,RsMsgItem*>::iterator mit = msgOutgoing.find(msg_id) ;
|
||||||
|
|
||||||
|
if(mit == msgOutgoing.end())
|
||||||
|
{
|
||||||
|
std::cerr << " (EE) message has been notified as not delivered, but it not on outgoing list. Something's wrong!!" << std::endl;
|
||||||
|
return ;
|
||||||
|
}
|
||||||
|
std::cerr << " reseting the ROUTED flag so that the message is requested again" << std::endl;
|
||||||
|
|
||||||
|
mit->second->msgFlags &= ~RS_MSG_FLAGS_ROUTED ; // clear the routed flag so that the message is requested again
|
||||||
|
return ;
|
||||||
}
|
}
|
||||||
if(data_status != GROUTER_CLIENT_SERVICE_DATA_STATUS_RECEIVED)
|
|
||||||
|
if(data_status == GROUTER_CLIENT_SERVICE_DATA_STATUS_RECEIVED)
|
||||||
{
|
{
|
||||||
std::cerr << "p3MsgService: unhandled data status info from global router for msg ID " << id << ": this is a bug." << std::endl;
|
RS_STACK_MUTEX(mMsgMtx); /********** STACK LOCKED MTX ******/
|
||||||
return ;
|
|
||||||
}
|
|
||||||
|
|
||||||
RsStackMutex stack(mMsgMtx); /********** STACK LOCKED MTX ******/
|
|
||||||
#ifdef DEBUG_DISTANT_MSG
|
#ifdef DEBUG_DISTANT_MSG
|
||||||
std::cerr << "p3MsgService::acknowledgeDataReceived(): acknowledging data received for msg propagation id " << id << std::endl;
|
std::cerr << "p3MsgService::acknowledgeDataReceived(): acknowledging data received for msg propagation id " << id << std::endl;
|
||||||
#endif
|
#endif
|
||||||
std::map<GRouterMsgPropagationId,uint32_t>::iterator it = _ongoing_messages.find(id) ;
|
std::map<GRouterMsgPropagationId,uint32_t>::iterator it = _ongoing_messages.find(id) ;
|
||||||
|
|
||||||
if(it == _ongoing_messages.end())
|
if(it == _ongoing_messages.end())
|
||||||
{
|
{
|
||||||
std::cerr << " (EE) cannot find pending message to acknowledge. Weird. grouter id = " << id << std::endl;
|
std::cerr << " (EE) cannot find pending message to acknowledge. Weird. grouter id = " << id << std::endl;
|
||||||
return ;
|
return ;
|
||||||
}
|
}
|
||||||
|
|
||||||
uint32_t msg_id = it->second ;
|
uint32_t msg_id = it->second ;
|
||||||
|
|
||||||
// we should now remove the item from the msgOutgoing list.
|
// we should now remove the item from the msgOutgoing list.
|
||||||
|
|
||||||
std::map<uint32_t,RsMsgItem*>::iterator it2 = msgOutgoing.find(msg_id) ;
|
|
||||||
|
|
||||||
if(it2 == msgOutgoing.end())
|
std::map<uint32_t,RsMsgItem*>::iterator it2 = msgOutgoing.find(msg_id) ;
|
||||||
{
|
|
||||||
std::cerr << "(EE) message has been ACKed, but is not in outgoing list. Something's wrong!!" << std::endl;
|
|
||||||
return ;
|
|
||||||
}
|
|
||||||
|
|
||||||
delete it2->second ;
|
if(it2 == msgOutgoing.end())
|
||||||
msgOutgoing.erase(it2) ;
|
{
|
||||||
|
std::cerr << "(EE) message has been ACKed, but is not in outgoing list. Something's wrong!!" << std::endl;
|
||||||
|
return ;
|
||||||
|
}
|
||||||
|
|
||||||
RsServer::notify()->notifyListChange(NOTIFY_LIST_MESSAGELIST,NOTIFY_TYPE_ADD);
|
delete it2->second ;
|
||||||
IndicateConfigChanged() ;
|
msgOutgoing.erase(it2) ;
|
||||||
|
|
||||||
|
RsServer::notify()->notifyListChange(NOTIFY_LIST_MESSAGELIST,NOTIFY_TYPE_ADD);
|
||||||
|
IndicateConfigChanged() ;
|
||||||
|
|
||||||
|
return ;
|
||||||
|
}
|
||||||
|
std::cerr << "p3MsgService: unhandled data status info from global router for msg ID " << id << ": this is a bug." << std::endl;
|
||||||
}
|
}
|
||||||
bool p3MsgService::acceptDataFromPeer(const RsGxsId& to_gxs_id)
|
bool p3MsgService::acceptDataFromPeer(const RsGxsId& to_gxs_id)
|
||||||
{
|
{
|
||||||
@ -1873,8 +1910,19 @@ void p3MsgService::receiveGRouterData(const RsGxsId& destination_key, const RsGx
|
|||||||
{
|
{
|
||||||
std::cerr << "p3MsgService::receiveGRouterData(): received message item of size " << data_size << ", for key " << destination_key << std::endl;
|
std::cerr << "p3MsgService::receiveGRouterData(): received message item of size " << data_size << ", for key " << destination_key << std::endl;
|
||||||
|
|
||||||
|
// first make sure that we havn't already received the data. Since we allow to re-send messages, it's necessary to check.
|
||||||
|
|
||||||
|
Sha1CheckSum hash = RsDirUtil::sha1sum(data,data_size) ;
|
||||||
|
|
||||||
|
if(mRecentlyReceivedDistantMessageHashes.find(hash) != mRecentlyReceivedDistantMessageHashes.end())
|
||||||
|
{
|
||||||
|
std::cerr << "(WW) receiving distant message of hash " << hash << " more than once. This is not a bug, unless it happens very often." << std::endl;
|
||||||
|
free(data) ;
|
||||||
|
return ;
|
||||||
|
}
|
||||||
|
mRecentlyReceivedDistantMessageHashes[hash] = time(NULL) ;
|
||||||
|
|
||||||
RsItem *item = _serialiser->deserialise(data,&data_size) ;
|
RsItem *item = _serialiser->deserialise(data,&data_size) ;
|
||||||
|
|
||||||
free(data) ;
|
free(data) ;
|
||||||
|
|
||||||
RsMsgItem *msg_item = dynamic_cast<RsMsgItem*>(item) ;
|
RsMsgItem *msg_item = dynamic_cast<RsMsgItem*>(item) ;
|
||||||
@ -1883,9 +1931,9 @@ void p3MsgService::receiveGRouterData(const RsGxsId& destination_key, const RsGx
|
|||||||
{
|
{
|
||||||
std::cerr << " Encrypted item correctly deserialised. Passing on to incoming list." << std::endl;
|
std::cerr << " Encrypted item correctly deserialised. Passing on to incoming list." << std::endl;
|
||||||
|
|
||||||
msg_item->msgFlags |= RS_MSG_FLAGS_DISTANT ;
|
msg_item->msgFlags |= RS_MSG_FLAGS_DISTANT ;
|
||||||
/* we expect complete msgs - remove partial flag just in case someone has funny ideas */
|
/* we expect complete msgs - remove partial flag just in case someone has funny ideas */
|
||||||
msg_item->msgFlags &= ~RS_MSG_FLAGS_PARTIAL;
|
msg_item->msgFlags &= ~RS_MSG_FLAGS_PARTIAL;
|
||||||
|
|
||||||
msg_item->PeerId(RsPeerId(signing_key)) ; // hack to pass on GXS id.
|
msg_item->PeerId(RsPeerId(signing_key)) ; // hack to pass on GXS id.
|
||||||
handleIncomingItem(msg_item) ;
|
handleIncomingItem(msg_item) ;
|
||||||
@ -1921,12 +1969,12 @@ void p3MsgService::sendDistantMsgItem(RsMsgItem *msgitem)
|
|||||||
std::cerr << " signing : " << signing_key_id << std::endl;
|
std::cerr << " signing : " << signing_key_id << std::endl;
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
// The item is serialized and turned into a generic turtle item.
|
// The item is serialized and turned into a generic turtle item. Use use the explicit serialiser to make sure that the msgId is not included
|
||||||
|
|
||||||
uint32_t msg_serialized_rssize = _serialiser->size(msgitem) ;
|
uint32_t msg_serialized_rssize = msgitem->serial_size(false) ;
|
||||||
unsigned char *msg_serialized_data = new unsigned char[msg_serialized_rssize] ;
|
unsigned char *msg_serialized_data = new unsigned char[msg_serialized_rssize] ;
|
||||||
|
|
||||||
if(!_serialiser->serialise(msgitem,msg_serialized_data,&msg_serialized_rssize))
|
if(!msgitem->serialise(msg_serialized_data,msg_serialized_rssize,false))
|
||||||
{
|
{
|
||||||
std::cerr << "(EE) p3MsgService::sendTurtleData(): Serialization error." << std::endl;
|
std::cerr << "(EE) p3MsgService::sendTurtleData(): Serialization error." << std::endl;
|
||||||
delete[] msg_serialized_data ;
|
delete[] msg_serialized_data ;
|
||||||
|
@ -194,6 +194,7 @@ class p3MsgService: public p3Service, public p3Config, public pqiServiceMonitor,
|
|||||||
std::map<uint32_t, RsMsgTags*> mMsgTags;
|
std::map<uint32_t, RsMsgTags*> mMsgTags;
|
||||||
|
|
||||||
uint32_t mMsgUniqueId;
|
uint32_t mMsgUniqueId;
|
||||||
|
std::map<Sha1CheckSum,uint32_t> mRecentlyReceivedDistantMessageHashes;
|
||||||
|
|
||||||
// used delete msgSrcIds after config save
|
// used delete msgSrcIds after config save
|
||||||
std::map<uint32_t, RsMsgSrcId*> mSrcIds;
|
std::map<uint32_t, RsMsgSrcId*> mSrcIds;
|
||||||
|
Loading…
Reference in New Issue
Block a user