fixed basic incoming communication at server side

This commit is contained in:
csoler 2021-10-31 12:02:09 +01:00
parent f0b23b84f1
commit 42b4a821bd
6 changed files with 148 additions and 27 deletions

View File

@ -24,7 +24,7 @@
#include "fsbio.h" #include "fsbio.h"
FsBioInterface::FsBioInterface(int socket) FsBioInterface::FsBioInterface(int socket)
: mCLintConnt(socket) : mCLintConnt(socket),mIsActive(true)
{ {
mTotalReadBytes=0; mTotalReadBytes=0;
mTotalBufferBytes=0; mTotalBufferBytes=0;
@ -43,21 +43,22 @@ int FsBioInterface::tick()
if(readbytes == 0) if(readbytes == 0)
{ {
std::cerr << "Reached END of the stream!" << std::endl; RsDbg() << "Reached END of the stream!" << std::endl;
return 0; RsDbg() << "Closing!" << std::endl;
mIsActive = false;
return mTotalBufferBytes;
} }
if(readbytes < 0) if(readbytes < 0)
{ {
if(errno != EWOULDBLOCK && errno != EAGAIN) if(errno != EWOULDBLOCK && errno != EAGAIN)
RsErr() << "read() failed. Errno=" << errno ; RsErr() << "read() failed. Errno=" << errno ;
return false; return mTotalBufferBytes;
} }
std::cerr << "clintConnt: " << mCLintConnt << ", readbytes: " << readbytes << std::endl; std::cerr << "clintConnt: " << mCLintConnt << ", readbytes: " << readbytes << std::endl;
//::close(clintConnt);
// display some debug info // display some debug info
if(readbytes > 0) if(readbytes > 0)
@ -79,7 +80,7 @@ int FsBioInterface::tick()
std::cerr << "Socket: " << mCLintConnt << ". Total read: " << mTotalReadBytes << ". Buffer size: " << mTotalBufferBytes << std::endl ; std::cerr << "Socket: " << mCLintConnt << ". Total read: " << mTotalReadBytes << ". Buffer size: " << mTotalBufferBytes << std::endl ;
} }
return true; return mTotalBufferBytes;
} }
int FsBioInterface::readdata(void *data, int len) int FsBioInterface::readdata(void *data, int len)
@ -134,7 +135,7 @@ int FsBioInterface::senddata(void *data, int len)
} }
int FsBioInterface::netstatus() int FsBioInterface::netstatus()
{ {
return 1; // dummy response. return mIsActive; // dummy response.
} }
int FsBioInterface::isactive() int FsBioInterface::isactive()
@ -154,6 +155,7 @@ bool FsBioInterface::cansend(uint32_t)
int FsBioInterface::close() int FsBioInterface::close()
{ {
RsDbg() << "Stopping network interface" << std::endl; RsDbg() << "Stopping network interface" << std::endl;
mIsActive = false;
return 1; return 1;
} }

View File

@ -51,6 +51,7 @@ public:
private: private:
int mCLintConnt; int mCLintConnt;
bool mIsActive;
uint32_t mTotalReadBytes; uint32_t mTotalReadBytes;
uint32_t mTotalBufferBytes; uint32_t mTotalBufferBytes;

View File

@ -117,7 +117,7 @@ bool FsClient::sendItem(const std::string& address,uint16_t port,RsItem *item,st
// TODO: we should write in multiple chunks just in case the socket is not fully ready // TODO: we should write in multiple chunks just in case the socket is not fully ready
write(CreateSocket,data,size); write(CreateSocket,data,size);
// Now attempt to read and deserialize anything that comes back from that connexion // Now attempt to read and deserialize anything that comes back from that connexion until it gets closed by the server.
FsBioInterface bio(CreateSocket); FsBioInterface bio(CreateSocket);
pqistreamer pqi(&rss,RsPeerId(),&bio,BIN_FLAGS_READABLE); pqistreamer pqi(&rss,RsPeerId(),&bio,BIN_FLAGS_READABLE);
@ -128,14 +128,26 @@ bool FsClient::sendItem(const std::string& address,uint16_t port,RsItem *item,st
{ {
RsItem *item = p.GetItem(); RsItem *item = p.GetItem();
if(!item) if(item)
{
response.push_back(item);
std::cerr << "Got a response item: " << std::endl;
std::cerr << *item << std::endl;
}
else
{ {
std::this_thread::sleep_for(std::chrono::milliseconds(200)); std::this_thread::sleep_for(std::chrono::milliseconds(200));
continue; continue;
} }
std::cerr << "Got a response item: " << std::endl; if(!bio.isactive()) // socket has probably closed
std::cerr << *item << std::endl; {
RsDbg() << "(client side) Socket has been closed by server. Killing pqistreamer and closing socket." ;
p.fullstop();
close(CreateSocket);
CreateSocket=0;
}
} }
return 0; return 0;

View File

@ -37,7 +37,15 @@ void FriendServer::threadTick()
void FriendServer::handleClientPublish(const RsFriendServerClientPublishItem *item) void FriendServer::handleClientPublish(const RsFriendServerClientPublishItem *item)
{ {
RsDbg() << "Received a client publish item:" << *item ; RsDbg() << "Received a client publish item from " << item->PeerId() << ":" << *item ;
// Respond with a list of potential friends
// Close client connection from server side, to tell the client that nothing more is coming.
RsDbg() << "Closing client connection." ;
mni->closeConnection(item->PeerId());
} }
void FriendServer::handleClientRemove(const RsFriendServerClientRemoveItem *item) void FriendServer::handleClientRemove(const RsFriendServerClientRemoveItem *item)
{ {

View File

@ -41,12 +41,11 @@
#include "friend_server/fsitem.h" #include "friend_server/fsitem.h"
FsNetworkInterface::FsNetworkInterface() FsNetworkInterface::FsNetworkInterface()
: mFsNiMtx(std::string("FsNetworkInterface")) : PQInterface(RsPeerId()),mFsNiMtx(std::string("FsNetworkInterface"))
{ {
RS_STACK_MUTEX(mFsNiMtx); RS_STACK_MUTEX(mFsNiMtx);
mClintListn = 0; mClintListn = 0;
mClintListn = socket(AF_INET, SOCK_STREAM, 0); // creating socket mClintListn = socket(AF_INET, SOCK_STREAM, 0); // creating socket
int flags = fcntl(mClintListn, F_GETFL); int flags = fcntl(mClintListn, F_GETFL);
@ -76,9 +75,10 @@ FsNetworkInterface::FsNetworkInterface()
FsNetworkInterface::~FsNetworkInterface() FsNetworkInterface::~FsNetworkInterface()
{ {
RS_STACK_MUTEX(mFsNiMtx);
for(auto& it:mConnections) for(auto& it:mConnections)
{ {
delete it.second.pqi; delete it.second.pqi_thread;
std::cerr << "Releasing socket " << it.second.socket << std::endl; std::cerr << "Releasing socket " << it.second.socket << std::endl;
close(it.second.socket); close(it.second.socket);
} }
@ -93,8 +93,11 @@ void FsNetworkInterface::threadTick()
// 2 - tick all streamers // 2 - tick all streamers
RS_STACK_MUTEX(mFsNiMtx);
for(auto& it:mConnections) for(auto& it:mConnections)
it.second.pqi->tick(); {
it.second.pqi_thread->tick();
}
rstime::rs_usleep(1000*200); rstime::rs_usleep(1000*200);
} }
@ -102,6 +105,8 @@ void FsNetworkInterface::threadTick()
static RsPeerId makePeerId(int t) static RsPeerId makePeerId(int t)
{ {
unsigned char s[RsPeerId::SIZE_IN_BYTES]; unsigned char s[RsPeerId::SIZE_IN_BYTES];
memset(s,0,sizeof(s));
*reinterpret_cast<int*>(&s) = t; *reinterpret_cast<int*>(&s) = t;
return RsPeerId::fromBufferUnsafe(s); return RsPeerId::fromBufferUnsafe(s);
} }
@ -145,28 +150,110 @@ bool FsNetworkInterface::checkForNewConnections()
FsBioInterface *bio = new FsBioInterface(clintConnt); FsBioInterface *bio = new FsBioInterface(clintConnt);
auto p = new pqistreamer(rss, pid, bio,BIN_FLAGS_READABLE | BIN_FLAGS_WRITEABLE); auto pqi = new pqithreadstreamer(this,rss, pid, bio,BIN_FLAGS_READABLE | BIN_FLAGS_WRITEABLE);
auto pqi = new pqithreadstreamer(p,rss, pid, bio,BIN_FLAGS_READABLE | BIN_FLAGS_WRITEABLE);
c.pqi = pqi; c.pqi_thread = pqi;
c.bio = bio;
pqi->start(); pqi->start();
RS_STACK_MUTEX(mFsNiMtx); RS_STACK_MUTEX(mFsNiMtx);
mConnections[makePeerId(clintConnt)] = c; mConnections[pid] = c;
return true; return true;
} }
bool FsNetworkInterface::RecvItem(RsItem *item)
{
RS_STACK_MUTEX(mFsNiMtx);
auto it = mConnections.find(item->PeerId());
if(it == mConnections.end())
{
RsErr() << "Receiving an item for peer ID " << item->PeerId() << " but no connection is known for that peer." << std::endl;
delete item;
return false;
}
it->second.incoming_items.push_back(item);
return true;
}
RsItem *FsNetworkInterface::GetItem() RsItem *FsNetworkInterface::GetItem()
{ {
RS_STACK_MUTEX(mFsNiMtx); RS_STACK_MUTEX(mFsNiMtx);
for(auto& it:mConnections) for(auto& it:mConnections)
{ {
RsItem *item = it.second.pqi->GetItem(); if(!it.second.incoming_items.empty())
if(item) {
RsItem *item = it.second.incoming_items.front();
it.second.incoming_items.pop_front();
return item; return item;
} }
}
return nullptr; return nullptr;
} }
int FsNetworkInterface::SendItem(RsItem *item)
{
RS_STACK_MUTEX(mFsNiMtx);
const auto& it = mConnections.find(item->PeerId());
if(it == mConnections.end())
{
RsErr() << "Cannot send item to peer " << item->PeerId() << ": no pending sockets available." ;
delete item;
return 0;
}
return it->second.pqi_thread->SendItem(item);
}
void FsNetworkInterface::closeConnection(const RsPeerId& peer_id)
{
const auto& it = mConnections.find(peer_id);
if(it == mConnections.end())
{
RsErr() << "Cannot close connection to peer " << peer_id << ": no pending sockets available." ;
return;
}
if(!it->second.incoming_items.empty())
{
RsErr() << "Trying to close an incoming connection with incoming items still pending! The items will be lost." << std::endl;
for(auto& item:it->second.incoming_items)
delete item;
it->second.incoming_items.clear();
}
// Close the socket and delete everything.
it->second.pqi_thread->fullstop();
it->second.bio->close();
close(it->second.socket);
delete it->second.pqi_thread;
delete it->second.bio;
}

View File

@ -22,20 +22,25 @@
#pragma once #pragma once
#include "util/rsthreads.h" #include "util/rsthreads.h"
#include "pqi/pqi_base.h"
#include "retroshare/rspeers.h" #include "retroshare/rspeers.h"
class pqistreamer; class pqithreadstreamer;
class FsBioInterface;
struct ConnectionData struct ConnectionData
{ {
sockaddr client_address; sockaddr client_address;
int socket; int socket;
pqistreamer *pqi; pqithreadstreamer *pqi_thread;
FsBioInterface *bio;
std::list<RsItem*> incoming_items;
}; };
// This class handles multiple connections to the server and supplies RsItem elements // This class handles multiple connections to the server and supplies RsItem elements
class FsNetworkInterface: public RsTickingThread class FsNetworkInterface: public RsTickingThread, public PQInterface
{ {
public: public:
FsNetworkInterface() ; FsNetworkInterface() ;
@ -43,7 +48,13 @@ public:
// basic functionality // basic functionality
RsItem *GetItem(); void closeConnection(const RsPeerId& peer_id);
// Implements PQInterface
bool RecvItem(RsItem *item) override;
int SendItem(RsItem *item) override;
RsItem *GetItem() override;
// Implements RsTickingThread // Implements RsTickingThread