mirror of
https://github.com/RetroShare/RetroShare.git
synced 2024-10-01 02:35:48 -04:00
Improved CRC32Map checking in several ways:
- servers now compute the map in a separate thread - CRC32Maps are kept in cache for 30 mins - CRC32Maps requests cannot be used to overflood a server anymore since their number is limited. - Transfer modules now send keep alive packets to maintain tunnels when asking for a CRC32Map git-svn-id: http://svn.code.sf.net/p/retroshare/code/trunk@4661 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
parent
a9cb864717
commit
d43a131c04
@ -34,6 +34,7 @@
|
||||
#include "ft/ftfilecreator.h"
|
||||
#include "ft/ftfileprovider.h"
|
||||
#include "ft/ftsearch.h"
|
||||
#include "util/rsdir.h"
|
||||
#include <retroshare/rsturtle.h>
|
||||
|
||||
/* For Thread Behaviour */
|
||||
@ -281,6 +282,28 @@ bool ftDataMultiplex::recvCRC32MapRequest(const std::string& peerId, const std::
|
||||
return true;
|
||||
}
|
||||
|
||||
class CRC32Thread: public RsThread
|
||||
{
|
||||
public:
|
||||
CRC32Thread(ftDataMultiplex *dataplex,const std::string& peerId,const std::string& hash)
|
||||
: _plex(dataplex),_finished(false),_peerId(peerId),_hash(hash) {}
|
||||
|
||||
virtual void run()
|
||||
{
|
||||
#ifdef MPLEX_DEBUG
|
||||
std::cerr << "CRC32Thread is running for file " << _hash << std::endl;
|
||||
#endif
|
||||
_plex->computeAndSendCRC32Map(_peerId,_hash) ;
|
||||
_finished = true ;
|
||||
}
|
||||
bool finished() { return _finished ; }
|
||||
private:
|
||||
ftDataMultiplex *_plex ;
|
||||
bool _finished ;
|
||||
std::string _peerId ;
|
||||
std::string _hash ;
|
||||
};
|
||||
|
||||
/*********** BACKGROUND THREAD OPERATIONS ***********/
|
||||
bool ftDataMultiplex::workQueued()
|
||||
{
|
||||
@ -301,6 +324,7 @@ bool ftDataMultiplex::workQueued()
|
||||
bool ftDataMultiplex::doWork()
|
||||
{
|
||||
bool doRequests = true;
|
||||
time_t now = time(NULL) ;
|
||||
|
||||
/* Handle All the current Requests */
|
||||
while(doRequests)
|
||||
@ -372,6 +396,47 @@ bool ftDataMultiplex::doWork()
|
||||
}
|
||||
}
|
||||
|
||||
// Look for potentially finished CRC32Map threads, and destroys them.
|
||||
|
||||
{
|
||||
RsStackMutex stack(dataMtx); /******* LOCK MUTEX ******/
|
||||
|
||||
for(std::list<CRC32Thread*>::iterator lit(_crc32map_threads.begin());lit!=_crc32map_threads.end();)
|
||||
if((*lit)->finished())
|
||||
{
|
||||
std::cerr << "ftDataMultiplex::doWork: thread " << *lit << " ended. Deleting it." << std::endl;
|
||||
(*lit)->join() ;
|
||||
delete (*lit) ;
|
||||
std::list<CRC32Thread*>::iterator tmp(lit) ;
|
||||
++lit ;
|
||||
_crc32map_threads.erase(tmp) ;
|
||||
}
|
||||
else
|
||||
{
|
||||
std::cerr << "ftDataMultiplex::doWork: thread " << *lit << " still working. Not quitting it." << std::endl;
|
||||
++lit ;
|
||||
}
|
||||
|
||||
// Take the opportunity to cleanup the list, so that it cannot grow indefinitely
|
||||
#ifdef MPLEX_DEBUG
|
||||
std::cerr << "ftDataMultiplex::doWork: Cleaning up list of cached maps." << std::endl ;
|
||||
#endif
|
||||
|
||||
// Keep CRC32 maps in cache for 30 mins max.
|
||||
//
|
||||
for(std::map<std::string,std::pair<time_t,CRC32Map> >::iterator it = _cached_crc32maps.begin();it!=_cached_crc32maps.end();)
|
||||
if(it->second.first + 30*60 < now)
|
||||
{
|
||||
std::cerr << "Removing cached map for file " << it->first << " that was kept for too long now." << std::endl;
|
||||
|
||||
std::map<std::string,std::pair<time_t,CRC32Map> >::iterator tmp(it) ;
|
||||
++it ;
|
||||
_cached_crc32maps.erase(tmp) ;
|
||||
}
|
||||
else
|
||||
++it ;
|
||||
}
|
||||
|
||||
/* Only Handle One Search Per Period....
|
||||
* Lower Priority
|
||||
*/
|
||||
@ -396,6 +461,7 @@ bool ftDataMultiplex::doWork()
|
||||
if(handleSearchRequest(req.mPeerId, req.mHash))
|
||||
handleRecvDataRequest(req.mPeerId, req.mHash, req.mSize, req.mOffset, req.mChunk) ;
|
||||
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
@ -470,11 +536,72 @@ bool ftDataMultiplex::recvChunkMap(const std::string& peerId, const std::string&
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
bool ftDataMultiplex::handleRecvCRC32MapRequest(const std::string& peerId, const std::string& hash)
|
||||
{
|
||||
std::map<std::string, ftFileProvider *>::iterator it ;
|
||||
bool found = true ;
|
||||
bool found = false ;
|
||||
CRC32Map cmap ;
|
||||
|
||||
// 1 - look into cache
|
||||
|
||||
#ifdef MPLEX_DEBUG
|
||||
std::cerr << "ftDataMultiplex::handleRecvChunkMapReq() : source " << peerId << " asked for CRC32 map for file " << hash << std::endl;
|
||||
#endif
|
||||
{
|
||||
RsStackMutex stack(dataMtx); /******* LOCK MUTEX ******/
|
||||
std::map<std::string,std::pair<time_t,CRC32Map> >::iterator it = _cached_crc32maps.find(hash) ;
|
||||
|
||||
if(it != _cached_crc32maps.end())
|
||||
{
|
||||
cmap = it->second.second ;
|
||||
it->second.first = time(NULL) ; // update time stamp
|
||||
found = true ;
|
||||
#ifdef MPLEX_DEBUG
|
||||
std::cerr << "ftDataMultiplex::handleRecvChunkMapReq() : CRC32 map found in cache !!" << std::endl;
|
||||
#endif
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
if(found)
|
||||
{
|
||||
std::cerr << "File CRC32 map was obtained successfully. Sending it." << std::endl ;
|
||||
|
||||
mDataSend->sendCRC32Map(peerId,hash,cmap);
|
||||
return true ;
|
||||
}
|
||||
else
|
||||
{
|
||||
std::cerr << "File CRC32 Not found. Computing it." << std::endl ;
|
||||
|
||||
{
|
||||
RsStackMutex stack(dataMtx); /******* LOCK MUTEX ******/
|
||||
if(_crc32map_threads.size() > 1)
|
||||
{
|
||||
std::cerr << "Too many threads already computing CRC32Maps (2 is the current maximum)! Giving up." << std::endl;
|
||||
return false ;
|
||||
}
|
||||
}
|
||||
|
||||
CRC32Thread *thread = new CRC32Thread(this,peerId,hash);
|
||||
|
||||
{
|
||||
RsStackMutex stack(dataMtx); /******* LOCK MUTEX ******/
|
||||
_crc32map_threads.push_back(thread) ;
|
||||
}
|
||||
thread->start() ;
|
||||
return true ;
|
||||
}
|
||||
}
|
||||
|
||||
bool ftDataMultiplex::computeAndSendCRC32Map(const std::string& peerId, const std::string& hash)
|
||||
{
|
||||
bool found ;
|
||||
std::map<std::string, ftFileProvider *>::iterator it ;
|
||||
std::string filename ;
|
||||
uint64_t filesize =0;
|
||||
|
||||
// 1 - look into the list of servers
|
||||
{
|
||||
RsStackMutex stack(dataMtx); /******* LOCK MUTEX ******/
|
||||
|
||||
@ -484,6 +611,8 @@ bool ftDataMultiplex::handleRecvCRC32MapRequest(const std::string& peerId, const
|
||||
found = false ;
|
||||
}
|
||||
|
||||
// 2 - if not found, create a server.
|
||||
//
|
||||
if(!found)
|
||||
{
|
||||
#ifdef MPLEX_DEBUG
|
||||
@ -499,21 +628,51 @@ bool ftDataMultiplex::handleRecvCRC32MapRequest(const std::string& peerId, const
|
||||
#endif
|
||||
}
|
||||
|
||||
CRC32Map cmap ;
|
||||
{
|
||||
RsStackMutex stack(dataMtx); /******* LOCK MUTEX ******/
|
||||
|
||||
it = mServers.find(hash) ;
|
||||
|
||||
if(it == mServers.end()) // handleSearchRequest should have filled mServers[hash], but we have been off-mutex since,
|
||||
{
|
||||
std::cerr << "Could definitely not find a provider for file " << hash << ". Maybe the file does not exist?" << std::endl;
|
||||
return false ; // so it's safer to check again.
|
||||
else if(!it->second->getCRC32Map(cmap))
|
||||
}
|
||||
else
|
||||
{
|
||||
filesize = it->second->fileSize() ;
|
||||
filename = it->second->fileName() ;
|
||||
}
|
||||
}
|
||||
|
||||
std::cerr << "Computing CRC32Map for file " << filename << ", hash=" << hash << ", size=" << filesize << std::endl;
|
||||
|
||||
FILE *fd = fopen(filename.c_str(),"r") ;
|
||||
|
||||
if(fd == NULL)
|
||||
{
|
||||
std::cerr << "Could not open file " << filename << " for read!! CRC32Map computation cancelled." << std::endl ;
|
||||
return false ;
|
||||
}
|
||||
|
||||
std::cerr << "File CRC32 map was successfully computed. Sending it." << std::endl ;
|
||||
CRC32Map cmap ;
|
||||
if(!RsDirUtil::crc32File(fd,filesize,ChunkMap::CHUNKMAP_FIXED_CHUNK_SIZE,cmap))
|
||||
{
|
||||
std::cerr << "CRC32Map computation failed." << std::endl ;
|
||||
fclose(fd) ;
|
||||
return false ;
|
||||
}
|
||||
fclose(fd) ;
|
||||
|
||||
{
|
||||
RsStackMutex stack(dataMtx); /******* LOCK MUTEX ******/
|
||||
std::cerr << "File CRC32 was successfully computed. Storing it into cache." << std::endl ;
|
||||
|
||||
_cached_crc32maps[hash] = std::pair<time_t,CRC32Map>(time(NULL),cmap) ;
|
||||
}
|
||||
|
||||
std::cerr << "File CRC32 was successfully computed. Sending it." << std::endl ;
|
||||
mDataSend->sendCRC32Map(peerId,hash,cmap);
|
||||
|
||||
return true ;
|
||||
}
|
||||
|
||||
|
@ -36,6 +36,7 @@ class ftTransferModule;
|
||||
class ftFileProvider;
|
||||
class ftFileCreator;
|
||||
class ftSearch;
|
||||
class CRC32Thread;
|
||||
|
||||
#include <string>
|
||||
#include <list>
|
||||
@ -113,6 +114,9 @@ class ftDataMultiplex: public ftDataRecv, public RsQueueThread
|
||||
/* Client Send */
|
||||
bool sendCRC32MapRequest(const std::string& peerId, const std::string& hash) ;
|
||||
|
||||
/* called from a separate thread */
|
||||
bool computeAndSendCRC32Map(const std::string& peerId, const std::string& hash) ;
|
||||
|
||||
/*************** RECV INTERFACE (provides ftDataRecv) ****************/
|
||||
|
||||
/* Client Recv */
|
||||
@ -162,6 +166,8 @@ class ftDataMultiplex: public ftDataRecv, public RsQueueThread
|
||||
std::list<ftRequest> mSearchQueue;
|
||||
// std::map<std::string, time_t> mUnknownHashs;
|
||||
|
||||
std::list<CRC32Thread *> _crc32map_threads ;
|
||||
std::map<std::string,std::pair<time_t,CRC32Map> > _cached_crc32maps ;
|
||||
ftDataSend *mDataSend;
|
||||
ftSearch *mSearch;
|
||||
std::string mOwnId;
|
||||
|
@ -113,12 +113,6 @@ class ftFileCreator: public ftFileProvider
|
||||
virtual void getAvailabilityMap(CompressedChunkMap& cmap) ;
|
||||
void setAvailabilityMap(const CompressedChunkMap& cmap) ;
|
||||
|
||||
// Provides a complete per-chunk CRC32 map to client who want to check their data.
|
||||
// This is overloads ftFileProvider, but returns false, because we can't ensure that unchecked chunks
|
||||
// will provide a CRC32 that is faithful to the original hash.
|
||||
//
|
||||
virtual bool getCRC32Map(CRC32Map& /*crc_map*/) { return false ; }
|
||||
|
||||
// This is called when receiving the availability map from a source peer, for the file being handled.
|
||||
//
|
||||
void setSourceMap(const std::string& peer_id,const CompressedChunkMap& map) ;
|
||||
|
@ -318,15 +318,4 @@ int ftFileProvider::initializeFileAttrs()
|
||||
return 1;
|
||||
}
|
||||
|
||||
bool ftFileProvider::getCRC32Map(CRC32Map& crc_map)
|
||||
{
|
||||
if(!initializeFileAttrs())
|
||||
{
|
||||
std::cerr << "ftFileProvider::getCRC32Map(...): ERROR: can't initialize file !" << std::endl ;
|
||||
return false ;
|
||||
}
|
||||
|
||||
std::cerr << "ftFileProvider::getClientMap(): computing CRC32 map for file " << file_name << " (" << hash << ")" << std::endl ;
|
||||
return RsDirUtil::crc32File(fd,mSize,ChunkMap::CHUNKMAP_FIXED_CHUNK_SIZE,crc_map) ;
|
||||
}
|
||||
|
||||
|
@ -53,12 +53,6 @@ class ftFileProvider
|
||||
//
|
||||
virtual void getAvailabilityMap(CompressedChunkMap& cmap) ;
|
||||
|
||||
// Provides a complete per-chunk CRC32 map to client who want to check their data.
|
||||
// This is derived in ftFileCreator, but returns false, because we can't ensure that unchecked chunks
|
||||
// will provide a CRC32 that is faithful to the original hash.
|
||||
//
|
||||
virtual bool getCRC32Map(CRC32Map& crc_map) ;
|
||||
|
||||
// a ftFileProvider feeds a distant peer. To display what the peers already has, we need to store/read this info.
|
||||
void getClientMap(const std::string& peer_id,CompressedChunkMap& cmap,bool& map_is_too_old) ;
|
||||
void setClientMap(const std::string& peer_id,const CompressedChunkMap& cmap) ;
|
||||
@ -66,6 +60,10 @@ class ftFileProvider
|
||||
// Removes inactive peers from the client list. Returns true if all peers have been removed.
|
||||
//
|
||||
bool purgeOldPeers(time_t now,uint32_t max_duration) ;
|
||||
|
||||
const std::string& fileHash() const { return hash ; }
|
||||
const std::string& fileName() const { return file_name ; }
|
||||
uint64_t fileSize() const { return mSize ; }
|
||||
protected:
|
||||
virtual int initializeFileAttrs(); /* does for both */
|
||||
|
||||
|
@ -26,6 +26,7 @@
|
||||
/******
|
||||
* #define FT_DEBUG 1
|
||||
*****/
|
||||
#define FT_DEBUG 1
|
||||
|
||||
#include "retroshare/rsturtle.h"
|
||||
#include "fttransfermodule.h"
|
||||
@ -88,6 +89,8 @@ ftTransferModule::ftTransferModule(ftFileCreator *fc, ftDataMultiplex *dm, ftCon
|
||||
actualRate = 0;
|
||||
_crcmap_state = FT_TM_CRC_MAP_STATE_NOCHECK ;
|
||||
_crcmap_last_asked_time = 0 ;
|
||||
_crcmap_last_tunnel_keepup = 0 ;
|
||||
_crcreq_source = "";
|
||||
}
|
||||
|
||||
ftTransferModule::~ftTransferModule()
|
||||
@ -643,6 +646,7 @@ bool ftTransferModule::checkCRC()
|
||||
switch(_crcmap_state)
|
||||
{
|
||||
case FT_TM_CRC_MAP_STATE_NOCHECK:
|
||||
_crcreq_source = "" ;
|
||||
#ifdef FT_DEBUG
|
||||
std::cerr << "ftTransferModule::checkCRC(): state is NOCHECK. Doing nothing." << std::endl ;
|
||||
#endif
|
||||
@ -659,6 +663,16 @@ bool ftTransferModule::checkCRC()
|
||||
|
||||
std::cerr << "Threshold is " << threshold << std::endl;
|
||||
std::cerr << "Limit is " << (uint64_t)_crcmap_last_asked_time + threshold << std::endl ;
|
||||
std::cerr << "Requested source is \"" << _crcreq_source << "\"" << std::endl;
|
||||
|
||||
if( _crcreq_source != "" && (uint64_t)_crcmap_last_tunnel_keepup + 10 <= (uint64_t)now)
|
||||
{
|
||||
#ifdef FT_DEBUG
|
||||
std::cerr << "ftTransferModule::checkCRC(): sending keepup to source " << _crcreq_source << std::endl ;
|
||||
#endif
|
||||
locked_requestData(_crcreq_source,0,(uint32_t)std::min((uint64_t)512,mSize));
|
||||
_crcmap_last_tunnel_keepup = now ;
|
||||
}
|
||||
|
||||
if( (uint64_t)_crcmap_last_asked_time + threshold > (uint64_t)now)
|
||||
{
|
||||
@ -690,6 +704,8 @@ bool ftTransferModule::checkCRC()
|
||||
std::cerr << "ftTransferModule::checkCRC(): sending CRC map request to source " << mit->first << std::endl ;
|
||||
#endif
|
||||
_crcmap_last_asked_time = now ;
|
||||
_crcreq_source = mit->first ;
|
||||
|
||||
mMultiplexor->sendCRC32MapRequest(mit->first,mHash);
|
||||
}
|
||||
break ;
|
||||
@ -735,6 +751,7 @@ bool ftTransferModule::checkCRC()
|
||||
}
|
||||
|
||||
_crcmap_state = FT_TM_CRC_MAP_STATE_NOCHECK ;
|
||||
_crcreq_source = "" ;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -193,6 +193,8 @@ private:
|
||||
CRC32Map _crcmap ;
|
||||
uint32_t _crcmap_state ;
|
||||
time_t _crcmap_last_asked_time ;
|
||||
time_t _crcmap_last_tunnel_keepup ;
|
||||
std::string _crcreq_source ;
|
||||
|
||||
ftFileStatus mFileStatus; //used for pause/resume file transfer
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user