From 01da2fbe9e1b754375b5d1bc58c206aa731e7499 Mon Sep 17 00:00:00 2001 From: csoler Date: Thu, 4 Nov 2021 14:24:19 +0100 Subject: [PATCH] added debug output and fixed one bug in pqistreamer --- libretroshare/src/friend_server/fsbio.cc | 11 ++- libretroshare/src/friend_server/fsclient.cc | 79 +++++++++++++-------- libretroshare/src/friend_server/fsclient.h | 1 + libretroshare/src/friend_server/fsitem.h | 23 ++++++ libretroshare/src/pqi/pqistreamer.cc | 13 ++-- retroshare-friendserver/src/friendserver.cc | 38 ++++++---- retroshare-friendserver/src/network.cc | 5 +- 7 files changed, 117 insertions(+), 53 deletions(-) diff --git a/libretroshare/src/friend_server/fsbio.cc b/libretroshare/src/friend_server/fsbio.cc index 8d6933a76..6668a59c9 100644 --- a/libretroshare/src/friend_server/fsbio.cc +++ b/libretroshare/src/friend_server/fsbio.cc @@ -129,9 +129,14 @@ int FsBioInterface::readdata(void *data, int len) int FsBioInterface::senddata(void *data, int len) { -// int written = write(mCLintConnt, data, len); -// return written; - return len; + // shouldn't we better send in multiple packets, similarly to how we read? + + RsDbg() << "FsBioInterface: sending data packet of size " << len ; + + int written = write(mCLintConnt, data, len); + RsDbg() << "FsBioInterface: done."; + + return written; } int FsBioInterface::netstatus() { diff --git a/libretroshare/src/friend_server/fsclient.cc b/libretroshare/src/friend_server/fsclient.cc index 5548168a2..e76e75d71 100644 --- a/libretroshare/src/friend_server/fsclient.cc +++ b/libretroshare/src/friend_server/fsclient.cc @@ -55,30 +55,35 @@ bool FsClient::requestFriends(const std::string& address,uint16_t port,uint32_t for(auto item:response) { - auto *encrypted_response_item = dynamic_cast(item); + // auto *encrypted_response_item = dynamic_cast(item); - if(!encrypted_response_item) - { - delete item; - continue; - } + // if(!encrypted_response_item) + // { + // delete item; + // continue; + // } // For now, also handle unencrypted response items. Will be disabled in production auto *response_item = dynamic_cast(item); - if(!response_item) - { - delete item; - continue; - } + if(response_item) + handleServerResponse(response_item); - for(const auto& it:response_item->friend_invites) - friend_certificates.insert(it); + delete item; } return friend_certificates.size(); } +void FsClient::handleServerResponse(RsFriendServerServerResponseItem *item) +{ + std::cerr << "Received a response item from server: " << std::endl; + std::cerr << *item << std::endl; + + // for(const auto& it:response_item->friend_invites) + // friend_certificates.insert(it); +} + bool FsClient::sendItem(const std::string& address,uint16_t port,RsItem *item,std::list& response) { // open a connection @@ -131,42 +136,56 @@ bool FsClient::sendItem(const std::string& address,uint16_t port,RsItem *item,st FsBioInterface *bio = new FsBioInterface(CreateSocket); // deleted by ~pqistreamer() - pqithreadstreamer p(this,rss,RsPeerId(),bio,BIN_FLAGS_READABLE | BIN_FLAGS_NO_DELETE | BIN_FLAGS_NO_CLOSE); + pqithreadstreamer p(this,rss,RsPeerId(),bio,BIN_FLAGS_READABLE | BIN_FLAGS_WRITEABLE | BIN_FLAGS_NO_CLOSE); p.start(); uint32_t ss; p.SendItem(item,ss); + bool should_close = false; while(true) { - p.tick(); + p.tick(); // ticks bio RsItem *item = GetItem(); + RsDbg() << "Ticking for response..."; + if(item) { response.push_back(item); std::cerr << "Got a response item: " << std::endl; std::cerr << *item << std::endl; + + if(dynamic_cast(item) != nullptr) + { + RsDbg() << "End of transmission. " ; + should_close = true; + break; + } + + if(!bio->isactive()) // socket has probably closed + { + RsDbg() << "(client side) Socket has been closed by server."; + should_close =true; + break; + } } - - if(!bio->isactive()) // socket has probably closed - { - RsDbg() << "(client side) Socket has been closed by server."; - RsDbg() << " Stopping/killing pqistreamer" ; - p.fullstop(); - - RsDbg() << " Closing socket." ; - close(CreateSocket); - CreateSocket=0; - - RsDbg() << " Exiting loop." ; - break; - } - std::this_thread::sleep_for(std::chrono::milliseconds(200)); } + if(should_close) + { + RsDbg() << " Stopping/killing pqistreamer" ; + p.fullstop(); + + RsDbg() << " Closing socket." ; + close(CreateSocket); + CreateSocket=0; + + RsDbg() << " Exiting loop." ; + } + return true; } diff --git a/libretroshare/src/friend_server/fsclient.h b/libretroshare/src/friend_server/fsclient.h index a14fc82ef..4c17c142d 100644 --- a/libretroshare/src/friend_server/fsclient.h +++ b/libretroshare/src/friend_server/fsclient.h @@ -42,6 +42,7 @@ protected: private: bool sendItem(const std::string &address, uint16_t port, RsItem *item, std::list &response); + void handleServerResponse(RsFriendServerServerResponseItem *item); std::list mIncomingItems; }; diff --git a/libretroshare/src/friend_server/fsitem.h b/libretroshare/src/friend_server/fsitem.h index ca6d4d076..95522b550 100644 --- a/libretroshare/src/friend_server/fsitem.h +++ b/libretroshare/src/friend_server/fsitem.h @@ -34,6 +34,7 @@ const uint8_t RS_PKT_SUBTYPE_FS_CLIENT_PUBLISH = 0x01 ; const uint8_t RS_PKT_SUBTYPE_FS_CLIENT_REMOVE = 0x02 ; const uint8_t RS_PKT_SUBTYPE_FS_SERVER_RESPONSE = 0x03 ; const uint8_t RS_PKT_SUBTYPE_FS_SERVER_ENCRYPTED_RESPONSE = 0x04 ; +const uint8_t RS_PKT_SUBTYPE_FS_SERVER_STATUS = 0x05 ; class RsFriendServerItem: public RsItem { @@ -71,6 +72,27 @@ public: std::string pgp_public_key_b64; }; +class RsFriendServerStatusItem: public RsFriendServerItem +{ +public: + RsFriendServerStatusItem() : RsFriendServerItem(RS_PKT_SUBTYPE_FS_SERVER_STATUS) {} + + void serial_process(RsGenericSerializer::SerializeJob j,RsGenericSerializer::SerializeContext& ctx) override + { + RS_SERIAL_PROCESS(status); + } + + enum ConnectionStatus: uint8_t + { + UNKNOWN = 0x00, + END_OF_TRANSMISSION = 0x01 + }; + + // specific members for that item + + ConnectionStatus status; +}; + class RsFriendServerClientRemoveItem: public RsFriendServerItem { public: @@ -152,6 +174,7 @@ struct FsSerializer : RsServiceSerializer case RS_PKT_SUBTYPE_FS_CLIENT_REMOVE: return new RsFriendServerClientRemoveItem(); case RS_PKT_SUBTYPE_FS_CLIENT_PUBLISH: return new RsFriendServerClientPublishItem(); case RS_PKT_SUBTYPE_FS_SERVER_RESPONSE: return new RsFriendServerServerResponseItem(); + case RS_PKT_SUBTYPE_FS_SERVER_STATUS: return new RsFriendServerStatusItem(); case RS_PKT_SUBTYPE_FS_SERVER_ENCRYPTED_RESPONSE: return new RsFriendServerEncryptedServerResponseItem(); default: RsErr() << "Unknown subitem type " << item_sub_id << " in FsSerialiser" ; diff --git a/libretroshare/src/pqi/pqistreamer.cc b/libretroshare/src/pqi/pqistreamer.cc index 96b91dae6..6e01b8e71 100644 --- a/libretroshare/src/pqi/pqistreamer.cc +++ b/libretroshare/src/pqi/pqistreamer.cc @@ -357,6 +357,7 @@ int pqistreamer::status() // this method is overloaded by pqiqosstreamer void pqistreamer::locked_storeInOutputQueue(void *ptr,int,int) { + RsDbg() << "Storing packet " << std::hex << ptr << std::dec << " in outqueue."; mOutPkts.push_back(ptr); } @@ -375,9 +376,9 @@ int pqistreamer::queue_outpqi_locked(RsItem *pqi,uint32_t& pktsize) if(ptr == NULL) return 0 ; -#ifdef DEBUG_PQISTREAMER +//#ifdef DEBUG_PQISTREAMER std::cerr << "pqistreamer::queue_outpqi() serializing packet with packet size : " << pktsize << std::endl; -#endif +//#endif /*******************************************************************************************/ // keep info for stats for a while. Only keep the items for the last two seconds. sec n is ongoing and second n-1 @@ -521,7 +522,7 @@ int pqistreamer::handleoutgoing_locked() { /* if we are not active - clear anything in the queues. */ locked_clear_out_queue() ; -#ifdef DEBUG_PACKET_SLICING +#ifdef DEBUG_PACKET_SLICING std::cerr << "(II) Switching off packet slicing." << std::endl; #endif mAcceptsPacketSlicing = false ; @@ -1430,8 +1431,12 @@ void *pqistreamer::locked_pop_out_data(uint32_t /*max_slice_size*/, uint32_t &si { res = *(mOutPkts.begin()); mOutPkts.pop_front(); + + // In pqistreamer, we do not split outgoing packets. For now only pqiQoSStreamer supports packet slicing. + size = getRsItemSize(res); + #ifdef DEBUG_TRANSFERS - std::cerr << "pqistreamer::locked_pop_out_data() getting next pkt from mOutPkts queue"; + std::cerr << "pqistreamer::locked_pop_out_data() getting next pkt " << std::hex << res << std::dec << " from mOutPkts queue"; std::cerr << std::endl; #endif } diff --git a/retroshare-friendserver/src/friendserver.cc b/retroshare-friendserver/src/friendserver.cc index c74fd4801..a9e699730 100644 --- a/retroshare-friendserver/src/friendserver.cc +++ b/retroshare-friendserver/src/friendserver.cc @@ -17,8 +17,6 @@ void FriendServer::threadTick() { // Listen to the network interface, capture incoming data etc. - std::this_thread::sleep_for(std::chrono::milliseconds(200)); - RsItem *item; while(nullptr != (item = mni->GetItem())) @@ -43,6 +41,7 @@ void FriendServer::threadTick() } delete item; } + std::this_thread::sleep_for(std::chrono::milliseconds(200)); static rstime_t last_autowash_TS = time(nullptr); rstime_t now = time(nullptr); @@ -78,6 +77,8 @@ void FriendServer::handleClientPublish(const RsFriendServerClientPublishItem *it // No need to test for it==mCurrentClients.end() because it will be directly caught by the exception handling below even before. // Respond with a list of potential friends + RsDbg() << "Sending response item to " << item->PeerId() ; + RsFriendServerServerResponseItem *sr_item = new RsFriendServerServerResponseItem; sr_item->nonce = pi->second.last_nonce; @@ -85,17 +86,30 @@ void FriendServer::handleClientPublish(const RsFriendServerClientPublishItem *it sr_item->PeerId(item->PeerId()); mni->SendItem(sr_item); + } catch(std::exception& e) { RsErr() << "ERROR: " << e.what() ; + + RsFriendServerStatusItem *status_item = new RsFriendServerStatusItem; + status_item->status = RsFriendServerStatusItem::END_OF_TRANSMISSION; + status_item->PeerId(item->PeerId()); + mni->SendItem(status_item); + return; } // Close client connection from server side, to tell the client that nothing more is coming. + //RsDbg() << "Closing client connection." ; + //mni->closeConnection(item->PeerId()); - RsDbg() << "Closing client connection." ; + RsDbg() << "Sending end-of-stream item to " << item->PeerId() ; - mni->closeConnection(item->PeerId()); + RsFriendServerStatusItem *status_item = new RsFriendServerStatusItem; + status_item->status = RsFriendServerStatusItem::END_OF_TRANSMISSION; + status_item->PeerId(item->PeerId()); + + mni->SendItem(status_item); } std::map FriendServer::computeListOfFriendInvites(uint32_t nb_reqs_invites, const RsPeerId &pid, const RsPgpFingerprint &fpr) @@ -126,16 +140,11 @@ std::map::iterator FriendServer::handleIncomingClientData(con RsPgpId pgp_id ; std::vector key_binary_data ; - key_binary_data = Radix64::decode(pgp_public_key_b64); + // key_binary_data = Radix64::decode(pgp_public_key_b64); - if(key_binary_data.empty()) + if(RsBase64::decode(pgp_public_key_b64,key_binary_data)) throw std::runtime_error(" Cannot decode client pgp public key: \"" + pgp_public_key_b64 + "\". Wrong format??"); -// Apparently RsBase64 doesn't work correctly. -// -// if(!RsBase64::decode(item->pgp_public_key_b64,key_binary_data)) -// throw std::runtime_error(" Cannot decode client pgp public key: \"" + item->pgp_public_key_b64 + "\". Wrong format??"); - RsDbg() << " Public key radix is fine." ; if(!mPgpHandler->LoadCertificateFromBinaryData(key_binary_data.data(),key_binary_data.size(), pgp_id, error_string)) @@ -236,9 +245,9 @@ void FriendServer::run() void FriendServer::autoWash() { rstime_t now = time(nullptr); + RsDbg() << "autoWash..." ; for(std::map::iterator it(mCurrentClientPeers.begin());it!=mCurrentClientPeers.end();) - { if(it->second.last_connection_TS + MAXIMUM_PEER_INACTIVE_DELAY < now) { RsDbg() << "Removing client peer " << it->first << " because it's inactive for more than " << MAXIMUM_PEER_INACTIVE_DELAY << " seconds." ; @@ -247,7 +256,10 @@ void FriendServer::autoWash() mCurrentClientPeers.erase(it); it = tmp; } - } + else + ++it; + + RsDbg() << "done." ; } void FriendServer::debugPrint() diff --git a/retroshare-friendserver/src/network.cc b/retroshare-friendserver/src/network.cc index 243981717..f02cb6395 100644 --- a/retroshare-friendserver/src/network.cc +++ b/retroshare-friendserver/src/network.cc @@ -95,9 +95,7 @@ void FsNetworkInterface::threadTick() RS_STACK_MUTEX(mFsNiMtx); for(auto& it:mConnections) - { it.second.pqi_thread->tick(); - } rstime::rs_usleep(1000*200); } @@ -210,7 +208,8 @@ int FsNetworkInterface::SendItem(RsItem *item) return 0; } - return it->second.pqi_thread->SendItem(item); + uint32_t ss; + return it->second.pqi_thread->SendItem(item,ss); } void FsNetworkInterface::closeConnection(const RsPeerId& peer_id)