fixed p3Gxstunnel so that it sends the data packets in the same order it received them. This fixes the bug in distant chat causing images to not transfer correctly in some cases

This commit is contained in:
csoler 2017-07-15 22:16:45 +02:00
parent 9c391cb015
commit 08faa3f5d5
4 changed files with 69 additions and 24 deletions

View File

@ -50,6 +50,19 @@
//#define DEBUG_DISTANT_CHAT
#ifdef DEBUG_DISTANT_CHAT
#include <sys/time.h>
uint32_t msecs_of_day()
{
timeval tv ;
gettimeofday(&tv,NULL) ;
return tv.tv_usec / 1000 ;
}
#define DISTANT_CHAT_DEBUG() std::cerr << time(NULL) << "." << std::setfill('0') << std::setw(3) << msecs_of_day() << " : DISTANT_CHAT : " << __FUNCTION__ << " : "
#endif
static const uint32_t DISTANT_CHAT_KEEP_ALIVE_TIMEOUT = 6 ; // send keep alive packet so as to avoid tunnel breaks.
static const uint32_t RS_DISTANT_CHAT_DH_STATUS_UNINITIALIZED = 0x0000 ;
@ -86,7 +99,7 @@ bool DistantChatService::handleOutgoingItem(RsChatItem *item)
}
#ifdef CHAT_DEBUG
std::cerr << "p3ChatService::handleOutgoingItem(): sending to " << item->PeerId() << ": interpreted as a distant chat virtual peer id." << std::endl;
DISTANT_CHAT_DEBUG() << "p3ChatService::handleOutgoingItem(): sending to " << item->PeerId() << ": interpreted as a distant chat virtual peer id." << std::endl;
#endif
uint32_t size = RsChatSerialiser().size(item) ;
@ -98,7 +111,9 @@ bool DistantChatService::handleOutgoingItem(RsChatItem *item)
return false;
}
#ifdef DEBUG_DISTANT_CHAT
std::cerr << " sending: " << RsUtil::BinToHex(mem,size) << std::endl;
DISTANT_CHAT_DEBUG() << " sending: " << RsUtil::BinToHex(mem,size,100) << std::endl;
DISTANT_CHAT_DEBUG() << " size: " << std::dec << size << std::endl;
DISTANT_CHAT_DEBUG() << " hash: " << RsDirUtil::sha1sum(mem,size) << std::endl;
#endif
mGxsTunnels->sendData( RsGxsTunnelId(item->PeerId()),DISTANT_CHAT_GXS_TUNNEL_SERVICE_ID,mem,size);
@ -110,7 +125,7 @@ void DistantChatService::handleRecvChatStatusItem(RsChatStatusItem *cs)
if(cs->flags & RS_CHAT_FLAG_CONNEXION_REFUSED)
{
#ifdef DEBUG_DISTANT_CHAT
std::cerr << "(II) Distant chat: received notification that peer refuses conversation." << std::endl;
DISTANT_CHAT_DEBUG() << "(II) Distant chat: received notification that peer refuses conversation." << std::endl;
#endif
RsServer::notify()->notifyChatStatus(ChatId(DistantChatPeerId(cs->PeerId())),"Connexion refused by distant peer!") ;
}
@ -140,7 +155,7 @@ bool DistantChatService::acceptDataFromPeer(const RsGxsId& gxs_id,const RsGxsTun
if(!res)
{
#ifdef DEBUG_DISTANT_CHAT
std::cerr << "(II) refusing distant chat from peer " << gxs_id << ". Sending a notification back to tunnel " << tunnel_id << std::endl;
DISTANT_CHAT_DEBUG() << "(II) refusing distant chat from peer " << gxs_id << ". Sending a notification back to tunnel " << tunnel_id << std::endl;
#endif
RsChatStatusItem *item = new RsChatStatusItem ;
item->flags = RS_CHAT_FLAG_CONNEXION_REFUSED ;
@ -158,7 +173,11 @@ bool DistantChatService::acceptDataFromPeer(const RsGxsId& gxs_id,const RsGxsTun
return false;
}
std::cerr << " sending: " << RsUtil::BinToHex(mem,size) << std::endl;
#ifdef DEBUG_DISTANT_CHAT
DISTANT_CHAT_DEBUG() << " sending: " << RsUtil::BinToHex(mem,size,100) << std::endl;
DISTANT_CHAT_DEBUG() << " size: " << std::dec << std::endl;
DISTANT_CHAT_DEBUG() << " hash: " << RsDirUtil::sha1sum(mem,size) << std::endl;
#endif
mGxsTunnels->sendData( RsGxsTunnelId(item->PeerId()),DISTANT_CHAT_GXS_TUNNEL_SERVICE_ID,mem,size);
}
@ -169,7 +188,7 @@ bool DistantChatService::acceptDataFromPeer(const RsGxsId& gxs_id,const RsGxsTun
void DistantChatService::notifyTunnelStatus(const RsGxsTunnelService::RsGxsTunnelId &tunnel_id, uint32_t tunnel_status)
{
#ifdef DEBUG_DISTANT_CHAT
std::cerr << "DistantChatService::notifyTunnelStatus(): got notification " << std::hex << tunnel_status << std::dec << " for tunnel " << tunnel_id << std::endl;
DISTANT_CHAT_DEBUG() << "DistantChatService::notifyTunnelStatus(): got notification " << std::hex << tunnel_status << std::dec << " for tunnel " << tunnel_id << std::endl;
#endif
switch(tunnel_status)
@ -195,9 +214,10 @@ void DistantChatService::notifyTunnelStatus(const RsGxsTunnelService::RsGxsTunne
void DistantChatService::receiveData(const RsGxsTunnelService::RsGxsTunnelId &tunnel_id, unsigned char *data, uint32_t data_size)
{
#ifdef DEBUG_DISTANT_CHAT
std::cerr << "DistantChatService::receiveData(): got data of size " << data_size << " for tunnel " << tunnel_id << std::endl;
std::cerr << " received: " << RsUtil::BinToHex(data,data_size) << std::endl;
std::cerr << " deserialising..." << std::endl;
DISTANT_CHAT_DEBUG() << "DistantChatService::receiveData(): got data of size " << std::dec << data_size << " for tunnel " << tunnel_id << std::endl;
DISTANT_CHAT_DEBUG() << " received: " << RsUtil::BinToHex(data,data_size,100) << std::endl;
DISTANT_CHAT_DEBUG() << " hash: " << RsDirUtil::sha1sum(data,data_size) << std::endl;
DISTANT_CHAT_DEBUG() << " deserialising..." << std::endl;
#endif
// always make the contact up to date. This is useful for server side, which doesn't know about the chat until it
@ -324,7 +344,7 @@ bool DistantChatService::setDistantChatPermissionFlags(uint32_t flags)
{
mDistantChatPermissions = flags ;
#ifdef DEBUG_DISTANT_CHAT
std::cerr << "(II) Changing distant chat permissions to " << flags << ". Existing openned chats will however remain active until closed" << std::endl;
DISTANT_CHAT_DEBUG() << "(II) Changing distant chat permissions to " << flags << ". Existing openned chats will however remain active until closed" << std::endl;
#endif
triggerConfigSave() ;
}
@ -354,7 +374,7 @@ bool DistantChatService::processLoadListItem(const RsItem *item)
if(kit->key == "DISTANT_CHAT_PERMISSION_FLAGS")
{
#ifdef DEBUG_DISTANT_CHAT
std::cerr << "Loaded distant chat permission flags: " << kit->value << std::endl ;
DISTANT_CHAT_DEBUG() << "Loaded distant chat permission flags: " << kit->value << std::endl ;
#endif
if (!kit->value.empty())
{

View File

@ -264,6 +264,10 @@ void p3ChatService::checkSizeAndSendMessage(RsChatMsgItem *msg)
static const uint32_t MAX_STRING_SIZE = 15000 ;
#ifdef CHAT_DEBUG
std::cerr << "Sending message: size=" << msg->message.size() << ", sha1sum=" << RsDirUtil::sha1sum((uint8_t*)msg->message.c_str(),msg->message.size()) << std::endl;
#endif
while(msg->message.size() > MAX_STRING_SIZE)
{
// chop off the first 15000 wchars
@ -278,11 +282,17 @@ void p3ChatService::checkSizeAndSendMessage(RsChatMsgItem *msg)
//
item->chatFlags &= (RS_CHAT_FLAG_PRIVATE | RS_CHAT_FLAG_PUBLIC | RS_CHAT_FLAG_LOBBY) ;
#ifdef CHAT_DEBUG
std::cerr << "Creating slice of size " << item->message.size() << std::endl;
#endif
// Indicate that the message is to be continued.
//
item->chatFlags |= RS_CHAT_FLAG_PARTIAL_MESSAGE ;
sendChatItem(item) ;
}
#ifdef CHAT_DEBUG
std::cerr << "Creating slice of size " << msg->message.size() << std::endl;
#endif
sendChatItem(msg) ;
}
@ -386,7 +396,7 @@ bool p3ChatService::sendChat(ChatId destination, std::string msg)
if(it->second->_own_is_new)
{
#ifdef CHAT_DEBUG
std::cerr << "p3ChatService::sendChat: new avatar never sent to peer " << id << ". Setting <new> flag to packet." << std::endl;
std::cerr << "p3ChatService::sendChat: new avatar never sent to peer " << vpid << ". Setting <new> flag to packet." << std::endl;
#endif
ci->chatFlags |= RS_CHAT_FLAG_AVATAR_AVAILABLE ;
@ -395,7 +405,7 @@ bool p3ChatService::sendChat(ChatId destination, std::string msg)
}
#ifdef CHAT_DEBUG
std::cerr << "Sending msg to (maybe virtual) peer " << id << ", flags = " << ci->chatFlags << std::endl ;
std::cerr << "Sending msg to (maybe virtual) peer " << vpid << ", flags = " << ci->chatFlags << std::endl ;
std::cerr << "p3ChatService::sendChat() Item:";
std::cerr << std::endl;
ci->print(std::cerr);
@ -435,7 +445,7 @@ bool p3ChatService::sendChat(ChatId destination, std::string msg)
if(should_send_state_string)
{
#ifdef CHAT_DEBUG
std::cerr << "own status string is new for peer " << id << ": sending it." << std::endl ;
std::cerr << "own status string is new for peer " << vpid << ": sending it." << std::endl ;
#endif
RsChatStatusItem *cs = makeOwnCustomStateStringItem() ;
cs->PeerId(vpid) ;
@ -493,7 +503,7 @@ bool p3ChatService::locked_checkAndRebuildPartialMessage(RsChatMsgItem *& ci)
else
{
#ifdef CHAT_DEBUG
std::cerr << "Message is complete, using it now." << std::endl;
std::cerr << "Message is complete, using it now. Size = " << ci->message.size() << ", hash=" << RsDirUtil::sha1sum((uint8_t*)ci->message.c_str(),ci->message.size()) << std::endl;
#endif
return true ;
}

View File

@ -74,6 +74,7 @@ p3GxsTunnelService::p3GxsTunnelService(RsGixs *pids)
: mGixs(pids), mGxsTunnelMtx("GXS tunnel")
{
mTurtle = NULL ;
mCurrentPacketCounter = 0 ;
}
void p3GxsTunnelService::connectToTurtleRouter(p3turtle *tr)
@ -104,6 +105,7 @@ int p3GxsTunnelService::tick()
#ifdef DEBUG_GXS_TUNNEL
time_t now = time(NULL);
static time_t last_dump = 0;
if(now > last_dump + INTERVAL_BETWEEN_DEBUG_DUMP )
{
@ -288,7 +290,7 @@ void p3GxsTunnelService::handleIncomingItem(const RsGxsTunnelId& tunnel_id,RsGxs
case RS_PKT_SUBTYPE_GXS_TUNNEL_DATA: handleRecvTunnelDataItem(tunnel_id,dynamic_cast<RsGxsTunnelDataItem*>(item)) ;
break ;
case RS_PKT_SUBTYPE_GXS_TUNNEL_DATA_ACK: handleRecvTunnelDataAckItem(tunnel_id,dynamic_cast<RsGxsTunnelDataAckItem*>(item)) ;
case RS_PKT_SUBTYPE_GXS_TUNNEL_DATA_ACK: handleRecvTunnelDataAckItem(tunnel_id,dynamic_cast<RsGxsTunnelDataAckItem*>(item)) ;
break ;
case RS_PKT_SUBTYPE_GXS_TUNNEL_STATUS: handleRecvStatusItem(tunnel_id,dynamic_cast<RsGxsTunnelStatusItem*>(item)) ;
@ -752,7 +754,7 @@ bool p3GxsTunnelService::handleEncryptedData(const uint8_t *data_bytes,uint32_t
std::cerr << " size = " << data_size << std::endl;
std::cerr << " data = " << (void*)data_bytes << std::endl;
std::cerr << " IV = " << std::hex << *(uint64_t*)data_bytes << std::dec << std::endl;
std::cerr << " data = " << RsUtil::BinToHex((char*)data_bytes,data_size) ;
std::cerr << " data = " << RsUtil::BinToHex((unsigned char*)data_bytes,data_size,100) ;
std::cerr << std::endl;
#endif
@ -793,9 +795,9 @@ bool p3GxsTunnelService::handleEncryptedData(const uint8_t *data_bytes,uint32_t
#ifdef DEBUG_GXS_TUNNEL
std::cerr << " Using IV: " << std::hex << *(uint64_t*)data_bytes << std::dec << std::endl;
std::cerr << " Decrypted buffer size: " << decrypted_size << std::endl;
std::cerr << " key : " << RsUtil::BinToHex((char*)aes_key,GXS_TUNNEL_AES_KEY_SIZE) << std::endl;
std::cerr << " hmac : " << RsUtil::BinToHex((char*)data_bytes+GXS_TUNNEL_ENCRYPTION_IV_SIZE,GXS_TUNNEL_ENCRYPTION_HMAC_SIZE) << std::endl;
std::cerr << " data : " << RsUtil::BinToHex((char*)data_bytes,data_size) << std::endl;
std::cerr << " key : " << RsUtil::BinToHex((unsigned char*)aes_key,GXS_TUNNEL_AES_KEY_SIZE) << std::endl;
std::cerr << " hmac : " << RsUtil::BinToHex((unsigned char*)data_bytes+GXS_TUNNEL_ENCRYPTION_IV_SIZE,GXS_TUNNEL_ENCRYPTION_HMAC_SIZE) << std::endl;
std::cerr << " data : " << RsUtil::BinToHex((unsigned char*)data_bytes,data_size,100) << std::endl;
#endif
// first, check the HMAC
@ -1211,7 +1213,7 @@ bool p3GxsTunnelService::locked_sendClearTunnelData(RsGxsTunnelDHPublicKeyItem *
#ifdef DEBUG_GXS_TUNNEL
std::cerr << " GxsTunnelService::sendClearTunnelData(): Sending clear data to virtual peer: " << item->PeerId() << std::endl;
std::cerr << " gitem->data_size = " << gitem->data_size << std::endl;
std::cerr << " data = " << RsUtil::BinToHex((char*)gitem->data_bytes,gitem->data_size) ;
std::cerr << " data = " << RsUtil::BinToHex((unsigned char*)gitem->data_bytes,gitem->data_size,100) ;
std::cerr << std::endl;
#endif
mTurtle->sendTurtleData(item->PeerId(),gitem) ;
@ -1307,7 +1309,7 @@ bool p3GxsTunnelService::locked_sendEncryptedTunnelData(RsGxsTunnelItem *item)
#ifdef DEBUG_GXS_TUNNEL
std::cerr << "GxsTunnelService::sendEncryptedTunnelData(): Sending encrypted data to virtual peer: " << virtual_peer_id << std::endl;
std::cerr << " gitem->data_size = " << gitem->data_size << std::endl;
std::cerr << " serialised data = " << RsUtil::BinToHex((char*)gitem->data_bytes,gitem->data_size) ;
std::cerr << " serialised data = " << RsUtil::BinToHex((unsigned char*)gitem->data_bytes,gitem->data_size,100) ;
std::cerr << std::endl;
#endif
@ -1346,6 +1348,16 @@ bool p3GxsTunnelService::requestSecuredTunnel(const RsGxsId& to_gxs_id, const Rs
return true ;
}
// This method generates an ID that should be unique for each packet sent to a same peer. Rather than a random value,
// we use this counter because outgoing items are sorted in a map by their counter value. Using an increasing value
// ensures that the packets are sent in the same order than received from the service. This can be useful in some cases,
// for instance when services split their items while expecting them to arrive in the same order.
uint64_t p3GxsTunnelService::locked_getPacketCounter()
{
return mCurrentPacketCounter++ ;
}
bool p3GxsTunnelService::sendData(const RsGxsTunnelId &tunnel_id, uint32_t service_id, const uint8_t *data, uint32_t size)
{
// make sure that the tunnel ID is registered.
@ -1381,7 +1393,7 @@ bool p3GxsTunnelService::sendData(const RsGxsTunnelId &tunnel_id, uint32_t servi
RsGxsTunnelDataItem *item = new RsGxsTunnelDataItem ;
item->unique_item_counter = RSRandom::random_u64(); // this allows to make the item unique, except very rarely, we we don't care.
item->unique_item_counter = locked_getPacketCounter() ;// this allows to make the item unique, while respecting the packet order!
item->flags = 0; // not used yet.
item->service_id = service_id;
item->data_size = size; // encrypted data size

View File

@ -230,7 +230,8 @@ private:
void handleRecvDHPublicKey(RsGxsTunnelDHPublicKeyItem *item) ;
bool locked_sendDHPublicKey(const DH *dh, const RsGxsId& own_gxs_id, const RsPeerId& virtual_peer_id) ;
bool locked_initDHSessionKey(DH *&dh);
uint64_t locked_getPacketCounter();
TurtleVirtualPeerId virtualPeerIdFromHash(const TurtleFileHash& hash) ; // ... and to a hash for p3turtle
// item handling
@ -252,6 +253,8 @@ private:
RsGixs *mGixs ;
RsMutex mGxsTunnelMtx ;
uint64_t mCurrentPacketCounter ;
std::map<uint32_t,RsGxsTunnelClientService*> mRegisteredServices ;
void debug_dump();