From 08faa3f5d54cc133657a07170119f0e4669eab3f Mon Sep 17 00:00:00 2001 From: csoler Date: Sat, 15 Jul 2017 22:16:45 +0200 Subject: [PATCH] fixed p3Gxstunnel so that it sends the data packets in the same order it received them. This fixes the bug in distant chat causing images to not transfer correctly in some cases --- libretroshare/src/chat/distantchat.cc | 42 ++++++++++++++++------ libretroshare/src/chat/p3chatservice.cc | 18 +++++++--- libretroshare/src/gxstunnel/p3gxstunnel.cc | 28 ++++++++++----- libretroshare/src/gxstunnel/p3gxstunnel.h | 5 ++- 4 files changed, 69 insertions(+), 24 deletions(-) diff --git a/libretroshare/src/chat/distantchat.cc b/libretroshare/src/chat/distantchat.cc index ffc2e7b82..26e7f7fe6 100644 --- a/libretroshare/src/chat/distantchat.cc +++ b/libretroshare/src/chat/distantchat.cc @@ -50,6 +50,19 @@ //#define DEBUG_DISTANT_CHAT +#ifdef DEBUG_DISTANT_CHAT + +#include + +uint32_t msecs_of_day() +{ + timeval tv ; + gettimeofday(&tv,NULL) ; + return tv.tv_usec / 1000 ; +} +#define DISTANT_CHAT_DEBUG() std::cerr << time(NULL) << "." << std::setfill('0') << std::setw(3) << msecs_of_day() << " : DISTANT_CHAT : " << __FUNCTION__ << " : " +#endif + static const uint32_t DISTANT_CHAT_KEEP_ALIVE_TIMEOUT = 6 ; // send keep alive packet so as to avoid tunnel breaks. static const uint32_t RS_DISTANT_CHAT_DH_STATUS_UNINITIALIZED = 0x0000 ; @@ -86,7 +99,7 @@ bool DistantChatService::handleOutgoingItem(RsChatItem *item) } #ifdef CHAT_DEBUG - std::cerr << "p3ChatService::handleOutgoingItem(): sending to " << item->PeerId() << ": interpreted as a distant chat virtual peer id." << std::endl; + DISTANT_CHAT_DEBUG() << "p3ChatService::handleOutgoingItem(): sending to " << item->PeerId() << ": interpreted as a distant chat virtual peer id." << std::endl; #endif uint32_t size = RsChatSerialiser().size(item) ; @@ -98,7 +111,9 @@ bool DistantChatService::handleOutgoingItem(RsChatItem *item) return false; } #ifdef DEBUG_DISTANT_CHAT - std::cerr << " sending: " << RsUtil::BinToHex(mem,size) << std::endl; + DISTANT_CHAT_DEBUG() << " sending: " << RsUtil::BinToHex(mem,size,100) << std::endl; + DISTANT_CHAT_DEBUG() << " size: " << std::dec << size << std::endl; + DISTANT_CHAT_DEBUG() << " hash: " << RsDirUtil::sha1sum(mem,size) << std::endl; #endif mGxsTunnels->sendData( RsGxsTunnelId(item->PeerId()),DISTANT_CHAT_GXS_TUNNEL_SERVICE_ID,mem,size); @@ -110,7 +125,7 @@ void DistantChatService::handleRecvChatStatusItem(RsChatStatusItem *cs) if(cs->flags & RS_CHAT_FLAG_CONNEXION_REFUSED) { #ifdef DEBUG_DISTANT_CHAT - std::cerr << "(II) Distant chat: received notification that peer refuses conversation." << std::endl; + DISTANT_CHAT_DEBUG() << "(II) Distant chat: received notification that peer refuses conversation." << std::endl; #endif RsServer::notify()->notifyChatStatus(ChatId(DistantChatPeerId(cs->PeerId())),"Connexion refused by distant peer!") ; } @@ -140,7 +155,7 @@ bool DistantChatService::acceptDataFromPeer(const RsGxsId& gxs_id,const RsGxsTun if(!res) { #ifdef DEBUG_DISTANT_CHAT - std::cerr << "(II) refusing distant chat from peer " << gxs_id << ". Sending a notification back to tunnel " << tunnel_id << std::endl; + DISTANT_CHAT_DEBUG() << "(II) refusing distant chat from peer " << gxs_id << ". Sending a notification back to tunnel " << tunnel_id << std::endl; #endif RsChatStatusItem *item = new RsChatStatusItem ; item->flags = RS_CHAT_FLAG_CONNEXION_REFUSED ; @@ -158,7 +173,11 @@ bool DistantChatService::acceptDataFromPeer(const RsGxsId& gxs_id,const RsGxsTun return false; } - std::cerr << " sending: " << RsUtil::BinToHex(mem,size) << std::endl; +#ifdef DEBUG_DISTANT_CHAT + DISTANT_CHAT_DEBUG() << " sending: " << RsUtil::BinToHex(mem,size,100) << std::endl; + DISTANT_CHAT_DEBUG() << " size: " << std::dec << std::endl; + DISTANT_CHAT_DEBUG() << " hash: " << RsDirUtil::sha1sum(mem,size) << std::endl; +#endif mGxsTunnels->sendData( RsGxsTunnelId(item->PeerId()),DISTANT_CHAT_GXS_TUNNEL_SERVICE_ID,mem,size); } @@ -169,7 +188,7 @@ bool DistantChatService::acceptDataFromPeer(const RsGxsId& gxs_id,const RsGxsTun void DistantChatService::notifyTunnelStatus(const RsGxsTunnelService::RsGxsTunnelId &tunnel_id, uint32_t tunnel_status) { #ifdef DEBUG_DISTANT_CHAT - std::cerr << "DistantChatService::notifyTunnelStatus(): got notification " << std::hex << tunnel_status << std::dec << " for tunnel " << tunnel_id << std::endl; + DISTANT_CHAT_DEBUG() << "DistantChatService::notifyTunnelStatus(): got notification " << std::hex << tunnel_status << std::dec << " for tunnel " << tunnel_id << std::endl; #endif switch(tunnel_status) @@ -195,9 +214,10 @@ void DistantChatService::notifyTunnelStatus(const RsGxsTunnelService::RsGxsTunne void DistantChatService::receiveData(const RsGxsTunnelService::RsGxsTunnelId &tunnel_id, unsigned char *data, uint32_t data_size) { #ifdef DEBUG_DISTANT_CHAT - std::cerr << "DistantChatService::receiveData(): got data of size " << data_size << " for tunnel " << tunnel_id << std::endl; - std::cerr << " received: " << RsUtil::BinToHex(data,data_size) << std::endl; - std::cerr << " deserialising..." << std::endl; + DISTANT_CHAT_DEBUG() << "DistantChatService::receiveData(): got data of size " << std::dec << data_size << " for tunnel " << tunnel_id << std::endl; + DISTANT_CHAT_DEBUG() << " received: " << RsUtil::BinToHex(data,data_size,100) << std::endl; + DISTANT_CHAT_DEBUG() << " hash: " << RsDirUtil::sha1sum(data,data_size) << std::endl; + DISTANT_CHAT_DEBUG() << " deserialising..." << std::endl; #endif // always make the contact up to date. This is useful for server side, which doesn't know about the chat until it @@ -324,7 +344,7 @@ bool DistantChatService::setDistantChatPermissionFlags(uint32_t flags) { mDistantChatPermissions = flags ; #ifdef DEBUG_DISTANT_CHAT - std::cerr << "(II) Changing distant chat permissions to " << flags << ". Existing openned chats will however remain active until closed" << std::endl; + DISTANT_CHAT_DEBUG() << "(II) Changing distant chat permissions to " << flags << ". Existing openned chats will however remain active until closed" << std::endl; #endif triggerConfigSave() ; } @@ -354,7 +374,7 @@ bool DistantChatService::processLoadListItem(const RsItem *item) if(kit->key == "DISTANT_CHAT_PERMISSION_FLAGS") { #ifdef DEBUG_DISTANT_CHAT - std::cerr << "Loaded distant chat permission flags: " << kit->value << std::endl ; + DISTANT_CHAT_DEBUG() << "Loaded distant chat permission flags: " << kit->value << std::endl ; #endif if (!kit->value.empty()) { diff --git a/libretroshare/src/chat/p3chatservice.cc b/libretroshare/src/chat/p3chatservice.cc index 64c4e9534..9b34460e7 100644 --- a/libretroshare/src/chat/p3chatservice.cc +++ b/libretroshare/src/chat/p3chatservice.cc @@ -264,6 +264,10 @@ void p3ChatService::checkSizeAndSendMessage(RsChatMsgItem *msg) static const uint32_t MAX_STRING_SIZE = 15000 ; +#ifdef CHAT_DEBUG + std::cerr << "Sending message: size=" << msg->message.size() << ", sha1sum=" << RsDirUtil::sha1sum((uint8_t*)msg->message.c_str(),msg->message.size()) << std::endl; +#endif + while(msg->message.size() > MAX_STRING_SIZE) { // chop off the first 15000 wchars @@ -278,11 +282,17 @@ void p3ChatService::checkSizeAndSendMessage(RsChatMsgItem *msg) // item->chatFlags &= (RS_CHAT_FLAG_PRIVATE | RS_CHAT_FLAG_PUBLIC | RS_CHAT_FLAG_LOBBY) ; +#ifdef CHAT_DEBUG + std::cerr << "Creating slice of size " << item->message.size() << std::endl; +#endif // Indicate that the message is to be continued. // item->chatFlags |= RS_CHAT_FLAG_PARTIAL_MESSAGE ; sendChatItem(item) ; } +#ifdef CHAT_DEBUG + std::cerr << "Creating slice of size " << msg->message.size() << std::endl; +#endif sendChatItem(msg) ; } @@ -386,7 +396,7 @@ bool p3ChatService::sendChat(ChatId destination, std::string msg) if(it->second->_own_is_new) { #ifdef CHAT_DEBUG - std::cerr << "p3ChatService::sendChat: new avatar never sent to peer " << id << ". Setting flag to packet." << std::endl; + std::cerr << "p3ChatService::sendChat: new avatar never sent to peer " << vpid << ". Setting flag to packet." << std::endl; #endif ci->chatFlags |= RS_CHAT_FLAG_AVATAR_AVAILABLE ; @@ -395,7 +405,7 @@ bool p3ChatService::sendChat(ChatId destination, std::string msg) } #ifdef CHAT_DEBUG - std::cerr << "Sending msg to (maybe virtual) peer " << id << ", flags = " << ci->chatFlags << std::endl ; + std::cerr << "Sending msg to (maybe virtual) peer " << vpid << ", flags = " << ci->chatFlags << std::endl ; std::cerr << "p3ChatService::sendChat() Item:"; std::cerr << std::endl; ci->print(std::cerr); @@ -435,7 +445,7 @@ bool p3ChatService::sendChat(ChatId destination, std::string msg) if(should_send_state_string) { #ifdef CHAT_DEBUG - std::cerr << "own status string is new for peer " << id << ": sending it." << std::endl ; + std::cerr << "own status string is new for peer " << vpid << ": sending it." << std::endl ; #endif RsChatStatusItem *cs = makeOwnCustomStateStringItem() ; cs->PeerId(vpid) ; @@ -493,7 +503,7 @@ bool p3ChatService::locked_checkAndRebuildPartialMessage(RsChatMsgItem *& ci) else { #ifdef CHAT_DEBUG - std::cerr << "Message is complete, using it now." << std::endl; + std::cerr << "Message is complete, using it now. Size = " << ci->message.size() << ", hash=" << RsDirUtil::sha1sum((uint8_t*)ci->message.c_str(),ci->message.size()) << std::endl; #endif return true ; } diff --git a/libretroshare/src/gxstunnel/p3gxstunnel.cc b/libretroshare/src/gxstunnel/p3gxstunnel.cc index 46d9179b1..677fa2f92 100644 --- a/libretroshare/src/gxstunnel/p3gxstunnel.cc +++ b/libretroshare/src/gxstunnel/p3gxstunnel.cc @@ -74,6 +74,7 @@ p3GxsTunnelService::p3GxsTunnelService(RsGixs *pids) : mGixs(pids), mGxsTunnelMtx("GXS tunnel") { mTurtle = NULL ; + mCurrentPacketCounter = 0 ; } void p3GxsTunnelService::connectToTurtleRouter(p3turtle *tr) @@ -104,6 +105,7 @@ int p3GxsTunnelService::tick() #ifdef DEBUG_GXS_TUNNEL time_t now = time(NULL); + static time_t last_dump = 0; if(now > last_dump + INTERVAL_BETWEEN_DEBUG_DUMP ) { @@ -288,7 +290,7 @@ void p3GxsTunnelService::handleIncomingItem(const RsGxsTunnelId& tunnel_id,RsGxs case RS_PKT_SUBTYPE_GXS_TUNNEL_DATA: handleRecvTunnelDataItem(tunnel_id,dynamic_cast(item)) ; break ; - case RS_PKT_SUBTYPE_GXS_TUNNEL_DATA_ACK: handleRecvTunnelDataAckItem(tunnel_id,dynamic_cast(item)) ; + case RS_PKT_SUBTYPE_GXS_TUNNEL_DATA_ACK: handleRecvTunnelDataAckItem(tunnel_id,dynamic_cast(item)) ; break ; case RS_PKT_SUBTYPE_GXS_TUNNEL_STATUS: handleRecvStatusItem(tunnel_id,dynamic_cast(item)) ; @@ -752,7 +754,7 @@ bool p3GxsTunnelService::handleEncryptedData(const uint8_t *data_bytes,uint32_t std::cerr << " size = " << data_size << std::endl; std::cerr << " data = " << (void*)data_bytes << std::endl; std::cerr << " IV = " << std::hex << *(uint64_t*)data_bytes << std::dec << std::endl; - std::cerr << " data = " << RsUtil::BinToHex((char*)data_bytes,data_size) ; + std::cerr << " data = " << RsUtil::BinToHex((unsigned char*)data_bytes,data_size,100) ; std::cerr << std::endl; #endif @@ -793,9 +795,9 @@ bool p3GxsTunnelService::handleEncryptedData(const uint8_t *data_bytes,uint32_t #ifdef DEBUG_GXS_TUNNEL std::cerr << " Using IV: " << std::hex << *(uint64_t*)data_bytes << std::dec << std::endl; std::cerr << " Decrypted buffer size: " << decrypted_size << std::endl; - std::cerr << " key : " << RsUtil::BinToHex((char*)aes_key,GXS_TUNNEL_AES_KEY_SIZE) << std::endl; - std::cerr << " hmac : " << RsUtil::BinToHex((char*)data_bytes+GXS_TUNNEL_ENCRYPTION_IV_SIZE,GXS_TUNNEL_ENCRYPTION_HMAC_SIZE) << std::endl; - std::cerr << " data : " << RsUtil::BinToHex((char*)data_bytes,data_size) << std::endl; + std::cerr << " key : " << RsUtil::BinToHex((unsigned char*)aes_key,GXS_TUNNEL_AES_KEY_SIZE) << std::endl; + std::cerr << " hmac : " << RsUtil::BinToHex((unsigned char*)data_bytes+GXS_TUNNEL_ENCRYPTION_IV_SIZE,GXS_TUNNEL_ENCRYPTION_HMAC_SIZE) << std::endl; + std::cerr << " data : " << RsUtil::BinToHex((unsigned char*)data_bytes,data_size,100) << std::endl; #endif // first, check the HMAC @@ -1211,7 +1213,7 @@ bool p3GxsTunnelService::locked_sendClearTunnelData(RsGxsTunnelDHPublicKeyItem * #ifdef DEBUG_GXS_TUNNEL std::cerr << " GxsTunnelService::sendClearTunnelData(): Sending clear data to virtual peer: " << item->PeerId() << std::endl; std::cerr << " gitem->data_size = " << gitem->data_size << std::endl; - std::cerr << " data = " << RsUtil::BinToHex((char*)gitem->data_bytes,gitem->data_size) ; + std::cerr << " data = " << RsUtil::BinToHex((unsigned char*)gitem->data_bytes,gitem->data_size,100) ; std::cerr << std::endl; #endif mTurtle->sendTurtleData(item->PeerId(),gitem) ; @@ -1307,7 +1309,7 @@ bool p3GxsTunnelService::locked_sendEncryptedTunnelData(RsGxsTunnelItem *item) #ifdef DEBUG_GXS_TUNNEL std::cerr << "GxsTunnelService::sendEncryptedTunnelData(): Sending encrypted data to virtual peer: " << virtual_peer_id << std::endl; std::cerr << " gitem->data_size = " << gitem->data_size << std::endl; - std::cerr << " serialised data = " << RsUtil::BinToHex((char*)gitem->data_bytes,gitem->data_size) ; + std::cerr << " serialised data = " << RsUtil::BinToHex((unsigned char*)gitem->data_bytes,gitem->data_size,100) ; std::cerr << std::endl; #endif @@ -1346,6 +1348,16 @@ bool p3GxsTunnelService::requestSecuredTunnel(const RsGxsId& to_gxs_id, const Rs return true ; } +// This method generates an ID that should be unique for each packet sent to a same peer. Rather than a random value, +// we use this counter because outgoing items are sorted in a map by their counter value. Using an increasing value +// ensures that the packets are sent in the same order than received from the service. This can be useful in some cases, +// for instance when services split their items while expecting them to arrive in the same order. + +uint64_t p3GxsTunnelService::locked_getPacketCounter() +{ + return mCurrentPacketCounter++ ; +} + bool p3GxsTunnelService::sendData(const RsGxsTunnelId &tunnel_id, uint32_t service_id, const uint8_t *data, uint32_t size) { // make sure that the tunnel ID is registered. @@ -1381,7 +1393,7 @@ bool p3GxsTunnelService::sendData(const RsGxsTunnelId &tunnel_id, uint32_t servi RsGxsTunnelDataItem *item = new RsGxsTunnelDataItem ; - item->unique_item_counter = RSRandom::random_u64(); // this allows to make the item unique, except very rarely, we we don't care. + item->unique_item_counter = locked_getPacketCounter() ;// this allows to make the item unique, while respecting the packet order! item->flags = 0; // not used yet. item->service_id = service_id; item->data_size = size; // encrypted data size diff --git a/libretroshare/src/gxstunnel/p3gxstunnel.h b/libretroshare/src/gxstunnel/p3gxstunnel.h index 8d4ebfefc..0586732ba 100644 --- a/libretroshare/src/gxstunnel/p3gxstunnel.h +++ b/libretroshare/src/gxstunnel/p3gxstunnel.h @@ -230,7 +230,8 @@ private: void handleRecvDHPublicKey(RsGxsTunnelDHPublicKeyItem *item) ; bool locked_sendDHPublicKey(const DH *dh, const RsGxsId& own_gxs_id, const RsPeerId& virtual_peer_id) ; bool locked_initDHSessionKey(DH *&dh); - + uint64_t locked_getPacketCounter(); + TurtleVirtualPeerId virtualPeerIdFromHash(const TurtleFileHash& hash) ; // ... and to a hash for p3turtle // item handling @@ -252,6 +253,8 @@ private: RsGixs *mGixs ; RsMutex mGxsTunnelMtx ; + uint64_t mCurrentPacketCounter ; + std::map mRegisteredServices ; void debug_dump();