diff --git a/libretroshare/src/friend_server/fsbio.cc b/libretroshare/src/friend_server/fsbio.cc index ecff610ea..8d6933a76 100644 --- a/libretroshare/src/friend_server/fsbio.cc +++ b/libretroshare/src/friend_server/fsbio.cc @@ -43,10 +43,10 @@ int FsBioInterface::tick() if(readbytes == 0) { - RsDbg() << "Reached END of the stream!" << std::endl; - RsDbg() << "Closing!" << std::endl; + RsDbg() << "Reached END of the stream!" ; + RsDbg() << "Closing!" ; - mIsActive = false; + close(); return mTotalBufferBytes; } if(readbytes < 0) @@ -140,7 +140,7 @@ int FsBioInterface::netstatus() int FsBioInterface::isactive() { - return mCLintConnt > 0; + return mIsActive ; } bool FsBioInterface::moretoread(uint32_t /* usec */) diff --git a/libretroshare/src/friend_server/fsclient.cc b/libretroshare/src/friend_server/fsclient.cc index 377055505..9b2459b7c 100644 --- a/libretroshare/src/friend_server/fsclient.cc +++ b/libretroshare/src/friend_server/fsclient.cc @@ -113,20 +113,24 @@ bool FsClient::sendItem(const std::string& address,uint16_t port,RsItem *item,st rss.addSerialType(fss); FsSerializer().serialise(item,data,&size); + write(CreateSocket,data,size); // shouldn't we use the pqistreamer in R/W mode instead? - // TODO: we should write in multiple chunks just in case the socket is not fully ready - write(CreateSocket,data,size); + RsDbg() << "Item sent. Waiting for response..." ; // Now attempt to read and deserialize anything that comes back from that connexion until it gets closed by the server. FsBioInterface bio(CreateSocket); - pqistreamer pqi(&rss,RsPeerId(),&bio,BIN_FLAGS_READABLE); - pqithreadstreamer p(&pqi,&rss,RsPeerId(),&bio,BIN_FLAGS_READABLE); + pqithreadstreamer p(this,&rss,RsPeerId(),&bio,BIN_FLAGS_READABLE | BIN_FLAGS_NO_DELETE); p.start(); + uint32_t ss; + p.SendItem(item,ss); + while(true) { - RsItem *item = p.GetItem(); + p.tick(); + + RsItem *item = GetItem(); if(item) { @@ -134,11 +138,6 @@ bool FsClient::sendItem(const std::string& address,uint16_t port,RsItem *item,st std::cerr << "Got a response item: " << std::endl; std::cerr << *item << std::endl; } - else - { - std::this_thread::sleep_for(std::chrono::milliseconds(200)); - continue; - } if(!bio.isactive()) // socket has probably closed { @@ -147,11 +146,28 @@ bool FsClient::sendItem(const std::string& address,uint16_t port,RsItem *item,st close(CreateSocket); CreateSocket=0; + break; } + + std::this_thread::sleep_for(std::chrono::milliseconds(200)); } - return 0; - - // if ok, stream the item through it + return true; } +bool FsClient::RecvItem(RsItem *item) +{ + mIncomingItems.push_back(item); + return true; +} + +RsItem *FsClient::GetItem() +{ + if(mIncomingItems.empty()) + return nullptr; + + RsItem *item = mIncomingItems.front(); + mIncomingItems.pop_front(); + + return item; +} diff --git a/libretroshare/src/friend_server/fsclient.h b/libretroshare/src/friend_server/fsclient.h index 76af44a43..a14fc82ef 100644 --- a/libretroshare/src/friend_server/fsclient.h +++ b/libretroshare/src/friend_server/fsclient.h @@ -22,17 +22,27 @@ #include #include "fsitem.h" +#include "pqi/pqi_base.h" // This class runs a client connection to the friend server. It opens a socket at each connection. -class FsClient +class FsClient: public PQInterface { public: - FsClient() {} + FsClient() :PQInterface(RsPeerId()) {} bool requestFriends(const std::string& address,uint16_t port,uint32_t reqs,std::map& friend_certificates); +protected: + // Implements PQInterface + + bool RecvItem(RsItem *item) override; + int SendItem(RsItem *) override { RsErr() << "FsClient::SendItem() called although it should not." ; return 0;} + RsItem *GetItem() override; + private: bool sendItem(const std::string &address, uint16_t port, RsItem *item, std::list &response); + + std::list mIncomingItems; }; diff --git a/libretroshare/src/friend_server/fsmanager.cc b/libretroshare/src/friend_server/fsmanager.cc index ca520bb4f..876ae8170 100644 --- a/libretroshare/src/friend_server/fsmanager.cc +++ b/libretroshare/src/friend_server/fsmanager.cc @@ -96,6 +96,8 @@ void FriendServerManager::threadTick() if(mLastFriendReqestCampain + delay_for_request < now) { + mLastFriendReqestCampain = now; + std::cerr << "Requesting new friends to friend server..." << std::endl; std::map friend_certificates; diff --git a/retroshare-friendserver/src/network.cc b/retroshare-friendserver/src/network.cc index 33c26c297..18ef4c7fa 100644 --- a/retroshare-friendserver/src/network.cc +++ b/retroshare-friendserver/src/network.cc @@ -215,6 +215,8 @@ int FsNetworkInterface::SendItem(RsItem *item) void FsNetworkInterface::closeConnection(const RsPeerId& peer_id) { + RS_STACK_MUTEX(mFsNiMtx); + const auto& it = mConnections.find(peer_id); if(it == mConnections.end()) @@ -240,7 +242,8 @@ void FsNetworkInterface::closeConnection(const RsPeerId& peer_id) close(it->second.socket); delete it->second.pqi_thread; - delete it->second.bio; + + mConnections.erase(it); }