implemented multiple clients in Friend Server

This commit is contained in:
csoler 2021-10-19 23:24:50 +02:00
parent 21ea281df4
commit 8e4a9e6a38
6 changed files with 246 additions and 63 deletions

View File

@ -8,10 +8,10 @@ public:
void start() {}
void stop() {}
void checkServerAddress_async(const std::string&,uint16_t, const std::function<void (bool result_status)>& callback)
void checkServerAddress_async(const std::string& addr,uint16_t, const std::function<void (const std::string& address,bool result_status)>& callback)
{
std::this_thread::sleep_for(std::chrono::seconds(1));
callback(true);
callback(addr,true);
}
void setServerAddress(const std::string&,uint16_t) {}
void setFriendsToRequest(uint32_t) {}

View File

@ -9,11 +9,9 @@ void FriendServer::threadTick()
std::this_thread::sleep_for(std::chrono::milliseconds(200));
pqi->tick();
RsItem *item;
while(nullptr != (item = pqi->GetItem()))
while(nullptr != (item = mni->GetItem()))
{
RsFriendServerItem *fsitem = dynamic_cast<RsFriendServerItem*>(item);
@ -23,6 +21,7 @@ void FriendServer::threadTick()
continue;
}
std::cerr << "Received item: " << std::endl << *fsitem << std::endl;
switch(fsitem->PacketSubType())
{
@ -57,11 +56,6 @@ void FriendServer::run()
mni = new FsNetworkInterface;
mni->start();
RsSerialiser *rss = new RsSerialiser ;
rss->addSerialType(new FsSerializer) ;
pqi = new pqistreamer(rss, RsPeerId(), mni,BIN_FLAGS_READABLE);
while(!shouldStop()) { threadTick() ; }
}

View File

@ -42,7 +42,6 @@ private:
void handleClientPublish(const RsFriendServerClientPublishItem *item);
FsNetworkInterface *mni;
pqistreamer *pqi;
std::string mBaseDirectory;
};

View File

@ -34,7 +34,10 @@
#include "util/rsprint.h"
#include "util/rsdebug.h"
#include "pqi/pqithreadstreamer.h"
#include "network.h"
#include "fsitem.h"
FsNetworkInterface::FsNetworkInterface()
: mFsNiMtx(std::string("FsNetworkInterface"))
@ -42,8 +45,6 @@ FsNetworkInterface::FsNetworkInterface()
RS_STACK_MUTEX(mFsNiMtx);
mClintListn = 0;
mTotalReadBytes = 0;
mTotalBufferBytes = 0;
struct sockaddr_in ipOfServer;
@ -61,34 +62,117 @@ FsNetworkInterface::FsNetworkInterface()
RsDbg() << "Network interface now listening for TCP on " << sockaddr_storage_tostring( *(sockaddr_storage*)&ipOfServer) ;
}
int FsNetworkInterface::close()
{
RsDbg() << "Stopping network interface" << std::endl;
return 1;
}
void FsNetworkInterface::threadTick()
{
tick();
// 1 - check for new connections
checkForNewConnections();
// 2 - tick all streamers
for(auto& it:mConnections)
it.second.pqi->tick();
}
int FsNetworkInterface::tick()
static RsPeerId makePeerId(int t)
{
unsigned char s[RsPeerId::SIZE_IN_BYTES];
*reinterpret_cast<int*>(&s) = t;
return RsPeerId::fromBufferUnsafe(s);
}
bool FsNetworkInterface::checkForNewConnections()
{
// look for incoming data
// fd_set ReadFDs, WriteFDs, ExceptFDs;
// FD_ZERO(&ReadFDs);
// FD_ZERO(&WriteFDs);
// FD_ZERO(&ExceptFDs);
//
// FD_SET(mClintListn, &ReadFDs);
// FD_SET(mClintListn, &ExceptFDs);
//
// struct timeval timeout;
// timeout.tv_sec = 0;
// timeout.tv_usec = 500000; // 200 ms timeout
// int status = select(mClintListn+1, &ReadFDs, &WriteFDs, &ExceptFDs, &timeout);
//
// if(status <= 0) // if no incoming data, return. Each tick waits for 200 ms anyway.
// return false;
struct sockaddr addr;
//socklen_t addr_len = sizeof(sockaddr);
//int clintConnt = accept(mClintListn, &addr, &addr_len); // accept is a blocking call!
int clintConnt = accept(mClintListn, nullptr, nullptr); // accept is a blocking call!
if(clintConnt < 0)
{
RsErr()<< "Incoming connection with nothing to read!" << std::endl;
return false;
}
RsDbg() << "Got incoming connection from " << sockaddr_storage_tostring( *(sockaddr_storage*)&addr);
ConnectionData c;
c.socket = clintConnt;
c.client_address = addr;
RsPeerId pid = makePeerId(clintConnt);
RsSerialiser *rss = new RsSerialiser ;
rss->addSerialType(new FsSerializer) ;
FsBioInterface *bio = new FsBioInterface(clintConnt);
auto p = new pqistreamer(rss, pid, bio,BIN_FLAGS_READABLE | BIN_FLAGS_WRITEABLE);
auto pqi = new pqithreadstreamer(p,rss, pid, bio,BIN_FLAGS_READABLE | BIN_FLAGS_WRITEABLE);
c.pqi = pqi;
pqi->start();
RS_STACK_MUTEX(mFsNiMtx);
mConnections[makePeerId(clintConnt)] = c;
return true;
}
RsItem *FsNetworkInterface::GetItem()
{
RS_STACK_MUTEX(mFsNiMtx);
for(auto& it:mConnections)
{
RsItem *item = it.second.pqi->GetItem();
if(item)
return item;
}
return nullptr;
}
FsBioInterface::FsBioInterface(int socket)
: mCLintConnt(socket)
{
mTotalReadBytes=0;
mTotalBufferBytes=0;
}
int FsBioInterface::tick()
{
std::cerr << "ticking FsNetworkInterface" << std::endl;
int clintConnt = accept(mClintListn, (struct sockaddr*)NULL, NULL); // accept is a blocking call!
// 2 - read incoming data pending on existing connections
char inBuffer[1025];
memset(inBuffer,0,1025);
int readbytes = read(clintConnt, inBuffer, sizeof(inBuffer));
int readbytes = read(mCLintConnt, inBuffer, sizeof(inBuffer));
if(readbytes < 0)
RsErr() << "read() failed. Errno=" << errno ;
std::cerr << "clintConnt: " << clintConnt << ", readbytes: " << readbytes << std::endl;
std::cerr << "clintConnt: " << mCLintConnt << ", readbytes: " << readbytes << std::endl;
::close(clintConnt);
//::close(clintConnt);
// display some debug info
@ -104,27 +188,22 @@ int FsNetworkInterface::tick()
memcpy(ptr,inBuffer,readbytes);
RS_STACK_MUTEX(mFsNiMtx);
in_buffer.push_back(std::make_pair(ptr,readbytes));
mTotalBufferBytes += readbytes;
mTotalReadBytes += readbytes;
RsDbg() << "InBuffer: " << in_buffer.size() << " elements. Total size: " << mTotalBufferBytes << ". Total read: " << mTotalReadBytes ;
std::cerr << "Total bytes:" << mTotalReadBytes << std::endl;
std::cerr << "Connections:" << std::endl;
RsDbg() << "socket: " << mCLintConnt << ". Total read: " << mTotalReadBytes << ". Buffer size: " << mTotalBufferBytes << std::endl ;
}
return true;
}
int FsNetworkInterface::senddata(void *, int len)
int FsBioInterface::readdata(void *data, int len)
{
RsErr() << "Trying to send data through FsNetworkInterface although it's not implemented yet!"<< std::endl;
return false;
}
int FsNetworkInterface::readdata(void *data, int len)
{
RS_STACK_MUTEX(mFsNiMtx);
// read incoming bytes in the buffer
int total_len = 0;
@ -167,26 +246,81 @@ int FsNetworkInterface::readdata(void *data, int len)
return len;
}
int FsNetworkInterface::netstatus()
int FsBioInterface::senddata(void *data, int len)
{
return isactive();
}
int FsBioInterface::netstatus()
{
return 1; // dummy response.
}
int FsNetworkInterface::isactive()
int FsBioInterface::isactive()
{
RS_STACK_MUTEX(mFsNiMtx);
return mClintListn > 0;
return mCLintConnt > 0;
}
bool FsNetworkInterface::moretoread(uint32_t /* usec */)
bool FsBioInterface::moretoread(uint32_t /* usec */)
{
RS_STACK_MUTEX(mFsNiMtx);
return mTotalBufferBytes > 0;
}
bool FsNetworkInterface::cansend(uint32_t)
bool FsBioInterface::cansend(uint32_t)
{
return false;
return isactive();
}
int FsBioInterface::close()
{
RsDbg() << "Stopping network interface" << std::endl;
return 1;
}
FsClient::FsClient(const std::string& address)
: mServerAddress(address)
{
}
bool FsClient::sendItem(RsItem *item)
{
// request a connection
int CreateSocket = 0,n = 0;
char dataReceived[1024];
struct sockaddr_in ipOfServer;
memset(dataReceived, '0' ,sizeof(dataReceived));
if((CreateSocket = socket(AF_INET, SOCK_STREAM, 0))< 0)
{
printf("Socket not created \n");
return 1;
}
ipOfServer.sin_family = AF_INET;
ipOfServer.sin_port = htons(2017);
ipOfServer.sin_addr.s_addr = inet_addr("127.0.0.1");
if(connect(CreateSocket, (struct sockaddr *)&ipOfServer, sizeof(ipOfServer))<0)
{
printf("Connection failed due to port and ip problems\n");
return 1;
}
while((n = read(CreateSocket, dataReceived, sizeof(dataReceived)-1)) > 0)
{
dataReceived[n] = 0;
if(fputs(dataReceived, stdout) == EOF)
{
printf("\nStandard output error");
}
printf("\n");
}
if( n < 0)
{
printf("Standard input error \n");
}
return 0;
// if ok, stream the item through it
}

View File

@ -24,16 +24,21 @@
#include "util/rsthreads.h"
#include "pqi/pqi_base.h"
class FsNetworkInterface: public BinInterface, public RsTickingThread
class pqistreamer;
struct ConnectionData
{
sockaddr client_address;
int socket;
pqistreamer *pqi;
};
class FsBioInterface: public BinInterface
{
public:
FsNetworkInterface() ;
FsBioInterface(int socket);
// Implements RsTickingThread
void threadTick() override;
// Implements BinInterface methods
// Implements BinInterface methods
int tick() override;
@ -54,15 +59,58 @@ public:
uint64_t bytecount() override { return mTotalReadBytes; }
bool bandwidthLimited() override { return false; }
private:
int mCLintConnt;
uint32_t mTotalReadBytes;
uint32_t mTotalBufferBytes;
std::list<std::pair<void *,int> > in_buffer;
};
// This class handles multiple connections to the server and supplies RsItem elements
class FsNetworkInterface: public RsTickingThread
{
public:
FsNetworkInterface() ;
// basic functionality
RsItem *GetItem();
// Implements RsTickingThread
void threadTick() override;
protected:
bool checkForNewConnections();
private:
RsMutex mFsNiMtx;
void initListening();
void stopListening();
int mClintListn ;
uint64_t mTotalReadBytes;
uint64_t mTotalBufferBytes;
std::list<std::pair<void *,int> > in_buffer;
int mClintListn ; // listening socket
std::map<RsPeerId,ConnectionData> mConnections;
};
// This class runs a client connection to the friend server. It opens a socket at each connection.
class FsClient
{
public:
FsClient(const std::string& address);
bool sendItem(RsItem *item);
private:
std::string mServerAddress;
};

View File

@ -48,6 +48,7 @@ FriendServerControl::FriendServerControl(QWidget *parent)
QObject::connect(torServerAddress_LE,SIGNAL(textChanged(const QString&)),this,SLOT(onOnionAddressEdit(const QString&)));
mCheckingServerMovie = new QMovie(":/images/loader/circleball-16.gif");
serverStatusCheckResult_LB->setMovie(mCheckingServerMovie);
updateFriendServerStatusIcon(false);
}
@ -70,21 +71,26 @@ void FriendServerControl::onOnionAddressEdit(const QString&)
{
// Setup timer to auto-check the friend server address
mConnectionCheckTimer->stop();
mConnectionCheckTimer->setSingleShot(true);
mConnectionCheckTimer->setInterval(5000); // check in 5 secs unless something is changed in the mean time.
mConnectionCheckTimer->start();
serverStatusCheckResult_LB->setMovie(mCheckingServerMovie);
mCheckingServerMovie->start();
if(mCheckingServerMovie->fileName() != QString(":/images/loader/circleball-16.gif" ))
{
mCheckingServerMovie->setFileName(":/images/loader/circleball-16.gif");
mCheckingServerMovie->start();
}
}
void FriendServerControl::checkServerAddress()
{
rsFriendServer->checkServerAddress_async(torServerAddress_LE->text().toStdString(),torServerPort_SB->value(),
[this](bool test_result)
[this](const std::string& address,bool test_result)
{
if(test_result)
rsFriendServer->setServerAddress(address,1729);
RsQThreadUtils::postToObject( [=]() { updateFriendServerStatusIcon(test_result); },this);
}
);
@ -97,17 +103,19 @@ void FriendServerControl::onNbFriendsToRequestsChanged(int n)
void FriendServerControl::updateFriendServerStatusIcon(bool ok)
{
mCheckingServerMovie->stop();
if(ok)
{
torServerStatus_LB->setPixmap(FilesDefs::getPixmapFromQtResourcePath(ICON_STATUS_OK)) ;
torServerStatus_LB->setToolTip(tr("Friend server is currently reachable.")) ;
mCheckingServerMovie->setFileName(ICON_STATUS_OK);
}
else
{
torServerStatus_LB->setPixmap(FilesDefs::getPixmapFromQtResourcePath(ICON_STATUS_UNKNOWN)) ;
torServerStatus_LB->setToolTip(tr("The proxy is not enabled or broken.\nAre all services up and running fine??\nAlso check your ports!")) ;
mCheckingServerMovie->setFileName(ICON_STATUS_UNKNOWN);
}
mCheckingServerMovie->stop();
mCheckingServerMovie->start();
}