fixed memory error

This commit is contained in:
csoler 2021-10-31 18:00:43 +01:00
parent a69f9dc09b
commit e058b3a35f
7 changed files with 98 additions and 20 deletions

View File

@ -109,8 +109,8 @@ bool FsClient::sendItem(const std::string& address,uint16_t port,RsItem *item,st
} }
FsSerializer *fss = new FsSerializer; FsSerializer *fss = new FsSerializer;
RsSerialiser rss; RsSerialiser *rss = new RsSerialiser(); // deleted by ~pqistreamer()
rss.addSerialType(fss); rss->addSerialType(fss);
FsSerializer().serialise(item,data,&size); FsSerializer().serialise(item,data,&size);
write(CreateSocket,data,size); // shouldn't we use the pqistreamer in R/W mode instead? write(CreateSocket,data,size); // shouldn't we use the pqistreamer in R/W mode instead?
@ -119,8 +119,9 @@ bool FsClient::sendItem(const std::string& address,uint16_t port,RsItem *item,st
// Now attempt to read and deserialize anything that comes back from that connexion until it gets closed by the server. // Now attempt to read and deserialize anything that comes back from that connexion until it gets closed by the server.
FsBioInterface bio(CreateSocket); FsBioInterface *bio = new FsBioInterface(CreateSocket); // deleted by ~pqistreamer()
pqithreadstreamer p(this,&rss,RsPeerId(),&bio,BIN_FLAGS_READABLE | BIN_FLAGS_NO_DELETE);
pqithreadstreamer p(this,rss,RsPeerId(),bio,BIN_FLAGS_READABLE | BIN_FLAGS_NO_DELETE | BIN_FLAGS_NO_CLOSE);
p.start(); p.start();
uint32_t ss; uint32_t ss;
@ -139,13 +140,17 @@ bool FsClient::sendItem(const std::string& address,uint16_t port,RsItem *item,st
std::cerr << *item << std::endl; std::cerr << *item << std::endl;
} }
if(!bio.isactive()) // socket has probably closed if(!bio->isactive()) // socket has probably closed
{ {
RsDbg() << "(client side) Socket has been closed by server. Killing pqistreamer and closing socket." ; RsDbg() << "(client side) Socket has been closed by server.";
RsDbg() << " Stopping/killing pqistreamer" ;
p.fullstop(); p.fullstop();
RsDbg() << " Closing socket." ;
close(CreateSocket); close(CreateSocket);
CreateSocket=0; CreateSocket=0;
RsDbg() << " Exiting loop." ;
break; break;
} }

View File

@ -3,6 +3,10 @@
#include "friendserver.h" #include "friendserver.h"
#include "friend_server/fsitem.h" #include "friend_server/fsitem.h"
static const rstime_t MAXIMUM_PEER_INACTIVE_DELAY = 600;
static const rstime_t DELAY_BETWEEN_TWO_AUTOWASH = 60;
static const rstime_t DELAY_BETWEEN_TWO_DEBUG_PRINT = 10;
void FriendServer::threadTick() void FriendServer::threadTick()
{ {
// Listen to the network interface, capture incoming data etc. // Listen to the network interface, capture incoming data etc.
@ -33,6 +37,23 @@ void FriendServer::threadTick()
} }
delete item; delete item;
} }
static rstime_t last_autowash_TS = time(nullptr);
rstime_t now = time(nullptr);
if(last_autowash_TS + DELAY_BETWEEN_TWO_AUTOWASH < now)
{
last_autowash_TS = now;
autoWash();
}
static rstime_t last_debugprint_TS = time(nullptr);
if(last_debugprint_TS + DELAY_BETWEEN_TWO_DEBUG_PRINT < now)
{
last_debugprint_TS = now;
debugPrint();
}
} }
void FriendServer::handleClientPublish(const RsFriendServerClientPublishItem *item) void FriendServer::handleClientPublish(const RsFriendServerClientPublishItem *item)
@ -67,3 +88,39 @@ void FriendServer::run()
while(!shouldStop()) { threadTick() ; } while(!shouldStop()) { threadTick() ; }
} }
void FriendServer::autoWash()
{
rstime_t now = time(nullptr);
for(std::map<RsPeerId,PeerInfo>::iterator it(mCurrentClientPeers.begin());it!=mCurrentClientPeers.end();)
{
if(it->second.last_connection_TS + MAXIMUM_PEER_INACTIVE_DELAY < now)
{
RsDbg() << "Removing client peer " << it->first << " because it's inactive for more than " << MAXIMUM_PEER_INACTIVE_DELAY << " seconds." ;
auto tmp = it;
++tmp;
mCurrentClientPeers.erase(it);
it = tmp;
}
}
}
void FriendServer::debugPrint()
{
RsDbg() << "========== FriendServer statistics ============";
RsDbg() << " Base directory: "<< mBaseDirectory;
RsDbg() << " Network interface: ";
RsDbg() << " Current peers: " << mCurrentClientPeers.size() ;
rstime_t now = time(nullptr);
for(auto& it:mCurrentClientPeers)
RsDbg() << " " << it.first << ": " << "last contact: " << now - it.second.last_connection_TS;
RsDbg() << "===============================================";
}

View File

@ -29,19 +29,36 @@
class RsFriendServerClientRemoveItem; class RsFriendServerClientRemoveItem;
class RsFriendServerClientPublishItem; class RsFriendServerClientPublishItem;
struct PeerInfo
{
std::string short_certificate;
rstime_t last_connection_TS;
};
class FriendServer : public RsTickingThread class FriendServer : public RsTickingThread
{ {
public: public:
FriendServer(const std::string& base_directory); FriendServer(const std::string& base_directory);
private: private:
// overloads RsTickingThread
virtual void threadTick() override; virtual void threadTick() override;
virtual void run() override; virtual void run() override;
// Own algorithmics
void handleClientRemove(const RsFriendServerClientRemoveItem *item); void handleClientRemove(const RsFriendServerClientRemoveItem *item);
void handleClientPublish(const RsFriendServerClientPublishItem *item); void handleClientPublish(const RsFriendServerClientPublishItem *item);
void autoWash();
void debugPrint();
// Local members
FsNetworkInterface *mni; FsNetworkInterface *mni;
std::string mBaseDirectory; std::string mBaseDirectory;
std::map<RsPeerId, PeerInfo> mCurrentClientPeers;
}; };

View File

@ -246,6 +246,16 @@ void FsNetworkInterface::closeConnection(const RsPeerId& peer_id)
mConnections.erase(it); mConnections.erase(it);
} }
void FsNetworkInterface::debugPrint()
{
RsDbg() << " " << mClintListn ; // listening socket
RsDbg() << " Connections: " << mConnections.size() ;
for(auto& it:mConnections)
RsDbg() << " " << it.first << ": from \"" << sockaddr_storage_tostring(*(sockaddr_storage*)(&it.second.client_address)) << "\", socket=" << it.second.socket ;
std::map<RsPeerId,ConnectionData> mConnections;
}

View File

@ -49,6 +49,7 @@ public:
// basic functionality // basic functionality
void closeConnection(const RsPeerId& peer_id); void closeConnection(const RsPeerId& peer_id);
void debugPrint();
// Implements PQInterface // Implements PQInterface

View File

@ -58,20 +58,8 @@ int main(int argc, char* argv[])
fs.start(); fs.start();
while(fs.isRunning()) while(fs.isRunning())
{
std::this_thread::sleep_for(std::chrono::seconds(2)); std::this_thread::sleep_for(std::chrono::seconds(2));
// // send one request for testing to see what happens
//
// RsFriendServerClientPublishItem *item = new RsFriendServerClientPublishItem();
// item->long_invite = std::string("[Long Invite]");
// item->n_requested_friends = 10;
//
// std::cerr << "Sending fake request item for testing..." << std::endl;
// FsClient(std::string("127.0.0.1")).sendItem(item);
//
// std::this_thread::sleep_for(std::chrono::seconds(4));
}
return 0; return 0;
} }