2008-07-23 18:01:59 -04:00
/*
* libretroshare / src / ft : ftdatamultiplex . h
*
* File Transfer for RetroShare .
*
* Copyright 2008 by Robert Fernie .
*
* This library is free software ; you can redistribute it and / or
* modify it under the terms of the GNU Library General Public
* License Version 2 as published by the Free Software Foundation .
*
* This library is distributed in the hope that it will be useful ,
* but WITHOUT ANY WARRANTY ; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE . See the GNU
* Library General Public License for more details .
*
* You should have received a copy of the GNU Library General Public
* License along with this library ; if not , write to the Free Software
* Foundation , Inc . , 59 Temple Place , Suite 330 , Boston , MA 02111 - 1307
* USA .
*
* Please report all bugs and problems to " retroshare@lunamutt.com " .
*
*/
/*
* ftDataMultiplexModule .
*
* This multiplexes the data from PQInterface to the ftTransferModules .
*/
# include "ft/ftdatamultiplex.h"
# include "ft/fttransfermodule.h"
# include "ft/ftfilecreator.h"
# include "ft/ftfileprovider.h"
# include "ft/ftsearch.h"
2011-11-03 19:18:00 -04:00
# include "util/rsdir.h"
2011-10-21 18:25:18 -04:00
# include <retroshare/rsturtle.h>
2008-07-23 18:01:59 -04:00
2008-08-03 08:45:53 -04:00
/* For Thread Behaviour */
const uint32_t DMULTIPLEX_MIN = 10 ; /* 1ms sleep */
const uint32_t DMULTIPLEX_MAX = 1000 ; /* 1 sec sleep */
const double DMULTIPLEX_RELAX = 0.5 ; /* ??? */
2008-08-23 17:28:08 -04:00
2012-03-15 15:55:43 -04:00
static const uint32_t MAX_CHECKING_CHUNK_WAIT_DELAY = 120 ; //! TTL for an inactive chunk
const uint32_t MAX_SIMULTANEOUS_CRC_REQUESTS = 20 ;
2008-10-29 16:58:23 -04:00
/******
* # define MPLEX_DEBUG 1
* * * * */
2008-08-03 08:45:53 -04:00
2008-07-23 18:01:59 -04:00
ftClient : : ftClient ( ftTransferModule * module , ftFileCreator * creator )
: mModule ( module ) , mCreator ( creator )
{
return ;
}
2010-01-11 11:00:42 -05:00
const uint32_t FT_DATA = 0x0001 ; // data cuhnk to be stored
const uint32_t FT_DATA_REQ = 0x0002 ; // data request to be treated
const uint32_t FT_CLIENT_CHUNK_MAP_REQ = 0x0003 ; // chunk map request to be treated by client
2010-07-21 19:14:10 -04:00
const uint32_t FT_SERVER_CHUNK_MAP_REQ = 0x0004 ; // chunk map request to be treated by server
const uint32_t FT_CRC32MAP_REQ = 0x0005 ; // crc32 map request to be treated by server
2012-03-15 15:55:43 -04:00
const uint32_t FT_CLIENT_CHUNK_CRC_REQ = 0x0006 ; // chunk sha1 crc request to be treated
2008-07-23 18:01:59 -04:00
2008-08-03 08:45:53 -04:00
ftRequest : : ftRequest ( uint32_t type , std : : string peerId , std : : string hash , uint64_t size , uint64_t offset , uint32_t chunk , void * data )
: mType ( type ) , mPeerId ( peerId ) , mHash ( hash ) , mSize ( size ) ,
2008-07-23 18:01:59 -04:00
mOffset ( offset ) , mChunk ( chunk ) , mData ( data )
{
return ;
}
2008-08-29 21:07:24 -04:00
ftDataMultiplex : : ftDataMultiplex ( std : : string ownId , ftDataSend * server , ftSearch * search )
2011-07-04 18:59:39 -04:00
: RsQueueThread ( DMULTIPLEX_MIN , DMULTIPLEX_MAX , DMULTIPLEX_RELAX ) , dataMtx ( " ftDataMultiplex " ) ,
2008-08-29 21:07:24 -04:00
mDataSend ( server ) , mSearch ( search ) , mOwnId ( ownId )
2008-07-23 18:01:59 -04:00
{
return ;
}
bool ftDataMultiplex : : addTransferModule ( ftTransferModule * mod , ftFileCreator * f )
{
RsStackMutex stack ( dataMtx ) ; /******* LOCK MUTEX ******/
std : : map < std : : string , ftClient > : : iterator it ;
if ( mClients . end ( ) ! = ( it = mClients . find ( mod - > hash ( ) ) ) )
{
/* error */
return false ;
}
mClients [ mod - > hash ( ) ] = ftClient ( mod , f ) ;
return true ;
}
2008-08-09 13:03:24 -04:00
bool ftDataMultiplex : : removeTransferModule ( std : : string hash )
2008-07-23 18:01:59 -04:00
{
RsStackMutex stack ( dataMtx ) ; /******* LOCK MUTEX ******/
2010-11-29 15:46:21 -05:00
2008-07-23 18:01:59 -04:00
std : : map < std : : string , ftClient > : : iterator it ;
2008-08-09 13:03:24 -04:00
if ( mClients . end ( ) = = ( it = mClients . find ( hash ) ) )
2008-07-23 18:01:59 -04:00
{
/* error */
return false ;
}
mClients . erase ( it ) ;
2010-11-29 15:46:21 -05:00
// This is very important to delete the hash from servers as well, because
// after removing the transfer module, ftController will delete the fileCreator.
// If the file creator is also a server in use, then it will cause a crash
// at the next server request.
//
// With the current action, the next server request will re-create the server as
// a ftFileProvider.
//
std : : map < std : : string , ftFileProvider * > : : iterator sit = mServers . find ( hash ) ;
if ( sit ! = mServers . end ( ) )
mServers . erase ( sit ) ;
2008-07-23 18:01:59 -04:00
return true ;
}
2008-08-09 13:03:24 -04:00
bool ftDataMultiplex : : FileUploads ( std : : list < std : : string > & hashs )
{
RsStackMutex stack ( dataMtx ) ; /******* LOCK MUTEX ******/
std : : map < std : : string , ftFileProvider * > : : iterator sit ;
for ( sit = mServers . begin ( ) ; sit ! = mServers . end ( ) ; sit + + )
{
hashs . push_back ( sit - > first ) ;
}
return true ;
}
bool ftDataMultiplex : : FileDownloads ( std : : list < std : : string > & hashs )
{
RsStackMutex stack ( dataMtx ) ; /******* LOCK MUTEX ******/
std : : map < std : : string , ftClient > : : iterator cit ;
for ( cit = mClients . begin ( ) ; cit ! = mClients . end ( ) ; cit + + )
{
hashs . push_back ( cit - > first ) ;
}
return true ;
}
2010-10-27 16:01:31 -04:00
bool ftDataMultiplex : : FileDetails ( const std : : string & hash , uint32_t hintsflag , FileInfo & info )
2008-08-09 13:03:24 -04:00
{
2008-11-04 18:12:53 -05:00
# ifdef MPLEX_DEBUG
std : : cerr < < " ftDataMultiplex::FileDetails( " ;
std : : cerr < < hash < < " , " < < hintsflag < < " ) " ;
std : : cerr < < std : : endl ;
# endif
2008-08-09 13:03:24 -04:00
RsStackMutex stack ( dataMtx ) ; /******* LOCK MUTEX ******/
2010-01-02 16:30:19 -05:00
if ( hintsflag & RS_FILE_HINTS_DOWNLOAD )
2008-08-09 13:03:24 -04:00
{
2010-01-02 16:30:19 -05:00
std : : map < std : : string , ftClient > : : iterator cit ;
if ( mClients . end ( ) ! = ( cit = mClients . find ( hash ) ) )
{
2008-11-04 18:12:53 -05:00
# ifdef MPLEX_DEBUG
2010-01-02 16:30:19 -05:00
std : : cerr < < " ftDataMultiplex::FileDetails() " ;
std : : cerr < < " Found ftFileCreator! " ;
std : : cerr < < std : : endl ;
2008-11-04 18:12:53 -05:00
# endif
2010-01-02 16:30:19 -05:00
//(cit->second).mModule->FileDetails(info);
( cit - > second ) . mCreator - > FileDetails ( info ) ;
return true ;
}
2008-08-09 13:03:24 -04:00
}
2010-01-02 16:30:19 -05:00
if ( hintsflag & RS_FILE_HINTS_UPLOAD )
2008-08-09 13:03:24 -04:00
{
2010-01-02 16:30:19 -05:00
std : : map < std : : string , ftFileProvider * > : : iterator sit ;
sit = mServers . find ( hash ) ;
if ( sit ! = mServers . end ( ) )
{
2008-11-04 18:12:53 -05:00
# ifdef MPLEX_DEBUG
2010-01-02 16:30:19 -05:00
std : : cerr < < " ftDataMultiplex::FileDetails() " ;
std : : cerr < < " Found ftFileProvider! " ;
std : : cerr < < std : : endl ;
2008-11-04 18:12:53 -05:00
# endif
2010-01-02 16:30:19 -05:00
( sit - > second ) - > FileDetails ( info ) ;
return true ;
}
2008-08-09 13:03:24 -04:00
}
2008-11-04 18:12:53 -05:00
2010-01-02 16:30:19 -05:00
2008-11-04 18:12:53 -05:00
# ifdef MPLEX_DEBUG
std : : cerr < < " ftDataMultiplex::FileDetails() " ;
std : : cerr < < " Found nothing " ;
std : : cerr < < std : : endl ;
# endif
2008-08-09 13:03:24 -04:00
return false ;
}
2008-07-23 18:01:59 -04:00
/* data interface */
/*************** SEND INTERFACE (calls ftDataSend) *******************/
/* Client Send */
2010-01-11 11:00:42 -05:00
bool ftDataMultiplex : : sendDataRequest ( const std : : string & peerId , const std : : string & hash , uint64_t size , uint64_t offset , uint32_t chunksize )
2008-07-23 18:01:59 -04:00
{
2008-08-23 17:28:08 -04:00
# ifdef MPLEX_DEBUG
std : : cerr < < " ftDataMultiplex::sendDataRequest() Client Send " ;
std : : cerr < < std : : endl ;
# endif
2008-08-03 08:45:53 -04:00
return mDataSend - > sendDataRequest ( peerId , hash , size , offset , chunksize ) ;
2008-07-23 18:01:59 -04:00
}
/* Server Send */
2010-01-11 11:00:42 -05:00
bool ftDataMultiplex : : sendData ( const std : : string & peerId , const std : : string & hash , uint64_t size , uint64_t offset , uint32_t chunksize , void * data )
2008-07-23 18:01:59 -04:00
{
2008-08-23 17:28:08 -04:00
# ifdef MPLEX_DEBUG
std : : cerr < < " ftDataMultiplex::sendData() Server Send " ;
std : : cerr < < std : : endl ;
# endif
2008-08-03 08:45:53 -04:00
return mDataSend - > sendData ( peerId , hash , size , offset , chunksize , data ) ;
2008-07-23 18:01:59 -04:00
}
/*************** RECV INTERFACE (provides ftDataRecv) ****************/
/* Client Recv */
2010-01-11 11:00:42 -05:00
bool ftDataMultiplex : : recvData ( const std : : string & peerId , const std : : string & hash , uint64_t size , uint64_t offset , uint32_t chunksize , void * data )
2008-07-23 18:01:59 -04:00
{
2008-08-23 17:28:08 -04:00
# ifdef MPLEX_DEBUG
std : : cerr < < " ftDataMultiplex::recvData() Client Recv " ;
std : : cerr < < std : : endl ;
# endif
2008-07-23 18:01:59 -04:00
/* Store in Queue */
RsStackMutex stack ( dataMtx ) ; /******* LOCK MUTEX ******/
2010-01-11 11:00:42 -05:00
mRequestQueue . push_back ( ftRequest ( FT_DATA , peerId , hash , size , offset , chunksize , data ) ) ;
2008-07-23 18:01:59 -04:00
return true ;
}
/* Server Recv */
2010-01-11 11:00:42 -05:00
bool ftDataMultiplex : : recvDataRequest ( const std : : string & peerId , const std : : string & hash , uint64_t size , uint64_t offset , uint32_t chunksize )
2008-07-23 18:01:59 -04:00
{
2008-08-23 17:28:08 -04:00
# ifdef MPLEX_DEBUG
std : : cerr < < " ftDataMultiplex::recvDataRequest() Server Recv " ;
std : : cerr < < std : : endl ;
# endif
2008-07-23 18:01:59 -04:00
/* Store in Queue */
RsStackMutex stack ( dataMtx ) ; /******* LOCK MUTEX ******/
mRequestQueue . push_back (
2008-08-03 08:45:53 -04:00
ftRequest ( FT_DATA_REQ , peerId , hash , size , offset , chunksize , NULL ) ) ;
2008-07-23 18:01:59 -04:00
return true ;
}
2010-01-11 11:00:42 -05:00
bool ftDataMultiplex : : recvChunkMapRequest ( const std : : string & peerId , const std : : string & hash , bool is_client )
{
# ifdef MPLEX_DEBUG
std : : cerr < < " ftDataMultiplex::recvChunkMapRequest() Server Recv " ;
std : : cerr < < std : : endl ;
# endif
/* Store in Queue */
RsStackMutex stack ( dataMtx ) ; /******* LOCK MUTEX ******/
if ( is_client )
mRequestQueue . push_back ( ftRequest ( FT_CLIENT_CHUNK_MAP_REQ , peerId , hash , 0 , 0 , 0 , NULL ) ) ;
else
mRequestQueue . push_back ( ftRequest ( FT_SERVER_CHUNK_MAP_REQ , peerId , hash , 0 , 0 , 0 , NULL ) ) ;
return true ;
}
2010-07-29 17:07:07 -04:00
bool ftDataMultiplex : : recvCRC32MapRequest ( const std : : string & peerId , const std : : string & hash )
2010-07-21 19:14:10 -04:00
{
# ifdef MPLEX_DEBUG
std : : cerr < < " ftDataMultiplex::recvChunkMapRequest() Server Recv " ;
std : : cerr < < std : : endl ;
# endif
/* Store in Queue */
RsStackMutex stack ( dataMtx ) ; /******* LOCK MUTEX ******/
mRequestQueue . push_back ( ftRequest ( FT_CRC32MAP_REQ , peerId , hash , 0 , 0 , 0 , NULL ) ) ;
2010-01-11 11:00:42 -05:00
2010-07-21 19:14:10 -04:00
return true ;
}
2012-03-15 15:55:43 -04:00
bool ftDataMultiplex : : recvSingleChunkCrcRequest ( const std : : string & peerId , const std : : string & hash , uint32_t chunk_number )
{
# ifdef MPLEX_DEBUG
std : : cerr < < " ftDataMultiplex::recvChunkMapRequest() Server Recv " ;
std : : cerr < < std : : endl ;
# endif
/* Store in Queue */
RsStackMutex stack ( dataMtx ) ; /******* LOCK MUTEX ******/
2008-07-23 18:01:59 -04:00
2012-03-15 15:55:43 -04:00
mRequestQueue . push_back ( ftRequest ( FT_CLIENT_CHUNK_CRC_REQ , peerId , hash , 0 , 0 , chunk_number , NULL ) ) ;
return true ;
}
2011-11-03 19:18:00 -04:00
class CRC32Thread : public RsThread
{
public :
CRC32Thread ( ftDataMultiplex * dataplex , const std : : string & peerId , const std : : string & hash )
: _plex ( dataplex ) , _finished ( false ) , _peerId ( peerId ) , _hash ( hash ) { }
virtual void run ( )
{
# ifdef MPLEX_DEBUG
std : : cerr < < " CRC32Thread is running for file " < < _hash < < std : : endl ;
# endif
_plex - > computeAndSendCRC32Map ( _peerId , _hash ) ;
_finished = true ;
}
bool finished ( ) { return _finished ; }
private :
ftDataMultiplex * _plex ;
bool _finished ;
std : : string _peerId ;
std : : string _hash ;
} ;
2008-07-23 18:01:59 -04:00
/*********** BACKGROUND THREAD OPERATIONS ***********/
2008-08-03 08:45:53 -04:00
bool ftDataMultiplex : : workQueued ( )
{
RsStackMutex stack ( dataMtx ) ; /******* LOCK MUTEX ******/
if ( mRequestQueue . size ( ) > 0 )
{
return true ;
}
if ( mSearchQueue . size ( ) > 0 )
{
return true ;
}
return false ;
}
bool ftDataMultiplex : : doWork ( )
{
bool doRequests = true ;
2011-11-03 19:18:00 -04:00
time_t now = time ( NULL ) ;
2008-08-03 08:45:53 -04:00
/* Handle All the current Requests */
while ( doRequests )
{
ftRequest req ;
{
RsStackMutex stack ( dataMtx ) ; /******* LOCK MUTEX ******/
if ( mRequestQueue . size ( ) = = 0 )
{
doRequests = false ;
continue ;
}
req = mRequestQueue . front ( ) ;
mRequestQueue . pop_front ( ) ;
}
/* MUTEX FREE */
switch ( req . mType )
{
2010-01-11 11:00:42 -05:00
case FT_DATA :
2008-08-29 21:07:24 -04:00
# ifdef MPLEX_DEBUG
2010-01-11 11:00:42 -05:00
std : : cerr < < " ftDataMultiplex::doWork() Handling FT_DATA " ;
std : : cerr < < std : : endl ;
2008-08-29 21:07:24 -04:00
# endif
2010-01-11 11:00:42 -05:00
handleRecvData ( req . mPeerId , req . mHash , req . mSize , req . mOffset , req . mChunk , req . mData ) ;
break ;
2008-08-03 08:45:53 -04:00
2010-01-11 11:00:42 -05:00
case FT_DATA_REQ :
2008-08-29 21:07:24 -04:00
# ifdef MPLEX_DEBUG
2010-01-11 11:00:42 -05:00
std : : cerr < < " ftDataMultiplex::doWork() Handling FT_DATA_REQ " ;
std : : cerr < < std : : endl ;
2008-08-29 21:07:24 -04:00
# endif
2010-01-11 11:00:42 -05:00
handleRecvDataRequest ( req . mPeerId , req . mHash , req . mSize , req . mOffset , req . mChunk ) ;
break ;
2008-08-03 08:45:53 -04:00
2010-01-11 11:00:42 -05:00
case FT_CLIENT_CHUNK_MAP_REQ :
2008-08-29 21:07:24 -04:00
# ifdef MPLEX_DEBUG
2010-01-11 11:00:42 -05:00
std : : cerr < < " ftDataMultiplex::doWork() Handling FT_CLIENT_CHUNK_MAP_REQ " ;
std : : cerr < < std : : endl ;
2008-08-29 21:07:24 -04:00
# endif
2010-01-11 11:00:42 -05:00
handleRecvClientChunkMapRequest ( req . mPeerId , req . mHash ) ;
break ;
case FT_SERVER_CHUNK_MAP_REQ :
# ifdef MPLEX_DEBUG
std : : cerr < < " ftDataMultiplex::doWork() Handling FT_CLIENT_CHUNK_MAP_REQ " ;
std : : cerr < < std : : endl ;
# endif
handleRecvServerChunkMapRequest ( req . mPeerId , req . mHash ) ;
break ;
2010-07-21 19:14:10 -04:00
case FT_CRC32MAP_REQ :
# ifdef MPLEX_DEBUG
std : : cerr < < " ftDataMultiplex::doWork() Handling FT_CLIENT_CRC32_MAP_REQ " ;
std : : cerr < < std : : endl ;
# endif
2010-07-29 17:07:07 -04:00
handleRecvCRC32MapRequest ( req . mPeerId , req . mHash ) ;
2010-07-21 19:14:10 -04:00
break ;
2012-03-15 15:55:43 -04:00
case FT_CLIENT_CHUNK_CRC_REQ :
# ifdef MPLEX_DEBUG
std : : cerr < < " ftDataMultiplex::doWork() Handling FT_CLIENT_CHUNK_CRC_REQ " ;
std : : cerr < < std : : endl ;
# endif
handleRecvChunkCrcRequest ( req . mPeerId , req . mHash , req . mChunk ) ;
break ;
2010-01-11 11:00:42 -05:00
default :
# ifdef MPLEX_DEBUG
std : : cerr < < " ftDataMultiplex::doWork() Ignoring UNKNOWN " ;
std : : cerr < < std : : endl ;
# endif
break ;
2008-08-03 08:45:53 -04:00
}
}
2011-11-03 19:18:00 -04:00
// Look for potentially finished CRC32Map threads, and destroys them.
{
RsStackMutex stack ( dataMtx ) ; /******* LOCK MUTEX ******/
for ( std : : list < CRC32Thread * > : : iterator lit ( _crc32map_threads . begin ( ) ) ; lit ! = _crc32map_threads . end ( ) ; )
if ( ( * lit ) - > finished ( ) )
{
std : : cerr < < " ftDataMultiplex::doWork: thread " < < * lit < < " ended. Deleting it. " < < std : : endl ;
( * lit ) - > join ( ) ;
delete ( * lit ) ;
std : : list < CRC32Thread * > : : iterator tmp ( lit ) ;
+ + lit ;
_crc32map_threads . erase ( tmp ) ;
}
else
{
std : : cerr < < " ftDataMultiplex::doWork: thread " < < * lit < < " still working. Not quitting it. " < < std : : endl ;
+ + lit ;
}
// Take the opportunity to cleanup the list, so that it cannot grow indefinitely
# ifdef MPLEX_DEBUG
std : : cerr < < " ftDataMultiplex::doWork: Cleaning up list of cached maps. " < < std : : endl ;
# endif
// Keep CRC32 maps in cache for 30 mins max.
//
for ( std : : map < std : : string , std : : pair < time_t , CRC32Map > > : : iterator it = _cached_crc32maps . begin ( ) ; it ! = _cached_crc32maps . end ( ) ; )
if ( it - > second . first + 30 * 60 < now )
{
std : : cerr < < " Removing cached map for file " < < it - > first < < " that was kept for too long now. " < < std : : endl ;
std : : map < std : : string , std : : pair < time_t , CRC32Map > > : : iterator tmp ( it ) ;
+ + it ;
_cached_crc32maps . erase ( tmp ) ;
}
else
+ + it ;
}
2008-08-03 08:45:53 -04:00
/* Only Handle One Search Per Period....
2011-11-03 19:18:00 -04:00
* Lower Priority
2008-08-03 08:45:53 -04:00
*/
ftRequest req ;
{
RsStackMutex stack ( dataMtx ) ; /******* LOCK MUTEX ******/
if ( mSearchQueue . size ( ) = = 0 )
{
/* Finished */
return true ;
}
req = mSearchQueue . front ( ) ;
mSearchQueue . pop_front ( ) ;
}
2008-08-29 21:07:24 -04:00
# ifdef MPLEX_DEBUG
std : : cerr < < " ftDataMultiplex::doWork() Handling Search Request " ;
std : : cerr < < std : : endl ;
# endif
2010-01-11 11:00:42 -05:00
if ( handleSearchRequest ( req . mPeerId , req . mHash ) )
handleRecvDataRequest ( req . mPeerId , req . mHash , req . mSize , req . mOffset , req . mChunk ) ;
2008-08-03 08:45:53 -04:00
2011-11-03 19:18:00 -04:00
2008-08-03 08:45:53 -04:00
return true ;
}
2008-07-23 18:01:59 -04:00
2012-03-17 10:08:27 -04:00
bool ftDataMultiplex : : recvSingleChunkCrc ( const std : : string & peerId , const std : : string & hash , uint32_t chunk_number , const Sha1CheckSum & crc )
2012-03-15 15:55:43 -04:00
{
RsStackMutex stack ( dataMtx ) ; /******* LOCK MUTEX ******/
2012-03-17 10:45:52 -04:00
# ifdef MPLEX_DEBUG
2012-03-17 10:08:27 -04:00
std : : cerr < < " ftDataMultiplex::recvSingleChunkCrc() Received crc of file " < < hash < < " , from peer id " < < peerId < < " , chunk " < < chunk_number < < " , crc= " < < crc . toStdString ( ) < < std : : endl ;
2012-03-17 10:45:52 -04:00
# endif
2012-03-24 10:45:33 -04:00
// remove this chunk from the request list as well.
Sha1CacheEntry & sha1cache ( _cached_sha1maps [ hash ] ) ;
2012-07-19 16:52:04 -04:00
std : : map < uint32_t , std : : pair < time_t , ChunkCheckSumSourceList > > : : iterator it2 ( sha1cache . _to_ask . find ( chunk_number ) ) ;
2012-03-24 10:45:33 -04:00
if ( it2 ! = sha1cache . _to_ask . end ( ) )
sha1cache . _to_ask . erase ( it2 ) ;
// update the cache: get size from the client.
2012-03-15 15:55:43 -04:00
std : : map < std : : string , ftClient > : : iterator it = mClients . find ( hash ) ;
if ( it = = mClients . end ( ) )
{
std : : cerr < < " ftDataMultiplex::recvSingleChunkCrc() ERROR: No matching Client for CRC. This is an error. " < < hash < < " ! " < < std : : endl ;
/* error */
return false ;
}
// store in the cache as well
if ( sha1cache . _map . size ( ) = = 0 )
sha1cache . _map = Sha1Map ( it - > second . mCreator - > fileSize ( ) , ChunkMap : : CHUNKMAP_FIXED_CHUNK_SIZE ) ;
sha1cache . _map . set ( chunk_number , crc ) ;
sha1cache . _received . push_back ( chunk_number ) ;
2012-03-17 10:45:52 -04:00
# ifdef MPLEX_DEBUG
2012-03-15 15:55:43 -04:00
std : : cerr < < " ftDataMultiplex::recvSingleChunkCrc() stored in cache. " < < std : : endl ;
2012-03-17 10:45:52 -04:00
# endif
2012-03-15 15:55:43 -04:00
return true ;
}
bool ftDataMultiplex : : dispatchReceivedChunkCheckSum ( )
{
RsStackMutex stack ( dataMtx ) ; /******* LOCK MUTEX ******/
uint32_t MAX_CHECKSUM_CHECK_PER_FILE = 25 ;
2012-03-24 10:45:33 -04:00
for ( std : : map < std : : string , Sha1CacheEntry > : : iterator it ( _cached_sha1maps . begin ( ) ) ; it ! = _cached_sha1maps . end ( ) ; )
2012-03-15 15:55:43 -04:00
{
2012-03-24 10:45:33 -04:00
std : : map < std : : string , ftClient > : : iterator itc = mClients . find ( it - > first ) ;
2012-03-15 15:55:43 -04:00
2012-03-17 10:45:52 -04:00
# ifdef MPLEX_DEBUG
2012-03-24 10:45:33 -04:00
std : : cerr < < " ftDataMultiplex::dispatchReceivedChunkCheckSum(): treating hash " < < it - > first < < std : : endl ;
2012-03-17 10:45:52 -04:00
# endif
2012-03-15 15:55:43 -04:00
2012-03-24 10:45:33 -04:00
if ( itc = = mClients . end ( ) )
{
std : : cerr < < " ftDataMultiplex::dispatchReceivedChunkCheckSum() ERROR: No matching Client for hash. This is probably a late answer. Dropping the hash. Hash= " < < it - > first < < std : : endl ;
std : : map < std : : string , Sha1CacheEntry > : : iterator tmp ( it ) ;
+ + tmp ;
_cached_sha1maps . erase ( it ) ;
it = tmp ;
/* error */
continue ;
}
ftFileCreator * client = itc - > second . mCreator ;
for ( uint32_t n = 0 ; n < MAX_CHECKSUM_CHECK_PER_FILE & & ! it - > second . _received . empty ( ) ; + + n )
{
int chunk_number = it - > second . _received . back ( ) ;
2012-03-15 15:55:43 -04:00
if ( ! it - > second . _map . isSet ( chunk_number ) )
std : : cerr < < " ftDataMultiplex::dispatchReceivedChunkCheckSum() ERROR: chunk " < < chunk_number < < " is supposed to be initialized but it was not received !! " < < std : : endl ;
2012-03-24 10:45:33 -04:00
else
{
2012-03-17 10:45:52 -04:00
# ifdef MPLEX_DEBUG
2012-03-24 10:45:33 -04:00
std : : cerr < < " ftDataMultiplex::dispatchReceivedChunkCheckSum(): checking chunk " < < chunk_number < < " with hash " < < it - > second . _map [ chunk_number ] . toStdString ( ) < < std : : endl ;
2012-03-17 10:45:52 -04:00
# endif
2012-03-24 10:45:33 -04:00
client - > verifyChunk ( chunk_number , it - > second . _map [ chunk_number ] ) ;
}
2012-03-15 15:55:43 -04:00
it - > second . _received . pop_back ( ) ;
}
2012-03-24 10:45:33 -04:00
+ + it ;
2012-03-15 15:55:43 -04:00
}
return true ;
}
2011-08-12 09:42:30 -04:00
bool ftDataMultiplex : : recvCRC32Map ( const std : : string & /*peerId*/ , const std : : string & hash , const CRC32Map & crc_map )
2010-07-21 19:14:10 -04:00
{
RsStackMutex stack ( dataMtx ) ; /******* LOCK MUTEX ******/
std : : map < std : : string , ftClient > : : iterator it = mClients . find ( hash ) ;
if ( it = = mClients . end ( ) )
{
std : : cerr < < " ftDataMultiplex::recvCRCMap() ERROR: No matching Client for CRC32map. This is an error. " < < hash < < " ! " < < std : : endl ;
/* error */
return false ;
}
# ifdef MPLEX_DEBUG
std : : cerr < < " ftDataMultiplex::recvCRCMap() Passing crc map of file " < < hash < < " , to FT Module " < < std : : endl ;
# endif
( it - > second ) . mModule - > addCRC32Map ( crc_map ) ;
return true ;
}
2010-01-11 11:00:42 -05:00
// A chunk map has arrived. It can be two different situations:
// - an uploader has sent his chunk map, so we need to store it in the corresponding ftFileProvider
// - a source for a download has sent his chunk map, so we need to send it to the corresponding ftFileCreator.
//
bool ftDataMultiplex : : recvChunkMap ( const std : : string & peerId , const std : : string & hash , const CompressedChunkMap & compressed_map , bool client )
2009-12-28 16:11:00 -05:00
{
RsStackMutex stack ( dataMtx ) ; /******* LOCK MUTEX ******/
2010-01-11 11:00:42 -05:00
if ( client ) // is the chunk map for a client, or for a server ?
2009-12-28 16:11:00 -05:00
{
2010-01-11 11:00:42 -05:00
std : : map < std : : string , ftClient > : : iterator it = mClients . find ( hash ) ;
if ( it = = mClients . end ( ) )
{
2009-12-28 16:11:00 -05:00
# ifdef MPLEX_DEBUG
2010-01-11 11:00:42 -05:00
std : : cerr < < " ftDataMultiplex::recvChunkMap() ERROR: No matching Client for hash " < < hash < < " ! " ;
std : : cerr < < std : : endl ;
# endif
/* error */
return false ;
}
# ifdef MPLEX_DEBUG
std : : cerr < < " ftDataMultiplex::recvChunkMap() Passing map of file " < < hash < < " , to FT Module " ;
2009-12-28 16:11:00 -05:00
std : : cerr < < std : : endl ;
# endif
2010-01-11 11:00:42 -05:00
( it - > second ) . mCreator - > setSourceMap ( peerId , compressed_map ) ;
return true ;
2009-12-28 16:11:00 -05:00
}
2010-01-11 11:00:42 -05:00
else
{
std : : map < std : : string , ftFileProvider * > : : iterator it = mServers . find ( hash ) ;
2009-12-28 16:11:00 -05:00
2010-01-11 11:00:42 -05:00
if ( it = = mServers . end ( ) )
{
2009-12-28 16:11:00 -05:00
# ifdef MPLEX_DEBUG
2010-01-11 11:00:42 -05:00
std : : cerr < < " ftDataMultiplex::handleRecvChunkMap() ERROR: No matching file Provider for hash " < < hash ;
std : : cerr < < std : : endl ;
2009-12-28 16:11:00 -05:00
# endif
2010-03-28 16:46:45 -04:00
return false ;
2010-01-11 11:00:42 -05:00
}
2009-12-28 16:11:00 -05:00
2010-01-11 11:00:42 -05:00
it - > second - > setClientMap ( peerId , compressed_map ) ;
return true ;
}
return false ;
}
2011-11-03 19:18:00 -04:00
2010-07-29 17:07:07 -04:00
bool ftDataMultiplex : : handleRecvCRC32MapRequest ( const std : : string & peerId , const std : : string & hash )
2010-07-21 19:14:10 -04:00
{
2011-11-03 19:18:00 -04:00
bool found = false ;
CRC32Map cmap ;
// 1 - look into cache
# ifdef MPLEX_DEBUG
std : : cerr < < " ftDataMultiplex::handleRecvChunkMapReq() : source " < < peerId < < " asked for CRC32 map for file " < < hash < < std : : endl ;
# endif
{
RsStackMutex stack ( dataMtx ) ; /******* LOCK MUTEX ******/
std : : map < std : : string , std : : pair < time_t , CRC32Map > > : : iterator it = _cached_crc32maps . find ( hash ) ;
if ( it ! = _cached_crc32maps . end ( ) )
{
cmap = it - > second . second ;
it - > second . first = time ( NULL ) ; // update time stamp
found = true ;
# ifdef MPLEX_DEBUG
std : : cerr < < " ftDataMultiplex::handleRecvChunkMapReq() : CRC32 map found in cache !! " < < std : : endl ;
# endif
}
}
if ( found )
{
2012-03-17 10:45:52 -04:00
# ifdef MPLEX_DEBUG
2011-11-03 19:18:00 -04:00
std : : cerr < < " File CRC32 map was obtained successfully. Sending it. " < < std : : endl ;
2012-03-17 10:45:52 -04:00
# endif
2011-11-03 19:18:00 -04:00
mDataSend - > sendCRC32Map ( peerId , hash , cmap ) ;
return true ;
}
else
{
std : : cerr < < " File CRC32 Not found. Computing it. " < < std : : endl ;
{
RsStackMutex stack ( dataMtx ) ; /******* LOCK MUTEX ******/
if ( _crc32map_threads . size ( ) > 1 )
{
std : : cerr < < " Too many threads already computing CRC32Maps (2 is the current maximum)! Giving up. " < < std : : endl ;
return false ;
}
}
CRC32Thread * thread = new CRC32Thread ( this , peerId , hash ) ;
{
RsStackMutex stack ( dataMtx ) ; /******* LOCK MUTEX ******/
_crc32map_threads . push_back ( thread ) ;
}
thread - > start ( ) ;
return true ;
}
}
bool ftDataMultiplex : : computeAndSendCRC32Map ( const std : : string & peerId , const std : : string & hash )
{
bool found ;
2010-07-21 19:14:10 -04:00
std : : map < std : : string , ftFileProvider * > : : iterator it ;
2011-11-03 19:18:00 -04:00
std : : string filename ;
uint64_t filesize = 0 ;
2010-07-21 19:14:10 -04:00
2011-11-03 19:18:00 -04:00
// 1 - look into the list of servers
2010-07-21 19:14:10 -04:00
{
RsStackMutex stack ( dataMtx ) ; /******* LOCK MUTEX ******/
it = mServers . find ( hash ) ;
if ( it = = mServers . end ( ) )
found = false ;
}
2011-11-03 19:18:00 -04:00
// 2 - if not found, create a server.
//
2010-07-21 19:14:10 -04:00
if ( ! found )
{
# ifdef MPLEX_DEBUG
std : : cerr < < " ftDataMultiplex::handleRecvChunkMapReq() ERROR: No matching file Provider for hash " < < hash ;
std : : cerr < < std : : endl ;
# endif
if ( ! handleSearchRequest ( peerId , hash ) )
return false ;
# ifdef MPLEX_DEBUG
std : : cerr < < " ftDataMultiplex::handleRecvChunkMapReq() A new file Provider has been made up for hash " < < hash ;
std : : cerr < < std : : endl ;
# endif
}
{
RsStackMutex stack ( dataMtx ) ; /******* LOCK MUTEX ******/
it = mServers . find ( hash ) ;
if ( it = = mServers . end ( ) ) // handleSearchRequest should have filled mServers[hash], but we have been off-mutex since,
2011-11-03 19:18:00 -04:00
{
std : : cerr < < " Could definitely not find a provider for file " < < hash < < " . Maybe the file does not exist? " < < std : : endl ;
return false ; // so it's safer to check again.
}
else
{
filesize = it - > second - > fileSize ( ) ;
filename = it - > second - > fileName ( ) ;
}
2010-07-21 19:14:10 -04:00
}
2012-03-17 10:45:52 -04:00
# ifdef MPLEX_DEBUG
2011-11-03 19:18:00 -04:00
std : : cerr < < " Computing CRC32Map for file " < < filename < < " , hash= " < < hash < < " , size= " < < filesize < < std : : endl ;
2012-03-17 10:45:52 -04:00
# endif
2011-11-03 19:18:00 -04:00
2012-03-17 10:08:27 -04:00
FILE * fd = RsDirUtil : : rs_fopen ( filename . c_str ( ) , " rb " ) ;
2011-11-03 19:18:00 -04:00
if ( fd = = NULL )
{
std : : cerr < < " Could not open file " < < filename < < " for read!! CRC32Map computation cancelled. " < < std : : endl ;
return false ;
}
2010-07-21 19:14:10 -04:00
2011-11-03 19:18:00 -04:00
CRC32Map cmap ;
if ( ! RsDirUtil : : crc32File ( fd , filesize , ChunkMap : : CHUNKMAP_FIXED_CHUNK_SIZE , cmap ) )
{
std : : cerr < < " CRC32Map computation failed. " < < std : : endl ;
fclose ( fd ) ;
return false ;
}
fclose ( fd ) ;
{
RsStackMutex stack ( dataMtx ) ; /******* LOCK MUTEX ******/
std : : cerr < < " File CRC32 was successfully computed. Storing it into cache. " < < std : : endl ;
_cached_crc32maps [ hash ] = std : : pair < time_t , CRC32Map > ( time ( NULL ) , cmap ) ;
}
2012-03-17 10:45:52 -04:00
# ifdef MPLEX_DEBUG
2011-11-03 19:18:00 -04:00
std : : cerr < < " File CRC32 was successfully computed. Sending it. " < < std : : endl ;
2012-03-17 10:45:52 -04:00
# endif
2010-07-21 19:14:10 -04:00
mDataSend - > sendCRC32Map ( peerId , hash , cmap ) ;
2011-11-03 19:18:00 -04:00
2010-07-21 19:14:10 -04:00
return true ;
}
2010-01-11 11:00:42 -05:00
bool ftDataMultiplex : : handleRecvClientChunkMapRequest ( const std : : string & peerId , const std : : string & hash )
{
CompressedChunkMap cmap ;
{
RsStackMutex stack ( dataMtx ) ; /******* LOCK MUTEX ******/
std : : map < std : : string , ftClient > : : iterator it = mClients . find ( hash ) ;
2009-12-28 16:11:00 -05:00
2010-01-11 11:00:42 -05:00
if ( it = = mClients . end ( ) )
{
// If we can't find the client, it's not a problem. Chunk maps from
// clients are not essential, as they are only used for display.
# ifdef MPLEX_DEBUG
std : : cerr < < " ftDataMultiplex::handleRecvServerChunkMapRequest() ERROR: No matching Client for hash " < < hash ;
std : : cerr < < " . Performing local search. " < < std : : endl ;
# endif
return false ;
}
# ifdef MPLEX_DEBUG
std : : cerr < < " ftDataMultiplex::handleRecvServerChunkMapRequest() Sending map of file " < < hash < < " , to peer " < < peerId < < std : : endl ;
# endif
( it - > second ) . mCreator - > getAvailabilityMap ( cmap ) ;
}
2010-07-25 15:04:31 -04:00
mDataSend - > sendChunkMap ( peerId , hash , cmap , false ) ;
2010-01-11 11:00:42 -05:00
return true ;
2009-12-28 16:11:00 -05:00
}
2008-07-23 18:01:59 -04:00
2012-03-15 15:55:43 -04:00
bool ftDataMultiplex : : handleRecvChunkCrcRequest ( const std : : string & peerId , const std : : string & hash , uint32_t chunk_number )
{
// look into the sha1sum cache
2012-03-17 10:45:52 -04:00
# ifdef MPLEX_DEBUG
2012-03-15 15:55:43 -04:00
std : : cerr < < " ftDataMultiplex::handleRecvChunkMapReq() looking for chunk " < < chunk_number < < " for hash " < < hash < < std : : endl ;
2012-03-17 10:45:52 -04:00
# endif
2012-03-15 15:55:43 -04:00
Sha1CheckSum crc ;
bool found = false ;
{
RsStackMutex stack ( dataMtx ) ; /******* LOCK MUTEX ******/
Sha1CacheEntry & sha1cache ( _cached_sha1maps [ hash ] ) ;
sha1cache . last_activity = time ( NULL ) ; // update time_stamp
if ( sha1cache . _map . size ( ) > 0 & & sha1cache . _map . isSet ( chunk_number ) )
{
crc = sha1cache . _map [ chunk_number ] ;
found = true ;
}
}
if ( found )
{
2012-03-17 10:45:52 -04:00
# ifdef MPLEX_DEBUG
2012-03-15 15:55:43 -04:00
std : : cerr < < " ftDataMultiplex::handleRecvChunkMapReq() found in cache ! Sending " < < crc . toStdString ( ) < < std : : endl ;
2012-03-17 10:45:52 -04:00
# endif
2012-03-15 15:55:43 -04:00
mDataSend - > sendSingleChunkCRC ( peerId , hash , chunk_number , crc ) ;
return true ;
}
std : : map < std : : string , ftFileProvider * > : : iterator it ;
std : : string filename ;
uint64_t filesize = 0 ;
found = true ;
// 1 - look into the list of servers.Not clients ! Clients dont' have verified data.
{
RsStackMutex stack ( dataMtx ) ; /******* LOCK MUTEX ******/
it = mServers . find ( hash ) ;
if ( it = = mServers . end ( ) )
found = false ;
}
// 2 - if not found, create a server.
//
if ( ! found )
{
2012-03-17 10:45:52 -04:00
# ifdef MPLEX_DEBUG
std : : cerr < < " ftDataMultiplex::handleRecvChunkMapReq() ERROR: No matching file Provider for hash " < < hash ;
std : : cerr < < std : : endl ;
# endif
2012-03-15 15:55:43 -04:00
if ( ! handleSearchRequest ( peerId , hash ) )
return false ;
2012-03-17 10:45:52 -04:00
# ifdef MPLEX_DEBUG
std : : cerr < < " ftDataMultiplex::handleRecvChunkMapReq() A new file Provider has been made up for hash " < < hash ;
std : : cerr < < std : : endl ;
# endif
2012-03-15 15:55:43 -04:00
}
{
RsStackMutex stack ( dataMtx ) ; /******* LOCK MUTEX ******/
it = mServers . find ( hash ) ;
if ( it = = mServers . end ( ) ) // handleSearchRequest should have filled mServers[hash], but we have been off-mutex since,
{
std : : cerr < < " Could definitely not find a provider for file " < < hash < < " . Maybe the file does not exist? " < < std : : endl ;
return false ; // so it's safer to check again.
}
else
{
filesize = it - > second - > fileSize ( ) ;
filename = it - > second - > fileName ( ) ;
}
}
2012-03-17 10:45:52 -04:00
# ifdef MPLEX_DEBUG
2012-03-15 15:55:43 -04:00
std : : cerr < < " Computing Sha1 for chunk " < < chunk_number < < " of file " < < filename < < " , hash= " < < hash < < " , size= " < < filesize < < std : : endl ;
2012-03-17 10:45:52 -04:00
# endif
2012-03-15 15:55:43 -04:00
unsigned char * buf = new unsigned char [ ChunkMap : : CHUNKMAP_FIXED_CHUNK_SIZE ] ;
2012-03-17 10:08:27 -04:00
FILE * fd = RsDirUtil : : rs_fopen ( filename . c_str ( ) , " rb " ) ;
2012-03-15 15:55:43 -04:00
if ( fd = = NULL )
{
std : : cerr < < " Cannot read file " < < filename < < " . Something's wrong! " < < std : : endl ;
2012-03-19 09:50:33 -04:00
delete [ ] buf ;
2012-03-15 15:55:43 -04:00
return false ;
}
uint32_t len ;
if ( fseeko64 ( fd , ( uint64_t ) chunk_number * ( uint64_t ) ChunkMap : : CHUNKMAP_FIXED_CHUNK_SIZE , SEEK_SET ) ! = 0 | | 0 = = ( len = fread ( buf , 1 , ChunkMap : : CHUNKMAP_FIXED_CHUNK_SIZE , fd ) ) )
{
std : : cerr < < " Cannot fseek/read from file " < < filename < < " at position " < < ( uint64_t ) chunk_number * ( uint64_t ) ChunkMap : : CHUNKMAP_FIXED_CHUNK_SIZE < < std : : endl ;
fclose ( fd ) ;
}
fclose ( fd ) ;
crc = RsDirUtil : : sha1sum ( buf , len ) ;
2012-03-19 09:50:33 -04:00
delete [ ] buf ;
2012-03-15 15:55:43 -04:00
// update cache
{
RsStackMutex stack ( dataMtx ) ; /******* LOCK MUTEX ******/
Sha1CacheEntry & sha1cache ( _cached_sha1maps [ hash ] ) ;
if ( sha1cache . _map . size ( ) = = 0 )
sha1cache . _map = Sha1Map ( filesize , ChunkMap : : CHUNKMAP_FIXED_CHUNK_SIZE ) ;
sha1cache . _map . set ( chunk_number , crc ) ;
}
std : : cerr < < " Sending CRC of chunk " < < chunk_number < < " of file " < < filename < < " , hash= " < < hash < < " , size= " < < filesize < < " , crc= " < < crc . toStdString ( ) < < std : : endl ;
mDataSend - > sendSingleChunkCRC ( peerId , hash , chunk_number , crc ) ;
return true ;
}
2010-01-11 11:00:42 -05:00
bool ftDataMultiplex : : handleRecvServerChunkMapRequest ( const std : : string & peerId , const std : : string & hash )
{
CompressedChunkMap cmap ;
std : : map < std : : string , ftFileProvider * > : : iterator it ;
bool found = true ;
{
RsStackMutex stack ( dataMtx ) ; /******* LOCK MUTEX ******/
it = mServers . find ( hash ) ;
if ( it = = mServers . end ( ) )
found = false ;
}
if ( ! found )
{
# ifdef MPLEX_DEBUG
std : : cerr < < " ftDataMultiplex::handleRecvChunkMapReq() ERROR: No matching file Provider for hash " < < hash ;
std : : cerr < < std : : endl ;
# endif
if ( ! handleSearchRequest ( peerId , hash ) )
return false ;
# ifdef MPLEX_DEBUG
std : : cerr < < " ftDataMultiplex::handleRecvChunkMapReq() A new file Provider has been made up for hash " < < hash ;
std : : cerr < < std : : endl ;
# endif
}
{
RsStackMutex stack ( dataMtx ) ; /******* LOCK MUTEX ******/
it = mServers . find ( hash ) ;
2010-05-14 07:29:15 -04:00
if ( it = = mServers . end ( ) ) // handleSearchRequest should have filled mServers[hash], but we have been off-mutex since,
return false ; // so it's safer to check again.
else
it - > second - > getAvailabilityMap ( cmap ) ;
2010-01-11 11:00:42 -05:00
}
2010-07-25 15:04:31 -04:00
mDataSend - > sendChunkMap ( peerId , hash , cmap , true ) ;
2010-01-11 11:00:42 -05:00
return true ;
}
bool ftDataMultiplex : : handleRecvData ( const std : : string & peerId ,
2011-08-12 09:42:30 -04:00
const std : : string & hash , uint64_t /*size*/ ,
2008-08-03 08:45:53 -04:00
uint64_t offset , uint32_t chunksize , void * data )
2008-07-23 18:01:59 -04:00
{
2010-06-21 15:10:55 -04:00
ftTransferModule * transfer_module = NULL ;
2008-07-23 18:01:59 -04:00
{
2010-06-21 15:10:55 -04:00
RsStackMutex stack ( dataMtx ) ; /******* LOCK MUTEX ******/
std : : map < std : : string , ftClient > : : iterator it ;
if ( mClients . end ( ) = = ( it = mClients . find ( hash ) ) )
{
2008-08-29 21:07:24 -04:00
# ifdef MPLEX_DEBUG
2010-06-21 15:10:55 -04:00
std : : cerr < < " ftDataMultiplex::handleRecvData() ERROR: No matching Client! " ;
std : : cerr < < std : : endl ;
2008-08-29 21:07:24 -04:00
# endif
2010-06-21 15:10:55 -04:00
/* error */
return false ;
}
2008-08-29 21:07:24 -04:00
# ifdef MPLEX_DEBUG
2010-06-21 15:10:55 -04:00
std : : cerr < < " ftDataMultiplex::handleRecvData() Passing to Module " ;
std : : cerr < < std : : endl ;
2008-08-29 21:07:24 -04:00
# endif
2010-06-21 15:10:55 -04:00
transfer_module = ( it - > second ) . mModule ;
}
transfer_module - > recvFileData ( peerId , offset , chunksize , data ) ;
2008-07-23 18:01:59 -04:00
return true ;
}
/* called by ftTransferModule */
2010-01-11 11:00:42 -05:00
bool ftDataMultiplex : : handleRecvDataRequest ( const std : : string & peerId , const std : : string & hash , uint64_t size , uint64_t offset , uint32_t chunksize )
2008-07-23 18:01:59 -04:00
{
/**** Find Files *****/
RsStackMutex stack ( dataMtx ) ; /******* LOCK MUTEX ******/
std : : map < std : : string , ftClient > : : iterator cit ;
2008-08-29 21:07:24 -04:00
if ( mOwnId = = peerId )
2008-07-23 18:01:59 -04:00
{
2008-08-29 21:07:24 -04:00
/* own requests must be passed to Servers */
# ifdef MPLEX_DEBUG
std : : cerr < < " ftDataMultiplex::handleRecvData() OwnId, so skip Clients... " ;
std : : cerr < < std : : endl ;
# endif
}
else if ( mClients . end ( ) ! = ( cit = mClients . find ( hash ) ) )
{
# ifdef MPLEX_DEBUG
std : : cerr < < " ftDataMultiplex::handleRecvData() Matched to a Client. " ;
std : : cerr < < std : : endl ;
# endif
2010-01-11 11:00:42 -05:00
locked_handleServerRequest ( ( cit - > second ) . mCreator , peerId , hash , size , offset , chunksize ) ;
2008-07-23 18:01:59 -04:00
return true ;
}
std : : map < std : : string , ftFileProvider * > : : iterator sit ;
if ( mServers . end ( ) ! = ( sit = mServers . find ( hash ) ) )
{
2008-08-29 21:07:24 -04:00
# ifdef MPLEX_DEBUG
std : : cerr < < " ftDataMultiplex::handleRecvData() Matched to a Provider. " ;
std : : cerr < < std : : endl ;
# endif
2010-01-11 11:00:42 -05:00
locked_handleServerRequest ( sit - > second , peerId , hash , size , offset , chunksize ) ;
2008-07-23 18:01:59 -04:00
return true ;
}
2008-08-29 21:07:24 -04:00
# ifdef MPLEX_DEBUG
std : : cerr < < " ftDataMultiplex::handleRecvData() No Match... adding to Search Queue. " ;
std : : cerr < < std : : endl ;
# endif
2008-07-23 18:01:59 -04:00
/* Add to Search Queue */
2010-01-11 11:00:42 -05:00
mSearchQueue . push_back ( ftRequest ( FT_DATA_REQ , peerId , hash , size , offset , chunksize , NULL ) ) ;
2008-07-23 18:01:59 -04:00
return true ;
}
bool ftDataMultiplex : : locked_handleServerRequest ( ftFileProvider * provider ,
2008-08-03 08:45:53 -04:00
std : : string peerId , std : : string hash , uint64_t size ,
uint64_t offset , uint32_t chunksize )
2008-07-23 18:01:59 -04:00
{
2010-02-15 15:44:37 -05:00
if ( chunksize > std : : min ( size , uint64_t ( 10 * 1024 * 1024 ) ) )
{
std : : cerr < < " Warning: peer " < < peerId < < " is asking a large chunk (s= " < < chunksize < < " ) for hash " < < hash < < " , filesize= " < < size < < " . This is unexpected. " < < std : : endl ;
return false ;
}
2009-05-26 17:42:45 -04:00
void * data = malloc ( chunksize ) ;
2008-10-29 16:58:23 -04:00
2009-07-27 16:31:56 -04:00
if ( data = = NULL )
{
std : : cerr < < " WARNING: Could not allocate data for a chunksize of " < < chunksize < < std : : endl ;
return false ;
}
2008-10-29 16:58:23 -04:00
# ifdef MPLEX_DEBUG
std : : cerr < < " ftDataMultiplex::locked_handleServerRequest() " ;
std : : cerr < < " \t peer: " < < peerId < < " hash: " < < hash ;
std : : cerr < < " size: " < < size ;
std : : cerr < < std : : endl ;
std : : cerr < < " \t offset: " < < offset ;
std : : cerr < < " chunksize: " < < chunksize < < " data: " < < data ;
std : : cerr < < std : : endl ;
# endif
2010-07-27 12:05:21 -04:00
if ( provider - > getFileData ( peerId , offset , chunksize , data ) )
2008-07-23 18:01:59 -04:00
{
/* send data out */
2008-08-03 08:45:53 -04:00
sendData ( peerId , hash , size , offset , chunksize , data ) ;
2008-07-23 18:01:59 -04:00
return true ;
}
2008-10-29 16:58:23 -04:00
# ifdef MPLEX_DEBUG
std : : cerr < < " ftDataMultiplex::locked_handleServerRequest() " ;
std : : cerr < < " FAILED " ;
std : : cerr < < std : : endl ;
# endif
2008-11-04 18:12:53 -05:00
free ( data ) ;
2008-10-29 16:58:23 -04:00
2008-07-23 18:01:59 -04:00
return false ;
}
2010-01-11 11:00:42 -05:00
bool ftDataMultiplex : : getClientChunkMap ( const std : : string & upload_hash , const std : : string & peerId , CompressedChunkMap & cmap )
{
2010-02-15 15:44:37 -05:00
bool too_old = false ;
2010-01-11 11:00:42 -05:00
{
RsStackMutex stack ( dataMtx ) ; /******* LOCK MUTEX ******/
std : : map < std : : string , ftFileProvider * > : : iterator sit = mServers . find ( upload_hash ) ;
if ( mServers . end ( ) = = sit )
return false ;
sit - > second - > getClientMap ( peerId , cmap , too_old ) ;
}
// If the map is too old then we should ask an other map to the peer.
//
if ( too_old )
2010-07-25 15:04:31 -04:00
sendChunkMapRequest ( peerId , upload_hash , true ) ;
2010-01-11 11:00:42 -05:00
return true ;
}
2010-07-25 15:04:31 -04:00
bool ftDataMultiplex : : sendChunkMapRequest ( const std : : string & peer_id , const std : : string & hash , bool is_client )
2010-01-11 11:00:42 -05:00
{
2010-07-25 15:04:31 -04:00
return mDataSend - > sendChunkMapRequest ( peer_id , hash , is_client ) ;
2010-01-11 11:00:42 -05:00
}
2010-07-29 17:07:07 -04:00
bool ftDataMultiplex : : sendCRC32MapRequest ( const std : : string & peer_id , const std : : string & hash )
2010-07-21 19:14:10 -04:00
{
return mDataSend - > sendCRC32MapRequest ( peer_id , hash ) ;
}
2012-07-19 16:52:04 -04:00
bool ftDataMultiplex : : sendSingleChunkCRCRequests ( const std : : string & hash , const std : : vector < uint32_t > & to_ask )
2012-03-15 15:55:43 -04:00
{
RsStackMutex stack ( dataMtx ) ; /******* LOCK MUTEX ******/
// Put all requested chunks in the request queue.
Sha1CacheEntry & ce ( _cached_sha1maps [ hash ] ) ;
for ( uint32_t i = 0 ; i < to_ask . size ( ) ; + + i )
{
2012-07-19 16:52:04 -04:00
std : : pair < time_t , ChunkCheckSumSourceList > & list ( ce . _to_ask [ to_ask [ i ] ] ) ;
list . first = 0 ; // set last request time to 0
2012-03-15 15:55:43 -04:00
}
return true ;
}
void ftDataMultiplex : : handlePendingCrcRequests ( )
{
RsStackMutex stack ( dataMtx ) ; /******* LOCK MUTEX ******/
time_t now = time ( NULL ) ;
uint32_t n = 0 ;
2012-07-19 16:52:04 -04:00
// Go through the list of currently handled hashes. For each of them,
// look for pending chunk crc requests.
// - if the last request is too old, re-ask:
// - ask the file creator about the possible sources for this chunk => returns a list of active sources
// - among active sources, pick the one that has the smallest request time stamp, in the request list.
//
// With this, only active sources are querried.
//
2012-03-15 15:55:43 -04:00
for ( std : : map < std : : string , Sha1CacheEntry > : : iterator it ( _cached_sha1maps . begin ( ) ) ; it ! = _cached_sha1maps . end ( ) ; + + it )
2012-07-19 16:52:04 -04:00
for ( std : : map < uint32_t , std : : pair < time_t , ChunkCheckSumSourceList > > : : iterator it2 ( it - > second . _to_ask . begin ( ) ) ; it2 ! = it - > second . _to_ask . end ( ) ; + + it2 )
if ( it2 - > second . first + MAX_CHECKING_CHUNK_WAIT_DELAY < now ) // is the last request old enough?
{
# ifdef MPLEX_DEBUG
std : : cerr < < " ftDataMultiplex::handlePendingCrcRequests(): Requesting sources for chunk " < < it2 - > first < < " , hash " < < it - > first < < std : : endl ;
# endif
// 0 - ask which sources can be used for this chunk
//
std : : map < std : : string , ftClient > : : const_iterator it4 ( mClients . find ( it - > first ) ) ;
if ( it4 = = mClients . end ( ) )
continue ;
std : : vector < std : : string > sources ;
it4 - > second . mCreator - > getSourcesList ( it2 - > first , sources ) ;
// 1 - go through all sources. Take the oldest one.
//
std : : string best_source ;
time_t oldest_timestamp = now ;
for ( uint32_t i = 0 ; i < sources . size ( ) ; + + i )
2012-03-15 15:55:43 -04:00
{
2012-03-24 10:45:33 -04:00
# ifdef MPLEX_DEBUG
2012-07-19 16:52:04 -04:00
std : : cerr < < " ftDataMultiplex::handlePendingCrcRequests(): Examining source " < < sources [ i ] < < std : : endl ;
2012-03-24 10:45:33 -04:00
# endif
2012-07-19 16:52:04 -04:00
std : : map < std : : string , time_t > : : const_iterator it3 ( it2 - > second . second . find ( sources [ i ] ) ) ;
if ( it3 = = it2 - > second . second . end ( ) ) // source not found. So this one is surely the oldest one to have been requested.
{
# ifdef MPLEX_DEBUG
std : : cerr < < " ftDataMultiplex::handlePendingCrcRequests(): not found! So using it directly. " < < std : : endl ;
# endif
best_source = sources [ i ] ;
break ;
}
else if ( it3 - > second < = oldest_timestamp ) // do nothing, otherwise, ask again
{
# ifdef MPLEX_DEBUG
std : : cerr < < " ftDataMultiplex::handlePendingCrcRequests(): not found! So using it directly. " < < std : : endl ;
# endif
best_source = sources [ i ] ;
oldest_timestamp = it3 - > second ;
}
# ifdef MPLEX_DEBUG
else
std : : cerr < < " ftDataMultiplex::handlePendingCrcRequests(): Source too recently used! So using it directly. " < < std : : endl ;
# endif
}
if ( best_source ! = " " )
{
# ifdef MPLEX_DEBUG
std : : cerr < < " ftDataMultiplex::handlePendingCrcRequests(): Asking crc of chunk " < < it2 - > first < < " to peer " < < best_source < < " for hash " < < it - > first < < std : : endl ;
# endif
// Use the source to ask the CRC.
//
// sendSingleChunkCRCRequest(peer_id, hash, chunk_id)
//
mDataSend - > sendSingleChunkCRCRequest ( best_source , it - > first , it2 - > first ) ;
it2 - > second . second [ best_source ] = now ;
it2 - > second . first = now ;
2012-03-15 15:55:43 -04:00
if ( + + n > MAX_SIMULTANEOUS_CRC_REQUESTS )
return ;
}
2012-07-19 16:52:04 -04:00
# ifdef MPLEX_DEBUG
else
std : : cerr < < " ftDataMultiplex::handlePendingCrcRequests(): no source for chunk " < < it2 - > first < < std : : endl ;
# endif
}
2012-03-15 15:55:43 -04:00
}
2010-02-15 15:44:37 -05:00
void ftDataMultiplex : : deleteUnusedServers ( )
2009-02-09 15:26:12 -05:00
{
RsStackMutex stack ( dataMtx ) ; /******* LOCK MUTEX ******/
2010-02-15 15:44:37 -05:00
//scan the uploads list in ftdatamultiplex and delete the items which time out
time_t now = time ( NULL ) ;
2010-07-27 12:05:21 -04:00
2010-02-15 15:44:37 -05:00
for ( std : : map < std : : string , ftFileProvider * > : : iterator sit ( mServers . begin ( ) ) ; sit ! = mServers . end ( ) ; )
2010-07-27 12:05:21 -04:00
if ( sit - > second - > purgeOldPeers ( now , 10 ) )
{
2011-01-08 18:27:33 -05:00
# ifdef MPLEX_DEBUG
std : : cerr < < " ftDataMultiplex::deleteUnusedServers(): provider " < < ( void * ) sit - > second < < " has no active peers. Removing. Now= " < < now < < std : : endl ;
2010-02-15 15:44:37 -05:00
# endif
2010-07-27 12:05:21 -04:00
// We don't delete servers that are clients at the same time !
if ( dynamic_cast < ftFileCreator * > ( sit - > second ) = = NULL )
2011-01-08 18:27:33 -05:00
{
# ifdef MPLEX_DEBUG
std : : cerr < < " ftDataMultiplex::deleteUnusedServers(): deleting file provider " < < ( void * ) sit - > second < < std : : endl ;
# endif
2010-07-27 12:05:21 -04:00
delete sit - > second ;
2011-01-08 18:27:33 -05:00
}
# ifdef MPLEX_DEBUG
else
std : : cerr < < " ftDataMultiplex::deleteUnusedServers(): " < < ( void * ) sit - > second < < " was not deleted because it's also a file creator. " < < std : : endl ;
# endif
2009-02-09 15:26:12 -05:00
2010-07-27 12:05:21 -04:00
std : : map < std : : string , ftFileProvider * > : : iterator tmp ( sit ) ;
+ + tmp ;
2010-01-11 11:00:42 -05:00
2010-07-27 12:05:21 -04:00
mServers . erase ( sit ) ;
2010-02-15 15:44:37 -05:00
2010-07-27 12:05:21 -04:00
sit = tmp ;
}
else
+ + sit ;
2009-02-09 15:26:12 -05:00
}
2008-07-23 18:01:59 -04:00
2010-01-11 11:00:42 -05:00
bool ftDataMultiplex : : handleSearchRequest ( const std : : string & peerId , const std : : string & hash )
2008-07-23 18:01:59 -04:00
{
2008-11-04 18:12:53 -05:00
# ifdef MPLEX_DEBUG
std : : cerr < < " ftDataMultiplex::handleSearchRequest( " ;
2010-01-11 11:00:42 -05:00
std : : cerr < < peerId < < " , " < < hash < < " ...) " ;
2008-11-04 18:12:53 -05:00
std : : cerr < < std : : endl ;
# endif
2008-07-23 18:01:59 -04:00
/*
* Do Actual search
* Could be Cache File , Local or Extra
* ( anywhere but remote really )
2010-05-29 09:17:09 -04:00
*
* the network wide and browsable flags are needed , otherwise results get filtered .
* For tunnel creation , the check of browsable / network wide flag is already done , so
* if we get a file download packet here , the source is already allowed to download it .
*
2008-07-23 18:01:59 -04:00
*/
FileInfo info ;
2011-10-21 18:25:18 -04:00
uint32_t hintflags = RS_FILE_HINTS_EXTRA | RS_FILE_HINTS_LOCAL | RS_FILE_HINTS_SPEC_ONLY ;
if ( rsTurtle - > isTurtlePeer ( peerId ) )
hintflags | = RS_FILE_HINTS_NETWORK_WIDE ;
else
hintflags | = RS_FILE_HINTS_BROWSABLE | RS_FILE_HINTS_CACHE ;
2008-07-23 18:01:59 -04:00
2010-01-11 11:00:42 -05:00
if ( mSearch - > search ( hash , hintflags , info ) )
2008-07-23 18:01:59 -04:00
{
2011-01-08 18:27:33 -05:00
/* setup a new provider */
RsStackMutex stack ( dataMtx ) ; /******* LOCK MUTEX ******/
// We might already have a file provider, if two requests have got stacked in the request queue. So let's
// check that before.
2008-11-04 18:12:53 -05:00
# ifdef MPLEX_DEBUG
std : : cerr < < " ftDataMultiplex::handleSearchRequest( " ;
std : : cerr < < " Found Local File, sharing... " ;
# endif
2011-01-08 18:27:33 -05:00
std : : map < std : : string , ftFileProvider * > : : const_iterator it = mServers . find ( hash ) ;
ftFileProvider * provider ;
2008-11-04 18:12:53 -05:00
2011-01-08 18:27:33 -05:00
if ( it = = mServers . end ( ) )
{
provider = new ftFileProvider ( info . path , info . size , hash ) ;
mServers [ hash ] = provider ;
# ifdef MPLEX_DEBUG
std : : cerr < < " created new file provider " < < ( void * ) provider < < std : : endl ;
# endif
}
else
{
# ifdef MPLEX_DEBUG
std : : cerr < < " re-using existing file provider " < < ( void * ) it - > second < < std : : endl ;
# endif
}
2008-07-23 18:01:59 -04:00
return true ;
}
2010-01-11 11:00:42 -05:00
// Now check wether the required file is actually being downloaded. In such a case,
// setup the file provider to be the file creator itself. Warning: this server should not
// be deleted when not used anymore. We need to restrict this to client peers that are
// not ourself, since the file transfer also handles the local cache traffic (this
// is something to be changed soon!!)
//
if ( peerId ! = mOwnId )
{
RsStackMutex stack ( dataMtx ) ; /******* LOCK MUTEX ******/
std : : map < std : : string , ftClient > : : const_iterator it ( mClients . find ( hash ) ) ;
if ( it ! = mClients . end ( ) )
{
mServers [ hash ] = it - > second . mCreator ;
return true ;
}
}
2008-07-23 18:01:59 -04:00
return false ;
}