added new files

This commit is contained in:
csoler 2021-11-22 20:06:01 +01:00
parent 4d06ab2b4a
commit 3845dc1ea7
4 changed files with 465 additions and 0 deletions

View File

@ -0,0 +1,272 @@
/*******************************************************************************
* libretroshare/src/file_sharing: fsbio.cc *
* *
* libretroshare: retroshare core library *
* *
* Copyright 2021 by retroshare team <retroshare.project@gmail.com> *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU Lesser General Public License as *
* published by the Free Software Foundation, either version 3 of the *
* License, or (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU Lesser General Public License for more details. *
* *
* You should have received a copy of the GNU Lesser General Public License *
* along with this program. If not, see <https://www.gnu.org/licenses/>. *
* *
******************************************************************************/
#include "util/rsprint.h"
#include "pqi/pqifdbin.h"
RsFdBinInterface::RsFdBinInterface(int file_descriptor)
: mCLintConnt(file_descriptor),mIsActive(file_descriptor!=0)
{
mTotalReadBytes=0;
mTotalInBufferBytes=0;
mTotalWrittenBytes=0;
mTotalOutBufferBytes=0;
}
void RsFdBinInterface::setSocket(int s)
{
if(mIsActive != 0)
{
RsErr() << "Changing socket to active FsBioInterface! Canceling all pending R/W data." ;
close();
}
mCLintConnt = s;
mIsActive = (s!=0);
}
int RsFdBinInterface::tick()
{
if(!mIsActive)
{
RsErr() << "Ticking a non active FsBioInterface!" ;
return 0;
}
// 2 - read incoming data pending on existing connections
int res=0;
res += read_pending();
res += write_pending();
return res;
}
int RsFdBinInterface::read_pending()
{
char inBuffer[1025];
memset(inBuffer,0,1025);
ssize_t readbytes = recv(mCLintConnt, inBuffer, sizeof(inBuffer),MSG_DONTWAIT);
if(readbytes == 0)
{
RsDbg() << "Reached END of the stream!" ;
RsDbg() << "Closing!" ;
close();
return mTotalInBufferBytes;
}
if(readbytes < 0)
{
if(errno != EWOULDBLOCK && errno != EAGAIN)
RsErr() << "read() failed. Errno=" << errno ;
return mTotalInBufferBytes;
}
RsDbg() << "clintConnt: " << mCLintConnt << ", readbytes: " << readbytes ;
// display some debug info
if(readbytes > 0)
{
RsDbg() << "Received the following bytes: " << RsUtil::BinToHex( reinterpret_cast<unsigned char*>(inBuffer),readbytes,50) << std::endl;
//RsDbg() << "Received the following bytes: " << std::string(inBuffer,readbytes) << std::endl;
void *ptr = malloc(readbytes);
if(!ptr)
throw std::runtime_error("Cannot allocate memory! Go buy some RAM!");
memcpy(ptr,inBuffer,readbytes);
in_buffer.push_back(std::make_pair(ptr,readbytes));
mTotalInBufferBytes += readbytes;
mTotalReadBytes += readbytes;
RsDbg() << "Socket: " << mCLintConnt << ". Total read: " << mTotalReadBytes << ". Buffer size: " << mTotalInBufferBytes ;
}
return mTotalInBufferBytes;
}
int RsFdBinInterface::write_pending()
{
if(out_buffer.empty())
return mTotalOutBufferBytes;
auto& p = out_buffer.front();
int written = write(mCLintConnt, p.first, p.second);
if(written < 0)
{
if(errno != EWOULDBLOCK && errno != EAGAIN)
RsErr() << "write() failed. Errno=" << errno ;
return mTotalOutBufferBytes;
}
if(written == 0)
{
RsErr() << "write() failed. Nothing sent.";
return mTotalOutBufferBytes;
}
RsDbg() << "clintConnt: " << mCLintConnt << ", written: " << written ;
// display some debug info
RsDbg() << "Sent the following bytes: " << RsUtil::BinToHex( reinterpret_cast<unsigned char*>(p.first),written,50) << std::endl;
if(written < p.second)
{
void *ptr = malloc(p.second - written);
if(!ptr)
throw std::runtime_error("Cannot allocate memory! Go buy some RAM!");
memcpy(ptr,static_cast<unsigned char *>(p.first) + written,p.second - written);
free(p.first);
out_buffer.front().first = ptr;
out_buffer.front().second = p.second - written;
}
else
{
free(p.first);
out_buffer.pop_front();
}
mTotalOutBufferBytes -= written;
mTotalWrittenBytes += written;
return mTotalOutBufferBytes;
}
RsFdBinInterface::~RsFdBinInterface()
{
clean();
}
void RsFdBinInterface::clean()
{
for(auto p:in_buffer) free(p.first);
for(auto p:out_buffer) free(p.first);
in_buffer.clear();
out_buffer.clear();
}
int RsFdBinInterface::readdata(void *data, int len)
{
// read incoming bytes in the buffer
int total_len = 0;
while(total_len < len)
{
if(in_buffer.empty())
{
mTotalInBufferBytes -= total_len;
return total_len;
}
// If the remaining buffer is too large, chop of the beginning of it.
if(total_len + in_buffer.front().second > len)
{
memcpy(&(static_cast<unsigned char *>(data)[total_len]),in_buffer.front().first,len - total_len);
void *ptr = malloc(in_buffer.front().second - (len - total_len));
memcpy(ptr,&(static_cast<unsigned char*>(in_buffer.front().first)[len - total_len]),in_buffer.front().second - (len - total_len));
free(in_buffer.front().first);
in_buffer.front().first = ptr;
in_buffer.front().second -= len-total_len;
mTotalInBufferBytes -= len;
return len;
}
else // copy everything
{
memcpy(&(static_cast<unsigned char *>(data)[total_len]),in_buffer.front().first,in_buffer.front().second);
total_len += in_buffer.front().second;
free(in_buffer.front().first);
in_buffer.pop_front();
}
}
mTotalInBufferBytes -= len;
return len;
}
int RsFdBinInterface::senddata(void *data, int len)
{
// shouldn't we better send in multiple packets, similarly to how we read?
if(len == 0)
{
RsErr() << "Calling FsBioInterface::senddata() with null size or null data pointer";
return 0;
}
void *ptr = malloc(len);
if(!ptr)
{
RsErr() << "Cannot allocate data of size " << len ;
return 0;
}
memcpy(ptr,data,len);
out_buffer.push_back(std::make_pair(ptr,len));
mTotalOutBufferBytes += len;
return len;
}
int RsFdBinInterface::netstatus()
{
return mIsActive; // dummy response.
}
int RsFdBinInterface::isactive()
{
return mIsActive ;
}
bool RsFdBinInterface::moretoread(uint32_t /* usec */)
{
return mTotalInBufferBytes > 0;
}
bool RsFdBinInterface::cansend(uint32_t)
{
return isactive();
}
int RsFdBinInterface::close()
{
RsDbg() << "Stopping network interface" << std::endl;
mIsActive = false;
mCLintConnt = 0;
clean();
return 1;
}

View File

@ -0,0 +1,77 @@
/*******************************************************************************
* libretroshare/src/file_sharing: fsbio.h *
* *
* libretroshare: retroshare core library *
* *
* Copyright 2021 by retroshare team <retroshare.project@gmail.com> *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU Lesser General Public License as *
* published by the Free Software Foundation, either version 3 of the *
* License, or (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU Lesser General Public License for more details. *
* *
* You should have received a copy of the GNU Lesser General Public License *
* along with this program. If not, see <https://www.gnu.org/licenses/>. *
* *
******************************************************************************/
#include "pqi/pqi_base.h"
class RsFdBinInterface: public BinInterface
{
public:
RsFdBinInterface(int file_descriptor);
~RsFdBinInterface();
// Implements BinInterface methods
int tick() override;
// Schedule data to be sent at the next tick(). The caller keeps memory ownership.
//
int senddata(void *data, int len) override;
// Obtains new data from the interface. "data" needs to be initialized for room
// to len bytes. The returned value is the actual size of what was read.
//
int readdata(void *data, int len) override;
int netstatus() override;
int isactive() override;
bool moretoread(uint32_t usec) override;
bool cansend(uint32_t usec) override;
int close() override;
/**
* If hashing data
**/
RsFileHash gethash() override { return RsFileHash() ; }
uint64_t bytecount() override { return mTotalReadBytes; }
bool bandwidthLimited() override { return false; }
protected:
void setSocket(int s);
void clean();
private:
int read_pending();
int write_pending();
int mCLintConnt;
bool mIsActive;
uint32_t mTotalReadBytes;
uint32_t mTotalInBufferBytes;
uint32_t mTotalWrittenBytes;
uint32_t mTotalOutBufferBytes;
std::list<std::pair<void *,int> > in_buffer;
std::list<std::pair<void *,int> > out_buffer;
};

View File

@ -0,0 +1,75 @@
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <string.h>
#include <iostream>
#include "rstcpsocket.h"
RsTcpSocket::RsTcpSocket(const std::string& tcp_address,uint16_t tcp_port)
:RsFdBinInterface(0),mState(DISCONNECTED),mConnectAddress(tcp_address),mConnectPort(tcp_port),mSocket(0)
{
}
int RsTcpSocket::connect()
{
int CreateSocket = 0;
char dataReceived[1024];
struct sockaddr_in ipOfServer;
memset(dataReceived, '0' ,sizeof(dataReceived));
if((CreateSocket = socket(AF_INET, SOCK_STREAM, 0))< 0)
{
printf("Socket not created \n");
return false;
}
ipOfServer.sin_family = AF_INET;
ipOfServer.sin_port = htons(mConnectPort);
ipOfServer.sin_addr.s_addr = inet_addr(mConnectAddress.c_str());
if(::connect(mSocket, (struct sockaddr *)&ipOfServer, sizeof(ipOfServer))<0)
{
printf("Connection failed due to port and ip problems, or server is not available\n");
return false;
}
mState = CONNECTED;
setSocket(mSocket);
return true;
}
int RsTcpSocket::close()
{
RsFdBinInterface::close();
return !::close(mSocket);
}
RsThreadedTcpSocket::RsThreadedTcpSocket(const std::string& tcp_address,uint16_t tcp_port)
: RsTcpSocket(tcp_address,tcp_port)
{
}
void RsThreadedTcpSocket::run()
{
if(!connect())
{
RsErr() << "Cannot connect socket to " << connectAddress() << ":" << connectPort() ;
return ;
}
while(connectionState() == CONNECTED)
{
tick();
std::this_thread::sleep_for(std::chrono::milliseconds(200));
}
RsWarn() << "Connection to " << connectAddress() << ":" << connectPort() << " is now closed.";
}
RsThreadedTcpSocket::~RsThreadedTcpSocket()
{
fullstop(); // fully wait for stopping.
close();
}

View File

@ -0,0 +1,41 @@
#include <string>
#include "util/rsthreads.h"
#include "pqi/pqifdbin.h"
class RsTcpSocket: public RsFdBinInterface
{
public:
RsTcpSocket(const std::string& tcp_address,uint16_t tcp_port);
enum State: uint8_t {
UNKNOWN = 0x00,
DISCONNECTED = 0x01,
CONNECTED = 0x02
};
// Return 1 when OK, 0 otherwise.
int connect();
// Returns 1 when OK, 0 otherwise.
int close();
State connectionState() const { return mState; }
const std::string& connectAddress() const { return mConnectAddress ; }
uint16_t connectPort() const { return mConnectPort ; }
private:
State mState;
std::string mConnectAddress;
uint16_t mConnectPort;
int mSocket;
};
class RsThreadedTcpSocket: public RsTcpSocket, public RsThread
{
public:
RsThreadedTcpSocket(const std::string& tcp_address,uint16_t tcp_port);
virtual ~RsThreadedTcpSocket();
virtual void run() override;
};