p3ChatService support async distant chat via Gxs

To implement async distant chat p3ChatService use p3GxsMails in a similar
way that has been done with p3MsgService tought as p3ChatService was not
thinked for async comunication in the first place the result is quite
clumsy. A proper chat service should be rewritten from scratch in the near
future, with proper chat history and other desiderables features.
deprecated empty p3ChatService::locked_storeIncomingMsg(...)
This commit is contained in:
Gioacchino Mazzurco 2017-02-27 22:18:37 +01:00
parent f19fe56a93
commit 4c89641d3e
5 changed files with 466 additions and 198 deletions

View File

@ -53,40 +53,30 @@ static const uint32_t MAX_AVATAR_JPEG_SIZE = 32767; // Maximum size
// don't transfer correctly and can kill the system. // don't transfer correctly and can kill the system.
// Images are 96x96, which makes approx. 27000 bytes uncompressed. // Images are 96x96, which makes approx. 27000 bytes uncompressed.
p3ChatService::p3ChatService(p3ServiceControl *sc,p3IdService *pids, p3LinkMgr *lm, p3HistoryMgr *historyMgr) p3ChatService::p3ChatService( p3ServiceControl *sc, p3IdService *pids,
: DistributedChatService(getServiceInfo().mServiceType,sc,historyMgr,pids), mChatMtx("p3ChatService"),mServiceCtrl(sc), mLinkMgr(lm) , mHistoryMgr(historyMgr) p3LinkMgr *lm, p3HistoryMgr *historyMgr,
p3GxsMails& gxsMailService ) :
DistributedChatService(getServiceInfo().mServiceType, sc, historyMgr,pids),
mChatMtx("p3ChatService"), mServiceCtrl(sc), mLinkMgr(lm),
mHistoryMgr(historyMgr), _own_avatar(NULL),
_serializer(new RsChatSerialiser()), mGxsTransport(gxsMailService),
recentlyReceivedMutex("p3ChatService recently received mutex")
{ {
_serializer = new RsChatSerialiser() ;
_own_avatar = NULL ;
_custom_status_string = "" ;
addSerialType(_serializer); addSerialType(_serializer);
mGxsTransport.registerGxsMailsClient( GxsMailSubServices::P3_CHAT_SERVICE,
this );
} }
const std::string CHAT_APP_NAME = "chat";
const uint16_t CHAT_APP_MAJOR_VERSION = 1;
const uint16_t CHAT_APP_MINOR_VERSION = 0;
const uint16_t CHAT_MIN_MAJOR_VERSION = 1;
const uint16_t CHAT_MIN_MINOR_VERSION = 0;
RsServiceInfo p3ChatService::getServiceInfo() RsServiceInfo p3ChatService::getServiceInfo()
{ { return RsServiceInfo(RS_SERVICE_TYPE_CHAT, "chat", 1, 0, 1, 0); }
return RsServiceInfo(RS_SERVICE_TYPE_CHAT,
CHAT_APP_NAME,
CHAT_APP_MAJOR_VERSION,
CHAT_APP_MINOR_VERSION,
CHAT_MIN_MAJOR_VERSION,
CHAT_MIN_MINOR_VERSION);
}
int p3ChatService::tick() int p3ChatService::tick()
{ {
if(receivedItems()) if(receivedItems()) receiveChatQueue();
receiveChatQueue();
DistributedChatService::flush(); DistributedChatService::flush();
//DistantChatService::flush() ;
cleanListOfReceivedMessageHashes();
return 0; return 0;
} }
@ -253,10 +243,10 @@ void p3ChatService::clearChatLobby(const ChatId& id)
void p3ChatService::sendChatItem(RsChatItem *item) void p3ChatService::sendChatItem(RsChatItem *item)
{ {
if(DistantChatService::handleOutgoingItem(item)) if(DistantChatService::handleOutgoingItem(item)) return;
return ;
#ifdef CHAT_DEBUG #ifdef CHAT_DEBUG
std::cerr << "p3ChatService::sendChatItem(): sending to " << item->PeerId() << ": interpreted as friend peer id." << std::endl; std::cerr << "p3ChatService::sendChatItem(): sending to " << item->PeerId()
<< ": interpreted as friend peer id." << std::endl;
#endif #endif
sendItem(item); sendItem(item);
} }
@ -338,12 +328,6 @@ bool p3ChatService::sendChat(ChatId destination, std::string msg)
if(!isOnline(vpid)) if(!isOnline(vpid))
{ {
/* peer is offline, add to outgoing list */
{
RS_STACK_MUTEX(mChatMtx);
privateOutgoingList.push_back(ci);
}
message.online = false; message.online = false;
RsServer::notify()->notifyChatMessage(message); RsServer::notify()->notifyChatMessage(message);
@ -351,12 +335,39 @@ bool p3ChatService::sendChat(ChatId destination, std::string msg)
// this is not very nice, because the user may think the message was send, while it is still in the queue // this is not very nice, because the user may think the message was send, while it is still in the queue
mHistoryMgr->addMessage(message); mHistoryMgr->addMessage(message);
RsGxsMailId tId = RSRandom::random_u64();
if(destination.isDistantChatId())
{
DEPMap::const_iterator it =
mDistantGxsMap.find(destination.toDistantChatId());
if(it != mDistantGxsMap.end())
{
const DistantEndpoints& de(it->second);
uint32_t sz = ci->serial_size();
std::vector<uint8_t> data; data.resize(sz);
ci->serialise(&data[0], sz);
mGxsTransport.sendMail(tId, GxsMailSubServices::P3_CHAT_SERVICE,
de.from, de.to, &data[0], sz);
}
else
std::cout << "p3ChatService::sendChat(...) can't find distant"
<< "chat id in mDistantGxsMap this is unxpected!"
<< std::endl;
}
// peer is offline, add to outgoing list
{
RS_STACK_MUTEX(mChatMtx);
privateOutgoingMap.insert(outMP::value_type(tId, ci));
}
IndicateConfigChanged(); IndicateConfigChanged();
return false; return false;
} }
{ {
RsStackMutex stack(mChatMtx); /********** STACK LOCKED MTX ******/ RS_STACK_MUTEX(mChatMtx);
std::map<RsPeerId,AvatarInfo*>::iterator it = _avatars.find(vpid); std::map<RsPeerId,AvatarInfo*>::iterator it = _avatars.find(vpid);
if(it == _avatars.end()) if(it == _avatars.end())
@ -505,28 +516,23 @@ class MsgCounter
void p3ChatService::handleIncomingItem(RsItem *item) void p3ChatService::handleIncomingItem(RsItem *item)
{ {
#ifdef CHAT_DEBUG #ifdef CHAT_DEBUG
std::cerr << "p3ChatService::receiveChatQueue() Item:" << (void*)item << std::endl ; std::cerr << "p3ChatService::receiveChatQueue() Item:" << (void*)item
<< std::endl ;
#endif #endif
// RsChatMsgItems needs dynamic_cast, since they have derived siblings. // RsChatMsgItems needs dynamic_cast, since they have derived siblings.
//
RsChatMsgItem* ci = dynamic_cast<RsChatMsgItem*>(item); RsChatMsgItem* ci = dynamic_cast<RsChatMsgItem*>(item);
if(ci != NULL) if(ci)
{ {
handleRecvChatMsgItem(ci); handleRecvChatMsgItem(ci);
if(ci) /* +ci+ deletion is handled by handleRecvChatMsgItem ONLY in some
* specific cases, in case +ci+ has not been handled deleted it here */
delete ci ; delete ci ;
return ; // don't delete! It's handled by handleRecvChatMsgItem in some specific cases only. return;
} }
// if(DistantChatService::handleRecvItem(dynamic_cast<RsChatItem*>(item)))
// {
// delete item ;
// return ;
// }
if(DistributedChatService::handleRecvItem(dynamic_cast<RsChatItem*>(item))) if(DistributedChatService::handleRecvItem(dynamic_cast<RsChatItem*>(item)))
{ {
delete item; delete item;
@ -535,15 +541,20 @@ void p3ChatService::handleIncomingItem(RsItem *item)
switch(item->PacketSubType()) switch(item->PacketSubType())
{ {
case RS_PKT_SUBTYPE_CHAT_STATUS: handleRecvChatStatusItem(dynamic_cast<RsChatStatusItem*>(item)) ; break ; case RS_PKT_SUBTYPE_CHAT_STATUS:
case RS_PKT_SUBTYPE_CHAT_AVATAR: handleRecvChatAvatarItem(dynamic_cast<RsChatAvatarItem*>(item)) ; break ; handleRecvChatStatusItem(dynamic_cast<RsChatStatusItem*>(item));
break;
case RS_PKT_SUBTYPE_CHAT_AVATAR:
handleRecvChatAvatarItem(dynamic_cast<RsChatAvatarItem*>(item));
break;
default: default:
{ {
static int already = false; static int already = false;
if(!already) if(!already)
{ {
std::cerr << "Unhandled item subtype " << (int)item->PacketSubType() << " in p3ChatService: " << std::endl; std::cerr << "Unhandled item subtype "
<< static_cast<int>(item->PacketSubType())
<< " in p3ChatService: " << std::endl;
already = true; already = true;
} }
} }
@ -676,35 +687,105 @@ bool p3ChatService::checkForMessageSecurity(RsChatMsgItem *ci)
return true ; return true ;
} }
bool p3ChatService::initiateDistantChatConnexion(
const RsGxsId& to_gxs_id, const RsGxsId& from_gxs_id,
DistantChatPeerId& pid, uint32_t& error_code )
{
if(DistantChatService::initiateDistantChatConnexion( to_gxs_id,
from_gxs_id, pid,
error_code ))
{
DistantEndpoints ep; ep.from = from_gxs_id; ep.to = to_gxs_id;
mDistantGxsMap.insert(DEPMap::value_type(pid, ep));
return true;
}
return false;
}
bool p3ChatService::receiveGxsMail(const RsGxsMailItem&, const uint8_t* data, uint32_t dataSize)
{
RsChatMsgItem* item = new RsChatMsgItem( const_cast<uint8_t*>(data),
dataSize );
handleRecvChatMsgItem(item);
delete item;
return true;
}
bool p3ChatService::notifySendMailStatus(const RsGxsMailItem& originalMessage, GxsMailStatus status)
{
if ( status != GxsMailStatus::RECEIPT_RECEIVED ) return true;
bool changed = false;
{
RS_STACK_MUTEX(mChatMtx);
auto it = privateOutgoingMap.find(originalMessage.mailId);
if( it != privateOutgoingMap.end() )
{
privateOutgoingMap.erase(it);
changed = true;
}
}
if(changed)
{
RsServer::notify()->notifyListChange(
NOTIFY_LIST_PRIVATE_OUTGOING_CHAT, NOTIFY_TYPE_DEL );
IndicateConfigChanged();
}
return true;
}
bool p3ChatService::handleRecvChatMsgItem(RsChatMsgItem *& ci) bool p3ChatService::handleRecvChatMsgItem(RsChatMsgItem *& ci)
{ {
time_t now = time(NULL); time_t now = time(NULL);
{ // Check for duplicates
uint32_t sz = ci->serial_size();
std::vector<uint8_t> srz; srz.resize(sz);
ci->serialise(&srz[0], sz);
Sha1CheckSum hash = RsDirUtil::sha1sum(&srz[0], sz);
{
RS_STACK_MUTEX(recentlyReceivedMutex);
if( mRecentlyReceivedMessageHashes.find(hash) !=
mRecentlyReceivedMessageHashes.end() )
{
std::cerr << "p3ChatService::handleRecvChatMsgItem(...) (II) "
<< "receiving distant message of hash " << hash
<< " more than once. Probably it has arrived before "
<< "by other means." << std::endl;
delete ci; ci=NULL;
return true;
}
mRecentlyReceivedMessageHashes[hash] = now;
}
}
std::string name; std::string name;
uint32_t popupChatFlag = RS_POPUP_CHAT; uint32_t popupChatFlag = RS_POPUP_CHAT;
{ {
RsStackMutex stack(mChatMtx); /********** STACK LOCKED MTX ******/ RS_STACK_MUTEX(mChatMtx);
if(!locked_checkAndRebuildPartialMessage(ci)) // we make sure this call does not take control over the memory // we make sure this call does not take control over the memory
return true ; // message is a subpart of an existing message. So everything ok, but we need to return. if(!locked_checkAndRebuildPartialMessage(ci)) return true;
/* message is a subpart of an existing message.
* So everything ok, but we need to return. */
} }
// Check for security. This avoids bombing messages, and so on. // Check for security. This avoids bombing messages, and so on.
if(!checkForMessageSecurity(ci)) return false;
if(!checkForMessageSecurity(ci)) /* If it's a lobby item, we need to bounce it and possibly check for timings
return false ; * etc. */
if(!DistributedChatService::handleRecvChatLobbyMsgItem(ci)) return false;
// If it's a lobby item, we need to bounce it and possibly check for timings etc.
if(!DistributedChatService::handleRecvChatLobbyMsgItem(ci))
return false ;
#ifdef CHAT_DEBUG #ifdef CHAT_DEBUG
std::cerr << "p3ChatService::receiveChatQueue() Item:"; std::cerr << "p3ChatService::receiveChatQueue() Item:" << std::endl;
std::cerr << std::endl;
ci->print(std::cerr); ci->print(std::cerr);
std::cerr << std::endl; std::cerr << std::endl << "Got msg. Flags = " << ci->chatFlags << std::endl;
std::cerr << "Got msg. Flags = " << ci->chatFlags << std::endl ;
#endif #endif
// Now treat normal chat stuff such as avatar requests, except for chat lobbies. // Now treat normal chat stuff such as avatar requests, except for chat lobbies.
@ -1087,6 +1168,25 @@ RsChatStatusItem *p3ChatService::makeOwnCustomStateStringItem()
return ci ; return ci ;
} }
void p3ChatService::cleanListOfReceivedMessageHashes()
{
RS_STACK_MUTEX(recentlyReceivedMutex);
time_t now = time(NULL);
for( auto it = mRecentlyReceivedMessageHashes.begin();
it != mRecentlyReceivedMessageHashes.end(); )
if( now > RECENTLY_RECEIVED_INTERVAL + it->second )
{
std::cerr << "p3MsgService(): cleanListOfReceivedMessageHashes("
<< "). Removing old hash " << it->first << ", aged "
<< now - it->second << " secs ago." << std::endl;
it = mRecentlyReceivedMessageHashes.erase(it);
}
else ++it;
}
RsChatAvatarItem *p3ChatService::makeOwnAvatarItem() RsChatAvatarItem *p3ChatService::makeOwnAvatarItem()
{ {
RsStackMutex stack(mChatMtx); /********** STACK LOCKED MTX ******/ RsStackMutex stack(mChatMtx); /********** STACK LOCKED MTX ******/
@ -1144,19 +1244,34 @@ bool p3ChatService::loadList(std::list<RsItem*>& load)
for(std::list<RsItem*>::const_iterator it(load.begin());it!=load.end();++it) for(std::list<RsItem*>::const_iterator it(load.begin());it!=load.end();++it)
{ {
if(PrivateOugoingMapItem* om=dynamic_cast<PrivateOugoingMapItem*>(*it))
{
RS_STACK_MUTEX(mChatMtx);
for( auto& pair : om->store )
{
privateOutgoingMap.insert(
outMP::value_type(pair.first,
new RsChatMsgItem(pair.second)) );
}
delete om; continue;
}
RsChatAvatarItem *ai = NULL ; RsChatAvatarItem *ai = NULL ;
if(NULL != (ai = dynamic_cast<RsChatAvatarItem *>(*it))) if(NULL != (ai = dynamic_cast<RsChatAvatarItem *>(*it)))
{ {
RsStackMutex stack(mChatMtx); /********** STACK LOCKED MTX ******/ RS_STACK_MUTEX(mChatMtx);
if(ai->image_size <= MAX_AVATAR_JPEG_SIZE) if(ai->image_size <= MAX_AVATAR_JPEG_SIZE)
_own_avatar = new AvatarInfo(ai->image_data,ai->image_size) ; _own_avatar = new AvatarInfo(ai->image_data,ai->image_size) ;
else else
std::cerr << "Dropping avatar image, because its size is " << ai->image_size << ", and the maximum allowed size is " << MAX_AVATAR_JPEG_SIZE << std::endl; std::cerr << "Dropping avatar image, because its size is "
<< ai->image_size << ", and the maximum allowed size "
<< "is " << MAX_AVATAR_JPEG_SIZE << std::endl;
delete *it; delete *it;
continue; continue;
} }
@ -1164,40 +1279,42 @@ bool p3ChatService::loadList(std::list<RsItem*>& load)
if(NULL != (mitem = dynamic_cast<RsChatStatusItem *>(*it))) if(NULL != (mitem = dynamic_cast<RsChatStatusItem *>(*it)))
{ {
RsStackMutex stack(mChatMtx); /********** STACK LOCKED MTX ******/ RS_STACK_MUTEX(mChatMtx);
_custom_status_string = mitem->status_string ; _custom_status_string = mitem->status_string ;
delete *it; delete *it;
continue; continue;
} }
RsPrivateChatMsgConfigItem *citem = NULL ; /* TODO: G10h4ck 2017/02/27 this block is kept for retrocompatibility,
* and will be used just first time, to load messages in the old format
if(NULL != (citem = dynamic_cast<RsPrivateChatMsgConfigItem *>(*it))) * should be removed in the following RS version */
if( RsPrivateChatMsgConfigItem *citem =
dynamic_cast<RsPrivateChatMsgConfigItem *>(*it) )
{ {
RsStackMutex stack(mChatMtx); /********** STACK LOCKED MTX ******/ RS_STACK_MUTEX(mChatMtx);
if (citem->chatFlags & RS_CHAT_FLAG_PRIVATE) { if ( citem->chatFlags & RS_CHAT_FLAG_PRIVATE )
if (std::find(ssl_peers.begin(), ssl_peers.end(), citem->configPeerId) != ssl_peers.end()) { {
if ( std::find(ssl_peers.begin(), ssl_peers.end(),
citem->configPeerId) != ssl_peers.end() )
{
RsChatMsgItem *ci = new RsChatMsgItem(); RsChatMsgItem *ci = new RsChatMsgItem();
citem->get(ci); citem->get(ci);
if (citem->configFlags & RS_CHATMSG_CONFIGFLAG_INCOMING) { if (citem->configFlags & RS_CHATMSG_CONFIGFLAG_INCOMING)
{
locked_storeIncomingMsg(ci); locked_storeIncomingMsg(ci);
} else {
privateOutgoingList.push_back(ci);
} }
} else { else privateOutgoingMap.insert(
// no friends outMP::value_type(RSRandom::random_u64(), ci) );
} }
} else { else { /* no friends */ }
// ignore all other items
} }
else { /* ignore all other items */ }
delete *it; delete *it;
continue; continue;
} }
@ -1228,22 +1345,19 @@ bool p3ChatService::saveList(bool& cleanup, std::list<RsItem*>& list)
mChatMtx.lock(); /****** MUTEX LOCKED *******/ mChatMtx.lock(); /****** MUTEX LOCKED *******/
PrivateOugoingMapItem* om = new PrivateOugoingMapItem;
typedef std::map<uint64_t, RsChatMsgItem>::value_type vT;
for( auto& pair : privateOutgoingMap )
om->store.insert(vT(pair.first, *pair.second));
list.push_back(om);
RsChatStatusItem *di = new RsChatStatusItem ; RsChatStatusItem *di = new RsChatStatusItem ;
di->status_string = _custom_status_string ; di->status_string = _custom_status_string ;
di->flags = RS_CHAT_FLAG_CUSTOM_STATE ; di->flags = RS_CHAT_FLAG_CUSTOM_STATE ;
list.push_back(di); list.push_back(di);
/* save outgoing private chat messages */
std::list<RsChatMsgItem *>::iterator it;
for (it = privateOutgoingList.begin(); it != privateOutgoingList.end(); it++) {
RsPrivateChatMsgConfigItem *ci = new RsPrivateChatMsgConfigItem;
ci->set(*it, (*it)->PeerId(), 0);
list.push_back(ci);
}
DistributedChatService::addToSaveList(list) ; DistributedChatService::addToSaveList(list) ;
DistantChatService::addToSaveList(list) ; DistantChatService::addToSaveList(list) ;
@ -1269,8 +1383,8 @@ RsSerialiser *p3ChatService::setupSerialiser()
void p3ChatService::statusChange(const std::list<pqiServicePeer> &plist) void p3ChatService::statusChange(const std::list<pqiServicePeer> &plist)
{ {
std::list<pqiServicePeer>::const_iterator it; for (auto it = plist.cbegin(); it != plist.cend(); ++it)
for (it = plist.begin(); it != plist.end(); ++it) { {
if (it->actions & RS_SERVICE_PEER_CONNECTED) if (it->actions & RS_SERVICE_PEER_CONNECTED)
{ {
/* send the saved outgoing messages */ /* send the saved outgoing messages */
@ -1278,45 +1392,42 @@ void p3ChatService::statusChange(const std::list<pqiServicePeer> &plist)
std::vector<RsChatMsgItem*> to_send; std::vector<RsChatMsgItem*> to_send;
if (privateOutgoingList.size())
{ {
RsStackMutex stack(mChatMtx); /********** STACK LOCKED MTX ******/ RS_STACK_MUTEX(mChatMtx);
RsPeerId ownId = mServiceCtrl->getOwnId(); for( auto cit = privateOutgoingMap.begin();
cit != privateOutgoingMap.end(); )
std::list<RsChatMsgItem *>::iterator cit = privateOutgoingList.begin(); {
while (cit != privateOutgoingList.end()) { RsChatMsgItem *c = cit->second;
RsChatMsgItem *c = *cit; if (c->PeerId() == it->id)
{
if (c->PeerId() == it->id) {
//mHistoryMgr->addMessage(false, c->PeerId(), ownId, c); //mHistoryMgr->addMessage(false, c->PeerId(), ownId, c);
to_send.push_back(c) ; to_send.push_back(c) ;
changed = true; changed = true;
cit = privateOutgoingMap.erase(cit);
cit = privateOutgoingList.erase(cit);
continue; continue;
} }
++cit; ++cit;
} }
} /* UNLOCKED */ }
for(uint32_t i=0;i<to_send.size();++i) for(auto toIt = to_send.begin(); toIt != to_send.end(); ++toIt)
{ {
ChatMessage message; ChatMessage message;
initChatMessage(to_send[i], message); initChatMessage(*toIt, message);
message.incoming = false; message.incoming = false;
message.online = true; message.online = true;
RsServer::notify()->notifyChatMessage(message); RsServer::notify()->notifyChatMessage(message);
checkSizeAndSendMessage(to_send[i]); // delete item checkSizeAndSendMessage(*toIt); // delete item
} }
if (changed) { if (changed)
RsServer::notify()->notifyListChange(NOTIFY_LIST_PRIVATE_OUTGOING_CHAT, NOTIFY_TYPE_DEL); {
RsServer::notify()->notifyListChange(
NOTIFY_LIST_PRIVATE_OUTGOING_CHAT, NOTIFY_TYPE_DEL);
IndicateConfigChanged(); IndicateConfigChanged();
} }
@ -1326,14 +1437,13 @@ void p3ChatService::statusChange(const std::list<pqiServicePeer> &plist)
/* now handle remove */ /* now handle remove */
mHistoryMgr->clear(ChatId(it->id)); mHistoryMgr->clear(ChatId(it->id));
std::list<RsChatMsgItem *>::iterator cit = privateOutgoingList.begin(); RS_STACK_MUTEX(mChatMtx);
while (cit != privateOutgoingList.end()) { for ( auto cit = privateOutgoingMap.begin();
RsChatMsgItem *c = *cit; cit != privateOutgoingMap.end(); )
if (c->PeerId() == it->id) { {
cit = privateOutgoingList.erase(cit); RsChatMsgItem *c = cit->second;
continue; if (c->PeerId() == it->id) cit = privateOutgoingMap.erase(cit);
} else ++cit;
++cit;
} }
IndicateConfigChanged(); IndicateConfigChanged();
} }

View File

@ -37,6 +37,8 @@
#include "chat/distantchat.h" #include "chat/distantchat.h"
#include "chat/distributedchat.h" #include "chat/distributedchat.h"
#include "retroshare/rsmsgs.h" #include "retroshare/rsmsgs.h"
#include "services/p3gxsmails.h"
#include "util/rsdeprecate.h"
class p3ServiceControl; class p3ServiceControl;
class p3LinkMgr; class p3LinkMgr;
@ -51,10 +53,12 @@ typedef RsPeerId ChatLobbyVirtualPeerId ;
* This service uses rsnotify (callbacks librs clients (e.g. rs-gui)) * This service uses rsnotify (callbacks librs clients (e.g. rs-gui))
* @see NotifyBase * @see NotifyBase
*/ */
class p3ChatService: public p3Service, public DistantChatService, public DistributedChatService, public p3Config, public pqiServiceMonitor struct p3ChatService :
p3Service, DistantChatService, DistributedChatService, p3Config,
pqiServiceMonitor, GxsMailsClient
{ {
public: p3ChatService( p3ServiceControl *cs, p3IdService *pids,p3LinkMgr *cm,
p3ChatService(p3ServiceControl *cs, p3IdService *pids,p3LinkMgr *cm, p3HistoryMgr *historyMgr); p3HistoryMgr *historyMgr, p3GxsMails& gxsMailService );
virtual RsServiceInfo getServiceInfo(); virtual RsServiceInfo getServiceInfo();
@ -161,6 +165,20 @@ public:
*/ */
bool clearPrivateChatQueue(bool incoming, const RsPeerId &id); bool clearPrivateChatQueue(bool incoming, const RsPeerId &id);
virtual bool initiateDistantChatConnexion( const RsGxsId& to_gxs_id,
const RsGxsId& from_gxs_id,
DistantChatPeerId &pid,
uint32_t& error_code );
/// @see GxsMailsClient::receiveGxsMail(...)
virtual bool receiveGxsMail( const RsGxsMailItem& /*originalMessage*/,
const uint8_t* data, uint32_t dataSize );
/// @see GxsMailsClient::notifySendMailStatus(...)
virtual bool notifySendMailStatus( const RsGxsMailItem& originalMessage,
GxsMailStatus status );
protected: protected:
/************* from p3Config *******************/ /************* from p3Config *******************/
virtual RsSerialiser *setupSerialiser() ; virtual RsSerialiser *setupSerialiser() ;
@ -177,8 +195,9 @@ protected:
/// This is to be used by subclasses/parents to call IndicateConfigChanged() /// This is to be used by subclasses/parents to call IndicateConfigChanged()
virtual void triggerConfigSave() { IndicateConfigChanged() ; } virtual void triggerConfigSave() { IndicateConfigChanged() ; }
/// Same, for storing messages in incoming list /// Same, for storing messages in incoming list
virtual void locked_storeIncomingMsg(RsChatMsgItem *) ; RS_DEPRECATED virtual void locked_storeIncomingMsg(RsChatMsgItem *) ;
private: private:
RsMutex mChatMtx; RsMutex mChatMtx;
@ -231,7 +250,9 @@ private:
p3LinkMgr *mLinkMgr; p3LinkMgr *mLinkMgr;
p3HistoryMgr *mHistoryMgr; p3HistoryMgr *mHistoryMgr;
std::list<RsChatMsgItem *> privateOutgoingList; // messages waiting to be send when peer comes online /// messages waiting to be send when peer comes online
typedef std::map<uint64_t, RsChatMsgItem *> outMP;
outMP privateOutgoingMap;
AvatarInfo *_own_avatar ; AvatarInfo *_own_avatar ;
std::map<RsPeerId,AvatarInfo *> _avatars ; std::map<RsPeerId,AvatarInfo *> _avatars ;
@ -241,6 +262,18 @@ private:
std::map<RsPeerId,StateStringInfo> _state_strings ; std::map<RsPeerId,StateStringInfo> _state_strings ;
RsChatSerialiser *_serializer; RsChatSerialiser *_serializer;
struct DistantEndpoints { RsGxsId from; RsGxsId to; };
typedef std::map<DistantChatPeerId, DistantEndpoints> DEPMap;
DEPMap mDistantGxsMap;
p3GxsMails& mGxsTransport;
/** As we have multiple backends duplicates are possible, keep track of
* recently received messages hashes for at least 2h to avoid them */
const static uint32_t RECENTLY_RECEIVED_INTERVAL = 2*3600;
std::map<Sha1CheckSum, uint32_t> mRecentlyReceivedMessageHashes;
RsMutex recentlyReceivedMutex;
void cleanListOfReceivedMessageHashes();
}; };
class p3ChatService::StateStringInfo class p3ChatService::StateStringInfo

View File

@ -75,6 +75,7 @@ const uint8_t RS_PKT_SUBTYPE_CHAT_LOBBY_SIGNED_MSG = 0x17 ;
const uint8_t RS_PKT_SUBTYPE_CHAT_LOBBY_SIGNED_EVENT = 0x18 ; const uint8_t RS_PKT_SUBTYPE_CHAT_LOBBY_SIGNED_EVENT = 0x18 ;
const uint8_t RS_PKT_SUBTYPE_CHAT_LOBBY_LIST = 0x19 ; const uint8_t RS_PKT_SUBTYPE_CHAT_LOBBY_LIST = 0x19 ;
const uint8_t RS_PKT_SUBTYPE_CHAT_LOBBY_INVITE = 0x1A ; const uint8_t RS_PKT_SUBTYPE_CHAT_LOBBY_INVITE = 0x1A ;
const uint8_t RS_PKT_SUBTYPE_OUTGOING_MAP = 0x1B ;
typedef uint64_t ChatLobbyId ; typedef uint64_t ChatLobbyId ;
typedef uint64_t ChatLobbyMsgId ; typedef uint64_t ChatLobbyMsgId ;
@ -286,9 +287,8 @@ class RsChatLobbyInviteItem: public RsChatItem
* For saving incoming and outgoing chat msgs * For saving incoming and outgoing chat msgs
* @see p3ChatService * @see p3ChatService
*/ */
class RsPrivateChatMsgConfigItem: public RsChatItem struct RsPrivateChatMsgConfigItem : RsChatItem
{ {
public:
RsPrivateChatMsgConfigItem() :RsChatItem(RS_PKT_SUBTYPE_PRIVATECHATMSG_CONFIG) {} RsPrivateChatMsgConfigItem() :RsChatItem(RS_PKT_SUBTYPE_PRIVATECHATMSG_CONFIG) {}
RsPrivateChatMsgConfigItem(void *data,uint32_t size) ; // deserialization RsPrivateChatMsgConfigItem(void *data,uint32_t size) ; // deserialization
@ -296,8 +296,11 @@ class RsPrivateChatMsgConfigItem: public RsChatItem
virtual void clear() {} virtual void clear() {}
virtual std::ostream& print(std::ostream &out, uint16_t indent = 0); virtual std::ostream& print(std::ostream &out, uint16_t indent = 0);
virtual bool serialise(void *data,uint32_t& size) ; // Isn't it better that items can serialize themselves ? virtual bool serialise(void *data,uint32_t& size);
virtual uint32_t serial_size() ; // deserialise is handled using a constructor virtual uint32_t serial_size();
/* Deserialize is handled using a constructor,it would be better have a
* deserialize method as constructor cannot fails while deserialization can.
*/
/* set data from RsChatMsgItem to RsPrivateChatMsgConfigItem */ /* set data from RsChatMsgItem to RsPrivateChatMsgConfigItem */
void set(RsChatMsgItem *ci, const RsPeerId &peerId, uint32_t confFlags); void set(RsChatMsgItem *ci, const RsPeerId &peerId, uint32_t confFlags);
@ -413,6 +416,126 @@ class RsChatDHPublicKeyItem: public RsChatItem
const RsChatDHPublicKeyItem& operator=(const RsChatDHPublicKeyItem&) { return *this ;} const RsChatDHPublicKeyItem& operator=(const RsChatDHPublicKeyItem&) { return *this ;}
}; };
struct PrivateOugoingMapItem : RsChatItem
{
PrivateOugoingMapItem() : RsChatItem(RS_PKT_SUBTYPE_OUTGOING_MAP) {}
uint32_t serial_size()
{
uint32_t s = 8; /* header */
s += 4; // number of entries
for( auto entry : store )
{
s += 8; // key size
s += entry.second.serial_size();
}
return s;
}
bool serialise(void* data, uint32_t& pktsize)
{
uint32_t tlvsize = serial_size();
uint32_t offset = 0;
if (pktsize < tlvsize) return false; /* not enough space */
pktsize = tlvsize;
bool ok = true;
ok = ok && setRsItemHeader(data, tlvsize, PacketId(), tlvsize)
&& (offset += 8);
ok = ok && setRawUInt32(data, tlvsize, &offset, store.size());
for( auto entry : store )
{
ok = ok && setRawUInt64(data, tlvsize, &offset, entry.first);
uint8_t* hdrPtr = static_cast<uint8_t*>(data) + offset;
uint32_t tmpsize = entry.second.serial_size();
ok = ok && entry.second.serialise(hdrPtr, tmpsize);
}
if (offset != tlvsize)
{
ok = false;
std::cerr << "PrivateOugoingMapItem::serialise() Size Error!"
<< std::endl;
}
return ok;
}
PrivateOugoingMapItem* deserialise(const uint8_t* data, uint32_t& pktsize)
{
/* get the type and size */
uint8_t* dataPtr = const_cast<uint8_t*>(data);
uint32_t rstype = getRsItemId(dataPtr);
uint32_t rssize = getRsItemSize(dataPtr);
uint32_t offset = 0;
if ((RS_PKT_VERSION_SERVICE != getRsItemVersion(rstype)) ||
(RS_SERVICE_TYPE_CHAT != getRsItemService(rstype)) ||
(RS_PKT_SUBTYPE_OUTGOING_MAP != getRsItemSubType(rstype))
) return NULL; /* wrong type */
if (pktsize < rssize) return NULL; /* check size not enough data */
/* set the packet length */
pktsize = rssize;
bool ok = true;
/* ready to load */
PrivateOugoingMapItem* item = new PrivateOugoingMapItem();
/* skip the header */
offset += 8;
// get map size first */
uint32_t s = 0;
ok = ok && getRawUInt32(dataPtr, rssize, &offset, &s);
for(uint32_t i=0; i<s && ok; ++i)
{
uint64_t msgId;
ok = ok && getRawUInt64(dataPtr, rssize, &offset, &msgId);
uint8_t* hdrPtr = const_cast<uint8_t*>(data); hdrPtr += offset;
uint32_t tmpSize = getRsItemSize(hdrPtr);
RsChatMsgItem msgItem(hdrPtr, tmpSize);
item->store.insert(std::make_pair(msgId, msgItem));
}
if (offset != rssize)
{
/* error */
std::cerr << "(EE) size error in packet deserialisation: p3MsgItem,"
<< " subtype " << getRsItemSubType(rstype) << ". offset="
<< offset << " != rssize=" << rssize << std::endl;
delete item;
return NULL;
}
if (!ok)
{
std::cerr << "(EE) size error in packet deserialisation: p3MsgItem,"
<< " subtype " << getRsItemSubType(rstype) << std::endl;
delete item;
return NULL;
}
return item;
}
virtual std::ostream& print(std::ostream &out, uint16_t /*indent*/ = 0)
{ return out << "PrivateOugoingMapItem store size: " << store.size(); }
std::map<uint64_t, RsChatMsgItem> store;
};
class RsChatSerialiser: public RsSerialType class RsChatSerialiser: public RsSerialType
{ {
public: public:

View File

@ -1511,7 +1511,8 @@ int RsServer::StartupRetroShare()
mDisc = new p3discovery2(mPeerMgr, mLinkMgr, mNetMgr, serviceCtrl); mDisc = new p3discovery2(mPeerMgr, mLinkMgr, mNetMgr, serviceCtrl);
mHeart = new p3heartbeat(serviceCtrl, pqih); mHeart = new p3heartbeat(serviceCtrl, pqih);
msgSrv = new p3MsgService( serviceCtrl, mGxsIdService, *mGxsMails ); msgSrv = new p3MsgService( serviceCtrl, mGxsIdService, *mGxsMails );
chatSrv = new p3ChatService(serviceCtrl,mGxsIdService, mLinkMgr, mHistoryMgr); chatSrv = new p3ChatService( serviceCtrl,mGxsIdService, mLinkMgr,
mHistoryMgr, *mGxsMails );
mStatusSrv = new p3StatusService(serviceCtrl); mStatusSrv = new p3StatusService(serviceCtrl);
#ifdef ENABLE_GROUTER #ifdef ENABLE_GROUTER

View File

@ -31,7 +31,8 @@ enum class GxsMailSubServices : uint16_t
{ {
UNKNOWN = 0, UNKNOWN = 0,
TEST_SERVICE = 1, TEST_SERVICE = 1,
P3_MSG_SERVICE = 2 P3_MSG_SERVICE = 2,
P3_CHAT_SERVICE = 3
}; };
/// Values must fit into uint8_t /// Values must fit into uint8_t