diff --git a/libretroshare/src/serialiser/rsmsgitems.cc b/libretroshare/src/serialiser/rsmsgitems.cc index 906fff6ef..69c1028ee 100644 --- a/libretroshare/src/serialiser/rsmsgitems.cc +++ b/libretroshare/src/serialiser/rsmsgitems.cc @@ -1023,12 +1023,14 @@ RsMsgDistantMessagesHashMap* RsMsgSerialiser::deserialiseMsgDistantMessageHashMa if (offset != rssize) { /* error */ + std::cerr << "(EE) size error in packet deserialisation: p3MsgItem, subtype " << getRsItemSubType(rstype) << ". offset=" << offset << " != rssize=" << rssize << std::endl; delete item; return NULL; } if (!ok) { + std::cerr << "(EE) size error in packet deserialisation: p3MsgItem, subtype " << getRsItemSubType(rstype) << std::endl; delete item; return NULL; } @@ -1177,27 +1179,14 @@ RsItem* RsMsgSerialiser::deserialise(void *data, uint32_t *pktsize) switch(getRsItemSubType(rstype)) { - case RS_PKT_SUBTYPE_DEFAULT: - return deserialiseMsgItem(data, pktsize); - break; - case RS_PKT_SUBTYPE_MSG_SRC_TAG: - return deserialiseMsgSrcIdItem(data, pktsize); - break; - case RS_PKT_SUBTYPE_MSG_PARENT_TAG: - return deserialiseMsgParentIdItem(data, pktsize); - break; - case RS_PKT_SUBTYPE_MSG_TAG_TYPE: - return deserialiseTagItem(data, pktsize); - break; - case RS_PKT_SUBTYPE_MSG_INVITE: - return deserialisePublicMsgInviteConfigItem(data, pktsize); - break; - case RS_PKT_SUBTYPE_MSG_TAGS: - return deserialiseMsgTagItem(data, pktsize); - break; - case RS_PKT_SUBTYPE_MSG_GROUTER_MAP: - return deserialiseMsgGRouterMap(data, pktsize); - break; + case RS_PKT_SUBTYPE_DEFAULT: return deserialiseMsgItem(data, pktsize); + case RS_PKT_SUBTYPE_MSG_SRC_TAG: return deserialiseMsgSrcIdItem(data, pktsize); + case RS_PKT_SUBTYPE_MSG_PARENT_TAG: return deserialiseMsgParentIdItem(data, pktsize); + case RS_PKT_SUBTYPE_MSG_TAG_TYPE: return deserialiseTagItem(data, pktsize); + case RS_PKT_SUBTYPE_MSG_INVITE: return deserialisePublicMsgInviteConfigItem(data, pktsize); + case RS_PKT_SUBTYPE_MSG_TAGS: return deserialiseMsgTagItem(data, pktsize); + case RS_PKT_SUBTYPE_MSG_GROUTER_MAP: return deserialiseMsgGRouterMap(data, pktsize); + case RS_PKT_SUBTYPE_MSG_DISTANT_MSG_MAP: return deserialiseMsgDistantMessageHashMap(data, pktsize); default: return NULL; break; diff --git a/libretroshare/src/services/p3msgservice.cc b/libretroshare/src/services/p3msgservice.cc index 6ded2bcc6..a01e49340 100644 --- a/libretroshare/src/services/p3msgservice.cc +++ b/libretroshare/src/services/p3msgservice.cc @@ -51,7 +51,9 @@ #include "util/rsstring.h" #include "util/radix64.h" #include "util/rsrandom.h" +#include "util/rsmemory.h" #include "util/rsprint.h" +#include "util/rsthreads.h" #include #include @@ -67,6 +69,8 @@ using namespace Rs::Msgs; const int msgservicezone = 54319; +static const uint32_t RS_MSG_DISTANT_MESSAGE_HASH_KEEP_TIME = 2*30*86400 ; // keep msg hashes for 2 months to avoid re-sent msgs + /* Another little hack ..... unique message Ids * will be handled in this class..... * These are unique within this run of the server, @@ -78,23 +82,22 @@ const int msgservicezone = 54319; * (3) from storage... */ - p3MsgService::p3MsgService(p3ServiceControl *sc, p3IdService *id_serv) :p3Service(), p3Config(), mIdService(id_serv), mServiceCtrl(sc), mMsgMtx("p3MsgService"), mMsgUniqueId(0) { _serialiser = new RsMsgSerialiser(); // this serialiser is used for services. It's not the same than the one returned by setupSerialiser(). We need both!! addSerialType(_serialiser); - mMsgUniqueId = RSRandom::random_u32() ; // better than time(NULL). We don't need crypto-safe random here. Just something much likely - // different from what friends use. + mMsgUniqueId = 1 ; // MsgIds are not transmitted, but only used locally as a storage index. As such, thay do not need to be different + // at friends nodes. mShouldEnableDistantMessaging = true ; mDistantMessagingEnabled = false ; mDistantMessagePermissions = RS_DISTANT_MESSAGING_CONTACT_PERMISSION_FLAG_FILTER_NONE ; - /* Initialize standard tag types */ - if(sc) - initStandardTagTypes(); + /* Initialize standard tag types */ + if(sc) + initStandardTagTypes(); } @@ -117,7 +120,7 @@ RsServiceInfo p3MsgService::getServiceInfo() uint32_t p3MsgService::getNewUniqueMsgId() { - RsStackMutex stack(mMsgMtx); /********** STACK LOCKED MTX ******/ + RS_STACK_MUTEX(mMsgMtx); /********** STACK LOCKED MTX ******/ return mMsgUniqueId++; } @@ -139,6 +142,7 @@ int p3MsgService::tick() { manageDistantPeers() ; checkOutgoingMessages(); + cleanListOfReceivedMessageHashes(); last_management_time = now ; } @@ -146,6 +150,24 @@ int p3MsgService::tick() return 0; } +void p3MsgService::cleanListOfReceivedMessageHashes() +{ + RS_STACK_MUTEX(mMsgMtx); /********** STACK LOCKED MTX ******/ + + time_t now = time(NULL) ; + + for(std::map::iterator it(mRecentlyReceivedDistantMessageHashes.begin());it!=mRecentlyReceivedDistantMessageHashes.end();) + if(now > RS_MSG_DISTANT_MESSAGE_HASH_KEEP_TIME + it->second) + { + std::cerr << "p3MsgService(): cleanListOfReceivedMessageHashes(). Removing old hash " << it->first << ", aged " << now - it->second << " secs ago" << std::endl; + std::map::iterator tmp(it) ; + ++tmp ; + mRecentlyReceivedDistantMessageHashes.erase(it) ; + it=tmp ; + } + else + ++it ; +} int p3MsgService::status() { @@ -332,9 +354,6 @@ int p3MsgService::checkOutgoingMessages() * if online, send */ - static const uint32_t OLD_MESSAGE_FLUSHING_DELAY = 86400*7 ; // re-send old messages every week. This mainly ensures that - // messages that where never sent get sent at some point. - time_t now = time(NULL); bool changed = false ; std::list output_queue ; @@ -363,18 +382,9 @@ int p3MsgService::checkOutgoingMessages() if( mServiceCtrl->isPeerConnected(getServiceInfo().mServiceType, pid) ) /* FEEDBACK Msg to Ourselves */ should_send = true ; - if (mit->second->msgFlags & RS_MSG_FLAGS_DISTANT) - { - if(!(mit->second->msgFlags & RS_MSG_FLAGS_ROUTED)) + if((mit->second->msgFlags & RS_MSG_FLAGS_DISTANT) && !(mit->second->msgFlags & RS_MSG_FLAGS_ROUTED)) should_send = true ; - // if(mit->second->sendTime + OLD_MESSAGE_FLUSHING_DELAY < now) - //{ - // should_send = true ; - // mit->second->sendTime = now; - // } - } - if(should_send) { /* send msg */ @@ -595,6 +605,13 @@ bool p3MsgService::loadList(std::list& load) else if(NULL != (ghm = dynamic_cast(*it))) { mRecentlyReceivedDistantMessageHashes = ghm->hash_map ; + +#ifdef DEBUG_DISTANT_MSG + std::cerr << " loaded recently received message map: " << std::endl; + + for(std::map::const_iterator it(mRecentlyReceivedDistantMessageHashes.begin());it!=mRecentlyReceivedDistantMessageHashes.end();++it) + std::cerr << " " << it->first << " received " << time(NULL)-it->second << " secs ago." << std::endl; +#endif } else if(NULL != (mtt = dynamic_cast(*it))) { @@ -1820,7 +1837,7 @@ void p3MsgService::manageDistantPeers() { #ifdef DEBUG_DISTANT_MSG for(std::list::const_iterator it(own_id_list.begin());it!=own_id_list.end();++it) - std::cerr << (b?"Enabling":"Disabling") << " distant messaging, with peer id = " << *it << std::endl; + std::cerr << (mShouldEnableDistantMessaging?"Enabling":"Disabling") << " distant messaging, with peer id = " << *it << std::endl; #endif for(std::list::const_iterator it(own_id_list.begin());it!=own_id_list.end();++it) @@ -1943,6 +1960,7 @@ void p3MsgService::receiveGRouterData(const RsGxsId& destination_key, const RsGx return ; } mRecentlyReceivedDistantMessageHashes[hash] = time(NULL) ; + IndicateConfigChanged() ; RsItem *item = _serialiser->deserialise(data,&data_size) ; free(data) ; @@ -1969,7 +1987,8 @@ void p3MsgService::sendDistantMsgItem(RsMsgItem *msgitem) RsGxsId destination_key_id(msgitem->PeerId()) ; RsGxsId signing_key_id ; - msgitem->msgFlags |= RS_MSG_FLAGS_DISTANT ; + msgitem->msgFlags |= RS_MSG_FLAGS_DISTANT ;// just in case, but normally we should always have this flag set, when ending up here. + { RS_STACK_MUTEX(mMsgMtx) ; @@ -1994,12 +2013,11 @@ void p3MsgService::sendDistantMsgItem(RsMsgItem *msgitem) // The item is serialized and turned into a generic turtle item. Use use the explicit serialiser to make sure that the msgId is not included uint32_t msg_serialized_rssize = msgitem->serial_size(false) ; - unsigned char *msg_serialized_data = new unsigned char[msg_serialized_rssize] ; + RsTemporaryMemory msg_serialized_data(msg_serialized_rssize) ; if(!msgitem->serialise(msg_serialized_data,msg_serialized_rssize,false)) { std::cerr << "(EE) p3MsgService::sendTurtleData(): Serialization error." << std::endl; - delete[] msg_serialized_data ; return ; } #ifdef DEBUG_DISTANT_MSG @@ -2009,8 +2027,6 @@ void p3MsgService::sendDistantMsgItem(RsMsgItem *msgitem) GRouterMsgPropagationId grouter_message_id ; mGRouter->sendData(destination_key_id,GROUTER_CLIENT_ID_MESSAGES,msg_serialized_data,msg_serialized_rssize,signing_key_id,grouter_message_id) ; - delete[] msg_serialized_data ; - // now store the grouter id along with the message id, so that we can keep track of received messages { diff --git a/libretroshare/src/services/p3msgservice.h b/libretroshare/src/services/p3msgservice.h index b6be979bc..401fc4e0a 100644 --- a/libretroshare/src/services/p3msgservice.h +++ b/libretroshare/src/services/p3msgservice.h @@ -56,160 +56,161 @@ class p3IdService; // Temp tweak to test grouter class p3MsgService: public p3Service, public p3Config, public pqiServiceMonitor, public GRouterClientService { - public: - p3MsgService(p3ServiceControl *sc, p3IdService *id_service); - virtual RsServiceInfo getServiceInfo(); +public: + p3MsgService(p3ServiceControl *sc, p3IdService *id_service); + virtual RsServiceInfo getServiceInfo(); - /* External Interface */ - bool getMessageSummaries(std::list &msgList); - bool getMessage(const std::string &mid, Rs::Msgs::MessageInfo &msg); - void getMessageCount(unsigned int *pnInbox, unsigned int *pnInboxNew, unsigned int *pnOutbox, unsigned int *pnDraftbox, unsigned int *pnSentbox, unsigned int *pnTrashbox); + /* External Interface */ + bool getMessageSummaries(std::list &msgList); + bool getMessage(const std::string &mid, Rs::Msgs::MessageInfo &msg); + void getMessageCount(unsigned int *pnInbox, unsigned int *pnInboxNew, unsigned int *pnOutbox, unsigned int *pnDraftbox, unsigned int *pnSentbox, unsigned int *pnTrashbox); - bool decryptMessage(const std::string& mid) ; - bool removeMsgId(const std::string &mid); - bool markMsgIdRead(const std::string &mid, bool bUnreadByUser); - bool setMsgFlag(const std::string &mid, uint32_t flag, uint32_t mask); - bool getMsgParentId(const std::string &msgId, std::string &msgParentId); - // msgParentId == 0 --> remove - bool setMsgParentId(uint32_t msgId, uint32_t msgParentId); + bool decryptMessage(const std::string& mid) ; + bool removeMsgId(const std::string &mid); + bool markMsgIdRead(const std::string &mid, bool bUnreadByUser); + bool setMsgFlag(const std::string &mid, uint32_t flag, uint32_t mask); + bool getMsgParentId(const std::string &msgId, std::string &msgParentId); + // msgParentId == 0 --> remove + bool setMsgParentId(uint32_t msgId, uint32_t msgParentId); - bool MessageSend(Rs::Msgs::MessageInfo &info); - bool SystemMessage(const std::string &title, const std::string &message, uint32_t systemFlag); - bool MessageToDraft(Rs::Msgs::MessageInfo &info, const std::string &msgParentId); - bool MessageToTrash(const std::string &mid, bool bTrash); + bool MessageSend(Rs::Msgs::MessageInfo &info); + bool SystemMessage(const std::string &title, const std::string &message, uint32_t systemFlag); + bool MessageToDraft(Rs::Msgs::MessageInfo &info, const std::string &msgParentId); + bool MessageToTrash(const std::string &mid, bool bTrash); - bool getMessageTagTypes(Rs::Msgs::MsgTagType& tags); - bool setMessageTagType(uint32_t tagId, std::string& text, uint32_t rgb_color); - bool removeMessageTagType(uint32_t tagId); + bool getMessageTagTypes(Rs::Msgs::MsgTagType& tags); + bool setMessageTagType(uint32_t tagId, std::string& text, uint32_t rgb_color); + bool removeMessageTagType(uint32_t tagId); - bool getMessageTag(const std::string &msgId, Rs::Msgs::MsgTagInfo& info); - /* set == false && tagId == 0 --> remove all */ - bool setMessageTag(const std::string &msgId, uint32_t tagId, bool set); + bool getMessageTag(const std::string &msgId, Rs::Msgs::MsgTagInfo& info); + /* set == false && tagId == 0 --> remove all */ + bool setMessageTag(const std::string &msgId, uint32_t tagId, bool set); - bool resetMessageStandardTagTypes(Rs::Msgs::MsgTagType& tags); + bool resetMessageStandardTagTypes(Rs::Msgs::MsgTagType& tags); - void loadWelcomeMsg(); /* startup message */ + void loadWelcomeMsg(); /* startup message */ - - //std::list &getMsgList(); - //std::list &getMsgOutList(); - int tick(); - int status(); + //std::list &getMsgList(); + //std::list &getMsgOutList(); - /*** Overloaded from p3Config ****/ - virtual RsSerialiser *setupSerialiser(); - virtual bool saveList(bool& cleanup, std::list&); - virtual bool loadList(std::list& load); - virtual void saveDone(); - /*** Overloaded from p3Config ****/ + int tick(); + int status(); - /*** Overloaded from pqiMonitor ***/ - virtual void statusChange(const std::list &plist); - int checkOutgoingMessages(); - /*** Overloaded from pqiMonitor ***/ + /*** Overloaded from p3Config ****/ + virtual RsSerialiser *setupSerialiser(); + virtual bool saveList(bool& cleanup, std::list&); + virtual bool loadList(std::list& load); + virtual void saveDone(); + /*** Overloaded from p3Config ****/ - /*** overloaded from p3turtle ***/ + /*** Overloaded from pqiMonitor ***/ + virtual void statusChange(const std::list &plist); + int checkOutgoingMessages(); + /*** Overloaded from pqiMonitor ***/ - virtual void connectToGlobalRouter(p3GRouter *) ; + /*** overloaded from p3turtle ***/ - struct DistantMessengingInvite - { - time_t time_of_validity ; - }; - struct DistantMessengingContact - { - time_t last_hit_time ; - RsPeerId virtual_peer_id ; - uint32_t status ; - bool pending_messages ; - }; - void enableDistantMessaging(bool b) ; - bool distantMessagingEnabled() ; - - void setDistantMessagingPermissionFlags(uint32_t flags) ; - uint32_t getDistantMessagingPermissionFlags() ; + virtual void connectToGlobalRouter(p3GRouter *) ; - private: - void sendDistantMsgItem(RsMsgItem *msgitem) ; + struct DistantMessengingInvite + { + time_t time_of_validity ; + }; + struct DistantMessengingContact + { + time_t last_hit_time ; + RsPeerId virtual_peer_id ; + uint32_t status ; + bool pending_messages ; + }; + void enableDistantMessaging(bool b) ; + bool distantMessagingEnabled() ; - // This contains the ongoing tunnel handling contacts. - // The map is indexed by the hash - // - std::map _ongoing_messages ; + void setDistantMessagingPermissionFlags(uint32_t flags) ; + uint32_t getDistantMessagingPermissionFlags() ; - // Overloaded from GRouterClientService +private: + void sendDistantMsgItem(RsMsgItem *msgitem) ; - virtual bool acceptDataFromPeer(const RsGxsId& gxs_id) ; - virtual void receiveGRouterData(const RsGxsId& destination_key,const RsGxsId& signing_key, GRouterServiceId &client_id, uint8_t *data, uint32_t data_size) ; - virtual void notifyDataStatus(const GRouterMsgPropagationId& msg_id,uint32_t data_status) ; + // This contains the ongoing tunnel handling contacts. + // The map is indexed by the hash + // + std::map _ongoing_messages ; - // Utility functions + // Overloaded from GRouterClientService - bool createDistantMessage(const RsGxsId& destination_gxs_id,const RsGxsId& source_gxs_id,RsMsgItem *msg) ; - bool locked_findHashForVirtualPeerId(const RsPeerId& pid,Sha1CheckSum& hash) ; - void sendGRouterData(const RsGxsId &key_id,RsMsgItem *) ; + virtual bool acceptDataFromPeer(const RsGxsId& gxs_id) ; + virtual void receiveGRouterData(const RsGxsId& destination_key,const RsGxsId& signing_key, GRouterServiceId &client_id, uint8_t *data, uint32_t data_size) ; + virtual void notifyDataStatus(const GRouterMsgPropagationId& msg_id,uint32_t data_status) ; - void manageDistantPeers() ; + // Utility functions - void handleIncomingItem(RsMsgItem *) ; + bool createDistantMessage(const RsGxsId& destination_gxs_id,const RsGxsId& source_gxs_id,RsMsgItem *msg) ; + bool locked_findHashForVirtualPeerId(const RsPeerId& pid,Sha1CheckSum& hash) ; + void sendGRouterData(const RsGxsId &key_id,RsMsgItem *) ; - uint32_t getNewUniqueMsgId(); - uint32_t sendMessage(RsMsgItem *item); - uint32_t sendDistantMessage(RsMsgItem *item,const RsGxsId& signing_gxs_id); - void checkSizeAndSendMessage(RsMsgItem *msg); + void manageDistantPeers() ; - int incomingMsgs(); - void processIncomingMsg(RsMsgItem *mi) ; - bool checkAndRebuildPartialMessage(RsMsgItem*) ; + void handleIncomingItem(RsMsgItem *) ; - void initRsMI(RsMsgItem *msg, Rs::Msgs::MessageInfo &mi); - void initRsMIS(RsMsgItem *msg, Rs::Msgs::MsgInfoSummary &mis); + uint32_t getNewUniqueMsgId(); + uint32_t sendMessage(RsMsgItem *item); + uint32_t sendDistantMessage(RsMsgItem *item,const RsGxsId& signing_gxs_id); + void checkSizeAndSendMessage(RsMsgItem *msg); + void cleanListOfReceivedMessageHashes(); - RsMsgItem *initMIRsMsg(const Rs::Msgs::MessageInfo &info, const RsPeerId& to); - RsMsgItem *initMIRsMsg(const Rs::Msgs::MessageInfo &info, const RsGxsId& to); - void initMIRsMsg(RsMsgItem *item,const Rs::Msgs::MessageInfo &info) ; + int incomingMsgs(); + void processIncomingMsg(RsMsgItem *mi) ; + bool checkAndRebuildPartialMessage(RsMsgItem*) ; - void initStandardTagTypes(); + void initRsMI(RsMsgItem *msg, Rs::Msgs::MessageInfo &mi); + void initRsMIS(RsMsgItem *msg, Rs::Msgs::MsgInfoSummary &mis); - p3IdService *mIdService ; - p3ServiceControl *mServiceCtrl; - p3GRouter *mGRouter ; + RsMsgItem *initMIRsMsg(const Rs::Msgs::MessageInfo &info, const RsPeerId& to); + RsMsgItem *initMIRsMsg(const Rs::Msgs::MessageInfo &info, const RsGxsId& to); + void initMIRsMsg(RsMsgItem *item,const Rs::Msgs::MessageInfo &info) ; - /* Mutex Required for stuff below */ + void initStandardTagTypes(); - RsMutex mMsgMtx; - RsMsgSerialiser *_serialiser ; + p3IdService *mIdService ; + p3ServiceControl *mServiceCtrl; + p3GRouter *mGRouter ; - /* stored list of messages */ - std::map imsg; - /* ones that haven't made it out yet! */ - std::map msgOutgoing; + /* Mutex Required for stuff below */ - std::map _pendingPartialMessages ; + RsMutex mMsgMtx; + RsMsgSerialiser *_serialiser ; - /* maps for tags types and msg tags */ + /* stored list of messages */ + std::map imsg; + /* ones that haven't made it out yet! */ + std::map msgOutgoing; - std::map mTags; - std::map mMsgTags; + std::map _pendingPartialMessages ; - uint32_t mMsgUniqueId; - std::map mRecentlyReceivedDistantMessageHashes; + /* maps for tags types and msg tags */ - // used delete msgSrcIds after config save - std::map mSrcIds; + std::map mTags; + std::map mMsgTags; - // temporary storage. Will not be needed when messages have a proper "from" field. Not saved! - std::map mDistantOutgoingMsgSigners; + uint32_t mMsgUniqueId; + std::map mRecentlyReceivedDistantMessageHashes; - // save the parent of the messages in draft for replied and forwarded - std::map mParentId; + // used delete msgSrcIds after config save + std::map mSrcIds; - std::string config_dir; + // temporary storage. Will not be needed when messages have a proper "from" field. Not saved! + std::map mDistantOutgoingMsgSigners; - bool mDistantMessagingEnabled ; - uint32_t mDistantMessagePermissions ; - bool mShouldEnableDistantMessaging ; + // save the parent of the messages in draft for replied and forwarded + std::map mParentId; + + std::string config_dir; + + bool mDistantMessagingEnabled ; + uint32_t mDistantMessagePermissions ; + bool mShouldEnableDistantMessaging ; }; #endif // MESSAGE_SERVICE_HEADER