fixed two-ways communication between RS and friend server

This commit is contained in:
csoler 2021-10-31 16:46:06 +01:00
parent 42b4a821bd
commit a69f9dc09b
5 changed files with 51 additions and 20 deletions

View File

@ -43,10 +43,10 @@ int FsBioInterface::tick()
if(readbytes == 0) if(readbytes == 0)
{ {
RsDbg() << "Reached END of the stream!" << std::endl; RsDbg() << "Reached END of the stream!" ;
RsDbg() << "Closing!" << std::endl; RsDbg() << "Closing!" ;
mIsActive = false; close();
return mTotalBufferBytes; return mTotalBufferBytes;
} }
if(readbytes < 0) if(readbytes < 0)
@ -140,7 +140,7 @@ int FsBioInterface::netstatus()
int FsBioInterface::isactive() int FsBioInterface::isactive()
{ {
return mCLintConnt > 0; return mIsActive ;
} }
bool FsBioInterface::moretoread(uint32_t /* usec */) bool FsBioInterface::moretoread(uint32_t /* usec */)

View File

@ -113,20 +113,24 @@ bool FsClient::sendItem(const std::string& address,uint16_t port,RsItem *item,st
rss.addSerialType(fss); rss.addSerialType(fss);
FsSerializer().serialise(item,data,&size); FsSerializer().serialise(item,data,&size);
write(CreateSocket,data,size); // shouldn't we use the pqistreamer in R/W mode instead?
// TODO: we should write in multiple chunks just in case the socket is not fully ready RsDbg() << "Item sent. Waiting for response..." ;
write(CreateSocket,data,size);
// Now attempt to read and deserialize anything that comes back from that connexion until it gets closed by the server. // Now attempt to read and deserialize anything that comes back from that connexion until it gets closed by the server.
FsBioInterface bio(CreateSocket); FsBioInterface bio(CreateSocket);
pqistreamer pqi(&rss,RsPeerId(),&bio,BIN_FLAGS_READABLE); pqithreadstreamer p(this,&rss,RsPeerId(),&bio,BIN_FLAGS_READABLE | BIN_FLAGS_NO_DELETE);
pqithreadstreamer p(&pqi,&rss,RsPeerId(),&bio,BIN_FLAGS_READABLE);
p.start(); p.start();
uint32_t ss;
p.SendItem(item,ss);
while(true) while(true)
{ {
RsItem *item = p.GetItem(); p.tick();
RsItem *item = GetItem();
if(item) if(item)
{ {
@ -134,11 +138,6 @@ bool FsClient::sendItem(const std::string& address,uint16_t port,RsItem *item,st
std::cerr << "Got a response item: " << std::endl; std::cerr << "Got a response item: " << std::endl;
std::cerr << *item << std::endl; std::cerr << *item << std::endl;
} }
else
{
std::this_thread::sleep_for(std::chrono::milliseconds(200));
continue;
}
if(!bio.isactive()) // socket has probably closed if(!bio.isactive()) // socket has probably closed
{ {
@ -147,11 +146,28 @@ bool FsClient::sendItem(const std::string& address,uint16_t port,RsItem *item,st
close(CreateSocket); close(CreateSocket);
CreateSocket=0; CreateSocket=0;
} break;
} }
return 0; std::this_thread::sleep_for(std::chrono::milliseconds(200));
// if ok, stream the item through it
} }
return true;
}
bool FsClient::RecvItem(RsItem *item)
{
mIncomingItems.push_back(item);
return true;
}
RsItem *FsClient::GetItem()
{
if(mIncomingItems.empty())
return nullptr;
RsItem *item = mIncomingItems.front();
mIncomingItems.pop_front();
return item;
}

View File

@ -22,17 +22,27 @@
#include <string> #include <string>
#include "fsitem.h" #include "fsitem.h"
#include "pqi/pqi_base.h"
// This class runs a client connection to the friend server. It opens a socket at each connection. // This class runs a client connection to the friend server. It opens a socket at each connection.
class FsClient class FsClient: public PQInterface
{ {
public: public:
FsClient() {} FsClient() :PQInterface(RsPeerId()) {}
bool requestFriends(const std::string& address,uint16_t port,uint32_t reqs,std::map<std::string,bool>& friend_certificates); bool requestFriends(const std::string& address,uint16_t port,uint32_t reqs,std::map<std::string,bool>& friend_certificates);
protected:
// Implements PQInterface
bool RecvItem(RsItem *item) override;
int SendItem(RsItem *) override { RsErr() << "FsClient::SendItem() called although it should not." ; return 0;}
RsItem *GetItem() override;
private: private:
bool sendItem(const std::string &address, uint16_t port, RsItem *item, std::list<RsItem *> &response); bool sendItem(const std::string &address, uint16_t port, RsItem *item, std::list<RsItem *> &response);
std::list<RsItem*> mIncomingItems;
}; };

View File

@ -96,6 +96,8 @@ void FriendServerManager::threadTick()
if(mLastFriendReqestCampain + delay_for_request < now) if(mLastFriendReqestCampain + delay_for_request < now)
{ {
mLastFriendReqestCampain = now;
std::cerr << "Requesting new friends to friend server..." << std::endl; std::cerr << "Requesting new friends to friend server..." << std::endl;
std::map<std::string,bool> friend_certificates; std::map<std::string,bool> friend_certificates;

View File

@ -215,6 +215,8 @@ int FsNetworkInterface::SendItem(RsItem *item)
void FsNetworkInterface::closeConnection(const RsPeerId& peer_id) void FsNetworkInterface::closeConnection(const RsPeerId& peer_id)
{ {
RS_STACK_MUTEX(mFsNiMtx);
const auto& it = mConnections.find(peer_id); const auto& it = mConnections.find(peer_id);
if(it == mConnections.end()) if(it == mConnections.end())
@ -240,7 +242,8 @@ void FsNetworkInterface::closeConnection(const RsPeerId& peer_id)
close(it->second.socket); close(it->second.socket);
delete it->second.pqi_thread; delete it->second.pqi_thread;
delete it->second.bio;
mConnections.erase(it);
} }