Improvements/bugfixes to File Transfer.

* Lots more debugging messages.
 * Fixed Sleep / sleep issue on windows.
 * added pthread / WSAStartup.
 * added ownId to ftDataMultiplex for loopback file transfer.
 * now start ftDataMultiplex thread.
 * several bugfixes in ftfilecreator



git-svn-id: http://svn.code.sf.net/p/retroshare/code/trunk@710 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
drbob 2008-08-30 01:07:24 +00:00
parent 059effcfac
commit f7fca4295b
15 changed files with 312 additions and 26 deletions

View File

@ -311,14 +311,27 @@ bool ftController::FileDownloads(std::list<std::string> &hashs)
/* Directory Handling */
bool ftController::setDownloadDirectory(std::string path)
{
#ifdef CONTROL_DEBUG
std::cerr << "ftController::setDownloadDirectory(" << path << ")";
std::cerr << std::endl;
#endif
/* check if it exists */
if (RsDirUtil::checkCreateDirectory(path))
{
RsStackMutex stack(ctrlMutex); /******* LOCKED ********/
mDownloadPath = path;
#ifdef CONTROL_DEBUG
std::cerr << "ftController::setDownloadDirectory() Okay!";
std::cerr << std::endl;
#endif
return true;
}
#ifdef CONTROL_DEBUG
std::cerr << "ftController::setDownloadDirectory() Failed";
std::cerr << std::endl;
#endif
return false;
}

View File

@ -58,9 +58,9 @@ ftRequest::ftRequest(uint32_t type, std::string peerId, std::string hash, uint64
return;
}
ftDataMultiplex::ftDataMultiplex(ftDataSend *server, ftSearch *search)
ftDataMultiplex::ftDataMultiplex(std::string ownId, ftDataSend *server, ftSearch *search)
:RsQueueThread(DMULTIPLEX_MIN, DMULTIPLEX_MAX, DMULTIPLEX_RELAX),
mDataSend(server), mSearch(search)
mDataSend(server), mSearch(search), mOwnId(ownId)
{
return;
}
@ -247,16 +247,28 @@ bool ftDataMultiplex::doWork()
switch(req.mType)
{
case FT_DATA:
#ifdef MPLEX_DEBUG
std::cerr << "ftDataMultiplex::doWork() Handling FT_DATA";
std::cerr << std::endl;
#endif
handleRecvData(req.mPeerId, req.mHash, req.mSize,
req.mOffset, req.mChunk, req.mData);
break;
case FT_DATA_REQ:
#ifdef MPLEX_DEBUG
std::cerr << "ftDataMultiplex::doWork() Handling FT_DATA_REQ";
std::cerr << std::endl;
#endif
handleRecvDataRequest(req.mPeerId, req.mHash,
req.mSize, req.mOffset, req.mChunk);
break;
default:
#ifdef MPLEX_DEBUG
std::cerr << "ftDataMultiplex::doWork() Ignoring UNKNOWN";
std::cerr << std::endl;
#endif
break;
}
}
@ -278,6 +290,10 @@ bool ftDataMultiplex::doWork()
mSearchQueue.pop_front();
}
#ifdef MPLEX_DEBUG
std::cerr << "ftDataMultiplex::doWork() Handling Search Request";
std::cerr << std::endl;
#endif
handleSearchRequest(req.mPeerId, req.mHash, req.mSize,
req.mOffset, req.mChunk);
@ -293,10 +309,19 @@ bool ftDataMultiplex::handleRecvData(std::string peerId,
std::map<std::string, ftClient>::iterator it;
if (mClients.end() == (it = mClients.find(hash)))
{
#ifdef MPLEX_DEBUG
std::cerr << "ftDataMultiplex::handleRecvData() ERROR: No matching Client!";
std::cerr << std::endl;
#endif
/* error */
return false;
}
#ifdef MPLEX_DEBUG
std::cerr << "ftDataMultiplex::handleRecvData() Passing to Module";
std::cerr << std::endl;
#endif
(it->second).mModule->recvFileData(peerId, offset, chunksize, data);
return true;
@ -312,8 +337,20 @@ bool ftDataMultiplex::handleRecvDataRequest(std::string peerId,
RsStackMutex stack(dataMtx); /******* LOCK MUTEX ******/
std::map<std::string, ftClient>::iterator cit;
if (mClients.end() != (cit = mClients.find(hash)))
if (mOwnId == peerId)
{
/* own requests must be passed to Servers */
#ifdef MPLEX_DEBUG
std::cerr << "ftDataMultiplex::handleRecvData() OwnId, so skip Clients...";
std::cerr << std::endl;
#endif
}
else if (mClients.end() != (cit = mClients.find(hash)))
{
#ifdef MPLEX_DEBUG
std::cerr << "ftDataMultiplex::handleRecvData() Matched to a Client.";
std::cerr << std::endl;
#endif
locked_handleServerRequest((cit->second).mCreator,
peerId, hash, size, offset, chunksize);
return true;
@ -322,11 +359,19 @@ bool ftDataMultiplex::handleRecvDataRequest(std::string peerId,
std::map<std::string, ftFileProvider *>::iterator sit;
if (mServers.end() != (sit = mServers.find(hash)))
{
#ifdef MPLEX_DEBUG
std::cerr << "ftDataMultiplex::handleRecvData() Matched to a Provider.";
std::cerr << std::endl;
#endif
locked_handleServerRequest(sit->second,
peerId, hash, size, offset, chunksize);
return true;
}
#ifdef MPLEX_DEBUG
std::cerr << "ftDataMultiplex::handleRecvData() No Match... adding to Search Queue.";
std::cerr << std::endl;
#endif
/* Add to Search Queue */
mSearchQueue.push_back(

View File

@ -84,7 +84,7 @@ class ftDataMultiplex: public ftDataRecv, public RsQueueThread
public:
ftDataMultiplex(ftDataSend *server, ftSearch *search);
ftDataMultiplex(std::string ownId, ftDataSend *server, ftSearch *search);
/* ftController Interface */
bool addTransferModule(ftTransferModule *mod, ftFileCreator *f);
@ -153,6 +153,7 @@ bool locked_handleServerRequest(ftFileProvider *provider,
ftDataSend *mDataSend;
ftSearch *mSearch;
std::string mOwnId;
};
#endif

View File

@ -32,6 +32,10 @@
*
*/
#ifdef WIN32
#include "util/rswin.h"
#endif
#include "ft/ftextralist.h"
#include "ft/ftdatamultiplex.h"
#include "ft/ftfilesearch.h"
@ -93,7 +97,7 @@ int main(int argc, char **argv)
ftDataSend *ftds = new ftDataSendDummy();
/* setup Actual Test bit */
ftDataMultiplex *ftmplex = new ftDataMultiplex(ftds, ftfs);
ftDataMultiplex *ftmplex = new ftDataMultiplex("ownId", ftds, ftfs);
ftmplex->start();
/* Setup Search with some valid results */

View File

@ -23,6 +23,10 @@
*
*/
#ifdef WIN32
#include "util/rswin.h"
#endif
#include "ft/ftextralist.h"
extern "C" void* runExtraList(void* p)
@ -37,6 +41,7 @@ extern "C" void* runExtraList(void* p)
{
//eList->tick();
sleep(1);
}
delete eList;
@ -113,6 +118,7 @@ int main(int argc, char **argv)
while(1)
{
sleep(period);
displayExtraListDetails(eList, toHash, hashed);
}
}

View File

@ -14,6 +14,18 @@ hash, std::string chunker): ftFileProvider(path,size,hash)
/*
* FIXME any inits to do?
*/
#ifdef FILE_DEBUG
std::cerr << "ftFileCreator()";
std::cerr << std::endl;
std::cerr << "\tpath: " << path;
std::cerr << std::endl;
std::cerr << "\tsize: " << size;
std::cerr << std::endl;
std::cerr << "\thash: " << hash;
std::cerr << std::endl;
#endif
initialize(chunker);
}
@ -36,7 +48,10 @@ int ftFileCreator::initializeFileAttrs()
/*
* check if the file exists
*/
std::cout << "ftFileProvider::initializeFileAttrs() Filename: " << file_name;
#ifdef FILE_DEBUG
std::cerr << "ftFileCreator::initializeFileAttrs() Filename: " << file_name;
std::cerr << std::endl;
#endif
/*
* attempt to open file in writing mode
@ -45,7 +60,9 @@ int ftFileCreator::initializeFileAttrs()
fd = fopen(file_name.c_str(), "w+b");
if (!fd)
{
std::cout << "ftFileProvider::initializeFileAttrs() Failed to open (w+b): " << file_name << std::endl;
#ifdef FILE_DEBUG
std::cerr << "ftFileCreator::initializeFileAttrs() Failed to open (w+b): " << file_name << std::endl;
#endif
return 0;
}
@ -56,23 +73,38 @@ int ftFileCreator::initializeFileAttrs()
*/
if (0 != fseek(fd, 0L, SEEK_END))
{
std::cerr << "ftFileProvider::initializeFileAttrs() Seek Failed" << std::endl;
std::cerr << "ftFileCreator::initializeFileAttrs() Seek Failed" << std::endl;
return 0;
}
/*
* get the file length
*/
recv_size = ftell(fd);
#ifdef FILE_DEBUG
std::cerr << "ftFileCreator::initializeFileAttrs() recv_size: " << recv_size << std::endl;
#endif
return 1;
}
bool ftFileCreator::addFileData(uint64_t offset, uint32_t chunk_size, void *data)
{
#ifdef FILE_DEBUG
std::cerr << "ftFileCreator::addFileData(";
std::cerr << offset;
std::cerr << ", " << chunk_size;
std::cerr << ", " << data << ")";
std::cerr << std::endl;
#endif
RsStackMutex stack(ftcMutex); /********** STACK LOCKED MTX ******/
/* check the status */
if (fd==NULL)
{
#ifdef FILE_DEBUG
std::cerr << "ftFileCreator::addFileData() initialising";
std::cerr << std::endl;
#endif
int init = initializeFileAttrs();
if (init ==0) {
std::cerr <<"Initialization Failed" << std::endl;
@ -116,6 +148,13 @@ bool ftFileCreator::addFileData(uint64_t offset, uint32_t chunk_size, void *data
pos = ftell(fd);
#ifdef FILE_DEBUG
std::cerr << "ftFileCreator::addFileData() added Data...";
std::cerr << std::endl;
std::cerr << "recvd: " << recv_size;
std::cerr << " pos: " << pos;
std::cerr << std::endl;
#endif
/*
* Notify ftFileChunker about chunks received
*/
@ -190,10 +229,16 @@ int ftFileChunker::splitFile(){
#ifdef FILE_DEBUG
std::cerr << "ftFileChunker::splitFile()";
std::cerr << std::endl;
std::cerr << "\tfile_size: " << file_size;
std::cerr << std::endl;
std::cerr << "\tstd_chunk_size: " << std_chunk_size;
std::cerr << std::endl;
std::cerr << "\tnum_chunks: " << num_chunks;
std::cerr << std::endl;
std::cerr << "\trem: " << rem;
std::cerr << std::endl;
std::cerr << "\tmax_chunk_size: " << max_chunk_size;
std::cerr << std::endl;
#endif
time_t now = time(NULL);
@ -322,12 +367,26 @@ bool ftFileChunker::getMissingChunk(uint64_t &offset, uint32_t &chunk)
if (!found)
{
#ifdef FILE_DEBUG
std::cerr << "ftFileChunker::getMissingChuck() FULL CHUNK not found";
std::cerr << std::endl;
#endif
i=0;
uint64_t max = allocationTable.at(i)->max_chunk_size;
uint64_t size = max;
int maxi = -1;
if (max > 0)
{
maxi = 0;
}
while(i<allocationTable.size())
{
#ifdef FILE_DEBUG
std::cerr << "Checking(" << i << ") " <<
allocationTable.at(i)->max_chunk_size;
std::cerr << std::endl;
#endif
size = allocationTable.at(i)->max_chunk_size;
if(size > max)
{
@ -338,6 +397,12 @@ bool ftFileChunker::getMissingChunk(uint64_t &offset, uint32_t &chunk)
}
if (maxi > -1) //maxi or max
{
#ifdef FILE_DEBUG
std::cerr << "Biggest Avail Chunk: " << max;
std::cerr << std::endl;
#endif
offset = allocationTable.at(maxi)->offset;
chunk = allocationTable.at(maxi)->max_chunk_size;
chunks_after = chunk/std_chunk_size; //10KB
@ -361,6 +426,7 @@ bool ftFileChunker::getMissingChunk(uint64_t &offset, uint32_t &chunk)
allocationTable.at(maxi)->timestamp = time(NULL);
allocationTable.at(maxi)->chunk_status = ftChunk::ALLOCATED;
found = true;
i = maxi;
}
} //if not found
@ -368,6 +434,9 @@ bool ftFileChunker::getMissingChunk(uint64_t &offset, uint32_t &chunk)
if (found)
{
// i represents index to chunk(s) we will use.
// chunks_after is the count of how many we will use.
std::cout << "Chunks remaining " << chunks_rem << std::endl;
/*
* update all previous chunks max available size
@ -399,6 +468,15 @@ bool ftFileChunker::getMissingChunk(uint64_t &offset, uint32_t &chunk)
}
}
else
{
#ifdef FILE_DEBUG
std::cerr << "ftFileChunker::getMissingChuck() not found";
std::cerr << std::endl;
#endif
}
return found;
}
@ -492,9 +570,31 @@ int ftFileChunker::notifyReceived(uint64_t offset, uint32_t chunk_size)
RsStackMutex stack(chunkerMutex); /********** STACK LOCKED MTX ******/
int index = offset / std_chunk_size;
/* should receive a multiple of chunk_size.... */
uint32_t chunks = chunk_size / std_chunk_size;
uint32_t rem_chunk = chunk_size % std_chunk_size;
#ifdef FILE_DEBUG
std::cerr << "ftFileChunkerr::notifyReceived(";
std::cerr << offset;
std::cerr << ", " << chunk_size << ")";
std::cerr << std::endl;
std::cerr << "\t# complete chunks: " << chunks;
std::cerr << std::endl;
std::cerr << "\trem_chunk: " << rem_chunk;
std::cerr << std::endl;
#endif
if(allocationTable.at(index)->chunk_status == ftChunk::ALLOCATED){
allocationTable.at(index)->chunk_status = ftChunk::RECEIVED;
aggregate_status += ftChunk::RECEIVED;
#ifdef FILE_DEBUG
std::cerr << "ftFileChunkerr::notifyReceived()";
std::cerr << " flagged as RECVD";
std::cerr << std::endl;
#endif
}
return aggregate_status;
}

View File

@ -101,12 +101,12 @@ void ftServer::SetupFtServer(NotifyBase *cb)
mFtSearch = new ftFileSearch();
/* Transport */
mFtDataplex = new ftDataMultiplex(this, mFtSearch);
mFtDataplex = new ftDataMultiplex(ownId, this, mFtSearch);
/* make Controller */
mFtController = new ftController(mCacheStrapper, mFtDataplex, mConfigPath);
mFtController -> setFtSearch(mFtSearch);
std::string tmppath = "./";
std::string tmppath = ".";
mFtController->setPartialsDirectory(tmppath);
mFtController->setDownloadDirectory(tmppath);
@ -150,6 +150,9 @@ void ftServer::StartupThreads()
/* Controller thread */
mFtController->start();
/* Dataplex */
mFtDataplex->start();
/* start own thread */
start();
}

View File

@ -29,6 +29,10 @@
* Put it all together, and make it compile.
*/
#ifdef WIN32
#include "util/rswin.h"
#endif
#include "ft/ftserver.h"
#include "ft/ftextralist.h"
@ -68,6 +72,29 @@ int main(int argc, char **argv)
std::map<std::string, ftServer *> mFtServers;
std::map<std::string, p3ConnectMgr *> mConnMgrs;
#ifdef PTW32_STATIC_LIB
pthread_win32_process_attach_np();
#endif
#ifdef WIN32
// Windows Networking Init.
WORD wVerReq = MAKEWORD(2,2);
WSADATA wsaData;
if (0 != WSAStartup(wVerReq, &wsaData))
{
std::cerr << "Failed to Startup Windows Networking";
std::cerr << std::endl;
}
else
{
std::cerr << "Started Windows Networking";
std::cerr << std::endl;
}
#endif
while(-1 != (c = getopt(argc, argv, "d:p:s")))
{
switch (c)
@ -95,12 +122,14 @@ int main(int argc, char **argv)
std::cerr << "Missing Files" << std::endl;
usage(argv[0]);
}
std::cerr << "Point 1" << std::endl;
for(; optind < argc; optind++)
{
std::cerr << "Adding: " << argv[optind] << std::endl;
fileList.push_back(std::string(argv[optind]));
}
std::cerr << "Point 2" << std::endl;
/* We need to setup a series 2 - 4 different ftServers....
*
@ -115,8 +144,10 @@ int main(int argc, char **argv)
std::list<pqiAuthDetails> baseFriendList, friendList;
std::list<pqiAuthDetails>::iterator fit;
std::cerr << "Point 3" << std::endl;
P3Hub *testHub = new P3Hub();
testHub->start();
std::cerr << "Point 4" << std::endl;
/* Setup Base Friend Info */
for(it = peerIds.begin(); it != peerIds.end(); it++)
@ -133,13 +164,15 @@ int main(int argc, char **argv)
std::cerr << "ftserver1test::setup peer: " << *it;
std::cerr << std::endl;
}
std::cerr << "Point 5" << std::endl;
std::ostringstream pname;
pname << "/tmp/rstst-" << time(NULL);
pname << "./tmp/rstst-" << time(NULL);
std::string basepath = pname.str();
RsDirUtil::checkCreateDirectory(basepath);
std::cerr << "Point 6" << std::endl;
for(it = peerIds.begin(); it != peerIds.end(); it++)

View File

@ -27,6 +27,11 @@
* ftServer2Test - Demonstrates how to check for test stuff.
*/
#ifdef WIN32
#include "util/rswin.h"
#endif
#include "ft/ftserver.h"
#include "ft/ftextralist.h"
@ -83,6 +88,29 @@ int main(int argc, char **argv)
std::list<ftServer *> mOtherServers;
std::list<std::string>::iterator eit;
#ifdef PTW32_STATIC_LIB
pthread_win32_process_attach_np();
#endif
#ifdef WIN32
// Windows Networking Init.
WORD wVerReq = MAKEWORD(2,2);
WSADATA wsaData;
if (0 != WSAStartup(wVerReq, &wsaData))
{
std::cerr << "Failed to Startup Windows Networking";
std::cerr << std::endl;
}
else
{
std::cerr << "Started Windows Networking";
std::cerr << std::endl;
}
#endif
while(-1 != (c = getopt(argc, argv, "asd:p:e:")))
{
switch (c)
@ -156,7 +184,7 @@ int main(int argc, char **argv)
}
std::ostringstream pname;
pname << "/tmp/rstst-" << time(NULL);
pname << "./tmp/rstst-" << time(NULL);
std::string basepath = pname.str();
RsDirUtil::checkCreateDirectory(basepath);

View File

@ -157,10 +157,19 @@ bool ftTransferModule::getChunk(uint64_t &offset, uint32_t &chunk_size)
bool val = mFileCreator->getMissingChunk(offset, chunk_size);
std::cerr << "ftTransferModule::getChunk()";
std::cerr << " Answer: offset: " << offset;
std::cerr << " chunk_size: " << chunk_size;
std::cerr << std::endl;
if (val)
{
std::cerr << "ftTransferModule::getChunk()";
std::cerr << " Answer: offset: " << offset;
std::cerr << " chunk_size: " << chunk_size;
std::cerr << std::endl;
}
else
{
std::cerr << "ftTransferModule::getChunk()";
std::cerr << " Answer: No Chunk Available";
std::cerr << std::endl;
}
return val;
}

View File

@ -54,6 +54,9 @@ void P3Hub::addP3Pipe(std::string id, P3Pipe *pqi, p3ConnectMgr *mgr)
void P3Hub::run()
{
std::cerr << "P3Hub::run()";
std::cerr << std::endl;
RsItem *item;
std::list<std::pair<std::string, RsItem *> > recvdQ;
std::list<std::pair<std::string, RsItem *> >::iterator lit;

View File

@ -56,7 +56,11 @@ int main(int argc, char **argv)
int i;
for(i = 0; i < 10; i++)
{
#ifdef WIN32
Sleep(1000);
#else
sleep(1);
#endif
forum->tick();
}
@ -76,29 +80,61 @@ int testForums(p3Forums *forum)
std::string fId1 = forum->createForum(L"Forum 1", L"first forum", RS_DISTRIB_PUBLIC);
forum->tick(); /* expect group publish */
sleep(1);
#ifdef WIN32
Sleep(1000);
#else
sleep(1);
#endif
forum->tick();
sleep(1);
#ifdef WIN32
Sleep(1000);
#else
sleep(1);
#endif
std::string fId2 = forum->createForum(L"Forum 2", L"next first forum", RS_DISTRIB_PRIVATE);
forum->tick(); /* expect group publish */
sleep(1);
#ifdef WIN32
Sleep(1000);
#else
sleep(1);
#endif
forum->tick();
sleep(1);
#ifdef WIN32
Sleep(1000);
#else
sleep(1);
#endif
std::string mId1 = forum->createForumMsg(fId2, "", L"Forum 2 Msg 1", L"first forum msg");
forum->tick(); /* expect msg publish */
sleep(1);
#ifdef WIN32
Sleep(1000);
#else
sleep(1);
#endif
forum->tick();
sleep(1);
#ifdef WIN32
Sleep(1000);
#else
sleep(1);
#endif
std::string mId2 = forum->createForumMsg(fId2, "", L"Forum 2 Msg 2", L"second forum msg");
forum->tick(); /* expect msg publish */
sleep(1);
#ifdef WIN32
Sleep(1000);
#else
sleep(1);
#endif
forum->tick();
sleep(1);
#ifdef WIN32
Sleep(1000);
#else
sleep(1);
#endif
}

View File

@ -222,6 +222,7 @@ void UdpLayer::recv_loop()
}
else if (status < 0)
{
std::cerr << "UdpLayer::recv_loop() ";
std::cerr << "Error: " << tounet_errno() << std::endl;
}
};

View File

@ -37,6 +37,7 @@
/****
* #define RSDIR_DEBUG 1
****/
#define RSDIR_DEBUG 1
std::string RsDirUtil::getTopDir(std::string dir)
{

View File

@ -92,8 +92,11 @@ void RsQueueThread::run()
mLastSleep = mMaxSleep;
}
}
#ifdef WIN32
Sleep(mLastSleep);
#else
usleep(1000 * mLastSleep);
#endif
}
}