Merging branches/v0.6-initdev into trunk.

These split at 6672 -> 7075, so quite a bit merge.
libretroshare compiles - but untested.
retroshare-gui needs GenCertDialog.ui and IdEditDialog.ui to be properly merged. (compile errors).
some plugins will be broken.
retroshare-nogui is untested.



git-svn-id: http://svn.code.sf.net/p/retroshare/code/trunk@7078 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
drbob 2014-02-01 14:16:15 +00:00
commit c0738eec7f
407 changed files with 23716 additions and 50779 deletions

View file

@ -64,10 +64,6 @@ class ftDataSend
virtual bool sendChunkMapRequest(const std::string& peer_id,const std::string& hash,bool is_client) = 0;
virtual bool sendChunkMap(const std::string& peer_id,const std::string& hash,const CompressedChunkMap& cmap,bool is_client) = 0;
/// Send a request for a chunk crc map
virtual bool sendCRC32MapRequest(const std::string& peer_id,const std::string& hash) = 0;
/// Send a chunk crc map
virtual bool sendCRC32Map(const std::string& peer_id,const std::string& hash,const CRC32Map& crc_map) = 0;
/// Send a request for a chunk crc map
virtual bool sendSingleChunkCRCRequest(const std::string& peer_id,const std::string& hash,uint32_t chunk_number) = 0;
/// Send a chunk crc map
@ -94,11 +90,6 @@ class ftDataRecv
/// Send a chunk map
virtual bool recvChunkMap(const std::string& peer_id,const std::string& hash,const CompressedChunkMap& cmap,bool is_client) = 0;
/// Send a request for a chunk map
virtual bool recvCRC32MapRequest(const std::string& peer_id,const std::string& hash) = 0;
/// Send a chunk map
virtual bool recvCRC32Map(const std::string& peer_id,const std::string& hash,const CRC32Map& crcmap) = 0;
virtual bool recvSingleChunkCRCRequest(const std::string& peer_id,const std::string& hash,uint32_t chunk_id) = 0;
virtual bool recvSingleChunkCRC(const std::string& peer_id,const std::string& hash,uint32_t chunk_id,const Sha1CheckSum& sum) = 0;

View file

@ -60,7 +60,7 @@ const uint32_t FT_DATA = 0x0001; // data cuhnk to be stored
const uint32_t FT_DATA_REQ = 0x0002; // data request to be treated
const uint32_t FT_CLIENT_CHUNK_MAP_REQ = 0x0003; // chunk map request to be treated by client
const uint32_t FT_SERVER_CHUNK_MAP_REQ = 0x0004; // chunk map request to be treated by server
const uint32_t FT_CRC32MAP_REQ = 0x0005; // crc32 map request to be treated by server
//const uint32_t FT_CRC32MAP_REQ = 0x0005; // crc32 map request to be treated by server
const uint32_t FT_CLIENT_CHUNK_CRC_REQ = 0x0006; // chunk sha1 crc request to be treated
ftRequest::ftRequest(uint32_t type, std::string peerId, std::string hash, uint64_t size, uint64_t offset, uint32_t chunk, void *data)
@ -273,19 +273,6 @@ bool ftDataMultiplex::recvChunkMapRequest(const std::string& peerId, const std::
return true;
}
bool ftDataMultiplex::recvCRC32MapRequest(const std::string& peerId, const std::string& hash)
{
#ifdef MPLEX_DEBUG
std::cerr << "ftDataMultiplex::recvChunkMapRequest() Server Recv";
std::cerr << std::endl;
#endif
/* Store in Queue */
RsStackMutex stack(dataMtx); /******* LOCK MUTEX ******/
mRequestQueue.push_back(ftRequest(FT_CRC32MAP_REQ,peerId,hash,0,0,0,NULL));
return true;
}
bool ftDataMultiplex::recvSingleChunkCRCRequest(const std::string& peerId, const std::string& hash,uint32_t chunk_number)
{
#ifdef MPLEX_DEBUG
@ -299,27 +286,6 @@ bool ftDataMultiplex::recvSingleChunkCRCRequest(const std::string& peerId, const
return true;
}
class CRC32Thread: public RsThread
{
public:
CRC32Thread(ftDataMultiplex *dataplex,const std::string& peerId,const std::string& hash)
: _plex(dataplex),_finished(false),_peerId(peerId),_hash(hash) {}
virtual void run()
{
#ifdef MPLEX_DEBUG
std::cerr << "CRC32Thread is running for file " << _hash << std::endl;
#endif
_plex->computeAndSendCRC32Map(_peerId,_hash) ;
_finished = true ;
}
bool finished() { return _finished ; }
private:
ftDataMultiplex *_plex ;
bool _finished ;
std::string _peerId ;
std::string _hash ;
};
/*********** BACKGROUND THREAD OPERATIONS ***********/
bool ftDataMultiplex::workQueued()
@ -341,7 +307,6 @@ bool ftDataMultiplex::workQueued()
bool ftDataMultiplex::doWork()
{
bool doRequests = true;
time_t now = time(NULL) ;
/* Handle All the current Requests */
while(doRequests)
@ -396,14 +361,6 @@ bool ftDataMultiplex::doWork()
handleRecvServerChunkMapRequest(req.mPeerId,req.mHash) ;
break ;
case FT_CRC32MAP_REQ:
#ifdef MPLEX_DEBUG
std::cerr << "ftDataMultiplex::doWork() Handling FT_CLIENT_CRC32_MAP_REQ";
std::cerr << std::endl;
#endif
handleRecvCRC32MapRequest(req.mPeerId,req.mHash) ;
break ;
case FT_CLIENT_CHUNK_CRC_REQ:
#ifdef MPLEX_DEBUG
std::cerr << "ftDataMultiplex::doWork() Handling FT_CLIENT_CHUNK_CRC_REQ";
@ -421,47 +378,6 @@ bool ftDataMultiplex::doWork()
}
}
// Look for potentially finished CRC32Map threads, and destroys them.
{
RsStackMutex stack(dataMtx); /******* LOCK MUTEX ******/
for(std::list<CRC32Thread*>::iterator lit(_crc32map_threads.begin());lit!=_crc32map_threads.end();)
if((*lit)->finished())
{
std::cerr << "ftDataMultiplex::doWork: thread " << *lit << " ended. Deleting it." << std::endl;
(*lit)->join() ;
delete (*lit) ;
std::list<CRC32Thread*>::iterator tmp(lit) ;
++lit ;
_crc32map_threads.erase(tmp) ;
}
else
{
std::cerr << "ftDataMultiplex::doWork: thread " << *lit << " still working. Not quitting it." << std::endl;
++lit ;
}
// Take the opportunity to cleanup the list, so that it cannot grow indefinitely
#ifdef MPLEX_DEBUG
std::cerr << "ftDataMultiplex::doWork: Cleaning up list of cached maps." << std::endl ;
#endif
// Keep CRC32 maps in cache for 30 mins max.
//
for(std::map<std::string,std::pair<time_t,CRC32Map> >::iterator it = _cached_crc32maps.begin();it!=_cached_crc32maps.end();)
if(it->second.first + 30*60 < now)
{
std::cerr << "Removing cached map for file " << it->first << " that was kept for too long now." << std::endl;
std::map<std::string,std::pair<time_t,CRC32Map> >::iterator tmp(it) ;
++it ;
_cached_crc32maps.erase(tmp) ;
}
else
++it ;
}
/* Only Handle One Search Per Period....
* Lower Priority
*/
@ -583,27 +499,6 @@ bool ftDataMultiplex::dispatchReceivedChunkCheckSum()
return true ;
}
bool ftDataMultiplex::recvCRC32Map(const std::string& /*peerId*/, const std::string& hash,const CRC32Map& crc_map)
{
RsStackMutex stack(dataMtx); /******* LOCK MUTEX ******/
std::map<std::string, ftClient>::iterator it = mClients.find(hash);
if(it == mClients.end())
{
std::cerr << "ftDataMultiplex::recvCRCMap() ERROR: No matching Client for CRC32map. This is an error. " << hash << " !" << std::endl;
/* error */
return false;
}
#ifdef MPLEX_DEBUG
std::cerr << "ftDataMultiplex::recvCRCMap() Passing crc map of file " << hash << ", to FT Module" << std::endl;
#endif
(it->second).mModule->addCRC32Map(crc_map);
return true ;
}
// A chunk map has arrived. It can be two different situations:
// - an uploader has sent his chunk map, so we need to store it in the corresponding ftFileProvider
// - a source for a download has sent his chunk map, so we need to send it to the corresponding ftFileCreator.
@ -654,152 +549,6 @@ bool ftDataMultiplex::recvChunkMap(const std::string& peerId, const std::string&
return false;
}
bool ftDataMultiplex::handleRecvCRC32MapRequest(const std::string& peerId, const std::string& hash)
{
bool found = false ;
CRC32Map cmap ;
// 1 - look into cache
#ifdef MPLEX_DEBUG
std::cerr << "ftDataMultiplex::handleRecvChunkMapReq() : source " << peerId << " asked for CRC32 map for file " << hash << std::endl;
#endif
{
RsStackMutex stack(dataMtx); /******* LOCK MUTEX ******/
std::map<std::string,std::pair<time_t,CRC32Map> >::iterator it = _cached_crc32maps.find(hash) ;
if(it != _cached_crc32maps.end())
{
cmap = it->second.second ;
it->second.first = time(NULL) ; // update time stamp
found = true ;
#ifdef MPLEX_DEBUG
std::cerr << "ftDataMultiplex::handleRecvChunkMapReq() : CRC32 map found in cache !!" << std::endl;
#endif
}
}
if(found)
{
#ifdef MPLEX_DEBUG
std::cerr << "File CRC32 map was obtained successfully. Sending it." << std::endl ;
#endif
mDataSend->sendCRC32Map(peerId,hash,cmap);
return true ;
}
else
{
std::cerr << "File CRC32 Not found. Computing it." << std::endl ;
{
RsStackMutex stack(dataMtx); /******* LOCK MUTEX ******/
if(_crc32map_threads.size() > 1)
{
std::cerr << "Too many threads already computing CRC32Maps (2 is the current maximum)! Giving up." << std::endl;
return false ;
}
}
CRC32Thread *thread = new CRC32Thread(this,peerId,hash);
{
RsStackMutex stack(dataMtx); /******* LOCK MUTEX ******/
_crc32map_threads.push_back(thread) ;
}
thread->start() ;
return true ;
}
}
bool ftDataMultiplex::computeAndSendCRC32Map(const std::string& peerId, const std::string& hash)
{
bool found ;
std::map<std::string, ftFileProvider *>::iterator it ;
std::string filename ;
uint64_t filesize =0;
// 1 - look into the list of servers
{
RsStackMutex stack(dataMtx); /******* LOCK MUTEX ******/
it = mServers.find(hash) ;
if(it == mServers.end())
found = false ;
}
// 2 - if not found, create a server.
//
if(!found)
{
#ifdef MPLEX_DEBUG
std::cerr << "ftDataMultiplex::handleRecvChunkMapReq() ERROR: No matching file Provider for hash " << hash ;
std::cerr << std::endl;
#endif
if(!handleSearchRequest(peerId,hash))
return false ;
#ifdef MPLEX_DEBUG
std::cerr << "ftDataMultiplex::handleRecvChunkMapReq() A new file Provider has been made up for hash " << hash ;
std::cerr << std::endl;
#endif
}
{
RsStackMutex stack(dataMtx); /******* LOCK MUTEX ******/
it = mServers.find(hash) ;
if(it == mServers.end()) // handleSearchRequest should have filled mServers[hash], but we have been off-mutex since,
{
std::cerr << "Could definitely not find a provider for file " << hash << ". Maybe the file does not exist?" << std::endl;
return false ; // so it's safer to check again.
}
else
{
filesize = it->second->fileSize() ;
filename = it->second->fileName() ;
}
}
#ifdef MPLEX_DEBUG
std::cerr << "Computing CRC32Map for file " << filename << ", hash=" << hash << ", size=" << filesize << std::endl;
#endif
FILE *fd = RsDirUtil::rs_fopen(filename.c_str(),"rb") ;
if(fd == NULL)
{
std::cerr << "Could not open file " << filename << " for read!! CRC32Map computation cancelled." << std::endl ;
return false ;
}
CRC32Map cmap ;
if(!RsDirUtil::crc32File(fd,filesize,ChunkMap::CHUNKMAP_FIXED_CHUNK_SIZE,cmap))
{
std::cerr << "CRC32Map computation failed." << std::endl ;
fclose(fd) ;
return false ;
}
fclose(fd) ;
{
RsStackMutex stack(dataMtx); /******* LOCK MUTEX ******/
std::cerr << "File CRC32 was successfully computed. Storing it into cache." << std::endl ;
_cached_crc32maps[hash] = std::pair<time_t,CRC32Map>(time(NULL),cmap) ;
}
#ifdef MPLEX_DEBUG
std::cerr << "File CRC32 was successfully computed. Sending it." << std::endl ;
#endif
mDataSend->sendCRC32Map(peerId,hash,cmap);
return true ;
}
bool ftDataMultiplex::handleRecvClientChunkMapRequest(const std::string& peerId, const std::string& hash)
{
CompressedChunkMap cmap ;
@ -1152,10 +901,6 @@ bool ftDataMultiplex::sendChunkMapRequest(const std::string& peer_id,const std::
{
return mDataSend->sendChunkMapRequest(peer_id,hash,is_client);
}
bool ftDataMultiplex::sendCRC32MapRequest(const std::string& peer_id,const std::string& hash)
{
return mDataSend->sendCRC32MapRequest(peer_id,hash);
}
bool ftDataMultiplex::sendSingleChunkCRCRequests(const std::string& hash, const std::vector<uint32_t>& to_ask)
{
RsStackMutex stack(dataMtx); /******* LOCK MUTEX ******/

View file

@ -36,7 +36,6 @@ class ftTransferModule;
class ftFileProvider;
class ftFileCreator;
class ftSearch;
class CRC32Thread;
#include <string>
#include <list>
@ -121,11 +120,6 @@ class ftDataMultiplex: public ftDataRecv, public RsQueueThread
/* Server/client Send */
bool sendChunkMapRequest(const std::string& peerId, const std::string& hash,bool is_client) ;
/* Client Send */
bool sendCRC32MapRequest(const std::string& peerId, const std::string& hash) ;
/* called from a separate thread */
bool computeAndSendCRC32Map(const std::string& peerId, const std::string& hash) ;
/* called from a separate thread */
bool sendSingleChunkCRCRequests(const std::string& hash, const std::vector<uint32_t>& to_ask) ;
@ -143,10 +137,6 @@ class ftDataMultiplex: public ftDataRecv, public RsQueueThread
virtual bool recvChunkMapRequest(const std::string& peer_id,const std::string& hash,bool is_client) ;
/// Receive a chunk map
virtual bool recvChunkMap(const std::string& peer_id,const std::string& hash,const CompressedChunkMap& cmap,bool is_client) ;
/// Receive a CRC map
virtual bool recvCRC32Map(const std::string& peer_id,const std::string& hash,const CRC32Map& crc_map) ;
/// Receive a CRC map request
virtual bool recvCRC32MapRequest(const std::string& peer_id,const std::string& hash) ;
virtual bool recvSingleChunkCRCRequest(const std::string& peer_id,const std::string& hash,uint32_t chunk_id) ;
virtual bool recvSingleChunkCRC(const std::string& peer_id,const std::string& hash,uint32_t chunk_id,const Sha1CheckSum& sum) ;
@ -170,7 +160,6 @@ class ftDataMultiplex: public ftDataRecv, public RsQueueThread
bool handleSearchRequest(const std::string& peerId, const std::string& hash);
bool handleRecvClientChunkMapRequest(const std::string& peerId, const std::string& hash) ;
bool handleRecvServerChunkMapRequest(const std::string& peerId, const std::string& hash) ;
bool handleRecvCRC32MapRequest(const std::string& peerId, const std::string& hash) ;
bool handleRecvChunkCrcRequest(const std::string& peerId, const std::string& hash,uint32_t chunk_id) ;
/* We end up doing the actual server job here */
@ -185,9 +174,6 @@ class ftDataMultiplex: public ftDataRecv, public RsQueueThread
std::list<ftRequest> mSearchQueue;
// std::map<std::string, time_t> mUnknownHashs;
std::list<CRC32Thread *> _crc32map_threads ;
std::map<std::string,std::pair<time_t,CRC32Map> > _cached_crc32maps ;
std::map<std::string,Sha1CacheEntry> _cached_sha1maps ; // one cache entry per file hash. Handled dynamically.
ftDataSend *mDataSend;

View file

@ -52,23 +52,29 @@ const int ftserverzone = 29539;
#include "pqi/pqi.h"
#include "pqi/p3linkmgr.h"
#include "serialiser/rsfiletransferitems.h"
#include "serialiser/rsserviceids.h"
/***
* #define SERVER_DEBUG 1
* #define DEBUG_TICK 1
* #define SERVER_DEBUG 1
* #define SERVER_DEBUG_CACHE 1
***/
static const time_t FILE_TRANSFER_LOW_PRIORITY_TASKS_PERIOD = 5 ; // low priority tasks handling every 5 seconds
/* Setup */
ftServer::ftServer(p3PeerMgr *pm, p3LinkMgr *lm)
: mP3iface(NULL), mPeerMgr(pm),
: p3Service(RS_SERVICE_TYPE_FILE_TRANSFER),
mPeerMgr(pm),
mLinkMgr(lm),
mCacheStrapper(NULL),
mFiStore(NULL), mFiMon(NULL),
mFtController(NULL), mFtExtra(NULL),
mFtDataplex(NULL), mFtSearch(NULL), srvMutex("ftServer")
{
mCacheStrapper = new ftCacheStrapper(lm);
mCacheStrapper = new ftCacheStrapper(lm);
addSerialType(new RsFileTransferSerialiser()) ;
}
void ftServer::setConfigDirectory(std::string path)
@ -88,11 +94,6 @@ void ftServer::setConfigDirectory(std::string path)
RsDirUtil::checkCreateDirectory(remotecachedir) ;
}
void ftServer::setP3Interface(P3Interface *pqi)
{
mP3iface = pqi;
}
/* Control Interface */
/* add Config Items (Extra, Controller) */
@ -182,16 +183,10 @@ void ftServer::StartupThreads()
/* Dataplex */
mFtDataplex->start();
/* start own thread */
start();
}
void ftServer::StopThreads()
{
/* stop own thread */
join();
/* stop Dataplex */
mFtDataplex->join();
@ -228,31 +223,22 @@ CacheTransfer *ftServer::getCacheTransfer()
return mFtController;
}
void ftServer::run()
/***************************************************************/
/********************** RsFiles Interface **********************/
/***************************************************************/
/***************************************************************/
/********************** Controller Access **********************/
/***************************************************************/
bool ftServer::ResumeTransfers()
{
while(isRunning())
{
mFtDataplex->deleteUnusedServers() ;
mFtDataplex->handlePendingCrcRequests() ;
mFtDataplex->dispatchReceivedChunkCheckSum() ;
#ifdef WIN32
Sleep(5000);
#else
sleep(5);
#endif
}
mFtController->activate();
return true;
}
/***************************************************************/
/********************** RsFiles Interface **********************/
/***************************************************************/
/***************************************************************/
/********************** Controller Access **********************/
/***************************************************************/
bool ftServer::checkHash(const std::string& hash,std::string& error_string)
{
static const uint32_t HASH_LENGTH = 40 ;
@ -465,15 +451,11 @@ RsTurtleGenericTunnelItem *ftServer::deserialiseItem(void *data,uint32_t size) c
#ifdef SERVER_DEBUG
std::cerr << "p3turtle: deserialising packet: " << std::endl ;
#endif
#ifdef SERVER_DEBUG
if ((RS_PKT_VERSION_SERVICE != getRsItemVersion(rstype)) || (RS_SERVICE_TYPE_TURTLE != getRsItemService(rstype)))
{
#ifdef SERVER_DEBUG
std::cerr << " Wrong type !!" << std::endl ;
#endif
return NULL; /* wrong type */
}
#endif
switch(getRsItemSubType(rstype))
{
@ -857,10 +839,15 @@ bool ftServer::shareDownloadDirectory(bool share)
/**************** Config Interface *****************************/
/***************************************************************/
/* Key Functions to be overloaded for Full Configuration */
/* Key Functions to be overloaded for Full Configuration */
RsSerialiser *ftServer::setupSerialiser()
{
return NULL;
RsSerialiser *rss = new RsSerialiser ;
rss->addSerialType(new RsFileTransferSerialiser) ;
//rss->addSerialType(new RsGeneralConfigSerialiser());
return rss ;
}
bool ftServer::saveList(bool &/*cleanup*/, std::list<RsItem *>& /*list*/)
@ -887,6 +874,9 @@ bool ftServer::loadConfigMap(std::map<std::string, std::string> &/*configMap*/)
/* Client Send */
bool ftServer::sendDataRequest(const std::string& peerId, const std::string& hash, uint64_t size, uint64_t offset, uint32_t chunksize)
{
#ifdef SERVER_DEBUG
std::cerr << "ftServer::sendDataRequest() to peer " << peerId << " for hash " << hash << ", offset=" << offset << ", chunk size="<< chunksize << std::endl;
#endif
if(mTurtleRouter->isTurtlePeer(peerId))
{
RsTurtleFileRequestItem *item = new RsTurtleFileRequestItem ;
@ -898,9 +888,10 @@ bool ftServer::sendDataRequest(const std::string& peerId, const std::string& has
}
else
{
std::cerr << "ftServer::sendDataRequest() " <<std::endl;
/* create a packet */
/* push to networking part */
RsFileRequest *rfi = new RsFileRequest();
RsFileTransferDataRequestItem *rfi = new RsFileTransferDataRequestItem();
/* id */
rfi->PeerId(peerId);
@ -913,7 +904,7 @@ bool ftServer::sendDataRequest(const std::string& peerId, const std::string& has
rfi->fileoffset = offset; /* ftr->offset; */
rfi->chunksize = chunksize; /* ftr->chunk; */
mP3iface->SendFileRequest(rfi);
sendItem(rfi);
}
return true;
@ -921,6 +912,9 @@ bool ftServer::sendDataRequest(const std::string& peerId, const std::string& has
bool ftServer::sendChunkMapRequest(const std::string& peerId,const std::string& hash,bool is_client)
{
#ifdef SERVER_DEBUG
std::cerr << "ftServer::sendChunkMapRequest() to peer " << peerId << " for hash " << hash << std::endl;
#endif
if(mTurtleRouter->isTurtlePeer(peerId))
{
RsTurtleFileMapRequestItem *item = new RsTurtleFileMapRequestItem ;
@ -930,7 +924,7 @@ bool ftServer::sendChunkMapRequest(const std::string& peerId,const std::string&
{
/* create a packet */
/* push to networking part */
RsFileChunkMapRequest *rfi = new RsFileChunkMapRequest();
RsFileTransferChunkMapRequestItem *rfi = new RsFileTransferChunkMapRequestItem();
/* id */
rfi->PeerId(peerId);
@ -939,7 +933,7 @@ bool ftServer::sendChunkMapRequest(const std::string& peerId,const std::string&
rfi->hash = hash; /* ftr->hash; */
rfi->is_client = is_client ;
mP3iface->SendFileChunkMapRequest(rfi);
sendItem(rfi);
}
return true ;
@ -947,6 +941,9 @@ bool ftServer::sendChunkMapRequest(const std::string& peerId,const std::string&
bool ftServer::sendChunkMap(const std::string& peerId,const std::string& hash,const CompressedChunkMap& map,bool is_client)
{
#ifdef SERVER_DEBUG
std::cerr << "ftServer::sendChunkMap() to peer " << peerId << " for hash " << hash << std::endl;
#endif
if(mTurtleRouter->isTurtlePeer(peerId))
{
RsTurtleFileMapItem *item = new RsTurtleFileMapItem ;
@ -957,7 +954,7 @@ bool ftServer::sendChunkMap(const std::string& peerId,const std::string& hash,co
{
/* create a packet */
/* push to networking part */
RsFileChunkMap *rfi = new RsFileChunkMap();
RsFileTransferChunkMapItem *rfi = new RsFileTransferChunkMapItem();
/* id */
rfi->PeerId(peerId);
@ -967,38 +964,17 @@ bool ftServer::sendChunkMap(const std::string& peerId,const std::string& hash,co
rfi->is_client = is_client; /* ftr->hash; */
rfi->compressed_map = map; /* ftr->hash; */
mP3iface->SendFileChunkMap(rfi);
sendItem(rfi);
}
return true ;
}
bool ftServer::sendCRC32MapRequest(const std::string& peerId,const std::string& hash)
{
if(mTurtleRouter->isTurtlePeer(peerId))
{
RsTurtleFileCrcRequestItem *item = new RsTurtleFileCrcRequestItem;
mTurtleRouter->sendTurtleData(peerId,item) ;
}
else
{
/* create a packet */
/* push to networking part */
RsFileCRC32MapRequest *rfi = new RsFileCRC32MapRequest();
/* id */
rfi->PeerId(peerId);
/* file info */
rfi->hash = hash; /* ftr->hash; */
mP3iface->SendFileCRC32MapRequest(rfi);
}
return true ;
}
bool ftServer::sendSingleChunkCRCRequest(const std::string& peerId,const std::string& hash,uint32_t chunk_number)
{
#ifdef SERVER_DEBUG
std::cerr << "ftServer::sendSingleCRCRequest() to peer " << peerId << " for hash " << hash << ", chunk number=" << chunk_number << std::endl;
#endif
if(mTurtleRouter->isTurtlePeer(peerId))
{
RsTurtleChunkCrcRequestItem *item = new RsTurtleChunkCrcRequestItem;
@ -1010,7 +986,7 @@ bool ftServer::sendSingleChunkCRCRequest(const std::string& peerId,const std::st
{
/* create a packet */
/* push to networking part */
RsFileSingleChunkCrcRequest *rfi = new RsFileSingleChunkCrcRequest();
RsFileTransferSingleChunkCrcRequestItem *rfi = new RsFileTransferSingleChunkCrcRequestItem();
/* id */
rfi->PeerId(peerId);
@ -1019,41 +995,17 @@ bool ftServer::sendSingleChunkCRCRequest(const std::string& peerId,const std::st
rfi->hash = hash; /* ftr->hash; */
rfi->chunk_number = chunk_number ;
mP3iface->SendFileSingleChunkCrcRequest(rfi);
sendItem(rfi);
}
return true ;
}
bool ftServer::sendCRC32Map(const std::string& peerId,const std::string& hash,const CRC32Map& crcmap)
{
if(mTurtleRouter->isTurtlePeer(peerId))
{
RsTurtleFileCrcItem *item = new RsTurtleFileCrcItem ;
item->crc_map = crcmap ;
mTurtleRouter->sendTurtleData(peerId,item) ;
}
else
{
/* create a packet */
/* push to networking part */
RsFileCRC32Map *rfi = new RsFileCRC32Map();
/* id */
rfi->PeerId(peerId);
/* file info */
rfi->hash = hash; /* ftr->hash; */
rfi->crc_map = crcmap; /* ftr->hash; */
mP3iface->SendFileCRC32Map(rfi);
}
return true ;
}
bool ftServer::sendSingleChunkCRC(const std::string& peerId,const std::string& hash,uint32_t chunk_number,const Sha1CheckSum& crc)
{
#ifdef SERVER_DEBUG
std::cerr << "ftServer::sendSingleCRC() to peer " << peerId << " for hash " << hash << ", chunk number=" << chunk_number << std::endl;
#endif
if(mTurtleRouter->isTurtlePeer(peerId))
{
RsTurtleChunkCrcItem *item = new RsTurtleChunkCrcItem;
@ -1066,7 +1018,7 @@ bool ftServer::sendSingleChunkCRC(const std::string& peerId,const std::string& h
{
/* create a packet */
/* push to networking part */
RsFileSingleChunkCrc *rfi = new RsFileSingleChunkCrc();
RsFileTransferSingleChunkCrcItem *rfi = new RsFileTransferSingleChunkCrcItem();
/* id */
rfi->PeerId(peerId);
@ -1076,16 +1028,12 @@ bool ftServer::sendSingleChunkCRC(const std::string& peerId,const std::string& h
rfi->check_sum = crc;
rfi->chunk_number = chunk_number;
mP3iface->SendFileSingleChunkCrc(rfi);
sendItem(rfi);
}
return true ;
}
//const uint32_t MAX_FT_CHUNK = 32 * 1024; /* 32K */
//const uint32_t MAX_FT_CHUNK = 16 * 1024; /* 16K */
const uint32_t MAX_FT_CHUNK = 8 * 1024; /* 16K */
/* Server Send */
bool ftServer::sendData(const std::string& peerId, const std::string& hash, uint64_t size, uint64_t baseoffset, uint32_t chunksize, void *data)
{
@ -1106,6 +1054,11 @@ bool ftServer::sendData(const std::string& peerId, const std::string& hash, uint
while(tosend > 0)
{
//static const uint32_t MAX_FT_CHUNK = 32 * 1024; /* 32K */
//static const uint32_t MAX_FT_CHUNK = 16 * 1024; /* 16K */
//
static const uint32_t MAX_FT_CHUNK = 8 * 1024; /* 16K */
/* workout size */
chunk = MAX_FT_CHUNK;
if (chunk > tosend)
@ -1135,7 +1088,7 @@ bool ftServer::sendData(const std::string& peerId, const std::string& hash, uint
}
else
{
RsFileData *rfd = new RsFileData();
RsFileTransferDataItem *rfd = new RsFileTransferDataItem();
/* set id */
rfd->PeerId(peerId);
@ -1153,7 +1106,7 @@ bool ftServer::sendData(const std::string& peerId, const std::string& hash, uint
/* file data */
rfd->fd.binData.setBinData( &(((uint8_t *) data)[offset]), chunk);
mP3iface->SendFileData(rfd);
sendItem(rfd);
/* print the data pointer */
#ifdef SERVER_DEBUG
@ -1176,6 +1129,8 @@ bool ftServer::sendData(const std::string& peerId, const std::string& hash, uint
return true;
}
// Dont delete the item. The client (p3turtle) is doing it after calling this.
//
void ftServer::receiveTurtleData(RsTurtleGenericTunnelItem *i,
const std::string& hash,
const std::string& virtual_peer_id,
@ -1186,6 +1141,9 @@ void ftServer::receiveTurtleData(RsTurtleGenericTunnelItem *i,
case RS_TURTLE_SUBTYPE_FILE_REQUEST:
{
RsTurtleFileRequestItem *item = dynamic_cast<RsTurtleFileRequestItem *>(i) ;
#ifdef SERVER_DEBUG
std::cerr << "ftServer::receiveTurtleData(): received file data request for " << hash << " from peer " << virtual_peer_id << std::endl;
#endif
getMultiplexer()->recvDataRequest(virtual_peer_id,hash,0,item->chunk_offset,item->chunk_size) ;
}
break ;
@ -1193,6 +1151,9 @@ void ftServer::receiveTurtleData(RsTurtleGenericTunnelItem *i,
case RS_TURTLE_SUBTYPE_FILE_DATA :
{
RsTurtleFileDataItem *item = dynamic_cast<RsTurtleFileDataItem *>(i) ;
#ifdef SERVER_DEBUG
std::cerr << "ftServer::receiveTurtleData(): received file data for " << hash << " from peer " << virtual_peer_id << std::endl;
#endif
getMultiplexer()->recvData(virtual_peer_id,hash,0,item->chunk_offset,item->chunk_size,item->chunk_data) ;
item->chunk_data = NULL ; // this prevents deletion in the destructor of RsFileDataItem, because data will be deleted
@ -1204,33 +1165,29 @@ void ftServer::receiveTurtleData(RsTurtleGenericTunnelItem *i,
case RS_TURTLE_SUBTYPE_FILE_MAP :
{
RsTurtleFileMapItem *item = dynamic_cast<RsTurtleFileMapItem *>(i) ;
#ifdef SERVER_DEBUG
std::cerr << "ftServer::receiveTurtleData(): received chunk map for hash " << hash << " from peer " << virtual_peer_id << std::endl;
#endif
getMultiplexer()->recvChunkMap(virtual_peer_id,hash,item->compressed_map,direction == RsTurtleGenericTunnelItem::DIRECTION_CLIENT) ;
}
break ;
case RS_TURTLE_SUBTYPE_FILE_MAP_REQUEST:
{
RsTurtleFileMapRequestItem *item = dynamic_cast<RsTurtleFileMapRequestItem *>(i) ;
//RsTurtleFileMapRequestItem *item = dynamic_cast<RsTurtleFileMapRequestItem *>(i) ;
#ifdef SERVER_DEBUG
std::cerr << "ftServer::receiveTurtleData(): received chunkmap request for hash " << hash << " from peer " << virtual_peer_id << std::endl;
#endif
getMultiplexer()->recvChunkMapRequest(virtual_peer_id,hash,direction == RsTurtleGenericTunnelItem::DIRECTION_CLIENT) ;
}
break ;
case RS_TURTLE_SUBTYPE_FILE_CRC :
{
RsTurtleFileCrcItem *item = dynamic_cast<RsTurtleFileCrcItem *>(i) ;
getMultiplexer()->recvCRC32Map(virtual_peer_id,hash,item->crc_map) ;
}
break ;
case RS_TURTLE_SUBTYPE_FILE_CRC_REQUEST:
{
getMultiplexer()->recvCRC32MapRequest(virtual_peer_id,hash) ;
}
break ;
case RS_TURTLE_SUBTYPE_CHUNK_CRC :
{
RsTurtleChunkCrcItem *item = dynamic_cast<RsTurtleChunkCrcItem *>(i) ;
#ifdef SERVER_DEBUG
std::cerr << "ftServer::receiveTurtleData(): received single chunk CRC for hash " << hash << " from peer " << virtual_peer_id << std::endl;
#endif
getMultiplexer()->recvSingleChunkCRC(virtual_peer_id,hash,item->chunk_number,item->check_sum) ;
}
break ;
@ -1238,6 +1195,9 @@ void ftServer::receiveTurtleData(RsTurtleGenericTunnelItem *i,
case RS_TURTLE_SUBTYPE_CHUNK_CRC_REQUEST:
{
RsTurtleChunkCrcRequestItem *item = dynamic_cast<RsTurtleChunkCrcRequestItem *>(i) ;
#ifdef SERVER_DEBUG
std::cerr << "ftServer::receiveTurtleData(): received single chunk CRC request for hash " << hash << " from peer " << virtual_peer_id << std::endl;
#endif
getMultiplexer()->recvSingleChunkCRCRequest(virtual_peer_id,hash,item->chunk_number) ;
}
break ;
@ -1246,358 +1206,189 @@ void ftServer::receiveTurtleData(RsTurtleGenericTunnelItem *i,
}
}
/* NB: The rsCore lock must be activated before calling this.
* This Lock should be moved lower into the system...
* most likely destination is in ftServer.
*/
int ftServer::tick()
{
rslog(RSL_DEBUG_BASIC, ftserverzone,
"filedexserver::tick()");
bool moreToTick = false ;
if (mP3iface == NULL)
if(handleIncoming())
moreToTick = true;
if(handleCacheData())
moreToTick = true;
static time_t last_law_priority_tasks_handling_time = 0 ;
time_t now = time(NULL) ;
if(last_law_priority_tasks_handling_time + FILE_TRANSFER_LOW_PRIORITY_TASKS_PERIOD < now)
{
#ifdef SERVER_DEBUG
std::cerr << "ftServer::tick() ERROR: mP3iface == NULL";
#endif
last_law_priority_tasks_handling_time = now ;
rslog(RSL_DEBUG_BASIC, ftserverzone,
"filedexserver::tick() Invalid Interface()");
return 1;
mFtDataplex->deleteUnusedServers() ;
mFtDataplex->handlePendingCrcRequests() ;
mFtDataplex->dispatchReceivedChunkCheckSum() ;
}
int moreToTick = 0;
if (0 < mP3iface -> tick())
{
moreToTick = 1;
#ifdef DEBUG_TICK
std::cerr << "filedexserver::tick() moreToTick from mP3iface" << std::endl;
#endif
}
if (0 < handleInputQueues())
{
moreToTick = 1;
#ifdef DEBUG_TICK
std::cerr << "filedexserver::tick() moreToTick from InputQueues" << std::endl;
#endif
}
return moreToTick;
}
// This function needs to be divided up.
bool ftServer::handleInputQueues()
{
bool moreToTick = false;
if (handleCacheData())
moreToTick = true;
if (handleFileData())
moreToTick = true;
return moreToTick;
}
bool ftServer::handleCacheData()
bool ftServer::handleCacheData()
{
// get all the incoming results.. and print to the screen.
RsCacheRequest *cr;
RsCacheItem *ci;
std::list<std::pair<RsPeerId, RsCacheData> > cacheUpdates;
std::list<std::pair<RsPeerId, RsCacheData> >::iterator it;
int i=0 ;
// Loop through Search Results.
int i = 0;
int i_init = 0;
#ifdef SERVER_DEBUG
//std::cerr << "ftServer::handleCacheData()" << std::endl;
#ifdef SERVER_DEBUG_CACHE
std::cerr << "handleCacheData() " << std::endl;
#endif
while((ci = mP3iface -> GetSearchResult()) != NULL)
mCacheStrapper->getCacheUpdates(cacheUpdates);
for(it = cacheUpdates.begin(); it != cacheUpdates.end(); it++)
{
/* construct reply */
RsFileTransferCacheItem *ci = new RsFileTransferCacheItem();
#ifdef SERVER_DEBUG
std::cerr << "ftServer::handleCacheData() Recvd SearchResult (CacheResponse!)" << std::endl;
std::string out;
if (i++ == i_init)
{
out += "Recieved Search Results:\n";
}
ci -> print_string(out);
rslog(RSL_DEBUG_BASIC, ftserverzone, out);
/* id from incoming */
ci -> PeerId(it->first);
ci -> file.hash = (it->second).hash;
ci -> file.name = (it->second).name;
ci -> file.path = ""; // (it->second).path;
ci -> file.filesize = (it->second).size;
ci -> cacheType = (it->second).cid.type;
ci -> cacheSubId = (it->second).cid.subid;
#ifdef SERVER_DEBUG_CACHE
std::string out2 = "Outgoing CacheStrapper Update -> RsCacheItem:\n";
ci -> print_string(out2);
std::cerr << out2 << std::endl;
#endif
/* these go to the CacheStrapper! */
RsCacheData data;
data.pid = ci->PeerId();
data.cid = CacheId(ci->cacheType, ci->cacheSubId);
data.path = ci->file.path;
data.name = ci->file.name;
data.hash = ci->file.hash;
data.size = ci->file.filesize;
data.recvd = time(NULL) ;
sendItem(ci);
mCacheStrapper->recvCacheResponse(data, time(NULL));
delete ci;
i++;
}
// now requested Searches.
i_init = i;
while((cr = mP3iface -> RequestedSearch()) != NULL)
{
#ifdef SERVER_DEBUG
/* just delete these */
std::string out = "Requested Search:\n";
cr -> print_string(out);
rslog(RSL_DEBUG_BASIC, ftserverzone, out);
#endif
delete cr;
}
// Now handle it replacement (pushed cache results)
{
std::list<std::pair<RsPeerId, RsCacheData> > cacheUpdates;
std::list<std::pair<RsPeerId, RsCacheData> >::iterator it;
mCacheStrapper->getCacheUpdates(cacheUpdates);
for(it = cacheUpdates.begin(); it != cacheUpdates.end(); it++)
{
/* construct reply */
RsCacheItem *ci = new RsCacheItem();
/* id from incoming */
ci -> PeerId(it->first);
ci -> file.hash = (it->second).hash;
ci -> file.name = (it->second).name;
ci -> file.path = ""; // (it->second).path;
ci -> file.filesize = (it->second).size;
ci -> cacheType = (it->second).cid.type;
ci -> cacheSubId = (it->second).cid.subid;
#ifdef SERVER_DEBUG
std::string out2 = "Outgoing CacheStrapper Update -> RsCacheItem:\n";
ci -> print_string(out2);
std::cerr << out2 << std::endl;
#endif
//rslog(RSL_DEBUG_BASIC, ftserverzone, out2.str());
mP3iface -> SendSearchResult(ci);
i++;
}
}
return (i > 0);
return i>0 ;
}
bool ftServer::handleFileData()
int ftServer::handleIncoming()
{
// now File Input.
RsFileRequest *fr;
RsFileData *fd;
RsFileChunkMapRequest *fcmr;
RsFileChunkMap *fcm;
RsFileCRC32MapRequest *fccrcmr;
RsFileCRC32Map *fccrcm;
RsFileSingleChunkCrcRequest *fscrcr;
RsFileSingleChunkCrc *fscrc;
int nhandled = 0 ;
int i_init = 0;
int i = 0;
i_init = i;
while((fr = mP3iface -> GetFileRequest()) != NULL )
{
RsItem *item = NULL ;
#ifdef SERVER_DEBUG
std::cerr << "ftServer::handleFileData() Recvd ftFiler Request" << std::endl;
std::string out;
if (i == i_init)
{
out += "Incoming(Net) File Item:\n";
}
fr -> print_string(out);
rslog(RSL_DEBUG_BASIC, ftserverzone, out);
std::cerr << "ftServer::handleIncoming() " << std::endl;
#endif
i++; /* count */
mFtDataplex->recvDataRequest(fr->PeerId(),
fr->file.hash, fr->file.filesize,
fr->fileoffset, fr->chunksize);
FileInfo(ffr);
delete fr;
}
// now File Data.
i_init = i;
while((fd = mP3iface -> GetFileData()) != NULL )
while(NULL != (item = recvItem()))
{
#ifdef SERVER_DEBUG
std::cerr << "ftServer::handleFileData() Recvd ftFiler Data" << std::endl;
std::cerr << "hash: " << fd->fd.file.hash;
std::cerr << " length: " << fd->fd.binData.bin_len;
std::cerr << " data: " << fd->fd.binData.bin_data;
std::cerr << std::endl;
nhandled++ ;
std::string out;
if (i == i_init)
switch(item->PacketSubType())
{
out += "Incoming(Net) File Data:\n";
}
fd -> print_string(out);
rslog(RSL_DEBUG_BASIC, ftserverzone, out);
#endif
i++; /* count */
/* incoming data */
mFtDataplex->recvData(fd->PeerId(),
fd->fd.file.hash, fd->fd.file.filesize,
fd->fd.file_offset,
fd->fd.binData.bin_len,
fd->fd.binData.bin_data);
/* we've stolen the data part -> so blank before delete
*/
fd->fd.binData.TlvShallowClear();
delete fd;
}
// now file chunkmap requests
i_init = i;
while((fcmr = mP3iface -> GetFileChunkMapRequest()) != NULL )
{
case RS_PKT_SUBTYPE_FT_DATA_REQUEST:
{
RsFileTransferDataRequestItem *f = dynamic_cast<RsFileTransferDataRequestItem*>(item) ;
#ifdef SERVER_DEBUG
std::cerr << "ftServer::handleFileData() Recvd ChunkMap request" << std::endl;
std::string out;
if (i == i_init)
{
out += "Incoming(Net) File Data:\n";
}
fcmr -> print_string(out);
rslog(RSL_DEBUG_BASIC, ftserverzone, out);
std::cerr << "ftServer::handleIncoming: received data request for hash " << f->file.hash << ", offset=" << f->fileoffset << ", chunk size=" << f->chunksize << std::endl;
#endif
i++; /* count */
mFtDataplex->recvDataRequest(f->PeerId(), f->file.hash, f->file.filesize, f->fileoffset, f->chunksize);
}
break ;
/* incoming data */
mFtDataplex->recvChunkMapRequest(fcmr->PeerId(), fcmr->hash,fcmr->is_client) ;
delete fcmr;
}
// now file chunkmaps
i_init = i;
while((fcm = mP3iface -> GetFileChunkMap()) != NULL )
{
case RS_PKT_SUBTYPE_FT_DATA:
{
RsFileTransferDataItem *f = dynamic_cast<RsFileTransferDataItem*>(item) ;
#ifdef SERVER_DEBUG
std::cerr << "ftServer::handleFileData() Recvd ChunkMap request" << std::endl;
std::string out;
if (i == i_init)
{
out += "Incoming(Net) File Data:\n";
}
fcm -> print_string(out);
rslog(RSL_DEBUG_BASIC, ftserverzone, out);
std::cerr << "ftServer::handleIncoming: received data for hash " << f->fd.file.hash << ", offset=" << f->fd.file_offset << ", chunk size=" << f->fd.binData.bin_len << std::endl;
#endif
i++; /* count */
mFtDataplex->recvData(f->PeerId(), f->fd.file.hash, f->fd.file.filesize, f->fd.file_offset, f->fd.binData.bin_len, f->fd.binData.bin_data);
/* incoming data */
mFtDataplex->recvChunkMap(fcm->PeerId(), fcm->hash,fcm->compressed_map,fcm->is_client) ;
/* we've stolen the data part -> so blank before delete
*/
f->fd.binData.TlvShallowClear();
}
break ;
delete fcm;
}
// now file chunkmap requests
i_init = i;
while((fccrcmr = mP3iface -> GetFileCRC32MapRequest()) != NULL )
{
case RS_PKT_SUBTYPE_FT_CHUNK_MAP_REQUEST:
{
RsFileTransferChunkMapRequestItem *f = dynamic_cast<RsFileTransferChunkMapRequestItem*>(item) ;
#ifdef SERVER_DEBUG
std::cerr << "ftServer::handleFileData() Recvd ChunkMap request" << std::endl;
std::string out;
if (i == i_init)
{
out += "Incoming(Net) File Data:\n";
}
fccrcmr -> print_string(out);
rslog(RSL_DEBUG_BASIC, ftserverzone, out);
std::cerr << "ftServer::handleIncoming: received chunkmap request for hash " << f->hash << ", client=" << f->is_client << std::endl;
#endif
i++; /* count */
mFtDataplex->recvChunkMapRequest(f->PeerId(), f->hash,f->is_client) ;
}
break ;
/* incoming data */
mFtDataplex->recvCRC32MapRequest(fccrcmr->PeerId(), fccrcmr->hash) ;
delete fccrcmr;
}
// now file chunkmaps
i_init = i;
while((fccrcm = mP3iface -> GetFileCRC32Map()) != NULL )
{
case RS_PKT_SUBTYPE_FT_CHUNK_MAP:
{
RsFileTransferChunkMapItem *f = dynamic_cast<RsFileTransferChunkMapItem*>(item) ;
#ifdef SERVER_DEBUG
std::cerr << "ftServer::handleFileData() Recvd ChunkMap request" << std::endl;
std::string out;
if (i == i_init)
{
out += "Incoming(Net) File Data:\n";
}
fccrcm -> print_string(out);
rslog(RSL_DEBUG_BASIC, ftserverzone, out);
std::cerr << "ftServer::handleIncoming: received chunkmap for hash " << f->hash << ", client=" << f->is_client << /*", map=" << f->compressed_map <<*/ std::endl;
#endif
i++; /* count */
mFtDataplex->recvChunkMap(f->PeerId(), f->hash,f->compressed_map,f->is_client) ;
}
break ;
/* incoming data */
mFtDataplex->recvCRC32Map(fccrcm->PeerId(), fccrcm->hash,fccrcm->crc_map);
delete fccrcm;
}
// now file chunk crc requests
i_init = i;
while((fscrcr = mP3iface -> GetFileSingleChunkCrcRequest()) != NULL )
{
case RS_PKT_SUBTYPE_FT_CHUNK_CRC_REQUEST:
{
RsFileTransferSingleChunkCrcRequestItem *f = dynamic_cast<RsFileTransferSingleChunkCrcRequestItem*>(item) ;
#ifdef SERVER_DEBUG
std::cerr << "ftServer::handleFileData() Recvd ChunkMap request" << std::endl;
std::string out;
if (i == i_init)
{
out += "Incoming(Net) File CRC Request:\n";
}
fscrcr -> print_string(out);
rslog(RSL_DEBUG_BASIC, ftserverzone, out);
std::cerr << "ftServer::handleIncoming: received single chunk crc req for hash " << f->hash << ", chunk number=" << f->chunk_number << std::endl;
#endif
i++; /* count */
mFtDataplex->recvSingleChunkCRCRequest(f->PeerId(), f->hash,f->chunk_number) ;
}
break ;
/* incoming data */
mFtDataplex->recvSingleChunkCRCRequest(fscrcr->PeerId(), fscrcr->hash,fscrcr->chunk_number) ;
delete fscrcr;
}
// now file chunkmaps
i_init = i;
while((fscrc = mP3iface -> GetFileSingleChunkCrc()) != NULL )
{
case RS_PKT_SUBTYPE_FT_CHUNK_CRC:
{
RsFileTransferSingleChunkCrcItem *f = dynamic_cast<RsFileTransferSingleChunkCrcItem *>(item) ;
#ifdef SERVER_DEBUG
std::cerr << "ftServer::handleFileData() Recvd ChunkMap request" << std::endl;
std::string out;
if (i == i_init)
{
out += "Incoming(Net) File Data:\n";
}
fscrc -> print_string(out);
rslog(RSL_DEBUG_BASIC, ftserverzone, out);
std::cerr << "ftServer::handleIncoming: received single chunk crc req for hash " << f->hash << ", chunk number=" << f->chunk_number << ", checksum = " << f->check_sum << std::endl;
#endif
i++; /* count */
mFtDataplex->recvSingleChunkCRC(f->PeerId(), f->hash,f->chunk_number,f->check_sum);
}
break ;
/* incoming data */
mFtDataplex->recvSingleChunkCRC(fscrc->PeerId(), fscrc->hash,fscrc->chunk_number,fscrc->check_sum);
case RS_PKT_SUBTYPE_FT_CACHE_ITEM:
{
RsFileTransferCacheItem *ci = dynamic_cast<RsFileTransferCacheItem*>(item) ;
#ifdef SERVER_DEBUG_CACHE
std::cerr << "ftServer::handleIncoming: received cache item hash=" << ci->file.hash << ". from peer " << ci->PeerId() << std::endl;
#endif
/* these go to the CacheStrapper! */
RsCacheData data;
data.pid = ci->PeerId();
data.cid = CacheId(ci->cacheType, ci->cacheSubId);
data.path = ci->file.path;
data.name = ci->file.name;
data.hash = ci->file.hash;
data.size = ci->file.filesize;
data.recvd = time(NULL) ;
delete fscrcr;
mCacheStrapper->recvCacheResponse(data, time(NULL));
}
break ;
// case RS_PKT_SUBTYPE_FT_CACHE_REQUEST:
// {
// // do nothing
// RsFileTransferCacheRequestItem *cr = dynamic_cast<RsFileTransferCacheRequestItem*>(item) ;
//#ifdef SERVER_DEBUG_CACHE
// std::cerr << "ftServer::handleIncoming: received cache request from peer " << cr->PeerId() << std::endl;
//#endif
// }
// break ;
}
delete item ;
}
if (i > 0)
{
return 1;
}
return 0;
return nhandled;
}
/**********************************
@ -1617,10 +1408,3 @@ bool ftServer::addConfiguration(p3ConfigMgr *cfgmgr)
return true;
}
bool ftServer::ResumeTransfers()
{
mFtController->activate();
return true;
}

View file

@ -45,6 +45,7 @@
#include "ft/ftdata.h"
#include "turtle/turtleclientservice.h"
#include "services/p3service.h"
#include "retroshare/rsfiles.h"
//#include "dbase/cachestrapper.h"
@ -73,7 +74,7 @@ class ftDwlQueue;
class p3PeerMgr;
class p3LinkMgr;
class ftServer: public RsFiles, public ftDataSend, public RsTurtleClientService, public RsThread
class ftServer: public p3Service, public RsFiles, public ftDataSend, public RsTurtleClientService
{
public:
@ -87,8 +88,6 @@ class ftServer: public RsFiles, public ftDataSend, public RsTurtleClientService,
/* Assign important variables */
void setConfigDirectory(std::string path);
void setP3Interface(P3Interface *pqi);
/* add Config Items (Extra, Controller) */
void addConfigComponents(p3ConfigMgr *mgr);
@ -101,12 +100,6 @@ class ftServer: public RsFiles, public ftDataSend, public RsTurtleClientService,
void SetupFtServer() ;
virtual void connectToTurtleRouter(p3turtle *p) ;
void StartupThreads();
void StopThreads();
/* own thread */
virtual void run();
// Checks that the given hash is well formed. Used to chase
// string bugs.
static bool checkHash(const std::string& hash,std::string& error_string) ;
@ -125,6 +118,9 @@ class ftServer: public RsFiles, public ftDataSend, public RsTurtleClientService,
/************** (Implements RsFiles) ***************************/
/***************************************************************/
void StartupThreads();
void StopThreads();
// member access
ftDataMultiplex *getMultiplexer() const { return mFtDataplex ; }
@ -238,8 +234,6 @@ class ftServer: public RsFiles, public ftDataSend, public RsTurtleClientService,
virtual bool sendDataRequest(const std::string& peerId, const std::string& hash, uint64_t size, uint64_t offset, uint32_t chunksize);
virtual bool sendChunkMapRequest(const std::string& peer_id,const std::string& hash,bool is_client) ;
virtual bool sendChunkMap(const std::string& peer_id,const std::string& hash,const CompressedChunkMap& cmap,bool is_client) ;
virtual bool sendCRC32MapRequest(const std::string&, const std::string&) ;
virtual bool sendCRC32Map(const std::string&, const std::string&, const CRC32Map&) ;
virtual bool sendSingleChunkCRCRequest(const std::string& peer_id,const std::string& hash,uint32_t chunk_number) ;
virtual bool sendSingleChunkCRC(const std::string& peer_id,const std::string& hash,uint32_t chunk_number,const Sha1CheckSum& crc) ;
@ -250,17 +244,12 @@ class ftServer: public RsFiles, public ftDataSend, public RsTurtleClientService,
bool addConfiguration(p3ConfigMgr *cfgmgr);
bool ResumeTransfers();
private:
bool handleInputQueues();
bool handleCacheData();
bool handleFileData();
/******************* p3 Config Overload ************************/
protected:
/* Key Functions to be overloaded for Full Configuration */
virtual RsSerialiser *setupSerialiser();
virtual bool saveList(bool &cleanup, std::list<RsItem *>&);
virtual bool loadList(std::list<RsItem *>& load);
virtual bool loadList(std::list<RsItem *>& load);
private:
bool loadConfigMap(std::map<std::string, std::string> &configMap);
@ -268,6 +257,10 @@ class ftServer: public RsFiles, public ftDataSend, public RsTurtleClientService,
/*************************** p3 Config Overload ********************/
protected:
int handleIncoming() ;
bool handleCacheData() ;
private:
/**** INTERNAL FUNCTIONS ***/
@ -279,9 +272,6 @@ class ftServer: public RsFiles, public ftDataSend, public RsTurtleClientService,
/* no need for Mutex protection -
* as each component is protected independently.
*/
P3Interface *mP3iface; /* XXX THIS NEEDS PROTECTION */
p3PeerMgr *mPeerMgr;
p3LinkMgr *mLinkMgr;
@ -295,7 +285,6 @@ class ftServer: public RsFiles, public ftDataSend, public RsTurtleClientService,
ftDataMultiplex *mFtDataplex;
p3turtle *mTurtleRouter ;
ftFileSearch *mFtSearch;
ftDwlQueue *mFtDwlQueue;
@ -304,7 +293,6 @@ class ftServer: public RsFiles, public ftDataSend, public RsTurtleClientService,
std::string mConfigPath;
std::string mDownloadPath;
std::string mPartialsPath;
};

View file

@ -530,9 +530,6 @@ int ftTransferModule::tick()
case FT_TM_FLAG_CHECKING: // Check if file hash matches the hashed data
checkFile() ;
break ;
case FT_TM_FLAG_CHUNK_CRC: // File is waiting for CRC32 map. Check if received, and re-set matched chunks
checkCRC() ;
break ;
default:
break;
}
@ -648,171 +645,6 @@ void ftTransferModule::forceCheck()
mFileStatus.stat = ftFileStatus::PQIFILE_DOWNLOADING;
}
bool ftTransferModule::checkCRC()
{
// We have a finite state machine here.
//
// The states are, for each chunk, and what should be done:
// DONT_HAVE
// -> ask for the chunk CRC
// ASKED
// -> do nothing
// RECEIVED
// -> check the chunk
// CHECKED
// -> do nothing
//
// CRCs are asked by group of CRC_REQUEST_MAX_SIZE chunks at a time. The response may contain a different
// number of CRCs, depending on the availability.
//
// Server side:
// - Only complete files can compute CRCs, as the CRCs of downloaded chunks in an unchecked file are un-verified.
// - CRCs of files are cached, so that they don't get computed twice.
//
#ifdef FT_DEBUG
std::cerr << "ftTransferModule::checkCRC(): looking for CRC32 map." << std::endl ;
#endif
// Loop over chunks. Collect the ones to ask for, and hash the ones received.
// If we have
//
time_t now = time(NULL) ;
RsStackMutex stack(tfMtx); /******* STACK LOCKED ******/
switch(_crcmap_state)
{
case FT_TM_CRC_MAP_STATE_NOCHECK:
_crcreq_source = "" ;
#ifdef FT_DEBUG
std::cerr << "ftTransferModule::checkCRC(): state is NOCHECK. Doing nothing." << std::endl ;
#endif
break ;
case FT_TM_CRC_MAP_STATE_DONT_HAVE:
{
// Check wether we have a CRC map or not.
//
std::cerr << "FT_TM_CRC_MAP_STATE_ASKED: last time is " << _crcmap_last_asked_time << std::endl ;
std::cerr << "FT_TM_CRC_MAP_STATE_ASKED: now is " << now << std::endl ;
uint64_t threshold = (uint64_t)(FT_TM_CRC_MAP_MAX_WAIT_PER_GIG * (1+mSize/float(1024ull*1024ull*1024ull))) ;
std::cerr << "Threshold is " << threshold << std::endl;
std::cerr << "Limit is " << (uint64_t)_crcmap_last_asked_time + threshold << std::endl ;
std::cerr << "Requested source is \"" << _crcreq_source << "\"" << std::endl;
if( _crcreq_source != "" && (uint64_t)_crcmap_last_tunnel_keepup + 10 <= (uint64_t)now)
{
#ifdef FT_DEBUG
std::cerr << "ftTransferModule::checkCRC(): sending keepup to source " << _crcreq_source << std::endl ;
#endif
locked_requestData(_crcreq_source,0,(uint32_t)std::min((uint64_t)512,mSize));
_crcmap_last_tunnel_keepup = now ;
}
if( (uint64_t)_crcmap_last_asked_time + threshold > (uint64_t)now)
{
#ifdef FT_DEBUG
std::cerr << "ftTransferModule::checkCRC(): state is NOCHECK. Doing nothing." << std::endl ;
#endif
break ;
}
// Ask the ones we should ask for. We use a very coarse strategy now: we
// send the request to a random source. We'll make this more sensible
// later.
#ifdef FT_DEBUG
std::cerr << "ftTransferModule::checkCRC(): state is DONT_HAVE or last request is too old. Selecting a source for asking a CRC map." << std::endl ;
#endif
if(mFileSources.empty())
{
std::cerr << "ftTransferModule::checkCRC(): No sources available for checking file " << mHash << ": waiting 3 additional sec." <<std::endl ;
_crcmap_last_asked_time = now - threshold + 3 ;
break ;
}
bool found = false ;
for(std::map<std::string,peerInfo>::const_iterator mit = mFileSources.begin();mit != mFileSources.end();++mit)
if(mFileCreator->sourceIsComplete(mit->first))
{
//#ifdef FT_DEBUG
std::cerr << "ftTransferModule::checkCRC(): sending CRC map request to source " << mit->first << std::endl ;
//#endif
_crcmap_last_asked_time = now ;
_crcreq_source = mit->first ;
mMultiplexor->sendCRC32MapRequest(mit->first,mHash);
found = true ;
break ;
}
}
break ;
case FT_TM_CRC_MAP_STATE_HAVE:
{
#ifdef FT_DEBUG
std::cerr << "ftTransferModule::checkCRC(): got a CRC32 map. Matching chunks..." << std::endl ;
#endif
// The CRCmap is complete. Let's cross check it with existing chunks in ftFileCreator !
//
uint32_t bad_chunks ;
uint32_t incomplete_chunks ;
if(!mFileCreator->crossCheckChunkMap(_crcmap,bad_chunks,incomplete_chunks))
{
std::cerr << "ftTransferModule::checkCRC(): could not check CRC chunks!" << std::endl ;
return false;
}
// go back to download stage, if not all chunks are correct.
//
if(bad_chunks > 0)
{
mFlag = FT_TM_FLAG_DOWNLOADING ;
mFileStatus.stat = ftFileStatus::PQIFILE_DOWNLOADING;
std::cerr << "ftTransferModule::checkCRC(): Done. File has errors: " << bad_chunks << " bad chunks found. Restarting download for these chunks only." << std::endl ;
}
else if(incomplete_chunks > 0)
{
mFlag = FT_TM_FLAG_DOWNLOADING ;
mFileStatus.stat = ftFileStatus::PQIFILE_DOWNLOADING;
#ifdef FT_DEBUG
std::cerr << "ftTransferModule::checkCRC(): Done. all chunks ok. Continuing download for remaining chunks." << std::endl ;
#endif
}
else
{
// We do as if the file is not complete. This way, it finishes properly.
//
//mFlag = FT_TM_FLAG_COMPLETE ; // Transfer is complete.
mFlag = FT_TM_FLAG_DOWNLOADING ;
mFileStatus.stat = ftFileStatus::PQIFILE_DOWNLOADING;
#ifdef FT_DEBUG
std::cerr << "ftTransferModule::checkCRC(): Done. CRC check is ok, file is complete." << std::endl ;
#endif
}
_crcmap_state = FT_TM_CRC_MAP_STATE_NOCHECK ;
_crcreq_source = "" ;
}
}
return true ;
}
void ftTransferModule::addCRC32Map(const CRC32Map& crc_map)
{
RsStackMutex stack(tfMtx); /******* STACK LOCKED ******/
// Note: for now, we only send complete CRC32 maps, so the crc_map is always complete. When we
// send partial CRC maps, we will have to check them for completness.
//
_crcmap_state = FT_TM_CRC_MAP_STATE_HAVE ;
_crcmap = crc_map ;
}
void ftTransferModule::adjustSpeed()
{
RsStackMutex stack(tfMtx); /******* STACK LOCKED ******/