From 4c89641d3eb37f179d7fe671797fe2c7e6e63ed4 Mon Sep 17 00:00:00 2001 From: Gioacchino Mazzurco Date: Mon, 27 Feb 2017 22:18:37 +0100 Subject: [PATCH] p3ChatService support async distant chat via Gxs To implement async distant chat p3ChatService use p3GxsMails in a similar way that has been done with p3MsgService tought as p3ChatService was not thinked for async comunication in the first place the result is quite clumsy. A proper chat service should be rewritten from scratch in the near future, with proper chat history and other desiderables features. deprecated empty p3ChatService::locked_storeIncomingMsg(...) --- libretroshare/src/chat/p3chatservice.cc | 446 +++++++++++------- libretroshare/src/chat/p3chatservice.h | 47 +- libretroshare/src/chat/rschatitems.h | 161 ++++++- libretroshare/src/rsserver/rsinit.cc | 3 +- libretroshare/src/serialiser/rsgxsmailitems.h | 7 +- 5 files changed, 466 insertions(+), 198 deletions(-) diff --git a/libretroshare/src/chat/p3chatservice.cc b/libretroshare/src/chat/p3chatservice.cc index eb421ac31..d85ecc7e2 100644 --- a/libretroshare/src/chat/p3chatservice.cc +++ b/libretroshare/src/chat/p3chatservice.cc @@ -53,40 +53,30 @@ static const uint32_t MAX_AVATAR_JPEG_SIZE = 32767; // Maximum size // don't transfer correctly and can kill the system. // Images are 96x96, which makes approx. 27000 bytes uncompressed. -p3ChatService::p3ChatService(p3ServiceControl *sc,p3IdService *pids, p3LinkMgr *lm, p3HistoryMgr *historyMgr) - : DistributedChatService(getServiceInfo().mServiceType,sc,historyMgr,pids), mChatMtx("p3ChatService"),mServiceCtrl(sc), mLinkMgr(lm) , mHistoryMgr(historyMgr) +p3ChatService::p3ChatService( p3ServiceControl *sc, p3IdService *pids, + p3LinkMgr *lm, p3HistoryMgr *historyMgr, + p3GxsMails& gxsMailService ) : + DistributedChatService(getServiceInfo().mServiceType, sc, historyMgr,pids), + mChatMtx("p3ChatService"), mServiceCtrl(sc), mLinkMgr(lm), + mHistoryMgr(historyMgr), _own_avatar(NULL), + _serializer(new RsChatSerialiser()), mGxsTransport(gxsMailService), + recentlyReceivedMutex("p3ChatService recently received mutex") { - _serializer = new RsChatSerialiser() ; - - _own_avatar = NULL ; - _custom_status_string = "" ; - - addSerialType(_serializer) ; + addSerialType(_serializer); + mGxsTransport.registerGxsMailsClient( GxsMailSubServices::P3_CHAT_SERVICE, + this ); } -const std::string CHAT_APP_NAME = "chat"; -const uint16_t CHAT_APP_MAJOR_VERSION = 1; -const uint16_t CHAT_APP_MINOR_VERSION = 0; -const uint16_t CHAT_MIN_MAJOR_VERSION = 1; -const uint16_t CHAT_MIN_MINOR_VERSION = 0; - RsServiceInfo p3ChatService::getServiceInfo() -{ - return RsServiceInfo(RS_SERVICE_TYPE_CHAT, - CHAT_APP_NAME, - CHAT_APP_MAJOR_VERSION, - CHAT_APP_MINOR_VERSION, - CHAT_MIN_MAJOR_VERSION, - CHAT_MIN_MINOR_VERSION); -} +{ return RsServiceInfo(RS_SERVICE_TYPE_CHAT, "chat", 1, 0, 1, 0); } int p3ChatService::tick() { - if(receivedItems()) - receiveChatQueue(); + if(receivedItems()) receiveChatQueue(); - DistributedChatService::flush() ; - //DistantChatService::flush() ; + DistributedChatService::flush(); + + cleanListOfReceivedMessageHashes(); return 0; } @@ -253,12 +243,12 @@ void p3ChatService::clearChatLobby(const ChatId& id) void p3ChatService::sendChatItem(RsChatItem *item) { - if(DistantChatService::handleOutgoingItem(item)) - return ; + if(DistantChatService::handleOutgoingItem(item)) return; #ifdef CHAT_DEBUG - std::cerr << "p3ChatService::sendChatItem(): sending to " << item->PeerId() << ": interpreted as friend peer id." << std::endl; + std::cerr << "p3ChatService::sendChatItem(): sending to " << item->PeerId() + << ": interpreted as friend peer id." << std::endl; #endif - sendItem(item) ; + sendItem(item); } void p3ChatService::checkSizeAndSendMessage(RsChatMsgItem *msg) @@ -336,28 +326,49 @@ bool p3ChatService::sendChat(ChatId destination, std::string msg) message.incoming = false; message.online = true; - if(!isOnline(vpid)) + if(!isOnline(vpid)) { - /* peer is offline, add to outgoing list */ + message.online = false; + RsServer::notify()->notifyChatMessage(message); + + // use the history to load pending messages to the gui + // this is not very nice, because the user may think the message was send, while it is still in the queue + mHistoryMgr->addMessage(message); + + RsGxsMailId tId = RSRandom::random_u64(); + + if(destination.isDistantChatId()) { - RS_STACK_MUTEX(mChatMtx); - privateOutgoingList.push_back(ci); + DEPMap::const_iterator it = + mDistantGxsMap.find(destination.toDistantChatId()); + if(it != mDistantGxsMap.end()) + { + const DistantEndpoints& de(it->second); + uint32_t sz = ci->serial_size(); + std::vector data; data.resize(sz); + ci->serialise(&data[0], sz); + mGxsTransport.sendMail(tId, GxsMailSubServices::P3_CHAT_SERVICE, + de.from, de.to, &data[0], sz); + } + else + std::cout << "p3ChatService::sendChat(...) can't find distant" + << "chat id in mDistantGxsMap this is unxpected!" + << std::endl; } - message.online = false; - RsServer::notify()->notifyChatMessage(message); + // peer is offline, add to outgoing list + { + RS_STACK_MUTEX(mChatMtx); + privateOutgoingMap.insert(outMP::value_type(tId, ci)); + } - // use the history to load pending messages to the gui - // this is not very nice, because the user may think the message was send, while it is still in the queue - mHistoryMgr->addMessage(message); + IndicateConfigChanged(); + return false; + } - IndicateConfigChanged(); - return false; - } - - { - RsStackMutex stack(mChatMtx); /********** STACK LOCKED MTX ******/ - std::map::iterator it = _avatars.find(vpid) ; + { + RS_STACK_MUTEX(mChatMtx); + std::map::iterator it = _avatars.find(vpid); if(it == _avatars.end()) { @@ -505,50 +516,50 @@ class MsgCounter void p3ChatService::handleIncomingItem(RsItem *item) { #ifdef CHAT_DEBUG - std::cerr << "p3ChatService::receiveChatQueue() Item:" << (void*)item << std::endl ; + std::cerr << "p3ChatService::receiveChatQueue() Item:" << (void*)item + << std::endl ; #endif // RsChatMsgItems needs dynamic_cast, since they have derived siblings. - // - RsChatMsgItem *ci = dynamic_cast(item) ; - if(ci != NULL) + RsChatMsgItem* ci = dynamic_cast(item); + if(ci) { handleRecvChatMsgItem(ci); - - if(ci) - delete ci ; - return ; // don't delete! It's handled by handleRecvChatMsgItem in some specific cases only. + /* +ci+ deletion is handled by handleRecvChatMsgItem ONLY in some + * specific cases, in case +ci+ has not been handled deleted it here */ + delete ci ; + + return; } -// if(DistantChatService::handleRecvItem(dynamic_cast(item))) -// { -// delete item ; -// return ; -// } - - if(DistributedChatService::handleRecvItem(dynamic_cast(item))) + if(DistributedChatService::handleRecvItem(dynamic_cast(item))) { - delete item ; - return ; + delete item; + return; } switch(item->PacketSubType()) { - case RS_PKT_SUBTYPE_CHAT_STATUS: handleRecvChatStatusItem(dynamic_cast(item)) ; break ; - case RS_PKT_SUBTYPE_CHAT_AVATAR: handleRecvChatAvatarItem(dynamic_cast(item)) ; break ; - default: - { - static int already = false ; - - if(!already) - { - std::cerr << "Unhandled item subtype " << (int)item->PacketSubType() << " in p3ChatService: " << std::endl; - already = true ; - } - } + case RS_PKT_SUBTYPE_CHAT_STATUS: + handleRecvChatStatusItem(dynamic_cast(item)); + break; + case RS_PKT_SUBTYPE_CHAT_AVATAR: + handleRecvChatAvatarItem(dynamic_cast(item)); + break; + default: + { + static int already = false; + if(!already) + { + std::cerr << "Unhandled item subtype " + << static_cast(item->PacketSubType()) + << " in p3ChatService: " << std::endl; + already = true; + } } - delete item ; + } + delete item; } void p3ChatService::handleRecvChatAvatarItem(RsChatAvatarItem *ca) @@ -676,35 +687,105 @@ bool p3ChatService::checkForMessageSecurity(RsChatMsgItem *ci) return true ; } +bool p3ChatService::initiateDistantChatConnexion( + const RsGxsId& to_gxs_id, const RsGxsId& from_gxs_id, + DistantChatPeerId& pid, uint32_t& error_code ) +{ + if(DistantChatService::initiateDistantChatConnexion( to_gxs_id, + from_gxs_id, pid, + error_code )) + { + DistantEndpoints ep; ep.from = from_gxs_id; ep.to = to_gxs_id; + mDistantGxsMap.insert(DEPMap::value_type(pid, ep)); + return true; + } + return false; +} + +bool p3ChatService::receiveGxsMail(const RsGxsMailItem&, const uint8_t* data, uint32_t dataSize) +{ + RsChatMsgItem* item = new RsChatMsgItem( const_cast(data), + dataSize ); + handleRecvChatMsgItem(item); + delete item; + return true; +} + +bool p3ChatService::notifySendMailStatus(const RsGxsMailItem& originalMessage, GxsMailStatus status) +{ + if ( status != GxsMailStatus::RECEIPT_RECEIVED ) return true; + + bool changed = false; + + { + RS_STACK_MUTEX(mChatMtx); + auto it = privateOutgoingMap.find(originalMessage.mailId); + if( it != privateOutgoingMap.end() ) + { + privateOutgoingMap.erase(it); + changed = true; + } + } + + if(changed) + { + RsServer::notify()->notifyListChange( + NOTIFY_LIST_PRIVATE_OUTGOING_CHAT, NOTIFY_TYPE_DEL ); + + IndicateConfigChanged(); + } + + return true; +} + bool p3ChatService::handleRecvChatMsgItem(RsChatMsgItem *& ci) { time_t now = time(NULL); + + { // Check for duplicates + uint32_t sz = ci->serial_size(); + std::vector srz; srz.resize(sz); + ci->serialise(&srz[0], sz); + Sha1CheckSum hash = RsDirUtil::sha1sum(&srz[0], sz); + { + RS_STACK_MUTEX(recentlyReceivedMutex); + if( mRecentlyReceivedMessageHashes.find(hash) != + mRecentlyReceivedMessageHashes.end() ) + { + std::cerr << "p3ChatService::handleRecvChatMsgItem(...) (II) " + << "receiving distant message of hash " << hash + << " more than once. Probably it has arrived before " + << "by other means." << std::endl; + delete ci; ci=NULL; + return true; + } + mRecentlyReceivedMessageHashes[hash] = now; + } + } + std::string name; - uint32_t popupChatFlag = RS_POPUP_CHAT; + uint32_t popupChatFlag = RS_POPUP_CHAT; - { - RsStackMutex stack(mChatMtx); /********** STACK LOCKED MTX ******/ + { + RS_STACK_MUTEX(mChatMtx); - if(!locked_checkAndRebuildPartialMessage(ci)) // we make sure this call does not take control over the memory - return true ; // message is a subpart of an existing message. So everything ok, but we need to return. - } + // we make sure this call does not take control over the memory + if(!locked_checkAndRebuildPartialMessage(ci)) return true; + /* message is a subpart of an existing message. + * So everything ok, but we need to return. */ + } - // Check for security. This avoids bombing messages, and so on. + // Check for security. This avoids bombing messages, and so on. + if(!checkForMessageSecurity(ci)) return false; - if(!checkForMessageSecurity(ci)) - return false ; - - // If it's a lobby item, we need to bounce it and possibly check for timings etc. - - if(!DistributedChatService::handleRecvChatLobbyMsgItem(ci)) - return false ; + /* If it's a lobby item, we need to bounce it and possibly check for timings + * etc. */ + if(!DistributedChatService::handleRecvChatLobbyMsgItem(ci)) return false; #ifdef CHAT_DEBUG - std::cerr << "p3ChatService::receiveChatQueue() Item:"; - std::cerr << std::endl; + std::cerr << "p3ChatService::receiveChatQueue() Item:" << std::endl; ci->print(std::cerr); - std::cerr << std::endl; - std::cerr << "Got msg. Flags = " << ci->chatFlags << std::endl ; + std::cerr << std::endl << "Got msg. Flags = " << ci->chatFlags << std::endl; #endif // Now treat normal chat stuff such as avatar requests, except for chat lobbies. @@ -1087,6 +1168,25 @@ RsChatStatusItem *p3ChatService::makeOwnCustomStateStringItem() return ci ; } + +void p3ChatService::cleanListOfReceivedMessageHashes() +{ + RS_STACK_MUTEX(recentlyReceivedMutex); + + time_t now = time(NULL); + + for( auto it = mRecentlyReceivedMessageHashes.begin(); + it != mRecentlyReceivedMessageHashes.end(); ) + if( now > RECENTLY_RECEIVED_INTERVAL + it->second ) + { + std::cerr << "p3MsgService(): cleanListOfReceivedMessageHashes(" + << "). Removing old hash " << it->first << ", aged " + << now - it->second << " secs ago." << std::endl; + + it = mRecentlyReceivedMessageHashes.erase(it); + } + else ++it; +} RsChatAvatarItem *p3ChatService::makeOwnAvatarItem() { RsStackMutex stack(mChatMtx); /********** STACK LOCKED MTX ******/ @@ -1100,11 +1200,11 @@ RsChatAvatarItem *p3ChatService::makeOwnAvatarItem() void p3ChatService::sendAvatarJpegData(const RsPeerId& peer_id) { - #ifdef CHAT_DEBUG - std::cerr << "p3chatservice: sending requested for peer " << peer_id << ", data=" << (void*)_own_avatar << std::endl ; - #endif +#ifdef CHAT_DEBUG + std::cerr << "p3chatservice: sending requested for peer " << peer_id << ", data=" << (void*)_own_avatar << std::endl ; +#endif - if(_own_avatar != NULL) + if(_own_avatar != NULL) { RsChatAvatarItem *ci = makeOwnAvatarItem(); ci->PeerId(peer_id); @@ -1144,19 +1244,34 @@ bool p3ChatService::loadList(std::list& load) for(std::list::const_iterator it(load.begin());it!=load.end();++it) { + if(PrivateOugoingMapItem* om=dynamic_cast(*it)) + { + RS_STACK_MUTEX(mChatMtx); + for( auto& pair : om->store ) + { + privateOutgoingMap.insert( + outMP::value_type(pair.first, + new RsChatMsgItem(pair.second)) ); + } + + delete om; continue; + } + + RsChatAvatarItem *ai = NULL ; if(NULL != (ai = dynamic_cast(*it))) { - RsStackMutex stack(mChatMtx); /********** STACK LOCKED MTX ******/ + RS_STACK_MUTEX(mChatMtx); if(ai->image_size <= MAX_AVATAR_JPEG_SIZE) _own_avatar = new AvatarInfo(ai->image_data,ai->image_size) ; else - std::cerr << "Dropping avatar image, because its size is " << ai->image_size << ", and the maximum allowed size is " << MAX_AVATAR_JPEG_SIZE << std::endl; + std::cerr << "Dropping avatar image, because its size is " + << ai->image_size << ", and the maximum allowed size " + << "is " << MAX_AVATAR_JPEG_SIZE << std::endl; delete *it; - continue; } @@ -1164,40 +1279,42 @@ bool p3ChatService::loadList(std::list& load) if(NULL != (mitem = dynamic_cast(*it))) { - RsStackMutex stack(mChatMtx); /********** STACK LOCKED MTX ******/ + RS_STACK_MUTEX(mChatMtx); _custom_status_string = mitem->status_string ; delete *it; - continue; } - RsPrivateChatMsgConfigItem *citem = NULL ; - - if(NULL != (citem = dynamic_cast(*it))) + /* TODO: G10h4ck 2017/02/27 this block is kept for retrocompatibility, + * and will be used just first time, to load messages in the old format + * should be removed in the following RS version */ + if( RsPrivateChatMsgConfigItem *citem = + dynamic_cast(*it) ) { - RsStackMutex stack(mChatMtx); /********** STACK LOCKED MTX ******/ + RS_STACK_MUTEX(mChatMtx); - if (citem->chatFlags & RS_CHAT_FLAG_PRIVATE) { - if (std::find(ssl_peers.begin(), ssl_peers.end(), citem->configPeerId) != ssl_peers.end()) { + if ( citem->chatFlags & RS_CHAT_FLAG_PRIVATE ) + { + if ( std::find(ssl_peers.begin(), ssl_peers.end(), + citem->configPeerId) != ssl_peers.end() ) + { RsChatMsgItem *ci = new RsChatMsgItem(); citem->get(ci); - if (citem->configFlags & RS_CHATMSG_CONFIGFLAG_INCOMING) { + if (citem->configFlags & RS_CHATMSG_CONFIGFLAG_INCOMING) + { locked_storeIncomingMsg(ci); - } else { - privateOutgoingList.push_back(ci); } - } else { - // no friends + else privateOutgoingMap.insert( + outMP::value_type(RSRandom::random_u64(), ci) ); } - } else { - // ignore all other items + else { /* no friends */ } } + else { /* ignore all other items */ } delete *it; - continue; } @@ -1228,21 +1345,18 @@ bool p3ChatService::saveList(bool& cleanup, std::list& list) mChatMtx.lock(); /****** MUTEX LOCKED *******/ + PrivateOugoingMapItem* om = new PrivateOugoingMapItem; + typedef std::map::value_type vT; + for( auto& pair : privateOutgoingMap ) + om->store.insert(vT(pair.first, *pair.second)); + list.push_back(om); + + RsChatStatusItem *di = new RsChatStatusItem ; di->status_string = _custom_status_string ; di->flags = RS_CHAT_FLAG_CUSTOM_STATE ; - list.push_back(di) ; - - /* save outgoing private chat messages */ - std::list::iterator it; - for (it = privateOutgoingList.begin(); it != privateOutgoingList.end(); it++) { - RsPrivateChatMsgConfigItem *ci = new RsPrivateChatMsgConfigItem; - - ci->set(*it, (*it)->PeerId(), 0); - - list.push_back(ci); - } + list.push_back(di); DistributedChatService::addToSaveList(list) ; DistantChatService::addToSaveList(list) ; @@ -1269,54 +1383,51 @@ RsSerialiser *p3ChatService::setupSerialiser() void p3ChatService::statusChange(const std::list &plist) { - std::list::const_iterator it; - for (it = plist.begin(); it != plist.end(); ++it) { + for (auto it = plist.cbegin(); it != plist.cend(); ++it) + { if (it->actions & RS_SERVICE_PEER_CONNECTED) { /* send the saved outgoing messages */ bool changed = false; - std::vector to_send ; + std::vector to_send; - if (privateOutgoingList.size()) { - RsStackMutex stack(mChatMtx); /********** STACK LOCKED MTX ******/ + RS_STACK_MUTEX(mChatMtx); - RsPeerId ownId = mServiceCtrl->getOwnId(); - - std::list::iterator cit = privateOutgoingList.begin(); - while (cit != privateOutgoingList.end()) { - RsChatMsgItem *c = *cit; - - if (c->PeerId() == it->id) { - //mHistoryMgr->addMessage(false, c->PeerId(), ownId, c); + for( auto cit = privateOutgoingMap.begin(); + cit != privateOutgoingMap.end(); ) + { + RsChatMsgItem *c = cit->second; + if (c->PeerId() == it->id) + { + //mHistoryMgr->addMessage(false, c->PeerId(), ownId, c); to_send.push_back(c) ; - changed = true; - - cit = privateOutgoingList.erase(cit); - + cit = privateOutgoingMap.erase(cit); continue; } ++cit; } - } /* UNLOCKED */ + } - for(uint32_t i=0;inotifyChatMessage(message); + for(auto toIt = to_send.begin(); toIt != to_send.end(); ++toIt) + { + ChatMessage message; + initChatMessage(*toIt, message); + message.incoming = false; + message.online = true; + RsServer::notify()->notifyChatMessage(message); - checkSizeAndSendMessage(to_send[i]); // delete item - } + checkSizeAndSendMessage(*toIt); // delete item + } - if (changed) { - RsServer::notify()->notifyListChange(NOTIFY_LIST_PRIVATE_OUTGOING_CHAT, NOTIFY_TYPE_DEL); + if (changed) + { + RsServer::notify()->notifyListChange( + NOTIFY_LIST_PRIVATE_OUTGOING_CHAT, NOTIFY_TYPE_DEL); IndicateConfigChanged(); } @@ -1324,18 +1435,17 @@ void p3ChatService::statusChange(const std::list &plist) else if (it->actions & RS_SERVICE_PEER_REMOVED) { /* now handle remove */ - mHistoryMgr->clear(ChatId(it->id)); + mHistoryMgr->clear(ChatId(it->id)); - std::list::iterator cit = privateOutgoingList.begin(); - while (cit != privateOutgoingList.end()) { - RsChatMsgItem *c = *cit; - if (c->PeerId() == it->id) { - cit = privateOutgoingList.erase(cit); - continue; - } - ++cit; - } - IndicateConfigChanged(); + RS_STACK_MUTEX(mChatMtx); + for ( auto cit = privateOutgoingMap.begin(); + cit != privateOutgoingMap.end(); ) + { + RsChatMsgItem *c = cit->second; + if (c->PeerId() == it->id) cit = privateOutgoingMap.erase(cit); + else ++cit; + } + IndicateConfigChanged(); } } } diff --git a/libretroshare/src/chat/p3chatservice.h b/libretroshare/src/chat/p3chatservice.h index 3096309b0..365ddb4a9 100644 --- a/libretroshare/src/chat/p3chatservice.h +++ b/libretroshare/src/chat/p3chatservice.h @@ -37,6 +37,8 @@ #include "chat/distantchat.h" #include "chat/distributedchat.h" #include "retroshare/rsmsgs.h" +#include "services/p3gxsmails.h" +#include "util/rsdeprecate.h" class p3ServiceControl; class p3LinkMgr; @@ -51,10 +53,12 @@ typedef RsPeerId ChatLobbyVirtualPeerId ; * This service uses rsnotify (callbacks librs clients (e.g. rs-gui)) * @see NotifyBase */ -class p3ChatService: public p3Service, public DistantChatService, public DistributedChatService, public p3Config, public pqiServiceMonitor +struct p3ChatService : + p3Service, DistantChatService, DistributedChatService, p3Config, + pqiServiceMonitor, GxsMailsClient { -public: - p3ChatService(p3ServiceControl *cs, p3IdService *pids,p3LinkMgr *cm, p3HistoryMgr *historyMgr); + p3ChatService( p3ServiceControl *cs, p3IdService *pids,p3LinkMgr *cm, + p3HistoryMgr *historyMgr, p3GxsMails& gxsMailService ); virtual RsServiceInfo getServiceInfo(); @@ -66,7 +70,7 @@ public: * : notifyCustomState, notifyChatStatus, notifyPeerHasNewAvatar * @see NotifyBase */ - virtual int tick(); + virtual int tick(); /*************** pqiMonitor callback ***********************/ virtual void statusChange(const std::list &plist); @@ -161,6 +165,20 @@ public: */ bool clearPrivateChatQueue(bool incoming, const RsPeerId &id); + virtual bool initiateDistantChatConnexion( const RsGxsId& to_gxs_id, + const RsGxsId& from_gxs_id, + DistantChatPeerId &pid, + uint32_t& error_code ); + + /// @see GxsMailsClient::receiveGxsMail(...) + virtual bool receiveGxsMail( const RsGxsMailItem& /*originalMessage*/, + const uint8_t* data, uint32_t dataSize ); + + /// @see GxsMailsClient::notifySendMailStatus(...) + virtual bool notifySendMailStatus( const RsGxsMailItem& originalMessage, + GxsMailStatus status ); + + protected: /************* from p3Config *******************/ virtual RsSerialiser *setupSerialiser() ; @@ -177,8 +195,9 @@ protected: /// This is to be used by subclasses/parents to call IndicateConfigChanged() virtual void triggerConfigSave() { IndicateConfigChanged() ; } + /// Same, for storing messages in incoming list - virtual void locked_storeIncomingMsg(RsChatMsgItem *) ; + RS_DEPRECATED virtual void locked_storeIncomingMsg(RsChatMsgItem *) ; private: RsMutex mChatMtx; @@ -231,7 +250,9 @@ private: p3LinkMgr *mLinkMgr; p3HistoryMgr *mHistoryMgr; - std::list privateOutgoingList; // messages waiting to be send when peer comes online + /// messages waiting to be send when peer comes online + typedef std::map outMP; + outMP privateOutgoingMap; AvatarInfo *_own_avatar ; std::map _avatars ; @@ -240,7 +261,19 @@ private: std::string _custom_status_string ; std::map _state_strings ; - RsChatSerialiser *_serializer ; + RsChatSerialiser *_serializer; + + struct DistantEndpoints { RsGxsId from; RsGxsId to; }; + typedef std::map DEPMap; + DEPMap mDistantGxsMap; + p3GxsMails& mGxsTransport; + + /** As we have multiple backends duplicates are possible, keep track of + * recently received messages hashes for at least 2h to avoid them */ + const static uint32_t RECENTLY_RECEIVED_INTERVAL = 2*3600; + std::map mRecentlyReceivedMessageHashes; + RsMutex recentlyReceivedMutex; + void cleanListOfReceivedMessageHashes(); }; class p3ChatService::StateStringInfo diff --git a/libretroshare/src/chat/rschatitems.h b/libretroshare/src/chat/rschatitems.h index 13b42c18f..b047f221f 100644 --- a/libretroshare/src/chat/rschatitems.h +++ b/libretroshare/src/chat/rschatitems.h @@ -75,6 +75,7 @@ const uint8_t RS_PKT_SUBTYPE_CHAT_LOBBY_SIGNED_MSG = 0x17 ; const uint8_t RS_PKT_SUBTYPE_CHAT_LOBBY_SIGNED_EVENT = 0x18 ; const uint8_t RS_PKT_SUBTYPE_CHAT_LOBBY_LIST = 0x19 ; const uint8_t RS_PKT_SUBTYPE_CHAT_LOBBY_INVITE = 0x1A ; +const uint8_t RS_PKT_SUBTYPE_OUTGOING_MAP = 0x1B ; typedef uint64_t ChatLobbyId ; typedef uint64_t ChatLobbyMsgId ; @@ -286,30 +287,32 @@ class RsChatLobbyInviteItem: public RsChatItem * For saving incoming and outgoing chat msgs * @see p3ChatService */ -class RsPrivateChatMsgConfigItem: public RsChatItem +struct RsPrivateChatMsgConfigItem : RsChatItem { - public: - RsPrivateChatMsgConfigItem() :RsChatItem(RS_PKT_SUBTYPE_PRIVATECHATMSG_CONFIG) {} - RsPrivateChatMsgConfigItem(void *data,uint32_t size) ; // deserialization + RsPrivateChatMsgConfigItem() :RsChatItem(RS_PKT_SUBTYPE_PRIVATECHATMSG_CONFIG) {} + RsPrivateChatMsgConfigItem(void *data,uint32_t size) ; // deserialization - virtual ~RsPrivateChatMsgConfigItem() {} - virtual void clear() {} - virtual std::ostream& print(std::ostream &out, uint16_t indent = 0); + virtual ~RsPrivateChatMsgConfigItem() {} + virtual void clear() {} + virtual std::ostream& print(std::ostream &out, uint16_t indent = 0); - virtual bool serialise(void *data,uint32_t& size) ; // Isn't it better that items can serialize themselves ? - virtual uint32_t serial_size() ; // deserialise is handled using a constructor + virtual bool serialise(void *data,uint32_t& size); + virtual uint32_t serial_size(); + /* Deserialize is handled using a constructor,it would be better have a + * deserialize method as constructor cannot fails while deserialization can. + */ - /* set data from RsChatMsgItem to RsPrivateChatMsgConfigItem */ - void set(RsChatMsgItem *ci, const RsPeerId &peerId, uint32_t confFlags); - /* get data from RsPrivateChatMsgConfigItem to RsChatMsgItem */ - void get(RsChatMsgItem *ci); + /* set data from RsChatMsgItem to RsPrivateChatMsgConfigItem */ + void set(RsChatMsgItem *ci, const RsPeerId &peerId, uint32_t confFlags); + /* get data from RsPrivateChatMsgConfigItem to RsChatMsgItem */ + void get(RsChatMsgItem *ci); - RsPeerId configPeerId; - uint32_t chatFlags; - uint32_t configFlags; - uint32_t sendTime; - std::string message; - uint32_t recvTime; + RsPeerId configPeerId; + uint32_t chatFlags; + uint32_t configFlags; + uint32_t sendTime; + std::string message; + uint32_t recvTime; }; class RsPrivateChatDistantInviteConfigItem: public RsChatItem { @@ -413,6 +416,126 @@ class RsChatDHPublicKeyItem: public RsChatItem const RsChatDHPublicKeyItem& operator=(const RsChatDHPublicKeyItem&) { return *this ;} }; + +struct PrivateOugoingMapItem : RsChatItem +{ + PrivateOugoingMapItem() : RsChatItem(RS_PKT_SUBTYPE_OUTGOING_MAP) {} + + uint32_t serial_size() + { + uint32_t s = 8; /* header */ + s += 4; // number of entries + for( auto entry : store ) + { + s += 8; // key size + s += entry.second.serial_size(); + } + return s; + } + bool serialise(void* data, uint32_t& pktsize) + { + uint32_t tlvsize = serial_size(); + uint32_t offset = 0; + + if (pktsize < tlvsize) return false; /* not enough space */ + + pktsize = tlvsize; + + bool ok = true; + + ok = ok && setRsItemHeader(data, tlvsize, PacketId(), tlvsize) + && (offset += 8); + + ok = ok && setRawUInt32(data, tlvsize, &offset, store.size()); + + for( auto entry : store ) + { + ok = ok && setRawUInt64(data, tlvsize, &offset, entry.first); + + uint8_t* hdrPtr = static_cast(data) + offset; + uint32_t tmpsize = entry.second.serial_size(); + ok = ok && entry.second.serialise(hdrPtr, tmpsize); + } + + if (offset != tlvsize) + { + ok = false; + std::cerr << "PrivateOugoingMapItem::serialise() Size Error!" + << std::endl; + } + + return ok; + } + PrivateOugoingMapItem* deserialise(const uint8_t* data, uint32_t& pktsize) + { + /* get the type and size */ + uint8_t* dataPtr = const_cast(data); + uint32_t rstype = getRsItemId(dataPtr); + uint32_t rssize = getRsItemSize(dataPtr); + + uint32_t offset = 0; + + if ((RS_PKT_VERSION_SERVICE != getRsItemVersion(rstype)) || + (RS_SERVICE_TYPE_CHAT != getRsItemService(rstype)) || + (RS_PKT_SUBTYPE_OUTGOING_MAP != getRsItemSubType(rstype)) + ) return NULL; /* wrong type */ + + if (pktsize < rssize) return NULL; /* check size not enough data */ + + /* set the packet length */ + pktsize = rssize; + + bool ok = true; + + /* ready to load */ + PrivateOugoingMapItem* item = new PrivateOugoingMapItem(); + + /* skip the header */ + offset += 8; + + // get map size first */ + uint32_t s = 0; + ok = ok && getRawUInt32(dataPtr, rssize, &offset, &s); + + for(uint32_t i=0; i(data); hdrPtr += offset; + uint32_t tmpSize = getRsItemSize(hdrPtr); + RsChatMsgItem msgItem(hdrPtr, tmpSize); + item->store.insert(std::make_pair(msgId, msgItem)); + } + + if (offset != rssize) + { + /* error */ + std::cerr << "(EE) size error in packet deserialisation: p3MsgItem," + << " subtype " << getRsItemSubType(rstype) << ". offset=" + << offset << " != rssize=" << rssize << std::endl; + delete item; + return NULL; + } + + if (!ok) + { + std::cerr << "(EE) size error in packet deserialisation: p3MsgItem," + << " subtype " << getRsItemSubType(rstype) << std::endl; + delete item; + return NULL; + } + + return item; + } + + virtual std::ostream& print(std::ostream &out, uint16_t /*indent*/ = 0) + { return out << "PrivateOugoingMapItem store size: " << store.size(); } + + std::map store; +}; + class RsChatSerialiser: public RsSerialType { public: diff --git a/libretroshare/src/rsserver/rsinit.cc b/libretroshare/src/rsserver/rsinit.cc index ee2853502..2b6c8f9d9 100644 --- a/libretroshare/src/rsserver/rsinit.cc +++ b/libretroshare/src/rsserver/rsinit.cc @@ -1511,7 +1511,8 @@ int RsServer::StartupRetroShare() mDisc = new p3discovery2(mPeerMgr, mLinkMgr, mNetMgr, serviceCtrl); mHeart = new p3heartbeat(serviceCtrl, pqih); msgSrv = new p3MsgService( serviceCtrl, mGxsIdService, *mGxsMails ); - chatSrv = new p3ChatService(serviceCtrl,mGxsIdService, mLinkMgr, mHistoryMgr); + chatSrv = new p3ChatService( serviceCtrl,mGxsIdService, mLinkMgr, + mHistoryMgr, *mGxsMails ); mStatusSrv = new p3StatusService(serviceCtrl); #ifdef ENABLE_GROUTER diff --git a/libretroshare/src/serialiser/rsgxsmailitems.h b/libretroshare/src/serialiser/rsgxsmailitems.h index 61545fbb5..e9c565d83 100644 --- a/libretroshare/src/serialiser/rsgxsmailitems.h +++ b/libretroshare/src/serialiser/rsgxsmailitems.h @@ -29,9 +29,10 @@ /// Subservices identifiers (like port for TCP) enum class GxsMailSubServices : uint16_t { - UNKNOWN = 0, - TEST_SERVICE = 1, - P3_MSG_SERVICE = 2 + UNKNOWN = 0, + TEST_SERVICE = 1, + P3_MSG_SERVICE = 2, + P3_CHAT_SERVICE = 3 }; /// Values must fit into uint8_t